From 383fa6a4f5d6651285bedd47cc3a89bd46d8780e Mon Sep 17 00:00:00 2001 From: Thomas Porzelt Date: Thu, 16 Apr 2009 17:02:53 +0200 Subject: [PATCH] - introduce throttling via pools, rework throttling by connection - new actions: 'throttle_pool', 'throttle_connection' and 'status' - don't start waitqueue timer in waitqueue_init(); start/stop timer on demand - new parameter for network_write(): write_max - move stats and timeout handling from network_write/read() to connection_cb() --- include/lighttpd/base.h | 1 + include/lighttpd/connection.h | 34 +++++-- include/lighttpd/network.h | 2 +- include/lighttpd/server.h | 2 + include/lighttpd/throttle.h | 26 +++++ include/lighttpd/waitqueue.h | 3 + src/CMakeLists.txt | 1 + src/connection.c | 174 ++++++++++++++++++++++++++++---- src/modules/mod_dirlist.c | 1 + src/modules/mod_fastcgi.c | 2 +- src/network.c | 64 +----------- src/plugin_core.c | 184 +++++++++++++++++++++++++++++++++- src/server.c | 12 +++ src/stat_cache.c | 1 - src/throttle.c | 138 +++++++++++++++++++++++++ src/waitqueue.c | 50 +++++++-- src/worker.c | 24 +---- src/wscript | 1 + 18 files changed, 593 insertions(+), 127 deletions(-) create mode 100644 include/lighttpd/throttle.h create mode 100644 src/throttle.c diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index 666ec95..48d1e44 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -45,6 +45,7 @@ #include #include #include +#include #include diff --git a/include/lighttpd/connection.h b/include/lighttpd/connection.h index 5ae8d39..409657a 100644 --- a/include/lighttpd/connection.h +++ b/include/lighttpd/connection.h @@ -58,22 +58,38 @@ struct connection { waitqueue_elem io_timeout_elem; /* I/O throttling */ + gboolean throttled; /* TRUE if connection is throttled */ struct { - waitqueue_elem queue_elem; - guint magazine; - ev_tstamp ts; + struct { + throttle_pool_t *ptr; /* NULL if not in any throttling pool */ + GList lnk; + gboolean queued; + gint magazine; + } pool; + 
struct { + throttle_pool_t *ptr; /* pool for per-ip throttling, NULL if not limited by ip */ + GList lnk; + gboolean queued; + gint magazine; + } ip; + struct { + guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */ + gint magazine; + ev_tstamp last_update; + } con; + waitqueue_elem wqueue_elem; } throttle; ev_tstamp ts; struct { - guint64 bytes_in; - guint64 bytes_out; + guint64 bytes_in; /* total number of bytes received */ + guint64 bytes_out; /* total number of bytes sent */ ev_tstamp last_avg; - guint64 bytes_in_5s; - guint64 bytes_out_5s; - guint64 bytes_in_5s_diff; - guint64 bytes_out_5s_diff; + guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */ + guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */ + guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */ + guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */ } stats; }; diff --git a/include/lighttpd/network.h b/include/lighttpd/network.h index 29c8344..bc60811 100644 --- a/include/lighttpd/network.h +++ b/include/lighttpd/network.h @@ -24,7 +24,7 @@ LI_API ssize_t net_write(int fd, void *buf, ssize_t nbyte); /** repeats read after EINTR */ LI_API ssize_t net_read(int fd, void *buf, ssize_t nbyte); -LI_API network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq); +LI_API network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq, goffset write_max); LI_API network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq); /* use writev for mem chunks, buffered read/write for files */ diff --git a/include/lighttpd/server.h b/include/lighttpd/server.h index 1e82f02..9c0df16 100644 --- a/include/lighttpd/server.h +++ b/include/lighttpd/server.h @@ -76,6 +76,8 @@ struct server { gdouble io_timeout; + GArray *throttle_pools; + gdouble stat_cache_ttl; }; diff --git a/include/lighttpd/throttle.h b/include/lighttpd/throttle.h new file mode 100644 
index 0000000..8ddc066 --- /dev/null +++ b/include/lighttpd/throttle.h @@ -0,0 +1,26 @@ +#ifndef _LIGHTTPD_THROTTLE_H_ +#define _LIGHTTPD_THROTTLE_H_ + +#define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */ + +struct throttle_pool_t { + GString *name; + guint rate; /** bytes/s */ + gint magazine; + GQueue** queues; /** worker specific queues. each worker has 2 */ + guint* current_queue; + guint num_cons; + + gint rearming; + ev_tstamp last_pool_rearm; + ev_tstamp *last_con_rearm; +}; + +typedef struct throttle_pool_t throttle_pool_t; + +void throttle_cb(struct ev_loop *loop, ev_timer *w, int revents); + +throttle_pool_t *throttle_pool_new(server *srv, GString *name, guint rate); +void throttle_pool_free(server *srv, throttle_pool_t *pool); + +#endif diff --git a/include/lighttpd/waitqueue.h b/include/lighttpd/waitqueue.h index cbc3129..14dc980 100644 --- a/include/lighttpd/waitqueue.h +++ b/include/lighttpd/waitqueue.h @@ -42,6 +42,9 @@ LI_API void waitqueue_push(waitqueue *queue, waitqueue_elem *elem); /* pops the first ready! element from the queue or NULL if none ready yet. this should be called in your callback */ LI_API waitqueue_elem *waitqueue_pop(waitqueue *queue); +/* pops all elements from the queue that are ready or NULL of none ready yet. 
returns number of elements pop()ed and saves old head in '*head' */ +LI_API guint waitqueue_pop_ready(waitqueue *queue, waitqueue_elem **head); + /* removes an element from the queue */ LI_API void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 87892ce..ca6861f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -298,6 +298,7 @@ SET(COMMON_SRC stat_cache.c sys-files.c sys-socket.c + throttle.c url_parser.c utils.c value.c diff --git a/src/connection.c b/src/connection.c index 531b4e4..c5fe65f 100644 --- a/src/connection.c +++ b/src/connection.c @@ -192,6 +192,7 @@ static gboolean connection_handle_read(connection *con) { con->expect_100_cont = FALSE; ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE); } + con->state = CON_STATE_HANDLE_MAINVR; action_enter(con->mainvr, con->srv->mainaction); vrequest_handle_request_headers(con->mainvr); @@ -204,6 +205,9 @@ static gboolean connection_handle_read(connection *con) { } static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { + network_status_t res; + goffset write_max; + goffset transferred; connection *con = (connection*) w->data; if (revents & EV_READ) { @@ -211,7 +215,15 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { /* don't read the next request before current one is done */ ev_io_rem_events(loop, w, EV_READ); } else { - switch (network_read(con->mainvr, w->fd, con->raw_in)) { + transferred = con->raw_in->length; + + res = network_read(con->mainvr, w->fd, con->raw_in); + + transferred = con->raw_in->length - transferred; + con->wrk->stats.bytes_in += transferred; + con->stats.bytes_in += transferred; + + switch (res) { case NETWORK_STATUS_SUCCESS: if (!connection_handle_read(con)) return; break; @@ -236,24 +248,67 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { if (revents & EV_WRITE) { if (con->raw_out->length > 0) { - switch (network_write(con->mainvr, 
w->fd, con->raw_out)) { - case NETWORK_STATUS_SUCCESS: - vrequest_joblist_append(con->mainvr); - break; - case NETWORK_STATUS_FATAL_ERROR: - _ERROR(con->srv, con->mainvr, "%s", "network write fatal error"); - connection_error(con); - return; - case NETWORK_STATUS_CONNECTION_CLOSE: - connection_close(con); - return; - case NETWORK_STATUS_WAIT_FOR_EVENT: - break; - case NETWORK_STATUS_WAIT_FOR_AIO_EVENT: - ev_io_rem_events(loop, w, EV_WRITE); - _ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio"); - /* TODO: aio */ - break; + if (con->throttled) { + write_max = MIN(con->throttle.con.magazine, 256*1024); + } else { + write_max = 256*1024; /* 256kB */ + } + + if (write_max > 0) { + transferred = con->raw_out->length; + + res = network_write(con->mainvr, w->fd, con->raw_out, write_max); + + transferred = transferred - con->raw_out->length; + con->wrk->stats.bytes_out += transferred; + con->stats.bytes_out += transferred; + + switch (res) { + case NETWORK_STATUS_SUCCESS: + vrequest_joblist_append(con->mainvr); + break; + case NETWORK_STATUS_FATAL_ERROR: + _ERROR(con->srv, con->mainvr, "%s", "network write fatal error"); + connection_error(con); + return; + case NETWORK_STATUS_CONNECTION_CLOSE: + connection_close(con); + return; + case NETWORK_STATUS_WAIT_FOR_EVENT: + break; + case NETWORK_STATUS_WAIT_FOR_AIO_EVENT: + ev_io_rem_events(loop, w, EV_WRITE); + _ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio"); + /* TODO: aio */ + break; + } + } else { + transferred = 0; + } + + if ((ev_now(loop) - con->stats.last_avg) >= 5.0) { + con->stats.bytes_out_5s_diff = con->wrk->stats.bytes_out - con->wrk->stats.bytes_out_5s; + con->stats.bytes_out_5s = con->stats.bytes_out; + con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s; + con->stats.bytes_in_5s = con->stats.bytes_in; + con->stats.last_avg = ev_now(loop); + } + + if (con->throttled) { + con->throttle.con.magazine -= transferred; + /*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" 
bytes, mags: %d/%d, queued: %s\n", (void*)con, + transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/ + if (con->throttle.con.magazine <= 0) { + ev_io_rem_events(loop, w, EV_WRITE); + waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem); + } + + if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queued) { + throttle_pool_t *pool = con->throttle.pool.ptr; + g_atomic_int_inc(&pool->num_cons); + g_queue_push_tail_link(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk); + con->throttle.pool.queued = TRUE; + } } } else { _DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue"); @@ -261,6 +316,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { } } + if ((con->io_timeout_elem.ts + 1.0) < ev_now(loop)) + waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem); + check_response_done(con); } @@ -350,7 +408,10 @@ connection* connection_new(worker *wrk) { con->keep_alive_data.watcher.data = con; con->io_timeout_elem.data = con; - con->throttle.queue_elem.data = con; + + con->throttle.wqueue_elem.data = con; + con->throttle.pool.lnk.data = con; + con->throttle.ip.lnk.data = con; return con; } @@ -406,8 +467,43 @@ void connection_reset(connection *con) { /* remove from timeout queue */ waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem); + /* remove from throttle queue */ - waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.queue_elem); + waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem); + + if (con->throttle.pool.ptr) { + if (con->throttle.pool.queued) { + throttle_pool_t *pool = con->throttle.pool.ptr; + g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk); + g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1); + con->throttle.pool.queued = FALSE; + } + 
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine); + g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine); + g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine); + con->throttle.pool.magazine = 0; + con->throttle.ip.magazine = 0; + con->throttle.con.magazine = 0; + con->throttle.pool.ptr = NULL; + } + + if (con->throttle.ip.ptr) { + if (con->throttle.ip.queued) { + throttle_pool_t *pool = con->throttle.ip.ptr; + g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk); + g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1); + con->throttle.ip.queued = FALSE; + } + g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine); + g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine); + con->throttle.ip.ptr = NULL; + } + + con->throttle.con.rate = 0; + con->throttle.pool.magazine = 0; + con->throttle.ip.magazine = 0; + con->throttle.con.magazine = 0; + con->throttled = FALSE; } void server_check_keepalive(server *srv); @@ -460,7 +556,41 @@ void connection_reset_keep_alive(connection *con) { /* remove from timeout queue */ waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem); /* remove from throttle queue */ - waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.queue_elem); + waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem); + + if (con->throttle.pool.ptr) { + if (con->throttle.pool.queued) { + throttle_pool_t *pool = con->throttle.pool.ptr; + g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk); + g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1); + con->throttle.pool.queued = FALSE; + } + g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine); + g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine); + 
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine); + con->throttle.pool.magazine = 0; + con->throttle.ip.magazine = 0; + con->throttle.con.magazine = 0; + con->throttle.pool.ptr = NULL; + } + + if (con->throttle.ip.ptr) { + if (con->throttle.ip.queued) { + throttle_pool_t *pool = con->throttle.ip.ptr; + g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk); + g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1); + con->throttle.ip.queued = FALSE; + } + g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine); + g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine); + con->throttle.ip.ptr = NULL; + } + + con->throttle.con.rate = 0; + con->throttle.pool.magazine = 0; + con->throttle.ip.magazine = 0; + con->throttle.con.magazine = 0; + con->throttled = FALSE; } void connection_free(connection *con) { diff --git a/src/modules/mod_dirlist.c b/src/modules/mod_dirlist.c index cb438f5..ff39324 100644 --- a/src/modules/mod_dirlist.c +++ b/src/modules/mod_dirlist.c @@ -36,6 +36,7 @@ * xyz * * Todo: + * - make output generating "async", give up control every N entries * - filters for entries (pattern, regex) * - include-* parameters * - javascript for sorting diff --git a/src/modules/mod_fastcgi.c b/src/modules/mod_fastcgi.c index 845a3f0..1c53e31 100644 --- a/src/modules/mod_fastcgi.c +++ b/src/modules/mod_fastcgi.c @@ -596,7 +596,7 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) { if (fcon->fd != -1 && (revents & EV_WRITE)) { if (fcon->fcgi_out->length > 0) { - switch (network_write(fcon->vr, w->fd, fcon->fcgi_out)) { + switch (network_write(fcon->vr, w->fd, fcon->fcgi_out, 256*1024)) { case NETWORK_STATUS_SUCCESS: break; case NETWORK_STATUS_FATAL_ERROR: diff --git a/src/network.c b/src/network.c index adb34ad..5553b12 100644 --- a/src/network.c +++ b/src/network.c @@ -36,26 +36,12 @@ ssize_t net_read(int fd, void *buf, ssize_t 
nbyte) { return r; } -network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) { +network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq, goffset write_max) { network_status_t res; - ev_tstamp ts, now = CUR_TS(vr->wrk); - worker *wrk; #ifdef TCP_CORK int corked = 0; #endif - goffset write_max = 256*1024, write_bytes, wrote; /* 256 kb */ - - if (CORE_OPTION(CORE_OPTION_THROTTLE).number) { - /* throttling is enabled */ - if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->wrk->throttle_queue.delay)) { - vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts); - if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number) - vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number; - vr->con->throttle.ts = now; - /*g_print("throttle magazine: %u kbytes\n", vr->con->throttle.magazine / 1024);*/ - } - write_max = vr->con->throttle.magazine; - } + goffset write_bytes, wrote; #ifdef TCP_CORK /* Linux: put a cork into the socket as we want to combine the write() calls @@ -84,33 +70,6 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) { } #endif - /* stats */ - wrk = vr->wrk; - wrk->stats.bytes_out += wrote; - vr->con->stats.bytes_out += wrote; - - /* update 5s stats */ - ts = CUR_TS(wrk); - - if ((ts - vr->con->stats.last_avg) >= 5.0) { - vr->con->stats.bytes_out_5s_diff = vr->wrk->stats.bytes_out - vr->wrk->stats.bytes_out_5s; - vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out; - vr->con->stats.last_avg = ts; - } - - /* only update once a second, the cast is to round the timestamp */ - if ((vr->con->io_timeout_elem.ts + 1.) 
< now) - waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem); - - vr->con->throttle.magazine = write_bytes; - /* check if throttle magazine is empty */ - if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_bytes == 0) { - /* remove EV_WRITE from sockwatcher for now */ - ev_io_rem_events(vr->wrk->loop, &vr->con->sock_watcher, EV_WRITE); - waitqueue_push(&vr->wrk->throttle_queue, &vr->con->throttle.queue_elem); - return NETWORK_STATUS_WAIT_FOR_AIO_EVENT; - } - return res; } @@ -119,8 +78,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { off_t max_read = 16 * blocksize; /* 256k */ ssize_t r; off_t len = 0; - worker *wrk = vr->wrk; - ev_tstamp now = CUR_TS(wrk); if (cq->limit && cq->limit->limit > 0) { if (max_read > cq->limit->limit - cq->limit->current) { @@ -132,10 +89,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { } } - /* only update once a second */ - if ((vr->con->io_timeout_elem.ts + 1.) < now) - waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem); - do { GByteArray *buf = g_byte_array_sized_new(blocksize); g_byte_array_set_size(buf, blocksize); @@ -160,19 +113,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) { g_byte_array_set_size(buf, r); chunkqueue_append_bytearr(cq, buf); len += r; - - /* stats */ - wrk = vr->wrk; - wrk->stats.bytes_in += r; - vr->con->stats.bytes_in += r; - - /* update 5s stats */ - - if ((now - vr->con->stats.last_avg) >= 5.0) { - vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s; - vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in; - vr->con->stats.last_avg = now; - } } while (r == blocksize && len < max_read); return NETWORK_STATUS_SUCCESS; diff --git a/src/plugin_core.c b/src/plugin_core.c index ed4b1c2..fadf9d8 100644 --- a/src/plugin_core.c +++ b/src/plugin_core.c @@ -292,6 +292,32 @@ static action* core_static(server *srv, plugin* p, value *val) { return 
action_new_function(core_handle_static, NULL, NULL, NULL); } + +static handler_t core_handle_status(vrequest *vr, gpointer param, gpointer *context) { + UNUSED(param); + UNUSED(context); + + vr->response.http_status = GPOINTER_TO_INT(param); + + return HANDLER_GO_ON; +} + +static action* core_status(server *srv, plugin* p, value *val) { + gpointer ptr; + + UNUSED(p); + + if (!val || val->type != VALUE_NUMBER) { + ERROR(srv, "%s", "status action expects a number as parameter"); + return NULL; + } + + ptr = GINT_TO_POINTER((gint) value_extract(val).number); + + return action_new_function(core_handle_status, NULL, NULL, ptr); +} + + static void core_log_write_free(server *srv, gpointer param) { UNUSED(srv); g_string_free(param, TRUE); @@ -957,7 +983,7 @@ static action* core_buffer_out(server *srv, plugin* p, value *val) { UNUSED(p); if (val->type != VALUE_NUMBER) { - ERROR(srv, "'core_buffer_out' action expects an integer as parameter, %s given", value_type_string(val->type)); + ERROR(srv, "'buffer.out' action expects an integer as parameter, %s given", value_type_string(val->type)); return NULL; } @@ -990,7 +1016,7 @@ static action* core_buffer_in(server *srv, plugin* p, value *val) { UNUSED(p); if (val->type != VALUE_NUMBER) { - ERROR(srv, "'core_buffer_in' action expects an integer as parameter, %s given", value_type_string(val->type)); + ERROR(srv, "'buffer.in' action expects an integer as parameter, %s given", value_type_string(val->type)); return NULL; } @@ -1007,6 +1033,154 @@ static action* core_buffer_in(server *srv, plugin* p, value *val) { return action_new_function(core_handle_buffer_in, NULL, NULL, GINT_TO_POINTER((gint) limit)); } +static handler_t core_handle_throttle_pool(vrequest *vr, gpointer param, gpointer *context) { + throttle_pool_t *pool = param; + gint magazine; + + UNUSED(context); + + if (vr->con->throttle.pool.ptr != pool) { + if (vr->con->throttle.pool.ptr) { + /* connection has been in a different pool, give back bandwidth */ + 
g_atomic_int_add(&vr->con->throttle.pool.ptr->magazine, vr->con->throttle.pool.magazine); + vr->con->throttle.pool.magazine = 0; + if (vr->con->throttle.pool.queued) { + throttle_pool_t *p = vr->con->throttle.pool.ptr; + g_queue_unlink(p->queues[vr->con->wrk->ndx+p->current_queue[vr->con->wrk->ndx]], &vr->con->throttle.ip.lnk); + g_atomic_int_add(&p->num_cons, -1); + } + } + + /* try to steal some initial 4kbytes from the pool */ + while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) { + if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) { + vr->con->throttle.pool.magazine = 4*1024; + break; + } + } + } + + vr->con->throttle.pool.ptr = pool; + vr->con->throttled = TRUE; + + return HANDLER_GO_ON; +} + +static action* core_throttle_pool(server *srv, plugin* p, value *val) { + GString *name; + guint i; + throttle_pool_t *pool = NULL; + gint64 rate; + + UNUSED(p); + + if (val->type != VALUE_STRING && val->type != VALUE_LIST) { + ERROR(srv, "'throttle_pool' action expects a string or a string-number tuple as parameter, %s given", value_type_string(val->type)); + return NULL; + } + + if (val->type == VALUE_LIST) { + if (val->data.list->len != 2 + || g_array_index(val->data.list, value*, 0)->type != VALUE_STRING + || g_array_index(val->data.list, value*, 1)->type != VALUE_NUMBER) { + + ERROR(srv, "%s", "'throttle_pool' action expects a string or a string-number tuple as parameter"); + return NULL; + } + + name = g_array_index(val->data.list, value*, 0)->data.string; + rate = g_array_index(val->data.list, value*, 1)->data.number; + + if (rate && rate < (32*1024)) { + ERROR(srv, "throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate); + return NULL; + } + + if (rate > (0xFFFFFFFF)) { + ERROR(srv, "throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate); + return NULL; + } + } else { + name = val->data.string; + rate = 0; + } + + for (i = 0; i < 
srv->throttle_pools->len; i++) { + if (g_string_equal(g_array_index(srv->throttle_pools, throttle_pool_t*, i)->name, name)) { + /* pool already defined */ + if (val->type == VALUE_LIST && g_array_index(srv->throttle_pools, throttle_pool_t*, i)->rate != (guint)rate) { + ERROR(srv, "throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", name->str, + g_array_index(srv->throttle_pools, throttle_pool_t*, i)->rate); + return NULL; + } + + pool = g_array_index(srv->throttle_pools, throttle_pool_t*, i); + break; + } + } + + if (!pool) { + /* pool not yet defined */ + if (val->type == VALUE_STRING) { + ERROR(srv, "throttle_pool: rate for pool '%s' hasn't been defined", name->str); + return NULL; + } + + pool = throttle_pool_new(srv, value_extract(g_array_index(val->data.list, value*, 0)).string, (guint)rate); + g_array_append_val(srv->throttle_pools, pool); + } + + return action_new_function(core_handle_throttle_pool, NULL, NULL, pool); +} + + +static handler_t core_handle_throttle_connection(vrequest *vr, gpointer param, gpointer *context) { + gint supply; + connection *con = vr->con; + guint rate = GPOINTER_TO_UINT(param); + + UNUSED(context); + + con->throttle.con.rate = rate; + con->throttled = TRUE; + + if (con->throttle.pool.magazine) { + supply = MIN(con->throttle.pool.magazine, rate * THROTTLE_GRANULARITY); + con->throttle.con.magazine += supply; + con->throttle.pool.magazine -= supply; + } + + return HANDLER_GO_ON; +} + +static action* core_throttle_connection(server *srv, plugin* p, value *val) { + gint64 rate; + UNUSED(p); + + if (val->type != VALUE_NUMBER) { + ERROR(srv, "'throttle_connection' action expects a positive integer as parameter, %s given", value_type_string(val->type)); + return NULL; + } + + rate = val->data.number; + + if (rate < 0) { + rate = 0; /* no limit */ + } + + if (rate && rate < (32*1024)) { + ERROR(srv, "throttle_connection: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate); + return 
NULL; + } + + if (rate > (0xFFFFFFFF)) { + ERROR(srv, "throttle_connection: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate); + return NULL; + } + + return action_new_function(core_handle_throttle_connection, NULL, NULL, GUINT_TO_POINTER((guint) rate)); +} + static const plugin_option options[] = { { "debug.log_request_handling", VALUE_BOOLEAN, GINT_TO_POINTER(FALSE), NULL, NULL }, @@ -1038,6 +1212,8 @@ static const plugin_action actions[] = { { "docroot", core_docroot }, { "static", core_static }, + { "status", core_status }, + { "log.write", core_log_write }, { "blank", core_blank }, @@ -1054,6 +1230,10 @@ static const plugin_action actions[] = { { "buffer.out", core_buffer_out }, { "buffer.in", core_buffer_in }, + { "throttle_pool", core_throttle_pool }, + /*{ "throttle.ip", core_throttle_ip },*/ + { "throttle_connection", core_throttle_connection }, + { NULL, NULL } }; diff --git a/src/server.c b/src/server.c index 4829433..62c0af9 100644 --- a/src/server.c +++ b/src/server.c @@ -87,6 +87,7 @@ server* server_new(const gchar *module_dir) { srv->state = SERVER_STARTING; srv->workers = g_array_new(FALSE, TRUE, sizeof(worker*)); + srv->worker_count = 1; srv->sockets = g_ptr_array_new(); @@ -109,6 +110,8 @@ server* server_new(const gchar *module_dir) { /* error log ts format */ server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT")); + srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(throttle_pool_t*)); + log_init(srv); srv->io_timeout = 300; /* default I/O timeout */ @@ -135,6 +138,15 @@ void server_free(server* srv) { action_release(srv, srv->mainaction); + /* free throttle pools */ + { + guint i; + for (i = 0; i < srv->throttle_pools->len; i++) { + throttle_pool_free(srv, g_array_index(srv->throttle_pools, throttle_pool_t*, i)); + } + g_array_free(srv->throttle_pools, TRUE); + } + /* free all workers */ { guint i; diff --git a/src/stat_cache.c b/src/stat_cache.c index dfbf7fa..b2b1bb6 100644 --- a/src/stat_cache.c +++ 
b/src/stat_cache.c @@ -24,7 +24,6 @@ void stat_cache_new(worker *wrk, gdouble ttl) { sc->job_queue_out = g_async_queue_new(); waitqueue_init(&sc->delete_queue, wrk->loop, stat_cache_delete_cb, ttl, sc); - ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ ev_init(&sc->job_watcher, stat_cache_job_cb); sc->job_watcher.data = wrk; diff --git a/src/throttle.c b/src/throttle.c new file mode 100644 index 0000000..fcd1e8a --- /dev/null +++ b/src/throttle.c @@ -0,0 +1,138 @@ +/* + * Implemented with token bucket algorithm. + * On average, the rate of bytes/s never exceeds the specified limit + * but allows small bursts of previously unused bandwidth (max rate*2 for 1 second). + */ + +#include + + +throttle_pool_t *throttle_pool_new(server *srv, GString *name, guint rate) { + throttle_pool_t *pool; + guint i; + + pool = g_slice_new0(throttle_pool_t); + pool->rate = rate; + pool->magazine = rate * THROTTLE_GRANULARITY; + pool->name = name; + + pool->queues = g_new0(GQueue*, srv->worker_count * 2);; + for (i = 0; i < (srv->worker_count*2); i+=2) { + pool->queues[i] = g_queue_new(); + pool->queues[i+1] = g_queue_new(); + } + + pool->current_queue = g_new0(guint, srv->worker_count); + + pool->last_pool_rearm = ev_time(); + pool->last_con_rearm = g_new0(ev_tstamp, srv->worker_count); + for (i = 0; i < srv->worker_count; i++) { + pool->last_con_rearm[i] = pool->last_pool_rearm; + } + + return pool; +} + +void throttle_pool_free(server *srv, throttle_pool_t *pool) { + guint i; + + for (i = 0; i < (srv->workers->len*2); i+=2) { + g_queue_free(pool->queues[i]); + g_queue_free(pool->queues[i+1]); + } + + g_free(pool->current_queue); + g_free(pool->last_con_rearm); + + g_string_free(pool->name, TRUE); + + g_slice_free(throttle_pool_t, pool); +} + + +void throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) { + waitqueue_elem *wqe; + throttle_pool_t *pool; + connection *con; + worker *wrk; + ev_tstamp now; + guint magazine, supply; + GQueue *queue; + GList 
*lnk, *lnk_next; + + UNUSED(revents); + + wrk = w->data; + now = ev_now(loop); + + while (NULL != (wqe = waitqueue_pop(&wrk->throttle_queue))) { + con = wqe->data; + + if (con->throttle.pool.ptr) { + /* throttled by pool */ + pool = con->throttle.pool.ptr; + + /* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */ + if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) { + if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) { + if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4)) + g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY); + + pool->last_pool_rearm = now; + } + + g_atomic_int_set(&pool->rearming, 0); + } + + /* select current queue */ + queue = pool->queues[wrk->ndx*2+pool->current_queue[wrk->ndx]]; + + if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) { + /* switch current queue by xoring with 1 */ + pool->current_queue[wrk->ndx] ^= 1; + + if (queue->length) { + do { + magazine = g_atomic_int_get(&pool->magazine); + supply = magazine / g_atomic_int_get(&pool->num_cons); + } while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length))); + + g_atomic_int_add(&(pool->num_cons), - queue->length); + + /* rearm connections */ + for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) { + ((connection*)lnk->data)->throttle.pool.magazine += supply; + ((connection*)lnk->data)->throttle.pool.queued = FALSE; + lnk_next = lnk->next; + lnk->next = NULL; + lnk->prev = NULL; + } + + /* clear current connection queue */ + g_queue_init(queue); + } + + pool->last_con_rearm[wrk->ndx] = now; + } + + if (con->throttle.con.rate) { + supply = MIN(con->throttle.pool.magazine, con->throttle.con.rate * THROTTLE_GRANULARITY); + con->throttle.con.magazine += supply; + con->throttle.pool.magazine -= supply; + } else { + con->throttle.con.magazine += con->throttle.pool.magazine; + con->throttle.pool.magazine = 0; + } 
+ } else if (con->throttle.ip.ptr) { + /* throttled by ip */ + } else { + /* throttled by connection */ + if (con->throttle.con.magazine <= con->throttle.con.rate * THROTTLE_GRANULARITY * 4) + con->throttle.con.magazine += con->throttle.con.rate * THROTTLE_GRANULARITY; + } + + ev_io_add_events(loop, &con->sock_watcher, EV_WRITE); + } + + waitqueue_update(&wrk->throttle_queue); +} \ No newline at end of file diff --git a/src/waitqueue.c b/src/waitqueue.c index f2ff62b..dfb9aca 100644 --- a/src/waitqueue.c +++ b/src/waitqueue.c @@ -3,7 +3,6 @@ void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data) { ev_timer_init(&queue->timer, callback, delay, delay); - ev_timer_start(loop, &queue->timer); queue->timer.data = data; queue->head = queue->tail = NULL; @@ -17,19 +16,20 @@ void waitqueue_stop(waitqueue *queue) { void waitqueue_update(waitqueue *queue) { ev_tstamp repeat; + ev_tstamp now = ev_now(queue->loop); if (queue->head) { - repeat = queue->head->ts + queue->delay - ev_now(queue->loop); - if (repeat < 0.01) - repeat = 0.01; - } else { - repeat = queue->delay; - } + repeat = queue->head->ts + queue->delay - now; + + if (repeat < 0.05) + repeat = 0.05; - if (queue->timer.repeat != repeat) - { queue->timer.repeat = repeat; ev_timer_again(queue->loop, &queue->timer); + } else { + /* stop timer if queue empty */ + ev_timer_stop(queue->loop, &queue->timer); + return; } } @@ -69,6 +69,9 @@ void waitqueue_push(waitqueue *queue, waitqueue_elem *elem) { queue->tail->next = elem; queue->tail = elem; } + + if (!ev_is_active(&queue->timer)) + ev_timer_start(queue->loop, &queue->timer); } waitqueue_elem *waitqueue_pop(waitqueue *queue) { @@ -121,3 +124,32 @@ guint waitqueue_length(waitqueue *queue) { return i; } + +guint waitqueue_pop_ready(waitqueue *queue, waitqueue_elem **head) { + guint i = 0; + waitqueue_elem *elem = queue->head; + ev_tstamp now = ev_now(queue->loop); + + *head = elem; + + while (elem != NULL) { + if 
((elem->ts + queue->delay) > now) { + queue->head = elem; + + if (elem->prev) { + elem->prev->next = NULL; + } + + return i; + } + + elem->queued = FALSE; + elem = elem->next; + i++; + } + + queue->head = NULL; + queue->tail = NULL; + + return i; +} diff --git a/src/worker.c b/src/worker.c index a1f75e9..c6c83b9 100644 --- a/src/worker.c +++ b/src/worker.c @@ -118,23 +118,6 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) waitqueue_update(&wrk->io_timeout_queue); } -/* check for throttled connections */ -static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) { - worker *wrk = (worker*) w->data; - connection *con; - waitqueue_elem *wqe; - - UNUSED(revents); - - while ((wqe = waitqueue_pop(&wrk->throttle_queue)) != NULL) { - /* connection waited long enough to reenable sending of data again */ - con = wqe->data; - ev_io_add_events(loop, &con->sock_watcher, EV_WRITE); - } - - waitqueue_update(&wrk->throttle_queue); -} - /* run vreqest state machine */ static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) { worker *wrk = (worker*) w->data; @@ -346,11 +329,9 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) { /* io timeout timer */ waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk); - ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ /* throttling */ - waitqueue_init(&wrk->throttle_queue, wrk->loop, worker_throttle_cb, 0.5, wrk); - ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ + waitqueue_init(&wrk->throttle_queue, wrk->loop, throttle_cb, THROTTLE_GRANULARITY, wrk); /* job queue */ g_queue_init(&wrk->job_queue); @@ -471,6 +452,9 @@ void worker_stop(worker *context, worker *wrk) { ev_async_stop(wrk->loop, &wrk->worker_stop_watcher); ev_async_stop(wrk->loop, &wrk->new_con_watcher); + waitqueue_stop(&wrk->io_timeout_queue); + waitqueue_stop(&wrk->throttle_queue); + ev_timer_stop(wrk->loop, 
&wrk->throttle_timer); worker_new_con_cb(wrk->loop, &wrk->new_con_watcher, 0); /* handle remaining new connections */ /* close keep alive connections */ diff --git a/src/wscript b/src/wscript index 849925b..9ad0862 100644 --- a/src/wscript +++ b/src/wscript @@ -48,6 +48,7 @@ common_src = ''' stat_cache.c sys-files.c sys-socket.c + throttle.c url_parser.rl utils.c value.c