[Icecast-dev] [PATCH 17/31] Connection: add threads this needs to go after the client_tag_t obsoletting patch

Niv Sardi nsardi at smartjog.com
Fri Jul 30 07:54:39 PDT 2010


Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |  150 +++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 132 insertions(+), 18 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 7641e8e..118258d 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -82,19 +82,10 @@
 #define SHOUTCAST_SOURCE_AUTH 1
 #define ICECAST_SOURCE_AUTH 0
 
-typedef struct client_queue_tag {
-    client_t *client;
-    int offset;
-    int stream_offset;
-    int shoutcast;
-    char *shoutcast_mount;
-    struct client_queue_tag *next;
-} client_queue_t;
-
-typedef struct _thread_queue_tag {
-    thread_type *thread_id;
-    struct _thread_queue_tag *next;
-} thread_queue_t;
+typedef struct connection_queue_tag {
+    connection_t *con;
+    struct connection_queue_tag *next;
+} connection_queue_t;
 
 typedef struct
 {
@@ -107,8 +98,11 @@ typedef struct
 static spin_t _connection_lock;
 static volatile unsigned long _current_id = 0;
 static int _initialized = 0;
+static volatile int _connection_running = 0;
+static volatile connection_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
+static cond_t *_connection_cond;
+static thread_type *_connection_thread_id;
 
-static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
 static int ssl_ok;
 #ifdef HAVE_OPENSSL
 static SSL_CTX *ssl_ctx;
@@ -121,6 +115,8 @@ rwlock_t _source_shutdown_rwlock;
 
 static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount);
 static int _handle_client (client_t *client);
+static int _connection_process (connection_t *con, int timeout);
+static void *_connection_thread (void *arg);
 
 static int compare_ip (void *arg, void *a, void *b)
 {
@@ -155,6 +151,12 @@ void connection_initialize(void)
     allowed_ip.contents = NULL;
     allowed_ip.file_mtime = 0;
 
+    _connection_running = 1;
+    /* (XXX)xaiki: need a way to make it go away on shutdown, ok for now */
+    _connection_thread_id = thread_create ("Connection Thread", _connection_thread,
+					   NULL, THREAD_DETACHED);
+    _connection_cond = thread_cond_create ();
+
     _initialized = 1;
 }
 
@@ -162,12 +164,20 @@ void connection_shutdown(void)
 {
     if (!_initialized) return;
 
+    _connection_running = 0;
+    thread_cond_signal(_connection_cond);
+    DEBUG0 ("waiting for connection thread");
+    thread_join(_connection_thread_id);
+
 #ifdef HAVE_OPENSSL
     SSL_CTX_free (ssl_ctx);
 #endif
     if (banned_ip.contents)  avl_tree_free (banned_ip.contents, free_filtered_ip);
     if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_ip);
 
+    thread_cond_destroy (_connection_cond);
+    free (_connection_cond);
+
     thread_cond_destroy(&global.shutdown_cond);
     thread_rwlock_destroy(&_source_shutdown_rwlock);
     thread_spin_destroy (&_connection_lock);
@@ -551,7 +561,108 @@ static connection_t *_accept_connection(int duration)
     return NULL;
 }
 
-int connection_process (connection_t *con, int timeout) {
+connection_queue_t *_connection_node_new (connection_t *con)
+{
+    connection_queue_t *node;
+    if (!con)
+        return NULL;
+
+    node = calloc (1, sizeof (connection_queue_t));
+    if (!node)
+        return NULL;
+
+    node->con = con;
+
+    return node;
+}
+
+/* add a connection to connection queue. At this point the connection
+ * has just been accepted, we push it to the queue and return so that
+ * we can keep getting connections in.
+ */
+static void _add_connection (connection_queue_t *node)
+{
+    WARN ("added connection");
+    *_con_queue_tail = node;
+    _con_queue_tail = (volatile connection_queue_t **)&node->next;
+
+    thread_cond_signal(_connection_cond);
+}
+
+/* this returns queued clients for the connection thread. headers are
+ * already provided, but need to be parsed.
+ */
+static connection_queue_t *_get_connection(void)
+{
+    connection_queue_t *node = NULL;
+
+    /* common case, no new connections so don't bother taking locks */
+    if (_con_queue) {
+        node = (connection_queue_t *)_con_queue;
+        _con_queue = node->next;
+        if (_con_queue == NULL)
+            _con_queue_tail = &_con_queue;
+        node->next = NULL;
+    } else {
+        INFO("sleeping");
+        thread_cond_wait(_connection_cond);
+        INFO("awake");
+    }
+    return node;
+}
+
+static void _connection_node_destroy (connection_queue_t *node) {
+    INFO("destroying node");
+
+    if (node->con)
+        connection_close(node->con);
+    free(node);
+}
+
+static void *_connection_thread (void *arg)
+{
+    connection_queue_t *node;
+    int err;
+
+    WARN("Launched connection thread");
+    while (_connection_running)
+    {
+        /* XXX(xaiki): this needs to wait on a fd, so we don't kill polar bears */
+        node = _get_connection();
+        INFO("got node");
+        if (!node) {
+            continue;
+        }
+	err = _connection_process (node, 3000);
+        if (err > 0) {
+            free(node);
+            continue;
+        }
+
+        switch (-err) {
+        case EAGAIN:     /* put it again at the end of the queue */
+            _add_connection (node);
+            break;
+        case EINPROGRESS:	/* already handled */
+            free(node);
+            break;
+        default:
+            ERROR ("droping node (%p, client => %p), error = %d", node, node->client, err);
+            _connection_node_destroy (node);
+            break;
+        }
+    }
+
+    while (_con_queue) {
+        node = _get_connection();
+        _connection_node_destroy (node);
+    }
+
+    INFO0 ("Connection thread shutdown complete");
+    return NULL;
+}
+
+static int _connection_process (connection_t *con, int timeout) {
     ice_config_t *config;
     client_t *client = NULL;
     listener_t *listener;
@@ -648,6 +759,7 @@ int connection_process (connection_t *con, int timeout) {
 
 void connection_accept_loop (void)
 {
+    connection_queue_t *node;
     connection_t *con;
     ice_config_t *config;
     int duration = 300;
@@ -667,8 +779,10 @@ void connection_accept_loop (void)
             continue;
         }
 
-        if (connection_process (con, timeout) != -1)
-            duration = 5;
+        /* add connection async to the connection queue, then the
+         * connection loop will do all the dirty work */
+        node =_connection_node_new (con);
+        _add_connection (node);
     }
 
     /* Give all the other threads notification to shut down */
@@ -713,7 +827,7 @@ int connection_complete_source (source_t *source, int response)
                     source->client = NULL;
                 }
                 WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
-                return -1;
+                return -EINPROGRESS;
             }
         }
         else
-- 
1.7.1



More information about the Icecast-dev mailing list