[Icecast-dev] [PATCH 04/31] UTIL, add find_eos_delim and use it to simplify util_read_header (pending con)

Niv Sardi nsardi at smartjog.com
Fri Jul 30 07:54:26 PDT 2010


Use memmem so we don't need a '\0'-terminated buffer; we know most sizes anyway.

We now get a refbuf for free, so we attach it to the client right away.

Util: use flags instead of the partial/entire mess; the code looks cleaner that way.

We'll need to port back the connection changes.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/slave.c |   23 +++++++--
 src/util.c  |  142 +++++++++++++++++++++++++++++++++++++++++++++--------------
 src/util.h  |    7 ++-
 3 files changed, 130 insertions(+), 42 deletions(-)

diff --git a/src/slave.c b/src/slave.c
index 80dedca..834c1bb 100644
--- a/src/slave.c
+++ b/src/slave.c
@@ -155,11 +155,11 @@ static client_t *open_relay_connection (relay_server *relay)
     ice_config_t *config;
     http_parser_t *parser = NULL;
     connection_t *con=NULL;
+    refbuf_t *header = NULL;
     char *server = strdup (relay->server);
     char *mount = strdup (relay->mount);
     int port = relay->port;
     char *auth_header;
-    char header[4096];
 
     config = config_get_config ();
     server_id = strdup (config->server_id);
@@ -187,6 +187,7 @@ static client_t *open_relay_connection (relay_server *relay)
     while (redirects < 10)
     {
         sock_t streamsock;
+	int hdrsize;
 
         INFO2 ("connecting to %s:%d", server, port);
 
@@ -197,6 +198,8 @@ static client_t *open_relay_connection (relay_server *relay)
             break;
         }
         con = connection_create (streamsock, -1, strdup (server));
+        if (!con)
+            break;
 
         /* At this point we may not know if we are relaying an mp3 or vorbis
          * stream, but only send the icy-metadata header if the relay details
@@ -214,15 +217,17 @@ static client_t *open_relay_connection (relay_server *relay)
                 server,
                 relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                 auth_header);
-        memset (header, 0, sizeof(header));
-        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
+
+        header = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+        hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+        if (hdrsize == -ENOENT)
         {
             ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
             break;
         }
         parser = httpp_create_parser();
         httpp_initialize (parser, NULL);
-        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
+        if (! httpp_parse_response (parser, header->data, hdrsize, relay->localmount))
         {
             ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                     server, port, mount);
@@ -280,7 +285,13 @@ static client_t *open_relay_connection (relay_server *relay)
             }
             global_unlock ();
             sock_set_blocking (streamsock, 0);
-            client_set_queue (client, NULL);
+
+	    header->len -= hdrsize;
+	    memmove(header->data, header->data + hdrsize, header->len);
+            client_set_queue (client, header);
+	    refbuf_release(header);
+
+	    client->pos = hdrsize;
             free (server);
             free (mount);
             free (server_id);
@@ -297,6 +308,8 @@ static client_t *open_relay_connection (relay_server *relay)
     free (auth_header);
     if (con)
         connection_close (con);
+    if (header)
+        refbuf_release (header);
     if (parser)
         httpp_destroy (parser);
     return NULL;
diff --git a/src/util.c b/src/util.c
index 2894701..94b06f6 100644
--- a/src/util.c
+++ b/src/util.c
@@ -84,51 +84,125 @@ int util_timed_wait_for_fd(sock_t fd, int timeout)
 #endif
 }
 
-int util_read_header(sock_t sock, char *buff, unsigned long len, int entire)
+int util_find_eos_delim(refbuf_t *refbuf, int offset, int flags)
 {
-    int read_bytes, ret;
-    unsigned long pos;
-    char c;
+    int len = refbuf->len;
+
+    if (offset < 0) {
+        len = -offset;
+        offset = 0;
+    }
+
+    /* handle \n, \r\n and nsvcap which for some strange reason has
+     * EOL as \r\r\n */
+    char *ptr;
+    switch (flags) {
+    case HEADER_READ_LINE:
+        /* password line */
+	    ptr = memmem (refbuf->data + offset, len - offset, "\r\r\n", 3);
+        if (ptr)
+            return ((ptr+3) - refbuf->data + offset);
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\n", 2);
+        if (ptr)
+            return ((ptr+2) - refbuf->data + offset);
+        ptr = memmem (refbuf->data + offset, len - offset, "\n", 1);
+        if (ptr)
+            return ((ptr+1) - refbuf->data + offset);
+        break;
+    case HEADER_READ_ENTIRE:
+        /* stream_offset refers to the start of any data sent after the
+         * http style headers, we don't want to lose those */
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\r\n\r\r\n", 6);
+        if (ptr)
+            return ((ptr+6) - refbuf->data + offset);
+
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\n\r\n", 4);
+        if (ptr)
+            return ((ptr+4) - refbuf->data + offset);
+
+        ptr = memmem (refbuf->data + offset, len - offset, "\n\n", 2);
+        if (ptr)
+            return ((ptr+2) - refbuf->data + offset);
+        break;
+    default:
+        WARN ("Unhandled flag: %d", flags);
+    }
+
+    return -ENOENT;
+}
+
+int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
+{
+	int bytes, pos, endpos = -ENOENT;
     ice_config_t *config;
     int header_timeout;
 
+    if (!refbuf) {
+        WARN ("No refbuf !");
+        return -ENOENT;
+    }
+
     config = config_get_config();
-    header_timeout = config->header_timeout;
+    header_timeout = config->header_timeout*1000;
     config_release_config();
 
-    read_bytes = 1;
-    pos = 0;
-    ret = 0;
-
-    while ((read_bytes == 1) && (pos < (len - 1))) {
-        read_bytes = 0;
+    if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
+	    INFO("util_timed_wait_for_fd <= 0");
+	    return -EAGAIN;
+    }
 
-        if (util_timed_wait_for_fd(sock, header_timeout*1000) > 0) {
+    if (refbuf->sync_point < 0) {
+        DEBUG ("REENTRING, got and old non-resolved sync");
+        pos = -refbuf->sync_point;
+    } else if (refbuf->sync_point > 0) {
+        DEBUG ("REENTRING, got and old resolved sync");
+        endpos = pos = refbuf->sync_point;
+    } else {
+        DEBUG ("FIRST TIME, no sync");
+        pos = 0;
+    }
 
-            if ((read_bytes = recv(sock, &c, 1, 0))) {
-                if (c != '\r') buff[pos++] = c;
-                if (entire) {
-                    if ((pos > 1) && (buff[pos - 1] == '\n' &&
-                                      buff[pos - 2] == '\n')) {
-                        ret = 1;
-                        break;
-                    }
-                }
-                else {
-                    if ((pos > 1) && (buff[pos - 1] == '\n')) {
-                        ret = 1;
-                        break;
-                    }
-                }
+    while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) >= 0) {
+        if (bytes == 0)
+            con->error = 1;
+        if (bytes == -1 && !sock_recoverable (sock_error()))
+            con->error = 1;
+
+        DEBUG("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
+	/* this is used for re-entrance, so we get a new chance to read */
+        if (endpos == -ENOENT)
+            endpos = util_find_eos_delim (refbuf, -(bytes + pos), flags);
+        if (endpos != -ENOENT) {
+            INFO("found it, read %d, left for you: %d, starting %s",
+                 pos + bytes, pos + bytes - endpos, refbuf->data);
+            if (pos + bytes - endpos > 0) {
+                refbuf->len = pos + bytes;
+                INFO("ok got everything");
+                refbuf->sync_point = 0;
+                return endpos;
             }
-        } else {
-            break;
+            INFO ("missing client data, come back for more");
+            refbuf->sync_point = endpos;
+            return endpos;
         }
-    }
 
-    if (ret) buff[pos] = '\0';
+        pos += bytes;
 
-    return ret;
+        if (refbuf->len - pos <= 0) {
+            WARN ("Looked for endpos up to %d, but couldn't find it,… well data is %s", pos, refbuf->data);
+            return -ENOMEM;
+        }
+
+	    if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
+		    INFO ("util_timed_wait_for_fd <= 0");
+            refbuf->sync_point = -pos;
+		    return -EAGAIN;
+	    }
+    }
+
+    WARN("Couldn't find enough data, pos = %d, data = %s, entire = %d.\n", pos, refbuf->data, flags);
+    refbuf->sync_point = 0;
+    return -ENOENT;
 }
 
 char *util_get_extension(const char *path) {
diff --git a/src/util.h b/src/util.h
index ac44f89..7ce8ed6 100644
--- a/src/util.h
+++ b/src/util.h
@@ -16,13 +16,14 @@
 #define XSLT_CONTENT 1
 #define HTML_CONTENT 2
 
-#define READ_ENTIRE_HEADER 1
-#define READ_LINE 0
+#define HEADER_READ_ENTIRE 0
+#define HEADER_READ_LINE 1
 
 #define MAX_LINE_LEN 512
 
 int util_timed_wait_for_fd(sock_t fd, int timeout);
-int util_read_header(sock_t sock, char *buff, unsigned long len, int entire);
+int util_find_eos_delim(refbuf_t *refbuf, int offset, int flags);
+int util_read_header(connection_t *con, refbuf_t *refbuf, int flags);
 int util_check_valid_extension(const char *uri);
 char *util_get_extension(const char *path);
 char *util_get_path_from_uri(char *uri);
-- 
1.7.1



More information about the Icecast-dev mailing list