You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lighttpd2/src/main/connection.c

738 lines
24 KiB
C

#include <lighttpd/base.h>
#include <lighttpd/plugin_core.h>
/* worker_con_put() is defined in worker.c and declared here by hand instead of via a header.
 * only call it from the worker context the con belongs to */
void worker_con_put(liConnection *con); /* worker.c */
/* Forward declarations: both functions are defined further down in this file
 * but are called by code that appears before their definitions. */
static void li_connection_reset_keep_alive(liConnection *con);
static void li_connection_internal_error(liConnection *con);
/*
 * Move request body data from the raw socket queue (con->raw_in) into the
 * main vrequest input queue (con->in) and notify the vrequest.
 *
 * Body data is only forwarded once the connection is past the
 * HANDLE_MAINVR state or the main vrequest has reached READ_CONTENT, and
 * only while con->in is still open; otherwise read events are disabled.
 */
static void parse_request_body(liConnection *con) {
	/* nothing to forward (yet, or anymore): stop listening for readable events */
	if (con->in->is_closed
	    || (con->state <= LI_CON_STATE_HANDLE_MAINVR && con->mainvr->state < LI_VRS_READ_CONTENT)) {
		li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
		return;
	}

	li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_READ);

	if (-1 == con->mainvr->request.content_length) {
		/* TODO: parse chunked encoded request body, filters */
		/* li_chunkqueue_steal_all(con->in, con->raw_in); */
		con->in->is_closed = TRUE;
	} else {
		/* hand over at most the number of bytes still missing from the body */
		if (con->in->bytes_in < con->mainvr->request.content_length) {
			li_chunkqueue_steal_len(con->in, con->raw_in,
				con->mainvr->request.content_length - con->in->bytes_in);
		}

		/* body complete: close the input queue and stop reading from the socket */
		if (con->in->bytes_in == con->mainvr->request.content_length) {
			con->in->is_closed = TRUE;
			li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
		}
	}

	li_vrequest_handle_request_body(con->mainvr);
}
/*
 * Push response data from the vrequest output queue (con->out) to the raw
 * socket queue (con->raw_out), sending the response headers first if that
 * has not happened yet, and enable/disable write events accordingly.
 *
 * If sending the headers fails, the connection is put into the internal
 * error path and nothing else is done.
 */
static void forward_response_body(liConnection *con) {
	liVRequest *vr = con->mainvr;

	/* response handling has not started yet: nothing to write */
	if (con->state < LI_CON_STATE_HANDLE_MAINVR) {
		li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
		return;
	}

	if (!con->response_headers_sent) {
		if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
			VR_DEBUG(vr, "%s", "write response headers");
		}

		/* the flag is set before sending and rolled back on failure, so the
		 * internal error handler sees "headers not sent" */
		con->response_headers_sent = TRUE;
		if (!li_response_send_headers(con)) {
			con->response_headers_sent = FALSE;
			li_connection_internal_error(con);
			return;
		}
	}

	if (vr->response.transfer_encoding & LI_HTTP_TRANSFER_ENCODING_CHUNKED) {
		li_filter_chunked_encode(con, con->raw_out, con->out);
	} else {
		li_chunkqueue_steal_all(con->raw_out, con->out);
	}

	/* propagate end-of-response to the raw queue */
	if (con->out->is_closed) {
		con->raw_out->is_closed = TRUE;
	}

	/* only wait for writability while there is something queued */
	if (con->raw_out->length > 0) {
		li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
	} else {
		li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
	}
}
/*
 * Finish the current request: notify plugins, then either recycle the
 * connection for keep-alive (only while the server's destination state is
 * RUNNING or WARMUP) or hand it back to the worker.
 */
static void connection_request_done(liConnection *con) {
	liVRequest *vr = con->mainvr;
	liServerState dest_state;

	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "response end (keep_alive = %i)", con->keep_alive);
	}

	li_plugins_handle_close(con);

	dest_state = g_atomic_int_get(&con->srv->dest_state);

	if (!con->keep_alive || (LI_SERVER_RUNNING != dest_state && LI_SERVER_WARMUP != dest_state)) {
		worker_con_put(con);
		return;
	}

	li_connection_reset_keep_alive(con);
}
/*
 * Check whether the request has been fully read and the response fully
 * written (input closed, raw output closed and drained). If so, finish the
 * request via connection_request_done().
 *
 * Returns TRUE when the request was finished, FALSE otherwise.
 */
static gboolean check_response_done(liConnection *con) {
	gboolean done = con->in->is_closed
		&& con->raw_out->is_closed
		&& 0 == con->raw_out->length;

	if (done) {
		connection_request_done(con);
	}

	return done;
}
/*
 * Regular connection shutdown: optionally log it, let the plugins handle
 * the close and hand the connection back to the worker.
 */
static void connection_close(liConnection *con) {
	liVRequest *vr = con->mainvr;

	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "connection closed");
	}

	li_plugins_handle_close(con);

	worker_con_put(con);
}
/*
 * Shut the connection down after an error: optionally log it, let the
 * plugins handle the close and hand the connection back to the worker.
 * No response is sent to the client from here.
 */
void li_connection_error(liConnection *con) {
	liVRequest *vr = con->mainvr;

	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "connection closed (error)");
	}

	li_plugins_handle_close(con);

	worker_con_put(con);
}
/*
 * Handle an internal error for the current request.
 *
 * If the response headers are already on their way, a 500 response can no
 * longer be sent, so the connection is simply dropped. Otherwise the main
 * vrequest is reset, the status is set to 500 and the (empty, closed)
 * response is forwarded with keep-alive disabled.
 */
static void li_connection_internal_error(liConnection *con) {
	liVRequest *vr = con->mainvr;

	if (con->response_headers_sent) {
		if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
			VR_DEBUG(vr, "%s", "Couldn't send '500 Internal Error': headers already sent");
		}
		li_connection_error(con);
		return;
	}

	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "internal error");
	}

	/* We only need the http version from the http request, "keep-alive" reset doesn't reset it */
	li_vrequest_reset(con->mainvr, TRUE);

	con->keep_alive = FALSE;
	con->mainvr->response.http_status = 500;

	con->state = LI_CON_STATE_WRITE; /* skips further vrequest handling */

	/* drop whatever body was queued; mark both directions as finished */
	li_chunkqueue_reset(con->out);
	con->out->is_closed = TRUE;
	con->in->is_closed = TRUE;

	forward_response_body(con);
}
/*
 * Answer the current request directly with the given HTTP status instead of
 * running the normal request handling: keep-alive is disabled, the vrequest
 * is handled directly, the connection switches to the WRITE state, the
 * request input is closed and the response is forwarded.
 */
static void connection_reject_request(liConnection *con, int http_status) {
	con->keep_alive = FALSE;
	con->mainvr->response.http_status = http_status;
	li_vrequest_handle_direct(con->mainvr);
	con->state = LI_CON_STATE_WRITE;
	con->in->is_closed = TRUE;
	forward_response_body(con);
}

/*
 * Process newly available data in con->raw_in.
 *
 * Depending on the connection state this either (re)starts a request coming
 * out of keep-alive, parses and validates the request header and enters the
 * main action, or forwards request body data.
 *
 * Every return path in this function returns TRUE.
 */
static gboolean connection_handle_read(liConnection *con) {
	liVRequest *vr = con->mainvr;

	if (con->raw_in->length == 0) return TRUE;

	if (con->state == LI_CON_STATE_KEEP_ALIVE) {
		/* stop keep alive timeout watchers */
		if (con->keep_alive_data.link) {
			g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link);
			con->keep_alive_data.link = NULL;
		}
		con->keep_alive_data.timeout = 0;
		ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);

		con->keep_alive_requests++;
		/* disable keep alive if limit is reached */
		if (con->keep_alive_requests == CORE_OPTION(LI_CORE_OPTION_MAX_KEEP_ALIVE_REQUESTS).number) {
			con->keep_alive = FALSE;
		}

		/* a new request begins on this connection */
		con->state = LI_CON_STATE_READ_REQUEST_HEADER;
		con->ts = CUR_TS(con->wrk);
		li_vrequest_start(con->mainvr);
	} else if (con->state == LI_CON_STATE_REQUEST_START) {
		con->state = LI_CON_STATE_READ_REQUEST_HEADER;
	}

	/* header already handled (or not in header state): treat the data as request body */
	if (con->state != LI_CON_STATE_READ_REQUEST_HEADER || con->mainvr->state != LI_VRS_CLEAN) {
		parse_request_body(con);
		return TRUE;
	}

	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "reading request header");
	}

	/* max uri length 8 kilobytes */
	if (vr->request.uri.raw->len > 8*1024) {
		VR_INFO(vr,
			"request uri too large. limit: 8kb, received: %s",
			li_counter_format(vr->request.uri.raw->len, COUNTER_BYTES, vr->wrk->tmp_str)->str
		);
		connection_reject_request(con, 414); /* Request-URI Too Large */
		return TRUE;
	}

	/* max buffered header data 64 kilobytes */
	if (con->raw_in->length > 64*1024) {
		VR_INFO(vr,
			"request header too large. limit: 64kb, received: %s",
			li_counter_format((guint64)con->raw_in->length, COUNTER_BYTES, vr->wrk->tmp_str)->str
		);
		connection_reject_request(con, 413); /* Request Entity Too Large */
		return TRUE;
	}

	switch (li_http_request_parse(con->mainvr, &con->req_parser_ctx)) {
	case LI_HANDLER_GO_ON:
		break; /* go on */
	case LI_HANDLER_WAIT_FOR_EVENT:
		/* header incomplete, wait for more data */
		return TRUE;
	case LI_HANDLER_ERROR:
	case LI_HANDLER_COMEBACK: /* unexpected */
		/* unparsable header */
		if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
			VR_DEBUG(vr, "%s", "parsing header failed");
		}
		connection_reject_request(con, 400);
		return TRUE;
	}

	con->wrk->stats.requests++;

	/* headers ready */
	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "validating request header");
	}

	if (!li_request_validate_header(con)) {
		/* skip mainvr handling */
		con->state = LI_CON_STATE_WRITE;
		con->keep_alive = FALSE;
		con->in->is_closed = TRUE;
		forward_response_body(con);
		return TRUE;
	}

	/* When does a client ask for 100 Continue? probably not while trying to ddos us
	 * as post content probably goes to a dynamic backend anyway, we don't
	 * care about the rare cases we could determine that we don't want a request at all
	 * before sending it to a backend - so just send the stupid header
	 */
	if (con->expect_100_cont) {
		if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
			VR_DEBUG(vr, "%s", "send 100 Continue");
		}
		li_chunkqueue_append_mem(con->raw_out, CONST_STR_LEN("HTTP/1.1 100 Continue\r\n\r\n"));
		con->expect_100_cont = FALSE;
		li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_WRITE);
	}

	/* hand the request over to the main action */
	con->state = LI_CON_STATE_HANDLE_MAINVR;
	li_action_enter(con->mainvr, con->srv->mainaction);
	li_vrequest_handle_request_headers(con->mainvr);

	return TRUE;
}
static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
liNetworkStatus res;
goffset write_max;
goffset transferred;
liConnection *con = (liConnection*) w->data;
if (revents & EV_READ) {
if (con->in->is_closed) {
/* don't read the next request before current one is done */
li_ev_io_rem_events(loop, w, EV_READ);
} else {
transferred = con->raw_in->length;
if (con->srv_sock->read_cb) {
res = con->srv_sock->read_cb(con);
} else {
res = li_network_read(con->mainvr, w->fd, con->raw_in);
}
transferred = con->raw_in->length - transferred;
con->wrk->stats.bytes_in += transferred;
con->stats.bytes_in += transferred;
if ((ev_now(loop) - con->stats.last_avg) >= 5.0) {
con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s;
con->stats.bytes_out_5s = con->stats.bytes_out;
con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s;
con->stats.bytes_in_5s = con->stats.bytes_in;
con->stats.last_avg = ev_now(loop);
}
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
if (!connection_handle_read(con)) return;
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network read fatal error");
li_connection_error(con);
return;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
con->raw_in->is_closed = TRUE;
shutdown(w->fd, SHUT_RD);
connection_close(con);
return;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
/* TODO: aio */
li_ev_io_rem_events(loop, w, EV_READ);
break;
}
}
}
if (revents & EV_WRITE) {
if (con->raw_out->length > 0) {
if (con->throttled) {
write_max = MIN(con->throttle.con.magazine, 256*1024);
} else {
write_max = 256*1024; /* 256kB */
}
if (write_max > 0) {
transferred = con->raw_out->length;
if (con->srv_sock->write_cb) {
res = con->srv_sock->write_cb(con, write_max);
} else {
res = li_network_write(con->mainvr, w->fd, con->raw_out, write_max);
}
transferred = transferred - con->raw_out->length;
con->wrk->stats.bytes_out += transferred;
con->stats.bytes_out += transferred;
switch (res) {
case LI_NETWORK_STATUS_SUCCESS:
li_vrequest_joblist_append(con->mainvr);
break;
case LI_NETWORK_STATUS_FATAL_ERROR:
_ERROR(con->srv, con->mainvr, "%s", "network write fatal error");
li_connection_error(con);
return;
case LI_NETWORK_STATUS_CONNECTION_CLOSE:
connection_close(con);
return;
case LI_NETWORK_STATUS_WAIT_FOR_EVENT:
break;
case LI_NETWORK_STATUS_WAIT_FOR_AIO_EVENT:
li_ev_io_rem_events(loop, w, EV_WRITE);
_ERROR(con->srv, con->mainvr, "%s", "TODO: wait for aio");
/* TODO: aio */
break;
}
} else {
transferred = 0;
}
if ((ev_now(loop) - con->stats.last_avg) >= 5.0) {
con->stats.bytes_out_5s_diff = con->stats.bytes_out - con->stats.bytes_out_5s;
con->stats.bytes_out_5s = con->stats.bytes_out;
con->stats.bytes_in_5s_diff = con->stats.bytes_in - con->stats.bytes_in_5s;
con->stats.bytes_in_5s = con->stats.bytes_in;
con->stats.last_avg = ev_now(loop);
}
if (con->throttled) {
con->throttle.con.magazine -= transferred;
/*g_print("%p wrote %"G_GINT64_FORMAT"/%"G_GINT64_FORMAT" bytes, mags: %d/%d, queued: %s\n", (void*)con,
transferred, write_max, con->throttle.pool.magazine, con->throttle.con.magazine, con->throttle.pool.queued ? "yes":"no");*/
if (con->throttle.con.magazine <= 0) {
li_ev_io_rem_events(loop, w, EV_WRITE);
li_waitqueue_push(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);
}
if (con->throttle.pool.ptr && con->throttle.pool.magazine <= MAX(write_max,0) && !con->throttle.pool.queued) {
liThrottlePool *pool = con->throttle.pool.ptr;
g_atomic_int_inc(&pool->num_cons);
g_queue_push_tail_link(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
con->throttle.pool.queued = TRUE;
}
}
if (0 == con->raw_out->length) {
li_ev_io_rem_events(loop, w, EV_WRITE);
}
} else {
_DEBUG(con->srv, con->mainvr, "%s", "write event for empty queue");
li_ev_io_rem_events(loop, w, EV_WRITE);
}
}
if ((con->io_timeout_elem.ts + 1.0) < ev_now(loop))
li_waitqueue_push(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
check_response_done(con);
}
/*
 * libev timer callback for the per-connection keep-alive watcher
 * (w->data is the liConnection): the idle connection is handed back
 * to the worker.
 */
static void connection_keepalive_cb(struct ev_loop *loop, ev_timer *w, int revents) {
	liConnection *con = (liConnection*) w->data;

	UNUSED(loop);
	UNUSED(revents);

	worker_con_put(con);
}
/*
 * Main vrequest callback (first handler passed to li_vrequest_new() for
 * con->mainvr): keeps forwarding request body data.
 * Always returns LI_HANDLER_GO_ON.
 */
static liHandlerResult mainvr_handle_response_headers(liVRequest *vr) {
	if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
		VR_DEBUG(vr, "%s", "read request/handle response header");
	}

	parse_request_body(vr->con);

	return LI_HANDLER_GO_ON;
}
/*
 * Main vrequest callback for response body data: unless the request is
 * already finished, forward pending request and response data and check
 * again whether the request is done.
 * Always returns LI_HANDLER_GO_ON.
 */
static liHandlerResult mainvr_handle_response_body(liVRequest *vr) {
	liConnection *con = vr->con;

	if (!check_response_done(con)) {
		if (CORE_OPTION(LI_CORE_OPTION_DEBUG_REQUEST_HANDLING).boolean) {
			VR_DEBUG(vr, "%s", "write response");
		}

		parse_request_body(con);
		forward_response_body(con);

		/* the result does not matter: either way we report GO_ON */
		check_response_done(con);
	}

	return LI_HANDLER_GO_ON;
}
/*
 * Main vrequest error callback: route the failure into the connection's
 * internal error handling. Always returns LI_HANDLER_GO_ON.
 */
static liHandlerResult mainvr_handle_response_error(liVRequest *vr) {
	liConnection *con = vr->con;

	li_connection_internal_error(con);

	return LI_HANDLER_GO_ON;
}
/*
 * Main vrequest callback invoked for the request headers: starts reading
 * the request body. Always returns LI_HANDLER_GO_ON.
 */
static liHandlerResult mainvr_handle_request_headers(liVRequest *vr) {
	liConnection *con = vr->con;

	/* start reading input */
	parse_request_body(con);

	return LI_HANDLER_GO_ON;
}
/*
 * Allocate and initialise a connection object for the given worker.
 *
 * The connection starts in the DEAD state with no socket attached
 * (watcher fd -1). It owns its raw in/out chunkqueues, address strings and
 * main vrequest; release it with li_connection_free().
 */
liConnection* li_connection_new(liWorker *wrk) {
	liConnection *con = g_slice_new0(liConnection);

	con->wrk = wrk;
	con->srv = wrk->srv;

	/* request state */
	con->state = LI_CON_STATE_DEAD;
	con->response_headers_sent = FALSE;
	con->expect_100_cont = FALSE;

	/* socket watcher, not bound to a file descriptor yet */
	ev_init(&con->sock_watcher, connection_cb);
	ev_io_set(&con->sock_watcher, -1, 0);
	con->sock_watcher.data = con;

	con->remote_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
	con->local_addr_str = g_string_sized_new(INET6_ADDRSTRLEN);
	con->is_ssl = FALSE;
	con->keep_alive = TRUE;

	/* raw socket queues */
	con->raw_in = li_chunkqueue_new();
	con->raw_out = li_chunkqueue_new();

	/* main vrequest and the request parser feeding it from raw_in */
	con->mainvr = li_vrequest_new(con,
		mainvr_handle_response_headers,
		mainvr_handle_response_body,
		mainvr_handle_response_error,
		mainvr_handle_request_headers);
	li_http_request_parser_init(&con->req_parser_ctx, &con->mainvr->request, con->raw_in);

	con->in = con->mainvr->vr_in;
	con->out = con->mainvr->vr_out;

	/* the vrequest queues use the limits of the corresponding raw queue */
	li_chunkqueue_use_limit(con->raw_in, con->mainvr);
	li_chunkqueue_use_limit(con->raw_out, con->mainvr);
	li_chunkqueue_set_limit(con->mainvr->vr_in, con->raw_in->limit);
	li_chunkqueue_set_limit(con->mainvr->vr_out, con->raw_out->limit);
	li_chunkqueue_set_limit(con->mainvr->in, con->raw_in->limit);
	li_chunkqueue_set_limit(con->mainvr->out, con->raw_out->limit);

	/* keep-alive bookkeeping */
	con->keep_alive_data.link = NULL;
	con->keep_alive_data.timeout = 0;
	con->keep_alive_data.max_idle = 0;
	ev_init(&con->keep_alive_data.watcher, connection_keepalive_cb);
	con->keep_alive_data.watcher.data = con;

	/* queue elements point back at their connection */
	con->io_timeout_elem.data = con;
	con->throttle.wqueue_elem.data = con;
	con->throttle.pool.lnk.data = con;
	con->throttle.ip.lnk.data = con;

	return con;
}
/*
 * Fully reset a connection so the object can be reused: detach it from its
 * server socket, close (or schedule closing of) the client socket, reset
 * the vrequest, parser, queues, keep-alive data and statistics, and remove
 * it from the timeout and throttle queues.
 *
 * con->srv_sock is dereferenced unconditionally, so it must be set.
 */
void li_connection_reset(liConnection *con) {
	con->state = LI_CON_STATE_DEAD;
	con->response_headers_sent = FALSE;
	con->expect_100_cont = FALSE;

	/* detach from the server socket */
	if (con->srv_sock->close_cb) {
		con->srv_sock->close_cb(con);
	}
	li_server_socket_release(con->srv_sock);
	con->srv_sock = NULL;
	con->srv_sock_data = NULL;
	con->is_ssl = FALSE;

	/* close the client socket */
	ev_io_stop(con->wrk->loop, &con->sock_watcher);
	if (con->sock_watcher.fd != -1) {
		if (con->raw_in->is_closed) { /* read already shutdown */
			shutdown(con->sock_watcher.fd, SHUT_WR);
			close(con->sock_watcher.fd);
		} else {
			/* the worker takes over closing the socket */
			li_worker_add_closing_socket(con->wrk, con->sock_watcher.fd);
		}
	}
	ev_io_set(&con->sock_watcher, -1, 0);

	li_vrequest_reset(con->mainvr, FALSE);
	li_http_request_parser_reset(&con->req_parser_ctx);

	g_string_truncate(con->remote_addr_str, 0);
	li_sockaddr_clear(&con->remote_addr);
	g_string_truncate(con->local_addr_str, 0);
	li_sockaddr_clear(&con->local_addr);
	con->keep_alive = TRUE;

	li_chunkqueue_reset(con->raw_in);
	li_chunkqueue_reset(con->raw_out);
	li_cqlimit_reset(con->raw_in->limit);
	li_cqlimit_reset(con->raw_out->limit);

	/* keep-alive bookkeeping */
	if (con->keep_alive_data.link) {
		g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link);
		con->keep_alive_data.link = NULL;
	}
	con->keep_alive_data.timeout = 0;
	con->keep_alive_data.max_idle = 0;
	ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);
	con->keep_alive_requests = 0;

	/* reset stats */
	con->stats.bytes_in = G_GUINT64_CONSTANT(0);
	con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
	con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
	con->stats.last_avg = 0;

	/* remove from timeout queue */
	li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
	/* remove from throttle queue */
	li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);

	/* leave the throttle pool and give all unused magazines back to it */
	if (con->throttle.pool.ptr) {
		liThrottlePool *pool = con->throttle.pool.ptr;

		if (con->throttle.pool.queued) {
			g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
			g_atomic_int_add(&pool->num_cons, -1);
			con->throttle.pool.queued = FALSE;
		}

		g_atomic_int_add(&pool->magazine, con->throttle.pool.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.ip.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.con.magazine);

		con->throttle.pool.magazine = 0;
		con->throttle.ip.magazine = 0;
		con->throttle.con.magazine = 0;
		con->throttle.pool.ptr = NULL;
	}

	/* same for the per-ip throttle; the magazines are already zero here if
	 * they were returned to the pool above */
	if (con->throttle.ip.ptr) {
		liThrottlePool *pool = con->throttle.ip.ptr;

		if (con->throttle.ip.queued) {
			g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk);
			g_atomic_int_add(&pool->num_cons, -1);
			con->throttle.ip.queued = FALSE;
		}

		g_atomic_int_add(&pool->magazine, con->throttle.ip.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.con.magazine);

		con->throttle.ip.ptr = NULL;
	}

	con->throttle.con.rate = 0;
	con->throttle.pool.magazine = 0;
	con->throttle.ip.magazine = 0;
	con->throttle.con.magazine = 0;
	con->throttled = FALSE;
}
/*
 * Prepare a connection for the next request on the same socket.
 *
 * If no further input is buffered, the keep-alive idle timeout is armed
 * (or the connection is handed back to the worker right away when the
 * configured maximum idle time is 0). Request state, statistics and
 * throttling are then reset; the socket and addresses are kept. If input is
 * already buffered, handling of the next request starts immediately.
 */
static void li_connection_reset_keep_alive(liConnection *con) {
	liVRequest *vr = con->mainvr;

	/* only start keep alive watcher if there isn't more input data already */
	if (con->raw_in->length == 0) {
		ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);

		con->keep_alive_data.max_idle = CORE_OPTION(LI_CORE_OPTION_MAX_KEEP_ALIVE_IDLE).number;

		/* idle time of 0: do not keep the connection around at all */
		if (con->keep_alive_data.max_idle == 0) {
			worker_con_put(con);
			return;
		}

		if (con->keep_alive_data.max_idle < con->srv->keep_alive_queue_timeout) {
			/* shorter than the queue timeout: use the connection's own timer */
			ev_timer_set(&con->keep_alive_data.watcher, con->keep_alive_data.max_idle, 0);
			ev_timer_start(con->wrk->loop, &con->keep_alive_data.watcher);
		} else {
			/* queue is sorted by con->keep_alive_data.timeout */
			gboolean need_start = (0 == con->wrk->keep_alive_queue.length);

			con->keep_alive_data.timeout = ev_now(con->wrk->loop) + con->srv->keep_alive_queue_timeout;
			g_queue_push_tail(&con->wrk->keep_alive_queue, con);
			con->keep_alive_data.link = g_queue_peek_tail_link(&con->wrk->keep_alive_queue);

			/* the queue was empty before, so its check has to be triggered */
			if (need_start) {
				li_worker_check_keepalive(con->wrk);
			}
		}
	}

	/* request state */
	con->state = LI_CON_STATE_KEEP_ALIVE;
	con->response_headers_sent = FALSE;
	con->expect_100_cont = FALSE;

	li_ev_io_set_events(con->wrk->loop, &con->sock_watcher, EV_READ);
	con->keep_alive = TRUE;

	con->raw_out->is_closed = FALSE;

	li_vrequest_reset(con->mainvr, TRUE);
	li_http_request_parser_reset(&con->req_parser_ctx);

	con->ts = CUR_TS(con->wrk);

	/* reset stats */
	con->stats.bytes_in = G_GUINT64_CONSTANT(0);
	con->stats.bytes_in_5s = G_GUINT64_CONSTANT(0);
	con->stats.bytes_in_5s_diff = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out_5s = G_GUINT64_CONSTANT(0);
	con->stats.bytes_out_5s_diff = G_GUINT64_CONSTANT(0);
	con->stats.last_avg = 0;

	/* remove from timeout queue */
	li_waitqueue_remove(&con->wrk->io_timeout_queue, &con->io_timeout_elem);
	/* remove from throttle queue */
	li_waitqueue_remove(&con->wrk->throttle_queue, &con->throttle.wqueue_elem);

	/* leave the throttle pool and give all unused magazines back to it */
	if (con->throttle.pool.ptr) {
		liThrottlePool *pool = con->throttle.pool.ptr;

		if (con->throttle.pool.queued) {
			g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.pool.lnk);
			g_atomic_int_add(&pool->num_cons, -1);
			con->throttle.pool.queued = FALSE;
		}

		g_atomic_int_add(&pool->magazine, con->throttle.pool.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.ip.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.con.magazine);

		con->throttle.pool.magazine = 0;
		con->throttle.ip.magazine = 0;
		con->throttle.con.magazine = 0;
		con->throttle.pool.ptr = NULL;
	}

	/* same for the per-ip throttle; the magazines are already zero here if
	 * they were returned to the pool above */
	if (con->throttle.ip.ptr) {
		liThrottlePool *pool = con->throttle.ip.ptr;

		if (con->throttle.ip.queued) {
			g_queue_unlink(pool->queues[con->wrk->ndx+pool->current_queue[con->wrk->ndx]], &con->throttle.ip.lnk);
			g_atomic_int_add(&pool->num_cons, -1);
			con->throttle.ip.queued = FALSE;
		}

		g_atomic_int_add(&pool->magazine, con->throttle.ip.magazine);
		g_atomic_int_add(&pool->magazine, con->throttle.con.magazine);

		con->throttle.ip.ptr = NULL;
	}

	con->throttle.con.rate = 0;
	con->throttle.pool.magazine = 0;
	con->throttle.ip.magazine = 0;
	con->throttle.con.magazine = 0;
	con->throttled = FALSE;

	if (con->raw_in->length != 0) {
		/* start handling next request if data is already available */
		connection_handle_read(con);
	}
}
/*
 * Destroy a connection object and everything it owns (address strings, raw
 * queues, main vrequest, request parser state) and release its memory.
 *
 * A still-open socket is shut down and closed directly. con->wrk may be
 * NULL; event loop and keep-alive queue operations are skipped in that case.
 */
void li_connection_free(liConnection *con) {
	con->state = LI_CON_STATE_DEAD;
	con->response_headers_sent = FALSE;
	con->expect_100_cont = FALSE;

	li_server_socket_release(con->srv_sock);
	con->srv_sock = NULL;

	if (con->wrk) {
		ev_io_stop(con->wrk->loop, &con->sock_watcher);
	}

	if (con->sock_watcher.fd != -1) {
		/* just close it; _free should only be called on dead connections anyway */
		shutdown(con->sock_watcher.fd, SHUT_WR);
		close(con->sock_watcher.fd);
	}
	ev_io_set(&con->sock_watcher, -1, 0);

	/* address data */
	g_string_free(con->remote_addr_str, TRUE);
	li_sockaddr_clear(&con->remote_addr);
	g_string_free(con->local_addr_str, TRUE);
	li_sockaddr_clear(&con->local_addr);
	con->keep_alive = TRUE;

	/* queues, vrequest and parser */
	li_chunkqueue_free(con->raw_in);
	li_chunkqueue_free(con->raw_out);
	li_vrequest_free(con->mainvr);
	li_http_request_parser_clear(&con->req_parser_ctx);

	/* keep-alive bookkeeping */
	if (con->keep_alive_data.link && con->wrk) {
		g_queue_delete_link(&con->wrk->keep_alive_queue, con->keep_alive_data.link);
		con->keep_alive_data.link = NULL;
	}
	con->keep_alive_data.timeout = 0;
	con->keep_alive_data.max_idle = 0;

	if (con->wrk) {
		ev_timer_stop(con->wrk->loop, &con->keep_alive_data.watcher);
	}

	g_slice_free(liConnection, con);
}
/*
 * Return a human readable name for a connection state.
 *
 * The name is looked up by using the state value as an index into a static
 * table (the table order presumably mirrors the declaration order of
 * liConnectionState - confirm against the header). Values outside the table
 * yield "undefined" instead of reading past the end of the array.
 *
 * The returned pointer refers to static constant storage: the caller must
 * neither modify nor free it.
 */
gchar *li_connection_state_str(liConnectionState state) {
	static const gchar *states[] = {
		"dead",
		"keep-alive",
		"request start",
		"read request header",
		"handle main vrequest",
		"write"
	};

	/* the unsigned cast also rejects negative values if the enum type is signed */
	if ((unsigned int) state >= sizeof(states) / sizeof(states[0])) {
		return (gchar*) "undefined";
	}

	return (gchar*)states[state];
}