[xiph-commits] r7338 - icecast/trunk/libshout/src

brendan at dactyl.lonelymoon.com brendan
Sun Jul 25 15:16:48 PDT 2004


Author: brendan
Date: Sun Jul 25 15:16:48 2004
New Revision: 7338

Modified:
icecast/trunk/libshout/src/
icecast/trunk/libshout/src/shout.c
icecast/trunk/libshout/src/shout_private.h
Log:
Create read queue for buffering response header.
Buffer response header in HTTP login (we're very close to ready for
non-blocking I/O now).



Property changes on: icecast/trunk/libshout/src
___________________________________________________________________
Name: svn:ignore
- Makefile
Makefile.in
*.o
*.lo
*.la
.libs
.deps

+ Makefile
Makefile.in
TAGS
*.o
*.lo
*.la
.libs
.deps


Modified: icecast/trunk/libshout/src/shout.c
===================================================================
--- icecast/trunk/libshout/src/shout.c	2004-07-25 21:42:17 UTC (rev 7337)
+++ icecast/trunk/libshout/src/shout.c	2004-07-25 22:16:47 UTC (rev 7338)
@@ -39,9 +39,12 @@
#include "util.h"

/* -- local prototypes -- */
-static int queue_data(shout_t *self, const unsigned char *data, size_t len);
+static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len);
static int queue_str(shout_t *self, const char *str);
+static int queue_printf(shout_t *self, const char *fmt, ...);
+static void queue_free(shout_buf_t *queue);
static int send_queue(shout_t *self);
+static int get_response(shout_t *self);
static int try_write (shout_t *self, const void *data, size_t len);

static int login_xaudiocast(shout_t *self);
@@ -761,7 +764,7 @@
/* -- static function definitions -- */

/* queue data in pages of SHOUT_BUFSIZE bytes */
-static int queue_data(shout_t *self, const unsigned char *data, size_t len)
+static int queue_data(shout_buf_t **queue, const unsigned char *data, size_t len)
{
shout_buf_t *buf;
size_t plen;
@@ -769,13 +772,13 @@
if (!len)
return SHOUTERR_SUCCESS;

-	if (!self->queue) {
-		self->queue = calloc(1, sizeof (shout_buf_t));
-		if (! self->queue)
+	if (!*queue) {
+		*queue = calloc(1, sizeof (shout_buf_t));
+		if (! *queue)
return SHOUTERR_MALLOC;
}

-	for (buf = self->queue; buf->next; buf = buf->next);
+	for (buf = *queue; buf->next; buf = buf->next);

/* Maybe any added data should be freed if we hit a malloc error?
* Otherwise it'd be impossible to tell where to start requeueing.
@@ -785,6 +788,7 @@
buf->next = calloc(1, sizeof (shout_buf_t));
if (! buf->next)
return SHOUTERR_MALLOC;
+			buf->next->prev = buf;
buf = buf->next;
}

@@ -800,7 +804,7 @@

static inline int queue_str(shout_t *self, const char *str)
{
-	return queue_data(self, str, strlen(str));
+	return queue_data(&self->wqueue, str, strlen(str));
}

/* this should be shared with sock_write. Create libicecommon. */
@@ -821,12 +825,12 @@
self->error = SHOUTERR_SUCCESS;
if (len > 0) {
if ((size_t)len < sizeof(buffer))
-			queue_data(self, buf, len);
+			queue_data(&self->wqueue, buf, len);
else {
buf = malloc(++len);
if (buf) {
len = vsnprintf(buf, len, fmt, ap_retry);
-				queue_data(self, buf, len);
+				queue_data(&self->wqueue, buf, len);
free(buf);
} else
self->error = SHOUTERR_MALLOC;
@@ -839,6 +843,62 @@
return self->error;
}

+static inline void queue_free(shout_buf_t *queue)
+{
+	shout_buf_t *prev;
+
+	while (queue) {
+		prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+}
+
+static int get_response(shout_t *self)
+{
+	char buf[1024];
+	int rc, blen;
+	char *pc;
+	shout_buf_t *queue;
+	int newlines = 0;
+
+	rc = sock_read_bytes(self->socket, buf, sizeof(buf));
+
+	if (rc < 0 && sock_recoverable(rc))
+		return SHOUTERR_BUSY;
+	if (!rc)
+		return SHOUTERR_SOCKET;
+
+	if ((rc = queue_data(&self->rqueue, buf, rc)))
+		return rc;
+
+	/* work from the back looking for \r?\n\r?\n. Anything else means more
+	 * is coming. */
+	for (queue = self->rqueue; queue->next; queue = queue->next);
+	pc = queue->data + queue->len - 1;
+	blen = queue->len;
+	while (blen) {
+		if (*pc == '\n')
+			newlines++;
+		else if (*pc != '\r')
+			break;
+
+		if (newlines == 2)
+			return SHOUTERR_SUCCESS;
+
+		blen--;
+		pc--;
+
+		if (!blen && queue->prev) {
+			queue = queue->prev;
+			pc = queue->data + queue->len - 1;
+			blen = queue->len;
+		}
+	}
+
+	return SHOUTERR_BUSY;
+}
+
static int try_write (shout_t *self, const void *data, size_t len)
{
int ret = sock_write_bytes (self->socket, data, len);
@@ -855,15 +915,36 @@
return ret;
}

+/* collect nodes of a queue into a single buffer */
+static int collect_queue(shout_buf_t *queue, char **buf)
+{
+	shout_buf_t *node;
+	int pos = 0;
+	int len = 0;
+
+	for (node = queue; node; node = node->next)
+		len += node->len;
+
+	if (!(*buf = malloc(len)))
+		return SHOUTERR_MALLOC;
+
+	for (node = queue; node; node = node->next) {
+		memcpy(*buf + pos, node->data, node->len);
+		pos += node->len;
+	}
+
+	return len;
+}
+
static int send_queue(shout_t *self)
{
shout_buf_t *buf;
int ret;

-	if (!self->queue)
+	if (!self->wqueue)
return 0;

-	buf = self->queue;
+	buf = self->wqueue;
while (buf) {
ret = try_write (self, buf->data + buf->pos, buf->len - buf->pos);
if (ret < 0)
@@ -871,9 +952,11 @@

buf->pos += ret;
if (buf->pos == buf->len) {
-			self->queue = buf->next;
+			self->wqueue = buf->next;
free(buf);
-			buf = self->queue;
+			buf = self->wqueue;
+			if (buf)
+				buf->prev = NULL;
} else /* incomplete write */
return SHOUTERR_SUCCESS;
}
@@ -962,8 +1045,9 @@

static int login_http_basic(shout_t *self)
{
-	char header[4096];
http_parser_t *parser;
+	char *header;
+	int hlen = 0;
int code;
char *retcode;
#if 0
@@ -983,13 +1067,21 @@
if (send_queue(self) != SHOUTERR_SUCCESS)
return self->error;

-	if (_shout_util_read_header(self->socket, header, 4096) == 0)
-		/* either we didn't get a complete header, or we timed out */
-		return self->error = SHOUTERR_SOCKET;
+	while ((code = get_response(self)) == SHOUTERR_BUSY);
+	if (code != SHOUTERR_SUCCESS)
+		return code;

+	/* all this copying! */
+	hlen = collect_queue(self->rqueue, &header);
+	if (hlen <= 0)
+		return SHOUTERR_MALLOC;
+	queue_free(self->rqueue);
+	self->rqueue = NULL;
+
parser = httpp_create_parser();
httpp_initialize(parser, NULL);
-	if (httpp_parse_response(parser, header, strlen(header), self->mount)) {
+	if (httpp_parse_response(parser, header, hlen, self->mount)) {
+		free (header);
retcode = httpp_getvar(parser, HTTPP_VAR_ERROR_CODE);
code = atoi(retcode);
if(code >= 200 && code < 300) {
@@ -1035,6 +1127,7 @@
#endif
}

+	free(header);
httpp_destroy(parser);
return self->error = SHOUTERR_NOLOGIN;
}
@@ -1088,7 +1181,7 @@
return SHOUTERR_SUCCESS;
}

-int login_icy(shout_t *self)
+static int login_icy(shout_t *self)
{
char response[4096];
const char *bitrate;

Modified: icecast/trunk/libshout/src/shout_private.h
===================================================================
--- icecast/trunk/libshout/src/shout_private.h	2004-07-25 21:42:17 UTC (rev 7337)
+++ icecast/trunk/libshout/src/shout_private.h	2004-07-25 22:16:47 UTC (rev 7338)
@@ -37,6 +37,7 @@
unsigned int len;
unsigned int pos;

+	struct _shout_buf *prev;
struct _shout_buf *next;
} shout_buf_t;

@@ -82,7 +83,8 @@
int (*send)(shout_t* self, const unsigned char* buff, size_t len);
void (*close)(shout_t* self);

-	shout_buf_t *queue;
+	shout_buf_t *rqueue;
+	shout_buf_t *wqueue;

/* start of this period's timeclock */
uint64_t starttime;



More information about the commits mailing list