[xiph-commits] r8068 - icecast/trunk/icecast/src

karl at motherfish-iii.xiph.org karl at motherfish-iii.xiph.org
Fri Oct 22 17:44:29 PDT 2004


Author: karl
Date: 2004-10-22 17:44:29 -0700 (Fri, 22 Oct 2004)
New Revision: 8068

Modified:
   icecast/trunk/icecast/src/slave.c
   icecast/trunk/icecast/src/slave.h
Log:
When starting relay threads, have the relay thread do the connection not the
slave thread.  Also improve cleanup handling and log messages as well


Modified: icecast/trunk/icecast/src/slave.c
===================================================================
--- icecast/trunk/icecast/src/slave.c	2004-10-22 17:05:08 UTC (rev 8067)
+++ icecast/trunk/icecast/src/slave.c	2004-10-23 00:44:29 UTC (rev 8068)
@@ -62,7 +62,8 @@
 static void *_slave_thread(void *arg);
 thread_type *_slave_thread_id;
 static int slave_running = 0;
-static int max_interval = 0;
+static unsigned int max_interval = 0;
+static int rescan_relays = 0;
 
 relay_server *relay_free (relay_server *relay)
 {
@@ -73,7 +74,7 @@
     xmlFree (relay->server);
     xmlFree (relay->mount);
     xmlFree (relay->localmount);
-    xmlFree (relay);
+    free (relay);
     return next;
 }
 
@@ -94,26 +95,20 @@
 }
 
 
-static void *_relay_thread (void *arg)
+/* force a recheck of the relays. This will recheck the master server if
+ * a this is a slave.
+ */
+void slave_recheck (void)
 {
-    relay_server *relay = arg;
-
-    relay->running = 1;
-    stats_event_inc(NULL, "source_relay_connections");
-
-    source_main (relay->source);
-
-    relay->running = 0;
-    if (relay->cleanup)
-        relay_free (relay);
-
-    return NULL;
+    max_interval = 0;
 }
 
-
-void slave_recheck (void)
+/* rescan the current relays to see if any need starting or if any
+ * relay threads have terminated
+ */
+void slave_rescan (void)
 {
-    max_interval = 0;
+    rescan_relays = 1;
 }
 
 
@@ -129,36 +124,27 @@
 
 void slave_shutdown(void)
 {
-    relay_server *relay;
-
     if (!slave_running)
         return;
     slave_running = 0;
+    DEBUG0 ("waiting for slave thread");
     thread_join (_slave_thread_id);
-
-    relay = global.relays;
-    while (relay)
-        relay = relay_free (relay);
-    global.relays = NULL;
-
-    relay = global.master_relays;
-    while (relay)
-        relay = relay_free (relay);
-    global.master_relays = NULL;
 }
 
 
 /* This does the actual connection for a relay. A thread is
  * started off if a connection can be acquired
  */
-static void start_relay_stream (relay_server *relay)
+static void *start_relay_stream (void *arg)
 {
+    relay_server *relay = arg;
     sock_t streamsock = SOCK_ERROR;
     source_t *src = relay->source;
     http_parser_t *parser = NULL;
     connection_t *con=NULL;
     char header[4096];
 
+    relay->running = 1;
     INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
     do
     {
@@ -206,9 +192,15 @@
             DEBUG0("Failed to complete source initialisation");
             break;
         }
-        thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
+        stats_event_inc(NULL, "source_relay_connections");
 
-        return;
+        source_main (relay->source);
+
+        /* initiate an immediate relay cleanup run */
+        relay->cleanup = 1;
+        slave_rescan();
+
+        return NULL;
     } while (0);
 
     if (con == NULL && streamsock != SOCK_ERROR)
@@ -220,6 +212,12 @@
         httpp_destroy (parser);
     src->parser = NULL;
     source_clear_source (relay->source);
+
+    /* initiate an immediate relay cleanup run */
+    relay->cleanup = 1;
+    slave_rescan();
+
+    return NULL;
 }
 
 
@@ -228,14 +226,34 @@
 {
     if (relay->source == NULL)
     {
+        if (relay->localmount[0] != '/')
+        {
+            WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
+                    relay->localmount);
+            return;
+        }
         /* new relay, reserve the name */
-        DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
         relay->source = source_reserve (relay->localmount);
+        if (relay->source)
+            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
+        else
+            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
     }
     if (relay->source && !relay->running)
     {
-        start_relay_stream (relay);
+        relay->thread = thread_create ("Relay Thread", start_relay_stream,
+                relay, THREAD_ATTACHED);
+        return;
     }
+    /* the relay thread may of close down */
+    if (relay->cleanup && relay->thread)
+    {
+        DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
+        thread_join (relay->thread);
+        relay->thread = NULL;
+        relay->cleanup = 0;
+        relay->running = 0;
+    }
 }
 
 
@@ -257,6 +275,7 @@
 
          while (existing_relay)
          {
+             /* break out if keeping relay */
              if (strcmp (relay->localmount, existing_relay->localmount) == 0)
                  break;
              existing_p = &existing_relay->next;
@@ -281,33 +300,44 @@
 
 /* update the relay_list with entries from new_relay_list. Any new relays
  * are added to the list, and any not listed in the provided new_relay_list
- * get marked for shutting down, just in case they are not shutting down by
- * themselves
+ * are separated and returned in a separate list
  */
-static void
+static relay_server *
 update_relays (relay_server **relay_list, relay_server *new_relay_list)
 {
-    relay_server *relay, *current;
+    relay_server *active_relays, *cleanup_relays;
 
-    current = update_relay_set (relay_list, new_relay_list);
+    active_relays = update_relay_set (relay_list, new_relay_list);
 
-    /* ok whats left, lets make sure they shut down */
-    relay = *relay_list;
-    while (relay)
+    cleanup_relays = *relay_list;
+    /* re-assign new set */
+    *relay_list = active_relays;
+
+    return cleanup_relays;
+}
+
+
+static void relay_check_streams (relay_server *to_start, relay_server *to_free)
+{
+    relay_server *relay;
+
+    while (to_free)
     {
-        relay->cleanup = 1;
-        if (relay->source)
+        if (to_free->running && to_free->source)
         {
-            if (relay->source->running)
-                DEBUG1 ("requested %s to shut down", relay->source->mount);
-            relay->source->running = 0;
-            relay = relay->next;
+            DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
+            to_free->source->running = 0;
+            thread_join (to_free->thread);
         }
-        else
-            relay = relay_free (relay);
+        to_free = relay_free (to_free);
     }
-    /* re-assign new set */
-    *relay_list = current;
+
+    relay = to_start;
+    while (relay)
+    {
+        check_relay_stream (relay);
+        relay = relay->next;
+    }
 }
 
 
@@ -321,7 +351,7 @@
     do
     {
         char *authheader, *data;
-        relay_server *relays = NULL, *relay;
+        relay_server *new_relays = NULL, *cleanup_relays;
         int len, count = 1;
 
         username = strdup ("relay");
@@ -345,11 +375,9 @@
             break;
         }
 
-        len = strlen(username) + strlen(password) + 1;
-        authheader = malloc(len+1);
-        strcpy(authheader, username);
-        strcat(authheader, ":");
-        strcat(authheader, password);
+        len = strlen(username) + strlen(password) + 2;
+        authheader = malloc(len);
+        snprintf (authheader, len, "%s:%s", username, password);
         data = util_base64_encode(authheader);
         sock_write (mastersock,
                 "GET /admin/streamlist.txt HTTP/1.0\r\n"
@@ -385,23 +413,20 @@
                 r->mount = xmlStrdup (buf);
                 r->localmount = xmlStrdup (buf);
                 r->mp3metadata = 1;
-                r->next = relays;
-                relays = r;
+                r->next = new_relays;
+                new_relays = r;
             }
         }
         sock_close (mastersock);
 
-        update_relays (&global.master_relays, relays);
-        /* start any inactive relays */
-        relay = global.master_relays;
-        while (relay)
-        {
-            check_relay_stream (relay);
-            relay = relay->next;
-        }
-        relay = relays;
-        while (relay)
-            relay = relay_free (relay);
+        thread_mutex_lock (&(config_locks()->relay_lock));
+        cleanup_relays = update_relays (&global.master_relays, new_relays);
+        
+        relay_check_streams (global.master_relays, cleanup_relays);
+        relay_check_streams (NULL, new_relays);
+
+        thread_mutex_unlock (&(config_locks()->relay_lock));
+
     } while(0);
 
     if (master)
@@ -418,39 +443,52 @@
 static void *_slave_thread(void *arg)
 {
     ice_config_t *config;
-    relay_server *relay;
-    unsigned interval = 0;
+    unsigned int interval = 0;
 
     while (slave_running)
     {
+        relay_server *cleanup_relays;
+
         thread_sleep (1000000);
-        if (max_interval > ++interval)
+        if (rescan_relays == 0 && max_interval > ++interval)
             continue;
 
-        interval = 0;
-        config = config_get_config();
+        /* only update relays lists when required */
+        if (max_interval <= interval)
+        {
+            DEBUG0 ("checking master stream list");
+            config = config_get_config();
 
-        max_interval = config->master_update_interval;
+            interval = 0;
+            max_interval = config->master_update_interval;
 
-        /* the connection could time some time, so the lock can drop */
-        if (update_from_master (config))
-            config = config_get_config();
+            /* the connection could take some time, so the lock can drop */
+            if (update_from_master (config))
+                config = config_get_config();
 
-        thread_mutex_lock (&(config_locks()->relay_lock));
+            thread_mutex_lock (&(config_locks()->relay_lock));
 
-        update_relays (&global.relays, config->relay);
+            cleanup_relays = update_relays (&global.relays, config->relay);
 
-        config_release_config();
+            config_release_config();
 
-        /* start any inactive relays */
-        relay = global.relays;
-        while (relay)
+            relay_check_streams (global.relays, cleanup_relays);
+            thread_mutex_unlock (&(config_locks()->relay_lock));
+        }
+        else
         {
-            check_relay_stream (relay);
-            relay = relay->next;
+            DEBUG0 ("rescanning relay lists");
+            thread_mutex_lock (&(config_locks()->relay_lock));
+            relay_check_streams (global.master_relays, NULL);
+            relay_check_streams (global.relays, NULL);
+            thread_mutex_unlock (&(config_locks()->relay_lock));
         }
-        thread_mutex_unlock (&(config_locks()->relay_lock));
+        rescan_relays = 0;
     }
+    DEBUG0 ("shutting down current relays");
+    relay_check_streams (NULL, global.relays);
+    relay_check_streams (NULL, global.master_relays);
+
     INFO0 ("Slave thread shutdown complete");
 
     return NULL;

Modified: icecast/trunk/icecast/src/slave.h
===================================================================
--- icecast/trunk/icecast/src/slave.h	2004-10-22 17:05:08 UTC (rev 8067)
+++ icecast/trunk/icecast/src/slave.h	2004-10-23 00:44:29 UTC (rev 8068)
@@ -13,6 +13,8 @@
 #ifndef __SLAVE_H__
 #define __SLAVE_H__
 
+#include <thread/thread.h>
+
 typedef struct _relay_server {
     char *server;
     int port;
@@ -22,6 +24,7 @@
     int mp3metadata;
     int running;
     int cleanup;
+    thread_type *thread;
     struct _relay_server *next;
 } relay_server;
 



More information about the commits mailing list