2
0
Fork 0

[common] implement libev wrapper

* need a way to mark a watcher as "don't keep the loop alive"
personal/stbuehler/wip
Stefan Bühler 2013-05-18 15:27:59 +02:00
parent 079c365f39
commit cfd8955008
66 changed files with 1715 additions and 957 deletions

View File

@ -8,13 +8,13 @@ typedef void (*liAngelLogOpen)(liServer *srv, int fd, gpointer data);
/* interface to the angel; implementation needs to work without angel too */
LI_API void li_angel_setup(liServer *srv);
/* listen to a socket */
/* listen to a socket (mainloop context) */
LI_API void li_angel_listen(liServer *srv, GString *str, liAngelListenCB cb, gpointer data);
/* send log messages during startup to angel, frees the string */
LI_API void li_angel_log(liServer *srv, GString *str);
LI_API void li_angel_log_open_file(liServer *srv, GString *filename, liAngelLogOpen, gpointer data);
LI_API void li_angel_log_open_file(liServer *srv, liEventLoop *loop, GString *filename, liAngelLogOpen, gpointer data);
/* angle_fake definitions, only for internal use */
int li_angel_fake_listen(liServer *srv, GString *str);

View File

@ -2,6 +2,7 @@
#define _LIGHTTPD_ANGEL_CONNECTION_H_
#include <lighttpd/idlist.h>
#include <lighttpd/events.h>
#define ANGEL_CALL_MAX_STR_LEN (64*1024) /* must fit into a gint32 */
@ -10,7 +11,7 @@ typedef struct liAngelConnection liAngelConnection;
typedef struct liAngelCall liAngelCall;
/* error, data and fds-array will be freed/closed by the angel api itself; if you want to use the fds set the array size to 0 */
typedef void (*liAngelCallCB)(liAngelCall *acall, gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds);
typedef void (*liAngelCallCB)(gpointer ctx, gboolean timeout, GString *error, GString *data, GArray *fds);
typedef void (*liAngelReceiveCallCB)(liAngelConnection *acon,
const gchar *mod, gsize mod_len, const gchar *action, gsize action_len,
@ -23,12 +24,11 @@ typedef void (*liAngelCloseCB)(liAngelConnection *acon, GError *err);
struct liAngelConnection {
gpointer data;
GMutex *mutex;
struct ev_loop *loop;
int fd;
liIDList *call_id_list;
GPtrArray *call_table;
ev_io fd_watcher;
ev_async out_notify_watcher;
liEventIO fd_watcher;
liEventAsync out_notify_watcher;
GQueue *out;
liAngelBuffer in;
@ -55,8 +55,13 @@ struct liAngelCall {
/* internal data */
gint32 id; /* id is -1 if there is no call pending (the callback may still be running) */
liAngelConnection *acon;
ev_timer timeout_watcher;
ev_tstamp timeout;
liEventTimer timeout_watcher;
liEventAsync result_watcher;
struct {
GString *error, *data;
GArray *fds;
} result;
};
/* error handling */
@ -80,12 +85,12 @@ typedef enum {
/* create connection */
LI_API liAngelConnection* li_angel_connection_new(
struct ev_loop *loop, int fd, gpointer data,
liEventLoop *loop, int fd, gpointer data,
liAngelReceiveCallCB recv_call, liAngelCloseCB close_cb);
LI_API void li_angel_connection_free(liAngelConnection *acon);
LI_API liAngelCall *li_angel_call_new(liAngelCallCB callback, ev_tstamp timeout);
LI_API liAngelCall *li_angel_call_new(liEventLoop *loop, liAngelCallCB callback, li_tstamp timeout);
/* returns TRUE if a call was cancelled; make sure you don't call free while you're calling send_call */
LI_API gboolean li_angel_call_free(liAngelCall *call);

View File

@ -17,7 +17,7 @@ struct liPluginCoreConfig {
liInstance *inst;
GHashTable *listen_sockets;
ev_signal sig_hup;
liEventSignal sig_hup;
};
typedef struct liPluginCoreListenMask liPluginCoreListenMask;

View File

@ -16,7 +16,7 @@ struct liErrorPipe {
liErrorPipeCB cb;
int fds[2];
ev_io fd_watcher;
liEventIO fd_watcher;
};
struct liProc {

View File

@ -30,7 +30,7 @@ struct liInstance {
liInstanceConf *ic;
liProc *proc;
ev_child child_watcher;
liEventChild child_watcher;
liInstanceState s_cur, s_dest;
@ -44,8 +44,8 @@ struct liInstance {
struct liServer {
guint32 magic; /** server magic version, check against LIGHTTPD_ANGEL_MAGIC in plugins */
struct ev_loop *loop;
ev_signal
liEventLoop loop;
liEventSignal
sig_w_INT,
sig_w_TERM,
sig_w_PIPE;

View File

@ -21,9 +21,7 @@ typedef void (*liBackendCB)(liBackendPool *bpool);
struct liBackendConnection {
/* must not keep a loop ref while the connection is not actively used,
* get and put will do ref/unref if the watcher is active */
ev_io watcher;
liEventIO watcher;
gpointer data;
};

View File

@ -5,6 +5,8 @@
#error Please include <lighttpd/base.h> instead of this file
#endif
#include <lighttpd/events.h>
/* Open a file only once, so it shouldn't get lost;
* as a file may get split into many chunks, we
* use this struct to keep track of the usage
@ -53,12 +55,11 @@ struct liChunk {
typedef void (*liCQLimitNotifyCB)(gpointer context, gboolean locked);
struct liCQLimit {
gint refcount;
struct ev_loop *loop;
goffset limit, current;
gboolean locked;
ev_io *io_watcher;
liEventIO *io_watcher;
liCQLimitNotifyCB notify; /* callback to reactivate input */
gpointer context;
@ -124,7 +125,7 @@ INLINE goffset li_chunk_length(liChunk *c);
* cqlimit *
******************/
LI_API liCQLimit* li_cqlimit_new(struct ev_loop *loop);
LI_API liCQLimit* li_cqlimit_new();
LI_API void li_cqlimit_reset(liCQLimit *cql);
LI_API void li_cqlimit_acquire(liCQLimit *cql);
LI_API void li_cqlimit_release(liCQLimit *cql);
@ -138,7 +139,7 @@ LI_API liChunkQueue* li_chunkqueue_new();
LI_API void li_chunkqueue_reset(liChunkQueue *cq);
LI_API void li_chunkqueue_free(liChunkQueue *cq);
LI_API void li_chunkqueue_use_limit(liChunkQueue *cq, struct ev_loop *loop, goffset limit);
LI_API void li_chunkqueue_use_limit(liChunkQueue *cq, goffset limit);
LI_API void li_chunkqueue_set_limit(liChunkQueue *cq, liCQLimit* cql);
/* return -1 for unlimited, 0 for full and n > 0 for n bytes free */
LI_API goffset li_chunkqueue_limit_available(liChunkQueue *cq);

View File

@ -33,6 +33,6 @@ LI_API liCollectInfo* li_collect_start_global(liServer *srv, liCollectFuncCB fun
LI_API void li_collect_break(liCollectInfo* ci); /** this will result in complete == FALSE in the callback; call it if cbdata gets invalid */
/* internal functions */
LI_API void li_collect_watcher_cb(struct ev_loop *loop, ev_async *w, int revents);
LI_API void li_collect_watcher_cb(liEventBase *watcher, int events);
#endif

View File

@ -67,7 +67,7 @@ struct liConnection {
GList *link;
ev_tstamp timeout;
guint max_idle;
ev_timer watcher;
liEventTimer watcher;
} keep_alive_data;
guint keep_alive_requests;

534
include/lighttpd/events.h Normal file
View File

@ -0,0 +1,534 @@
#ifndef _LIGHTTPD_EVENTS_H_
#define _LIGHTTPD_EVENTS_H_
#include <lighttpd/settings.h>
#include <lighttpd/utils.h>
enum {
LI_EV_READ = 0x01,
LI_EV_WRITE = 0x02,
LI_EV_WAKEUP = 0x04
};
typedef enum {
LI_EVT_NONE = 0,
LI_EVT_IO,
LI_EVT_TIMER,
LI_EVT_ASYNC,
LI_EVT_CHILD,
LI_EVT_SIGNAL,
LI_EVT_PREPARE,
LI_EVT_CHECK
} liEventType;
typedef struct liEventLoop liEventLoop;
typedef struct liEventBase liEventBase;
typedef struct liEventIO liEventIO;
typedef struct liEventTimer liEventTimer;
typedef struct liEventAsync liEventAsync;
typedef struct liEventChild liEventChild;
typedef struct liEventSignal liEventSignal;
typedef struct liEventPrepare liEventPrepare;
typedef struct liEventCheck liEventCheck;
typedef ev_tstamp li_tstamp;
typedef void (*liEventCallback)(liEventBase *watcher, int events);
struct liEventBase {
liEventType type;
unsigned int keep_loop_alive:1, active: 1;
GList link_watchers; /* data points to loop */
liEventCallback callback;
};
struct liEventIO {
liEventBase base;
int events;
union {
struct ev_watcher w;
/* struct ev_watcher_list l; */
struct ev_io io;
} libevmess;
};
struct liEventTimer {
liEventBase base;
union {
struct ev_watcher w;
struct ev_timer timer;
} libevmess;
};
struct liEventAsync {
liEventBase base;
union {
struct ev_watcher w;
struct ev_async async;
} libevmess;
};
struct liEventChild {
liEventBase base;
union {
struct ev_watcher w;
struct ev_child child;
} libevmess;
};
struct liEventSignal {
liEventBase base;
union {
struct ev_watcher w;
struct ev_signal sig;
} libevmess;
};
struct liEventPrepare {
liEventBase base;
union {
struct ev_watcher w;
struct ev_prepare prepare;
} libevmess;
};
struct liEventCheck {
liEventBase base;
union {
struct ev_watcher w;
struct ev_check check;
} libevmess;
};
#include <lighttpd/jobqueue.h>
struct liEventLoop {
struct ev_loop *loop;
liJobQueue jobqueue;
GQueue watchers;
GQueue closing_sockets;
/* whether loop should exit once all "keep_loop_alive" watchers are dead */
unsigned int end:1;
};
LI_API void li_event_loop_init(liEventLoop *loop, struct ev_loop *evloop);
LI_API struct ev_loop* li_event_loop_clear(liEventLoop *loop);
LI_API void li_event_loop_run(liEventLoop *loop);
LI_API void li_event_loop_end(liEventLoop *loop);
LI_API void li_event_loop_exit(liEventLoop *loop);
LI_API void li_event_loop_force_close_sockets(liEventLoop *loop);
LI_API const char* li_event_loop_backend_string(liEventLoop *loop);
INLINE li_tstamp li_event_time();
INLINE li_tstamp li_event_now(liEventLoop *loop);
LI_API void li_event_add_closing_socket(liEventLoop *loop, int fd);
INLINE void li_event_attach_(liEventLoop *loop, liEventBase *base);
INLINE void li_event_detach_(liEventBase *base);
INLINE gboolean li_event_attached_(liEventBase *base);
INLINE liEventLoop* li_event_get_loop_(liEventBase *base);
INLINE void li_event_start_(liEventBase *base);
INLINE void li_event_stop_(liEventBase *base);
INLINE gboolean li_event_active_(liEventBase *base);
INLINE void li_event_set_keep_loop_alive_(liEventBase *base, gboolean keep_loop_alive);
INLINE void li_event_clear_(liEventBase *base);
INLINE void li_event_set_callback_(liEventBase *base, liEventCallback callback);
#define li_event_attach(loop, watcher) (li_event_attach_((loop), &(watcher)->base))
#define li_event_detach(watcher) (li_event_detach_(&(watcher)->base))
#define li_event_attached(watcher) (li_event_attached_(&(watcher)->base))
#define li_event_get_loop(watcher) (li_event_get_loop_(&(watcher)->base))
#define li_event_start(watcher) (li_event_start_(&(watcher)->base))
#define li_event_stop(watcher) (li_event_stop_(&(watcher)->base))
#define li_event_active(watcher) (li_event_active_(&(watcher)->base))
#define li_event_set_keep_loop_alive(watcher, keep_loop_alive) (li_event_set_keep_loop_alive_(&(watcher)->base, keep_loop_alive))
#define li_event_clear(watcher) (li_event_clear_(&(watcher)->base))
#define li_event_set_callback(watcher, callback) (li_event_set_callback_(&(watcher)->base, callback))
/* defaults to keep_loop_alive = TRUE */
LI_API void li_event_io_init(liEventLoop *loop, liEventIO *io, liEventCallback callback, int fd, int events);
LI_API void li_event_io_set_fd(liEventIO *io, int fd);
INLINE int li_event_io_fd(liEventIO *io);
LI_API void li_event_io_set_events(liEventIO *io, int events);
LI_API void li_event_io_add_events(liEventIO *io, int events);
LI_API void li_event_io_rem_events(liEventIO *io, int events);
INLINE liEventIO* li_event_io_from(liEventBase *base);
/* defaults to keep_loop_alive = TRUE */
/* timer will always stop when it triggers */
LI_API void li_event_timer_init(liEventLoop *loop, liEventTimer *timer, liEventCallback callback);
INLINE void li_event_timer_once(liEventTimer *timer, li_tstamp timeout); /* also starts the watcher */
INLINE liEventTimer* li_event_timer_from(liEventBase *base);
/* defaults to keep_loop_alive = FALSE, starts immediately */
LI_API void li_event_async_init(liEventLoop *loop, liEventAsync *async, liEventCallback callback);
INLINE void li_event_async_send(liEventAsync *async);
INLINE liEventAsync* li_event_async_from(liEventBase *base);
/* defaults to keep_loop_alive = TRUE, starts immediately */
LI_API void li_event_child_init(liEventLoop *loop, liEventChild *child, liEventCallback callback, int pid);
INLINE int li_event_child_pid(liEventChild *child);
INLINE int li_event_child_status(liEventChild *child);
INLINE liEventChild* li_event_child_from(liEventBase *base);
/* defaults to keep_loop_alive = FALSE, starts immediately */
LI_API void li_event_signal_init(liEventLoop *loop, liEventSignal *signal, liEventCallback callback, int signum);
INLINE int li_event_signal_signum(liEventSignal *signal);
INLINE liEventSignal* li_event_signal_from(liEventBase *base);
/* defaults to keep_loop_alive = FALSE, starts immediately */
LI_API void li_event_prepare_init(liEventLoop *loop, liEventPrepare *prepare, liEventCallback callback);
INLINE liEventPrepare* li_event_prepare_from(liEventBase *base);
/* defaults to keep_loop_alive = FALSE, starts immediately */
LI_API void li_event_check_init(liEventLoop *loop, liEventCheck *check, liEventCallback callback);
INLINE liEventCheck* li_event_check_from(liEventBase *base);
/* inline implementations */
INLINE li_tstamp li_event_now(liEventLoop *loop) {
return ev_now(loop->loop);
}
INLINE li_tstamp li_event_time() {
return ev_time();
}
INLINE void li_event_attach_(liEventLoop *loop, liEventBase *base) {
assert(NULL == base->link_watchers.data);
assert(NULL != loop);
base->link_watchers.data = loop;
g_queue_push_tail_link(&loop->watchers, &base->link_watchers);
if (base->active) {
base->active = 0;
li_event_start_(base);
}
}
INLINE void li_event_detach_(liEventBase *base) {
liEventLoop *loop = base->link_watchers.data;
if (NULL == loop) return;
if (base->active) {
li_event_stop_(base);
base->active = 1;
}
base->link_watchers.data = NULL;
g_queue_unlink(&loop->watchers, &base->link_watchers);
}
INLINE gboolean li_event_attached_(liEventBase *base) {
return NULL != base->link_watchers.data;
}
INLINE liEventLoop* li_event_get_loop_(liEventBase *base) {
return base->link_watchers.data;
}
INLINE void li_event_start_(liEventBase *base) {
liEventLoop *loop = base->link_watchers.data;
assert(NULL != base->callback);
assert(LI_EVT_NONE != base->type);
if (base->active) return;
base->active = 1;
if (NULL != loop) {
switch (base->type) {
case LI_EVT_NONE:
break;
case LI_EVT_IO:
{
liEventIO *io = li_event_io_from(base);
assert(!ev_is_active(&io->libevmess.w));
assert(-1 != io->libevmess.io.fd);
ev_io_start(loop->loop, &io->libevmess.io);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_TIMER:
{
liEventTimer *timer = li_event_timer_from(base);
assert(!ev_is_active(&timer->libevmess.w));
if (0 >= timer->libevmess.timer.repeat) timer->libevmess.timer.repeat = 0.1;
ev_timer_again(loop->loop, &timer->libevmess.timer);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_ASYNC:
{
liEventAsync *async = li_event_async_from(base);
assert(!ev_is_active(&async->libevmess.w));
ev_async_start(loop->loop, &async->libevmess.async);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_CHILD:
{
liEventChild *child = li_event_child_from(base);
assert(!ev_is_active(&child->libevmess.w));
ev_child_start(loop->loop, &child->libevmess.child);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_SIGNAL:
{
liEventSignal *sig = li_event_signal_from(base);
assert(!ev_is_active(&sig->libevmess.w));
ev_signal_start(loop->loop, &sig->libevmess.sig);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_PREPARE:
{
liEventPrepare *prepare = li_event_prepare_from(base);
assert(!ev_is_active(&prepare->libevmess.w));
ev_prepare_start(loop->loop, &prepare->libevmess.prepare);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
case LI_EVT_CHECK:
{
liEventCheck *check = li_event_check_from(base);
assert(!ev_is_active(&check->libevmess.w));
ev_check_start(loop->loop, &check->libevmess.check);
if (!base->keep_loop_alive) ev_unref(loop->loop);
}
break;
}
}
}
INLINE void li_event_stop_(liEventBase *base) {
liEventLoop *loop = base->link_watchers.data;
if (!base->active) return;
base->active = 0;
assert(NULL != base->callback);
assert(LI_EVT_NONE != base->type);
if (NULL != loop) {
switch (base->type) {
case LI_EVT_NONE:
break;
case LI_EVT_IO:
{
liEventIO *io = li_event_io_from(base);
assert(ev_is_active(&io->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_io_stop(loop->loop, &io->libevmess.io);
}
break;
case LI_EVT_TIMER:
{
liEventTimer *timer = li_event_timer_from(base);
assert(ev_is_active(&timer->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_timer_stop(loop->loop, &timer->libevmess.timer);
}
break;
case LI_EVT_ASYNC:
{
liEventAsync *async = li_event_async_from(base);
assert(ev_is_active(&async->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_async_stop(loop->loop, &async->libevmess.async);
}
break;
case LI_EVT_CHILD:
{
liEventChild *child = li_event_child_from(base);
assert(ev_is_active(&child->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_child_stop(loop->loop, &child->libevmess.child);
}
break;
case LI_EVT_SIGNAL:
{
liEventSignal *sig = li_event_signal_from(base);
assert(ev_is_active(&sig->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_signal_stop(loop->loop, &sig->libevmess.sig);
}
break;
case LI_EVT_PREPARE:
{
liEventPrepare *prepare = li_event_prepare_from(base);
assert(ev_is_active(&prepare->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_prepare_stop(loop->loop, &prepare->libevmess.prepare);
}
break;
case LI_EVT_CHECK:
{
liEventCheck *check = li_event_check_from(base);
assert(ev_is_active(&check->libevmess.w));
if (!base->keep_loop_alive) ev_ref(loop->loop);
ev_check_stop(loop->loop, &check->libevmess.check);
}
break;
}
}
}
INLINE gboolean li_event_active_(liEventBase *base) {
return base->active;
}
INLINE void li_event_clear_(liEventBase *base) {
if (LI_EVT_NONE == base->type) return;
if (li_event_attached_(base)) li_event_detach_(base);
base->active = FALSE;
base->callback = NULL;
switch (base->type) {
case LI_EVT_NONE:
break;
case LI_EVT_IO:
{
liEventIO *io = li_event_io_from(base);
io->events = 0;
ev_io_set(&io->libevmess.io, -1, 0);
ev_set_cb(&io->libevmess.io, NULL);
}
break;
case LI_EVT_TIMER:
{
liEventTimer *timer = li_event_timer_from(base);
timer->libevmess.timer.repeat = 0;
ev_set_cb(&timer->libevmess.timer, NULL);
}
break;
case LI_EVT_ASYNC:
{
liEventAsync *async = li_event_async_from(base);
ev_set_cb(&async->libevmess.async, NULL);
}
break;
case LI_EVT_CHILD:
{
liEventChild *child = li_event_child_from(base);
ev_child_set(&child->libevmess.child, -1, 0);
ev_set_cb(&child->libevmess.child, NULL);
}
break;
case LI_EVT_SIGNAL:
{
liEventSignal *sig = li_event_signal_from(base);
ev_set_cb(&sig->libevmess.sig, NULL);
ev_signal_set(&sig->libevmess.sig, 0);
}
break;
case LI_EVT_PREPARE:
{
liEventPrepare *prepare = li_event_prepare_from(base);
ev_set_cb(&prepare->libevmess.prepare, NULL);
}
break;
case LI_EVT_CHECK:
{
liEventCheck *check = li_event_check_from(base);
ev_set_cb(&check->libevmess.check, NULL);
}
break;
}
base->type = LI_EVT_NONE;
}
INLINE void li_event_set_callback_(liEventBase *base, liEventCallback callback) {
base->callback = callback;
}
INLINE void li_event_set_keep_loop_alive_(liEventBase *base, gboolean keep_loop_alive) {
liEventLoop *loop = base->link_watchers.data;
unsigned int v = keep_loop_alive ? 1 : 0;
if (v == base->keep_loop_alive) return;
base->keep_loop_alive = v;
if (NULL == loop || !base->active) return;
if (v) {
ev_ref(loop->loop);
} else {
ev_unref(loop->loop);
}
}
INLINE int li_event_io_fd(liEventIO *io) {
return io->libevmess.io.fd;
}
INLINE liEventIO* li_event_io_from(liEventBase *base) {
assert(LI_EVT_IO == base->type);
return LI_CONTAINER_OF(base, liEventIO, base);
}
INLINE void li_event_timer_once(liEventTimer *timer, li_tstamp timeout) {
li_event_stop(timer);
timer->libevmess.timer.repeat = timeout;
li_event_start(timer);
}
INLINE liEventTimer* li_event_timer_from(liEventBase *base) {
assert(LI_EVT_TIMER == base->type);
return LI_CONTAINER_OF(base, liEventTimer, base);
}
INLINE void li_event_async_send(liEventAsync *async) {
liEventLoop *loop = async->base.link_watchers.data;
ev_async_send(loop->loop, &async->libevmess.async);
}
INLINE liEventAsync* li_event_async_from(liEventBase *base){
assert(LI_EVT_ASYNC == base->type);
return LI_CONTAINER_OF(base, liEventAsync, base);
}
INLINE int li_event_child_pid(liEventChild *child) {
return child->libevmess.child.pid;
}
INLINE int li_event_child_status(liEventChild *child) {
return child->libevmess.child.rstatus;
}
INLINE liEventChild* li_event_child_from(liEventBase *base) {
assert(LI_EVT_CHILD == base->type);
return LI_CONTAINER_OF(base, liEventChild, base);
}
INLINE int li_event_signal_signum(liEventSignal *signal) {
return signal->libevmess.sig.signum;
}
INLINE liEventSignal* li_event_signal_from(liEventBase *base) {
assert(LI_EVT_SIGNAL == base->type);
return LI_CONTAINER_OF(base, liEventSignal, base);
}
INLINE liEventPrepare* li_event_prepare_from(liEventBase *base) {
assert(LI_EVT_PREPARE == base->type);
return LI_CONTAINER_OF(base, liEventPrepare, base);
}
INLINE liEventCheck* li_event_check_from(liEventBase *base) {
assert(LI_EVT_CHECK == base->type);
return LI_CONTAINER_OF(base, liEventCheck, base);
}
#endif

View File

@ -1,7 +1,9 @@
#ifndef _LIGHTTPD_JOBQUEUE_H_
#define _LIGHTTPD_JOBQUEUE_H_
#include <lighttpd/settings.h>
#ifndef _LIGHTTPD_EVENTS_H_
#error Include lighttpd/events.h instead
#endif
typedef struct liJob liJob;
typedef struct liJobRef liJobRef;
@ -25,19 +27,18 @@ struct liJobRef {
};
struct liJobQueue {
struct ev_loop *loop;
guint generation;
ev_prepare prepare_watcher;
liEventPrepare prepare_watcher;
GQueue queue;
ev_timer queue_watcher;
liEventTimer queue_watcher;
GAsyncQueue *async_queue;
ev_async async_queue_watcher;
liEventAsync async_queue_watcher;
};
LI_API void li_job_queue_init(liJobQueue *jq, struct ev_loop *loop);
LI_API void li_job_queue_init(liJobQueue *jq, liEventLoop *loop);
LI_API void li_job_queue_clear(liJobQueue *jq); /* runs until all jobs are done */
LI_API void li_job_init(liJob *job, liJobCB callback);

View File

@ -99,8 +99,8 @@ struct liLogEntry {
};
struct liLogServerData {
struct ev_loop *loop;
ev_async watcher;
liEventLoop loop;
liEventAsync watcher;
liRadixTree *targets; /** const gchar* path => (liLog*) */
liWaitQueue close_queue;
GQueue write_queue;

View File

@ -2,7 +2,7 @@
#define _LIGHTTPD_MEMCACHED_H_
#include <lighttpd/settings.h>
#include <lighttpd/events.h>
#include <lighttpd/buffer.h>
typedef struct liMemcachedCon liMemcachedCon;
@ -21,7 +21,7 @@ typedef void (*liMemcachedCB)(liMemcachedRequest *request, liMemcachedResult res
struct liMemcachedItem {
GString *key;
guint32 flags;
ev_tstamp ttl;
li_tstamp ttl;
guint64 cas;
liBuffer *data;
};
@ -42,13 +42,13 @@ typedef enum {
LI_MEMCACHED_UNKNOWN = 0xff
} liMemcachedError;
LI_API liMemcachedCon* li_memcached_con_new(struct ev_loop *loop, liSocketAddress addr);
LI_API liMemcachedCon* li_memcached_con_new(liEventLoop *loop, liSocketAddress addr);
LI_API void li_memcached_con_acquire(liMemcachedCon* con);
LI_API void li_memcached_con_release(liMemcachedCon* con); /* thread-safe */
/* these functions are not thread-safe, i.e. must be called in the same context as "loop" from li_memcached_con_new */
LI_API liMemcachedRequest* li_memcached_get(liMemcachedCon *con, GString *key, liMemcachedCB callback, gpointer cb_data, GError **err);
LI_API liMemcachedRequest* li_memcached_set(liMemcachedCon *con, GString *key, guint32 flags, ev_tstamp ttl, liBuffer *data, liMemcachedCB callback, gpointer cb_data, GError **err);
LI_API liMemcachedRequest* li_memcached_set(liMemcachedCon *con, GString *key, guint32 flags, li_tstamp ttl, liBuffer *data, liMemcachedCB callback, gpointer cb_data, GError **err);
/* if length(key) <= 250 and all chars x: 0x20 < x < 0x7f the key
* remains untouched; otherwise it gets replaced with its sha1hex hash

View File

@ -28,7 +28,7 @@ typedef enum {
struct liServerSocket {
gint refcount;
liServer *srv;
ev_io watcher;
liEventIO watcher;
liSocketAddress local_addr;
@ -54,7 +54,7 @@ struct liServer {
GMutex *statelock;
GQueue state_wait_queue;
liServerState state_wait_for;
ev_async state_ready_watcher;
liEventAsync state_ready_watcher;
liLuaState LL;
@ -66,13 +66,12 @@ struct liServer {
#endif
GArray *ts_formats; /** array of (GString*), add with li_server_ts_format_add() */
struct ev_loop *loop;
guint loop_flags;
ev_signal
liEventSignal
sig_w_INT,
sig_w_TERM,
sig_w_PIPE;
ev_timer srv_1sec_timer;
liEventTimer srv_1sec_timer;
GPtrArray *sockets; /** array of (server_socket*) */
@ -100,7 +99,7 @@ struct liServer {
liLogServerData logs;
ev_tstamp started;
li_tstamp started;
GString *started_str;
guint connection_load, max_connections;
@ -121,7 +120,7 @@ struct liServer {
LI_API liServer* li_server_new(const gchar *module_dir, gboolean module_resident);
LI_API void li_server_free(liServer* srv);
LI_API gboolean li_server_loop_init(liServer *srv);
LI_API void li_server_loop_init(liServer *srv);
LI_API liServerSocket* li_server_listen(liServer *srv, int fd);

View File

@ -5,7 +5,7 @@
#error Please include <lighttpd/base.h> instead of this file
#endif
#include <lighttpd/jobqueue.h>
#include <lighttpd/events.h>
typedef void (*liStreamCB)(liStream *stream, liStreamEvent event);
@ -17,14 +17,14 @@ struct liStream {
liChunkQueue *out;
liJob new_data_job;
liJobQueue *jobqueue;
liEventLoop *loop;
liStreamCB cb;
};
LI_API const gchar* li_stream_event_string(liStreamEvent event);
LI_API void li_stream_init(liStream* stream, liJobQueue *jobqueue, liStreamCB cb);
LI_API void li_stream_init(liStream* stream, liEventLoop *loop, liStreamCB cb);
LI_API void li_stream_acquire(liStream* stream);
LI_API void li_stream_release(liStream* stream);
INLINE void li_stream_safe_release(liStream** pstream);
@ -42,7 +42,7 @@ LI_API void li_stream_again_later(liStream *stream);
/* detach from jobqueue, stops all event handling. you have to detach all connected streams to move streams between threads */
LI_API void li_stream_detach(liStream *stream);
LI_API void li_stream_attach(liStream *stream, liJobQueue *jobqueue); /* attach to another jobqueue - possibly after switching threads */
LI_API void li_stream_attach(liStream *stream, liEventLoop *loop); /* attach to another loop - possibly after switching threads */
/* walks from first using ->dest until it reaches NULL or (it reached last and NULL != i->limit) or limit == i->cq->limit and
* sets i->cq->limit to limit, triggering LI_STREAM_NEW_CQLIMIT.
@ -53,10 +53,8 @@ LI_API void li_stream_set_cqlimit(liStream *first, liStream *last, liCQLimit *li
/* checks whether all chunkqueues in a range of streams are empty. one of first and last can be NULL to walk all stream in a direction */
LI_API gboolean li_streams_empty(liStream *first, liStream *last);
LI_API liStream* li_stream_plug_new(liJobQueue *jobqueue); /* simple forwarder; can also be used for providing data from memory */
LI_API liStream* li_stream_null_new(liJobQueue *jobqueue); /* eats everything, disconnects source on eof, out is always closed */
LI_API liStream* li_stream_plug_new(liEventLoop *loop); /* simple forwarder; can also be used for providing data from memory */
LI_API liStream* li_stream_null_new(liEventLoop *loop); /* eats everything, disconnects source on eof, out is always closed */
typedef void (*liIOStreamCB)(liIOStream *stream, liIOStreamEvent event);
@ -69,8 +67,7 @@ struct liIOStream {
liWaitQueue *write_timeout_queue;
liWaitQueueElem write_timeout_elem;
liWorker *wrk;
ev_io io_watcher;
liEventIO io_watcher;
/* whether we want to read/write */
gboolean in_closed, out_closed;

View File

@ -2,6 +2,7 @@
#define _LIGHTTPD_TASKLET_H_
#include <lighttpd/settings.h>
#include <lighttpd/events.h>
typedef struct liTaskletPool liTaskletPool;
@ -14,7 +15,7 @@ typedef void (*liTaskletRunCB)(gpointer data);
*/
/* we do not keep the loop alive! */
LI_API liTaskletPool* li_tasklet_pool_new(struct ev_loop *loop, gint threads);
LI_API liTaskletPool* li_tasklet_pool_new(liEventLoop *loop, gint threads);
/* blocks until all tasks are done; calls all finished callbacks;
* you are allowed to call this from finish callbacks, but not more than once!

View File

@ -56,8 +56,6 @@ LI_API gboolean li_querystring_find(const GString *querystring, const gchar *key
/* formats a given guint64 for output. if dest is NULL, a new string is allocated */
LI_API GString *li_counter_format(guint64 count, liCounterType t, GString *dest);
LI_API gchar *li_ev_backend_string(guint backend);
LI_API void li_string_destroy_notify(gpointer str);
LI_API guint li_hash_binary_len(gconstpointer data, gsize len);

View File

@ -2,6 +2,7 @@
#define _LIGHTTPD_WAITQUEUE_H_
#include <lighttpd/settings.h>
#include <lighttpd/events.h>
typedef struct liWaitQueueElem liWaitQueueElem;
typedef struct liWaitQueue liWaitQueue;
@ -9,7 +10,7 @@ typedef void (*liWaitQueueCB) (liWaitQueue *wq, gpointer data);
struct liWaitQueueElem {
gboolean queued;
ev_tstamp ts;
li_tstamp ts;
liWaitQueueElem *prev;
liWaitQueueElem *next;
gpointer data;
@ -18,8 +19,7 @@ struct liWaitQueueElem {
struct liWaitQueue {
liWaitQueueElem *head;
liWaitQueueElem *tail;
ev_timer timer;
struct ev_loop *loop;
liEventTimer timer;
gdouble delay;
liWaitQueueCB callback;
@ -34,7 +34,7 @@ struct liWaitQueue {
*/
/* initializes a waitqueue by creating the ev_timer and initializing the queue. precision is sub-seconds */
LI_API void li_waitqueue_init(liWaitQueue *queue, struct ev_loop *loop, liWaitQueueCB callback, gdouble delay, gpointer data);
LI_API void li_waitqueue_init(liWaitQueue *queue, liEventLoop *loop, liWaitQueueCB callback, gdouble delay, gpointer data);
/* stops the waitqueue. to restart it, simply call li_waitqueue_update */
LI_API void li_waitqueue_stop(liWaitQueue *queue);

View File

@ -6,7 +6,7 @@
#endif
#include <lighttpd/tasklet.h>
#include <lighttpd/jobqueue.h>
#include <lighttpd/events.h>
struct lua_State;
@ -28,7 +28,7 @@ struct liStatistics {
guint64 bytes_in_5s;
guint64 bytes_in_5s_diff;
guint active_cons_5s;
ev_tstamp last_avg;
li_tstamp last_avg;
/* peak values from 5s avg */
struct {
@ -41,17 +41,9 @@ struct liStatistics {
/* updated in timer */
guint64 last_requests;
double requests_per_sec;
ev_tstamp last_update;
li_tstamp last_update;
};
#define CUR_TS(wrk) ev_now((wrk)->loop)
/* only locks if there is more than one worker */
#define WORKER_LOCK(srv, lock) \
if ((srv)->worker_count > 1) g_static_rec_mutex_lock(lock)
#define WORKER_UNLOCK(srv, lock) \
if ((srv)->worker_count > 1) g_static_rec_mutex_unlock(lock)
typedef struct liWorkerTS liWorkerTS;
struct liWorkerTS {
time_t last_generated;
@ -66,10 +58,10 @@ struct liWorker {
liLuaState LL;
struct ev_loop *loop;
ev_prepare loop_prepare;
liEventLoop loop;
liEventPrepare loop_prepare;
/* ev_check loop_check; */
ev_async worker_stop_watcher, worker_stopping_watcher, worker_suspend_watcher, worker_exit_watcher;
liEventAsync worker_stop_watcher, worker_stopping_watcher, worker_suspend_watcher, worker_exit_watcher;
liLogWorkerData logs;
@ -80,12 +72,10 @@ struct liWorker {
GArray *connections; /** array of (connection*), use only from local worker context */
ev_tstamp connections_gc_ts;
GQueue closing_sockets; /** wait for EOF before shutdown(SHUT_RD) and close() */
GString *tmp_str; /**< can be used everywhere for local temporary needed strings */
/* keep alive timeout queue */
ev_timer keep_alive_timer;
liEventTimer keep_alive_timer;
GQueue keep_alive_queue;
liWaitQueue io_timeout_queue;
@ -99,20 +89,18 @@ struct liWorker {
/* incoming queues */
/* - new connections (after accept) */
ev_async new_con_watcher;
liEventAsync new_con_watcher;
GAsyncQueue *new_con_queue;
liServerStateWait wait_for_stop_connections;
ev_timer stats_watcher;
liEventTimer stats_watcher;
liStatistics stats;
/* collect framework */
ev_async collect_watcher;
liEventAsync collect_watcher;
GAsyncQueue *collect_queue;
liJobQueue jobqueue;
liTaskletPool *tasklets;
liStatCache *stat_cache;
@ -121,7 +109,7 @@ struct liWorker {
};
LI_API liWorker* li_worker_new(liServer *srv, struct ev_loop *loop);
LI_API void li_worker_free(liWorker *wrk);
LI_API struct ev_loop* li_worker_free(liWorker *wrk);
LI_API void li_worker_run(liWorker *wrk);
LI_API void li_worker_stop(liWorker *context, liWorker *wrk);
@ -141,4 +129,35 @@ LI_API void li_worker_add_closing_socket(liWorker *wrk, int fd);
/* internal function to recycle connection */
LI_API void li_worker_con_put(liConnection *con);
INLINE li_tstamp li_cur_ts(liWorker *wrk);
INLINE liWorker* li_worker_from_iostream(liIOStream *stream);
INLINE liWorker* li_worker_from_stream(liStream *stream);
/* inline implementations */
INLINE li_tstamp li_cur_ts(liWorker *wrk) {
return li_event_now(&wrk->loop);
}
INLINE liWorker* li_worker_from_stream(liStream *stream) {
if (NULL == stream->loop) return NULL;
return LI_CONTAINER_OF(stream->loop, liWorker, loop);
}
INLINE liWorker* li_worker_from_iostream(liIOStream *stream) {
liWorker* wrk;
wrk = li_worker_from_stream(&stream->stream_in);
if (NULL != wrk) return wrk;
wrk = li_worker_from_stream(&stream->stream_out);
if (NULL != wrk) return wrk;
{
liEventLoop *loop = li_event_get_loop(&stream->io_watcher);
if (NULL == loop) return NULL;
return LI_CONTAINER_OF(loop, liWorker, loop);
}
}
#endif

View File

@ -178,6 +178,7 @@ SET(COMMON_SRC
angel_data.c
buffer.c
encoding.c
events.c
idlist.c
ip_parsers.c
jobqueue.c

View File

@ -32,24 +32,24 @@ void li_log_write(liServer *srv, liLogLevel log_level, guint flags, const gchar
/* for normal error messages, we prepend a timestamp */
if (flags & LI_LOG_FLAG_TIMESTAMP) {
GString *log_ts = srv->log.ts_cache;
time_t cur_ts;
time_t li_cur_ts;
cur_ts = (time_t)ev_now(srv->loop);
li_cur_ts = (time_t)li_event_now(&srv->loop);
if (cur_ts != srv->log.last_ts) {
if (li_cur_ts != srv->log.last_ts) {
gsize s;
struct tm tm;
g_string_set_size(log_ts, 255);
#ifdef HAVE_LOCALTIME_R
s = strftime(log_ts->str, log_ts->allocated_len, "%Y-%m-%d %H:%M:%S %Z: ", localtime_r(&cur_ts, &tm));
s = strftime(log_ts->str, log_ts->allocated_len, "%Y-%m-%d %H:%M:%S %Z: ", localtime_r(&li_cur_ts, &tm));
#else
s = strftime(log_ts->str, log_ts->allocated_len, "%Y-%m-%d %H:%M:%S %Z: ", localtime(&cur_ts));
s = strftime(log_ts->str, log_ts->allocated_len, "%Y-%m-%d %H:%M:%S %Z: ", localtime(&li_cur_ts));
#endif
g_string_set_size(log_ts, s);
srv->log.last_ts = cur_ts;
srv->log.last_ts = li_cur_ts;
}
g_string_append_len(log_line, GSTR_LEN(log_ts));

View File

@ -86,7 +86,7 @@ int main(int argc, char *argv[]) {
INFO(srv, "%s", "parsed config file");
ev_loop(srv->loop, 0);
li_event_loop_run(&srv->loop);
INFO(srv, "%s", "going down");

View File

@ -673,7 +673,7 @@ static void core_free(liServer *srv, liPlugin *p) {
liPluginCoreConfig *config = (liPluginCoreConfig*) p->data;
guint i;
li_ev_safe_ref_and_stop(ev_signal_stop, srv->loop, &config->sig_hup);
li_event_clear(&config->sig_hup);
core_clean(srv, p);
@ -768,11 +768,10 @@ static void core_instance_replaced(liServer *srv, liPlugin *p, liInstance *oldi,
}
}
static void core_handle_sig_hup(struct ev_loop *loop, ev_signal *w, int revents) {
liPluginCoreConfig *config = w->data;
static void core_handle_sig_hup(liEventBase *watcher, int events) {
liPluginCoreConfig *config = LI_CONTAINER_OF(li_event_signal_from(watcher), liPluginCoreConfig, sig_hup);
liInstance *oldi, *newi;
UNUSED(loop);
UNUSED(revents);
UNUSED(events);
if (NULL == (oldi = config->inst)) return;
@ -804,10 +803,7 @@ static gboolean core_init(liServer *srv, liPlugin *p) {
li_angel_plugin_add_angel_cb(p, "reached-state", core_reached_state);
li_angel_plugin_add_angel_cb(p, "log-open-file", core_log_open_file);
ev_signal_init(&config->sig_hup, core_handle_sig_hup, SIGHUP);
config->sig_hup.data = config;
ev_signal_start(srv->loop, &config->sig_hup);
ev_unref(srv->loop);
li_event_signal_init(&srv->loop, &config->sig_hup, core_handle_sig_hup, SIGHUP);
return TRUE;
}

View File

@ -63,15 +63,14 @@ static void read_pipe(liServer *srv, liErrorPipe *epipe, gboolean flush) {
return;
close_epipe:
ev_io_stop(srv->loop, &epipe->fd_watcher);
li_event_stop(&epipe->fd_watcher);
close(epipe->fds[0]);
epipe->fds[0] = -1;
}
static void error_pipe_cb(struct ev_loop *loop, ev_io *w, int revents) {
liErrorPipe *epipe = w->data;
UNUSED(loop);
UNUSED(revents);
static void error_pipe_cb(liEventBase *watcher, int events) {
liErrorPipe *epipe = LI_CONTAINER_OF(li_event_io_from(watcher), liErrorPipe, fd_watcher);
UNUSED(events);
read_pipe(epipe->srv, epipe, FALSE);
}
@ -89,8 +88,7 @@ liErrorPipe* li_error_pipe_new(liServer *srv, liErrorPipeCB cb, gpointer ctx) {
epipe->srv = srv;
epipe->cb = cb;
epipe->ctx = ctx;
ev_io_init(&epipe->fd_watcher, error_pipe_cb, fds[0], EV_READ);
epipe->fd_watcher.data = epipe;
li_event_io_init(&srv->loop, &epipe->fd_watcher, error_pipe_cb, fds[0], LI_EV_READ);
epipe->fds[0] = fds[0];
epipe->fds[1] = fds[1];
@ -100,9 +98,7 @@ liErrorPipe* li_error_pipe_new(liServer *srv, liErrorPipeCB cb, gpointer ctx) {
}
void li_error_pipe_free(liErrorPipe *epipe) {
liServer *srv = epipe->srv;
ev_io_stop(srv->loop, &epipe->fd_watcher);
li_event_clear(&epipe->fd_watcher);
li_error_pipe_flush(epipe);
if (-1 != epipe->fds[0]) { close(epipe->fds[0]); epipe->fds[0] = -1; }
if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; }
@ -112,10 +108,8 @@ void li_error_pipe_free(liErrorPipe *epipe) {
/** closes out-fd */
void li_error_pipe_activate(liErrorPipe *epipe) {
liServer *srv = epipe->srv;
if (-1 != epipe->fds[1]) { close(epipe->fds[1]); epipe->fds[1] = -1; }
ev_io_start(srv->loop, &epipe->fd_watcher);
li_event_start(&epipe->fd_watcher);
}
/** closes in-fd, moves out-fd to dest_fd */

View File

@ -1,38 +1,26 @@
#include <lighttpd/angel_base.h>
#define CATCH_SIGNAL(loop, cb, n) do { \
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
/* Signal watchers shouldn't keep loop alive */ \
ev_unref(loop); \
} while (0)
#define UNCATCH_SIGNAL(loop, n) li_ev_safe_ref_and_stop(ev_signal_stop, loop, &srv->sig_w_##n)
static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
liServer *srv = (liServer*) w->data;
UNUSED(loop);
UNUSED(revents);
static void sigint_cb(liEventBase *watcher, int events) {
liServer *srv = LI_CONTAINER_OF(li_event_get_loop_(watcher), liServer, loop);
UNUSED(events);
li_server_stop(srv);