From 025f0b5f84cf8752a125e71dd07cb8e7a83f1dea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Tue, 30 Dec 2008 21:55:00 +0100 Subject: [PATCH] Removed HANDLER_FINISHED, implemented real job queue and added some basic balancer structs --- include/lighttpd/actions.h | 28 +++++++++++++++++++++- include/lighttpd/typedefs.h | 1 - include/lighttpd/virtualrequest.h | 2 ++ include/lighttpd/worker.h | 3 +++ src/CMakeLists.txt | 1 + src/actions.c | 34 ++++++++++++++++++++++---- src/connection.c | 7 +++--- src/filter_chunked.c | 2 +- src/modules/mod_balancer.c | 40 +++++++++++++++++++++++++++++++ src/virtualrequest.c | 27 ++++++++++++++------- src/worker.c | 21 ++++++++++++++++ 11 files changed, 145 insertions(+), 21 deletions(-) create mode 100644 src/modules/mod_balancer.c diff --git a/include/lighttpd/actions.h b/include/lighttpd/actions.h index b86e34f..d82b5c0 100644 --- a/include/lighttpd/actions.h +++ b/include/lighttpd/actions.h @@ -10,11 +10,19 @@ typedef enum { ACTION_TSETTING, ACTION_TFUNCTION, ACTION_TCONDITION, - ACTION_TLIST + ACTION_TLIST, + ACTION_TBALANCER } action_type; +typedef enum { + BACKEND_OVERLOAD, + BACKEND_DEAD +} backend_error; + struct action_stack { GArray* stack; + gboolean handle_backend_fail; + backend_error backend_error; }; /* param is the param registered with the callbacks; @@ -35,6 +43,22 @@ struct action_func { }; typedef struct action_func action_func; + +typedef handler_t (*BackendSelect)(vrequest *vr, gpointer param, gpointer *context); +typedef handler_t (*BackendFallback)(vrequest *vr, gpointer param, gpointer *context); +typedef handler_t (*BackendFinished)(vrequest *vr, gpointer param, gpointer context); +typedef handler_t (*BalancerFree)(server *srv, gpointer param); +struct balancer_func { + BackendSelect select; + BackendFallback fallback; + BackendFinished finished; + BalancerFree free; + gpointer param; + gboolean provide_backlog; +}; +typedef struct balancer_func balancer_func; + + struct action { gint refcount; action_type type; @@ -51,6 +75,8 @@ struct action { action_func function; GArray* list; /** array of (action*) */ + + balancer_func balancer; } data; }; diff --git a/include/lighttpd/typedefs.h b/include/lighttpd/typedefs.h index cda39b5..99f2a2d 100644 --- a/include/lighttpd/typedefs.h +++ b/include/lighttpd/typedefs.h @@ -8,7 +8,6 @@ typedef enum { typedef enum { HANDLER_GO_ON, - HANDLER_FINISHED, HANDLER_COMEBACK, HANDLER_WAIT_FOR_EVENT, HANDLER_ERROR, diff --git a/include/lighttpd/virtualrequest.h b/include/lighttpd/virtualrequest.h index 85956e7..708bf65 100644 --- a/include/lighttpd/virtualrequest.h +++ b/include/lighttpd/virtualrequest.h @@ -75,6 +75,8 @@ struct vrequest { action_stack action_stack; gboolean actions_wait_for_response; + + GList *job_queue_link; }; LI_API vrequest* vrequest_new(struct connection *con, vrequest_handler handle_response_headers, vrequest_handler handle_response_body, vrequest_handler handle_response_error, vrequest_handler handle_request_headers); diff --git a/include/lighttpd/worker.h b/include/lighttpd/worker.h index a654f4e..0be170c 100644 --- a/include/lighttpd/worker.h +++ b/include/lighttpd/worker.h @@ -85,6 +85,9 @@ struct worker { /* collect framework */ ev_async collect_watcher; GAsyncQueue *collect_queue; + + GQueue job_queue; + ev_timer job_queue_watcher; }; LI_API worker* worker_new(struct server *srv, struct ev_loop *loop); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d894850..98d8555 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -336,6 +336,7 @@ SET(COMMON_CFLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS ADD_AND_INSTALL_LIBRARY(mod_fortune "modules/mod_fortune.c") ADD_AND_INSTALL_LIBRARY(mod_status "modules/mod_status.c") +ADD_AND_INSTALL_LIBRARY(mod_balancer "modules/mod_balancer.c") ADD_TARGET_PROPERTIES(lighttpd LINK_FLAGS ${COMMON_LDFLAGS}) ADD_TARGET_PROPERTIES(lighttpd COMPILE_FLAGS ${COMMON_CFLAGS}) diff --git a/src/actions.c b/src/actions.c index 0ce2d0e..d977748 100644 --- a/src/actions.c +++ b/src/actions.c @@ -99,8 +99,20 @@ action *action_new_condition(condition *cond, action *target, action *target_els static void action_stack_element_release(server *srv, vrequest *vr, action_stack_element *ase) { action *a = ase->act; if (!ase || !a) return; - if (a->type == ACTION_TFUNCTION && ase->data.context && a->data.function.cleanup) { - a->data.function.cleanup(vr, a->data.function.param, ase->data.context); + switch (a->type) { + case ACTION_TSETTING: + break; + case ACTION_TFUNCTION: + if (ase->data.context && a->data.function.cleanup) { + a->data.function.cleanup(vr, a->data.function.param, ase->data.context); + } + break; + case ACTION_TCONDITION: + case ACTION_TLIST: + break; + case ACTION_TBALANCER: + a->data.balancer.finished(vr, a->data.balancer.param, ase->data.context); + break; } action_release(srv, ase->act); ase->act = NULL; @@ -175,7 +187,6 @@ handler_t action_execute(vrequest *vr) { res = a->data.function.func(vr, a->data.function.param, &ase->data.context); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: ase->finished = TRUE; break; case HANDLER_ERROR: @@ -191,7 +202,6 @@ handler_t action_execute(vrequest *vr) { res = condition_check(vr, a->data.condition.cond, &condres); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: action_stack_pop(srv, vr, as); if (condres) { action_enter(vr, a->data.condition.target); @@ -216,7 +226,21 @@ handler_t action_execute(vrequest *vr) { ase->data.pos++; } break; + case ACTION_TBALANCER: + res = a->data.balancer.select(vr, a->data.balancer.param, &ase->data.context); + switch (res) { + case HANDLER_GO_ON: + ase->finished = TRUE; + break; + case HANDLER_ERROR: + action_stack_reset(vr, as); + case HANDLER_COMEBACK: + case HANDLER_WAIT_FOR_EVENT: + case HANDLER_WAIT_FOR_FD: + return res; + } + break; } } - return HANDLER_FINISHED; + return HANDLER_GO_ON; } diff --git a/src/connection.c b/src/connection.c index 3064a69..7893378 100644 --- a/src/connection.c +++ b/src/connection.c @@ -140,7 +140,6 @@ static gboolean connection_handle_read(connection *con) { VR_DEBUG(vr, "%s", "reading request header"); } switch(http_request_parse(con->mainvr, &con->req_parser_ctx)) { - case HANDLER_FINISHED: case HANDLER_GO_ON: break; /* go on */ case HANDLER_WAIT_FOR_EVENT: @@ -271,7 +270,7 @@ static handler_t mainvr_handle_response_headers(vrequest *vr) { static handler_t mainvr_handle_response_body(vrequest *vr) { connection *con = vr->con; - if (check_response_done(con)) return HANDLER_FINISHED; + if (check_response_done(con)) return HANDLER_GO_ON; if (CORE_OPTION(CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { VR_DEBUG(vr, "%s", "write response"); @@ -280,14 +279,14 @@ static handler_t mainvr_handle_response_body(vrequest *vr) { parse_request_body(con); forward_response_body(con); - if (check_response_done(con)) return HANDLER_FINISHED; + if (check_response_done(con)) return HANDLER_GO_ON; return HANDLER_GO_ON; } static handler_t mainvr_handle_response_error(vrequest *vr) { connection_internal_error(vr->con); - return HANDLER_FINISHED; + return HANDLER_GO_ON; } static handler_t mainvr_handle_request_headers(vrequest *vr) { diff --git a/src/filter_chunked.c b/src/filter_chunked.c index 5a72f74..aac13bc 100644 --- a/src/filter_chunked.c +++ b/src/filter_chunked.c @@ -37,7 +37,7 @@ handler_t filter_chunked_encode(connection *con, chunkqueue *out, chunkqueue *in chunkqueue_append_mem(out, CONST_STR_LEN("0\r\n")); out->is_closed = TRUE; } - return HANDLER_FINISHED; + return HANDLER_GO_ON; } return HANDLER_GO_ON; } diff --git a/src/modules/mod_balancer.c b/src/modules/mod_balancer.c new file mode 100644 index 0000000..15f00ac --- /dev/null +++ b/src/modules/mod_balancer.c @@ -0,0 +1,40 @@ + +#include + +static const plugin_option options[] = { + { NULL, 0, NULL, NULL, NULL } +}; + +static const plugin_action actions[] = { +// { "balancer.rr", status_page }, + { NULL, NULL } +}; + +static const plugin_setup setups[] = { + { NULL, NULL } +}; + + +static void plugin_init(server *srv, plugin *p) { + UNUSED(srv); + + p->options = options; + p->actions = actions; + p->setups = setups; +} + + +LI_API gboolean mod_balancer_init(modules *mods, module *mod) { + MODULE_VERSION_CHECK(mods); + + mod->config = plugin_register(mods->main, "mod_balancer", plugin_init); + + return mod->config != NULL; +} + +LI_API gboolean mod_balancer_free(modules *mods, module *mod) { + if (mod->config) + plugin_free(mods->main, mod->config); + + return TRUE; +} diff --git a/src/virtualrequest.c b/src/virtualrequest.c index 3fe9a92..bfefdbd 100644 --- a/src/virtualrequest.c +++ b/src/virtualrequest.c @@ -77,6 +77,11 @@ void vrequest_free(vrequest* vr) { action_stack_clear(vr, &vr->action_stack); + if (vr->job_queue_link) { + g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link); + vr->job_queue_link = NULL; + } + g_slice_free1(vr->con->srv->option_def_values->len * sizeof(option_value), vr->options); g_slice_free(vrequest, vr); @@ -96,6 +101,11 @@ void vrequest_reset(vrequest *vr) { action_stack_reset(vr, &vr->action_stack); + if (vr->job_queue_link) { + g_queue_delete_link(&vr->con->wrk->job_queue, vr->job_queue_link); + vr->job_queue_link = NULL; + } + memcpy(vr->options, vr->con->srv->option_def_values->data, vr->con->srv->option_def_values->len * sizeof(option_value)); } @@ -161,7 +171,6 @@ static gboolean vrequest_do_handle_actions(vrequest *vr) { handler_t res = action_execute(vr); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: if (vr->state == VRS_HANDLE_REQUEST_HEADERS) { VR_ERROR(vr, "%s", "actions didn't handle request"); /* request not handled */ @@ -192,7 +201,6 @@ static gboolean vrequest_do_handle_read(vrequest *vr) { res = vr->handle_request_body(vr); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: break; case HANDLER_COMEBACK: vrequest_joblist_append(vr); /* come back later */ @@ -218,7 +226,6 @@ static gboolean vrequest_do_handle_write(vrequest *vr) { res = vr->handle_response_body(vr); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: break; case HANDLER_COMEBACK: vrequest_joblist_append(vr); /* come back later */ @@ -250,7 +257,6 @@ void vrequest_state_machine(vrequest *vr) { res = vr->handle_request_headers(vr); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: break; case HANDLER_COMEBACK: vrequest_joblist_append(vr); /* come back later */ @@ -258,7 +264,7 @@ void vrequest_state_machine(vrequest *vr) { break; case HANDLER_WAIT_FOR_FD: /* TODO: wait for fd */ case HANDLER_WAIT_FOR_EVENT: - done = TRUE; + done = (vr->state == VRS_HANDLE_REQUEST_HEADERS); break; case HANDLER_ERROR: vrequest_error(vr); @@ -275,7 +281,6 @@ void vrequest_state_machine(vrequest *vr) { res = vr->handle_response_headers(vr); switch (res) { case HANDLER_GO_ON: - case HANDLER_FINISHED: vr->state = VRS_WRITE_CONTENT; break; case HANDLER_COMEBACK: @@ -284,7 +289,7 @@ void vrequest_state_machine(vrequest *vr) { break; case HANDLER_WAIT_FOR_FD: /* TODO: wait for fd */ case HANDLER_WAIT_FOR_EVENT: - done = TRUE; + done = (vr->state == VRS_HANDLE_REQUEST_HEADERS); break; case HANDLER_ERROR: vrequest_error(vr); @@ -306,6 +311,10 @@ void vrequest_state_machine(vrequest *vr) { } void vrequest_joblist_append(vrequest *vr) { - /* TODO */ - vrequest_state_machine(vr); + GQueue *const q = &vr->con->wrk->job_queue; + worker *wrk = vr->con->wrk; + if (vr->job_queue_link) return; /* already in queue */ + g_queue_push_tail(q, vr); + vr->job_queue_link = g_queue_peek_tail_link(q); + ev_timer_start(wrk->loop, &wrk->job_queue_watcher); } diff --git a/src/worker.c b/src/worker.c index 3bd6a72..f251eed 100644 --- a/src/worker.c +++ b/src/worker.c @@ -135,6 +135,22 @@ static void worker_throttle_cb(struct ev_loop *loop, ev_timer *w, int revents) { waitqueue_update(&wrk->throttle_queue); } +/* run vreqest state machine */ +static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) { + worker *wrk = (worker*) w->data; + GQueue q = wrk->job_queue; + vrequest *vr; + UNUSED(loop); + UNUSED(revents); + + g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */ + + while (NULL != (vr = g_queue_pop_head(&q))) { + vr->job_queue_link = NULL; + vrequest_state_machine(vr); + } +} + /* cache timestamp */ GString *worker_current_timestamp(worker *wrk) { time_t cur_ts = (time_t)CUR_TS(wrk); @@ -299,6 +315,11 @@ worker* worker_new(struct server *srv, struct ev_loop *loop) { waitqueue_init(&wrk->throttle_queue, wrk->loop, worker_throttle_cb, 0.5, wrk); ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */ + /* job queue */ + g_queue_init(&wrk->job_queue); + ev_timer_init(&wrk->job_queue_watcher, worker_job_queue_cb, 0, 0); + wrk->job_queue_watcher.data = wrk; + return wrk; }