wait for request body in dynamic backends, cache them on disk
parent
c40b1da839
commit
3aa78c1002
|
@ -39,7 +39,6 @@
|
|||
#include <lighttpd/request.h>
|
||||
#include <lighttpd/response.h>
|
||||
#include <lighttpd/environment.h>
|
||||
#include <lighttpd/filter_buffer_on_disk.h>
|
||||
#include <lighttpd/virtualrequest.h>
|
||||
#include <lighttpd/stat_cache.h>
|
||||
#include <lighttpd/mimetype.h>
|
||||
|
|
|
@ -1,26 +1,11 @@
|
|||
#ifndef _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_
|
||||
#define _LIGHTTPD_FILTER_BUFFER_ON_DISK_H_
|
||||
|
||||
#ifndef _LIGHTTPD_BASE_H_
|
||||
#error Please include <lighttpd/base.h> instead of this file
|
||||
#endif
|
||||
#include <lighttpd/base.h>
|
||||
|
||||
/* initialize with zero */
|
||||
typedef struct {
|
||||
/* internal state */
|
||||
liChunkFile *tempfile;
|
||||
goffset flush_pos, write_pos;
|
||||
|
||||
/* config */
|
||||
goffset flush_limit; /* -1: wait for end-of-stream, n >= 0: if more than n bytes have been written, the next part of the file gets forwarded to out */
|
||||
gboolean split_on_file_chunks; /* start a new file on FILE_CHUNK (those are not written to the file) */
|
||||
} liFilterBufferOnDiskState;
|
||||
|
||||
LI_API liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liChunkQueue *in, liFilterBufferOnDiskState *state);
|
||||
LI_API void li_filter_buffer_on_disk_reset(liFilterBufferOnDiskState *state);
|
||||
|
||||
|
||||
LI_API liHandlerResult li_filter_buffer_on_disk_cb(liVRequest *vr, liFilter *f);
|
||||
LI_API void li_filter_buffer_on_disk_free_cb(liVRequest *vr, liFilter *f);
|
||||
/* flush_limit: -1: wait for end-of-stream, n >= 0: if more than n bytes have been written, the next part of the file gets forwarded to out */
|
||||
/* split_on_file_chunks: start a new file on FILE_CHUNK (those are not written to the file) */
|
||||
LI_API liStream* li_filter_buffer_on_disk(liVRequest *vr, goffset flush_limit, gboolean split_on_file_chunks);
|
||||
LI_API void li_filter_buffer_on_disk_stop(liStream *stream);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -97,7 +97,7 @@ struct liVRequest {
|
|||
liStream *filters_in_last, *filters_out_last;
|
||||
liStream *filters_in_first, *filters_out_first;
|
||||
|
||||
liFilterBufferOnDiskState in_buffer_state;
|
||||
liStream *in_buffer_on_disk_stream, *wait_for_request_body_stream;
|
||||
|
||||
liPlugin *backend;
|
||||
liStream *backend_source;
|
||||
|
@ -111,6 +111,13 @@ struct liVRequest {
|
|||
GPtrArray *stat_cache_entries;
|
||||
};
|
||||
|
||||
#define LI_VREQUEST_WAIT_FOR_REQUEST_BODY(vr) \
|
||||
do { \
|
||||
if (!li_vrequest_wait_for_request_body(vr)) { \
|
||||
return LI_HANDLER_WAIT_FOR_EVENT; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LI_VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \
|
||||
do { \
|
||||
if (vr->state == LI_VRS_HANDLE_REQUEST_HEADERS) { \
|
||||
|
@ -128,19 +135,26 @@ LI_API void li_vrequest_free(liVRequest *vr);
|
|||
*/
|
||||
LI_API void li_vrequest_reset(liVRequest *vr, gboolean keepalive);
|
||||
|
||||
/* Signals an internal error; handles the error in the _next_ loop */
|
||||
LI_API void li_vrequest_error(liVRequest *vr);
|
||||
|
||||
LI_API void li_vrequest_backend_overloaded(liVRequest *vr);
|
||||
LI_API void li_vrequest_backend_dead(liVRequest *vr);
|
||||
LI_API void li_vrequest_backend_error(liVRequest *vr, liBackendError berror);
|
||||
LI_API void li_vrequest_backend_finished(liVRequest *vr); /* action.c */
|
||||
|
||||
/****************************************************/
|
||||
/* called by connection */
|
||||
/****************************************************/
|
||||
/* resets fields which weren't reset in favor of keep-alive tracking */
|
||||
LI_API void li_vrequest_start(liVRequest *vr);
|
||||
/* received all request headers */
|
||||
LI_API void li_vrequest_handle_request_headers(liVRequest *vr);
|
||||
|
||||
/* called by connection IO handling */
|
||||
LI_API void li_vrequest_update_stats_in(liVRequest *vr, goffset transferred);
|
||||
LI_API void li_vrequest_update_stats_out(liVRequest *vr, goffset transferred);
|
||||
|
||||
/****************************************************/
|
||||
/* called by actions handling the request */
|
||||
/****************************************************/
|
||||
/* returns TRUE if request body is present
|
||||
* or shouldn't be waited for (if caching on disk is disabled and liCQLimit hit, ...)
|
||||
* if it returns FALSE it will trigger li_vrequest_joblist_append later */
|
||||
LI_API gboolean li_vrequest_wait_for_request_body(liVRequest *vr);
|
||||
|
||||
/* response completely ready; use this only in action callbacks */
|
||||
LI_API gboolean li_vrequest_handle_direct(liVRequest *vr);
|
||||
/* check whether the request is already handled */
|
||||
|
@ -153,16 +167,22 @@ LI_API void li_vrequest_indirect_connect(liVRequest *vr, liStream *backend_drain
|
|||
/* received all response headers/status code - call once from your indirect handler */
|
||||
LI_API void li_vrequest_indirect_headers_ready(liVRequest *vr);
|
||||
|
||||
/* Signals an internal error; handles the error in the _next_ loop */
|
||||
LI_API void li_vrequest_error(liVRequest *vr);
|
||||
|
||||
LI_API void li_vrequest_backend_overloaded(liVRequest *vr);
|
||||
LI_API void li_vrequest_backend_dead(liVRequest *vr);
|
||||
LI_API void li_vrequest_backend_error(liVRequest *vr, liBackendError berror);
|
||||
LI_API void li_vrequest_backend_finished(liVRequest *vr); /* action.c */
|
||||
|
||||
|
||||
|
||||
LI_API void li_vrequest_state_machine(liVRequest *vr);
|
||||
LI_API void li_vrequest_joblist_append(liVRequest *vr);
|
||||
LI_API liJobRef* li_vrequest_get_ref(liVRequest *vr);
|
||||
|
||||
LI_API gboolean li_vrequest_redirect(liVRequest *vr, GString *uri);
|
||||
|
||||
LI_API gboolean li_vrequest_redirect_directory(liVRequest *vr);
|
||||
|
||||
/* updates worker stats too */
|
||||
LI_API void li_vrequest_update_stats_in(liVRequest *vr, goffset transferred);
|
||||
LI_API void li_vrequest_update_stats_out(liVRequest *vr, goffset transferred);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -458,8 +458,8 @@ void li_connection_start(liConnection *con, liSocketAddress remote_addr, int s,
|
|||
|
||||
assert(NULL != con->con_sock.raw_in || NULL != con->con_sock.raw_out);
|
||||
|
||||
li_chunkqueue_use_limit(con->con_sock.raw_in->out, 512*1024);
|
||||
li_chunkqueue_use_limit(con->con_sock.raw_out->out, 512*1024);
|
||||
li_chunkqueue_use_limit(con->con_sock.raw_in->out, 256*1024);
|
||||
li_chunkqueue_use_limit(con->con_sock.raw_out->out, 256*1024);
|
||||
|
||||
li_stream_connect(&con->out, con->con_sock.raw_out);
|
||||
li_stream_connect(con->con_sock.raw_in, &con->in);
|
||||
|
|
|
@ -1,17 +1,71 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
#include <lighttpd/filter_buffer_on_disk.h>
|
||||
|
||||
typedef liFilterBufferOnDiskState bod_state;
|
||||
typedef struct bod_state bod_state;
|
||||
|
||||
struct bod_state {
|
||||
liStream stream;
|
||||
liVRequest *vr;
|
||||
|
||||
/* internal state */
|
||||
liChunkFile *tempfile;
|
||||
goffset flush_pos, write_pos;
|
||||
|
||||
/* config */
|
||||
goffset flush_limit;
|
||||
gboolean split_on_file_chunks;
|
||||
};
|
||||
|
||||
/* flush current tempfile chunk. ignores out->is_closed. */
|
||||
static void bod_flush(bod_state *state) {
|
||||
liChunkQueue *out = state->stream.out;
|
||||
if (NULL != out && NULL != state->tempfile && state->write_pos > state->flush_pos) {
|
||||
li_chunkqueue_append_chunkfile(out, state->tempfile, state->flush_pos, state->write_pos - state->flush_pos);
|
||||
state->flush_pos = state->write_pos;
|
||||
li_stream_notify(&state->stream);
|
||||
}
|
||||
}
|
||||
|
||||
/* flush if flush_limit is reached */
|
||||
static void bod_autoflush(bod_state *state) {
|
||||
if (-1 != state->flush_limit && state->write_pos - state->flush_pos > state->flush_limit) {
|
||||
bod_flush(state);
|
||||
}
|
||||
}
|
||||
|
||||
/* close current file, flush it before if necessary */
|
||||
static void bod_close(bod_state *state) {
|
||||
if (NULL != state->tempfile) {
|
||||
bod_flush(state);
|
||||
li_chunkfile_release(state->tempfile);
|
||||
state->tempfile = NULL;
|
||||
}
|
||||
state->flush_pos = state->write_pos = 0;
|
||||
}
|
||||
|
||||
static gboolean bod_open(liVRequest *vr, bod_state *state) {
|
||||
/* abort buffering, disconnect streams */
|
||||
static void bod_error(bod_state *state) {
|
||||
bod_close(state);
|
||||
li_stream_reset(&state->stream);
|
||||
state->vr = NULL;
|
||||
}
|
||||
|
||||
/* stop buffering, forward everyting */
|
||||
static void bod_stop(bod_state *state) {
|
||||
bod_close(state);
|
||||
if (NULL != state->stream.source && !state->stream.out->is_closed) {
|
||||
li_chunkqueue_steal_all(state->stream.out, state->stream.source->out);
|
||||
if (state->stream.source->out->is_closed) {
|
||||
state->stream.out->is_closed = TRUE;
|
||||
li_stream_disconnect(&state->stream);
|
||||
}
|
||||
li_stream_notify(&state->stream);
|
||||
}
|
||||
state->vr = NULL;
|
||||
}
|
||||
|
||||
static gboolean bod_open(bod_state *state) {
|
||||
if (NULL == state->tempfile) {
|
||||
gint fd;
|
||||
GString *tmpfilename;
|
||||
|
@ -24,8 +78,9 @@ static gboolean bod_open(liVRequest *vr, bod_state *state) {
|
|||
|
||||
fd = g_mkstemp(tmpfilename->str);
|
||||
if (-1 == fd) {
|
||||
VR_ERROR(vr, "g_mkstemp failed: %s", g_strerror(errno));
|
||||
VR_ERROR(state->vr, "g_mkstemp failed: %s", g_strerror(errno));
|
||||
g_string_free(tmpfilename, TRUE);
|
||||
bod_stop(state);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
|
@ -39,28 +94,22 @@ static gboolean bod_open(liVRequest *vr, bod_state *state) {
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
static void bod_flush(liChunkQueue *out, bod_state *state) {
|
||||
if (state->tempfile && state->write_pos > state->flush_pos) {
|
||||
li_chunkqueue_append_chunkfile(out, state->tempfile, state->flush_pos, state->write_pos - state->flush_pos);
|
||||
state->flush_pos = state->write_pos;
|
||||
}
|
||||
}
|
||||
|
||||
static void bod_autoflush(liChunkQueue *out, bod_state *state) {
|
||||
if (-1 != state->flush_limit && state->tempfile && state->write_pos - state->flush_pos > state->flush_limit) {
|
||||
li_chunkqueue_append_chunkfile(out, state->tempfile, state->flush_pos, state->write_pos - state->flush_pos);
|
||||
state->flush_pos = state->write_pos;
|
||||
}
|
||||
}
|
||||
|
||||
liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liChunkQueue *in, bod_state *state) {
|
||||
UNUSED(vr);
|
||||
static void bod_handle_data(bod_state *state) {
|
||||
liChunkQueue *out = state->stream.out;
|
||||
liChunkQueue *in;
|
||||
|
||||
if (out->is_closed) {
|
||||
in->is_closed = TRUE;
|
||||
li_chunkqueue_skip_all(in);
|
||||
li_stream_disconnect(&state->stream);
|
||||
bod_close(state);
|
||||
return LI_HANDLER_GO_ON;
|
||||
return;
|
||||
}
|
||||
|
||||
in = (state->stream.source != NULL) ? state->stream.source->out : NULL;
|
||||
if (NULL == in) goto out;
|
||||
|
||||
if (NULL == state->vr) {
|
||||
li_chunkqueue_steal_all(out, in);
|
||||
goto out;
|
||||
}
|
||||
|
||||
while (in->length > 0) {
|
||||
|
@ -70,19 +119,22 @@ liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liCh
|
|||
char *data = NULL;
|
||||
GError *err;
|
||||
|
||||
assert(UNUSED_CHUNK != c->type);
|
||||
switch (c->type) {
|
||||
case UNUSED_CHUNK: return LI_HANDLER_ERROR;
|
||||
case UNUSED_CHUNK:
|
||||
/* shouldn't happen anyway, but stealing it is ok here too */
|
||||
case FILE_CHUNK:
|
||||
bod_flush(out, state);
|
||||
if (state->split_on_file_chunks) {
|
||||
bod_close(state);
|
||||
} else {
|
||||
bod_flush(state);
|
||||
}
|
||||
li_chunkqueue_steal_chunk(out, in);
|
||||
break;
|
||||
case STRING_CHUNK:
|
||||
case MEM_CHUNK:
|
||||
case BUFFER_CHUNK:
|
||||
if (!bod_open(vr, state)) return LI_HANDLER_ERROR;
|
||||
if (!bod_open(state)) return;
|
||||
|
||||
length = li_chunk_length(c);
|
||||
ci = li_chunkqueue_iter(in);
|
||||
|
@ -90,10 +142,11 @@ liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liCh
|
|||
err = NULL;
|
||||
if (LI_HANDLER_GO_ON != li_chunkiter_read(ci, 0, length, &data, &data_len, &err)) {
|
||||
if (NULL != err) {
|
||||
VR_ERROR(vr, "%s", err->message);
|
||||
VR_ERROR(state->vr, "%s", err->message);
|
||||
g_error_free(err);
|
||||
}
|
||||
return LI_HANDLER_ERROR;
|
||||
bod_error(state);
|
||||
return;
|
||||
}
|
||||
|
||||
while ( data_len > 0 ) {
|
||||
|
@ -107,8 +160,9 @@ liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liCh
|
|||
default: break;
|
||||
}
|
||||
|
||||
VR_ERROR(vr, "pwrite failed: %s", g_strerror(errno));
|
||||
return LI_HANDLER_ERROR;
|
||||
VR_ERROR(state->vr, "pwrite failed: %s", g_strerror(errno));
|
||||
bod_stop(state); /* write failures are not critical */
|
||||
return;
|
||||
}
|
||||
|
||||
data += r;
|
||||
|
@ -122,44 +176,66 @@ liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liCh
|
|||
}
|
||||
}
|
||||
|
||||
bod_autoflush(out, state);
|
||||
bod_autoflush(state);
|
||||
|
||||
if (in->is_closed) {
|
||||
bod_flush(out, state);
|
||||
out:
|
||||
if (NULL == in || in->is_closed) {
|
||||
out->is_closed = TRUE;
|
||||
bod_close(state); /* close/flush ignores out->is_closed */
|
||||
li_stream_notify(&state->stream); /* if no flush happened we still notify */
|
||||
}
|
||||
}
|
||||
|
||||
static void bod_cb(liStream *stream, liStreamEvent event) {
|
||||
bod_state *state = LI_CONTAINER_OF(stream, bod_state, stream);
|
||||
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
bod_handle_data(state);
|
||||
break;
|
||||
case LI_STREAM_NEW_CQLIMIT:
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_DEST:
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_SOURCE:
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_DEST:
|
||||
if (!state->stream.out->is_closed || 0 != state->stream.out->length) {
|
||||
li_stream_disconnect(stream);
|
||||
bod_close(state);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_SOURCE:
|
||||
if (!state->stream.out->is_closed) {
|
||||
li_stream_disconnect_dest(stream);
|
||||
bod_close(state);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
bod_close(state);
|
||||
return LI_HANDLER_GO_ON;
|
||||
}
|
||||
return LI_HANDLER_GO_ON;
|
||||
}
|
||||
|
||||
void li_filter_buffer_on_disk_reset(bod_state *state) {
|
||||
bod_close(state);
|
||||
}
|
||||
|
||||
liHandlerResult li_filter_buffer_on_disk_cb(liVRequest *vr, liFilter *f) {
|
||||
goffset lim_avail;
|
||||
bod_state *state = f->param;
|
||||
|
||||
if (NULL == state) state = &vr->in_buffer_state;
|
||||
|
||||
if (state->tempfile || vr->request.content_length < 0 || vr->request.content_length > 64*1024 ||
|
||||
((lim_avail = li_chunkqueue_limit_available(f->in)) <= 32*1024 && lim_avail >= 0)) {
|
||||
return li_filter_buffer_on_disk(vr, f->out, f->in, state);
|
||||
} else {
|
||||
li_chunkqueue_steal_all(f->out, f->in);
|
||||
if (f->in->is_closed) f->out->is_closed = TRUE;
|
||||
return LI_HANDLER_GO_ON;
|
||||
}
|
||||
}
|
||||
|
||||
void li_filter_buffer_on_disk_free_cb(liVRequest *vr, liFilter *f) {
|
||||
bod_state *state = f->param;
|
||||
|
||||
if (NULL == state) {
|
||||
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
|
||||
} else {
|
||||
li_filter_buffer_on_disk_reset(state);
|
||||
g_slice_free(bod_state, state);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
liStream* li_filter_buffer_on_disk(liVRequest *vr, goffset flush_limit, gboolean split_on_file_chunks) {
|
||||
bod_state *state = g_slice_new0(bod_state);
|
||||
state->vr = vr;
|
||||
state->flush_limit = flush_limit;
|
||||
state->split_on_file_chunks = split_on_file_chunks;
|
||||
li_stream_init(&state->stream, &vr->wrk->loop, bod_cb);
|
||||
return &state->stream;
|
||||
}
|
||||
|
||||
void li_filter_buffer_on_disk_stop(liStream *stream) {
|
||||
bod_state *state;
|
||||
|
||||
if (NULL == stream) return;
|
||||
assert(bod_cb == stream->cb);
|
||||
|
||||
li_stream_acquire(stream);
|
||||
state = LI_CONTAINER_OF(stream, bod_state, stream);
|
||||
bod_stop(state);
|
||||
li_stream_again_later(stream);
|
||||
li_stream_release(stream);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
#include <lighttpd/plugin_core.h>
|
||||
#include <lighttpd/filter_buffer_on_disk.h>
|
||||
|
||||
static void vrequest_job_cb(liJob *job) {
|
||||
liVRequest *vr = LI_CONTAINER_OF(job, liVRequest, job);
|
||||
|
@ -40,9 +41,6 @@ liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) {
|
|||
|
||||
li_vrequest_filters_init(vr);
|
||||
|
||||
vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */
|
||||
vr->in_buffer_state.split_on_file_chunks = FALSE;
|
||||
|
||||
li_action_stack_init(&vr->action_stack);
|
||||
|
||||
li_job_init(&vr->job, vrequest_job_cb);
|
||||
|
@ -55,17 +53,13 @@ liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) {
|
|||
void li_vrequest_free(liVRequest* vr) {
|
||||
liServer *srv = vr->wrk->srv;
|
||||
|
||||
if (NULL != vr->backend_drain) {
|
||||
li_stream_reset(vr->backend_drain);
|
||||
li_stream_release(vr->backend_drain);
|
||||
vr->backend_drain = NULL;
|
||||
}
|
||||
if (NULL != vr->backend_source) {
|
||||
li_stream_reset(vr->backend_source);
|
||||
li_stream_release(vr->backend_source);
|
||||
vr->backend_source = NULL;
|
||||
vr->direct_out = NULL;
|
||||
}
|
||||
li_stream_safe_reset_and_release(&vr->backend_drain);
|
||||
vr->direct_out = NULL;
|
||||
li_stream_safe_reset_and_release(&vr->backend_source);
|
||||
|
||||
li_filter_buffer_on_disk_stop(vr->in_buffer_on_disk_stream);
|
||||
li_stream_safe_reset_and_release(&vr->in_buffer_on_disk_stream);
|
||||
li_stream_safe_reset_and_release(&vr->wait_for_request_body_stream);
|
||||
|
||||
li_action_stack_clear(vr, &vr->action_stack);
|
||||
if (vr->state != LI_VRS_CLEAN) {
|
||||
|
@ -83,8 +77,6 @@ void li_vrequest_free(liVRequest* vr) {
|
|||
|
||||
li_vrequest_filters_clear(vr);
|
||||
|
||||
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
|
||||
|
||||
li_job_clear(&vr->job);
|
||||
|
||||
g_slice_free1(srv->option_def_values->len * sizeof(liOptionValue), vr->options);
|
||||
|
@ -126,6 +118,10 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) {
|
|||
vr->direct_out = NULL;
|
||||
}
|
||||
|
||||
li_filter_buffer_on_disk_stop(vr->in_buffer_on_disk_stream);
|
||||
li_stream_safe_reset_and_release(&vr->in_buffer_on_disk_stream);
|
||||
li_stream_safe_reset_and_release(&vr->wait_for_request_body_stream);
|
||||
|
||||
li_action_stack_reset(vr, &vr->action_stack);
|
||||
if (vr->state != LI_VRS_CLEAN) {
|
||||
li_plugins_handle_vrclose(vr);
|
||||
|
@ -147,10 +143,6 @@ void li_vrequest_reset(liVRequest *vr, gboolean keepalive) {
|
|||
|
||||
li_vrequest_filters_reset(vr);
|
||||
|
||||
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
|
||||
vr->in_buffer_state.flush_limit = -1; /* wait until upload is complete */
|
||||
vr->in_buffer_state.split_on_file_chunks = FALSE;
|
||||
|
||||
li_job_reset(&vr->job);
|
||||
|
||||
while (vr->stat_cache_entries->len > 0 ) {
|
||||
|
@ -220,6 +212,117 @@ void li_vrequest_handle_request_headers(liVRequest *vr) {
|
|||
li_vrequest_joblist_append(vr);
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
liStream stream;
|
||||
|
||||
liVRequest *vr;
|
||||
gboolean have_mem_chunk, ready;
|
||||
} wait_for_request_body_stream;
|
||||
|
||||
static void wait_for_request_body_stream_cb(liStream *stream, liStreamEvent event) {
|
||||
wait_for_request_body_stream *ws = LI_CONTAINER_OF(stream, wait_for_request_body_stream, stream);
|
||||
|
||||
switch (event) {
|
||||
case LI_STREAM_NEW_DATA:
|
||||
if (NULL == ws->stream.source) return;
|
||||
if (ws->have_mem_chunk || ws->ready) {
|
||||
li_chunkqueue_steal_all(ws->stream.out, ws->stream.source->out);
|
||||
} else {
|
||||
liChunkQueue *in = ws->stream.source->out, *out = ws->stream.out;
|
||||
while (in->length > 0) {
|
||||
liChunk *c = li_chunkqueue_first_chunk(in);
|
||||
assert(NULL != c);
|
||||
if (FILE_CHUNK != c->type) {
|
||||
ws->have_mem_chunk = TRUE;
|
||||
li_chunkqueue_steal_all(out, in);
|
||||
break;
|
||||
}
|
||||
li_chunkqueue_steal_chunk(out, in);
|
||||
}
|
||||
}
|
||||
if (ws->stream.source->out->is_closed) {
|
||||
ws->stream.out->is_closed = TRUE;
|
||||
}
|
||||
if (!ws->ready && (ws->stream.out->is_closed ||
|
||||
(ws->have_mem_chunk && li_chunkqueue_limit_available(ws->stream.out) < 1024))) {
|
||||
ws->ready = TRUE;
|
||||
if (NULL != ws->vr) li_vrequest_joblist_append(ws->vr);
|
||||
ws->vr = NULL;
|
||||
}
|
||||
li_stream_notify(stream);
|
||||
break;
|
||||
case LI_STREAM_NEW_CQLIMIT:
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_DEST:
|
||||
ws->ready = TRUE;
|
||||
ws->vr = NULL;
|
||||
break;
|
||||
case LI_STREAM_CONNECTED_SOURCE:
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_DEST:
|
||||
if (!ws->stream.out->is_closed || 0 != ws->stream.out->length) {
|
||||
li_stream_disconnect(stream);
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DISCONNECTED_SOURCE:
|
||||
if (!ws->stream.out->is_closed) {
|
||||
li_stream_disconnect_dest(stream);
|
||||
if (!ws->ready) {
|
||||
ws->ready = TRUE;
|
||||
if (NULL != ws->vr) li_vrequest_joblist_append(ws->vr);
|
||||
ws->vr = NULL;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case LI_STREAM_DESTROY:
|
||||
g_slice_free(wait_for_request_body_stream, ws);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static liStream* wait_for_request_body_stream_new(liVRequest *vr) {
|
||||
wait_for_request_body_stream *ws = g_slice_new0(wait_for_request_body_stream);
|
||||
ws->vr = vr;
|
||||
li_stream_init(&ws->stream, &vr->wrk->loop, wait_for_request_body_stream_cb);
|
||||
return &ws->stream;
|
||||
}
|
||||
|
||||
static gboolean wait_for_request_body_stream_ready(liStream *stream) {
|
||||
wait_for_request_body_stream *ws;
|
||||
|
||||
if (NULL == stream) return FALSE;
|
||||
assert(wait_for_request_body_stream_cb == stream->cb);
|
||||
ws = LI_CONTAINER_OF(stream, wait_for_request_body_stream, stream);
|
||||
return ws->ready;
|
||||
}
|
||||
|
||||
gboolean li_vrequest_wait_for_request_body(liVRequest *vr) {
|
||||
goffset lim_avail;
|
||||
|
||||
/* too late to wait? */
|
||||
if (vr->state > LI_VRS_HANDLE_REQUEST_HEADERS) return TRUE;
|
||||
if (0 == vr->request.content_length) return TRUE;
|
||||
|
||||
if (NULL != vr->wait_for_request_body_stream) {
|
||||
if (wait_for_request_body_stream_ready(vr->wait_for_request_body_stream)) return TRUE;
|
||||
return FALSE; /* still waiting */
|
||||
}
|
||||
|
||||
lim_avail = li_chunkqueue_limit_available(vr->coninfo->req->out);
|
||||
|
||||
vr->wait_for_request_body_stream = wait_for_request_body_stream_new(vr);
|
||||
|
||||
if (vr->request.content_length < 0 || lim_avail < 0 || vr->request.content_length > lim_avail) {
|
||||
vr->in_buffer_on_disk_stream = li_filter_buffer_on_disk(vr, -1, FALSE);
|
||||
li_stream_connect(vr->coninfo->req, vr->in_buffer_on_disk_stream);
|
||||
li_stream_connect(vr->in_buffer_on_disk_stream, vr->wait_for_request_body_stream);
|
||||
} else {
|
||||
li_stream_connect(vr->coninfo->req, vr->wait_for_request_body_stream);
|
||||
}
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
/* response completely ready */
|
||||
gboolean li_vrequest_handle_direct(liVRequest *vr) {
|
||||
if (li_vrequest_handle_indirect(vr, NULL)) {
|
||||
|
@ -253,22 +356,33 @@ gboolean li_vrequest_handle_indirect(liVRequest *vr, liPlugin *p) {
|
|||
}
|
||||
|
||||
void li_vrequest_indirect_connect(liVRequest *vr, liStream *backend_drain, liStream* backend_source) {
|
||||
liStream *req_in;
|
||||
|
||||
assert(LI_VRS_READ_CONTENT == vr->state);
|
||||
assert(NULL != backend_drain);
|
||||
assert(NULL != backend_source);
|
||||
|
||||
li_stream_acquire(backend_drain);
|
||||
li_stream_acquire(backend_source);
|
||||
|
||||
vr->backend_drain = backend_drain;
|
||||
|
||||
if (NULL != vr->wait_for_request_body_stream) {
|
||||
req_in = vr->wait_for_request_body_stream;
|
||||
li_filter_buffer_on_disk_stop(vr->in_buffer_on_disk_stream);
|
||||
} else {
|
||||
req_in = vr->coninfo->req;
|
||||
}
|
||||
|
||||
/* connect in-queue */
|
||||
if (NULL != vr->filters_in_last) {
|
||||
li_stream_connect(vr->filters_in_last, vr->backend_drain);
|
||||
li_stream_connect(vr->coninfo->req, vr->filters_in_first);
|
||||
li_stream_connect(req_in, vr->filters_in_first);
|
||||
} else {
|
||||
/* no filters */
|
||||
li_stream_connect(vr->coninfo->req, vr->backend_drain);
|
||||
li_stream_connect(req_in, vr->backend_drain);
|
||||
}
|
||||
|
||||
li_stream_acquire(backend_source);
|
||||
vr->backend_source = backend_source;
|
||||
|
||||
li_chunkqueue_set_limit(backend_source->out, vr->coninfo->resp->out->limit);
|
||||
|
|
|
@ -120,6 +120,8 @@ static liHandlerResult fastcgi_handle(liVRequest *vr, gpointer param, gpointer *
|
|||
|
||||
if (li_vrequest_is_handled(vr)) return LI_HANDLER_GO_ON;
|
||||
|
||||
LI_VREQUEST_WAIT_FOR_REQUEST_BODY(vr);
|
||||
|
||||
switch (li_fastcgi_backend_get(vr, ctx->pool, &bcon, &bwait)) {
|
||||
case LI_BACKEND_SUCCESS:
|
||||
assert(NULL == bwait);
|
||||
|
|
|
@ -248,6 +248,8 @@ static liHandlerResult proxy_handle(liVRequest *vr, gpointer param, gpointer *co
|
|||
|
||||
if (li_vrequest_is_handled(vr)) return LI_HANDLER_GO_ON;
|
||||
|
||||
LI_VREQUEST_WAIT_FOR_REQUEST_BODY(vr);
|
||||
|
||||
switch (li_backend_get(vr, ctx->pool, &bcon, &bwait)) {
|
||||
case LI_BACKEND_SUCCESS:
|
||||
assert(NULL == bwait);
|
||||
|
|
|
@ -351,6 +351,8 @@ static liHandlerResult scgi_handle(liVRequest *vr, gpointer param, gpointer *con
|
|||
|
||||
if (li_vrequest_is_handled(vr)) return LI_HANDLER_GO_ON;
|
||||
|
||||
LI_VREQUEST_WAIT_FOR_REQUEST_BODY(vr);
|
||||
|
||||
switch (li_backend_get(vr, ctx->pool, &bcon, &bwait)) {
|
||||
case LI_BACKEND_SUCCESS:
|
||||
assert(NULL == bwait);
|
||||
|
|
Loading…
Reference in New Issue