From 439184fa7c63b7eacc554be1cd727f29b67362d7 Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Thu, 23 Mar 2023 12:54:22 +0100 Subject: [PATCH] ames: add +abed arms to peer-core --- pkg/arvo/sys/vane/ames.hoon | 127 +++++++++++++++--------------------- 1 file changed, 51 insertions(+), 76 deletions(-) diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 5ae100ddf..a4211134c 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1302,13 +1302,11 @@ ~> %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}" event-core ?> ?=([@ her=ship *] u.parsed) - =* her her.u.parsed - =/ =peer-state (got-peer-state her) - =/ =channel [[our her] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) + =* her her.u.parsed + =/ peer-core (abed-got:pe her) |^ ?: ?& ?=([%new *] u.parsed) - (lth rift.u.parsed rift.peer-state) + (lth rift.u.parsed rift.peer-state.peer-core) == :: ignore events from an old rift :: @@ -1341,19 +1339,18 @@ =/ sink-core (mi:peer-core bone *message-sink-state) =. peer-core abet:(call:abed:sink-core %done ok=%.n) =. event-core abet:peer-core - =/ =^peer-state (got-peer-state her) - =/ =^channel [[our her] now channel-state -.peer-state] :: construct nack-trace message, referencing .failed $message-num :: - =/ failed=message-num last-acked:(~(got by rcv.peer-state) bone) + =/ failed=message-num + last-acked:(~(got by rcv.peer-state.peer-core) bone) =/ =naxplanation [failed error] =/ =message-blob (jam naxplanation) :: send nack-trace message on associated .nack-trace-bone :: - =. peer-core (pe peer-state channel) =/ nack-trace-bone=^bone (mix 0b10 bone) :: - =+ pump-core=(mu:peer-core nack-trace-bone *message-pump-state) + =/ pump-core + (mu:(abed-got:pe her) nack-trace-bone *message-pump-state) abet:(call:abed:pump-core %memo message-blob) -- :: +on-sift: handle request to filter debug output by ship @@ -1407,8 +1404,7 @@ ^+ event-core =/ par (get-peer-state her) ?~ par event-core - =/ =channel [[our her] now channel-state -.u.par] - =/ peer-core (pe u.par channel) + =/ peer-core (abed-peer:pe her u.par) =/ bones ~(tap in ~(key by snd.u.par)) |- ^+ event-core ?~ bones abet:peer-core @@ -1477,28 +1473,22 @@ |= =ship ^+ event-core =/ ship-state (~(get by peers.ames-state) ship) - ?. ?=([~ %known *] ship-state) - %+ enqueue-alien-todo ship - |= todos=alien-agenda - todos(heeds (~(put in heeds.todos) duct)) - :: - =/ =peer-state +.u.ship-state - =/ =channel [[our ship] now channel-state -.peer-state] - abet:on-heed:(pe peer-state channel) + ?: ?=([~ %known *] ship-state) + abet:on-heed:(abed-peer:pe ship +.u.ship-state) + %+ enqueue-alien-todo ship + |= todos=alien-agenda + todos(heeds (~(put in heeds.todos) duct)) :: +on-jilt: handle request to stop tracking .ship's responsiveness :: ++ on-jilt |= =ship ^+ event-core =/ ship-state (~(get by peers.ames-state) ship) - ?. ?=([~ %known *] ship-state) - %+ enqueue-alien-todo ship - |= todos=alien-agenda - todos(heeds (~(del in heeds.todos) duct)) - :: - =/ =peer-state +.u.ship-state - =/ =channel [[our ship] now channel-state -.peer-state] - abet:on-jilt:(pe peer-state channel) + ?: ?=([~ %known *] ship-state) + abet:on-jilt:(abed-peer:pe ship +.u.ship-state) + %+ enqueue-alien-todo ship + |= todos=alien-agenda + todos(heeds (~(del in heeds.todos) duct)) :: +on-hear: handle raw packet receipt :: ++ on-hear @@ -1519,10 +1509,7 @@ :: so we should only get responses from ships we know. :: below we assume sndr.shot is a known peer. =* from sndr.shot - =/ =peer-state (got-peer-state from) - =/ =channel [[our from] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - fi-abet:(on-hear:fi:peer-core l shot) + fi-abet:(on-hear:fi:(abed-got:pe from) l shot) :: +on-hear-packet: handle mildly processed packet receipt :: ++ on-hear-packet @@ -1683,8 +1670,8 @@ `[direct=%.n |+u.origin.shot] :: perform peer-specific handling of packet :: - =/ peer-core (pe peer-state channel) - abet:(on-hear-shut-packet:peer-core lane shut-packet dud) + =< abet + (~(on-hear-shut-packet pe peer-state channel) [lane shut-packet dud]) :: +on-take-boon: receive request to give message to peer :: ++ on-take-boon @@ -1696,22 +1683,18 @@ :: XX use ev-trace? =/ =tape "; fine dropping malformed wire {}" (emit duct %pass /parse-wire %d %flog %text tape) - =/ =peer-state (got-peer-state u.her) - =/ =channel [[our u.her] now channel-state -.peer-state] - fi-abet:(on-pine-boon:fi:(pe peer-state channel) t.t.t.wire payload) + abet:(on-pine-boon:fi:(abed-got:pe u.her) t.t.t.wire payload) :: ?~ parsed=(parse-bone-wire wire) ~> %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}" event-core :: ?> ?=([@ her=ship *] u.parsed) - =* her her.u.parsed - =/ =peer-state (got-peer-state her) - =/ =channel [[our her] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) + =* her her.u.parsed + =/ peer-core (abed-got:pe her) :: ?: ?& ?=([%new *] u.parsed) - (lth rift.u.parsed rift.peer-state) + (lth rift.u.parsed rift.peer-state.peer-core) == :: ignore events from an old rift :: @@ -1770,7 +1753,7 @@ =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "plea {}" - abet:(on-memo:(pe peer-state channel) bone plea %plea) + abet:(~(on-memo pe [peer-state channel]) bone plea %plea) :: +on-cork: handle request to kill a flow :: ++ on-cork @@ -1801,7 +1784,7 @@ =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "cork plea {}" - abet:(on-memo:(pe peer-state channel) bone plea %plea) + abet:(~(on-memo pe [peer-state channel]) bone plea %plea) :: +on-kroc: cork all stale flows from failed subscriptions :: ++ on-kroc @@ -1908,13 +1891,12 @@ %- slog [leaf+"ames: got timer for strange ship: {}, ignoring" ~] :: - =/ =channel [[our her.u.res] now channel-state -.u.state] - =/ peer-core (pe u.state channel) + =/ peer-core (abed-peer:pe her.u.res u.state) ?- -.u.res %pump abet:(on-wake:peer-core bone.u.res error) :: %fine - =< fi-abet + =< abet ke-abet:ke-take-wake:(ke-abed:ke:fi:peer-core wire.u.res) == :: @@ -1932,10 +1914,7 @@ =+ [her sat]=i.pez ?. ?=(%known -.sat) $(pez t.pez) - =* peer-state +.sat - =/ =channel [[our her] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - $(pez t.pez, event-core abet:recork-one:peer-core) + $(pez t.pez, event-core abet:recork-one:(abed-peer:pe her +.sat)) :: +on-init: first boot; subscribe to our info from jael :: ++ on-init @@ -2170,8 +2149,8 @@ ++ meet-alien-fine |= [peens=(jug path ^duct) key=?(%keen %pine)] ^+ event-core - =+ fine=fi:(pe *peer-state *channel) - =; f=_fine fi-abet:f + =+ fine=fi:(abed:pe ship) + =< fi-abet ^+ fine %- ~(rep by peens) |= [[=path ducts=(set ^duct)] cor=_fine] %- ~(rep in ducts) @@ -2281,10 +2260,7 @@ |= todos=alien-agenda todos(pines (~(put ju keens.todos) path duct)) ?> ?=([%known *] u.ship-state) - =/ =peer-state +.u.ship-state - =/ =channel [[our ship] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - fi-abet:(on-pine:fi:peer-core path duct) + fi-abet:(on-pine:fi:(abed-peer:pe ship +.u.ship-state) path duct) :: XX: crashing correct behaviour? =+ blk=(need (de-part:balk our rift.ames-state life.ames-state path)) ?> ?=(%c van.blk) @@ -2308,10 +2284,7 @@ |= todos=alien-agenda todos(keens (~(put ju keens.todos) path duct)) ?> ?=([%known *] u.ship-state) - =/ =peer-state +.u.ship-state - =/ =channel [[our ship] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - fi-abet:(on-keen:fi:peer-core path duct) + fi-abet:(on-keen:fi:(abed-peer:pe ship +.u.ship-state) path duct) :: ++ on-yawn |= [=ship =path] @@ -2319,11 +2292,7 @@ ?~ ship-state=(~(get by peers.ames-state) ship) ~|(%no-ship-for-yawn !!) ?> ?=([%known *] u.ship-state) - =/ =peer-state +.u.ship-state - =/ =channel [[our ship] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - =< fi-abet - ke-abet:(ke-unsub:(ke-abed:ke:fi:peer-core path) duct) + fi-abet:ke-abet:(ke-unsub:(ke-abed:ke:fi:(abed:pe ship) path) duct) :: +| %implementation :: +enqueue-alien-todo: helper to enqueue a pending request @@ -2470,16 +2439,24 @@ :: +pe: create nested |peer-core for per-peer processing :: ++ pe - |= [=peer-state =channel] - =* veb veb.bug.channel - =* her her.channel - =* scry scry.peer-state - |% + |_ [=peer-state =channel] + +* veb veb.bug.channel + her her.channel + scry scry.peer-state :: +| %helpers :: ++ peer-core . - ++ pe-emit |=(move peer-core(event-core (emit +<))) + ++ pe-emit |=(move peer-core(event-core (emit +<))) + ++ abed |=(=ship (abed-peer ship (gut-peer-state ship))) + ++ abed-got |=(=ship (abed-peer ship (got-peer-state ship))) + ++ abed-peer + |= [=ship peer=^peer-state] + %_ peer-core + peer-state peer + channel [[our ship] now channel-state -.peer] + == + :: ++ abet ^+ event-core =. peers.ames-state @@ -3771,7 +3748,6 @@ -- -- :: +fi: construct |fine remote scry core - :: XX TODO rethink core naming/structure to follow current ames :: ++ fi => |% @@ -3838,8 +3814,7 @@ +| %helpers :: ++ fine-core . - :: ++ fi-abed XX TODO - ++ fi-abet abet :: +abet:pe + ++ fi-abet abet :: XX +abet:pe, call directly instead? :: +| %entry-points :: @@ -3886,7 +3861,7 @@ (on-keen (slag 3 (en-path:balk u.blk)) duct) :: +| %internal - :: + :: XX TODO rethink core naming/structure to follow current ames ++ ke |_ $: =path keen-id=@ud