Change mod_fastcgi to not wait for complete packets (enables limits < 64kbyte); fix some limit handling.
parent
767ad93e83
commit
98a36e970a
12
src/chunk.c
12
src/chunk.c
|
@ -367,12 +367,12 @@ static void cqlimit_update(chunkqueue *cq, goffset d) {
|
|||
cq->mem_usage += d;
|
||||
assert(cq->mem_usage >= 0);
|
||||
cql = cq->limit;
|
||||
fprintf(stderr, "cqlimit_update: cq->mem_usage: %"L_GOFFSET_FORMAT"\n", cq->mem_usage);
|
||||
g_printerr("cqlimit_update: cq->mem_usage: %"L_GOFFSET_FORMAT"\n", cq->mem_usage);
|
||||
|
||||
if (!cql) return;
|
||||
cql->current += d;
|
||||
assert(cql->current >= 0);
|
||||
fprintf(stderr, "cqlimit_update: cql->current: %"L_GOFFSET_FORMAT", cql->limit: %"L_GOFFSET_FORMAT"\n", cql->current, cql->limit);
|
||||
g_printerr("cqlimit_update: cql->current: %"L_GOFFSET_FORMAT", cql->limit: %"L_GOFFSET_FORMAT"\n", cql->current, cql->limit);
|
||||
if (cql->locked) {
|
||||
if (cql->limit <= 0 || cql->current < cql->limit) {
|
||||
cqlimit_unlock(cql);
|
||||
|
@ -445,9 +445,13 @@ void chunkqueue_use_limit(chunkqueue *cq, vrequest *vr) {
|
|||
}
|
||||
|
||||
void chunkqueue_set_limit(chunkqueue *cq, cqlimit* cql) {
|
||||
gboolean upd_limit = (cql != cq->limit);
|
||||
goffset memusage = cq->mem_usage;
|
||||
if (cql) cqlimit_acquire(cql);
|
||||
if (upd_limit) cqlimit_update(cq, -memusage);
|
||||
cqlimit_release(cq->limit);
|
||||
cq->limit = cql;
|
||||
if (upd_limit) cqlimit_update(cq, memusage);
|
||||
}
|
||||
|
||||
/* pass ownership of str to chunkqueue, do not free/modify it afterwards
|
||||
|
@ -573,6 +577,7 @@ goffset chunkqueue_steal_len(chunkqueue *out, chunkqueue *in, goffset length) {
|
|||
out->length += bytes;
|
||||
cqlimit_update(out, memoutbytes);
|
||||
cqlimit_update(in, meminbytes);
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
|
@ -601,7 +606,7 @@ goffset chunkqueue_steal_all(chunkqueue *out, chunkqueue *in) {
|
|||
/* update the queue tail and length */
|
||||
out->queue->tail = in->queue->tail;
|
||||
out->queue->length += in->queue->length;
|
||||
/* reset in->queue) */
|
||||
/* reset in->queue */
|
||||
g_queue_init(in->queue);
|
||||
}
|
||||
/* count bytes in chunkqueues */
|
||||
|
@ -610,6 +615,7 @@ goffset chunkqueue_steal_all(chunkqueue *out, chunkqueue *in) {
|
|||
in->length = 0;
|
||||
out->bytes_in += len;
|
||||
out->length += len;
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ struct FCGI_Record {
|
|||
guint16 requestID;
|
||||
guint16 contentLength;
|
||||
guint8 paddingLength;
|
||||
gint remainingContent, remainingPadding;
|
||||
gboolean valid, first;
|
||||
};
|
||||
|
||||
|
||||
|
@ -162,7 +164,7 @@ static void fastcgi_context_acquire(fastcgi_context *ctx) {
|
|||
static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents);
|
||||
|
||||
static fastcgi_connection* fastcgi_connection_new(vrequest *vr, fastcgi_context *ctx) {
|
||||
fastcgi_connection* fcon = g_slice_new(fastcgi_connection);
|
||||
fastcgi_connection* fcon = g_slice_new0(fastcgi_connection);
|
||||
|
||||
fastcgi_context_acquire(ctx);
|
||||
fcon->ctx = ctx;
|
||||
|
@ -467,7 +469,22 @@ static void fastcgi_forward_request(vrequest *vr, fastcgi_connection *fcon) {
|
|||
|
||||
static gboolean fastcgi_get_packet(fastcgi_connection *fcon) {
|
||||
const unsigned char *data;
|
||||
gint len;
|
||||
|
||||
/* already got packet */
|
||||
if (fcon->fcgi_in_record.valid) {
|
||||
if (0 == fcon->fcgi_in_record.remainingContent) {
|
||||
/* wait for padding data ? */
|
||||
gint len = fcon->fcgi_in->length;
|
||||
if (len > fcon->fcgi_in_record.remainingPadding) len = fcon->fcgi_in_record.remainingPadding;
|
||||
chunkqueue_skip(fcon->fcgi_in, len);
|
||||
fcon->fcgi_in_record.remainingPadding -= len;
|
||||
if (0 != fcon->fcgi_in_record.remainingPadding) return TRUE; /* wait for data */
|
||||
fcon->fcgi_in_record.valid = FALSE; /* read next packet */
|
||||
} else {
|
||||
return TRUE; /* wait for/handle more content */
|
||||
}
|
||||
}
|
||||
|
||||
if (!chunkqueue_extract_to(fcon->vr, fcon->fcgi_in, FCGI_HEADER_LEN, fcon->buf_in_record)) return FALSE; /* need more data */
|
||||
|
||||
data = (const unsigned char*) fcon->buf_in_record->str;
|
||||
|
@ -476,18 +493,30 @@ static gboolean fastcgi_get_packet(fastcgi_connection *fcon) {
|
|||
fcon->fcgi_in_record.requestID = (data[2] << 8) | (data[3]);
|
||||
fcon->fcgi_in_record.contentLength = (data[4] << 8) | (data[5]);
|
||||
fcon->fcgi_in_record.paddingLength = data[6];
|
||||
fcon->fcgi_in_record.remainingContent = fcon->fcgi_in_record.contentLength;
|
||||
fcon->fcgi_in_record.remainingPadding = fcon->fcgi_in_record.paddingLength;
|
||||
fcon->fcgi_in_record.valid = TRUE;
|
||||
fcon->fcgi_in_record.first = TRUE;
|
||||
|
||||
len = ((gint) fcon->fcgi_in_record.contentLength) + fcon->fcgi_in_record.paddingLength + FCGI_HEADER_LEN;
|
||||
|
||||
if (len > fcon->fcgi_in->length) return FALSE; /* need more data */
|
||||
chunkqueue_skip(fcon->fcgi_in, FCGI_HEADER_LEN);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/* get available data and mark it as read (subtract it from contentLength) */
|
||||
static int fastcgi_available(fastcgi_connection *fcon) {
|
||||
gint len = fcon->fcgi_in->length;
|
||||
if (len > fcon->fcgi_in_record.remainingContent) len = fcon->fcgi_in_record.remainingContent;
|
||||
fcon->fcgi_in_record.remainingContent -= len;
|
||||
return len;
|
||||
}
|
||||
|
||||
static gboolean fastcgi_parse_response(fastcgi_connection *fcon) {
|
||||
vrequest *vr = fcon->vr;
|
||||
plugin *p = fcon->ctx->plugin;
|
||||
gint len;
|
||||
while (fastcgi_get_packet(fcon)) {
|
||||
VR_WARNING(vr, "Fastcgi record type %i", (gint) fcon->fcgi_in_record.type);
|
||||
if (fcon->fcgi_in_record.version != FCGI_VERSION_1) {
|
||||
VR_ERROR(vr, "Unknown fastcgi protocol version %i", (gint) fcon->fcgi_in_record.version);
|
||||
close(fcon->fd);
|
||||
|
@ -495,34 +524,34 @@ static gboolean fastcgi_parse_response(fastcgi_connection *fcon) {
|
|||
vrequest_error(vr);
|
||||
return FALSE;
|
||||
}
|
||||
chunkqueue_skip(fcon->fcgi_in, FCGI_HEADER_LEN);
|
||||
switch (fcon->fcgi_in_record.type) {
|
||||
case FCGI_END_REQUEST:
|
||||
chunkqueue_skip(fcon->fcgi_in, fcon->fcgi_in_record.contentLength);
|
||||
chunkqueue_skip(fcon->fcgi_in, fastcgi_available(fcon));
|
||||
fcon->stdout->is_closed = TRUE;
|
||||
break;
|
||||
case FCGI_STDOUT:
|
||||
if (0 == fcon->fcgi_in_record.contentLength) {
|
||||
fcon->stdout->is_closed = TRUE;
|
||||
} else {
|
||||
chunkqueue_steal_len(fcon->stdout, fcon->fcgi_in, fcon->fcgi_in_record.contentLength);
|
||||
chunkqueue_steal_len(fcon->stdout, fcon->fcgi_in, fastcgi_available(fcon));
|
||||
}
|
||||
break;
|
||||
case FCGI_STDERR:
|
||||
chunkqueue_extract_to(vr, fcon->fcgi_in, fcon->fcgi_in_record.contentLength, vr->con->wrk->tmp_str);
|
||||
len = fastcgi_available(fcon);
|
||||
chunkqueue_extract_to(vr, fcon->fcgi_in, len, vr->con->wrk->tmp_str);
|
||||
if (FASTCGI_OPTION(FASTCGI_OPTION_LOG_PLAIN_ERRORS).boolean) {
|
||||
log_split_lines(vr->con->srv, vr, LOG_LEVEL_BACKEND, 0, vr->con->wrk->tmp_str->str, "");
|
||||
} else {
|
||||
VR_BACKEND_LINES(vr, vr->con->wrk->tmp_str->str, "%s", "(fcgi-stderr) ");
|
||||
}
|
||||
chunkqueue_skip(fcon->fcgi_in, fcon->fcgi_in_record.contentLength);
|
||||
chunkqueue_skip(fcon->fcgi_in, len);
|
||||
break;
|
||||
default:
|
||||
VR_WARNING(vr, "Unhandled fastcgi record type %i", (gint) fcon->fcgi_in_record.type);
|
||||
chunkqueue_skip(fcon->fcgi_in, fcon->fcgi_in_record.contentLength);
|
||||
if (fcon->fcgi_in_record.first) VR_WARNING(vr, "Unhandled fastcgi record type %i", (gint) fcon->fcgi_in_record.type);
|
||||
chunkqueue_skip(fcon->fcgi_in, fastcgi_available(fcon));
|
||||
break;
|
||||
}
|
||||
chunkqueue_skip(fcon->fcgi_in, fcon->fcgi_in_record.paddingLength);
|
||||
fcon->fcgi_in_record.first = FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -703,7 +732,7 @@ static handler_t fastcgi_handle(vrequest *vr, gpointer param, gpointer *context)
|
|||
chunkqueue_set_limit(fcon->fcgi_in, vr->out->limit);
|
||||
chunkqueue_set_limit(fcon->stdout, vr->out->limit);
|
||||
chunkqueue_set_limit(fcon->fcgi_out, vr->in->limit);
|
||||
vr->out->limit->io_watcher = &fcon->fd_watcher;
|
||||
if (vr->out->limit) vr->out->limit->io_watcher = &fcon->fd_watcher;
|
||||
|
||||
return fastcgi_statemachine(vr, fcon);
|
||||
}
|
||||
|
@ -720,7 +749,7 @@ static void fastcgi_close(vrequest *vr, plugin *p) {
|
|||
fastcgi_connection *fcon = (fastcgi_connection*) g_ptr_array_index(vr->plugin_ctx, p->id);
|
||||
g_ptr_array_index(vr->plugin_ctx, p->id) = NULL;
|
||||
if (fcon) {
|
||||
vr->out->limit->io_watcher = NULL;
|
||||
if (vr->out->limit) vr->out->limit->io_watcher = NULL;
|
||||
fastcgi_connection_free(fcon);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,12 +116,22 @@ network_status_t network_write(vrequest *vr, int fd, chunkqueue *cq) {
|
|||
|
||||
network_status_t network_read(vrequest *vr, int fd, chunkqueue *cq) {
|
||||
const ssize_t blocksize = 16*1024; /* 16k */
|
||||
const off_t max_read = 16 * blocksize; /* 256k */
|
||||
off_t max_read = 16 * blocksize; /* 256k */
|
||||
ssize_t r;
|
||||
off_t len = 0;
|
||||
worker *wrk = vr->con->wrk;
|
||||
ev_tstamp now = CUR_TS(wrk);
|
||||
|
||||
if (cq->limit && cq->limit->limit > 0) {
|
||||
if (max_read > cq->limit->limit - cq->limit->current) {
|
||||
max_read = cq->limit->limit - cq->limit->current;
|
||||
if (max_read <= 0) {
|
||||
max_read = 0; /* we still have to read something */
|
||||
VR_ERROR(vr, "%s", "network_read: fd should be disabled as chunkqueue is already full");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
GString *buf = g_string_sized_new(blocksize);
|
||||
g_string_set_size(buf, blocksize);
|
||||
|
|
|
@ -983,8 +983,10 @@ static action* core_limit_out(server *srv, plugin* p, value *val) {
|
|||
|
||||
if (limit < 0) {
|
||||
limit = 0; /* no limit */
|
||||
}
|
||||
if (limit > (1 << 30)) {
|
||||
} else if (limit < (16*1024)) {
|
||||
ERROR(srv, "limit %"G_GINT64_FORMAT" is too low (need at least 16 kb)", limit);
|
||||
return NULL;
|
||||
} else if (limit > (1 << 30)) {
|
||||
ERROR(srv, "limit %"G_GINT64_FORMAT" is too high (1GB is the maximum)", limit);
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue