diff --git a/src/connection.c b/src/connection.c index 3cdb25a..10f4d7f 100644 --- a/src/connection.c +++ b/src/connection.c @@ -130,11 +130,17 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) { joblist_append(srv, con); } +static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) { + connection *con = (connection*) w->data; + UNUSED(loop); UNUSED(revents); + con_put(con->sock.srv, con); +} + connection* connection_new(server *srv) { connection *con = g_slice_new0(connection); UNUSED(srv); - con->state = CON_STATE_REQUEST_START; + con->state = CON_STATE_DEAD; con->response_headers_sent = FALSE; con->expect_100_cont = FALSE; @@ -158,11 +164,16 @@ connection* connection_new(server *srv) { physical_init(&con->physical); response_init(&con->response); + con->keep_alive_data.link = NULL; + con->keep_alive_data.timeout = 0; + my_ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb); + con->keep_alive_data.watcher.data = con; + return con; } void connection_reset(server *srv, connection *con) { - con->state = CON_STATE_REQUEST_START; + con->state = CON_STATE_DEAD; con->response_headers_sent = FALSE; con->expect_100_cont = FALSE; @@ -192,10 +203,39 @@ void connection_reset(server *srv, connection *con) { request_reset(&con->request); physical_reset(&con->physical); response_reset(&con->response); + + if (con->keep_alive_data.link) { + g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link); + con->keep_alive_data.link = NULL; + } + con->keep_alive_data.timeout = 0; + ev_timer_stop(srv->loop, &con->keep_alive_data.watcher); } +void server_check_keepalive(server *srv); void connection_reset_keep_alive(server *srv, connection *con) { - con->state = CON_STATE_REQUEST_START; + ev_timer_stop(srv->loop, &con->keep_alive_data.watcher); + { + guint timeout = GPOINTER_TO_INT(CORE_OPTION(CORE_OPTION_MAX_KEEP_ALIVE_IDLE)); + if (timeout == 0) { + con_put(srv, con); + return; + } + if (timeout >= srv->keep_alive_queue_timeout) { + /* queue is sorted by con->keep_alive_data.timeout */ + gboolean need_start = (0 == srv->keep_alive_queue.length); + con->keep_alive_data.timeout = ev_now((srv)->loop) + srv->keep_alive_queue_timeout; + g_queue_push_tail(&srv->keep_alive_queue, con); + con->keep_alive_data.link = g_queue_peek_tail_link(&srv->keep_alive_queue); + if (need_start) + server_check_keepalive(srv); + } else { + ev_timer_set(&con->keep_alive_data.watcher, timeout, 0); + ev_timer_start(srv->loop, &con->keep_alive_data.watcher); + } + } + + con->state = CON_STATE_KEEP_ALIVE; con->response_headers_sent = FALSE; con->expect_100_cont = FALSE; @@ -218,7 +258,7 @@ void connection_reset_keep_alive(server *srv, connection *con) { } void connection_free(server *srv, connection *con) { - con->state = CON_STATE_REQUEST_START; + con->state = CON_STATE_DEAD; con->response_headers_sent = FALSE; con->expect_100_cont = FALSE; @@ -249,6 +289,13 @@ void connection_free(server *srv, connection *con) { physical_clear(&con->physical); response_clear(&con->response); + if (con->keep_alive_data.link) { + g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link); + con->keep_alive_data.link = NULL; + } + con->keep_alive_data.timeout = 0; + ev_timer_stop(srv->loop, &con->keep_alive_data.watcher); + g_slice_free(connection, con); } @@ -264,6 +311,26 @@ void connection_state_machine(server *srv, connection *con) { gboolean done = FALSE; do { switch (con->state) { + case CON_STATE_DEAD: + done = TRUE; + break; + + case CON_STATE_KEEP_ALIVE: + if (con->raw_in->length > 0) { + /* stop keep alive timeout watchers */ + if (con->keep_alive_data.link) { + g_queue_delete_link(&srv->keep_alive_queue, con->keep_alive_data.link); + con->keep_alive_data.link = NULL; + } + con->keep_alive_data.timeout = 0; + ev_timer_stop(srv->loop, &con->keep_alive_data.watcher); + + connection_set_state(srv, con, CON_STATE_REQUEST_START); + } else + done = TRUE; + break; + + case CON_STATE_REQUEST_START: connection_set_state(srv, con, CON_STATE_READ_REQUEST_HEADER); action_enter(con, srv->mainaction); diff --git a/src/connection.h b/src/connection.h index f54282f..980f2d2 100644 --- a/src/connection.h +++ b/src/connection.h @@ -4,6 +4,12 @@ #include "base.h" typedef enum { + /** unused */ + CON_STATE_DEAD, + + /** waiting for new input after first request */ + CON_STATE_KEEP_ALIVE, + /** after the connect, the request is initialized, keep-alive starts here again */ CON_STATE_REQUEST_START, @@ -89,6 +95,13 @@ struct connection { struct log_t *log; gint log_level; + + /* Keep alive timeout data */ + struct { + GList *link; + ev_tstamp timeout; + ev_timer watcher; + } keep_alive_data; }; LI_API connection* connection_new(server *srv); diff --git a/src/plugin_core.c b/src/plugin_core.c index 76401d6..8d1bdd5 100644 --- a/src/plugin_core.c +++ b/src/plugin_core.c @@ -433,6 +433,7 @@ static plugin_option options[] = { { "static-file.exclude", OPTION_LIST, NULL, NULL, NULL }, { "server.tag", OPTION_STRING, NULL, NULL, NULL }, + { "server.max_keep_alive_idle", OPTION_INT, GINT_TO_POINTER(5), NULL, NULL }, { NULL, 0, NULL, NULL, NULL } }; diff --git a/src/plugin_core.h b/src/plugin_core.h index 7bc78cd..7407612 100644 --- a/src/plugin_core.h +++ b/src/plugin_core.h @@ -9,7 +9,8 @@ enum core_options_t { CORE_OPTION_STATIC_FILE_EXCLUDE = 3, - CORE_OPTION_SERVER_TAG = 4 + CORE_OPTION_SERVER_TAG = 4, + CORE_OPTION_MAX_KEEP_ALIVE_IDLE = 5 }; /* the core plugin always has base index 0, as it is the first plugin loaded */ diff --git a/src/server.c b/src/server.c index cf94f9c..97aaf28 100644 --- a/src/server.c +++ b/src/server.c @@ -1,6 +1,7 @@ #include "base.h" #include "utils.h" +#include "plugin_core.h" struct server_closing_socket; typedef struct server_closing_socket server_closing_socket; @@ -85,13 +86,54 @@ static void sigpipe_cb(struct ev_loop *loop, struct ev_signal *w, int revents) { ev_unref(loop); /* Signal watchers shouldn't keep loop alive */ \ } while (0) +void server_check_keepalive(server *srv) { + ev_tstamp now = ev_now((srv)->loop); + + if (0 == srv->keep_alive_queue.length) { + ev_timer_stop(srv->loop, &srv->keep_alive_timer); + } else { + srv->keep_alive_timer.repeat = ((connection*)g_queue_peek_head(&srv->keep_alive_queue))->keep_alive_data.timeout - now + 1; + ev_timer_again(srv->loop, &srv->keep_alive_timer); + } +} + +static void server_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) { + server *srv = (server*) w->data; + ev_tstamp now = ev_now((srv)->loop); + GQueue *q = &srv->keep_alive_queue; + GList *l; + connection *con; + + UNUSED(loop); + UNUSED(revents); + + while ( NULL != (l = g_queue_peek_head_link(q)) && + (con = (connection*) l->data)->keep_alive_data.timeout <= now ) { + guint timeout = GPOINTER_TO_INT(CORE_OPTION(CORE_OPTION_MAX_KEEP_ALIVE_IDLE)); + ev_tstamp remaining = timeout - srv->keep_alive_queue_timeout - (now - con->keep_alive_data.timeout); + if (remaining > 0) { + ev_timer_set(&con->keep_alive_data.watcher, remaining, 0); + ev_timer_start(srv->loop, &con->keep_alive_data.watcher); + } else { + /* close it */ + con_put(srv, con); + } + } + + if (NULL == l) { + ev_timer_stop(srv->loop, &srv->keep_alive_timer); + } else { + srv->keep_alive_timer.repeat = con->keep_alive_data.timeout - now + 1; + ev_timer_again(srv->loop, &srv->keep_alive_timer); + } +} + server* server_new() { server* srv = g_slice_new0(server); srv->magic = LIGHTTPD_SERVER_MAGIC; srv->state = SERVER_STARTING; - srv->connections_active = 0; srv->connections = g_array_new(FALSE, TRUE, sizeof(connection*)); srv->sockets = g_array_new(FALSE, TRUE, sizeof(server_socket*)); @@ -114,6 +156,10 @@ server* server_new() { log_init(srv); + g_queue_init(&srv->keep_alive_queue); + my_ev_init(&srv->keep_alive_timer, server_keepalive_cb); + srv->keep_alive_timer.data = srv; + return srv; } @@ -183,6 +229,9 @@ void server_free(server* srv) { g_mutex_free(srv->log_mutex); g_async_queue_unref(srv->log_queue); + g_queue_clear(&srv->keep_alive_queue); + ev_timer_stop(srv->loop, &srv->keep_alive_timer); + g_slice_free(server, srv); } @@ -239,6 +288,7 @@ static void server_listen_cb(struct ev_loop *loop, ev_io *w, int revents) { while (-1 != (s = accept(w->fd, (struct sockaddr*) &remote_addr, &l))) { connection *con = con_get(srv); + con->state = CON_STATE_REQUEST_START; con->remote_addr = remote_addr; ev_io_set(&con->sock.watcher, s, EV_READ); ev_io_start(srv->loop, &con->sock.watcher); @@ -295,6 +345,8 @@ void server_start(server *srv) { return; } + srv->keep_alive_queue_timeout = 5; + srv->option_count = g_hash_table_size(srv->options); srv->option_def_values = g_slice_alloc0(srv->option_count * sizeof(*srv->option_def_values)); @@ -328,6 +380,12 @@ void server_stop(server *srv) { server_socket *sock = g_array_index(srv->sockets, server_socket*, i); ev_io_stop(srv->loop, &sock->watcher); } + + for (i = srv->connections_active; i-- > 0;) { + connection *con = g_array_index(srv->connections, connection*, i); + if (con->state == CON_STATE_KEEP_ALIVE) + con_put(srv, con); + } } void server_exit(server *srv) { @@ -344,7 +402,6 @@ void server_exit(server *srv) { log_thread_wakeup(srv); } - void joblist_append(server *srv, connection *con) { connection_state_machine(srv, con); } diff --git a/src/server.h b/src/server.h index 8b3944d..39d3a75 100644 --- a/src/server.h +++ b/src/server.h @@ -39,6 +39,7 @@ struct server { guint loop_flags; struct ev_loop *loop; + ev_timer keep_alive_timer; guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections */ GArray *connections; /** array of (connection*) */ @@ -76,6 +77,10 @@ struct server { ev_tstamp started; statistics_t stats; + + /* keep alive timeout queue */ + guint keep_alive_queue_timeout; + GQueue keep_alive_queue; };