[core] pass con around gw_backend instead of srv
This commit is contained in:
parent
304e46d4f8
commit
6f39097ab6
179
src/gw_backend.c
179
src/gw_backend.c
|
@ -211,26 +211,27 @@ static int gw_extension_insert(gw_exts *ext, const buffer *key, gw_host *fh) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void gw_proc_connect_success(server *srv, gw_host *host, gw_proc *proc, int debug) {
|
||||
static void gw_proc_connect_success(connection *con, gw_host *host, gw_proc *proc, int debug) {
|
||||
gw_proc_tag_inc(host, proc, CONST_STR_LEN(".connected"));
|
||||
proc->last_used = srv->cur_ts;
|
||||
proc->last_used = con->srv->cur_ts;
|
||||
|
||||
if (debug) {
|
||||
log_error(srv->errh, __FILE__, __LINE__,
|
||||
log_error(con->conf.errh, __FILE__, __LINE__,
|
||||
"got proc: pid: %d socket: %s load: %d",
|
||||
proc->pid, proc->connection_name->ptr, proc->load);
|
||||
}
|
||||
}
|
||||
|
||||
__attribute_cold__
|
||||
static void gw_proc_connect_error(server *srv, gw_host *host, gw_proc *proc, pid_t pid, int errnum, int debug) {
|
||||
log_error_st *errh = srv->errh;
|
||||
static void gw_proc_connect_error(connection *con, gw_host *host, gw_proc *proc, pid_t pid, int errnum, int debug) {
|
||||
const time_t cur_ts = con->srv->cur_ts;
|
||||
log_error_st * const errh = con->conf.errh;
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
"establishing connection failed: socket: %s: %s",
|
||||
proc->connection_name->ptr, strerror(errnum));
|
||||
|
||||
if (!proc->is_local) {
|
||||
proc->disabled_until = srv->cur_ts + host->disable_time;
|
||||
proc->disabled_until = cur_ts + host->disable_time;
|
||||
gw_proc_set_state(host, proc, PROC_STATE_OVERLOADED);
|
||||
}
|
||||
else if (proc->pid == pid && proc->state == PROC_STATE_RUNNING) {
|
||||
|
@ -261,7 +262,7 @@ static void gw_proc_connect_error(server *srv, gw_host *host, gw_proc *proc, pid
|
|||
"decrease server.max-connections. The load for this FastCGI "
|
||||
"backend %s is %d", proc->connection_name->ptr, proc->load);
|
||||
}
|
||||
proc->disabled_until = srv->cur_ts + host->disable_time;
|
||||
proc->disabled_until = cur_ts + host->disable_time;
|
||||
gw_proc_set_state(host, proc, PROC_STATE_OVERLOADED);
|
||||
}
|
||||
else {
|
||||
|
@ -272,7 +273,7 @@ static void gw_proc_connect_error(server *srv, gw_host *host, gw_proc *proc, pid
|
|||
#if 0
|
||||
gw_proc_set_state(host, proc, PROC_STATE_DIED_WAIT_FOR_PID);
|
||||
#else /* treat as overloaded (future: unless we send kill() signal)*/
|
||||
proc->disabled_until = srv->cur_ts + host->disable_time;
|
||||
proc->disabled_until = cur_ts + host->disable_time;
|
||||
gw_proc_set_state(host, proc, PROC_STATE_OVERLOADED);
|
||||
#endif
|
||||
}
|
||||
|
@ -286,11 +287,11 @@ static void gw_proc_connect_error(server *srv, gw_host *host, gw_proc *proc, pid
|
|||
}
|
||||
}
|
||||
|
||||
static void gw_proc_release(server *srv, gw_host *host, gw_proc *proc, int debug) {
|
||||
static void gw_proc_release(gw_host *host, gw_proc *proc, int debug, log_error_st *errh) {
|
||||
gw_proc_load_dec(host, proc);
|
||||
|
||||
if (debug) {
|
||||
log_error(srv->errh, __FILE__, __LINE__,
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
"released proc: pid: %d socket: %s load: %u",
|
||||
proc->pid, proc->connection_name->ptr, proc->load);
|
||||
}
|
||||
|
@ -944,24 +945,24 @@ static gw_host * gw_host_get(connection *con, gw_extension *extension, int balan
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int gw_establish_connection(server *srv, gw_host *host, gw_proc *proc, pid_t pid, int gw_fd, int debug) {
|
||||
static int gw_establish_connection(connection *con, gw_host *host, gw_proc *proc, pid_t pid, int gw_fd, int debug) {
|
||||
if (-1 == connect(gw_fd, proc->saddr, proc->saddrlen)) {
|
||||
if (errno == EINPROGRESS || errno == EALREADY || errno == EINTR) {
|
||||
if (debug > 2) {
|
||||
log_error(srv->errh, __FILE__, __LINE__,
|
||||
log_error(con->conf.errh, __FILE__, __LINE__,
|
||||
"connect delayed; will continue later: %s",
|
||||
proc->connection_name->ptr);
|
||||
}
|
||||
|
||||
return 1;
|
||||
} else {
|
||||
gw_proc_connect_error(srv, host, proc, pid, errno, debug);
|
||||
gw_proc_connect_error(con, host, proc, pid, errno, debug);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (debug > 1) {
|
||||
log_error(srv->errh, __FILE__, __LINE__,
|
||||
log_error(con->conf.errh, __FILE__, __LINE__,
|
||||
"connect succeeded: %d", gw_fd);
|
||||
}
|
||||
|
||||
|
@ -1073,7 +1074,7 @@ static gw_handler_ctx * handler_ctx_init(size_t sz) {
|
|||
}
|
||||
|
||||
static void handler_ctx_free(gw_handler_ctx *hctx) {
|
||||
/* caller MUST have called gw_backend_close(srv, hctx) if necessary */
|
||||
/* caller MUST have called gw_backend_close(hctx, con) if necessary */
|
||||
if (hctx->handler_ctx_free) hctx->handler_ctx_free(hctx);
|
||||
chunk_buffer_release(hctx->response);
|
||||
|
||||
|
@ -1084,7 +1085,7 @@ static void handler_ctx_free(gw_handler_ctx *hctx) {
|
|||
}
|
||||
|
||||
static void handler_ctx_clear(gw_handler_ctx *hctx) {
|
||||
/* caller MUST have called gw_backend_close(srv, hctx) if necessary */
|
||||
/* caller MUST have called gw_backend_close(hctx, con) if necessary */
|
||||
|
||||
hctx->proc = NULL;
|
||||
hctx->host = NULL;
|
||||
|
@ -1737,18 +1738,19 @@ void gw_set_transparent(gw_handler_ctx *hctx) {
|
|||
}
|
||||
|
||||
|
||||
static void gw_backend_close(server *srv, gw_handler_ctx *hctx) {
|
||||
static void gw_backend_close(gw_handler_ctx *hctx, connection *con) {
|
||||
if (hctx->fd >= 0) {
|
||||
fdevent_fdnode_event_del(srv->ev, hctx->fdn);
|
||||
/*fdevent_unregister(srv->ev, hctx->fd);*//*(handled below)*/
|
||||
fdevent_sched_close(srv->ev, hctx->fd, 1);
|
||||
fdevent_fdnode_event_del(hctx->ev, hctx->fdn);
|
||||
/*fdevent_unregister(ev, hctx->fd);*//*(handled below)*/
|
||||
fdevent_sched_close(hctx->ev, hctx->fd, 1);
|
||||
hctx->fdn = NULL;
|
||||
hctx->fd = -1;
|
||||
}
|
||||
|
||||
if (hctx->host) {
|
||||
if (hctx->proc) {
|
||||
gw_proc_release(srv, hctx->host, hctx->proc, hctx->conf.debug);
|
||||
gw_proc_release(hctx->host, hctx->proc, hctx->conf.debug,
|
||||
con->conf.errh);
|
||||
hctx->proc = NULL;
|
||||
}
|
||||
|
||||
|
@ -1757,11 +1759,10 @@ static void gw_backend_close(server *srv, gw_handler_ctx *hctx) {
|
|||
}
|
||||
}
|
||||
|
||||
static void gw_connection_close(server *srv, gw_handler_ctx *hctx) {
|
||||
static void gw_connection_close(gw_handler_ctx *hctx, connection *con) {
|
||||
gw_plugin_data *p = hctx->plugin_data;
|
||||
connection *con = hctx->remote_conn;
|
||||
|
||||
gw_backend_close(srv, hctx);
|
||||
gw_backend_close(hctx, con);
|
||||
handler_ctx_free(hctx);
|
||||
con->plugin_ctx[p->id] = NULL;
|
||||
|
||||
|
@ -1770,11 +1771,10 @@ static void gw_connection_close(server *srv, gw_handler_ctx *hctx) {
|
|||
}
|
||||
}
|
||||
|
||||
static handler_t gw_reconnect(server *srv, gw_handler_ctx *hctx) {
|
||||
gw_backend_close(srv, hctx);
|
||||
static handler_t gw_reconnect(gw_handler_ctx *hctx, connection *con) {
|
||||
gw_backend_close(hctx, con);
|
||||
|
||||
hctx->host = gw_host_get(hctx->remote_conn, hctx->ext,
|
||||
hctx->conf.balance, hctx->conf.debug);
|
||||
hctx->host = gw_host_get(con,hctx->ext,hctx->conf.balance,hctx->conf.debug);
|
||||
if (NULL == hctx->host) return HANDLER_FINISHED;
|
||||
|
||||
gw_host_assign(hctx->host);
|
||||
|
@ -1789,14 +1789,13 @@ static handler_t gw_reconnect(server *srv, gw_handler_ctx *hctx) {
|
|||
handler_t gw_connection_reset(connection *con, void *p_d) {
|
||||
gw_plugin_data *p = p_d;
|
||||
gw_handler_ctx *hctx = con->plugin_ctx[p->id];
|
||||
if (hctx) gw_connection_close(con->srv, hctx);
|
||||
if (hctx) gw_connection_close(hctx, con);
|
||||
|
||||
return HANDLER_GO_ON;
|
||||
}
|
||||
|
||||
|
||||
static void gw_conditional_tcp_fin(server *srv, gw_handler_ctx *hctx) {
|
||||
connection *con = hctx->remote_conn;
|
||||
static void gw_conditional_tcp_fin(gw_handler_ctx *hctx, connection *con) {
|
||||
/*assert(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_TCP_FIN);*/
|
||||
if (!chunkqueue_is_empty(hctx->wb)) return;
|
||||
if (!hctx->host->tcp_fin_propagate) return;
|
||||
|
@ -1809,10 +1808,10 @@ static void gw_conditional_tcp_fin(server *srv, gw_handler_ctx *hctx) {
|
|||
con->conf.stream_request_body &= ~FDEVENT_STREAM_REQUEST_POLLIN;
|
||||
con->is_readable = 0;
|
||||
shutdown(hctx->fd, SHUT_WR);
|
||||
fdevent_fdnode_event_clr(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_clr(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
}
|
||||
|
||||
static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
||||
static handler_t gw_write_request(gw_handler_ctx *hctx, connection *con) {
|
||||
switch(hctx->state) {
|
||||
case GW_STATE_INIT:
|
||||
/* do we have a running process for this host (max-procs) ? */
|
||||
|
@ -1840,31 +1839,30 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
|
||||
hctx->fd = fdevent_socket_nb_cloexec(hctx->host->family,SOCK_STREAM,0);
|
||||
if (-1 == hctx->fd) {
|
||||
log_error_st * const errh = hctx->remote_conn->conf.errh;
|
||||
log_error_st * const errh = con->conf.errh;
|
||||
if (errno == EMFILE || errno == EINTR) {
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
"wait for fd at connection: %d",
|
||||
hctx->remote_conn->fd);
|
||||
"wait for fd at connection: %d", con->fd);
|
||||
return HANDLER_WAIT_FOR_FD;
|
||||
}
|
||||
|
||||
log_perror(errh, __FILE__, __LINE__,
|
||||
"socket failed %d %d", srv->cur_fds, srv->max_fds);
|
||||
"socket failed %d %d", con->srv->cur_fds, con->srv->max_fds);
|
||||
return HANDLER_ERROR;
|
||||
}
|
||||
|
||||
srv->cur_fds++;
|
||||
++con->srv->cur_fds;
|
||||
|
||||
hctx->fdn = fdevent_register(srv->ev,hctx->fd,gw_handle_fdevent,hctx);
|
||||
hctx->fdn = fdevent_register(hctx->ev,hctx->fd,gw_handle_fdevent,hctx);
|
||||
|
||||
if (hctx->proc->is_local) {
|
||||
hctx->pid = hctx->proc->pid;
|
||||
}
|
||||
|
||||
switch (gw_establish_connection(srv, hctx->host, hctx->proc, hctx->pid,
|
||||
switch (gw_establish_connection(con, hctx->host, hctx->proc, hctx->pid,
|
||||
hctx->fd, hctx->conf.debug)) {
|
||||
case 1: /* connection is in progress */
|
||||
fdevent_fdnode_event_set(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_set(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
gw_set_state(hctx, GW_STATE_CONNECT_DELAYED);
|
||||
return HANDLER_WAIT_FOR_EVENT;
|
||||
case -1:/* connection error */
|
||||
|
@ -1878,14 +1876,14 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
if (hctx->state == GW_STATE_CONNECT_DELAYED) { /*(not GW_STATE_INIT)*/
|
||||
int socket_error = fdevent_connect_status(hctx->fd);
|
||||
if (socket_error != 0) {
|
||||
gw_proc_connect_error(srv, hctx->host, hctx->proc, hctx->pid,
|
||||
gw_proc_connect_error(con, hctx->host, hctx->proc, hctx->pid,
|
||||
socket_error, hctx->conf.debug);
|
||||
return HANDLER_ERROR;
|
||||
}
|
||||
/* go on with preparing the request */
|
||||
}
|
||||
|
||||
gw_proc_connect_success(srv, hctx->host, hctx->proc, hctx->conf.debug);
|
||||
gw_proc_connect_success(con, hctx->host, hctx->proc, hctx->conf.debug);
|
||||
|
||||
gw_set_state(hctx, GW_STATE_PREPARE_WRITE);
|
||||
/* fall through */
|
||||
|
@ -1896,14 +1894,13 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
handler_t rc = hctx->create_env(hctx);
|
||||
if (HANDLER_GO_ON != rc) {
|
||||
if (HANDLER_FINISHED != rc && HANDLER_ERROR != rc)
|
||||
fdevent_fdnode_event_clr(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_clr(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
/*(disable Nagle algorithm if streaming and content-length unknown)*/
|
||||
if (AF_UNIX != hctx->host->family) {
|
||||
connection *con = hctx->remote_conn;
|
||||
if (con->request.content_length < 0) {
|
||||
if (-1 == fdevent_set_tcp_nodelay(hctx->fd, 1)) {
|
||||
/*(error, but not critical)*/
|
||||
|
@ -1911,13 +1908,12 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
}
|
||||
}
|
||||
|
||||
fdevent_fdnode_event_add(srv->ev, hctx->fdn, FDEVENT_IN|FDEVENT_RDHUP);
|
||||
fdevent_fdnode_event_add(hctx->ev, hctx->fdn, FDEVENT_IN|FDEVENT_RDHUP);
|
||||
gw_set_state(hctx, GW_STATE_WRITE);
|
||||
/* fall through */
|
||||
case GW_STATE_WRITE:
|
||||
if (!chunkqueue_is_empty(hctx->wb)) {
|
||||
int ret;
|
||||
log_error_st * const errh = hctx->remote_conn->conf.errh;
|
||||
log_error_st * const errh = con->conf.errh;
|
||||
#if 0
|
||||
if (hctx->conf.debug > 1) {
|
||||
log_error(errh, __FILE__, __LINE__, "sdsx",
|
||||
|
@ -1925,10 +1921,8 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
hctx->fd, chunkqueue_length(hctx->wb));
|
||||
}
|
||||
#endif
|
||||
ret = srv->network_backend_write(hctx->fd, hctx->wb,
|
||||
MAX_WRITE_LIMIT, errh);
|
||||
|
||||
if (ret < 0) {
|
||||
if (con->srv->network_backend_write(hctx->fd, hctx->wb,
|
||||
MAX_WRITE_LIMIT, errh) < 0) {
|
||||
switch(errno) {
|
||||
case EPIPE:
|
||||
case ENOTCONN:
|
||||
|
@ -1952,13 +1946,12 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
}
|
||||
|
||||
if (hctx->wb->bytes_out == hctx->wb_reqlen) {
|
||||
fdevent_fdnode_event_clr(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_clr(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
gw_set_state(hctx, GW_STATE_READ);
|
||||
} else {
|
||||
off_t wblen = hctx->wb->bytes_in - hctx->wb->bytes_out;
|
||||
if ((hctx->wb->bytes_in < hctx->wb_reqlen || hctx->wb_reqlen < 0)
|
||||
&& wblen < 65536 - 16384) {
|
||||
connection *con = hctx->remote_conn;
|
||||
/*(con->conf.stream_request_body & FDEVENT_STREAM_REQUEST)*/
|
||||
if (!(con->conf.stream_request_body
|
||||
& FDEVENT_STREAM_REQUEST_POLLIN)) {
|
||||
|
@ -1968,30 +1961,29 @@ static handler_t gw_write_request(server *srv, gw_handler_ctx *hctx) {
|
|||
}
|
||||
}
|
||||
if (0 == wblen) {
|
||||
fdevent_fdnode_event_clr(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_clr(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
} else {
|
||||
fdevent_fdnode_event_add(srv->ev, hctx->fdn, FDEVENT_OUT);
|
||||
fdevent_fdnode_event_add(hctx->ev, hctx->fdn, FDEVENT_OUT);
|
||||
}
|
||||
}
|
||||
|
||||
if (hctx->remote_conn->conf.stream_request_body
|
||||
if (con->conf.stream_request_body
|
||||
& FDEVENT_STREAM_REQUEST_TCP_FIN)
|
||||
gw_conditional_tcp_fin(srv, hctx);
|
||||
gw_conditional_tcp_fin(hctx, con);
|
||||
|
||||
return HANDLER_WAIT_FOR_EVENT;
|
||||
case GW_STATE_READ:
|
||||
/* waiting for a response */
|
||||
return HANDLER_WAIT_FOR_EVENT;
|
||||
default:
|
||||
log_error(hctx->remote_conn->conf.errh, __FILE__, __LINE__,
|
||||
log_error(con->conf.errh, __FILE__, __LINE__,
|
||||
"(debug) unknown state");
|
||||
return HANDLER_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
__attribute_cold__
|
||||
static handler_t gw_write_error(server *srv, gw_handler_ctx *hctx) {
|
||||
connection *con = hctx->remote_conn;
|
||||
static handler_t gw_write_error(gw_handler_ctx *hctx, connection *con) {
|
||||
int status = con->http_status;
|
||||
|
||||
if (hctx->state == GW_STATE_INIT ||
|
||||
|
@ -1999,26 +1991,26 @@ static handler_t gw_write_error(server *srv, gw_handler_ctx *hctx) {
|
|||
|
||||
/* (optimization to detect backend process exit while processing a
|
||||
* large number of ready events; (this block could be removed)) */
|
||||
if (0 == srv->srvconf.max_worker)
|
||||
gw_restart_dead_procs(srv, hctx->host, hctx->conf.debug, 0);
|
||||
if (0 == con->srv->srvconf.max_worker)
|
||||
gw_restart_dead_procs(con->srv, hctx->host, hctx->conf.debug, 0);
|
||||
|
||||
/* cleanup this request and let request handler start request again */
|
||||
if (hctx->reconnects++ < 5) return gw_reconnect(srv, hctx);
|
||||
if (hctx->reconnects++ < 5) return gw_reconnect(hctx, con);
|
||||
}
|
||||
|
||||
if (hctx->backend_error) hctx->backend_error(hctx);
|
||||
gw_connection_close(srv, hctx);
|
||||
gw_connection_close(hctx, con);
|
||||
con->http_status = (status == 400) ? 400 : 503;
|
||||
return HANDLER_FINISHED;
|
||||
}
|
||||
|
||||
static handler_t gw_send_request(server *srv, gw_handler_ctx *hctx) {
|
||||
handler_t rc = gw_write_request(srv, hctx);
|
||||
return (HANDLER_ERROR != rc) ? rc : gw_write_error(srv, hctx);
|
||||
static handler_t gw_send_request(gw_handler_ctx *hctx, connection *con) {
|
||||
handler_t rc = gw_write_request(hctx, con);
|
||||
return (HANDLER_ERROR != rc) ? rc : gw_write_error(hctx, con);
|
||||
}
|
||||
|
||||
|
||||
static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx);
|
||||
static handler_t gw_recv_response(gw_handler_ctx *hctx, connection *con);
|
||||
|
||||
|
||||
handler_t gw_handle_subrequest(connection *con, void *p_d) {
|
||||
|
@ -2027,19 +2019,17 @@ handler_t gw_handle_subrequest(connection *con, void *p_d) {
|
|||
if (NULL == hctx) return HANDLER_GO_ON;
|
||||
if (con->mode != p->id) return HANDLER_GO_ON; /* not my job */
|
||||
|
||||
server * const srv = con->srv;
|
||||
|
||||
if ((con->conf.stream_response_body & FDEVENT_STREAM_RESPONSE_BUFMIN)
|
||||
&& con->file_started) {
|
||||
if (chunkqueue_length(con->write_queue) > 65536 - 4096) {
|
||||
fdevent_fdnode_event_clr(srv->ev, hctx->fdn, FDEVENT_IN);
|
||||
fdevent_fdnode_event_clr(hctx->ev, hctx->fdn, FDEVENT_IN);
|
||||
}
|
||||
else if (!(fdevent_fdnode_interest(hctx->fdn) & FDEVENT_IN)) {
|
||||
/* optimistic read from backend */
|
||||
handler_t rc;
|
||||
rc = gw_recv_response(srv, hctx); /*(might invalidate hctx)*/
|
||||
rc = gw_recv_response(hctx, con); /*(might invalidate hctx)*/
|
||||
if (rc != HANDLER_GO_ON) return rc; /*(unless HANDLER_GO_ON)*/
|
||||
fdevent_fdnode_event_add(srv->ev, hctx->fdn, FDEVENT_IN);
|
||||
fdevent_fdnode_event_add(hctx->ev, hctx->fdn, FDEVENT_IN);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2106,20 +2096,19 @@ handler_t gw_handle_subrequest(connection *con, void *p_d) {
|
|||
{
|
||||
handler_t rc =((0==hctx->wb->bytes_in || !chunkqueue_is_empty(hctx->wb))
|
||||
&& hctx->state != GW_STATE_CONNECT_DELAYED)
|
||||
? gw_send_request(srv, hctx)
|
||||
? gw_send_request(hctx, con)
|
||||
: HANDLER_WAIT_FOR_EVENT;
|
||||
if (HANDLER_WAIT_FOR_EVENT != rc) return rc;
|
||||
}
|
||||
|
||||
if (con->conf.stream_request_body & FDEVENT_STREAM_REQUEST_TCP_FIN)
|
||||
gw_conditional_tcp_fin(srv, hctx);
|
||||
gw_conditional_tcp_fin(hctx, con);
|
||||
|
||||
return HANDLER_WAIT_FOR_EVENT;
|
||||
}
|
||||
|
||||
|
||||
static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
||||
connection *con = hctx->remote_conn;
|
||||
static handler_t gw_recv_response(gw_handler_ctx *hctx, connection *con) {
|
||||
gw_proc *proc = hctx->proc;
|
||||
gw_host *host = hctx->host;
|
||||
/*(XXX: make this a configurable flag for other protocols)*/
|
||||
|
@ -2155,8 +2144,8 @@ static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
|||
physpath = con->physical.path;
|
||||
}
|
||||
|
||||
proc->last_used = srv->cur_ts;
|
||||
gw_backend_close(srv, hctx);
|
||||
proc->last_used = con->srv->cur_ts;
|
||||
gw_backend_close(hctx, con);
|
||||
handler_ctx_clear(hctx);
|
||||
|
||||
/* don't do more than 6 loops here; normally shouldn't happen */
|
||||
|
@ -2183,7 +2172,7 @@ static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
|||
return HANDLER_COMEBACK;
|
||||
} else {
|
||||
/* we are done */
|
||||
gw_connection_close(srv, hctx);
|
||||
gw_connection_close(hctx, con);
|
||||
}
|
||||
|
||||
return HANDLER_FINISHED;
|
||||
|
@ -2191,19 +2180,20 @@ static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
|||
case HANDLER_ERROR:
|
||||
/* (optimization to detect backend process exit while processing a
|
||||
* large number of ready events; (this block could be removed)) */
|
||||
errh = hctx->remote_conn->conf.errh;
|
||||
errh = con->conf.errh;
|
||||
if (proc->is_local && 1 == proc->load && proc->pid == hctx->pid
|
||||
&& proc->state != PROC_STATE_DIED && 0 == srv->srvconf.max_worker) {
|
||||
&& proc->state != PROC_STATE_DIED
|
||||
&& 0 == con->srv->srvconf.max_worker) {
|
||||
/* intentionally check proc->disabed_until before gw_proc_waitpid */
|
||||
if (proc->disabled_until < srv->cur_ts
|
||||
&& 0 != gw_proc_waitpid(srv, host, proc)) {
|
||||
if (proc->disabled_until < con->srv->cur_ts
|
||||
&& 0 != gw_proc_waitpid(con->srv, host, proc)) {
|
||||
if (hctx->conf.debug) {
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
"--- gw spawning\n\tsocket %s\n\tcurrent: 1/%d",
|
||||
proc->connection_name->ptr, host->num_procs);
|
||||
}
|
||||
|
||||
if (gw_spawn_connection(srv, host, proc, hctx->conf.debug)) {
|
||||
if (gw_spawn_connection(con->srv,host,proc,hctx->conf.debug)) {
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
"respawning failed, will retry later");
|
||||
}
|
||||
|
@ -2222,7 +2212,7 @@ static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
|||
proc->connection_name->ptr,
|
||||
con->uri.path->ptr, BUFFER_INTLEN_PTR(con->uri.query));
|
||||
|
||||
return gw_reconnect(srv, hctx);
|
||||
return gw_reconnect(hctx, con);
|
||||
}
|
||||
|
||||
log_error(errh, __FILE__, __LINE__,
|
||||
|
@ -2240,7 +2230,7 @@ static handler_t gw_recv_response(server *srv, gw_handler_ctx *hctx) {
|
|||
|
||||
if (hctx->backend_error) hctx->backend_error(hctx);
|
||||
http_response_backend_error(con);
|
||||
gw_connection_close(srv, hctx);
|
||||
gw_connection_close(hctx, con);
|
||||
return HANDLER_FINISHED;
|
||||
}
|
||||
}
|
||||
|
@ -2253,12 +2243,12 @@ static handler_t gw_handle_fdevent(server *srv, void *ctx, int revents) {
|
|||
joblist_append(srv, con);
|
||||
|
||||
if (revents & FDEVENT_IN) {
|
||||
handler_t rc = gw_recv_response(srv, hctx); /*(might invalidate hctx)*/
|
||||
handler_t rc = gw_recv_response(hctx, con); /*(might invalidate hctx)*/
|
||||
if (rc != HANDLER_GO_ON) return rc; /*(unless HANDLER_GO_ON)*/
|
||||
}
|
||||
|
||||
if (revents & FDEVENT_OUT) {
|
||||
return gw_send_request(srv, hctx); /*(might invalidate hctx)*/
|
||||
return gw_send_request(hctx, con); /*(might invalidate hctx)*/
|
||||
}
|
||||
|
||||
/* perhaps this issue is already handled */
|
||||
|
@ -2273,7 +2263,7 @@ static handler_t gw_handle_fdevent(server *srv, void *ctx, int revents) {
|
|||
* FIXME: as it is a bit ugly.
|
||||
*
|
||||
*/
|
||||
gw_send_request(srv, hctx);
|
||||
gw_send_request(hctx, con);
|
||||
} else if (con->file_started) {
|
||||
/* drain any remaining data from kernel pipe buffers
|
||||
* even if (con->conf.stream_response_body
|
||||
|
@ -2285,7 +2275,7 @@ static handler_t gw_handle_fdevent(server *srv, void *ctx, int revents) {
|
|||
con->conf.stream_response_body &= ~FDEVENT_STREAM_RESPONSE_BUFMIN;
|
||||
con->conf.stream_response_body |= FDEVENT_STREAM_RESPONSE_POLLRDHUP;
|
||||
do {
|
||||
rc = gw_recv_response(srv,hctx); /*(might invalidate hctx)*/
|
||||
rc = gw_recv_response(hctx,con); /*(might invalidate hctx)*/
|
||||
} while (rc == HANDLER_GO_ON); /*(unless HANDLER_GO_ON)*/
|
||||
con->conf.stream_response_body = flags;
|
||||
return rc; /* HANDLER_FINISHED or HANDLER_ERROR */
|
||||
|
@ -2297,7 +2287,7 @@ static handler_t gw_handle_fdevent(server *srv, void *ctx, int revents) {
|
|||
con->uri.path->ptr, BUFFER_INTLEN_PTR(con->uri.query),
|
||||
proc->connection_name->ptr, hctx->state);
|
||||
|
||||
gw_connection_close(srv, hctx);
|
||||
gw_connection_close(hctx, con);
|
||||
}
|
||||
} else if (revents & FDEVENT_ERR) {
|
||||
log_error(con->conf.errh, __FILE__, __LINE__,
|
||||
|
@ -2305,7 +2295,7 @@ static handler_t gw_handle_fdevent(server *srv, void *ctx, int revents) {
|
|||
|
||||
if (hctx->backend_error) hctx->backend_error(hctx);
|
||||
http_response_backend_error(con);
|
||||
gw_connection_close(srv, hctx);
|
||||
gw_connection_close(hctx, con);
|
||||
}
|
||||
|
||||
return HANDLER_FINISHED;
|
||||
|
@ -2487,6 +2477,7 @@ handler_t gw_check_extension(connection *con, gw_plugin_data *p, int uri_path_ha
|
|||
|
||||
if (!hctx) hctx = handler_ctx_init(hctx_sz);
|
||||
|
||||
hctx->ev = con->srv->ev;
|
||||
hctx->remote_conn = con;
|
||||
hctx->plugin_data = p;
|
||||
hctx->host = host;
|
||||
|
|
|
@ -285,6 +285,8 @@ typedef enum {
|
|||
GW_STATE_READ
|
||||
} gw_connection_state_t;
|
||||
|
||||
struct fdevents; /* declaration */
|
||||
|
||||
#define GW_RESPONDER 1
|
||||
#define GW_AUTHORIZER 2
|
||||
#define GW_FILTER 3 /*(not implemented)*/
|
||||
|
@ -305,6 +307,7 @@ typedef struct gw_handler_ctx {
|
|||
|
||||
buffer *response;
|
||||
|
||||
struct fdevents *ev;
|
||||
fdnode *fdn; /* fdevent (fdnode *) object */
|
||||
int fd; /* fd to the gw process */
|
||||
|
||||
|
|
Loading…
Reference in New Issue