diff --git a/pkg/urbit/vere/io/ames.c b/pkg/urbit/vere/io/ames.c index 4d544425d..ae0656c3f 100644 --- a/pkg/urbit/vere/io/ames.c +++ b/pkg/urbit/vere/io/ames.c @@ -14,12 +14,11 @@ #include "vere/vere.h" #include "ur/serial.h" -/* u3_pact: ames packet, coming or going. +/* u3_pact: outbound ames packet. */ typedef struct _u3_pact { uv_udp_send_t snd_u; // udp send request - c3_w pip_w; // target IPv4 address - c3_s por_s; // target port + u3_lane lan_u; // destination lane c3_w len_w; // length in bytes c3_y* hun_y; // packet buffer c3_y imp_y; // galaxy number (optional) @@ -32,30 +31,36 @@ typedef struct _u3_ames { // packet network state u3_auto car_u; // driver u3_pier* pir_u; // pier - union { // + union { // uv udp handle uv_udp_t wax_u; // uv_handle_t had_u; // }; // ur_cue_test_t* tes_u; // cue-test handle u3_cue_xeno* sil_u; // cue handle c3_c* dns_c; // domain XX multiple/fallback - c3_d dop_d; // drop count - c3_d fal_d; // crash count + c3_y ver_y; // protocol version + u3p(u3h_root) lax_p; // lane scry cache + struct _u3_panc* pac_u; // packets pending forwards c3_w imp_w[256]; // imperial IPs time_t imp_t[256]; // imperial IP timestamps c3_o imp_o[256]; // imperial print status - c3_o net_o; // can send - c3_o see_o; // can scry - c3_d saw_d; // successive scry failures - c3_o fit_o; // filtering active - c3_y ver_y; // protocol version - u3p(u3h_root) lax_p; // lane scry cache - c3_d vet_d; // version mismatches filtered - c3_d mut_d; // invalid mugs filtered - struct _u3_panc* pac_u; // packets pending forwards - c3_d foq_d; // forward queue size - c3_d fow_d; // forwarded count - c3_d fod_d; // forwards dropped count + struct { // config: + c3_o net_o; // can send + c3_o see_o; // can scry + c3_o fit_o; // filtering active + } fig_u; // + struct { // stats: + c3_d dop_d; // drop count + c3_d fal_d; // crash count + c3_d saw_d; // successive scry failures + c3_d hed_d; // failed to read header + c3_d vet_d; // version mismatches filtered + c3_d mut_d; // invalid mugs filtered + c3_d bod_d; // failed to read body + c3_d foq_d; // forward queue size + c3_d fow_d; // forwarded count + c3_d fod_d; // forwards dropped count + } sat_u; // } u3_ames; /* u3_head: ames packet header @@ -86,6 +91,7 @@ u3_lane ore_u; // origin lane u3_head hed_u; // header u3_body bod_u; // body + void* ptr_v; // buffer (to free) } u3_panc; /* _ames_alloc(): libuv buffer allocator. @@ -116,31 +122,193 @@ _ames_pact_free(u3_pact* pac_u) /* _ames_panc_free(): remove references, lose refcounts and free struct */ static void -_ames_panc_free(u3_panc* pac_u) { +_ames_panc_free(u3_panc* pac_u) +{ if ( 0 != pac_u->nex_u ) { pac_u->nex_u->pre_u = pac_u->pre_u; } + if ( 0 != pac_u->pre_u ) { pac_u->pre_u->nex_u = pac_u->nex_u; - } else { + } + else { c3_assert(pac_u == pac_u->sam_u->pac_u); pac_u->sam_u->pac_u = pac_u->nex_u; } - c3_free(pac_u->bod_u.con_y); + c3_free(pac_u->ptr_v); c3_free(pac_u); } -/* _ca_mug_body(): truncated mug hash of bytes +/* _ames_mug_body(): truncated (20 least-significant bits) mug hash of bytes */ static c3_l -_ca_mug_body(c3_w len_w, c3_y* byt_y) +_ames_mug_body(c3_w len_w, c3_y* byt_y) { - // mask off ((1 << 20) - 1) - // return u3r_mug_bytes(byt_y, len_w) & 0xfffff; } +/* _ames_sift_head(): parse packet header. +*/ +static c3_o +_ames_sift_head(u3_head* hed_u, c3_y buf_y[4]) +{ + c3_w hed_w = buf_y[0] + | (buf_y[1] << 8) + | (buf_y[2] << 16) + | (buf_y[3] << 24); + + // XX only version 0 currently recognized + // + hed_u->ver_y = hed_w & 0x7; + hed_u->mug_l = (hed_w >> 3) & 0xfffff; // 20 bits + hed_u->sac_y = (hed_w >> 23) & 0x3; + hed_u->rac_y = (hed_w >> 25) & 0x3; + hed_u->enc_o = (hed_w >> 27) & 0x1; + return c3y; +} + +/* _ames_etch_head(): serialize packet header. +*/ +static void +_ames_etch_head(u3_head* hed_u, c3_y buf_y[4]) +{ + c3_w hed_w = hed_u->ver_y + | (hed_u->mug_l << 3) + | (hed_u->sac_y << 23) + | (hed_u->rac_y << 25) + | (hed_u->enc_o << 27); + + // only version 0 currently recognized + // + c3_assert( 0 == hed_u->ver_y ); + + buf_y[0] = hed_w & 0xff; + buf_y[1] = (hed_w >> 8) & 0xff; + buf_y[2] = (hed_w >> 16) & 0xff; + buf_y[3] = (hed_w >> 24) & 0xff; +} + +/* _ames_chub_bytes(): c3_y[8] to c3_d +** XX factor out, deduplicate with other conversions +*/ +static inline c3_d +_ames_chub_bytes(c3_y byt_y[8]) +{ + return (c3_d)byt_y[0] + | (c3_d)byt_y[1] << 8 + | (c3_d)byt_y[2] << 16 + | (c3_d)byt_y[3] << 24 + | (c3_d)byt_y[4] << 32 + | (c3_d)byt_y[5] << 40 + | (c3_d)byt_y[6] << 48 + | (c3_d)byt_y[7] << 56; +} + +/* _ames_ship_to_chubs(): pack [len_y] bytes into c3_d[2] +*/ +static inline void +_ames_ship_to_chubs(c3_d sip_d[2], c3_y len_y, c3_y* buf_y) +{ + c3_y sip_y[16] = {0}; + memcpy(sip_y, buf_y, c3_min(16, len_y)); + + sip_d[0] = _ames_chub_bytes(sip_y); + sip_d[1] = _ames_chub_bytes(sip_y + 8); +} + +/* _ames_chub_bytes(): c3_d to c3_y[8] +** XX factor out, deduplicate with other conversions +*/ +static inline void +_ames_bytes_chub(c3_y byt_y[8], c3_d num_d) +{ + byt_y[0] = num_d & 0xff; + byt_y[1] = (num_d >> 8) & 0xff; + byt_y[2] = (num_d >> 16) & 0xff; + byt_y[3] = (num_d >> 24) & 0xff; + byt_y[4] = (num_d >> 32) & 0xff; + byt_y[5] = (num_d >> 40) & 0xff; + byt_y[6] = (num_d >> 48) & 0xff; + byt_y[7] = (num_d >> 56) & 0xff; +} + +/* _ames_ship_of_chubs(): unpack c3_d[2] into [len_y] bytes. +*/ +static inline void +_ames_ship_of_chubs(c3_d sip_d[2], c3_y len_y, c3_y* buf_y) +{ + c3_y sip_y[16] = {0}; + + _ames_bytes_chub(sip_y, sip_d[0]); + _ames_bytes_chub(sip_y + 8, sip_d[1]); + + memcpy(buf_y, sip_y, c3_min(16, len_y)); +} + +/* _ames_sift_body(): parse packet body. +*/ +static c3_o +_ames_sift_body(u3_head* hed_u, + u3_body* bod_u, + c3_w len_w, + c3_y* bod_y) +{ + c3_y sen_y = 2 << hed_u->sac_y; + c3_y rec_y = 2 << hed_u->rac_y; + + if ( (sen_y + rec_y) >= len_w ) { + return c3n; + } + else { + _ames_ship_to_chubs(bod_u->sen_d, sen_y, bod_y); + _ames_ship_to_chubs(bod_u->rec_d, rec_y, bod_y + sen_y); + + bod_u->con_w = len_w - sen_y - rec_y; + bod_u->con_y = bod_y + sen_y + rec_y; + return c3y; + } +} + +/* _ames_etch_pack(): serialize packet header and body. +*/ +static c3_w +_ames_etch_pack(u3_head* hed_u, + u3_body* bod_u, + c3_o mug_o, + c3_y** out_y) +{ + // start with the body + // + c3_y sen_y = 2 << hed_u->sac_y; // sender len + c3_y rec_y = 2 << hed_u->rac_y; // receiver len + c3_w bod_w = sen_y + rec_y + bod_u->con_w; // body len + c3_w len_w = 4 + bod_w; // packet len + c3_y* pac_y = c3_malloc(len_w); + c3_y* bod_y = pac_y + 4; + + _ames_ship_of_chubs(bod_u->sen_d, sen_y, bod_y); + _ames_ship_of_chubs(bod_u->rec_d, rec_y, bod_y + sen_y); + + { + c3_y* con_y = bod_y + sen_y + rec_y; + memcpy(con_y, bod_u->con_y, bod_u->con_w); + } + + // if we updated the origin lane, we need to update the mug too + // + if ( c3y == mug_o ) { + hed_u->mug_l = _ames_mug_body(bod_w, bod_y); + } + + // now we can serialize the head + // + _ames_etch_head(hed_u, pac_y); + + *out_y = pac_y; + return len_w; +} + /* _ames_send_cb(): send callback. */ static void @@ -149,12 +317,12 @@ _ames_send_cb(uv_udp_send_t* req_u, c3_i sas_i) u3_pact* pac_u = (u3_pact*)req_u; u3_ames* sam_u = pac_u->sam_u; - if ( sas_i && (c3y == sam_u->net_o) ) { + if ( sas_i && (c3y == sam_u->fig_u.net_o) ) { u3l_log("ames: send fail: %s\n", uv_strerror(sas_i)); - sam_u->net_o = c3n; + sam_u->fig_u.net_o = c3n; } else { - sam_u->net_o = c3y; + sam_u->fig_u.net_o = c3y; } _ames_pact_free(pac_u); @@ -176,8 +344,8 @@ _ames_send(u3_pact* pac_u) memset(&add_u, 0, sizeof(add_u)); add_u.sin_family = AF_INET; - add_u.sin_addr.s_addr = htonl(pac_u->pip_w); - add_u.sin_port = htons(pac_u->por_s); + add_u.sin_addr.s_addr = htonl(pac_u->lan_u.pip_w); + add_u.sin_port = htons(pac_u->lan_u.por_s); { uv_buf_t buf_u = uv_buf_init((c3_c*)pac_u->hun_y, pac_u->len_w); @@ -188,9 +356,9 @@ _ames_send(u3_pact* pac_u) _ames_send_cb); if ( sas_i ) { - if ( c3y == sam_u->net_o ) { + if ( c3y == sam_u->fig_u.net_o ) { u3l_log("ames: send fail: %s\n", uv_strerror(sas_i)); - sam_u->net_o = c3n; + sam_u->fig_u.net_o = c3n; } _ames_pact_free(pac_u); @@ -199,95 +367,6 @@ _ames_send(u3_pact* pac_u) } } -/* _ames_czar_port(): udp port for galaxy. -*/ -static c3_s -_ames_czar_port(c3_y imp_y) -{ - if ( c3n == u3_Host.ops_u.net ) { - return 31337 + imp_y; - } - else { - return 13337 + imp_y; - } -} - -/* _ames_czar_gone(): galaxy address resolution failed. -*/ -static void -_ames_czar_gone(u3_pact* pac_u, time_t now) -{ - u3_ames* sam_u = pac_u->sam_u; - - if ( c3y == sam_u->imp_o[pac_u->imp_y] ) { - u3l_log("ames: czar at %s: not found (b)\n", pac_u->dns_c); - sam_u->imp_o[pac_u->imp_y] = c3n; - } - - if ( (0 == sam_u->imp_w[pac_u->imp_y]) || - (0xffffffff == sam_u->imp_w[pac_u->imp_y]) ) - { - sam_u->imp_w[pac_u->imp_y] = 0xffffffff; - } - - // keep existing ip for 5 more minutes - // - sam_u->imp_t[pac_u->imp_y] = now; - - _ames_pact_free(pac_u); -} - -/* _ames_czar_cb(): galaxy address resolution callback. -*/ -static void -_ames_czar_cb(uv_getaddrinfo_t* adr_u, - c3_i sas_i, - struct addrinfo* aif_u) -{ - u3_pact* pac_u = (u3_pact*)adr_u->data; - u3_ames* sam_u = pac_u->sam_u; - time_t now = time(0); - - struct addrinfo* rai_u = aif_u; - - while ( 1 ) { - if ( !rai_u ) { - _ames_czar_gone(pac_u, now); - break; - } - - if ( (AF_INET == rai_u->ai_family) ) { - struct sockaddr_in* add_u = (struct sockaddr_in *)rai_u->ai_addr; - c3_w old_w = sam_u->imp_w[pac_u->imp_y]; - - sam_u->imp_w[pac_u->imp_y] = ntohl(add_u->sin_addr.s_addr); - sam_u->imp_t[pac_u->imp_y] = now; - sam_u->imp_o[pac_u->imp_y] = c3y; - -#if 1 - if ( sam_u->imp_w[pac_u->imp_y] != old_w - && sam_u->imp_w[pac_u->imp_y] != 0xffffffff ) { - u3_noun wad = u3i_words(1, &sam_u->imp_w[pac_u->imp_y]); - u3_noun nam = u3dc("scot", c3__if, wad); - c3_c* nam_c = u3r_string(nam); - - u3l_log("ames: czar %s: ip %s\n", pac_u->dns_c, nam_c); - - c3_free(nam_c); u3z(nam); - } -#endif - - _ames_send(pac_u); - break; - } - - rai_u = rai_u->ai_next; - } - - c3_free(adr_u); - uv_freeaddrinfo(aif_u); -} - /* u3_ames_decode_lane(): deserialize noun to lane */ u3_lane @@ -316,17 +395,6 @@ u3_ames_encode_lane(u3_lane lan) { return u3ke_jam(u3nt(c3__ipv4, u3i_words(1, &lan.pip_w), lan.por_s)); } -/* _ames_lane_from_sockaddr(): sockaddr_in to lane struct -*/ -static u3_lane -_ames_lane_from_sockaddr(struct sockaddr_in* add_u) -{ - u3_lane lan_u; - lan_u.por_s = ntohs(add_u->sin_port); - lan_u.pip_w = ntohl(add_u->sin_addr.s_addr); - return lan_u; -} - /* _ames_lane_into_cache(): put las for who into cache, including timestamp */ static void @@ -370,8 +438,6 @@ _ames_lane_from_cache(u3p(u3h_root) lax_p, u3_noun who) { static u3_noun _ames_serialize_packet(u3_panc* pac_u, c3_o dop_o) { - c3_y sen_y = 2 << pac_u->hed_u.sac_y; - c3_y rec_y = 2 << pac_u->hed_u.rac_y; c3_o nal_o = c3n; // update the body's lane, if desired @@ -398,16 +464,27 @@ _ames_serialize_packet(u3_panc* pac_u, c3_o dop_o) // always using the appropriate galaxy lane instead. // if ( u3_nul == lon ) { + c3_w con_w; + c3_y* con_y; + u3z(lon); lon = u3nt(u3_nul, c3n, u3_ames_encode_lane(pac_u->ore_u)); nal_o = c3y; - c3_free(pac_u->bod_u.con_y); - u3_noun jam = u3ke_jam(u3nc(lon, bod)); - pac_u->bod_u.con_w = u3r_met(3, jam); - pac_u->bod_u.con_y = c3_malloc(pac_u->bod_u.con_w); - u3r_bytes(0, pac_u->bod_u.con_w, pac_u->bod_u.con_y, jam); - u3z(jam); + // XX off-loom jam? + // + { + u3_noun jam = u3ke_jam(u3nc(lon, bod)); + con_w = u3r_met(3, jam); + con_y = c3_malloc(con_w); + u3r_bytes(0, con_w, con_y, jam); + u3z(jam); + } + + c3_free(pac_u->ptr_v); + pac_u->ptr_v = con_y; + pac_u->bod_u.con_y = con_y; + pac_u->bod_u.con_w = con_w; } else { u3z(lon); u3z(bod); @@ -416,66 +493,135 @@ _ames_serialize_packet(u3_panc* pac_u, c3_o dop_o) // serialize the packet // - u3_noun pac; + // XX serialize on stack? + // { - // start with the body - // - u3_body* bod_u = &pac_u->bod_u; - c3_y* pac_y = c3_malloc(4 + sen_y + rec_y + bod_u->con_w); - { - u3_atom sen = u3i_chubs(2, bod_u->sen_d); - u3_atom rec = u3i_chubs(2, bod_u->rec_d); - u3r_bytes(0, sen_y, pac_y + 4, sen); - u3r_bytes(0, rec_y, pac_y + 4 + sen_y, rec); - u3z(sen); u3z(rec); - } - memcpy(pac_y + 4 + sen_y + rec_y, bod_u->con_y, bod_u->con_w); - - // if we updated the origin lane, we need to update the mug too - // - if ( c3y == nal_o ) { - pac_u->hed_u.mug_l = _ca_mug_body(sen_y + rec_y + bod_u->con_w, - pac_y + 4); - } - - // now we can serialize the head - // - u3_head* hed_u = &pac_u->hed_u; - c3_w hed_w = hed_u->ver_y - | (hed_u->mug_l << 3) - | (hed_u->sac_y << 23) - | (hed_u->rac_y << 25) - | (hed_u->enc_o << 27); - - // XX assumes little-endian - // - memcpy(pac_y, &hed_w, 4); - - pac = u3i_bytes(4 + sen_y + rec_y + bod_u->con_w, pac_y); + u3_noun pac; + c3_y* pac_y; + c3_w len_w = _ames_etch_pack(&pac_u->hed_u, + &pac_u->bod_u, + nal_o, &pac_y); + pac = u3i_bytes(len_w, pac_y); c3_free(pac_y); + + return pac; + } +} + +/* _ames_czar_port(): udp port for galaxy. +*/ +static c3_s +_ames_czar_port(c3_y imp_y) +{ + if ( c3n == u3_Host.ops_u.net ) { + return 31337 + imp_y; + } + else { + return 13337 + imp_y; + } +} + +/* _ames_czar_gone(): galaxy address resolution failed. +*/ +static void +_ames_czar_gone(u3_pact* pac_u, time_t now) +{ + u3_ames* sam_u = pac_u->sam_u; + + if ( c3y == sam_u->imp_o[pac_u->imp_y] ) { + u3l_log("ames: czar at %s: not found (b)\n", pac_u->dns_c); + sam_u->imp_o[pac_u->imp_y] = c3n; } - return pac; + if ( (0 == sam_u->imp_w[pac_u->imp_y]) || + (0xffffffff == sam_u->imp_w[pac_u->imp_y]) ) + { + sam_u->imp_w[pac_u->imp_y] = 0xffffffff; + } + + // keep existing ip for 5 more minutes + // + sam_u->imp_t[pac_u->imp_y] = now; + + _ames_pact_free(pac_u); +} + +/* _ames_czar_here(): galaxy address resolution succeeded. +*/ +static void +_ames_czar_here(u3_pact* pac_u, time_t now, struct sockaddr_in* add_u) +{ + u3_ames* sam_u = pac_u->sam_u; + c3_w old_w = sam_u->imp_w[pac_u->imp_y]; + c3_w pip_w = ntohl(add_u->sin_addr.s_addr); + + if ( pip_w != old_w ) { + u3_noun nam = u3dc("scot", c3__if, u3i_word(pip_w)); + c3_c* nam_c = u3r_string(nam); + + u3l_log("ames: czar %s: ip %s\n", pac_u->dns_c, nam_c); + + c3_free(nam_c); + u3z(nam); + } + + sam_u->imp_w[pac_u->imp_y] = pip_w; + sam_u->imp_t[pac_u->imp_y] = now; + sam_u->imp_o[pac_u->imp_y] = c3y; + + pac_u->lan_u.pip_w = pip_w; + _ames_send(pac_u); +} + +/* _ames_czar_cb(): galaxy address resolution callback. +*/ +static void +_ames_czar_cb(uv_getaddrinfo_t* adr_u, + c3_i sas_i, + struct addrinfo* aif_u) +{ + { + u3_pact* pac_u = (u3_pact*)adr_u->data; + struct addrinfo* rai_u = aif_u; + time_t now = time(0); + + while ( rai_u ) { + if ( (AF_INET == rai_u->ai_family) ) { + _ames_czar_here(pac_u, now, (struct sockaddr_in *)rai_u->ai_addr); + break; + } + else { + rai_u = rai_u->ai_next; + } + } + + if ( !rai_u ) { + _ames_czar_gone(pac_u, now); + } + } + + c3_free(adr_u); + uv_freeaddrinfo(aif_u); } /* _ames_czar(): galaxy address resolution. */ static void -_ames_czar(u3_pact* pac_u, c3_c* bos_c) +_ames_czar(u3_pact* pac_u) { u3_ames* sam_u = pac_u->sam_u; - pac_u->por_s = _ames_czar_port(pac_u->imp_y); + pac_u->lan_u.por_s = _ames_czar_port(pac_u->imp_y); if ( c3n == u3_Host.ops_u.net ) { - pac_u->pip_w = 0x7f000001; + pac_u->lan_u.pip_w = 0x7f000001; _ames_send(pac_u); return; } // if we don't have a galaxy domain, no-op // - if ( 0 == bos_c ) { + if ( !sam_u->dns_c ) { u3_noun nam = u3dc("scot", 'p', pac_u->imp_y); c3_c* nam_c = u3r_string(nam); u3l_log("ames: no galaxy domain for %s, no-op\r\n", nam_c); @@ -485,48 +631,64 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c) return; } - time_t now = time(0); - - // backoff - if ( (0xffffffff == sam_u->imp_w[pac_u->imp_y]) && - (now - sam_u->imp_t[pac_u->imp_y]) < 300 ) { - _ames_pact_free(pac_u); - return; - } - - if ( (0 == sam_u->imp_w[pac_u->imp_y]) || - (now - sam_u->imp_t[pac_u->imp_y]) > 300 ) { /* 5 minute TTL */ - u3_noun nam = u3dc("scot", 'p', pac_u->imp_y); - c3_c* nam_c = u3r_string(nam); - // XX remove extra byte for '~' - pac_u->dns_c = c3_malloc(1 + strlen(bos_c) + 1 + strlen(nam_c)); - - snprintf(pac_u->dns_c, 256, "%s.%s", nam_c + 1, bos_c); - // u3l_log("czar %s, dns %s\n", nam_c, pac_u->dns_c); - - c3_free(nam_c); - u3z(nam); + { + c3_w pip_w = sam_u->imp_w[pac_u->imp_y]; + time_t wen = sam_u->imp_t[pac_u->imp_y]; + time_t now = time(0); + // backoff for 5 minutes after failed lookup + // + if ( ( now < wen ) // time shenanigans! + || ( (0xffffffff == pip_w) // sentinal ip address + && ((now - wen) < 300) ) ) { - uv_getaddrinfo_t* adr_u = c3_malloc(sizeof(*adr_u)); - adr_u->data = pac_u; - + _ames_pact_free(pac_u); + return; + } + // cached addresses have a 5 minute TTL + // + else if ( (0 != pip_w) && ((now - wen) < 300) ) { + pac_u->lan_u.pip_w = pip_w; + _ames_send(pac_u); + return; + } + else { c3_i sas_i; - if ( 0 != (sas_i = uv_getaddrinfo(u3L, adr_u, - _ames_czar_cb, - pac_u->dns_c, 0, 0)) ) { - u3l_log("ames: %s\n", uv_strerror(sas_i)); - _ames_czar_gone(pac_u, now); + { + u3_noun nam = u3dc("scot", 'p', pac_u->imp_y); + c3_c* nam_c = u3r_string(nam); + + // NB: . separator not counted, as [nam_c] includes a ~ that we skip + // + pac_u->dns_c = c3_malloc(1 + strlen(nam_c) + strlen(sam_u->dns_c)); + sas_i = snprintf(pac_u->dns_c, 255, "%s.%s", nam_c + 1, sam_u->dns_c); + + c3_free(nam_c); + u3z(nam); + } + + if ( 255 <= sas_i ) { + u3l_log("ames: czar: galaxy domain %s truncated\n", sam_u->dns_c); + _ames_pact_free(pac_u); return; } + + { + uv_getaddrinfo_t* adr_u = c3_malloc(sizeof(*adr_u)); + adr_u->data = pac_u; + + if ( 0 != (sas_i = uv_getaddrinfo(u3L, adr_u, + _ames_czar_cb, + pac_u->dns_c, 0, 0)) ) + { + u3l_log("ames: %s\n", uv_strerror(sas_i)); + _ames_czar_gone(pac_u, now); + return; + } + } } } - else { - pac_u->pip_w = sam_u->imp_w[pac_u->imp_y]; - _ames_send(pac_u); - return; - } } /* _ames_ef_send(): send packet to network (v4). @@ -558,7 +720,7 @@ _ames_ef_send(u3_ames* sam_u, u3_noun lan, u3_noun pac) c3_assert( val < 256 ); pac_u->imp_y = val; - _ames_czar(pac_u, sam_u->dns_c); + _ames_czar(pac_u); } // non-galaxy lane // @@ -575,9 +737,7 @@ _ames_ef_send(u3_ames* sam_u, u3_noun lan, u3_noun pac) // otherwise, mutate destination and send packet // else { - pac_u->pip_w = lan_u.pip_w; - pac_u->por_s = lan_u.por_s; - + pac_u->lan_u = lan_u; _ames_send(pac_u); } } @@ -596,20 +756,20 @@ _ames_cap_queue(u3_ames* sam_u) if ( c3__hear == u3h(egg_u->cad) ) { u3_auto_drop(&sam_u->car_u, egg_u); - sam_u->dop_d++; + sam_u->sat_u.dop_d++; if ( u3C.wag_w & u3o_verbose ) { - u3l_log("ames: packet dropped (%" PRIu64 " total)\n", sam_u->dop_d); + u3l_log("ames: packet dropped (%" PRIu64 " total)\n", sam_u->sat_u.dop_d); } } egg_u = nex_u; } - if ( (sam_u->dop_d && (0 == (sam_u->dop_d % 1000))) + if ( (sam_u->sat_u.dop_d && (0 == (sam_u->sat_u.dop_d % 1000))) && !(u3C.wag_w & u3o_verbose) ) { - u3l_log("ames: packet dropped (%" PRIu64 " total)\n", sam_u->dop_d); + u3l_log("ames: packet dropped (%" PRIu64 " total)\n", sam_u->sat_u.dop_d); } } @@ -642,19 +802,19 @@ static void _ames_hear_bail(u3_ovum* egg_u, u3_noun lud) { u3_ames* sam_u = (u3_ames*)egg_u->car_u; - sam_u->fal_d++; + sam_u->sat_u.fal_d++; if ( (u3C.wag_w & u3o_verbose) - || (0 == (sam_u->fal_d % 1000)) ) + || (0 == (sam_u->sat_u.fal_d % 1000)) ) { _ames_punt_goof(lud); - u3l_log("ames: packet failed (%" PRIu64 " total)\n\n", sam_u->fal_d); + u3l_log("ames: packet failed (%" PRIu64 " total)\n\n", sam_u->sat_u.fal_d); } else { u3z(lud); - if ( 0 == (sam_u->fal_d % 1000) ) { - u3l_log("ames: packet failed (%" PRIu64 " total)\n\n", sam_u->fal_d); + if ( 0 == (sam_u->sat_u.fal_d % 1000) ) { + u3l_log("ames: packet failed (%" PRIu64 " total)\n\n", sam_u->sat_u.fal_d); } } @@ -684,22 +844,70 @@ _ames_put_packet(u3_ames* sam_u, static void _ames_forward(u3_panc* pac_u, u3_noun las) { - pac_u->sam_u->fow_d++; - if ( 0 == (pac_u->sam_u->fow_d % 1000000) ) { - u3l_log("ames: forwarded %" PRIu64 " total\n", pac_u->sam_u->fow_d); + u3_ames* sam_u = pac_u->sam_u; + + sam_u->sat_u.fow_d++; + if ( 0 == (sam_u->sat_u.fow_d % 1000000) ) { + u3l_log("ames: forwarded %" PRIu64 " total\n", sam_u->sat_u.fow_d); + } + + if ( u3C.wag_w & u3o_verbose ) { + u3_noun sen = u3dc("scot", 'p', u3i_chubs(2, pac_u->bod_u.sen_d)); + u3_noun rec = u3dc("scot", 'p', u3i_chubs(2, pac_u->bod_u.rec_d)); + c3_c* sen_c = u3r_string(sen); + c3_c* rec_c = u3r_string(rec); + c3_y* pip_y = (c3_y*)&pac_u->ore_u.pip_w; + + u3l_log("ames: forwarding for %s to %s from %d.%d.%d.%d:%d\n", + sen_c, rec_c, + pip_y[0], pip_y[1], pip_y[2], pip_y[3], + pac_u->ore_u.por_s); + + c3_free(sen_c); c3_free(rec_c); + u3z(sen); u3z(rec); } { - u3_noun los = las; u3_noun pac = _ames_serialize_packet(pac_u, c3y); - while ( u3_nul != las ) { - _ames_ef_send(pac_u->sam_u, u3k(u3h(las)), u3k(pac)); - las = u3t(las); + u3_noun tag, dat, lan, t = las; + + while ( u3_nul != t ) { + u3x_cell(t, &lan, &t); + + // validate lane and skip self if galaxy + // + if ( c3n == u3r_cell(lan, &tag, &dat) ) { + u3l_log("ames: bogus lane\n"); + u3m_p("lan", lan); + } + else { + c3_o sen_o = c3y; + c3_d who_d[2]; + + if ( c3y == tag ) { + u3r_chubs(0, 2, who_d, dat); + + if ( (who_d[0] == sam_u->pir_u->who_d[0]) + && (who_d[1] == sam_u->pir_u->who_d[1]) ) + { + sen_o = c3n; + if ( u3C.wag_w & u3o_verbose ) { + u3l_log("ames: forward skipping self\n"); + } + } + } + + if ( c3y == sen_o ) { + _ames_ef_send(sam_u, u3k(lan), u3k(pac)); + } + } } - u3z(los); u3z(pac); + + u3z(pac); } _ames_panc_free(pac_u); + u3z(las); } /* _ames_lane_scry_cb(): learn lane to forward packet on @@ -708,28 +916,27 @@ static void _ames_lane_scry_cb(void* vod_p, u3_noun nun) { u3_panc* pac_u = vod_p; - u3_weak las = u3r_at(7, nun); + u3_ames* sam_u = pac_u->sam_u; + u3_weak las = u3r_at(7, nun); - pac_u->sam_u->foq_d--; + sam_u->sat_u.foq_d--; // if scry fails, remember we can't scry, and just inject the packet // if ( u3_none == las ) { - if ( 5 < ++pac_u->sam_u->saw_d ) { + if ( 5 < ++sam_u->sat_u.saw_d ) { u3l_log("ames: giving up scry\n"); - pac_u->sam_u->see_o = c3n; + sam_u->fig_u.see_o = c3n; } - _ames_put_packet(pac_u->sam_u, - _ames_serialize_packet(pac_u, c3n), - pac_u->ore_u); + _ames_put_packet(sam_u, _ames_serialize_packet(pac_u, c3n), pac_u->ore_u); _ames_panc_free(pac_u); } else { - pac_u->sam_u->saw_d = 0; + sam_u->sat_u.saw_d = 0; // cache the scry result for later use // - _ames_lane_into_cache(pac_u->sam_u->lax_p, + _ames_lane_into_cache(sam_u->lax_p, u3i_chubs(2, pac_u->bod_u.rec_d), u3k(las)); @@ -748,114 +955,28 @@ _ames_lane_scry_cb(void* vod_p, u3_noun nun) u3z(nun); } -/* _ames_recv_cb(): receive callback. +/* _ames_try_forward(): try to forward a packet for another ship. */ static void -_ames_recv_cb(uv_udp_t* wax_u, - ssize_t nrd_i, - const uv_buf_t * buf_u, - const struct sockaddr* adr_u, - unsigned flg_i) +_ames_try_forward(u3_ames* sam_u, + u3_lane* lan_u, + u3_head* hed_u, + u3_body* bod_u, + c3_y* hun_y) { - u3_ames* sam_u = wax_u->data; - c3_o pas_o = c3y; - c3_y* byt_y = (c3_y*)buf_u->base; - c3_y* bod_y = byt_y + 4; - u3_head hed_u; + u3_weak lac; - // ensure a sane message size + // if the recipient is a galaxy, their lane is always &+~gax // - if ( 4 >= nrd_i ) { - pas_o = c3n; + if ( (256 > bod_u->rec_d[0]) + && (0 == bod_u->rec_d[1]) ) + { + lac = u3nc(c3y, (c3_y)bod_u->rec_d[0]); } - // unpack the packet header + // otherwise, try to get the lane from cache // else { - c3_w hed_w = (byt_y[0] << 0) - | (byt_y[1] << 8) - | (byt_y[2] << 16) - | (byt_y[3] << 24); - - hed_u.ver_y = hed_w & 0x7; - hed_u.mug_l = (hed_w >> 3) & 0xfffff; //NOTE ((1 << 20) - 1) - hed_u.sac_y = (hed_w >> 23) & 0x3; - hed_u.rac_y = (hed_w >> 25) & 0x3; - hed_u.enc_o = (hed_w >> 27) & 0x1; - } - - // ensure the protocol version matches ours - // - if ( c3y == pas_o - && (c3y == sam_u->fit_o) - && (sam_u->ver_y != hed_u.ver_y) ) - { - pas_o = c3n; - - sam_u->vet_d++; - if ( 0 == (sam_u->vet_d % 100) ) { - u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->vet_d); - } - } - - // ensure the mug is valid - // - if ( c3y == pas_o - && (hed_u.mug_l != _ca_mug_body(nrd_i - 4, bod_y)) ) - { - pas_o = c3n; - - sam_u->mut_d++; - if ( 0 == (sam_u->mut_d % 100) ) { - u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->mut_d); - } - } - - // unpack the body - // - c3_y sen_y = 2 << hed_u.sac_y; - c3_y rec_y = 2 << hed_u.rac_y; - c3_d sen_d[2]; - c3_d rec_d[2]; - c3_w con_w = nrd_i - 4 - sen_y - rec_y; - c3_y* con_y = NULL; - - if ( c3y == pas_o ) { - u3_noun sen = u3i_bytes(sen_y, bod_y); - u3_noun rec = u3i_bytes(rec_y, bod_y + sen_y); - u3r_chubs(0, 2, rec_d, rec); - u3r_chubs(0, 2, sen_d, sen); - u3z(sen); u3z(rec); - - con_y = c3_malloc(con_w); - memcpy(con_y, bod_y + sen_y + rec_y, con_w); - - // ensure the content is cue-able - // - pas_o = ur_cue_test_with(sam_u->tes_u, con_w, con_y) ? c3y : c3n; - } - - // if we can scry, - // and we are not the recipient, - // we might want to forward statelessly - // - if ( c3y == pas_o - && c3y == sam_u->see_o - && ( (rec_d[0] != sam_u->pir_u->who_d[0]) - || (rec_d[1] != sam_u->pir_u->who_d[1]) ) ) - { - pas_o = c3n; - - u3_weak lac; - // if the recipient is a galaxy, their lane is always &+~gax - // - if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) { - lac = u3nc(c3y, (c3_y)rec_d[0]); - } - // otherwise, try to get the lane from cache - // - else { - lac = _ames_lane_from_cache(sam_u->lax_p, u3i_chubs(2, rec_d)); - } + lac = _ames_lane_from_cache(sam_u->lax_p, u3i_chubs(2, bod_u->rec_d)); // if we don't know the lane, and the scry queue is full, // just drop the packet @@ -865,71 +986,202 @@ _ames_recv_cb(uv_udp_t* wax_u, // so can't pluck these out of the event queue like it does in // _ames_cap_queue. as such, blocked on u3_lord_peek_cancel or w/e. // - if ( (u3_none == lac) && (1000 < sam_u->foq_d) ) { - c3_free(con_y); - - sam_u->fod_d++; - if ( 0 == (sam_u->fod_d % 10000) ) { - u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->fod_d); + if ( (u3_none == lac) && (1000 < sam_u->sat_u.foq_d) ) { + sam_u->sat_u.fod_d++; + if ( 0 == (sam_u->sat_u.fod_d % 10000) ) { + u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->sat_u.fod_d); } + + c3_free(hun_y); + return; } // if we know there's no lane, drop the packet // else if ( u3_nul == lac ) { - c3_free(con_y); - u3z(lac); + c3_free(hun_y); + return; } - // otherwise, proceed with forwarding + } + + // proceed with forwarding + // + { + // store the packet details for later processing + // + // XX allocates unnecessarily when we know the lane + // + u3_panc* pac_u = c3_calloc(sizeof(*pac_u)); + pac_u->sam_u = sam_u; + pac_u->hed_u = *hed_u; + pac_u->bod_u = *bod_u; + pac_u->ore_u = *lan_u; + pac_u->ptr_v = hun_y; + + if ( 0 != sam_u->pac_u ) { + pac_u->nex_u = sam_u->pac_u; + sam_u->pac_u->pre_u = pac_u; + } + sam_u->pac_u = pac_u; + + // if we already know the lane, just forward + // + if ( u3_none != lac ) { + _ames_forward(pac_u, lac); + } + // otherwise, there's space in the scry queue; scry the lane out of ames // else { - // store the packet details for later processing - // - u3_panc* pac_u = c3_calloc(sizeof(*pac_u)); - pac_u->sam_u = sam_u; - pac_u->hed_u = hed_u; - pac_u->bod_u.sen_d[0] = sen_d[0]; - pac_u->bod_u.sen_d[1] = sen_d[1]; - pac_u->bod_u.rec_d[0] = rec_d[0]; - pac_u->bod_u.rec_d[1] = rec_d[1]; - pac_u->bod_u.con_w = con_w; - pac_u->bod_u.con_y = con_y; - pac_u->ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u); + sam_u->sat_u.foq_d++; + u3_noun pax = u3nq(u3i_string("peers"), + u3dc("scot", 'p', u3i_chubs(2, bod_u->rec_d)), + u3i_string("forward-lane"), + u3_nul); + u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax, + u3_nul, pax, pac_u, _ames_lane_scry_cb); + } + } +} - if ( 0 != sam_u->pac_u ) { - pac_u->nex_u = sam_u->pac_u; - sam_u->pac_u->pre_u = pac_u; - } - sam_u->pac_u = pac_u; +/* _ames_hear(): parse a (potential packet), dispatch appropriately. +*/ +static void +_ames_hear(u3_ames* sam_u, + u3_lane* lan_u, + c3_w len_w, + c3_y* hun_y) +{ + u3_head hed_u; + u3_body bod_u; - // if we already know the lane, just forward - // - if ( u3_none != lac ) { - _ames_forward(pac_u, lac); + // XX packet filtering needs to revised for two protocol-change scenarios + // + // - packets using old protocol versions from our sponsees + // these must be let through, and this is a transitive condition; + // they must also be forwarded where appropriate + // they can be validated, as we know their semantics + // + // - packets using newer protocol versions + // these should probably be let through, or at least + // trigger printfs suggesting upgrade. + // they cannot be filtered, as we do not know their semantics + // + + // unpack header, ensuring buffer is large enough + // + if ( (4 > len_w) + || (c3n == _ames_sift_head(&hed_u, hun_y)) ) + { + sam_u->sat_u.hed_d++; + if ( 0 == (sam_u->sat_u.hed_d % 100) ) { + u3l_log("ames: %" PRIu64 " dropped, failed to read header\n", sam_u->sat_u.hed_d); + } + + c3_free(hun_y); + return; + } + + // ensure the protocol version matches ours + // + // XX rethink use of [fit_o] here and elsewhere + // + if ( (c3y == sam_u->fig_u.fit_o) + && (sam_u->ver_y != hed_u.ver_y) ) + { + sam_u->sat_u.vet_d++; + if ( 0 == (sam_u->sat_u.vet_d % 100) ) { + u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->sat_u.vet_d); + } + + c3_free(hun_y); + return; + } + + { + c3_w bod_w = len_w - 4; + c3_y* bod_y = hun_y + 4; + + // ensure the mug is valid + // + if ( _ames_mug_body(bod_w, bod_y) != hed_u.mug_l ) { + sam_u->sat_u.mut_d++; + if ( 0 == (sam_u->sat_u.mut_d % 100) ) { + u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->sat_u.mut_d); } - // otherwise, there's space in the scry queue; scry the lane out of ames - // - else { - sam_u->foq_d++; - u3_noun pax = u3nq(u3i_string("peers"), - u3dc("scot", 'p', u3i_chubs(2, rec_d)), - u3i_string("forward-lane"), - u3_nul); - u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax, - u3_nul, pax, pac_u, _ames_lane_scry_cb); + + c3_free(hun_y); + return; + } + + // unpack and validate the body + // + if ( (c3n == _ames_sift_body(&hed_u, &bod_u, bod_w, bod_y)) + || !ur_cue_test_with(sam_u->tes_u, bod_u.con_w, bod_u.con_y) ) + { + sam_u->sat_u.bod_d++; + if ( 0 == (sam_u->sat_u.bod_d % 100) ) { + u3l_log("ames: %" PRIu64 " dropped, failed to read body\n", sam_u->sat_u.bod_d); } + + c3_free(hun_y); + return; } } - // if we passed the filter, inject the packet + // if we can scry, + // and we are not the recipient, + // we might want to forward statelessly // - if ( c3y == pas_o ) { - c3_free(con_y); - u3_lane ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u); - u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base); - _ames_put_packet(sam_u, msg, ore_u); + if ( (c3y == sam_u->fig_u.see_o) + && ( (bod_u.rec_d[0] != sam_u->pir_u->who_d[0]) + || (bod_u.rec_d[1] != sam_u->pir_u->who_d[1]) ) ) + { + _ames_try_forward(sam_u, lan_u, &hed_u, &bod_u, hun_y); } + // otherwise, inject the packet as an event + // + else { + u3_noun msg = u3i_bytes(len_w, hun_y); + c3_free(hun_y); + _ames_put_packet(sam_u, msg, *lan_u); + } +} - c3_free(buf_u->base); +/* _ames_recv_cb(): udp message receive callback. +*/ +static void +_ames_recv_cb(uv_udp_t* wax_u, + ssize_t nrd_i, + const uv_buf_t * buf_u, + const struct sockaddr* adr_u, + unsigned flg_i) +{ + if ( 0 > nrd_i ) { + if ( u3C.wag_w & u3o_verbose ) { + u3l_log("ames: recv: fail: %s\n", uv_strerror(nrd_i)); + } + c3_free(buf_u->base); + } + else if ( 0 == nrd_i ) { + c3_free(buf_u->base); + } + else if ( flg_i & UV_UDP_PARTIAL ) { + if ( u3C.wag_w & u3o_verbose ) { + u3l_log("ames: recv: fail: message truncated\n"); + } + c3_free(buf_u->base); + } + else { + u3_ames* sam_u = wax_u->data; + struct sockaddr_in* add_u = (struct sockaddr_in*)adr_u; + u3_lane lan_u; + + lan_u.por_s = ntohs(add_u->sin_port); + lan_u.pip_w = ntohl(add_u->sin_addr.s_addr); + + // NB: [nrd_i] will never exceed max length from _ames_alloc() + // + _ames_hear(sam_u, &lan_u, (c3_w)nrd_i, (c3_y*)buf_u->base); + } } /* _ames_io_start(): initialize ames I/O. @@ -1040,7 +1292,7 @@ static void _ames_prot_scry_cb(void* vod_p, u3_noun nun) { u3_ames* sam_u = vod_p; - u3_weak ver = u3r_at(7, nun); + u3_weak ver = u3r_at(7, nun); if ( u3_none == ver ) { // assume protocol version 0 @@ -1056,7 +1308,10 @@ _ames_prot_scry_cb(void* vod_p, u3_noun nun) sam_u->ver_y = ver; } - sam_u->fit_o = c3y; + // XX revise: filtering should probably be disabled if + // we get a protocol version above the latest one we know + // + sam_u->fig_u.fit_o = c3y; u3z(nun); } @@ -1079,6 +1334,10 @@ _ames_io_talk(u3_auto* car_u) // scry the protocol version out of arvo // + // XX this should be re-triggered periodically, + // or, better yet, %ames should emit a %turf + // (or some other reconfig) effect when it is reset. + // u3_pier_peek_last(car_u->pir_u, u3_nul, c3__ax, u3_nul, u3nt(u3i_string("protocol"), u3i_string("version"), u3_nul), sam_u, _ames_prot_scry_cb); @@ -1205,14 +1464,24 @@ static void _ames_io_info(u3_auto* car_u) { u3_ames* sam_u = (u3_ames*)car_u; - u3l_log(" dropped: %" PRIu64 "\n", sam_u->dop_d); - u3l_log(" forwards dropped: %" PRIu64 "\n", sam_u->fod_d); - u3l_log(" forwards pending: %" PRIu64 "\n", sam_u->foq_d); - u3l_log(" forwarded: %" PRIu64 "\n", sam_u->fow_d); - u3l_log(" filtered (ver): %" PRIu64 "\n", sam_u->vet_d); - u3l_log(" filtered (mug): %" PRIu64 "\n", sam_u->mut_d); - u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d); - u3l_log(" cached lanes: %u\n", u3to(u3h_root, sam_u->lax_p)->use_w); + +# define FLAG(a) ( (c3y == a) ? "&" : "|" ) + + u3l_log(" config:\n"); + u3l_log(" filtering: %s\n", FLAG(sam_u->fig_u.fit_o)); + u3l_log(" can send: %s\n", FLAG(sam_u->fig_u.net_o)); + u3l_log(" can scry: %s\n", FLAG(sam_u->fig_u.see_o)); + u3l_log(" counters:\n"); + u3l_log(" dropped: %" PRIu64 "\n", sam_u->sat_u.dop_d); + u3l_log(" forwards dropped: %" PRIu64 "\n", sam_u->sat_u.fod_d); + u3l_log(" forwards pending: %" PRIu64 "\n", sam_u->sat_u.foq_d); + u3l_log(" forwarded: %" PRIu64 "\n", sam_u->sat_u.fow_d); + u3l_log(" filtered (hed): %" PRIu64 "\n", sam_u->sat_u.hed_d); + u3l_log(" filtered (ver): %" PRIu64 "\n", sam_u->sat_u.vet_d); + u3l_log(" filtered (mug): %" PRIu64 "\n", sam_u->sat_u.mut_d); + u3l_log(" filtered (bod): %" PRIu64 "\n", sam_u->sat_u.bod_d); + u3l_log(" crashed: %" PRIu64 "\n", sam_u->sat_u.fal_d); + u3l_log(" cached lanes: %u\n", u3h_wyt(sam_u->lax_p)); } /* u3_ames_io_init(): initialize ames I/O. @@ -1222,11 +1491,9 @@ u3_ames_io_init(u3_pier* pir_u) { u3_ames* sam_u = c3_calloc(sizeof(*sam_u)); sam_u->pir_u = pir_u; - sam_u->dop_d = 0; - sam_u->net_o = c3y; - sam_u->see_o = c3y; - sam_u->fit_o = c3n; - sam_u->foq_d = 0; + sam_u->fig_u.net_o = c3y; + sam_u->fig_u.see_o = c3y; + sam_u->fig_u.fit_o = c3n; //NOTE some numbers on memory usage for the lane cache //