Completed core balancer support, added a simple test balancer.

personal/stbuehler/wip
Stefan Bühler 15 years ago
parent fa25947b7e
commit 6636919623

@ -21,7 +21,7 @@ typedef enum {
struct action_stack {
GArray* stack;
gboolean handle_backend_fail;
gboolean backend_failed;
backend_error backend_error;
};
@ -44,10 +44,10 @@ struct action_func {
typedef struct action_func action_func;
typedef handler_t (*BackendSelect)(vrequest *vr, gpointer param, gpointer *context);
typedef handler_t (*BackendFallback)(vrequest *vr, gpointer param, gpointer *context);
typedef handler_t (*BackendSelect)(vrequest *vr, gboolean backlog_provided, gpointer param, gpointer *context);
typedef handler_t (*BackendFallback)(vrequest *vr, gboolean backlog_provided, gpointer param, gpointer *context, backend_error error);
typedef handler_t (*BackendFinished)(vrequest *vr, gpointer param, gpointer context);
typedef handler_t (*BalancerFree)(server *srv, gpointer param);
typedef void (*BalancerFree)(server *srv, gpointer param);
struct balancer_func {
BackendSelect select;
BackendFallback fallback;
@ -97,5 +97,6 @@ LI_API action *action_new_setting(option_set setting);
LI_API action *action_new_function(ActionFunc func, ActionCleanup fcleanup, ActionFree ffree, gpointer param);
LI_API action *action_new_list();
LI_API action *action_new_condition(condition *cond, action *target, action *target_else);
LI_API action *action_new_balancer(BackendSelect bselect, BackendFallback bfallback, BackendFinished bfinished, BalancerFree bfree, gpointer param, gboolean provide_backlog);
#endif

@ -79,6 +79,16 @@ struct vrequest {
GList *job_queue_link;
};
#define VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr) \
do { \
if (vr->state == VRS_HANDLE_REQUEST_HEADERS) { \
VR_ERROR(vr, "%s", "Cannot wait for response headers as no backend handler found - fix your config"); \
return HANDLER_ERROR; \
} else if (vr->state < VRS_HANDLE_RESPONSE_HEADERS) { \
return HANDLER_WAIT_FOR_EVENT; \
} \
} while (0)
LI_API vrequest* vrequest_new(struct connection *con, vrequest_handler handle_response_headers, vrequest_handler handle_response_body, vrequest_handler handle_response_error, vrequest_handler handle_request_headers);
LI_API void vrequest_free(vrequest *vr);
LI_API void vrequest_reset(vrequest *vr);

@ -10,7 +10,7 @@ struct action_stack_element {
gpointer context;
guint pos;
} data;
gboolean finished;
gboolean finished, backlog_provided;
};
void action_release(server *srv, action *a) {
@ -38,6 +38,11 @@ void action_release(server *srv, action *a) {
}
g_array_free(a->data.list, TRUE);
break;
case ACTION_TBALANCER:
if (a->data.balancer.free) {
a->data.balancer.free(srv, a->data.balancer.param);
}
break;
}
g_slice_free(action, a);
}
@ -96,6 +101,22 @@ action *action_new_condition(condition *cond, action *target, action *target_els
return a;
}
action *action_new_balancer(BackendSelect bselect, BackendFallback bfallback, BackendFinished bfinished, BalancerFree bfree, gpointer param, gboolean provide_backlog) {
action *a;
a = g_slice_new(action);
a->refcount = 1;
a->type = ACTION_TBALANCER;
a->data.balancer.select = bselect;
a->data.balancer.fallback = bfallback;
a->data.balancer.finished = bfinished;
a->data.balancer.free = bfree;
a->data.balancer.param = param;
a->data.balancer.provide_backlog = provide_backlog;
return a;
}
static void action_stack_element_release(server *srv, vrequest *vr, action_stack_element *ase) {
action *a = ase->act;
if (!ase || !a) return;
@ -142,15 +163,18 @@ void action_stack_clear(vrequest *vr, action_stack *as) {
as->stack = NULL;
}
static action_stack_element *action_stack_top(action_stack* as) {
return as->stack->len > 0 ? &g_array_index(as->stack, action_stack_element, as->stack->len - 1) : NULL;
}
/** handle sublist now, remember current position (stack) */
void action_enter(vrequest *vr, action *a) {
action_stack *as = &vr->action_stack;
action_stack_element *top_ase = action_stack_top(as);
action_stack_element ase = { a, { 0 }, FALSE,
(top_ase ? top_ase->backlog_provided || (top_ase->act->type == ACTION_TBALANCER && top_ase->act->data.balancer.provide_backlog) : FALSE) };
action_acquire(a);
action_stack_element ase = { a, { 0 }, FALSE };
g_array_append_val(vr->action_stack.stack, ase);
}
static action_stack_element *action_stack_top(action_stack* as) {
return as->stack->len > 0 ? &g_array_index(as->stack, action_stack_element, as->stack->len - 1) : NULL;
g_array_append_val(as->stack, ase);
}
static void action_stack_pop(server *srv, vrequest *vr, action_stack *as) {
@ -167,10 +191,47 @@ handler_t action_execute(vrequest *vr) {
server *srv = vr->con->srv;
while (NULL != (ase = action_stack_top(as))) {
if (as->backend_failed) {
while (ase->act->type != ACTION_TBALANCER || !ase->act->data.balancer.provide_backlog) {
action_stack_pop(srv, vr, as);
ase = action_stack_top(as);
if (!ase) { /* no backlogging balancer found */
if (vrequest_handle_direct(vr))
vr->response.http_status = 503;
return HANDLER_GO_ON;
}
}
as->backend_failed = FALSE;
ase->finished = FALSE;
res = a->data.balancer.fallback(vr, ase->backlog_provided, a->data.balancer.param, &ase->data.context, as->backend_error);
switch (res) {
case HANDLER_GO_ON:
ase->finished = TRUE;
break;
case HANDLER_ERROR:
action_stack_reset(vr, as);
case HANDLER_COMEBACK:
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_WAIT_FOR_FD:
return res;
}
if (as->backend_failed) { /* if balancer failed, pop it */
action_stack_element *tmp_ase;
while (ase != (tmp_ase = action_stack_top(as))) {
action_stack_pop(srv, vr, as);
}
}
continue;
}
if (ase->finished) {
/* a TFUNCTION may enter sub actions _and_ return GO_ON, so we cannot pop the last element
* but we have to remember we already executed it
*/
if (ase->act->type == ACTION_TBALANCER) {
/* wait until we found a backend */
VREQUEST_WAIT_FOR_RESPONSE_HEADERS(vr);
}
action_stack_pop(srv, vr, as);
continue;
}
@ -227,7 +288,7 @@ handler_t action_execute(vrequest *vr) {
}
break;
case ACTION_TBALANCER:
res = a->data.balancer.select(vr, a->data.balancer.param, &ase->data.context);
res = a->data.balancer.select(vr, ase->backlog_provided, a->data.balancer.param, &ase->data.context);
switch (res) {
case HANDLER_GO_ON:
ase->finished = TRUE;
@ -242,5 +303,9 @@ handler_t action_execute(vrequest *vr) {
break;
}
}
if (as->backend_failed) {
if (vrequest_handle_direct(vr))
vr->response.http_status = 503;
}
return HANDLER_GO_ON;
}

@ -1,12 +1,161 @@
#include <lighttpd/base.h>
typedef enum {
BE_ALIVE,
BE_OVERLOADED,
BE_DOWN,
BE_DOWN_RETRY
} backend_state;
typedef enum {
BAL_ALIVE,
BAL_OVERLOADED,
BAL_DOWN
} balancer_state;
struct backend {
action *act;
guint load;
backend_state state;
};
typedef struct backend backend;
struct balancer {
GArray *backends;
balancer_state state;
};
typedef struct balancer balancer;
static balancer* balancer_new() {
balancer *b = g_slice_new(balancer);
b->backends = g_array_new(FALSE, TRUE, sizeof(backend));
return b;
}
static void balancer_free(server *srv, balancer *b) {
guint i;
if (!b) return;
for (i = 0; i < b->backends->len; i++) {
backend *be = &g_array_index(b->backends, backend, i);
action_release(srv, be->act);
}
g_array_free(b->backends, TRUE);
g_slice_free(balancer, b);
}
static gboolean balancer_fill_backends(balancer *b, server *srv, value *val) {
if (val->type == VALUE_ACTION) {
backend be = { val->data.val_action.action, 0, BE_ALIVE };
assert(srv == val->data.val_action.srv);
action_acquire(be.act);
g_array_append_val(b->backends, be);
return TRUE;
} else if (val->type == VALUE_LIST) {
guint i;
if (val->data.list->len == 0) {
ERROR(srv, "%s", "expected non-empty list");
return FALSE;
}
for (i = 0; i < val->data.list->len; i++) {
value *oa = g_array_index(val->data.list, value*, i);
if (oa->type != VALUE_ACTION) {
ERROR(srv, "expected action at entry %u of list, got %s", i, value_type_string(oa->type));
return FALSE;
}
assert(srv == oa->data.val_action.srv);
backend be = { oa->data.val_action.action, 0, BE_ALIVE };
action_acquire(be.act);
g_array_append_val(b->backends, be);
}
return TRUE;
} else {
ERROR(srv, "expected list, got %s", value_type_string(val->type));
return FALSE;
}
}
static handler_t balancer_act_select(vrequest *vr, gboolean backlog_provided, gpointer param, gpointer *context) {
balancer *b = (balancer*) param;
gint be_ndx = 0;
backend *be = &g_array_index(b->backends, backend, be_ndx);
UNUSED(backlog_provided);
/* TODO implement some selection algorithms */
be->load++;
action_enter(vr, be->act);
*context = GINT_TO_POINTER(be_ndx);
return HANDLER_GO_ON;
}
static handler_t balancer_act_fallback(vrequest *vr, gboolean backlog_provided, gpointer param, gpointer *context, backend_error error) {
balancer *b = (balancer*) param;
gint be_ndx = GPOINTER_TO_INT(context);
backend *be = &g_array_index(b->backends, backend, be_ndx);
UNUSED(backlog_provided);
UNUSED(error);
if (be_ndx < 0) return HANDLER_GO_ON;
/* TODO implement fallback/backlog */
be->load--;
if (vrequest_handle_direct(vr))
vr->response.http_status = 503;
return HANDLER_GO_ON;
}
static handler_t balancer_act_finished(vrequest *vr, gpointer param, gpointer context) {
balancer *b = (balancer*) param;
gint be_ndx = GPOINTER_TO_INT(context);
backend *be = &g_array_index(b->backends, backend, be_ndx);
UNUSED(vr);
if (be_ndx < 0) return HANDLER_GO_ON;
/* TODO implement backlog */
be->load--;
return HANDLER_GO_ON;
}
static void balancer_act_free(server *srv, gpointer param) {
balancer_free(srv, (balancer*) param);
}
static action* balancer_rr(server *srv, plugin* p, value *val) {
balancer *b;
action *a;
UNUSED(p);
if (!val) {
ERROR(srv, "%s", "need parameter");
return NULL;
}
b = balancer_new();
if (!balancer_fill_backends(b, srv, val)) {
balancer_free(srv, b);
return NULL;
}
a = action_new_balancer(balancer_act_select, balancer_act_fallback, balancer_act_finished, balancer_act_free, b, TRUE);
return a;
}
static const plugin_option options[] = {
{ NULL, 0, NULL, NULL, NULL }
};
static const plugin_action actions[] = {
// { "balancer.rr", status_page },
{ "balancer.rr", balancer_rr },
{ NULL, NULL }
};

@ -257,6 +257,11 @@ void vrequest_state_machine(vrequest *vr) {
res = vr->handle_request_headers(vr);
switch (res) {
case HANDLER_GO_ON:
if (vr->state == VRS_HANDLE_REQUEST_HEADERS) {
/* unhandled request */
vr->response.http_status = 404;
vrequest_handle_direct(vr);
}
break;
case HANDLER_COMEBACK:
vrequest_joblist_append(vr); /* come back later */

Loading…
Cancel
Save