From 8587598a66c35c1d5c0ae0a8e73a0728b7641a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Wed, 25 Aug 2010 15:56:11 +0200 Subject: [PATCH] [core] add tasklets: jobs which use blocking code --- include/lighttpd/tasklet.h | 36 ++++++++++ src/CMakeLists.txt | 1 + src/common/Makefile.am | 1 + src/common/tasklet.c | 144 +++++++++++++++++++++++++++++++++++++ src/common/wscript | 1 + 5 files changed, 183 insertions(+) create mode 100644 include/lighttpd/tasklet.h create mode 100644 src/common/tasklet.c diff --git a/include/lighttpd/tasklet.h b/include/lighttpd/tasklet.h new file mode 100644 index 0000000..a766dea --- /dev/null +++ b/include/lighttpd/tasklet.h @@ -0,0 +1,36 @@ +#ifndef _LIGHTTPD_TASKLET_H_ +#define _LIGHTTPD_TASKLET_H_ + +#include + +typedef struct liTaskletPool liTaskletPool; + +typedef void (*liTaskletFinishedCB)(gpointer data); +typedef void (*liTaskletRunCB)(gpointer data); + +/* if threads = 0: all run callbacks are executed immediately in li_tasklet_push (but finished_cb is delayed) + * if threads < 0: a shared GThreadPool is used + * if threads > 0: a exclusive GThreadPool is used with the specified numbers of threads + */ + +/* we do not keep the loop alive! */ +LI_API liTaskletPool* li_tasklet_pool_new(struct ev_loop *loop, gint threads); + +/* blocks until all tasks are done; calls all finished callbacks; + * you are allowed to call this from finish callbacks, but not more than once! + */ +LI_API void li_tasklet_pool_free(liTaskletPool *pool); + +/* this may stop the old pool and wait for all jobs to be finished; doesn't call finished callbacks */ +LI_API void li_tasklet_pool_set_threads(liTaskletPool *pool, gint threads); + +LI_API gint li_tasklet_pool_get_threads(liTaskletPool *pool); + +/* the finished callback is executed in the same thread context as the pool lives in; + * it will either be called from li_tasklet_pool_free or the ev-loop handler, + * never from li_tasklet_push + * all tasklets will be executed, you can *not* cancel them! + */ +LI_API void li_tasklet_push(liTaskletPool *pool, liTaskletRunCB run, liTaskletFinishedCB finished, gpointer data); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0e1d86e..8a3a287 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -180,6 +180,7 @@ SET(COMMON_SRC radix.c sys_memory.c sys_socket.c + tasklet.c utils.c waitqueue.c ) diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 15f50a8..a617a91 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -14,6 +14,7 @@ common_src= \ radix.c \ sys_memory.c \ sys_socket.c \ + tasklet.c \ utils.c \ waitqueue.c diff --git a/src/common/tasklet.c b/src/common/tasklet.c new file mode 100644 index 0000000..1ecb4d4 --- /dev/null +++ b/src/common/tasklet.c @@ -0,0 +1,144 @@ + +#include + +typedef struct liTasklet liTasklet; + +struct liTaskletPool { + GThreadPool *threadpool; + + struct ev_loop *loop; + ev_async finished_watcher; + GAsyncQueue *finished; + + int threads; + + /* -1: running finished_watcher_cb, do not delete + * 0: standard, do not delete, can delete + * 1: running finished_watcher_cb, delete in finished_watcher_cb + */ + int delete_later; +}; + +struct liTasklet { + liTaskletRunCB run_cb; + liTaskletFinishedCB finished_cb; + gpointer data; +}; + +static void finished_watcher_cb(struct ev_loop *loop, ev_async *w, int revents) { + liTaskletPool *pool = w->data; + liTasklet *t; + UNUSED(loop); + UNUSED(revents); + + pool->delete_later = -1; + + while (NULL != (t = g_async_queue_try_pop(pool->finished))) { + t->finished_cb(t->data); + if (1 == pool->delete_later) { + g_slice_free(liTaskletPool, pool); + return; + } + } + + pool->delete_later = 0; +} + +static void run_tasklet(gpointer data, gpointer userdata) { + liTaskletPool *pool = userdata; + liTasklet *t = data; + + t->run_cb(t->data); + g_async_queue_push(pool->finished, t); + ev_async_send(pool->loop, &pool->finished_watcher); +} + +liTaskletPool* li_tasklet_pool_new(struct ev_loop *loop, gint threads) { + liTaskletPool *pool = g_slice_new0(liTaskletPool); + + pool->loop = loop; + + ev_init(&pool->finished_watcher, finished_watcher_cb); + pool->finished_watcher.data = pool; + ev_async_start(pool->loop, &pool->finished_watcher); + ev_unref(pool->loop); + + pool->finished = g_async_queue_new(); + + li_tasklet_pool_set_threads(pool, threads); + + return pool; +} + +void li_tasklet_pool_free(liTaskletPool *pool) { + liTasklet *t; + + if (!pool) return; + + li_tasklet_pool_set_threads(pool, 0); + + while (NULL != (t = g_async_queue_try_pop(pool->finished))) { + t->finished_cb(t->data); + } + g_async_queue_unref(pool->finished); + pool->finished = NULL; + + ev_ref(pool->loop); + ev_async_stop(pool->loop, &pool->finished_watcher); + + if (-1 == pool->delete_later) { + pool->delete_later = 1; + } else { + g_slice_free(liTaskletPool, pool); + } +} + +void li_tasklet_pool_set_threads(liTaskletPool *pool, gint threads) { + if (threads < 0) threads = -1; + if (pool->threads == threads) return; + + if (NULL != pool->threadpool) { + if (pool->threads > 0 && threads > 0) { + /* pool was exclusive, stays exlusive. just change number of threads */ + g_thread_pool_set_max_threads(pool->threadpool, threads, NULL); + pool->threads = g_thread_pool_get_num_threads(pool->threadpool); + /* as we already had exclusive threads running, pool->threads should be > 0 */ + return; + } + + /* stop old pool */ + g_thread_pool_free(pool->threadpool, FALSE, TRUE); + pool->threadpool = NULL; + } + + if (threads != 0) { + pool->threadpool = g_thread_pool_new(run_tasklet, pool, threads, (threads > 0), NULL); + if (threads > 0) { /* exclusive pool, see how many threads we got */ + threads = g_thread_pool_get_num_threads(pool->threadpool); + if (threads == 0) { /* couldn't get exlusive threads, share threads instead */ + g_thread_pool_free(pool->threadpool, FALSE, TRUE); + pool->threadpool = g_thread_pool_new(run_tasklet, pool, -1, FALSE, NULL); + threads = -1; + } + } + } + + pool->threads = threads; +} + +gint li_tasklet_pool_get_threads(liTaskletPool *pool) { + return pool->threads; +} + +void li_tasklet_push(liTaskletPool* pool, liTaskletRunCB run, liTaskletFinishedCB finished, gpointer data) { + liTasklet *t = g_slice_new0(liTasklet); + t->run_cb = run; + t->finished_cb = finished; + t->data = data; + + if (NULL != pool->threadpool) { + g_thread_pool_push(pool->threadpool, t, NULL); + } else { + run_tasklet(t, pool); + } +} diff --git a/src/common/wscript b/src/common/wscript index 1bec16b..bccd442 100644 --- a/src/common/wscript +++ b/src/common/wscript @@ -28,6 +28,7 @@ def build(bld): module.c radix.c sys_memory.c + tasklet.c utils.c waitqueue.c '''