2
0
Fork 0

[core] Use streams

This commit is contained in:
Stefan Bühler 2013-05-22 17:26:33 +02:00
parent b41cc2898a
commit ff69160c34
46 changed files with 1391 additions and 1204 deletions

View File

@ -39,6 +39,7 @@ AC_PROG_LIBTOOL
AC_HEADER_STDC
AC_HEADER_SYS_WAIT
AC_CHECK_HEADERS([ \
unistd.h \
stddef.h \
sys/mman.h \
sys/resource.h \

View File

@ -19,6 +19,7 @@
#include <lighttpd/waitqueue.h>
#include <lighttpd/stream.h>
#include <lighttpd/filter.h>
#include <lighttpd/radix.h>
#include <lighttpd/base_lua.h>

View File

@ -50,10 +50,10 @@ struct liChunk {
GList cq_link;
};
typedef void (*liCQLimitNotifyCB)(liVRequest *vr, gpointer context, gboolean locked);
typedef void (*liCQLimitNotifyCB)(gpointer context, gboolean locked);
struct liCQLimit {
gint refcount;
liVRequest *vr;
struct ev_loop *loop;
goffset limit, current;
gboolean locked;
@ -124,7 +124,7 @@ INLINE goffset li_chunk_length(liChunk *c);
* cqlimit *
******************/
LI_API liCQLimit* li_cqlimit_new(liVRequest *vr);
LI_API liCQLimit* li_cqlimit_new(struct ev_loop *loop);
LI_API void li_cqlimit_reset(liCQLimit *cql);
LI_API void li_cqlimit_acquire(liCQLimit *cql);
LI_API void li_cqlimit_release(liCQLimit *cql);
@ -138,7 +138,7 @@ LI_API liChunkQueue* li_chunkqueue_new();
LI_API void li_chunkqueue_reset(liChunkQueue *cq);
LI_API void li_chunkqueue_free(liChunkQueue *cq);
LI_API void li_chunkqueue_use_limit(liChunkQueue *cq, liVRequest *vr);
LI_API void li_chunkqueue_use_limit(liChunkQueue *cq, struct ev_loop *loop, goffset limit);
LI_API void li_chunkqueue_set_limit(liChunkQueue *cq, liCQLimit* cql);
/* return -1 for unlimited, 0 for full and n > 0 for n bytes free */
LI_API goffset li_chunkqueue_limit_available(liChunkQueue *cq);

View File

@ -9,6 +9,9 @@ typedef enum {
/** unused */
LI_CON_STATE_DEAD,
/** closed (or "closing") */
LI_CON_STATE_CLOSE,
/** waiting for new input after first request */
LI_CON_STATE_KEEP_ALIVE,
@ -25,21 +28,34 @@ typedef enum {
LI_CON_STATE_WRITE,
} liConnectionState;
typedef struct liConnectionSocketCallbacks liConnectionSocketCallbacks;
typedef struct liConnectionSocket liConnectionSocket;
struct liConnectionSocketCallbacks {
void (*finish)(liConnection *con, gboolean aborted);
};
struct liConnectionSocket {
gpointer data; /** private data (simple tcp, ssl, ...) */
const liConnectionSocketCallbacks *callbacks;
liStream *raw_in, *raw_out;
};
struct liConnection {
guint idx; /** index in connection table */
guint idx; /** index in connection table, -1 if not active */
liServer *srv;
liWorker *wrk;
liServerSocket *srv_sock;
gpointer srv_sock_data; /** private data for custom sockets (ssl) */
liConnectionSocket con_sock;
liConInfo info;
liConnectionState state;
gboolean response_headers_sent, expect_100_cont;
gboolean response_headers_sent, expect_100_cont, out_has_all_data;
liChunkQueue *raw_in, *raw_out;
liChunkQueue *in, *out; /* link to mainvr->in/out */
liBuffer *raw_in_buffer;
liStream in, out;
liVRequest *mainvr;
liHttpRequestCtx req_parser_ctx;
@ -55,18 +71,17 @@ struct liConnection {
} keep_alive_data;
guint keep_alive_requests;
ev_io sock_watcher;
gboolean can_read, can_write;
/* I/O timeout data */
/* I/O read timeout data */
liWaitQueueElem io_timeout_elem;
liJob job_reset;
};
/* Internal functions */
LI_API liConnection* li_connection_new(liWorker *wrk);
/** Free dead connections */
LI_API void li_connection_free(liConnection *con);
/** close connection */
/* close connection (for worker keep-alive timeout) */
LI_API void li_connection_reset(liConnection *con);
/** aborts an active connection, calls all plugin cleanup handlers */
@ -80,6 +95,27 @@ LI_API gchar *li_connection_state_str(liConnectionState state);
/* returns NULL if the vrequest doesn't belong to a liConnection* object */
LI_API liConnection* li_connection_from_vrequest(liVRequest *vr);
LI_API void connection_handle_io(liConnection *con);
/******************************************************/
/* IO backend stuff (simple tcp, tls implementations) */
/******************************************************/
/* call after IO send operations if con->out_has_all_data and out queues are empty */
LI_API void li_connection_request_done(liConnection *con);
/* call after successful io
* li_connection_simple_tcp takes care of this for you.
*/
LI_API void li_connection_update_io_timeout(liConnection *con);
/* handles IOStream events for a connection; updates transfered bytes and io timeouts;
* *pcon is needed to handle cases then the connections gets reset while handling io stuff
* NULL == *pcon is ok - it won't update transfered bytes and io timeouts then.
* closes outgoing stream on reading EOF
*/
LI_API void li_connection_simple_tcp(liConnection **pcon, liIOStream *stream, gpointer *context, liIOStreamEvent event);
/******************************************************/
#endif

38
include/lighttpd/filter.h Normal file
View File

@ -0,0 +1,38 @@
#ifndef _LIGHTTPD_FILTER_H_
#define _LIGHTTPD_FILTER_H_
#ifndef _LIGHTTPD_BASE_H_
#error Please include <lighttpd/base.h> instead of this file
#endif
typedef liHandlerResult (*liFilterHandlerCB)(liVRequest *vr, liFilter *f);
typedef void (*liFilterFreeCB)(liVRequest *vr, liFilter *f);
typedef void (*liFilterEventCB)(liVRequest *vr, liFilter *f, liStreamEvent event);
struct liFilter {
liStream stream;
liChunkQueue *in, *out;
/* if the handler wasn't able to handle all "in" data it must call li_stream_again(&f->stream) to trigger a new call to handle_data
* vr, in and out can be NULL if the associated vrequest/stream was destroyed
*/
liFilterHandlerCB handle_data;
liFilterFreeCB handle_free;
liFilterEventCB handle_event;
gpointer param;
liVRequest *vr;
guint filter_ndx;
};
LI_API liFilter* li_filter_new(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param);
LI_API liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param);
LI_API liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param);
LI_API void li_vrequest_filters_init(liVRequest *vr);
LI_API void li_vrequest_filters_clear(liVRequest *vr);
LI_API void li_vrequest_filters_reset(liVRequest *vr);
#endif

View File

@ -20,4 +20,7 @@ LI_API liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *ou
LI_API void li_filter_buffer_on_disk_reset(liFilterBufferOnDiskState *state);
LI_API liHandlerResult li_filter_buffer_on_disk_cb(liVRequest *vr, liFilter *f);
LI_API void li_filter_buffer_on_disk_free_cb(liVRequest *vr, liFilter *f);
#endif

View File

@ -15,7 +15,6 @@ LI_API void li_response_init(liResponse *resp);
LI_API void li_response_reset(liResponse *resp);
LI_API void li_response_clear(liResponse *resp);
LI_API gboolean li_response_send_headers(liConnection *con);
LI_API void li_response_send_error_page(liConnection *con);
LI_API void li_response_send_headers(liVRequest *vr, liChunkQueue *raw_out, liChunkQueue *response_body);
#endif

View File

@ -9,11 +9,7 @@
# define LIGHTTPD_SERVER_MAGIC ((guint)0x12AB34CD)
#endif
typedef gboolean (*liConnectionNewCB)(liConnection *con);
typedef void (*liConnectionCloseCB)(liConnection *con);
typedef liNetworkStatus (*liConnectionWriteCB)(liConnection *con, goffset write_max);
typedef liNetworkStatus (*liConnectionReadCB)(liConnection *con);
typedef void (*liServerSocketUpdateEventsCB)(liConnection *con, int events);
typedef gboolean (*liConnectionNewCB)(liConnection *con, int fd);
typedef void (*liServerSocketReleaseCB)(liServerSocket *srv_sock);
typedef void (*liServerStateWaitCancelled)(liServer *srv, liServerStateWait *w);
@ -38,12 +34,8 @@ struct liServerSocket {
/* Custom sockets (ssl) */
gpointer data;
liConnectionWriteCB write_cb;
liConnectionReadCB read_cb;
liConnectionNewCB new_cb;
liConnectionCloseCB close_cb;
liServerSocketReleaseCB release_cb;
liServerSocketUpdateEventsCB update_events_cb;
};
struct liServerStateWait {

View File

@ -80,6 +80,10 @@ typedef struct liCondition liCondition;
typedef struct liConnection liConnection;
/* filter.h */
typedef struct liFilter liFilter;
/* http_headers.h */
typedef struct liHttpHeader liHttpHeader;
@ -268,10 +272,6 @@ typedef struct liConInfo liConInfo;
typedef struct liVRequest liVRequest;
typedef struct liFilter liFilter;
typedef struct liFilters liFilters;
/* worker.h */
typedef struct liWorker liWorker;

View File

@ -34,19 +34,10 @@ typedef enum {
LI_VRS_ERROR
} liVRequestState;
typedef liHandlerResult (*liFilterHandlerCB)(liVRequest *vr, liFilter *f);
typedef void (*liFilterFreeCB)(liVRequest *vr, liFilter *f);
typedef G_GNUC_WARN_UNUSED_RESULT gboolean (*liVRequestHandlerCB)(liVRequest *vr);
typedef liHandlerResult (*liVRequestPluginHandlerCB)(liVRequest *vr, liPlugin *p);
typedef gboolean (*liVRequestCheckIOCB)(liVRequest *vr);
typedef void (*liVRequestHandlerCB)(liVRequest *vr);
struct liConCallbacks {
liVRequestHandlerCB
handle_request_headers,
handle_response_headers, handle_response_body,
handle_response_error; /* this is _not_ for 500 - internal error */
liVRequestCheckIOCB handle_check_io;
liVRequestHandlerCB handle_response_error; /* this is _not_ for 500 - internal error */
};
/* this data "belongs" to a vrequest, but is updated by the connection code */
@ -57,6 +48,10 @@ struct liConInfo {
GString *remote_addr_str, *local_addr_str;
gboolean is_ssl;
gboolean keep_alive;
gboolean aborted; /* network aborted connection before response was sent completely */
liStream *req;
liStream *resp;
/* bytes in our "raw-io-out-queue" that hasn't be sent yet. (whatever "sent" means - in ssl buffer, kernel, ...) */
goffset out_queue_length;
@ -73,20 +68,6 @@ struct liConInfo {
} stats;
};
struct liFilter {
liChunkQueue *in, *out;
liFilterHandlerCB handle_data;
liFilterFreeCB handle_free;
gpointer param;
/* do not modify these yourself: */
gboolean knows_out_is_closed, done;
};
struct liFilters {
GPtrArray *queue;
liChunkQueue *in, *out;
};
struct liVRequest {
liConInfo *coninfo;
liWorker *wrk;
@ -101,7 +82,6 @@ struct liVRequest {
ev_tstamp ts_started;
GPtrArray *plugin_ctx;
liPlugin *backend;
liRequest request;
liPhysical physical;
@ -111,14 +91,18 @@ struct liVRequest {
liEnvironment env;
/* -> vr_in -> filters_in -> in_memory ->(buffer_on_disk) -> in -> handle -> out -> filters_out -> vr_out -> */
gboolean cq_memory_limit_hit; /* stop feeding chunkqueues with memory chunks */
liFilters filters_in, filters_out;
liChunkQueue *vr_in, *vr_out, *in_memory;
liChunkQueue *in, *out;
GPtrArray *filters;
liStream *filters_in_last, *filters_out_last;
liStream *filters_in_first, *filters_out_first;
liFilterBufferOnDiskState in_buffer_state;
liPlugin *backend;
liStream *backend_source;
liStream *backend_drain;
liChunkQueue *direct_out; /* NULL for indirect responses, backend_source->out for direct responses. do not set this yourself for indirect responses! */
liActionStack action_stack;
gboolean actions_wait_for_response;
liJob job;
@ -166,9 +150,6 @@ LI_API void li_vrequest_free(liVRequest *vr);
*/
LI_API void li_vrequest_reset(liVRequest *vr, gboolean keepalive);
LI_API liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
LI_API liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, gpointer param);
/* Signals an internal error; handles the error in the _next_ loop */
LI_API void li_vrequest_error(liVRequest *vr);
@ -181,18 +162,18 @@ LI_API void li_vrequest_backend_finished(liVRequest *vr); /* action.c */
LI_API void li_vrequest_start(liVRequest *vr);
/* received all request headers */
LI_API void li_vrequest_handle_request_headers(liVRequest *vr);
/* received (partial) request content */
LI_API void li_vrequest_handle_request_body(liVRequest *vr);
/* received all response headers/status code - call once from your indirect handler */
LI_API void li_vrequest_handle_response_headers(liVRequest *vr);
/* received (partial) response content - call from your indirect handler */
LI_API void li_vrequest_handle_response_body(liVRequest *vr);
/* response completely ready */
/* response completely ready; use this only in action callbacks */
LI_API gboolean li_vrequest_handle_direct(liVRequest *vr);
/* check whether the request is already handled */
LI_API gboolean li_vrequest_is_handled(liVRequest *vr);
/* handle request over time */
LI_API gboolean li_vrequest_handle_indirect(liVRequest *vr, liPlugin *p);
LI_API gboolean li_vrequest_is_handled(liVRequest *vr);
/* signal that backend connection is ready - after this a backend error might result in a internal error */
LI_API void li_vrequest_indirect_connect(liVRequest *vr, liStream *backend_drain, liStream *backend_source);
/* received all response headers/status code - call once from your indirect handler */
LI_API void li_vrequest_indirect_headers_ready(liVRequest *vr);
LI_API void li_vrequest_state_machine(liVRequest *vr);
LI_API void li_vrequest_joblist_append(liVRequest *vr);

View File

@ -213,6 +213,7 @@ SET(LIGHTTPD_SHARED_SRC
connection.c
environment.c
etag.c
filter.c
filter_chunked.c
filter_buffer_on_disk.c
http_headers.c
@ -240,7 +241,6 @@ SET(LIGHTTPD_SHARED_SRC
value.c
virtualrequest.c
worker.c
plugin_core.c
)
@ -322,16 +322,16 @@ TARGET_LINK_LIBRARIES(lighttpd2 lighttpd-${PACKAGE_VERSION}-common lighttpd-${PA
SET(L_INSTALL_TARGETS ${L_INSTALL_TARGETS} lighttpd2-worker lighttpd2 lighttpd-${PACKAGE_VERSION}-common lighttpd-${PACKAGE_VERSION}-shared lighttpd-${PACKAGE_VERSION}-sharedangel)
IF(BUILD_EXTRA_WARNINGS)
SET(WARN_CFLAGS "-g -O2 -g2 -Wall -Wmissing-declarations -Wdeclaration-after-statement -Wcast-align -Wsign-compare -Wnested-externs -Wpointer-arith")
SET(WARN_LDFLAGS "-g -O2 -g2 -Wall -Wl,--as-needed")
SET(WARN_CFLAGS " -g -O2 -g2 -Wall -Wmissing-declarations -Wdeclaration-after-statement -Wcast-align -Wsign-compare -Wnested-externs -Wpointer-arith")
SET(WARN_LDFLAGS " -g -O2 -g2 -Wall -Wl,--as-needed")
# -Werror -Wbad-function-cast -Wmissing-prototypes
ELSE(BUILD_EXTRA_WARNINGS)
SET(WARN_CFLAGS "")
SET(WARN_LDFLAGS "")
ENDIF(BUILD_EXTRA_WARNINGS)
SET(COMMON_LDFLAGS "${LUA_LDFLAGS} ${LIBEV_LDFLAGS} ${GTHREAD_LDFLAGS} ${GMODULE_LDFLAGS} ${WARN_LDFLAGS}")
SET(COMMON_CFLAGS "${LUA_CFLAGS} ${LIBEV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS} ${WARN_CFLAGS}")
SET(COMMON_LDFLAGS "${LUA_LDFLAGS} ${LIBEV_LDFLAGS} ${GTHREAD_LDFLAGS} ${GMODULE_LDFLAGS}${WARN_LDFLAGS}")
SET(COMMON_CFLAGS "${LUA_CFLAGS} ${LIBEV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS}${WARN_CFLAGS}")
ADD_AND_INSTALL_LIBRARY(mod_access "modules/mod_access.c")
ADD_AND_INSTALL_LIBRARY(mod_accesslog "modules/mod_accesslog.c")
@ -367,16 +367,16 @@ IF(WITH_LUA)
ENDIF(WITH_LUA)
IF(WITH_GNUTLS)
ADD_AND_INSTALL_LIBRARY(mod_gnutls "modules/mod_gnutls.c")
TARGET_LINK_LIBRARIES(mod_gnutls ${GNUTLS_LDFLAGS})
ADD_TARGET_PROPERTIES(mod_gnutls COMPILE_FLAGS ${GNUTLS_CFLAGS})
# ADD_AND_INSTALL_LIBRARY(mod_gnutls "modules/mod_gnutls.c")
# TARGET_LINK_LIBRARIES(mod_gnutls ${GNUTLS_LDFLAGS})
# ADD_TARGET_PROPERTIES(mod_gnutls COMPILE_FLAGS ${GNUTLS_CFLAGS})
ENDIF(WITH_GNUTLS)
IF(HAVE_LIBSSL AND HAVE_LIBCRYPTO)
IF(WITH_OPENSSL)
ADD_AND_INSTALL_LIBRARY(mod_openssl "modules/mod_openssl.c")
TARGET_LINK_LIBRARIES(mod_openssl ssl)
TARGET_LINK_LIBRARIES(mod_openssl crypto)
ENDIF(HAVE_LIBSSL AND HAVE_LIBCRYPTO)
ENDIF(WITH_OPENSSL)
TARGET_LINK_LIBRARIES(lighttpd-${PACKAGE_VERSION}-common ${COMMON_LDFLAGS})
ADD_TARGET_PROPERTIES(lighttpd-${PACKAGE_VERSION}-common COMPILE_FLAGS ${COMMON_CFLAGS})

View File

@ -1,6 +1,6 @@
lib_LTLIBRARIES=liblighttpd2-common.la
common_cflags=-I$(top_srcdir)/include -I$(top_builddir)/include
common_cflags=-I$(top_builddir)/include -I$(top_srcdir)/include
common_src= \
angel_connection.c \

View File

@ -189,6 +189,7 @@ static inline void mp_free_page(const void *ptr, gsize size) {
# ifdef MAP_ANON
munmap((void*) ptr, size);
# else
UNUSED(size);
g_free(ptr);
# endif

View File

@ -87,11 +87,13 @@ gpointer li_radixtree_insert(liRadixTree *tree, const void *key, guint32 bits, g
/* split node */
liRadixNode *newnode;
guint32 width = (node->width > bits) ? bits : node->width;
assert(width <= RDXBITS);
mask = RDX_MASK(width);
while ((current & mask) != (node->key & mask)) {
width--;
mask <<= 1;
}
assert(width <= RDXBITS-1);
newnode = g_slice_new0(liRadixNode);
newnode->width = width;
newnode->key = current & mask;

View File

@ -1,15 +1,4 @@
#ifdef LIGHTY_OS_NETBSD
#define _NETBSD_SOURCE
#endif
#ifdef LIGHTY_OS_OPENBSD
#warning OpenBSD does net allow sending of file descriptors when _XOPEN_SOURCE is defined (needed for Solaris)
#else
#define _XOPEN_SOURCE
#define _XOPEN_SOURCE_EXTENDED 1
#endif
#include <lighttpd/utils.h>
#include <lighttpd/ip_parsers.h>

View File

@ -2,7 +2,7 @@
libexec_PROGRAMS=lighttpd2-worker
lib_LTLIBRARIES=liblighttpd2-shared.la
common_cflags=-I$(top_srcdir)/include -I$(top_builddir)/include
common_cflags=-I$(top_builddir)/include -I$(top_srcdir)/include
lighttpd_shared_src= \
angel.c \
@ -18,6 +18,7 @@ lighttpd_shared_src= \
connection.c \
environment.c \
etag.c \
filter.c \
filter_chunked.c \
filter_buffer_on_disk.c \
http_headers.c \

View File

@ -390,7 +390,7 @@ liHandlerResult li_action_execute(liVRequest *vr) {
return res;
}
if (as->backend_failed && ase == action_stack_top(as)) {
/* when backend selection failed and balancer i still the top action, we remove the balancer itself so it doesn't loop forever */
/* when backend selection failed and balancer is still the top action, we remove the balancer itself so it doesn't loop forever */
action_stack_pop(srv, vr, as);
}
break;

View File

@ -322,10 +322,10 @@ static void chunk_free(liChunkQueue *cq, liChunk *c) {
/******************
* cqlimit *
******************/
liCQLimit* li_cqlimit_new(liVRequest *vr) {
liCQLimit* li_cqlimit_new(struct ev_loop *loop) {
liCQLimit *cql = g_slice_new0(liCQLimit);
cql->refcount = 1;
cql->vr = vr;
cql->loop = loop;
cql->limit = -1;
return cql;
}
@ -356,20 +356,20 @@ void li_cqlimit_release(liCQLimit *cql) {
static void cqlimit_lock(liCQLimit *cql) {
cql->locked = TRUE;
if (cql->io_watcher && cql->io_watcher->fd != -1) {
li_ev_io_rem_events(cql->vr->wrk->loop, cql->io_watcher, EV_READ);
li_ev_io_rem_events(cql->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
cql->notify(cql->vr, cql->context, cql->locked);
cql->notify(cql->context, cql->locked);
}
}
static void cqlimit_unlock(liCQLimit *cql) {
cql->locked = FALSE;
if (cql->io_watcher && cql->io_watcher->fd != -1) {
li_ev_io_add_events(cql->vr->wrk->loop, cql->io_watcher, EV_READ);
li_ev_io_add_events(cql->loop, cql->io_watcher, EV_READ);
}
if (cql->notify) {
cql->notify(cql->vr, cql->context, cql->locked);
cql->notify(cql->context, cql->locked);
}
}
@ -452,9 +452,9 @@ void li_chunkqueue_free(liChunkQueue *cq) {
g_slice_free(liChunkQueue, cq);
}
void li_chunkqueue_use_limit(liChunkQueue *cq, liVRequest *vr) {
if (cq->limit) return;
cq->limit = li_cqlimit_new(vr);
void li_chunkqueue_use_limit(liChunkQueue *cq, struct ev_loop *loop, goffset limit) {
if (!cq->limit) cq->limit = li_cqlimit_new(loop);
li_cqlimit_set_limit(cq->limit, limit);
}
void li_chunkqueue_set_limit(liChunkQueue *cq, liCQLimit* cql) {

File diff suppressed because it is too large Load Diff

193
src/main/filter.c Normal file
View File

@ -0,0 +1,193 @@
#include <lighttpd/base.h>
#include <lighttpd/filter.h>
static void li_filter_stop(liFilter *filter);
void li_vrequest_filters_init(liVRequest *vr) {
vr->filters_in_last = vr->filters_out_last = NULL;
vr->filters_in_first = vr->filters_out_first = NULL;
vr->filters = g_ptr_array_new();
}
void li_vrequest_filters_clear(liVRequest *vr) {
li_vrequest_filters_reset(vr);
g_ptr_array_free(vr->filters, TRUE);
vr->filters = NULL;
}
void li_vrequest_filters_reset(liVRequest *vr) {
while (vr->filters->len > 0) {
li_filter_stop(g_ptr_array_index(vr->filters, vr->filters->len - 1));
}
vr->filters_in_last = vr->filters_out_last = NULL;
vr->filters_in_first = vr->filters_out_first = NULL;
}
static void filter_handle_data(liFilter *filter) {
if (NULL != filter->handle_data) {
goffset curoutlen = filter->stream.out->length;
gboolean curout_closed = filter->stream.out->is_closed;
filter->in = (NULL != filter->stream.source) ? filter->stream.source->out : NULL;
switch (filter->handle_data(filter->vr, filter)) {
case LI_HANDLER_GO_ON:
if (NULL != filter->stream.source && (0 != filter->stream.source->out->length)) {
/* auto comeback */
li_stream_again(&filter->stream);
}
break;
case LI_HANDLER_COMEBACK:
li_stream_again(&filter->stream);
break;
case LI_HANDLER_WAIT_FOR_EVENT:
break;
case LI_HANDLER_ERROR:
if (NULL != filter->vr) li_vrequest_error(filter->vr);
li_stream_reset(&filter->stream);
break;
}
if (NULL != filter->stream.source && (0 == filter->stream.source->out->length) && filter->stream.source->out->is_closed) {
li_stream_disconnect(&filter->stream);
}
if (curoutlen != filter->stream.out->length || curout_closed != filter->stream.out->is_closed) {
li_stream_notify(&filter->stream);
}
}
}
static void filter_stream_cb(liStream *stream, liStreamEvent event) {
liFilter *filter = LI_CONTAINER_OF(stream, liFilter, stream);
switch (event) {
case LI_STREAM_NEW_DATA:
filter_handle_data(filter);
break;
case LI_STREAM_NEW_CQLIMIT:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
}
break;
case LI_STREAM_CONNECTED_SOURCE:
case LI_STREAM_CONNECTED_DEST:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
} else {
li_stream_again(stream);
}
break;
case LI_STREAM_DISCONNECTED_SOURCE:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
} else {
if (!stream->out->is_closed) li_stream_again(stream);
}
break;
case LI_STREAM_DISCONNECTED_DEST:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
} else {
li_stream_disconnect(stream);
}
break;
case LI_STREAM_DESTROY:
if (NULL != filter->handle_event) {
filter->handle_event(filter->vr, filter, event);
}
if (NULL != filter->handle_free) {
filter->handle_free(filter->vr, filter);
}
break;
}
}
liFilter* li_filter_new(liVRequest* vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param) {
liFilter *f;
f = g_slice_new0(liFilter);
li_stream_init(&f->stream, &vr->wrk->jobqueue, filter_stream_cb);
f->out = f->stream.out;
f->param = param;
f->handle_data = handle_data;
f->handle_free = handle_free;
f->handle_event = handle_event;
f->vr = vr;
f->filter_ndx = vr->filters->len;
g_ptr_array_add(vr->filters, f);
return f;
}
static void li_filter_stop(liFilter *filter) {
liVRequest *vr = filter->vr;
if (NULL == vr) return;
filter->vr = NULL;
/* remove from vr filters list */
assert(vr->filters->len > 0);
if (vr->filters->len - 1 != filter->filter_ndx) {
/* not the last filter, swap: */
liFilter *last = g_ptr_array_index(vr->filters, vr->filters->len - 1);
last->filter_ndx = filter->filter_ndx;
g_ptr_array_index(vr->filters, filter->filter_ndx) = last;
}
g_ptr_array_set_size(vr->filters, vr->filters->len - 1);
li_stream_again(&filter->stream);
li_stream_release(&filter->stream);
}
liFilter* li_vrequest_add_filter_in(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param) {
liFilter *f;
/* as soon as we have a backend -> too late for in filters */
if (LI_VRS_READ_CONTENT <= vr->state) return NULL;
f = li_filter_new(vr, handle_data, handle_free, handle_event, param);
if (NULL == vr->filters_in_first) {
assert(NULL == vr->filters_in_last);
vr->filters_in_first = &f->stream;
vr->filters_in_last = &f->stream;
} else {
assert(NULL != vr->filters_in_last);
li_stream_connect(vr->filters_in_last, &f->stream);
vr->filters_in_last = &f->stream;
}
return f;
}
liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_data, liFilterFreeCB handle_free, liFilterEventCB handle_event, gpointer param) {
liFilter *f;
/* as soon as we write the response it is too late for out filters */
if (LI_VRS_WRITE_CONTENT <= vr->state) return NULL;
f = li_filter_new(vr, handle_data, handle_free, handle_event, param);
if (NULL == vr->filters_out_first) {
assert(NULL == vr->filters_out_last);
vr->filters_out_first = &f->stream;
vr->filters_out_last = &f->stream;
} else {
assert(NULL != vr->filters_out_last);
li_stream_connect(vr->filters_out_last, &f->stream);
vr->filters_out_last = &f->stream;
}
return f;
}

View File

@ -136,3 +136,30 @@ liHandlerResult li_filter_buffer_on_disk(liVRequest *vr, liChunkQueue *out, liCh
void li_filter_buffer_on_disk_reset(bod_state *state) {
bod_close(state);
}
liHandlerResult li_filter_buffer_on_disk_cb(liVRequest *vr, liFilter *f) {
goffset lim_avail;
bod_state *state = f->param;
if (NULL == state) state = &vr->in_buffer_state;
if (state->tempfile || vr->request.content_length < 0 || vr->request.content_length > 64*1024 ||
((lim_avail = li_chunkqueue_limit_available(f->in)) <= 32*1024 && lim_avail >= 0)) {
return li_filter_buffer_on_disk(vr, f->out, f->in, state);
} else {
li_chunkqueue_steal_all(f->out, f->in);
if (f->in->is_closed) f->out->is_closed = TRUE;
return LI_HANDLER_GO_ON;
}
}
void li_filter_buffer_on_disk_free_cb(liVRequest *vr, liFilter *f) {
bod_state *state = f->param;
if (NULL == state) {
li_filter_buffer_on_disk_reset(&vr->in_buffer_state);
} else {
li_filter_buffer_on_disk_reset(state);
g_slice_free(bod_state, state);
}
}

View File

@ -262,7 +262,7 @@ static liHandlerResult filter_lua_in(liVRequest *vr, gpointer param, gpointer *c
UNUSED(context);
if (state) {
li_vrequest_add_filter_in(vr, filter_lua_handle, filter_lua_free, state);
li_vrequest_add_filter_in(vr, filter_lua_handle, filter_lua_free, NULL, state);
}
return LI_HANDLER_GO_ON;
@ -274,7 +274,7 @@ static liHandlerResult filter_lua_out(liVRequest *vr, gpointer param, gpointer *
UNUSED(context);
if (state) {
li_vrequest_add_filter_out(vr, filter_lua_handle, filter_lua_free, state);
li_vrequest_add_filter_out(vr, filter_lua_handle, filter_lua_free, NULL, state);
}
return LI_HANDLER_GO_ON;
@ -342,7 +342,7 @@ liFilter* li_lua_vrequest_add_filter_in(lua_State *L, liVRequest *vr, int state_
state->LL = LL;
state->object_ref = luaL_ref(L, LUA_REGISTRYINDEX);
return li_vrequest_add_filter_in(vr, filter_lua_handle, filter_lua_free, state);
return li_vrequest_add_filter_in(vr, filter_lua_handle, filter_lua_free, NULL, state);
}
liFilter* li_lua_vrequest_add_filter_out(lua_State *L, liVRequest *vr, int state_ndx) {
@ -355,5 +355,5 @@ liFilter* li_lua_vrequest_add_filter_out(lua_State *L, liVRequest *vr, int state
state->LL = LL;
state->object_ref = luaL_ref(L, LUA_REGISTRYINDEX);
return li_vrequest_add_filter_out(vr, filter_lua_handle, filter_lua_free, state);
return li_vrequest_add_filter_out(vr, filter_lua_handle, filter_lua_free, NULL, state);
}

View File

@ -707,11 +707,11 @@ static liHandlerResult core_handle_static(liVRequest *vr, gpointer param, gpoint
if (is_multipart) {
GString *subheader = g_string_sized_new(1023);
g_string_append_printf(subheader, "\r\n--%s\r\nContent-Type: %s\r\nContent-Range: %s\r\n\r\n", boundary, mime_str->str, vr->wrk->tmp_str->str);
li_chunkqueue_append_string(vr->out, subheader);
li_chunkqueue_append_chunkfile(vr->out, cf, rs.range_start, rs.range_length);
li_chunkqueue_append_string(vr->direct_out, subheader);
li_chunkqueue_append_chunkfile(vr->direct_out, cf, rs.range_start, rs.range_length);
} else {
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Range"), GSTR_LEN(vr->wrk->tmp_str));
li_chunkqueue_append_chunkfile(vr->out, cf, rs.range_start, rs.range_length);
li_chunkqueue_append_chunkfile(vr->direct_out, cf, rs.range_start, rs.range_length);
}
break;
case LI_PARSE_HTTP_RANGE_DONE:
@ -721,7 +721,7 @@ static liHandlerResult core_handle_static(liVRequest *vr, gpointer param, gpoint
if (is_multipart) {
GString *subheader = g_string_sized_new(1023);
g_string_append_printf(subheader, "\r\n--%s--\r\n", boundary);
li_chunkqueue_append_string(vr->out, subheader);
li_chunkqueue_append_string(vr->direct_out, subheader);
g_string_printf(vr->wrk->tmp_str, "multipart/byteranges; boundary=%s", boundary);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), GSTR_LEN(vr->wrk->tmp_str));
@ -731,12 +731,13 @@ static liHandlerResult core_handle_static(liVRequest *vr, gpointer param, gpoint
break;
case LI_PARSE_HTTP_RANGE_INVALID:
done = TRUE;
li_chunkqueue_reset(vr->out); vr->out->is_closed = TRUE;
/* indirect handing: out cq is already "closed" */
li_chunkqueue_reset(vr->direct_out); vr->direct_out->is_closed = TRUE;
break;
case LI_PARSE_HTTP_RANGE_NOT_SATISFIABLE:
ranged_response = TRUE;
done = TRUE;
li_chunkqueue_reset(vr->out); vr->out->is_closed = TRUE;
li_chunkqueue_reset(vr->direct_out);
g_string_printf(vr->wrk->tmp_str, "bytes */%"G_GINT64_FORMAT, (goffset) st.st_size);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Range"), GSTR_LEN(vr->wrk->tmp_str));
vr->response.http_status = 416;
@ -750,7 +751,7 @@ static liHandlerResult core_handle_static(liVRequest *vr, gpointer param, gpoint
if (!ranged_response) {
vr->response.http_status = 200;
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), GSTR_LEN(mime_str));
li_chunkqueue_append_chunkfile(vr->out, cf, 0, st.st_size);
li_chunkqueue_append_chunkfile(vr->direct_out, cf, 0, st.st_size);
}
li_chunkfile_release(cf);
@ -952,7 +953,7 @@ static liHandlerResult core_handle_respond(liVRequest *vr, gpointer param, gpoin
if (rp->pattern) {
g_string_truncate(vr->wrk->tmp_str, 0);
li_pattern_eval(vr, vr->wrk->tmp_str, rp->pattern, NULL, NULL, NULL, NULL);
li_chunkqueue_append_mem(vr->out, GSTR_LEN(vr->wrk->tmp_str));
li_chunkqueue_append_mem(vr->direct_out, GSTR_LEN(vr->wrk->tmp_str));
}
return LI_HANDLER_GO_ON;
@ -1674,7 +1675,7 @@ static liHandlerResult core_handle_buffer_out(liVRequest *vr, gpointer param, gp
gint limit = GPOINTER_TO_INT(param);
UNUSED(context);
li_cqlimit_set_limit(vr->out->limit, limit);
li_chunkqueue_use_limit(vr->coninfo->resp->out, vr->wrk->loop, limit);
return LI_HANDLER_GO_ON;
}
@ -1707,7 +1708,7 @@ static liHandlerResult core_handle_buffer_in(liVRequest *vr, gpointer param, gpo
gint limit = GPOINTER_TO_INT(param);
UNUSED(context);
li_cqlimit_set_limit(vr->in->limit, limit);
li_chunkqueue_use_limit(vr->coninfo->req->out, vr->wrk->loop, limit);
return LI_HANDLER_GO_ON;
}

View File

@ -90,7 +90,6 @@ void li_request_copy(liRequest *dest, const liRequest *src) {
static void bad_request(liConnection *con, int status) {
con->info.keep_alive = FALSE;
con->mainvr->response.http_status = status;
li_vrequest_handle_direct(con->mainvr);
}
static gboolean request_parse_url(liVRequest *vr) {

View File

@ -22,20 +22,30 @@ void li_response_clear(liResponse *resp) {
resp->transfer_encoding = LI_HTTP_TRANSFER_ENCODING_IDENTITY;
}
gboolean li_response_send_headers(liConnection *con) {
static void li_response_send_error_page(liVRequest *vr, liChunkQueue *response_body);
void li_response_send_headers(liVRequest *vr, liChunkQueue *raw_out, liChunkQueue *response_body) {
GString *head;
liVRequest *vr = con->mainvr;
gboolean have_real_body, response_complete;
liChunkQueue *tmp_cq = NULL;
if (vr->response.http_status < 100 || vr->response.http_status > 999) {
VR_ERROR(vr, "wrong status: %i", vr->response.http_status);
return FALSE;
VR_ERROR(vr, "wrong status: %i, internal error", vr->response.http_status);
vr->response.http_status = 500;
vr->coninfo->keep_alive = FALSE;
response_body = NULL;
}
have_real_body = (NULL != response_body) && ((response_body->length > 0) || !response_body->is_closed);
response_complete = (NULL != response_body) && response_body->is_closed;
head = g_string_sized_new(8*1024-1);
if (0 == con->out->length && con->mainvr->backend == NULL
&& vr->response.http_status >= 400 && vr->response.http_status < 600) {
li_response_send_error_page(con);
if (!have_real_body && vr->response.http_status >= 400 && vr->response.http_status < 600) {
tmp_cq = li_chunkqueue_new(); /* create a temporary cq for the response body */
response_body = tmp_cq;
li_response_send_error_page(vr, response_body);
have_real_body = response_complete = TRUE;
}
if ((vr->response.http_status >= 100 && vr->response.http_status < 200) ||
@ -43,16 +53,15 @@ gboolean li_response_send_headers(liConnection *con) {
vr->response.http_status == 205 ||
vr->response.http_status == 304) {
/* They never have a content-body/length */
li_chunkqueue_reset(con->out);
con->out->is_closed = TRUE;
con->raw_out->is_closed = TRUE;
} else if (con->out->is_closed) {
if (vr->request.http_method != LI_HTTP_METHOD_HEAD || con->out->length > 0) {
li_chunkqueue_reset(response_body);
response_body->is_closed = TRUE;
} else if (response_complete) {
if (vr->request.http_method != LI_HTTP_METHOD_HEAD || response_body->length > 0) {
/* do not send content-length: 0 if backend already skipped content generation for HEAD */
g_string_printf(con->wrk->tmp_str, "%"L_GOFFSET_FORMAT, con->out->length);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(con->wrk->tmp_str));
g_string_printf(vr->wrk->tmp_str, "%"L_GOFFSET_FORMAT, response_body->length);
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(vr->wrk->tmp_str));
}
} else if (con->info.keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) {
} else if (vr->coninfo->keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) {
/* TODO: maybe someone set a content length header? */
if (!(vr->response.transfer_encoding & LI_HTTP_TRANSFER_ENCODING_CHUNKED)) {
vr->response.transfer_encoding |= LI_HTTP_TRANSFER_ENCODING_CHUNKED;
@ -60,14 +69,13 @@ gboolean li_response_send_headers(liConnection *con) {
}
} else {
/* Unknown content length, no chunked encoding */
con->info.keep_alive = FALSE;
vr->coninfo->keep_alive = FALSE;
}
if (vr->request.http_method == LI_HTTP_METHOD_HEAD) {
/* content-length is set, but no body */
li_chunkqueue_reset(con->out);
con->out->is_closed = TRUE;
con->raw_out->is_closed = TRUE;
li_chunkqueue_reset(response_body);
response_body->is_closed = TRUE;
}
/* Status line */
@ -90,10 +98,10 @@ gboolean li_response_send_headers(liConnection *con) {
/* connection header, if needed. connection entries in the list are ignored below, send them directly */
if (vr->request.http_version == LI_HTTP_VERSION_1_1) {
if (!con->info.keep_alive)
if (!vr->coninfo->keep_alive)
g_string_append_len(head, CONST_STR_LEN("Connection: close\r\n"));
} else {
if (con->info.keep_alive)
if (vr->coninfo->keep_alive)
g_string_append_len(head, CONST_STR_LEN("Connection: keep-alive\r\n"));
}
@ -114,7 +122,7 @@ gboolean li_response_send_headers(liConnection *con) {
}
if (!have_date) {
GString *d = li_worker_current_timestamp(con->wrk, LI_GMTIME, LI_TS_FORMAT_HEADER);
GString *d = li_worker_current_timestamp(vr->wrk, LI_GMTIME, LI_TS_FORMAT_HEADER);
/* HTTP/1.1 requires a Date: header */
g_string_append_len(head, CONST_STR_LEN("Date: "));
g_string_append_len(head, GSTR_LEN(d));
@ -133,9 +141,12 @@ gboolean li_response_send_headers(liConnection *con) {
}
g_string_append_len(head, CONST_STR_LEN("\r\n"));
li_chunkqueue_append_string(con->raw_out, head);
li_chunkqueue_append_string(raw_out, head);
return TRUE;
if (NULL != tmp_cq) {
li_chunkqueue_steal_all(raw_out, tmp_cq);
li_chunkqueue_free(tmp_cq);
}
}
#define SET_LEN_AND_RETURN_STR(x) \
@ -197,8 +208,7 @@ static gchar *li_response_error_description(guint status_code, guint *len) {
}
}
void li_response_send_error_page(liConnection *con) {
liVRequest *vr = con->mainvr;
static void li_response_send_error_page(liVRequest *vr, liChunkQueue *response_body) {
gchar status_code[3];
guint len;
gchar *str;
@ -213,11 +223,11 @@ void li_response_send_error_page(liConnection *con) {
" <title>"
));
li_http_status_to_str(con->mainvr->response.http_status, status_code);
li_http_status_to_str(vr->response.http_status, status_code);
g_string_append_len(html, status_code, 3);
g_string_append_len(html, CONST_STR_LEN(" - "));
str = li_http_status_string(con->mainvr->response.http_status, &len);
str = li_http_status_string(vr->response.http_status, &len);
g_string_append_len(html, str, len);
g_string_append_len(html, CONST_STR_LEN(
@ -249,7 +259,7 @@ void li_response_send_error_page(liConnection *con) {
g_string_append_len(html, str, len);
g_string_append_len(html, CONST_STR_LEN("</h1>\n"));
str = li_response_error_description(con->mainvr->response.http_status, &len);
str = li_response_error_description(vr->response.http_status, &len);
g_string_append_len(html, str, len);
g_string_append_len(html, CONST_STR_LEN(" <p id=\"footer\">"));
@ -263,7 +273,7 @@ void li_response_send_error_page(liConnection *con) {
li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), CONST_STR_LEN("text/html; charset=utf-8"));
li_chunkqueue_append_string(con->out, html);
li_chunkqueue_append_string(response_body, html);
li_http_header_remove(vr->response.headers, CONST_STR_LEN("transfer-encoding"));
li_http_header_remove(vr->response.headers, CONST_STR_LEN("content-encoding"));
li_http_header_remove(vr->response.headers, CONST_STR_LEN("etag"));

View File

@ -31,19 +31,17 @@ static int lua_subrequest_gc(lua_State *L);
typedef int (*lua_Subrequest_Attrib)(liSubrequest *sr, lua_State *L);
static int lua_subrequest_attr_read_in(liSubrequest *sr, lua_State *L) {
if (NULL == sr->vr) { lua_pushnil(L); return 1; }
li_lua_push_chunkqueue(L, sr->vr->vr_in);
li_lua_push_chunkqueue(L, sr->coninfo.req->out);
return 1;
}
static int lua_subrequest_attr_read_out(liSubrequest *sr, lua_State *L) {
if (NULL == sr->vr) { lua_pushnil(L); return 1; }
li_lua_push_chunkqueue(L, sr->vr->vr_out);
li_lua_push_chunkqueue(L, sr->coninfo.resp->out);
return 1;
}
static int lua_subrequest_attr_read_is_done(liSubrequest *sr, lua_State *L) {
lua_pushboolean(L, (NULL == sr->vr) || sr->vr->vr_out->is_closed);
lua_pushboolean(L, sr->coninfo.resp->out->is_closed);
return 1;
}
@ -249,14 +247,14 @@ static void subvr_bind_lua(liSubrequest *sr, liLuaState *LL, int notify_ndx, int
static void subvr_check(liVRequest *vr) {
liSubrequest *sr = LI_CONTAINER_OF(vr->coninfo, liSubrequest, coninfo);
if (sr->notified_out_bytes < vr->vr_out->bytes_in
|| sr->notified_out_closed != vr->vr_out->is_closed
if (sr->notified_out_bytes < sr->coninfo.resp->out->bytes_in
|| sr->notified_out_closed != sr->coninfo.resp->out->is_closed
|| sr->notified_response_headers != sr->have_response_headers) {
subvr_run_lua(sr, sr->func_notify_ref);
}
sr->notified_out_bytes = vr->vr_out->bytes_in;
sr->notified_out_closed = vr->vr_out->is_closed;
sr->notified_out_bytes = sr->coninfo.resp->out->bytes_in;
sr->notified_out_closed = sr->coninfo.resp->out->is_closed;
sr->notified_response_headers = sr->have_response_headers;
if (sr->notified_out_closed) { /* reques done */
@ -264,22 +262,7 @@ static void subvr_check(liVRequest *vr) {
}
}
static G_GNUC_WARN_UNUSED_RESULT gboolean subvr_handle_response_headers(liVRequest *vr) {
liSubrequest *sr = LI_CONTAINER_OF(vr->coninfo, liSubrequest, coninfo);
sr->have_response_headers = TRUE;
subvr_check(vr);
return TRUE;
}
static G_GNUC_WARN_UNUSED_RESULT gboolean subvr_handle_response_body(liVRequest *vr) {
subvr_check(vr);
return TRUE;
}
static G_GNUC_WARN_UNUSED_RESULT gboolean subvr_handle_response_error(liVRequest *vr) {
static void subvr_handle_response_error(liVRequest *vr) {
liSubrequest *sr = LI_CONTAINER_OF(vr->coninfo, liSubrequest, coninfo);
li_vrequest_free(sr->vr);
@ -287,28 +270,14 @@ static G_GNUC_WARN_UNUSED_RESULT gboolean subvr_handle_response_error(liVRequest
subvr_run_lua(sr, sr->func_error_ref);
subvr_release_lua(sr);
return FALSE;
}
static G_GNUC_WARN_UNUSED_RESULT gboolean subvr_handle_request_headers(liVRequest *vr) {
static void subvr_handle_check_io(liVRequest *vr) {
subvr_check(vr);
return TRUE;
}
static gboolean subvr_handle_check_io(liVRequest *vr) {
subvr_check(vr);
return TRUE;
}
const liConCallbacks subrequest_callbacks = {
subvr_handle_request_headers,
subvr_handle_response_headers,
subvr_handle_response_body,
subvr_handle_response_error,
subvr_handle_check_io
};
@ -327,6 +296,9 @@ static liSubrequest* subrequest_new(liVRequest *vr) {
sr->coninfo.is_ssl = vr->coninfo->is_ssl;
sr->coninfo.keep_alive = FALSE; /* doesn't mean anything here anyway */
sr->coninfo.req = li_stream_null_new(&vr->wrk->jobqueue);
sr->coninfo.resp = li_stream_plug_new(&vr->wrk->jobqueue);
sr->vr = li_vrequest_new(vr->wrk, &sr->coninfo);
li_vrequest_start(sr->vr);
@ -334,7 +306,6 @@ static liSubrequest* subrequest_new(liVRequest *vr) {
li_request_copy(&sr->vr->request, &vr->request);
sr->vr->request.content_length = 0;
sr->vr->vr_in->is_closed = TRUE;
return sr;
}

View File

@ -337,7 +337,7 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
vr->throttle.magazine += vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY;
}
if (!vr->coninfo->callbacks->handle_check_io(vr)) continue; /* vr got reset */
// TODO: vr->coninfo->callbacks->handle_check_io(vr);
if (vr->throttle.magazine <= 0)
li_throttle_update(vr, 0, 0);