Move stats and throttle to network.c
parent
fae25515fa
commit
91d4868ef4
|
@ -24,15 +24,14 @@ LI_API network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq);
|
|||
LI_API network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq);
|
||||
|
||||
/* use writev for mem chunks, buffered read/write for files */
|
||||
LI_API network_status_t network_write_writev(vrequest *vr, int fd, chunkqueue *cq);
|
||||
LI_API network_status_t network_write_writev(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max);
|
||||
|
||||
/* use sendfile for files, writev for mem chunks */
|
||||
LI_API network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq);
|
||||
LI_API network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max);
|
||||
|
||||
/* write backends */
|
||||
LI_API network_status_t network_backend_write(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max);
|
||||
LI_API network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max);
|
||||
LI_API network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max);
|
||||
|
||||
#define NETWORK_FALLBACK(f, write_max) do { \
|
||||
network_status_t res; \
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
#include <lighttpd/plugin_core.h>
|
||||
|
||||
/** repeats write after EINTR */
|
||||
ssize_t net_write(int fd, void *buf, ssize_t nbyte) {
|
||||
|
@ -37,10 +38,24 @@ ssize_t net_read(int fd, void *buf, ssize_t nbyte) {
|
|||
|
||||
network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
||||
network_status_t res;
|
||||
ev_tstamp now = CUR_TS(vr->con->wrk);
|
||||
ev_tstamp ts, now = CUR_TS(vr->con->wrk);
|
||||
worker *wrk;
|
||||
#ifdef TCP_CORK
|
||||
int corked = 0;
|
||||
#endif
|
||||
goffset write_max = 256*1024, write_bytes, wrote; /* 256 kb */
|
||||
|
||||
if (CORE_OPTION(CORE_OPTION_THROTTLE).number) {
|
||||
/* throttling is enabled */
|
||||
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->con->wrk->throttle_queue.delay)) {
|
||||
vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts);
|
||||
if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number)
|
||||
vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number;
|
||||
vr->con->throttle.ts = now;
|
||||
/*g_print("throttle magazine: %u kbytes\n", vr->con->throttle.magazine / 1024);*/
|
||||
}
|
||||
write_max = vr->con->throttle.magazine;
|
||||
}
|
||||
|
||||
#ifdef TCP_CORK
|
||||
/* Linux: put a cork into the socket as we want to combine the write() calls
|
||||
|
@ -53,7 +68,8 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
#endif
|
||||
|
||||
/* res = network_write_writev(con, fd, cq); */
|
||||
res = network_write_sendfile(vr, fd, cq);
|
||||
write_bytes = write_max;
|
||||
res = network_write_sendfile(vr, fd, cq, &write_bytes);
|
||||
|
||||
#ifdef TCP_CORK
|
||||
if (corked) {
|
||||
|
@ -62,6 +78,30 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
}
|
||||
#endif
|
||||
|
||||
vr->con->throttle.magazine = write_bytes;
|
||||
/* check if throttle magazine is empty */
|
||||
if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_bytes == 0) {
|
||||
/* remove EV_WRITE from sockwatcher for now */
|
||||
ev_io_rem_events(vr->con->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
|
||||
waitqueue_push(&vr->con->wrk->throttle_queue, &vr->con->throttle.queue_elem);
|
||||
return NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
}
|
||||
|
||||
/* stats */
|
||||
wrote = write_max - write_bytes;
|
||||
wrk = vr->con->wrk;
|
||||
wrk->stats.bytes_out += wrote;
|
||||
vr->con->stats.bytes_out += wrote;
|
||||
|
||||
/* update 5s stats */
|
||||
ts = CUR_TS(wrk);
|
||||
|
||||
if ((ts - vr->con->stats.last_avg) >= 5.0) {
|
||||
vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s;
|
||||
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
|
||||
vr->con->stats.last_avg = ts;
|
||||
}
|
||||
|
||||
/* only update once a second, the cast is to round the timestamp */
|
||||
if ((vr->con->io_timeout_elem.ts + 1.) < now)
|
||||
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
|
||||
#include <lighttpd/base.h>
|
||||
#include <lighttpd/plugin_core.h>
|
||||
|
||||
/* first chunk must be a FILE_CHUNK ! */
|
||||
network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max) {
|
||||
|
@ -9,8 +8,6 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
|
|||
gboolean did_write_something = FALSE;
|
||||
chunkiter ci;
|
||||
chunk *c;
|
||||
worker *wrk;
|
||||
ev_tstamp ts;
|
||||
|
||||
if (0 == cq->length) return NETWORK_STATUS_FATAL_ERROR;
|
||||
|
||||
|
@ -40,7 +37,7 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
|
|||
#if EWOULDBLOCK != EAGAIN
|
||||
case EWOULDBLOCK
|
||||
#endif
|
||||
return did_write_something ? NETWORK_STATUS_SUCCESS : NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
return NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
case ECONNRESET:
|
||||
case EPIPE:
|
||||
return NETWORK_STATUS_CONNECTION_CLOSE;
|
||||
|
@ -69,73 +66,35 @@ network_status_t network_backend_sendfile(vrequest *vr, int fd, chunkqueue *cq,
|
|||
VR_ERROR(vr, "%s", "File shrinked, aborting");
|
||||
return NETWORK_STATUS_FATAL_ERROR;
|
||||
}
|
||||
return did_write_something ? NETWORK_STATUS_SUCCESS : NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
return NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
}
|
||||
chunkqueue_skip(cq, r);
|
||||
*write_max -= r;
|
||||
did_write_something = TRUE;
|
||||
|
||||
/* stats */
|
||||
wrk = vr->con->wrk;
|
||||
wrk->stats.bytes_out += r;
|
||||
vr->con->stats.bytes_out += r;
|
||||
|
||||
/* update 5s stats */
|
||||
ts = CUR_TS(wrk);
|
||||
|
||||
if ((ts - vr->con->stats.last_avg) >= 5.0) {
|
||||
vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s;
|
||||
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
|
||||
vr->con->stats.last_avg = ts;
|
||||
}
|
||||
|
||||
if (0 == cq->length) return NETWORK_STATUS_SUCCESS;
|
||||
} while (r == toSend && *write_max > 0);
|
||||
if (r != toSend) return NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
} while (*write_max > 0);
|
||||
|
||||
return NETWORK_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq) {
|
||||
goffset write_max;
|
||||
|
||||
if (CORE_OPTION(CORE_OPTION_THROTTLE).number) {
|
||||
/* throttling is enabled */
|
||||
ev_tstamp now = CUR_TS(vr->con->wrk);
|
||||
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->con->wrk->throttle_queue.delay)) {
|
||||
vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts);
|
||||
if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number)
|
||||
vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number;
|
||||
vr->con->throttle.ts = now;
|
||||
/*g_print("throttle magazine: %u kbytes\n", vr->con->throttle.magazine / 1024);*/
|
||||
}
|
||||
write_max = vr->con->throttle.magazine;
|
||||
} else
|
||||
write_max = 256*1024; /* 256kB */
|
||||
|
||||
network_status_t network_write_sendfile(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max) {
|
||||
if (cq->length == 0) return NETWORK_STATUS_FATAL_ERROR;
|
||||
|
||||
do {
|
||||
switch (chunkqueue_first_chunk(cq)->type) {
|
||||
case MEM_CHUNK:
|
||||
NETWORK_FALLBACK(network_backend_writev, &write_max);
|
||||
NETWORK_FALLBACK(network_backend_writev, write_max);
|
||||
break;
|
||||
case FILE_CHUNK:
|
||||
NETWORK_FALLBACK(network_backend_sendfile, &write_max);
|
||||
NETWORK_FALLBACK(network_backend_sendfile, write_max);
|
||||
break;
|
||||
default:
|
||||
return NETWORK_STATUS_FATAL_ERROR;
|
||||
}
|
||||
|
||||
/* check if throttle magazine is empty */
|
||||
if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_max == 0) {
|
||||
/* remove EV_WRITE from sockwatcher for now */
|
||||
vr->con->throttle.magazine = 0;
|
||||
ev_io_rem_events(vr->con->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
|
||||
waitqueue_push(&vr->con->wrk->throttle_queue, &vr->con->throttle.queue_elem);
|
||||
return NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
}
|
||||
|
||||
if (cq->length == 0) return NETWORK_STATUS_SUCCESS;
|
||||
} while (write_max > 0);
|
||||
} while (*write_max > 0);
|
||||
return NETWORK_STATUS_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -31,8 +31,6 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
|
|||
gboolean did_write_something = FALSE;
|
||||
chunkiter ci;
|
||||
chunk *c;
|
||||
worker *wrk;
|
||||
ev_tstamp ts;
|
||||
network_status_t res = NETWORK_STATUS_FATAL_ERROR;
|
||||
|
||||
GArray *chunks = g_array_sized_new(FALSE, TRUE, sizeof(struct iovec), UIO_MAXIOV);
|
||||
|
@ -69,7 +67,7 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
|
|||
#if EWOULDBLOCK != EAGAIN
|
||||
case EWOULDBLOCK:
|
||||
#endif
|
||||
res = did_write_something ? NETWORK_STATUS_SUCCESS : NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
res = NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
goto cleanup;
|
||||
case ECONNRESET:
|
||||
case EPIPE:
|
||||
|
@ -83,28 +81,14 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
|
|||
}
|
||||
}
|
||||
if (0 == r) {
|
||||
res = did_write_something ? NETWORK_STATUS_SUCCESS : NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
res = NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
goto cleanup;
|
||||
}
|
||||
chunkqueue_skip(cq, r);
|
||||
*write_max -= r;
|
||||
|
||||
/* stats */
|
||||
wrk = vr->con->wrk;
|
||||
vr->con->wrk->stats.bytes_out += r;
|
||||
vr->con->stats.bytes_out += r;
|
||||
|
||||
/* update 5s stats */
|
||||
ts = CUR_TS(wrk);
|
||||
|
||||
if ((ts - vr->con->stats.last_avg) > 5) {
|
||||
vr->con->stats.bytes_out_5s_diff = vr->con->stats.bytes_out - vr->con->stats.bytes_out_5s;
|
||||
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
|
||||
vr->con->stats.last_avg = ts;
|
||||
}
|
||||
|
||||
if (r != we_have) {
|
||||
res = NETWORK_STATUS_SUCCESS;
|
||||
res = NETWORK_STATUS_WAIT_FOR_EVENT;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
|
@ -113,7 +97,6 @@ network_status_t network_backend_writev(vrequest *vr, int fd, chunkqueue *cq, go
|
|||
goto cleanup;
|
||||
}
|
||||
|
||||
|
||||
did_write_something = TRUE;
|
||||
g_array_set_size(chunks, 0);
|
||||
} while (*write_max > 0);
|
||||
|
@ -125,21 +108,20 @@ cleanup:
|
|||
return res;
|
||||
}
|
||||
|
||||
network_status_t network_write_writev(vrequest *vr, int fd, chunkqueue *cq) {
|
||||
goffset write_max = 256*1024; // 256k //;
|
||||
network_status_t network_write_writev(vrequest *vr, int fd, chunkqueue *cq, goffset *write_max) {
|
||||
if (cq->length == 0) return NETWORK_STATUS_FATAL_ERROR;
|
||||
do {
|
||||
switch (chunkqueue_first_chunk(cq)->type) {
|
||||
case MEM_CHUNK:
|
||||
NETWORK_FALLBACK(network_backend_writev, &write_max);
|
||||
NETWORK_FALLBACK(network_backend_writev, write_max);
|
||||
break;
|
||||
case FILE_CHUNK:
|
||||
NETWORK_FALLBACK(network_backend_write, &write_max);
|
||||
NETWORK_FALLBACK(network_backend_write, write_max);
|
||||
break;
|
||||
default:
|
||||
return NETWORK_STATUS_FATAL_ERROR;
|
||||
}
|
||||
if (cq->length == 0) return NETWORK_STATUS_SUCCESS;
|
||||
} while (write_max > 0);
|
||||
} while (*write_max > 0);
|
||||
return NETWORK_STATUS_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue