[xiph-commits] r7431 - icecast/branches/kh/icecast/src

karl at motherfish-iii.xiph.org karl
Thu Aug 5 19:05:06 PDT 2004


Author: karl
Date: Thu Aug  5 19:05:06 2004
New Revision: 7431

Modified:
icecast/branches/kh/icecast/src/slave.c
icecast/branches/kh/icecast/src/slave.h
icecast/branches/kh/icecast/src/source.c
Log:
slave update. Make relay threads attached. Allow for rescanning the current
relays so that master server queries are not triggered when not required.
Each relay is started independently now. some comment and variable cleanups


Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c	2004-07-31 09:33:42 UTC (rev 7430)
+++ icecast/branches/kh/icecast/src/slave.c	2004-07-31 13:34:22 UTC (rev 7431)
@@ -62,7 +62,8 @@
static void *_slave_thread(void *arg);
thread_type *_slave_thread_id;
static int slave_running = 0;
-static unsigned max_interval = 0;
+static unsigned int max_interval = 0;
+static int rescan_relays = 0;

relay_server *relay_free (relay_server *relay)
{
@@ -99,29 +100,22 @@
}


-static void *_relay_thread (void *arg)
+/* force a recheck of the relays. This will recheck the master server if
+ * a this is a slave.
+ */
+void slave_recheck (void)
{
-    relay_server *relay = arg;
-
-    relay->running = 1;
-    stats_event_inc(NULL, "source_relay_connections");
-
-    source_main (relay->source);
-
-    relay->running = 0;
-    if (relay->cleanup)
-        relay_free (relay);
-
-    return NULL;
+    max_interval = 0;
}

-
-void slave_recheck (void)
+/* rescan the current relays to see if any need starting or if any
+ * relay threads have terminated
+ */
+void slave_rescan (void)
{
-    max_interval = 0;
+    rescan_relays = 1;
}

-
void slave_initialize(void)
{
if (slave_running)
@@ -134,22 +128,10 @@

void slave_shutdown(void)
{
-    relay_server *relay;
-
if (!slave_running)
return;
slave_running = 0;
thread_join (_slave_thread_id);
-
-    relay = global.relays;
-    while (relay)
-        relay = relay_free (relay);
-    global.relays = NULL;
-
-    relay = global.master_relays;
-    while (relay)
-        relay = relay_free (relay);
-    global.master_relays = NULL;
}


@@ -198,23 +180,22 @@
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
*/
-static void start_relay_stream (relay_server *relay)
+static void *start_relay_stream (void *arg)
{
+    relay_server *relay = arg;
sock_t streamsock = SOCK_ERROR;
source_t *src = relay->source;
http_parser_t *parser = NULL;
connection_t *con=NULL;
char header[4096];

-    if (relay->on_demand && src->on_demand_req == 0)
-        return;
-
+    relay->running = 1;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
{
char *auth_header;

-        streamsock = sock_connect_wto (relay->server, relay->port, 30);
+        streamsock = sock_connect_wto (relay->server, relay->port, 10);
if (streamsock == SOCK_ERROR)
{
WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
@@ -279,9 +260,15 @@
DEBUG0("Failed to complete source initialisation");
break;
}
-        thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
+        stats_event_inc(NULL, "source_relay_connections");

-        return;
+        source_main (relay->source);
+
+        /* initiate an immediate relay cleanup run */
+        relay->cleanup = 1;
+        slave_rescan();
+
+        return NULL;
} while (0);

if (con == NULL && streamsock != SOCK_ERROR)
@@ -293,6 +280,12 @@
httpp_destroy (parser);
src->parser = NULL;
source_clear_source (relay->source);
+
+    /* initiate an immediate relay cleanup run */
+    relay->cleanup = 1;
+    slave_rescan();
+
+    return NULL;
}


@@ -302,18 +295,37 @@
if (relay->source == NULL)
{
/* new relay, reserve the name */
-        DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
relay->source = source_reserve (relay->localmount);
if (relay->source)
{
+            DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
if (relay->on_demand)
DEBUG0 ("setting on_demand");
relay->source->on_demand = relay->on_demand;
}
+        else
+            WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
}
-    if (relay->source && !relay->running)
+    do
{
-        start_relay_stream (relay);
+        if (relay->source == NULL || relay->running)
+            break;
+        if (relay->on_demand && relay->source->on_demand_req == 0)
+            break;
+
+        relay->thread = thread_create ("Relay Thread", start_relay_stream,
+                relay, THREAD_ATTACHED);
+        return;
+
+    } while (0);
+    /* the relay thread may of close down */
+    if (relay->cleanup && relay->thread)
+    {
+        DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
+        thread_join (relay->thread);
+        relay->thread = NULL;
+        relay->cleanup = 0;
+        relay->running = 0;
}
}

@@ -336,6 +348,7 @@

while (existing_relay)
{
+             /* break out if keeping relay */
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
break;
existing_p = &existing_relay->next;
@@ -360,33 +373,44 @@

/* update the relay_list with entries from new_relay_list. Any new relays
* are added to the list, and any not listed in the provided new_relay_list
- * get marked for shutting down, just in case they are not shutting down by
- * themselves
+ * are separated an returned in a separate list
*/
-static void
+static relay_server *
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
-    relay_server *relay, *current;
+    relay_server *active_relays, *cleanup_relays;

-    current = update_relay_set (relay_list, new_relay_list);
+    active_relays = update_relay_set (relay_list, new_relay_list);

-    /* ok whats left, lets make sure they shut down */
-    relay = *relay_list;
-    while (relay)
+    cleanup_relays = *relay_list;
+    /* re-assign new set */
+    *relay_list = active_relays;
+
+    return cleanup_relays;
+}
+
+
+static void relay_check_streams (relay_server *to_start, relay_server *to_free)
+{
+    relay_server *relay;
+
+    while (to_free)
{
-        relay->cleanup = 1;
-        if (relay->source)
+        if (to_free->running && to_free->source)
{
-            if (relay->source->running)
-                DEBUG1 ("requested %s to shut down", relay->source->mount);
-            relay->source->running = 0;
-            relay = relay->next;
+           DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
+           to_free->source->running = 0;
+           thread_join (to_free->thread);
}
-        else
-            relay = relay_free (relay);
+        to_free = relay_free (to_free);
}
-    /* re-assign new set */
-    *relay_list = current;
+
+    relay = to_start;
+    while (relay)
+    {
+        check_relay_stream (relay);
+        relay = relay->next;
+    }
}


@@ -400,7 +424,7 @@
do
{
char *authheader, *data;
-        relay_server *relays = NULL, *relay;
+        relay_server *new_relays = NULL, *cleanup_relays;
int len, count = 1;
int on_demand, send_auth;

@@ -474,23 +498,20 @@
r->username = xmlStrdup (username);
r->password = xmlStrdup (password);
}
-                r->next = relays;
-                relays = r;
+                r->next = new_relays;
+                new_relays = r;
}
}
sock_close (mastersock);

-        update_relays (&global.master_relays, relays);
-        /* start any inactive relays */
-        relay = global.master_relays;
-        while (relay)
-        {
-            check_relay_stream (relay);
-            relay = relay->next;
-        }
-        relay = relays;
-        while (relay)
-            relay = relay_free (relay);
+        thread_mutex_lock (&(config_locks()->relay_lock));
+        cleanup_relays = update_relays (&global.master_relays, new_relays);
+
+        relay_check_streams (global.master_relays, cleanup_relays);
+        relay_check_streams (NULL, new_relays);
+
+        thread_mutex_unlock (&(config_locks()->relay_lock));
+
} while(0);

if (master)
@@ -507,39 +528,50 @@
static void *_slave_thread(void *arg)
{
ice_config_t *config;
-    relay_server *relay;
-    unsigned interval = 0;
+    unsigned int interval = 0;

while (slave_running)
{
+        relay_server *cleanup_relays;
+
thread_sleep (1000000);
-        if (max_interval > ++interval)
+        if (rescan_relays == 0 && max_interval > ++interval)
continue;

-        interval = 0;
-        config = config_get_config();
+        /* only update relays lists when required */
+        if (max_interval <= interval)
+        {
+            config = config_get_config();

-        max_interval = config->master_update_interval;
+            interval = 0;
+            max_interval = config->master_update_interval;

-        /* the connection could time some time, so the lock can drop */
-        if (update_from_master (config))
-            config = config_get_config();
+            /* the connection could take some time, so the lock can drop */
+            if (update_from_master (config))
+                config = config_get_config();

-        thread_mutex_lock (&(config_locks()->relay_lock));
+            thread_mutex_lock (&(config_locks()->relay_lock));

-        update_relays (&global.relays, config->relay);
+            cleanup_relays = update_relays (&global.relays, config->relay);

-        config_release_config();
+            config_release_config();

-        /* start any inactive relays */
-        relay = global.relays;
-        while (relay)
+            relay_check_streams (global.relays, cleanup_relays);
+            thread_mutex_unlock (&(config_locks()->relay_lock));
+        }
+        else
{
-            check_relay_stream (relay);
-            relay = relay->next;
+            thread_mutex_lock (&(config_locks()->relay_lock));
+            relay_check_streams (global.master_relays, NULL);
+            relay_check_streams (global.relays, NULL);
+            thread_mutex_unlock (&(config_locks()->relay_lock));
}
-        thread_mutex_unlock (&(config_locks()->relay_lock));
+        rescan_relays = 0;
}
+    DEBUG0 ("shutting down current relays");
+    relay_check_streams (NULL, global.relays);
+    relay_check_streams (NULL, global.master_relays);
+
INFO0 ("Slave thread shutdown complete");

return NULL;

Modified: icecast/branches/kh/icecast/src/slave.h
===================================================================
--- icecast/branches/kh/icecast/src/slave.h	2004-07-31 09:33:42 UTC (rev 7430)
+++ icecast/branches/kh/icecast/src/slave.h	2004-07-31 13:34:22 UTC (rev 7431)
@@ -14,6 +14,7 @@
#define __SLAVE_H__

#include <client.h>
+#include <thread/thread.h>

typedef struct _relay_server {
char *server;
@@ -27,6 +28,7 @@
int on_demand;
int running;
int cleanup;
+    thread_type *thread;
struct _relay_server *next;
} relay_server;

@@ -40,6 +42,7 @@
void slave_initialize(void);
void slave_shutdown(void);
void slave_recheck (void);
+void slave_rescan (void);
int slave_redirect (char *mountpoint, client_t *client);
relay_server *relay_free (relay_server *relay);


Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c	2004-07-31 09:33:42 UTC (rev 7430)
+++ icecast/branches/kh/icecast/src/source.c	2004-07-31 13:34:22 UTC (rev 7431)
@@ -399,7 +399,7 @@
if (dest->running == 0 && dest->on_demand)
{
dest->on_demand_req = 1;
-        slave_recheck();
+        slave_rescan();
}
thread_mutex_unlock (&dest->lock);
thread_mutex_unlock (&move_clients_mutex);
@@ -820,7 +820,7 @@
/* enable on-demand relay to start, wake up the slave thread */
DEBUG0("kicking off on-demand relay");
source->on_demand_req = 1;
-        slave_recheck();
+        slave_rescan();
}
DEBUG1 ("Added client to pending on %s", source->mount);
source->check_pending = 1;



More information about the commits mailing list