WIP implements multi-part http responses

This commit is contained in:
Joe Bryan 2018-10-11 16:46:21 -04:00
parent 330d8d6f35
commit 7483e23b29
2 changed files with 106 additions and 51 deletions

View File

@ -50,6 +50,7 @@
c3_w seq_l; // sequence within connection c3_w seq_l; // sequence within connection
u3_rsat sat_e; // request state u3_rsat sat_e; // request state
uv_timer_t* tim_u; // timeout uv_timer_t* tim_u; // timeout
void* gen_u; // response generator
struct _u3_hcon* hon_u; // connection backlink struct _u3_hcon* hon_u; // connection backlink
struct _u3_hreq* nex_u; // next in connection's list struct _u3_hreq* nex_u; // next in connection's list
struct _u3_hreq* pre_u; // next in connection's list struct _u3_hreq* pre_u; // next in connection's list

View File

@ -332,9 +332,13 @@ _http_req_dispatch(u3_hreq* req_u, u3_noun req)
} }
typedef struct _u3_hgen { typedef struct _u3_hgen {
h2o_generator_t neg_u; h2o_generator_t neg_u; // response callbacks
h2o_iovec_t bod_u; h2o_iovec_t bod_u; // pending body
u3_hhed* hed_u; c3_o red; // ready to send
c3_o dun; // done sending
void* fre_v; // buffer to be freed
u3_hhed* hed_u; // headers to be freed
u3_hreq* req_u; // originating request
} u3_hgen; } u3_hgen;
/* _http_hgen_dispose(): dispose response generator and buffers /* _http_hgen_dispose(): dispose response generator and buffers
@ -344,7 +348,50 @@ _http_hgen_dispose(void* ptr_v)
{ {
u3_hgen* gen_u = (u3_hgen*)ptr_v; u3_hgen* gen_u = (u3_hgen*)ptr_v;
_http_heds_free(gen_u->hed_u); _http_heds_free(gen_u->hed_u);
free(gen_u->bod_u.base); // free(gen_u->bod_u.base);
free(gen_u->fre_v);
}
/* _http_stop_response(): h2o is closing an in-progress response.
*/
static void
_http_stop_response(h2o_generator_t* neg_u, h2o_req_t* rec_u)
{
// kill request in %light
// disposal will happen as usual
}
/* _http_proceed_response(): h2o is ready for more response data.
*/
static void
_http_proceed_response(h2o_generator_t* neg_u, h2o_req_t* rec_u)
{
u3_hgen* gen_u = (u3_hgen*)neg_u;
u3_hreq* req_u = gen_u->req_u;
// sanity check
c3_assert( rec_u == req_u->rec_u );
// free the old buffer that has now been completely written
if ( 0 != gen_u->fre_v ) {
free(gen_u->fre_v);
gen_u->fre_v = 0;
}
// send the pending buffer
if ( 0 != gen_u->bod_u.base ) {
h2o_send(rec_u, &gen_u->bod_u, 1, ( c3y == gen_u->dun ) ?
H2O_SEND_STATE_FINAL :
H2O_SEND_STATE_IN_PROGRESS);
// save a pointer to free later
gen_u->fre_v = gen_u->bod_u.base;
gen_u->bod_u.base = 0;
gen_u->red = c3n;
}
// we're ready for the next buffer
else {
gen_u->red = c3y;
}
} }
/* _http_start_respond(): write a [%http-response %start ...] to h2o_req_t->res /* _http_start_respond(): write a [%http-response %start ...] to h2o_req_t->res
@ -364,7 +411,7 @@ _http_start_respond(u3_hreq* req_u,
req_u->sat_e = u3_rsat_ripe; req_u->sat_e = u3_rsat_ripe;
uv_timer_stop(req_u->tim_u); // uv_timer_stop(req_u->tim_u);
h2o_req_t* rec_u = req_u->rec_u; h2o_req_t* rec_u = req_u->rec_u;
@ -377,11 +424,6 @@ _http_start_respond(u3_hreq* req_u,
u3_hhed* hed_u = _http_heds_from_noun(u3k(headers)); u3_hhed* hed_u = _http_heds_from_noun(u3k(headers));
u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u),
_http_hgen_dispose);
gen_u->neg_u = (h2o_generator_t){0, 0};
gen_u->hed_u = hed_u;
while ( 0 != hed_u ) { while ( 0 != hed_u ) {
h2o_add_header_by_str(&rec_u->pool, &rec_u->res.headers, h2o_add_header_by_str(&rec_u->pool, &rec_u->res.headers,
hed_u->nam_c, hed_u->nam_w, 0, 0, hed_u->nam_c, hed_u->nam_w, 0, 0,
@ -389,7 +431,18 @@ _http_start_respond(u3_hreq* req_u,
hed_u = hed_u->nex_u; hed_u = hed_u->nex_u;
} }
gen_u->bod_u = _http_vec_from_octs(u3k(data)); h2o_iovec_t bod_u = _http_vec_from_octs(u3k(data));
u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u),
_http_hgen_dispose);
gen_u->neg_u = (h2o_generator_t){
_http_proceed_response,
_http_stop_response
};
gen_u->hed_u = hed_u;
gen_u->fre_v = bod_u.base;
gen_u->req_u = req_u;
req_u->gen_u = gen_u;
if (c3y == complete) { if (c3y == complete) {
// We know the entire size of the response, so ensure we tell h2o about the size. // We know the entire size of the response, so ensure we tell h2o about the size.
@ -402,7 +455,7 @@ _http_start_respond(u3_hreq* req_u,
fprintf(stderr, "start and send state final\n"); fprintf(stderr, "start and send state final\n");
// We are the final message. // We are the final message.
h2o_send(rec_u, &gen_u->bod_u, 1, H2O_SEND_STATE_FINAL); h2o_send(rec_u, &bod_u, 1, H2O_SEND_STATE_FINAL);
u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u; u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u;
@ -413,10 +466,11 @@ _http_start_respond(u3_hreq* req_u,
fprintf(stderr, "start and send state in progress\n"); fprintf(stderr, "start and send state in progress\n");
// We are the first of multiple messages. // We are the first of multiple messages.
h2o_send(rec_u, &gen_u->bod_u, 1, H2O_SEND_STATE_IN_PROGRESS); h2o_send(rec_u, &bod_u, 1, H2O_SEND_STATE_IN_PROGRESS);
// We must restart the timeout timer. // We must restart the timeout timer
uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); // XX revisit timer
// uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0);
} }
u3z(status); u3z(headers); u3z(data); u3z(complete); u3z(status); u3z(headers); u3z(data); u3z(complete);
@ -434,6 +488,9 @@ _http_continue_respond(u3_hreq* req_u,
{ {
fprintf(stderr, "continue\n"); fprintf(stderr, "continue\n");
// XX add sequence numbers for %continue cards?
// Arvo does not guarantee idempotence!!
/* if ( u3_rsat_plan != req_u->sat_e ) { */ /* if ( u3_rsat_plan != req_u->sat_e ) { */
/* //uL(fprintf(uH, "duplicate response\n")); */ /* //uL(fprintf(uH, "duplicate response\n")); */
/* return; */ /* return; */
@ -441,47 +498,43 @@ _http_continue_respond(u3_hreq* req_u,
/* req_u->sat_e = u3_rsat_ripe; */ /* req_u->sat_e = u3_rsat_ripe; */
uv_timer_stop(req_u->tim_u); // XX revisit timer
// uv_timer_stop(req_u->tim_u);
h2o_iovec_t bod_u = _http_vec_from_octs(u3k(data));
u3_hgen* gen_u = req_u->gen_u;
h2o_req_t* rec_u = req_u->rec_u; h2o_req_t* rec_u = req_u->rec_u;
/* rec_u->res.status = status; */ gen_u->dun = complete;
/* rec_u->res.reason = (status < 200) ? "weird" : */
/* (status < 300) ? "ok" : */
/* (status < 400) ? "moved" : */
/* (status < 500) ? "missing" : */
/* "hosed"; */
/* u3_hhed* hed_u = _http_heds_from_noun(u3k(headers)); */ // if the generator is ready, immediately send
if ( c3y == gen_u->red ) {
h2o_send(rec_u, &bod_u, 1, ( c3y == gen_u->dun ) ?
H2O_SEND_STATE_FINAL :
H2O_SEND_STATE_IN_PROGRESS);
u3_hgen* gen_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*gen_u), c3_assert( 0 == gen_u->fre_v );
_http_hgen_dispose); // save a pointer to free later
gen_u->neg_u = (h2o_generator_t){0, 0}; gen_u->fre_v = bod_u.base;
/* gen_u->hed_u = hed_u; */
/* while ( 0 != hed_u ) { */ // if we're done, and shutting down ...
/* h2o_add_header_by_str(&rec_u->pool, &rec_u->res.headers, */ if ( c3y == gen_u->dun ) {
/* hed_u->nam_c, hed_u->nam_w, 0, 0, */ u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u;
/* hed_u->val_c, hed_u->val_w); */
/* hed_u = hed_u->nex_u; */
/* } */
gen_u->bod_u = _http_vec_from_octs(u3k(data)); if ( 0 != h2o_u->ctx_u.shutdown_requested ) {
/* rec_u->res.content_length = gen_u->bod_u.len; */ rec_u->http1_is_persistent = 0;
}
/* h2o_start_response(rec_u, &gen_u->neg_u); */
h2o_send(rec_u, &gen_u->bod_u, 1,
(c3y == complete ?
H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS));
{
u3_h2o_serv* h2o_u = req_u->hon_u->htp_u->h2o_u;
if ( 0 != h2o_u->ctx_u.shutdown_requested ) {
rec_u->http1_is_persistent = 0;
} }
} }
// if the generator is not ready, save the buffer
// to be sent on _http_proceed_response
else {
// TODO: this could overwrite
// realloc and combine buffers
gen_u->bod_u = bod_u;
}
u3z(data); u3z(complete); u3z(data); u3z(complete);
} }
@ -543,10 +596,11 @@ _http_rec_accept(h2o_handler_t* han_u, h2o_req_t* rec_u)
u3_hreq* req_u = _http_req_new(hon_u, rec_u); u3_hreq* req_u = _http_req_new(hon_u, rec_u);
req_u->tim_u = c3_malloc(sizeof(*req_u->tim_u)); // XX revisit timer
req_u->tim_u->data = req_u; // req_u->tim_u = c3_malloc(sizeof(*req_u->tim_u));
uv_timer_init(u3L, req_u->tim_u); // req_u->tim_u->data = req_u;
uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0); // uv_timer_init(u3L, req_u->tim_u);
// uv_timer_start(req_u->tim_u, _http_req_timer_cb, 30 * 1000, 0);
_http_req_dispatch(req_u, req); _http_req_dispatch(req_u, req);