diff --git a/pkg/urbit/vere/io/ames.c b/pkg/urbit/vere/io/ames.c index b29c0709d9..04a2591dae 100644 --- a/pkg/urbit/vere/io/ames.c +++ b/pkg/urbit/vere/io/ames.c @@ -8,8 +8,14 @@ #define FINE_PAGE 16384 // packets per page TODO #define FINE_FRAG 1024 // bytes per fragment packet #define FINE_PATH_MAX 384 // longest allowed scry path +#define FINE_SCRY_MAX 10 // maximum allowed pending scry requests #define HEAD_SIZE 4 // header size in bytes +// a hack to work around the inability to delete from a hashtable +// +#define FINE_PEND 1 // scry cache sentinel value: "pending" +#define FINE_DEAD 2 // scry cache sentinel value: "dead" + /* u3_fine: fine networking */ typedef struct _u3_fine { @@ -815,7 +821,7 @@ _ames_send(u3_pact* pac_u) { u3_ames* sam_u = pac_u->sam_u; - if ( !pac_u->hun_y + if ( !pac_u->hun_y || !sam_u || !&sam_u->wax_u || !pac_u->len_w @@ -1090,7 +1096,7 @@ _ames_czar(u3_pact* pac_u) pac_u->rut_u.dns_c = c3_malloc(1 + strlen(nam_c) + strlen(sam_u->dns_c)); - sas_i = + sas_i = snprintf(pac_u->rut_u.dns_c, 255, "%s.%s", nam_c + 1, sam_u->dns_c); c3_free(nam_c); @@ -1125,14 +1131,20 @@ _ames_czar(u3_pact* pac_u) static void _fine_put_cache(u3_ames* sam_u, u3_noun pax, c3_w lop_w, u3_noun lis) { - c3_w cur_w = lop_w; - while ( lis != u3_nul ) { - u3_noun key = u3nc(u3k(pax), u3i_word(cur_w)); - u3h_put(sam_u->fin_s.sac_p, key, u3k(u3h(lis))); + if ( (FINE_PEND == lis) || (FINE_DEAD == lis) ) { + u3_noun key = u3nc(u3k(pax), u3i_word(lop_w)); + u3h_put(sam_u->fin_s.sac_p, key, lis); + } + else { + c3_w cur_w = lop_w; + while ( lis != u3_nul ) { + u3_noun key = u3nc(u3k(pax), u3i_word(cur_w)); + u3h_put(sam_u->fin_s.sac_p, key, u3k(u3h(lis))); - lis = u3t(lis); - cur_w++; - u3z(key); + lis = u3t(lis); + cur_w++; + u3z(key); + } } } @@ -1514,6 +1526,34 @@ _fine_lop(c3_w fra_w) return 1 + (((fra_w - 1) / FINE_PAGE) * FINE_PAGE); } +static u3_weak +_fine_scry_path(u3_pact* pac_u, c3_o lop_o) +{ + u3_keen* ken_u = ( + ( PACT_WAIL == pac_u->typ_y ) + ? &pac_u->wal_u.ken_u + : &pac_u->pur_u.ken_u); + + u3_noun pat; + { + u3_noun pux = u3i_string(ken_u->pat_c); + u3_noun ful = u3dc("rush", pux, u3v_wish("stap")); + if ( u3_nul == ful ) { + u3z(ful); + return u3_none; + } + pat = u3k(u3t(ful)); + u3z(ful); + } + + c3_w fra_w = ken_u->fra_w; + if ( c3y == lop_o ) { + fra_w = _fine_lop(fra_w); + } + + return u3nc(pat, u3i_word(fra_w)); +} + /* _fine_pack_scry_cb(): receive packets for datum out of fine */ static void _fine_pack_scry_cb(void* vod_p, u3_noun nun) @@ -1523,16 +1563,20 @@ static void _fine_pack_scry_cb(void* vod_p, u3_noun nun) u3_ames* sam_u = pac_u->sam_u; u3_keen* ken_u = &pac_u->pur_u.ken_u; + u3_noun pax = u3do("stab", u3i_string(ken_u->pat_c)); + c3_w lop_w = _fine_lop(ken_u->fra_w); + + // if not [~ ~ fragments], mark as dead + // u3_weak pas = u3r_at(7, nun); if( pas == u3_none ) { + _fine_put_cache(sam_u, pax, lop_w, FINE_DEAD); _ames_pact_free(pac_u); u3z(nun); return; } - u3_noun pax = u3do("stab", u3i_string(ken_u->pat_c)); - c3_w lop_w = _fine_lop(ken_u->fra_w); _fine_put_cache(sam_u, pax, lop_w, pas); // find requested fragment @@ -1570,22 +1614,14 @@ static void _fine_hear_request(u3_pact* req_u, c3_w cur_w) { u3_pact* res_u; - + u3_weak key; + if ( c3n == _fine_sift_wail(req_u, cur_w) ) { u3l_log("fine: _fine_hear_request bad wail\n"); _ames_pact_free(req_u); return; } - u3_noun pux = u3i_string(req_u->wal_u.ken_u.pat_c); - u3_noun pat = u3dc("rush", pux, u3v_wish("stap")); - if ( u3_nul == pat ) { - u3l_log("fine: bad request\n"); - _ames_pact_free(req_u); - u3z(pat); - return; - } - // fill in the parts of res_u that we know from req_u { res_u = c3_calloc(sizeof(*res_u)); @@ -1615,7 +1651,7 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w) .rog_d = 0 }; - // copy unsigned request payload into response body + // copy unsigned request payload into response body // res_u->pur_u = (u3_purr) { .ken_u = req_u->wal_u.ken_u, @@ -1628,6 +1664,15 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w) res_u->pur_u.ken_u.pat_c = c3_calloc(len_s + 1); memcpy(res_u->pur_u.ken_u.pat_c, req_u->wal_u.ken_u.pat_c, len_s); } + + // make scry cache key + // + key = _fine_scry_path(req_u, c3n); + if ( u3_none == key ) { + u3l_log("fine: bad request\n"); + _ames_pact_free(req_u); + return; + } // free incoming request // _ames_pact_free(req_u); @@ -1643,24 +1688,35 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w) // look up request in scry cache // - u3_noun key = u3nc(u3k(u3t(pat)), u3i_word(res_u->pur_u.ken_u.fra_w)); u3_weak cac = u3h_git(res_u->sam_u->fin_s.sac_p, key); - if ( u3_none == cac ) { + // already pending; drop + // + if ( FINE_PEND == cac ) { + //u3l_log("fine: pend %u %s\n", res_u->pur_u.ken_u.fra_w, + // res_u->pur_u.ken_u.pat_c); + _ames_pact_free(res_u); + } + // cache miss or a previous scry blocked; try again + // + else if ( (u3_none == cac) || (FINE_DEAD == cac) ) { //u3l_log("fine: miss %u %s\n", res_u->pur_u.ken_u.fra_w, // res_u->pur_u.ken_u.pat_c); - // cache miss, scry into arvo for a page of packets - // c3_w lop_w = _fine_lop(res_u->pur_u.ken_u.fra_w); u3_noun pax = u3nc(c3__fine, u3nq(c3__hunk, u3dc("scot", c3__ud, lop_w), u3dc("scot", c3__ud, FINE_PAGE), - u3k(u3t(pat)))); + u3k(u3h(key)))); + // mark as pending in the scry cache + // + _fine_put_cache(res_u->sam_u, u3k(u3h(key)), lop_w, FINE_PEND); + + // scry into arvo for a page of packets + // u3_pier_peek_last(res_u->sam_u->car_u.pir_u, u3_nul, c3__ax, u3_nul, pax, res_u, _fine_pack_scry_cb); - } // cache hit, fill in response meow and send // @@ -1672,10 +1728,10 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w) } else { u3l_log("fine: _fine_hear_request meow bad\n"); + _ames_pact_free(res_u); } u3z(key); - u3z(pat); } // TODO: check protocol version @@ -1819,14 +1875,14 @@ _ames_hear(u3_ames* sam_u, // check contents match mug in header // if ( c3n == _ames_check_mug(pac_u) ) { - _log_head(&pac_u->hed_u); - sam_u->sat_u.mut_d++; - // TODO: reinstate filter after debugging is over - // if ( 0 == (sam_u->sat_u.mut_d % 100000) ) { - if ( 1 ) { - u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", - sam_u->sat_u.mut_d); - } + _log_head(&pac_u->hed_u); + sam_u->sat_u.mut_d++; + // TODO: reinstate filter after debugging is over + // if ( 0 == (sam_u->sat_u.mut_d % 100000) ) { + if ( 1 ) { + u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", + sam_u->sat_u.mut_d); + } _ames_pact_free(pac_u); return; } @@ -2135,7 +2191,7 @@ _ames_kick_newt(u3_ames* sam_u, u3_noun tag, u3_noun dat) case c3__turf: { _ames_ef_turf(sam_u, u3k(dat)); ret_o = c3y; - } break; + } break; } u3z(tag); u3z(dat);