[xiph-commits] r7178 - icecast/branches/icecast-singleq/src
karl at dactyl.lonelymoon.com
karl
Mon Jul 19 16:36:02 PDT 2004
Author: karl
Date: Mon Jul 19 16:36:02 2004
New Revision: 7178
Modified:
icecast/branches/icecast-singleq/src/admin.c
icecast/branches/icecast-singleq/src/client.c
icecast/branches/icecast-singleq/src/client.h
icecast/branches/icecast-singleq/src/connection.c
icecast/branches/icecast-singleq/src/format.c
icecast/branches/icecast-singleq/src/format.h
icecast/branches/icecast-singleq/src/format_mp3.c
icecast/branches/icecast-singleq/src/format_mp3.h
icecast/branches/icecast-singleq/src/format_vorbis.c
icecast/branches/icecast-singleq/src/format_vorbis.h
icecast/branches/icecast-singleq/src/refbuf.c
icecast/branches/icecast-singleq/src/refbuf.h
icecast/branches/icecast-singleq/src/source.c
icecast/branches/icecast-singleq/src/source.h
Log:
add single queue handling, and change format handlers to handle it
Modified: icecast/branches/icecast-singleq/src/admin.c
===================================================================
--- icecast/branches/icecast-singleq/src/admin.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/admin.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -799,11 +799,7 @@
state = source->format->_state;
- thread_mutex_lock(&(state->lock));
- free(state->metadata);
- state->metadata = strdup(value);
- state->metadata_age++;
- thread_mutex_unlock(&(state->lock));
+ mp3_set_tag (source->format, "title", value);
DEBUG2("Metadata on mountpoint %s changed to \"%s\"",
source->mount, value);
Modified: icecast/branches/icecast-singleq/src/client.c
===================================================================
--- icecast/branches/icecast-singleq/src/client.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/client.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -42,7 +42,7 @@
client->con = con;
client->parser = parser;
- client->queue = NULL;
+ client->refbuf = NULL;
client->pos = 0;
client->burst_sent = 0;
@@ -51,8 +51,6 @@
void client_destroy(client_t *client)
{
- refbuf_t *refbuf;
-
if (client == NULL)
return;
/* write log entry if ip is set (some things don't set it, like outgoing
@@ -64,9 +62,6 @@
connection_close(client->con);
httpp_destroy(client->parser);
- while ((refbuf = refbuf_queue_remove(&client->queue)))
- refbuf_release(refbuf);
-
/* we need to free client specific format data (if any) */
if (client->free_client_data)
client->free_client_data (client);
Modified: icecast/branches/icecast-singleq/src/client.h
===================================================================
--- icecast/branches/icecast-singleq/src/client.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/client.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -32,8 +32,9 @@
/* http response code for this client */
int respcode;
- /* buffer queue */
- refbuf_queue_t *queue;
+ /* where in the queue the client is */
+ refbuf_t *refbuf;
+
/* position in first buffer */
unsigned long pos;
Modified: icecast/branches/icecast-singleq/src/connection.c
===================================================================
--- icecast/branches/icecast-singleq/src/connection.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/connection.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -465,9 +465,8 @@
"for icecast 1.x relays. Assuming content is mp3.");
format_type = FORMAT_TYPE_MP3;
}
- source->format = format_get_plugin (format_type, source->mount, source->parser);
- if (source->format == NULL)
+ if (format_get_plugin (format_type, source) < 0)
{
global_unlock();
config_release_config();
@@ -960,8 +959,7 @@
global.clients++;
global_unlock();
- client->format_data = source->format->create_client_data(
- source->format, source, client);
+ source->format->create_client_data (source, client);
source->format->client_send_headers(source->format, source, client);
Modified: icecast/branches/icecast-singleq/src/format.c
===================================================================
--- icecast/branches/icecast-singleq/src/format.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -75,34 +75,24 @@
}
}
-format_plugin_t *format_get_plugin(format_type_t type, char *mount,
- http_parser_t *parser)
+int format_get_plugin(format_type_t type, source_t *source)
{
- format_plugin_t *plugin;
+ int ret = -1;
switch (type) {
case FORMAT_TYPE_VORBIS:
- plugin = format_vorbis_get_plugin();
- if (plugin) plugin->mount = mount;
+ ret = format_vorbis_get_plugin (source);
break;
case FORMAT_TYPE_MP3:
- plugin = format_mp3_get_plugin(parser);
- if (plugin) plugin->mount = mount;
+ ret = format_mp3_get_plugin (source);
break;
default:
- plugin = NULL;
break;
}
- return plugin;
+ return ret;
}
-int format_generic_write_buf_to_client(format_plugin_t *format,
- client_t *client, unsigned char *buf, int len)
-{
- return client_send_bytes (client, buf, len);
-}
-
void format_send_general_headers(format_plugin_t *format,
source_t *source, client_t *client)
{
Modified: icecast/branches/icecast-singleq/src/format.h
===================================================================
--- icecast/branches/icecast-singleq/src/format.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -45,13 +45,10 @@
*/
int has_predata;
- int (*get_buffer)(struct _format_plugin_tag *self, char *data, unsigned long
- len, refbuf_t **buffer);
- refbuf_queue_t *(*get_predata)(struct _format_plugin_tag *self);
- int (*write_buf_to_client)(struct _format_plugin_tag *format,
- client_t *client, unsigned char *buf, int len);
- void *(*create_client_data)(struct _format_plugin_tag *format,
- struct source_tag *source, client_t *client);
+ refbuf_t *(*get_buffer)(struct source_tag *);
+ int (*write_buf_to_client)(struct _format_plugin_tag *format, client_t *client);
+ void (*write_buf_to_file)(struct source_tag *source, refbuf_t *refbuf);
+ int (*create_client_data)(struct source_tag *source, client_t *client);
void (*client_send_headers)(struct _format_plugin_tag *format,
struct source_tag *source, client_t *client);
void (*free_plugin)(struct _format_plugin_tag *self);
@@ -62,11 +59,8 @@
format_type_t format_get_type(char *contenttype);
char *format_get_mimetype(format_type_t type);
-format_plugin_t *format_get_plugin(format_type_t type, char *mount,
- http_parser_t *parser);
+int format_get_plugin(format_type_t type, struct source_tag *source);
-int format_generic_write_buf_to_client(format_plugin_t *format,
- client_t *client, unsigned char *buf, int len);
void format_send_general_headers(format_plugin_t *format,
struct source_tag *source, client_t *client);
Modified: icecast/branches/icecast-singleq/src/format_mp3.c
===================================================================
--- icecast/branches/icecast-singleq/src/format_mp3.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_mp3.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -1,4 +1,3 @@
-/* -*- c-basic-offset: 4; -*- */
/* Icecast
*
* This program is distributed under the GNU General Public License, version 2.
@@ -11,6 +10,7 @@
* and others (see AUTHORS for details).
*/
+/* -*- c-basic-offset: 4; -*- */
/* format_mp3.c
**
** format plugin for mp3
@@ -54,38 +54,38 @@
#define ICY_METADATA_INTERVAL 16000
static void format_mp3_free_plugin(format_plugin_t *self);
-static int format_mp3_get_buffer(format_plugin_t *self, char *data,
- unsigned long len, refbuf_t **buffer);
-static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self);
-static void *format_mp3_create_client_data(format_plugin_t *self,
- source_t *source, client_t *client);
+static refbuf_t *mp3_get_filter_meta (source_t *source);
+static refbuf_t *mp3_get_no_meta (source_t *source);
+
+static int format_mp3_create_client_data (source_t *source, client_t *client);
static void free_mp3_client_data (client_t *client);
-static int format_mp3_write_buf_to_client(format_plugin_t *self,
- client_t *client, unsigned char *buf, int len);
+static int format_mp3_write_buf_to_client(format_plugin_t *self, client_t *client);
static void format_mp3_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
+static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf);
+
typedef struct {
int use_metadata;
- int interval;
- int offset;
- int metadata_age;
int metadata_offset;
+ unsigned since_meta_block;
+ int in_metadata;
+ refbuf_t *associated;
} mp3_client_data;
-format_plugin_t *format_mp3_get_plugin(http_parser_t *parser)
+int format_mp3_get_plugin (source_t *source)
{
char *metadata;
format_plugin_t *plugin;
mp3_state *state = calloc(1, sizeof(mp3_state));
+ refbuf_t *meta;
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));
plugin->type = FORMAT_TYPE_MP3;
- plugin->has_predata = 0;
- plugin->get_buffer = format_mp3_get_buffer;
- plugin->get_predata = format_mp3_get_predata;
+ plugin->get_buffer = mp3_get_no_meta;
plugin->write_buf_to_client = format_mp3_write_buf_to_client;
+ plugin->write_buf_to_file = write_mp3_to_file;
plugin->create_client_data = format_mp3_create_client_data;
plugin->client_send_headers = format_mp3_send_headers;
plugin->free_plugin = format_mp3_free_plugin;
@@ -93,309 +93,477 @@
plugin->_state = state;
- state->metadata_age = 0;
- state->metadata = strdup("");
- thread_mutex_create(&(state->lock));
+ meta = refbuf_new (1);
+ memcpy (meta->data, "", 1);
+ state->metadata = meta;
+ state->interval = ICY_METADATA_INTERVAL;
- metadata = httpp_getvar(parser, "icy-metaint");
- if(metadata)
- state->inline_metadata_interval = atoi(metadata);
+ metadata = httpp_getvar (source->parser, "icy-metaint");
+ if (metadata)
+ {
+ state->inline_metadata_interval = atoi (metadata);
+ state->offset = 0;
+ plugin->get_buffer = mp3_get_filter_meta;
+ }
+ source->format = plugin;
+ thread_mutex_create (&state->url_lock);
- return plugin;
+ return 0;
}
-static int send_metadata(client_t *client, mp3_client_data *client_state,
- mp3_state *source_state)
+void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value)
{
- int len_byte;
- int len;
- int ret = -1;
- unsigned char *buf;
- int source_age;
- char *fullmetadata = NULL;
- int fullmetadata_size = 0;
- const char meta_fmt[] = "StreamTitle='';";
+ mp3_state *source_mp3 = plugin->_state;
+ unsigned len;
+ const char meta[] = "StreamTitle='";
+ int size = sizeof (meta) + 1;
- do
+ if (tag==NULL || value == NULL)
+ return;
+
+ len = strlen (value)+1;
+ size += len;
+ /* protect against multiple updaters */
+ thread_mutex_lock (&source_mp3->url_lock);
+ if (strcmp (tag, "title") == 0 || strcmp (tag, "song") == 0)
{
- thread_mutex_lock (&(source_state->lock));
- if (source_state->metadata == NULL)
- break; /* Shouldn't be possible */
-
- fullmetadata_size = strlen (source_state->metadata) + sizeof (meta_fmt);
-
- /* Noted without comment: When the metadata string is a multiple of
- * 16 bytes, the "reference" shoutcast server increases the length
- * byte by one and appends 16 nulls, rather than omitting the
- * pointless trailing null. */
- if (fullmetadata_size > 4079)
+ char *p = strdup (value);
+ if (p)
{
- fullmetadata_size = 4079;
+ free (source_mp3->url_title);
+ free (source_mp3->url_artist);
+ source_mp3->url_artist = NULL;
+ source_mp3->url_title = p;
+ source_mp3->update_metadata = 1;
}
- fullmetadata = malloc (fullmetadata_size);
- if (fullmetadata == NULL)
- break;
+ }
+ else if (strcmp (tag, "artist") == 0)
+ {
+ char *p = strdup (value);
+ if (p)
+ {
+ free (source_mp3->url_artist);
+ source_mp3->url_artist = p;
+ source_mp3->update_metadata = 1;
+ }
+ }
+ thread_mutex_unlock (&source_mp3->url_lock);
+}
- fullmetadata_size = snprintf (fullmetadata, fullmetadata_size,
- "StreamTitle='%.*s';", fullmetadata_size-(sizeof (meta_fmt)-1), source_state->metadata);
- source_age = source_state->metadata_age;
+static void filter_shoutcast_metadata (source_t *source, char *metadata, unsigned meta_len)
+{
+ if (metadata)
+ {
+ char *end, *p;
+ int len;
- if (fullmetadata_size > 0 && source_age != client_state->metadata_age)
+ do
{
- len_byte = (fullmetadata_size)/16 + 1; /* to give 1-255 */
- client_state->metadata_offset = 0;
- }
- else
- len_byte = 0;
- len = 1 + len_byte*16;
- buf = calloc (1, len);
- if (buf == NULL)
- break;
+ metadata++;
+ if (strncmp (metadata, "StreamTitle='", 13))
+ break;
+ if ((end = strstr (metadata, "\';")) == NULL)
+ break;
+ len = (end - metadata) - 13;
+ p = calloc (1, len+1);
+ if (p)
+ {
+ memcpy (p, metadata+13, len);
+ stats_event (source->mount, "title", p);
+ yp_touch (source->mount);
+ free (p);
+ }
+ } while (0);
+ }
+}
- buf[0] = len_byte;
- if (len > 1) {
- strncpy (buf+1, fullmetadata, len-1);
- buf[len-1] = '\0';
- }
+/* called from the source thread when the metadata has been updated.
+ * The artist title are checked and made ready for clients to send
+ */
+void mp3_set_title (source_t *source)
+{
+ const char meta[] = "StreamTitle='";
+ int size;
+ unsigned char len_byte;
+ refbuf_t *p;
+ unsigned len = sizeof(meta) + 2; /* the StreamTitle, quotes, ; and null */
+ mp3_state *source_mp3 = source->format->_state;
- thread_mutex_unlock (&(source_state->lock));
+ /* make sure the url data does not disappear from under us */
+ thread_mutex_lock (&source_mp3->url_lock);
- /* only write what hasn't been written already */
- ret = client_send_bytes (client, buf + client_state->metadata_offset,
- len - client_state->metadata_offset);
+ /* work out message length */
+ if (source_mp3->url_artist)
+ len += strlen (source_mp3->url_artist);
+ if (source_mp3->url_title)
+ len += strlen (source_mp3->url_title);
+ if (source_mp3->url_artist && source_mp3->url_title)
+ len += 3;
+#define MAX_META_LEN 255*16
+ if (len > MAX_META_LEN)
+ {
+ thread_mutex_unlock (&source_mp3->url_lock);
+ WARN1 ("Metadata too long at %d chars", len);
+ return;
+ }
+ /* work out the metadata len byte */
+ len_byte = (len-1) / 16 + 1;
- if (ret > 0 && ret < len) {
- client_state->metadata_offset += ret;
- }
- else if (ret == len) {
- client_state->metadata_age = source_age;
- client_state->offset = 0;
- client_state->metadata_offset = 0;
- }
- free (buf);
- free (fullmetadata);
- return ret;
+ /* now we know how much space to allocate, +1 for the len byte */
+ size = len_byte * 16 + 1;
- } while (0);
+ p = refbuf_new (size);
+ if (p)
+ {
+ mp3_state *source_mp3 = source->format->_state;
- thread_mutex_unlock(&(source_state->lock));
- free (fullmetadata);
- return -1;
+ memset (p->data, '\0', size);
+ if (source_mp3->url_artist && source_mp3->url_title)
+ snprintf (p->data, size, "%c%s%s - %s';", len_byte, meta,
+ source_mp3->url_artist, source_mp3->url_title);
+ else
+ snprintf (p->data, size, "%c%s%s';", len_byte, meta,
+ source_mp3->url_title);
+ filter_shoutcast_metadata (source, p->data, size);
+
+ refbuf_release (source_mp3->metadata);
+ source_mp3->metadata = p;
+ }
+ thread_mutex_unlock (&source_mp3->url_lock);
}
-static int format_mp3_write_buf_to_client(format_plugin_t *self,
- client_t *client, unsigned char *buf, int len)
+
+/* send the appropriate metadata, and return the number of bytes written
+ * which is 0 or greater. Check the client in_metadata value afterwards
+ * to see if all metadata has been sent
+ */
+static int send_mp3_metadata (client_t *client, refbuf_t *associated)
{
int ret = 0;
- mp3_client_data *mp3data = client->format_data;
-
- if(((mp3_state *)self->_state)->metadata && mp3data->use_metadata)
+ unsigned char *metadata;
+ int meta_len;
+ mp3_client_data *client_mp3 = client->format_data;
+
+ /* If there is a change in metadata then send it else
+ * send a single zero value byte in its place
+ */
+ if (associated == client_mp3->associated)
{
- mp3_client_data *state = client->format_data;
- int max = state->interval - state->offset;
+ metadata = "\0";
+ meta_len = 1;
+ }
+ else
+ {
+ metadata = associated->data + client_mp3->metadata_offset;
+ meta_len = associated->len - client_mp3->metadata_offset;
+ }
+ ret = client_send_bytes (client, metadata, meta_len);
- if(len == 0) /* Shouldn't happen */
- return 0;
+ if (ret == meta_len)
+ {
+ client_mp3->associated = associated;
+ client_mp3->metadata_offset = 0;
+ client_mp3->in_metadata = 0;
+ client_mp3->since_meta_block = 0;
+ return ret;
+ }
+ if (ret > 0)
+ client_mp3->metadata_offset += ret;
+ else
+ ret = 0;
+ client_mp3->in_metadata = 1;
- if(max > len)
- max = len;
+ return ret;
+}
- if(max > 0) {
- ret = client_send_bytes (client, buf, max);
- if(ret > 0)
- state->offset += ret;
+
+/* Handler for writing mp3 data to a client, taking into account whether
+ * client has requested shoutcast style metadata updates
+ */
+static int format_mp3_write_buf_to_client (format_plugin_t *self, client_t *client)
+{
+ int ret, written = 0;
+ mp3_client_data *client_mp3 = client->format_data;
+ mp3_state *source_mp3 = self->_state;
+ refbuf_t *refbuf = client->refbuf;
+ char *buf;
+ unsigned len;
+
+ if (refbuf->next == NULL && client->pos == refbuf->len)
+ return 0;
+ buf = refbuf->data + client->pos;
+ len = refbuf->len - client->pos;
+
+ do
+ {
+ /* send any unwritten metadata to the client */
+ if (client_mp3->in_metadata)
+ {
+ refbuf_t *associated = refbuf->associated;
+ ret = send_mp3_metadata (client, associated);
+
+ if (client_mp3->in_metadata)
+ break;
+ written += ret;
}
- else {
- send_metadata (client, state, self->_state);
+ /* see if we need to send the current metadata to the client */
+ if (client_mp3->use_metadata)
+ {
+ unsigned remaining = source_mp3->interval -
+ client_mp3->since_meta_block;
+
+ /* sending the metadata block */
+ if (remaining <= len)
+ {
+ /* send any mp3 before the metadata block */
+ if (remaining)
+ {
+ ret = client_send_bytes (client, buf, remaining);
+
+ if (ret > 0)
+ {
+ client_mp3->since_meta_block += ret;
+ client->pos += ret;
+ }
+ if (ret < (int)remaining)
+ break;
+ written += ret;
+ }
+ ret = send_mp3_metadata (client, refbuf->associated);
+ if (client_mp3->in_metadata)
+ break;
+ written += ret;
+ /* change buf and len */
+ buf += remaining;
+ len -= remaining;
+ }
}
+ /* write any mp3, maybe after the metadata block */
+ if (len)
+ {
+ ret = client_send_bytes (client, buf, len);
- }
- else {
- ret = client_send_bytes (client, buf, len);
- }
+ if (ret > 0)
+ {
+ client_mp3->since_meta_block += ret;
+ client->pos += ret;
+ }
+ if (ret < (int)len)
+ break;
+ written += ret;
+ }
+ ret = 0;
+ /* we have now written what we needed to so move to the next buffer */
+ if (refbuf->next)
+ {
+ client->refbuf = refbuf->next;
+ client->pos = 0;
+ }
+ } while (0);
- return ret;
+ if (ret > 0)
+ written += ret;
+ return written;
}
static void format_mp3_free_plugin(format_plugin_t *self)
{
/* free the plugin instance */
mp3_state *state = self->_state;
- thread_mutex_destroy(&(state->lock));
- free(state->metadata);
+ thread_mutex_destroy (&state->url_lock);
+ free (state->url_artist);
+ free (state->url_title);
+ refbuf_release (state->metadata);
free(state);
free(self);
}
-static int format_mp3_get_buffer(format_plugin_t *self, char *data,
- unsigned long len, refbuf_t **buffer)
+
+/* read an mp3 stream which does not have shoutcast style metadata */
+static refbuf_t *mp3_get_no_meta (source_t *source)
{
+ int bytes;
refbuf_t *refbuf;
- mp3_state *state = self->_state;
+ mp3_state *source_mp3 = source->format->_state;
- /* Set this to NULL in case it doesn't get set to a valid buffer later */
- *buffer = NULL;
+ if ((refbuf = refbuf_new (2048)) == NULL)
+ return NULL;
+ bytes = sock_read_bytes (source->con->sock, refbuf->data, 2048);
- if(!data)
- return 0;
+ if (bytes == 0)
+ {
+ INFO1 ("End of stream %s", source->mount);
+ source->running = 0;
+ refbuf_release (refbuf);
+ return NULL;
+ }
+ if (source_mp3->update_metadata)
+ {
+ mp3_set_title (source);
+ source_mp3->update_metadata = 0;
+ }
+ if (bytes > 0)
+ {
+ refbuf->len = bytes;
+ refbuf->associated = source_mp3->metadata;
+ refbuf_addref (source_mp3->metadata);
+ return refbuf;
+ }
+ refbuf_release (refbuf);
- if(state->inline_metadata_interval) {
- /* Source is sending metadata, handle it... */
+ if (!sock_recoverable (sock_error()))
+ source->running = 0;
- while(len > 0) {
- int to_read = state->inline_metadata_interval - state->offset;
- if(to_read > 0) {
- refbuf_t *old_refbuf = *buffer;
+ return NULL;
+}
- if(to_read > len)
- to_read = len;
- if(old_refbuf) {
- refbuf = refbuf_new(to_read + old_refbuf->len);
- memcpy(refbuf->data, old_refbuf->data, old_refbuf->len);
- memcpy(refbuf->data+old_refbuf->len, data, to_read);
+/* read mp3 data with inlined metadata from the source. Filter out the
+ * metadata so that the mp3 data itself is store on the queue and the
+ * metadata is is associated with it
+ */
+static refbuf_t *mp3_get_filter_meta (source_t *source)
+{
+ refbuf_t *refbuf;
+ format_plugin_t *plugin = source->format;
+ mp3_state *source_mp3 = plugin->_state;
+ unsigned char *src;
+ unsigned bytes, mp3_block;
+ int ret;
- refbuf_release(old_refbuf);
- }
- else {
- refbuf = refbuf_new(to_read);
- memcpy(refbuf->data, data, to_read);
- }
+ refbuf = refbuf_new (2048);
+ src = refbuf->data;
- *buffer = refbuf;
+ ret = sock_read_bytes (source->con->sock, refbuf->data, 2048);
- state->offset += to_read;
- data += to_read;
- len -= to_read;
- }
- else if(!state->metadata_length) {
- /* Next up is the metadata byte... */
- unsigned char byte = data[0];
- data++;
- len--;
+ if (ret == 0)
+ {
+ INFO1 ("End of stream %s", source->mount);
+ source->running = 0;
+ refbuf_release (refbuf);
+ return NULL;
+ }
+ if (source_mp3->update_metadata)
+ {
+ mp3_set_title (source);
+ source_mp3->update_metadata = 0;
+ }
+ if (ret < 0)
+ {
+ refbuf_release (refbuf);
+ if (sock_recoverable (sock_error()))
+ return NULL; /* go back to waiting */
+ INFO0 ("Error on connection from source");
+ source->running = 0;
+ return NULL;
+ }
+ /* fill the buffer with the read data */
+ bytes = (unsigned int)ret;
+ refbuf->len = 0;
+ while (bytes > 0)
+ {
+ unsigned int metadata_remaining;
- /* According to the "spec"... this byte * 16 */
- state->metadata_length = byte * 16;
+ mp3_block = source_mp3->inline_metadata_interval - source_mp3->offset;
- if(state->metadata_length) {
- state->metadata_buffer =
- calloc(state->metadata_length + 1, 1);
+ /* is there only enough to account for mp3 data */
+ if (bytes <= mp3_block)
+ {
+ refbuf->len += bytes;
+ source_mp3->offset += bytes;
+ break;
+ }
+ /* we have enough data to get to the metadata
+ * block, but only transfer upto it */
+ if (mp3_block)
+ {
+ src += mp3_block;
+ bytes -= mp3_block;
+ refbuf->len += mp3_block;
+ source_mp3->offset += mp3_block;
+ continue;
+ }
- /* Ensure we have a null-terminator even if the source
- * stream is invalid.
- */
- state->metadata_buffer[state->metadata_length] = 0;
- }
- else {
- state->offset = 0;
- }
+ /* process the inline metadata, len == 0 indicates not seen any yet */
+ if (source_mp3->build_metadata_len == 0)
+ {
+ memset (source_mp3->build_metadata, 0,
+ sizeof (source_mp3->build_metadata));
+ source_mp3->build_metadata_offset = 0;
+ source_mp3->build_metadata_len = 1 + (*src * 16);
+ }
- state->metadata_offset = 0;
- }
- else {
- /* Metadata to read! */
- int readable = state->metadata_length - state->metadata_offset;
+ /* do we have all of the metatdata block */
+ metadata_remaining = source_mp3->build_metadata_len -
+ source_mp3->build_metadata_offset;
+ if (bytes < metadata_remaining)
+ {
+ memcpy (source_mp3->build_metadata +
+ source_mp3->build_metadata_offset, src, bytes);
+ source_mp3->build_metadata_offset += bytes;
+ break;
+ }
+ /* copy all bytes except the last one, that way we
+ * know a null byte terminates the message */
+ memcpy (source_mp3->build_metadata + source_mp3->build_metadata_offset,
+ src, metadata_remaining-1);
- if(readable > len)
- readable = len;
+ /* overwrite metadata in the buffer */
+ bytes -= metadata_remaining;
+ memmove (src, src+metadata_remaining, bytes);
- memcpy(state->metadata_buffer + state->metadata_offset,
- data, readable);
+ /* assign metadata if it's not 1 byte, as that indicates a change */
+ if (source_mp3->build_metadata_len > 1)
+ {
+ refbuf_t *meta = refbuf_new (source_mp3->build_metadata_len);
+ memcpy (meta->data, source_mp3->build_metadata,
+ source_mp3->build_metadata_len);
- state->metadata_offset += readable;
-
- data += readable;
- len -= readable;
-
- if(state->metadata_offset == state->metadata_length)
- {
- if(state->metadata_length)
- {
- thread_mutex_lock(&(state->lock));
- free(state->metadata);
- /* Now, reformat state->metadata_buffer to strip off
- StreamTitle=' and the closing '; (but only if there's
- enough data for it to be correctly formatted) */
- if(state->metadata_length >= 15) {
- /* This is overly complex because the
- metadata_length is the length of the actual raw
- data, but the (null-terminated) string is going
- to be shorter than this, and we can't trust that
- the raw data doesn't have other embedded-nulls */
- int stringlength;
-
- state->metadata = malloc(state->metadata_length -
- 12);
- memcpy(state->metadata,
- state->metadata_buffer + 13,
- state->metadata_length - 13);
- /* Make sure we've got a null-terminator of some
- sort */
- state->metadata[state->metadata_length - 13] = 0;
-
- /* Now figure out the _right_ one */
- stringlength = strlen(state->metadata);
- if(stringlength > 2)
- state->metadata[stringlength - 2] = 0;
- free(state->metadata_buffer);
- }
- else
- state->metadata = state->metadata_buffer;
-
- stats_event(self->mount, "title", state->metadata);
- state->metadata_buffer = NULL;
- state->metadata_age++;
- thread_mutex_unlock(&(state->lock));
- yp_touch (self->mount);
- }
-
- state->offset = 0;
- state->metadata_length = 0;
- }
+ DEBUG1("shoutcast metadata %.4080s", meta->data+1);
+ if (strncmp (meta->data+1, "StreamTitle=", 12) == 0)
+ {
+ filter_shoutcast_metadata (source, source_mp3->build_metadata,
+ source_mp3->build_metadata_len);
+ refbuf_release (source_mp3->metadata);
+ source_mp3->metadata = meta;
}
+ else
+ {
+ ERROR0 ("Incorrect metadata format, ending stream");
+ source->running = 0;
+ refbuf_release (refbuf);
+ return NULL;
+ }
}
-
- /* Either we got a buffer above (in which case it can be used), or
- * we set *buffer to NULL in the prologue, so the return value is
- * correct anyway...
- */
- return 0;
+ source_mp3->offset = 0;
+ source_mp3->build_metadata_len = 0;
}
- else {
- /* Simple case - no metadata, just dump data directly to a buffer */
- refbuf = refbuf_new(len);
+ refbuf->associated = source_mp3->metadata;
+ refbuf_addref (source_mp3->metadata);
- memcpy(refbuf->data, data, len);
-
- *buffer = refbuf;
- return 0;
- }
+ return refbuf;
}
-static refbuf_queue_t *format_mp3_get_predata(format_plugin_t *self)
-{
- return NULL;
-}
-static void *format_mp3_create_client_data(format_plugin_t *self,
- source_t *source, client_t *client)
+static int format_mp3_create_client_data(source_t *source, client_t *client)
{
mp3_client_data *data = calloc(1,sizeof(mp3_client_data));
char *metadata;
- data->interval = ICY_METADATA_INTERVAL;
- data->offset = 0;
+ if (data == NULL)
+ return -1;
+
+ client->format_data = data;
client->free_client_data = free_mp3_client_data;
-
metadata = httpp_getvar(client->parser, "icy-metadata");
if(metadata)
data->use_metadata = atoi(metadata)>0?1:0;
- return data;
+ return 0;
}
@@ -432,3 +600,16 @@
format_send_general_headers(self, source, client);
}
+
+static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf)
+{
+ if (refbuf->len == 0)
+ return;
+ if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) < (size_t)refbuf->len)
+ {
+ WARN0 ("Write to dump file failed, disabling");
+ fclose (source->dumpfile);
+ source->dumpfile = NULL;
+ }
+}
+
Modified: icecast/branches/icecast-singleq/src/format_mp3.h
===================================================================
--- icecast/branches/icecast-singleq/src/format_mp3.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_mp3.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -19,18 +19,23 @@
#define __FORMAT_MP3_H__
typedef struct {
- char *metadata;
- int metadata_age;
- mutex_t lock;
-
/* These are for inline metadata */
int inline_metadata_interval;
int offset;
- int metadata_length;
- char *metadata_buffer;
- int metadata_offset;
+ unsigned interval;
+ char *url_artist;
+ char *url_title;
+ int update_metadata;
+
+ refbuf_t *metadata;
+ mutex_t url_lock;
+
+ unsigned build_metadata_len;
+ unsigned build_metadata_offset;
+ char build_metadata[4081];
} mp3_state;
-format_plugin_t *format_mp3_get_plugin(http_parser_t *parser);
+int format_mp3_get_plugin(struct source_tag *src);
+void mp3_set_tag (format_plugin_t *plugin, char *tag, char *value);
#endif /* __FORMAT_MP3_H__ */
Modified: icecast/branches/icecast-singleq/src/format_vorbis.c
===================================================================
--- icecast/branches/icecast-singleq/src/format_vorbis.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_vorbis.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -49,20 +49,31 @@
ogg_page og;
unsigned long serialno;
int header;
- refbuf_t *headbuf[MAX_HEADER_PAGES];
+ refbuf_t *file_headers;
+ refbuf_t *header_pages;
+ refbuf_t *header_pages_tail;
int packets;
} vstate_t;
+struct client_vorbis
+{
+ refbuf_t *headers;
+ refbuf_t *header_page;
+ unsigned int pos;
+ int processing_headers;
+};
+
+
static void format_vorbis_free_plugin(format_plugin_t *self);
-static int format_vorbis_get_buffer(format_plugin_t *self, char *data,
- unsigned long len, refbuf_t **buffer);
-static refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self);
-static void *format_vorbis_create_client_data(format_plugin_t *self,
- source_t *source, client_t *client);
+static refbuf_t *format_vorbis_get_buffer (source_t *source);
+static int format_vorbis_create_client_data (source_t *source, client_t *client);
static void format_vorbis_send_headers(format_plugin_t *self,
source_t *source, client_t *client);
+static int write_buf_to_client (format_plugin_t *self, client_t *client);
+static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf);
-format_plugin_t *format_vorbis_get_plugin(void)
+
+int format_vorbis_get_plugin(source_t *source)
{
format_plugin_t *plugin;
vstate_t *state;
@@ -70,10 +81,9 @@
plugin = (format_plugin_t *)malloc(sizeof(format_plugin_t));
plugin->type = FORMAT_TYPE_VORBIS;
- plugin->has_predata = 1;
+ plugin->write_buf_to_file = write_ogg_to_file;
plugin->get_buffer = format_vorbis_get_buffer;
- plugin->get_predata = format_vorbis_get_predata;
- plugin->write_buf_to_client = format_generic_write_buf_to_client;
+ plugin->write_buf_to_client = write_buf_to_client;
plugin->create_client_data = format_vorbis_create_client_data;
plugin->client_send_headers = format_vorbis_send_headers;
plugin->free_plugin = format_vorbis_free_plugin;
@@ -83,52 +93,67 @@
ogg_sync_init(&state->oy);
plugin->_state = (void *)state;
+ source->format = plugin;
- return plugin;
+ return 0;
}
void format_vorbis_free_plugin(format_plugin_t *self)
{
- int i;
vstate_t *state = (vstate_t *)self->_state;
+ refbuf_t *header;
/* free memory associated with this plugin instance */
/* free state memory */
+ while (header)
+ {
+ refbuf_t *to_release = header;
+ header = header->next;
+ refbuf_release (to_release);
+ }
ogg_sync_clear(&state->oy);
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
- for (i = 0; i < MAX_HEADER_PAGES; i++) {
- if (state->headbuf[i]) {
- refbuf_release(state->headbuf[i]);
- state->headbuf[i] = NULL;
- }
- }
-
free(state);
/* free the plugin instance */
free(self);
}
-int format_vorbis_get_buffer(format_plugin_t *self, char *data, unsigned long len, refbuf_t **buffer)
+static refbuf_t *format_vorbis_get_buffer (source_t *source)
{
- char *buf;
- int i, result;
+ int result;
ogg_packet op;
char *tag;
- refbuf_t *refbuf, *source_refbuf;
+ refbuf_t *refbuf, *header;
+ char *data;
+ format_plugin_t *self = source->format;
+ int bytes;
vstate_t *state = (vstate_t *)self->_state;
- source_t *source;
- if (data) {
- /* write the data to the buffer */
- buf = ogg_sync_buffer(&state->oy, len);
- memcpy(buf, data, len);
- ogg_sync_wrote(&state->oy, len);
+ data = ogg_sync_buffer (&state->oy, 4096);
+
+ bytes = sock_read_bytes (source->con->sock, data, 4096);
+ if (bytes < 0)
+ {
+ if (sock_recoverable (sock_error()))
+ return NULL;
+ WARN0 ("source connection has died");
+ ogg_sync_wrote (&state->oy, 0);
+ source->running = 0;
+ return NULL;
}
+ if (bytes == 0)
+ {
+ INFO1 ("End of Stream %s", source->mount);
+ ogg_sync_wrote (&state->oy, 0);
+ source->running = 0;
+ return NULL;
+ }
+ ogg_sync_wrote (&state->oy, bytes);
refbuf = NULL;
if (ogg_sync_pageout(&state->oy, &state->og) == 1) {
@@ -137,21 +162,25 @@
memcpy(&refbuf->data[state->og.header_len], state->og.body, state->og.body_len);
if (state->serialno != ogg_page_serialno(&state->og)) {
+ DEBUG0("new stream");
/* this is a new logical bitstream */
state->header = 0;
state->packets = 0;
- /* release old headers, stream state, vorbis data */
- for (i = 0; i < MAX_HEADER_PAGES; i++) {
- if (state->headbuf[i]) {
- refbuf_release(state->headbuf[i]);
- state->headbuf[i] = NULL;
- }
+ /* Clear old stuff. Rarely but occasionally needed. */
+ header = state->header_pages;
+ while (header)
+ {
+ DEBUG0 ("clearing out header page");
+ refbuf_t *to_release = header;
+ header = header->next;
+ refbuf_release (to_release);
}
- /* Clear old stuff. Rarely but occasionally needed. */
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
+ state->header_pages = NULL;
+ state->header_pages_tail = NULL;
state->serialno = ogg_page_serialno(&state->og);
ogg_stream_init(&state->os, state->serialno);
@@ -164,52 +193,40 @@
* extras pages beyond the header. We need to collect the pages
* here anyway, but they may have to be discarded later.
*/
+ DEBUG1 ("header %d", state->header);
if (ogg_page_granulepos(&state->og) <= 0) {
state->header++;
} else {
/* we're done caching headers */
state->header = -1;
+ DEBUG0 ("doing stats");
/* put known comments in the stats */
tag = vorbis_comment_query(&state->vc, "TITLE", 0);
- if (tag) stats_event(self->mount, "title", tag);
- else stats_event(self->mount, "title", "unknown");
+ if (tag) stats_event(source->mount, "title", tag);
+ else stats_event(source->mount, "title", "unknown");
tag = vorbis_comment_query(&state->vc, "ARTIST", 0);
- if (tag) stats_event(self->mount, "artist", tag);
- else stats_event(self->mount, "artist", "unknown");
+ if (tag) stats_event(source->mount, "artist", tag);
+ else stats_event(source->mount, "artist", "unknown");
/* don't need these now */
ogg_stream_clear(&state->os);
vorbis_comment_clear(&state->vc);
vorbis_info_clear(&state->vi);
- /* Drain the source queue on metadata update otherwise you
- could have a mismatch between what is on the source queue
- and what is in the state->headbuf */
- avl_tree_rlock(global.source_tree);
- source = source_find_mount_raw(self->mount);
- avl_tree_unlock(global.source_tree);
-
- thread_mutex_lock(&source->queue_mutex);
- while ((source_refbuf = refbuf_queue_remove(&source->queue))) {
- refbuf_release(source_refbuf);
- }
- thread_mutex_unlock(&source->queue_mutex);
-
- yp_touch (self->mount);
+ yp_touch (source->mount);
}
}
/* cache header pages */
if (state->header > 0 && state->packets < 3) {
- if(state->header > MAX_HEADER_PAGES) {
- refbuf_release(refbuf);
- ERROR1("Bad vorbis input: header is more than %d pages long", MAX_HEADER_PAGES);
+ /* build a list of headers pages for attaching */
+ if (state->header_pages_tail)
+ state->header_pages_tail->next = refbuf;
+ state->header_pages_tail = refbuf;
- return -1;
- }
- refbuf_addref(refbuf);
- state->headbuf[state->header - 1] = refbuf;
+ if (state->header_pages == NULL)
+ state->header_pages = refbuf;
if (state->packets >= 0 && state->packets < 3) {
ogg_stream_pagein(&state->os, &state->og);
@@ -229,36 +246,40 @@
}
}
}
+ /* we do not place ogg headers on the main queue */
+ return NULL;
}
+ /* increase ref counts on each header page going out */
+ header = state->header_pages;
+ while (header)
+ {
+ refbuf_addref (header);
+ header = header->next;
+ }
+ refbuf->associated = state->header_pages;
}
- *buffer = refbuf;
- return 0;
+ return refbuf;
}
-refbuf_queue_t *format_vorbis_get_predata(format_plugin_t *self)
+static void free_ogg_client_data (client_t *client)
{
- refbuf_queue_t *queue;
- int i;
- vstate_t *state = (vstate_t *)self->_state;
-
- queue = NULL;
- for (i = 0; i < MAX_HEADER_PAGES; i++) {
- if (state->headbuf[i]) {
- refbuf_addref(state->headbuf[i]);
- refbuf_queue_add(&queue, state->headbuf[i]);
- } else {
- break;
- }
- }
-
- return queue;
+ free (client->format_data);
+ client->format_data = NULL;
}
-static void *format_vorbis_create_client_data(format_plugin_t *self,
- source_t *source, client_t *client)
+static int format_vorbis_create_client_data (source_t *source, client_t *client)
{
- return NULL;
+ struct client_vorbis *client_data = calloc (1, sizeof (struct client_vorbis));
+ int ret = -1;
+
+ if (client_data)
+ {
+ client->format_data = client_data;
+ client->free_client_data = free_ogg_client_data;
+ ret = 0;
+ }
+ return ret;
}
static void format_vorbis_send_headers(format_plugin_t *self,
@@ -277,4 +298,123 @@
format_send_general_headers(self, source, client);
}
+static int send_ogg_headers (client_t *client, refbuf_t *headers)
+{
+ struct client_vorbis *client_data = client->format_data;
+ refbuf_t *refbuf;
+ int written = 0;
+ if (client_data->processing_headers == 0)
+ {
+ client_data->header_page = headers;
+ client_data->pos = 0;
+ client_data->processing_headers = 1;
+ }
+ refbuf = client_data->header_page;
+ while (refbuf)
+ {
+ char *data = refbuf->data + client_data->pos;
+ unsigned len = refbuf->len - client_data->pos;
+ int ret;
+
+ ret = client_send_bytes (client, data, len);
+ if (ret > 0)
+ {
+ written += ret;
+ client_data->pos += ret;
+ }
+ if (ret < (int)len)
+ return written;
+ if (client_data->pos == refbuf->len)
+ {
+ refbuf = refbuf->next;
+ client_data->header_page = refbuf;
+ client_data->pos = 0;
+ }
+ }
+ /* update client info on headers sent */
+ client_data->processing_headers = 0;
+ client_data->headers = headers;
+ return written;
+}
+
+static int write_buf_to_client (format_plugin_t *self, client_t *client)
+{
+ refbuf_t *refbuf = client->refbuf;
+ char *buf;
+ unsigned len;
+ struct client_vorbis *client_data = client->format_data;
+ int ret, written = 0;
+
+ if (refbuf->next == NULL && client->pos == refbuf->len)
+ return 0;
+
+ if (refbuf->next && client->pos == refbuf->len)
+ {
+ client->refbuf = refbuf->next;
+ client->pos = 0;
+ }
+ refbuf = client->refbuf;
+ do
+ {
+ if (client_data->headers != refbuf->associated)
+ {
+ /* different headers seen so send the new ones */
+ ret = send_ogg_headers (client, refbuf->associated);
+ if (client_data->processing_headers)
+ break;
+ written += ret;
+ }
+ buf = refbuf->data + client->pos;
+ len = refbuf->len - client->pos;
+ ret = client_send_bytes (client, buf, len);
+
+ if (ret > 0)
+ client->pos += ret;
+
+ if (ret < (int)len)
+ break;
+ written += ret;
+ /* we have now written the page(s) */
+ ret = 0;
+ } while (0);
+
+ if (ret > 0)
+ written += ret;
+ return written;
+}
+
+static int write_ogg_data (struct source_tag *source, refbuf_t *refbuf)
+{
+ int ret = 1;
+
+ if (fwrite (refbuf->data, 1, refbuf->len, source->dumpfile) != refbuf->len)
+ {
+ WARN0 ("Write to dump file failed, disabling");
+ fclose (source->dumpfile);
+ source->dumpfile = NULL;
+ ret = 0;
+ }
+ return ret;
+}
+
+
+static void write_ogg_to_file (struct source_tag *source, refbuf_t *refbuf)
+{
+ vstate_t *state = (vstate_t *)source->format->_state;
+
+
+ if (state->file_headers != refbuf->associated)
+ {
+ refbuf_t *header = refbuf->associated;
+ while (header)
+ {
+ if (write_ogg_data (source, header) == 0)
+ return;
+ header = header->next;
+ }
+ state->file_headers = refbuf->associated;
+ }
+ write_ogg_data (source, refbuf);
+}
+
Modified: icecast/branches/icecast-singleq/src/format_vorbis.h
===================================================================
--- icecast/branches/icecast-singleq/src/format_vorbis.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/format_vorbis.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -18,6 +18,6 @@
#ifndef __FORMAT_VORBIS_H__
#define __FORMAT_VORBIS_H__
-format_plugin_t *format_vorbis_get_plugin(void);
+int format_vorbis_get_plugin(source_t *source);
#endif /* __FORMAT_VORBIS_H__ */
Modified: icecast/branches/icecast-singleq/src/refbuf.c
===================================================================
--- icecast/branches/icecast-singleq/src/refbuf.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/refbuf.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -38,9 +38,22 @@
refbuf_t *refbuf;
refbuf = (refbuf_t *)malloc(sizeof(refbuf_t));
- refbuf->data = (void *)malloc(size);
+ if (refbuf == NULL)
+ return NULL;
+ refbuf->data = NULL;
+ if (size)
+ {
+ refbuf->data = malloc (size);
+ if (refbuf->data == NULL)
+ {
+ free (refbuf);
+ return NULL;
+ }
+ }
refbuf->len = size;
refbuf->_count = 1;
+ refbuf->next = NULL;
+ refbuf->associated = NULL;
return refbuf;
}
@@ -54,7 +67,14 @@
{
self->_count--;
if (self->_count == 0) {
- free(self->data);
+ while (self->associated)
+ {
+ refbuf_t *ref = self->associated;
+ self->associated = ref->next;
+ refbuf_release (ref);
+ }
+ if (self->len)
+ free(self->data);
free(self);
return;
}
Modified: icecast/branches/icecast-singleq/src/refbuf.h
===================================================================
--- icecast/branches/icecast-singleq/src/refbuf.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/refbuf.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -22,6 +22,8 @@
{
char *data;
long len;
+ struct _refbuf_tag *associated;
+ struct _refbuf_tag *next;
unsigned long _count;
} refbuf_t;
@@ -53,10 +55,3 @@
#endif /* __REFBUF_H__ */
-
-
-
-
-
-
-
Modified: icecast/branches/icecast-singleq/src/source.c
===================================================================
--- icecast/branches/icecast-singleq/src/source.c 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/source.c 2004-07-19 23:36:00 UTC (rev 7178)
@@ -60,6 +60,7 @@
static int _compare_clients(void *compare_arg, void *a, void *b);
static int _free_client(void *key);
static void _parse_audio_info (source_t *source, const char *s);
+static void source_shutdown (source_t *source);
/* Allocate a new source with the stated mountpoint, if one already
* exists with that mountpoint in the global source tree then return
@@ -189,7 +190,6 @@
void source_clear_source (source_t *source)
{
- refbuf_t *refbuf;
DEBUG1 ("clearing source \"%s\"", source->mount);
client_destroy(source->client);
source->client = NULL;
@@ -239,12 +239,17 @@
free(source->dumpfilename);
source->dumpfilename = NULL;
+
/* Lets clear out the source queue too */
- while ((refbuf = refbuf_queue_remove(&source->queue)))
- refbuf_release(refbuf);
- source->queue = NULL;
+ while (source->stream_data)
+ {
+ refbuf_t *p = source->stream_data;
+ source->stream_data = p->next;
+ refbuf_release (p);
+ }
+ source->stream_data_tail = NULL;
+
source->burst_on_connect = 1;
- thread_mutex_destroy(&source->queue_mutex);
}
@@ -360,7 +365,106 @@
thread_mutex_unlock (&move_clients_mutex);
}
+/* get some data from the source. The stream data is placed in a refbuf
+ * and sent back, however NULL is also valid as in the case of a short
+ * timeout and there's no data pending.
+ */
+static refbuf_t *get_next_buffer (source_t *source)
+{
+ refbuf_t *refbuf = NULL;
+ int delay = 250;
+ if (source->short_delay)
+ delay = 0;
+ while (global.running == ICE_RUNNING && source->running)
+ {
+ int fds;
+ time_t current = time (NULL);
+
+ fds = util_timed_wait_for_fd (source->con->sock, delay);
+
+ if (fds < 0)
+ {
+ if (! sock_recoverable (sock_error()))
+ {
+ WARN0 ("Error while waiting on socket, Disconnecting source");
+ source->running = 0;
+ }
+ break;
+ }
+ if (fds == 0)
+ {
+ if (source->last_read + (time_t)source->timeout < current)
+ {
+ DEBUG3 ("last %ld, timeout %ld, now %ld", source->last_read, source->timeout, current);
+ WARN0 ("Disconnecting source due to socket timeout");
+ source->running = 0;
+ }
+ break;
+ }
+ source->last_read = current;
+ refbuf = source->format->get_buffer (source);
+ if (refbuf)
+ break;
+ }
+
+ return refbuf;
+}
+
+
+/* general send routine per listener. The deletion_expected tells us whether
+ * the last in the queue is about to disappear, so if this client is still
+ * referring to it after writing then drop the client as it's fallen too far
+ * behind
+ */
+static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
+{
+ int bytes;
+ int loop = 10; /* max number of iterations in one go */
+ int total_written = 0;
+
+ /* new users need somewhere to start from */
+ if (client->refbuf == NULL)
+ {
+ /* make clients start at the most recent data on the queue */
+ client->refbuf = source->stream_data_tail;
+ if (client->refbuf == NULL)
+ return;
+ }
+
+ while (1)
+ {
+ /* jump out if client connection has died */
+ if (client->con->error)
+ break;
+
+ /* lets not send too much to one client in one go, but don't
+ sleep for too long if more data can be sent */
+ if (total_written > 150000 || loop == 0)
+ {
+ source->short_delay = 1;
+ break;
+ }
+
+ loop--;
+
+ bytes = source->format->write_buf_to_client (source->format, client);
+ if (bytes <= 0)
+ break; /* can't write any more */
+
+ total_written += bytes;
+ }
+
+ /* the refbuf referenced at head (last in queue) may be marked for deletion
+ * if so, check to see if this client is still referring to it */
+ if (deletion_expected && client->refbuf == source->stream_data)
+ {
+ DEBUG0("Client has fallen too far behind, removing");
+ client->con->error = 1;
+ }
+}
+
+
static void source_init (source_t *source)
{
ice_config_t *config = config_get_config();
@@ -423,9 +527,8 @@
sock_set_blocking (source->con->sock, SOCK_NONBLOCK);
- thread_mutex_create(&source->queue_mutex);
-
DEBUG0("Source creation complete");
+ source->last_read = time (NULL);
source->running = 1;
/*
@@ -454,212 +557,53 @@
void source_main (source_t *source)
{
- char buffer[4096];
- long bytes, sbytes;
- int ret, i;
+ unsigned int listeners;
+ refbuf_t *refbuf;
client_t *client;
avl_node *client_node;
- refbuf_t *refbuf, *abuf, *stale_refbuf;
- int data_done;
-
source_init (source);
while (global.running == ICE_RUNNING && source->running) {
- ret = source->format->get_buffer(source->format, NULL, 0, &refbuf);
- if(ret < 0) {
- WARN0("Bad data from source");
- break;
- }
- if (source->burst_on_connect) {
- thread_mutex_lock(&source->queue_mutex);
- /* Add to the source buffer */
- if (refbuf) {
- refbuf_addref(refbuf);
- refbuf_queue_add(&(source->queue), refbuf);
- /* We derive the size of the source buffer queue based off the
- setting for queue_size_limit (client buffer queue size).
- This is because the source buffer queue size should be a
- percentage of the client buffer size (definately cannot
- be larger). Why 50% ? Because > 75% does not give the
- client enough leeway for lagging on initial connection
- and < 25% does not provide a good enough burst on connect. */
- if (refbuf_queue_length(&(source->queue)) >
- source->queue_size_limit/2) {
- stale_refbuf = refbuf_queue_remove(&(source->queue));
- refbuf_release(stale_refbuf);
- }
- }
- thread_mutex_unlock(&source->queue_mutex);
- }
- bytes = 1; /* Set to > 0 so that the post-loop check won't be tripped */
- while (refbuf == NULL) {
- bytes = 0;
- while (bytes <= 0) {
- ret = util_timed_wait_for_fd(source->con->sock, source->timeout*1000);
+ int remove_from_q;
- if (ret < 0 && sock_recoverable (sock_error()))
- continue;
- if (ret <= 0) { /* timeout expired */
- WARN1("Disconnecting source: socket timeout (%d s) expired",
- source->timeout);
- bytes = 0;
- break;
- }
+ refbuf = get_next_buffer (source);
- bytes = sock_read_bytes(source->con->sock, buffer, 4096);
- if (bytes == 0 ||
- (bytes < 0 && !sock_recoverable(sock_error())))
- {
- DEBUG1("Disconnecting source due to socket read error: %s",
- strerror(sock_error()));
- break;
- }
- }
- if (bytes <= 0) break;
- source->client->con->sent_bytes += bytes;
- ret = source->format->get_buffer(source->format, buffer, bytes,
- &refbuf);
- if(ret < 0) {
- WARN0("Bad data from source");
- goto done;
- }
- if (source->burst_on_connect) {
- /* Add to the source buffer */
- thread_mutex_lock(&source->queue_mutex);
- if (refbuf) {
- refbuf_addref(refbuf);
- refbuf_queue_add(&(source->queue), refbuf);
- if (refbuf_queue_length(&(source->queue)) >
- source->queue_size_limit/2) {
- stale_refbuf = refbuf_queue_remove(&(source->queue));
- refbuf_release(stale_refbuf);
- }
- }
- thread_mutex_unlock(&source->queue_mutex);
- }
- }
+ remove_from_q = 0;
+ source->short_delay = 0;
- if (bytes <= 0) {
- INFO0("Removing source following disconnection");
- break;
- }
+ if (refbuf)
+ {
+ /* append buffer to the in-flight data queue, */
+ if (source->stream_data == NULL)
+ source->stream_data = refbuf;
+ if (source->stream_data_tail)
+ source->stream_data_tail->next = refbuf;
+ source->stream_data_tail = refbuf;
+ source->queue_size += refbuf->len;
- /* we have a refbuf buffer, which a data block to be sent to
- ** all clients. if a client is not able to send the buffer
- ** immediately, it should store it on its queue for the next
- ** go around.
- **
- ** instead of sending the current block, a client should send
- ** all data in the queue, plus the current block, until either
- ** it runs out of data, or it hits a recoverable error like
- ** EAGAIN. this will allow a client that got slightly lagged
- ** to catch back up if it can
- */
-
- /* First, stream dumping, if enabled */
- if(source->dumpfile) {
- if(fwrite(refbuf->data, 1, refbuf->len, source->dumpfile) !=
- refbuf->len)
- {
- WARN1("Write to dump file failed, disabling: %s",
- strerror(errno));
- fclose(source->dumpfile);
- source->dumpfile = NULL;
- }
+ /* save stream to file */
+ if (source->dumpfile && source->format->write_buf_to_file)
+ source->format->write_buf_to_file (source, refbuf);
}
+ /* lets see if we have too much data in the queue, but don't remove it until later */
+ if (source->queue_size > source->queue_size_limit)
+ remove_from_q = 1;
- /* acquire read lock on client_tree */
- avl_tree_rlock(source->client_tree);
-
- client_node = avl_get_first(source->client_tree);
- while (client_node) {
- /* acquire read lock on node */
- avl_node_wlock(client_node);
-
- client = (client_t *)client_node->key;
-
- data_done = 0;
-
- /* do we have any old buffers? */
- abuf = refbuf_queue_remove(&client->queue);
- while (abuf) {
- bytes = abuf->len - client->pos;
-
- sbytes = source->format->write_buf_to_client(source->format,
- client, &abuf->data[client->pos], bytes);
- if (sbytes < bytes) {
- if (client->con->error) {
- refbuf_release (abuf);
- }
- else {
- /* We didn't send the entire buffer. Leave it for
- * the moment, handle it in the next iteration.
- */
- client->pos += sbytes<0?0:sbytes;
- refbuf_queue_insert (&client->queue, abuf);
- }
- data_done = 1;
- break;
- }
- /* we're done with that refbuf, release it and reset the pos */
- refbuf_release(abuf);
- client->pos = 0;
-
- abuf = refbuf_queue_remove(&client->queue);
- }
-
- /* now send or queue the new data */
- if (data_done) {
- refbuf_addref(refbuf);
- refbuf_queue_add(&client->queue, refbuf);
- } else {
- sbytes = source->format->write_buf_to_client(source->format,
- client, refbuf->data, refbuf->len);
- if (client->con->error == 0 && sbytes < refbuf->len) {
- /* Didn't send the entire buffer, queue it */
- client->pos = sbytes<0?0:sbytes;
- refbuf_addref(refbuf);
- refbuf_queue_insert(&client->queue, refbuf);
- }
- }
-
- /* if the client is too slow, its queue will slowly build up.
- ** we need to make sure the client is keeping up with the
- ** data, so we'll kick any client who's queue gets to large.
- */
- if (refbuf_queue_length(&client->queue) > source->queue_size_limit) {
- DEBUG0("Client has fallen too far behind, removing");
- client->con->error = 1;
- }
-
- /* release read lock on node */
- avl_node_unlock(client_node);
-
- /* get the next node */
- client_node = avl_get_next(client_node);
- }
- /* release read lock on client_tree */
- avl_tree_unlock(source->client_tree);
-
- /* Only release the refbuf if we didn't add it to the source queue */
- if (!source->burst_on_connect) {
- refbuf_release(refbuf);
- }
-
/* acquire write lock on client_tree */
avl_tree_wlock(source->client_tree);
- /** delete bad clients **/
+ listeners = source->listeners;
client_node = avl_get_first(source->client_tree);
while (client_node) {
client = (client_t *)client_node->key;
+
+ send_to_listener (source, client, remove_from_q);
+
if (client->con->error) {
client_node = avl_get_next(client_node);
avl_delete(source->client_tree, (void *)client, _free_client);
source->listeners--;
- stats_event_args(source->mount, "listeners", "%d",
- source->listeners);
DEBUG0("Client removed");
continue;
}
@@ -696,34 +640,7 @@
DEBUG0("Client added");
stats_event_inc(NULL, "clients");
stats_event_inc(source->mount, "connections");
- stats_event_args(source->mount, "listeners", "%d",
- source->listeners);
- /* we have to send cached headers for some data formats
- ** this is where we queue up the buffers to send
- */
- client = (client_t *)client_node->key;
- if (source->format->has_predata) {
- client->queue = source->format->get_predata(source->format);
- }
- if (source->burst_on_connect) {
- /* here is where we fill up the new client with refbufs from
- the source buffer. this will allow an initial burst of
- audio data to be sent to the client, and allow for a faster
- startup time (from listener perspective) for the stream */
- if (!client->burst_sent) {
- thread_mutex_lock(&source->queue_mutex);
- for (i=0;i<refbuf_queue_size(&(source->queue));i++) {
- refbuf_queue_add(&(client->queue),
- refbuf_queue_get(&(source->queue), i));
- }
- thread_mutex_unlock(&source->queue_mutex);
- client->burst_sent = 1;
- DEBUG1("Added %d buffers to initial client queue",
- refbuf_queue_length(&(source->queue)));
- }
- }
-
client_node = avl_get_next(client_node);
}
@@ -737,12 +654,35 @@
/* release write lock on pending_tree */
avl_tree_unlock(source->pending_tree);
+ /* update the stats if need be */
+ if (source->listeners != listeners)
+ {
+ INFO2("listener count on %s now %d", source->mount, source->listeners);
+ stats_event_args (source->mount, "listeners", "%d", source->listeners);
+ }
+
+ if (remove_from_q)
+ {
+ refbuf_t *to_go = source->stream_data;
+ if (to_go->next)
+ {
+ source->stream_data = to_go->next;
+ source->queue_size -= to_go->len;
+ refbuf_release (to_go);
+ }
+ else
+ WARN0("possible queue length error");
+ }
+
/* release write lock on client_tree */
avl_tree_unlock(source->client_tree);
}
+ source_shutdown (source);
+}
-done:
+static void source_shutdown (source_t *source)
+{
source->running = 0;
INFO1("Source \"%s\" exiting", source->mount);
@@ -776,8 +716,6 @@
/* release our hold on the lock so the main thread can continue cleaning up */
thread_rwlock_unlock(source->shutdown_rwlock);
-
- return;
}
static int _compare_clients(void *compare_arg, void *a, void *b)
@@ -816,7 +754,7 @@
static void _parse_audio_info (source_t *source, const char *s)
{
const char *start = s;
- unsigned len;
+ unsigned int len;
while (start != NULL && *start != '\0')
{
Modified: icecast/branches/icecast-singleq/src/source.h
===================================================================
--- icecast/branches/icecast-singleq/src/source.h 2004-07-19 23:21:50 UTC (rev 7177)
+++ icecast/branches/icecast-singleq/src/source.h 2004-07-19 23:36:00 UTC (rev 7178)
@@ -55,10 +55,17 @@
struct auth_tag *authenticator;
int fallback_override;
int no_mount;
+
+ unsigned queue_size;
unsigned queue_size_limit;
+
unsigned timeout; /* source timeout in seconds */
- refbuf_queue_t *queue;
- mutex_t queue_mutex;
+ time_t last_read;
+ int short_delay;
+
+ refbuf_t *stream_data;
+ refbuf_t *stream_data_tail;
+
int burst_on_connect;
} source_t;
More information about the commits
mailing list