From 7483e23b29989fbbce1b78d91e0d9bc9fee948c7 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Thu, 11 Oct 2018 16:46:21 -0400 Subject: [PATCH] WIP implements multi-part http responses --- include/vere/vere.h | 1 + vere/http.c | 156 +++++++++++++++++++++++++++++--------------- 2 files changed, 106 insertions(+), 51 deletions(-) diff --git a/include/vere/vere.h b/include/vere/vere.h index d248ccf78..265b42f7d 100644 --- a/include/vere/vere.h +++ b/include/vere/vere.h @@ -50,6 +50,7 @@ c3_w seq_l; // sequence within connection u3_rsat sat_e; // request state uv_timer_t* tim_u; // timeout + void* gen_u; // response generator struct _u3_hcon* hon_u; // connection backlink struct _u3_hreq* nex_u; // next in connection's list struct _u3_hreq* pre_u; // next in connection's list diff --git a/vere/http.c b/vere/http.c index 551283f2a..d8d7bd410 100644 --- a/vere/http.c +++ b/vere/http.c @@ -332,9 +332,13 @@ _http_req_dispatch(u3_hreq* req_u, u3_noun req) } typedef struct _u3_hgen { - h2o_generator_t neg_u; - h2o_iovec_t bod_u; - u3_hhed* hed_u; + h2o_generator_t neg_u; // response callbacks + h2o_iovec_t bod_u; // pending body + c3_o red; // ready to send + c3_o dun; // done sending + void* fre_v; // buffer to be freed + u3_hhed* hed_u; // headers to be freed + u3_hreq* req_u; // originating request } u3_hgen; /* _http_hgen_dispose(): dispose response generator and buffers @@ -344,7 +348,50 @@ _http_hgen_dispose(void* ptr_v) { u3_hgen* gen_u = (u3_hgen*)ptr_v; _http_heds_free(gen_u->hed_u); - free(gen_u->bod_u.base); + // free(gen_u->bod_u.base); + free(gen_u->fre_v); +} + +/* _http_stop_response(): h2o is closing an in-progress response. +*/ +static void +_http_stop_response(h2o_generator_t* neg_u, h2o_req_t* rec_u) +{ + // kill request in %light + // disposal will happen as usual +} + +/* _http_proceed_response(): h2o is ready for more response data. +*/ +static void +_http_proceed_response(h2o_generator_t* neg_u, h2o_req_t* rec_u) +{ + u3_hgen* gen_u = (u3_hgen*)neg_u; + u3_hreq* req_u = gen_u->req_u; + + // sanity check + c3_assert( rec_u == req_u->rec_u ); + + // free the old buffer that has now been completely written + if ( 0 != gen_u->fre_v ) { + free(gen_u->fre_v); + gen_u->fre_v = 0; + } + + // send the pending buffer + if ( 0 != gen_u->bod_u.base ) { + h2o_send(rec_u, &gen_u->bod_u, 1, ( c3y == gen_u->dun ) ? + H2O_SEND_STATE_FINAL : + H2O_SEND_STATE_IN_PROGRESS); + // save a pointer to free later + gen_u->fre_v = gen_u->bod_u.base; + gen_u->bod_u.base = 0; + gen_u->red = c3n; + } + // we're ready for the next buffer + else { + gen_u->red = c3y; + } } /* _http_start_respond(): write a [%http-response %start ...] to h2o_req_t->res @@ -364,7 +411,7 @@ _http_start_respond(u3_hreq* req_u, req_u->sat_e = u3_rsat_ripe; - uv_timer_stop(req_u->tim_u); + // uv_timer_stop(req_u->tim_u); h2o_req_t* rec_u = req_u->rec_u; @@ -377,11 +424,6 @@ _http_start_respond(u3_hreq* req_u, u3_hhed* hed_u = _http_heds_from_noun(u3k(headers)); - u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u), - _http_hgen_dispose); - gen_u->neg_u = (h2o_generator_t){0, 0}; - gen_u->hed_u = hed_u; - while ( 0 != hed_u ) { h2o_add_header_by_str(&rec_u->pool, &rec_u->res.headers, hed_u->nam_c, hed_u->nam_w, 0, 0, @@ -389,7 +431,18 @@ _http_start_respond(u3_hreq* req_u, hed_u = hed_u->nex_u; } - gen_u->bod_u = _http_vec_from_octs(u3k(data)); + h2o_iovec_t bod_u = _http_vec_from_octs(u3k(data)); + + u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u), + _http_hgen_dispose); + gen_u->neg_u = (h2o_generator_t){ + _http_proceed_response, + _http_stop_response + }; + gen_u->hed_u = hed_u; + gen_u->fre_v = bod_u.base; + gen_u->req_u = req_u; + req_u->gen_u = gen_u; if (c3y == complete) { // We know the entire size of the response, so ensure we tell h2o about the size. @@ -402,7 +455,7 @@ _http_start_respond(u3_hreq* req_u, fprintf(stderr, "start and send state final\n"); // We are the final message. - h2o_send(rec_u, &gen_u->bod_u, 1, H2O_SEND_STATE_FINAL); + h2o_send(rec_u, &bod_u, 1, H2O_SEND_STATE_FINAL); u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u; @@ -413,10 +466,11 @@ _http_start_respond(u3_hreq* req_u, fprintf(stderr, "start and send state in progress\n"); // We are the first of multiple messages. - h2o_send(rec_u, &gen_u->bod_u, 1, H2O_SEND_STATE_IN_PROGRESS); + h2o_send(rec_u, &bod_u, 1, H2O_SEND_STATE_IN_PROGRESS); - // We must restart the timeout timer. - uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); + // We must restart the timeout timer + // XX revisit timer + // uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); } u3z(status); u3z(headers); u3z(data); u3z(complete); @@ -434,6 +488,9 @@ _http_continue_respond(u3_hreq* req_u, { fprintf(stderr, "continue\n"); + // XX add sequence numbers for %continue cards? + // Arvo does not guarantee idempotence!! + /* if ( u3_rsat_plan != req_u->sat_e ) { */ /* //uL(fprintf(uH, "duplicate response\n")); */ /* return; */ @@ -441,47 +498,43 @@ _http_continue_respond(u3_hreq* req_u, /* req_u->sat_e = u3_rsat_ripe; */ - uv_timer_stop(req_u->tim_u); + // XX revisit timer + // uv_timer_stop(req_u->tim_u); + h2o_iovec_t bod_u = _http_vec_from_octs(u3k(data)); + + + u3_hgen* gen_u = req_u->gen_u; h2o_req_t* rec_u = req_u->rec_u; - /* rec_u->res.status = status; */ - /* rec_u->res.reason = (status < 200) ? "weird" : */ - /* (status < 300) ? "ok" : */ - /* (status < 400) ? "moved" : */ - /* (status < 500) ? "missing" : */ - /* "hosed"; */ + gen_u->dun = complete; - /* u3_hhed* hed_u = _http_heds_from_noun(u3k(headers)); */ + // if the generator is ready, immediately send + if ( c3y == gen_u->red ) { + h2o_send(rec_u, &bod_u, 1, ( c3y == gen_u->dun ) ? + H2O_SEND_STATE_FINAL : + H2O_SEND_STATE_IN_PROGRESS); - u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u), - _http_hgen_dispose); - gen_u->neg_u = (h2o_generator_t){0, 0}; - /* gen_u->hed_u = hed_u; */ + c3_assert( 0 == gen_u->fre_v ); + // save a pointer to free later + gen_u->fre_v = bod_u.base; - /* while ( 0 != hed_u ) { */ - /* h2o_add_header_by_str(&rec_u->pool, &rec_u->res.headers, */ - /* hed_u->nam_c, hed_u->nam_w, 0, 0, */ - /* hed_u->val_c, hed_u->val_w); */ - /* hed_u = hed_u->nex_u; */ - /* } */ + // if we're done, and shutting down ... + if ( c3y == gen_u->dun ) { + u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u; - gen_u->bod_u = _http_vec_from_octs(u3k(data)); - /* rec_u->res.content_length = gen_u->bod_u.len; */ - - - /* h2o_start_response(rec_u, &gen_u->neg_u); */ - h2o_send(rec_u, &gen_u->bod_u, 1, - (c3y == complete ? - H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS)); - - { - u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u; - - if ( 0 != h2o_u->ctx_u.shutdown_requested ) { - rec_u->http1_is_persistent = 0; + if ( 0 != h2o_u->ctx_u.shutdown_requested ) { + rec_u->http1_is_persistent = 0; + } } } + // if the generator is not ready, save the buffer + // to be sent on _http_proceed_response + else { + // TODO: this could overwrite + // realloc and combine buffers + gen_u->bod_u = bod_u; + } u3z(data); u3z(complete); } @@ -543,10 +596,11 @@ _http_rec_accept(h2o_handler_t* han_u, h2o_req_t* rec_u) u3_hreq* req_u = _http_req_new(hon_u, rec_u); - req_u->tim_u = c3_malloc(sizeof(*req_u->tim_u)); - req_u->tim_u->data = req_u; - uv_timer_init(u3L, req_u->tim_u); - uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); + // XX revisit timer + // req_u->tim_u = c3_malloc(sizeof(*req_u->tim_u)); + // req_u->tim_u->data = req_u; + // uv_timer_init(u3L, req_u->tim_u); + // uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); _http_req_dispatch(req_u, req);