[xiph-commits] r16219 - in icecast/branches/kh/icecast: . admin src win32
karl at svn.xiph.org
karl at svn.xiph.org
Tue Jul 7 16:26:21 PDT 2009
Author: karl
Date: 2009-07-07 16:26:21 -0700 (Tue, 07 Jul 2009)
New Revision: 16219
Modified:
icecast/branches/kh/icecast/NEWS
icecast/branches/kh/icecast/admin/moveclients.xsl
icecast/branches/kh/icecast/admin/stats.xsl
icecast/branches/kh/icecast/config.h.vc6
icecast/branches/kh/icecast/configure.in
icecast/branches/kh/icecast/src/admin.c
icecast/branches/kh/icecast/src/auth.c
icecast/branches/kh/icecast/src/auth.h
icecast/branches/kh/icecast/src/cfgfile.c
icecast/branches/kh/icecast/src/cfgfile.h
icecast/branches/kh/icecast/src/client.c
icecast/branches/kh/icecast/src/client.h
icecast/branches/kh/icecast/src/connection.c
icecast/branches/kh/icecast/src/event.c
icecast/branches/kh/icecast/src/format.c
icecast/branches/kh/icecast/src/format.h
icecast/branches/kh/icecast/src/format_mp3.c
icecast/branches/kh/icecast/src/format_ogg.c
icecast/branches/kh/icecast/src/fserve.c
icecast/branches/kh/icecast/src/fserve.h
icecast/branches/kh/icecast/src/global.c
icecast/branches/kh/icecast/src/global.h
icecast/branches/kh/icecast/src/main.c
icecast/branches/kh/icecast/src/slave.c
icecast/branches/kh/icecast/src/slave.h
icecast/branches/kh/icecast/src/source.c
icecast/branches/kh/icecast/src/source.h
icecast/branches/kh/icecast/src/stats.c
icecast/branches/kh/icecast/src/xslt.c
icecast/branches/kh/icecast/win32/icecast.dsp
icecast/branches/kh/icecast/win32/icecast2.iss
Log:
commit kh10 changes. Fairly major as it changes the threading. We now use N
workers (as specified in the xml) instead of the one thread per source model.
This affects most files as they tend to interact with a client to some degree
but some of those are in minor ways.
The most significant changes are in :-
- file serving engine. There is no separate thread now, so the client reads
and sends are driven from the worker, this includes the error responses.
- The source client and listener processing. The source client details have
to stay around until the listeners are processed.
- slave engine. relays are always allocated a client on the worker, with a
thread created just for the connection phase. Makes it easier to handle when
switching to another relay source.
Modified: icecast/branches/kh/icecast/NEWS
===================================================================
--- icecast/branches/kh/icecast/NEWS 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/NEWS 2009-07-07 23:26:21 UTC (rev 16219)
@@ -15,6 +15,34 @@
any extra tags are show in the conf/icecast.xml.dist file
+2.3.2-kh10
+. big internal thread change. instead of one thread per stream, we now have a fixed number
+ of worker threads, each processing a set of clients (sources/relays/listeners etc).
+ - no source or fserve threads now. auth still has thread(s) as the API for libcurl
+ is blocking
+ - worker threads with no clients sleep for long durations.
+ - defaults to 1 worker thread, overridden by <workers> in <limits>
+ - listeners get moved to the worker thread running the source, helps caching
+ - sources get moved to a less busy thread if it would help
+ - each client has a timestamp to indicate when it wants service. The sleep duration
+ for the worker is based on the next client to be serviced.
+. if fallback to file is specified and the requested mount is not active then a 404
+ response is returned instead of the file contents.
+. fserve engine can handle bandwidth limiting per client. Fallback to file uses this
+ by using the average incoming bitrate of the source stream as the target limit.
+. file serving uses an internal cache of open files, so that 1000 listerners do not
+ suddenly open 1000 files if a fallback to file occurs.
+. move listeners admin request can specify another source or file.
+. relay changes
+ - the client structure is always allocated now, instead of when the relay is started.
+ - listeners stay on relay if switching to another relay master. Only when they all
+ have failed does the fallback apply.
+. post-kh9 fixes
+ - fix for possible race with rejected new listeners going through auth
+ - make mp3-metadata-interval 0 work again
+ - auth htpasswd checks for details provided like user:pass at host:port/mount and if not
+ then checks as host:port/mount?user=a&pass=b
+
2.3.2-kh9
. allow shoutcast source client auth work via stream_auth url
. allow for a flash policy file.
Modified: icecast/branches/kh/icecast/admin/moveclients.xsl
===================================================================
--- icecast/branches/kh/icecast/admin/moveclients.xsl 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/admin/moveclients.xsl 2009-07-07 23:26:21 UTC (rev 16219)
@@ -23,8 +23,8 @@
<xsl:for-each select="source">
<table border="0" cellpadding="6" cellspacing="5" >
<tr>
- <td>Move from (<xsl:copy-of select="$currentmount" />) to (<xsl:value-of select="@mount" />)</td>
- <td><xsl:value-of select="listeners" /> Listeners</td>
+ <td>Move from <xsl:copy-of select="$currentmount" /> to <xsl:value-of select="@mount" /></td>
+ <td>(<xsl:value-of select="listeners" /> Listeners)</td>
<td><a class="nav2" href="moveclients.xsl?mount={$currentmount}&destination={@mount}">Move Clients</a></td>
</tr>
</table>
Modified: icecast/branches/kh/icecast/admin/stats.xsl
===================================================================
--- icecast/branches/kh/icecast/admin/stats.xsl 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/admin/stats.xsl 2009-07-07 23:26:21 UTC (rev 16219)
@@ -71,7 +71,7 @@
<tr>
<td align="center">
<a class="nav2" href="listclients.xsl?mount={@mount}">List Clients</a>
- <a class="nav2" href="moveclients.xsl?mount={@mount}">Move MountPoints</a>
+ <a class="nav2" href="moveclients.xsl?mount={@mount}">Move Listeners</a>
<a class="nav2" href="updatemetadata.xsl?mount={@mount}">Update Metadata</a>
<a class="nav2" href="killsource.xsl?mount={@mount}">Kill Source</a>
<xsl:if test="authenticator"><a class="nav2" href="manageauth.xsl?mount={@mount}">Manage Authentication</a></xsl:if>
Modified: icecast/branches/kh/icecast/config.h.vc6
===================================================================
--- icecast/branches/kh/icecast/config.h.vc6 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/config.h.vc6 2009-07-07 23:26:21 UTC (rev 16219)
@@ -95,7 +95,7 @@
#define PACKAGE_NAME "Icecast"
/* Version number of package */
-#define VERSION "2.3.2-kh9"
+#define VERSION "2.3.2-kh10"
/* Define to the version of this package. */
#define PACKAGE_VERSION VERSION
@@ -128,6 +128,7 @@
/* define if compiler does not handle __attribute__ keyword */
#define __attribute__(x)
+#define HAVE_STRCASECMP 1
#define strcasecmp _stricmp
#define strncasecmp _strnicmp
Modified: icecast/branches/kh/icecast/configure.in
===================================================================
--- icecast/branches/kh/icecast/configure.in 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/configure.in 2009-07-07 23:26:21 UTC (rev 16219)
@@ -1,4 +1,4 @@
-AC_INIT([Icecast], [2.3.2-kh9b], [karl at xiph.org])
+AC_INIT([Icecast], [2.3.2-kh10], [karl at xiph.org])
AC_PREREQ(2.59)
AC_CONFIG_SRCDIR(src/main.c)
@@ -42,11 +42,14 @@
dnl Check for types
AC_TYPE_OFF_T
+AC_CHECK_TYPES([struct timespec])
dnl Checks for library functions.
-AC_CHECK_FUNCS([localtime_r poll atoll strtoll getrlimit gettimeofday ftime fstat])
+AC_CHECK_FUNCS([localtime_r poll atoll strtoll strcasecmp getrlimit gettimeofday ftime fsync])
AC_SEARCH_LIBS(nanosleep, rt posix4,
AC_DEFINE(HAVE_NANOSLEEP, 1, [Define if you have nanosleep]))
+AC_SEARCH_LIBS(clock_gettime, rt posix4,
+ AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Define if you have clock_gettime]))
XIPH_NET
dnl -- configure options --
Modified: icecast/branches/kh/icecast/src/admin.c
===================================================================
--- icecast/branches/kh/icecast/src/admin.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/admin.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -83,7 +83,6 @@
};
-static time_t now;
static struct admin_command admin_general[] =
{
@@ -221,7 +220,7 @@
client->refbuf->len = len;
xmlFree(buff);
client->respcode = 200;
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
if (response == XSLT)
{
@@ -436,7 +435,7 @@
"<html><head><title>Admin request successful</title></head>"
"<body><p>%s</p></body></html>", message);
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -444,7 +443,6 @@
int response)
{
const char *dest_source;
- source_t *dest;
xmlDocPtr doc;
xmlNodePtr node;
int parameters_passed = 0;
@@ -459,34 +457,14 @@
xmlFreeDoc(doc);
return;
}
+ INFO2 ("source is \"%s\", destination is \"%s\"", source->mount, dest_source);
- dest = source_find_mount (dest_source);
-
- if (dest == NULL)
- {
- client_send_400 (client, "No such destination");
- return;
- }
-
- if (strcmp (dest->mount, source->mount) == 0)
- {
- client_send_400 (client, "supplied mountpoints are identical");
- return;
- }
-
- if (dest->running == 0 && dest->on_demand == 0)
- {
- client_send_400 (client, "Destination not running");
- return;
- }
-
- INFO2 ("source is \"%s\", destination is \"%s\"", source->mount, dest->mount);
-
doc = xmlNewDoc(XMLSTR("1.0"));
node = xmlNewDocNode(doc, NULL, XMLSTR("iceresponse"), NULL);
xmlDocSetRootElement(doc, node);
- source_move_clients (source, dest);
+ source_set_fallback (source, dest_source);
+ source->flags |= SOURCE_TEMPORARY_FALLBACK;
snprintf (buf, sizeof(buf), "Clients moved from %s to %s",
source->mount, dest_source);
@@ -611,7 +589,7 @@
if (relay->enable == 0)
{
if (relay->source && source_running (relay->source) == 0)
- relay->source->on_demand = 0;
+ relay->source->flags &= ~SOURCE_ON_DEMAND;
}
slave_update_all_mounts();
}
@@ -631,6 +609,7 @@
{
const char *useragent;
char buf[30];
+ source_t *source = listener->shared_data;
xmlNodePtr node = xmlNewChild (srcnode, NULL, XMLSTR("listener"), NULL);
@@ -647,12 +626,15 @@
xmlFree (str);
}
- snprintf (buf, sizeof (buf), "%u", listener->lag);
+ snprintf (buf, sizeof (buf), "%ld", (long)(source->client->queue_pos - listener->queue_pos));
xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
- snprintf (buf, sizeof (buf), "%lu",
- (unsigned long)(now - listener->con->con_time));
- xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
+ if (listener->worker)
+ {
+ snprintf (buf, sizeof (buf), "%lu",
+ (unsigned long)(listener->worker->current_time.tv_sec - listener->con->con_time));
+ xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
+ }
if (listener->username)
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(listener->username));
@@ -674,8 +656,7 @@
thread_mutex_lock (&source->lock);
- now = time(NULL);
- listener = source->active_clients;
+ listener = source->client_list;
while (listener)
{
add_listener_node (srcnode, listener);
@@ -755,8 +736,7 @@
client_t *listener;
thread_mutex_lock (&source->lock);
- now = time(NULL);
- listener = source->active_clients;
+ listener = source->client_list;
while (listener)
{
if (listener->con->id == id)
@@ -786,7 +766,7 @@
if (source->format->get_image (client, source->format) == 0)
{
thread_mutex_unlock (&source->lock);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
return;
}
thread_mutex_unlock (&source->lock);
@@ -821,7 +801,7 @@
config_release_config();
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -926,7 +906,7 @@
xmlNewChild(node, NULL, XMLSTR("return"), XMLSTR("1"));
xmlDocSetRootElement(doc, node);
- source->running = 0;
+ source->flags &= ~SOURCE_RUNNING;
admin_send_response(doc, client, response, "response.xsl");
xmlFreeDoc(doc);
@@ -1085,7 +1065,7 @@
return;
}
- if (source->shoutcast_compat == 0)
+ if ((source->flags & SOURCE_SHOUTCAST_COMPAT) == 0)
{
ERROR0 ("illegal change of metadata on non-shoutcast compatible stream");
client_send_400 (client, "illegal metadata call");
@@ -1192,7 +1172,7 @@
http->next = content;
client->respcode = 200;
client_set_queue (client, http);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
}
@@ -1214,7 +1194,7 @@
client->refbuf->next = stats_get_streams (1);
else
client->refbuf->next = stats_get_streams (0);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
else
{
Modified: icecast/branches/kh/icecast/src/auth.c
===================================================================
--- icecast/branches/kh/icecast/src/auth.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/auth.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -189,9 +189,9 @@
client_t *client = auth_user->client;
if (client->respcode)
- client_destroy (client);
- else
- client_send_401 (client, auth_user->auth->realm);
+ client->con->error = 1;
+ client_send_401 (client, auth_user->auth->realm);
+ client->flags |= CLIENT_ACTIVE;
auth_user->client = NULL;
}
auth_release (auth_user->auth);
@@ -354,6 +354,29 @@
}
+void move_listener (client_t *client, struct _fbinfo *finfo)
+{
+ source_t *source;
+
+ DEBUG1 ("moving listener to %s", finfo->mount);
+ avl_tree_rlock (global.source_tree);
+ source = source_find_mount (finfo->mount);
+
+ if (source && source_available (source))
+ {
+ thread_mutex_lock (&source->lock);
+ avl_tree_unlock (global.source_tree);
+ source_setup_listener (source, client);
+ thread_mutex_unlock (&source->lock);
+ }
+ else
+ {
+ avl_tree_unlock (global.source_tree);
+ fserve_setup_client_fb (client, finfo);
+ }
+}
+
+
/* Add listener to the pending lists of either the source or fserve thread. This can be run
* from the connection or auth thread context. return -1 to indicate that client has been
* terminated, 0 for receiving content.
@@ -402,7 +425,7 @@
{
DEBUG1 ("disable seek on file matching %s", mountinfo->mountname);
httpp_deletevar (client->parser, "range");
- httpp_setvar (client->parser, HTTPP_VAR_NO_CONTENT_LENGTH, "yes");
+ client->flags |= CLIENT_NO_CONTENT_LENGTH;
}
ret = fserve_client_create (client, mount);
}
@@ -430,7 +453,10 @@
else if (auth->rejected_mount)
mount = auth->rejected_mount;
else
+ {
+ client->flags |= CLIENT_ACTIVE;
return -1;
+ }
}
config = config_get_config();
mountinfo = config_find_mount (config, mount);
@@ -462,6 +488,7 @@
DEBUG1 ("on mountpoint %s", mount);
source_startup (client, mount);
}
+ client->flags |= CLIENT_ACTIVE;
}
@@ -473,9 +500,6 @@
mount_proxy *mountinfo;
ice_config_t *config;
- /* we don't need any more data from the listener, just setup for writing */
- client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
-
if (connection_check_relay_pass (client->parser))
{
client->flags |= (CLIENT_IS_SLAVE|CLIENT_AUTHENTICATED);
@@ -502,6 +526,7 @@
}
auth_user = auth_client_setup (mount, client);
auth_user->process = auth_new_listener;
+ client->flags &= ~CLIENT_ACTIVE;
INFO0 ("adding client for authentication");
queue_auth_client (auth_user, mountinfo);
}
@@ -690,6 +715,7 @@
auth_user->process = stream_auth_callback;
INFO1 ("request source auth for \"%s\"", mount);
+ client->flags &= ~CLIENT_ACTIVE;
queue_auth_client (auth_user, mountinfo);
return 1;
}
Modified: icecast/branches/kh/icecast/src/auth.h
===================================================================
--- icecast/branches/kh/icecast/src/auth.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/auth.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -19,6 +19,7 @@
struct source_tag;
struct auth_tag;
+struct _fbinfo;
typedef struct _auth_thread_t auth_thread_t;
#include <libxml/xmlmemory.h>
@@ -106,6 +107,7 @@
void auth_add_listener (const char *mount, client_t *client);
int auth_release_listener (client_t *client, const char *mount, struct _mount_proxy *mountinfo);
+void move_listener (client_t *client, struct _fbinfo *finfo);
int auth_check_source (client_t *client, const char *mount);
void auth_initialise (void);
Modified: icecast/branches/kh/icecast/src/cfgfile.c
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/cfgfile.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -469,6 +469,7 @@
configuration->client_limit = CONFIG_DEFAULT_CLIENT_LIMIT;
configuration->source_limit = CONFIG_DEFAULT_SOURCE_LIMIT;
configuration->queue_size_limit = CONFIG_DEFAULT_QUEUE_SIZE_LIMIT;
+ configuration->workers_count = 1;
configuration->client_timeout = CONFIG_DEFAULT_CLIENT_TIMEOUT;
configuration->header_timeout = CONFIG_DEFAULT_HEADER_TIMEOUT;
configuration->source_timeout = CONFIG_DEFAULT_SOURCE_TIMEOUT;
@@ -927,6 +928,7 @@
{ "sources", config_get_int, &config->source_limit },
{ "queue-size", config_get_int, &config->queue_size_limit },
{ "burst-size", config_get_int, &config->burst_size },
+ { "workers", config_get_int, &config->workers_count },
{ "client-timeout", config_get_int, &config->client_timeout },
{ "header-timeout", config_get_int, &config->header_timeout },
{ "source-timeout", config_get_int, &config->source_timeout },
@@ -934,6 +936,8 @@
};
if (parse_xml_tags (node, icecast_tags))
return -1;
+ if (config->workers_count < 1) config->workers_count = 1;
+ if (config->workers_count > 400) config->workers_count = 400;
return 0;
}
Modified: icecast/branches/kh/icecast/src/cfgfile.h
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/cfgfile.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -28,6 +28,7 @@
#include "avl/avl.h"
#include "auth.h"
+#include "compat.h"
typedef struct ice_config_dir_tag
{
@@ -84,8 +85,7 @@
/* duration in seconds for sampling the bandwidth */
int avg_bitrate_duration;
- /* trigger level at which a timer is used to prevent excessive incoming bandwidth */
- int64_t limit_rate;
+ long limit_rate;
/* duration (secs) for mountpoint to be kept reserved after source client exits */
int wait_time;
@@ -154,7 +154,6 @@
int cleanup;
int enable;
time_t start;
- thread_type *thread;
struct _relay_server *next;
} relay_server;
@@ -204,6 +203,7 @@
int client_limit;
int source_limit;
unsigned int queue_size_limit;
+ int workers_count;
unsigned int burst_size;
int client_timeout;
int header_timeout;
Modified: icecast/branches/kh/icecast/src/client.c
===================================================================
--- icecast/branches/kh/icecast/src/client.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/client.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -26,6 +26,7 @@
#include "thread/thread.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
+#include "timing/timing.h"
#include "cfgfile.h"
#include "connection.h"
@@ -42,6 +43,8 @@
#undef CATMODULE
#define CATMODULE "client"
+int worker_count;
+
/* create a client_t with the provided connection and parser details. Return
* client_t ready for use. Should be called with global lock held.
*/
@@ -54,7 +57,7 @@
global.clients++;
- if (con->serversock != SOCK_ERROR)
+ if (con && con->serversock != SOCK_ERROR)
{
int i;
for (i=0; i < global.server_sockets; i++)
@@ -69,10 +72,7 @@
stats_event_args (NULL, "clients", "%d", global.clients);
client->con = con;
client->parser = parser;
- client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
- client->refbuf->len = 0; /* force reader code to ignore buffer contents */
client->pos = 0;
- client->write_to_client = format_generic_write_to_client;
return client;
}
@@ -126,19 +126,14 @@
{
int bytes;
- if (client->refbuf && client->refbuf->len)
+ if (client->refbuf && client->pos < client->refbuf->len)
{
- /* we have data to read from a refbuf first */
- if (client->refbuf->len < len)
- len = client->refbuf->len;
- memcpy (buf, client->refbuf->data, len);
- if (len < client->refbuf->len)
- {
- char *ptr = client->refbuf->data;
- memmove (ptr, ptr+len, client->refbuf->len - len);
- }
- client->refbuf->len -= len;
- return len;
+ unsigned remaining = client->refbuf->len - client->pos;
+ if (remaining > len)
+ remaining = len;
+ memcpy (buf, client->refbuf->data + client->pos, remaining);
+ client->pos += remaining;
+ return remaining;
}
bytes = client->con->read (client->con, buf, len);
@@ -158,7 +153,7 @@
"Moved <a href=\"%s\">here</a>\r\n", location, location);
client->respcode = 302;
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -169,7 +164,7 @@
"<b>%s</b>\r\n", message);
client->respcode = 400;
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -180,7 +175,9 @@
if (realm == NULL)
realm = config->server_id;
- snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
+ refbuf_release (client->refbuf);
+ client->refbuf = refbuf_new (500);
+ snprintf (client->refbuf->data, 500,
"HTTP/1.0 401 Authentication Required\r\n"
"WWW-Authenticate: Basic realm=\"%s\"\r\n"
"\r\n"
@@ -188,7 +185,7 @@
config_release_config();
client->respcode = 401;
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -201,7 +198,7 @@
"Content-Type: text/html\r\n\r\n", reason);
client->respcode = 403;
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
void client_send_403redirect (client_t *client, const char *mount, const char *reason)
@@ -214,22 +211,25 @@
void client_send_404(client_t *client, const char *message)
{
- if (client->respcode)
+ if (client->worker == NULL) /* client is not on any worker now */
{
client_destroy (client);
return;
}
- if (message == NULL)
- message = "Not Available";
- if (client->refbuf == NULL)
+ client_set_queue (client, NULL);
+ if (client->respcode == 0)
+ {
+ if (message == NULL)
+ message = "Not Available";
client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
- snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
- "HTTP/1.0 404 Not Available\r\n"
- "Content-Type: text/html\r\n\r\n"
- "<b>%s</b>\r\n", message);
- client->respcode = 404;
- client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
+ "HTTP/1.0 404 Not Available\r\n"
+ "Content-Type: text/html\r\n\r\n"
+ "<b>%s</b>\r\n", message);
+ client->respcode = 404;
+ client->refbuf->len = strlen (client->refbuf->data);
+ }
+ fserve_setup_client (client, NULL);
}
@@ -239,7 +239,7 @@
"HTTP/1.0 416 Request Range Not Satisfiable\r\n\r\n");
client->respcode = 416;
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
}
@@ -290,3 +290,217 @@
refbuf_release (to_release);
}
+
+worker_t *find_least_busy_handler (void)
+{
+ worker_t *handler, *min = NULL;
+
+ if (workers)
+ {
+ min = workers;
+ handler = workers->next;
+ DEBUG2 ("handler %p has %d clients", min, min->count);
+ while (handler)
+ {
+ DEBUG2 ("handler %p has %d clients", handler, handler->count);
+ if (handler->count < min->count)
+ min = handler;
+ handler = handler->next;
+ }
+ }
+ return min;
+}
+
+
+void client_change_worker (client_t *client, worker_t *dest_worker)
+{
+ worker_t *this_worker = client->worker;
+
+ // make sure this client list is ok
+ *this_worker->current_p = client->next_on_worker;
+ this_worker->count--;
+ thread_mutex_unlock (&this_worker->lock);
+
+ thread_mutex_lock (&dest_worker->lock);
+ if (dest_worker->running)
+ {
+ client->worker = dest_worker;
+ client->next_on_worker = dest_worker->clients;
+ dest_worker->clients = client;
+ dest_worker->count++;
+ client->flags |= CLIENT_HAS_CHANGED_THREAD;
+ // make client inactive so that the destination thread does not run it straight away
+ client->flags &= ~CLIENT_ACTIVE;
+ }
+ thread_mutex_unlock (&dest_worker->lock);
+ thread_mutex_lock (&this_worker->lock);
+}
+
+
+void client_add_worker (client_t *client)
+{
+ worker_t *handler;
+
+ thread_rwlock_rlock (&workers_lock);
+ /* add client to the handler with the least number of clients */
+ handler = find_least_busy_handler();
+ thread_mutex_lock (&handler->lock);
+ thread_rwlock_unlock (&workers_lock);
+
+ client->schedule_ms = handler->time_ms;
+ client->next_on_worker = handler->clients;
+ handler->clients = client;
+ client->worker = handler;
+ ++handler->count;
+ if (handler->wakeup_ms - handler->time_ms > 15)
+ thread_cond_signal (&handler->cond); /* wake thread if required */
+ thread_mutex_unlock (&handler->lock);
+}
+
+
+void *worker (void *arg)
+{
+ worker_t *handler = arg;
+ client_t *client, **prevp;
+ long prev_count = -1;
+ struct timespec wakeup_time;
+
+ handler->running = 1;
+ thread_mutex_lock (&handler->lock);
+ thread_get_timespec (&handler->current_time);
+ handler->time_ms = THREAD_TIME_MS (&handler->current_time);
+ wakeup_time = handler->current_time;
+
+ prevp = &handler->clients;
+ while (handler->running)
+ {
+ if (prev_count != handler->count)
+ {
+ DEBUG2 ("%p now has %d clients", handler, handler->count);
+ prev_count = handler->count;
+ }
+ thread_cond_timedwait (&handler->cond, &handler->lock, &wakeup_time);
+ thread_get_timespec (&handler->current_time);
+ handler->time_ms = THREAD_TIME_MS (&handler->current_time);
+ handler->wakeup_ms = handler->time_ms + 60000;
+ client = handler->clients;
+ prevp = &handler->clients;
+ while (client)
+ {
+ /* process client details but skip those that are not ready yet */
+ if (client->flags & CLIENT_ACTIVE)
+ {
+ if (client->schedule_ms <= handler->time_ms+15)
+ {
+ int ret;
+
+ handler->current_p = prevp;
+ ret = client->ops->process (client);
+
+ /* special handler, client has moved away to another worker */
+ if (client->flags & CLIENT_HAS_CHANGED_THREAD)
+ {
+ client->flags &= ~CLIENT_HAS_CHANGED_THREAD;
+ client->flags |= CLIENT_ACTIVE;
+ client = *prevp;
+ continue;
+ }
+ if (ret < 0)
+ {
+ client_t *to_go = client;
+ *prevp = to_go->next_on_worker;
+ client->next_on_worker = NULL;
+ client->worker = NULL;
+ handler->count--;
+ if (client->ops->release)
+ client->ops->release (client);
+ client = *prevp;
+ continue;
+ }
+ }
+ if (client->schedule_ms < handler->wakeup_ms)
+ handler->wakeup_ms = client->schedule_ms;
+ }
+ prevp = &client->next_on_worker;
+ client = *prevp;
+ }
+ handler->wakeup_ms += 10; /* allow a small sleep */
+ wakeup_time.tv_sec = handler->wakeup_ms/1000;
+ wakeup_time.tv_nsec = (handler->wakeup_ms%1000) *1000000;
+ }
+ thread_mutex_unlock (&handler->lock);
+ INFO0 ("shutting down");
+ return NULL;
+}
+
+
+static void worker_start (void)
+{
+ worker_t *handler = calloc (1, sizeof(worker_t));
+
+ thread_mutex_create (&handler->lock);
+ thread_cond_create (&handler->cond);
+ thread_rwlock_wlock (&workers_lock);
+ handler->next = workers;
+ workers = handler;
+ worker_count++;
+ handler->thread = thread_create ("worker", worker, handler, THREAD_ATTACHED);
+ thread_rwlock_unlock (&workers_lock);
+}
+
+static void worker_stop (void)
+{
+ worker_t *handler = workers;
+ client_t *clients = NULL;
+
+ if (handler == NULL)
+ return;
+ thread_rwlock_wlock (&workers_lock);
+ workers = handler->next;
+ worker_count--;
+ thread_rwlock_unlock (&workers_lock);
+ handler->running = 0;
+
+ thread_mutex_lock (&handler->lock);
+ clients = handler->clients;
+ handler->clients = NULL;
+ handler->count = 0;
+ thread_cond_signal (&handler->cond);
+ thread_mutex_unlock (&handler->lock);
+ // move clients to another handler
+ if (worker_count > 1 && clients)
+ {
+ client_t *endp = clients;
+ int count = 0;
+
+ thread_mutex_lock (&workers->lock);
+ while (endp->next_on_worker)
+ {
+ endp = endp->next_on_worker;
+ count++;
+ }
+ endp->next_on_worker = workers->clients;
+ workers->clients = clients;
+ workers->count += count;
+ thread_mutex_unlock (&workers->lock);
+ }
+ if (handler->count)
+ WARN1 ("%d clients left", handler->count);
+
+ thread_join (handler->thread);
+ thread_mutex_destroy (&handler->lock);
+ thread_cond_destroy (&handler->cond);
+ free (handler);
+}
+
+void workers_adjust (int new_count)
+{
+ while (worker_count != new_count)
+ {
+ if (worker_count < new_count)
+ worker_start ();
+ else
+ worker_stop ();
+ }
+}
+
Modified: icecast/branches/kh/icecast/src/client.h
===================================================================
--- icecast/branches/kh/icecast/src/client.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/client.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -19,20 +19,56 @@
#define __CLIENT_H__
typedef struct _client_tag client_t;
+typedef struct _worker_t worker_t;
#include "cfgfile.h"
#include "connection.h"
#include "refbuf.h"
#include "httpp/httpp.h"
+#include "compat.h"
+#include "thread/thread.h"
+struct _worker_t
+{
+ int running;
+ int count;
+ mutex_t lock;
+ cond_t cond;
+ client_t *clients;
+ client_t **current_p;
+ thread_type *thread;
+ struct timespec current_time;
+ uint64_t time_ms;
+ uint64_t wakeup_ms;
+ struct _worker_t *next;
+};
+
+
+extern worker_t *workers;
+extern int worker_count;
+extern rwlock_t workers_lock;
+
+struct _client_functions
+{
+ int (*process)(struct _client_tag *client);
+ void (*release)(struct _client_tag *client);
+};
+
struct _client_tag
{
+ uint64_t schedule_ms;
+
/* various states the client could be in */
unsigned int flags;
/* position in first buffer */
unsigned int pos;
+ /* http response code for this client */
+ int respcode;
+
+ client_t *next_on_worker;
+
/* the client's connection */
connection_t *con;
/* the client's http headers */
@@ -48,30 +84,36 @@
refbuf_t *refbuf;
/* byte count in queue */
- unsigned int lag;
+ uint64_t queue_pos;
- /* http response code for this client */
- int respcode;
-
/* Client username, if authenticated */
char *username;
/* Client password, if authenticated */
char *password;
+ /* generic handle */
+ void *shared_data;
+
/* Format-handler-specific data for this client */
void *format_data;
+ /* the worker the client is attached to */
+ worker_t *worker;
+
+ /* for cases where we want to limit data rates */
+ struct rate_calc *out_bitrate;
+
/* function to call to release format specific resources */
void (*free_client_data)(struct _client_tag *client);
- /* write out data associated with client */
- int (*write_to_client)(struct _client_tag *client);
-
/* function to check if refbuf needs updating */
- int (*check_buffer)(struct source_tag *source, struct _client_tag *client);
+ int (*check_buffer)(struct _client_tag *client);
- client_t *next;
+ /* functions to process client */
+ struct _client_functions *ops;
+
+ client_t *next; /* for use with grouping similar clients */
};
client_t *client_create (connection_t *con, http_parser_t *parser);
@@ -88,9 +130,18 @@
int client_read_bytes (client_t *client, void *buf, unsigned len);
void client_set_queue (client_t *client, refbuf_t *refbuf);
+void client_change_worker (client_t *client, worker_t *dest_worker);
+void client_add_worker (client_t *client);
+worker_t *find_least_busy_handler (void);
+void workers_adjust (int new_count);
+
+
/* client flags bitmask */
+#define CLIENT_ACTIVE (001)
#define CLIENT_AUTHENTICATED (002)
#define CLIENT_IS_SLAVE (004)
+#define CLIENT_HAS_CHANGED_THREAD (010)
+#define CLIENT_NO_CONTENT_LENGTH (020)
#define CLIENT_FORMAT_BIT (01000)
#endif /* __CLIENT_H__ */
Modified: icecast/branches/kh/icecast/src/connection.c
===================================================================
--- icecast/branches/kh/icecast/src/connection.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/connection.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -78,13 +78,11 @@
Icecast auth style uses HTTP and Basic Authorization.
*/
-typedef struct client_queue_tag {
- client_t *client;
- int offset;
- int stream_offset;
- int shoutcast;
- struct client_queue_tag *next;
-} client_queue_t;
+static int shoutcast_source_client (client_t *client);
+static int http_client_request (client_t *client);
+static int _handle_get_request (client_t *client);
+static int _handle_source_request (client_t *client);
+static int _handle_stats_request (client_t *client);
typedef struct
@@ -100,13 +98,42 @@
static volatile unsigned long _current_id = 0;
static volatile thread_type *conn_tid;
-static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue;
-static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue;
static int ssl_ok;
#ifdef HAVE_OPENSSL
static SSL_CTX *ssl_ctx;
#endif
+int header_timeout;
+
+struct _client_functions shoutcast_source_ops =
+{
+ shoutcast_source_client,
+ client_destroy
+};
+
+struct _client_functions http_request_ops =
+{
+ http_client_request,
+ client_destroy
+};
+
+struct _client_functions http_req_get_ops =
+{
+ _handle_get_request,
+ client_destroy
+};
+struct _client_functions http_req_source_ops =
+{
+ _handle_source_request,
+ client_destroy
+};
+
+struct _client_functions http_req_stats_ops =
+{
+ _handle_stats_request,
+ client_destroy
+};
+
/* filtering client connection based on IP */
cache_file_contents banned_ip, allowed_ip;
/* filtering listener connection based on useragent */
@@ -114,7 +141,6 @@
int connection_running = 0;
-static void _handle_connection(void);
static int compare_line (void *arg, void *a, void *b)
{
@@ -150,11 +176,6 @@
void connection_initialize(void)
{
thread_spin_create (&_connection_lock);
- thread_mutex_create(&move_clients_mutex);
- _req_queue = NULL;
- _req_queue_tail = &_req_queue;
- _con_queue = NULL;
- _con_queue_tail = &_con_queue;
banned_ip.contents = NULL;
banned_ip.file_mtime = 0;
@@ -177,7 +198,6 @@
if (useragents.contents) avl_tree_free (useragents.contents, free_filtered_line);
thread_spin_destroy (&_connection_lock);
- thread_mutex_destroy(&move_clients_mutex);
}
static unsigned long _next_connection_id(void)
@@ -549,163 +569,204 @@
}
-/* add client to connection queue. At this point some header information
- * has been collected, so we now pass it onto the connection thread for
- * further processing
+/* shoutcast source clients are handled specially because the protocol is limited. It is
+ * essentially a password followed by a series of headers, each on a separate line. In here
+ * we get the password and build a http request like a native source client would do
*/
-static void _add_connection (client_queue_t *node)
+static int shoutcast_source_client (client_t *client)
{
- *_con_queue_tail = node;
- _con_queue_tail = (volatile client_queue_t **)&node->next;
-}
+ do
+ {
+ if (client->con->error || client->con->discon_time <= client->worker->current_time.tv_sec)
+ break;
+ if (client->shared_data) /* need to get password first */
+ {
+ refbuf_t *refbuf = client->shared_data;
+ int remaining = PER_CLIENT_REFBUF_SIZE - 2 - refbuf->len, ret, len;
+ char *buf = refbuf->data + refbuf->len;
+ char *esc_header;
+ refbuf_t *r, *resp;
+ char header [128];
-/* this returns queued clients for the connection thread. headers are
- * already provided, but need to be parsed.
- */
-static client_queue_t *_get_connection(void)
-{
- client_queue_t *node = NULL;
+ if (remaining == 0)
+ break;
- /* common case, no new connections so don't bother taking locks */
- if (_con_queue)
- {
- node = (client_queue_t *)_con_queue;
- _con_queue = node->next;
- if (_con_queue == NULL)
- _con_queue_tail = &_con_queue;
- node->next = NULL;
- }
- return node;
+ ret = client_read_bytes (client, buf, remaining);
+ if (ret == 0 || client->con->error)
+ break;
+ if (ret < 0)
+ return 0;
+
+ buf [ret] = '\0';
+ len = strcspn (refbuf->data, "\r\n");
+ if (refbuf->data [len] == '\0') /* no EOL yet */
+ return 0;
+
+ refbuf->data [len] = '\0';
+ snprintf (header, sizeof(header), "source:%s", refbuf->data);
+ esc_header = util_base64_encode (header);
+
+ len += 1 + strspn (refbuf->data+len+1, "\r\n");
+ r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+ snprintf (r->data, PER_CLIENT_REFBUF_SIZE,
+ "SOURCE %s HTTP/1.0\r\n" "Authorization: Basic %s\r\n%s",
+ client->server_conn->shoutcast_mount, esc_header, refbuf->data+len);
+ r->len = strlen (r->data);
+ free (esc_header);
+ client->respcode = 200;
+ resp = refbuf_new (30);
+ snprintf (resp->data, 30, "OK2\r\nicy-caps:11\r\n\r\n");
+ resp->len = strlen (resp->data);
+ resp->associated = r;
+ client->refbuf = resp;
+ refbuf_release (refbuf);
+ client->shared_data = NULL;
+ INFO1 ("emulation on %s", client->server_conn->shoutcast_mount);
+ }
+ format_generic_write_to_client (client);
+ if (client->pos == client->refbuf->len)
+ {
+ refbuf_t *r = client->refbuf;
+ client->shared_data = r->associated;
+ client->refbuf = NULL;
+ r->associated = NULL;
+ refbuf_release (r);
+ client->ops = &http_request_ops;
+ client->pos = 0;
+ }
+ client->schedule_ms = client->worker->time_ms + 100;
+ return 0;
+ } while (0);
+
+ refbuf_release (client->shared_data);
+ client->shared_data = NULL;
+ return -1;
}
-/* run along queue checking for any data that has come in or a timeout */
-static void process_request_queue (void)
+static int http_client_request (client_t *client)
{
- client_queue_t **node_ref = (client_queue_t **)&_req_queue;
- ice_config_t *config = config_get_config ();
- int timeout = config->header_timeout;
- config_release_config();
+ refbuf_t *refbuf = client->shared_data;
+ int remaining = PER_CLIENT_REFBUF_SIZE - 1 - refbuf->len, ret = -1;
- while (*node_ref)
+ if (remaining && client->con->discon_time > client->worker->current_time.tv_sec)
{
- client_queue_t *node = *node_ref;
- client_t *client = node->client;
- int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset;
- char *buf = client->refbuf->data + node->offset;
+ char *buf = refbuf->data + refbuf->len;
- if (len > 0)
+ ret = client_read_bytes (client, buf, remaining);
+ if (ret > 0)
{
- if (client->con->con_time + timeout <= now)
- len = 0;
- else
- len = client_read_bytes (client, buf, len);
- }
-
- if (len > 0)
- {
- int pass_it = 1;
char *ptr;
- /* handle \n, \r\n and nsvcap which for some strange reason has
- * EOL as \r\r\n */
- node->offset += len;
- client->refbuf->data [node->offset] = '\000';
+ buf [ret] = '\0';
+ refbuf->len += ret;
+ if (memcmp (refbuf->data, "<policy-file-request/>", 23) == 0)
+ {
+ fbinfo fb;
+ fb.mount = "/flashpolicy";
+ fb.flags = FS_NORMAL|FS_USE_ADMIN;
+ fb.fallback = NULL;
+ fb.limit = 0;
+ refbuf_release (refbuf);
+ client->shared_data = NULL;
+ client->check_buffer = format_generic_write_to_client;
+ fserve_setup_client_fb (client, &fb);
+ return 1;
+ }
+ /* find a blank line */
do
{
- /* ugly hack for flash policy files */
- if (node->offset == 23 && memcmp (client->refbuf->data, "<policy-file-request/>", 23) == 0)
- break;
-
- if (node->shoutcast == 1)
- {
- /* password line */
- if (strstr (client->refbuf->data, "\r\r\n") != NULL)
- break;
- if (strstr (client->refbuf->data, "\r\n") != NULL)
- break;
- if (strstr (client->refbuf->data, "\n") != NULL)
- break;
- }
- /* stream_offset refers to the start of any data sent after the
- * http style headers, we don't want to lose those */
- ptr = strstr (client->refbuf->data, "\r\r\n\r\r\n");
+ buf = refbuf->data;
+ ptr = strstr (buf, "\r\n\r\n");
if (ptr)
{
- node->stream_offset = (ptr+6) - client->refbuf->data;
+ ptr += 4;
break;
}
- ptr = strstr (client->refbuf->data, "\r\n\r\n");
+ ptr = strstr (buf, "\n\n");
if (ptr)
{
- node->stream_offset = (ptr+4) - client->refbuf->data;
+ ptr += 2;
break;
}
- ptr = strstr (client->refbuf->data, "\n\n");
+ ptr = strstr (buf, "\r\r\n\r\r\n");
if (ptr)
{
- node->stream_offset = (ptr+2) - client->refbuf->data;
+ ptr += 6;
break;
}
- pass_it = 0;
+ client->schedule_ms = client->worker->time_ms + 100;
+ return 0;
} while (0);
-
- if (pass_it)
+ client->refbuf = client->shared_data;
+ client->shared_data = NULL;
+ client->con->discon_time = 0;
+ client->parser = httpp_create_parser();
+ httpp_initialize (client->parser, NULL);
+ if (httpp_parse (client->parser, refbuf->data, refbuf->len))
{
- if ((client_queue_t **)_req_queue_tail == &(node->next))
- _req_queue_tail = (volatile client_queue_t **)node_ref;
- *node_ref = node->next;
- node->next = NULL;
- _add_connection (node);
- continue;
+ recheck_cached_file (&useragents);
+ if (useragents.contents)
+ {
+ const char *agent = httpp_getvar (client->parser, "user-agent");
+ void *result;
+
+ if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0)
+ {
+ INFO1 ("dropping client because useragent is %s", agent);
+ return -1;
+ }
+ }
+
+ /* headers now parsed, make sure any sent content is next */
+ if (strcmp("ICE", httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL)) &&
+ strcmp("HTTP", httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL)))
+ {
+ ERROR0("Bad HTTP protocol detected");
+ return -1;
+ }
+ client->schedule_ms = client->worker->time_ms + 20;
+ auth_check_http (client);
+ switch (client->parser->req_type)
+ {
+ case httpp_req_get:
+ refbuf->len = PER_CLIENT_REFBUF_SIZE;
+ client->ops = &http_req_get_ops;
+ break;
+ case httpp_req_source:
+ client->pos = ptr - refbuf->data;
+ client->ops = &http_req_source_ops;
+ break;
+ case httpp_req_stats:
+ refbuf->len = PER_CLIENT_REFBUF_SIZE;
+ client->ops = &http_req_stats_ops;
+ break;
+ default:
+ WARN0("unhandled request type from client");
+ client_send_400 (client, "unknown request");
+ }
+ return 1;
}
+ /* invalid http request */
+ return -1;
}
- else
+ if (ret && client->con->error == 0)
{
- if (len == 0 || client->con->error)
- {
- if ((client_queue_t **)_req_queue_tail == &node->next)
- _req_queue_tail = (volatile client_queue_t **)node_ref;
- *node_ref = node->next;
- client_destroy (client);
- free (node);
- continue;
- }
+ client->schedule_ms = client->worker->time_ms + 100;
+ return 0;
}
- node_ref = &node->next;
}
- _handle_connection();
+ refbuf_release (refbuf);
+ client->shared_data = NULL;
+ return -1;
}
-/* add node to the queue of requests. This is where the clients are when
- * initial http details are read.
- */
-static void _add_request_queue (client_queue_t *node)
-{
- *_req_queue_tail = node;
- _req_queue_tail = (volatile client_queue_t **)&node->next;
-}
-
-
-static client_queue_t *_create_req_node (client_t *client)
-{
- client_queue_t *node = calloc (1, sizeof (client_queue_t));
- node->client = client;
-
- if (client->server_conn->shoutcast_compat)
- node->shoutcast = 1;
-
- return node;
-}
-
-
void *connection_thread (void *arg)
{
connection_t *con;
ice_config_t *config;
- int duration = 300;
connection_running = 1;
INFO0 ("connection thread started");
@@ -714,16 +775,17 @@
get_ssl_certificate (config);
if (config->chuid == 0)
connection_setup_sockets (config);
+ header_timeout = config->header_timeout;
config_release_config ();
while (connection_running)
{
- con = _accept_connection (duration);
+ con = _accept_connection (333);
if (con)
{
- client_queue_t *node;
client_t *client = NULL;
+ refbuf_t *r;
global_lock();
client = client_create (con, NULL);
@@ -732,9 +794,6 @@
if (client->server_conn->ssl && ssl_ok)
connection_uses_ssl (client->con);
- /* setup client for reading incoming http */
- client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000';
-
if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock))
{
WARN0 ("failed to set tcp options on client connection, dropping");
@@ -742,25 +801,23 @@
continue;
}
- node = _create_req_node (client);
- if (node == NULL)
- {
- client_destroy (client);
- continue;
- }
- _add_request_queue (node);
+ if (client->server_conn->shoutcast_compat)
+ client->ops = &shoutcast_source_ops;
+ else
+ client->ops = &http_request_ops;
+ r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+ r->len = 0;
+ client->shared_data = r;
+ client->flags |= CLIENT_ACTIVE;
+ client->con->discon_time = time(NULL) + header_timeout;
+
+ /* do a small delay here so the client has chance to send the request after
+ * getting a connect. This also prevents excessively large number of new
+ * listeners from joining at the same time */
+ thread_sleep (3000);
+ client_add_worker (client);
stats_event_inc (NULL, "connections");
- if (duration > 5)
- {
- duration = 5;
- }
}
- else
- {
- if (_req_queue == NULL)
- duration = 300; /* use longer timeouts when nothing waiting */
- }
- process_request_queue ();
}
#ifdef HAVE_OPENSSL
SSL_CTX_free (ssl_ctx);
@@ -851,12 +908,9 @@
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
- source->running = 1;
mountinfo = config_find_mount (config, source->mount);
- thread_mutex_lock (&source->lock);
source_update_settings (config, source, mountinfo);
INFO1 ("source %s is ready to start", source->mount);
- thread_mutex_unlock (&source->lock);
config_release_config();
slave_rebuild_mounts();
@@ -1018,42 +1072,42 @@
}
-static void _handle_source_request (client_t *client, const char *uri)
+static int _handle_source_request (client_t *client)
{
+ const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
+
INFO1("Source logging in at mountpoint \"%s\"", uri);
if (uri[0] != '/')
{
WARN0 ("source mountpoint not starting with /");
client_send_401 (client, NULL);
- return;
+ return 1;
}
switch (auth_check_source (client, uri))
{
- case 0: /* authenticated from config file */
- source_startup (client, uri);
+ case 0: /* authenticated from config file */
+ return source_startup (client, uri);
+ case 1: /* auth pending */
break;
-
- case 1: /* auth pending */
- break;
-
- default: /* failed */
+ default: /* failed */
INFO1("Source (%s) attempted to login with invalid or missing password", uri);
client_send_401 (client, NULL);
- break;
}
+ return 0;
}
-static void _handle_stats_request (client_t *client, char *uri)
+static int _handle_stats_request (client_t *client)
{
+ const char *uri = httpp_getvar (client->parser, HTTPP_VAR_URI);
+
if (connection_check_admin_pass (client->parser) == 0)
{
auth_add_listener (uri, client);
- return;
+ return 0;
}
-
- client->flags |= CLIENT_AUTHENTICATED;
stats_add_listener (client, STATS_ALL);
+ return 1;
}
static void check_for_filtering (ice_config_t *config, client_t *client)
@@ -1081,17 +1135,22 @@
}
-static void _handle_get_request (client_t *client, char *passed_uri)
+static int _handle_get_request (client_t *client)
{
int port;
char *serverhost = NULL;
int serverport = 0;
aliases *alias;
ice_config_t *config;
- char *uri = passed_uri;
+ char *uri = util_normalise_uri (httpp_getvar (client->parser, HTTPP_VAR_URI));
int client_limit_reached = 0;
- DEBUG1 ("start with %s", passed_uri);
+ if (uri == NULL)
+ {
+ client_send_400 (client, "invalid request URI");
+ return 0;
+ }
+ DEBUG1 ("start with %s", uri);
config = config_get_config();
check_for_filtering (config, client);
port = config->port;
@@ -1115,8 +1174,10 @@
/* Handle aliases */
while(alias) {
if(strcmp(uri, alias->source) == 0 && (alias->port == -1 || alias->port == serverport) && (alias->bind_address == NULL || (serverhost != NULL && strcmp(alias->bind_address, serverhost) == 0))) {
- uri = strdup (alias->destination);
- DEBUG2 ("alias has made %s into %s", passed_uri, uri);
+ char *newuri = strdup (alias->destination);
+ DEBUG2 ("alias has made %s into %s", uri, newuri);
+ free (uri);
+ uri = newuri;
break;
}
alias = alias->next;
@@ -1133,165 +1194,19 @@
/* Dispatch all admin requests */
if (admin_handle_request (client, uri) == 0)
{
- if (uri != passed_uri) free (uri);
- return;
+ free (uri);
+ return 0;
}
/* drop non-admin GET requests here if clients limit reached */
if (client_limit_reached)
client_send_403 (client, "Too many clients connected");
else
auth_add_listener (uri, client);
- if (uri != passed_uri) free (uri);
+ free (uri);
+ return 0;
}
-static void _handle_shoutcast_compatible (client_queue_t *node)
-{
- ice_config_t *config = config_get_config ();
- client_t *client = node->client;
- listener_t *server_conn = client->server_conn;
- char *user = "source", *ptr, *headers, *esc_header;
- mount_proxy *mountinfo = config_find_mount (config, server_conn->shoutcast_mount);
- refbuf_t *r;
- int len;
- char auth [256];
-
- if (mountinfo && mountinfo->username)
- user = mountinfo->username;
-
- ptr = client->refbuf->data;
- len = strcspn (ptr, "\r\n");
- snprintf (auth, sizeof auth, "%s:%.*s", user, len, ptr);
- config_release_config();
- ptr += len;
- headers = ptr + strspn (ptr, "\r\n");
-
- esc_header = util_base64_encode (auth);
- sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n");
- /* build a buffer in a way that an icecast2 source client would present as */
- r = refbuf_new (PER_CLIENT_REFBUF_SIZE);
- snprintf (r->data, PER_CLIENT_REFBUF_SIZE,
- "SOURCE %s HTTP/1.0\r\n" "Authorization: Basic %s\r\n%s",
- server_conn->shoutcast_mount, esc_header, headers);
- free (esc_header);
- refbuf_release (client->refbuf);
- client->refbuf = r;
- r->len = 0;
- node->shoutcast = 0;
- node->offset = strlen (r->data);
- _add_request_queue (node);
-}
-
-
-/* Connection thread. Here we take clients off the connection queue and check
- * the contents provided. We set up the parser then hand off to the specific
- * request handler.
- */
-static void _handle_connection (void)
-{
- http_parser_t *parser;
- const char *rawuri;
- client_queue_t *node;
-
- while ((node = _get_connection()))
- {
-
- if (node)
- {
- client_t *client = node->client;
-
- if (node->offset == 23 && memcmp (client->refbuf->data, "<policy-file-request/>", 23) == 0)
- {
- client->respcode = 200;
- fserve_client_create (client, "/flashpolicy");
- free (node);
- continue;
- }
- /* Check for special shoutcast compatability processing */
- if (node->shoutcast)
- {
- _handle_shoutcast_compatible (node);
- continue;
- }
-
- /* process normal HTTP headers */
- parser = httpp_create_parser();
- httpp_initialize (parser, NULL);
- client->parser = parser;
- if (httpp_parse (parser, client->refbuf->data, node->offset))
- {
- char *uri;
-
- recheck_cached_file (&useragents);
- if (useragents.contents)
- {
- const char *agent = httpp_getvar (parser, "user-agent");
- void *result;
-
- if (agent && avl_get_by_key (useragents.contents, (char *)agent, &result) == 0)
- {
- DEBUG1 ("dropping client because useragent is %s", agent);
- free (node);
- client_destroy (client);
- continue;
- }
- }
- /* we may have more than just headers, so prepare for it */
- if (node->stream_offset == node->offset)
- client->refbuf->len = 0;
- else
- {
- char *ptr = client->refbuf->data;
- client->refbuf->len = node->offset - node->stream_offset;
- memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
- }
-
- rawuri = httpp_getvar (parser, HTTPP_VAR_URI);
-
- free (node);
-
- if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
- strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
- ERROR0("Bad HTTP protocol detected");
- client_destroy (client);
- continue;
- }
-
- uri = util_normalise_uri(rawuri);
-
- if (uri == NULL)
- {
- client_destroy (client);
- continue;
- }
- auth_check_http (client);
-
- if (parser->req_type == httpp_req_source) {
- _handle_source_request (client, uri);
- }
- else if (parser->req_type == httpp_req_stats) {
- _handle_stats_request (client, uri);
- }
- else if (parser->req_type == httpp_req_get) {
- _handle_get_request (client, uri);
- }
- else {
- ERROR0("Wrong request type from client");
- client_send_400 (client, "unknown request");
- }
-
- free(uri);
- }
- else
- {
- free (node);
- ERROR0("HTTP request parsing failed");
- client_destroy (client);
- }
- }
- }
-}
-
/* close any open listening sockets and reopen new listener sockets based on the settings
* in the configuration.
*/
Modified: icecast/branches/kh/icecast/src/event.c
===================================================================
--- icecast/branches/kh/icecast/src/event.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/event.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -67,6 +67,7 @@
yp_recheck_config (config);
fserve_recheck_mime_types (config);
stats_global (config);
+ workers_adjust (config->workers_count);
config_release_config();
connection_thread_shutdown();
slave_restart();
Modified: icecast/branches/kh/icecast/src/format.c
===================================================================
--- icecast/branches/kh/icecast/src/format.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -134,30 +134,6 @@
}
-/* This is the commonly used for source streams, here we just progress to
- * the next buffer in the queue if there is no more left to be written from
- * the existing buffer.
- */
-int format_advance_queue (source_t *source, client_t *client)
-{
- refbuf_t *refbuf = client->refbuf;
-
- if (refbuf == NULL)
- return -1;
-
- if (refbuf->next == NULL && client->pos == refbuf->len)
- return -1;
-
- /* move to the next buffer if we have finished with the current one */
- if (refbuf->next && client->pos == refbuf->len)
- {
- client_set_queue (client, refbuf->next);
- refbuf = client->refbuf;
- }
- return 0;
-}
-
-
int format_prepare_headers (source_t *source, client_t *client)
{
unsigned remaining;
Modified: icecast/branches/kh/icecast/src/format.h
===================================================================
--- icecast/branches/kh/icecast/src/format.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -62,7 +62,6 @@
int format_get_plugin(format_type_t type, struct source_tag *source);
int format_generic_write_to_client (client_t *client);
-int format_advance_queue (struct source_tag *source, client_t *client);
int format_file_read (client_t *client, FILE *fp);
int format_prepare_headers (struct source_tag *source, client_t *client);
Modified: icecast/branches/kh/icecast/src/format_mp3.c
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format_mp3.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -68,8 +68,11 @@
/* client format flags */
#define CLIENT_IN_METADATA CLIENT_FORMAT_BIT
+#define CLIENT_USING_BLANK_META (CLIENT_FORMAT_BIT<<1)
+static refbuf_t blank_meta = { 17, 1, "\001StreamTitle='';", NULL, NULL, 0 };
+
int format_mp3_get_plugin (source_t *source)
{
const char *metadata;
@@ -338,19 +341,27 @@
{
metadata = associated->data + client_mp3->metadata_offset;
meta_len = associated->len - client_mp3->metadata_offset;
+ client->flags &= ~CLIENT_USING_BLANK_META;
+ refbuf_release (client_mp3->associated);
+ refbuf_addref (associated);
+ client_mp3->associated = associated;
}
else
{
- if (associated)
+ if (associated || ((client->flags & CLIENT_USING_BLANK_META) &&
+ client_mp3->metadata_offset == 0))
{
metadata = "\0";
meta_len = 1;
}
else
{
- char *meta = "\001StreamTitle='';";
+ char *meta = blank_meta.data;
metadata = meta + client_mp3->metadata_offset;
- meta_len = 17 - client_mp3->metadata_offset;
+ meta_len = blank_meta.len - client_mp3->metadata_offset;
+ client->flags |= CLIENT_USING_BLANK_META;
+ refbuf_release (client_mp3->associated);
+ client_mp3->associated = NULL;
}
}
/* if there is normal stream data to send as well as metadata then try
@@ -371,19 +382,19 @@
{
client_mp3->metadata_offset += (ret - remaining);
client->flags |= CLIENT_IN_METADATA;
+ client->schedule_ms += 100;
}
- else
- client_mp3->associated = associated;
client_mp3->since_meta_block = 0;
client->pos += remaining;
- client->lag -= remaining;
+ client->queue_pos += remaining;
return ret;
}
+ client->schedule_ms += 100;
if (ret > 0)
{
client_mp3->since_meta_block += ret;
client->pos += ret;
- client->lag -= ret;
+ client->queue_pos += ret;
}
return ret > 0 ? ret : 0;
}
@@ -391,7 +402,6 @@
if (ret == meta_len)
{
- client_mp3->associated = associated;
client_mp3->metadata_offset = 0;
client->flags &= ~CLIENT_IN_METADATA;
client_mp3->since_meta_block = 0;
@@ -399,6 +409,7 @@
}
if (ret > 0)
client_mp3->metadata_offset += ret;
+ client->schedule_ms += 100;
client->flags |= CLIENT_IN_METADATA;
return ret > 0 ? ret : 0;
@@ -456,7 +467,7 @@
{
client_mp3->since_meta_block += ret;
client->pos += ret;
- client->lag -= ret;
+ client->queue_pos += ret;
}
if (ret < (int)len)
break;
@@ -498,6 +509,7 @@
mp3_state *source_mp3 = format->_state;
char *buf;
refbuf_t *refbuf;
+ client_t *client = source->client;
if (source_mp3->read_data == NULL)
{
@@ -507,10 +519,13 @@
buf = source_mp3->read_data->data + source_mp3->read_count;
if (source_mp3->read_count < source_mp3->queue_block_size)
{
- bytes = client_read_bytes (source->client, buf, source_mp3->queue_block_size-source_mp3->read_count);
+ int read_in = source_mp3->queue_block_size - source_mp3->read_count;
+ bytes = client_read_bytes (client, buf, read_in);
+ if (bytes < read_in)
+ client->schedule_ms = client->worker->time_ms + source->skip_duration;
if (bytes < 0)
return 0;
- rate_add (format->in_bitrate, bytes, time(NULL));
+ rate_add (format->in_bitrate, bytes, client->worker->current_time.tv_sec);
}
source_mp3->read_count += bytes;
refbuf = source_mp3->read_data;
@@ -535,6 +550,7 @@
refbuf = source_mp3->read_data;
source_mp3->read_data = NULL;
+ source->client->queue_pos += refbuf->len;
if (source_mp3->update_metadata)
{
mp3_set_title (source);
@@ -650,7 +666,7 @@
else
{
ERROR0 ("Incorrect metadata format, ending stream");
- source->running = 0;
+ source->flags &= ~SOURCE_RUNNING;
refbuf_release (refbuf);
refbuf_release (meta);
return NULL;
@@ -665,6 +681,7 @@
refbuf_release (refbuf);
return NULL;
}
+ source->client->queue_pos += refbuf->len;
refbuf->associated = source_mp3->metadata;
refbuf_addref (source_mp3->metadata);
refbuf->flags |= SOURCE_BLOCK_SYNC;
Modified: icecast/branches/kh/icecast/src/format_ogg.c
===================================================================
--- icecast/branches/kh/icecast/src/format_ogg.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/format_ogg.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -376,6 +376,7 @@
* marking starting points */
if (ogg_info->codec_sync == NULL)
refbuf->flags |= SOURCE_BLOCK_SYNC;
+ source->client->queue_pos += refbuf->len;
return refbuf;
}
@@ -447,7 +448,7 @@
if (ogg_info->error)
{
ERROR0 ("Problem processing stream");
- source->running = 0;
+ source->flags &= ~SOURCE_RUNNING;
return NULL;
}
if (refbuf)
@@ -461,13 +462,15 @@
data = ogg_sync_buffer (&ogg_info->oy, 4096);
bytes = client_read_bytes (source->client, data, 4096);
+ if (bytes < 4096)
+ source->client->schedule_ms = source->client->worker->time_ms + source->skip_duration;
if (bytes <= 0)
{
ogg_sync_wrote (&ogg_info->oy, 0);
return NULL;
}
format->read_bytes += bytes;
- rate_add (format->in_bitrate, bytes, time(NULL));
+ rate_add (format->in_bitrate, bytes, source->client->worker->current_time.tv_sec);
ogg_sync_wrote (&ogg_info->oy, bytes);
}
}
@@ -562,7 +565,7 @@
if (ret > 0)
{
client->pos += ret;
- client->lag -= ret;
+ client->queue_pos += ret;
}
if (ret < (int)len)
Modified: icecast/branches/kh/icecast/src/fserve.c
===================================================================
--- icecast/branches/kh/icecast/src/fserve.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/fserve.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -62,274 +62,68 @@
#define BUFSIZE 4096
-static fserve_t *active_list = NULL;
-static volatile fserve_t *pending_list = NULL;
-
static spin_t pending_lock;
static avl_tree *mimetypes = NULL;
+static avl_tree *fh_cache = NULL;
-static volatile int run_fserv = 0;
-static unsigned int fserve_clients;
-static int client_tree_changed=0;
-
-#ifdef HAVE_POLL
-static struct pollfd *ufds = NULL;
-#else
-static fd_set fds;
-static sock_t fd_max = SOCK_ERROR;
-#endif
-
typedef struct {
char *ext;
char *type;
} mime_type;
-static void fserve_client_destroy(fserve_t *fclient);
+typedef struct {
+ fbinfo finfo;
+ mutex_t lock;
+ int refcount;
+ FILE *fp;
+} fh_node;
+
+int fserve_running;
+
static int _delete_mapping(void *mapping);
-static void *fserv_thread_function(void *arg);
-static int fserve_add_client_mount (client_t *client, const char *mount, FILE *file);
+static int prefile_send (client_t *client);
+static int file_send (client_t *client);
+static int _compare_fh(void *arg, void *a, void *b);
+static int _delete_fh (void *mapping);
void fserve_initialize(void)
{
ice_config_t *config = config_get_config();
mimetypes = NULL;
- active_list = NULL;
- pending_list = NULL;
thread_spin_create (&pending_lock);
+ fh_cache = avl_tree_new (_compare_fh, NULL);
fserve_recheck_mime_types (config);
config_release_config();
stats_event (NULL, "file_connections", "0");
+ fserve_running = 1;
INFO0("file serving started");
}
void fserve_shutdown(void)
{
- thread_spin_lock (&pending_lock);
- run_fserv = 0;
- while (pending_list)
- {
- fserve_t *to_go = (fserve_t *)pending_list;
- pending_list = to_go->next;
-
- fserve_client_destroy (to_go);
- }
- while (active_list)
- {
- fserve_t *to_go = active_list;
- active_list = to_go->next;
- fserve_client_destroy (to_go);
- }
-
+ fserve_running = 0;
if (mimetypes)
avl_tree_free (mimetypes, _delete_mapping);
-
- thread_spin_unlock (&pending_lock);
- thread_spin_destroy (&pending_lock);
- INFO0("file serving stopped");
-}
-
-#ifdef HAVE_POLL
-int fserve_client_waiting (void)
-{
- fserve_t *fclient;
- unsigned int i = 0;
-
- /* only rebuild ufds if there are clients added/removed */
- if (client_tree_changed)
+ if (fh_cache)
{
- client_tree_changed = 0;
- ufds = realloc(ufds, fserve_clients * sizeof(struct pollfd));
- fclient = active_list;
- while (fclient)
+ int count = 100;
+ while (fh_cache->length && count)
{
- ufds[i].fd = fclient->client->con->sock;
- ufds[i].events = POLLOUT;
- ufds[i].revents = 0;
- fclient = fclient->next;
- i++;
+ DEBUG1 ("waiting for %u entries to clear", fh_cache->length);
+ thread_sleep (20000);
+ count--;
}
+ avl_tree_free (fh_cache, _delete_fh);
}
- if (!ufds)
- {
- thread_spin_lock (&pending_lock);
- run_fserv = 0;
- thread_spin_unlock (&pending_lock);
- return -1;
- }
- else if (poll(ufds, fserve_clients, 200) > 0)
- {
- /* mark any clients that are ready */
- fclient = active_list;
- for (i=0; i<fserve_clients; i++)
- {
- if (ufds[i].revents & (POLLOUT|POLLHUP|POLLERR))
- fclient->ready = 1;
- fclient = fclient->next;
- }
- return 1;
- }
- return 0;
-}
-#else
-int fserve_client_waiting (void)
-{
- fserve_t *fclient;
- fd_set realfds;
- /* only rebuild fds if there are clients added/removed */
- if(client_tree_changed) {
- client_tree_changed = 0;
- FD_ZERO(&fds);
- fd_max = SOCK_ERROR;
- fclient = active_list;
- while (fclient) {
- FD_SET (fclient->client->con->sock, &fds);
- if (fclient->client->con->sock > fd_max || fd_max == SOCK_ERROR)
- fd_max = fclient->client->con->sock;
- fclient = fclient->next;
- }
- }
- /* hack for windows, select needs at least 1 descriptor */
- if (fd_max == SOCK_ERROR)
- {
- thread_spin_lock (&pending_lock);
- run_fserv = 0;
- thread_spin_unlock (&pending_lock);
- return -1;
- }
- else
- {
- struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 200000;
- /* make a duplicate of the set so we do not have to rebuild it
- * each time around */
- memcpy(&realfds, &fds, sizeof(fd_set));
- if(select(fd_max+1, NULL, &realfds, NULL, &tv) > 0)
- {
- /* mark any clients that are ready */
- fclient = active_list;
- while (fclient)
- {
- if (FD_ISSET (fclient->client->con->sock, &realfds))
- fclient->ready = 1;
- fclient = fclient->next;
- }
- return 1;
- }
- }
- return 0;
+ thread_spin_destroy (&pending_lock);
+ INFO0("file serving stopped");
}
-#endif
-static int wait_for_fds(void)
-{
- fserve_t *fclient;
- int ret;
- while (run_fserv)
- {
- /* add any new clients here */
- if (pending_list)
- {
- thread_spin_lock (&pending_lock);
-
- fclient = (fserve_t*)pending_list;
- while (fclient)
- {
- fserve_t *to_move = fclient;
- fclient = fclient->next;
- to_move->next = active_list;
- active_list = to_move;
- client_tree_changed = 1;
- fserve_clients++;
- }
- pending_list = NULL;
- thread_spin_unlock (&pending_lock);
- }
- /* drop out of here if someone is ready */
- ret = fserve_client_waiting();
- if (ret)
- return ret;
- }
- return -1;
-}
-
-static void *fserv_thread_function(void *arg)
-{
- fserve_t *fclient, **trail;
- size_t bytes;
-
- while (1)
- {
- if (wait_for_fds() < 0)
- break;
-
- fclient = active_list;
- trail = &active_list;
-
- while (fclient)
- {
- /* process this client, if it is ready */
- if (fclient->ready)
- {
- client_t *client = fclient->client;
- refbuf_t *refbuf = client->refbuf;
- fclient->ready = 0;
- if (client->pos == refbuf->len)
- {
- /* Grab a new chunk */
- if (fclient->file)
- bytes = fread (refbuf->data, 1, BUFSIZE, fclient->file);
- else
- bytes = 0;
- if (bytes == 0)
- {
- if (refbuf->next == NULL)
- {
- fserve_t *to_go = fclient;
- fclient = fclient->next;
- *trail = fclient;
- fserve_client_destroy (to_go);
- fserve_clients--;
- client_tree_changed = 1;
- continue;
- }
- refbuf = refbuf->next;
- client->refbuf->next = NULL;
- refbuf_release (client->refbuf);
- client->refbuf = refbuf;
- bytes = refbuf->len;
- }
- refbuf->len = bytes;
- client->pos = 0;
- }
-
- /* Now try and send current chunk. */
- format_generic_write_to_client (client);
-
- if (client->con->error)
- {
- fserve_t *to_go = fclient;
- fclient = fclient->next;
- *trail = fclient;
- fserve_clients--;
- fserve_client_destroy (to_go);
- client_tree_changed = 1;
- continue;
- }
- }
- trail = &fclient->next;
- fclient = fclient->next;
- }
- }
- DEBUG0 ("fserve handler exit");
- return NULL;
-}
-
/* string returned needs to be free'd */
char *fserve_content_type (const char *path)
{
@@ -377,28 +171,73 @@
return type;
}
-static void fserve_client_destroy(fserve_t *fclient)
+static int _compare_fh(void *arg, void *a, void *b)
{
- if (fclient)
- {
- if (fclient->file)
- fclose (fclient->file);
+ fh_node *x = a, *y = b;
+ int r = strcmp (x->finfo.mount, y->finfo.mount);
- if (fclient->callback)
- fclient->callback (fclient->client, fclient->arg);
- else
- if (fclient->client)
- {
- ice_config_t *config = config_get_config ();
- mount_proxy *mountinfo = config_find_mount (config, fclient->mount);
+ if (r) return r;
+ r = (int)x->finfo.flags - y->finfo.flags;
+ if (r) return r;
+ if (x->finfo.flags & FS_JINGLE)
+ return strcmp (x->finfo.fallback, y->finfo.fallback);
+ return 0;
+}
- fclient->client->flags &= ~CLIENT_AUTHENTICATED;
- auth_release_listener (fclient->client, fclient->mount, mountinfo);
- config_release_config();
- }
- free (fclient->mount);
- free (fclient);
+static int _delete_fh (void *mapping)
+{
+ fh_node *fh = mapping;
+ if (fh->refcount)
+ WARN2 ("handle for %s has refcount %d", fh->finfo.mount, fh->refcount);
+ thread_mutex_destroy (&fh->lock);
+ if (fh->fp)
+ fclose (fh->fp);
+ free (fh->finfo.mount);
+ free (fh->finfo.fallback);
+ free (fh);
+
+ return 1;
+}
+
+/* find/create handle and return it with the structure in a locked state */
+static fh_node *open_fh (fbinfo *finfo)
+{
+ fh_node *fh, *result;
+
+ if (finfo->mount == NULL)
+ finfo->mount = "";
+ fh = calloc (1, sizeof (fh_node));
+ memcpy (&fh->finfo, finfo, sizeof (fbinfo));
+ avl_tree_wlock (fh_cache);
+ if (avl_get_by_key (fh_cache, fh, (void**)&result) == 0)
+ {
+ free (fh);
+ thread_mutex_lock (&result->lock);
+ result->refcount++;
+ DEBUG2 ("refcount now %d for %s", result->refcount, result->finfo.mount);
+ avl_tree_unlock (fh_cache);
+ return result;
}
+
+ // insert new one
+ if (fh->finfo.mount[0])
+ {
+ char *fullpath= util_get_path_from_normalised_uri (fh->finfo.mount, fh->finfo.flags&FS_USE_ADMIN);
+ DEBUG1 ("lookup of \"%s\"", finfo->mount);
+ fh->fp = fopen (fullpath, "rb");
+ if (fh->fp == NULL)
+ WARN1 ("Failed to open \"%s\"", fullpath);
+ free (fullpath);
+ }
+ thread_mutex_create (&fh->lock);
+ thread_mutex_lock (&fh->lock);
+ fh->refcount = 1;
+ fh->finfo.mount = strdup (finfo->mount);
+ if (finfo->fallback)
+ fh->finfo.fallback = strdup (finfo->fallback);
+ avl_insert (fh_cache, fh);
+ avl_tree_unlock (fh_cache);
+ return fh;
}
@@ -411,17 +250,15 @@
const char *range = NULL;
off_t new_content_len = 0;
off_t rangenumber = 0, content_length;
- int ret = 0, use_admin = 0;
+ int ret = 0;
char *fullpath;
int m3u_requested = 0, m3u_file_available = 1;
int xspf_requested = 0, xspf_file_available = 1;
ice_config_t *config;
- FILE *file;
+ fh_node *fh;
+ fbinfo finfo;
- if (httpclient->parser == NULL) /* special case for specific non-http content */
- use_admin = 1;
-
- fullpath = util_get_path_from_normalised_uri (path, use_admin);
+ fullpath = util_get_path_from_normalised_uri (path, 0);
INFO2 ("checking for file %s (%s)", path, fullpath);
if (strcmp (util_get_extension (fullpath), "m3u") == 0)
@@ -493,7 +330,7 @@
);
}
httpclient->refbuf->len = strlen (httpclient->refbuf->data);
- fserve_add_client (httpclient, NULL);
+ fserve_setup_client_fb (httpclient, NULL);
free (sourceuri);
free (fullpath);
return 0;
@@ -532,8 +369,13 @@
return -1;
}
- file = fopen (fullpath, "rb");
- if (file == NULL)
+ finfo.flags = FS_NORMAL;
+ finfo.mount = (char *)path;
+ finfo.fallback = NULL;
+ finfo.limit = 0;
+
+ fh = open_fh (&finfo);
+ if (fh == NULL)
{
WARN1 ("Problem accessing file \"%s\"", fullpath);
client_send_404 (httpclient, "File not readable");
@@ -549,6 +391,7 @@
{
int bytes;
+ httpclient->intro_offset = 0;
/* full http range handling is currently not done but we deal with the common case */
if (range)
{
@@ -566,10 +409,11 @@
off_t endpos;
char *type;
- ret = fseeko (file, rangenumber, SEEK_SET);
+ ret = fseeko (fh->fp, rangenumber, SEEK_SET);
if (ret == -1)
break;
+ httpclient->intro_offset = rangenumber;
new_content_len = content_length - rangenumber;
endpos = rangenumber + new_content_len - 1;
if (endpos < 0)
@@ -600,11 +444,11 @@
else
break;
}
- else if (httpclient->parser)
+ else
{
char *type = fserve_content_type (path);
httpclient->respcode = 200;
- if (httpp_getvar (httpclient->parser, HTTPP_VAR_NO_CONTENT_LENGTH))
+ if (httpclient->flags & CLIENT_NO_CONTENT_LENGTH)
bytes = snprintf (httpclient->refbuf->data, BUFSIZE,
"HTTP/1.0 200 OK\r\n"
"Content-Type: %s\r\n"
@@ -623,86 +467,276 @@
}
httpclient->refbuf->len = bytes;
httpclient->pos = 0;
+ httpclient->shared_data = fh;
stats_event_inc (NULL, "file_connections");
- fserve_add_client_mount (httpclient, path, file);
+ thread_mutex_unlock (&fh->lock);
+ fserve_setup_client_fb (httpclient, NULL);
return 0;
} while (0);
+ thread_mutex_unlock (&fh->lock);
/* If we run into any issues with the ranges
we fallback to a normal/non-range request */
- fclose (file);
client_send_416 (httpclient);
return -1;
}
-static void fserve_add_pending (fserve_t *fclient)
+// fh must be locked before calling this
+static void fh_release (fh_node *fh)
{
- thread_spin_lock (&pending_lock);
- fclient->next = (fserve_t *)pending_list;
- pending_list = fclient;
- if (run_fserv == 0)
+ fh->refcount--;
+ if (fh->finfo.mount[0])
+ DEBUG2 ("refcount now %d on %s", fh->refcount, fh->finfo.mount);
+ if (fh->refcount)
{
- run_fserv = 1;
- DEBUG0 ("fserve handler waking up");
- thread_create("File Serving Thread", fserv_thread_function, NULL, THREAD_DETACHED);
+ thread_mutex_unlock (&fh->lock);
+ return;
}
- thread_spin_unlock (&pending_lock);
+ avl_tree_wlock (fh_cache);
+ thread_mutex_unlock (&fh->lock);
+ avl_delete (fh_cache, fh, _delete_fh);
+ avl_tree_unlock (fh_cache);
}
-
-/* Add client to fserve thread, client needs to have refbuf set and filled
- * but may provide a NULL file if no data needs to be read
- */
-static int fserve_add_client_mount (client_t *client, const char *mount, FILE *file)
+static void file_release (client_t *client)
{
- fserve_t *fclient = calloc (1, sizeof(fserve_t));
+ fh_node *fh = client->shared_data;
+ const char *mount = httpp_getvar (client->parser, HTTPP_VAR_URI);
- DEBUG0 ("Adding client to file serving engine");
- if (fclient == NULL)
+ rate_free (client->out_bitrate);
+ if (client->respcode == 200)
{
+ ice_config_t *config = config_get_config ();
+ mount_proxy *mountinfo = config_find_mount (config, mount);
+ auth_release_listener (client, mount, mountinfo);
+ config_release_config();
+ }
+ else
+ {
+ client->flags &= ~CLIENT_AUTHENTICATED;
client_destroy (client);
- return -1;
}
- fclient->file = file;
- fclient->client = client;
- if (mount)
- fclient->mount = strdup (mount);
- fclient->ready = 0;
- fserve_add_pending (fclient);
+ global_reduce_bitrate_sampling (global.out_bitrate);
+ if (fh)
+ {
+ thread_mutex_lock (&fh->lock);
+ if (fh->finfo.flags & (FS_FALLBACK|FS_JINGLE))
+ stats_event_dec (NULL, "listeners");
+ fh_release (fh);
+ }
+}
- return 0;
+
+struct _client_functions file_content_ops =
+{
+ file_send,
+ file_release
+};
+
+struct _client_functions buffer_content_ops =
+{
+ prefile_send,
+ file_release
+};
+
+
+static void fserve_move_listener (client_t *client)
+{
+ fh_node *fh = client->shared_data;
+ fbinfo f;
+
+ refbuf_release (client->refbuf);
+ client->refbuf = NULL;
+ rate_free (client->out_bitrate);
+ client->out_bitrate = NULL;
+ client->shared_data = NULL;
+ thread_mutex_lock (&fh->lock);
+ f.flags = fh->finfo.flags;
+ f.limit = fh->finfo.limit;
+ f.mount = fh->finfo.fallback;
+ f.fallback = NULL;
+ client->intro_offset = -1;
+ move_listener (client, &f);
+ fh_release (fh);
}
-int fserve_add_client (client_t *client, FILE *file)
+static int prefile_send (client_t *client)
{
- return fserve_add_client_mount (client, NULL, file);
+ refbuf_t *refbuf = client->refbuf;
+ int loop = 3, bytes;
+
+ while (loop)
+ {
+ fh_node *fh = client->shared_data;
+ loop--;
+ if (fserve_running == 0 || client->con->error)
+ return -1;
+ if (refbuf == NULL || client->pos == refbuf->len)
+ {
+ if (fh && fh->finfo.fallback)
+ {
+ fserve_move_listener (client);
+ return 0;
+ }
+ if (refbuf == NULL || refbuf->next == NULL)
+ {
+ if (fh)
+ {
+ if (fh->fp) // is there a file to read from
+ {
+ int len = fh->finfo.limit ? 1400 : 4096;
+ refbuf_t *r = refbuf_new (len);
+ refbuf_release (client->refbuf);
+ client->refbuf = r;
+ client->pos = len;
+ client->ops = &file_content_ops;
+ return 1;
+ }
+ }
+ if (client->respcode)
+ return -1;
+ client_send_404 (client, NULL);
+ thread_mutex_lock (&fh->lock);
+ fh_release (fh);
+ return 0;
+ }
+ else
+ {
+ refbuf = refbuf->next;
+ client->refbuf->next = NULL;
+ refbuf_release (client->refbuf);
+ client->refbuf = refbuf;
+ }
+ client->pos = 0;
+ }
+ bytes = format_generic_write_to_client (client);
+ if (bytes < 0)
+ {
+ client->schedule_ms = client->worker->time_ms + 150;
+ return 0;
+ }
+ global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
+ if (bytes < 4096)
+ break;
+ }
+ client->schedule_ms = client->worker->time_ms + (loop ? 50 : 15);
+ return 0;
}
-/* add client to file serving engine, but just write out the buffer contents,
- * then pass the client to the callback with the provided arg
- */
-void fserve_add_client_callback (client_t *client, fserve_callback_t callback, void *arg)
+static int file_send (client_t *client)
{
- fserve_t *fclient = calloc (1, sizeof(fserve_t));
+ refbuf_t *refbuf = client->refbuf;
+ int loop = 5, bytes;
+ fh_node *fh = client->shared_data;
- DEBUG0 ("Adding client to file serving engine");
- if (fclient == NULL)
+ if (client->con->discon_time && client->worker->current_time.tv_sec >= client->con->discon_time)
+ return -1;
+ while (loop)
{
- client_send_404 (client, "memory exhausted");
- return;
+ loop--;
+ if (fserve_running == 0 || client->con->error)
+ return -1;
+ if (fh->finfo.limit)
+ {
+ long rate = rate_avg (client->out_bitrate);
+ if (rate == 0) loop = 0;
+ if (fh->finfo.limit < rate)
+ {
+ rate_add (client->out_bitrate, 0, client->worker->time_ms);
+ client->schedule_ms = client->worker->time_ms + 150;
+ return 0;
+ }
+ }
+ if (client->pos == refbuf->len)
+ {
+ bytes = 0;
+ if (fh->finfo.fallback)
+ {
+ fserve_move_listener (client);
+ return 0;
+ }
+ if (fh->fp)
+ {
+ thread_mutex_lock (&fh->lock);
+ if (fseeko (fh->fp, client->intro_offset, SEEK_SET) == 0 &&
+ (bytes = fread (refbuf->data, 1, refbuf->len, fh->fp)) > 0)
+ {
+ refbuf->len = bytes;
+ client->intro_offset += bytes;
+ }
+ thread_mutex_unlock (&fh->lock);
+ }
+ if (bytes == 0)
+ return -1;
+ client->pos = 0;
+ }
+ bytes = client->check_buffer (client);
+ if (bytes < 0)
+ return 0;
+ global_add_bitrates (global.out_bitrate, bytes, client->worker->time_ms);
+ if (fh->finfo.limit)
+ rate_add (client->out_bitrate, bytes, client->worker->time_ms);
}
- fclient->file = NULL;
- fclient->client = client;
- fclient->ready = 0;
- fclient->callback = callback;
- fclient->arg = arg;
+ client->schedule_ms = client->worker->time_ms + 10;
+ return 1;
+}
- fserve_add_pending (fclient);
+
+void fserve_setup_client_fb (client_t *client, fbinfo *finfo)
+{
+ client->ops = &buffer_content_ops;
+ if (finfo)
+ {
+ fh_node *fh = open_fh (finfo);
+ client->shared_data = fh;
+ if (fh)
+ {
+ if (fh->finfo.limit)
+ client->out_bitrate = rate_setup (50,1000);
+ thread_mutex_unlock (&fh->lock);
+ }
+ }
+ else
+ {
+ client->check_buffer = format_generic_write_to_client;
+ }
+ client->intro_offset = 0;
+ client->flags |= CLIENT_ACTIVE;
}
+void fserve_setup_client (client_t *client, const char *mount)
+{
+ fbinfo finfo;
+ finfo.flags = FS_NORMAL;
+ finfo.mount = (char *)mount;
+ finfo.fallback = NULL;
+ finfo.limit = 0;
+ client->check_buffer = format_generic_write_to_client;
+ fserve_setup_client_fb (client, &finfo);
+}
+
+
+void fserve_set_override (const char *mount, const char *dest)
+{
+ fh_node fh, *result;
+
+ fh.finfo.flags = FS_FALLBACK;
+ fh.finfo.mount = (char *)mount;
+ fh.finfo.fallback = NULL;
+
+ avl_tree_rlock (fh_cache);
+ if (avl_get_by_key (fh_cache, &fh, (void**)&result) == 0)
+ {
+ char *tmp = result->finfo.fallback;
+ result->finfo.fallback = strdup (dest);
+ free (tmp);
+ }
+ avl_tree_unlock (fh_cache);
+}
+
static int _delete_mapping(void *mapping) {
mime_type *map = mapping;
free(map->ext);
Modified: icecast/branches/kh/icecast/src/fserve.h
===================================================================
--- icecast/branches/kh/icecast/src/fserve.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/fserve.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -18,26 +18,28 @@
typedef void (*fserve_callback_t)(client_t *, void *);
-typedef struct _fserve_t
+typedef struct _fbinfo
{
- client_t *client;
-
- FILE *file;
- int ready;
- void (*callback)(client_t *, void *);
- void *arg;
+ int flags;
+ unsigned int limit;
char *mount;
- struct _fserve_t *next;
-} fserve_t;
+ char *fallback;
+} fbinfo;
+#define FS_NORMAL 01
+#define FS_FALLBACK 02
+#define FS_USE_ADMIN 04
+#define FS_JINGLE 010
+
void fserve_initialize(void);
void fserve_shutdown(void);
int fserve_client_create(client_t *httpclient, const char *path);
-int fserve_add_client (client_t *client, FILE *file);
-void fserve_add_client_callback (client_t *client, fserve_callback_t callback, void *arg);
char *fserve_content_type (const char *path);
void fserve_recheck_mime_types (ice_config_t *config);
+void fserve_setup_client (client_t *client, const char *mount);
+void fserve_setup_client_fb (client_t *client, fbinfo *finfo);
+void fserve_set_override (const char *mount, const char *dest);
#endif
Modified: icecast/branches/kh/icecast/src/global.c
===================================================================
--- icecast/branches/kh/icecast/src/global.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/global.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -68,10 +68,10 @@
thread_mutex_unlock(&_global_mutex);
}
-void global_add_bitrates (struct rate_calc *rate, unsigned long value)
+void global_add_bitrates (struct rate_calc *rate, unsigned long value, uint64_t milli)
{
thread_spin_lock (&global.spinlock);
- rate_add (rate, value, timing_get_time());
+ rate_add (rate, value, milli);
thread_spin_unlock (&global.spinlock);
}
Modified: icecast/branches/kh/icecast/src/global.h
===================================================================
--- icecast/branches/kh/icecast/src/global.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/global.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -24,6 +24,7 @@
#include "thread/thread.h"
#include "net/sock.h"
+#include "compat.h"
typedef struct ice_global_tag
{
@@ -60,7 +61,7 @@
void global_shutdown(void);
void global_lock(void);
void global_unlock(void);
-void global_add_bitrates (struct rate_calc *rate, unsigned long value);
+void global_add_bitrates (struct rate_calc *rate, unsigned long value, uint64_t milli);
void global_reduce_bitrate_sampling (struct rate_calc *rate);
unsigned long global_getrate_avg (struct rate_calc *rate);
Modified: icecast/branches/kh/icecast/src/main.c
===================================================================
--- icecast/branches/kh/icecast/src/main.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/main.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -124,10 +124,8 @@
connection_shutdown();
config_shutdown();
- global_shutdown();
resolver_shutdown();
sock_shutdown();
- thread_shutdown();
DEBUG0 ("library cleanups");
#ifdef HAVE_CURL
@@ -138,6 +136,8 @@
_stop_logging();
log_shutdown();
xslt_shutdown();
+ thread_shutdown();
+ global_shutdown();
}
static int _parse_config_opts(int argc, char **argv, char *filename, int size)
Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/slave.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -42,8 +42,8 @@
#include "compat.h"
+#include "timing/timing.h"
#include "thread/thread.h"
-#include "timing/timing.h"
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
@@ -71,20 +71,41 @@
} redirect_host;
-static void *_slave_thread(void *arg);
+static void _slave_thread(void);
static void redirector_add (const char *server, int port, int interval);
static redirect_host *find_slave_host (const char *server, int port);
static void redirector_clearall (void);
+static int relay_startup (client_t *client);
+static int relay_read (client_t *client);
int slave_running = 0;
+int worker_count;
+int relays_connecting;
+
static volatile int update_settings = 0;
static volatile int update_all_mounts = 0;
static volatile int restart_connection_thread = 0;
static time_t streamlist_check = 0;
static rwlock_t slaves_lock;
+static spin_t relay_start_lock;
redirect_host *redirectors;
+worker_t *workers;
+rwlock_t workers_lock;
+struct _client_functions relay_client_ops =
+{
+ relay_read,
+ client_destroy
+};
+
+struct _client_functions relay_startup_ops =
+{
+ relay_startup,
+ client_destroy
+};
+
+
relay_server *relay_free (relay_server *relay)
{
relay_server *next = relay->next;
@@ -187,19 +208,25 @@
update_all_mounts = 0;
restart_connection_thread = 0;
redirectors = NULL;
+ workers = NULL;
+ worker_count = 0;
+ relays_connecting = 0;
+ thread_spin_create (&relay_start_lock);
+ thread_rwlock_create (&workers_lock);
#ifndef HAVE_CURL
ERROR0 ("streamlist request disabled, rebuild with libcurl if required");
#endif
- _slave_thread (NULL);
+ _slave_thread ();
slave_running = 0;
- thread_rwlock_destroy (&slaves_lock);
}
void slave_shutdown(void)
{
- if (!slave_running)
- return;
+ workers_adjust (0);
+ thread_rwlock_destroy (&slaves_lock);
+ thread_rwlock_destroy (&workers_lock);
+ thread_spin_destroy (&relay_start_lock);
slave_running = 0;
}
@@ -264,7 +291,7 @@
/* Actually open the connection and do some http parsing, handle any 302
* responses within here.
*/
-static client_t *open_relay_connection (relay_server *relay, relay_server_master *master)
+static int open_relay_connection (client_t *client, relay_server *relay, relay_server_master *master)
{
int redirects = 0;
char *server_id = NULL;
@@ -381,25 +408,22 @@
}
else
{
- client_t *client = NULL;
-
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
{
ERROR2("Error from relay request: %s (%s)", relay->localmount,
httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
break;
}
- global_lock ();
- client = client_create (con, parser);
- global_unlock ();
sock_set_blocking (streamsock, 0);
+ client->con = con;
+ client->parser = parser;
client_set_queue (client, NULL);
free (server);
free (mount);
free (server_id);
free (auth_header);
- return client;
+ return 0;
}
redirects++;
}
@@ -412,7 +436,7 @@
connection_close (con);
if (parser)
httpp_destroy (parser);
- return NULL;
+ return -1;
}
@@ -424,119 +448,135 @@
relay_server *relay = arg;
source_t *src = relay->source;
relay_server_master *master = relay->masters;
- client_t *client = NULL;
+ client_t *client = src->client;
mount_proxy *mountinfo;
ice_config_t *config;
INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
+ thread_rwlock_rlock (&global.shutdown_lock);
thread_mutex_lock (&src->lock);
do
{
- if (master)
- {
- thread_mutex_unlock (&src->lock);
- client = open_relay_connection (relay, master);
- thread_mutex_lock (&src->lock);
+ int ret;
- if (client == NULL)
- continue;
+ thread_mutex_unlock (&src->lock);
+ ret = open_relay_connection (client, relay, master);
+ thread_mutex_lock (&src->lock);
- src->client = client;
- src->parser = client->parser;
- }
+ if (ret < 0)
+ continue;
+ src->parser = client->parser;
if (connection_complete_source (src, 0) < 0)
{
INFO0("Failed to complete source initialisation");
- client_destroy (client);
- src->client = NULL;
continue;
}
- if (client)
- {
- stats_event_inc (NULL, "source_relay_connections");
- stats_event (relay->localmount, "source_ip", client->con->ip);
- }
+ stats_event_inc (NULL, "source_relay_connections");
- source_main (relay->source);
+ source_init (src);
+ relay->running = 1;
+ client->ops = &relay_client_ops;
+ client->flags |= CLIENT_ACTIVE;
- if (relay->on_demand == 0)
- {
- /* try next server if configured */
- if (global.running == ICE_RUNNING && relay->running && master->next)
- continue;
+ thread_spin_lock (&relay_start_lock);
+ relays_connecting--;
+ thread_spin_unlock (&relay_start_lock);
+ thread_cond_signal (&client->worker->cond);
- /* only keep refreshing YP entries for inactive on-demand relays */
- yp_remove (relay->localmount);
- relay->source->yp_public = -1;
- relay->start = time(NULL) + 10; /* prevent busy looping if failing */
- slave_update_all_mounts();
- }
-
- /* we've finished, now get cleaned up */
- thread_mutex_unlock (&src->lock);
- relay->cleanup = 1;
- slave_rebuild_mounts();
-
return NULL;
} while ((master = master->next));
- /* source should be locked here */
+ /* failed to start, better clean up and reset */
+ if (relay->on_demand)
+ src->flags &= ~SOURCE_ON_DEMAND; // disable for a short time
+ else
+ yp_remove (relay->localmount);
+
+ if (client->con)
+ connection_close (client->con);
+ client->con = NULL;
+ if (client->parser)
+ httpp_destroy (client->parser);
+ client->parser = NULL;
+
config = config_get_config();
mountinfo = config_find_mount (config, src->mount);
- if (mountinfo && mountinfo->fallback_mount)
+ if (mountinfo && mountinfo->fallback_mount && strcmp (src->mount, mountinfo->fallback_mount) != 0)
{
- source_t *fallback_source;
-
+ int left_on = 0;
+ client_t *leave_on = NULL, *check_clients, *last = NULL;
INFO1 ("failed relay, fallback to %s", mountinfo->fallback_mount);
- avl_tree_rlock(global.source_tree);
- fallback_source = source_find_mount (mountinfo->fallback_mount);
- if (fallback_source)
+ check_clients = src->client_list;
+ src->client_list = NULL;
+ src->listeners = 0;
+ while (check_clients)
{
- thread_mutex_unlock (&src->lock);
- source_move_clients (src, fallback_source);
- thread_mutex_lock (&src->lock);
+ client_t *to_move = check_clients;
+ check_clients = to_move->next;
+ if (to_move->flags & CLIENT_IS_SLAVE)
+ {
+ if (leave_on == NULL) last = to_move;
+ to_move->next = leave_on;
+ leave_on = to_move;
+ left_on++;
+ }
+ else
+ {
+ fbinfo f;
+ f.mount = mountinfo->fallback_mount;
+ f.flags = FS_NORMAL;
+ f.fallback = NULL;
+ if (src->format)
+ f.limit = rate_avg (src->format->in_bitrate);
+ else
+ f.limit = src->limit_rate;
+ thread_mutex_unlock (&src->lock);
+ move_listener (to_move, &f);
+ thread_mutex_lock (&src->lock);
+ }
}
- avl_tree_unlock (global.source_tree);
+ /* it is possible that listeners could of been moved here by now */
+ src->listeners += left_on;
+ if (last)
+ {
+ last->next = src->client_list;
+ src->client_list = leave_on;
+ }
}
config_release_config();
- source_clear_source (relay->source);
- thread_mutex_unlock (&src->lock);
+ INFO2 ("listener count still on %s is %d", src->mount, src->listeners);
+ source_clear_source (src);
+ relay->start = client->worker->current_time.tv_sec + relay->interval;
+ client->schedule_ms = timing_get_time() + 1000;
+ client->flags |= CLIENT_ACTIVE;
- /* cleanup relay, but prevent this relay from starting up again too soon */
- relay->source->on_demand = 0;
- relay->start = time(NULL) + relay->interval;
- relay->cleanup = 1;
-
+ thread_spin_lock (&relay_start_lock);
+ relays_connecting--;
+ thread_spin_unlock (&relay_start_lock);
+ thread_cond_signal (&client->worker->cond);
+ thread_rwlock_unlock (&global.shutdown_lock);
return NULL;
}
-/* wrapper for starting the provided relay stream */
static void check_relay_stream (relay_server *relay)
{
- if (relay->source == NULL)
+ source_t *source = relay->source;
+ if (source && source->client)
+ return;
+ if (source == NULL)
{
if (relay->localmount[0] != '/')
{
- WARN1 ("relay mountpoint \"%s\" does not start with /, skipping",
- relay->localmount);
+ WARN1 ("relay mountpoint \"%s\" does not start with /, skipping", relay->localmount);
return;
}
/* new relay, reserve the name */
- relay->source = source_reserve (relay->localmount);
- if (relay->source)
+ source = source_reserve (relay->localmount);
+ if (source == NULL)
{
- DEBUG1("Adding relay source at mountpoint \"%s\"", relay->localmount);
- if (relay->on_demand)
- {
- relay->start = time(NULL);
- slave_update_all_mounts();
- }
- }
- else
- {
if (relay->start == 0)
{
WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
@@ -544,91 +584,32 @@
}
return;
}
+ relay->source = source;
+ INFO1("Adding new relay at mountpoint \"%s\"", relay->localmount);
+ stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
}
- do
+ if (source->client == NULL)
{
- source_t *source = relay->source;
- /* skip relay if active, not configured or just not time yet */
- if (relay->source == NULL || relay->running || relay->start > time(NULL))
- break;
- if (relay->enable == 0)
+ client_t *client;
+ global_lock();
+ client = client_create (NULL, NULL);
+ global_unlock();
+ source->client = client;
+ client->shared_data = relay;
+ client->ops = &relay_startup_ops;
+ if (relay->on_demand)
{
- stats_event (relay->localmount, NULL, NULL);
- break;
- }
- /* check if an inactive on-demand relay has a fallback that has listeners */
- if (relay->on_demand && source->on_demand_req == 0)
- {
- ice_config_t *config = config_get_config ();
- mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
-
- source->on_demand = relay->on_demand;
-
- if (mountinfo)
- {
- if (mountinfo->fallback_mount && mountinfo->fallback_override)
- {
- source_t *fallback;
- avl_tree_rlock (global.source_tree);
- fallback = source_find_mount (mountinfo->fallback_mount);
- if (fallback && fallback->running && fallback->listeners)
- {
- DEBUG1 ("fallback running with %lu listeners", fallback->listeners);
- source->on_demand_req = 1;
- }
- avl_tree_unlock (global.source_tree);
- }
- }
- else
- {
- if (relay->start)
- {
- thread_mutex_lock (&relay->source->lock);
- source_update_settings (config, relay->source, mountinfo);
- thread_mutex_unlock (&relay->source->lock);
- relay->start = 0;
- }
- }
+ ice_config_t *config = config_get_config();
+ mount_proxy *mountinfo = config_find_mount (config, source->mount);
+ thread_mutex_lock (&source->lock);
+ source->flags |= SOURCE_ON_DEMAND;
+ source_update_settings (config, source, mountinfo);
+ thread_mutex_unlock (&source->lock);
config_release_config();
- if (source->on_demand_req == 0)
- break;
}
-
- relay->start = time(NULL) + 5;
- relay->running = 1;
- relay->thread = thread_create ("Relay Thread", start_relay_stream,
- relay, THREAD_ATTACHED);
- return;
-
- } while (0);
- /* the relay thread may of shut down itself */
- if (relay->cleanup)
- {
- if (relay->thread)
- {
- DEBUG1 ("waiting for relay thread for \"%s\"", relay->localmount);
- thread_join (relay->thread);
- relay->thread = NULL;
- }
- relay->cleanup = 0;
- relay->running = 0;
-
- if (relay->enable == 0)
- {
- stats_event (relay->localmount, NULL, NULL);
- return;
- }
- if (relay->on_demand && relay->source)
- {
- ice_config_t *config = config_get_config ();
- mount_proxy *mountinfo = config_find_mount (config, relay->localmount);
- thread_mutex_lock (&relay->source->lock);
- source_update_settings (config, relay->source, mountinfo);
- thread_mutex_unlock (&relay->source->lock);
- config_release_config ();
- stats_event (relay->localmount, "listeners", "0");
- relay->start = time(NULL);
- }
+ client->flags |= CLIENT_ACTIVE;
+ DEBUG1 ("adding relay client for %s", relay->localmount);
+ client_add_worker (client);
}
}
@@ -661,6 +642,7 @@
old->on_demand = new->on_demand;
return 0;
} while (0);
+ new->source = old->source;
return 1;
}
@@ -733,20 +715,23 @@
while (to_free)
{
- if (to_free->source)
+ relay_server *release = to_free;
+ to_free = release->next;
+
+ if (release->source && release->source->client)
{
- if (to_free->running)
+ if (release->running)
{
/* relay has been removed from xml, shut down active relay */
- DEBUG1 ("source shutdown request on \"%s\"", to_free->localmount);
- to_free->running = 0;
- to_free->source->running = 0;
- thread_join (to_free->thread);
+ DEBUG1 ("source shutdown request on \"%s\"", release->localmount);
+ release->running = 0;
+ release->source->flags &= ~SOURCE_RUNNING;
}
else
- stats_event (to_free->localmount, NULL, NULL);
+ stats_event (release->localmount, NULL, NULL);
+ continue;
}
- to_free = relay_free (to_free);
+ relay_free (release);
}
relay = to_start;
@@ -1032,13 +1017,14 @@
config = config_get_config();
update_master_as_slave (config);
stats_global (config);
+ workers_adjust (config->workers_count);
config_release_config();
source_recheck_mounts (1);
connection_thread_startup();
}
-static void *_slave_thread(void *arg)
+static void _slave_thread(void)
{
slave_startup();
@@ -1046,7 +1032,9 @@
{
relay_server *cleanup_relays = NULL;
int skip_timer = 0;
+ struct timespec current;
+ thread_get_timespec (¤t);
/* re-read xml file if requested */
if (global . schedule_config_reread)
{
@@ -1054,13 +1042,12 @@
global . schedule_config_reread = 0;
}
- global_add_bitrates (global.out_bitrate, 0L);
+ global_add_bitrates (global.out_bitrate, 0L, THREAD_TIME_MS(¤t));
if (global.running != ICE_RUNNING)
break;
- /* only update relays lists from master when required */
- if (streamlist_check <= time(NULL))
+ if (streamlist_check <= current.tv_sec)
{
ice_config_t *config;
@@ -1070,7 +1057,7 @@
thread_mutex_lock (&(config_locks()->relay_lock));
config = config_get_config();
- streamlist_check = time(NULL) + config->master_update_interval;
+ streamlist_check = current.tv_sec + config->master_update_interval;
update_master_as_slave (config);
update_from_master (config);
@@ -1115,8 +1102,6 @@
thread_rwlock_wlock (&global.shutdown_lock);
thread_rwlock_unlock (&global.shutdown_lock);
INFO0 ("Slave thread shutdown complete");
-
- return NULL;
}
@@ -1229,3 +1214,108 @@
redirect->server, redirect->port);
}
+
+static int relay_read (client_t *client)
+{
+ relay_server *relay = client->shared_data;
+ source_t *source = relay->source;
+
+ thread_mutex_lock (&source->lock);
+ if (source_running (source))
+ {
+ if (relay->enable == 0)
+ source->flags &= ~SOURCE_RUNNING;
+ if (relay->on_demand && source->listeners == 0)
+ source->flags &= ~SOURCE_RUNNING;
+ source_read (source);
+ return 0;
+ }
+ DEBUG2 ("counts are %d and %d", source->termination_count, source->listeners);
+ if (relay->running)
+ {
+ client_t *listener;
+
+ INFO1 ("standing by to restart relay on %s", relay->localmount);
+ source_shutdown (source, 0);
+
+ /* the relay itself closed, but may want restarting */
+ listener = source->client_list;
+ while (listener)
+ {
+ client_set_queue (listener, NULL);
+ listener = listener->next;
+ }
+ client->ops = &relay_startup_ops;
+ relay->running = 0;
+ global_reduce_bitrate_sampling (global.out_bitrate);
+ thread_mutex_unlock (&source->lock);
+ thread_rwlock_unlock (&global.shutdown_lock);
+ return 0;
+ }
+ if ((source->flags & SOURCE_TERMINATING) == 0)
+ {
+ source_shutdown (source, 1);
+ source->flags |= SOURCE_TERMINATING;
+ }
+ if (source->termination_count)
+ {
+ thread_mutex_unlock (&source->lock);
+ return 0;
+ }
+ INFO1 ("shutting down relay %s", relay->localmount);
+ thread_mutex_unlock (&source->lock);
+ stats_event (relay->localmount, NULL, NULL);
+ global_reduce_bitrate_sampling (global.out_bitrate);
+ thread_rwlock_unlock (&global.shutdown_lock);
+ relay_free (relay);
+ return -1;
+}
+
+
+static int relay_startup (client_t *client)
+{
+ relay_server *relay = client->shared_data;
+
+ if (global.running != ICE_RUNNING)
+ return -1;
+ if (relay->enable == 0 || relay->start > client->worker->current_time.tv_sec)
+ {
+ client->schedule_ms = client->worker->time_ms + 1000;
+ return 0;
+ }
+
+ if (relay->on_demand)
+ {
+ source_t *src = relay->source;
+ src->flags |= SOURCE_ON_DEMAND;
+ if (src->listeners == 0)
+ {
+ client->schedule_ms = client->worker->time_ms + 1000;
+ return 0;
+ }
+ if (client->worker->current_time.tv_sec % 10 == 0)
+ {
+ mount_proxy * mountinfo = config_find_mount (config_get_config(), src->mount);
+ if (mountinfo && mountinfo->fallback_mount)
+ source_set_override (mountinfo->fallback_mount, src->mount);
+ config_release_config();
+ }
+ DEBUG1 ("Detected listeners on relay %s", relay->localmount);
+ }
+
+ /* limit the number of relays starting up at the same time */
+ thread_spin_lock (&relay_start_lock);
+ if (relays_connecting > 3)
+ {
+ thread_spin_unlock (&relay_start_lock);
+ client->schedule_ms = client->worker->time_ms + 2000;
+ return 0;
+ }
+ relays_connecting++;
+ thread_spin_unlock (&relay_start_lock);
+
+ client->flags &= ~CLIENT_ACTIVE;
+ thread_create ("Relay Thread", start_relay_stream, relay, THREAD_DETACHED);
+ return 0;
+}
+
Modified: icecast/branches/kh/icecast/src/slave.h
===================================================================
--- icecast/branches/kh/icecast/src/slave.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/slave.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -21,7 +21,7 @@
void slave_update_all_mounts (void);
void slave_rebuild_mounts (void);
relay_server *slave_find_relay (relay_server *relays, const char *mount);
-int redirect_client (const char *mountpoint, struct _client_tag *client);
+int redirect_client (const char *mountpoint, client_t *client);
void redirector_update (struct _client_tag *client);
relay_server *relay_free (relay_server *relay);
Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/source.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -35,7 +35,6 @@
#endif
#include "thread/thread.h"
-#include "timing/timing.h"
#include "avl/avl.h"
#include "httpp/httpp.h"
#include "net/sock.h"
@@ -60,17 +59,22 @@
#define MAX_FALLBACK_DEPTH 10
-mutex_t move_clients_mutex;
/* avl tree helper */
static int _compare_clients(void *compare_arg, void *a, void *b);
static void _parse_audio_info (source_t *source, const char *s);
-static void source_shutdown (source_t *source);
-static void process_listeners (source_t *source, int fast_clients_only, int deletion_expected);
+static void source_client_release (client_t *client);
+static void source_listener_release (client_t *client);
+static int source_client_read (client_t *client);
+static int source_client_shutdown (client_t *client);
+static int source_client_http_send (client_t *client);
+static int send_to_listener (client_t *client);
-static int http_source_listener (source_t *source, client_t *client);
-static int http_source_intro (source_t *source, client_t *client);
+static int http_source_listener (client_t *client);
+static int http_source_intro (client_t *client);
static int locate_start_on_queue (source_t *source, client_t *client);
+static void listener_change_worker (client_t *client, source_t *source);
+static void source_change_worker (source_t *source);
#ifdef _WIN32
#define source_run_script(x,y) WARN0("on [dis]connect scripts disabled");
@@ -78,6 +82,37 @@
static void source_run_script (char *command, char *mountpoint);
#endif
+struct _client_functions source_client_ops =
+{
+ source_client_read,
+ NULL
+};
+
+struct _client_functions source_client_halt_ops =
+{
+ source_client_shutdown,
+ source_client_release
+};
+
+struct _client_functions listener_client_ops =
+{
+ send_to_listener,
+ source_listener_release
+};
+
+struct _client_functions listener_pause_ops =
+{
+ NULL,
+ NULL
+};
+
+struct _client_functions source_client_http_ops =
+{
+ source_client_http_send,
+ source_client_release
+};
+
+
/* Allocate a new source with the stated mountpoint, if one already
* exists with that mountpoint in the global source tree then return
* NULL.
@@ -102,7 +137,6 @@
/* make duplicates for strings or similar */
src->mount = strdup (mount);
- src->avg_bitrate_duration = 60;
src->listener_send_trigger = 10000;
thread_mutex_create (&src->lock);
@@ -197,24 +231,13 @@
void source_clear_source (source_t *source)
{
- int i;
+ int i, do_twice = 0;
ice_config_t *config;
mount_proxy *mountinfo;
refbuf_t *p;
DEBUG1 ("clearing source \"%s\"", source->mount);
- /* log bytes read in access log */
- if (source->client)
- {
- source->client->flags &= ~CLIENT_AUTHENTICATED;
- if (source->format)
- source->client->con->sent_bytes = source->format->read_bytes;
- }
- client_destroy(source->client);
- source->client = NULL;
- source->parser = NULL;
-
if (source->dumpfile)
{
INFO1 ("Closing dumpfile for %s", source->mount);
@@ -223,13 +246,16 @@
}
/* lets drop any listeners still connected */
+ DEBUG2 ("source %s has %d clients to release", source->mount, source->listeners);
i = 0;
config = config_get_config ();
mountinfo = config_find_mount (config, source->mount);
- while (source->active_clients)
+ while (source->client_list)
{
- client_t *client = source->active_clients;
- source->active_clients = client->next;
+ client_t *client;
+
+ client = source->client_list;
+ source->client_list = client->next;
client->next = NULL;
/* do not count listeners who have joined but haven't done any processing */
if (client->respcode == 200)
@@ -238,40 +264,36 @@
}
config_release_config ();
if (i)
+ {
stats_event_sub (NULL, "listeners", i);
+ stats_event_sub (source->mount, "listeners", i);
+ }
- source->fast_clients_p = &source->active_clients;
-
format_free_plugin (source->format);
source->format = NULL;
/* flush out the stream data, we don't want any left over */
- /* first remove the reference for refbufs marked for burst */
- p = source->burst_point;
- while (p)
- {
- refbuf_t *to_go = p;
- p = to_go->next;
- to_go->next = NULL;
- refbuf_release (to_go);
- }
- source->burst_point = NULL;
- /* then the normal queue, there could be listeners still pending */
+ /* the source holds a reference on the very latest so that one
+ * always exists */
+ refbuf_release (source->stream_data_tail);
+
+ /* remove the reference for buffers on the queue */
p = source->stream_data;
while (p)
{
refbuf_t *to_go = p;
p = to_go->next;
to_go->next = NULL;
- if (to_go->_count > 1)
- WARN1 ("buffer is %d", to_go->_count);
+ // DEBUG1 ("queue refbuf count is %d", to_go->_count);
+ if (do_twice || to_go == source->burst_point)
+ { /* burst data is also counted */
+ refbuf_release (to_go);
+ do_twice = 1;
+ }
refbuf_release (to_go);
}
- /* the source holds 2 references on the very latest so that one
- * always exists */
- if (p)
- refbuf_release (p);
+ source->burst_point = NULL;
source->stream_data = NULL;
source->stream_data_tail = NULL;
@@ -281,7 +303,6 @@
source->queue_size_limit = 0;
source->listeners = 0;
source->prev_listeners = 0;
- source->shoutcast_compat = 0;
source->client_stats_update = 0;
util_dict_free (source->audio_info);
source->audio_info = NULL;
@@ -295,32 +316,44 @@
source->intro_file = NULL;
}
- source->on_demand_req = 0;
+ source->flags &= ~SOURCE_ON_DEMAND_REQ;
+ thread_mutex_unlock (&source->lock);
}
-/* Remove the provided source from the global tree and free it */
-void source_free_source (source_t *source)
+/* the internal free function. at this point we know the source is
+ * not on the source tree */
+static int _free_source (void *p)
{
- DEBUG1 ("freeing source \"%s\"", source->mount);
- avl_tree_wlock (global.source_tree);
- avl_delete (global.source_tree, source, NULL);
- avl_tree_unlock (global.source_tree);
+ source_t *source = p;
+ source_clear_source (source);
/* make sure all YP entries have gone */
yp_remove (source->mount);
/* There should be no listeners on this mount */
- if (source->active_clients)
+ if (source->client_list)
WARN1("active listeners on mountpoint %s", source->mount);
thread_mutex_destroy (&source->lock);
+ INFO1 ("freeing source \"%s\"", source->mount);
free (source->mount);
free (source);
+ return 1;
}
+/* Remove the provided source from the global tree and free it */
+void source_free_source (source_t *source)
+{
+ avl_tree_wlock (global.source_tree);
+ thread_mutex_lock (&source->lock); /* listeners may of been added */
+ avl_delete (global.source_tree, source, _free_source);
+ avl_tree_unlock (global.source_tree);
+}
+
+
client_t *source_find_client(source_t *source, int id)
{
client_t fakeclient, *client = NULL;
@@ -329,7 +362,7 @@
fakeclient.con = &fakecon;
fakeclient.con->id = id;
- client = source->active_clients;
+ client = source->client_list;
while (client)
{
if (_compare_clients (NULL, client, &fakeclient) == 0)
@@ -340,99 +373,6 @@
}
-/* Move clients from source to dest provided dest is running
- * and that the stream format is the same.
- * The only lock that should be held when this is called is the
- * source tree lock
- */
-void source_move_clients (source_t *source, source_t *dest)
-{
- unsigned long count = 0;
- if (strcmp (source->mount, dest->mount) == 0)
- {
- WARN1 ("src and dst are the same \"%s\", skipping", source->mount);
- return;
- }
- /* we don't want the two write locks to deadlock in here */
- thread_mutex_lock (&move_clients_mutex);
- thread_mutex_lock (&dest->lock);
-
- /* if the destination is not running then we can't move clients */
- if (dest->running == 0 && dest->on_demand == 0)
- {
- WARN1 ("destination mount %s not running, unable to move clients ", dest->mount);
- thread_mutex_unlock (&dest->lock);
- thread_mutex_unlock (&move_clients_mutex);
- return;
- }
-
- do
- {
- client_t *leave_list = NULL;
-
- thread_mutex_lock (&source->lock);
-
- if (source->on_demand == 0 && source->format == NULL)
- {
- INFO1 ("source mount %s is not available", source->mount);
- break;
- }
- if (source->format && dest->format)
- {
- if (source->format->type != dest->format->type)
- {
- WARN2 ("stream %s and %s are of different types, ignored", source->mount, dest->mount);
- break;
- }
- }
-
- /* we need to move the active listeners */
- while (source->active_clients)
- {
- client_t *client = source->active_clients;
- source->active_clients = client->next;
-
- /* don't move known slave relays to streams which are not timed (fallback file) */
- if (dest->client == NULL && (client->flags & CLIENT_IS_SLAVE))
- {
- client->next = leave_list;
- leave_list = client;
- continue;
- }
- /* when switching a client to a different queue, be wary of the
- * refbuf it's referring to, if it's http headers then we need
- * to write them so don't release it.
- */
- if (client->check_buffer != http_source_listener)
- {
- client_set_queue (client, NULL);
- client->check_buffer = http_source_intro;
- if (source->client == NULL)
- client->intro_offset = -1;
- }
-
- client->next = dest->active_clients;
- dest->active_clients = client;
- count++;
- }
- source->active_clients = leave_list;
- INFO2 ("passing %lu listeners to \"%s\"", count, dest->mount);
-
- dest->listeners += count;
- source->listeners -= count;
- stats_event_args (source->mount, "listeners", "%lu", source->listeners);
-
- } while (0);
-
- thread_mutex_unlock (&source->lock);
- /* see if we need to wake up an on-demand relay */
- if (dest->running == 0 && dest->on_demand && count)
- dest->on_demand_req = 1;
-
- thread_mutex_unlock (&dest->lock);
- thread_mutex_unlock (&move_clients_mutex);
-}
-
/* Update stats from source processing, this should be called regulary (every
* few seconds) to keep totals up to date.
*/
@@ -441,7 +381,6 @@
unsigned long incoming_rate = 8 * rate_avg (source->format->in_bitrate);
unsigned long kbytes_sent = source->bytes_sent_since_update/1024;
unsigned long kbytes_read = source->bytes_read_since_update/1024;
- time_t now = time(NULL);
source->format->sent_bytes += kbytes_sent*1024;
stats_event_args (source->mount, "outgoing_kbitrate", "%ld",
@@ -454,48 +393,16 @@
stats_event_args (source->mount, "total_mbytes_sent",
"%"PRIu64, source->format->sent_bytes/(1024*1024));
if (source->client)
+ {
+ worker_t *worker = source->client->worker;
stats_event_args (source->mount, "connected", "%"PRIu64,
- (uint64_t)(now - source->client->con->con_time));
+ (uint64_t)(worker->current_time.tv_sec - source->client->con->con_time));
+ }
stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
source->bytes_sent_since_update %= 1024;
source->bytes_read_since_update %= 1024;
-
- if (source->running && source->limit_rate)
- {
- if (incoming_rate >= source->limit_rate)
- {
- /* when throttling, we perform a sleep so that the input is not read as quickly, we
- * don't do precise timing here as this just makes sure the incoming bitrate is not
- * excessive. lower bitrate stream have higher sleep counts */
- float kbits = incoming_rate/8.0f;
- if (kbits < 1200)
- kbits = 1200;
- source->throttle_stream = (int)(1000000 / kbits * 1000);
-
- /* if bitrate is consistently excessive then terminate the stream */
- if (source->throttle_termination == 0)
- {
- source->throttle_termination = now + source->avg_bitrate_duration/2;
- /* DEBUG1 ("throttle termination set at %ld", source->throttle_termination); */
- }
- else
- {
- if (now >= source->throttle_termination)
- {
- source->running = 0;
- WARN3 ("%s terminating, exceeding bitrate limits (%dk/%dk)",
- source->mount, incoming_rate/1024, source->limit_rate/1024);
- }
- }
- }
- else
- {
- source->throttle_stream = 0;
- source->throttle_termination = 0;
- }
- }
}
@@ -503,61 +410,66 @@
* and sent back, however NULL is also valid as in the case of a short
* timeout and there's no data pending.
*/
-static void get_next_buffer (source_t *source)
+void source_read (source_t *source)
{
+ client_t *client = source->client;
refbuf_t *refbuf = NULL;
- int no_delay_count = 0;
+ int skip = 1;
+ int loop = 3;
+ time_t current = client->worker->current_time.tv_sec;
+ int fds = 0;
- while (global.running == ICE_RUNNING && source->running)
+ if (global.running != ICE_RUNNING)
+ source->flags &= ~SOURCE_RUNNING;
+ do
{
- int fds = 0;
- time_t current = time(NULL);
- int delay = 200;
-
- source->amount_added_to_queue = 0;
-
- /* service fast clients but jump out once in a while to check on
- * normal clients */
- if (no_delay_count == 10)
- return;
-
- if (*source->fast_clients_p)
+ if (source->flags & SOURCE_TEMPORARY_FALLBACK && source->fallback.mount &&
+ source->termination_count == 0)
{
- delay = 0;
- no_delay_count++;
+ DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
+ free (source->fallback.mount);
+ source->fallback.mount = NULL;
+ source->flags &= ~SOURCE_TEMPORARY_FALLBACK;
}
-
- thread_mutex_unlock (&source->lock);
-
- if (source->throttle_stream > 0)
- thread_sleep (source->throttle_stream);
-
- if (source->client)
- fds = util_timed_wait_for_fd (source->client->con->sock, delay);
- else
+ if (source->prev_listeners != source->listeners)
{
- thread_sleep (delay*1000);
- source->last_read = current;
+ INFO2("listener count on %s now %lu", source->mount, source->listeners);
+ source->prev_listeners = source->listeners;
+ stats_event_args (source->mount, "listeners", "%lu", source->listeners);
+ if (source->listeners > source->peak_listeners)
+ {
+ source->peak_listeners = source->listeners;
+ stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
+ }
}
-
- /* take the lock */
- thread_mutex_lock (&source->lock);
-
if (source->client && current >= source->client_stats_update)
{
+ source_change_worker (source);
update_source_stats (source);
source->client_stats_update = current + source->stats_interval;
+ thread_mutex_unlock (&source->lock);
+ return;
}
+ if (source->limit_rate)
+ {
+ unsigned long incoming_rate = 8 * rate_avg (source->format->in_bitrate);
+
+ if (source->limit_rate < incoming_rate)
+ {
+ rate_add (source->format->in_bitrate, 0, current);
+ break;
+ }
+ }
+ fds = util_timed_wait_for_fd (client->con->sock, 0);
if (fds < 0)
{
if (! sock_recoverable (sock_error()))
{
WARN0 ("Error while waiting on socket, Disconnecting source");
- source->running = 0;
+ source->flags &= ~SOURCE_RUNNING;
}
- continue;
+ break;
}
-
if (fds == 0)
{
if (source->last_read + (time_t)source->timeout < current)
@@ -565,82 +477,160 @@
DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
source->timeout, (long)current);
WARN0 ("Disconnecting source due to socket timeout");
- source->running = 0;
+ source->flags &= ~SOURCE_RUNNING;
break;
}
- if (delay == 0)
- {
- process_listeners (source, 1, 0);
- continue;
- }
rate_add (source->format->in_bitrate, 0, current);
+ if (source->skip_duration < 60)
+ source->skip_duration = 80;
+ else
+ source->skip_duration = (long)(source->skip_duration *1.3);
break;
}
+ source->skip_duration = (long)(source->skip_duration * 0.5);
+
+ skip = 0;
source->last_read = current;
- refbuf = source->format->get_buffer (source);
- if (refbuf)
+ do
{
- source->bytes_read_since_update += refbuf->len;
- source->amount_added_to_queue = refbuf->len;
+ refbuf = source->format->get_buffer (source);
+ if (refbuf)
+ {
+ source->bytes_read_since_update += refbuf->len;
- /* the latest refbuf is counted twice so that it stays */
- refbuf_addref (refbuf);
+ /* the latest refbuf is counted twice so that it stays */
+ refbuf_addref (refbuf);
- /* append buffer to the in-flight data queue, */
- if (source->stream_data == NULL)
- {
- source->stream_data = refbuf;
- source->burst_point = refbuf;
- source->burst_offset = 0;
- }
- if (source->stream_data_tail)
- {
- source->stream_data_tail->next = refbuf;
- refbuf_release (source->stream_data_tail);
- }
- source->stream_data_tail = refbuf;
- source->queue_size += refbuf->len;
+ /* append buffer to the in-flight data queue, */
+ if (source->stream_data == NULL)
+ {
+ source->stream_data = refbuf;
+ source->burst_point = refbuf;
+ source->burst_offset = 0;
+ }
+ if (source->stream_data_tail)
+ {
+ source->stream_data_tail->next = refbuf;
+ refbuf_release (source->stream_data_tail);
+ }
+ source->stream_data_tail = refbuf;
+ source->queue_size += refbuf->len;
- /* increase refcount for keeping burst data */
- refbuf_addref (refbuf);
+ /* increase refcount for keeping burst data */
+ refbuf_addref (refbuf);
- /* move the starting point for new listeners */
- source->burst_offset += refbuf->len;
- while (source->burst_offset > source->burst_size)
+ /* move the starting point for new listeners */
+ source->burst_offset += refbuf->len;
+ while (source->burst_offset > source->burst_size)
+ {
+ refbuf_t *to_release = source->burst_point;
+ if (to_release && to_release->next)
+ {
+ source->burst_offset -= to_release->len;
+ source->burst_point = to_release->next;
+ refbuf_release (to_release);
+ continue;
+ }
+ break;
+ }
+
+ /* save stream to file */
+ if (source->dumpfile && source->format->write_buf_to_file)
+ source->format->write_buf_to_file (source, refbuf);
+ }
+ else
{
- refbuf_t *to_release = source->burst_point;
- if (to_release->next)
+ if (client->con->error)
{
- source->burst_offset -= to_release->len;
- source->burst_point = to_release->next;
- refbuf_release (to_release);
- continue;
+ INFO1 ("End of Stream %s", source->mount);
+ source->flags &= ~SOURCE_RUNNING;
+ skip = 1;
}
break;
}
+ loop--;
+ } while (loop);
+ if (loop == 0)
+ client->schedule_ms += 20;
- /* save stream to file */
- if (source->dumpfile && source->format->write_buf_to_file)
- source->format->write_buf_to_file (source, refbuf);
+ /* lets see if we have too much data in the queue */
+ while (source->queue_size > source->queue_size_limit ||
+ (source->stream_data && source->stream_data->_count == 1))
+ {
+ refbuf_t *to_go = source->stream_data;
+ source->stream_data = to_go->next;
+ source->queue_size -= to_go->len;
+ to_go->next = NULL;
+ /* mark for delete to tell others holding it and release it ourselves */
+ to_go->flags |= SOURCE_BLOCK_RELEASE;
+ refbuf_release (to_go);
}
+ } while (0);
+
+ if (skip)
+ client->schedule_ms = client->worker->time_ms + source->skip_duration;
+ thread_mutex_unlock (&source->lock);
+}
+
+
+static int source_client_read (client_t *client)
+{
+ source_t *source = client->shared_data;
+
+ thread_mutex_lock (&source->lock);
+ if (source_running (source))
+ source_read (source);
+ else
+ {
+ if ((source->flags & SOURCE_TERMINATING) == 0)
+ {
+ source_shutdown (source, 1);
+ source->flags |= SOURCE_TERMINATING;
+ }
+ DEBUG2 ("counts are %d and %d", source->termination_count, source->listeners);
+ if (source->termination_count)
+ client->schedule_ms = client->worker->time_ms + 200;
else
{
- if (source->client->con && source->client->con->error)
- {
- INFO1 ("End of Stream %s", source->mount);
- source->running = 0;
- }
+ /* all the source listeners are stopped but still on the handlers */
+ DEBUG0 ("source client shutting down");
+ /* move them to possible fallback */
+ client->ops = &source_client_halt_ops;
+ free (source->fallback.mount);
+ source->fallback.mount = NULL;
+ stats_event (source->mount, NULL, NULL);
}
- break;
+ thread_mutex_unlock (&source->lock);
}
+ return 0;
}
+static int source_queue_advance (client_t *client)
+{
+ source_t *source = client->shared_data;
+ refbuf_t *refbuf;
+
+ if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
+ return -1;
+ refbuf = client->refbuf;
+
+ /* move to the next buffer if we have finished with the current one */
+ if (client->pos == refbuf->len)
+ {
+ if (refbuf->next == NULL)
+ return -1;
+ client_set_queue (client, refbuf->next);
+ }
+ return source->format->write_buf_to_client (client);
+}
+
+
static int locate_start_on_queue (source_t *source, client_t *client)
{
refbuf_t *refbuf;
long lag = 0;
- int ret = -1;
+ int ret = -1, pos = 0;
/* we only want to attempt a burst at connection time, not midstream
* however streams like theora may not have the most recent page marked as
@@ -650,8 +640,8 @@
refbuf = source->stream_data_tail;
if (client->intro_offset == -1 && (refbuf->flags & SOURCE_BLOCK_SYNC))
{
- refbuf = source->stream_data_tail;
- lag = refbuf->len;
+ pos = refbuf->len;
+ lag = 0;
}
else
{
@@ -673,10 +663,9 @@
if (refbuf->flags & SOURCE_BLOCK_SYNC)
{
client_set_queue (client, refbuf);
- client->check_buffer = format_advance_queue;
- client->write_to_client = source->format->write_buf_to_client;
client->intro_offset = -1;
- client->lag = lag;
+ client->pos = pos;
+ client->queue_pos = source->client->queue_pos - lag;
ret = 0;
break;
}
@@ -687,24 +676,16 @@
}
-static int http_source_intro (source_t *source, client_t *client)
+static int http_source_intro (client_t *client)
{
- refbuf_t *refbuf = client->refbuf;
+ source_t *source = client->shared_data;
- if (refbuf == NULL)
+ if (client->refbuf == NULL && source->intro_file)
{
- /* client refers to no data, must be from a move */
- if (source->client)
- {
- locate_start_on_queue (source, client);
- return -1;
- }
- /* source -> file fallback, need a refbuf for data */
- refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
- client->refbuf = refbuf;
- client->pos = refbuf->len;
+ client->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+ client->pos = client->refbuf->len;
client->intro_offset = 0;
- client->lag = 0;
+ client->queue_pos = 0;
}
if (format_file_read (client, source->intro_file) < 0)
{
@@ -712,28 +693,39 @@
{
/* better find the right place in queue for this client */
client_set_queue (client, NULL);
- return locate_start_on_queue (source, client);
+ client->check_buffer = source_queue_advance;
+ return 0;
}
client->intro_offset = 0; /* replay intro file */
+ client->schedule_ms = client->worker->time_ms + 100;
return -1;
}
- return 0;
+ client->schedule_ms = client->worker->time_ms + 20;
+ return source->format->write_buf_to_client (client);
}
-static int http_source_listener (source_t *source, client_t *client)
+static int http_source_listener (client_t *client)
{
refbuf_t *refbuf = client->refbuf;
+ source_t *source = client->shared_data;
if (refbuf == NULL)
+ {
+ client->check_buffer = http_source_intro;
return -1;
+ }
if (client->respcode == 0)
{
+ if (source_running (source) == 0)
+ {
+ client->schedule_ms = client->worker->time_ms + 200;
+ return 0;
+ }
if (format_prepare_headers (source, client) < 0)
{
ERROR0 ("internal problem, dropping client");
- client->con->error = 1;
return -1;
}
client->respcode = 200;
@@ -743,178 +735,134 @@
}
if (client->pos == refbuf->len)
{
- client->write_to_client = source->format->write_buf_to_client;
client->check_buffer = http_source_intro;
client->intro_offset = 0;
client->pos = refbuf->len = 4096;
+ client->con->sent_bytes = 0;
return -1;
}
- return 0;
+ client->schedule_ms = client->worker->time_ms + 10;
+ return format_generic_write_to_client (client);
}
-/* general send routine per listener. The deletion_expected tells us whether
- * the last in the queue is about to disappear, so if this client is still
- * referring to it after writing then drop the client as it's fallen too far
- * behind
- *
- * return 1 for fast client, limiter kicked in
- * 0 for normal case.
+/* general send routine per listener.
*/
-static int send_to_listener (source_t *source, client_t *client, int deletion_expected)
+static int send_to_listener (client_t *client)
{
int bytes;
int loop = 8; /* max number of iterations in one go */
long total_written = 0;
+ int ret = 0;
+ source_t *source = client->shared_data;
+ if (client->con->error)
+ return -1;
+ if (source->fallback.mount)
+ {
+ thread_mutex_lock (&source->lock);
+
+ if (client->flags & CLIENT_IS_SLAVE)
+ client->schedule_ms = client->worker->time_ms + 100;
+ else
+ {
+ // remove from the sources client list
+ client_t **pnext = &source->client_list;
+ while (*pnext && *pnext != client)
+ pnext = &((*pnext)->next);
+ *pnext = client->next;
+ if (client->check_buffer != http_source_listener)
+ {
+ client_set_queue (client, NULL);
+ client->check_buffer = source->format->write_buf_to_client;
+ }
+ thread_mutex_unlock (&source->lock);
+ move_listener (client, &source->fallback);
+ thread_mutex_lock (&source->lock);
+ source->listeners--;
+ }
+ source->termination_count--;
+ thread_mutex_unlock (&source->lock);
+ return 0;
+ }
+ if (source->flags & SOURCE_TERMINATING)
+ {
+ thread_mutex_lock (&source->lock);
+ DEBUG1 ("termination_count now %d", source->termination_count);
+ client_set_queue (client, NULL);
+ client->ops = &listener_pause_ops;
+ source->termination_count--;
+ thread_mutex_unlock (&source->lock);
+ return -1;
+ }
/* check for limited listener time */
- if (client->con->discon_time && time(NULL) >= client->con->discon_time)
+ if (client->con->discon_time &&
+ client->worker->current_time.tv_sec >= client->con->discon_time)
{
INFO1 ("time limit reached for client #%lu", client->con->id);
- client->con->error = 1;
+ return -1;
+ }
+ if (source_running (source) == 0)
+ {
+ client->schedule_ms = client->worker->time_ms + 200;
return 0;
}
- if (source->amount_added_to_queue)
- client->lag += source->amount_added_to_queue;
+ thread_mutex_lock (&source->lock);
+ // do we migrate this listener to the same handler as the source client
+ if (source->client && source->client->worker != client->worker)
+ listener_change_worker (client, source);
+
+ client->schedule_ms = client->worker->time_ms;
while (loop)
{
/* jump out if client connection has died */
if (client->con->error)
+ {
+ ret = -1;
break;
-
+ }
/* lets not send too much to one client in one go, but don't
sleep for too long if more data can be sent */
if (total_written > source->listener_send_trigger)
- {
- loop = 0;
break;
- }
-
- if (client->check_buffer (source, client) < 0)
- break;
-
- bytes = client->write_to_client (client);
- if (bytes <= 0)
+ bytes = client->check_buffer (client);
+ if (bytes < 0)
break; /* can't write any more */
+ client->schedule_ms += 5;
total_written += bytes;
loop--;
}
+ if (loop == 0)
+ client->schedule_ms -= 20;
if (total_written)
{
- rate_add (source->format->out_bitrate, total_written, timing_get_time());
- global_add_bitrates (global.out_bitrate, total_written);
+ rate_add (source->format->out_bitrate, total_written, client->worker->time_ms);
+ global_add_bitrates (global.out_bitrate, total_written, client->worker->time_ms);
source->bytes_sent_since_update += total_written;
}
/* the refbuf referenced at head (last in queue) may be marked for deletion
* if so, check to see if this client is still referring to it */
- if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
+ if (client->refbuf && (client->refbuf->flags & SOURCE_BLOCK_RELEASE))
{
INFO2 ("Client %lu (%s) has fallen too far behind, removing",
client->con->id, client->con->ip);
stats_event_inc (source->mount, "slow_listeners");
client_set_queue (client, NULL);
- client->con->error = 1;
+ ret = -1;
}
- return loop ? 0 : 1;
+ thread_mutex_unlock (&source->lock);
+ return ret;
}
-/* run through the queue of listeners, the fast ones are at the back of the
- * queue so you may want to process only those. If a buffer is going to be
- * removed from the stream queue then flag it so that listeners can be
- * dropped if need be.
- */
-static void process_listeners (source_t *source, int fast_clients_only, int deletion_expected)
-{
- client_t *client, **client_p;
- client_t *fast_clients = NULL, **fast_client_tail = &fast_clients;
- unsigned long listener_count = 0;
-
- /* where do we start from */
- if (fast_clients_only)
- client_p = source->fast_clients_p;
- else
- client_p = &source->active_clients;
- client = *client_p;
-
- while (client)
- {
- int fast_client = send_to_listener (source, client, deletion_expected);
-
- if (client->con->error)
- {
- client_t *to_go = client;
- ice_config_t *config;
- mount_proxy *mountinfo;
-
- *client_p = client->next;
- client = client->next;
-
- config = config_get_config ();
- mountinfo = config_find_mount (config, source->mount);
- auth_release_listener (to_go, source->mount, mountinfo);
- config_release_config();
-
- source->listeners--;
- stats_event_dec (NULL, "listeners");
- DEBUG0("Client removed");
- continue;
- }
- listener_count++;
- if (fast_client && client->check_buffer != http_source_intro)
- {
- client_t *to_move = client;
-
- *client_p = client->next;
- client = client->next;
-
- to_move->next = NULL;
- *fast_client_tail = to_move;
- fast_client_tail = &to_move->next;
- continue;
- }
- client_p = &client->next;
- client = *client_p;
- }
- source->fast_clients_p = client_p;
- /* place fast clients list at the end */
- if (fast_clients)
- *client_p = fast_clients;
-
- /* consistency check, these should match */
- if (fast_clients_only == 0 && listener_count != source->listeners)
- ERROR3 ("mount %s has %lu, %lu", source->mount, listener_count, source->listeners);
-
- /* has the listener count changed */
- if (source->listeners != source->prev_listeners)
- {
- source->prev_listeners = source->listeners;
- INFO2("listener count on %s now %lu", source->mount, source->listeners);
- if (source->listeners > source->peak_listeners)
- {
- source->peak_listeners = source->listeners;
- stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
- }
- stats_event_args (source->mount, "listeners", "%lu", source->listeners);
- if (source->listeners == 0)
- rate_reduce (source->format->out_bitrate, 0);
- /* change of listener numbers, so reduce scope of global sampling */
- global_reduce_bitrate_sampling (global.out_bitrate);
- /* do we need to shutdown an on-demand relay */
- if (source->listeners == 0 && source->on_demand)
- source->running = 0;
- }
-}
-
-
/* Perform any initialisation before the stream data is processed, the header
* info is processed by now and the format details are setup
*/
-static void source_init (source_t *source)
+void source_init (source_t *source)
{
mount_proxy *mountinfo;
@@ -929,13 +877,9 @@
}
}
- /* grab a read lock, to make sure we get a chance to cleanup */
- thread_rwlock_rlock (&global.shutdown_lock);
-
/* start off the statistics */
stats_event_inc (NULL, "source_total_connections");
stats_event_hidden (source->mount, "slow_listeners", "0", STATS_COUNTERS);
- stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
stats_event (source->mount, "server_type", source->format->contenttype);
stats_event_hidden (source->mount, "listener_peak", "0", STATS_COUNTERS);
stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
@@ -943,15 +887,17 @@
stats_event_hidden (source->mount, "total_mbytes_sent", "0", STATS_COUNTERS);
stats_event_hidden (source->mount, "total_bytes_sent", "0", STATS_COUNTERS);
stats_event_hidden (source->mount, "total_bytes_read", "0", STATS_COUNTERS);
+ stats_event (source->mount, "source_ip", source->client->con->ip);
source->last_read = time(NULL);
source->prev_listeners = -1;
source->bytes_sent_since_update = 0;
source->stats_interval = 5;
+ source->termination_count = 0;
/* so the first set of average stats after 3 seconds */
source->client_stats_update = source->last_read + 3;
+ source->skip_duration = 80;
- source->fast_clients_p = &source->active_clients;
source->audio_info = util_dict_new();
if (source->client)
{
@@ -962,11 +908,13 @@
stats_event (source->mount, "audio_info", str);
}
}
+ source->format->in_bitrate = rate_setup (50, 1);
+ source->format->out_bitrate = rate_setup (50, 1000);
thread_mutex_unlock (&source->lock);
/* on demand relays should of already called this */
- if (source->on_demand == 0)
+ if ((source->flags & SOURCE_ON_DEMAND) == 0)
slave_update_all_mounts();
mountinfo = config_find_mount (config_get_config(), source->mount);
@@ -984,94 +932,47 @@
*/
if (mountinfo->fallback_override && mountinfo->fallback_mount)
- {
- source_t *fallback_source;
-
- avl_tree_rlock(global.source_tree);
- fallback_source = source_find_mount (mountinfo->fallback_mount);
-
- if (fallback_source)
- source_move_clients (fallback_source, source);
-
- avl_tree_unlock(global.source_tree);
- }
+ source_set_override (mountinfo->fallback_mount, source->mount);
}
config_release_config();
- thread_mutex_lock (&source->lock);
-
- source->format->in_bitrate = rate_setup (source->avg_bitrate_duration+1, 1);
- source->format->out_bitrate = rate_setup (120, 1000);
INFO1 ("Source %s initialised", source->mount);
- source->running = 1;
+ source->flags |= SOURCE_RUNNING;
}
-void source_main (source_t *source)
+void source_set_override (const char *mount, const char *dest)
{
- source_init (source);
+ source_t *source;
- while (global.running == ICE_RUNNING && source->running)
+ avl_tree_rlock (global.source_tree);
+ source = source_find_mount (mount);
+ if (source)
{
- int remove_from_q;
+ source->fallback.limit = 0;
+ source->fallback.mount = strdup (dest);
+ }
+ else
+ fserve_set_override (mount, dest);
+ avl_tree_unlock (global.source_tree);
+}
- get_next_buffer (source);
- remove_from_q = 0;
+void source_set_fallback (source_t *source, const char *dest_mount)
+{
+ if (dest_mount == NULL)
+ return;
- /* lets see if we have too much data in the queue, but do not
- remove it until later */
- if (source->queue_size > source->queue_size_limit)
- remove_from_q = 1;
-
- process_listeners (source, 0, remove_from_q);
-
- /* lets reduce the queue, any lagging clients should of been
- * terminated by now
- */
- if (source->stream_data)
- {
- /* if we need to prune the queue to keep within the specific limit then
- * all other references to the last block need of been gone by now
- */
- if (remove_from_q && source->stream_data->_count > 1)
- ERROR1 ("unable to prune queue, size is at %ld", source->queue_size);
-
- /* normal unreferenced queue data will have a refcount 1, but
- * burst queue data will be at least 2, active clients will also
- * increase refcount
- */
- while (source->stream_data->_count == 1)
- {
- refbuf_t *to_go = source->stream_data;
-
- if (to_go->next == NULL || source->burst_point == to_go)
- {
- /* this should not happen */
- ERROR2 ("queue state is unexpected (%p, %d)", to_go->next,
- source->burst_point == to_go ? 1: 0);
- source->running = 0;
- break;
- }
- source->stream_data = to_go->next;
- source->queue_size -= to_go->len;
- to_go->next = NULL;
- refbuf_release (to_go);
- }
- }
- }
- source->running = 0;
-
- source_shutdown (source);
+ source->fallback.flags = FS_FALLBACK;
+ source->fallback.mount = strdup (dest_mount);
+ source->fallback.limit = (int)(rate_avg (source->format->in_bitrate) * 1.02);
+ source->termination_count = source->listeners;
}
-
-/* this function is expected to keep lock held on exit */
-static void source_shutdown (source_t *source)
+void source_shutdown (source_t *source, int with_fallback)
{
mount_proxy *mountinfo;
- source->running = 0;
INFO1("Source \"%s\" exiting", source->mount);
update_source_stats (source);
@@ -1082,49 +983,15 @@
source_run_script (mountinfo->on_disconnect, source->mount);
auth_stream_end (mountinfo, source->mount);
- /* we have de-activated the source now, so no more clients will be
- * added, now move the listeners we have to the fallback (if any)
- */
- if (mountinfo->fallback_mount)
- {
- char *mount = strdup (mountinfo->fallback_mount);
- source_t *fallback_source;
-
- config_release_config();
- avl_tree_rlock (global.source_tree);
- fallback_source = source_find_mount (mount);
- free (mount);
-
- if (fallback_source != NULL)
- {
- /* be careful wrt to deadlocking */
- thread_mutex_unlock (&source->lock);
- source_move_clients (source, fallback_source);
- thread_mutex_lock (&source->lock);
- }
-
- avl_tree_unlock (global.source_tree);
- }
+ if (with_fallback)
+ source_set_fallback (source, mountinfo->fallback_mount);
}
- else
- config_release_config();
+ config_release_config();
- /* delete this sources stats */
- stats_event(source->mount, NULL, NULL);
-
- /* we don't remove the source from the tree here, it may be a relay and
- therefore reserved */
- source_clear_source (source);
-
- global_reduce_bitrate_sampling (global.out_bitrate);
-
global_lock();
global.sources--;
stats_event_args (NULL, "sources", "%d", global.sources);
global_unlock();
-
- /* release our hold on the lock so the main thread can continue cleaning up */
- thread_rwlock_unlock (&global.shutdown_lock);
}
@@ -1204,10 +1071,6 @@
if (source->format && source->format->apply_settings)
source->format->apply_settings (source->client, source->format, mountinfo);
- str = httpp_getvar (parser, "user-agent");
- if (str && source->format)
- stats_event_conv (source->mount, "user_agent", str, source->format->charset);
-
/* public */
if (mountinfo && mountinfo->yp_public >= 0)
val = mountinfo->yp_public;
@@ -1342,11 +1205,6 @@
if (mountinfo && mountinfo->limit_rate)
source->limit_rate = mountinfo->limit_rate;
- if (mountinfo)
- source->avg_bitrate_duration = mountinfo->avg_bitrate_duration;
- else
- source->avg_bitrate_duration = 60;
-
/* needs a better mechanism, probably via a client_t handle */
free (source->dumpfilename);
source->dumpfilename = NULL;
@@ -1411,12 +1269,6 @@
*/
void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
{
- /* skip if source is a fallback to file */
- if (source->running && source->client == NULL)
- {
- stats_event_hidden (source->mount, NULL, NULL, STATS_HIDDEN);
- return;
- }
/* set global settings first */
source->queue_size_limit = config->queue_size_limit;
source->timeout = config->source_timeout;
@@ -1429,7 +1281,7 @@
if (source->dumpfilename)
DEBUG1 ("Dumping stream to %s", source->dumpfilename);
- if (source->on_demand)
+ if (source->flags & SOURCE_ON_DEMAND)
{
DEBUG0 ("on_demand set");
stats_event (source->mount, "on_demand", "1");
@@ -1471,54 +1323,30 @@
}
-void *source_client_thread (void *arg)
+int source_client_callback (client_t *client, void *arg)
{
+ const char *agent;
source_t *source = arg;
- stats_event_inc(NULL, "source_client_connections");
-
- thread_mutex_lock (&source->lock);
- source_main (source);
-
- if (source->wait_time)
+ if (client->con->error) /* did http response fail? */
{
- time_t release = source->wait_time + time(NULL);
- INFO2 ("keeping %s reserved for %d seconds", source->mount, source->wait_time);
- thread_mutex_unlock (&source->lock);
- while (global.running && release >= time(NULL))
- thread_sleep (1000000);
- }
- else
- thread_mutex_unlock (&source->lock);
-
- source_free_source (source);
- slave_update_all_mounts();
-
- return NULL;
-}
-
-
-void source_client_callback (client_t *client, void *arg)
-{
- source_t *source = arg;
- refbuf_t *old_data = client->refbuf;
-
- if (client->con->error)
- {
global_lock();
global.sources--;
global_unlock();
- source_clear_source (source);
- source_free_source (source);
- return;
+ return -1;
}
- client->refbuf = old_data->associated;
- old_data->associated = NULL;
- refbuf_release (old_data);
- stats_event (source->mount, "source_ip", source->client->con->ip);
+ thread_rwlock_rlock (&global.shutdown_lock);
- thread_create ("Source Thread", source_client_thread,
- source, THREAD_DETACHED);
+ agent = httpp_getvar (source->client->parser, "user-agent");
+ if (agent)
+ stats_event (source->mount, "user_agent", agent);
+ stats_event_inc(NULL, "source_client_connections");
+ stats_event_hidden (source->mount, "listener_connections", "0", STATS_COUNTERS);
+
+ source_init (source);
+ client->ops = &source_client_ops;
+ client->shared_data = source;
+ return 0;
}
@@ -1557,71 +1385,6 @@
#endif
-static void *source_fallback_file (void *arg)
-{
- char *mount = arg;
- char *type;
- char *path;
- unsigned int len;
- FILE *file = NULL;
- source_t *source = NULL;
- ice_config_t *config;
- http_parser_t *parser;
-
- do
- {
- if (mount == NULL || mount[0] != '/')
- break;
- config = config_get_config();
- len = strlen (config->webroot_dir) + strlen (mount) + 1;
- path = malloc (len);
- if (path)
- snprintf (path, len, "%s%s", config->webroot_dir, mount);
-
- config_release_config ();
- if (path == NULL)
- break;
-
- file = fopen (path, "rb");
- if (file == NULL)
- {
- WARN1 ("unable to open file \"%s\"", path);
- free (path);
- break;
- }
- free (path);
- source = source_reserve (mount);
- if (source == NULL)
- {
- WARN1 ("mountpoint \"%s\" already reserved", mount);
- break;
- }
- INFO1 ("mountpoint %s is reserved", mount);
- type = fserve_content_type (mount);
- parser = httpp_create_parser();
- httpp_initialize (parser, NULL);
- httpp_setvar (parser, "content-type", type);
- free (type);
-
- stats_event_hidden (source->mount, NULL, NULL, STATS_HIDDEN);
- source->yp_public = 0;
- source->intro_file = file;
- source->parser = parser;
- source->avg_bitrate_duration = 20;
- source->listener_send_trigger = 4096;
- file = NULL;
-
- if (connection_complete_source (source, 0) < 0)
- break;
- source_client_thread (source);
- httpp_destroy (parser);
- } while (0);
- if (file)
- fclose (file);
- free (mount);
- return NULL;
-}
-
static int is_mount_template (const char *mount)
{
if (strchr (mount, '*') || strchr (mount, '?') || strchr (mount, '['))
@@ -1679,17 +1442,6 @@
else
stats_event (mount->mountname, NULL, NULL);
- /* check for fallback to file */
- if (global.running == ICE_RUNNING && mount->fallback_mount)
- {
- source_t *fallback = source_find_mount (mount->fallback_mount);
- if (fallback == NULL)
- {
- thread_create ("Fallback file thread", source_fallback_file,
- strdup (mount->fallback_mount), THREAD_DETACHED);
- }
- }
-
mount = mount->next;
}
avl_tree_unlock (global.source_tree);
@@ -1709,10 +1461,10 @@
return 1;
/* allow multiple authenticated relays */
- if (client->username == NULL || (client->flags & CLIENT_IS_SLAVE))
+ if (client->username == NULL || client->flags & CLIENT_IS_SLAVE)
return 1;
- existing = source->active_clients;
+ existing = source->client_list;
while (existing)
{
if (existing->con->error == 0 && existing->username &&
@@ -1732,10 +1484,95 @@
}
-/* Add client to source if it finds one. If a 0 is returned then the client should not be
- * touched, if the return value is -1 then it failed to add and should not be touched.
- * If it's a -2 value then the client is still around for any further processing.
+/* listeners have now detected the source shutting down, now wait for them to
+ * exit the handlers
*/
+static int source_client_shutdown (client_t *client)
+{
+ source_t *source = client->shared_data;
+ int ret = -1;
+
+ client->schedule_ms = client->worker->time_ms + 100;
+ if (client->con->discon_time)
+ {
+ if (client->con->discon_time >= client->worker->current_time.tv_sec)
+ return 0;
+ else
+ return -1;
+ }
+ thread_mutex_lock (&source->lock);
+ DEBUG1 ("remaining listeners to process is %d", source->listeners);
+ /* listeners handled now */
+ if (source->wait_time)
+ {
+ /* set a wait time for leaving the source reserved */
+ client->con->discon_time = client->worker->current_time.tv_sec + source->wait_time;
+ INFO2 ("keeping %s reserved for %d seconds", source->mount, source->wait_time);
+ ret = 0;
+ }
+ thread_mutex_unlock (&source->lock);
+ return ret;
+}
+
+
+/* clean up what is left from the source. */
+void source_client_release (client_t *client)
+{
+ source_t *source = client->shared_data;
+
+ global_reduce_bitrate_sampling (global.out_bitrate);
+
+ thread_mutex_lock (&source->lock);
+ /* log bytes read in access log */
+ if (source->format)
+ client->con->sent_bytes = source->format->read_bytes;
+ thread_mutex_unlock (&source->lock);
+
+ client_destroy (client);
+ source_free_source (source);
+ thread_rwlock_unlock (&global.shutdown_lock);
+ slave_update_all_mounts();
+}
+
+
+/* listener is off the handler list, so clean up */
+static void source_listener_release (client_t *client)
+{
+ source_t *source = client->shared_data;
+ ice_config_t *config;
+ mount_proxy *mountinfo;
+ client_t **pnext;
+ int value;
+
+ if (source_running (source) == 0)
+ return;
+
+ thread_mutex_lock (&source->lock);
+
+ /* search through sources client list to find previous link in list */
+ // we could flag the source to recreate the client list when needed
+ // killclient, listclients, move clients
+ pnext = &source->client_list;
+ while (*pnext && *pnext != client)
+ pnext = &((*pnext)->next);
+ *pnext = client->next;
+ value = --source->listeners;
+ if (source->listeners == 0)
+ rate_reduce (source->format->out_bitrate, 0);
+
+ stats_event_dec (NULL, "listeners");
+ stats_event_args (source->mount, "listeners", "%lu", value);
+ /* change of listener numbers, so reduce scope of global sampling */
+ global_reduce_bitrate_sampling (global.out_bitrate);
+
+ config = config_get_config ();
+ mountinfo = config_find_mount (config, source->mount);
+ auth_release_listener (client, source->mount, mountinfo);
+ config_release_config();
+ thread_mutex_unlock (&source->lock);
+}
+
+
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
{
int loop = 10;
@@ -1775,7 +1612,7 @@
/* ok, we found a source and it is locked */
if (client->flags & CLIENT_IS_SLAVE)
{
- if (source->client == NULL && source->on_demand == 0)
+ if (source->client == NULL && (source->flags & SOURCE_ON_DEMAND) == 0)
{
client_send_403 (client, "Slave relay reading from time unregulated stream");
return -1;
@@ -1854,48 +1691,78 @@
return -1;
} while (1);
-
client->con->sent_bytes = 0;
- client->write_to_client = format_generic_write_to_client;
- client->check_buffer = http_source_listener;
+
client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
- /* lets add the client to the active list */
- client->next = source->active_clients;
- source->active_clients = client;
- source->listeners++;
+ source_setup_listener (source, client);
+ client->flags |= CLIENT_ACTIVE;
thread_mutex_unlock (&source->lock);
- if (source->running == 0 && source->on_demand)
+ return 0;
+}
+
+/* call with the source lock heldm, but expect the lock released on exit
+ * as the listener may of changed threads and therefore lock needed to be
+ * released
+ */
+void source_setup_listener (source_t *source, client_t *client)
+{
+ client->ops = &listener_client_ops;
+ client->shared_data = source;
+ client->queue_pos = 0;
+
+ client->check_buffer = http_source_listener;
+ // add client to the source
+ client->next = source->client_list;
+ source->client_list = client;
+ source->listeners++;
+}
+
+
+static int source_client_http_send (client_t *client)
+{
+ refbuf_t *stream;
+
+ if (client->pos < client->refbuf->len)
{
- /* enable on-demand relay to start, wake up the slave thread */
- DEBUG0("kicking off on-demand relay");
- source->on_demand_req = 1;
+ int ret = format_generic_write_to_client (client);
+ if (ret > 0 && ret < client->refbuf->len)
+ return 0; /* trap for short writes */
}
- return 0;
+ stream = client->refbuf->associated;
+ client->refbuf->associated = NULL;
+ refbuf_release (client->refbuf);
+ client->refbuf = stream;
+ client->pos = client->intro_offset;
+ client->intro_offset = 0;
+ return source_client_callback (client, client->shared_data);
}
-void source_startup (client_t *client, const char *uri)
+int source_startup (client_t *client, const char *uri)
{
source_t *source;
source = source_reserve (uri);
if (source)
{
+ thread_mutex_lock (&source->lock);
source->client = client;
source->parser = client->parser;
if (connection_complete_source (source, 1) < 0)
{
- source_clear_source (source);
+ thread_mutex_unlock (&source->lock);
source_free_source (source);
- return;
+ return -1;
}
client->respcode = 200;
- if (client->server_conn->shoutcast_compat)
+ client->shared_data = source;
+
+ if (client->server_conn && client->server_conn->shoutcast_compat)
{
- source->shoutcast_compat = 1;
+ source->flags |= SOURCE_SHOUTCAST_COMPAT;
source_client_callback (client, source);
}
else
@@ -1907,7 +1774,10 @@
/* we may have unprocessed data read in, so don't overwrite it */
ok->associated = client->refbuf;
client->refbuf = ok;
- fserve_add_client_callback (client, source_client_callback, source);
+ client->intro_offset = client->pos;
+ client->pos = 0;
+ client->ops = &source_client_http_ops;
+ thread_mutex_unlock (&source->lock);
}
}
else
@@ -1915,6 +1785,53 @@
client_send_403 (client, "Mountpoint in use");
WARN1 ("Mountpoint %s in use", uri);
}
+ return 0;
}
+/* check to see if the source client can be moved to a less busy worker thread.
+ * we only move the source client, not the listeners, they will move later
+ */
+void source_change_worker (source_t *source)
+{
+ client_t *client = source->client;
+ worker_t *this_worker = client->worker, *worker;
+
+ thread_rwlock_rlock (&workers_lock);
+ worker = find_least_busy_handler ();
+ if (worker != client->worker)
+ {
+ if (worker->count + source->listeners + 10 < client->worker->count)
+ {
+ thread_mutex_unlock (&source->lock);
+ client_change_worker (client, worker);
+ DEBUG2 ("moving source from %p to %p", this_worker, worker);
+ thread_mutex_lock (&source->lock);
+ }
+ }
+ thread_rwlock_unlock (&workers_lock);
+}
+
+
+/* move listener client to worker theread that the source is on. This will
+ * help cache but prevent overloading a single worker with many listeners.
+ */
+void listener_change_worker (client_t *client, source_t *source)
+{
+ worker_t *this_worker = client->worker, *dest_worker;
+ long diff;
+
+ thread_rwlock_rlock (&workers_lock);
+ dest_worker = source->client->worker;
+ diff = dest_worker->count - this_worker->count;
+
+ if (diff < 1000 && this_worker != dest_worker)
+ {
+ thread_mutex_unlock (&source->lock);
+ client_change_worker (client, dest_worker);
+ DEBUG2 ("moving listener from %p to %p", this_worker, dest_worker);
+ thread_mutex_lock (&source->lock);
+ }
+ thread_rwlock_unlock (&workers_lock);
+}
+
Modified: icecast/branches/kh/icecast/src/source.h
===================================================================
--- icecast/branches/kh/icecast/src/source.h 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/source.h 2009-07-07 23:26:21 UTC (rev 16219)
@@ -17,25 +17,23 @@
#include "yp.h"
#include "util.h"
#include "format.h"
+#include "fserve.h"
#include <stdio.h>
typedef struct source_tag
{
+ char *mount;
+ unsigned int flags;
+ int listener_send_trigger;
+
client_t *client;
http_parser_t *parser;
time_t client_stats_update;
-
- char *mount;
- /* set to zero to request the source to shutdown without causing a global
- * shutdown */
- int running;
-
struct _format_plugin_tag *format;
- client_t *active_clients;
- client_t **fast_clients_p;
+ client_t *client_list;
util_dict *audio_info;
@@ -45,19 +43,18 @@
char *dumpfilename; /* Name of a file to dump incoming stream to */
FILE *dumpfile;
- int throttle_stream;
- time_t throttle_termination;
- uint64_t limit_rate;
- int avg_bitrate_duration;
+ fbinfo fallback;
+
+ int skip_duration;
+ long limit_rate;
time_t wait_time;
- long listener_send_trigger;
+ unsigned long termination_count;
unsigned long peak_listeners;
unsigned long listeners;
unsigned long prev_listeners;
int yp_public;
- int shoutcast_compat;
int log_id;
/* per source burst handling for connecting clients */
@@ -67,11 +64,8 @@
unsigned int queue_size;
unsigned int queue_size_limit;
- unsigned int amount_added_to_queue;
unsigned timeout; /* source timeout in seconds */
- int on_demand;
- int on_demand_req;
unsigned long bytes_sent_since_update;
unsigned long bytes_read_since_update;
int stats_interval;
@@ -85,13 +79,20 @@
} source_t;
-#define source_available(x) ((x)->running || (x)->on_demand)
-#define source_running(x) ((x)->running)
+#define SOURCE_RUNNING 001
+#define SOURCE_ON_DEMAND 002
+#define SOURCE_ON_DEMAND_REQ 004
+#define SOURCE_SHOUTCAST_COMPAT 010
+#define SOURCE_TERMINATING 020
+#define SOURCE_TEMPORARY_FALLBACK 040
+#define source_available(x) ((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND))
+#define source_running(x) ((x)->flags & SOURCE_RUNNING)
+
source_t *source_reserve (const char *mount);
void *source_client_thread (void *arg);
-void source_startup (client_t *client, const char *uri);
-void source_client_callback (client_t *client, void *source);
+int source_startup (client_t *client, const char *uri);
+int source_client_callback (client_t *client, void *source);
void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo);
void source_clear_source (source_t *source);
source_t *source_find_mount(const char *mount);
@@ -104,8 +105,15 @@
void source_main(source_t *source);
void source_recheck_mounts (int update_all);
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client);
+void source_read (source_t *source);
+void source_setup_listener (source_t *source, client_t *client);
+void source_init (source_t *source);
+void source_shutdown (source_t *source, int with_fallback);
+void source_set_fallback (source_t *source, const char *dest_mount);
+void source_set_override (const char *mount, const char *dest);
-extern mutex_t move_clients_mutex;
+#define SOURCE_BLOCK_SYNC 01
+#define SOURCE_BLOCK_RELEASE 02
#define SOURCE_BLOCK_SYNC 01
Modified: icecast/branches/kh/icecast/src/stats.c
===================================================================
--- icecast/branches/kh/icecast/src/stats.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/stats.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -47,7 +47,10 @@
#endif
#define VAL_BUFSIZE 20
+#define STATS_BLOCK_NORMAL 01
+#define STATS_LARGE CLIENT_FORMAT_BIT
+
#define STATS_EVENT_SET 0
#define STATS_EVENT_INC 1
#define STATS_EVENT_DEC 2
@@ -83,12 +86,11 @@
typedef struct _event_listener_tag
{
- client_t *client;
int hidden_level;
char *source;
/* queue for unwritten stats to stats clients */
- refbuf_t *queue, **queue_recent_p;
+ refbuf_t **queue_recent_p;
unsigned int content_len;
struct _event_listener_tag *next;
@@ -577,32 +579,71 @@
}
-static void stats_listeners_send (event_listener_t *listener)
+static int stats_listeners_send (client_t *client)
{
- int loop = 6;
- int ret;
- client_t *client = listener->client;
+ int loop = 8;
+ int ret = 0;
+ event_listener_t *listener = client->shared_data;
+ if (client->con->error)
+ return -1;
+ if (client->flags & STATS_LARGE)
+ loop = 4;
+ else
+ if (listener->content_len > 50000)
+ {
+ WARN1 ("dropping stats client, %ld in queue", listener->content_len);
+ return -1;
+ }
+ client->schedule_ms = client->worker->time_ms + 100;
+ thread_mutex_lock(&_stats_mutex);
while (loop)
{
- if (format_advance_queue (NULL, client) < 0)
+ refbuf_t *refbuf = client->refbuf;
+
+ if (refbuf == NULL)
break;
+ if ((client->flags & STATS_LARGE) && (refbuf->flags & STATS_BLOCK_NORMAL))
+ client->flags &= ~STATS_LARGE;
+
ret = format_generic_write_to_client (client);
- if (ret < 0)
- break;
- listener->content_len -= ret;
+ if (ret > 0)
+ listener->content_len -= ret;
+ if (client->pos == refbuf->len)
+ {
+ client->refbuf = refbuf->next;
+ refbuf->next = NULL;
+ refbuf_release (refbuf);
+ client->pos = 0;
+ if (client->refbuf == NULL)
+ {
+ listener->queue_recent_p = &client->refbuf;
+ break;
+ }
+ }
+ else if (ret < 4096)
+ break; /* short write, so stop for now */
loop--;
}
+ thread_mutex_unlock(&_stats_mutex);
+ if (loop == 0)
+ client->schedule_ms -= 100;
+ return 0;
}
-static void clear_stats_queue (event_listener_t *listener)
+
+static void clear_stats_queue (client_t *client)
{
- while (listener->queue && listener->queue->_count == 1)
+ refbuf_t *refbuf = client->refbuf;
+ while (refbuf)
{
- refbuf_t *to_go = listener->queue;
- listener->queue = to_go->next;
+ refbuf_t *to_go = refbuf;
+ refbuf = to_go->next;
+ if (to_go->_count != 1) DEBUG1 ("odd count for stats %d", to_go->_count);
+ to_go->next = NULL;
refbuf_release (to_go);
}
+ client->refbuf = NULL;
}
@@ -616,34 +657,8 @@
while (listener)
{
- client_t *client = listener->client;
-
if (listener->hidden_level & hidden_level)
- {
_add_stats_to_stats_client (listener, fmt, ap);
- }
- stats_listeners_send (listener);
- if (client->con->error || listener->content_len > 10000)
- {
- stats_event_t stats_count;
- char buffer [20];
-
- *trail = listener->next;
-
- build_event (&stats_count, NULL, "stats_connections", buffer);
- stats_count.action = STATS_EVENT_DEC;
- process_event_unlocked (&stats_count);
-
- /* moved this listener so that final cleanup can be done outside of lock */
- listener->next = _stats.listeners_removed;
- _stats.listeners_removed = listener;
-
- listener = *trail;
- continue;
- }
- /* reduce queue if unsued */
- clear_stats_queue (listener);
-
trail = &listener->next;
listener = listener->next;
}
@@ -736,6 +751,7 @@
if (_append_to_bufferv (refbuf, size, fmt, ap) == 0)
{
+ refbuf->flags |= STATS_BLOCK_NORMAL;
_add_node_to_stats_client (listener, refbuf);
return;
}
@@ -860,8 +876,6 @@
node2 = avl_get_next (node2);
}
}
-
- client_set_queue (listener->client, refbuf);
_add_node_to_stats_client (listener, refbuf);
/* now we register to receive future event notices */
@@ -870,31 +884,60 @@
}
-static void stats_callback (client_t *client, void *arg)
+static void stats_client_release (client_t *client)
{
- event_listener_t *listener = arg;
+ event_listener_t *listener = _stats.event_listeners,
+ **trail = &_stats.event_listeners;
+ while (listener)
+ {
+ if (listener == client->shared_data)
+ {
+ stats_event_t stats_count;
+ char buffer [20];
- client_set_queue (client, NULL);
-
- thread_mutex_lock(&_stats_mutex);
- _register_listener (listener);
- thread_mutex_unlock(&_stats_mutex);
+ *trail = listener->next;
+ clear_stats_queue (client);
+ free (listener->source);
+ free (listener);
+ client_destroy (client);
+ build_event (&stats_count, NULL, "stats_connections", buffer);
+ stats_count.action = STATS_EVENT_DEC;
+ process_event_unlocked (&stats_count);
+ return;
+ }
+ trail = &listener->next;
+ listener = listener->next;
+ }
}
+
+struct _client_functions stats_client_send_ops =
+{
+ stats_listeners_send,
+ stats_client_release
+};
+
void stats_add_listener (client_t *client, int hidden_level)
{
event_listener_t *listener = calloc (1, sizeof (event_listener_t));
listener->hidden_level = hidden_level;
- listener->client = client;
- listener->queue_recent_p = &listener->queue;
client->respcode = 200;
+ client->ops = &stats_client_send_ops;
+ client->shared_data = listener;
client_set_queue (client, NULL);
+ client->flags |= CLIENT_ACTIVE;
client->refbuf = refbuf_new (100);
snprintf (client->refbuf->data, 100,
"HTTP/1.0 200 OK\r\ncapability: streamlist\r\n\r\n");
client->refbuf->len = strlen (client->refbuf->data);
- fserve_add_client_callback (client, stats_callback, listener);
+ listener->content_len = client->refbuf->len;
+ listener->queue_recent_p = &client->refbuf->next;
+
+ client->flags |= STATS_LARGE;
+ thread_mutex_lock(&_stats_mutex);
+ _register_listener (listener);
+ thread_mutex_unlock(&_stats_mutex);
}
void stats_transform_xslt(client_t *client, const char *uri)
@@ -1073,15 +1116,5 @@
listener = _stats.listeners_removed;
_stats.listeners_removed = NULL;
thread_mutex_unlock (&_stats_mutex);
-
- /* flush out any closed stats clients */
- while (listener)
- {
- event_listener_t *to_go = listener;
- listener = listener->next;
- client_destroy (to_go->client);
- clear_stats_queue (to_go);
- free (to_go);
- }
}
Modified: icecast/branches/kh/icecast/src/xslt.c
===================================================================
--- icecast/branches/kh/icecast/src/xslt.c 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/src/xslt.c 2009-07-07 23:26:21 UTC (rev 16219)
@@ -236,7 +236,7 @@
client_set_queue (client, NULL);
client->refbuf = refbuf;
refbuf->len = strlen (refbuf->data);
- fserve_add_client (client, NULL);
+ fserve_setup_client (client, NULL);
xmlFree (string);
}
else
Modified: icecast/branches/kh/icecast/win32/icecast.dsp
===================================================================
--- icecast/branches/kh/icecast/win32/icecast.dsp 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/win32/icecast.dsp 2009-07-07 23:26:21 UTC (rev 16219)
@@ -375,6 +375,10 @@
# End Source File
# Begin Source File
+SOURCE=..\src\slave.h
+# End Source File
+# Begin Source File
+
SOURCE=..\src\net\sock.h
# End Source File
# Begin Source File
Modified: icecast/branches/kh/icecast/win32/icecast2.iss
===================================================================
--- icecast/branches/kh/icecast/win32/icecast2.iss 2009-07-07 23:18:05 UTC (rev 16218)
+++ icecast/branches/kh/icecast/win32/icecast2.iss 2009-07-07 23:26:21 UTC (rev 16219)
@@ -3,7 +3,7 @@
[Setup]
AppName=Icecast2-KH
-AppVerName=Icecast v2.3.2-kh9
+AppVerName=Icecast v2.3.2-kh10
AppPublisherURL=http://www.icecast.org
AppSupportURL=http://www.icecast.org
AppUpdatesURL=http://www.icecast.org
@@ -13,7 +13,7 @@
LicenseFile=..\COPYING
InfoAfterFile=..\README
OutputDir=.
-OutputBaseFilename=icecast2_win32_v2.3.2-kh9_setup
+OutputBaseFilename=icecast2_win32_v2.3.2-kh10_setup
WizardImageFile=icecast2logo2.bmp
WizardImageStretch=no
; uncomment the following line if you want your installation to run on NT 3.51 too.
@@ -45,7 +45,7 @@
Source: "..\admin\flashpolicy"; DestDir: "{app}\admin"; Flags: ignoreversion
Source: "c:\xiph\lib\pthreadVSE.dll"; DestDir: "{app}"; Flags: ignoreversion
Source: "..\conf\*.dist"; DestDir: "{app}"; Flags: ignoreversion
-Source: "..\examples\icecast_shoutcast_compat.xml"; DestDir: "{app}"; DestName: "icecast.xml"; Flags: ignoreversion
+Source: "..\examples\icecast_shoutcast_compat.xml"; DestDir: "{app}"; DestName: "icecast.xml"; Flags: ignoreversion confirmoverwrite
Source: "c:\xiph\lib\iconv.dll"; DestDir: "{app}"; Flags: ignoreversion
Source: "c:\xiph\lib\libxslt.dll"; DestDir: "{app}"; Flags: ignoreversion
Source: "c:\xiph\lib\libxml2.dll"; DestDir: "{app}"; Flags: ignoreversion
More information about the commits
mailing list