diff --git a/pkg/urbit/include/c/motes.h b/pkg/urbit/include/c/motes.h index b5a3a50abe..fbb0a8d394 100644 --- a/pkg/urbit/include/c/motes.h +++ b/pkg/urbit/include/c/motes.h @@ -72,6 +72,7 @@ # define c3__ber c3_s3('b','e','r') # define c3__bias c3_s4('b','i','a','s') # define c3__bic c3_s3('b','i','c') +# define c3__bide c3_s4('b','i','d', 'e') # define c3__bind c3_s4('b','i','n','d') # define c3__bink c3_s4('b','i','n','k') # define c3__bird c3_s4('b','i','r','d') diff --git a/pkg/urbit/vere/io/ames.c b/pkg/urbit/vere/io/ames.c index 202304d57a..2563fa6c96 100644 --- a/pkg/urbit/vere/io/ames.c +++ b/pkg/urbit/vere/io/ames.c @@ -12,6 +12,7 @@ typedef struct _u3_fine { c3_y ver_y; // fine protocol u3p(u3h_root) sac_p; // scry cache hashtable + u3p(u3h_root) bid_p; // pending notifications struct _u3_ames* sam_u; // ames backpointer } u3_fine; @@ -130,6 +131,8 @@ }; } u3_pact; +static void _fine_got_pack(u3_pact*, u3_noun); + /* u3_panc: deconstructed incoming packet */ typedef struct _u3_panc { @@ -940,6 +943,58 @@ _ames_czar(u3_pact* pac_u) } } +/* _fine_ef_howl(): broadcast notification of newly bound data + */ +static void +_fine_ef_howl(u3_ames* sam_u, u3_noun pax, u3_noun lis) { + u3l_log("howling\n"); + u3_noun pas = lis; + + // TODO; refactor, + u3_weak fra = u3_none; + c3_w cur_w = 1; + u3k(pas); + u3l_log("howling a\n"); + while(pas != u3_nul) { + u3l_log("howling in list\n"); + u3_noun key = u3nc(u3k(pax), u3i_word(cur_w)); + u3h_put(sam_u->fin_s.sac_p, key, u3k(u3h(pas))); + pas = u3t(pas); + u3z(key); + } + u3l_log("howling after list\n"); + u3m_p("pax", pax); + u3_weak who = u3h_get(sam_u->fin_s.bid_p, pax); + if ( who == u3_none ) { + u3l_log("no listeners\n"); + } else { + + u3_noun her = who; + while ( her != u3_nul ) { + //u3m_p("her", her); + + /*u3_weak lac = _ames_lane_from_cache(sam_u->lax_p, u3h(her)); + if ( lac == u3_none ) { + u3l_log("no lane\n"); + u3m_p("for", u3h(her)); + her = u3t(her); + continue; + }*/ + u3_pact* pac_u = c3_calloc(sizeof(*pac_u)); + pac_u->sam_u = sam_u; + c3_d her_d[2]; + u3r_chubs(0, 2, her_d, u3k(u3h(her))); + u3l_log(" %"PRIu64", %" PRIu64 "\n", her_d[0], her_d[1]); + + // TODO: fix for non-galaxy case + pac_u->imp_y = her_d[0]; + + _fine_got_pack(pac_u, u3h(lis)); + her = u3t(her); + } + } +} + /* _ames_ef_send(): send packet to network (v4). */ static void @@ -1326,16 +1381,52 @@ _ames_skip(u3_body* bod_u) { static void _fine_got_pack(u3_pact* pac_u, u3_noun fra) { + u3l_log("got pack\n"); pac_u->len_w = u3r_met(3, fra); pac_u->hun_y = c3_calloc(pac_u->len_w); + u3m_p("fra", fra); u3r_bytes(0, pac_u->len_w, pac_u->hun_y, fra); + u3l_log("preczar\n"); - _fine_send(pac_u); + _ames_czar(pac_u); //_fine_send(pac_u); + u3l_log("postczar\n"); u3z(fra); } +static void _fine_bide(u3_pact* pac_u, u3_noun pax) +{ + u3_ames* sam_u = pac_u->sam_u; + u3_weak lis = u3h_get(sam_u->fin_s.bid_p, pax); + + if ( u3_none == lis ) { + lis = u3_nul; + } + u3_noun her = u3i_chubs(2, pac_u->req_u.pre_u.sen_d); + u3_noun new = u3nc(her, lis); + // TODO: linear search bad + + u3l_log("put in hashtable\n"); + u3m_p("hashtable", new); + u3_noun car, res; + u3x_cell(pax, &car, &res); + u3_noun our = u3dc("scot", 'p', u3i_chubs(2, sam_u->car_u.pir_u->who_d)); + + u3_noun pat = u3nt(car, our, res); + u3h_put(sam_u->fin_s.bid_p, pat, new); + u3m_p("pat", pat); + + u3_noun cad = u3nc(c3__bide, pat); + u3_noun wir = u3nc(c3__fine, u3_nul); + u3_ovum* ovo_u = u3_ovum_init(0, c3__ames, u3k(wir), u3k(cad)); + u3_auto_plan(&sam_u->car_u, ovo_u); + + u3z(wir); + u3z(cad); +} + + /* _fine_pack_scry_cb(): receive all packets for datum out of fine */ static void _fine_pack_scry_cb(void* vod_p, u3_noun nun) @@ -1345,15 +1436,18 @@ static void _fine_pack_scry_cb(void* vod_p, u3_noun nun) u3_pact* pac_u = vod_p; u3_ames* sam_u = pac_u->sam_u; u3_weak pas = u3r_at(7, nun); + u3_noun pax = u3do("stab", u3i_string(pac_u->req_u.pat_c)); if(pas == u3_none) { + // TODO: send %bide u3l_log("no result, bailing\n"); + _fine_bide(pac_u, u3k(pax)); _ames_pact_free(pac_u); + u3z(pax); u3z(nun); return; } - c3_assert( 1 == pac_u->typ_y); + c3_assert( 1 == pac_u->typ_y ); u3_weak fra = u3_none; - u3_noun pax = u3i_string(pac_u->req_u.pat_c); c3_w cur_w = 1; u3k(pas); while(pas != u3_nul) { @@ -1903,6 +1997,10 @@ _ames_kick_newt(u3_ames* sam_u, u3_noun tag, u3_noun dat) } break; case c3__howl: { + u3_noun pat = u3k(u3h(dat)); + u3_noun lis = u3k(u3t(dat)); + u3l_log("fine: howl\n"); + _fine_ef_howl(sam_u, pat, lis); ret_o = c3y; } break; @@ -2056,6 +2154,9 @@ u3_ames_io_init(u3_pier* pir_u) // 50 bytes (average) per path * 100_000 = 5MB sam_u->fin_s.sac_p = u3h_new_cache(100000); + // hashtable for notificatiosn + sam_u->fin_s.bid_p = u3h_new_cache(100000); + //NOTE some numbers on memory usage for the lane cache // // assuming we store: