Browse Source

[core] splice() data from backends to tempfiles

splice() data from backends to tempfiles (where splice() is available);
reduce copying data to userspace when writing data to tempfiles

Note: splice() on Linux fails with EINVAL if the target file has O_APPEND set,
so lighttpd uses pwrite() (where available) when writing to tempfiles
(instead of lseek() + write(), or O_APPEND and write()).
Glenn Strauss 4 weeks ago
parent
commit
487113fc50
  1. 150
      src/chunk.c
  2. 9
      src/chunk.h
  3. 49
      src/http-header-glue.c
  4. 3
      src/server.c

150
src/chunk.c

@ -643,7 +643,12 @@ void chunkqueue_steal(chunkqueue * const restrict dest, chunkqueue * const restr
static int chunkqueue_get_append_mkstemp(buffer * const b, const char *path, const uint32_t len) {
    /* Create a new upload tempfile.
     * b    : scratch buffer which receives the filename template built from
     *        path (of length len) and "lighttpd-upload-XXXXXX"
     * return: whatever fdevent_mkostemp() returns for that template */
  #if defined(HAVE_SPLICE) && defined(HAVE_PWRITE)
    /*(splice() rejects O_APPEND target; omit flag if also using pwrite())*/
    const int oflags = 0;
  #else
    const int oflags = O_APPEND;
  #endif
    buffer_copy_path_len2(b,path,len,CONST_STR_LEN("lighttpd-upload-XXXXXX"));
    return fdevent_mkostemp(b->ptr, oflags);
}
static chunk *chunkqueue_get_append_newtempfile(chunkqueue * const restrict cq, log_error_st * const restrict errh) {
@ -777,8 +782,13 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch
#ifdef __COVERITY__
if (dst_c->file.fd < 0) return -1;
#endif
#ifdef HAVE_PWRITE
/* coverity[negative_returns : FALSE] */
const ssize_t written =pwrite(dst_c->file.fd, mem, len, dst_c->file.length);
#else
/* coverity[negative_returns : FALSE] */
const ssize_t written = write(dst_c->file.fd, mem, len);
#endif
if ((size_t) written == len) {
dst_c->file.length += len;
@ -800,6 +810,146 @@ int chunkqueue_append_mem_to_tempfile(chunkqueue * const restrict dest, const ch
return -1;
}
#ifdef HAVE_SPLICE
__attribute_cold__
__attribute_noinline__
static ssize_t chunkqueue_append_drain_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
    /* Fallback used when splice() to the tempfile is not possible:
     * copy exactly 'len' bytes from pipe 'fd' through a userspace buffer
     * into the tempfile(s) at the end of 'cq'.
     *
     * attempt to drain full 'len' from pipe
     * (even if len not reduced to opts->max_per_read limit)
     * since data may have already been moved from socket to pipe
     *
     * returns 0 on success, or -errno (negative errno) if error,
     * even if partial write occurred; never returns 0 unless all 'len'
     * bytes were appended */
    char buf[16384];
    int errnum = 0;

    while (len) {
        /* never read past 'len': the caller accounts for exactly 'len' bytes,
         * and reading more would wrap the unsigned counter below */
        const size_t want = (len < sizeof(buf)) ? (size_t)len : sizeof(buf);
        ssize_t rd;
        do {
            rd = read(fd, buf, want);
        } while (rd < 0 && errno == EINTR);

        if (rd < 0) {
            errnum = errno;
            break;
        }
        if (0 == rd) {
            /* EOF before 'len' bytes arrived; treat as error instead of
             * looping forever on a pipe that will never yield more data */
            errnum = EIO;
            break;
        }
        if (0 != chunkqueue_append_mem_to_tempfile(cq, buf, (size_t)rd, errh)) {
            errnum = errno;
            break;
        }
        len -= (unsigned int)rd;
    }

    if (0 == len)
        return 0;

    /*(failure must not be reported as 0 (success) if errno was left unset)*/
    if (0 == errnum)
        errnum = EIO;

    if (cq->last && 0 == chunk_remaining_length(cq->last)) {
        /*(remove empty chunk and unlink tempfile)*/
        chunkqueue_remove_empty_chunks(cq);
    }
    return -errnum;
}
ssize_t chunkqueue_append_splice_pipe_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
    /* splice() up to 'len' bytes from pipe 'fd' onto the end of the tempfile
     * at the tail of 'cq', retrying after partial transfers.
     *(returns num bytes written, or -errno (negative errno) if error)*/
    ssize_t moved = 0;
    do {
        chunk * const tmpc = chunkqueue_get_append_tempfile(cq, errh);
        if (__builtin_expect( (NULL == tmpc), 0))
            return -errno;

        /* explicit offset: tempfile is written at its current logical end */
        loff_t foff = tmpc->file.length;
        const ssize_t n = splice(fd, NULL, tmpc->file.fd, &foff, len,
                                 SPLICE_F_MOVE | SPLICE_F_NONBLOCK);

        if (__builtin_expect(((size_t)n == len), 1)) {
            /* everything requested was transferred in one call */
            cq->bytes_in += len;
            tmpc->file.length += len;
            return moved + len;
        }

        if (n >= 0) {
            /*(assume EINTR if partial write and retry;
             * retry might fail with ENOSPC if no more space on volume)*/
            tmpc->file.length += (size_t)n;
            cq->bytes_in += n;
            moved += n;
            len -= (size_t)n;
            continue; /* retry remainder */
        }

        const int errnum = errno;

        if (errnum == EAGAIN
          #ifdef EWOULDBLOCK
          #if EWOULDBLOCK != EAGAIN
            || errnum == EWOULDBLOCK
          #endif
          #endif
           ) {
            /* nothing (more) can be moved right now; report progress so far */
            if (0 == chunk_remaining_length(tmpc)) {
                /*(remove empty chunk and unlink tempfile)*/
                chunkqueue_remove_empty_chunks(cq);
            }
            return moved;
        }

        if (errnum == EINVAL) { /*(assume total == 0 if EINVAL)*/
            /* splice() refused this target; copy through userspace instead */
            const ssize_t rc =
              chunkqueue_append_drain_pipe_tempfile(cq, fd, len, errh);
            return (0 == rc) ? moved + len : rc;
        }

        /* any other error: give up unless the error handler says to retry */
        if (!chunkqueue_append_tempfile_err(cq, errh, tmpc))
            return -errnum;
        /* else continue; retry */
    } while (len);
    return -EIO; /*(not reached)*/
}
/* process-wide intermediate pipe: [0] read end, [1] write end; -1 if closed */
static int cqpipes[2] = { -1, -1 };
__attribute_cold__
__attribute_noinline__
void chunkqueue_internal_pipes(int init) {
    /* Close the internal pipe if it is open and, when 'init' is non-zero,
     * create a fresh one.
     *(intended for internal use within a single lighttpd process;
     * must be initialized after fork() and graceful-restart to avoid
     * sharing pipes between processes)*/
    for (int i = 0; i < 2; ++i) {
        if (cqpipes[i] == -1) continue;
        close(cqpipes[i]);
        cqpipes[i] = -1;
    }
    if (!init)
        return;
    /* NOTE(review): result of fdevent_pipe_cloexec() is not checked here;
     * second argument is presumably the requested pipe size -- confirm */
    fdevent_pipe_cloexec(cqpipes, 262144);
}
__attribute_cold__
__attribute_noinline__
static void chunkqueue_pipe_read_discard (void) {
    /* Read and throw away whatever is queued in the internal pipe.
     * If reading stops on an error other than EAGAIN/EWOULDBLOCK,
     * the pipe is closed and re-created.
     * NOTE(review): loop termination on an empty pipe relies on read()
     * not blocking on cqpipes[0] -- confirm how the pipe is configured */
    char scratch[16384];
    for (;;) {
        const ssize_t n = read(cqpipes[0], scratch, sizeof(scratch));
        if (n > 0) continue;        /* keep reading while data is returned */
        if (0 == n) return;         /* read() returned 0; nothing else to do */
        if (errno != EINTR) break;  /* retry only if interrupted */
    }

    switch (errno) {
      case EAGAIN:
     #ifdef EWOULDBLOCK
     #if EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
     #endif
     #endif
        return;                     /* no more data pending in pipe */
      default:
        chunkqueue_internal_pipes(1); /*(close() and re-initialize)*/
        return;
    }
}
ssize_t chunkqueue_append_splice_sock_tempfile(chunkqueue * const restrict cq, const int fd, unsigned int len, log_error_st * const restrict errh) {
    /* Move up to 'len' bytes from socket 'fd' into the tempfile at the tail
     * of 'cq' in two splice() stages: socket -> internal pipe -> tempfile.
     *
     * returns num bytes written, or -errno (negative errno) if error.
     * -EINVAL is returned ONLY while no data has been consumed from 'fd'
     * (internal pipe not configured, or nothing spliced from the socket),
     * so caller may safely fall back to reading 'fd' some other way. */
    int * const pipes = cqpipes;
    if (-1 == pipes[1])
        return -EINVAL; /*(not configured; not handled here)*/

    /* splice() socket data to intermediate pipe */
    ssize_t wr = splice(fd, NULL, pipes[1], NULL, len,
                        SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
    if (__builtin_expect( (wr <= 0), 0))
        return -EINVAL; /*(reuse to indicate not handled here)*/
    len = (unsigned int)wr;

    /* splice() data from intermediate pipe to tempfile */
    wr = chunkqueue_append_splice_pipe_tempfile(cq, pipes[0], len, errh);
    if (__builtin_expect( (wr == (ssize_t)len), 1))
        return wr;

    /* Error or short transfer.  The pipe is shared by every caller in this
     * process, so bytes left behind would be prepended to the next transfer;
     * discard them. */
    chunkqueue_pipe_read_discard();

    /* Data has already been taken off the socket (and is now lost), so this
     * must be reported as a hard error: neither a short count nor -EINVAL
     * ("not handled") would be truthful at this point. */
    return (wr < 0 && wr != -EINVAL) ? wr : -EIO;
}
#endif /* HAVE_SPLICE */
int chunkqueue_steal_with_tempfiles(chunkqueue * const restrict dest, chunkqueue * const restrict src, off_t len, log_error_st * const restrict errh) {
while (len > 0) {
chunk *c = src->first;

9
src/chunk.h

@ -111,6 +111,15 @@ void chunkqueue_append_buffer_commit(chunkqueue *cq);
int chunkqueue_append_mem_to_tempfile(chunkqueue * restrict cq, const char * restrict mem, size_t len, struct log_error_st * const restrict errh);
#ifdef HAVE_SPLICE
/* splice() up to len bytes from pipe fd into tempfile at end of cq;
 * returns num bytes written, or -errno (negative errno) if error */
ssize_t chunkqueue_append_splice_pipe_tempfile(chunkqueue * restrict cq, int fd, unsigned int len, struct log_error_st * restrict errh);
/* splice() up to len bytes from socket fd, via the internal pipe, into
 * tempfile at end of cq; returns num bytes written, or -errno (negative
 * errno) if error (-EINVAL if internal pipe is not configured) */
ssize_t chunkqueue_append_splice_sock_tempfile(chunkqueue * restrict cq, int fd, unsigned int len, struct log_error_st * restrict errh);
/* close internal pipe (if open) and, if init is non-zero, create a new one */
__attribute_cold__
void chunkqueue_internal_pipes(int init);
#else
/* no splice(): internal pipe is not used, so (re)initialization is a no-op */
#define chunkqueue_internal_pipes(init) do { } while (0)
#endif
/* functions to handle buffers to read into: */
/* obtain/reserve memory in chunkqueue at least len (input) size,
* return pointer to memory with len (output) available for use

49
src/http-header-glue.c

@ -750,6 +750,42 @@ static int http_response_append_buffer(request_st * const r, buffer * const mem,
}
#ifdef HAVE_SPLICE
static int http_response_append_splice(request_st * const r, http_response_opts * const opts, buffer * const b, const int fd, unsigned int toread) {
    /* Try to move 'toread' bytes of response body from backend 'fd' directly
     * into a tempfile in r->write_queue using splice().
     * returns  1 if handled here (success),
     *          0 if not handled (caller should read 'fd' traditionally),
     *         -1 on error */
    /*assert(opts->simple_accum);*//*(checked in caller)*/

    /* check if worthwhile to splice() to avoid copying through userspace */
    if (r->resp_body_scratchpad < toread)
        return 0; /* not handled */
    if (toread <= 65536) {
        /* mid-sized read: only worthwhile if already writing to a tempfile */
        if (toread < 32768/*(http_response_append_buffer_simple_accum())*/)
            return 0; /* not handled */
        if (!r->write_queue.last || !r->write_queue.last->file.is_temp)
            return 0; /* not handled */
    }

    if (!buffer_is_blank(b)) {
        /*(flush small reads previously accumulated in b)*/
        const int flushed = http_response_append_buffer(r, b, 0); /*(0 to flush)*/
        chunk_buffer_yield(b); /*(improve large buf reuse)*/
        if (__builtin_expect( (0 != flushed), 0))
            return -1; /* error */
    }

    /*assert(opts->fdfmt == S_IFSOCK || opts->fdfmt == S_IFIFO);*/
    ssize_t n;
    if (opts->fdfmt == S_IFSOCK)
        n = chunkqueue_append_splice_sock_tempfile(
              &r->write_queue, fd, toread, r->conf.errh);
    else
        n = chunkqueue_append_splice_pipe_tempfile(
              &r->write_queue, fd, toread, r->conf.errh);

    if (__builtin_expect( (n >= 0), 1)) {
        /* account for body bytes received; body complete when none remain */
        r->resp_body_scratchpad -= n;
        if (0 == r->resp_body_scratchpad)
            r->resp_body_finished = 1;
        return 1; /* success */
    }

    /*(-EINVAL: not handled here; e.g. target filesystem w/o splice() support)*/
    return (n == -EINVAL) ? 0 : -1;
}
#endif
static int http_response_append_mem(request_st * const r, const char * const mem, size_t len) {
if (r->resp_decode_chunked)
return http_chunk_decode_append_mem(r, mem, len);
@ -1159,6 +1195,19 @@ handler_t http_response_read(request_st * const r, http_response_opts * const op
avail = buffer_string_space(b);
if (0 == fdevent_ioctl_fionread(fd, opts->fdfmt, (int *)&toread)) {
#ifdef HAVE_SPLICE
/* check if worthwhile to splice() to avoid copying to userspace */
if (opts->simple_accum) {
int rc = http_response_append_splice(r, opts, b, fd, toread);
if (rc) {
if (__builtin_expect( (rc > 0), 1))
break;
return HANDLER_ERROR;
} /*(fall through to handle traditionally)*/
}
#endif
if (avail < toread) {
uint32_t blen = buffer_clen(b);
if (toread + blen < 4096)

3
src/server.c

@ -1758,6 +1758,8 @@ static int server_main_setup (server * const srv, int argc, char **argv) {
return -1;
}
chunkqueue_internal_pipes(config_feature_bool(srv, "chunkqueue.splice", 1));
/* might fail if user is using fam (not gamin) and famd isn't running */
if (!stat_cache_init(srv->ev, srv->errh)) {
log_error(srv->errh, __FILE__, __LINE__,
@ -2064,6 +2066,7 @@ int main (int argc, char ** argv) {
}
/* clean-up */
chunkqueue_internal_pipes(0);
remove_pid_file(srv);
config_log_error_close(srv);
if (graceful_restart)

Loading…
Cancel
Save