diff --git a/vere/http.c b/vere/http.c index bb2c9ffd15..017417fb4c 100644 --- a/vere/http.c +++ b/vere/http.c @@ -922,18 +922,16 @@ typedef struct _u3_proxy_writ { typedef struct _u3_proxy_conn { uv_tcp_t don_u; // downstream handle uv_tcp_t* upt_u; // upstream handle XX union of local connect and reverse listener? - u3_hbod* bod_u; // pending buffers + uv_buf_t buf_u; // pending buffer XX support multiple struct _u3_proxy_listener* lis_u; struct _u3_proxy_conn* nex_u; } u3_proxy_conn; -// XX probably not right typedef struct _u3_proxy_reverse { uv_tcp_t tcp_u; u3_atom sip; // reverse proxy for ship c3_s por_s; - u3_hbod* bod_u; // pending buffers - struct _u3_proxy_listener* lis_u; + struct _u3_proxy_conn* con_u; struct _u3_proxy_reverse* nex_u; } u3_proxy_reverse; @@ -963,7 +961,10 @@ _proxy_alloc(uv_handle_t* had_u, static void _proxy_writ_free(u3_proxy_writ* ruq_u) { - free(ruq_u->buf_y); + if ( 0 != ruq_u->buf_y ) { + free(ruq_u->buf_y); + } + free(ruq_u); } @@ -977,23 +978,15 @@ _proxy_writ_new(u3_proxy_conn* con_u, c3_y* buf_y) return ruq_u; } -// XX deduplicate with _cttp_bods_free() -static void -_proxy_bods_free(u3_hbod* bod_u) -{ - while ( bod_u ) { - u3_hbod* nex_u = bod_u->nex_u; - - free(bod_u); - bod_u = nex_u; - } -} - static void _proxy_conn_free(u3_proxy_conn* con_u) { - _proxy_bods_free(con_u->bod_u); + if ( 0 != con_u->buf_u.base ) { + free(con_u->buf_u.base); + } + free(con_u); + // XX detach from listener } @@ -1013,7 +1006,7 @@ _proxy_conn_new(u3_proxy_listener* lis_u) u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u)); con_u->lis_u = lis_u; con_u->upt_u = 0; - con_u->bod_u = 0; + con_u->buf_u = uv_buf_init(0, 0); con_u->nex_u = 0; con_u->don_u.data = con_u; @@ -1033,16 +1026,16 @@ _proxy_reverse_free(u3_proxy_reverse* rev_u) } static u3_proxy_reverse* -_proxy_reverse_new(u3_proxy_listener* lis_u, u3_atom sip) +_proxy_reverse_new(u3_proxy_conn* con_u, u3_atom sip) { u3_proxy_reverse* rev_u = c3_malloc(sizeof(*rev_u)); rev_u->tcp_u.data = rev_u; - rev_u->lis_u = lis_u; + rev_u->con_u = con_u; rev_u->sip = sip; rev_u->por_s = 0; // set after opened - rev_u->bod_u = 0; rev_u->nex_u = 0; - // XX link to listener + + // XX link to global state return rev_u; } @@ -1086,63 +1079,6 @@ _proxy_write_cb(uv_write_t* wri_u, c3_i sas_i) _proxy_writ_free(ruq_u); } -static c3_c* -_proxy_parse_host(const uv_buf_t* buf_u) -{ - c3_c* hot_c = 0; - - struct phr_header hed_u[H2O_MAX_HEADERS]; - size_t hed_t = H2O_MAX_HEADERS; - - { - size_t len_t = buf_u->len < H2O_MAX_REQLEN ? buf_u->len : H2O_MAX_REQLEN; - // XX slowloris? - c3_i las_i = 0; - c3_i ret_i; - - // unused - c3_i ver_i; - const c3_c* met_c; - size_t met_t; - const c3_c* pat_c; - size_t pat_t; - - ret_i = phr_parse_request(buf_u->base, len_t, &met_c, &met_t, - &pat_c, &pat_t, &ver_i, hed_u, &hed_t, las_i); - - if ( -1 == ret_i ) { - // parse error - // XX log error? close connection? - return hot_c; - } - else if ( -2 == ret_i ) { - // incomplete - // XX await next buffer? - } - } - - const h2o_token_t* tok_t; - size_t i; - - for ( i = 0; i < hed_t; i++ ) { - h2o_strtolower((c3_c*)hed_u[i].name, hed_u[i].name_len); - - if ( 0 != (tok_t = h2o_lookup_token(hed_u[i].name, hed_u[i].name_len)) ) { - if ( tok_t->is_init_header_special && H2O_TOKEN_HOST == tok_t ) { - - hot_c = c3_malloc(1 + hed_u[i].value_len); - hot_c[hed_u[i].value_len] = 0; - memcpy(hot_c, hed_u[i].value, hed_u[i].value_len); - - uL(fprintf(uH, "proxy: host: %s\n", hot_c)); - break; - } - } - } - - return hot_c; -} - static void _proxy_sock_read_downstream_cb(uv_stream_t* don_u, ssize_t siz_w, @@ -1154,8 +1090,6 @@ _proxy_sock_read_downstream_cb(uv_stream_t* don_u, _proxy_conn_close(con_u); } else { - _proxy_parse_host(buf_u); - u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base); c3_i sas_i; @@ -1198,7 +1132,23 @@ _proxy_sock_read_upstream_cb(uv_stream_t* upt_u, static void _proxy_fire(u3_proxy_conn* con_u) { - // XX uv_write all in bod_u to upt_u + if ( 0 != con_u->buf_u.base ) { + // XX free it later ... + u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, 0); + + c3_i sas_i; + if ( 0 != (sas_i = uv_write(&ruq_u->wri_u, + (uv_stream_t*)con_u->upt_u, + &con_u->buf_u, 1, + _proxy_write_cb)) ) { + uL(fprintf(uH, "proxy: write pending: %s\n", uv_strerror(sas_i))); + // XX wat do + _proxy_conn_close(con_u); + _proxy_writ_free(ruq_u); + + return; + } + } // XX set cooldown timers to close these? @@ -1209,6 +1159,7 @@ _proxy_fire(u3_proxy_conn* con_u) static void _proxy_lopc_connect_cb(uv_connect_t * upc_u, c3_i sas_i) { + uL(fprintf(uH, "proxy: lopc cb\n")); uv_tcp_t* upt_u = upc_u->data; u3_proxy_conn* con_u = upt_u->data; @@ -1262,36 +1213,287 @@ _proxy_lopc(u3_proxy_conn* con_u) uv_tcp_connect(upc_u, upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb); } +static void +_proxy_reverse_listen_cb(uv_stream_t* tcp_u, c3_i sas_i) +{ + uL(fprintf(uH, "proxy: rev cb\n")); + u3_proxy_reverse* rev_u = (u3_proxy_reverse*)tcp_u; + + if ( 0 != sas_i ) { + uL(fprintf(uH, "proxy: listen_cb: %s\n", uv_strerror(sas_i))); + + //XX wat do + _proxy_conn_close(rev_u->con_u); + uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free); + } + else { + uv_tcp_t* upt_u = c3_malloc(sizeof(*upt_u)); + + rev_u->con_u->upt_u = upt_u; + + uv_tcp_init(u3L, upt_u); + + if ( 0 != (sas_i = uv_accept((uv_stream_t*)&rev_u->tcp_u, (uv_stream_t*)upt_u)) ) { + uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i))); + + _proxy_conn_close(rev_u->con_u); + uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free); + } + else { + _proxy_fire(rev_u->con_u); + // XX always close here? + uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free); + } + } +} + +static void +_proxy_reverse(u3_proxy_conn* con_u, u3_noun sip) +{ + uL(fprintf(uH, "proxy: rev\n")); + + // XX free somewhere + u3_proxy_reverse* rev_u = _proxy_reverse_new(con_u, sip); + + struct sockaddr_in add_u; + + memset(&add_u, 0, sizeof(add_u)); + add_u.sin_family = AF_INET; + add_u.sin_addr.s_addr = INADDR_ANY; + + // first available + add_u.sin_port = 0; + + uv_tcp_init(u3L, &rev_u->tcp_u); + + c3_i sas_i; + + sas_i = uv_tcp_bind(&rev_u->tcp_u, (const struct sockaddr*)&add_u, 0); + + if ( 0 != sas_i || + 0 != (sas_i = uv_listen((uv_stream_t*)&rev_u->tcp_u, + TCP_BACKLOG, _proxy_reverse_listen_cb)) ) { + if ( UV_EADDRINUSE == sas_i ) { + uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i))); + + //XX wat do + _proxy_conn_close(con_u); + uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free); + + return; + } + } + + c3_i len_i = sizeof(add_u); + + memset(&add_u, 0, sizeof(add_u)); + + if ( 0 != (sas_i = uv_tcp_getsockname(&rev_u->tcp_u, + (struct sockaddr*)&add_u, + &len_i)) ) { + uL(fprintf(uH, "proxy: sockname: %s\n", uv_strerror(sas_i))); + + // XX wat do + _proxy_conn_close(con_u); + uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free); + + } else { + rev_u->por_s = ntohs(add_u.sin_port); + + uL(fprintf(uH, "proxy: listen: %d\n", rev_u->por_s)); + + // XX u3v_plan + } +} + +static c3_c* +_proxy_parse_host(const uv_buf_t* buf_u) +{ + c3_c* hot_c = 0; + + struct phr_header hed_u[H2O_MAX_HEADERS]; + size_t hed_t = H2O_MAX_HEADERS; + + { + size_t len_t = buf_u->len < H2O_MAX_REQLEN ? buf_u->len : H2O_MAX_REQLEN; + // XX slowloris? + c3_i las_i = 0; + c3_i ret_i; + + // unused + c3_i ver_i; + const c3_c* met_c; + size_t met_t; + const c3_c* pat_c; + size_t pat_t; + + ret_i = phr_parse_request(buf_u->base, len_t, &met_c, &met_t, + &pat_c, &pat_t, &ver_i, hed_u, &hed_t, las_i); + + if ( -1 == ret_i ) { + // parse error + // XX log error? close connection? + return hot_c; + } + else if ( -2 == ret_i ) { + // incomplete + // XX await next buffer? + } + } + + const h2o_token_t* tok_t; + size_t i; + + for ( i = 0; i < hed_t; i++ ) { + // XX in-place, copy first + h2o_strtolower((c3_c*)hed_u[i].name, hed_u[i].name_len); + + if ( 0 != (tok_t = h2o_lookup_token(hed_u[i].name, hed_u[i].name_len)) ) { + if ( tok_t->is_init_header_special && H2O_TOKEN_HOST == tok_t ) { + + hot_c = c3_malloc(1 + hed_u[i].value_len); + hot_c[hed_u[i].value_len] = 0; + memcpy(hot_c, hed_u[i].value, hed_u[i].value_len); + break; + } + } + } + + return hot_c; +} + +static u3_noun +_proxy_dest(u3_proxy_conn* con_u) +{ + c3_c* hot_c; + c3_c* dom_c; + + c3_assert( 0 != con_u->buf_u.base ); + + if ( c3n == con_u->lis_u->sec ) { + hot_c = _proxy_parse_host(&con_u->buf_u); + } else { + // XX - SNI + hot_c = 0; + } + + // XX signal close connection on parse failure? + if ( 0 == hot_c ) { + return u3_nul; + } + + uL(fprintf(uH, "proxy: host: %s\n", hot_c)); + + dom_c = strchr(hot_c, '.'); + + if ( 0 == dom_c ) { + free(hot_c); + return u3_nul; + } + + uL(fprintf(uH, "proxy: domain: %s\n", dom_c)); + + // XX get from -H + // XX check EOS or port to prevent length extension + if ( 0 != strncmp(dom_c, ".urbit.org", strlen(".urbit.org")) ) { + free(hot_c); + return u3_nul; + } + + { + u3_noun sip; + + c3_w dif_w = dom_c - hot_c; + c3_c* sip_c = c3_malloc(2 + dif_w); + strncpy(sip_c + 1, hot_c, dif_w); + sip_c[0] = '~'; + sip_c[1 + dif_w] = 0; + + sip = u3dc("slaw", 'p', u3i_string(sip_c)); + + uL(fprintf(uH, "proxy: parsed\n")); + + free(sip_c); + free(hot_c); + return sip; + } +} + +static void +_proxy_read_dest_cb(uv_stream_t* don_u, + ssize_t siz_w, + const uv_buf_t * buf_u) +{ + u3_proxy_conn* con_u = don_u->data; + + uL(fprintf(uH, "proxy: peek cb\n")); + + uv_read_stop(don_u); + + if ( (UV_EOF == siz_w) || (0 > siz_w) ) { + uL(fprintf(uH, "proxy: peek nope\n")); + // XX wat do? + _proxy_conn_close(con_u); + } + else { + uL(fprintf(uH, "proxy: peek yep\n")); + + // XX suport multiple + con_u->buf_u = uv_buf_init(buf_u->base, buf_u->len); + + u3_noun sip = _proxy_dest(con_u); + + uL(fprintf(uH, "proxy: sip\n")); + + if ( u3_nul == sip ) { + uL(fprintf(uH, "proxy: sip nul\n")); + _proxy_lopc(con_u); + } + else { + u3_noun hip = u3k(u3t(sip)); + u3_noun own = u3A->own; + c3_o our = c3n; + + while ( u3_nul != own ) { + if ( c3y == u3r_sing(hip, u3h(own)) ) { + our = c3y; + break; + } + own = u3t(own); + } + + if ( c3y == our ) { + uL(fprintf(uH, "proxy: sip us\n")); + _proxy_lopc(con_u); + } + else { + // XX check if (sein:title sip) == our + // XX check will + uL(fprintf(uH, "proxy: sip them\n")); + _proxy_reverse(con_u, hip); + } + } + + u3z(sip); + } +} + static void _proxy_sock_new(u3_proxy_listener* lis_u) { u3_proxy_conn* con_u = _proxy_conn_new(lis_u); - // init and accept downstream - uv_tcp_init(u3L, &con_u->don_u); c3_i sas_i; - if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->don_u)) ) { + if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, + (uv_stream_t*)&con_u->don_u)) ) { uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i))); _proxy_conn_close(con_u); } else { - // uv_read_start((uv_stream_t*)client, _proxy_alloc, _proxy_sock_read_cb); - - //XX parse for HOST or SNI (based on lis_u->sec) - - // IF US (or unrecognized?) - // open loopback and write - // set it up to keep writing - // IF CHILD - // bind new socket - // send a move to %eyre on child, specifying socket (assume star.urbit.org?) - // save buffer - // on accept, write_start - // OTHERWISE, CLOSE - - _proxy_lopc(con_u); + uv_read_start((uv_stream_t*)&con_u->don_u, + _proxy_alloc, _proxy_read_dest_cb); } } @@ -1308,7 +1510,6 @@ _proxy_sock_listen_cb(uv_stream_t* sev_u, c3_i sas_i) } } - static void _proxy_sock_start(void) { @@ -1337,47 +1538,3 @@ _proxy_sock_start(void) } } } - -// XX use later -static void -_proxy_reverse_start(void) -{ - struct sockaddr_in add_u; - - // XX free somewhere - uv_tcp_t* server = c3_malloc(sizeof(*server)); - - uv_tcp_init(u3L, server); - - memset(&add_u, 0, sizeof(add_u)); - add_u.sin_family = AF_INET; - add_u.sin_addr.s_addr = INADDR_ANY; - // add_u.sin_port = htons(9090); - - c3_i sas_i; - - sas_i = uv_tcp_bind(server, (const struct sockaddr*)&add_u, 0); - - if ( 0 != sas_i || - 0 != (sas_i = uv_listen((uv_stream_t*)server, - TCP_BACKLOG, _proxy_sock_listen_cb)) ) { - if ( UV_EADDRINUSE == sas_i ) { - //wat do - uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i))); - return; - } - } - - { - struct sockaddr_in add_u; - c3_i len_i = sizeof(add_u); - - memset(&add_u, 0, sizeof(add_u)); - - if ( 0 != (sas_i = uv_tcp_getsockname(server, (struct sockaddr*)&add_u, &len_i)) ) { - uL(fprintf(uH, "proxy: sockname: %s\n", uv_strerror(sas_i))); - } else { - uL(fprintf(uH, "proxy: listen: %d\n", ntohs(add_u.sin_port))); - } - } -}