[xiph-commits] r13533 - icecast/trunk/icecast/src

karl at svn.xiph.org karl at svn.xiph.org
Fri Aug 10 14:33:16 PDT 2007


Author: karl
Date: 2007-08-10 14:33:16 -0700 (Fri, 10 Aug 2007)
New Revision: 13533

Modified:
   icecast/trunk/icecast/src/logging.h
   icecast/trunk/icecast/src/slave.c
Log:
Handle http 302 response when a relay starts. The socket IO is isolated into a
separate function for loop handling and log messages are updated.



Modified: icecast/trunk/icecast/src/logging.h
===================================================================
--- icecast/trunk/icecast/src/logging.h	2007-08-10 16:53:50 UTC (rev 13532)
+++ icecast/trunk/icecast/src/logging.h	2007-08-10 21:33:16 UTC (rev 13533)
@@ -38,6 +38,7 @@
 #define ERROR1(y, a) log_write(errorlog, 1, CATMODULE "/", __func__, y, a)
 #define ERROR2(y, a, b) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b)
 #define ERROR3(y, a, b, c) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c)
+#define ERROR4(y, a, b, c, d) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c, d)
 
 #define WARN0(y) log_write(errorlog, 2, CATMODULE "/", __func__, y)
 #define WARN1(y, a) log_write(errorlog, 2, CATMODULE "/", __func__, y, a)

Modified: icecast/trunk/icecast/src/slave.c
===================================================================
--- icecast/trunk/icecast/src/slave.c	2007-08-10 16:53:50 UTC (rev 13532)
+++ icecast/trunk/icecast/src/slave.c	2007-08-10 21:33:16 UTC (rev 13533)
@@ -145,53 +145,53 @@
 }
 
 
-/* This does the actual connection for a relay. A thread is
- * started off if a connection can be acquired
+/* Actually open the connection and do some http parsing, handle any 302
+ * responses within here.
  */
-static void *start_relay_stream (void *arg)
+static client_t *open_relay_connection (relay_server *relay)
 {
-    relay_server *relay = arg;
-    sock_t streamsock = SOCK_ERROR;
-    source_t *src = relay->source;
+    int redirects = 0;
     http_parser_t *parser = NULL;
     connection_t *con=NULL;
+    char *server = strdup (relay->server);
+    char *mount = strdup (relay->mount);
+    int port = relay->port;
+    char *auth_header;
     char header[4096];
 
-    relay->running = 1;
-    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
-    do
+    /* build any authentication header before connecting */
+    if (relay->username && relay->password)
     {
-        char *auth_header;
+        char *esc_authorisation;
+        unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
 
-        streamsock = sock_connect_wto (relay->server, relay->port, 30);
+        auth_header = malloc (len);
+        snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
+        esc_authorisation = util_base64_encode(auth_header);
+        free(auth_header);
+        len = strlen (esc_authorisation) + 24;
+        auth_header = malloc (len);
+        snprintf (auth_header, len,
+                "Authorization: Basic %s\r\n", esc_authorisation);
+        free(esc_authorisation);
+    }
+    else
+        auth_header = strdup ("");
+
+    while (redirects < 10)
+    {
+        sock_t streamsock;
+
+        INFO2 ("connecting to %s:%d", server, port);
+
+        streamsock = sock_connect_wto (server, port, 10);
         if (streamsock == SOCK_ERROR)
         {
-            WARN3("Failed to relay stream from master server, couldn't connect to http://%s:%d%s",
-                    relay->server, relay->port, relay->mount);
+            WARN2 ("Failed to connect to %s:%d", server, port);
             break;
         }
-        con = connection_create (streamsock, -1, NULL);
+        con = connection_create (streamsock, -1, strdup (server));
 
-        if (relay->username && relay->password)
-        {
-            char *esc_authorisation;
-            unsigned len = strlen(relay->username) + strlen(relay->password) + 2;
-
-            auth_header = malloc (len);
-            snprintf (auth_header, len, "%s:%s", relay->username, relay->password);
-            esc_authorisation = util_base64_encode(auth_header);
-            free(auth_header);
-            len = strlen (esc_authorisation) + 24;
-            auth_header = malloc (len);
-            snprintf (auth_header, len,
-                    "Authorization: Basic %s\r\n", esc_authorisation);
-            free(esc_authorisation);
-        }
-        else
-        {
-            auth_header = strdup ("");
-        }
-
         /* At this point we may not know if we are relaying an mp3 or vorbis
          * stream, but only send the icy-metadata header if the relay details
          * state so (the typical case).  It's harmless in the vorbis case. If
@@ -202,54 +202,132 @@
                 "%s"
                 "%s"
                 "\r\n",
-                relay->mount,
+                mount,
                 ICECAST_VERSION_STRING,
                 relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                 auth_header);
-        free (auth_header);
         memset (header, 0, sizeof(header));
         if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
         {
-            WARN0("Header read failed");
+            ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
             break;
         }
         parser = httpp_create_parser();
         httpp_initialize (parser, NULL);
         if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
         {
-            ERROR0("Error parsing relay request");
+            ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
+                    server, port, mount);
             break;
         }
-        if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
+        if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
         {
-            ERROR1("Error from relay request: %s", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
-            break;
-        }
-        src->parser = parser;
-        src->con = con;
+            /* better retry the connection again but with different details */
+            const char *uri, *mountpoint;
+            int len;
 
-        global_lock ();
-        if (client_create (&src->client, con, parser) < 0)
-        {
-            global_unlock ();
-            /* make sure only the client_destory frees these */
+            uri = httpp_getvar (parser, "location");
+            INFO1 ("redirect received %s", uri);
+            if (strncmp (uri, "http://", 7) != 0)
+                break;
+            uri += 7;
+            mountpoint = strchr (uri, '/');
+            free (mount);
+            if (mountpoint)
+                mount = strdup (mountpoint);
+            else
+                mount = strdup ("/");
+
+            len = strcspn (uri, ":/");
+            port = 80;
+            if (uri [len] == ':')
+                port = atoi (uri+len+1);
+            free (server);
+            server = calloc (1, len+1);
+            strncpy (server, uri, len);
+            connection_close (con);
+            httpp_destroy (parser);
             con = NULL;
             parser = NULL;
-            break;
         }
-        global_unlock ();
-        sock_set_blocking (streamsock, SOCK_NONBLOCK);
-        con = NULL;
-        parser = NULL;
-        client_set_queue (src->client, NULL);
+        else
+        {
+            client_t *client = NULL;
 
+            if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
+            {
+                ERROR2("Error from relay request: %s (%s)", relay->localmount,
+                        httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
+                break;
+            }
+            global_lock ();
+            if (client_create (&client, con, parser) < 0)
+            {
+                global_unlock ();
+                /* make sure only the client_destory frees these */
+                con = NULL;
+                parser = NULL;
+                client_destroy (client);
+                break;
+            }
+            global_unlock ();
+            sock_set_blocking (streamsock, SOCK_NONBLOCK);
+            client_set_queue (client, NULL);
+            free (server);
+            free (mount);
+            free (auth_header);
+
+            return client;
+        }
+        redirects++;
+    }
+    /* failed, better clean up */
+    free (server);
+    free (mount);
+    free (auth_header);
+    if (con)
+        connection_close (con);
+    if (parser)
+        httpp_destroy (parser);
+    return NULL;
+}
+
+
+/* This does the actual connection for a relay. A thread is
+ * started off if a connection can be acquired
+ */
+static void *start_relay_stream (void *arg)
+{
+    relay_server *relay = arg;
+    source_t *src = relay->source;
+    client_t *client;
+    ice_config_t *config;
+
+    INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
+    do
+    {
+        client = open_relay_connection (relay);
+
+        if (client == NULL)
+            continue;
+
+        src->client = client;
+        src->parser = client->parser;
+        src->con = client->con;
+
         if (connection_complete_source (src, 0) < 0)
         {
-            DEBUG0("Failed to complete source initialisation");
-            break;
+            INFO0("Failed to complete source initialisation");
+            client_destroy (client);
+            src->client = NULL;
+            continue;
         }
         stats_event_inc(NULL, "source_relay_connections");
-        stats_event (relay->localmount, "source_ip", relay->server);
+        stats_event (relay->localmount, "source_ip", client->con->ip);
+        config = config_get_config();
+        stats_event_args (relay->localmount, "listenurl", "http://%s:%d%s",
+                config->hostname, config->port, relay->localmount);
+        config_release_config();
 
         source_main (relay->source);
 
@@ -265,7 +343,7 @@
         relay->cleanup = 1;
 
         return NULL;
-    } while (0);
+    } while (0);  /* TODO allow looping through multiple servers */
 
     if (relay->source->fallback_mount)
     {
@@ -281,12 +359,6 @@
         avl_tree_unlock (global.source_tree);
     }
 
-    if (con)
-        connection_close (con);
-    src->con = NULL;
-    if (parser)
-        httpp_destroy (parser);
-    src->parser = NULL;
     source_clear_source (relay->source);
 
     /* cleanup relay, but prevent this relay from starting up again too soon */
@@ -331,7 +403,6 @@
             if (source->fallback_mount && source->fallback_override)
             {
                 source_t *fallback;
-                DEBUG1 ("checking %s for fallback override", source->fallback_mount);
                 avl_tree_rlock (global.source_tree);
                 fallback = source_find_mount (source->fallback_mount);
                 if (fallback && fallback->running && fallback->listeners)
@@ -346,6 +417,7 @@
         }
 
         relay->start = time(NULL) + 5;
+        relay->running = 1;
         relay->thread = thread_create ("Relay Thread", start_relay_stream,
                 relay, THREAD_ATTACHED);
         return;
@@ -363,7 +435,7 @@
         relay->cleanup = 0;
         relay->running = 0;
 
-        if (relay->on_demand)
+        if (relay->on_demand && relay->source)
         {
             ice_config_t *config = config_get_config ();
             mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
@@ -472,6 +544,7 @@
             {
                 /* relay has been removed from xml, shut down active relay */
                 DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
+                to_free->running = 0;
                 to_free->source->running = 0;
                 thread_join (to_free->thread);
                 slave_rebuild_mounts();
@@ -654,7 +727,7 @@
             source_recheck_mounts();
         }
     }
-    DEBUG0 ("shutting down current relays");
+    INFO0 ("shutting down current relays");
     relay_check_streams (NULL, global.relays, 0);
     relay_check_streams (NULL, global.master_relays, 0);
 



More information about the commits mailing list