[Icecast-dev] [PATCH 18/31] connection_process takes node, con_q_t gets refbuf, and con_t timeout, util updated

Niv Sardi nsardi at smartjog.com
Fri Jul 30 07:54:40 PDT 2010


This allows all to be re-entrant.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   27 ++++++++++++++++++---------
 src/connection.h |    1 +
 src/util.c       |   48 ++++++++++++++++++++++++++++++++++--------------
 3 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 118258d..9532bab 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -84,6 +84,7 @@
 
 typedef struct connection_queue_tag {
     connection_t *con;
+    refbuf_t *refbuf;
     struct connection_queue_tag *next;
 } connection_queue_t;
 
@@ -115,7 +116,7 @@ rwlock_t _source_shutdown_rwlock;
 
 static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount);
 static int _handle_client (client_t *client);
-static int _connection_process (connection_t *con, int timeout);
+static int _connection_process (connection_queue_t *node);
 static void *_connection_thread (void *arg);
 
 static int compare_ip (void *arg, void *a, void *b)
@@ -633,7 +634,7 @@ static void *_connection_thread (void *arg)
         if (!node) {
             continue;
         }
-	err = _connection_process (node, 3000);
+	err = _connection_process (node);
         if (err > 0) {
             free(node);
             continue;
@@ -662,18 +663,20 @@ static void *_connection_thread (void *arg)
     return NULL;
 }
 
-static int _connection_process (connection_t *con, int timeout) {
+static int _connection_process (connection_queue_t *node) {
     ice_config_t *config;
     client_t *client = NULL;
     listener_t *listener;
-    refbuf_t *header = NULL;
+    refbuf_t *header;
     http_parser_t *parser = NULL;
     int hdrsize = 0;
     int shoutcast = 0;
     char *shoutcast_mount = NULL;
 
-    header = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-    hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+    if (!node->refbuf)
+	    node->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+    header = node->refbuf;
+    hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE);
     if (hdrsize < 0)
     {
         global_unlock();
@@ -700,7 +703,7 @@ static int _connection_process (connection_t *con, int timeout) {
 
     if (header->sync_point && (parser->req_type == httpp_req_source ||
                                parser->req_type == httpp_req_post)) {
-	    hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+	    hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE);
 	    if (hdrsize < 0) {
             INFO ("Header read failed");
             return hdrsize;
@@ -708,7 +711,7 @@ static int _connection_process (connection_t *con, int timeout) {
     }
 
     global_lock();
-    if (client_create (&client, con, parser) < 0)
+    if (client_create (&client, node->con, parser) < 0)
     {
         global_unlock();
         client_send_403 (client, "Icecast connection limit reached");
@@ -745,8 +748,12 @@ static int _connection_process (connection_t *con, int timeout) {
     global_unlock();
     config_release_config();
 
-    if (client->con->con_time + timeout <= time(NULL))
+/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. */
+    if (client->con->con_timeout <= time(NULL)) {
+        WARN("there might be a bug if you see this");
+        client_destroy (client);
         return -1;
+    }
 
     stats_event_inc (NULL, "connections");
 
@@ -779,6 +786,8 @@ void connection_accept_loop (void)
             continue;
         }
 
+        con->con_timeout = time(NULL) + timeout;
+
         /* add connection async to the connection queue, then the
          * connection loop will do all the dirty work */
         node =_connection_node_new (con);
diff --git a/src/connection.h b/src/connection.h
index 80a2b10..42483cc 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -34,6 +34,7 @@ typedef struct connection_tag
     unsigned long id;
 
     time_t con_time;
+    time_t con_timeout;
     time_t discon_time;
     uint64_t sent_bytes;
 
diff --git a/src/util.c b/src/util.c
index 94b06f6..c74d1ce 100644
--- a/src/util.c
+++ b/src/util.c
@@ -142,9 +142,18 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
         return -ENOENT;
     }
 
-    config = config_get_config();
-    header_timeout = config->header_timeout*1000;
-    config_release_config();
+    if (con->con_timeout == 0) {
+        DEBUG0 ("NO TIMEOUT");
+        config = config_get_config();
+        header_timeout = config->header_timeout*1000;
+        config_release_config();
+    } else if (time(NULL) < con->con_timeout) {
+        DEBUG3 ("STILL HAVE TIME (%d - %d = %d)", time(NULL), con->con_timeout, con->con_timeout - time(NULL));
+        header_timeout = 1000;
+    } else {
+        DEBUG0 ("HUM BROKEN PIPE");
+        return -EPIPE;
+    }
 
     if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
 	    INFO("util_timed_wait_for_fd <= 0");
@@ -152,24 +161,19 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
     }
 
     if (refbuf->sync_point < 0) {
-        DEBUG ("REENTRING, got and old non-resolved sync");
+        DEBUG0 ("REENTRING, got and old non-resolved sync");
         pos = -refbuf->sync_point;
     } else if (refbuf->sync_point > 0) {
-        DEBUG ("REENTRING, got and old resolved sync");
+        DEBUG0 ("REENTRING, got and old resolved sync");
         endpos = pos = refbuf->sync_point;
     } else {
-        DEBUG ("FIRST TIME, no sync");
+        DEBUG0 ("FIRST TIME, no sync");
         pos = 0;
     }
 
-    while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) >= 0) {
-        if (bytes == 0)
-            con->error = 1;
-        if (bytes == -1 && !sock_recoverable (sock_error()))
-            con->error = 1;
-
-        DEBUG("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
-	/* this is used for re-entrance, so we get a new chance to read */
+    while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) > 0) {
+        DEBUG4 ("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
+        /* this is used for re-entrance, so we get a new chance to read */
         if (endpos == -ENOENT)
             endpos = util_find_eos_delim (refbuf, -(bytes + pos), flags);
         if (endpos != -ENOENT) {
@@ -198,8 +202,24 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
             refbuf->sync_point = -pos;
 		    return -EAGAIN;
 	    }
+
+        if (time(NULL) > con->con_timeout)
+            goto out_FAIL;
+    }
+
+    if (bytes == 0 || ! sock_recoverable (sock_error())) {
+        WARN ("Connection error");
+        con->error = 1;
+        return -EPIPE;
+    }
+
+    if (time(NULL) < con->con_timeout) {
+        DEBUG3 ("STILL HAVE TIME pos == (%d)…bytes = %d… data is %s", pos, bytes, refbuf->data);
+        DEBUG3 ("OUT, %p, refbuf->len = %d, refbuf->syncpoint = %d", refbuf, refbuf->len, refbuf->sync_point);
+        return -EAGAIN;
     }
 
+out_FAIL:
     WARN("Couldn't find enough data, pos = %d, data = %s, entire = %d.\n", pos, refbuf->data, flags);
     refbuf->sync_point = 0;
     return -ENOENT;
-- 
1.7.1



More information about the Icecast-dev mailing list