From c4a2088bff2d28c7a8a43eacbcbcaff0def62513 Mon Sep 17 00:00:00 2001 From: Fang Date: Thu, 24 Sep 2020 17:07:12 +0200 Subject: [PATCH] vere: add /~/slog http endpoint Acts as a SSE endpoint for streaming slogs emitted by the runtime. Takes care of tank rendering, emitting the lines resulting from printing the tank at whatever the current terminal width is. Adds siginfo logic to http.c, showing how many connections to that endpoint are open. --- pkg/urbit/include/vere/vere.h | 2 + pkg/urbit/vere/io/http.c | 225 ++++++++++++++++++++++++++++++++-- pkg/urbit/vere/pier.c | 6 +- 3 files changed, 222 insertions(+), 11 deletions(-) diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index 516a1319bc..e8f3dcd45e 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -633,6 +633,8 @@ u3_pico* ent_u; u3_pico* ext_u; } pec_u; + void* sop_p; // slog stream data + void (*sog_f)(void*, c3_w, u3_noun); // slog stream callback // XX remove c3_s por_s; // UDP port u3_save* sav_u; // autosave diff --git a/pkg/urbit/vere/io/http.c b/pkg/urbit/vere/io/http.c index bcd356221e..1e7ee68b5e 100644 --- a/pkg/urbit/vere/io/http.c +++ b/pkg/urbit/vere/io/http.c @@ -90,8 +90,7 @@ typedef struct _u3_h2o_serv { */ typedef struct _u3_hfig { u3_form* for_u; // config from %eyre - struct _u3_warc* cli_u; // rev proxy clients - struct _u3_pcon* con_u; // cli_u connections + struct _u3_hreq* seq_u; // open slog requests } u3_hfig; /* u3_httd: general http device @@ -361,6 +360,44 @@ _http_req_unlink(u3_hreq* req_u) } } +/* _http_seq_link(): store slog stream request in state +*/ +static void +_http_seq_link(u3_hcon* hon_u, u3_hreq* req_u) +{ + u3_hfig* fig_u = &hon_u->htp_u->htd_u->fig_u; + req_u->hon_u = hon_u; + req_u->seq_l = hon_u->seq_l++; + req_u->nex_u = fig_u->seq_u; + + if ( 0 != req_u->nex_u ) { + req_u->nex_u->pre_u = req_u; + } + fig_u->seq_u = req_u; +} + +/* _http_seq_unlink(): remove slog stream request from state +*/ +static void +_http_seq_unlink(u3_hreq* req_u) +{ + u3_hfig* fig_u = &req_u->hon_u->htp_u->htd_u->fig_u; + if ( 0 != req_u->pre_u ) { + req_u->pre_u->nex_u = req_u->nex_u; + + if ( 0 != req_u->nex_u ) { + req_u->nex_u->pre_u = req_u->pre_u; + } + } + else { + fig_u->seq_u = req_u->nex_u; + + if ( 0 != req_u->nex_u ) { + req_u->nex_u->pre_u = 0; + } + } +} + /* _http_req_to_duct(): translate srv/con/req to duct */ static u3_noun @@ -395,13 +432,11 @@ typedef struct _u3_hgen { u3_hreq* req_u; // originating request } u3_hgen; -/* _http_req_done(): request finished, deallocation callback +/* _http_req_close(): clean up & deallocate request */ static void -_http_req_done(void* ptr_v) +_http_req_close(u3_hreq* req_u) { - u3_hreq* req_u = (u3_hreq*)ptr_v; - // client canceled request before response // if ( u3_rsat_plan == req_u->sat_e ) { @@ -412,10 +447,28 @@ _http_req_done(void* ptr_v) uv_close((uv_handle_t*)req_u->tim_u, _http_close_cb); req_u->tim_u = 0; } +} +/* _http_req_done(): request finished, deallocation callback +*/ +static void +_http_req_done(void* ptr_v) +{ + u3_hreq* req_u = (u3_hreq*)ptr_v; + _http_req_close(req_u); _http_req_unlink(req_u); } +/* _http_seq_done(): slog stream request finished, deallocation callback +*/ +static void +_http_seq_done(void* ptr_v) +{ + u3_hreq* seq_u = (u3_hreq*)ptr_v; + _http_req_close(seq_u); + _http_seq_unlink(seq_u); +} + /* _http_req_timer_cb(): request timeout callback */ static void @@ -432,7 +485,7 @@ _http_req_timer_cb(uv_timer_t* tim_u) } } -/* _http_req_new(): receive http request. +/* _http_req_new(): receive standard http request. */ static u3_hreq* _http_req_new(u3_hcon* hon_u, h2o_req_t* rec_u) @@ -450,6 +503,24 @@ _http_req_new(u3_hcon* hon_u, h2o_req_t* rec_u) return req_u; } +/* _http_seq_new(): receive slog stream http request. +*/ +static u3_hreq* +_http_seq_new(u3_hcon* hon_u, h2o_req_t* rec_u) +{ + u3_hreq* req_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*req_u), + _http_seq_done); + req_u->rec_u = rec_u; + req_u->sat_e = u3_rsat_plan; + req_u->tim_u = 0; + req_u->gen_u = 0; + req_u->pre_u = 0; + + _http_seq_link(hon_u, req_u); + + return req_u; +} + /* _http_req_dispatch(): dispatch http request to %eyre */ static void @@ -728,6 +799,47 @@ typedef struct _h2o_uv_sock { // see private st_h2o_uv_socket_t uv_stream_t* han_u; // client stream handler (u3_hcon) } h2o_uv_sock; +/* _http_seq_accept(): handle incoming http request on slogstream endpoint +*/ +static int +_http_seq_accept(h2o_handler_t* han_u, h2o_req_t* rec_u) +{ + // store the request in state + // + u3_hreq* seq_u; + { + h2o_uv_sock* suv_u = (h2o_uv_sock*)rec_u->conn-> + callbacks->get_socket(rec_u->conn); + u3_hcon* hon_u = (u3_hcon*)suv_u->han_u; + + // sanity check + c3_assert( hon_u->sok_u == &suv_u->sok_u ); + + seq_u = _http_seq_new(hon_u, rec_u); + + seq_u->tim_u = c3_malloc(sizeof(*seq_u->tim_u)); + seq_u->tim_u->data = seq_u; + uv_timer_init(u3L, seq_u->tim_u); + uv_timer_start(seq_u->tim_u, _http_req_timer_cb, 600 * 1000, 0); + } + + // send the initial response + // + { + u3_noun hed = u3nl(u3nc(u3i_string("Content-Type"), + u3i_string("text/event-stream")), + u3nc(u3i_string("Cache-Control"), + u3i_string("no-cache")), + u3nc(u3i_string("Connection"), + u3i_string("keep-alive")), + u3_none); + + _http_start_respond(seq_u, 200, hed, u3_nul, c3n); + } + + return 0; +} + /* _http_rec_accept(); handle incoming http request from h2o. */ static c3_i @@ -1240,6 +1352,12 @@ _http_serv_init_h2o(SSL_CTX* tls_u, c3_o log, c3_o red) h2o_u->han_u->on_req = _http_rec_accept; } + // register slog stream endpoint + // + h2o_pathconf_t* pac_u = h2o_config_register_path(h2o_u->hos_u, "/~/slog", 0); + h2o_handler_t* han_u = h2o_create_handler(pac_u, sizeof(*han_u)); + han_u->on_req = _http_seq_accept; + if ( c3y == log ) { // XX move this to post serv_start and put the port in the name #if 0 @@ -1732,6 +1850,67 @@ _http_ef_http_server(u3_httd* htd_u, u3z(dat); } +/* _http_stream_slog(): emit slog to open connections +*/ +static void +_http_stream_slog(void* vop_p, c3_w pri_w, u3_noun tan) +{ + u3_httd* htd_u = (u3_httd*)vop_p; + u3_hreq* seq_u = htd_u->fig_u.seq_u; + + // only do the work if there are open slog streams + // + if ( 0 != seq_u ) { + u3_weak data = u3_none; + + if ( c3y == u3a_is_atom(tan) ) { + data = u3nt(u3_nul, u3r_met(3, tan), u3k(tan)); + } + else { + u3_weak wol = u3_none; + + // if we have no arvo kernel and can't evaluate nock, + // only send %leaf tanks + // + if ( 0 == u3A->roc ) { + if ( c3__leaf == u3h(tan) ) { + wol = u3nc(u3k(u3t(tan)), u3_nul); + } + } + else { + u3_noun blu = u3_term_get_blew(0); + c3_l col_l = u3h(blu); + wol = u3dc("wash", u3nc(0, col_l), u3k(tan)); + u3z(blu); + } + + if ( u3_none != wol ) { + u3_noun low = wol; + u3_atom txt = u3_nul; + while ( u3_nul != low ) { + u3_atom lin = u3qc_cat(3, u3qc_rap(3, u3k(u3h(low))), c3_s2('\n', '\n')); + txt = u3qc_cat(3, txt, u3qc_cat(3, u3i_string("data:"), lin)); + low = u3t(low); + } + data = u3nt(u3_nul, u3r_met(3, txt), txt); + } + + u3z(wol); + } + + if ( u3_none != data ) { + while ( 0 != seq_u ) { + _http_continue_respond(seq_u, u3k(data), c3n); + seq_u = seq_u->nex_u; + } + } + + u3z(data); + } + + u3z(tan); +} + /* _reck_mole(): parse simple atomic mole. */ static u3_noun @@ -1800,7 +1979,7 @@ _http_io_kick(u3_auto* car_u, u3_noun wir, u3_noun cad) u3_noun p_pud, t_pud, tt_pud, q_pud, r_pud, s_pud; c3_l sev_l, coq_l, seq_l; - + if ( (c3n == u3r_cell(pud, &p_pud, &t_pud)) || (c3n == _reck_lily(c3__uv, u3k(p_pud), &sev_l)) ) { @@ -1857,11 +2036,34 @@ _http_io_exit(u3_auto* car_u) // _http_serv_close(htp_u); // } - // XX close u3_Host.fig_u.cli_u and con_u + { + u3_noun dat = u3nt(u3_nul, 25, u3i_string("data:urbit shutting down\n\n")); + u3_hreq* seq_u = htd_u->fig_u.seq_u; + while ( 0 != seq_u ) { + _http_continue_respond(seq_u, u3k(dat), c3y); + seq_u = seq_u->nex_u; + } + u3z(dat); + } _http_release_ports_file(u3_Host.dir_c); } +/* _http_io_info(): print status info. +*/ +static void +_http_io_info(u3_auto* car_u) +{ + u3_httd* htd_u = (u3_httd*)car_u; + c3_y sec_y = 0; + u3_hreq* seq_u = htd_u->fig_u.seq_u; + while ( 0 != seq_u ) { + sec_y++; + seq_u = seq_u->nex_u; + } + u3l_log(" open slogstreams: %d\n", sec_y); +} + /* u3_http_io_init(): initialize http I/O. */ u3_auto* @@ -1873,8 +2075,13 @@ u3_http_io_init(u3_pier* pir_u) car_u->nam_m = c3__http; car_u->liv_o = c3n; car_u->io.talk_f = _http_io_talk; + car_u->io.info_f = _http_io_info; car_u->io.kick_f = _http_io_kick; car_u->io.exit_f = _http_io_exit; + + pir_u->sop_p = htd_u; + pir_u->sog_f = _http_stream_slog; + // XX retry up to N? // // car_u->ev.bail_f = ...; diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index bc2d2d0d72..cc330e28b4 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -971,10 +971,12 @@ _pier_on_lord_slog(void* ptr_v, c3_w pri_w, u3_noun tan) c3_c* tan_c = u3r_string(tan); u3C.stderr_log_f(tan_c); c3_free(tan_c); - u3z(tan); + + pir_u->sog_f(pir_u->sop_p, pri_w, tan); } else { - u3_pier_tank(0, pri_w, tan); + u3_pier_tank(0, pri_w, u3k(tan)); + pir_u->sog_f(pir_u->sop_p, pri_w, tan); } }