[core] implement streams on chunkqueues

personal/stbuehler/wip
Stefan Bühler 9 years ago
parent ebd5a4ecb3
commit c450fa82a5
  1. 1
      include/lighttpd/base.h
  2. 119
      include/lighttpd/stream.h
  3. 8
      include/lighttpd/stream_http_response.h
  4. 25
      include/lighttpd/typedefs.h
  5. 3
      src/CMakeLists.txt
  6. 3
      src/main/Makefile.am
  7. 8
      src/main/http_response_parser.rl
  8. 547
      src/main/stream.c
  9. 79
      src/main/stream_http_response.c
  10. 142
      src/main/stream_simple_socket.c
  11. 3
      src/main/wscript

@ -18,6 +18,7 @@
#include <lighttpd/chunk_parser.h>
#include <lighttpd/waitqueue.h>
#include <lighttpd/stream.h>
#include <lighttpd/radix.h>
#include <lighttpd/base_lua.h>

@ -0,0 +1,119 @@
#ifndef _LIGHTTPD_STREAM_H_
#define _LIGHTTPD_STREAM_H_
#ifndef _LIGHTTPD_BASE_H_
#error Please include <lighttpd/base.h> instead of this file
#endif
#include <lighttpd/jobqueue.h>
typedef void (*liStreamCB)(liStream *stream, liStreamEvent event);
struct liStream {
gint refcount;
liStream *source, *dest;
liChunkQueue *out;
liJob new_data_job;
liJobQueue *jobqueue;
liStreamCB cb;
};
LI_API const gchar* li_stream_event_string(liStreamEvent event);
LI_API void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb);
LI_API void li_stream_acquire(liStream* stream);
LI_API void li_stream_release(liStream* stream);
INLINE void li_stream_safe_release(liStream** pstream);
INLINE void li_stream_safe_reset_and_release(liStream** pstream);
LI_API void li_stream_connect(liStream *source, liStream *dest);
LI_API void li_stream_disconnect(liStream *stream); /* disconnects stream->source and stream */
LI_API void li_stream_disconnect_dest(liStream *stream); /* disconnects stream->dest and stream. only for errors/conection resets */
LI_API void li_stream_reset(liStream *stream); /* disconnect both sides */
LI_API void li_stream_notify(liStream *stream); /* new data in stream->cq, notify stream->dest */
LI_API void li_stream_notify_later(liStream *stream);
LI_API void li_stream_again(liStream *stream); /* more data to be generated in stream with event NEW_DATA or more data to be read from stream->source->cq */
LI_API void li_stream_again_later(liStream *stream);
/* detach from jobqueue, stops all event handling. you have to detach all connected streams to move streams between threads */
LI_API void li_stream_detach(liStream *stream);
LI_API void li_stream_attach(liStream *stream, liJobQueue *jobqueue); /* attach to another jobqueue - possibly after switching threads */
/* walks from first using ->dest until it reaches NULL or (it reached last and NULL != i->limit) or limit == i->cq->limit and
* sets i->cq->limit to limit, triggering LI_STREAM_NEW_CQLIMIT.
* limit must not be NULL!
*/
LI_API void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit);
/* checks whether all chunkqueues in a range of streams are empty. one of first and last can be NULL to walk all stream in a direction */
LI_API gboolean li_streams_empty(liStream *first, liStream *last);
LI_API liStream* li_stream_plug_new(liJobQueue *jobqueue); /* simple forwarder; can also be used for providing data from memory */
LI_API liStream* li_stream_null_new(liJobQueue *jobqueue); /* eats everything, disconnects source on eof, out is always closed */
typedef void (*liIOStreamCB)(liIOStream *stream, liIOStreamEvent event);
/* TODO: support throttle */
struct liIOStream {
liStream stream_in, stream_out;
liCQLimit *stream_in_limit;
/* initialize these before connecting stream_out if you need them */
liWaitQueue *write_timeout_queue;
liWaitQueueElem write_timeout_elem;
liWorker *wrk;
ev_io io_watcher;
/* whether we want to read/write */
gboolean in_closed, out_closed;
gboolean can_read, can_write; /* set to FALSE if you got EAGAIN */
liIOStreamCB cb;
gpointer data; /* data for the callback */
};
LI_API const gchar* li_iostream_event_string(liIOStreamEvent event);
LI_API liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data);
LI_API void li_iostream_acquire(liIOStream* iostream);
LI_API void li_iostream_release(liIOStream* iostream);
LI_API int li_iostream_reset(liIOStream *iostream); /* returns fd, disconnects everything, stop callbacks, releases one reference */
/* similar to stream_detach/_attach */
LI_API void li_iostream_detach(liIOStream *iostream);
LI_API void li_iostream_attach(liIOStream *iostream, liWorker *wrk);
/* handles basic tcp/unix socket connections, writing and reading data, supports throttling */
LI_API void li_stream_simple_socket_close(liIOStream *stream, gboolean aborted);
LI_API void li_stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event);
LI_API void li_stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data);
/* inline implementations */
INLINE void li_stream_safe_release(liStream** pstream) {
liStream *stream;
if (NULL == pstream || NULL == (stream = *pstream)) return;
*pstream = NULL;
li_stream_release(stream);
}
INLINE void li_stream_safe_reset_and_release(liStream** pstream) {
liStream *stream;
if (NULL == pstream || NULL == (stream = *pstream)) return;
*pstream = NULL;
li_stream_reset(stream);
li_stream_release(stream);
}
#endif

@ -0,0 +1,8 @@
#ifndef _LIGHTTPD_STREAM_HTTP_RESPONSE_H_
#define _LIGHTTPD_STREAM_HTTP_RESPONSE_H_
#include <lighttpd/base.h>
LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph);
#endif

@ -214,6 +214,31 @@ typedef struct liServer liServer;
typedef struct liServerSocket liServerSocket;
/* stream.h */
typedef struct liStream liStream;
typedef struct liIOStream liIOStream;
typedef enum {
LI_STREAM_NEW_DATA, /* either new/more data in stream->source->cq, or more data to be generated */
LI_STREAM_NEW_CQLIMIT,
LI_STREAM_CONNECTED_DEST,
LI_STREAM_CONNECTED_SOURCE,
LI_STREAM_DISCONNECTED_DEST,
LI_STREAM_DISCONNECTED_SOURCE,
LI_STREAM_DESTROY
} liStreamEvent;
typedef enum {
LI_IOSTREAM_READ, /* should try reading */
LI_IOSTREAM_WRITE, /* should try writing */
LI_IOSTREAM_CONNECTED_DEST, /* stream_in connected dest */
LI_IOSTREAM_CONNECTED_SOURCE, /* stream_out connected source */
LI_IOSTREAM_DISCONNECTED_DEST, /* stream_in disconnected dest */
LI_IOSTREAM_DISCONNECTED_SOURCE, /* stream_out disconnected source */
LI_IOSTREAM_DESTROY /* stream_in and stream_out are down to refcount = 0 */
} liIOStreamEvent;
/* throttle.h */
typedef struct liThrottlePool liThrottlePool;

@ -231,6 +231,9 @@ SET(LIGHTTPD_SHARED_SRC
response.c
server.c
stat_cache.c
stream.c
stream_http_response.c
stream_simple_socket.c
throttle.c
url_parser.c
value.c

@ -36,6 +36,9 @@ lighttpd_shared_src= \
response.c \
server.c \
stat_cache.c \
stream.c \
stream_http_response.c \
stream_simple_socket.c \
throttle.c \
url_parser.c \
value.c \

@ -24,8 +24,14 @@
action status {
getStringTo(fpc, ctx->h_value);
ctx->response->http_status = atoi(ctx->h_value->str);
if (ctx->response->http_status >= 100 && ctx->response->http_status < 200) {
switch (ctx->response->http_status) {
case 100: /* Continue */
case 102: /* Processing */
ctx->drop_header = TRUE;
break;
/* don't ignore 101 Switching Protocols */
default:
break;
}
}

@ -0,0 +1,547 @@
#include <lighttpd/base.h>
const gchar* li_stream_event_string(liStreamEvent event) {
switch (event) {
case LI_STREAM_NEW_DATA:
return "new_data";
case LI_STREAM_NEW_CQLIMIT:
return "new_cqlimit";
case LI_STREAM_CONNECTED_DEST:
return "connected_dest";
case LI_STREAM_CONNECTED_SOURCE:
return "connected_source";
case LI_STREAM_DISCONNECTED_DEST:
return "disconnected_dest";
case LI_STREAM_DISCONNECTED_SOURCE:
return "disconnected_source";
case LI_STREAM_DESTROY:
return "destroy";
}
return "invalid stream event";
}
const gchar* li_iostream_event_string(liIOStreamEvent event) {
switch (event) {
case LI_IOSTREAM_READ:
return "read";
case LI_IOSTREAM_WRITE:
return "write";
case LI_IOSTREAM_CONNECTED_DEST:
return "connected_dest";
case LI_IOSTREAM_CONNECTED_SOURCE:
return "connected_source";
case LI_IOSTREAM_DISCONNECTED_DEST:
return "disconnected_dest";
case LI_IOSTREAM_DISCONNECTED_SOURCE:
return "disconnected_source";
case LI_IOSTREAM_DESTROY:
return "destroy";
}
return "invalid stream event";
}
/* callback can assume that the stream is not destroyed while the callback is running */
static void li_stream_safe_cb(liStream *stream, liStreamEvent event) {
if (NULL != stream->cb) {
li_stream_acquire(stream);
stream->cb(stream, event);
li_stream_release(stream);
}
}
static void stream_new_data_job_cb(liJob *job) {
liStream *stream = LI_CONTAINER_OF(job, liStream, new_data_job);
li_stream_safe_cb(stream, LI_STREAM_NEW_DATA);
}
void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb) {
stream->refcount = 1;
stream->source = stream->dest = NULL;
stream->out = li_chunkqueue_new();
li_job_init(&stream->new_data_job, stream_new_data_job_cb);
stream->jobqueue = jobqueue;
stream->cb = cb;
}
void li_stream_acquire(liStream* stream) {
assert(g_atomic_int_get(&stream->refcount) > 0);
g_atomic_int_inc(&stream->refcount);
}
void li_stream_release(liStream* stream) {
assert(g_atomic_int_get(&stream->refcount) > 0);
if (g_atomic_int_dec_and_test(&stream->refcount)) {
li_job_clear(&stream->new_data_job);
li_chunkqueue_free(stream->out);
stream->out = NULL;
stream->jobqueue = NULL;
if (NULL != stream->cb) stream->cb(stream, LI_STREAM_DESTROY); /* "unsafe" cb, we can't keep a ref this time */
}
}
void li_stream_connect(liStream *source, liStream *dest) {
/* streams must be "valid" */
assert(source->refcount > 0 && dest->refcount > 0);
if (NULL != source->dest || NULL != dest->source) {
g_error("Can't connect already connected streams");
}
/* keep them alive for this function and for callbacks (-> callbacks are "safe") */
g_atomic_int_inc(&source->refcount);
g_atomic_int_inc(&dest->refcount);
/* references for the links */
g_atomic_int_inc(&source->refcount);
g_atomic_int_inc(&dest->refcount);
source->dest = dest;
dest->source = source;
if (NULL != source->cb) source->cb(source, LI_STREAM_CONNECTED_DEST);
/* only notify dest if source didn't disconnect */
if (source->dest == dest && NULL != dest->cb) dest->cb(dest, LI_STREAM_CONNECTED_SOURCE);
/* still connected: sync liCQLimit */
if (source->dest == dest) {
liCQLimit *sl = source->out->limit, *dl = dest->out->limit;
if (sl != NULL && dl == NULL) {
li_stream_set_cqlimit(dest, NULL, sl);
}
else if (sl == NULL && dl != NULL) {
li_stream_set_cqlimit(NULL, source, dl);
}
}
/* still connected and source has data: notify dest */
if (source->dest == dest && (source->out->length > 0 || source->out->is_closed)) {
li_stream_again_later(dest);
}
/* release our "function" refs */
li_stream_release(source);
li_stream_release(dest);
}
static void _disconnect(liStream *source, liStream *dest) {
/* streams must be "valid" */
assert(g_atomic_int_get(&source->refcount) > 0 && g_atomic_int_get(&dest->refcount) > 0);
assert(source->dest == dest && dest->source == source);
source->dest = NULL;
dest->source = NULL;
/* we still have the references from the links -> callbacks are "safe" */
if (NULL != source->cb) source->cb(source, LI_STREAM_DISCONNECTED_DEST);
if (NULL != dest->cb) dest->cb(dest, LI_STREAM_DISCONNECTED_SOURCE);
/* release references from the link */
li_stream_release(source);
li_stream_release(dest);
}
void li_stream_disconnect(liStream *stream) {
if (NULL == stream || NULL == stream->source) return;
_disconnect(stream->source, stream);
}
void li_stream_disconnect_dest(liStream *stream) {
if (NULL == stream || NULL == stream->dest) return;
_disconnect(stream, stream->dest);
}
void li_stream_reset(liStream *stream) {
if (NULL == stream || 0 == stream->refcount) return;
li_stream_acquire(stream);
if (NULL != stream->source) _disconnect(stream->source, stream);
if (NULL != stream->dest) _disconnect(stream, stream->dest);
li_stream_release(stream);
}
void li_stream_notify(liStream *stream) {
if (NULL != stream->dest) li_stream_again(stream->dest);
}
void li_stream_notify_later(liStream *stream) {
if (NULL != stream->dest) li_stream_again_later(stream->dest);
}
void li_stream_again(liStream *stream) {
if (NULL != stream->jobqueue) {
li_job_now(stream->jobqueue, &stream->new_data_job);
}
}
void li_stream_again_later(liStream *stream) {
if (NULL != stream->jobqueue) {
li_job_later(stream->jobqueue, &stream->new_data_job);
}
}
void li_stream_detach(liStream *stream) {
stream->jobqueue = NULL;
li_job_stop(&stream->new_data_job);
li_chunkqueue_set_limit(stream->out, NULL);
}
void li_stream_attach(liStream *stream, liJobQueue *jobqueue) {
stream->jobqueue = jobqueue;
li_stream_again_later(stream);
}
void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit) {
if (NULL != limit) li_cqlimit_acquire(limit);
if (NULL == first) {
while (NULL != last && NULL == last->out->limit) {
if (limit == last->out->limit) break;
li_chunkqueue_set_limit(last->out, limit);
if (NULL != last->cb) {
liStream *cur = last;
last = last->source;
li_stream_acquire(cur);
cur->cb(cur, LI_STREAM_NEW_CQLIMIT);
li_stream_release(cur);
} else {
last = last->source;
}
}
} else {
gboolean reached_last = FALSE;
while (NULL != first && !reached_last && NULL != first->out->limit) {
if (limit == first->out->limit) break;
if (first == last) reached_last = TRUE;
li_chunkqueue_set_limit(first->out, limit);
if (NULL != first->cb) {
liStream *cur = first;
first = first->dest;
li_stream_acquire(cur);
cur->cb(cur, LI_STREAM_NEW_CQLIMIT);
li_stream_release(cur);
} else {
first = first->dest;
}
}
}
if (NULL != limit) li_cqlimit_release(limit);
}
gboolean li_streams_empty(liStream *first, liStream *last) {
if (NULL == first) {
while (NULL != last) {
if (NULL != last->out && last->out->length > 0) return FALSE;
last = last->source;
}
} else {
while (NULL != first) {
if (NULL != first->out && first->out->length > 0) return FALSE;
if (first == last) break;
first = first->dest;
}
}
return TRUE;
}
static void stream_plug_cb(liStream *stream, liStreamEvent event) {
switch (event) {
case LI_STREAM_NEW_DATA:
if (!stream->out->is_closed && NULL != stream->source) {
li_chunkqueue_steal_all(stream->out, stream->source->out);
stream->out->is_closed = stream->out->is_closed || stream->source->out->is_closed;
li_stream_notify_later(stream);
}
if (stream->out->is_closed) {
li_stream_disconnect(stream);
}
break;
case LI_STREAM_DISCONNECTED_DEST:
li_stream_disconnect(stream);
break;
case LI_STREAM_DESTROY:
g_slice_free(liStream, stream);
break;
default:
break;
}
}
liStream* li_stream_plug_new(liJobQueue *jobqueue) {
liStream *stream = g_slice_new0(liStream);
li_stream_init(stream, jobqueue, stream_plug_cb);
return stream;
}
static void stream_null_cb(liStream *stream, liStreamEvent event) {
switch (event) {
case LI_STREAM_NEW_DATA:
if (NULL == stream->source) return;
li_chunkqueue_skip_all(stream->source->out);
if (stream->source->out->is_closed) li_stream_disconnect(stream);
break;
case LI_STREAM_DESTROY:
g_slice_free(liStream, stream);
break;
default:
break;
}
}
liStream* li_stream_null_new(liJobQueue *jobqueue) {
liStream *stream = g_slice_new0(liStream);
li_stream_init(stream, jobqueue, stream_null_cb);
stream->out->is_closed = TRUE;
return stream;
}
static void iostream_destroy(liIOStream *iostream) {
if (0 < iostream->stream_out.refcount || 0 < iostream->stream_in.refcount) return;
iostream->stream_out.refcount = iostream->stream_in.refcount = 1;
if (NULL != iostream->stream_in_limit) {
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
iostream->stream_in_limit->io_watcher = NULL;
}
li_cqlimit_release(iostream->stream_in_limit);
iostream->stream_in_limit = NULL;
}
if (NULL != iostream->write_timeout_queue) {
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
iostream->write_timeout_queue = NULL;
}
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
iostream->cb(iostream, LI_IOSTREAM_DESTROY);
assert(1 == iostream->stream_out.refcount);
assert(1 == iostream->stream_in.refcount);
g_slice_free(liIOStream, iostream);
}
static void iostream_in_cb(liStream *stream, liStreamEvent event) {
liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_in);
switch (event) {
case LI_STREAM_NEW_DATA:
if (0 == li_chunkqueue_limit_available(stream->out)) {
/* locked */
return;
}
if (iostream->can_read) {
goffset curoutlen = stream->out->length;
gboolean curout_closed = stream->out->is_closed;
iostream->cb(iostream, LI_IOSTREAM_READ);
if (curoutlen != stream->out->length || curout_closed != stream->out->is_closed) {
li_stream_notify_later(stream);
}
if (-1 == iostream->io_watcher.fd) return;
if (iostream->can_read) {
li_stream_again_later(stream);
}
}
if (!iostream->can_read && !iostream->in_closed) {
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ);
}
if (!iostream->can_write && !iostream->out_closed) {
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE);
}
break;
case LI_STREAM_NEW_CQLIMIT:
if (NULL != iostream->stream_in_limit) {
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
iostream->stream_in_limit->io_watcher = NULL;
}
li_cqlimit_release(iostream->stream_in_limit);
}
if (stream->out->limit) {
stream->out->limit->io_watcher = &iostream->io_watcher;
li_cqlimit_acquire(stream->out->limit);
}
iostream->stream_in_limit = stream->out->limit;
break;
case LI_STREAM_CONNECTED_SOURCE:
/* there is no incoming data */
li_stream_disconnect(stream);
break;
case LI_STREAM_CONNECTED_DEST:
iostream->cb(iostream, LI_IOSTREAM_CONNECTED_DEST);
break;
case LI_STREAM_DISCONNECTED_DEST:
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_DEST);
break;
case LI_STREAM_DESTROY:
iostream->can_read = FALSE;
iostream_destroy(iostream);
break;
default:
break;
}
}
static void iostream_out_cb(liStream *stream, liStreamEvent event) {
liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_out);
switch (event) {
case LI_STREAM_NEW_DATA:
if (iostream->can_write) {
iostream->cb(iostream, LI_IOSTREAM_WRITE);
if (NULL != iostream->write_timeout_queue) {
if (stream->out->length > 0) {
if (!iostream->write_timeout_elem.queued || (iostream->write_timeout_elem.ts + 1.0) < ev_now(iostream->wrk->loop)) {
li_waitqueue_push(iostream->write_timeout_queue, &iostream->write_timeout_elem);
}
} else {
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
}
}
if (-1 == iostream->io_watcher.fd) return;
if (iostream->can_write) {
if (stream->out->length > 0 || stream->out->is_closed) {
li_stream_again_later(stream);
}
}
}
if (!iostream->can_read && !iostream->in_closed) {
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ);
}
if (!iostream->can_write && !iostream->out_closed) {
li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE);
}
break;
case LI_STREAM_CONNECTED_DEST:
/* there is no outgoing data */
li_stream_disconnect_dest(stream);
break;
case LI_STREAM_CONNECTED_SOURCE:
iostream->cb(iostream, LI_IOSTREAM_CONNECTED_SOURCE);
break;
case LI_STREAM_DISCONNECTED_SOURCE:
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_SOURCE);
break;
case LI_STREAM_DESTROY:
iostream->can_write = FALSE;
iostream_destroy(iostream);
break;
default:
break;
}
}
static void iostream_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
liIOStream *iostream = (liIOStream*) w->data;
gboolean do_write = FALSE;
UNUSED(loop);
li_ev_io_rem_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE | EV_READ);
if (0 != (revents & EV_WRITE) && !iostream->can_write && iostream->stream_out.refcount > 0) {
iostream->can_write = TRUE;
do_write = TRUE;
li_stream_acquire(&iostream->stream_out); /* keep out stream alive during li_stream_again(&iostream->stream_in) */
}
if (0 != (revents & EV_READ) && !iostream->can_read && iostream->stream_in.refcount > 0) {
iostream->can_read = TRUE;
li_stream_again_later(&iostream->stream_in);
}
if (do_write) {
li_stream_again_later(&iostream->stream_out);
li_stream_release(&iostream->stream_out);
}
}
liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data) {
liIOStream *iostream = g_slice_new0(liIOStream);
li_stream_init(&iostream->stream_in, &wrk->jobqueue, iostream_in_cb);
li_stream_init(&iostream->stream_out, &wrk->jobqueue, iostream_out_cb);
iostream->stream_in_limit = NULL;
iostream->write_timeout_queue = NULL;
iostream->wrk = wrk;
ev_io_init(&iostream->io_watcher, iostream_io_cb, fd, EV_READ);
iostream->io_watcher.data = iostream;
iostream->in_closed = iostream->out_closed = iostream->can_read = FALSE;
iostream->can_write = TRUE;
iostream->cb = cb;
iostream->data = data;
ev_io_start(iostream->wrk->loop, &iostream->io_watcher);
return iostream;
}
void li_iostream_acquire(liIOStream* iostream) {
li_stream_acquire(&iostream->stream_in);
li_stream_acquire(&iostream->stream_out);
}
void li_iostream_release(liIOStream* iostream) {
li_stream_release(&iostream->stream_in);
li_stream_release(&iostream->stream_out);
}
int li_iostream_reset(liIOStream *iostream) {
int fd;
if (NULL == iostream) return -1;
fd = iostream->io_watcher.fd;
if (NULL != iostream->wrk->loop) {
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
}
ev_io_set(&iostream->io_watcher, -1, 0);
if (NULL != iostream->write_timeout_queue) {
li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem);
iostream->write_timeout_queue = NULL;
}
li_stream_disconnect(&iostream->stream_out);
li_stream_disconnect_dest(&iostream->stream_in);
li_iostream_release(iostream);
return fd;
}
void li_iostream_detach(liIOStream *iostream) {
if (NULL != iostream->wrk) {
ev_io_stop(iostream->wrk->loop, &iostream->io_watcher);
iostream->wrk = NULL;
}
if (NULL != iostream->stream_in_limit) {
if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) {
iostream->stream_in_limit->io_watcher = NULL;
}
li_cqlimit_release(iostream->stream_in_limit);
iostream->stream_in_limit = NULL;
}
li_stream_detach(&iostream->stream_in);
li_stream_detach(&iostream->stream_out);
}
void li_iostream_attach(liIOStream *iostream, liWorker *wrk) {
assert(NULL == iostream->wrk);
li_stream_attach(&iostream->stream_in, &wrk->jobqueue);
li_stream_attach(&iostream->stream_out, &wrk->jobqueue);
iostream->wrk = wrk;
ev_io_start(iostream->wrk->loop, &iostream->io_watcher);
}

@ -0,0 +1,79 @@
#include <lighttpd/stream_http_response.h>
typedef struct liStreamHttpResponse liStreamHttpResponse;
struct liStreamHttpResponse {
liHttpResponseCtx parse_response_ctx;
liStream stream;
liVRequest *vr;
gboolean response_headers_finished;
};
static void stream_http_respone_data(liStreamHttpResponse* shr) {
if (NULL == shr->stream.source) return;
if (!shr->response_headers_finished) {
switch (li_http_response_parse(shr->vr, &shr->parse_response_ctx)) {
case LI_HANDLER_GO_ON:
shr->response_headers_finished = TRUE;
li_vrequest_indirect_headers_ready(shr->vr);
break;
case LI_HANDLER_ERROR:
VR_ERROR(shr->vr, "%s", "Parsing response header failed");
li_vrequest_error(shr->vr);
return;
case LI_HANDLER_WAIT_FOR_EVENT:
if (shr->stream.source == NULL || shr->stream.source->out->is_closed) {
VR_ERROR(shr->vr, "%s", "Parsing response header failed (eos)");
li_vrequest_error(shr->vr);
}
default:
return;
}
}
li_chunkqueue_steal_all(shr->stream.out, shr->stream.source->out);
if (shr->stream.source->out->is_closed) {
shr->stream.out->is_closed = TRUE;
li_stream_disconnect(&shr->stream);
}
li_stream_notify(&shr->stream);
}
static void stream_http_respone_cb(liStream *stream, liStreamEvent event) {
liStreamHttpResponse* shr = LI_CONTAINER_OF(stream, liStreamHttpResponse, stream);
switch (event) {
case LI_STREAM_NEW_DATA:
stream_http_respone_data(shr);
break;
case LI_STREAM_DISCONNECTED_DEST:
li_stream_disconnect(stream);
break;
case LI_STREAM_DISCONNECTED_SOURCE:
if (NULL != stream->dest && !stream->out->is_closed) {
/* "abort" */
li_stream_disconnect_dest(stream);
}
break;
case LI_STREAM_DESTROY:
li_http_response_parser_clear(&shr->parse_response_ctx);
g_slice_free(liStreamHttpResponse, shr);
break;
default:
break;
}
}
LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph) {
liStreamHttpResponse *shr = g_slice_new0(liStreamHttpResponse);
shr->response_headers_finished = FALSE;
shr->vr = vr;
li_stream_init(&shr->stream, &vr->wrk->jobqueue, stream_http_respone_cb);
li_http_response_parser_init(&shr->parse_response_ctx, &vr->response, http_in->out,
accept_cgi, accept_nph);
li_stream_connect(http_in, &shr->stream);
return &shr->stream;
}

@ -0,0 +1,142 @@
#include <lighttpd/base.h>
void li_stream_simple_socket_close(liIOStream *stream, gboolean aborted) {
int fd = stream->io_watcher.fd;
ev_io_stop(stream->wrk->loop, &stream->io_watcher);
if (-1 == fd) return;
stream->out_closed = stream->in_closed = TRUE;
stream->can_read = stream->can_write = FALSE;
if (NULL != stream->stream_in.out) {
stream->stream_in.out->is_closed = TRUE;
}
if (aborted || stream->in_closed) {
li_iostream_acquire(stream);
fd = li_iostream_reset(stream);
if (-1 != fd) {
shutdown(fd, SHUT_RDWR);
close(fd);
}
} else {
stream->io_watcher.fd = -1;
shutdown(fd, SHUT_WR);
li_stream_disconnect(&stream->stream_out);
li_worker_add_closing_socket(stream->wrk, fd);
}
}
static void stream_simple_socket_read(liIOStream *stream, gpointer *data) {
liNetworkStatus res;
GError *err = NULL;
liWorker *wrk = stream->wrk;
liChunkQueue *raw_in = stream->stream_in.out;
if (NULL == *data && NULL != wrk->network_read_buf) {
/* reuse worker buf if needed */
*data = wrk->network_read_buf;
wrk->network_read_buf = NULL;
}
{
liBuffer *raw_in_buffer = *data;
res = li_network_read(stream->io_watcher.fd, raw_in, &raw_in_buffer, &err);
*data = raw_in_buffer;
}
if (NULL == wrk->network_read_buf && NULL != *data
&& 1 == g_atomic_int_get(&((liBuffer*)*data)->refcount)) {
/* move buffer back to worker if we didn't use it */
wrk->network_read_buf = *data;
*data = NULL;
}
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
ERROR(wrk->srv, "network read fatal error: %s", NULL != err ? err->message : "(unknown)");
g_error_free(err);
li_stream_simple_socket_close(stream, TRUE);
break;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_READ);
stream->stream_in.out->is_closed = TRUE;
stream->in_closed = TRUE;
stream->can_read = FALSE;
break;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
stream->can_read = FALSE;
break;
}
}
static void stream_simple_socket_write(liIOStream *stream) {
liNetworkStatus res;
liChunkQueue *raw_out = stream->stream_out.out;
liChunkQueue *from = stream->stream_out.source->out;
li_chunkqueue_steal_all(raw_out, from);
if (raw_out->length > 0) {
static const goffset WRITE_MAX = 256*1024; /* 256kB */
goffset write_max;
GError *err = NULL;
write_max = WRITE_MAX;
res = li_network_write(stream->io_watcher.fd, raw_out, write_max, &err);
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
ERROR(stream->wrk->srv, "network write fatal error: %s", NULL != err ? err->message : "(unknown)");
g_error_free(err);
li_stream_simple_socket_close(stream, TRUE);
break;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
li_stream_simple_socket_close(stream, TRUE);
break;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
stream->can_write = FALSE;
break;
}
}
if (0 == raw_out->length && raw_out->is_closed) {
int fd = stream->io_watcher.fd;
li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_WRITE);
if (-1 != fd) shutdown(fd, SHUT_WR);
stream->out_closed = TRUE;
stream->can_write = FALSE;
li_stream_disconnect(&stream->stream_out);
}
}
void li_stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event) {
li_stream_simple_socket_io_cb_with_context(stream, event, &stream->data);
}
void li_stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data) {
switch (event) {
case LI_IOSTREAM_READ:
stream_simple_socket_read(stream, data);
break;
case LI_IOSTREAM_WRITE:
stream_simple_socket_write(stream);
break;
case LI_IOSTREAM_DESTROY:
if (NULL != *data) {
li_buffer_release(*data);
*data = NULL;
}
default:
break;
}
}

@ -52,6 +52,9 @@ def build(bld):
response.c
server.c
stat_cache.c
stream.c
stream_http_response.c
stream_simple_socket.c
throttle.c
url_parser.rl
value.c

Loading…
Cancel
Save