[xiph-commits] r16219 - in icecast/branches/kh/icecast: . admin src win32

karl at svn.xiph.org karl at svn.xiph.org
Tue Jul 7 16:26:21 PDT 2009


Author: karl
Date: 2009-07-07 16:26:21 -0700 (Tue, 07 Jul 2009)
New Revision: 16219

Modified:
   icecast/branches/kh/icecast/NEWS
   icecast/branches/kh/icecast/admin/moveclients.xsl
   icecast/branches/kh/icecast/admin/stats.xsl
   icecast/branches/kh/icecast/config.h.vc6
   icecast/branches/kh/icecast/configure.in
   icecast/branches/kh/icecast/src/admin.c
   icecast/branches/kh/icecast/src/auth.c
   icecast/branches/kh/icecast/src/auth.h
   icecast/branches/kh/icecast/src/cfgfile.c
   icecast/branches/kh/icecast/src/cfgfile.h
   icecast/branches/kh/icecast/src/client.c
   icecast/branches/kh/icecast/src/client.h
   icecast/branches/kh/icecast/src/connection.c
   icecast/branches/kh/icecast/src/event.c
   icecast/branches/kh/icecast/src/format.c
   icecast/branches/kh/icecast/src/format.h
   icecast/branches/kh/icecast/src/format_mp3.c
   icecast/branches/kh/icecast/src/format_ogg.c
   icecast/branches/kh/icecast/src/fserve.c
   icecast/branches/kh/icecast/src/fserve.h
   icecast/branches/kh/icecast/src/global.c
   icecast/branches/kh/icecast/src/global.h
   icecast/branches/kh/icecast/src/main.c
   icecast/branches/kh/icecast/src/slave.c
   icecast/branches/kh/icecast/src/slave.h
   icecast/branches/kh/icecast/src/source.c
   icecast/branches/kh/icecast/src/source.h
   icecast/branches/kh/icecast/src/stats.c
   icecast/branches/kh/icecast/src/xslt.c
   icecast/branches/kh/icecast/win32/icecast.dsp
   icecast/branches/kh/icecast/win32/icecast2.iss
Log:
commit kh10 changes. Fairly major as it changes the threading. We now use N
workers (as specified in the xml) instead of the one thread per source model.
This affects most files as they tend to interact with a client to some degree
but some of those are in minor ways.

The most significant changes are in :-
- file serving engine. There is no separate thread now, so the client reads
and sends are driven from the worker, this includes the error responses.
- The source client and listener processing. The source client details have
to stay around until the listeners are processed.
- slave engine. relays are always allocated a client on the worker, with a
thread created just for the connection phase. Makes it easier to handle when
switching to another relay source.



Modified: icecast/branches/kh/icecast/NEWS
===================================================================
--- icecast/branches/kh/icecast/NEWS	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/NEWS	2009-07-07 23:26:21 UTC (rev 16219)
@@ -15,6 +15,34 @@
 any extra tags are show in the conf/icecast.xml.dist file
 
 
+2.3.2-kh10
+. big internal thread change. instead of one thread per stream, we now have a fixed number
+  of worker threads, each processing a set of clients (sources/relays/listeners etc).
+  - no source or fserve threads now. auth still has thread(s) as the API for libcurl
+    is blocking
+  - worker threads with no clients sleep for long durations.
+  - defaults to 1 worker thread, overridden by <workers> in <limits>
+  - listeners get moved to the worker thread running the source, helps caching
+  - sources get moved to a less busy thread if it would help
+  - each client has a timestamp to indicate when it wants service. The sleep duration
+    for the worker is based on the next client to be serviced.
+. if fallback to file is specified and the requested mount is not active then a 404
+  response is returned instead of the file contents.
+. fserve engine can handle bandwidth limiting per client. Fallback to file uses this
+  by using the average incoming bitrate of the source stream as the target limit.
+. file serving uses an internal cache of open files, so that 1000 listerners do not
+  suddenly open 1000 files if a fallback to file occurs.
+. move listeners admin request can specify another source or file.
+. relay changes
+  - the client structure is always allocated now, instead of when the relay is started.
+  - listeners stay on relay if switching to another relay master. Only when they all
+    have failed does the fallback apply.
+. post-kh9 fixes
+  - fix for possible race with rejected new listeners going through auth
+  - make mp3-metadata-interval 0 work again
+  - auth htpasswd checks for details provided like user:pass at host:port/mount and if not
+    then checks as host:port/mount?user=a&pass=b
+
 2.3.2-kh9
 . allow shoutcast source client auth work via stream_auth url
 . allow for a flash policy file.

Modified: icecast/branches/kh/icecast/admin/moveclients.xsl
===================================================================
--- icecast/branches/kh/icecast/admin/moveclients.xsl	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/admin/moveclients.xsl	2009-07-07 23:26:21 UTC (rev 16219)
@@ -23,8 +23,8 @@
 <xsl:for-each select="source">
 	<table border="0" cellpadding="6" cellspacing="5" >
 	<tr>        
-		<td>Move from (<xsl:copy-of select="$currentmount" />) to (<xsl:value-of select="@mount" />)</td>
-		<td><xsl:value-of select="listeners" /> Listeners</td>
+		<td>Move from <xsl:copy-of select="$currentmount" /> to <xsl:value-of select="@mount" /></td>
+		<td>(<xsl:value-of select="listeners" /> Listeners)</td>
 		<td><a class="nav2" href="moveclients.xsl?mount={$currentmount}&amp;destination={@mount}">Move Clients</a></td>
 	</tr>        
 	</table>

Modified: icecast/branches/kh/icecast/admin/stats.xsl
===================================================================
--- icecast/branches/kh/icecast/admin/stats.xsl	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/admin/stats.xsl	2009-07-07 23:26:21 UTC (rev 16219)
@@ -71,7 +71,7 @@
 	<tr>        
 	    <td align="center">
 		    <a class="nav2" href="listclients.xsl?mount={@mount}">List Clients</a>
-        	<a class="nav2" href="moveclients.xsl?mount={@mount}">Move MountPoints</a>
+        	<a class="nav2" href="moveclients.xsl?mount={@mount}">Move Listeners</a>
         	<a class="nav2" href="updatemetadata.xsl?mount={@mount}">Update Metadata</a>
         	<a class="nav2" href="killsource.xsl?mount={@mount}">Kill Source</a>
                 <xsl:if test="authenticator"><a class="nav2" href="manageauth.xsl?mount={@mount}">Manage Authentication</a></xsl:if>

Modified: icecast/branches/kh/icecast/config.h.vc6
===================================================================
--- icecast/branches/kh/icecast/config.h.vc6	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/config.h.vc6	2009-07-07 23:26:21 UTC (rev 16219)
@@ -95,7 +95,7 @@
 #define PACKAGE_NAME "Icecast"
 
 /* Version number of package */
-#define VERSION "2.3.2-kh9"
+#define VERSION "2.3.2-kh10"
 
 /* Define to the version of this package. */
 #define PACKAGE_VERSION VERSION
@@ -128,6 +128,7 @@
 /* define if compiler does not handle __attribute__ keyword */
 #define __attribute__(x)
 
+#define HAVE_STRCASECMP 1
 #define strcasecmp _stricmp
 #define strncasecmp _strnicmp
 

Modified: icecast/branches/kh/icecast/configure.in
===================================================================
--- icecast/branches/kh/icecast/configure.in	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/configure.in	2009-07-07 23:26:21 UTC (rev 16219)
@@ -1,4 +1,4 @@
-AC_INIT([Icecast], [2.3.2-kh9b], [karl at xiph.org])
+AC_INIT([Icecast], [2.3.2-kh10], [karl at xiph.org])
 
 AC_PREREQ(2.59)
 AC_CONFIG_SRCDIR(src/main.c)
@@ -42,11 +42,14 @@
 
 dnl Check for types
 AC_TYPE_OFF_T
+AC_CHECK_TYPES([struct timespec])
 
 dnl Checks for library functions.
-AC_CHECK_FUNCS([localtime_r poll atoll strtoll getrlimit gettimeofday ftime fstat])
+AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync])
 AC_SEARCH_LIBS(nanosleep, rt posix4,
         AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep]))
+AC_SEARCH_LIBS(clock_gettime, rt posix4,
+        AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Define if you have clock_gettime]))
 XIPH_NET
 
 dnl -- configure options --

Modified: icecast/branches/kh/icecast/src/admin.c
===================================================================
--- icecast/branches/kh/icecast/src/admin.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/admin.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -83,7 +83,6 @@
 };
 
 
-static time_t now;
 
 static struct admin_command admin_general[] =
 {
@@ -221,7 +220,7 @@
         client->refbuf->len = len;
         xmlFree(buff);
         client->respcode = 200;
-        fserve_add_client (client, NULL);
+        fserve_setup_client (client, NULL);
     }
     if (response == XSLT)
     {
@@ -436,7 +435,7 @@
             "<html><head><title>Admin request successful</title></head>"
             "<body><p>%s</p></body></html>", message);
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -444,7 +443,6 @@
     int response)
 {
     const char *dest_source;
-    source_t *dest;
     xmlDocPtr doc;
     xmlNodePtr node;
     int parameters_passed = 0;
@@ -459,34 +457,14 @@
         xmlFreeDoc(doc);
         return;
     }
+    INFO2 ("source is \"%s\", destination is \"%s\"", source->mount, dest_source);
 
-    dest = source_find_mount (dest_source);
-
-    if (dest == NULL)
-    {
-        client_send_400 (client, "No such destination");
-        return;
-    }
-
-    if (strcmp (dest->mount, source->mount) == 0)
-    {
-        client_send_400 (client, "supplied mountpoints are identical");
-        return;
-    }
-
-    if (dest->running == 0 && dest->on_demand == 0)
-    {
-        client_send_400 (client, "Destination not running");
-        return;
-    }
-
-    INFO2 ("source is \"%s\", destination is \"%s\"", source->mount, dest->mount);
-
     doc = xmlNewDoc(XMLSTR("1.0"));
     node = xmlNewDocNode(doc, NULL, XMLSTR("iceresponse"), NULL);
     xmlDocSetRootElement(doc, node);
 
-    source_move_clients (source, dest);
+    source_set_fallback (source, dest_source);
+    source->flags |= SOURCE_TEMPORARY_FALLBACK;
 
     snprintf (buf, sizeof(buf), "Clients moved from %s to %s",
             source->mount, dest_source);
@@ -611,7 +589,7 @@
         if (relay->enable == 0)
         {
             if (relay->source && source_running (relay->source) == 0)
-                relay->source->on_demand = 0;
+                relay->source->flags &= ~SOURCE_ON_DEMAND;
         }
         slave_update_all_mounts();
     }
@@ -631,6 +609,7 @@
 {
     const char *useragent;
     char buf[30];
+    source_t *source = listener->shared_data;
 
     xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
 
@@ -647,12 +626,15 @@
         xmlFree (str);
     }
 
-    snprintf (buf, sizeof (buf), "%u", listener->lag);
+    snprintf (buf, sizeof (buf), "%ld", (long)(source->client->queue_pos - listener->queue_pos));
     xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
 
-    snprintf (buf, sizeof (buf), "%lu",
-            (unsigned long)(now - listener->con->con_time));
-    xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
+    if (listener->worker)
+    {
+        snprintf (buf, sizeof (buf), "%lu",
+                (unsigned long)(listener->worker->current_time.tv_sec - listener->con->con_time));
+        xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
+    }
     if (listener->username)
     {
         xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username));
@@ -674,8 +656,7 @@
 
     thread_mutex_lock (&source->lock);
 
-    now = time(NULL);
-    listener = source->active_clients;
+    listener = source->client_list;
     while (listener)
     {
         add_listener_node (srcnode, listener);
@@ -755,8 +736,7 @@
         client_t *listener;
         thread_mutex_lock (&source->lock);
 
-        now = time(NULL);
-        listener = source->active_clients;
+        listener = source->client_list;
         while (listener)
         {
             if (listener->con->id == id)
@@ -786,7 +766,7 @@
         if (source->format->get_image (client, source->format) == 0)
         {
             thread_mutex_unlock (&source->lock);
-            fserve_add_client (client, NULL);
+            fserve_setup_client (client, NULL);
             return;
         }
         thread_mutex_unlock (&source->lock);
@@ -821,7 +801,7 @@
     config_release_config();
 
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -926,7 +906,7 @@
     xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
     xmlDocSetRootElement(doc, node);
 
-    source->running = 0;
+    source->flags &= ~SOURCE_RUNNING;
 
     admin_send_response(doc, client, response, "response.xsl");
     xmlFreeDoc(doc);
@@ -1085,7 +1065,7 @@
         return;
     }
 
-    if (source->shoutcast_compat == 0)
+    if ((source->flags & SOURCE_SHOUTCAST_COMPAT) == 0)
     {
         ERROR0 ("illegal change of metadata on non-shoutcast compatible stream");
         client_send_400 (client, "illegal metadata call");
@@ -1192,7 +1172,7 @@
         http->next = content; 
         client->respcode = 200;
         client_set_queue (client, http);
-        fserve_add_client (client, NULL);
+        fserve_setup_client (client, NULL);
     }
 }
 
@@ -1214,7 +1194,7 @@
             client->refbuf->next = stats_get_streams (1);
         else
             client->refbuf->next = stats_get_streams (0);
-        fserve_add_client (client, NULL);
+        fserve_setup_client (client, NULL);
     }
     else
     {

Modified: icecast/branches/kh/icecast/src/auth.c
===================================================================
--- icecast/branches/kh/icecast/src/auth.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/auth.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -189,9 +189,9 @@
         client_t *client = auth_user->client;
 
         if (client->respcode)
-            client_destroy (client);
-        else
-            client_send_401 (client, auth_user->auth->realm);
+            client->con->error = 1;
+        client_send_401 (client, auth_user->auth->realm);
+        client->flags |= CLIENT_ACTIVE;
         auth_user->client = NULL;
     }
     auth_release (auth_user->auth);
@@ -354,6 +354,29 @@
 }
 
 
+void move_listener (client_t *client, struct _fbinfo *finfo)
+{
+    source_t *source;
+
+    DEBUG1 ("moving listener to %s", finfo->mount);
+    avl_tree_rlock (global.source_tree);
+    source = source_find_mount (finfo->mount);
+
+    if (source && source_available (source))
+    {
+        thread_mutex_lock (&source->lock);
+        avl_tree_unlock (global.source_tree);
+        source_setup_listener (source, client);
+        thread_mutex_unlock (&source->lock);
+    }
+    else
+    {
+        avl_tree_unlock (global.source_tree);
+        fserve_setup_client_fb (client, finfo);
+    }
+}
+
+
 /* Add listener to the pending lists of either the source or fserve thread. This can be run
  * from the connection or auth thread context. return -1 to indicate that client has been
  * terminated, 0 for receiving content.
@@ -402,7 +425,7 @@
         {
             DEBUG1 ("disable seek on file matching %s", mountinfo->mountname);
             httpp_deletevar (client->parser, "range");
-            httpp_setvar (client->parser, HTTPP_VAR_NO_CONTENT_LENGTH, "yes");
+            client->flags |= CLIENT_NO_CONTENT_LENGTH;
         }
         ret = fserve_client_create (client, mount);
     }
@@ -430,7 +453,10 @@
         else if (auth->rejected_mount)
             mount = auth->rejected_mount;
         else
+        {
+            client->flags |= CLIENT_ACTIVE;
             return -1;
+        }
     }
     config = config_get_config();
     mountinfo = config_find_mount (config, mount);
@@ -462,6 +488,7 @@
         DEBUG1 ("on mountpoint %s", mount);
         source_startup (client, mount);
     }
+    client->flags |= CLIENT_ACTIVE;
 }
 
 
@@ -473,9 +500,6 @@
     mount_proxy *mountinfo; 
     ice_config_t *config;
 
-    /* we don't need any more data from the listener, just setup for writing */
-    client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
-
     if (connection_check_relay_pass (client->parser))
     {
         client->flags |= (CLIENT_IS_SLAVE|CLIENT_AUTHENTICATED);
@@ -502,6 +526,7 @@
         }
         auth_user = auth_client_setup (mount, client);
         auth_user->process = auth_new_listener;
+        client->flags &= ~CLIENT_ACTIVE;
         INFO0 ("adding client for authentication");
         queue_auth_client (auth_user, mountinfo);
     }
@@ -690,6 +715,7 @@
 
         auth_user->process = stream_auth_callback;
         INFO1 ("request source auth for \"%s\"", mount);
+        client->flags &= ~CLIENT_ACTIVE;
         queue_auth_client (auth_user, mountinfo);
         return 1;
     }

Modified: icecast/branches/kh/icecast/src/auth.h
===================================================================
--- icecast/branches/kh/icecast/src/auth.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/auth.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -19,6 +19,7 @@
 
 struct source_tag;
 struct auth_tag;
+struct _fbinfo;
 typedef struct _auth_thread_t auth_thread_t;
 
 #include <libxml/xmlmemory.h>
@@ -106,6 +107,7 @@
 
 void auth_add_listener (const char *mount, client_t *client);
 int  auth_release_listener (client_t *client, const char *mount, struct _mount_proxy *mountinfo);
+void move_listener (client_t *client, struct _fbinfo *finfo);
 int  auth_check_source (client_t *client, const char *mount);
 
 void auth_initialise (void);

Modified: icecast/branches/kh/icecast/src/cfgfile.c
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/cfgfile.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -469,6 +469,7 @@
     configuration->client_limit = CONFIG_DEFAULT_CLIENT_LIMIT;
     configuration->source_limit = CONFIG_DEFAULT_SOURCE_LIMIT;
     configuration->queue_size_limit = CONFIG_DEFAULT_QUEUE_SIZE_LIMIT;
+    configuration->workers_count = 1;
     configuration->client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT;
     configuration->header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT;
     configuration->source_timeout = CONFIG_DEFAULT_SOURCE_TIMEOUT;
@@ -927,6 +928,7 @@
         { "sources",        config_get_int,    &config->source_limit },
         { "queue-size",     config_get_int,    &config->queue_size_limit },
         { "burst-size",     config_get_int,    &config->burst_size },
+        { "workers",        config_get_int,    &config->workers_count },
         { "client-timeout", config_get_int,    &config->client_timeout },
         { "header-timeout", config_get_int,    &config->header_timeout },
         { "source-timeout", config_get_int,    &config->source_timeout },
@@ -934,6 +936,8 @@
     };
     if (parse_xml_tags (node, icecast_tags))
         return -1;
+    if (config->workers_count < 1)   config->workers_count = 1;
+    if (config->workers_count > 400) config->workers_count = 400;
     return 0;
 }
 

Modified: icecast/branches/kh/icecast/src/cfgfile.h
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/cfgfile.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -28,6 +28,7 @@
 
 #include "avl/avl.h"
 #include "auth.h"
+#include "compat.h"
 
 typedef struct ice_config_dir_tag
 {
@@ -84,8 +85,7 @@
     /* duration in seconds for sampling the bandwidth */
     int avg_bitrate_duration;
 
-    /* trigger level at which a timer is used to prevent excessive incoming bandwidth */
-    int64_t limit_rate;
+    long limit_rate;
 
     /* duration (secs) for mountpoint to be kept reserved after source client exits */
     int wait_time;
@@ -154,7 +154,6 @@
     int cleanup;
     int enable;
     time_t start;
-    thread_type *thread;
     struct _relay_server *next;
 } relay_server;
 
@@ -204,6 +203,7 @@
     int client_limit;
     int source_limit;
     unsigned int queue_size_limit;
+    int workers_count;
     unsigned int burst_size;
     int client_timeout;
     int header_timeout;

Modified: icecast/branches/kh/icecast/src/client.c
===================================================================
--- icecast/branches/kh/icecast/src/client.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/client.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -26,6 +26,7 @@
 #include "thread/thread.h"
 #include "avl/avl.h"
 #include "httpp/httpp.h"
+#include "timing/timing.h"
 
 #include "cfgfile.h"
 #include "connection.h"
@@ -42,6 +43,8 @@
 #undef CATMODULE
 #define CATMODULE "client"
 
+int worker_count;
+
 /* create a client_t with the provided connection and parser details. Return
  * client_t ready for use.  Should be called with global lock held.
  */
@@ -54,7 +57,7 @@
 
     global.clients++;
 
-    if (con->serversock != SOCK_ERROR)
+    if (con && con->serversock != SOCK_ERROR)
     {
         int i;
         for (i=0; i < global.server_sockets; i++)
@@ -69,10 +72,7 @@
     stats_event_args (NULL, "clients", "%d", global.clients);
     client->con = con;
     client->parser = parser;
-    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-    client->refbuf->len = 0; /* force reader code to ignore buffer contents */
     client->pos = 0;
-    client->write_to_client = format_generic_write_to_client;
     return client;
 }
 
@@ -126,19 +126,14 @@
 {
     int bytes;
 
-    if (client->refbuf && client->refbuf->len)
+    if (client->refbuf && client->pos < client->refbuf->len)
     {
-        /* we have data to read from a refbuf first */
-        if (client->refbuf->len < len)
-            len = client->refbuf->len;
-        memcpy (buf, client->refbuf->data, len);
-        if (len < client->refbuf->len)
-        {
-            char *ptr = client->refbuf->data;
-            memmove (ptr, ptr+len, client->refbuf->len - len);
-        }
-        client->refbuf->len -= len;
-        return len;
+        unsigned remaining = client->refbuf->len - client->pos;
+        if (remaining > len)
+            remaining = len;
+        memcpy (buf, client->refbuf->data + client->pos, remaining);
+        client->pos += remaining;
+        return remaining;
     }
     bytes = client->con->read (client->con, buf, len);
 
@@ -158,7 +153,7 @@
             "Moved <a href=\"%s\">here</a>\r\n", location, location);
     client->respcode = 302;
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -169,7 +164,7 @@
             "<b>%s</b>\r\n", message);
     client->respcode = 400;
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -180,7 +175,9 @@
     if (realm == NULL)
         realm = config->server_id;
 
-    snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
+    refbuf_release (client->refbuf);
+    client->refbuf = refbuf_new (500);
+    snprintf (client->refbuf->data, 500,
             "HTTP/1.0 401 Authentication Required\r\n"
             "WWW-Authenticate: Basic realm=\"%s\"\r\n"
             "\r\n"
@@ -188,7 +185,7 @@
     config_release_config();
     client->respcode = 401;
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -201,7 +198,7 @@
             "Content-Type: text/html\r\n\r\n", reason);
     client->respcode = 403;
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 void client_send_403redirect (client_t *client, const char *mount, const char *reason)
@@ -214,22 +211,25 @@
 
 void client_send_404(client_t *client, const char *message)
 {
-    if (client->respcode)
+    if (client->worker == NULL)   /* client is not on any worker now */
     {
         client_destroy (client);
         return;
     }
-    if (message == NULL)
-        message = "Not Available";
-    if (client->refbuf == NULL)
+    client_set_queue (client, NULL);
+    if (client->respcode == 0)
+    {
+        if (message == NULL)
+            message = "Not Available";
         client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-    snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
-            "HTTP/1.0 404 Not Available\r\n"
-            "Content-Type: text/html\r\n\r\n"
-            "<b>%s</b>\r\n", message);
-    client->respcode = 404;
-    client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+        snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
+                "HTTP/1.0 404 Not Available\r\n"
+                "Content-Type: text/html\r\n\r\n"
+                "<b>%s</b>\r\n", message);
+        client->respcode = 404;
+        client->refbuf->len = strlen (client->refbuf->data);
+    }
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -239,7 +239,7 @@
             "HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
     client->respcode = 416;
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client (client, NULL);
+    fserve_setup_client (client, NULL);
 }
 
 
@@ -290,3 +290,217 @@
         refbuf_release (to_release);
 }
 
+
+worker_t *find_least_busy_handler (void)
+{
+    worker_t *handler, *min = NULL;
+
+    if (workers)
+    {
+        min = workers;
+        handler = workers->next;
+        DEBUG2 ("handler %p has %d clients", min, min->count);
+        while (handler)
+        {
+            DEBUG2 ("handler %p has %d clients", handler, handler->count);
+            if (handler->count < min->count)
+                min = handler;
+            handler = handler->next;
+        }
+    }
+    return min;
+}
+
+
+void client_change_worker (client_t *client, worker_t *dest_worker)
+{
+    worker_t *this_worker = client->worker;
+
+    // make sure this client list is ok
+    *this_worker->current_p = client->next_on_worker;
+    this_worker->count--;
+    thread_mutex_unlock (&this_worker->lock);
+
+    thread_mutex_lock (&dest_worker->lock);
+    if (dest_worker->running)
+    {
+        client->worker = dest_worker;
+        client->next_on_worker = dest_worker->clients;
+        dest_worker->clients = client;
+        dest_worker->count++;
+        client->flags |= CLIENT_HAS_CHANGED_THREAD;
+        // make client inactive so that the destination thread does not run it straight away
+        client->flags &= ~CLIENT_ACTIVE;
+    }
+    thread_mutex_unlock (&dest_worker->lock);
+    thread_mutex_lock (&this_worker->lock);
+}
+
+
+void client_add_worker (client_t *client)
+{
+    worker_t *handler;
+
+    thread_rwlock_rlock (&workers_lock);
+    /* add client to the handler with the least number of clients */
+    handler = find_least_busy_handler();
+    thread_mutex_lock (&handler->lock);
+    thread_rwlock_unlock (&workers_lock);
+
+    client->schedule_ms = handler->time_ms;
+    client->next_on_worker = handler->clients;
+    handler->clients = client;
+    client->worker = handler;
+    ++handler->count;
+    if (handler->wakeup_ms - handler->time_ms > 15)
+        thread_cond_signal (&handler->cond); /* wake thread if required */
+    thread_mutex_unlock (&handler->lock);
+}
+
+
+void *worker (void *arg)
+{
+    worker_t *handler = arg;
+    client_t *client, **prevp;
+    long prev_count = -1;
+    struct timespec wakeup_time;
+
+    handler->running = 1;
+    thread_mutex_lock (&handler->lock);
+    thread_get_timespec (&handler->current_time);
+    handler->time_ms = THREAD_TIME_MS (&handler->current_time);
+    wakeup_time = handler->current_time;
+
+    prevp = &handler->clients;
+    while (handler->running)
+    {
+        if (prev_count != handler->count)
+        {
+            DEBUG2 ("%p now has %d clients", handler, handler->count);
+            prev_count = handler->count;
+        }
+        thread_cond_timedwait (&handler->cond, &handler->lock, &wakeup_time);
+        thread_get_timespec (&handler->current_time);
+        handler->time_ms = THREAD_TIME_MS (&handler->current_time);
+        handler->wakeup_ms = handler->time_ms + 60000;
+        client = handler->clients;
+        prevp = &handler->clients;
+        while (client)
+        {
+            /* process client details but skip those that are not ready yet */
+            if (client->flags & CLIENT_ACTIVE)
+            {
+                if (client->schedule_ms <= handler->time_ms+15)
+                {
+                    int ret;
+
+                    handler->current_p = prevp;
+                    ret = client->ops->process (client);
+
+                    /* special handler, client has moved away to another worker */
+                    if (client->flags & CLIENT_HAS_CHANGED_THREAD)
+                    {
+                        client->flags &= ~CLIENT_HAS_CHANGED_THREAD;
+                        client->flags |= CLIENT_ACTIVE;
+                        client = *prevp;
+                        continue;
+                    }
+                    if (ret < 0)
+                    {
+                        client_t *to_go = client;
+                        *prevp = to_go->next_on_worker;
+                        client->next_on_worker = NULL;
+                        client->worker = NULL;
+                        handler->count--;
+                        if (client->ops->release)
+                            client->ops->release (client);
+                        client = *prevp;
+                        continue;
+                    }
+                }
+                if (client->schedule_ms < handler->wakeup_ms)
+                    handler->wakeup_ms = client->schedule_ms;
+            }
+            prevp = &client->next_on_worker;
+            client = *prevp;
+        }
+        handler->wakeup_ms += 10;  /* allow a small sleep */
+        wakeup_time.tv_sec = handler->wakeup_ms/1000;
+        wakeup_time.tv_nsec = (handler->wakeup_ms%1000) *1000000;
+    }
+    thread_mutex_unlock (&handler->lock);
+    INFO0 ("shutting down");
+    return NULL;
+}
+
+
+static void worker_start (void)
+{
+    worker_t *handler = calloc (1, sizeof(worker_t));
+
+    thread_mutex_create (&handler->lock);
+    thread_cond_create (&handler->cond);
+    thread_rwlock_wlock (&workers_lock);
+    handler->next = workers;
+    workers = handler;
+    worker_count++;
+    handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
+    thread_rwlock_unlock (&workers_lock);
+}
+
+static void worker_stop (void)
+{
+    worker_t *handler = workers;
+    client_t *clients = NULL;
+
+    if (handler == NULL)
+        return;
+    thread_rwlock_wlock (&workers_lock);
+    workers = handler->next;
+    worker_count--;
+    thread_rwlock_unlock (&workers_lock);
+    handler->running = 0;
+
+    thread_mutex_lock (&handler->lock);
+    clients = handler->clients;
+    handler->clients = NULL;
+    handler->count = 0;
+    thread_cond_signal (&handler->cond);
+    thread_mutex_unlock (&handler->lock);
+    // move clients to another handler
+    if (worker_count > 1 && clients)
+    {
+        client_t *endp  = clients;
+        int count = 0;
+
+        thread_mutex_lock (&workers->lock);
+        while (endp->next_on_worker)
+        {
+            endp = endp->next_on_worker;
+            count++;
+        }
+        endp->next_on_worker = workers->clients;
+        workers->clients = clients;
+        workers->count += count;
+        thread_mutex_unlock (&workers->lock);
+    }
+    if (handler->count)
+        WARN1 ("%d clients left", handler->count);
+
+    thread_join (handler->thread);
+    thread_mutex_destroy (&handler->lock);
+    thread_cond_destroy (&handler->cond);
+    free (handler);
+}
+
+void workers_adjust (int new_count)
+{
+    while (worker_count != new_count)
+    {
+        if (worker_count < new_count)
+            worker_start ();
+        else
+            worker_stop ();
+    }
+}
+

Modified: icecast/branches/kh/icecast/src/client.h
===================================================================
--- icecast/branches/kh/icecast/src/client.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/client.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -19,20 +19,56 @@
 #define __CLIENT_H__
 
 typedef struct _client_tag client_t;
+typedef struct _worker_t worker_t;
 
 #include "cfgfile.h"
 #include "connection.h"
 #include "refbuf.h"
 #include "httpp/httpp.h"
+#include "compat.h"
+#include "thread/thread.h"
 
+struct _worker_t
+{
+    int running;
+    int count;
+    mutex_t lock;
+    cond_t cond;
+    client_t *clients;
+    client_t **current_p;
+    thread_type *thread;
+    struct timespec current_time;
+    uint64_t time_ms;
+    uint64_t wakeup_ms;
+    struct _worker_t *next;
+};
+
+
+extern worker_t *workers;
+extern int worker_count;
+extern rwlock_t workers_lock;
+
+struct _client_functions
+{
+    int  (*process)(struct _client_tag *client);
+    void (*release)(struct _client_tag *client);
+};
+
 struct _client_tag
 {
+    uint64_t schedule_ms;
+
     /* various states the client could be in */
     unsigned int flags;
 
     /* position in first buffer */
     unsigned int pos;
 
+    /* http response code for this client */
+    int respcode;
+
+    client_t *next_on_worker;
+
     /* the client's connection */
     connection_t *con;
     /* the client's http headers */
@@ -48,30 +84,36 @@
     refbuf_t *refbuf;
 
     /* byte count in queue */
-    unsigned int lag;
+    uint64_t queue_pos;
 
-    /* http response code for this client */
-    int respcode;
-
     /* Client username, if authenticated */
     char *username;
 
     /* Client password, if authenticated */
     char *password;
 
+    /* generic handle */
+    void *shared_data;
+
     /* Format-handler-specific data for this client */
     void *format_data;
 
+    /* the worker the client is attached to */
+    worker_t *worker;
+
+    /* for cases where we want to limit data rates */
+    struct rate_calc *out_bitrate;
+
     /* function to call to release format specific resources */
     void (*free_client_data)(struct _client_tag *client);
 
-    /* write out data associated with client */
-    int (*write_to_client)(struct _client_tag *client);
-
     /* function to check if refbuf needs updating */
-    int (*check_buffer)(struct source_tag *source, struct _client_tag *client);
+    int (*check_buffer)(struct _client_tag *client);
 
-    client_t *next;
+    /* functions to process client */
+    struct _client_functions *ops;
+
+    client_t *next;  /* for use with grouping similar clients */
 };
 
 client_t *client_create (connection_t *con, http_parser_t *parser);
@@ -88,9 +130,18 @@
 int  client_read_bytes (client_t *client, void *buf, unsigned len);
 void client_set_queue (client_t *client, refbuf_t *refbuf);
 
+void client_change_worker (client_t *client, worker_t *dest_worker);
+void client_add_worker (client_t *client);
+worker_t *find_least_busy_handler (void);
+void workers_adjust (int new_count);
+
+
 /* client flags bitmask */
+#define CLIENT_ACTIVE               (001)
 #define CLIENT_AUTHENTICATED        (002)
 #define CLIENT_IS_SLAVE             (004)
+#define CLIENT_HAS_CHANGED_THREAD   (010)
+#define CLIENT_NO_CONTENT_LENGTH    (020)
 #define CLIENT_FORMAT_BIT           (01000)
 
 #endif  /* __CLIENT_H__ */

Modified: icecast/branches/kh/icecast/src/connection.c
===================================================================
--- icecast/branches/kh/icecast/src/connection.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/connection.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -78,13 +78,11 @@
    Icecast auth style uses HTTP and Basic Authorization.
 */
 
-typedef struct client_queue_tag {
-    client_t *client;
-    int offset;
-    int stream_offset;
-    int shoutcast;
-    struct client_queue_tag *next;
-} client_queue_t;
+static int  shoutcast_source_client (client_t *client);
+static int  http_client_request (client_t *client);
+static int  _handle_get_request (client_t *client);
+static int  _handle_source_request (client_t *client);
+static int  _handle_stats_request (client_t *client);
 
 
 typedef struct
@@ -100,13 +98,42 @@
 static volatile unsigned long _current_id = 0;
 static volatile thread_type *conn_tid;
 
-static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
-static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
 static int ssl_ok;
 #ifdef HAVE_OPENSSL
 static SSL_CTX *ssl_ctx;
 #endif
 
+int header_timeout;
+
+struct _client_functions shoutcast_source_ops =
+{
+    shoutcast_source_client,
+    client_destroy
+};
+
+struct _client_functions http_request_ops =
+{
+    http_client_request,
+    client_destroy
+};
+
+struct _client_functions http_req_get_ops =
+{
+    _handle_get_request,
+    client_destroy
+};
+struct _client_functions http_req_source_ops =
+{
+    _handle_source_request,
+    client_destroy
+};
+
+struct _client_functions http_req_stats_ops =
+{
+    _handle_stats_request,
+    client_destroy
+};
+
 /* filtering client connection based on IP */
 cache_file_contents banned_ip, allowed_ip;
 /* filtering listener connection based on useragent */
@@ -114,7 +141,6 @@
 
 int connection_running = 0;
 
-static void _handle_connection(void);
 
 static int compare_line (void *arg, void *a, void *b)
 {
@@ -150,11 +176,6 @@
 void connection_initialize(void)
 {
     thread_spin_create (&_connection_lock);
-    thread_mutex_create(&move_clients_mutex);
-    _req_queue = NULL;
-    _req_queue_tail = &_req_queue;
-    _con_queue = NULL;
-    _con_queue_tail = &_con_queue;
 
     banned_ip.contents = NULL;
     banned_ip.file_mtime = 0;
@@ -177,7 +198,6 @@
     if (useragents.contents) avl_tree_free (useragents.contents, free_filtered_line);
 
     thread_spin_destroy (&_connection_lock);
-    thread_mutex_destroy(&move_clients_mutex);
 }
 
 static unsigned long _next_connection_id(void)
@@ -549,163 +569,204 @@
 }
 
 
-/* add client to connection queue. At this point some header information
- * has been collected, so we now pass it onto the connection thread for
- * further processing
+/* shoutcast source clients are handled specially because the protocol is limited. It is
+ * essentially a password followed by a series of headers, each on a separate line.  In here
+ * we get the password and build a http request like a native source client would do
  */
-static void _add_connection (client_queue_t *node)
+static int shoutcast_source_client (client_t *client)
 {
-    *_con_queue_tail = node;
-    _con_queue_tail = (volatile client_queue_t **)&node->next;
-}
+    do
+    {
+        if (client->con->error || client->con->discon_time <= client->worker->current_time.tv_sec)
+            break;
 
+        if (client->shared_data)  /* need to get password first */
+        {
+            refbuf_t *refbuf = client->shared_data;
+            int remaining = PER_CLIENT_REFBUF_SIZE - 2 - refbuf->len, ret, len;
+            char *buf = refbuf->data + refbuf->len;
+            char *esc_header;
+            refbuf_t *r, *resp;
+            char header [128];
 
-/* this returns queued clients for the connection thread. headers are
- * already provided, but need to be parsed.
- */
-static client_queue_t *_get_connection(void)
-{
-    client_queue_t *node = NULL;
+            if (remaining == 0)
+                break;
 
-    /* common case, no new connections so don't bother taking locks */
-    if (_con_queue)
-    {
-        node = (client_queue_t *)_con_queue;
-        _con_queue = node->next;
-        if (_con_queue == NULL)
-            _con_queue_tail = &_con_queue;
-        node->next = NULL;
-    }
-    return node;
+            ret = client_read_bytes (client, buf, remaining);
+            if (ret == 0 || client->con->error)
+                break;
+            if (ret < 0)
+                return 0;
+
+            buf [ret] = '\0';
+            len = strcspn (refbuf->data, "\r\n");
+            if (refbuf->data [len] == '\0')  /* no EOL yet */
+                return 0;
+
+            refbuf->data [len] = '\0';
+            snprintf (header, sizeof(header), "source:%s", refbuf->data);
+            esc_header = util_base64_encode (header);
+
+            len += 1 + strspn (refbuf->data+len+1, "\r\n");
+            r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+            snprintf (r->data, PER_CLIENT_REFBUF_SIZE,
+                    "SOURCE %s HTTP/1.0\r\n" "Authorization: Basic %s\r\n%s",
+                    client->server_conn->shoutcast_mount, esc_header, refbuf->data+len);
+            r->len = strlen (r->data);
+            free (esc_header);
+            client->respcode = 200;
+            resp = refbuf_new (30);
+            snprintf (resp->data, 30, "OK2\r\nicy-caps:11\r\n\r\n");
+            resp->len = strlen (resp->data);
+            resp->associated = r;
+            client->refbuf = resp;
+            refbuf_release (refbuf);
+            client->shared_data = NULL;
+            INFO1 ("emulation on %s", client->server_conn->shoutcast_mount);
+        }
+        format_generic_write_to_client (client);
+        if (client->pos == client->refbuf->len)
+        {
+            refbuf_t *r = client->refbuf;
+            client->shared_data = r->associated;
+            client->refbuf = NULL;
+            r->associated = NULL;
+            refbuf_release (r);
+            client->ops = &http_request_ops;
+            client->pos = 0;
+        }
+        client->schedule_ms = client->worker->time_ms + 100;
+        return 0;
+    } while (0);
+
+    refbuf_release (client->shared_data);
+    client->shared_data = NULL;
+    return -1;
 }
 
 
-/* run along queue checking for any data that has come in or a timeout */
-static void process_request_queue (void)
+static int http_client_request (client_t *client)
 {
-    client_queue_t **node_ref = (client_queue_t **)&_req_queue;
-    ice_config_t *config = config_get_config ();
-    int timeout = config->header_timeout;
-    config_release_config();
+    refbuf_t *refbuf = client->shared_data;
+    int remaining = PER_CLIENT_REFBUF_SIZE - 1 - refbuf->len, ret = -1;
 
-    while (*node_ref)
+    if (remaining && client->con->discon_time > client->worker->current_time.tv_sec)
     {
-        client_queue_t *node = *node_ref;
-        client_t *client = node->client;
-        int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
-        char *buf = client->refbuf->data + node->offset;
+        char *buf = refbuf->data + refbuf->len;
 
-        if (len > 0)
+        ret = client_read_bytes (client, buf, remaining);
+        if (ret > 0)
         {
-            if (client->con->con_time + timeout <= now)
-                len = 0;
-            else
-                len = client_read_bytes (client, buf, len);
-        }
-
-        if (len > 0)
-        {
-            int pass_it = 1;
             char *ptr;
 
-            /* handle \n, \r\n and nsvcap which for some strange reason has
-             * EOL as \r\r\n */
-            node->offset += len;
-            client->refbuf->data [node->offset] = '\000';
+            buf [ret] = '\0';
+            refbuf->len += ret;
+            if (memcmp (refbuf->data, "<policy-file-request/>", 23) == 0)
+            {
+                fbinfo fb;
+                fb.mount = "/flashpolicy";
+                fb.flags = FS_NORMAL|FS_USE_ADMIN;
+                fb.fallback = NULL;
+                fb.limit = 0;
+                refbuf_release (refbuf);
+                client->shared_data = NULL;
+                client->check_buffer = format_generic_write_to_client;
+                fserve_setup_client_fb (client, &fb);
+                return 1;
+            }
+            /* find a blank line */
             do
             {
-                /* ugly hack for flash policy files */
-                if (node->offset == 23 && memcmp (client->refbuf->data, "<policy-file-request/>", 23) == 0)
-                    break;
-
-                if (node->shoutcast == 1)
-                {
-                    /* password line */
-                    if (strstr (client->refbuf->data, "\r\r\n") != NULL)
-                        break;
-                    if (strstr (client->refbuf->data, "\r\n") != NULL)
-                        break;
-                    if (strstr (client->refbuf->data, "\n") != NULL)
-                        break;
-                }
-                /* stream_offset refers to the start of any data sent after the
-                 * http style headers, we don't want to lose those */
-                ptr = strstr (client->refbuf->data, "\r\r\n\r\r\n");
+                buf = refbuf->data;
+                ptr = strstr (buf, "\r\n\r\n");
                 if (ptr)
                 {
-                    node->stream_offset = (ptr+6) - client->refbuf->data;
+                    ptr += 4;
                     break;
                 }
-                ptr = strstr (client->refbuf->data, "\r\n\r\n");
+                ptr = strstr (buf, "\n\n");
                 if (ptr)
                 {
-                    node->stream_offset = (ptr+4) - client->refbuf->data;
+                    ptr += 2;
                     break;
                 }
-                ptr = strstr (client->refbuf->data, "\n\n");
+                ptr = strstr (buf, "\r\r\n\r\r\n");
                 if (ptr)
                 {
-                    node->stream_offset = (ptr+2) - client->refbuf->data;
+                    ptr += 6;
                     break;
                 }
-                pass_it = 0;
+                client->schedule_ms = client->worker->time_ms + 100;
+                return 0;
             } while (0);
-
-            if (pass_it)
+            client->refbuf = client->shared_data;
+            client->shared_data = NULL;
+            client->con->discon_time = 0;
+            client->parser = httpp_create_parser();
+            httpp_initialize (client->parser, NULL);
+            if (httpp_parse (client->parser, refbuf->data, refbuf->len))
             {
-                if ((client_queue_t **)_req_queue_tail == &(node->next))
-                    _req_queue_tail = (volatile client_queue_t **)node_ref;
-                *node_ref = node->next;
-                node->next = NULL;
-                _add_connection (node);
-                continue;
+                recheck_cached_file (&useragents);
+                if (useragents.contents)
+                {
+                    const char *agent = httpp_getvar (client->parser, "user-agent");
+                    void *result;
+
+                    if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0)
+                    {
+                        INFO1 ("dropping client because useragent is %s", agent);
+                        return -1;
+                    }
+                }
+
+                /* headers now parsed, make sure any sent content is next */
+                if (strcmp("ICE",  httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL)) &&
+                        strcmp("HTTP", httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL)))
+                {
+                    ERROR0("Bad HTTP protocol detected");
+                    return -1;
+                }
+                client->schedule_ms = client->worker->time_ms + 20;
+                auth_check_http (client);
+                switch (client->parser->req_type)
+                {
+                    case httpp_req_get:
+                        refbuf->len = PER_CLIENT_REFBUF_SIZE;
+                        client->ops = &http_req_get_ops;
+                        break;
+                    case httpp_req_source:
+                        client->pos = ptr - refbuf->data;
+                        client->ops = &http_req_source_ops;
+                        break;
+                    case httpp_req_stats:
+                        refbuf->len = PER_CLIENT_REFBUF_SIZE;
+                        client->ops = &http_req_stats_ops;
+                        break;
+                    default:
+                        WARN0("unhandled request type from client");
+                        client_send_400 (client, "unknown request");
+                }
+                return 1;
             }
+            /* invalid http request */
+            return -1;
         }
-        else
+        if (ret && client->con->error == 0)
         {
-            if (len == 0 || client->con->error)
-            {
-                if ((client_queue_t **)_req_queue_tail == &node->next)
-                    _req_queue_tail = (volatile client_queue_t **)node_ref;
-                *node_ref = node->next;
-                client_destroy (client);
-                free (node);
-                continue;
-            }
+            client->schedule_ms = client->worker->time_ms + 100;
+            return 0;
         }
-        node_ref = &node->next;
     }
-    _handle_connection();
+    refbuf_release (refbuf);
+    client->shared_data = NULL;
+    return -1;
 }
 
 
-/* add node to the queue of requests. This is where the clients are when
- * initial http details are read.
- */
-static void _add_request_queue (client_queue_t *node)
-{
-    *_req_queue_tail = node;
-    _req_queue_tail = (volatile client_queue_t **)&node->next;
-}
-
-
-static client_queue_t *_create_req_node (client_t *client)
-{
-    client_queue_t *node = calloc (1, sizeof (client_queue_t));
-    node->client = client;
-
-    if (client->server_conn->shoutcast_compat)
-        node->shoutcast = 1;
-
-    return node;
-}
-
-
 void *connection_thread (void *arg)
 {
     connection_t *con;
     ice_config_t *config;
-    int duration = 300;
 
     connection_running = 1;
     INFO0 ("connection thread started");
@@ -714,16 +775,17 @@
     get_ssl_certificate (config);
     if (config->chuid == 0)
         connection_setup_sockets (config);
+    header_timeout = config->header_timeout;
     config_release_config ();
 
     while (connection_running)
     {
-        con = _accept_connection (duration);
+        con = _accept_connection (333);
 
         if (con)
         {
-            client_queue_t *node;
             client_t *client = NULL;
+            refbuf_t *r;
 
             global_lock();
             client = client_create (con, NULL);
@@ -732,9 +794,6 @@
             if (client->server_conn->ssl && ssl_ok)
                 connection_uses_ssl (client->con);
 
-            /* setup client for reading incoming http */
-            client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
-
             if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock))
             {
                 WARN0 ("failed to set tcp options on client connection, dropping");
@@ -742,25 +801,23 @@
                 continue;
             }
 
-            node = _create_req_node (client);
-            if (node == NULL)
-            {
-                client_destroy (client);
-                continue;
-            }
-            _add_request_queue (node);
+            if (client->server_conn->shoutcast_compat)
+                client->ops = &shoutcast_source_ops;
+            else
+                client->ops = &http_request_ops;
+            r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+            r->len = 0;
+            client->shared_data = r;
+            client->flags |= CLIENT_ACTIVE;
+            client->con->discon_time = time(NULL) + header_timeout;
+
+            /* do a small delay here so the client has chance to send the request after
+             * getting a connect. This also prevents excessively large number of new
+             * listeners from joining at the same time */
+            thread_sleep (3000);
+            client_add_worker (client);
             stats_event_inc (NULL, "connections");
-            if (duration > 5)
-            {
-                duration = 5;
-            }
         }
-        else
-        {
-            if (_req_queue == NULL)
-                duration = 300; /* use longer timeouts when nothing waiting */
-        }
-        process_request_queue ();
     }
 #ifdef HAVE_OPENSSL
     SSL_CTX_free (ssl_ctx);
@@ -851,12 +908,9 @@
         stats_event_args (NULL, "sources", "%d", global.sources);
         global_unlock();
 
-        source->running = 1;
         mountinfo = config_find_mount (config, source->mount);
-        thread_mutex_lock (&source->lock);
         source_update_settings (config, source, mountinfo);
         INFO1 ("source %s is ready to start", source->mount);
-        thread_mutex_unlock (&source->lock);
         config_release_config();
         slave_rebuild_mounts();
 
@@ -1018,42 +1072,42 @@
 }
 
 
-static void _handle_source_request (client_t *client, const char *uri)
+static int _handle_source_request (client_t *client)
 {
+    const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
+
     INFO1("Source logging in at mountpoint \"%s\"", uri);
     if (uri[0] != '/')
     {
         WARN0 ("source mountpoint not starting with /");
         client_send_401 (client, NULL);
-        return;
+        return 1;
     }
     switch (auth_check_source (client, uri))
     {
-        case 0: /* authenticated from config file */
-            source_startup (client, uri);
+        case 0:         /* authenticated from config file */
+            return source_startup (client, uri);
+        case 1:         /* auth pending */
             break;
-
-        case 1: /* auth pending */
-            break;
-
-        default: /* failed */
+        default:        /* failed */
             INFO1("Source (%s) attempted to login with invalid or missing password", uri);
             client_send_401 (client, NULL);
-            break;
     }
+    return 0;
 }
 
 
-static void _handle_stats_request (client_t *client, char *uri)
+static int _handle_stats_request (client_t *client)
 {
+    const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
+
     if (connection_check_admin_pass (client->parser) == 0)
     {
         auth_add_listener (uri, client);
-        return;
+        return 0;
     }
-
-    client->flags |= CLIENT_AUTHENTICATED;
     stats_add_listener (client, STATS_ALL);
+    return 1;
 }
 
 static void check_for_filtering (ice_config_t *config, client_t *client)
@@ -1081,17 +1135,22 @@
 }
 
 
-static void _handle_get_request (client_t *client, char *passed_uri)
+static int _handle_get_request (client_t *client)
 {
     int port;
     char *serverhost = NULL;
     int serverport = 0;
     aliases *alias;
     ice_config_t *config;
-    char *uri = passed_uri;
+    char *uri = util_normalise_uri (httpp_getvar (client->parser, HTTPP_VAR_URI));
     int client_limit_reached = 0;
 
-    DEBUG1 ("start with %s", passed_uri);
+    if (uri == NULL)
+    {
+        client_send_400 (client, "invalid request URI");
+        return 0;
+    }
+    DEBUG1 ("start with %s", uri);
     config = config_get_config();
     check_for_filtering (config, client);
     port = config->port;
@@ -1115,8 +1174,10 @@
     /* Handle aliases */
     while(alias) {
         if(strcmp(uri, alias->source) == 0 && (alias->port == -1 || alias->port == serverport) && (alias->bind_address == NULL || (serverhost != NULL && strcmp(alias->bind_address, serverhost) == 0))) {
-            uri = strdup (alias->destination);
-            DEBUG2 ("alias has made %s into %s", passed_uri, uri);
+            char *newuri = strdup (alias->destination);
+            DEBUG2 ("alias has made %s into %s", uri, newuri);
+            free (uri);
+            uri = newuri;
             break;
         }
         alias = alias->next;
@@ -1133,165 +1194,19 @@
     /* Dispatch all admin requests */
     if (admin_handle_request (client, uri) == 0)
     {
-        if (uri != passed_uri) free (uri);
-        return;
+        free (uri);
+        return 0;
     }
     /* drop non-admin GET requests here if clients limit reached */
     if (client_limit_reached)
         client_send_403 (client, "Too many clients connected");
     else
         auth_add_listener (uri, client);
-    if (uri != passed_uri) free (uri);
+    free (uri);
+    return 0;
 }
 
-static void _handle_shoutcast_compatible (client_queue_t *node)
-{
-    ice_config_t *config = config_get_config ();
-    client_t *client = node->client;
-    listener_t *server_conn = client->server_conn;
 
-    char *user = "source", *ptr, *headers, *esc_header;
-    mount_proxy *mountinfo = config_find_mount (config, server_conn->shoutcast_mount);
-    refbuf_t *r;
-    int len;
-    char auth [256];
-
-    if (mountinfo && mountinfo->username)
-        user = mountinfo->username;
-
-    ptr = client->refbuf->data;
-    len = strcspn (ptr, "\r\n");
-    snprintf (auth, sizeof auth, "%s:%.*s", user, len, ptr);
-    config_release_config();
-    ptr += len;
-    headers = ptr + strspn (ptr, "\r\n");
-
-    esc_header = util_base64_encode (auth);
-    sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n");
-    /* build a buffer in a way that an icecast2 source client would present as */
-    r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-    snprintf (r->data, PER_CLIENT_REFBUF_SIZE,
-            "SOURCE %s HTTP/1.0\r\n" "Authorization: Basic %s\r\n%s",
-            server_conn->shoutcast_mount, esc_header, headers);
-    free (esc_header);
-    refbuf_release (client->refbuf);
-    client->refbuf = r;
-    r->len = 0;
-    node->shoutcast = 0;
-    node->offset = strlen (r->data);
-    _add_request_queue (node);
-}
-
-
-/* Connection thread. Here we take clients off the connection queue and check
- * the contents provided. We set up the parser then hand off to the specific
- * request handler.
- */
-static void _handle_connection (void)
-{
-    http_parser_t *parser;
-    const char *rawuri;
-    client_queue_t *node;
-
-    while ((node = _get_connection()))
-    {
-
-        if (node)
-        {
-            client_t *client = node->client;
-
-            if (node->offset == 23 && memcmp (client->refbuf->data, "<policy-file-request/>", 23) == 0)
-            {
-                client->respcode = 200;
-                fserve_client_create (client, "/flashpolicy");
-                free (node);
-                continue;
-            }
-            /* Check for special shoutcast compatability processing */
-            if (node->shoutcast)
-            {
-                _handle_shoutcast_compatible (node);
-                continue;
-            }
-
-            /* process normal HTTP headers */
-            parser = httpp_create_parser();
-            httpp_initialize (parser, NULL);
-            client->parser = parser;
-            if (httpp_parse (parser, client->refbuf->data, node->offset))
-            {
-                char *uri;
-
-                recheck_cached_file (&useragents);
-                if (useragents.contents)
-                {
-                    const char *agent = httpp_getvar (parser, "user-agent");
-                    void *result;
-
-                    if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0)
-                    {
-                        DEBUG1 ("dropping client because useragent is %s", agent);
-                        free (node);
-                        client_destroy (client);
-                        continue;
-                    }
-                }
-                /* we may have more than just headers, so prepare for it */
-                if (node->stream_offset == node->offset)
-                    client->refbuf->len = 0;
-                else
-                {
-                    char *ptr = client->refbuf->data;
-                    client->refbuf->len = node->offset - node->stream_offset;
-                    memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
-                }
-
-                rawuri = httpp_getvar (parser, HTTPP_VAR_URI);
-
-                free (node);
-                
-                if (strcmp("ICE",  httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
-                    strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
-                    ERROR0("Bad HTTP protocol detected");
-                    client_destroy (client);
-                    continue;
-                }
-
-                uri = util_normalise_uri(rawuri);
-
-                if (uri == NULL)
-                {
-                    client_destroy (client);
-                    continue;
-                }
-                auth_check_http (client);
-
-                if (parser->req_type == httpp_req_source) {
-                    _handle_source_request (client, uri);
-                }
-                else if (parser->req_type == httpp_req_stats) {
-                    _handle_stats_request (client, uri);
-                }
-                else if (parser->req_type == httpp_req_get) {
-                    _handle_get_request (client, uri);
-                }
-                else {
-                    ERROR0("Wrong request type from client");
-                    client_send_400 (client, "unknown request");
-                }
-
-                free(uri);
-            } 
-            else
-            {
-                free (node);
-                ERROR0("HTTP request parsing failed");
-                client_destroy (client);
-            }
-        }
-    }
-}
-
 /* close any open listening sockets and reopen new listener sockets based on the settings
  * in the configuration.
  */

Modified: icecast/branches/kh/icecast/src/event.c
===================================================================
--- icecast/branches/kh/icecast/src/event.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/event.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -67,6 +67,7 @@
         yp_recheck_config (config);
         fserve_recheck_mime_types (config);
         stats_global (config);
+        workers_adjust (config->workers_count);
         config_release_config();
         connection_thread_shutdown();
         slave_restart();

Modified: icecast/branches/kh/icecast/src/format.c
===================================================================
--- icecast/branches/kh/icecast/src/format.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -134,30 +134,6 @@
 }
 
 
-/* This is the commonly used for source streams, here we just progress to
- * the next buffer in the queue if there is no more left to be written from 
- * the existing buffer.
- */
-int format_advance_queue (source_t *source, client_t *client)
-{
-    refbuf_t *refbuf = client->refbuf;
-
-    if (refbuf == NULL)
-        return -1;
-
-    if (refbuf->next == NULL && client->pos == refbuf->len)
-        return -1;
-
-    /* move to the next buffer if we have finished with the current one */
-    if (refbuf->next && client->pos == refbuf->len)
-    {
-        client_set_queue (client, refbuf->next);
-        refbuf = client->refbuf;
-    }
-    return 0;
-}
-
-
 int format_prepare_headers (source_t *source, client_t *client)
 {
     unsigned remaining;

Modified: icecast/branches/kh/icecast/src/format.h
===================================================================
--- icecast/branches/kh/icecast/src/format.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -62,7 +62,6 @@
 int format_get_plugin(format_type_t type, struct source_tag *source);
 
 int format_generic_write_to_client (client_t *client);
-int format_advance_queue (struct source_tag *source, client_t *client);
 
 int format_file_read (client_t *client, FILE *fp);
 int format_prepare_headers (struct source_tag *source, client_t *client);

Modified: icecast/branches/kh/icecast/src/format_mp3.c
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format_mp3.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -68,8 +68,11 @@
 
 /* client format flags */
 #define CLIENT_IN_METADATA          CLIENT_FORMAT_BIT
+#define CLIENT_USING_BLANK_META     (CLIENT_FORMAT_BIT<<1)
 
+static refbuf_t blank_meta = { 17, 1, "\001StreamTitle='';", NULL, NULL, 0 };
 
+
 int format_mp3_get_plugin (source_t *source)
 {
     const char *metadata;
@@ -338,19 +341,27 @@
     {
         metadata = associated->data + client_mp3->metadata_offset;
         meta_len = associated->len - client_mp3->metadata_offset;
+        client->flags &= ~CLIENT_USING_BLANK_META;
+        refbuf_release (client_mp3->associated);
+        refbuf_addref (associated);
+        client_mp3->associated = associated;
     }
     else
     {
-        if (associated)
+        if (associated || ((client->flags & CLIENT_USING_BLANK_META) &&
+                    client_mp3->metadata_offset == 0))
         {
             metadata = "\0";
             meta_len = 1;
         }
         else
         {
-            char *meta = "\001StreamTitle='';";
+            char *meta = blank_meta.data;
             metadata = meta + client_mp3->metadata_offset;
-            meta_len = 17 - client_mp3->metadata_offset;
+            meta_len = blank_meta.len - client_mp3->metadata_offset;
+            client->flags |= CLIENT_USING_BLANK_META;
+            refbuf_release (client_mp3->associated);
+            client_mp3->associated = NULL;
         }
     }
     /* if there is normal stream data to send as well as metadata then try
@@ -371,19 +382,19 @@
             {
                 client_mp3->metadata_offset += (ret - remaining);
                 client->flags |= CLIENT_IN_METADATA;
+                client->schedule_ms += 100;
             }
-            else
-                client_mp3->associated = associated;
             client_mp3->since_meta_block = 0;
             client->pos += remaining;
-            client->lag -= remaining;
+            client->queue_pos += remaining;
             return ret;
         }
+        client->schedule_ms += 100;
         if (ret > 0)
         {
             client_mp3->since_meta_block += ret;
             client->pos += ret;
-            client->lag -= ret;
+            client->queue_pos += ret;
         }
         return ret > 0 ? ret : 0;
     }
@@ -391,7 +402,6 @@
 
     if (ret == meta_len)
     {
-        client_mp3->associated = associated;
         client_mp3->metadata_offset = 0;
         client->flags &= ~CLIENT_IN_METADATA;
         client_mp3->since_meta_block = 0;
@@ -399,6 +409,7 @@
     }
     if (ret > 0)
         client_mp3->metadata_offset += ret;
+    client->schedule_ms += 100;
     client->flags |= CLIENT_IN_METADATA;
 
     return ret > 0 ? ret : 0;
@@ -456,7 +467,7 @@
             {
                 client_mp3->since_meta_block += ret;
                 client->pos += ret;
-                client->lag -= ret;
+                client->queue_pos += ret;
             }
             if (ret < (int)len)
                 break;
@@ -498,6 +509,7 @@
     mp3_state *source_mp3 = format->_state;
     char *buf;
     refbuf_t *refbuf;
+    client_t *client = source->client;
 
     if (source_mp3->read_data == NULL)
     {
@@ -507,10 +519,13 @@
     buf = source_mp3->read_data->data + source_mp3->read_count;
     if (source_mp3->read_count < source_mp3->queue_block_size)
     {
-        bytes = client_read_bytes (source->client, buf, source_mp3->queue_block_size-source_mp3->read_count);
+        int read_in = source_mp3->queue_block_size - source_mp3->read_count;
+        bytes = client_read_bytes (client, buf, read_in);
+        if (bytes < read_in)
+            client->schedule_ms = client->worker->time_ms + source->skip_duration;
         if (bytes < 0)
             return 0;
-        rate_add (format->in_bitrate, bytes, time(NULL));
+        rate_add (format->in_bitrate, bytes, client->worker->current_time.tv_sec);
     }
     source_mp3->read_count += bytes;
     refbuf = source_mp3->read_data;
@@ -535,6 +550,7 @@
     refbuf = source_mp3->read_data;
     source_mp3->read_data = NULL;
 
+    source->client->queue_pos += refbuf->len;
     if (source_mp3->update_metadata)
     {
         mp3_set_title (source);
@@ -650,7 +666,7 @@
             else
             {
                 ERROR0 ("Incorrect metadata format, ending stream");
-                source->running = 0;
+                source->flags &= ~SOURCE_RUNNING;
                 refbuf_release (refbuf);
                 refbuf_release (meta);
                 return NULL;
@@ -665,6 +681,7 @@
         refbuf_release (refbuf);
         return NULL;
     }
+    source->client->queue_pos += refbuf->len;
     refbuf->associated = source_mp3->metadata;
     refbuf_addref (source_mp3->metadata);
     refbuf->flags |= SOURCE_BLOCK_SYNC;

Modified: icecast/branches/kh/icecast/src/format_ogg.c
===================================================================
--- icecast/branches/kh/icecast/src/format_ogg.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format_ogg.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -376,6 +376,7 @@
      * marking starting points */
     if (ogg_info->codec_sync == NULL)
         refbuf->flags |= SOURCE_BLOCK_SYNC;
+    source->client->queue_pos += refbuf->len;
     return refbuf;
 }
 
@@ -447,7 +448,7 @@
                 if (ogg_info->error)
                 {
                     ERROR0 ("Problem processing stream");
-                    source->running = 0;
+                    source->flags &= ~SOURCE_RUNNING;
                     return NULL;
                 }
                 if (refbuf)
@@ -461,13 +462,15 @@
         data = ogg_sync_buffer (&ogg_info->oy, 4096);
 
         bytes = client_read_bytes (source->client, data, 4096);
+        if (bytes < 4096)
+            source->client->schedule_ms = source->client->worker->time_ms + source->skip_duration;
         if (bytes <= 0)
         {
             ogg_sync_wrote (&ogg_info->oy, 0);
             return NULL;
         }
         format->read_bytes += bytes;
-        rate_add (format->in_bitrate, bytes, time(NULL));
+        rate_add (format->in_bitrate, bytes, source->client->worker->current_time.tv_sec);
         ogg_sync_wrote (&ogg_info->oy, bytes);
     }
 }
@@ -562,7 +565,7 @@
         if (ret > 0)
         {
             client->pos += ret;
-            client->lag -= ret;
+            client->queue_pos += ret;
         }
 
         if (ret < (int)len)

Modified: icecast/branches/kh/icecast/src/fserve.c
===================================================================
--- icecast/branches/kh/icecast/src/fserve.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/fserve.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -62,274 +62,68 @@
 
 #define BUFSIZE 4096
 
-static fserve_t *active_list = NULL;
-static volatile fserve_t *pending_list = NULL;
-
 static spin_t pending_lock;
 static avl_tree *mimetypes = NULL;
+static avl_tree *fh_cache = NULL;
 
-static volatile int run_fserv = 0;
-static unsigned int fserve_clients;
-static int client_tree_changed=0;
-
-#ifdef HAVE_POLL
-static struct pollfd *ufds = NULL;
-#else
-static fd_set fds;
-static sock_t fd_max = SOCK_ERROR;
-#endif
-
 typedef struct {
     char *ext;
     char *type;
 } mime_type;
 
-static void fserve_client_destroy(fserve_t *fclient);
+typedef struct {
+    fbinfo finfo;
+    mutex_t lock;
+    int refcount;
+    FILE *fp;
+} fh_node;
+
+int fserve_running;
+
 static int _delete_mapping(void *mapping);
-static void *fserv_thread_function(void *arg);
-static int fserve_add_client_mount (client_t *client, const char *mount, FILE *file);
+static int prefile_send (client_t *client);
+static int file_send (client_t *client);
+static int _compare_fh(void *arg, void *a, void *b);
+static int _delete_fh (void *mapping);
 
 void fserve_initialize(void)
 {
     ice_config_t *config = config_get_config();
 
     mimetypes = NULL;
-    active_list = NULL;
-    pending_list = NULL;
     thread_spin_create (&pending_lock);
+    fh_cache = avl_tree_new (_compare_fh, NULL);
 
     fserve_recheck_mime_types (config);
     config_release_config();
 
     stats_event (NULL, "file_connections", "0");
+    fserve_running = 1;
     INFO0("file serving started");
 }
 
 void fserve_shutdown(void)
 {
-    thread_spin_lock (&pending_lock);
-    run_fserv = 0;
-    while (pending_list)
-    {
-        fserve_t *to_go = (fserve_t *)pending_list;
-        pending_list = to_go->next;
-
-        fserve_client_destroy (to_go);
-    }
-    while (active_list)
-    {
-        fserve_t *to_go = active_list;
-        active_list = to_go->next;
-        fserve_client_destroy (to_go);
-    }
-
+    fserve_running = 0;
     if (mimetypes)
         avl_tree_free (mimetypes, _delete_mapping);
-
-    thread_spin_unlock (&pending_lock);
-    thread_spin_destroy (&pending_lock);
-    INFO0("file serving stopped");
-}
-
-#ifdef HAVE_POLL
-int fserve_client_waiting (void)
-{
-    fserve_t *fclient;
-    unsigned int i = 0;
-
-    /* only rebuild ufds if there are clients added/removed */
-    if (client_tree_changed)
+    if (fh_cache)
     {
-        client_tree_changed = 0;
-        ufds = realloc(ufds, fserve_clients * sizeof(struct pollfd));
-        fclient = active_list;
-        while (fclient)
+        int count = 100;
+        while (fh_cache->length && count)
         {
-            ufds[i].fd = fclient->client->con->sock;
-            ufds[i].events = POLLOUT;
-            ufds[i].revents = 0;
-            fclient = fclient->next;
-            i++;
+            DEBUG1 ("waiting for %u entries to clear", fh_cache->length);
+            thread_sleep (20000);
+            count--;
         }
+        avl_tree_free (fh_cache, _delete_fh);
     }
-    if (!ufds)
-    {
-        thread_spin_lock (&pending_lock);
-        run_fserv = 0;
-        thread_spin_unlock (&pending_lock);
-        return -1;
-    }
-    else if (poll(ufds, fserve_clients, 200) > 0)
-    {
-        /* mark any clients that are ready */
-        fclient = active_list;
-        for (i=0; i<fserve_clients; i++)
-        {
-            if (ufds[i].revents & (POLLOUT|POLLHUP|POLLERR))
-                fclient->ready = 1;
-            fclient = fclient->next;
-        }
-        return 1;
-    }
-    return 0;
-}
-#else
-int fserve_client_waiting (void)
-{
-    fserve_t *fclient;
-    fd_set realfds;
 
-    /* only rebuild fds if there are clients added/removed */
-    if(client_tree_changed) {
-        client_tree_changed = 0;
-        FD_ZERO(&fds);
-        fd_max = SOCK_ERROR;
-        fclient = active_list;
-        while (fclient) {
-            FD_SET (fclient->client->con->sock, &fds);
-            if (fclient->client->con->sock > fd_max || fd_max == SOCK_ERROR)
-                fd_max = fclient->client->con->sock;
-            fclient = fclient->next;
-        }
-    }
-    /* hack for windows, select needs at least 1 descriptor */
-    if (fd_max == SOCK_ERROR)
-    {
-        thread_spin_lock (&pending_lock);
-        run_fserv = 0;
-        thread_spin_unlock (&pending_lock);
-        return -1;
-    }
-    else
-    {
-        struct timeval tv;
-        tv.tv_sec = 0;
-        tv.tv_usec = 200000;
-        /* make a duplicate of the set so we do not have to rebuild it
-         * each time around */
-        memcpy(&realfds, &fds, sizeof(fd_set));
-        if(select(fd_max+1, NULL, &realfds, NULL, &tv) > 0)
-        {
-            /* mark any clients that are ready */
-            fclient = active_list;
-            while (fclient)
-            {
-                if (FD_ISSET (fclient->client->con->sock, &realfds))
-                    fclient->ready = 1;
-                fclient = fclient->next;
-            }
-            return 1;
-        }
-    }
-    return 0;
+    thread_spin_destroy (&pending_lock);
+    INFO0("file serving stopped");
 }
-#endif
 
-static int wait_for_fds(void)
-{
-    fserve_t *fclient;
-    int ret;
 
-    while (run_fserv)
-    {
-        /* add any new clients here */
-        if (pending_list)
-        {
-            thread_spin_lock (&pending_lock);
-
-            fclient = (fserve_t*)pending_list;
-            while (fclient)
-            {
-                fserve_t *to_move = fclient;
-                fclient = fclient->next;
-                to_move->next = active_list;
-                active_list = to_move;
-                client_tree_changed = 1;
-                fserve_clients++;
-            }
-            pending_list = NULL;
-            thread_spin_unlock (&pending_lock);
-        }
-        /* drop out of here if someone is ready */
-        ret = fserve_client_waiting();
-        if (ret)
-            return ret;
-    }
-    return -1;
-}
-
-static void *fserv_thread_function(void *arg)
-{
-    fserve_t *fclient, **trail;
-    size_t bytes;
-
-    while (1)
-    {
-        if (wait_for_fds() < 0)
-            break;
-
-        fclient = active_list;
-        trail = &active_list;
-
-        while (fclient)
-        {
-            /* process this client, if it is ready */
-            if (fclient->ready)
-            {
-                client_t *client = fclient->client;
-                refbuf_t *refbuf = client->refbuf;
-                fclient->ready = 0;
-                if (client->pos == refbuf->len)
-                {
-                    /* Grab a new chunk */
-                    if (fclient->file)
-                        bytes = fread (refbuf->data, 1, BUFSIZE, fclient->file);
-                    else
-                        bytes = 0;
-                    if (bytes == 0)
-                    {
-                        if (refbuf->next == NULL)
-                        {
-                            fserve_t *to_go = fclient;
-                            fclient = fclient->next;
-                            *trail = fclient;
-                            fserve_client_destroy (to_go);
-                            fserve_clients--;
-                            client_tree_changed = 1;
-                            continue;
-                        }
-                        refbuf = refbuf->next;
-                        client->refbuf->next = NULL;
-                        refbuf_release (client->refbuf);
-                        client->refbuf = refbuf;
-                        bytes = refbuf->len;
-                    }
-                    refbuf->len = bytes;
-                    client->pos = 0;
-                }
-
-                /* Now try and send current chunk. */
-                format_generic_write_to_client (client);
-
-                if (client->con->error)
-                {
-                    fserve_t *to_go = fclient;
-                    fclient = fclient->next;
-                    *trail = fclient;
-                    fserve_clients--;
-                    fserve_client_destroy (to_go);
-                    client_tree_changed = 1;
-                    continue;
-                }
-            }
-            trail = &fclient->next;
-            fclient = fclient->next;
-        }
-    }
-    DEBUG0 ("fserve handler exit");
-    return NULL;
-}
-
 /* string returned needs to be free'd */
 char *fserve_content_type (const char *path)
 {
@@ -377,28 +171,73 @@
     return type;
 }
 
-static void fserve_client_destroy(fserve_t *fclient)
+static int _compare_fh(void *arg, void *a, void *b)
 {
-    if (fclient)
-    {
-        if (fclient->file)
-            fclose (fclient->file);
+    fh_node *x = a, *y = b;
+    int r = strcmp (x->finfo.mount, y->finfo.mount);
 
-        if (fclient->callback)
-            fclient->callback (fclient->client, fclient->arg);
-        else
-            if (fclient->client)
-            {
-                ice_config_t *config = config_get_config ();
-                mount_proxy *mountinfo = config_find_mount (config, fclient->mount);
+    if (r) return r;
+    r = (int)x->finfo.flags - y->finfo.flags;
+    if (r) return r;
+    if (x->finfo.flags & FS_JINGLE)
+        return strcmp (x->finfo.fallback, y->finfo.fallback);
+    return 0;
+}
 
-                fclient->client->flags &= ~CLIENT_AUTHENTICATED;
-                auth_release_listener (fclient->client, fclient->mount, mountinfo);
-                config_release_config();
-            }
-        free (fclient->mount);
-        free (fclient);
+static int _delete_fh (void *mapping)
+{
+    fh_node *fh = mapping;
+    if (fh->refcount)
+        WARN2 ("handle for %s has refcount %d", fh->finfo.mount, fh->refcount);
+    thread_mutex_destroy (&fh->lock);
+    if (fh->fp)
+        fclose (fh->fp);
+    free (fh->finfo.mount);
+    free (fh->finfo.fallback);
+    free (fh);
+
+    return 1;
+}
+
+/* find/create handle and return it with the structure in a locked state */
+static fh_node *open_fh (fbinfo *finfo)
+{
+    fh_node *fh, *result;
+
+    if (finfo->mount == NULL)
+        finfo->mount = "";
+    fh = calloc (1, sizeof (fh_node));
+    memcpy (&fh->finfo, finfo, sizeof (fbinfo));
+    avl_tree_wlock (fh_cache);
+    if (avl_get_by_key (fh_cache, fh, (void**)&result) == 0)
+    {
+        free (fh);
+        thread_mutex_lock (&result->lock);
+        result->refcount++;
+        DEBUG2 ("refcount now %d for %s", result->refcount, result->finfo.mount);
+        avl_tree_unlock (fh_cache);
+        return result;
     }
+
+    // insert new one
+    if (fh->finfo.mount[0])
+    {
+        char *fullpath= util_get_path_from_normalised_uri (fh->finfo.mount, fh->finfo.flags&FS_USE_ADMIN);
+        DEBUG1 ("lookup of \"%s\"", finfo->mount);
+        fh->fp = fopen (fullpath, "rb");
+        if (fh->fp == NULL)
+            WARN1 ("Failed to open \"%s\"", fullpath);
+        free (fullpath);
+    }
+    thread_mutex_create (&fh->lock);
+    thread_mutex_lock (&fh->lock);
+    fh->refcount = 1;
+    fh->finfo.mount = strdup (finfo->mount);
+    if (finfo->fallback)
+        fh->finfo.fallback = strdup (finfo->fallback);
+    avl_insert (fh_cache, fh);
+    avl_tree_unlock (fh_cache);
+    return fh;
 }
 
 
@@ -411,17 +250,15 @@
     const char *range = NULL;
     off_t new_content_len = 0;
     off_t rangenumber = 0, content_length;
-    int ret = 0, use_admin = 0;
+    int ret = 0;
     char *fullpath;
     int m3u_requested = 0, m3u_file_available = 1;
     int xspf_requested = 0, xspf_file_available = 1;
     ice_config_t *config;
-    FILE *file;
+    fh_node *fh;
+    fbinfo finfo;
 
-    if (httpclient->parser == NULL) /* special case for specific non-http content */
-        use_admin = 1;
-
-    fullpath = util_get_path_from_normalised_uri (path, use_admin);
+    fullpath = util_get_path_from_normalised_uri (path, 0);
     INFO2 ("checking for file %s (%s)", path, fullpath);
 
     if (strcmp (util_get_extension (fullpath), "m3u") == 0)
@@ -493,7 +330,7 @@
                     );
         }
         httpclient->refbuf->len = strlen (httpclient->refbuf->data);
-        fserve_add_client (httpclient, NULL);
+        fserve_setup_client_fb (httpclient, NULL);
         free (sourceuri);
         free (fullpath);
         return 0;
@@ -532,8 +369,13 @@
         return -1;
     }
 
-    file = fopen (fullpath, "rb");
-    if (file == NULL)
+    finfo.flags = FS_NORMAL;
+    finfo.mount = (char *)path;
+    finfo.fallback = NULL;
+    finfo.limit = 0;
+
+    fh = open_fh (&finfo);
+    if (fh == NULL)
     {
         WARN1 ("Problem accessing file \"%s\"", fullpath);
         client_send_404 (httpclient, "File not readable");
@@ -549,6 +391,7 @@
     {
         int bytes;
 
+        httpclient->intro_offset = 0;
         /* full http range handling is currently not done but we deal with the common case */
         if (range)
         {
@@ -566,10 +409,11 @@
                 off_t endpos;
                 char *type;
 
-                ret = fseeko (file, rangenumber, SEEK_SET);
+                ret = fseeko (fh->fp, rangenumber, SEEK_SET);
                 if (ret == -1)
                     break;
 
+                httpclient->intro_offset = rangenumber;
                 new_content_len = content_length - rangenumber;
                 endpos = rangenumber + new_content_len - 1;
                 if (endpos < 0)
@@ -600,11 +444,11 @@
             else
                 break;
         }
-        else if (httpclient->parser)
+        else
         {
             char *type = fserve_content_type (path);
             httpclient->respcode = 200;
-            if (httpp_getvar (httpclient->parser, HTTPP_VAR_NO_CONTENT_LENGTH))
+            if (httpclient->flags & CLIENT_NO_CONTENT_LENGTH)
                 bytes = snprintf (httpclient->refbuf->data, BUFSIZE,
                         "HTTP/1.0 200 OK\r\n"
                         "Content-Type: %s\r\n"
@@ -623,86 +467,276 @@
         }
         httpclient->refbuf->len = bytes;
         httpclient->pos = 0;
+        httpclient->shared_data = fh;
 
         stats_event_inc (NULL, "file_connections");
-        fserve_add_client_mount (httpclient, path, file);
+        thread_mutex_unlock (&fh->lock);
+        fserve_setup_client_fb (httpclient, NULL);
         return 0;
     } while (0);
+    thread_mutex_unlock (&fh->lock);
     /* If we run into any issues with the ranges
        we fallback to a normal/non-range request */
-    fclose (file);
     client_send_416 (httpclient);
     return -1;
 }
 
-static void fserve_add_pending (fserve_t *fclient)
+// fh must be locked before calling this
+static void fh_release (fh_node *fh)
 {
-    thread_spin_lock (&pending_lock);
-    fclient->next = (fserve_t *)pending_list;
-    pending_list = fclient;
-    if (run_fserv == 0)
+    fh->refcount--;
+    if (fh->finfo.mount[0])
+        DEBUG2 ("refcount now %d on %s", fh->refcount, fh->finfo.mount);
+    if (fh->refcount)
     {
-        run_fserv = 1;
-        DEBUG0 ("fserve handler waking up");
-        thread_create("File Serving Thread", fserv_thread_function, NULL, THREAD_DETACHED);
+        thread_mutex_unlock (&fh->lock);
+        return;
     }
-    thread_spin_unlock (&pending_lock);
+    avl_tree_wlock (fh_cache);
+    thread_mutex_unlock (&fh->lock);
+    avl_delete (fh_cache, fh, _delete_fh);
+    avl_tree_unlock (fh_cache);
 }
 
-
-/* Add client to fserve thread, client needs to have refbuf set and filled
- * but may provide a NULL file if no data needs to be read
- */
-static int fserve_add_client_mount (client_t *client, const char *mount, FILE *file)
+static void file_release (client_t *client)
 {
-    fserve_t *fclient = calloc (1, sizeof(fserve_t));
+    fh_node *fh = client->shared_data;
+    const char *mount = httpp_getvar (client->parser, HTTPP_VAR_URI);
 
-    DEBUG0 ("Adding client to file serving engine");
-    if (fclient == NULL)
+    rate_free (client->out_bitrate);
+    if (client->respcode == 200)
     {
+        ice_config_t *config = config_get_config ();
+        mount_proxy *mountinfo = config_find_mount (config, mount);
+        auth_release_listener (client, mount, mountinfo);
+        config_release_config();
+    }
+    else
+    {
+        client->flags &= ~CLIENT_AUTHENTICATED;
         client_destroy (client);
-        return -1;
     }
-    fclient->file = file;
-    fclient->client = client;
-    if (mount)
-        fclient->mount = strdup (mount);
-    fclient->ready = 0;
-    fserve_add_pending (fclient);
+    global_reduce_bitrate_sampling (global.out_bitrate);
+    if (fh)
+    {
+        thread_mutex_lock (&fh->lock);
+        if (fh->finfo.flags & (FS_FALLBACK|FS_JINGLE))
+            stats_event_dec (NULL, "listeners");
+        fh_release (fh);
+    }
+}
 
-    return 0;
+
+struct _client_functions file_content_ops =
+{
+    file_send,
+    file_release
+};
+
+struct _client_functions buffer_content_ops =
+{
+    prefile_send,
+    file_release
+};
+
+
+static void fserve_move_listener (client_t *client)
+{
+    fh_node *fh = client->shared_data;
+    fbinfo f;
+
+    refbuf_release (client->refbuf);
+    client->refbuf = NULL;
+    rate_free (client->out_bitrate);
+    client->out_bitrate = NULL;
+    client->shared_data = NULL;
+    thread_mutex_lock (&fh->lock);
+    f.flags = fh->finfo.flags;
+    f.limit = fh->finfo.limit;
+    f.mount = fh->finfo.fallback;
+    f.fallback = NULL;
+    client->intro_offset = -1;
+    move_listener (client, &f);
+    fh_release (fh);
 }
 
 
-int fserve_add_client (client_t *client, FILE *file)
+static int prefile_send (client_t *client)
 {
-    return fserve_add_client_mount (client, NULL, file);
+    refbuf_t *refbuf = client->refbuf;
+    int loop = 3, bytes;
+
+    while (loop)
+    {
+        fh_node *fh = client->shared_data;
+        loop--;
+        if (fserve_running == 0 || client->con->error)
+            return -1;
+        if (refbuf == NULL || client->pos == refbuf->len)
+        {
+            if (fh && fh->finfo.fallback)
+            {
+                fserve_move_listener (client);
+                return 0;
+            }
+            if (refbuf == NULL || refbuf->next == NULL)
+            {
+                if (fh)
+                {
+                    if (fh->fp) // is there a file to read from
+                    {
+                        int len = fh->finfo.limit ? 1400 : 4096;
+                        refbuf_t *r = refbuf_new (len);
+                        refbuf_release (client->refbuf);
+                        client->refbuf = r;
+                        client->pos = len;
+                        client->ops = &file_content_ops;
+                        return 1;
+                    }
+                }
+                if (client->respcode)
+                    return -1;
+                client_send_404 (client, NULL);
+                thread_mutex_lock (&fh->lock);
+                fh_release (fh);
+                return 0;
+            }
+            else
+            {
+                refbuf = refbuf->next;
+                client->refbuf->next = NULL;
+                refbuf_release (client->refbuf);
+                client->refbuf = refbuf;
+            }
+            client->pos = 0;
+        }
+        bytes = format_generic_write_to_client (client);
+        if (bytes < 0)
+        {
+            client->schedule_ms = client->worker->time_ms + 150;
+            return 0;
+        }
+        global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
+        if (bytes < 4096)
+            break;
+    }
+    client->schedule_ms = client->worker->time_ms + (loop ? 50 : 15);
+    return 0;
 }
 
 
-/* add client to file serving engine, but just write out the buffer contents,
- * then pass the client to the callback with the provided arg
- */
-void fserve_add_client_callback (client_t *client, fserve_callback_t callback, void *arg)
+static int file_send (client_t *client)
 {
-    fserve_t *fclient = calloc (1, sizeof(fserve_t));
+    refbuf_t *refbuf = client->refbuf;
+    int loop = 5, bytes;
+    fh_node *fh = client->shared_data;
 
-    DEBUG0 ("Adding client to file serving engine");
-    if (fclient == NULL)
+    if (client->con->discon_time && client->worker->current_time.tv_sec >= client->con->discon_time)
+        return -1;
+    while (loop)
     {
-        client_send_404 (client, "memory exhausted");
-        return;
+        loop--;
+        if (fserve_running == 0 || client->con->error)
+            return -1;
+        if (fh->finfo.limit)
+        {
+            long rate = rate_avg (client->out_bitrate);
+            if (rate == 0) loop = 0;
+            if (fh->finfo.limit < rate)
+            {
+                rate_add (client->out_bitrate, 0, client->worker->time_ms);
+                client->schedule_ms = client->worker->time_ms + 150;
+                return 0;
+            }
+        }
+        if (client->pos == refbuf->len)
+        {
+            bytes = 0;
+            if (fh->finfo.fallback)
+            {
+                fserve_move_listener (client);
+                return 0;
+            }
+            if (fh->fp)
+            {
+                thread_mutex_lock (&fh->lock);
+                if (fseeko (fh->fp, client->intro_offset, SEEK_SET) == 0 &&
+                        (bytes = fread (refbuf->data, 1, refbuf->len, fh->fp)) > 0)
+                {
+                    refbuf->len = bytes;
+                    client->intro_offset += bytes;
+                }
+                thread_mutex_unlock (&fh->lock);
+            }
+            if (bytes == 0)
+                return -1;
+            client->pos = 0;
+        }
+        bytes = client->check_buffer (client);
+        if (bytes < 0)
+            return 0;
+        global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
+        if (fh->finfo.limit)
+            rate_add (client->out_bitrate, bytes, client->worker->time_ms);
     }
-    fclient->file = NULL;
-    fclient->client = client;
-    fclient->ready = 0;
-    fclient->callback = callback;
-    fclient->arg = arg;
+    client->schedule_ms = client->worker->time_ms + 10;
+    return 1;
+}
 
-    fserve_add_pending (fclient);
+
+void fserve_setup_client_fb (client_t *client, fbinfo *finfo)
+{
+    client->ops = &buffer_content_ops;
+    if (finfo)
+    {
+        fh_node *fh = open_fh (finfo);
+        client->shared_data = fh;
+        if (fh)
+        {
+            if (fh->finfo.limit)
+                client->out_bitrate = rate_setup (50,1000);
+            thread_mutex_unlock (&fh->lock);
+        }
+    }
+    else
+    {
+        client->check_buffer = format_generic_write_to_client;
+    }
+    client->intro_offset = 0;
+    client->flags |= CLIENT_ACTIVE;
 }
 
 
+void fserve_setup_client (client_t *client, const char *mount)
+{
+    fbinfo finfo;
+    finfo.flags = FS_NORMAL;
+    finfo.mount = (char *)mount;
+    finfo.fallback = NULL;
+    finfo.limit = 0;
+    client->check_buffer = format_generic_write_to_client;
+    fserve_setup_client_fb (client, &finfo);
+}
+
+
+void fserve_set_override (const char *mount, const char *dest)
+{
+    fh_node fh, *result;
+
+    fh.finfo.flags = FS_FALLBACK;
+    fh.finfo.mount = (char *)mount;
+    fh.finfo.fallback = NULL;
+
+    avl_tree_rlock (fh_cache);
+    if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0)
+    {
+        char *tmp = result->finfo.fallback;
+        result->finfo.fallback = strdup (dest);
+        free (tmp);
+    }
+    avl_tree_unlock (fh_cache);
+}
+
 static int _delete_mapping(void *mapping) {
     mime_type *map = mapping;
     free(map->ext);

Modified: icecast/branches/kh/icecast/src/fserve.h
===================================================================
--- icecast/branches/kh/icecast/src/fserve.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/fserve.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -18,26 +18,28 @@
 
 typedef void (*fserve_callback_t)(client_t *, void *);
 
-typedef struct _fserve_t
+typedef struct _fbinfo
 {
-    client_t *client;
-
-    FILE *file;
-    int ready;
-    void (*callback)(client_t *, void *);
-    void *arg;
+    int flags;
+    unsigned int limit;
     char *mount;
-    struct _fserve_t *next;
-} fserve_t;
+    char *fallback;
+} fbinfo;
 
+#define FS_NORMAL               01
+#define FS_FALLBACK             02
+#define FS_USE_ADMIN            04
+#define FS_JINGLE               010
+
 void fserve_initialize(void);
 void fserve_shutdown(void);
 int fserve_client_create(client_t *httpclient, const char *path);
-int fserve_add_client (client_t *client, FILE *file);
-void fserve_add_client_callback (client_t *client, fserve_callback_t callback, void *arg);
 char *fserve_content_type (const char *path);
 void fserve_recheck_mime_types (ice_config_t *config);
 
+void fserve_setup_client (client_t *client, const char *mount);
+void fserve_setup_client_fb (client_t *client, fbinfo *finfo);
+void fserve_set_override (const char *mount, const char *dest);
 
 #endif
 

Modified: icecast/branches/kh/icecast/src/global.c
===================================================================
--- icecast/branches/kh/icecast/src/global.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/global.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -68,10 +68,10 @@
     thread_mutex_unlock(&_global_mutex);
 }
 
-void global_add_bitrates (struct rate_calc *rate, unsigned long value)
+void global_add_bitrates (struct rate_calc *rate, unsigned long value, uint64_t milli)
 {
     thread_spin_lock (&global.spinlock);
-    rate_add (rate, value, timing_get_time());
+    rate_add (rate, value, milli);
     thread_spin_unlock (&global.spinlock);
 }
 

Modified: icecast/branches/kh/icecast/src/global.h
===================================================================
--- icecast/branches/kh/icecast/src/global.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/global.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -24,6 +24,7 @@
 
 #include "thread/thread.h"
 #include "net/sock.h"
+#include "compat.h"
 
 typedef struct ice_global_tag
 {
@@ -60,7 +61,7 @@
 void global_shutdown(void);
 void global_lock(void);
 void global_unlock(void);
-void global_add_bitrates (struct rate_calc *rate, unsigned long value);
+void global_add_bitrates (struct rate_calc *rate, unsigned long value, uint64_t milli);
 void global_reduce_bitrate_sampling (struct rate_calc *rate);
 unsigned long global_getrate_avg (struct rate_calc *rate);
 

Modified: icecast/branches/kh/icecast/src/main.c
===================================================================
--- icecast/branches/kh/icecast/src/main.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/main.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -124,10 +124,8 @@
 
     connection_shutdown();
     config_shutdown();
-    global_shutdown();
     resolver_shutdown();
     sock_shutdown();
-    thread_shutdown();
 
     DEBUG0 ("library cleanups");
 #ifdef HAVE_CURL
@@ -138,6 +136,8 @@
     _stop_logging();
     log_shutdown();
     xslt_shutdown();
+    thread_shutdown();
+    global_shutdown();
 }
 
 static int _parse_config_opts(int argc, char **argv, char *filename, int size)

Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/slave.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -42,8 +42,8 @@
 
 #include "compat.h"
 
+#include "timing/timing.h"
 #include "thread/thread.h"
-#include "timing/timing.h"
 #include "avl/avl.h"
 #include "net/sock.h"
 #include "httpp/httpp.h"
@@ -71,20 +71,41 @@
 } redirect_host;
 
 
-static void *_slave_thread(void *arg);
+static void _slave_thread(void);
 static void redirector_add (const char *server, int port, int interval);
 static redirect_host *find_slave_host (const char *server, int port);
 static void redirector_clearall (void);
+static int  relay_startup (client_t *client);
+static int  relay_read (client_t *client);
 
 int slave_running = 0;
+int worker_count;
+int relays_connecting;
+
 static volatile int update_settings = 0;
 static volatile int update_all_mounts = 0;
 static volatile int restart_connection_thread = 0;
 static time_t streamlist_check = 0;
 static rwlock_t slaves_lock;
+static spin_t relay_start_lock;
 
 redirect_host *redirectors;
+worker_t *workers;
+rwlock_t workers_lock;
 
+struct _client_functions relay_client_ops =
+{
+    relay_read,
+    client_destroy
+};
+
+struct _client_functions relay_startup_ops =
+{
+    relay_startup,
+    client_destroy
+};
+
+
 relay_server *relay_free (relay_server *relay)
 {
     relay_server *next = relay->next;
@@ -187,19 +208,25 @@
     update_all_mounts = 0;
     restart_connection_thread = 0;
     redirectors = NULL;
+    workers = NULL;
+    worker_count = 0;
+    relays_connecting = 0;
+    thread_spin_create (&relay_start_lock);
+    thread_rwlock_create (&workers_lock);
 #ifndef HAVE_CURL
     ERROR0 ("streamlist request disabled, rebuild with libcurl if required");
 #endif
-    _slave_thread (NULL);
+    _slave_thread ();
     slave_running = 0;
-    thread_rwlock_destroy (&slaves_lock);
 }
 
 
 void slave_shutdown(void)
 {
-    if (!slave_running)
-        return;
+    workers_adjust (0);
+    thread_rwlock_destroy (&slaves_lock);
+    thread_rwlock_destroy (&workers_lock);
+    thread_spin_destroy (&relay_start_lock);
     slave_running = 0;
 }
 
@@ -264,7 +291,7 @@
 /* Actually open the connection and do some http parsing, handle any 302
  * responses within here.
  */
-static client_t *open_relay_connection (relay_server *relay, relay_server_master *master)
+static int open_relay_connection (client_t *client, relay_server *relay, relay_server_master *master)
 {
     int redirects = 0;
     char *server_id = NULL;
@@ -381,25 +408,22 @@
         }
         else
         {
-            client_t *client = NULL;
-
             if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
             {
                 ERROR2("Error from relay request: %s (%s)", relay->localmount,
                         httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
                 break;
             }
-            global_lock ();
-            client = client_create (con, parser);
-            global_unlock ();
             sock_set_blocking (streamsock, 0);
+            client->con = con;
+            client->parser = parser;
             client_set_queue (client, NULL);
             free (server);
             free (mount);
             free (server_id);
             free (auth_header);
 
-            return client;
+            return 0;
         }
         redirects++;
     }
@@ -412,7 +436,7 @@
         connection_close (con);
     if (parser)
         httpp_destroy (parser);
-    return NULL;
+    return -1;
 }
 
 
@@ -424,119 +448,135 @@
     relay_server *relay = arg;
     source_t *src = relay->source;
     relay_server_master *master = relay->masters;
-    client_t *client = NULL;
+    client_t *client = src->client;
     mount_proxy *mountinfo;
     ice_config_t *config;
 
     INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
+    thread_rwlock_rlock (&global.shutdown_lock);
     thread_mutex_lock (&src->lock);
     do
     {
-        if (master)
-        {
-            thread_mutex_unlock (&src->lock);
-            client = open_relay_connection (relay, master);
-            thread_mutex_lock (&src->lock);
+        int ret;
 
-            if (client == NULL)
-                continue;
+        thread_mutex_unlock (&src->lock);
+        ret = open_relay_connection (client, relay, master);
+        thread_mutex_lock (&src->lock);
 
-            src->client = client;
-            src->parser = client->parser;
-        }
+        if (ret < 0)
+            continue;
+        src->parser = client->parser;
 
         if (connection_complete_source (src, 0) < 0)
         {
             INFO0("Failed to complete source initialisation");
-            client_destroy (client);
-            src->client = NULL;
             continue;
         }
-        if (client)
-        {
-            stats_event_inc (NULL, "source_relay_connections");
-            stats_event (relay->localmount, "source_ip", client->con->ip);
-        }
+        stats_event_inc (NULL, "source_relay_connections");
 
-        source_main (relay->source);
+        source_init (src);
+        relay->running = 1;
+        client->ops = &relay_client_ops;
+        client->flags |= CLIENT_ACTIVE;
 
-        if (relay->on_demand == 0)
-        {
-            /* try next server if configured */
-            if (global.running == ICE_RUNNING && relay->running && master->next)
-                continue;
+        thread_spin_lock (&relay_start_lock);
+        relays_connecting--;
+        thread_spin_unlock (&relay_start_lock);
+        thread_cond_signal (&client->worker->cond);
 
-            /* only keep refreshing YP entries for inactive on-demand relays */
-            yp_remove (relay->localmount);
-            relay->source->yp_public = -1;
-            relay->start = time(NULL) + 10; /* prevent busy looping if failing */
-            slave_update_all_mounts();
-        }
-
-        /* we've finished, now get cleaned up */
-        thread_mutex_unlock (&src->lock);
-        relay->cleanup = 1;
-        slave_rebuild_mounts();
-
         return NULL;
     } while ((master = master->next));
 
-    /* source should be locked here */
+    /* failed to start, better clean up and reset */
+    if (relay->on_demand)
+        src->flags &= ~SOURCE_ON_DEMAND;  // disable for a short time
+    else
+        yp_remove (relay->localmount);
+
+    if (client->con)
+        connection_close (client->con);
+    client->con = NULL;
+    if (client->parser)
+        httpp_destroy (client->parser);
+    client->parser = NULL;
+
     config = config_get_config();
     mountinfo = config_find_mount (config, src->mount);
-    if (mountinfo && mountinfo->fallback_mount)
+    if (mountinfo && mountinfo->fallback_mount && strcmp (src->mount, mountinfo->fallback_mount) != 0)
     {
-        source_t *fallback_source;
-
+        int left_on = 0;
+        client_t *leave_on = NULL, *check_clients, *last = NULL;
         INFO1 ("failed relay, fallback to %s", mountinfo->fallback_mount);
-        avl_tree_rlock(global.source_tree);
-        fallback_source = source_find_mount (mountinfo->fallback_mount);
-        if (fallback_source)
+        check_clients = src->client_list;
+        src->client_list = NULL;
+        src->listeners = 0;
+        while (check_clients)
         {
-            thread_mutex_unlock (&src->lock);
-            source_move_clients (src, fallback_source);
-            thread_mutex_lock (&src->lock);
+            client_t *to_move = check_clients;
+            check_clients = to_move->next;
+            if (to_move->flags & CLIENT_IS_SLAVE)
+            {
+                if (leave_on == NULL) last = to_move;
+                to_move->next = leave_on;
+                leave_on = to_move;
+                left_on++;
+            }
+            else
+            {
+                fbinfo f;
+                f.mount = mountinfo->fallback_mount;
+                f.flags = FS_NORMAL;
+                f.fallback = NULL;
+                if (src->format)
+                    f.limit = rate_avg (src->format->in_bitrate);
+                else
+                    f.limit = src->limit_rate;
+                thread_mutex_unlock (&src->lock);
+                move_listener (to_move, &f);
+                thread_mutex_lock (&src->lock);
+            }
         }
-        avl_tree_unlock (global.source_tree);
+        /* it is possible that listeners could of been moved here by now */
+        src->listeners += left_on;
+        if (last)
+        {
+            last->next = src->client_list;
+            src->client_list = leave_on;
+        }
     }
     config_release_config();
 
-    source_clear_source (relay->source);
-    thread_mutex_unlock (&src->lock);
+    INFO2 ("listener count still on %s is %d", src->mount, src->listeners);
+    source_clear_source (src);
+    relay->start = client->worker->current_time.tv_sec + relay->interval;
+    client->schedule_ms = timing_get_time() + 1000;
+    client->flags |= CLIENT_ACTIVE;
 
-    /* cleanup relay, but prevent this relay from starting up again too soon */
-    relay->source->on_demand = 0;
-    relay->start = time(NULL) + relay->interval;
-    relay->cleanup = 1;
-
+    thread_spin_lock (&relay_start_lock);
+    relays_connecting--;
+    thread_spin_unlock (&relay_start_lock);
+    thread_cond_signal (&client->worker->cond);
+    thread_rwlock_unlock (&global.shutdown_lock);
     return NULL;
 }
 
 
-/* wrapper for starting the provided relay stream */
 static void check_relay_stream (relay_server *relay)
 {
-    if (relay->source == NULL)
+    source_t *source = relay->source;
+    if (source && source->client)
+        return;
+    if (source == NULL)
     {
         if (relay->localmount[0] != '/')
         {
-            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
-                    relay->localmount);
+            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping", relay->localmount);
             return;
         }
         /* new relay, reserve the name */
-        relay->source = source_reserve (relay->localmount);
-        if (relay->source)
+        source = source_reserve (relay->localmount);
+        if (source == NULL)
         {
-            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
-            if (relay->on_demand)
-            {
-                relay->start = time(NULL);
-                slave_update_all_mounts();
-            }
-        }
-        else
-        {
             if (relay->start == 0)
             {
                 WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
@@ -544,91 +584,32 @@
             }
             return;
         }
+        relay->source = source;
+        INFO1("Adding new relay at mountpoint \"%s\"", relay->localmount);
+        stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
     }
-    do
+    if (source->client == NULL)
     {
-        source_t *source = relay->source;
-        /* skip relay if active, not configured or just not time yet */
-        if (relay->source == NULL || relay->running || relay->start > time(NULL))
-            break;
-        if (relay->enable == 0)
+        client_t *client;
+        global_lock();
+        client = client_create (NULL, NULL);
+        global_unlock();
+        source->client = client;
+        client->shared_data = relay;
+        client->ops = &relay_startup_ops;
+        if (relay->on_demand)
         {
-            stats_event (relay->localmount, NULL, NULL);
-            break;
-        }
-        /* check if an inactive on-demand relay has a fallback that has listeners */
-        if (relay->on_demand && source->on_demand_req == 0)
-        {
-            ice_config_t *config = config_get_config ();
-            mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
-
-            source->on_demand = relay->on_demand;
-
-            if (mountinfo)
-            {
-                if (mountinfo->fallback_mount && mountinfo->fallback_override)
-                {
-                    source_t *fallback;
-                    avl_tree_rlock (global.source_tree);
-                    fallback = source_find_mount (mountinfo->fallback_mount);
-                    if (fallback && fallback->running && fallback->listeners)
-                    {
-                        DEBUG1 ("fallback running with %lu listeners", fallback->listeners);
-                        source->on_demand_req = 1;
-                    }
-                    avl_tree_unlock (global.source_tree);
-                }
-            }
-            else
-            {
-                if (relay->start)
-                {
-                    thread_mutex_lock (&relay->source->lock);
-                    source_update_settings (config, relay->source, mountinfo);
-                    thread_mutex_unlock (&relay->source->lock);
-                    relay->start = 0;
-                }
-            }
+            ice_config_t *config = config_get_config();
+            mount_proxy *mountinfo = config_find_mount (config, source->mount);
+            thread_mutex_lock (&source->lock);
+            source->flags |= SOURCE_ON_DEMAND;
+            source_update_settings (config, source, mountinfo);
+            thread_mutex_unlock (&source->lock);
             config_release_config();
-            if (source->on_demand_req == 0)
-                break;
         }
-
-        relay->start = time(NULL) + 5;
-        relay->running = 1;
-        relay->thread = thread_create ("Relay Thread", start_relay_stream,
-                relay, THREAD_ATTACHED);
-        return;
-
-    } while (0);
-    /* the relay thread may of shut down itself */
-    if (relay->cleanup)
-    {
-        if (relay->thread)
-        {
-            DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
-            thread_join (relay->thread);
-            relay->thread = NULL;
-        }
-        relay->cleanup = 0;
-        relay->running = 0;
-
-        if (relay->enable == 0)
-        {
-            stats_event (relay->localmount, NULL, NULL);
-            return;
-        }
-        if (relay->on_demand && relay->source)
-        {
-            ice_config_t *config = config_get_config ();
-            mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
-            thread_mutex_lock (&relay->source->lock);
-            source_update_settings (config, relay->source, mountinfo);
-            thread_mutex_unlock (&relay->source->lock);
-            config_release_config ();
-            stats_event (relay->localmount, "listeners", "0");
-            relay->start = time(NULL);
-        }
+        client->flags |= CLIENT_ACTIVE;
+        DEBUG1 ("adding relay client for %s", relay->localmount);
+        client_add_worker (client);
     }
 }
 
@@ -661,6 +642,7 @@
             old->on_demand = new->on_demand;
         return 0;
     } while (0);
+    new->source = old->source;
     return 1;
 }
 
@@ -733,20 +715,23 @@
 
     while (to_free)
     {
-        if (to_free->source)
+        relay_server *release = to_free;
+        to_free = release->next;
+
+        if (release->source && release->source->client)
         {
-            if (to_free->running)
+            if (release->running)
             {
                 /* relay has been removed from xml, shut down active relay */
-                DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
-                to_free->running = 0;
-                to_free->source->running = 0;
-                thread_join (to_free->thread);
+                DEBUG1 ("source shutdown request on \"%s\"", release->localmount);
+                release->running = 0;
+                release->source->flags &= ~SOURCE_RUNNING;
             }
             else
-                stats_event (to_free->localmount, NULL, NULL);
+                stats_event (release->localmount, NULL, NULL);
+            continue;
         }
-        to_free = relay_free (to_free);
+        relay_free (release);
     }
 
     relay = to_start;
@@ -1032,13 +1017,14 @@
     config = config_get_config();
     update_master_as_slave (config);
     stats_global (config);
+    workers_adjust (config->workers_count);
     config_release_config();
 
     source_recheck_mounts (1);
     connection_thread_startup();
 }
 
-static void *_slave_thread(void *arg)
+static void _slave_thread(void)
 {
     slave_startup();
 
@@ -1046,7 +1032,9 @@
     {
         relay_server *cleanup_relays = NULL;
         int skip_timer = 0;
+        struct timespec current;
 
+        thread_get_timespec (&current);
         /* re-read xml file if requested */
         if (global . schedule_config_reread)
         {
@@ -1054,13 +1042,12 @@
             global . schedule_config_reread = 0;
         }
 
-        global_add_bitrates (global.out_bitrate, 0L);
+        global_add_bitrates (global.out_bitrate, 0L, THREAD_TIME_MS(&current));
 
         if (global.running != ICE_RUNNING)
             break;
 
-        /* only update relays lists from master when required */
-        if (streamlist_check <= time(NULL))
+        if (streamlist_check <= current.tv_sec)
         {
             ice_config_t *config;
 
@@ -1070,7 +1057,7 @@
             thread_mutex_lock (&(config_locks()->relay_lock));
             config = config_get_config();
 
-            streamlist_check = time(NULL) + config->master_update_interval;
+            streamlist_check = current.tv_sec + config->master_update_interval;
             update_master_as_slave (config);
 
             update_from_master (config);
@@ -1115,8 +1102,6 @@
     thread_rwlock_wlock (&global.shutdown_lock);
     thread_rwlock_unlock (&global.shutdown_lock);
     INFO0 ("Slave thread shutdown complete");
-
-    return NULL;
 }
 
 
@@ -1229,3 +1214,108 @@
             redirect->server, redirect->port);
 }
 
+
+static int relay_read (client_t *client)
+{
+    relay_server *relay = client->shared_data;
+    source_t *source = relay->source;
+
+    thread_mutex_lock (&source->lock);
+    if (source_running (source))
+    {
+        if (relay->enable == 0)
+            source->flags &= ~SOURCE_RUNNING;
+        if (relay->on_demand && source->listeners == 0)
+            source->flags &= ~SOURCE_RUNNING;
+        source_read (source);
+        return 0;
+    }
+    DEBUG2 ("counts are %d and %d", source->termination_count, source->listeners);
+    if (relay->running)
+    {
+        client_t *listener;
+
+        INFO1 ("standing by to restart relay on %s", relay->localmount);
+        source_shutdown (source, 0);
+
+        /* the relay itself closed, but may want restarting */
+        listener = source->client_list;
+        while (listener)
+        {
+            client_set_queue (listener, NULL);
+            listener = listener->next;
+        }
+        client->ops = &relay_startup_ops;
+        relay->running = 0;
+        global_reduce_bitrate_sampling (global.out_bitrate);
+        thread_mutex_unlock (&source->lock);
+        thread_rwlock_unlock (&global.shutdown_lock);
+        return 0;
+    }
+    if ((source->flags & SOURCE_TERMINATING) == 0)
+    {
+        source_shutdown (source, 1);
+        source->flags |= SOURCE_TERMINATING;
+    }
+    if (source->termination_count)
+    {
+        thread_mutex_unlock (&source->lock);
+        return 0;
+    }
+    INFO1 ("shutting down relay %s", relay->localmount);
+    thread_mutex_unlock (&source->lock);
+    stats_event (relay->localmount, NULL, NULL);
+    global_reduce_bitrate_sampling (global.out_bitrate);
+    thread_rwlock_unlock (&global.shutdown_lock);
+    relay_free (relay);
+    return -1;
+}
+
+
+static int relay_startup (client_t *client)
+{
+    relay_server *relay = client->shared_data;
+
+    if (global.running != ICE_RUNNING)
+        return -1;
+    if (relay->enable == 0 || relay->start > client->worker->current_time.tv_sec)
+    {
+        client->schedule_ms = client->worker->time_ms + 1000;
+        return 0;
+    }
+
+    if (relay->on_demand)
+    {
+        source_t *src = relay->source;
+        src->flags |= SOURCE_ON_DEMAND;
+        if (src->listeners == 0)
+        {
+            client->schedule_ms = client->worker->time_ms + 1000;
+            return 0;
+        }
+        if (client->worker->current_time.tv_sec % 10 == 0)
+        {
+            mount_proxy * mountinfo = config_find_mount (config_get_config(), src->mount);
+            if (mountinfo && mountinfo->fallback_mount)
+                source_set_override (mountinfo->fallback_mount, src->mount);
+            config_release_config();
+        }
+        DEBUG1 ("Detected listeners on relay %s", relay->localmount);
+    }
+
+    /* limit the number of relays starting up at the same time */
+    thread_spin_lock (&relay_start_lock);
+    if (relays_connecting > 3)
+    {
+        thread_spin_unlock (&relay_start_lock);
+        client->schedule_ms = client->worker->time_ms + 2000;
+        return 0;
+    }
+    relays_connecting++;
+    thread_spin_unlock (&relay_start_lock);
+
+    client->flags &= ~CLIENT_ACTIVE;
+    thread_create ("Relay Thread", start_relay_stream, relay, THREAD_DETACHED);
+    return 0;
+}
+

Modified: icecast/branches/kh/icecast/src/slave.h
===================================================================
--- icecast/branches/kh/icecast/src/slave.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/slave.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -21,7 +21,7 @@
 void slave_update_all_mounts (void);
 void slave_rebuild_mounts (void);
 relay_server *slave_find_relay (relay_server *relays, const char *mount);
-int  redirect_client (const char *mountpoint, struct _client_tag *client);
+int  redirect_client (const char *mountpoint, client_t *client);
 void redirector_update (struct _client_tag *client);
 relay_server *relay_free (relay_server *relay);
 

Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/source.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -35,7 +35,6 @@
 #endif
 
 #include "thread/thread.h"
-#include "timing/timing.h"
 #include "avl/avl.h"
 #include "httpp/httpp.h"
 #include "net/sock.h"
@@ -60,17 +59,22 @@
 
 #define MAX_FALLBACK_DEPTH 10
 
-mutex_t move_clients_mutex;
 
 /* avl tree helper */
 static int  _compare_clients(void *compare_arg, void *a, void *b);
 static void _parse_audio_info (source_t *source, const char *s);
-static void source_shutdown (source_t *source);
-static void process_listeners (source_t *source, int fast_clients_only, int deletion_expected);
+static void source_client_release (client_t *client);
+static void source_listener_release (client_t *client);
+static int  source_client_read (client_t *client);
+static int  source_client_shutdown (client_t *client);
+static int  source_client_http_send (client_t *client);
+static int  send_to_listener (client_t *client);
 
-static int  http_source_listener (source_t *source, client_t *client);
-static int  http_source_intro (source_t *source, client_t *client);
+static int  http_source_listener (client_t *client);
+static int  http_source_intro (client_t *client);
 static int  locate_start_on_queue (source_t *source, client_t *client);
+static void listener_change_worker (client_t *client, source_t *source);
+static void source_change_worker (source_t *source);
 
 #ifdef _WIN32
 #define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
@@ -78,6 +82,37 @@
 static void source_run_script (char *command, char *mountpoint);
 #endif
 
+struct _client_functions source_client_ops = 
+{
+    source_client_read,
+    NULL
+};
+
+struct _client_functions source_client_halt_ops = 
+{
+    source_client_shutdown,
+    source_client_release
+};
+
+struct _client_functions listener_client_ops = 
+{
+    send_to_listener,
+    source_listener_release
+};
+
+struct _client_functions listener_pause_ops = 
+{
+    NULL,
+    NULL
+};
+
+struct _client_functions source_client_http_ops =
+{
+    source_client_http_send,
+    source_client_release
+};
+
+
 /* Allocate a new source with the stated mountpoint, if one already
  * exists with that mountpoint in the global source tree then return
  * NULL.
@@ -102,7 +137,6 @@
 
         /* make duplicates for strings or similar */
         src->mount = strdup (mount);
-        src->avg_bitrate_duration = 60;
         src->listener_send_trigger = 10000;
 
         thread_mutex_create (&src->lock);
@@ -197,24 +231,13 @@
 
 void source_clear_source (source_t *source)
 {
-    int i;
+    int i, do_twice = 0;
     ice_config_t *config;
     mount_proxy *mountinfo;
     refbuf_t *p;
 
     DEBUG1 ("clearing source \"%s\"", source->mount);
 
-    /* log bytes read in access log */
-    if (source->client)
-    {
-        source->client->flags &= ~CLIENT_AUTHENTICATED;
-        if (source->format)
-            source->client->con->sent_bytes = source->format->read_bytes;
-    }
-    client_destroy(source->client);
-    source->client = NULL;
-    source->parser = NULL;
-
     if (source->dumpfile)
     {
         INFO1 ("Closing dumpfile for %s", source->mount);
@@ -223,13 +246,16 @@
     }
 
     /* lets drop any listeners still connected */
+    DEBUG2 ("source %s has %d clients to release", source->mount, source->listeners);
     i = 0;
     config = config_get_config ();
     mountinfo = config_find_mount (config, source->mount);
-    while (source->active_clients)
+    while (source->client_list)
     {
-        client_t *client = source->active_clients;
-        source->active_clients = client->next;
+        client_t *client;
+
+        client = source->client_list;
+        source->client_list = client->next;
         client->next = NULL;
         /* do not count listeners who have joined but haven't done any processing */
         if (client->respcode == 200)
@@ -238,40 +264,36 @@
     }
     config_release_config ();
     if (i)
+    {
         stats_event_sub (NULL, "listeners", i);
+        stats_event_sub (source->mount, "listeners", i);
+    }
 
-    source->fast_clients_p = &source->active_clients;
-
     format_free_plugin (source->format);
     source->format = NULL;
 
     /* flush out the stream data, we don't want any left over */
 
-    /* first remove the reference for refbufs marked for burst */
-    p = source->burst_point;
-    while (p)
-    {
-        refbuf_t *to_go = p;
-        p = to_go->next;
-        to_go->next = NULL;
-        refbuf_release (to_go);
-    }
-    source->burst_point = NULL;
-    /* then the normal queue, there could be listeners still pending */
+    /* the source holds a reference on the very latest so that one
+     * always exists */
+    refbuf_release (source->stream_data_tail);
+
+    /* remove the reference for buffers on the queue */
     p = source->stream_data;
     while (p)
     {
         refbuf_t *to_go = p;
         p = to_go->next;
         to_go->next = NULL;
-        if (to_go->_count > 1)
-            WARN1 ("buffer is %d", to_go->_count);
+        // DEBUG1 ("queue refbuf count is %d", to_go->_count);
+        if (do_twice || to_go == source->burst_point)
+        { /* burst data is also counted */
+            refbuf_release (to_go); 
+            do_twice = 1;
+        }
         refbuf_release (to_go);
     }
-    /* the source holds 2 references on the very latest so that one
-     * always exists */
-    if (p)
-        refbuf_release (p);
+    source->burst_point = NULL;
     source->stream_data = NULL;
     source->stream_data_tail = NULL;
 
@@ -281,7 +303,6 @@
     source->queue_size_limit = 0;
     source->listeners = 0;
     source->prev_listeners = 0;
-    source->shoutcast_compat = 0;
     source->client_stats_update = 0;
     util_dict_free (source->audio_info);
     source->audio_info = NULL;
@@ -295,32 +316,44 @@
         source->intro_file = NULL;
     }
 
-    source->on_demand_req = 0;
+    source->flags &= ~SOURCE_ON_DEMAND_REQ;
+    thread_mutex_unlock (&source->lock);
 }
 
 
-/* Remove the provided source from the global tree and free it */
-void source_free_source (source_t *source)
+/* the internal free function. at this point we know the source is
+ * not on the source tree */
+static int _free_source (void *p)
 {
-    DEBUG1 ("freeing source \"%s\"", source->mount);
-    avl_tree_wlock (global.source_tree);
-    avl_delete (global.source_tree, source, NULL);
-    avl_tree_unlock (global.source_tree);
+    source_t *source = p;
+    source_clear_source (source);
 
     /* make sure all YP entries have gone */
     yp_remove (source->mount);
 
     /* There should be no listeners on this mount */
-    if (source->active_clients)
+    if (source->client_list)
         WARN1("active listeners on mountpoint %s", source->mount);
 
     thread_mutex_destroy (&source->lock);
 
+    INFO1 ("freeing source \"%s\"", source->mount);
     free (source->mount);
     free (source);
+    return 1;
 }
 
 
+/* Remove the provided source from the global tree and free it */
+void source_free_source (source_t *source)
+{
+    avl_tree_wlock (global.source_tree);
+    thread_mutex_lock (&source->lock); /* listeners may of been added */
+    avl_delete (global.source_tree, source, _free_source);
+    avl_tree_unlock (global.source_tree);
+}
+
+
 client_t *source_find_client(source_t *source, int id)
 {
     client_t fakeclient, *client = NULL;
@@ -329,7 +362,7 @@
     fakeclient.con = &fakecon;
     fakeclient.con->id = id;
 
-    client = source->active_clients;
+    client = source->client_list;
     while (client)
     {
         if (_compare_clients (NULL, client, &fakeclient) == 0)
@@ -340,99 +373,6 @@
 }
 
 
-/* Move clients from source to dest provided dest is running
- * and that the stream format is the same.
- * The only lock that should be held when this is called is the
- * source tree lock
- */
-void source_move_clients (source_t *source, source_t *dest)
-{
-    unsigned long count = 0;
-    if (strcmp (source->mount, dest->mount) == 0)
-    {
-        WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
-        return;
-    }
-    /* we don't want the two write locks to deadlock in here */
-    thread_mutex_lock (&move_clients_mutex);
-    thread_mutex_lock (&dest->lock);
-
-    /* if the destination is not running then we can't move clients */
-    if (dest->running == 0 && dest->on_demand == 0)
-    {
-        WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
-        thread_mutex_unlock (&dest->lock);
-        thread_mutex_unlock (&move_clients_mutex);
-        return;
-    }
-
-    do
-    {
-        client_t *leave_list = NULL;
-
-        thread_mutex_lock (&source->lock);
-
-        if (source->on_demand == 0 && source->format == NULL)
-        {
-            INFO1 ("source mount %s is not available", source->mount);
-            break;
-        }
-        if (source->format && dest->format)
-        {
-            if (source->format->type != dest->format->type)
-            {
-                WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
-                break;
-            }
-        }
-
-        /* we need to move the active listeners */
-        while (source->active_clients)
-        {
-            client_t *client = source->active_clients;
-            source->active_clients = client->next;
-
-            /* don't move known slave relays to streams which are not timed (fallback file) */
-            if (dest->client == NULL && (client->flags & CLIENT_IS_SLAVE))
-            {
-                client->next = leave_list;
-                leave_list = client;
-                continue;
-            }
-            /* when switching a client to a different queue, be wary of the 
-             * refbuf it's referring to, if it's http headers then we need
-             * to write them so don't release it.
-             */
-            if (client->check_buffer != http_source_listener)
-            {
-                client_set_queue (client, NULL);
-                client->check_buffer = http_source_intro;
-                if (source->client == NULL)
-                    client->intro_offset = -1;
-            }
-
-            client->next = dest->active_clients;
-            dest->active_clients = client;
-            count++;
-        }
-        source->active_clients = leave_list;
-        INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);
-
-        dest->listeners += count;
-        source->listeners -= count;
-        stats_event_args (source->mount, "listeners", "%lu", source->listeners);
-
-    } while (0);
-
-    thread_mutex_unlock (&source->lock);
-    /* see if we need to wake up an on-demand relay */
-    if (dest->running == 0 && dest->on_demand && count)
-        dest->on_demand_req = 1;
-
-    thread_mutex_unlock (&dest->lock);
-    thread_mutex_unlock (&move_clients_mutex);
-}
-
 /* Update stats from source processing, this should be called regulary (every
  * few seconds) to keep totals up to date.
  */
@@ -441,7 +381,6 @@
     unsigned long incoming_rate = 8 * rate_avg (source->format->in_bitrate);
     unsigned long kbytes_sent = source->bytes_sent_since_update/1024;
     unsigned long kbytes_read = source->bytes_read_since_update/1024;
-    time_t now = time(NULL);
 
     source->format->sent_bytes += kbytes_sent*1024;
     stats_event_args (source->mount, "outgoing_kbitrate", "%ld",
@@ -454,48 +393,16 @@
     stats_event_args (source->mount, "total_mbytes_sent",
             "%"PRIu64, source->format->sent_bytes/(1024*1024));
     if (source->client)
+    {
+        worker_t *worker = source->client->worker;
         stats_event_args (source->mount, "connected", "%"PRIu64,
-                (uint64_t)(now - source->client->con->con_time));
+                (uint64_t)(worker->current_time.tv_sec - source->client->con->con_time));
+    }
     stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
     stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
 
     source->bytes_sent_since_update %= 1024;
     source->bytes_read_since_update %= 1024;
-
-    if (source->running && source->limit_rate)
-    {
-        if (incoming_rate >= source->limit_rate)
-        {
-            /* when throttling, we perform a sleep so that the input is not read as quickly, we
-             * don't do precise timing here as this just makes sure the incoming bitrate is not
-             * excessive. lower bitrate stream have higher sleep counts */
-            float kbits = incoming_rate/8.0f;
-            if (kbits < 1200)
-                kbits = 1200;
-            source->throttle_stream = (int)(1000000 / kbits * 1000);
-
-            /* if bitrate is consistently excessive then terminate the stream */
-            if (source->throttle_termination == 0)
-            {
-                source->throttle_termination = now + source->avg_bitrate_duration/2;
-                /* DEBUG1 ("throttle termination set at %ld", source->throttle_termination); */
-            }
-            else
-            {
-                if (now >= source->throttle_termination)
-                {
-                    source->running = 0;
-                    WARN3 ("%s terminating, exceeding bitrate limits (%dk/%dk)",
-                            source->mount, incoming_rate/1024, source->limit_rate/1024);
-                }
-            }
-        }
-        else
-        {
-            source->throttle_stream = 0;
-            source->throttle_termination = 0;
-        }
-    }
 }
 
 
@@ -503,61 +410,66 @@
  * and sent back, however NULL is also valid as in the case of a short
  * timeout and there's no data pending.
  */
-static void get_next_buffer (source_t *source)
+void source_read (source_t *source)
 {
+    client_t *client = source->client;
     refbuf_t *refbuf = NULL;
-    int no_delay_count = 0;
+    int skip = 1;
+    int loop = 3;
+    time_t current = client->worker->current_time.tv_sec;
+    int fds = 0;
 
-    while (global.running == ICE_RUNNING && source->running)
+    if (global.running != ICE_RUNNING)
+        source->flags &= ~SOURCE_RUNNING;
+    do
     {
-        int fds = 0;
-        time_t current = time(NULL);
-        int delay = 200;
-
-        source->amount_added_to_queue = 0;
-
-        /* service fast clients but jump out once in a while to check on
-         * normal clients */
-        if (no_delay_count == 10)
-            return;
-
-        if (*source->fast_clients_p)
+        if (source->flags & SOURCE_TEMPORARY_FALLBACK && source->fallback.mount &&
+                source->termination_count == 0)
         {
-            delay = 0;
-            no_delay_count++;
+            DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
+            free (source->fallback.mount);
+            source->fallback.mount = NULL;
+            source->flags &= ~SOURCE_TEMPORARY_FALLBACK;
         }
-
-        thread_mutex_unlock (&source->lock);
-
-        if (source->throttle_stream > 0)
-            thread_sleep (source->throttle_stream);
-
-        if (source->client)
-            fds = util_timed_wait_for_fd (source->client->con->sock, delay);
-        else
+        if (source->prev_listeners != source->listeners)
         {
-            thread_sleep (delay*1000);
-            source->last_read = current;
+            INFO2("listener count on %s now %lu", source->mount, source->listeners);
+            source->prev_listeners = source->listeners;
+            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
+            if (source->listeners > source->peak_listeners)
+            {
+                source->peak_listeners = source->listeners;
+                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
+            }
         }
-
-        /* take the lock */
-        thread_mutex_lock (&source->lock);
-
         if (source->client && current >= source->client_stats_update)
         {
+            source_change_worker (source);
             update_source_stats (source);
             source->client_stats_update = current + source->stats_interval;
+            thread_mutex_unlock (&source->lock);
+            return;
         }
+        if (source->limit_rate)
+        {
+            unsigned long incoming_rate = 8 * rate_avg (source->format->in_bitrate);
+
+            if (source->limit_rate < incoming_rate)
+            {
+                rate_add (source->format->in_bitrate, 0, current);
+                break;
+            }
+        }
+        fds = util_timed_wait_for_fd (client->con->sock, 0);
         if (fds < 0)
         {
             if (! sock_recoverable (sock_error()))
             {
                 WARN0 ("Error while waiting on socket, Disconnecting source");
-                source->running = 0;
+                source->flags &= ~SOURCE_RUNNING;
             }
-            continue;
+            break;
         }
-
         if (fds == 0)
         {
             if (source->last_read + (time_t)source->timeout < current)
@@ -565,82 +477,160 @@
                 DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
                         source->timeout, (long)current);
                 WARN0 ("Disconnecting source due to socket timeout");
-                source->running = 0;
+                source->flags &= ~SOURCE_RUNNING;
                 break;
             }
-            if (delay == 0)
-            {
-                process_listeners (source, 1, 0);
-                continue;
-            }
             rate_add (source->format->in_bitrate, 0, current);
+            if (source->skip_duration < 60)
+                source->skip_duration = 80;
+            else
+                source->skip_duration = (long)(source->skip_duration *1.3);
             break;
         }
+        source->skip_duration = (long)(source->skip_duration * 0.5);
+
+        skip = 0;
         source->last_read = current;
-        refbuf = source->format->get_buffer (source);
-        if (refbuf)
+        do
         {
-            source->bytes_read_since_update += refbuf->len;
-            source->amount_added_to_queue = refbuf->len;
+            refbuf = source->format->get_buffer (source);
+            if (refbuf)
+            {
+                source->bytes_read_since_update += refbuf->len;
 
-            /* the latest refbuf is counted twice so that it stays */
-            refbuf_addref (refbuf);
+                /* the latest refbuf is counted twice so that it stays */
+                refbuf_addref (refbuf);
 
-            /* append buffer to the in-flight data queue,  */
-            if (source->stream_data == NULL)
-            {
-                source->stream_data = refbuf;
-                source->burst_point = refbuf;
-                source->burst_offset = 0;
-            }
-            if (source->stream_data_tail)
-            {
-                source->stream_data_tail->next = refbuf;
-                refbuf_release (source->stream_data_tail);
-            }
-            source->stream_data_tail = refbuf;
-            source->queue_size += refbuf->len;
+                /* append buffer to the in-flight data queue,  */
+                if (source->stream_data == NULL)
+                {
+                    source->stream_data = refbuf;
+                    source->burst_point = refbuf;
+                    source->burst_offset = 0;
+                }
+                if (source->stream_data_tail)
+                {
+                    source->stream_data_tail->next = refbuf;
+                    refbuf_release (source->stream_data_tail);
+                }
+                source->stream_data_tail = refbuf;
+                source->queue_size += refbuf->len;
 
-            /* increase refcount for keeping burst data */
-            refbuf_addref (refbuf);
+                /* increase refcount for keeping burst data */
+                refbuf_addref (refbuf);
 
-            /* move the starting point for new listeners */
-            source->burst_offset += refbuf->len;
-            while (source->burst_offset > source->burst_size)
+                /* move the starting point for new listeners */
+                source->burst_offset += refbuf->len;
+                while (source->burst_offset > source->burst_size)
+                {
+                    refbuf_t *to_release = source->burst_point;
+                    if (to_release && to_release->next)
+                    {
+                        source->burst_offset -= to_release->len;
+                        source->burst_point = to_release->next;
+                        refbuf_release (to_release);
+                        continue;
+                    }
+                    break;
+                }
+
+                /* save stream to file */
+                if (source->dumpfile && source->format->write_buf_to_file)
+                    source->format->write_buf_to_file (source, refbuf);
+            }
+            else
             {
-                refbuf_t *to_release = source->burst_point;
-                if (to_release->next)
+                if (client->con->error)
                 {
-                    source->burst_offset -= to_release->len;
-                    source->burst_point = to_release->next;
-                    refbuf_release (to_release);
-                    continue;
+                    INFO1 ("End of Stream %s", source->mount);
+                    source->flags &= ~SOURCE_RUNNING;
+                    skip = 1;
                 }
                 break;
             }
+            loop--;
+        } while (loop);
+        if (loop == 0)
+            client->schedule_ms += 20;
 
-            /* save stream to file */
-            if (source->dumpfile && source->format->write_buf_to_file)
-                source->format->write_buf_to_file (source, refbuf);
+        /* lets see if we have too much data in the queue */
+        while (source->queue_size > source->queue_size_limit ||
+                (source->stream_data && source->stream_data->_count == 1))
+        {
+            refbuf_t *to_go = source->stream_data;
+            source->stream_data = to_go->next;
+            source->queue_size -= to_go->len;
+            to_go->next = NULL;
+            /* mark for delete to tell others holding it and release it ourselves */
+            to_go->flags |= SOURCE_BLOCK_RELEASE;
+            refbuf_release (to_go);
         }
+    } while (0);
+
+    if (skip)
+        client->schedule_ms = client->worker->time_ms + source->skip_duration;
+    thread_mutex_unlock (&source->lock);
+}
+
+
+static int source_client_read (client_t *client)
+{
+    source_t *source = client->shared_data;
+
+    thread_mutex_lock (&source->lock);
+    if (source_running (source))
+        source_read (source);
+    else
+    {
+        if ((source->flags & SOURCE_TERMINATING) == 0)
+        {
+            source_shutdown (source, 1);
+            source->flags |= SOURCE_TERMINATING;
+        }
+        DEBUG2 ("counts are %d and %d", source->termination_count, source->listeners);
+        if (source->termination_count)
+            client->schedule_ms = client->worker->time_ms + 200;
         else
         {
-            if (source->client->con && source->client->con->error)
-            {
-                INFO1 ("End of Stream %s", source->mount);
-                source->running = 0;
-            }
+            /* all the source listeners are stopped but still on the handlers */
+            DEBUG0 ("source client shutting down");
+            /* move them to possible fallback */
+            client->ops = &source_client_halt_ops;
+            free (source->fallback.mount);
+            source->fallback.mount = NULL;
+            stats_event (source->mount, NULL, NULL);
         }
-        break;
+        thread_mutex_unlock (&source->lock);
     }
+    return 0;
 }
 
 
+static int source_queue_advance (client_t *client)
+{
+    source_t *source = client->shared_data;
+    refbuf_t *refbuf;
+
+    if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
+        return -1;
+    refbuf = client->refbuf;
+
+    /* move to the next buffer if we have finished with the current one */
+    if (client->pos == refbuf->len)
+    {
+        if (refbuf->next == NULL)
+            return -1;
+        client_set_queue (client, refbuf->next);
+    }
+    return source->format->write_buf_to_client (client);
+}
+
+
 static int locate_start_on_queue (source_t *source, client_t *client)
 {
     refbuf_t *refbuf;
     long lag = 0;
-    int ret = -1;
+    int ret = -1, pos = 0;
 
     /* we only want to attempt a burst at connection time, not midstream
      * however streams like theora may not have the most recent page marked as
@@ -650,8 +640,8 @@
     refbuf = source->stream_data_tail;
     if (client->intro_offset == -1 && (refbuf->flags & SOURCE_BLOCK_SYNC))
     {
-        refbuf = source->stream_data_tail;
-        lag = refbuf->len;
+        pos = refbuf->len;
+        lag = 0;
     }
     else
     {
@@ -673,10 +663,9 @@
         if (refbuf->flags & SOURCE_BLOCK_SYNC)
         {
             client_set_queue (client, refbuf);
-            client->check_buffer = format_advance_queue;
-            client->write_to_client = source->format->write_buf_to_client;
             client->intro_offset = -1;
-            client->lag = lag;
+            client->pos = pos;
+            client->queue_pos = source->client->queue_pos - lag;
             ret = 0;
             break;
         }
@@ -687,24 +676,16 @@
 }
 
 
-static int http_source_intro (source_t *source, client_t *client)
+static int http_source_intro (client_t *client)
 {
-    refbuf_t *refbuf = client->refbuf;
+    source_t *source = client->shared_data;
 
-    if (refbuf == NULL)
+    if (client->refbuf == NULL && source->intro_file)
     {
-        /* client refers to no data, must be from a move */
-        if (source->client)
-        {
-            locate_start_on_queue (source, client);
-            return -1;
-        }
-        /* source -> file fallback, need a refbuf for data */
-        refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-        client->refbuf = refbuf;
-        client->pos = refbuf->len;
+        client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+        client->pos = client->refbuf->len;
         client->intro_offset = 0;
-        client->lag = 0;
+        client->queue_pos = 0;
     }
     if (format_file_read (client, source->intro_file) < 0)
     {
@@ -712,28 +693,39 @@
         {
             /* better find the right place in queue for this client */
             client_set_queue (client, NULL);
-            return locate_start_on_queue (source, client);
+            client->check_buffer = source_queue_advance;
+            return 0;
         }
         client->intro_offset = 0;  /* replay intro file */
+        client->schedule_ms = client->worker->time_ms + 100;
         return -1;
     }
-    return 0;
+    client->schedule_ms = client->worker->time_ms + 20;
+    return source->format->write_buf_to_client (client);
 }
 
 
-static int http_source_listener (source_t *source, client_t *client)
+static int http_source_listener (client_t *client)
 {
     refbuf_t *refbuf = client->refbuf;
+    source_t *source = client->shared_data;
 
     if (refbuf == NULL)
+    {
+        client->check_buffer = http_source_intro;
         return -1;
+    }
 
     if (client->respcode == 0)
     {
+        if (source_running (source) == 0)
+        {
+            client->schedule_ms = client->worker->time_ms + 200;
+            return 0;
+        }
         if (format_prepare_headers (source, client) < 0)
         {
             ERROR0 ("internal problem, dropping client");
-            client->con->error = 1;
             return -1;
         }
         client->respcode = 200;
@@ -743,178 +735,134 @@
     }
     if (client->pos == refbuf->len)
     {
-        client->write_to_client = source->format->write_buf_to_client;
         client->check_buffer = http_source_intro;
         client->intro_offset = 0;
         client->pos = refbuf->len = 4096;
+        client->con->sent_bytes = 0;
         return -1;
     }
-    return 0;
+    client->schedule_ms = client->worker->time_ms + 10;
+    return format_generic_write_to_client (client);
 }
 
 
-/* general send routine per listener.  The deletion_expected tells us whether
- * the last in the queue is about to disappear, so if this client is still
- * referring to it after writing then drop the client as it's fallen too far
- * behind
- *
- * return 1 for fast client, limiter kicked in
- *        0 for normal case.
+/* general send routine per listener.
  */
-static int send_to_listener (source_t *source, client_t *client, int deletion_expected)
+static int send_to_listener (client_t *client)
 {
     int bytes;
     int loop = 8;   /* max number of iterations in one go */
     long total_written = 0;
+    int ret = 0;
+    source_t *source = client->shared_data;
 
+    if (client->con->error)
+        return -1;
+    if (source->fallback.mount)
+    {
+        thread_mutex_lock (&source->lock);
+
+        if (client->flags & CLIENT_IS_SLAVE)
+            client->schedule_ms = client->worker->time_ms + 100;
+        else
+        {
+            // remove from the sources client list
+            client_t **pnext = &source->client_list;
+            while (*pnext && *pnext != client)
+                pnext = &((*pnext)->next);
+            *pnext = client->next;
+            if (client->check_buffer != http_source_listener)
+            {
+                client_set_queue (client, NULL);
+                client->check_buffer = source->format->write_buf_to_client;
+            }
+            thread_mutex_unlock (&source->lock);
+            move_listener (client, &source->fallback);
+            thread_mutex_lock (&source->lock);
+            source->listeners--;
+        }
+        source->termination_count--;
+        thread_mutex_unlock (&source->lock);
+        return 0;
+    }
+    if (source->flags & SOURCE_TERMINATING)
+    {
+        thread_mutex_lock (&source->lock);
+        DEBUG1 ("termination_count now %d",  source->termination_count);
+        client_set_queue (client, NULL);
+        client->ops = &listener_pause_ops;
+        source->termination_count--;
+        thread_mutex_unlock (&source->lock);
+        return -1;
+    }
     /* check for limited listener time */
-    if (client->con->discon_time && time(NULL) >= client->con->discon_time)
+    if (client->con->discon_time &&
+            client->worker->current_time.tv_sec >= client->con->discon_time)
     {
         INFO1 ("time limit reached for client #%lu", client->con->id);
-        client->con->error = 1;
+        return -1;
+    }
+    if (source_running (source) == 0)
+    {
+        client->schedule_ms = client->worker->time_ms + 200;
         return 0;
     }
 
-    if (source->amount_added_to_queue)
-        client->lag += source->amount_added_to_queue;
+    thread_mutex_lock (&source->lock);
 
+    // do we migrate this listener to the same handler as the source client
+    if (source->client && source->client->worker != client->worker)
+        listener_change_worker (client, source);
+
+    client->schedule_ms = client->worker->time_ms;
     while (loop)
     {
         /* jump out if client connection has died */
         if (client->con->error)
+        {
+            ret = -1;
             break;
-
+        }
         /* lets not send too much to one client in one go, but don't
            sleep for too long if more data can be sent */
         if (total_written > source->listener_send_trigger)
-        {
-            loop = 0;
             break;
-        }
-
-        if (client->check_buffer (source, client) < 0)
-            break;
-
-        bytes = client->write_to_client (client);
-        if (bytes <= 0)
+        bytes = client->check_buffer (client);
+        if (bytes < 0)
             break;  /* can't write any more */
 
+        client->schedule_ms += 5;
         total_written += bytes;
         loop--;
     }
+    if (loop == 0)
+        client->schedule_ms -= 20;
     if (total_written)
     {
-        rate_add (source->format->out_bitrate, total_written, timing_get_time());
-        global_add_bitrates (global.out_bitrate, total_written);
+        rate_add (source->format->out_bitrate, total_written, client->worker->time_ms);
+        global_add_bitrates (global.out_bitrate, total_written, client->worker->time_ms);
         source->bytes_sent_since_update += total_written;
     }
 
     /* the refbuf referenced at head (last in queue) may be marked for deletion
      * if so, check to see if this client is still referring to it */
-    if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
+    if (client->refbuf && (client->refbuf->flags & SOURCE_BLOCK_RELEASE))
     {
         INFO2 ("Client %lu (%s) has fallen too far behind, removing",
                 client->con->id, client->con->ip);
         stats_event_inc (source->mount, "slow_listeners");
         client_set_queue (client, NULL);
-        client->con->error = 1;
+        ret = -1;
     }
-    return loop ? 0 : 1;
+    thread_mutex_unlock (&source->lock);
+    return ret;
 }
 
 
-/* run through the queue of listeners, the fast ones are at the back of the
- * queue so you may want to process only those. If a buffer is going to be
- * removed from the stream queue then flag it so that listeners can be
- * dropped if need be.
- */
-static void process_listeners (source_t *source, int fast_clients_only, int deletion_expected)
-{
-    client_t *client, **client_p;
-    client_t *fast_clients = NULL, **fast_client_tail = &fast_clients;
-    unsigned long listener_count = 0;
-
-    /* where do we start from */
-    if (fast_clients_only)
-        client_p = source->fast_clients_p;
-    else
-        client_p = &source->active_clients;
-    client = *client_p;
-
-    while (client)
-    {
-        int fast_client = send_to_listener (source, client, deletion_expected);
-
-        if (client->con->error)
-        {
-            client_t *to_go = client;
-            ice_config_t *config;
-            mount_proxy *mountinfo;
-
-            *client_p = client->next;
-            client = client->next;
-
-            config = config_get_config ();
-            mountinfo = config_find_mount (config, source->mount);
-            auth_release_listener (to_go, source->mount, mountinfo);
-            config_release_config();
-
-            source->listeners--;
-            stats_event_dec (NULL, "listeners");
-            DEBUG0("Client removed");
-            continue;
-        }
-        listener_count++;
-        if (fast_client && client->check_buffer != http_source_intro)
-        {
-            client_t *to_move = client;
-
-            *client_p = client->next;
-            client = client->next;
-
-            to_move->next = NULL;
-            *fast_client_tail = to_move;
-            fast_client_tail = &to_move->next;
-            continue;
-        }
-        client_p = &client->next;
-        client = *client_p;
-    }
-    source->fast_clients_p = client_p;
-    /* place fast clients list at the end */
-    if (fast_clients)
-        *client_p = fast_clients;
-
-    /* consistency check, these should match */
-    if (fast_clients_only == 0 && listener_count != source->listeners)
-        ERROR3 ("mount %s has %lu, %lu", source->mount, listener_count, source->listeners);
-
-    /* has the listener count changed */
-    if (source->listeners != source->prev_listeners)
-    {
-        source->prev_listeners = source->listeners;
-        INFO2("listener count on %s now %lu", source->mount, source->listeners);
-        if (source->listeners > source->peak_listeners)
-        {
-            source->peak_listeners = source->listeners;
-            stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
-        }
-        stats_event_args (source->mount, "listeners", "%lu", source->listeners);
-        if (source->listeners == 0)
-            rate_reduce (source->format->out_bitrate, 0);
-        /* change of listener numbers, so reduce scope of global sampling */
-        global_reduce_bitrate_sampling (global.out_bitrate);
-        /* do we need to shutdown an on-demand relay */
-        if (source->listeners == 0 && source->on_demand)
-            source->running = 0;
-    }
-}
-
-
 /* Perform any initialisation before the stream data is processed, the header
  * info is processed by now and the format details are setup
  */
-static void source_init (source_t *source)
+void source_init (source_t *source)
 {
     mount_proxy *mountinfo;
 
@@ -929,13 +877,9 @@
         }
     }
 
-    /* grab a read lock, to make sure we get a chance to cleanup */
-    thread_rwlock_rlock (&global.shutdown_lock);
-
     /* start off the statistics */
     stats_event_inc (NULL, "source_total_connections");
     stats_event_hidden (source->mount, "slow_listeners", "0", STATS_COUNTERS);
-    stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
     stats_event (source->mount, "server_type", source->format->contenttype);
     stats_event_hidden (source->mount, "listener_peak", "0", STATS_COUNTERS);
     stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
@@ -943,15 +887,17 @@
     stats_event_hidden (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
     stats_event_hidden (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
     stats_event_hidden (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
+    stats_event (source->mount, "source_ip", source->client->con->ip);
 
     source->last_read = time(NULL);
     source->prev_listeners = -1;
     source->bytes_sent_since_update = 0;
     source->stats_interval = 5;
+    source->termination_count = 0;
     /* so the first set of average stats after 3 seconds */
     source->client_stats_update = source->last_read + 3;
+    source->skip_duration = 80;
 
-    source->fast_clients_p = &source->active_clients;
     source->audio_info = util_dict_new();
     if (source->client)
     {
@@ -962,11 +908,13 @@
             stats_event (source->mount, "audio_info", str);
         }
     }
+    source->format->in_bitrate = rate_setup (50, 1);
+    source->format->out_bitrate = rate_setup (50, 1000);
 
     thread_mutex_unlock (&source->lock);
 
     /* on demand relays should of already called this */
-    if (source->on_demand == 0)
+    if ((source->flags & SOURCE_ON_DEMAND) == 0)
         slave_update_all_mounts();
 
     mountinfo = config_find_mount (config_get_config(), source->mount);
@@ -984,94 +932,47 @@
          */
 
         if (mountinfo->fallback_override && mountinfo->fallback_mount)
-        {
-            source_t *fallback_source;
-
-            avl_tree_rlock(global.source_tree);
-            fallback_source = source_find_mount (mountinfo->fallback_mount);
-
-            if (fallback_source)
-                source_move_clients (fallback_source, source);
-
-            avl_tree_unlock(global.source_tree);
-        }
+            source_set_override (mountinfo->fallback_mount, source->mount);
     }
     config_release_config();
 
-    thread_mutex_lock (&source->lock);
-
-    source->format->in_bitrate = rate_setup (source->avg_bitrate_duration+1, 1);
-    source->format->out_bitrate = rate_setup (120, 1000);
     INFO1 ("Source %s initialised", source->mount);
-    source->running = 1;
+    source->flags |= SOURCE_RUNNING;
 }
 
 
-void source_main (source_t *source)
+void source_set_override (const char *mount, const char *dest)
 {
-    source_init (source);
+    source_t *source;
 
-    while (global.running == ICE_RUNNING && source->running)
+    avl_tree_rlock (global.source_tree);
+    source = source_find_mount (mount);
+    if (source)
     {
-        int remove_from_q;
+        source->fallback.limit = 0;
+        source->fallback.mount = strdup (dest);
+    }
+    else
+        fserve_set_override (mount, dest);
+    avl_tree_unlock (global.source_tree);
+}
 
-        get_next_buffer (source);
 
-        remove_from_q = 0;
+void source_set_fallback (source_t *source, const char *dest_mount)
+{
+    if (dest_mount == NULL)
+        return;
 
-        /* lets see if we have too much data in the queue, but do not
-           remove it until later */
-        if (source->queue_size > source->queue_size_limit)
-            remove_from_q = 1;
-
-        process_listeners (source, 0, remove_from_q);
-
-        /* lets reduce the queue, any lagging clients should of been
-         * terminated by now
-         */
-        if (source->stream_data)
-        {
-            /* if we need to prune the queue to keep within the specific limit then
-             * all other references to the last block need of been gone by now
-             */
-            if (remove_from_q && source->stream_data->_count > 1)
-                ERROR1 ("unable to prune queue, size is at %ld", source->queue_size);
-
-            /* normal unreferenced queue data will have a refcount 1, but
-             * burst queue data will be at least 2, active clients will also
-             * increase refcount
-             */
-            while (source->stream_data->_count == 1)
-            {
-                refbuf_t *to_go = source->stream_data;
-
-                if (to_go->next == NULL || source->burst_point == to_go)
-                {
-                    /* this should not happen */
-                    ERROR2 ("queue state is unexpected (%p, %d)", to_go->next, 
-                            source->burst_point == to_go ? 1: 0);
-                    source->running = 0;
-                    break;
-                }
-                source->stream_data = to_go->next;
-                source->queue_size -= to_go->len;
-                to_go->next = NULL;
-                refbuf_release (to_go);
-            }
-        }
-    }
-    source->running = 0;
-
-    source_shutdown (source);
+    source->fallback.flags = FS_FALLBACK;
+    source->fallback.mount = strdup (dest_mount);
+    source->fallback.limit = (int)(rate_avg (source->format->in_bitrate) * 1.02);
+    source->termination_count = source->listeners;
 }
 
-
-/* this function is expected to keep lock held on exit */
-static void source_shutdown (source_t *source)
+void source_shutdown (source_t *source, int with_fallback)
 {
     mount_proxy *mountinfo;
 
-    source->running = 0;
     INFO1("Source \"%s\" exiting", source->mount);
 
     update_source_stats (source);
@@ -1082,49 +983,15 @@
             source_run_script (mountinfo->on_disconnect, source->mount);
         auth_stream_end (mountinfo, source->mount);
 
-        /* we have de-activated the source now, so no more clients will be
-         * added, now move the listeners we have to the fallback (if any)
-         */
-        if (mountinfo->fallback_mount)
-        {
-            char *mount = strdup (mountinfo->fallback_mount);
-            source_t *fallback_source;
-
-            config_release_config();
-            avl_tree_rlock (global.source_tree);
-            fallback_source = source_find_mount (mount);
-            free (mount);
-
-            if (fallback_source != NULL)
-            {
-                /* be careful wrt to deadlocking */
-                thread_mutex_unlock (&source->lock);
-                source_move_clients (source, fallback_source);
-                thread_mutex_lock (&source->lock);
-            }
-
-            avl_tree_unlock (global.source_tree);
-        }
+        if (with_fallback)
+            source_set_fallback (source, mountinfo->fallback_mount);
     }
-    else
-        config_release_config();
+    config_release_config();
 
-    /* delete this sources stats */
-    stats_event(source->mount, NULL, NULL);
-
-    /* we don't remove the source from the tree here, it may be a relay and
-       therefore reserved */
-    source_clear_source (source);
-
-    global_reduce_bitrate_sampling (global.out_bitrate);
-
     global_lock();
     global.sources--;
     stats_event_args (NULL, "sources", "%d", global.sources);
     global_unlock();
-
-    /* release our hold on the lock so the main thread can continue cleaning up */
-    thread_rwlock_unlock (&global.shutdown_lock);
 }
 
 
@@ -1204,10 +1071,6 @@
     if (source->format && source->format->apply_settings)
         source->format->apply_settings (source->client, source->format, mountinfo);
 
-    str = httpp_getvar (parser, "user-agent");
-    if (str && source->format)
-        stats_event_conv (source->mount, "user_agent", str, source->format->charset);
-
     /* public */
     if (mountinfo && mountinfo->yp_public >= 0)
         val = mountinfo->yp_public;
@@ -1342,11 +1205,6 @@
     if (mountinfo && mountinfo->limit_rate)
         source->limit_rate = mountinfo->limit_rate;
 
-    if (mountinfo)
-        source->avg_bitrate_duration = mountinfo->avg_bitrate_duration;
-    else
-        source->avg_bitrate_duration = 60;
-
     /* needs a better mechanism, probably via a client_t handle */
     free (source->dumpfilename);
     source->dumpfilename = NULL;
@@ -1411,12 +1269,6 @@
  */
 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
 {
-    /*  skip if source is a fallback to file */
-    if (source->running && source->client == NULL)
-    {
-        stats_event_hidden (source->mount, NULL, NULL, STATS_HIDDEN);
-        return;
-    }
     /* set global settings first */
     source->queue_size_limit = config->queue_size_limit;
     source->timeout = config->source_timeout;
@@ -1429,7 +1281,7 @@
 
     if (source->dumpfilename)
         DEBUG1 ("Dumping stream to %s", source->dumpfilename);
-    if (source->on_demand)
+    if (source->flags & SOURCE_ON_DEMAND)
     {
         DEBUG0 ("on_demand set");
         stats_event (source->mount, "on_demand", "1");
@@ -1471,54 +1323,30 @@
 }
 
 
-void *source_client_thread (void *arg)
+int source_client_callback (client_t *client, void *arg)
 {
+    const char *agent;
     source_t *source = arg;
 
-    stats_event_inc(NULL, "source_client_connections");
-
-    thread_mutex_lock (&source->lock);
-    source_main (source);
-
-    if (source->wait_time)
+    if (client->con->error) /* did http response fail? */
     {
-        time_t release = source->wait_time + time(NULL);
-        INFO2 ("keeping %s reserved for %d seconds", source->mount, source->wait_time);
-        thread_mutex_unlock (&source->lock);
-        while (global.running && release >= time(NULL))
-            thread_sleep (1000000);
-    }
-    else
-        thread_mutex_unlock (&source->lock);
-
-    source_free_source (source);
-    slave_update_all_mounts();
-
-    return NULL;
-}
-
-
-void source_client_callback (client_t *client, void *arg)
-{
-    source_t *source = arg;
-    refbuf_t *old_data = client->refbuf;
-
-    if (client->con->error)
-    {
         global_lock();
         global.sources--;
         global_unlock();
-        source_clear_source (source);
-        source_free_source (source);
-        return;
+        return -1;
     }
-    client->refbuf = old_data->associated;
-    old_data->associated = NULL;
-    refbuf_release (old_data);
-    stats_event (source->mount, "source_ip", source->client->con->ip);
+    thread_rwlock_rlock (&global.shutdown_lock);
 
-    thread_create ("Source Thread", source_client_thread,
-            source, THREAD_DETACHED);
+    agent = httpp_getvar (source->client->parser, "user-agent");
+    if (agent)
+        stats_event (source->mount, "user_agent", agent);
+    stats_event_inc(NULL, "source_client_connections");
+    stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
+
+    source_init (source);
+    client->ops = &source_client_ops;
+    client->shared_data = source;
+    return 0;
 }
 
 
@@ -1557,71 +1385,6 @@
 #endif
 
 
-static void *source_fallback_file (void *arg)
-{
-    char *mount = arg;
-    char *type;
-    char *path;
-    unsigned int len;
-    FILE *file = NULL;
-    source_t *source = NULL;
-    ice_config_t *config;
-    http_parser_t *parser;
-
-    do
-    {
-        if (mount == NULL || mount[0] != '/')
-            break;
-        config = config_get_config();
-        len  = strlen (config->webroot_dir) + strlen (mount) + 1;
-        path = malloc (len);
-        if (path)
-            snprintf (path, len, "%s%s", config->webroot_dir, mount);
-        
-        config_release_config ();
-        if (path == NULL)
-            break;
-
-        file = fopen (path, "rb");
-        if (file == NULL)
-        {
-            WARN1 ("unable to open file \"%s\"", path);
-            free (path);
-            break;
-        }
-        free (path);
-        source = source_reserve (mount);
-        if (source == NULL)
-        {
-            WARN1 ("mountpoint \"%s\" already reserved", mount);
-            break;
-        }
-        INFO1 ("mountpoint %s is reserved", mount);
-        type = fserve_content_type (mount);
-        parser = httpp_create_parser();
-        httpp_initialize (parser, NULL);
-        httpp_setvar (parser, "content-type", type);
-        free (type);
-
-        stats_event_hidden (source->mount, NULL, NULL, STATS_HIDDEN);
-        source->yp_public = 0;
-        source->intro_file = file;
-        source->parser = parser;
-        source->avg_bitrate_duration = 20;
-        source->listener_send_trigger = 4096;
-        file = NULL;
-
-        if (connection_complete_source (source, 0) < 0)
-            break;
-        source_client_thread (source);
-        httpp_destroy (parser);
-    } while (0);
-    if (file)
-        fclose (file);
-    free (mount);
-    return NULL;
-}
-
 static int is_mount_template (const char *mount)
 {
     if (strchr (mount, '*') || strchr (mount, '?') || strchr (mount, '['))
@@ -1679,17 +1442,6 @@
         else
             stats_event (mount->mountname, NULL, NULL);
 
-        /* check for fallback to file */
-        if (global.running == ICE_RUNNING && mount->fallback_mount)
-        {
-            source_t *fallback = source_find_mount (mount->fallback_mount);
-            if (fallback == NULL)
-            {
-                thread_create ("Fallback file thread", source_fallback_file,
-                        strdup (mount->fallback_mount), THREAD_DETACHED);
-            }
-        }
-
         mount = mount->next;
     }
     avl_tree_unlock (global.source_tree);
@@ -1709,10 +1461,10 @@
         return 1;
 
     /* allow multiple authenticated relays */
-    if (client->username == NULL || (client->flags & CLIENT_IS_SLAVE))
+    if (client->username == NULL || client->flags & CLIENT_IS_SLAVE)
         return 1;
 
-    existing = source->active_clients;
+    existing = source->client_list;
     while (existing)
     {
         if (existing->con->error == 0 && existing->username &&
@@ -1732,10 +1484,95 @@
 }
 
 
-/* Add client to source if it finds one. If a 0 is returned then the client should not be
- * touched, if the return value is -1 then it failed to add and should not be touched.
- * If it's a -2 value then the client is still around for any further processing.
+/* listeners have now detected the source shutting down, now wait for them to 
+ * exit the handlers
  */
+static int source_client_shutdown (client_t *client)
+{
+    source_t *source = client->shared_data;
+    int ret = -1;
+
+    client->schedule_ms = client->worker->time_ms + 100;
+    if (client->con->discon_time)
+    {
+        if (client->con->discon_time >= client->worker->current_time.tv_sec)
+            return 0;
+        else
+            return -1;
+    }
+    thread_mutex_lock (&source->lock);
+    DEBUG1 ("remaining listeners to process is %d", source->listeners);
+    /* listeners handled now */
+    if (source->wait_time)
+    {
+        /* set a wait time for leaving the source reserved */
+        client->con->discon_time = client->worker->current_time.tv_sec + source->wait_time;
+        INFO2 ("keeping %s reserved for %d seconds", source->mount, source->wait_time);
+        ret = 0;
+    }
+    thread_mutex_unlock (&source->lock);
+    return ret;
+}
+
+
+/* clean up what is left from the source. */
+void source_client_release (client_t *client)
+{
+    source_t *source = client->shared_data;
+
+    global_reduce_bitrate_sampling (global.out_bitrate);
+
+    thread_mutex_lock (&source->lock);
+    /* log bytes read in access log */
+    if (source->format)
+        client->con->sent_bytes = source->format->read_bytes;
+    thread_mutex_unlock (&source->lock);
+
+    client_destroy (client);
+    source_free_source (source);
+    thread_rwlock_unlock (&global.shutdown_lock);
+    slave_update_all_mounts();
+}
+
+
+/* listener is off the handler list, so clean up */
+static void source_listener_release (client_t *client)
+{
+    source_t *source = client->shared_data;
+    ice_config_t *config;
+    mount_proxy *mountinfo;
+    client_t **pnext;
+    int value;
+
+    if (source_running (source) == 0)
+        return;
+
+    thread_mutex_lock (&source->lock);
+
+    /* search through sources client list to find previous link in list */
+    // we could flag the source to recreate the client list when needed
+    // killclient, listclients, move clients
+    pnext = &source->client_list;
+    while (*pnext && *pnext != client)
+        pnext = &((*pnext)->next);
+    *pnext = client->next;
+    value = --source->listeners;
+    if (source->listeners == 0)
+        rate_reduce (source->format->out_bitrate, 0);
+
+    stats_event_dec (NULL, "listeners");
+    stats_event_args (source->mount, "listeners", "%lu", value);
+    /* change of listener numbers, so reduce scope of global sampling */
+    global_reduce_bitrate_sampling (global.out_bitrate);
+
+    config = config_get_config ();
+    mountinfo = config_find_mount (config, source->mount);
+    auth_release_listener (client, source->mount, mountinfo);
+    config_release_config();
+    thread_mutex_unlock (&source->lock);
+}
+
+
 int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
 {
     int loop = 10;
@@ -1775,7 +1612,7 @@
         /* ok, we found a source and it is locked */
         if (client->flags & CLIENT_IS_SLAVE)
         {
-            if (source->client == NULL && source->on_demand == 0)
+            if (source->client == NULL && (source->flags & SOURCE_ON_DEMAND) == 0)
             {
                 client_send_403 (client, "Slave relay reading from time unregulated stream");
                 return -1;
@@ -1854,48 +1691,78 @@
         return -1;
 
     } while (1);
-
     client->con->sent_bytes = 0;
-    client->write_to_client = format_generic_write_to_client;
-    client->check_buffer = http_source_listener;
+
     client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
     memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
 
-    /* lets add the client to the active list */
-    client->next = source->active_clients;
-    source->active_clients = client;
-    source->listeners++;
+    source_setup_listener (source, client);
+    client->flags |= CLIENT_ACTIVE;
     thread_mutex_unlock (&source->lock);
 
-    if (source->running == 0 && source->on_demand)
+    return 0;
+}
+
+/* call with the source lock heldm, but expect the lock released on exit
+ * as the listener may of changed threads and therefore lock needed to be
+ * released
+ */
+void source_setup_listener (source_t *source, client_t *client)
+{
+    client->ops = &listener_client_ops;
+    client->shared_data = source;
+    client->queue_pos = 0;
+
+    client->check_buffer = http_source_listener;
+    // add client to the source
+    client->next = source->client_list;
+    source->client_list = client;
+    source->listeners++;
+}
+
+
+static int source_client_http_send (client_t *client)
+{
+    refbuf_t *stream;
+
+    if (client->pos < client->refbuf->len)
     {
-        /* enable on-demand relay to start, wake up the slave thread */
-        DEBUG0("kicking off on-demand relay");
-        source->on_demand_req = 1;
+        int ret = format_generic_write_to_client (client);
+        if (ret > 0 && ret < client->refbuf->len)
+            return 0; /* trap for short writes */
     }
-    return 0;
+    stream = client->refbuf->associated;
+    client->refbuf->associated = NULL;
+    refbuf_release (client->refbuf);
+    client->refbuf = stream;
+    client->pos = client->intro_offset;
+    client->intro_offset = 0;
+    return source_client_callback (client, client->shared_data);
 }
 
 
-void source_startup (client_t *client, const char *uri)
+int source_startup (client_t *client, const char *uri)
 {
     source_t *source;
     source = source_reserve (uri);
 
     if (source)
     {
+        thread_mutex_lock (&source->lock);
         source->client = client;
         source->parser = client->parser;
         if (connection_complete_source (source, 1) < 0)
         {
-            source_clear_source (source);
+            thread_mutex_unlock (&source->lock);
             source_free_source (source);
-            return;
+            return -1;
         }
         client->respcode = 200;
-        if (client->server_conn->shoutcast_compat)
+        client->shared_data = source;
+
+        if (client->server_conn && client->server_conn->shoutcast_compat)
         {
-            source->shoutcast_compat = 1;
+            source->flags |= SOURCE_SHOUTCAST_COMPAT;
             source_client_callback (client, source);
         }
         else
@@ -1907,7 +1774,10 @@
             /* we may have unprocessed data read in, so don't overwrite it */
             ok->associated = client->refbuf;
             client->refbuf = ok;
-            fserve_add_client_callback (client, source_client_callback, source);
+            client->intro_offset = client->pos;
+            client->pos = 0;
+            client->ops = &source_client_http_ops;
+            thread_mutex_unlock (&source->lock);
         }
     }
     else
@@ -1915,6 +1785,53 @@
         client_send_403 (client, "Mountpoint in use");
         WARN1 ("Mountpoint %s in use", uri);
     }
+    return 0;
 }
 
 
+/* check to see if the source client can be moved to a less busy worker thread.
+ * we only move the source client, not the listeners, they will move later
+ */
+void source_change_worker (source_t *source)
+{
+    client_t *client = source->client;
+    worker_t *this_worker = client->worker, *worker;
+
+    thread_rwlock_rlock (&workers_lock);
+    worker = find_least_busy_handler ();
+    if (worker != client->worker)
+    {
+        if (worker->count + source->listeners + 10 < client->worker->count)
+        {
+            thread_mutex_unlock (&source->lock);
+            client_change_worker (client, worker);
+            DEBUG2 ("moving source from %p to %p", this_worker, worker);
+            thread_mutex_lock (&source->lock);
+        }
+    }
+    thread_rwlock_unlock (&workers_lock);
+}
+
+
+/* move listener client to worker theread that the source is on. This will
+ * help cache but prevent overloading a single worker with many listeners.
+ */
+void listener_change_worker (client_t *client, source_t *source)
+{
+    worker_t *this_worker = client->worker, *dest_worker;
+    long diff;
+
+    thread_rwlock_rlock (&workers_lock);
+    dest_worker = source->client->worker;
+    diff = dest_worker->count - this_worker->count;
+
+    if (diff < 1000 && this_worker != dest_worker)
+    {
+        thread_mutex_unlock (&source->lock);
+        client_change_worker (client, dest_worker);
+        DEBUG2 ("moving listener from %p to %p", this_worker, dest_worker);
+        thread_mutex_lock (&source->lock);
+    }
+    thread_rwlock_unlock (&workers_lock);
+}
+

Modified: icecast/branches/kh/icecast/src/source.h
===================================================================
--- icecast/branches/kh/icecast/src/source.h	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/source.h	2009-07-07 23:26:21 UTC (rev 16219)
@@ -17,25 +17,23 @@
 #include "yp.h"
 #include "util.h"
 #include "format.h"
+#include "fserve.h"
 
 #include <stdio.h>
 
 typedef struct source_tag
 {
+    char *mount;
+    unsigned int flags;
+    int listener_send_trigger;
+
     client_t *client;
     http_parser_t *parser;
     time_t client_stats_update;
-    
-    char *mount;
 
-    /* set to zero to request the source to shutdown without causing a global
-     * shutdown */
-    int running;
-
     struct _format_plugin_tag *format;
 
-    client_t *active_clients;
-    client_t **fast_clients_p;
+    client_t *client_list;
 
     util_dict *audio_info;
 
@@ -45,19 +43,18 @@
     char *dumpfilename; /* Name of a file to dump incoming stream to */
     FILE *dumpfile;
 
-    int throttle_stream;
-    time_t throttle_termination;
-    uint64_t limit_rate;
-    int avg_bitrate_duration;
+    fbinfo fallback;
+
+    int skip_duration;
+    long limit_rate;
     time_t wait_time;
-    long listener_send_trigger;
 
+    unsigned long termination_count;
     unsigned long peak_listeners;
     unsigned long listeners;
     unsigned long prev_listeners;
 
     int yp_public;
-    int shoutcast_compat;
     int log_id;
 
     /* per source burst handling for connecting clients */
@@ -67,11 +64,8 @@
 
     unsigned int queue_size;
     unsigned int queue_size_limit;
-    unsigned int amount_added_to_queue;
 
     unsigned timeout;  /* source timeout in seconds */
-    int on_demand;
-    int on_demand_req;
     unsigned long bytes_sent_since_update;
     unsigned long bytes_read_since_update;
     int stats_interval;
@@ -85,13 +79,20 @@
 
 } source_t;
 
-#define source_available(x)     ((x)->running || (x)->on_demand)
-#define source_running(x)       ((x)->running)
+#define SOURCE_RUNNING              001
+#define SOURCE_ON_DEMAND            002
+#define SOURCE_ON_DEMAND_REQ        004
+#define SOURCE_SHOUTCAST_COMPAT     010
+#define SOURCE_TERMINATING          020
+#define SOURCE_TEMPORARY_FALLBACK   040
 
+#define source_available(x)     ((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND))
+#define source_running(x)       ((x)->flags & SOURCE_RUNNING)
+
 source_t *source_reserve (const char *mount);
 void *source_client_thread (void *arg);
-void source_startup (client_t *client, const char *uri);
-void source_client_callback (client_t *client, void *source);
+int  source_startup (client_t *client, const char *uri);
+int  source_client_callback (client_t *client, void *source);
 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo);
 void source_clear_source (source_t *source);
 source_t *source_find_mount(const char *mount);
@@ -104,8 +105,15 @@
 void source_main(source_t *source);
 void source_recheck_mounts (int update_all);
 int  source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client);
+void source_read (source_t *source);
+void source_setup_listener (source_t *source, client_t *client);
+void source_init (source_t *source);
+void source_shutdown (source_t *source, int with_fallback);
+void source_set_fallback (source_t *source, const char *dest_mount);
+void source_set_override (const char *mount, const char *dest);
 
-extern mutex_t move_clients_mutex;
+#define SOURCE_BLOCK_SYNC           01
+#define SOURCE_BLOCK_RELEASE        02
 
 #define SOURCE_BLOCK_SYNC           01
 

Modified: icecast/branches/kh/icecast/src/stats.c
===================================================================
--- icecast/branches/kh/icecast/src/stats.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/stats.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -47,7 +47,10 @@
 #endif
 
 #define VAL_BUFSIZE 20
+#define STATS_BLOCK_NORMAL  01
 
+#define STATS_LARGE  CLIENT_FORMAT_BIT
+
 #define STATS_EVENT_SET     0
 #define STATS_EVENT_INC     1
 #define STATS_EVENT_DEC     2
@@ -83,12 +86,11 @@
 
 typedef struct _event_listener_tag
 {
-    client_t *client;
     int hidden_level;
     char *source;
 
     /* queue for unwritten stats to stats clients */
-    refbuf_t *queue, **queue_recent_p;
+    refbuf_t **queue_recent_p;
     unsigned int content_len;
 
     struct _event_listener_tag *next;
@@ -577,32 +579,71 @@
 }
 
 
-static void stats_listeners_send (event_listener_t *listener)
+static int stats_listeners_send (client_t *client)
 {
-    int loop = 6;
-    int ret;
-    client_t *client = listener->client;
+    int loop = 8;
+    int ret = 0;
+    event_listener_t *listener = client->shared_data;
 
+    if (client->con->error)
+        return -1;
+    if (client->flags & STATS_LARGE)
+        loop = 4;
+    else
+        if (listener->content_len > 50000)
+        {
+            WARN1 ("dropping stats client, %ld in queue", listener->content_len);
+            return -1;
+        }
+    client->schedule_ms = client->worker->time_ms + 100;
+    thread_mutex_lock(&_stats_mutex);
     while (loop)
     {
-        if (format_advance_queue (NULL, client) < 0)
+        refbuf_t *refbuf = client->refbuf;
+
+        if (refbuf == NULL)
             break;
+        if ((client->flags & STATS_LARGE) && (refbuf->flags & STATS_BLOCK_NORMAL))
+            client->flags &= ~STATS_LARGE;
+
         ret = format_generic_write_to_client (client);
-        if (ret < 0)
-            break;
-        listener->content_len -= ret;
+        if (ret > 0)
+            listener->content_len -= ret;
+        if (client->pos == refbuf->len)
+        {
+            client->refbuf = refbuf->next;
+            refbuf->next = NULL;
+            refbuf_release (refbuf);
+            client->pos = 0;
+            if (client->refbuf == NULL)
+            {
+                listener->queue_recent_p = &client->refbuf;
+                break;
+            }
+        }
+        else if (ret < 4096)
+            break; /* short write, so stop for now */
         loop--;
     }
+    thread_mutex_unlock(&_stats_mutex);
+    if (loop == 0)
+        client->schedule_ms -= 100;
+    return 0;
 }
 
-static void clear_stats_queue (event_listener_t *listener)
+
+static void clear_stats_queue (client_t *client)
 {
-    while (listener->queue && listener->queue->_count == 1)
+    refbuf_t *refbuf = client->refbuf;
+    while (refbuf)
     {
-        refbuf_t *to_go = listener->queue;
-        listener->queue = to_go->next;
+        refbuf_t *to_go = refbuf;
+        refbuf = to_go->next;
+        if (to_go->_count != 1) DEBUG1 ("odd count for stats %d", to_go->_count);
+        to_go->next = NULL;
         refbuf_release (to_go);
     }
+    client->refbuf = NULL;
 }
 
 
@@ -616,34 +657,8 @@
 
     while (listener)
     {
-        client_t *client = listener->client;
-
         if (listener->hidden_level & hidden_level)
-        {
             _add_stats_to_stats_client (listener, fmt, ap);
-        }
-        stats_listeners_send (listener);
-        if (client->con->error || listener->content_len > 10000)
-        {
-            stats_event_t stats_count;
-            char buffer [20];
-
-            *trail = listener->next;
-
-            build_event (&stats_count, NULL, "stats_connections", buffer);
-            stats_count.action = STATS_EVENT_DEC;
-            process_event_unlocked (&stats_count);
-
-            /* moved this listener so that final cleanup can be done outside of lock */
-            listener->next = _stats.listeners_removed;
-            _stats.listeners_removed = listener;
-
-            listener = *trail;
-            continue;
-        }
-        /* reduce queue if unsued */
-        clear_stats_queue (listener);
-
         trail = &listener->next;
         listener = listener->next;
     }
@@ -736,6 +751,7 @@
 
         if (_append_to_bufferv (refbuf, size, fmt, ap) == 0)
         {
+            refbuf->flags |= STATS_BLOCK_NORMAL;
             _add_node_to_stats_client (listener, refbuf);
             return;
         }
@@ -860,8 +876,6 @@
             node2 = avl_get_next (node2);
         }
     }
-
-    client_set_queue (listener->client, refbuf);
     _add_node_to_stats_client (listener, refbuf);
 
     /* now we register to receive future event notices */
@@ -870,31 +884,60 @@
 }
 
 
-static void stats_callback (client_t *client, void *arg)
+static void stats_client_release (client_t *client)
 {
-    event_listener_t *listener = arg;
+    event_listener_t *listener = _stats.event_listeners,
+                     **trail = &_stats.event_listeners;
+    while (listener)
+    {
+        if (listener == client->shared_data)
+        {
+            stats_event_t stats_count;
+            char buffer [20];
 
-    client_set_queue (client, NULL);
-
-    thread_mutex_lock(&_stats_mutex);
-    _register_listener (listener);
-    thread_mutex_unlock(&_stats_mutex);
+            *trail = listener->next;
+            clear_stats_queue (client);
+            free (listener->source);
+            free (listener);
+            client_destroy (client);
+            build_event (&stats_count, NULL, "stats_connections", buffer);
+            stats_count.action = STATS_EVENT_DEC;
+            process_event_unlocked (&stats_count);
+            return;
+        }
+        trail = &listener->next;
+        listener = listener->next;
+    }
 }
 
+
+struct _client_functions stats_client_send_ops =
+{
+    stats_listeners_send,
+    stats_client_release
+};
+
 void stats_add_listener (client_t *client, int hidden_level)
 {
     event_listener_t *listener = calloc (1, sizeof (event_listener_t));
     listener->hidden_level = hidden_level;
-    listener->client = client;
-    listener->queue_recent_p = &listener->queue;
 
     client->respcode = 200;
+    client->ops = &stats_client_send_ops;
+    client->shared_data = listener;
     client_set_queue (client, NULL);
+    client->flags |= CLIENT_ACTIVE;
     client->refbuf = refbuf_new (100);
     snprintf (client->refbuf->data, 100,
             "HTTP/1.0 200 OK\r\ncapability: streamlist\r\n\r\n");
     client->refbuf->len = strlen (client->refbuf->data);
-    fserve_add_client_callback (client, stats_callback, listener);
+    listener->content_len = client->refbuf->len;
+    listener->queue_recent_p = &client->refbuf->next;
+
+    client->flags |= STATS_LARGE;
+    thread_mutex_lock(&_stats_mutex);
+    _register_listener (listener);
+    thread_mutex_unlock(&_stats_mutex);
 }
 
 void stats_transform_xslt(client_t *client, const char *uri)
@@ -1073,15 +1116,5 @@
     listener = _stats.listeners_removed;
     _stats.listeners_removed = NULL;
     thread_mutex_unlock (&_stats_mutex);
-
-    /* flush out any closed stats clients */
-    while (listener)
-    {
-        event_listener_t *to_go = listener;
-        listener = listener->next;
-        client_destroy (to_go->client);
-        clear_stats_queue (to_go);
-        free (to_go);
-    }
 }
 

Modified: icecast/branches/kh/icecast/src/xslt.c
===================================================================
--- icecast/branches/kh/icecast/src/xslt.c	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/xslt.c	2009-07-07 23:26:21 UTC (rev 16219)
@@ -236,7 +236,7 @@
         client_set_queue (client, NULL);
         client->refbuf = refbuf;
         refbuf->len = strlen (refbuf->data);
-        fserve_add_client (client, NULL);
+        fserve_setup_client (client, NULL);
         xmlFree (string);
     }
     else

Modified: icecast/branches/kh/icecast/win32/icecast.dsp
===================================================================
--- icecast/branches/kh/icecast/win32/icecast.dsp	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/win32/icecast.dsp	2009-07-07 23:26:21 UTC (rev 16219)
@@ -375,6 +375,10 @@
 # End Source File
 # Begin Source File
 
+SOURCE=..\src\slave.h
+# End Source File
+# Begin Source File
+
 SOURCE=..\src\net\sock.h
 # End Source File
 # Begin Source File

Modified: icecast/branches/kh/icecast/win32/icecast2.iss
===================================================================
--- icecast/branches/kh/icecast/win32/icecast2.iss	2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/win32/icecast2.iss	2009-07-07 23:26:21 UTC (rev 16219)
@@ -3,7 +3,7 @@
 
 [Setup]
 AppName=Icecast2-KH
-AppVerName=Icecast v2.3.2-kh9
+AppVerName=Icecast v2.3.2-kh10
 AppPublisherURL=http://www.icecast.org
 AppSupportURL=http://www.icecast.org
 AppUpdatesURL=http://www.icecast.org
@@ -13,7 +13,7 @@
 LicenseFile=..\COPYING
 InfoAfterFile=..\README
 OutputDir=.
-OutputBaseFilename=icecast2_win32_v2.3.2-kh9_setup
+OutputBaseFilename=icecast2_win32_v2.3.2-kh10_setup
 WizardImageFile=icecast2logo2.bmp
 WizardImageStretch=no
 ; uncomment the following line if you want your installation to run on NT 3.51 too.
@@ -45,7 +45,7 @@
 Source: "..\admin\flashpolicy"; DestDir: "{app}\admin"; Flags: ignoreversion
 Source: "c:\xiph\lib\pthreadVSE.dll"; DestDir: "{app}"; Flags: ignoreversion
 Source: "..\conf\*.dist"; DestDir: "{app}"; Flags: ignoreversion
-Source: "..\examples\icecast_shoutcast_compat.xml"; DestDir: "{app}"; DestName: "icecast.xml"; Flags: ignoreversion
+Source: "..\examples\icecast_shoutcast_compat.xml"; DestDir: "{app}"; DestName: "icecast.xml"; Flags: ignoreversion confirmoverwrite
 Source: "c:\xiph\lib\iconv.dll"; DestDir: "{app}"; Flags: ignoreversion
 Source: "c:\xiph\lib\libxslt.dll"; DestDir: "{app}"; Flags: ignoreversion
 Source: "c:\xiph\lib\libxml2.dll"; DestDir: "{app}"; Flags: ignoreversion



More information about the commits mailing list