adds proxy listener and connection structs, refactors loopback proxy

This commit is contained in:
Joe Bryan 2018-06-01 16:54:39 -04:00
parent c4928a7f3b
commit e1b5ab35cd

View File

@ -908,15 +908,9 @@ u3_http_io_exit(void)
_http_release_ports_file(u3_Host.dir_c);
}
static void
_proxy_alloc(uv_handle_t* had_u,
size_t len_i,
uv_buf_t* buf
)
{
void* ptr_v = c3_malloc(len_i);
*buf = uv_buf_init(ptr_v, len_i);
}
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
typedef struct _u3_proxy_writ {
uv_write_t wri_u;
@ -927,9 +921,139 @@ typedef struct _u3_proxy_writ {
typedef struct _u3_proxy_conn {
uv_tcp_t client;
uv_tcp_t upt_u;
uv_connect_t upc_u;
uv_connect_t upc_u; // XX pointer so nullable for reverse case?
struct _u3_proxy_listener* lis_u;
struct _u3_proxy_conn* nex_u;
} u3_proxy_conn;
typedef struct _u3_proxy_reverse {
uv_tcp_t tcp_u;
u3_atom sip; // reverse proxy for ship
c3_s por_s;
u3_hbod* bod_u; // pending buffers
struct _u3_proxy_listener* lis_u;
struct _u3_proxy_reverse* nex_u;
} u3_proxy_reverse;
typedef struct _u3_proxy_listener {
uv_tcp_t sev_u; // server handle
c3_s por_s;
c3_o sec; // yes == https
struct _u3_proxy_conn* con_u; // active connection list
struct _u3_proxy_reverse* rev_u; // active reverse listeners
struct u3_proxy_listener* nex_u; // next listener
} u3_proxy_listener;
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
static void
_proxy_alloc(uv_handle_t* had_u,
size_t len_i,
uv_buf_t* buf
)
{
void* ptr_v = c3_malloc(len_i);
*buf = uv_buf_init(ptr_v, len_i);
}
static void
_proxy_writ_free(u3_proxy_writ* ruq_u)
{
free(ruq_u->buf_y);
free(ruq_u);
}
static u3_proxy_writ*
_proxy_writ_new(u3_proxy_conn* con_u, c3_y* buf_y)
{
u3_proxy_writ* ruq_u = c3_malloc(sizeof(*ruq_u));
ruq_u->wri_u.data = con_u;
ruq_u->buf_y = buf_y;
return ruq_u;
}
static void
_proxy_conn_free(u3_proxy_conn* con_u)
{
free(con_u);
// XX detach from listener
}
// XX this gets called for both sides, wat do
static void
_proxy_conn_free_uv(uv_handle_t* han_u)
{
u3_proxy_conn* con_u = han_u->data;
// _proxy_conn_free(con_u);
}
static u3_proxy_conn*
_proxy_conn_new(u3_proxy_listener* lis_u)
{
u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u));
con_u->lis_u = lis_u;
con_u->client.data = con_u;
con_u->upt_u.data = con_u;
con_u->upc_u.data = con_u;
// XX link to listener
return con_u;
}
static void
_proxy_reverse_free(u3_proxy_reverse* rev_u)
{
u3z(rev_u->sip);
free(rev_u);
// XX free buffers
// XX detach from listener
}
static u3_proxy_reverse*
_proxy_reverse_new(u3_proxy_listener* lis_u, u3_atom sip)
{
u3_proxy_reverse* rev_u = c3_malloc(sizeof(*rev_u));
rev_u->tcp_u.data = rev_u;
rev_u->lis_u = lis_u;
rev_u->sip = sip;
rev_u->por_s = 0; // set after opened
rev_u->bod_u = 0;
rev_u->nex_u = 0;
// XX link to listener
return rev_u;
}
static void
_proxy_listener_free(u3_proxy_listener* lis_u)
{
free(lis_u);
// XX close and free connections
// XX close and free reverse listeners
// XX detach from global state
}
static u3_proxy_listener*
_proxy_listener_new(c3_s por_s, c3_o sec)
{
u3_proxy_listener* lis_u = c3_malloc(sizeof(*lis_u));
lis_u->sev_u.data = lis_u;
lis_u->por_s = por_s;
lis_u->sec = sec;
lis_u->con_u = 0;
lis_u->rev_u = 0;
lis_u->nex_u = 0;
// XX link to global state
return lis_u;
}
static void
_proxy_write_cb(uv_write_t* wri_u, c3_i sas_i)
{
@ -937,9 +1061,10 @@ _proxy_write_cb(uv_write_t* wri_u, c3_i sas_i)
if ( 0 != sas_i ) {
uL(fprintf(uH, "proxy: write: %s\n", uv_strerror(sas_i)));
// close and free ruq_u->wri_u.data
// close and free con_u (ruq_u->wri_u.data)?
}
free(ruq_u->buf_y);
_proxy_writ_free(ruq_u);
}
static void
@ -949,27 +1074,22 @@ _proxy_sock_read_client_cb(uv_stream_t* tcp_u,
{
u3_proxy_conn* con_u = tcp_u->data;
if ( siz_w == UV_EOF ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
if ( UV_EOF == siz_w ) {
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
} else if ( 0 > siz_w ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
}
else {
u3_proxy_writ* ruq_u = c3_malloc(sizeof(*ruq_u));
ruq_u->wri_u.data = con_u;
ruq_u->buf_y = (c3_y*)buf_u->base;
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
if ( 0 != uv_write(&ruq_u->wri_u,
(uv_stream_t*)&(con_u->upt_u),
buf_u, 1,
_proxy_write_cb) ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
_proxy_writ_free(ruq_u);
}
}
// if ( buf_u->base ) {
// free(buf_u->base);
// }
}
static void
@ -980,26 +1100,21 @@ _proxy_sock_read_upstream_cb(uv_stream_t* tcp_u,
u3_proxy_conn* con_u = tcp_u->data;
if ( UV_EOF == siz_w ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
} else if ( 0 > siz_w ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
}
else {
u3_proxy_writ* ruq_u = c3_malloc(sizeof(*ruq_u));
ruq_u->wri_u.data = con_u;
ruq_u->buf_y = (c3_y*)buf_u->base;
u3_proxy_writ* ruq_u = _proxy_writ_new(con_u, (c3_y*)buf_u->base);
if ( 0 != uv_write(&ruq_u->wri_u,
(uv_stream_t*)&(con_u->client),
buf_u, 1,
_proxy_write_cb) ) {
uv_close((uv_handle_t*)tcp_u, 0); // XX
uv_close((uv_handle_t*)tcp_u, _proxy_conn_free_uv);
_proxy_writ_free(ruq_u);
}
}
// if ( buf_u->base ) {
// free(buf_u->base);
// }
}
static void
@ -1017,25 +1132,17 @@ _proxy_lopc_connect_cb(uv_connect_t * upc_u, c3_i sas_i)
}
static void
_proxy_sock_new(uv_stream_t* str_u)
_proxy_sock_new(u3_proxy_listener* lis_u)
{
u3_proxy_conn* con_u = c3_malloc(sizeof(*con_u));
con_u->client.data = con_u;
con_u->upt_u.data = con_u;
con_u->upc_u.data = con_u;
// XX cast and capture str_u
u3_proxy_conn* con_u = _proxy_conn_new(lis_u);
uv_tcp_init(u3L, &con_u->client);
c3_w ret_w;
ret_w = uv_accept(str_u, (uv_stream_t*)&con_u->client);
if ( UV_EOF == ret_w ) {
uL(fprintf(uH, "proxy: accept: ERROR\n"));
c3_i sas_i;
if ( 0 != (sas_i = uv_accept((uv_stream_t*)&lis_u->sev_u, (uv_stream_t*)&con_u->client)) ) {
uL(fprintf(uH, "proxy: accept: %s\n", uv_strerror(sas_i)));
// XX fix
uv_close((uv_handle_t*)&con_u->client, free);
uv_close((uv_handle_t*)&con_u->client, _proxy_conn_free_uv);
}
else {
struct sockaddr_in lop_u;
@ -1063,15 +1170,16 @@ _proxy_sock_new(uv_stream_t* str_u)
}
}
static void
_proxy_sock_listen_cb(uv_stream_t* str_u, c3_i sas_i)
_proxy_sock_listen_cb(uv_stream_t* sev_u, c3_i sas_i)
{
u3_proxy_listener* lis_u = (u3_proxy_listener*)sev_u;
if ( 0 != sas_i ) {
uL(fprintf(uH, "proxy: listen_cb: %s\n", uv_strerror(sas_i)));
}
else {
_proxy_sock_new(str_u);
_proxy_sock_new(lis_u);
}
}
@ -1084,6 +1192,36 @@ _proxy_sock_listen_cb(uv_stream_t* str_u, c3_i sas_i)
static void
_proxy_sock_start(void)
{
u3_proxy_listener* lis_u = _proxy_listener_new(9090, c3n);
uv_tcp_init(u3L, &lis_u->sev_u);
struct sockaddr_in add_u;
memset(&add_u, 0, sizeof(add_u));
add_u.sin_family = AF_INET;
add_u.sin_addr.s_addr = INADDR_ANY;
add_u.sin_port = htons(lis_u->por_s);
c3_i sas_i;
sas_i = uv_tcp_bind(&lis_u->sev_u, (const struct sockaddr*)&add_u, 0);
if ( 0 != sas_i ||
0 != (sas_i = uv_listen((uv_stream_t*)&lis_u->sev_u,
TCP_BACKLOG, _proxy_sock_listen_cb)) ) {
if ( UV_EADDRINUSE == sas_i ) {
uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i)));
//wat do
_proxy_listener_free(lis_u);
}
}
}
// XX use later
static void
_proxy_reverse_start(void)
{
struct sockaddr_in add_u;
@ -1103,7 +1241,7 @@ _proxy_sock_start(void)
if ( 0 != sas_i ||
0 != (sas_i = uv_listen((uv_stream_t*)server,
TCP_BACKLOG, _proxy_sock_listen_cb)) ) {
TCP_BACKLOG, _proxy_sock_listen_cb)) ) {
if ( UV_EADDRINUSE == sas_i ) {
//wat do
uL(fprintf(uH, "proxy: listen: %s\n", uv_strerror(sas_i)));