Optimize IO handling to safe syscalls; run vrequest state machines twice before looking for new io events

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent b876f8401d
commit b87e74d151
  1. 7
      include/lighttpd/connection.h
  2. 5
      include/lighttpd/typedefs.h
  3. 2
      include/lighttpd/worker.h
  4. 247
      src/main/connection.c
  5. 5
      src/main/network.c
  6. 1
      src/main/virtualrequest.c
  7. 105
      src/main/worker.c

@ -41,8 +41,6 @@ struct liConnection {
liChunkQueue *in, *out; /* link to mainvr->in/out */
liBuffer *raw_in_buffer;
ev_io sock_watcher;
liVRequest *mainvr;
liHttpRequestCtx req_parser_ctx;
@ -57,6 +55,9 @@ struct liConnection {
} keep_alive_data;
guint keep_alive_requests;
ev_io sock_watcher;
gboolean can_read, can_write;
/* I/O timeout data */
liWaitQueueElem io_timeout_elem;
};
@ -71,6 +72,8 @@ LI_API void li_connection_reset(liConnection *con);
/** aborts an active connection, calls all plugin cleanup handlers */
LI_API void li_connection_error(liConnection *con); /* used in worker.c */
LI_API void li_connection_start(liConnection *con, liSocketAddress remote_addr, int s, liServerSocket *srv_sock);
/* public function */
LI_API gchar *li_connection_state_str(liConnectionState state);

@ -116,11 +116,10 @@ typedef enum {
/* network.h */
typedef enum {
LI_NETWORK_STATUS_SUCCESS, /**< some IO was actually done (read/write) or cq was empty for write */
LI_NETWORK_STATUS_SUCCESS, /**< socket probably could have done more */
LI_NETWORK_STATUS_FATAL_ERROR,
LI_NETWORK_STATUS_CONNECTION_CLOSE,
LI_NETWORK_STATUS_WAIT_FOR_EVENT /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK; no real IO was done
internal: some io may be done */
LI_NETWORK_STATUS_WAIT_FOR_EVENT /**< read/write returned -1 with errno=EAGAIN/EWOULDBLOCK */
} liNetworkStatus;
/* options.h */

@ -66,7 +66,7 @@ struct liWorker {
struct ev_loop *loop;
ev_prepare loop_prepare;
ev_check loop_check;
ev_async li_worker_stop_watcher, li_worker_suspend_watcher, li_worker_exit_watcher;
ev_async worker_stop_watcher, worker_suspend_watcher, worker_exit_watcher;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections
* use with atomic, read direct from local worker context

@ -8,11 +8,11 @@ static void li_connection_internal_error(liConnection *con);
static void update_io_events(liConnection *con) {
int events = 0;
if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) {
if (!con->can_read && (con->state != LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) {
events = events | EV_READ;
}
if (con->raw_out->length > 0) {
if (!con->can_write && con->raw_out->length > 0) {
if (!con->mainvr->throttled || con->mainvr->throttle.magazine > 0) {
events = events | EV_WRITE;
}
@ -266,104 +266,135 @@ static gboolean connection_handle_read(liConnection *con) {
return TRUE;
}
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
static void connection_update_io_timeout(liConnection *con) {
liWorker *wrk = con->wrk;
if ((con->io_timeout_elem.ts + 1.0) < ev_now(wrk->loop)) {
li_waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem);
}
}
static gboolean connection_try_read(liConnection *con) {
liNetworkStatus res;
liConnection *con = (liConnection*) w->data;
gboolean update_io_timeout = FALSE;
/* ensure that the connection is always in the io timeout queue */
if (!con->io_timeout_elem.queued)
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
// con->can_read = TRUE;
if (!con->in->is_closed) {
goffset transferred;
transferred = con->raw_in->length;
if (con->srv_sock->read_cb) {
res = con->srv_sock->read_cb(con);
} else {
res = li_network_read(con->mainvr, con->sock_watcher.fd, con->raw_in, &con->raw_in_buffer);
}
transferred = con->raw_in->length - transferred;
if (transferred > 0) connection_update_io_timeout(con);
li_vrequest_update_stats_in(con->mainvr, transferred);
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
con->can_read = FALSE; /* for now we still need the EV_READ event to get a callback */
if (!connection_handle_read(con)) return FALSE;
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network read fatal error");
li_connection_error(con);
return FALSE;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
con->raw_in->is_closed = TRUE;
/* shutdown(con->sock_watcher.fd, SHUT_RD); */ /* useless anyway */
ev_io_stop(con->wrk->loop, &con->sock_watcher);
close(con->sock_watcher.fd);
ev_io_set(&con->sock_watcher, -1, 0);
connection_close(con);
return FALSE;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
con->can_read = FALSE;
break;
}
}
return TRUE;
}
static gboolean connection_try_write(liConnection *con) {
liNetworkStatus res;
if (revents & EV_READ) {
if (!con->in->is_closed) {
goffset transferred;
transferred = con->raw_in->length;
con->can_write = TRUE;
if (con->srv_sock->read_cb) {
res = con->srv_sock->read_cb(con);
if (con->raw_out->length > 0) {
goffset transferred;
static const goffset WRITE_MAX = 256*1024; /* 256kB */
goffset write_max;
if (con->mainvr->throttled) {
write_max = MIN(con->mainvr->throttle.magazine, WRITE_MAX);
} else {
write_max = WRITE_MAX;
}
if (write_max > 0) {
transferred = con->raw_out->length;
if (con->srv_sock->write_cb) {
res = con->srv_sock->write_cb(con, write_max);
} else {
res = li_network_read(con->mainvr, w->fd, con->raw_in, &con->raw_in_buffer);
res = li_network_write(con->mainvr, con->sock_watcher.fd, con->raw_out, write_max);
}
transferred = con->raw_in->length - transferred;
update_io_timeout = update_io_timeout || (transferred > 0);
li_vrequest_update_stats_in(con->mainvr, transferred);
transferred = transferred - con->raw_out->length;
con->info.out_queue_length = con->raw_out->length;
if (transferred > 0) {
connection_update_io_timeout(con);
li_vrequest_joblist_append(con->mainvr);
}
con->can_write = FALSE; /* for now we still need the EV_WRITE event to get a callback */
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
if (0 == transferred) break;
if (!connection_handle_read(con)) return;
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network read fatal error");
_ERROR(con->srv, con->mainvr, "%s", "network write fatal error");
li_connection_error(con);
return;
return FALSE;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
con->raw_in->is_closed = TRUE;
shutdown(w->fd, SHUT_RD);
connection_close(con);
return;
return FALSE;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
}
} else {
transferred = 0;
}
}
if (revents & EV_WRITE) {
if (con->raw_out->length > 0) {
goffset transferred;
static const goffset WRITE_MAX = 256*1024; /* 256kB */
goffset write_max;
li_vrequest_update_stats_out(con->mainvr, transferred);
if (con->mainvr->throttled) {
write_max = MIN(con->mainvr->throttle.magazine, WRITE_MAX);
} else {
write_max = WRITE_MAX;
}
if (con->mainvr->throttled) {
li_throttle_update(con->mainvr, transferred, WRITE_MAX);
}
}
if (write_max > 0) {
transferred = con->raw_out->length;
return TRUE;
}
if (con->srv_sock->write_cb) {
res = con->srv_sock->write_cb(con, write_max);
} else {
res = li_network_write(con->mainvr, w->fd, con->raw_out, write_max);
}
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
liConnection *con = (liConnection*) w->data;
UNUSED(loop);
transferred = transferred - con->raw_out->length;
con->info.out_queue_length = con->raw_out->length;
update_io_timeout = update_io_timeout || (transferred > 0);
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
if (0 == transferred) break;
li_vrequest_joblist_append(con->mainvr);
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network write fatal error");
li_connection_error(con);
return;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
connection_close(con);
return;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
}
} else {
transferred = 0;
}
/* ensure that the connection is always in the io timeout queue */
if (!con->io_timeout_elem.queued)
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
li_vrequest_update_stats_out(con->mainvr, transferred);
if (revents & EV_READ) con->can_read = TRUE;
if (revents & EV_WRITE) con->can_write = TRUE;
if (con->mainvr->throttled) {
li_throttle_update(con->mainvr, transferred, WRITE_MAX);
}
} else {
_DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue");
}
}
if (con->can_read)
if (!connection_try_read(con)) return;
if (con->can_write)
if (!connection_try_write(con)) return;
if (revents & EV_ERROR) {
/* if this happens, we have a serious bug in the event handling */
@ -372,10 +403,6 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
return;
}
if (update_io_timeout && ((con->io_timeout_elem.ts + 1.0) < ev_now(loop))) {
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
}
if (!check_response_done(con)) return;
update_io_events(con);
@ -392,7 +419,15 @@ static liHandlerResult mainvr_handle_response_headers(liVRequest *vr) {
if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
VR_DEBUG(vr, "%s", "read request/handle response header");
}
if (con->can_read)
if (!connection_try_read(con)) return FALSE;
parse_request_body(con);
if (con->can_write)
if (!connection_try_write(con)) return FALSE;
update_io_events(con);
return LI_HANDLER_GO_ON;
@ -406,9 +441,15 @@ static liHandlerResult mainvr_handle_response_body(liVRequest *vr) {
VR_DEBUG(vr, "%s", "write response");
}
if (con->can_read)
if (!connection_try_read(con)) return FALSE;
parse_request_body(con);
forward_response_body(con);
if (con->can_write)
if (!connection_try_write(con)) return FALSE;
if (!check_response_done(con)) return LI_HANDLER_GO_ON;
update_io_events(con);
@ -420,6 +461,12 @@ static liHandlerResult mainvr_handle_response_error(liVRequest *vr) {
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
li_connection_internal_error(con);
if (con->can_read)
if (!connection_try_read(con)) return FALSE;
if (con->can_write)
if (!connection_try_write(con)) return FALSE;
update_io_events(con);
return LI_HANDLER_GO_ON;
@ -429,6 +476,9 @@ static liHandlerResult mainvr_handle_request_headers(liVRequest *vr) {
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
/* start reading input */
if (con->can_read)
if (!connection_try_read(con)) return FALSE;
parse_request_body(con);
update_io_events(con);
@ -438,6 +488,11 @@ static liHandlerResult mainvr_handle_request_headers(liVRequest *vr) {
static gboolean mainvr_handle_check_io(liVRequest *vr) {
liConnection *con = LI_CONTAINER_OF(vr->coninfo, liConnection, info);
if (con->can_read)
if (!connection_try_read(con)) return FALSE;
if (con->can_write)
if (!connection_try_write(con)) return FALSE;
update_io_events(con);
return TRUE;
@ -492,6 +547,8 @@ liConnection* li_connection_new(liWorker *wrk) {
ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb);
con->keep_alive_data.watcher.data = con;
con->can_read = con->can_write = TRUE;
con->io_timeout_elem.data = con;
return con;
@ -512,8 +569,8 @@ void li_connection_reset(liConnection *con) {
ev_io_stop(con->wrk->loop, &con->sock_watcher);
if (con->sock_watcher.fd != -1) {
if (con->raw_in->is_closed) { /* read already shutdown */
shutdown(con->sock_watcher.fd, SHUT_WR);
if (con->raw_in->is_closed) { /* read already got EOF */
shutdown(con->sock_watcher.fd, SHUT_RDWR);
close(con->sock_watcher.fd);
} else {
li_worker_add_closing_socket(con->wrk, con->sock_watcher.fd);
@ -565,6 +622,8 @@ void li_connection_reset(liConnection *con) {
con->info.stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
con->info.stats.last_avg = 0;
con->can_read = con->can_write = TRUE;
/* remove from timeout queue */
li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
}
@ -681,6 +740,34 @@ void li_connection_free(liConnection *con) {
g_slice_free(liConnection, con);
}
void li_connection_start(liConnection *con, liSocketAddress remote_addr, int s, liServerSocket *srv_sock) {
ev_io_set(&con->sock_watcher, s, 0);
con->srv_sock = srv_sock;
con->state = LI_CON_STATE_REQUEST_START;
con->mainvr->ts_started = con->ts_started = CUR_TS(con->wrk);
con->info.remote_addr = remote_addr;
li_sockaddr_to_string(remote_addr, con->info.remote_addr_str, FALSE);
con->info.local_addr = li_sockaddr_local_from_socket(s);
li_sockaddr_to_string(con->info.local_addr, con->info.local_addr_str, FALSE);
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
if (srv_sock->new_cb) {
if (!srv_sock->new_cb(con)) {
li_connection_error(con);
return;
}
}
if (con->can_read)
if (!connection_try_read(con)) return;
update_io_events(con);
}
gchar *li_connection_state_str(liConnectionState state) {
static const gchar *states[] = {
"dead",

@ -61,7 +61,6 @@ liNetworkStatus li_network_write(liVRequest *vr, int fd, liChunkQueue *cq, goffs
res = li_network_write_writev(vr, fd, cq, &write_bytes);
#endif
wrote = write_max - write_bytes;
if (wrote > 0 && res == LI_NETWORK_STATUS_WAIT_FOR_EVENT) res = LI_NETWORK_STATUS_SUCCESS;
#ifdef TCP_CORK
if (corked) {
@ -136,7 +135,7 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq, liBuff
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return len ? LI_NETWORK_STATUS_SUCCESS : LI_NETWORK_STATUS_WAIT_FOR_EVENT;
return LI_NETWORK_STATUS_WAIT_FOR_EVENT;
case ECONNRESET:
case ETIMEDOUT:
return LI_NETWORK_STATUS_CONNECTION_CLOSE;
@ -146,7 +145,7 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq, liBuff
}
} else if (0 == r) {
if (buffer == NULL && !cq_buf_append) li_buffer_release(buf);
return len ? LI_NETWORK_STATUS_SUCCESS : LI_NETWORK_STATUS_CONNECTION_CLOSE;
return LI_NETWORK_STATUS_CONNECTION_CLOSE;
}
if (cq_buf_append) {
li_chunkqueue_update_last_buffer_size(cq, r);

@ -651,7 +651,6 @@ void li_vrequest_joblist_append(liVRequest *vr) {
GQueue *const q = &wrk->job_queue;
if (!g_atomic_int_compare_and_exchange(&vr->queued, 0, 1)) return; /* already in queue */
g_queue_push_tail_link(q, &vr->job_queue_link);
ev_timer_start(wrk->loop, &wrk->job_queue_watcher);
}
void li_vrequest_joblist_append_async(liVRequestRef *vr_ref) {

@ -25,7 +25,7 @@ struct worker_closing_socket {
static void worker_close_socket_now(worker_closing_socket *scs) {
liWorker *wrk = scs->wrk;
shutdown(scs->fd, SHUT_RD);
/* shutdown(scs->fd, SHUT_RD); */ /* useless anyway */
close(scs->fd);
g_queue_delete_link(&wrk->closing_sockets, scs->link);
g_slice_free(worker_closing_socket, scs);
@ -168,22 +168,46 @@ static void worker_io_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents)
li_waitqueue_update(&wrk->io_timeout_queue);
}
static void worker_job_queue(liWorker *wrk, int loops) {
int i;
for (i = 0; i < loops; i++) {
GQueue q = wrk->job_queue;
GList *l;
liVRequest *vr;
if (q.length == 0) return;
g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */
while (NULL != (l = g_queue_pop_head_link(&q))) {
vr = l->data;
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
li_vrequest_state_machine(vr);
}
}
if (wrk->job_queue.length > 0) {
/* make sure we will run again soon */
ev_timer_start(wrk->loop, &wrk->job_queue_watcher);
}
}
/* run vreqest state machine */
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
static void li_worker_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
GQueue q = wrk->job_queue;
GList *l;
liVRequest *vr;
UNUSED(loop);
UNUSED(revents);
g_queue_init(&wrk->job_queue); /* reset queue, elements are in q */
worker_job_queue(wrk, 3);
}
static void worker_job_queue_cb(struct ev_loop *loop, ev_timer *w, int revents) {
UNUSED(loop);
UNUSED(revents);
UNUSED(w);
while (NULL != (l = g_queue_pop_head_link(&q))) {
vr = l->data;
g_assert(g_atomic_int_compare_and_exchange(&vr->queued, 1, 0));
li_vrequest_state_machine(vr);
}
/* just kept loop alive, call state machines in prepare */
}
/* run vreqest state machine for async queued jobs */
@ -281,27 +305,7 @@ void li_worker_new_con(liWorker *ctx, liWorker *wrk, liSocketAddress remote_addr
if (ctx == wrk) {
liConnection *con = worker_con_get(wrk);
con->srv_sock = srv_sock;
con->state = LI_CON_STATE_REQUEST_START;
ev_io_set(&con->sock_watcher, s, EV_READ);
ev_io_start(wrk->loop, &con->sock_watcher);
con->ts_started = CUR_TS(wrk);
con->mainvr->ts_started = CUR_TS(wrk);
con->info.remote_addr = remote_addr;
li_sockaddr_to_string(remote_addr, con->info.remote_addr_str, FALSE);
con->info.local_addr = li_sockaddr_local_from_socket(s);
li_sockaddr_to_string(con->info.local_addr, con->info.local_addr_str, FALSE);
li_waitqueue_push(&wrk->io_timeout_queue, &con->io_timeout_elem);
if (srv_sock->new_cb) {
if (!srv_sock->new_cb(con)) {
li_connection_error(con);
}
}
li_connection_start(con, remote_addr, s, srv_sock);
} else {
li_worker_new_con_data *d = g_slice_new(li_worker_new_con_data);
d->remote_addr = remote_addr;
@ -409,18 +413,23 @@ liWorker* li_worker_new(liServer *srv, struct ev_loop *loop) {
g_array_index(wrk->timestamps_local, liWorkerTS, i).str = g_string_sized_new(255);
}
ev_init(&wrk->li_worker_exit_watcher, li_worker_exit_cb);
wrk->li_worker_exit_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->li_worker_exit_watcher);
ev_init(&wrk->loop_prepare, li_worker_prepare_cb);
wrk->loop_prepare.data = wrk;
ev_prepare_start(wrk->loop, &wrk->loop_prepare);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&wrk->worker_exit_watcher, li_worker_exit_cb);
wrk->worker_exit_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_exit_watcher);
ev_unref(wrk->loop); /* this watcher shouldn't keep the loop alive */
ev_init(&wrk->li_worker_stop_watcher, li_worker_stop_cb);
wrk->li_worker_stop_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->li_worker_stop_watcher);
ev_init(&wrk->worker_stop_watcher, li_worker_stop_cb);
wrk->worker_stop_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_stop_watcher);
ev_init(&wrk->li_worker_suspend_watcher, li_worker_suspend_cb);
wrk->li_worker_suspend_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->li_worker_suspend_watcher);
ev_init(&wrk->worker_suspend_watcher, li_worker_suspend_cb);
wrk->worker_suspend_watcher.data = wrk;
ev_async_start(wrk->loop, &wrk->worker_suspend_watcher);
ev_init(&wrk->new_con_watcher, li_worker_new_con_cb);
wrk->new_con_watcher.data = wrk;
@ -498,7 +507,7 @@ void li_worker_free(liWorker *wrk) {
g_array_free(wrk->timestamps_local, TRUE);
}
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->li_worker_exit_watcher);
li_ev_safe_ref_and_stop(ev_async_stop, wrk->loop, &wrk->worker_exit_watcher);
{
GAsyncQueue *q = wrk->job_async_queue;
@ -524,6 +533,8 @@ void li_worker_free(liWorker *wrk) {
li_collect_watcher_cb(wrk->loop, &wrk->collect_watcher, 0);
g_async_queue_unref(wrk->collect_queue);
li_ev_safe_ref_and_stop(ev_prepare_stop, wrk->loop, &wrk->loop_prepare);
g_string_free(wrk->tmp_str, TRUE);
li_stat_cache_free(wrk->stat_cache);
@ -569,8 +580,8 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
li_plugins_worker_stop(wrk);
ev_async_stop(wrk->loop, &wrk->li_worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->li_worker_suspend_watcher);
ev_async_stop(wrk->loop, &wrk->worker_stop_watcher);
ev_async_stop(wrk->loop, &wrk->worker_suspend_watcher);
ev_async_stop(wrk->loop, &wrk->new_con_watcher);
li_waitqueue_stop(&wrk->io_timeout_queue);
li_waitqueue_stop(&wrk->throttle_queue);
@ -595,7 +606,7 @@ void li_worker_stop(liWorker *context, liWorker *wrk) {
}
}
} else {
ev_async_send(wrk->loop, &wrk->li_worker_stop_watcher);
ev_async_send(wrk->loop, &wrk->worker_stop_watcher);
}
}
@ -628,7 +639,7 @@ void li_worker_suspend(liWorker *context, liWorker *wrk) {
}
#endif
} else {
ev_async_send(wrk->loop, &wrk->li_worker_suspend_watcher);
ev_async_send(wrk->loop, &wrk->worker_suspend_watcher);
}
}
@ -636,7 +647,7 @@ void li_worker_exit(liWorker *context, liWorker *wrk) {
if (context == wrk) {
ev_unloop (wrk->loop, EVUNLOOP_ALL);
} else {
ev_async_send(wrk->loop, &wrk->li_worker_exit_watcher);
ev_async_send(wrk->loop, &wrk->worker_exit_watcher);
}
}

Loading…
Cancel
Save