[Icecast-dev] [PATCH 18/31] connection_process takes node, con_q_t gets refbuf, and con_t timeout, util updated
Niv Sardi
nsardi at smartjog.com
Fri Jul 30 07:54:40 PDT 2010
This allows all to be re-entrant.
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
src/connection.c | 27 ++++++++++++++++++---------
src/connection.h | 1 +
src/util.c | 48 ++++++++++++++++++++++++++++++++++--------------
3 files changed, 53 insertions(+), 23 deletions(-)
diff --git a/src/connection.c b/src/connection.c
index 118258d..9532bab 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -84,6 +84,7 @@
typedef struct connection_queue_tag {
connection_t *con;
+ refbuf_t *refbuf;
struct connection_queue_tag *next;
} connection_queue_t;
@@ -115,7 +116,7 @@ rwlock_t _source_shutdown_rwlock;
static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount);
static int _handle_client (client_t *client);
-static int _connection_process (connection_t *con, int timeout);
+static int _connection_process (connection_queue_t *node);
static void *_connection_thread (void *arg);
static int compare_ip (void *arg, void *a, void *b)
@@ -633,7 +634,7 @@ static void *_connection_thread (void *arg)
if (!node) {
continue;
}
- err = _connection_process (node, 3000);
+ err = _connection_process (node);
if (err > 0) {
free(node);
continue;
@@ -662,18 +663,20 @@ static void *_connection_thread (void *arg)
return NULL;
}
-static int _connection_process (connection_t *con, int timeout) {
+static int _connection_process (connection_queue_t *node) {
ice_config_t *config;
client_t *client = NULL;
listener_t *listener;
- refbuf_t *header = NULL;
+ refbuf_t *header;
http_parser_t *parser = NULL;
int hdrsize = 0;
int shoutcast = 0;
char *shoutcast_mount = NULL;
- header = refbuf_new (PER_CLIENT_REFBUF_SIZE);
- hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+ if (!node->refbuf)
+ node->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+ header = node->refbuf;
+ hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE);
if (hdrsize < 0)
{
global_unlock();
@@ -700,7 +703,7 @@ static int _connection_process (connection_t *con, int timeout) {
if (header->sync_point && (parser->req_type == httpp_req_source ||
parser->req_type == httpp_req_post)) {
- hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+ hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE);
if (hdrsize < 0) {
INFO ("Header read failed");
return hdrsize;
@@ -708,7 +711,7 @@ static int _connection_process (connection_t *con, int timeout) {
}
global_lock();
- if (client_create (&client, con, parser) < 0)
+ if (client_create (&client, node->con, parser) < 0)
{
global_unlock();
client_send_403 (client, "Icecast connection limit reached");
@@ -745,8 +748,12 @@ static int _connection_process (connection_t *con, int timeout) {
global_unlock();
config_release_config();
- if (client->con->con_time + timeout <= time(NULL))
+/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. */
+ if (client->con->con_timeout <= time(NULL)) {
+ WARN("there might be a bug if you see this");
+ client_destroy (client);
return -1;
+ }
stats_event_inc (NULL, "connections");
@@ -779,6 +786,8 @@ void connection_accept_loop (void)
continue;
}
+ con->con_timeout = time(NULL) + timeout;
+
/* add connection async to the connection queue, then the
* connection loop will do all the dirty work */
node =_connection_node_new (con);
diff --git a/src/connection.h b/src/connection.h
index 80a2b10..42483cc 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -34,6 +34,7 @@ typedef struct connection_tag
unsigned long id;
time_t con_time;
+ time_t con_timeout;
time_t discon_time;
uint64_t sent_bytes;
diff --git a/src/util.c b/src/util.c
index 94b06f6..c74d1ce 100644
--- a/src/util.c
+++ b/src/util.c
@@ -142,9 +142,18 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
return -ENOENT;
}
- config = config_get_config();
- header_timeout = config->header_timeout*1000;
- config_release_config();
+ if (con->con_timeout == 0) {
+ DEBUG0 ("NO TIMEOUT");
+ config = config_get_config();
+ header_timeout = config->header_timeout*1000;
+ config_release_config();
+ } else if (time(NULL) < con->con_timeout) {
+ DEBUG3 ("STILL HAVE TIME (%d - %d = %d)", time(NULL), con->con_timeout, con->con_timeout - time(NULL));
+ header_timeout = 1000;
+ } else {
+ DEBUG0 ("HUM BROKEN PIPE");
+ return -EPIPE;
+ }
if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
INFO("util_timed_wait_for_fd <= 0");
@@ -152,24 +161,19 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
}
if (refbuf->sync_point < 0) {
- DEBUG ("REENTRING, got and old non-resolved sync");
+ DEBUG0 ("REENTRING, got and old non-resolved sync");
pos = -refbuf->sync_point;
} else if (refbuf->sync_point > 0) {
- DEBUG ("REENTRING, got and old resolved sync");
+ DEBUG0 ("REENTRING, got and old resolved sync");
endpos = pos = refbuf->sync_point;
} else {
- DEBUG ("FIRST TIME, no sync");
+ DEBUG0 ("FIRST TIME, no sync");
pos = 0;
}
- while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) >= 0) {
- if (bytes == 0)
- con->error = 1;
- if (bytes == -1 && !sock_recoverable (sock_error()))
- con->error = 1;
-
- DEBUG("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
- /* this is used for re-entrance, so we get a new chance to read */
+ while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) > 0) {
+ DEBUG4 ("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
+ /* this is used for re-entrance, so we get a new chance to read */
if (endpos == -ENOENT)
endpos = util_find_eos_delim (refbuf, -(bytes + pos), flags);
if (endpos != -ENOENT) {
@@ -198,8 +202,24 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
refbuf->sync_point = -pos;
return -EAGAIN;
}
+
+ if (time(NULL) > con->con_timeout)
+ goto out_FAIL;
+ }
+
+ if (bytes == 0 || ! sock_recoverable (sock_error())) {
+ WARN ("Connection error");
+ con->error = 1;
+ return -EPIPE;
+ }
+
+ if (time(NULL) < con->con_timeout) {
+ DEBUG3 ("STILL HAVE TIME pos == (%d)â¦bytes = %d⦠data is %s", pos, bytes, refbuf->data);
+ DEBUG3 ("OUT, %p, refbuf->len = %d, refbuf->syncpoint = %d", refbuf, refbuf->len, refbuf->sync_point);
+ return -EAGAIN;
}
+out_FAIL:
WARN("Couldn't find enough data, pos = %d, data = %s, entire = %d.\n", pos, refbuf->data, flags);
refbuf->sync_point = 0;
return -ENOENT;
--
1.7.1
More information about the Icecast-dev
mailing list