|
|
|
@ -38,7 +38,7 @@ ssize_t net_read(int fd, void *buf, ssize_t nbyte) {
|
|
|
|
|
|
|
|
|
|
network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
network_status_t res;
|
|
|
|
|
ev_tstamp ts, now = CUR_TS(vr->con->wrk);
|
|
|
|
|
ev_tstamp ts, now = CUR_TS(vr->wrk);
|
|
|
|
|
worker *wrk;
|
|
|
|
|
#ifdef TCP_CORK
|
|
|
|
|
int corked = 0;
|
|
|
|
@ -47,7 +47,7 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
|
|
|
|
|
if (CORE_OPTION(CORE_OPTION_THROTTLE).number) {
|
|
|
|
|
/* throttling is enabled */
|
|
|
|
|
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->con->wrk->throttle_queue.delay)) {
|
|
|
|
|
if (G_UNLIKELY((now - vr->con->throttle.ts) > vr->wrk->throttle_queue.delay)) {
|
|
|
|
|
vr->con->throttle.magazine += CORE_OPTION(CORE_OPTION_THROTTLE).number * (now - vr->con->throttle.ts);
|
|
|
|
|
if (vr->con->throttle.magazine > CORE_OPTION(CORE_OPTION_THROTTLE).number)
|
|
|
|
|
vr->con->throttle.magazine = CORE_OPTION(CORE_OPTION_THROTTLE).number;
|
|
|
|
@ -85,7 +85,7 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/* stats */
|
|
|
|
|
wrk = vr->con->wrk;
|
|
|
|
|
wrk = vr->wrk;
|
|
|
|
|
wrk->stats.bytes_out += wrote;
|
|
|
|
|
vr->con->stats.bytes_out += wrote;
|
|
|
|
|
|
|
|
|
@ -93,21 +93,21 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
ts = CUR_TS(wrk);
|
|
|
|
|
|
|
|
|
|
if ((ts - vr->con->stats.last_avg) >= 5.0) {
|
|
|
|
|
vr->con->stats.bytes_out_5s_diff = vr->con->wrk->stats.bytes_out - vr->con->wrk->stats.bytes_out_5s;
|
|
|
|
|
vr->con->stats.bytes_out_5s_diff = vr->wrk->stats.bytes_out - vr->wrk->stats.bytes_out_5s;
|
|
|
|
|
vr->con->stats.bytes_out_5s = vr->con->stats.bytes_out;
|
|
|
|
|
vr->con->stats.last_avg = ts;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* only update once a second, the cast is to round the timestamp */
|
|
|
|
|
if ((vr->con->io_timeout_elem.ts + 1.) < now)
|
|
|
|
|
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
|
|
|
|
waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
|
|
|
|
|
|
|
|
|
vr->con->throttle.magazine = write_bytes;
|
|
|
|
|
/* check if throttle magazine is empty */
|
|
|
|
|
if (CORE_OPTION(CORE_OPTION_THROTTLE).number && write_bytes == 0) {
|
|
|
|
|
/* remove EV_WRITE from sockwatcher for now */
|
|
|
|
|
ev_io_rem_events(vr->con->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
|
|
|
|
|
waitqueue_push(&vr->con->wrk->throttle_queue, &vr->con->throttle.queue_elem);
|
|
|
|
|
ev_io_rem_events(vr->wrk->loop, &vr->con->sock_watcher, EV_WRITE);
|
|
|
|
|
waitqueue_push(&vr->wrk->throttle_queue, &vr->con->throttle.queue_elem);
|
|
|
|
|
return NETWORK_STATUS_WAIT_FOR_AIO_EVENT;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -119,7 +119,7 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
off_t max_read = 16 * blocksize; /* 256k */
|
|
|
|
|
ssize_t r;
|
|
|
|
|
off_t len = 0;
|
|
|
|
|
worker *wrk = vr->con->wrk;
|
|
|
|
|
worker *wrk = vr->wrk;
|
|
|
|
|
ev_tstamp now = CUR_TS(wrk);
|
|
|
|
|
|
|
|
|
|
if (cq->limit && cq->limit->limit > 0) {
|
|
|
|
@ -134,7 +134,7 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
|
|
|
|
|
/* only update once a second */
|
|
|
|
|
if ((vr->con->io_timeout_elem.ts + 1.) < now)
|
|
|
|
|
waitqueue_push(&vr->con->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
|
|
|
|
waitqueue_push(&vr->wrk->io_timeout_queue, &vr->con->io_timeout_elem);
|
|
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
GByteArray *buf = g_byte_array_sized_new(blocksize);
|
|
|
|
@ -162,7 +162,7 @@ network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
|
|
|
|
len += r;
|
|
|
|
|
|
|
|
|
|
/* stats */
|
|
|
|
|
wrk = vr->con->wrk;
|
|
|
|
|
wrk = vr->wrk;
|
|
|
|
|
wrk->stats.bytes_in += r;
|
|
|
|
|
vr->con->stats.bytes_in += r;
|
|
|
|
|
|
|
|
|
|