[core] Rework throttle-pool handling to be thread-safe in creation/destruction as well as more accurate and fair in throttling

personal/stbuehler/wip
Thomas Porzelt 13 years ago
parent 2a04983b9d
commit a3b7ce74af

@ -129,6 +129,7 @@ struct liServer {
gdouble io_timeout;
GArray *throttle_pools;
GStaticMutex throttle_pools_mutex;
gdouble stat_cache_ttl;
gint tasklet_pool_threads;

@ -1,22 +1,28 @@
#ifndef _LIGHTTPD_THROTTLE_H_
#define _LIGHTTPD_THROTTLE_H_
#define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */
#define THROTTLE_GRANULARITY 200 /* defines how frequently (in milliseconds) a magazine is refilled */
/* this macro converts an ev_tstamp to a gint. this is needed for atomic access. millisecond precision, can hold a month max */
#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*31))) * 1000))
struct liThrottlePool {
/* global per pool */
GString *name;
guint rate; /** bytes/s */
gint magazine;
GQueue** queues; /** worker specific queues */
gint num_cons_queued;
gint rearming;
ev_tstamp last_pool_rearm;
ev_tstamp *last_con_rearm;
gint rate; /* bytes/s */
gint refcount;
gint rearming; /* atomic access, 1 if a worker is currently rearming the magazine */
gint last_rearm; /* gint for atomic access. represents a ((gint)ev_tstamp*1000) */
/* local per worker */
gint *worker_magazine;
gint *worker_last_rearm;
gint *worker_num_cons_queued;
GQueue** worker_queues;
};
struct liThrottleParam {
guint rate;
gint rate;
guint burst;
};
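
An aside on the new macro: it folds the absolute timestamp into a 31-day window so the millisecond value fits a gint for atomic access. A minimal standalone sketch (not part of the patch; assumes ev_tstamp is a double, as in libev, and uses a hypothetical input value):

#include <glib.h>

#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*31))) * 1000))

int main(void) {
	/* hypothetical ev_time() result; whole 31-day blocks are dropped from
	 * the integer seconds, so only the offset within the current block
	 * (plus the fractional part) survives the *1000 scaling */
	double now = 1300000000.123;
	g_print("%d ms\n", THROTTLE_EVTSTAMP_TO_GINT(now));
	return 0;
}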

@ -137,7 +137,7 @@ struct liVRequest {
gchar unused; /* this struct is unused for now */
} ip;
struct {
guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
gint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
ev_tstamp last_update;
} con;
liWaitQueueElem wqueue_elem;

@ -1500,7 +1500,6 @@ static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param,
static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
GString *name;
guint i;
liThrottlePool *pool = NULL;
gint64 rate;
@ -1520,11 +1519,10 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l
return NULL;
}
name = g_array_index(val->data.list, liValue*, 0)->data.string;
rate = g_array_index(val->data.list, liValue*, 1)->data.number;
if (rate && rate < (32*1024)) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate);
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum)", rate);
return NULL;
}
@ -1532,34 +1530,23 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
name = li_value_extract_string(g_array_index(val->data.list, liValue*, 0));
} else {
name = val->data.string;
name = li_value_extract_string(val);
rate = 0;
}
for (i = 0; i < srv->throttle_pools->len; i++) {
if (g_string_equal(g_array_index(srv->throttle_pools, liThrottlePool*, i)->name, name)) {
/* pool already defined */
if (val->type == LI_VALUE_LIST && g_array_index(srv->throttle_pools, liThrottlePool*, i)->rate != (guint)rate) {
ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", name->str,
g_array_index(srv->throttle_pools, liThrottlePool*, i)->rate);
return NULL;
}
pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
break;
}
}
pool = li_throttle_pool_new(srv, name, rate);
if (!pool) {
/* pool not yet defined */
if (val->type == LI_VALUE_STRING) {
ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str);
return NULL;
}
ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str);
return NULL;
}
pool = li_throttle_pool_new(srv, li_value_extract_string(g_array_index(val->data.list, liValue*, 0)), (guint)rate);
g_array_append_val(srv->throttle_pools, pool);
if (rate != pool->rate && rate != 0) {
ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", pool->name->str, pool->rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_pool, NULL, NULL, pool);
@ -1581,7 +1568,7 @@ static liHandlerResult core_handle_throttle_connection(liVRequest *vr, gpointer
vr->throttled = TRUE;
if (vr->throttle.pool.magazine) {
guint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY);
gint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate / 1000 * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
} else {
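
For a feel of the numbers after the granularity change, a rough worked example of the per-tick supply (illustrative only; the 64 kbyte/s rate is assumed):

#include <stdio.h>

#define THROTTLE_GRANULARITY 200 /* ms, as defined in the patch */

int main(void) {
	int rate = 64 * 1024; /* assumed connection rate: 65536 bytes/s */
	/* same integer arithmetic as core_handle_throttle_connection */
	int supply = rate / 1000 * THROTTLE_GRANULARITY; /* 65 * 200 = 13000 */
	/* five refills per second give ~65000 bytes/s; the small shortfall
	 * versus 65536 comes from the truncating division by 1000 */
	printf("supply per tick: %d bytes\n", supply);
	return 0;
}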
@ -1743,48 +1730,68 @@ static const liPluginAngel angelcbs[] = {
#include <sys/types.h>
static void plugin_core_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk) {
#if defined(LIGHTY_OS_LINUX)
/* sched_setaffinity is only available on linux */
cpu_set_t mask;
liValue *v = srv->workers_cpu_affinity;
GArray *arr;
guint i;
UNUSED(p);
if (!v)
return;
/* initialize throttle pools that have not been yet */
g_static_mutex_lock(&srv->throttle_pools_mutex);
for (i = 0; i < srv->throttle_pools->len; i++) {
liThrottlePool *pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
arr = v->data.list;
if (!pool->worker_queues) {
pool->worker_magazine = g_new0(gint, srv->worker_count);
pool->worker_last_rearm = g_new0(gint, srv->worker_count);
pool->worker_num_cons_queued = g_new0(gint, srv->worker_count);
pool->worker_queues = g_new0(GQueue*, srv->worker_count);
if (wrk->ndx >= arr->len) {
WARNING(srv, "worker #%u has no entry in workers.cpu_affinity", wrk->ndx+1);
return;
for (i = 0; i < srv->worker_count; i++) {
pool->worker_queues[i] = g_queue_new();
pool->worker_last_rearm[i] = pool->last_rearm;
}
}
}
g_static_mutex_unlock(&srv->throttle_pools_mutex);
CPU_ZERO(&mask);
#if defined(LIGHTY_OS_LINUX)
/* sched_setaffinity is only available on linux */
{
cpu_set_t mask;
liValue *v = srv->workers_cpu_affinity;
GArray *arr;
v = g_array_index(arr, liValue*, wrk->ndx);
if (v->type == LI_VALUE_NUMBER) {
CPU_SET(v->data.number, &mask);
DEBUG(srv, "binding worker #%u to cpu %u", wrk->ndx+1, (guint)v->data.number);
} else {
guint i;
if (!v)
return;
g_string_truncate(wrk->tmp_str, 0);
arr = v->data.list;
for (i = 0; i < arr->len; i++) {
CPU_SET(g_array_index(arr, liValue*, i)->data.number, &mask);
g_string_append_printf(wrk->tmp_str, i ? ",%u":"%u", (guint)g_array_index(arr, liValue*, i)->data.number);
if (wrk->ndx >= arr->len) {
WARNING(srv, "worker #%u has no entry in workers.cpu_affinity", wrk->ndx+1);
return;
}
DEBUG(srv, "binding worker #%u to cpus %s", wrk->ndx+1, wrk->tmp_str->str);
}
CPU_ZERO(&mask);
v = g_array_index(arr, liValue*, wrk->ndx);
if (v->type == LI_VALUE_NUMBER) {
CPU_SET(v->data.number, &mask);
DEBUG(srv, "binding worker #%u to cpu %u", wrk->ndx+1, (guint)v->data.number);
} else {
g_string_truncate(wrk->tmp_str, 0);
arr = v->data.list;
for (i = 0; i < arr->len; i++) {
CPU_SET(g_array_index(arr, liValue*, i)->data.number, &mask);
g_string_append_printf(wrk->tmp_str, i ? ",%u":"%u", (guint)g_array_index(arr, liValue*, i)->data.number);
}
if (0 != sched_setaffinity(0, sizeof(mask), &mask)) {
ERROR(srv, "couldn't set cpu affinity mask for worker #%u: %s", wrk->ndx, g_strerror(errno));
DEBUG(srv, "binding worker #%u to cpus %s", wrk->ndx+1, wrk->tmp_str->str);
}
if (0 != sched_setaffinity(0, sizeof(mask), &mask)) {
ERROR(srv, "couldn't set cpu affinity mask for worker #%u: %s", wrk->ndx, g_strerror(errno));
}
}
#else
UNUSED(srv); UNUSED(wrk); UNUSED(p);
#endif
}

@ -140,6 +140,7 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) {
li_server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT"));
srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(liThrottlePool*));
g_static_mutex_init(&srv->throttle_pools_mutex);
li_log_init(srv);
@ -219,6 +220,7 @@ void li_server_free(liServer* srv) {
li_throttle_pool_free(srv, g_array_index(srv->throttle_pools, liThrottlePool*, i));
}
g_array_free(srv->throttle_pools, TRUE);
g_static_mutex_free(&srv->throttle_pools_mutex);
}
if (srv->acon) {
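
Condensed, the locking discipline this commit introduces around srv->throttle_pools looks like this (a sketch of the pattern only, not literal patch code):

#include <glib.h>

static GStaticMutex mutex; /* lives in liServer as throttle_pools_mutex */

static void pools_locking_pattern(void) {
	g_static_mutex_init(&mutex);   /* once, in li_server_new */
	g_static_mutex_lock(&mutex);   /* around every lookup/append/remove */
	/* ... access the throttle_pools array ... */
	g_static_mutex_unlock(&mutex);
	g_static_mutex_free(&mutex);   /* once, in li_server_free */
}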

@ -6,136 +6,187 @@
#include <lighttpd/base.h>
liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) {
liThrottlePool *pool;
guint i;
guint worker_count;
worker_count = srv->worker_count ? srv->worker_count : 1;
g_static_mutex_lock(&srv->throttle_pools_mutex);
/* check if we already have a pool with that name */
for (i = 0; i < srv->throttle_pools->len; i++) {
pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
if (g_string_equal(pool->name, name)) {
g_atomic_int_inc(&pool->refcount);
g_static_mutex_unlock(&srv->throttle_pools_mutex);
g_string_free(name, TRUE);
return pool;
}
}
if (rate == 0) {
g_static_mutex_unlock(&srv->throttle_pools_mutex);
return NULL;
}
pool = g_slice_new0(liThrottlePool);
pool->rate = rate;
pool->magazine = rate * THROTTLE_GRANULARITY;
pool->name = name;
pool->queues = g_new0(GQueue*, worker_count);
for (i = 0; i < worker_count; i++) {
pool->queues[i] = g_queue_new();
pool->last_rearm = THROTTLE_EVTSTAMP_TO_GINT(ev_time());
pool->refcount = 1;
/*
* If we are not in LI_SERVER_INIT state, we can initialize the queues directly.
* Otherwise they'll get initialized in the worker prepare callback.
*/
if (g_atomic_int_get(&srv->state) != LI_SERVER_INIT) {
pool->worker_magazine = g_new0(gint, srv->worker_count);
pool->worker_last_rearm = g_new0(gint, srv->worker_count);
pool->worker_num_cons_queued = g_new0(gint, srv->worker_count);
pool->worker_queues = g_new0(GQueue*, srv->worker_count);
for (i = 0; i < srv->worker_count; i++) {
pool->worker_queues[i] = g_queue_new();
pool->worker_last_rearm[i] = pool->last_rearm;
}
}
pool->last_pool_rearm = ev_time();
pool->last_con_rearm = g_new0(ev_tstamp, worker_count);
for (i = 0; i < worker_count; i++) {
pool->last_con_rearm[i] = pool->last_pool_rearm;
}
g_array_append_val(srv->throttle_pools, pool);
g_static_mutex_unlock(&srv->throttle_pools_mutex);
return pool;
}
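
Since the name lookup now lives inside li_throttle_pool_new, creation is idempotent per pool name and refcounted. A hedged usage sketch (call site and values are hypothetical; the function takes ownership of the name GString):

#include <lighttpd/base.h>

static void attach_example(liServer *srv) {
	liThrottlePool *pool = li_throttle_pool_new(srv, g_string_new("downloads"), 512 * 1024);
	if (pool == NULL)
		return; /* name unknown and rate == 0: nothing to attach to */
	/* refcount is 1 for a fresh pool, incremented for an existing one */
	li_throttle_pool_free(srv, pool); /* drops the ref; the last one frees */
}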
void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
guint i;
guint worker_count;
worker_count = srv->worker_count ? srv->worker_count : 1;
if (!g_atomic_int_dec_and_test(&pool->refcount))
return;
for (i = 0; i < worker_count; i++) {
g_queue_free(pool->queues[i]);
g_static_mutex_lock(&srv->throttle_pools_mutex);
for (i = 0; i < srv->throttle_pools->len; i++) {
if (pool == g_array_index(srv->throttle_pools, liThrottlePool*, i)) {
g_array_remove_index_fast(srv->throttle_pools, i);
break;
}
}
g_static_mutex_unlock(&srv->throttle_pools_mutex);
if (pool->worker_queues) {
for (i = 0; i < srv->worker_count; i++) {
g_queue_free(pool->worker_queues[i]);
}
g_free(pool->worker_magazine);
g_free(pool->worker_last_rearm);
g_free(pool->worker_num_cons_queued);
g_free(pool->worker_queues);
}
g_free(pool->queues);
g_free(pool->last_con_rearm);
g_string_free(pool->name, TRUE);
g_slice_free(liThrottlePool, pool);
}
void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) {
gint magazine;
if (vr->throttle.pool.ptr == pool)
return;
g_atomic_int_inc(&pool->refcount);
if (vr->throttle.pool.ptr != NULL) {
/* already in a different pool */
li_throttle_pool_release(vr);
}
/* try to steal some initial 4kbytes from the pool */
while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) {
if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) {
vr->throttle.pool.magazine = 4*1024;
break;
}
}
vr->throttle.pool.ptr = pool;
vr->throttled = TRUE;
}
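
The 4 kbyte "steal" above is a compare-and-exchange retry loop; stripped of context, the pattern is (generic sketch, names invented):

#include <glib.h>

/* take `slice` bytes from a shared counter without locking */
static gint take_slice(gint *counter, gint slice) {
	gint current;
	while ((current = g_atomic_int_get(counter)) > slice) {
		/* succeeds only if no other worker changed the value in between */
		if (g_atomic_int_compare_and_exchange(counter, current, current - slice))
			return slice;
	}
	return 0; /* not enough left; wait for the next rearm */
}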
void li_throttle_pool_release(liVRequest *vr) {
if (vr->throttle.pool.queue == NULL)
if (vr->throttle.pool.ptr == NULL)
return;
g_atomic_int_add(&vr->throttle.pool.ptr->refcount, -1);
if (vr->throttle.pool.queue) {
g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
vr->throttle.pool.queue = NULL;
g_atomic_int_add(&vr->throttle.pool.ptr->num_cons_queued, -1);
g_atomic_int_add(&vr->throttle.pool.ptr->worker_num_cons_queued[vr->wrk->ndx], -1);
}
/* give back bandwidth */
g_atomic_int_add(&vr->throttle.pool.ptr->magazine, vr->throttle.pool.magazine);
vr->throttle.pool.magazine = 0;
vr->throttle.pool.ptr = NULL;
}
static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
ev_tstamp now = CUR_TS(wrk);
gint time_diff, supply, num_cons, magazine;
guint i;
GQueue *queue;
GList *lnk, *lnk_next;
guint now = THROTTLE_EVTSTAMP_TO_GINT(CUR_TS(wrk));
/* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) {
if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4)) {
g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY);
if (now - pool->worker_last_rearm[wrk->ndx] < THROTTLE_GRANULARITY)
return;
pool->last_pool_rearm = now;
/* milliseconds since last global rearm */
time_diff = now - g_atomic_int_get(&pool->last_rearm);
/* check if we have to rearm any magazines */
if (time_diff >= THROTTLE_GRANULARITY) {
/* spinlock while we rearm the magazines */
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
gint worker_num_cons[wrk->srv->worker_count];
/* calculate total cons, take a safe "snapshot" */
num_cons = 0;
for (i = 0; i < wrk->srv->worker_count; i++) {
worker_num_cons[i] = g_atomic_int_get(&pool->worker_num_cons_queued[i]);
num_cons += worker_num_cons[i];
}
}
g_atomic_int_set(&pool->rearming, 0);
}
/* rearm the worker magazines */
supply = ((pool->rate / 1000) * MIN(time_diff, 2000)) / num_cons;
if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) {
GQueue *queue;
GList *lnk, *lnk_next;
gint magazine, supply, num_cons;
/* select current queue */
queue = pool->queues[wrk->ndx];
if (queue->length) {
do {
magazine = g_atomic_int_get(&pool->magazine);
num_cons = g_atomic_int_get(&pool->num_cons_queued);
supply = magazine / num_cons;
} while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length)));
g_atomic_int_add(&(pool->num_cons_queued), - queue->length);
/* rearm connections */
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
((liVRequest*)lnk->data)->throttle.pool.magazine += supply;
((liVRequest*)lnk->data)->throttle.pool.queue = NULL;
lnk_next = lnk->next;
lnk->next = NULL;
lnk->prev = NULL;
for (i = 0; i < wrk->srv->worker_count; i++) {
if (worker_num_cons[i] == 0)
continue;
g_atomic_int_add(&pool->worker_magazine[i], supply * worker_num_cons[i]);
}
/* clear current connection queue */
g_queue_init(queue);
g_atomic_int_set(&pool->last_rearm, now);
g_atomic_int_set(&pool->rearming, 0);
} else {
/* wait for pool rearm to finish */
while (g_atomic_int_get(&pool->rearming) == 1) { }
}
}
/* select current queue */
queue = pool->worker_queues[wrk->ndx];
if (queue->length) {
g_atomic_int_set(&pool->worker_num_cons_queued[wrk->ndx], 0);
magazine = g_atomic_int_get(&pool->worker_magazine[wrk->ndx]);
g_atomic_int_add(&pool->worker_magazine[wrk->ndx], -magazine);
supply = magazine / queue->length;
pool->last_con_rearm[wrk->ndx] = now;
/* rearm connections */
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
((liVRequest*)lnk->data)->throttle.pool.magazine += supply;
((liVRequest*)lnk->data)->throttle.pool.queue = NULL;
lnk_next = lnk->next;
lnk->next = NULL;
lnk->prev = NULL;
}
/* clear current connection queue */
g_queue_init(queue);
}
pool->worker_last_rearm[wrk->ndx] = now;
}
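
Plugging assumed numbers into the global rearm formula makes the fairness property concrete (illustrative values only):

#include <glib.h>

int main(void) {
	gint rate = 512 * 1024; /* assumed pool rate, bytes/s */
	gint time_diff = 200;   /* ms since the last global rearm */
	gint num_cons = 8;      /* queued connections across all workers */
	/* formula from li_throttle_pool_rearm; MIN() caps the banked time at
	 * two seconds so an idle pool cannot hoard bandwidth indefinitely */
	gint supply = ((rate / 1000) * MIN(time_diff, 2000)) / num_cons;
	g_print("%d bytes per queued connection\n", supply); /* (524*200)/8 = 13100 */
	return 0;
}

Every queued connection receives the same supply regardless of which worker it sits on, which is what makes the new scheme fair across workers.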
void li_throttle_reset(liVRequest *vr) {
@ -157,7 +208,7 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
liVRequest *vr;
liWorker *wrk;
ev_tstamp now;
guint supply;
gint supply;
wrk = data;
now = ev_now(wrk->loop);
@ -172,7 +223,7 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
li_throttle_pool_rearm(wrk, pool);
if (vr->throttle.con.rate) {
supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate * THROTTLE_GRANULARITY);
supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
} else {
@ -182,8 +233,8 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) {
/* TODO: throttled by ip */
} else {
/* throttled by connection */
if (vr->throttle.magazine <= vr->throttle.con.rate * THROTTLE_GRANULARITY * 4)
vr->throttle.magazine += vr->throttle.con.rate * THROTTLE_GRANULARITY;
if (vr->throttle.magazine <= vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY * 4)
vr->throttle.magazine += vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY;
}
vr->coninfo->callbacks->handle_check_io(vr);
@ -204,8 +255,9 @@ void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max)
if (vr->throttle.pool.ptr && vr->throttle.pool.magazine <= write_max && !vr->throttle.pool.queue) {
liThrottlePool *pool = vr->throttle.pool.ptr;
g_atomic_int_inc(&pool->num_cons_queued);
vr->throttle.pool.queue = pool->queues[vr->wrk->ndx];
vr->throttle.pool.queue = pool->worker_queues[vr->wrk->ndx];
g_queue_push_tail_link(vr->throttle.pool.queue, &vr->throttle.pool.lnk);
g_atomic_int_inc(&pool->worker_num_cons_queued[vr->wrk->ndx]);
}
}

@ -407,7 +407,7 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
li_waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk);
/* throttling */
li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, THROTTLE_GRANULARITY, wrk);
li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, ((gdouble)THROTTLE_GRANULARITY) / 1000, wrk);
li_job_queue_init(&wrk->jobqueue, wrk->loop);
