2
0
Fork 0

angel: support for restaring instance with SIGHUP

This commit is contained in:
Stefan Bühler 2009-10-17 23:50:41 +02:00
parent a25d0f81d0
commit 92904247d6
9 changed files with 400 additions and 105 deletions

View File

@ -10,15 +10,18 @@ typedef struct liPluginItemOption liPluginItemOption;
typedef struct liPlugin liPlugin;
typedef struct liPlugins liPlugins;
typedef gboolean (*liPluginInitCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginFreeCB) (liServer *srv, liPlugin *p);
typedef gboolean (*liPluginInitCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginFreeCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginCleanConfigCB) (liServer *srv, liPlugin *p);
typedef gboolean (*liPluginCheckConfigCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginActivateConfigCB)(liServer *srv, liPlugin *p);
typedef void (*liPluginParseItemCB) (liServer *srv, liPlugin *p, liValue **options);
typedef void (*liPluginCleanConfigCB) (liServer *srv, liPlugin *p);
typedef gboolean (*liPluginCheckConfigCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginActivateConfigCB) (liServer *srv, liPlugin *p);
typedef void (*liPluginParseItemCB) (liServer *srv, liPlugin *p, liValue **options);
typedef void (*liPluginHandleCallCB) (liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data);
typedef void (*liPluginHandleCallCB) (liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data);
typedef void (*liPluginInstanceReplacedCB) (liServer *srv, liPlugin *p, liInstance *oldi, liInstance *newi);
typedef void (*liPluginInstanceReachedStateCB)(liServer *srv, liPlugin *p, liInstance *i, liInstanceState s);
typedef enum {
LI_PLUGIN_ITEM_OPTION_MANDATORY = 1
@ -51,6 +54,9 @@ struct liPlugin {
liPluginCleanConfigCB handle_clean_config; /**< called before the reloading of the config is started or after the reloading failed */
liPluginCheckConfigCB handle_check_config; /**< called before activating a config to ensure everything works */
liPluginActivateConfigCB handle_activate_config; /**< called to activate a config after successful loading it. this cannot fail */
liPluginInstanceReplacedCB handle_instance_replaced;
liPluginInstanceReachedStateCB handle_instance_reached_state;
};
struct liPlugins {
@ -78,5 +84,16 @@ LI_API void li_plugins_handle_item(liServer *srv, GString *itemname, liValue *ha
LI_API gboolean li_plugins_load_module(liServer *srv, const gchar *name);
/* Needed by modules to register their plugin(s) */
LI_API liPlugin *li_angel_plugin_register(liServer *srv, liModule *mod, const gchar *name, liPluginInitCB init);
INLINE void li_angel_plugin_add_angel_cb(liPlugin *p, const gchar *name, liPluginHandleCallCB cb);
/* called when replace was successful or failed - check states to find out */
LI_API void li_angel_plugin_replaced_instance(liServer *srv, liInstance *oldi, liInstance *newi);
LI_API void li_angel_plugin_instance_reached_state(liServer *srv, liInstance *i, liInstanceState s);
/* inline implementations */
INLINE void li_angel_plugin_add_angel_cb(liPlugin *p, const gchar *name, liPluginHandleCallCB cb) {
g_hash_table_insert(p->angel_callbacks, (gchar*) name, (gpointer)(intptr_t) cb);
}
#endif

View File

@ -6,14 +6,18 @@
typedef struct liPluginCoreConfig liPluginCoreConfig;
struct liPluginCoreConfig {
/* Load */
liInstanceConf *load_instconf;
gboolean load_failed;
liInstanceConf *load_instconf;
GPtrArray *load_listen_masks;
/* Running */
liInstanceConf *instconf;
liInstance *inst;
GPtrArray *listen_masks;
liInstance *inst;
GHashTable *listen_sockets;
ev_signal sig_hup;
};
typedef struct liPluginCoreListenMask liPluginCoreListenMask;

View File

@ -9,14 +9,7 @@
#define LIGHTTPD_ANGEL_MAGIC ((guint)0x3e14ac65)
#endif
typedef enum {
LI_INSTANCE_DOWN, /* not started yet */
LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */
LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */
LI_INSTANCE_RUNNING, /* everything running */
LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */
LI_INSTANCE_FINISHED /* not running */
} liInstanceState;
typedef void (*liInstanceResourceFreeCB) (liServer *srv, liInstance *i, liPlugin *p, liInstanceResource *res);
struct liInstanceConf {
gint refcount;
@ -44,6 +37,8 @@ struct liInstance {
liInstance *replace, *replace_by;
liAngelConnection *acon;
GPtrArray *resources;
};
struct liServer {
@ -60,13 +55,21 @@ struct liServer {
liLog log;
};
struct liInstanceResource {
liInstanceResourceFreeCB free_cb;
liPlugin *plugin; /* may be NULL - we don't care about that */
guint ndx; /* internal array index */
gpointer data;
};
LI_API liServer* li_server_new(const gchar *module_dir);
LI_API void li_server_free(liServer* srv);
LI_API void li_server_stop(liServer *srv);
LI_API liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic);
LI_API void li_instance_replace(liInstance *oldi, liInstance *newi);
LI_API gboolean li_instance_replace(liInstance *oldi, liInstance *newi);
LI_API void li_instance_set_state(liInstance *i, liInstanceState s);
LI_API void li_instance_state_reached(liInstance *i, liInstanceState s);
@ -77,4 +80,7 @@ LI_API void li_instance_conf_acquire(liInstanceConf *ic);
LI_API void li_instance_release(liInstance *i);
LI_API void li_instance_acquire(liInstance *i);
LI_API void li_instance_add_resource(liInstance *i, liInstanceResource *res, liInstanceResourceFreeCB free_cb, liPlugin *p, gpointer data);
LI_API void li_instance_rem_resource(liInstance *i, liInstanceResource *res);
#endif

View File

@ -8,8 +8,18 @@ typedef struct liProc liProc;
/* angel_server.h */
typedef enum {
LI_INSTANCE_DOWN, /* not started yet */
LI_INSTANCE_SUSPENDED, /* inactive, neither accept nor logs, handle remaining connections */
LI_INSTANCE_WARMUP, /* only accept(), no logging: waiting for another instance to suspend */
LI_INSTANCE_RUNNING, /* everything running */
LI_INSTANCE_SUSPENDING, /* suspended accept(), still logging, handle remaining connections */
LI_INSTANCE_FINISHED /* not running */
} liInstanceState;
typedef struct liServer liServer;
typedef struct liInstance liInstance;
typedef struct liInstanceConf liInstanceConf;
typedef struct liInstanceResource liInstanceResource;
#endif

View File

@ -109,8 +109,8 @@ void li_plugins_clear(liServer *srv) {
g_hash_table_destroy(ps->module_refs);
g_hash_table_destroy(ps->load_module_refs);
g_hash_table_remove_all(ps->ht_plugins);
g_hash_table_remove_all(ps->load_ht_plugins);
g_hash_table_destroy(ps->ht_plugins);
g_hash_table_destroy(ps->load_ht_plugins);
g_ptr_array_free(ps->plugins, TRUE);
g_ptr_array_free(ps->load_plugins, TRUE);
@ -374,3 +374,23 @@ liPlugin *li_angel_plugin_register(liServer *srv, liModule *mod, const gchar *na
return p;
}
void li_angel_plugin_replaced_instance(liServer *srv, liInstance *oldi, liInstance *newi) {
liPlugins *ps = &srv->plugins;
guint i;
for (i = 0; i < ps->plugins->len; i++) {
liPlugin *p = g_ptr_array_index(ps->plugins, i);
if (p->handle_instance_replaced) p->handle_instance_replaced(srv, p, oldi, newi);
}
}
void li_angel_plugin_instance_reached_state(liServer *srv, liInstance *inst, liInstanceState s) {
liPlugins *ps = &srv->plugins;
guint i;
for (i = 0; i < ps->plugins->len; i++) {
liPlugin *p = g_ptr_array_index(ps->plugins, i);
if (p->handle_instance_reached_state) p->handle_instance_reached_state(srv, p, inst, s);
}
}

View File

@ -2,6 +2,22 @@
#include <lighttpd/angel_plugin_core.h>
#include <lighttpd/ip_parsers.h>
typedef struct listen_socket listen_socket;
typedef struct listen_ref_resource listen_ref_resource;
struct listen_socket {
gint refcount;
liSocketAddress addr;
int fd;
};
struct listen_ref_resource {
liInstanceResource ires;
listen_socket *sock;
};
#include <pwd.h>
#include <grp.h>
@ -255,48 +271,134 @@ static const liPluginItem core_items[] = {
{ NULL, NULL, NULL }
};
static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) {
guint32 ipv4;
#ifdef HAVE_IPV6
guint8 ipv6[16];
#endif
guint16 port;
static listen_socket* listen_new_socket(liSocketAddress *addr, int fd) {
listen_socket *sock = g_slice_new0(listen_socket);
sock->refcount = 0;
sock->addr = *addr;
sock->fd = fd;
return sock;
}
static void listen_socket_acquire(listen_socket *sock) {
g_atomic_int_inc(&sock->refcount);
}
static void listen_ref_release(liServer *srv, liInstance *i, liPlugin *p, liInstanceResource *res) {
listen_ref_resource *ref = res->data;
listen_socket *sock = ref->sock;
UNUSED(i);
UNUSED(srv);
assert(g_atomic_int_get(&sock->refcount) > 0);
if (g_atomic_int_dec_and_test(&sock->refcount)) {
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
g_hash_table_remove(config->listen_sockets, &sock->addr);
}
g_slice_free(listen_ref_resource, ref);
}
static void _listen_socket_free(gpointer ptr) {
listen_socket *sock = ptr;
li_sockaddr_clear(&sock->addr);
close(sock->fd);
g_slice_free(listen_socket, sock);
}
static void listen_socket_add(liInstance *i, liPlugin *p, listen_socket *sock) {
listen_ref_resource *ref = g_slice_new0(listen_ref_resource);
listen_socket_acquire(sock);
ref->sock = sock;
li_instance_add_resource(i, &ref->ires, listen_ref_release, p, ref);
}
static gboolean listen_check_acl(liServer *srv, liPluginCoreConfig *config, liSocketAddress *addr) {
guint i;
liPluginCoreListenMask *mask;
if (li_parse_ipv4(str->str, &ipv4, NULL, &port)) {
int s, v;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
if (!port) port = 80;
switch (addr->addr->plain.sa_family) {
case AF_INET: {
struct sockaddr_in *ipv4 = &addr->addr->ipv4;
guint port = ntohs(ipv4->sin_port);
if (config->listen_masks->len) {
for (i = 0; i < config->listen_masks->len; i++) {
mask = g_ptr_array_index(config->listen_masks, i);
switch (mask->type) {
case LI_PLUGIN_CORE_LISTEN_MASK_IPV4:
if (!li_ipv4_in_ipv4_net(ipv4, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue;
if (!li_ipv4_in_ipv4_net(ipv4->sin_addr.s_addr, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue;
if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue;
break;
return TRUE;
case LI_PLUGIN_CORE_LISTEN_MASK_IPV6:
if (!li_ipv4_in_ipv6_net(ipv4, mask->value.ipv6.addr, mask->value.ipv6.network)) continue;
if (!li_ipv4_in_ipv6_net(ipv4->sin_addr.s_addr, mask->value.ipv6.addr, mask->value.ipv6.network)) continue;
if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue;
break;
case LI_PLUGIN_CORE_LISTEN_MASK_UNIX:
return TRUE;
default:
continue;
}
break;
}
if (i == config->listen_masks->len) {
ERROR(srv, "listen to socket '%s' not allowed", str->str);
return -1;
}
return FALSE;
} else {
return (ipv4->sin_port == 80 || ipv4->sin_port == 443);
}
} break;
#ifdef HAVE_IPV6
case AF_INET6: {
struct sockaddr_in6 *ipv6 = &addr->addr->ipv6;
guint port = ntohs(ipv6->sin6_port);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipv4;
addr.sin_port = htons(port);
if (config->listen_masks->len) {
for (i = 0; i < config->listen_masks->len; i++) {
mask = g_ptr_array_index(config->listen_masks, i);
switch (mask->type) {
case LI_PLUGIN_CORE_LISTEN_MASK_IPV4:
if (!li_ipv6_in_ipv4_net(ipv6->sin6_addr.s6_addr, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue;
if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue;
return TRUE;
case LI_PLUGIN_CORE_LISTEN_MASK_IPV6:
if (!li_ipv6_in_ipv6_net(ipv6->sin6_addr.s6_addr, mask->value.ipv6.addr, mask->value.ipv6.network)) continue;
if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue;
return TRUE;
default:
continue;
}
}
return FALSE;
} else {
return (ipv6->sin6_port == 80 || ipv6->sin6_port == 443);
}
} break;
#endif
#ifdef HAVE_SYS_UN_H
case AF_UNIX: {
if (config->listen_masks->len) {
/* TODO: support unix addresses */
} else {
return FALSE; /* don't allow unix by default */
}
} break;
#endif
default:
ERROR(srv, "Address family %i not supported", addr->addr->plain.sa_family);
break;
}
return FALSE;
}
static int do_listen(liServer *srv, liSocketAddress *addr, GString *str) {
int s, v;
GString *ipv6_str;
switch (addr->addr->plain.sa_family) {
case AF_INET:
if (-1 == (s = socket(AF_INET, SOCK_STREAM, 0))) {
ERROR(srv, "Couldn't open socket: %s", g_strerror(errno));
return -1;
@ -307,7 +409,7 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) {
ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno));
return -1;
}
if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) {
if (-1 == bind(s, &addr->addr->plain, addr->len)) {
close(s);
ERROR(srv, "Couldn't bind socket to '%s': %s", str->str, g_strerror(errno));
return -1;
@ -317,43 +419,13 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) {
ERROR(srv, "Couldn't listen on '%s': %s", str->str, g_strerror(errno));
return -1;
}
DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, port);
DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, addr->addr->ipv4.sin_port);
return s;
#ifdef HAVE_IPV6
} else if (li_parse_ipv6(str->str, ipv6, NULL, &port)) {
GString *ipv6_str = g_string_sized_new(0);
int s, v;
struct sockaddr_in6 addr;
li_ipv6_tostring(ipv6_str, ipv6);
if (!port) port = 80;
case AF_INET6:
ipv6_str = g_string_sized_new(0);
li_ipv6_tostring(ipv6_str, addr->addr->ipv6.sin6_addr.s6_addr);
if (config->listen_masks->len) {
for (i = 0; i < config->listen_masks->len; i++) {
mask = g_ptr_array_index(config->listen_masks, i);
switch (mask->type) {
case LI_PLUGIN_CORE_LISTEN_MASK_IPV4:
if (!li_ipv6_in_ipv4_net(ipv6, mask->value.ipv4.addr, mask->value.ipv4.networkmask)) continue;
if ((mask->value.ipv4.port != port) && (mask->value.ipv4.port != 0 || (port != 80 && port != 443))) continue;
break;
case LI_PLUGIN_CORE_LISTEN_MASK_IPV6:
if (!li_ipv6_in_ipv6_net(ipv6, mask->value.ipv6.addr, mask->value.ipv6.network)) continue;
if ((mask->value.ipv6.port != port) && (mask->value.ipv6.port != 0 || (port != 80 && port != 443))) continue;
break;
case LI_PLUGIN_CORE_LISTEN_MASK_UNIX:
continue;
}
break;
}
if (i == config->listen_masks->len) {
ERROR(srv, "listen to socket '%s' not allowed", str->str);
return -1;
}
}
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
memcpy(&addr.sin6_addr, ipv6, 16);
addr.sin6_port = htons(port);
if (-1 == (s = socket(AF_INET6, SOCK_STREAM, 0))) {
ERROR(srv, "Couldn't open socket: %s", g_strerror(errno));
g_string_free(ipv6_str, TRUE);
@ -372,7 +444,7 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) {
g_string_free(ipv6_str, TRUE);
return -1;
}
if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) {
if (-1 == bind(s, &addr->addr->plain, addr->len)) {
close(s);
ERROR(srv, "Couldn't bind socket to '%s': %s", ipv6_str->str, g_strerror(errno));
g_string_free(ipv6_str, TRUE);
@ -384,32 +456,86 @@ static int do_listen(liServer *srv, liPluginCoreConfig *config, GString *str) {
g_string_free(ipv6_str, TRUE);
return -1;
}
DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, port);
DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, addr->addr->ipv6.sin6_port);
g_string_free(ipv6_str, TRUE);
return s;
#endif
/* TODO: listen unix socket */
} else {
ERROR(srv, "Invalid ip: '%s'", str->str);
return -1;
#ifdef HAVE_SYS_UN_H
case AF_UNIX:
ERROR(srv, "Unix sockets not supported: %s", str->str);
/* TODO: support unix addresses */
break;
#endif
default:
ERROR(srv, "Address family %i not supported", addr->addr->plain.sa_family);
break;
}
return -1;
}
static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) {
static void core_listen(liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data) {
GError *err = NULL;
gint fd;
GArray *fds;
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
liSocketAddress addr;
listen_socket *sock;
DEBUG(srv, "core_listen(%i) '%s'", id, data->str);
if (-1 == id) return; /* ignore simple calls */
fd = do_listen(srv, config, data);
addr = li_sockaddr_from_string(data, 80);
if (!addr.addr) {
GString *error = g_string_sized_new(0);
g_string_printf(error, "Invalid socket address: '%s'", data->str);
if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
}
return;
}
if (!listen_check_acl(srv, config, &addr)) {
GString *error = g_string_sized_new(0);
li_sockaddr_clear(&addr);
g_string_printf(error, "Socket address not allowed: '%s'", data->str);
if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
}
return;
}
if (NULL == (sock = g_hash_table_lookup(config->listen_sockets, &addr))) {
fd = do_listen(srv, &addr, data);
if (-1 == fd) {
GString *error = g_string_sized_new(0);
li_sockaddr_clear(&addr);
g_string_printf(error, "Couldn't listen to '%s'", data->str);
if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
}
return;
}
li_fd_init(fd);
sock = listen_new_socket(&addr, fd);
g_hash_table_insert(config->listen_sockets, &sock->addr, sock);
} else {
li_sockaddr_clear(&addr);
}
listen_socket_add(i, p, sock);
fd = dup(sock->fd);
if (-1 == fd) {
/* socket ref will be released when instance is released */
GString *error = g_string_sized_new(0);
g_string_printf(error, "Couldn't listen to '%s'", data->str);
g_string_printf(error, "Couldn't duplicate fd");
if (!li_angel_send_result(i->acon, id, error, NULL, NULL, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
@ -427,7 +553,7 @@ static void core_listen(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GS
}
}
static void core_reached_state(liServer *srv, liInstance *i, liPlugin *p, gint32 id, GString *data) {
static void core_reached_state(liServer *srv, liPlugin *p, liInstance *i, gint32 id, GString *data) {
UNUSED(srv);
UNUSED(p);
UNUSED(id);
@ -448,6 +574,8 @@ static void core_free(liServer *srv, liPlugin *p) {
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
guint i;
li_ev_safe_ref_and_stop(ev_signal_stop, srv->loop, &config->sig_hup);
core_clean(srv, p);
if (config->instconf) {
@ -456,7 +584,7 @@ static void core_free(liServer *srv, liPlugin *p) {
}
if (config->inst) {
li_instance_set_state(config->inst, LI_INSTANCE_DOWN);
li_instance_set_state(config->inst, LI_INSTANCE_FINISHED);
li_instance_release(config->inst);
config->inst = NULL;
}
@ -466,8 +594,11 @@ static void core_free(liServer *srv, liPlugin *p) {
}
g_ptr_array_free(config->listen_masks, TRUE);
g_ptr_array_free(config->load_listen_masks, TRUE);
g_hash_table_destroy(config->listen_sockets);
config->listen_masks = NULL;
config->load_listen_masks = NULL;
g_slice_free(liPluginCoreConfig, config);
}
static void core_clean(liServer *srv, liPlugin *p) {
@ -527,6 +658,31 @@ static void core_activate(liServer *srv, liPlugin *p) {
}
}
static void core_instance_replaced(liServer *srv, liPlugin *p, liInstance *oldi, liInstance *newi) {
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
UNUSED(srv);
if (oldi == config->inst && LI_INSTANCE_FINISHED == oldi->s_cur) {
li_instance_acquire(newi);
config->inst = newi;
li_instance_release(oldi);
}
}
static void core_handle_sig_hup(struct ev_loop *loop, ev_signal *w, int revents) {
liPluginCoreConfig *config = w->data;
liInstance *oldi, *newi;
UNUSED(loop);
UNUSED(revents);
if (NULL == (oldi = config->inst)) return;
if (oldi->replace_by) return;
newi = li_server_new_instance(oldi->srv, config->instconf);
li_instance_replace(oldi, newi);
li_instance_release(newi);
}
static gboolean core_init(liServer *srv, liPlugin *p) {
liPluginCoreConfig *config;
UNUSED(srv);
@ -537,12 +693,19 @@ static gboolean core_init(liServer *srv, liPlugin *p) {
p->handle_clean_config = core_clean;
p->handle_check_config = core_check;
p->handle_activate_config = core_activate;
p->handle_instance_replaced = core_instance_replaced;
config->listen_masks = g_ptr_array_new();
config->load_listen_masks = g_ptr_array_new();
config->listen_sockets = g_hash_table_new_full(li_hash_sockaddr, li_equal_sockaddr, NULL, _listen_socket_free);
g_hash_table_insert(p->angel_callbacks, "listen", (gpointer)(intptr_t)core_listen);
g_hash_table_insert(p->angel_callbacks, "reached-state", (gpointer)(intptr_t)core_reached_state);
li_angel_plugin_add_angel_cb(p, "listen", core_listen);
li_angel_plugin_add_angel_cb(p, "reached-state", core_reached_state);
ev_signal_init(&config->sig_hup, core_handle_sig_hup, SIGHUP);
config->sig_hup.data = config;
ev_signal_start(srv->loop, &config->sig_hup);
ev_unref(srv->loop);
return TRUE;
}

View File

@ -9,7 +9,7 @@
static void read_pipe(liServer *srv, liErrorPipe *epipe, gboolean flush) {
const ssize_t max_read = 8192;
ssize_t r, toread;
ssize_t r, toread = 0;
GString *buf;
int count = 10;
@ -104,8 +104,8 @@ void li_error_pipe_free(liErrorPipe *epipe) {
ev_io_stop(srv->loop, &epipe->fd_watcher);
li_error_pipe_flush(epipe);
if (-1 != epipe->fds[0]) close(epipe->fds[0]);
if (-1 != epipe->fds[1]) close(epipe->fds[1]);
if (-1 != epipe->fds[0]) { close(epipe->fds[0]); epipe->fds[0] = -1; }
if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; }
g_slice_free(liErrorPipe, epipe);
}
@ -114,7 +114,7 @@ void li_error_pipe_free(liErrorPipe *epipe) {
void li_error_pipe_activate(liErrorPipe *epipe) {
liServer *srv = epipe->srv;
if (-1 != epipe->fds[1]) close(epipe->fds[1]);
if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; }
ev_io_start(srv->loop, &epipe->fd_watcher);
}

View File

@ -95,7 +95,7 @@ static void instance_angel_call_cb(liAngelConnection *acon,
return;
}
cb(srv, i, p, id, data);
cb(srv, p, i, id, data);
}
static void instance_angel_close_cb(liAngelConnection *acon, GError *err) {
@ -200,12 +200,31 @@ liInstance* li_server_new_instance(liServer *srv, liInstanceConf *ic) {
i->s_cur = i->s_dest = LI_INSTANCE_DOWN;
ev_child_init(&i->child_watcher, instance_child_cb, -1, 0);
i->child_watcher.data = i;
i->resources = g_ptr_array_new();
return i;
}
void li_instance_replace(liInstance *oldi, liInstance *newi) {
/* TODO ??? */
gboolean li_instance_replace(liInstance *oldi, liInstance *newi) {
if (oldi->replace_by || newi->replace) return FALSE;
oldi->replace_by = newi;
newi->replace = oldi;
li_instance_acquire(oldi);
li_instance_acquire(newi);
li_instance_set_state(newi, LI_INSTANCE_WARMUP);
return TRUE;
}
static void li_instance_unset_replace(liInstance *oldi, liInstance *newi) {
g_assert(newi == oldi->replace_by); oldi->replace_by = NULL;
g_assert(oldi == newi->replace); newi->replace = NULL;
li_angel_plugin_replaced_instance(oldi->srv, oldi, newi);
li_instance_release(oldi);
li_instance_release(newi);
}
void li_instance_set_state(liInstance *i, liInstanceState s) {
@ -275,6 +294,9 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) {
}
break;
case LI_INSTANCE_SUSPENDED:
if (i->replace_by && i->replace_by->s_dest == LI_INSTANCE_WARMUP) {
li_instance_set_state(i->replace_by, LI_INSTANCE_RUNNING);
}
switch (i->s_dest) {
case LI_INSTANCE_DOWN:
break; /* impossible */
@ -294,7 +316,10 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) {
}
break;
case LI_INSTANCE_WARMUP:
/* TODO: replace another instance? */
if (i->replace) {
/* stop old instance */
li_instance_set_state(i->replace, LI_INSTANCE_FINISHED);
}
break;
case LI_INSTANCE_RUNNING:
/* nothing to do, instance should already know what to do */
@ -304,7 +329,19 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) {
break;
case LI_INSTANCE_FINISHED:
if (i->s_dest != LI_INSTANCE_FINISHED) {
/* TODO: replacing another instance failed? */
if (i->replace) {
ERROR(i->srv, "%s", "Replacing instance failed, continue old instance");
li_instance_set_state(i->replace, LI_INSTANCE_RUNNING);
li_instance_unset_replace(i->replace, i);
}
} else {
if (i->replace_by) {
if (i->replace_by->s_dest == LI_INSTANCE_WARMUP) {
li_instance_set_state(i->replace_by, LI_INSTANCE_RUNNING);
}
li_instance_unset_replace(i, i->replace_by);
}
}
break;
}
@ -317,12 +354,16 @@ void li_instance_state_reached(liInstance *i, liInstanceState s) {
} else {
li_instance_state_reached(i, LI_INSTANCE_FINISHED);
}
} else {
li_angel_plugin_instance_reached_state(i->srv, i, s);
}
}
void li_instance_release(liInstance *i) {
liServer *srv;
liInstance *t;
guint j;
if (!i) return;
srv = i->srv;
@ -342,6 +383,14 @@ void li_instance_release(liInstance *i) {
t = i->replace_by; i->replace_by = NULL;
li_instance_release(t);
for (j = 0; j < i->resources->len; j++) {
liInstanceResource *res = g_ptr_array_index(i->resources, j);
res->ndx = -1;
res->free_cb(srv, i, res->plugin, res);
}
g_ptr_array_free(i->resources, TRUE);
g_slice_free(liInstance, i);
}
@ -369,6 +418,8 @@ void li_instance_conf_release(liInstanceConf *ic) {
if (!ic) return;
assert(g_atomic_int_get(&ic->refcount) > 0);
if (!g_atomic_int_dec_and_test(&ic->refcount)) return;
if (ic->username) g_string_free(ic->username, TRUE);
g_strfreev(ic->cmd);
g_strfreev(ic->env);
g_slice_free(liInstanceConf, ic);
@ -378,3 +429,21 @@ void li_instance_conf_acquire(liInstanceConf *ic) {
assert(g_atomic_int_get(&ic->refcount) > 0);
g_atomic_int_inc(&ic->refcount);
}
void li_instance_add_resource(liInstance *i, liInstanceResource *res, liInstanceResourceFreeCB free_cb, liPlugin *p, gpointer data) {
res->free_cb = free_cb;
res->data = data;
res->plugin = p;
res->ndx = i->resources->len;
g_ptr_array_add(i->resources, res);
}
void li_instance_rem_resource(liInstance *i, liInstanceResource *res) {
liInstanceResource *res2;
g_assert(res == g_ptr_array_index(i->resources, res->ndx));
g_ptr_array_remove_index_fast(i->resources, res->ndx);
res2 = g_ptr_array_index(i->resources, res->ndx);
res2->ndx = res->ndx;
}

View File

@ -101,12 +101,15 @@ gint li_send_fd(gint s, gint fd) { /* write fd to unix socket s */
struct iovec iov;
#ifdef CMSG_FIRSTHDR
struct cmsghdr *cmsg;
#ifndef CMSG_SPACE
#define CMSG_SPACE(x) x+100
#endif
# ifndef CMSG_SPACE
# define CMSG_SPACE(x) x+100
# endif
gchar buf[CMSG_SPACE(sizeof(gint))];
#endif
memset(&msg, 0, sizeof(msg));
memset(&iov, 0, sizeof(iov));
iov.iov_len = 1;
iov.iov_base = "x";
msg.msg_iov = &iov;
@ -161,6 +164,9 @@ gint li_receive_fd(gint s, gint *fd) { /* read fd from unix socket s */
gchar x[100];
gchar name[100];
memset(&msg, 0, sizeof(msg));
memset(&iov, 0, sizeof(iov));
iov.iov_base = x;
iov.iov_len = 100;
msg.msg_name = name;