diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 5676fe3b3..6b03a1a31 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -493,6 +493,10 @@ =/ vec ~[sndr.shot rcvr.shot sndr-life rcvr-life] ;; shut-packet %- cue %- need (~(de sivc:aes:crypto (shaz symmetric-key) vec) siv len cyf) +:: ordered map for tracking remote scry requests +:: +++ orm ((on @ud keen-state) lte) +:: +| %atomics :: +$ private-key @uwprivatekey @@ -1503,7 +1507,7 @@ ::NOTE we only send requests to ships we know, :: so we should only get responses from ships we know. :: below we assume sndr.shot is a known peer. - fi-abet:(on-hear:fi:(abed-got:pe sndr.shot) l shot) + abet:(on-hear-fine:(abed-got:pe sndr.shot) l shot) :: +on-hear-packet: handle mildly processed packet receipt :: ++ on-hear-packet @@ -1676,7 +1680,7 @@ :: XX use ev-trace? =/ =tape "; fine dropping malformed wire {}" (emit duct %pass /parse-wire %d %flog %text tape) - fi-abet:(on-pine-boon:fi:(abed-got:pe u.her) t.t.t.wire payload) + abet:(on-pine-boon:(abed-got:pe u.her) t.t.t.wire payload) :: ?~ parsed=(parse-bone-wire wire) ~> %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}" @@ -1890,9 +1894,7 @@ =/ peer-core (abed-peer:pe her.u.res u.state) ?- -.u.res %pump abet:(on-wake:peer-core bone.u.res error) - :: - %fine - fi-abet:ke-abet:ke-take-wake:(ke-abed:ke:fi:peer-core wire.u.res) + %fine abet:ke-abet:ke-take-wake:(ke-abed:ke:peer-core wire.u.res) == :: =. event-core (emit duct %pass /recork %b %wait `@da`(add now ~d1)) @@ -2143,10 +2145,10 @@ ++ meet-alien-fine |= [peens=(jug path ^duct) key=?(%keen %pine)] ^+ event-core - =+ fine=fi:(abed:pe ship) - =< fi-abet ^+ fine + =+ peer-core=(abed:pe ship) + =< abet ^+ peer-core %- ~(rep by peens) - |= [[=path ducts=(set ^duct)] cor=_fine] + |= [[=path ducts=(set ^duct)] cor=_peer-core] %- ~(rep in ducts) |= [=^duct c=_cor] %.([path duct] ?-(key %pine on-pine:c, %keen on-keen:c)) @@ -2251,7 +2253,7 @@ ?< =(our ship) ?^ ship-state=(~(get by peers.ames-state) ship) ?> ?=([%known *] u.ship-state) - fi-abet:(on-pine:fi:(abed-peer:pe ship +.u.ship-state) path duct) + abet:(on-pine:(abed-peer:pe ship +.u.ship-state) path duct) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(pines (~(put ju pines.todos) path duct)) @@ -2262,7 +2264,7 @@ =+ ~:(spit path) :: assert length ?^ ship-state=(~(get by peers.ames-state) ship) ?> ?=([%known *] u.ship-state) - fi-abet:(on-keen:fi:(abed-peer:pe ship +.u.ship-state) path duct) + abet:(on-keen:(abed-peer:pe ship +.u.ship-state) path duct) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(keens (~(put ju keens.todos) path duct)) @@ -2279,7 +2281,7 @@ :: %- (slog leaf+"ames: missing scry {} for {}" ~) event-core - fi-abet:ke-abet:(ke-unsub:(ke-abed:ke:fi:peer path) duct all) + abet:ke-abet:(ke-unsub:(ke-abed:ke:peer path) duct all) :: +| %implementation :: +enqueue-alien-todo: helper to enqueue a pending request @@ -2622,6 +2624,46 @@ :: abet:(call:(abed:mu bone) %wake ~) :: + ++ on-hear-fine + |= [=lane =shot] + ^+ peer-core + ?> =(sndr-tick.shot (mod life.peer-state 16)) + :: + =/ [=peep =meow] (sift-purr `@ux`content.shot) + =/ =path (slag 3 path.peep) + ?. (~(has by order.scry) path) + ~&(dead-response/peep peer-core) + =< ke-abet + (ke-rcv:(ke-abed:ke path) peep meow lane) + :: + ++ on-keen + |= [=path =^duct] + ^+ peer-core + ?: (~(has by order.scry) path) + ~> %slog.0^leaf/"fine: dupe {(spud path)}" + ke-abet:(ke-sub:(ke-abed:ke path) duct) + =^ keen-id=@ud seq.scry [seq.scry +(seq.scry)] + =. order.scry (~(put by order.scry) path keen-id) + =. keens.scry (put:orm keens.scry keen-id *keen-state) + ke-abet:(ke-start:(ke-abed:ke path) duct) + :: + ++ on-pine + |= [=path =^duct] + ^+ peer-core + ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) + !! :: XX: ??? + =/ =wire [%fine %pine (scot %p her) path] + (pe-emit duct %pass wire %a %plea her plea=[%$ /pine `*`u.blk]) + :: + ++ on-pine-boon + |= [=path payload=*] + ^+ peer-core + ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) + !! + =+ ;;(case=@ud payload) + =. cas.u.blk ud+case + (on-keen (slag 3 (en-path:balk u.blk)) duct) + :: +| %implementation :: +dedup-message: replace with any existing copy of this message :: @@ -3715,12 +3757,10 @@ (call %done ok=%.y) -- -- - :: +fi: constructor for |fine remote scry core + :: +ke: constructor for |keen remote scry core :: - ++ fi + ++ ke => |% - :: - ++ orm ((on @ud keen-state) lte) :: +gum: glue together a list of $byts into one :: :: TODO: move to hoon.hoon (see +cad in lib/tiny) @@ -3777,382 +3817,325 @@ -- -- :: - |% + |_ [=path keen-id=@ud keen=keen-state] :: +| %helpers :: - ++ fine-core . - ++ fi-abet abet :: XX +abet:pe, call directly instead? + ++ ke-core . + ++ ke-abed + |= p=^path + ~| no-keen-for-path/p + =. keen-id (~(got by order.scry) p) + ke-core(path p, keen (got:orm keens.scry keen-id)) + :: + ++ ke-abed-id + |= id=@ud + %- ke-abed + ~| no-path-for-id/id + %- need + ^- (unit ^path) + %- ~(rep by order.scry) + |= [[p=^path i=@ud] out=(unit ^path)] + ^- (unit ^path) + ?^ out out + ?:(=(id i) `p ~) + :: + ++ ke-abet + ^+ peer-core + ?: =, keen + :: num-fragments is 0 when unknown (i.e. no response yet) + :: if no-one is listening, kill request + :: + ?| =(~ listeners.keen) + &(!=(0 num-fragments) =(num-fragments num-received)) + == + ke-abet-gone + =. ke-core ke-set-wake + =. keens.scry (put:orm keens.scry keen-id keen) + peer-core + :: + ++ ke-abet-gone + =? ke-core ?=(^ next-wake.keen) + (ke-rest u.next-wake.keen) + =. keens.scry +:(del:orm keens.scry keen-id) + =. order.scry (~(del by order.scry) path) + peer-core + :: + ++ ke-full-path + :^ (scot %p her) + (scot %ud rift.peer-state) + (scot %ud life.peer-state) + path + :: + ++ ke-show + =, keen + :* nex=(lent nex) + hav=(lent hav) + num-fragments=num-fragments + num-received=num-received + next-wake=next-wake + metrics=metrics + == + :: + ++ ke-deq (deq want) + ++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen)) + ++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim)) + ++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim)) + ++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag)) + ++ ke-send + |=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot))) + :: + ++ ke-clean-up + |= [=^duct core=_event-core] + ?+ duct core + [[%clay *] *] core :: XX TODO? + :: + [[%gall %use app=@ *] *] + ?. ?=([%gall %use %spider *] -.duct) + core :: XX TODO? + ?> ?=([%gall %use %spider @ ship=@ %thread tid=@ *] -.duct) + =/ =cage spider-stop+!>([&7.-.duct |]) + =/ poke=* [%0 %m [p q.q]:cage] + =/ =plea [%g /ge/spider poke] + (emit:core duct %pass /fine/unsub %g %plea our plea) + == :: +| %entry-points :: - ++ on-hear - |= [=lane =shot] - ^+ fine-core - ?> =(sndr-tick.shot (mod life.peer-state 16)) + ++ ke-start + |= =^duct + ~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}" + =. ke-core (ke-sub duct) + ?> =(num-fragments.keen 0) + =/ fra=@ 1 + =/ req=hoot (ke-etch-keen fra) + =/ =want [fra req last=now tries=1 skips=0] + =. wan.keen (cons:ke-deq *(pha ^want) want) + :: =. metrics.keen (on-sent:ke-gauge 1) + (ke-send req) + :: + ++ ke-rcv + |= [[=full=^path num=@ud] =meow =lane:ames] + ^+ ke-core + =/ og ke-core + =. event-core (ke-update-qos %live last-contact=now) + :: handle empty + ?: =(0 num.meow) + ?> =(~ dat.meow) + (ke-done sig.meow ~) + :: update congestion, or fill details :: - =/ [=peep =meow] (sift-purr `@ux`content.shot) - =/ =path (slag 3 path.peep) - ?. (~(has by order.scry) path) - ~&(dead-response/peep fine-core) - =< ke-abet - (ke-rcv:(ke-abed:ke path) peep meow lane) - :: - ++ on-keen - |= [=path =^duct] - ^+ fine-core - ?: (~(has by order.scry) path) - ~> %slog.0^leaf/"fine: dupe {(spud path)}" - ke-abet:(ke-sub:(ke-abed:ke path) duct) - =^ keen-id=@ud seq.scry [seq.scry +(seq.scry)] - =. order.scry (~(put by order.scry) path keen-id) - =. keens.scry (put:orm keens.scry keen-id *keen-state) - ke-abet:(ke-start:(ke-abed:ke path) duct) - :: - ++ on-pine - |= [=path =^duct] - ^+ fine-core - ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) - !! :: XX: ??? - =/ =wire [%fine %pine (scot %p her) path] - =/ =plea [%$ /pine `*`u.blk] - =. event-core (emit duct %pass wire %a %plea her plea) - fine-core - :: - ++ on-pine-boon - |= [=path payload=*] - ^+ fine-core - ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) + =? ke-core =(0 num-fragments.keen) + ?> =(num 1) + (ke-first-rcv meow) + :: + ?. ?=([@ @ @ *] full-path) + ~| fine-path-too-short+full-path + !! + ?. =(`her (slaw %p i.full-path)) + ~| fine-path-bunk-ship+[full-path her] + !! + ?. =(`life.peer-state (slaw %ud i.t.t.full-path)) + ~| fine-path-bunk-life+[full-path life.peer-state] + !! + ?. =(`rift.peer-state (slaw %ud i.t.full-path)) + ~| fine-path-bunk-rift+[full-path rift.peer-state] + !! + ?. (veri-fra:keys [full-path num [dat sig]:meow]) + ~| fine-purr-fail-signature/num^`@ux`sig.meow + ~| life.peer-state !! - =+ ;;(case=@ud payload) - =. cas.u.blk ud+case - (on-keen (slag 3 (en-path:balk u.blk)) duct) - :: - +| %internal - :: - ++ ke - |_ [=path keen-id=@ud keen=keen-state] :: - +| %helpers - :: - ++ ke-core . - ++ ke-abed - |= p=^path - ~| no-keen-for-path/p - =. keen-id (~(got by order.scry) p) - ke-core(path p, keen (got:orm keens.scry keen-id)) - :: - ++ ke-abed-id - |= id=@ud - %- ke-abed - ~| no-path-for-id/id - %- need - ^- (unit ^path) - %- ~(rep by order.scry) - |= [[p=^path i=@ud] out=(unit ^path)] - ^- (unit ^path) - ?^ out out - ?:(=(id i) `p ~) - :: - ++ ke-abet - ^+ fine-core - ?: =, keen - :: num-fragments is 0 when unknown (i.e. no response yet) - :: if no-one is listening, kill request - :: - ?| =(~ listeners.keen) - &(!=(0 num-fragments) =(num-fragments num-received)) - == - ke-abet-gone - =. ke-core ke-set-wake - =. keens.scry (put:orm keens.scry keen-id keen) - fine-core - :: - ++ ke-abet-gone - =? ke-core ?=(^ next-wake.keen) - (ke-rest u.next-wake.keen) - =. keens.scry - +:(del:orm keens.scry keen-id) - =. order.scry - (~(del by order.scry) path) - fine-core - :: - ++ ke-full-path - :^ (scot %p her) - (scot %ud rift.peer-state) - (scot %ud life.peer-state) - path - :: - ++ ke-show - =, keen - :* nex=(lent nex) - hav=(lent hav) - num-fragments=num-fragments - num-received=num-received - next-wake=next-wake - metrics=metrics + =^ found=? ke-core (ke-on-ack num) + ?. found + (ke-fast-retransmit:og num) + =: hav.keen [[num meow] hav.keen] + num-received.keen +(num-received.keen) == - :: - ++ ke-deq (deq want) - ++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen)) - ++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim)) - ++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim)) - ++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag)) - ++ ke-send - |=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot))) - :: - ++ ke-clean-up - |= [=^duct core=_event-core] - ?+ duct core - [[%clay *] *] core :: XX TODO - :: - [[%gall %use app=@ *] *] - ?. ?=([%gall %use %spider *] -.duct) - core :: XX TODO - ?> ?=([%gall %use %spider @ ship=@ %thread tid=@ *] -.duct) - =/ =cage spider-stop+!>([&7.-.duct |]) - =/ poke=* [%0 %m [p q.q]:cage] - =/ =plea [%g /ge/spider poke] - (emit:core duct %pass /fine/unsub %g %plea our plea) - == - :: - +| %entry-points - :: - ++ ke-start - |= =^duct - ~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}" - =. ke-core (ke-sub duct) - ?> =(num-fragments.keen 0) - =/ fra=@ 1 - =/ req=hoot (ke-etch-keen fra) - =/ =want [fra req last=now tries=1 skips=0] - =. wan.keen (cons:ke-deq *(pha ^want) want) - :: =. metrics.keen (on-sent:ke-gauge 1) - (ke-send req) - :: - ++ ke-rcv - |= [[=full=^path num=@ud] =meow =lane:ames] - ^+ ke-core - =/ og ke-core - =. event-core (ke-update-qos %live last-contact=now) - :: handle empty - ?: =(0 num.meow) - ?> =(~ dat.meow) - (ke-done sig.meow ~) - :: update congestion, or fill details + ?. =(num-fragments num-received):keen + ke-continue + (ke-done [sig dat]:ke-sift-full) + :: + ++ ke-sub + |=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct))) + :: scry is autocancelled in +ke-abet if no more listeners + :: + ++ ke-unsub + |= [=^duct all=?] + ?. |(all (~(has in listeners.keen) duct)) + %. ke-core + :: XX TODO use trace, add fine flags? reuse ames? + (slog leaf/"fine: {} not a listener for {}" ~) + =? event-core all + :: notify all listeners by inspecting their + :: ducts and sending appropiate clean up moves :: - =? ke-core =(0 num-fragments.keen) - ?> =(num 1) - (ke-first-rcv meow) - :: - ?. ?=([@ @ @ *] full-path) - ~| fine-path-too-short+full-path - !! - ?. =(`her (slaw %p i.full-path)) - ~| fine-path-bunk-ship+[full-path her] - !! - ?. =(`life.peer-state (slaw %ud i.t.t.full-path)) - ~| fine-path-bunk-life+[full-path life.peer-state] - !! - ?. =(`rift.peer-state (slaw %ud i.t.full-path)) - ~| fine-path-bunk-rift+[full-path rift.peer-state] - !! - ?. (veri-fra:keys [full-path num [dat sig]:meow]) - ~| fine-purr-fail-signature/num^`@ux`sig.meow - ~| life.peer-state - !! - :: - =^ found=? ke-core (ke-on-ack num) + (~(rep in listeners.keen) ke-clean-up) + :: TODO: use ev-trace + %- (slog leaf/"fine: deleting {}" ~) + ke-core(listeners.keen ?:(all ~ (~(del in listeners.keen) duct))) + :: + +| %implementation + :: + ++ ke-on-ack + =| marked=(list want) + |= fra=@ud + ^- [? _ke-core] + =; [[found=? cor=_ke-core] wan=(pha want)] ?. found - (ke-fast-retransmit:og num) - =: hav.keen [[num meow] hav.keen] - num-received.keen +(num-received.keen) - == - ?. =(num-fragments num-received):keen - ke-continue - (ke-done [sig dat]:ke-sift-full) - :: - ++ ke-sub - |=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct))) - :: scry is autocancelled in +ke-abet if no more listeners - :: - ++ ke-unsub - |= [=^duct all=?] - ?. |(all (~(has in listeners.keen) duct)) - %. ke-core - :: XX TODO use trace, add fine flags? reuse ames? - (slog leaf/"fine: {} not a listener for {}" ~) - =? event-core all - :: notify all listeners by inspecting their - :: ducts and sending appropiate clean up moves - :: - (~(rep in listeners.keen) ke-clean-up) - :: TODO: use ev-trace - %- (slog leaf/"fine: deleting {}" ~) - ke-core(listeners.keen ?:(all ~ (~(del in listeners.keen) duct))) - :: - +| %implementation - :: - ++ ke-on-ack - =| marked=(list want) - |= fra=@ud - ^- [? _ke-core] - =; [[found=? cor=_ke-core] wan=(pha want)] - ?. found - [found ke-core] - [found cor(wan.keen wan)] - %^ (dip-left:ke-deq ,[found=? cor=_ke-core]) wan.keen - [| ke-core] - |= [[found=? cor=_ke-core] =want] - ^- [(unit _want) stop=? [found=? cor=_ke-core]] - =. ke-core cor - ?: =(fra fra.want) - =. metrics.keen - (on-ack:ke-gauge +>.want) - [~ %.y %.y ke-core] - =. skips.want +(skips.want) - =^ resend=? metrics.keen - (on-skipped-packet:ke-gauge +>.want) - ?. resend - [`want %.n found ke-core] - =. tries.want +(tries.want) - =. last-sent.want now - =. ke-core (ke-send hoot.want) + [found ke-core] + [found cor(wan.keen wan)] + %^ (dip-left:ke-deq ,[found=? cor=_ke-core]) wan.keen + [| ke-core] + |= [[found=? cor=_ke-core] =want] + ^- [(unit _want) stop=? [found=? cor=_ke-core]] + =. ke-core cor + ?: =(fra fra.want) + =. metrics.keen + (on-ack:ke-gauge +>.want) + [~ %.y %.y ke-core] + =. skips.want +(skips.want) + =^ resend=? metrics.keen + (on-skipped-packet:ke-gauge +>.want) + ?. resend [`want %.n found ke-core] - :: - ++ ke-done - |= [sig=@ data=$@(~ (cask))] - ?> (meri:keys ke-full-path sig data) - ~> %slog.0^leaf/"fine: done {(spud ke-full-path)}" - =/ listeners ~(tap in listeners.keen) - =/ dat=(unit (cask)) - ?~(data ~ `data) - |- ^+ ke-core - ?~ listeners - ke-core - =. event-core - (emit i.listeners %give %tune ke-full-path sig dat) - $(listeners t.listeners) - :: - ++ ke-first-rcv - |= =meow - ^+ ke-core - =- ke-core(keen -) - :: - =/ paz=(list want) - %+ turn (gulf 1 num.meow) - |= fra=@ud - ^- want - [fra (ke-etch-keen fra) now 0 0] - :: - %_ keen - num-fragments num.meow - nex (tail paz) - == - :: +ke-continue: send packets based on normal congestion flow - :: - ++ ke-continue - =| inx=@ud - =| sent=(list @ud) - =/ max num-slots:ke-gauge - |- ^+ ke-core - ?: |(=(~ nex.keen) =(inx max)) - ke-core - =^ =want nex.keen nex.keen - =. last-sent.want now - =. tries.want +(tries.want) - =. wan.keen (snoc:ke-deq wan.keen want) - :: =. metrics.keen (on-sent:ke-gauge 1) - =. ke-core (ke-send hoot.want) - $(inx +(inx)) - :: - ++ ke-sift-full - =, keen - ~| %frag-mismatch - ~| have/num-received - ~| need/num-fragments - ~| path/path - ?> =(num-fragments num-received) - ?> =((lent hav) num-received) - (sift-roar num-fragments hav) - :: - ++ ke-update-qos - |= new=qos - ^+ event-core - =^ old qos.peer-state [qos.peer-state new] - ?~ text=(qos-update-text her old new kay.veb ships.bug.channel) - event-core - :: print message - :: - (emit duct %pass /qos %d %flog %text u.text) - :: - ++ ke-fast-retransmit - |= fra=@ud - =; [cor=_ke-core wants=(pha want)] - cor(wan.keen wants) - %^ (dip-left:ke-deq ,cor=_ke-core) wan.keen - ke-core - |= [cor=_ke-core =want] - ^- [(unit ^want) stop=? cor=_ke-core] - ?. (lte fra.want fra) - [`want & cor] - ?: (gth (next-expiry:ke-gauge:cor +>.want) now) - [`want & cor] - =. last-sent.want now - =. cor (ke-send:cor hoot.want) - [`want | cor] - :: - ++ ke-pass-timer - |= =note - =/ =wire (welp /fine/behn/wake/(scot %p her) path) - ke-core(event-core (emit unix-duct.ames-state %pass wire note)) - :: - ++ ke-set-wake - ^+ ke-core - =/ next-wake=(unit @da) - =/ want=(unit want) (peek-left:ke-deq wan.keen) - ?~ want ~ - `(next-expiry:ke-gauge +>:u.want) - ?: =(next-wake next-wake.keen) - ke-core - =? ke-core !=(~ next-wake.keen) - =/ old (need next-wake.keen) - =. next-wake.keen ~ - (ke-rest old) - =? ke-core ?=(^ next-wake) - =. next-wake.keen next-wake - (ke-wait u.next-wake) + =. tries.want +(tries.want) + =. last-sent.want now + =. ke-core (ke-send hoot.want) + [`want %.n found ke-core] + :: + ++ ke-done + |= [sig=@ data=$@(~ (cask))] + ?> (meri:keys ke-full-path sig data) + ~> %slog.0^leaf/"fine: done {(spud ke-full-path)}" + =/ listeners ~(tap in listeners.keen) + =/ dat=(unit (cask)) + ?~(data ~ `data) + |- ^+ ke-core + ?~ listeners ke-core - :: +ke-take-wake: handle request packet timeout + =. event-core + (emit i.listeners %give %tune ke-full-path sig dat) + $(listeners t.listeners) + :: + ++ ke-first-rcv + |= =meow + ^+ ke-core + =- ke-core(keen -) :: - ++ ke-take-wake - ^+ ke-core + =/ paz=(list want) + %+ turn (gulf 1 num.meow) + |= fra=@ud + ^- want + [fra (ke-etch-keen fra) now 0 0] + :: + %_ keen + num-fragments num.meow + nex (tail paz) + == + :: +ke-continue: send packets based on normal congestion flow + :: + ++ ke-continue + =| inx=@ud + =| sent=(list @ud) + =/ max num-slots:ke-gauge + |- ^+ ke-core + ?: |(=(~ nex.keen) =(inx max)) + ke-core + =^ =want nex.keen nex.keen + =. last-sent.want now + =. tries.want +(tries.want) + =. wan.keen (snoc:ke-deq wan.keen want) + :: =. metrics.keen (on-sent:ke-gauge 1) + =. ke-core (ke-send hoot.want) + $(inx +(inx)) + :: + ++ ke-sift-full + =, keen + ~| %frag-mismatch + ~| have/num-received + ~| need/num-fragments + ~| path/path + ?> =(num-fragments num-received) + ?> =((lent hav) num-received) + (sift-roar num-fragments hav) + :: + ++ ke-update-qos + |= new=qos + ^+ event-core + =^ old qos.peer-state [qos.peer-state new] + ?~ text=(qos-update-text her old new kay.veb ships.bug.channel) + event-core + :: print message + :: + (emit duct %pass /qos %d %flog %text u.text) + :: + ++ ke-fast-retransmit + |= fra=@ud + =; [cor=_ke-core wants=(pha want)] + cor(wan.keen wants) + %^ (dip-left:ke-deq ,cor=_ke-core) wan.keen + ke-core + |= [cor=_ke-core =want] + ^- [(unit ^want) stop=? cor=_ke-core] + ?. (lte fra.want fra) + [`want & cor] + ?: (gth (next-expiry:ke-gauge:cor +>.want) now) + [`want & cor] + =. last-sent.want now + =. cor (ke-send:cor hoot.want) + [`want | cor] + :: + ++ ke-pass-timer + |= =note + =/ =wire (welp /fine/behn/wake/(scot %p her) path) + ke-core(event-core (emit unix-duct.ames-state %pass wire note)) + :: + ++ ke-set-wake + ^+ ke-core + =/ next-wake=(unit @da) + =/ want=(unit want) (peek-left:ke-deq wan.keen) + ?~ want ~ + `(next-expiry:ke-gauge +>:u.want) + ?: =(next-wake next-wake.keen) + ke-core + =? ke-core !=(~ next-wake.keen) + =/ old (need next-wake.keen) =. next-wake.keen ~ - =. event-core %- ke-update-qos - =/ expiry=@da (add ~s30 last-contact.qos.peer-state) - =? -.qos.peer-state - (gte now expiry) - %dead - qos.peer-state - :: expire direct route - =? route.peer-state - ?& ?=(%dead -.qos.peer-state) - ?=(^ route.peer-state) - direct.u.route.peer-state - !=(%czar (clan:title her)) - == - route.peer-state(direct.u %.n) - =. metrics.keen on-timeout:ke-gauge - =^ want=(unit want) wan.keen - (pop-left:ke-deq wan.keen) - ~| %took-wake-for-empty-want - ?> ?=(^ want) - =: tries.u.want +(tries.u.want) - last-sent.u.want now + (ke-rest old) + =? ke-core ?=(^ next-wake) + =. next-wake.keen next-wake + (ke-wait u.next-wake) + ke-core + :: +ke-take-wake: handle request packet timeout + :: + ++ ke-take-wake + ^+ ke-core + =. next-wake.keen ~ + =. event-core %- ke-update-qos + =/ expiry=@da (add ~s30 last-contact.qos.peer-state) + =? -.qos.peer-state + (gte now expiry) + %dead + qos.peer-state + :: expire direct route + =? route.peer-state + ?& ?=(%dead -.qos.peer-state) + ?=(^ route.peer-state) + direct.u.route.peer-state + !=(%czar (clan:title her)) == - =. wan.keen (cons:ke-deq wan.keen u.want) - (ke-send hoot.u.want) - -- + route.peer-state(direct.u %.n) + =. metrics.keen on-timeout:ke-gauge + =^ want=(unit want) wan.keen + (pop-left:ke-deq wan.keen) + ~| %took-wake-for-empty-want + ?> ?=(^ want) + =: tries.u.want +(tries.u.want) + last-sent.u.want now + == + =. wan.keen (cons:ke-deq wan.keen u.want) + (ke-send hoot.u.want) -- :: +ga: constructor for |pump-gauge congestion control core ::