refactors loopback proxy and connection structs, corrects close/free lifecycle

This commit is contained in:
Joe Bryan 2018-06-01 21:10:30 -04:00
parent 3c24756d28
commit 65ac6dd5e0

View File

@ -917,15 +917,15 @@ typedef struct _u3_proxy_writ {
c3_y* buf_y;
} u3_proxy_writ;
// add server and data fields
typedef struct _u3_proxy_conn {
uv_tcp_t client;
uv_tcp_t upt_u;
uv_connect_t upc_u; // XX pointer so nullable for reverse case?
uv_tcp_t don_u; // downstream handle
uv_tcp_t* upt_u; // upstream handle XX union of local connect and reverse listener?
u3_hbod* bod_u; // pending buffers
struct _u3_proxy_listener* lis_u;
struct _u3_proxy_conn* nex_u;
} u3_proxy_conn;
// XX probably not right
typedef struct _u3_proxy_reverse {
uv_tcp_t tcp_u;
u3_atom sip; // reverse proxy for ship
@ -950,9 +950,8 @@ typedef struct _u3_proxy_listener {
static void
_proxy_alloc(uv_handle_t* had_u,
size_t len_i,
uv_buf_t* buf
)
size_t len_i,
uv_buf_t* buf)
{
void* ptr_v = c3_malloc(len_i);
*buf = uv_buf_init(ptr_v, len_i);
@ -976,19 +975,34 @@ _proxy_writ_new(u3_proxy_conn* con_u, c3_y* buf_y)
return ruq_u;
}
// XX deduplicate with _cttp_bods_free()
static void
_proxy_bods_free(u3_hbod* bod_u)
{
while ( bod_u ) {
u3_hbod* nex_u = bod_u->nex_u;
free(bod_u);
bod_u = nex_u;
}
}
static void
_proxy_conn_free(u3_proxy_conn* con_u)
{
_proxy_bods_free(con_u->bod_u);
free(con_u);
// XX detach from listener
}
// XX this gets called for both sides, wat do
static void
_proxy_conn_free_uv(uv_handle_t* han_u)
_proxy_conn_close(u3_proxy_conn* con_u)
{
u3_proxy_conn* con_u = han_u->data;
// _proxy_conn_free(con_u);
uv_close((uv_handle_t*)&con_u->don_u, (uv_close_cb)_proxy_conn_free);
if ( 0 != con_u->upt_u ) {
uv_close((uv_handle_t*)con_u->upt_u, (uv_close_cb)free);
}
}
static u3_proxy_conn*
@ -996,10 +1010,11 @@ _proxy_conn_new(u3_proxy_listener* lis_u)
{
u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u));
con_u->lis_u = lis_u;
con_u->upt_u = 0;
con_u->bod_u = 0;
con_u->nex_u = 0;
con_u->client.data = con_u;
con_u->upt_u.data = con_u;
con_u->upc_u.data = con_u;
con_u->don_u.data = con_u;
// XX link to listener
@ -1058,77 +1073,132 @@ static void
_proxy_write_cb(uv_write_t* wri_u, c3_i sas_i)
{
u3_proxy_writ* ruq_u = (u3_proxy_writ*)wri_u;
// u3_proxy_conn* con_u = ruq_u->wri_u.data;
if ( 0 != sas_i ) {
uL(fprintf(uH, "proxy: write: %s\n", uv_strerror(sas_i)));
// close and free con_u (ruq_u->wri_u.data)?
// periodically cores, already closing, broken pipe
// _proxy_conn_close(con_u);
}
_proxy_writ_free(ruq_u);
}
static void
_proxy_sock_read_client_cb(uv_stream_t* tcp_u,
ssize_t siz_w,
const uv_buf_t * buf_u)
_proxy_sock_read_downstream_cb(uv_stream_t* don_u,
ssize_t siz_w,
const uv_buf_t * buf_u)
{
u3_proxy_conn* con_u = tcp_u->data;
u3_proxy_conn* con_u = don_u->data;
if ( UV_EOF == siz_w ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
} else if ( 0 > siz_w ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
if ( (UV_EOF == siz_w) || (0 > siz_w) ) {
_proxy_conn_close(con_u);
}
else {
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
if ( 0 != uv_write(&ruq_u->wri_u,
(uv_stream_t*)&(con_u->upt_u),
buf_u, 1,
_proxy_write_cb) ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
c3_i sas_i;
if ( 0 != (sas_i = uv_write(&ruq_u->wri_u,
(uv_stream_t*)con_u->upt_u,
buf_u, 1,
_proxy_write_cb)) ) {
uL(fprintf(uH, "proxy: read downstream: %s\n", uv_strerror(sas_i)));
_proxy_conn_close(con_u);
_proxy_writ_free(ruq_u);
}
}
}
static void
_proxy_sock_read_upstream_cb(uv_stream_t* tcp_u,
_proxy_sock_read_upstream_cb(uv_stream_t* upt_u,
ssize_t siz_w,
const uv_buf_t * buf_u)
{
u3_proxy_conn* con_u = tcp_u->data;
u3_proxy_conn* con_u = upt_u->data;
if ( UV_EOF == siz_w ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
} else if ( 0 > siz_w ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
if ( (UV_EOF == siz_w) || (0 > siz_w) ) {
_proxy_conn_close(con_u);
}
else {
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
if ( 0 != uv_write(&ruq_u->wri_u,
(uv_stream_t*)&(con_u->client),
buf_u, 1,
_proxy_write_cb) ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
c3_i sas_i;
if ( 0 != (sas_i = uv_write(&ruq_u->wri_u,
(uv_stream_t*)&(con_u->don_u),
buf_u, 1,
_proxy_write_cb)) ) {
uL(fprintf(uH, "proxy: read upstream: %s\n", uv_strerror(sas_i)));
_proxy_conn_close(con_u);
_proxy_writ_free(ruq_u);
}
}
}
static void
_proxy_fire(u3_proxy_conn* con_u)
{
// XX uv_write all in bod_u to upt_u
// XX set cooldown timers to close these?
uv_read_start((uv_stream_t*)&con_u->don_u, _proxy_alloc, _proxy_sock_read_downstream_cb);
uv_read_start((uv_stream_t*)con_u->upt_u, _proxy_alloc, _proxy_sock_read_upstream_cb);
}
static void
_proxy_lopc_connect_cb(uv_connect_t * upc_u, c3_i sas_i)
{
u3_proxy_conn* con_u = upc_u->data;
uv_tcp_t* upt_u = upc_u->data;
u3_proxy_conn* con_u = upt_u->data;
con_u->upt_u = upt_u;
free(upc_u);
if ( 0 != sas_i ) {
uL(fprintf(uH, "proxy: connect: %s\n", uv_strerror(sas_i)));
// close and free con_u
_proxy_conn_close(con_u);
}
uv_read_start((uv_stream_t*)&con_u->client, _proxy_alloc, _proxy_sock_read_client_cb);
uv_read_start((uv_stream_t*)&con_u->upt_u, _proxy_alloc, _proxy_sock_read_upstream_cb);
_proxy_fire(con_u);
}
static void
_proxy_lopc(u3_proxy_conn* con_u)
{
uv_tcp_t* upt_u = c3_malloc(sizeof(*upt_u));
// not yet linked in reverse
upt_u->data = con_u;
uv_tcp_init(u3L, upt_u);
struct sockaddr_in lop_u;
memset(&lop_u, 0, sizeof(lop_u));
lop_u.sin_family = AF_INET;
lop_u.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
// get the appropriate loopback port
{
c3_s por_s = 0;
u3_http* htp_u;
for ( htp_u = u3_Host.htp_u; (0 != htp_u); htp_u = htp_u->nex_u ) {
if ( c3n == htp_u->lop && con_u->lis_u->sec == htp_u->sec ) {
por_s = htp_u->por_w;
}
}
c3_assert( 0 != por_s );
lop_u.sin_port = htons(por_s);
}
uv_connect_t* upc_u = c3_malloc(sizeof(*upc_u));
upc_u->data = upt_u;
uv_tcp_connect(upc_u, upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb);
}
static void
@ -1136,44 +1206,21 @@ _proxy_sock_new(u3_proxy_listener* lis_u)
{
u3_proxy_conn* con_u = _proxy_conn_new(lis_u);
uv_tcp_init(u3L, &con_u->client);
// init and accept downstream
uv_tcp_init(u3L, &con_u->don_u);
c3_i sas_i;
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->client)) ) {
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->don_u)) ) {
uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i)));
uv_close((uv_handle_t*)&con_u->client, _proxy_conn_free_uv);
_proxy_conn_close(con_u);
}
else {
struct sockaddr_in lop_u;
memset(&lop_u, 0, sizeof(lop_u));
lop_u.sin_family = AF_INET;
lop_u.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
// get the appropriate loopback port
{
c3_s por_s = 0;
u3_http* htp_u;
for ( htp_u = u3_Host.htp_u; (0 != htp_u); htp_u = htp_u->nex_u ) {
if ( c3n == htp_u->lop && lis_u->sec == htp_u->sec ) {
por_s = htp_u->por_w;
}
}
c3_assert( 0 != por_s );
lop_u.sin_port = htons(por_s);
}
uv_tcp_init(u3L, &con_u->upt_u);
uv_tcp_connect(&con_u->upc_u, &con_u->upt_u, (const struct sockaddr*)&lop_u, _proxy_lopc_connect_cb);
// uv_read_start((uv_stream_t*)client, _proxy_alloc, _proxy_sock_read_cb);
//XX parse for HOST or SNI
// IF US
//XX parse for HOST or SNI (based on lis_u->sec)
// IF US (or unrecognized?)
// open loopback and write
// set it up to keep writing
// IF CHILD
@ -1182,6 +1229,8 @@ _proxy_sock_new(u3_proxy_listener* lis_u)
// save buffer
// on accept, write_start
// OTHERWISE, CLOSE
_proxy_lopc(con_u);
}
}