[core] chunkqueue_{peek,read}_data(), squash

chunkqueue_peek_data(), chunkqueue_read_data(), chunkqueue_read_squash()
shared code for chunkqueue manipulation
This commit is contained in:
Glenn Strauss 2020-09-23 03:31:02 -04:00
parent 362be7b3bb
commit 6c68e14940
2 changed files with 121 additions and 2 deletions

View File

@ -784,8 +784,7 @@ void chunkqueue_compact_mem(chunkqueue *cq, size_t clen) {
* no data added/removed from chunkqueue; consolidated only */
}
int chunkqueue_open_file_chunk(chunkqueue * const restrict cq, log_error_st * const restrict errh) {
chunk* const c = cq->first;
static int chunk_open_file_chunk(chunk * const restrict c, log_error_st * const restrict errh) {
off_t offset, toSend;
struct stat st;
@ -820,6 +819,10 @@ int chunkqueue_open_file_chunk(chunkqueue * const restrict cq, log_error_st * co
return 0;
}
int chunkqueue_open_file_chunk(chunkqueue * const restrict cq, log_error_st * const restrict errh) {
return chunk_open_file_chunk(cq->first, errh);
}
void
chunkqueue_small_resp_optim (chunkqueue * const restrict cq)
@ -865,3 +868,114 @@ chunkqueue_small_resp_optim (chunkqueue * const restrict cq)
filec->offset += offset;
chunkqueue_remove_empty_chunks(cq);
}
int
chunkqueue_peek_data (chunkqueue * const cq,
char ** const data, uint32_t * const dlen,
log_error_st * const errh)
{
char * const data_in = *data;
const uint32_t data_insz = *dlen;
*dlen = 0;
for (chunk *c = cq->first; c; ) {
uint32_t space = data_insz - *dlen;
switch (c->type) {
case MEM_CHUNK:
{
uint32_t have = buffer_string_length(c->mem) - (uint32_t)c->offset;
if (have > space)
have = space;
if (*dlen)
memcpy(data_in + *dlen, c->mem->ptr + c->offset, have);
else
*data = c->mem->ptr + c->offset; /*(reference; defer copy)*/
*dlen += have;
break;
}
case FILE_CHUNK:
if (0 == chunk_open_file_chunk(c, errh)) {
off_t offset = c->file.start + c->offset;
off_t toSend = c->file.length - c->offset;
if (toSend > (off_t)space)
toSend = (off_t)space;
if (-1 == lseek(c->file.fd, offset, SEEK_SET)) {
log_perror(errh, __FILE__, __LINE__, "lseek");
return -1;
}
toSend = read(c->file.fd, data_in + *dlen, (size_t)toSend);
if (-1 == toSend) {
log_perror(errh, __FILE__, __LINE__, "read");
return -1;
}
*dlen += (uint32_t)toSend;
break;
}
return -1;
default:
return -1;
}
if (*dlen == data_insz)
break;
c = c->next;
if (NULL == c)
break;
if (*dlen && *data != data_in) {
memcpy(data_in, *data, *dlen);
*data = data_in;
}
}
return 0;
}
int
chunkqueue_read_data (chunkqueue * const cq,
char * const data, const uint32_t dlen,
log_error_st * const errh)
{
char *ptr = data;
uint32_t len = dlen;
if (chunkqueue_peek_data(cq, &ptr, &len, errh) < 0 || len != dlen)
return -1;
if (data != ptr) memcpy(data, ptr, len);
chunkqueue_mark_written(cq, len);
return 0;
}
buffer *
chunkqueue_read_squash (chunkqueue * const restrict cq, log_error_st * const restrict errh)
{
/* read and replace chunkqueue contents with single MEM_CHUNK.
* cq->bytes_out is not modified */
off_t cqlen = chunkqueue_length(cq);
if (cqlen >= UINT32_MAX) return NULL;
if (cq->first && NULL == cq->first->next && cq->first->type == MEM_CHUNK)
return cq->first->mem;
chunk * const c = chunk_acquire((uint32_t)cqlen+1);
char *data = c->mem->ptr;
uint32_t dlen = (uint32_t)cqlen;
int rc = chunkqueue_peek_data(cq, &data, &dlen, errh);
if (rc < 0) {
chunk_release(c);
return NULL;
}
buffer_string_set_length(c->mem, dlen);
chunkqueue_release_chunks(cq);
chunkqueue_append_chunk(cq, c);
return c->mem;
}

View File

@ -123,6 +123,11 @@ void chunkqueue_compact_mem(chunkqueue *cq, size_t clen);
void chunkqueue_small_resp_optim (chunkqueue * restrict cq);
int chunkqueue_peek_data (chunkqueue *cq, char **data, uint32_t *dlen, struct log_error_st * restrict errh);
int chunkqueue_read_data (chunkqueue *cq, char *data, uint32_t dlen, struct log_error_st * restrict errh);
buffer * chunkqueue_read_squash (chunkqueue * restrict cq, struct log_error_st * restrict errh);
__attribute_pure__
static inline off_t chunkqueue_length(const chunkqueue *cq);
static inline off_t chunkqueue_length(const chunkqueue *cq) {