vere: refactors ames' packet parsing and forwarding

This commit is contained in:
Joe Bryan 2020-10-19 22:59:57 -07:00
parent acbcc53e49
commit e2b47e02c2

View File

@ -86,6 +86,7 @@
u3_lane ore_u; // origin lane
u3_head hed_u; // header
u3_body bod_u; // body
void* ptr_v; // buffer (to free)
} u3_panc;
/* _ames_alloc(): libuv buffer allocator.
@ -116,18 +117,21 @@ _ames_pact_free(u3_pact* pac_u)
/* _ames_panc_free(): remove references, lose refcounts and free struct
static void
_ames_panc_free(u3_panc* pac_u) {
_ames_panc_free(u3_panc* pac_u)
if ( 0 != pac_u->nex_u ) {
pac_u->nex_u->pre_u = pac_u->pre_u;
if ( 0 != pac_u->pre_u ) {
pac_u->pre_u->nex_u = pac_u->nex_u;
} else {
else {
c3_assert(pac_u == pac_u->sam_u->pac_u);
pac_u->sam_u->pac_u = pac_u->nex_u;
@ -141,7 +145,7 @@ _ames_mug_body(c3_w len_w, c3_y* byt_y)
/* _ames_sift_head(): parse packet header.
static void
static c3_o
_ames_sift_head(u3_head* hed_u, c3_y buf_y[4])
c3_w hed_w = buf_y[0]
@ -156,6 +160,7 @@ _ames_sift_head(u3_head* hed_u, c3_y buf_y[4])
hed_u->sac_y = (hed_w >> 23) & 0x3;
hed_u->rac_y = (hed_w >> 25) & 0x3;
hed_u->enc_o = (hed_w >> 27) & 0x1;
return c3y;
/* _ames_etch_head(): serialize packet header.
@ -238,6 +243,30 @@ _ames_ship_of_chubs(c3_d sip_d[2], c3_y len_y, c3_y* buf_y)
memcpy(buf_y, sip_y, c3_min(16, len_y));
/* _ames_sift_body(): parse packet body.
static c3_o
_ames_sift_body(u3_head* hed_u,
u3_body* bod_u,
c3_w len_w,
c3_y* bod_y)
c3_y sen_y = 2 << hed_u->sac_y;
c3_y rec_y = 2 << hed_u->rac_y;
if ( (sen_y + rec_y) >= len_w ) {
return c3n;
else {
_ames_ship_to_chubs(bod_u->sen_d, sen_y, bod_y);
_ames_ship_to_chubs(bod_u->rec_d, rec_y, bod_y + sen_y);
bod_u->con_w = len_w - sen_y - rec_y;
bod_u->con_y = bod_y + sen_y + rec_y;
return c3y;
/* _ames_etch_pack(): serialize packet header and body.
static c3_w
@ -452,17 +481,6 @@ u3_ames_encode_lane(u3_lane lan) {
return u3ke_jam(u3nt(c3__ipv4, u3i_words(1, &lan.pip_w), lan.por_s));
/* _ames_lane_from_sockaddr(): sockaddr_in to lane struct
static u3_lane
_ames_lane_from_sockaddr(struct sockaddr_in* add_u)
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
return lan_u;
/* _ames_lane_into_cache(): put las for who into cache, including timestamp
static void
@ -532,16 +550,27 @@ _ames_serialize_packet(u3_panc* pac_u, c3_o dop_o)
// always using the appropriate galaxy lane instead.
if ( u3_nul == lon ) {
c3_w con_w;
c3_y* con_y;
lon = u3nt(u3_nul, c3n, u3_ames_encode_lane(pac_u->ore_u));
nal_o = c3y;
u3_noun jam = u3ke_jam(u3nc(lon, bod));
pac_u->bod_u.con_w = u3r_met(3, jam);
pac_u->bod_u.con_y = c3_malloc(pac_u->bod_u.con_w);
u3r_bytes(0, pac_u->bod_u.con_w, pac_u->bod_u.con_y, jam);
// XX off-loom jam?
u3_noun jam = u3ke_jam(u3nc(lon, bod));
con_w = u3r_met(3, jam);
con_y = c3_malloc(con_w);
u3r_bytes(0, con_w, con_y, jam);
pac_u->ptr_v = con_y;
pac_u->bod_u.con_y = con_y;
pac_u->bod_u.con_w = con_w;
else {
u3z(lon); u3z(bod);
@ -855,101 +884,28 @@ _ames_lane_scry_cb(void* vod_p, u3_noun nun)
/* _ames_recv_cb(): receive callback.
/* _ames_try_forward(): try to forward a packet for another ship.
static void
_ames_recv_cb(uv_udp_t* wax_u,
ssize_t nrd_i,
const uv_buf_t * buf_u,
const struct sockaddr* adr_u,
unsigned flg_i)
_ames_try_forward(u3_ames* sam_u,
u3_lane* lan_u,
u3_head* hed_u,
u3_body* bod_u,
c3_y* hun_y)
u3_ames* sam_u = wax_u->data;
c3_o pas_o = c3y;
c3_y* byt_y = (c3_y*)buf_u->base;
c3_y* bod_y = byt_y + 4;
u3_head hed_u;
u3_weak lac;
// ensure a sane message size
// if the recipient is a galaxy, their lane is always &+~gax
if ( 4 >= nrd_i ) {
pas_o = c3n;
// unpack the packet header
_ames_sift_head(&hed_u, byt_y);
// ensure the protocol version matches ours
if ( c3y == pas_o
&& (c3y == sam_u->fit_o)
&& (sam_u->ver_y != hed_u.ver_y) )
if ( (256 > bod_u->rec_d[0])
&& (0 == bod_u->rec_d[1]) )
pas_o = c3n;
if ( 0 == (sam_u->vet_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->vet_d);
lac = u3nc(c3y, (c3_y)bod_u->rec_d[0]);
// ensure the mug is valid
// otherwise, try to get the lane from cache
if ( c3y == pas_o
&& (hed_u.mug_l != _ames_mug_body(nrd_i - 4, bod_y)) )
pas_o = c3n;
if ( 0 == (sam_u->mut_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->mut_d);
// unpack the body
c3_y sen_y = 2 << hed_u.sac_y;
c3_y rec_y = 2 << hed_u.rac_y;
c3_d sen_d[2];
c3_d rec_d[2];
c3_w con_w = nrd_i - 4 - sen_y - rec_y;
c3_y* con_y = NULL;
if ( c3y == pas_o ) {
_ames_ship_to_chubs(sen_d, sen_y, bod_y);
_ames_ship_to_chubs(rec_d, rec_y, bod_y + sen_y);
con_y = c3_malloc(con_w);
memcpy(con_y, bod_y + sen_y + rec_y, con_w);
// ensure the content is cue-able
pas_o = ur_cue_test_with(sam_u->tes_u, con_w, con_y) ? c3y : c3n;
// if we can scry,
// and we are not the recipient,
// we might want to forward statelessly
if ( c3y == pas_o
&& c3y == sam_u->see_o
&& ( (rec_d[0] != sam_u->pir_u->who_d[0])
|| (rec_d[1] != sam_u->pir_u->who_d[1]) ) )
pas_o = c3n;
u3_weak lac;
// if the recipient is a galaxy, their lane is always &+~gax
if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) {
lac = u3nc(c3y, (c3_y)rec_d[0]);
// otherwise, try to get the lane from cache
else {
lac = _ames_lane_from_cache(sam_u->lax_p, u3i_chubs(2, rec_d));
else {
lac = _ames_lane_from_cache(sam_u->lax_p, u3i_chubs(2, bod_u->rec_d));
// if we don't know the lane, and the scry queue is full,
// just drop the packet
@ -960,70 +916,182 @@ _ames_recv_cb(uv_udp_t* wax_u,
// _ames_cap_queue. as such, blocked on u3_lord_peek_cancel or w/e.
if ( (u3_none == lac) && (1000 < sam_u->foq_d) ) {
if ( 0 == (sam_u->fod_d % 10000) ) {
u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->fod_d);
// if we know there's no lane, drop the packet
else if ( u3_nul == lac ) {
// otherwise, proceed with forwarding
// proceed with forwarding
// store the packet details for later processing
// XX allocates unnecessarily when we know the lane
u3_panc* pac_u = c3_calloc(sizeof(*pac_u));
pac_u->sam_u = sam_u;
pac_u->hed_u = *hed_u;
pac_u->bod_u = *bod_u;
pac_u->ore_u = *lan_u;
pac_u->ptr_v = hun_y;
if ( 0 != sam_u->pac_u ) {
pac_u->nex_u = sam_u->pac_u;
sam_u->pac_u->pre_u = pac_u;
sam_u->pac_u = pac_u;
// if we already know the lane, just forward
if ( u3_none != lac ) {
_ames_forward(pac_u, lac);
// otherwise, there's space in the scry queue; scry the lane out of ames
else {
// store the packet details for later processing
u3_panc* pac_u = c3_calloc(sizeof(*pac_u));
pac_u->sam_u = sam_u;
pac_u->hed_u = hed_u;
pac_u->bod_u.sen_d[0] = sen_d[0];
pac_u->bod_u.sen_d[1] = sen_d[1];
pac_u->bod_u.rec_d[0] = rec_d[0];
pac_u->bod_u.rec_d[1] = rec_d[1];
pac_u->bod_u.con_w = con_w;
pac_u->bod_u.con_y = con_y;
pac_u->ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
u3_noun pax = u3nq(u3i_string("peers"),
u3dc("scot", 'p', u3i_chubs(2, bod_u->rec_d)),
u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax,
u3_nul, pax, pac_u, _ames_lane_scry_cb);
if ( 0 != sam_u->pac_u ) {
pac_u->nex_u = sam_u->pac_u;
sam_u->pac_u->pre_u = pac_u;
sam_u->pac_u = pac_u;
/* _ames_hear(): parse a (potential packet), dispatch appropriately.
static void
_ames_hear(u3_ames* sam_u,
u3_lane* lan_u,
c3_w len_w,
c3_y* hun_y)
u3_head hed_u;
u3_body bod_u;
// if we already know the lane, just forward
if ( u3_none != lac ) {
_ames_forward(pac_u, lac);
// unpack header, ensuring buffer is large enough
if ( (4 > len_w)
|| (c3n == _ames_sift_head(&hed_u, hun_y)) )
// XX track stats
// ensure the protocol version matches ours
// XX rethink
if ( (c3y == sam_u->fit_o)
&& (sam_u->ver_y != hed_u.ver_y) )
if ( 0 == (sam_u->vet_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->vet_d);
c3_w bod_w = len_w - 4;
c3_y* bod_y = hun_y + 4;
// ensure the mug is valid
if ( _ames_mug_body(bod_w, bod_y) != hed_u.mug_l ) {
if ( 0 == (sam_u->mut_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->mut_d);
// otherwise, there's space in the scry queue; scry the lane out of ames
// unpack and validate the body
if ( (c3n == _ames_sift_body(&hed_u, &bod_u, bod_w, bod_y))
|| !ur_cue_test_with(sam_u->tes_u, bod_u.con_w, bod_u.con_y) )
// XX track stats
else {
u3_noun pax = u3nq(u3i_string("peers"),
u3dc("scot", 'p', u3i_chubs(2, rec_d)),
u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax,
u3_nul, pax, pac_u, _ames_lane_scry_cb);
// if we passed the filter, inject the packet
// if we can scry,
// and we are not the recipient,
// we might want to forward statelessly
if ( c3y == pas_o ) {
u3_lane ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base);
_ames_put_packet(sam_u, msg, ore_u);
if ( (c3y == sam_u->see_o)
&& ( (bod_u.rec_d[0] != sam_u->pir_u->who_d[0])
|| (bod_u.rec_d[1] != sam_u->pir_u->who_d[1]) ) )
_ames_try_forward(sam_u, lan_u, &hed_u, &bod_u, hun_y);
// otherwise, inject the packet as an event
else {
u3_noun msg = u3i_bytes(len_w, hun_y);
_ames_put_packet(sam_u, msg, *lan_u);
/* _ames_recv_cb(): udp message receive callback.
static void
_ames_recv_cb(uv_udp_t* wax_u,
ssize_t nrd_i,
const uv_buf_t * buf_u,
const struct sockaddr* adr_u,
unsigned flg_i)
if ( 0 > nrd_i ) {
if ( u3C.wag_w & u3o_verbose ) {
u3l_log("ames: recv: fail: %s\n", uv_strerror(nrd_i));
else if ( 0 == nrd_i ) {
else if ( flg_i & UV_UDP_PARTIAL ) {
if ( u3C.wag_w & u3o_verbose ) {
u3l_log("ames: recv: fail: message truncated\n");
else {
u3_ames* sam_u = wax_u->data;
struct sockaddr_in* add_u = (struct sockaddr_in*)adr_u;
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
// NB: [nrd_i] will never exceed max length from _ames_alloc()
_ames_hear(sam_u, &lan_u, (c3_w)nrd_i, (c3_y*)buf_u->base);
/* _ames_io_start(): initialize ames I/O.