You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1521 lines
37 KiB
1521 lines
37 KiB
|
|
#include "libmanda.h" |
|
#include "libmanda-protocol.h" |
|
#include "idlist.h" |
|
|
|
#include <arpa/inet.h> |
|
#include <errno.h> |
|
#include <sys/socket.h> |
|
#include <fcntl.h> |
|
#include <unistd.h> |
|
#include <string.h> |
|
|
|
#include <assert.h> |
|
|
|
/* Offset added to the current time when scheduling a request's next timeout check. */
#define TIMEOUT_STEP (3)

/* Reference counting helpers: ENTER takes a reference on x; LEAVE drops one
 * and calls destroy(x) as soon as the counter reaches zero. */
#define ENTER(x) do { (x)->refcount++; } while (0)
#define LEAVE(x, destroy) do { if (--((x)->refcount) == 0) destroy(x); } while (0)

/* Marks a parameter/variable as intentionally unused. */
#define UNUSED(x) ((void)(x))

/* Expand to a "pointer, length" argument pair:
 *  - CONST_STR_LEN for string literals (length excludes the terminating NUL),
 *  - GSTR_LEN for objects with ->str and ->len members; a NULL pointer yields "", 0.
 * Both evaluate their argument more than once. */
#define CONST_STR_LEN(x) (x), (sizeof(x) - 1)
#define GSTR_LEN(x) ((x) ? (x)->str : ""), ((x) ? (x)->len : 0)

/* Storage class used for the small helper functions in this file. */
#if !__GNUC__
# define INLINE static
#else
# define INLINE static inline
#endif
|
|
|
typedef struct messageheader messageheader; |
|
typedef struct request request; |
|
|
|
typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload); |
|
typedef void (*con_close_cb)(manda_connection *con); |
|
|
|
struct messageheader { |
|
guint16 command, size, reqid, respid; |
|
}; |
|
|
|
struct request { |
|
guint16 command; /* zero means slot is free */ |
|
double timeout, timeout_step; |
|
|
|
guint16 timeout_prev_id, timeout_next_id; |
|
|
|
gpointer data; |
|
}; |
|
|
|
struct manda_connection { |
|
gpointer data; /* application data */ |
|
gpointer priv_data; /* internal data */ |
|
const manda_async_ctrl *ctrl; |
|
|
|
con_message_cb message_cb; |
|
con_close_cb close_cb; |
|
|
|
int fd; |
|
gboolean closed; |
|
|
|
manda_fd_watcher fd_watcher; |
|
manda_timeout req_timeout; |
|
|
|
manda_IDList *request_ids; |
|
GArray *requests; |
|
guint16 timeout_first_id, timeout_last_id; |
|
|
|
guint cur_header_pos; /* how many bytes of the header we have */ |
|
guint8 cur_header_buf[8]; |
|
messageheader cur_header; |
|
guint cur_payload_pos; |
|
GByteArray *cur_payload; |
|
|
|
guint cur_send_pos; |
|
GQueue send_queue; |
|
|
|
gint refcount; |
|
}; |
|
|
|
/*
 * Switch fd to non-blocking mode.
 *
 * F_SETFL replaces the complete set of file status flags, so the current
 * flags are fetched first and O_NONBLOCK is OR-ed in; flags such as O_APPEND
 * that were already set on the descriptor are kept. If the flags cannot be
 * read, only O_NONBLOCK is set.
 */
static void fd_no_block(int fd) {
#ifdef O_NONBLOCK
	int flags = fcntl(fd, F_GETFL, 0);

	if (-1 == flags) flags = 0;
	fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#elif defined _WIN32
	int i = 1;
	ioctlsocket(fd, FIONBIO, &i);
#else
#error No way found to set non-blocking mode for fds.
#endif
}
|
|
|
/* Prepare a freshly obtained socket: mark it close-on-exec (where the platform
 * offers FD_CLOEXEC) and make it non-blocking. */
static void fd_init(int fd) {
#ifdef FD_CLOEXEC
	/* do not leak the descriptor into exec'ed children (cgi) */
	fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif

	fd_no_block(fd);
}
|
|
|
/* -------------------------- */ |
|
/* Message building / parsing */ |
|
/* -------------------------- */ |
|
|
|
/* Read one byte from buf (no bounds checking). */
INLINE guint8 _read_net_uint8(const guint8* buf) {
	return buf[0];
}
|
/* Read a 16-bit value in network byte order from buf (no bounds checking).
 * memcpy is used so buf does not have to be aligned. */
INLINE guint16 _read_net_uint16(const guint8* buf) {
	guint16 wire;

	memcpy(&wire, buf, sizeof(wire));
	return ntohs(wire);
}
|
/* Read a 32-bit value in network byte order from buf (no bounds checking).
 * memcpy is used so buf does not have to be aligned. */
INLINE guint32 _read_net_uint32(const guint8* buf) {
	guint32 wire;

	memcpy(&wire, buf, sizeof(wire));
	return ntohl(wire);
}
|
|
|
/* Store one byte at buf (no bounds checking). */
INLINE void _write_net_uint8(guint8 *buf, guint8 val) {
	buf[0] = val;
}
|
/* Store val at buf in network byte order (2 bytes, no bounds checking). */
INLINE void _write_net_uint16(guint8 *buf, guint16 val) {
	const guint16 wire = htons(val);

	memcpy(buf, &wire, sizeof(wire));
}
|
/*
 * Store val at buf in network byte order (4 bytes, no bounds checking).
 *
 * Uses htonl: the 16-bit htons would drop the upper half of the value and
 * produce bytes that _read_net_uint32 (ntohl) cannot decode back.
 */
INLINE void _write_net_uint32(guint8 *buf, guint32 val) {
	val = htonl(val);
	memcpy(buf, &val, sizeof(val));
}
|
|
|
/*
 * Read a byte from buf at offset *pos and advance *pos.
 *
 * Returns TRUE on success. If fewer than sizeof(*dest) bytes are left,
 * *dest is set to 0, *pos is moved to the end of the buffer and FALSE is
 * returned. A value that ends exactly at the end of the buffer is valid.
 */
INLINE gboolean read_net_uint8(GByteArray *buf, guint *pos, guint8 *dest) {
	/* first test keeps the subtraction from wrapping around */
	if (buf->len < sizeof(*dest) || *pos > buf->len - sizeof(*dest)) {
		/* end of buffer */
		*dest = 0;
		*pos = buf->len;
		return FALSE;
	}

	*dest = _read_net_uint8(buf->data + *pos);
	*pos += sizeof(*dest);
	return TRUE;
}
|
/*
 * Read a 16-bit network byte order value from buf at offset *pos and
 * advance *pos.
 *
 * Returns TRUE on success. If fewer than sizeof(*dest) bytes are left,
 * *dest is set to 0, *pos is moved to the end of the buffer and FALSE is
 * returned. A value that ends exactly at the end of the buffer is valid.
 */
INLINE gboolean read_net_uint16(GByteArray *buf, guint *pos, guint16 *dest) {
	/* first test keeps the subtraction from wrapping around */
	if (buf->len < sizeof(*dest) || *pos > buf->len - sizeof(*dest)) {
		/* end of buffer */
		*dest = 0;
		*pos = buf->len;
		return FALSE;
	}

	*dest = _read_net_uint16(buf->data + *pos);
	*pos += sizeof(*dest);
	return TRUE;
}
|
/*
 * Read a 32-bit network byte order value from buf at offset *pos and
 * advance *pos.
 *
 * Returns TRUE on success. If fewer than sizeof(*dest) bytes are left,
 * *dest is set to 0, *pos is moved to the end of the buffer and FALSE is
 * returned. A value that ends exactly at the end of the buffer is valid.
 */
INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
	/* first test keeps the subtraction from wrapping around */
	if (buf->len < sizeof(*dest) || *pos > buf->len - sizeof(*dest)) {
		/* end of buffer */
		*dest = 0;
		*pos = buf->len;
		return FALSE;
	}

	*dest = _read_net_uint32(buf->data + *pos);
	*pos += sizeof(*dest);
	return TRUE;
}
|
/*
 * Read a length-prefixed string (16-bit length, then that many bytes) from
 * buf at offset *pos into dest and advance *pos.
 *
 * dest is always truncated first. Returns FALSE (with *pos moved to the end
 * of the buffer) if the prefix or the announced number of bytes is not
 * available. A string that ends exactly at the end of the buffer, including
 * an empty one, is valid.
 */
INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
	guint16 slen;

	g_string_truncate(dest, 0);
	if (!read_net_uint16(buf, pos, &slen)) return FALSE;

	/* first test keeps the subtraction from wrapping around */
	if (buf->len < slen || *pos > buf->len - slen) {
		*pos = buf->len;
		return FALSE;
	}

	g_string_set_size(dest, slen);
	memcpy(dest->str, buf->data + *pos, slen);
	*pos += slen;
	return TRUE;
}
|
|
|
/* Append one byte to buf. */
INLINE void write_net_uint8(GByteArray *buf, guint8 val) {
	guint8 wire[sizeof(val)];

	_write_net_uint8(wire, val);
	g_byte_array_append(buf, wire, sizeof(wire));
}
|
/* Append val to buf, encoded with _write_net_uint16 (2 bytes). */
INLINE void write_net_uint16(GByteArray *buf, guint16 val) {
	guint8 wire[sizeof(val)];

	_write_net_uint16(wire, val);
	g_byte_array_append(buf, wire, sizeof(wire));
}
|
/* Append val to buf, encoded with _write_net_uint32 (4 bytes). */
INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
	guint8 wire[sizeof(val)];

	_write_net_uint32(wire, val);
	g_byte_array_append(buf, wire, sizeof(wire));
}
|
/*
 * Append a length-prefixed string (16-bit length in network byte order,
 * followed by the bytes) to buf.
 *
 * The prefix can only express 65535 bytes; longer input is truncated to
 * that size so prefix and data always agree (previously the prefix wrapped
 * around while all bytes were still copied, corrupting the message).
 */
INLINE void write_net_string(GByteArray *buf, const gchar *str, guint len) {
	guint curlen = buf->len;

	if (len > 0xFFFFu) len = 0xFFFFu;

	g_byte_array_set_size(buf, curlen + sizeof(guint16) + len);
	_write_net_uint16(buf->data + curlen, (guint16) len);
	/* skip memcpy for empty strings so a NULL str with len 0 is harmless */
	if (len > 0) memcpy(buf->data + curlen + sizeof(guint16), str, len);
}
|
|
|
|
|
/* ---------------- */ |
|
/* Basic Connection */ |
|
/* ---------------- */ |
|
|
|
static void _con_free(manda_connection *con); |
|
|
|
/*
 * Remove req from the connection's timeout queue.
 *
 * The queue is a doubly linked list threaded through con->requests using
 * request ids as links (0 terminates); head and tail are kept in
 * con->timeout_first_id / con->timeout_last_id.
 */
static void con_req_unlink(manda_connection *con, request *req) {
	const guint16 before = req->timeout_prev_id;
	const guint16 after = req->timeout_next_id;

	/* fix the backward link of the successor (or the tail) */
	if (0 == after) {
		con->timeout_last_id = before;
	} else {
		g_array_index(con->requests, request, after).timeout_prev_id = before;
	}

	/* fix the forward link of the predecessor (or the head) */
	if (0 == before) {
		con->timeout_first_id = after;
	} else {
		g_array_index(con->requests, request, before).timeout_next_id = after;
	}

	req->timeout_prev_id = 0;
	req->timeout_next_id = 0;
}
|
|
|
/*
 * Append req (stored under id reqid) to the tail of the timeout queue.
 * req->timeout_next_id is not touched; callers pass an unlinked request.
 */
static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
	const guint16 old_tail = con->timeout_last_id;

	if (0 == old_tail) {
		/* queue was empty: req becomes the head as well */
		con->timeout_first_id = reqid;
	} else {
		g_array_index(con->requests, request, old_tail).timeout_next_id = reqid;
	}

	req->timeout_prev_id = old_tail;
	con->timeout_last_id = reqid;
}
|
|
|
/*
 * Shut the connection down (idempotent).
 *
 * Destroys the watcher and timeout, closes the socket, drops all queued
 * output, calls close_cb, reports every outstanding request to message_cb
 * as failed (command 0, no payload) and finally releases the request tables
 * and any partially received payload. A reference is held for the duration
 * because the callbacks may release the connection.
 */
static void con_close(manda_connection *con) {
	GByteArray *pending;

	if (con->closed) return;
	con->closed = TRUE;

	ENTER(con);

	if (con->fd_watcher.priv != NULL) {
		con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
		con->fd_watcher.priv = NULL;
	}
	if (con->req_timeout.priv != NULL) {
		con->ctrl->destroy_timeout(con->data, &con->req_timeout);
		con->req_timeout.priv = NULL;
	}

	if (con->fd != -1) {
		int rc;

		/* retry close() as long as it is interrupted */
		do {
			rc = close(con->fd);
		} while (-1 == rc && EINTR == errno);
		con->fd = -1;
	}

	/* throw away everything that has not been sent yet */
	for (;;) {
		pending = g_queue_pop_head(&con->send_queue);
		if (NULL == pending) break;
		g_byte_array_free(pending, TRUE);
	}
	con->cur_send_pos = 0;

	if (con->close_cb != NULL) {
		con->close_cb(con);
	}

	/* "timeout" all requests */
	while (con->timeout_first_id > 0) {
		request *req = &g_array_index(con->requests, request, con->timeout_first_id);
		const guint16 req_command = req->command;
		const gpointer req_data = req->data;

		con_req_unlink(con, req);

		/* reset request */
		req->command = 0;
		req->timeout = 0;
		req->data = NULL;

		if (con->message_cb != NULL) {
			con->message_cb(con, req_command, req_data, 0, 0, NULL);
		}
	}

	if (con->request_ids != NULL) {
		manda_idlist_free(con->request_ids);
		con->request_ids = NULL;
	}
	if (con->requests != NULL) {
		g_array_free(con->requests, TRUE);
		con->requests = NULL;
	}
	if (con->cur_payload != NULL) {
		g_byte_array_free(con->cur_payload, TRUE);
		con->cur_payload = NULL;
	}

	LEAVE(con, _con_free);
}
|
|
|
/* Destructor used with LEAVE(): closes the connection and frees its memory. */
static void _con_free(manda_connection *con) {
	/* take a reference so the ENTER/LEAVE pair inside con_close() cannot
	 * drop the counter to zero and re-enter this function */
	ENTER(con);

	con_close(con);
	g_slice_free(manda_connection, con);
}
|
|
|
/* Drop one reference to con; the connection is destroyed via _con_free when
 * the counter reaches zero. */
static void con_free(manda_connection *con) {
	LEAVE(con, _con_free);
}
|
|
|
/*
 * Deliver the message that has just been received completely (header in
 * con->cur_header, payload in con->cur_payload) to message_cb and reset the
 * receive state for the next message.
 *
 * If the message is a response (respid != 0) the id must be in use,
 * otherwise the connection is closed as a protocol error. The id is handed
 * back to the id list; if the matching request slot is already empty the
 * message is dropped without a callback. The payload is freed here.
 */
static void con_handle_response(manda_connection *con) {
	GByteArray *payload = con->cur_payload;
	const guint16 mesg_command = con->cur_header.command;
	const guint16 req_id = con->cur_header.reqid;
	const guint16 resp_to = con->cur_header.respid;
	guint16 orig_command = 0;
	gpointer orig_data = NULL;
	gboolean deliver = TRUE;

	/* ready for the next message; the payload now belongs to this function */
	con->cur_header_pos = 0;
	con->cur_payload_pos = 0;
	con->cur_payload = NULL;

	if (resp_to != 0) {
		if (!manda_idlist_is_used(con->request_ids, resp_to)) {
			/* protocol error */
			con_close(con);
			deliver = FALSE;
		} else {
			manda_idlist_put(con->request_ids, resp_to);

			/* try to find request data; if we can't find it the timeout already triggered */
			if (resp_to >= con->requests->len) {
				deliver = FALSE;
			} else {
				request *req = &g_array_index(con->requests, request, resp_to);

				if (0 == req->command) {
					/* "empty" request */
					deliver = FALSE;
				} else {
					orig_command = req->command;
					orig_data = req->data;

					/* reset request */
					con_req_unlink(con, req);
					req->command = 0;
					req->timeout = req->timeout_step = 0;
					req->data = NULL;
				}
			}
		}
	}

	if (deliver && con->message_cb != NULL) {
		con->message_cb(con, orig_command, orig_data, mesg_command, req_id, payload);
	}

	if (payload != NULL) g_byte_array_free(payload, TRUE);
}
|
|
|
/* Make the fd watcher wait for writability only while output is queued;
 * the event loop is notified only when the event mask actually changes. */
static void con_fd_watcher_update(manda_connection *con) {
	int wanted;

	if (-1 == con->fd) return;

	wanted = MANDA_FD_READ;
	if (con->send_queue.length > 0) wanted = MANDA_FD_READ | MANDA_FD_WRITE;

	if (wanted == con->fd_watcher.events) return;

	con->fd_watcher.events = wanted;
	con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
}
|
|
|
/*
 * Event loop callback for the connection socket.
 *
 * Reads as many complete messages as are available (at most 100 loop
 * rounds) and dispatches them through con_handle_response(), then writes
 * queued output (again at most 100 rounds). EINTR retries, EAGAIN /
 * EWOULDBLOCK ends the respective phase, end-of-file and every other error
 * close the connection. A reference is held while running because the
 * callbacks may release the connection.
 */
static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
	manda_connection *con = watcher->priv;
	guint round;

	ENTER(con);

	/* handle read */
	for (round = 0; round < 100 && !con->closed; round++) {
		ssize_t got;

		if (con->cur_header_pos < 8) {
			got = read(con->fd, con->cur_header_buf + con->cur_header_pos, 8 - con->cur_header_pos);

			if (got < 0) {
				if (EINTR == errno) continue;
				if (EAGAIN == errno
#if EWOULDBLOCK != EAGAIN
				    || EWOULDBLOCK == errno
#endif
				) break;
				/* ECONNRESET ("eof") and all other errors */
				con_close(con);
				goto out;
			}
			if (0 == got) { /* eof */
				con_close(con);
				goto out;
			}
			con->cur_header_pos += got;
		}

		if (con->cur_header_pos < 8) break;

		/* parse header */
		con->cur_header.command = _read_net_uint16(con->cur_header_buf + 0);
		con->cur_header.size = _read_net_uint16(con->cur_header_buf + 2);
		con->cur_header.reqid = _read_net_uint16(con->cur_header_buf + 4);
		con->cur_header.respid = _read_net_uint16(con->cur_header_buf + 6);

		if (con->cur_header.size < 8) {
			/* error: size includes the header itself */
			con_close(con);
			continue;
		}

		if (8 == con->cur_header.size) {
			/* message without payload */
			con_handle_response(con);
			continue;
		}

		if (NULL == con->cur_payload) {
			con->cur_payload = g_byte_array_sized_new(con->cur_header.size - 8);
			g_byte_array_set_size(con->cur_payload, con->cur_header.size - 8);
			con->cur_payload_pos = 0;
		}

		got = read(con->fd, con->cur_payload->data + con->cur_payload_pos, con->cur_payload->len - con->cur_payload_pos);

		if (got < 0) {
			if (EINTR == errno) continue;
			if (EAGAIN == errno
#if EWOULDBLOCK != EAGAIN
			    || EWOULDBLOCK == errno
#endif
			) break;
			/* ECONNRESET ("eof") and all other errors */
			con_close(con);
			goto out;
		}
		if (0 == got) { /* eof */
			con_close(con);
			goto out;
		}
		con->cur_payload_pos += got;

		if (con->cur_payload_pos < con->cur_payload->len) break;

		con_handle_response(con);
	}

	/* handle write */
	for (round = 0; round < 100 && !con->closed && con->send_queue.length > 0; round++) {
		GByteArray *buf = g_queue_peek_head(&con->send_queue);
		ssize_t written = write(con->fd, buf->data + con->cur_send_pos, buf->len - con->cur_send_pos);

		if (written < 0) {
			if (EINTR == errno) continue;
			if (EAGAIN == errno
#if EWOULDBLOCK != EAGAIN
			    || EWOULDBLOCK == errno
#endif
			) break;
			/* ECONNRESET, EPIPE, other fatal error/remote close: connection has to be closed */
			con_close(con);
			goto out;
		}
		con->cur_send_pos += written;

		/* partial write: wait for the next writable event */
		if (con->cur_send_pos != buf->len) break;

		con->cur_send_pos = 0;
		g_queue_pop_head(&con->send_queue);
		g_byte_array_free(buf, TRUE);
	}

	con_fd_watcher_update(con);

out:
	LEAVE(con, _con_free);
}
|
|
|
/*
 * Timer callback: walks the timeout queue from the head.
 *
 * A request whose check time (timeout_step) lies in the future ends the
 * walk. A request whose final timeout has not been reached yet gets a new
 * check time and is moved to the tail. Every other request is reset and
 * reported to message_cb as timed out (command 0, no payload). Afterwards
 * the timer is re-armed for the new head of the queue, if any.
 *
 * A reference is held for the whole callback: message_cb may close or
 * release the connection, and without it the loop would continue on freed
 * memory.
 */
static void con_timeout_cb(manda_timeout *timeout) {
	manda_connection *con = timeout->priv;
	double now;

	ENTER(con);

	now = con->ctrl->get_time(con->data);

	/* con_close() empties the queue, so the loop also ends if a callback closed the connection */
	for ( ; con->timeout_first_id != 0; ) {
		request *req;
		guint16 req_command;
		gpointer req_data;
		guint16 reqid = con->timeout_first_id;

		req = &g_array_index(con->requests, request, con->timeout_first_id);

		if (req->timeout_step > now) break;

		if (req->timeout > now) {
			req->timeout_step = now + TIMEOUT_STEP;
			if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;

			/* requeue */
			con_req_unlink(con, req);
			con_req_push(con, req, reqid);

			continue;
		}

		req_command = req->command;
		req_data = req->data;

		/* reset request */
		con_req_unlink(con, req);
		req->command = 0;
		req->timeout = req->timeout_step = 0;
		req->data = NULL;

		if (NULL != con->message_cb) {
			con->message_cb(con, req_command, req_data, 0, 0, NULL);
		}
	}

	if (con->timeout_first_id != 0) {
		request *req;

		req = &g_array_index(con->requests, request, con->timeout_first_id);

		con->req_timeout.timeout = req->timeout_step;
		con->ctrl->start_timeout(con->data, &con->req_timeout);
	}

	LEAVE(con, _con_free);
}
|
|
|
/*
 * Create a connection object for the already connected socket fd and
 * register its fd watcher and request timeout with the event loop.
 *
 * srv:        application data, passed to all ctrl callbacks
 * ctrl:       event loop integration callbacks
 * priv_data:  internal owner object (server connection or client)
 * message_cb: receives messages and request timeouts
 * close_cb:   called when the connection is closed
 *
 * The returned connection carries one reference, owned by the caller and
 * dropped with con_free(). Without this initial reference the first
 * ENTER/LEAVE pair (e.g. in con_fd_watcher_cb) would bring the counter back
 * to zero and destroy the connection while it is still in use.
 */
static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpointer priv_data, con_message_cb message_cb, con_close_cb close_cb, int fd) {
	manda_connection *con = g_slice_new0(manda_connection);
	gint first_id;

	con->refcount = 1;

	con->data = srv;
	con->ctrl = ctrl;
	con->priv_data = priv_data;
	con->message_cb = message_cb;
	con->close_cb = close_cb;
	con->fd = fd;

	con->fd_watcher.priv = con;
	con->fd_watcher.callback = con_fd_watcher_cb;
	con->fd_watcher.events = MANDA_FD_READ;
	con->fd_watcher.fd = fd;

	con->req_timeout.priv = con;
	con->req_timeout.callback = con_timeout_cb;
	con->req_timeout.timeout = 0;

	con->request_ids = manda_idlist_new(65536);
	/* id 0 is reserved so request it here */
	first_id = manda_idlist_get(con->request_ids);
	assert(0 == first_id);
	UNUSED(first_id); /* only read by the assert */

	con->requests = g_array_new(FALSE, TRUE, sizeof(request));

	con->ctrl->new_fd_watcher(con->data, &con->fd_watcher);
	con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
	con->ctrl->new_timeout(con->data, &con->req_timeout);

	return con;
}
|
|
|
/* Fill in the 8 byte header at the start of payload: command, total length
 * (payload->len, header included), request id and response id. */
static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id, guint16 resp_id) {
	guint8 *hdr = payload->data;

	_write_net_uint16(hdr, command);
	_write_net_uint16(hdr + 2, payload->len);
	_write_net_uint16(hdr + 4, req_id);
	_write_net_uint16(hdr + 6, resp_id);
}
|
|
|
/* payload needs to be prefixed with 8 dummy bytes for the header */ |
|
static gboolean con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) { |
|
double now; |
|
gint reqid; |
|
request *req; |
|
gboolean res = TRUE; |
|
|
|
ENTER(con); |
|
|
|
if (con->closed) { |
|
/* connection closed */ |
|
goto error; |
|
} |
|
|
|
if (payload->len > 65535 || payload->len < 8 || 0 == command || wait_timeout <= 0) { |
|
/* payload too big / invalid parameters */ |
|
goto error; |
|
} |
|
|
|
reqid = manda_idlist_get(con->request_ids); |
|
|
|
if (-1 == reqid) goto error; /* no free request id available */ |
|
|
|
assert(reqid > 0 && reqid < 65536); |
|
|
|
if ((guint) reqid > con->requests->len) { |
|
g_array_set_size(con->requests, reqid+1); |
|
} |
|
req = &g_array_index(con->requests, request, reqid); |
|
|
|
now = con->ctrl->get_time(con->data); |
|
req->timeout = now + wait_timeout;; |
|
req->timeout_step = now + TIMEOUT_STEP; |
|
req->data = data; |
|
req->command = command; |
|
|
|
if (req->timeout_step > req->timeout) req->timeout_step = req->timeout; |
|
|
|
if (con->timeout_first_id == 0) { |
|
con->req_timeout.timeout = req->timeout_step; |
|
con->ctrl->start_timeout(con->data, &con->req_timeout); |
|
} |
|
|
|
con_fix_header(payload, command, reqid, resp_id); |
|
con_req_push(con, req, reqid); |
|
|
|
g_queue_push_tail(&con->send_queue, payload); |
|
con_fd_watcher_update(con); |
|
|
|
goto out; |
|
|
|
error: |
|
res = FALSE; |
|
|
|
g_byte_array_free(payload, TRUE); |
|
|
|
out: |
|
LEAVE(con, _con_free); |
|
|
|
return res; |
|
} |
|
|
|
/* payload needs to be prefixed with 8 dummy bytes for the header */ |
|
static gboolean con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) { |
|
gboolean res = TRUE; |
|
|
|
ENTER(con); |
|
|
|
if (con->closed) { |
|
/* connection closed */ |
|
goto error; |
|
} |
|
|
|
if (payload->len > 65535 || payload->len < 8 || 0 == command) { |
|
/* payload too big / invalid parameters */ |
|
goto error; |
|
} |
|
|
|
con_fix_header(payload, command, 0, resp_id); |
|
|
|
g_queue_push_tail(&con->send_queue, payload); |
|
con_fd_watcher_update(con); |
|
|
|
goto out; |
|
|
|
error: |
|
g_byte_array_free(payload, TRUE); |
|
res = FALSE; |
|
|
|
out: |
|
LEAVE(con, _con_free); |
|
|
|
return res; |
|
} |
|
|
|
/* message construction helper */ |
|
|
|
/*
 * Allocate a message buffer whose first 8 bytes are reserved for the
 * header (filled in later by con_fix_header). The header bytes are left
 * uninitialised here.
 *
 * Declared with (void): an empty parameter list is an old-style declaration
 * that disables argument checking.
 */
static GByteArray* new_payload(void) {
	GByteArray* buf = g_byte_array_new();
	g_byte_array_set_size(buf, 8);
	return buf;
}
|
|
|
/* Tell the peer that command cmd (received as request resp_id) is not
 * supported. Returns FALSE if con is NULL or the message could not be queued. */
static gboolean send_unkown_command(manda_connection *con, guint16 cmd, guint16 resp_id) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_uint16(msg, cmd);

	return con_send_notify(con, msg, MANDA_CMD_UNKNOWN_COMMAND, resp_id);
}
|
|
|
/* Server -> client: answer bind request resp_id with a backend id and a
 * string (str/len). Returns FALSE if con is NULL or the message could not
 * be queued. */
static gboolean send_server_bind_backend(manda_connection *con, guint16 resp_id, guint32 backend_id, const gchar *str, guint len) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_uint32(msg, backend_id);
	write_net_string(msg, str, len);

	return con_send_notify(con, msg, MANDA_CMD_BIND_BACKEND, resp_id);
}
|
|
|
/* Server -> client: report a failed bind for request resp_id. This is a
 * bind response carrying backend id 0 and str/len as the string. */
static gboolean send_server_bind_backend_failed(manda_connection *con, guint16 resp_id, const gchar *str, guint len) {
	const guint32 no_backend = 0;

	return send_server_bind_backend(con, resp_id, no_backend, str, len);
}
|
|
|
/* Server -> client: announce that backend backend_id is gone; str/len is
 * sent along as a string. Returns FALSE if con is NULL or the message could
 * not be queued. */
static gboolean send_server_release_backend(manda_connection *con, guint32 backend_id, const gchar *str, guint len) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_uint32(msg, backend_id);
	write_net_string(msg, str, len);

	return con_send_notify(con, msg, MANDA_CMD_RELEASE_BACKEND, 0);
}
|
|
|
/* Client -> server: request a backend by name. data is handed back to the
 * message callback with the response or timeout. Returns FALSE if con is
 * NULL or the request could not be queued. */
static gboolean send_client_bind_backend(manda_connection *con, const gchar *name, guint len, gpointer data, double wait_timeout) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_string(msg, name, len);

	return con_send_request(con, msg, MANDA_CMD_BIND_BACKEND, 0, data, wait_timeout);
}
|
|
|
/* Client -> server: release backend backend_id. Returns FALSE if con is
 * NULL or the message could not be queued. */
static gboolean send_client_release_backend(manda_connection *con, guint32 backend_id) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_uint32(msg, backend_id);

	return con_send_notify(con, msg, MANDA_CMD_RELEASE_BACKEND, 0);
}
|
|
|
/* Client -> server: report current load and worker count for backend
 * backend_id. Returns FALSE if con is NULL or the message could not be
 * queued. */
static gboolean send_client_update_backend(manda_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
	GByteArray *msg;

	if (!con) return FALSE;

	msg = new_payload();
	write_net_uint32(msg, backend_id);
	write_net_uint32(msg, load);
	write_net_uint32(msg, workers);

	return con_send_notify(con, msg, MANDA_CMD_UPDATE_BACKEND, 0);
}
|
|
|
/* server connection */ |
|
|
|
typedef struct server_socket server_socket; |
|
struct server_socket { |
|
manda_server *srv; |
|
|
|
int fd; |
|
gpointer data; |
|
|
|
manda_fd_watcher fd_watcher; |
|
}; |
|
|
|
static void scon_con_closed_cb(manda_connection *con); |
|
static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload); |
|
static void scon_release(manda_server_connection *con); |
|
|
|
static void sbackend_free(manda_server_backend *backend); |
|
|
|
static void manda_server_release_backend(manda_server_connection *con, guint32 id); |
|
|
|
/*
 * fd watcher callback of a listening socket: accepts one connection and
 * wraps it in a manda_server_connection.
 *
 * The new connection is registered in s->connections; its initial reference
 * belongs to that array. manda_server_con_close() removes the entry and
 * drops that reference, and manda_server_close() walks the array, so a
 * connection that is not registered would never be closed on server
 * shutdown.
 */
static void server_listening_cb(manda_fd_watcher *fd_watcher) {
	server_socket *sock = fd_watcher->priv;
	manda_server *s = sock->srv;
	int fd;
	int i;
	manda_server_connection *con;

	fd = accept(sock->fd, NULL, NULL);

	if (-1 == fd) return;

	fd_init(fd);

	con = g_slice_new0(manda_server_connection);
	con->srv = s;
	con->refcount = 1; /* owned by s->connections */
	con->backends = g_ptr_array_new();
	con->idlist = manda_idlist_new(65536);
	i = manda_idlist_get(con->idlist); /* 0 is an invalid backend id, reserve it */
	assert(i == 0);
	UNUSED(i); /* only read by the assert */
	con->con = con_new(s->data, s->ctrl, con, scon_con_message_cb, scon_con_closed_cb, fd);

	g_ptr_array_add(s->connections, con);
}
|
|
|
/* Destructor used with LEAVE(): closes the server connection and frees it. */
static void scon_free(manda_server_connection *con) {
	/* hold a reference so the ENTER/LEAVE pair inside
	 * manda_server_con_close() cannot re-enter this function */
	ENTER(con);

	manda_server_con_close(con);
	g_slice_free(manda_server_connection, con);
}
|
/* Drop one reference to con; scon_free runs when the counter reaches zero. */
static void scon_release(manda_server_connection *con) {
	LEAVE(con, scon_free);
}
|
|
|
/*
 * Close a server connection (no-op if it is already closed).
 *
 * Removes the connection from the server's list and drops the reference
 * that went with it, closes and releases the underlying connection, calls
 * the closed_connection callback, releases every backend still bound to the
 * connection and frees the backend table and id list.
 */
void manda_server_con_close(manda_server_connection *con) {
	manda_connection *inner = con->con;
	guint id;

	if (!inner) return;

	ENTER(con);

	g_ptr_array_remove_fast(con->srv->connections, con);
	scon_release(con);

	/* detach first so nothing reaches the dying connection through con */
	con->con = NULL;
	inner->close_cb = NULL;
	con_close(inner);
	con_free(inner);

	if (con->srv->callbacks->closed_connection != NULL) {
		con->srv->callbacks->closed_connection(con->srv->data, con);
	}

	id = 0;
	while (id < con->backends->len) {
		if (g_ptr_array_index(con->backends, id) != NULL) {
			manda_server_release_backend(con, id);
		}
		id++;
	}

	g_ptr_array_free(con->backends, TRUE);
	con->backends = NULL;
	manda_idlist_free(con->idlist);
	con->idlist = NULL;

	LEAVE(con, scon_free);
}
|
|
|
/* close_cb of the underlying connection: closes the owning server
 * connection, which is stored in priv_data. */
static void scon_con_closed_cb(manda_connection *con) {
	manda_server_con_close((manda_server_connection *) con->priv_data);
}
|
|
|
/*
 * Register the listening socket fd with server s. A read watcher is
 * installed that runs server_listening_cb; data is stored alongside the
 * socket. The socket is closed by manda_server_close().
 */
void manda_server_add_socket(manda_server *s, int fd, gpointer data) {
	server_socket *entry = g_slice_new0(server_socket);

	entry->srv = s;
	entry->fd = fd;
	entry->data = data;

	entry->fd_watcher.priv = entry;
	entry->fd_watcher.fd = fd;
	entry->fd_watcher.events = MANDA_FD_READ;
	entry->fd_watcher.callback = server_listening_cb;

	s->ctrl->new_fd_watcher(s->data, &entry->fd_watcher);
	s->ctrl->update_fd_watcher(s->data, &entry->fd_watcher);

	g_ptr_array_add(s->sockets, entry);
}
|
|
|
/* Destroy the watcher of a listening socket, close the socket and free the
 * entry. The entry is not removed from s->sockets here. */
static void manda_server_free_socket(server_socket *sock) {
	manda_server *owner = sock->srv;
	int rc;

	owner->ctrl->destroy_fd_watcher(owner->data, &sock->fd_watcher);

	/* retry close() as long as it is interrupted */
	do {
		rc = close(sock->fd);
	} while (-1 == rc && EINTR == errno);
	sock->fd = -1;

	g_slice_free(server_socket, sock);
}
|
|
|
/*
 * Create a server object.
 *
 * srv:       application data, passed to ctrl and server callbacks
 * ctrl:      event loop integration callbacks
 * callbacks: server event callbacks
 *
 * The returned server carries one reference (see manda_server_release).
 * The structure is zero-initialised, like the other objects created in this
 * file, so a field that is not set below never holds indeterminate data.
 */
manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
	manda_server *s = g_slice_new0(manda_server);

	s->refcount = 1;
	s->data = srv;
	s->connections = g_ptr_array_new();
	s->ctrl = ctrl;
	s->callbacks = callbacks;
	s->sockets = g_ptr_array_new();

	return s;
}
|
|
|
/* Stop the server: free all listening sockets, then close every connection
 * still registered in s->connections. */
void manda_server_close(manda_server *s) {
	guint ndx = 0;

	while (ndx < s->sockets->len) {
		manda_server_free_socket(g_ptr_array_index(s->sockets, ndx));
		ndx++;
	}
	g_ptr_array_set_size(s->sockets, 0);

	/* closing a connection removes it from the array, so keep taking the first entry */
	while (0 != s->connections->len) {
		manda_server_con_close(g_ptr_array_index(s->connections, 0));
	}
}
|
|
|
/* Destructor used with LEAVE(): closes the server and frees its arrays and
 * the server object itself. */
static void manda_server_free(manda_server *s) {
	manda_server_close(s);

	g_ptr_array_free(s->sockets, TRUE);
	g_ptr_array_free(s->connections, TRUE);
	g_slice_free(manda_server, s);
}
|
|
|
/* Take an additional reference on s (s must not be NULL). */
void manda_server_acquire(manda_server *s) {
	ENTER(s);
}
|
|
|
/* Drop one reference on s; the server is destroyed when the counter reaches
 * zero. NULL is accepted and ignored. */
void manda_server_release(manda_server *s) {
	if (s != NULL) {
		LEAVE(s, manda_server_free);
	}
}
|
|
|
/*
 * Detach the backend bound under `id` from connection con.
 *
 * If the id is still marked as used, the client is told that it lost the
 * backend. The usage entry is removed from the backend's usage array (the
 * last entry is moved into its place), the load sum is corrected and the
 * release_backend callback is invoked.
 *
 * The reference the usage entry held on the backend is dropped only after
 * the callback has run: dropping it first could destroy the backend and
 * hand a dangling pointer to the callback.
 */
static void manda_server_release_backend(manda_server_connection *con, guint32 id) {
	manda_server_backend_use *use;
	manda_server_backend *b;

	if (NULL == con->backends) return;

	if (id >= con->backends->len) return;

	if (manda_idlist_is_used(con->idlist, id)) {
		/* notify client */
		send_server_release_backend(con->con, id, CONST_STR_LEN("lost backend"));
	}

	use = g_ptr_array_index(con->backends, id);
	if (NULL == use) return;
	g_ptr_array_index(con->backends, id) = NULL;

	b = use->backend;
	if (use->ndx != b->usage->len - 1) {
		/* fill the gap with the last entry and fix its index */
		manda_server_backend_use *u = g_ptr_array_index(b->usage, b->usage->len - 1);
		g_ptr_array_index(b->usage, use->ndx) = u;
		u->ndx = use->ndx;
	}
	b->sum_last_load -= use->last_load;
	g_ptr_array_set_size(b->usage, b->usage->len-1);

	if (NULL != con->srv && NULL != con->srv->callbacks->release_backend) {
		con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
	}
	g_slice_free(manda_server_backend_use, use);

	/* may destroy the backend, so this has to be the last access to b */
	LEAVE(b, sbackend_free);
}
|
|
|
/*
 * Create a backend object with one reference and an empty usage list.
 * addr is stored as-is (not copied) and is freed together with the backend.
 */
manda_server_backend* manda_server_backend_new(gpointer data, GString *addr) {
	manda_server_backend *b = g_slice_new0(manda_server_backend);

	b->refcount = 1;
	b->data = data;
	b->addr = addr;
	b->usage = g_ptr_array_new();
	b->sum_last_load = 0;

	return b;
}
|
|
|
/*
 * Destructor used with LEAVE(): releases all remaining usages of the
 * backend and frees the usage array, the address string and the backend.
 *
 * addr may be NULL (GSTR_LEN() tolerates that when the address is sent), so
 * it is only passed to g_string_free() when set.
 */
static void sbackend_free(manda_server_backend *backend) {
	ENTER(backend); /* don't want to enter sbackend_free again */

	manda_server_drop_backend(backend);

	g_ptr_array_free(backend->usage, TRUE);
	backend->usage = NULL;
	if (NULL != backend->addr) {
		g_string_free(backend->addr, TRUE);
		backend->addr = NULL;
	}

	g_slice_free(manda_server_backend, backend);
}
|
|
|
/* Drop one reference on backend; it is destroyed when the counter reaches
 * zero. NULL is accepted and ignored. */
void manda_server_backend_release(manda_server_backend *backend) {
	if (backend != NULL) {
		LEAVE(backend, sbackend_free);
	}
}
|
/* Take an additional reference on backend (must not be NULL). */
void manda_server_backend_acquire(manda_server_backend *backend) {
	ENTER(backend);
}
|
|
|
/*
 * Answer bind request reqid on con with `backend`.
 *
 * Allocates a backend id for this connection, records the usage in both the
 * backend and the connection, takes a reference on the backend and sends
 * the id together with the backend address to the client. If no id is free
 * a failure response is sent instead.
 *
 * Returns FALSE if the connection is closed, no id was available or the
 * response could not be queued.
 */
gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend) {
	manda_server_backend_use *entry;
	gint new_id;

	if (NULL == con->con || con->con->closed) return FALSE;

	new_id = manda_idlist_get(con->idlist);
	if (-1 == new_id) {
		send_server_bind_backend_failed(con->con, reqid, CONST_STR_LEN("no free backend id available"));
		return FALSE;
	}
	assert(new_id > 0);

	entry = g_slice_new0(manda_server_backend_use);
	entry->con = con;
	entry->backend = backend;
	entry->backend_id = new_id;
	entry->last_load = 0;
	entry->last_workers = 0;

	/* register with the backend; ndx is the position in its usage array */
	entry->ndx = backend->usage->len;
	g_ptr_array_add(backend->usage, entry);

	/* register with the connection, growing the table if needed */
	if (entry->backend_id >= con->backends->len) {
		g_ptr_array_set_size(con->backends, entry->backend_id + 1);
	}
	g_ptr_array_index(con->backends, entry->backend_id) = entry;

	ENTER(backend);

	return send_server_bind_backend(con->con, reqid, entry->backend_id, GSTR_LEN(backend->addr));
}
|
|
|
/* Answer bind request reqid on con with a failure; errmsg (may be NULL) is
 * sent as the message string. */
void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg) {
	manda_connection *inner = con->con;

	send_server_bind_backend_failed(inner, reqid, GSTR_LEN(errmsg));
}
|
|
|
/* Release every usage of backend on all connections. A reference is held
 * while the usages are released. */
void manda_server_drop_backend(manda_server_backend *backend) {
	ENTER(backend);

	/* each release removes one entry from the usage array */
	while (0 != backend->usage->len) {
		manda_server_backend_use *first = g_ptr_array_index(backend->usage, 0);

		manda_server_release_backend(first->con, first->backend_id);
	}

	LEAVE(backend, sbackend_free);
}
|
|
|
/*
 * Record the load and worker count a client reported for backend_id and
 * adjust the backend's load sum by the difference to the previous report;
 * then invoke the update_backend callback if one is set. Unknown ids are
 * ignored.
 */
static void sbackend_update(manda_server_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
	manda_server_backend_use *entry;
	manda_server_backend *target;

	if (NULL == con->backends || backend_id >= con->backends->len) return;

	entry = g_ptr_array_index(con->backends, backend_id);
	if (NULL == entry) return;

	target = entry->backend;

	target->sum_last_load += load - entry->last_load;
	entry->last_load = load;
	entry->last_workers = workers;

	if (con->srv->callbacks->update_backend != NULL) {
		con->srv->callbacks->update_backend(con->srv->data, target, entry->ndx);
	}
}
|
|
|
|
|
/*
 * Message callback of a server-side connection: decodes and executes the
 * commands a client may send.
 *
 *  - BIND_BACKEND:    payload is one string (the backend name); forwarded to
 *                     the bind_backend callback.
 *  - RELEASE_BACKEND: payload is a backend id; the id is freed and the
 *                     backend released.
 *  - UPDATE_BACKEND:  payload is backend id, load, workers.
 *  - UNKNOWN_COMMAND: treated as an error.
 *  - anything else:   answered with an "unknown command" message.
 *
 * Malformed payloads and invalid ids close the connection. The payload
 * comes from the peer, so its length is checked before it is used: a BIND
 * payload shorter than the 2 byte string prefix is rejected up front, which
 * also keeps "payload->len - 2" from wrapping around to a huge allocation
 * size.
 *
 * References on the connection, the server connection and the server are
 * held while the command is processed.
 */
static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *payload) {
	manda_server_connection *scon = con->priv_data;
	manda_server *srv;
	guint pos = 0;

	UNUSED(orig_command); UNUSED(orig_data);

	if (con->closed || NULL == scon || NULL == scon->srv) return;

	srv = scon->srv;

	ENTER(con);
	ENTER(scon);
	ENTER(srv);

	switch (mesg_command) {
	case MANDA_CMD_BIND_BACKEND: {
			GString *name = NULL;

			if (NULL == payload || payload->len < 2) goto error;
			name = g_string_sized_new(payload->len - 2);
			if (!read_net_string(payload, &pos, name)) { g_string_free(name, TRUE); goto error; }
			if (pos != payload->len) { g_string_free(name, TRUE); goto error; }

			srv->callbacks->bind_backend(srv->data, scon, name, mesg_req_id);

			g_string_free(name, TRUE);
		}
		break;
	case MANDA_CMD_RELEASE_BACKEND: {
			guint32 backend_id;

			if (NULL == payload) goto error;
			if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
			if (pos != payload->len) goto error;

			if (backend_id == 0) goto error; /* invalid id */

			if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;
			/* free the id first: manda_server_release_backend then skips the client notification */
			manda_idlist_put(scon->idlist, backend_id);

			manda_server_release_backend(scon, backend_id);
		}
		break;
	case MANDA_CMD_UPDATE_BACKEND: {
			guint32 backend_id, load, workers;

			if (NULL == payload) goto error;
			if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
			if (!read_net_uint32(payload, &pos, &load)) goto error;
			if (!read_net_uint32(payload, &pos, &workers)) goto error;
			if (pos != payload->len) goto error;

			if (backend_id == 0) goto error; /* invalid id */

			if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;

			sbackend_update(scon, backend_id, load, workers);
		}
		break;

	case MANDA_CMD_UNKNOWN_COMMAND:
		/* we require all our commands to be handled: */
		goto error;

	default:
		send_unkown_command(con, mesg_command, mesg_req_id);
		break;
	}

	goto out;

error:
	manda_server_con_close(scon);

out:
	LEAVE(srv, manda_server_free);
	LEAVE(scon, scon_free);
	LEAVE(con, _con_free);
}
|
|
|
/* client connection */ |
|
|
|
static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload); |
|
static void client_close_cb(manda_connection *con); |
|
|
|
/* Destructor invoked by LEAVE() once the last reference to the client is
 * dropped: shuts the client down and releases everything it owns. */
static void mclient_free(manda_client *c) {
	/* manda_client_close() takes and drops a reference of its own; holding
	 * one here keeps that drop from reaching zero and re-entering us */
	ENTER(c);
	manda_client_close(c);

	if (NULL != c->backends) {
		g_ptr_array_free(c->backends, TRUE);
		c->backends = NULL;
	}

	g_free(c->addr);
	c->addr = NULL;

	g_slice_free(manda_client, c);
}
|
|
|
/* TODO: start timer to reconnect if connect failed */ |
|
/* Watcher callback for a pending non-blocking connect(): tears down the
 * connect watcher and either wraps the socket in a manda_connection or
 * closes it when the connect failed. */
static void client_connect_cb(manda_fd_watcher *watcher) {
	manda_client *c = watcher->priv;
	int fd = watcher->fd;
	struct sockaddr peer;
	socklen_t peer_len = sizeof(peer);

	ENTER(c);

	/* the connect attempt is over either way; stop watching the socket */
	c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
	c->sock_watcher.fd = -1;
	c->sock_fd = -1;

	/* create new connection:
	 * see http://www.cyberconf.org/~cynbe/ref/nonblocking-connects.html
	 *
	 * a socket has a peer address only if the connect succeeded
	 */
	if (-1 != getpeername(fd, &peer, &peer_len)) {
		/* connect succeeded */
		c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, fd);
	} else {
		/* connect failed; find out why (the reason is not used yet) */
		int err;
#ifdef SO_ERROR
		socklen_t err_len = sizeof(err);
		getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&err, &err_len);
#else
		{
			char ch;
			errno = 0;
			read(fd, &ch, 1);
			err = errno;
		}
#endif
		close(fd);

		UNUSED(err);
	}

	LEAVE(c, mclient_free);
}
|
|
|
/* Starts a (non-blocking) connect to c->addr unless the client is closed,
 * already connected, already connecting, has no address, or tried less
 * than a second ago. */
static void client_connect(manda_client *c) {
	int fd, rc, connect_errno;
	double ts;

	ENTER(c);

	/* nothing to do if we are closed, connected, connecting or have no target */
	if (c->closed || NULL != c->con || -1 != c->sock_fd || NULL == c->addr) goto done;

	/* only try connect once per second */
	ts = c->ctrl->get_time(c->data);
	if (ts < c->last_connect_ts + 1.0) goto done;
	c->last_connect_ts = ts;

	/* retry socket() as long as it is interrupted */
	for (;;) {
		fd = socket(c->addr->sa_family, SOCK_STREAM, 0);
		if (-1 != fd || EINTR != errno) break;
	}
	if (-1 == fd) goto done;

	fd_init(fd);
	c->sock_fd = fd;
	c->sock_watcher.fd = fd;
	c->sock_watcher.events = 0;
	c->ctrl->new_fd_watcher(c->data, &c->sock_watcher);

	rc = connect(fd, c->addr, c->addrlen);
	connect_errno = errno; /* save before any other call can clobber it */

	if (-1 == rc && (EINPROGRESS == connect_errno || EALREADY == connect_errno || EINTR == connect_errno)) {
		/* connect continues in the background; client_connect_cb finishes it */
		c->sock_watcher.events = MANDA_FD_READ | MANDA_FD_WRITE;
		c->ctrl->update_fd_watcher(c->data, &c->sock_watcher);
		goto done;
	}

	/* connect finished synchronously (success or failure): the connect
	 * watcher is not needed anymore */
	c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
	c->sock_watcher.fd = -1;
	c->sock_fd = -1;

	if (-1 == rc) {
		/* hard failure (includes EAGAIN: server overloaded) */
		close(fd);
	} else {
		c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, fd);
	}

done:
	LEAVE(c, mclient_free);
}
|
|
|
/* Creates a new client for the server at addr and immediately starts
 * connecting to it.
 *
 * srv:     application data, handed back as first argument to the ctrl
 *          functions and the backend callbacks
 * ctrl:    async control functions (fd watchers, time); not copied
 * addr:    address to connect to; copied (addrlen bytes)
 *
 * Returns the client with one reference held by the caller; drop it with
 * manda_client_release().
 */
manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struct sockaddr *addr, socklen_t addrlen) {
	manda_client *c = g_slice_new0(manda_client);

	c->refcount = 1;
	c->closed = FALSE;
	c->data = srv;
	c->ctrl = ctrl;

	c->addrlen = addrlen;
	c->addr = g_memdup(addr, addrlen);

	/* backend table indexed by backend id; the message handler indexes it
	 * as soon as the first bind reply arrives, so it must exist up front */
	c->backends = g_ptr_array_new();

	c->sock_fd = -1;
	c->sock_watcher.fd = -1;
	c->sock_watcher.priv = c;
	c->sock_watcher.events = 0;
	c->sock_watcher.callback = client_connect_cb;

	client_connect(c);

	return c;
}
|
|
|
/* Drops the current connection (or pending connect), reports every bound
 * backend as lost and frees it, and - unless the client was closed for
 * good - starts a new connect attempt. */
static void client_close(manda_client *c) {
	static const GString con_closed_msg = { CONST_STR_LEN("Connection closed"), 0 };

	/* abort a connect that is still in progress */
	if (-1 != c->sock_fd) {
		c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
		close(c->sock_fd);
		c->sock_watcher.fd = -1;
		c->sock_fd = -1;
	}

	/* shut down the established connection; clear close_cb first so the
	 * connection does not call back into us */
	if (NULL != c->con) {
		c->con->close_cb = NULL;
		con_close(c->con);
		LEAVE(c->con, _con_free);
		c->con = NULL;
	}

	if (NULL != c->backends) {
		guint ndx;

		for (ndx = 0; ndx < c->backends->len; ndx++) {
			manda_client_backend *lost = g_ptr_array_index(c->backends, ndx);

			if (NULL == lost) continue;

			/* unlink before notifying the owner */
			g_ptr_array_index(c->backends, ndx) = NULL;
			lost->id = 0;

			/* callbacks is NULL if the owner already released the backend */
			if (NULL != lost->callbacks) {
				lost->callbacks->lost_backend(c->data, lost, &con_closed_msg);
			}

			g_string_free(lost->addr, TRUE);
			g_slice_free(manda_client_backend, lost);
		}

		g_ptr_array_set_size(c->backends, 0);
	}

	if (c->closed) return;

	/* TODO: timer? */
	client_connect(c);
}
|
|
|
/* Closes the client for good: marks it closed so no reconnect is started,
 * then tears down the connection and all backends. */
void manda_client_close(manda_client *c) {
	c->closed = TRUE;

	/* keep the client referenced while client_close() runs */
	ENTER(c);
	client_close(c);
	LEAVE(c, mclient_free);
}
|
|
|
/* Takes an additional reference on the client; pair each call with
 * manda_client_release(). A NULL client is ignored, matching
 * manda_client_release(). */
void manda_client_acquire(manda_client *c) {
	if (NULL == c) return;

	ENTER(c);
}
|
|
|
/* Drops one reference on the client and frees it when that was the last
 * one. A NULL client is ignored. */
void manda_client_release(manda_client *c) {
	if (NULL != c) {
		LEAVE(c, mclient_free);
	}
}
|
|
|
/* Sends a bind request for the backend called name.
 *
 * data and callbacks are stored in the returned backend; the backend has
 * no id and no address until the bind reply has been processed.
 *
 * Returns NULL if there is no connection right now (a connect attempt is
 * triggered first) or if the request could not be sent.
 */
manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, gpointer data, const manda_client_backend_callbacks *callbacks) {
	manda_client_backend *pending;

	/* make sure a (re)connect is at least under way */
	client_connect(c);
	if (NULL == c->con) return NULL;

	pending = g_slice_new(manda_client_backend);
	pending->id = 0;
	pending->addr = NULL;
	pending->client = c;
	pending->callbacks = callbacks;
	pending->data = data;

	if (send_client_bind_backend(c->con, GSTR_LEN(name), pending, 5.0)) {
		return pending;
	}

	g_slice_free(manda_client_backend, pending);
	return NULL;
}
|
|
|
/* Gives a backend back. A bound backend (id != 0) is released on the
 * server and freed right away; a backend without id is only marked as
 * abandoned and gets freed by whoever still tracks it. */
void manda_client_release_backend(manda_client_backend *backend) {
	manda_client *owner;

	if (0 == backend->id) {
		/* either "dead" backend or "waiting" backend */
		/* in both cases free it somewhere else; clearing the callbacks
		 * marks it as no longer wanted */
		backend->callbacks = NULL;
		return;
	}

	/* "active" working backend */
	owner = backend->client;

	send_client_release_backend(owner->con, backend->id);
	g_ptr_array_index(owner->backends, backend->id) = NULL;
	backend->id = 0;

	g_string_free(backend->addr, TRUE);
	g_slice_free(manda_client_backend, backend);
}
|
|
|
/* Reports load and workers for a bound backend to the server; backends
 * without an id (not bound yet, or already lost) are ignored. */
void manda_client_update_backend(manda_client_backend *backend, guint32 load, guint32 workers) {
	if (0 == backend->id) return;

	send_client_update_backend(backend->client->con, backend->id, load, workers);
}
|
|
|
static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 command, guint16 req_id, GByteArray *payload) { |
|
manda_client *c = con->priv_data; |
|
guint pos = 0; |
|
|
|
UNUSED(req_id); |
|
|
|
ENTER(c); |
|
ENTER(con); |
|
|
|
switch (command) { |
|
case 0: /* timeout/request failed */ |
|
switch (orig_command) { |
|
case MANDA_CMD_BIND_BACKEND: { |
|
manda_client_backend *backend = orig_data; |
|
|
|
backend->callbacks->lost_backend(c->data, backend, NULL); |
|
|
|
g_string_free(backend->addr, TRUE); |
|
g_slice_free(manda_client_backend, backend); |
|
} |
|
break; |
|
default: |
|
goto error; |
|
} |
|
break; |
|
|
|
case MANDA_CMD_BIND_BACKEND: |
|
switch (orig_command) { |
|
case MANDA_CMD_BIND_BACKEND: { |
|
manda_client_backend *backend = orig_data; |
|
guint32 backend_id; |
|
GString *addr = NULL; |
|
|
|
if (NULL == payload) goto bind_backend_failed; |
|
if (!read_net_uint32(payload, &pos, &backend_id)) goto bind_backend_failed; |
|
addr = g_string_sized_new(payload->len - 2 - pos); |
|
if (!read_net_string(payload, &pos, addr)) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; } |
|
if (pos != payload->len) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; } |
|
|
|
if (0 == backend_id) goto bind_backend_failed; |
|
|
|
backend->addr = addr; |
|
backend->id = backend_id; |
|
|
|
if (backend_id >= c->backends->len) g_ptr_array_set_size(c->backends, backend_id+1); |
|
g_ptr_array_index(c->backends, backend_id) = backend; |
|
|
|
if (NULL != backend->callbacks) { |
|
backend->callbacks->return_backend(c->data, backend); |
|
} else { |
|
manda_client_release_backend(backend); |
|
} |
|
|
|
goto out; |
|
|
|
bind_backend_failed: |
|
backend->callbacks->lost_backend(c->data, backend, addr); |
|
|
|
g_string_free(backend->addr, TRUE); |
|
g_slice_free(manda_client_backend, backend); |
|
|
|
if (NULL == addr) goto error; |
|
g_string_free(addr, TRUE); |
|
} |
|
break; |
|
} |
|
break; |
|
|
|
case MANDA_CMD_RELEASE_BACKEND: { |
|
manda_client_backend *backend = NULL; |
|
guint32 backend_id; |
|
GString *msg = NULL; |
|
|
|
if (NULL == payload) goto error; |
|
if (!read_net_uint32(payload, &pos, &backend_id)) goto error; |
|
msg = g_string_sized_new(payload->len - 2 - pos); |
|
if (!read_net_string(payload, &pos, msg)) { g_string_free(msg, TRUE); goto error; } |
|
if (pos != payload->len) { g_string_free(msg, TRUE); goto error; } |
|
|
|
if (0 == backend_id) { g_string_free(msg, TRUE); goto error; } |
|
|
|
if (backend_id >= c->backends->len) { g_string_free(msg, TRUE); goto out; } |
|
backend = g_ptr_array_index(c->backends, backend_id); |
|
if (NULL == backend) { g_string_free(msg, TRUE); goto out; } |
|
|
|
g_ptr_array_index(c->backends, backend_id) = NULL; |
|
backend->id = 0; |
|
|
|
if (NULL != backend->callbacks) { |
|
backend->callbacks->lost_backend(c->data, backend, msg); |
|
} |
|
|
|
g_string_free(msg, TRUE); |
|
g_string_free(backend->addr, TRUE); |
|
g_slice_free(manda_client_backend, backend); |
|
} |
|
break; |
|
|
|
case MANDA_CMD_UNKNOWN_COMMAND: |
|
goto error; |
|
} |
|
|
|
goto out; |
|
error: |
|
client_close(c); |
|
|
|
out: |
|
LEAVE(con, _con_free); |
|
LEAVE(c, mclient_free); |
|
} |
|
|
|
/* Close callback of the client's connection: tears the connection state
 * down (and triggers a reconnect unless the client is closed). */
static void client_close_cb(manda_connection *con) {
	manda_client *c = con->priv_data;

	/* client_close() runs user callbacks (lost_backend) that may drop the
	 * last reference to the client; hold one like the other callers of
	 * client_close() do so the client outlives the call */
	ENTER(c);

	client_close(c);

	LEAVE(c, mclient_free);
}
|
|
|