2
0
Fork 0

Add support for filters

This commit is contained in:
Stefan Bühler 2009-03-11 16:57:07 +01:00
parent 764ce8335b
commit 158f42aa0f
2 changed files with 84 additions and 23 deletions

View File

@ -32,7 +32,6 @@ typedef enum {
VRS_ERROR
} vrequest_state;
typedef handler_t (*filter_handler)(vrequest *vr, filter *f, plugin *p);
typedef handler_t (*vrequest_handler)(vrequest *vr);
typedef handler_t (*vrequest_plugin_handler)(vrequest *vr, plugin *p);
@ -46,7 +45,7 @@ struct filter {
struct filters {
GPtrArray *queue;
chunkqueue *in, *out;
gboolean pending, waitforevent, waitforfd;
guint skip_ndx;
};
struct connection;
@ -100,6 +99,9 @@ LI_API vrequest* vrequest_new(struct connection *con, vrequest_handler handle_re
LI_API void vrequest_free(vrequest *vr);
LI_API void vrequest_reset(vrequest *vr);
LI_API void vrequest_add_filter_in(vrequest *vr, plugin *p, filter_handler handle);
LI_API void vrequest_add_filter_out(vrequest *vr, plugin *p, filter_handler handle);
/* Signals an internal error; handles the error in the _next_ loop */
LI_API void vrequest_error(vrequest *vr);

View File

@ -2,15 +2,6 @@
#include <lighttpd/base.h>
#include <lighttpd/plugin_core.h>
static filter* filter_new() {
filter *f = g_slice_new0(filter);
return f;
}
static void filter_free(filter *f) {
g_slice_free(filter, f);
}
static void filters_init(filters *fs) {
fs->queue = g_ptr_array_new();
fs->in = chunkqueue_new();
@ -20,7 +11,9 @@ static void filters_init(filters *fs) {
static void filters_clean(filters *fs) {
guint i;
for (i = 0; i < fs->queue->len; i++) {
filter_free((filter*) g_ptr_array_index(fs->queue, i));
filter *f = (filter*) g_ptr_array_index(fs->queue, i);
if (i > 0) chunkqueue_free(fs->in);
g_slice_free(filter, f);
}
g_ptr_array_free(fs->queue, TRUE);
chunkqueue_free(fs->in);
@ -29,14 +22,79 @@ static void filters_clean(filters *fs) {
static void filters_reset(filters *fs) {
guint i;
fs->skip_ndx = 0;
for (i = 0; i < fs->queue->len; i++) {
filter_free((filter*) g_ptr_array_index(fs->queue, i));
filter *f = (filter*) g_ptr_array_index(fs->queue, i);
if (i > 0) chunkqueue_free(fs->in);
g_slice_free(filter, f);
}
g_ptr_array_set_size(fs->queue, 0);
chunkqueue_reset(fs->in);
chunkqueue_reset(fs->out);
}
static gboolean filters_run(vrequest *vr, filters *fs) {
guint i;
if (0 == fs->queue->len) {
chunkqueue_steal_all(fs->out, fs->in);
if (fs->in->is_closed) fs->out->is_closed = TRUE;
return TRUE;
}
for (i = 0; i < fs->queue->len; i++) {
filter *f = (filter*) g_ptr_array_index(fs->queue, i);
switch (f->handle(vr, f, f->p)) {
case HANDLER_GO_ON:
break;
case HANDLER_COMEBACK:
vrequest_joblist_append(vr);
break;
case HANDLER_WAIT_FOR_EVENT:
break; /* ignore - filter has to call vrequest_joblist_append(vr); */
case HANDLER_ERROR:
return FALSE;
}
}
if (fs->out->is_closed) {
filter *f = (filter*) g_ptr_array_index(fs->queue, fs->queue->len - 1);
f->in->is_closed = TRUE;
}
for (i = fs->queue->len; i-- > fs->skip_ndx; ) {
filter *f = (filter*) g_ptr_array_index(fs->queue, i);
if (f->in->is_closed) {
guint j = i;
while (j-- > fs->skip_ndx) {
filter *ff = (filter*) g_ptr_array_index(fs->queue, j);
ff->in->is_closed = TRUE;
}
fs->skip_ndx = i;
}
}
return TRUE;
}
static void filters_add(filters *fs, plugin *p, filter_handler handle) {
filter *f = g_slice_new0(filter);
f->out = fs->out;
f->p = p;
f->handle = handle;
if (0 == fs->queue->len) {
f->in = fs->in;
} else {
filter *prev = (filter*) g_ptr_array_index(fs->queue, fs->queue->len - 1);
f->in = prev->out = chunkqueue_new();
chunkqueue_set_limit(f->in, fs->in->limit);
}
g_ptr_array_add(fs->queue, f);
}
void vrequest_add_filter_in(vrequest *vr, plugin *p, filter_handler handle) {
filters_add(&vr->filters_in, p, handle);
}
void vrequest_add_filter_out(vrequest *vr, plugin *p, filter_handler handle) {
filters_add(&vr->filters_out, p, handle);
}
vrequest* vrequest_new(connection *con, vrequest_handler handle_response_headers, vrequest_handler handle_response_body, vrequest_handler handle_response_error, vrequest_handler handle_request_headers) {
server *srv = con->srv;
vrequest *vr = g_slice_new0(vrequest);
@ -228,12 +286,13 @@ static gboolean vrequest_do_handle_actions(vrequest *vr) {
static gboolean vrequest_do_handle_read(vrequest *vr) {
handler_t res;
if (vr->backend && vr->backend->handle_request_body) {
chunkqueue_steal_all(vr->in, vr->vr_in); /* TODO: filters */
if (!filters_run(vr, &vr->filters_in)) {
vrequest_error(vr);
}
if (vr->vr_in->is_closed) vr->in->is_closed = TRUE;
res = vr->backend->handle_request_body(vr, vr->backend);
switch (res) {
switch (vr->backend->handle_request_body(vr, vr->backend)) {
case HANDLER_GO_ON:
break;
case HANDLER_COMEBACK:
@ -253,11 +312,11 @@ static gboolean vrequest_do_handle_read(vrequest *vr) {
}
static gboolean vrequest_do_handle_write(vrequest *vr) {
handler_t res;
chunkqueue_steal_all(vr->vr_out, vr->out); /* TODO: filters */
if (vr->out->is_closed) vr->vr_out->is_closed = TRUE;
res = vr->handle_response_body(vr);
switch (res) {
if (!filters_run(vr, &vr->filters_out)) {
vrequest_error(vr);
}
switch (vr->handle_response_body(vr)) {
case HANDLER_GO_ON:
break;
case HANDLER_COMEBACK:
@ -387,4 +446,4 @@ gboolean vrequest_redirect(vrequest *vr, GString *uri) {
http_header_overwrite(vr->response.headers, CONST_STR_LEN("Location"), GSTR_LEN(uri));
return TRUE;
}
}