[core] Add workers.cpu_affinity setup action

personal/stbuehler/wip
Thomas Porzelt 12 years ago
parent 004e70479e
commit 9c63e60cb7
  1. 3
      include/lighttpd/base.h
  2. 50
      include/lighttpd/log.h
  3. 21
      include/lighttpd/server.h
  4. 2
      include/lighttpd/worker.h
  5. 611
      src/main/log.c
  6. 129
      src/main/plugin_core.c
  7. 16
      src/main/server.c
  8. 31
      src/main/worker.c
  9. 26
      src/modules/mod_accesslog.c

@ -5,6 +5,8 @@
#error Do not mix lighty with angel code
#endif
#define STRINGIFY(s) #s
#include <lighttpd/settings.h>
/* Next try to fix strict-alias warning */
@ -35,6 +37,7 @@
#include <lighttpd/chunk_parser.h>
#include <lighttpd/waitqueue.h>
#include <lighttpd/radix.h>
#include <lighttpd/server.h>
#include <lighttpd/worker.h>

@ -5,34 +5,45 @@
#error Please include <lighttpd/base.h> instead of this file
#endif
/*
* Logging uses a dedicated thread in order to prevent blocking write io from blocking normal operations in worker threads.
* Code handling vrequests should use the VR_ERROR(), VR_DEBUG() etc makros. Otherwise the ERROR(), DEBUG() etc makros should be used.
* Basic examples: VR_WARNING(vr, "%s", "something unexpected happened") ERROR(srv, "%d is not bigger than %d", 23, 42)
*
* Log targets specify where the log messages are written to. They are kept open for a certain amount of time (default 30s).
* file://
*
* Logs are sent once per ev_loop() iteration to the logging thread in order to reduce syscalls and lock contention.
*/
/* #include <lighttpd/valgrind/valgrind.h> */
#define _SEGFAULT(srv, vr, fmt, ...) \
do { \
li_log_write_(srv, NULL, LI_LOG_LEVEL_ABORT, LOG_FLAG_TIMESTAMP, "(crashing) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__); \
li_log_write(srv, NULL, LI_LOG_LEVEL_ABORT, LOG_FLAG_TIMESTAMP, "(crashing) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__); \
/* VALGRIND_PRINTF_BACKTRACE(fmt, __VA_ARGS__); */\
abort();\
} while(0)
#define _ERROR(srv, vr, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_ERROR, LOG_FLAG_TIMESTAMP, "(error) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
li_log_write(srv, vr, LI_LOG_LEVEL_ERROR, LOG_FLAG_TIMESTAMP, "(error) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
#define _WARNING(srv, vr, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_WARNING, LOG_FLAG_TIMESTAMP, "(warning) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
li_log_write(srv, vr, LI_LOG_LEVEL_WARNING, LOG_FLAG_TIMESTAMP, "(warning) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
#define _INFO(srv, vr, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_INFO, LOG_FLAG_TIMESTAMP, "(info) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
li_log_write(srv, vr, LI_LOG_LEVEL_INFO, LOG_FLAG_TIMESTAMP, "(info) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
#define _DEBUG(srv, vr, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_DEBUG, LOG_FLAG_TIMESTAMP, "(debug) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
li_log_write(srv, vr, LI_LOG_LEVEL_DEBUG, LOG_FLAG_TIMESTAMP, "(debug) %s.%d: "fmt, LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__)
#define _BACKEND(srv, vr, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_BACKEND, LOG_FLAG_TIMESTAMP, fmt, __VA_ARGS__)
li_log_write(srv, vr, LI_LOG_LEVEL_BACKEND, LOG_FLAG_TIMESTAMP, fmt, __VA_ARGS__)
#define _BACKEND_LINES(srv, vr, txt, fmt, ...) \
li_log_split_lines_(srv, vr, LI_LOG_LEVEL_BACKEND, LOG_FLAG_TIMESTAMP, txt, fmt, __VA_ARGS__)
#define _GERROR(srv, vr, error, fmt, ...) \
li_log_write_(srv, vr, LI_LOG_LEVEL_ERROR, LOG_FLAG_TIMESTAMP, "(error) %s.%d: " fmt "\n %s", LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__, error ? error->message : "Empty GError")
li_log_write(srv, vr, LI_LOG_LEVEL_ERROR, LOG_FLAG_TIMESTAMP, "(error) %s.%d: " fmt "\n %s", LI_REMOVE_PATH(__FILE__), __LINE__, __VA_ARGS__, error ? error->message : "Empty GError")
#define VR_SEGFAULT(vr, fmt, ...) _SEGFAULT(vr->wrk->srv, vr, fmt, __VA_ARGS__)
#define VR_ERROR(vr, fmt, ...) _ERROR(vr->wrk->srv, vr, fmt, __VA_ARGS__)
@ -55,18 +66,12 @@
#define LOG_FLAG_NONE (0x0) /* default flag */
#define LOG_FLAG_TIMESTAMP (0x1) /* prepend a timestamp to the log message */
#define LOG_FLAG_NOLOCK (0x1 << 1) /* for internal use only */
#define LOG_FLAG_ALLOW_REPEAT (0x1 << 2) /* allow writing of multiple equal entries after each other */
struct liLog {
liLogType type;
GString *path;
gint refcount;
gint fd;
GString *lastmsg;
guint lastmsg_count;
GMutex *mutex;
liWaitQueueElem wqelem;
};
struct liLogTimestamp {
@ -77,9 +82,12 @@ struct liLogTimestamp {
};
struct liLogEntry {
liLog *log;
GString *path;
liLogTimestamp *ts;
liLogLevel level;
guint flags;
GString *msg;
GList queue_link;
};
/* determines the type of a log target by the path given. /absolute/path = file; |app = pipe; stderr = stderr; syslog = syslog */
@ -91,19 +99,17 @@ LI_API gchar* li_log_level_str(liLogLevel log_level);
/* log_new is used to create a new log target, if a log with the same path already exists, it is referenced instead */
LI_API liLog *li_log_new(liServer *srv, liLogType type, GString *path);
LI_API void li_log_ref(liServer *srv, liLog *log);
LI_API void li_log_unref(liServer *srv, liLog *log);
LI_API void li_log_thread_start(liServer *srv);
LI_API void li_log_thread_wakeup(liServer *srv);
LI_API void li_log_thread_stop(liServer *srv);
LI_API void li_log_thread_finish(liServer *srv);
LI_API void li_log_init(liServer *srv);
LI_API void li_log_cleanup(liServer *srv);
/* li_log_write is used to directly write a message to a log target */
LI_API void li_log_write(liServer *srv, liLog *log, GString *msg);
/* li_log_write_ is used to write to the errorlog */
LI_API gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guint flags, const gchar *fmt, ...) G_GNUC_PRINTF(5, 6);
LI_API gboolean li_log_write_direct(liServer *srv, liVRequest *vr, GString *path, GString *msg);
/* li_log_write is used to write to the errorlog */
LI_API gboolean li_log_write(liServer *srv, liVRequest *vr, liLogLevel log_level, guint flags, const gchar *fmt, ...) G_GNUC_PRINTF(5, 6);
LI_API liLogTimestamp *li_log_timestamp_new(liServer *srv, GString *format);
LI_API gboolean li_log_timestamp_free(liServer *srv, liLogTimestamp *ts);

@ -69,8 +69,7 @@ struct liServer {
guint worker_count;
GArray *workers;
#ifdef LIGHTY_OS_LINUX
cpu_set_t affinity_mask; /** cpus used by workers */
guint affinity_cpus; /** total number of cpus in affinity_mask */
liValue *workers_cpu_affinity;
#endif
GArray *ts_formats; /** array of (GString*), add with li_server_ts_format_add() */
@ -105,15 +104,17 @@ struct liServer {
gboolean exiting; /** atomic access */
struct {
GMutex *mutex;
GHashTable *targets; /** const gchar* path => (log_t*) */
GAsyncQueue *queue;
struct ev_loop *loop;
ev_async watcher;
liRadixTree *targets; /** const gchar* path => (liLog*) */
liWaitQueue close_queue;
GQueue write_queue;
GStaticMutex write_queue_mutex;
GThread *thread;
gboolean thread_finish; /** finish writing logs in the queue, then exit thread; access with atomic functions */
gboolean thread_stop; /** stop thread immediately; access with atomic functions */
gboolean thread_alive; /** access with atomic functions */
GArray *timestamps; /** array of log_timestamp_t */
liLog *stderr;
gboolean thread_alive;
gboolean thread_finish;
gboolean thread_stop;
GArray *timestamps;
} logs;
ev_tstamp started;

@ -70,6 +70,8 @@ struct liWorker {
ev_check loop_check;
ev_async worker_stop_watcher, worker_suspend_watcher, worker_exit_watcher;
GQueue log_queue;
guint connections_active; /** 0..con_act-1: active connections, con_act..used-1: free connections
* use with atomic, read direct from local worker context
*/

@ -12,159 +12,225 @@
#include <fcntl.h>
#include <stdarg.h>
#define DEFAULT_TS_FORMAT "%d/%b/%Y %T %Z"
#define LOG_DEFAULT_TS_FORMAT "%d/%b/%Y %T %Z"
#define LOG_DEFAULT_TTL 30.0
static void log_free_unlocked(liServer *srv, liLog *log);
static void log_thread_stop(liServer *srv);
static void log_thread_finish(liServer *srv);
static void log_watcher_cb(struct ev_loop *loop, ev_async *w, int revents);
static void log_lock(liLog *log) {
g_mutex_lock(log->mutex);
static void li_log_write_stderr(liServer *srv, const gchar *msg, gboolean newline) {
gsize s;
struct tm tm;
time_t now = (time_t) ev_time();
gchar buf[128];
GStaticMutex mtx = G_STATIC_MUTEX_INIT;
UNUSED(srv);
#ifdef HAVE_LOCALTIME_R
s = strftime(buf, sizeof(buf), LOG_DEFAULT_TS_FORMAT, localtime_r(&now, &tm));
#else
s = strftime(buf, sizeof(buf), LOG_DEFAULT_TS_FORMAT, localtime(&now));
#endif
buf[s] = '\0';
g_static_mutex_lock(&mtx);
g_printerr(newline ? "%s %s\n" : "%s %s", buf, msg);
g_static_mutex_unlock(&mtx);
}
static void log_unlock(liLog *log) {
g_mutex_unlock(log->mutex);
static liLog *log_open(liServer *srv, GString *path) {
liLog *log;
if (path)
log = li_radixtree_lookup_exact(srv->logs.targets, path->str, path->len * 8);
else
log = NULL;
if (!log) {
/* log not open */
gint fd = -1;
liLogType type = li_log_type_from_path(path);
switch (type) {
case LI_LOG_TYPE_STDERR:
fd = STDERR_FILENO;
break;
case LI_LOG_TYPE_FILE:
/* todo: open via angel */
fd = open(path->str, O_RDWR | O_CREAT | O_APPEND, 0660);
if (fd == -1) {
int err = errno;
GString *str = g_string_sized_new(255);
g_string_append_printf(str, "(error) %s.%d: failed to open log file '%s': %s", LI_REMOVE_PATH(__FILE__), __LINE__, path->str, g_strerror(err));
//li_log_write_stderr(srv, str->str, TRUE);
g_string_free(str, TRUE);
return NULL;
}
break;
case LI_LOG_TYPE_PIPE:
case LI_LOG_TYPE_SYSLOG:
/* todo */
assert(NULL);
case LI_LOG_TYPE_NONE:
return NULL;
}
log = g_slice_new0(liLog);
log->type = type;
log->path = g_string_new_len(GSTR_LEN(path));
log->fd = fd;
log->wqelem.data = log;
li_radixtree_insert(srv->logs.targets, log->path->str, log->path->len * 8, log);
/*g_print("log_open(\"%s\")\n", log->path->str);*/
}
li_waitqueue_push(&srv->logs.close_queue, &log->wqelem);
return log;
}
void li_log_write(liServer *srv, liLog *log, GString *msg) {
liLogEntry *log_entry;
static void log_close(liServer *srv, liLog *log) {
li_radixtree_remove(srv->logs.targets, log->path->str, log->path->len * 8);
li_waitqueue_remove(&srv->logs.close_queue, &log->wqelem);
if (log->type == LI_LOG_TYPE_FILE || log->type == LI_LOG_TYPE_PIPE) {
close(log->fd);
}
/*g_print("log_close(\"%s\")\n", log->path->str);*/
g_string_free(log->path, TRUE);
g_slice_free(liLog, log);
}
static void log_close_cb(struct ev_loop *loop, struct ev_timer *w, int revents) {
/* callback for the close queue */
liServer *srv = (liServer*) w->data;
liWaitQueueElem *wqe;
UNUSED(loop);
UNUSED(revents);
while ((wqe = li_waitqueue_pop(&srv->logs.close_queue)) != NULL) {
log_close(srv, wqe->data);
}
li_waitqueue_update(&srv->logs.close_queue);
}
void li_log_init(liServer *srv) {
srv->logs.loop = ev_loop_new(EVFLAG_AUTO);
ev_async_init(&srv->logs.watcher, log_watcher_cb);
srv->logs.watcher.data = srv;
srv->logs.targets = li_radixtree_new();
li_waitqueue_init(&srv->logs.close_queue, srv->logs.loop, log_close_cb, LOG_DEFAULT_TTL, srv);
srv->logs.timestamps = g_array_new(FALSE, FALSE, sizeof(liLogTimestamp*));
srv->logs.thread_alive = FALSE;
g_queue_init(&srv->logs.write_queue);
g_static_mutex_init(&srv->logs.write_queue_mutex);
/* first entry in srv->logs.timestamps is the default timestamp */
li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN(LOG_DEFAULT_TS_FORMAT)));
}
void li_log_cleanup(liServer *srv) {
guint i;
liLogTimestamp *ts;
/* wait for logging thread to exit */
if (g_atomic_int_get(&srv->logs.thread_alive) == TRUE)
{
li_log_thread_finish(srv);
g_thread_join(srv->logs.thread);
}
li_log_ref(srv, log);
li_radixtree_free(srv->logs.targets, NULL, NULL);
for (i = 0; i < srv->logs.timestamps->len; i++) {
ts = g_array_index(srv->logs.timestamps, liLogTimestamp*, i);
/*g_print("ts #%d refcount: %d\n", i, ts->refcount);*/
if (li_log_timestamp_free(srv, g_array_index(srv->logs.timestamps, liLogTimestamp*, 0)))
i--;
}
g_array_free(srv->logs.timestamps, TRUE);
ev_loop_destroy(srv->logs.loop);
}
gboolean li_log_write_direct(liServer *srv, liVRequest *vr, GString *path, GString *msg) {
liLogEntry *log_entry;
liWorker *wrk;
log_entry = g_slice_new(liLogEntry);
log_entry->log = log;
log_entry->path = g_string_new_len(GSTR_LEN(path));
log_entry->ts = NULL;
log_entry->level = 0;
log_entry->flags = 0;
log_entry->msg = msg;
log_entry->queue_link.data = log_entry;
log_entry->queue_link.next = NULL;
log_entry->queue_link.prev = NULL;
if (G_LIKELY(vr)) {
/* push onto local worker log queue */
wrk = vr->wrk;
g_queue_push_tail_link(&wrk->log_queue, &log_entry->queue_link);
} else {
/* no worker context, push directly onto global log queue */
g_static_mutex_lock(&srv->logs.write_queue_mutex);
g_queue_push_tail_link(&srv->logs.write_queue, &log_entry->queue_link);
g_static_mutex_unlock(&srv->logs.write_queue_mutex);
ev_async_send(srv->logs.loop, &srv->logs.watcher);
}
g_async_queue_push(srv->logs.queue, log_entry);
return TRUE;
}
gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guint flags, const gchar *fmt, ...) {
gboolean li_log_write(liServer *srv, liVRequest *vr, liLogLevel log_level, guint flags, const gchar *fmt, ...) {
liWorker *wrk;
va_list ap;
GString *log_line;
liLog *log = NULL;
liLogEntry *log_entry;
liLogTimestamp *ts = NULL;
GArray *logs = NULL;
GString *path;
if (vr != NULL) {
if (!srv) srv = vr->wrk->srv;
wrk = vr->wrk;
if (!srv) srv = wrk->srv;
/* get log from connection */
logs = CORE_OPTIONPTR(LI_CORE_OPTION_LOG).list;
ts = CORE_OPTIONPTR(LI_CORE_OPTION_LOG_TS_FORMAT).ptr;
}
else {
} else {
liOptionPtrValue *ologval = NULL;
wrk = NULL;
if (0 + LI_CORE_OPTION_LOG < srv->optionptr_def_values->len) {
ologval = g_array_index(srv->optionptr_def_values, liOptionPtrValue*, 0 + LI_CORE_OPTION_LOG);
}
if (ologval != NULL) logs = ologval->data.list;
if (ologval != NULL)
logs = ologval->data.list;
}
if (logs != NULL && log_level < logs->len) {
log = g_array_index(logs, liLog*, log_level);
/* if (log == NULL)
return TRUE;*/
path = g_array_index(logs, GString*, log_level);
} else {
return FALSE;
}
if (NULL == ts && 0 < srv->logs.timestamps->len) {
if (NULL == ts && srv->logs.timestamps->len > 0) {
ts = g_array_index(srv->logs.timestamps, liLogTimestamp*, 0);
}
if (log) li_log_ref(srv, log);
log_line = g_string_sized_new(0);
log_line = g_string_sized_new(63);
va_start(ap, fmt);
g_string_vprintf(log_line, fmt, ap);
va_end(ap);
if (log && !(flags & LOG_FLAG_NOLOCK))
log_lock(log);
#if 0
/* - needs extra handling for switching between server state (angel log/normal log)
* - needs an option to turn it off, as it is bad for debugging (you expect to see error messages immediately
*/
if (!(flags & LOG_FLAG_ALLOW_REPEAT)) {
/* check if last message for this log was the same */
if (g_string_equal(log->lastmsg, log_line)) {
log->lastmsg_count++;
if (!(flags & LOG_FLAG_NOLOCK))
log_unlock(log);
log_unref(srv, log);
g_string_free(log_line, TRUE);
return TRUE;
}
else {
if (log->lastmsg_count > 0) {
guint count = log->lastmsg_count;
log->lastmsg_count = 0;
li_log_write_(srv, vr, log_level, flags | LOG_FLAG_NOLOCK | LOG_FLAG_ALLOW_REPEAT, "last message repeated %d times", count);
}
}
}
g_string_assign(log->lastmsg, log_line->str);
#endif
/* for normal error messages, we prepend a timestamp */
if (flags & LOG_FLAG_TIMESTAMP) {
time_t cur_ts;
liLogTimestamp fake_ts;
GString fake_ts_format;
GString *tmpstr = NULL;
g_mutex_lock(srv->logs.mutex);
/* if we have a worker context, we can use its timestamp to save us a call to time() */
if (vr != NULL)
cur_ts = (time_t)CUR_TS(vr->wrk);
else
cur_ts = time(NULL);
if (NULL == ts) {
ts = &fake_ts;
ts->last_ts = 0;
fake_ts_format = li_const_gstring(CONST_STR_LEN(DEFAULT_TS_FORMAT));
ts->format = &fake_ts_format;
ts->cached = tmpstr = g_string_sized_new(255);
}
if (cur_ts != ts->last_ts) {
gsize s;
#ifdef HAVE_LOCALTIME_R
struct tm tm;
#endif
g_string_set_size(ts->cached, 255);
#ifdef HAVE_LOCALTIME_R
s = strftime(ts->cached->str, ts->cached->allocated_len,
ts->format->str, localtime_r(&cur_ts, &tm));
#else
s = strftime(ts->cached->str, ts->cached->allocated_len,
ts->format->str, localtime(&cur_ts));
#endif
g_string_set_size(ts->cached, s);
ts->last_ts = cur_ts;
}
g_string_prepend_c(log_line, ' ');
g_string_prepend_len(log_line, GSTR_LEN(ts->cached));
if (NULL != tmpstr) g_string_free(tmpstr, TRUE);
g_mutex_unlock(srv->logs.mutex);
}
if (log && !(flags & LOG_FLAG_NOLOCK))
log_unlock(log);
g_string_append_len(log_line, CONST_STR_LEN("\r\n"));
if (!log) {
li_angel_log(srv, log_line);
if (!path) {
li_log_write_stderr(srv, log_line->str, TRUE);
g_string_free(log_line, TRUE);
return TRUE;
}
@ -175,66 +241,132 @@ gboolean li_log_write_(liServer *srv, liVRequest *vr, liLogLevel log_level, guin
case LI_SERVER_WARMUP:
case LI_SERVER_STOPPING:
case LI_SERVER_DOWN:
li_log_unref(srv, log);
li_angel_log(srv, log_line);
li_log_write_stderr(srv, log_line->str, TRUE);
g_string_free(log_line, TRUE);
return TRUE;
default:
break;
}
log_entry = g_slice_new(liLogEntry);
log_entry->log = log;
log_entry->msg = log_line;
log_entry->path = g_string_new_len(GSTR_LEN(path));
log_entry->ts = ts;
log_entry->level = log_level;
log_entry->flags = flags;
log_entry->msg = log_line;
log_entry->queue_link.data = log_entry;
log_entry->queue_link.next = NULL;
log_entry->queue_link.prev = NULL;
if (G_LIKELY(wrk)) {
/* push onto local worker log queue */
g_queue_push_tail_link(&wrk->log_queue, &log_entry->queue_link);
} else {
/* no worker context, push directly onto global log queue */
g_static_mutex_lock(&srv->logs.write_queue_mutex);
g_queue_push_tail_link(&srv->logs.write_queue, &log_entry->queue_link);
g_static_mutex_unlock(&srv->logs.write_queue_mutex);
ev_async_send(srv->logs.loop, &srv->logs.watcher);
}
g_async_queue_push(srv->logs.queue, log_entry);
/* on critical error, exit */
/* TODO: write message immediately, as the log write is followed by an abort() */
//g_print("log_write: %s -> %s\n", log_line->str, path->str);
return TRUE;
}
static gpointer log_thread(liServer *srv) {
ev_loop(srv->logs.loop, 0);
return NULL;
}
static GString *log_timestamp_format(liServer *srv, liLogTimestamp *ts) {
gsize s;
struct tm tm;
time_t now = (time_t) ev_now(srv->logs.loop);
/* cache hit */
if (now == ts->last_ts)
return ts->cached;
#ifdef HAVE_LOCALTIME_R
s = strftime(ts->cached->str, ts->cached->allocated_len, ts->format->str, localtime_r(&now, &tm));
#else
s = strftime(ts->cached->str, ts->cached->allocated_len, ts->format->str, localtime(&now));
#endif
g_string_set_size(ts->cached, s);
ts->last_ts = now;
return ts->cached;
}
static void log_watcher_cb(struct ev_loop *loop, ev_async *w, int revents) {
liServer *srv = (liServer*) w->data;
liLog *log;
liLogEntry *log_entry;
GList *queue_link, *queue_link_next;
GString *msg;
gssize bytes_written;
gssize write_res;
while (TRUE) {
if (g_atomic_int_get(&srv->logs.thread_stop) == TRUE)
break;
UNUSED(loop);
UNUSED(revents);
if (g_atomic_int_get(&srv->logs.thread_finish) == TRUE && g_async_queue_length(srv->logs.queue) == 0)
break;
if (g_atomic_int_get(&srv->logs.thread_stop) == TRUE) {
liWaitQueueElem *wqe;
log_entry = g_async_queue_pop(srv->logs.queue);
/* if log_entry->log is NULL, it means that the logger thread has been woken up probably because it should exit */
if (log_entry->log == NULL) {
g_slice_free(liLogEntry, log_entry);
continue;
while ((wqe = li_waitqueue_pop_force(&srv->logs.close_queue)) != NULL) {
log_close(srv, wqe->data);
}
li_waitqueue_stop(&srv->logs.close_queue);
ev_async_stop(srv->logs.loop, &srv->logs.watcher);
return;
}
log = log_entry->log;
msg = log_entry->msg;
/* pop everything from global write queue */
g_static_mutex_lock(&srv->logs.write_queue_mutex);
queue_link = g_queue_peek_head_link(&srv->logs.write_queue);
g_queue_init(&srv->logs.write_queue);
g_static_mutex_unlock(&srv->logs.write_queue_mutex);
while (queue_link) {
log_entry = queue_link->data;
log = log_open(srv, log_entry->path);
msg = log_entry->msg;
bytes_written = 0;
if (!log) {
li_log_write_stderr(srv, log_entry->msg->str, TRUE);
goto next;
}
if (log_entry->flags & LOG_FLAG_TIMESTAMP) {
log_timestamp_format(srv, log_entry->ts);
g_string_prepend_c(msg, ' ');
g_string_prepend_len(msg, GSTR_LEN(log_entry->ts->cached));
}
g_string_append_len(msg, CONST_STR_LEN("\n"));
/* todo: support for other logtargets than files */
while (bytes_written < (gssize)msg->len) {
write_res = write(log->fd, msg->str + bytes_written, msg->len - bytes_written);
//write_res = msg->len;
/* write() failed, check why */
if (write_res == -1) {
switch (errno) {
GString *str;
int err = errno;
switch (err) {
case EAGAIN:
case EINTR:
continue;
}
g_printerr("could not write to log: %s\n", msg->str);
str = g_string_sized_new(63);
g_string_printf(str, "could not write to log '%s': %s\n", log_entry->path->str, g_strerror(err));
li_log_write_stderr(srv, str->str, TRUE);
li_log_write_stderr(srv, msg->str, TRUE);
break;
}
else {
@ -243,50 +375,26 @@ static gpointer log_thread(liServer *srv) {
}
}
g_string_free(msg, TRUE);
next:
queue_link_next = queue_link->next;
g_string_free(log_entry->path, TRUE);
g_string_free(log_entry->msg, TRUE);
g_slice_free(liLogEntry, log_entry);
li_log_unref(srv, log);
queue_link = queue_link_next;
}
return NULL;
}
static void log_rotate(gchar * path, liLog *log, liServer * UNUSED_PARAM(srv)) {
if (g_atomic_int_get(&srv->logs.thread_finish) == TRUE) {
liWaitQueueElem *wqe;
switch (log->type) {
case LI_LOG_TYPE_FILE:
close(log->fd);
log->fd = open(log->path->str, O_RDWR | O_CREAT | O_APPEND, 0660);
if (log->fd == -1) {
g_printerr("failed to reopen log: %s\n", path);
assert(NULL); /* TODO */
}
break;
case LI_LOG_TYPE_STDERR:
break;
case LI_LOG_TYPE_PIPE:
case LI_LOG_TYPE_SYSLOG:
case LI_LOG_TYPE_NONE:
/* TODO */
assert(NULL);
while ((wqe = li_waitqueue_pop_force(&srv->logs.close_queue)) != NULL) {
log_close(srv, wqe->data);
}
li_waitqueue_stop(&srv->logs.close_queue);
ev_async_stop(srv->logs.loop, &srv->logs.watcher);
return;
}
g_string_truncate(log->lastmsg, 0);
log->lastmsg_count = 0;
}
void li_log_ref(liServer *srv, liLog *log) {
UNUSED(srv);
g_atomic_int_inc(&log->refcount);
}
void li_log_unref(liServer *srv, liLog *log) {
g_mutex_lock(srv->logs.mutex);
if (g_atomic_int_dec_and_test(&log->refcount))
log_free_unlocked(srv, log);
g_mutex_unlock(srv->logs.mutex);
return;
}
liLogType li_log_type_from_path(GString *path) {
@ -348,153 +456,30 @@ gchar* li_log_level_str(liLogLevel log_level) {
}
}
liLog *li_log_new(liServer *srv, liLogType type, GString *path) {
liLog *log;
gint fd = -1;
if (type == LI_LOG_TYPE_NONE)
return NULL;
g_mutex_lock(srv->logs.mutex);
log = g_hash_table_lookup(srv->logs.targets, path->str);
/* log already open, inc refcount */
if (log != NULL)
{
g_atomic_int_inc(&log->refcount);
g_mutex_unlock(srv->logs.mutex);
return log;
}
switch (type) {
case LI_LOG_TYPE_STDERR:
fd = STDERR_FILENO;
break;
case LI_LOG_TYPE_FILE:
fd = open(path->str, O_RDWR | O_CREAT | O_APPEND, 0660);
break;
case LI_LOG_TYPE_PIPE:
case LI_LOG_TYPE_SYSLOG:
case LI_LOG_TYPE_NONE:
/* TODO */
fd = -1;
assert(NULL);
}
if (fd == -1) {
g_printerr("failed to open log: %s", g_strerror(errno));
return NULL;
}
log = g_slice_new0(liLog);
log->lastmsg = g_string_sized_new(0);
log->fd = fd;
log->path = g_string_new_len(GSTR_LEN(path));
log->refcount = 1;
log->mutex = g_mutex_new();
g_hash_table_insert(srv->logs.targets, log->path->str, log);
g_mutex_unlock(srv->logs.mutex);
return log;
}
/* only call this if srv->logs.mutex is NOT locked */
static void log_free(liServer *srv, liLog *log) {
g_mutex_lock(srv->logs.mutex);
log_free_unlocked(srv, log);
g_mutex_unlock(srv->logs.mutex);
}
/* only call this if srv->log_mutex IS locked */
static void log_free_unlocked(liServer *srv, liLog *log) {
if (log->type == LI_LOG_TYPE_FILE || log->type == LI_LOG_TYPE_PIPE)
close(log->fd);
g_hash_table_remove(srv->logs.targets, log->path);
g_string_free(log->path, TRUE);
g_string_free(log->lastmsg, TRUE);
g_mutex_free(log->mutex);
g_slice_free(liLog, log);
}
void li_log_init(liServer *srv) {
GString *str;
srv->logs.targets = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL);
srv->logs.queue = g_async_queue_new();
srv->logs.mutex = g_mutex_new();
srv->logs.timestamps = g_array_new(FALSE, FALSE, sizeof(liLogTimestamp*));
srv->logs.thread_alive = FALSE;
/* first entry in srv->logs.timestamps is the default timestamp */
li_log_timestamp_new(srv, g_string_new_len(CONST_STR_LEN(DEFAULT_TS_FORMAT)));
/* first entry in srv->logs.targets is the plain good old stderr */
str = g_string_new_len(CONST_STR_LEN("stderr"));
srv->logs.stderr = li_log_new(srv, LI_LOG_TYPE_STDERR, str);
g_string_free(str, TRUE);
}
void li_log_cleanup(liServer *srv) {
guint i;
liLogTimestamp *ts;
/* wait for logging thread to exit */
if (g_atomic_int_get(&srv->logs.thread_alive) == TRUE)
{
log_thread_finish(srv);
g_thread_join(srv->logs.thread);
}
log_free(srv, srv->logs.stderr);
g_hash_table_destroy(srv->logs.targets);
g_mutex_free(srv->logs.mutex);
g_async_queue_unref(srv->logs.queue);
for (i = 0; i < srv->logs.timestamps->len; i++) {
ts = g_array_index(srv->logs.timestamps, liLogTimestamp*, i);
/* g_print("ts #%d refcount: %d\n", i, ts->refcount); */
/*if (g_atomic_int_dec_and_test(&ts->refcount)) {
g_string_free(ts->cached, TRUE);
g_string_free(ts->format, TRUE);
g_slice_free(log_timestamp_t, ts);
g_array_remove_index_fast(srv->logs.timestamps, i);
i--;
}*/
}
li_log_timestamp_free(srv, g_array_index(srv->logs.timestamps, liLogTimestamp*, 0));
g_array_free(srv->logs.timestamps, TRUE);
}
void li_log_thread_start(liServer *srv) {
GError *err = NULL;
ev_async_start(srv->logs.loop, &srv->logs.watcher);
srv->logs.thread = g_thread_create((GThreadFunc)log_thread, srv, TRUE, &err);
g_atomic_int_set(&srv->logs.thread_alive, TRUE);
if (srv->logs.thread == NULL) {
g_printerr("could not create loggin thread: %s\n", err->message);
g_printerr("could not create logging thread: %s\n", err->message);
g_error_free(err);
abort();
}
g_atomic_int_set(&srv->logs.thread_alive, TRUE);
}
static void log_thread_stop(liServer *srv) {
void li_log_thread_stop(liServer *srv) {
if (g_atomic_int_get(&srv->logs.thread_alive) == TRUE) {
g_atomic_int_set(&srv->logs.thread_stop, TRUE);
li_log_thread_wakeup(srv);
}
}
static void log_thread_finish(liServer *srv) {
void li_log_thread_finish(liServer *srv) {
if (g_atomic_int_get(&srv->logs.thread_alive) == TRUE) {
g_atomic_int_set(&srv->logs.thread_finish, TRUE);
li_log_thread_wakeup(srv);
@ -502,14 +487,10 @@ static void log_thread_finish(liServer *srv) {
}
void li_log_thread_wakeup(liServer *srv) {
liLogEntry *e;
if (!g_atomic_int_get(&srv->logs.thread_alive))
li_log_thread_start(srv);
e = g_slice_new0(liLogEntry);
g_async_queue_push(srv->logs.queue, e);
ev_async_send(srv->logs.loop, &srv->logs.watcher);
}
@ -528,7 +509,7 @@ liLogTimestamp *li_log_timestamp_new(liServer *srv, GString *format) {
ts = g_slice_new(liLogTimestamp);
ts->cached = g_string_sized_new(0);
ts->cached = g_string_sized_new(255);
ts->last_ts = 0;
ts->refcount = 1;
ts->format = format;
@ -563,7 +544,7 @@ void li_log_split_lines(liServer *srv, liVRequest *vr, liLogLevel log_level, gui
if ('\r' == *txt || '\n' == *txt) {
*txt = '\0';
if (txt - start > 1) { /* skip empty lines*/
li_log_write_(srv, vr, log_level, flags, "%s%s", prefix, start);
li_log_write(srv, vr, log_level, flags, "%s%s", prefix, start);
}
txt++;
while (*txt == '\n' || *txt == '\r') txt++;
@ -573,7 +554,7 @@ void li_log_split_lines(liServer *srv, liVRequest *vr, liLogLevel log_level, gui
}
}
if (txt - start > 1) { /* skip empty lines*/
li_log_write_(srv, vr, log_level, flags, "%s%s", prefix, start);
li_log_write(srv, vr, log_level, flags, "%s%s", prefix, start);
}
}

@ -931,6 +931,48 @@ static gboolean core_workers(liServer *srv, liPlugin* p, liValue *val, gpointer
return TRUE;
}
static gboolean core_workers_cpu_affinity(liServer *srv, liPlugin* p, liValue *val, gpointer userdata) {
#if defined(LIGHTY_OS_LINUX)
GArray *arr1, *arr2;
guint i, j;
liValue *v;
UNUSED(p); UNUSED(userdata);
if (val->type != LI_VALUE_LIST) {
ERROR(srv, "%s", "workers.cpu_affinity expects a list of integers or list of list of integers");
return FALSE;
}
arr1 = val->data.list;
for (i = 0; i < arr1->len; i++) {
v = g_array_index(arr1, liValue*, i);
if (v->type == LI_VALUE_NUMBER)
continue;
if (v->type == LI_VALUE_LIST) {
arr2 = v->data.list;
for (j = 0; j < arr2->len; j++) {
if (g_array_index(arr2, liValue*, j)->type != LI_VALUE_NUMBER) {
ERROR(srv, "%s", "workers.cpu_affinity expects a list of integers or list of list of integers");
return FALSE;
}
}
} else {
ERROR(srv, "%s", "workers.cpu_affinity expects a list of integers or list of list of integers");
return FALSE;
}
}
srv->workers_cpu_affinity = li_value_copy(val);
return TRUE;
#else
ERROR(srv, "%s", "workers.cpu_affinity is only available on Linux systems");
return FALSE;
#endif
}
static gboolean core_module_load(liServer *srv, liPlugin* p, liValue *val, gpointer userdata) {
liValue *mods = li_value_new_list();
@ -1035,7 +1077,7 @@ static gboolean core_option_log_parse(liServer *srv, liWorker *wrk, liPlugin *p,
liLogLevel level;
GString *path;
GString *level_str;
GArray *arr = g_array_sized_new(FALSE, TRUE, sizeof(liLog*), 6);
GArray *arr = g_array_sized_new(FALSE, TRUE, sizeof(GString*), 6);
UNUSED(wrk);
UNUSED(p);
UNUSED(ndx);
@ -1046,13 +1088,9 @@ static gboolean core_option_log_parse(liServer *srv, liWorker *wrk, liPlugin *p,
/* default value */
if (!val) {
/* default: log LI_LOG_LEVEL_WARNING, LI_LOG_LEVEL_ERROR and LI_LOG_LEVEL_BACKEND to stderr */
liLog *log = srv->logs.stderr;
li_log_ref(srv, log);
g_array_index(arr, liLog*, LI_LOG_LEVEL_WARNING) = log;
li_log_ref(srv, log);
g_array_index(arr, liLog*, LI_LOG_LEVEL_ERROR) = log;
li_log_ref(srv, log);
g_array_index(arr, liLog*, LI_LOG_LEVEL_BACKEND) = log;
g_array_index(arr, GString*, LI_LOG_LEVEL_WARNING) = g_string_new_len(CONST_STR_LEN("stderr"));
g_array_index(arr, GString*, LI_LOG_LEVEL_ERROR) = g_string_new_len(CONST_STR_LEN("stderr"));
g_array_index(arr, GString*, LI_LOG_LEVEL_BACKEND) = g_string_new_len(CONST_STR_LEN("stderr"));
return TRUE;
}
@ -1069,18 +1107,16 @@ static gboolean core_option_log_parse(liServer *srv, liWorker *wrk, liPlugin *p,
if (g_str_equal(level_str->str, "*")) {
for (guint i = 0; i < arr->len; i++) {
liLog *log;
/* overwrite old path */
if (NULL != g_array_index(arr, GString*, i))
g_string_free(g_array_index(arr, GString*, i), TRUE);
if (NULL != g_array_index(arr, liLog*, i))
continue;
log = li_log_new(srv, li_log_type_from_path(path), path);
g_array_index(arr, liLog*, i) = log;
g_array_index(arr, GString*, i) = g_string_new_len(GSTR_LEN(path));
}
}
else {
liLog *log = li_log_new(srv, li_log_type_from_path(path), path);
level = li_log_level_from_string(level_str);
g_array_index(arr, liLog*, level) = log;
g_array_index(arr, GString*, level) = g_string_new_len(GSTR_LEN(path));;
}
}
@ -1089,14 +1125,16 @@ static gboolean core_option_log_parse(liServer *srv, liWorker *wrk, liPlugin *p,
static void core_option_log_free(liServer *srv, liPlugin *p, size_t ndx, gpointer oval) {
GArray *arr = oval;
UNUSED(srv);
UNUSED(p);
UNUSED(ndx);
if (!arr) return;
for (guint i = 0; i < arr->len; i++) {
if (NULL != g_array_index(arr, liLog*, i))
li_log_unref(srv, g_array_index(arr, liLog*, i));
if (NULL != g_array_index(arr, GString*, i))
g_string_free(g_array_index(arr, GString*, i), TRUE);
}
g_array_free(arr, TRUE);
}
@ -1684,6 +1722,7 @@ static const liPluginSetup setups[] = {
{ "set_default", core_setup_set, NULL },
{ "listen", core_listen, NULL },
{ "workers", core_workers, NULL },
{ "workers.cpu_affinity", core_workers_cpu_affinity, NULL },
{ "module_load", core_module_load, NULL },
{ "io.timeout", core_io_timeout, NULL },
{ "stat_cache.ttl", core_stat_cache_ttl, NULL },
@ -1705,32 +1744,44 @@ static void plugin_core_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk
UNUSED(p);
#if defined(LIGHTY_OS_LINUX) && 0
#if defined(LIGHTY_OS_LINUX)
/* sched_setaffinity is only available on linux */
if (srv->affinity_cpus != 0) {
gint cpu;
guint cpu_nth;
cpu_set_t mask;
/* bind worker to n-th cpu */
for (cpu_nth = 0, cpu = 0; cpu < CPU_SETSIZE; cpu++) {
//g_print("wrk: %u cpu: %d\n", wrk->ndx, cpu);
if (!CPU_ISSET(cpu, &srv->affinity_mask))
continue;
if ((wrk->ndx % srv->affinity_cpus) == cpu_nth) {
CPU_ZERO(&mask);
CPU_SET(wrk->ndx % srv->affinity_cpus, &mask);
DEBUG(srv, "binding worker #%u to cpu #%u", wrk->ndx+1, wrk->ndx % srv->affinity_cpus);
if (0 != sched_setaffinity(0, sizeof(srv->affinity_mask), &mask)) {
ERROR(srv, "couldn't set cpu affinity mask: %s", g_strerror(errno));
}
cpu_set_t mask;
liValue *v = srv->workers_cpu_affinity;
GArray *arr;
break;
}
if (!v)
return;
arr = v->data.list;
if (wrk->ndx >= arr->len) {
WARNING(srv, "worker #%u has no entry in workers.cpu_affinity", wrk->ndx+1);
return;
}
CPU_ZERO(&mask);
v = g_array_index(arr, liValue*, wrk->ndx);
if (v->type == LI_VALUE_NUMBER) {
CPU_SET(v->data.number, &mask);
DEBUG(srv, "binding worker #%u to cpu %u", wrk->ndx+1, (guint)v->data.number);
} else {
guint i;
g_string_truncate(wrk->tmp_str, 0);
arr = v->data.list;
cpu_nth++;
for (i = 0; i < arr->len; i++) {
CPU_SET(g_array_index(arr, liValue*, i)->data.number, &mask);
g_string_append_printf(wrk->tmp_str, i ? ",%u":"%u", (guint)g_array_index(arr, liValue*, i)->data.number);
}
DEBUG(srv, "binding worker #%u to cpus %s", wrk->ndx+1, wrk->tmp_str->str);
}
if (0 != sched_setaffinity(0, sizeof(mask), &mask)) {
ERROR(srv, "couldn't set cpu affinity mask for worker #%u: %s", wrk->ndx, g_strerror(errno));
}
#else
UNUSED(srv); UNUSED(wrk);

@ -181,20 +181,6 @@ liServer* li_server_new(const gchar *module_dir, gboolean module_resident) {
srv->stat_cache_ttl = 10.0; /* default stat cache ttl */
srv->tasklet_pool_threads = 4; /* default per-worker tasklet_pool threads */
#ifdef LIGHTY_OS_LINUX
/* sched_getaffinity is only available on linux */
if (0 != sched_getaffinity(0, sizeof(srv->affinity_mask), &srv->affinity_mask)) {
ERROR(srv, "couldn't get cpu affinity mask: %s", g_strerror(errno));
} else {
/* how many cpus do we have */
gint cpu;
for (cpu = 0; cpu < CPU_SETSIZE; cpu++) {
if (CPU_ISSET(cpu, &srv->affinity_mask))
srv->affinity_cpus++;
}
}
#endif
return srv;
}
@ -293,6 +279,8 @@ void li_server_free(liServer* srv) {
g_array_free(srv->li_plugins_handle_close, TRUE);
g_array_free(srv->li_plugins_handle_vrclose, TRUE);
li_value_free(srv->workers_cpu_affinity);
if (srv->started_str)
g_string_free(srv->started_str, TRUE);

@ -266,6 +266,37 @@ GString *li_worker_current_timestamp(liWorker *wrk, liTimeFunc timefunc, guint f
return wts->str;
}
/* loop prepare watcher */
static void li_worker_loop_prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
liWorker *wrk = (liWorker*) w->data;
liServer *srv = wrk->srv;
GList *lnk;
UNUSED(loop);
UNUSED(revents);
/* are there pending log entries? */
if (g_queue_get_length(&wrk->log_queue)) {
//g_print("pending log entries: %d\n", g_queue_get_length(&wrk->log_queue));
/* take log entries from local queue, insert into global queue and notify log thread */
g_static_mutex_lock(&srv->logs.write_queue_mutex);
/* have to concatenate the queues by hand as g_queue_push_tail_link() cannot handle this simple task */
lnk = g_queue_peek_head_link(&wrk->log_queue);
lnk->prev = srv->logs.write_queue.tail;
if (srv->logs.write_queue.tail)
srv->logs.write_queue.tail->next = lnk;
else
srv->logs.write_queue.head = lnk;
srv->logs.write_queue.tail = g_queue_peek_tail_link(&wrk->log_queue);
srv->logs.write_queue.length++;
g_static_mutex_unlock(&srv->logs.write_queue_mutex);
ev_async_send(srv->logs.loop, &srv->logs.watcher);
/* clear local worker queue */
g_queue_init(&wrk->log_queue);
}
}
/* stop worker watcher */
static void li_worker_stop_cb(struct ev_loop *loop, ev_async *w, int revents) {
liWorker *wrk = (liWorker*) w->data;

@ -31,6 +31,8 @@
* MIT, see COPYING file in the lighttpd 2 tree
*/
#define AL_DEFAULT_FORMAT "%h %V %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\""
#include <lighttpd/base.h>
#include <lighttpd/plugin_core.h>
@ -160,7 +162,7 @@ static al_format al_get_format(gchar c) {
if (al_format_mapping[i].character == c)
break;
}
return al_format_mapping[i];
}
@ -180,7 +182,7 @@ static GArray *al_parse_format(liServer *srv, GString *formatstr) {
GArray *arr = g_array_new(FALSE, TRUE, sizeof(al_format_entry));
al_format_entry e;
gchar *c, *k;
for (c = formatstr->str; *c != '\0';) {
if (*c == '%') {
@ -390,35 +392,31 @@ static void al_handle_vrclose(liVRequest *vr, liPlugin *p) {
/* VRequest closed, log it */
GString *msg;
liResponse *resp = &vr->response;
liLog *log = OPTIONPTR(AL_OPTION_ACCESSLOG).ptr;
GString *log_path = OPTIONPTR(AL_OPTION_ACCESSLOG).ptr;
GArray *format = OPTIONPTR(AL_OPTION_ACCESSLOG_FORMAT).list;
UNUSED(p);
if (LI_VRS_CLEAN == vr->state || resp->http_status == 0 || !log || !format)
if (LI_VRS_CLEAN == vr->state || resp->http_status == 0 || !log_path || !format)
/* if status code is zero, it means the connection was closed while in keep alive state or similar and no logging is needed */
return;
msg = al_format_log(vr, p->data, format);
g_string_append_len(msg, CONST_STR_LEN("\r\n"));
li_log_write(vr->wrk->srv, log, msg);
li_log_write_direct(vr->wrk->srv, vr, log_path, msg);
}
static void al_option_accesslog_free(liServer *srv, liPlugin *p, size_t ndx, gpointer oval) {
UNUSED(srv);
UNUSED(p);
UNUSED(ndx);
if (!oval) return;
li_log_unref(srv, oval);
g_string_free(oval, TRUE);
}
static gboolean al_option_accesslog_parse(liServer *srv, liWorker *wrk, liPlugin *p, size_t ndx, liValue *val, gpointer *oval) {
liLog *log;
UNUSED(wrk);
UNUSED(p);
UNUSED(ndx);