2
0
Fork 0

[core]: add tasklet pools to workers and use them for stat-cache

personal/stbuehler/wip
Stefan Bühler 2010-08-25 18:05:23 +02:00
parent 8587598a66
commit 1c3d3c287f
8 changed files with 197 additions and 204 deletions

View File

@ -13,7 +13,9 @@ enum liCoreOptions {
LI_CORE_OPTION_MAX_KEEP_ALIVE_IDLE,
LI_CORE_OPTION_MAX_KEEP_ALIVE_REQUESTS,
LI_CORE_OPTION_ETAG_FLAGS
LI_CORE_OPTION_ETAG_FLAGS,
LI_CORE_OPTION_ASYNC_STAT
};
enum liCoreOptionPtrs {

View File

@ -130,6 +130,7 @@ struct liServer {
GArray *throttle_pools;
gdouble stat_cache_ttl;
gint tasklet_pool_threads;
};

View File

@ -62,20 +62,17 @@ struct liStatCacheEntry {
liStatCacheEntryData data;
GArray *dirlist; /* array of stat_cache_entry_data, used together with STAT_CACHE_ENTRY_DIR */
liStatCache *sc;
GPtrArray *vrequests; /* vrequests waiting for this info */
guint refcount;
liWaitQueueElem queue_elem; /* queue element for the delete_queue */
guint refcount; /* vrequests, delete_queue and tasklet hold references; dirlist/entry cache entries are always in delete_queue too */
liWaitQueueElem queue_elem; /* queue element for the delete_queue */
gboolean cached;
};
struct liStatCache {
GHashTable *dirlists;
GHashTable *entries;
GAsyncQueue *job_queue_out; /* elements waiting for stat */
GAsyncQueue *job_queue_in; /* elements with finished stat */
liWaitQueue delete_queue;
GThread *thread;
ev_async job_watcher;
gdouble ttl;
guint64 hits;
@ -83,7 +80,7 @@ struct liStatCache {
guint64 errors;
};
LI_API void li_stat_cache_new(liWorker *wrk, gdouble ttl);
LI_API liStatCache* li_stat_cache_new(liWorker *wrk, gdouble ttl);
LI_API void li_stat_cache_free(liStatCache *sc);
/*

View File

@ -5,6 +5,8 @@
#error Please include <lighttpd/base.h> instead of this file
#endif
#include <lighttpd/tasklet.h>
struct lua_State;
typedef struct liStatistics liStatistics;
@ -109,6 +111,8 @@ struct liWorker {
GAsyncQueue *job_async_queue;
ev_async job_async_queue_watcher;
liTaskletPool *tasklets;
liStatCache *stat_cache;
GByteArray *network_read_buf; /** internal temporary buffer for network.c */

View File

@ -1010,6 +1010,20 @@ static gboolean core_stat_cache_ttl(liServer *srv, liPlugin* p, liValue *val, gp
return TRUE;
}
/* Setup callback for the "tasklet_pool.threads" config option.
 * Stores the configured thread count in the server and applies it to the
 * main worker's tasklet pool right away.
 * Returns FALSE (config error) unless val is a number. */
static gboolean core_tasklet_pool_threads(liServer *srv, liPlugin* p, liValue *val, gpointer userdata) {
	UNUSED(p); UNUSED(userdata);

	if (NULL == val || LI_VALUE_NUMBER != val->type) {
		ERROR(srv, "%s", "tasklet_pool.threads expects a number as parameter");
		return FALSE;
	}

	srv->tasklet_pool_threads = val->data.number;
	li_tasklet_pool_set_threads(srv->main_worker->tasklets, srv->tasklet_pool_threads);

	return TRUE;
}
/*
* OPTIONS
*/
@ -1614,6 +1628,8 @@ static const liPluginOption options[] = {
{ "etag.use", LI_VALUE_NONE, 0, core_option_etag_use_parse }, /* type in config is list, internal type is number for flags */
{ "stat.async", LI_VALUE_BOOLEAN, TRUE, NULL },
{ NULL, 0, 0, NULL }
};
@ -1670,6 +1686,7 @@ static const liPluginSetup setups[] = {
{ "module_load", core_module_load, NULL },
{ "io.timeout", core_io_timeout, NULL },
{ "stat_cache.ttl", core_stat_cache_ttl, NULL },
{ "tasklet_pool.threads", core_tasklet_pool_threads, NULL },
{ NULL, NULL, NULL }
};

View File

@ -179,6 +179,7 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) {
srv->io_timeout = 300; /* default I/O timeout */
srv->keep_alive_queue_timeout = 5;
srv->stat_cache_ttl = 10.0; /* default stat cache ttl */
srv->tasklet_pool_threads = 4; /* default per-worker tasklet_pool threads */
#ifdef LIGHTY_OS_LINUX
/* sched_getaffinity is only available on linux */

View File

@ -2,78 +2,61 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <lighttpd/plugin_core.h>
static void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents);
static void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents);
static gpointer stat_cache_thread(gpointer data);
static void stat_cache_entry_free(liStatCacheEntry *sce);
void li_stat_cache_new(liWorker *wrk, gdouble ttl) {
static void stat_cache_entry_release(liStatCacheEntry *sce);
static void stat_cache_entry_acquire(liStatCacheEntry *sce);
liStatCache* li_stat_cache_new(liWorker *wrk, gdouble ttl) {
liStatCache *sc;
GError *err;
if (ttl < 0) {
/* fall back to default if not sane */
ttl = 10.0;
} else if (ttl == 0) {
/* ttl == 0 means the stat cache is disabled */
return;
return NULL;
}
sc = g_slice_new0(liStatCache);
sc->ttl = ttl;
sc->entries = g_hash_table_new_full((GHashFunc)g_string_hash, (GEqualFunc)g_string_equal, NULL, NULL);
sc->dirlists = g_hash_table_new_full((GHashFunc)g_string_hash, (GEqualFunc)g_string_equal, NULL, NULL);
sc->job_queue_in = g_async_queue_new();
sc->job_queue_out = g_async_queue_new();
li_waitqueue_init(&sc->delete_queue, wrk->loop, stat_cache_delete_cb, ttl, sc);
ev_init(&sc->job_watcher, stat_cache_job_cb);
sc->job_watcher.data = wrk;
ev_async_start(wrk->loop, &sc->job_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
wrk->stat_cache = sc;
return sc;
}
sc->thread = g_thread_create(stat_cache_thread, sc, TRUE, &err);
if (!sc->thread) {
/* failed to create thread */
assert(0);
/* Detach an entry from its cache: remove it from the proper hash table
 * (entries for single stats, dirlists for directory listings), clear the
 * back-pointer and drop the cache's reference. The entry itself is freed
 * by stat_cache_entry_release() once the last reference is gone. */
static void stat_cache_remove_from_cache(liStatCache *sc, liStatCacheEntry *sce) {
	if (sce->cached) {
		GHashTable *table = (STAT_CACHE_ENTRY_SINGLE == sce->type) ? sc->entries : sc->dirlists;
		g_hash_table_remove(table, sce->data.path);
		sce->cached = FALSE;
	}
	sce->sc = NULL;
	stat_cache_entry_release(sce);
}
void li_stat_cache_free(liStatCache *sc) {
liStatCacheEntry *sce;
liWaitQueueElem *wqe;
/* check if stat cache was enabled */
if (!sc)
return;
/* wake up thread */
sce = g_slice_new0(liStatCacheEntry);
g_async_queue_push(sc->job_queue_out, sce);
g_thread_join(sc->thread);
g_slice_free(liStatCacheEntry, sce);
li_waitqueue_stop(&sc->delete_queue);
while (NULL != (wqe = li_waitqueue_pop_force(&sc->delete_queue))) {
sce = wqe->data;
if (sce->cached) {
if (sce->type == STAT_CACHE_ENTRY_SINGLE)
g_hash_table_remove(sc->entries, sce->data.path);
else
g_hash_table_remove(sc->dirlists, sce->data.path);
sce->cached = FALSE;
}
stat_cache_entry_free(sce);
liStatCacheEntry *sce = wqe->data;
stat_cache_remove_from_cache(sc, sce);
}
li_ev_safe_ref_and_stop(ev_async_stop, sc->delete_queue.loop, &sc->job_watcher);
g_async_queue_unref(sc->job_queue_in);
g_async_queue_unref(sc->job_queue_out);
g_hash_table_destroy(sc->entries);
g_hash_table_destroy(sc->dirlists);
g_slice_free(liStatCache, sc);
@ -81,67 +64,133 @@ void li_stat_cache_free(liStatCache *sc) {
static void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents) {
liStatCache *sc = (liStatCache*) w->data;
liStatCacheEntry *sce;
liWaitQueueElem *wqe;
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(&sc->delete_queue)) != NULL) {
liStatCacheEntry *sce = wqe->data;
/* stat cache entry TTL over */
sce = wqe->data;
if (sce->cached) {
if (sce->type == STAT_CACHE_ENTRY_SINGLE)
g_hash_table_remove(sc->entries, sce->data.path);
else
g_hash_table_remove(sc->dirlists, sce->data.path);
sce->cached = FALSE;
}
if (sce->refcount) {
/* if there are still vrequests using this entry just requeue it */
li_waitqueue_push(&sc->delete_queue, wqe);
} else {
/* no more vrequests using this entry, finally free it */
stat_cache_entry_free(sce);
}
stat_cache_remove_from_cache(sc, sce);
}
li_waitqueue_update(&sc->delete_queue);
}
/* called whenever an async stat job has finished */
static void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents) {
static void stat_cache_finished(gpointer data) {
liStatCacheEntry *sce = data;
guint i;
liStatCacheEntry *sce;
liStatCache *sc = ((liWorker*)w->data)->stat_cache;
liVRequest *vr;
UNUSED(loop);
UNUSED(revents);
while ((sce = g_async_queue_try_pop(sc->job_queue_in)) != NULL) {
if (sce->data.failed)
sc->errors++;
/* queue pending vrequests */
for (i = 0; i < sce->vrequests->len; i++) {
vr = g_ptr_array_index(sce->vrequests, i);
li_vrequest_joblist_append(vr);
}
g_ptr_array_set_size(sce->vrequests, 0);
sce->refcount--;
if (sce->data.failed) {
if (NULL != sce->sc) sce->sc->errors++;
}
/* queue pending vrequests */
for (i = 0; i < sce->vrequests->len; i++) {
vr = g_ptr_array_index(sce->vrequests, i);
li_vrequest_joblist_append(vr);
}
/* release tasklet reference */
stat_cache_entry_release(sce);
}
/* Tasklet work function (pushed via li_tasklet_push): stat() the entry's
 * path; for STAT_CACHE_ENTRY_DIR entries additionally read the directory
 * and stat() each child into sce->dirlist. On any failure data.failed is
 * set and data.err carries the errno. Finally the entry state is set to
 * FINISHED atomically (read by other threads/the finished callback). */
static void stat_cache_run(gpointer data) {
liStatCacheEntry *sce = data;

/* stat the entry path itself first */
if (stat(sce->data.path->str, &sce->data.st) == -1) {
sce->data.failed = TRUE;
sce->data.err = errno;
} else {
sce->data.failed = FALSE;
}

if (!sce->data.failed && sce->type == STAT_CACHE_ENTRY_DIR) {
/* dirlisting */
DIR *dirp;
gsize size;
struct dirent *entry;
struct dirent *result;
gint error;
liStatCacheEntryData sced;
GString *str;

dirp = opendir(sce->data.path->str);
if (dirp == NULL) {
sce->data.failed = TRUE;
sce->data.err = errno;
} else {
/* allocate a dirent buffer large enough for this directory's entries */
size = li_dirent_buf_size(dirp);
assert(size != (gsize)-1);
entry = g_slice_alloc(size);

sce->dirlist = g_array_sized_new(FALSE, FALSE, sizeof(liStatCacheEntryData), 32);

/* str is reused for every child: "<dir path>/<child name>" */
str = g_string_sized_new(sce->data.path->len + 64);
g_string_append_len(str, GSTR_LEN(sce->data.path));

/* NOTE(review): readdir_r is deprecated in modern glibc in favor of
 * plain readdir — consider migrating if the toolchain allows. */
while ((error = readdir_r(dirp, entry, &result)) == 0 && result != NULL) {
/* hide "." and ".." */
if (result->d_name[0] == '.' && (result->d_name[1] == '\0' ||
(result->d_name[1] == '.' && result->d_name[2] == '\0'))) {
continue;
}

sced.path = g_string_sized_new(63);
g_string_assign(sced.path, result->d_name);

g_string_truncate(str, sce->data.path->len);
/* make sure the path ends with / (or whatever) */
if (!sce->data.path->len || sce->data.path->str[sce->data.path->len-1] != G_DIR_SEPARATOR)
g_string_append_c(str, G_DIR_SEPARATOR);
g_string_append_len(str, GSTR_LEN(sced.path));

/* per-child stat failures are recorded in the child record, not
 * in the entry itself */
if (stat(str->str, &sced.st) == -1) {
sced.failed = TRUE;
sced.err = errno;
} else {
sced.failed = FALSE;
}

g_array_append_val(sce->dirlist, sced);
}

/* readdir_r returned an error code (not end-of-directory) */
if (error) {
sce->data.failed = TRUE;
sce->data.err = error;
}

g_string_free(str, TRUE);
g_slice_free1(size, entry);
closedir(dirp);
}
}

g_atomic_int_set(&sce->state, STAT_CACHE_ENTRY_FINISHED);
}
/* Allocate a fresh stat-cache entry for path, owned by cache sc.
 * The entry starts in the WAITING state, marked as cached, with one
 * initial reference (consumed by the delete-queue push at the call site).
 * The caller inserts it into the proper hash table and schedules the
 * stat tasklet. */
static liStatCacheEntry *stat_cache_entry_new(liStatCache *sc, GString *path) {
	liStatCacheEntry *sce = g_slice_new0(liStatCacheEntry);

	sce->data.path = g_string_new_len(GSTR_LEN(path));
	sce->vrequests = g_ptr_array_sized_new(8);
	sce->refcount = 1;
	sce->state = STAT_CACHE_ENTRY_WAITING;
	sce->queue_elem.data = sce;
	sce->sc = sc;
	sce->cached = TRUE;

	return sce;
}
static void stat_cache_entry_free(liStatCacheEntry *sce) {
guint i;
assert(sce->vrequests->len == 0);
assert(sce->refcount == 0);
g_string_free(sce->data.path, TRUE);
g_ptr_array_free(sce->vrequests, TRUE);
@ -157,105 +206,24 @@ static void stat_cache_entry_free(liStatCacheEntry *sce) {
g_slice_free(liStatCacheEntry, sce);
}
static gpointer stat_cache_thread(gpointer data) {
liStatCache *sc = data;
liStatCacheEntry *sce;
while (TRUE) {
sce = g_async_queue_pop(sc->job_queue_out);
/* stat cache entry with path == NULL indicates server stop */
if (!sce->data.path)
break;
if (stat(sce->data.path->str, &sce->data.st) == -1) {
sce->data.failed = TRUE;
sce->data.err = errno;
} else {
sce->data.failed = FALSE;
}
if (!sce->data.failed && sce->type == STAT_CACHE_ENTRY_DIR) {
/* dirlisting */
DIR *dirp;
gsize size;
struct dirent *entry;
struct dirent *result;
gint error;
liStatCacheEntryData sced;
GString *str;
dirp = opendir(sce->data.path->str);
if (dirp == NULL) {
sce->data.failed = TRUE;
sce->data.err = errno;
} else {
size = li_dirent_buf_size(dirp);
assert(size != (gsize)-1);
entry = g_slice_alloc(size);
sce->dirlist = g_array_sized_new(FALSE, FALSE, sizeof(liStatCacheEntryData), 32);
str = g_string_sized_new(sce->data.path->len + 64);
g_string_append_len(str, GSTR_LEN(sce->data.path));
while ((error = readdir_r(dirp, entry, &result)) == 0 && result != NULL) {
/* hide "." and ".." */
if (result->d_name[0] == '.' && (result->d_name[1] == '\0' ||
(result->d_name[1] == '.' && result->d_name[2] == '\0'))) {
continue;
}
sced.path = g_string_sized_new(63);
g_string_assign(sced.path, result->d_name);
g_string_truncate(str, sce->data.path->len);
/* make sure the path ends with / (or whatever) */
if (!sce->data.path->len || sce->data.path->str[sce->data.path->len-1] != G_DIR_SEPARATOR)
g_string_append_c(str, G_DIR_SEPARATOR);
g_string_append_len(str, GSTR_LEN(sced.path));
if (stat(str->str, &sced.st) == -1) {
sced.failed = TRUE;
sced.err = errno;
} else {
sced.failed = FALSE;
}
g_array_append_val(sce->dirlist, sced);
}
if (error) {
sce->data.failed = TRUE;
sce->data.err = error;
}
g_string_free(str, TRUE);
g_slice_free1(size, entry);
closedir(dirp);
}
}
g_atomic_int_set(&sce->state, STAT_CACHE_ENTRY_FINISHED);
g_async_queue_push(sc->job_queue_in, sce);
ev_async_send(sc->delete_queue.loop, &sc->job_watcher);
}
return NULL;
/* Drop one reference; frees the entry when the last reference is gone. */
static void stat_cache_entry_release(liStatCacheEntry *sce) {
	sce->refcount--;
	if (sce->refcount == 0) {
		stat_cache_entry_free(sce);
	}
}
static liStatCacheEntry *stat_cache_entry_new(GString *path) {
liStatCacheEntry *sce;
/* Take one more reference on the entry. */
static void stat_cache_entry_acquire(liStatCacheEntry *sce) {
	sce->refcount++;
}
sce = g_slice_new0(liStatCacheEntry);
sce->data.path = g_string_new_len(GSTR_LEN(path));
sce->vrequests = g_ptr_array_sized_new(8);
sce->state = STAT_CACHE_ENTRY_WAITING;
sce->queue_elem.data = sce;
sce->refcount = 1;
sce->cached = TRUE;
/* Bind sce to vr: take a reference on behalf of the vrequest, record the
 * vrequest as a waiter on the entry, and track the entry on the vrequest
 * so it can be released when the request finishes. */
void li_stat_cache_entry_acquire(liVRequest *vr, liStatCacheEntry *sce) {
	stat_cache_entry_acquire(sce);
	g_ptr_array_add(sce->vrequests, vr);
	g_ptr_array_add(vr->stat_cache_entries, sce);
}
return sce;
/* Undo li_stat_cache_entry_acquire(): unlink vr and sce from each other's
 * tracking arrays and drop the vrequest's reference on the entry. */
void li_stat_cache_entry_release(liVRequest *vr, liStatCacheEntry *sce) {
	g_ptr_array_remove_fast(vr->stat_cache_entries, sce);
	g_ptr_array_remove_fast(sce->vrequests, vr);
	stat_cache_entry_release(sce);
}
liHandlerResult li_stat_cache_get_dirlist(liVRequest *vr, GString *path, liStatCacheEntry **result) {
@ -274,7 +242,7 @@ liHandlerResult li_stat_cache_get_dirlist(liVRequest *vr, GString *path, liStatC
if (g_ptr_array_index(vr->stat_cache_entries, i) == sce)
return LI_HANDLER_WAIT_FOR_EVENT;
}
li_stat_cache_entry_acquire(vr, sce);
li_stat_cache_entry_acquire(vr, sce); /* assign sce to vr */
return LI_HANDLER_WAIT_FOR_EVENT;
}
@ -284,16 +252,22 @@ liHandlerResult li_stat_cache_get_dirlist(liVRequest *vr, GString *path, liStatC
if (g_ptr_array_index(vr->stat_cache_entries, i) == sce)
return LI_HANDLER_GO_ON;
}
li_stat_cache_entry_acquire(vr, sce);
li_stat_cache_entry_acquire(vr, sce); /* assign sce to vr */
return LI_HANDLER_GO_ON;
} else {
/* cache miss, allocate new entry */
sce = stat_cache_entry_new(path);
sce = stat_cache_entry_new(sc, path);
sce->type = STAT_CACHE_ENTRY_DIR;
li_stat_cache_entry_acquire(vr, sce);
li_stat_cache_entry_acquire(vr, sce); /* assign sce to vr */
/* uses initial reference of sce */
li_waitqueue_push(&sc->delete_queue, &sce->queue_elem);
g_hash_table_insert(sc->dirlists, sce->data.path, sce);
g_async_queue_push(sc->job_queue_out, sce);
sce->refcount++;
li_tasklet_push(vr->wrk->tasklets, stat_cache_run, stat_cache_finished, sce);
sc->misses++;
return LI_HANDLER_WAIT_FOR_EVENT;
}
@ -305,7 +279,7 @@ static liHandlerResult stat_cache_get(liVRequest *vr, GString *path, struct stat
guint i;
/* force blocking call if we are not in a vrequest context or stat cache is disabled */
if (!vr || !(sc = vr->wrk->stat_cache))
if (!vr || !(sc = vr->wrk->stat_cache) || !CORE_OPTION(LI_CORE_OPTION_ASYNC_STAT).boolean)
async = FALSE;
if (async) {
@ -320,19 +294,25 @@ static liHandlerResult stat_cache_get(liVRequest *vr, GString *path, struct stat
return LI_HANDLER_WAIT_FOR_EVENT;
}
}
li_stat_cache_entry_acquire(vr, sce);
li_stat_cache_entry_acquire(vr, sce); /* assign sce to vr */
return LI_HANDLER_WAIT_FOR_EVENT;
}
sc->hits++;
} else {
/* cache miss, allocate new entry */
sce = stat_cache_entry_new(path);
sce = stat_cache_entry_new(sc, path);
sce->type = STAT_CACHE_ENTRY_SINGLE;
li_stat_cache_entry_acquire(vr, sce);
li_stat_cache_entry_acquire(vr, sce); /* assign sce to vr */
/* uses initial reference of sce */
li_waitqueue_push(&sc->delete_queue, &sce->queue_elem);
g_hash_table_insert(sc->entries, sce->data.path, sce);
g_async_queue_push(sc->job_queue_out, sce);
sce->refcount++;
li_tasklet_push(vr->wrk->tasklets, stat_cache_run, stat_cache_finished, sce);
sc->misses++;
return LI_HANDLER_WAIT_FOR_EVENT;
}
@ -372,16 +352,3 @@ liHandlerResult li_stat_cache_get(liVRequest *vr, GString *path, struct stat *st
/* Like li_stat_cache_get(), but forces a blocking stat() (async = FALSE),
 * bypassing the tasklet pool. */
liHandlerResult li_stat_cache_get_sync(liVRequest *vr, GString *path, struct stat *st, int *err, int *fd) {
return stat_cache_get(vr, path, st, err, fd, FALSE);
}
void li_stat_cache_entry_acquire(liVRequest *vr, liStatCacheEntry *sce) {
sce->refcount++;
g_ptr_array_add(vr->stat_cache_entries, sce);
g_ptr_array_add(sce->vrequests, vr);
}
void li_stat_cache_entry_release(liVRequest *vr, liStatCacheEntry *sce) {
sce->refcount--;
g_ptr_array_remove_fast(sce->vrequests, vr);
g_ptr_array_remove_fast(vr->stat_cache_entries, sce);
}

View File

@ -464,6 +464,8 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
ev_async_start(wrk->loop, &wrk->job_async_queue_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
wrk->tasklets = li_tasklet_pool_new(wrk->loop, srv->tasklet_pool_threads);
wrk->network_read_buf = g_byte_array_sized_new(0);
return wrk;
@ -539,6 +541,8 @@ void li_worker_free(liWorker *wrk) {
li_stat_cache_free(wrk->stat_cache);
li_tasklet_pool_free(wrk->tasklets);
#ifdef HAVE_LUA_H
lua_close(wrk->L);
wrk->L = NULL;
@ -569,7 +573,7 @@ void li_worker_run(liWorker *wrk) {
/* setup stat cache if necessary */
if (wrk->srv->stat_cache_ttl && !wrk->stat_cache)
li_stat_cache_new(wrk, wrk->srv->stat_cache_ttl);
wrk->stat_cache = li_stat_cache_new(wrk, wrk->srv->stat_cache_ttl);
ev_loop(wrk->loop, 0);
}