[Icecast-dev] [PATCH 17/31] Connection: add threads (this needs to go after the client_tag_t obsoleting patch)
Niv Sardi
nsardi at smartjog.com
Fri Jul 30 07:54:39 PDT 2010
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
src/connection.c | 150 +++++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 132 insertions(+), 18 deletions(-)
diff --git a/src/connection.c b/src/connection.c
index 7641e8e..118258d 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -82,19 +82,10 @@
#define SHOUTCAST_SOURCE_AUTH 1
#define ICECAST_SOURCE_AUTH 0
-typedef struct client_queue_tag {
- client_t *client;
- int offset;
- int stream_offset;
- int shoutcast;
- char *shoutcast_mount;
- struct client_queue_tag *next;
-} client_queue_t;
-
-typedef struct _thread_queue_tag {
- thread_type *thread_id;
- struct _thread_queue_tag *next;
-} thread_queue_t;
+typedef struct connection_queue_tag { /* singly linked FIFO node holding one accepted connection awaiting processing */
+ connection_t *con; /* the queued connection; closed by _connection_node_destroy() when still set */
+ struct connection_queue_tag *next; /* following node in the queue; cleared when the node is popped */
+} connection_queue_t;
typedef struct
{
@@ -107,8 +98,11 @@ typedef struct
static spin_t _connection_lock;
static volatile unsigned long _current_id = 0;
static int _initialized = 0;
+static volatile int _connection_running = 0; /* set to 1 in connection_initialize(), back to 0 in connection_shutdown(); loop condition of _connection_thread() */
+static volatile connection_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; /* queue head, and the address of the link slot that the next appended node is written to */
+static cond_t *_connection_cond; /* signalled by _add_connection() and connection_shutdown(); waited on by _get_connection() */
+static thread_type *_connection_thread_id; /* handle returned by thread_create() for the connection thread */
-static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
static int ssl_ok;
#ifdef HAVE_OPENSSL
static SSL_CTX *ssl_ctx;
@@ -121,6 +115,8 @@ rwlock_t _source_shutdown_rwlock;
static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount);
static int _handle_client (client_t *client);
+static int _connection_process (connection_t *con, int timeout);
+static void *_connection_thread (void *arg);
static int compare_ip (void *arg, void *a, void *b)
{
@@ -155,6 +151,12 @@ void connection_initialize(void)
allowed_ip.contents = NULL;
allowed_ip.file_mtime = 0;
+ _connection_running = 1;
+ /* (XXX)xaiki: need a way to make it go away on shutdown, ok for now */
+ _connection_thread_id = thread_create ("Connection Thread", _connection_thread,
+ NULL, THREAD_DETACHED);
+ _connection_cond = thread_cond_create ();
+
_initialized = 1;
}
@@ -162,12 +164,20 @@ void connection_shutdown(void)
{
if (!_initialized) return;
+ _connection_running = 0;
+ thread_cond_signal(_connection_cond);
+ DEBUG0 ("waiting for connection thread");
+ thread_join(_connection_thread_id);
+
#ifdef HAVE_OPENSSL
SSL_CTX_free (ssl_ctx);
#endif
if (banned_ip.contents) avl_tree_free (banned_ip.contents, free_filtered_ip);
if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_ip);
+ thread_cond_destroy (_connection_cond);
+ free (_connection_cond);
+
thread_cond_destroy(&global.shutdown_cond);
thread_rwlock_destroy(&_source_shutdown_rwlock);
thread_spin_destroy (&_connection_lock);
@@ -551,7 +561,108 @@ static connection_t *_accept_connection(int duration)
return NULL;
}
-int connection_process (connection_t *con, int timeout) {
+connection_queue_t *_connection_node_new (connection_t *con) /* wrap con in a newly allocated queue node; returns NULL on failure */
+{
+ connection_queue_t *node;
+ if (!con) /* nothing to wrap */
+ return NULL;
+
+ node = calloc (1, sizeof (connection_queue_t)); /* zero-filled allocation, so node->next starts out cleared */
+ if (!node) /* allocation failed; con is not touched on this path, so it still belongs to the caller */
+ return NULL;
+
+ node->con = con; /* the pointer is stored as-is, no copy is made */
+
+ return node; /* released later with free() or _connection_node_destroy() */
+}
+
+/* Append node at the tail of the connection queue and signal _connection_cond so the connection thread picks it up.
+ * Called for freshly accepted connections and to re-queue a node after EAGAIN. A NULL node is ignored rather than dereferenced.
+ * NOTE(review): no lock is taken although this is called from both the accept loop and the connection thread - confirm that is safe.
+ */
+static void _add_connection (connection_queue_t *node)
+{
+ if (!node) return; /* _connection_node_new() may have returned NULL; &node->next below would be invalid */
+ WARN ("added connection");
+ *_con_queue_tail = node; /* link behind the current last node, or into the head slot when the queue is empty */
+ _con_queue_tail = (volatile connection_queue_t **)&node->next; /* the next append is written into node->next */
+ thread_cond_signal(_connection_cond);
+}
+
+/* Pop the node at the head of the connection queue for the connection
+ * thread. When the queue is empty, wait on _connection_cond and return NULL so the caller can retry.
+ */
+static connection_queue_t *_get_connection(void)
+{
+ connection_queue_t *node = NULL;
+
+ /* queue is non-empty: detach the head node. No lock is taken here - NOTE(review): confirm this is safe against _add_connection() running on another thread */
+ if (_con_queue) {
+ node = (connection_queue_t *)_con_queue; /* the cast drops the volatile qualifier */
+ _con_queue = node->next;
+ if (_con_queue == NULL) /* queue drained: the tail must point back at the head slot */
+ _con_queue_tail = &_con_queue;
+ node->next = NULL; /* the detached node no longer references the queue */
+ } else {
+ INFO("sleeping");
+ thread_cond_wait(_connection_cond); /* presumably blocks until _connection_cond is signalled; node stays NULL */
+ INFO("awake");
+ }
+ return node;
+}
+
+static void _connection_node_destroy (connection_queue_t *node) { /* close the node's connection, if it has one, and free the node */
+ INFO("destroying node");
+
+ if (node->con) /* only con is checked; node itself must not be NULL */
+ connection_close(node->con);
+ free(node);
+}
+
+static void *_connection_thread (void *arg) /* connection thread entry point: pops queued nodes and runs _connection_process() on each; arg is unused */
+{
+ connection_queue_t *node;
+ int err;
+
+ WARN("Launched connection thread");
+ while (_connection_running)
+ {
+ /* XXX(xaiki): this needs to wait on a fd, so we don't kill polar bears */
+ node = _get_connection();
+ INFO("got node");
+ if (!node) { /* queue was empty and the wait returned; check the running flag again */
+ continue;
+ }
+ err = _connection_process (node->con, 3000); /* process the connection held by the node, not the node itself */
+ if (err > 0) { /* positive result: only the node wrapper is released, the connection is not closed here */
+ free(node);
+ continue;
+ }
+
+ switch (-err) { /* non-positive results are treated as negated errno values */
+ case EAGAIN: /* put it again at the end of the queue */
+ _add_connection (node);
+ break;
+ case EINPROGRESS: /* already handled */
+ free(node);
+ break;
+ default: /* any other result, including 0: close the connection and free the node */
+ ERROR ("droping node (%p, con => %p), error = %d", (void *)node, (void *)node->con, err);
+ _connection_node_destroy (node);
+ break;
+ }
+ }
+
+ while (_con_queue) { /* shutdown: close and free whatever is still queued */
+ node = _get_connection();
+ _connection_node_destroy (node);
+ }
+
+ INFO0 ("Connection thread shutdown complete");
+ return NULL;
+}
+
+static int _connection_process (connection_t *con, int timeout) {
ice_config_t *config;
client_t *client = NULL;
listener_t *listener;
@@ -648,6 +759,7 @@ int connection_process (connection_t *con, int timeout) {
void connection_accept_loop (void)
{
+ connection_queue_t *node;
connection_t *con;
ice_config_t *config;
int duration = 300;
@@ -667,8 +779,10 @@ void connection_accept_loop (void)
continue;
}
- if (connection_process (con, timeout) != -1)
- duration = 5;
+ /* add connection async to the connection queue, then the
+ * connection loop will do all the dirty work */
+ node =_connection_node_new (con);
+ _add_connection (node);
}
/* Give all the other threads notification to shut down */
@@ -713,7 +827,7 @@ int connection_complete_source (source_t *source, int response)
source->client = NULL;
}
WARN1("Content-type \"%s\" not supported, dropping source", contenttype);
- return -1;
+ return -EINPROGRESS;
}
}
else
--
1.7.1
More information about the Icecast-dev
mailing list