[core] Rework throttle code a bit, especially regarding pools
This commit is contained in:
parent
fc09bf9095
commit
d18045ccbf
|
@ -65,15 +65,11 @@ struct liConnection {
|
|||
struct {
|
||||
liThrottlePool *ptr; /* NULL if not in any throttling pool */
|
||||
GList lnk;
|
||||
guint8 queue_ndx;
|
||||
gboolean queued;
|
||||
GQueue *queue;
|
||||
gint magazine;
|
||||
} pool;
|
||||
struct {
|
||||
liThrottlePool *ptr; /* pool for per-ip throttling, NULL if not limited by ip */
|
||||
GList lnk;
|
||||
gboolean queued;
|
||||
gint magazine;
|
||||
gchar unused; /* this struct is unused for now */
|
||||
} ip;
|
||||
struct {
|
||||
guint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
|
||||
|
|
|
@ -8,9 +8,8 @@ struct liThrottlePool {
|
|||
GString *name;
|
||||
guint rate; /** bytes/s */
|
||||
gint magazine;
|
||||
GQueue** queues; /** worker specific queues. each worker has 2 */
|
||||
guint* current_queue;
|
||||
gint num_cons;
|
||||
GQueue** queues; /** worker specific queues */
|
||||
gint num_cons_queued;
|
||||
|
||||
gint rearming;
|
||||
ev_tstamp last_pool_rearm;
|
||||
|
@ -23,9 +22,13 @@ struct liThrottleParam {
|
|||
guint burst;
|
||||
};
|
||||
|
||||
LI_API void li_throttle_reset(liConnection *con);
|
||||
LI_API void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents);
|
||||
|
||||
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate);
|
||||
LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool);
|
||||
|
||||
LI_API void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool);
|
||||
LI_API void li_throttle_pool_release(liConnection *con);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -382,13 +382,11 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
|
|||
li_waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
|
||||
}
|
||||
|
||||
if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queued) {
|
||||
if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queue) {
|
||||
liThrottlePool *pool = con->throttle.pool.ptr;
|
||||
guint8 queue_ndx = con->wrk->ndx+pool->current_queue[con->wrk->ndx];
|
||||
g_atomic_int_inc(&pool->num_cons);
|
||||
g_queue_push_tail_link(pool->queues[queue_ndx], &con->throttle.pool.lnk);
|
||||
con->throttle.pool.queue_ndx = queue_ndx;
|
||||
con->throttle.pool.queued = TRUE;
|
||||
g_atomic_int_inc(&pool->num_cons_queued);
|
||||
con->throttle.pool.queue = pool->queues[con->wrk->ndx];
|
||||
g_queue_push_tail_link(con->throttle.pool.queue, &con->throttle.pool.lnk);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,7 +502,6 @@ liConnection* li_connection_new(liWorker *wrk) {
|
|||
|
||||
con->throttle.wqueue_elem.data = con;
|
||||
con->throttle.pool.lnk.data = con;
|
||||
con->throttle.ip.lnk.data = con;
|
||||
|
||||
return con;
|
||||
}
|
||||
|
@ -577,42 +574,7 @@ void li_connection_reset(liConnection *con) {
|
|||
/* remove from timeout queue */
|
||||
li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
|
||||
|
||||
/* remove from throttle queue */
|
||||
li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
|
||||
|
||||
if (con->throttle.pool.ptr) {
|
||||
if (con->throttle.pool.queued) {
|
||||
liThrottlePool *pool = con->throttle.pool.ptr;
|
||||
g_queue_unlink(pool->queues[con->throttle.pool.queue_ndx], &con->throttle.pool.lnk);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1);
|
||||
con->throttle.pool.queued = FALSE;
|
||||
}
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine);
|
||||
con->throttle.pool.magazine = 0;
|
||||
con->throttle.ip.magazine = 0;
|
||||
con->throttle.con.magazine = 0;
|
||||
con->throttle.pool.ptr = NULL;
|
||||
}
|
||||
|
||||
if (con->throttle.ip.ptr) {
|
||||
if (con->throttle.ip.queued) {
|
||||
liThrottlePool *pool = con->throttle.ip.ptr;
|
||||
g_queue_unlink(pool->queues[con->throttle.pool.queue_ndx], &con->throttle.ip.lnk);
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1);
|
||||
con->throttle.ip.queued = FALSE;
|
||||
}
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine);
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine);
|
||||
con->throttle.ip.ptr = NULL;
|
||||
}
|
||||
|
||||
con->throttle.con.rate = 0;
|
||||
con->throttle.pool.magazine = 0;
|
||||
con->throttle.ip.magazine = 0;
|
||||
con->throttle.con.magazine = 0;
|
||||
con->throttled = FALSE;
|
||||
li_throttle_reset(con);
|
||||
}
|
||||
|
||||
static void li_connection_reset_keep_alive(liConnection *con) {
|
||||
|
@ -671,42 +633,8 @@ static void li_connection_reset_keep_alive(liConnection *con) {
|
|||
|
||||
/* remove from timeout queue */
|
||||
li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
|
||||
/* remove from throttle queue */
|
||||
li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
|
||||
|
||||
if (con->throttle.pool.ptr) {
|
||||
if (con->throttle.pool.queued) {
|
||||
liThrottlePool *pool = con->throttle.pool.ptr;
|
||||
g_queue_unlink(pool->queues[con->throttle.pool.queue_ndx], &con->throttle.pool.lnk);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->num_cons, -1);
|
||||
con->throttle.pool.queued = FALSE;
|
||||
}
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.ip.magazine);
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.con.magazine);
|
||||
con->throttle.pool.magazine = 0;
|
||||
con->throttle.ip.magazine = 0;
|
||||
con->throttle.con.magazine = 0;
|
||||
con->throttle.pool.ptr = NULL;
|
||||
}
|
||||
|
||||
if (con->throttle.ip.ptr) {
|
||||
if (con->throttle.ip.queued) {
|
||||
liThrottlePool *pool = con->throttle.ip.ptr;
|
||||
g_queue_unlink(pool->queues[con->throttle.pool.queue_ndx], &con->throttle.ip.lnk);
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->num_cons, -1);
|
||||
con->throttle.ip.queued = FALSE;
|
||||
}
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.ip.magazine);
|
||||
g_atomic_int_add(&con->throttle.ip.ptr->magazine, con->throttle.con.magazine);
|
||||
con->throttle.ip.ptr = NULL;
|
||||
}
|
||||
|
||||
con->throttle.con.rate = 0;
|
||||
con->throttle.pool.magazine = 0;
|
||||
con->throttle.ip.magazine = 0;
|
||||
con->throttle.con.magazine = 0;
|
||||
con->throttled = FALSE;
|
||||
li_throttle_reset(con);
|
||||
|
||||
if (con->raw_in->length != 0) {
|
||||
/* start handling next request if data is already available */
|
||||
|
|
|
@ -1304,34 +1304,10 @@ static liAction* core_buffer_in(liServer *srv, liPlugin* p, liValue *val, gpoint
|
|||
|
||||
static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param, gpointer *context) {
|
||||
liThrottlePool *pool = param;
|
||||
gint magazine;
|
||||
|
||||
UNUSED(context);
|
||||
|
||||
if (vr->con->throttle.pool.ptr != pool) {
|
||||
if (vr->con->throttle.pool.ptr) {
|
||||
/* connection has been in a different pool, give back bandwidth */
|
||||
g_atomic_int_add(&vr->con->throttle.pool.ptr->magazine, vr->con->throttle.pool.magazine);
|
||||
vr->con->throttle.pool.magazine = 0;
|
||||
if (vr->con->throttle.pool.queued) {
|
||||
liThrottlePool *p = vr->con->throttle.pool.ptr;
|
||||
g_queue_unlink(p->queues[vr->con->throttle.pool.queue_ndx], &vr->con->throttle.pool.lnk);
|
||||
g_atomic_int_add(&p->num_cons, -1);
|
||||
vr->con->throttle.pool.queued = FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
/* try to steal some initial 4kbytes from the pool */
|
||||
while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) {
|
||||
if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) {
|
||||
vr->con->throttle.pool.magazine = 4*1024;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
vr->con->throttle.pool.ptr = pool;
|
||||
vr->con->throttled = TRUE;
|
||||
li_throttle_pool_acquire(vr->con, pool);
|
||||
|
||||
return LI_HANDLER_GO_ON;
|
||||
}
|
||||
|
|
|
@ -19,14 +19,11 @@ liThrottlePool *li_throttle_pool_new(liServer *srv, GString *name, guint rate) {
|
|||
pool->magazine = rate * THROTTLE_GRANULARITY;
|
||||
pool->name = name;
|
||||
|
||||
pool->queues = g_new0(GQueue*, worker_count * 2);;
|
||||
for (i = 0; i < (worker_count*2); i+=2) {
|
||||
pool->queues = g_new0(GQueue*, worker_count);;
|
||||
for (i = 0; i < worker_count; i++) {
|
||||
pool->queues[i] = g_queue_new();
|
||||
pool->queues[i+1] = g_queue_new();
|
||||
}
|
||||
|
||||
pool->current_queue = g_new0(guint, worker_count);
|
||||
|
||||
pool->last_pool_rearm = ev_time();
|
||||
pool->last_con_rearm = g_new0(ev_tstamp, worker_count);
|
||||
for (i = 0; i < worker_count; i++) {
|
||||
|
@ -42,20 +39,117 @@ void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
|
|||
|
||||
worker_count = srv->worker_count ? srv->worker_count : 1;
|
||||
|
||||
for (i = 0; i < (worker_count*2); i+=2) {
|
||||
for (i = 0; i < worker_count; i++) {
|
||||
g_queue_free(pool->queues[i]);
|
||||
g_queue_free(pool->queues[i+1]);
|
||||
}
|
||||
|
||||
g_free(pool->queues);
|
||||
|
||||
g_free(pool->current_queue);
|
||||
g_free(pool->last_con_rearm);
|
||||
|
||||
g_string_free(pool->name, TRUE);
|
||||
|
||||
g_slice_free(liThrottlePool, pool);
|
||||
}
|
||||
|
||||
void li_throttle_pool_acquire(liConnection *con, liThrottlePool *pool) {
|
||||
gint magazine;
|
||||
|
||||
if (con->throttle.pool.ptr == pool)
|
||||
return;
|
||||
|
||||
if (con->throttle.pool.ptr != NULL) {
|
||||
/* already in a different pool */
|
||||
li_throttle_pool_release(con);
|
||||
}
|
||||
|
||||
/* try to steal some initial 4kbytes from the pool */
|
||||
while ((magazine = g_atomic_int_get(&pool->magazine)) > (4*1024)) {
|
||||
if (g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (4*1024))) {
|
||||
con->throttle.pool.magazine = 4*1024;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
con->throttle.pool.ptr = pool;
|
||||
con->throttled = TRUE;
|
||||
}
|
||||
|
||||
void li_throttle_pool_release(liConnection *con) {
|
||||
if (con->throttle.pool.queue == NULL)
|
||||
return;
|
||||
|
||||
if (con->throttle.pool.queue) {
|
||||
g_queue_unlink(con->throttle.pool.queue, &con->throttle.pool.lnk);
|
||||
con->throttle.pool.queue = NULL;
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->num_cons_queued, -1);
|
||||
}
|
||||
|
||||
/* give back bandwidth */
|
||||
g_atomic_int_add(&con->throttle.pool.ptr->magazine, con->throttle.pool.magazine);
|
||||
con->throttle.pool.magazine = 0;
|
||||
con->throttle.pool.ptr = NULL;
|
||||
}
|
||||
|
||||
void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool) {
|
||||
ev_tstamp now = CUR_TS(wrk);
|
||||
|
||||
/* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */
|
||||
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
|
||||
if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) {
|
||||
if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4)) {
|
||||
g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY);
|
||||
|
||||
pool->last_pool_rearm = now;
|
||||
}
|
||||
}
|
||||
|
||||
g_atomic_int_set(&pool->rearming, 0);
|
||||
}
|
||||
|
||||
if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) {
|
||||
GQueue *queue;
|
||||
GList *lnk, *lnk_next;
|
||||
gint magazine, supply, num_cons;
|
||||
|
||||
/* select current queue */
|
||||
queue = pool->queues[wrk->ndx];
|
||||
|
||||
if (queue->length) {
|
||||
do {
|
||||
magazine = g_atomic_int_get(&pool->magazine);
|
||||
num_cons = g_atomic_int_get(&pool->num_cons_queued);
|
||||
supply = magazine / num_cons;
|
||||
} while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length)));
|
||||
|
||||
g_atomic_int_add(&(pool->num_cons_queued), - queue->length);
|
||||
|
||||
/* rearm connections */
|
||||
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
|
||||
((liConnection*)lnk->data)->throttle.pool.magazine += supply;
|
||||
((liConnection*)lnk->data)->throttle.pool.queue = NULL;
|
||||
lnk_next = lnk->next;
|
||||
lnk->next = NULL;
|
||||
lnk->prev = NULL;
|
||||
}
|
||||
|
||||
/* clear current connection queue */
|
||||
g_queue_init(queue);
|
||||
}
|
||||
|
||||
pool->last_con_rearm[wrk->ndx] = now;
|
||||
}
|
||||
}
|
||||
|
||||
void li_throttle_reset(liConnection *con) {
|
||||
if (!con->throttled)
|
||||
return;
|
||||
|
||||
/* remove from throttle queue */
|
||||
li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
|
||||
li_throttle_pool_release(con);
|
||||
|
||||
con->throttle.con.rate = 0;
|
||||
con->throttle.con.magazine = 0;
|
||||
con->throttled = FALSE;
|
||||
}
|
||||
|
||||
void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
||||
liWaitQueueElem *wqe;
|
||||
|
@ -63,9 +157,7 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
liConnection *con;
|
||||
liWorker *wrk;
|
||||
ev_tstamp now;
|
||||
guint magazine, supply;
|
||||
GQueue *queue;
|
||||
GList *lnk, *lnk_next;
|
||||
guint supply;
|
||||
|
||||
UNUSED(revents);
|
||||
|
||||
|
@ -79,48 +171,7 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
/* throttled by pool */
|
||||
pool = con->throttle.pool.ptr;
|
||||
|
||||
/* this is basically another way to do "if (try_lock(foo)) { ...; unlock(foo); }" */
|
||||
if (g_atomic_int_compare_and_exchange(&pool->rearming, 0, 1)) {
|
||||
if ((now - pool->last_pool_rearm) >= THROTTLE_GRANULARITY) {
|
||||
if (g_atomic_int_get(&pool->magazine) <= (pool->rate * THROTTLE_GRANULARITY * 4))
|
||||
g_atomic_int_add(&pool->magazine, pool->rate * THROTTLE_GRANULARITY);
|
||||
|
||||
pool->last_pool_rearm = now;
|
||||
}
|
||||
|
||||
g_atomic_int_set(&pool->rearming, 0);
|
||||
}
|
||||
|
||||
/* select current queue */
|
||||
queue = pool->queues[wrk->ndx*2+pool->current_queue[wrk->ndx]];
|
||||
|
||||
if ((now - pool->last_con_rearm[wrk->ndx]) >= THROTTLE_GRANULARITY) {
|
||||
/* switch current queue by xoring with 1 */
|
||||
pool->current_queue[wrk->ndx] ^= 1;
|
||||
|
||||
if (queue->length) {
|
||||
do {
|
||||
magazine = g_atomic_int_get(&pool->magazine);
|
||||
supply = magazine / g_atomic_int_get(&pool->num_cons);
|
||||
} while (!g_atomic_int_compare_and_exchange(&pool->magazine, magazine, magazine - (supply * queue->length)));
|
||||
|
||||
g_atomic_int_add(&(pool->num_cons), - queue->length);
|
||||
|
||||
/* rearm connections */
|
||||
for (lnk = g_queue_peek_head_link(queue); lnk != NULL; lnk = lnk_next) {
|
||||
((liConnection*)lnk->data)->throttle.pool.magazine += supply;
|
||||
((liConnection*)lnk->data)->throttle.pool.queued = FALSE;
|
||||
lnk_next = lnk->next;
|
||||
lnk->next = NULL;
|
||||
lnk->prev = NULL;
|
||||
}
|
||||
|
||||
/* clear current connection queue */
|
||||
g_queue_init(queue);
|
||||
}
|
||||
|
||||
pool->last_con_rearm[wrk->ndx] = now;
|
||||
}
|
||||
li_throttle_pool_rearm(wrk, pool);
|
||||
|
||||
if (con->throttle.con.rate) {
|
||||
supply = MIN(con->throttle.pool.magazine, con->throttle.con.rate * THROTTLE_GRANULARITY);
|
||||
|
@ -130,8 +181,7 @@ void li_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
con->throttle.con.magazine += con->throttle.pool.magazine;
|
||||
con->throttle.pool.magazine = 0;
|
||||
}
|
||||
} else if (con->throttle.ip.ptr) {
|
||||
/* throttled by ip */
|
||||
/* TODO: throttled by ip */
|
||||
} else {
|
||||
/* throttled by connection */
|
||||
if (con->throttle.con.magazine <= con->throttle.con.rate * THROTTLE_GRANULARITY * 4)
|
||||
|
|
Loading…
Reference in New Issue