diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index 3096201..35528e6 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -18,7 +18,6 @@ #include #include -#include #include #include diff --git a/include/lighttpd/stream.h b/include/lighttpd/stream.h deleted file mode 100644 index 9b903c7..0000000 --- a/include/lighttpd/stream.h +++ /dev/null @@ -1,93 +0,0 @@ -#ifndef _LIGHTTPD_STREAM_H_ -#define _LIGHTTPD_STREAM_H_ - -#ifndef _LIGHTTPD_BASE_H_ -#error Please include instead of this file -#endif - -#include - -typedef void (*liStreamCB)(liStream *stream, liStreamEvent event); - -struct liStream { - gint refcount; - - liStream *source, *dest; - - liChunkQueue *out; - - liJob new_data_job; - liJobQueue *jobqueue; - - liStreamCB cb; -}; - -LI_API void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb); -LI_API void li_stream_acquire(liStream* stream); -LI_API void li_stream_release(liStream* stream); - -LI_API void li_stream_connect(liStream *source, liStream *dest); -LI_API void li_stream_disconnect(liStream *stream); /* disconnects stream->source and stream */ -LI_API void li_stream_disconnect_dest(liStream *stream); /* disconnects stream->dest and stream. only for errors/conection resets */ -LI_API void li_stream_reset(liStream *stream); /* disconnect both sides */ - -LI_API void li_stream_notify(liStream *stream); /* new data in stream->cq, notify stream->dest */ -LI_API void li_stream_notify_later(liStream *stream); -LI_API void li_stream_again(liStream *stream); /* more data to be generated in stream with event NEW_DATA or more data to be read from stream->source->cq */ -LI_API void li_stream_again_later(liStream *stream); - -/* detach from jobqueue, stops all event handling. you have to detach all connected streams to move streams between threads */ -LI_API void li_stream_detach(liStream *stream); -LI_API void li_stream_attach(liStream *stream, liJobQueue *jobqueue); /* attach to another jobqueue - possibly after switching threads */ - -/* walks from first using ->dest until it reaches NULL or (it reached last and NULL != i->limit) or limit == i->cq->limit and - * sets i->cq->limit to limit, triggering LI_STREAM_NEW_CQLIMIT. - * limit must not be NULL! - */ -LI_API void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit); - - -LI_API liStream* li_stream_plug_new(liJobQueue *jobqueue); /* simple forwarder; can also be used for providing data from memory */ -LI_API liStream* li_stream_null_new(liJobQueue *jobqueue); /* eats everything, disconnects source on eof, out is always closed */ - - - -typedef void (*liIOStreamCB)(liIOStream *stream, liIOStreamEvent event); - -/* TODO: support throttle */ -struct liIOStream { - liStream stream_in, stream_out; - liCQLimit *stream_in_limit; - - /* initialize these before connecting stream_out if you need them */ - liWaitQueue *write_timeout_queue; - liWaitQueueElem write_timeout_elem; - - liWorker *wrk; - ev_io io_watcher; - - /* whether we want to read/write */ - gboolean in_closed, out_closed; - gboolean can_read, can_write; /* set to FALSE if you got EAGAIN */ - - liIOStreamCB cb; - - gpointer data; /* data for the callback */ -}; - -LI_API liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data); -LI_API void li_iostream_acquire(liIOStream* iostream); -LI_API void li_iostream_release(liIOStream* iostream); - -LI_API int li_iostream_reset(liIOStream *iostream); /* returns fd, disconnects everything, stop callbacks, releases one reference */ - -/* similar to stream_detach/_attach */ -LI_API void li_iostream_detach(liIOStream *iostream); -LI_API void li_iostream_attach(liIOStream *iostream, liWorker *wrk); - - -LI_API void stream_simple_socket_close(liIOStream *stream, gboolean aborted); -LI_API void stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event); -LI_API void stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data); - -#endif diff --git a/include/lighttpd/stream_http_response.h b/include/lighttpd/stream_http_response.h deleted file mode 100644 index 6cae84b..0000000 --- a/include/lighttpd/stream_http_response.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef _LIGHTTPD_STREAM_HTTP_RESPONSE_H_ -#define _LIGHTTPD_STREAM_HTTP_RESPONSE_H_ - -#include - -LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph); - -#endif diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index ea406d3..bb6b69a 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -214,31 +214,6 @@ typedef struct liServer liServer; typedef struct liServerSocket liServerSocket; -/* stream.h */ - -typedef struct liStream liStream; -typedef struct liIOStream liIOStream; - -typedef enum { - LI_STREAM_NEW_DATA, /* either new/more data in stream->source->cq, or more data to be generated */ - LI_STREAM_NEW_CQLIMIT, - LI_STREAM_CONNECTED_DEST, - LI_STREAM_CONNECTED_SOURCE, - LI_STREAM_DISCONNECTED_DEST, - LI_STREAM_DISCONNECTED_SOURCE, - LI_STREAM_DESTROY -} liStreamEvent; - -typedef enum { - LI_IOSTREAM_READ, /* should try reading */ - LI_IOSTREAM_WRITE, /* should try writing */ - LI_IOSTREAM_CONNECTED_DEST, /* stream_in connected dest */ - LI_IOSTREAM_CONNECTED_SOURCE, /* stream_out connected source */ - LI_IOSTREAM_DISCONNECTED_DEST, /* stream_in disconnected dest */ - LI_IOSTREAM_DISCONNECTED_SOURCE, /* stream_out disconnected source */ - LI_IOSTREAM_DESTROY /* stream_in and stream_out are down to refcount = 0 */ -} liIOStreamEvent; - /* throttle.h */ typedef struct liThrottlePool liThrottlePool; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3b7b583..dcf3329 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -231,9 +231,6 @@ SET(LIGHTTPD_SHARED_SRC response.c server.c stat_cache.c - stream.c - stream_http_response.c - stream_simple_socket.c throttle.c url_parser.c value.c diff --git a/src/main/Makefile.am b/src/main/Makefile.am index 0001cf9..be81fe4 100644 --- a/src/main/Makefile.am +++ b/src/main/Makefile.am @@ -36,9 +36,6 @@ lighttpd_shared_src= \ response.c \ server.c \ stat_cache.c \ - stream.c \ - stream_http_response.c \ - stream_simple_socket.c \ throttle.c \ url_parser.c \ value.c \ diff --git a/src/main/http_response_parser.rl b/src/main/http_response_parser.rl index 0bd6c4f..9e690d7 100644 --- a/src/main/http_response_parser.rl +++ b/src/main/http_response_parser.rl @@ -24,14 +24,8 @@ action status { getStringTo(fpc, ctx->h_value); ctx->response->http_status = atoi(ctx->h_value->str); - switch (ctx->response->http_status) { - case 100: /* Continue */ - case 102: /* Processing */ + if (ctx->response->http_status >= 100 && ctx->response->http_status < 200) { ctx->drop_header = TRUE; - break; - /* don't ignore 101 Switching Protocols */ - default: - break; } } diff --git a/src/main/stream.c b/src/main/stream.c deleted file mode 100644 index 760f4d8..0000000 --- a/src/main/stream.c +++ /dev/null @@ -1,501 +0,0 @@ - -#include - -/* callback can assume that the stream is not destroyed while the callback is running */ -static void li_stream_safe_cb(liStream *stream, liStreamEvent event) { - if (NULL != stream->cb) { - li_stream_acquire(stream); - stream->cb(stream, event); - li_stream_release(stream); - } -} - -static void stream_new_data_job_cb(liJob *job) { - liStream *stream = LI_CONTAINER_OF(job, liStream, new_data_job); - li_stream_safe_cb(stream, LI_STREAM_NEW_DATA); -} - -void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb) { - stream->refcount = 1; - stream->source = stream->dest = NULL; - stream->out = li_chunkqueue_new(); - li_job_init(&stream->new_data_job, stream_new_data_job_cb); - stream->jobqueue = jobqueue; - stream->cb = cb; -} - -void li_stream_acquire(liStream* stream) { - assert(g_atomic_int_get(&stream->refcount) > 0); - g_atomic_int_inc(&stream->refcount); -} - -void li_stream_release(liStream* stream) { - assert(g_atomic_int_get(&stream->refcount) > 0); - if (g_atomic_int_dec_and_test(&stream->refcount)) { - li_job_clear(&stream->new_data_job); - li_chunkqueue_free(stream->out); - stream->out = NULL; - stream->jobqueue = NULL; - if (NULL != stream->cb) stream->cb(stream, LI_STREAM_DESTROY); /* "unsafe" cb, we can't keep a ref this time */ - } -} - -void li_stream_connect(liStream *source, liStream *dest) { - /* streams must be "valid" */ - assert(source->refcount > 0 && dest->refcount > 0); - - if (NULL != source->dest || NULL != dest->source) { - g_error("Can't connect already connected streams"); - } - - /* keep them alive for this function and for callbacks (-> callbacks are "safe") */ - g_atomic_int_inc(&source->refcount); - g_atomic_int_inc(&dest->refcount); - - /* references for the links */ - g_atomic_int_inc(&source->refcount); - g_atomic_int_inc(&dest->refcount); - source->dest = dest; - dest->source = source; - - if (NULL != source->cb) source->cb(source, LI_STREAM_CONNECTED_DEST); - /* only notify dest if source didn't disconnect */ - if (source->dest == dest && NULL != dest->cb) dest->cb(dest, LI_STREAM_CONNECTED_SOURCE); - - /* still connected: sync liCQLimit */ - if (source->dest == dest) { - liCQLimit *sl = source->out->limit, *dl = dest->out->limit; - if (sl != NULL && dl == NULL) { - li_stream_set_cqlimit(dest, NULL, sl); - } - else if (sl == NULL && dl != NULL) { - li_stream_set_cqlimit(NULL, source, dl); - } - } - - /* still connected and source has data: notify dest */ - if (source->dest == dest && (source->out->length > 0 || source->out->is_closed)) { - li_stream_again_later(dest); - } - - /* release our "function" refs */ - li_stream_release(source); - li_stream_release(dest); -} - -static void _disconnect(liStream *source, liStream *dest) { - /* streams must be "valid" */ - assert(g_atomic_int_get(&source->refcount) > 0 && g_atomic_int_get(&dest->refcount) > 0); - assert(source->dest == dest && dest->source == source); - - source->dest = NULL; - dest->source = NULL; - /* we still have the references from the links -> callbacks are "safe" */ - if (NULL != source->cb) source->cb(source, LI_STREAM_DISCONNECTED_DEST); - if (NULL != dest->cb) dest->cb(dest, LI_STREAM_DISCONNECTED_SOURCE); - - /* release references from the link */ - li_stream_release(source); - li_stream_release(dest); -} - -void li_stream_disconnect(liStream *stream) { - if (NULL == stream || NULL == stream->source) return; - _disconnect(stream->source, stream); -} - -void li_stream_disconnect_dest(liStream *stream) { - if (NULL == stream || NULL == stream->dest) return; - _disconnect(stream, stream->dest); -} - -void li_stream_reset(liStream *stream) { - if (NULL == stream || 0 == stream->refcount) return; - - li_stream_acquire(stream); - if (NULL != stream->source) _disconnect(stream->source, stream); - if (NULL != stream->dest) _disconnect(stream, stream->dest); - li_stream_release(stream); -} - -void li_stream_notify(liStream *stream) { - if (NULL != stream->dest) li_stream_again(stream->dest); -} - -void li_stream_notify_later(liStream *stream) { - if (NULL != stream->dest) li_stream_again_later(stream->dest); -} - -void li_stream_again(liStream *stream) { - if (NULL != stream->jobqueue) { - li_job_now(stream->jobqueue, &stream->new_data_job); - } -} - -void li_stream_again_later(liStream *stream) { - if (NULL != stream->jobqueue) { - li_job_later(stream->jobqueue, &stream->new_data_job); - } -} - -void li_stream_detach(liStream *stream) { - stream->jobqueue = NULL; - li_job_stop(&stream->new_data_job); - - li_chunkqueue_set_limit(stream->out, NULL); -} - -void li_stream_attach(liStream *stream, liJobQueue *jobqueue) { - stream->jobqueue = jobqueue; - li_stream_again_later(stream); -} - -void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *limit) { - assert(NULL != limit); - - li_cqlimit_acquire(limit); - if (NULL == first) { - while (NULL != last && NULL == last->out->limit) { - if (limit == last->out->limit) break; - li_chunkqueue_set_limit(last->out, limit); - if (NULL != last->cb) { - li_stream_acquire(last); - last->cb(last, LI_STREAM_NEW_CQLIMIT); - if (NULL != last->source) { - last = last->source; - li_stream_release(last->dest); - } else { - li_stream_release(last); - last = NULL; - } - } else { - last = last->source; - } - } - } else { - gboolean reached_last = (NULL == last); - while (NULL != first && !(reached_last && NULL != first->out->limit)) { - if (limit == first->out->limit) break; - if (first == last) reached_last = TRUE; - li_chunkqueue_set_limit(first->out, limit); - if (NULL != first->cb) { - li_stream_acquire(first); - first->cb(first, LI_STREAM_NEW_CQLIMIT); - if (NULL != first->dest) { - first = first->dest; - li_stream_release(first->source); - } else { - li_stream_release(first); - first = NULL; - } - } else { - first = first->dest; - } - } - } - li_cqlimit_release(limit); -} - - -static void stream_plug_cb(liStream *stream, liStreamEvent event) { - switch (event) { - case LI_STREAM_NEW_DATA: - if (!stream->out->is_closed && NULL != stream->source) { - li_chunkqueue_steal_all(stream->out, stream->source->out); - stream->out->is_closed = stream->out->is_closed || stream->source->out->is_closed; - li_stream_notify_later(stream); - } - if (stream->out->is_closed) { - li_stream_disconnect(stream); - } - break; - case LI_STREAM_DISCONNECTED_DEST: - li_stream_disconnect(stream); - break; - case LI_STREAM_DESTROY: - g_slice_free(liStream, stream); - break; - default: - break; - } -} - -liStream* li_stream_plug_new(liJobQueue *jobqueue) { - liStream *stream = g_slice_new0(liStream); - li_stream_init(stream, jobqueue, stream_plug_cb); - return stream; -} - -static void stream_null_cb(liStream *stream, liStreamEvent event) { - switch (event) { - case LI_STREAM_NEW_DATA: - if (NULL == stream->source) return; - li_chunkqueue_skip_all(stream->source->out); - if (stream->source->out->is_closed) li_stream_disconnect(stream); - break; - case LI_STREAM_DESTROY: - g_slice_free(liStream, stream); - break; - default: - break; - } -} - -liStream* li_stream_null_new(liJobQueue *jobqueue) { - liStream *stream = g_slice_new0(liStream); - li_stream_init(stream, jobqueue, stream_null_cb); - stream->out->is_closed = TRUE; - return stream; -} - - -static void iostream_destroy(liIOStream *iostream) { - if (0 < iostream->stream_out.refcount || 0 < iostream->stream_in.refcount) return; - iostream->stream_out.refcount = iostream->stream_in.refcount = 1; - - if (NULL != iostream->stream_in_limit) { - if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) { - iostream->stream_in_limit->io_watcher = NULL; - } - li_cqlimit_release(iostream->stream_in_limit); - iostream->stream_in_limit = NULL; - } - - if (NULL != iostream->write_timeout_queue) { - li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem); - iostream->write_timeout_queue = NULL; - } - - ev_io_stop(iostream->wrk->loop, &iostream->io_watcher); - - iostream->cb(iostream, LI_IOSTREAM_DESTROY); - - assert(1 == iostream->stream_out.refcount); - assert(1 == iostream->stream_in.refcount); - - g_slice_free(liIOStream, iostream); -} - -static void iostream_in_cb(liStream *stream, liStreamEvent event) { - liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_in); - - switch (event) { - case LI_STREAM_NEW_DATA: - if (0 == li_chunkqueue_limit_available(stream->out)) { - /* locked */ - return; - } - if (iostream->can_read) { - goffset curoutlen = stream->out->length; - gboolean curout_closed = stream->out->is_closed; - - iostream->cb(iostream, LI_IOSTREAM_READ); - - if (curoutlen != stream->out->length || curout_closed != stream->out->is_closed) { - li_stream_notify_later(stream); - } - - if (-1 == iostream->io_watcher.fd) return; - - if (iostream->can_read) { - li_stream_again_later(stream); - } - } - if (!iostream->can_read && !iostream->in_closed) { - li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ); - } - if (!iostream->can_write && !iostream->out_closed) { - li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE); - } - break; - case LI_STREAM_NEW_CQLIMIT: - if (NULL != iostream->stream_in_limit) { - if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) { - iostream->stream_in_limit->io_watcher = NULL; - } - li_cqlimit_release(iostream->stream_in_limit); - } - if (stream->out->limit) { - stream->out->limit->io_watcher = &iostream->io_watcher; - li_cqlimit_acquire(stream->out->limit); - } - iostream->stream_in_limit = stream->out->limit; - break; - case LI_STREAM_CONNECTED_SOURCE: - /* there is no incoming data */ - li_stream_disconnect(stream); - break; - case LI_STREAM_CONNECTED_DEST: - iostream->cb(iostream, LI_IOSTREAM_CONNECTED_DEST); - break; - case LI_STREAM_DISCONNECTED_DEST: - iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_DEST); - break; - case LI_STREAM_DESTROY: - iostream->can_read = FALSE; - iostream_destroy(iostream); - break; - default: - break; - } -} - -static void iostream_out_cb(liStream *stream, liStreamEvent event) { - liIOStream *iostream = LI_CONTAINER_OF(stream, liIOStream, stream_out); - - switch (event) { - case LI_STREAM_NEW_DATA: - if (iostream->can_write) { - iostream->cb(iostream, LI_IOSTREAM_WRITE); - if (NULL != iostream->write_timeout_queue) { - if (stream->out->length > 0) { - if (!iostream->write_timeout_elem.queued || (iostream->write_timeout_elem.ts + 1.0) < ev_now(iostream->wrk->loop)) { - li_waitqueue_push(iostream->write_timeout_queue, &iostream->write_timeout_elem); - } - } else { - li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem); - } - } - - if (-1 == iostream->io_watcher.fd) return; - - if (iostream->can_write) { - if (stream->out->length > 0 || stream->out->is_closed) { - li_stream_again_later(stream); - } - } - } - if (!iostream->can_read && !iostream->in_closed) { - li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_READ); - } - if (!iostream->can_write && !iostream->out_closed) { - li_ev_io_add_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE); - } - break; - case LI_STREAM_CONNECTED_DEST: - /* there is no outgoing data */ - li_stream_disconnect_dest(stream); - break; - case LI_STREAM_CONNECTED_SOURCE: - iostream->cb(iostream, LI_IOSTREAM_CONNECTED_SOURCE); - break; - case LI_STREAM_DISCONNECTED_SOURCE: - iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_SOURCE); - break; - case LI_STREAM_DESTROY: - iostream->can_write = FALSE; - iostream_destroy(iostream); - break; - default: - break; - } -} - -static void iostream_io_cb(struct ev_loop *loop, ev_io *w, int revents) { - liIOStream *iostream = (liIOStream*) w->data; - gboolean do_write = FALSE; - UNUSED(loop); - - li_ev_io_rem_events(iostream->wrk->loop, &iostream->io_watcher, EV_WRITE | EV_READ); - - if (0 != (revents & EV_WRITE) && !iostream->can_write && iostream->stream_out.refcount > 0) { - iostream->can_write = TRUE; - do_write = TRUE; - li_stream_acquire(&iostream->stream_out); /* keep out stream alive during li_stream_again(&iostream->stream_in) */ - } - - if (0 != (revents & EV_READ) && !iostream->can_read && iostream->stream_in.refcount > 0) { - iostream->can_read = TRUE; - li_stream_again_later(&iostream->stream_in); - } - - if (do_write) { - li_stream_again_later(&iostream->stream_out); - li_stream_release(&iostream->stream_out); - } -} - -liIOStream* li_iostream_new(liWorker *wrk, int fd, liIOStreamCB cb, gpointer data) { - liIOStream *iostream = g_slice_new0(liIOStream); - - li_stream_init(&iostream->stream_in, &wrk->jobqueue, iostream_in_cb); - li_stream_init(&iostream->stream_out, &wrk->jobqueue, iostream_out_cb); - iostream->stream_in_limit = NULL; - - iostream->write_timeout_queue = NULL; - - iostream->wrk = wrk; - ev_io_init(&iostream->io_watcher, iostream_io_cb, fd, EV_READ); - iostream->io_watcher.data = iostream; - - iostream->in_closed = iostream->out_closed = iostream->can_read = FALSE; - iostream->can_write = TRUE; - - iostream->cb = cb; - iostream->data = data; - - ev_io_start(iostream->wrk->loop, &iostream->io_watcher); - - return iostream; -} - -void li_iostream_acquire(liIOStream* iostream) { - li_stream_acquire(&iostream->stream_in); - li_stream_acquire(&iostream->stream_out); -} - -void li_iostream_release(liIOStream* iostream) { - li_stream_release(&iostream->stream_in); - li_stream_release(&iostream->stream_out); -} - -int li_iostream_reset(liIOStream *iostream) { - int fd; - if (NULL == iostream) return -1; - - fd = iostream->io_watcher.fd; - if (NULL != iostream->wrk->loop) { - ev_io_stop(iostream->wrk->loop, &iostream->io_watcher); - } - ev_io_set(&iostream->io_watcher, -1, 0); - - if (NULL != iostream->write_timeout_queue) { - li_waitqueue_remove(iostream->write_timeout_queue, &iostream->write_timeout_elem); - iostream->write_timeout_queue = NULL; - } - - li_stream_disconnect(&iostream->stream_out); - li_stream_disconnect_dest(&iostream->stream_in); - - li_iostream_release(iostream); - - return fd; -} - -void li_iostream_detach(liIOStream *iostream) { - if (NULL != iostream->wrk) { - ev_io_stop(iostream->wrk->loop, &iostream->io_watcher); - iostream->wrk = NULL; - } - - if (NULL != iostream->stream_in_limit) { - if (&iostream->io_watcher == iostream->stream_in_limit->io_watcher) { - iostream->stream_in_limit->io_watcher = NULL; - } - li_cqlimit_release(iostream->stream_in_limit); - iostream->stream_in_limit = NULL; - } - - li_stream_detach(&iostream->stream_in); - li_stream_detach(&iostream->stream_out); -} - -void li_iostream_attach(liIOStream *iostream, liWorker *wrk) { - assert(NULL == iostream->wrk); - - li_stream_attach(&iostream->stream_in, &wrk->jobqueue); - li_stream_attach(&iostream->stream_out, &wrk->jobqueue); - - iostream->wrk = wrk; - ev_io_start(iostream->wrk->loop, &iostream->io_watcher); -} diff --git a/src/main/stream_http_response.c b/src/main/stream_http_response.c deleted file mode 100644 index 31060d7..0000000 --- a/src/main/stream_http_response.c +++ /dev/null @@ -1,79 +0,0 @@ -#include - -typedef struct liStreamHttpResponse liStreamHttpResponse; - -struct liStreamHttpResponse { - liHttpResponseCtx parse_response_ctx; - - liStream stream; - liVRequest *vr; - gboolean response_headers_finished; -}; - -static void stream_http_respone_data(liStreamHttpResponse* shr) { - if (NULL == shr->stream.source) return; - - if (!shr->response_headers_finished) { - switch (li_http_response_parse(shr->vr, &shr->parse_response_ctx)) { - case LI_HANDLER_GO_ON: - shr->response_headers_finished = TRUE; - li_vrequest_indirect_headers_ready(shr->vr); - break; - case LI_HANDLER_ERROR: - VR_ERROR(shr->vr, "%s", "Parsing response header failed"); - li_vrequest_error(shr->vr); - return; - case LI_HANDLER_WAIT_FOR_EVENT: - if (shr->stream.source == NULL || shr->stream.source->out->is_closed) { - VR_ERROR(shr->vr, "%s", "Parsing response header failed (eos)"); - li_vrequest_error(shr->vr); - } - default: - return; - } - } - - li_chunkqueue_steal_all(shr->stream.out, shr->stream.source->out); - if (shr->stream.source->out->is_closed) { - shr->stream.out->is_closed = TRUE; - li_stream_disconnect(&shr->stream); - } - li_stream_notify(&shr->stream); -} - - -static void stream_http_respone_cb(liStream *stream, liStreamEvent event) { - liStreamHttpResponse* shr = LI_CONTAINER_OF(stream, liStreamHttpResponse, stream); - - switch (event) { - case LI_STREAM_NEW_DATA: - stream_http_respone_data(shr); - break; - case LI_STREAM_DISCONNECTED_DEST: - li_stream_disconnect(stream); - break; - case LI_STREAM_DISCONNECTED_SOURCE: - if (NULL != stream->dest && !stream->out->is_closed) { - /* "abort" */ - li_stream_disconnect_dest(stream); - } - break; - case LI_STREAM_DESTROY: - li_http_response_parser_clear(&shr->parse_response_ctx); - g_slice_free(liStreamHttpResponse, shr); - break; - default: - break; - } -} - -LI_API liStream* li_stream_http_response_handle(liStream *http_in, liVRequest *vr, gboolean accept_cgi, gboolean accept_nph) { - liStreamHttpResponse *shr = g_slice_new0(liStreamHttpResponse); - shr->response_headers_finished = FALSE; - shr->vr = vr; - li_stream_init(&shr->stream, &vr->wrk->jobqueue, stream_http_respone_cb); - li_http_response_parser_init(&shr->parse_response_ctx, &vr->response, http_in->out, - accept_cgi, accept_nph); - li_stream_connect(http_in, &shr->stream); - return &shr->stream; -} diff --git a/src/main/stream_simple_socket.c b/src/main/stream_simple_socket.c deleted file mode 100644 index 8aa265a..0000000 --- a/src/main/stream_simple_socket.c +++ /dev/null @@ -1,145 +0,0 @@ - -#include - -void stream_simple_socket_close(liIOStream *stream, gboolean aborted) { - int fd = stream->io_watcher.fd; - - ev_io_stop(stream->wrk->loop, &stream->io_watcher); - - if (-1 == fd) return; - - stream->out_closed = stream->in_closed = TRUE; - stream->can_read = stream->can_write = FALSE; - if (NULL != stream->stream_in.out) { - stream->stream_in.out->is_closed = TRUE; - } - - if (aborted || stream->in_closed) { - li_iostream_acquire(stream); - fd = li_iostream_reset(stream); - if (-1 != fd) { - shutdown(fd, SHUT_RDWR); - close(fd); - } - } else { - stream->io_watcher.fd = -1; - - shutdown(fd, SHUT_WR); - li_stream_disconnect(&stream->stream_out); - ERROR(stream->wrk->srv, "Adding fd %i to closing sockets", fd); - li_worker_add_closing_socket(stream->wrk, fd); - } -} - -static void stream_simple_socket_read(liIOStream *stream, gpointer *data) { - liNetworkStatus res; - GError *err = NULL; - liWorker *wrk = stream->wrk; - - liChunkQueue *raw_in = stream->stream_in.out; - - if (NULL == *data && NULL != wrk->network_read_buf) { - /* reuse worker buf if needed */ - *data = wrk->network_read_buf; - wrk->network_read_buf = NULL; - } - - { - liBuffer *raw_in_buffer = *data; - res = li_network_read(stream->io_watcher.fd, raw_in, &raw_in_buffer, &err); - *data = raw_in_buffer; - } - - if (NULL == wrk->network_read_buf && NULL != *data - && 1 == g_atomic_int_get(&((liBuffer*)*data)->refcount)) { - /* move buffer back to worker if we didn't use it */ - wrk->network_read_buf = *data; - *data = NULL; - } - - switch (res) { - case LI_NETWORK_STATUS_SUCCESS: - break; - case LI_NETWORK_STATUS_FATAL_ERROR: - ERROR(wrk->srv, "network read fatal error: %s", NULL != err ? err->message : "(unknown)"); - g_error_free(err); - stream_simple_socket_close(stream, TRUE); - break; - case LI_NETWORK_STATUS_CONNECTION_CLOSE: - li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_READ); - stream->stream_in.out->is_closed = TRUE; - stream->in_closed = TRUE; - stream->can_read = FALSE; - break; - case LI_NETWORK_STATUS_WAIT_FOR_EVENT: - stream->can_read = FALSE; - break; - } -} - -static void stream_simple_socket_write(liIOStream *stream) { - liNetworkStatus res; - liChunkQueue *raw_out = stream->stream_out.out; - liChunkQueue *from = stream->stream_out.source->out; - - li_chunkqueue_steal_all(raw_out, from); - - if (raw_out->length > 0) { - static const goffset WRITE_MAX = 256*1024; /* 256kB */ - goffset write_max; - GError *err = NULL; - - write_max = WRITE_MAX; - - res = li_network_write(stream->io_watcher.fd, raw_out, write_max, &err); - - switch (res) { - case LI_NETWORK_STATUS_SUCCESS: - break; - case LI_NETWORK_STATUS_FATAL_ERROR: - ERROR(stream->wrk->srv, "network write fatal error: %s", NULL != err ? err->message : "(unknown)"); - g_error_free(err); - stream_simple_socket_close(stream, TRUE); - break; - case LI_NETWORK_STATUS_CONNECTION_CLOSE: - stream_simple_socket_close(stream, TRUE); - break; - case LI_NETWORK_STATUS_WAIT_FOR_EVENT: - stream->can_write = FALSE; - break; - } - } - - if (0 == raw_out->length && from->is_closed) { - int fd = stream->io_watcher.fd; - li_ev_io_rem_events(stream->wrk->loop, &stream->io_watcher, EV_WRITE); - if (-1 != fd) shutdown(fd, SHUT_WR); - stream->out_closed = TRUE; - stream->can_write = FALSE; - li_stream_disconnect(&stream->stream_out); - } -} - -void stream_simple_socket_io_cb(liIOStream *stream, liIOStreamEvent event) { - stream_simple_socket_io_cb_with_context(stream, event, &stream->data); -} - -void stream_simple_socket_io_cb_with_context(liIOStream *stream, liIOStreamEvent event, gpointer *data) { - // TODO remove debug - ERROR(stream->wrk->srv, "stream_simple_socket_io_cb: %p, %i", (void*)stream, event); - switch (event) { - case LI_IOSTREAM_READ: - stream_simple_socket_read(stream, data); - break; - case LI_IOSTREAM_WRITE: - stream_simple_socket_write(stream); - break; - case LI_IOSTREAM_DESTROY: - if (NULL != *data) { - li_buffer_release(*data); - *data = NULL; - } - default: - break; - } -} diff --git a/src/main/wscript b/src/main/wscript index c589982..6149b21 100644 --- a/src/main/wscript +++ b/src/main/wscript @@ -52,9 +52,6 @@ def build(bld): response.c server.c stat_cache.c - stream.c - stream_http_response.c - stream_simple_socket.c throttle.c url_parser.rl value.c