2
0
Fork 0

Add idlist, code basic message handling

master
Stefan Bühler 13 years ago
parent 87bb51239c
commit 7a88d7a288

@ -35,7 +35,7 @@ pkg_check_modules (GLIB2 REQUIRED glib-2.0)
SET(GLIB_INCLUDES ${GLIB2_INCLUDE_DIRS} ${GLIB2_INCLUDE_DIRS}/glib-2.0/ ${GLIB2_INCLUDE_DIRS}/glib-2.0/include/)
INCLUDE_DIRECTORIES(${GLIB_INCLUDES})
SET(MAIN_SOURCE libmanda.c)
SET(MAIN_SOURCE libmanda.c idlist.c)
SET(PACKAGE_NAME ${CMAKE_PROJECT_NAME})
SET(PACKAGE_VERSION ${PACKAGE_VERSION})

@ -0,0 +1,114 @@
#include "idlist.h"
#define UL_BITS (sizeof(gulong) * 8)
/* There are often no explicit bit shifts used in this code. This is on purpose, the
* code looks much cleaner without them, the correct constant for *, / and % is easier to calculate
* as constant (UL_BITS) and the compiler should know how to optimize the operations; as UL_BITS is hopefully
* of the form 2^n this should result in bit shifts in the executable code.
*/
manda_IDList* manda_idlist_new(gint max_ids) {
manda_IDList *l = g_slice_new0(manda_IDList);
g_assert(max_ids > 0);
l->bitvector = g_array_new(FALSE, TRUE, sizeof(gulong));
l->max_ids = max_ids;
l->next_free_id = -1;
l->used_ids = 0;
return l;
}
void manda_idlist_free(manda_IDList *l) {
if (!l) return;
g_array_free(l->bitvector, TRUE);
g_slice_free(manda_IDList, l);
}
static void mark_bit(GArray *a, gint id) {
guint ndx = id / UL_BITS, bndx = id % UL_BITS;
gulong bmask = 1 << bndx;
g_assert(id >= 0 && ndx < a->len);
g_assert(0 == (g_array_index(a, gulong, ndx) & (bmask))); /* bit musn't be set */
g_array_index(a, gulong, ndx) |= (bmask);
}
static void clear_bit(GArray *a, gint id) {
guint ndx = id / UL_BITS, bndx = id % UL_BITS;
gulong bmask = 1 << bndx;
g_assert(id >= 0 && ndx < a->len);
g_assert(0 != (g_array_index(a, gulong, ndx) & (bmask))); /* bit must be set */
g_array_index(a, gulong, ndx) &= ~(bmask);
}
static void idlist_reserve(GArray *a, guint id) {
guint ndx = id / UL_BITS;
if (ndx >= a->len) g_array_set_size(a, ndx+1);
}
gint manda_idlist_get(manda_IDList *l) {
guint fndx, ndx;
gint newid, bndx;
gulong u = -1;
GArray *a = l->bitvector;
if (l->used_ids >= l->max_ids) return -1;
if (l->next_free_id < 0) { /* all ids in use */
newid = l->used_ids++;
idlist_reserve(a, newid);
mark_bit(a, newid);
return newid;
}
/* search for an array entry which doesn't have all bits set (i.e. != (gulong) -1)
* start with the entry of next_free_id, all below are in use anyway
*/
fndx = l->next_free_id / UL_BITS;
for (ndx = fndx; ndx < a->len && ((gulong) -1 == (u = g_array_index(a, gulong, ndx))); ndx++) ;
if (ndx == a->len) { /* again: all ids are in use */
l->next_free_id = -1;
newid = l->used_ids++;
idlist_reserve(a, newid);
mark_bit(a, newid);
return newid;
}
/* array entry != -1, search for free bit */
if (fndx == ndx) bndx = (l->next_free_id / UL_BITS) - 1;
else bndx = -1;
bndx = g_bit_nth_lsf(~u, bndx);
/* no free bit found; should never happen as u != -1 and next_free_id should be correct, i.e. all bits <= the bit start index should be set */
g_assert(bndx != -1);
newid = ndx * UL_BITS + bndx;
if (newid == (gint) l->used_ids) {
l->next_free_id = -1;
} else {
l->next_free_id = newid+1;
}
l->used_ids++;
mark_bit(a, newid);
return newid;
}
gboolean manda_idlist_is_used(manda_IDList *l, gint id) {
GArray *a = l->bitvector;
guint ndx = id / UL_BITS, bndx = id % UL_BITS;
gulong bmask = 1 << bndx;
if (id < 0 || ndx >= a->len) return FALSE;
return (0 != (g_array_index(a, gulong, ndx) & (bmask)));
}
void manda_idlist_put(manda_IDList *l, gint id) {
clear_bit(l->bitvector, id);
l->used_ids--;
if ((l->next_free_id < 0 && (guint) id < l->used_ids) || id < l->next_free_id) l->next_free_id = id;
}

@ -0,0 +1,39 @@
#ifndef _LIBMANDA_IDLIST_H_
#define _LIBMANDA_IDLIST_H_
#include "libmanda.h"
struct manda_IDList {
/* used ids are marked with a "1" in the bitvector (represented as array of gulong) */
GArray *bitvector;
/* all ids are in the range [0, max_ids[, i.e. 0 <= id < max_ids
* although the type is guint, it has to fit in a gint too, as we
* use gint for the ids in the interface, so we can use -1 as a special value.
*/
guint max_ids;
/* if all ids in [0, used_ids-1] are used, next_free_id is -1
* if not, then all available ids are >= next_free_id,
* so we can start at next_free_id for searching the next free id
*/
gint next_free_id;
guint used_ids;
};
/* create new idlist; the parameter max_ids is "signed" on purpose */
manda_IDList* manda_idlist_new(gint max_ids);
/* free idlist */
void manda_idlist_free(manda_IDList *l);
/* request new id; return -1 if no id is available, valid ids are always > 0 */
gint manda_idlist_get(manda_IDList *l);
/* check whether an id is in use and can be "_put" */
gboolean manda_idlist_is_used(manda_IDList *l, gint id);
/* release id. never release an id more than once! */
void manda_idlist_put(manda_IDList *l, gint id);
#endif

@ -1,5 +1,6 @@
#include "libmanda.h"
#include "idlist.h"
#include <arpa/inet.h>
#include <errno.h>
@ -8,3 +9,727 @@
#include <unistd.h>
#include <string.h>
#include <assert.h>
#define TIMEOUT_STEP (3)
#define ENTER(x) do { ++((x)->refcount); } while(0)
#define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0)
#if __GNUC__
# define INLINE static inline
#else
# define INLINE static
#endif
typedef struct messageheader messageheader;
typedef struct request request;
typedef struct manda_connection manda_connection;
typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
typedef void (*con_close_cb)(manda_connection *con);
struct messageheader {
guint16 command, size, reqid, respid;
};
struct request {
guint16 command; /* zero means slot is free */
double timeout, timeout_step;
guint16 timeout_prev_id, timeout_next_id;
gpointer data;
};
struct manda_connection {
gpointer data; /* application data */
gpointer priv_data; /* internal data */
const manda_async_ctrl *ctrl;
con_message_cb message_cb;
con_close_cb close_cb;
int fd;
manda_fd_watcher fd_watcher;
manda_timeout req_timeout;
manda_IDList *request_ids;
GArray *requests;
guint16 timeout_first_id, timeout_last_id;
guint cur_header_pos; /* how many bytes of the header we have */
guint8 cur_header_buf[8];
messageheader cur_header;
guint cur_payload_pos;
GByteArray *cur_payload;
guint cur_send_pos;
GQueue send_queue;
gint refcount;
};
INLINE guint8 _read_net_uint8(const guint8* buf) {
return *buf;
}
INLINE guint16 _read_net_uint16(const guint8* buf) {
guint16 i;
memcpy(&i, buf, sizeof(i));
return ntohs(i);
}
INLINE guint32 _read_net_uint32(const guint8* buf) {
guint32 i;
memcpy(&i, buf, sizeof(i));
return ntohl(i);
}
INLINE void _write_net_uint8(guint8 *buf, guint8 val) {
*buf = val;
}
INLINE void _write_net_uint16(guint8 *buf, guint16 val) {
val = htons(val);
memcpy(buf, &val, sizeof(val));
}
INLINE void _write_net_uint32(guint8 *buf, guint32 val) {
val = htons(val);
memcpy(buf, &val, sizeof(val));
}
INLINE gboolean read_net_uint8(GByteArray *buf, guint *pos, guint8 *dest) {
if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
/* end of buffer */
*dest = 0;
*pos = buf->len;
return FALSE;
}
*dest = _read_net_uint8(buf->data + *pos);
*pos += sizeof(*dest);
return TRUE;
}
INLINE gboolean read_net_uint16(GByteArray *buf, guint *pos, guint16 *dest) {
if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
/* end of buffer */
*dest = 0;
*pos = buf->len;
return FALSE;
}
*dest = _read_net_uint16(buf->data + *pos);
*pos += sizeof(*dest);
return TRUE;
}
INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
/* end of buffer */
*dest = 0;
*pos = buf->len;
return FALSE;
}
*dest = _read_net_uint32(buf->data + *pos);
*pos += sizeof(*dest);
return TRUE;
}
INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
guint16 slen;
g_string_truncate(dest, 0);
if (!read_net_uint16(buf, pos, &slen)) return FALSE;
if (buf->len < slen || *pos >= buf->len - slen) {
*pos = buf->len;
return FALSE;
}
g_string_set_size(dest, slen);
memcpy(dest->str, buf->data + *pos, slen);
*pos += slen;
return TRUE;
}
INLINE void write_net_uint8(GByteArray *buf, guint8 val) {
guint curlen = buf->len;
g_byte_array_set_size(buf, curlen + sizeof(val));
_write_net_uint8(buf->data + curlen, val);
}
INLINE void write_net_uint16(GByteArray *buf, guint16 val) {
guint curlen = buf->len;
g_byte_array_set_size(buf, curlen + sizeof(val));
_write_net_uint16(buf->data + curlen, val);
}
INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
guint curlen = buf->len;
g_byte_array_set_size(buf, curlen + sizeof(val));
_write_net_uint32(buf->data + curlen, val);
}
INLINE void write_net_string(GByteArray *buf, GString *val) {
guint curlen = buf->len;
g_byte_array_set_size(buf, curlen + sizeof(guint16) + val->len);
_write_net_uint16(buf->data + curlen, val->len);
memcpy(buf->data + curlen + sizeof(guint16), val->str, val->len);
}
static void con_req_unlink(manda_connection *con, request *req) {
request *prev = NULL, *next = NULL;
/* unlink */
if (req->timeout_next_id != 0) {
next = &g_array_index(con->requests, request, req->timeout_next_id);
next->timeout_prev_id = req->timeout_prev_id;
} else {
con->timeout_last_id = req->timeout_prev_id;
}
if (req->timeout_prev_id != 0) {
prev = &g_array_index(con->requests, request, req->timeout_prev_id);
prev->timeout_next_id = req->timeout_next_id;
} else {
con->timeout_first_id = req->timeout_next_id;
}
req->timeout_next_id = req->timeout_prev_id = 0;
}
static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
request *prev;
if (con->timeout_last_id != 0) {
prev = &g_array_index(con->requests, request, con->timeout_last_id);
prev->timeout_next_id = reqid;
} else {
con->timeout_first_id = reqid;
}
req->timeout_prev_id = con->timeout_last_id;
con->timeout_last_id = reqid;
}
static void con_close(manda_connection *con) {
GByteArray *buf;
if (NULL != con->fd_watcher.priv) {
con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
con->fd_watcher.priv = NULL;
}
if (NULL != con->req_timeout.priv) {
con->ctrl->destroy_timeout(con->data, &con->req_timeout);
con->req_timeout.priv = NULL;
}
if (-1 != con->fd) {
while (-1 == close(con->fd) && errno == EINTR) ;
con->fd = -1;
}
while (NULL != (buf = g_queue_pop_head(&con->send_queue))) {
g_byte_array_free(buf, TRUE);
}
con->cur_send_pos = 0;
if (NULL != con->close_cb) {
con->close_cb(con);
}
/* "timeout" all requests */
for ( ; con->timeout_first_id > 0; ) {
request *req;
guint16 req_command;
gpointer req_data;
req = &g_array_index(con->requests, request, con->timeout_first_id);
req_command = req->command;
req_data = req->data;
con_req_unlink(con, req);
/* reset request */
req->command = 0;
req->timeout = 0;
req->data = NULL;
if (NULL != con->message_cb) {
con->message_cb(con, req_command, req_data, 0, 0, NULL);
}
}
if (NULL != con->request_ids) {
manda_idlist_free(con->request_ids);
con->request_ids = NULL;
}
if (NULL != con->requests) {
g_array_free(con->requests, TRUE);
con->requests = NULL;
}
if (NULL != con->cur_payload) {
g_byte_array_free(con->cur_payload, TRUE);
con->cur_payload = NULL;
}
}
static void _con_free(manda_connection *con) {
con_close(con);
g_slice_free(manda_connection, con);
}
static void con_free(manda_connection *con) {
LEAVE(con, _con_free);
}
static void con_handle_response(manda_connection *con) {
GByteArray *payload = con->cur_payload;
guint16 orig_command = 0, mesg_command = con->cur_header.command, resp_to = con->cur_header.respid, req_id = con->cur_header.reqid;
gpointer orig_data = NULL;
con->cur_header_pos = 0;
con->cur_payload_pos = 0;
con->cur_payload = NULL;
if (0 != resp_to) {
request *req;
if (!manda_idlist_is_used(con->request_ids, resp_to)) {
/* protocol error */
con_close(con);
goto clean;
}
manda_idlist_put(con->request_ids, resp_to);
/* try to find request data; if we can't find it the timeout already triggered */
if (resp_to >= con->requests->len) goto clean;
req = &g_array_index(con->requests, request, resp_to);
if (0 == req->command) goto clean; /* "empty" request */
orig_command = req->command;
orig_data = req->data;
/* reset request */
con_req_unlink(con, req);
req->command = 0;
req->timeout = req->timeout_step = 0;
req->data = NULL;
}
if (NULL != con->message_cb) {
con->message_cb(con, orig_command, orig_data, mesg_command, req_id, payload);
}
clean:
if (NULL != payload) g_byte_array_free(payload, TRUE);
}
static void con_fd_watcher_update(manda_connection *con) {
if (con->fd != -1) {
int events = (con->send_queue.length > 0) ? MANDA_FD_READ | MANDA_FD_WRITE : MANDA_FD_READ;
if (events != con->fd_watcher.events) {
con->fd_watcher.events = events;
con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
}
}
}
static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
manda_connection *con = watcher->priv;
guint i;
ENTER(con);
/* handle read */
for ( i = 0 ; (con->fd != -1) && (i < 100) ; i++ ) {
if (con->cur_header_pos < 8) {
ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos);
if (r < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
break;
case ECONNRESET: /* "eof" */
con_close(con);
goto out;
default:
con_close(con);
goto out;
}
break;
} else if (r == 0) { /* eof */
con_close(con);
goto out;
} else {
con->cur_header_pos += r;
}
}
if (con->cur_header_pos < 8) break;
/* parse header */
con->cur_header.command = _read_net_uint16(con->cur_header_buf + 0);
con->cur_header.size = _read_net_uint16(con->cur_header_buf + 2);
con->cur_header.reqid = _read_net_uint16(con->cur_header_buf + 4);
con->cur_header.respid = _read_net_uint16(con->cur_header_buf + 6);
if (con->cur_header.size < 8) {
/* error */
con_close(con);
} else if (con->cur_header.size > 8) {
ssize_t r;
if (!con->cur_payload) {
con->cur_payload = g_byte_array_sized_new(con->cur_header.size - 8);
g_byte_array_set_size(con->cur_payload, con->cur_header.size - 8);
con->cur_payload_pos = 0;
}
r = read(con->fd, &con->cur_payload->data[con->cur_payload_pos], con->cur_payload->len - con->cur_payload_pos);
if (r < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
break;
case ECONNRESET: /* "eof" */
con_close(con);
goto out;
default:
con_close(con);
goto out;
}
break;
} else if (r == 0) { /* eof */
con_close(con);
goto out;
} else {
con->cur_payload_pos += r;
}
if (con->cur_payload_pos < con->cur_payload->len) break;
con_handle_response(con);
} else {
con_handle_response(con);
}
}
for ( i = 0 ; (con->fd != -1) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
GByteArray *buf = g_queue_peek_head(&con->send_queue);
ssize_t written;
written = write(con->fd, &buf->data[con->cur_send_pos], buf->len - con->cur_send_pos);
if (written < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
break;
case ECONNRESET:
case EPIPE:
con_close(con);
goto out;
default: /* Fatal error/remote close, connection has to be closed */
con_close(con);
goto out;
}
break;
} else {
con->cur_send_pos += written;
}
if (con->cur_send_pos == buf->len) {
con->cur_send_pos = 0;
g_queue_pop_head(&con->send_queue);
g_byte_array_free(buf, TRUE);
} else {
break;
}
}
con_fd_watcher_update(con);
out:
LEAVE(con, _con_free);
}
static void con_timeout_cb(manda_timeout *timeout) {
manda_connection *con = timeout->priv;
double now = con->ctrl->get_time(con->data);
for ( ; con->timeout_first_id != 0; ) {
request *req;
guint16 req_command;
gpointer req_data;
guint16 reqid = con->timeout_first_id;
req = &g_array_index(con->requests, request, con->timeout_first_id);
if (req->timeout_step > now) break;
if (req->timeout > now) {
req->timeout_step = now + TIMEOUT_STEP;
if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
/* requeue */
con_req_unlink(con, req);
con_req_push(con, req, reqid);
continue;
}
req_command = req->command;
req_data = req->data;
/* reset request */
con_req_unlink(con, req);
req->command = 0;
req->timeout = req->timeout_step = 0;
req->data = NULL;
if (NULL != con->message_cb) {
con->message_cb(con, req_command, req_data, 0, 0, NULL);
}
}
if (con->timeout_first_id != 0) {
request *req;
req = &g_array_index(con->requests, request, con->timeout_first_id);
con->req_timeout.timeout = req->timeout_step;
con->ctrl->start_timeout(con->data, &con->req_timeout);
}
}
static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpointer priv_data, con_message_cb message_cb, con_close_cb close_cb, int fd) {
manda_connection *con = g_slice_new0(manda_connection);
gint first_id;
con->data = srv;
con->ctrl = ctrl;
con->priv_data = priv_data;
con->message_cb = message_cb;
con->close_cb = close_cb;
con->fd = fd;
con->fd_watcher.priv = con;
con->fd_watcher.callback = con_fd_watcher_cb;
con->fd_watcher.events = MANDA_FD_READ;
con->fd_watcher.fd = fd;
con->req_timeout.priv = con;
con->req_timeout.callback = con_timeout_cb;
con->req_timeout.timeout = 0;
con->request_ids = manda_idlist_new(65535);
/* id 0 is reserved so request it here */
first_id = manda_idlist_get(con->request_ids);
assert(0 == first_id);
con->requests = g_array_new(FALSE, TRUE, sizeof(request));
con->ctrl->new_fd_watcher(con->data, &con->fd_watcher);
con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
con->ctrl->new_timeout(con->data, &con->req_timeout);
return con;
}
static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id, guint16 resp_id) {
_write_net_uint16(payload->data + 0, command);
_write_net_uint16(payload->data + 2, payload->len);
_write_net_uint16(payload->data + 4, req_id);
_write_net_uint16(payload->data + 6, resp_id);
}
/* payload needs to be prefixed with 8 dummy bytes for the header */
static void con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
double now;
gint reqid;
request *req;
ENTER(con);
if (-1 == con->fd) {
/* connection closed */
goto error;
}
if (payload->len > 65535 || payload->len < 8 || 0 == command || wait_timeout <= 0) {
/* payload too big / invalid parameters */
goto error;
}
reqid = manda_idlist_get(con->request_ids);
if (-1 == reqid) goto error; /* no free request id available */
assert(reqid > 0 && reqid < 65536);
if ((guint) reqid > con->requests->len) {
g_array_set_size(con->requests, reqid+1);
}
req = &g_array_index(con->requests, request, reqid);
now = con->ctrl->get_time(con->data);
req->timeout = now + wait_timeout;;
req->timeout_step = now + TIMEOUT_STEP;
if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
if (con->timeout_first_id == 0) {
con->req_timeout.timeout = req->timeout_step;
con->ctrl->start_timeout(con->data, &con->req_timeout);
}
con_fix_header(payload, command, reqid, resp_id);
con_req_push(con, req, reqid);
g_queue_push_tail(&con->send_queue, payload);
con_fd_watcher_update(con);
goto out;
error:
if (NULL != con->message_cb) {
con->message_cb(con, command, data, 0, 0, NULL);
}
g_byte_array_free(payload, TRUE);
out:
LEAVE(con, _con_free);
}
/* payload needs to be prefixed with 8 dummy bytes for the header */
static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
ENTER(con);
if (-1 == con->fd) {
/* connection closed */
goto error;
}
if (payload->len > 65535 || payload->len < 8 || 0 == command) {
/* payload too big / invalid parameters */
goto error;
}
con_fix_header(payload, command, 0, resp_id);
g_queue_push_tail(&con->send_queue, payload);
con_fd_watcher_update(con);
goto out;
error:
g_byte_array_free(payload, TRUE);
out:
LEAVE(con, _con_free);
}
typedef struct server_socket server_socket;
struct server_socket {
int fd;
gpointer data;
};
static void manda_server_connection_free(manda_server_connection *con);
static void manda_server_backend_release(manda_server_connection *con, guint32 id);
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
manda_server *s = g_slice_new(manda_server);
s->refcount = 1;
s->data = srv;
s->connections = g_ptr_array_new();
s->ctrl = ctrl;
s->callbacks = callbacks;
s->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket));
return s;
}
void manda_server_acquire(manda_server *s) {
++s->refcount;
}
void manda_server_close(manda_server *s) {
guint i;
for (i = 0; i < s->sockets->len; i++) {
server_socket *sock = &g_array_index(s->sockets, server_socket, i);
close(sock->fd);
}
g_array_set_size(s->sockets, 0);
for (i = s->connections->len; i-- > 0; ) {
manda_server_connection *con = g_ptr_array_index(s->connections, i);
con->delete_later = TRUE;
s->callbacks->closed_connection(s->data, con);
if (con->refcount == 0) manda_server_connection_free(con);
}
g_ptr_array_set_size(s->connections, 0);
}
void manda_server_release(manda_server *s) {
if (NULL == s) return;
if (0 < --s->refcount) return;
manda_server_close(s);
g_array_free(s->sockets, TRUE);
g_ptr_array_free(s->connections, TRUE);
g_slice_free(manda_server, s);
}
void manda_server_add_socket(manda_server *s, int fd, gpointer data);
static void manda_server_connection_free(manda_server_connection *con) {
/* TODO */
g_slice_free(manda_server_connection, con);
}
void manda_server_con_close(manda_server_connection *con);
static void manda_server_backend_release(manda_server_connection *con, guint32 id) {
manda_server_backend_use *use;
manda_server_backend *b;
if (id >= con->backends->len) return;
use = g_ptr_array_index(con->backends, id);
if (NULL == use) return;
b = use->backend;
if (use->ndx != b->usage->len - 1) {
manda_server_backend_use *u = g_ptr_array_index(b->usage, b->usage->len - 1);
g_ptr_array_index(b->usage, use->ndx) = u;
u->ndx = use->ndx;
}
b->sum_last_load -= use->last_load;
g_ptr_array_set_size(b->usage, b->usage->len-1);
con->refcount++;
con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
g_slice_free(manda_server_backend_use, use);
}
manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
void manda_server_backend_free(manda_server_backend *backend);
void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
void manda_server_drop_backend(manda_server_backend *backend);

@ -7,6 +7,9 @@
#include <sys/socket.h>
/* idlist */
typedef struct manda_IDList manda_IDList;
typedef enum {
MANDA_FD_READ = 1,
MANDA_FD_WRITE = 2
@ -18,8 +21,12 @@ typedef struct manda_timeout manda_timeout;
typedef struct manda_server_callbacks manda_server_callbacks;
typedef struct manda_server_connection manda_server_connection;
typedef struct manda_server_backend_use manda_server_backend_use;
typedef struct manda_server_backend manda_server_backend;
typedef struct manda_server manda_server;
typedef struct manda_client_backend_callbacks manda_client_backend_callbacks;
typedef struct manda_client_backend manda_client_backend;
typedef struct manda_client manda_client;
typedef void (*manda_fd_watcher_cb)(manda_fd_watcher *watcher);
@ -34,6 +41,8 @@ typedef void (*manda_new_timeout)(gpointer srv, manda_timeout *timeout);
typedef void (*manda_start_timeout)(gpointer srv, manda_timeout *timeout);
typedef void (*manda_destroy_timeout)(gpointer srv, manda_timeout *timeout);
typedef double (*manda_get_time)(gpointer srv);
struct manda_async_ctrl {
manda_new_fd_watcher new_fd_watcher;
manda_update_fd_watcher update_fd_watcher;
@ -42,23 +51,25 @@ struct manda_async_ctrl {
manda_new_timeout new_timeout;
manda_start_timeout start_timeout; /* one shot */
manda_destroy_timeout destroy_timeout;
manda_get_time get_time;
};
struct manda_fd_watcher {
gpointer data; /* application data */
manda_fd_watcher_cb callback;
int events; /* bitmask of manda_async_events; "update_fd_watcher" needs to check this */
int fd; /* filedescriptor; doesn't get changed after "new_fd_watcher" */
/* private from here */
gpointer priv;
int events; /* bitmask of manda_async_events; "update_fd_watcher" needs to check this */
int fd; /* filedescriptor; doesn't get changed after "new_fd_watcher" */
};
struct manda_timeout {
gpointer data; /* application data */
manda_timeout_cb callback;
double timeout; /* absolute timestamp; check in start_timeout */
/* private from here */
@ -70,43 +81,105 @@ struct manda_timeout {
typedef void (*manda_server_new_connection)(gpointer srv, manda_server_connection *con);
typedef void (*manda_server_closed_connection)(gpointer srv, manda_server_connection *con);
typedef void (*manda_server_acquire_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
typedef void (*manda_server_update_backend)(gpointer srv, manda_server_connection *con, guint32 load, guint32 backends);
typedef void (*manda_server_release_backend)(gpointer srv, manda_server_connection *con, guint32 id);
typedef void (*manda_server_bind_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid);
typedef void (*manda_server_update_backend)(gpointer srv, manda_server_backend *backend, guint ndx);
typedef void (*manda_server_release_backend)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use);
struct manda_server_callbacks {
manda_server_new_connection server_new_connection;
manda_server_closed_connection server_closed_connection;
manda_server_new_connection new_connection;
manda_server_closed_connection closed_connection;
manda_server_acquire_backend server_acquire_backend;
manda_server_update_backend server_update_backend;
manda_server_release_backend server_release_backend;
manda_server_bind_backend bind_backend;
manda_server_update_backend update_backend;
manda_server_release_backend release_backend;
};
struct manda_server_connection {
gpointer data; /* application data */
manda_server *srv;
/* private from here */
gint refcount;
gboolean delete_later;
GPtrArray *backends; /* manda_server_backend_use */
manda_IDList *idlist;
};
struct manda_server_backend_use {
manda_server_connection *con;
guint32 backend_id;
guint32 last_load, last_backends;
/* private from here */
manda_server_backend *backend;
guint ndx;
};
struct manda_server_backend {
gpointer data; /* application data */
GPtrArray *usage; /* array of manda_server_backend_use */
guint32 sum_last_load;
GString *addr;
/* private from here */
gint refcount;
gboolean delete_later;
};
struct manda_server {
gint refcount;
gpointer data; /* application data */
GPtrArray *connections;
/* private from here */
const manda_async_ctrl *ctrl;
const manda_server_callbacks *callbacks;
GArray *sockets;
};
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks);
void manda_server_acquire(manda_server *s);
void manda_server_release(manda_server *s);
/* close everything */
void manda_server_close(manda_server *s);
void manda_server_add_socket(manda_server *s, int fd, gpointer data);
void manda_server_con_close(manda_server_connection *con);
manda_server_backend *manda_server_backend_new(gpointer data, GString *addr);
void manda_server_backend_free(manda_server_backend *backend);
void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend);
void manda_server_drop_backend(manda_server_backend *backend);
/* Client API */
typedef void (*manda_client_return_backend)(gpointer srv, manda_client_backend *backend);
/* backend will be free()d after the callback, don't keep the pointer; reason is NULL if the connection was closed */
typedef void (*manda_client_lost_backend)(gpointer srv, manda_client_backend *backend, GString *reason);
struct manda_client_backend_callbacks {
manda_client_return_backend return_backend;
manda_client_lost_backend lost_backend;
};
struct manda_client_backend {
gpointer data; /* application data */
manda_client *client;
GString *addr;
/* private from here */
const manda_client_backend_callbacks *callbacks;
gboolean do_release;
};
struct manda_client { /* private */
gint refcount;
gpointer data; /* application data */
@ -121,5 +194,7 @@ manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struc
void manda_client_acquire(manda_client *c);
void manda_client_release(manda_client *c);
manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, const manda_client_backend_callbacks *callbacks);
void manda_client_release_backend(manda_client *c, manda_client_backend *backend);
#endif

Loading…
Cancel
Save