Big lighttpd-angel update; still not complete, but supports fork+exec+setuid/gid+listen (no checks in listen yet)

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent fb8cfb4a88
commit f8be820c36

@ -2,12 +2,13 @@
#define _LIGHTTPD_ANGEL_H_
/* interface to the angel; implementation needs to work without angel too */
LI_API void angel_setup(server *srv);
/* listen to a socket */
LI_API int angel_listen(server *srv, GString *str);
LI_API void angel_listen(server *srv, GString *str);
/* send log messages during startup to angel, frees the string */
LI_API gboolean angel_log(server *srv, GString *str);
LI_API void angel_log(server *srv, GString *str);
/* angle_fake definitions, only for internal use */

@ -17,6 +17,9 @@ typedef struct server server;
struct instance;
typedef struct instance instance;
struct instance_conf;
typedef struct instance_conf instance_conf;
#include <lighttpd/angel_value.h>
#include <lighttpd/angel_data.h>

@ -11,24 +11,20 @@ typedef struct angel_connection angel_connection;
struct angel_call;
typedef struct angel_call angel_call;
typedef void (*AngelCallback)(gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds);
/* error, data and fds-array will be freed/closed by the angel api itself; if you want to use the fds set the array size to 0 */
typedef void (*AngelCallback)(angel_call *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds);
typedef void (*AngelReceiveCall)(angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *data);
typedef void (*AngelReceiveResult)(angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *error, GString *data, GArray *fds);
/* gets called after read/write errors */
typedef void (*AngelCloseCallback)(angel_connection *acon, GError *err);
struct angel_connection {
gpointer data;
GStaticMutex mutex; /* angel itself has no threads */
GMutex *mutex;
struct ev_loop *loop;
int fd;
idlist *call_id_list;
@ -39,7 +35,6 @@ struct angel_connection {
angel_buffer in;
AngelReceiveCall recv_call;
AngelReceiveResult recv_result;
AngelCloseCallback close_cb;
/* parse input */
@ -84,11 +79,13 @@ typedef enum {
} AngelConnectionError;
/* create connection */
LI_API angel_connection* angel_connection_create(
LI_API angel_connection* angel_connection_new(
struct ev_loop *loop, int fd, gpointer data,
AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb);
AngelReceiveCall recv_call, AngelCloseCallback close_cb);
LI_API void angel_connection_free(angel_connection *acon);
LI_API angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout);
LI_API angel_call *angel_call_new(AngelCallback callback, ev_tstamp timeout);
/* returns TRUE if a call was cancelled; make sure you don't call free while you're calling send_call */
LI_API gboolean angel_call_free(angel_call *call);
@ -109,7 +106,6 @@ LI_API gboolean angel_send_call(
LI_API gboolean angel_send_result(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *error, GString *data, GArray *fds,
GError **err);

@ -18,6 +18,8 @@ typedef gboolean (*PluginCheckConfig) (server *srv, plugin *p);
typedef void (*PluginActivateConfig)(server *srv, plugin *p);
typedef void (*PluginParseItem) (server *srv, plugin *p, value **options);
typedef void (*PluginHandleCall) (server *srv, instance *i, plugin *p, gint32 id, GString *data);
typedef enum {
PLUGIN_ITEM_OPTION_MANDATORY = 1
} plugin_item_option_flags;
@ -32,7 +34,7 @@ struct plugin_item {
const gchar *name;
PluginParseItem handle_parse_item;
const plugin_item_option options[];
const plugin_item_option *options;
};
struct plugin {
@ -42,8 +44,8 @@ struct plugin {
gpointer data; /**< private plugin data */
const plugin_item *items;
GHashTable *angel_callbacks; /**< map (const gchar*) -> PluginHandleCall */
PluginInit plugin_init_marker; /**< identify plugin; PluginInit must be unique per plugin */
PluginFree handle_free; /**< called before plugin is unloaded */
PluginCleanConfig handle_clean_config; /**< called before the reloading of the config is started or after the reloading failed */
@ -59,6 +61,7 @@ struct Plugins {
struct modules *modules;
GHashTable *module_refs, *load_module_refs; /** gchar* -> server_module */
GHashTable *ht_plugins, *load_ht_plugins;
GPtrArray *plugins, *load_plugins; /* plugin* */
};
@ -69,7 +72,7 @@ void plugins_clear(server *srv);
void plugins_config_clean(server *srv);
gboolean plugins_config_load(server *srv, const gchar *filename);
gboolean plugins_handle_item(server *srv, GString *itemname, value *hash);
void plugins_handle_item(server *srv, GString *itemname, value *hash);
/* "core" is a reserved module name for interal use */
gboolean plugins_load_module(server *srv, const gchar *name);

@ -3,6 +3,16 @@
#include <lighttpd/angel_base.h>
typedef struct {
/* Load */
instance_conf *load_instconf;
gboolean load_failed;
/* Running */
instance_conf *instconf;
instance *inst;
} plugin_core_config_t;
gboolean plugin_core_init(server *srv);
#endif

@ -9,10 +9,38 @@
#define LIGHTTPD_ANGEL_MAGIC ((guint)0x3e14ac65)
#endif
typedef enum {
INSTANCE_DOWN, /* not running */
INSTANCE_LOADING, /* startup */
INSTANCE_WARMUP, /* running, but logging to files disabled */
INSTANCE_ACTIVE, /* everything running */
INSTANCE_SUSPEND /* handle remaining connections, suspend logs+accept() */
} instance_state_t;
struct instance_conf {
gint refcount;
gchar **cmd;
GString *username;
uid_t uid;
gid_t gid;
};
struct instance {
gint refcount;
server *srv;
instance_conf *ic;
pid_t pid;
ev_child child_watcher;
instance_state_t s_cur, s_dest;
angel_connection *con;
instance *replace, *replace_by;
angel_connection *acon;
gboolean in_jobqueue;
};
struct server {
@ -24,6 +52,9 @@ struct server {
sig_w_TERM,
sig_w_PIPE;
GQueue job_queue;
ev_async job_watcher;
Plugins plugins;
log_t log;
@ -32,4 +63,17 @@ struct server {
LI_API server* server_new(const gchar *module_dir);
LI_API void server_free(server* srv);
LI_API instance* server_new_instance(server *srv, instance_conf *ic);
LI_API void instance_replace(instance *oldi, instance *newi);
LI_API void instance_set_state(instance *i, instance_state_t s);
LI_API instance_conf* instance_conf_new(gchar **cmd, GString *username, uid_t uid, gid_t gid);
LI_API void instance_conf_release(instance_conf *ic);
LI_API void instance_conf_acquire(instance_conf *ic);
LI_API void instance_release(instance *i);
LI_API void instance_acquire(instance *i);
LI_API void instance_job_append(instance *i);
#endif

@ -27,6 +27,9 @@
#include <lighttpd/typedefs.h>
#include <lighttpd/module.h>
#include <lighttpd/angel_data.h>
#include <lighttpd/angel_connection.h>
#include <lighttpd/chunk.h>
#include <lighttpd/chunk_parser.h>

@ -22,12 +22,14 @@ struct server_socket {
struct server {
guint32 magic; /** server magic version, check against LIGHTTPD_SERVER_MAGIC in plugins */
server_state state; /** atomic access */
angel_connection *acon;
struct worker *main_worker;
guint worker_count;
GArray *workers;
GArray *ts_formats; /** array of (GString*), add with server_ts_format_add() */
struct ev_loop *loop;
guint loop_flags;
ev_signal
sig_w_INT,
@ -85,6 +87,7 @@ struct server {
LI_API server* server_new(const gchar *module_dir);
LI_API void server_free(server* srv);
LI_API gboolean server_loop_init(server *srv);
LI_API gboolean server_worker_init(server *srv);
LI_API void server_listen(server *srv, int fd);

@ -377,8 +377,8 @@ ADD_EXECUTABLE(lighttpd-angel
utils.c
)
ADD_TARGET_PROPERTIES(lighttpd-angel LINK_FLAGS "${LUA_LDFLAGS} ${EV_LDFLAGS} ${GMODULE_LDFLAGS} ${WARN_FLAGS}")
ADD_TARGET_PROPERTIES(lighttpd-angel COMPILE_FLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GMODULE_CFLAGS} ${WARN_FLAGS}")
ADD_TARGET_PROPERTIES(lighttpd-angel LINK_FLAGS "${LUA_LDFLAGS} ${EV_LDFLAGS} ${GTHREAD_LDFLAGS} ${GMODULE_LDFLAGS} ${WARN_FLAGS}")
ADD_TARGET_PROPERTIES(lighttpd-angel COMPILE_FLAGS "${LUA_CFLAGS} ${EV_CFLAGS} ${GTHREAD_CFLAGS} ${GMODULE_CFLAGS} ${WARN_FLAGS}")
IF(HAVE_PCRE_H)
TARGET_LINK_LIBRARIES(lighttpd ${PCRE_LIBRARY})

@ -2,12 +2,79 @@
#include <lighttpd/base.h>
#include <lighttpd/angel.h>
static void angel_call_cb(angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id, GString *data) {
server *srv = acon->data;
ERROR(srv, "received message for %s:%s, not implemented yet", mod, action);
if (-1 != id) angel_send_result(acon, id, g_string_new_len(CONST_STR_LEN("not implemented yet")), NULL, NULL, NULL);
}
static void angel_close_cb(angel_connection *acon, GError *err) {
server *srv = acon->data;
ERROR(srv, "fatal: angel connection close: %s", err ? err->message : g_strerror(errno));
if (err) g_error_free(err);
exit(1);
}
void angel_setup(server *srv) {
srv->acon = angel_connection_new(srv->loop, 0, srv, angel_call_cb, angel_close_cb);
}
static void angel_listen_cb(angel_call *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds) {
server *srv = ctx;
guint i;
UNUSED(data);
angel_call_free(acall);
ERROR(srv, "%s", "listen_cb");
if (timeout) {
ERROR(srv, "listen failed: %s", "time out");
return;
}
if (error->len > 0) {
ERROR(srv, "listen failed: %s", error->str);
/* TODO: exit? */
return;
}
if (fds && fds->len > 0) {
for (i = 0; i < fds->len; i++) {
INFO(srv, "listening on fd %i", g_array_index(fds, int, i));
server_listen(srv, g_array_index(fds, int, i));
}
g_array_set_size(fds, 0);
} else {
ERROR(srv, "listen failed: %s", "received no filedescriptors");
}
}
/* listen to a socket */
int angel_listen(server *srv, GString *str) {
return angel_fake_listen(srv, str);
void angel_listen(server *srv, GString *str) {
if (srv->acon) {
angel_call *acall = angel_call_new(angel_listen_cb, 3.0);
GError *err = NULL;
acall->context = srv;
if (!angel_send_call(srv->acon, CONST_STR_LEN("core"), CONST_STR_LEN("listen"), acall, g_string_new_len(GSTR_LEN(str)), &err)) {
ERROR(srv, "couldn't send call: %s", err->message);
g_error_free(err);
}
} else {
int fd = angel_fake_listen(srv, str);
if (-1 == fd) {
ERROR(srv, "listen('%s') failed", str->str);
/* TODO: exit? */
} else {
server_listen(srv, fd);
}
}
}
/* send log messages while startup to angel */
gboolean angel_log(server *srv, GString *str) {
return angel_fake_log(srv, str);
void angel_log(server *srv, GString *str) {
angel_fake_log(srv, str);
}

@ -5,6 +5,8 @@
#define ANGEL_MAGIC ((gint32) 0x8a930a9f)
static void close_fd_array(GArray *fds);
typedef enum {
ANGEL_CALL_SEND_SIMPLE = 1,
ANGEL_CALL_SEND_CALL = 2,
@ -56,6 +58,7 @@ static void send_queue_item_free(angel_connection_send_item_t *i) {
g_string_free(i->value.string.buf, TRUE);
break;
case ANGEL_CONNECTION_ITEM_FDS:
close_fd_array(i->value.fds.fds);
g_array_free(i->value.fds.fds, TRUE);
break;
}
@ -72,6 +75,7 @@ static void send_queue_clean(GQueue *queue) {
break;
case ANGEL_CONNECTION_ITEM_FDS:
if (i->value.fds.pos < i->value.fds.fds->len) return;
close_fd_array(i->value.fds.fds);
g_array_free(i->value.fds.fds, TRUE);
break;
}
@ -100,7 +104,7 @@ static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **e
old_len = acon->in.data->len;
g_string_set_size(acon->in.data, need);
for ( ; want > 0; ) {
r = read(acon->fd, acon->in.data + old_len, want);
r = read(acon->fd, acon->in.data->str + old_len, want);
if (r < 0) {
switch (errno) {
case EINTR:
@ -131,16 +135,63 @@ static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **e
return TRUE;
}
static void close_fd_array(GArray *fds) {
guint i;
for (i = 0; i < fds->len; i++) {
close(g_array_index(fds, int, i));
}
g_array_set_size(fds, 0);
}
static gboolean angel_dispatch(angel_connection *acon, GError **err) {
gint32 id = acon->parse.id, type = acon->parse.type;
angel_call *call = NULL;
AngelCallback cb = NULL;
gpointer ctx;
if (type != ANGEL_CALL_SEND_SIMPLE) {
g_static_mutex_lock(&acon->mutex);
switch (type) {
case ANGEL_CALL_SEND_SIMPLE:
if (-1 != id) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: %i, should be -1 for simple call", (gint) id);
close_fd_array(acon->parse.fds);
return FALSE;
}
if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Wrong data in call");
close_fd_array(acon->parse.fds);
return FALSE;
}
acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.data);
break;
case ANGEL_CALL_SEND_CALL:
if (-1 == id) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: -1, should be >= 0 for call");
close_fd_array(acon->parse.fds);
return FALSE;
}
if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Wrong data in call");
close_fd_array(acon->parse.fds);
return FALSE;
}
acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.data);
break;
case ANGEL_CALL_SEND_RESULT:
g_printerr("received result: %i\n", id);
g_mutex_lock(acon->mutex);
if (!idlist_is_used(acon->call_id_list, id)) {
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: %i", (gint) id);
close_fd_array(acon->parse.fds);
return FALSE;
}
idlist_put(acon->call_id_list, id);
@ -149,39 +200,23 @@ static gboolean angel_dispatch(angel_connection *acon, GError **err) {
g_ptr_array_index(acon->call_table, id) = NULL;
if (call) {
ev_timer_stop(acon->loop, &call->timeout_watcher);
if (!call->callback) {
ctx = call->context;
if (NULL == (cb = call->callback)) {
g_slice_free(angel_call, call);
call = NULL;
}
}
}
g_static_mutex_unlock(&acon->mutex);
} else if (-1 != id) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid id: %i, should be -1 for simple call", (gint) id);
return FALSE;
}
g_mutex_unlock(acon->mutex);
switch (type) {
case ANGEL_CALL_SEND_SIMPLE:
if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Wrong data in call");
return FALSE;
}
acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.data);
break;
case ANGEL_CALL_SEND_RESULT:
if (call) {
acon->recv_result(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action),
id, acon->parse.error, acon->parse.data, acon->parse.fds);
if (cb) {
cb(call, ctx, FALSE, acon->parse.error, acon->parse.data, acon->parse.fds);
}
close_fd_array(acon->parse.fds);
break;
default:
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA,
"Invalid type: %i", (gint) type);
close_fd_array(acon->parse.fds);
return FALSE;
}
@ -228,10 +263,12 @@ static gboolean angel_connection_read(angel_connection *acon, GError **err) {
"receive fd error: %s", g_strerror(errno));
return FALSE;
case -2:
g_printerr("waiting for fds: %i\n", acon->parse.missing_fds);
return TRUE; /* need more data */
}
}
acon->parse.have_header = FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.mod, acon->parse.mod_len, err)) return FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.action, acon->parse.action_len, err)) return FALSE;
if (!angel_data_read_mem(&acon->in, &acon->parse.error, acon->parse.error_len, err)) return FALSE;
@ -256,9 +293,9 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents)
gboolean out_queue_empty;
angel_connection_send_item_t *send_item;
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
send_item = g_queue_peek_head(acon->out);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
for (i = 0; send_item && (i < 10); i++) { /* don't send more than 10 chunks */
switch (send_item->type) {
@ -295,7 +332,9 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents)
case ANGEL_CONNECTION_ITEM_FDS:
while (send_item->value.fds.pos < send_item->value.fds.fds->len) {
switch (send_fd(w->fd, g_array_index(send_item->value.fds.fds, int, send_item->value.fds.pos))) {
case 0: continue;
case 0:
send_item->value.fds.pos++;
continue;
case -1: /* Fatal error, connection has to be closed */
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
@ -303,24 +342,23 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents)
return;
case -2: goto write_eagain;
}
send_item->value.fds.pos++;
}
break;
}
send_queue_item_free(send_item);
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
g_queue_pop_head(acon->out);
send_item = g_queue_peek_head(acon->out);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
}
write_eagain:
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
send_queue_clean(acon->out);
out_queue_empty = (0 == acon->out->length);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (out_queue_empty) ev_io_rem_events(loop, w, EV_WRITE);
}
@ -342,30 +380,82 @@ static void angel_connection_out_notify_cb(struct ev_loop *loop, ev_async *w, in
}
/* create connection */
angel_connection* angel_connection_create(struct ev_loop *loop, int fd, gpointer data,
AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb) {
angel_connection* angel_connection_new(struct ev_loop *loop, int fd, gpointer data,
AngelReceiveCall recv_call, AngelCloseCallback close_cb) {
angel_connection *acon = g_slice_new0(angel_connection);
acon->data = data;
g_static_mutex_init(&acon->mutex);
acon->mutex = g_mutex_new();
acon->loop = loop;
acon->fd = fd;
acon->call_id_list = idlist_new(65535);
acon->call_table = g_ptr_array_new();
ev_io_init(&acon->fd_watcher, angel_connection_io_cb, fd, EV_READ);
ev_io_start(acon->loop, &acon->fd_watcher);
acon->fd_watcher.data = acon;
ev_async_init(&acon->out_notify_watcher, angel_connection_out_notify_cb);
ev_async_start(acon->loop, &acon->out_notify_watcher);
acon->out_notify_watcher.data = acon;
acon->out = g_queue_new();
acon->in.data = g_string_sized_new(0);
acon->in.data = g_string_sized_new(1024);
acon->in.pos = 0;
acon->parse.mod = g_string_sized_new(0);
acon->parse.action = g_string_sized_new(0);
acon->parse.error = g_string_sized_new(0);
acon->parse.data = g_string_sized_new(0);
acon->parse.fds = g_array_new(FALSE, FALSE, sizeof(int));
acon->recv_call = recv_call;
acon->recv_result = recv_result;
acon->close_cb = close_cb;
return acon;
}
void angel_connection_free(angel_connection *acon) {
angel_connection_send_item_t *send_item;
guint i;
g_printerr("angel_connection_free\n");
if (!acon) return;
close(acon->fd);
acon->fd = -1;
for (i = 0; i < acon->call_table->len; i++) {
angel_call *acall = g_ptr_array_index(acon->call_table, i);
AngelCallback cb;
if (!acall) continue;
g_ptr_array_index(acon->call_table, i) = NULL;
cb = acall->callback;
ev_timer_stop(acon->loop, &acall->timeout_watcher);
if (cb) {
cb(acall, acall->context, TRUE, NULL, NULL, NULL);
} else {
g_slice_free(angel_call, acall);
}
}
g_ptr_array_free(acon->call_table, TRUE);
g_mutex_free(acon->mutex);
acon->mutex = NULL;
ev_io_stop(acon->loop, &acon->fd_watcher);
ev_async_stop(acon->loop, &acon->out_notify_watcher);
idlist_free(acon->call_id_list);
while (NULL != (send_item = g_queue_pop_head(acon->out))) {
send_queue_item_free(send_item);
}
g_queue_free(acon->out);
g_string_free(acon->in.data, TRUE);
/* TODO */
g_slice_free(angel_connection, acon);
}
static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
angel_call* call = (angel_call*) w->data;
angel_connection *acon = call->acon;
@ -373,23 +463,23 @@ static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents
gpointer ctx;
UNUSED(loop); UNUSED(revents);
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
g_ptr_array_index(acon->call_table, call->id) = NULL;
if (NULL == (cb = call->callback)) {
g_slice_free(angel_call, call);
}
ctx = call->context;
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (cb) cb(ctx, TRUE, NULL, NULL, NULL);
if (cb) cb(call, ctx, TRUE, NULL, NULL, NULL);
}
angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout) {
angel_call *angel_call_new(AngelCallback callback, ev_tstamp timeout) {
angel_call* call = g_slice_new0(angel_call);
g_assert(NULL != callback);
call->callback = callback;
ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, 0, timeout);
ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, timeout, 0);
call->timeout_watcher.data = call;
call->id = -1;
@ -402,14 +492,14 @@ gboolean angel_call_free(angel_call *call) {
if (call->acon) {
angel_connection *acon = call->acon;
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
if (-1 != call->id) {
r = TRUE;
call->callback = NULL;
} else {
g_slice_free(angel_call, call);
}
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
} else {
g_slice_free(angel_call, call);
}
@ -425,17 +515,26 @@ static gboolean prepare_call_header(GString **pbuf,
buf = g_string_sized_new(8*4 + mod_len + action_len);
*pbuf = buf;
g_printerr("Prepare call with id: %i\n", id);
if (!angel_data_write_int32(buf, ANGEL_MAGIC, err)) return FALSE;
if (!angel_data_write_int32(buf, type, err)) return FALSE;
if (!angel_data_write_int32(buf, id, err)) return FALSE;
if (!angel_data_write_int32(buf, mod_len, err)) return FALSE;
if (!angel_data_write_int32(buf, action_len, err)) return FALSE;
if (type != ANGEL_CALL_SEND_RESULT) {
if (!angel_data_write_int32(buf, mod_len, err)) return FALSE;
if (!angel_data_write_int32(buf, action_len, err)) return FALSE;
} else {
if (!angel_data_write_int32(buf, 0, err)) return FALSE;
if (!angel_data_write_int32(buf, 0, err)) return FALSE;
}
if (!angel_data_write_int32(buf, error_len, err)) return FALSE;
if (!angel_data_write_int32(buf, data_len, err)) return FALSE;
if (!angel_data_write_int32(buf, fd_count, err)) return FALSE;
g_string_append_len(buf, mod, mod_len);
g_string_append_len(buf, action, action_len);
if (type != ANGEL_CALL_SEND_RESULT) {
g_string_append_len(buf, mod, mod_len);
g_string_append_len(buf, action, action_len);
}
return TRUE;
}
@ -450,6 +549,11 @@ gboolean angel_send_simple_call(
if (err && *err) goto error;
if (-1 == acon->fd) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed");
goto error;
}
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
@ -457,11 +561,11 @@ gboolean angel_send_simple_call(
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error;
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
@ -485,38 +589,45 @@ gboolean angel_send_call(
if (err && *err) goto error;
g_static_mutex_lock(&acon->mutex);
if (-1 == acon->fd) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed");
goto error;
}
g_mutex_lock(acon->mutex);
if (-1 != call->id) {
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_ALREADY_RUNNING, "call already running");
goto error_before_new_id;
}
if (-1 == (call->id = idlist_get(acon->call_id_list))) {
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_OUT_OF_CALL_IDS, "out of call ids");
goto error;
}
call->acon = acon;
if ((guint) call->id >= acon->call_table->len) {
g_ptr_array_set_size(acon->call_table, call->id + 1);
}
g_ptr_array_index(acon->call_table, call->id) = call;
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
if (data && data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error;
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data ? data->len : 0, 0, err)) goto error;
ev_timer_start(acon->loop, &call->timeout_watcher);
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, data);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
@ -527,6 +638,7 @@ error:
if (-1 != call->id) {
idlist_put(acon->call_id_list, call->id);
call->id = -1;
call->acon = NULL;
}
error_before_new_id:
if (data) g_string_free(data, TRUE);
@ -536,7 +648,6 @@ error_before_new_id:
gboolean angel_send_result(
angel_connection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
gint32 id,
GString *error, GString *data, GArray *fds,
GError **err) {
@ -545,20 +656,25 @@ gboolean angel_send_result(
if (err && *err) goto error;
if (data->len > ANGEL_CALL_MAX_STR_LEN) {
if (-1 == acon->fd) {
g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, "connection already closed");
goto error;
}
if (data && data->len > ANGEL_CALL_MAX_STR_LEN) {
g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN);
goto error;
}
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, mod, mod_len, action, action_len, error->len, data->len, fds->len, err)) goto error;
if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, NULL, 0, NULL, 0, error ? error->len : 0, data ? data->len : 0, fds ? fds->len : 0, err)) goto error;
g_static_mutex_lock(&acon->mutex);
g_mutex_lock(acon->mutex);
queue_was_empty = (0 == acon->out->length);
send_queue_push_string(acon->out, buf);
send_queue_push_string(acon->out, error);
send_queue_push_string(acon->out, data);
send_queue_push_fds(acon->out, fds);
g_static_mutex_unlock(&acon->mutex);
g_mutex_unlock(acon->mutex);
if (queue_was_empty)
ev_async_send(acon->loop, &acon->out_notify_watcher);
@ -568,6 +684,8 @@ gboolean angel_send_result(
error:
if (data) g_string_free(data, TRUE);
if (buf) g_string_free(buf, TRUE);
if (error) g_string_free(error, TRUE);
if (fds) close_fd_array(fds);
return FALSE;
return FALSE;

@ -18,6 +18,9 @@ void log_init(server *srv) {
srv->log.levels[LOG_LEVEL_ERROR] = TRUE;
srv->log.levels[LOG_LEVEL_WARNING] = TRUE;
srv->log.levels[LOG_LEVEL_INFO] = TRUE; /* TODO: remove debug levels */
srv->log.levels[LOG_LEVEL_DEBUG] = TRUE;
srv->log.fd = -1;
srv->log.ts_cache = g_string_sized_new(0);
srv->log.log_line = g_string_sized_new(0);

@ -1,6 +1,16 @@
#include <lighttpd/angel_base.h>
#include <lighttpd/angel_config_parser.h>
#include <lighttpd/angel_plugin_core.h>
# ifndef HAVE_ISSETUGID
static int l_issetugid() {
return (geteuid() != getuid() || getegid() != getgid());
}
# define issetugid l_issetugid
# endif
int main(int argc, char *argv[]) {
GError *error = NULL;
@ -51,6 +61,15 @@ int main(int argc, char *argv[]) {
goto cleanup;
}
if (!(getuid() == 0) && issetugid()) {
g_printerr("Are you nuts ? Don't apply a SUID bit to this binary\n");
result = -1;
goto cleanup;
}
/* initialize threading */
g_thread_init(NULL);
srv = server_new(module_dir);
if (!plugins_config_load(srv, config_path)) {
@ -58,7 +77,11 @@ int main(int argc, char *argv[]) {
goto cleanup;
}
g_printerr("lighttpd-angel: Parsed config file\n");
INFO(srv, "%s", "parsed config file");
ev_loop(srv->loop, 0);
INFO(srv, "%s", "going down");
cleanup:
if (srv) server_free(srv);

@ -38,12 +38,14 @@ static server_item* server_item_new(plugin *p, const plugin_item *p_item) {
static void plugin_free(server *srv, plugin *p) {
if (p->handle_free) p->handle_free(srv, p);
g_hash_table_destroy(p->angel_callbacks);
g_slice_free(plugin, p);
}
static plugin* plugin_new(const char *name) {
plugin *p = g_slice_new0(plugin);
p->name = name;
p->angel_callbacks = g_hash_table_new(g_str_hash, g_str_equal);
return p;
}
@ -89,6 +91,9 @@ void plugins_init(server *srv, const gchar *module_dir) {
ps->module_refs = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, _server_module_release);
ps->load_module_refs = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, _server_module_release);
ps->ht_plugins = g_hash_table_new(g_str_hash, g_str_equal);
ps->load_ht_plugins = g_hash_table_new(g_str_hash, g_str_equal);
ps->plugins = g_ptr_array_new();
ps->load_plugins = g_ptr_array_new();
}
@ -104,6 +109,9 @@ void plugins_clear(server *srv) {
g_hash_table_destroy(ps->module_refs);
g_hash_table_destroy(ps->load_module_refs);
g_hash_table_remove_all(ps->ht_plugins);
g_hash_table_remove_all(ps->load_ht_plugins);
g_ptr_array_free(ps->plugins, TRUE);
g_ptr_array_free(ps->load_plugins, TRUE);
@ -123,6 +131,7 @@ void plugins_config_clean(server *srv) {
g_hash_table_remove_all(ps->load_items);
g_hash_table_remove_all(ps->load_module_refs);
g_hash_table_remove_all(ps->load_ht_plugins);
g_ptr_array_set_size(ps->load_plugins, 0);
}
@ -131,32 +140,44 @@ gboolean plugins_config_load(server *srv, const gchar *filename) {
GError *error = NULL;
guint i;
if (!plugins_load_module(srv, NULL)) {
ERROR(srv, "%s", "failed loading core plugins");
plugins_config_clean(srv);
return FALSE;
}
if (!angel_config_parse_file(srv, filename, &error)) {
ERROR(srv, "failed to parse config file: %s\n", error->message);
ERROR(srv, "failed to parse config file: %s", error->message);
g_error_free(error);
plugins_config_clean(srv);
return FALSE;
}
/* check new config */
for (i = ps->plugins->len; i-- > 0; ) {
for (i = ps->load_plugins->len; i-- > 0; ) {
plugin *p = g_ptr_array_index(ps->load_plugins, i);
if (p->handle_check_config) {
if (!p->handle_check_config(srv, p)) {
ERROR(srv, "%s", "config check failed");
plugins_config_clean(srv);
return FALSE;
}
}
}
ERROR(srv, "%s", "activate");
/* activate new config */
for (i = ps->plugins->len; i-- > 0; ) {
for (i = ps->load_plugins->len; i-- > 0; ) {
plugin *p = g_ptr_array_index(ps->load_plugins, i);
ERROR(srv, "activate: %s", p->name);
if (p->handle_activate_config) {
p->handle_activate_config(srv, p);
}
}
ERROR(srv, "%s", "done");
{ /* swap the arrays */
GPtrArray *tmp = ps->load_plugins; ps->load_plugins = ps->plugins; ps->plugins = tmp;
}
@ -164,9 +185,11 @@ gboolean plugins_config_load(server *srv, const gchar *filename) {
GHashTable *tmp;
tmp = ps->load_items; ps->load_items = ps->items; ps->items = tmp;
tmp = ps->load_module_refs; ps->load_module_refs = ps->module_refs; ps->module_refs = tmp;
tmp = ps->load_ht_plugins; ps->load_ht_plugins = ps->ht_plugins; ps->ht_plugins = tmp;
}
g_hash_table_remove_all(ps->load_items);
g_hash_table_remove_all(ps->load_module_refs);
g_hash_table_remove_all(ps->load_ht_plugins);
g_ptr_array_set_size(ps->load_plugins, 0);
if (!ps->config_filename) {
@ -178,7 +201,7 @@ gboolean plugins_config_load(server *srv, const gchar *filename) {
return TRUE;
}
gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) {
void plugins_handle_item(server *srv, GString *itemname, value *hash) {
Plugins *ps = &srv->plugins;
server_item *si;
@ -186,7 +209,7 @@ gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) {
/* debug items */
{
GString *tmp = value_to_string(hash);
ERROR(srv, "Item '%s': %s\n", itemname->str, tmp->str);
ERROR(srv, "Item '%s': %s", itemname->str, tmp->str);
g_string_free(tmp, TRUE);
}
#endif
@ -239,7 +262,6 @@ gboolean plugins_handle_item(server *srv, GString *itemname, value *hash) {
g_slice_free1(sizeof(value*) * si->option_count, optlist);
}
return TRUE;
}
static gboolean plugins_activate_module(server *srv, server_module *sm) {
@ -266,6 +288,8 @@ static gboolean plugins_activate_module(server *srv, server_module *sm) {
}
}
g_hash_table_insert(ps->load_ht_plugins, (gpointer) p->name, p);
return TRUE;
item_collission:
@ -341,6 +365,7 @@ plugin *angel_plugin_register(server *srv, module *mod, const gchar *name, Plugi
p = plugin_new(name);
if (!init(srv, p)) {
ERROR(srv, "Couldn't load plugin '%s' for module '%s': init failed", name, mod->name->str);
plugin_free(srv, p);
return NULL;
}

@ -1,8 +1,302 @@
#include <lighttpd/angel_plugin_core.h>
#include <lighttpd/ip_parsers.h>
gboolean plugin_core_init(server *srv) {
/* load core plugins */
#include <pwd.h>
#include <grp.h>
static void core_instance_parse(server *srv, plugin *p, value **options) {
GPtrArray *cmd;
gchar **cmdarr;
plugin_core_config_t *config = (plugin_core_config_t*) p->data;
uid_t uid = -1;
gid_t gid = -1;
GString *user = NULL;
if (config->load_instconf) {
ERROR(srv, "%s", "Already configure the instance");
config->load_failed = FALSE;
return;
}
/* set user and group */
if (options[0]) {
struct passwd *pwd;
user = options[0]->data.string;
if (NULL == (pwd = getpwnam(user->str))) {
ERROR(srv, "can't find username '%s'", user->str);
config->load_failed = FALSE;
return;
}
uid = pwd->pw_uid;
gid = pwd->pw_gid;
}
if (options[1]) {
struct group *grp;
GString *group = options[1]->data.string;
if (NULL == (grp = getgrnam(group->str))) {
ERROR(srv, "can't find groupname '%s'", group->str);
config->load_failed = FALSE;
return;
}
gid = grp->gr_gid;
}
if (0 == uid) {
ERROR(srv, "%s", "I will not set uid to 0");
config->load_failed = FALSE;
return;
}
if (0 == gid) {
ERROR(srv, "%s", "I will not set gid to 0");
config->load_failed = FALSE;
return;
}
cmd = g_ptr_array_new();
#if 0
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/bin/valgrind")));
#endif
if (options[2]) {
g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[2]->data.string)));
} else {
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/bin/lighttpd")));
}
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("--angel")));
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-c")));
if (options[3]) {
g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[3]->data.string)));
} else if (options[4]) {
g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[4]->data.string)));
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-l")));
} else {
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/etc/lighttpd2/lighttpd.conf")));
}
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("-m")));
if (options[5]) {
g_ptr_array_add(cmd, g_strndup(GSTR_LEN(options[5]->data.string)));
} else {
g_ptr_array_add(cmd, g_strndup(CONST_STR_LEN("/usr/lib/lighttpd2/")));
}
g_ptr_array_add(cmd, NULL);
cmdarr = (gchar**) g_ptr_array_free(cmd, FALSE);
config->load_instconf = instance_conf_new(cmdarr, user, uid, gid);
}
static const plugin_item_option core_instance_options[] = {
{ "user", VALUE_STRING, 0 },
{ "group", VALUE_STRING, 0 },
{ "binary", VALUE_STRING, 0 },
{ "config", VALUE_STRING, 0 },
{ "luaconfig", VALUE_STRING, 0 },
{ "modules", VALUE_STRING, 0 },
{ NULL, 0, 0 }
};
static const plugin_item core_items[] = {
{ "instance", core_instance_parse, core_instance_options },
{ NULL, NULL, NULL }
};
static int do_listen(server *srv, GString *str) {
guint32 ipv4;
#ifdef HAVE_IPV6
guint8 ipv6[16];
#endif
guint16 port = 80;
if (parse_ipv4(str->str, &ipv4, NULL, &port)) {
int s, v;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipv4;
addr.sin_port = htons(port);
if (-1 == (s = socket(AF_INET, SOCK_STREAM, 0))) {
ERROR(srv, "Couldn't open socket: %s", g_strerror(errno));
return -1;
}
v = 1;
if (-1 == setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(v))) {
close(s);
ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno));
return -1;
}
if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) {
close(s);
ERROR(srv, "Couldn't bind socket to '%s': %s", str->str, g_strerror(errno));
return -1;
}
if (-1 == listen(s, 1000)) {
close(s);
ERROR(srv, "Couldn't listen on '%s': %s", str->str, g_strerror(errno));
return -1;
}
DEBUG(srv, "listen to ipv4: '%s' port: %d", str->str, port);
return s;
#ifdef HAVE_IPV6
} else if (parse_ipv6(str->str, ipv6, NULL, &port)) {
GString *ipv6_str = g_string_sized_new(0);
int s, v;
struct sockaddr_in6 addr;
ipv6_tostring(ipv6_str, ipv6);
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
memcpy(&addr.sin6_addr, ipv6, 16);
addr.sin6_port = htons(port);
if (-1 == (s = socket(AF_INET6, SOCK_STREAM, 0))) {
ERROR(srv, "Couldn't open socket: %s", g_strerror(errno));
g_string_free(ipv6_str, TRUE);
return -1;
}
v = 1;
if (-1 == setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(v))) {
close(s);
ERROR(srv, "Couldn't setsockopt(SO_REUSEADDR): %s", g_strerror(errno));
g_string_free(ipv6_str, TRUE);
return -1;
}
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v, sizeof(v))) {
close(s);
ERROR(srv, "Couldn't setsockopt(IPV6_V6ONLY): %s", g_strerror(errno));
g_string_free(ipv6_str, TRUE);
return -1;
}
if (-1 == bind(s, (struct sockaddr*)&addr, sizeof(addr))) {
close(s);
ERROR(srv, "Couldn't bind socket to '%s': %s", ipv6_str->str, g_strerror(errno));
g_string_free(ipv6_str, TRUE);
return -1;
}
if (-1 == listen(s, 1000)) {
close(s);
ERROR(srv, "Couldn't listen on '%s': %s", ipv6_str->str, g_strerror(errno));
g_string_free(ipv6_str, TRUE);
return -1;
}
DEBUG(srv, "listen to ipv6: '%s' port: %d", ipv6_str->str, port);
g_string_free(ipv6_str, TRUE);
return s;
#endif
} else {
ERROR(srv, "Invalid ip: '%s'", str->str);
return -1;
}
}
static void core_listen(server *srv, instance *i, plugin *p, gint32 id, GString *data) {
GError *err = NULL;
gint fd;
GArray *fds;
DEBUG(srv, "core_listen(%i) '%s'", id, data->str);
if (-1 == id) return; /* ignore simple calls */
fd = do_listen(srv, data);
if (-1 == fd) {
GString *error = g_string_sized_new(0);
g_string_printf(error, "Couldn't listen to '%s'", data->str);
if (!angel_send_result(i->acon, id, error, NULL, NULL, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
}
return;
}
fds = g_array_new(FALSE, FALSE, sizeof(int));
g_array_append_val(fds, fd);
if (!angel_send_result(i->acon, id, NULL, NULL, fds, &err)) {
ERROR(srv, "Couldn't send result: %s", err->message);
g_error_free(err);
return;