Add li_ev_safe_ref_and_stop macro and use it

personal/stbuehler/wip
Stefan Bühler 14 years ago
parent 30f804170c
commit 594987c29e

@ -28,6 +28,13 @@ LI_API void li_ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events
LI_API void li_ev_io_rem_events(struct ev_loop *loop, ev_io *watcher, int events);
LI_API void li_ev_io_set_events(struct ev_loop *loop, ev_io *watcher, int events);
#define li_ev_safe_ref_and_stop(stopf, loop, watcher) do { \
ev_watcher *__w = (ev_watcher*) watcher; \
if (ev_is_active(__w)) { \
ev_ref(loop); \
stopf(loop, watcher); \
} \
} while (0)
/* URL inplace decode: replace %XX with character \xXX; replace control characters with '_' (< 32 || == 127) */
LI_API void li_url_decode(GString *path);

@ -316,8 +316,8 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents)
#endif
goto write_eagain;
default: /* Fatal error, connection has to be closed */
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, loop, &acon->out_notify_watcher);
li_ev_safe_ref_and_stop(ev_io_stop, loop, &acon->fd_watcher);
acon->close_cb(acon, NULL); /* TODO: set err */
return;
}
@ -336,8 +336,8 @@ static void angel_connection_io_cb(struct ev_loop *loop, ev_io *w, int revents)
send_item->value.fds.pos++;
continue;
case -1: /* Fatal error, connection has to be closed */
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, loop, &acon->out_notify_watcher);
li_ev_safe_ref_and_stop(ev_io_stop, loop, &acon->fd_watcher);
acon->close_cb(acon, NULL); /* TODO: set err */
return;
case -2: goto write_eagain;
@ -366,8 +366,8 @@ write_eagain:
if (revents | EV_READ) {
GError *err = NULL;
if (!angel_connection_read(acon, &err)) {
ev_async_stop(loop, &acon->out_notify_watcher);
ev_io_stop(loop, &acon->fd_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, loop, &acon->out_notify_watcher);
li_ev_safe_ref_and_stop(ev_io_stop, loop, &acon->fd_watcher);
acon->close_cb(acon, err);
}
}
@ -390,12 +390,17 @@ liAngelConnection* li_angel_connection_new(struct ev_loop *loop, int fd, gpointe
acon->fd = fd;
acon->call_id_list = li_idlist_new(65535);
acon->call_table = g_ptr_array_new();
ev_io_init(&acon->fd_watcher, angel_connection_io_cb, fd, EV_READ);
ev_io_start(acon->loop, &acon->fd_watcher);
acon->fd_watcher.data = acon;
ev_unref(acon->loop); /* this watcher shouldn't keep the loop alive */
ev_async_init(&acon->out_notify_watcher, angel_connection_out_notify_cb);
ev_async_start(acon->loop, &acon->out_notify_watcher);
acon->out_notify_watcher.data = acon;
ev_unref(acon->loop); /* this watcher shouldn't keep the loop alive */
acon->out = g_queue_new();
acon->in.data = g_string_sized_new(1024);
acon->in.pos = 0;
@ -442,8 +447,8 @@ void li_angel_connection_free(liAngelConnection *acon) {
g_mutex_free(acon->mutex);
acon->mutex = NULL;
ev_io_stop(acon->loop, &acon->fd_watcher);
ev_async_stop(acon->loop, &acon->out_notify_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, acon->loop, &acon->out_notify_watcher);
li_ev_safe_ref_and_stop(ev_io_stop, acon->loop, &acon->fd_watcher);
li_idlist_free(acon->call_id_list);
while (NULL != (send_item = g_queue_pop_head(acon->out))) {

@ -45,18 +45,16 @@ static void server_setup_free(gpointer _ss) {
g_slice_free(liServerSetup, _ss);
}
#define CATCH_SIGNAL(loop, cb, n) do {\
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
ev_unref(loop); /* Signal watchers shouldn't keep loop alive */ \
#define CATCH_SIGNAL(loop, cb, n) do { \
ev_init(&srv->sig_w_##n, cb); \
ev_signal_set(&srv->sig_w_##n, SIG##n); \
ev_signal_start(loop, &srv->sig_w_##n); \
srv->sig_w_##n.data = srv; \
/* Signal watchers shouldn't keep loop alive */ \
ev_unref(loop); \
} while (0)
#define UNCATCH_SIGNAL(loop, n) do {\
ev_ref(loop); \
ev_signal_stop(loop, &srv->sig_w_##n); \
} while (0)
#define UNCATCH_SIGNAL(loop, n) li_ev_safe_ref_and_stop(ev_signal_stop, loop, &srv->sig_w_##n)
static void sigint_cb(struct ev_loop *loop, struct ev_signal *w, int revents) {
liServer *srv = (liServer*) w->data;

@ -50,7 +50,7 @@ void stat_cache_free(liStatCache *sc) {
g_thread_join(sc->thread);
g_slice_free(liStatCacheEntry, dummy);
ev_async_stop(sc->delete_queue.loop, &sc->job_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, sc->delete_queue.loop, &sc->job_watcher);
/* clear cache */
g_hash_table_iter_init(&iter, sc->entries);

@ -392,8 +392,7 @@ void li_worker_free(liWorker *wrk) {
g_queue_clear(&wrk->closing_sockets);
}
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->job_async_queue_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->job_async_queue_watcher);
{ /* free timestamps */
guint i;
@ -405,8 +404,7 @@ void li_worker_free(liWorker *wrk) {
g_array_free(wrk->timestamps_local, TRUE);
}
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->li_worker_exit_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->li_worker_exit_watcher);
{
GAsyncQueue *q = wrk->job_async_queue;
@ -426,11 +424,9 @@ void li_worker_free(liWorker *wrk) {
g_async_queue_unref(wrk->new_con_queue);
ev_ref(wrk->loop);
ev_timer_stop(wrk->loop, &wrk->stats_watcher);
li_ev_safe_ref_and_stop(ev_timer_stop, wrk->loop, &wrk->stats_watcher);
ev_ref(wrk->loop);
ev_async_stop(wrk->loop, &wrk->collect_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->collect_watcher);
li_collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0);
g_async_queue_unref(wrk->collect_queue);

Loading…
Cancel
Save