diff --git a/src/main/worker.c b/src/main/worker.c index 7c2e82f..6f1d920 100644 --- a/src/main/worker.c +++ b/src/main/worker.c @@ -14,19 +14,58 @@ struct worker_closing_socket { liWorker *wrk; GList *link; int fd; + ev_tstamp close_timeout; }; -static void worker_closing_socket_cb(int revents, void* arg) { - worker_closing_socket *scs = (worker_closing_socket*) arg; - UNUSED(revents); +static void worker_close_socket_now(worker_closing_socket *scs) { + liWorker *wrk = scs->wrk; - /* Whatever happend: we just close the socket */ shutdown(scs->fd, SHUT_RD); close(scs->fd); - g_queue_delete_link(&scs->wrk->closing_sockets, scs->link); + g_queue_delete_link(&wrk->closing_sockets, scs->link); g_slice_free(worker_closing_socket, scs); } +static void worker_closing_socket_cb(int revents, void* arg) { + worker_closing_socket *scs = (worker_closing_socket*) arg; + liWorker *wrk = scs->wrk; + ssize_t r; + UNUSED(revents); + + /* empty the input buffer, wait for EOF or timeout or a socket error to close it */ + g_string_set_size(wrk->tmp_str, 1024); + for (;;) { + r = read(scs->fd, wrk->tmp_str->str, wrk->tmp_str->len); + if (0 == r) break; /* got EOF */ + if (0 > r) { /* error */ + switch (errno) { + case EINTR: + /* call read again */ + continue; + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + /* check timeout: */ + if (!(revents & EV_TIMEOUT)) { + /* wait again */ + ev_once(wrk->loop, scs->fd, EV_READ, scs->close_timeout - ev_now(wrk->loop), worker_closing_socket_cb, scs); + return; + } + /* timeout reached, break switch and loop */ + break; + default: + /* real error (probably ECONNRESET or similar): break switch and loop */ + /* no logging: there is no context anymore for the socket */ + break; + } + break; /* end loop */ + } + } + + worker_close_socket_now(scs); +} + void li_worker_add_closing_socket(liWorker *wrk, int fd) { worker_closing_socket *scs; @@ -42,13 +81,14 @@ void li_worker_add_closing_socket(liWorker *wrk, int fd) { scs->fd = fd; g_queue_push_tail(&wrk->closing_sockets, scs); scs->link = g_queue_peek_tail_link(&wrk->closing_sockets); + scs->close_timeout = ev_now(wrk->loop) + 10.0; ev_once(wrk->loop, fd, EV_READ, 10.0, worker_closing_socket_cb, scs); } /* Kill it - frees fd */ static void worker_rem_closing_socket(liWorker *wrk, worker_closing_socket *scs) { - ev_feed_fd_event(wrk->loop, scs->fd, EV_READ); + ev_feed_fd_event(wrk->loop, scs->fd, EV_TIMEOUT); } /* Keep alive */ @@ -387,7 +427,7 @@ void li_worker_free(liWorker *wrk) { { /* force closing sockets */ GList *iter; for (iter = g_queue_peek_head_link(&wrk->closing_sockets); iter; iter = g_list_next(iter)) { - worker_closing_socket_cb(EV_TIMEOUT, (worker_closing_socket*) iter->data); + worker_close_socket_now((worker_closing_socket*) iter->data); } g_queue_clear(&wrk->closing_sockets); }