2
0
Fork 0

[throttle] rewrite

personal/stbuehler/wip
Stefan Bühler 2013-05-21 17:16:44 +02:00
parent e0d9c0d602
commit 72ec9a432f
21 changed files with 921 additions and 702 deletions

View File

@ -42,7 +42,6 @@
#include <lighttpd/filter_buffer_on_disk.h>
#include <lighttpd/virtualrequest.h>
#include <lighttpd/stat_cache.h>
#include <lighttpd/throttle.h>
#include <lighttpd/mimetype.h>
#include <lighttpd/connection.h>

View File

@ -34,6 +34,8 @@ typedef struct liConnectionSocket liConnectionSocket;
struct liConnectionSocketCallbacks {
void (*finish)(liConnection *con, gboolean aborted);
liThrottleState* (*throttle_out)(liConnection *con);
liThrottleState* (*throttle_in)(liConnection *con);
};
struct liConnectionSocket {

View File

@ -19,7 +19,7 @@ LI_API ssize_t li_net_write(int fd, void *buf, ssize_t nbyte);
LI_API ssize_t li_net_read(int fd, void *buf, ssize_t nbyte);
LI_API liNetworkStatus li_network_write(int fd, liChunkQueue *cq, goffset write_max, GError **err);
LI_API liNetworkStatus li_network_read(int fd, liChunkQueue *cq, liBuffer **buffer, GError **err);
LI_API liNetworkStatus li_network_read(int fd, liChunkQueue *cq, goffset read_max, liBuffer **buffer, GError **err);
/* use writev for mem chunks, buffered read/write for files */
LI_API liNetworkStatus li_network_write_writev(int fd, liChunkQueue *cq, goffset *write_max, GError **err);

View File

@ -114,9 +114,6 @@ struct liServer {
gdouble io_timeout;
GArray *throttle_pools;
liRadixTree *throttle_ip_pools;
gdouble stat_cache_ttl;
gint tasklet_pool_threads;
};

View File

@ -58,7 +58,6 @@ LI_API liStream* li_stream_null_new(liEventLoop *loop); /* eats everything, disc
typedef void (*liIOStreamCB)(liIOStream *stream, liIOStreamEvent event);
/* TODO: support throttle */
struct liIOStream {
liStream stream_in, stream_out;
liCQLimit *stream_in_limit;
@ -70,8 +69,13 @@ struct liIOStream {
liEventIO io_watcher;
/* whether we want to read/write */
gboolean in_closed, out_closed;
gboolean can_read, can_write; /* set to FALSE if you got EAGAIN */
guint in_closed:1, out_closed:1;
guint can_read:1, can_write:1; /* set to FALSE if you got EAGAIN */
guint throttled_in:1, throttled_out:1;
/* throttle needs to be handled by the liIOStreamCB cb */
liThrottleState *throttle_in;
liThrottleState *throttle_out;
liIOStreamCB cb;
@ -86,6 +90,9 @@ LI_API void li_iostream_release(liIOStream* iostream);
LI_API int li_iostream_reset(liIOStream *iostream); /* returns fd, disconnects everything, stop callbacks, releases one reference */
/* unset throttle_out and throttle_in */
LI_API void li_iostream_throttle_clear(liIOStream *iostream);
/* similar to stream_detach/_attach */
LI_API void li_iostream_detach(liIOStream *iostream);
LI_API void li_iostream_attach(liIOStream *iostream, liWorker *wrk);

View File

@ -1,79 +1,28 @@
#ifndef _LIGHTTPD_THROTTLE_H_
#define _LIGHTTPD_THROTTLE_H_
#include <lighttpd/base.h>
#define THROTTLE_GRANULARITY 200 /* defines how frequently (in milliseconds) a magazine is refilled */
/* this makro converts a li_tstamp to a gint. this is needed for atomic access. millisecond precision, can hold two weeks max */
#define THROTTLE_EVTSTAMP_TO_GINT(x) ((gint) ((x - ((gint)x - (gint)x % (3600*24*14))) * 1000))
typedef void (*liThrottleNotifyCB)(liThrottleState *state, gpointer data);
typedef struct liThrottleState liThrottleState;
LI_API liThrottleState* li_throttle_new();
LI_API void li_throttle_set(liWorker *wrk, liThrottleState *state, guint rate, guint burst);
LI_API void li_throttle_free(liWorker *wrk, liThrottleState *state);
/* vrequest data */
#if 0
/* I/O throttling */
gboolean throttled; /* TRUE if vrequest is throttled */
struct {
gint magazine; /* currently available for use */
LI_API guint li_throttle_query(liWorker *wrk, liThrottleState *state, guint interested, liThrottleNotifyCB notify_callback, gpointer data);
LI_API void li_throttle_update(liThrottleState *state, guint used);
struct {
liThrottlePool *ptr; /* NULL if not in any throttling pool */
GList lnk;
GQueue *queue;
gint magazine;
} pool;
struct {
liThrottlePool *ptr;
GList lnk;
GQueue *queue;
gint magazine;
} ip;
struct {
gint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
ev_tstamp last_update;
} con;
liWaitQueueElem wqueue_elem;
} throttle;
#endif
LI_API liThrottlePool* li_throttle_pool_new(liServer *srv, guint rate, guint burst);
LI_API void li_throttle_pool_acquire(liThrottlePool *pool);
LI_API void li_throttle_pool_release(liThrottlePool *pool, liServer *srv);
typedef enum {
LI_THROTTLE_POOL_NAME,
LI_THROTTLE_POOL_IP
} liThrottlePoolType;
/* returns whether pool was actually added (otherwise it already was added) */
LI_API gboolean li_throttle_add_pool(liWorker *wrk, liThrottleState *state, liThrottlePool *pool);
LI_API void li_throttle_remove_pool(liWorker *wrk, liThrottleState *state, liThrottlePool *pool);
struct liThrottlePool {
/* global per pool */
liThrottlePoolType type;
union {
GString *name;
liSocketAddress addr;
} data;
gint rate; /* bytes/s */
gint refcount;
gint rearming; /* atomic access, 1 if a worker is currently rearming the magazine */
gint last_rearm; /* gint for atomic access. represents a ((gint)ev_tstamp*1000) */
/* local per worker */
gint *worker_magazine;
gint *worker_last_rearm;
gint *worker_num_cons_queued;
GQueue** worker_queues;
};
struct liThrottleParam {
gint rate;
guint burst;
};
LI_API void li_throttle_reset(liVRequest *vr);
LI_API void li_throttle_cb(liWaitQueue *wq, gpointer data);
LI_API liThrottlePool *li_throttle_pool_new(liServer *srv, liThrottlePoolType type, gpointer param, guint rate);
LI_API void li_throttle_pool_free(liServer *srv, liThrottlePool *pool);
LI_API void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool);
LI_API void li_throttle_pool_release(liVRequest *vr, liThrottlePool *pool);
/* update throttle data: notify it that we sent <transferred> bytes, and that we never send more than write_max at once */
LI_API void li_throttle_update(liVRequest *vr, goffset transferred, goffset write_max);
/* internal for worker waitqueue setup */
LI_API void li_throttle_waitqueue_cb(liWaitQueue *wq, gpointer data);
#endif

View File

@ -245,10 +245,9 @@ typedef enum {
/* throttle.h */
typedef struct liThrottleState liThrottleState;
typedef struct liThrottlePool liThrottlePool;
typedef struct liThrottleParam liThrottleParam;
/* value.h */
typedef struct liValue liValue;

View File

@ -35,9 +35,11 @@ typedef enum {
} liVRequestState;
typedef void (*liVRequestHandlerCB)(liVRequest *vr);
typedef liThrottleState* (*liVRequestThrottleCB)(liVRequest *vr);
struct liConCallbacks {
liVRequestHandlerCB handle_response_error; /* this is _not_ for 500 - internal error */
liVRequestThrottleCB throttle_out, throttle_in;
};
/* this data "belongs" to a vrequest, but is updated by the connection code */
@ -107,30 +109,6 @@ struct liVRequest {
liJob job;
GPtrArray *stat_cache_entries;
/* I/O throttling */
gboolean throttled; /* TRUE if vrequest is throttled */
struct {
gint magazine; /* currently available for use */
struct {
liThrottlePool *ptr; /* NULL if not in any throttling pool */
GList lnk;
GQueue *queue;
gint magazine;
} pool;
struct {
liThrottlePool *ptr;
GList lnk;
GQueue *queue;
gint magazine;
} ip;
struct {
gint rate; /* maximum transfer rate in bytes per second, 0 if unlimited */
ev_tstamp last_update;
} con;
liWaitQueueElem wqueue_elem;
} throttle;
};
#define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \

View File

@ -353,6 +353,7 @@ ADD_AND_INSTALL_LIBRARY(mod_redirect "modules/mod_redirect.c")
ADD_AND_INSTALL_LIBRARY(mod_rewrite "modules/mod_rewrite.c")
ADD_AND_INSTALL_LIBRARY(mod_scgi "modules/mod_scgi.c")
ADD_AND_INSTALL_LIBRARY(mod_status "modules/mod_status.c")
ADD_AND_INSTALL_LIBRARY(mod_throttle "modules/mod_throttle.c")
ADD_AND_INSTALL_LIBRARY(mod_userdir "modules/mod_userdir.c")
ADD_AND_INSTALL_LIBRARY(mod_vhost "modules/mod_vhost.c")

View File

@ -1,5 +1,6 @@
#include <lighttpd/base.h>
#include <lighttpd/throttle.h>
#include <lighttpd/plugin_core.h>
void li_connection_simple_tcp(liConnection **pcon, liIOStream *stream, gpointer *context, liIOStreamEvent event) {
@ -113,8 +114,25 @@ static void simple_tcp_finished(liConnection *con, gboolean aborted) {
}
}
static liThrottleState* simple_tcp_throttle_out(liConnection *con) {
simple_tcp_connection *data = con->con_sock.data;
if (NULL == data) return NULL;
if (NULL == data->sock_stream->throttle_out) data->sock_stream->throttle_out = li_throttle_new(con->wrk);
return data->sock_stream->throttle_out;
}
static liThrottleState* simple_tcp_throttle_in(liConnection *con) {
simple_tcp_connection *data = con->con_sock.data;
if (NULL == data) return NULL;
if (NULL == data->sock_stream->throttle_in) data->sock_stream->throttle_in = li_throttle_new(con->wrk);
return data->sock_stream->throttle_in;
}
static const liConnectionSocketCallbacks simple_tcp_cbs = {
simple_tcp_finished
simple_tcp_finished,
simple_tcp_throttle_out,
simple_tcp_throttle_in
};
static gboolean simple_tcp_new(liConnection *con, int fd) {
@ -522,8 +540,24 @@ static void mainvr_handle_response_error(liVRequest *vr) {
li_connection_error(con);
}
static liThrottleState* mainvr_throttle_out(liVRequest *vr) {
liConnection* con = li_connection_from_vrequest(vr);
assert(NULL != con);
return con->con_sock.callbacks->throttle_out(con);
}
static liThrottleState* mainvr_throttle_in(liVRequest *vr) {
liConnection* con = li_connection_from_vrequest(vr);
assert(NULL != con);
return con->con_sock.callbacks->throttle_in(con);
}
static const liConCallbacks con_callbacks = {
mainvr_handle_response_error
mainvr_handle_response_error,
mainvr_throttle_out,
mainvr_throttle_in
};
liConnection* li_connection_new(liWorker *wrk) {
@ -607,8 +641,6 @@ static void li_connection_reset2(liConnection *con) {
li_vrequest_reset(con->mainvr, FALSE);
li_throttle_reset(con->mainvr);
li_http_request_parser_reset(&con->req_parser_ctx);
g_string_truncate(con->info.remote_addr_str, 0);
@ -691,8 +723,6 @@ static void li_connection_reset_keep_alive(liConnection *con) {
con->out.out->is_closed = FALSE;
li_throttle_reset(con->mainvr);
li_vrequest_reset(con->mainvr, TRUE);
li_http_request_parser_reset(&con->req_parser_ctx);
@ -726,8 +756,6 @@ void li_connection_free(liConnection *con) {
g_string_free(con->info.local_addr_str, TRUE);
li_sockaddr_clear(&con->info.local_addr);
li_throttle_reset(con->mainvr);
li_vrequest_free(con->mainvr);
li_http_request_parser_clear(&con->req_parser_ctx);

View File

@ -73,16 +73,15 @@ liNetworkStatus li_network_write(int fd, liChunkQueue *cq, goffset write_max, GE
return res;
}
liNetworkStatus li_network_read(int fd, liChunkQueue *cq, liBuffer **buffer, GError **err) {
liNetworkStatus li_network_read(int fd, liChunkQueue *cq, goffset read_max, liBuffer **buffer, GError **err) {
const ssize_t blocksize = 16*1024; /* 16k */
off_t max_read = 16 * blocksize; /* 256k */
ssize_t r;
off_t len = 0;
if (cq->limit && cq->limit->limit > 0) {
if (max_read > cq->limit->limit - cq->limit->current) {
max_read = cq->limit->limit - cq->limit->current;
if (max_read <= 0) {
if (read_max > cq->limit->limit - cq->limit->current) {
read_max = cq->limit->limit - cq->limit->current;
if (read_max <= 0) {
g_set_error(err, LI_NETWORK_ERROR, 0, "li_network_read: fd should be disabled as chunkqueue is already full, aborting connection.");
return LI_NETWORK_STATUS_FATAL_ERROR;
}
@ -167,7 +166,7 @@ liNetworkStatus li_network_read(int fd, liChunkQueue *cq, liBuffer **buffer, GEr
}
}
len += r;
} while (r == blocksize && len < max_read);
} while (r == blocksize && len < read_max);
return LI_NETWORK_STATUS_SUCCESS;
}

View File

@ -1735,185 +1735,6 @@ static liAction* core_buffer_in(liServer *srv, liWorker *wrk, liPlugin* p, liVal
return li_action_new_function(core_handle_buffer_in, NULL, NULL, GINT_TO_POINTER((gint) limit));
}
static void core_throttle_pool_free(liServer *srv, gpointer param) {
UNUSED(srv);
li_throttle_pool_free(srv, param);
}
static liHandlerResult core_handle_throttle_pool(liVRequest *vr, gpointer param, gpointer *context) {
liThrottlePool *pool = param;
UNUSED(context);
li_throttle_pool_acquire(vr, pool);
return LI_HANDLER_GO_ON;
}
static liAction* core_throttle_pool(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
GString *name;
liThrottlePool *pool = NULL;
gint64 rate;
UNUSED(wrk); UNUSED(p); UNUSED(userdata);
if (val->type != LI_VALUE_STRING && val->type != LI_VALUE_LIST) {
ERROR(srv, "'io.throttle_pool' action expects a string or a string-number tuple as parameter, %s given", li_value_type_string(val->type));
return NULL;
}
if (val->type == LI_VALUE_LIST) {
if (val->data.list->len != 2
|| g_array_index(val->data.list, liValue*, 0)->type != LI_VALUE_STRING
|| g_array_index(val->data.list, liValue*, 1)->type != LI_VALUE_NUMBER) {
ERROR(srv, "%s", "'io.throttle_pool' action expects a string or a string-number tuple as parameter");
return NULL;
}
rate = g_array_index(val->data.list, liValue*, 1)->data.number;
if (rate && rate < (32*1024)) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum)", rate);
return NULL;
}
if (rate > (0xFFFFFFFF)) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
name = li_value_extract_string(g_array_index(val->data.list, liValue*, 0));
} else {
name = li_value_extract_string(val);
rate = 0;
}
pool = li_throttle_pool_new(srv, LI_THROTTLE_POOL_NAME, name, rate);
if (!pool) {
ERROR(srv, "io.throttle_pool: rate for pool '%s' hasn't been defined", name->str);
return NULL;
}
if (rate != pool->rate && rate != 0) {
ERROR(srv, "io.throttle_pool: pool '%s' already defined but with different rate (%ukbyte/s)", pool->data.name->str, pool->rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_pool, NULL, core_throttle_pool_free, pool);
}
static liHandlerResult core_handle_throttle_ip(liVRequest *vr, gpointer param, gpointer *context) {
liThrottlePool *pool;
gint rate = GPOINTER_TO_INT(param);
UNUSED(context);
pool = li_throttle_pool_new(vr->wrk->srv, LI_THROTTLE_POOL_IP, &vr->coninfo->remote_addr, rate);
li_throttle_pool_acquire(vr, pool);
return LI_HANDLER_GO_ON;
}
static liAction* core_throttle_ip(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
gint64 rate;
UNUSED(wrk); UNUSED(p); UNUSED(userdata);
if (val->type != LI_VALUE_NUMBER) {
ERROR(srv, "'io.throttle_ip' action expects a positiv integer as parameter, %s given", li_value_type_string(val->type));
return NULL;
}
rate = val->data.number;
if (rate < 32*1024) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too low (32kbyte/s minimum)", rate);
return NULL;
}
if (rate > 0xFFFFFFFF) {
ERROR(srv, "io.throttle_pool: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_ip, NULL, NULL, GINT_TO_POINTER(rate));
}
static void core_throttle_connection_free(liServer *srv, gpointer param) {
UNUSED(srv);
g_slice_free(liThrottleParam, param);
}
static liHandlerResult core_handle_throttle_connection(liVRequest *vr, gpointer param, gpointer *context) {
liThrottleParam *throttle_param = param;
UNUSED(context);
vr->throttle.con.rate = throttle_param->rate;
vr->throttled = TRUE;
if (vr->throttle.pool.magazine) {
gint supply = MAX(vr->throttle.pool.magazine, throttle_param->rate / 1000 * THROTTLE_GRANULARITY);
vr->throttle.magazine += supply;
vr->throttle.pool.magazine -= supply;
} else {
vr->throttle.magazine += throttle_param->burst;
}
return LI_HANDLER_GO_ON;
}
static liAction* core_throttle_connection(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
liThrottleParam *param;
UNUSED(wrk); UNUSED(p); UNUSED(userdata);
if (val->type == LI_VALUE_LIST && val->data.list->len == 2) {
liValue *v1 = g_array_index(val->data.list, liValue*, 0);
liValue *v2 = g_array_index(val->data.list, liValue*, 1);
if (v1->type != LI_VALUE_NUMBER || v2->type != LI_VALUE_NUMBER) {
ERROR(srv, "%s", "'io.throttle' action expects a positiv integer or a pair of those as parameter");
return NULL;
}
if (v1->data.number > (0xFFFFFFFF) || v2->data.number > (0xFFFFFFFF)) {
ERROR(srv, "%s", "io.throttle: rate or burst limit is too high (4gbyte/s maximum)");
return NULL;
}
param = g_slice_new(liThrottleParam);
param->rate = (guint)v2->data.number;
param->burst = (guint)v1->data.number;
} else if (val->type == LI_VALUE_NUMBER) {
if (val->data.number > (0xFFFFFFFF)) {
ERROR(srv, "io.throttle: rate %"G_GINT64_FORMAT" is too high (4gbyte/s maximum)", val->data.number);
return NULL;
}
param = g_slice_new(liThrottleParam);
param->rate = (guint)val->data.number;
param->burst = 0;
} else {
ERROR(srv, "'io.throttle' action expects a positiv integer or a pair of those as parameter, %s given", li_value_type_string(val->type));
return NULL;
}
if (param->rate && param->rate < (32*1024)) {
g_slice_free(liThrottleParam, param);
ERROR(srv, "io.throttle: rate %u is too low (32kbyte/s minimum or 0 for unlimited)", param->rate);
return NULL;
}
return li_action_new_function(core_handle_throttle_connection, NULL, core_throttle_connection_free, param);
}
typedef struct core_map_data core_map_data;
struct core_map_data {
liPattern *pattern;
@ -2113,9 +1934,6 @@ static const liPluginAction actions[] = {
{ "io.buffer_out", core_buffer_out, NULL },
{ "io.buffer_in", core_buffer_in, NULL },
{ "io.throttle", core_throttle_connection, NULL },
{ "io.throttle_pool", core_throttle_pool, NULL },
{ "io.throttle_ip", core_throttle_ip, NULL },
{ "map", core_map, NULL },
@ -2146,33 +1964,6 @@ static const liPluginAngel angelcbs[] = {
};
#include <sys/types.h>
static void plugin_core_prepare(liServer *srv, liPlugin *p) {
guint i;
UNUSED(p);
/* initialize throttle pools that have not been yet */
g_mutex_lock(srv->action_mutex);
for (i = 0; i < srv->throttle_pools->len; i++) {
liThrottlePool *pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
if (!pool->worker_queues) {
guint j;
pool->worker_magazine = g_new0(gint, srv->worker_count);
pool->worker_last_rearm = g_new0(gint, srv->worker_count);
pool->worker_num_cons_queued = g_new0(gint, srv->worker_count);
pool->worker_queues = g_new0(GQueue*, srv->worker_count);
for (j = 0; j < srv->worker_count; j++) {
pool->worker_queues[j] = g_queue_new();
pool->worker_last_rearm[j] = pool->last_rearm;
}
}
}
g_mutex_unlock(srv->action_mutex);
}
static void plugin_core_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk) {
UNUSED(p);
@ -2232,6 +2023,5 @@ void li_plugin_core_init(liServer *srv, liPlugin *p, gpointer userdata) {
p->setups = setups;
p->angelcbs = angelcbs;
p->handle_prepare = plugin_core_prepare;
p->handle_prepare_worker = plugin_core_prepare_worker;
}

View File

@ -125,9 +125,6 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) {
/* http header ts format */
li_server_ts_format_add(srv, g_string_new("%a, %d %b %Y %H:%M:%S GMT"));
srv->throttle_pools = g_array_new(FALSE, TRUE, sizeof(liThrottlePool*));
srv->throttle_ip_pools = li_radixtree_new();
srv->connection_load = 0;
srv->max_connections = 256; /* assume max-fds = 1024 */
srv->connection_limit_hit = FALSE;
@ -193,16 +190,6 @@ void li_server_free(liServer* srv) {
li_lua_clear(&srv->LL);
/* free throttle pools */
{
guint i;
for (i = 0; i < srv->throttle_pools->len; i++) {
li_throttle_pool_free(srv, g_array_index(srv->throttle_pools, liThrottlePool*, i));
}
g_array_free(srv->throttle_pools, TRUE);
li_radixtree_free(srv->throttle_ip_pools, NULL, NULL);
}
if (srv->acon) {
li_angel_connection_free(srv->acon);
srv->acon = NULL;

View File

@ -1,5 +1,6 @@
#include <lighttpd/base.h>
#include <lighttpd/throttle.h>
const gchar* li_stream_event_string(liStreamEvent event) {
switch (event) {
@ -313,6 +314,8 @@ static void iostream_destroy(liIOStream *iostream) {
li_event_clear(&iostream->io_watcher);
li_iostream_throttle_clear(iostream);
iostream->cb(iostream, LI_IOSTREAM_DESTROY);
assert(1 == iostream->stream_out.refcount);
@ -330,26 +333,26 @@ static void iostream_in_cb(liStream *stream, liStreamEvent event) {
/* locked */
return;
}
if (iostream->can_read) {
goffset curoutlen = stream->out->length;
if (!iostream->throttled_in && iostream->can_read) {
goffset curoutlen = stream->out->bytes_in;
gboolean curout_closed = stream->out->is_closed;
iostream->cb(iostream, LI_IOSTREAM_READ);
if (curoutlen != stream->out->length || curout_closed != stream->out->is_closed) {
if (curoutlen != stream->out->bytes_in || curout_closed != stream->out->is_closed) {
li_stream_notify_later(stream);
}
if (-1 == li_event_io_fd(&iostream->io_watcher)) return;
if (iostream->can_read) {
if (!iostream->throttled_in && iostream->can_read) {
li_stream_again_later(stream);
}
}
if (!iostream->can_read && !iostream->in_closed) {
if (!iostream->throttled_in && !iostream->can_read && !iostream->in_closed) {
li_event_io_add_events(&iostream->io_watcher, LI_EV_READ);
}
if (!iostream->can_write && !iostream->out_closed) {
if (!iostream->throttled_out && !iostream->can_write && !iostream->out_closed) {
li_event_io_add_events(&iostream->io_watcher, LI_EV_WRITE);
}
break;
@ -377,6 +380,10 @@ static void iostream_in_cb(liStream *stream, liStreamEvent event) {
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_DEST);
break;
case LI_STREAM_DESTROY:
if (NULL != iostream->throttle_in) {
li_throttle_free(li_worker_from_iostream(iostream), iostream->throttle_in);
iostream->throttle_in = NULL;
}
iostream->can_read = FALSE;
iostream_destroy(iostream);
break;
@ -390,7 +397,7 @@ static void iostream_out_cb(liStream *stream, liStreamEvent event) {
switch (event) {
case LI_STREAM_NEW_DATA:
if (iostream->can_write) {
if (!iostream->throttled_out && iostream->can_write) {
liEventLoop *loop = li_event_get_loop(&iostream->io_watcher);
li_tstamp now = li_event_now(loop);
@ -407,16 +414,16 @@ static void iostream_out_cb(liStream *stream, liStreamEvent event) {
if (-1 == li_event_io_fd(&iostream->io_watcher)) return;
if (iostream->can_write) {
if (iostream->can_write && !iostream->throttled_out) {
if (stream->out->length > 0 || stream->out->is_closed) {
li_stream_again_later(stream);
}
}
}
if (!iostream->can_read && !iostream->in_closed) {
if (!iostream->throttled_in && !iostream->can_read && !iostream->in_closed) {
li_event_io_add_events(&iostream->io_watcher, LI_EV_READ);
}
if (!iostream->can_write && !iostream->out_closed) {
if (!iostream->throttled_out && !iostream->can_write && !iostream->out_closed) {
li_event_io_add_events(&iostream->io_watcher, LI_EV_WRITE);
}
break;
@ -431,6 +438,10 @@ static void iostream_out_cb(liStream *stream, liStreamEvent event) {
iostream->cb(iostream, LI_IOSTREAM_DISCONNECTED_SOURCE);
break;
case LI_STREAM_DESTROY:
if (NULL != iostream->throttle_out) {
li_throttle_free(li_worker_from_iostream(iostream), iostream->throttle_out);
iostream->throttle_out = NULL;
}
iostream->can_write = FALSE;
iostream_destroy(iostream);
break;
@ -536,3 +547,16 @@ void li_iostream_attach(liIOStream *iostream, liWorker *wrk) {
li_event_attach(&wrk->loop, &iostream->io_watcher);
}
void li_iostream_throttle_clear(liIOStream *iostream) {
liWorker *wrk = li_worker_from_iostream(iostream);
if (NULL != iostream->throttle_in) {
li_throttle_free(wrk, iostream->throttle_in);
iostream->throttle_in = NULL;
}
if (NULL != iostream->throttle_out) {
li_throttle_free(wrk, iostream->throttle_out);
iostream->throttle_out = NULL;
}
}

View File

@ -1,5 +1,6 @@
#include <lighttpd/base.h>
#include <lighttpd/throttle.h>
void li_stream_simple_socket_close(liIOStream *stream, gboolean aborted) {
int fd = li_event_io_fd(&stream->io_watcher);
@ -31,14 +32,29 @@ void li_stream_simple_socket_close(liIOStream *stream, gboolean aborted) {
}
}
static void stream_simple_socket_read_throttle_notify(liThrottleState *state, gpointer data) {
liIOStream *stream = data;
UNUSED(state);
stream->throttled_in = FALSE;
stream->can_read = TRUE;
li_stream_again(&stream->stream_out);
}
static void stream_simple_socket_read(liIOStream *stream, gpointer *data) {
liNetworkStatus res;
GError *err = NULL;
liWorker *wrk = li_worker_from_iostream(stream);
int fd = li_event_io_fd(&stream->io_watcher);
off_t max_read = 256 * 1024; /* 256k */
liChunkQueue *raw_in = stream->stream_in.out;
if (NULL != stream->throttle_in) {
max_read = li_throttle_query(wrk, stream->throttle_in, max_read, stream_simple_socket_read_throttle_notify, stream);
if (0 == max_read) {
stream->throttled_in = TRUE;
return;
}
}
if (NULL == *data && NULL != wrk->network_read_buf) {
/* reuse worker buf if needed */
*data = wrk->network_read_buf;
@ -46,9 +62,13 @@ static void stream_simple_socket_read(liIOStream *stream, gpointer *data) {
}
{
goffset current_in_bytes = raw_in->bytes_in;
liBuffer *raw_in_buffer = *data;
res = li_network_read(fd, raw_in, &raw_in_buffer, &err);
res = li_network_read(fd, raw_in, max_read, &raw_in_buffer, &err);
*data = raw_in_buffer;
if (NULL != stream->throttle_in) {
li_throttle_update(stream->throttle_in, raw_in->bytes_in - current_in_bytes);
}
}
if (NULL == wrk->network_read_buf && NULL != *data
@ -78,29 +98,46 @@ static void stream_simple_socket_read(liIOStream *stream, gpointer *data) {
}
}
static void stream_simple_socket_write_throttle_notify(liThrottleState *state, gpointer data) {
liIOStream *stream = data;
UNUSED(state);
stream->throttled_out = FALSE;
stream->can_write = TRUE;
li_stream_again(&stream->stream_out);
}
static void stream_simple_socket_write(liIOStream *stream) {
liNetworkStatus res;
liChunkQueue *raw_out = stream->stream_out.out;
liChunkQueue *from = stream->stream_out.source->out;
int fd = li_event_io_fd(&stream->io_watcher);
liWorker *wrk;
liWorker *wrk = li_worker_from_iostream(stream);
li_chunkqueue_steal_all(raw_out, from);
if (raw_out->length > 0) {
static const goffset WRITE_MAX = 256*1024; /* 256kB */
goffset write_max;
goffset write_max, current_out_bytes = raw_out->bytes_out;
GError *err = NULL;
write_max = WRITE_MAX;
write_max = MAX(WRITE_MAX, raw_out->length);
if (NULL != stream->throttle_out) {
write_max = li_throttle_query(wrk, stream->throttle_out, write_max, stream_simple_socket_write_throttle_notify, stream);
if (0 == write_max) {
stream->throttled_out = TRUE;
return;
}
}
res = li_network_write(fd, raw_out, write_max, &err);
if (NULL != stream->throttle_out) {
li_throttle_update(stream->throttle_out, raw_out->bytes_out - current_out_bytes);
}
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
wrk = li_worker_from_iostream(stream);
ERROR(wrk->srv, "network write fatal error: %s", NULL != err ? err->message : "(unknown)");
g_error_free(err);
li_stream_simple_socket_close(stream, TRUE);

View File

@ -1,374 +1,370 @@
/*
* Implemented with token bucket algorithm.
* On average, the rate of bytes/s never exceeds the specified limit
* but allows small bursts of previously unused bandwidth (max rate*2 for 1 second).
*/
#include <lighttpd/base.h>
#include <lighttpd/throttle.h>
#include <math.h>
static void li_throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool);
/* max amount of bytes we release in one query */
#define THROTTLE_MAX_STEP (64*1024)
/* even if the magazine is empty release "overload" bytes to get requests started */
#define THROTTLE_OVERLOAD (8*1024)
liThrottlePool *li_throttle_pool_new(liServer *srv, liThrottlePoolType type, gpointer param, guint rate) {
/* debug */
#if 0
#define STRINGIFY(x) #x
#define _STRINGIFY(x) STRINGIFY(x)
#define throttle_debug(...) fprintf(stderr, "throttle.c:" _STRINGIFY(__LINE__) ": " __VA_ARGS__)
#else
#define throttle_debug(...) do { } while (0)
#endif
/* rates all in bytes/sec */
typedef struct liThrottlePoolState liThrottlePoolState;
typedef struct liThrottlePoolWorkerState liThrottlePoolWorkerState;
struct liThrottlePoolState {
liThrottlePool *pool;
GList pool_link;
/* currently available for use */
gint magazine;
};
struct liThrottleState {
gint magazine; /* currently available for use */
guint interested;
liWaitQueueElem wqueue_elem;
liThrottleNotifyCB notify_callback;
/* max values for this single state */
gint single_magazine;
guint single_rate, single_burst;
guint single_last_rearm;
/* shared pools */
GPtrArray *pools; /* <liThrottlePoolState> */
};
struct liThrottlePoolWorkerState {
gint magazine;
guint last_rearm;
guint connections; /* waiting.length; needed for atomic access */
GQueue waiting; /* <liThrottlePoolState.pool_link> waiting to get filled */
};
struct liThrottlePool {
int refcount;
GMutex *rearm_mutex;
guint rate, burst;
guint last_rearm;
liThrottlePoolWorkerState *workers;
};
static guint msec_timestamp(li_tstamp now) {
return (1000u * (guint64) floor(now)) + (guint64)(1000.0 * fmod(now, 1.0));
}
static void S_throttle_pool_rearm_workers(liThrottlePool *pool, guint worker_count, guint time_diff) {
guint i;
gint64 connections = 0;
gint64 wrk_connections[worker_count];
gint64 fill;
g_mutex_lock(srv->action_mutex);
for (i = 0; i < worker_count; ++i) {
wrk_connections[i] = g_atomic_int_get((gint*) &pool->workers[i].connections);
connections += wrk_connections[i];
}
if (type == LI_THROTTLE_POOL_NAME) {
/* named pool */
GString *name = param;
/* check if we already have a pool with that name */
for (i = 0; i < srv->throttle_pools->len; i++) {
pool = g_array_index(srv->throttle_pools, liThrottlePool*, i);
if (0 == connections) return;
if (g_string_equal(pool->data.name, name)) {
g_atomic_int_inc(&pool->refcount);
g_mutex_unlock(srv->action_mutex);
g_string_free(name, TRUE);
return pool;
time_diff = MIN(time_diff, 1000);
fill = MIN((guint64) pool->burst, ((guint64) pool->rate * time_diff) / 1000u);
throttle_debug("rearm workers: refill %i after %u (or more) msecs (rate %u, burst %u)\n",
(guint) fill, (guint) time_diff, pool->rate, pool->burst);
for (i = 0; i < worker_count; ++i) {
gint wrk_fill;
if (0 == wrk_connections[i]) continue;
wrk_fill = (fill * wrk_connections[i]) / connections;
throttle_debug("rearm worker %u: refill %u\n", i, wrk_fill);
g_atomic_int_add(&pool->workers[i].magazine, wrk_fill);
}
}
static void throttle_pool_rearm(liWorker *wrk, liThrottlePool *pool, guint now) {
liThrottlePoolWorkerState *wpool = &pool->workers[wrk->ndx];
guint last = g_atomic_int_get((gint*) &pool->last_rearm);
guint time_diff = now - last;
if (G_UNLIKELY(time_diff >= THROTTLE_GRANULARITY)) {
g_mutex_lock(pool->rearm_mutex);
/* check again */
last = g_atomic_int_get((gint*) &pool->last_rearm);
time_diff = now - last;
if (G_LIKELY(time_diff >= THROTTLE_GRANULARITY)) {
S_throttle_pool_rearm_workers(pool, wrk->srv->worker_count, time_diff);
g_atomic_int_set((gint*) &pool->last_rearm, now);
}
g_mutex_unlock(pool->rearm_mutex);
}
if (G_UNLIKELY(wpool->last_rearm < last)) {
/* distribute wpool->magazine */
GList *lnk;
guint connections = wpool->connections;
gint magazine = g_atomic_int_get(&wpool->magazine);
gint supply = magazine / connections;
g_atomic_int_add(&wpool->magazine, -supply * connections);
wpool->last_rearm = now;
throttle_debug("throttle_pool_rearm: distribute supply %i on each of %i connections\n",
supply, connections);
if (0 == supply) return;
g_atomic_int_set((gint*) &wpool->connections, 0);
while (NULL != (lnk = g_queue_pop_head_link(&wpool->waiting))) {
liThrottlePoolState *pstate = LI_CONTAINER_OF(lnk, liThrottlePoolState, pool_link);
pstate->magazine += supply;
lnk->data = NULL;
}
}
}
static void throttle_register(liThrottlePoolWorkerState *pwstate, liThrottlePoolState *pstate) {
if (NULL == pstate->pool_link.data) {
g_queue_push_tail_link(&pwstate->waiting, &pstate->pool_link);
pstate->pool_link.data = &pwstate->waiting;
g_atomic_int_inc((gint*) &pwstate->connections);
}
}
static void throttle_unregister(liThrottlePoolWorkerState *pwstate, liThrottlePoolState *pstate) {
if (NULL != pstate->pool_link.data) {
g_queue_unlink(&pwstate->waiting, &pstate->pool_link);
pstate->pool_link.data = NULL;
g_atomic_int_add((gint*) &pwstate->connections, -1);
}
}
guint li_throttle_query(liWorker *wrk, liThrottleState *state, guint interested, liThrottleNotifyCB notify_callback, gpointer data) {
guint now = msec_timestamp(li_cur_ts(wrk));
gint fill, pool_fill;
guint i, len;
if (NULL == state) return interested;
state->notify_callback = NULL;
state->wqueue_elem.data = NULL;
throttle_debug("li_throttle_query[%u]: interested %i, magazine %i\n", now, interested, state->magazine);
if (interested > THROTTLE_MAX_STEP) interested = THROTTLE_MAX_STEP;
if ((gint) interested <= state->magazine + THROTTLE_OVERLOAD) return interested;
/* also try to balance negative magazine */
fill = interested - state->magazine;
if (state->single_rate != 0) {
if (now - state->single_last_rearm >= THROTTLE_GRANULARITY) {
guint single_fill = ((guint64) state->single_rate) * 1000u / (now - state->single_last_rearm);
state->single_last_rearm = now;
if (state->single_burst - state->single_magazine < single_fill) {
state->single_magazine = state->single_burst;
} else {
state->single_magazine += single_fill;
}
}
} else {
/* IP address pool */
liSocketAddress *remote_addr = param;
if (fill > state->single_magazine) fill = state->single_magazine;
throttle_debug("single_magazine: %i\n", state->single_magazine);
}
if (remote_addr->addr->plain.sa_family == AF_INET)
pool = li_radixtree_lookup_exact(srv->throttle_ip_pools, &remote_addr->addr->ipv4.sin_addr.s_addr, 32);
else
pool = li_radixtree_lookup_exact(srv->throttle_ip_pools, &remote_addr->addr->ipv6.sin6_addr.s6_addr, 128);
/* pool_fill <= fill in the loop */
pool_fill = fill;
for (i = 0, len = state->pools->len; i < len; ++i) {
liThrottlePoolState *pstate = g_ptr_array_index(state->pools, i);
liThrottlePool *pool = pstate->pool;
liThrottlePoolWorkerState *pwstate = &pool->workers[wrk->ndx];
if (fill > pstate->magazine) {
throttle_register(pwstate, pstate);
throttle_pool_rearm(wrk, pool, now);
if (fill > pstate->magazine) {
throttle_register(pwstate, pstate);
if (pool_fill > pstate->magazine) {
pool_fill = pstate->magazine;
}
}
}
throttle_debug("pool %i magazine: %i\n", i, state->single_magazine);
}
if (pool) {
g_mutex_unlock(srv->action_mutex);
return pool;
throttle_debug("query refill: %i\n", pool_fill);
if (pool_fill > 0) {
if (state->single_rate != 0) {
state->single_magazine -= pool_fill;
}
for (i = 0, len = state->pools->len; i < len; ++i) {
liThrottlePoolState *pstate = g_ptr_array_index(state->pools, i);
pstate->magazine -= pool_fill;
}
state->magazine += pool_fill;
}
if (state->magazine + THROTTLE_OVERLOAD <= 0) {
throttle_debug("query queueing\n");
state->wqueue_elem.data = data;
state->notify_callback = notify_callback;
state->interested = interested;
if (!state->wqueue_elem.queued) {
li_waitqueue_push(&wrk->throttle_queue, &state->wqueue_elem);
}
return 0;
}
throttle_debug("query success: %i\n", state->magazine + THROTTLE_OVERLOAD);
if ((gint) interested <= state->magazine + THROTTLE_OVERLOAD) return interested;
return state->magazine + THROTTLE_OVERLOAD;
}
void li_throttle_update(liThrottleState *state, guint used) {
state->magazine -= used;
}
void li_throttle_pool_acquire(liThrottlePool *pool) {
assert(g_atomic_int_get(&pool->refcount) > 0);
g_atomic_int_inc(&pool->refcount);
}
void li_throttle_pool_release(liThrottlePool *pool, liServer *srv) {
assert(g_atomic_int_get(&pool->refcount) > 0);
if (g_atomic_int_dec_and_test(&pool->refcount)) {
g_mutex_free(pool->rearm_mutex);
pool->rearm_mutex = NULL;
if (NULL != pool->workers) {
g_slice_free1(sizeof(liThrottlePoolWorkerState) * srv->worker_count, pool->workers);
pool->workers = NULL;
}
}
}
if (rate == 0) {
g_mutex_unlock(srv->action_mutex);
return NULL;
gboolean li_throttle_add_pool(liWorker *wrk, liThrottleState *state, liThrottlePool *pool) {
liThrottlePoolState *pstate;
guint i, len;
assert(NULL != wrk);
assert(NULL != state);
if (NULL == pool) return FALSE;
for (i = 0, len = state->pools->len; i < len; ++i) {
pstate = g_ptr_array_index(state->pools, i);
if (pstate->pool == pool) return FALSE;
}
pool = g_slice_new0(liThrottlePool);
pool->type = type;
li_throttle_pool_acquire(pool);
pstate = g_slice_new0(liThrottlePoolState);
pstate->pool = pool;
g_ptr_array_add(state->pools, pstate);
return TRUE;
}
void li_throttle_remove_pool(liWorker *wrk, liThrottleState *state, liThrottlePool *pool) {
guint i, len;
assert(NULL != wrk);
if (NULL == state || NULL == pool) return;
for (i = 0, len = state->pools->len; i < len; ++i) {
liThrottlePoolState *pstate = g_ptr_array_index(state->pools, i);
if (pstate->pool == pool) {
throttle_unregister(&pool->workers[wrk->ndx], pstate);
g_ptr_array_remove_index_fast(state->pools, i);
li_throttle_pool_release(pool, wrk->srv);
g_slice_free(liThrottlePoolState, pstate);
return;
}
}
}
liThrottleState* li_throttle_new() {
liThrottleState *state = g_slice_new0(liThrottleState);
state->pools = g_ptr_array_new();
return state;
}
void li_throttle_set(liWorker *wrk, liThrottleState *state, guint rate, guint burst) {
UNUSED(wrk);
state->single_rate = rate;
state->single_burst = burst;
state->single_magazine = burst;
state->single_last_rearm = msec_timestamp(li_cur_ts(wrk));
}
void li_throttle_free(liWorker *wrk, liThrottleState *state) {
guint i, len;
assert(NULL != wrk);
if (NULL == state) return;
for (i = 0, len = state->pools->len; i < len; ++i) {
liThrottlePoolState *pstate = g_ptr_array_index(state->pools, i);
throttle_unregister(&pstate->pool->workers[wrk->ndx], pstate);
li_throttle_pool_release(pstate->pool, wrk->srv);
g_slice_free(liThrottlePoolState, pstate);
}
g_ptr_array_free(state->pools, TRUE);
li_waitqueue_remove(&wrk->throttle_queue, &state->wqueue_elem);
g_slice_free(liThrottleState, state);
}
static void throttle_prepare(liServer *srv, gpointer data, gboolean aborted) {
liThrottlePool *pool = data;
if (!aborted) {
guint i, len = srv->worker_count;
pool->workers = g_slice_alloc0(sizeof(liThrottlePoolWorkerState) * len);
for (i = 0; i < len; ++i) {
liThrottlePoolWorkerState *pwstate = &pool->workers[i];
pwstate->last_rearm = pool->last_rearm;
pwstate->magazine = pool->burst / len;
}
}
li_throttle_pool_release(pool, srv);
}
liThrottlePool* li_throttle_pool_new(liServer *srv, guint rate, guint burst) {
liThrottlePool *pool = g_slice_new0(liThrottlePool);
pool->refcount = 2; /* one for throttle_prepare() */
pool->last_rearm = msec_timestamp(li_event_time());
pool->rearm_mutex = g_mutex_new();
pool->rate = rate;
pool->last_rearm = THROTTLE_EVTSTAMP_TO_GINT(ev_time());
/*
* We if we are not in LI_SERVER_INIT state, we can initialize the queues directly.
* Otherwise they'll get initialized in the worker prepare callback.
*/
if (g_atomic_int_get(&srv->state) != LI_SERVER_INIT) {
pool->worker_magazine = g_new0(gint, srv->worker_count);
pool->worker_last_rearm = g_new0(gint, srv->worker_count);
pool->worker_num_cons_queued = g_new0(gint, srv->worker_count);
pool->worker_queues = g_new0(GQueue*, srv->worker_count);
for (i = 0; i < srv->worker_count; i++) {
pool->worker_queues[i] = g_queue_new();
pool->worker_last_rearm[i] = pool->last_rearm;
}
}
if (type == LI_THROTTLE_POOL_NAME) {
pool->refcount = 1;
pool->data.name = param;
g_array_append_val(srv->throttle_pools, pool);
} else {
liSocketAddress *remote_addr = param;
pool->data.addr = li_sockaddr_dup(*remote_addr);
if (remote_addr->addr->plain.sa_family == AF_INET)
li_radixtree_insert(srv->throttle_ip_pools, &remote_addr->addr->ipv4.sin_addr.s_addr, 32, pool);
else
li_radixtree_insert(srv->throttle_ip_pools, &remote_addr->addr->ipv6.sin6_addr.s6_addr, 128, pool);
}
g_mutex_unlock(srv->action_mutex);
pool->burst = burst;
li_server_register_prepare_cb(srv, throttle_prepare, pool);
return pool;
}
void li_throttle_pool_free(liServer *srv, liThrottlePool *pool) {
guint i;
if (!g_atomic_int_dec_and_test(&pool->refcount))
return;
g_mutex_lock(srv->action_mutex);
if (pool->type == LI_THROTTLE_POOL_NAME) {
for (i = 0; i < srv->throttle_pools->len; i++) {
if (pool == g_array_index(srv->throttle_pools, liThrottlePool*, i)) {
g_array_remove_index_fast(srv->throttle_pools, i);
break;
}
}
g_string_free(pool->data.name, TRUE);
} else {
if (pool->data.addr.addr->plain.sa_family == AF_INET)
li_radixtree_remove(srv->throttle_ip_pools, &pool->data.addr.addr->ipv4.sin_addr.s_addr, 32);
else
li_radixtree_remove(srv->throttle_ip_pools, &pool->data.addr.addr->ipv6.sin6_addr.s6_addr, 128);
li_sockaddr_clear(&pool->data.addr);
}
g_mutex_unlock(srv->action_mutex);
if (pool->worker_queues) {
for (i = 0; i < srv->worker_count; i++) {
g_queue_free(pool->worker_queues[i]);
}
g_free(pool->worker_magazine);
g_free(pool->worker_last_rearm);
g_free(pool->worker_num_cons_queued);
g_free(pool->worker_queues);
}
g_slice_free(liThrottlePool, pool);
}
void li_throttle_pool_acquire(liVRequest *vr, liThrottlePool *pool) {
/* already in this pool */
if (vr->throttle.pool.ptr == pool || vr->throttle.ip.ptr == pool)
return;
g_atomic_int_inc(&pool->refcount);
if (pool->type == LI_THROTTLE_POOL_NAME) {
if (vr->throttle.pool.ptr != NULL) {
/* already in a different pool */
li_throttle_pool_release(vr, vr->throttle.pool.ptr);
}
vr->throttle.pool.ptr = pool;
} else {
vr->throttle.ip.ptr = pool;
}