From efab0ca75ddb23219ce90fd43f562367e70fa3dd Mon Sep 17 00:00:00 2001 From: Thomas Porzelt Date: Sun, 1 Mar 2009 21:16:58 +0100 Subject: [PATCH] implement stat cache --- include/lighttpd/base.h | 1 + include/lighttpd/stat_cache.h | 91 +++++++++++++ include/lighttpd/typedefs.h | 5 + include/lighttpd/virtualrequest.h | 4 + include/lighttpd/worker.h | 2 + src/plugin_core.c | 40 ++++-- src/stat_cache.c | 213 ++++++++++++++++++++++++++++++ src/virtualrequest.c | 15 +++ src/worker.c | 4 + src/wscript | 1 + 10 files changed, 366 insertions(+), 10 deletions(-) create mode 100644 include/lighttpd/stat_cache.h create mode 100644 src/stat_cache.c diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index ea6ebf5..b0a418d 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -44,6 +44,7 @@ #include #include #include +#include #include diff --git a/include/lighttpd/stat_cache.h b/include/lighttpd/stat_cache.h new file mode 100644 index 0000000..0f42b71 --- /dev/null +++ b/include/lighttpd/stat_cache.h @@ -0,0 +1,91 @@ +/* + * stat cache - speeding up stat()s + * + * The basic idea behind the stat cache is to reduce calls to stat() which might be slow due to disk io (some ms). + * Each worker thread has its own cache so no locking contention between threads happens which could be slow. + * This means that there will be more stat() calls than there would be with only one shared cache but since there + * should be mostly hits in most cases (few items requested frequently) it will outweight the locking contention. + * To prevent the stat() from blocking all other requests of that worker, we hand it over to another thread. + * + * Entries are removed after 10 seconds (adjustable through stat_cache.ttl setup) + * + * TODO: + * - stat_cache.ttl setup + * - create ETAGs + * - get content type from xattr + * - add support for inotify (linux). TTL for entries can be increased to 60s + * + * Technical details: + * If a stat is requested, the following procedure takes place: + * - a cache lookup is performed + * - in case of a cache HIT: + * - if state is FINISHED and entry is fresh then return entry + * - if state is FINISHED but entry old then reset entry, create new job and return NULL + * - if state is WAITING then add vrequest to entry and return NULL (looks like a cache miss) + * - in case of a cache MISS: + * - a new entry is allocated and inserted into the cache, state is set to WAITING + * - the entry is inserted into the delete queue + * - a new job is created and NULL returned + * + * In the delete queue callback we check if no vrequests are working on that entry. If yes, we free it. If not then we requeue it. + * Locking only happens in two cases: 1) a new job is send to the stat thread 2) the stat thread sends the info back to the worker. + * + */ + +#ifndef _LIGHTTPD_STAT_CACHE_H_ +#define _LIGHTTPD_STAT_CACHE_H_ + +#ifndef _LIGHTTPD_BASE_H_ +#error Please include instead of this file +#endif + +struct stat_cache_entry { + GString *path; + GString *etag; + GString *content_type; + struct stat st; + ev_tstamp ts; /* timestamp the entry was created (not when the stat() was done) */ + gint err; + gboolean failed; + enum { + STAT_CACHE_ENTRY_WAITING, /* waiting for stat thread to do the work, no info available */ + STAT_CACHE_ENTRY_FINISHED, /* stat() done, info available */ + } state; + GPtrArray *vrequests; /* vrequests waiting for this info */ + guint refcount; + waitqueue_elem queue_elem; /* queue element for the delete_queue */ + gboolean in_cache; +}; + +struct stat_cache { + GHashTable *entries; + GAsyncQueue *job_queue_out; /* elements waiting for stat */ + GAsyncQueue *job_queue_in; /* elements with finished stat */ + waitqueue delete_queue; + GThread *thread; + ev_async job_watcher; + gdouble ttl; + + guint64 hits; + guint64 misses; + guint64 errors; +}; + +void stat_cache_new(worker *wrk, gdouble ttl); +void stat_cache_free(stat_cache *sc); +void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents); +void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents); +gpointer stat_cache_thread(gpointer data); + +void stat_cache_entry_free(stat_cache_entry *sce); + +/* + gets a stat_cache_entry for a specified path + returns NULL in case of a cache MISS and you should return HANDLER_WAIT_FOR_EVENT +*/ +LI_API stat_cache_entry *stat_cache_entry_get(vrequest *vr, GString *path); + +/* release a stat_cache_entry so it can be cleaned up */ +LI_API void stat_cache_entry_release(vrequest *vr); + +#endif \ No newline at end of file diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index 4d258c2..7681ca4 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -201,4 +201,9 @@ typedef struct filters filters; struct worker; typedef struct worker worker; +struct stat_cache_entry; +typedef struct stat_cache_entry stat_cache_entry; +struct stat_cache; +typedef struct stat_cache stat_cache; + #endif diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index 3df568f..e89cd34 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -82,6 +82,8 @@ struct vrequest { gboolean actions_wait_for_response; GList *job_queue_link; + + stat_cache_entry *stat_cache_entry; }; #define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \ @@ -124,4 +126,6 @@ LI_API void vrequest_joblist_append(vrequest *vr); LI_API gboolean vrequest_stat(vrequest *vr); +LI_API gboolean vrequest_redirect(vrequest *vr, GString *uri); + #endif diff --git a/include/lighttpd/worker.h b/include/lighttpd/worker.h index 0ce315c..4297961 100644 --- a/include/lighttpd/worker.h +++ b/include/lighttpd/worker.h @@ -94,6 +94,8 @@ struct worker { GQueue job_queue; ev_timer job_queue_watcher; + + stat_cache *stat_cache; }; LI_API worker* worker_new(struct server *srv, struct ev_loop *loop); diff --git a/src/plugin_core.c b/src/plugin_core.c index 2496f14..a585169 100644 --- a/src/plugin_core.c +++ b/src/plugin_core.c @@ -186,18 +186,38 @@ static action* core_docroot(server *srv, plugin* p, value *val) { static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *context) { int fd; + stat_cache_entry *sce; + UNUSED(param); UNUSED(context); - if (vr->physical.path->len == 0) return HANDLER_GO_ON; + if (!vr->stat_cache_entry) { + if (vr->physical.path->len == 0) return HANDLER_GO_ON; - if (!vrequest_handle_direct(vr)) return HANDLER_GO_ON; + if (!vrequest_handle_direct(vr)) return HANDLER_GO_ON; + } + + sce = stat_cache_entry_get(vr, vr->physical.path); + if (!sce) + return HANDLER_WAIT_FOR_EVENT; VR_DEBUG(vr, "serving static file: %s", vr->physical.path->str); - fd = open(vr->physical.path->str, O_RDONLY); - if (fd == -1) { - vr->response.http_status = 404; + if (sce->failed) { + /* stat failed */ + VR_DEBUG(vr, "stat() failed: %s (%d)", g_strerror(sce->err), sce->err); + + switch (errno) { + case ENOENT: + vr->response.http_status = 404; break; + case EACCES: + case EFAULT: + vr->response.http_status = 403; break; + default: + vr->response.http_status = 500; + } + g_print("%d\n", vr->response.http_status); + } else if ((fd = open(vr->physical.path->str, O_RDONLY)) == -1) { VR_DEBUG(vr, "open() failed: %s (%d)", g_strerror(errno), errno); switch (errno) { @@ -210,8 +230,6 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont vr->response.http_status = 500; } } else { - struct stat st; - fstat(fd, &st); #ifdef FD_CLOEXEC fcntl(fd, F_SETFD, FD_CLOEXEC); @@ -219,7 +237,7 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont /* redirect to scheme + host + path + / + querystring if directory without trailing slash */ /* TODO: local addr if HTTP 1.0 without host header */ - if (S_ISDIR(st.st_mode) && vr->request.uri.orig_path->str[vr->request.uri.orig_path->len-1] != '/') { + if (S_ISDIR(sce->st.st_mode) && vr->request.uri.orig_path->str[vr->request.uri.orig_path->len-1] != '/') { GString *host = vr->request.uri.authority->len ? vr->request.uri.authority : vr->con->local_addr_str; GString *uri = g_string_sized_new( 8 /* https:// */ + host->len + @@ -241,7 +259,7 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont http_header_overwrite(vr->response.headers, CONST_STR_LEN("Location"), GSTR_LEN(uri)); g_string_free(uri, TRUE); close(fd); - } else if (!S_ISREG(st.st_mode)) { + } else if (!S_ISREG(sce->st.st_mode)) { vr->response.http_status = 404; close(fd); } else { @@ -251,10 +269,12 @@ static handler_t core_handle_static(vrequest *vr, gpointer param, gpointer *cont http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), GSTR_LEN(mime_str)); else http_header_overwrite(vr->response.headers, CONST_STR_LEN("Content-Type"), CONST_STR_LEN("application/octet-stream")); - chunkqueue_append_file_fd(vr->out, NULL, 0, st.st_size, fd); + chunkqueue_append_file_fd(vr->out, NULL, 0, sce->st.st_size, fd); } } + stat_cache_entry_release(vr); + return HANDLER_GO_ON; } diff --git a/src/stat_cache.c b/src/stat_cache.c new file mode 100644 index 0000000..69e26bb --- /dev/null +++ b/src/stat_cache.c @@ -0,0 +1,213 @@ +#include + + +void stat_cache_new(worker *wrk, gdouble ttl) { + stat_cache *sc; + GError *err; + + sc = g_slice_new0(stat_cache); + sc->ttl = ttl; + sc->entries = g_hash_table_new_full((GHashFunc)g_string_hash, (GEqualFunc)g_string_equal, NULL, NULL); + sc->job_queue_in = g_async_queue_new(); + sc->job_queue_out = g_async_queue_new(); + + waitqueue_init(&sc->delete_queue, wrk->loop, stat_cache_delete_cb, ttl, sc); + ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ + + ev_init(&sc->job_watcher, stat_cache_job_cb); + sc->job_watcher.data = wrk; + ev_async_start(wrk->loop, &sc->job_watcher); + ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ + wrk->stat_cache = sc; + + sc->thread = g_thread_create(stat_cache_thread, sc, TRUE, &err); + + if (!sc->thread) { + /* failed to create thread */ + assert(0); + } +} + +void stat_cache_free(stat_cache *sc) { + GHashTableIter iter; + gpointer k, v; + + /* wake up thread */ + g_async_queue_push(sc->job_queue_out, g_slice_new0(stat_cache_entry)); + g_thread_join(sc->thread); + + ev_async_stop(sc->delete_queue.loop, &sc->job_watcher); + + /* clear cache */ + g_hash_table_iter_init(&iter, sc->entries); + while (g_hash_table_iter_next(&iter, &k, &v)) { + stat_cache_entry_free(v); + } + + waitqueue_stop(&sc->delete_queue); + g_async_queue_unref(sc->job_queue_in); + g_async_queue_unref(sc->job_queue_out); + g_hash_table_destroy(sc->entries); + g_slice_free(stat_cache, sc); +} + +void stat_cache_delete_cb(struct ev_loop *loop, ev_timer *w, int revents) { + stat_cache *sc = (stat_cache*) w->data; + stat_cache_entry *sce; + waitqueue_elem *wqe; + + UNUSED(loop); + UNUSED(revents); + + while ((wqe = waitqueue_pop(&sc->delete_queue)) != NULL) { + /* stat cache entry TTL over */ + sce = wqe->data; + if (sce->refcount) { + /* if there are still vrequests using this entry just requeue it */ + waitqueue_push(&sc->delete_queue, wqe); + } else { + /* no more vrequests using this entry, finally free it */ + if (sce->in_cache) + g_hash_table_remove(sc->entries, sce->path); + stat_cache_entry_free(sce); + } + } + + waitqueue_update(&sc->delete_queue); +} + +void stat_cache_job_cb(struct ev_loop *loop, ev_async *w, int revents) { + guint i; + stat_cache_entry *sce; + stat_cache *sc = ((worker*)w->data)->stat_cache; + vrequest *vr; + + UNUSED(loop); + UNUSED(revents); + + while ((sce = g_async_queue_try_pop(sc->job_queue_in)) != NULL) { + if (sce->failed) + sc->errors++; + + for (i = 0; i < sce->vrequests->len; i++) { + vr = g_ptr_array_index(sce->vrequests, i); + vrequest_joblist_append(vr); + } + + g_ptr_array_set_size(sce->vrequests, 0); + } +} + +void stat_cache_entry_free(stat_cache_entry *sce) { + assert(sce->vrequests->len == 0); + assert(sce->refcount == 0); + g_string_free(sce->path, TRUE); + g_ptr_array_free(sce->vrequests, TRUE); + g_slice_free(stat_cache_entry, sce); +} + + +stat_cache_entry *stat_cache_entry_get(vrequest *vr, GString *path) { + stat_cache *sc; + stat_cache_entry *sce; + + sc = vr->con->wrk->stat_cache; + + /* lookup entry in cache */ + sce = g_hash_table_lookup(sc->entries, path); + + if (sce) { + /* cache hit, check state */ + if (g_atomic_int_get(&sce->state) == STAT_CACHE_ENTRY_FINISHED) { + /* stat info available, check if it is fresh */ + if (sce->ts >= (CUR_TS(vr->con->wrk) - (ev_tstamp)sc->ttl)) { + /* entry fresh */ + if (!vr->stat_cache_entry) { + sc->hits++; + vr->stat_cache_entry = sce; + sce->refcount++; + } + return sce; + } else { + /* entry old */ + if (sce->refcount == 0) { + /* no vrequests working on the entry, reuse it */ + } else { + /* there are still vrequests using this entry, replace with a new one */ + sce->in_cache = FALSE; + sce = g_slice_new0(stat_cache_entry); + sce->path = g_string_new_len(GSTR_LEN(path)); + sce->vrequests = g_ptr_array_sized_new(8); + sce->in_cache = TRUE; + sce->queue_elem.data = sce; + g_hash_table_replace(sc->entries, sce->path, sce); + } + + sce->ts = CUR_TS(vr->con->wrk); + vr->stat_cache_entry = sce; + g_ptr_array_add(sce->vrequests, vr); + sce->refcount++; + waitqueue_push(&sc->delete_queue, &sce->queue_elem); + sce->state = STAT_CACHE_ENTRY_WAITING; + g_async_queue_push(sc->job_queue_out, sce); + sc->misses++; + return NULL; + } + } else { + /* stat info not available (state is STAT_CACHE_ENTRY_WAITING) */ + vr->stat_cache_entry = sce; + g_ptr_array_add(sce->vrequests, vr); + sce->refcount++; + sc->misses++; + return NULL; + } + } else { + /* cache miss, allocate new entry */ + sce = g_slice_new0(stat_cache_entry); + sce->path = g_string_new_len(GSTR_LEN(path)); + sce->vrequests = g_ptr_array_sized_new(8); + sce->ts = CUR_TS(vr->con->wrk); + sce->state = STAT_CACHE_ENTRY_WAITING; + sce->in_cache = TRUE; + sce->queue_elem.data = sce; + vr->stat_cache_entry = sce; + g_ptr_array_add(sce->vrequests, vr); + sce->refcount = 1; + waitqueue_push(&sc->delete_queue, &sce->queue_elem); + g_hash_table_insert(sc->entries, sce->path, sce); + g_async_queue_push(sc->job_queue_out, sce); + sc->misses++; + return NULL; + } +} + +void stat_cache_entry_release(vrequest *vr) { + vr->stat_cache_entry->refcount--; + vr->stat_cache_entry = NULL; +} + + +gpointer stat_cache_thread(gpointer data) { + stat_cache *sc = data; + stat_cache_entry *sce; + + while (TRUE) { + sce = g_async_queue_pop(sc->job_queue_out); + + /* stat cache entry with path == NULL indicates server stop */ + if (!sce->path) + break; + + if (stat(sce->path->str, &sce->st) == -1) { + sce->failed = TRUE; + sce->err = errno; + } else + sce->failed = FALSE; + + g_atomic_int_set(&sce->state, STAT_CACHE_ENTRY_FINISHED); + g_async_queue_push(sc->job_queue_in, sce); + ev_async_send(sc->delete_queue.loop, &sc->job_watcher); + } + + return NULL; +} \ No newline at end of file diff --git a/src/virtualrequest.c b/src/virtualrequest.c index 0c266a4..0c22685 100644 --- a/src/virtualrequest.c +++ b/src/virtualrequest.c @@ -119,6 +119,11 @@ void vrequest_reset(vrequest *vr) { vr->job_queue_link = NULL; } + if (vr->stat_cache_entry) { + g_ptr_array_remove_fast(vr->stat_cache_entry->vrequests, vr); + vr->stat_cache_entry = NULL; + } + memcpy(vr->options, vr->con->srv->option_def_values->data, vr->con->srv->option_def_values->len * sizeof(option_value)); } @@ -373,3 +378,13 @@ gboolean vrequest_stat(vrequest *vr) { vr->physical.have_stat = TRUE; return TRUE; } + +gboolean vrequest_redirect(vrequest *vr, GString *uri) { + if (!vrequest_handle_direct(vr)) + return FALSE; + + vr->response.http_status = 301; + http_header_overwrite(vr->response.headers, CONST_STR_LEN("Location"), GSTR_LEN(uri)); + + return TRUE; +} \ No newline at end of file diff --git a/src/worker.c b/src/worker.c index 3b2e429..8392b4c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -335,6 +335,8 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) { ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0); wrk->job_queue_watcher.data = wrk; + stat_cache_new(wrk, 10.0); + return wrk; } @@ -386,6 +388,8 @@ void worker_free(worker *wrk) { g_string_free(wrk->tmp_str, TRUE); + stat_cache_free(wrk->stat_cache); + g_slice_free(worker, wrk); } diff --git a/src/wscript b/src/wscript index 132a57a..2e04162 100644 --- a/src/wscript +++ b/src/wscript @@ -43,6 +43,7 @@ common_src = ''' request.c response.c server.c + stat_cache.c sys-files.c sys-socket.c url_parser.rl