From 75505f73e1df5b9fef99b13cff8f05b4d9769c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Tue, 23 Jun 2009 23:40:13 +0200 Subject: [PATCH] Add more angel connection handling code (and helper functions) --- include/lighttpd/angel_base.h | 2 + include/lighttpd/angel_connection.h | 70 +++- include/lighttpd/angel_data.h | 1 + include/lighttpd/angel_server.h | 2 + include/lighttpd/base.h | 1 + include/lighttpd/idlist.h | 3 + include/lighttpd/utils.h | 9 + src/CMakeLists.txt | 6 +- src/angel_connection.c | 583 ++++++++++++++++++++++++++++ src/angel_data.c | 39 +- src/idlist.c | 9 + src/utils.c | 64 ++- 12 files changed, 756 insertions(+), 33 deletions(-) diff --git a/include/lighttpd/angel_base.h b/include/lighttpd/angel_base.h index f11ac86..d837624 100644 --- a/include/lighttpd/angel_base.h +++ b/include/lighttpd/angel_base.h @@ -25,4 +25,6 @@ typedef struct instance instance; #include #include +#include + #endif diff --git a/include/lighttpd/angel_connection.h b/include/lighttpd/angel_connection.h index 8dbf62f..85d8eb2 100644 --- a/include/lighttpd/angel_connection.h +++ b/include/lighttpd/angel_connection.h @@ -1,19 +1,56 @@ #ifndef _LIGHTTPD_ANGEL_CONNECTION_H_ #define _LIGHTTPD_ANGEL_CONNECTION_H_ +#include + +#define ANGEL_CALL_MAX_STR_LEN (64*1024) /* must fit into a gint32 */ + struct angel_connection; typedef struct angel_connection angel_connection; struct angel_call; typedef struct angel_call angel_call; -typedef void (*AngelCallback)(angel_call *acall, gboolean timeout, GString *error, GString *data, GArray *fds); +typedef void (*AngelCallback)(gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds); + +typedef void (*AngelReceiveCall)(angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gint32 id, + GString *data); + +typedef void (*AngelReceiveResult)(angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gint32 id, + GString *error, GString *data, GArray *fds); +/* gets called after read/write errors */ +typedef void (*AngelCloseCallback)(angel_connection *acon, GError *err); struct angel_connection { + gpointer data; GStaticMutex mutex; /* angel itself has no threads */ struct ev_loop *loop; int fd; + idlist *call_id_list; + GPtrArray *call_table; + ev_io fd_watcher; + ev_async out_notify_watcher; + GQueue *out; + angel_buffer in; + + AngelReceiveCall recv_call; + AngelReceiveResult recv_result; + AngelCloseCallback close_cb; + + /* parse input */ + struct { + gboolean have_header; + gint32 type, id; + gint32 mod_len, action_len, error_len, data_len, missing_fds; + guint body_size; + GString *mod, *action, *error, *data; + GArray *fds; + } parse; }; /* with multi-threading you should protect the structure @@ -24,27 +61,39 @@ struct angel_call { AngelCallback callback; /* internal data */ gint32 id; /* id is -1 if there is no call pending (the callback may still be running) */ - guint timeout; + angel_connection *acon; ev_timer timeout_watcher; - ev_io fd_watcher; }; /* error handling */ #define ANGEL_CALL_ERROR angel_call_error_quark() LI_API GQuark angel_call_error_quark(); +#define ANGEL_CONNECTION_ERROR angel_connection_error_quark() +LI_API GQuark angel_connection_error_quark(); + typedef enum { - ANGEL_CALL_ALREADY_RUNNING /* the angel_call struct is already in use for a call */ + ANGEL_CALL_ALREADY_RUNNING, /* the angel_call struct is already in use for a call */ + ANGEL_CALL_OUT_OF_CALL_IDS, /* too many calls already pending */ + ANGEL_CALL_INVALID /* invalid params */ } AngelCallError; +typedef enum { + ANGEL_CONNECTION_CLOSED, /* error on socket */ + ANGEL_CONNECTION_INVALID_DATA /* invalid data from stream */ +} AngelConnectionError; + /* create connection */ -LI_API angel_connection* angel_connection_create(int fd); +LI_API angel_connection* angel_connection_create( + struct ev_loop *loop, int fd, gpointer data, + AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb); +LI_API angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout); +/* returns TRUE if a call was cancelled; make sure you don't call free while you're calling send_call */ +LI_API gboolean angel_call_free(angel_call *call); /* calls */ /* the GString* parameters get stolen by the angel call (moved to chunkqueue) */ -LI_API void angel_call_init(angel_call *call); - LI_API gboolean angel_send_simple_call( angel_connection *acon, const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, @@ -54,18 +103,19 @@ LI_API gboolean angel_send_simple_call( LI_API gboolean angel_send_call( angel_connection *acon, const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, - angel_call *call, guint timeout, + angel_call *call, GString *data, GError **err); LI_API gboolean angel_send_result( angel_connection *acon, const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, - angel_call *call, guint timeout, + gint32 id, GString *error, GString *data, GArray *fds, GError **err); -LI_API gboolean angel_cancel_call(angel_connection *acon, angel_call *call); +/* free temporary needed memroy; call this once in while after some activity */ +LI_API void angel_cleanup_tables(angel_connection *acon); /* Usage */ #if 0 diff --git a/include/lighttpd/angel_data.h b/include/lighttpd/angel_data.h index ceb912a..9f388e7 100644 --- a/include/lighttpd/angel_data.h +++ b/include/lighttpd/angel_data.h @@ -54,5 +54,6 @@ LI_API gboolean angel_data_read_int32(angel_buffer *buf, gint32 *val, GError **e LI_API gboolean angel_data_read_int64(angel_buffer *buf, gint64 *val, GError **err); LI_API gboolean angel_data_read_char (angel_buffer *buf, gchar *val, GError **err); LI_API gboolean angel_data_read_str (angel_buffer *buf, GString **val, GError **err); +LI_API gboolean angel_data_read_mem (angel_buffer *buf, GString **val, gsize len, GError **err); #endif diff --git a/include/lighttpd/angel_server.h b/include/lighttpd/angel_server.h index 284924f..bb2255a 100644 --- a/include/lighttpd/angel_server.h +++ b/include/lighttpd/angel_server.h @@ -11,6 +11,8 @@ struct instance { pid_t pid; + + angel_connection *con; }; struct server { diff --git a/include/lighttpd/base.h b/include/lighttpd/base.h index 89a6e3c..6fdaefe 100644 --- a/include/lighttpd/base.h +++ b/include/lighttpd/base.h @@ -61,6 +61,7 @@ #include #include #include +#include #include #include diff --git a/include/lighttpd/idlist.h b/include/lighttpd/idlist.h index f66754a..ee25ee6 100644 --- a/include/lighttpd/idlist.h +++ b/include/lighttpd/idlist.h @@ -32,6 +32,9 @@ LI_API void idlist_free(idlist *l); /* request new id; return -1 if no id is available, valid ids are always > 0 */ LI_API gint idlist_get(idlist *l); +/* check whether an id is in use and can be "_put" */ +LI_API gboolean idlist_is_used(idlist *l, gint id); + /* release id. never release an id more than once! */ LI_API void idlist_put(idlist *l, gint id); diff --git a/include/lighttpd/utils.h b/include/lighttpd/utils.h index 6966c2e..8f2ea84 100644 --- a/include/lighttpd/utils.h +++ b/include/lighttpd/utils.h @@ -15,6 +15,15 @@ LI_API void fatal(const gchar* msg); /* set O_NONBLOCK and FD_CLOEXEC */ LI_API void fd_init(int fd); +LI_API void fd_no_block(int fd); +LI_API void fd_block(int fd); + +#ifndef _WIN32 +/* return -2 for EAGAIN, -1 for some other error, 0 for success */ +LI_API int send_fd(int s, int fd); /* write fd to unix socket s */ +LI_API int receive_fd(int s, int *fd); /* read fd from unix socket s */ +#endif + LI_API void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events); LI_API void ev_io_rem_events(struct ev_loop *loop, ev_io *watcher, int events); LI_API void ev_io_set_events(struct ev_loop *loop, ev_io *watcher, int events); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4ec9080..2873087 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -263,8 +263,9 @@ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR} ${CM SET(COMMON_SRC angel.c - angel_fake.c + angel_connection.c angel_data.c + angel_fake.c actions.c base.c chunk.c @@ -280,6 +281,7 @@ SET(COMMON_SRC http_headers.c http_request_parser.c http_response_parser.c + idlist.c ip_parsers.c lighttpd-glue.c log.c @@ -360,6 +362,7 @@ ADD_TARGET_PROPERTIES(lighttpd COMPILE_FLAGS ${COMMON_CFLAGS}) ADD_EXECUTABLE(lighttpd-angel angel_config_parser.c + angel_connection.c angel_data.c angel_log.c angel_main.c @@ -367,6 +370,7 @@ ADD_EXECUTABLE(lighttpd-angel angel_plugin_core.c angel_server.c angel_value.c + idlist.c ip_parsers.c module.c utils.c diff --git a/src/angel_connection.c b/src/angel_connection.c index e69de29..6029bbd 100644 --- a/src/angel_connection.c +++ b/src/angel_connection.c @@ -0,0 +1,583 @@ + +#include +#include +#include + +#define ANGEL_MAGIC ((gint32) 0x8a930a9f) + +typedef enum { + ANGEL_CALL_SEND_SIMPLE = 1, + ANGEL_CALL_SEND_CALL = 2, + ANGEL_CALL_SEND_RESULT = 3 +} angel_call_send_t; + +typedef struct { + enum { ANGEL_CONNECTION_ITEM_GSTRING, ANGEL_CONNECTION_ITEM_FDS } type; + union { + struct { + GString *buf; + guint pos; + } string; + struct { + GArray *fds; + guint pos; + } fds; + } value; +} angel_connection_send_item_t; + +static void send_queue_push_string(GQueue *queue, GString *buf) { + angel_connection_send_item_t *i; + if (!buf || !buf->len) return; + + i = g_slice_new0(angel_connection_send_item_t); + i->type = ANGEL_CONNECTION_ITEM_GSTRING; + i->value.string.buf = buf; + i->value.string.pos = 0; + + g_queue_push_tail(queue, i); +} + +static void send_queue_push_fds(GQueue *queue, GArray *fds) { + angel_connection_send_item_t *i; + if (!fds || !fds->len) return; + + i = g_slice_new0(angel_connection_send_item_t); + i->type = ANGEL_CONNECTION_ITEM_FDS; + i->value.fds.fds = fds; + i->value.fds.pos = 0; + + g_queue_push_tail(queue, i); +} + +static void send_queue_item_free(angel_connection_send_item_t *i) { + if (!i) return; + switch (i->type) { + case ANGEL_CONNECTION_ITEM_GSTRING: + g_string_free(i->value.string.buf, TRUE); + break; + case ANGEL_CONNECTION_ITEM_FDS: + g_array_free(i->value.fds.fds, TRUE); + break; + } + g_slice_free(angel_connection_send_item_t, i); +} + +static void send_queue_clean(GQueue *queue) { + angel_connection_send_item_t *i; + while (NULL != (i = g_queue_peek_head(queue))) { + switch (i->type) { + case ANGEL_CONNECTION_ITEM_GSTRING: + if (i->value.string.pos < i->value.string.buf->len) return; + g_string_free(i->value.string.buf, TRUE); + break; + case ANGEL_CONNECTION_ITEM_FDS: + if (i->value.fds.pos < i->value.fds.fds->len) return; + g_array_free(i->value.fds.fds, TRUE); + break; + } + g_queue_peek_head(queue); + g_slice_free(angel_connection_send_item_t, i); + } +} + +GQuark angel_call_error_quark() { + return g_quark_from_static_string("angel-call-error-quark"); +} +GQuark angel_connection_error_quark() { + return g_quark_from_static_string("angel-connection-error-quark"); +} + +static gboolean angel_fill_buffer(angel_connection *acon, guint need, GError **err) { + gsize old_len; + ssize_t want, r; + if (acon->in.pos > 0) { + g_string_erase(acon->in.data, 0, acon->in.pos); + acon->in.pos = 0; + } + if (acon->in.data->len >= need) return TRUE; + + want = need - acon->in.data->len; /* always > 0 */ + old_len = acon->in.data->len; + g_string_set_size(acon->in.data, need); + for ( ; want > 0; ) { + r = read(acon->fd, acon->in.data + old_len, want); + if (r < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + g_string_set_size(acon->in.data, old_len); + return TRUE; + default: + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, + "read error: %s", g_strerror(errno)); + g_string_set_size(acon->in.data, old_len); + return FALSE; + } + } else if (r == 0) { /* eof */ + errno = ECONNRESET; + g_string_set_size(acon->in.data, old_len); + return FALSE; + } else { + want -= r; + old_len += r; + } + } + + g_string_set_size(acon->in.data, old_len); + return TRUE; +} + +static gboolean angel_dispatch(angel_connection *acon, GError **err) { + gint32 id = acon->parse.id, type = acon->parse.type; + angel_call *call = NULL; + + if (type != ANGEL_CALL_SEND_SIMPLE) { + g_static_mutex_lock(&acon->mutex); + if (!idlist_is_used(acon->call_id_list, id)) { + g_static_mutex_unlock(&acon->mutex); + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid id: %i", (gint) id); + return FALSE; + } + idlist_put(acon->call_id_list, id); + if (type == ANGEL_CALL_SEND_RESULT && (guint) id < acon->call_table->len) { + call = (angel_call*) g_ptr_array_index(acon->call_table, id); + g_ptr_array_index(acon->call_table, id) = NULL; + if (call) { + ev_timer_stop(acon->loop, &call->timeout_watcher); + if (!call->callback) { + g_slice_free(angel_call, call); + call = NULL; + } + } + } + + g_static_mutex_unlock(&acon->mutex); + } else if (-1 != id) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid id: %i, should be -1 for simple call", (gint) id); + return FALSE; + } + + switch (type) { + case ANGEL_CALL_SEND_SIMPLE: + if (acon->parse.error->len > 0 || acon->parse.fds->len > 0) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Wrong data in call"); + return FALSE; + } + acon->recv_call(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), + id, acon->parse.data); + break; + case ANGEL_CALL_SEND_RESULT: + if (call) { + acon->recv_result(acon, GSTR_LEN(acon->parse.mod), GSTR_LEN(acon->parse.action), + id, acon->parse.error, acon->parse.data, acon->parse.fds); + } + break; + default: + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid type: %i", (gint) type); + return FALSE; + } + + return TRUE; +} + +static gboolean angel_connection_read(angel_connection *acon, GError **err) { + for ( ;; ) { + if (!acon->parse.have_header) { + gint32 magic; + if (!angel_fill_buffer(acon, 8*4, err)) return FALSE; + if (acon->in.data->len - acon->in.pos < 8*4) return TRUE; /* need more data */ + + if (!angel_data_read_int32(&acon->in, &magic, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.type, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.id, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.mod_len, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.action_len, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.error_len, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.data_len, err)) return FALSE; + if (!angel_data_read_int32(&acon->in, &acon->parse.missing_fds, err)) return FALSE; + + if (ANGEL_MAGIC != magic) { + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_INVALID_DATA, + "Invalid magic: 0x%x (should be 0x%x)", (gint) magic, (gint) ANGEL_MAGIC); + return FALSE; + } + + acon->parse.body_size = acon->parse.mod_len + acon->parse.action_len + acon->parse.error_len + acon->parse.data_len; + acon->parse.have_header = TRUE; + } + + if (!angel_fill_buffer(acon, acon->parse.body_size, err)) return FALSE; + if (acon->in.data->len - acon->in.pos < acon->parse.body_size) return TRUE; /* need more data */ + while (acon->parse.missing_fds > 0) { + int fd = -1; + switch (receive_fd(acon->fd, &fd)) { + case 0: + g_array_append_val(acon->parse.fds, fd); + acon->parse.missing_fds--; + break; + case -1: + g_set_error(err, ANGEL_CONNECTION_ERROR, ANGEL_CONNECTION_CLOSED, + "receive fd error: %s", g_strerror(errno)); + return FALSE; + case -2: + return TRUE; /* need more data */ + } + } + + if (!angel_data_read_mem(&acon->in, &acon->parse.mod, acon->parse.mod_len, err)) return FALSE; + if (!angel_data_read_mem(&acon->in, &acon->parse.action, acon->parse.action_len, err)) return FALSE; + if (!angel_data_read_mem(&acon->in, &acon->parse.error, acon->parse.error_len, err)) return FALSE; + if (!angel_data_read_mem(&acon->in, &acon->parse.data, acon->parse.data_len, err)) return FALSE; + + if (!angel_dispatch(acon, err)) return FALSE; + + g_string_truncate(acon->parse.error, 0); + g_string_truncate(acon->parse.data, 0); + g_array_set_size(acon->parse.fds, 0); + } +} + +static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents) { + angel_connection *acon = (angel_connection*) w->data; + + if (revents | EV_WRITE) { + GString *out_str; + int i; + ssize_t written, len; + gchar *data; + gboolean out_queue_empty; + angel_connection_send_item_t *send_item; + + g_static_mutex_lock(&acon->mutex); + send_item = g_queue_peek_head(acon->out); + g_static_mutex_unlock(&acon->mutex); + + for (i = 0; send_item && (i < 10); i++) { /* don't send more than 10 chunks */ + switch (send_item->type) { + case ANGEL_CONNECTION_ITEM_GSTRING: + out_str = send_item->value.string.buf; + written = send_item->value.string.pos; + data = out_str->str + written; + len = out_str->len - written; + while (len > 0) { + written = write(w->fd, data, len); + if (written < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + goto write_eagain; + default: /* Fatal error, connection has to be closed */ + ev_async_stop(loop, &acon->out_notify_watcher); + ev_io_stop(loop, &acon->fd_watcher); + acon->close_cb(acon, NULL); /* TODO: set err */ + return; + } + } else { + data += written; + len -= written; + send_item->value.string.pos += written; + } + } + break; + + case ANGEL_CONNECTION_ITEM_FDS: + while (send_item->value.fds.pos < send_item->value.fds.fds->len) { + switch (send_fd(w->fd, g_array_index(send_item->value.fds.fds, int, send_item->value.fds.pos))) { + case 0: continue; + case -1: /* Fatal error, connection has to be closed */ + ev_async_stop(loop, &acon->out_notify_watcher); + ev_io_stop(loop, &acon->fd_watcher); + acon->close_cb(acon, NULL); /* TODO: set err */ + return; + case -2: goto write_eagain; + } + send_item->value.fds.pos++; + } + break; + } + + send_queue_item_free(send_item); + + g_static_mutex_lock(&acon->mutex); + g_queue_pop_head(acon->out); + send_item = g_queue_peek_head(acon->out); + g_static_mutex_unlock(&acon->mutex); + } + +write_eagain: + g_static_mutex_lock(&acon->mutex); + send_queue_clean(acon->out); + out_queue_empty = (0 == acon->out->length); + g_static_mutex_unlock(&acon->mutex); + + if (out_queue_empty) ev_io_rem_events(loop, w, EV_WRITE); + } + + if (revents | EV_READ) { + GError *err = NULL; + if (!angel_connection_read(acon, &err)) { + ev_async_stop(loop, &acon->out_notify_watcher); + ev_io_stop(loop, &acon->fd_watcher); + acon->close_cb(acon, err); + } + } +} + +static void angel_connection_out_notify_cb(struct ev_loop *loop, ev_async *w, int revents) { + angel_connection *acon = (angel_connection*) w->data; + UNUSED(revents); + ev_io_add_events(loop, &acon->fd_watcher, EV_WRITE); +} + +/* create connection */ +angel_connection* angel_connection_create(struct ev_loop *loop, int fd, gpointer data, + AngelReceiveCall recv_call, AngelReceiveResult recv_result, AngelCloseCallback close_cb) { + angel_connection *acon = g_slice_new0(angel_connection); + + acon->data = data; + g_static_mutex_init(&acon->mutex); + acon->loop = loop; + acon->fd = fd; + acon->call_id_list = idlist_new(65535); + ev_io_init(&acon->fd_watcher, angel_connection_io_cb, fd, EV_READ); + acon->fd_watcher.data = acon; + ev_async_init(&acon->out_notify_watcher, angel_connection_out_notify_cb); + acon->out_notify_watcher.data = acon; + acon->out = g_queue_new(); + acon->in.data = g_string_sized_new(0); + acon->in.pos = 0; + + acon->recv_call = recv_call; + acon->recv_result = recv_result; + acon->close_cb = close_cb; + + return acon; +} + +static void angel_call_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + angel_call* call = (angel_call*) w->data; + angel_connection *acon = call->acon; + AngelCallback cb = NULL; + gpointer ctx; + UNUSED(loop); UNUSED(revents); + + g_static_mutex_lock(&acon->mutex); + g_ptr_array_index(acon->call_table, call->id) = NULL; + if (NULL == (cb = call->callback)) { + g_slice_free(angel_call, call); + } + ctx = call->context; + g_static_mutex_unlock(&acon->mutex); + + if (cb) cb(ctx, TRUE, NULL, NULL, NULL); +} + +angel_call *angel_call_create(AngelCallback callback, ev_tstamp timeout) { + angel_call* call = g_slice_new0(angel_call); + + g_assert(NULL != callback); + call->callback = callback; + ev_timer_init(&call->timeout_watcher, angel_call_timeout_cb, 0, timeout); + call->timeout_watcher.data = call; + call->id = -1; + + return call; +} + +/* returns TRUE if a call was cancelled */ +gboolean angel_call_free(angel_call *call) { + gboolean r = FALSE; + + if (call->acon) { + angel_connection *acon = call->acon; + g_static_mutex_lock(&acon->mutex); + if (-1 != call->id) { + r = TRUE; + call->callback = NULL; + } else { + g_slice_free(angel_call, call); + } + g_static_mutex_unlock(&acon->mutex); + } else { + g_slice_free(angel_call, call); + } + + return r; +} + +static gboolean prepare_call_header(GString **pbuf, + gint32 type, gint32 id, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gsize error_len, gsize data_len, gsize fd_count, GError **err) { + GString *buf; + buf = g_string_sized_new(8*4 + mod_len + action_len); + *pbuf = buf; + + if (!angel_data_write_int32(buf, ANGEL_MAGIC, err)) return FALSE; + if (!angel_data_write_int32(buf, type, err)) return FALSE; + if (!angel_data_write_int32(buf, id, err)) return FALSE; + if (!angel_data_write_int32(buf, mod_len, err)) return FALSE; + if (!angel_data_write_int32(buf, action_len, err)) return FALSE; + if (!angel_data_write_int32(buf, error_len, err)) return FALSE; + if (!angel_data_write_int32(buf, data_len, err)) return FALSE; + if (!angel_data_write_int32(buf, fd_count, err)) return FALSE; + + g_string_append_len(buf, mod, mod_len); + g_string_append_len(buf, action, action_len); + + return TRUE; +} + +gboolean angel_send_simple_call( + angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + GString *data, + GError **err) { + GString *buf = NULL; + gboolean queue_was_empty; + + if (err && *err) goto error; + + if (data->len > ANGEL_CALL_MAX_STR_LEN) { + g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); + goto error; + } + + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_SIMPLE, -1, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error; + + g_static_mutex_lock(&acon->mutex); + queue_was_empty = (0 == acon->out->length); + send_queue_push_string(acon->out, buf); + send_queue_push_string(acon->out, data); + g_static_mutex_unlock(&acon->mutex); + + if (queue_was_empty) + ev_async_send(acon->loop, &acon->out_notify_watcher); + + return TRUE; + +error: + if (data) g_string_free(data, TRUE); + if (buf) g_string_free(buf, TRUE); + return FALSE; +} + +gboolean angel_send_call( + angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + angel_call *call, + GString *data, + GError **err) { + GString *buf = NULL; + gboolean queue_was_empty; + + if (err && *err) goto error; + + g_static_mutex_lock(&acon->mutex); + if (-1 != call->id) { + g_static_mutex_unlock(&acon->mutex); + g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_ALREADY_RUNNING, "call already running"); + goto error_before_new_id; + } + + if (-1 == (call->id = idlist_get(acon->call_id_list))) { + g_static_mutex_unlock(&acon->mutex); + g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_OUT_OF_CALL_IDS, "out of call ids"); + goto error; + } + + if ((guint) call->id >= acon->call_table->len) { + g_ptr_array_set_size(acon->call_table, call->id + 1); + } + g_ptr_array_index(acon->call_table, call->id) = call; + g_static_mutex_unlock(&acon->mutex); + + + if (data->len > ANGEL_CALL_MAX_STR_LEN) { + g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); + goto error; + } + + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_CALL, call->id, mod, mod_len, action, action_len, 0, data->len, 0, err)) goto error; + + g_static_mutex_lock(&acon->mutex); + queue_was_empty = (0 == acon->out->length); + send_queue_push_string(acon->out, buf); + send_queue_push_string(acon->out, data); + g_static_mutex_unlock(&acon->mutex); + + if (queue_was_empty) + ev_async_send(acon->loop, &acon->out_notify_watcher); + + return TRUE; + +error: + if (-1 != call->id) { + idlist_put(acon->call_id_list, call->id); + call->id = -1; + } +error_before_new_id: + if (data) g_string_free(data, TRUE); + if (buf) g_string_free(buf, TRUE); + return FALSE; +} + +gboolean angel_send_result( + angel_connection *acon, + const gchar *mod, gsize mod_len, const gchar *action, gsize action_len, + gint32 id, + GString *error, GString *data, GArray *fds, + GError **err) { + GString *buf = NULL; + gboolean queue_was_empty; + + if (err && *err) goto error; + + if (data->len > ANGEL_CALL_MAX_STR_LEN) { + g_set_error(err, ANGEL_CALL_ERROR, ANGEL_CALL_INVALID, "data too lang for angel call: %" G_GSIZE_FORMAT " > %i", data->len, ANGEL_CALL_MAX_STR_LEN); + goto error; + } + + if (!prepare_call_header(&buf, ANGEL_CALL_SEND_RESULT, id, mod, mod_len, action, action_len, error->len, data->len, fds->len, err)) goto error; + + g_static_mutex_lock(&acon->mutex); + queue_was_empty = (0 == acon->out->length); + send_queue_push_string(acon->out, buf); + send_queue_push_string(acon->out, error); + send_queue_push_string(acon->out, data); + send_queue_push_fds(acon->out, fds); + g_static_mutex_unlock(&acon->mutex); + + if (queue_was_empty) + ev_async_send(acon->loop, &acon->out_notify_watcher); + + return TRUE; + +error: + if (data) g_string_free(data, TRUE); + if (buf) g_string_free(buf, TRUE); + return FALSE; + + return FALSE; +} + +/* free temporary needed memroy; call this once in while after some activity */ +void angel_cleanup_tables(angel_connection *acon) { + UNUSED(acon); + /* TODO + guint max_used_id = idlist_cleanup(acon->call_id_list); + g_ptr_array_set_size(acon->call_id_list, max_used_id); + */ +} diff --git a/src/angel_data.c b/src/angel_data.c index 6b10815..7db157a 100644 --- a/src/angel_data.c +++ b/src/angel_data.c @@ -1,5 +1,5 @@ -#include +#include #include /* error handling */ @@ -92,35 +92,44 @@ gboolean angel_data_read_char (angel_buffer *buf, gchar *val, GError **err) { return TRUE; } +gboolean angel_data_read_mem (angel_buffer *buf, GString **val, gsize len, GError **err) { + GString *s; + g_return_val_if_fail(err == NULL || *err == NULL, FALSE); + + if (buf->data->len - buf->pos < len) { + return error_eof(err, "string-data"); + } + s = *val; + if (!s) { + *val = s = g_string_sized_new(len); + } else { + g_string_truncate(s, 0); + } + g_string_append_len(s, buf->data->str + buf->pos, len); + buf->pos += len; + return TRUE; +} + gboolean angel_data_read_str (angel_buffer *buf, GString **val, GError **err) { gint32 ilen; - gsize len; - GString *s; g_return_val_if_fail(err == NULL || *err == NULL, FALSE); if (buf->data->len - buf->pos < sizeof(gint32)) { return error_eof(err, "string-length"); } - memcpy(&ilen, buf->data->str + buf->pos, sizeof(len)); + memcpy(&ilen, buf->data->str + buf->pos, sizeof(ilen)); buf->pos += sizeof(gint32); if (ilen < 0 || ilen > ANGEL_DATA_MAX_STR_LEN) { + buf->pos -= sizeof(gint32); g_set_error(err, ANGEL_DATA_ERROR, ANGEL_DATA_ERROR_INVALID_STRING_LENGTH, "String length in buffer invalid: %i", (gint) ilen); return FALSE; } - len = (gsize) ilen; - if (buf->data->len - buf->pos < len) { - return error_eof(err, "string-data"); - } - s = *val; - if (!s) { - *val = s = g_string_sized_new(len); - } else { - g_string_truncate(s, 0); + if (!angel_data_read_mem(buf, val, (gsize) ilen, err)) { + buf->pos -= sizeof(gint32); + return FALSE; } - g_string_append_len(s, buf->data->str + buf->pos, len); - buf->pos += len; return TRUE; } diff --git a/src/idlist.c b/src/idlist.c index e0d13a0..53b5cfb 100644 --- a/src/idlist.c +++ b/src/idlist.c @@ -97,6 +97,15 @@ gint idlist_get(idlist *l) { return newid; } +gboolean idlist_is_used(idlist *l, gint id) { + GArray *a = l->bitvector; + guint ndx = id / UL_BITS, bndx = id % UL_BITS; + gulong bmask = 1 << bndx; + if (id < 0 || ndx > a->len) return FALSE; + + return (0 != (g_array_index(a, gulong, ndx) & (bmask))); +} + void idlist_put(idlist *l, gint id) { clear_bit(l->bitvector, id); diff --git a/src/utils.c b/src/utils.c index b8a076b..a640b0d 100644 --- a/src/utils.c +++ b/src/utils.c @@ -6,26 +6,76 @@ #include #include +#include + void fatal(const gchar* msg) { fprintf(stderr, "%s\n", msg); abort(); } -void fd_init(int fd) { -#ifdef _WIN32 +void fd_no_block(int fd) { +#ifdef O_NONBLOCK + fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR); +#elif defined _WIN32 int i = 1; + ioctlsocket(fd, FIONBIO, &i); +#else +#error No way found to set non-blocking mode for fds. #endif -#ifdef FD_CLOEXEC - /* close fd on exec (cgi) */ - fcntl(fd, F_SETFD, FD_CLOEXEC); -#endif +} + +void fd_block(int fd) { #ifdef O_NONBLOCK - fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR); + fcntl(fd, F_SETFL, O_RDWR); #elif defined _WIN32 + int i = 0; ioctlsocket(fd, FIONBIO, &i); +#else +#error No way found to set blocking mode for fds. +#endif +} + +void fd_init(int fd) { +#ifdef FD_CLOEXEC + /* close fd on exec (cgi) */ + fcntl(fd, F_SETFD, FD_CLOEXEC); #endif + fd_no_block(fd); +} + +#ifndef _WIN32 +int send_fd(int s, int fd) { /* write fd to unix socket s */ + for ( ;; ) { + if (-1 == ioctl(s, I_SENDFD, fd)) { + switch (errno) { + case EINTR: break; + case EAGAIN: return -2; + default: return -1; + } + } else { + return 0; + } + } } +int receive_fd(int s, int *fd) { /* read fd from unix socket s */ + struct strrecvfd res; + memset(&res, 0, sizeof(res)); + for ( ;; ) { + if (-1 == ioctl(s, I_RECVFD, &res)) { + switch (errno) { + case EINTR: break; + case EAGAIN: return -2; + default: return -1; + } + } else { + *fd = res.fd; + } + } +} +#endif + + void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) { if ((watcher->events & events) == events) return; ev_io_stop(loop, watcher);