[xiph-commits] r10370 - icecast/trunk/icecast/src
karl at svn.xiph.org
karl at svn.xiph.org
Mon Nov 14 16:36:36 PST 2005
Author: karl
Date: 2005-11-14 16:36:34 -0800 (Mon, 14 Nov 2005)
New Revision: 10370
Modified:
icecast/trunk/icecast/src/stats.c
Log:
update queue handling for stats. This was slow when many stats were being
queued. These apply to both web interface requests and stats clients.
Modified: icecast/trunk/icecast/src/stats.c
===================================================================
--- icecast/trunk/icecast/src/stats.c 2005-11-15 00:29:24 UTC (rev 10369)
+++ icecast/trunk/icecast/src/stats.c 2005-11-15 00:36:34 UTC (rev 10370)
@@ -51,10 +51,18 @@
#define STATS_EVENT_REMOVE 4
#define STATS_EVENT_HIDDEN 5
+typedef struct _event_queue_tag
+{
+ volatile stats_event_t *head;
+ volatile stats_event_t **tail;
+} event_queue_t;
+
+#define event_queue_init(qp) { (qp)->head = NULL; (qp)->tail = &(qp)->head; }
+
typedef struct _event_listener_tag
{
- stats_event_t **queue;
- mutex_t *mutex;
+ event_queue_t queue;
+ mutex_t mutex;
struct _event_listener_tag *next;
} event_listener_t;
@@ -66,7 +74,7 @@
static stats_t _stats;
static mutex_t _stats_mutex;
-static volatile stats_event_t *_global_event_queue;
+static event_queue_t _global_event_queue;
mutex_t _global_event_mutex;
static volatile event_listener_t *_event_listeners;
@@ -77,10 +85,11 @@
static int _compare_source_stats(void *a, void *b, void *arg);
static int _free_stats(void *key);
static int _free_source_stats(void *key);
-static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue);
+static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
static stats_node_t *_find_node(avl_tree *tree, char *name);
static stats_source_t *_find_source(avl_tree *tree, char *source);
static void _free_event(stats_event_t *event);
+static stats_event_t *_get_event_from_queue (event_queue_t *queue);
/* simple helper function for creating an event */
@@ -106,19 +115,7 @@
static void queue_global_event (stats_event_t *event)
{
thread_mutex_lock(&_global_event_mutex);
- if (_global_event_queue == NULL)
- {
- _global_event_queue = event;
- }
- else
- {
- stats_event_t *node = (stats_event_t *)_global_event_queue;
- while (node->next)
- node = node->next;
- node->next = event;
- }
- /* DEBUG3("event added (%s, %s, %s)", event->source,
- event->name, event->value); */
+ _add_event_to_queue (event, &_global_event_queue);
thread_mutex_unlock(&_global_event_mutex);
}
@@ -134,7 +131,7 @@
thread_mutex_create(&_stats_mutex);
/* set up stats queues */
- _global_event_queue = NULL;
+ event_queue_init (&_global_event_queue);
thread_mutex_create(&_global_event_mutex);
/* fire off the stats thread */
@@ -145,7 +142,6 @@
void stats_shutdown()
{
int n;
- stats_event_t *event, *next;
if(!_stats_running) /* We can't shutdown if we're not running. */
return;
@@ -172,17 +168,17 @@
avl_tree_free(_stats.source_tree, _free_source_stats);
avl_tree_free(_stats.global_tree, _free_stats);
- event = (stats_event_t *)_global_event_queue;
- while(event) {
+ while (1)
+ {
+ stats_event_t *event = _get_event_from_queue (&_global_event_queue);
+ if (event == NULL) break;
if(event->source)
free(event->source);
if(event->value)
free(event->value);
if(event->name)
free(event->name);
- next = event->next;
free(event);
- event = next;
}
}
@@ -537,7 +533,7 @@
void stats_event_time (const char *mount, const char *name)
{
time_t now = time(NULL);
- struct tm local;
+ struct tm local;
char buffer[100];
localtime_r (&now, &local);
@@ -571,11 +567,10 @@
INFO0 ("stats thread started");
while (_stats_running) {
- if (_global_event_queue != NULL) {
+ if (_global_event_queue.head != NULL) {
/* grab the next event from the queue */
thread_mutex_lock(&_global_event_mutex);
- event = (stats_event_t *)_global_event_queue;
- _global_event_queue = event->next;
+ event = _get_event_from_queue (&_global_event_queue);
thread_mutex_unlock(&_global_event_mutex);
event->next = NULL;
@@ -593,9 +588,9 @@
listener = (event_listener_t *)_event_listeners;
while (listener) {
copy = _copy_event(event);
- thread_mutex_lock(listener->mutex);
- _add_event_to_queue(copy, listener->queue);
- thread_mutex_unlock(listener->mutex);
+ thread_mutex_lock (&listener->mutex);
+ _add_event_to_queue (copy, &listener->queue);
+ thread_mutex_unlock (&listener->mutex);
listener = listener->next;
}
@@ -614,16 +609,15 @@
}
/* you must have the _stats_mutex locked here */
-static void _unregister_listener(stats_event_t **queue)
+static void _unregister_listener(event_listener_t *listener)
{
event_listener_t **prev = (event_listener_t **)&_event_listeners,
*current = *prev;
while (current)
{
- if (current->queue == queue)
+ if (current == listener)
{
*prev = current->next;
- free (current);
break;
}
prev = ¤t->next;
@@ -632,25 +626,6 @@
}
-/* you must have the _stats_mutex locked here */
-static void _register_listener(stats_event_t **queue, mutex_t *mutex)
-{
- event_listener_t *node;
- event_listener_t *evli = (event_listener_t *)malloc(sizeof(event_listener_t));
-
- evli->queue = queue;
- evli->mutex = mutex;
- evli->next = NULL;
-
- if (_event_listeners == NULL) {
- _event_listeners = evli;
- } else {
- node = (event_listener_t *)_event_listeners;
- while (node->next) node = node->next;
- node->next = evli;
- }
-}
-
static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
{
stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
@@ -668,35 +643,32 @@
return event;
}
-static void _add_event_to_queue(stats_event_t *event, stats_event_t **queue)
-{
- stats_event_t *node;
- if (*queue == NULL) {
- *queue = event;
- } else {
- node = *queue;
- while (node->next) node = node->next;
- node->next = event;
- }
+static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
+{
+ *queue->tail = event;
+ queue->tail = (volatile stats_event_t **)&event->next;
}
-static stats_event_t *_get_event_from_queue(stats_event_t **queue)
+
+static stats_event_t *_get_event_from_queue (event_queue_t *queue)
{
- stats_event_t *event;
+ stats_event_t *event = NULL;
- if (*queue == NULL) return NULL;
+ if (queue && queue->head)
+ {
+ event = (stats_event_t *)queue->head;
+ queue->head = event->next;
+ if (queue->head == NULL)
+ queue->tail = &queue->head;
+ }
- event = *queue;
- *queue = (*queue)->next;
- event->next = NULL;
-
return event;
}
static int _send_event_to_client(stats_event_t *event, client_t *client)
{
- int ret = -1, len;
+ int len;
char buf [200];
/* send data to the client!!!! */
@@ -705,12 +677,15 @@
event->name ? event->name : "null",
event->value ? event->value : "null");
if (len > 0 && len < (int)sizeof (buf))
- ret = client_send_bytes (client, buf, len);
-
- return (ret == -1) ? 0 : 1;
+ {
+ client_send_bytes (client, buf, len);
+ if (client->con->error)
+ return -1;
+ }
+ return 0;
}
-void _dump_stats_to_queue(stats_event_t **queue)
+void _dump_stats_to_queue (event_queue_t *queue)
{
avl_node *node;
avl_node *node2;
@@ -750,7 +725,7 @@
** the queue for all new events atomically.
** note: mutex must already be created!
*/
-static void _atomic_get_and_register(stats_event_t **queue, mutex_t *mutex)
+static void _register_listener (event_listener_t *listener)
{
avl_node *node;
avl_node *node2;
@@ -765,7 +740,7 @@
node = avl_get_first(_stats.global_tree);
while (node) {
event = _make_event_from_node((stats_node_t *)node->key, NULL);
- _add_event_to_queue(event, queue);
+ _add_event_to_queue (event, &listener->queue);
node = avl_get_next(node);
}
@@ -777,7 +752,7 @@
node2 = avl_get_first(source->stats_tree);
while (node2) {
event = _make_event_from_node((stats_node_t *)node2->key, source->source);
- _add_event_to_queue(event, queue);
+ _add_event_to_queue (event, &listener->queue);
node2 = avl_get_next(node2);
}
@@ -786,7 +761,8 @@
}
/* now we register to receive future event notices */
- _register_listener(queue, mutex);
+ listener->next = (event_listener_t *)_event_listeners;
+ _event_listeners = listener;
thread_mutex_unlock(&_stats_mutex);
}
@@ -794,48 +770,44 @@
void *stats_connection(void *arg)
{
client_t *client = (client_t *)arg;
- stats_event_t *local_event_queue = NULL;
- mutex_t local_event_mutex;
stats_event_t *event;
+ event_listener_t listener;
INFO0 ("stats client starting");
+ event_queue_init (&listener.queue);
/* increment the thread count */
thread_mutex_lock(&_stats_mutex);
_stats_threads++;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex);
- thread_mutex_create(&local_event_mutex);
+ thread_mutex_create (&(listener.mutex));
- _atomic_get_and_register(&local_event_queue, &local_event_mutex);
+ _register_listener (&listener);
while (_stats_running) {
- thread_mutex_lock(&local_event_mutex);
- event = _get_event_from_queue(&local_event_queue);
+ thread_mutex_lock (&listener.mutex);
+ event = _get_event_from_queue (&listener.queue);
+ thread_mutex_unlock (&listener.mutex);
if (event != NULL) {
- if (!_send_event_to_client(event, client)) {
+ if (_send_event_to_client(event, client) < 0) {
_free_event(event);
- thread_mutex_unlock(&local_event_mutex);
break;
}
_free_event(event);
- } else {
- thread_mutex_unlock(&local_event_mutex);
- thread_sleep (500000);
continue;
}
-
- thread_mutex_unlock(&local_event_mutex);
+ thread_sleep (500000);
}
thread_mutex_lock(&_stats_mutex);
- _unregister_listener (&local_event_queue);
+ _unregister_listener (&listener);
_stats_threads--;
stats_event_args (NULL, "stats", "%d", _stats_threads);
thread_mutex_unlock(&_stats_mutex);
- thread_mutex_destroy(&local_event_mutex);
+ thread_mutex_destroy (&listener.mutex);
client_destroy (client);
INFO0 ("stats client finished");
@@ -916,19 +888,18 @@
void stats_get_xml(xmlDocPtr *doc, int show_hidden)
{
stats_event_t *event;
- stats_event_t *queue;
+ event_queue_t queue;
xmlNodePtr node, srcnode;
source_xml_t *src_nodes = NULL;
source_xml_t *next;
- queue = NULL;
- _dump_stats_to_queue(&queue);
+ event_queue_init (&queue);
+ _dump_stats_to_queue (&queue);
*doc = xmlNewDoc("1.0");
node = xmlNewDocNode(*doc, NULL, "icestats", NULL);
xmlDocSetRootElement(*doc, node);
-
event = _get_event_from_queue(&queue);
while (event)
{
@@ -961,7 +932,7 @@
{
int bytes;
stats_event_t *event;
- stats_event_t *queue;
+ event_queue_t queue;
xmlDocPtr doc;
xmlNodePtr node, srcnode;
int len;
@@ -969,8 +940,8 @@
source_xml_t *snd;
source_xml_t *src_nodes = NULL;
- queue = NULL;
- _dump_stats_to_queue(&queue);
+ event_queue_init (&queue);
+ _dump_stats_to_queue (&queue);
doc = xmlNewDoc("1.0");
node = xmlNewDocNode(doc, NULL, "icestats", NULL);
More information about the commits
mailing list