[core] dispatch events from within event framework

event framework now calls interface to dispatch events rather than
itself implementing multiple interfaces for fdevent_process() to
be able to dispatch events generically.
personal/stbuehler/fix-fdevent
Glenn Strauss 4 years ago
parent 9459c05468
commit f5f221eda8

@ -598,22 +598,6 @@ int fdevent_accept_listenfd(int listenfd, struct sockaddr *addr, size_t *addrlen
}
void fdevent_process(server *srv, fdevents *ev, int n) {
/* n is the number of events */
int ndx = -1;
do {
ndx = ev->event_next_fdndx(ev, ndx);
/* not all fdevent handlers know how many fds got an event */
if (-1 == ndx) return;
else {
fdnode *fdn = ev->fdarray[ev->event_get_fd(ev, ndx)];
if (0 == ((uintptr_t)fdn & 0x3))
(*fdn->handler)(srv, fdn->ctx, ev->event_get_revent(ev, ndx));
}
} while (--n > 0);
}
#ifdef __APPLE__
#include <crt_externs.h>
#define environ (* _NSGetEnviron())

@ -55,7 +55,6 @@ fdevent_handler fdevent_get_handler(fdevents *ev, int fd);
void * fdevent_get_context(fdevents *ev, int fd);
int fdevent_poll(fdevents *ev, int timeout_ms);
void fdevent_process(server *srv, fdevents *ev, int n);
int fdevent_register(fdevents *ev, int fd, fdevent_handler handler, void *ctx);
int fdevent_unregister(fdevents *ev, int fd);

@ -108,46 +108,17 @@ static int fdevent_freebsd_kqueue_event_set(fdevents *ev, int fde_ndx, int fd, i
return fd;
}
static int fdevent_freebsd_kqueue_poll(fdevents *ev, int timeout_ms) {
int ret;
struct timespec ts;
ts.tv_sec = timeout_ms / 1000;
ts.tv_nsec = (timeout_ms % 1000) * 1000000;
ret = kevent(ev->kq_fd,
NULL, 0,
ev->kq_results, ev->maxfds,
&ts);
if (ret == -1) {
switch(errno) {
case EINTR:
/* we got interrupted, perhaps just a SIGCHLD of a CGI script */
return 0;
default:
log_error_write(ev->srv, __FILE__, __LINE__, "SS",
"kqueue failed polling: ", strerror(errno));
break;
}
}
static int fdevent_freebsd_kqueue_event_get_revent(const fdevents *ev, size_t ndx) {
int events = 0;
int filt = ev->kq_results[ndx].filter;
int e = ev->kq_results[ndx].flags;
return ret;
}
static int fdevent_freebsd_kqueue_event_get_revent(fdevents *ev, size_t ndx) {
int events = 0, e;
int filt = e = ev->kq_results[ndx].filter;
if (e == EVFILT_READ) {
if (filt == EVFILT_READ) {
events |= FDEVENT_IN;
} else if (e == EVFILT_WRITE) {
} else if (filt == EVFILT_WRITE) {
events |= FDEVENT_OUT;
}
e = ev->kq_results[ndx].flags;
if (e & EV_EOF) {
if (filt == EVFILT_READ) {
events |= FDEVENT_RDHUP;
@ -163,14 +134,24 @@ static int fdevent_freebsd_kqueue_event_get_revent(fdevents *ev, size_t ndx) {
return events;
}
static int fdevent_freebsd_kqueue_event_get_fd(fdevents *ev, size_t ndx) {
return ev->kq_results[ndx].ident;
}
static int fdevent_freebsd_kqueue_poll(fdevents * const ev, int timeout_ms) {
server * const srv = ev->srv;
struct timespec ts;
int n;
static int fdevent_freebsd_kqueue_event_next_fdndx(fdevents *ev, int ndx) {
UNUSED(ev);
ts.tv_sec = timeout_ms / 1000;
ts.tv_nsec = (timeout_ms % 1000) * 1000000;
return (ndx < 0) ? 0 : ndx + 1;
n = kevent(ev->kq_fd, NULL, 0, ev->kq_results, ev->maxfds, &ts);
for (int i = 0; i < n; ++i) {
fdnode * const fdn = ev->fdarray[ev->kq_results[i].ident];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_freebsd_kqueue_event_get_revent(ev, i);
(*fdn->handler)(srv, fdn->ctx, revents);
}
}
return n;
}
static int fdevent_freebsd_kqueue_reset(fdevents *ev) {
@ -197,10 +178,6 @@ int fdevent_freebsd_kqueue_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
ev->kq_fd = -1;
ev->kq_results = calloc(ev->maxfds, sizeof(*ev->kq_results));

@ -135,11 +135,6 @@ struct fdevents {
int (*event_set)(struct fdevents *ev, int fde_ndx, int fd, int events);
int (*event_del)(struct fdevents *ev, int fde_ndx, int fd);
int (*event_get_revent)(struct fdevents *ev, size_t ndx);
int (*event_get_fd)(struct fdevents *ev, size_t ndx);
int (*event_next_fdndx)(struct fdevents *ev, int ndx);
int (*poll)(struct fdevents *ev, int timeout_ms);
};

@ -23,20 +23,7 @@ static void io_watcher_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (revents & EV_WRITE) r |= FDEVENT_OUT;
if (revents & EV_ERROR) r |= FDEVENT_ERR;
switch (r = (*handler)(ev->srv, context, r)) {
case HANDLER_FINISHED:
case HANDLER_GO_ON:
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_WAIT_FOR_FD:
break;
case HANDLER_ERROR:
/* should never happen */
SEGFAULT();
break;
default:
log_error_write(ev->srv, __FILE__, __LINE__, "d", r);
break;
}
(*handler)(ev->srv, context, r);
}
static void fdevent_libev_free(fdevents *ev) {
@ -105,27 +92,6 @@ static int fdevent_libev_poll(fdevents *ev, int timeout_ms) {
return 0;
}
static int fdevent_libev_event_get_revent(fdevents *ev, size_t ndx) {
UNUSED(ev);
UNUSED(ndx);
return 0;
}
static int fdevent_libev_event_get_fd(fdevents *ev, size_t ndx) {
UNUSED(ev);
UNUSED(ndx);
return -1;
}
static int fdevent_libev_event_next_fdndx(fdevents *ev, int ndx) {
UNUSED(ev);
UNUSED(ndx);
return -1;
}
static int fdevent_libev_reset(fdevents *ev) {
UNUSED(ev);
@ -149,10 +115,6 @@ int fdevent_libev_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
if (NULL == (ev->libev_loop = ev_default_loop(0))) {
log_error_write(ev->srv, __FILE__, __LINE__, "S",
"ev_default_loop failed , try to set server.event-handler = \"poll\" or \"select\"");

@ -87,11 +87,7 @@ static int fdevent_linux_sysepoll_event_set(fdevents *ev, int fde_ndx, int fd, i
return fd;
}
static int fdevent_linux_sysepoll_poll(fdevents *ev, int timeout_ms) {
return epoll_wait(ev->epoll_fd, ev->epoll_events, ev->maxfds, timeout_ms);
}
static int fdevent_linux_sysepoll_event_get_revent(fdevents *ev, size_t ndx) {
static int fdevent_linux_sysepoll_event_get_revent(const fdevents *ev, size_t ndx) {
int events = 0, e;
e = ev->epoll_events[ndx].events;
@ -105,23 +101,17 @@ static int fdevent_linux_sysepoll_event_get_revent(fdevents *ev, size_t ndx) {
return events;
}
static int fdevent_linux_sysepoll_event_get_fd(fdevents *ev, size_t ndx) {
# if 0
log_error_write(ev->srv, __FILE__, __LINE__, "SD, D",
"fdevent_linux_sysepoll_event_get_fd: ", (int) ndx, ev->epoll_events[ndx].data.fd);
# endif
return ev->epoll_events[ndx].data.fd;
}
static int fdevent_linux_sysepoll_event_next_fdndx(fdevents *ev, int ndx) {
size_t i;
UNUSED(ev);
i = (ndx < 0) ? 0 : ndx + 1;
return i;
static int fdevent_linux_sysepoll_poll(fdevents * const ev, int timeout_ms) {
int n = epoll_wait(ev->epoll_fd, ev->epoll_events, ev->maxfds, timeout_ms);
server * const srv = ev->srv;
for (int i = 0; i < n; ++i) {
fdnode * const fdn = ev->fdarray[ev->epoll_events[i].data.fd];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_linux_sysepoll_event_get_revent(ev, i);
(*fdn->handler)(srv, fdn->ctx, revents);
}
}
return n;
}
int fdevent_linux_sysepoll_init(fdevents *ev) {
@ -135,10 +125,6 @@ int fdevent_linux_sysepoll_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
if (-1 == (ev->epoll_fd = epoll_create(ev->maxfds))) {
log_error_write(ev->srv, __FILE__, __LINE__, "SSS",
"epoll_create failed (", strerror(errno), "), try to set server.event-handler = \"poll\" or \"select\"");

@ -115,14 +115,7 @@ static int fdevent_poll_event_set(fdevents *ev, int fde_ndx, int fd, int events)
}
}
static int fdevent_poll_poll(fdevents *ev, int timeout_ms) {
#if 0
fdevent_poll_event_compress(ev);
#endif
return poll(ev->pollfds, ev->used, timeout_ms);
}
static int fdevent_poll_event_get_revent(fdevents *ev, size_t ndx) {
static int fdevent_poll_event_get_revent(const fdevents *ev, size_t ndx) {
int r, poll_r;
if (ndx >= ev->used) {
@ -155,21 +148,32 @@ static int fdevent_poll_event_get_revent(fdevents *ev, size_t ndx) {
return r;
}
static int fdevent_poll_event_get_fd(fdevents *ev, size_t ndx) {
return ev->pollfds[ndx].fd;
}
static int fdevent_poll_event_next_fdndx(fdevents *ev, int ndx) {
size_t i;
i = (ndx < 0) ? 0 : ndx + 1;
for (; i < ev->used; i++) {
static int fdevent_poll_event_next_fdndx(const fdevents *ev, int ndx) {
for (size_t i = (size_t)(ndx+1); i < ev->used; ++i) {
if (ev->pollfds[i].revents) return i;
}
return -1;
}
static int fdevent_poll_poll(fdevents *ev, int timeout_ms) {
#if 0
fdevent_poll_event_compress(ev);
#endif
int n = poll(ev->pollfds, ev->used, timeout_ms);
server * const srv = ev->srv;
for (int ndx = -1, i = 0; i < n; ++i) {
fdnode *fdn;
ndx = fdevent_poll_event_next_fdndx(ev, ndx);
if (-1 == ndx) continue;
fdn = ev->fdarray[ndx];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_poll_event_get_revent(ev, i);
(*fdn->handler)(srv, fdn->ctx, revents);
}
}
return n;
}
int fdevent_poll_init(fdevents *ev) {
ev->type = FDEVENT_HANDLER_POLL;
#define SET(x) \
@ -181,10 +185,6 @@ int fdevent_poll_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
return 0;
}

@ -57,20 +57,7 @@ static int fdevent_select_event_set(fdevents *ev, int fde_ndx, int fd, int event
return fd;
}
static int fdevent_select_poll(fdevents *ev, int timeout_ms) {
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
ev->select_read = ev->select_set_read;
ev->select_write = ev->select_set_write;
ev->select_error = ev->select_set_error;
return select(ev->select_max_fd + 1, &(ev->select_read), &(ev->select_write), &(ev->select_error), &tv);
}
static int fdevent_select_event_get_revent(fdevents *ev, size_t ndx) {
static int fdevent_select_event_get_revent(const fdevents *ev, size_t ndx) {
int revents = 0;
if (FD_ISSET(ndx, &(ev->select_read))) {
@ -86,13 +73,7 @@ static int fdevent_select_event_get_revent(fdevents *ev, size_t ndx) {
return revents;
}
static int fdevent_select_event_get_fd(fdevents *ev, size_t ndx) {
UNUSED(ev);
return ndx;
}
static int fdevent_select_event_next_fdndx(fdevents *ev, int ndx) {
static int fdevent_select_event_next_fdndx(const fdevents *ev, int ndx) {
int i;
i = (ndx < 0) ? 0 : ndx + 1;
@ -106,6 +87,31 @@ static int fdevent_select_event_next_fdndx(fdevents *ev, int ndx) {
return -1;
}
static int fdevent_select_poll(fdevents *ev, int timeout_ms) {
int n;
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
ev->select_read = ev->select_set_read;
ev->select_write = ev->select_set_write;
ev->select_error = ev->select_set_error;
n = select(ev->select_max_fd + 1, &(ev->select_read), &(ev->select_write), &(ev->select_error), &tv);
for (int ndx = -1, i = 0; i < n; ++i) {
fdnode *fdn;
ndx = fdevent_select_event_next_fdndx(ev, ndx);
if (-1 == ndx) continue;
fdn = ev->fdarray[ndx];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_select_event_get_revent(ev, ndx);
(*fdn->handler)(ev->srv, fdn->ctx, revents);
}
}
return n;
}
int fdevent_select_init(fdevents *ev) {
ev->type = FDEVENT_HANDLER_SELECT;
#define SET(x) \
@ -117,10 +123,6 @@ int fdevent_select_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
return 0;
}

@ -68,20 +68,7 @@ static int fdevent_solaris_devpoll_event_set(fdevents *ev, int fde_ndx, int fd,
return fd;
}
static int fdevent_solaris_devpoll_poll(fdevents *ev, int timeout_ms) {
struct dvpoll dopoll;
int ret;
dopoll.dp_timeout = timeout_ms;
dopoll.dp_nfds = ev->maxfds - 1;
dopoll.dp_fds = ev->devpollfds;
ret = ioctl(ev->devpoll_fd, DP_POLL, &dopoll);
return ret;
}
static int fdevent_solaris_devpoll_event_get_revent(fdevents *ev, size_t ndx) {
static int fdevent_solaris_devpoll_event_get_revent(const fdevents *ev, size_t ndx) {
int r, poll_r;
r = 0;
@ -99,18 +86,25 @@ static int fdevent_solaris_devpoll_event_get_revent(fdevents *ev, size_t ndx) {
return r;
}
static int fdevent_solaris_devpoll_event_get_fd(fdevents *ev, size_t ndx) {
return ev->devpollfds[ndx].fd;
}
static int fdevent_solaris_devpoll_event_next_fdndx(fdevents *ev, int last_ndx) {
size_t i;
UNUSED(ev);
i = (last_ndx < 0) ? 0 : last_ndx + 1;
return i;
static int fdevent_solaris_devpoll_poll(fdevents *ev, int timeout_ms) {
int n;
server * const srv = ev->srv;
struct dvpoll dopoll;
dopoll.dp_timeout = timeout_ms;
dopoll.dp_nfds = ev->maxfds - 1;
dopoll.dp_fds = ev->devpollfds;
n = ioctl(ev->devpoll_fd, DP_POLL, &dopoll);
for (int i = 0; i < n; ++i) {
fdnode * const fdn = ev->fdarray[ev->devpollfds[i].fd];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_solaris_devpoll_event_get_revent(ev, i);
(*fdn->handler)(srv, fdn->ctx, revents);
}
}
return n;
}
int fdevent_solaris_devpoll_reset(fdevents *ev) {
@ -138,10 +132,6 @@ int fdevent_solaris_devpoll_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
ev->devpollfds = malloc(sizeof(*ev->devpollfds) * ev->maxfds);
force_assert(NULL != ev->devpollfds);

@ -58,7 +58,7 @@ static int fdevent_solaris_port_event_set(fdevents *ev, int fde_ndx, int fd, int
return fd;
}
static int fdevent_solaris_port_event_get_revent(fdevents *ev, size_t ndx) {
static int fdevent_solaris_port_event_get_revent(const fdevents *ev, size_t ndx) {
int events = 0, e;
e = ev->port_events[ndx].portev_events;
@ -72,20 +72,6 @@ static int fdevent_solaris_port_event_get_revent(fdevents *ev, size_t ndx) {
return e;
}
static int fdevent_solaris_port_event_get_fd(fdevents *ev, size_t ndx) {
return ev->port_events[ndx].portev_object;
}
static int fdevent_solaris_port_event_next_fdndx(fdevents *ev, int ndx) {
size_t i;
UNUSED(ev);
i = (ndx < 0) ? 0 : ndx + 1;
return i;
}
static void fdevent_solaris_port_free(fdevents *ev) {
close(ev->port_fd);
free(ev->port_events);
@ -118,7 +104,7 @@ static int fdevent_solaris_port_poll(fdevents *ev, int timeout_ms) {
if (!(errno == ETIME && wait_for_events != available_events)) return ret;
}
for (i = 0; i < available_events; ++i) {
for (i = 0; i < (int)available_events; ++i) {
user_data = (const int *) ev->port_events[i].portev_user;
if ((ret = port_associate(ev->port_fd, PORT_SOURCE_FD, ev->port_events[i].portev_object,
@ -132,7 +118,14 @@ static int fdevent_solaris_port_poll(fdevents *ev, int timeout_ms) {
}
}
return available_events;
for (i = 0; i < available_events; ++i) {
fdnode * const fdn = ev->fdarray[ev->port_events[i].portev_object];
if (0 == ((uintptr_t)fdn & 0x3)) {
int revents = fdevent_solaris_port_event_get_revent(ev, i);
(*fdn->handler)(ev->srv, fdn->ctx, revents);
}
}
return available_events;
}
int fdevent_solaris_port_init(fdevents *ev) {
@ -146,10 +139,6 @@ int fdevent_solaris_port_init(fdevents *ev) {
SET(event_del);
SET(event_set);
SET(event_next_fdndx);
SET(event_get_fd);
SET(event_get_revent);
if ((ev->port_fd = port_create()) < 0) {
log_error_write(ev->srv, __FILE__, __LINE__, "SSS",
"port_create() failed (", strerror(errno), "), try to set server.event-handler = \"poll\" or \"select\"");

@ -2026,17 +2026,15 @@ static int server_main_loop (server * const srv) {
}
}
if ((n = fdevent_poll(srv->ev, 1000)) > 0) {
fdevent_process(srv, srv->ev, n);
last_active_ts = srv->cur_ts;
} else if (n < 0 && errno != EINTR) {
if ((n = fdevent_poll(srv->ev, 1000)) >= 0) {
if (n > 0) last_active_ts = srv->cur_ts;
fdevent_sched_run(srv, srv->ev);
} else if (errno != EINTR) {
log_error_write(srv, __FILE__, __LINE__, "ss",
"fdevent_poll failed:",
strerror(errno));
}
if (n >= 0) fdevent_sched_run(srv, srv->ev);
for (size_t ndx = 0; ndx < joblist->used; ++ndx) {
connection *con = joblist->ptr[ndx];
connection_state_machine(srv, con);

Loading…
Cancel
Save