1
0
mirror of https://github.com/ilyakooo0/urbit.git synced 2024-12-21 22:01:46 +03:00

ames: remove |fi core

The entry point arms of |fi are moved to the |pe core and
|ke is now called directly so we avoid doing e.g. abed:ke:fi:peer
This commit is contained in:
yosoyubik 2023-04-05 10:27:47 +02:00
parent 08170068b0
commit d137d78465

View File

@ -493,6 +493,10 @@
=/ vec ~[sndr.shot rcvr.shot sndr-life rcvr-life]
;; shut-packet %- cue %- need
(~(de sivc:aes:crypto (shaz symmetric-key) vec) siv len cyf)
:: ordered map for tracking remote scry requests
::
++ orm ((on @ud keen-state) lte)
::
+| %atomics
::
+$ private-key @uwprivatekey
@ -1503,7 +1507,7 @@
::NOTE we only send requests to ships we know,
:: so we should only get responses from ships we know.
:: below we assume sndr.shot is a known peer.
fi-abet:(on-hear:fi:(abed-got:pe sndr.shot) l shot)
abet:(on-hear-fine:(abed-got:pe sndr.shot) l shot)
:: +on-hear-packet: handle mildly processed packet receipt
::
++ on-hear-packet
@ -1676,7 +1680,7 @@
:: XX use ev-trace?
=/ =tape "; fine dropping malformed wire {<wire>}"
(emit duct %pass /parse-wire %d %flog %text tape)
fi-abet:(on-pine-boon:fi:(abed-got:pe u.her) t.t.t.wire payload)
abet:(on-pine-boon:(abed-got:pe u.her) t.t.t.wire payload)
::
?~ parsed=(parse-bone-wire wire)
~> %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}"
@ -1890,9 +1894,7 @@
=/ peer-core (abed-peer:pe her.u.res u.state)
?- -.u.res
%pump abet:(on-wake:peer-core bone.u.res error)
::
%fine
fi-abet:ke-abet:ke-take-wake:(ke-abed:ke:fi:peer-core wire.u.res)
%fine abet:ke-abet:ke-take-wake:(ke-abed:ke:peer-core wire.u.res)
==
::
=. event-core (emit duct %pass /recork %b %wait `@da`(add now ~d1))
@ -2143,10 +2145,10 @@
++ meet-alien-fine
|= [peens=(jug path ^duct) key=?(%keen %pine)]
^+ event-core
=+ fine=fi:(abed:pe ship)
=< fi-abet ^+ fine
=+ peer-core=(abed:pe ship)
=< abet ^+ peer-core
%- ~(rep by peens)
|= [[=path ducts=(set ^duct)] cor=_fine]
|= [[=path ducts=(set ^duct)] cor=_peer-core]
%- ~(rep in ducts)
|= [=^duct c=_cor]
%.([path duct] ?-(key %pine on-pine:c, %keen on-keen:c))
@ -2251,7 +2253,7 @@
?< =(our ship)
?^ ship-state=(~(get by peers.ames-state) ship)
?> ?=([%known *] u.ship-state)
fi-abet:(on-pine:fi:(abed-peer:pe ship +.u.ship-state) path duct)
abet:(on-pine:(abed-peer:pe ship +.u.ship-state) path duct)
%+ enqueue-alien-todo ship
|= todos=alien-agenda
todos(pines (~(put ju pines.todos) path duct))
@ -2262,7 +2264,7 @@
=+ ~:(spit path) :: assert length
?^ ship-state=(~(get by peers.ames-state) ship)
?> ?=([%known *] u.ship-state)
fi-abet:(on-keen:fi:(abed-peer:pe ship +.u.ship-state) path duct)
abet:(on-keen:(abed-peer:pe ship +.u.ship-state) path duct)
%+ enqueue-alien-todo ship
|= todos=alien-agenda
todos(keens (~(put ju keens.todos) path duct))
@ -2279,7 +2281,7 @@
::
%- (slog leaf+"ames: missing scry {<path>} for {<ship>}" ~)
event-core
fi-abet:ke-abet:(ke-unsub:(ke-abed:ke:fi:peer path) duct all)
abet:ke-abet:(ke-unsub:(ke-abed:ke:peer path) duct all)
::
+| %implementation
:: +enqueue-alien-todo: helper to enqueue a pending request
@ -2622,6 +2624,46 @@
::
abet:(call:(abed:mu bone) %wake ~)
::
++ on-hear-fine
|= [=lane =shot]
^+ peer-core
?> =(sndr-tick.shot (mod life.peer-state 16))
::
=/ [=peep =meow] (sift-purr `@ux`content.shot)
=/ =path (slag 3 path.peep)
?. (~(has by order.scry) path)
~&(dead-response/peep peer-core)
=< ke-abet
(ke-rcv:(ke-abed:ke path) peep meow lane)
::
++ on-keen
|= [=path =^duct]
^+ peer-core
?: (~(has by order.scry) path)
~> %slog.0^leaf/"fine: dupe {(spud path)}"
ke-abet:(ke-sub:(ke-abed:ke path) duct)
=^ keen-id=@ud seq.scry [seq.scry +(seq.scry)]
=. order.scry (~(put by order.scry) path keen-id)
=. keens.scry (put:orm keens.scry keen-id *keen-state)
ke-abet:(ke-start:(ke-abed:ke path) duct)
::
++ on-pine
|= [=path =^duct]
^+ peer-core
?~ blk=(de-part:balk her rift.peer-state life.peer-state path)
!! :: XX: ???
=/ =wire [%fine %pine (scot %p her) path]
(pe-emit duct %pass wire %a %plea her plea=[%$ /pine `*`u.blk])
::
++ on-pine-boon
|= [=path payload=*]
^+ peer-core
?~ blk=(de-part:balk her rift.peer-state life.peer-state path)
!!
=+ ;;(case=@ud payload)
=. cas.u.blk ud+case
(on-keen (slag 3 (en-path:balk u.blk)) duct)
::
+| %implementation
:: +dedup-message: replace with any existing copy of this message
::
@ -3715,12 +3757,10 @@
(call %done ok=%.y)
--
--
:: +fi: constructor for |fine remote scry core
:: +ke: constructor for |keen remote scry core
::
++ fi
++ ke
=> |%
::
++ orm ((on @ud keen-state) lte)
:: +gum: glue together a list of $byts into one
::
:: TODO: move to hoon.hoon (see +cad in lib/tiny)
@ -3777,382 +3817,325 @@
--
--
::
|%
|_ [=path keen-id=@ud keen=keen-state]
::
+| %helpers
::
++ fine-core .
++ fi-abet abet :: XX +abet:pe, call directly instead?
++ ke-core .
++ ke-abed
|= p=^path
~| no-keen-for-path/p
=. keen-id (~(got by order.scry) p)
ke-core(path p, keen (got:orm keens.scry keen-id))
::
++ ke-abed-id
|= id=@ud
%- ke-abed
~| no-path-for-id/id
%- need
^- (unit ^path)
%- ~(rep by order.scry)
|= [[p=^path i=@ud] out=(unit ^path)]
^- (unit ^path)
?^ out out
?:(=(id i) `p ~)
::
++ ke-abet
^+ peer-core
?: =, keen
:: num-fragments is 0 when unknown (i.e. no response yet)
:: if no-one is listening, kill request
::
?| =(~ listeners.keen)
&(!=(0 num-fragments) =(num-fragments num-received))
==
ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry (put:orm keens.scry keen-id keen)
peer-core
::
++ ke-abet-gone
=? ke-core ?=(^ next-wake.keen)
(ke-rest u.next-wake.keen)
=. keens.scry +:(del:orm keens.scry keen-id)
=. order.scry (~(del by order.scry) path)
peer-core
::
++ ke-full-path
:^ (scot %p her)
(scot %ud rift.peer-state)
(scot %ud life.peer-state)
path
::
++ ke-show
=, keen
:* nex=(lent nex)
hav=(lent hav)
num-fragments=num-fragments
num-received=num-received
next-wake=next-wake
metrics=metrics
==
::
++ ke-deq (deq want)
++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen))
++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim))
++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim))
++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag))
++ ke-send
|=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot)))
::
++ ke-clean-up
|= [=^duct core=_event-core]
?+ duct core
[[%clay *] *] core :: XX TODO?
::
[[%gall %use app=@ *] *]
?. ?=([%gall %use %spider *] -.duct)
core :: XX TODO?
?> ?=([%gall %use %spider @ ship=@ %thread tid=@ *] -.duct)
=/ =cage spider-stop+!>([&7.-.duct |])
=/ poke=* [%0 %m [p q.q]:cage]
=/ =plea [%g /ge/spider poke]
(emit:core duct %pass /fine/unsub %g %plea our plea)
==
::
+| %entry-points
::
++ on-hear
|= [=lane =shot]
^+ fine-core
?> =(sndr-tick.shot (mod life.peer-state 16))
++ ke-start
|= =^duct
~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}"
=. ke-core (ke-sub duct)
?> =(num-fragments.keen 0)
=/ fra=@ 1
=/ req=hoot (ke-etch-keen fra)
=/ =want [fra req last=now tries=1 skips=0]
=. wan.keen (cons:ke-deq *(pha ^want) want)
:: =. metrics.keen (on-sent:ke-gauge 1)
(ke-send req)
::
++ ke-rcv
|= [[=full=^path num=@ud] =meow =lane:ames]
^+ ke-core
=/ og ke-core
=. event-core (ke-update-qos %live last-contact=now)
:: handle empty
?: =(0 num.meow)
?> =(~ dat.meow)
(ke-done sig.meow ~)
:: update congestion, or fill details
::
=/ [=peep =meow] (sift-purr `@ux`content.shot)
=/ =path (slag 3 path.peep)
?. (~(has by order.scry) path)
~&(dead-response/peep fine-core)
=< ke-abet
(ke-rcv:(ke-abed:ke path) peep meow lane)
::
++ on-keen
|= [=path =^duct]
^+ fine-core
?: (~(has by order.scry) path)
~> %slog.0^leaf/"fine: dupe {(spud path)}"
ke-abet:(ke-sub:(ke-abed:ke path) duct)
=^ keen-id=@ud seq.scry [seq.scry +(seq.scry)]
=. order.scry (~(put by order.scry) path keen-id)
=. keens.scry (put:orm keens.scry keen-id *keen-state)
ke-abet:(ke-start:(ke-abed:ke path) duct)
::
++ on-pine
|= [=path =^duct]
^+ fine-core
?~ blk=(de-part:balk her rift.peer-state life.peer-state path)
!! :: XX: ???
=/ =wire [%fine %pine (scot %p her) path]
=/ =plea [%$ /pine `*`u.blk]
=. event-core (emit duct %pass wire %a %plea her plea)
fine-core
::
++ on-pine-boon
|= [=path payload=*]
^+ fine-core
?~ blk=(de-part:balk her rift.peer-state life.peer-state path)
=? ke-core =(0 num-fragments.keen)
?> =(num 1)
(ke-first-rcv meow)
::
?. ?=([@ @ @ *] full-path)
~| fine-path-too-short+full-path
!!
?. =(`her (slaw %p i.full-path))
~| fine-path-bunk-ship+[full-path her]
!!
?. =(`life.peer-state (slaw %ud i.t.t.full-path))
~| fine-path-bunk-life+[full-path life.peer-state]
!!
?. =(`rift.peer-state (slaw %ud i.t.full-path))
~| fine-path-bunk-rift+[full-path rift.peer-state]
!!
?. (veri-fra:keys [full-path num [dat sig]:meow])
~| fine-purr-fail-signature/num^`@ux`sig.meow
~| life.peer-state
!!
=+ ;;(case=@ud payload)
=. cas.u.blk ud+case
(on-keen (slag 3 (en-path:balk u.blk)) duct)
::
+| %internal
::
++ ke
|_ [=path keen-id=@ud keen=keen-state]
::
+| %helpers
::
++ ke-core .
++ ke-abed
|= p=^path
~| no-keen-for-path/p
=. keen-id (~(got by order.scry) p)
ke-core(path p, keen (got:orm keens.scry keen-id))
::
++ ke-abed-id
|= id=@ud
%- ke-abed
~| no-path-for-id/id
%- need
^- (unit ^path)
%- ~(rep by order.scry)
|= [[p=^path i=@ud] out=(unit ^path)]
^- (unit ^path)
?^ out out
?:(=(id i) `p ~)
::
++ ke-abet
^+ fine-core
?: =, keen
:: num-fragments is 0 when unknown (i.e. no response yet)
:: if no-one is listening, kill request
::
?| =(~ listeners.keen)
&(!=(0 num-fragments) =(num-fragments num-received))
==
ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry (put:orm keens.scry keen-id keen)
fine-core
::
++ ke-abet-gone
=? ke-core ?=(^ next-wake.keen)
(ke-rest u.next-wake.keen)
=. keens.scry
+:(del:orm keens.scry keen-id)
=. order.scry
(~(del by order.scry) path)
fine-core
::
++ ke-full-path
:^ (scot %p her)
(scot %ud rift.peer-state)
(scot %ud life.peer-state)
path
::
++ ke-show
=, keen
:* nex=(lent nex)
hav=(lent hav)
num-fragments=num-fragments
num-received=num-received
next-wake=next-wake
metrics=metrics
=^ found=? ke-core (ke-on-ack num)
?. found
(ke-fast-retransmit:og num)
=: hav.keen [[num meow] hav.keen]
num-received.keen +(num-received.keen)
==
::
++ ke-deq (deq want)
++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen))
++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim))
++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim))
++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag))
++ ke-send
|=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot)))
::
++ ke-clean-up
|= [=^duct core=_event-core]
?+ duct core
[[%clay *] *] core :: XX TODO
::
[[%gall %use app=@ *] *]
?. ?=([%gall %use %spider *] -.duct)
core :: XX TODO
?> ?=([%gall %use %spider @ ship=@ %thread tid=@ *] -.duct)
=/ =cage spider-stop+!>([&7.-.duct |])
=/ poke=* [%0 %m [p q.q]:cage]
=/ =plea [%g /ge/spider poke]
(emit:core duct %pass /fine/unsub %g %plea our plea)
==
::
+| %entry-points
::
++ ke-start
|= =^duct
~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}"
=. ke-core (ke-sub duct)
?> =(num-fragments.keen 0)
=/ fra=@ 1
=/ req=hoot (ke-etch-keen fra)
=/ =want [fra req last=now tries=1 skips=0]
=. wan.keen (cons:ke-deq *(pha ^want) want)
:: =. metrics.keen (on-sent:ke-gauge 1)
(ke-send req)
::
++ ke-rcv
|= [[=full=^path num=@ud] =meow =lane:ames]
^+ ke-core
=/ og ke-core
=. event-core (ke-update-qos %live last-contact=now)
:: handle empty
?: =(0 num.meow)
?> =(~ dat.meow)
(ke-done sig.meow ~)
:: update congestion, or fill details
?. =(num-fragments num-received):keen
ke-continue
(ke-done [sig dat]:ke-sift-full)
::
++ ke-sub
|=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct)))
:: scry is autocancelled in +ke-abet if no more listeners
::
++ ke-unsub
|= [=^duct all=?]
?. |(all (~(has in listeners.keen) duct))
%. ke-core
:: XX TODO use trace, add fine flags? reuse ames?
(slog leaf/"fine: {<duct>} not a listener for {<path>}" ~)
=? event-core all
:: notify all listeners by inspecting their
:: ducts and sending appropiate clean up moves
::
=? ke-core =(0 num-fragments.keen)
?> =(num 1)
(ke-first-rcv meow)
::
?. ?=([@ @ @ *] full-path)
~| fine-path-too-short+full-path
!!
?. =(`her (slaw %p i.full-path))
~| fine-path-bunk-ship+[full-path her]
!!
?. =(`life.peer-state (slaw %ud i.t.t.full-path))
~| fine-path-bunk-life+[full-path life.peer-state]
!!
?. =(`rift.peer-state (slaw %ud i.t.full-path))
~| fine-path-bunk-rift+[full-path rift.peer-state]
!!
?. (veri-fra:keys [full-path num [dat sig]:meow])
~| fine-purr-fail-signature/num^`@ux`sig.meow
~| life.peer-state
!!
::
=^ found=? ke-core (ke-on-ack num)
(~(rep in listeners.keen) ke-clean-up)
:: TODO: use ev-trace
%- (slog leaf/"fine: deleting {<path>}" ~)
ke-core(listeners.keen ?:(all ~ (~(del in listeners.keen) duct)))
::
+| %implementation
::
++ ke-on-ack
=| marked=(list want)
|= fra=@ud
^- [? _ke-core]
=; [[found=? cor=_ke-core] wan=(pha want)]
?. found
(ke-fast-retransmit:og num)
=: hav.keen [[num meow] hav.keen]
num-received.keen +(num-received.keen)
==
?. =(num-fragments num-received):keen
ke-continue
(ke-done [sig dat]:ke-sift-full)
::
++ ke-sub
|=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct)))
:: scry is autocancelled in +ke-abet if no more listeners
::
++ ke-unsub
|= [=^duct all=?]
?. |(all (~(has in listeners.keen) duct))
%. ke-core
:: XX TODO use trace, add fine flags? reuse ames?
(slog leaf/"fine: {<duct>} not a listener for {<path>}" ~)
=? event-core all
:: notify all listeners by inspecting their
:: ducts and sending appropiate clean up moves
::
(~(rep in listeners.keen) ke-clean-up)
:: TODO: use ev-trace
%- (slog leaf/"fine: deleting {<path>}" ~)
ke-core(listeners.keen ?:(all ~ (~(del in listeners.keen) duct)))
::
+| %implementation
::
++ ke-on-ack
=| marked=(list want)
|= fra=@ud
^- [? _ke-core]
=; [[found=? cor=_ke-core] wan=(pha want)]
?. found
[found ke-core]
[found cor(wan.keen wan)]
%^ (dip-left:ke-deq ,[found=? cor=_ke-core]) wan.keen
[| ke-core]
|= [[found=? cor=_ke-core] =want]
^- [(unit _want) stop=? [found=? cor=_ke-core]]
=. ke-core cor
?: =(fra fra.want)
=. metrics.keen
(on-ack:ke-gauge +>.want)
[~ %.y %.y ke-core]
=. skips.want +(skips.want)
=^ resend=? metrics.keen
(on-skipped-packet:ke-gauge +>.want)
?. resend
[`want %.n found ke-core]
=. tries.want +(tries.want)
=. last-sent.want now
=. ke-core (ke-send hoot.want)
[found ke-core]
[found cor(wan.keen wan)]
%^ (dip-left:ke-deq ,[found=? cor=_ke-core]) wan.keen
[| ke-core]
|= [[found=? cor=_ke-core] =want]
^- [(unit _want) stop=? [found=? cor=_ke-core]]
=. ke-core cor
?: =(fra fra.want)
=. metrics.keen
(on-ack:ke-gauge +>.want)
[~ %.y %.y ke-core]
=. skips.want +(skips.want)
=^ resend=? metrics.keen
(on-skipped-packet:ke-gauge +>.want)
?. resend
[`want %.n found ke-core]
::
++ ke-done
|= [sig=@ data=$@(~ (cask))]
?> (meri:keys ke-full-path sig data)
~> %slog.0^leaf/"fine: done {(spud ke-full-path)}"
=/ listeners ~(tap in listeners.keen)
=/ dat=(unit (cask))
?~(data ~ `data)
|- ^+ ke-core
?~ listeners
ke-core
=. event-core
(emit i.listeners %give %tune ke-full-path sig dat)
$(listeners t.listeners)
::
++ ke-first-rcv
|= =meow
^+ ke-core
=- ke-core(keen -)
::
=/ paz=(list want)
%+ turn (gulf 1 num.meow)
|= fra=@ud
^- want
[fra (ke-etch-keen fra) now 0 0]
::
%_ keen
num-fragments num.meow
nex (tail paz)
==
:: +ke-continue: send packets based on normal congestion flow
::
++ ke-continue
=| inx=@ud
=| sent=(list @ud)
=/ max num-slots:ke-gauge
|- ^+ ke-core
?: |(=(~ nex.keen) =(inx max))
ke-core
=^ =want nex.keen nex.keen
=. last-sent.want now
=. tries.want +(tries.want)
=. wan.keen (snoc:ke-deq wan.keen want)
:: =. metrics.keen (on-sent:ke-gauge 1)
=. ke-core (ke-send hoot.want)
$(inx +(inx))
::
++ ke-sift-full
=, keen
~| %frag-mismatch
~| have/num-received
~| need/num-fragments
~| path/path
?> =(num-fragments num-received)
?> =((lent hav) num-received)
(sift-roar num-fragments hav)
::
++ ke-update-qos
|= new=qos
^+ event-core
=^ old qos.peer-state [qos.peer-state new]
?~ text=(qos-update-text her old new kay.veb ships.bug.channel)
event-core
:: print message
::
(emit duct %pass /qos %d %flog %text u.text)
::
++ ke-fast-retransmit
|= fra=@ud
=; [cor=_ke-core wants=(pha want)]
cor(wan.keen wants)
%^ (dip-left:ke-deq ,cor=_ke-core) wan.keen
ke-core
|= [cor=_ke-core =want]
^- [(unit ^want) stop=? cor=_ke-core]
?. (lte fra.want fra)
[`want & cor]
?: (gth (next-expiry:ke-gauge:cor +>.want) now)
[`want & cor]
=. last-sent.want now
=. cor (ke-send:cor hoot.want)
[`want | cor]
::
++ ke-pass-timer
|= =note
=/ =wire (welp /fine/behn/wake/(scot %p her) path)
ke-core(event-core (emit unix-duct.ames-state %pass wire note))
::
++ ke-set-wake
^+ ke-core
=/ next-wake=(unit @da)
=/ want=(unit want) (peek-left:ke-deq wan.keen)
?~ want ~
`(next-expiry:ke-gauge +>:u.want)
?: =(next-wake next-wake.keen)
ke-core
=? ke-core !=(~ next-wake.keen)
=/ old (need next-wake.keen)
=. next-wake.keen ~
(ke-rest old)
=? ke-core ?=(^ next-wake)
=. next-wake.keen next-wake
(ke-wait u.next-wake)
=. tries.want +(tries.want)
=. last-sent.want now
=. ke-core (ke-send hoot.want)
[`want %.n found ke-core]
::
++ ke-done
|= [sig=@ data=$@(~ (cask))]
?> (meri:keys ke-full-path sig data)
~> %slog.0^leaf/"fine: done {(spud ke-full-path)}"
=/ listeners ~(tap in listeners.keen)
=/ dat=(unit (cask))
?~(data ~ `data)
|- ^+ ke-core
?~ listeners
ke-core
:: +ke-take-wake: handle request packet timeout
=. event-core
(emit i.listeners %give %tune ke-full-path sig dat)
$(listeners t.listeners)
::
++ ke-first-rcv
|= =meow
^+ ke-core
=- ke-core(keen -)
::
++ ke-take-wake
^+ ke-core
=/ paz=(list want)
%+ turn (gulf 1 num.meow)
|= fra=@ud
^- want
[fra (ke-etch-keen fra) now 0 0]
::
%_ keen
num-fragments num.meow
nex (tail paz)
==
:: +ke-continue: send packets based on normal congestion flow
::
++ ke-continue
=| inx=@ud
=| sent=(list @ud)
=/ max num-slots:ke-gauge
|- ^+ ke-core
?: |(=(~ nex.keen) =(inx max))
ke-core
=^ =want nex.keen nex.keen
=. last-sent.want now
=. tries.want +(tries.want)
=. wan.keen (snoc:ke-deq wan.keen want)
:: =. metrics.keen (on-sent:ke-gauge 1)
=. ke-core (ke-send hoot.want)
$(inx +(inx))
::
++ ke-sift-full
=, keen
~| %frag-mismatch
~| have/num-received
~| need/num-fragments
~| path/path
?> =(num-fragments num-received)
?> =((lent hav) num-received)
(sift-roar num-fragments hav)
::
++ ke-update-qos
|= new=qos
^+ event-core
=^ old qos.peer-state [qos.peer-state new]
?~ text=(qos-update-text her old new kay.veb ships.bug.channel)
event-core
:: print message
::
(emit duct %pass /qos %d %flog %text u.text)
::
++ ke-fast-retransmit
|= fra=@ud
=; [cor=_ke-core wants=(pha want)]
cor(wan.keen wants)
%^ (dip-left:ke-deq ,cor=_ke-core) wan.keen
ke-core
|= [cor=_ke-core =want]
^- [(unit ^want) stop=? cor=_ke-core]
?. (lte fra.want fra)
[`want & cor]
?: (gth (next-expiry:ke-gauge:cor +>.want) now)
[`want & cor]
=. last-sent.want now
=. cor (ke-send:cor hoot.want)
[`want | cor]
::
++ ke-pass-timer
|= =note
=/ =wire (welp /fine/behn/wake/(scot %p her) path)
ke-core(event-core (emit unix-duct.ames-state %pass wire note))
::
++ ke-set-wake
^+ ke-core
=/ next-wake=(unit @da)
=/ want=(unit want) (peek-left:ke-deq wan.keen)
?~ want ~
`(next-expiry:ke-gauge +>:u.want)
?: =(next-wake next-wake.keen)
ke-core
=? ke-core !=(~ next-wake.keen)
=/ old (need next-wake.keen)
=. next-wake.keen ~
=. event-core %- ke-update-qos
=/ expiry=@da (add ~s30 last-contact.qos.peer-state)
=? -.qos.peer-state
(gte now expiry)
%dead
qos.peer-state
:: expire direct route
=? route.peer-state
?& ?=(%dead -.qos.peer-state)
?=(^ route.peer-state)
direct.u.route.peer-state
!=(%czar (clan:title her))
==
route.peer-state(direct.u %.n)
=. metrics.keen on-timeout:ke-gauge
=^ want=(unit want) wan.keen
(pop-left:ke-deq wan.keen)
~| %took-wake-for-empty-want
?> ?=(^ want)
=: tries.u.want +(tries.u.want)
last-sent.u.want now
(ke-rest old)
=? ke-core ?=(^ next-wake)
=. next-wake.keen next-wake
(ke-wait u.next-wake)
ke-core
:: +ke-take-wake: handle request packet timeout
::
++ ke-take-wake
^+ ke-core
=. next-wake.keen ~
=. event-core %- ke-update-qos
=/ expiry=@da (add ~s30 last-contact.qos.peer-state)
=? -.qos.peer-state
(gte now expiry)
%dead
qos.peer-state
:: expire direct route
=? route.peer-state
?& ?=(%dead -.qos.peer-state)
?=(^ route.peer-state)
direct.u.route.peer-state
!=(%czar (clan:title her))
==
=. wan.keen (cons:ke-deq wan.keen u.want)
(ke-send hoot.u.want)
--
route.peer-state(direct.u %.n)
=. metrics.keen on-timeout:ke-gauge
=^ want=(unit want) wan.keen
(pop-left:ke-deq wan.keen)
~| %took-wake-for-empty-want
?> ?=(^ want)
=: tries.u.want +(tries.u.want)
last-sent.u.want now
==
=. wan.keen (cons:ke-deq wan.keen u.want)
(ke-send hoot.u.want)
--
:: +ga: constructor for |pump-gauge congestion control core
::