- introduce throttling via pools, rework throttling by connection

- new actions: 'throttle_pool', 'throttle_connection' and 'status'
- don't start waitqueue timer in waitqueue_init(); start/stop timer on demand
- new parameter for network_write(): write_max
- move stats and timeout handling from network_write/read() to connection_cb()
personal/stbuehler/wip
Thomas Porzelt 14 years ago
parent 595ce1fe13
commit 383fa6a4f5
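For illustration, a minimal config sketch using the three new actions (action names are taken from the plugin_core action table below; the exact lighttpd2 config syntax here is an assumption, not part of the patch):

throttle_pool ("downloads", 1048576); # shared pool: 1 MiB/s split across all connections in it
throttle_connection 65536;            # additionally cap each single connection at 64 KiB/s
status 404;                           # set the response status code directly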

@@ -45,6 +45,7 @@
#include <lighttpd/virtualrequest.h>
#include <lighttpd/log.h>
#include <lighttpd/stat_cache.h>
#include <lighttpd/throttle.h>
#include <lighttpd/connection.h>

@@ -58,22 +58,38 @@ struct connection {
waitqueue_elem io_timeout_elem;
/* I/O throttling */
gboolean throttled; /* TRUE if connection is throttled */
struct {
waitqueue_elem queue_elem;
guint magazine;
ev_tstamp ts;
struct {
throttle_pool_t *ptr; /* NULL if not in any throttling pool */
GList lnk;
gboolean queued;
gint magazine;
} pool;
struct {
throttle_pool_t *ptr; /* pool for per-ip throttling, NULL if not limited by ip */
GList lnk;
gboolean queued;
gint magazine;
} ip;
struct {
guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
gint magazine;
ev_tstamp last_update;
} con;
waitqueue_elem wqueue_elem;
} throttle;
ev_tstamp ts;
struct {
guint64 bytes_in;
guint64 bytes_out;
guint64 bytes_in; /* total number of bytes received */
guint64 bytes_out; /* total number of bytes sent */
ev_tstamp last_avg;
guint64 bytes_in_5s;
guint64 bytes_out_5s;
guint64 bytes_in_5s_diff;
guint64 bytes_out_5s_diff;
guint64 bytes_in_5s; /* total number of bytes received at last 5s interval */
guint64 bytes_out_5s; /* total number of bytes sent at last 5s interval */
guint64 bytes_in_5s_diff; /* diff between bytes received at 5s interval n and interval n-1 */
guint64 bytes_out_5s_diff; /* diff between bytes sent at 5s interval n and interval n-1 */
} stats;
};

@@ -24,7 +24,7 @@ LI_API ssize_t net_write(int fd, void *buf, ssize_t nbyte);
/** repeats read after EINTR */
LI_API ssize_t net_read(int fd, void *buf, ssize_t nbyte);
LI_API network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq);
LI_API network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq, goffset write_max);
LI_API network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq);
/* use writev for mem chunks, buffered read/write for files */

@@ -76,6 +76,8 @@ struct server {
gdouble io_timeout;
GArray *throttle_pools;
gdouble stat_cache_ttl;
};

@@ -0,0 +1,26 @@
#ifndef _LIGHTTPD_THROTTLE_H_
#define _LIGHTTPD_THROTTLE_H_
#define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */
struct throttle_pool_t {
GString *name;
guint rate; /** bytes/s */
gint magazine;
GQueue** queues; /** worker specific queues. each worker has 2 */
guint* current_queue;
guint num_cons;
gint rearming;
ev_tstamp last_pool_rearm;
ev_tstamp *last_con_rearm;
};
typedef struct throttle_pool_t throttle_pool_t;
void throttle_cb(struct ev_loop *loop, ev_timer *w, int revents);
throttle_pool_t *throttle_pool_new(server *srv, GString *name, guint rate);
void throttle_pool_free(server *srv, throttle_pool_t *pool);
#endif
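The magazine fields above form token buckets: every THROTTLE_GRANULARITY seconds a bucket is refilled with rate * THROTTLE_GRANULARITY bytes, capped at four refill intervals so unused bandwidth allows short bursts, and every write drains it. A standalone C sketch of that arithmetic (names like bucket_rearm are illustrative, not part of the patch):

/* standalone token-bucket sketch, mirroring the rearm logic in throttle.c below */
#define GRANULARITY 0.2 /* seconds between refills */

typedef struct {
	unsigned rate;     /* bytes/s, 0 = unlimited */
	int magazine;      /* bytes currently available */
	double last_rearm; /* timestamp of the last refill */
} bucket;

static void bucket_rearm(bucket *b, double now) {
	if ((now - b->last_rearm) < GRANULARITY) return;
	/* cap at 4 intervals: previously unused bandwidth may burst, but boundedly */
	if (b->magazine <= (int)(b->rate * GRANULARITY * 4))
		b->magazine += (int)(b->rate * GRANULARITY);
	b->last_rearm = now;
}

static int bucket_budget(const bucket *b) {
	return b->magazine > 0 ? b->magazine : 0; /* bytes we may write right now */
}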

@@ -42,6 +42,9 @@ LI_API void waitqueue_push(waitqueue *queue, waitqueue_elem *elem);
/* pops the first ready! element from the queue or NULL if none ready yet. this should be called in your callback */
LI_API waitqueue_elem *waitqueue_pop(waitqueue *queue);
/* pops all elements from the queue that are ready, or NULL if none are ready yet. returns the number of elements pop()ed and saves the old head in '*head' */
LI_API guint waitqueue_pop_ready(waitqueue *queue, waitqueue_elem **head);
/* removes an element from the queue */
LI_API void waitqueue_remove(waitqueue *queue, waitqueue_elem *elem);

@@ -298,6 +298,7 @@ SET(COMMON_SRC
stat_cache.c
sys-files.c
sys-socket.c
throttle.c
url_parser.c
utils.c
value.c

@@ -192,6 +192,7 @@ static gboolean connection_handle_read(connection *con) {
con->expect_100_cont = FALSE;
ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
}
con->state = CON_STATE_HANDLE_MAINVR;
action_enter(con->mainvr, con->srv->mainaction);
vrequest_handle_request_headers(con->mainvr);
@@ -204,6 +205,9 @@
}
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
network_status_t res;
goffset write_max;
goffset transferred;
connection *con = (connection*) w->data;
if (revents & EV_READ) {
@@ -211,7 +215,15 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
/* don't read the next request before current one is done */
ev_io_rem_events(loop, w, EV_READ);
} else {
switch (network_read(con->mainvr, w->fd, con->raw_in)) {
transferred = con->raw_in->length;
res = network_read(con->mainvr, w->fd, con->raw_in);
transferred = con->raw_in->length - transferred;
con->wrk->stats.bytes_in += transferred;
con->stats.bytes_in += transferred;
switch (res) {
case NETWORK_STATUS_SUCCESS:
if (!connection_handle_read(con)) return;
break;
@@ -236,24 +248,67 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (revents & EV_WRITE) {
if (con->raw_out->length > 0) {
switch (network_write(con->mainvr, w->fd, con->raw_out)) {
case NETWORK_STATUS_SUCCESS:
vrequest_joblist_append(con->mainvr);
break;
case NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network write fatal error");
connection_error(con);
return;
case NETWORK_STATUS_CONNECTION_CLOSE:
connection_close(con);
return;
case NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
ev_io_rem_events(loop, w, EV_WRITE);
_ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio");
/* TODO: aio */
break;
if (con->throttled) {
write_max = MIN(con->throttle.con.magazine, 256*1024);
} else {
write_max = 256*1024; /* 256kB */
}
if (write_max > 0) {
transferred = con->raw_out->length;
res = network_write(con->mainvr, w->fd, con->raw_out, write_max);
transferred = transferred - con->raw_out->length;
con->wrk->stats.bytes_out += transferred;
con->stats.bytes_out += transferred;
switch (res) {
case NETWORK_STATUS_SUCCESS:
vrequest_joblist_append(con->mainvr);
break;
case NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network write fatal error");
connection_error(con);
return;
case NETWORK_STATUS_CONNECTION_CLOSE:
connection_close(con);
return;
case NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
ev_io_rem_events(loop, w, EV_WRITE);
_ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio");
/* TODO: aio */
break;
}
} else {
transferred = 0;
}
if ((ev_now(loop) - con->stats.last_avg) >= 5.0) {
con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s;
con->stats.bytes_out_5s = con->stats.bytes_out;
con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s;
con->stats.bytes_in_5s = con->stats.bytes_in;
con->stats.last_avg = ev_now(loop);
}
if (con->throttled) {
con->throttle.con.magazine -= transferred;
/*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con,
transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/
if (con->throttle.con.magazine <= 0) {
ev_io_rem_events(loop, w, EV_WRITE);
waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
}
if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queued) {
throttle_pool_t *pool = con->throttle.pool.ptr;
g_atomic_int_inc(&pool->num_cons);
g_queue_push_tail_link(pool->queues[con->wrk->ndx*2+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
con->throttle.pool.queued = TRUE;
}
}
} else {
_DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue");
@@ -261,6 +316,9 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
}
}
if ((con->io_timeout_elem.ts + 1.0) < ev_now(loop))
waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
check_response_done(con);
}
@@ -350,7 +408,10 @@ connection* connection_new(worker *wrk) {
con->keep_alive_data.watcher.data = con;
con->io_timeout_elem.data = con;
con->throttle.queue_elem.data = con;
con->throttle.wqueue_elem.data = con;
con->throttle.pool.lnk.data = con;
con->throttle.ip.lnk.data = con;
return con;
}
@@ -406,8 +467,43 @@ void connection_reset(connection *con) {
/* remove from timeout queue */
waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
/* remove from throttle queue */
waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.queue_elem);
waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
if (con->throttle.pool.ptr) {
if (con->throttle.pool.queued) {
throttle_pool_t *pool = con->throttle.pool.ptr;
g_queue_unlink(pool->queues[con->wrk->ndx*2+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1);
con->throttle.pool.queued = FALSE;
}
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine);
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine);
con->throttle.pool.magazine = 0;
con->throttle.ip.magazine = 0;
con->throttle.con.magazine = 0;
con->throttle.pool.ptr = NULL;
}
if (con->throttle.ip.ptr) {
if (con->throttle.ip.queued) {
throttle_pool_t *pool = con->throttle.ip.ptr;
g_queue_unlink(pool->queues[con->wrk->ndx*2+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk);
g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1);
con->throttle.ip.queued = FALSE;
}
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine);
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine);
con->throttle.ip.ptr = NULL;
}
con->throttle.con.rate = 0;
con->throttle.pool.magazine = 0;
con->throttle.ip.magazine = 0;
con->throttle.con.magazine = 0;
con->throttled = FALSE;
}
void server_check_keepalive(server *srv);
@@ -460,7 +556,41 @@ void connection_reset_keep_alive(connection *con) {
/* remove from timeout queue */
waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
/* remove from throttle queue */
waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.queue_elem);
waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
if (con->throttle.pool.ptr) {
if (con->throttle.pool.queued) {
throttle_pool_t *pool = con->throttle.pool.ptr;
g_queue_unlink(pool->queues[con->wrk->ndx*2+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1);
con->throttle.pool.queued = FALSE;
}
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine);
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine);
con->throttle.pool.magazine = 0;
con->throttle.ip.magazine = 0;
con->throttle.con.magazine = 0;
con->throttle.pool.ptr = NULL;
}
if (con->throttle.ip.ptr) {
if (con->throttle.ip.queued) {
throttle_pool_t *pool = con->throttle.ip.ptr;
g_queue_unlink(pool->queues[con->wrk->ndx*2+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk);
g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1);
con->throttle.ip.queued = FALSE;
}
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine);
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine);
con->throttle.ip.ptr = NULL;
}
con->throttle.con.rate = 0;
con->throttle.pool.magazine = 0;
con->throttle.ip.magazine = 0;
con->throttle.con.magazine = 0;
con->throttled = FALSE;
}
void connection_free(connection *con) {

@@ -36,6 +36,7 @@
* xyz
*
* Todo:
* - make output generation "async", give up control every N entries
* - filters for entries (pattern, regex)
* - include-* parameters
* - javascript for sorting

@@ -596,7 +596,7 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (fcon->fd != -1 && (revents & EV_WRITE)) {
if (fcon->fcgi_out->length > 0) {
switch (network_write(fcon->vr, w->fd, fcon->fcgi_out)) {
switch (network_write(fcon->vr, w->fd, fcon->fcgi_out, 256*1024)) {
case NETWORK_STATUS_SUCCESS:
break;
case NETWORK_STATUS_FATAL_ERROR:

@@ -36,26 +36,12 @@ ssize_t net_read(int fd, void *buf, ssize_t nbyte) {
return r;
}
network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq, goffset write_max) {
network_status_t res;
ev_tstamp ts, now = CUR_TS(vr->wrk);
worker *wrk;
#ifdef TCP_CORK
int corked = 0;
#endif
goffset write_max = 256*1024, write_bytes, wrote; /* 256 kb */
if (CORE_OPTION(CORE_OPTION_THROTTLE).number) {
/* throttling is enabled */
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->wrk->throttle_queue.delay)) {
vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts);
if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number)
vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number;
vr->con->throttle.ts = now;
/*g_print("throttle magazine: %u kbytes\n", vr->con->throttle.magazine / 1024);*/
}
write_max = vr->con->throttle.magazine;
}
goffset write_bytes, wrote;
#ifdef TCP_CORK
/* Linux: put a cork into the socket as we want to combine the write() calls
@@ -84,33 +70,6 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
}
#endif
/* stats */
wrk = vr->wrk;
wrk->stats.bytes_out += wrote;
vr->con->stats.bytes_out += wrote;
/* update 5s stats */
ts = CUR_TS(wrk);
if ((ts - vr->con->stats.last_avg) >= 5.0) {
vr->con->stats.bytes_out_5s_diff = vr->wrk->stats.bytes_out - vr->wrk->stats.bytes_out_5s;
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
vr->con->stats.last_avg = ts;
}
/* only update once a second, the cast is to round the timestamp */
if ((vr->con->io_timeout_elem.ts + 1.) < now)
waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
vr->con->throttle.magazine = write_bytes;
/* check if throttle magazine is empty */
if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_bytes == 0) {
/* remove EV_WRITE from sockwatcher for now */
ev_io_rem_events(vr->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
waitqueue_push(&vr->wrk->throttle_queue, &vr->con->throttle.queue_elem);
return NETWORK_STATUS_WAIT_FOR_AIO_EVENT;
}
return res;
}
@@ -119,8 +78,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
off_t max_read = 16 * blocksize; /* 256k */
ssize_t r;
off_t len = 0;
worker *wrk = vr->wrk;
ev_tstamp now = CUR_TS(wrk);
if (cq->limit && cq->limit->limit > 0) {
if (max_read > cq->limit->limit - cq->limit->current) {
@@ -132,10 +89,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
}
}
/* only update once a second */
if ((vr->con->io_timeout_elem.ts + 1.) < now)
waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
do {
GByteArray *buf = g_byte_array_sized_new(blocksize);
g_byte_array_set_size(buf, blocksize);
@@ -160,19 +113,6 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
g_byte_array_set_size(buf, r);
chunkqueue_append_bytearr(cq, buf);
len += r;
/* stats */
wrk = vr->wrk;
wrk->stats.bytes_in += r;
vr->con->stats.bytes_in += r;
/* update 5s stats */
if ((now - vr->con->stats.last_avg) >= 5.0) {
vr->con->stats.bytes_in_5s_diff = vr->con->stats.bytes_in - vr->con->stats.bytes_in_5s;
vr->con->stats.bytes_in_5s = vr->con->stats.bytes_in;
vr->con->stats.last_avg = now;
}
} while (r == blocksize && len < max_read);
return NETWORK_STATUS_SUCCESS;

@@ -292,6 +292,32 @@ static action* core_static(server *srv, plugin* p, value *val) {
return action_new_function(core_handle_static, NULL, NULL, NULL);
}
static handler_t core_handle_status(vrequest *vr, gpointer param, gpointer *context) {
UNUSED(context);
vr->response.http_status = GPOINTER_TO_INT(param);
return HANDLER_GO_ON;
}
static action* core_status(server *srv, plugin* p, value *val) {
gpointer ptr;
UNUSED(p);
if (!val || val->type != VALUE_NUMBER) {
ERROR(srv, "%s", "status action expects a number as parameter");
return NULL;
}
ptr = GINT_TO_POINTER((gint) value_extract(val).number);
return action_new_function(core_handle_status, NULL, NULL, ptr);
}
static void core_log_write_free(server *srv, gpointer param) {
UNUSED(srv);
g_string_free(param, TRUE);
@@ -957,7 +983,7 @@ static action* core_buffer_out(server *srv, plugin* p, value *val) {
UNUSED(p);
if (val->type != VALUE_NUMBER) {
ERROR(srv, "'core_buffer_out' action expects an integer as parameter, %s given", value_type_string(val->type));
ERROR(srv, "'buffer.out' action expects an integer as parameter, %s given", value_type_string(val->type));
return NULL;
}
@@ -990,7 +1016,7 @@ static action* core_buffer_in(server *srv, plugin* p, value *val) {
UNUSED(p);
if (val->type != VALUE_NUMBER) {
ERROR(srv, "'core_buffer_in' action expects an integer as parameter, %s given", value_type_string(val->type));
ERROR(srv, "'buffer.in' action expects an integer as parameter, %s given", value_type_string(val->type));
return NULL;
}
@@ -1007,6 +1033,154 @@ static action* core_buffer_in(server *srv, plugin* p, value *val) {
return action_new_function(core_handle_buffer_in, NULL, NULL, GINT_TO_POINTER((gint) limit));
}
static handler_t core_handle_throttle_pool(vrequest *vr, gpointer param, gpointer *context) {
throttle_pool_t *pool = param;
gint magazine;
UNUSED(context);
if (vr->con->throttle.pool.ptr != pool) {
if (vr->con->throttle.pool.ptr) {
/* connection has been in a different pool, give back bandwidth */
g_atomic_int_add(&vr->con->throttle.pool.ptr->magazine, vr->con->throttle.pool.magazine);
vr->con->throttle.pool.magazine = 0;
if (vr->con->throttle.pool.queued) {
throttle_pool_t *p = vr->con->throttle.pool.ptr;
g_queue_unlink(p->queues[vr->con->wrk->ndx*2+p->current_queue[vr->con->wrk->ndx]], &vr->con->throttle.pool.lnk);
g_atomic_int_add(&p->num_cons, -1);
vr->con->throttle.pool.queued = FALSE;
}
}
/* try to steal some initial 4kbytes from the pool */
while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) {
if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) {
vr->con->throttle.pool.magazine = 4*1024;
break;
}
}
}
vr->con->throttle.pool.ptr = pool;
vr->con->throttled = TRUE;
return HANDLER_GO_ON;
}
static action* core_throttle_pool(server *srv, plugin* p, value *val) {
GString *name;
guint i;
throttle_pool_t *pool = NULL;
gint64 rate;
UNUSED(p);
if (val->type != VALUE_STRING && val->type != VALUE_LIST) {
ERROR(srv, "'throttle_pool' action expects a string or a string-number tuple as parameter, %s given", value_type_string(val->type));
return NULL;
}
if (val->type == VALUE_LIST) {
if (val->data.list->len != 2
|| g_array_index(val->data.list, value*, 0)->type != VALUE_STRING
|| g_array_index(val->data.list, value*, 1)->type != VALUE_NUMBER) {
ERROR(srv, "%s", "'throttle_pool' action expects a string or a string-number tuple as parameter");
return NULL;
}
name = g_array_index(val->data.list, value*, 0)->data.string;
rate = g_array_index(val->data.list, value*, 1)->data.number;
if (rate && rate < (32*1024)) {
ERROR(srv, "throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate);
return NULL;
}
if (rate > (0xFFFFFFFF)) {
ERROR(srv, "throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
} else {
name = val->data.string;
rate = 0;
}
for (i = 0; i < srv->throttle_pools->len; i++) {
if (g_string_equal(g_array_index(srv->throttle_pools, throttle_pool_t*, i)->name, name)) {
/* pool already defined */
if (val->type == VALUE_LIST && g_array_index(srv->throttle_pools, throttle_pool_t*, i)->rate != (guint)rate) {
ERROR(srv, "throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", name->str,
g_array_index(srv->throttle_pools, throttle_pool_t*, i)->rate);
return NULL;
}
pool = g_array_index(srv->throttle_pools, throttle_pool_t*, i);
break;
}
}
if (!pool) {
/* pool not yet defined */
if (val->type == VALUE_STRING) {
ERROR(srv, "throttle_pool: rate for pool '%s' hasn't been defined", name->str);
return NULL;
}
pool = throttle_pool_new(srv, value_extract(g_array_index(val->data.list, value*, 0)).string, (guint)rate);
g_array_append_val(srv->throttle_pools, pool);
}
return action_new_function(core_handle_throttle_pool, NULL, NULL, pool);
}
static handler_t core_handle_throttle_connection(vrequest *vr, gpointer param, gpointer *context) {
gint supply;
connection *con = vr->con;
guint rate = GPOINTER_TO_UINT(param);
UNUSED(context);
con->throttle.con.rate = rate;
con->throttled = TRUE;
if (con->throttle.pool.magazine) {
supply = MIN(con->throttle.pool.magazine, rate * THROTTLE_GRANULARITY);
con->throttle.con.magazine += supply;
con->throttle.pool.magazine -= supply;
}
return HANDLER_GO_ON;
}
static action* core_throttle_connection(server *srv, plugin* p, value *val) {
gint64 rate;
UNUSED(p);
if (val->type != VALUE_NUMBER) {
ERROR(srv, "'throttle_connection' action expects a positiv integer as parameter, %s given", value_type_string(val->type));
return NULL;
}
rate = val->data.number;
if (rate < 0) {
rate = 0; /* no limit */
}
if (rate && rate < (32*1024)) {
ERROR(srv, "throttle_connection: rate %"G_GUINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate);
return NULL;
}
if (rate > (0xFFFFFFFF)) {
ERROR(srv, "throttle_connection: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
return action_new_function(core_handle_throttle_connection, NULL, NULL, GUINT_TO_POINTER((guint) rate));
}
static const plugin_option options[] = {
{ "debug.log_request_handling", VALUE_BOOLEAN, GINT_TO_POINTER(FALSE), NULL, NULL },
@@ -1038,6 +1212,8 @@ static const plugin_action actions[] = {
{ "docroot", core_docroot },
{ "static", core_static },
{ "status", core_status },
{ "log.write", core_log_write },
{ "blank", core_blank },
@@ -1054,6 +1230,10 @@ static const plugin_action actions[] = {
{ "buffer.out", core_buffer_out },
{ "buffer.in", core_buffer_in },
{ "throttle_pool", core_throttle_pool },
/*{ "throttle.ip", core_throttle_ip },*/
{ "throttle_connection", core_throttle_connection },
{ NULL, NULL }
};
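core_handle_throttle_pool() above grabs an initial share from the shared pool magazine with a lock-free compare-and-swap loop, so workers never block each other on a mutex. A reduced sketch of that pattern using GLib atomics (the helper name magazine_steal is illustrative, not part of the patch):

#include <glib.h>

/* atomically take 'want' bytes out of a shared magazine;
 * retry when another worker modified it in between */
static gboolean magazine_steal(gint *magazine, gint want) {
	gint avail;
	while ((avail = g_atomic_int_get(magazine)) > want) {
		if (g_atomic_int_compare_and_exchange(magazine, avail, avail - want))
			return TRUE; /* we own 'want' bytes now */
	}
	return FALSE; /* not enough bytes available right now */
}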

@@ -87,6 +87,7 @@ server* server_new(const gchar *module_dir) {
srv->state = SERVER_STARTING;
srv->workers = g_array_new(FALSE, TRUE, sizeof(worker*));
srv->worker_count = 1;
srv->sockets = g_ptr_array_new();
@@ -109,6 +110,8 @@ server* server_new(const gchar *module_dir) {
/* error log ts format */
server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT"));
srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(throttle_pool_t*));
log_init(srv);
srv->io_timeout = 300; /* default I/O timeout */
@@ -135,6 +138,15 @@ void server_free(server* srv) {
action_release(srv, srv->mainaction);
/* free throttle pools */
{
guint i;
for (i = 0; i < srv->throttle_pools->len; i++) {
throttle_pool_free(srv, g_array_index(srv->throttle_pools, throttle_pool_t*, i));
}
g_array_free(srv->throttle_pools, TRUE);
}
/* free all workers */
{
guint i;

@@ -24,7 +24,6 @@ void stat_cache_new(worker *wrk, gdouble ttl) {
sc->job_queue_out = g_async_queue_new();
waitqueue_init(&sc->delete_queue, wrk->loop, stat_cache_delete_cb, ttl, sc);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&sc->job_watcher, stat_cache_job_cb);
sc->job_watcher.data = wrk;

@@ -0,0 +1,138 @@
/*
* Implemented with token bucket algorithm.
* On average, the rate of bytes/s never exceeds the specified limit
* but allows small bursts of previously unused bandwidth (max rate*2 for 1 second).
*/
#include <lighttpd/base.h>
throttle_pool_t *throttle_pool_new(server *srv, GString *name, guint rate) {
throttle_pool_t *pool;
guint i;
pool = g_slice_new0(throttle_pool_t);
pool->rate = rate;
pool->magazine = rate * THROTTLE_GRANULARITY;
pool->name = name;
pool->queues = g_new0(GQueue*, srv->worker_count * 2);
for (i = 0; i < (srv->worker_count*2); i+=2) {
pool->queues[i] = g_queue_new();
pool->queues[i+1] = g_queue_new();
}
pool->current_queue = g_new0(guint, srv->worker_count);
pool->last_pool_rearm = ev_time();
pool->last_con_rearm = g_new0(ev_tstamp, srv->worker_count);
for (i = 0; i < srv->worker_count; i++) {
pool->last_con_rearm[i] = pool->last_pool_rearm;
}
return pool;
}
void throttle_pool_free(server *srv, throttle_pool_t *pool) {
guint i;
for (i = 0; i < (srv->workers->len*2); i+=2) {
g_queue_free(pool->queues[i]);
g_queue_free(pool->queues[i+1]);
}
g_free(pool->current_queue);
g_free(pool->last_con_rearm);
g_string_free(pool->name, TRUE);
g_slice_free(throttle_pool_t, pool);
}
void throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
waitqueue_elem *wqe;
throttle_pool_t *pool;
connection *con;
worker *wrk;
ev_tstamp now;
guint magazine, supply;
GQueue *queue;
GList *lnk, *lnk_next;
UNUSED(revents);
wrk = w->data;
now = ev_now(loop);
while (NULL != (wqe = waitqueue_pop(&wrk->throttle_queue))) {
con = wqe->data;
if (con->throttle.pool.ptr) {
/* throttled by pool */
pool = con->throttle.pool.ptr;
/* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) {
if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4))
g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY);
pool->last_pool_rearm = now;
}
g_atomic_int_set(&pool->rearming, 0);
}
/* select current queue */
queue = pool->queues[wrk->ndx*2+pool->current_queue[wrk->ndx]];
if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) {
/* switch current queue by xoring with 1 */
pool->current_queue[wrk->ndx] ^= 1;
if (queue->length) {
do {
magazine = g_atomic_int_get(&pool->magazine);
supply = magazine / g_atomic_int_get(&pool->num_cons);
} while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length)));
g_atomic_int_add(&(pool->num_cons), - queue->length);
/* rearm connections */
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
((connection*)lnk->data)->throttle.pool.magazine += supply;
((connection*)lnk->data)->throttle.pool.queued = FALSE;
lnk_next = lnk->next;
lnk->next = NULL;
lnk->prev = NULL;
}
/* clear current connection queue */
g_queue_init(queue);
}
pool->last_con_rearm[wrk->ndx] = now;
}
if (con->throttle.con.rate) {
supply = MIN(con->throttle.pool.magazine, con->throttle.con.rate * THROTTLE_GRANULARITY);
con->throttle.con.magazine += supply;
con->throttle.pool.magazine -= supply;
} else {
con->throttle.con.magazine += con->throttle.pool.magazine;
con->throttle.pool.magazine = 0;
}
} else if (con->throttle.ip.ptr) {
/* throttled by ip */
} else {
/* throttled by connection */
if (con->throttle.con.magazine <= con->throttle.con.rate * THROTTLE_GRANULARITY * 4)
con->throttle.con.magazine += con->throttle.con.rate * THROTTLE_GRANULARITY;
}
ev_io_add_events(loop, &con->sock_watcher, EV_WRITE);
}
waitqueue_update(&wrk->throttle_queue);
}
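throttle_cb() keeps two queues per worker per pool and flips the active one each interval (current_queue ^= 1), so connections re-queued while rearming land in the inactive queue and cannot be rearmed twice in the same tick. A minimal sketch of this double-buffering idea (type and function names are illustrative, not part of the patch):

#include <glib.h>

typedef struct {
	GQueue *queues[2]; /* active/inactive, swapped every interval */
	guint current;
} flip_queue;

static void flip_and_drain(flip_queue *fq) {
	GQueue *active = fq->queues[fq->current];
	fq->current ^= 1; /* new entries now go into the other queue */
	while (!g_queue_is_empty(active)) {
		gpointer con = g_queue_pop_head(active);
		(void)con; /* hand out this connection's magazine share here */
	}
}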

@@ -3,7 +3,6 @@
void waitqueue_init(waitqueue *queue, struct ev_loop *loop, waitqueue_cb callback, gdouble delay, gpointer data) {
ev_timer_init(&queue->timer, callback, delay, delay);
ev_timer_start(loop, &queue->timer);
queue->timer.data = data;
queue->head = queue->tail = NULL;
@@ -17,19 +16,20 @@ void waitqueue_stop(waitqueue *queue) {
void waitqueue_update(waitqueue *queue) {
ev_tstamp repeat;
ev_tstamp now = ev_now(queue->loop);
if (queue->head) {
repeat = queue->head->ts + queue->delay - ev_now(queue->loop);
if (repeat < 0.01)
repeat = 0.01;
} else {
repeat = queue->delay;
}
repeat = queue->head->ts + queue->delay - now;
if (repeat < 0.05)
repeat = 0.05;
if (queue->timer.repeat != repeat)
{
queue->timer.repeat = repeat;
ev_timer_again(queue->loop, &queue->timer);
} else {
/* stop timer if queue empty */
ev_timer_stop(queue->loop, &queue->timer);
return;
}
}
@@ -69,6 +69,9 @@ void waitqueue_push(waitqueue *queue, waitqueue_elem *elem) {
queue->tail->next = elem;
queue->tail = elem;
}
if (!ev_is_active(&queue->timer))
ev_timer_start(queue->loop, &queue->timer);
}
waitqueue_elem *waitqueue_pop(waitqueue *queue) {
@@ -121,3 +124,32 @@ guint waitqueue_length(waitqueue *queue) {
return i;
}
guint waitqueue_pop_ready(waitqueue *queue, waitqueue_elem **head) {
guint i = 0;
waitqueue_elem *elem = queue->head;
ev_tstamp now = ev_now(queue->loop);
*head = elem;
while (elem != NULL) {
if ((elem->ts + queue->delay) > now) {
queue->head = elem;
if (elem->prev) {
elem->prev->next = NULL;
}
return i;
}
elem->queued = FALSE;
elem = elem->next;
i++;
}
queue->head = NULL;
queue->tail = NULL;
return i;
}
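Together with the waitqueue_init()/waitqueue_update() changes above, the timer now runs only while the queue holds elements: waitqueue_push() starts it on demand and waitqueue_update() stops it once the queue drains. A reduced libev sketch of that invariant (the wq type and its fields are illustrative, not part of the patch):

#include <ev.h>
#include <stddef.h>

typedef struct {
	struct ev_loop *loop;
	ev_timer timer;
	void *head; /* first queued element, NULL if empty */
} wq;

static void wq_push(wq *q, void *elem) {
	q->head = elem; /* simplified: the real code appends to a linked list */
	if (!ev_is_active(&q->timer))
		ev_timer_start(q->loop, &q->timer); /* start only when work appears */
}

static void wq_update(wq *q) {
	if (q->head == NULL)
		ev_timer_stop(q->loop, &q->timer); /* an empty queue keeps no timer alive */
}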

@@ -118,23 +118,6 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents)
waitqueue_update(&wrk->io_timeout_queue);
}
/* check for throttled connections */
static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
connection *con;
waitqueue_elem *wqe;
UNUSED(revents);
while ((wqe = waitqueue_pop(&wrk->throttle_queue)) != NULL) {
/* connection waited long enough to reenable sending of data again */
con = wqe->data;
ev_io_add_events(loop, &con->sock_watcher, EV_WRITE);
}
waitqueue_update(&wrk->throttle_queue);
}
/* run vreqest state machine */
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
@@ -346,11 +329,9 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
/* io timeout timer */
waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
/* throttling */
waitqueue_init(&wrk->throttle_queue, wrk->loop, worker_throttle_cb, 0.5, wrk);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
waitqueue_init(&wrk->throttle_queue, wrk->loop, throttle_cb, THROTTLE_GRANULARITY, wrk);
/* job queue */
g_queue_init(&wrk->job_queue);
@@ -471,6 +452,9 @@ void worker_stop(worker *context, worker *wrk) {
ev_async_stop(wrk->loop, &wrk->worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
waitqueue_stop(&wrk->io_timeout_queue);
waitqueue_stop(&wrk->throttle_queue);
ev_timer_stop(wrk->loop, &wrk->throttle_timer);
worker_new_con_cb(wrk->loop, &wrk->new_con_watcher, 0); /* handle remaining new connections */
/* close keep alive connections */

@@ -48,6 +48,7 @@ common_src = '''
stat_cache.c
sys-files.c
sys-socket.c
throttle.c
url_parser.rl
utils.c
value.c
