From 140e5d2b5f34401db9da596094dd2ff3bcf746b4 Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Fri, 24 Mar 2023 13:59:29 +0100 Subject: [PATCH] ames: reorder |keen arms --- pkg/arvo/sys/vane/ames.hoon | 256 +++++++++++++++++------------------- 1 file changed, 123 insertions(+), 133 deletions(-) diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index b0a2ed042..f6542eba5 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1490,9 +1490,7 @@ ^+ event-core =/ =shot (sift-shot b) ?: sam.shot (on-hear-packet l shot d) - ?: req.shot - ~|([%fine %request-events-forbidden] !!) - :: (on-hear-response:fine l shot d) + ?: req.shot ~|([%fine %request-events-forbidden] !!) ?^ d ::TODO handle ~& [%fine %done-goofed mote.u.d] @@ -1502,8 +1500,7 @@ ::NOTE we only send requests to ships we know, :: so we should only get responses from ships we know. :: below we assume sndr.shot is a known peer. - =* from sndr.shot - fi-abet:(on-hear:fi:(abed-got:pe from) l shot) + fi-abet:(on-hear:fi:(abed-got:pe sndr.shot) l shot) :: +on-hear-packet: handle mildly processed packet receipt :: ++ on-hear-packet @@ -1672,7 +1669,6 @@ |= [=wire payload=*] ^+ event-core ?: ?=([%fine %pine @ *] wire) - :: XX TODO refactor ?~ her=(slaw %p i.t.t.wire) :: XX use ev-trace? =/ =tape "; fine dropping malformed wire {}" @@ -3716,7 +3712,7 @@ (call %done ok=%.y) -- -- - :: +fi: construct |fine remote scry core + :: +fi: constructor for |fine remote scry core :: ++ fi => |% @@ -3830,50 +3826,12 @@ (on-keen (slag 3 (en-path:balk u.blk)) duct) :: +| %internal - :: XX TODO rethink core naming/structure to follow current ames ++ ke - |_ $: =path - keen-id=@ud - keen=keen-state - == + |_ [=path keen-id=@ud keen=keen-state] + :: + +| %helpers :: ++ ke-core . - ++ ke-abet - ^+ fine-core - =/ gone=? - =, keen - :: num-fragments is 0 when unknown (i.e. no response - :: yet) - :: if no-one is listening, kill request - ?| =(~ listeners.keen) - &(!=(0 num-fragments) =(num-fragments num-received)) - == - ?: gone - ke-abet-gone - =. ke-core ke-set-wake - =. keens.scry - (put:orm keens.scry keen-id keen) - fine-core - :: - ++ ke-show - =, keen - :* nex=(lent nex) - hav=(lent hav) - num-fragments=num-fragments - num-received=num-received - next-wake=next-wake - metrics=metrics - == - :: - ++ ke-abet-gone - =? ke-core ?=(^ next-wake.keen) - (ke-rest u.next-wake.keen) - =. keens.scry - +:(del:orm keens.scry keen-id) - =. order.scry - (~(del by order.scry) path) - fine-core - :: ++ ke-abed |= p=^path ~| no-keen-for-path/p @@ -3892,26 +3850,119 @@ ?^ out out ?:(=(id i) `p ~) :: - ++ ke-deq (deq want) + ++ ke-abet + ^+ fine-core + =/ gone=? + =, keen + :: num-fragments is 0 when unknown (i.e. no response yet) + :: if no-one is listening, kill request + ?| =(~ listeners.keen) + &(!=(0 num-fragments) =(num-fragments num-received)) + == + ?: gone + ke-abet-gone + =. ke-core ke-set-wake + =. keens.scry + (put:orm keens.scry keen-id keen) + fine-core + :: + ++ ke-abet-gone + =? ke-core ?=(^ next-wake.keen) + (ke-rest u.next-wake.keen) + =. keens.scry + +:(del:orm keens.scry keen-id) + =. order.scry + (~(del by order.scry) path) + fine-core + :: ++ ke-full-path :^ (scot %p her) (scot %ud rift.peer-state) (scot %ud life.peer-state) path :: - ++ ke-update-qos - |= =new=qos - ^+ event-core - =^ old-qos qos.peer-state [qos.peer-state new-qos] - ?~ text=(qos-update-text her old-qos new-qos kay.veb ships.bug.channel) - event-core - :: print message - :: - (emit duct %pass /qos %d %flog %text u.text) + ++ ke-show + =, keen + :* nex=(lent nex) + hav=(lent hav) + num-fragments=num-fragments + num-received=num-received + next-wake=next-wake + metrics=metrics + == :: - ++ ke-etch-keen - |= frag=@ud - (etch-keen ke-full-path frag) + ++ ke-deq (deq want) + ++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen)) + ++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim)) + ++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim)) + ++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag)) + ++ ke-send + |=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot))) + :: + +| %entry-points + :: + ++ ke-start + |= =^duct + ~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}" + =. ke-core (ke-sub duct) + ?> =(num-fragments.keen 0) + =/ fra=@ 1 + =/ req (ke-etch-keen fra) + =/ =want [fra req now 1 0] + =. wan.keen (cons:ke-deq *(pha ^want) want) + :: =. metrics.keen (on-sent:ke-gauge 1) + (ke-send req) + :: + ++ ke-rcv + |= [[=full=^path num=@ud] =meow =lane:ames] + ^+ ke-core + =/ og ke-core + =. event-core (ke-update-qos %live last-contact=now) + :: handle empty + ?: =(0 num.meow) + ?> =(~ dat.meow) + (ke-done sig.meow ~) + :: update congestion, or fill details + :: + =? ke-core =(0 num-fragments.keen) + ?> =(num 1) + (ke-first-rcv meow) + :: + ?. ?=([@ @ @ *] full-path) + ~| fine-path-too-short+full-path + !! + ?. =(`her (slaw %p i.full-path)) + ~| fine-path-bunk-ship+[full-path her] + !! + ?. =(`life.peer-state (slaw %ud i.t.t.full-path)) + ~| fine-path-bunk-life+[full-path life.peer-state] + !! + ?. =(`rift.peer-state (slaw %ud i.t.full-path)) + ~| fine-path-bunk-rift+[full-path rift.peer-state] + !! + ?. (veri-fra:keys [full-path num [dat sig]:meow]) + ~| fine-purr-fail-signature/num^`@ux`sig.meow + ~| life.peer-state + !! + :: + =^ found=? ke-core (ke-on-ack num) + ?. found + (ke-fast-retransmit:og num) + =: hav.keen [[num meow] hav.keen] + num-received.keen +(num-received.keen) + == + ?. =(num-fragments num-received):keen + ke-continue + (ke-done [sig dat]:ke-sift-full) + :: + ++ ke-sub + |=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct))) + :: scry is autocancelled in +ke-abet if no more listeners + :: + ++ ke-unsub + |=(=^duct ke-core(listeners.keen (~(del in listeners.keen) duct))) + :: + +| %implementation :: ++ ke-on-ack =| marked=(list want) @@ -3940,18 +3991,6 @@ =. ke-core (ke-send hoot.want) [`want %.n found ke-core] :: - ++ ke-start - |= =^duct - ~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}" - =. ke-core (ke-sub duct) - ?> =(num-fragments.keen 0) - =/ fra=@ 1 - =/ req (ke-etch-keen fra) - =/ =want [fra req now 1 0] - =. wan.keen (cons:ke-deq *(pha ^want) want) - :: =. metrics.keen (on-sent:ke-gauge 1) - (ke-send req) - :: ++ ke-done |= [sig=@ data=$@(~ (cask))] ?> (meri:keys ke-full-path sig data) @@ -3998,18 +4037,7 @@ =. ke-core (ke-send hoot.want) $(inx +(inx)) :: - ++ ke-sub - |= =^duct - ke-core(listeners.keen (~(put in listeners.keen) duct)) - :: scry is autocancelled in +ke-abet if no more listeners :: - ++ ke-unsub - |= =^duct - ke-core(listeners.keen (~(del in listeners.keen) duct)) - :: - ++ ke-send - |= =hoot - ke-core(event-core (send-blob for=| her `@ux`hoot)) :: ++ ke-sift-full =, keen @@ -4021,47 +4049,15 @@ ?> =((lent hav) num-received) (sift-roar num-fragments hav) :: - ++ ke-rcv - |= [[=full=^path num=@ud] =meow =lane:ames] - ^+ ke-core - =/ og ke-core - =. event-core (ke-update-qos %live last-contact=now) - :: handle empty - ?: =(0 num.meow) - ?> =(~ dat.meow) - (ke-done sig.meow ~) - :: update congestion, or fill details + ++ ke-update-qos + |= new=qos + ^+ event-core + =^ old qos.peer-state [qos.peer-state new] + ?~ text=(qos-update-text her old new kay.veb ships.bug.channel) + event-core + :: print message :: - =? ke-core =(0 num-fragments.keen) - ?> =(num 1) - (ke-first-rcv meow) - :: - ?. ?=([@ @ @ *] full-path) - ~| fine-path-too-short+full-path - !! - ?. =(`her (slaw %p i.full-path)) - ~| fine-path-bunk-ship+[full-path her] - !! - ?. =(`life.peer-state (slaw %ud i.t.t.full-path)) - ~| fine-path-bunk-life+[full-path life.peer-state] - !! - ?. =(`rift.peer-state (slaw %ud i.t.full-path)) - ~| fine-path-bunk-rift+[full-path rift.peer-state] - !! - ?. (veri-fra:keys [full-path num [dat sig]:meow]) - ~| fine-purr-fail-signature/num^`@ux`sig.meow - ~| life.peer-state - !! - :: - =^ found=? ke-core (ke-on-ack num) - ?. found - (ke-fast-retransmit:og num) - =: hav.keen [[num meow] hav.keen] - num-received.keen +(num-received.keen) - == - ?. =(num-fragments num-received):keen - ke-continue - (ke-done [sig dat]:ke-sift-full) + (emit duct %pass /qos %d %flog %text u.text) :: ++ ke-fast-retransmit |= fra=@ud @@ -4079,17 +4075,11 @@ =. cor (ke-send:cor hoot.want) [`want | cor] :: - ++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen)) - :: - ++ ke-timer-wire - `wire`(welp /fine/behn/wake/(scot %p her) path) - :: ++ ke-pass-timer |= =note - ke-core(event-core (emit unix-duct.ames-state %pass ke-timer-wire note)) + =/ =wire (welp /fine/behn/wake/(scot %p her) path) + ke-core(event-core (emit unix-duct.ames-state %pass wire note)) :: - ++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim)) - ++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim)) ++ ke-set-wake ^+ ke-core =/ next-wake=(unit @da) @@ -4137,7 +4127,7 @@ (ke-send hoot.u.want) -- -- - :: +ga: construct |pump-gauge congestion control core + :: +ga: constructor for |pump-gauge congestion control core :: ++ ga |= [pump-metrics live-packets=@ud]