[xiph-commits] r16524 - icecast/branches/kh/icecast/src

karl at svn.xiph.org karl at svn.xiph.org
Fri Aug 28 13:19:49 PDT 2009


Author: karl
Date: 2009-08-28 13:19:49 -0700 (Fri, 28 Aug 2009)
New Revision: 16524

Modified:
   icecast/branches/kh/icecast/src/admin.c
   icecast/branches/kh/icecast/src/auth.c
   icecast/branches/kh/icecast/src/auth_url.c
   icecast/branches/kh/icecast/src/client.c
   icecast/branches/kh/icecast/src/client.h
   icecast/branches/kh/icecast/src/connection.c
   icecast/branches/kh/icecast/src/connection.h
   icecast/branches/kh/icecast/src/format.c
   icecast/branches/kh/icecast/src/format.h
   icecast/branches/kh/icecast/src/format_mp3.c
   icecast/branches/kh/icecast/src/format_ogg.c
   icecast/branches/kh/icecast/src/fserve.c
   icecast/branches/kh/icecast/src/global.c
   icecast/branches/kh/icecast/src/logging.c
   icecast/branches/kh/icecast/src/main.c
   icecast/branches/kh/icecast/src/slave.c
   icecast/branches/kh/icecast/src/source.c
   icecast/branches/kh/icecast/src/source.h
   icecast/branches/kh/icecast/src/stats.c
Log:
kh13. connection_t/client_t merge, client scheduling changes. intro content
from listener_add now possible.


Modified: icecast/branches/kh/icecast/src/admin.c
===================================================================
--- icecast/branches/kh/icecast/src/admin.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/admin.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -188,7 +188,7 @@
                 if (source->client)
                 {
                     snprintf (buf, sizeof(buf), "%lu",
-                            (unsigned long)(now - source->client->con->con_time));
+                            (unsigned long)(now - source->client->connection.con_time));
                     xmlNewChild (srcnode, NULL, XMLSTR("Connected"), XMLSTR(buf));
                 }
                 xmlNewChild (srcnode, NULL, XMLSTR("content-type"), 
@@ -289,8 +289,10 @@
     }
     else
     {
+        thread_mutex_lock (&source->lock);
         if (source_available (source) == 0)
         {
+            thread_mutex_unlock (&source->lock);
             avl_tree_unlock (global.source_tree);
             INFO1("Received admin command on unavailable mount \"%s\"", mount);
             client_send_400 (client, "Source is not available");
@@ -453,6 +455,7 @@
     }
     if (!parameters_passed) {
         doc = admin_build_sourcelist(source->mount);
+        thread_mutex_unlock (&source->lock);
         admin_send_response(doc, client, response, "moveclients.xsl");
         xmlFreeDoc(doc);
         return;
@@ -468,6 +471,7 @@
 
     snprintf (buf, sizeof(buf), "Clients moved from %s to %s",
             source->mount, dest_source);
+    thread_mutex_unlock (&source->lock);
     xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(buf));
     xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
 
@@ -613,10 +617,10 @@
 
     xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
 
-    snprintf (buf, sizeof (buf), "%lu", listener->con->id);
+    snprintf (buf, sizeof (buf), "%lu", listener->connection.id);
     xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
 
-    xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->con->ip));
+    xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->connection.ip));
 
     useragent = httpp_getvar (listener->parser, "user-agent");
     if (useragent)
@@ -626,13 +630,16 @@
         xmlFree (str);
     }
 
-    snprintf (buf, sizeof (buf), "%ld", (long)(source->client->queue_pos - listener->queue_pos));
+    if (listener->flags & CLIENT_ACTIVE)
+        snprintf (buf, sizeof (buf), "%ld", (long)(source->client->queue_pos - listener->queue_pos));
+    else
+        snprintf (buf, sizeof (buf), "0");
     xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
 
     if (listener->worker)
     {
         snprintf (buf, sizeof (buf), "%lu",
-                (unsigned long)(listener->worker->current_time.tv_sec - listener->con->con_time));
+                (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
         xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
     }
     if (listener->username)
@@ -654,15 +661,12 @@
     if (source == NULL)
         return;
 
-    thread_mutex_lock (&source->lock);
-
     listener = source->client_list;
     while (listener)
     {
         add_listener_node (srcnode, listener);
         listener = listener->next;
     }
-    thread_mutex_unlock (&source->lock);
 }
 
 
@@ -734,20 +738,19 @@
     else
     {
         client_t *listener;
-        thread_mutex_lock (&source->lock);
 
         listener = source->client_list;
         while (listener)
         {
-            if (listener->con->id == id)
+            if (listener->connection.id == id)
             {
                 add_listener_node (srcnode, listener);
                 break;
             }
             listener = listener->next;
         }
-        thread_mutex_unlock (&source->lock);
     }
+    thread_mutex_unlock (&source->lock);
 
     admin_send_response(doc, client, response, "listclients.xsl");
     xmlFreeDoc(doc);
@@ -871,6 +874,7 @@
         node = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL);
         srcnode = xmlNewChild(node, NULL, XMLSTR("source"), NULL);
         xmlSetProp(srcnode, XMLSTR "mount", XMLSTR(source->mount));
+        thread_mutex_unlock (&source->lock);
 
         if (message) {
             msgnode = xmlNewChild(node, NULL, XMLSTR("iceresponse"), NULL);
@@ -890,6 +894,7 @@
         return;
     } while (0);
 
+    thread_mutex_unlock (&source->lock);
     config_release_config ();
     client_send_400 (client, "missing parameter");
 }
@@ -908,6 +913,7 @@
 
     source->flags &= ~SOURCE_RUNNING;
 
+    thread_mutex_unlock (&source->lock);
     admin_send_response(doc, client, response, "response.xsl");
     xmlFreeDoc(doc);
 }
@@ -926,7 +932,6 @@
 
     id = atoi(idtext);
 
-    thread_mutex_lock (&source->lock);
     listener = source_find_client(source, id);
 
     doc = xmlNewDoc(XMLSTR("1.0"));
@@ -939,7 +944,7 @@
         /* This tags it for removal on the next iteration of the main source
          * loop
          */
-        listener->con->error = 1;
+        listener->connection.error = 1;
         snprintf(buf, sizeof(buf), "Client %d removed", id);
         xmlNewChild(node, NULL, XMLSTR("message"), XMLSTR(buf));
         xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
@@ -958,13 +963,30 @@
 static void command_fallback(client_t *client, source_t *source,
     int response)
 {
-    const char *fallback;
+    char *mount = strdup (source->mount);
+    mount_proxy *mountinfo;
+    ice_config_t *config;
 
+    thread_mutex_unlock (&source->lock);
     DEBUG0("Got fallback request");
+    config = config_grab_config();
+    mountinfo = config_find_mount (config, mount);
+    free (mount);
+    if (mountinfo)
+    {
+        const char *fallback;
+        char buffer[200];
+        COMMAND_REQUIRE(client, "fallback", fallback);
 
-    COMMAND_REQUIRE(client, "fallback", fallback);
-
-    client_send_400 (client, "not implemented");
+        xmlFree (mountinfo->fallback_mount);
+        mountinfo->fallback_mount = (char *)xmlCharStrdup (fallback);
+        snprintf (buffer, sizeof (buffer), "Fallback for \"%s\" configured", mountinfo->mountname);
+        config_release_config ();
+        html_success (client, buffer);
+        return;
+    }
+    config_release_config ();
+    client_send_400 (client, "no mount details available");
 }
 
 static void command_metadata(client_t *client, source_t *source,
@@ -989,10 +1011,8 @@
     COMMAND_OPTIONAL(client, "artwork", artwork);
     COMMAND_OPTIONAL(client, "charset", charset);
 
-    thread_mutex_lock (&source->lock);
-
     plugin = source->format;
-    if (source->client && strcmp (client->con->ip, source->client->con->ip) != 0)
+    if (source->client && strcmp (client->connection.ip, source->client->connection.ip) != 0)
         if (response == RAW && connection_check_admin_pass (client->parser) == 0)
             same_ip = 0;
 
@@ -1067,12 +1087,13 @@
 
     if ((source->flags & SOURCE_SHOUTCAST_COMPAT) == 0)
     {
+        thread_mutex_unlock (&source->lock);
         ERROR0 ("illegal change of metadata on non-shoutcast compatible stream");
         client_send_400 (client, "illegal metadata call");
         return;
     }
 
-    if (source->client && strcmp (client->con->ip, source->client->con->ip) != 0)
+    if (source->client && strcmp (client->connection.ip, source->client->connection.ip) != 0)
         if (connection_check_admin_pass (client->parser) == 0)
             same_ip = 0;
 
@@ -1181,6 +1202,8 @@
 {
     DEBUG0("List mounts request");
 
+    client_set_queue (client, NULL);
+    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
     if (response == TEXT)
     {
         redirector_update (client);
@@ -1214,6 +1237,7 @@
     xmlDocPtr doc;
     xmlNodePtr node, srcnode;
 
+    thread_mutex_unlock (&source->lock);
     doc = xmlNewDoc(XMLSTR("1.0"));
     node = xmlNewDocNode(doc, NULL, XMLSTR("icestats"), NULL);
     srcnode = xmlNewChild(node, NULL, XMLSTR("source"), NULL);

Modified: icecast/branches/kh/icecast/src/auth.c
===================================================================
--- icecast/branches/kh/icecast/src/auth.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/auth.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -189,7 +189,7 @@
         client_t *client = auth_user->client;
 
         if (client->respcode)
-            client->con->error = 1;
+            client->connection.error = 1;
         client_send_401 (client, auth_user->auth->realm);
         client->flags |= CLIENT_ACTIVE;
         auth_user->client = NULL;
@@ -207,7 +207,7 @@
     int ret = 1;
     if (client)
     {
-        if (sock_active (client->con->sock) == 0)
+        if (sock_active (client->connection.sock) == 0)
             ret = 0;
     }
     return ret;
@@ -338,7 +338,19 @@
             auth_user->handler = handler->id;
 
             if (auth_user->process)
+            {
+                worker_t *worker = NULL;
+                if (auth_user->client)
+                    worker = auth_user->client->worker;
                 auth_user->process (auth_user);
+                if (worker)
+                {
+                    /* wakeup worker for new client */
+                    thread_mutex_lock (&worker->lock);
+                    thread_cond_signal (&worker->cond);
+                    thread_mutex_unlock (&worker->lock);
+                }
+            }
             else
                 ERROR0 ("client auth process not set");
 
@@ -360,20 +372,27 @@
 
     DEBUG1 ("moving listener to %s", finfo->mount);
     avl_tree_rlock (global.source_tree);
+
     source = source_find_mount (finfo->mount);
-
-    if (source && source_available (source))
+    while (source)
     {
         thread_mutex_lock (&source->lock);
-        avl_tree_unlock (global.source_tree);
-        source_setup_listener (source, client);
-        thread_mutex_unlock (&source->lock);
+        if (source_available (source))
+        {
+            avl_tree_unlock (global.source_tree);
+            source_setup_listener (source, client);
+            thread_mutex_unlock (&source->lock);
+            return;
+        }
+        if (source->fallback.mount)
+        {
+            source_t *prev = source;
+            source = source_find_mount (prev->fallback.mount);
+            thread_mutex_unlock (&prev->lock);
+        }
     }
-    else
-    {
-        avl_tree_unlock (global.source_tree);
-        fserve_setup_client_fb (client, finfo);
-    }
+    avl_tree_unlock (global.source_tree);
+    fserve_setup_client_fb (client, finfo);
 }
 
 
@@ -441,10 +460,12 @@
     ice_config_t *config;
     mount_proxy *mountinfo;
     const char *mount = auth_user->mount;
+    worker_t *worker;
 
     if (client == NULL)
         return -1;
 
+    worker = client->worker;
     if ((client->flags & CLIENT_AUTHENTICATED) == 0)
     {
         /* auth failed so do we place the listener elsewhere */
@@ -557,6 +578,8 @@
             return 1;
         }
         client->flags &= ~CLIENT_AUTHENTICATED;
+        client_destroy (client);
+        return 1;
     }
     client_send_404 (client, NULL);
     return 0;

Modified: icecast/branches/kh/icecast/src/auth_url.c
===================================================================
--- icecast/branches/kh/icecast/src/auth_url.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/auth_url.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -114,6 +114,11 @@
 } auth_url;
 
 
+struct build_intro_contents
+{
+    refbuf_t *head, **tailp;
+};
+
 static void auth_url_clear(auth_t *self)
 {
     auth_url *url;
@@ -174,7 +179,7 @@
         {
             unsigned int limit = 0;
             sscanf ((char *)ptr+url->timelimit_header_len, "%u\r\n", &limit);
-            client->con->discon_time = time(NULL) + limit;
+            client->connection.discon_time = time(NULL) + limit;
         }
         if (strncasecmp (ptr, "icecast-slave: 1", 16) == 0)
             client->flags |= CLIENT_IS_SLAVE;
@@ -206,10 +211,26 @@
     return (int)bytes;
 }
 
-/* capture returned data, but don't do anything with it */
+
+
 static int handle_returned_data (void *ptr, size_t size, size_t nmemb, void *stream)
 {
-    return (int)(size*nmemb);
+    auth_client *auth_user = stream;
+    unsigned bytes = size * nmemb;
+    client_t *client = auth_user->client;
+
+    if (client && client->respcode == 0)
+    {
+        refbuf_t *n, *r = client->refbuf;
+        struct build_intro_contents *x = (void*)r->data;
+
+        client->flags |= CLIENT_HAS_INTRO_CONTENT;
+        n = refbuf_new (bytes);
+        memcpy (n->data, ptr, bytes);
+        *x->tailp = n;
+        x->tailp = &n->next;
+    }
+    return (int)(bytes);
 }
 
 
@@ -218,7 +239,7 @@
     client_t *client = auth_user->client;
     auth_url *url = auth_user->auth->state;
     auth_thread_data *atd = auth_user->thread_data;
-    time_t duration = time(NULL) - client->con->con_time;
+    time_t duration = time(NULL) - client->connection.con_time;
     char *username, *password, *mount, *server, *ipaddr;
     const char *qargs;
     char *userpwd = NULL, post [4096];
@@ -241,12 +262,12 @@
     qargs = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
     snprintf (post, sizeof post, "%s%s", auth_user->mount, qargs ? qargs : "");
     mount = util_url_escape (post);
-    ipaddr = util_url_escape (client->con->ip);
+    ipaddr = util_url_escape (client->connection.ip);
 
     snprintf (post, sizeof (post),
             "action=listener_remove&server=%s&port=%d&client=%lu&mount=%s"
             "&user=%s&pass=%s&ip=%s&duration=%lu",
-            server, auth_user->port, client->con->id, mount, username,
+            server, auth_user->port, client->connection.id, mount, username,
             password, ipaddr, (long unsigned)duration);
     free (ipaddr);
     free (server);
@@ -280,6 +301,7 @@
     curl_easy_setopt (atd->curl, CURLOPT_URL, url->removeurl);
     curl_easy_setopt (atd->curl, CURLOPT_POSTFIELDS, post);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
+    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
 
     DEBUG1 ("...handler %d sending request", auth_user->handler);
     if (curl_easy_perform (atd->curl))
@@ -304,6 +326,7 @@
     char *user_agent, *username, *password;
     char *mount, *ipaddr, *server;
     ice_config_t *config;
+    struct build_intro_contents *x;
     char *userpwd = NULL, post [4096];
 
     if (url->addurl == NULL)
@@ -330,12 +353,12 @@
     qargs = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
     snprintf (post, sizeof post, "%s%s", auth_user->mount, qargs ? qargs : "");
     mount = util_url_escape (post);
-    ipaddr = util_url_escape (client->con->ip);
+    ipaddr = util_url_escape (client->connection.ip);
 
     snprintf (post, sizeof (post),
             "action=listener_add&server=%s&port=%d&client=%lu&mount=%s"
             "&user=%s&pass=%s&ip=%s&agent=%s",
-            server, port, client->con->id, mount, username,
+            server, port, client->connection.id, mount, username,
             password, ipaddr, user_agent);
     free (server);
     free (mount);
@@ -370,7 +393,12 @@
     curl_easy_setopt (atd->curl, CURLOPT_URL, url->addurl);
     curl_easy_setopt (atd->curl, CURLOPT_POSTFIELDS, post);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
+    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
     atd->errormsg[0] = '\0';
+    /* setup in case intro data is returned */
+    x = (void *)client->refbuf->data;
+    x->head = NULL;
+    x->tailp = &x->head;
 
     DEBUG1 ("handler %d sending request", auth_user->handler);
     res = curl_easy_perform (atd->curl);
@@ -395,7 +423,11 @@
     }
     /* we received a response, lets see what it is */
     if (client->flags & CLIENT_AUTHENTICATED)
+    {
+        if (client->flags & CLIENT_HAS_INTRO_CONTENT)
+            client->refbuf->next = x->head;
         return AUTH_OK;
+    }
     if (atoi (atd->errormsg) == 403)
     {
         client_send_403 (client, atd->errormsg+4);
@@ -419,8 +451,8 @@
 
     server = util_url_escape (auth_user->hostname);
     mount = util_url_escape (auth_user->mount);
-    if (client && client->con)
-        ipaddr = util_url_escape (client->con->ip);
+    if (client && client->connection.ip)
+        ipaddr = util_url_escape (client->connection.ip);
     else
         ipaddr = strdup("");
 
@@ -442,6 +474,7 @@
     curl_easy_setopt (atd->curl, CURLOPT_URL, url->stream_start);
     curl_easy_setopt (atd->curl, CURLOPT_POSTFIELDS, post);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
+    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
 
     DEBUG1 ("handler %d sending request", auth_user->handler);
     if (curl_easy_perform (atd->curl))
@@ -460,8 +493,8 @@
 
     server = util_url_escape (auth_user->hostname);
     mount = util_url_escape (auth_user->mount);
-    if (client && client->con)
-        ipaddr = util_url_escape (client->con->ip);
+    if (client && client->connection.ip)
+        ipaddr = util_url_escape (client->connection.ip);
     else
         ipaddr = strdup("");
 
@@ -483,6 +516,7 @@
     curl_easy_setopt (atd->curl, CURLOPT_URL, url->stream_end);
     curl_easy_setopt (atd->curl, CURLOPT_POSTFIELDS, post);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
+    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
 
     DEBUG1 ("handler %d sending request", auth_user->handler);
     if (curl_easy_perform (atd->curl))
@@ -511,13 +545,14 @@
     curl_easy_setopt (atd->curl, CURLOPT_URL, url->stream_auth);
     curl_easy_setopt (atd->curl, CURLOPT_POSTFIELDS, post);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEHEADER, auth_user);
+    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, auth_user);
     if (strcmp (auth_user->mount, httpp_getvar (client->parser, HTTPP_VAR_URI)) != 0)
         admin = "&admin=1";
     mount = util_url_escape (auth_user->mount);
     host = util_url_escape (auth_user->hostname);
     user = util_url_escape (client->username);
     pass = util_url_escape (client->password);
-    ipaddr = util_url_escape (client->con->ip);
+    ipaddr = util_url_escape (client->connection.ip);
 
     snprintf (post, sizeof (post),
             "action=stream_auth&mount=%s&ip=%s&server=%s&port=%d&user=%s&pass=%s%s",
@@ -560,7 +595,6 @@
     curl_easy_setopt (atd->curl, CURLOPT_USERAGENT, atd->server_id);
     curl_easy_setopt (atd->curl, CURLOPT_HEADERFUNCTION, handle_returned_header);
     curl_easy_setopt (atd->curl, CURLOPT_WRITEFUNCTION, handle_returned_data);
-    curl_easy_setopt (atd->curl, CURLOPT_WRITEDATA, atd);
     curl_easy_setopt (atd->curl, CURLOPT_NOSIGNAL, 1L);
     curl_easy_setopt (atd->curl, CURLOPT_TIMEOUT, 6L);
 #ifdef CURLOPT_PASSWDFUNCTION

Modified: icecast/branches/kh/icecast/src/client.c
===================================================================
--- icecast/branches/kh/icecast/src/client.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/client.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -45,34 +45,28 @@
 
 int worker_count;
 
-/* create a client_t with the provided connection and parser details. Return
- * client_t ready for use.  Should be called with global lock held.
+/* Return client_t ready for use. The provided socket can be SOCK_ERROR to
+ * allocate a dummy client_t.  Must be called with global lock held.
  */
-client_t *client_create (connection_t *con, http_parser_t *parser)
+client_t *client_create (sock_t sock)
 {
-    client_t *client = (client_t *)calloc(1, sizeof(client_t));
+    client_t *client = calloc (1, sizeof (client_t));
 
-    if (client == NULL)
-        abort();
-
-    global.clients++;
-
-    if (con && con->serversock != SOCK_ERROR)
+    if (sock != SOCK_ERROR)
     {
-        int i;
-        for (i=0; i < global.server_sockets; i++)
+        refbuf_t *r;
+        if (connection_init (&client->connection, sock) < 0)
         {
-            if (global.serversock[i] == con->serversock)
-            {
-                client->server_conn = global.server_conn[i];
-                client->server_conn->refcount++;
-            }
+            free (client);
+            return NULL;
         }
+        r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+        r->len = 0;
+        client->shared_data = r;
+        client->flags |= CLIENT_ACTIVE;
     }
+    global.clients++;
     stats_event_args (NULL, "clients", "%d", global.clients);
-    client->con = con;
-    client->parser = parser;
-    client->pos = 0;
     return client;
 }
 
@@ -82,6 +76,11 @@
     if (client == NULL)
         return;
 
+    if (client->worker)
+    {
+        WARN0 ("client still on worker thread");
+        return;
+    }
     /* release the buffer now, as the buffer could be on the source queue
      * and may of disappeared after auth completes */
     if (client->refbuf)
@@ -99,8 +98,7 @@
     if (client->respcode && client->parser)
         logging_access(client);
 
-    if (client->con)
-        connection_close(client->con);
+    connection_close (&client->connection);
     if (client->parser)
         httpp_destroy (client->parser);
 
@@ -124,6 +122,7 @@
 /* helper function for reading data from a client */
 int client_read_bytes (client_t *client, void *buf, unsigned len)
 {
+    int (*con_read)(struct connection_tag *handle, void *buf, size_t len) = connection_read;
     int bytes;
 
     if (client->refbuf && client->pos < client->refbuf->len)
@@ -135,9 +134,13 @@
         client->pos += remaining;
         return remaining;
     }
-    bytes = client->con->read (client->con, buf, len);
+#ifdef HAVE_OPENSSL
+    if (client->connection.ssl)
+        con_read = connection_read_ssl;
+#endif
+    bytes = con_read (&client->connection, buf, len);
 
-    if (bytes == -1 && client->con->error)
+    if (bytes == -1 && client->connection.error)
         DEBUG0 ("reading from connection has failed");
 
     return bytes;
@@ -146,6 +149,8 @@
 
 void client_send_302(client_t *client, const char *location)
 {
+    client_set_queue (client, NULL);
+    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
             "HTTP/1.0 302 Temporarily Moved\r\n"
             "Content-Type: text/html\r\n"
@@ -158,6 +163,8 @@
 
 
 void client_send_400(client_t *client, char *message) {
+    client_set_queue (client, NULL);
+    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
             "HTTP/1.0 400 Bad Request\r\n"
             "Content-Type: text/html\r\n\r\n"
@@ -175,7 +182,7 @@
     if (realm == NULL)
         realm = config->server_id;
 
-    refbuf_release (client->refbuf);
+    client_set_queue (client, NULL);
     client->refbuf = refbuf_new (500);
     snprintf (client->refbuf->data, 500,
             "HTTP/1.0 401 Authentication Required\r\n"
@@ -193,6 +200,8 @@
 {
     if (reason == NULL)
         reason = "Forbidden";
+    client_set_queue (client, NULL);
+    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
             "HTTP/1.0 403 %s\r\n"
             "Content-Type: text/html\r\n\r\n", reason);
@@ -228,13 +237,15 @@
                 "<b>%s</b>\r\n", message);
         client->respcode = 404;
         client->refbuf->len = strlen (client->refbuf->data);
+        fserve_setup_client (client, NULL);
     }
-    fserve_setup_client (client, NULL);
 }
 
 
 void client_send_416(client_t *client)
 {
+    client_set_queue (client, NULL);
+    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
             "HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
     client->respcode = 416;
@@ -246,42 +257,37 @@
 /* helper function for sending the data to a client */
 int client_send_bytes (client_t *client, const void *buf, unsigned len)
 {
-#ifdef HAVE_AIO
-    int ret, err;
-    struct aiocb *aiocbp = &client->aio;
+    int (*con_send)(struct connection_tag *handle, const void *buf, size_t len) = connection_send;
+    int ret;
+#ifdef HAVE_OPENSSL
+    if (client->connection.ssl)
+        con_send = connection_send_ssl;
+#endif
+    ret = con_send (&client->connection, buf, len);
 
-    if (client->pending_io == 0)
-    {
-        memset (aiocbp, 0 , sizeof (struct aiocb));
-        aiocbp->aio_fildes = client->con->sock;
-        aiocbp->aio_buf = (void*)buf; /* only read from */
-        aiocbp->aio_nbytes = len;
-
-        if (aio_write (aiocbp) < 0)
-            return -1;
-        client->pending_io = 1;
-    }
-    if ((err = aio_error (aiocbp)) == EINPROGRESS)
-        return -1;
-    ret = aio_return (aiocbp);
-    if (ret < 0)
-        sock_set_error (err); /* make sure errno gets set */
-
-    client->pending_io = 0;
-#else
-    int ret = client->con->send (client->con, buf, len);
-
-    if (client->con->error)
+    if (client->connection.error)
         DEBUG0 ("Client connection died");
 
     return ret;
-#endif
 }
 
 void client_set_queue (client_t *client, refbuf_t *refbuf)
 {
     refbuf_t *to_release = client->refbuf;
 
+    if (to_release && client->flags & CLIENT_HAS_INTRO_CONTENT)
+    {
+        refbuf_t *intro = to_release->next;
+        while (intro)
+        {
+            refbuf_t *r = intro->next;
+            intro->next = NULL;
+            refbuf_release (intro);
+            intro = r;
+        }
+        to_release->next = NULL;
+        client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
+    }
     client->refbuf = refbuf;
     if (refbuf)
         refbuf_addref (client->refbuf);
@@ -293,12 +299,11 @@
 
 worker_t *find_least_busy_handler (void)
 {
-    worker_t *handler, *min = NULL;
+    worker_t *min = workers;
 
-    if (workers)
+    if (workers && workers->next)
     {
-        min = workers;
-        handler = workers->next;
+        worker_t *handler = workers->next;
         DEBUG2 ("handler %p has %d clients", min, min->count);
         while (handler)
         {
@@ -320,13 +325,14 @@
     *this_worker->current_p = client->next_on_worker;
     this_worker->count--;
     thread_mutex_unlock (&this_worker->lock);
+    client->next_on_worker = NULL;
 
     thread_mutex_lock (&dest_worker->lock);
     if (dest_worker->running)
     {
         client->worker = dest_worker;
-        client->next_on_worker = dest_worker->clients;
-        dest_worker->clients = client;
+        *dest_worker->last_p = client;
+        dest_worker->last_p = &client->next_on_worker;
         dest_worker->count++;
         client->flags |= CLIENT_HAS_CHANGED_THREAD;
         // make client inactive so that the destination thread does not run it straight away
@@ -348,8 +354,8 @@
     thread_rwlock_unlock (&workers_lock);
 
     client->schedule_ms = handler->time_ms;
-    client->next_on_worker = handler->clients;
-    handler->clients = client;
+    *handler->last_p = client;
+    handler->last_p = &client->next_on_worker;
     client->worker = handler;
     ++handler->count;
     if (handler->wakeup_ms - handler->time_ms > 15)
@@ -405,6 +411,8 @@
                         client->flags &= ~CLIENT_HAS_CHANGED_THREAD;
                         client->flags |= CLIENT_ACTIVE;
                         client = *prevp;
+                        if (client == NULL)
+                            handler->last_p = prevp;
                         continue;
                     }
                     if (ret < 0)
@@ -417,6 +425,8 @@
                         if (client->ops->release)
                             client->ops->release (client);
                         client = *prevp;
+                        if (client == NULL)
+                            handler->last_p = prevp;
                         continue;
                     }
                 }
@@ -444,6 +454,7 @@
     thread_mutex_create (&handler->lock);
     thread_cond_create (&handler->cond);
     thread_rwlock_wlock (&workers_lock);
+    handler->last_p = &handler->clients;
     handler->next = workers;
     workers = handler;
     worker_count++;
@@ -454,7 +465,8 @@
 static void worker_stop (void)
 {
     worker_t *handler = workers;
-    client_t *clients = NULL;
+    client_t *clients = NULL, **last;
+    int count;
 
     if (handler == NULL)
         return;
@@ -471,8 +483,8 @@
     thread_sleep (10000);
     thread_mutex_lock (&handler->lock);
     clients = handler->clients;
-    handler->clients = NULL;
-    handler->count = 0;
+    last = handler->last_p;
+    count = handler->count;
     thread_cond_signal (&handler->cond);
     thread_mutex_unlock (&handler->lock);
     if (clients)
@@ -481,18 +493,9 @@
             WARN0 ("clients left unprocessed");
         else
         {
-            // move clients to another worker
-            client_t *endp  = clients;
-            int count = 0;
-
             thread_mutex_lock (&workers->lock);
-            while (endp->next_on_worker)
-            {
-                endp = endp->next_on_worker;
-                count++;
-            }
-            endp->next_on_worker = workers->clients;
-            workers->clients = clients;
+            *workers->last_p = clients;
+            workers->last_p = last;
             workers->count += count;
             thread_mutex_unlock (&workers->lock);
         }

Modified: icecast/branches/kh/icecast/src/client.h
===================================================================
--- icecast/branches/kh/icecast/src/client.h	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/client.h	2009-08-28 20:19:49 UTC (rev 16524)
@@ -35,7 +35,7 @@
     mutex_t lock;
     cond_t cond;
     client_t *clients;
-    client_t **current_p;
+    client_t **current_p, **last_p;
     thread_type *thread;
     struct timespec current_time;
     uint64_t time_ms;
@@ -69,8 +69,9 @@
 
     client_t *next_on_worker;
 
-    /* the client's connection */
-    connection_t *con;
+    /* the clients connection */
+    connection_t connection;
+
     /* the client's http headers */
     http_parser_t *parser;
 
@@ -116,7 +117,7 @@
     client_t *next;  /* for use with grouping similar clients */
 };
 
-client_t *client_create (connection_t *con, http_parser_t *parser);
+client_t *client_create (sock_t sock);
 void client_destroy(client_t *client);
 void client_send_504(client_t *client, char *message);
 void client_send_416(client_t *client);
@@ -142,6 +143,7 @@
 #define CLIENT_IS_SLAVE             (004)
 #define CLIENT_HAS_CHANGED_THREAD   (010)
 #define CLIENT_NO_CONTENT_LENGTH    (020)
+#define CLIENT_HAS_INTRO_CONTENT    (040)
 #define CLIENT_FORMAT_BIT           (01000)
 
 #endif  /* __CLIENT_H__ */

Modified: icecast/branches/kh/icecast/src/connection.c
===================================================================
--- icecast/branches/kh/icecast/src/connection.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/connection.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -31,6 +31,7 @@
 #ifndef _WIN32
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <netdb.h>
 #endif
 
 #include "compat.h"
@@ -96,7 +97,7 @@
 static time_t now;
 static spin_t _connection_lock;
 static volatile unsigned long _current_id = 0;
-static volatile thread_type *conn_tid;
+thread_type *conn_tid;
 
 static int ssl_ok;
 #ifdef HAVE_OPENSSL
@@ -249,7 +250,7 @@
 /* handlers for reading and writing a connection_t when there is ssl
  * configured on the listening port
  */
-static int connection_read_ssl (connection_t *con, void *buf, size_t len)
+int connection_read_ssl (connection_t *con, void *buf, size_t len)
 {
     int bytes = SSL_read (con->ssl, buf, len);
 
@@ -266,7 +267,7 @@
     return bytes;
 }
 
-static int connection_send_ssl (connection_t *con, const void *buf, size_t len)
+int connection_send_ssl (connection_t *con, const void *buf, size_t len)
 {
     int bytes = SSL_write (con->ssl, buf, len);
 
@@ -298,7 +299,7 @@
 /* handlers (default) for reading and writing a connection_t, no encrpytion
  * used just straight access to the socket
  */
-static int connection_read (connection_t *con, void *buf, size_t len)
+int connection_read (connection_t *con, void *buf, size_t len)
 {
     int bytes = sock_read_bytes (con->sock, buf, len);
     if (bytes == 0)
@@ -308,7 +309,7 @@
     return bytes;
 }
 
-static int connection_send (connection_t *con, const void *buf, size_t len)
+int connection_send (connection_t *con, const void *buf, size_t len)
 {
     int bytes = sock_write_bytes (con->sock, buf, len);
     if (bytes < 0)
@@ -416,30 +417,49 @@
 }
 
 
-connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
+int connection_init (connection_t *con, sock_t sock)
 {
-    connection_t *con;
-    con = (connection_t *)calloc(1, sizeof(connection_t));
     if (con)
     {
-        con->sock = sock;
-        con->serversock = serversock;
-        con->read = connection_read;
-        con->send = connection_send;
+        struct sockaddr_storage sa;
+        socklen_t slen = sizeof (sa);
+
         con->con_time = time(NULL);
         con->id = _next_connection_id();
-        con->ip = ip;
+        con->discon_time = con->con_time + header_timeout;
+        con->sock = sock;
+        if (sock == SOCK_ERROR)
+            return 0;
+        if (getpeername (sock, (struct sockaddr *)&sa, &slen) == 0)
+        {
+            char *ip;
+#ifdef HAVE_GETNAMEINFO
+            char buffer [200] = "unknown";
+            getnameinfo ((struct sockaddr *)&sa, slen, buffer, 200, NULL, 0, NI_NUMERICHOST);
+            ip = strdup (buffer);
+#else
+            int len = 30;
+            ip = malloc (len);
+            strncpy (ip, inet_ntoa (sa.sin_addr), len);
+#endif
+            if (accept_ip_address (ip))
+            {
+                con->ip = ip;
+                return 0;
+            }
+            free (ip);
+        }
+        memset (con, 0, sizeof (connection_t));
     }
-    return con;
+    return -1;
 }
 
+
 /* prepare connection for interacting over a SSL connection
  */
 void connection_uses_ssl (connection_t *con)
 {
 #ifdef HAVE_OPENSSL
-    con->read = connection_read_ssl;
-    con->send = connection_send_ssl;
     con->ssl = SSL_new (ssl_ctx);
     SSL_set_accept_state (con->ssl);
     SSL_set_fd (con->ssl, con->sock);
@@ -529,42 +549,57 @@
 #endif
 }
 
-static connection_t *_accept_connection(int duration)
+
+static client_t *accept_client (int duration)
 {
-    sock_t sock, serversock;
-    char *ip;
+    client_t *client;
+    sock_t sock, serversock = wait_for_serversock (duration);
 
-    serversock = wait_for_serversock (duration);
     if (serversock == SOCK_ERROR)
         return NULL;
 
-    now = time(NULL);
-    /* malloc enough room for a full IP address (including ipv6) */
-    ip = (char *)malloc(MAX_ADDR_LEN);
-
-    sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
-    if (sock != SOCK_ERROR)
+    sock = sock_accept (serversock, NULL, 0);
+    if (sock == SOCK_ERROR)
     {
-        connection_t *con = NULL;
-        /* Make any IPv4 mapped IPv6 address look like a normal IPv4 address */
-        if (strncmp (ip, "::ffff:", 7) == 0)
-            memmove (ip, ip+7, strlen (ip+7)+1);
-
-        if (accept_ip_address (ip))
-            con = connection_create (sock, serversock, ip);
-        if (con)
-            return con;
-        sock_close (sock);
+        if (sock_recoverable (sock_error()))
+            return NULL;
+        WARN2 ("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
+        thread_sleep (500000);
+        return NULL;
     }
-    else
+    global_lock ();
+    client = client_create (sock);
+    if (client)
     {
-        if (!sock_recoverable(sock_error()))
+        connection_t *con = &client->connection;
+        int i;
+        for (i=0; i < global.server_sockets; i++)
         {
-            WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
-            thread_sleep (500000);
+            if (global.serversock[i] == serversock)
+            {
+                client->server_conn = global.server_conn[i];
+                client->server_conn->refcount++;
+                if (client->server_conn->ssl && ssl_ok)
+                    connection_uses_ssl (con);
+                if (client->server_conn->shoutcast_compat)
+                    client->ops = &shoutcast_source_ops;
+                else
+                    client->ops = &http_request_ops;
+                break;
+            }
         }
+        global_unlock ();
+        stats_event_inc (NULL, "connections");
+        if (sock_set_blocking (con->sock, 0) || sock_set_nodelay (con->sock))
+        {
+            WARN0 ("failed to set tcp options on client connection, dropping");
+            client_destroy (client);
+            client = NULL;
+        }
+        return client;
     }
-    free (ip);
+    global_unlock ();
+    sock_close (sock);
     return NULL;
 }
 
@@ -577,7 +612,8 @@
 {
     do
     {
-        if (client->con->error || client->con->discon_time <= client->worker->current_time.tv_sec)
+        connection_t *con = &client->connection;
+        if (con->error || con->discon_time <= client->worker->current_time.tv_sec)
             break;
 
         if (client->shared_data)  /* need to get password first */
@@ -593,7 +629,7 @@
                 break;
 
             ret = client_read_bytes (client, buf, remaining);
-            if (ret == 0 || client->con->error)
+            if (ret == 0 || con->error)
                 break;
             if (ret < 0)
                 return 0;
@@ -650,7 +686,7 @@
     refbuf_t *refbuf = client->shared_data;
     int remaining = PER_CLIENT_REFBUF_SIZE - 1 - refbuf->len, ret = -1;
 
-    if (remaining && client->con->discon_time > client->worker->current_time.tv_sec)
+    if (remaining && client->connection.discon_time > client->worker->current_time.tv_sec)
     {
         char *buf = refbuf->data + refbuf->len;
 
@@ -701,7 +737,7 @@
             } while (0);
             client->refbuf = client->shared_data;
             client->shared_data = NULL;
-            client->con->discon_time = 0;
+            client->connection.discon_time = 0;
             client->parser = httpp_create_parser();
             httpp_initialize (client->parser, NULL);
             if (httpp_parse (client->parser, refbuf->data, refbuf->len))
@@ -751,7 +787,7 @@
             /* invalid http request */
             return -1;
         }
-        if (ret && client->con->error == 0)
+        if (ret && client->connection.error == 0)
         {
             client->schedule_ms = client->worker->time_ms + 100;
             return 0;
@@ -763,9 +799,8 @@
 }
 
 
-void *connection_thread (void *arg)
+static void *connection_thread (void *arg)
 {
-    connection_t *con;
     ice_config_t *config;
 
     connection_running = 1;
@@ -780,37 +815,9 @@
 
     while (connection_running)
     {
-        con = _accept_connection (333);
-
-        if (con)
+        client_t *client = accept_client (333);
+        if (client)
         {
-            client_t *client = NULL;
-            refbuf_t *r;
-
-            global_lock();
-            client = client_create (con, NULL);
-            global_unlock();
-
-            if (client->server_conn->ssl && ssl_ok)
-                connection_uses_ssl (client->con);
-
-            if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock))
-            {
-                WARN0 ("failed to set tcp options on client connection, dropping");
-                client_destroy (client);
-                continue;
-            }
-
-            if (client->server_conn->shoutcast_compat)
-                client->ops = &shoutcast_source_ops;
-            else
-                client->ops = &http_request_ops;
-            r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-            r->len = 0;
-            client->shared_data = r;
-            client->flags |= CLIENT_ACTIVE;
-            client->con->discon_time = time(NULL) + header_timeout;
-
             /* do a small delay here so the client has chance to send the request after
              * getting a connect. This also prevents excessively large number of new
              * listeners from joining at the same time */
@@ -840,11 +847,10 @@
 {
     if (conn_tid)
     {
-        thread_type *tid = (thread_type*)conn_tid;;
+        connection_running = 0;
+        INFO0("shutting down connection thread");
+        thread_join (conn_tid);
         conn_tid = NULL;
-        INFO0("shutting down connection thread");
-        connection_running = 0;
-        thread_join (tid);
     }
 }
 
@@ -1315,12 +1321,14 @@
 
 void connection_close(connection_t *con)
 {
-    sock_close(con->sock);
-    if (con->ip) free(con->ip);
-    if (con->host) free(con->host);
+    if (con->con_time)
+    {
+        sock_close(con->sock);
+        free(con->ip);
 #ifdef HAVE_OPENSSL
-    if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
+        if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
 #endif
-    free(con);
+        memset (con, 0, sizeof (connection_t));
+    }
 }
 

Modified: icecast/branches/kh/icecast/src/connection.h
===================================================================
--- icecast/branches/kh/icecast/src/connection.h	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/connection.h	2009-08-28 20:19:49 UTC (rev 16524)
@@ -21,9 +21,9 @@
 #endif
 
 struct source_tag;
+struct ice_config_tag;
 typedef struct connection_tag connection_t;
 
-#include "cfgfile.h"
 #include "compat.h"
 #include "httpp/httpp.h"
 #include "net/sock.h"
@@ -37,17 +37,13 @@
     uint64_t sent_bytes;
 
     sock_t sock;
-    sock_t serversock;
     int error;
 
 #ifdef HAVE_OPENSSL
     SSL *ssl;   /* SSL handler */
 #endif
-    int (*send)(struct connection_tag *handle, const void *buf, size_t len);
-    int (*read)(struct connection_tag *handle, void *buf, size_t len);
 
     char *ip;
-    char *host;
 };
 
 #ifdef HAVE_OPENSSL
@@ -61,9 +57,15 @@
 void connection_thread_shutdown();
 int  connection_setup_sockets (struct ice_config_tag *config);
 void connection_close(connection_t *con);
-connection_t *connection_create (sock_t sock, sock_t serversock, char *ip);
+int  connection_init (connection_t *con, sock_t sock);
 int connection_complete_source (struct source_tag *source, int response);
 void connection_uses_ssl (connection_t *con);
+#ifdef HAVE_OPENSSL
+int  connection_read_ssl (connection_t *con, void *buf, size_t len);
+int  connection_send_ssl (connection_t *con, const void *buf, size_t len);
+#endif
+int  connection_read (connection_t *con, void *buf, size_t len);
+int  connection_send (connection_t *con, const void *buf, size_t len);
 void connection_thread_shutdown_req (void);
 
 int connection_check_pass (http_parser_t *parser, const char *user, const char *pass);

Modified: icecast/branches/kh/icecast/src/format.c
===================================================================
--- icecast/branches/kh/icecast/src/format.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/format.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -96,12 +96,29 @@
 {
     refbuf_t *refbuf = client->refbuf;
 
-    if (intro == NULL)
+    if (refbuf == NULL)
         return -1;
     if (client->pos == refbuf->len)
     {
         size_t bytes;
 
+        if (client->flags & CLIENT_HAS_INTRO_CONTENT)
+        {
+            if (refbuf->next)
+            {
+                client->refbuf = refbuf->next;
+                refbuf->next = NULL;
+                refbuf_release (refbuf);
+                client->pos = 0;
+                return 0;
+            }
+            client_set_queue (client, NULL);
+            client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
+            client->intro_offset = client->connection.sent_bytes;
+            return -1;
+        }
+        if (intro == NULL)
+            return -1;
         if (fseek (intro, client->intro_offset, SEEK_SET) < 0)
             return -1;
         bytes = fread (refbuf->data, 1, PER_CLIENT_REFBUF_SIZE, intro);
@@ -123,7 +140,7 @@
     const char *buf = refbuf->data + client->pos;
     unsigned int len = refbuf->len - client->pos;
 
-    if (len > 4096) /* make sure we don't send huge amounts in one go */
+    if (len > 5000) /* make sure we don't send huge amounts in one go */
         len = 4096;
     ret = client_send_bytes (client, buf, len);
 
@@ -134,26 +151,24 @@
 }
 
 
-int format_prepare_headers (source_t *source, client_t *client)
+int format_general_headers (source_t *source, client_t *client)
 {
-    unsigned remaining;
-    char *ptr;
+    unsigned remaining = 4096 - client->refbuf->len;
+    char *ptr = client->refbuf->data + client->refbuf->len;
     int bytes;
     int bitrate_filtered = 0;
     avl_node *node;
     ice_config_t *config;
 
-    DEBUG0 ("processing listener headers");
-    remaining = client->refbuf->len;
-    ptr = client->refbuf->data;
-    client->respcode = 200;
+    if (client->respcode == 0)
+    {
+        bytes = snprintf (ptr, remaining, "HTTP/1.0 200 OK\r\n"
+                "Content-Type: %s\r\n", source->format->contenttype);
+        remaining -= bytes;
+        ptr += bytes;
+        client->respcode = 200;
+    }
 
-    bytes = snprintf (ptr, remaining, "HTTP/1.0 200 OK\r\n"
-            "Content-Type: %s\r\n", source->format->contenttype);
-
-    remaining -= bytes;
-    ptr += bytes;
-
     /* iterate through source http headers and send to client */
     avl_tree_rlock (source->parser->vars);
     node = avl_get_first (source->parser->vars);
@@ -227,10 +242,7 @@
     remaining -= bytes;
     ptr += bytes;
 
-    client->refbuf->len -= remaining;
-    if (source->format->create_client_data)
-        if (source->format->create_client_data (source, client) < 0)
-            bytes = -1;
-    return bytes;
+    client->refbuf->len = 4096 - remaining;
+    return 0;
 }
 

Modified: icecast/branches/kh/icecast/src/format.h
===================================================================
--- icecast/branches/kh/icecast/src/format.h	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/format.h	2009-08-28 20:19:49 UTC (rev 16524)
@@ -64,7 +64,7 @@
 int format_generic_write_to_client (client_t *client);
 
 int format_file_read (client_t *client, FILE *fp);
-int format_prepare_headers (struct source_tag *source, client_t *client);
+int format_general_headers (struct source_tag *source, client_t *client);
 
 void format_send_general_headers(format_plugin_t *format, 
         struct source_tag *source, client_t *client);

Modified: icecast/branches/kh/icecast/src/format_mp3.c
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/format_mp3.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -382,14 +382,14 @@
             {
                 client_mp3->metadata_offset += (ret - remaining);
                 client->flags |= CLIENT_IN_METADATA;
-                client->schedule_ms += 100;
+                client->schedule_ms += 300;
             }
             client_mp3->since_meta_block = 0;
             client->pos += remaining;
             client->queue_pos += remaining;
             return ret;
         }
-        client->schedule_ms += 100;
+        client->schedule_ms += 300;
         if (ret > 0)
         {
             client_mp3->since_meta_block += ret;
@@ -409,7 +409,7 @@
     }
     if (ret > 0)
         client_mp3->metadata_offset += ret;
-    client->schedule_ms += 100;
+    client->schedule_ms += 300;
     client->flags |= CLIENT_IN_METADATA;
 
     return ret > 0 ? ret : 0;
@@ -476,6 +476,8 @@
         ret = 0;
     } while (0);
 
+    if (ret < 0)
+        client->schedule_ms += 250;
     if (ret > 0)
         written += ret;
     return written == 0 ? -1 : written;
@@ -521,8 +523,6 @@
     {
         int read_in = source_mp3->queue_block_size - source_mp3->read_count;
         bytes = client_read_bytes (client, buf, read_in);
-        if (bytes < read_in)
-            client->schedule_ms = client->worker->time_ms + source->skip_duration;
         if (bytes < 0)
             return 0;
         rate_add (format->in_bitrate, bytes, client->worker->current_time.tv_sec);
@@ -695,9 +695,8 @@
     mp3_client_data *client_mp3 = calloc(1,sizeof(mp3_client_data));
     mp3_state *source_mp3 = source->format->_state;
     const char *metadata;
-    /* the +-2 is for overwriting the last set of \r\n */
-    size_t  remaining = 4096 - client->refbuf->len + 2;
-    char *ptr = client->refbuf->data + client->refbuf->len - 2;
+    size_t  remaining = 4096;
+    char *ptr = client->refbuf->data;
     int bytes;
     const char *useragent;
 
@@ -721,6 +720,15 @@
 
     client->format_data = client_mp3;
     client->free_client_data = free_mp3_client_data;
+    client->refbuf->len = 4096 - remaining;
+
+    if (format_general_headers (source, client) < 0)
+        return -1;
+
+    remaining = 4096 - client->refbuf->len + 2;
+    ptr = client->refbuf->data + client->refbuf->len - 2;
+
+    /* check for shoutcast style metadata inserts */
     metadata = httpp_getvar(client->parser, "icy-metadata");
     if (metadata && atoi(metadata))
     {

Modified: icecast/branches/kh/icecast/src/format_ogg.c
===================================================================
--- icecast/branches/kh/icecast/src/format_ogg.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/format_ogg.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -462,8 +462,6 @@
         data = ogg_sync_buffer (&ogg_info->oy, 4096);
 
         bytes = client_read_bytes (source->client, data, 4096);
-        if (bytes < 4096)
-            source->client->schedule_ms = source->client->worker->time_ms + source->skip_duration;
         if (bytes <= 0)
         {
             ogg_sync_wrote (&ogg_info->oy, 0);
@@ -486,7 +484,7 @@
         client_data->headers_sent = 1;
         client->format_data = client_data;
         client->free_client_data = free_ogg_client_data;
-        ret = 0;
+        ret = format_general_headers (source, client);
     }
     return ret;
 }
@@ -575,8 +573,11 @@
         ret = 0;
     } while (0);
 
-    if (ret > 0)
-       written += ret;
+    if (ret > 0) /* short write */
+    {
+        client->schedule_ms += 250;
+        written += ret;
+    }
     return written;
 }
 

Modified: icecast/branches/kh/icecast/src/fserve.c
===================================================================
--- icecast/branches/kh/icecast/src/fserve.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/fserve.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -109,11 +109,11 @@
         avl_tree_free (mimetypes, _delete_mapping);
     if (fh_cache)
     {
-        int count = 100;
+        int count = 20;
         while (fh_cache->length && count)
         {
             DEBUG1 ("waiting for %u entries to clear", fh_cache->length);
-            thread_sleep (20000);
+            thread_sleep (100000);
             count--;
         }
         avl_tree_free (fh_cache, _delete_fh);
@@ -189,7 +189,11 @@
     fh_node *fh = mapping;
     if (fh->refcount)
         WARN2 ("handle for %s has refcount %d", fh->finfo.mount, fh->refcount);
-    thread_mutex_destroy (&fh->lock);
+    else
+    {
+        thread_mutex_unlock (&fh->lock);
+        thread_mutex_destroy (&fh->lock);
+    }
     if (fh->fp)
         fclose (fh->fp);
     free (fh->finfo.mount);
@@ -571,13 +575,13 @@
 static int prefile_send (client_t *client)
 {
     refbuf_t *refbuf = client->refbuf;
-    int loop = 3, bytes;
+    int loop = 6, bytes, written = 0;
 
     while (loop)
     {
         fh_node *fh = client->shared_data;
         loop--;
-        if (fserve_running == 0 || client->con->error)
+        if (fserve_running == 0 || client->connection.error)
             return -1;
         if (refbuf == NULL || client->pos == refbuf->len)
         {
@@ -620,14 +624,15 @@
         bytes = format_generic_write_to_client (client);
         if (bytes < 0)
         {
-            client->schedule_ms = client->worker->time_ms + 150;
+            client->schedule_ms = client->worker->time_ms + 300;
             return 0;
         }
+        written += bytes;
         global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
-        if (bytes < 4096)
+        if (written > 30000)
             break;
     }
-    client->schedule_ms = client->worker->time_ms + (loop ? 50 : 15);
+    client->schedule_ms = client->worker->time_ms + 150;
     return 0;
 }
 
@@ -635,15 +640,16 @@
 static int file_send (client_t *client)
 {
     refbuf_t *refbuf = client->refbuf;
-    int loop = 5, bytes;
+    int loop = 6, bytes, written = 0;
     fh_node *fh = client->shared_data;
 
-    if (client->con->discon_time && client->worker->current_time.tv_sec >= client->con->discon_time)
+    if (client->connection.discon_time &&
+            client->worker->current_time.tv_sec >= client->connection.discon_time)
         return -1;
     while (loop)
     {
         loop--;
-        if (fserve_running == 0 || client->con->error)
+        if (fserve_running == 0 || client->connection.error)
             return -1;
         if (fh->finfo.limit)
         {
@@ -681,12 +687,18 @@
         }
         bytes = client->check_buffer (client);
         if (bytes < 0)
+        {
+            client->schedule_ms = client->worker->time_ms + 300;
             return 0;
+        }
+        written += bytes;
         global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
         if (fh->finfo.limit)
             rate_add (client->out_bitrate, bytes, client->worker->time_ms);
+        if (written > 30000)
+            break;
     }
-    client->schedule_ms = client->worker->time_ms + 10;
+    client->schedule_ms = client->worker->time_ms + 150;
     return 1;
 }
 
@@ -709,8 +721,17 @@
     {
         client->check_buffer = format_generic_write_to_client;
     }
+    client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
     client->intro_offset = 0;
-    client->flags |= CLIENT_ACTIVE;
+    if (client->flags & CLIENT_ACTIVE)
+        client->schedule_ms = client->worker->time_ms;
+    else
+    {
+        client->flags |= CLIENT_ACTIVE;
+        thread_mutex_lock (&client->worker->lock);
+        thread_cond_signal (&client->worker->cond);
+        thread_mutex_unlock (&client->worker->lock);
+    }
 }
 
 

Modified: icecast/branches/kh/icecast/src/global.c
===================================================================
--- icecast/branches/kh/icecast/src/global.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/global.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -78,7 +78,7 @@
 void global_reduce_bitrate_sampling (struct rate_calc *rate)
 {
     thread_spin_lock (&global.spinlock);
-    rate_reduce (rate, 0);
+    rate_reduce (rate, 2);
     thread_spin_unlock (&global.spinlock);
 }
 

Modified: icecast/branches/kh/icecast/src/logging.c
===================================================================
--- icecast/branches/kh/icecast/src/logging.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/logging.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -134,7 +134,7 @@
             httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL),
             httpp_getvar (client->parser, HTTPP_VAR_VERSION));
 
-    stayed = now - client->con->con_time;
+    stayed = now - client->connection.con_time;
 
     if (client->username == NULL)
         username = "-"; 
@@ -151,12 +151,12 @@
 
     config = config_get_config();
     if (config->access_log.log_ip)
-        ip = client->con->ip;
+        ip = client->connection.ip;
     config_release_config ();
     log_write_direct (accesslog,
             "%s - %s [%s] \"%s\" %d %" PRIu64 " \"%s\" \"%s\" %lu",
             ip, username,
-            datebuf, reqbuf, client->respcode, client->con->sent_bytes,
+            datebuf, reqbuf, client->respcode, client->connection.sent_bytes,
             referrer, user_agent, (unsigned long)stayed);
 }
 

Modified: icecast/branches/kh/icecast/src/main.c
===================================================================
--- icecast/branches/kh/icecast/src/main.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/main.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -14,16 +14,18 @@
 #ifdef HAVE_CONFIG_H
 #include <config.h>
 #endif
-
-#ifdef WIN32_SERVICE
-#define _WIN32_WINNT 0x0400
-#include <windows.h>
-#endif
 
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 
+#ifdef WIN32
+#define _WIN32_WINNT 0x0400
+/* For getpid() */
+#include <process.h>
+#include <windows.h>
+#endif
+
 #ifdef HAVE_UNISTD_H
 # include <unistd.h>
 #endif
@@ -61,11 +63,6 @@
 
 #include <libxml/xmlmemory.h>
 
-#ifdef _WIN32
-/* For getpid() */
-#include <process.h>
-#endif
-
 #undef CATMODULE
 #define CATMODULE "main"
 
@@ -113,7 +110,6 @@
     refbuf_initialize();
 
     stats_initialize();
-    fserve_initialize();
     xslt_initialize();
 #ifdef HAVE_CURL_GLOBAL_INIT
     curl_global_init (CURL_GLOBAL_ALL);
@@ -122,15 +118,15 @@
 
 void _shutdown_subsystems(void)
 {
-    refbuf_shutdown();
+    fserve_shutdown();
     slave_shutdown();
     auth_shutdown();
     yp_shutdown();
     stats_shutdown();
 
-    fserve_shutdown();
     connection_shutdown();
     config_shutdown();
+    refbuf_shutdown();
     resolver_shutdown();
     sock_shutdown();
 
@@ -316,7 +312,6 @@
     }
     slave_initialize();
 
-    connection_thread_shutdown();
     connection_setup_sockets (NULL);
 }
 
@@ -459,6 +454,7 @@
     }
 
     _ch_root_uid_setup(); /* Change user id and root if requested/possible */
+    fserve_initialize();
 
 #ifdef CHUID 
     /* We'll only have getuid() if we also have setuid(), it's reasonable to

Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/slave.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -297,7 +297,7 @@
     char *server_id = NULL;
     ice_config_t *config;
     http_parser_t *parser = NULL;
-    connection_t *con=NULL;
+    connection_t *con = &client->connection;
     char *server = strdup (master->ip);
     char *mount = strdup (master->mount);
     int port = master->port;
@@ -346,7 +346,7 @@
             WARN3 ("Failed to connect to %s:%d for %s", server, port, relay->localmount);
             break;
         }
-        con = connection_create (streamsock, SOCK_ERROR, strdup (server));
+        connection_init (con, streamsock);
 
         /* At this point we may not know if we are relaying an mp3 or vorbis
          * stream, but only send the icy-metadata header if the relay details
@@ -405,7 +405,6 @@
             strncpy (server, uri, len);
             connection_close (con);
             httpp_destroy (parser);
-            con = NULL;
             parser = NULL;
         }
         else
@@ -417,7 +416,6 @@
                 break;
             }
             sock_set_blocking (streamsock, 0);
-            client->con = con;
             client->parser = parser;
             client_set_queue (client, NULL);
             free (server);
@@ -434,8 +432,7 @@
     free (mount);
     free (server_id);
     free (auth_header);
-    if (con)
-        connection_close (con);
+    connection_close (con);
     if (parser)
         httpp_destroy (parser);
     return -1;
@@ -495,9 +492,7 @@
     else
         yp_remove (relay->localmount);
 
-    if (client->con)
-        connection_close (client->con);
-    client->con = NULL;
+    connection_close (&client->connection);
     if (client->parser)
         httpp_destroy (client->parser);
     client->parser = NULL;
@@ -551,6 +546,7 @@
     INFO2 ("listener count still on %s is %d", src->mount, src->listeners);
     source_clear_listeners (src);
     source_clear_source (src);
+    thread_mutex_unlock (&src->lock);
     relay->start = client->worker->current_time.tv_sec + relay->interval;
     client->schedule_ms = timing_get_time() + 1000;
     client->flags |= CLIENT_ACTIVE;
@@ -595,7 +591,7 @@
     {
         client_t *client;
         global_lock();
-        client = client_create (NULL, NULL);
+        client = client_create (SOCK_ERROR);
         global_unlock();
         source->client = client;
         client->shared_data = relay;
@@ -1254,13 +1250,12 @@
         client->ops = &relay_startup_ops;
         relay->running = 0;
         global_reduce_bitrate_sampling (global.out_bitrate);
-        if (client->con)
-            connection_close (client->con);
-        client->con = NULL;
+        connection_close (&client->connection);
         if (client->parser)
             httpp_destroy (client->parser);
         client->parser = NULL;
         source_clear_source (source);
+        thread_mutex_unlock (&source->lock);
         thread_rwlock_unlock (&global.shutdown_lock);
         slave_update_all_mounts();
         return 0;
@@ -1299,6 +1294,8 @@
 
     if (relay->cleanup)
         return -1;
+    if (global.running != ICE_RUNNING)
+        return 0; /* wait for cleanup */
     if (relay->enable == 0 || relay->start > client->worker->current_time.tv_sec)
     {
         client->schedule_ms = client->worker->time_ms + 1000;

Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/source.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -61,7 +61,6 @@
 
 
 /* avl tree helper */
-static int  _compare_clients(void *compare_arg, void *a, void *b);
 static void _parse_audio_info (source_t *source, const char *s);
 static void source_client_release (client_t *client);
 static void source_listener_release (client_t *client);
@@ -75,6 +74,7 @@
 static int  locate_start_on_queue (source_t *source, client_t *client);
 static void listener_change_worker (client_t *client, source_t *source);
 static void source_change_worker (source_t *source);
+static int  source_client_callback (client_t *client);
 
 #ifdef _WIN32
 #define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
@@ -247,6 +247,8 @@
         client = source->client_list;
         source->client_list = client->next;
         client->next = NULL;
+        client->shared_data = NULL;
+        client_set_queue (client, NULL);
         /* do not count listeners who have joined but haven't done any processing */
         if (client->respcode == 200)
             i++;
@@ -256,7 +258,6 @@
     if (i)
     {
         stats_event_sub (NULL, "listeners", i);
-        stats_event_sub (source->mount, "listeners", i);
     }
     source->listeners = 0;
     source->prev_listeners = 0;
@@ -323,7 +324,6 @@
     }
 
     source->flags &= ~SOURCE_ON_DEMAND_REQ;
-    thread_mutex_unlock (&source->lock);
 }
 
 
@@ -342,6 +342,7 @@
     if (source->client_list)
         WARN1("active listeners on mountpoint %s", source->mount);
 
+    thread_mutex_unlock (&source->lock);
     thread_mutex_destroy (&source->lock);
 
     INFO1 ("freeing source \"%s\"", source->mount);
@@ -363,16 +364,12 @@
 
 client_t *source_find_client(source_t *source, int id)
 {
-    client_t fakeclient, *client = NULL;
-    connection_t fakecon;
+    client_t *client = NULL;
 
-    fakeclient.con = &fakecon;
-    fakeclient.con->id = id;
-
     client = source->client_list;
     while (client)
     {
-        if (_compare_clients (NULL, client, &fakeclient) == 0)
+        if (client->connection.id == id)
             break;
         client = client->next;
     }
@@ -403,7 +400,7 @@
     {
         worker_t *worker = source->client->worker;
         stats_event_args (source->mount, "connected", "%"PRIu64,
-                (uint64_t)(worker->current_time.tv_sec - source->client->con->con_time));
+                (uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time));
     }
     stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
     stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
@@ -430,8 +427,7 @@
         source->flags &= ~SOURCE_RUNNING;
     do
     {
-        if (source->flags & SOURCE_TEMPORARY_FALLBACK && source->fallback.mount &&
-                source->termination_count == 0)
+        if (source->fallback.mount && source->termination_count == 0)
         {
             DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
             free (source->fallback.mount);
@@ -467,7 +463,7 @@
                 break;
             }
         }
-        fds = util_timed_wait_for_fd (client->con->sock, 0);
+        fds = util_timed_wait_for_fd (client->connection.sock, 0);
         if (fds < 0)
         {
             if (! sock_recoverable (sock_error()))
@@ -491,10 +487,10 @@
             if (source->skip_duration < 60)
                 source->skip_duration = 80;
             else
-                source->skip_duration = (long)(source->skip_duration *1.3);
+                source->skip_duration = (long)(source->skip_duration *1.8);
             break;
         }
-        source->skip_duration = (long)(source->skip_duration * 0.5);
+        source->skip_duration = (long)(source->skip_duration * 0.9);
 
         skip = 0;
         source->last_read = current;
@@ -544,21 +540,20 @@
                 /* save stream to file */
                 if (source->dumpfile && source->format->write_buf_to_file)
                     source->format->write_buf_to_file (source, refbuf);
+                client->schedule_ms = client->worker->time_ms + 5;
             }
             else
             {
-                if (client->con->error)
+                skip = 1;
+                if (client->connection.error)
                 {
                     INFO1 ("End of Stream %s", source->mount);
                     source->flags &= ~SOURCE_RUNNING;
-                    skip = 1;
                 }
                 break;
             }
             loop--;
-        } while (loop);
-        if (loop == 0)
-            client->schedule_ms += 20;
+        } while (0);
 
         /* lets see if we have too much data in the queue */
         while (source->queue_size > source->queue_size_limit ||
@@ -618,14 +613,20 @@
     refbuf_t *refbuf;
 
     if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
+    {
+        client->schedule_ms += 200;
         return -1;
+    }
     refbuf = client->refbuf;
 
     /* move to the next buffer if we have finished with the current one */
     if (client->pos == refbuf->len)
     {
         if (refbuf->next == NULL)
+        {
+            client->schedule_ms = source->client->schedule_ms + 20;
             return -1;
+        }
         client_set_queue (client, refbuf->next);
     }
     return source->format->write_buf_to_client (client);
@@ -695,18 +696,19 @@
     }
     if (format_file_read (client, source->intro_file) < 0)
     {
+        client->schedule_ms = client->worker->time_ms + 1000;
         if (source->stream_data_tail)
         {
             /* better find the right place in queue for this client */
             client_set_queue (client, NULL);
             client->check_buffer = source_queue_advance;
+            if (client->connection.sent_bytes == 0) // no intro
+                client->schedule_ms = client->worker->time_ms;
             return 0;
         }
         client->intro_offset = 0;  /* replay intro file */
-        client->schedule_ms = client->worker->time_ms + 100;
         return -1;
     }
-    client->schedule_ms = client->worker->time_ms + 20;
     return source->format->write_buf_to_client (client);
 }
 
@@ -724,17 +726,22 @@
 
     if (client->respcode == 0)
     {
+        int (*build_headers)(source_t *, client_t *) = format_general_headers;
+
         if (source_running (source) == 0)
         {
             client->schedule_ms = client->worker->time_ms + 200;
             return 0;
         }
-        if (format_prepare_headers (source, client) < 0)
+        if (source->format->create_client_data)
+            build_headers = source->format->create_client_data;
+
+        refbuf->len = 0;
+        if (build_headers (source, client) < 0)
         {
             ERROR0 ("internal problem, dropping client");
             return -1;
         }
-        client->respcode = 200;
         stats_event_inc (NULL, "listeners");
         stats_event_inc (NULL, "listener_connections");
         stats_event_inc (source->mount, "listener_connections");
@@ -743,11 +750,18 @@
     {
         client->check_buffer = http_source_intro;
         client->intro_offset = 0;
-        client->pos = refbuf->len = 4096;
-        client->con->sent_bytes = 0;
-        return -1;
+        if (client->flags & CLIENT_HAS_INTRO_CONTENT)
+        {
+            client->refbuf = refbuf->next;
+            refbuf->next = NULL;
+            refbuf_release (refbuf);
+            client->pos = 0;
+        }
+        else
+            client->pos = refbuf->len = 4096;
+        client->connection.sent_bytes = 0;
+        return 0;
     }
-    client->schedule_ms = client->worker->time_ms + 10;
     return format_generic_write_to_client (client);
 }
 
@@ -757,12 +771,12 @@
 static int send_to_listener (client_t *client)
 {
     int bytes;
-    int loop = 8;   /* max number of iterations in one go */
+    int loop = 6;   /* max number of iterations in one go */
     long total_written = 0;
     int ret = 0;
     source_t *source = client->shared_data;
 
-    if (client->con->error)
+    if (client->connection.error || source == NULL)
         return -1;
     if (source->fallback.mount)
     {
@@ -779,7 +793,8 @@
             *pnext = client->next;
             if (client->check_buffer != http_source_listener)
             {
-                client_set_queue (client, NULL);
+                if ((client->flags & CLIENT_HAS_INTRO_CONTENT) == 0)
+                    client_set_queue (client, NULL);
                 client->check_buffer = source->format->write_buf_to_client;
             }
             thread_mutex_unlock (&source->lock);
@@ -802,10 +817,10 @@
         return -1;
     }
     /* check for limited listener time */
-    if (client->con->discon_time &&
-            client->worker->current_time.tv_sec >= client->con->discon_time)
+    if (client->connection.discon_time &&
+            client->worker->current_time.tv_sec >= client->connection.discon_time)
     {
-        INFO1 ("time limit reached for client #%lu", client->con->id);
+        INFO1 ("time limit reached for client #%lu", client->connection.id);
         return -1;
     }
     if (source_running (source) == 0)
@@ -824,7 +839,7 @@
     while (loop)
     {
         /* jump out if client connection has died */
-        if (client->con->error)
+        if (client->connection.error)
         {
             ret = -1;
             break;
@@ -832,17 +847,20 @@
         /* lets not send too much to one client in one go, but don't
            sleep for too long if more data can be sent */
         if (total_written > source->listener_send_trigger)
+        {
+            client->schedule_ms = client->worker->time_ms;
             break;
+        }
         bytes = client->check_buffer (client);
         if (bytes < 0)
             break;  /* can't write any more */
 
-        client->schedule_ms += 5;
+        client->schedule_ms += 100;
         total_written += bytes;
         loop--;
     }
     if (loop == 0)
-        client->schedule_ms -= 20;
+        client->schedule_ms -= 500;
     if (total_written)
     {
         rate_add (source->format->out_bitrate, total_written, client->worker->time_ms);
@@ -855,7 +873,7 @@
     if (client->refbuf && (client->refbuf->flags & SOURCE_BLOCK_RELEASE))
     {
         INFO2 ("Client %lu (%s) has fallen too far behind, removing",
-                client->con->id, client->con->ip);
+                client->connection.id, client->connection.ip);
         stats_event_inc (source->mount, "slow_listeners");
         client_set_queue (client, NULL);
         ret = -1;
@@ -893,7 +911,7 @@
     stats_event_hidden (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
     stats_event_hidden (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
     stats_event_hidden (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
-    stats_event (source->mount, "source_ip", source->client->con->ip);
+    stats_event (source->mount, "source_ip", source->client->connection.ip);
 
     source->last_read = time(NULL);
     source->prev_listeners = -1;
@@ -955,8 +973,17 @@
     source = source_find_mount (mount);
     if (source)
     {
-        source->fallback.limit = 0;
-        source->fallback.mount = strdup (dest);
+        if (strcmp (source->mount, dest) != 0)
+        {
+            thread_mutex_lock (&source->lock);
+            if (source->listeners && source->fallback.mount == NULL)
+            {
+                source->fallback.limit = 0;
+                source->fallback.mount = strdup (dest);
+                source->termination_count = source->listeners;
+            }
+            thread_mutex_unlock (&source->lock);
+        }
     }
     else
         fserve_set_override (mount, dest);
@@ -1002,21 +1029,6 @@
 }
 
 
-static int _compare_clients(void *compare_arg, void *a, void *b)
-{
-    client_t *clienta = (client_t *)a;
-    client_t *clientb = (client_t *)b;
-
-    connection_t *cona = clienta->con;
-    connection_t *conb = clientb->con;
-
-    if (cona->id < conb->id) return -1;
-    if (cona->id > conb->id) return 1;
-
-    return 0;
-}
-
-
 static void _parse_audio_info (source_t *source, const char *s)
 {
     const char *start = s;
@@ -1330,13 +1342,14 @@
 }
 
 
-int source_client_callback (client_t *client, void *arg)
+static int source_client_callback (client_t *client)
 {
     const char *agent;
-    source_t *source = arg;
+    source_t *source = client->shared_data;
 
-    if (client->con->error) /* did http response fail? */
+    if (client->connection.error) /* did http response fail? */
     {
+        thread_mutex_unlock (&source->lock);
         global_lock();
         global.sources--;
         global_unlock();
@@ -1352,7 +1365,6 @@
 
     source_init (source);
     client->ops = &source_client_ops;
-    client->shared_data = source;
     return 0;
 }
 
@@ -1474,12 +1486,12 @@
     existing = source->client_list;
     while (existing)
     {
-        if (existing->con->error == 0 && existing->username &&
+        if (existing->connection.error == 0 && existing->username &&
                 strcmp (existing->username, client->username) == 0)
         {
             if (auth->drop_existing_listener)
             {
-                existing->con->error = 1;
+                existing->connection.error = 1;
                 return 1;
             }
             else
@@ -1500,9 +1512,9 @@
     int ret = -1;
 
     client->schedule_ms = client->worker->time_ms + 100;
-    if (client->con->discon_time)
+    if (client->connection.discon_time)
     {
-        if (client->con->discon_time >= client->worker->current_time.tv_sec)
+        if (client->connection.discon_time >= client->worker->current_time.tv_sec)
             return 0;
         else
             return -1;
@@ -1513,7 +1525,7 @@
     if (source->wait_time)
     {
         /* set a wait time for leaving the source reserved */
-        client->con->discon_time = client->worker->current_time.tv_sec + source->wait_time;
+        client->connection.discon_time = client->worker->current_time.tv_sec + source->wait_time;
         INFO2 ("keeping %s reserved for %d seconds", source->mount, source->wait_time);
         ret = 0;
     }
@@ -1532,7 +1544,7 @@
     thread_mutex_lock (&source->lock);
     /* log bytes read in access log */
     if (source->format)
-        client->con->sent_bytes = source->format->read_bytes;
+        client->connection.sent_bytes = source->format->read_bytes;
     thread_mutex_unlock (&source->lock);
 
     client_destroy (client);
@@ -1551,6 +1563,11 @@
     client_t **pnext;
     int value;
 
+    if (source == NULL)
+    {
+        client_destroy (client);
+        return;
+    }
     if (source_running (source) == 0)
         return;
 
@@ -1621,6 +1638,7 @@
         {
             if (source->client == NULL && (source->flags & SOURCE_ON_DEMAND) == 0)
             {
+                thread_mutex_unlock (&source->lock);
                 client_send_403 (client, "Slave relay reading from time unregulated stream");
                 return -1;
             }
@@ -1657,8 +1675,8 @@
         }
 
         /* set a per-mount disconnect time if auth hasn't set one already */
-        if (mountinfo->max_listener_duration && client->con->discon_time == 0)
-            client->con->discon_time = time(NULL) + mountinfo->max_listener_duration;
+        if (mountinfo->max_listener_duration && client->connection.discon_time == 0)
+            client->connection.discon_time = time(NULL) + mountinfo->max_listener_duration;
 
         INFO3 ("max on %s is %ld (cur %lu)", source->mount,
                 mountinfo->max_listeners, source->listeners);
@@ -1698,7 +1716,7 @@
         return -1;
 
     } while (1);
-    client->con->sent_bytes = 0;
+    client->connection.sent_bytes = 0;
 
     client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
     memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
@@ -1731,6 +1749,7 @@
 static int source_client_http_send (client_t *client)
 {
     refbuf_t *stream;
+    source_t *source = client->shared_data;
 
     if (client->pos < client->refbuf->len)
     {
@@ -1744,7 +1763,8 @@
     client->refbuf = stream;
     client->pos = client->intro_offset;
     client->intro_offset = 0;
-    return source_client_callback (client, client->shared_data);
+    thread_mutex_lock (&source->lock);
+    return source_client_callback (client);
 }
 
 
@@ -1770,7 +1790,7 @@
         if (client->server_conn && client->server_conn->shoutcast_compat)
         {
             source->flags |= SOURCE_SHOUTCAST_COMPAT;
-            source_client_callback (client, source);
+            source_client_callback (client);
         }
         else
         {

Modified: icecast/branches/kh/icecast/src/source.h
===================================================================
--- icecast/branches/kh/icecast/src/source.h	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/source.h	2009-08-28 20:19:49 UTC (rev 16524)
@@ -86,13 +86,12 @@
 #define SOURCE_TERMINATING          020
 #define SOURCE_TEMPORARY_FALLBACK   040
 
-#define source_available(x)     ((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND))
+#define source_available(x)     (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && (x)->fallback.mount == NULL)
 #define source_running(x)       ((x)->flags & SOURCE_RUNNING)
 
 source_t *source_reserve (const char *mount);
 void *source_client_thread (void *arg);
 int  source_startup (client_t *client, const char *uri);
-int  source_client_callback (client_t *client, void *source);
 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo);
 void source_clear_listeners (source_t *source);
 void source_clear_source (source_t *source);

Modified: icecast/branches/kh/icecast/src/stats.c
===================================================================
--- icecast/branches/kh/icecast/src/stats.c	2009-08-28 15:55:13 UTC (rev 16523)
+++ icecast/branches/kh/icecast/src/stats.c	2009-08-28 20:19:49 UTC (rev 16524)
@@ -585,7 +585,7 @@
     int ret = 0;
     event_listener_t *listener = client->shared_data;
 
-    if (client->con->error)
+    if (client->connection.error)
         return -1;
     if (client->flags & STATS_LARGE)
         loop = 4;



More information about the commits mailing list