ames: drop duplicate scry request packets for pending scrys

This commit is contained in:
Ted Blackman 2022-06-01 19:16:25 -05:00
parent d6428a9023
commit 1601903f34

View File

@ -8,8 +8,14 @@
#define FINE_PAGE 16384 // packets per page TODO
#define FINE_FRAG 1024 // bytes per fragment packet
#define FINE_PATH_MAX 384 // longest allowed scry path
#define FINE_SCRY_MAX 10 // maximum allowed pending scry requests
#define HEAD_SIZE 4 // header size in bytes
// a hack to work around the inability to delete from a hashtable
//
#define FINE_PEND 1 // scry cache sentinel value: "pending"
#define FINE_DEAD 2 // scry cache sentinel value: "dead"
/* u3_fine: fine networking
*/
typedef struct _u3_fine {
@ -1125,14 +1131,20 @@ _ames_czar(u3_pact* pac_u)
static void
_fine_put_cache(u3_ames* sam_u, u3_noun pax, c3_w lop_w, u3_noun lis)
{
c3_w cur_w = lop_w;
while ( lis != u3_nul ) {
u3_noun key = u3nc(u3k(pax), u3i_word(cur_w));
u3h_put(sam_u->fin_s.sac_p, key, u3k(u3h(lis)));
if ( (FINE_PEND == lis) || (FINE_DEAD == lis) ) {
u3_noun key = u3nc(u3k(pax), u3i_word(lop_w));
u3h_put(sam_u->fin_s.sac_p, key, lis);
}
else {
c3_w cur_w = lop_w;
while ( lis != u3_nul ) {
u3_noun key = u3nc(u3k(pax), u3i_word(cur_w));
u3h_put(sam_u->fin_s.sac_p, key, u3k(u3h(lis)));
lis = u3t(lis);
cur_w++;
u3z(key);
lis = u3t(lis);
cur_w++;
u3z(key);
}
}
}
@ -1514,6 +1526,34 @@ _fine_lop(c3_w fra_w)
return 1 + (((fra_w - 1) / FINE_PAGE) * FINE_PAGE);
}
static u3_weak
_fine_scry_path(u3_pact* pac_u, c3_o lop_o)
{
u3_keen* ken_u = (
( PACT_WAIL == pac_u->typ_y )
? &pac_u->wal_u.ken_u
: &pac_u->pur_u.ken_u);
u3_noun pat;
{
u3_noun pux = u3i_string(ken_u->pat_c);
u3_noun ful = u3dc("rush", pux, u3v_wish("stap"));
if ( u3_nul == ful ) {
u3z(ful);
return u3_none;
}
pat = u3k(u3t(ful));
u3z(ful);
}
c3_w fra_w = ken_u->fra_w;
if ( c3y == lop_o ) {
fra_w = _fine_lop(fra_w);
}
return u3nc(pat, u3i_word(fra_w));
}
/* _fine_pack_scry_cb(): receive packets for datum out of fine
*/
static void _fine_pack_scry_cb(void* vod_p, u3_noun nun)
@ -1523,16 +1563,20 @@ static void _fine_pack_scry_cb(void* vod_p, u3_noun nun)
u3_ames* sam_u = pac_u->sam_u;
u3_keen* ken_u = &pac_u->pur_u.ken_u;
u3_noun pax = u3do("stab", u3i_string(ken_u->pat_c));
c3_w lop_w = _fine_lop(ken_u->fra_w);
// if not [~ ~ fragments], mark as dead
//
u3_weak pas = u3r_at(7, nun);
if( pas == u3_none ) {
_fine_put_cache(sam_u, pax, lop_w, FINE_DEAD);
_ames_pact_free(pac_u);
u3z(nun);
return;
}
u3_noun pax = u3do("stab", u3i_string(ken_u->pat_c));
c3_w lop_w = _fine_lop(ken_u->fra_w);
_fine_put_cache(sam_u, pax, lop_w, pas);
// find requested fragment
@ -1570,6 +1614,7 @@ static void
_fine_hear_request(u3_pact* req_u, c3_w cur_w)
{
u3_pact* res_u;
u3_weak key;
if ( c3n == _fine_sift_wail(req_u, cur_w) ) {
u3l_log("fine: _fine_hear_request bad wail\n");
@ -1577,15 +1622,6 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w)
return;
}
u3_noun pux = u3i_string(req_u->wal_u.ken_u.pat_c);
u3_noun pat = u3dc("rush", pux, u3v_wish("stap"));
if ( u3_nul == pat ) {
u3l_log("fine: bad request\n");
_ames_pact_free(req_u);
u3z(pat);
return;
}
// fill in the parts of res_u that we know from req_u
{
res_u = c3_calloc(sizeof(*res_u));
@ -1628,6 +1664,15 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w)
res_u->pur_u.ken_u.pat_c = c3_calloc(len_s + 1);
memcpy(res_u->pur_u.ken_u.pat_c, req_u->wal_u.ken_u.pat_c, len_s);
}
// make scry cache key
//
key = _fine_scry_path(req_u, c3n);
if ( u3_none == key ) {
u3l_log("fine: bad request\n");
_ames_pact_free(req_u);
return;
}
// free incoming request
//
_ames_pact_free(req_u);
@ -1643,24 +1688,35 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w)
// look up request in scry cache
//
u3_noun key = u3nc(u3k(u3t(pat)), u3i_word(res_u->pur_u.ken_u.fra_w));
u3_weak cac = u3h_git(res_u->sam_u->fin_s.sac_p, key);
if ( u3_none == cac ) {
// already pending; drop
//
if ( FINE_PEND == cac ) {
//u3l_log("fine: pend %u %s\n", res_u->pur_u.ken_u.fra_w,
// res_u->pur_u.ken_u.pat_c);
_ames_pact_free(res_u);
}
// cache miss or a previous scry blocked; try again
//
else if ( (u3_none == cac) || (FINE_DEAD == cac) ) {
//u3l_log("fine: miss %u %s\n", res_u->pur_u.ken_u.fra_w,
// res_u->pur_u.ken_u.pat_c);
// cache miss, scry into arvo for a page of packets
//
c3_w lop_w = _fine_lop(res_u->pur_u.ken_u.fra_w);
u3_noun pax =
u3nc(c3__fine,
u3nq(c3__hunk,
u3dc("scot", c3__ud, lop_w),
u3dc("scot", c3__ud, FINE_PAGE),
u3k(u3t(pat))));
u3k(u3h(key))));
// mark as pending in the scry cache
//
_fine_put_cache(res_u->sam_u, u3k(u3h(key)), lop_w, FINE_PEND);
// scry into arvo for a page of packets
//
u3_pier_peek_last(res_u->sam_u->car_u.pir_u, u3_nul, c3__ax, u3_nul,
pax, res_u, _fine_pack_scry_cb);
}
// cache hit, fill in response meow and send
//
@ -1672,10 +1728,10 @@ _fine_hear_request(u3_pact* req_u, c3_w cur_w)
}
else {
u3l_log("fine: _fine_hear_request meow bad\n");
_ames_pact_free(res_u);
}
u3z(key);
u3z(pat);
}
// TODO: check protocol version
@ -1819,14 +1875,14 @@ _ames_hear(u3_ames* sam_u,
// check contents match mug in header
//
if ( c3n == _ames_check_mug(pac_u) ) {
_log_head(&pac_u->hed_u);
sam_u->sat_u.mut_d++;
// TODO: reinstate filter after debugging is over
// if ( 0 == (sam_u->sat_u.mut_d % 100000) ) {
if ( 1 ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n",
sam_u->sat_u.mut_d);
}
_log_head(&pac_u->hed_u);
sam_u->sat_u.mut_d++;
// TODO: reinstate filter after debugging is over
// if ( 0 == (sam_u->sat_u.mut_d % 100000) ) {
if ( 1 ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n",
sam_u->sat_u.mut_d);
}
_ames_pact_free(pac_u);
return;
}