From b876f8401de7cbb3a8517fe2561488797e5a794a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Sat, 31 Jul 2010 14:44:45 +0200 Subject: [PATCH] [core]: rework connection/vrequest structs; add liConInfo --- include/lighttpd/connection.h | 42 +---- include/lighttpd/core_lua.h | 4 + include/lighttpd/throttle.h | 11 +- include/lighttpd/typedefs.h | 13 +- include/lighttpd/utils.h | 17 +- include/lighttpd/virtualrequest.h | 68 +++++++- src/CMakeLists.txt | 1 - src/main/Makefile.am | 1 - src/main/condition.c | 32 ++-- src/main/connection.c | 257 +++++++++++++++--------------- src/main/connection_lua.c | 44 ----- src/main/core_lua.c | 2 +- src/main/plugin_core.c | 17 +- src/main/request.c | 8 +- src/main/response.c | 8 +- src/main/throttle.c | 94 ++++++----- src/main/virtualrequest.c | 48 ++++-- src/main/virtualrequest_lua.c | 153 +++++++++++++++++- src/main/worker.c | 10 +- src/main/wscript | 1 - src/modules/mod_access.c | 8 +- src/modules/mod_accesslog.c | 32 ++-- src/modules/mod_debug.c | 18 +-- src/modules/mod_fastcgi.c | 30 ++-- src/modules/mod_limit.c | 18 ++- src/modules/mod_openssl.c | 4 +- src/modules/mod_progress.c | 8 +- src/modules/mod_proxy.c | 8 - src/modules/mod_redirect.c | 10 +- src/modules/mod_rewrite.c | 10 +- src/modules/mod_scgi.c | 30 ++-- src/modules/mod_status.c | 16 +- 32 files changed, 603 insertions(+), 420 deletions(-) delete mode 100644 src/main/connection_lua.c diff --git a/include/lighttpd/connection.h b/include/lighttpd/connection.h index d0cb3f9..50c7706 100644 --- a/include/lighttpd/connection.h +++ b/include/lighttpd/connection.h @@ -32,6 +32,8 @@ struct liConnection { liServerSocket *srv_sock; gpointer srv_sock_data; /** private data for custom sockets (ssl) */ + liConInfo info; + liConnectionState state; gboolean response_headers_sent, expect_100_cont; @@ -40,13 +42,12 @@ struct liConnection { liBuffer *raw_in_buffer; ev_io sock_watcher; - liSocketAddress remote_addr, local_addr; - GString *remote_addr_str, *local_addr_str; - gboolean is_ssl, keep_alive; liVRequest *mainvr; liHttpRequestCtx req_parser_ctx; + ev_tstamp ts_started; /* when connection was started, not a (v)request */ + /* Keep alive timeout data */ struct { GList *link; @@ -58,38 +59,6 @@ struct liConnection { /* I/O timeout data */ liWaitQueueElem io_timeout_elem; - - /* I/O throttling */ - gboolean throttled; /* TRUE if connection is throttled */ - struct { - struct { - liThrottlePool *ptr; /* NULL if not in any throttling pool */ - GList lnk; - GQueue *queue; - gint magazine; - } pool; - struct { - gchar unused; /* this struct is unused for now */ - } ip; - struct { - guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */ - gint magazine; - ev_tstamp last_update; - } con; - liWaitQueueElem wqueue_elem; - } throttle; - - ev_tstamp ts_started; - - struct { - guint64 bytes_in; /* total number of bytes received */ - guint64 bytes_out; /* total number of bytes sent */ - ev_tstamp last_avg; - guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */ - guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */ - guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */ - guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */ - } stats; }; /* Internal functions */ @@ -105,4 +74,7 @@ LI_API void li_connection_error(liConnection *con); /* used in worker.c */ /* public function */ LI_API gchar *li_connection_state_str(liConnectionState state); +/* returns NULL if the vrequest doesn't belong to a liConnection* object */ +LI_API liConnection* li_connection_from_vrequest(liVRequest *vr); + #endif diff --git a/include/lighttpd/core_lua.h b/include/lighttpd/core_lua.h index 13f513d..c4da18b 100644 --- a/include/lighttpd/core_lua.h +++ b/include/lighttpd/core_lua.h @@ -55,6 +55,10 @@ LI_API void li_lua_init_vrequest_mt(lua_State *L); LI_API liVRequest* li_lua_get_vrequest(lua_State *L, int ndx); LI_API int li_lua_push_vrequest(lua_State *L, liVRequest *vr); +LI_API void li_lua_init_coninfo_mt(lua_State *L); +LI_API liConInfo* li_lua_get_coninfo(lua_State *L, int ndx); +LI_API int li_lua_push_coninfo(lua_State *L, liConInfo *vr); + LI_API int li_lua_fixindex(lua_State *L, int ndx); diff --git a/include/lighttpd/throttle.h b/include/lighttpd/throttle.h index 3aa223d..9d4f5b4 100644 --- a/include/lighttpd/throttle.h +++ b/include/lighttpd/throttle.h @@ -3,7 +3,6 @@ #define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */ -typedef struct liThrottlePool liThrottlePool; struct liThrottlePool { GString *name; guint rate; /** bytes/s */ @@ -16,19 +15,21 @@ struct liThrottlePool { ev_tstamp *last_con_rearm; }; -typedef struct liThrottleParam liThrottleParam; struct liThrottleParam { guint rate; guint burst; }; -LI_API void li_throttle_reset(liConnection *con); +LI_API void li_throttle_reset(liVRequest *vr); LI_API void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents); LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate); LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool); -LI_API void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool); -LI_API void li_throttle_pool_release(liConnection *con); +LI_API void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool); +LI_API void li_throttle_pool_release(liVRequest *vr); + +/* update throttle data: notify it that we sent bytes, and that we never send more than write_max at once */ +LI_API void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max); #endif diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index 3be671a..b860e44 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -119,9 +119,8 @@ typedef enum { LI_NETWORK_STATUS_SUCCESS, /**< some IO was actually done (read/write) or cq was empty for write */ LI_NETWORK_STATUS_FATAL_ERROR, LI_NETWORK_STATUS_CONNECTION_CLOSE, - LI_NETWORK_STATUS_WAIT_FOR_EVENT, /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK; no real IO was done + LI_NETWORK_STATUS_WAIT_FOR_EVENT /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK; no real IO was done internal: some io may be done */ - LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT /**< nothing done yet, read/write will be done somewhere else */ } liNetworkStatus; /* options.h */ @@ -207,6 +206,12 @@ typedef struct liServer liServer; typedef struct liServerSocket liServerSocket; +/* throttle.h */ + +typedef struct liThrottlePool liThrottlePool; + +typedef struct liThrottleParam liThrottleParam; + /* value.h */ typedef struct liValue liValue; @@ -224,6 +229,10 @@ typedef enum { /* virtualrequest.h */ +typedef struct liConCallbacks liConCallbacks; + +typedef struct liConInfo liConInfo; + typedef struct liVRequest liVRequest; typedef struct liVRequestRef liVRequestRef; diff --git a/include/lighttpd/utils.h b/include/lighttpd/utils.h index a672e60..7b90b74 100644 --- a/include/lighttpd/utils.h +++ b/include/lighttpd/utils.h @@ -9,8 +9,6 @@ typedef enum { COUNTER_UNITS } liCounterType; - - LI_API void li_fatal(const gchar* msg); /* set O_NONBLOCK and FD_CLOEXEC */ @@ -123,6 +121,21 @@ LI_API GQuark li_sys_error_quark(); LI_API gboolean _li_set_sys_error(GError **error, const gchar *msg, const gchar *file, int lineno); + +/* idea from linux kernel container_of: include/linux/kernel.h + * Please note that this "implementation" is not "type-safe"; it doesn't + * check the type of the ptr. + */ +/** + * LI_CONTAINER_OF - cast a member of a structure out to the containing structure + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. + * + */ +#define LI_CONTAINER_OF(ptr, type, member) \ + ((type *)( (char *) ptr - offsetof(type, member) )) + /* inline implementations */ INLINE void li_path_append_slash(GString *path) { diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index 1317b99..7496d7b 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -36,6 +36,40 @@ typedef liHandlerResult (*liFilterHandlerCB)(liVRequest *vr, liFilter *f); typedef void (*liFilterFreeCB)(liVRequest *vr, liFilter *f); typedef liHandlerResult (*liVRequestHandlerCB)(liVRequest *vr); typedef liHandlerResult (*liVRequestPluginHandlerCB)(liVRequest *vr, liPlugin *p); +typedef gboolean (*liVRequestCheckIOCB)(liVRequest *vr); + +struct liConCallbacks { + liVRequestHandlerCB + handle_request_headers, + handle_response_headers, handle_response_body, + handle_response_error; /* this is _not_ for 500 - internal error */ + + liVRequestCheckIOCB handle_check_io; +}; + +/* this data "belongs" to a vrequest, but is updated by the connection code */ +struct liConInfo { + const liConCallbacks *callbacks; + + liSocketAddress remote_addr, local_addr; + GString *remote_addr_str, *local_addr_str; + gboolean is_ssl; + gboolean keep_alive; + + /* bytes in our "raw-io-out-queue" that hasn't be sent yet. (whatever "sent" means - in ssl buffer, kernel, ...) */ + goffset out_queue_length; + + /* use li_vrequest_update_stats_{in,out} to update this */ + struct { + guint64 bytes_in; /* total number of bytes received */ + guint64 bytes_out; /* total number of bytes sent */ + ev_tstamp last_avg; + guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */ + guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */ + guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */ + guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */ + } stats; +}; struct liFilter { liChunkQueue *in, *out; @@ -58,7 +92,7 @@ struct liVRequestRef { }; struct liVRequest { - liConnection *con; + liConInfo *coninfo; liWorker *wrk; liVRequestRef *ref; @@ -69,11 +103,6 @@ struct liVRequest { ev_tstamp ts_started; - liVRequestHandlerCB - handle_request_headers, - handle_response_headers, handle_response_body, - handle_response_error; /* this is _not_ for 500 - internal error */ - GPtrArray *plugin_ctx; liPlugin *backend; @@ -98,6 +127,27 @@ struct liVRequest { GList job_queue_link; GPtrArray *stat_cache_entries; + + /* I/O throttling */ + gboolean throttled; /* TRUE if vrequest is throttled */ + struct { + gint magazine; /* currently available for use */ + + struct { + liThrottlePool *ptr; /* NULL if not in any throttling pool */ + GList lnk; + GQueue *queue; + gint magazine; + } pool; + struct { + gchar unused; /* this struct is unused for now */ + } ip; + struct { + guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */ + ev_tstamp last_update; + } con; + liWaitQueueElem wqueue_elem; + } throttle; }; #define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \ @@ -110,7 +160,7 @@ struct liVRequest { } \ } while (0) -LI_API liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_response_headers, liVRequestHandlerCB handle_response_body, liVRequestHandlerCB handle_response_error, liVRequestHandlerCB handle_request_headers); +LI_API liVRequest* li_vrequest_new(liConnection *con, liConInfo *coninfo); LI_API void li_vrequest_free(liVRequest *vr); /* if keepalive = TRUE, you either have to reset it later again with FALSE or call li_vrequest_start before reusing the vr; * keepalive = TRUE doesn't reset the vr->request fields, so mod_status can show the last request data in the keep-alive state @@ -157,4 +207,8 @@ LI_API gboolean li_vrequest_redirect(liVRequest *vr, GString *uri); LI_API gboolean li_vrequest_redirect_directory(liVRequest *vr); +/* updates worker stats too */ +LI_API void li_vrequest_update_stats_in(liVRequest *vr, goffset transferred); +LI_API void li_vrequest_update_stats_out(liVRequest *vr, goffset transferred); + #endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index feaccf4..5fc1e04 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -246,7 +246,6 @@ SET(LIGHTTPD_SHARED_SRC ${LIGHTTPD_SHARED_SRC} chunk_lua.c core_lua.c - connection_lua.c environment_lua.c filters_lua.c http_headers_lua.c diff --git a/src/main/Makefile.am b/src/main/Makefile.am index 4a533c1..c607060 100644 --- a/src/main/Makefile.am +++ b/src/main/Makefile.am @@ -49,7 +49,6 @@ lua_src= \ value_lua.c \ \ chunk_lua.c \ - connection_lua.c \ core_lua.c \ environment_lua.c \ filters_lua.c \ diff --git a/src/main/condition.c b/src/main/condition.c index 904f8b7..7dce93f 100644 --- a/src/main/condition.c +++ b/src/main/condition.c @@ -30,7 +30,7 @@ static const liConditionValueType cond_value_hints[] = { /* uses wrk->tmp_str for temporary (and returned) strings */ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue, liConditionValue *res, liConditionValueType prefer) { - liConnection *con = vr->con; + liConInfo *coninfo = vr->coninfo; liHandlerResult r; struct stat st; int err; @@ -42,21 +42,21 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue case LI_COMP_REQUEST_LOCALIP: if (prefer == LI_COND_VALUE_HINT_STRING) { res->match_type = LI_COND_VALUE_HINT_STRING; - res->data.str = con->local_addr_str->str; + res->data.str = coninfo->local_addr_str->str; } else { res->match_type = LI_COND_VALUE_HINT_SOCKADDR; - res->data.addr = con->local_addr; + res->data.addr = coninfo->local_addr; } break; case LI_COMP_REQUEST_LOCALPORT: res->match_type = LI_COND_VALUE_HINT_NUMBER; - switch (con->local_addr.addr->plain.sa_family) { + switch (coninfo->local_addr.addr->plain.sa_family) { case AF_INET: - res->data.number = ntohs(con->local_addr.addr->ipv4.sin_port); + res->data.number = ntohs(coninfo->local_addr.addr->ipv4.sin_port); break; #ifdef HAVE_IPV6 case AF_INET6: - res->data.number = ntohs(con->local_addr.addr->ipv6.sin6_port); + res->data.number = ntohs(coninfo->local_addr.addr->ipv6.sin6_port); break; #endif default: @@ -67,21 +67,21 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue case LI_COMP_REQUEST_REMOTEIP: if (prefer == LI_COND_VALUE_HINT_STRING) { res->match_type = LI_COND_VALUE_HINT_STRING; - res->data.str = con->remote_addr_str->str; + res->data.str = coninfo->remote_addr_str->str; } else { res->match_type = LI_COND_VALUE_HINT_SOCKADDR; - res->data.addr = con->remote_addr; + res->data.addr = coninfo->remote_addr; } break; case LI_COMP_REQUEST_REMOTEPORT: res->match_type = LI_COND_VALUE_HINT_NUMBER; - switch (con->remote_addr.addr->plain.sa_family) { + switch (coninfo->remote_addr.addr->plain.sa_family) { case AF_INET: - res->data.number = ntohs(con->remote_addr.addr->ipv4.sin_port); + res->data.number = ntohs(coninfo->remote_addr.addr->ipv4.sin_port); break; #ifdef HAVE_IPV6 case AF_INET6: - res->data.number = ntohs(con->remote_addr.addr->ipv6.sin6_port); + res->data.number = ntohs(coninfo->remote_addr.addr->ipv6.sin6_port); break; #endif default: @@ -99,7 +99,7 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue break; case LI_COMP_REQUEST_SCHEME: res->match_type = LI_COND_VALUE_HINT_STRING; - res->data.str = con->is_ssl ? "https" : "http"; + res->data.str = coninfo->is_ssl ? "https" : "http"; break; case LI_COMP_REQUEST_QUERY_STRING: res->data.str = vr->request.uri.query->str; @@ -164,14 +164,14 @@ liHandlerResult li_condition_get_value(liVRequest *vr, liConditionLValue *lvalue break; case LI_COMP_REQUEST_HEADER: res->match_type = LI_COND_VALUE_HINT_STRING; - li_http_header_get_all(con->wrk->tmp_str, vr->request.headers, GSTR_LEN(lvalue->key)); - res->data.str = con->wrk->tmp_str->str; + li_http_header_get_all(vr->wrk->tmp_str, vr->request.headers, GSTR_LEN(lvalue->key)); + res->data.str = vr->wrk->tmp_str->str; break; case LI_COMP_RESPONSE_HEADER: VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr); res->match_type = LI_COND_VALUE_HINT_STRING; - li_http_header_get_all(con->wrk->tmp_str, vr->response.headers, GSTR_LEN(lvalue->key)); - res->data.str = con->wrk->tmp_str->str; + li_http_header_get_all(vr->wrk->tmp_str, vr->response.headers, GSTR_LEN(lvalue->key)); + res->data.str = vr->wrk->tmp_str->str; break; case LI_COMP_ENVIRONMENT: res->match_type = LI_COND_VALUE_HINT_STRING; diff --git a/src/main/connection.c b/src/main/connection.c index ccb5c59..4c57bb7 100644 --- a/src/main/connection.c +++ b/src/main/connection.c @@ -5,11 +5,26 @@ static void li_connection_reset_keep_alive(liConnection *con); static void li_connection_internal_error(liConnection *con); +static void update_io_events(liConnection *con) { + int events = 0; + + if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) { + events = events | EV_READ; + } + + if (con->raw_out->length > 0) { + if (!con->mainvr->throttled || con->mainvr->throttle.magazine > 0) { + events = events | EV_WRITE; + } + } + + li_ev_io_set_events(con->wrk->loop, &con->sock_watcher, events); +} + static void parse_request_body(liConnection *con) { if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) { goffset newbytes = 0; - li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_READ); if (con->mainvr->request.content_length == -1) { /* TODO: parse chunked encoded request body, filters */ /* li_chunkqueue_steal_all(con->in, con->raw_in); */ @@ -20,14 +35,11 @@ static void parse_request_body(liConnection *con) { } if (con->in->bytes_in == con->mainvr->request.content_length) { con->in->is_closed = TRUE; - li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ); } } if (newbytes > 0 || con->in->is_closed) { li_vrequest_handle_request_body(con->mainvr); } - } else { - li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ); } } @@ -56,41 +68,37 @@ static void forward_response_body(liConnection *con) { li_chunkqueue_steal_all(con->raw_out, con->out); } if (con->out->is_closed) con->raw_out->is_closed = TRUE; + con->info.out_queue_length = con->raw_out->length; } - if (con->raw_out->length > 0) { - li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE); - } else { - li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE); - } - } else { - li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE); } } +/* don't use con afterwards */ static void connection_request_done(liConnection *con) { liVRequest *vr = con->mainvr; liServerState s; if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { - VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->keep_alive); + VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->info.keep_alive); } li_plugins_handle_close(con); s = g_atomic_int_get(&con->srv->dest_state); - if (con->keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) { + if (con->info.keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) { li_connection_reset_keep_alive(con); } else { li_worker_con_put(con); } } +/* return FALSE if you shouldn't use con afterwards */ static gboolean check_response_done(liConnection *con) { if (con->in->is_closed && con->raw_out->is_closed && 0 == con->raw_out->length) { connection_request_done(con); - return TRUE; + return FALSE; } - return FALSE; + return TRUE; } static void connection_close(liConnection *con) { @@ -130,7 +138,7 @@ static void li_connection_internal_error(liConnection *con) { /* We only need the http version from the http request, "keep-alive" reset doesn't reset it */ li_vrequest_reset(con->mainvr, TRUE); - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; con->mainvr->response.http_status = 500; con->state = LI_CON_STATE_WRITE; /* skips further vrequest handling */ @@ -162,7 +170,7 @@ static gboolean connection_handle_read(liConnection *con) { con->keep_alive_requests++; /* disable keep alive if limit is reached */ if (con->keep_alive_requests == CORE_OPTION(LI_CORE_OPTION_MAX_KEEP_ALIVE_REQUESTS).number) - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; con->state = LI_CON_STATE_READ_REQUEST_HEADER; @@ -188,7 +196,7 @@ static gboolean connection_handle_read(liConnection *con) { li_counter_format(vr->request.uri.raw->len, COUNTER_BYTES, vr->wrk->tmp_str)->str ); - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; con->mainvr->response.http_status = 414; /* Request-URI Too Large */ li_vrequest_handle_direct(con->mainvr); con->state = LI_CON_STATE_WRITE; @@ -210,7 +218,7 @@ static gboolean connection_handle_read(liConnection *con) { } con->wrk->stats.requests++; - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; /* set status 400 if not already set to e.g. 413 */ if (con->mainvr->response.http_status == 0) con->mainvr->response.http_status = 400; @@ -230,7 +238,7 @@ static gboolean connection_handle_read(liConnection *con) { if (!li_request_validate_header(con)) { /* skip mainvr handling */ con->state = LI_CON_STATE_WRITE; - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; con->in->is_closed = TRUE; forward_response_body(con); } else { @@ -245,7 +253,6 @@ static gboolean connection_handle_read(liConnection *con) { } li_chunkqueue_append_mem(con->raw_out, CONST_STR_LEN("HTTP/1.1 100 Continue\r\n\r\n")); con->expect_100_cont = FALSE; - li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE); } con->state = LI_CON_STATE_HANDLE_MAINVR; @@ -261,8 +268,6 @@ static gboolean connection_handle_read(liConnection *con) { static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { liNetworkStatus res; - goffset write_max; - goffset transferred; liConnection *con = (liConnection*) w->data; gboolean update_io_timeout = FALSE; @@ -271,10 +276,8 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem); if (revents & EV_READ) { - if (con->in->is_closed) { - /* don't read the next request before current one is done */ - li_ev_io_rem_events(loop, w, EV_READ); - } else { + if (!con->in->is_closed) { + goffset transferred; transferred = con->raw_in->length; if (con->srv_sock->read_cb) { @@ -284,17 +287,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { } transferred = con->raw_in->length - transferred; - con->wrk->stats.bytes_in += transferred; - con->stats.bytes_in += transferred; update_io_timeout = update_io_timeout || (transferred > 0); - if ((ev_now(loop) - con->stats.last_avg) >= 5.0) { - con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s; - con->stats.bytes_out_5s = con->stats.bytes_out; - con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s; - con->stats.bytes_in_5s = con->stats.bytes_in; - con->stats.last_avg = ev_now(loop); - } + li_vrequest_update_stats_in(con->mainvr, transferred); switch (res) { case LI_NETWORK_STATUS_SUCCESS: @@ -312,21 +307,20 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { return; case LI_NETWORK_STATUS_WAIT_FOR_EVENT: break; - case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT: - /* TODO: aio */ - li_ev_io_rem_events(loop, w, EV_READ); - _ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio"); - break; } } } if (revents & EV_WRITE) { if (con->raw_out->length > 0) { - if (con->throttled) { - write_max = MIN(con->throttle.con.magazine, 256*1024); + goffset transferred; + static const goffset WRITE_MAX = 256*1024; /* 256kB */ + goffset write_max; + + if (con->mainvr->throttled) { + write_max = MIN(con->mainvr->throttle.magazine, WRITE_MAX); } else { - write_max = 256*1024; /* 256kB */ + write_max = WRITE_MAX; } if (write_max > 0) { @@ -339,8 +333,7 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { } transferred = transferred - con->raw_out->length; - con->wrk->stats.bytes_out += transferred; - con->stats.bytes_out += transferred; + con->info.out_queue_length = con->raw_out->length; update_io_timeout = update_io_timeout || (transferred > 0); switch (res) { @@ -357,47 +350,18 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { return; case LI_NETWORK_STATUS_WAIT_FOR_EVENT: break; - case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT: - /* TODO: aio */ - li_ev_io_rem_events(loop, w, EV_WRITE); - _ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio"); - break; } } else { transferred = 0; } - if ((ev_now(loop) - con->stats.last_avg) >= 5.0) { - con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s; - con->stats.bytes_out_5s = con->stats.bytes_out; - con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s; - con->stats.bytes_in_5s = con->stats.bytes_in; - con->stats.last_avg = ev_now(loop); - } + li_vrequest_update_stats_out(con->mainvr, transferred); - if (con->throttled) { - con->throttle.con.magazine -= transferred; - /*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con, - transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/ - if (con->throttle.con.magazine <= 0) { - li_ev_io_rem_events(loop, w, EV_WRITE); - li_waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem); - } - - if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queue) { - liThrottlePool *pool = con->throttle.pool.ptr; - g_atomic_int_inc(&pool->num_cons_queued); - con->throttle.pool.queue = pool->queues[con->wrk->ndx]; - g_queue_push_tail_link(con->throttle.pool.queue, &con->throttle.pool.lnk); - } - } - - if (0 == con->raw_out->length) { - li_ev_io_rem_events(loop, w, EV_WRITE); + if (con->mainvr->throttled) { + li_throttle_update(con->mainvr, transferred, WRITE_MAX); } } else { _DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue"); - li_ev_io_rem_events(loop, w, EV_WRITE); } } @@ -412,7 +376,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem); } - check_response_done(con); + if (!check_response_done(con)) return; + + update_io_events(con); } static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) { @@ -422,18 +388,19 @@ static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int reven } static liHandlerResult mainvr_handle_response_headers(liVRequest *vr) { - liConnection *con = vr->con; + liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { VR_DEBUG(vr, "%s", "read request/handle response header"); } parse_request_body(con); + update_io_events(con); return LI_HANDLER_GO_ON; } static liHandlerResult mainvr_handle_response_body(liVRequest *vr) { - liConnection *con = vr->con; - if (check_response_done(con)) return LI_HANDLER_GO_ON; + liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); + if (!check_response_done(con)) return LI_HANDLER_GO_ON; if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { VR_DEBUG(vr, "%s", "write response"); @@ -442,22 +409,49 @@ static liHandlerResult mainvr_handle_response_body(liVRequest *vr) { parse_request_body(con); forward_response_body(con); - if (check_response_done(con)) return LI_HANDLER_GO_ON; + if (!check_response_done(con)) return LI_HANDLER_GO_ON; + + update_io_events(con); return LI_HANDLER_GO_ON; } static liHandlerResult mainvr_handle_response_error(liVRequest *vr) { - li_connection_internal_error(vr->con); + liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); + + li_connection_internal_error(con); + update_io_events(con); + return LI_HANDLER_GO_ON; } static liHandlerResult mainvr_handle_request_headers(liVRequest *vr) { + liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); + /* start reading input */ - parse_request_body(vr->con); + parse_request_body(con); + update_io_events(con); + return LI_HANDLER_GO_ON; } +static gboolean mainvr_handle_check_io(liVRequest *vr) { + liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); + + update_io_events(con); + + return TRUE; +} + +static const liConCallbacks con_callbacks = { + mainvr_handle_request_headers, + mainvr_handle_response_headers, + mainvr_handle_response_body, + mainvr_handle_response_error, + + mainvr_handle_check_io +}; + liConnection* li_connection_new(liWorker *wrk) { liServer *srv = wrk->srv; liConnection *con = g_slice_new0(liConnection); @@ -471,19 +465,17 @@ liConnection* li_connection_new(liWorker *wrk) { ev_init(&con->sock_watcher, connection_cb); ev_io_set(&con->sock_watcher, -1, 0); con->sock_watcher.data = con; - con->remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); - con->local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); - con->is_ssl = FALSE; - con->keep_alive = TRUE; + con->info.remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); + con->info.local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN); + con->info.is_ssl = FALSE; + con->info.keep_alive = TRUE; con->raw_in = li_chunkqueue_new(); con->raw_out = li_chunkqueue_new(); - con->mainvr = li_vrequest_new(con, - mainvr_handle_response_headers, - mainvr_handle_response_body, - mainvr_handle_response_error, - mainvr_handle_request_headers); + con->info.callbacks = &con_callbacks; + + con->mainvr = li_vrequest_new(con, &con->info); li_http_request_parser_init(&con->req_parser_ctx, &con->mainvr->request, con->raw_in); con->in = con->mainvr->vr_in; @@ -502,9 +494,6 @@ liConnection* li_connection_new(liWorker *wrk) { con->io_timeout_elem.data = con; - con->throttle.wqueue_elem.data = con; - con->throttle.pool.lnk.data = con; - return con; } @@ -519,7 +508,7 @@ void li_connection_reset(liConnection *con) { li_server_socket_release(con->srv_sock); con->srv_sock = NULL; con->srv_sock_data = NULL; - con->is_ssl = FALSE; + con->info.is_ssl = FALSE; ev_io_stop(con->wrk->loop, &con->sock_watcher); if (con->sock_watcher.fd != -1) { @@ -534,9 +523,12 @@ void li_connection_reset(liConnection *con) { li_chunkqueue_reset(con->raw_in); li_chunkqueue_reset(con->raw_out); + con->info.out_queue_length = 0; li_buffer_release(con->raw_in_buffer); con->raw_in_buffer = NULL; + li_throttle_reset(con->mainvr); + li_vrequest_reset(con->mainvr, FALSE); /* restore chunkqueue limits */ @@ -549,11 +541,11 @@ void li_connection_reset(liConnection *con) { li_http_request_parser_reset(&con->req_parser_ctx); - g_string_truncate(con->remote_addr_str, 0); - li_sockaddr_clear(&con->remote_addr); - g_string_truncate(con->local_addr_str, 0); - li_sockaddr_clear(&con->local_addr); - con->keep_alive = TRUE; + g_string_truncate(con->info.remote_addr_str, 0); + li_sockaddr_clear(&con->info.remote_addr); + g_string_truncate(con->info.local_addr_str, 0); + li_sockaddr_clear(&con->info.local_addr); + con->info.keep_alive = TRUE; if (con->keep_alive_data.link) { g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link); @@ -565,18 +557,16 @@ void li_connection_reset(liConnection *con) { con->keep_alive_requests = 0; /* reset stats */ - con->stats.bytes_in = G_GUINT64_CONSTANT(0); - con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0); - con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); - con->stats.bytes_out = G_GUINT64_CONSTANT(0); - con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0); - con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); - con->stats.last_avg = 0; + con->info.stats.bytes_in = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_in_5s = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out_5s = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); + con->info.stats.last_avg = 0; /* remove from timeout queue */ li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem); - - li_throttle_reset(con); } static void li_connection_reset_keep_alive(liConnection *con) { @@ -617,9 +607,12 @@ static void li_connection_reset_keep_alive(liConnection *con) { con->expect_100_cont = FALSE; li_ev_io_set_events(con->wrk->loop, &con->sock_watcher, EV_READ); - con->keep_alive = TRUE; + con->info.keep_alive = TRUE; con->raw_out->is_closed = FALSE; + con->info.out_queue_length = con->raw_out->length; + + li_throttle_reset(con->mainvr); li_vrequest_reset(con->mainvr, TRUE); li_http_request_parser_reset(&con->req_parser_ctx); @@ -631,15 +624,13 @@ static void li_connection_reset_keep_alive(liConnection *con) { li_cqlimit_set_limit(con->raw_out->limit, 512*1024); /* reset stats */ - con->stats.bytes_in = G_GUINT64_CONSTANT(0); - con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0); - con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); - con->stats.bytes_out = G_GUINT64_CONSTANT(0); - con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0); - con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); - con->stats.last_avg = 0; - - li_throttle_reset(con); + con->info.stats.bytes_in = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_in_5s = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out_5s = G_GUINT64_CONSTANT(0); + con->info.stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0); + con->info.stats.last_avg = 0; if (con->raw_in->length != 0) { /* start handling next request if data is already available */ @@ -663,16 +654,18 @@ void li_connection_free(liConnection *con) { close(con->sock_watcher.fd); } ev_io_set(&con->sock_watcher, -1, 0); - g_string_free(con->remote_addr_str, TRUE); - li_sockaddr_clear(&con->remote_addr); - g_string_free(con->local_addr_str, TRUE); - li_sockaddr_clear(&con->local_addr); - con->keep_alive = TRUE; + g_string_free(con->info.remote_addr_str, TRUE); + li_sockaddr_clear(&con->info.remote_addr); + g_string_free(con->info.local_addr_str, TRUE); + li_sockaddr_clear(&con->info.local_addr); + con->info.keep_alive = TRUE; li_chunkqueue_free(con->raw_in); li_chunkqueue_free(con->raw_out); li_buffer_release(con->raw_in_buffer); + li_throttle_reset(con->mainvr); + li_vrequest_free(con->mainvr); li_http_request_parser_clear(&con->req_parser_ctx); @@ -700,3 +693,13 @@ gchar *li_connection_state_str(liConnectionState state) { return (gchar*)states[state]; } + +liConnection* li_connection_from_vrequest(liVRequest *vr) { + liConnection *con; + + if (vr->coninfo->callbacks != &con_callbacks) return NULL; + + con = LI_CONTAINER_OF(vr->coninfo, liConnection, info); + + return con; +} diff --git a/src/main/connection_lua.c b/src/main/connection_lua.c deleted file mode 100644 index c234e12..0000000 --- a/src/main/connection_lua.c +++ /dev/null @@ -1,44 +0,0 @@ - -#include - -#include -#include - -#define LUA_CONNECTION "liConnection*" - -static void init_con_mt(lua_State *L) { - /* TODO */ -} - -void li_lua_init_connection_mt(lua_State *L) { - if (luaL_newmetatable(L, LUA_CONNECTION)) { - init_con_mt(L); - } - lua_pop(L, 1); -} - -liConnection* li_lua_get_connection(lua_State *L, int ndx) { - if (!lua_isuserdata(L, ndx)) return NULL; - if (!lua_getmetatable(L, ndx)) return NULL; - luaL_getmetatable(L, LUA_CONNECTION); - if (lua_isnil(L, -1) || lua_isnil(L, -2) || !lua_equal(L, -1, -2)) { - lua_pop(L, 2); - return NULL; - } - lua_pop(L, 2); - return *(liConnection**) lua_touserdata(L, ndx); -} - -int li_lua_push_connection(lua_State *L, liConnection *con) { - liConnection **pcon; - - pcon = (liConnection**) lua_newuserdata(L, sizeof(liConnection*)); - *pcon = con; - - if (luaL_newmetatable(L, LUA_CONNECTION)) { - init_con_mt(L); - } - - lua_setmetatable(L, -2); - return 1; -} diff --git a/src/main/core_lua.c b/src/main/core_lua.c index 0968c75..ac7005a 100644 --- a/src/main/core_lua.c +++ b/src/main/core_lua.c @@ -243,7 +243,7 @@ static void lua_push_constants(lua_State *L, int ndx) { void li_lua_init(lua_State *L, liServer *srv, liWorker *wrk) { li_lua_init_chunk_mt(L); - li_lua_init_connection_mt(L); + li_lua_init_coninfo_mt(L); li_lua_init_environment_mt(L); li_lua_init_filter_mt(L); li_lua_init_physical_mt(L); diff --git a/src/main/plugin_core.c b/src/main/plugin_core.c index b69a5ca..9e23482 100644 --- a/src/main/plugin_core.c +++ b/src/main/plugin_core.c @@ -1418,7 +1418,7 @@ static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param, UNUSED(context); - li_throttle_pool_acquire(vr->con, pool); + li_throttle_pool_acquire(vr, pool); return LI_HANDLER_GO_ON; } @@ -1498,20 +1498,19 @@ static void core_throttle_connection_free(liServer *srv, gpointer param) { static liHandlerResult core_handle_throttle_connection(liVRequest *vr, gpointer param, gpointer *context) { - liConnection *con = vr->con; liThrottleParam *throttle_param = param; UNUSED(context); - con->throttle.con.rate = throttle_param->rate; - con->throttled = TRUE; + vr->throttle.con.rate = throttle_param->rate; + vr->throttled = TRUE; - if (con->throttle.pool.magazine) { - guint supply = MAX(con->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY); - con->throttle.con.magazine += supply; - con->throttle.pool.magazine -= supply; + if (vr->throttle.pool.magazine) { + guint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY); + vr->throttle.magazine += supply; + vr->throttle.pool.magazine -= supply; } else { - con->throttle.con.magazine += throttle_param->burst; + vr->throttle.magazine += throttle_param->burst; } return LI_HANDLER_GO_ON; diff --git a/src/main/request.c b/src/main/request.c index 095725b..996df94 100644 --- a/src/main/request.c +++ b/src/main/request.c @@ -61,7 +61,7 @@ void li_request_clear(liRequest *req) { /* closes connection after response */ static void bad_request(liConnection *con, int status) { - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; con->mainvr->response.http_status = status; li_vrequest_handle_direct(con->mainvr); } @@ -94,7 +94,7 @@ gboolean li_request_validate_header(liConnection *con) { liHttpHeader *hh; GList *l; - if (con->is_ssl) { + if (con->info.is_ssl) { g_string_append_len(req->uri.scheme, CONST_STR_LEN("https")); } else { g_string_append_len(req->uri.scheme, CONST_STR_LEN("http")); @@ -103,11 +103,11 @@ gboolean li_request_validate_header(liConnection *con) { switch (req->http_version) { case LI_HTTP_VERSION_1_0: if (!li_http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("keep-alive"))) - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; break; case LI_HTTP_VERSION_1_1: if (li_http_header_is(req->headers, CONST_STR_LEN("connection"), CONST_STR_LEN("close"))) - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; break; case LI_HTTP_VERSION_UNSET: bad_request(con, 505); /* Version not Supported */ diff --git a/src/main/response.c b/src/main/response.c index 4c864ac..716d231 100644 --- a/src/main/response.c +++ b/src/main/response.c @@ -50,7 +50,7 @@ gboolean li_response_send_headers(liConnection *con) { g_string_printf(con->wrk->tmp_str, "%"L_GOFFSET_FORMAT, con->out->length); li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Length"), GSTR_LEN(con->wrk->tmp_str)); } - } else if (con->keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) { + } else if (con->info.keep_alive && vr->request.http_version == LI_HTTP_VERSION_1_1) { /* TODO: maybe someone set a content length header? */ if (!(vr->response.transfer_encoding & LI_HTTP_TRANSFER_ENCODING_CHUNKED)) { vr->response.transfer_encoding |= LI_HTTP_TRANSFER_ENCODING_CHUNKED; @@ -58,7 +58,7 @@ gboolean li_response_send_headers(liConnection *con) { } } else { /* Unknown content length, no chunked encoding */ - con->keep_alive = FALSE; + con->info.keep_alive = FALSE; } if (vr->request.http_method == LI_HTTP_METHOD_HEAD) { @@ -71,11 +71,11 @@ gboolean li_response_send_headers(liConnection *con) { /* Status line */ if (vr->request.http_version == LI_HTTP_VERSION_1_1) { g_string_append_len(head, CONST_STR_LEN("HTTP/1.1 ")); - if (!con->keep_alive) + if (!con->info.keep_alive) li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Connection"), CONST_STR_LEN("close")); } else { g_string_append_len(head, CONST_STR_LEN("HTTP/1.0 ")); - if (con->keep_alive) + if (con->info.keep_alive) li_http_header_overwrite(vr->response.headers, CONST_STR_LEN("Connection"), CONST_STR_LEN("keep-alive")); } diff --git a/src/main/throttle.c b/src/main/throttle.c index 55805d7..e88d9aa 100644 --- a/src/main/throttle.c +++ b/src/main/throttle.c @@ -49,43 +49,43 @@ void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) { g_slice_free(liThrottlePool, pool); } -void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool) { +void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) { gint magazine; - if (con->throttle.pool.ptr == pool) + if (vr->throttle.pool.ptr == pool) return; - if (con->throttle.pool.ptr != NULL) { + if (vr->throttle.pool.ptr != NULL) { /* already in a different pool */ - li_throttle_pool_release(con); + li_throttle_pool_release(vr); } /* try to steal some initial 4kbytes from the pool */ while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) { if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) { - con->throttle.pool.magazine = 4*1024; + vr->throttle.pool.magazine = 4*1024; break; } } - con->throttle.pool.ptr = pool; - con->throttled = TRUE; + vr->throttle.pool.ptr = pool; + vr->throttled = TRUE; } -void li_throttle_pool_release(liConnection *con) { - if (con->throttle.pool.queue == NULL) +void li_throttle_pool_release(liVRequest *vr) { + if (vr->throttle.pool.queue == NULL) return; - if (con->throttle.pool.queue) { - g_queue_unlink(con->throttle.pool.queue, &con->throttle.pool.lnk); - con->throttle.pool.queue = NULL; - g_atomic_int_add(&con->throttle.pool.ptr->num_cons_queued, -1); + if (vr->throttle.pool.queue) { + g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk); + vr->throttle.pool.queue = NULL; + g_atomic_int_add(&vr->throttle.pool.ptr->num_cons_queued, -1); } /* give back bandwidth */ - g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine); - con->throttle.pool.magazine = 0; - con->throttle.pool.ptr = NULL; + g_atomic_int_add(&vr->throttle.pool.ptr->magazine, vr->throttle.pool.magazine); + vr->throttle.pool.magazine = 0; + vr->throttle.pool.ptr = NULL; } static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) { @@ -123,8 +123,8 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) { /* rearm connections */ for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) { - ((liConnection*)lnk->data)->throttle.pool.magazine += supply; - ((liConnection*)lnk->data)->throttle.pool.queue = NULL; + ((liVRequest*)lnk->data)->throttle.pool.magazine += supply; + ((liVRequest*)lnk->data)->throttle.pool.queue = NULL; lnk_next = lnk->next; lnk->next = NULL; lnk->prev = NULL; @@ -138,23 +138,23 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) { } } -void li_throttle_reset(liConnection *con) { - if (!con->throttled) +void li_throttle_reset(liVRequest *vr) { + if (!vr->throttled) return; /* remove from throttle queue */ - li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem); - li_throttle_pool_release(con); + li_waitqueue_remove(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem); + li_throttle_pool_release(vr); - con->throttle.con.rate = 0; - con->throttle.con.magazine = 0; - con->throttled = FALSE; + vr->throttle.con.rate = 0; + vr->throttle.magazine = 0; + vr->throttled = FALSE; } void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) { liWaitQueueElem *wqe; liThrottlePool *pool; - liConnection *con; + liVRequest *vr; liWorker *wrk; ev_tstamp now; guint supply; @@ -165,31 +165,49 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) { now = ev_now(loop); while (NULL != (wqe = li_waitqueue_pop(&wrk->throttle_queue))) { - con = wqe->data; + vr = wqe->data; - if (con->throttle.pool.ptr) { + if (vr->throttle.pool.ptr) { /* throttled by pool */ - pool = con->throttle.pool.ptr; + pool = vr->throttle.pool.ptr; li_throttle_pool_rearm(wrk, pool); - if (con->throttle.con.rate) { - supply = MIN(con->throttle.pool.magazine, con->throttle.con.rate * THROTTLE_GRANULARITY); - con->throttle.con.magazine += supply; - con->throttle.pool.magazine -= supply; + if (vr->throttle.con.rate) { + supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate * THROTTLE_GRANULARITY); + vr->throttle.magazine += supply; + vr->throttle.pool.magazine -= supply; } else { - con->throttle.con.magazine += con->throttle.pool.magazine; - con->throttle.pool.magazine = 0; + vr->throttle.magazine += vr->throttle.pool.magazine; + vr->throttle.pool.magazine = 0; } /* TODO: throttled by ip */ } else { /* throttled by connection */ - if (con->throttle.con.magazine <= con->throttle.con.rate * THROTTLE_GRANULARITY * 4) - con->throttle.con.magazine += con->throttle.con.rate * THROTTLE_GRANULARITY; + if (vr->throttle.magazine <= vr->throttle.con.rate * THROTTLE_GRANULARITY * 4) + vr->throttle.magazine += vr->throttle.con.rate * THROTTLE_GRANULARITY; } - li_ev_io_add_events(loop, &con->sock_watcher, EV_WRITE); + vr->coninfo->callbacks->handle_check_io(vr); } li_waitqueue_update(&wrk->throttle_queue); } + +void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max) { + vr->throttle.magazine -= transferred; + + /*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con, + transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/ + + if (vr->throttle.magazine <= 0) { + li_waitqueue_push(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem); + } + + if (vr->throttle.pool.ptr && vr->throttle.pool.magazine <= write_max && !vr->throttle.pool.queue) { + liThrottlePool *pool = vr->throttle.pool.ptr; + g_atomic_int_inc(&pool->num_cons_queued); + vr->throttle.pool.queue = pool->queues[vr->wrk->ndx]; + g_queue_push_tail_link(vr->throttle.pool.queue, &vr->throttle.pool.lnk); + } +} diff --git a/src/main/virtualrequest.c b/src/main/virtualrequest.c index dde1baf..379c902 100644 --- a/src/main/virtualrequest.c +++ b/src/main/virtualrequest.c @@ -136,11 +136,11 @@ liFilter* li_vrequest_add_filter_out(liVRequest *vr, liFilterHandlerCB handle_da return filters_add(&vr->filters_out, handle_data, handle_free, param); } -liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_response_headers, liVRequestHandlerCB handle_response_body, liVRequestHandlerCB handle_response_error, liVRequestHandlerCB handle_request_headers) { +liVRequest* li_vrequest_new(liConnection *con, liConInfo *coninfo) { liServer *srv = con->srv; liVRequest *vr = g_slice_new0(liVRequest); - vr->con = con; + vr->coninfo = coninfo; vr->wrk = con->wrk; vr->ref = g_slice_new0(liVRequestRef); vr->ref->refcount = 1; @@ -148,11 +148,6 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon vr->ref->wrk = con->wrk; vr->state = LI_VRS_CLEAN; - vr->handle_response_headers = handle_response_headers; - vr->handle_response_body = handle_response_body; - vr->handle_response_error = handle_response_error; - vr->handle_request_headers = handle_request_headers; - vr->plugin_ctx = g_ptr_array_new(); g_ptr_array_set_size(vr->plugin_ctx, g_hash_table_size(srv->plugins)); vr->options = g_slice_copy(srv->option_def_values->len * sizeof(liOptionValue), srv->option_def_values->data); @@ -194,6 +189,9 @@ liVRequest* li_vrequest_new(liConnection *con, liVRequestHandlerCB handle_respon li_action_stack_init(&vr->action_stack); + vr->throttle.wqueue_elem.data = vr; + vr->throttle.pool.lnk.data = vr; + return vr; } @@ -521,7 +519,7 @@ static void vrequest_do_handle_write(liVRequest *vr) { return; } - switch (vr->handle_response_body(vr)) { + switch (vr->coninfo->callbacks->handle_response_body(vr)) { case LI_HANDLER_GO_ON: break; case LI_HANDLER_COMEBACK: @@ -561,7 +559,7 @@ void li_vrequest_state_machine(liVRequest *vr) { case LI_HANDLER_ERROR: return; } - res = vr->handle_request_headers(vr); + res = vr->coninfo->callbacks->handle_request_headers(vr); switch (res) { case LI_HANDLER_GO_ON: if (vr->state == LI_VRS_HANDLE_REQUEST_HEADERS) { @@ -610,7 +608,7 @@ void li_vrequest_state_machine(liVRequest *vr) { case LI_HANDLER_ERROR: return; } - res = vr->handle_response_headers(vr); + res = vr->coninfo->callbacks->handle_response_headers(vr); switch (res) { case LI_HANDLER_GO_ON: vr->state = LI_VRS_WRITE_CONTENT; @@ -642,7 +640,7 @@ void li_vrequest_state_machine(liVRequest *vr) { VR_DEBUG(vr, "%s", "error"); } /* this will probably reset the vrequest, so stop handling after it */ - vr->handle_response_error(vr); + vr->coninfo->callbacks->handle_response_error(vr); return; } } while (!done); @@ -688,7 +686,7 @@ gboolean li_vrequest_redirect_directory(liVRequest *vr) { if (vr->request.uri.authority->len > 0) { g_string_append_len(uri, GSTR_LEN(vr->request.uri.authority)); } else { - g_string_append_len(uri, GSTR_LEN(vr->con->local_addr_str)); + g_string_append_len(uri, GSTR_LEN(vr->coninfo->local_addr_str)); } g_string_append_len(uri, GSTR_LEN(vr->request.uri.raw_orig_path)); g_string_append_c(uri, '/'); @@ -699,3 +697,29 @@ gboolean li_vrequest_redirect_directory(liVRequest *vr) { return li_vrequest_redirect(vr, uri); } + +static void update_stats_avg(ev_tstamp now, liConInfo *coninfo) { + if ((now - coninfo->stats.last_avg) >= 5.0) { + coninfo->stats.bytes_out_5s_diff = coninfo->stats.bytes_out - coninfo->stats.bytes_out_5s; + coninfo->stats.bytes_out_5s = coninfo->stats.bytes_out; + coninfo->stats.bytes_in_5s_diff = coninfo->stats.bytes_in - coninfo->stats.bytes_in_5s; + coninfo->stats.bytes_in_5s = coninfo->stats.bytes_in; + coninfo->stats.last_avg = now; + } +} + +void li_vrequest_update_stats_in(liVRequest *vr, goffset transferred) { + liConInfo *coninfo = vr->coninfo; + vr->wrk->stats.bytes_in += transferred; + coninfo->stats.bytes_in += transferred; + + update_stats_avg(ev_now(vr->wrk->loop), coninfo); +} + +void li_vrequest_update_stats_out(liVRequest *vr, goffset transferred) { + liConInfo *coninfo = vr->coninfo; + vr->wrk->stats.bytes_out += transferred; + coninfo->stats.bytes_out += transferred; + + update_stats_avg(ev_now(vr->wrk->loop), coninfo); +} diff --git a/src/main/virtualrequest_lua.c b/src/main/virtualrequest_lua.c index 274b63c..48fb2b2 100644 --- a/src/main/virtualrequest_lua.c +++ b/src/main/virtualrequest_lua.c @@ -20,7 +20,7 @@ static int lua_vrequest_attr_read_out(liVRequest *vr, lua_State *L) { } static int lua_vrequest_attr_read_con(liVRequest *vr, lua_State *L) { - li_lua_push_connection(L, vr->con); + li_lua_push_coninfo(L, vr->coninfo); return 1; } @@ -76,6 +76,10 @@ static const struct { { NULL, NULL, NULL } }; +#undef AR +#undef AW +#undef ARW + static int lua_vrequest_index(lua_State *L) { liVRequest *vr; const char *key; @@ -370,3 +374,150 @@ int li_lua_push_vrequest(lua_State *L, liVRequest *vr) { lua_setmetatable(L, -2); return 1; } + + +#define LUA_CONINFO "liConInfo*" + +typedef int (*lua_ConInfo_Attrib)(liConInfo *coninfo, lua_State *L); + +static int lua_coninfo_attr_read_local(liConInfo *coninfo, lua_State *L) { + lua_pushlstring(L, GSTR_LEN(coninfo->local_addr_str)); + return 1; +} + +static int lua_coninfo_attr_read_remote(liConInfo *coninfo, lua_State *L) { + lua_pushlstring(L, GSTR_LEN(coninfo->remote_addr_str)); + return 1; +} + +#define AR(m) { #m, lua_coninfo_attr_read_##m, NULL } +#define AW(m) { #m, NULL, lua_coninfo_attr_write_##m } +#define ARW(m) { #m, lua_coninfo_attr_read_##m, lua_coninfo_attr_write_##m } + +static const struct { + const char* key; + lua_ConInfo_Attrib read_attr, write_attr; +} coninfo_attribs[] = { + AR(local), + AR(remote), + + { NULL, NULL, NULL } +}; + +#undef AR +#undef AW +#undef ARW + +static int lua_coninfo_index(lua_State *L) { + liConInfo *coninfo; + const char *key; + int i; + + if (lua_gettop(L) != 2) { + lua_pushstring(L, "incorrect number of arguments"); + lua_error(L); + } + + if (li_lua_metatable_index(L)) return 1; + + coninfo = li_lua_get_coninfo(L, 1); + if (!coninfo) return 0; + + if (lua_isnumber(L, 2)) return 0; + if (!lua_isstring(L, 2)) return 0; + + key = lua_tostring(L, 2); + for (i = 0; coninfo_attribs[i].key ; i++) { + if (0 == strcmp(key, coninfo_attribs[i].key)) { + if (coninfo_attribs[i].read_attr) + return coninfo_attribs[i].read_attr(coninfo, L); + break; + } + } + + lua_pushstring(L, "cannot read attribute "); + lua_pushstring(L, key); + lua_pushstring(L, " in coninfo"); + lua_concat(L, 3); + lua_error(L); + + return 0; +} + +static int lua_coninfo_newindex(lua_State *L) { + liConInfo *coninfo; + const char *key; + int i; + + if (lua_gettop(L) != 3) { + lua_pushstring(L, "incorrect number of arguments"); + lua_error(L); + } + + coninfo = li_lua_get_coninfo(L, 1); + if (!coninfo) return 0; + + if (lua_isnumber(L, 2)) return 0; + if (!lua_isstring(L, 2)) return 0; + + key = lua_tostring(L, 2); + for (i = 0; coninfo_attribs[i].key ; i++) { + if (0 == strcmp(key, coninfo_attribs[i].key)) { + if (coninfo_attribs[i].write_attr) + return coninfo_attribs[i].write_attr(coninfo, L); + break; + } + } + + lua_pushstring(L, "cannot write attribute "); + lua_pushstring(L, key); + lua_pushstring(L, "in coninfo"); + lua_concat(L, 3); + lua_error(L); + + return 0; +} + +static const luaL_Reg coninfo_mt[] = { + { "__index", lua_coninfo_index }, + { "__newindex", lua_coninfo_newindex }, + + { NULL, NULL } +}; + +static void init_coninfo_mt(lua_State *L) { + luaL_register(L, NULL, coninfo_mt); +} + +void li_lua_init_coninfo_mt(lua_State *L) { + if (luaL_newmetatable(L, LUA_CONINFO)) { + init_coninfo_mt(L); + } + lua_pop(L, 1); +} + +liConInfo* li_lua_get_coninfo(lua_State *L, int ndx) { + if (!lua_isuserdata(L, ndx)) return NULL; + if (!lua_getmetatable(L, ndx)) return NULL; + luaL_getmetatable(L, LUA_CONINFO); + if (lua_isnil(L, -1) || lua_isnil(L, -2) || !lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return NULL; + } + lua_pop(L, 2); + return *(liConInfo**) lua_touserdata(L, ndx); +} + +int li_lua_push_coninfo(lua_State *L, liConInfo *coninfo) { + liConInfo **pconinfo; + + pconinfo = (liConInfo**) lua_newuserdata(L, sizeof(liConInfo*)); + *pconinfo = coninfo; + + if (luaL_newmetatable(L, LUA_CONINFO)) { + init_coninfo_mt(L); + } + + lua_setmetatable(L, -2); + return 1; +} diff --git a/src/main/worker.c b/src/main/worker.c index b5c7fa0..a45f862 100644 --- a/src/main/worker.c +++ b/src/main/worker.c @@ -159,7 +159,7 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) con = wqe->data; vr = con->mainvr; if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { - VR_DEBUG(vr, "connection io-timeout from %s after %.2f seconds", con->remote_addr_str->str, now - wqe->ts); + VR_DEBUG(vr, "connection io-timeout from %s after %.2f seconds", con->info.remote_addr_str->str, now - wqe->ts); } li_plugins_handle_close(con); li_worker_con_put(con); @@ -288,11 +288,11 @@ void li_worker_new_con(liWorker *ctx, liWorker *wrk, liSocketAddress remote_addr con->ts_started = CUR_TS(wrk); con->mainvr->ts_started = CUR_TS(wrk); - con->remote_addr = remote_addr; - li_sockaddr_to_string(remote_addr, con->remote_addr_str, FALSE); + con->info.remote_addr = remote_addr; + li_sockaddr_to_string(remote_addr, con->info.remote_addr_str, FALSE); - con->local_addr = li_sockaddr_local_from_socket(s); - li_sockaddr_to_string(con->local_addr, con->local_addr_str, FALSE); + con->info.local_addr = li_sockaddr_local_from_socket(s); + li_sockaddr_to_string(con->info.local_addr, con->info.local_addr_str, FALSE); li_waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem); diff --git a/src/main/wscript b/src/main/wscript index ecbb458..e5c406c 100644 --- a/src/main/wscript +++ b/src/main/wscript @@ -73,7 +73,6 @@ def build(bld): value_lua.c chunk_lua.c - connection_lua.c core_lua.c environment_lua.c filters_lua.c diff --git a/src/modules/mod_access.c b/src/modules/mod_access.c index ea65a61..b7e7222 100644 --- a/src/modules/mod_access.c +++ b/src/modules/mod_access.c @@ -67,7 +67,7 @@ enum { static liHandlerResult access_check(liVRequest *vr, gpointer param, gpointer *context) { access_check_data *acd = param; - liSockAddr *addr = vr->con->remote_addr.addr; + liSockAddr *addr = vr->coninfo->remote_addr.addr; gboolean log_blocked = _OPTION(vr, acd->p, OPTION_LOG_BLOCKED).boolean; GString *redirect_url = _OPTIONPTR(vr, acd->p, OPTION_REDIRECT_URL).string; @@ -82,7 +82,7 @@ static liHandlerResult access_check(liVRequest *vr, gpointer param, gpointer *co vr->response.http_status = 403; if (log_blocked) - VR_INFO(vr, "access.check: blocked %s", vr->con->remote_addr_str->str); + VR_INFO(vr, "access.check: blocked %s", vr->coninfo->remote_addr_str->str); } #ifdef HAVE_IPV6 } else if (addr->plain.sa_family == AF_INET6) { @@ -93,7 +93,7 @@ static liHandlerResult access_check(liVRequest *vr, gpointer param, gpointer *co vr->response.http_status = 403; if (log_blocked) - VR_INFO(vr, "access.check: blocked %s", vr->con->remote_addr_str->str); + VR_INFO(vr, "access.check: blocked %s", vr->coninfo->remote_addr_str->str); } #endif } else { @@ -222,7 +222,7 @@ static liHandlerResult access_deny(liVRequest *vr, gpointer param, gpointer *con vr->response.http_status = 403; if (log_blocked) { - VR_INFO(vr, "access.deny: blocked %s", vr->con->remote_addr_str->str); + VR_INFO(vr, "access.deny: blocked %s", vr->coninfo->remote_addr_str->str); } return LI_HANDLER_GO_ON; diff --git a/src/modules/mod_accesslog.c b/src/modules/mod_accesslog.c index f3302ac..03a252c 100644 --- a/src/modules/mod_accesslog.c +++ b/src/modules/mod_accesslog.c @@ -243,10 +243,10 @@ static GString *al_format_log(liVRequest *vr, al_data *ald, GArray *format) { g_string_append_c(str, '%'); break; case AL_FORMAT_REMOTE_ADDR: - g_string_append_len(str, GSTR_LEN(vr->con->remote_addr_str)); + g_string_append_len(str, GSTR_LEN(vr->coninfo->remote_addr_str)); break; case AL_FORMAT_LOCAL_ADDR: - g_string_append_len(str, GSTR_LEN(vr->con->local_addr_str)); + g_string_append_len(str, GSTR_LEN(vr->coninfo->local_addr_str)); break; case AL_FORMAT_BYTES_RESPONSE: li_string_append_int(str, vr->vr_out->bytes_out); @@ -291,10 +291,10 @@ static GString *al_format_log(liVRequest *vr, al_data *ald, GArray *format) { g_string_append_c(str, '-'); break; case AL_FORMAT_LOCAL_PORT: - switch (vr->con->local_addr.addr->plain.sa_family) { - case AF_INET: li_string_append_int(str, ntohs(vr->con->local_addr.addr->ipv4.sin_port)); break; + switch (vr->coninfo->local_addr.addr->plain.sa_family) { + case AF_INET: li_string_append_int(str, ntohs(vr->coninfo->local_addr.addr->ipv4.sin_port)); break; #ifdef HAVE_IPV6 - case AF_INET6: li_string_append_int(str, ntohs(vr->con->local_addr.addr->ipv6.sin6_port)); break; + case AF_INET6: li_string_append_int(str, ntohs(vr->coninfo->local_addr.addr->ipv6.sin6_port)); break; #endif default: g_string_append_c(str, '-'); break; } @@ -350,18 +350,22 @@ static GString *al_format_log(liVRequest *vr, al_data *ald, GArray *format) { else g_string_append_c(str, '-'); break; - case AL_FORMAT_CONNECTION_STATUS: - /* was request completed? */ - if (vr->con->in->is_closed && vr->con->raw_out->is_closed && 0 == vr->con->raw_out->length) - g_string_append_c(str, 'X'); - else - g_string_append_c(str, vr->con->keep_alive ? '+' : '-'); + case AL_FORMAT_CONNECTION_STATUS: { + /* was request completed? */ + liConnection *con = li_connection_from_vrequest(vr); /* try to get a connection object */ + + if (con && (con->in->is_closed && con->raw_out->is_closed && 0 == con->raw_out->length)) { + g_string_append_c(str, 'X'); + } else { + g_string_append_c(str, vr->coninfo->keep_alive ? '+' : '-'); + } + } break; case AL_FORMAT_BYTES_IN: - li_string_append_int(str, vr->con->stats.bytes_in); + li_string_append_int(str, vr->coninfo->stats.bytes_in); break; case AL_FORMAT_BYTES_OUT: - li_string_append_int(str, vr->con->stats.bytes_out); + li_string_append_int(str, vr->coninfo->stats.bytes_out); break; default: /* not implemented: @@ -398,7 +402,7 @@ static void al_handle_vrclose(liVRequest *vr, liPlugin *p) { msg = al_format_log(vr, p->data, format); g_string_append_len(msg, CONST_STR_LEN("\r\n")); - li_log_write(vr->con->srv, log, msg); + li_log_write(vr->wrk->srv, log, msg); } diff --git a/src/modules/mod_debug.c b/src/modules/mod_debug.c index 802a562..02aaf84 100644 --- a/src/modules/mod_debug.c +++ b/src/modules/mod_debug.c @@ -100,10 +100,10 @@ static gpointer debug_collect_func(liWorker *wrk, gpointer fdata) { cd->con = c; cd->io_timeout_elem = c->io_timeout_elem; cd->fd = c->sock_watcher.fd; - cd->is_ssl = c->is_ssl; - cd->keep_alive = c->keep_alive; - cd->remote_addr_str = g_string_new_len(GSTR_LEN(c->remote_addr_str)); - cd->local_addr_str = g_string_new_len(GSTR_LEN(c->local_addr_str)); + cd->is_ssl = c->info.is_ssl; + cd->keep_alive = c->info.keep_alive; + cd->remote_addr_str = g_string_new_len(GSTR_LEN(c->info.remote_addr_str)); + cd->local_addr_str = g_string_new_len(GSTR_LEN(c->info.local_addr_str)); cd->host = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.host)); cd->path = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.path)); cd->query = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.query)); @@ -112,14 +112,14 @@ static gpointer debug_collect_func(liWorker *wrk, gpointer fdata) { cd->response_size = c->mainvr->out->bytes_out; cd->state = c->state; cd->ts_started = c->ts_started; - cd->bytes_in = c->stats.bytes_in; - cd->bytes_out = c->stats.bytes_out; - cd->bytes_in_5s_diff = c->stats.bytes_in_5s_diff; - cd->bytes_out_5s_diff = c->stats.bytes_out_5s_diff; + cd->bytes_in = c->info.stats.bytes_in; + cd->bytes_out = c->info.stats.bytes_out; + cd->bytes_in_5s_diff = c->info.stats.bytes_in_5s_diff; + cd->bytes_out_5s_diff = c->info.stats.bytes_out_5s_diff; if (job->detailed.remote_addr_str) { if (job->detailed.wrk_ndx == wrk->ndx && job->detailed.con_ndx == i && - job->detailed.fd == c->sock_watcher.fd && g_string_equal(job->detailed.remote_addr_str, c->remote_addr_str)) { + job->detailed.fd == c->sock_watcher.fd && g_string_equal(job->detailed.remote_addr_str, c->info.remote_addr_str)) { cd->detailed = g_string_sized_new(1023); g_string_append_printf(cd->detailed, "
connection* @ %p = {\n", (void*)cd->con);
diff --git a/src/modules/mod_fastcgi.c b/src/modules/mod_fastcgi.c
index 5338c0d..2031d3e 100644
--- a/src/modules/mod_fastcgi.c
+++ b/src/modules/mod_fastcgi.c
@@ -325,18 +325,18 @@ static void fastcgi_env_add(GByteArray *buf, liEnvironmentDup *envdup, const gch
 }
 
 static void fastcgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray* buf) {
-	liConnection *con = vr->con;
-	GString *tmp = con->wrk->tmp_str;
+	liConInfo *coninfo = vr->coninfo;
+	GString *tmp = vr->wrk->tmp_str;
 
 	fastcgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_SOFTWARE"), GSTR_LEN(CORE_OPTIONPTR(LI_CORE_OPTION_SERVER_TAG).string));
 	fastcgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_NAME"), GSTR_LEN(vr->request.uri.host));
 	fastcgi_env_add(buf, envdup, CONST_STR_LEN("GATEWAY_INTERFACE"), CONST_STR_LEN("CGI/1.1"));
 	{
 		guint port = 0;
-		switch (con->local_addr.addr->plain.sa_family) {
-		case AF_INET: port = con->local_addr.addr->ipv4.sin_port; break;
+		switch (coninfo->local_addr.addr->plain.sa_family) {
+		case AF_INET: port = coninfo->local_addr.addr->ipv4.sin_port; break;
 #ifdef HAVE_IPV6
-		case AF_INET6: port = con->local_addr.addr->ipv6.sin6_port; break;
+		case AF_INET6: port = coninfo->local_addr.addr->ipv6.sin6_port; break;
 #endif
 		}
 		if (port) {
@@ -344,14 +344,14 @@ static void fastcgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteAr
 			fastcgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_PORT"), GSTR_LEN(tmp));
 		}
 	}
-	fastcgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_ADDR"), GSTR_LEN(con->local_addr_str));
+	fastcgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_ADDR"), GSTR_LEN(coninfo->local_addr_str));
 
 	{
 		guint port = 0;
-		switch (con->remote_addr.addr->plain.sa_family) {
-		case AF_INET: port = con->remote_addr.addr->ipv4.sin_port; break;
+		switch (coninfo->remote_addr.addr->plain.sa_family) {
+		case AF_INET: port = coninfo->remote_addr.addr->ipv4.sin_port; break;
 #ifdef HAVE_IPV6
-		case AF_INET6: port = con->remote_addr.addr->ipv6.sin6_port; break;
+		case AF_INET6: port = coninfo->remote_addr.addr->ipv6.sin6_port; break;
 #endif
 		}
 		if (port) {
@@ -359,7 +359,7 @@ static void fastcgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteAr
 			fastcgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_PORT"), GSTR_LEN(tmp));
 		}
 	}
-	fastcgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_ADDR"), GSTR_LEN(con->remote_addr_str));
+	fastcgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_ADDR"), GSTR_LEN(coninfo->remote_addr_str));
 
 	if (vr->request.content_length > 0) {
 		g_string_printf(tmp, "%" L_GOFFSET_MODIFIER "i", vr->request.content_length);
@@ -397,7 +397,7 @@ static void fastcgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteAr
 		break;
 	}
 
-	if (con->is_ssl) {
+	if (coninfo->is_ssl) {
 		fastcgi_env_add(buf, envdup, CONST_STR_LEN("HTTPS"), CONST_STR_LEN("on"));
 	}
 }
@@ -583,10 +583,6 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				/* TODO: aio */
-				li_ev_io_rem_events(loop, w, EV_READ);
-				break;
 			}
 		}
 	}
@@ -609,10 +605,6 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				li_ev_io_rem_events(loop, w, EV_WRITE);
-				/* TODO: aio */
-				break;
 			}
 		}
 		if (fcon->fcgi_out->length == 0) {
diff --git a/src/modules/mod_limit.c b/src/modules/mod_limit.c
index 7e54727..82ab2f1 100644
--- a/src/modules/mod_limit.c
+++ b/src/modules/mod_limit.c
@@ -173,6 +173,7 @@ static void mod_limit_vrclose(liVRequest *vr, liPlugin *p) {
 	mod_limit_context *ctx;
 	guint i;
 	gint cons;
+	liSocketAddress remote_addr = vr->coninfo->remote_addr;
 
 	if (!arr)
 		return;
@@ -186,12 +187,12 @@ static void mod_limit_vrclose(liVRequest *vr, liPlugin *p) {
 			g_atomic_int_add(&ctx->refcount, -1);
 			break;
 		case ML_TYPE_CON_IP:
-			cons = GPOINTER_TO_INT(li_radixtree_lookup_exact(ctx->pool.con_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len));
+			cons = GPOINTER_TO_INT(li_radixtree_lookup_exact(ctx->pool.con_ip, remote_addr.addr, remote_addr.len));
 			cons--;
 			if (!cons) {
-				li_radixtree_remove(ctx->pool.con_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len);
+				li_radixtree_remove(ctx->pool.con_ip, remote_addr.addr, remote_addr.len);
 			} else {
-				li_radixtree_insert(ctx->pool.con_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len, GINT_TO_POINTER(cons));
+				li_radixtree_insert(ctx->pool.con_ip, remote_addr.addr, remote_addr.len, GINT_TO_POINTER(cons));
 			}
 			g_atomic_int_add(&ctx->refcount, -1);
 			break;
@@ -211,6 +212,7 @@ static liHandlerResult mod_limit_action_handle(liVRequest *vr, gpointer param, g
 	GPtrArray *arr = g_ptr_array_index(vr->plugin_ctx, ctx->plugin->id);
 	gint cons;
 	mod_limit_req_ip_data *rid;
+	liSocketAddress remote_addr = vr->coninfo->remote_addr;
 
 	UNUSED(context);
 
@@ -237,9 +239,9 @@ static liHandlerResult mod_limit_action_handle(liVRequest *vr, gpointer param, g
 		break;
 	case ML_TYPE_CON_IP:
 		g_mutex_lock(ctx->mutex);
-		cons = GPOINTER_TO_INT(li_radixtree_lookup_exact(ctx->pool.con_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len));
+		cons = GPOINTER_TO_INT(li_radixtree_lookup_exact(ctx->pool.con_ip, remote_addr.addr, remote_addr.len));
 		if (cons < ctx->limit) {
-			li_radixtree_insert(ctx->pool.con_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len, GINT_TO_POINTER(cons+1));
+			li_radixtree_insert(ctx->pool.con_ip, remote_addr.addr, remote_addr.len, GINT_TO_POINTER(cons+1));
 			g_atomic_int_inc(&ctx->refcount);
 		} else {
 			limit_reached = TRUE;
@@ -267,15 +269,15 @@ static liHandlerResult mod_limit_action_handle(liVRequest *vr, gpointer param, g
 		break;
 	case ML_TYPE_REQ_IP:
 		g_mutex_lock(ctx->mutex);
-		rid = li_radixtree_lookup_exact(ctx->pool.req_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len);
+		rid = li_radixtree_lookup_exact(ctx->pool.req_ip, remote_addr.addr, remote_addr.len);
 		if (!rid) {
 			/* IP not known */
 			rid = g_slice_new0(mod_limit_req_ip_data);
 			rid->requests = 1;
-			rid->ip = li_sockaddr_dup(vr->con->remote_addr);
+			rid->ip = li_sockaddr_dup(remote_addr);
 			rid->ctx = ctx;
 			rid->timeout_elem.data = rid;
-			li_radixtree_insert(ctx->pool.req_ip, vr->con->remote_addr.addr, vr->con->remote_addr.len, rid);
+			li_radixtree_insert(ctx->pool.req_ip, remote_addr.addr, remote_addr.len, rid);
 			li_waitqueue_push(&(((mod_limit_data*)ctx->plugin->data)->timeout_queues[vr->wrk->ndx]), &rid->timeout_elem);
 			g_atomic_int_inc(&ctx->refcount);
 		} else if (rid->requests < ctx->limit) {
diff --git a/src/modules/mod_openssl.c b/src/modules/mod_openssl.c
index d596934..6723422 100644
--- a/src/modules/mod_openssl.c
+++ b/src/modules/mod_openssl.c
@@ -64,7 +64,7 @@ static gboolean openssl_con_new(liConnection *con) {
 	}
 
 	con->srv_sock_data = conctx;
-	con->is_ssl = TRUE;
+	con->info.is_ssl = TRUE;
 
 	return TRUE;
 
@@ -90,7 +90,7 @@ static void openssl_con_close(liConnection *con) {
 	}
 
 	con->srv_sock_data = NULL;
-	con->is_ssl = FALSE;
+	con->info.is_ssl = FALSE;
 
 	g_slice_free(openssl_connection_ctx, conctx);
 }
diff --git a/src/modules/mod_progress.c b/src/modules/mod_progress.c
index 07ba440..a680ff7 100644
--- a/src/modules/mod_progress.c
+++ b/src/modules/mod_progress.c
@@ -148,8 +148,8 @@ static void progress_vrclose(liVRequest *vr, liPlugin *p) {
 		node->vr = NULL;
 		node->request_size = vr->request.content_length;
 		node->response_size = vr->out->bytes_out;
-		node->bytes_in = vr->con->in->bytes_in;
-		node->bytes_out = MAX(0, vr->con->out->bytes_out - vr->con->raw_out->length);
+		node->bytes_in = vr->vr_in->bytes_in;
+		node->bytes_out = MAX(0, vr->vr_out->bytes_out - vr->coninfo->out_queue_length);
 		node->status_code = vr->response.http_status;
 		li_waitqueue_push(&progress_data.timeout_queues[vr->wrk->ndx], &(node->timeout_queue_elem));
 	}
@@ -217,8 +217,8 @@ static gpointer progress_collect_func(liWorker *wrk, gpointer fdata) {
 		node_new->vr = node->vr;
 		node_new->request_size = node->vr->request.content_length;
 		node_new->response_size = node->vr->out->bytes_out;
-		node_new->bytes_in = node->vr->con->in->bytes_in;
-		node_new->bytes_out = MAX(0, node->vr->con->out->bytes_out - node->vr->con->raw_out->length);
+		node_new->bytes_in = node->vr->vr_in->bytes_in;
+		node_new->bytes_out = MAX(0, node->vr->vr_out->bytes_out - node->vr->coninfo->out_queue_length);
 		node_new->status_code = node->vr->response.http_status;
 	} else {
 		/* copy dead data */
diff --git a/src/modules/mod_proxy.c b/src/modules/mod_proxy.c
index 4e7bfa4..c128e07 100644
--- a/src/modules/mod_proxy.c
+++ b/src/modules/mod_proxy.c
@@ -261,10 +261,6 @@ static void proxy_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				/* TODO: aio */
-				li_ev_io_rem_events(loop, w, EV_READ);
-				break;
 			}
 		}
 	}
@@ -287,10 +283,6 @@ static void proxy_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				li_ev_io_rem_events(loop, w, EV_WRITE);
-				/* TODO: aio */
-				break;
 			}
 		}
 		if (pcon->proxy_out->length == 0) {
diff --git a/src/modules/mod_redirect.c b/src/modules/mod_redirect.c
index 6568f43..94b13ca 100644
--- a/src/modules/mod_redirect.c
+++ b/src/modules/mod_redirect.c
@@ -298,10 +298,10 @@ static gboolean redirect_internal(liVRequest *vr, GString *dest, redirect_rule *
 		case REDIRECT_PART_VAR:
 
 			switch (rp->data.cond_lval) {
-			case LI_COMP_REQUEST_LOCALIP: str = vr->con->local_addr_str; break;
-			case LI_COMP_REQUEST_REMOTEIP: str = vr->con->remote_addr_str; break;
+			case LI_COMP_REQUEST_LOCALIP: str = vr->coninfo->local_addr_str; break;
+			case LI_COMP_REQUEST_REMOTEIP: str = vr->coninfo->remote_addr_str; break;
 			case LI_COMP_REQUEST_SCHEME:
-				if (vr->con->is_ssl)
+				if (vr->coninfo->is_ssl)
 					str_stack = li_const_gstring(CONST_STR_LEN("https"));
 				else
 					str_stack = li_const_gstring(CONST_STR_LEN("http"));
@@ -312,8 +312,8 @@ static gboolean redirect_internal(liVRequest *vr, GString *dest, redirect_rule *
 			case LI_COMP_REQUEST_QUERY_STRING: str = vr->request.uri.query; break;
 			case LI_COMP_REQUEST_METHOD: str = vr->request.http_method_str; break;
 			case LI_COMP_REQUEST_CONTENT_LENGTH:
-				g_string_printf(vr->con->wrk->tmp_str, "%"L_GOFFSET_FORMAT, vr->request.content_length);
-				str = vr->con->wrk->tmp_str;
+				g_string_printf(vr->wrk->tmp_str, "%"L_GOFFSET_FORMAT, vr->request.content_length);
+				str = vr->wrk->tmp_str;
 				break;
 			default: continue;
 			}
diff --git a/src/modules/mod_rewrite.c b/src/modules/mod_rewrite.c
index 53bd62a..26b2f41 100644
--- a/src/modules/mod_rewrite.c
+++ b/src/modules/mod_rewrite.c
@@ -282,10 +282,10 @@ static gboolean rewrite_internal(liVRequest *vr, GString *dest_path, GString *de
 		case REWRITE_PART_VAR:
 
 			switch (rp->data.cond_lval) {
-			case LI_COMP_REQUEST_LOCALIP: str = vr->con->local_addr_str; break;
-			case LI_COMP_REQUEST_REMOTEIP: str = vr->con->remote_addr_str; break;
+			case LI_COMP_REQUEST_LOCALIP: str = vr->coninfo->local_addr_str; break;
+			case LI_COMP_REQUEST_REMOTEIP: str = vr->coninfo->remote_addr_str; break;
 			case LI_COMP_REQUEST_SCHEME:
-				if (vr->con->is_ssl)
+				if (vr->coninfo->is_ssl)
 					str_stack = li_const_gstring(CONST_STR_LEN("https"));
 				else
 					str_stack = li_const_gstring(CONST_STR_LEN("http"));
@@ -296,8 +296,8 @@ static gboolean rewrite_internal(liVRequest *vr, GString *dest_path, GString *de
 			case LI_COMP_REQUEST_QUERY_STRING: str = vr->request.uri.query; break;
 			case LI_COMP_REQUEST_METHOD: str = vr->request.http_method_str; break;
 			case LI_COMP_REQUEST_CONTENT_LENGTH:
-				g_string_printf(vr->con->wrk->tmp_str, "%"L_GOFFSET_FORMAT, vr->request.content_length);
-				str = vr->con->wrk->tmp_str;
+				g_string_printf(vr->wrk->tmp_str, "%"L_GOFFSET_FORMAT, vr->request.content_length);
+				str = vr->wrk->tmp_str;
 				break;
 			default: continue;
 			}
diff --git a/src/modules/mod_scgi.c b/src/modules/mod_scgi.c
index 38b446c..9642cae 100644
--- a/src/modules/mod_scgi.c
+++ b/src/modules/mod_scgi.c
@@ -167,8 +167,8 @@ static void scgi_env_add(GByteArray *buf, liEnvironmentDup *envdup, const gchar
 }
 
 static void scgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray* buf) {
-	liConnection *con = vr->con;
-	GString *tmp = con->wrk->tmp_str;
+	liConInfo *coninfo = vr->coninfo;
+	GString *tmp = vr->wrk->tmp_str;
 
 	g_assert(vr->request.content_length >= 0);
 
@@ -185,10 +185,10 @@ static void scgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray
 	scgi_env_add(buf, envdup, CONST_STR_LEN("GATEWAY_INTERFACE"), CONST_STR_LEN("CGI/1.1"));
 	{
 		guint port = 0;
-		switch (con->local_addr.addr->plain.sa_family) {
-		case AF_INET: port = con->local_addr.addr->ipv4.sin_port; break;
+		switch (coninfo->local_addr.addr->plain.sa_family) {
+		case AF_INET: port = coninfo->local_addr.addr->ipv4.sin_port; break;
 #ifdef HAVE_IPV6
-		case AF_INET6: port = con->local_addr.addr->ipv6.sin6_port; break;
+		case AF_INET6: port = coninfo->local_addr.addr->ipv6.sin6_port; break;
 #endif
 		}
 		if (port) {
@@ -196,14 +196,14 @@ static void scgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray
 			scgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_PORT"), GSTR_LEN(tmp));
 		}
 	}
-	scgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_ADDR"), GSTR_LEN(con->local_addr_str));
+	scgi_env_add(buf, envdup, CONST_STR_LEN("SERVER_ADDR"), GSTR_LEN(coninfo->local_addr_str));
 
 	{
 		guint port = 0;
-		switch (con->remote_addr.addr->plain.sa_family) {
-		case AF_INET: port = con->remote_addr.addr->ipv4.sin_port; break;
+		switch (coninfo->remote_addr.addr->plain.sa_family) {
+		case AF_INET: port = coninfo->remote_addr.addr->ipv4.sin_port; break;
 #ifdef HAVE_IPV6
-		case AF_INET6: port = con->remote_addr.addr->ipv6.sin6_port; break;
+		case AF_INET6: port = coninfo->remote_addr.addr->ipv6.sin6_port; break;
 #endif
 		}
 		if (port) {
@@ -211,7 +211,7 @@ static void scgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray
 			scgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_PORT"), GSTR_LEN(tmp));
 		}
 	}
-	scgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_ADDR"), GSTR_LEN(con->remote_addr_str));
+	scgi_env_add(buf, envdup, CONST_STR_LEN("REMOTE_ADDR"), GSTR_LEN(coninfo->remote_addr_str));
 
 	scgi_env_add(buf, envdup, CONST_STR_LEN("SCRIPT_NAME"), GSTR_LEN(vr->request.uri.path));
 
@@ -244,7 +244,7 @@ static void scgi_env_create(liVRequest *vr, liEnvironmentDup *envdup, GByteArray
 		break;
 	}
 
-	if (con->is_ssl) {
+	if (coninfo->is_ssl) {
 		scgi_env_add(buf, envdup, CONST_STR_LEN("HTTPS"), CONST_STR_LEN("on"));
 	}
 }
@@ -347,10 +347,6 @@ static void scgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				/* TODO: aio */
-				li_ev_io_rem_events(loop, w, EV_READ);
-				break;
 			}
 		}
 	}
@@ -373,10 +369,6 @@ static void scgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
 				break;
 			case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
 				break;
-			case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
-				li_ev_io_rem_events(loop, w, EV_WRITE);
-				/* TODO: aio */
-				break;
 			}
 		}
 		if (scon->scgi_out->length == 0) {
diff --git a/src/modules/mod_status.c b/src/modules/mod_status.c
index 94f2bc6..84f7aad 100644
--- a/src/modules/mod_status.c
+++ b/src/modules/mod_status.c
@@ -397,10 +397,10 @@ static gpointer status_collect_func(liWorker *wrk, gpointer fdata) {
 	for (guint i = 0; i < wrk->connections_active; i++) {
 		liConnection *c = g_array_index(wrk->connections, liConnection*, i);
 		mod_status_con_data *cd = &g_array_index(sd->connections, mod_status_con_data, i);
-		cd->is_ssl = c->is_ssl;
-		cd->keep_alive = c->keep_alive;
-		cd->remote_addr_str = g_string_new_len(GSTR_LEN(c->remote_addr_str));
-		cd->local_addr_str = g_string_new_len(GSTR_LEN(c->local_addr_str));
+		cd->is_ssl = c->info.is_ssl;
+		cd->keep_alive = c->info.keep_alive;
+		cd->remote_addr_str = g_string_new_len(GSTR_LEN(c->info.remote_addr_str));
+		cd->local_addr_str = g_string_new_len(GSTR_LEN(c->info.local_addr_str));
 		cd->host = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.host));
 		cd->path = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.path));
 		cd->query = g_string_new_len(GSTR_LEN(c->mainvr->request.uri.query));
@@ -408,10 +408,10 @@ static gpointer status_collect_func(liWorker *wrk, gpointer fdata) {
 		cd->request_size = c->mainvr->request.content_length;
 		cd->response_size = c->mainvr->out->bytes_out;
 		cd->state = c->state;
-		cd->bytes_in = c->stats.bytes_in;
-		cd->bytes_out = c->stats.bytes_out;
-		cd->bytes_in_5s_diff = c->stats.bytes_in_5s_diff;
-		cd->bytes_out_5s_diff = c->stats.bytes_out_5s_diff;
+		cd->bytes_in = c->info.stats.bytes_in;
+		cd->bytes_out = c->info.stats.bytes_out;
+		cd->bytes_in_5s_diff = c->info.stats.bytes_in_5s_diff;
+		cd->bytes_out_5s_diff = c->info.stats.bytes_out_5s_diff;
 
 		cd->ts_started = (guint64)(CUR_TS(wrk) - c->ts_started);