[xiph-cvs] cvs commit: ices/src output.c process.c process.h registry.h Makefile.am config.c config.h encode.c encode.h event.h ices.c im_oss.c im_oss.h im_playlist.c im_playlist.h im_stdinpcm.c im_stdinpcm.h im_sun.c im_sun.h input.c input.h metadata.c metadata.h playlist_basic.c playlist_basic.h reencode.c reencode.h savefile.c signals.c signals.h stream.c stream.h stream_rewrite.c

Michael Smith msmith at xiph.org
Thu Feb 7 01:11:32 PST 2002



msmith      02/02/07 01:11:30

  Modified:    src      Tag: branch-beta2-rewrite Makefile.am config.c
                        config.h encode.c encode.h event.h ices.c im_oss.c
                        im_oss.h im_playlist.c im_playlist.h im_stdinpcm.c
                        im_stdinpcm.h im_sun.c im_sun.h input.c input.h
                        metadata.c metadata.h playlist_basic.c
                        playlist_basic.h reencode.c reencode.h savefile.c
                        signals.c signals.h stream.c stream.h
                        stream_rewrite.c
  Added:       src      output.c process.c process.h registry.h
  Log:
  Rewrite of ices2 (in progress. Builds, usually works, some features not
  re-added yet)

Revision  Changes    Path
No                   revision

<p>No                   revision

<p>1.5.2.1   +5 -3      ices/src/Makefile.am

Index: Makefile.am
===================================================================
RCS file: /usr/local/cvsroot/ices/src/Makefile.am,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- Makefile.am	2001/10/21 16:22:40	1.5
+++ Makefile.am	2002/02/07 09:11:10	1.5.2.1
@@ -20,13 +20,15 @@
 
 bin_PROGRAMS = ices
 
-noinst_HEADERS = config.h input.h inputmodule.h im_playlist.h signals.h stream.h reencode.h encode.h playlist_basic.h logging.h im_stdinpcm.h $(ossheaders) $(sunheaders) event.h stream_shared.h metadata.h
-ices_SOURCES = input.c config.c stream.c ices.c signals.c im_playlist.c reencode.c encode.c playlist_basic.c im_stdinpcm.c $(osssources) $(sunsources) stream_shared.c savefile.c metadata.c stream_rewrite.c
+#noinst_HEADERS = config.h input.h im_playlist.h signals.h stream.h reencode.h encode.h playlist_basic.h logging.h im_stdinpcm.h $(ossheaders) $(sunheaders) event.h stream_shared.h metadata.h process.h
+#ices_SOURCES = input.c config.c stream.c ices.c signals.c im_playlist.c reencode.c encode.c playlist_basic.c im_stdinpcm.c $(osssources) $(sunsources) stream_shared.c savefile.c metadata.c stream_rewrite.c process.c
+noinst_HEADERS = config.h input.h im_playlist.h signals.h stream.h encode.h playlist_basic.h logging.h process.h $(ossheaders) im_stdinpcm.h
+ices_SOURCES = input.c config.c stream.c ices.c signals.c im_playlist.c encode.c playlist_basic.c process.c output.c $(osssources) im_stdinpcm.c
 
 ices_LDADD = net/libicenet.la thread/libicethread.la log/libicelog.la\
         avl/libiceavl.la timing/libicetiming.la
 
-LIBS = @LIBS@ -lpthread @SOCKET_LIBS@ @XML_LIBS@ @OGG_LIBS@ @VORBIS_LIBS@\
+LIBS = @LIBS@ -lpthread -lefence @SOCKET_LIBS@ @XML_LIBS@ @OGG_LIBS@ @VORBIS_LIBS@\
        @VORBISENC_LIBS@ @SHOUT_LIBS@
 
 CFLAGS = @CFLAGS@ @XML_CFLAGS@ @OGG_CFLAGS@ @VORBIS_CFLAGS@ @SHOUT_CFLAGS@

<p><p>1.6.2.1   +117 -210  ices/src/config.c

Index: config.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/config.c,v
retrieving revision 1.6
retrieving revision 1.6.2.1
diff -u -r1.6 -r1.6.2.1
--- config.c	2002/01/29 09:20:27	1.6
+++ config.c	2002/02/07 09:11:10	1.6.2.1
@@ -1,9 +1,9 @@
 /* config.c
  * - config file reading code, plus default settings.
  *
- * $Id: config.c,v 1.6 2002/01/29 09:20:27 msmith Exp $
+ * $Id: config.c,v 1.6.2.1 2002/02/07 09:11:10 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -25,31 +25,16 @@
 #include "config.h"
 #include "stream.h"
 #include "thread/thread.h"
+#include "registry.h"
 
 #define DEFAULT_BACKGROUND 0
 #define DEFAULT_LOGPATH "/tmp"
 #define DEFAULT_LOGFILE "ices.log"
 #define DEFAULT_LOGLEVEL 1
 #define DEFAULT_LOG_STDERR 1
-#define DEFAULT_STREAM_NAME "unnamed ices stream"
-#define DEFAULT_STREAM_GENRE "ices unset"
-#define DEFAULT_STREAM_DESCRIPTION "no description set"
-#define DEFAULT_PLAYLIST_MODULE "playlist"
-#define DEFAULT_HOSTNAME "localhost"
-#define DEFAULT_PORT 8000
-#define DEFAULT_PASSWORD "password"
-#define DEFAULT_MOUNT "/stream.ogg"
-#define DEFAULT_MANAGED 0
-#define DEFAULT_MIN_BITRATE -1
-#define DEFAULT_NOM_BITRATE -1
-#define DEFAULT_MAX_BITRATE -1
-#define DEFAULT_QUALITY 3
-#define DEFAULT_REENCODE 0
-#define DEFAULT_RECONN_DELAY 2
-#define DEFAULT_RECONN_ATTEMPTS 10
-#define DEFAULT_MAXQUEUELENGTH 100 /* Make it _BIG_ by default */
-#define DEFAULT_SAVEFILENAME NULL /* NULL == don't save */
 
+#define DEFAULT_MAX_QUEUE_LENGTH 100 /* Make it _BIG_ by default */
+
 /* helper macros so we don't have to write the same
 ** stupid code over and over
 */
@@ -83,143 +68,146 @@
 /* this is the global config variable */
 config_t *ices_config;
 
-static int _using_default_instance = 1;
-
-static void _free_instances(instance_t *instance)
-{
-	instance_t *next;
-	
-	next = NULL;
-	do 
-	{
-		config_free_instance(instance);
-		next = instance->next;
-		free(instance);
-		
-		instance = next;
-	} while (next != NULL);
-}
-
 void config_free_instance(instance_t *instance)
 {
-	if (instance->hostname) free(instance->hostname);
-	if (instance->password) free(instance->password);
-	if (instance->mount) free(instance->mount);
         if (instance->queue) 
         {
                 thread_mutex_destroy(&instance->queue->lock);
                 free(instance->queue);
         }
+    free(instance);
 }
 
-static void _set_instance_defaults(instance_t *instance)
+void config_free_params(module_param_t *param)
 {
-	instance->hostname = strdup(DEFAULT_HOSTNAME);
-	instance->port = DEFAULT_PORT;
-	instance->password = strdup(DEFAULT_PASSWORD);
-	instance->mount = strdup(DEFAULT_MOUNT);
-    instance->managed = DEFAULT_MANAGED;
-	instance->min_br = DEFAULT_MIN_BITRATE;
-	instance->nom_br = DEFAULT_NOM_BITRATE;
-	instance->max_br = DEFAULT_MAX_BITRATE;
-    instance->quality = DEFAULT_QUALITY;
-	instance->encode = DEFAULT_REENCODE;
-	instance->reconnect_delay = DEFAULT_RECONN_DELAY;
-	instance->reconnect_attempts = DEFAULT_RECONN_ATTEMPTS;
-	instance->max_queue_length = DEFAULT_MAXQUEUELENGTH;
-	instance->savefilename = DEFAULT_SAVEFILENAME;
-
-	instance->queue = calloc(1, sizeof(buffer_queue));
-	thread_mutex_create(&instance->queue->lock);
-	instance->serial = rand();
-
-	instance->next = NULL;
+	module_param_t *next;
+	next = NULL;
+	do 
+	{
+		if (param->name) free(param->name);
+		if (param->value) free(param->value);
+		next = param->next;
+		free(param);
+		
+		param = next;
+	} while (next != NULL);
 }
 
-static void _parse_encode(instance_t *instance,xmlDocPtr doc, xmlNodePtr node)
+static process_chain_element *_parse_module(xmlDocPtr doc, xmlNodePtr node,
+        process_chain_element *chain_start)
 {
-	instance->encode = 1;
-	do {
-		if (node == NULL) break;
-		if (xmlIsBlankNode(node)) continue;
+    process_chain_element *module = calloc(1, sizeof(process_chain_element));
+    char *modulename=NULL;
+    int i = 0, foundmodule=0;
+    module_param_t *param, *params=NULL;
+    
+    if(node == NULL || xmlIsBlankNode(node)) {
+        fprintf(stderr, "Null or empty node in config read\n");
+        return NULL;
+    }
+
+    SET_PARM_STRING("name", modulename);
+    while(registered_modules[i].open) {
+        if(!strcmp(modulename, registered_modules[i].name)) {
+            foundmodule = 1;
+            break;
+        }
+        i++;
+    }
+
+    if(!foundmodule) {
+        fprintf(stderr, "Could not find module \"%s\"\n", modulename);
+        return NULL;
+    }
+
+    fprintf(stderr, "Configuring module \"%s\"...\n", modulename);
+
+    node = node->xmlChildrenNode;
+
+    if(node == NULL || xmlIsBlankNode(node)) {
+        fprintf(stderr, "Null or empty node in config read\n");
+        return NULL;
+    }
+
+    do {
+        if(node == NULL) break;
+        if(xmlIsBlankNode(node)) continue;
+
+        if(strcmp(node->name, "param") == 0) {
+            param = (module_param_t *)calloc(1, sizeof(module_param_t));
+            SET_PARM_STRING("name", param->name);
+            SET_STRING(param->value);
+            param->next = NULL;
+
+            if(params == NULL)
+                params = param;
+            else {
+                module_param_t *p = params;
+                while(p->next != NULL)
+                    p = p->next;
+                p->next = param;
+            }
 
-        if (strcmp(node->name, "nominal-bitrate") == 0)
-			SET_INT(instance->nom_br);
-        else if (strcmp(node->name, "minimum-bitrate") == 0)
-			SET_INT(instance->min_br);
-        else if (strcmp(node->name, "maximum-bitrate") == 0)
-			SET_INT(instance->max_br);
-        else if (strcmp(node->name, "quality") == 0)
-			SET_FLOAT(instance->quality);
-		else if (strcmp(node->name, "samplerate") == 0)
-			SET_INT(instance->samplerate);
-		else if (strcmp(node->name, "channels") == 0)
-			SET_INT(instance->channels);
+        }
         } while ((node = node->next));
 
-    if(instance->nom_br > 0 || instance->min_br > 0 || instance->max_br > 0)
-        instance->managed = 1;
-    else
-        instance->managed = 0;
-}
+    module->open = registered_modules[i].open;
+    module->params = params;
 
+    if(!chain_start)
+        return module;
+    else {
+        process_chain_element *current = chain_start;
+        while(current->next)
+            current = current->next;
+        current->next = module;
+        return chain_start;
+    }
+}
 
 static void _parse_instance(config_t *config, xmlDocPtr doc, xmlNodePtr node)
 {
-	instance_t *instance, *i;
+    process_chain_element *chain = NULL;
+    instance_t *instance = calloc(1, sizeof(instance_t));
+    instance_t *prev;
 
-	instance = (instance_t *)calloc(1, sizeof(instance_t));
-	_set_instance_defaults(instance);
+    instance->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
 
         do 
         {
                 if (node == NULL) break;
                 if (xmlIsBlankNode(node)) continue;
 
-		if (strcmp(node->name, "hostname") == 0)
-			SET_STRING(instance->hostname);
-		else if (strcmp(node->name, "port") == 0)
-			SET_INT(instance->port);
-		else if (strcmp(node->name, "password") == 0)
-			SET_STRING(instance->password);
-		else if (strcmp(node->name, "savefile") == 0)
-			SET_STRING(instance->savefilename);
-		else if (strcmp(node->name, "mount") == 0)
-			SET_STRING(instance->mount);
-		else if(strcmp(node->name, "reconnectdelay") == 0)
-			SET_INT(instance->reconnect_delay);
-		else if(strcmp(node->name, "reconnectattempts") == 0)
-			SET_INT(instance->reconnect_attempts);
-		else if(strcmp(node->name, "maxqueuelength") == 0)
-			SET_INT(instance->max_queue_length);
-		else if (strcmp(node->name, "encode") == 0)
-			_parse_encode(instance, doc, node->xmlChildrenNode);
+        if (strcmp(node->name, "maxqueuelength") == 0)
+            SET_INT(instance->max_queue_length);
+		if (strcmp(node->name, "module") == 0)
+            chain = _parse_module(doc, node, chain);
         } while ((node = node->next));
 
-	instance->next = NULL;
+    if(!chain) {
+        fprintf(stderr, "Couldn't create processing chain\n");
+        return;
+    }
 
-	if (_using_default_instance) 
-	{
-		_using_default_instance = 0;
-		_free_instances(config->instances);
-		config->instances = NULL;
-	}
+    instance->next = NULL;
+    instance->output_chain = chain;
+	instance->queue = calloc(1, sizeof(buffer_queue));
+	thread_mutex_create(&instance->queue->lock);
 
-	if (config->instances == NULL) 
-	{
-		config->instances = instance;
-	} 
-	else 
-	{
-		i = config->instances;
-		while (i->next != NULL) i = i->next;
-		i->next = instance;
-	}
+    prev = config->instances;
+    if(!prev)
+        config->instances = instance;
+    else {
+        while(prev->next)
+            prev = prev->next;
+
+        prev->next = instance;
+    }
 }
 
 static void _parse_input(config_t *config, xmlDocPtr doc, xmlNodePtr node)
 {
-	module_param_t *param, *p;
+    process_chain_element *chain = NULL;
 
         do 
         {
@@ -227,41 +215,10 @@
                 if (xmlIsBlankNode(node)) continue;
 
                 if (strcmp(node->name, "module") == 0)
-			SET_STRING(config->playlist_module);
-		else if (strcmp(node->name, "param") == 0) {
-			param = (module_param_t *)calloc(1, sizeof(module_param_t));
-			SET_PARM_STRING("name", param->name);
-			SET_STRING(param->value);
-			param->next = NULL;
-
-			if (config->module_params == NULL) 
-			{
-				config->module_params = param;
-			} 
-			else 
-			{
-				p = config->module_params;
-				while (p->next != NULL) p = p->next;
-				p->next = param;
-			}
-		}
+            chain = _parse_module(doc, node, chain);
         } while ((node = node->next));
-}
-
-static void _parse_metadata(config_t *config, xmlDocPtr doc, xmlNodePtr node)
-{
-	do 
-	{
-		if (node == NULL) break;
-		if (xmlIsBlankNode(node)) continue;
 
-		if (strcmp(node->name, "name") == 0)
-			SET_STRING(config->stream_name);
-		else if (strcmp(node->name, "genre") == 0)
-			SET_STRING(config->stream_genre);
-		else if (strcmp(node->name, "description") == 0)
-			SET_STRING(config->stream_description);
-	} while ((node = node->next));
+    config->input_chain = chain;
 }
 
 static void _parse_stream(config_t *config, xmlDocPtr doc, xmlNodePtr node)
@@ -271,9 +228,7 @@
                 if (node == NULL) break;
                 if (xmlIsBlankNode(node)) continue;
 
-		if (strcmp(node->name, "metadata") == 0)
-			_parse_metadata(config, doc, node->xmlChildrenNode);
-		else if (strcmp(node->name, "input") == 0)
+		if (strcmp(node->name, "input") == 0)
                         _parse_input(config, doc, node->xmlChildrenNode);
                 else if (strcmp(node->name, "instance") == 0)
                         _parse_instance(config, doc, node->xmlChildrenNode);
@@ -302,48 +257,10 @@
         } while ((node = node->next));
 }
 
-static void _set_defaults(config_t *c)
-{
-	instance_t *instance;
-
-	c->background = DEFAULT_BACKGROUND;
-	c->logpath = strdup(DEFAULT_LOGPATH);
-	c->logfile = strdup(DEFAULT_LOGFILE);
-	c->loglevel = DEFAULT_LOGLEVEL;
-    c->log_stderr = DEFAULT_LOG_STDERR;
-
-	c->stream_name = strdup(DEFAULT_STREAM_NAME);
-	c->stream_genre = strdup(DEFAULT_STREAM_GENRE);
-	c->stream_description = strdup(DEFAULT_STREAM_DESCRIPTION);
-
-	c->playlist_module = strdup(DEFAULT_PLAYLIST_MODULE);
-	
-	c->module_params = NULL;
-
-	instance = (instance_t *)malloc(sizeof(instance_t));
-	_set_instance_defaults(instance);
-	c->instances = instance;
-}
-
-static void _free_params(module_param_t *param)
-{
-	module_param_t *next;
-	next = NULL;
-	do 
-	{
-		if (param->name) free(param->name);
-		if (param->value) free(param->value);
-		next = param->next;
-		free(param);
-		
-		param = next;
-	} while (next != NULL);
-}
-
 void config_initialize(void)
 {
         ices_config = (config_t *)calloc(1, sizeof(config_t));
-	_set_defaults(ices_config);
+	//_set_defaults(ices_config);
         srand(time(NULL));
 }
 
@@ -351,18 +268,6 @@
 {
         if (ices_config == NULL) return;
 
-	if (ices_config->module_params != NULL) 
-	{
-		_free_params(ices_config->module_params);
-		ices_config->module_params = NULL;
-	}
-
-	if (ices_config->instances != NULL) 
-	{
-		_free_instances(ices_config->instances);
-		ices_config->instances = NULL;
-	}
-
         free(ices_config);
         ices_config = NULL;
 }
@@ -391,6 +296,8 @@
         return 1;
 }
 
+
+/* FIXME: Write this again, for the new version 
 void config_dump(void)
 {
         config_t *c = ices_config;
@@ -439,7 +346,7 @@
         
         fprintf(stderr, "\n");
 }
-
+*/
 
 
 

<p><p>1.10.2.1  +14 -61    ices/src/config.h

Index: config.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/config.h,v
retrieving revision 1.10
retrieving revision 1.10.2.1
diff -u -r1.10 -r1.10.2.1
--- config.h	2002/01/29 09:20:27	1.10
+++ config.h	2002/02/07 09:11:10	1.10.2.1
@@ -1,9 +1,9 @@
 /* config.h
  * - configuration, and global structures built from config
  *
- * $Id: config.h,v 1.10 2002/01/29 09:20:27 msmith Exp $
+ * $Id: config.h,v 1.10.2.1 2002/02/07 09:11:10 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -14,53 +14,20 @@
 #ifndef __CONFIG_H__
 #define __CONFIG_H__
 
-#include "stream.h"
-#include "inputmodule.h"
+//#include "stream.h"
+#include "process.h"
+#include "thread/thread.h"
 
-typedef struct _module_param_tag
+typedef struct _module_param_t
 {
         char *name;
         char *value;
 
-	struct _module_param_tag *next;
+	struct _module_param_t *next;
 } module_param_t;
 
-/* FIXME: orward declaraction because my headers are a mess. */
-struct buffer_queue;
+struct _instance_t;
 
-typedef struct _instance_tag
-{
-	char *hostname;
-	int port;
-	char *password;
-	char *mount;
-	int reconnect_delay;
-	int reconnect_attempts;
-	int encode;
-	int max_queue_length;
-	char *savefilename;
-
-	/* Parameters for re-encoding */
-    int managed;
-	int min_br, nom_br, max_br;
-    float quality;
-	int samplerate;
-	int channels;
-	
-	/* private */
-    FILE *savefile;
-	int serial;
-	int buffer_failures;
-	int died;
-	int kill;
-	int skip;
-    int wait_for_critical;
-
-	struct buffer_queue *queue;
-
-	struct _instance_tag *next;
-} instance_t;
-
 typedef struct _config_tag
 {
         int background;
@@ -69,32 +36,17 @@
         int loglevel;
     int log_stderr;
 
-	/* <stream> */
-
-	/* <metadata> */
-
-	char *stream_name;
-	char *stream_genre;
-	char *stream_description;
-	
-	/* <playlist> */
-	
-	char *playlist_module;
-	module_param_t *module_params;
-
-	/* <instance> */
-
-	instance_t *instances;
-
         /* private */
         int log_id;
         int shutdown;
-    char *metadata_filename;
         cond_t queue_cond;
         cond_t event_pending_cond;
         mutex_t refcount_lock;
         mutex_t flush_lock;
-	input_module_t *inmod;
+
+    process_chain_element *input_chain;
+    struct _instance_t *instances;
+    
         struct _config_tag *next;
 } config_t;
 
@@ -106,7 +58,8 @@
 int config_read(const char *filename);
 void config_dump(void);
 
-void config_free_instance(instance_t *instance);
+void config_free_instance(struct _instance_t *instance);
+void config_free_params(module_param_t *params);
 
 #endif /* __CONFIG_H__ */
 

<p><p>1.6.2.1   +209 -67   ices/src/encode.c

Index: encode.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/encode.c,v
retrieving revision 1.6
retrieving revision 1.6.2.1
diff -u -r1.6 -r1.6.2.1
--- encode.c	2002/01/28 12:52:59	1.6
+++ encode.c	2002/02/07 09:11:10	1.6.2.1
@@ -1,9 +1,9 @@
 /* encode.c
  * - runtime encoding of PCM data.
  *
- * $Id: encode.c,v 1.6 2002/01/28 12:52:59 msmith Exp $
+ * $Id: encode.c,v 1.6.2.1 2002/02/07 09:11:10 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -21,12 +21,15 @@
 
 #include "config.h"
 #include "encode.h"
+#include "process.h"
 
 #define MODULE "encode/"
 #include "logging.h"
 
-void encode_clear(encoder_state *s)
+static void close_module(process_chain_element *mod) 
 {
+    encoder_state *s = mod->priv_data;
+
         if(s)
         {
             LOG_DEBUG0("Clearing encoder engine");
@@ -38,65 +41,63 @@
         }
 }
 
-encoder_state *encode_initialise(int channels, int rate, int managed,
-        int min_br, int nom_br, int max_br, float quality,
-		int serial, vorbis_comment *vc)
+static int event_handler(process_chain_element *self, event_type ev, 
+        void *param)
 {
-	encoder_state *s = calloc(1, sizeof(encoder_state));
-	ogg_packet h1,h2,h3;
-
-    if(managed)
-        LOG_INFO5("Encoder initialising with bitrate management: %d channels, "
-                 "%d Hz, minimum bitrate %d, nominal %d, maximum %d",
-                 channels, rate, min_br, nom_br, max_br);
-    else
-	    LOG_INFO3("Encoder initialising at %d channels, %d Hz, quality %f", 
-		    	channels, rate, quality);
-
-	vorbis_info_init(&s->vi);
-
-    if(managed)
-	    vorbis_encode_init(&s->vi, channels, rate, max_br, nom_br, min_br);
-    else
-        vorbis_encode_init_vbr(&s->vi, channels, rate, quality*0.1);
-
-	vorbis_analysis_init(&s->vd, &s->vi);
-	vorbis_block_init(&s->vd, &s->vb);
+    switch(ev)
+    {
+        case EVENT_SHUTDOWN:
+            close_module(self);
+            break;
+            /* FIXME: handle next track event */
+        default:
+            return -1;
+    }
 
-	ogg_stream_init(&s->os, serial);
+    return 0;
+}
 
-	vorbis_analysis_headerout(&s->vd, vc, &h1,&h2,&h3);
-	ogg_stream_packetin(&s->os, &h1);
-	ogg_stream_packetin(&s->os, &h2);
-	ogg_stream_packetin(&s->os, &h3);
+static void encode_finish(encoder_state *s)
+{
+	ogg_packet op;
+	vorbis_analysis_wrote(&s->vd, 0);
 
-	s->in_header = 1;
-    s->samplerate = rate;
-    s->samples_in_current_page = 0;
-    s->prevgranulepos = 0;
+	while(vorbis_analysis_blockout(&s->vd, &s->vb)==1)
+	{
+		vorbis_analysis(&s->vb, &op);
+		ogg_stream_packetin(&s->os, &op);
+	}
 
-	return s;
 }
 
-void encode_data_float(encoder_state *s, float **pcm, int samples)
+static void append_page(ref_buffer *buf, ogg_page *page)
 {
-	float **buf;
-	int i;
+    int old = buf->len;
+    buf->len += page->header_len + page->body_len;
+    buf->buf = realloc(buf->buf, buf->len);
+    memcpy(buf->buf + old, page->header, page->header_len);
+    memcpy(buf->buf + old + page->header_len, page->body, page->body_len);
+}
 
-	buf = vorbis_analysis_buffer(&s->vd, samples); 
+static int encode_flush(encoder_state *s, ogg_page *og)
+{
+	int result = ogg_stream_pageout(&s->os, og);
 
-	for(i=0; i < s->vi.channels; i++)
+	if(s->in_header)
         {
-		memcpy(buf[i], pcm[i], samples*sizeof(float));
+		LOG_ERROR0("Unhandled case: flushing stream before headers have been "
+				  "output. Behaviour may be unpredictable.");
         }
-
-	vorbis_analysis_wrote(&s->vd, samples);
 
-    s->samples_in_current_page += samples;
+	if(result<=0)
+		return 0;
+	else
+		return 1;
 }
 
 /* Requires little endian data (currently) */
-void encode_data(encoder_state *s, signed char *buf, int bytes, int bigendian)
+static void encode_data(encoder_state *s, signed char *buf, int bytes, 
+        int bigendian)
 {
         float **buffer;
         int i,j;
@@ -141,7 +142,7 @@
  * Caller should loop over this to ensure that we don't end up with
  * excessive buffering in libvorbis.
  */
-int encode_dataout(encoder_state *s, ogg_page *og)
+static int encode_dataout(encoder_state *s, ogg_page *og)
 {
         ogg_packet op;
         int result;
@@ -168,13 +169,12 @@
                             ogg_stream_packetin(&s->os, &op);
                 }
 
-        /* FIXME: Make this threshold configurable.
-         * We don't want to buffer too many samples in one page when doing
+        /* We don't want to buffer too many samples in one page when doing
          * live encoding - that's fine for non-live encoding, but breaks
          * badly when doing things live. 
          * So, we flush the stream if we have too many samples buffered
          */
-        if(s->samples_in_current_page > s->samplerate * 2)
+        if(s->samples_in_current_page > (int)(s->samplerate * s->max_page_time))
         {
             LOG_DEBUG1("Forcing flush: Too many samples in current page (%d)", 
                     s->samples_in_current_page);
@@ -195,34 +195,176 @@
         }
 }
 
-void encode_finish(encoder_state *s)
+static int encode_initialise(encoder_state *s, int channels, int rate)
 {
-	ogg_packet op;
-	vorbis_analysis_wrote(&s->vd, 0);
+	ogg_packet h1,h2,h3;
+    int ret;
 
-	while(vorbis_analysis_blockout(&s->vd, &s->vb)==1)
-	{
-		vorbis_analysis(&s->vb, &op);
-		ogg_stream_packetin(&s->os, &op);
-	}
+    /* FIXME: Return values from this need checking! */
+    if(s->managed)
+        LOG_INFO5("Encoder initialising with bitrate management: %d channels, "
+                 "%d Hz, minimum bitrate %d, nominal %d, maximum %d",
+                 channels, rate, s->min_br, s->nom_br, s->max_br);
+    else
+	    LOG_INFO3("Encoder initialising at %d channels, %d Hz, quality %f", 
+		    	channels, rate, s->quality);
 
+	vorbis_info_init(&s->vi);
+
+    if(s->managed)
+	    ret = vorbis_encode_init(&s->vi, channels, rate, s->max_br, 
+                s->nom_br, s->min_br);
+    else
+        ret = vorbis_encode_init_vbr(&s->vi, channels, rate, s->quality*0.1);
+
+    if(ret) {
+        LOG_DEBUG1("Encoder init failed: returned %d", ret);
+        return ret;
+    }
+
+	vorbis_analysis_init(&s->vd, &s->vi);
+	vorbis_block_init(&s->vd, &s->vb);
+
+	ogg_stream_init(&s->os, s->serial);
+
+	vorbis_analysis_headerout(&s->vd, &s->vc, &h1,&h2,&h3);
+	ogg_stream_packetin(&s->os, &h1);
+	ogg_stream_packetin(&s->os, &h2);
+	ogg_stream_packetin(&s->os, &h3);
+
+	s->in_header = 1;
+    s->samplerate = rate;
+    s->samples_in_current_page = 0;
+    s->prevgranulepos = 0;
+
+    return 0;
 }
 
-int encode_flush(encoder_state *s, ogg_page *og)
+static void encode_data_float(encoder_state *s, float **pcm, int samples)
 {
-	int result = ogg_stream_pageout(&s->os, og);
+	float **buf;
+	int i;
 
-	if(s->in_header)
+	buf = vorbis_analysis_buffer(&s->vd, samples); 
+
+	for(i=0; i < s->vi.channels; i++)
         {
-		LOG_ERROR0("Unhandled case: flushing stream before headers have been "
-				  "output. Behaviour may be unpredictable.");
+		memcpy(buf[i], pcm[i], samples*sizeof(float));
         }
 
-	if(result<=0)
-		return 0;
-	else
-		return 1;
+	vorbis_analysis_wrote(&s->vd, samples);
+
+    s->samples_in_current_page += samples;
+}
+
+static int encode_chunk(instance_t *instance, void *self, 
+        ref_buffer *in, ref_buffer **out)
+{
+    encoder_state *enc = self;
+    ogg_page page;
+
+    *out = NULL;
+
+    if(in->flags & FLAG_BOS) {
+        if(enc->initialised) {
+            encode_finish(enc);
+            while(encode_flush(enc, &page) > 0) {
+                if(*out == NULL)
+                    *out = new_ref_buffer(MEDIA_VORBIS, NULL, 0);
+                append_page(*out, &page);
+            }
+            /* FIXME: rewrite the below func. 
+            encode_clear(enc);
+            */
+        }
+        encode_initialise(enc, in->channels, in->rate);
+        enc->initialised = 1;
+    }
+
+    if(!enc->initialised)
+        LOG_ERROR0("encode called without being initialised, absent BOS flag?");
+
+    if(in->subtype == SUBTYPE_PCM_FLOAT) {
+        float **pcm = (float **)in->buf;
+        encode_data_float(enc, pcm, in->len);
+    }
+    else {
+        signed char *buf = in->buf;
+        encode_data(enc, buf, in->len, in->subtype == SUBTYPE_PCM_BE_16);
+    }
+
+    while(encode_dataout(enc, &page) > 0) {
+        if(*out == NULL)
+            *out = new_ref_buffer(MEDIA_VORBIS, NULL, 0);
+        append_page(*out, &page);
+    }
+
+    release_buffer(in);
+
+    return ((*out)?(*out)->len:0);
 }
+
+int encode_open_module(process_chain_element *mod, module_param_t *params)
+{
+    encoder_state *enc = calloc(1, sizeof(encoder_state));
+    module_param_t *paramstart = params;
+
+    mod->name = "process-encode";
+    mod->input_type = MEDIA_PCM;
+    mod->output_type = MEDIA_VORBIS;
+
+    mod->process = encode_chunk;
+    mod->event_handler = event_handler;
+
+    mod->priv_data = enc;
+
+    enc->managed = 0;
+    enc->quality = 0.3;
+
+    enc->nom_br = 128000;
+    enc->min_br = enc->max_br = -1;
+    enc->max_page_time = 2.0; /* Seconds */
+
+    while(params) {
+
+        if(!strcmp(params->name, "quality")) {
+            /* FIXME: Ensure this is bounded to [0,10] */
+            enc->managed = 0;
+            enc->quality = atof(params->value) * 0.1;
+        }
+        else if(!strcmp(params->name, "nominal-bitrate")) {
+            enc->managed = 1;
+            enc->nom_br = atoi(params->value);
+        }
+        else if(!strcmp(params->name, "minimum-bitrate")) {
+            enc->managed = 1;
+            enc->min_br = atoi(params->value);
+        }
+        else if(!strcmp(params->name, "maximum-bitrate")) {
+            enc->managed = 1;
+            enc->max_br = atoi(params->value);
+        }
+        else if(!strcmp(params->name, "page-length-threshold")) {
+            enc->max_page_time = atof(params->value);
+        }
+        else
+            LOG_ERROR1("Unrecognised module parameter \"%s\"", params->name);
+
+        params = params->next;
+    }
+
+    if(enc->managed)
+        LOG_INFO3("Bitrate management engine enabled: "
+                "min = %d, nom = %d, max = %d", enc->min_br, 
+                enc->nom_br, enc->max_br);
+    else
+        LOG_INFO1("Full VBR encoding: quality level %f", enc->quality);
+
+    config_free_params(paramstart);
+
+    return 0;
+}
+
 
 
 

<p><p>1.3.2.1   +15 -11    ices/src/encode.h

Index: encode.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/encode.h,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- encode.h	2002/01/28 00:19:15	1.3
+++ encode.h	2002/02/07 09:11:11	1.3.2.1
@@ -1,9 +1,9 @@
 /* encode.h
  * - encoding functions
  *
- * $Id: encode.h,v 1.3 2002/01/28 00:19:15 msmith Exp $
+ * $Id: encode.h,v 1.3.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -18,7 +18,19 @@
 #include <vorbis/codec.h>
 
 typedef struct {
+    int initialised;
+
+    int managed;
+    int min_br;
+    int nom_br;
+    int max_br;
+    double quality;
+    double max_page_time;
+
+    int serial;
+    
         ogg_stream_state os;
+    vorbis_comment vc;
         vorbis_block vb;
         vorbis_dsp_state vd;
         vorbis_info vi;
@@ -29,15 +41,7 @@
         int in_header;
 } encoder_state;
 
-encoder_state *encode_initialise(int channels, int rate, int managed,
-    int min_br, int nom_br, int max_br, float quality,
-	int serial, vorbis_comment *vc);
-void encode_clear(encoder_state *s);
-void encode_data_float(encoder_state *s, float **pcm, int samples);
-void encode_data(encoder_state *s, signed char *buf, int bytes, int bigendian);
-int encode_dataout(encoder_state *s, ogg_page *og);
-void encode_finish(encoder_state *s);
-int encode_flush(encoder_state *s, ogg_page *og);
+int encode_open_module(process_chain_element *mod, module_param_t *params);
 
 #endif
 

<p><p>1.2.2.1   +4 -4      ices/src/event.h

Index: event.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/event.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- event.h	2001/09/25 12:04:21	1.2
+++ event.h	2002/02/07 09:11:11	1.2.2.1
@@ -1,9 +1,9 @@
 /* event.h
  * - Generic interface for passing events to modules.
  *
- * $Id: event.h,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: event.h,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -14,13 +14,13 @@
 #ifndef __EVENT_H__
 #define __EVENT_H__
 
-enum event_type {
+typedef enum {
         EVENT_SHUTDOWN, /* Full/final shutdown. MUST NOT ignore */
         EVENT_PAUSE, /* temporary shutdown. Can be ignored */
         EVENT_NEXTTRACK, /* Start a new track in some way */
         EVENT_RECONF, /* Reconfigure self, if possible */
         EVENT_METADATAUPDATE, /* Incoming new metadata */
-};
+} event_type;
 
 #endif /* __EVENT_H__ */
 

<p><p>1.4.2.1   +2 -2      ices/src/ices.c

Index: ices.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/ices.c,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- ices.c	2002/01/29 09:20:27	1.4
+++ ices.c	2002/02/07 09:11:11	1.4.2.1
@@ -1,7 +1,7 @@
 /* ices.c
  * - Main startup, thread launching, and cleanup code.
  *
- * $Id: ices.c,v 1.4 2002/01/29 09:20:27 msmith Exp $
+ * $Id: ices.c,v 1.4.2.1 2002/02/07 09:11:11 msmith Exp $
  *
  * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
@@ -34,7 +34,7 @@
 
         if (argc != 2) 
         {
-		fprintf(stderr, "IceS version 2.0beta1\n"
+		fprintf(stderr, "IceS version 2.0beta2\n"
                                 "  (c) Copyright 2001-2002 Michael Smith <msmith at icecast.org>\n"
                                 "\n"
                                 "Usage: \"ices config.xml\"\n");

<p><p>1.5.2.1   +44 -30    ices/src/im_oss.c

Index: im_oss.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_oss.c,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- im_oss.c	2001/10/21 02:10:08	1.5
+++ im_oss.c	2002/02/07 09:11:11	1.5.2.1
@@ -1,9 +1,9 @@
 /* im_oss.c
  * - Raw PCM input from OSS devices
  *
- * $Id: im_oss.c,v 1.5 2001/10/21 02:10:08 jack Exp $
+ * $Id: im_oss.c,v 1.5.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -27,7 +27,7 @@
 #include "config.h"
 #include "stream.h"
 #include "metadata.h"
-#include "inputmodule.h"
+#include "process.h"
 
 #include "im_oss.h"
 
@@ -36,24 +36,23 @@
 
 #define BUFSIZE 8192
 
-static void close_module(input_module_t *mod)
+static void close_module(process_chain_element *mod)
 {
         if(mod)
         {
-		if(mod->internal)
+		if(mod->priv_data)
                 {
-			im_oss_state *s = mod->internal;
+			im_oss_state *s = mod->priv_data;
                         if(s->fd >= 0)
                                 close(s->fd);
                         thread_mutex_destroy(&s->metadatalock);
                         free(s);
                 }
-		free(mod);
         }
 }
-static int event_handler(input_module_t *mod, enum event_type ev, void *param)
+static int event_handler(process_chain_element *mod, event_type ev, void *param)
 {
-	im_oss_state *s = mod->internal;
+	im_oss_state *s = mod->priv_data;
 
         switch(ev)
         {
@@ -77,13 +76,13 @@
                         thread_mutex_unlock(&s->metadatalock);
                         break;
                 default:
-			LOG_WARN1("Unhandled event %d", ev);
                         return -1;
         }
 
         return 0;
 }
 
+/*
 static void metadata_update(void *self, vorbis_comment *vc)
 {
         im_oss_state *s = self;
@@ -101,6 +100,7 @@
 
         thread_mutex_unlock(&s->metadatalock);
 }
+*/
 
 /* Core streaming function for this module
  * This is what actually produces the data which gets streamed.
@@ -109,20 +109,27 @@
  *            0  Non-fatal error.
  *           <0  Fatal error.
  */
-static int oss_read(void *self, ref_buffer *rb)
+static int oss_read(instance_t *instance, void *self, 
+        ref_buffer *in, ref_buffer **out) 
 {
         int result;
         im_oss_state *s = self;
+    ref_buffer *rb;
+    
+    rb = new_ref_buffer(MEDIA_PCM, NULL, 0);
+    *out = rb;
 
         rb->buf = malloc(BUFSIZE*2*s->channels);
         result = read(s->fd, rb->buf, BUFSIZE*2*s->channels);
 
         rb->len = result;
-	rb->aux_data = s->rate*s->channels*2;
+    rb->subtype = SUBTYPE_PCM_LE_16;
+    rb->channels = s->channels;
+    rb->rate = s->rate;
 
         if(s->newtrack)
         {
-		rb->critical = 1;
+		rb->flags |= FLAG_CRITICAL | FLAG_BOS;
                 s->newtrack = 0;
         }
 
@@ -136,35 +143,36 @@
                         LOG_INFO0("Reached EOF, no more data available");
                 else
                         LOG_ERROR1("Error reading from audio device: %s", strerror(errno));
-		free(rb->buf);
+        release_buffer(rb);
                 return -1;
         }
 
         return rb->len;
 }
 
-input_module_t *oss_open_module(module_param_t *params)
+int oss_open_module(process_chain_element *mod, module_param_t *params)
 {
-	input_module_t *mod = calloc(1, sizeof(input_module_t));
         im_oss_state *s;
         module_param_t *current;
-	char *device = "/dev/dsp"; /* default device */
+	char *device = strdup("/dev/dsp"); /* default device */
         int format = AFMT_S16_LE;
         int channels, rate;
         int use_metadata = 1; /* Default to on */
 
-	mod->type = ICES_INPUT_PCM;
-	mod->subtype = INPUT_PCM_LE_16;
-	mod->getdata = oss_read;
-	mod->handle_event = event_handler;
-	mod->metadata_update = metadata_update;
+    mod->name = "input-oss";
+	mod->input_type = MEDIA_NONE;
+    mod->output_type = MEDIA_PCM;
+    
+	mod->process = oss_read;
+	mod->event_handler = event_handler;
 
-	mod->internal = calloc(1, sizeof(im_oss_state));
-	s = mod->internal;
+	mod->priv_data = calloc(1, sizeof(im_oss_state));
+	s = mod->priv_data;
 
         s->fd = -1; /* Set it to something invalid, for now */
         s->rate = 44100; /* Defaults */
         s->channels = 2; 
+    s->newtrack = 1;
 
         thread_mutex_create(&s->metadatalock);
 
@@ -177,17 +185,19 @@
                 else if(!strcmp(current->name, "channels"))
                         s->channels = atoi(current->value);
                 else if(!strcmp(current->name, "device"))
-			device = current->value;
+			device = strdup(current->value);
                 else if(!strcmp(current->name, "metadata"))
                         use_metadata = atoi(current->value);
-		else if(!strcmp(current->name, "metadatafilename"))
-			ices_config->metadata_filename = current->value;
+		/*else if(!strcmp(current->name, "metadatafilename"))
+			ices_config->metadata_filename = strdup(current->value);*/
                 else
-			LOG_WARN1("Unknown parameter %s for stdinpcm module", current->name);
+			LOG_WARN1("Unknown parameter %s for im_oss module", current->name);
 
                 current = current->next;
         }
 
+    config_free_params(params);
+
         /* First up, lets open the audio device */
         if((s->fd = open(device, O_RDONLY, 0)) == -1)
         {
@@ -241,6 +251,7 @@
         LOG_INFO3("Opened audio device %s at %d channel(s), %d Hz", 
                         device, channels, rate);
 
+    /* FIXME: Re-add this stuff.
         if(use_metadata)
         {
         if(ices_config->metadata_filename)
@@ -249,12 +260,15 @@
                     thread_create("im_oss-metadata", metadata_thread_stdin, mod, 1);
                 LOG_INFO0("Started metadata update thread");
         }
+    */
 
-	return mod;
+    free(device);
+	return 0;
 
 fail:
+    free(device);
         close_module(mod); /* safe, this checks for valid contents */
-	return NULL;
+	return -1;
 }
 
 

<p><p>1.2.2.1   +4 -4      ices/src/im_oss.h

Index: im_oss.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_oss.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- im_oss.h	2001/09/25 12:04:21	1.2
+++ im_oss.h	2002/02/07 09:11:11	1.2.2.1
@@ -1,9 +1,9 @@
 /* im_oss.h
  * - read pcm data from oss devices
  *
- * $Id: im_oss.h,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: im_oss.h,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -14,7 +14,7 @@
 #ifndef __IM_OSS_H__
 #define __IM_OSS_H__
 
-#include "inputmodule.h"
+#include "config.h"
 #include "thread.h"
 #include <ogg/ogg.h>
 
@@ -29,6 +29,6 @@
         mutex_t metadatalock;
 } im_oss_state; 
 
-input_module_t *oss_open_module(module_param_t *params);
+int oss_open_module(process_chain_element *mod, module_param_t *params);
 
 #endif  /* __IM_OSS_H__ */

<p><p>1.3.2.1   +59 -36    ices/src/im_playlist.c

Index: im_playlist.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_playlist.c,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- im_playlist.c	2001/10/21 10:21:59	1.3
+++ im_playlist.c	2002/02/07 09:11:11	1.3.2.1
@@ -1,9 +1,9 @@
 /* playlist.c
  * - Basic playlist functionality
  *
- * $Id: im_playlist.c,v 1.3 2001/10/21 10:21:59 msmith Exp $
+ * $Id: im_playlist.c,v 1.3.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -23,7 +23,7 @@
 #include "stream.h"
 #include "event.h"
 
-#include "inputmodule.h"
+#include "process.h"
 #include "im_playlist.h"
 
 #include "playlist_basic.h"
@@ -44,33 +44,35 @@
         {NULL,NULL}
 };
 
-static void close_module(input_module_t *mod)
+static void close_module(process_chain_element *mod)
 {
-	if (mod == NULL) return;
+	if (mod == NULL) {
+        LOG_ERROR0("close_module called without module available");
+        return;
+    }
 
-	if (mod->internal) 
+	if (mod->priv_data) 
         {
-		playlist_state_t *pl = (playlist_state_t *)mod->internal;
+		playlist_state_t *pl = (playlist_state_t *)mod->priv_data;
                 pl->clear(pl->data);
                 ogg_sync_clear(&pl->oy);
                 free(pl);
         }
-	free(mod);
 }
 
-static int event_handler(input_module_t *mod, enum event_type ev, void *param)
+static int event_handler(process_chain_element *self, event_type ev, 
+        void *param)
 {
         switch(ev)
         {
                 case EVENT_SHUTDOWN:
-			close_module(mod);
+			close_module(self);
                         break;
                 case EVENT_NEXTTRACK:
                         LOG_INFO0("Moving to next file in playlist.");
-			((playlist_state_t *)mod->internal)->nexttrack = 1;
+			((playlist_state_t *)self->priv_data)->nexttrack = 1;
                         break;
                 default:
-			LOG_WARN1("Unhandled event %d", ev);
                         return -1;
         }
 
@@ -84,7 +86,8 @@
  *            0  Non-fatal error.
  *           <0  Fatal error.
  */
-static int playlist_read(void *self, ref_buffer *rb)
+static int playlist_read(instance_t *instance, void *self, 
+        ref_buffer *in, ref_buffer **out)
 {
         playlist_state_t *pl = (playlist_state_t *)self;
         int bytes;
@@ -93,6 +96,11 @@
         int result;
         ogg_page og;
 
+    if(in) {
+        LOG_ERROR0("Non-null input buffer for playlist module, not allowed");
+        return -1;
+    }
+
         if (pl->errors > 5) 
         {
                 LOG_WARN0("Too many consecutive errors - exiting");
@@ -149,14 +157,25 @@
                         LOG_WARN1("Corrupt or missing data in file (%s)", pl->filename);
                 else if(result > 0)
                 {
-			rb->len = og.header_len + og.body_len;
-			rb->buf = malloc(rb->len);
-			rb->aux_data = og.header_len;
-
-			memcpy(rb->buf, og.header, og.header_len);
-			memcpy(rb->buf+og.header_len, og.body, og.body_len);
-			if(ogg_page_granulepos(&og)==0)
-				rb->critical = 1;
+            void *buf = malloc(og.header_len + og.body_len);
+            *out = new_ref_buffer(MEDIA_VORBIS, buf, 
+                    og.header_len + og.body_len);
+
+			(*out)->aux_data = og.header_len;
+            (*out)->channels = -1; /* We don't know yet, and it's unimportant */
+            (*out)->rate = -1;
+
+			memcpy((*out)->buf, og.header, og.header_len);
+			memcpy((*out)->buf+og.header_len, og.body, og.body_len);
+			if(ogg_page_granulepos(&og)==0) {
+				(*out)->flags = FLAG_CRITICAL;
+                if(!pl->prev_was_header_page) {
+                    (*out)->flags |= FLAG_BOS;
+                }
+                pl->prev_was_header_page = 1;
+            }
+            else
+                pl->prev_was_header_page = 0;
                         break;
                 }
 
@@ -168,7 +187,7 @@
                         if (feof(pl->current_file)) 
                         {
                                 pl->nexttrack = 1;
-				return playlist_read(pl,rb);
+				return playlist_read(instance, self, in, out);
                         } 
                         else 
                         {
@@ -186,23 +205,25 @@
 
         pl->errors=0;
 
-	return rb->len;
+	return (*out)->len;
 }
 
-input_module_t *playlist_open_module(module_param_t *params)
+int playlist_open_module(process_chain_element *mod,
+        module_param_t *params)
 {
-	input_module_t *mod = calloc(1, sizeof(input_module_t));
         playlist_state_t *pl;
         module_param_t *current;
         int (*init)(module_param_t *, playlist_state_t *)=NULL;
+
+    mod->name = "input-playlist";
+	mod->input_type = MEDIA_NONE;
+    mod->output_type = MEDIA_VORBIS;
 
-	mod->type = ICES_INPUT_VORBIS;
-	mod->getdata = playlist_read;
-	mod->handle_event = event_handler;
-	mod->metadata_update = NULL; /* Not used for playlists */
+	mod->process = playlist_read;
+	mod->event_handler = event_handler;
 
-	mod->internal = calloc(1, sizeof(playlist_state_t));
-	pl = (playlist_state_t *)mod->internal;
+	mod->priv_data = calloc(1, sizeof(playlist_state_t));
+	pl = (playlist_state_t *)mod->priv_data;
 
         current = params;
         while(current)
@@ -240,21 +261,23 @@
                 else 
                 {
                         ogg_sync_init(&pl->oy);
-			return mod; /* Success. Finished initialising */
+            config_free_params(params);
+			return 0; /* Success. Finished initialising */
                 }
         }
         else
                 LOG_ERROR0("No playlist type given, cannot initialise playlist module");
 
 fail:
+    config_free_params(params);
+
         if (mod) 
         {
-		if (mod->internal)
-			free(mod->internal);
-		free(mod);
+		if (mod->priv_data)
+			free(mod->priv_data);
         }
 
-	return NULL;
+	return -1;
 }
 
 

<p><p>1.2.2.1   +5 -4      ices/src/im_playlist.h

Index: im_playlist.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_playlist.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- im_playlist.h	2001/09/25 12:04:21	1.2
+++ im_playlist.h	2002/02/07 09:11:11	1.2.2.1
@@ -1,9 +1,9 @@
 /* im_playlist.h
  * - Basic playlist functionality
  *
- * $Id: im_playlist.h,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: im_playlist.h,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -14,7 +14,7 @@
 #ifndef __IM_PLAYLIST_H__
 #define __IM_PLAYLIST_H__
 
-#include "inputmodule.h"
+#include "process.h"
 #include <ogg/ogg.h>
 
 typedef struct _playlist_state_tag
@@ -23,6 +23,7 @@
         char *filename; /* Currently streaming file */
         int errors; /* Consecutive errors */
         int nexttrack;
+    int prev_was_header_page;
         ogg_sync_state oy;
         
         char *(*get_filename)(void *data); /* returns the next desired filename */
@@ -32,6 +33,6 @@
 
 } playlist_state_t;
 
-input_module_t *playlist_open_module(module_param_t *params);
+int playlist_open_module(process_chain_element *self, module_param_t *params);
 
 #endif  /* __IM_PLAYLIST_H__ */

<p><p>1.2.2.1   +34 -23    ices/src/im_stdinpcm.c

Index: im_stdinpcm.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_stdinpcm.c,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- im_stdinpcm.c	2001/09/25 12:04:21	1.2
+++ im_stdinpcm.c	2002/02/07 09:11:11	1.2.2.1
@@ -1,9 +1,9 @@
 /* im_stdinpcm.c
  * - Raw PCM input from stdin
  *
- * $Id: im_stdinpcm.c,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: im_stdinpcm.c,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -22,7 +22,7 @@
 #include "config.h"
 #include "stream.h"
 
-#include "inputmodule.h"
+#include "process.h"
 
 #include "im_stdinpcm.h"
 
@@ -31,23 +31,22 @@
 
 #define BUFSIZE 32768
 
-static int event_handler(input_module_t *mod, enum event_type ev, void *param)
+static int event_handler(process_chain_element *mod, event_type ev, 
+        void *param)
 {
         switch(ev)
         {
                 case EVENT_SHUTDOWN:
                         if(mod)
                         {
-				if(mod->internal)
-					free(mod->internal);
-				free(mod);
+				if(mod->priv_data)
+					free(mod->priv_data);
                         }
                         break;
                 case EVENT_NEXTTRACK:
-			((stdinpcm_state *)mod->internal)->newtrack = 1;
+			((stdinpcm_state *)mod->priv_data)->newtrack = 1;
                         break;
                 default:
-			LOG_WARN1("Unhandled event %d", ev);
                         return -1;
         }
 
@@ -61,48 +60,59 @@
  *            0  Non-fatal error.
  *           <0  Fatal error.
  */
-static int stdin_read(void *self, ref_buffer *rb)
+static int stdin_read(instance_t *instance, void *self, 
+        ref_buffer *in, ref_buffer **out)
 {
         int result;
         stdinpcm_state *s = self;
+    ref_buffer *rb;
 
+    rb = new_ref_buffer(MEDIA_PCM, NULL, 0);
+
         rb->buf = malloc(BUFSIZE);
         result = fread(rb->buf, 1,BUFSIZE, stdin);
 
         rb->len = result;
-	rb->aux_data = s->rate*s->channels*2;
+    rb->rate = s->rate;
+    rb->channels = s->channels;
+    rb->subtype = SUBTYPE_PCM_LE_16;
+
         if(s->newtrack)
         {
-		rb->critical = 1;
+		rb->flags |= FLAG_CRITICAL | FLAG_BOS;
                 s->newtrack = 0;
         }
 
         if(rb->len <= 0)
         {
                 LOG_INFO0("Reached EOF, no more data available\n");
-		free(rb->buf);
+		release_buffer(rb);
                 return -1;
         }
 
+    *out = rb;
+
         return rb->len;
 }
 
-input_module_t *stdin_open_module(module_param_t *params)
+int stdin_open_module(process_chain_element *mod, module_param_t *params)
 {
-	input_module_t *mod = calloc(1, sizeof(input_module_t));
         stdinpcm_state *s;
         module_param_t *current;
 
-	mod->type = ICES_INPUT_PCM;
-	mod->getdata = stdin_read;
-	mod->handle_event = event_handler;
-	mod->metadata_update = NULL;
+    mod->name = "input-stdinpcm";
+    mod->input_type = MEDIA_NONE;
+	mod->output_type = MEDIA_PCM;
 
-	mod->internal = malloc(sizeof(stdinpcm_state));
-	s = mod->internal;
+	mod->process = stdin_read;
+	mod->event_handler = event_handler;
 
+	mod->priv_data = malloc(sizeof(stdinpcm_state));
+	s = mod->priv_data;
+
         s->rate = 44100; /* Defaults */
-	s->channels = 2; 
+	s->channels = 2;
+    s->newtrack = 1;
 
         current = params;
 
@@ -117,8 +127,9 @@
 
                 current = current->next;
         }
+    config_free_params(params);
 
-	return mod;
+	return 0;
 }
 
 

<p><p>1.2.2.1   +4 -5      ices/src/im_stdinpcm.h

Index: im_stdinpcm.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_stdinpcm.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- im_stdinpcm.h	2001/09/25 12:04:21	1.2
+++ im_stdinpcm.h	2002/02/07 09:11:11	1.2.2.1
@@ -1,9 +1,9 @@
 /* im_stdinpcm.h
  * - stdin reading
  *
- * $Id: im_stdinpcm.h,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: im_stdinpcm.h,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -14,8 +14,7 @@
 #ifndef __IM_STDINPCM_H__
 #define __IM_STDINPCM_H__
 
-#include "inputmodule.h"
-#include <ogg/ogg.h>
+#include "config.h"
 
 typedef struct
 {
@@ -24,6 +23,6 @@
         int newtrack;
 } stdinpcm_state; 
 
-input_module_t *stdin_open_module(module_param_t *params);
+int stdin_open_module(process_chain_element *mod, module_param_t *params);
 
 #endif  /* __IM_STDINPCM_H__ */

<p><p>1.6.2.1   +2 -2      ices/src/im_sun.c

Index: im_sun.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_sun.c,v
retrieving revision 1.6
retrieving revision 1.6.2.1
diff -u -r1.6 -r1.6.2.1
--- im_sun.c	2001/10/21 10:20:31	1.6
+++ im_sun.c	2002/02/07 09:11:11	1.6.2.1
@@ -1,11 +1,11 @@
 /* im_sun.c
  * - Raw PCM input from Solaris audio devices
  *
- * $Id: im_sun.c,v 1.6 2001/10/21 10:20:31 msmith Exp $
+ * $Id: im_sun.c,v 1.6.2.1 2002/02/07 09:11:11 msmith Exp $
  *
  * by Ciaran Anscomb <ciarana at rd.bbc.co.uk>, based
  * on im_oss.c which is...
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.2.2.1   +2 -2      ices/src/im_sun.h

Index: im_sun.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/im_sun.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- im_sun.h	2001/09/25 12:04:21	1.2
+++ im_sun.h	2002/02/07 09:11:11	1.2.2.1
@@ -1,11 +1,11 @@
 /* im_sun.h
  * - read pcm data from sun devices
  *
- * $Id: im_sun.h,v 1.2 2001/09/25 12:04:21 msmith Exp $
+ * $Id: im_sun.h,v 1.2.2.1 2002/02/07 09:11:11 msmith Exp $
  *
  * by Ciaran Anscomb <ciarana at rd.bbc.co.uk>, based
  * on im_oss.c which is...
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.12.2.1  +66 -107   ices/src/input.c

Index: input.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/input.c,v
retrieving revision 1.12
retrieving revision 1.12.2.1
diff -u -r1.12 -r1.12.2.1
--- input.c	2002/01/23 03:40:28	1.12
+++ input.c	2002/02/07 09:11:11	1.12.2.1
@@ -2,9 +2,9 @@
  *  - Main producer control loop. Fetches data from input modules, and controls
  *    submission of these to the instance threads. Timing control happens here.
  *
- * $Id: input.c,v 1.12 2002/01/23 03:40:28 jack Exp $
+ * $Id: input.c,v 1.12.2.1 2002/02/07 09:11:11 msmith Exp $
  * 
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -15,41 +15,30 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
-#ifdef HAVE_STDINT_H
-# include <stdint.h>
-#endif
 #include <ogg/ogg.h>
 #include <vorbis/codec.h>
 #include <string.h>
+#ifdef HAVE_STDINT_H
+# include <stdint.h>
+#endif
 
 #include "thread/thread.h"
 #include "config.h"
 #include "stream.h"
 #include "timing.h"
 #include "input.h"
-#include "event.h"
-#include "inputmodule.h"
-#include "im_playlist.h"
-#include "im_stdinpcm.h"
+#include "process.h"
 
-#ifdef HAVE_OSS
-#include "im_oss.h"
-#endif
+#define MODULE "input/"
+#include "logging.h"
 
-#ifdef HAVE_SUN_AUDIO
-#include "im_sun.h"
-#endif
+#define MAX_BUFFER_FAILURES 15
 
 #ifdef _WIN32
 typedef __int64 int64_t
 typedef unsigned __int64 uint64_t
 #endif
 
-#define MODULE "input/"
-#include "logging.h"
-
-#define MAX_BUFFER_FAILURES 15
-
 typedef struct _timing_control_tag 
 {
         uint64_t starttime;
@@ -60,24 +49,6 @@
         long serialno;
 } timing_control;
 
-typedef struct _module 
-{
-	char *name;
-	input_module_t *(*open)(module_param_t *params);
-} module;
-
-static module modules[] = {
-	{ "playlist", playlist_open_module},
-	{ "stdinpcm", stdin_open_module},
-#ifdef HAVE_OSS
-	{ "oss", oss_open_module},
-#endif
-#ifdef HAVE_SUN_AUDIO
-	{ "sun", sun_open_module},
-#endif
-	{NULL,NULL}
-};
-
 /* This is identical to shout_sync(), really. */
 static void _sleep(timing_control *control)
 {
@@ -139,8 +110,10 @@
             control->samplerate = 0;
             ret = -1;
         }
-        else
+        else {
                     control->samplerate = vi.rate;
+        }
+
 
                 vorbis_comment_clear(&vc);
                 vorbis_info_clear(&vi);
@@ -163,10 +136,13 @@
 
         LOG_DEBUG0("Input queue flush requested");
 
+    thread_mutex_lock(&ices_config->flush_lock);
+
         thread_mutex_lock(&queue->lock);
         if(!queue->head)
         {
                 thread_mutex_unlock(&queue->lock);
+        thread_mutex_unlock(&ices_config->flush_lock);
                 return;
         }
 
@@ -175,7 +151,7 @@
         {
                 next = item->next;
 
-		if(!(keep_critical && item->buf->critical))
+		if(!(keep_critical && (item->buf->flags & FLAG_CRITICAL)))
                 {
                         thread_mutex_lock(&ices_config->refcount_lock);
                         item->buf->count--;
@@ -200,6 +176,7 @@
                 {
                         prev = item;
                         item = next;
+            LOG_DEBUG0("Keeping critical buffer on flush");
                 }
         }
 
@@ -214,37 +191,18 @@
         }
 
         thread_mutex_unlock(&queue->lock);
+    thread_mutex_unlock(&ices_config->flush_lock);
 }
 
 void input_loop(void)
 {
-	input_module_t *inmod=NULL;
         timing_control *control = calloc(1, sizeof(timing_control));
         instance_t *instance, *prev, *next;
         queue_item *queued;
+    process_chain_element *chain;
         int shutdown = 0;
-	int current_module = 0;
     int valid_stream = 1;
 
-	while(ices_config->playlist_module && modules[current_module].open)
-	{
-		if(!strcmp(ices_config->playlist_module, modules[current_module].name))
-		{
-			inmod = modules[current_module].open(ices_config->module_params);
-			break;
-		}
-		current_module++;
-	}
-
-	if(!inmod)
-	{
-		LOG_ERROR1("Couldn't initialise input module \"%s\"\n", 
-				ices_config->playlist_module);
-		return;
-	}
-
-	ices_config->inmod = inmod;
-
         thread_cond_create(&ices_config->queue_cond);
         thread_cond_create(&ices_config->event_pending_cond);
         thread_mutex_create(&ices_config->refcount_lock);
@@ -252,22 +210,25 @@
 
 
         /* ok, basic config stuff done. Now, we want to start all our listening
-	 * threads.
+	 * threads, and initialise the processing chains
          */
 
+    chain = ices_config->input_chain;
+    while(chain) {
+        chain->open(chain, chain->params);
+        chain = chain->next;
+    }
+
         instance = ices_config->instances;
 
         while(instance) 
         {
-		stream_description *arg = calloc(1, sizeof(stream_description));
-		arg->stream = instance;
-		arg->input = inmod;
-        /*
-		if(instance->savefilename != NULL)
-			thread_create("savefile", savefile_stream, arg, 1);
-         */
-		thread_create("stream", ices_instance_stream, arg, 1);
-
+        chain = instance->output_chain;
+        while(chain) {
+            chain->open(chain, chain->params);
+            chain = chain->next;
+        }
+		thread_create("stream", ices_instance_output, instance, 1);
                 instance = instance->next;
         }
 
@@ -277,7 +238,7 @@
          */
         while(!shutdown) 
         {
-		ref_buffer *chunk = calloc(1, sizeof(ref_buffer));
+        ref_buffer *outchunk;
                 buffer_queue *current;
                 int ret;
 
@@ -303,14 +264,12 @@
                                 else
                                         ices_config->instances = next;
 
-				/* Just in case, flush any existing buffers
-				 * Locks shouldn't be needed, but lets be SURE */
-				thread_mutex_lock(&ices_config->flush_lock);
+				/* Just in case, flush any existing buffers*/
                                 input_flush_queue(instance->queue, 0);
-				thread_mutex_unlock(&ices_config->flush_lock);
+
+                create_event(instance->output_chain, EVENT_SHUTDOWN, NULL, 1);
 
                                 config_free_instance(instance);
-				free(instance);
 
                                 instance = next;
                                 continue;
@@ -325,24 +284,31 @@
                 if(!instance)
                 {
                         shutdown = 1;
-			free(chunk);
                         continue;
                 }
 
                 if(ices_config->shutdown) /* We've been signalled to shut down, but */
                 {						  /* the instances haven't done so yet... */
                         timing_sleep(250); /* sleep for quarter of a second */
-			free(chunk);
                         continue;
                 }
 
-		/* get a chunk of data from the input module */
-		ret = inmod->getdata(inmod->internal, chunk);
+        /* Process the input chain -
+         * This includes fetching data from some sort of input module, and
+         * possibly some processing on this data.
+         */
+        ret = process_chain(NULL, ices_config->input_chain, NULL, &outchunk);
 
+        if(ret > 0 && outchunk->count != 1) {
+            LOG_ERROR0("Output chunk has wrong refcount!!");
+            exit(1);
+        }
+
                 /* input module signalled non-fatal error. Skip this chunk */
                 if(ret==0)
                 {
-			free(chunk);
+			if(outchunk)
+                release_buffer(outchunk);
                         continue;
                 }
 
@@ -352,22 +318,24 @@
                 {
                         ices_config->shutdown = 1;
                         thread_cond_broadcast(&ices_config->queue_cond);
-			free(chunk);
+			release_buffer(outchunk);
                         continue;
                 }
 
-        if(chunk->critical)
+        if(outchunk->flags & FLAG_CRITICAL)
             valid_stream = 1;
 
                 /* figure out how much time the data represents */
-		switch(inmod->type)
+		switch(outchunk->type)
                 {
-			case ICES_INPUT_VORBIS:
-				ret = _calculate_ogg_sleep(chunk, control);
+			case MEDIA_VORBIS:
+				ret = _calculate_ogg_sleep(outchunk, control);
                                 break;
-			case ICES_INPUT_PCM:
-				ret = _calculate_pcm_sleep(chunk, control);
+			case MEDIA_PCM:
+				ret = _calculate_pcm_sleep(outchunk, control);
                                 break;
+            default:
+                LOG_ERROR0("Cannot handle datatype in main input loop");
                 }
 
         if(ret < 0)
@@ -378,7 +346,10 @@
                     while(instance)
                     {
                             if(instance->skip || 
-                        (instance->wait_for_critical && !chunk->critical))
+                        (instance->wait_for_critical && 
+                        !(outchunk->flags & FLAG_CRITICAL)) ||
+                        (instance->queue->length >= instance->max_queue_length
+                        && !(outchunk->flags & FLAG_CRITICAL)))
                             {
                                     instance = instance->next;
                                     continue;
@@ -386,12 +357,10 @@
     
                             queued = malloc(sizeof(queue_item));
     
-	    		queued->buf = chunk;
+	    		queued->buf = outchunk;
                             current = instance->queue;
     
-	    		thread_mutex_lock(&ices_config->refcount_lock);
-		    	chunk->count++;
-			    thread_mutex_unlock(&ices_config->refcount_lock);
+                acquire_buffer(outchunk);
     
                             thread_mutex_lock(&current->lock);
     
@@ -414,17 +383,8 @@
                     }
         }
 
-		/* Make sure we don't end up with a 0-refcount buffer that 
-		 * will never hit any of the free points. (this happens
-		 * if all threads are set to skip, for example).
-		 */
-		thread_mutex_lock(&ices_config->refcount_lock);
-		if(!chunk->count)
-		{
-			free(chunk->buf);
-			free(chunk);
-		}
-		thread_mutex_unlock(&ices_config->refcount_lock);
+        /* We create it with a refcount of 1, so release it now */
+        release_buffer(outchunk);
 
         if(valid_stream) {
                     /* wake up the instances */
@@ -442,9 +402,8 @@
         thread_mutex_destroy(&ices_config->refcount_lock);
 
         free(control);
-
 
-	inmod->handle_event(inmod, EVENT_SHUTDOWN, NULL);
+    create_event(ices_config->input_chain, EVENT_SHUTDOWN, NULL, 1);
 
         return;
 }

<p><p>1.4.2.1   +3 -13     ices/src/input.h

Index: input.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/input.h,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- input.h	2002/01/23 03:40:28	1.4
+++ input.h	2002/02/07 09:11:11	1.4.2.1
@@ -1,9 +1,9 @@
 /* input.h
  * - Input functions
  *
- * $Id: input.h,v 1.4 2002/01/23 03:40:28 jack Exp $
+ * $Id: input.h,v 1.4.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -18,23 +18,13 @@
 #include <vorbis/codec.h>
 
 #include "config.h"
-#include "inputmodule.h"
 #include "stream.h"
 #include "reencode.h"
 #include "encode.h"
 
-typedef struct {
-	instance_t *stream;
-	input_module_t *input;
-    reencode_state *reenc;
-    encoder_state *enc;
-    shout_t *shout;
-    vorbis_comment vc;
-} stream_description;
-
-
 void input_loop(void);
 void input_flush_queue(buffer_queue *queue, int keep_critical);
+void *ices_instance_output(void *arg);
 
 #endif /* __INPUT_H__ */
 

<p><p>1.5.2.1   +2 -2      ices/src/metadata.c

Index: metadata.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/metadata.c,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- metadata.c	2001/09/25 12:04:22	1.5
+++ metadata.c	2002/02/07 09:11:11	1.5.2.1
@@ -1,9 +1,9 @@
 /* metadata.c
  * - Metadata manipulation
  *
- * $Id: metadata.c,v 1.5 2001/09/25 12:04:22 msmith Exp $
+ * $Id: metadata.c,v 1.5.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.3.2.1   +2 -2      ices/src/metadata.h

Index: metadata.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/metadata.h,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- metadata.h	2001/09/25 12:04:22	1.3
+++ metadata.h	2002/02/07 09:11:11	1.3.2.1
@@ -1,9 +1,9 @@
 /* metadata.h
  * - metadata stuff.
  *
- * $Id: metadata.h,v 1.3 2001/09/25 12:04:22 msmith Exp $
+ * $Id: metadata.h,v 1.3.2.1 2002/02/07 09:11:11 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.5.2.1   +5 -4      ices/src/playlist_basic.c

Index: playlist_basic.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/playlist_basic.c,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- playlist_basic.c	2001/11/10 05:07:17	1.5
+++ playlist_basic.c	2002/02/07 09:11:12	1.5.2.1
@@ -1,9 +1,9 @@
 /* playlist_basic.c
  * - Simple built-in unscripted playlist
  *
- * $Id: playlist_basic.c,v 1.5 2001/11/10 05:07:17 msmith Exp $
+ * $Id: playlist_basic.c,v 1.5.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -20,7 +20,6 @@
 #include <unistd.h>
 
 #include "config.h"
-#include "inputmodule.h"
 #include "im_playlist.h"
 #include "playlist_basic.h"
 
@@ -51,6 +50,8 @@
         char buf[1024];
         int buflen;
 
+    LOG_DEBUG1("Opening playlist file %s", data->file);
+
         if(stat(data->file, &st)) 
         {
                 LOG_ERROR2("Couldn't stat file \"%s\": %s", data->file, strerror(errno));
@@ -175,7 +176,7 @@
                 if (!strcmp(params->name, "file")) 
                 {
                         if (data->file) free(data->file);
-			data->file = params->value;
+			data->file = strdup(params->value);
                 } 
                 else if (!strcmp(params->name, "random")) 
                         data->random = atoi(params->value);

<p><p>1.3.2.1   +2 -2      ices/src/playlist_basic.h

Index: playlist_basic.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/playlist_basic.h,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- playlist_basic.h	2001/09/28 10:16:54	1.3
+++ playlist_basic.h	2002/02/07 09:11:12	1.3.2.1
@@ -1,9 +1,9 @@
 /* playlist_basic.h
  * - Simple unscripted playlist
  *
- * $Id: playlist_basic.h,v 1.3 2001/09/28 10:16:54 msmith Exp $
+ * $Id: playlist_basic.h,v 1.3.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.4.2.1   +2 -2      ices/src/reencode.c

Index: reencode.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/reencode.c,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- reencode.c	2002/01/28 00:19:15	1.4
+++ reencode.c	2002/02/07 09:11:12	1.4.2.1
@@ -1,9 +1,9 @@
 /* reencode.c
  * - runtime reencoding of vorbis audio (usually to lower bitrates).
  *
- * $Id: reencode.c,v 1.4 2002/01/28 00:19:15 msmith Exp $
+ * $Id: reencode.c,v 1.4.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.3.2.1   +2 -2      ices/src/reencode.h

Index: reencode.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/reencode.h,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- reencode.h	2002/01/28 00:19:15	1.3
+++ reencode.h	2002/02/07 09:11:12	1.3.2.1
@@ -1,9 +1,9 @@
 /* reencode.h
  * - reencoding functions
  *
- * $Id: reencode.h,v 1.3 2002/01/28 00:19:15 msmith Exp $
+ * $Id: reencode.h,v 1.3.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.3.2.1   +2 -2      ices/src/savefile.c

Index: savefile.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/savefile.c,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- savefile.c	2001/09/25 12:04:22	1.3
+++ savefile.c	2002/02/07 09:11:12	1.3.2.1
@@ -1,9 +1,9 @@
 /* savefile.c
  * - Stream saving to file.
  *
- * $Id: savefile.c,v 1.3 2001/09/25 12:04:22 msmith Exp $
+ * $Id: savefile.c,v 1.3.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.4.2.1   +6 -6      ices/src/signals.c

Index: signals.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/signals.c,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- signals.c	2001/09/25 12:04:22	1.4
+++ signals.c	2002/02/07 09:11:12	1.4.2.1
@@ -1,9 +1,9 @@
 /* signals.c
  * - signal handling/setup
  *
- * $Id: signals.c,v 1.4 2001/09/25 12:04:22 msmith Exp $
+ * $Id: signals.c,v 1.4.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -17,10 +17,10 @@
 
 #include "thread.h"
 
+#include "process.h"
 #include "config.h"
 #include "stream.h"
 #include "input.h"
-#include "inputmodule.h"
 #include "event.h"
 
 #define MODULE "signals/"
@@ -30,8 +30,8 @@
 
 void signal_usr1_handler(int signum)
 {
-	LOG_INFO0("Metadata update requested");
-    metadata_update_signalled = 1;
+	LOG_INFO0("Metadata update requested (Sorry. Currently not working.)");
+    // FIXME: readd this metadata_update_signalled = 1;
     thread_cond_broadcast(&ices_config->event_pending_cond);
 
         signal(SIGUSR1, signal_usr1_handler);
@@ -43,7 +43,7 @@
         log_flush(ices_config->log_id);
 
         /* Now, let's tell it to move to the next track */
-	ices_config->inmod->handle_event(ices_config->inmod,EVENT_NEXTTRACK,NULL);
+	create_event(ices_config->input_chain, EVENT_NEXTTRACK,NULL, 0);
 
         signal(SIGHUP, signal_hup_handler);
 }

<p><p>1.2.2.1   +2 -2      ices/src/signals.h

Index: signals.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/signals.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- signals.h	2001/09/25 12:04:22	1.2
+++ signals.h	2002/02/07 09:11:12	1.2.2.1
@@ -1,9 +1,9 @@
 /* signals.h
  * - signal handling/setup
  *
- * $Id: signals.h,v 1.2 2001/09/25 12:04:22 msmith Exp $
+ * $Id: signals.h,v 1.2.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.11.2.1  +225 -209  ices/src/stream.c

Index: stream.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/stream.c,v
retrieving revision 1.11
retrieving revision 1.11.2.1
diff -u -r1.11 -r1.11.2.1
--- stream.c	2002/01/28 00:19:15	1.11
+++ stream.c	2002/02/07 09:11:12	1.11.2.1
@@ -1,9 +1,9 @@
 /* stream.c
  * - Core streaming functions/main loop.
  *
- * $Id: stream.c,v 1.11 2002/01/28 00:19:15 msmith Exp $
+ * $Id: stream.c,v 1.11.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -22,15 +22,11 @@
 
 #include "config.h"
 #include "input.h"
-#include "im_playlist.h"
 #include "net/resolver.h"
 #include "signals.h"
 #include "thread/thread.h"
-#include "reencode.h"
-#include "encode.h"
-#include "inputmodule.h"
-#include "stream_shared.h"
 #include "stream.h"
+#include "event.h"
 
 #include <ogg/ogg.h>
 #include <vorbis/codec.h>
@@ -40,236 +36,256 @@
 
 #define MAX_ERRORS 10
 
-/* The main loop for each instance. Gets data passed to it from the stream
- * manager (which gets it from the input module), and streams it to the
- * specified server
- */
-void *ices_instance_stream(void *arg)
+static void _set_stream_defaults(stream_state *stream)
 {
-	int ret, shouterr;
-	ref_buffer *buffer;
-	char *connip;
-	stream_description *sdsc = arg;
-	instance_t *stream = sdsc->stream;
-	input_module_t *inmod = sdsc->input;
-	int reencoding = (inmod->type == ICES_INPUT_VORBIS) && stream->encode;
-	int encoding = (inmod->type == ICES_INPUT_PCM) && stream->encode;
-	
-	vorbis_comment_init(&sdsc->vc);
+    stream->hostname = strdup("localhost");
+    stream->port = 8000;
+    stream->password = strdup("hackme");
+    stream->mount = strdup("/stream.ogg");
+    stream->reconnect_delay = 2;
+    stream->reconnect_attempts = 10;
+    stream->stream_name = strdup("unnamed ices stream");
+    stream->stream_genre = strdup("genre not set");
+    stream->stream_description = strdup("no description supplied");
+}
 
-	sdsc->shout = shout_new();
 
-	/* we only support the ice protocol and vorbis streams currently */
-	shout_set_format(sdsc->shout, SHOUT_FORMAT_VORBIS);
-	shout_set_protocol(sdsc->shout, SHOUT_PROTOCOL_ICE);
-
-	signal(SIGPIPE, signal_hup_handler);
+static int stream_chunk(instance_t *instance, void *self, ref_buffer *buffer,
+        ref_buffer **out)
+{
+    stream_state *stream = self;
+    int ret;
 
-	connip = malloc(16);
-	if(!resolver_getip(stream->hostname, connip, 16))
+	if(stream->errors > MAX_ERRORS)
         {
-		LOG_ERROR1("Could not resolve hostname \"%s\"", stream->hostname);
-		free(connip);
-		stream->died = 1;
-		return NULL;
-	}
-
-	if (!(shout_set_host(sdsc->shout, connip)) == SHOUTERR_SUCCESS) {
-		LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-		free(connip);
-		stream->died = 1;
-		return NULL;
+		LOG_WARN0("Too many errors, shutting down");
+        release_buffer(buffer);
+		return -2;
         }
 
-	shout_set_port(sdsc->shout, stream->port);
-	if (!(shout_set_password(sdsc->shout, stream->password)) == SHOUTERR_SUCCESS) {
-		LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-		free(connip);
-		stream->died = 1;
-		return NULL;
-	}
-	if (!(shout_set_mount(sdsc->shout, stream->mount)) == SHOUTERR_SUCCESS) {
-		LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-		free(connip);
-		stream->died = 1;
-		return NULL;
-	}
-
-	/* set the metadata for the stream */
-	if (ices_config->stream_name)
-		if (!(shout_set_name(sdsc->shout, ices_config->stream_name)) == SHOUTERR_SUCCESS) {
-			LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-			free(connip);
-			stream->died = 1;
-			return NULL;
-		}
-	if (ices_config->stream_genre)
-		if (!(shout_set_genre(sdsc->shout, ices_config->stream_genre)) == SHOUTERR_SUCCESS) {
-			LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-			free(connip);
-			stream->died = 1;
-			return NULL;
-		}
-	if (ices_config->stream_description)
-		if (!(shout_set_description(sdsc->shout, ices_config->stream_description)) == SHOUTERR_SUCCESS) {
-			LOG_ERROR1("libshout error: %s\n", shout_get_error(sdsc->shout));
-			free(connip);
-			stream->died = 1;
-			return NULL;
-		}
-
-	if(encoding)
-	{
-		if(inmod->metadata_update)
-			inmod->metadata_update(inmod->internal, &sdsc->vc);
-		sdsc->enc = encode_initialise(stream->channels, stream->samplerate,
-				stream->managed, stream->min_br, stream->nom_br, stream->max_br,
-                stream->quality, stream->serial++, &sdsc->vc);
-	}
-	else if(reencoding)
-		sdsc->reenc = reencode_init(stream);
+	/* buffer being NULL means that either a fatal error occured,
+	 * or we've been told to shut down
+	 */
+	if(!buffer) {
+        LOG_ERROR0("Null buffer: this should never happen.");
+    	return -2;
+    }
 
-    if(stream->savefilename != NULL) 
+    if(stream->wait_for_bos)
     {
-        stream->savefile = fopen(stream->savefilename, "wb");
-        if(!stream->savefile)
-            LOG_ERROR2("Failed to open stream save file %s: %s", 
-                    stream->savefilename, strerror(errno));
-        else
-            LOG_INFO1("Saving stream to file %s", stream->savefilename);
+        if(buffer->flags & FLAG_BOS) {
+            LOG_INFO0("Trying restart on new substream");
+            stream->wait_for_bos = 0;
+        }
+        else {
+            release_buffer(buffer);
+            return -1;
+        }
     }
 
-	if((shouterr = shout_open(sdsc->shout)) == SHOUTERR_SUCCESS)
-	{
-		LOG_INFO3("Connected to server: %s:%d%s", 
-				shout_get_host(sdsc->shout), shout_get_port(sdsc->shout), shout_get_mount(sdsc->shout));
+    ret = shout_send_raw(stream->shout, buffer->buf, buffer->len);
 
-		while(1)
+    if(ret < buffer->len) /* < 0 for error, or short write for other errors */
+    {
+		LOG_ERROR1("Send error: %s", shout_get_error(stream->shout));
                 {
-			if(stream->buffer_failures > MAX_ERRORS)
-			{
-				LOG_WARN0("Too many errors, shutting down");
-				break;
-			}
+			int i=0;
 
-			buffer = stream_wait_for_data(stream);
-
-			/* buffer being NULL means that either a fatal error occured,
-			 * or we've been told to shut down
-			 */
-			if(!buffer)
-				break;
-
-			/* If data is NULL or length is 0, we should just skip this one.
-			 * Probably, we've been signalled to shut down, and that'll be
-			 * caught next iteration. Add to the error count just in case,
-			 * so that we eventually break out anyway 
+			/* While we're trying to reconnect, don't receive data
+			 * to this instance, or we'll overflow once reconnect
+			 * succeeds
                          */
-			if(!buffer->buf || !buffer->len)
-			{
-				LOG_WARN0("Bad buffer dequeued!");
-				stream->buffer_failures++;
-				continue; 
-			}
+			instance->skip = 1;
 
-            if(stream->wait_for_critical)
-            {
-                LOG_INFO0("Trying restart on new substream");
-                stream->wait_for_critical = 0;
-            }
-
-            ret = process_and_send_buffer(sdsc, buffer);
-
-            /* No data produced, do nothing */
-            if(ret == -1)
-                ;
-            /* Fatal error */
-            else if(ret == -2)
-            {
-                LOG_ERROR0("Serious error, waiting to restart on "
-                           "next substream. Stream temporarily suspended.");
-                /* Set to wait until a critical buffer comes through (start of
-                 * a new substream, typically), and flush existing queue.
-                 */
-                thread_mutex_lock(&ices_config->flush_lock);
-                stream->wait_for_critical = 1;
-                input_flush_queue(stream->queue, 0);
-                thread_mutex_unlock(&ices_config->flush_lock);
-            }
-            /* Non-fatal shout error */
-            else if(ret == 0)
+			/* Also, flush the current queue */
+			input_flush_queue(instance->queue, 1);
+			
+			while((i < stream->reconnect_attempts ||
+					stream->reconnect_attempts==-1) && !ices_config->shutdown)
                         {
-				LOG_ERROR1("Send error: %s", 
-                        shout_get_error(sdsc->shout));
-				if(shouterr == SHOUTERR_SOCKET)
+				i++;
+				LOG_WARN0("Trying reconnect after server socket error");
+				shout_close(stream->shout);
+				if(shout_open(stream->shout) == SHOUTERR_SUCCESS)
                                 {
-					int i=0;
-
-					/* While we're trying to reconnect, don't receive data
-					 * to this instance, or we'll overflow once reconnect
-					 * succeeds
-					 */
-					thread_mutex_lock(&ices_config->flush_lock);
-					stream->skip = 1;
-
-					/* Also, flush the current queue */
-					input_flush_queue(stream->queue, 1);
-					thread_mutex_unlock(&ices_config->flush_lock);
-					
-					while((i < stream->reconnect_attempts ||
-							stream->reconnect_attempts==-1) && 
-                            !ices_config->shutdown)
+					LOG_INFO3("Connected to server: %s:%d%s", 
+                            shout_get_host(stream->shout), 
+                            shout_get_port(stream->shout), 
+                            shout_get_mount(stream->shout));
+
+                    /* After reconnect, we MUST start a new logical stream.
+                     * This instructs the input chain to do so.
+                     */
+                    create_event(ices_config->input_chain, EVENT_NEXTTRACK, 
+                            NULL, 0);
+					break;
+				}
+				else
+				{
+					LOG_ERROR3("Failed to reconnect to %s:%d (%s)",
+						shout_get_host(stream->shout), 
+                        shout_get_port(stream->shout), 
+                        shout_get_error(stream->shout));
+					if(i==stream->reconnect_attempts)
                                         {
-						i++;
-						LOG_WARN0("Trying reconnect after server socket error");
-						shout_close(sdsc->shout);
-						if((shouterr = shout_open(sdsc->shout)) == SHOUTERR_SUCCESS)
-						{
-							LOG_INFO3("Connected to server: %s:%d%s", 
-                                    shout_get_host(sdsc->shout), shout_get_port(sdsc->shout), 
-                                    shout_get_mount(sdsc->shout));
-							break;
-						}
-						else
-						{
-							LOG_ERROR3("Failed to reconnect to %s:%d (%s)",
-								shout_get_host(sdsc->shout),shout_get_port(sdsc->shout),
-								shout_get_error(sdsc->shout));
-							if(i==stream->reconnect_attempts)
-							{
-								LOG_ERROR0("Reconnect failed too many times, "
-										  "giving up.");
-                                /* We want to die now */
-								stream->buffer_failures = MAX_ERRORS+1; 
-							}
-							else /* Don't try again too soon */
-								sleep(stream->reconnect_delay); 
-						}
+						LOG_ERROR0("Reconnect failed too many times, "
+								  "giving up.");
+                        /* We want to die now */
+						stream->errors = MAX_ERRORS+1; 
                                         }
-					stream->skip = 0;
+					else { /* Don't try again too soon */
+                        LOG_DEBUG1("Sleeping for %d seconds before retrying",
+                                stream->reconnect_delay);
+						sleep(stream->reconnect_delay); 
+                    }
                                 }
-				stream->buffer_failures++;
                         }
-			stream_release_buffer(buffer);
+			instance->skip = 0;
                 }
+		stream->errors++;
         }
-	else
+
+	release_buffer(buffer);
+
+    return 0;
+}
+	
+static void close_module(process_chain_element *mod) 
+{
+    stream_state *stream = mod->priv_data;
+
+	shout_close(stream->shout);
+   	shout_free(stream->shout);
+
+    if(stream->hostname)
+        free(stream->hostname);
+    if(stream->password)
+        free(stream->password);
+    if(stream->mount)
+        free(stream->mount);
+    if(stream->connip)
+        free(stream->connip);
+
+    free(stream);
+}
+
+static int event_handler(process_chain_element *self, event_type ev, 
+        void *param)
+{
+    switch(ev)
+    {
+        case EVENT_SHUTDOWN:
+            close_module(self);
+            break;
+        default:
+            return -1;
+    }
+
+    return 0;
+}
+
+int stream_open_module(process_chain_element *mod, module_param_t *params)
+{
+    stream_state *stream = calloc(1, sizeof(stream_state));
+    module_param_t *paramstart = params;
+
+    _set_stream_defaults(stream);
+
+    mod->name = "output-stream";
+    mod->input_type = MEDIA_VORBIS;
+    mod->output_type = MEDIA_NONE;
+
+    mod->process = stream_chunk;
+    mod->event_handler = event_handler;
+
+    mod->priv_data = stream;
+
+    while(params) {
+        if(!strcmp(params->name, "hostname"))
+            stream->hostname = strdup(params->value);
+        else if(!strcmp(params->name, "port"))
+            stream->port = atoi(params->value);
+        else if(!strcmp(params->name, "password"))
+            stream->password = strdup(params->value);
+        else if(!strcmp(params->name, "mount"))
+            stream->mount = strdup(params->value);
+        else if(!strcmp(params->name, "reconnectdelay"))
+            stream->reconnect_delay = atoi(params->value);
+        else if(!strcmp(params->name, "reconnectattempts"))
+            stream->reconnect_attempts = atoi(params->value);
+        else if(!strcmp(params->name, "stream-name"))
+            stream->stream_name = strdup(params->value);
+        else if(!strcmp(params->name, "stream-genre"))
+            stream->stream_genre = strdup(params->value);
+        else if(!strcmp(params->name, "stream-description"))
+            stream->stream_description = strdup(params->value);
+        else
+            LOG_ERROR1("Unrecognised parameter \"%s\"", params->name);
+            
+        params = params->next;
+    } 
+
+    config_free_params(paramstart);
+
+	stream->shout = shout_new();
+
+	/* we only support the ice protocol and vorbis streams currently */
+	shout_set_format(stream->shout, SHOUT_FORMAT_VORBIS);
+	shout_set_protocol(stream->shout, SHOUT_PROTOCOL_ICE);
+
+
+	stream->connip = malloc(16);
+	if(!resolver_getip(stream->hostname, stream->connip, 16))
         {
-		LOG_ERROR3("Failed initial connect to %s:%d (%s)", 
-				shout_get_host(sdsc->shout),shout_get_port(sdsc->shout), shout_get_error(sdsc->shout));
+		LOG_ERROR1("Could not resolve hostname \"%s\"", stream->hostname);
+		return -1;
         }
-	
-	shout_close(sdsc->shout);
+
+	if (shout_set_host(stream->shout, stream->connip)) {
+		LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+		return -1;
+	}
 
-    if(stream->savefile != NULL) 
-        fclose(stream->savefile);
+	shout_set_port(stream->shout, stream->port);
+	if (shout_set_password(stream->shout, stream->password)) {
+		LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+		return -1;
+	}
+	if (shout_set_mount(stream->shout, stream->mount)) {
+		LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+		return -1;
+	}
+
+	/* set the metadata for the stream*/
+	if (stream->stream_name)
+		if (shout_set_name(stream->shout, stream->stream_name)) {
+			LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+			return -1;
+		}
+	if (stream->stream_genre)
+		if (shout_set_genre(stream->shout, stream->stream_genre)) {
+			LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+			return -1;
+		}
+	if (stream->stream_description)
+		if (shout_set_description(stream->shout, stream->stream_description)) {
+			LOG_ERROR1("libshout error: %s\n", shout_get_error(stream->shout));
+			return -1;
+		}
 
-    	shout_free(sdsc->shout);
-	encode_clear(sdsc->enc);
-	reencode_clear(sdsc->reenc);
-	vorbis_comment_clear(&sdsc->vc);
+	if(shout_open(stream->shout) == SHOUTERR_SUCCESS)
+	{
+		LOG_INFO3("Connected to server: %s:%d%s", 
+				shout_get_host(stream->shout), shout_get_port(stream->shout), 
+                shout_get_mount(stream->shout));
+    }
+	else
+	{
+		LOG_ERROR3("Failed initial connect to %s:%d (%s)", 
+				shout_get_host(stream->shout),shout_get_port(stream->shout), 
+                shout_get_error(stream->shout));
+	}
 
-	stream->died = 1;
-	return NULL;
+    return 0;
 }
 

<p><p>1.2.2.1   +26 -12    ices/src/stream.h

Index: stream.h
===================================================================
RCS file: /usr/local/cvsroot/ices/src/stream.h,v
retrieving revision 1.2
retrieving revision 1.2.2.1
diff -u -r1.2 -r1.2.2.1
--- stream.h	2001/09/25 12:04:22	1.2
+++ stream.h	2002/02/07 09:11:12	1.2.2.1
@@ -1,9 +1,9 @@
 /* stream.h
  * - Core streaming functions/main loop.
  *
- * $Id: stream.h,v 1.2 2001/09/25 12:04:22 msmith Exp $
+ * $Id: stream.h,v 1.2.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute
@@ -18,16 +18,9 @@
 #include <shout/shout.h>
 
 #include "thread/thread.h"
+#include "process.h"
 #include "config.h"
 
-typedef struct {
-	unsigned char *buf;
-	long len;
-	int count;
-	int critical;
-	long aux_data;
-} ref_buffer;
-
 typedef struct _queue_item {
         ref_buffer *buf;
         struct _queue_item *next;
@@ -38,9 +31,30 @@
         int length;
         mutex_t lock;
 } buffer_queue;
+
+typedef struct
+{
+	char *hostname;
+    char *connip;
+	int port;
+	char *password;
+	char *mount;
+	int reconnect_delay;
+	int reconnect_attempts;
+	int max_queue_length;
+
+    char *stream_name;
+    char *stream_genre;
+    char *stream_description;
+
+    shout_t *shout;
+    int errors;
+
+    int wait_for_bos;
+
+} stream_state;
 
-void *ices_instance_stream(void *arg);
-void *savefile_stream(void *arg);
+int stream_open_module(process_chain_element *mod, module_param_t *params);
 
 #endif
 

<p><p>1.3.2.1   +2 -2      ices/src/stream_rewrite.c

Index: stream_rewrite.c
===================================================================
RCS file: /usr/local/cvsroot/ices/src/stream_rewrite.c,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- stream_rewrite.c	2001/11/10 04:47:24	1.3
+++ stream_rewrite.c	2002/02/07 09:11:12	1.3.2.1
@@ -5,9 +5,9 @@
  *
  * Heavily based on vcedit.c from vorbiscomment.
  *
- * $Id: stream_rewrite.c,v 1.3 2001/11/10 04:47:24 msmith Exp $
+ * $Id: stream_rewrite.c,v 1.3.2.1 2002/02/07 09:11:12 msmith Exp $
  *
- * Copyright (c) 2001 Michael Smith <msmith at labyrinth.net.au>
+ * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
  *
  * This program is distributed under the terms of the GNU General
  * Public License, version 2. You may use, modify, and redistribute

<p><p>1.1                  ices/src/output.c

Index: output.c
===================================================================
/* output.c
 * - Manage output instances
 *
 * $Id: output.c,v 1.1 2002/02/07 09:11:12 msmith Exp $
 *
 * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
 *
 * This program is distributed under the terms of the GNU General
 * Public License, version 2. You may use, modify, and redistribute
 * it under the terms of this license. A copy should be included
 * with this source.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>

#include <thread/thread.h>
#include "config.h"
#include "input.h"
#include "stream.h"
#include "process.h"
#include "signals.h"

#define MODULE "output/"
#include "logging.h"

ref_buffer *instance_wait_for_data(instance_t *stream)
{
        ref_buffer *buffer;
        queue_item *old;

        thread_mutex_lock(&stream->queue->lock);
        while(!stream->queue->head && !ices_config->shutdown && !stream->kill)
        {
                thread_mutex_unlock(&stream->queue->lock);
                thread_cond_wait(&ices_config->queue_cond);
                thread_mutex_lock(&stream->queue->lock);
        }

        if(ices_config->shutdown || stream->kill)
        {
                LOG_DEBUG0("Shutdown signalled: thread shutting down");
                thread_mutex_unlock(&stream->queue->lock);
                return NULL;
        }

        buffer = stream->queue->head->buf;
        old = stream->queue->head;

        stream->queue->head = stream->queue->head->next;
        if(!stream->queue->head)
            stream->queue->tail = NULL;

        free(old);
        stream->queue->length--;
        thread_mutex_unlock(&stream->queue->lock);

        /* ok, we pulled something off the queue and the queue is
         * now empty - this means we're probably keeping up, so
         * clear one of the errors. This way, very occasional errors
         * don't cause eventual shutdown
         */
        if(!stream->queue->head && stream->buffer_failures>0)
                stream->buffer_failures--;

        return buffer;
}

/* The main loop for each instance. Gets data passed to it from the stream
 *  * manager (which gets it from the input chain), and feeds it out through each
 *   * output chain.
 *    */
void *ices_instance_output(void *arg)
{
    int ret;
    instance_t *instance = arg;
    ref_buffer *in, *out;

    /* What is this for?? */
    signal(SIGPIPE, signal_hup_handler);

    while(1) {
        in = instance_wait_for_data(instance);

        if(!in) {
            LOG_DEBUG0("null buffer from feeder");
            break;
        }

        if(!in->buf || in->len <= 0) {
            LOG_WARN0("Bad buffer dequeued.");
            release_buffer(in);
            continue;
        }

        ret = process_chain(instance, instance->output_chain, in, &out);

        if(ret == -1) {
            LOG_DEBUG0("Non-fatal error  - chain not completed");
            continue;
        }
        else if(ret == -2) {
            LOG_ERROR0("Error received from output chain");
            break;
        }
    }

    /* Left main loop, we've shut down */
    instance->died = 1;

    return NULL;
}

<p><p><p>1.1                  ices/src/process.c

Index: process.c
===================================================================
/* process.c
 * - Processing chains - data sources, sinks, processing effects, reencoding,
 *   etc.
 *
 * $Id: process.c,v 1.1 2002/02/07 09:11:12 msmith Exp $
 *
 * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
 *
 * This program is distributed under the terms of the GNU General
 * Public License, version 2. You may use, modify, and redistribute
 * it under the terms of this license. A copy should be included
 * with this source.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include "process.h"
#include "config.h"
#include "thread/thread.h"

#define MODULE "process/"
#include "logging.h"

#define DEBUG_BUFFERS

<p>/* Return a newly allocate buffer, with refcount initialised to 1. */
ref_buffer *new_ref_buffer(media_type media, void *data, int len)
{
    ref_buffer *new = calloc(1, sizeof(ref_buffer));
    new->type = media;
    new->buf = data;
    new->len = len;

    new->count = 1;

    return new;
}

void acquire_buffer(ref_buffer *buf)
{
    thread_mutex_lock(&ices_config->refcount_lock);

#ifdef DEBUG_BUFFERS
    if(!buf) {
        LOG_ERROR0("Null buffer aquired?");
            thread_mutex_unlock(&ices_config->refcount_lock);
        return;
    }
    if(buf->count < 0)
        LOG_ERROR1("Error: refbuf has count %d before increment.", buf->count);
#endif

    buf->count++;
    
        thread_mutex_unlock(&ices_config->refcount_lock);
}

void release_buffer(ref_buffer *buf)
{
        thread_mutex_lock(&ices_config->refcount_lock);

#ifdef DEBUG_BUFFERS
    if(!buf) {
        LOG_ERROR0("Null buffer released?");
            thread_mutex_unlock(&ices_config->refcount_lock);
        return;
    }
    if(buf->count <= 0)
        LOG_ERROR1("Error: refbuf has count %d before decrement.", buf->count);
#endif

        buf->count--;
    
        if(!buf->count)
        {
                free(buf->buf);
                free(buf);
        }
        thread_mutex_unlock(&ices_config->refcount_lock);
}

/* return values:
 * 0: normal return, success.
 * -1: chain terminated - insufficient data available?
 * -2: fatal error.
 */

int process_chain(instance_t *instance, process_chain_element *chain, 
        ref_buffer *in, ref_buffer **out) 
{
    int ret=0;

    while(chain) {
        if(chain->input_type != MEDIA_NONE && !in) {
            LOG_ERROR0("NULL input buffer where input required.");
            return -2;
        }

        if(chain->input_type != MEDIA_NONE && in->type != chain->input_type) {
            LOG_ERROR2("Chain input does not match expected input! (%d != %d",
                    in->type, chain->input_type);
            return -2;
        }
        
        ret = chain->process(instance, chain->priv_data, in, out);

        if(ret <= 0) {
            return ret;
        }

        if(chain->output_type != MEDIA_NONE &&  
                (*out)->type != chain->output_type) {
            LOG_ERROR0("Chain did not produce expected output type.");
            return -2;
        }

        in = *out;
        chain = chain->next;
    }

    return ret;
}

void create_event(process_chain_element *chain, event_type event, 
        void *param, int broadcast)
{
    /* chain->handle_event() returns 0 if it handles the event successfully.
     * We deliver to only one chain object unless the broadcast flag is set.
     */

    /* XXX: Try something like this?? Wake threads that don't do anything
     *      except when a flag gets set like this.
     * if(!chain) {
     *     thread_cond_broadcast(&ices_config->event_pending_cond);
     * } else { ...
     */
    while(chain) {
        if(!(chain->event_handler(chain, event, param) || broadcast)) 
            return;

        chain = chain->next;
    }

    if(!broadcast)
        LOG_INFO1("Non-broadcast event %d unhandled", event);
}

<p>    
    

<p><p>1.1                  ices/src/process.h

Index: process.h
===================================================================
/* process.h
 * - Processing chains
 *
 * $Id: process.h,v 1.1 2002/02/07 09:11:12 msmith Exp $
 *
 * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
 *
 * This program is distributed under the terms of the GNU General
 * Public License, version 2. You may use, modify, and redistribute
 * it under the terms of this license. A copy should be included
 * with this source.
 */

#ifndef __PROCESS_H__
#define __PROCESS_H__

#include "event.h"

typedef enum {
    MEDIA_VORBIS,
    MEDIA_PCM,
    MEDIA_DATA,
    MEDIA_NONE,
} media_type;

typedef enum {
    SUBTYPE_PCM_BE_16,
    SUBTYPE_PCM_LE_16,
    SUBTYPE_PCM_FLOAT,
} media_subtype;

typedef enum {
    FLAG_CRITICAL = 1<<0,
    FLAG_BOS = 1<<1,

} buffer_flags;

typedef struct {
    media_type type;      /* Type of data held in buffer */
    media_subtype subtype;
    short channels;
    int rate;

        void *buf;            /* Actual data */
        long len;             /* Length of data (usually bytes, sometimes samples */

        short count;          /* Reference count */

        buffer_flags flags;   /* Flag: critical chunks must be processed fully */
        long aux_data;        /* Auxilliary data used for various purposes */
} ref_buffer;

/* Need some forward declarations */
struct _process_chain_element;
struct _instance_t;
struct _module_param_t;

/* Note that instance will be NULL for input chains */
typedef int (*process_func)(struct _instance_t *instance, void *data, 
        ref_buffer *in, ref_buffer **out);
typedef int (*event_func)(struct _process_chain_element *self, event_type event,
        void *param);
typedef int (*open_func)(struct _process_chain_element *self, 
        struct _module_param_t *params);

typedef struct _process_chain_element {
    char *name;
    open_func open;
    struct _module_param_t *params;
    
    process_func process;
    event_func   event_handler;
    void *priv_data;

    media_type input_type;
    media_type output_type;
   
    struct _process_chain_element *next;
} process_chain_element;

typedef struct _instance_t
{
        int buffer_failures;
        int died;
        int kill;
        int skip;
    int wait_for_critical;

        struct buffer_queue *queue;
    int max_queue_length;
    process_chain_element *output_chain;

    struct _instance_t *next;
} instance_t;

int process_chain(struct _instance_t *instance, process_chain_element *chain,
        ref_buffer *in, ref_buffer **out);

ref_buffer *new_ref_buffer(media_type media, void *data, int len);
void acquire_buffer(ref_buffer *buf);
void release_buffer(ref_buffer *buf);

void create_event(process_chain_element *chain, event_type event, void *param,
        int broadcast);

#endif /* __PROCESS_H__ */

<p><p><p><p>1.1                  ices/src/registry.h

Index: registry.h
===================================================================
/* registry.h
 * - Registry of input/output/processing modules.
 *
 * $Id: registry.h,v 1.1 2002/02/07 09:11:12 msmith Exp $
 *
 * Copyright (c) 2001-2002 Michael Smith <msmith at labyrinth.net.au>
 *
 * This program is distributed under the terms of the GNU General
 * Public License, version 2. You may use, modify, and redistribute
 * it under the terms of this license. A copy should be included
 * with this source.
 */

#ifndef __REGISTRY_H__
#define __REGISTRY_H__

#include "process.h"
#include "config.h"
#include "im_playlist.h"
#include "im_stdinpcm.h"
#include "stream.h"
#include "encode.h"

#ifdef HAVE_OSS
#include "im_oss.h"
#endif 

/*
#ifdef HAVE_SUN_AUDIO
#include "im_sun.h"
#endif
*/
typedef struct _module
{   
        char *name;
        open_func open;
} module;

tatic module registered_modules[] = {
    { "encode", encode_open_module},
    { "stream", stream_open_module},
    { "playlist", playlist_open_module},
    { "stdinpcm", stdin_open_module}, 
#ifdef HAVE_OSS
    { "oss", oss_open_module}, 
#endif
#ifdef HAVE_SUN_AUDIO 
    { "sun", sun_open_module},
#endif
    {NULL,NULL}
};

#endif /* __REGISTRY_H__ */

<p><p><p><p>--- >8 ----
List archives:  http://www.xiph.org/archives/
Ogg project homepage: http://www.xiph.org/ogg/
To unsubscribe from this list, send a message to 'cvs-request at xiph.org'
containing only the word 'unsubscribe' in the body.  No subject is needed.
Unsubscribe messages sent to the list will be ignored/filtered.



More information about the commits mailing list