|
|
|
@ -31,8 +31,7 @@ LI_API gboolean mod_balancer_free(liModules *mods, liModule *mod);
|
|
|
|
|
typedef enum {
|
|
|
|
|
BE_ALIVE,
|
|
|
|
|
BE_OVERLOADED,
|
|
|
|
|
BE_DOWN,
|
|
|
|
|
BE_DOWN_RETRY
|
|
|
|
|
BE_DOWN
|
|
|
|
|
} backend_state;
|
|
|
|
|
|
|
|
|
|
typedef enum {
|
|
|
|
@ -41,22 +40,35 @@ typedef enum {
|
|
|
|
|
BAL_DOWN
|
|
|
|
|
} balancer_state;
|
|
|
|
|
|
|
|
|
|
typedef enum {
|
|
|
|
|
BM_SQF,
|
|
|
|
|
BM_ROUNDROBIN
|
|
|
|
|
} balancer_method;
|
|
|
|
|
|
|
|
|
|
struct backend {
|
|
|
|
|
liAction *act;
|
|
|
|
|
guint load;
|
|
|
|
|
backend_state state;
|
|
|
|
|
ev_tstamp wake;
|
|
|
|
|
};
|
|
|
|
|
typedef struct backend backend;
|
|
|
|
|
|
|
|
|
|
struct balancer {
|
|
|
|
|
GMutex *lock;
|
|
|
|
|
GArray *backends;
|
|
|
|
|
balancer_state state;
|
|
|
|
|
balancer_method method;
|
|
|
|
|
gint next_ndx;
|
|
|
|
|
};
|
|
|
|
|
typedef struct balancer balancer;
|
|
|
|
|
|
|
|
|
|
static balancer* balancer_new() {
|
|
|
|
|
static balancer* balancer_new(balancer_method method) {
|
|
|
|
|
balancer *b = g_slice_new(balancer);
|
|
|
|
|
b->lock = g_mutex_new();
|
|
|
|
|
b->backends = g_array_new(FALSE, TRUE, sizeof(backend));
|
|
|
|
|
b->method = method;
|
|
|
|
|
b->state = BAL_ALIVE;
|
|
|
|
|
b->next_ndx = 0;
|
|
|
|
|
|
|
|
|
|
return b;
|
|
|
|
|
}
|
|
|
|
@ -64,6 +76,7 @@ static balancer* balancer_new() {
|
|
|
|
|
static void balancer_free(liServer *srv, balancer *b) {
|
|
|
|
|
guint i;
|
|
|
|
|
if (!b) return;
|
|
|
|
|
g_mutex_free(b->lock);
|
|
|
|
|
for (i = 0; i < b->backends->len; i++) {
|
|
|
|
|
backend *be = &g_array_index(b->backends, backend, i);
|
|
|
|
|
li_action_release(srv, be->act);
|
|
|
|
@ -74,7 +87,7 @@ static void balancer_free(liServer *srv, balancer *b) {
|
|
|
|
|
|
|
|
|
|
static gboolean balancer_fill_backends(balancer *b, liServer *srv, liValue *val) {
|
|
|
|
|
if (val->type == LI_VALUE_ACTION) {
|
|
|
|
|
backend be = { val->data.val_action.action, 0, BE_ALIVE };
|
|
|
|
|
backend be = { val->data.val_action.action, 0, BE_ALIVE, 0 };
|
|
|
|
|
assert(srv == val->data.val_action.srv);
|
|
|
|
|
li_action_acquire(be.act);
|
|
|
|
|
g_array_append_val(b->backends, be);
|
|
|
|
@ -93,7 +106,7 @@ static gboolean balancer_fill_backends(balancer *b, liServer *srv, liValue *val)
|
|
|
|
|
}
|
|
|
|
|
assert(srv == oa->data.val_action.srv);
|
|
|
|
|
{
|
|
|
|
|
backend be = { oa->data.val_action.action, 0, BE_ALIVE };
|
|
|
|
|
backend be = { oa->data.val_action.action, 0, BE_ALIVE, 0 };
|
|
|
|
|
li_action_acquire(be.act);
|
|
|
|
|
g_array_append_val(b->backends, be);
|
|
|
|
|
}
|
|
|
|
@ -107,16 +120,72 @@ static gboolean balancer_fill_backends(balancer *b, liServer *srv, liValue *val)
|
|
|
|
|
|
|
|
|
|
static liHandlerResult balancer_act_select(liVRequest *vr, gboolean backlog_provided, gpointer param, gpointer *context) {
|
|
|
|
|
balancer *b = (balancer*) param;
|
|
|
|
|
gint be_ndx = 0;
|
|
|
|
|
backend *be = &g_array_index(b->backends, backend, be_ndx);
|
|
|
|
|
gint be_ndx, load;
|
|
|
|
|
guint i, j;
|
|
|
|
|
backend *be;
|
|
|
|
|
ev_tstamp now = ev_now(vr->wrk->loop);
|
|
|
|
|
gboolean all_dead = TRUE;
|
|
|
|
|
|
|
|
|
|
UNUSED(backlog_provided);
|
|
|
|
|
|
|
|
|
|
/* TODO implement some selection algorithms */
|
|
|
|
|
be_ndx = -1;
|
|
|
|
|
|
|
|
|
|
g_mutex_lock(b->lock);
|
|
|
|
|
|
|
|
|
|
switch (b->method) {
|
|
|
|
|
case BM_SQF:
|
|
|
|
|
load = -1;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < b->backends->len; i++) {
|
|
|
|
|
be = &g_array_index(b->backends, backend, i);
|
|
|
|
|
|
|
|
|
|
if (now >= be->wake) be->state = BE_ALIVE;
|
|
|
|
|
if (be->state != BE_DOWN) all_dead = FALSE;
|
|
|
|
|
if (be->state != BE_ALIVE) continue;
|
|
|
|
|
|
|
|
|
|
if (load == -1 || load > (gint) be->load) {
|
|
|
|
|
be_ndx = i;
|
|
|
|
|
load = be->load;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
case BM_ROUNDROBIN:
|
|
|
|
|
for (j = 0; j < b->backends->len; j++) {
|
|
|
|
|
i = (b->next_ndx + j) & b->backends->len;
|
|
|
|
|
be = &g_array_index(b->backends, backend, i);
|
|
|
|
|
|
|
|
|
|
if (now >= be->wake) be->state = BE_ALIVE;
|
|
|
|
|
if (be->state != BE_DOWN) all_dead = FALSE;
|
|
|
|
|
if (be->state != BE_ALIVE) continue;
|
|
|
|
|
|
|
|
|
|
be_ndx = i;
|
|
|
|
|
break; /* use first alive backend */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (-1 == be_ndx) {
|
|
|
|
|
/* Couldn't find a active backend */
|
|
|
|
|
if (all_dead) {
|
|
|
|
|
li_vrequest_backend_dead(vr);
|
|
|
|
|
} else {
|
|
|
|
|
li_vrequest_backend_overloaded(vr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_mutex_unlock(b->lock);
|
|
|
|
|
|
|
|
|
|
VR_ERROR(vr, "balancer select: %i", be_ndx);
|
|
|
|
|
return LI_HANDLER_GO_ON;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
be = &g_array_index(b->backends, backend, be_ndx);
|
|
|
|
|
be->load++;
|
|
|
|
|
|
|
|
|
|
g_mutex_unlock(b->lock);
|
|
|
|
|
|
|
|
|
|
VR_DEBUG(vr, "balancer select: %i", be_ndx);
|
|
|
|
|
|
|
|
|
|
li_action_enter(vr, be->act);
|
|
|
|
|
*context = GINT_TO_POINTER(be_ndx);
|
|
|
|
|
|
|
|
|
@ -135,11 +204,28 @@ static liHandlerResult balancer_act_fallback(liVRequest *vr, gboolean backlog_pr
|
|
|
|
|
|
|
|
|
|
VR_ERROR(vr, "balancer fallback: %i (error: %i)", be_ndx, error);
|
|
|
|
|
|
|
|
|
|
/* TODO implement fallback/backlog */
|
|
|
|
|
g_mutex_lock(b->lock);
|
|
|
|
|
|
|
|
|
|
be->load--;
|
|
|
|
|
|
|
|
|
|
if (error == BACKEND_OVERLOAD || be->load > 0) {
|
|
|
|
|
/* long timeout for overload - we will enable the backend anyway if another request finishs */
|
|
|
|
|
if (be->state == BE_ALIVE) be->wake = ev_now(vr->wrk->loop) + 5.0;
|
|
|
|
|
|
|
|
|
|
if (be->state != BE_DOWN) be->state = BE_OVERLOADED;
|
|
|
|
|
} else {
|
|
|
|
|
/* short timeout for dead backends - lets retry soon */
|
|
|
|
|
be->wake = ev_now(vr->wrk->loop) + 1.0;
|
|
|
|
|
|
|
|
|
|
be->state = BE_DOWN;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_mutex_unlock(b->lock);
|
|
|
|
|
|
|
|
|
|
*context = GINT_TO_POINTER(-1);
|
|
|
|
|
li_vrequest_backend_error(vr, error);
|
|
|
|
|
|
|
|
|
|
return balancer_act_select(vr, backlog_provided, param, context);
|
|
|
|
|
|
|
|
|
|
return LI_HANDLER_GO_ON;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -155,9 +241,16 @@ static liHandlerResult balancer_act_finished(liVRequest *vr, gpointer param, gpo
|
|
|
|
|
|
|
|
|
|
VR_ERROR(vr, "balancer finished: %i", be_ndx);
|
|
|
|
|
|
|
|
|
|
/* TODO implement backlog */
|
|
|
|
|
g_mutex_lock(b->lock);
|
|
|
|
|
|
|
|
|
|
/* TODO implement backlog */
|
|
|
|
|
be->load--;
|
|
|
|
|
|
|
|
|
|
/* reactivate it (if not alive), as it obviously isn't completely down */
|
|
|
|
|
be->state = BE_ALIVE;
|
|
|
|
|
|
|
|
|
|
g_mutex_unlock(b->lock);
|
|
|
|
|
|
|
|
|
|
return LI_HANDLER_GO_ON;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -165,7 +258,7 @@ static void balancer_act_free(liServer *srv, gpointer param) {
|
|
|
|
|
balancer_free(srv, (balancer*) param);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static liAction* balancer_rr(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
|
|
|
|
|
static liAction* balancer_create(liServer *srv, liWorker *wrk, liPlugin* p, liValue *val, gpointer userdata) {
|
|
|
|
|
balancer *b;
|
|
|
|
|
UNUSED(wrk); UNUSED(p); UNUSED(userdata);
|
|
|
|
|
|
|
|
|
@ -174,7 +267,8 @@ static liAction* balancer_rr(liServer *srv, liWorker *wrk, liPlugin* p, liValue
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b = balancer_new();
|
|
|
|
|
/* userdata contains the method */
|
|
|
|
|
b = balancer_new(GPOINTER_TO_INT(userdata));
|
|
|
|
|
if (!balancer_fill_backends(b, srv, val)) {
|
|
|
|
|
balancer_free(srv, b);
|
|
|
|
|
return NULL;
|
|
|
|
@ -189,7 +283,8 @@ static const liPluginOption options[] = {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static const liPluginAction actions[] = {
|
|
|
|
|
{ "balancer.rr", balancer_rr, NULL },
|
|
|
|
|
{ "balancer.rr", balancer_create, GINT_TO_POINTER(BM_ROUNDROBIN) },
|
|
|
|
|
{ "balancer.sqf", balancer_create, GINT_TO_POINTER(BM_SQF) },
|
|
|
|
|
{ NULL, NULL, NULL }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|