|
|
|
@ -74,10 +74,16 @@ typedef enum {
|
|
|
|
|
PROGRESS_FORMAT_DUMP
|
|
|
|
|
} mod_progress_format;
|
|
|
|
|
|
|
|
|
|
typedef struct mod_progress_node mod_progress_node;
|
|
|
|
|
typedef struct mod_progress_data mod_progress_data;
|
|
|
|
|
typedef struct mod_progress_worker_data mod_progress_worker_data;
|
|
|
|
|
typedef struct mod_progress_show_param mod_progress_show_param;
|
|
|
|
|
typedef struct mod_progress_job mod_progress_job;
|
|
|
|
|
|
|
|
|
|
struct mod_progress_node {
|
|
|
|
|
gchar *id; /* unique id */
|
|
|
|
|
liWaitQueueElem timeout_queue_elem;
|
|
|
|
|
guint wrk_ndx;
|
|
|
|
|
mod_progress_worker_data *worker_data;
|
|
|
|
|
liVRequest *vr; /* null in case of tombstone. otherwise the following vars will not be valid! */
|
|
|
|
|
goffset request_size;
|
|
|
|
|
goffset response_size;
|
|
|
|
@ -85,41 +91,46 @@ struct mod_progress_node {
|
|
|
|
|
guint64 bytes_out;
|
|
|
|
|
gint status_code;
|
|
|
|
|
};
|
|
|
|
|
typedef struct mod_progress_node mod_progress_node;
|
|
|
|
|
|
|
|
|
|
struct mod_progress_data {
|
|
|
|
|
liPlugin *p;
|
|
|
|
|
guint ttl;
|
|
|
|
|
GHashTable **hash_tables;
|
|
|
|
|
liWaitQueue *timeout_queues; /* each worker has its own timeout queue */
|
|
|
|
|
mod_progress_worker_data *worker_data;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
struct mod_progress_worker_data {
|
|
|
|
|
mod_progress_data *pd;
|
|
|
|
|
guint wrk_ndx;
|
|
|
|
|
GHashTable *hash_table;
|
|
|
|
|
liWaitQueue timeout_queue; /* each worker has its own timeout queue */
|
|
|
|
|
};
|
|
|
|
|
typedef struct mod_progress_data mod_progress_data;
|
|
|
|
|
|
|
|
|
|
struct mod_progress_show_param {
|
|
|
|
|
liPlugin *p;
|
|
|
|
|
mod_progress_format format;
|
|
|
|
|
};
|
|
|
|
|
typedef struct mod_progress_show_param mod_progress_show_param;
|
|
|
|
|
|
|
|
|
|
struct mod_progress_job {
|
|
|
|
|
liVRequest *vr;
|
|
|
|
|
gpointer *context;
|
|
|
|
|
gboolean debug;
|
|
|
|
|
mod_progress_format format;
|
|
|
|
|
|
|
|
|
|
gchar *id;
|
|
|
|
|
liPlugin *p;
|
|
|
|
|
};
|
|
|
|
|
typedef struct mod_progress_job mod_progress_job;
|
|
|
|
|
|
|
|
|
|
/* global data */
|
|
|
|
|
static mod_progress_data progress_data;
|
|
|
|
|
// static mod_progress_data progress_data;
|
|
|
|
|
|
|
|
|
|
static void progress_timeout_callback(liWaitQueue *wq, gpointer data) {
|
|
|
|
|
liWorker *wrk = data;
|
|
|
|
|
mod_progress_worker_data *wrk_data = data;
|
|
|
|
|
liWaitQueueElem *wqe;
|
|
|
|
|
mod_progress_node *node;
|
|
|
|
|
|
|
|
|
|
while ((wqe = li_waitqueue_pop(wq)) != NULL) {
|
|
|
|
|
node = wqe->data;
|
|
|
|
|
g_hash_table_remove(progress_data.hash_tables[wrk->ndx], node->id);
|
|
|
|
|
g_hash_table_remove(wrk_data->hash_table, node->id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
li_waitqueue_update(wq);
|
|
|
|
@ -127,18 +138,20 @@ static void progress_timeout_callback(liWaitQueue *wq, gpointer data) {
|
|
|
|
|
|
|
|
|
|
static void progress_hashtable_free_callback(gpointer data) {
|
|
|
|
|
mod_progress_node *node = data;
|
|
|
|
|
mod_progress_worker_data *wd = node->worker_data;
|
|
|
|
|
|
|
|
|
|
if (node->vr) {
|
|
|
|
|
g_ptr_array_index(node->vr->plugin_ctx, progress_data.p->id) = NULL;
|
|
|
|
|
g_ptr_array_index(node->vr->plugin_ctx, wd->pd->p->id) = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
li_waitqueue_remove(&progress_data.timeout_queues[node->wrk_ndx], &(node->timeout_queue_elem));
|
|
|
|
|
li_waitqueue_remove(&wd->timeout_queue, &(node->timeout_queue_elem));
|
|
|
|
|
g_free(node->id);
|
|
|
|
|
g_slice_free(mod_progress_node, node);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void progress_vrclose(liVRequest *vr, liPlugin *p) {
|
|
|
|
|
mod_progress_node *node = (mod_progress_node*) g_ptr_array_index(vr->plugin_ctx, p->id);
|
|
|
|
|
mod_progress_data *pd = p->data;
|
|
|
|
|
|
|
|
|
|
if (node) {
|
|
|
|
|
/* connection is being tracked, replace with tombstone */
|
|
|
|
@ -148,7 +161,7 @@ static void progress_vrclose(liVRequest *vr, liPlugin *p) {
|
|
|
|
|
node->bytes_in = vr->vr_in->bytes_in;
|
|
|
|
|
node->bytes_out = MAX(0, vr->vr_out->bytes_out - vr->coninfo->out_queue_length);
|
|
|
|
|
node->status_code = vr->response.http_status;
|
|
|
|
|
li_waitqueue_push(&progress_data.timeout_queues[vr->wrk->ndx], &(node->timeout_queue_elem));
|
|
|
|
|
li_waitqueue_push(&pd->worker_data[vr->wrk->ndx].timeout_queue, &(node->timeout_queue_elem));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -158,6 +171,7 @@ static liHandlerResult progress_track(liVRequest *vr, gpointer param, gpointer *
|
|
|
|
|
liPlugin *p = (liPlugin*) param;
|
|
|
|
|
gboolean debug = _OPTION(vr, p, 0).boolean;
|
|
|
|
|
gint methods = _OPTION(vr, p, 1).number;
|
|
|
|
|
mod_progress_data *pd = p->data;
|
|
|
|
|
|
|
|
|
|
UNUSED(context);
|
|
|
|
|
|
|
|
|
@ -171,10 +185,10 @@ static liHandlerResult progress_track(liVRequest *vr, gpointer param, gpointer *
|
|
|
|
|
mod_progress_node *node = g_slice_new0(mod_progress_node);
|
|
|
|
|
node->timeout_queue_elem.data = node;
|
|
|
|
|
node->id = g_strndup(id, id_len);
|
|
|
|
|
node->wrk_ndx = vr->wrk->ndx;
|
|
|
|
|
node->worker_data = &pd->worker_data[vr->wrk->ndx];
|
|
|
|
|
node->vr = vr;
|
|
|
|
|
g_ptr_array_index(vr->plugin_ctx, progress_data.p->id) = node;
|
|
|
|
|
g_hash_table_replace(progress_data.hash_tables[vr->wrk->ndx], node->id, node);
|
|
|
|
|
g_ptr_array_index(vr->plugin_ctx, pd->p->id) = node;
|
|
|
|
|
g_hash_table_replace(node->worker_data->hash_table, node->id, node);
|
|
|
|
|
|
|
|
|
|
if (debug)
|
|
|
|
|
VR_DEBUG(vr, "progress.track: tracking progress with id \"%s\"", node->id);
|
|
|
|
@ -200,9 +214,10 @@ static liAction* progress_track_create(liServer *srv, liWorker *wrk, liPlugin* p
|
|
|
|
|
/* the CollectFunc */
|
|
|
|
|
static gpointer progress_collect_func(liWorker *wrk, gpointer fdata) {
|
|
|
|
|
mod_progress_node *node, *node_new;
|
|
|
|
|
gchar *id = fdata;
|
|
|
|
|
mod_progress_job *job = fdata;
|
|
|
|
|
mod_progress_data *pd = job->p->data;
|
|
|
|
|
|
|
|
|
|
node = g_hash_table_lookup(progress_data.hash_tables[wrk->ndx], id);
|
|
|
|
|
node = g_hash_table_lookup(pd->worker_data[wrk->ndx].hash_table, job->id);
|
|
|
|
|
|
|
|
|
|
if (!node)
|
|
|
|
|
return NULL;
|
|
|
|
@ -230,12 +245,13 @@ static void progress_collect_cb(gpointer cbdata, gpointer fdata, GPtrArray *resu
|
|
|
|
|
guint i;
|
|
|
|
|
GString *output;
|
|
|
|
|
mod_progress_node *node = NULL;
|
|
|
|
|
gchar *id = (gchar*)fdata;
|
|
|
|
|
mod_progress_job *job = cbdata;
|
|
|
|
|
mod_progress_job *job = fdata;
|
|
|
|
|
liVRequest *vr = job->vr;
|
|
|
|
|
gboolean debug = job->debug;
|
|
|
|
|
mod_progress_format format = job->format;
|
|
|
|
|
|
|
|
|
|
UNUSED(cbdata);
|
|
|
|
|
|
|
|
|
|
if (complete) {
|
|
|
|
|
/* clear context so it doesn't get cleaned up anymore */
|
|
|
|
|
*(job->context) = NULL;
|
|
|
|
@ -282,12 +298,12 @@ static void progress_collect_cb(gpointer cbdata, gpointer fdata, GPtrArray *resu
|
|
|
|
|
if (!node) {
|
|
|
|
|
/* progress id not known */
|
|
|
|
|
if (debug)
|
|
|
|
|
VR_DEBUG(vr, "progress.show: progress id \"%s\" unknown", id);
|
|
|
|
|
VR_DEBUG(vr, "progress.show: progress id \"%s\" unknown", job->id);
|
|
|
|
|
|
|
|
|
|
g_string_append_len(output, CONST_STR_LEN("{\"state\": \"unknown\"}"));
|
|
|
|
|
} else {
|
|
|
|
|
if (debug)
|
|
|
|
|
VR_DEBUG(vr, "progress.show: progress id \"%s\" found", id);
|
|
|
|
|
VR_DEBUG(vr, "progress.show: progress id \"%s\" found", job->id);
|
|
|
|
|
|
|
|
|
|
if (node->vr) {
|
|
|
|
|
/* still in progress */
|
|
|
|
@ -326,7 +342,7 @@ static void progress_collect_cb(gpointer cbdata, gpointer fdata, GPtrArray *resu
|
|
|
|
|
g_slice_free(mod_progress_node, g_ptr_array_index(result, i));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_free(id);
|
|
|
|
|
g_free(job->id);
|
|
|
|
|
g_slice_free(mod_progress_job, job);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -344,7 +360,7 @@ static liHandlerResult progress_collect_cleanup(liVRequest *vr, gpointer param,
|
|
|
|
|
static liHandlerResult progress_show(liVRequest *vr, gpointer param, gpointer *context) {
|
|
|
|
|
mod_progress_show_param *psp = (mod_progress_show_param*) param;
|
|
|
|
|
gboolean debug = _OPTION(vr, psp->p, 0).boolean;
|
|
|
|
|
gchar *id, *id_tmp;
|
|
|
|
|
gchar *id;
|
|
|
|
|
guint id_len;
|
|
|
|
|
liCollectInfo *ci;
|
|
|
|
|
mod_progress_job *job;
|
|
|
|
@ -361,19 +377,19 @@ static liHandlerResult progress_show(liVRequest *vr, gpointer param, gpointer *c
|
|
|
|
|
return LI_HANDLER_GO_ON;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id_tmp = g_strndup(id, id_len);
|
|
|
|
|
|
|
|
|
|
/* start collect job */
|
|
|
|
|
job = g_slice_new(mod_progress_job);
|
|
|
|
|
job->vr = vr;
|
|
|
|
|
job->context = context;
|
|
|
|
|
job->format = psp->format;
|
|
|
|
|
job->debug = debug;
|
|
|
|
|
ci = li_collect_start(vr->wrk, progress_collect_func, id_tmp, progress_collect_cb, job);
|
|
|
|
|
|
|
|
|
|
job->p = psp->p;
|
|
|
|
|
job->id = g_strndup(id, id_len);
|
|
|
|
|
|
|
|
|
|
ci = li_collect_start(vr->wrk, progress_collect_func, job, progress_collect_cb, NULL);
|
|
|
|
|
*context = ci; /* may be NULL */
|
|
|
|
|
return ci ? LI_HANDLER_WAIT_FOR_EVENT : LI_HANDLER_GO_ON;
|
|
|
|
|
|
|
|
|
|
return LI_HANDLER_GO_ON;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void progress_show_free(liServer *srv, gpointer param) {
|
|
|
|
@ -459,7 +475,7 @@ static gboolean progress_methods_parse(liServer *srv, liWorker *wrk, liPlugin *p
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static gboolean progress_ttl(liServer *srv, liPlugin* p, liValue *val, gpointer userdata) {
|
|
|
|
|
UNUSED(p);
|
|
|
|
|
mod_progress_data *pd = p->data;
|
|
|
|
|
UNUSED(userdata);
|
|
|
|
|
|
|
|
|
|
if (!val) {
|
|
|
|
@ -471,27 +487,24 @@ static gboolean progress_ttl(liServer *srv, liPlugin* p, liValue *val, gpointer
|
|
|
|
|
return FALSE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
progress_data.ttl = val->data.number;
|
|
|
|
|
pd->ttl = val->data.number;
|
|
|
|
|
|
|
|
|
|
return TRUE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void progress_prepare_worker(liServer *srv, liPlugin *p, liWorker *wrk) {
|
|
|
|
|
static gint once = 0;
|
|
|
|
|
static void progress_prepare(liServer *srv, liPlugin *p) {
|
|
|
|
|
mod_progress_data *pd = p->data;
|
|
|
|
|
guint i;
|
|
|
|
|
|
|
|
|
|
UNUSED(p);
|
|
|
|
|
pd->worker_data = g_slice_alloc0(sizeof(mod_progress_worker_data) * srv->worker_count);
|
|
|
|
|
for (i = 0; i < srv->worker_count; i++) {
|
|
|
|
|
liWorker *wrk = g_array_index(srv->workers, liWorker*, i);
|
|
|
|
|
|
|
|
|
|
/* initialize once */
|
|
|
|
|
if (g_atomic_int_compare_and_exchange(&once, 0, 1)) {
|
|
|
|
|
progress_data.hash_tables = g_new0(GHashTable*, srv->worker_count);
|
|
|
|
|
progress_data.timeout_queues = g_new0(liWaitQueue, srv->worker_count);
|
|
|
|
|
g_atomic_int_set(&once, 2);
|
|
|
|
|
} else {
|
|
|
|
|
while (g_atomic_int_get(&once) != 2) { }
|
|
|
|
|
pd->worker_data[i].pd = pd;
|
|
|
|
|
pd->worker_data[i].wrk_ndx = i;
|
|
|
|
|
pd->worker_data[i].hash_table = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, progress_hashtable_free_callback);
|
|
|
|
|
li_waitqueue_init(&(pd->worker_data[i].timeout_queue), wrk->loop, progress_timeout_callback, pd->ttl, &pd->worker_data[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
progress_data.hash_tables[wrk->ndx] = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, progress_hashtable_free_callback);
|
|
|
|
|
li_waitqueue_init(&(progress_data.timeout_queues[wrk->ndx]), wrk->loop, progress_timeout_callback, progress_data.ttl, wrk);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -521,22 +534,22 @@ static const liPluginSetup setups[] = {
|
|
|
|
|
|
|
|
|
|
static void plugin_progress_free(liServer *srv, liPlugin *p) {
|
|
|
|
|
guint i;
|
|
|
|
|
|
|
|
|
|
UNUSED(p);
|
|
|
|
|
mod_progress_data *pd = p->data;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < srv->worker_count; i++) {
|
|
|
|
|
g_hash_table_destroy(progress_data.hash_tables[i]);
|
|
|
|
|
g_hash_table_destroy(pd->worker_data[i].hash_table);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_free(progress_data.hash_tables);
|
|
|
|
|
g_free(progress_data.timeout_queues);
|
|
|
|
|
g_slice_free1(sizeof(mod_progress_worker_data) * srv->worker_count, pd->worker_data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void plugin_progress_init(liServer *srv, liPlugin *p, gpointer userdata) {
|
|
|
|
|
mod_progress_data *pd = g_slice_new0(mod_progress_data);
|
|
|
|
|
UNUSED(srv); UNUSED(userdata);
|
|
|
|
|
|
|
|
|
|
progress_data.p = p;
|
|
|
|
|
progress_data.ttl = 30;
|
|
|
|
|
p->data = pd;
|
|
|
|
|
pd->p = p;
|
|
|
|
|
pd->ttl = 30;
|
|
|
|
|
|
|
|
|
|
p->options = options;
|
|
|
|
|
p->optionptrs = optionptrs;
|
|
|
|
@ -545,7 +558,7 @@ static void plugin_progress_init(liServer *srv, liPlugin *p, gpointer userdata)
|
|
|
|
|
|
|
|
|
|
p->free = plugin_progress_free;
|
|
|
|
|
p->handle_vrclose = progress_vrclose;
|
|
|
|
|
p->handle_prepare_worker = progress_prepare_worker;
|
|
|
|
|
p->handle_prepare = progress_prepare;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|