[core] Add support for throttling by IP

personal/stbuehler/wip
Thomas Porzelt 13 years ago
parent 8ae6902602
commit 31d2291a1c

@ -133,6 +133,7 @@ struct liServer {
gdouble io_timeout;
GArray *throttle_pools;
liRadixTree *throttle_ip_pools;
gdouble stat_cache_ttl;
gint tasklet_pool_threads;

@ -3,12 +3,21 @@
#define THROTTLE_GRANULARITY 200 /* defines how frequently (in milliseconds) a magazine is refilled */
/* this makro converts a ev_tstamp to a gint. this is needed for atomic access. millisecond precision, can hold a month max */
#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*31))) * 1000))
/* this makro converts a ev_tstamp to a gint. this is needed for atomic access. millisecond precision, can hold two weeks max */
#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*14))) * 1000))
typedef enum {
LI_THROTTLE_POOL_NAME,
LI_THROTTLE_POOL_IP
} liThrottlePoolType;
struct liThrottlePool {
/* global per pool */
GString *name;
liThrottlePoolType type;
union {
GString *name;
liSocketAddress addr;
} data;
gint rate; /* bytes/s */
gint refcount;
gint rearming; /* atomic access, 1 if a worker is currently rearming the magazine */
@ -29,11 +38,11 @@ struct liThrottleParam {
LI_API void li_throttle_reset(liVRequest *vr);
LI_API void li_throttle_cb(liWaitQueue *wq, gpointer data);
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate);
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, liThrottlePoolType type, gpointer param, guint rate);
LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool);
LI_API void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool);
LI_API void li_throttle_pool_release(liVRequest *vr);
LI_API void li_throttle_pool_release(liVRequest *vr, liThrottlePool *pool);
/* update throttle data: notify it that we sent <transferred> bytes, and that we never send more than write_max at once */
LI_API void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max);

@ -134,7 +134,10 @@ struct liVRequest {
gint magazine;
} pool;
struct {
gchar unused; /* this struct is unused for now */
liThrottlePool *ptr;
GList lnk;
GQueue *queue;
gint magazine;
} ip;
struct {
gint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */

@ -1616,6 +1616,13 @@ static liAction* core_buffer_in(liServer *srv, liWorker *wrk, liPlugin* p, liVal
return li_action_new_function(core_handle_buffer_in, NULL, NULL, GINT_TO_POINTER((gint) limit));
}
static void core_throttle_pool_free(liServer *srv, gpointer param) {
UNUSED(srv);
li_throttle_pool_free(srv, param);
}
static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param, gpointer *context) {
liThrottlePool *pool = param;
@ -1665,7 +1672,7 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l
rate = 0;
}
pool = li_throttle_pool_new(srv, name, rate);
pool = li_throttle_pool_new(srv, LI_THROTTLE_POOL_NAME, name, rate);
if (!pool) {
ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str);
@ -1673,11 +1680,49 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l
}
if (rate != pool->rate && rate != 0) {
ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", pool->name->str, pool->rate);
ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", pool->data.name->str, pool->rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_pool, NULL, core_throttle_pool_free, pool);
}
static liHandlerResult core_handle_throttle_ip(liVRequest *vr, gpointer param, gpointer *context) {
liThrottlePool *pool;
gint rate = GPOINTER_TO_INT(param);
UNUSED(context);
pool = li_throttle_pool_new(vr->wrk->srv, LI_THROTTLE_POOL_IP, &vr->coninfo->remote_addr, rate);
li_throttle_pool_acquire(vr, pool);
return LI_HANDLER_GO_ON;
}
static liAction* core_throttle_ip(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
gint64 rate;
UNUSED(wrk); UNUSED(p); UNUSED(userdata);
if (val->type != LI_VALUE_NUMBER) {
ERROR(srv, "'io.throttle_ip' action expects a positiv integer as parameter, %s given", li_value_type_string(val->type));
return NULL;
}
rate = val->data.number;
if (rate < 32*1024) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum)", rate);
return NULL;
}
if (rate > 0xFFFFFFFF) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_pool, NULL, NULL, pool);
return li_action_new_function(core_handle_throttle_ip, NULL, NULL, GINT_TO_POINTER(rate));
}
static void core_throttle_connection_free(liServer *srv, gpointer param) {
@ -1835,7 +1880,7 @@ static const liPluginAction actions[] = {
{ "io.buffer_in", core_buffer_in, NULL },
{ "io.throttle", core_throttle_connection, NULL },
{ "io.throttle_pool", core_throttle_pool, NULL },
/*{ "io.throttle_ip", core_throttle_ip, NULL },*/
{ "io.throttle_ip", core_throttle_ip, NULL },
{ NULL, NULL, NULL }
};

@ -142,6 +142,7 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) {
li_server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT"));
srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(liThrottlePool*));
srv->throttle_ip_pools = li_radixtree_new();
li_log_init(srv);
@ -221,6 +222,7 @@ void li_server_free(liServer* srv) {
li_throttle_pool_free(srv, g_array_index(srv->throttle_pools, liThrottlePool*, i));
}
g_array_free(srv->throttle_pools, TRUE);
li_radixtree_free(srv->throttle_ip_pools, NULL, NULL);
}
if (srv->acon) {

@ -6,20 +6,37 @@
#include <lighttpd/base.h>
liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) {
liThrottlePool *li_throttle_pool_new(liServer *srv, liThrottlePoolType type, gpointer param, guint rate) {
liThrottlePool *pool;
guint i;
g_mutex_lock(srv->action_mutex);
/* check if we already have a pool with that name */
for (i = 0; i < srv->throttle_pools->len; i++) {
pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
if (type == LI_THROTTLE_POOL_NAME) {
/* named pool */
GString *name = param;
/* check if we already have a pool with that name */
for (i = 0; i < srv->throttle_pools->len; i++) {
pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
if (g_string_equal(pool->data.name, name)) {
g_atomic_int_inc(&pool->refcount);
g_mutex_unlock(srv->action_mutex);
g_string_free(name, TRUE);
return pool;
}
}
} else {
/* IP address pool */
liSocketAddress *remote_addr = param;
if (remote_addr->addr->plain.sa_family == AF_INET)
pool = li_radixtree_lookup_exact(srv->throttle_ip_pools, &remote_addr->addr->ipv4.sin_addr.s_addr, 32);
else
pool = li_radixtree_lookup_exact(srv->throttle_ip_pools, &remote_addr->addr->ipv6.sin6_addr.s6_addr, 128);
if (g_string_equal(pool->name, name)) {
g_atomic_int_inc(&pool->refcount);
if (pool) {
g_mutex_unlock(srv->action_mutex);
g_string_free(name, TRUE);
return pool;
}
}
@ -30,10 +47,9 @@ liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) {
}
pool = g_slice_new0(liThrottlePool);
pool->type = type;
pool->rate = rate;
pool->name = name;
pool->last_rearm = THROTTLE_EVTSTAMP_TO_GINT(ev_time());
pool->refcount = 1;
/*
* We if we are not in LI_SERVER_INIT state, we can initialize the queues directly.
@ -51,7 +67,19 @@ liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) {
}
}
g_array_append_val(srv->throttle_pools, pool);
if (type == LI_THROTTLE_POOL_NAME) {
pool->refcount = 1;
pool->data.name = param;
g_array_append_val(srv->throttle_pools, pool);
} else {
liSocketAddress *remote_addr = param;
pool->data.addr = li_sockaddr_dup(*remote_addr);
if (remote_addr->addr->plain.sa_family == AF_INET)
li_radixtree_insert(srv->throttle_ip_pools, &remote_addr->addr->ipv4.sin_addr.s_addr, 32, pool);
else
li_radixtree_insert(srv->throttle_ip_pools, &remote_addr->addr->ipv6.sin6_addr.s6_addr, 128, pool);
}
g_mutex_unlock(srv->action_mutex);
@ -65,11 +93,22 @@ void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
return;
g_mutex_lock(srv->action_mutex);
for (i = 0; i < srv->throttle_pools->len; i++) {
if (pool == g_array_index(srv->throttle_pools, liThrottlePool*, i)) {
g_array_remove_index_fast(srv->throttle_pools, i);
break;
if (pool->type == LI_THROTTLE_POOL_NAME) {
for (i = 0; i < srv->throttle_pools->len; i++) {
if (pool == g_array_index(srv->throttle_pools, liThrottlePool*, i)) {
g_array_remove_index_fast(srv->throttle_pools, i);
break;
}
}
g_string_free(pool->data.name, TRUE);
} else {
if (pool->data.addr.addr->plain.sa_family == AF_INET)
li_radixtree_remove(srv->throttle_ip_pools, &pool->data.addr.addr->ipv4.sin_addr.s_addr, 32);
else
li_radixtree_remove(srv->throttle_ip_pools, &pool->data.addr.addr->ipv6.sin6_addr.s6_addr, 128);
li_sockaddr_clear(&pool->data.addr);
}
g_mutex_unlock(srv->action_mutex);
@ -84,40 +123,52 @@ void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
g_free(pool->worker_queues);
}
g_string_free(pool->name, TRUE);
g_slice_free(liThrottlePool, pool);
}
void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) {
if (vr->throttle.pool.ptr == pool)
/* already in this pool */
if (vr->throttle.pool.ptr == pool || vr->throttle.ip.ptr == pool)
return;
g_atomic_int_inc(&pool->refcount);
if (vr->throttle.pool.ptr != NULL) {
/* already in a different pool */
li_throttle_pool_release(vr);
if (pool->type == LI_THROTTLE_POOL_NAME) {
if (vr->throttle.pool.ptr != NULL) {
/* already in a different pool */
li_throttle_pool_release(vr, vr->throttle.pool.ptr);
}
vr->throttle.pool.ptr = pool;
} else {
vr->throttle.ip.ptr = pool;
}
vr->throttle.pool.ptr = pool;
vr->throttled = TRUE;
}
void li_throttle_pool_release(liVRequest *vr) {
if (vr->throttle.pool.ptr == NULL)
return;
void li_throttle_pool_release(liVRequest *vr, liThrottlePool *pool) {
if (pool->type == LI_THROTTLE_POOL_NAME) {
if (vr->throttle.pool.queue) {
g_atomic_int_add(&pool->worker_num_cons_queued[vr->wrk->ndx], -1);
g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
vr->throttle.pool.queue = NULL;
}
g_atomic_int_add(&vr->throttle.pool.ptr->refcount, -1);
vr->throttle.pool.magazine = 0;
vr->throttle.pool.ptr = NULL;
} else {
if (vr->throttle.ip.queue) {
g_atomic_int_add(&pool->worker_num_cons_queued[vr->wrk->ndx], -1);
g_queue_unlink(vr->throttle.ip.queue, &vr->throttle.ip.lnk);
vr->throttle.ip.queue = NULL;
}
if (vr->throttle.pool.queue) {
g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
vr->throttle.pool.queue = NULL;
g_atomic_int_add(&vr->throttle.pool.ptr->worker_num_cons_queued[vr->wrk->ndx], -1);
vr->throttle.ip.magazine = 0;
vr->throttle.ip.ptr = NULL;
}
/* give back bandwidth */
vr->throttle.pool.magazine = 0;
vr->throttle.pool.ptr = NULL;
li_throttle_pool_free(vr->wrk->srv, pool);
}
static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
@ -127,13 +178,21 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
GList *lnk, *lnk_next;
guint now = THROTTLE_EVTSTAMP_TO_GINT(CUR_TS(wrk));
if (now - pool->worker_last_rearm[wrk->ndx] < THROTTLE_GRANULARITY)
time_diff = now - pool->worker_last_rearm[wrk->ndx];
/* overflow after 31 days... */
if (G_UNLIKELY(time_diff < 0))
time_diff = 1000;
if (G_LIKELY(time_diff < THROTTLE_GRANULARITY) && G_LIKELY(time_diff > 0))
return;
/* milliseconds since last global rearm */
time_diff = now - g_atomic_int_get(&pool->last_rearm);
if (G_UNLIKELY(time_diff < 0))
time_diff = 1000;
/* check if we have to rearm any magazines */
if (time_diff >= THROTTLE_GRANULARITY) {
if (G_UNLIKELY(time_diff >= THROTTLE_GRANULARITY)) {
/* spinlock while we rearm the magazines */
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
gint worker_num_cons[wrk->srv->worker_count];
@ -145,14 +204,15 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
num_cons += worker_num_cons[i];
}
/* rearm the worker magazines */
supply = ((pool->rate / 1000) * MIN(time_diff, 2000)) / num_cons;
if (num_cons) {
/* rearm the worker magazines */
supply = ((pool->rate / 1000) * MIN(time_diff, 1000)) / num_cons;
for (i = 0; i < wrk->srv->worker_count; i++) {
if (worker_num_cons[i] == 0)
continue;
for (i = 0; i < wrk->srv->worker_count; i++) {
if (worker_num_cons[i] == 0)
continue;
g_atomic_int_add(&pool->worker_magazine[i], supply * worker_num_cons[i]);
g_atomic_int_add(&pool->worker_magazine[i], supply * worker_num_cons[i]);
}
}
g_atomic_int_set(&pool->last_rearm, now);
@ -166,6 +226,7 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
/* select current queue */
queue = pool->worker_queues[wrk->ndx];
if (queue->length) {
g_atomic_int_set(&pool->worker_num_cons_queued[wrk->ndx], 0);
@ -175,8 +236,13 @@ static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
/* rearm connections */
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
((liVRequest*)lnk->data)->throttle.pool.magazine += supply;
((liVRequest*)lnk->data)->throttle.pool.queue = NULL;
if (pool->type == LI_THROTTLE_POOL_NAME) {
((liVRequest*)lnk->data)->throttle.pool.magazine += supply;
((liVRequest*)lnk->data)->throttle.pool.queue = NULL;
} else {
((liVRequest*)lnk->data)->throttle.ip.magazine += supply;
((liVRequest*)lnk->data)->throttle.ip.queue = NULL;
}
lnk_next = lnk->next;
lnk->next = NULL;
lnk->prev = NULL;
@ -195,8 +261,14 @@ void li_throttle_reset(liVRequest *vr) {
/* remove from throttle queue */
li_waitqueue_remove(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem);
li_throttle_pool_release(vr);
if (vr->throttle.pool.ptr)
li_throttle_pool_release(vr, vr->throttle.pool.ptr);
if (vr->throttle.ip.ptr)
li_throttle_pool_release(vr, vr->throttle.ip.ptr);
vr->throttle.pool.magazine = 0;
vr->throttle.ip.magazine = 0;
vr->throttle.con.rate = 0;
vr->throttle.magazine = 0;
vr->throttled = FALSE;
@ -204,7 +276,6 @@ void li_throttle_reset(liVRequest *vr) {
void li_throttle_cb(liWaitQueue *wq, gpointer data) {
liWaitQueueElem *wqe;
liThrottlePool *pool;
liVRequest *vr;
liWorker *wrk;
ev_tstamp now;
@ -218,11 +289,25 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
if (vr->throttle.pool.ptr) {
/* throttled by pool */
pool = vr->throttle.pool.ptr;
li_throttle_pool_rearm(wrk, vr->throttle.pool.ptr);
li_throttle_pool_rearm(wrk, pool);
if (vr->throttle.ip.ptr) {
/* throttled by pool+IP */
if (vr->throttle.con.rate) {
li_throttle_pool_rearm(wrk, vr->throttle.ip.ptr);
supply = MIN(vr->throttle.pool.magazine, vr->throttle.ip.magazine);
if (vr->throttle.con.rate) {
/* throttled by pool+IP+con */
supply = MIN(supply, vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY);
}
vr->throttle.pool.magazine -= supply;
vr->throttle.ip.magazine -= supply;
vr->throttle.magazine += supply;
} else if (vr->throttle.con.rate) {
/* throttled by pool+con */
supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
@ -230,29 +315,38 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
vr->throttle.magazine += vr->throttle.pool.magazine;
vr->throttle.pool.magazine = 0;
}
/* TODO: throttled by ip */
} else if (vr->throttle.ip.ptr) {
/* throttled by IP */
li_throttle_pool_rearm(wrk, vr->throttle.ip.ptr);
if (vr->throttle.con.rate) {
/* throttled by IP+con */
supply = MIN(vr->throttle.ip.magazine, vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.ip.magazine -= supply;
} else {
vr->throttle.magazine += vr->throttle.ip.magazine;
vr->throttle.ip.magazine = 0;
}
} else {
/* throttled by connection */
if (vr->throttle.magazine <= vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY * 4)
if (vr->throttle.magazine <= vr->throttle.con.rate)
vr->throttle.magazine += vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY;
}
vr->coninfo->callbacks->handle_check_io(vr);
}
li_waitqueue_update(wq);
}
void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max) {
vr->throttle.magazine -= transferred;
/*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con,
transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/
if (vr->throttle.magazine <= 0) {
li_waitqueue_push(&vr->wrk->throttle_queue, &vr->throttle.wqueue_elem);
}
/* queue in pool if necessary */
if (vr->throttle.pool.ptr && vr->throttle.pool.magazine <= write_max && !vr->throttle.pool.queue) {
liThrottlePool *pool = vr->throttle.pool.ptr;
@ -260,4 +354,13 @@ void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max)
g_queue_push_tail_link(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
g_atomic_int_inc(&pool->worker_num_cons_queued[vr->wrk->ndx]);
}
/* queue in IP pool if necessary */
if (vr->throttle.ip.ptr && vr->throttle.ip.magazine <= write_max && !vr->throttle.ip.queue) {
liThrottlePool *pool = vr->throttle.ip.ptr;
vr->throttle.ip.queue = pool->worker_queues[vr->wrk->ndx];
g_queue_push_tail_link(vr->throttle.ip.queue, &vr->throttle.ip.lnk);
g_atomic_int_inc(&pool->worker_num_cons_queued[vr->wrk->ndx]);
}
}

@ -192,6 +192,7 @@ liVRequest* li_vrequest_new(liWorker *wrk, liConInfo *coninfo) {
vr->throttle.wqueue_elem.data = vr;
vr->throttle.pool.lnk.data = vr;
vr->throttle.ip.lnk.data = vr;
return vr;
}

Loading…
Cancel
Save