[xiph-commits] r8090 - icecast/trunk/icecast/src

karl at motherfish-iii.xiph.org karl at motherfish-iii.xiph.org
Mon Oct 25 07:03:43 PDT 2004


Author: karl
Date: 2004-10-25 07:03:42 -0700 (Mon, 25 Oct 2004)
New Revision: 8090

Modified:
   icecast/trunk/icecast/src/source.c
   icecast/trunk/icecast/src/stats.c
   icecast/trunk/icecast/src/stats.h
Log:
The _inc/_dec routines can race causing incorrect values as they don't account
for unprocessed stat events.  Here I push the actual calculations to the stats
thread. The API is maintained however all stats for a specific source can be
dropped with one call now.


Modified: icecast/trunk/icecast/src/source.c
===================================================================
--- icecast/trunk/icecast/src/source.c	2004-10-25 10:03:48 UTC (rev 8089)
+++ icecast/trunk/icecast/src/source.c	2004-10-25 14:03:42 UTC (rev 8090)
@@ -745,7 +745,7 @@
 
     /* delete this sources stats */
     stats_event_dec(NULL, "sources");
-    stats_event(source->mount, "listeners", NULL);
+    stats_event(source->mount, NULL, NULL);
 
     /* we don't remove the source from the tree here, it may be a relay and
        therefore reserved */

Modified: icecast/trunk/icecast/src/stats.c
===================================================================
--- icecast/trunk/icecast/src/stats.c	2004-10-25 10:03:48 UTC (rev 8089)
+++ icecast/trunk/icecast/src/stats.c	2004-10-25 14:03:42 UTC (rev 8090)
@@ -35,11 +35,20 @@
 #include "client.h"
 #include "stats.h"
 #include "xslt.h"
+#define CATMODULE "stats"
+#include "logging.h"
 
 #ifdef _WIN32
 #define vsnprintf _vsnprintf
+#define snprintf _snprintf
 #endif
 
+#define STATS_EVENT_SET     0
+#define STATS_EVENT_INC     1
+#define STATS_EVENT_DEC     2
+#define STATS_EVENT_ADD     3
+#define STATS_EVENT_REMOVE  4
+
 typedef struct _event_listener_tag
 {
     stats_event_t **queue;
@@ -48,22 +57,19 @@
     struct _event_listener_tag *next;
 } event_listener_t;
 
-int _stats_running = 0;
-thread_type *_stats_thread_id;
-int _stats_threads = 0;
+static volatile int _stats_running = 0;
+static thread_type *_stats_thread_id;
+static volatile int _stats_threads = 0;
 
-stats_t _stats;
-mutex_t _stats_mutex;
+static stats_t _stats;
+static mutex_t _stats_mutex;
 
-stats_event_t *_global_event_queue;
+static volatile stats_event_t *_global_event_queue;
 mutex_t _global_event_mutex;
 
-cond_t _event_signal_cond;
+static volatile event_listener_t *_event_listeners;
 
-event_listener_t *_event_listeners;
 
-
-
 static void *_stats_thread(void *arg);
 static int _compare_stats(void *a, void *b, void *arg);
 static int _compare_source_stats(void *a, void *b, void *arg);
@@ -74,6 +80,46 @@
 static stats_source_t *_find_source(avl_tree *tree, char *source);
 static void _free_event(stats_event_t *event);
 
+
+/* simple helper function for creating an event */
+static stats_event_t *build_event (const char *source, const char *name, const char *value)
+{
+    stats_event_t *event;
+
+    event = (stats_event_t *)calloc(1, sizeof(stats_event_t));
+    if (event)
+    {
+        if (source)
+            event->source = (char *)strdup(source);
+        if (name)
+            event->name = (char *)strdup(name);
+        if (value)
+            event->value = (char *)strdup(value);
+        else
+            event->action = STATS_EVENT_REMOVE;
+    }
+    return event;
+}
+
+static void queue_global_event (stats_event_t *event)
+{
+    thread_mutex_lock(&_global_event_mutex);
+    if (_global_event_queue == NULL)
+    {
+        _global_event_queue = event;
+    }
+    else
+    {
+        stats_event_t *node = (stats_event_t *)_global_event_queue;
+        while (node->next)
+            node = node->next;
+        node->next = event;
+    }
+    /* DEBUG3("event added (%s, %s, %s)", event->source,
+         event->name, event->value); */
+    thread_mutex_unlock(&_global_event_mutex);
+}
+
 void stats_initialize()
 {
     _event_listeners = NULL;
@@ -85,9 +131,6 @@
     /* set up global mutex */
     thread_mutex_create(&_stats_mutex);
 
-    /* set up event signaler */
-    thread_cond_create(&_event_signal_cond);
-
     /* set up stats queues */
     _global_event_queue = NULL;
     thread_mutex_create(&_global_event_mutex);
@@ -116,19 +159,18 @@
         n = _stats_threads;
         thread_mutex_unlock(&_stats_mutex);
     } while (n > 0);
+    INFO0("stats thread finished");
 
     /* free the queues */
 
     /* destroy the queue mutexes */
     thread_mutex_destroy(&_global_event_mutex);
 
-    /* tear it all down */
-    thread_cond_destroy(&_event_signal_cond);
     thread_mutex_destroy(&_stats_mutex);
     avl_tree_free(_stats.source_tree, _free_source_stats);
     avl_tree_free(_stats.global_tree, _free_stats);
 
-    event = _global_event_queue;
+    event = (stats_event_t *)_global_event_queue;
     while(event) {
         if(event->source)
             free(event->source);
@@ -155,43 +197,36 @@
     return NULL;
 }
 
-void stats_event(char *source, char *name, char *value)
+/* simple name=tag stat create/update */
+void stats_event(const char *source, const char *name, const char *value)
 {
-    stats_event_t *node;
     stats_event_t *event;
 
-    if (name == NULL || strcmp(name, "") == 0) return;
+    event = build_event (source, name, value);
+    if (event)
+        queue_global_event (event);
+}
 
-    /* build event */
-    event = (stats_event_t *)malloc(sizeof(stats_event_t));
-    event->source = NULL;
-    if (source != NULL) event->source = (char *)strdup(source);
-    event->name = (char *)strdup(name);
-    event->value = NULL;
-    event->next = NULL;
-    if (value != NULL) event->value = (char *)strdup(value);
 
-    /* queue event */
-    thread_mutex_lock(&_global_event_mutex);
-    if (_global_event_queue == NULL) {
-        _global_event_queue = event;
-    } else {
-        node = _global_event_queue;
-        while (node->next) node = node->next;
-        node->next = event;
-    }
-    thread_mutex_unlock(&_global_event_mutex);
-}
-
-void stats_event_args(char *source, char *name, char *format, ...)
+/* printf style formatting for stat create/update */
+void stats_event_args(const char *source, char *name, char *format, ...)
 {
     char buf[1024];
     va_list val;
-    
+    int ret;
+
+    if (name == NULL)
+        return;
     va_start(val, format);
-    vsnprintf(buf, 1024, format, val);
+    ret = vsnprintf(buf, 1024, format, val);
     va_end(val);
 
+    if (ret < 0 || (unsigned int)ret >= sizeof (buf))
+    {
+        WARN2 ("problem with formatting %s stat %s",
+                source==NULL ? "global" : source, name);
+        return;
+    }
     stats_event(source, name, buf);
 }
 
@@ -223,56 +258,42 @@
 {
     return(_get_stats(source, name));
 }
-void stats_event_inc(char *source, char *name)
+
+/* increase the value in the provided stat by 1 */
+void stats_event_inc(const char *source, const char *name)
 {
-    char *old_value;
-    int new_value;
-    
-    old_value = _get_stats(source, name);
-    if (old_value != NULL) {
-        new_value = atoi(old_value);
-        free(old_value);
-        new_value++;
-    } else {
-        new_value = 1;
+    stats_event_t *event = build_event (source, name, NULL);
+    /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
+    if (event)
+    {
+        event->action = STATS_EVENT_INC;
+        queue_global_event (event);
     }
-
-    stats_event_args(source, name, "%d", new_value);
 }
 
-void stats_event_add(char *source, char *name, unsigned long value)
+void stats_event_add(const char *source, const char *name, unsigned long value)
 {
-    char *old_value;
-    unsigned long new_value;
-
-    old_value = _get_stats(source, name);
-    if (old_value != NULL) {
-        new_value = atol(old_value);
-        free(old_value);
-        new_value += value;
-    } else {
-        new_value = value;
+    stats_event_t *event = build_event (source, name, NULL);
+    /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
+    if (event)
+    {
+        event->value = malloc (16);
+        snprintf (event->value, 16, "%ld", value);
+        event->action = STATS_EVENT_ADD;
+        queue_global_event (event);
     }
-
-    stats_event_args(source, name, "%ld", new_value);
 }
 
-void stats_event_dec(char *source, char *name)
+/* decrease the value in the provided stat by 1 */
+void stats_event_dec(const char *source, const char *name)
 {
-    char *old_value;
-    int new_value;
-
-    old_value = _get_stats(source, name);
-    if (old_value != NULL) {
-        new_value = atoi(old_value);
-        free(old_value);
-        new_value--;
-        if (new_value < 0) new_value = 0;
-    } else {
-        new_value = 0;
+    /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
+    stats_event_t *event = build_event (source, name, NULL);
+    if (event)
+    {
+        event->action = STATS_EVENT_DEC;
+        queue_global_event (event);
     }
-
-    stats_event_args(source, name, "%d", new_value);
 }
 
 /* note: you must call this function only when you have exclusive access
@@ -330,12 +351,13 @@
 
 static stats_event_t *_copy_event(stats_event_t *event)
 {
-    stats_event_t *copy = (stats_event_t *)malloc(sizeof(stats_event_t));
+    stats_event_t *copy = (stats_event_t *)calloc(1, sizeof(stats_event_t));
     if (event->source) 
         copy->source = (char *)strdup(event->source);
     else
         copy->source = NULL;
-    copy->name = (char *)strdup(event->name);
+    if (event->name)
+        copy->name = (char *)strdup(event->name);
     if (event->value)
         copy->value = (char *)strdup(event->value);
     else
@@ -345,103 +367,167 @@
     return copy;
 }
 
+
+/* helper to apply specialised changes to a stats node */
+static void modify_node_event (stats_node_t *node, stats_event_t *event)
+{
+    char *str;
+
+    if (event->action != STATS_EVENT_SET)
+    {
+        int value = 0;
+
+        switch (event->action)
+        {
+            case STATS_EVENT_INC:
+                value = atoi (node->value)+1;
+                break;
+            case STATS_EVENT_DEC:
+                value = atoi (node->value)-1;
+                break;
+            case STATS_EVENT_ADD:
+                value = atoi (node->value)+atoi (event->value);
+                break;
+            default:
+                break;
+        }
+        str = malloc (16);
+        snprintf (str, 16, "%d", value);
+    }
+    else
+        str = (char *)strdup (event->value);
+    free (node->value);
+    node->value = str;
+    /* DEBUG3 ("update node %s \"%s\" (%d)", node->name, node->value, event->action); */
+}
+
+
+static void process_global_event (stats_event_t *event)
+{
+    stats_node_t *node;
+
+    /* DEBUG3("global event %s %s %d", event->name, event->value, event->action); */
+    if (event->action == STATS_EVENT_REMOVE)
+    {
+        /* we're deleting */
+        node = _find_node(_stats.global_tree, event->name);
+        if (node != NULL)
+            avl_delete(_stats.global_tree, (void *)node, _free_stats);
+        return;
+    }
+    node = _find_node(_stats.global_tree, event->name);
+    if (node)
+    {
+        modify_node_event (node, event);
+    }
+    else
+    {
+        /* add node */
+        node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
+        node->name = (char *)strdup(event->name);
+        node->value = (char *)strdup(event->value);
+
+        avl_insert(_stats.global_tree, (void *)node);
+    }
+}
+
+
+static void process_source_event (stats_event_t *event)
+{
+    stats_source_t *snode = _find_source(_stats.source_tree, event->source);
+    if (snode == NULL)
+    {
+        if (event->action == STATS_EVENT_REMOVE)
+            return;
+        snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
+        if (snode == NULL)
+            return;
+        DEBUG1 ("new source stat %s", event->source);
+        snode->source = (char *)strdup(event->source);
+        snode->stats_tree = avl_tree_new(_compare_stats, NULL);
+
+        avl_insert(_stats.source_tree, (void *)snode);
+    }
+    if (event->name)
+    {
+        stats_node_t *node = _find_node(snode->stats_tree, event->name);
+        if (node == NULL)
+        {
+            if (event->action == STATS_EVENT_REMOVE)
+                return;
+            /* adding node */
+            if (event->value)
+            {
+                DEBUG2 ("new node %s (%s)", event->name, event->value);
+                node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
+                node->name = (char *)strdup(event->name);
+                node->value = (char *)strdup(event->value);
+
+                avl_insert(snode->stats_tree, (void *)node);
+            }
+            return;
+        }
+        if (event->action == STATS_EVENT_REMOVE)
+        {
+            DEBUG1 ("delete node %s", event->name);
+            avl_delete(snode->stats_tree, (void *)node, _free_stats);
+            return;
+        }
+        modify_node_event (node, event);
+        return;
+    }
+    if (event->action == STATS_EVENT_REMOVE)
+    {
+        DEBUG1 ("delete source node %s", event->source);
+        avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
+    }
+}
+
+
 static void *_stats_thread(void *arg)
 {
     stats_event_t *event;
     stats_event_t *copy;
-    stats_node_t *node;
-    stats_node_t *anode;
-    stats_source_t *snode;
-    stats_source_t *asnode;
     event_listener_t *listener;
-    avl_node *avlnode;
 
+    stats_event (NULL, "server", ICECAST_VERSION_STRING);
+
+    /* global currently active stats */
+    stats_event (NULL, "clients", "0");
+    stats_event (NULL, "connections", "0");
+    stats_event (NULL, "sources", "0");
+    stats_event (NULL, "stats", "0");
+
+    /* global accumulating stats */
+    stats_event (NULL, "client_connections", "0");
+    stats_event (NULL, "source_client_connections", "0");
+    stats_event (NULL, "source_relay_connections", "0");
+    stats_event (NULL, "source_total_connections", "0");
+    stats_event (NULL, "stats_connections", "0");
+
+    INFO0 ("stats thread started");
     while (_stats_running) {
-        thread_mutex_lock(&_global_event_mutex);
         if (_global_event_queue != NULL) {
             /* grab the next event from the queue */
-            event = _global_event_queue;
+            thread_mutex_lock(&_global_event_mutex);
+            event = (stats_event_t *)_global_event_queue;
             _global_event_queue = event->next;
+            thread_mutex_unlock(&_global_event_mutex);
+
             event->next = NULL;
             thread_mutex_unlock(&_global_event_mutex);
 
             thread_mutex_lock(&_stats_mutex);
-            if (event->source == NULL) {
-                /* we have a global event */
-                if (event->value != NULL) {
-                    /* adding/updating */
-                    node = _find_node(_stats.global_tree, event->name);
-                    if (node == NULL) {
-                        /* add node */
-                        anode = (stats_node_t *)malloc(sizeof(stats_node_t));
-                        anode->name = (char *)strdup(event->name);
-                        anode->value = (char *)strdup(event->value);
 
-                        avl_insert(_stats.global_tree, (void *)anode);
-                    } else {
-                        /* update node */
-                        free(node->value);
-                        node->value = (char *)strdup(event->value);
-                    }
-
-                } else {
-                    /* we're deleting */
-                    node = _find_node(_stats.global_tree, event->name);
-                    if (node != NULL)
-                        avl_delete(_stats.global_tree, (void *)node, _free_stats);
-                }
-            } else {
-                /* we have a source event */
-
-                snode = _find_source(_stats.source_tree, event->source);
-                if (snode != NULL) {
-                    /* this is a source we already have a tree for */
-                    if (event->value != NULL) {
-                        /* we're add/updating */
-                        node = _find_node(snode->stats_tree, event->name);
-                        if (node == NULL) {
-                            /* adding node */
-                            anode = (stats_node_t *)malloc(sizeof(stats_node_t));
-                            anode->name = (char *)strdup(event->name);
-                            anode->value = (char *)strdup(event->value);
-
-                            avl_insert(snode->stats_tree, (void *)anode);
-                        } else {
-                            /* updating node */
-                            free(node->value);
-                            node->value = (char *)strdup(event->value);
-                        }
-                    } else {
-                        /* we're deleting */
-                        node = _find_node(snode->stats_tree, event->name);
-                        if (node != NULL) {
-                            avl_delete(snode->stats_tree, (void *)node, _free_stats);
-
-                                avlnode = avl_get_first(snode->stats_tree);
-                            if (avlnode == NULL) {
-                                avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
-                            }
-                        }
-                    }
-                } else {
-                    /* this is a new source */
-                    asnode = (stats_source_t *)malloc(sizeof(stats_source_t));
-                    asnode->source = (char *)strdup(event->source);
-                    asnode->stats_tree = avl_tree_new(_compare_stats, NULL);
-
-                    anode = (stats_node_t *)malloc(sizeof(stats_node_t));
-                    anode->name = (char *)strdup(event->name);
-                    anode->value = (char *)strdup(event->value);
-                    
-                    avl_insert(asnode->stats_tree, (void *)anode);
-
-                    avl_insert(_stats.source_tree, (void *)asnode);
-                }
-            }
+            /* check if we are dealing with a global or source event */
+            if (event->source == NULL)
+                process_global_event (event);
+            else
+                process_source_event (event);
             
             /* now we have an event that's been processed into the running stats */
             /* this event should get copied to event listeners' queues */
-            listener = _event_listeners;
+            listener = (event_listener_t *)_event_listeners;
             while (listener) {
                 copy = _copy_event(event);
                 thread_mutex_lock(listener->mutex);
@@ -450,23 +536,17 @@
 
                 listener = listener->next;
             }
-            thread_cond_broadcast(&_event_signal_cond);
 
             /* now we need to destroy the event */
             _free_event(event);
 
             thread_mutex_unlock(&_stats_mutex);
             continue;
-        } else {
-            thread_mutex_unlock(&_global_event_mutex);
         }
 
         thread_sleep(300000);
     }
 
-    /* wake the other threads so they can shut down cleanly */
-    thread_cond_broadcast(&_event_signal_cond);
-
     return NULL;
 }
 
@@ -483,7 +563,7 @@
     if (_event_listeners == NULL) {
         _event_listeners = evli;
     } else {
-        node = _event_listeners;
+        node = (event_listener_t *)_event_listeners;
         while (node->next) node = node->next;
         node->next = evli;
     }
@@ -499,6 +579,7 @@
         event->source = NULL;
     event->name = (char *)strdup(node->name);
     event->value = (char *)strdup(node->value);
+    event->action = STATS_EVENT_SET;
     event->next = NULL;
 
     return event;
@@ -535,7 +616,10 @@
     int ret;
 
     /* send data to the client!!!! */
-    ret = sock_write(con->sock, "EVENT %s %s %s\n", (event->source != NULL) ? event->source : "global", event->name, event->value ? event->value : "null");
+    ret = sock_write(con->sock, "EVENT %s %s %s\n",
+            (event->source != NULL) ? event->source : "global",
+            event->name ? event->name : "null",
+            event->value ? event->value : "null");
 
     return (ret == -1) ? 0 : 1;
 }
@@ -649,7 +733,7 @@
             _free_event(event);
         } else {
             thread_mutex_unlock(&local_event_mutex);
-            thread_cond_wait(&_event_signal_cond);
+            thread_sleep (500000);
             continue;
         }
                    
@@ -806,8 +890,7 @@
     if (bytes > 0) client->con->sent_bytes += bytes;
     else goto send_error;
 
-    bytes = sock_write_bytes(client->con->sock, buff, len);
-    if (bytes > 0) client->con->sent_bytes += bytes;
+    bytes = client_send_bytes (client, buff, (unsigned)len);
 
  send_error:
     while (src_nodes) {

Modified: icecast/trunk/icecast/src/stats.h
===================================================================
--- icecast/trunk/icecast/src/stats.h	2004-10-25 10:03:48 UTC (rev 8089)
+++ icecast/trunk/icecast/src/stats.h	2004-10-25 14:03:42 UTC (rev 8090)
@@ -38,6 +38,7 @@
     char *source;
     char *name;
     char *value;
+    int  action;
 
     struct _stats_event_tag *next;
 } stats_event_t;
@@ -77,11 +78,11 @@
 
 stats_t *stats_get_stats();
 
-void stats_event(char *source, char *name, char *value);
-void stats_event_args(char *source, char *name, char *format, ...);
-void stats_event_inc(char *source, char *name);
-void stats_event_add(char *source, char *name, unsigned long value);
-void stats_event_dec(char *source, char *name);
+void stats_event(const char *source, const char *name, const char *value);
+void stats_event_args(const char *source, char *name, char *format, ...);
+void stats_event_inc(const char *source, const char *name);
+void stats_event_add(const char *source, const char *name, unsigned long value);
+void stats_event_dec(const char *source, const char *name);
 
 void *stats_connection(void *arg);
 void *stats_callback(void *arg);



More information about the commits mailing list