[xiph-commits] r17711 - in icecast/branches/kh/icecast: . src web win32

karl at svn.xiph.org karl at svn.xiph.org
Fri Dec 3 07:56:14 PST 2010


Author: karl
Date: 2010-12-03 07:56:14 -0800 (Fri, 03 Dec 2010)
New Revision: 17711

Added:
   icecast/branches/kh/icecast/web/7.xsl
Modified:
   icecast/branches/kh/icecast/NEWS
   icecast/branches/kh/icecast/config.h.vc6
   icecast/branches/kh/icecast/configure.in
   icecast/branches/kh/icecast/src/admin.c
   icecast/branches/kh/icecast/src/auth.c
   icecast/branches/kh/icecast/src/auth_url.c
   icecast/branches/kh/icecast/src/cfgfile.c
   icecast/branches/kh/icecast/src/cfgfile.h
   icecast/branches/kh/icecast/src/client.c
   icecast/branches/kh/icecast/src/connection.c
   icecast/branches/kh/icecast/src/flv.c
   icecast/branches/kh/icecast/src/flv.h
   icecast/branches/kh/icecast/src/format.c
   icecast/branches/kh/icecast/src/format.h
   icecast/branches/kh/icecast/src/format_mp3.c
   icecast/branches/kh/icecast/src/format_mp3.h
   icecast/branches/kh/icecast/src/fserve.c
   icecast/branches/kh/icecast/src/mpeg.c
   icecast/branches/kh/icecast/src/refbuf.c
   icecast/branches/kh/icecast/src/refbuf.h
   icecast/branches/kh/icecast/src/slave.c
   icecast/branches/kh/icecast/src/source.c
   icecast/branches/kh/icecast/src/source.h
   icecast/branches/kh/icecast/src/stats.c
   icecast/branches/kh/icecast/win32/icecast2.iss
Log:
sync up to kh28. Mostly corruption or NULL pointer fixes, mainly related to metadata.
removed the relay recanning in the slave thread as that is handled by the worker clients
now. A few internal cleanups with worker processing and stats.




Modified: icecast/branches/kh/icecast/NEWS
===================================================================
--- icecast/branches/kh/icecast/NEWS	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/NEWS	2010-12-03 15:56:14 UTC (rev 17711)
@@ -17,6 +17,29 @@
 any extra tags are show in the conf/icecast.xml.dist file
 
 
+2.3.2-kh28
+. fix possible short send shoutcast metadata inserts getting corrupted
+. fix a few possible flv issues when metadata is missing.
+. add filter routine for mpeg parsing, used for file inputs like intro or fallback
+. add mpeg ts frame marker to frame alignment code.
+. add min-queue-size mount setting, defaults to burst size so backward compatible.
+  players can request anything up to min-queue-setting with the initial-burst
+  header or burst= arg
+. worker shutdown forces clients to process, should allow for quicker termination
+  or moving to another worker.
+. stats client tidy up, prevent a few more stats going through to slave, report
+  slave auth failure properly, and initialize settings for inc/dec updates
+. introduce SYNC flag to source to keep internal counts consistent when moving or
+  terminating listeners
+. add proxy/cache avoidance headers
+. drop start timer from relay as we use the more general client schedule timer
+. removed the relay recheck each second as the relays are now installed and started
+  via the clients on the workers. saves work with many relays.
+. possible crash point on relay details change, or xml reload
+. offset in file serving was always 0 even if range was requested.
+. add in the 7.xsl hack for compatability
+. A few internal code and log message cleanups.
+
 2.3.2-kh27
 . use the tree locks instead of stats lock, this should lessen contention with many
   stats updates, typicaly of setups with many streams.

Modified: icecast/branches/kh/icecast/config.h.vc6
===================================================================
--- icecast/branches/kh/icecast/config.h.vc6	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/config.h.vc6	2010-12-03 15:56:14 UTC (rev 17711)
@@ -95,7 +95,7 @@
 #define PACKAGE_NAME "Icecast"
 
 /* Version number of package */
-#define VERSION "2.3.2-kh27"
+#define VERSION "2.3.2-kh28"
 
 /* Define to the version of this package. */
 #define PACKAGE_VERSION VERSION

Modified: icecast/branches/kh/icecast/configure.in
===================================================================
--- icecast/branches/kh/icecast/configure.in	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/configure.in	2010-12-03 15:56:14 UTC (rev 17711)
@@ -1,4 +1,4 @@
-AC_INIT([Icecast], [2.3.2-kh27], [karl at xiph.org])
+AC_INIT([Icecast], [2.3.2-kh28], [karl at xiph.org])
 
 LT_INIT
 AC_PREREQ(2.59)

Modified: icecast/branches/kh/icecast/src/admin.c
===================================================================
--- icecast/branches/kh/icecast/src/admin.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/admin.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -353,12 +353,14 @@
 
     if (connection_check_admin_pass (client->parser))
         client->flags |= CLIENT_AUTHENTICATED;
-
-    /* special case for slaves requesting a streamlist for authenticated relaying */
-    if (strcmp (uri, "streams") == 0 || strcmp (uri, "streamlist.txt") == 0)
+    else
     {
-        if (connection_check_relay_pass (client->parser))
-            client->flags |= CLIENT_AUTHENTICATED;
+        /* special case for slaves requesting a streamlist for authenticated relaying */
+        if (strcmp (uri, "streams") == 0 || strcmp (uri, "streamlist.txt") == 0)
+        {
+            if (connection_check_relay_pass (client->parser))
+                client->flags |= CLIENT_AUTHENTICATED;
+        }
     }
 
     if (mount)
@@ -484,7 +486,7 @@
 
     source_set_fallback (source, dest_source);
     source->termination_count = source->listeners;
-    source->flags |= SOURCE_TEMPORARY_FALLBACK;
+    source->flags |= SOURCE_LISTENERS_SYNC;
 
     snprintf (buf, sizeof(buf), "Clients moved from %s to %s",
             source->mount, dest_source);

Modified: icecast/branches/kh/icecast/src/auth.c
===================================================================
--- icecast/branches/kh/icecast/src/auth.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/auth.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -409,7 +409,7 @@
     /* check whether we are processing a streamlist request for slaves */
     if (strcmp (mount, "/admin/streams") == 0)
     {
-        if (client->parser->req_type == httpp_req_stats)
+        if (client->parser->req_type == httpp_req_stats && (client->flags & CLIENT_IS_SLAVE))
         {
             stats_add_listener (client, STATS_SLAVE|STATS_GENERAL);
             return 0;
@@ -514,55 +514,58 @@
  */
 void auth_add_listener (const char *mount, client_t *client)
 {
-    mount_proxy *mountinfo; 
-    ice_config_t *config;
+    ice_config_t *config = config_get_config();
+    mount_proxy *mountinfo = config_find_mount (config, mount);
 
-    if (connection_check_relay_pass (client->parser))
+    if ((client->flags & CLIENT_AUTHENTICATED) == 0)
     {
-        client->flags |= (CLIENT_IS_SLAVE|CLIENT_AUTHENTICATED);
-        INFO0 ("client connected as slave");
-    }
-    config = config_get_config();
-    mountinfo = config_find_mount (config, mount);
-    if (mountinfo)
-    {
-        if (mountinfo->skip_accesslog)
-            client->flags |= CLIENT_SKIP_ACCESSLOG;
-        if (mountinfo->ban_client)
+        if (mountinfo)
         {
-            DEBUG1 ("ban client value is %d", mountinfo->ban_client);
-            if (mountinfo->ban_client < 0)
-                client->flags |= CLIENT_IP_BAN_LIFT;
-            connection_add_banned_ip (client->connection.ip, mountinfo->ban_client);
+            if (mountinfo->skip_accesslog)
+                client->flags |= CLIENT_SKIP_ACCESSLOG;
+            if (mountinfo->ban_client)
+            {
+                if (mountinfo->ban_client < 0)
+                    client->flags |= CLIENT_IP_BAN_LIFT;
+                connection_add_banned_ip (client->connection.ip, mountinfo->ban_client);
+            }
+            if (mountinfo->no_mount)
+            {
+                config_release_config ();
+                client_send_403 (client, "mountpoint unavailable");
+                return;
+            }
+            if (mountinfo->auth && mountinfo->auth->authenticate)
+            {
+                auth_client *auth_user;
+
+                if (mountinfo->auth->running == 0 || mountinfo->auth->pending_count > 150)
+                {
+                    config_release_config ();
+                    WARN0 ("too many clients awaiting authentication");
+                    client_send_403 (client, "busy, please try again later");
+                    return;
+                }
+                auth_user = auth_client_setup (mount, client);
+                auth_user->process = auth_new_listener;
+                client->flags &= ~CLIENT_ACTIVE;
+                DEBUG0 ("adding client for authentication");
+                queue_auth_client (auth_user, mountinfo);
+                config_release_config ();
+                return;
+            }
         }
-        if (mountinfo->no_mount)
+        else
         {
-            config_release_config ();
-            client_send_403 (client, "mountpoint unavailable");
-            return;
+            if (strcmp (mount, "/admin/streams") == 0)
+            {
+                config_release_config ();
+                client_send_401 (client, NULL);
+                return;
+            }
         }
     }
-    if ((client->flags & CLIENT_AUTHENTICATED) == 0 && mountinfo && mountinfo->auth && mountinfo->auth->authenticate)
-    {
-        auth_client *auth_user;
-
-        if (mountinfo->auth->running == 0 || mountinfo->auth->pending_count > 150)
-        {
-            config_release_config ();
-            WARN0 ("too many clients awaiting authentication");
-            client_send_403 (client, "busy, please try again later");
-            return;
-        }
-        auth_user = auth_client_setup (mount, client);
-        auth_user->process = auth_new_listener;
-        client->flags &= ~CLIENT_ACTIVE;
-        DEBUG0 ("adding client for authentication");
-        queue_auth_client (auth_user, mountinfo);
-    }
-    else
-    {
-        add_authenticated_listener (mount, mountinfo, client);
-    }
+    add_authenticated_listener (mount, mountinfo, client);
     config_release_config ();
 }
 

Modified: icecast/branches/kh/icecast/src/auth_url.c
===================================================================
--- icecast/branches/kh/icecast/src/auth_url.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/auth_url.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -280,11 +280,14 @@
                 next->len = unprocessed;
                 mpeg_data_insert (&x->sync, next);
             }
-            if (n->data[0] != (char)0xFF)
-                WARN0 ("unexpected, no frame marker for buffer");
         }
-        *x->tailp = n;
-        x->tailp = &n->next;
+        if (n->len == 0)
+            refbuf_release (n);
+        else
+        {
+            *x->tailp = n;
+            x->tailp = &n->next;
+        }
     }
     return (int)(bytes);
 }

Modified: icecast/branches/kh/icecast/src/cfgfile.c
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/cfgfile.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -513,6 +513,7 @@
     configuration->relay_username = (char *)xmlCharStrdup (CONFIG_DEFAULT_MASTER_USERNAME);
     configuration->relay_password = NULL;
     /* default to a typical prebuffer size used by clients */
+    configuration->min_queue_size = 0;
     configuration->burst_size = CONFIG_DEFAULT_BURST_SIZE;
 }
 
@@ -766,6 +767,7 @@
         { "source-timeout",     config_get_int,     &mount->source_timeout },
         { "queue-size",         config_get_int,     &mount->queue_size_limit },
         { "burst-size",         config_get_int,     &mount->burst_size},
+        { "min-queue-size",     config_get_int,     &mount->min_queue_size},
         { "username",           config_get_str,     &mount->username },
         { "password",           config_get_str,     &mount->password },
         { "dump-file",          config_get_str,     &mount->dumpfile },
@@ -780,10 +782,9 @@
         { "filter-theora",      config_get_bool,    &mount->filter_theora },
         { "limit-rate",         config_get_bitrate, &mount->limit_rate },
         { "skip-accesslog",     config_get_bool,    &mount->skip_accesslog },
-        { "avg-bitrate-duration",
-                                config_get_int,     &mount->avg_bitrate_duration },
         { "charset",            config_get_str,     &mount->charset },
         { "qblock-size",        config_get_int,     &mount->queue_block_size },
+        { "metadata-interval",  config_get_int,     &mount->mp3_meta_interval },
         { "mp3-metadata-interval",
                                 config_get_int,     &mount->mp3_meta_interval },
         { "ogg-passthrough",    config_get_bool,    &mount->ogg_passthrough },
@@ -819,10 +820,10 @@
     mount->max_listeners = -1;
     mount->max_bandwidth = -1;
     mount->burst_size = -1;
+    mount->min_queue_size = -1;
     mount->mp3_meta_interval = -1;
     mount->yp_public = -1;
     mount->url_ogg_meta = 0;
-    mount->avg_bitrate_duration = 60;
     mount->source_timeout = config->source_timeout;
     mount->file_seekable = 1;
     mount->access_log.logid = -1;
@@ -838,12 +839,12 @@
     }
     if (mount->auth)
         mount->auth->mount = strdup (mount->mountname);
-    if (mount->avg_bitrate_duration < 2)
-        mount->avg_bitrate_duration = 2;
     if (mount->admin_comments_only)
         mount->url_ogg_meta = 1;
     if (mount->url_ogg_meta)
         mount->ogg_passthrough = 0;
+    if (mount->min_queue_size < 0)
+        mount->min_queue_size = mount->burst_size;
     if (mount->queue_block_size < 100)
         mount->queue_block_size = 1400;
     if (mount->ban_client < 0)
@@ -968,6 +969,7 @@
         { "clients",        config_get_int,    &config->client_limit },
         { "sources",        config_get_int,    &config->source_limit },
         { "queue-size",     config_get_int,    &config->queue_size_limit },
+        { "min-queue-size", config_get_int,    &config->min_queue_size },
         { "burst-size",     config_get_int,    &config->burst_size },
         { "workers",        config_get_int,    &config->workers_count },
         { "client-timeout", config_get_int,    &config->client_timeout },

Modified: icecast/branches/kh/icecast/src/cfgfile.h
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/cfgfile.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -104,6 +104,7 @@
     int so_sndbuf;      /* TCP send buffer size for new clients */
     int burst_size; /* amount to send to a new client if possible, -1 take
                      * from global setting */
+    int min_queue_size;     /* minimum length of queue */
     unsigned int queue_size_limit;
     int hidden; /* Do we list this on the xsl pages */
     unsigned int source_timeout;  /* source timeout in seconds */
@@ -116,9 +117,6 @@
     int admin_comments_only; /* enable to only show comments set from the admin page */
     int skip_accesslog;         /* skip logging client to access log */
 
-    /* duration in seconds for sampling the bandwidth */
-    int avg_bitrate_duration;
-
     int64_t limit_rate;
 
     /* duration (secs) for mountpoint to be kept reserved after source client exits */
@@ -191,7 +189,6 @@
     int on_demand;
     int running;
     int cleanup;
-    time_t start;
 } relay_server;
 
 
@@ -214,6 +211,7 @@
     int client_limit;
     int source_limit;
     unsigned int queue_size_limit;
+    int min_queue_size;
     int workers_count;
     unsigned int burst_size;
     int client_timeout;

Modified: icecast/branches/kh/icecast/src/client.c
===================================================================
--- icecast/branches/kh/icecast/src/client.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/client.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -412,14 +412,17 @@
 
 static void worker_wait (worker_t *worker)
 {
-    uint64_t now = timing_get_time();
-    int ret, duration = (int)(worker->wakeup_ms - now);
+    int ret, duration = 3;
     char ca[30];
 
-    if (duration > 60000) /* make duration between 3ms and 60s */
-        duration = 60000;
-    if (duration < 3)
-        duration = 3;
+    if (global.running == ICE_RUNNING)
+    {
+        duration = (int)(worker->wakeup_ms - timing_get_time());
+        if (duration > 60000) /* make duration between 3ms and 60s */
+            duration = 60000;
+        if (duration < 3)
+            duration = 3;
+    }
 
     ret = util_timed_wait_for_fd (worker->wakeup_fd[0], duration);
     if (ret > 0) /* may of been several wakeup attempts */
@@ -486,11 +489,6 @@
     {
         client_t *client = worker->clients, **prevp = &worker->clients;
 
-        if (prev_count != worker->count)
-        {
-            DEBUG2 ("%p now has %d clients", worker, worker->count);
-            prev_count = worker->count;
-        }
         while (client)
         {
             if (client->worker != worker) abort();
@@ -500,7 +498,7 @@
                 int ret = 0;
                 client_t *nx = client->next_on_worker;
 
-                if (client->schedule_ms <= worker->time_ms+10)
+                if (worker->running == 0 || client->schedule_ms <= worker->time_ms+10)
                 {
                     client->schedule_ms = worker->time_ms;
                     ret = client->ops->process (client);
@@ -525,6 +523,11 @@
             prevp = &client->next_on_worker;
             client = *prevp;
         }
+        if (prev_count != worker->count)
+        {
+            DEBUG2 ("%p now has %d clients", worker, worker->count);
+            prev_count = worker->count;
+        }
         if (worker->running == 0)
         {
             if (global.running == ICE_RUNNING)

Modified: icecast/branches/kh/icecast/src/connection.c
===================================================================
--- icecast/branches/kh/icecast/src/connection.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/connection.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -987,15 +987,17 @@
 #ifdef HAVE_OPENSSL
     SSL_CTX_free (ssl_ctx);
 #endif
+    global_lock();
     if (banned_ip.contents)  avl_tree_free (banned_ip.contents, free_filtered_line);
     if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_line);
     if (useragents.contents) avl_tree_free (useragents.contents, free_filtered_line);
-    banned_ip.contents = NULL;
-    allowed_ip.contents = NULL;
-    useragents.contents = NULL;
     free (banned_ip.filename);
     free (allowed_ip.filename);
     free (useragents.filename);
+    memset (&banned_ip, 0, sizeof (banned_ip));
+    memset (&allowed_ip, 0, sizeof (allowed_ip));
+    memset (&useragents, 0, sizeof (useragents));
+    global_unlock();
 
     INFO0 ("connection thread finished");
 
@@ -1243,17 +1245,21 @@
 
 static int _handle_stats_request (client_t *client)
 {
-    const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
+    if (connection_check_admin_pass (client->parser))
+        stats_add_listener (client, STATS_ALL);
+    else
+    {
+        const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
 
-    if (connection_check_admin_pass (client->parser) == 0)
-    {
-        auth_add_listener (uri, client);
-        return 0;
+        if (strcmp (uri, "/admin/streams") == 0 && connection_check_relay_pass (client->parser))
+            stats_add_listener (client, STATS_SLAVE|STATS_GENERAL);
+        else
+            auth_add_listener (uri, client);
     }
-    stats_add_listener (client, STATS_ALL);
     return 0;
 }
 
+
 static void check_for_filtering (ice_config_t *config, client_t *client, char *uri)
 {
     char *pattern = config->access_log.exclude_ext;
@@ -1264,7 +1270,7 @@
         (type && strcmp (type, ".flv") == 0))
     {
         client->flags |= CLIENT_WANTS_FLV;
-        DEBUG0 ("listener has flv extension so flag it for special handling");
+        DEBUG0 ("listener has requested FLV");
     }
     if (extension == NULL || uri == NULL)
         return;
@@ -1452,12 +1458,14 @@
 {
     if (con->con_time)
     {
-        sock_close(con->sock);
+        if (con->sock != SOCK_ERROR)
+            sock_close(con->sock);
         free(con->ip);
 #ifdef HAVE_OPENSSL
         if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
 #endif
         memset (con, 0, sizeof (connection_t));
+        con->sock = SOCK_ERROR;
     }
 }
 

Modified: icecast/branches/kh/icecast/src/flv.c
===================================================================
--- icecast/branches/kh/icecast/src/flv.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/flv.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -28,6 +28,7 @@
 #include "client.h"
 #include "stats.h"
 
+#include "flv.h"
 #include "logging.h"
 #include "mpeg.h"
 #include "format_mp3.h"
@@ -35,17 +36,6 @@
 #define CATMODULE "flv"
 
 
-struct flv
-{
-    int prev_tagsize;
-    int block_pos;
-    uint64_t prev_ms;
-    int64_t samples;
-    refbuf_t *seen_metadata;
-    mpeg_sync mpeg_sync;
-    unsigned char tag[30];
-};
-
 struct flvmeta
 {
     int meta_pos;
@@ -93,7 +83,14 @@
     struct flv *flv = mp->callback_key;
 
     if (mp->raw_offset + len + 16 > mp->raw->len)
-        return -1;
+    {
+        int newlen = mp->raw->len + 4096;
+        void *p = realloc (mp->raw->data, newlen);
+        if (p == NULL)
+            return -1;
+        mp->raw->data = p;
+        mp->raw->len = newlen;
+    }
     flv_hdr (flv, len + 1);
     if (flv->tag[15] == 0x22)
     {
@@ -124,7 +121,14 @@
     struct flv *flv = mp->callback_key;
 
     if (mp->raw_offset + len + 17 > mp->raw->len)
-        return -1;
+    {
+        int newlen = mp->raw->len + 4096;
+        void *p = realloc (mp->raw->data, newlen);
+        if (p == NULL)
+            return -1;
+        mp->raw->data = p;
+        mp->raw->len = newlen;
+    }
     flv_hdr (flv, len + 2);
     // a single frame (headerless) follows this
     memcpy (mp->raw->data + mp->raw_offset, &flv->tag[0], 17);
@@ -185,9 +189,11 @@
 
     if (len > 0)
     {
+        if (len > 1400)
+            len = 1400;
         ret = client_send_bytes (client, buf, len);
         if (ret < (int)len)
-            client->schedule_ms += (ret ? 20 : 150);
+            client->schedule_ms += (ret ? 50 : 250);
         if (ret > 0)
             flv->block_pos += ret;
     }
@@ -204,30 +210,44 @@
     struct flv *flv = client_mp3->specific;
     int ret;
 
+    if (client->pos >= ref->len)
+    {
+        WARN2 ("buffer position invalid (%d, %d)", client->pos, ref->len);
+        client->pos = ref->len;
+        return -1;
+    }
     /* check for metadata updates and insert if needed */
     if (flv->seen_metadata != scmeta)
     {
         /* the first assoc block is shoutcast meta, second is flv meta */
-        refbuf_t *flvmeta = scmeta->associated;
         int len;
         char *src, *dst = flv->mpeg_sync.raw->data;
+        refbuf_t *flvmeta = NULL;
+        struct flvmeta *flvm;
 
-        if (flvmeta)
+        if (scmeta)
+            flvmeta = scmeta->associated;
+        if (flvmeta == NULL)
         {
-            struct flvmeta *flvm = (struct flvmeta *)flvmeta->data;
+            char *value = stats_get_value (flv->mpeg_sync.mount, "server_name");
 
-            src = (char *)flvm + sizeof (*flvm);
-            len  = flvm->meta_pos - sizeof (*flvm);
+            flvmeta  = flv_meta_allocate (200);
+            if (value)
+                flv_meta_append (flvmeta, "name", value);
+            free (value);
+            value = stats_get_value (flv->mpeg_sync.mount, "title");
+            if (value)
+                flv_meta_append (flvmeta, "title", value);
+            else
+                flv_meta_append (flvmeta, "title", "");
+            free (value);
+            flv_meta_append (flvmeta, "title", value);
+            flv_meta_append (flvmeta, NULL, NULL);
+            ref->associated = flvmeta;
         }
-        else
-        {
-            src="\002\000\012onMetaData"    // 13
-                "\010\000\000\000\001"      //  5
-                "\000\005title"             //  7
-                "\002\000\001 "             //  4
-                "\000\000\011";             //  3
-            len = 32;
-        }
+        flvm = (struct flvmeta *)flvmeta->data;
+        src = (char *)flvm + sizeof (*flvm);
+        len  = flvm->meta_pos - sizeof (*flvm);
 
         if (len + 15 < flv->mpeg_sync.raw->len)
         {
@@ -340,13 +360,14 @@
     int bytes;
     char *ptr = client->refbuf->data;
 
-    mpeg_setup (&flv->mpeg_sync, plugin->mount);
+    mpeg_setup (&flv->mpeg_sync, client->connection.ip);
     mpeg_check_numframes (&flv->mpeg_sync, 1); 
     client_mp3->specific = flv;
 
     bytes = snprintf (ptr, 200, "HTTP/1.0 200 OK\r\n"
             "content-type: video/x-flv\r\n"
             "Cache-Control: no-cache\r\n"
+            "Expires: Thu, 01 Jan 1970 00:00:01 GMT\r\n"
             "Pragma: no-cache\r\n"
             "\r\n"
             "FLV\x1\x4%c%c%c\x9", 0,0,0);
@@ -363,6 +384,7 @@
         flv->mpeg_sync.frame_callback = flv_mpX_hdr;
     }
     flv->mpeg_sync.callback_key = flv;
+    flv->seen_metadata = (void*)flv; // force metadata initially with non-NULL meta
 
     client->respcode = 200;
     client->refbuf->len = bytes;
@@ -371,6 +393,7 @@
 
 void free_flv_client_data (struct flv *flv)
 {
+    flv->mpeg_sync.mount = NULL;
     mpeg_cleanup (&flv->mpeg_sync);
 }
 

Modified: icecast/branches/kh/icecast/src/flv.h
===================================================================
--- icecast/branches/kh/icecast/src/flv.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/flv.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -22,9 +22,10 @@
     int prev_tagsize;
     int block_pos;
     uint64_t prev_ms;
-    uint64_t samples;
+    int64_t samples;
+    refbuf_t *seen_metadata;
     mpeg_sync mpeg_sync;
-    unsigned char tag[20];
+    unsigned char tag[30];
 };
 
 

Modified: icecast/branches/kh/icecast/src/format.c
===================================================================
--- icecast/branches/kh/icecast/src/format.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/format.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -104,43 +104,61 @@
 }
 
 
-int format_file_read (client_t *client, FILE *intro)
+int format_file_read (client_t *client, format_plugin_t *plugin, FILE *intro)
 {
     refbuf_t *refbuf = client->refbuf;
+    size_t bytes = -1;
+    int unprocessed = 0;
 
-    if (refbuf == NULL)
-        return -1;
-    if (client->pos == refbuf->len)
+    do
     {
-        size_t bytes;
-
+        if (refbuf == NULL)
+        {
+            if (intro == NULL)
+                return -2;
+            refbuf = client->refbuf = refbuf_new (4096);
+            client->pos = refbuf->len;
+            client->intro_offset = 0;
+            client->queue_pos = 0;
+        }
+        if (client->pos < refbuf->len)
+            break;
         if (client->flags & CLIENT_HAS_INTRO_CONTENT)
         {
             if (refbuf->next)
             {
+                //DEBUG1 ("next intro block is %d", refbuf->next->len);
                 client->refbuf = refbuf->next;
                 refbuf->next = NULL;
                 refbuf_release (refbuf);
                 client->pos = 0;
                 return 0;
             }
+            //DEBUG0 ("No more intro data ");
             client_set_queue (client, NULL);
             client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
             client->intro_offset = client->connection.sent_bytes;
-            return -1;
+            refbuf = NULL;
+            continue;
         }
-        if (intro == NULL)
-            return -1;
-        if (fseek (intro, client->intro_offset, SEEK_SET) < 0)
-            return -1;
-        bytes = fread (refbuf->data, 1, PER_CLIENT_REFBUF_SIZE, intro);
-        if (bytes == 0)
-            return -1;
 
-        client->intro_offset += bytes;
+        if (fseek (intro, client->intro_offset, SEEK_SET) < 0 ||
+                (bytes = fread (refbuf->data, 1, 4096, intro)) <= 0)
+        {
+            return bytes < 0 ? -2 : -1;
+        }
         refbuf->len = bytes;
         client->pos = 0;
-    }
+        if (plugin->align_buffer)
+        {
+            /* here the buffer may require truncating to keep the buffers aligned on
+             * certain boundaries */
+            unprocessed = plugin->align_buffer (client, plugin);
+            if (unprocessed < 0 || unprocessed >= bytes)
+                unprocessed = 0;
+        }
+        client->intro_offset += (bytes - unprocessed);
+    } while (1);
     return 0;
 }
 

Modified: icecast/branches/kh/icecast/src/format.h
===================================================================
--- icecast/branches/kh/icecast/src/format.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/format.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -57,7 +57,8 @@
     void (*set_tag)(struct _format_plugin_tag *plugin, const char *tag, const char *value, const char *charset);
     void (*free_plugin)(struct _format_plugin_tag *self, client_t *client);
     void (*apply_settings)(struct _format_plugin_tag *format, struct _mount_proxy *mount);
-    int (*get_image)(client_t *client, struct _format_plugin_tag *format);
+    int  (*align_buffer)(client_t *client, format_plugin_t *plugin);
+    int  (*get_image)(client_t *client, struct _format_plugin_tag *format);
 
     /* for internal state management */
     void *_state;
@@ -67,7 +68,7 @@
 int format_get_plugin (format_plugin_t *plugin, client_t *client);
 int format_generic_write_to_client (client_t *client);
 
-int format_file_read (client_t *client, FILE *fp);
+int format_file_read (client_t *client, format_plugin_t *plugin, FILE *fp);
 int format_general_headers (format_plugin_t *plugin, client_t *client);
 
 void format_send_general_headers(format_plugin_t *format, 

Modified: icecast/branches/kh/icecast/src/format_mp3.c
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/format_mp3.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -60,9 +60,9 @@
 static void write_mp3_to_file (struct source_tag *source, refbuf_t *refbuf);
 static void mp3_set_tag (format_plugin_t *plugin, const char *tag, const char *in_value, const char *charset);
 static void format_mp3_apply_settings (format_plugin_t *format, mount_proxy *mount);
+static int  mpeg_process_buffer (client_t *client, format_plugin_t *plugin);
 
 
-
 /* client format flags */
 #define CLIENT_IN_METADATA          CLIENT_FORMAT_BIT
 #define CLIENT_USING_BLANK_META     (CLIENT_FORMAT_BIT<<1)
@@ -74,7 +74,7 @@
 {
     const char *metadata;
     mp3_state *state = calloc(1, sizeof(mp3_state));
-    refbuf_t *meta;
+    refbuf_t *meta, *flvmeta;
     const char *s;
 
     plugin->get_buffer = mp3_get_no_meta;
@@ -82,6 +82,7 @@
     plugin->write_buf_to_file = write_mp3_to_file;
     plugin->create_client_data = format_mp3_create_client_data;
     plugin->free_plugin = format_mp3_free_plugin;
+    plugin->align_buffer = mpeg_process_buffer;
     plugin->set_tag = mp3_set_tag;
     plugin->apply_settings = format_mp3_apply_settings;
 
@@ -96,7 +97,11 @@
 
     /* initial metadata needs to be blank for sending to clients and for
        comparing with new metadata */
+    flvmeta = flv_meta_allocate (200);
+    flv_meta_append (flvmeta, "Title", "");
+    flv_meta_append (flvmeta, NULL, NULL);
     meta = refbuf_new (17);
+    meta->associated = flvmeta;
     memcpy (meta->data, "\001StreamTitle='';", 17);
     state->metadata = meta;
     state->interval = -1;
@@ -114,13 +119,11 @@
     }
     if (client)
     {
-        if (plugin->type == FORMAT_TYPE_AAC || plugin->type == FORMAT_TYPE_MPEG)
-        {
-            client->format_data = malloc (sizeof (mpeg_sync));
-            mpeg_setup (client->format_data, plugin->mount);
-            plugin->write_buf_to_client = write_mpeg_buf_to_client;
-        }
+        client->format_data = malloc (sizeof (mpeg_sync));
+        mpeg_setup (client->format_data, plugin->mount);
+        plugin->write_buf_to_client = write_mpeg_buf_to_client;
     }
+    mpeg_setup (&state->file_sync, plugin->mount);
 
     return 0;
 }
@@ -182,8 +185,11 @@
 
     do
     {
-        if (metadata == NULL)
+        if (metadata == NULL || meta_len < 16 || meta_len > 4081)
             break;
+        if (*(unsigned char*)metadata * 16 + 1 != meta_len)
+            break;
+        metadata [meta_len-1] = '\0';
         metadata++;
         if (strncmp (metadata, "StreamTitle='", 13))
             break;
@@ -305,7 +311,13 @@
                     int urllen = end - source_mp3->inline_url;
                     int len = urllen + strlen("StreamUrl='") + 3;
                     if (size-r > len);
+                    {
+                        char *tmp = alloca (urllen+1);
                         snprintf (p->data+r, len, "StreamUrl='%.*s';", urllen, source_mp3->inline_url);
+                        snprintf (tmp, urllen+1, "%.*s", urllen, source_mp3->inline_url);
+                        flv_meta_append (flvmeta, "metadata_url", tmp);
+                        stats_event (source->mount, "metadata_url", tmp);
+                    }
                 }
             }
             else if (source_mp3->url)
@@ -314,7 +326,7 @@
                 flv_meta_append (flvmeta, "URL", source_mp3->url);
             }
         }
-        DEBUG1 ("shoutcast metadata block setup with %s", p->data+1);
+        DEBUG1 ("shoutcast metadata block setup with %.80s", p->data+1);
         title = filter_shoutcast_metadata (source, p->data, size);
         logging_playlist (source->mount, title, source->listeners);
         yp_touch (source->mount);
@@ -401,14 +413,15 @@
                 client_mp3->metadata_offset = (ret - remaining);
                 client->flags |= CLIENT_IN_METADATA;
                 client->schedule_ms += 200;
+                client_mp3->since_meta_block += remaining;
             }
             else
             {
                 client->flags &= ~CLIENT_IN_METADATA;
                 client_mp3->metadata_offset = 0;
+                client_mp3->since_meta_block = 0;
+                client->pos += remaining;
             }
-            client_mp3->since_meta_block = 0;
-            client->pos += remaining;
             client->queue_pos += remaining;
             return ret;
         }
@@ -428,6 +441,7 @@
         client_mp3->metadata_offset = 0;
         client->flags &= ~CLIENT_IN_METADATA;
         client_mp3->since_meta_block = 0;
+        client->pos += remaining;
         return ret;
     }
     if (ret > 0)
@@ -459,7 +473,7 @@
                 client_mp3->since_meta_block;
 
             /* leading up to sending the metadata block */
-            if (remaining <= len)
+            if ((client->flags & CLIENT_IN_METADATA) || remaining <= len)
             {
                 ret = send_stream_metadata (client, refbuf, remaining);
                 if (client->flags & CLIENT_IN_METADATA)
@@ -521,6 +535,7 @@
     free (format_mp3->url);
     refbuf_release (format_mp3->metadata);
     refbuf_release (format_mp3->read_data);
+    mpeg_cleanup (&format_mp3->file_sync);
     free (plugin->contenttype);
     free (format_mp3);
 }
@@ -560,11 +575,31 @@
 }
 
 
-static int validate_mpeg (mp3_state *source_mp3, mpeg_sync *mpeg_sync, refbuf_t *refbuf)
+int mpeg_process_buffer (client_t *client, format_plugin_t *plugin)
 {
+    refbuf_t *refbuf = client->refbuf;
+    mp3_state *source_mp3 = plugin->_state;
+    int unprocessed = -1;
+
+    if (refbuf)
+        unprocessed = mpeg_complete_frames (&source_mp3->file_sync, refbuf, 0);
+    return unprocessed;
+}
+
+static int validate_mpeg (source_t *source, refbuf_t *refbuf)
+{
+    client_t *client = source->client;
+    mp3_state *source_mp3 = source->format->_state;
+    mpeg_sync *mpeg_sync = client->format_data;
+
     int unprocessed = mpeg_complete_frames (mpeg_sync, refbuf, 0);
     if (unprocessed < 0)
+    {
+        WARN1 ("no frames processed for %s", source->mount);
+        mpeg_cleanup (client->format_data);
+        client->format_data = NULL;
         return -1;
+    }
     if (unprocessed > 0)
     {
         size_t len;
@@ -579,8 +614,6 @@
             }
             else
             {
-                if (unprocessed > 8000)
-                    return -1;
                 leftover = refbuf_new (unprocessed);
                 memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed);
                 mpeg_data_insert (mpeg_sync, leftover); /* will need to merge this after metadata */
@@ -618,7 +651,7 @@
     refbuf = source_mp3->read_data;
     source_mp3->read_data = NULL;
 
-    if (client->format_data && validate_mpeg (source_mp3, client->format_data, refbuf) < 0)
+    if (client->format_data && validate_mpeg (source, refbuf) < 0)
     {
         refbuf_release (refbuf);
         return NULL;
@@ -720,16 +753,15 @@
         if (source_mp3->build_metadata_len > 1 &&
                 strcmp (source_mp3->build_metadata+1, source_mp3->metadata->data+1) != 0)
         {
-            refbuf_t *meta = refbuf_new (source_mp3->build_metadata_len);
-            memcpy (meta->data, source_mp3->build_metadata,
-                    source_mp3->build_metadata_len);
+            char *title = filter_shoutcast_metadata (source, source_mp3->build_metadata, source_mp3->build_metadata_len);
 
-            DEBUG1("shoutcast metadata %.4080s", meta->data+1);
-            if (strncmp (meta->data+1, "StreamTitle=", 12) == 0)
+            if (title)
             {
-                char *title = filter_shoutcast_metadata (source,
-                        source_mp3->build_metadata, source_mp3->build_metadata_len);
-                refbuf_t *flvmeta = flv_meta_allocate (strlen(title)+20);
+                refbuf_t *flvmeta, *meta = refbuf_new (source_mp3->build_metadata_len);
+
+                memcpy (meta->data, source_mp3->build_metadata, source_mp3->build_metadata_len);
+                DEBUG1("shoutcast metadata %.80s", meta->data+1);
+                flvmeta = flv_meta_allocate (strlen(title)+20);
                 logging_playlist (source->mount, title, source->listeners);
                 stats_event_conv (source->mount, "title", title, source->format->charset);
                 flv_meta_append (flvmeta, "title", title);
@@ -746,10 +778,10 @@
             }
             else
             {
-                ERROR0 ("Incorrect metadata format, ending stream");
+                ERROR2 ("Incorrect metadata format \"%.80s\", ending %s",
+                        source_mp3->build_metadata+1, source->mount);
                 source->flags &= ~SOURCE_RUNNING;
                 refbuf_release (refbuf);
-                refbuf_release (meta);
                 return NULL;
             }
         }
@@ -762,7 +794,7 @@
         refbuf_release (refbuf);
         return NULL;
     }
-    if (client->format_data && validate_mpeg (source_mp3, client->format_data, refbuf) < 0)
+    if (client->format_data && validate_mpeg (source, refbuf) < 0)
     {
         refbuf_release (refbuf);
         return NULL;
@@ -821,12 +853,16 @@
         bytes = snprintf (ptr, remaining, "Content-Length: 221183499\r\n");
         remaining -= bytes;
         ptr += bytes;
-        /* avoid browser caching, reported via forum */
-        bytes = snprintf (ptr, remaining, "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n");
-        remaining -= bytes;
-        ptr += bytes; 
     }
+    /* avoid browser caching, reported via forum */
+    bytes = snprintf (ptr, remaining, "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n");
+    remaining -= bytes;
+    ptr += bytes; 
 
+    bytes = snprintf (ptr, remaining, "Pragma: no-cache\r\n");
+    remaining -= bytes;
+    ptr += bytes; 
+
     /* check for shoutcast style metadata inserts */
     metadata = httpp_getvar(client->parser, "icy-metadata");
     if (metadata && atoi(metadata))

Modified: icecast/branches/kh/icecast/src/format_mp3.h
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/format_mp3.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -19,6 +19,7 @@
 #define __FORMAT_MP3_H__
 
 #include "format.h"
+#include "mpeg.h"
 
 #define CLIENT_WANTS_FLV            (CLIENT_FORMAT_BIT<<20)
 
@@ -43,6 +44,7 @@
     int update_metadata;
     int queue_block_size;
 
+    mpeg_sync file_sync;
     refbuf_t *metadata;
     refbuf_t *read_data;
     int read_count;

Modified: icecast/branches/kh/icecast/src/fserve.c
===================================================================
--- icecast/branches/kh/icecast/src/fserve.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/fserve.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -203,6 +203,7 @@
         fclose (fh->fp);
     if (fh->format)
     {
+        free (fh->format->mount);
         format_plugin_clear (fh->format, NULL);
         free (fh->format);
     }
@@ -294,7 +295,7 @@
             fh->format = calloc (1, sizeof (format_plugin_t));
             fh->format->type = format_get_type (contenttype);
             free (contenttype);
-            fh->format->mount = fh->finfo.mount;
+            fh->format->mount = strdup (fh->finfo.mount);
             if (format_get_plugin (fh->format, NULL) < 0)
             {
                 avl_tree_unlock (fh_cache);
@@ -700,10 +701,7 @@
                     {
                         int len = 8192;
                         if (fh->finfo.flags & FS_FALLBACK)
-                        {
-                            len = 1400;
                             client->ops = &throttled_file_content_ops;
-                        }
                         else
                             client->ops = &file_content_ops;
                         refbuf_release (client->refbuf);
@@ -838,13 +836,13 @@
         return 0;
     }
     if (client->flags & CLIENT_WANTS_FLV) /* increase limit for flv clients as wrapping takes more space */
-        limit = (unsigned long)(limit * 1.02);
+        limit = (unsigned long)(limit * 1.05);
     if (secs)
         rate = (client->counter+1400)/secs;
     thread_mutex_lock (&fh->lock);
     if (rate > limit)
     {
-        client->schedule_ms += (1000*(rate - limit))/limit;
+        client->schedule_ms += 1000/(limit/1400);
         rate_add (fh->format->out_bitrate, 0, worker->time_ms);
         thread_mutex_unlock (&fh->lock);
         global_add_bitrates (global.out_bitrate, 0, worker->time_ms);
@@ -858,14 +856,22 @@
     }
     if (client->pos == refbuf->len)
     {
-        if (read_file (client, 1400) == 0)
+        //DEBUG1 ("reading another block from offset %ld", client->intro_offset);
+        int ret = format_file_read (client, fh->format, fh->fp);
+
+        switch (ret)
         {
-            /* loop fallback file  */
-            thread_mutex_unlock (&fh->lock);
-            client->intro_offset = 0;
-            client->pos = refbuf->len = 0;
-            client->schedule_ms += 150;
-            return 0;
+            case -1: /* loop fallback file  */
+                thread_mutex_unlock (&fh->lock);
+                // DEBUG0 ("loop of file triggered");
+                client->intro_offset = 0;
+                client->schedule_ms += 150;
+                return 0;
+            case -2: /* non-recoverable */
+                thread_mutex_unlock (&fh->lock);
+                // DEBUG0 ("major failure on read, better leave");
+                return -1;
+            default:  ;
         }
         client->pos = 0;
     }
@@ -911,6 +917,7 @@
             if (client->connection.sent_bytes == 0)
                 client->timer_start -= 2;
             client->counter = 0;
+            client->intro_offset = 0;
             fh->stats_update = client->timer_start + 5;
             fh->format->out_bitrate = rate_setup (10000, 1000);
             global_reduce_bitrate_sampling (global.out_bitrate);
@@ -924,7 +931,6 @@
 
     client->ops = &buffer_content_ops;
     client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
-    client->intro_offset = 0;
     if (client->flags & CLIENT_ACTIVE)
         client->schedule_ms = client->worker->time_ms;
     else

Modified: icecast/branches/kh/icecast/src/mpeg.c
===================================================================
--- icecast/branches/kh/icecast/src/mpeg.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/mpeg.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -34,6 +34,12 @@
     -1, 1, 2, 3, 4, 5, 6, 8, -1, -1, -1, -1, -1, -1, -1, -1
 };
 
+int mpeg_samplerates [4][4] = {
+    { 11025, 0, 22050, 44100},
+    { 12000, 0, 24000, 48000 },
+    {  8000, 0, 16000, 32000 },
+    { 0,0,0 } };
+
 #define LAYER_1     3
 #define LAYER_2     2
 #define LAYER_3     1
@@ -70,6 +76,12 @@
     return frame_len;
 }
 
+static int get_mpegframe_samplerate (unsigned char *p)
+{
+    int ver = (p[1] & 0x18) >> 3;
+    return mpeg_samplerates [(p[2]&0xC) >> 2][ver];
+}
+
 static int get_mpeg_bitrate (struct mpeg_sync *mp, unsigned char *p)
 {
     int bitrate = -1;
@@ -124,11 +136,11 @@
         bitrate *= 1000;
         if (mp->layer == LAYER_1)
         {
-            frame_len = (int)(12 * bitrate / mp->samplerate + padding) * 4; // ??
+            frame_len = (int)(12 * bitrate / get_mpegframe_samplerate(p) + padding) * 4; // ??
         }
         else
         {
-            frame_len = (int)(samples / 8 * bitrate / mp->samplerate + padding);
+            frame_len = (int)(samples / 8 * bitrate / get_mpegframe_samplerate(p) + padding);
         }
     }
     return frame_len;
@@ -152,7 +164,16 @@
     return frame_len;
 }
 
+static int handle_ts_frame (struct mpeg_sync *mp, unsigned char *p, int remaining)
+{
+    int frame_len = mp->raw_offset;
 
+    if (remaining - frame_len < 0)
+        return 0;
+    return frame_len;
+}
+
+
 /* return -1 for no valid frame at this specified address, 0 for more data needed */
 static int check_for_aac (struct mpeg_sync *mp, unsigned char *p, unsigned remaining)
 {
@@ -210,15 +231,10 @@
         {
             int checking = mp->check_numframes;
             unsigned char *fh = p;
-            int samplerates [4][4] = {
-                { 11025, 0, 22050, 44100},
-                { 12000, 0, 24000, 48000 },
-                {  8000, 0, 16000, 32000 },
-                { 0,0,0 } };
             char stream_type[20];
 
             // au.crc = (p[1] & 0x1) == 0;
-            mp->samplerate = samplerates [(p[2]&0xC) >> 2][mp->ver];
+            mp->samplerate = get_mpegframe_samplerate (p);
             if (mp->samplerate == 0)
                 return -1;
             while (checking)
@@ -259,29 +275,70 @@
     return -1;
 }
 
+static int check_for_ts (struct mpeg_sync *mp, unsigned char *p, unsigned remaining)
+{
+    int pkt_len = 188, checking;
+    do
+    {
+        int offset = 0;
+        checking = 4;
+        while (checking)
+        {
+            if (offset > remaining) return 0;
+            if (p [offset] != 0x47)
+            {
+                switch (pkt_len) {
+                    case 204: pkt_len = 208; break;
+                    case 188: pkt_len = 204; break;
+                    default:  return -1;
+                }
+                break;
+            }
+            //DEBUG2 ("found 0x37 checking %d (%d)", checking, pkt_len);
+            offset += pkt_len;
+            checking--;
+        }
+    } while (checking);
+    INFO2 ("detected TS (%d) on %s", pkt_len, mp->mount);
+    mp->process_frame = handle_ts_frame;
+    mp->syncbytes = 1;
+    mp->fixed_headerbits [0] = 0x47;
+    mp->raw_offset = pkt_len;
+    return 1;
+}
+
+
 /* return -1 for no valid frame at this specified address, 0 for more data needed */
 static int get_initial_frame (struct mpeg_sync *mp, unsigned char *p, unsigned remaining)
 {
     int ret;
 
+    if (p[0] == 0x47)
+        return check_for_ts (mp, p, remaining);
     if (p[1] < 0xE0)
         return -1;
     mp->layer = (p[1] & 0x6) >> 1;
     ret = check_for_aac (mp, p, remaining);
     if (ret < 0)
-    {
         ret = check_for_mp3 (mp, p, remaining);
-        if (ret < 0)
-            DEBUG0 ("neither aac or mp3");
-    }
+    if (ret > 0) mp->resync_count = 0;
     return ret;
 }
 
 /* return number from 1 to remaining */
-static int find_align_sync (unsigned char *start, int remaining)
+static int find_align_sync (mpeg_sync *mp, unsigned char *start, int remaining)
 {
     int skip = 0;
-    unsigned char *p = memchr (start, 255, remaining);
+    unsigned char *p = start;
+    if (mp->syncbytes)
+        p = memchr (start, mp->fixed_headerbits[0], remaining);
+    else
+    {
+        int offset = remaining;
+        for (; offset && *p != 0xFF && *p != 0x47; offset--)
+            p++;
+        if (offset == 0) p = NULL;
+    }
     if (p)
     {
         skip = p - start;
@@ -290,7 +347,14 @@
     return skip;
 }
 
+static int is_sync_byte (mpeg_sync *mp, unsigned char *p)
+{
+    if (mp->syncbytes)
+        return *p == mp->fixed_headerbits[0];
+    return *p == 0xFF || *p == 0x47;
+}
 
+
 int mpeg_complete_frames (mpeg_sync *mp, refbuf_t *new_block, unsigned offset)
 {
     unsigned char *start, *end;
@@ -299,6 +363,8 @@
     if (mp == NULL)
         return 0;  /* leave as-is */
     
+    if (mp->resync_count > 30)
+        return -1;
     mp->sample_count = 0;
     if (mp->surplus)
     {
@@ -314,7 +380,8 @@
     }
     start = (unsigned char *)new_block->data + offset;
     remaining = new_block->len - offset;
-    mp->raw_offset = 0;
+    if (mp->raw)
+        mp->raw_offset = 0;
     while (1)
     {
         end = (unsigned char*)new_block->data + new_block->len;
@@ -322,13 +389,13 @@
         //DEBUG2 ("block size %d, remaining now %d", new_block->len, remaining);
         if (remaining < 10) /* make sure we have some bytes to check */
             break;
-        if (*start != 255)
+        if (!is_sync_byte (mp, start))
         {
             // need to resync
-            int ret = find_align_sync (start, remaining);
+            int ret = find_align_sync (mp, start, remaining);
             if (ret == 0)
                 break; // no sync in the rest, so dump it
-            DEBUG1 ("no frame sync, re-checking after skipping %d", ret);
+            DEBUG2 ("no frame sync, re-checking after skipping %d (%d)", ret, remaining);
             new_block->len -= ret;
             mp->resync_count++;
             mp->syncbytes = 0; /* force an initial recheck */

Modified: icecast/branches/kh/icecast/src/refbuf.c
===================================================================
--- icecast/branches/kh/icecast/src/refbuf.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/refbuf.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -68,6 +68,22 @@
     self->_count++;
 }
 
+refbuf_t *refbuf_copy(refbuf_t *orig)
+{
+    refbuf_t *ret = refbuf_new (orig->len), *ref = ret;
+    memcpy (ref->data, orig->data, orig->len);
+    orig = orig->associated;
+    while (orig)
+    {
+        ref->associated = refbuf_new (orig->len);
+        ref = ref->associated;
+        memcpy (ref->data, orig->data, orig->len);
+        orig = orig->associated;
+    }
+    return ret;
+}
+
+
 static void refbuf_release_associated (refbuf_t *ref)
 {
     if (ref == NULL)

Modified: icecast/branches/kh/icecast/src/refbuf.h
===================================================================
--- icecast/branches/kh/icecast/src/refbuf.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/refbuf.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -37,7 +37,9 @@
 refbuf_t *refbuf_new(unsigned int size);
 void refbuf_addref(refbuf_t *self);
 void refbuf_release(refbuf_t *self);
+refbuf_t *refbuf_copy(refbuf_t *orig);
 
+
 #define PER_CLIENT_REFBUF_SIZE  4096
 
 #define WRITE_BLOCK_GENERIC     01000

Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/slave.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -498,7 +498,6 @@
     relay_server *relay;
     source_t *src;
     int failed = 1, sources;
-    time_t start = time (NULL);
 
     global_lock();
     sources = ++global.sources;
@@ -543,19 +542,6 @@
         INFO2 ("listener count remaining on %s is %d", src->mount, src->listeners);
         src->flags &= ~SOURCE_PAUSE_LISTENERS;
         thread_mutex_unlock (&src->lock);
-        relay->start = (time_t)(client->schedule_ms/1000);
-        if (relay->on_demand)
-            relay->start += 3;
-        else
-        {
-            if (relay->running && start - relay->start < 300)
-            {
-                INFO1 ("relay %s terminated too quickly, will restart in 30s", relay->localmount);
-                relay->start += 30;
-            }
-            else
-                relay->start += relay->interval;
-        }
         global_lock();
         global.sources--;
         stats_event_args (NULL, "sources", "%d", global.sources);
@@ -572,58 +558,43 @@
 }
 
 
-static void check_relay_stream (relay_server *relay)
+int relay_install (relay_server *relay)
 {
-    source_t *source = relay->source;
-    if (source && source->client)
-        return;
+    client_t *client;
+    source_t *source = source_reserve (relay->localmount);
+
     if (source == NULL)
     {
-        if (relay->localmount[0] != '/')
-        {
-            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping", relay->localmount);
-            return;
-        }
-        /* new relay, reserve the name */
-        source = source_reserve (relay->localmount);
-        if (source == NULL)
-        {
-            if (relay->start == 0)
-            {
-                WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
-                relay->start = 1;
-            }
-            return;
-        }
-        relay->source = source;
-        INFO1("Adding new relay at mountpoint \"%s\"", relay->localmount);
-        stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
+        WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
+        return -1;
     }
-    if (source->client == NULL)
+    relay->source = source;
+    stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
+
+    global_lock();
+    client = source->client = client_create (SOCK_ERROR);
+    global_unlock();
+    client->shared_data = relay;
+    client->ops = &relay_startup_ops;
+
+    if (relay->on_demand)
     {
-        client_t *client;
-        global_lock();
-        client = client_create (SOCK_ERROR);
-        global_unlock();
-        source->client = client;
-        client->shared_data = relay;
-        client->ops = &relay_startup_ops;
-        if (relay->on_demand)
-        {
-            ice_config_t *config;
-            mount_proxy *mountinfo;
-            thread_mutex_lock (&source->lock);
-            config = config_get_config();
-            mountinfo = config_find_mount (config, source->mount);
-            source->flags |= SOURCE_ON_DEMAND;
-            source_update_settings (config, source, mountinfo);
-            thread_mutex_unlock (&source->lock);
-            config_release_config();
-        }
-        client->flags |= CLIENT_ACTIVE;
-        DEBUG1 ("adding relay client for %s", relay->localmount);
-        client_add_worker (client);
+        ice_config_t *config;
+        mount_proxy *mountinfo;
+
+        thread_mutex_lock (&source->lock);
+        config = config_get_config();
+        mountinfo = config_find_mount (config, source->mount);
+        source->flags |= SOURCE_ON_DEMAND;
+        source_update_settings (config, source, mountinfo);
+        thread_mutex_unlock (&source->lock);
+        config_release_config();
     }
+    client->flags |= CLIENT_ACTIVE;
+    DEBUG1 ("adding relay client for %s", relay->localmount);
+    client_add_worker (client);
+
+    return 0;
 }
 
 
@@ -713,6 +684,7 @@
             existing_relay = relay_copy (relay);
             existing_relay->next = new_list;
             new_list = existing_relay;
+            relay_install (existing_relay);
         }
         relay = relay->next;
     }
@@ -722,61 +694,43 @@
 
 /* update the relay_list with entries from new_relay_list. Any new relays
  * are added to the list, and any not listed in the provided new_relay_list
- * are separated and returned in a separate list
+ * are shutdown
  */
-static relay_server *
-update_relays (relay_server **relay_list, relay_server *new_relay_list)
+static void update_relays (relay_server **relay_list, relay_server *new_relay_list)
 {
-    relay_server *active_relays, *cleanup_relays;
+    relay_server *active_relays, *cleanup_relays = new_relay_list;
 
-    active_relays = update_relay_set (relay_list, new_relay_list);
-
-    cleanup_relays = *relay_list;
-    /* re-assign new set */
-    *relay_list = active_relays;
-
-    return cleanup_relays;
-}
-
-
-static void relay_check_streams (relay_server *to_start,
-        relay_server *to_free, int skip_timer)
-{
-    relay_server *relay;
-
-    while (to_free)
+    thread_mutex_lock (&(config_locks()->relay_lock));
+    if (relay_list)
     {
-        relay_server *release = to_free;
-        to_free = release->next;
+        active_relays = update_relay_set (relay_list, new_relay_list);
+        cleanup_relays = *relay_list;
+    }
+    while (cleanup_relays)
+    {
+        relay_server *to_release = cleanup_relays;
+        source_t *source = to_release->source;
 
-        if (release->source && release->source->client)
+        cleanup_relays = to_release->next;
+        if (source && source->client) // use client to free up if available
         {
-            release->cleanup = 1;
-            release->start = 0;
-            release->source->client->schedule_ms = 0;
-            if (release->running)
+            to_release->cleanup = 1;
+            if (to_release->running)
             {
                 /* relay has been removed from xml/streamlist, shut down active relay */
-                INFO1 ("source shutdown request on \"%s\"", release->localmount);
-                release->running = 0;
-                release->source->flags &= ~SOURCE_RUNNING;
+                INFO1 ("source shutdown request on \"%s\"", to_release->localmount);
+                to_release->running = 0;
+                source->flags &= ~SOURCE_RUNNING;
             }
-            else
-                stats_event (release->localmount, NULL, NULL);
+            source->client->schedule_ms = 0;
             continue;
         }
-        if (release->cleanup == 0)
-            relay_free (release);
+        relay_free (to_release);
     }
-
-    relay = to_start;
-    while (relay)
-    {
-        if (skip_timer)
-            relay->start = 0;
-        check_relay_stream (relay);
-        relay = relay->next;
-    }
+    /* re-assign new set */
+    if (relay_list)
+        *relay_list = active_relays;
+    thread_mutex_unlock (&(config_locks()->relay_lock));
 }
 
 
@@ -823,14 +777,9 @@
     {
         int respcode = 0;
         if (sscanf (ptr, "HTTP%*s %d OK", &respcode) == 1 && respcode == 200)
-        {
             master->ok = 1;  // needed if resetting master relays ???
-        }
         else
-        {
             WARN1 ("Failed response from master \"%s\"", (char*)ptr);
-            return -1;
-        }
     }
     return passed_len;
 }
@@ -846,6 +795,8 @@
     size_t len = passed_len + master->previous + 1;
     char *buffer, *buf;
 
+    if (master->ok == 0)
+        return passed_len;
     /* append newly read data to the end of any previous unprocess data */
     buffer = realloc (master->buffer, len);
     memcpy (buffer + master->previous, ptr, passed_len);
@@ -918,8 +869,7 @@
     struct master_conn_details *master = arg;
     CURL *handle;
     const char *protocol = "http";
-    int port = master->port, have_new_list = 1;
-    relay_server *cleanup_relays;
+    int port = master->port;
     char error [CURL_ERROR_SIZE];
     char url [1024], auth [100];
 
@@ -947,6 +897,7 @@
     if (master->bind)
         curl_easy_setopt (handle, CURLOPT_INTERFACE, master->bind);
 
+    master->ok = 0;
     if (curl_easy_perform (handle) != 0)
     {
         /* fall back to traditional request */
@@ -954,23 +905,12 @@
                 protocol, master->server, port, master->args);
         curl_easy_setopt (handle, CURLOPT_URL, url);
         if (curl_easy_perform (handle) != 0)
-        {
             WARN2 ("Failed URL access \"%s\" (%s)", url, error);
-            have_new_list = 0;
-        }
     }
-    if (have_new_list)
-    {
-        /* merge retrieved relays */
-        thread_mutex_lock (&(config_locks()->relay_lock));
-        cleanup_relays = update_relays (&global.master_relays, master->new_relays);
+    if (master->ok)     /* merge retrieved relays */
+        update_relays (&global.master_relays, master->new_relays);
+    update_relays (NULL, master->new_relays);
 
-        relay_check_streams (global.master_relays, cleanup_relays, 1);
-
-        thread_mutex_unlock (&(config_locks()->relay_lock));
-    }
-    relay_check_streams (NULL, master->new_relays, 0);
-
     curl_easy_cleanup (handle);
     free (master->server);
     free (master->username);
@@ -1069,8 +1009,6 @@
 
     while (1)
     {
-        relay_server *cleanup_relays = NULL;
-        int skip_timer = 0;
         struct timespec current;
 
         thread_get_timespec (&current);
@@ -1088,30 +1026,18 @@
 
         if (streamlist_check <= current.tv_sec)
         {
-            ice_config_t *config;
+            ice_config_t *config = config_get_config();
 
-            if (streamlist_check == 0)
-                skip_timer = 1;
-
-            thread_mutex_lock (&(config_locks()->relay_lock));
-            config = config_get_config();
-
             streamlist_check = current.tv_sec + config->master_update_interval;
             update_master_as_slave (config);
 
             update_from_master (config);
 
-            cleanup_relays = update_relays (&global.relays, config->relay);
+            update_relays (&global.relays, config->relay);
 
             config_release_config();
         }
-        else
-            thread_mutex_lock (&(config_locks()->relay_lock));
 
-        relay_check_streams (global.relays, cleanup_relays, skip_timer);
-        relay_check_streams (global.master_relays, NULL, skip_timer);
-        thread_mutex_unlock (&(config_locks()->relay_lock));
-
         if (update_settings)
         {
             source_recheck_mounts (update_all_mounts);
@@ -1129,8 +1055,8 @@
     connection_thread_shutdown();
     fserve_running = 0;
     INFO0 ("shutting down current relays");
-    relay_check_streams (NULL, global.relays, 0);
-    relay_check_streams (NULL, global.master_relays, 0);
+    update_relays (&global.relays, NULL);
+    update_relays (&global.master_relays, NULL);
     global.relays = NULL;
     global.master_relays = NULL;
     redirector_clearall();
@@ -1254,12 +1180,15 @@
     if (relay && relay->new_details)
     {
         relay_server *old_details = relay;
+
+        thread_mutex_lock (&(config_locks()->relay_lock));
         INFO1 ("Detected change in relay details for %s", relay->localmount);
         client->shared_data = relay->new_details;
+        relay = client->shared_data;
         relay->source = old_details->source;
         old_details->source = NULL;
-        relay_free (relay);
-        relay = client->shared_data;
+        relay_free (old_details);
+        thread_mutex_unlock (&(config_locks()->relay_lock));
     }
     return relay;
 }
@@ -1269,14 +1198,13 @@
 {
     relay_server *relay = get_relay_details (client);
     source_t *source = relay->source;
-    int ret = -1;
 
     thread_mutex_lock (&source->lock);
     if (source_running (source))
     {
-        if (relay->cleanup)
+        if (relay->cleanup || relay->running == 0)
             source->flags &= ~SOURCE_RUNNING;
-        if (relay->on_demand && source->listeners == 0)
+        if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 5000000)
             source->flags &= ~SOURCE_RUNNING;
         return source_read (source);
     }
@@ -1292,14 +1220,12 @@
             stats_event_args (NULL, "sources", "%d", global.sources);
             global_unlock();
             global_reduce_bitrate_sampling (global.out_bitrate);
-            connection_close (&client->connection);
         }
         /* don't pause listeners if relay shutting down */
         if (relay->running == 0)
             source->flags &= ~SOURCE_PAUSE_LISTENERS;
         // fallback listeners unless relay is to be retried
         source_shutdown (source, fallback);
-        source->flags |= SOURCE_TERMINATING;
     }
     if (source->termination_count && source->termination_count <= source->listeners)
     {
@@ -1309,15 +1235,11 @@
         return 0;
     }
     DEBUG1 ("all listeners have now been checked on %s", relay->localmount);
-    source->flags &= ~SOURCE_TERMINATING;
-    if (relay->running)
+    free (source->fallback.mount);
+    source->fallback.mount = NULL;
+    source->flags &= ~(SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC);
+    if (relay->cleanup)
     {
-        INFO1 ("standing by to restart relay on %s", relay->localmount);
-        thread_mutex_unlock (&source->lock);
-        ret = 0;
-    }
-    else
-    {
         if (source->listeners)
         {
             INFO1 ("listeners on terminating relay %s, rechecking", relay->localmount);
@@ -1329,9 +1251,26 @@
         thread_mutex_unlock (&source->lock);
         stats_event (relay->localmount, NULL, NULL); // needed???
         slave_update_all_mounts();
+        connection_close (&client->connection);
+        return -1;
     }
+    if (relay->running)
+    {
+        if (client->connection.con_time == 0)
+        {
+            client->schedule_ms = client->worker->time_ms + (relay->interval * 1000);
+            INFO2 ("standing by to restart relay on %s in %d seconds", relay->localmount, relay->interval);
+        }
+        else
+            INFO1 ("standing by to restart relay on %s", relay->localmount);
+    }
+    else
+        INFO1 ("Relay %s is now disabled", relay->localmount);
+
+    thread_mutex_unlock (&source->lock);
+    connection_close (&client->connection);
     client->ops = &relay_startup_ops;
-    return ret;
+    return 0;
 }
 
 
@@ -1358,7 +1297,7 @@
     }
     if (global.running != ICE_RUNNING)
         return 0; /* wait for cleanup */
-    if (relay->running == 0 || relay->start > worker->current_time.tv_sec)
+    if (relay->running == 0)
     {
         client->schedule_ms = client->worker->time_ms + 1000;
         return 0;

Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/source.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -70,6 +70,7 @@
 static int  send_to_listener (client_t *client);
 static int  send_listener (source_t *source, client_t *client);
 static int  wait_for_restart (client_t *client);
+static int  wait_for_other_listeners (client_t *client);
 
 static int  http_source_listener (client_t *client);
 static int  http_source_intro (client_t *client);
@@ -108,6 +109,12 @@
     client_destroy
 };
 
+struct _client_functions listener_wait_ops = 
+{
+    wait_for_other_listeners,
+    NULL
+};
+
 struct _client_functions source_client_http_ops =
 {
     source_client_http_send,
@@ -308,19 +315,20 @@
         p = to_go->next;
         to_go->next = NULL;
         // DEBUG1 ("queue refbuf count is %d", to_go->_count);
-        if (do_twice || to_go == source->burst_point)
+        if (do_twice || to_go == source->min_queue_point)
         { /* burst data is also counted */
             refbuf_release (to_go); 
             do_twice = 1;
         }
         refbuf_release (to_go);
     }
-    source->burst_point = NULL;
+    source->min_queue_point = NULL;
     source->stream_data = NULL;
     source->stream_data_tail = NULL;
 
-    source->burst_size = 0;
-    source->burst_offset = 0;
+    source->min_queue_size = 0;
+    source->min_queue_offset = 0;
+    source->default_burst_size = 0;
     source->queue_size = 0;
     source->queue_size_limit = 0;
     source->client_stats_update = 0;
@@ -440,7 +448,7 @@
             DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
             free (source->fallback.mount);
             source->fallback.mount = NULL;
-            source->flags &= ~SOURCE_TEMPORARY_FALLBACK;
+            source->flags &= ~SOURCE_LISTENERS_SYNC;
         }
         if (source->listeners == 0)
             rate_add (source->format->out_bitrate, 0, client->worker->time_ms);
@@ -521,8 +529,8 @@
                 if (source->stream_data == NULL)
                 {
                     source->stream_data = refbuf;
-                    source->burst_point = refbuf;
-                    source->burst_offset = 0;
+                    source->min_queue_point = refbuf;
+                    source->min_queue_offset = 0;
                 }
                 if (source->stream_data_tail)
                 {
@@ -536,17 +544,21 @@
                 refbuf_addref (refbuf);
 
                 /* move the starting point for new listeners */
-                source->burst_offset += refbuf->len;
-                while (source->burst_offset > source->burst_size)
+                source->min_queue_offset += refbuf->len;
+                while (source->min_queue_offset > source->min_queue_size)
                 {
-                    refbuf_t *to_release = source->burst_point;
+                    refbuf_t *to_release = source->min_queue_point;
                     if (to_release && to_release->next)
                     {
-                        source->burst_offset -= to_release->len;
-                        source->burst_point = to_release->next;
+                        source->min_queue_offset -= to_release->len;
+                        source->min_queue_point = to_release->next;
                         refbuf_release (to_release);
                         continue;
                     }
+                    DEBUG0 ("weird state of min_queue point");
+                    refbuf_release (source->min_queue_point);
+                    source->min_queue_point = refbuf;
+                    source->min_queue_offset = refbuf->len;
                     break;
                 }
 
@@ -607,10 +619,8 @@
     else
     {
         if ((source->flags & SOURCE_TERMINATING) == 0)
-        {
             source_shutdown (source, 1);
-            source->flags |= SOURCE_TERMINATING;
-        }
+
         if (source->termination_count && source->termination_count <= source->listeners)
         {
             DEBUG3 ("%s waiting (%lu, %lu)", source->mount, source->termination_count, source->listeners);
@@ -623,6 +633,7 @@
             client->ops = &source_client_halt_ops;
             free (source->fallback.mount);
             source->fallback.mount = NULL;
+            source->flags &= ~SOURCE_LISTENERS_SYNC;
         }
         thread_mutex_unlock (&source->lock);
     }
@@ -636,10 +647,8 @@
     refbuf_t *refbuf;
 
     if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
-    {
-        client->schedule_ms += 150;
         return -1;
-    }
+
     refbuf = client->refbuf;
 
     /* move to the next buffer if we have finished with the current one */
@@ -660,7 +669,6 @@
 {
     refbuf_t *refbuf;
     long lag = 0;
-    int ret = -1, pos = 0;
 
     /* we only want to attempt a burst at connection time, not midstream
      * however streams like theora may not have the most recent page marked as
@@ -668,17 +676,25 @@
     if (source->stream_data_tail == NULL)
         return -1;
     refbuf = source->stream_data_tail;
-    if (client->intro_offset == -1 && (refbuf->flags & SOURCE_BLOCK_SYNC))
+    if (client->connection.sent_bytes > source->min_queue_offset && (refbuf->flags & SOURCE_BLOCK_SYNC))
     {
-        pos = 0;
         lag = refbuf->len;
     }
     else
     {
-        size_t size = client->intro_offset;
-        refbuf = source->burst_point;
-        lag = source->burst_offset;
-        while (size > 0 && refbuf && refbuf->next)
+        const char *header = httpp_getvar (client->parser, "initial-burst");
+        const char *arg = httpp_get_query_param (client->parser, "burst");
+        size_t size = source->min_queue_size;
+        off_t v = source->default_burst_size;
+        if (arg)
+            v = atol (arg);
+        else if (header)
+            v = atol (header);
+        v -= client->connection.sent_bytes; /* have we sent data already */
+        refbuf = source->min_queue_point;
+        lag = source->min_queue_offset;
+        // DEBUG3 ("size %lld, v %lld, lag %ld", size, v, lag);
+        while (size > v && refbuf && refbuf->next)
         {
             size -= refbuf->len;
             lag -= refbuf->len;
@@ -694,15 +710,15 @@
         {
             client_set_queue (client, refbuf);
             client->intro_offset = -1;
-            client->pos = pos;
+            client->pos = 0;
             client->queue_pos = source->client->queue_pos - lag;
-            ret = 0;
-            break;
+            return 0;
         }
         lag -= refbuf->len;
         refbuf = refbuf->next;
     }
-    return ret;
+    client->schedule_ms += 150;
+    return -1;
 }
 
 
@@ -710,15 +726,8 @@
 {
     source_t *source = client->shared_data;
 
-    if (client->refbuf == NULL && source->intro_file)
+    if (format_file_read (client, source->format, source->intro_file) < 0)
     {
-        client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-        client->pos = client->refbuf->len;
-        client->intro_offset = 0;
-        client->queue_pos = 0;
-    }
-    if (format_file_read (client, source->intro_file) < 0)
-    {
         if (source->stream_data_tail)
         {
             /* better find the right place in queue for this client */
@@ -740,7 +749,7 @@
     source_t *source = client->shared_data;
     int ret;
 
-    if (refbuf == NULL)
+    if (refbuf == NULL || client->respcode)
     {
         client->check_buffer = http_source_intro;
         return http_source_intro (client);
@@ -781,10 +790,12 @@
             client->refbuf = refbuf->next;
             refbuf->next = NULL;
             refbuf_release (refbuf);
+            if (client->refbuf == NULL)
+                client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
             client->pos = 0;
         }
         else
-            client->pos = refbuf->len = 4096;
+            client_set_queue (client, NULL);
         client->connection.sent_bytes = 0;
         return ret;
     }
@@ -803,11 +814,10 @@
         if (ref && client->pos < ref->len && ref->flags&SOURCE_QUEUE_BLOCK)
         {
             /* make a private copy so that a write can complete */
-            refbuf_t *orig = ref;
-            ref = refbuf_new (ref->len);
-            memcpy (ref->data, orig->data, orig->len);
+            refbuf_t *copy = refbuf_copy (client->refbuf);
+
             refbuf_release (client->refbuf);
-            client->refbuf = ref;
+            client->refbuf = copy;
             client->flags |= CLIENT_HAS_INTRO_CONTENT;
         }
         if ((client->flags & CLIENT_HAS_INTRO_CONTENT) == 0)
@@ -818,17 +828,43 @@
 }
 
 
+/* used to hold listeners in waiting over a relay restart. Handling of a failed relay also
+ * needs to occur.
+ */
 static int wait_for_restart (client_t *client)
 {
     source_t *source = client->shared_data;
 
-    if (source_running (source) || source->termination_count || client->connection.error)
+    if (source_running (source) || (source->flags & SOURCE_PAUSE_LISTENERS) == 0 || client->connection.error)
+    {
         client->ops = &listener_client_ops;
+        return 0;
+    }
+    if (source->flags & SOURCE_LISTENERS_SYNC)
+        client->schedule_ms = client->worker->time_ms + 100;
     else
+        client->schedule_ms = client->worker->time_ms + 300;
+    return 0;
+}
+
+
+/* used to hold listeners that have already been processed while other listeners
+ * are still to be done
+ */
+static int wait_for_other_listeners (client_t *client)
+{
+    source_t *source = client->shared_data;
+
+    if (source->flags & SOURCE_LISTENERS_SYNC)
+    {
         client->schedule_ms = client->worker->time_ms + 150;
+        return 0;
+    }
+    client->ops = &listener_client_ops;
     return 0;
 }
 
+
 /* general send routine per listener.
  */
 static int send_to_listener (client_t *client)
@@ -848,16 +884,11 @@
     return ret;
 }
 
-static int send_listener (source_t *source, client_t *client)
+
+int listener_waiting_on_source (source_t *source, client_t *client)
 {
-    int bytes;
-    int loop = 10;   /* max number of iterations in one go */
-    long total_written = 0;
-    int ret = 0;
-
-    if (client->connection.error)
-        return -1;
-
+    source->termination_count--;
+    //DEBUG2 ("termination count on %s now %lu", source->mount, source->termination_count);
     if (source->fallback.mount)
     {
         int move_failed;
@@ -866,17 +897,14 @@
         thread_mutex_unlock (&source->lock);
         move_failed = move_listener (client, &source->fallback);
         thread_mutex_lock (&source->lock);
-        if (move_failed)
-            source_setup_listener (source, client);
-        source->termination_count--;
-        return 0;
+        if (move_failed == 0)
+            return 0;
+        source_setup_listener (source, client);
     }
     if (source->flags & SOURCE_TERMINATING)
     {
-        DEBUG2 ("termination count on %s now %lu", source->mount, source->termination_count);
         if ((source->flags & SOURCE_PAUSE_LISTENERS) && global.running == ICE_RUNNING)
         {
-            source->termination_count--;
             if (client->refbuf && (client->refbuf->flags & SOURCE_QUEUE_BLOCK))
                 client_set_queue (client, NULL);
             client->ops = &listener_pause_ops;
@@ -886,6 +914,27 @@
         }
         return -1;
     }
+    /* wait for all source listeners to go through this */
+    // DEBUG1 ("listener now waiting for the other %d listeners", source->termination_count);
+    client->ops = &listener_wait_ops;
+    client->schedule_ms = client->worker->time_ms + 60;
+    return 0;
+}
+
+
+static int send_listener (source_t *source, client_t *client)
+{
+    int bytes;
+    int loop = 10;   /* max number of iterations in one go */
+    long total_written = 0;
+    int ret = 0;
+
+    if (client->connection.error)
+        return -1;
+
+    if (source->flags & SOURCE_LISTENERS_SYNC)
+        return listener_waiting_on_source (source, client);
+
     /* check for limited listener time */
     if (client->connection.discon_time &&
             client->worker->current_time.tv_sec >= client->connection.discon_time)
@@ -995,6 +1044,9 @@
     stats_event_flags (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
     stats_event_flags (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
     stats_event_flags (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
+    stats_event_flags (source->mount, "outgoing_kbitrate", "0", STATS_COUNTERS);
+    stats_event_flags (source->mount, "incoming_bitrate", "0", STATS_COUNTERS);
+    stats_event_flags (source->mount, "connected", "0", STATS_COUNTERS);
     stats_event (source->mount, "source_ip", source->client->connection.ip);
 
     source->last_read = time(NULL);
@@ -1020,6 +1072,7 @@
     source->format->in_bitrate = rate_setup (60, 1);
     source->format->out_bitrate = rate_setup (9000, 1000);
 
+    source->flags |= SOURCE_RUNNING;
     thread_mutex_unlock (&source->lock);
 
     mountinfo = config_find_mount (config_get_config(), source->mount);
@@ -1044,7 +1097,6 @@
     config_release_config();
 
     INFO1 ("Source %s initialised", source->mount);
-    source->flags |= SOURCE_RUNNING;
 
     /* on demand relays should of already called this */
     if ((source->flags & SOURCE_ON_DEMAND) == 0)
@@ -1068,6 +1120,7 @@
                 source->fallback.limit = 0;
                 source->fallback.mount = strdup (dest);
                 source->termination_count = source->listeners;
+                source->flags |= SOURCE_LISTENERS_SYNC;
             }
             thread_mutex_unlock (&source->lock);
         }
@@ -1111,6 +1164,7 @@
             source_set_fallback (source, mountinfo->fallback_mount);
     }
     config_release_config();
+    source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC);
 }
 
 
@@ -1356,11 +1410,16 @@
         source->timeout = mountinfo->source_timeout;
 
     if (mountinfo && mountinfo->burst_size >= 0)
-        source->burst_size = (unsigned int)mountinfo->burst_size;
+        source->default_burst_size = (unsigned int)mountinfo->burst_size;
 
-    if (source->burst_size > source->queue_size_limit - 50000)
-        source->queue_size_limit = source->burst_size + 50000;
+    if (mountinfo && mountinfo->min_queue_size >= 0)
+        source->min_queue_size = mountinfo->min_queue_size;
+    if (source->min_queue_size < source->default_burst_size)
+        source->min_queue_size = source->default_burst_size;
 
+    if (source->min_queue_size > source->queue_size_limit - 50000)
+        source->queue_size_limit = source->min_queue_size + 50000;
+
     source->wait_time = 0;
     if (mountinfo && mountinfo->wait_time)
         source->wait_time = (time_t)mountinfo->wait_time;
@@ -1374,8 +1433,9 @@
 {
     /* set global settings first */
     source->queue_size_limit = config->queue_size_limit;
+    source->min_queue_size = config->min_queue_size;
     source->timeout = config->source_timeout;
-    source->burst_size = config->burst_size;
+    source->default_burst_size = config->burst_size;
 
     stats_event_args (source->mount, "listenurl", "http://%s:%d%s",
             config->hostname, config->port, source->mount);
@@ -1421,7 +1481,8 @@
     }
     DEBUG1 ("public set to %d", source->yp_public);
     DEBUG1 ("queue size to %u", source->queue_size_limit);
-    DEBUG1 ("burst size to %u", source->burst_size);
+    DEBUG1 ("min queue size to %u", source->min_queue_size);
+    DEBUG1 ("burst size to %u", source->default_burst_size);
     DEBUG1 ("source timeout to %u", source->timeout);
 }
 
@@ -1777,7 +1838,7 @@
         if (mountinfo->max_listener_duration && client->connection.discon_time == 0)
             client->connection.discon_time = time(NULL) + mountinfo->max_listener_duration;
 
-        INFO3 ("max on %s is %ld (cur %lu)", source->mount,
+        INFO3 ("max on %s is %d (cur %lu)", source->mount,
                 mountinfo->max_listeners, source->listeners);
         within_limits = 1;
         if (mountinfo->max_bandwidth > -1 && stream_bitrate)
@@ -1825,6 +1886,7 @@
 
     source_setup_listener (source, client);
     client->flags |= CLIENT_ACTIVE;
+    worker_wakeup (client->worker);
     thread_mutex_unlock (&source->lock);
 
     return 0;
@@ -1950,7 +2012,7 @@
 
     thread_rwlock_rlock (&workers_lock);
     worker = find_least_busy_handler ();
-    if (worker != client->worker)
+    if (worker && worker != client->worker)
     {
         if (worker->count + source->listeners + 10 < client->worker->count)
         {

Modified: icecast/branches/kh/icecast/src/source.h
===================================================================
--- icecast/branches/kh/icecast/src/source.h	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/source.h	2010-12-03 15:56:14 UTC (rev 17711)
@@ -57,10 +57,11 @@
     int yp_public;
 
     /* per source burst handling for connecting clients */
-    unsigned int burst_size;    /* trigger level for burst on connect */
-    unsigned int burst_offset; 
-    refbuf_t *burst_point;
+    unsigned int default_burst_size;
 
+    refbuf_t *min_queue_point;
+    unsigned int min_queue_offset;
+    unsigned int min_queue_size;
     unsigned int queue_size;
     unsigned int queue_size_limit;
 
@@ -78,12 +79,12 @@
 
 } source_t;
 
-#define SOURCE_RUNNING              01
-#define SOURCE_ON_DEMAND            02
-#define SOURCE_PAUSE_LISTENERS      04
-#define SOURCE_SHOUTCAST_COMPAT     010
-#define SOURCE_TERMINATING          020
-#define SOURCE_TEMPORARY_FALLBACK   040
+#define SOURCE_RUNNING              1
+#define SOURCE_ON_DEMAND            (1<<1)
+#define SOURCE_SHOUTCAST_COMPAT     (1<<2)
+#define SOURCE_PAUSE_LISTENERS      (1<<3)
+#define SOURCE_TERMINATING          (1<<4)
+#define SOURCE_LISTENERS_SYNC       (1<<5)
 
 #define source_available(x)     (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && (x)->fallback.mount == NULL)
 #define source_running(x)       ((x)->flags & SOURCE_RUNNING)

Modified: icecast/branches/kh/icecast/src/stats.c
===================================================================
--- icecast/branches/kh/icecast/src/stats.c	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/src/stats.c	2010-12-03 15:56:14 UTC (rev 17711)
@@ -163,6 +163,7 @@
     stats_event_flags (NULL, "connections", "0", STATS_COUNTERS);
     stats_event_flags (NULL, "sources", "0", STATS_COUNTERS);
     stats_event_flags (NULL, "stats", "0", STATS_COUNTERS);
+    stats_event_flags (NULL, "banned_IPs", "0", STATS_COUNTERS);
     stats_event (NULL, "listeners", "0");
 
     /* global accumulating stats */
@@ -303,7 +304,7 @@
 void stats_event_inc(const char *source, const char *name)
 {
     stats_event_t event;
-    char buffer[VAL_BUFSIZE];
+    char buffer[VAL_BUFSIZE] = "1";
     build_event (&event, source, name, buffer);
     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
     event.action = STATS_EVENT_INC;
@@ -342,7 +343,7 @@
 void stats_event_dec(const char *source, const char *name)
 {
     stats_event_t event;
-    char buffer[VAL_BUFSIZE];
+    char buffer[VAL_BUFSIZE] = "0";
     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
     build_event (&event, source, name, buffer);
     event.action = STATS_EVENT_DEC;
@@ -509,7 +510,7 @@
 
         avl_insert(_stats.source_tree, (void *)snode);
     }
-    if (event->action == STATS_EVENT_REMOVE)
+    if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
     {
         avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
         avl_tree_unlock (_stats.source_tree);
@@ -986,7 +987,7 @@
     client_set_queue (client, NULL);
     client->refbuf = refbuf_new (100);
     snprintf (client->refbuf->data, 100,
-            "HTTP/1.0 200 OK\r\ncapability: streamlist\r\n\r\n");
+            "HTTP/1.0 200 OK\r\nCapability: streamlist\r\n\r\n");
     client->refbuf->len = strlen (client->refbuf->data);
     listener->content_len = client->refbuf->len;
     listener->recent_block = client->refbuf;

Added: icecast/branches/kh/icecast/web/7.xsl
===================================================================
--- icecast/branches/kh/icecast/web/7.xsl	                        (rev 0)
+++ icecast/branches/kh/icecast/web/7.xsl	2010-12-03 15:56:14 UTC (rev 17711)
@@ -0,0 +1,12 @@
+<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
+<xsl:output method="html" version="1.0" encoding="iso-8859-1" indent="yes"/>
+<xsl:template match = "/icestats" >
+
+	<xsl:for-each select="source">
+	   <xsl:if test="position()=1">
+		<xsl:value-of select="listeners" />,1,<xsl:value-of select="listener_peak" />,<xsl:value-of select="max_listeners" />,<xsl:value-of select="listeners" />,<xsl:value-of select="bitrate" />,<xsl:if test="artist"><xsl:value-of select="artist" /> - </xsl:if><xsl:value-of select="title" />
+	   </xsl:if>
+	</xsl:for-each>
+
+</xsl:template>
+</xsl:stylesheet>
\ No newline at end of file

Modified: icecast/branches/kh/icecast/win32/icecast2.iss
===================================================================
--- icecast/branches/kh/icecast/win32/icecast2.iss	2010-12-02 22:07:36 UTC (rev 17710)
+++ icecast/branches/kh/icecast/win32/icecast2.iss	2010-12-03 15:56:14 UTC (rev 17711)
@@ -3,7 +3,7 @@
 
 [Setup]
 AppName=Icecast2-KH
-AppVerName=Icecast v2.3.2-kh27
+AppVerName=Icecast v2.3.2-kh28
 AppPublisherURL=http://www.icecast.org
 AppSupportURL=http://www.icecast.org
 AppUpdatesURL=http://www.icecast.org
@@ -13,7 +13,7 @@
 LicenseFile=..\COPYING
 InfoAfterFile=..\README
 OutputDir=.
-OutputBaseFilename=icecast2_win32_v2.3.2-kh27_setup
+OutputBaseFilename=icecast2_win32_v2.3.2-kh28_setup
 WizardImageFile=icecast2logo2.bmp
 WizardImageStretch=no
 VersionInfoVersion=2.3.2



More information about the commits mailing list