diff --git a/vere/http.c b/vere/http.c index 1adabb6b0a..e176f2f31e 100644 --- a/vere/http.c +++ b/vere/http.c @@ -917,15 +917,15 @@ typedef struct _u3_proxy_writ { c3_y* buf_y; } u3_proxy_writ; -// add server and data fields typedef struct _u3_proxy_conn { - uv_tcp_t client; - uv_tcp_t upt_u; - uv_connect_t upc_u; // XX pointer so nullable for reverse case? + uv_tcp_t don_u; // downstream handle + uv_tcp_t* upt_u; // upstream handle XX union of local connect and reverse listener? + u3_hbod* bod_u; // pending buffers struct _u3_proxy_listener* lis_u; struct _u3_proxy_conn* nex_u; } u3_proxy_conn; +// XX probably not right typedef struct _u3_proxy_reverse { uv_tcp_t tcp_u; u3_atom sip; // reverse proxy for ship @@ -950,9 +950,8 @@ typedef struct _u3_proxy_listener { static void _proxy_alloc(uv_handle_t* had_u, - size_t len_i, - uv_buf_t* buf - ) + size_t len_i, + uv_buf_t* buf) { void* ptr_v = c3_malloc(len_i); *buf = uv_buf_init(ptr_v, len_i); @@ -976,19 +975,34 @@ _proxy_writ_new(u3_proxy_conn* con_u, c3_y* buf_y) return ruq_u; } +// XX deduplicate with _cttp_bods_free() +static void +_proxy_bods_free(u3_hbod* bod_u) +{ + while ( bod_u ) { + u3_hbod* nex_u = bod_u->nex_u; + + free(bod_u); + bod_u = nex_u; + } +} + static void _proxy_conn_free(u3_proxy_conn* con_u) { + _proxy_bods_free(con_u->bod_u); free(con_u); // XX detach from listener } -// XX this gets called for both sides, wat do static void -_proxy_conn_free_uv(uv_handle_t* han_u) +_proxy_conn_close(u3_proxy_conn* con_u) { - u3_proxy_conn* con_u = han_u->data; - // _proxy_conn_free(con_u); + uv_close((uv_handle_t*)&con_u->don_u, (uv_close_cb)_proxy_conn_free); + + if ( 0 != con_u->upt_u ) { + uv_close((uv_handle_t*)con_u->upt_u, (uv_close_cb)free); + } } static u3_proxy_conn* @@ -996,10 +1010,11 @@ _proxy_conn_new(u3_proxy_listener* lis_u) { u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u)); con_u->lis_u = lis_u; + con_u->upt_u = 0; + con_u->bod_u = 0; + con_u->nex_u = 0; - con_u->client.data = con_u; - con_u->upt_u.data = con_u; - con_u->upc_u.data = con_u; + con_u->don_u.data = con_u; // XX link to listener @@ -1058,77 +1073,132 @@ static void _proxy_write_cb(uv_write_t* wri_u, c3_i sas_i) { u3_proxy_writ* ruq_u = (u3_proxy_writ*)wri_u; + // u3_proxy_conn* con_u = ruq_u->wri_u.data; if ( 0 != sas_i ) { uL(fprintf(uH, "proxy: write: %s\n", uv_strerror(sas_i))); - // close and free con_u (ruq_u->wri_u.data)? + // periodically cores, already closing, broken pipe + // _proxy_conn_close(con_u); } _proxy_writ_free(ruq_u); } static void -_proxy_sock_read_client_cb(uv_stream_t* tcp_u, - ssize_t siz_w, - const uv_buf_t * buf_u) +_proxy_sock_read_downstream_cb(uv_stream_t* don_u, + ssize_t siz_w, + const uv_buf_t * buf_u) { - u3_proxy_conn* con_u = tcp_u->data; + u3_proxy_conn* con_u = don_u->data; - if ( UV_EOF == siz_w ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); - } else if ( 0 > siz_w ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); + if ( (UV_EOF == siz_w) || (0 > siz_w) ) { + _proxy_conn_close(con_u); } else { - u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base); + u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base); - if ( 0 != uv_write(&ruq_u->wri_u, - (uv_stream_t*)&(con_u->upt_u), - buf_u, 1, - _proxy_write_cb) ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); + c3_i sas_i; + if ( 0 != (sas_i = uv_write(&ruq_u->wri_u, + (uv_stream_t*)con_u->upt_u, + buf_u, 1, + _proxy_write_cb)) ) { + uL(fprintf(uH, "proxy: read downstream: %s\n", uv_strerror(sas_i))); + _proxy_conn_close(con_u); _proxy_writ_free(ruq_u); } } } static void -_proxy_sock_read_upstream_cb(uv_stream_t* tcp_u, +_proxy_sock_read_upstream_cb(uv_stream_t* upt_u, ssize_t siz_w, const uv_buf_t * buf_u) { - u3_proxy_conn* con_u = tcp_u->data; + u3_proxy_conn* con_u = upt_u->data; - if ( UV_EOF == siz_w ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); - } else if ( 0 > siz_w ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); + if ( (UV_EOF == siz_w) || (0 > siz_w) ) { + _proxy_conn_close(con_u); } else { u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base); - if ( 0 != uv_write(&ruq_u->wri_u, - (uv_stream_t*)&(con_u->client), - buf_u, 1, - _proxy_write_cb) ) { - uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv); + c3_i sas_i; + if ( 0 != (sas_i = uv_write(&ruq_u->wri_u, + (uv_stream_t*)&(con_u->don_u), + buf_u, 1, + _proxy_write_cb)) ) { + uL(fprintf(uH, "proxy: read upstream: %s\n", uv_strerror(sas_i))); + _proxy_conn_close(con_u); _proxy_writ_free(ruq_u); } } } +static void +_proxy_fire(u3_proxy_conn* con_u) +{ + // XX uv_write all in bod_u to upt_u + + // XX set cooldown timers to close these? + + uv_read_start((uv_stream_t*)&con_u->don_u, _proxy_alloc, _proxy_sock_read_downstream_cb); + uv_read_start((uv_stream_t*)con_u->upt_u, _proxy_alloc, _proxy_sock_read_upstream_cb); +} + static void _proxy_lopc_connect_cb(uv_connect_t * upc_u, c3_i sas_i) { - u3_proxy_conn* con_u = upc_u->data; + uv_tcp_t* upt_u = upc_u->data; + + u3_proxy_conn* con_u = upt_u->data; + con_u->upt_u = upt_u; + + free(upc_u); if ( 0 != sas_i ) { uL(fprintf(uH, "proxy: connect: %s\n", uv_strerror(sas_i))); - // close and free con_u + _proxy_conn_close(con_u); } - uv_read_start((uv_stream_t*)&con_u->client, _proxy_alloc, _proxy_sock_read_client_cb); - uv_read_start((uv_stream_t*)&con_u->upt_u, _proxy_alloc, _proxy_sock_read_upstream_cb); + _proxy_fire(con_u); +} + +static void +_proxy_lopc(u3_proxy_conn* con_u) +{ + uv_tcp_t* upt_u = c3_malloc(sizeof(*upt_u)); + + // not yet linked in reverse + upt_u->data = con_u; + + uv_tcp_init(u3L, upt_u); + + struct sockaddr_in lop_u; + + memset(&lop_u, 0, sizeof(lop_u)); + lop_u.sin_family = AF_INET; + lop_u.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + + // get the appropriate loopback port + { + c3_s por_s = 0; + u3_http* htp_u; + + for ( htp_u = u3_Host.htp_u; (0 != htp_u); htp_u = htp_u->nex_u ) { + if ( c3n == htp_u->lop && con_u->lis_u->sec == htp_u->sec ) { + por_s = htp_u->por_w; + } + } + + c3_assert( 0 != por_s ); + + lop_u.sin_port = htons(por_s); + } + + uv_connect_t* upc_u = c3_malloc(sizeof(*upc_u)); + upc_u->data = upt_u; + + uv_tcp_connect(upc_u, upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb); } static void @@ -1136,44 +1206,21 @@ _proxy_sock_new(u3_proxy_listener* lis_u) { u3_proxy_conn* con_u = _proxy_conn_new(lis_u); - uv_tcp_init(u3L, &con_u->client); + // init and accept downstream + + uv_tcp_init(u3L, &con_u->don_u); c3_i sas_i; - if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->client)) ) { + if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->don_u)) ) { uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i))); - - uv_close((uv_handle_t*)&con_u->client, _proxy_conn_free_uv); + _proxy_conn_close(con_u); } else { - struct sockaddr_in lop_u; - - memset(&lop_u, 0, sizeof(lop_u)); - lop_u.sin_family = AF_INET; - lop_u.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - - // get the appropriate loopback port - { - c3_s por_s = 0; - u3_http* htp_u; - - for ( htp_u = u3_Host.htp_u; (0 != htp_u); htp_u = htp_u->nex_u ) { - if ( c3n == htp_u->lop && lis_u->sec == htp_u->sec ) { - por_s = htp_u->por_w; - } - } - - c3_assert( 0 != por_s ); - - lop_u.sin_port = htons(por_s); - } - - uv_tcp_init(u3L, &con_u->upt_u); - uv_tcp_connect(&con_u->upc_u, &con_u->upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb); - // uv_read_start((uv_stream_t*)client, _proxy_alloc, _proxy_sock_read_cb); - //XX parse for HOST or SNI - // IF US + //XX parse for HOST or SNI (based on lis_u->sec) + + // IF US (or unrecognized?) // open loopback and write // set it up to keep writing // IF CHILD @@ -1182,6 +1229,8 @@ _proxy_sock_new(u3_proxy_listener* lis_u) // save buffer // on accept, write_start // OTHERWISE, CLOSE + + _proxy_lopc(con_u); } }