2
0
Fork 0

Add "state-wait" api for server state machine

personal/stbuehler/wip
Stefan Bühler 2010-02-13 15:29:21 +01:00
parent d18045ccbf
commit 30ebce774e
6 changed files with 138 additions and 21 deletions

View File

@ -17,6 +17,9 @@ typedef liAction*(*liPluginCreateActionCB) (liServer *srv, liPlugin *p, liValue
typedef gboolean (*liPluginSetupCB) (liServer *srv, liPlugin *p, liValue *val, gpointer userdata);
typedef void (*liPluginAngelCB) (liServer *srv, liPlugin *p, gint32 id, GString *data);
typedef void (*liPluginServerPrepareWorker)(liServer *srv, liPlugin *p, liWorker *wrk);
typedef void (*liPluginServerPrepare)(liServer *srv, liPlugin *p);
typedef void (*liPluginHandleCloseCB) (liConnection *con, liPlugin *p);
typedef liHandlerResult(*liPluginHandleVRequestCB)(liVRequest *vr, liPlugin *p);
typedef void (*liPluginHandleVRCloseCB) (liVRequest *vr, liPlugin *p);
@ -165,8 +168,6 @@ LI_API void li_plugins_stop_listen(liServer *srv); /* "prepare suspend", async *
LI_API void li_plugins_start_log(liServer *srv); /* "run" */
LI_API void li_plugins_stop_log(liServer *srv); /* "suspend now" */
LI_API void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state);
LI_API void li_plugins_handle_close(liConnection *con);
LI_API void li_plugins_handle_vrclose(liVRequest *vr);

View File

@ -17,6 +17,8 @@ typedef liNetworkStatus (*liConnectionWriteCB)(liConnection *con, goffset write_
typedef liNetworkStatus (*liConnectionReadCB)(liConnection *con);
typedef void (*liServerSocketReleaseCB)(liServerSocket *srv_sock);
typedef void (*liServerStateWaitCancelled)(liServer *srv, liServerStateWait *w);
typedef enum {
LI_SERVER_INIT, /** start state */
LI_SERVER_LOADING, /** config loaded, prepare listeing sockets/open log files */
@ -42,11 +44,24 @@ struct liServerSocket {
liServerSocketReleaseCB release_cb;
};
struct liServerStateWait {
GList queue_link;
gboolean active;
liServerStateWaitCancelled cancel_cb;
gpointer data;
};
struct liServer {
guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */
liServerState state, dest_state; /** atomic access */
liAngelConnection *acon;
/* state machine handling */
GMutex *statelock;
GQueue state_wait_queue;
liServerState state_wait_for;
ev_async state_ready_watcher;
GMutex *lualock;
struct lua_State *L; /** NULL if compiled without Lua */
@ -136,4 +151,10 @@ LI_API void li_server_socket_acquire(liServerSocket* sock);
LI_API void li_server_goto_state(liServer *srv, liServerState state);
LI_API void li_server_reached_state(liServer *srv, liServerState state);
/** threadsafe */
LI_API void li_server_state_ready(liServer *srv, liServerStateWait *sw);
/** only call from server state plugin hooks; push new wait condition to wait queue */
LI_API void li_server_state_wait(liServer *srv, liServerStateWait *sw);
#endif

View File

@ -201,6 +201,8 @@ typedef struct liResponse liResponse;
/* server.h */
typedef struct liServerStateWait liServerStateWait;
typedef struct liServer liServer;
typedef struct liServerSocket liServerSocket;

View File

@ -12,6 +12,8 @@
#include <fcntl.h>
#include <stdarg.h>
#define DEFAULT_TS_FORMAT "%d/%b/%Y %T %Z"
static void log_free_unlocked(liServer *srv, liLog *log);
static void log_thread_stop(liServer *srv);
static void log_thread_finish(liServer *srv);
@ -51,8 +53,10 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
ts = CORE_OPTIONPTR(LI_CORE_OPTION_LOG_TS_FORMAT).ptr;
}
else {
liOptionPtrValue *ologval;
ologval = g_array_index(srv->optionptr_def_values, liOptionPtrValue*, 0 + LI_CORE_OPTION_LOG);
liOptionPtrValue *ologval = NULL;
if (0 + LI_CORE_OPTION_LOG < srv->optionptr_def_values->len) {
ologval = g_array_index(srv->optionptr_def_values, liOptionPtrValue*, 0 + LI_CORE_OPTION_LOG);
}
if (ologval != NULL) logs = ologval->data.list;
}
@ -62,7 +66,7 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
return TRUE;*/
}
if (!ts) {
if (NULL == ts && 0 < srv->logs.timestamps->len) {
ts = g_array_index(srv->logs.timestamps, liLogTimestamp*, 0);
}
@ -105,6 +109,9 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
/* for normal error messages, we prepend a timestamp */
if (flags & LOG_FLAG_TIMESTAMP) {
time_t cur_ts;
liLogTimestamp fake_ts;
GString fake_ts_format;
GString *tmpstr = NULL;
g_mutex_lock(srv->logs.mutex);
@ -114,6 +121,14 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
else
cur_ts = time(NULL);
if (NULL == ts) {
ts = &fake_ts;
ts->last_ts = 0;
fake_ts_format = li_const_gstring(CONST_STR_LEN(DEFAULT_TS_FORMAT));
ts->format = &fake_ts_format;
ts->cached = tmpstr = g_string_sized_new(255);
}
if (cur_ts != ts->last_ts) {
gsize s;
#ifdef HAVE_LOCALTIME_R
@ -138,6 +153,8 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
g_string_prepend_c(log_line, ' ');
g_string_prepend_len(log_line, GSTR_LEN(ts->cached));
if (NULL != tmpstr) g_string_free(tmpstr, TRUE);
g_mutex_unlock(srv->logs.mutex);
}
@ -415,7 +432,7 @@ void li_log_init(liServer *srv) {
srv->logs.thread_alive = FALSE;
/* first entry in srv->logs.timestamps is the default timestamp */
li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN("%d/%b/%Y %T %Z")));
li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN(DEFAULT_TS_FORMAT)));
/* first entry in srv->logs.targets is the plain good old stderr */
str = g_string_new_len(CONST_STR_LEN("stderr"));

View File

@ -589,7 +589,7 @@ static void li_plugin_free_default_options(liServer *srv, liPlugin *p) {
}
}
void li_plugins_prepare_worker(liWorker *srv) { /* blocking callbacks */
void li_plugins_prepare_worker(liWorker *wrk) { /* blocking callbacks */
/* TODO */
}
void li_plugins_prepare(liServer* srv) { /* "prepare", async */
@ -608,7 +608,3 @@ void li_plugins_start_log(liServer *srv) { /* "run" */
void li_plugins_stop_log(liServer *srv) { /* "suspend now" */
/* TODO */
}
void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state) {
/* TODO */
}

View File

@ -14,6 +14,7 @@
static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents);
static void li_server_stop(liServer *srv);
static void state_ready_cb(struct ev_loop *loop, struct ev_async *w, int revents);
static liServerSocket* server_socket_new(int fd) {
liServerSocket *sock = g_slice_new0(liServerSocket);
@ -94,6 +95,12 @@ liServer* li_server_new(const gchar *module_dir) {
srv->state = LI_SERVER_INIT;
srv->dest_state = LI_SERVER_RUNNING;
srv->statelock = g_mutex_new();
g_queue_init(&srv->state_wait_queue);
srv->state_wait_for = srv->state;
ev_init(&srv->state_ready_watcher, state_ready_cb);
srv->state_ready_watcher.data = srv;
#ifdef HAVE_LUA_H
srv->L = luaL_newstate();
luaL_openlibs(srv->L);
@ -190,6 +197,8 @@ void li_server_free(liServer* srv) {
li_action_release(srv, srv->mainaction);
li_ev_safe_ref_and_stop(ev_async_stop, srv->loop, &srv->state_ready_watcher);
#ifdef HAVE_LUA_H
lua_close(srv->L);
srv->L = NULL;
@ -283,6 +292,9 @@ gboolean li_server_loop_init(liServer *srv) {
return FALSE;
}
ev_async_start(srv->loop, &srv->state_ready_watcher);
ev_unref(srv->loop); /* don't keep loop alive */
return TRUE;
}
@ -629,36 +641,40 @@ static liServerState li_server_next_state(liServer *srv) {
static void li_server_start_transition(liServer *srv, liServerState state) {
guint i;
liServerStateWait sw_dummy;
DEBUG(srv, "Try reaching state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state));
srv->state_wait_for = state;
memset(&sw_dummy, 0, sizeof(sw_dummy));
li_server_state_wait(srv, &sw_dummy);
switch (state) {
case LI_SERVER_INIT:
case LI_SERVER_LOADING:
li_server_reached_state(srv, state);
break;
case LI_SERVER_SUSPENDED:
/* TODO: wait for prepare / suspended */
li_server_reached_state(srv, LI_SERVER_SUSPENDED);
if (srv->state == LI_SERVER_LOADING) {
li_plugins_prepare(srv);
}
break;
case LI_SERVER_WARMUP:
li_server_start_listen(srv);
li_plugins_start_listen(srv);
li_server_reached_state(srv, LI_SERVER_WARMUP);
break;
case LI_SERVER_RUNNING:
if (LI_SERVER_WARMUP == srv->state) {
li_plugins_start_log(srv);
li_server_reached_state(srv, LI_SERVER_RUNNING);
} else if (LI_SERVER_SUSPENDING == srv->state) {
li_server_start_listen(srv);
li_plugins_start_listen(srv);
li_server_reached_state(srv, LI_SERVER_RUNNING);
}
break;
case LI_SERVER_SUSPENDING:
li_server_stop_listen(srv);
li_plugins_stop_listen(srv);
/* wait for closed connections and plugins */
/* TODO: wait */
li_server_reached_state(srv, LI_SERVER_SUSPENDING);
break;
case LI_SERVER_STOPPING:
/* stop all workers */
@ -675,6 +691,8 @@ static void li_server_start_transition(liServer *srv, liServerState state) {
/* wait */
break;
}
li_server_state_ready(srv, &sw_dummy);
}
void li_server_goto_state(liServer *srv, liServerState state) {
@ -704,6 +722,7 @@ void li_server_goto_state(liServer *srv, liServerState state) {
void li_server_reached_state(liServer *srv, liServerState state) {
liServerState want_state = li_server_next_state(srv);
liServerState old_state = srv->state;
GList *swlink;
if (state != want_state) return;
if (state == srv->state) return;
@ -711,6 +730,18 @@ void li_server_reached_state(liServer *srv, liServerState state) {
g_atomic_int_set(&srv->state, state);
DEBUG(srv, "Reached state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state));
/* cleanup state_wait_queue */
g_mutex_lock(srv->statelock);
while (NULL != (swlink = g_queue_pop_head_link(&srv->state_wait_queue))) {
liServerStateWait *sw = swlink->data;
sw->active = FALSE;
if (sw->cancel_cb) {
sw->cancel_cb(srv, sw);
}
}
g_mutex_unlock(srv->statelock);
switch (srv->state) {
case LI_SERVER_INIT:
break;
@ -725,9 +756,6 @@ void li_server_reached_state(liServer *srv, liServerState state) {
}
li_log_thread_start(srv);
li_plugins_prepare(srv);
/* wait for plugins to report success */
break;
case LI_SERVER_SUSPENDED:
if (LI_SERVER_SUSPENDING == old_state) {
@ -760,3 +788,55 @@ void li_server_reached_state(liServer *srv, liServerState state) {
li_server_start_transition(srv, want_state);
}
}
static void state_ready_cb(struct ev_loop *loop, struct ev_async *w, int revents) {
liServer *srv = w->data;
UNUSED(loop);
UNUSED(revents);
g_mutex_lock(srv->statelock);
if (srv->state_wait_queue.length > 0) {
/* not ready - ignore event */
g_mutex_unlock(srv->statelock);
return;
}
g_mutex_unlock(srv->statelock);
if (srv->state_wait_for != li_server_next_state(srv)) {
/* not the state we have been waiting for - ignore */
return;
}
/* IMPORTANT: do not call this while statelock is locked */
li_server_reached_state(srv, srv->state_wait_for);
}
/** threadsafe */
void li_server_state_ready(liServer *srv, liServerStateWait *sw) {
g_mutex_lock(srv->statelock);
if (sw->active) {
g_queue_unlink(&srv->state_wait_queue, &sw->queue_link);
sw->active = FALSE;
if (srv->state_wait_queue.length == 0) {
ev_async_send(srv->loop, &srv->state_ready_watcher);
}
}
g_mutex_unlock(srv->statelock);
}
/** only call from server state plugin hooks; push new wait condition to wait queue */
void li_server_state_wait(liServer *srv, liServerStateWait *sw) {
g_mutex_lock(srv->statelock);
sw->queue_link.data = sw;
g_queue_push_tail_link(&srv->state_wait_queue, &sw->queue_link);
sw->active = TRUE;
g_mutex_unlock(srv->statelock);
}