[xiph-commits] r9733 - icecast/trunk/icecast/src

karl at svn.xiph.org karl at svn.xiph.org
Thu Aug 11 16:30:03 PDT 2005


Author: karl
Date: 2005-08-11 16:29:58 -0700 (Thu, 11 Aug 2005)
New Revision: 9733

Modified:
   icecast/trunk/icecast/src/client.c
   icecast/trunk/icecast/src/client.h
   icecast/trunk/icecast/src/connection.c
   icecast/trunk/icecast/src/connection.h
   icecast/trunk/icecast/src/fserve.c
   icecast/trunk/icecast/src/slave.c
   icecast/trunk/icecast/src/source.c
Log:
drop the thread pool of connection threads, they were using a blocking socket
on incoming connections. Now we get the accept thread to create a client_t
and mark it as a shoutcast client if need be.  Then use a single connection
thread to poll the non-blocking sockets for the headers. When complete they
get handled as usual.


Modified: icecast/trunk/icecast/src/client.c
===================================================================
--- icecast/trunk/icecast/src/client.c	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/client.c	2005-08-11 23:29:58 UTC (rev 9733)
@@ -39,33 +39,34 @@
 #undef CATMODULE
 #define CATMODULE "client"
 
-client_t *client_create(connection_t *con, http_parser_t *parser)
+/* should be called with global lock held */
+int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
 {
-    ice_config_t *config = config_get_config ();
+    ice_config_t *config;
     client_t *client = (client_t *)calloc(1, sizeof(client_t));
-    int client_limit = config->client_limit;
+    int ret = -1;
+
+    if (client == NULL)
+        return -1;
+
+    config = config_get_config ();
+
+    global.clients++;
+    if (config->client_limit < global.clients)
+        WARN2 ("server client limit reached (%d/%d)", config->client_limit, global.clients);
+    else
+        ret = 0;
+
     config_release_config ();
 
-    global_lock();
-    if (global.clients >= client_limit || client == NULL)
-    {
-        client_limit = global.clients;
-        global_unlock();
-        free (client);
-        WARN1 ("server client limit reached (%d clients)", client_limit);
-        return NULL;
-    }
-    global.clients++;
     stats_event_args (NULL, "clients", "%d", global.clients);
-    global_unlock();
-
     client->con = con;
     client->parser = parser;
-    client->refbuf = NULL;
     client->pos = 0;
     client->write_to_client = format_generic_write_to_client;
+    *c_ptr = client;
 
-    return client;
+    return ret;
 }
 
 void client_destroy(client_t *client)
@@ -110,7 +111,23 @@
 /* helper function for reading data from a client */
 int client_read_bytes (client_t *client, void *buf, unsigned len)
 {
-    int bytes = sock_read_bytes (client->con->sock, buf, len);
+    int bytes;
+    
+    if (client->refbuf && client->refbuf->len)
+    {
+        /* we have data to read from a refbuf first */
+        if (client->refbuf->len < len)
+            len = client->refbuf->len;
+        memcpy (buf, client->refbuf->data, len);
+        if (client->refbuf->len < len)
+        {
+            char *ptr = client->refbuf->data;
+            memmove (ptr, ptr+len, client->refbuf->len - len);
+        }
+        client->refbuf->len -= len;
+        return len;
+    }
+    bytes = sock_read_bytes (client->con->sock, buf, len);
     if (bytes > 0)
         return bytes;
 

Modified: icecast/trunk/icecast/src/client.h
===================================================================
--- icecast/trunk/icecast/src/client.h	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/client.h	2005-08-11 23:29:58 UTC (rev 9733)
@@ -67,7 +67,7 @@
 
 } client_t;
 
-client_t *client_create(connection_t *con, http_parser_t *parser);
+int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser);
 void client_destroy(client_t *client);
 void client_send_504(client_t *client, char *message);
 void client_send_404(client_t *client, char *message);

Modified: icecast/trunk/icecast/src/connection.c
===================================================================
--- icecast/trunk/icecast/src/connection.c	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/connection.c	2005-08-11 23:29:58 UTC (rev 9733)
@@ -80,10 +80,13 @@
 #define SHOUTCAST_SOURCE_AUTH 1
 #define ICECAST_SOURCE_AUTH 0
 
-typedef struct con_queue_tag {
-    connection_t *con;
-    struct con_queue_tag *next;
-} con_queue_t;
+typedef struct client_queue_tag {
+    client_t *client;
+    int offset;
+    int stream_offset;
+    int shoutcast;
+    struct client_queue_tag *next;
+} client_queue_t;
 
 typedef struct _thread_queue_tag {
     thread_type *thread_id;
@@ -93,12 +96,13 @@
 static mutex_t _connection_mutex;
 static volatile unsigned long _current_id = 0;
 static int _initialized = 0;
+static thread_type *tid;
 
-volatile static con_queue_t *_queue = NULL;
-static mutex_t _queue_mutex;
+static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
+static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
+static mutex_t _con_queue_mutex;
+static mutex_t _req_queue_mutex;
 
-static thread_queue_t *_conhands = NULL;
-
 rwlock_t _source_shutdown_rwlock;
 
 static void *_handle_connection(void *arg);
@@ -108,7 +112,8 @@
     if (_initialized) return;
     
     thread_mutex_create(&_connection_mutex);
-    thread_mutex_create(&_queue_mutex);
+    thread_mutex_create(&_con_queue_mutex);
+    thread_mutex_create(&_req_queue_mutex);
     thread_mutex_create(&move_clients_mutex);
     thread_rwlock_create(&_source_shutdown_rwlock);
     thread_cond_create(&global.shutdown_cond);
@@ -122,7 +127,8 @@
     
     thread_cond_destroy(&global.shutdown_cond);
     thread_rwlock_destroy(&_source_shutdown_rwlock);
-    thread_mutex_destroy(&_queue_mutex);
+    thread_mutex_destroy(&_con_queue_mutex);
+    thread_mutex_destroy(&_req_queue_mutex);
     thread_mutex_destroy(&_connection_mutex);
     thread_mutex_destroy(&move_clients_mutex);
 
@@ -140,19 +146,19 @@
     return id;
 }
 
-connection_t *create_connection(sock_t sock, sock_t serversock, char *ip) {
+connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
+{
     connection_t *con;
-    con = (connection_t *)malloc(sizeof(connection_t));
-    memset(con, 0, sizeof(connection_t));
-    con->sock = sock;
-    con->serversock = serversock;
-    con->con_time = time(NULL);
-    con->id = _next_connection_id();
-    con->ip = ip;
+    con = (connection_t *)calloc(1, sizeof(connection_t));
+    if (con)
+    {
+        con->sock = sock;
+        con->serversock = serversock;
+        con->con_time = time(NULL);
+        con->id = _next_connection_id();
+        con->ip = ip;
+    }
 
-    con->event_number = EVENT_NO_EVENT;
-    con->event = NULL;
-
     return con;
 }
 
@@ -254,8 +260,11 @@
     ip = (char *)malloc(MAX_ADDR_LEN);
 
     sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
-    if (sock >= 0) {
-        con = create_connection(sock, serversock, ip);
+    if (sock >= 0)
+    {
+        con = connection_create (sock, serversock, ip);
+        if (con == NULL)
+            free (ip);
 
         return con;
     }
@@ -268,172 +277,208 @@
     return NULL;
 }
 
-static void _add_connection(connection_t *con)
+
+/* add client to connection queue. At this point some header information
+ * has been collected, so we now pass it onto the connection thread for
+ * further processing
+ */
+static void _add_connection (client_queue_t *node)
 {
-    con_queue_t *node;
-
-    node = (con_queue_t *)malloc(sizeof(con_queue_t));
-    
-    thread_mutex_lock(&_queue_mutex);
-    node->con = con;
-    node->next = (con_queue_t *)_queue;
-    _queue = node;
-    thread_mutex_unlock(&_queue_mutex);
+    thread_mutex_lock (&_con_queue_mutex);
+    *_con_queue_tail = node;
+    _con_queue_tail = (volatile client_queue_t **)&node->next;
+    thread_mutex_unlock (&_con_queue_mutex);
 }
 
-static void _push_thread(thread_queue_t **queue, thread_type *thread_id)
+
+/* this returns queued clients for the connection thread. headers are
+ * already provided, but need to be parsed.
+ */
+static client_queue_t *_get_connection(void)
 {
-    /* create item */
-    thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t));
-    item->thread_id = thread_id;
-    item->next = NULL;
+    client_queue_t *node = NULL;
 
-
-    thread_mutex_lock(&_queue_mutex);
-    if (*queue == NULL) {
-        *queue = item;
-    } else {
-        item->next = *queue;
-        *queue = item;
+    /* common case, no new connections so don't bother taking locks */
+    if (_con_queue)
+    {
+        thread_mutex_lock (&_con_queue_mutex);
+        node = (client_queue_t *)_con_queue;
+        _con_queue = node->next;
+        if (_con_queue == NULL)
+            _con_queue_tail = &_con_queue;
+        thread_mutex_unlock (&_con_queue_mutex);
     }
-    thread_mutex_unlock(&_queue_mutex);
+    return node;
 }
 
-static thread_type *_pop_thread(thread_queue_t **queue)
+
+/* run along queue checking for any data that has come in or a timeout */
+static void process_request_queue ()
 {
-    thread_type *id;
-    thread_queue_t *item;
+    client_queue_t **node_ref = (client_queue_t **)&_req_queue;
+    ice_config_t *config = config_get_config ();
+    int timeout = config->header_timeout;
+    config_release_config();
 
-    thread_mutex_lock(&_queue_mutex);
+    while (*node_ref)
+    {
+        client_queue_t *node = *node_ref;
+        client_t *client = node->client;
+        int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
+        char *buf = client->refbuf->data + node->offset;
 
-    item = *queue;
-    if (item == NULL) {
-        thread_mutex_unlock(&_queue_mutex);
-        return NULL;
-    }
+        if (len > 0)
+        {
+            if (client->con->con_time + timeout <= time(NULL))
+                len = 0;
+            else
+                len = client_read_bytes (client, buf, len);
+        }
 
-    *queue = item->next;
-    item->next = NULL;
-    id = item->thread_id;
-    free(item);
+        if (len > 0)
+        {
+            int pass_it = 1;
+            char *ptr;
 
-    thread_mutex_unlock(&_queue_mutex);
+            node->offset += len;
+            client->refbuf->data [node->offset] = '\000';
+            do
+            {
+                if (node->shoutcast == 1)
+                {
+                    /* password line */
+                    if (strstr (client->refbuf->data, "\r\n") != NULL)
+                        break;
+                    if (strstr (client->refbuf->data, "\n") != NULL)
+                        break;
+                }
+                /* stream_offset refers to the start of any data sent after the
+                 * http style headers, we don't want to lose those */
+                ptr = strstr (client->refbuf->data, "\r\n\r\n");
+                if (ptr)
+                {
+                    node->stream_offset = (ptr+4) - client->refbuf->data;
+                    break;
+                }
+                ptr = strstr (client->refbuf->data, "\n\n");
+                if (ptr)
+                {
+                    node->stream_offset = (ptr+2) - client->refbuf->data;
+                    break;
+                }
+                pass_it = 0;
+            } while (0);
 
-    return id;
+            if (pass_it)
+            {
+                if ((client_queue_t **)_req_queue_tail == &(node->next))
+                    _req_queue_tail = (volatile client_queue_t **)node_ref;
+                *node_ref = node->next;
+                node->next = NULL;
+                _add_connection (node);
+            }
+        }
+        else
+        {
+            if (len == 0 || client->con->error)
+            {
+                if ((client_queue_t **)_req_queue_tail == &node->next)
+                    _req_queue_tail = (volatile client_queue_t **)node_ref;
+                *node_ref = node->next;
+                client_destroy (client);
+                free (node);
+                continue;
+            }
+        }
+        node_ref = &node->next;
+    }
 }
 
-static void _build_pool(void)
+
+/* add node to the queue of requests. This is where the clients are when
+ * initial http details are read.
+ */
+static void _add_request_queue (client_queue_t *node)
 {
-    ice_config_t *config;
-    int i;
-    thread_type *tid;
-    char buff[64];
-    int threadpool_size;
-
-    config = config_get_config();
-    threadpool_size = config->threadpool_size;
-    config_release_config();
-
-    for (i = 0; i < threadpool_size; i++) {
-        snprintf(buff, 64, "Connection Thread #%d", i);
-        tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED);
-        _push_thread(&_conhands, tid);
-    }
+    thread_mutex_lock (&_req_queue_mutex);
+    *_req_queue_tail = node;
+    _req_queue_tail = (volatile client_queue_t **)&node->next;
+    thread_mutex_unlock (&_req_queue_mutex);
 }
 
-static void _destroy_pool(void)
-{
-    thread_type *id;
-    int i;
 
-    i = 0;
-
-    id = _pop_thread(&_conhands);
-    while (id != NULL) {
-        thread_join(id);
-        id = _pop_thread(&_conhands);
-    }
-    INFO0("All connection threads down");
-}
-
 void connection_accept_loop(void)
 {
     connection_t *con;
 
-    _build_pool();
+    tid = thread_create ("connection thread", _handle_connection, NULL, THREAD_ATTACHED);
 
     while (global.running == ICE_RUNNING)
     {
-        if (global . schedule_config_reread)
+        con = _accept_connection();
+
+        if (con)
         {
-            /* reread config file */
-            INFO0("Scheduling config reread ...");
+            client_queue_t *node;
+            ice_config_t *config;
+            int i;
+            client_t *client = NULL;
 
-            connection_inject_event(EVENT_CONFIG_READ, NULL);
-            global . schedule_config_reread = 0;
-        }
+            global_lock();
+            if (client_create (&client, con, NULL) < 0)
+            {
+                global_unlock();
+                client_send_404 (client, "Icecast connection limit reached");
+                continue;
+            }
+            global_unlock();
 
-        con = _accept_connection();
+            /* setup client for reading incoming http */
+            client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+            client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
+            client->refbuf->len = 0; /* force reader code to ignore buffer */
 
-        if (con) {
-            _add_connection(con);
+            node = calloc (1, sizeof (client_queue_t));
+            if (node == NULL)
+            {
+                client_destroy (client);
+                continue;
+            }
+            node->client = client;
+
+            /* Check for special shoutcast compatability processing */
+            config = config_get_config();
+            for (i = 0; i < global.server_sockets; i++)
+            {
+                if (global.serversock[i] == con->serversock)
+                {
+                    if (config->listeners[i].shoutcast_compat)
+                        node->shoutcast = 1;
+                }
+            }
+            config_release_config(); 
+
+            sock_set_blocking (client->con->sock, SOCK_NONBLOCK);
+            sock_set_nodelay (client->con->sock);
+
+            _add_request_queue (node);
+            stats_event_inc (NULL, "connections");
         }
+        process_request_queue ();
     }
 
     /* Give all the other threads notification to shut down */
     thread_cond_broadcast(&global.shutdown_cond);
 
-    _destroy_pool();
+    if (tid)
+        thread_join (tid);
 
     /* wait for all the sources to shutdown */
     thread_rwlock_wlock(&_source_shutdown_rwlock);
     thread_rwlock_unlock(&_source_shutdown_rwlock);
 }
 
-static connection_t *_get_connection(void)
-{
-    con_queue_t *node = NULL;
-    con_queue_t *oldnode = NULL;
-    connection_t *con = NULL;
 
-    /* common case, no new connections so don't bother taking locks */
-    if (_queue == NULL)
-        return NULL;
-
-    thread_mutex_lock(&_queue_mutex);
-    if (_queue) {
-        node = (con_queue_t *)_queue;
-        while (node->next) {
-            oldnode = node;
-            node = node->next;
-        }
-        
-        /* node is now the last node
-        ** and oldnode is the previous one, or NULL
-        */
-        if (oldnode) oldnode->next = NULL;
-        else (_queue) = NULL;
-    }
-    thread_mutex_unlock(&_queue_mutex);
-
-    if (node) {
-        con = node->con;
-        free(node);
-    }
-
-    return con;
-}
-
-void connection_inject_event(int eventnum, void *event_data) {
-    connection_t *con = calloc(1, sizeof(connection_t));
-
-    con->event_number = eventnum;
-    con->event = event_data;
-
-    _add_connection(con);
-}
-
-
 /* Called when activating a source. Verifies that the source count is not
  * exceeded and applies any initial parameters.
  */
@@ -484,10 +529,6 @@
             return -1;
         }
 
-        global.sources++;
-        stats_event_args (NULL, "sources", "%d", global.sources);
-        global_unlock();
-
         /* for relays, we don't yet have a client, however we do require one
          * to retrieve the stream from.  This is created here, quite late,
          * because we can't use this client to return an error code/message,
@@ -495,12 +536,9 @@
          */
         if (source->client == NULL)
         {
-            source->client = client_create (source->con, source->parser);
-            if (source->client == NULL)
+            if (client_create (&source->client, source->con, source->parser) < 0)
             {
                 config_release_config();
-                global_lock();
-                global.sources--;
                 global_unlock();
                 connection_close (source->con);
                 source->con = NULL;
@@ -509,6 +547,9 @@
                 return -1;
             }
         }
+        global.sources++;
+        stats_event_args (NULL, "sources", "%d", global.sources);
+        global_unlock();
 
         source->running = 1;
         mountinfo = config_find_mount (config, source->mount);
@@ -823,160 +864,141 @@
         return;
     }
 
-    sock_set_blocking(client->con->sock, SOCK_NONBLOCK);
-    sock_set_nodelay(client->con->sock);
-
-    client->write_to_client = format_generic_write_to_client;
-    client->check_buffer = format_check_http_buffer;
-    client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-
     add_client (uri, client);
     if (uri != passed_uri) free (uri);
 }
 
-void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_password) {
-    char shoutcast_password[256];
+static void _handle_shoutcast_compatible (client_queue_t *node)
+{
     char *http_compliant;
     int http_compliant_len = 0;
-    char header[4096];
     http_parser_t *parser;
+    ice_config_t *config = config_get_config ();
+    char *shoutcast_mount;
+    client_t *client = node->client;
 
-    memset(shoutcast_password, 0, sizeof (shoutcast_password));
-    /* Step one of shoutcast auth protocol, read encoder password (1 line) */
-    if (util_read_header(con->sock, shoutcast_password, 
-            sizeof (shoutcast_password), 
-            READ_LINE) == 0) {
-        /* either we didn't get a complete line, or we timed out */
-        connection_close(con);
-        return;
-    }
-    /* Get rid of trailing \n */
-    shoutcast_password[strlen(shoutcast_password)-1] = '\000';
-    if (strcmp(shoutcast_password, source_password)) {
-        ERROR0("Invalid source password");
-        connection_close(con);
-        return;
-    }
-    /* Step two of shoutcast auth protocol, send OK2.  For those
-       interested, OK2 means it supports metadata updates via admin.cgi,
-       and the string "OK" can also be sent, but will indicate to the
-       shoutcast source client to not send metadata updates.
-       I believe icecast 1.x used to send OK. */
-    sock_write(con->sock, "%s\r\n", "OK2");
+    if (node->shoutcast == 1)
+    {
+        char *source_password, *ptr;
+        mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount);
 
-    memset(header, 0, sizeof (header));
-    /* Step three of shoutcast auth protocol, read HTTP-style
-       request headers and process them.*/
-    if (util_read_header(con->sock, header, sizeof (header), 
-                         READ_ENTIRE_HEADER) == 0) {
-        /* either we didn't get a complete header, or we timed out */
-        connection_close(con);
+        if (mountinfo && mountinfo->password)
+            source_password = strdup (mountinfo->password);
+        else
+            source_password = strdup (config->source_password);
+        config_release_config();
+
+        /* Get rid of trailing \r\n or \n after password */
+        ptr = strstr (client->refbuf->data, "\r\n");
+        if (ptr == NULL)
+            ptr = strstr (client->refbuf->data, "\n");
+
+        if (ptr == NULL)
+        {
+            client_destroy (client);
+            free (source_password);
+            free (node);
+            return;
+        }
+        *ptr = '\0';
+
+        if (strcmp (client->refbuf->data, source_password) == 0)
+        {
+            client->respcode = 200;
+            /* send this non-blocking but if there is only a partial write
+             * then leave to header timeout */
+            sock_write (client->con->sock, "OK2\r\n");
+            memset (client->refbuf->data, 0, client->refbuf->len);
+            node->shoutcast = 2;
+            node->offset = 0;
+            /* we've checked the password, now send it back for reading headers */
+            _add_request_queue (node);
+            free (source_password);
+            return;
+        }
+        client_destroy (client);
+        free (node);
         return;
     }
+    shoutcast_mount = strdup (config->shoutcast_mount);
+    config_release_config();
     /* Here we create a valid HTTP request based of the information
        that was passed in via the non-HTTP style protocol above. This
        means we can use some of our existing code to handle this case */
-    http_compliant_len = strlen(header) + strlen(mount) + 20;
+    http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset;
     http_compliant = (char *)calloc(1, http_compliant_len);
     snprintf (http_compliant, http_compliant_len,
-            "SOURCE %s HTTP/1.0\r\n%s", mount, header);
+            "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, client->refbuf->data);
     parser = httpp_create_parser();
     httpp_initialize(parser, NULL);
     if (httpp_parse (parser, http_compliant, strlen(http_compliant)))
     {
-        client_t *client = client_create (con, parser);
-        if (client)
+        /* we may have more than just headers, so prepare for it */
+        if (node->stream_offset == node->offset)
+            client->refbuf->len = 0;
+        else
         {
-            _handle_source_request (client, mount, SHOUTCAST_SOURCE_AUTH);
-            free (http_compliant);
-            return;
+            char *ptr = client->refbuf->data;
+            client->refbuf->len = node->offset - node->stream_offset;
+            memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
         }
+        client->parser = parser;
+        _handle_source_request (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH);
     }
-    connection_close (con);
-    httpp_destroy (parser);
+    else
+        client_destroy (client);
     free (http_compliant);
+    free (shoutcast_mount);
+    free (node);
+    return;
 }
 
+
+/* Connection thread. Here we take clients off the connection queue and check
+ * the contents provided. We set up the parser then hand off to the specific
+ * request handler.
+ */
 static void *_handle_connection(void *arg)
 {
-    char header[4096];
-    connection_t *con;
     http_parser_t *parser;
     char *rawuri, *uri;
-    client_t *client;
-    int i = 0;
-    int continue_flag = 0;
-    ice_config_t *config;
-    char *source_password;
 
     while (global.running == ICE_RUNNING) {
 
-        /* grab a connection and set the socket to blocking */
-        while ((con = _get_connection())) {
+        client_queue_t *node = _get_connection();
 
-            /* Handle meta-connections */
-            if(con->event_number > 0) {
-                switch(con->event_number) {
-                    case EVENT_CONFIG_READ:
-                        event_config_read(con->event);
-                        break;
-                    default:
-                        ERROR1("Unknown event number: %d", con->event_number);
-                        break;
-                }
-                free(con);
-                continue;
-            }
+        if (node)
+        {
+            client_t *client = node->client;
 
-            stats_event_inc(NULL, "connections");
-
-            sock_set_blocking(con->sock, SOCK_BLOCK);
-
-            continue_flag = 0;
             /* Check for special shoutcast compatability processing */
-            for(i = 0; i < MAX_LISTEN_SOCKETS; i++) {
-                if(global.serversock[i] == con->serversock) {
-                    config = config_get_config();
-                    if (config->listeners[i].shoutcast_compat) {
-                        char *shoutcast_mount = strdup (config->shoutcast_mount);
-                        mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount);
-                        if (mountinfo && mountinfo->password)
-                            source_password = strdup (mountinfo->password);
-                        else
-                            source_password = strdup (config->source_password);
-                        config_release_config();
-                        _handle_shoutcast_compatible(con, shoutcast_mount, source_password);
-                        free(source_password);
-                        free (shoutcast_mount);
-                        continue_flag = 1;
-                        break;
-                    }
-                    config_release_config();
-                }
-            }
-            if(continue_flag) {
+            if (node->shoutcast) 
+            {
+                _handle_shoutcast_compatible (node);
                 continue;
             }
 
-            /* fill header with the http header */
-            memset(header, 0, sizeof (header));
-            if (util_read_header(con->sock, header, sizeof (header), 
-                                 READ_ENTIRE_HEADER) == 0) {
-                /* either we didn't get a complete header, or we timed out */
-                connection_close(con);
-                continue;
-            }
-
+            /* process normal HTTP headers */
             parser = httpp_create_parser();
             httpp_initialize(parser, NULL);
-            if (httpp_parse(parser, header, strlen(header))) {
-                /* handle the connection or something */
+            client->parser = parser;
+            if (httpp_parse (parser, client->refbuf->data, node->offset))
+            {
+                /* we may have more than just headers, so prepare for it */
+                if (node->stream_offset == node->offset)
+                    client->refbuf->len = 0;
+                else
+                {
+                    char *ptr = client->refbuf->data;
+                    client->refbuf->len = node->offset - node->stream_offset;
+                    memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
+                }
+                free (node);
                 
                 if (strcmp("ICE",  httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
                     strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
                     ERROR0("Bad HTTP protocol detected");
-                    connection_close(con);
-                    httpp_destroy(parser);
+                    client_destroy (client);
                     continue;
                 }
 
@@ -985,21 +1007,9 @@
 
                 if (uri == NULL)
                 {
-                    sock_write(con->sock, "The path you requested was invalid\r\n");
-                    connection_close(con);
-                    httpp_destroy(parser);
+                    client_destroy (client);
                     continue;
                 }
-                client = client_create (con, parser);
-                if (client == NULL)
-                {
-                    sock_write (con->sock, "HTTP/1.0 404 File Not Found\r\n"
-                            "Content-Type: text/html\r\n\r\n"
-                            "<b>Connection limit reached</b>");
-                    connection_close(con);
-                    httpp_destroy(parser);
-                    continue;
-                }
 
                 if (parser->req_type == httpp_req_source) {
                     _handle_source_request (client, uri, ICECAST_SOURCE_AUTH);
@@ -1016,16 +1026,16 @@
                 }
 
                 free(uri);
-                continue;
             } 
-            else {
+            else
+            {
+                free (node);
                 ERROR0("HTTP request parsing failed");
-                connection_close(con);
-                httpp_destroy(parser);
-                continue;
+                client_destroy (client);
             }
+            continue;
         }
-        thread_sleep (100000);
+        thread_sleep (50000);
     }
     DEBUG0 ("Connection thread done");
 

Modified: icecast/trunk/icecast/src/connection.h
===================================================================
--- icecast/trunk/icecast/src/connection.h	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/connection.h	2005-08-11 23:29:58 UTC (rev 9733)
@@ -38,20 +38,15 @@
     char *ip;
     char *host;
 
-    /* For 'fake' connections */
-    int event_number;
-    void *event;
 } connection_t;
 
 void connection_initialize(void);
 void connection_shutdown(void);
 void connection_accept_loop(void);
 void connection_close(connection_t *con);
-connection_t *create_connection(sock_t sock, sock_t serversock, char *ip);
+connection_t *connection_create (sock_t sock, sock_t serversock, char *ip);
 int connection_complete_source (struct source_tag *source);
 
-void connection_inject_event(int eventnum, void *event_data);
-
 int connection_check_source_pass(http_parser_t *parser, const char *mount);
 int connection_check_relay_pass(http_parser_t *parser);
 int connection_check_admin_pass(http_parser_t *parser);

Modified: icecast/trunk/icecast/src/fserve.c
===================================================================
--- icecast/trunk/icecast/src/fserve.c	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/fserve.c	2005-08-11 23:29:58 UTC (rev 9733)
@@ -571,9 +571,6 @@
     fclient->client = client;
     fclient->ready = 0;
 
-    sock_set_blocking (client->con->sock, SOCK_NONBLOCK);
-    sock_set_nodelay (client->con->sock);
-
     thread_mutex_lock (&pending_lock);
     fclient->next = (fserve_t *)pending_list;
     pending_list = fclient;

Modified: icecast/trunk/icecast/src/slave.c
===================================================================
--- icecast/trunk/icecast/src/slave.c	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/slave.c	2005-08-11 23:29:58 UTC (rev 9733)
@@ -56,6 +56,7 @@
 #include "logging.h"
 #include "source.h"
 #include "format.h"
+#include "event.h"
 
 #define CATMODULE "slave"
 
@@ -180,7 +181,7 @@
                     relay->server, relay->port, relay->mount);
             break;
         }
-        con = create_connection (streamsock, -1, NULL);
+        con = connection_create (streamsock, -1, NULL);
 
         if (relay->username && relay->password)
         {
@@ -598,6 +599,13 @@
     {
         relay_server *cleanup_relays;
 
+        /* re-read xml file if requested */
+        if (global . schedule_config_reread)
+        {
+            event_config_read (NULL);
+            global . schedule_config_reread = 0;
+        }
+
         thread_sleep (1000000);
         if (slave_running == 0)
             break;

Modified: icecast/trunk/icecast/src/source.c
===================================================================
--- icecast/trunk/icecast/src/source.c	2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/source.c	2005-08-11 23:29:58 UTC (rev 9733)
@@ -602,9 +602,6 @@
     stats_event (source->mount, "listener_peak", "0");
     stats_event_time (source->mount, "stream_start");
 
-    if (source->client->con)
-        sock_set_blocking (source->con->sock, SOCK_NONBLOCK);
-
     DEBUG0("Source creation complete");
     source->last_read = time (NULL);
     source->running = 1;



More information about the commits mailing list