From 3ab3fc6e6228cf4a79c382cfb11a7a67ecd68c12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Fri, 10 Sep 2010 16:09:17 +0200 Subject: [PATCH] Implement server part of libmanda --- libmanda-protocol.h | 8 + libmanda.c | 467 +++++++++++++++++++++++++++++++++++++++----- libmanda.h | 38 ++-- 3 files changed, 451 insertions(+), 62 deletions(-) diff --git a/libmanda-protocol.h b/libmanda-protocol.h index 260aa87..cc8656e 100644 --- a/libmanda-protocol.h +++ b/libmanda-protocol.h @@ -97,4 +97,12 @@ * */ +typedef enum { + MANDA_CMD_BIND_BACKEND = 0x0001, + MANDA_CMD_RELEASE_BACKEND = 0x0002, + MANDA_CMD_UPDATE_BACKEND = 0x0003, + + MANDA_CMD_UNKNOWN_COMMAND = 0xffff +} manda_commands; + #endif diff --git a/libmanda.c b/libmanda.c index 8dd8c9f..d900864 100644 --- a/libmanda.c +++ b/libmanda.c @@ -1,5 +1,6 @@ #include "libmanda.h" +#include "libmanda-protocol.h" #include "idlist.h" #include @@ -16,6 +17,11 @@ #define ENTER(x) do { ++((x)->refcount); } while(0) #define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0) +#define UNUSED(x) ((void)(x)) + +#define CONST_STR_LEN(x) (x), (sizeof(x) - 1) +#define GSTR_LEN(x) ((x) ? (x)->str : ""), ((x) ? (x)->len : 0) + #if __GNUC__ # define INLINE static inline #else @@ -24,7 +30,6 @@ typedef struct messageheader messageheader; typedef struct request request; -typedef struct manda_connection manda_connection; typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload); typedef void (*con_close_cb)(manda_connection *con); @@ -51,6 +56,7 @@ struct manda_connection { con_close_cb close_cb; int fd; + gboolean closed; manda_fd_watcher fd_watcher; manda_timeout req_timeout; @@ -71,6 +77,28 @@ struct manda_connection { gint refcount; }; +static void fd_no_block(int fd) { +#ifdef O_NONBLOCK + fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR); +#elif defined _WIN32 + int i = 1; + ioctlsocket(fd, FIONBIO, &i); +#else +#error No way found to set non-blocking mode for fds. +#endif +} + +static void fd_init(int fd) { +#ifdef FD_CLOEXEC + /* close fd on exec (cgi) */ + fcntl(fd, F_SETFD, FD_CLOEXEC); +#endif + fd_no_block(fd); +} + +/* -------------------------- */ +/* Message building / parsing */ +/* -------------------------- */ INLINE guint8 _read_net_uint8(const guint8* buf) { return *buf; @@ -133,6 +161,7 @@ INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) { } INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) { guint16 slen; + g_string_truncate(dest, 0); if (!read_net_uint16(buf, pos, &slen)) return FALSE; if (buf->len < slen || *pos >= buf->len - slen) { @@ -160,14 +189,19 @@ INLINE void write_net_uint32(GByteArray *buf, guint32 val) { g_byte_array_set_size(buf, curlen + sizeof(val)); _write_net_uint32(buf->data + curlen, val); } -INLINE void write_net_string(GByteArray *buf, GString *val) { +INLINE void write_net_string(GByteArray *buf, const gchar *str, guint len) { guint curlen = buf->len; - g_byte_array_set_size(buf, curlen + sizeof(guint16) + val->len); - _write_net_uint16(buf->data + curlen, val->len); - memcpy(buf->data + curlen + sizeof(guint16), val->str, val->len); + g_byte_array_set_size(buf, curlen + sizeof(guint16) + len); + _write_net_uint16(buf->data + curlen, len); + memcpy(buf->data + curlen + sizeof(guint16), str, len); } +/* ---------------- */ +/* Basic Connection */ +/* ---------------- */ + +static void _con_free(manda_connection *con); static void con_req_unlink(manda_connection *con, request *req) { request *prev = NULL, *next = NULL; @@ -205,6 +239,12 @@ static void con_req_push(manda_connection *con, request *req, guint16 reqid) { static void con_close(manda_connection *con) { GByteArray *buf; + if (con->closed) return; + + con->closed = TRUE; + + ENTER(con); + if (NULL != con->fd_watcher.priv) { con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher); con->fd_watcher.priv = NULL; @@ -264,9 +304,13 @@ static void con_close(manda_connection *con) { g_byte_array_free(con->cur_payload, TRUE); con->cur_payload = NULL; } + + LEAVE(con, _con_free); } static void _con_free(manda_connection *con) { + ENTER(con); /* don't want to enter _con_free again */ + con_close(con); g_slice_free(manda_connection, con); @@ -336,7 +380,7 @@ static void con_fd_watcher_cb(manda_fd_watcher *watcher) { ENTER(con); /* handle read */ - for ( i = 0 ; (con->fd != -1) && (i < 100) ; i++ ) { + for ( i = 0 ; (!con->closed) && (i < 100) ; i++ ) { if (con->cur_header_pos < 8) { ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos); @@ -419,7 +463,7 @@ static void con_fd_watcher_cb(manda_fd_watcher *watcher) { } } - for ( i = 0 ; (con->fd != -1) && (i < 100) && (con->send_queue.length > 0) ; i++ ) { + for ( i = 0 ; (!con->closed) && (i < 100) && (con->send_queue.length > 0) ; i++ ) { GByteArray *buf = g_queue_peek_head(&con->send_queue); ssize_t written; @@ -531,7 +575,7 @@ static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpo con->req_timeout.callback = con_timeout_cb; con->req_timeout.timeout = 0; - con->request_ids = manda_idlist_new(65535); + con->request_ids = manda_idlist_new(65536); /* id 0 is reserved so request it here */ first_id = manda_idlist_get(con->request_ids); assert(0 == first_id); @@ -560,7 +604,7 @@ static void con_send_request(manda_connection *con, GByteArray *payload, guint16 ENTER(con); - if (-1 == con->fd) { + if (con->closed) { /* connection closed */ goto error; } @@ -611,10 +655,12 @@ out: } /* payload needs to be prefixed with 8 dummy bytes for the header */ -static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) { +static gboolean con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) { + gboolean res = TRUE; + ENTER(con); - if (-1 == con->fd) { + if (con->closed) { /* connection closed */ goto error; } @@ -633,21 +679,176 @@ static void con_send_notify(manda_connection *con, GByteArray *payload, guint16 error: g_byte_array_free(payload, TRUE); + res = FALSE; out: LEAVE(con, _con_free); + + return res; } +/* message construction helper */ +static GByteArray* new_payload() { + GByteArray* buf = g_byte_array_new(); + g_byte_array_set_size(buf, 8); + return buf; +} + +static gboolean send_unkown_command(manda_connection *con, guint16 cmd, guint16 resp_id) { + GByteArray *payload; + if (NULL == con) return FALSE; + + payload = new_payload(); + write_net_uint16(payload, cmd); + return con_send_notify(con, payload, MANDA_CMD_UNKNOWN_COMMAND, resp_id); +} + +static gboolean send_server_bind_backend(manda_connection *con, guint16 resp_id, guint32 backend_id, const gchar *str, guint len) { + GByteArray *payload; + if (NULL == con) return FALSE; + + payload = new_payload(); + write_net_uint32(payload, backend_id); + write_net_string(payload, str, len); + return con_send_notify(con, payload, MANDA_CMD_BIND_BACKEND, resp_id); +} + +static gboolean send_server_bind_backend_failed(manda_connection *con, guint16 resp_id, const gchar *str, guint len) { + return send_server_bind_backend(con, resp_id, 0, str, len); +} + +static gboolean send_server_release_backend(manda_connection *con, guint32 backend_id, const gchar *str, guint len) { + GByteArray *payload; + if (NULL == con) return FALSE; + + payload = new_payload(); + write_net_uint32(payload, backend_id); + write_net_string(payload, str, len); + return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0); +} + +/* server connection */ typedef struct server_socket server_socket; struct server_socket { + manda_server *srv; + int fd; gpointer data; + + manda_fd_watcher fd_watcher; }; -static void manda_server_connection_free(manda_server_connection *con); -static void manda_server_backend_release(manda_server_connection *con, guint32 id); +static void scon_con_closed_cb(manda_connection *con); +static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload); +static void scon_release(manda_server_connection *con); + +static void sbackend_free(manda_server_backend *backend); + +static void manda_server_release_backend(manda_server_connection *con, guint32 id); + +static void server_listening_cb(manda_fd_watcher *fd_watcher) { + server_socket *sock = fd_watcher->priv; + manda_server *s = sock->srv; + int fd; + int i; + manda_server_connection *con; + + fd = accept(sock->fd, NULL, NULL); + + if (-1 == fd) return; + + fd_init(fd); + + con = g_slice_new0(manda_server_connection); + con->srv = s; + con->refcount = 1; + con->backends = g_ptr_array_new(); + con->idlist = manda_idlist_new(65536); + i = manda_idlist_get(con->idlist); /* 0 is an invalid backend id, reserve it */ + assert(i == 0); + con->con = con_new(s->data, s->ctrl, con, scon_con_message_cb, scon_con_closed_cb, fd); +} + +static void scon_free(manda_server_connection *con) { + ENTER(con); /* don't want to enter scon_free again */ + + manda_server_con_close(con); + + g_slice_free(manda_server_connection, con); +} +static void scon_release(manda_server_connection *con) { + LEAVE(con, scon_free); +} + +void manda_server_con_close(manda_server_connection *con) { + guint i; + manda_connection *bcon = con->con; + + if (NULL == bcon) return; + + ENTER(con); + + g_ptr_array_remove_fast(con->srv->connections, con); + scon_release(con); + + con->con = NULL; + bcon->close_cb = NULL; + con_close(bcon); + con_free(bcon); + + if (NULL != con->srv->callbacks->closed_connection) { + con->srv->callbacks->closed_connection(con->srv->data, con); + } + + for (i = 0; i < con->backends->len; i++) { + manda_server_backend_use *use = g_ptr_array_index(con->backends, i); + if (NULL == use) continue; + manda_server_release_backend(con, i); + } + + g_ptr_array_free(con->backends, TRUE); + con->backends = NULL; + manda_idlist_free(con->idlist); + con->idlist = NULL; + + LEAVE(con, scon_free); +} + +static void scon_con_closed_cb(manda_connection *con) { + manda_server_connection *scon = con->priv_data; + + manda_server_con_close(scon); +} + +void manda_server_add_socket(manda_server *s, int fd, gpointer data) { + server_socket *sock = g_slice_new0(server_socket); + + sock->srv = s; + sock->fd = fd; + sock->data = data; + sock->fd_watcher.callback = server_listening_cb; + sock->fd_watcher.events = MANDA_FD_READ; + sock->fd_watcher.fd = fd; + sock->fd_watcher.priv = sock; + + s->ctrl->new_fd_watcher(s->data, &sock->fd_watcher); + s->ctrl->update_fd_watcher(s->data, &sock->fd_watcher); + + g_ptr_array_add(s->sockets, sock); +} + +static void manda_server_free_socket(server_socket *sock) { + manda_server *s = sock->srv; + + s->ctrl->destroy_fd_watcher(s->data, &sock->fd_watcher); + + while (-1 == close(sock->fd) && errno == EINTR) ; + sock->fd = -1; + + g_slice_free(server_socket, sock); +} manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) { manda_server *s = g_slice_new(manda_server); @@ -656,62 +857,61 @@ manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const s->connections = g_ptr_array_new(); s->ctrl = ctrl; s->callbacks = callbacks; - s->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket)); + s->sockets = g_ptr_array_new(); return s; } -void manda_server_acquire(manda_server *s) { - ++s->refcount; -} - void manda_server_close(manda_server *s) { guint i; for (i = 0; i < s->sockets->len; i++) { - server_socket *sock = &g_array_index(s->sockets, server_socket, i); - close(sock->fd); + server_socket *sock = g_ptr_array_index(s->sockets, i); + manda_server_free_socket(sock); } - g_array_set_size(s->sockets, 0); + g_ptr_array_set_size(s->sockets, 0); - for (i = s->connections->len; i-- > 0; ) { - manda_server_connection *con = g_ptr_array_index(s->connections, i); - con->delete_later = TRUE; - s->callbacks->closed_connection(s->data, con); - if (con->refcount == 0) manda_server_connection_free(con); + while (s->connections->len > 0) { + manda_server_connection *con = g_ptr_array_index(s->connections, 0); + manda_server_con_close(con); } - g_ptr_array_set_size(s->connections, 0); } -void manda_server_release(manda_server *s) { - if (NULL == s) return; - if (0 < --s->refcount) return; - +static void manda_server_free(manda_server *s) { manda_server_close(s); - g_array_free(s->sockets, TRUE); + g_ptr_array_free(s->sockets, TRUE); g_ptr_array_free(s->connections, TRUE); g_slice_free(manda_server, s); } -void manda_server_add_socket(manda_server *s, int fd, gpointer data); - -static void manda_server_connection_free(manda_server_connection *con) { - /* TODO */ - g_slice_free(manda_server_connection, con); +void manda_server_acquire(manda_server *s) { + ENTER(s); } -void manda_server_con_close(manda_server_connection *con); +void manda_server_release(manda_server *s) { + if (NULL == s) return; -static void manda_server_backend_release(manda_server_connection *con, guint32 id) { + LEAVE(s, manda_server_free); +} + +static void manda_server_release_backend(manda_server_connection *con, guint32 id) { manda_server_backend_use *use; manda_server_backend *b; + if (NULL == con->backends) return; + if (id >= con->backends->len) return; + if (manda_idlist_is_used(con->idlist, id)) { + /* notify client */ + send_server_release_backend(con->con, id, CONST_STR_LEN("lost backend")); + } + use = g_ptr_array_index(con->backends, id); if (NULL == use) return; + g_ptr_array_index(con->backends, id) = NULL; b = use->backend; if (use->ndx != b->usage->len - 1) { @@ -722,14 +922,191 @@ static void manda_server_backend_release(manda_server_connection *con, guint32 i b->sum_last_load -= use->last_load; g_ptr_array_set_size(b->usage, b->usage->len-1); - con->refcount++; + LEAVE(b, sbackend_free); - con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use); + if (NULL != con->srv && NULL != con->srv->callbacks->release_backend) { + con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use); + } g_slice_free(manda_server_backend_use, use); } -manda_server_backend *manda_server_backend_new(gpointer data, GString *addr); -void manda_server_backend_free(manda_server_backend *backend); +manda_server_backend* manda_server_backend_new(gpointer data, GString *addr) { + manda_server_backend *backend = g_slice_new0(manda_server_backend); -void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend); -void manda_server_drop_backend(manda_server_backend *backend); + backend->data = data; + backend->usage = g_ptr_array_new(); + backend->sum_last_load = 0; + backend->addr = addr; + backend->refcount = 1; + + return backend; +} + +static void sbackend_free(manda_server_backend *backend) { + ENTER(backend); /* don't want to enter sbackend_free again */ + + manda_server_drop_backend(backend); + + g_ptr_array_free(backend->usage, TRUE); + backend->usage = NULL; + g_string_free(backend->addr, TRUE); + backend->addr = NULL; + + g_slice_free(manda_server_backend, backend); +} + +void manda_server_backend_release(manda_server_backend *backend) { + if (NULL == backend) return; + + LEAVE(backend, sbackend_free); +} +void manda_server_backend_acquire(manda_server_backend *backend) { + ENTER(backend); +} + +gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend) { + manda_server_backend_use *use; + gint i; + + if (NULL == con->con || con->con->closed) return FALSE; + + i = manda_idlist_get(con->idlist); + if (-1 == i) { + send_server_bind_backend_failed(con->con, reqid, CONST_STR_LEN("no free backend id available")); + return FALSE; + } + assert(i > 0); + + use = g_slice_new0(manda_server_backend_use); + use->con = con; + use->backend_id = i; + use->last_load = use->last_workers = 0; + use->backend = backend; + use->ndx = backend->usage->len; + g_ptr_array_add(backend->usage, use); + if (use->backend_id >= con->backends->len) g_ptr_array_set_size(con->backends, use->backend_id + 1); + g_ptr_array_index(con->backends, use->backend_id) = use; + + ENTER(backend); + + return send_server_bind_backend(con->con, reqid, use->backend_id, GSTR_LEN(backend->addr)); +} + +void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg) { + send_server_bind_backend_failed(con->con, reqid, GSTR_LEN(errmsg)); +} + +void manda_server_drop_backend(manda_server_backend *backend) { + ENTER(backend); + + while (backend->usage->len > 0) { + manda_server_backend_use *u = g_ptr_array_index(backend->usage, 0); + manda_server_release_backend(u->con, u->backend_id); + } + + LEAVE(backend, sbackend_free); +} + +static void sbackend_update(manda_server_connection *con, guint32 backend_id, guint32 load, guint32 workers) { + manda_server_backend_use *use; + manda_server_backend *b; + + if (NULL == con->backends) return; + + if (backend_id >= con->backends->len) return; + + use = g_ptr_array_index(con->backends, backend_id); + if (NULL == use) return; + + b = use->backend; + + b->sum_last_load += load - use->last_load; + use->last_load = load; + use->last_workers = workers; + + if (NULL != con->srv->callbacks->update_backend) { + con->srv->callbacks->update_backend(con->srv->data, b, use->ndx); + } +} + + +static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *payload) { + manda_server_connection *scon = con->priv_data; + manda_server *srv; + guint pos = 0; + + UNUSED(orig_command); UNUSED(orig_data); + + if (con->closed || NULL == scon || NULL == scon->srv) return; + + srv = scon->srv; + + ENTER(con); + ENTER(scon); + ENTER(srv); + + switch (mesg_command) { + case MANDA_CMD_BIND_BACKEND: { + GString *name = NULL; + + if (NULL == payload) goto error; + name = g_string_sized_new(payload->len - 2); + if (!read_net_string(payload, &pos, name)) { g_string_free(name, TRUE); goto error; } + if (pos != payload->len) { g_string_free(name, TRUE); goto error; } + + srv->callbacks->bind_backend(srv->data, scon, name, mesg_req_id); + + g_string_free(name, TRUE); + } + break; + case MANDA_CMD_RELEASE_BACKEND: { + guint32 backend_id; + + if (NULL == payload) goto error; + if (!read_net_uint32(payload, &pos, &backend_id)) goto error; + if (pos != payload->len) goto error; + + if (backend_id == 0) goto error; /* invalid id */ + + if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error; + manda_idlist_put(scon->idlist, backend_id); + + manda_server_release_backend(scon, backend_id); + } + break; + case MANDA_CMD_UPDATE_BACKEND: { + guint32 backend_id, load, workers; + + if (NULL == payload) goto error; + if (!read_net_uint32(payload, &pos, &backend_id)) goto error; + if (!read_net_uint32(payload, &pos, &load)) goto error; + if (!read_net_uint32(payload, &pos, &workers)) goto error; + if (pos != payload->len) goto error; + + if (backend_id == 0) goto error; /* invalid id */ + + if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error; + + sbackend_update(scon, backend_id, load, workers); + } + break; + + case MANDA_CMD_UNKNOWN_COMMAND: + /* we require all our commands to be handled: */ + goto error; + + default: + send_unkown_command(con, mesg_command, mesg_req_id); + break; + } + + goto out; + +error: + manda_server_con_close(scon); + +out: + LEAVE(srv, manda_server_free); + LEAVE(scon, scon_free); + LEAVE(con, _con_free); +} diff --git a/libmanda.h b/libmanda.h index a24a8be..f321a42 100644 --- a/libmanda.h +++ b/libmanda.h @@ -29,6 +29,8 @@ typedef struct manda_client_backend_callbacks manda_client_backend_callbacks; typedef struct manda_client_backend manda_client_backend; typedef struct manda_client manda_client; +typedef struct manda_connection manda_connection; + typedef void (*manda_fd_watcher_cb)(manda_fd_watcher *watcher); typedef void (*manda_new_fd_watcher)(gpointer srv, manda_fd_watcher *watcher); @@ -78,20 +80,20 @@ struct manda_timeout { /* Server API */ -typedef void (*manda_server_new_connection)(gpointer srv, manda_server_connection *con); -typedef void (*manda_server_closed_connection)(gpointer srv, manda_server_connection *con); +typedef void (*manda_server_new_connection_cb)(gpointer srv, manda_server_connection *con); +typedef void (*manda_server_closed_connection_cb)(gpointer srv, manda_server_connection *con); -typedef void (*manda_server_bind_backend)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid); -typedef void (*manda_server_update_backend)(gpointer srv, manda_server_backend *backend, guint ndx); -typedef void (*manda_server_release_backend)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use); +typedef void (*manda_server_bind_backend_cb)(gpointer srv, manda_server_connection *con, GString *name, guint16 reqid); +typedef void (*manda_server_update_backend_cb)(gpointer srv, manda_server_backend *backend, guint ndx); +typedef void (*manda_server_release_backend_cb)(gpointer srv, manda_server_backend *backend, guint old_ndx, manda_server_backend_use *old_use); struct manda_server_callbacks { - manda_server_new_connection new_connection; - manda_server_closed_connection closed_connection; + manda_server_new_connection_cb new_connection; + manda_server_closed_connection_cb closed_connection; - manda_server_bind_backend bind_backend; - manda_server_update_backend update_backend; - manda_server_release_backend release_backend; + manda_server_bind_backend_cb bind_backend; + manda_server_update_backend_cb update_backend; + manda_server_release_backend_cb release_backend; }; struct manda_server_connection { @@ -100,7 +102,8 @@ struct manda_server_connection { /* private from here */ gint refcount; - gboolean delete_later; + + manda_connection *con; GPtrArray *backends; /* manda_server_backend_use */ manda_IDList *idlist; @@ -110,7 +113,7 @@ struct manda_server_backend_use { manda_server_connection *con; guint32 backend_id; - guint32 last_load, last_backends; + guint32 last_load, last_workers; /* private from here */ manda_server_backend *backend; @@ -126,7 +129,6 @@ struct manda_server_backend { /* private from here */ gint refcount; - gboolean delete_later; }; struct manda_server { @@ -138,7 +140,7 @@ struct manda_server { const manda_async_ctrl *ctrl; const manda_server_callbacks *callbacks; - GArray *sockets; + GPtrArray *sockets; }; manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks); @@ -153,10 +155,12 @@ void manda_server_add_socket(manda_server *s, int fd, gpointer data); void manda_server_con_close(manda_server_connection *con); manda_server_backend *manda_server_backend_new(gpointer data, GString *addr); -void manda_server_backend_free(manda_server_backend *backend); +void manda_server_backend_acquire(manda_server_backend *backend); +void manda_server_backend_release(manda_server_backend *backend); -void manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend); -void manda_server_drop_backend(manda_server_backend *backend); +gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend); +void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg); +void manda_server_drop_backend(manda_server_backend *backend); /* tell all users that the backend is gone */ /* Client API */