[xiph-commits] r10370 - icecast/trunk/icecast/src

karl at svn.xiph.org karl at svn.xiph.org
Mon Nov 14 16:36:36 PST 2005


Author: karl
Date: 2005-11-14 16:36:34 -0800 (Mon, 14 Nov 2005)
New Revision: 10370

Modified:
   icecast/trunk/icecast/src/stats.c
Log:
update queue handling for stats. This was slow when many stats were being
queued. These apply to both web interface requests and stats clients.


Modified: icecast/trunk/icecast/src/stats.c
===================================================================
--- icecast/trunk/icecast/src/stats.c	2005-11-15 00:29:24 UTC (rev 10369)
+++ icecast/trunk/icecast/src/stats.c	2005-11-15 00:36:34 UTC (rev 10370)
@@ -51,10 +51,18 @@
 #define STATS_EVENT_REMOVE  4
 #define STATS_EVENT_HIDDEN  5
 
+typedef struct _event_queue_tag
+{
+    volatile stats_event_t *head;
+    volatile stats_event_t **tail;
+} event_queue_t;
+
+#define event_queue_init(qp)    { (qp)->head = NULL; (qp)->tail = &(qp)->head; }
+
 typedef struct _event_listener_tag
 {
-    stats_event_t **queue;
-    mutex_t *mutex;
+    event_queue_t queue;
+    mutex_t mutex;
 
     struct _event_listener_tag *next;
 } event_listener_t;
@@ -66,7 +74,7 @@
 static stats_t _stats;
 static mutex_t _stats_mutex;
 
-static volatile stats_event_t *_global_event_queue;
+static event_queue_t _global_event_queue;
 mutex_t _global_event_mutex;
 
 static volatile event_listener_t *_event_listeners;
@@ -77,10 +85,11 @@
 static int _compare_source_stats(void *a, void *b, void *arg);
 static int _free_stats(void *key);
 static int _free_source_stats(void *key);
-static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue);
+static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
 static stats_node_t *_find_node(avl_tree *tree, char *name);
 static stats_source_t *_find_source(avl_tree *tree, char *source);
 static void _free_event(stats_event_t *event);
+static stats_event_t *_get_event_from_queue (event_queue_t *queue);
 
 
 /* simple helper function for creating an event */
@@ -106,19 +115,7 @@
 static void queue_global_event (stats_event_t *event)
 {
     thread_mutex_lock(&_global_event_mutex);
-    if (_global_event_queue == NULL)
-    {
-        _global_event_queue = event;
-    }
-    else
-    {
-        stats_event_t *node = (stats_event_t *)_global_event_queue;
-        while (node->next)
-            node = node->next;
-        node->next = event;
-    }
-    /* DEBUG3("event added (%s, %s, %s)", event->source,
-         event->name, event->value); */
+    _add_event_to_queue (event, &_global_event_queue);
     thread_mutex_unlock(&_global_event_mutex);
 }
 
@@ -134,7 +131,7 @@
     thread_mutex_create(&_stats_mutex);
 
     /* set up stats queues */
-    _global_event_queue = NULL;
+    event_queue_init (&_global_event_queue);
     thread_mutex_create(&_global_event_mutex);
 
     /* fire off the stats thread */
@@ -145,7 +142,6 @@
 void stats_shutdown()
 {
     int n;
-    stats_event_t *event, *next;
 
     if(!_stats_running) /* We can't shutdown if we're not running. */
         return;
@@ -172,17 +168,17 @@
     avl_tree_free(_stats.source_tree, _free_source_stats);
     avl_tree_free(_stats.global_tree, _free_stats);
 
-    event = (stats_event_t *)_global_event_queue;
-    while(event) {
+    while (1)
+    {
+        stats_event_t *event = _get_event_from_queue (&_global_event_queue);
+        if (event == NULL) break;
         if(event->source)
             free(event->source);
         if(event->value)
             free(event->value);
         if(event->name)
             free(event->name);
-        next = event->next;
         free(event);
-        event = next;
     }
 }
 
@@ -537,7 +533,7 @@
 void stats_event_time (const char *mount, const char *name)
 {
     time_t now = time(NULL);
-    struct tm local; 
+    struct tm local;
     char buffer[100];
 
     localtime_r (&now, &local);
@@ -571,11 +567,10 @@
 
     INFO0 ("stats thread started");
     while (_stats_running) {
-        if (_global_event_queue != NULL) {
+        if (_global_event_queue.head != NULL) {
             /* grab the next event from the queue */
             thread_mutex_lock(&_global_event_mutex);
-            event = (stats_event_t *)_global_event_queue;
-            _global_event_queue = event->next;
+            event = _get_event_from_queue (&_global_event_queue);
             thread_mutex_unlock(&_global_event_mutex);
 
             event->next = NULL;
@@ -593,9 +588,9 @@
             listener = (event_listener_t *)_event_listeners;
             while (listener) {
                 copy = _copy_event(event);
-                thread_mutex_lock(listener->mutex);
-                _add_event_to_queue(copy, listener->queue);
-                thread_mutex_unlock(listener->mutex);
+                thread_mutex_lock (&listener->mutex);
+                _add_event_to_queue (copy, &listener->queue);
+                thread_mutex_unlock (&listener->mutex);
 
                 listener = listener->next;
             }
@@ -614,16 +609,15 @@
 }
 
 /* you must have the _stats_mutex locked here */
-static void _unregister_listener(stats_event_t **queue)
+static void _unregister_listener(event_listener_t *listener)
 {
     event_listener_t **prev = (event_listener_t **)&_event_listeners,
                      *current = *prev;
     while (current)
     {
-        if (current->queue == queue)
+        if (current == listener)
         {
             *prev = current->next;
-            free (current);
             break;
         }
         prev = &current->next;
@@ -632,25 +626,6 @@
 }
 
 
-/* you must have the _stats_mutex locked here */
-static void _register_listener(stats_event_t **queue, mutex_t *mutex)
-{
-    event_listener_t *node;
-    event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t));
-
-    evli->queue = queue;
-    evli->mutex = mutex;
-    evli->next = NULL;
-
-    if (_event_listeners == NULL) {
-        _event_listeners = evli;
-    } else {
-        node = (event_listener_t *)_event_listeners;
-        while (node->next) node = node->next;
-        node->next = evli;
-    }
-}
-
 static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
 {
     stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
@@ -668,35 +643,32 @@
     return event;
 }
 
-static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue)
-{
-    stats_event_t *node;
 
-    if (*queue == NULL) {
-        *queue = event;
-    } else {
-        node = *queue;
-        while (node->next) node = node->next;
-        node->next = event;
-    }
+static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
+{
+    *queue->tail = event;
+    queue->tail = (volatile stats_event_t **)&event->next;
 }
 
-static stats_event_t *_get_event_from_queue(stats_event_t **queue)
+
+static stats_event_t *_get_event_from_queue (event_queue_t *queue)
 {
-    stats_event_t *event;
+    stats_event_t *event = NULL;
 
-    if (*queue == NULL) return NULL;
+    if (queue && queue->head)
+    {
+        event = (stats_event_t *)queue->head;
+        queue->head = event->next;
+        if (queue->head == NULL)
+            queue->tail = &queue->head;
+    }
 
-    event = *queue;
-    *queue = (*queue)->next;
-    event->next = NULL;
-
     return event;
 }
 
 static int _send_event_to_client(stats_event_t *event, client_t *client)
 {
-    int ret = -1, len;
+    int len;
     char buf [200];
 
     /* send data to the client!!!! */
@@ -705,12 +677,15 @@
             event->name ? event->name : "null",
             event->value ? event->value : "null");
     if (len > 0 && len < (int)sizeof (buf))
-        ret = client_send_bytes (client, buf, len);
-
-    return (ret == -1) ? 0 : 1;
+    {
+        client_send_bytes (client, buf, len);
+        if (client->con->error)
+            return -1;
+    }
+    return 0;
 }
 
-void _dump_stats_to_queue(stats_event_t **queue)
+void _dump_stats_to_queue (event_queue_t *queue)
 {
     avl_node *node;
     avl_node *node2;
@@ -750,7 +725,7 @@
 ** the queue for all new events atomically.
 ** note: mutex must already be created!
 */
-static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
+static void _register_listener (event_listener_t *listener)
 {
     avl_node *node;
     avl_node *node2;
@@ -765,7 +740,7 @@
     node = avl_get_first(_stats.global_tree);
     while (node) {
         event = _make_event_from_node((stats_node_t *)node->key, NULL);
-        _add_event_to_queue(event, queue);
+        _add_event_to_queue (event, &listener->queue);
 
         node = avl_get_next(node);
     }
@@ -777,7 +752,7 @@
         node2 = avl_get_first(source->stats_tree);
         while (node2) {
             event = _make_event_from_node((stats_node_t *)node2->key, source->source);
-            _add_event_to_queue(event, queue);
+            _add_event_to_queue (event, &listener->queue);
 
             node2 = avl_get_next(node2);
         }
@@ -786,7 +761,8 @@
     }
 
     /* now we register to receive future event notices */
-    _register_listener(queue, mutex);
+    listener->next = (event_listener_t *)_event_listeners;
+    _event_listeners = listener;
 
     thread_mutex_unlock(&_stats_mutex);
 }
@@ -794,48 +770,44 @@
 void *stats_connection(void *arg)
 {
     client_t *client = (client_t *)arg;
-    stats_event_t *local_event_queue = NULL;
-    mutex_t local_event_mutex;
     stats_event_t *event;
+    event_listener_t listener;
 
     INFO0 ("stats client starting");
 
+    event_queue_init (&listener.queue);
     /* increment the thread count */
     thread_mutex_lock(&_stats_mutex);
     _stats_threads++;
     stats_event_args (NULL, "stats", "%d", _stats_threads);
     thread_mutex_unlock(&_stats_mutex);
 
-    thread_mutex_create(&local_event_mutex);
+    thread_mutex_create (&(listener.mutex));
 
-    _atomic_get_and_register(&local_event_queue, &local_event_mutex);
+    _register_listener (&listener);
 
     while (_stats_running) {
-        thread_mutex_lock(&local_event_mutex);
-        event = _get_event_from_queue(&local_event_queue);
+        thread_mutex_lock (&listener.mutex);
+        event = _get_event_from_queue (&listener.queue);
+        thread_mutex_unlock (&listener.mutex);
         if (event != NULL) {
-            if (!_send_event_to_client(event, client)) {
+            if (_send_event_to_client(event, client) < 0) {
                 _free_event(event);
-                thread_mutex_unlock(&local_event_mutex);
                 break;
             }
             _free_event(event);
-        } else {
-            thread_mutex_unlock(&local_event_mutex);
-            thread_sleep (500000);
             continue;
         }
-                   
-        thread_mutex_unlock(&local_event_mutex);
+        thread_sleep (500000);
     }
 
     thread_mutex_lock(&_stats_mutex);
-    _unregister_listener (&local_event_queue);
+    _unregister_listener (&listener);
     _stats_threads--;
     stats_event_args (NULL, "stats", "%d", _stats_threads);
     thread_mutex_unlock(&_stats_mutex);
 
-    thread_mutex_destroy(&local_event_mutex);
+    thread_mutex_destroy (&listener.mutex);
     client_destroy (client);
     INFO0 ("stats client finished");
 
@@ -916,19 +888,18 @@
 void stats_get_xml(xmlDocPtr *doc, int show_hidden)
 {
     stats_event_t *event;
-    stats_event_t *queue;
+    event_queue_t queue;
     xmlNodePtr node, srcnode;
     source_xml_t *src_nodes = NULL;
     source_xml_t *next;
 
-    queue = NULL;
-    _dump_stats_to_queue(&queue);
+    event_queue_init (&queue);
+    _dump_stats_to_queue (&queue);
 
     *doc = xmlNewDoc("1.0");
     node = xmlNewDocNode(*doc, NULL, "icestats", NULL);
     xmlDocSetRootElement(*doc, node);
 
-
     event = _get_event_from_queue(&queue);
     while (event)
     {
@@ -961,7 +932,7 @@
 {
     int bytes;
     stats_event_t *event;
-    stats_event_t *queue;
+    event_queue_t queue;
     xmlDocPtr doc;
     xmlNodePtr node, srcnode;
     int len;
@@ -969,8 +940,8 @@
     source_xml_t *snd;
     source_xml_t *src_nodes = NULL;
 
-    queue = NULL;
-    _dump_stats_to_queue(&queue);
+    event_queue_init (&queue);
+    _dump_stats_to_queue (&queue);
 
     doc = xmlNewDoc("1.0");
     node = xmlNewDocNode(doc, NULL, "icestats", NULL);



More information about the commits mailing list