[xiph-commits] r8068 - icecast/trunk/icecast/src
karl at motherfish-iii.xiph.org
karl at motherfish-iii.xiph.org
Fri Oct 22 17:44:29 PDT 2004
Author: karl
Date: 2004-10-22 17:44:29 -0700 (Fri, 22 Oct 2004)
New Revision: 8068
Modified:
icecast/trunk/icecast/src/slave.c
icecast/trunk/icecast/src/slave.h
Log:
When starting relay threads, have the relay thread do the connection not the
slave thread. Also improve cleanup handling and log messages as well
Modified: icecast/trunk/icecast/src/slave.c
===================================================================
--- icecast/trunk/icecast/src/slave.c 2004-10-22 17:05:08 UTC (rev 8067)
+++ icecast/trunk/icecast/src/slave.c 2004-10-23 00:44:29 UTC (rev 8068)
@@ -62,7 +62,8 @@
static void *_slave_thread(void *arg);
thread_type *_slave_thread_id;
static int slave_running = 0;
-static int max_interval = 0;
+static unsigned int max_interval = 0;
+static int rescan_relays = 0;
relay_server *relay_free (relay_server *relay)
{
@@ -73,7 +74,7 @@
xmlFree (relay->server);
xmlFree (relay->mount);
xmlFree (relay->localmount);
- xmlFree (relay);
+ free (relay);
return next;
}
@@ -94,26 +95,20 @@
}
-static void *_relay_thread (void *arg)
+/* force a recheck of the relays. This will recheck the master server if
+ * a this is a slave.
+ */
+void slave_recheck (void)
{
- relay_server *relay = arg;
-
- relay->running = 1;
- stats_event_inc(NULL, "source_relay_connections");
-
- source_main (relay->source);
-
- relay->running = 0;
- if (relay->cleanup)
- relay_free (relay);
-
- return NULL;
+ max_interval = 0;
}
-
-void slave_recheck (void)
+/* rescan the current relays to see if any need starting or if any
+ * relay threads have terminated
+ */
+void slave_rescan (void)
{
- max_interval = 0;
+ rescan_relays = 1;
}
@@ -129,36 +124,27 @@
void slave_shutdown(void)
{
- relay_server *relay;
-
if (!slave_running)
return;
slave_running = 0;
+ DEBUG0 ("waiting for slave thread");
thread_join (_slave_thread_id);
-
- relay = global.relays;
- while (relay)
- relay = relay_free (relay);
- global.relays = NULL;
-
- relay = global.master_relays;
- while (relay)
- relay = relay_free (relay);
- global.master_relays = NULL;
}
/* This does the actual connection for a relay. A thread is
* started off if a connection can be acquired
*/
-static void start_relay_stream (relay_server *relay)
+static void *start_relay_stream (void *arg)
{
+ relay_server *relay = arg;
sock_t streamsock = SOCK_ERROR;
source_t *src = relay->source;
http_parser_t *parser = NULL;
connection_t *con=NULL;
char header[4096];
+ relay->running = 1;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
do
{
@@ -206,9 +192,15 @@
DEBUG0("Failed to complete source initialisation");
break;
}
- thread_create ("Relay Thread", _relay_thread, relay, THREAD_DETACHED);
+ stats_event_inc(NULL, "source_relay_connections");
- return;
+ source_main (relay->source);
+
+ /* initiate an immediate relay cleanup run */
+ relay->cleanup = 1;
+ slave_rescan();
+
+ return NULL;
} while (0);
if (con == NULL && streamsock != SOCK_ERROR)
@@ -220,6 +212,12 @@
httpp_destroy (parser);
src->parser = NULL;
source_clear_source (relay->source);
+
+ /* initiate an immediate relay cleanup run */
+ relay->cleanup = 1;
+ slave_rescan();
+
+ return NULL;
}
@@ -228,14 +226,34 @@
{
if (relay->source == NULL)
{
+ if (relay->localmount[0] != '/')
+ {
+ WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
+ relay->localmount);
+ return;
+ }
/* new relay, reserve the name */
- DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
relay->source = source_reserve (relay->localmount);
+ if (relay->source)
+ DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
+ else
+ WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
}
if (relay->source && !relay->running)
{
- start_relay_stream (relay);
+ relay->thread = thread_create ("Relay Thread", start_relay_stream,
+ relay, THREAD_ATTACHED);
+ return;
}
+ /* the relay thread may of close down */
+ if (relay->cleanup && relay->thread)
+ {
+ DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
+ thread_join (relay->thread);
+ relay->thread = NULL;
+ relay->cleanup = 0;
+ relay->running = 0;
+ }
}
@@ -257,6 +275,7 @@
while (existing_relay)
{
+ /* break out if keeping relay */
if (strcmp (relay->localmount, existing_relay->localmount) == 0)
break;
existing_p = &existing_relay->next;
@@ -281,33 +300,44 @@
/* update the relay_list with entries from new_relay_list. Any new relays
* are added to the list, and any not listed in the provided new_relay_list
- * get marked for shutting down, just in case they are not shutting down by
- * themselves
+ * are separated and returned in a separate list
*/
-static void
+static relay_server *
update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
- relay_server *relay, *current;
+ relay_server *active_relays, *cleanup_relays;
- current = update_relay_set (relay_list, new_relay_list);
+ active_relays = update_relay_set (relay_list, new_relay_list);
- /* ok whats left, lets make sure they shut down */
- relay = *relay_list;
- while (relay)
+ cleanup_relays = *relay_list;
+ /* re-assign new set */
+ *relay_list = active_relays;
+
+ return cleanup_relays;
+}
+
+
+static void relay_check_streams (relay_server *to_start, relay_server *to_free)
+{
+ relay_server *relay;
+
+ while (to_free)
{
- relay->cleanup = 1;
- if (relay->source)
+ if (to_free->running && to_free->source)
{
- if (relay->source->running)
- DEBUG1 ("requested %s to shut down", relay->source->mount);
- relay->source->running = 0;
- relay = relay->next;
+ DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
+ to_free->source->running = 0;
+ thread_join (to_free->thread);
}
- else
- relay = relay_free (relay);
+ to_free = relay_free (to_free);
}
- /* re-assign new set */
- *relay_list = current;
+
+ relay = to_start;
+ while (relay)
+ {
+ check_relay_stream (relay);
+ relay = relay->next;
+ }
}
@@ -321,7 +351,7 @@
do
{
char *authheader, *data;
- relay_server *relays = NULL, *relay;
+ relay_server *new_relays = NULL, *cleanup_relays;
int len, count = 1;
username = strdup ("relay");
@@ -345,11 +375,9 @@
break;
}
- len = strlen(username) + strlen(password) + 1;
- authheader = malloc(len+1);
- strcpy(authheader, username);
- strcat(authheader, ":");
- strcat(authheader, password);
+ len = strlen(username) + strlen(password) + 2;
+ authheader = malloc(len);
+ snprintf (authheader, len, "%s:%s", username, password);
data = util_base64_encode(authheader);
sock_write (mastersock,
"GET /admin/streamlist.txt HTTP/1.0\r\n"
@@ -385,23 +413,20 @@
r->mount = xmlStrdup (buf);
r->localmount = xmlStrdup (buf);
r->mp3metadata = 1;
- r->next = relays;
- relays = r;
+ r->next = new_relays;
+ new_relays = r;
}
}
sock_close (mastersock);
- update_relays (&global.master_relays, relays);
- /* start any inactive relays */
- relay = global.master_relays;
- while (relay)
- {
- check_relay_stream (relay);
- relay = relay->next;
- }
- relay = relays;
- while (relay)
- relay = relay_free (relay);
+ thread_mutex_lock (&(config_locks()->relay_lock));
+ cleanup_relays = update_relays (&global.master_relays, new_relays);
+
+ relay_check_streams (global.master_relays, cleanup_relays);
+ relay_check_streams (NULL, new_relays);
+
+ thread_mutex_unlock (&(config_locks()->relay_lock));
+
} while(0);
if (master)
@@ -418,39 +443,52 @@
static void *_slave_thread(void *arg)
{
ice_config_t *config;
- relay_server *relay;
- unsigned interval = 0;
+ unsigned int interval = 0;
while (slave_running)
{
+ relay_server *cleanup_relays;
+
thread_sleep (1000000);
- if (max_interval > ++interval)
+ if (rescan_relays == 0 && max_interval > ++interval)
continue;
- interval = 0;
- config = config_get_config();
+ /* only update relays lists when required */
+ if (max_interval <= interval)
+ {
+ DEBUG0 ("checking master stream list");
+ config = config_get_config();
- max_interval = config->master_update_interval;
+ interval = 0;
+ max_interval = config->master_update_interval;
- /* the connection could time some time, so the lock can drop */
- if (update_from_master (config))
- config = config_get_config();
+ /* the connection could take some time, so the lock can drop */
+ if (update_from_master (config))
+ config = config_get_config();
- thread_mutex_lock (&(config_locks()->relay_lock));
+ thread_mutex_lock (&(config_locks()->relay_lock));
- update_relays (&global.relays, config->relay);
+ cleanup_relays = update_relays (&global.relays, config->relay);
- config_release_config();
+ config_release_config();
- /* start any inactive relays */
- relay = global.relays;
- while (relay)
+ relay_check_streams (global.relays, cleanup_relays);
+ thread_mutex_unlock (&(config_locks()->relay_lock));
+ }
+ else
{
- check_relay_stream (relay);
- relay = relay->next;
+ DEBUG0 ("rescanning relay lists");
+ thread_mutex_lock (&(config_locks()->relay_lock));
+ relay_check_streams (global.master_relays, NULL);
+ relay_check_streams (global.relays, NULL);
+ thread_mutex_unlock (&(config_locks()->relay_lock));
}
- thread_mutex_unlock (&(config_locks()->relay_lock));
+ rescan_relays = 0;
}
+ DEBUG0 ("shutting down current relays");
+ relay_check_streams (NULL, global.relays);
+ relay_check_streams (NULL, global.master_relays);
+
INFO0 ("Slave thread shutdown complete");
return NULL;
Modified: icecast/trunk/icecast/src/slave.h
===================================================================
--- icecast/trunk/icecast/src/slave.h 2004-10-22 17:05:08 UTC (rev 8067)
+++ icecast/trunk/icecast/src/slave.h 2004-10-23 00:44:29 UTC (rev 8068)
@@ -13,6 +13,8 @@
#ifndef __SLAVE_H__
#define __SLAVE_H__
+#include <thread/thread.h>
+
typedef struct _relay_server {
char *server;
int port;
@@ -22,6 +24,7 @@
int mp3metadata;
int running;
int cleanup;
+ thread_type *thread;
struct _relay_server *next;
} relay_server;
More information about the commits
mailing list