Append to previous buffer in network reads

personal/stbuehler/wip
Stefan Bühler 13 years ago
parent d8f5d21ec6
commit 750bd453a2
  1. 11
      include/lighttpd/chunk.h
  2. 35
      src/main/chunk.c
  3. 8
      src/main/connection.c
  4. 8
      src/main/http_request_parser.rl
  5. 25
      src/main/network.c
  6. 42
      src/modules/mod_openssl.c

@ -201,6 +201,17 @@ INLINE liChunk* li_chunkqueue_first_chunk(liChunkQueue *cq);
LI_API gboolean li_chunkqueue_extract_to(liVRequest *vr, liChunkQueue *cq, goffset len, GString *dest);
LI_API gboolean li_chunkqueue_extract_to_bytearr(liVRequest *vr, liChunkQueue *cq, goffset len, GByteArray *dest);
/* helper functions to append to the last BUFFER_CHUNK of a chunkqueue */
/* returns the liBuffer from the last chunk in cq, if the chunk has type BUFFER_CHUNK,
* and the buffer has at least min_space bytes free and refcount == 1 (NULL otherwise) */
LI_API liBuffer* li_chunkqueue_get_last_buffer(liChunkQueue *cq, guint min_space);
/* only call this if li_chunkqueue_get_last_buffer returned a buffer; don't modify the chunkqueue
* between the two calls
* updates the buffer and the cq data
*/
LI_API void li_chunkqueue_update_last_buffer_size(liChunkQueue *cq, goffset add_length);
/********************
* Inline functions *
********************/

@ -870,3 +870,38 @@ error:
return FALSE;
}
/* helper functions to append to the last BUFFER_CHUNK of a chunkqueue */
/* returns the liBuffer from the last chunk in cq, if the chunk has type BUFFER_CHUNK,
* and the buffer has at least min_space bytes free and refcount == 1 (NULL otherwise) */
liBuffer* li_chunkqueue_get_last_buffer(liChunkQueue *cq, guint min_space) {
liChunk *c = g_queue_peek_tail(&cq->queue);
liBuffer *buf;
if (!c || c->type != BUFFER_CHUNK) return NULL;
buf = c->data.buffer.buffer;
if (g_atomic_int_get(&buf->refcount) != 1 || (buf->alloc_size - buf->used) < min_space) return NULL;
/* truncate buf->used - we are the only reference, so that is no problem;
* but we need to append directly after the current data block
*/
buf->used = c->data.buffer.offset + c->data.buffer.length;
return buf;
}
/* only call this if li_chunkqueue_get_last_buffer returned a buffer; don't modify the chunkqueue
* between the two calls
* updates the buffer and the cq data
*/
LI_API void li_chunkqueue_update_last_buffer_size(liChunkQueue *cq, goffset add_length) {
liChunk *c = g_queue_peek_tail(&cq->queue);
liBuffer *buf;
assert(c && c->type == BUFFER_CHUNK);
buf = c->data.buffer.buffer;
buf->used += add_length;
c->data.buffer.length += add_length;
cq->length += add_length;
cq->bytes_in += add_length;
cqlimit_update(cq, add_length);
}

@ -7,6 +7,8 @@ static void li_connection_internal_error(liConnection *con);
static void parse_request_body(liConnection *con) {
if ((con->state > LI_CON_STATE_HANDLE_MAINVR || con->mainvr->state >= LI_VRS_READ_CONTENT) && !con->in->is_closed) {
goffset newbytes = 0;
li_ev_io_add_events(con->wrk->loop, &con->sock_watcher, EV_READ);
if (con->mainvr->request.content_length == -1) {
/* TODO: parse chunked encoded request body, filters */
@ -14,14 +16,16 @@ static void parse_request_body(liConnection *con) {
con->in->is_closed = TRUE;
} else {
if (con->in->bytes_in < con->mainvr->request.content_length) {
li_chunkqueue_steal_len(con->in, con->raw_in, con->mainvr->request.content_length - con->in->bytes_in);
newbytes = li_chunkqueue_steal_len(con->in, con->raw_in, con->mainvr->request.content_length - con->in->bytes_in);
}
if (con->in->bytes_in == con->mainvr->request.content_length) {
con->in->is_closed = TRUE;
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
}
}
li_vrequest_handle_request_body(con->mainvr);
if (newbytes > 0 || con->in->is_closed) {
li_vrequest_handle_request_body(con->mainvr);
}
} else {
li_ev_io_rem_events(con->wrk->loop, &con->sock_watcher, EV_READ);
}

@ -27,7 +27,13 @@
g_string_truncate(ctx->h_value, 0);
}
action header_value {
getStringTo(fpc - 2, ctx->h_value);
getStringTo(fpc, ctx->h_value);
/* Remove CRLF */
if (ctx->h_value->len > 2) {
g_string_truncate(ctx->h_value, ctx->h_value->len - 2);
} else {
g_string_truncate(ctx->h_value, 0);
}
}
action header {
li_http_header_insert(ctx->request->headers, GSTR_LEN(ctx->h_key), GSTR_LEN(ctx->h_value));

@ -90,9 +90,18 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq) {
}
do {
liBuffer *buf = li_buffer_new(blocksize);
if (-1 == (r = li_net_read(fd, buf->addr, buf->alloc_size))) {
li_buffer_release(buf);
liBuffer *buf;
gboolean cq_buf_append;
buf = li_chunkqueue_get_last_buffer(cq, 1024);
buf = NULL;
if (!(cq_buf_append = (buf != NULL))) {
buf = li_buffer_new(blocksize);
} else {
VR_ERROR(vr, "buffer: used %i", (int) buf->used);
}
if (-1 == (r = li_net_read(fd, buf->addr + buf->used, buf->alloc_size - buf->used))) {
if (!cq_buf_append) li_buffer_release(buf);
switch (errno) {
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
@ -107,11 +116,15 @@ liNetworkStatus li_network_read(liVRequest *vr, int fd, liChunkQueue *cq) {
return LI_NETWORK_STATUS_FATAL_ERROR;
}
} else if (0 == r) {
li_buffer_release(buf);
if (!cq_buf_append) li_buffer_release(buf);
return len ? LI_NETWORK_STATUS_SUCCESS : LI_NETWORK_STATUS_CONNECTION_CLOSE;
}
buf->used = r;
li_chunkqueue_append_buffer(cq, buf);
if (cq_buf_append) {
li_chunkqueue_update_last_buffer_size(cq, r);
} else {
buf->used = r;
li_chunkqueue_append_buffer(cq, buf);
}
len += r;
} while (r == blocksize && len < max_read);

@ -36,7 +36,6 @@ typedef struct openssl_context openssl_context;
struct openssl_connection_ctx {
SSL *ssl;
GByteArray *reuse_read_buffer;
};
struct openssl_context {
@ -90,10 +89,6 @@ static void openssl_con_close(liConnection *con) {
conctx->ssl = FALSE;
}
if (conctx->reuse_read_buffer) {
g_byte_array_free(conctx->reuse_read_buffer, TRUE);
}
con->srv_sock_data = NULL;
con->is_ssl = FALSE;
@ -194,7 +189,6 @@ static liNetworkStatus openssl_con_write(liConnection *con, goffset write_max) {
static liNetworkStatus openssl_con_read(liConnection *con) {
liChunkQueue *cq = con->raw_in;
openssl_connection_ctx *conctx = con->srv_sock_data;
GByteArray *buf;
const ssize_t blocksize = 16*1024; /* 16k */
off_t max_read = 16 * blocksize; /* 256k */
@ -212,30 +206,31 @@ static liNetworkStatus openssl_con_read(liConnection *con) {
}
}
buf = conctx->reuse_read_buffer;
conctx->reuse_read_buffer = NULL;
do {
liBuffer *buf;
gboolean cq_buf_append;
ERR_clear_error();
if (!buf) {
buf = g_byte_array_new();
g_byte_array_set_size(buf, blocksize);
buf = li_chunkqueue_get_last_buffer(cq, 1024);
buf = NULL;
if (!(cq_buf_append = (buf != NULL))) {
buf = li_buffer_new(blocksize);
}
r = SSL_read(conctx->ssl, buf->data, buf->len);
r = SSL_read(conctx->ssl, buf->addr + buf->used, buf->alloc_size - buf->used);
if (r < 0) {
int oerrno = errno, err;
gboolean was_fatal;
if (!cq_buf_append) li_buffer_release(buf);
err = SSL_get_error(conctx->ssl, r);
if (SSL_ERROR_WANT_READ == err || SSL_ERROR_WANT_WRITE == err) {
conctx->reuse_read_buffer = buf;
return LI_NETWORK_STATUS_WAIT_FOR_EVENT;
/* ignore requirement that we should pass the same buffer again */
return (len > 0) ? LI_NETWORK_STATUS_SUCCESS : LI_NETWORK_STATUS_WAIT_FOR_EVENT;
}
g_byte_array_free(buf, TRUE);
buf = NULL;
switch (err) {
case SSL_ERROR_SYSCALL:
@ -297,15 +292,18 @@ static liNetworkStatus openssl_con_read(liConnection *con) {
return LI_NETWORK_STATUS_FATAL_ERROR;
} else if (r == 0) {
g_byte_array_free(buf, TRUE);
if (!cq_buf_append) li_buffer_release(buf);
return LI_NETWORK_STATUS_CONNECTION_CLOSE;
}
g_byte_array_set_size(buf, r);
li_chunkqueue_append_bytearr(cq, buf);
buf = NULL;
if (cq_buf_append) {
li_chunkqueue_update_last_buffer_size(cq, r);
} else {
buf->used = r;
li_chunkqueue_append_buffer(cq, buf);
}
len += r;
} while (r == blocksize && len < max_read);
} while (len < max_read);
return LI_NETWORK_STATUS_SUCCESS;
}

Loading…
Cancel
Save