2
0
Fork 0

Append to previous buffer in network reads, save used buffer in custom location

This commit is contained in:
Stefan Bühler 2010-02-10 22:25:48 +01:00
parent 13611b5243
commit 048a7d9144
10 changed files with 119 additions and 29 deletions

View File

@ -152,11 +152,11 @@ LI_API void li_chunkqueue_append_string(liChunkQueue *cq, GString *str);
*/
LI_API void li_chunkqueue_append_bytearr(liChunkQueue *cq, GByteArray *mem);
/* pass ownership of buffer to chunkqueue, do not free/modify it afterwards
* you may modify the data (not the length) if you are sure it isn't sent before.
* if the length is NULL, buffer is destroyed immediately
/* pass ownership of one buffer reference to chunkqueue
* if the length is NULL, reference is released immediately
*/
LI_API void li_chunkqueue_append_buffer(liChunkQueue *cq, liBuffer *buffer);
LI_API void li_chunkqueue_append_buffer2(liChunkQueue *cq, liBuffer *buffer, gsize offset, gsize length);
/* memory gets copied */
LI_API void li_chunkqueue_append_mem(liChunkQueue *cq, const void *mem, gssize len);

View File

@ -37,6 +37,7 @@ struct liConnection {
liChunkQueue *raw_in, *raw_out;
liChunkQueue *in, *out; /* link to mainvr->in/out */
liBuffer *raw_in_buffer;
ev_io sock_watcher;
liSocketAddress remote_addr, local_addr;

View File

@ -16,7 +16,7 @@ LI_API ssize_t li_net_write(int fd, void *buf, ssize_t nbyte);
LI_API ssize_t li_net_read(int fd, void *buf, ssize_t nbyte);
LI_API liNetworkStatus li_network_write(liVRequest *vr, int fd, liChunkQueue *cq, goffset write_max);
LI_API liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq);
LI_API liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq, liBuffer **buffer);
/* use writev for mem chunks, buffered read/write for files */
LI_API liNetworkStatus li_network_write_writev(liVRequest *vr, int fd, liChunkQueue *cq, goffset *write_max);

View File

@ -522,9 +522,8 @@ void li_chunkqueue_append_bytearr(liChunkQueue *cq, GByteArray *mem) {
cqlimit_update(cq, mem->len);
}
/* pass ownership of buffer to chunkqueue, do not free/modify it afterwards
* you may modify the data (not the length) if you are sure it isn't sent before.
* if the length is NULL, buffer is destroyed immediately
/* pass ownership of one buffer reference to chunkqueue
* if the length is NULL, reference is released immediately
*/
void li_chunkqueue_append_buffer(liChunkQueue *cq, liBuffer *buffer) {
liChunk *c;
@ -543,6 +542,24 @@ void li_chunkqueue_append_buffer(liChunkQueue *cq, liBuffer *buffer) {
cqlimit_update(cq, buffer->used);
}
void li_chunkqueue_append_buffer2(liChunkQueue *cq, liBuffer *buffer, gsize offset, gsize length) {
liChunk *c;
if (length == 0) {
li_buffer_release(buffer);
return;
}
assert(offset + length <= buffer->used);
c = chunk_new();
c->type = BUFFER_CHUNK;
c->data.buffer.buffer = buffer;
c->data.buffer.offset = offset;
c->data.buffer.length = length;
g_queue_push_tail_link(&cq->queue, &c->cq_link);
cq->length += length;
cq->bytes_in += length;
cqlimit_update(cq, length);
}
/* memory gets copied */
void li_chunkqueue_append_mem(liChunkQueue *cq, const void *mem, gssize len) {
liChunk *c;

View File

@ -279,7 +279,7 @@ static void connection_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (con->srv_sock->read_cb) {
res = con->srv_sock->read_cb(con);
} else {
res = li_network_read(con->mainvr, w->fd, con->raw_in);
res = li_network_read(con->mainvr, w->fd, con->raw_in, &con->raw_in_buffer);
}
transferred = con->raw_in->length - transferred;
@ -535,6 +535,8 @@ void li_connection_reset(liConnection *con) {
li_chunkqueue_reset(con->raw_in);
li_chunkqueue_reset(con->raw_out);
li_buffer_release(con->raw_in_buffer);
con->raw_in_buffer = NULL;
li_vrequest_reset(con->mainvr, FALSE);
@ -736,6 +738,7 @@ void li_connection_free(liConnection *con) {
li_chunkqueue_free(con->raw_in);
li_chunkqueue_free(con->raw_out);
li_buffer_release(con->raw_in_buffer);
li_vrequest_free(con->mainvr);
li_http_request_parser_clear(&con->req_parser_ctx);

View File

@ -73,7 +73,7 @@ liNetworkStatus li_network_write(liVRequest *vr, int fd, liChunkQueue *cq, goffs
return res;
}
liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq) {
liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq, liBuffer **buffer) {
const ssize_t blocksize = 16*1024; /* 16k */
off_t max_read = 16 * blocksize; /* 256k */
ssize_t r;
@ -90,18 +90,47 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq) {
}
do {
liBuffer *buf;
liBuffer *buf = NULL;
gboolean cq_buf_append;
buf = li_chunkqueue_get_last_buffer(cq, 1024);
buf = NULL;
if (!(cq_buf_append = (buf != NULL))) {
buf = li_buffer_new(blocksize);
cq_buf_append = (buf != NULL);
if (NULL != buffer) {
if (buf != NULL) {
/* use last buffer as *buffer; they should be the same anyway */
if (G_UNLIKELY(buf != *buffer)) {
li_buffer_acquire(buf);
li_buffer_release(*buffer);
*buffer = buf;
}
} else {
buf = *buffer;
if (buf != NULL) {
/* if *buffer is the only reference, we can reset the buffer */
if (g_atomic_int_get(&buf->refcount) == 1) {
buf->used = 0;
}
if (buf->alloc_size - buf->used < 1024) {
/* release *buffer */
li_buffer_release(buf);
*buffer = buf = NULL;
}
}
if (buf == NULL) {
*buffer = buf = li_buffer_new(blocksize);
}
}
assert(*buffer == buf);
} else {
VR_ERROR(vr, "buffer: used %i", (int) buf->used);
if (buf == NULL) {
buf = li_buffer_new(blocksize);
}
}
if (-1 == (r = li_net_read(fd, buf->addr + buf->used, buf->alloc_size - buf->used))) {
if (!cq_buf_append) li_buffer_release(buf);
if (buffer == NULL && !cq_buf_append) li_buffer_release(buf);
switch (errno) {
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
@ -116,14 +145,26 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq) {
return LI_NETWORK_STATUS_FATAL_ERROR;
}
} else if (0 == r) {
if (!cq_buf_append) li_buffer_release(buf);
if (buffer == NULL && !cq_buf_append) li_buffer_release(buf);
return len ? LI_NETWORK_STATUS_SUCCESS : LI_NETWORK_STATUS_CONNECTION_CLOSE;
}
if (cq_buf_append) {
li_chunkqueue_update_last_buffer_size(cq, r);
} else {
buf->used = r;
li_chunkqueue_append_buffer(cq, buf);
gsize offset;
if (buffer != NULL) li_buffer_acquire(buf);
offset = buf->used;
buf->used += r;
li_chunkqueue_append_buffer2(cq, buf, offset, r);
}
if (NULL != buffer) {
if (buf->alloc_size - buf->used < 1024) {
/* release *buffer */
li_buffer_release(buf);
*buffer = buf = NULL;
}
}
len += r;
} while (r == blocksize && len < max_read);

View File

@ -68,6 +68,7 @@ struct fastcgi_connection {
int fd;
ev_io fd_watcher;
liChunkQueue *fcgi_in, *fcgi_out, *stdout;
liBuffer *fcgi_in_buffer;
GByteArray *buf_in_record;
FCGI_Record fcgi_in_record;
@ -192,6 +193,7 @@ static void fastcgi_connection_free(fastcgi_connection *fcon) {
li_chunkqueue_free(fcon->fcgi_in);
li_chunkqueue_free(fcon->fcgi_out);
li_chunkqueue_free(fcon->stdout);
li_buffer_release(fcon->fcgi_in_buffer);
g_byte_array_free(fcon->buf_in_record, TRUE);
li_http_response_parser_clear(&fcon->parse_response_ctx);
@ -563,7 +565,7 @@ static void fastcgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (fcon->fcgi_in->is_closed) {
li_ev_io_rem_events(loop, w, EV_READ);
} else {
switch (li_network_read(fcon->vr, w->fd, fcon->fcgi_in)) {
switch (li_network_read(fcon->vr, w->fd, fcon->fcgi_in, &fcon->fcgi_in_buffer)) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR:

View File

@ -213,18 +213,33 @@ static liNetworkStatus openssl_con_read(liConnection *con) {
ERR_clear_error();
buf = li_chunkqueue_get_last_buffer(cq, 1024);
buf = NULL;
if (!(cq_buf_append = (buf != NULL))) {
buf = li_buffer_new(blocksize);
cq_buf_append = (buf != NULL);
if (buf != NULL) {
/* use last buffer as raw_in_buffer; they should be the same anyway */
if (G_UNLIKELY(buf != con->raw_in_buffer)) {
li_buffer_acquire(buf);
li_buffer_release(con->raw_in_buffer);
con->raw_in_buffer = buf;
}
} else {
buf = con->raw_in_buffer;
if (buf != NULL && buf->alloc_size - buf->used < 1024) {
/* release *buffer */
li_buffer_release(buf);
con->raw_in_buffer = buf = NULL;
}
if (buf == NULL) {
con->raw_in_buffer = buf = li_buffer_new(blocksize);
}
}
assert(con->raw_in_buffer == buf);
r = SSL_read(conctx->ssl, buf->addr + buf->used, buf->alloc_size - buf->used);
if (r < 0) {
int oerrno = errno, err;
gboolean was_fatal;
if (!cq_buf_append) li_buffer_release(buf);
err = SSL_get_error(conctx->ssl, r);
if (SSL_ERROR_WANT_READ == err || SSL_ERROR_WANT_WRITE == err) {
@ -292,15 +307,22 @@ static liNetworkStatus openssl_con_read(liConnection *con) {
return LI_NETWORK_STATUS_FATAL_ERROR;
} else if (r == 0) {
if (!cq_buf_append) li_buffer_release(buf);
return LI_NETWORK_STATUS_CONNECTION_CLOSE;
}
if (cq_buf_append) {
li_chunkqueue_update_last_buffer_size(cq, r);
} else {
buf->used = r;
li_chunkqueue_append_buffer(cq, buf);
gsize offset;
offset = buf->used;
buf->used += r;
li_chunkqueue_append_buffer2(cq, buf, offset, r);
}
if (buf->alloc_size - buf->used < 1024) {
/* release *buffer */
li_buffer_release(buf);
con->raw_in_buffer = buf = NULL;
}
len += r;
} while (len < max_read);

View File

@ -52,6 +52,7 @@ struct proxy_connection {
int fd;
ev_io fd_watcher;
liChunkQueue *proxy_in, *proxy_out;
liBuffer *proxy_in_buffer;
liHttpResponseCtx parse_response_ctx;
gboolean response_headers_finished;
@ -131,6 +132,7 @@ static void proxy_connection_free(proxy_connection *pcon) {
li_chunkqueue_free(pcon->proxy_in);
li_chunkqueue_free(pcon->proxy_out);
li_buffer_release(pcon->proxy_in_buffer);
li_http_response_parser_clear(&pcon->parse_response_ctx);
@ -240,7 +242,7 @@ static void proxy_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (pcon->proxy_in->is_closed) {
li_ev_io_rem_events(loop, w, EV_READ);
} else {
switch (li_network_read(pcon->vr, w->fd, pcon->proxy_in)) {
switch (li_network_read(pcon->vr, w->fd, pcon->proxy_in, &pcon->proxy_in_buffer)) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR:

View File

@ -47,6 +47,7 @@ struct scgi_connection {
int fd;
ev_io fd_watcher;
liChunkQueue *scgi_in, *scgi_out;
liBuffer *scgi_in_buffer;
liHttpResponseCtx parse_response_ctx;
gboolean response_headers_finished;
@ -126,6 +127,7 @@ static void scgi_connection_free(scgi_connection *scon) {
li_chunkqueue_free(scon->scgi_in);
li_chunkqueue_free(scon->scgi_out);
li_buffer_release(scon->scgi_in_buffer);
li_http_response_parser_clear(&scon->parse_response_ctx);
@ -329,7 +331,7 @@ static void scgi_fd_cb(struct ev_loop *loop, ev_io *w, int revents) {
if (scon->scgi_in->is_closed) {
li_ev_io_rem_events(loop, w, EV_READ);
} else {
switch (li_network_read(scon->vr, w->fd, scon->scgi_in)) {
switch (li_network_read(scon->vr, w->fd, scon->scgi_in, &scon->scgi_in_buffer)) {
case LI_NETWORK_STATUS_SUCCESS:
break;
case LI_NETWORK_STATUS_FATAL_ERROR: