diff --git a/include/lighttpd/server.h b/include/lighttpd/server.h index 1ee58a3..15a9d89 100644 --- a/include/lighttpd/server.h +++ b/include/lighttpd/server.h @@ -129,6 +129,7 @@ struct liServer { gdouble io_timeout; GArray *throttle_pools; + GStaticMutex throttle_pools_mutex; gdouble stat_cache_ttl; gint tasklet_pool_threads; diff --git a/include/lighttpd/throttle.h b/include/lighttpd/throttle.h index b4477f9..205e223 100644 --- a/include/lighttpd/throttle.h +++ b/include/lighttpd/throttle.h @@ -1,22 +1,28 @@ #ifndef _LIGHTTPD_THROTTLE_H_ #define _LIGHTTPD_THROTTLE_H_ -#define THROTTLE_GRANULARITY 0.2 /* defines how frequently a magazine is refilled. should be 0.1 <= x <= 1.0 */ +#define THROTTLE_GRANULARITY 200 /* defines how frequently (in milliseconds) a magazine is refilled */ + +/* this macro converts an ev_tstamp to a gint. this is needed for atomic access. millisecond precision, can hold a month max */ +#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*31))) * 1000)) struct liThrottlePool { + /* global per pool */ GString *name; - guint rate; /** bytes/s */ - gint magazine; - GQueue** queues; /** worker specific queues */ - gint num_cons_queued; - - gint rearming; - ev_tstamp last_pool_rearm; - ev_tstamp *last_con_rearm; + gint rate; /* bytes/s */ + gint refcount; + gint rearming; /* atomic access, 1 if a worker is currently rearming the magazine */ + gint last_rearm; /* gint for atomic access. 
represents a ((gint)ev_tstamp*1000) */ + + /* local per worker */ + gint *worker_magazine; + gint *worker_last_rearm; + gint *worker_num_cons_queued; + GQueue** worker_queues; }; struct liThrottleParam { - guint rate; + gint rate; guint burst; }; diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index e01b979..3ce17ef 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -137,7 +137,7 @@ struct liVRequest { gchar unused; /* this struct is unused for now */ } ip; struct { - guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */ + gint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */ ev_tstamp last_update; } con; liWaitQueueElem wqueue_elem; diff --git a/src/main/plugin_core.c b/src/main/plugin_core.c index 6fbfae0..97dc424 100644 --- a/src/main/plugin_core.c +++ b/src/main/plugin_core.c @@ -1500,7 +1500,6 @@ static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param, static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) { GString *name; - guint i; liThrottlePool *pool = NULL; gint64 rate; @@ -1520,11 +1519,10 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l return NULL; } - name = g_array_index(val->data.list, liValue*, 0)->data.string; rate = g_array_index(val->data.list, liValue*, 1)->data.number; if (rate && rate < (32*1024)) { - ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum or 0 for unlimited)", rate); + ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum)", rate); return NULL; } @@ -1532,34 +1530,23 @@ static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, l ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate); return NULL; } + + name = li_value_extract_string(g_array_index(val->data.list, liValue*, 0)); } 
else { - name = val->data.string; + name = li_value_extract_string(val); rate = 0; } - for (i = 0; i < srv->throttle_pools->len; i++) { - if (g_string_equal(g_array_index(srv->throttle_pools, liThrottlePool*, i)->name, name)) { - /* pool already defined */ - if (val->type == LI_VALUE_LIST && g_array_index(srv->throttle_pools, liThrottlePool*, i)->rate != (guint)rate) { - ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", name->str, - g_array_index(srv->throttle_pools, liThrottlePool*, i)->rate); - return NULL; - } - - pool = g_array_index(srv->throttle_pools, liThrottlePool*, i); - break; - } - } + pool = li_throttle_pool_new(srv, name, rate); if (!pool) { - /* pool not yet defined */ - if (val->type == LI_VALUE_STRING) { - ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str); - return NULL; - } + ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str); + return NULL; + } - pool = li_throttle_pool_new(srv, li_value_extract_string(g_array_index(val->data.list, liValue*, 0)), (guint)rate); - g_array_append_val(srv->throttle_pools, pool); + if (rate != pool->rate && rate != 0) { + ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", pool->name->str, pool->rate); + return NULL; } return li_action_new_function(core_handle_throttle_pool, NULL, NULL, pool); @@ -1581,7 +1568,7 @@ static liHandlerResult core_handle_throttle_connection(liVRequest *vr, gpointer vr->throttled = TRUE; if (vr->throttle.pool.magazine) { - guint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate * THROTTLE_GRANULARITY); + gint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate / 1000 * THROTTLE_GRANULARITY); vr->throttle.magazine += supply; vr->throttle.pool.magazine -= supply; } else { @@ -1743,48 +1730,68 @@ static const liPluginAngel angelcbs[] = { #include static void plugin_core_prepare_worker(liServer *srv, liPlugin *p, liWorker 
*wrk) { -#if defined(LIGHTY_OS_LINUX) - /* sched_setaffinity is only available on linux */ - cpu_set_t mask; - liValue *v = srv->workers_cpu_affinity; - GArray *arr; + guint i; + UNUSED(p); - if (!v) - return; + /* initialize throttle pools that have not been yet */ + g_static_mutex_lock(&srv->throttle_pools_mutex); + for (i = 0; i < srv->throttle_pools->len; i++) { + liThrottlePool *pool = g_array_index(srv->throttle_pools, liThrottlePool*, i); - arr = v->data.list; + if (!pool->worker_queues) { + pool->worker_magazine = g_new0(gint, srv->worker_count); + pool->worker_last_rearm = g_new0(gint, srv->worker_count); + pool->worker_num_cons_queued = g_new0(gint, srv->worker_count); + pool->worker_queues = g_new0(GQueue*, srv->worker_count); - if (wrk->ndx >= arr->len) { - WARNING(srv, "worker #%u has no entry in workers.cpu_affinity", wrk->ndx+1); - return; + for (i = 0; i < srv->worker_count; i++) { + pool->worker_queues[i] = g_queue_new(); + pool->worker_last_rearm[i] = pool->last_rearm; + } + } } + g_static_mutex_unlock(&srv->throttle_pools_mutex); - CPU_ZERO(&mask); +#if defined(LIGHTY_OS_LINUX) + /* sched_setaffinity is only available on linux */ + { + cpu_set_t mask; + liValue *v = srv->workers_cpu_affinity; + GArray *arr; - v = g_array_index(arr, liValue*, wrk->ndx); - if (v->type == LI_VALUE_NUMBER) { - CPU_SET(v->data.number, &mask); - DEBUG(srv, "binding worker #%u to cpu %u", wrk->ndx+1, (guint)v->data.number); - } else { - guint i; + if (!v) + return; - g_string_truncate(wrk->tmp_str, 0); arr = v->data.list; - for (i = 0; i < arr->len; i++) { - CPU_SET(g_array_index(arr, liValue*, i)->data.number, &mask); - g_string_append_printf(wrk->tmp_str, i ? 
",%u":"%u", (guint)g_array_index(arr, liValue*, i)->data.number); + if (wrk->ndx >= arr->len) { + WARNING(srv, "worker #%u has no entry in workers.cpu_affinity", wrk->ndx+1); + return; } - DEBUG(srv, "binding worker #%u to cpus %s", wrk->ndx+1, wrk->tmp_str->str); - } + CPU_ZERO(&mask); + + v = g_array_index(arr, liValue*, wrk->ndx); + if (v->type == LI_VALUE_NUMBER) { + CPU_SET(v->data.number, &mask); + DEBUG(srv, "binding worker #%u to cpu %u", wrk->ndx+1, (guint)v->data.number); + } else { + g_string_truncate(wrk->tmp_str, 0); + arr = v->data.list; + + for (i = 0; i < arr->len; i++) { + CPU_SET(g_array_index(arr, liValue*, i)->data.number, &mask); + g_string_append_printf(wrk->tmp_str, i ? ",%u":"%u", (guint)g_array_index(arr, liValue*, i)->data.number); + } - if (0 != sched_setaffinity(0, sizeof(mask), &mask)) { - ERROR(srv, "couldn't set cpu affinity mask for worker #%u: %s", wrk->ndx, g_strerror(errno)); + DEBUG(srv, "binding worker #%u to cpus %s", wrk->ndx+1, wrk->tmp_str->str); + } + + if (0 != sched_setaffinity(0, sizeof(mask), &mask)) { + ERROR(srv, "couldn't set cpu affinity mask for worker #%u: %s", wrk->ndx, g_strerror(errno)); + } } -#else - UNUSED(srv); UNUSED(wrk); UNUSED(p); #endif } diff --git a/src/main/server.c b/src/main/server.c index 728073e..f9bba16 100644 --- a/src/main/server.c +++ b/src/main/server.c @@ -140,6 +140,7 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) { li_server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT")); srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(liThrottlePool*)); + g_static_mutex_init(&srv->throttle_pools_mutex); li_log_init(srv); @@ -219,6 +220,7 @@ void li_server_free(liServer* srv) { li_throttle_pool_free(srv, g_array_index(srv->throttle_pools, liThrottlePool*, i)); } g_array_free(srv->throttle_pools, TRUE); + g_static_mutex_free(&srv->throttle_pools_mutex); } if (srv->acon) { diff --git a/src/main/throttle.c b/src/main/throttle.c index 399b6fc..3b1ba92 
100644 --- a/src/main/throttle.c +++ b/src/main/throttle.c @@ -6,136 +6,187 @@ #include - liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) { liThrottlePool *pool; guint i; - guint worker_count; - worker_count = srv->worker_count ? srv->worker_count : 1; + g_static_mutex_lock(&srv->throttle_pools_mutex); + + /* check if we already have a pool with that name */ + for (i = 0; i < srv->throttle_pools->len; i++) { + pool = g_array_index(srv->throttle_pools, liThrottlePool*, i); + + if (g_string_equal(pool->name, name)) { + g_atomic_int_inc(&pool->refcount); + g_static_mutex_unlock(&srv->throttle_pools_mutex); + g_string_free(name, TRUE); + return pool; + } + } + + if (rate == 0) { + g_static_mutex_unlock(&srv->throttle_pools_mutex); + return NULL; + } pool = g_slice_new0(liThrottlePool); pool->rate = rate; - pool->magazine = rate * THROTTLE_GRANULARITY; pool->name = name; - - pool->queues = g_new0(GQueue*, worker_count);; - for (i = 0; i < worker_count; i++) { - pool->queues[i] = g_queue_new(); + pool->last_rearm = THROTTLE_EVTSTAMP_TO_GINT(ev_time()); + pool->refcount = 1; + + /* + * If we are not in LI_SERVER_INIT state, we can initialize the queues directly. + * Otherwise they'll get initialized in the worker prepare callback. 
+ */ + if (g_atomic_int_get(&srv->state) != LI_SERVER_INIT) { + pool->worker_magazine = g_new0(gint, srv->worker_count); + pool->worker_last_rearm = g_new0(gint, srv->worker_count); + pool->worker_num_cons_queued = g_new0(gint, srv->worker_count); + pool->worker_queues = g_new0(GQueue*, srv->worker_count); + + for (i = 0; i < srv->worker_count; i++) { + pool->worker_queues[i] = g_queue_new(); + pool->worker_last_rearm[i] = pool->last_rearm; + } } - pool->last_pool_rearm = ev_time(); - pool->last_con_rearm = g_new0(ev_tstamp, worker_count); - for (i = 0; i < worker_count; i++) { - pool->last_con_rearm[i] = pool->last_pool_rearm; - } + g_array_append_val(srv->throttle_pools, pool); + + g_static_mutex_unlock(&srv->throttle_pools_mutex); return pool; } void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) { guint i; - guint worker_count; - worker_count = srv->worker_count ? srv->worker_count : 1; + if (!g_atomic_int_dec_and_test(&pool->refcount)) + return; - for (i = 0; i < worker_count; i++) { - g_queue_free(pool->queues[i]); + g_static_mutex_lock(&srv->throttle_pools_mutex); + for (i = 0; i < srv->throttle_pools->len; i++) { + if (pool == g_array_index(srv->throttle_pools, liThrottlePool*, i)) { + g_array_remove_index_fast(srv->throttle_pools, i); + break; + } + } + g_static_mutex_unlock(&srv->throttle_pools_mutex); + + if (pool->worker_queues) { + for (i = 0; i < srv->worker_count; i++) { + g_queue_free(pool->worker_queues[i]); + } + + g_free(pool->worker_magazine); + g_free(pool->worker_last_rearm); + g_free(pool->worker_num_cons_queued); + g_free(pool->worker_queues); } - g_free(pool->queues); - g_free(pool->last_con_rearm); g_string_free(pool->name, TRUE); g_slice_free(liThrottlePool, pool); } void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) { - gint magazine; - if (vr->throttle.pool.ptr == pool) return; + g_atomic_int_inc(&pool->refcount); + if (vr->throttle.pool.ptr != NULL) { /* already in a different pool */ 
li_throttle_pool_release(vr); } - /* try to steal some initial 4kbytes from the pool */ - while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) { - if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) { - vr->throttle.pool.magazine = 4*1024; - break; - } - } - vr->throttle.pool.ptr = pool; vr->throttled = TRUE; } void li_throttle_pool_release(liVRequest *vr) { - if (vr->throttle.pool.queue == NULL) + if (vr->throttle.pool.ptr == NULL) return; + g_atomic_int_add(&vr->throttle.pool.ptr->refcount, -1); + if (vr->throttle.pool.queue) { g_queue_unlink(vr->throttle.pool.queue, &vr->throttle.pool.lnk); vr->throttle.pool.queue = NULL; - g_atomic_int_add(&vr->throttle.pool.ptr->num_cons_queued, -1); + g_atomic_int_add(&vr->throttle.pool.ptr->worker_num_cons_queued[vr->wrk->ndx], -1); } /* give back bandwidth */ - g_atomic_int_add(&vr->throttle.pool.ptr->magazine, vr->throttle.pool.magazine); vr->throttle.pool.magazine = 0; vr->throttle.pool.ptr = NULL; } static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) { - ev_tstamp now = CUR_TS(wrk); + gint time_diff, supply, num_cons, magazine; + guint i; + GQueue *queue; + GList *lnk, *lnk_next; + guint now = THROTTLE_EVTSTAMP_TO_GINT(CUR_TS(wrk)); - /* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */ - if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) { - if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) { - if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4)) { - g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY); + if (now - pool->worker_last_rearm[wrk->ndx] < THROTTLE_GRANULARITY) + return; - pool->last_pool_rearm = now; + /* milliseconds since last global rearm */ + time_diff = now - g_atomic_int_get(&pool->last_rearm); + /* check if we have to rearm any magazines */ + if (time_diff >= THROTTLE_GRANULARITY) { + /* spinlock while we rearm the magazines */ + if 
(g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) { + gint worker_num_cons[wrk->srv->worker_count]; + + /* calculate total cons, take a safe "snapshot" */ + num_cons = 0; + for (i = 0; i < wrk->srv->worker_count; i++) { + worker_num_cons[i] = g_atomic_int_get(&pool->worker_num_cons_queued[i]); + num_cons += worker_num_cons[i]; } - } - g_atomic_int_set(&pool->rearming, 0); - } + /* rearm the worker magazines */ + supply = ((pool->rate / 1000) * MIN(time_diff, 2000)) / num_cons; - if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) { - GQueue *queue; - GList *lnk, *lnk_next; - gint magazine, supply, num_cons; - - /* select current queue */ - queue = pool->queues[wrk->ndx]; - - if (queue->length) { - do { - magazine = g_atomic_int_get(&pool->magazine); - num_cons = g_atomic_int_get(&pool->num_cons_queued); - supply = magazine / num_cons; - } while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length))); - - g_atomic_int_add(&(pool->num_cons_queued), - queue->length); - - /* rearm connections */ - for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) { - ((liVRequest*)lnk->data)->throttle.pool.magazine += supply; - ((liVRequest*)lnk->data)->throttle.pool.queue = NULL; - lnk_next = lnk->next; - lnk->next = NULL; - lnk->prev = NULL; + for (i = 0; i < wrk->srv->worker_count; i++) { + if (worker_num_cons[i] == 0) + continue; + + g_atomic_int_add(&pool->worker_magazine[i], supply * worker_num_cons[i]); } - /* clear current connection queue */ - g_queue_init(queue); + g_atomic_int_set(&pool->last_rearm, now); + g_atomic_int_set(&pool->rearming, 0); + } else { + /* wait for pool rearm to finish */ + while (g_atomic_int_get(&pool->rearming) == 1) { } } + } + + + /* select current queue */ + queue = pool->worker_queues[wrk->ndx]; + if (queue->length) { + g_atomic_int_set(&pool->worker_num_cons_queued[wrk->ndx], 0); + + magazine = g_atomic_int_get(&pool->worker_magazine[wrk->ndx]); + 
g_atomic_int_add(&pool->worker_magazine[wrk->ndx], -magazine); + supply = magazine / queue->length; - pool->last_con_rearm[wrk->ndx] = now; + /* rearm connections */ + for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) { + ((liVRequest*)lnk->data)->throttle.pool.magazine += supply; + ((liVRequest*)lnk->data)->throttle.pool.queue = NULL; + lnk_next = lnk->next; + lnk->next = NULL; + lnk->prev = NULL; + } + + /* clear current connection queue */ + g_queue_init(queue); } + + pool->worker_last_rearm[wrk->ndx] = now; } void li_throttle_reset(liVRequest *vr) { @@ -157,7 +208,7 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) { liVRequest *vr; liWorker *wrk; ev_tstamp now; - guint supply; + gint supply; wrk = data; now = ev_now(wrk->loop); @@ -172,7 +223,7 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) { li_throttle_pool_rearm(wrk, pool); if (vr->throttle.con.rate) { - supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate * THROTTLE_GRANULARITY); + supply = MIN(vr->throttle.pool.magazine, vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY); vr->throttle.magazine += supply; vr->throttle.pool.magazine -= supply; } else { @@ -182,8 +233,8 @@ void li_throttle_cb(liWaitQueue *wq, gpointer data) { /* TODO: throttled by ip */ } else { /* throttled by connection */ - if (vr->throttle.magazine <= vr->throttle.con.rate * THROTTLE_GRANULARITY * 4) - vr->throttle.magazine += vr->throttle.con.rate * THROTTLE_GRANULARITY; + if (vr->throttle.magazine <= vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY * 4) + vr->throttle.magazine += vr->throttle.con.rate / 1000 * THROTTLE_GRANULARITY; } vr->coninfo->callbacks->handle_check_io(vr); @@ -204,8 +255,9 @@ void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max) if (vr->throttle.pool.ptr && vr->throttle.pool.magazine <= write_max && !vr->throttle.pool.queue) { liThrottlePool *pool = vr->throttle.pool.ptr; - g_atomic_int_inc(&pool->num_cons_queued); - vr->throttle.pool.queue 
= pool->queues[vr->wrk->ndx]; + + vr->throttle.pool.queue = pool->worker_queues[vr->wrk->ndx]; g_queue_push_tail_link(vr->throttle.pool.queue, &vr->throttle.pool.lnk); + g_atomic_int_inc(&pool->worker_num_cons_queued[vr->wrk->ndx]); } } diff --git a/src/main/worker.c b/src/main/worker.c index d7765c1..b9941f8 100644 --- a/src/main/worker.c +++ b/src/main/worker.c @@ -407,7 +407,7 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) { li_waitqueue_init(&wrk->io_timeout_queue, wrk->loop, worker_io_timeout_cb, srv->io_timeout, wrk); /* throttling */ - li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, THROTTLE_GRANULARITY, wrk); + li_waitqueue_init(&wrk->throttle_queue, wrk->loop, li_throttle_cb, ((gdouble)THROTTLE_GRANULARITY) / 1000, wrk); li_job_queue_init(&wrk->jobqueue, wrk->loop);