implements reverse listener and "should-proxy" predicate, refactors loopback

This commit is contained in:
Joe Bryan 2018-06-04 22:50:29 -04:00
parent 2eb44fd326
commit 4792466ed5

View File

@ -922,18 +922,16 @@ typedef struct _u3_proxy_writ {
typedef struct _u3_proxy_conn {
uv_tcp_t don_u; // downstream handle
uv_tcp_t* upt_u; // upstream handle XX union of local connect and reverse listener?
u3_hbod* bod_u; // pending buffers
uv_buf_t buf_u; // pending buffer XX support multiple
struct _u3_proxy_listener* lis_u;
struct _u3_proxy_conn* nex_u;
} u3_proxy_conn;
// XX probably not right
typedef struct _u3_proxy_reverse {
uv_tcp_t tcp_u;
u3_atom sip; // reverse proxy for ship
c3_s por_s;
u3_hbod* bod_u; // pending buffers
struct _u3_proxy_listener* lis_u;
struct _u3_proxy_conn* con_u;
struct _u3_proxy_reverse* nex_u;
} u3_proxy_reverse;
@ -963,7 +961,10 @@ _proxy_alloc(uv_handle_t* had_u,
static void
_proxy_writ_free(u3_proxy_writ* ruq_u)
{
free(ruq_u->buf_y);
if ( 0 != ruq_u->buf_y ) {
free(ruq_u->buf_y);
}
free(ruq_u);
}
@ -977,23 +978,15 @@ _proxy_writ_new(u3_proxy_conn* con_u, c3_y* buf_y)
return ruq_u;
}
// XX deduplicate with _cttp_bods_free()
static void
_proxy_bods_free(u3_hbod* bod_u)
{
while ( bod_u ) {
u3_hbod* nex_u = bod_u->nex_u;
free(bod_u);
bod_u = nex_u;
}
}
static void
_proxy_conn_free(u3_proxy_conn* con_u)
{
_proxy_bods_free(con_u->bod_u);
if ( 0 != con_u->buf_u.base ) {
free(con_u->buf_u.base);
}
free(con_u);
// XX detach from listener
}
@ -1013,7 +1006,7 @@ _proxy_conn_new(u3_proxy_listener* lis_u)
u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u));
con_u->lis_u = lis_u;
con_u->upt_u = 0;
con_u->bod_u = 0;
con_u->buf_u = uv_buf_init(0, 0);
con_u->nex_u = 0;
con_u->don_u.data = con_u;
@ -1033,16 +1026,16 @@ _proxy_reverse_free(u3_proxy_reverse* rev_u)
}
static u3_proxy_reverse*
_proxy_reverse_new(u3_proxy_listener* lis_u, u3_atom sip)
_proxy_reverse_new(u3_proxy_conn* con_u, u3_atom sip)
{
u3_proxy_reverse* rev_u = c3_malloc(sizeof(*rev_u));
rev_u->tcp_u.data = rev_u;
rev_u->lis_u = lis_u;
rev_u->con_u = con_u;
rev_u->sip = sip;
rev_u->por_s = 0; // set after opened
rev_u->bod_u = 0;
rev_u->nex_u = 0;
// XX link to listener
// XX link to global state
return rev_u;
}
@ -1086,63 +1079,6 @@ _proxy_write_cb(uv_write_t* wri_u, c3_i sas_i)
_proxy_writ_free(ruq_u);
}
static c3_c*
_proxy_parse_host(const uv_buf_t* buf_u)
{
c3_c* hot_c = 0;
struct phr_header hed_u[H2O_MAX_HEADERS];
size_t hed_t = H2O_MAX_HEADERS;
{
size_t len_t = buf_u->len < H2O_MAX_REQLEN ? buf_u->len : H2O_MAX_REQLEN;
// XX slowloris?
c3_i las_i = 0;
c3_i ret_i;
// unused
c3_i ver_i;
const c3_c* met_c;
size_t met_t;
const c3_c* pat_c;
size_t pat_t;
ret_i = phr_parse_request(buf_u->base, len_t, &met_c, &met_t,
&pat_c, &pat_t, &ver_i, hed_u, &hed_t, las_i);
if ( -1 == ret_i ) {
// parse error
// XX log error? close connection?
return hot_c;
}
else if ( -2 == ret_i ) {
// incomplete
// XX await next buffer?
}
}
const h2o_token_t* tok_t;
size_t i;
for ( i = 0; i < hed_t; i++ ) {
h2o_strtolower((c3_c*)hed_u[i].name, hed_u[i].name_len);
if ( 0 != (tok_t = h2o_lookup_token(hed_u[i].name, hed_u[i].name_len)) ) {
if ( tok_t->is_init_header_special && H2O_TOKEN_HOST == tok_t ) {
hot_c = c3_malloc(1 + hed_u[i].value_len);
hot_c[hed_u[i].value_len] = 0;
memcpy(hot_c, hed_u[i].value, hed_u[i].value_len);
uL(fprintf(uH, "proxy: host: %s\n", hot_c));
break;
}
}
}
return hot_c;
}
static void
_proxy_sock_read_downstream_cb(uv_stream_t* don_u,
ssize_t siz_w,
@ -1154,8 +1090,6 @@ _proxy_sock_read_downstream_cb(uv_stream_t* don_u,
_proxy_conn_close(con_u);
}
else {
_proxy_parse_host(buf_u);
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
c3_i sas_i;
@ -1198,7 +1132,23 @@ _proxy_sock_read_upstream_cb(uv_stream_t* upt_u,
static void
_proxy_fire(u3_proxy_conn* con_u)
{
// XX uv_write all in bod_u to upt_u
if ( 0 != con_u->buf_u.base ) {
// XX free it later ...
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, 0);
c3_i sas_i;
if ( 0 != (sas_i = uv_write(&ruq_u->wri_u,
(uv_stream_t*)con_u->upt_u,
&con_u->buf_u, 1,
_proxy_write_cb)) ) {
uL(fprintf(uH, "proxy: write pending: %s\n", uv_strerror(sas_i)));
// XX wat do
_proxy_conn_close(con_u);
_proxy_writ_free(ruq_u);
return;
}
}
// XX set cooldown timers to close these?
@ -1209,6 +1159,7 @@ _proxy_fire(u3_proxy_conn* con_u)
static void
_proxy_lopc_connect_cb(uv_connect_t * upc_u, c3_i sas_i)
{
uL(fprintf(uH, "proxy: lopc cb\n"));
uv_tcp_t* upt_u = upc_u->data;
u3_proxy_conn* con_u = upt_u->data;
@ -1262,36 +1213,287 @@ _proxy_lopc(u3_proxy_conn* con_u)
uv_tcp_connect(upc_u, upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb);
}
static void
_proxy_reverse_listen_cb(uv_stream_t* tcp_u, c3_i sas_i)
{
uL(fprintf(uH, "proxy: rev cb\n"));
u3_proxy_reverse* rev_u = (u3_proxy_reverse*)tcp_u;
if ( 0 != sas_i ) {
uL(fprintf(uH, "proxy: listen_cb: %s\n", uv_strerror(sas_i)));
//XX wat do
_proxy_conn_close(rev_u->con_u);
uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free);
}
else {
uv_tcp_t* upt_u = c3_malloc(sizeof(*upt_u));
rev_u->con_u->upt_u = upt_u;
uv_tcp_init(u3L, upt_u);
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&rev_u->tcp_u, (uv_stream_t*)upt_u)) ) {
uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i)));
_proxy_conn_close(rev_u->con_u);
uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free);
}
else {
_proxy_fire(rev_u->con_u);
// XX always close here?
uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free);
}
}
}
static void
_proxy_reverse(u3_proxy_conn* con_u, u3_noun sip)
{
uL(fprintf(uH, "proxy: rev\n"));
// XX free somewhere
u3_proxy_reverse* rev_u = _proxy_reverse_new(con_u, sip);
struct sockaddr_in add_u;
memset(&add_u, 0, sizeof(add_u));
add_u.sin_family = AF_INET;
add_u.sin_addr.s_addr = INADDR_ANY;
// first available
add_u.sin_port = 0;
uv_tcp_init(u3L, &rev_u->tcp_u);
c3_i sas_i;
sas_i = uv_tcp_bind(&rev_u->tcp_u, (const struct sockaddr*)&add_u, 0);
if ( 0 != sas_i ||
0 != (sas_i = uv_listen((uv_stream_t*)&rev_u->tcp_u,
TCP_BACKLOG, _proxy_reverse_listen_cb)) ) {
if ( UV_EADDRINUSE == sas_i ) {
uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i)));
//XX wat do
_proxy_conn_close(con_u);
uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free);
return;
}
}
c3_i len_i = sizeof(add_u);
memset(&add_u, 0, sizeof(add_u));
if ( 0 != (sas_i = uv_tcp_getsockname(&rev_u->tcp_u,
(struct sockaddr*)&add_u,
&len_i)) ) {
uL(fprintf(uH, "proxy: sockname: %s\n", uv_strerror(sas_i)));
// XX wat do
_proxy_conn_close(con_u);
uv_close((uv_handle_t*)&rev_u->tcp_u, (uv_close_cb)_proxy_reverse_free);
} else {
rev_u->por_s = ntohs(add_u.sin_port);
uL(fprintf(uH, "proxy: listen: %d\n", rev_u->por_s));
// XX u3v_plan
}
}
static c3_c*
_proxy_parse_host(const uv_buf_t* buf_u)
{
c3_c* hot_c = 0;
struct phr_header hed_u[H2O_MAX_HEADERS];
size_t hed_t = H2O_MAX_HEADERS;
{
size_t len_t = buf_u->len < H2O_MAX_REQLEN ? buf_u->len : H2O_MAX_REQLEN;
// XX slowloris?
c3_i las_i = 0;
c3_i ret_i;
// unused
c3_i ver_i;
const c3_c* met_c;
size_t met_t;
const c3_c* pat_c;
size_t pat_t;
ret_i = phr_parse_request(buf_u->base, len_t, &met_c, &met_t,
&pat_c, &pat_t, &ver_i, hed_u, &hed_t, las_i);
if ( -1 == ret_i ) {
// parse error
// XX log error? close connection?
return hot_c;
}
else if ( -2 == ret_i ) {
// incomplete
// XX await next buffer?
}
}
const h2o_token_t* tok_t;
size_t i;
for ( i = 0; i < hed_t; i++ ) {
// XX in-place, copy first
h2o_strtolower((c3_c*)hed_u[i].name, hed_u[i].name_len);
if ( 0 != (tok_t = h2o_lookup_token(hed_u[i].name, hed_u[i].name_len)) ) {
if ( tok_t->is_init_header_special && H2O_TOKEN_HOST == tok_t ) {
hot_c = c3_malloc(1 + hed_u[i].value_len);
hot_c[hed_u[i].value_len] = 0;
memcpy(hot_c, hed_u[i].value, hed_u[i].value_len);
break;
}
}
}
return hot_c;
}
static u3_noun
_proxy_dest(u3_proxy_conn* con_u)
{
c3_c* hot_c;
c3_c* dom_c;
c3_assert( 0 != con_u->buf_u.base );
if ( c3n == con_u->lis_u->sec ) {
hot_c = _proxy_parse_host(&con_u->buf_u);
} else {
// XX - SNI
hot_c = 0;
}
// XX signal close connection on parse failure?
if ( 0 == hot_c ) {
return u3_nul;
}
uL(fprintf(uH, "proxy: host: %s\n", hot_c));
dom_c = strchr(hot_c, '.');
if ( 0 == dom_c ) {
free(hot_c);
return u3_nul;
}
uL(fprintf(uH, "proxy: domain: %s\n", dom_c));
// XX get from -H
// XX check EOS or port to prevent length extension
if ( 0 != strncmp(dom_c, ".urbit.org", strlen(".urbit.org")) ) {
free(hot_c);
return u3_nul;
}
{
u3_noun sip;
c3_w dif_w = dom_c - hot_c;
c3_c* sip_c = c3_malloc(2 + dif_w);
strncpy(sip_c + 1, hot_c, dif_w);
sip_c[0] = '~';
sip_c[1 + dif_w] = 0;
sip = u3dc("slaw", 'p', u3i_string(sip_c));
uL(fprintf(uH, "proxy: parsed\n"));
free(sip_c);
free(hot_c);
return sip;
}
}
static void
_proxy_read_dest_cb(uv_stream_t* don_u,
ssize_t siz_w,
const uv_buf_t * buf_u)
{
u3_proxy_conn* con_u = don_u->data;
uL(fprintf(uH, "proxy: peek cb\n"));
uv_read_stop(don_u);
if ( (UV_EOF == siz_w) || (0 > siz_w) ) {
uL(fprintf(uH, "proxy: peek nope\n"));
// XX wat do?
_proxy_conn_close(con_u);
}
else {
uL(fprintf(uH, "proxy: peek yep\n"));
// XX suport multiple
con_u->buf_u = uv_buf_init(buf_u->base, buf_u->len);
u3_noun sip = _proxy_dest(con_u);
uL(fprintf(uH, "proxy: sip\n"));
if ( u3_nul == sip ) {
uL(fprintf(uH, "proxy: sip nul\n"));
_proxy_lopc(con_u);
}
else {
u3_noun hip = u3k(u3t(sip));
u3_noun own = u3A->own;
c3_o our = c3n;
while ( u3_nul != own ) {
if ( c3y == u3r_sing(hip, u3h(own)) ) {
our = c3y;
break;
}
own = u3t(own);
}
if ( c3y == our ) {
uL(fprintf(uH, "proxy: sip us\n"));
_proxy_lopc(con_u);
}
else {
// XX check if (sein:title sip) == our
// XX check will
uL(fprintf(uH, "proxy: sip them\n"));
_proxy_reverse(con_u, hip);
}
}
u3z(sip);
}
}
static void
_proxy_sock_new(u3_proxy_listener* lis_u)
{
u3_proxy_conn* con_u = _proxy_conn_new(lis_u);
// init and accept downstream
uv_tcp_init(u3L, &con_u->don_u);
c3_i sas_i;
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->don_u)) ) {
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u,
(uv_stream_t*)&con_u->don_u)) ) {
uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i)));
_proxy_conn_close(con_u);
}
else {
// uv_read_start((uv_stream_t*)client, _proxy_alloc, _proxy_sock_read_cb);
//XX parse for HOST or SNI (based on lis_u->sec)
// IF US (or unrecognized?)
// open loopback and write
// set it up to keep writing
// IF CHILD
// bind new socket
// send a move to %eyre on child, specifying socket (assume star.urbit.org?)
// save buffer
// on accept, write_start
// OTHERWISE, CLOSE
_proxy_lopc(con_u);
uv_read_start((uv_stream_t*)&con_u->don_u,
_proxy_alloc, _proxy_read_dest_cb);
}
}
@ -1308,7 +1510,6 @@ _proxy_sock_listen_cb(uv_stream_t* sev_u, c3_i sas_i)
}
}
static void
_proxy_sock_start(void)
{
@ -1337,47 +1538,3 @@ _proxy_sock_start(void)
}
}
}
// XX use later
static void
_proxy_reverse_start(void)
{
struct sockaddr_in add_u;
// XX free somewhere
uv_tcp_t* server = c3_malloc(sizeof(*server));
uv_tcp_init(u3L, server);
memset(&add_u, 0, sizeof(add_u));
add_u.sin_family = AF_INET;
add_u.sin_addr.s_addr = INADDR_ANY;
// add_u.sin_port = htons(9090);
c3_i sas_i;
sas_i = uv_tcp_bind(server, (const struct sockaddr*)&add_u, 0);
if ( 0 != sas_i ||
0 != (sas_i = uv_listen((uv_stream_t*)server,
TCP_BACKLOG, _proxy_sock_listen_cb)) ) {
if ( UV_EADDRINUSE == sas_i ) {
//wat do
uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i)));
return;
}
}
{
struct sockaddr_in add_u;
c3_i len_i = sizeof(add_u);
memset(&add_u, 0, sizeof(add_u));
if ( 0 != (sas_i = uv_tcp_getsockname(server, (struct sockaddr*)&add_u, &len_i)) ) {
uL(fprintf(uH, "proxy: sockname: %s\n", uv_strerror(sas_i)));
} else {
uL(fprintf(uH, "proxy: listen: %d\n", ntohs(add_u.sin_port)));
}
}
}