[xiph-commits] r7178 - icecast/branches/icecast-singleq/src

karl at dactyl.lonelymoon.com karl
Mon Jul 19 16:36:02 PDT 2004


Author: karl
Date: Mon Jul 19 16:36:02 2004
New Revision: 7178

Modified:
icecast/branches/icecast-singleq/src/admin.c
icecast/branches/icecast-singleq/src/client.c
icecast/branches/icecast-singleq/src/client.h
icecast/branches/icecast-singleq/src/connection.c
icecast/branches/icecast-singleq/src/format.c
icecast/branches/icecast-singleq/src/format.h
icecast/branches/icecast-singleq/src/format_mp3.c
icecast/branches/icecast-singleq/src/format_mp3.h
icecast/branches/icecast-singleq/src/format_vorbis.c
icecast/branches/icecast-singleq/src/format_vorbis.h
icecast/branches/icecast-singleq/src/refbuf.c
icecast/branches/icecast-singleq/src/refbuf.h
icecast/branches/icecast-singleq/src/source.c
icecast/branches/icecast-singleq/src/source.h
Log:
add single queue handling, and change format handlers to handle it


Modified: icecast/branches/icecast-singleq/src/admin.c
===================================================================
--- icecast/branches/icecast-singleq/src/admin.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/admin.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -799,11 +799,7 @@

state = source->format->_state;

-    thread_mutex_lock(&(state->lock));
-    free(state->metadata);
-    state->metadata = strdup(value);
-    state->metadata_age++;
-    thread_mutex_unlock(&(state->lock));
+    mp3_set_tag (source->format, "title", value);

DEBUG2("Metadata on mountpoint %s changed to \"%s\"",
source->mount, value);

Modified: icecast/branches/icecast-singleq/src/client.c
===================================================================
--- icecast/branches/icecast-singleq/src/client.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/client.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -42,7 +42,7 @@

client->con = con;
client->parser = parser;
-    client->queue = NULL;
+    client->refbuf = NULL;
client->pos = 0;
client->burst_sent = 0;

@@ -51,8 +51,6 @@

void client_destroy(client_t *client)
{
-    refbuf_t *refbuf;
-
if (client == NULL)
return;
/* write log entry if ip is set (some things don't set it, like outgoing
@@ -64,9 +62,6 @@
connection_close(client->con);
httpp_destroy(client->parser);

-    while ((refbuf = refbuf_queue_remove(&client->queue)))
-        refbuf_release(refbuf);
-
/* we need to free client specific format data (if any) */
if (client->free_client_data)
client->free_client_data (client);

Modified: icecast/branches/icecast-singleq/src/client.h
===================================================================
--- icecast/branches/icecast-singleq/src/client.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/client.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -32,8 +32,9 @@
/* http response code for this client */
int respcode;

-    /* buffer queue */
-    refbuf_queue_t *queue;
+    /* where in the queue the client is */
+    refbuf_t *refbuf;
+
/* position in first buffer */
unsigned long pos;


Modified: icecast/branches/icecast-singleq/src/connection.c
===================================================================
--- icecast/branches/icecast-singleq/src/connection.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/connection.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -465,9 +465,8 @@
"for icecast 1.x relays. Assuming content is mp3.");
format_type = FORMAT_TYPE_MP3;
}
-        source->format = format_get_plugin (format_type, source->mount, source->parser);

-        if (source->format == NULL)
+        if (format_get_plugin (format_type, source) < 0)
{
global_unlock();
config_release_config();
@@ -960,8 +959,7 @@
global.clients++;
global_unlock();

-        client->format_data = source->format->create_client_data(
-                source->format, source, client);
+        source->format->create_client_data (source, client);

source->format->client_send_headers(source->format, source, client);


Modified: icecast/branches/icecast-singleq/src/format.c
===================================================================
--- icecast/branches/icecast-singleq/src/format.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -75,34 +75,24 @@
}
}

-format_plugin_t *format_get_plugin(format_type_t type, char *mount,
-        http_parser_t *parser)
+int format_get_plugin(format_type_t type, source_t *source)
{
-    format_plugin_t *plugin;
+    int ret = -1;

switch (type) {
case FORMAT_TYPE_VORBIS:
-        plugin = format_vorbis_get_plugin();
-        if (plugin) plugin->mount = mount;
+        ret = format_vorbis_get_plugin (source);
break;
case FORMAT_TYPE_MP3:
-        plugin = format_mp3_get_plugin(parser);
-        if (plugin) plugin->mount = mount;
+        ret = format_mp3_get_plugin (source);
break;
default:
-        plugin = NULL;
break;
}

-    return plugin;
+    return ret;
}

-int format_generic_write_buf_to_client(format_plugin_t *format,
-        client_t *client, unsigned char *buf, int len)
-{
-    return client_send_bytes (client, buf, len);
-}
-
void format_send_general_headers(format_plugin_t *format,
source_t *source, client_t *client)
{

Modified: icecast/branches/icecast-singleq/src/format.h
===================================================================
--- icecast/branches/icecast-singleq/src/format.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -45,13 +45,10 @@
*/
int has_predata;

-    int (*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long
-            len, refbuf_t **buffer);
-    refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self);
-    int (*write_buf_to_client)(struct _format_plugin_tag *format,
-            client_t *client, unsigned char *buf, int len);
-    void *(*create_client_data)(struct _format_plugin_tag *format,
-            struct source_tag *source, client_t *client);
+    refbuf_t *(*get_buffer)(struct source_tag *);
+    int (*write_buf_to_client)(struct _format_plugin_tag *format, client_t *client);
+    void (*write_buf_to_file)(struct source_tag *source, refbuf_t *refbuf);
+    int (*create_client_data)(struct source_tag *source, client_t *client);
void (*client_send_headers)(struct _format_plugin_tag *format,
struct source_tag *source, client_t *client);
void (*free_plugin)(struct _format_plugin_tag *self);
@@ -62,11 +59,8 @@

format_type_t format_get_type(char *contenttype);
char *format_get_mimetype(format_type_t type);
-format_plugin_t *format_get_plugin(format_type_t type, char *mount,
-        http_parser_t *parser);
+int format_get_plugin(format_type_t type, struct source_tag *source);

-int format_generic_write_buf_to_client(format_plugin_t *format,
-        client_t *client, unsigned char *buf, int len);
void format_send_general_headers(format_plugin_t *format,
struct source_tag *source, client_t *client);


Modified: icecast/branches/icecast-singleq/src/format_mp3.c
===================================================================
--- icecast/branches/icecast-singleq/src/format_mp3.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_mp3.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -1,4 +1,3 @@
-/* -*- c-basic-offset: 4; -*- */
/* Icecast
*
* This program is distributed under the GNU General Public License, version 2.
@@ -11,6 +10,7 @@
*                      and others (see AUTHORS for details).
*/

+/* -*- c-basic-offset: 4; -*- */
/* format_mp3.c
**
** format plugin for mp3
@@ -54,38 +54,38 @@
#define ICY_METADATA_INTERVAL 16000

static void format_mp3_free_plugin(format_plugin_t *self);
-static int format_mp3_get_buffer(format_plugin_t *self, char *data,
-        unsigned long len, refbuf_t **buffer);
-static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self);
-static void *format_mp3_create_client_data(format_plugin_t *self,
-        source_t *source, client_t *client);
+static refbuf_t *mp3_get_filter_meta (source_t *source);
+static refbuf_t *mp3_get_no_meta (source_t *source);
+
+static int  format_mp3_create_client_data (source_t *source, client_t *client);
static void free_mp3_client_data (client_t *client);
-static int format_mp3_write_buf_to_client(format_plugin_t *self,
-        client_t *client, unsigned char *buf, int len);
+static int format_mp3_write_buf_to_client(format_plugin_t *self, client_t *client);
static void format_mp3_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
+static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf);

+
typedef struct {
int use_metadata;
-   int interval;
-   int offset;
-   int metadata_age;
int metadata_offset;
+   unsigned since_meta_block;
+   int in_metadata;
+   refbuf_t *associated;
} mp3_client_data;

-format_plugin_t *format_mp3_get_plugin(http_parser_t *parser)
+int format_mp3_get_plugin (source_t *source)
{
char *metadata;
format_plugin_t *plugin;
mp3_state *state = calloc(1, sizeof(mp3_state));
+    refbuf_t *meta;

plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));

plugin->type = FORMAT_TYPE_MP3;
-    plugin->has_predata = 0;
-    plugin->get_buffer = format_mp3_get_buffer;
-    plugin->get_predata = format_mp3_get_predata;
+    plugin->get_buffer = mp3_get_no_meta;
plugin->write_buf_to_client = format_mp3_write_buf_to_client;
+    plugin->write_buf_to_file = write_mp3_to_file;
plugin->create_client_data = format_mp3_create_client_data;
plugin->client_send_headers = format_mp3_send_headers;
plugin->free_plugin = format_mp3_free_plugin;
@@ -93,309 +93,477 @@

plugin->_state = state;

-    state->metadata_age = 0;
-    state->metadata = strdup("");
-    thread_mutex_create(&(state->lock));
+    meta = refbuf_new (1);
+    memcpy (meta->data, "", 1);
+    state->metadata = meta;
+    state->interval = ICY_METADATA_INTERVAL;

-    metadata = httpp_getvar(parser, "icy-metaint");
-    if(metadata)
-        state->inline_metadata_interval = atoi(metadata);
+    metadata = httpp_getvar (source->parser, "icy-metaint");
+    if (metadata)
+    {
+        state->inline_metadata_interval = atoi (metadata);
+        state->offset = 0;
+        plugin->get_buffer = mp3_get_filter_meta;
+    }
+    source->format = plugin;
+    thread_mutex_create (&state->url_lock);

-    return plugin;
+    return 0;
}


-static int send_metadata(client_t *client, mp3_client_data *client_state,
-        mp3_state *source_state)
+void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value)
{
-    int len_byte;
-    int len;
-    int ret = -1;
-    unsigned char *buf;
-    int source_age;
-    char *fullmetadata = NULL;
-    int  fullmetadata_size = 0;
-    const char meta_fmt[] = "StreamTitle='';";
+    mp3_state *source_mp3 = plugin->_state;
+    unsigned len;
+    const char meta[] = "StreamTitle='";
+    int size = sizeof (meta) + 1;

-    do
+    if (tag==NULL || value == NULL)
+        return;
+
+    len = strlen (value)+1;
+    size += len;
+    /* protect against multiple updaters */
+    thread_mutex_lock (&source_mp3->url_lock);
+    if (strcmp (tag, "title") == 0 || strcmp (tag, "song") == 0)
{
-        thread_mutex_lock (&(source_state->lock));
-        if (source_state->metadata == NULL)
-            break; /* Shouldn't be possible */
-
-        fullmetadata_size = strlen (source_state->metadata) + sizeof (meta_fmt);
-
-	/* Noted without comment: When the metadata string is a multiple of
-	 * 16 bytes, the "reference" shoutcast server increases the length
-	 * byte by one and appends 16 nulls, rather than omitting the
-	 * pointless trailing null. */
-        if (fullmetadata_size > 4079)
+        char *p = strdup (value);
+        if (p)
{
-            fullmetadata_size = 4079;
+            free (source_mp3->url_title);
+            free (source_mp3->url_artist);
+            source_mp3->url_artist = NULL;
+            source_mp3->url_title = p;
+            source_mp3->update_metadata = 1;
}
-        fullmetadata = malloc (fullmetadata_size);
-        if (fullmetadata == NULL)
-            break;
+    }
+    else if (strcmp (tag, "artist") == 0)
+    {
+        char *p = strdup (value);
+        if (p)
+        {
+            free (source_mp3->url_artist);
+            source_mp3->url_artist = p;
+            source_mp3->update_metadata = 1;
+        }
+    }
+    thread_mutex_unlock (&source_mp3->url_lock);
+}

-        fullmetadata_size = snprintf (fullmetadata, fullmetadata_size,
-                "StreamTitle='%.*s';", fullmetadata_size-(sizeof (meta_fmt)-1), source_state->metadata);

-        source_age = source_state->metadata_age;
+static void filter_shoutcast_metadata (source_t *source, char *metadata, unsigned meta_len)
+{
+    if (metadata)
+    {
+        char *end, *p;
+        int len;

-        if (fullmetadata_size > 0 && source_age != client_state->metadata_age)
+        do
{
-            len_byte = (fullmetadata_size)/16 + 1; /* to give 1-255 */
-            client_state->metadata_offset = 0;
-        }
-        else
-            len_byte = 0;
-        len = 1 + len_byte*16;
-        buf = calloc (1, len);
-        if (buf == NULL)
-            break;
+            metadata++;
+            if (strncmp (metadata, "StreamTitle='", 13))
+                break;
+            if ((end = strstr (metadata, "\';")) == NULL)
+                break;
+            len = (end - metadata) - 13;
+            p = calloc (1, len+1);
+            if (p)
+            {
+                memcpy (p, metadata+13, len);
+                stats_event (source->mount, "title", p);
+                yp_touch (source->mount);
+                free (p);
+            }
+        } while (0);
+    }
+}

-        buf[0] = len_byte;

-        if (len > 1) {
-            strncpy (buf+1, fullmetadata, len-1);
-            buf[len-1] = '\0';
-        }
+/* called from the source thread when the metadata has been updated.
+ * The artist title are checked and made ready for clients to send
+ */
+void mp3_set_title (source_t *source)
+{
+    const char meta[] = "StreamTitle='";
+    int size;
+    unsigned char len_byte;
+    refbuf_t *p;
+    unsigned len = sizeof(meta) + 2; /* the StreamTitle, quotes, ; and null */
+    mp3_state *source_mp3 = source->format->_state;

-        thread_mutex_unlock (&(source_state->lock));
+    /* make sure the url data does not disappear from under us */
+    thread_mutex_lock (&source_mp3->url_lock);

-        /* only write what hasn't been written already */
-        ret = client_send_bytes (client, buf + client_state->metadata_offset,
-                len - client_state->metadata_offset);
+    /* work out message length */
+    if (source_mp3->url_artist)
+        len += strlen (source_mp3->url_artist);
+    if (source_mp3->url_title)
+        len += strlen (source_mp3->url_title);
+    if (source_mp3->url_artist && source_mp3->url_title)
+        len += 3;
+#define MAX_META_LEN 255*16
+    if (len > MAX_META_LEN)
+    {
+        thread_mutex_unlock (&source_mp3->url_lock);
+        WARN1 ("Metadata too long at %d chars", len);
+        return;
+    }
+    /* work out the metadata len byte */
+    len_byte = (len-1) / 16 + 1;

-        if (ret > 0 && ret < len) {
-            client_state->metadata_offset += ret;
-        }
-        else if (ret == len) {
-            client_state->metadata_age = source_age;
-            client_state->offset = 0;
-            client_state->metadata_offset = 0;
-        }
-        free (buf);
-        free (fullmetadata);
-        return ret;
+    /* now we know how much space to allocate, +1 for the len byte */
+    size = len_byte * 16 + 1;

-    } while (0);
+    p = refbuf_new (size);
+    if (p)
+    {
+        mp3_state *source_mp3 = source->format->_state;

-    thread_mutex_unlock(&(source_state->lock));
-    free (fullmetadata);
-    return -1;
+        memset (p->data, '\0', size);
+        if (source_mp3->url_artist && source_mp3->url_title)
+            snprintf (p->data, size, "%c%s%s - %s';", len_byte, meta,
+                    source_mp3->url_artist, source_mp3->url_title);
+        else
+            snprintf (p->data, size, "%c%s%s';", len_byte, meta,
+                    source_mp3->url_title);
+        filter_shoutcast_metadata (source, p->data, size);
+
+        refbuf_release (source_mp3->metadata);
+        source_mp3->metadata = p;
+    }
+    thread_mutex_unlock (&source_mp3->url_lock);
}

-static int format_mp3_write_buf_to_client(format_plugin_t *self,
-    client_t *client, unsigned char *buf, int len)
+
+/* send the appropriate metadata, and return the number of bytes written
+ * which is 0 or greater.  Check the client in_metadata value afterwards
+ * to see if all metadata has been sent
+ */
+static int send_mp3_metadata (client_t *client, refbuf_t *associated)
{
int ret = 0;
-    mp3_client_data *mp3data = client->format_data;
-
-    if(((mp3_state *)self->_state)->metadata && mp3data->use_metadata)
+    unsigned char *metadata;
+    int meta_len;
+    mp3_client_data *client_mp3 = client->format_data;
+
+    /* If there is a change in metadata then send it else
+     * send a single zero value byte in its place
+     */
+    if (associated == client_mp3->associated)
{
-        mp3_client_data *state = client->format_data;
-        int max = state->interval - state->offset;
+        metadata = "\0";
+        meta_len = 1;
+    }
+    else
+    {
+        metadata = associated->data + client_mp3->metadata_offset;
+        meta_len = associated->len - client_mp3->metadata_offset;
+    }
+    ret = client_send_bytes (client, metadata, meta_len);

-        if(len == 0) /* Shouldn't happen */
-            return 0;
+    if (ret == meta_len)
+    {
+        client_mp3->associated = associated;
+        client_mp3->metadata_offset = 0;
+        client_mp3->in_metadata = 0;
+        client_mp3->since_meta_block = 0;
+        return ret;
+    }
+    if (ret > 0)
+        client_mp3->metadata_offset += ret;
+    else
+        ret = 0;
+    client_mp3->in_metadata = 1;

-        if(max > len)
-            max = len;
+    return ret;
+}

-        if(max > 0) {
-            ret = client_send_bytes (client, buf, max);
-            if(ret > 0)
-                state->offset += ret;
+
+/* Handler for writing mp3 data to a client, taking into account whether
+ * client has requested shoutcast style metadata updates
+ */
+static int format_mp3_write_buf_to_client (format_plugin_t *self, client_t *client)
+{
+    int ret, written = 0;
+    mp3_client_data *client_mp3 = client->format_data;
+    mp3_state *source_mp3 = self->_state;
+    refbuf_t *refbuf = client->refbuf;
+    char *buf;
+    unsigned len;
+
+    if (refbuf->next == NULL && client->pos == refbuf->len)
+        return 0;
+    buf = refbuf->data + client->pos;
+    len = refbuf->len - client->pos;
+
+    do
+    {
+        /* send any unwritten metadata to the client */
+        if (client_mp3->in_metadata)
+        {
+            refbuf_t *associated = refbuf->associated;
+            ret = send_mp3_metadata (client, associated);
+
+            if (client_mp3->in_metadata)
+                break;
+            written += ret;
}
-        else {
-            send_metadata (client, state, self->_state);
+        /* see if we need to send the current metadata to the client */
+        if (client_mp3->use_metadata)
+        {
+            unsigned remaining = source_mp3->interval -
+                client_mp3->since_meta_block;
+
+            /* sending the metadata block */
+            if (remaining <= len)
+            {
+                /* send any mp3 before the metadata block */
+                if (remaining)
+                {
+                    ret = client_send_bytes (client, buf, remaining);
+
+                    if (ret > 0)
+                    {
+                        client_mp3->since_meta_block += ret;
+                        client->pos += ret;
+                    }
+                    if (ret < (int)remaining)
+                        break;
+                    written += ret;
+                }
+                ret = send_mp3_metadata (client, refbuf->associated);
+                if (client_mp3->in_metadata)
+                    break;
+                written += ret;
+                /* change buf and len */
+                buf += remaining;
+                len -= remaining;
+            }
}
+        /* write any mp3, maybe after the metadata block */
+        if (len)
+        {
+            ret = client_send_bytes (client, buf, len);

-    }
-    else {
-        ret = client_send_bytes (client, buf, len);
-    }
+            if (ret > 0)
+            {
+                client_mp3->since_meta_block += ret;
+                client->pos += ret;
+            }
+            if (ret < (int)len)
+                break;
+            written += ret;
+        }
+        ret = 0;
+        /* we have now written what we needed to so move to the next buffer */
+        if (refbuf->next)
+        {
+            client->refbuf = refbuf->next;
+            client->pos = 0;
+        }
+    } while (0);

-    return ret;
+    if (ret > 0)
+        written += ret;
+    return written;
}

static void format_mp3_free_plugin(format_plugin_t *self)
{
/* free the plugin instance */
mp3_state *state = self->_state;
-    thread_mutex_destroy(&(state->lock));

-    free(state->metadata);
+    thread_mutex_destroy (&state->url_lock);
+    free (state->url_artist);
+    free (state->url_title);
+    refbuf_release (state->metadata);
free(state);
free(self);
}

-static int format_mp3_get_buffer(format_plugin_t *self, char *data,
-    unsigned long len, refbuf_t **buffer)
+
+/* read an mp3 stream which does not have shoutcast style metadata */
+static refbuf_t *mp3_get_no_meta (source_t *source)
{
+    int bytes;
refbuf_t *refbuf;
-    mp3_state *state = self->_state;
+    mp3_state *source_mp3 = source->format->_state;

-    /* Set this to NULL in case it doesn't get set to a valid buffer later */
-    *buffer = NULL;
+    if ((refbuf = refbuf_new (2048)) == NULL)
+        return NULL;
+    bytes = sock_read_bytes (source->con->sock, refbuf->data, 2048);

-    if(!data)
-        return 0;
+    if (bytes == 0)
+    {
+        INFO1 ("End of stream %s", source->mount);
+        source->running = 0;
+        refbuf_release (refbuf);
+        return NULL;
+    }
+    if (source_mp3->update_metadata)
+    {
+        mp3_set_title (source);
+        source_mp3->update_metadata = 0;
+    }
+    if (bytes > 0)
+    {
+        refbuf->len  = bytes;
+        refbuf->associated = source_mp3->metadata;
+        refbuf_addref (source_mp3->metadata);
+        return refbuf;
+    }
+    refbuf_release (refbuf);

-    if(state->inline_metadata_interval) {
-        /* Source is sending metadata, handle it... */
+    if (!sock_recoverable (sock_error()))
+        source->running = 0;

-        while(len > 0) {
-            int to_read = state->inline_metadata_interval - state->offset;
-            if(to_read > 0) {
-                refbuf_t *old_refbuf = *buffer;
+    return NULL;
+}

-                if(to_read > len)
-                    to_read = len;

-                if(old_refbuf) {
-                    refbuf = refbuf_new(to_read + old_refbuf->len);
-                    memcpy(refbuf->data, old_refbuf->data, old_refbuf->len);
-                    memcpy(refbuf->data+old_refbuf->len, data, to_read);
+/* read mp3 data with inlined metadata from the source. Filter out the
+ * metadata so that the mp3 data itself is store on the queue and the
+ * metadata is is associated with it
+ */
+static refbuf_t *mp3_get_filter_meta (source_t *source)
+{
+    refbuf_t *refbuf;
+    format_plugin_t *plugin = source->format;
+    mp3_state *source_mp3 = plugin->_state;
+    unsigned char *src;
+    unsigned bytes, mp3_block;
+    int ret;

-                    refbuf_release(old_refbuf);
-                }
-                else {
-                    refbuf = refbuf_new(to_read);
-                    memcpy(refbuf->data, data, to_read);
-                }
+    refbuf = refbuf_new (2048);
+    src = refbuf->data;

-                *buffer = refbuf;
+    ret = sock_read_bytes (source->con->sock, refbuf->data, 2048);

-                state->offset += to_read;
-                data += to_read;
-                len -= to_read;
-            }
-            else if(!state->metadata_length) {
-                /* Next up is the metadata byte... */
-                unsigned char byte = data[0];
-                data++;
-                len--;
+    if (ret == 0)
+    {
+        INFO1 ("End of stream %s", source->mount);
+        source->running = 0;
+        refbuf_release (refbuf);
+        return NULL;
+    }
+    if (source_mp3->update_metadata)
+    {
+        mp3_set_title (source);
+        source_mp3->update_metadata = 0;
+    }
+    if (ret < 0)
+    {
+        refbuf_release (refbuf);
+        if (sock_recoverable (sock_error()))
+            return NULL; /* go back to waiting */
+        INFO0 ("Error on connection from source");
+        source->running = 0;
+        return NULL;
+    }
+    /* fill the buffer with the read data */
+    bytes = (unsigned int)ret;
+    refbuf->len = 0;
+    while (bytes > 0)
+    {
+        unsigned int metadata_remaining;

-                /* According to the "spec"... this byte * 16 */
-                state->metadata_length = byte * 16;
+        mp3_block = source_mp3->inline_metadata_interval - source_mp3->offset;

-                if(state->metadata_length) {
-                    state->metadata_buffer =
-                        calloc(state->metadata_length + 1, 1);
+        /* is there only enough to account for mp3 data */
+        if (bytes <= mp3_block)
+        {
+            refbuf->len += bytes;
+            source_mp3->offset += bytes;
+            break;
+        }
+        /* we have enough data to get to the metadata
+         * block, but only transfer upto it */
+        if (mp3_block)
+        {
+            src += mp3_block;
+            bytes -= mp3_block;
+            refbuf->len += mp3_block;
+            source_mp3->offset += mp3_block;
+            continue;
+        }

-                    /* Ensure we have a null-terminator even if the source
-                     * stream is invalid.
-                     */
-                    state->metadata_buffer[state->metadata_length] = 0;
-                }
-                else {
-                    state->offset = 0;
-                }
+        /* process the inline metadata, len == 0 indicates not seen any yet */
+        if (source_mp3->build_metadata_len == 0)
+        {
+            memset (source_mp3->build_metadata, 0,
+                    sizeof (source_mp3->build_metadata));
+            source_mp3->build_metadata_offset = 0;
+            source_mp3->build_metadata_len = 1 + (*src * 16);
+        }

-                state->metadata_offset = 0;
-            }
-            else {
-                /* Metadata to read! */
-                int readable = state->metadata_length - state->metadata_offset;
+        /* do we have all of the metatdata block */
+        metadata_remaining = source_mp3->build_metadata_len -
+            source_mp3->build_metadata_offset;
+        if (bytes < metadata_remaining)
+        {
+            memcpy (source_mp3->build_metadata +
+                    source_mp3->build_metadata_offset, src, bytes);
+            source_mp3->build_metadata_offset += bytes;
+            break;
+        }
+        /* copy all bytes except the last one, that way we
+         * know a null byte terminates the message */
+        memcpy (source_mp3->build_metadata + source_mp3->build_metadata_offset,
+                src, metadata_remaining-1);

-                if(readable > len)
-                    readable = len;
+        /* overwrite metadata in the buffer */
+        bytes -= metadata_remaining;
+        memmove (src, src+metadata_remaining, bytes);

-                memcpy(state->metadata_buffer + state->metadata_offset,
-                        data, readable);
+        /* assign metadata if it's not 1 byte, as that indicates a change */
+        if (source_mp3->build_metadata_len > 1)
+        {
+            refbuf_t *meta = refbuf_new (source_mp3->build_metadata_len);
+            memcpy (meta->data, source_mp3->build_metadata,
+                    source_mp3->build_metadata_len);

-                state->metadata_offset += readable;
-
-                data += readable;
-                len -= readable;
-
-                if(state->metadata_offset == state->metadata_length)
-                {
-                    if(state->metadata_length)
-                    {
-                        thread_mutex_lock(&(state->lock));
-                        free(state->metadata);
-                        /* Now, reformat state->metadata_buffer to strip off
-                           StreamTitle=' and the closing '; (but only if there's
-                           enough data for it to be correctly formatted) */
-                        if(state->metadata_length >= 15) {
-                            /* This is overly complex because the
-                               metadata_length is the length of the actual raw
-                               data, but the (null-terminated) string is going
-                               to be shorter than this, and we can't trust that
-                               the raw data doesn't have other embedded-nulls */
-                            int stringlength;
-
-                            state->metadata = malloc(state->metadata_length -
-                                    12);
-                            memcpy(state->metadata,
-                                    state->metadata_buffer + 13,
-                                    state->metadata_length - 13);
-                            /* Make sure we've got a null-terminator of some
-                               sort */
-                            state->metadata[state->metadata_length - 13] = 0;
-
-                            /* Now figure out the _right_ one */
-                            stringlength = strlen(state->metadata);
-                            if(stringlength > 2)
-                                state->metadata[stringlength - 2] = 0;
-                            free(state->metadata_buffer);
-                        }
-                        else
-                            state->metadata = state->metadata_buffer;
-
-                        stats_event(self->mount, "title", state->metadata);
-                        state->metadata_buffer = NULL;
-                        state->metadata_age++;
-                        thread_mutex_unlock(&(state->lock));
-                        yp_touch (self->mount);
-                    }
-
-                    state->offset = 0;
-                    state->metadata_length = 0;
-                }
+            DEBUG1("shoutcast metadata %.4080s", meta->data+1);
+            if (strncmp (meta->data+1, "StreamTitle=", 12) == 0)
+            {
+                filter_shoutcast_metadata (source, source_mp3->build_metadata,
+                        source_mp3->build_metadata_len);
+                refbuf_release (source_mp3->metadata);
+                source_mp3->metadata = meta;
}
+            else
+            {
+                ERROR0 ("Incorrect metadata format, ending stream");
+                source->running = 0;
+                refbuf_release (refbuf);
+                return NULL;
+            }
}
-
-        /* Either we got a buffer above (in which case it can be used), or
-         * we set *buffer to NULL in the prologue, so the return value is
-         * correct anyway...
-         */
-        return 0;
+        source_mp3->offset = 0;
+        source_mp3->build_metadata_len = 0;
}
-    else {
-        /* Simple case - no metadata, just dump data directly to a buffer */
-        refbuf = refbuf_new(len);
+    refbuf->associated = source_mp3->metadata;
+    refbuf_addref (source_mp3->metadata);

-        memcpy(refbuf->data, data, len);
-
-        *buffer = refbuf;
-        return 0;
-    }
+    return refbuf;
}

-static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self)
-{
-    return NULL;
-}

-static void *format_mp3_create_client_data(format_plugin_t *self,
-        source_t *source, client_t *client)
+static int format_mp3_create_client_data(source_t *source, client_t *client)
{
mp3_client_data *data = calloc(1,sizeof(mp3_client_data));
char *metadata;

-    data->interval = ICY_METADATA_INTERVAL;
-    data->offset = 0;
+    if (data == NULL)
+        return -1;
+
+    client->format_data = data;
client->free_client_data = free_mp3_client_data;
-
metadata = httpp_getvar(client->parser, "icy-metadata");
if(metadata)
data->use_metadata = atoi(metadata)>0?1:0;

-    return data;
+    return 0;
}


@@ -432,3 +600,16 @@
format_send_general_headers(self, source, client);
}

+
+static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf)
+{
+    if (refbuf->len == 0)
+        return;
+    if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) < (size_t)refbuf->len)
+    {
+        WARN0 ("Write to dump file failed, disabling");
+        fclose (source->dumpfile);
+        source->dumpfile = NULL;
+    }
+}
+

Modified: icecast/branches/icecast-singleq/src/format_mp3.h
===================================================================
--- icecast/branches/icecast-singleq/src/format_mp3.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_mp3.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -19,18 +19,23 @@
#define __FORMAT_MP3_H__

typedef struct {
-    char *metadata;
-    int metadata_age;
-    mutex_t lock;
-
/* These are for inline metadata */
int inline_metadata_interval;
int offset;
-    int metadata_length;
-    char *metadata_buffer;
-    int metadata_offset;
+    unsigned interval;
+    char *url_artist;
+    char *url_title;
+    int update_metadata;
+
+    refbuf_t *metadata;
+    mutex_t url_lock;
+
+    unsigned build_metadata_len;
+    unsigned build_metadata_offset;
+    char build_metadata[4081];
} mp3_state;

-format_plugin_t *format_mp3_get_plugin(http_parser_t *parser);
+int format_mp3_get_plugin(struct source_tag *src);
+void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value);

#endif  /* __FORMAT_MP3_H__ */

Modified: icecast/branches/icecast-singleq/src/format_vorbis.c
===================================================================
--- icecast/branches/icecast-singleq/src/format_vorbis.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_vorbis.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -49,20 +49,31 @@
ogg_page og;
unsigned long serialno;
int header;
-    refbuf_t *headbuf[MAX_HEADER_PAGES];
+    refbuf_t *file_headers;
+    refbuf_t *header_pages;
+    refbuf_t *header_pages_tail;
int packets;
} vstate_t;

+struct client_vorbis
+{
+    refbuf_t *headers;
+    refbuf_t *header_page;
+    unsigned int pos;
+    int processing_headers;
+};
+
+
static void format_vorbis_free_plugin(format_plugin_t *self);
-static int format_vorbis_get_buffer(format_plugin_t *self, char *data,
-        unsigned long len, refbuf_t **buffer);
-static refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self);
-static void *format_vorbis_create_client_data(format_plugin_t *self,
-        source_t *source, client_t *client);
+static refbuf_t *format_vorbis_get_buffer (source_t *source);
+static int format_vorbis_create_client_data (source_t *source, client_t *client);
static void format_vorbis_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
+static int write_buf_to_client (format_plugin_t *self, client_t *client);
+static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf);

-format_plugin_t *format_vorbis_get_plugin(void)
+
+int format_vorbis_get_plugin(source_t *source)
{
format_plugin_t *plugin;
vstate_t *state;
@@ -70,10 +81,9 @@
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));

plugin->type = FORMAT_TYPE_VORBIS;
-    plugin->has_predata = 1;
+    plugin->write_buf_to_file = write_ogg_to_file;
plugin->get_buffer = format_vorbis_get_buffer;
-    plugin->get_predata = format_vorbis_get_predata;
-    plugin->write_buf_to_client = format_generic_write_buf_to_client;
+    plugin->write_buf_to_client = write_buf_to_client;
plugin->create_client_data = format_vorbis_create_client_data;
plugin->client_send_headers = format_vorbis_send_headers;
plugin->free_plugin = format_vorbis_free_plugin;
@@ -83,52 +93,67 @@
ogg_sync_init(&state->oy);

plugin->_state = (void *)state;
+    source->format = plugin;

-    return plugin;
+    return 0;
}

void format_vorbis_free_plugin(format_plugin_t *self)
{
-    int i;
vstate_t *state = (vstate_t *)self->_state;
+    refbuf_t *header;

/* free memory associated with this plugin instance */

/* free state memory */
+    while (header)
+    {
+        refbuf_t *to_release = header;
+        header = header->next;
+        refbuf_release (to_release);
+    }
ogg_sync_clear(&state->oy);
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);

-    for (i = 0; i < MAX_HEADER_PAGES; i++) {
-        if (state->headbuf[i]) {
-            refbuf_release(state->headbuf[i]);
-            state->headbuf[i] = NULL;
-        }
-    }
-
free(state);

/* free the plugin instance */
free(self);
}

-int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len, refbuf_t **buffer)
+static refbuf_t *format_vorbis_get_buffer (source_t *source)
{
-    char *buf;
-    int i, result;
+    int result;
ogg_packet op;
char *tag;
-    refbuf_t *refbuf, *source_refbuf;
+    refbuf_t *refbuf, *header;
+    char *data;
+    format_plugin_t *self = source->format;
+    int bytes;
vstate_t *state = (vstate_t *)self->_state;
-    source_t *source;

-    if (data) {
-        /* write the data to the buffer */
-        buf = ogg_sync_buffer(&state->oy, len);
-        memcpy(buf, data, len);
-        ogg_sync_wrote(&state->oy, len);
+    data = ogg_sync_buffer (&state->oy, 4096);
+
+    bytes = sock_read_bytes (source->con->sock, data, 4096);
+    if (bytes < 0)
+    {
+        if (sock_recoverable (sock_error()))
+            return NULL;
+        WARN0 ("source connection has died");
+        ogg_sync_wrote (&state->oy, 0);
+        source->running = 0;
+        return NULL;
}
+    if (bytes == 0)
+    {
+        INFO1 ("End of Stream %s", source->mount);
+        ogg_sync_wrote (&state->oy, 0);
+        source->running = 0;
+        return NULL;
+    }
+    ogg_sync_wrote (&state->oy, bytes);

refbuf = NULL;
if (ogg_sync_pageout(&state->oy, &state->og) == 1) {
@@ -137,21 +162,25 @@
memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len);

if (state->serialno != ogg_page_serialno(&state->og)) {
+            DEBUG0("new stream");
/* this is a new logical bitstream */
state->header = 0;
state->packets = 0;

-            /* release old headers, stream state, vorbis data */
-            for (i = 0; i < MAX_HEADER_PAGES; i++) {
-                if (state->headbuf[i]) {
-                    refbuf_release(state->headbuf[i]);
-                    state->headbuf[i] = NULL;
-                }
+            /* Clear old stuff. Rarely but occasionally needed. */
+            header = state->header_pages;
+            while (header)
+            {
+                DEBUG0 ("clearing out header page");
+                refbuf_t *to_release = header;
+                header = header->next;
+                refbuf_release (to_release);
}
-            /* Clear old stuff. Rarely but occasionally needed. */
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
+            state->header_pages = NULL;
+            state->header_pages_tail = NULL;

state->serialno = ogg_page_serialno(&state->og);
ogg_stream_init(&state->os, state->serialno);
@@ -164,52 +193,40 @@
* extras pages beyond the header. We need to collect the pages
* here anyway, but they may have to be discarded later.
*/
+            DEBUG1 ("header %d", state->header);
if (ogg_page_granulepos(&state->og) <= 0) {
state->header++;
} else {
/* we're done caching headers */
state->header = -1;

+                DEBUG0 ("doing stats");
/* put known comments in the stats */
tag = vorbis_comment_query(&state->vc, "TITLE", 0);
-                if (tag) stats_event(self->mount, "title", tag);
-                else stats_event(self->mount, "title", "unknown");
+                if (tag) stats_event(source->mount, "title", tag);
+                else stats_event(source->mount, "title", "unknown");
tag = vorbis_comment_query(&state->vc, "ARTIST", 0);
-                if (tag) stats_event(self->mount, "artist", tag);
-                else stats_event(self->mount, "artist", "unknown");
+                if (tag) stats_event(source->mount, "artist", tag);
+                else stats_event(source->mount, "artist", "unknown");

/* don't need these now */
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);

-                /* Drain the source queue on metadata update otherwise you
-                   could have a mismatch between what is on the source queue
-                   and what is in the state->headbuf */
-                avl_tree_rlock(global.source_tree);
-                source = source_find_mount_raw(self->mount);
-                avl_tree_unlock(global.source_tree);
-
-                thread_mutex_lock(&source->queue_mutex);
-                while ((source_refbuf = refbuf_queue_remove(&source->queue))) {
-                    refbuf_release(source_refbuf);
-                }
-                thread_mutex_unlock(&source->queue_mutex);
-
-                yp_touch (self->mount);
+                yp_touch (source->mount);
}
}

/* cache header pages */
if (state->header > 0 && state->packets < 3) {
-            if(state->header > MAX_HEADER_PAGES) {
-                refbuf_release(refbuf);
-                ERROR1("Bad vorbis input: header is more than %d pages long", MAX_HEADER_PAGES);
+            /* build a list of headers pages for attaching */
+            if (state->header_pages_tail)
+                state->header_pages_tail->next = refbuf;
+            state->header_pages_tail = refbuf;

-                return -1;
-            }
-            refbuf_addref(refbuf);
-            state->headbuf[state->header - 1] = refbuf;
+            if (state->header_pages == NULL)
+                state->header_pages = refbuf;

if (state->packets >= 0 && state->packets < 3) {
ogg_stream_pagein(&state->os, &state->og);
@@ -229,36 +246,40 @@
}
}
}
+            /* we do not place ogg headers on the main queue */
+            return NULL;
}
+        /* increase ref counts on each header page going out */
+        header = state->header_pages;
+        while (header)
+        {
+            refbuf_addref (header);
+            header = header->next;
+        }
+        refbuf->associated = state->header_pages;
}

-    *buffer = refbuf;
-    return 0;
+    return refbuf;
}

-refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self)
+static void free_ogg_client_data (client_t *client)
{
-    refbuf_queue_t *queue;
-    int i;
-    vstate_t *state = (vstate_t *)self->_state;
-
-    queue = NULL;
-    for (i = 0; i < MAX_HEADER_PAGES; i++) {
-        if (state->headbuf[i]) {
-            refbuf_addref(state->headbuf[i]);
-            refbuf_queue_add(&queue, state->headbuf[i]);
-        } else {
-            break;
-        }
-    }
-
-    return queue;
+    free (client->format_data);
+    client->format_data = NULL;
}

-static void *format_vorbis_create_client_data(format_plugin_t *self,
-        source_t *source, client_t *client)
+static int format_vorbis_create_client_data (source_t *source, client_t *client)
{
-    return NULL;
+    struct client_vorbis *client_data = calloc (1, sizeof (struct client_vorbis));
+    int ret = -1;
+
+    if (client_data)
+    {
+        client->format_data = client_data;
+        client->free_client_data = free_ogg_client_data;
+        ret = 0;
+    }
+    return ret;
}

static void format_vorbis_send_headers(format_plugin_t *self,
@@ -277,4 +298,123 @@
format_send_general_headers(self, source, client);
}

+static int send_ogg_headers (client_t *client, refbuf_t *headers)
+{
+    struct client_vorbis *client_data = client->format_data;
+    refbuf_t *refbuf;
+    int written = 0;

+    if (client_data->processing_headers == 0)
+    {
+        client_data->header_page = headers;
+        client_data->pos = 0;
+        client_data->processing_headers = 1;
+    }
+    refbuf = client_data->header_page;
+    while (refbuf)
+    {
+        char *data = refbuf->data + client_data->pos;
+        unsigned len = refbuf->len - client_data->pos;
+        int ret;
+
+        ret = client_send_bytes (client, data, len);
+        if (ret > 0)
+        {
+           written += ret;
+           client_data->pos += ret;
+        }
+        if (ret < (int)len)
+            return written;
+        if (client_data->pos == refbuf->len)
+        {
+            refbuf = refbuf->next;
+            client_data->header_page = refbuf;
+            client_data->pos = 0;
+        }
+    }
+    /* update client info on headers sent */
+    client_data->processing_headers = 0;
+    client_data->headers = headers;
+    return written;
+}
+
+static int write_buf_to_client (format_plugin_t *self, client_t *client)
+{
+    refbuf_t *refbuf = client->refbuf;
+    char *buf;
+    unsigned len;
+    struct client_vorbis *client_data = client->format_data;
+    int ret, written = 0;
+
+    if (refbuf->next == NULL && client->pos == refbuf->len)
+        return 0;
+
+    if (refbuf->next && client->pos == refbuf->len)
+    {
+        client->refbuf = refbuf->next;
+        client->pos = 0;
+    }
+    refbuf = client->refbuf;
+    do
+    {
+        if (client_data->headers != refbuf->associated)
+        {
+            /* different headers seen so send the new ones */
+            ret = send_ogg_headers (client, refbuf->associated);
+            if (client_data->processing_headers)
+                break;
+            written += ret;
+        }
+        buf = refbuf->data + client->pos;
+        len = refbuf->len - client->pos;
+        ret = client_send_bytes (client, buf, len);
+
+        if (ret > 0)
+            client->pos += ret;
+
+        if (ret < (int)len)
+            break;
+        written += ret;
+        /* we have now written the page(s) */
+        ret = 0;
+    } while (0);
+
+    if (ret > 0)
+       written += ret;
+    return written;
+}
+
+static int write_ogg_data (struct source_tag *source, refbuf_t *refbuf)
+{
+    int ret = 1;
+
+    if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len)
+    {
+        WARN0 ("Write to dump file failed, disabling");
+        fclose (source->dumpfile);
+        source->dumpfile = NULL;
+        ret = 0;
+    }
+    return ret;
+}
+
+
+static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf)
+{
+    vstate_t *state = (vstate_t *)source->format->_state;
+
+
+    if (state->file_headers != refbuf->associated)
+    {
+        refbuf_t *header = refbuf->associated;
+        while (header)
+        {
+            if (write_ogg_data (source, header) == 0)
+                return;
+            header = header->next;
+        }
+        state->file_headers = refbuf->associated;
+    }
+    write_ogg_data (source, refbuf);
+}
+

Modified: icecast/branches/icecast-singleq/src/format_vorbis.h
===================================================================
--- icecast/branches/icecast-singleq/src/format_vorbis.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_vorbis.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -18,6 +18,6 @@
#ifndef __FORMAT_VORBIS_H__
#define __FORMAT_VORBIS_H__

-format_plugin_t *format_vorbis_get_plugin(void);
+int format_vorbis_get_plugin(source_t *source);

#endif  /* __FORMAT_VORBIS_H__ */

Modified: icecast/branches/icecast-singleq/src/refbuf.c
===================================================================
--- icecast/branches/icecast-singleq/src/refbuf.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/refbuf.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -38,9 +38,22 @@
refbuf_t *refbuf;

refbuf = (refbuf_t *)malloc(sizeof(refbuf_t));
-    refbuf->data = (void *)malloc(size);
+    if (refbuf == NULL)
+        return NULL;
+    refbuf->data = NULL;
+    if (size)
+    {
+        refbuf->data = malloc (size);
+        if (refbuf->data == NULL)
+        {
+            free (refbuf);
+            return NULL;
+        }
+    }
refbuf->len = size;
refbuf->_count = 1;
+    refbuf->next = NULL;
+    refbuf->associated = NULL;

return refbuf;
}
@@ -54,7 +67,14 @@
{
self->_count--;
if (self->_count == 0) {
-        free(self->data);
+        while (self->associated)
+        {
+            refbuf_t *ref = self->associated;
+            self->associated = ref->next;
+            refbuf_release (ref);
+        }
+        if (self->len)
+            free(self->data);
free(self);
return;
}

Modified: icecast/branches/icecast-singleq/src/refbuf.h
===================================================================
--- icecast/branches/icecast-singleq/src/refbuf.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/refbuf.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -22,6 +22,8 @@
{
char *data;
long len;
+    struct _refbuf_tag *associated;
+    struct _refbuf_tag *next;

unsigned long _count;
} refbuf_t;
@@ -53,10 +55,3 @@

#endif  /* __REFBUF_H__ */

-
-
-
-
-
-
-

Modified: icecast/branches/icecast-singleq/src/source.c
===================================================================
--- icecast/branches/icecast-singleq/src/source.c	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/source.c	2004-07-19 23:36:00 UTC (rev 7178)
@@ -60,6 +60,7 @@
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
static void _parse_audio_info (source_t *source, const char *s);
+static void source_shutdown (source_t *source);

/* Allocate a new source with the stated mountpoint, if one already
* exists with that mountpoint in the global source tree then return
@@ -189,7 +190,6 @@

void source_clear_source (source_t *source)
{
-    refbuf_t *refbuf;
DEBUG1 ("clearing source \"%s\"", source->mount);
client_destroy(source->client);
source->client = NULL;
@@ -239,12 +239,17 @@

free(source->dumpfilename);
source->dumpfilename = NULL;
+
/* Lets clear out the source queue too */
-    while ((refbuf = refbuf_queue_remove(&source->queue)))
-        refbuf_release(refbuf);
-    source->queue = NULL;
+    while (source->stream_data)
+    {
+        refbuf_t *p = source->stream_data;
+        source->stream_data = p->next;
+        refbuf_release (p);
+    }
+    source->stream_data_tail = NULL;
+
source->burst_on_connect = 1;
-    thread_mutex_destroy(&source->queue_mutex);
}


@@ -360,7 +365,106 @@
thread_mutex_unlock (&move_clients_mutex);
}

+/* get some data from the source. The stream data is placed in a refbuf
+ * and sent back, however NULL is also valid as in the case of a short
+ * timeout and there's no data pending.
+ */
+static refbuf_t *get_next_buffer (source_t *source)
+{
+    refbuf_t *refbuf = NULL;
+    int delay = 250;

+    if (source->short_delay)
+        delay = 0;
+    while (global.running == ICE_RUNNING && source->running)
+    {
+        int fds;
+        time_t current = time (NULL);
+
+        fds = util_timed_wait_for_fd (source->con->sock, delay);
+
+        if (fds < 0)
+        {
+            if (! sock_recoverable (sock_error()))
+            {
+                WARN0 ("Error while waiting on socket, Disconnecting source");
+                source->running = 0;
+            }
+            break;
+        }
+        if (fds == 0)
+        {
+            if (source->last_read + (time_t)source->timeout < current)
+            {
+                DEBUG3 ("last %ld, timeout %ld, now %ld", source->last_read, source->timeout, current);
+                WARN0 ("Disconnecting source due to socket timeout");
+                source->running = 0;
+            }
+            break;
+        }
+        source->last_read = current;
+        refbuf = source->format->get_buffer (source);
+        if (refbuf)
+            break;
+    }
+
+    return refbuf;
+}
+
+
+/* general send routine per listener.  The deletion_expected tells us whether
+ * the last in the queue is about to disappear, so if this client is still
+ * referring to it after writing then drop the client as it's fallen too far
+ * behind
+ */
+static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
+{
+    int bytes;
+    int loop = 10;   /* max number of iterations in one go */
+    int total_written = 0;
+
+    /* new users need somewhere to start from */
+    if (client->refbuf == NULL)
+    {
+        /* make clients start at the most recent data on the queue */
+        client->refbuf = source->stream_data_tail;
+        if (client->refbuf == NULL)
+           return;
+    }
+
+    while (1)
+    {
+        /* jump out if client connection has died */
+        if (client->con->error)
+            break;
+
+        /* lets not send too much to one client in one go, but don't
+           sleep for too long if more data can be sent */
+        if (total_written > 150000 || loop == 0)
+        {
+            source->short_delay = 1;
+            break;
+        }
+
+        loop--;
+
+        bytes = source->format->write_buf_to_client (source->format, client);
+        if (bytes <= 0)
+            break;  /* can't write any more */
+
+        total_written += bytes;
+    }
+
+    /* the refbuf referenced at head (last in queue) may be marked for deletion
+     * if so, check to see if this client is still referring to it */
+    if (deletion_expected && client->refbuf == source->stream_data)
+    {
+        DEBUG0("Client has fallen too far behind, removing");
+        client->con->error = 1;
+    }
+}
+
+
static void source_init (source_t *source)
{
ice_config_t *config = config_get_config();
@@ -423,9 +527,8 @@

sock_set_blocking (source->con->sock, SOCK_NONBLOCK);

-    thread_mutex_create(&source->queue_mutex);
-
DEBUG0("Source creation complete");
+    source->last_read = time (NULL);
source->running = 1;

/*
@@ -454,212 +557,53 @@

void source_main (source_t *source)
{
-    char buffer[4096];
-    long bytes, sbytes;
-    int ret, i;
+    unsigned int listeners;
+    refbuf_t *refbuf;
client_t *client;
avl_node *client_node;

-    refbuf_t *refbuf, *abuf, *stale_refbuf;
-    int data_done;
-
source_init (source);

while (global.running == ICE_RUNNING && source->running) {
-        ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
-        if(ret < 0) {
-            WARN0("Bad data from source");
-            break;
-        }
-        if (source->burst_on_connect) {
-            thread_mutex_lock(&source->queue_mutex);
-            /* Add to the source buffer */
-            if (refbuf) {
-                refbuf_addref(refbuf);
-                refbuf_queue_add(&(source->queue), refbuf);
-                /* We derive the size of the source buffer queue based off the
-                   setting for queue_size_limit (client buffer queue size).
-                   This is because the source buffer queue size should be a
-                   percentage of the client buffer size (definately cannot
-                   be larger). Why 50% ? Because > 75% does not give the
-                   client enough leeway for lagging on initial connection
-                   and < 25% does not provide a good enough burst on connect. */
-                if (refbuf_queue_length(&(source->queue)) >
-                    source->queue_size_limit/2) {
-                    stale_refbuf = refbuf_queue_remove(&(source->queue));
-                    refbuf_release(stale_refbuf);
-                }
-            }
-            thread_mutex_unlock(&source->queue_mutex);
-        }
-        bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
-        while (refbuf == NULL) {
-            bytes = 0;
-            while (bytes <= 0) {
-                ret = util_timed_wait_for_fd(source->con->sock, source->timeout*1000);
+        int remove_from_q;

-                if (ret < 0 && sock_recoverable (sock_error()))
-                   continue;
-                if (ret <= 0) { /* timeout expired */
-                    WARN1("Disconnecting source: socket timeout (%d s) expired",
-                           source->timeout);
-                    bytes = 0;
-                    break;
-                }
+        refbuf = get_next_buffer (source);

-                bytes = sock_read_bytes(source->con->sock, buffer, 4096);
-                if (bytes == 0 ||
-                        (bytes < 0 && !sock_recoverable(sock_error())))
-                {
-                    DEBUG1("Disconnecting source due to socket read error: %s",
-                            strerror(sock_error()));
-                    break;
-                }
-            }
-            if (bytes <= 0) break;
-            source->client->con->sent_bytes += bytes;
-            ret = source->format->get_buffer(source->format, buffer, bytes,
-                    &refbuf);
-            if(ret < 0) {
-                WARN0("Bad data from source");
-                goto done;
-            }
-            if (source->burst_on_connect) {
-                /* Add to the source buffer */
-                thread_mutex_lock(&source->queue_mutex);
-                if (refbuf) {
-                    refbuf_addref(refbuf);
-                    refbuf_queue_add(&(source->queue), refbuf);
-                    if (refbuf_queue_length(&(source->queue)) >
-                        source->queue_size_limit/2) {
-                        stale_refbuf = refbuf_queue_remove(&(source->queue));
-                        refbuf_release(stale_refbuf);
-                    }
-                }
-                thread_mutex_unlock(&source->queue_mutex);
-            }
-        }
+        remove_from_q = 0;
+        source->short_delay = 0;

-        if (bytes <= 0) {
-            INFO0("Removing source following disconnection");
-            break;
-        }
+        if (refbuf)
+        {
+            /* append buffer to the in-flight data queue,  */
+            if (source->stream_data == NULL)
+                source->stream_data = refbuf;
+            if (source->stream_data_tail)
+                source->stream_data_tail->next = refbuf;
+            source->stream_data_tail = refbuf;
+            source->queue_size += refbuf->len;

-        /* we have a refbuf buffer, which a data block to be sent to
-        ** all clients.  if a client is not able to send the buffer
-        ** immediately, it should store it on its queue for the next
-        ** go around.
-        **
-        ** instead of sending the current block, a client should send
-        ** all data in the queue, plus the current block, until either
-        ** it runs out of data, or it hits a recoverable error like
-        ** EAGAIN.  this will allow a client that got slightly lagged
-        ** to catch back up if it can
-        */
-
-        /* First, stream dumping, if enabled */
-        if(source->dumpfile) {
-            if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
-                    refbuf->len)
-            {
-                WARN1("Write to dump file failed, disabling: %s",
-                        strerror(errno));
-                fclose(source->dumpfile);
-                source->dumpfile = NULL;
-            }
+            /* save stream to file */
+            if (source->dumpfile && source->format->write_buf_to_file)
+                source->format->write_buf_to_file (source, refbuf);
}
+        /* lets see if we have too much data in the queue, but don't remove it until later */
+        if (source->queue_size > source->queue_size_limit)
+            remove_from_q = 1;

-        /* acquire read lock on client_tree */
-        avl_tree_rlock(source->client_tree);
-
-        client_node = avl_get_first(source->client_tree);
-        while (client_node) {
-            /* acquire read lock on node */
-            avl_node_wlock(client_node);
-
-            client = (client_t *)client_node->key;
-
-            data_done = 0;
-
-            /* do we have any old buffers? */
-            abuf = refbuf_queue_remove(&client->queue);
-            while (abuf) {
-                bytes = abuf->len - client->pos;
-
-                sbytes = source->format->write_buf_to_client(source->format,
-                        client, &abuf->data[client->pos], bytes);
-                if (sbytes < bytes) {
-                    if (client->con->error) {
-                        refbuf_release (abuf);
-                    }
-                    else {
-                        /* We didn't send the entire buffer. Leave it for
-                         * the moment, handle it in the next iteration.
-                         */
-                        client->pos += sbytes<0?0:sbytes;
-                        refbuf_queue_insert (&client->queue, abuf);
-                    }
-                    data_done = 1;
-                    break;
-                }
-                /* we're done with that refbuf, release it and reset the pos */
-                refbuf_release(abuf);
-                client->pos = 0;
-
-                abuf = refbuf_queue_remove(&client->queue);
-            }
-
-            /* now send or queue the new data */
-            if (data_done) {
-                refbuf_addref(refbuf);
-                refbuf_queue_add(&client->queue, refbuf);
-            } else {
-                sbytes = source->format->write_buf_to_client(source->format,
-                        client, refbuf->data, refbuf->len);
-                if (client->con->error == 0 && sbytes < refbuf->len) {
-                    /* Didn't send the entire buffer, queue it */
-                    client->pos = sbytes<0?0:sbytes;
-                    refbuf_addref(refbuf);
-                    refbuf_queue_insert(&client->queue, refbuf);
-                }
-            }
-
-            /* if the client is too slow, its queue will slowly build up.
-            ** we need to make sure the client is keeping up with the
-            ** data, so we'll kick any client who's queue gets to large.
-            */
-            if (refbuf_queue_length(&client->queue) > source->queue_size_limit) {
-                DEBUG0("Client has fallen too far behind, removing");
-                client->con->error = 1;
-            }
-
-            /* release read lock on node */
-            avl_node_unlock(client_node);
-
-            /* get the next node */
-            client_node = avl_get_next(client_node);
-        }
-        /* release read lock on client_tree */
-        avl_tree_unlock(source->client_tree);
-
-        /* Only release the refbuf if we didn't add it to the source queue */
-        if (!source->burst_on_connect) {
-            refbuf_release(refbuf);
-        }
-
/* acquire write lock on client_tree */
avl_tree_wlock(source->client_tree);

-        /** delete bad clients **/
+        listeners = source->listeners;
client_node = avl_get_first(source->client_tree);
while (client_node) {
client = (client_t *)client_node->key;
+
+            send_to_listener (source, client, remove_from_q);
+
if (client->con->error) {
client_node = avl_get_next(client_node);
avl_delete(source->client_tree, (void *)client, _free_client);
source->listeners--;
-                stats_event_args(source->mount, "listeners", "%d",
-                        source->listeners);
DEBUG0("Client removed");
continue;
}
@@ -696,34 +640,7 @@
DEBUG0("Client added");
stats_event_inc(NULL, "clients");
stats_event_inc(source->mount, "connections");
-            stats_event_args(source->mount, "listeners", "%d",
-                    source->listeners);

-            /* we have to send cached headers for some data formats
-            ** this is where we queue up the buffers to send
-            */
-            client = (client_t *)client_node->key;
-            if (source->format->has_predata) {
-                client->queue = source->format->get_predata(source->format);
-            }
-            if (source->burst_on_connect) {
-                /* here is where we fill up the new client with refbufs from
-                   the source buffer.  this will allow an initial burst of
-                   audio data to be sent to the client, and allow for a faster
-                   startup time (from listener perspective) for the stream */
-                if (!client->burst_sent) {
-                    thread_mutex_lock(&source->queue_mutex);
-                    for (i=0;i<refbuf_queue_size(&(source->queue));i++) {
-                        refbuf_queue_add(&(client->queue),
-                            refbuf_queue_get(&(source->queue), i));
-                    }
-                    thread_mutex_unlock(&source->queue_mutex);
-                    client->burst_sent = 1;
-                    DEBUG1("Added %d buffers to initial client queue",
-                            refbuf_queue_length(&(source->queue)));
-                }
-            }
-
client_node = avl_get_next(client_node);
}

@@ -737,12 +654,35 @@
/* release write lock on pending_tree */
avl_tree_unlock(source->pending_tree);

+        /* update the stats if need be */
+        if (source->listeners != listeners)
+        {
+            INFO2("listener count on %s now %d", source->mount, source->listeners);
+            stats_event_args (source->mount, "listeners", "%d", source->listeners);
+        }
+
+        if (remove_from_q)
+        {
+            refbuf_t *to_go = source->stream_data;
+            if (to_go->next)
+            {
+                source->stream_data = to_go->next;
+                source->queue_size -= to_go->len;
+                refbuf_release (to_go);
+            }
+            else
+                WARN0("possible queue length error");
+        }
+
/* release write lock on client_tree */
avl_tree_unlock(source->client_tree);
}
+    source_shutdown (source);
+}

-done:

+static void source_shutdown (source_t *source)
+{
source->running = 0;
INFO1("Source \"%s\" exiting", source->mount);

@@ -776,8 +716,6 @@

/* release our hold on the lock so the main thread can continue cleaning up */
thread_rwlock_unlock(source->shutdown_rwlock);
-
-    return;
}

static int _compare_clients(void *compare_arg, void *a, void *b)
@@ -816,7 +754,7 @@
static void _parse_audio_info (source_t *source, const char *s)
{
const char *start = s;
-    unsigned len;
+    unsigned int len;

while (start != NULL && *start != '\0')
{

Modified: icecast/branches/icecast-singleq/src/source.h
===================================================================
--- icecast/branches/icecast-singleq/src/source.h	2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/source.h	2004-07-19 23:36:00 UTC (rev 7178)
@@ -55,10 +55,17 @@
struct auth_tag *authenticator;
int fallback_override;
int no_mount;
+
+    unsigned queue_size;
unsigned queue_size_limit;
+
unsigned timeout;  /* source timeout in seconds */
-    refbuf_queue_t *queue;
-    mutex_t queue_mutex;
+    time_t last_read;
+    int short_delay;
+
+    refbuf_t *stream_data;
+    refbuf_t *stream_data_tail;
+
int burst_on_connect;
} source_t;




More information about the commits mailing list