vere: add /~/slog http endpoint

Acts as a SSE endpoint for streaming slogs emitted by the runtime. Takes care
of tank rendering, emitting the lines resulting from printing the tank at
whatever the current terminal width is.

Adds siginfo logic to http.c, showing how many connections to that endpoint are
open.
This commit is contained in:
Fang 2020-09-24 17:07:12 +02:00
parent be1cfc16c4
commit c4a2088bff
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972
3 changed files with 222 additions and 11 deletions

View File

@ -633,6 +633,8 @@
u3_pico* ent_u;
u3_pico* ext_u;
} pec_u;
void* sop_p; // slog stream data
void (*sog_f)(void*, c3_w, u3_noun); // slog stream callback
// XX remove
c3_s por_s; // UDP port
u3_save* sav_u; // autosave

View File

@ -90,8 +90,7 @@ typedef struct _u3_h2o_serv {
*/
typedef struct _u3_hfig {
u3_form* for_u; // config from %eyre
struct _u3_warc* cli_u; // rev proxy clients
struct _u3_pcon* con_u; // cli_u connections
struct _u3_hreq* seq_u; // open slog requests
} u3_hfig;
/* u3_httd: general http device
@ -361,6 +360,44 @@ _http_req_unlink(u3_hreq* req_u)
}
}
/* _http_seq_link(): store slog stream request in state
*/
static void
_http_seq_link(u3_hcon* hon_u, u3_hreq* req_u)
{
u3_hfig* fig_u = &hon_u->htp_u->htd_u->fig_u;
req_u->hon_u = hon_u;
req_u->seq_l = hon_u->seq_l++;
req_u->nex_u = fig_u->seq_u;
if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = req_u;
}
fig_u->seq_u = req_u;
}
/* _http_seq_unlink(): remove slog stream request from state
*/
static void
_http_seq_unlink(u3_hreq* req_u)
{
u3_hfig* fig_u = &req_u->hon_u->htp_u->htd_u->fig_u;
if ( 0 != req_u->pre_u ) {
req_u->pre_u->nex_u = req_u->nex_u;
if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = req_u->pre_u;
}
}
else {
fig_u->seq_u = req_u->nex_u;
if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = 0;
}
}
}
/* _http_req_to_duct(): translate srv/con/req to duct
*/
static u3_noun
@ -395,13 +432,11 @@ typedef struct _u3_hgen {
u3_hreq* req_u; // originating request
} u3_hgen;
/* _http_req_done(): request finished, deallocation callback
/* _http_req_close(): clean up & deallocate request
*/
static void
_http_req_done(void* ptr_v)
_http_req_close(u3_hreq* req_u)
{
u3_hreq* req_u = (u3_hreq*)ptr_v;
// client canceled request before response
//
if ( u3_rsat_plan == req_u->sat_e ) {
@ -412,10 +447,28 @@ _http_req_done(void* ptr_v)
uv_close((uv_handle_t*)req_u->tim_u, _http_close_cb);
req_u->tim_u = 0;
}
}
/* _http_req_done(): request finished, deallocation callback
*/
static void
_http_req_done(void* ptr_v)
{
u3_hreq* req_u = (u3_hreq*)ptr_v;
_http_req_close(req_u);
_http_req_unlink(req_u);
}
/* _http_seq_done(): slog stream request finished, deallocation callback
*/
static void
_http_seq_done(void* ptr_v)
{
u3_hreq* seq_u = (u3_hreq*)ptr_v;
_http_req_close(seq_u);
_http_seq_unlink(seq_u);
}
/* _http_req_timer_cb(): request timeout callback
*/
static void
@ -432,7 +485,7 @@ _http_req_timer_cb(uv_timer_t* tim_u)
}
}
/* _http_req_new(): receive http request.
/* _http_req_new(): receive standard http request.
*/
static u3_hreq*
_http_req_new(u3_hcon* hon_u, h2o_req_t* rec_u)
@ -450,6 +503,24 @@ _http_req_new(u3_hcon* hon_u, h2o_req_t* rec_u)
return req_u;
}
/* _http_seq_new(): receive slog stream http request.
*/
static u3_hreq*
_http_seq_new(u3_hcon* hon_u, h2o_req_t* rec_u)
{
u3_hreq* req_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*req_u),
_http_seq_done);
req_u->rec_u = rec_u;
req_u->sat_e = u3_rsat_plan;
req_u->tim_u = 0;
req_u->gen_u = 0;
req_u->pre_u = 0;
_http_seq_link(hon_u, req_u);
return req_u;
}
/* _http_req_dispatch(): dispatch http request to %eyre
*/
static void
@ -728,6 +799,47 @@ typedef struct _h2o_uv_sock { // see private st_h2o_uv_socket_t
uv_stream_t* han_u; // client stream handler (u3_hcon)
} h2o_uv_sock;
/* _http_seq_accept(): handle incoming http request on slogstream endpoint
*/
static int
_http_seq_accept(h2o_handler_t* han_u, h2o_req_t* rec_u)
{
// store the request in state
//
u3_hreq* seq_u;
{
h2o_uv_sock* suv_u = (h2o_uv_sock*)rec_u->conn->
callbacks->get_socket(rec_u->conn);
u3_hcon* hon_u = (u3_hcon*)suv_u->han_u;
// sanity check
c3_assert( hon_u->sok_u == &suv_u->sok_u );
seq_u = _http_seq_new(hon_u, rec_u);
seq_u->tim_u = c3_malloc(sizeof(*seq_u->tim_u));
seq_u->tim_u->data = seq_u;
uv_timer_init(u3L, seq_u->tim_u);
uv_timer_start(seq_u->tim_u, _http_req_timer_cb, 600 * 1000, 0);
}
// send the initial response
//
{
u3_noun hed = u3nl(u3nc(u3i_string("Content-Type"),
u3i_string("text/event-stream")),
u3nc(u3i_string("Cache-Control"),
u3i_string("no-cache")),
u3nc(u3i_string("Connection"),
u3i_string("keep-alive")),
u3_none);
_http_start_respond(seq_u, 200, hed, u3_nul, c3n);
}
return 0;
}
/* _http_rec_accept(); handle incoming http request from h2o.
*/
static c3_i
@ -1240,6 +1352,12 @@ _http_serv_init_h2o(SSL_CTX* tls_u, c3_o log, c3_o red)
h2o_u->han_u->on_req = _http_rec_accept;
}
// register slog stream endpoint
//
h2o_pathconf_t* pac_u = h2o_config_register_path(h2o_u->hos_u, "/~/slog", 0);
h2o_handler_t* han_u = h2o_create_handler(pac_u, sizeof(*han_u));
han_u->on_req = _http_seq_accept;
if ( c3y == log ) {
// XX move this to post serv_start and put the port in the name
#if 0
@ -1732,6 +1850,67 @@ _http_ef_http_server(u3_httd* htd_u,
u3z(dat);
}
/* _http_stream_slog(): emit slog to open connections
*/
static void
_http_stream_slog(void* vop_p, c3_w pri_w, u3_noun tan)
{
u3_httd* htd_u = (u3_httd*)vop_p;
u3_hreq* seq_u = htd_u->fig_u.seq_u;
// only do the work if there are open slog streams
//
if ( 0 != seq_u ) {
u3_weak data = u3_none;
if ( c3y == u3a_is_atom(tan) ) {
data = u3nt(u3_nul, u3r_met(3, tan), u3k(tan));
}
else {
u3_weak wol = u3_none;
// if we have no arvo kernel and can't evaluate nock,
// only send %leaf tanks
//
if ( 0 == u3A->roc ) {
if ( c3__leaf == u3h(tan) ) {
wol = u3nc(u3k(u3t(tan)), u3_nul);
}
}
else {
u3_noun blu = u3_term_get_blew(0);
c3_l col_l = u3h(blu);
wol = u3dc("wash", u3nc(0, col_l), u3k(tan));
u3z(blu);
}
if ( u3_none != wol ) {
u3_noun low = wol;
u3_atom txt = u3_nul;
while ( u3_nul != low ) {
u3_atom lin = u3qc_cat(3, u3qc_rap(3, u3k(u3h(low))), c3_s2('\n', '\n'));
txt = u3qc_cat(3, txt, u3qc_cat(3, u3i_string("data:"), lin));
low = u3t(low);
}
data = u3nt(u3_nul, u3r_met(3, txt), txt);
}
u3z(wol);
}
if ( u3_none != data ) {
while ( 0 != seq_u ) {
_http_continue_respond(seq_u, u3k(data), c3n);
seq_u = seq_u->nex_u;
}
}
u3z(data);
}
u3z(tan);
}
/* _reck_mole(): parse simple atomic mole.
*/
static u3_noun
@ -1800,7 +1979,7 @@ _http_io_kick(u3_auto* car_u, u3_noun wir, u3_noun cad)
u3_noun p_pud, t_pud, tt_pud, q_pud, r_pud, s_pud;
c3_l sev_l, coq_l, seq_l;
if ( (c3n == u3r_cell(pud, &p_pud, &t_pud)) ||
(c3n == _reck_lily(c3__uv, u3k(p_pud), &sev_l)) )
{
@ -1857,11 +2036,34 @@ _http_io_exit(u3_auto* car_u)
// _http_serv_close(htp_u);
// }
// XX close u3_Host.fig_u.cli_u and con_u
{
u3_noun dat = u3nt(u3_nul, 25, u3i_string("data:urbit shutting down\n\n"));
u3_hreq* seq_u = htd_u->fig_u.seq_u;
while ( 0 != seq_u ) {
_http_continue_respond(seq_u, u3k(dat), c3y);
seq_u = seq_u->nex_u;
}
u3z(dat);
}
_http_release_ports_file(u3_Host.dir_c);
}
/* _http_io_info(): print status info.
*/
static void
_http_io_info(u3_auto* car_u)
{
u3_httd* htd_u = (u3_httd*)car_u;
c3_y sec_y = 0;
u3_hreq* seq_u = htd_u->fig_u.seq_u;
while ( 0 != seq_u ) {
sec_y++;
seq_u = seq_u->nex_u;
}
u3l_log(" open slogstreams: %d\n", sec_y);
}
/* u3_http_io_init(): initialize http I/O.
*/
u3_auto*
@ -1873,8 +2075,13 @@ u3_http_io_init(u3_pier* pir_u)
car_u->nam_m = c3__http;
car_u->liv_o = c3n;
car_u->io.talk_f = _http_io_talk;
car_u->io.info_f = _http_io_info;
car_u->io.kick_f = _http_io_kick;
car_u->io.exit_f = _http_io_exit;
pir_u->sop_p = htd_u;
pir_u->sog_f = _http_stream_slog;
// XX retry up to N?
//
// car_u->ev.bail_f = ...;

View File

@ -971,10 +971,12 @@ _pier_on_lord_slog(void* ptr_v, c3_w pri_w, u3_noun tan)
c3_c* tan_c = u3r_string(tan);
u3C.stderr_log_f(tan_c);
c3_free(tan_c);
u3z(tan);
pir_u->sog_f(pir_u->sop_p, pri_w, tan);
}
else {
u3_pier_tank(0, pri_w, tan);
u3_pier_tank(0, pri_w, u3k(tan));
pir_u->sog_f(pir_u->sop_p, pri_w, tan);
}
}