2
0
Fork 0

Added vrequest_joblist_append_async (with a vrequest-reference handling)

This commit is contained in:
Stefan Bühler 2009-04-14 18:18:25 +02:00
parent f07f4d4971
commit 7aaa764f49
5 changed files with 120 additions and 15 deletions

View File

@ -196,6 +196,9 @@ typedef enum {
struct vrequest;
typedef struct vrequest vrequest;
struct vrequest_ref;
typedef struct vrequest_ref vrequest_ref;
struct filter;
typedef struct filter filter;

View File

@ -50,9 +50,14 @@ struct filters {
guint skip_ndx;
};
struct connection;
struct vrequest_ref {
guint refcount;
vrequest *vr; /* This is only accesible by the worker thread the vrequest belongs to, and it may be NULL if the vrequest is already reset */
};
struct vrequest {
struct connection *con;
connection *con;
vrequest_ref *ref;
option_value *options;
@ -82,7 +87,8 @@ struct vrequest {
action_stack action_stack;
gboolean actions_wait_for_response;
GList *job_queue_link;
gint queued;
GList job_queue_link;
GPtrArray *stat_cache_entries;
};
@ -101,6 +107,9 @@ LI_API vrequest* vrequest_new(struct connection *con, vrequest_handler handle_re
LI_API void vrequest_free(vrequest *vr);
LI_API void vrequest_reset(vrequest *vr);
LI_API vrequest_ref* vrequest_acquire_ref(vrequest *vr);
LI_API vrequest* vrequest_release_ref(vrequest_ref *vr_ref);
LI_API void vrequest_add_filter_in(vrequest *vr, filter_handler handle_data, filter_free handle_free, gpointer param);
LI_API void vrequest_add_filter_out(vrequest *vr, filter_handler handle_data, filter_free handle_free, gpointer param);
@ -128,6 +137,7 @@ LI_API gboolean vrequest_is_handled(vrequest *vr);
LI_API void vrequest_state_machine(vrequest *vr);
LI_API void vrequest_joblist_append(vrequest *vr);
LI_API void vrequest_joblist_append_async(vrequest *vr);
LI_API gboolean vrequest_stat(vrequest *vr);

View File

@ -96,6 +96,9 @@ struct worker {
GQueue job_queue;
ev_timer job_queue_watcher;
GAsyncQueue *job_async_queue;
ev_async job_async_queue_watcher;
stat_cache *stat_cache;
};

View File

@ -103,6 +103,9 @@ vrequest* vrequest_new(connection *con, vrequest_handler handle_response_headers
vrequest *vr = g_slice_new0(vrequest);
vr->con = con;
vr->ref = g_slice_new0(vrequest_ref);
vr->ref->refcount = 1;
vr->ref->vr = vr;
vr->state = VRS_CLEAN;
vr->handle_response_headers = handle_response_headers;
@ -128,6 +131,8 @@ vrequest* vrequest_new(connection *con, vrequest_handler handle_response_headers
vr->stat_cache_entries = g_ptr_array_sized_new(2);
vr->job_queue_link.data = vr;
action_stack_init(&vr->action_stack);
return vr;
@ -148,9 +153,9 @@ void vrequest_free(vrequest* vr) {
filters_clean(vr, &vr->filters_in);
filters_clean(vr, &vr->filters_out);
if (vr->job_queue_link) {
g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link);
vr->job_queue_link = NULL;
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->con->wrk->job_queue, &vr->job_queue_link);
g_atomic_int_set(&vr->queued, 0);
}
g_slice_free1(vr->con->srv->option_def_values->len * sizeof(option_value), vr->options);
@ -162,6 +167,11 @@ void vrequest_free(vrequest* vr) {
}
g_ptr_array_free(vr->stat_cache_entries, TRUE);
vr->ref->vr = NULL;
if (g_atomic_int_dec_and_test(&vr->ref->refcount)) {
g_slice_free(vrequest_ref, vr->ref);
}
g_slice_free(vrequest, vr);
}
@ -188,9 +198,9 @@ void vrequest_reset(vrequest *vr) {
filters_reset(vr, &vr->filters_in);
filters_reset(vr, &vr->filters_out);
if (vr->job_queue_link) {
g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link);
vr->job_queue_link = NULL;
if (g_atomic_int_get(&vr->queued)) { /* atomic access shouldn't be needed here; no one else can access vr here... */
g_queue_unlink(&vr->con->wrk->job_queue, &vr->job_queue_link);
g_atomic_int_set(&vr->queued, 0);
}
for (i = 0; i < vr->stat_cache_entries->len; i++) {
@ -199,6 +209,34 @@ void vrequest_reset(vrequest *vr) {
}
memcpy(vr->options, vr->con->srv->option_def_values->data, vr->con->srv->option_def_values->len * sizeof(option_value));
if (1 != g_atomic_int_get(&vr->ref->refcount)) {
/* If we are not the only user of vr->ref we have to get a new one and detach the old */
vr->ref->vr = NULL;
if (g_atomic_int_dec_and_test(&vr->ref->refcount)) {
g_slice_free(vrequest_ref, vr->ref);
}
vr->ref = g_slice_new0(vrequest_ref);
vr->ref->refcount = 1;
vr->ref->vr = vr;
}
}
vrequest_ref* vrequest_acquire_ref(vrequest *vr) {
vrequest_ref* vr_ref = vr->ref;
g_assert(vr_ref->refcount > 0);
g_atomic_int_inc(&vr_ref->refcount);
return vr_ref;
}
vrequest* vrequest_release_ref(vrequest_ref *vr_ref) {
vrequest *vr = vr_ref->vr;
g_assert(vr_ref->refcount > 0);
if (g_atomic_int_dec_and_test(&vr_ref->refcount)) {
g_assert(vr == NULL); /* we are the last user, and the ref holded by vr itself is handled extra, so the vr was already reset */
g_slice_free(vrequest_ref, vr_ref);
}
return vr;
}
void vrequest_error(vrequest *vr) {
@ -432,12 +470,19 @@ void vrequest_state_machine(vrequest *vr) {
void vrequest_joblist_append(vrequest *vr) {
GQueue *const q = &vr->con->wrk->job_queue;
worker *wrk = vr->con->wrk;
if (vr->job_queue_link) return; /* already in queue */
g_queue_push_tail(q, vr);
vr->job_queue_link = g_queue_peek_tail_link(q);
if (!g_atomic_int_compare_and_exchange(&vr->queued, 0, 1)) return; /* already in queue */
g_queue_push_tail_link(q, &vr->job_queue_link);
ev_timer_start(wrk->loop, &wrk->job_queue_watcher);
}
void vrequest_joblist_append_async(vrequest *vr) {
GAsyncQueue *const q = vr->con->wrk->job_async_queue;
worker *wrk = vr->con->wrk;
if (!g_atomic_int_compare_and_exchange(&vr->queued, 0, 1)) return; /* already in queue */
g_async_queue_push(q, vrequest_acquire_ref(vr));
ev_async_send(wrk->loop, &wrk->job_async_queue_watcher);
}
gboolean vrequest_stat(vrequest *vr) {
/* Do not stat again */
if (vr->physical.have_stat || vr->physical.have_errno) return vr->physical.have_stat;

View File

@ -139,18 +139,37 @@ static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) {
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker *wrk = (worker*) w->data;
GQueue q = wrk->job_queue;
GList *l;
vrequest *vr;
UNUSED(loop);
UNUSED(revents);
g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */
while (NULL != (vr = g_queue_pop_head(&q))) {
vr->job_queue_link = NULL;
while (NULL != (l = g_queue_pop_head_link(&q))) {
vr = l->data;
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
vrequest_state_machine(vr);
}
}
/* run vreqest state machine for async queued jobs */
static void worker_job_async_queue_cb(struct ev_loop *loop, ev_async *w, int revents) {
worker *wrk = (worker*) w->data;
GAsyncQueue *q = wrk->job_async_queue;
vrequest_ref *vr_ref;
vrequest *vr;
UNUSED(loop);
UNUSED(revents);
while (NULL != (vr_ref = g_async_queue_try_pop(q))) {
if (NULL != (vr = vrequest_release_ref(vr_ref))) {
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
vrequest_state_machine(vr);
}
}
}
/* cache timestamp */
GString *worker_current_timestamp(worker *wrk, guint format_ndx) {
@ -338,6 +357,12 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) {
ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0);
wrk->job_queue_watcher.data = wrk;
wrk->job_async_queue = g_async_queue_new();
ev_async_init(&wrk->job_async_queue_watcher, worker_job_async_queue_cb);
wrk->job_async_queue_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->job_async_queue_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
stat_cache_new(wrk, srv->stat_cache_ttl);
return wrk;
@ -370,7 +395,7 @@ void worker_free(worker *wrk) {
}
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->worker_exit_watcher);
ev_async_stop(wrk->loop, &wrk->job_async_queue_watcher);
{ /* free timestamps */
guint i;
@ -379,6 +404,25 @@ void worker_free(worker *wrk) {
g_array_free(wrk->timestamps, TRUE);
}
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->worker_exit_watcher);
{
GAsyncQueue *q = wrk->job_async_queue;
vrequest_ref *vr_ref;
vrequest *vr;
while (NULL != (vr_ref = g_async_queue_try_pop(q))) {
if (NULL != (vr = vrequest_release_ref(vr_ref))) {
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
vrequest_state_machine(vr);
}
}
g_async_queue_unref(q);
}
g_async_queue_unref(wrk->new_con_queue);
ev_ref(wrk->loop);