[xiph-commits] r9733 - icecast/trunk/icecast/src
karl at svn.xiph.org
karl at svn.xiph.org
Thu Aug 11 16:30:03 PDT 2005
Author: karl
Date: 2005-08-11 16:29:58 -0700 (Thu, 11 Aug 2005)
New Revision: 9733
Modified:
icecast/trunk/icecast/src/client.c
icecast/trunk/icecast/src/client.h
icecast/trunk/icecast/src/connection.c
icecast/trunk/icecast/src/connection.h
icecast/trunk/icecast/src/fserve.c
icecast/trunk/icecast/src/slave.c
icecast/trunk/icecast/src/source.c
Log:
drop the thread pool of connection threads, they were using a blocking socket
on incoming connections. Now we get the accept thread to create a client_t
and mark it as a shoutcast client if need be. Then use a single connection
thread to poll the non-blocking sockets for the headers. When complete they
get handled as usual.
Modified: icecast/trunk/icecast/src/client.c
===================================================================
--- icecast/trunk/icecast/src/client.c 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/client.c 2005-08-11 23:29:58 UTC (rev 9733)
@@ -39,33 +39,34 @@
#undef CATMODULE
#define CATMODULE "client"
-client_t *client_create(connection_t *con, http_parser_t *parser)
+/* should be called with global lock held */
+int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser)
{
- ice_config_t *config = config_get_config ();
+ ice_config_t *config;
client_t *client = (client_t *)calloc(1, sizeof(client_t));
- int client_limit = config->client_limit;
+ int ret = -1;
+
+ if (client == NULL)
+ return -1;
+
+ config = config_get_config ();
+
+ global.clients++;
+ if (config->client_limit < global.clients)
+ WARN2 ("server client limit reached (%d/%d)", config->client_limit, global.clients);
+ else
+ ret = 0;
+
config_release_config ();
- global_lock();
- if (global.clients >= client_limit || client == NULL)
- {
- client_limit = global.clients;
- global_unlock();
- free (client);
- WARN1 ("server client limit reached (%d clients)", client_limit);
- return NULL;
- }
- global.clients++;
stats_event_args (NULL, "clients", "%d", global.clients);
- global_unlock();
-
client->con = con;
client->parser = parser;
- client->refbuf = NULL;
client->pos = 0;
client->write_to_client = format_generic_write_to_client;
+ *c_ptr = client;
- return client;
+ return ret;
}
void client_destroy(client_t *client)
@@ -110,7 +111,23 @@
/* helper function for reading data from a client */
int client_read_bytes (client_t *client, void *buf, unsigned len)
{
- int bytes = sock_read_bytes (client->con->sock, buf, len);
+ int bytes;
+
+ if (client->refbuf && client->refbuf->len)
+ {
+ /* we have data to read from a refbuf first */
+ if (client->refbuf->len < len)
+ len = client->refbuf->len;
+ memcpy (buf, client->refbuf->data, len);
+ if (client->refbuf->len < len)
+ {
+ char *ptr = client->refbuf->data;
+ memmove (ptr, ptr+len, client->refbuf->len - len);
+ }
+ client->refbuf->len -= len;
+ return len;
+ }
+ bytes = sock_read_bytes (client->con->sock, buf, len);
if (bytes > 0)
return bytes;
Modified: icecast/trunk/icecast/src/client.h
===================================================================
--- icecast/trunk/icecast/src/client.h 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/client.h 2005-08-11 23:29:58 UTC (rev 9733)
@@ -67,7 +67,7 @@
} client_t;
-client_t *client_create(connection_t *con, http_parser_t *parser);
+int client_create (client_t **c_ptr, connection_t *con, http_parser_t *parser);
void client_destroy(client_t *client);
void client_send_504(client_t *client, char *message);
void client_send_404(client_t *client, char *message);
Modified: icecast/trunk/icecast/src/connection.c
===================================================================
--- icecast/trunk/icecast/src/connection.c 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/connection.c 2005-08-11 23:29:58 UTC (rev 9733)
@@ -80,10 +80,13 @@
#define SHOUTCAST_SOURCE_AUTH 1
#define ICECAST_SOURCE_AUTH 0
-typedef struct con_queue_tag {
- connection_t *con;
- struct con_queue_tag *next;
-} con_queue_t;
+typedef struct client_queue_tag {
+ client_t *client;
+ int offset;
+ int stream_offset;
+ int shoutcast;
+ struct client_queue_tag *next;
+} client_queue_t;
typedef struct _thread_queue_tag {
thread_type *thread_id;
@@ -93,12 +96,13 @@
static mutex_t _connection_mutex;
static volatile unsigned long _current_id = 0;
static int _initialized = 0;
+static thread_type *tid;
-volatile static con_queue_t *_queue = NULL;
-static mutex_t _queue_mutex;
+static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
+static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
+static mutex_t _con_queue_mutex;
+static mutex_t _req_queue_mutex;
-static thread_queue_t *_conhands = NULL;
-
rwlock_t _source_shutdown_rwlock;
static void *_handle_connection(void *arg);
@@ -108,7 +112,8 @@
if (_initialized) return;
thread_mutex_create(&_connection_mutex);
- thread_mutex_create(&_queue_mutex);
+ thread_mutex_create(&_con_queue_mutex);
+ thread_mutex_create(&_req_queue_mutex);
thread_mutex_create(&move_clients_mutex);
thread_rwlock_create(&_source_shutdown_rwlock);
thread_cond_create(&global.shutdown_cond);
@@ -122,7 +127,8 @@
thread_cond_destroy(&global.shutdown_cond);
thread_rwlock_destroy(&_source_shutdown_rwlock);
- thread_mutex_destroy(&_queue_mutex);
+ thread_mutex_destroy(&_con_queue_mutex);
+ thread_mutex_destroy(&_req_queue_mutex);
thread_mutex_destroy(&_connection_mutex);
thread_mutex_destroy(&move_clients_mutex);
@@ -140,19 +146,19 @@
return id;
}
-connection_t *create_connection(sock_t sock, sock_t serversock, char *ip) {
+connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
+{
connection_t *con;
- con = (connection_t *)malloc(sizeof(connection_t));
- memset(con, 0, sizeof(connection_t));
- con->sock = sock;
- con->serversock = serversock;
- con->con_time = time(NULL);
- con->id = _next_connection_id();
- con->ip = ip;
+ con = (connection_t *)calloc(1, sizeof(connection_t));
+ if (con)
+ {
+ con->sock = sock;
+ con->serversock = serversock;
+ con->con_time = time(NULL);
+ con->id = _next_connection_id();
+ con->ip = ip;
+ }
- con->event_number = EVENT_NO_EVENT;
- con->event = NULL;
-
return con;
}
@@ -254,8 +260,11 @@
ip = (char *)malloc(MAX_ADDR_LEN);
sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
- if (sock >= 0) {
- con = create_connection(sock, serversock, ip);
+ if (sock >= 0)
+ {
+ con = connection_create (sock, serversock, ip);
+ if (con == NULL)
+ free (ip);
return con;
}
@@ -268,172 +277,208 @@
return NULL;
}
-static void _add_connection(connection_t *con)
+
+/* add client to connection queue. At this point some header information
+ * has been collected, so we now pass it onto the connection thread for
+ * further processing
+ */
+static void _add_connection (client_queue_t *node)
{
- con_queue_t *node;
-
- node = (con_queue_t *)malloc(sizeof(con_queue_t));
-
- thread_mutex_lock(&_queue_mutex);
- node->con = con;
- node->next = (con_queue_t *)_queue;
- _queue = node;
- thread_mutex_unlock(&_queue_mutex);
+ thread_mutex_lock (&_con_queue_mutex);
+ *_con_queue_tail = node;
+ _con_queue_tail = (volatile client_queue_t **)&node->next;
+ thread_mutex_unlock (&_con_queue_mutex);
}
-static void _push_thread(thread_queue_t **queue, thread_type *thread_id)
+
+/* this returns queued clients for the connection thread. headers are
+ * already provided, but need to be parsed.
+ */
+static client_queue_t *_get_connection(void)
{
- /* create item */
- thread_queue_t *item = (thread_queue_t *)malloc(sizeof(thread_queue_t));
- item->thread_id = thread_id;
- item->next = NULL;
+ client_queue_t *node = NULL;
-
- thread_mutex_lock(&_queue_mutex);
- if (*queue == NULL) {
- *queue = item;
- } else {
- item->next = *queue;
- *queue = item;
+ /* common case, no new connections so don't bother taking locks */
+ if (_con_queue)
+ {
+ thread_mutex_lock (&_con_queue_mutex);
+ node = (client_queue_t *)_con_queue;
+ _con_queue = node->next;
+ if (_con_queue == NULL)
+ _con_queue_tail = &_con_queue;
+ thread_mutex_unlock (&_con_queue_mutex);
}
- thread_mutex_unlock(&_queue_mutex);
+ return node;
}
-static thread_type *_pop_thread(thread_queue_t **queue)
+
+/* run along queue checking for any data that has come in or a timeout */
+static void process_request_queue ()
{
- thread_type *id;
- thread_queue_t *item;
+ client_queue_t **node_ref = (client_queue_t **)&_req_queue;
+ ice_config_t *config = config_get_config ();
+ int timeout = config->header_timeout;
+ config_release_config();
- thread_mutex_lock(&_queue_mutex);
+ while (*node_ref)
+ {
+ client_queue_t *node = *node_ref;
+ client_t *client = node->client;
+ int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
+ char *buf = client->refbuf->data + node->offset;
- item = *queue;
- if (item == NULL) {
- thread_mutex_unlock(&_queue_mutex);
- return NULL;
- }
+ if (len > 0)
+ {
+ if (client->con->con_time + timeout <= time(NULL))
+ len = 0;
+ else
+ len = client_read_bytes (client, buf, len);
+ }
- *queue = item->next;
- item->next = NULL;
- id = item->thread_id;
- free(item);
+ if (len > 0)
+ {
+ int pass_it = 1;
+ char *ptr;
- thread_mutex_unlock(&_queue_mutex);
+ node->offset += len;
+ client->refbuf->data [node->offset] = '\000';
+ do
+ {
+ if (node->shoutcast == 1)
+ {
+ /* password line */
+ if (strstr (client->refbuf->data, "\r\n") != NULL)
+ break;
+ if (strstr (client->refbuf->data, "\n") != NULL)
+ break;
+ }
+ /* stream_offset refers to the start of any data sent after the
+ * http style headers, we don't want to lose those */
+ ptr = strstr (client->refbuf->data, "\r\n\r\n");
+ if (ptr)
+ {
+ node->stream_offset = (ptr+4) - client->refbuf->data;
+ break;
+ }
+ ptr = strstr (client->refbuf->data, "\n\n");
+ if (ptr)
+ {
+ node->stream_offset = (ptr+2) - client->refbuf->data;
+ break;
+ }
+ pass_it = 0;
+ } while (0);
- return id;
+ if (pass_it)
+ {
+ if ((client_queue_t **)_req_queue_tail == &(node->next))
+ _req_queue_tail = (volatile client_queue_t **)node_ref;
+ *node_ref = node->next;
+ node->next = NULL;
+ _add_connection (node);
+ }
+ }
+ else
+ {
+ if (len == 0 || client->con->error)
+ {
+ if ((client_queue_t **)_req_queue_tail == &node->next)
+ _req_queue_tail = (volatile client_queue_t **)node_ref;
+ *node_ref = node->next;
+ client_destroy (client);
+ free (node);
+ continue;
+ }
+ }
+ node_ref = &node->next;
+ }
}
-static void _build_pool(void)
+
+/* add node to the queue of requests. This is where the clients are when
+ * initial http details are read.
+ */
+static void _add_request_queue (client_queue_t *node)
{
- ice_config_t *config;
- int i;
- thread_type *tid;
- char buff[64];
- int threadpool_size;
-
- config = config_get_config();
- threadpool_size = config->threadpool_size;
- config_release_config();
-
- for (i = 0; i < threadpool_size; i++) {
- snprintf(buff, 64, "Connection Thread #%d", i);
- tid = thread_create(buff, _handle_connection, NULL, THREAD_ATTACHED);
- _push_thread(&_conhands, tid);
- }
+ thread_mutex_lock (&_req_queue_mutex);
+ *_req_queue_tail = node;
+ _req_queue_tail = (volatile client_queue_t **)&node->next;
+ thread_mutex_unlock (&_req_queue_mutex);
}
-static void _destroy_pool(void)
-{
- thread_type *id;
- int i;
- i = 0;
-
- id = _pop_thread(&_conhands);
- while (id != NULL) {
- thread_join(id);
- id = _pop_thread(&_conhands);
- }
- INFO0("All connection threads down");
-}
-
void connection_accept_loop(void)
{
connection_t *con;
- _build_pool();
+ tid = thread_create ("connection thread", _handle_connection, NULL, THREAD_ATTACHED);
while (global.running == ICE_RUNNING)
{
- if (global . schedule_config_reread)
+ con = _accept_connection();
+
+ if (con)
{
- /* reread config file */
- INFO0("Scheduling config reread ...");
+ client_queue_t *node;
+ ice_config_t *config;
+ int i;
+ client_t *client = NULL;
- connection_inject_event(EVENT_CONFIG_READ, NULL);
- global . schedule_config_reread = 0;
- }
+ global_lock();
+ if (client_create (&client, con, NULL) < 0)
+ {
+ global_unlock();
+ client_send_404 (client, "Icecast connection limit reached");
+ continue;
+ }
+ global_unlock();
- con = _accept_connection();
+ /* setup client for reading incoming http */
+ client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+ client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
+ client->refbuf->len = 0; /* force reader code to ignore buffer */
- if (con) {
- _add_connection(con);
+ node = calloc (1, sizeof (client_queue_t));
+ if (node == NULL)
+ {
+ client_destroy (client);
+ continue;
+ }
+ node->client = client;
+
+ /* Check for special shoutcast compatability processing */
+ config = config_get_config();
+ for (i = 0; i < global.server_sockets; i++)
+ {
+ if (global.serversock[i] == con->serversock)
+ {
+ if (config->listeners[i].shoutcast_compat)
+ node->shoutcast = 1;
+ }
+ }
+ config_release_config();
+
+ sock_set_blocking (client->con->sock, SOCK_NONBLOCK);
+ sock_set_nodelay (client->con->sock);
+
+ _add_request_queue (node);
+ stats_event_inc (NULL, "connections");
}
+ process_request_queue ();
}
/* Give all the other threads notification to shut down */
thread_cond_broadcast(&global.shutdown_cond);
- _destroy_pool();
+ if (tid)
+ thread_join (tid);
/* wait for all the sources to shutdown */
thread_rwlock_wlock(&_source_shutdown_rwlock);
thread_rwlock_unlock(&_source_shutdown_rwlock);
}
-static connection_t *_get_connection(void)
-{
- con_queue_t *node = NULL;
- con_queue_t *oldnode = NULL;
- connection_t *con = NULL;
- /* common case, no new connections so don't bother taking locks */
- if (_queue == NULL)
- return NULL;
-
- thread_mutex_lock(&_queue_mutex);
- if (_queue) {
- node = (con_queue_t *)_queue;
- while (node->next) {
- oldnode = node;
- node = node->next;
- }
-
- /* node is now the last node
- ** and oldnode is the previous one, or NULL
- */
- if (oldnode) oldnode->next = NULL;
- else (_queue) = NULL;
- }
- thread_mutex_unlock(&_queue_mutex);
-
- if (node) {
- con = node->con;
- free(node);
- }
-
- return con;
-}
-
-void connection_inject_event(int eventnum, void *event_data) {
- connection_t *con = calloc(1, sizeof(connection_t));
-
- con->event_number = eventnum;
- con->event = event_data;
-
- _add_connection(con);
-}
-
-
/* Called when activating a source. Verifies that the source count is not
* exceeded and applies any initial parameters.
*/
@@ -484,10 +529,6 @@
return -1;
}
- global.sources++;
- stats_event_args (NULL, "sources", "%d", global.sources);
- global_unlock();
-
/* for relays, we don't yet have a client, however we do require one
* to retrieve the stream from. This is created here, quite late,
* because we can't use this client to return an error code/message,
@@ -495,12 +536,9 @@
*/
if (source->client == NULL)
{
- source->client = client_create (source->con, source->parser);
- if (source->client == NULL)
+ if (client_create (&source->client, source->con, source->parser) < 0)
{
config_release_config();
- global_lock();
- global.sources--;
global_unlock();
connection_close (source->con);
source->con = NULL;
@@ -509,6 +547,9 @@
return -1;
}
}
+ global.sources++;
+ stats_event_args (NULL, "sources", "%d", global.sources);
+ global_unlock();
source->running = 1;
mountinfo = config_find_mount (config, source->mount);
@@ -823,160 +864,141 @@
return;
}
- sock_set_blocking(client->con->sock, SOCK_NONBLOCK);
- sock_set_nodelay(client->con->sock);
-
- client->write_to_client = format_generic_write_to_client;
- client->check_buffer = format_check_http_buffer;
- client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-
add_client (uri, client);
if (uri != passed_uri) free (uri);
}
-void _handle_shoutcast_compatible(connection_t *con, char *mount, char *source_password) {
- char shoutcast_password[256];
+static void _handle_shoutcast_compatible (client_queue_t *node)
+{
char *http_compliant;
int http_compliant_len = 0;
- char header[4096];
http_parser_t *parser;
+ ice_config_t *config = config_get_config ();
+ char *shoutcast_mount;
+ client_t *client = node->client;
- memset(shoutcast_password, 0, sizeof (shoutcast_password));
- /* Step one of shoutcast auth protocol, read encoder password (1 line) */
- if (util_read_header(con->sock, shoutcast_password,
- sizeof (shoutcast_password),
- READ_LINE) == 0) {
- /* either we didn't get a complete line, or we timed out */
- connection_close(con);
- return;
- }
- /* Get rid of trailing \n */
- shoutcast_password[strlen(shoutcast_password)-1] = '\000';
- if (strcmp(shoutcast_password, source_password)) {
- ERROR0("Invalid source password");
- connection_close(con);
- return;
- }
- /* Step two of shoutcast auth protocol, send OK2. For those
- interested, OK2 means it supports metadata updates via admin.cgi,
- and the string "OK" can also be sent, but will indicate to the
- shoutcast source client to not send metadata updates.
- I believe icecast 1.x used to send OK. */
- sock_write(con->sock, "%s\r\n", "OK2");
+ if (node->shoutcast == 1)
+ {
+ char *source_password, *ptr;
+ mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount);
- memset(header, 0, sizeof (header));
- /* Step three of shoutcast auth protocol, read HTTP-style
- request headers and process them.*/
- if (util_read_header(con->sock, header, sizeof (header),
- READ_ENTIRE_HEADER) == 0) {
- /* either we didn't get a complete header, or we timed out */
- connection_close(con);
+ if (mountinfo && mountinfo->password)
+ source_password = strdup (mountinfo->password);
+ else
+ source_password = strdup (config->source_password);
+ config_release_config();
+
+ /* Get rid of trailing \r\n or \n after password */
+ ptr = strstr (client->refbuf->data, "\r\n");
+ if (ptr == NULL)
+ ptr = strstr (client->refbuf->data, "\n");
+
+ if (ptr == NULL)
+ {
+ client_destroy (client);
+ free (source_password);
+ free (node);
+ return;
+ }
+ *ptr = '\0';
+
+ if (strcmp (client->refbuf->data, source_password) == 0)
+ {
+ client->respcode = 200;
+ /* send this non-blocking but if there is only a partial write
+ * then leave to header timeout */
+ sock_write (client->con->sock, "OK2\r\n");
+ memset (client->refbuf->data, 0, client->refbuf->len);
+ node->shoutcast = 2;
+ node->offset = 0;
+ /* we've checked the password, now send it back for reading headers */
+ _add_request_queue (node);
+ free (source_password);
+ return;
+ }
+ client_destroy (client);
+ free (node);
return;
}
+ shoutcast_mount = strdup (config->shoutcast_mount);
+ config_release_config();
/* Here we create a valid HTTP request based of the information
that was passed in via the non-HTTP style protocol above. This
means we can use some of our existing code to handle this case */
- http_compliant_len = strlen(header) + strlen(mount) + 20;
+ http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset;
http_compliant = (char *)calloc(1, http_compliant_len);
snprintf (http_compliant, http_compliant_len,
- "SOURCE %s HTTP/1.0\r\n%s", mount, header);
+ "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, client->refbuf->data);
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
if (httpp_parse (parser, http_compliant, strlen(http_compliant)))
{
- client_t *client = client_create (con, parser);
- if (client)
+ /* we may have more than just headers, so prepare for it */
+ if (node->stream_offset == node->offset)
+ client->refbuf->len = 0;
+ else
{
- _handle_source_request (client, mount, SHOUTCAST_SOURCE_AUTH);
- free (http_compliant);
- return;
+ char *ptr = client->refbuf->data;
+ client->refbuf->len = node->offset - node->stream_offset;
+ memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
}
+ client->parser = parser;
+ _handle_source_request (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH);
}
- connection_close (con);
- httpp_destroy (parser);
+ else
+ client_destroy (client);
free (http_compliant);
+ free (shoutcast_mount);
+ free (node);
+ return;
}
+
+/* Connection thread. Here we take clients off the connection queue and check
+ * the contents provided. We set up the parser then hand off to the specific
+ * request handler.
+ */
static void *_handle_connection(void *arg)
{
- char header[4096];
- connection_t *con;
http_parser_t *parser;
char *rawuri, *uri;
- client_t *client;
- int i = 0;
- int continue_flag = 0;
- ice_config_t *config;
- char *source_password;
while (global.running == ICE_RUNNING) {
- /* grab a connection and set the socket to blocking */
- while ((con = _get_connection())) {
+ client_queue_t *node = _get_connection();
- /* Handle meta-connections */
- if(con->event_number > 0) {
- switch(con->event_number) {
- case EVENT_CONFIG_READ:
- event_config_read(con->event);
- break;
- default:
- ERROR1("Unknown event number: %d", con->event_number);
- break;
- }
- free(con);
- continue;
- }
+ if (node)
+ {
+ client_t *client = node->client;
- stats_event_inc(NULL, "connections");
-
- sock_set_blocking(con->sock, SOCK_BLOCK);
-
- continue_flag = 0;
/* Check for special shoutcast compatability processing */
- for(i = 0; i < MAX_LISTEN_SOCKETS; i++) {
- if(global.serversock[i] == con->serversock) {
- config = config_get_config();
- if (config->listeners[i].shoutcast_compat) {
- char *shoutcast_mount = strdup (config->shoutcast_mount);
- mount_proxy *mountinfo = config_find_mount (config, config->shoutcast_mount);
- if (mountinfo && mountinfo->password)
- source_password = strdup (mountinfo->password);
- else
- source_password = strdup (config->source_password);
- config_release_config();
- _handle_shoutcast_compatible(con, shoutcast_mount, source_password);
- free(source_password);
- free (shoutcast_mount);
- continue_flag = 1;
- break;
- }
- config_release_config();
- }
- }
- if(continue_flag) {
+ if (node->shoutcast)
+ {
+ _handle_shoutcast_compatible (node);
continue;
}
- /* fill header with the http header */
- memset(header, 0, sizeof (header));
- if (util_read_header(con->sock, header, sizeof (header),
- READ_ENTIRE_HEADER) == 0) {
- /* either we didn't get a complete header, or we timed out */
- connection_close(con);
- continue;
- }
-
+ /* process normal HTTP headers */
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
- if (httpp_parse(parser, header, strlen(header))) {
- /* handle the connection or something */
+ client->parser = parser;
+ if (httpp_parse (parser, client->refbuf->data, node->offset))
+ {
+ /* we may have more than just headers, so prepare for it */
+ if (node->stream_offset == node->offset)
+ client->refbuf->len = 0;
+ else
+ {
+ char *ptr = client->refbuf->data;
+ client->refbuf->len = node->offset - node->stream_offset;
+ memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
+ }
+ free (node);
if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
ERROR0("Bad HTTP protocol detected");
- connection_close(con);
- httpp_destroy(parser);
+ client_destroy (client);
continue;
}
@@ -985,21 +1007,9 @@
if (uri == NULL)
{
- sock_write(con->sock, "The path you requested was invalid\r\n");
- connection_close(con);
- httpp_destroy(parser);
+ client_destroy (client);
continue;
}
- client = client_create (con, parser);
- if (client == NULL)
- {
- sock_write (con->sock, "HTTP/1.0 404 File Not Found\r\n"
- "Content-Type: text/html\r\n\r\n"
- "<b>Connection limit reached</b>");
- connection_close(con);
- httpp_destroy(parser);
- continue;
- }
if (parser->req_type == httpp_req_source) {
_handle_source_request (client, uri, ICECAST_SOURCE_AUTH);
@@ -1016,16 +1026,16 @@
}
free(uri);
- continue;
}
- else {
+ else
+ {
+ free (node);
ERROR0("HTTP request parsing failed");
- connection_close(con);
- httpp_destroy(parser);
- continue;
+ client_destroy (client);
}
+ continue;
}
- thread_sleep (100000);
+ thread_sleep (50000);
}
DEBUG0 ("Connection thread done");
Modified: icecast/trunk/icecast/src/connection.h
===================================================================
--- icecast/trunk/icecast/src/connection.h 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/connection.h 2005-08-11 23:29:58 UTC (rev 9733)
@@ -38,20 +38,15 @@
char *ip;
char *host;
- /* For 'fake' connections */
- int event_number;
- void *event;
} connection_t;
void connection_initialize(void);
void connection_shutdown(void);
void connection_accept_loop(void);
void connection_close(connection_t *con);
-connection_t *create_connection(sock_t sock, sock_t serversock, char *ip);
+connection_t *connection_create (sock_t sock, sock_t serversock, char *ip);
int connection_complete_source (struct source_tag *source);
-void connection_inject_event(int eventnum, void *event_data);
-
int connection_check_source_pass(http_parser_t *parser, const char *mount);
int connection_check_relay_pass(http_parser_t *parser);
int connection_check_admin_pass(http_parser_t *parser);
Modified: icecast/trunk/icecast/src/fserve.c
===================================================================
--- icecast/trunk/icecast/src/fserve.c 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/fserve.c 2005-08-11 23:29:58 UTC (rev 9733)
@@ -571,9 +571,6 @@
fclient->client = client;
fclient->ready = 0;
- sock_set_blocking (client->con->sock, SOCK_NONBLOCK);
- sock_set_nodelay (client->con->sock);
-
thread_mutex_lock (&pending_lock);
fclient->next = (fserve_t *)pending_list;
pending_list = fclient;
Modified: icecast/trunk/icecast/src/slave.c
===================================================================
--- icecast/trunk/icecast/src/slave.c 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/slave.c 2005-08-11 23:29:58 UTC (rev 9733)
@@ -56,6 +56,7 @@
#include "logging.h"
#include "source.h"
#include "format.h"
+#include "event.h"
#define CATMODULE "slave"
@@ -180,7 +181,7 @@
relay->server, relay->port, relay->mount);
break;
}
- con = create_connection (streamsock, -1, NULL);
+ con = connection_create (streamsock, -1, NULL);
if (relay->username && relay->password)
{
@@ -598,6 +599,13 @@
{
relay_server *cleanup_relays;
+ /* re-read xml file if requested */
+ if (global . schedule_config_reread)
+ {
+ event_config_read (NULL);
+ global . schedule_config_reread = 0;
+ }
+
thread_sleep (1000000);
if (slave_running == 0)
break;
Modified: icecast/trunk/icecast/src/source.c
===================================================================
--- icecast/trunk/icecast/src/source.c 2005-08-11 23:17:00 UTC (rev 9732)
+++ icecast/trunk/icecast/src/source.c 2005-08-11 23:29:58 UTC (rev 9733)
@@ -602,9 +602,6 @@
stats_event (source->mount, "listener_peak", "0");
stats_event_time (source->mount, "stream_start");
- if (source->client->con)
- sock_set_blocking (source->con->sock, SOCK_NONBLOCK);
-
DEBUG0("Source creation complete");
source->last_read = time (NULL);
source->running = 1;
More information about the commits
mailing list