From 56ff7ca32ae63a862de425c1cef6e10a213d5f1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Sun, 30 Aug 2009 20:43:13 +0200 Subject: [PATCH] angel/lighty now synchronize the server state --- include/lighttpd/angel_server.h | 3 +- include/lighttpd/plugin.h | 14 ++ include/lighttpd/server.h | 20 ++- include/lighttpd/worker.h | 4 +- src/angel/angel_plugin_core.c | 19 +++ src/angel/angel_server.c | 27 +-- src/common/angel_connection.c | 10 +- src/main/angel.c | 3 + src/main/connection.c | 5 +- src/main/lighttpd.c | 10 +- src/main/log.c | 20 ++- src/main/plugin.c | 37 ++++- src/main/plugin_core.c | 27 +++ src/main/server.c | 283 +++++++++++++++++++++++++++----- src/main/worker.c | 31 ++++ 15 files changed, 433 insertions(+), 80 deletions(-) diff --git a/include/lighttpd/angel_server.h b/include/lighttpd/angel_server.h index 75867bf..d679e34 100644 --- a/include/lighttpd/angel_server.h +++ b/include/lighttpd/angel_server.h @@ -12,7 +12,7 @@ typedef enum { LI_INSTANCE_DOWN, /* not started yet */ LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */ - LI_INSTANCE_SILENT, /* only accept(), no logging: waiting for another instance to suspend */ + LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */ LI_INSTANCE_RUNNING, /* everything running */ LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */ LI_INSTANCE_FINISHED /* not running */ @@ -65,6 +65,7 @@ LI_API void li_server_stop(liServer *srv); LI_API liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic); LI_API void li_instance_replace(liInstance *oldi, liInstance *newi); LI_API void li_instance_set_state(liInstance *i, liInstanceState s); +LI_API void li_instance_state_reached(liInstance *i, liInstanceState s); LI_API liInstanceConf* li_instance_conf_new(gchar **cmd, GString *username, uid_t uid, gid_t gid); LI_API void li_instance_conf_release(liInstanceConf *ic); diff --git a/include/lighttpd/plugin.h b/include/lighttpd/plugin.h index 3489508..f6fc611 100644 --- a/include/lighttpd/plugin.h +++ b/include/lighttpd/plugin.h @@ -33,6 +33,8 @@ struct liPlugin { size_t opt_base_index; + gboolean ready_for_next_state; /**< don't modify this; use li_plugin_ready_for_state() instead */ + liPluginFreeCB free; /**< called before plugin is unloaded */ liPluginHandleVRequestCB handle_request_body; @@ -128,6 +130,18 @@ LI_API gboolean li_parse_option(liServer *srv, const char *name, liValue *val, l LI_API void li_release_option(liServer *srv, liOptionSet *mark); /**< Does not free the option_set memory */ LI_API void li_plugins_prepare_callbacks(liServer *srv); + +/* server state machine callbacks */ +LI_API void li_plugins_prepare_worker(liWorker *srv); /* blocking callbacks */ +LI_API void li_plugins_prepare(liServer *srv); /* "prepare", async */ + +LI_API void li_plugins_start_listen(liServer *srv); /* "warmup" */ +LI_API void li_plugins_stop_listen(liServer *srv); /* "prepare suspend", async */ +LI_API void li_plugins_start_log(liServer *srv); /* "run" */ +LI_API void li_plugins_stop_log(liServer *srv); /* "suspend now" */ + +LI_API void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state); + LI_API void li_plugins_handle_close(liConnection *con); LI_API void li_plugins_handle_vrclose(liVRequest *vr); diff --git a/include/lighttpd/server.h b/include/lighttpd/server.h index df75861..8c5101c 100644 --- a/include/lighttpd/server.h +++ b/include/lighttpd/server.h @@ -6,9 +6,14 @@ #endif typedef enum { - LI_SERVER_STARTING, /** start up: don't write log files, don't accept connections */ - LI_SERVER_RUNNING, /** running: write logs, accept connections */ - LI_SERVER_STOPPING /** stopping: flush logs, don't accept new connections */ + LI_SERVER_INIT, /** start state */ + LI_SERVER_LOADING, /** config loaded, prepare listeing sockets/open log files */ + LI_SERVER_SUSPENDED, /** ready to go, no logs */ + LI_SERVER_WARMUP, /** listen() active, no logs yet, handling remaining connections */ + LI_SERVER_RUNNING, /** listen() and logs active */ + LI_SERVER_SUSPENDING, /** listen() stopped, logs active, handling remaining connections */ + LI_SERVER_STOPPING, /** listen() stopped, no logs, handling remaining connections */ + LI_SERVER_DOWN /** exit */ } liServerState; struct liServerSocket { @@ -21,7 +26,7 @@ struct liServerSocket { struct liServer { guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */ - liServerState state; /** atomic access */ + liServerState state, dest_state; /** atomic access */ liAngelConnection *acon; liWorker *main_worker; @@ -91,10 +96,6 @@ LI_API gboolean li_server_worker_init(liServer *srv); LI_API void li_server_listen(liServer *srv, int fd); -/* Start accepting connection, use log files, no new plugins after that */ -LI_API void li_server_start(liServer *srv); -/* stop accepting connections, turn keep-alive off, close all shutdown sockets, set exiting = TRUE */ -LI_API void li_server_stop(liServer *srv); /* exit asap with cleanup */ LI_API void li_server_exit(liServer *srv); @@ -107,4 +108,7 @@ LI_API guint li_server_ts_format_add(liServer *srv, GString* format); LI_API void li_server_socket_release(liServerSocket* sock); LI_API void li_server_socket_acquire(liServerSocket* sock); +LI_API void li_server_goto_state(liServer *srv, liServerState state); +LI_API void li_server_reached_state(liServer *srv, liServerState state); + #endif diff --git a/include/lighttpd/worker.h b/include/lighttpd/worker.h index 893001d..e5db0b5 100644 --- a/include/lighttpd/worker.h +++ b/include/lighttpd/worker.h @@ -54,7 +54,7 @@ struct liWorker { struct ev_loop *loop; ev_prepare loop_prepare; ev_check loop_check; - ev_async li_worker_stop_watcher, li_worker_exit_watcher; + ev_async li_worker_stop_watcher, li_worker_suspend_watcher, li_worker_exit_watcher; guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections * use with atomic, read direct from local worker context @@ -76,7 +76,6 @@ struct liWorker { guint connection_load; /** incremented by server_accept_cb, decremented by worker_con_put. use atomic access */ - GArray *timestamps_gmt; /** array of (worker_ts), use only from local worker context and through li_worker_current_timestamp(wrk, LI_GMTIME, ndx) */ GArray *timestamps_local; @@ -106,6 +105,7 @@ LI_API void li_worker_free(liWorker *wrk); LI_API void li_worker_run(liWorker *wrk); LI_API void li_worker_stop(liWorker *context, liWorker *wrk); +LI_API void li_worker_suspend(liWorker *context, liWorker *wrk); LI_API void li_worker_exit(liWorker *context, liWorker *wrk); LI_API void li_worker_new_con(liWorker *ctx, liWorker *wrk, liSocketAddress remote_addr, int s, liServerSocket *srv_sock); diff --git a/src/angel/angel_plugin_core.c b/src/angel/angel_plugin_core.c index 70e9459..d65acf1 100644 --- a/src/angel/angel_plugin_core.c +++ b/src/angel/angel_plugin_core.c @@ -195,6 +195,8 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS GError *err = NULL; gint fd; GArray *fds; + UNUSED(p); + DEBUG(srv, "core_listen(%i) '%s'", id, data->str); if (-1 == id) return; /* ignore simple calls */ @@ -221,6 +223,22 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS } } +static void core_reached_state(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) { + UNUSED(srv); + UNUSED(p); + UNUSED(id); + + if (0 == strcmp(data->str, "suspended")) { + li_instance_state_reached(i, LI_INSTANCE_SUSPENDED); + } else if (0 == strcmp(data->str, "warmup")) { + li_instance_state_reached(i, LI_INSTANCE_WARMUP); + } else if (0 == strcmp(data->str, "running")) { + li_instance_state_reached(i, LI_INSTANCE_RUNNING); + } else if (0 == strcmp(data->str, "suspending")) { + li_instance_state_reached(i, LI_INSTANCE_SUSPENDING); + } +} + static void core_clean(liServer *srv, liPlugin *p); static void core_free(liServer *srv, liPlugin *p) { liPluginCoreConfig *config = (liPluginCoreConfig*) p->data; @@ -292,6 +310,7 @@ static gboolean core_init(liServer *srv, liPlugin *p) { p->handle_activate_config = core_activate; g_hash_table_insert(p->angel_callbacks, "listen", (gpointer)(intptr_t)core_listen); + g_hash_table_insert(p->angel_callbacks, "reached-state", (gpointer)(intptr_t)core_reached_state); return TRUE; } diff --git a/src/angel/angel_server.c b/src/angel/angel_server.c index 34b9e36..14db4ba 100644 --- a/src/angel/angel_server.c +++ b/src/angel/angel_server.c @@ -1,8 +1,6 @@ #include -static void instance_state_reached(liInstance *i, liInstanceState s); - #define CATCH_SIGNAL(loop, cb, n) do { \ ev_init(&srv->sig_w_##n, cb); \ ev_signal_set(&srv->sig_w_##n, SIG##n); \ @@ -70,6 +68,8 @@ static void instance_angel_call_cb(liAngelConnection *acon, liPlugins *ps = &srv->plugins; liPlugin *p; liPluginHandleCallCB cb; + UNUSED(mod_len); + UNUSED(action_len); p = g_hash_table_lookup(ps->ht_plugins, mod); if (!p) { @@ -152,7 +152,7 @@ static void instance_child_cb(struct ev_loop *loop, ev_child *w, int revents) { li_angel_connection_free(i->acon); i->acon = NULL; ev_child_stop(loop, w); - instance_state_reached(i, news); + li_instance_state_reached(i, news); li_instance_release(i); } @@ -205,6 +205,7 @@ liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) { } void li_instance_replace(liInstance *oldi, liInstance *newi) { + /* TODO ??? */ } void li_instance_set_state(liInstance *i, liInstanceState s) { @@ -214,7 +215,7 @@ void li_instance_set_state(liInstance *i, liInstanceState s) { case LI_INSTANCE_SUSPENDING: ERROR(i->srv, "Invalid destination state %i", s); return; /* cannot set this */ - case LI_INSTANCE_SILENT: + case LI_INSTANCE_WARMUP: case LI_INSTANCE_SUSPENDED: case LI_INSTANCE_RUNNING: case LI_INSTANCE_FINISHED: @@ -230,8 +231,8 @@ void li_instance_set_state(liInstance *i, liInstanceState s) { case LI_INSTANCE_DOWN: case LI_INSTANCE_SUSPENDING: break; /* cannot be set */ - case LI_INSTANCE_SILENT: - li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error); + case LI_INSTANCE_WARMUP: + li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("warmup"), NULL, &error); break; case LI_INSTANCE_SUSPENDED: li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("suspend"), NULL, &error); @@ -243,7 +244,7 @@ void li_instance_set_state(liInstance *i, liInstanceState s) { if (i->proc) { kill(i->proc->child_pid, SIGTERM); } else { - instance_state_reached(i, LI_INSTANCE_FINISHED); + li_instance_state_reached(i, LI_INSTANCE_FINISHED); } break; } @@ -254,13 +255,13 @@ void li_instance_set_state(liInstance *i, liInstanceState s) { if (i->proc) { kill(i->proc->child_pid, SIGTERM); } else { - instance_state_reached(i, LI_INSTANCE_FINISHED); + li_instance_state_reached(i, LI_INSTANCE_FINISHED); } } } } -static void instance_state_reached(liInstance *i, liInstanceState s) { +void li_instance_state_reached(liInstance *i, liInstanceState s) { GError *error = NULL; i->s_cur = s; @@ -279,9 +280,9 @@ static void instance_state_reached(liInstance *i, liInstanceState s) { break; /* impossible */ case LI_INSTANCE_SUSPENDED: break; - case LI_INSTANCE_SILENT: + case LI_INSTANCE_WARMUP: /* make sure we move to SILENT after we spawned the instance */ - li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("run-silent"), NULL, &error); + li_angel_send_simple_call(i->acon, CONST_STR_LEN("core"), CONST_STR_LEN("warmup"), NULL, &error); break; case LI_INSTANCE_RUNNING: /* make sure we move to RUNNING after we spawned the instance */ @@ -292,7 +293,7 @@ static void instance_state_reached(liInstance *i, liInstanceState s) { break; /* nothing to do, instance should already know what to do */ } break; - case LI_INSTANCE_SILENT: + case LI_INSTANCE_WARMUP: /* TODO: replace another instance? */ break; case LI_INSTANCE_RUNNING: @@ -314,7 +315,7 @@ static void instance_state_reached(liInstance *i, liInstanceState s) { if (i->proc) { kill(i->proc->child_pid, SIGTERM); } else { - instance_state_reached(i, LI_INSTANCE_FINISHED); + li_instance_state_reached(i, LI_INSTANCE_FINISHED); } } } diff --git a/src/common/angel_connection.c b/src/common/angel_connection.c index 573e4ba..7ce7b85 100644 --- a/src/common/angel_connection.c +++ b/src/common/angel_connection.c @@ -557,17 +557,17 @@ gboolean li_angel_send_simple_call( goto error; } - if (data->len > ANGEL_CALL_MAX_STR_LEN) { + if (data && data->len > ANGEL_CALL_MAX_STR_LEN) { g_set_error(err, LI_ANGEL_CALL_ERROR, LI_ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); goto error; } - if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error; + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data ? data->len : 0, 0, err)) goto error; g_mutex_lock(acon->mutex); queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); - send_queue_push_string(acon->out, data); + if (data) send_queue_push_string(acon->out, data); g_mutex_unlock(acon->mutex); if (queue_was_empty) @@ -629,7 +629,7 @@ gboolean li_angel_send_call( g_mutex_lock(acon->mutex); queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); - send_queue_push_string(acon->out, data); + if (data) send_queue_push_string(acon->out, data); g_mutex_unlock(acon->mutex); if (queue_was_empty) @@ -675,7 +675,7 @@ gboolean li_angel_send_result( queue_was_empty = (0 == acon->out->length); send_queue_push_string(acon->out, buf); send_queue_push_string(acon->out, error); - send_queue_push_string(acon->out, data); + if (data) send_queue_push_string(acon->out, data); send_queue_push_fds(acon->out, fds); g_mutex_unlock(acon->mutex); diff --git a/src/main/angel.c b/src/main/angel.c index e724641..fa545e3 100644 --- a/src/main/angel.c +++ b/src/main/angel.c @@ -8,6 +8,8 @@ static void angel_call_cb(liAngelConnection *acon, liServer *srv = acon->data; liPlugin *p; const liPluginAngel *acb; + UNUSED(action_len); + UNUSED(mod_len); if (NULL == (p = g_hash_table_lookup(srv->plugins, mod))) goto not_found; if (NULL == p->angelcbs) goto not_found; @@ -35,6 +37,7 @@ static void angel_close_cb(liAngelConnection *acon, GError *err) { void li_angel_setup(liServer *srv) { srv->acon = li_angel_connection_new(srv->loop, 0, srv, angel_call_cb, angel_close_cb); + srv->dest_state = LI_SERVER_SUSPENDED; } static void li_angel_listen_cb(liAngelCall *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds) { diff --git a/src/main/connection.c b/src/main/connection.c index de28ef5..bd6fa30 100644 --- a/src/main/connection.c +++ b/src/main/connection.c @@ -55,13 +55,16 @@ static void forward_response_body(liConnection *con) { static void connection_request_done(liConnection *con) { liVRequest *vr = con->mainvr; + liServerState s; + if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) { VR_DEBUG(con->mainvr, "response end (keep_alive = %i)", con->keep_alive); } li_plugins_handle_close(con); - if (con->keep_alive && g_atomic_int_get(&con->srv->state) == LI_SERVER_RUNNING) { + s = g_atomic_int_get(&con->srv->dest_state); + if (con->keep_alive && (LI_SERVER_RUNNING == s || LI_SERVER_WARMUP == s)) { li_connection_reset_keep_alive(con); } else { worker_con_put(con); diff --git a/src/main/lighttpd.c b/src/main/lighttpd.c index 9c11ebc..91ba528 100644 --- a/src/main/lighttpd.c +++ b/src/main/lighttpd.c @@ -143,14 +143,20 @@ int main(int argc, char *argv[]) { #endif } + if (!srv->mainaction) { + ERROR(srv, "%s", "No action handlers defined"); + return 1; + } + /* if config should only be tested, exit here */ if (test_config) return 0; /* TRACE(srv, "%s", "Test!"); */ - li_server_worker_init(srv); - li_server_start(srv); + li_server_reached_state(srv, LI_SERVER_LOADING); + li_worker_run(srv->main_worker); + li_server_reached_state(srv, LI_SERVER_DOWN); if (!luaconfig) config_parser_finish(srv, ctx_stack, TRUE); diff --git a/src/main/log.c b/src/main/log.c index 2cac357..bf5fdbe 100644 --- a/src/main/log.c +++ b/src/main/log.c @@ -14,9 +14,17 @@ void li_log_write(liServer *srv, liLog *log, GString *msg) { liLogEntry *log_entry; - if (g_atomic_int_get(&srv->state) == LI_SERVER_STARTING) { + switch (g_atomic_int_get(&srv->state)) { + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDED: + case LI_SERVER_WARMUP: + case LI_SERVER_STOPPING: + case LI_SERVER_DOWN: li_angel_log(srv, msg); return; + default: + break; } log_ref(srv, log); @@ -122,10 +130,18 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin g_string_append_len(log_line, CONST_STR_LEN("\r\n")); - if (g_atomic_int_get(&srv->state) == LI_SERVER_STARTING) { + switch (g_atomic_int_get(&srv->state)) { + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDED: + case LI_SERVER_WARMUP: + case LI_SERVER_STOPPING: + case LI_SERVER_DOWN: log_unref(srv, log); li_angel_log(srv, log_line); return TRUE; + default: + break; } log_entry = g_slice_new(liLogEntry); log_entry->log = log; diff --git a/src/main/plugin.c b/src/main/plugin.c index 4247967..612dd56 100644 --- a/src/main/plugin.c +++ b/src/main/plugin.c @@ -50,9 +50,12 @@ static void li_plugin_free_setups(liServer *srv, liPlugin *p) { } void li_plugin_free(liServer *srv, liPlugin *p) { + liServerState s; + if (!p) return; - if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) { + s = g_atomic_int_get(&srv->state); + if (LI_SERVER_INIT != s && LI_SERVER_DOWN != s) { ERROR(srv, "Cannot free plugin '%s' while server is running", p->name); return; } @@ -71,8 +74,10 @@ void li_plugin_free(liServer *srv, liPlugin *p) { void li_server_plugins_free(liServer *srv) { gpointer key, val; GHashTableIter i; + liServerState s; - if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) { + s = g_atomic_int_get(&srv->state); + if (LI_SERVER_INIT != s && LI_SERVER_DOWN != s) { ERROR(srv, "%s", "Cannot free plugins while server is running"); return; } @@ -95,13 +100,15 @@ void li_server_plugins_free(liServer *srv) { liPlugin *li_plugin_register(liServer *srv, const gchar *name, liPluginInitCB init) { liPlugin *p; + liServerState s; if (!init) { ERROR(srv, "Module '%s' needs an init function", name); return NULL; } - if (g_atomic_int_get(&srv->state) != LI_SERVER_STARTING) { + s = g_atomic_int_get(&srv->state); + if (LI_SERVER_INIT != s) { ERROR(srv, "Cannot register plugin '%s' after server was started", name); return NULL; } @@ -433,3 +440,27 @@ static void li_plugin_free_default_options(liServer *srv, liPlugin *p) { g_array_index(srv->option_def_values, liOptionValue, sopt->index) = oempty; } } + +void li_plugins_prepare_worker(liWorker *srv) { /* blocking callbacks */ + /* TODO */ +} +void li_plugins_prepare(liServer* srv) { /* "prepare", async */ + /* TODO */ +} + +void li_plugins_start_listen(liServer *srv) { /* "warmup" */ + /* TODO */ +} +void li_plugins_stop_listen(liServer *srv) { /* "prepare suspend", async */ + /* TODO */ +} +void li_plugins_start_log(liServer *srv) { /* "run" */ + /* TODO */ +} +void li_plugins_stop_log(liServer *srv) { /* "suspend now" */ + /* TODO */ +} + +void li_plugin_ready_for_state(liServer *srv, liPlugin *p, liServerState state) { + /* TODO */ +} diff --git a/src/main/plugin_core.c b/src/main/plugin_core.c index c91d706..35773c6 100644 --- a/src/main/plugin_core.c +++ b/src/main/plugin_core.c @@ -1287,6 +1287,29 @@ static liAction* core_throttle_connection(liServer *srv, liPlugin* p, liValue *v return li_action_new_function(core_handle_throttle_connection, NULL, NULL, GUINT_TO_POINTER((guint) rate)); } +static void core_warmup(liServer *srv, liPlugin *p, gint32 id, GString *data) { + UNUSED(p); + UNUSED(id); + UNUSED(data); + + li_server_goto_state(srv, LI_SERVER_WARMUP); +} + +static void core_run(liServer *srv, liPlugin *p, gint32 id, GString *data) { + UNUSED(p); + UNUSED(id); + UNUSED(data); + + li_server_goto_state(srv, LI_SERVER_RUNNING); +} + +static void core_suspend(liServer *srv, liPlugin *p, gint32 id, GString *data) { + UNUSED(p); + UNUSED(id); + UNUSED(data); + + li_server_goto_state(srv, LI_SERVER_SUSPENDED); +} static const liPluginOption options[] = { { "debug.log_request_handling", LI_VALUE_BOOLEAN, GINT_TO_POINTER(FALSE), NULL, NULL }, @@ -1357,6 +1380,10 @@ static const liPluginSetup setups[] = { }; static const liPluginAngel angelcbs[] = { + { "warmup", core_warmup }, + { "run", core_run }, + { "suspend", core_suspend }, + { NULL, NULL } }; diff --git a/src/main/server.c b/src/main/server.c index 89191a2..d916309 100644 --- a/src/main/server.c +++ b/src/main/server.c @@ -3,6 +3,7 @@ #include static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents); +static void li_server_stop(liServer *srv); static liServerSocket* server_socket_new(int fd) { liServerSocket *sock = g_slice_new0(liServerSocket); @@ -58,18 +59,15 @@ static void server_setup_free(gpointer _ss) { static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) { liServer *srv = (liServer*) w->data; + UNUSED(loop); UNUSED(revents); - if (g_atomic_int_get(&srv->state) != LI_SERVER_STOPPING) { + if (g_atomic_int_get(&srv->dest_state) != LI_SERVER_DOWN) { INFO(srv, "%s", "Got signal, shutdown"); - li_server_stop(srv); + li_server_goto_state(srv, LI_SERVER_DOWN); } else { INFO(srv, "%s", "Got second signal, force shutdown"); - - /* reset default behaviour which will kill us the third time */ - UNCATCH_SIGNAL(loop, INT); - UNCATCH_SIGNAL(loop, TERM); - UNCATCH_SIGNAL(loop, PIPE); + exit(1); } } @@ -82,7 +80,8 @@ liServer* li_server_new(const gchar *module_dir) { liServer* srv = g_slice_new0(liServer); srv->magic = LIGHTTPD_SERVER_MAGIC; - srv->state = LI_SERVER_STARTING; + srv->state = LI_SERVER_INIT; + srv->dest_state = LI_SERVER_RUNNING; srv->workers = g_array_new(FALSE, TRUE, sizeof(liWorker*)); srv->worker_count = 0; @@ -115,6 +114,7 @@ liServer* li_server_new(const gchar *module_dir) { log_init(srv); srv->io_timeout = 300; /* default I/O timeout */ + srv->keep_alive_queue_timeout = 5; return srv; } @@ -310,75 +310,69 @@ static void li_server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) { } } +/* main worker only */ void li_server_listen(liServer *srv, int fd) { liServerSocket *sock = server_socket_new(fd); sock->srv = srv; g_ptr_array_add(srv->sockets, sock); - if (g_atomic_int_get(&srv->state) == LI_SERVER_RUNNING) ev_io_start(srv->main_worker->loop, &sock->watcher); + if (LI_SERVER_RUNNING == srv->state || LI_SERVER_WARMUP == srv->state) ev_io_start(srv->main_worker->loop, &sock->watcher); } -void li_server_start(liServer *srv) { +static void li_server_start_listen(liServer *srv) { guint i; - liServerState srvstate = g_atomic_int_get(&srv->state); - if (srvstate == LI_SERVER_STOPPING || srvstate == LI_SERVER_RUNNING) return; /* no restart after stop */ - g_atomic_int_set(&srv->state, LI_SERVER_RUNNING); - - if (!srv->mainaction) { - ERROR(srv, "%s", "No action handlers defined"); - li_server_stop(srv); - return; - } - - srv->keep_alive_queue_timeout = 5; - - li_plugins_prepare_callbacks(srv); for (i = 0; i < srv->sockets->len; i++) { liServerSocket *sock = g_ptr_array_index(srv->sockets, i); ev_io_start(srv->main_worker->loop, &sock->watcher); } - srv->started = ev_now(srv->main_worker->loop); { GString *str = li_worker_current_timestamp(srv->main_worker, LI_LOCALTIME, LI_TS_FORMAT_DEFAULT); srv->started = ev_now(srv->main_worker->loop); srv->started_str = g_string_new_len(GSTR_LEN(str)); } - - log_thread_start(srv); - - li_worker_run(srv->main_worker); } -void li_server_stop(liServer *srv) { +static void li_server_stop_listen(liServer *srv) { guint i; - if (g_atomic_int_get(&srv->state) == LI_SERVER_STOPPING) return; - g_atomic_int_set(&srv->state, LI_SERVER_STOPPING); + for (i = 0; i < srv->sockets->len; i++) { + liServerSocket *sock = g_ptr_array_index(srv->sockets, i); + ev_io_stop(srv->main_worker->loop, &sock->watcher); + } - if (srv->main_worker) { - for (i = 0; i < srv->sockets->len; i++) { - liServerSocket *sock = g_ptr_array_index(srv->sockets, i); - ev_io_stop(srv->main_worker->loop, &sock->watcher); - } + /* suspend all workers (close keep-alive connections) */ + for (i = 0; i < srv->worker_count; i++) { + liWorker *wrk; + wrk = g_array_index(srv->workers, liWorker*, i); + li_worker_suspend(srv->main_worker, wrk); + } +} - /* stop all workers */ - for (i = 0; i < srv->worker_count; i++) { - liWorker *wrk; - wrk = g_array_index(srv->workers, liWorker*, i); - li_worker_stop(srv->main_worker, wrk); - } +static void li_server_stop(liServer *srv) { + guint i; + + for (i = 0; i < srv->sockets->len; i++) { + liServerSocket *sock = g_ptr_array_index(srv->sockets, i); + ev_io_stop(srv->main_worker->loop, &sock->watcher); } - log_thread_wakeup(srv); + /* stop all workers */ + for (i = 0; i < srv->worker_count; i++) { + liWorker *wrk; + wrk = g_array_index(srv->workers, liWorker*, i); + li_worker_stop(srv->main_worker, wrk); + } } void li_server_exit(liServer *srv) { li_server_stop(srv); g_atomic_int_set(&srv->exiting, TRUE); + g_atomic_int_set(&srv->state, LI_SERVER_DOWN); + g_atomic_int_set(&srv->dest_state, LI_SERVER_DOWN); /* exit all workers */ { @@ -444,3 +438,206 @@ guint li_server_ts_format_add(liServer *srv, GString* format) { g_array_append_val(srv->ts_formats, format); return i; } + +/* state machine: call this functions only in the main worker context */ +/* Note: main worker doesn't need atomic read for state */ + +#if 0 + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDED: + case LI_SERVER_WARMUP: + case LI_SERVER_RUNNING: + case LI_SERVER_SUSPENDING: + case LI_SERVER_STOPPING: + case LI_SERVER_DOWN: +#endif + +static const gchar* li_server_state_string(liServerState state) { + switch (state) { + case LI_SERVER_INIT: return "init"; + case LI_SERVER_LOADING: return "loading"; + case LI_SERVER_SUSPENDED: return "suspended"; + case LI_SERVER_WARMUP: return "warmup"; + case LI_SERVER_RUNNING: return "running"; + case LI_SERVER_SUSPENDING: return "suspending"; + case LI_SERVER_STOPPING: return "stopping"; + case LI_SERVER_DOWN: return "down"; + } + + return ""; +} + +/* next state in the machine we want to reach to reach */ +static liServerState li_server_next_state(liServer *srv) { + switch (srv->state) { + case LI_SERVER_INIT: + return LI_SERVER_LOADING; + case LI_SERVER_LOADING: + if (LI_SERVER_DOWN == srv->dest_state) return LI_SERVER_STOPPING; + return LI_SERVER_SUSPENDED; + case LI_SERVER_SUSPENDED: + switch (srv->dest_state) { + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDED: + return LI_SERVER_SUSPENDED; + case LI_SERVER_WARMUP: + case LI_SERVER_RUNNING: + case LI_SERVER_SUSPENDING: + return LI_SERVER_WARMUP; + case LI_SERVER_STOPPING: + case LI_SERVER_DOWN: + return LI_SERVER_STOPPING; + } + return LI_SERVER_DOWN; + case LI_SERVER_WARMUP: + if (LI_SERVER_WARMUP == srv->dest_state) return LI_SERVER_WARMUP; + return LI_SERVER_RUNNING; + case LI_SERVER_RUNNING: + if (LI_SERVER_RUNNING == srv->dest_state) return LI_SERVER_RUNNING; + return LI_SERVER_SUSPENDING; + case LI_SERVER_SUSPENDING: + if (LI_SERVER_RUNNING == srv->dest_state) return LI_SERVER_RUNNING; + if (LI_SERVER_SUSPENDING == srv->dest_state) return LI_SERVER_SUSPENDING; + return LI_SERVER_SUSPENDED; + case LI_SERVER_STOPPING: + case LI_SERVER_DOWN: + return LI_SERVER_DOWN; + } + return LI_SERVER_DOWN; +} + +static void li_server_start_transition(liServer *srv, liServerState state) { + guint i; + DEBUG(srv, "Try reaching state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state)); + + switch (state) { + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDED: + /* TODO: wait for prepare / suspended */ + li_server_reached_state(srv, LI_SERVER_SUSPENDED); + break; + case LI_SERVER_WARMUP: + li_server_start_listen(srv); + li_plugins_start_listen(srv); + li_server_reached_state(srv, LI_SERVER_WARMUP); + break; + case LI_SERVER_RUNNING: + if (LI_SERVER_WARMUP == srv->state) { + li_plugins_start_log(srv); + li_server_reached_state(srv, LI_SERVER_RUNNING); + } else if (LI_SERVER_SUSPENDING == srv->state) { + li_server_start_listen(srv); + li_plugins_start_listen(srv); + li_server_reached_state(srv, LI_SERVER_RUNNING); + } + break; + case LI_SERVER_SUSPENDING: + li_server_stop_listen(srv); + li_plugins_stop_listen(srv); + /* wait for closed connections and plugins */ + /* TODO: wait */ + li_server_reached_state(srv, LI_SERVER_SUSPENDING); + break; + case LI_SERVER_STOPPING: + /* stop all workers */ + for (i = 0; i < srv->worker_count; i++) { + liWorker *wrk; + wrk = g_array_index(srv->workers, liWorker*, i); + li_worker_stop(srv->main_worker, wrk); + } + + log_thread_wakeup(srv); + li_server_reached_state(srv, LI_SERVER_STOPPING); + break; + case LI_SERVER_DOWN: + /* wait */ + break; + } +} + +void li_server_goto_state(liServer *srv, liServerState state) { + if (srv->dest_state == LI_SERVER_DOWN || srv->dest_state == state) return; /* cannot undo this */ + + switch (state) { + case LI_SERVER_INIT: + case LI_SERVER_LOADING: + case LI_SERVER_SUSPENDING: + case LI_SERVER_STOPPING: + return; /* invalid dest states */ + case LI_SERVER_WARMUP: + case LI_SERVER_RUNNING: + case LI_SERVER_SUSPENDED: + case LI_SERVER_DOWN: + break; + } + + g_atomic_int_set(&srv->dest_state, state); + + if (srv->dest_state != srv->state) { + liServerState want_state = li_server_next_state(srv); + li_server_start_transition(srv, want_state); + } +} + +void li_server_reached_state(liServer *srv, liServerState state) { + liServerState want_state = li_server_next_state(srv); + liServerState old_state = srv->state; + + if (state != want_state) return; + if (state == srv->state) return; + + g_atomic_int_set(&srv->state, state); + DEBUG(srv, "Reached state: %s (dest: %s)", li_server_state_string(state), li_server_state_string(srv->dest_state)); + + switch (srv->state) { + case LI_SERVER_INIT: + break; + case LI_SERVER_LOADING: + li_plugins_prepare_callbacks(srv); + li_server_worker_init(srv); + + { + GString *str = li_worker_current_timestamp(srv->main_worker, LI_LOCALTIME, LI_TS_FORMAT_DEFAULT); + srv->started = ev_now(srv->main_worker->loop); + srv->started_str = g_string_new_len(GSTR_LEN(str)); + } + + log_thread_start(srv); + + li_plugins_prepare(srv); + /* wait for plugins to report success */ + break; + case LI_SERVER_SUSPENDED: + if (LI_SERVER_SUSPENDING == old_state) { + li_plugins_stop_log(srv); + } + break; + case LI_SERVER_WARMUP: + case LI_SERVER_RUNNING: + break; + case LI_SERVER_SUSPENDING: + case LI_SERVER_STOPPING: + break; + case LI_SERVER_DOWN: + /* li_server_exit(srv); */ + return; + } + + if (srv->acon) { + GString *data = g_string_new(li_server_state_string(srv->state)); + GError *err = NULL; + + if (!li_angel_send_simple_call(srv->acon, CONST_STR_LEN("core"), CONST_STR_LEN("reached-state"), data, &err)) { + GERROR(srv, err, "%s", "couldn't send state update to angel"); + g_error_free(err); + } + } + + if (srv->dest_state != srv->state) { + want_state = li_server_next_state(srv); + li_server_start_transition(srv, want_state); + } +} diff --git a/src/main/worker.c b/src/main/worker.c index 6f1d920..496920a 100644 --- a/src/main/worker.c +++ b/src/main/worker.c @@ -237,6 +237,15 @@ static void li_worker_stop_cb(struct ev_loop *loop, ev_async *w, int revents) { li_worker_stop(wrk, wrk); } +/* stop worker watcher */ +static void li_worker_suspend_cb(struct ev_loop *loop, ev_async *w, int revents) { + liWorker *wrk = (liWorker*) w->data; + UNUSED(loop); + UNUSED(revents); + + li_worker_suspend(wrk, wrk); +} + /* exit worker watcher */ static void li_worker_exit_cb(struct ev_loop *loop, ev_async *w, int revents) { liWorker *wrk = (liWorker*) w->data; @@ -368,6 +377,10 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) { wrk->li_worker_stop_watcher.data = wrk; ev_async_start(wrk->loop, &wrk->li_worker_stop_watcher); + ev_init(&wrk->li_worker_suspend_watcher, li_worker_suspend_cb); + wrk->li_worker_suspend_watcher.data = wrk; + ev_async_start(wrk->loop, &wrk->li_worker_suspend_watcher); + ev_init(&wrk->new_con_watcher, li_worker_new_con_cb); wrk->new_con_watcher.data = wrk; ev_async_start(wrk->loop, &wrk->new_con_watcher); @@ -507,6 +520,7 @@ void li_worker_stop(liWorker *context, liWorker *wrk) { guint i; ev_async_stop(wrk->loop, &wrk->li_worker_stop_watcher); + ev_async_stop(wrk->loop, &wrk->li_worker_suspend_watcher); ev_async_stop(wrk->loop, &wrk->new_con_watcher); li_waitqueue_stop(&wrk->io_timeout_queue); li_waitqueue_stop(&wrk->throttle_queue); @@ -533,6 +547,23 @@ void li_worker_stop(liWorker *context, liWorker *wrk) { } } +void li_worker_suspend(liWorker *context, liWorker *wrk) { + if (context == wrk) { + guint i; + + /* close keep alive connections */ + for (i = wrk->connections_active; i-- > 0;) { + liConnection *con = g_array_index(wrk->connections, liConnection*, i); + if (con->state == LI_CON_STATE_KEEP_ALIVE) + worker_con_put(con); + } + + li_worker_check_keepalive(wrk); + } else { + ev_async_send(wrk->loop, &wrk->li_worker_suspend_watcher); + } +} + void li_worker_exit(liWorker *context, liWorker *wrk) { if (context == wrk) { ev_unloop (wrk->loop, EVUNLOOP_ALL);