[xiph-commits] r17788 - in icecast/branches/kh/icecast: . admin src win32
karl at svn.xiph.org
karl at svn.xiph.org
Mon Jan 24 19:06:25 PST 2011
Author: karl
Date: 2011-01-24 19:06:25 -0800 (Mon, 24 Jan 2011)
New Revision: 17788
Modified:
icecast/branches/kh/icecast/NEWS
icecast/branches/kh/icecast/admin/listclients.xsl
icecast/branches/kh/icecast/config.h.vc6
icecast/branches/kh/icecast/configure.in
icecast/branches/kh/icecast/src/admin.c
icecast/branches/kh/icecast/src/auth.c
icecast/branches/kh/icecast/src/auth_htpasswd.c
icecast/branches/kh/icecast/src/auth_url.c
icecast/branches/kh/icecast/src/cfgfile.c
icecast/branches/kh/icecast/src/cfgfile.h
icecast/branches/kh/icecast/src/client.c
icecast/branches/kh/icecast/src/client.h
icecast/branches/kh/icecast/src/connection.c
icecast/branches/kh/icecast/src/event.c
icecast/branches/kh/icecast/src/flv.c
icecast/branches/kh/icecast/src/flv.h
icecast/branches/kh/icecast/src/format.c
icecast/branches/kh/icecast/src/format_mp3.c
icecast/branches/kh/icecast/src/format_mp3.h
icecast/branches/kh/icecast/src/format_ogg.c
icecast/branches/kh/icecast/src/fserve.c
icecast/branches/kh/icecast/src/global.h
icecast/branches/kh/icecast/src/mpeg.c
icecast/branches/kh/icecast/src/slave.c
icecast/branches/kh/icecast/src/source.c
icecast/branches/kh/icecast/src/source.h
icecast/branches/kh/icecast/win32/icecast2.iss
Log:
bump to kh29.
mostly fixes, mainly to do with workers handling relays, startup, shutdown, restarting
and how those listeners are processed. The handling of auth clients if services become
slow/unresponsive. some code cleanup, reduce some lock contention, code duplication and
fine tuning of certain limits.
Modified: icecast/branches/kh/icecast/NEWS
===================================================================
--- icecast/branches/kh/icecast/NEWS 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/NEWS 2011-01-25 03:06:25 UTC (rev 17788)
@@ -17,6 +17,53 @@
any extra tags are show in the conf/icecast.xml.dist file
+2.3.2-kh29
+. prevent crash with very small burst size (eg 0).
+. fix possible race with relay restart and headers used in stats update
+. fix possible mpeg issues causing a crash or getting stuck in an allocation loop on bad input.
+. An FLV listener could trigger a corruption in the metadata for non-FLV listeners. More likely
+ to occur when starting on-demand relays.
+. relays that have only been connected for a few seconds will be treated as if it had failed to
+ restart and listeners will be droped or moved.
+. fix possible server stall if reload requested and auth listeners were pending on auth queue
+. Fix a number of cases where listeners kept a relay from restarting because of an incorrect
+ internal state, a stuck relay.
+. revert to reserving a new relay source later in client processing. The relay is still installed
+ immediately (with client) but the source could clash with another already reserved.
+. listener triggers wakeup of inactive on-demand relay, which allows us to reduce the scheduling
+ of inactive relay clients.
+. when relays are updated, only wakeup workers once, not for each relay.
+. stats cleanup on disabled or failed relay start
+. on-demand relays minimum transfer set to 2Meg
+. make rejected_mount and stream_auth work properly again for URL auth
+. possible listener cleanup race if auth used at exit. We now keep listener on the worker even
+ if inactive until complete.
+. expanded FLV metadata, double and bool settings, adds audiocodecid, audiosamplerate, stereo and
+ various entries from stats.
+. slave relay could of been redirected, now it is marked as a slave by the master so bypassess
+ various limit checks.
+. If auth server does not respond then disable URL requests for listeners for 60 seconds. Keeps
+ icecast from clogging up if link to auth server breaks. Some people use auth for non-auth
+ accounting or listener intro, so extra option added to allow those by default (option
+ presume_innocent set to yes to allow listener even if auth not active).
+. Make auth refcount change based on threads holding it not clients, simplifies the shutdown or
+ busy startup case.
+. move non-ogg metadata update code into one place, and fix-up use of inline URL parsing and
+ possible short write in rare cases.
+. slow down listener thread (therefore reduce new incoming listeners) if relay starts up, more
+ so if many relays start or auth queues are full.
+. increase internal settings, max auth handlers to 100, auth queue to 300 and listen queue to 64
+. fix ogg stream dump-file.
+. reduce frequency of rebalancing across workers with many streams, reduces lock contention
+. use alloca instead of malloc for non-ogg metadata/stream merge, minor speedup
+. new listeners (with queryargs or user:pass) redirected because of icecast will
+ have their supplied auth details added to new location.
+. recreate worker pipe if it fails or else we could busy loop.
+. add error code to recoverable list for accept, and prevent listener thread slow down in those
+ cases, seems to affect FreeBSD more than others although it could affect others.
+. revert listener stat names for listclients to 2.3.2 case for now
+. minor log message cleanups.
+
2.3.2-kh28
. fix possible short send shoutcast metadata inserts getting corrupted
. fix a few possible flv issues when metadata is missing.
Modified: icecast/branches/kh/icecast/admin/listclients.xsl
===================================================================
--- icecast/branches/kh/icecast/admin/listclients.xsl 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/admin/listclients.xsl 2011-01-25 03:06:25 UTC (rev 17788)
@@ -58,10 +58,10 @@
<xsl:variable name = "themount" ><xsl:value-of select="@mount" /></xsl:variable>
<xsl:for-each select="listener">
<tr>
- <td align="center"><xsl:value-of select="ip" /><xsl:if test="username"> (<xsl:value-of select="username" />)</xsl:if></td>
- <td align="center"><xsl:value-of select="connected" /></td>
+ <td align="center"><xsl:value-of select="IP" /><xsl:if test="username"> (<xsl:value-of select="username" />)</xsl:if></td>
+ <td align="center"><xsl:value-of select="Connected" /></td>
<td align="center"><xsl:value-of select="lag" /></td>
- <td align="center"><xsl:value-of select="useragent" /></td>
+ <td align="center"><xsl:value-of select="UserAgent" /></td>
<td align="center"><a href="killclient.xsl?mount={$themount}&id={@id}">Kick</a></td>
</tr>
</xsl:for-each>
Modified: icecast/branches/kh/icecast/config.h.vc6
===================================================================
--- icecast/branches/kh/icecast/config.h.vc6 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/config.h.vc6 2011-01-25 03:06:25 UTC (rev 17788)
@@ -95,7 +95,7 @@
#define PACKAGE_NAME "Icecast"
/* Version number of package */
-#define VERSION "2.3.2-kh28"
+#define VERSION "2.3.2-kh29"
/* Define to the version of this package. */
#define PACKAGE_VERSION VERSION
Modified: icecast/branches/kh/icecast/configure.in
===================================================================
--- icecast/branches/kh/icecast/configure.in 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/configure.in 2011-01-25 03:06:25 UTC (rev 17788)
@@ -1,4 +1,4 @@
-AC_INIT([Icecast], [2.3.2-kh28], [karl at xiph.org])
+AC_INIT([Icecast], [2.3.2-kh29], [karl at xiph.org])
LT_INIT
AC_PREREQ(2.59)
Modified: icecast/branches/kh/icecast/src/admin.c
===================================================================
--- icecast/branches/kh/icecast/src/admin.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/admin.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -637,13 +637,13 @@
snprintf (buf, sizeof (buf), "%lu", listener->connection.id);
xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
- xmlNewChild (node, NULL, XMLSTR("ip"), XMLSTR(listener->connection.ip));
+ xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip));
useragent = httpp_getvar (listener->parser, "user-agent");
if (useragent && xmlCheckUTF8((unsigned char *)useragent))
{
xmlChar *str = xmlEncodeEntitiesReentrant (srcnode->doc, XMLSTR(useragent));
- xmlNewChild (node, NULL, XMLSTR("useragent"), str);
+ xmlNewChild (node, NULL, XMLSTR("UserAgent"), str);
xmlFree (str);
}
@@ -660,7 +660,7 @@
{
snprintf (buf, sizeof (buf), "%lu",
(unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
- xmlNewChild (node, NULL, XMLSTR("connected"), XMLSTR(buf));
+ xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf));
}
if (listener->username)
{
@@ -1061,17 +1061,17 @@
if (song)
{
plugin->set_tag (plugin, "song", song, charset);
- INFO2("Metadata song on mountpoint %s changed to \"%s\"", source->mount, song);
+ INFO2("Metadata song on %s set to \"%s\"", source->mount, song);
}
if (artist)
{
plugin->set_tag (plugin, "artist", artist, charset);
- INFO2 ("Metadata artist on mountpoint %s changed to \"%s\"", source->mount, artist);
+ INFO2 ("Metadata artist on %s changed to \"%s\"", source->mount, artist);
}
if (title)
{
plugin->set_tag (plugin, "title", title, charset);
- INFO2 ("Metadata title on mountpoint %s changed to \"%s\"", source->mount, title);
+ INFO2 ("Metadata title on %s changed to \"%s\"", source->mount, title);
}
/* updates are now done, let them be pushed into the stream */
plugin->set_tag (plugin, NULL, NULL, NULL);
Modified: icecast/branches/kh/icecast/src/auth.c
===================================================================
--- icecast/branches/kh/icecast/src/auth.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/auth.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -54,8 +54,23 @@
static void *auth_run_thread (void *arg);
static int auth_postprocess_listener (auth_client *auth_user);
static void auth_postprocess_source (auth_client *auth_user);
+static int wait_for_auth (client_t *client);
+struct _client_functions auth_release_ops =
+{
+ wait_for_auth,
+ client_destroy
+};
+
+
+static int wait_for_auth (client_t *client)
+{
+ DEBUG0 ("client finished with auth");
+ return -1;
+}
+
+
void auth_check_http (client_t *client)
{
const char *header;
@@ -112,7 +127,6 @@
static void queue_auth_client (auth_client *auth_user, mount_proxy *mountinfo)
{
auth_t *auth;
- int i;
if (auth_user == NULL || mountinfo == NULL)
return;
@@ -120,23 +134,27 @@
thread_mutex_lock (&auth->lock);
auth_user->next = NULL;
auth_user->auth = auth;
- auth->refcount++;
*auth->tailp = auth_user;
auth->tailp = &auth_user->next;
auth->pending_count++;
- for (i=0; i<auth->handlers; i++)
+ if (auth->refcount > auth->handlers)
+ DEBUG0 ("max authentication handlers allocated");
+ else
{
- if (auth->handles [i].thread == NULL)
+ int i;
+ for (i=0; i<auth->handlers; i++)
{
- DEBUG1 ("starting auth thread %d", i);
- auth->handles [i].thread = thread_create ("auth thread", auth_run_thread,
- &auth->handles [i], THREAD_DETACHED);
- break;
+ if (auth->handles [i].thread == NULL)
+ {
+ DEBUG1 ("starting auth thread %d", i);
+ auth->refcount++;
+ auth->handles [i].thread = thread_create ("auth thread", auth_run_thread,
+ &auth->handles [i], THREAD_DETACHED);
+ break;
+ }
}
}
- if (i == auth->handlers)
- DEBUG0 ("max authentication handlers allocated");
- INFO2 ("auth on %s has %d pending", auth->mount, auth->pending_count);
+ DEBUG2 ("auth on %s has %d pending", auth->mount, auth->pending_count);
thread_mutex_unlock (&auth->lock);
}
@@ -162,8 +180,6 @@
authenticator->running = 0;
while (authenticator->handlers)
{
- while (authenticator->handles [authenticator->handlers-1].thread)
- thread_sleep (5000);
if (authenticator->release_thread_data)
authenticator->release_thread_data (authenticator,
authenticator->handles [authenticator->handlers-1].data);
@@ -195,7 +211,6 @@
client_send_401 (client, auth_user->auth->realm);
auth_user->client = NULL;
}
- auth_release (auth_user->auth);
free (auth_user->hostname);
free (auth_user->mount);
free (auth_user);
@@ -250,18 +265,23 @@
*/
static void auth_remove_listener (auth_client *auth_user)
{
- DEBUG0 ("...queue listener");
if (auth_user->auth->release_listener)
auth_user->auth->release_listener (auth_user);
- auth_release (auth_user->auth);
auth_user->auth = NULL;
+
/* client is going, so auth is not an issue at this point */
- auth_user->client->flags &= ~CLIENT_AUTHENTICATED;
- if (auth_user->client->respcode == 0)
- client_send_404 (auth_user->client, "Failed relay");
- else
- auth_user->client->flags |= CLIENT_ACTIVE;
- auth_user->client = NULL;
+ if (auth_user->client)
+ {
+ client_t *client = auth_user->client;
+ if (client->worker)
+ client->flags = CLIENT_ACTIVE | (client->flags & ~CLIENT_AUTHENTICATED);
+ else
+ {
+ client->flags &= ~CLIENT_AUTHENTICATED;
+ client_destroy (auth_user->client);
+ }
+ auth_user->client = NULL;
+ }
}
@@ -353,6 +373,7 @@
}
DEBUG1 ("Authenication thread %d shutting down", handler->id);
handler->thread = NULL;
+ auth_release (auth);
thread_rwlock_unlock (&auth_lock);
return NULL;
}
@@ -409,7 +430,8 @@
/* check whether we are processing a streamlist request for slaves */
if (strcmp (mount, "/admin/streams") == 0)
{
- if (client->parser->req_type == httpp_req_stats && (client->flags & CLIENT_IS_SLAVE))
+ client->flags |= CLIENT_IS_SLAVE;
+ if (client->parser->req_type == httpp_req_stats)
{
stats_add_listener (client, STATS_SLAVE|STATS_GENERAL);
return 0;
@@ -467,12 +489,12 @@
if ((client->flags & CLIENT_AUTHENTICATED) == 0)
{
/* auth failed so do we place the listener elsewhere */
+ auth_user->client = NULL;
if (auth->rejected_mount)
mount = auth->rejected_mount;
else
{
client_send_401 (client, auth_user->auth->realm);
- auth_user->client = NULL;
return -1;
}
}
@@ -539,11 +561,13 @@
{
auth_client *auth_user;
- if (mountinfo->auth->running == 0 || mountinfo->auth->pending_count > 150)
+ if (mountinfo->auth->running == 0 || mountinfo->auth->pending_count > 300)
{
config_release_config ();
WARN0 ("too many clients awaiting authentication");
client_send_403 (client, "busy, please try again later");
+ if (global.new_connections_slowdown < 10)
+ global.new_connections_slowdown++;
return;
}
auth_user = auth_client_setup (mount, client);
@@ -577,30 +601,20 @@
{
if (client->flags & CLIENT_AUTHENTICATED)
{
- /* drop any queue reference here, we do not want a race between the source thread
- * and the auth/fserve thread */
client_set_queue (client, NULL);
if (mount && mountinfo && mountinfo->auth && mountinfo->auth->release_listener)
{
auth_client *auth_user = auth_client_setup (mount, client);
client->flags &= ~CLIENT_ACTIVE;
+ if (client->worker)
+ client->ops = &auth_release_ops; // put into a wait state
auth_user->process = auth_remove_listener;
queue_auth_client (auth_user, mountinfo);
return 0;
}
client->flags &= ~CLIENT_AUTHENTICATED;
}
- if (client->worker)
- {
- if (client->connection.sent_bytes == 0)
- {
- client_send_404 (client, NULL);
- return 0;
- }
- }
- else
- client_destroy (client);
return -1;
}
@@ -663,8 +677,8 @@
auth->handlers = atoi (options->value);
options = options->next;
}
- if (auth->handlers < 1) auth->handlers = 1;
- if (auth->handlers > 20) auth->handlers = 20;
+ if (auth->handlers < 1) auth->handlers = 3;
+ if (auth->handlers > 100) auth->handlers = 100;
return 0;
}
Modified: icecast/branches/kh/icecast/src/auth_htpasswd.c
===================================================================
--- icecast/branches/kh/icecast/src/auth_htpasswd.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/auth_htpasswd.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -112,7 +112,8 @@
return;
if (stat (htpasswd->filename, &file_stat) != 0)
{
- WARN1 ("failed to check status of %s", htpasswd->filename);
+ const char *msg = strerror (errno);
+ WARN2 ("failed to check status of %s (%s)", htpasswd->filename, msg ? msg : "unknown");
/* Create a dummy users tree for things to use later */
thread_rwlock_wlock (&htpasswd->file_rwlock);
Modified: icecast/branches/kh/icecast/src/auth_url.c
===================================================================
--- icecast/branches/kh/icecast/src/auth_url.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/auth_url.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -100,6 +100,8 @@
} auth_thread_data;
typedef struct {
+ time_t reject_until;
+ int presume_innocent;
char *addurl;
char *removeurl;
char *stream_start;
@@ -176,7 +178,7 @@
if (retcode == 403)
{
char *p = strchr (ptr, ' ') + 1;
- snprintf (atd->errormsg, sizeof(atd->errormsg), p);
+ snprintf (atd->errormsg, sizeof(atd->errormsg), "%s", p);
p = strchr (atd->errormsg, '\r');
if (p) *p='\0';
}
@@ -241,7 +243,7 @@
{
struct build_intro_contents *x = (void*)client->refbuf->data;
x->type = type;
- mpeg_setup (&x->sync, httpp_getvar (client->parser, HTTPP_VAR_URI));
+ mpeg_setup (&x->sync, client->connection.ip);
}
}
}
@@ -298,13 +300,19 @@
client_t *client = auth_user->client;
auth_url *url = auth_user->auth->state;
auth_thread_data *atd = auth_user->thread_data;
- time_t duration = time(NULL) - client->connection.con_time;
+ time_t now = time(NULL), duration = now - client->connection.con_time;
char *username, *password, *mount, *server, *ipaddr;
const char *qargs;
char *userpwd = NULL, post [4096];
if (url->removeurl == NULL)
return AUTH_OK;
+ if (url->reject_until)
+ {
+ if (url->reject_until >= now)
+ return AUTH_FAILED;
+ url->reject_until = 0;
+ }
server = util_url_escape (auth_user->hostname);
if (client->username)
@@ -364,7 +372,10 @@
DEBUG1 ("...handler %d sending request", auth_user->handler);
if (curl_easy_perform (atd->curl))
+ {
WARN2 ("auth to server %s failed with %s", url->removeurl, atd->errormsg);
+ url->reject_until = time (NULL) + 60; /* prevent further attempts for a while */
+ }
else
DEBUG1 ("...handler %d request complete", auth_user->handler);
@@ -391,6 +402,15 @@
if (url->addurl == NULL)
return AUTH_OK;
+ if (url->reject_until)
+ {
+ if (url->presume_innocent)
+ client->flags |= CLIENT_AUTHENTICATED;
+ if (url->reject_until >= time(NULL))
+ return AUTH_FAILED;
+ url->reject_until = 0;
+ }
+
config = config_get_config ();
server = util_url_escape (config->hostname);
port = config->port;
@@ -468,9 +488,11 @@
if (res)
{
+ url->reject_until = time (NULL) + 60; /* prevent further attempts for a while */
WARN2 ("auth to server %s failed with %s", url->addurl, atd->errormsg);
- client_send_403 (client, "Authentication not possible");
- auth_user->client = NULL;
+ INFO0 ("will not auth new listeners for 60 seconds");
+ if (url->presume_innocent)
+ client->flags |= CLIENT_AUTHENTICATED;
return AUTH_FAILED;
}
if (atd->location)
@@ -501,11 +523,11 @@
}
if (x->type)
mpeg_cleanup (&x->sync);
- auth_user->client = NULL;
if (atoi (atd->errormsg) == 403)
+ {
+ auth_user->client = NULL;
client_send_403 (client, atd->errormsg+4);
- else
- client_send_401 (client, auth_user->auth->realm);
+ }
if (atd->errormsg[0])
INFO2 ("client auth (%s) failed with \"%s\"", url->addurl, atd->errormsg);
return AUTH_FAILED;
@@ -756,6 +778,8 @@
free (url_info->timelimit_header);
url_info->timelimit_header = strdup (options->value);
}
+ if (strcmp(options->name, "presume_innocent") == 0)
+ url_info->presume_innocent = strcasecmp (options->value, "yes") ? 0 : 1;
options = options->next;
}
Modified: icecast/branches/kh/icecast/src/cfgfile.c
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/cfgfile.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -228,8 +228,9 @@
}
-static void config_clear_relay (relay_server *relay)
+relay_server *config_clear_relay (relay_server *relay)
{
+ relay_server *next = relay->next;
while (relay->masters)
{
relay_server_master *master = relay->masters;
@@ -239,8 +240,11 @@
if (master->mount) xmlFree (master->mount);
free (master);
}
- if (relay->localmount) xmlFree (relay->localmount);
+ if (relay->localmount) xmlFree (relay->localmount);
+ if (relay->username) xmlFree (relay->username);
+ if (relay->password) xmlFree (relay->password);
free (relay);
+ return next;
}
@@ -347,11 +351,7 @@
if (c->mimetypes_fn) xmlFree (c->mimetypes_fn);
while (c->relay)
- {
- relay_server *to_go = c->relay;
- c->relay = to_go->next;
- config_clear_relay (to_go);
- }
+ c->relay = config_clear_relay (c->relay);
while (c->mounts)
{
@@ -458,8 +458,11 @@
}
/* MUST be called with the lock held! */
-void config_set_config(ice_config_t *config) {
- memcpy(&_current_configuration, config, sizeof(ice_config_t));
+void config_set_config (ice_config_t *new_config, ice_config_t *old_config)
+{
+ if (old_config)
+ memcpy (old_config, &_current_configuration, sizeof(ice_config_t));
+ memcpy(&_current_configuration, new_config, sizeof(ice_config_t));
}
ice_config_t *config_get_config_unlocked(void)
Modified: icecast/branches/kh/icecast/src/cfgfile.h
===================================================================
--- icecast/branches/kh/icecast/src/cfgfile.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/cfgfile.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -294,8 +294,9 @@
int config_parse_file(const char *filename, ice_config_t *configuration);
int config_initial_parse_file(const char *filename);
int config_parse_cmdline(int arg, char **argv);
-void config_set_config(ice_config_t *config);
+void config_set_config (ice_config_t *new_config, ice_config_t *old_config);
listener_t *config_clear_listener (listener_t *listener);
+relay_server *config_clear_relay (relay_server *relay);
void config_clear(ice_config_t *config);
mount_proxy *config_find_mount (ice_config_t *config, const char *mount);
Modified: icecast/branches/kh/icecast/src/client.c
===================================================================
--- icecast/branches/kh/icecast/src/client.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/client.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -391,6 +391,16 @@
#endif
+static void worker_control_create (worker_t *worker)
+{
+ if (pipe_create (&worker->wakeup_fd[0]) < 0)
+ {
+ ERROR0 ("pipe failed, descriptor limit?");
+ abort();
+ }
+}
+
+
static void worker_add_pending_clients (worker_t *worker)
{
if (worker && worker->pending_clients)
@@ -413,7 +423,6 @@
static void worker_wait (worker_t *worker)
{
int ret, duration = 3;
- char ca[30];
if (global.running == ICE_RUNNING)
{
@@ -426,7 +435,22 @@
ret = util_timed_wait_for_fd (worker->wakeup_fd[0], duration);
if (ret > 0) /* may of been several wakeup attempts */
- pipe_read (worker->wakeup_fd[0], ca, sizeof ca);
+ {
+ char ca[30];
+ do
+ {
+ ret = pipe_read (worker->wakeup_fd[0], ca, sizeof ca);
+ if (ret > 0)
+ break;
+ if (ret < 0 && sock_recoverable (sock_error()))
+ break;
+ sock_close (worker->wakeup_fd[1]);
+ sock_close (worker->wakeup_fd[0]);
+ worker_control_create (worker);
+ worker_wakeup (worker);
+ WARN0 ("Had to recreate worker control feed");
+ } while (1);
+ }
worker_add_pending_clients (worker);
@@ -547,11 +571,8 @@
{
worker_t *handler = calloc (1, sizeof(worker_t));
- if (pipe_create (&handler->wakeup_fd[0]) < 0)
- {
- ERROR0 ("pipe failed, fd limit?");
- abort();
- }
+ worker_control_create (handler);
+
handler->pending_clients_tail = &handler->pending_clients;
thread_spin_create (&handler->lock);
thread_rwlock_wlock (&workers_lock);
Modified: icecast/branches/kh/icecast/src/client.h
===================================================================
--- icecast/branches/kh/icecast/src/client.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/client.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -67,11 +67,20 @@
/* position in first buffer */
unsigned int pos;
- /* http response code for this client */
- int respcode;
-
client_t *next_on_worker;
+ /* functions to process client */
+ struct _client_functions *ops;
+
+ /* function to check if refbuf needs updating */
+ int (*check_buffer)(struct _client_tag *client);
+
+ /* generic handle */
+ void *shared_data;
+
+ /* current mountpoint */
+ const char *mount;
+
/* the clients connection */
connection_t connection;
@@ -96,9 +105,6 @@
/* Client password, if authenticated */
char *password;
- /* generic handle */
- void *shared_data;
-
/* Format-handler-specific data for this client */
void *format_data;
@@ -111,11 +117,8 @@
/* function to call to release format specific resources */
void (*free_client_data)(struct _client_tag *client);
- /* function to check if refbuf needs updating */
- int (*check_buffer)(struct _client_tag *client);
-
- /* functions to process client */
- struct _client_functions *ops;
+ /* http response code for this client */
+ int respcode;
};
client_t *client_create (sock_t sock);
Modified: icecast/branches/kh/icecast/src/connection.c
===================================================================
--- icecast/branches/kh/icecast/src/connection.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/connection.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -44,6 +44,7 @@
#include "avl/avl.h"
#include "net/sock.h"
#include "httpp/httpp.h"
+#include "timing/timing.h"
#include "cfgfile.h"
#include "global.h"
@@ -510,12 +511,10 @@
struct sockaddr_storage sa;
socklen_t slen = sizeof (sa);
- con->con_time = time(NULL);
con->sock = sock;
if (sock == SOCK_ERROR)
return -1;
con->id = _next_connection_id();
- con->discon_time = con->con_time + header_timeout;
if (addr)
{
con->ip = strdup (addr);
@@ -566,6 +565,8 @@
{
close (sigfd);
}
+#else
+#define connection_close_sigfd() do {}while(0);
#endif
static sock_t wait_for_serversock (void)
@@ -583,7 +584,7 @@
ufds[i].fd = sigfd;
ufds[i].events = POLLIN;
ufds[i].revents = 0;
- ret = poll(ufds, i+1, -1);
+ ret = poll(ufds, i+1, 4000);
#else
ret = poll(ufds, global.server_sockets, 333);
#endif
@@ -727,7 +728,6 @@
}
global_unlock ();
sock_close (sock);
- thread_sleep (1000);
return NULL;
}
@@ -977,12 +977,16 @@
if (client)
{
/* do a small delay here so the client has chance to send the request after
- * getting a connect. This also prevents excessively large number of new
- * listeners from joining at the same time */
- thread_sleep (3000);
+ * getting a connect. */
+ client->schedule_ms = timing_get_time();
+ client->connection.con_time = client->schedule_ms/1000;
+ client->connection.discon_time = client->connection.con_time + header_timeout;
+ client->schedule_ms += 5;
client_add_worker (client);
stats_event_inc (NULL, "connections");
}
+ if (global.new_connections_slowdown)
+ thread_sleep (global.new_connections_slowdown * 5000);
}
#ifdef HAVE_OPENSSL
SSL_CTX_free (ssl_ctx);
@@ -1023,6 +1027,7 @@
if (conn_tid)
{
connection_running = 0;
+ connection_close_sigfd ();
INFO0("shutting down connection thread");
thread_join (conn_tid);
conn_tid = NULL;
@@ -1456,16 +1461,13 @@
void connection_close(connection_t *con)
{
- if (con->con_time)
- {
- if (con->sock != SOCK_ERROR)
- sock_close(con->sock);
- free(con->ip);
+ if (con->sock != SOCK_ERROR)
+ sock_close (con->sock);
+ free (con->ip);
#ifdef HAVE_OPENSSL
- if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
+ if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
#endif
- memset (con, 0, sizeof (connection_t));
- con->sock = SOCK_ERROR;
- }
+ memset (con, 0, sizeof (connection_t));
+ con->sock = SOCK_ERROR;
}
Modified: icecast/branches/kh/icecast/src/event.c
===================================================================
--- icecast/branches/kh/icecast/src/event.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/event.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -32,7 +32,7 @@
{
int ret;
ice_config_t *config;
- ice_config_t new_config;
+ ice_config_t new_config, old_config;
/* reread config file */
INFO0("Re-reading XML");
@@ -61,8 +61,7 @@
}
else {
restart_logging (&new_config);
- config_clear(config);
- config_set_config(&new_config);
+ config_set_config (&new_config, &old_config);
config = config_get_config_unlocked();
yp_recheck_config (config);
fserve_recheck_mime_types (config);
@@ -71,6 +70,7 @@
config_release_config();
connection_thread_shutdown();
slave_restart();
+ config_clear (&old_config);
}
}
Modified: icecast/branches/kh/icecast/src/flv.c
===================================================================
--- icecast/branches/kh/icecast/src/flv.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/flv.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -208,7 +208,7 @@
refbuf_t *ref = client->refbuf, *scmeta = ref->associated;
mp3_client_data *client_mp3 = client->format_data;
struct flv *flv = client_mp3->specific;
- int ret;
+ int ret, meta_to_free = 0;
if (client->pos >= ref->len)
{
@@ -229,21 +229,50 @@
flvmeta = scmeta->associated;
if (flvmeta == NULL)
{
- char *value = stats_get_value (flv->mpeg_sync.mount, "server_name");
+ char *value = stats_get_value (client->mount, "server_name");
flvmeta = flv_meta_allocate (200);
+ meta_to_free = 1;
if (value)
- flv_meta_append (flvmeta, "name", value);
+ flv_meta_append_string (flvmeta, "name", value);
free (value);
value = stats_get_value (flv->mpeg_sync.mount, "title");
if (value)
- flv_meta_append (flvmeta, "title", value);
+ flv_meta_append_string (flvmeta, "title", value);
else
- flv_meta_append (flvmeta, "title", "");
+ flv_meta_append_string (flvmeta, "title", "");
free (value);
- flv_meta_append (flvmeta, "title", value);
- flv_meta_append (flvmeta, NULL, NULL);
- ref->associated = flvmeta;
+ flv_meta_append_string (flvmeta, "title", value);
+ value = stats_get_value (client->mount, "audio_codecid");
+ if (value)
+ {
+ int id = atoi (value);
+ if (id == 2 || id == 10)
+ flv_meta_append_number (flvmeta, "audiocodecid", (double)id);
+ free (value);
+ }
+ value = stats_get_value (client->mount, "ice-bitrate");
+ if (value)
+ {
+ double rate = (double)atoi (value);
+ flv_meta_append_number (flvmeta, "audiodatarate", rate);
+ free (value);
+ }
+ value = stats_get_value (client->mount, "ice-samplerate");
+ if (value)
+ {
+ double rate = (double)atoi (value);
+ flv_meta_append_number (flvmeta, "audiosamplerate", rate);
+ free (value);
+ }
+ value = stats_get_value (client->mount, "ice-channels");
+ if (value)
+ {
+ int chann = atoi (value);
+ flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0);
+ free (value);
+ }
+ flv_meta_append_string (flvmeta, NULL, NULL);
}
flvm = (struct flvmeta *)flvmeta->data;
src = (char *)flvm + sizeof (*flvm);
@@ -260,8 +289,10 @@
flv->prev_tagsize = len + 11;
flv->tag[4] = prev_type;
flv->seen_metadata = ref->associated;
+ if (meta_to_free) refbuf_release (flvmeta);
return send_flv_buffer (client, flv);
}
+ if (meta_to_free) refbuf_release (flvmeta);
flv->seen_metadata = ref->associated;
}
@@ -283,6 +314,25 @@
}
+void flv_write_BE64 (void *where, void *val)
+{
+ int len = sizeof (uint64_t);
+ int diff = 1;
+ unsigned char *src = val, *dst = where;
+
+ if (*((unsigned char *)&len)) // little endian?
+ {
+ diff = -1;
+ src += sizeof (uint64_t) - 1;
+ }
+ for (len = sizeof (uint64_t); len; len--, dst++)
+ {
+ *dst = *src;
+ src += diff;
+ }
+}
+
+
void flv_write_UI16 (unsigned char *p, unsigned val)
{
p[1] = val & 0xFF;
@@ -303,41 +353,80 @@
memcpy (ptr+13, "\010\000\000\000\000", 5);
flvm->meta_pos = 18;
flvm->arraylen = 0;
+ flvm->meta_pos += sizeof (struct flvmeta);
+ return buffer;
+}
-#if 0
- flvm->arraylen = 1;
- memcpy (ptr+18, "\000\010duration\000", 11);
- flvm->meta_pos += 2+8+1;
+
+static int flv_meta_increase (refbuf_t *buffer, int taglen, int valuelen)
+{
+ struct flvmeta *flvm = (struct flvmeta *)buffer->data;
+ unsigned char *array_size_loc = (unsigned char *)buffer->data + sizeof (*flvm) + 16;
+
+ if (taglen + valuelen + 3 + flvm->meta_pos > buffer->len - 3)
+ taglen = 0; // force end of the metadata
+ if (taglen == 0)
{
- // 1 billion as a double, endian matters. hex 00 41 cd cd 65 00 00 00 00
- memcpy (ptr+flvm->meta_pos, "\101\315\315\145\000\000\000\000", 8);
- flvm->meta_pos += 8;
+ DEBUG1 ("%d array elements", flvm->arraylen);
+ memcpy (buffer->data+flvm->meta_pos, "\000\000\011", 3);
+ flvm->meta_pos += 3;
+ return -1;
}
-#endif
- flvm->meta_pos += sizeof (struct flvmeta);
- return buffer;
+ flvm->arraylen++;
+ flv_write_UI16 (array_size_loc, flvm->arraylen); // over 64k tags not handled
+ flvm->meta_pos += (2 + taglen + 1 + valuelen);
+ return 0;
}
-void flv_meta_append (refbuf_t *buffer, const char *tag, const char *value)
+void flv_meta_append_bool (refbuf_t *buffer, const char *tag, int value)
{
+ int taglen = tag ? strlen (tag) : 0;
struct flvmeta *flvm = (struct flvmeta *)buffer->data;
+ unsigned char *ptr = (unsigned char *)buffer->data + flvm->meta_pos;
+
+ if (flv_meta_increase (buffer, taglen, 1) < 0)
+ return;
+
+ flv_write_UI16 (ptr, taglen);
+ memcpy (ptr+2, tag, taglen);
+ ptr += (taglen + 2);
+ *ptr = 0x01; // a boolean as UI8
+ ptr[1] = value & 0xFF;
+}
+
+
+void flv_meta_append_number (refbuf_t *buffer, const char *tag, double value)
+{
+ int taglen = 0;
+ struct flvmeta *flvm = (struct flvmeta *)buffer->data;
+ unsigned char *ptr = (unsigned char *)buffer->data + flvm->meta_pos;
+
+ if (tag) taglen = strlen (tag);
+ if (flv_meta_increase (buffer, taglen, 8) < 0)
+ return;
+
+ flv_write_UI16 (ptr, taglen);
+ memcpy (ptr+2, tag, taglen);
+ ptr += (taglen + 2);
+ *ptr = 0x00; // a number
+ ptr++;
+ flv_write_BE64 (ptr, &value);
+ // DEBUG2 ("Appending %s number %g", tag, value);
+}
+
+
+void flv_meta_append_string (refbuf_t *buffer, const char *tag, const char *value)
+{
int taglen = 0, valuelen = 0;
- unsigned char *ptr, *array_sizing;
+ struct flvmeta *flvm = (struct flvmeta *)buffer->data;
+ unsigned char *ptr = (unsigned char *)buffer->data + flvm->meta_pos;
if (tag) taglen = strlen (tag);
if (value) valuelen = strlen (value);
- if (taglen + valuelen + 5 + flvm->meta_pos > buffer->len - 3)
- tag = NULL; // force end of the metadata
- if (tag == NULL)
- {
- memcpy (buffer->data+flvm->meta_pos, "\000\000\011", 3);
- flvm->meta_pos += 3;
+ if (flv_meta_increase (buffer, taglen, valuelen+2) < 0)
return;
- }
- array_sizing = (unsigned char *)buffer->data + sizeof (*flvm) +16; // 64k elements enough?
- ptr = (unsigned char *)buffer->data + flvm->meta_pos;
flv_write_UI16 (ptr, taglen);
memcpy (ptr+2, tag, taglen);
@@ -346,10 +435,7 @@
ptr++;
flv_write_UI16 (ptr, valuelen);
memcpy (ptr+2, value, valuelen);
- ptr += (valuelen + 2);
- flvm->arraylen++;
- flv_write_UI16 (array_sizing, flvm->arraylen); // over 64k tags not handled
- flvm->meta_pos = (char*)ptr - buffer->data;
+ // DEBUG2 ("Appending %s string %s", tag, value);
}
Modified: icecast/branches/kh/icecast/src/flv.h
===================================================================
--- icecast/branches/kh/icecast/src/flv.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/flv.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -34,4 +34,6 @@
void free_flv_client_data (struct flv *flv);
refbuf_t *flv_meta_allocate (size_t len);
-void flv_meta_append (refbuf_t *buffer, const char *tag, const char *value);
+void flv_meta_append_string (refbuf_t *buffer, const char *tag, const char *value);
+void flv_meta_append_number (refbuf_t *buffer, const char *tag, double value);
+void flv_meta_append_bool (refbuf_t *buffer, const char *tag, int value);
Modified: icecast/branches/kh/icecast/src/format.c
===================================================================
--- icecast/branches/kh/icecast/src/format.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/format.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -76,6 +76,8 @@
rate_free (format->in_bitrate);
rate_free (format->out_bitrate);
free (format->charset);
+ if (format->parser && format->parser != client->parser) // a relay client may have a new parser
+ httpp_destroy (format->parser);
memset (format, 0, sizeof (format_plugin_t));
}
@@ -195,6 +197,7 @@
const char *useragent = httpp_getvar (client->parser, "user-agent");
const char *protocol = "HTTP/1.0";
const char *contenttypehdr = "Content-Type";
+ const char *contenttype = plugin->contenttype;
if (useragent)
{
@@ -202,9 +205,11 @@
protocol = "ICY";
if (strstr (useragent, "Shoutcast Server")) /* hack for sc_serv */
contenttypehdr = "content-type";
+ if (strstr (useragent, "BlackBerry"))
+ contenttype="audio/aac";
}
bytes = snprintf (ptr, remaining, "%s 200 OK\r\n"
- "%s: %s\r\n", protocol, contenttypehdr, plugin->contenttype);
+ "%s: %s\r\n", protocol, contenttypehdr, contenttype);
remaining -= bytes;
ptr += bytes;
client->respcode = 200;
Modified: icecast/branches/kh/icecast/src/format_mp3.c
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/format_mp3.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -74,7 +74,7 @@
{
const char *metadata;
mp3_state *state = calloc(1, sizeof(mp3_state));
- refbuf_t *meta, *flvmeta;
+ refbuf_t *meta;
const char *s;
plugin->get_buffer = mp3_get_no_meta;
@@ -97,11 +97,7 @@
/* initial metadata needs to be blank for sending to clients and for
comparing with new metadata */
- flvmeta = flv_meta_allocate (200);
- flv_meta_append (flvmeta, "Title", "");
- flv_meta_append (flvmeta, NULL, NULL);
meta = refbuf_new (17);
- meta->associated = flvmeta;
memcpy (meta->data, "\001StreamTitle='';", 17);
state->metadata = meta;
state->interval = -1;
@@ -117,10 +113,10 @@
state->interval = state->inline_metadata_interval;
}
}
- if (client)
+ if (client && (plugin->type == FORMAT_TYPE_AAC || plugin->type == FORMAT_TYPE_MPEG))
{
client->format_data = malloc (sizeof (mpeg_sync));
- mpeg_setup (client->format_data, plugin->mount);
+ mpeg_setup (client->format_data, client->connection.ip);
plugin->write_buf_to_client = write_mpeg_buf_to_client;
}
mpeg_setup (&state->file_sync, plugin->mount);
@@ -158,8 +154,6 @@
}
free (source_mp3->url_title);
source_mp3->url_title = value;
- stats_event (plugin->mount, "title", value);
- stats_event_time (plugin->mount, "metadata_updated");
}
else if (strcmp (tag, "artist") == 0)
{
@@ -169,38 +163,63 @@
}
else if (strcmp (tag, "url") == 0)
{
- free (source_mp3->url);
- source_mp3->url = value;
+ free (source_mp3->inline_url);
+ source_mp3->inline_url = value;
}
else
free (value);
}
-static char *filter_shoutcast_metadata (source_t *source, char *metadata, size_t meta_len)
+static int parse_shoutcast_metadata (mp3_state *source_mp3)
{
- char *p = NULL;
- char *end;
- int len;
+ int meta_len = source_mp3->build_metadata_len;
+ char *metadata = source_mp3->build_metadata;
+ if (meta_len <= 1 || memcmp (metadata, source_mp3->metadata->data, meta_len) == 0)
+ return 0;
+
+ if (metadata == NULL || meta_len < 16 || meta_len > 4081)
+ return -1;
+ if (*(unsigned char*)metadata * 16 + 1 != meta_len)
+ return -1;
+ metadata++;
+ meta_len--;
do
{
- if (metadata == NULL || meta_len < 16 || meta_len > 4081)
+ char *s, *end = NULL;
+ int len, term_len = 2;
+ if (strncmp (metadata, "StreamTitle='", 13) == 0)
+ {
+ if ((end = strstr (metadata+13, "';")) == NULL)
+ break;
+ len = end - metadata - 12;
+ s = malloc (len);
+ snprintf (s, len, "%s", metadata+13);
+ free (source_mp3->url_title);
+ source_mp3->url_title = s;
+ DEBUG1 ("found title %s", s);
+ }
+ else if (strncmp (metadata, "StreamUrl='", 11) == 0)
+ {
+ if ((end = strstr (metadata+11, "';")) == NULL)
+ break;
+ len = end - metadata - 10;
+ s = malloc (len);
+ snprintf (s, len, "%s", metadata+11);
+ free (source_mp3->inline_url);
+ source_mp3->inline_url = s;
+ DEBUG1 ("found url %s", s);
+ }
+ else if ((end = strchr (metadata, ';')) == NULL)
break;
- if (*(unsigned char*)metadata * 16 + 1 != meta_len)
- break;
- metadata [meta_len-1] = '\0';
- metadata++;
- if (strncmp (metadata, "StreamTitle='", 13))
- break;
- if ((end = strstr (metadata+13, "\';")) == NULL)
- break;
- len = (end - metadata) - 13;
- p = calloc (1, len+1);
- if (p)
- memcpy (p, metadata+13, len);
- } while (0);
- return p;
+ else
+ term_len=1;
+ meta_len -= (end - metadata + term_len);
+ metadata = end + term_len;
+ source_mp3->update_metadata = 1;
+ } while (meta_len > 0);
+ return 0;
}
@@ -262,11 +281,7 @@
if (source_mp3->url_artist && source_mp3->url_title)
len += 3;
if (source_mp3->inline_url)
- {
- char *end = strstr (source_mp3->inline_url, "';");
- if (end)
- len += end - source_mp3->inline_url + strlen (streamurl) + 2;
- }
+ len += strlen (source_mp3->inline_url) + strlen (streamurl) + 2;
else if (source_mp3->url)
len += strlen (source_mp3->url) + strlen (streamurl) + 2;
#define MAX_META_LEN 255*16
@@ -284,57 +299,89 @@
p = refbuf_new (size);
if (p)
{
- refbuf_t *flvmeta = flv_meta_allocate (1000);
+ refbuf_t *flvmeta = flv_meta_allocate (4000);
+ mpeg_sync *mpeg_sync = source->client->format_data;
mp3_state *source_mp3 = source->format->_state;
int r;
- char *title;
memset (p->data, '\0', size);
p->associated = flvmeta;
+ if (mpeg_sync)
+ {
+ char *str = stats_get_value (source->mount, "server_name");
+ if (str)
+ {
+ flv_meta_append_string (flvmeta, "name", str);
+ free (str);
+ }
+ str = stats_get_value (source->mount, "server_description");
+ if (str)
+ {
+ flv_meta_append_string (flvmeta, "description", str);
+ free (str);
+ }
+ str = stats_get_value (source->mount, "ice-channels");
+ if (str)
+ {
+ int chann = atoi (str);
+ flv_meta_append_bool (flvmeta, "stereo", chann == 2 ? 1 : 0);
+ free (str);
+ }
+ else
+ flv_meta_append_bool (flvmeta, "stereo", (mpeg_sync->channels == 2));
+ str = stats_get_value (source->mount, "ice-samplerate");
+ if (str)
+ {
+ double rate = (double)atoi (str);
+ flv_meta_append_number (flvmeta, "audiosamplerate", rate);
+ free (str);
+ }
+ else
+ flv_meta_append_number (flvmeta, "audiosamplerate", (double)mpeg_sync->samplerate);
+ str = stats_get_value (source->mount, "ice-bitrate");
+ if (str)
+ {
+ double rate = (double)atoi (str);
+ flv_meta_append_number (flvmeta, "audiodatarate", rate);
+ free (str);
+ }
+ flv_meta_append_number (flvmeta, "audiocodecid", (double)(mpeg_sync->layer ? 2 : 10));
+ }
if (source_mp3->url_artist && source_mp3->url_title)
{
- r = snprintf (p->data, size, "%c%s%s - %s';", len_byte, streamtitle,
+ r = snprintf (p->data, size, "%c%s%s - %s", len_byte, streamtitle,
source_mp3->url_artist, source_mp3->url_title);
- flv_meta_append (flvmeta, "artist", source_mp3->url_artist);
+ flv_meta_append_string (flvmeta, "artist", source_mp3->url_artist);
}
else
- r = snprintf (p->data, size, "%c%s%s';", len_byte, streamtitle,
- source_mp3->url_title);
- flv_meta_append (flvmeta, "title", source_mp3->url_title);
+ r = snprintf (p->data, size, "%c%s%s", len_byte, streamtitle, source_mp3->url_title);
+ logging_playlist (source->mount, p->data+14, source->listeners);
+ stats_event (source->mount, "title", p->data+14);
+ strcat (p->data+14, "';");
+ flv_meta_append_string (flvmeta, "title", source_mp3->url_title);
if (r > 0)
{
- if (source_mp3->inline_url)
+ r += 2;
+ if (source_mp3->inline_url && size-r > strlen (source_mp3->inline_url)+14)
{
- char *end = strstr (source_mp3->inline_url, "';");
- if (end)
- {
- int urllen = end - source_mp3->inline_url;
- int len = urllen + strlen("StreamUrl='") + 3;
- if (size-r > len);
- {
- char *tmp = alloca (urllen+1);
- snprintf (p->data+r, len, "StreamUrl='%.*s';", urllen, source_mp3->inline_url);
- snprintf (tmp, urllen+1, "%.*s", urllen, source_mp3->inline_url);
- flv_meta_append (flvmeta, "metadata_url", tmp);
- stats_event (source->mount, "metadata_url", tmp);
- }
- }
+ snprintf (p->data+r, size-r, "StreamUrl='%s';", source_mp3->inline_url);
+ flv_meta_append_string (flvmeta, "URL", source_mp3->inline_url);
+ stats_event (source->mount, "metadata_url", source_mp3->inline_url);
}
else if (source_mp3->url)
{
snprintf (p->data+r, size-r, "StreamUrl='%s';", source_mp3->url);
- flv_meta_append (flvmeta, "URL", source_mp3->url);
+ flv_meta_append_string (flvmeta, "URL", source_mp3->url);
+ stats_event (source->mount, "metadata_url", NULL);
}
}
- DEBUG1 ("shoutcast metadata block setup with %.80s", p->data+1);
- title = filter_shoutcast_metadata (source, p->data, size);
- logging_playlist (source->mount, title, source->listeners);
+ DEBUG1 ("shoutcast metadata block setup with %.80s...", p->data+1);
yp_touch (source->mount);
- free (title);
- flv_meta_append (flvmeta, NULL, NULL);
+ flv_meta_append_string (flvmeta, NULL, NULL);
refbuf_release (source_mp3->metadata);
source_mp3->metadata = p;
+ stats_event_time (source->mount, "metadata_updated");
}
}
@@ -398,12 +445,11 @@
* to merge them into 1 write call */
if (remaining)
{
- char *merge = malloc (remaining + meta_len);
+ char *merge = alloca (remaining + meta_len);
memcpy (merge, refbuf->data + client->pos, remaining);
memcpy (merge+remaining, metadata, meta_len);
ret = client_send_bytes (client, merge, remaining+meta_len);
- free (merge);
if (ret > (int)remaining)
{
@@ -425,6 +471,8 @@
client->queue_pos += remaining;
return ret;
}
+ /* although we are not actually in metadata yet, we know we can do another merge next time */
+ client->flags |= CLIENT_IN_METADATA;
client->schedule_ms += 200;
if (ret > 0)
{
@@ -532,6 +580,7 @@
}
free (format_mp3->url_artist);
free (format_mp3->url_title);
+ free (format_mp3->inline_url);
free (format_mp3->url);
refbuf_release (format_mp3->metadata);
refbuf_release (format_mp3->read_data);
@@ -586,6 +635,7 @@
return unprocessed;
}
+
static int validate_mpeg (source_t *source, refbuf_t *refbuf)
{
client_t *client = source->client;
@@ -593,11 +643,13 @@
mpeg_sync *mpeg_sync = client->format_data;
int unprocessed = mpeg_complete_frames (mpeg_sync, refbuf, 0);
- if (unprocessed < 0)
+
+ if (unprocessed < 0 || unprocessed > 8000) /* too much unprocessed really, may not be parsing */
{
- WARN1 ("no frames processed for %s", source->mount);
- mpeg_cleanup (client->format_data);
- client->format_data = NULL;
+ if (unprocessed > 0 && refbuf->len)
+ return 0;
+ WARN1 ("no frames detected for %s", source->mount);
+ source->flags &= ~SOURCE_RUNNING;
return -1;
}
if (unprocessed > 0)
@@ -624,16 +676,15 @@
if (unprocessed < source_mp3->queue_block_size)
len = source_mp3->queue_block_size;
else
- {
- if (unprocessed > 8000)
- return -1;
len = unprocessed + 2000;
- }
+
leftover = refbuf_new (len);
memcpy (leftover->data, refbuf->data + refbuf->len, unprocessed);
source_mp3->read_data = leftover;
source_mp3->read_count = unprocessed;
}
+ if (source->format->read_bytes < 2500)
+ stats_event_args (source->mount, "audio_codecid", "%d", (mpeg_sync->layer ? 2 : 10));
return refbuf->len ? 0 : -1;
}
@@ -749,41 +800,12 @@
bytes -= metadata_remaining;
memmove (src, src+metadata_remaining, bytes);
- /* assign metadata if it's greater than 1 byte, and the text has changed */
- if (source_mp3->build_metadata_len > 1 &&
- strcmp (source_mp3->build_metadata+1, source_mp3->metadata->data+1) != 0)
+ if (source_mp3->build_metadata_len > 1 && parse_shoutcast_metadata (source_mp3) < 0)
{
- char *title = filter_shoutcast_metadata (source, source_mp3->build_metadata, source_mp3->build_metadata_len);
-
- if (title)
- {
- refbuf_t *flvmeta, *meta = refbuf_new (source_mp3->build_metadata_len);
-
- memcpy (meta->data, source_mp3->build_metadata, source_mp3->build_metadata_len);
- DEBUG1("shoutcast metadata %.80s", meta->data+1);
- flvmeta = flv_meta_allocate (strlen(title)+20);
- logging_playlist (source->mount, title, source->listeners);
- stats_event_conv (source->mount, "title", title, source->format->charset);
- flv_meta_append (flvmeta, "title", title);
- stats_event_time (source->mount, "metadata_updated");
- yp_touch (source->mount);
- free (title);
- source_mp3->inline_url = strstr (meta->data+1, "StreamUrl='");
- if (source_mp3->inline_url)
- source_mp3->inline_url += 11;
- flv_meta_append (flvmeta, NULL, NULL);
- meta->associated = flvmeta;
- refbuf_release (source_mp3->metadata);
- source_mp3->metadata = meta;
- }
- else
- {
- ERROR2 ("Incorrect metadata format \"%.80s\", ending %s",
- source_mp3->build_metadata+1, source->mount);
- source->flags &= ~SOURCE_RUNNING;
- refbuf_release (refbuf);
- return NULL;
- }
+ WARN1 ("Unable to parse metadata insert for %s", source->mount);
+ source->flags &= ~SOURCE_RUNNING;
+ refbuf_release (refbuf);
+ return NULL;
}
source_mp3->offset = 0;
source_mp3->build_metadata_len = 0;
@@ -833,7 +855,7 @@
if (plugin->type == FORMAT_TYPE_AAC || plugin->type == FORMAT_TYPE_MPEG)
{
client_mp3->specific = calloc (1, sizeof(mpeg_sync));
- mpeg_setup (client_mp3->specific, plugin->mount);
+ mpeg_setup (client_mp3->specific, client->connection.ip);
mpeg_check_numframes (client_mp3->specific, 1);
}
Modified: icecast/branches/kh/icecast/src/format_mp3.h
===================================================================
--- icecast/branches/kh/icecast/src/format_mp3.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/format_mp3.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -21,7 +21,7 @@
#include "format.h"
#include "mpeg.h"
-#define CLIENT_WANTS_FLV (CLIENT_FORMAT_BIT<<20)
+#define CLIENT_WANTS_FLV (CLIENT_FORMAT_BIT<<10)
typedef struct {
refbuf_t *associated;
Modified: icecast/branches/kh/icecast/src/format_ogg.c
===================================================================
--- icecast/branches/kh/icecast/src/format_ogg.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/format_ogg.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -617,7 +617,7 @@
{
if (write_ogg_data (source, header) == 0)
return;
- header = header->next;
+ header = header->associated;
}
ogg_info->file_headers = refbuf->associated;
}
Modified: icecast/branches/kh/icecast/src/fserve.c
===================================================================
--- icecast/branches/kh/icecast/src/fserve.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/fserve.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -439,7 +439,7 @@
fbinfo finfo;
fullpath = util_get_path_from_normalised_uri (path, 0);
- INFO2 ("checking for file %s (%s)", path, fullpath);
+ DEBUG2 ("checking for file %s (%s)", path, fullpath);
if (strcmp (util_get_extension (fullpath), "m3u") == 0)
m3u_requested = 1;
@@ -607,6 +607,7 @@
static void file_release (client_t *client)
{
fh_node *fh = client->shared_data;
+ int ret = -1;
if (fh)
{
@@ -618,17 +619,17 @@
stats_event (fh->finfo.mount, NULL, NULL);
fh_release (fh);
}
- if (client->respcode == 200)
+ if (client->flags & CLIENT_AUTHENTICATED)
{
const char *mount = httpp_getvar (client->parser, HTTPP_VAR_URI);
ice_config_t *config = config_get_config ();
mount_proxy *mountinfo = config_find_mount (config, mount);
if (mountinfo && mountinfo->access_log.name)
logging_access_id (&mountinfo->access_log, client);
- auth_release_listener (client, mount, mountinfo);
+ ret = auth_release_listener (client, mount, mountinfo);
config_release_config();
}
- else
+ if (ret < 0)
{
client->flags &= ~CLIENT_AUTHENTICATED;
client_destroy (client);
@@ -925,6 +926,7 @@
thread_mutex_unlock (&fh->lock);
if (client->respcode == 0)
fill_http_headers (client, finfo->mount, NULL);
+ client->mount = fh->finfo.mount;
}
else
client->check_buffer = format_generic_write_to_client;
Modified: icecast/branches/kh/icecast/src/global.h
===================================================================
--- icecast/branches/kh/icecast/src/global.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/global.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -15,7 +15,7 @@
#include "config.h"
-#define ICE_LISTEN_QUEUE 10
+#define ICE_LISTEN_QUEUE 64
#define ICE_RUNNING 1
#define ICE_HALTING 2
@@ -34,6 +34,7 @@
int running;
+ int new_connections_slowdown;
int sources;
int clients;
int schedule_config_reread;
Modified: icecast/branches/kh/icecast/src/mpeg.c
===================================================================
--- icecast/branches/kh/icecast/src/mpeg.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/mpeg.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -129,18 +129,21 @@
int64_t bitrate = get_mpeg_bitrate (mp, p);
int samples = get_samples_per_mpegframe (mp->ver, mp->layer);
+ int samplerate = get_mpegframe_samplerate (p);
+ if (samplerate == 0 || (mp->samplerate && mp->samplerate != samplerate))
+ return -1;
mp->sample_count = samples;
if (bitrate > 0 && samples > 0)
{
bitrate *= 1000;
if (mp->layer == LAYER_1)
{
- frame_len = (int)(12 * bitrate / get_mpegframe_samplerate(p) + padding) * 4; // ??
+ frame_len = (int)(12 * bitrate / samplerate + padding) * 4; // ??
}
else
{
- frame_len = (int)(samples / 8 * bitrate / get_mpegframe_samplerate(p) + padding);
+ frame_len = (int)(samples / 8 * bitrate / samplerate + padding);
}
}
return frame_len;
@@ -151,6 +154,8 @@
{
int frame_len = get_mpeg_frame_length (mp, p);
+ if (frame_len <= 0)
+ return -1;
if (remaining - frame_len < 0)
return 0;
if (mp->raw)
@@ -212,8 +217,8 @@
}
mp->syncbytes = 3;
memcpy (&mp->fixed_headerbits[0], p, 3);
- INFO4 ("detected AAC MPEG-%s, rate %d, channels %d on %s", id ? "2" : "4", mp->samplerate,
- mp->channels, mp->mount);
+ if (mp->check_numframes > 1)
+ INFO4 ("detected AAC MPEG-%s, rate %d, channels %d on %s", id ? "2" : "4", mp->samplerate, mp->channels, mp->mount);
mp->process_frame = handle_aac_frame;
return 1;
}
@@ -229,13 +234,13 @@
mp->ver = (p[1] & 0x18) >> 3;
if (mp->layer && version [mp->ver] && layer[mp->layer])
{
- int checking = mp->check_numframes;
+ int checking = mp->check_numframes, samplerate;
unsigned char *fh = p;
char stream_type[20];
// au.crc = (p[1] & 0x1) == 0;
- mp->samplerate = get_mpegframe_samplerate (p);
- if (mp->samplerate == 0)
+ samplerate = get_mpegframe_samplerate (p);
+ if (samplerate == 0)
return -1;
while (checking)
{
@@ -250,6 +255,8 @@
//DEBUG3 ("checking frame %d, but need more data (%d,%d)", 5-checking, frame_len, remaining);
return 0;
}
+ if (samplerate != get_mpegframe_samplerate (fh+frame_len))
+ return -1;
if (fh[frame_len] != 255 || fh[frame_len+1] != p[1])
{
//DEBUG4 ("checking frame %d, but code is %x %x %x", 5-checking, fh[frame_len], fh[frame_len+1], fh[frame_len+2]);
@@ -260,6 +267,7 @@
fh += frame_len;
checking--;
}
+ mp->samplerate = samplerate;
if (((p[3] & 0xC0) >> 6) == 3)
mp->channels = 1;
else
@@ -267,7 +275,8 @@
mp->syncbytes = 2;
memcpy (&mp->fixed_headerbits[0], p, 2);
snprintf (stream_type, sizeof (stream_type), "%s %s", version [mp->ver], layer[mp->layer]);
- INFO4 ("%s detected (%d, %d) on %s", stream_type, mp->samplerate, mp->channels, mp->mount);
+ if (mp->check_numframes > 1)
+ INFO4 ("%s detected (%d, %d) on %s", stream_type, mp->samplerate, mp->channels, mp->mount);
mp->process_frame = handle_mpeg_frame;
return 1;
}
@@ -363,8 +372,6 @@
if (mp == NULL)
return 0; /* leave as-is */
- if (mp->resync_count > 30)
- return -1;
mp->sample_count = 0;
if (mp->surplus)
{
@@ -413,7 +420,7 @@
}
if (ret == 0)
{
- if (remaining > 10000)
+ if (remaining > 20000)
return -1;
new_block->len = 0;
return remaining;
@@ -433,7 +440,10 @@
completed++;
}
if (remaining < 0 || remaining > new_block->len)
+ {
+ ERROR2 ("block inconsistency (%d, %d)", remaining, new_block->len);
abort();
+ }
new_block->len -= remaining;
return remaining;
}
Modified: icecast/branches/kh/icecast/src/slave.c
===================================================================
--- icecast/branches/kh/icecast/src/slave.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/slave.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -108,30 +108,6 @@
};
-relay_server *relay_free (relay_server *relay)
-{
- relay_server *next = relay->next;
-
- DEBUG2("freeing relay %s (%p)", relay->localmount, relay);
- if (relay->source)
- source_free_source (relay->source);
- while (relay->masters)
- {
- relay_server_master *master = relay->masters;
- relay->masters = master->next;
- xmlFree (master->ip);
- xmlFree (master->mount);
- xmlFree (master->bind);
- free (master);
- }
- xmlFree (relay->localmount);
- if (relay->username) xmlFree (relay->username);
- if (relay->password) xmlFree (relay->password);
- free (relay);
- return next;
-}
-
-
relay_server *relay_copy (relay_server *r)
{
relay_server *copy = calloc (1, sizeof (relay_server));
@@ -162,7 +138,6 @@
copy->mp3metadata = r->mp3metadata;
copy->on_demand = r->on_demand;
copy->interval = r->interval;
- copy->source = r->source;
copy->running = 1;
r->source = NULL;
DEBUG2 ("copy relay %s at %p", copy->localmount, copy);
@@ -275,15 +250,26 @@
{
char *location;
/* add enough for "http://" the port ':' and nul */
- int len = strlen (mountpoint) + strlen (checking->server) + 13;
+ int len = strlen (mountpoint) + strlen (checking->server) + 15;
+ const char *user = client->username;
+ const char *pass = client->password;
+ const char *args = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
+ const char *colon = ":", *at_sign = "@";
- INFO2 ("redirecting listener to slave server "
- "at %s:%d", checking->server, checking->port);
- location = malloc (len);
- snprintf (location, len, "http://%s:%d%s", checking->server,
- checking->port, mountpoint);
+ if (args)
+ len += strlen (args);
+ else
+ args = "";
+ if (user && pass)
+ len += strlen (user) + strlen (pass);
+ else
+ colon = at_sign = user = pass = "";
+ INFO2 ("redirecting listener to slave server at %s:%d", checking->server, checking->port);
+ location = alloca (len);
+ snprintf (location, len, "http://%s%s%s%s%s:%d%s%s",
+ user, colon, pass, at_sign,
+ checking->server, checking->port, mountpoint, args);
client_send_302 (client, location);
- free (location);
ret = 1;
}
trail = &checking->next;
@@ -430,8 +416,6 @@
}
else
{
- http_parser_t *old_parser = client->parser;
-
if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
{
ERROR2("Error from relay request: %s (%s)", relay->localmount,
@@ -439,9 +423,9 @@
break;
}
sock_set_blocking (streamsock, 0);
- client->parser = parser;
- if (old_parser)
- httpp_destroy (old_parser);
+ client->parser = parser; // old parser will be free in the format clear
+ client->connection.discon_time = 0;
+ client->connection.con_time = time (NULL);
client_set_queue (client, NULL);
free (server);
free (mount);
@@ -486,7 +470,6 @@
WARN1 ("Failed to complete initialisation on %s", relay->localmount);
continue;
}
- src->parser = client->parser;
return 1;
} while ((master = master->next) && global.running == ICE_RUNNING);
return -1;
@@ -558,38 +541,16 @@
}
-int relay_install (relay_server *relay)
+static int relay_install (relay_server *relay)
{
client_t *client;
- source_t *source = source_reserve (relay->localmount);
- if (source == NULL)
- {
- WARN1 ("new relay but source \"%s\" already exists", relay->localmount);
- return -1;
- }
- relay->source = source;
- stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
-
global_lock();
- client = source->client = client_create (SOCK_ERROR);
+ client = client_create (SOCK_ERROR);
global_unlock();
client->shared_data = relay;
client->ops = &relay_startup_ops;
- if (relay->on_demand)
- {
- ice_config_t *config;
- mount_proxy *mountinfo;
-
- thread_mutex_lock (&source->lock);
- config = config_get_config();
- mountinfo = config_find_mount (config, source->mount);
- source->flags |= SOURCE_ON_DEMAND;
- source_update_settings (config, source, mountinfo);
- thread_mutex_unlock (&source->lock);
- config_release_config();
- }
client->flags |= CLIENT_ACTIVE;
DEBUG1 ("adding relay client for %s", relay->localmount);
client_add_worker (client);
@@ -661,13 +622,7 @@
INFO1 ("relay details changed on \"%s\", restarting", new->localmount);
existing_relay->new_details = new;
if (source && source->client)
- {
- /* wakeup client to change relay details */
- client_t *client = source->client;
- worker_t *worker = client->worker;
- client->schedule_ms = 0;
- worker_wakeup (worker);
- }
+ source->client->schedule_ms = 0;
}
*existing_p = existing_relay->next; /* leave client to free structure */
new->next = new_list;
@@ -699,12 +654,15 @@
static void update_relays (relay_server **relay_list, relay_server *new_relay_list)
{
relay_server *active_relays, *cleanup_relays = new_relay_list;
+ worker_t *worker = NULL;
- thread_mutex_lock (&(config_locks()->relay_lock));
if (relay_list)
{
+ thread_mutex_lock (&(config_locks()->relay_lock));
active_relays = update_relay_set (relay_list, new_relay_list);
cleanup_relays = *relay_list;
+ *relay_list = active_relays;
+ thread_mutex_unlock (&(config_locks()->relay_lock));
}
while (cleanup_relays)
{
@@ -712,25 +670,19 @@
source_t *source = to_release->source;
cleanup_relays = to_release->next;
- if (source && source->client) // use client to free up if available
+ if (source && source->client)
{
- to_release->cleanup = 1;
- if (to_release->running)
- {
- /* relay has been removed from xml/streamlist, shut down active relay */
- INFO1 ("source shutdown request on \"%s\"", to_release->localmount);
- to_release->running = 0;
- source->flags &= ~SOURCE_RUNNING;
- }
+ INFO1 ("relay shutdown request on \"%s\"", to_release->localmount);
source->client->schedule_ms = 0;
- continue;
}
- relay_free (to_release);
+ to_release->cleanup = 1;
}
- /* re-assign new set */
- if (relay_list)
- *relay_list = active_relays;
- thread_mutex_unlock (&(config_locks()->relay_lock));
+ worker = workers;
+ while (worker)
+ {
+ worker_wakeup (worker);
+ worker = worker->next;
+ }
}
@@ -909,7 +861,8 @@
}
if (master->ok) /* merge retrieved relays */
update_relays (&global.master_relays, master->new_relays);
- update_relays (NULL, master->new_relays);
+ while (master->new_relays)
+ master->new_relays = config_clear_relay (master->new_relays);
curl_easy_cleanup (handle);
free (master->server);
@@ -976,6 +929,7 @@
thread_rwlock_unlock (&slaves_lock);
}
+
static void slave_startup (void)
{
ice_config_t *config;
@@ -1020,6 +974,10 @@
}
global_add_bitrates (global.out_bitrate, 0L, THREAD_TIME_MS(¤t));
+ if (global.new_connections_slowdown)
+ global.new_connections_slowdown--;
+ if (global.new_connections_slowdown > 30)
+ global.new_connections_slowdown = 30;
if (global.running != ICE_RUNNING)
break;
@@ -1187,7 +1145,8 @@
relay = client->shared_data;
relay->source = old_details->source;
old_details->source = NULL;
- relay_free (old_details);
+ config_clear_relay (old_details);
+
thread_mutex_unlock (&(config_locks()->relay_lock));
}
return relay;
@@ -1202,9 +1161,10 @@
thread_mutex_lock (&source->lock);
if (source_running (source))
{
- if (relay->cleanup || relay->running == 0)
+ if (relay->cleanup) relay->running = 0;
+ if (relay->running == 0)
source->flags &= ~SOURCE_RUNNING;
- if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 5000000)
+ if (relay->on_demand && source->listeners == 0 && source->format->read_bytes > 2000000)
source->flags &= ~SOURCE_RUNNING;
return source_read (source);
}
@@ -1215,6 +1175,14 @@
{
if (relay->running)
fallback = 0;
+ if (client->worker->current_time.tv_sec - client->connection.con_time < 3)
+ {
+ /* force a delayed restart if stream cannot be maintained for 3 seconds, by
+ * which time any listeners in restart wait would of come back on */
+ source->flags &= ~SOURCE_PAUSE_LISTENERS;
+ client->connection.con_time = 0;
+ fallback = 1;
+ }
global_lock();
global.sources--;
stats_event_args (NULL, "sources", "%d", global.sources);
@@ -1243,29 +1211,40 @@
if (source->listeners)
{
INFO1 ("listeners on terminating relay %s, rechecking", relay->localmount);
+ source->termination_count = source->listeners;
+ source->flags &= ~SOURCE_PAUSE_LISTENERS;
+ source->flags |= SOURCE_LISTENERS_SYNC;
thread_mutex_unlock (&source->lock);
return 0; /* listeners may be paused, recheck and let them leave this stream */
}
INFO1 ("shutting down relay %s", relay->localmount);
- source_clear_listeners (source);
thread_mutex_unlock (&source->lock);
stats_event (relay->localmount, NULL, NULL); // needed???
slave_update_all_mounts();
connection_close (&client->connection);
return -1;
}
- if (relay->running)
- {
- if (client->connection.con_time == 0)
+ do {
+ if (relay->running)
{
- client->schedule_ms = client->worker->time_ms + (relay->interval * 1000);
- INFO2 ("standing by to restart relay on %s in %d seconds", relay->localmount, relay->interval);
+ if (client->connection.con_time)
+ {
+ INFO1 ("standing by to restart relay on %s", relay->localmount);
+ if (relay->on_demand && source->listeners == 0)
+ source_clear_source (source);
+ break;
+ }
+ else
+ {
+ client->schedule_ms = client->worker->time_ms + (relay->interval * 1000);
+ INFO2 ("standing by to restart relay on %s in %d seconds", relay->localmount, relay->interval);
+ }
}
else
- INFO1 ("standing by to restart relay on %s", relay->localmount);
- }
- else
- INFO1 ("Relay %s is now disabled", relay->localmount);
+ INFO1 ("Relay %s is now disabled", relay->localmount);
+ source_clear_source (source);
+ slave_update_all_mounts();
+ } while (0);
thread_mutex_unlock (&source->lock);
connection_close (&client->connection);
@@ -1277,7 +1256,11 @@
static void relay_release (client_t *client)
{
relay_server *relay = client->shared_data;
- relay_free (relay);
+ DEBUG2("freeing relay %s (%p)", relay->localmount, relay);
+ if (relay->source)
+ source_free_source (relay->source);
+ relay->source = NULL;
+ config_clear_relay (relay);
client_destroy (client);
}
@@ -1289,20 +1272,48 @@
if (relay->cleanup)
{
+ source_t *source = relay->source;
+ if (source == NULL)
+ return -1;
/* listeners may be still on, do a recheck */
relay->running = 0;
+ DEBUG1 ("cleanup detected on %s", relay->localmount);
client->ops = &relay_client_ops;
client->schedule_ms += 25;
return 0;
}
- if (global.running != ICE_RUNNING)
- return 0; /* wait for cleanup */
- if (relay->running == 0)
+ if (global.running != ICE_RUNNING || relay->running == 0) /* wait for cleanup */
{
- client->schedule_ms = client->worker->time_ms + 1000;
+ client->schedule_ms = client->worker->time_ms + 50;
return 0;
}
+ if (relay->source == NULL) /* new relay, so set up a source if we can */
+ {
+ source_t *source = source_reserve (relay->localmount);
+ if (source == NULL)
+ {
+ INFO1 ("new relay but source \"%s\" exists, waiting", relay->localmount);
+ client->schedule_ms = client->worker->time_ms + 1000;
+ return 0;
+ }
+ if (relay->on_demand)
+ {
+ ice_config_t *config;
+ mount_proxy *mountinfo;
+ thread_mutex_lock (&source->lock);
+ config = config_get_config();
+ mountinfo = config_find_mount (config, source->mount);
+ source->flags |= SOURCE_ON_DEMAND;
+ source_update_settings (config, source, mountinfo);
+ thread_mutex_unlock (&source->lock);
+ config_release_config();
+ }
+ relay->source = source;
+ source->client = client;
+ stats_event_flags (source->mount, "listener_connections", "0", STATS_COUNTERS);
+ }
+
if (relay->on_demand)
{
source_t *src = relay->source;
@@ -1332,7 +1343,7 @@
}
if (start_relay == 0)
{
- client->schedule_ms += 500;
+ client->schedule_ms += 60000;
return 0;
}
INFO1 ("Detected listeners on relay %s", relay->localmount);
@@ -1343,7 +1354,9 @@
if (relays_connecting > 3)
{
thread_spin_unlock (&relay_start_lock);
- client->schedule_ms += 500;
+ client->schedule_ms += 200;
+ if (global.new_connections_slowdown < 5)
+ global.new_connections_slowdown++;
return 0;
}
relays_connecting++;
Modified: icecast/branches/kh/icecast/src/source.c
===================================================================
--- icecast/branches/kh/icecast/src/source.c 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/source.c 2011-01-25 03:06:25 UTC (rev 17788)
@@ -240,51 +240,6 @@
}
-static int source_listener_drop (source_t *source, client_t *client, mount_proxy *mountinfo)
-{
- client->shared_data = NULL;
- client_set_queue (client, NULL);
- if (mountinfo && mountinfo->access_log.name)
- logging_access_id (&mountinfo->access_log, client);
-
- return auth_release_listener (client, source->mount, mountinfo);
-}
-
-void source_clear_listeners (source_t *source)
-{
- int i;
- ice_config_t *config;
- mount_proxy *mountinfo;
-
- /* lets drop any listeners still connected */
- DEBUG2 ("source %s has %d clients to release", source->mount, source->listeners);
- i = 0;
- config = config_get_config ();
- mountinfo = config_find_mount (config, source->mount);
- while (1)
- {
- avl_node *node = source->clients->root->right;
- if (node)
- {
- client_t *client = node->key;
- if (avl_delete (source->clients, client, NULL) == 0)
- {
- source_listener_drop (source, client, mountinfo);
- i++;
- }
- continue;
- }
- break;
- }
- config_release_config ();
- DEBUG1 ("no listeners are attached to %s", source->mount);
- if (i)
- stats_event_sub (NULL, "listeners", i);
- source->listeners = 0;
- source->prev_listeners = 0;
-}
-
-
void source_clear_source (source_t *source)
{
int do_twice = 0;
@@ -443,11 +398,20 @@
source->flags &= ~SOURCE_RUNNING;
do
{
- if (source->fallback.mount && source->termination_count == 0)
+ if (source->flags & SOURCE_LISTENERS_SYNC)
{
- DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
- free (source->fallback.mount);
- source->fallback.mount = NULL;
+ if (source->termination_count)
+ {
+ client->schedule_ms += 20;
+ thread_mutex_unlock (&source->lock);
+ return 0;
+ }
+ if (source->fallback.mount)
+ {
+ DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
+ free (source->fallback.mount);
+ source->fallback.mount = NULL;
+ }
source->flags &= ~SOURCE_LISTENERS_SYNC;
}
if (source->listeners == 0)
@@ -467,6 +431,11 @@
{
update_source_stats (source);
source->client_stats_update = current + source->stats_interval;
+ }
+ if (current >= source->worker_balance_recheck)
+ {
+ int recheck = global.sources > 6 ? global.sources : 6;
+ source->worker_balance_recheck = current + recheck;
if (source_change_worker (source))
return 1;
}
@@ -500,7 +469,7 @@
DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
source->timeout, (long)current);
WARN1 ("Disconnecting %s due to socket timeout", source->mount);
- source->flags &= ~SOURCE_RUNNING;
+ source->flags &= ~(SOURCE_RUNNING|SOURCE_PAUSE_LISTENERS);
skip = 0;
break;
}
@@ -555,10 +524,11 @@
refbuf_release (to_release);
continue;
}
- DEBUG0 ("weird state of min_queue point");
- refbuf_release (source->min_queue_point);
- source->min_queue_point = refbuf;
- source->min_queue_offset = refbuf->len;
+ if (source->min_queue_point != refbuf)
+ {
+ ERROR0 ("weird state of min_queue point");
+ abort();
+ }
break;
}
@@ -855,7 +825,7 @@
{
source_t *source = client->shared_data;
- if (source->flags & SOURCE_LISTENERS_SYNC)
+ if ((source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)) == SOURCE_LISTENERS_SYNC)
{
client->schedule_ms = client->worker->time_ms + 150;
return 0;
@@ -889,6 +859,8 @@
{
source->termination_count--;
//DEBUG2 ("termination count on %s now %lu", source->mount, source->termination_count);
+ if (client->connection.error)
+ return -1;
if (source->fallback.mount)
{
int move_failed;
@@ -929,12 +901,12 @@
long total_written = 0;
int ret = 0;
- if (client->connection.error)
- return -1;
-
if (source->flags & SOURCE_LISTENERS_SYNC)
return listener_waiting_on_source (source, client);
+ if (client->connection.error)
+ return -1;
+
/* check for limited listener time */
if (client->connection.discon_time &&
client->worker->current_time.tv_sec >= client->connection.discon_time)
@@ -1053,9 +1025,9 @@
source->prev_listeners = -1;
source->bytes_sent_since_update = 0;
source->stats_interval = 5;
- source->termination_count = 0;
/* so the first set of average stats after 3 seconds */
source->client_stats_update = source->last_read + 3;
+ source->worker_balance_recheck = source->last_read + 20;
source->skip_duration = 80;
util_dict_free (source->audio_info);
@@ -1101,6 +1073,7 @@
/* on demand relays should of already called this */
if ((source->flags & SOURCE_ON_DEMAND) == 0)
slave_update_all_mounts();
+ source->flags &= ~SOURCE_ON_DEMAND;
}
@@ -1152,6 +1125,7 @@
if (source->client->connection.con_time)
update_source_stats (source);
+ source->flags &= ~SOURCE_ON_DEMAND;
source->termination_count = source->listeners;
mountinfo = config_find_mount (config_get_config(), source->mount);
if (mountinfo)
@@ -1185,13 +1159,12 @@
if (len)
{
char name[100], value[200];
- char *esc;
+ int n = sscanf (start, "%99[^=]=%199[^;\r\n]", name, value);
- sscanf (start, "%99[^=]=%199[^;\r\n]", name, value);
- esc = util_url_unescape (value);
- if (esc)
+ if (n == 2 && strncmp (name, "ice-", 4) == 0)
{
- if (source_running (source))
+ char *esc = util_url_unescape (value);
+ if (esc)
{
util_dict_set (source->audio_info, name, esc);
stats_event (source->mount, name, esc);
@@ -1696,7 +1669,6 @@
if (source->format)
client->connection.sent_bytes = source->format->read_bytes;
- source_clear_listeners (source);
thread_mutex_unlock (&source->lock);
source_free_source (source);
@@ -1707,30 +1679,32 @@
static int source_listener_release (source_t *source, client_t *client)
{
- ice_config_t *config;
- mount_proxy *mountinfo;
- int ret;
-
/* search through sources client list to find previous link in list */
source_listener_detach (source, client);
+ client->shared_data = NULL;
if (source->listeners == 0)
rate_reduce (source->format->out_bitrate, 1000);
- /* if listener disconnects at the same time as the source does then we need
- * to account for it as the source thinks it is still connected */
- if (source->termination_count)
- source->termination_count--;
-
stats_event_dec (NULL, "listeners");
stats_event_args (source->mount, "listeners", "%lu", source->listeners);
/* change of listener numbers, so reduce scope of global sampling */
global_reduce_bitrate_sampling (global.out_bitrate);
- config = config_get_config ();
- mountinfo = config_find_mount (config, source->mount);
- ret = source_listener_drop (source, client, mountinfo);
- config_release_config();
- return ret;
+ if (client->respcode)
+ {
+ int ret;
+ ice_config_t *config = config_get_config ();
+ mount_proxy *mountinfo = config_find_mount (config, source->mount);
+
+ if (mountinfo && mountinfo->access_log.name)
+ logging_access_id (&mountinfo->access_log, client);
+
+ ret = auth_release_listener (client, source->mount, mountinfo);
+ config_release_config();
+ return ret;
+ }
+ client_send_404 (client, NULL); // failed on-demand relay?
+ return 0;
}
@@ -1898,17 +1872,27 @@
*/
void source_setup_listener (source_t *source, client_t *client)
{
- if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
+ if (source->flags & SOURCE_LISTENERS_SYNC)
+ client->ops = &listener_wait_ops;
+ else if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
client->ops = &listener_pause_ops;
else
client->ops = &listener_client_ops;
client->shared_data = source;
client->queue_pos = 0;
+ client->mount = source->mount;
client->check_buffer = http_source_listener;
// add client to the source
avl_insert (source->clients, client);
source->listeners++;
+ if ((source->flags & (SOURCE_ON_DEMAND|SOURCE_RUNNING)) == SOURCE_ON_DEMAND)
+ {
+ source->client->schedule_ms = 0;
+ client->schedule_ms += 300;
+ worker_wakeup (source->client->worker);
+ DEBUG0 ("woke up relay");
+ }
}
@@ -1967,7 +1951,6 @@
source_free_source (source);
return 0;
}
- source->parser = client->parser;
client->respcode = 200;
client->shared_data = source;
@@ -1991,6 +1974,7 @@
thread_mutex_unlock (&source->lock);
}
client->flags |= CLIENT_ACTIVE;
+ worker_wakeup (client->worker);
}
else
{
Modified: icecast/branches/kh/icecast/src/source.h
===================================================================
--- icecast/branches/kh/icecast/src/source.h 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/src/source.h 2011-01-25 03:06:25 UTC (rev 17788)
@@ -28,8 +28,8 @@
int listener_send_trigger;
client_t *client;
- http_parser_t *parser;
time_t client_stats_update;
+ time_t worker_balance_recheck;
struct _format_plugin_tag *format;
@@ -86,7 +86,7 @@
#define SOURCE_TERMINATING (1<<4)
#define SOURCE_LISTENERS_SYNC (1<<5)
-#define source_available(x) (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && (x)->fallback.mount == NULL)
+#define source_available(x) (((x)->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) && ((x)->flags & SOURCE_LISTENERS_SYNC) == 0)
#define source_running(x) ((x)->flags & SOURCE_RUNNING)
source_t *source_reserve (const char *mount);
Modified: icecast/branches/kh/icecast/win32/icecast2.iss
===================================================================
--- icecast/branches/kh/icecast/win32/icecast2.iss 2011-01-20 23:00:15 UTC (rev 17787)
+++ icecast/branches/kh/icecast/win32/icecast2.iss 2011-01-25 03:06:25 UTC (rev 17788)
@@ -3,7 +3,7 @@
[Setup]
AppName=Icecast2-KH
-AppVerName=Icecast v2.3.2-kh28
+AppVerName=Icecast v2.3.2-kh29
AppPublisherURL=http://www.icecast.org
AppSupportURL=http://www.icecast.org
AppUpdatesURL=http://www.icecast.org
@@ -13,7 +13,7 @@
LicenseFile=..\COPYING
InfoAfterFile=..\README
OutputDir=.
-OutputBaseFilename=icecast2_win32_v2.3.2-kh28_setup
+OutputBaseFilename=icecast2_win32_v2.3.2-kh29_setup
WizardImageFile=icecast2logo2.bmp
WizardImageStretch=no
VersionInfoVersion=2.3.2
More information about the commits
mailing list