[xiph-commits] r8070 - icecast/trunk/icecast/src

karl at motherfish-iii.xiph.org karl at motherfish-iii.xiph.org
Sat Oct 23 17:34:15 PDT 2004


Author: karl
Date: 2004-10-23 17:34:15 -0700 (Sat, 23 Oct 2004)
New Revision: 8070

Modified:
   icecast/trunk/icecast/src/fserve.c
   icecast/trunk/icecast/src/fserve.h
Log:
fix a busy CPU case when slow and fast file serving clients are connected at
the same time.  Flag clients on return from select/poll and only process those.
Also fix a rare race which could leave clients in pending


Modified: icecast/trunk/icecast/src/fserve.c
===================================================================
--- icecast/trunk/icecast/src/fserve.c	2004-10-23 15:45:40 UTC (rev 8069)
+++ icecast/trunk/icecast/src/fserve.c	2004-10-24 00:34:15 UTC (rev 8070)
@@ -60,22 +60,22 @@
 #define MIMETYPESFILE "/etc/mime.types"
 #endif
 
-static avl_tree *client_tree;
-static avl_tree *pending_tree;
+static fserve_t *active_list = NULL;
+static volatile fserve_t *pending_list = NULL;
+
+static mutex_t pending_lock;
 static avl_tree *mimetypes = NULL;
 
-static cond_t fserv_cond;
 static thread_type *fserv_thread;
-static int run_fserv;
-static int fserve_clients;
+static int run_fserv = 0;
+static unsigned int fserve_clients;
 static int client_tree_changed=0;
 
 #ifdef HAVE_POLL
 static struct pollfd *ufds = NULL;
-static int ufdssize = 0;
 #else
 static fd_set fds;
-static int fd_max = 0;
+static int fd_max = -1;
 #endif
 
 typedef struct {
@@ -83,9 +83,6 @@
     char *type;
 } mime_type;
 
-/* avl tree helper */
-static int _compare_clients(void *compare_arg, void *a, void *b);
-static int _remove_client(void *key);
 static int _free_client(void *key);
 static int _delete_mapping(void *mapping);
 static void *fserv_thread_function(void *arg);
@@ -103,9 +100,7 @@
 
     create_mime_mappings(MIMETYPESFILE);
 
-    client_tree = avl_tree_new(_compare_clients, NULL);
-    pending_tree = avl_tree_new(_compare_clients, NULL);
-    thread_cond_create(&fserv_cond);
+    thread_mutex_create (&pending_lock);
 
     run_fserv = 1;
 
@@ -115,207 +110,205 @@
 
 void fserve_shutdown(void)
 {
-    ice_config_t *config = config_get_config();
-    int serve = config->fileserve;
-
-    config_release_config();
-
-    if(!serve)
-        return;
-
     if(!run_fserv)
         return;
 
-    avl_tree_free(mimetypes, _delete_mapping);
-
     run_fserv = 0;
-    thread_cond_signal(&fserv_cond);
     thread_join(fserv_thread);
-
-    thread_cond_destroy(&fserv_cond);
-    avl_tree_free(client_tree, _free_client);
-    avl_tree_free(pending_tree, _free_client);
+    INFO0("file serving thread stopped");
+    avl_tree_free(mimetypes, _delete_mapping);
 }
 
-static void wait_for_fds() {
-    avl_node *client_node;
-    fserve_t *client;
-    int i;
-
-    while(run_fserv) {
 #ifdef HAVE_POLL
-        if(client_tree_changed) {
-            client_tree_changed = 0;
-            i = 0;
-            ufdssize = fserve_clients;
-            ufds = realloc(ufds, ufdssize * sizeof(struct pollfd));
-            avl_tree_rlock(client_tree);
-            client_node = avl_get_first(client_tree);
-            while(client_node) {
-                client = client_node->key;
-                ufds[i].fd = client->client->con->sock;
-                ufds[i].events = POLLOUT;
-                client_node = avl_get_next(client_node);
-            }
-            avl_tree_unlock(client_tree);
-        }
+int fserve_client_waiting (void)
+{
+    fserve_t *fclient;
+    unsigned int i = 0;
 
-        if(poll(ufds, ufdssize, 200) > 0)
-            return;
+    /* only rebuild ufds if there are clients added/removed */
+    if (client_tree_changed)
+    {
+        client_tree_changed = 0;
+        ufds = realloc(ufds, fserve_clients * sizeof(struct pollfd));
+        fclient = active_list;
+        while (fclient)
+        {
+            ufds[i].fd = fclient->client->con->sock;
+            ufds[i].events = POLLOUT;
+            ufds[i].revents = 0;
+            fclient = fclient->next;
+            i++;
+        }
+    }
+    if (poll(ufds, fserve_clients, 200) > 0)
+    {
+        /* mark any clients that are ready */
+        fclient = active_list;
+        for (i=0; i<fserve_clients; i++)
+        {
+            if (ufds[i].revents & (POLLOUT|POLLHUP|POLLERR))
+                fclient->ready = 1;
+            fclient = fclient->next;
+        }
+        return 1;
+    }
+    return 0;
+}
 #else
+int fserve_client_waiting (void)
+{
+    fserve_t *fclient;
+    fd_set realfds;
+
+    /* only rebuild fds if there are clients added/removed */
+    if(client_tree_changed) {
+        client_tree_changed = 0;
+        FD_ZERO(&fds);
+        fd_max = -1;
+        fclient = active_list;
+        while (fclient) {
+            FD_SET (fclient->client->con->sock, &fds);
+            if (fclient->client->con->sock > fd_max)
+                fd_max = fclient->client->con->sock;
+            fclient = fclient->next;
+        }
+    }
+    /* hack for windows, select needs at least 1 descriptor */
+    if (fd_max == -1)
+        thread_sleep (200000);
+    else
+    {
         struct timeval tv;
-        fd_set realfds;
         tv.tv_sec = 0;
         tv.tv_usec = 200000;
-        if(client_tree_changed) {
-            client_tree_changed = 0;
-            i=0;
-            FD_ZERO(&fds);
-            fd_max = 0;
-            avl_tree_rlock(client_tree);
-            client_node = avl_get_first(client_tree);
-            while(client_node) {
-                client = client_node->key;
-                FD_SET(client->client->con->sock, &fds);
-                if(client->client->con->sock > fd_max)
-                    fd_max = client->client->con->sock;
-                client_node = avl_get_next(client_node);
-            }
-            avl_tree_unlock(client_tree);
-        }
-
+        /* make a duplicate of the set so we do not have to rebuild it
+         * each time around */
         memcpy(&realfds, &fds, sizeof(fd_set));
         if(select(fd_max+1, NULL, &realfds, NULL, &tv) > 0)
-            return;
+        {
+            /* mark any clients that are ready */
+            fclient = active_list;
+            while (fclient)
+            {
+                if (FD_ISSET (fclient->client->con->sock, &realfds))
+                    fclient->ready = 1;
+                fclient = fclient->next;
+            }
+            return 1;
+        }
+    }
+    return 0;
+}
 #endif
-        else {
-            avl_tree_rlock(pending_tree);
-            client_node = avl_get_first(pending_tree);
-            avl_tree_unlock(pending_tree);
-            if(client_node)
-                return;
+
+static void wait_for_fds() {
+    fserve_t *fclient;
+
+    while (run_fserv)
+    {
+        /* add any new clients here */
+        if (pending_list)
+        {
+            thread_mutex_lock (&pending_lock);
+
+            fclient = (fserve_t*)pending_list;
+            while (fclient)
+            {
+                fserve_t *to_move = fclient;
+                fclient = fclient->next;
+                to_move->next = active_list;
+                active_list = to_move;
+                client_tree_changed = 1;
+                fserve_clients++;
+                stats_event_inc(NULL, "clients");
+            }
+            pending_list = NULL;
+            thread_mutex_unlock (&pending_lock);
         }
+        /* drop out of here is someone is ready */
+        if (fserve_client_waiting())
+           break;
     }
 }
 
 static void *fserv_thread_function(void *arg)
 {
-    avl_node *client_node, *pending_node;
-    fserve_t *client;
+    fserve_t *fclient, **trail;
     int sbytes, bytes;
 
+    INFO0("file serving thread started");
     while (run_fserv) {
-        avl_tree_rlock(client_tree);
-
-        client_node = avl_get_first(client_tree);
-        if(!client_node) {
-            avl_tree_rlock(pending_tree);
-            pending_node = avl_get_first(pending_tree);
-            if(!pending_node) {
-                /* There are no current clients. Wait until there are... */
-                avl_tree_unlock(pending_tree);
-                avl_tree_unlock(client_tree);
-                thread_cond_wait(&fserv_cond);
-                continue;
-            }
-            avl_tree_unlock(pending_tree);
-        }
-
-        /* This isn't hugely efficient, but it'll do for now */
-        avl_tree_unlock(client_tree);
         wait_for_fds();
 
-        avl_tree_rlock(client_tree);
-        client_node = avl_get_first(client_tree);
+        fclient = active_list;
+        trail = &active_list;
 
-        while(client_node) {
-            avl_node_wlock(client_node);
+        while (fclient)
+        {
+            /* process this client, if it is ready */
+            if (fclient->ready)
+            {
+                fclient->ready = 0;
+                if(fclient->offset >= fclient->datasize) {
+                    /* Grab a new chunk */
+                    bytes = fread(fclient->buf, 1, BUFSIZE, fclient->file);
+                    if (bytes == 0)
+                    {
+                        fserve_t *to_go = fclient;
+                        fclient = fclient->next;
+                        *trail = fclient;
+                        _free_client (to_go);
+                        fserve_clients--;
+                        client_tree_changed = 1;
+                        continue;
+                    }
+                    fclient->offset = 0;
+                    fclient->datasize = bytes;
+                }
 
-            client = (fserve_t *)client_node->key;
+                /* Now try and send current chunk. */
+                sbytes = client_send_bytes (fclient->client, 
+                        &fclient->buf[fclient->offset], 
+                        fclient->datasize - fclient->offset);
 
-            if(client->offset >= client->datasize) {
-                /* Grab a new chunk */
-                bytes = fread(client->buf, 1, BUFSIZE, client->file);
-                if(bytes <= 0) {
-                    client->client->con->error = 1;
-                    avl_node_unlock(client_node);
-                    client_node = avl_get_next(client_node);
+                /* TODO: remove clients if they take too long. */
+                if(sbytes > 0) {
+                    fclient->offset += sbytes;
+                }
+
+                if (fclient->client->con->error)
+                {
+                    fserve_t *to_go = fclient;
+                    fclient = fclient->next;
+                    *trail = fclient;
+                    fserve_clients--;
+                    _free_client (to_go);
+                    client_tree_changed = 1;
                     continue;
                 }
-                client->offset = 0;
-                client->datasize = bytes;
             }
-
-            /* Now try and send current chunk. */
-            sbytes = client_send_bytes (client->client, 
-                    &client->buf[client->offset], 
-                    client->datasize - client->offset);
-
-            /* TODO: remove clients if they take too long. */
-            if(sbytes >= 0) {
-                client->offset += sbytes;
-            }
-            avl_node_unlock(client_node);
-            client_node = avl_get_next(client_node);
+            trail = &fclient->next;
+            fclient = fclient->next;
         }
-
-        avl_tree_unlock(client_tree);
-
-        /* Now we need a write lock instead, to delete done clients. */
-        avl_tree_wlock(client_tree);
-
-        client_node = avl_get_first(client_tree);
-        while(client_node) {
-            client = (fserve_t *)client_node->key;
-            if(client->client->con->error) {
-                fserve_clients--;
-                client_node = avl_get_next(client_node);
-                avl_delete(client_tree, (void *)client, _free_client);
-                client_tree_changed = 1;
-                continue;
-            }
-            client_node = avl_get_next(client_node);
-        }
-
-        avl_tree_wlock(pending_tree);
-
-        /* And now insert new clients. */
-        client_node = avl_get_first(pending_tree);
-        while(client_node) {
-            client = (fserve_t *)client_node->key;
-            avl_insert(client_tree, client);
-            client_tree_changed = 1;
-            fserve_clients++;
-            stats_event_inc(NULL, "clients");
-            client_node = avl_get_next(client_node);
-
-        }
-
-        /* clear pending */
-        while(avl_get_first(pending_tree)) {
-            avl_delete(pending_tree, avl_get_first(pending_tree)->key, 
-                    _remove_client);
-        }
-
-        avl_tree_unlock(pending_tree);
-        avl_tree_unlock(client_tree);
     }
 
     /* Shutdown path */
+    thread_mutex_lock (&pending_lock);
+    while (pending_list)
+    {
+        fserve_t *to_go = (fserve_t *)pending_list;
+        pending_list = to_go->next;
+        _free_client (to_go);
+    }
+    thread_mutex_unlock (&pending_lock);
 
-    avl_tree_wlock(pending_tree);
-    while(avl_get_first(pending_tree))
-        avl_delete(pending_tree, avl_get_first(pending_tree)->key, 
-                _free_client);
-    avl_tree_unlock(pending_tree);
+    while (active_list)
+    {
+        fserve_t *to_go = active_list;
+        active_list = to_go->next;
+        _free_client (to_go);
+    }
 
-    avl_tree_wlock(client_tree);
-    while(avl_get_first(client_tree))
-        avl_delete(client_tree, avl_get_first(client_tree)->key, 
-                _free_client);
-    avl_tree_unlock(client_tree);
-
-    thread_exit(0);
     return NULL;
 }
 
@@ -378,6 +371,7 @@
     client->client = httpclient;
     client->offset = 0;
     client->datasize = 0;
+    client->ready = 0;
     client->buf = malloc(BUFSIZE);
 
     global_lock();
@@ -405,34 +399,14 @@
     sock_set_blocking(client->client->con->sock, SOCK_NONBLOCK);
     sock_set_nodelay(client->client->con->sock);
 
-    avl_tree_wlock(pending_tree);
-    avl_insert(pending_tree, client);
-    avl_tree_unlock(pending_tree);
+    thread_mutex_lock (&pending_lock);
+    client->next = (fserve_t *)pending_list;
+    pending_list = client;
+    thread_mutex_unlock (&pending_lock);
 
-    thread_cond_signal(&fserv_cond);
-
     return 0;
 }
 
-static int _compare_clients(void *compare_arg, void *a, void *b)
-{
-    fserve_t *clienta = (fserve_t *)a;
-    fserve_t *clientb = (fserve_t *)b;
-
-    connection_t *cona = clienta->client->con;
-    connection_t *conb = clientb->client->con;
-
-    if (cona->id < conb->id) return -1;
-    if (cona->id > conb->id) return 1;
-
-    return 0;
-}
-
-static int _remove_client(void *key)
-{
-    return 1;
-}
-
 static int _free_client(void *key)
 {
     fserve_t *client = (fserve_t *)key;

Modified: icecast/trunk/icecast/src/fserve.h
===================================================================
--- icecast/trunk/icecast/src/fserve.h	2004-10-23 15:45:40 UTC (rev 8069)
+++ icecast/trunk/icecast/src/fserve.h	2004-10-24 00:34:15 UTC (rev 8070)
@@ -15,14 +15,16 @@
 
 #include <stdio.h>
 
-typedef struct
+typedef struct _fserve_t
 {
     client_t *client;
 
     FILE *file;
     int offset;
     int datasize;
+    int ready;
     unsigned char *buf;
+    struct _fserve_t *next;
 } fserve_t;
 
 void fserve_initialize(void);



More information about the commits mailing list