From c3dc0d8d7e5f61bde535297527db5d4d4f993e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Sun, 5 Sep 2010 13:08:40 +0200 Subject: [PATCH] [common]: Add generic jobqueue --- include/lighttpd/jobqueue.h | 61 ++++++++++ src/CMakeLists.txt | 1 + src/common/Makefile.am | 1 + src/common/jobqueue.c | 222 ++++++++++++++++++++++++++++++++++++ src/common/wscript | 1 + 5 files changed, 286 insertions(+) create mode 100644 include/lighttpd/jobqueue.h create mode 100644 src/common/jobqueue.c diff --git a/include/lighttpd/jobqueue.h b/include/lighttpd/jobqueue.h new file mode 100644 index 0000000..6677fc4 --- /dev/null +++ b/include/lighttpd/jobqueue.h @@ -0,0 +1,61 @@ +#ifndef _LIGHTTPD_JOBQUEUE_H_ +#define _LIGHTTPD_JOBQUEUE_H_ + +#include + +typedef struct liJob liJob; +typedef struct liJobRef liJobRef; +typedef struct liJobQueue liJobQueue; + +typedef void (*liJobCB)(liJob *job); + +/* All data here is private; use the functions to interact with the job-queue */ + +struct liJob { + guint generation; + GList link; + liJobCB callback; + liJobRef *ref; +}; + +struct liJobRef { + gint refcount; + liJob *job; + liJobQueue *queue; +}; + +struct liJobQueue { + struct ev_loop *loop; + guint generation; + + ev_prepare prepare_watcher; + + GQueue queue; + ev_timer queue_watcher; + + GAsyncQueue *async_queue; + ev_async async_queue_watcher; +}; + +LI_API void li_job_queue_init(liJobQueue *jq, struct ev_loop *loop); +LI_API void li_job_queue_clear(liJobQueue *jq); /* runs until all jobs are done */ + +LI_API void li_job_init(liJob *job, liJobCB callback); +LI_API void li_job_reset(liJob *job); +LI_API void li_job_clear(liJob *job); + +/* marks the job for later execution */ +LI_API void li_job_later(liJobQueue *jq, liJob *job); +LI_API void li_job_later_ref(liJobRef *jobref); /* NOT thread-safe! */ +/* if the job didn't run in this generation yet, run it now; otherwise mark it for later execution */ +LI_API void li_job_now(liJobQueue *jq, liJob *job); +LI_API void li_job_now_ref(liJobRef *jobref); /* NOT thread-safe! */ + +LI_API void li_job_async(liJobRef *jobref); +/* marks the job for later execution; this is the only threadsafe way to push a job to the queue */ + +LI_API liJobRef* li_job_ref(liJobQueue *jq, liJob *job); +LI_API void li_job_ref_release(liJobRef *jobref); +LI_API void li_job_ref_acquire(liJobRef *jobref); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dea53b..491f2de 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -174,6 +174,7 @@ SET(COMMON_SRC encoding.c idlist.c ip_parsers.c + jobqueue.c memcached.c mempool.c module.c diff --git a/src/common/Makefile.am b/src/common/Makefile.am index a617a91..97992c0 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -9,6 +9,7 @@ common_src= \ encoding.c \ idlist.c \ ip_parsers.c \ + jobqueue.c \ mempool.c \ module.c \ radix.c \ diff --git a/src/common/jobqueue.c b/src/common/jobqueue.c new file mode 100644 index 0000000..a9e58b6 --- /dev/null +++ b/src/common/jobqueue.c @@ -0,0 +1,222 @@ + +#include +#include + +#define INC_GEN(jq) do { jq->generation++; if (0 == jq->generation) jq->generation = 1; } while (0) + +static void job_queue_run(liJobQueue* jq, int loops) { + int i; + + for (i = 0; i < loops; i++) { + GQueue q = jq->queue; + GList *l; + liJob *job; + + INC_GEN(jq); + + if (q.length == 0) return; + + g_queue_init(&jq->queue); /* reset queue, elements are in q */ + + while (NULL != (l = g_queue_pop_head_link(&q))) { + job = LI_CONTAINER_OF(l, liJob, link); + job->generation = jq->generation; + job->link.data = NULL; + + job->callback(job); + } + } + + if (jq->queue.length > 0) { + /* make sure we will run again soon */ + ev_timer_start(jq->loop, &jq->queue_watcher); + } +} + +static void job_queue_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) { + liJobQueue* jq = (liJobQueue*) w->data; + UNUSED(loop); + UNUSED(revents); + + job_queue_run(jq, 3); +} + +static void job_queue_watcher_cb(struct ev_loop *loop, ev_timer *w, int revents) { + UNUSED(loop); + UNUSED(revents); + UNUSED(w); + + /* just keep loop alive, run jobs in prepare */ +} + +/* run jobs for async queued jobs */ +static void job_async_queue_cb(struct ev_loop *loop, ev_async *w, int revents) { + liJobQueue* jq = (liJobQueue*) w->data; + GAsyncQueue *q = jq->async_queue; + liJobRef *jobref; + UNUSED(loop); + UNUSED(revents); + + while (NULL != (jobref = g_async_queue_try_pop(q))) { + li_job_now_ref(jobref); + li_job_ref_release(jobref); + } +} + + +void li_job_queue_init(liJobQueue* jq, struct ev_loop *loop) { + jq->loop = loop; + + ev_init(&jq->prepare_watcher, job_queue_prepare_cb); + jq->prepare_watcher.data = jq; + ev_prepare_start(jq->loop, &jq->prepare_watcher); + ev_unref(jq->loop); /* this watcher shouldn't keep the loop alive */ + + /* job queue */ + g_queue_init(&jq->queue); + ev_timer_init(&jq->queue_watcher, job_queue_watcher_cb, 0, 0); + jq->queue_watcher.data = jq; + + jq->async_queue = g_async_queue_new(); + ev_async_init(&jq->async_queue_watcher, job_async_queue_cb); + jq->async_queue_watcher.data = jq; + ev_async_start(jq->loop, &jq->async_queue_watcher); + ev_unref(jq->loop); /* this watcher shouldn't keep the loop alive */ +} + +void li_job_queue_clear(liJobQueue *jq) { + while (jq->queue.length > 0 || g_async_queue_length(jq->async_queue) > 0) { + liJobRef *jobref; + + while (NULL != (jobref = g_async_queue_try_pop(jq->async_queue))) { + li_job_now_ref(jobref); + li_job_ref_release(jobref); + } + + job_queue_run(jq, 1); + } + + g_async_queue_unref(jq->async_queue); + jq->async_queue = NULL; + + li_ev_safe_ref_and_stop(ev_async_stop, jq->loop, &jq->async_queue_watcher); + li_ev_safe_ref_and_stop(ev_prepare_stop, jq->loop, &jq->prepare_watcher); + ev_timer_stop(jq->loop, &jq->queue_watcher); +} + +void li_job_init(liJob *job, liJobCB callback) { + job->generation = 0; + job->link.prev = job->link.next = job->link.data = 0; + job->callback = callback; + job->ref = 0; +} + +void li_job_reset(liJob *job) { + if (NULL != job->link.data) { + liJobQueue *jq = job->link.data; + + g_queue_unlink(&jq->queue, &job->link); + job->link.data = NULL; + } + + job->generation = 0; + if (NULL != job->ref) { + /* keep it if refcount == 1, as we are the only reference then */ + if (1 < g_atomic_int_get(&job->ref->refcount)) { + li_job_ref_release(job->ref); + job->ref = NULL; + } + } +} + +void li_job_clear(liJob *job) { + if (NULL != job->link.data) { + liJobQueue *jq = job->link.data; + + g_queue_unlink(&jq->queue, &job->link); + job->link.data = NULL; + } + + job->generation = 0; + if (NULL != job->ref) { + job->ref->job = NULL; + li_job_ref_release(job->ref); + job->ref = NULL; + } + + job->callback = NULL; +} + +void li_job_later(liJobQueue *jq, liJob *job) { + if (NULL != job->link.data) return; /* already queued */ + + job->link.data = jq; + g_queue_push_tail_link(&jq->queue, &job->link); +} + +void li_job_later_ref(liJobRef *jobref) { + liJob *job = jobref->job; + + if (NULL != job) li_job_later(jobref->queue, job); +} + +void li_job_now(liJobQueue *jq, liJob *job) { + if (job->generation != jq->generation) { + job->generation = jq->generation; + + /* unqueue if queued */ + if (NULL != job->link.data) { + assert(jq == job->link.data); + g_queue_unlink(&jq->queue, &job->link); + job->link.data = NULL; + } + + job->callback(job); + } else { + li_job_later(jq, job); + } +} + +void li_job_now_ref(liJobRef *jobref) { + liJob *job = jobref->job; + + if (NULL != job) li_job_now(jobref->queue, job); +} + +void li_job_async(liJobRef *jobref) { + liJobQueue *jq = jobref->queue; + GAsyncQueue *const q = jq->async_queue; + if (NULL == q) return; + li_job_ref_acquire(jobref); + g_async_queue_push(q, jobref); + ev_async_send(jq->loop, &jq->async_queue_watcher); +} + +liJobRef* li_job_ref(liJobQueue *jq, liJob *job) { + liJobRef *ref = job->ref; + + if (NULL != ref) { + li_job_ref_acquire(ref); + return ref; + } + + ref = g_slice_new0(liJobRef); + ref->refcount = 2; /* job->ref + returned ref */ + ref->job = job; + ref->queue = jq; + job->ref = ref; + + return ref; +} + +void li_job_ref_release(liJobRef *jobref) { + g_assert(jobref->refcount > 0); + if (g_atomic_int_dec_and_test(&jobref->refcount)) { + g_slice_free(liJobRef, jobref); + } +} + +void li_job_ref_acquire(liJobRef *jobref) { + g_assert(jobref->refcount > 0); + g_atomic_int_inc(&jobref->refcount); +} diff --git a/src/common/wscript b/src/common/wscript index bccd442..a5937d7 100644 --- a/src/common/wscript +++ b/src/common/wscript @@ -23,6 +23,7 @@ def build(bld): encoding.c idlist.c ip_parsers.rl + jobqueue.c memcached.c mempool.c module.c