ames: reorder |keen arms

This commit is contained in:
yosoyubik 2023-03-24 13:59:29 +01:00
parent 3269192b29
commit 140e5d2b5f

View File

@ -1490,9 +1490,7 @@
^+ event-core ^+ event-core
=/ =shot (sift-shot b) =/ =shot (sift-shot b)
?: sam.shot (on-hear-packet l shot d) ?: sam.shot (on-hear-packet l shot d)
?: req.shot ?: req.shot ~|([%fine %request-events-forbidden] !!)
~|([%fine %request-events-forbidden] !!)
:: (on-hear-response:fine l shot d)
?^ d ?^ d
::TODO handle ::TODO handle
~& [%fine %done-goofed mote.u.d] ~& [%fine %done-goofed mote.u.d]
@ -1502,8 +1500,7 @@
::NOTE we only send requests to ships we know, ::NOTE we only send requests to ships we know,
:: so we should only get responses from ships we know. :: so we should only get responses from ships we know.
:: below we assume sndr.shot is a known peer. :: below we assume sndr.shot is a known peer.
=* from sndr.shot fi-abet:(on-hear:fi:(abed-got:pe sndr.shot) l shot)
fi-abet:(on-hear:fi:(abed-got:pe from) l shot)
:: +on-hear-packet: handle mildly processed packet receipt :: +on-hear-packet: handle mildly processed packet receipt
:: ::
++ on-hear-packet ++ on-hear-packet
@ -1672,7 +1669,6 @@
|= [=wire payload=*] |= [=wire payload=*]
^+ event-core ^+ event-core
?: ?=([%fine %pine @ *] wire) ?: ?=([%fine %pine @ *] wire)
:: XX TODO refactor
?~ her=(slaw %p i.t.t.wire) ?~ her=(slaw %p i.t.t.wire)
:: XX use ev-trace? :: XX use ev-trace?
=/ =tape "; fine dropping malformed wire {<wire>}" =/ =tape "; fine dropping malformed wire {<wire>}"
@ -3716,7 +3712,7 @@
(call %done ok=%.y) (call %done ok=%.y)
-- --
-- --
:: +fi: construct |fine remote scry core :: +fi: constructor for |fine remote scry core
:: ::
++ fi ++ fi
=> |% => |%
@ -3830,50 +3826,12 @@
(on-keen (slag 3 (en-path:balk u.blk)) duct) (on-keen (slag 3 (en-path:balk u.blk)) duct)
:: ::
+| %internal +| %internal
:: XX TODO rethink core naming/structure to follow current ames
++ ke ++ ke
|_ $: =path |_ [=path keen-id=@ud keen=keen-state]
keen-id=@ud ::
keen=keen-state +| %helpers
==
:: ::
++ ke-core . ++ ke-core .
++ ke-abet
^+ fine-core
=/ gone=?
=, keen
:: num-fragments is 0 when unknown (i.e. no response
:: yet)
:: if no-one is listening, kill request
?| =(~ listeners.keen)
&(!=(0 num-fragments) =(num-fragments num-received))
==
?: gone
ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry
(put:orm keens.scry keen-id keen)
fine-core
::
++ ke-show
=, keen
:* nex=(lent nex)
hav=(lent hav)
num-fragments=num-fragments
num-received=num-received
next-wake=next-wake
metrics=metrics
==
::
++ ke-abet-gone
=? ke-core ?=(^ next-wake.keen)
(ke-rest u.next-wake.keen)
=. keens.scry
+:(del:orm keens.scry keen-id)
=. order.scry
(~(del by order.scry) path)
fine-core
::
++ ke-abed ++ ke-abed
|= p=^path |= p=^path
~| no-keen-for-path/p ~| no-keen-for-path/p
@ -3892,26 +3850,119 @@
?^ out out ?^ out out
?:(=(id i) `p ~) ?:(=(id i) `p ~)
:: ::
++ ke-deq (deq want) ++ ke-abet
^+ fine-core
=/ gone=?
=, keen
:: num-fragments is 0 when unknown (i.e. no response yet)
:: if no-one is listening, kill request
?| =(~ listeners.keen)
&(!=(0 num-fragments) =(num-fragments num-received))
==
?: gone
ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry
(put:orm keens.scry keen-id keen)
fine-core
::
++ ke-abet-gone
=? ke-core ?=(^ next-wake.keen)
(ke-rest u.next-wake.keen)
=. keens.scry
+:(del:orm keens.scry keen-id)
=. order.scry
(~(del by order.scry) path)
fine-core
::
++ ke-full-path ++ ke-full-path
:^ (scot %p her) :^ (scot %p her)
(scot %ud rift.peer-state) (scot %ud rift.peer-state)
(scot %ud life.peer-state) (scot %ud life.peer-state)
path path
:: ::
++ ke-update-qos ++ ke-show
|= =new=qos =, keen
^+ event-core :* nex=(lent nex)
=^ old-qos qos.peer-state [qos.peer-state new-qos] hav=(lent hav)
?~ text=(qos-update-text her old-qos new-qos kay.veb ships.bug.channel) num-fragments=num-fragments
event-core num-received=num-received
:: print message next-wake=next-wake
metrics=metrics
==
:: ::
(emit duct %pass /qos %d %flog %text u.text) ++ ke-deq (deq want)
++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen))
++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim))
++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim))
++ ke-etch-keen |=(frag=@ud (etch-keen ke-full-path frag))
++ ke-send
|=(=hoot ke-core(event-core (send-blob for=| her `@ux`hoot)))
:: ::
++ ke-etch-keen +| %entry-points
|= frag=@ud ::
(etch-keen ke-full-path frag) ++ ke-start
|= =^duct
~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}"
=. ke-core (ke-sub duct)
?> =(num-fragments.keen 0)
=/ fra=@ 1
=/ req (ke-etch-keen fra)
=/ =want [fra req now 1 0]
=. wan.keen (cons:ke-deq *(pha ^want) want)
:: =. metrics.keen (on-sent:ke-gauge 1)
(ke-send req)
::
++ ke-rcv
|= [[=full=^path num=@ud] =meow =lane:ames]
^+ ke-core
=/ og ke-core
=. event-core (ke-update-qos %live last-contact=now)
:: handle empty
?: =(0 num.meow)
?> =(~ dat.meow)
(ke-done sig.meow ~)
:: update congestion, or fill details
::
=? ke-core =(0 num-fragments.keen)
?> =(num 1)
(ke-first-rcv meow)
::
?. ?=([@ @ @ *] full-path)
~| fine-path-too-short+full-path
!!
?. =(`her (slaw %p i.full-path))
~| fine-path-bunk-ship+[full-path her]
!!
?. =(`life.peer-state (slaw %ud i.t.t.full-path))
~| fine-path-bunk-life+[full-path life.peer-state]
!!
?. =(`rift.peer-state (slaw %ud i.t.full-path))
~| fine-path-bunk-rift+[full-path rift.peer-state]
!!
?. (veri-fra:keys [full-path num [dat sig]:meow])
~| fine-purr-fail-signature/num^`@ux`sig.meow
~| life.peer-state
!!
::
=^ found=? ke-core (ke-on-ack num)
?. found
(ke-fast-retransmit:og num)
=: hav.keen [[num meow] hav.keen]
num-received.keen +(num-received.keen)
==
?. =(num-fragments num-received):keen
ke-continue
(ke-done [sig dat]:ke-sift-full)
::
++ ke-sub
|=(=^duct ke-core(listeners.keen (~(put in listeners.keen) duct)))
:: scry is autocancelled in +ke-abet if no more listeners
::
++ ke-unsub
|=(=^duct ke-core(listeners.keen (~(del in listeners.keen) duct)))
::
+| %implementation
:: ::
++ ke-on-ack ++ ke-on-ack
=| marked=(list want) =| marked=(list want)
@ -3940,18 +3991,6 @@
=. ke-core (ke-send hoot.want) =. ke-core (ke-send hoot.want)
[`want %.n found ke-core] [`want %.n found ke-core]
:: ::
++ ke-start
|= =^duct
~> %slog.0^leaf/"fine: keen {(spud ke-full-path)}"
=. ke-core (ke-sub duct)
?> =(num-fragments.keen 0)
=/ fra=@ 1
=/ req (ke-etch-keen fra)
=/ =want [fra req now 1 0]
=. wan.keen (cons:ke-deq *(pha ^want) want)
:: =. metrics.keen (on-sent:ke-gauge 1)
(ke-send req)
::
++ ke-done ++ ke-done
|= [sig=@ data=$@(~ (cask))] |= [sig=@ data=$@(~ (cask))]
?> (meri:keys ke-full-path sig data) ?> (meri:keys ke-full-path sig data)
@ -3998,18 +4037,7 @@
=. ke-core (ke-send hoot.want) =. ke-core (ke-send hoot.want)
$(inx +(inx)) $(inx +(inx))
:: ::
++ ke-sub
|= =^duct
ke-core(listeners.keen (~(put in listeners.keen) duct))
:: scry is autocancelled in +ke-abet if no more listeners
:: ::
++ ke-unsub
|= =^duct
ke-core(listeners.keen (~(del in listeners.keen) duct))
::
++ ke-send
|= =hoot
ke-core(event-core (send-blob for=| her `@ux`hoot))
:: ::
++ ke-sift-full ++ ke-sift-full
=, keen =, keen
@ -4021,47 +4049,15 @@
?> =((lent hav) num-received) ?> =((lent hav) num-received)
(sift-roar num-fragments hav) (sift-roar num-fragments hav)
:: ::
++ ke-rcv ++ ke-update-qos
|= [[=full=^path num=@ud] =meow =lane:ames] |= new=qos
^+ ke-core ^+ event-core
=/ og ke-core =^ old qos.peer-state [qos.peer-state new]
=. event-core (ke-update-qos %live last-contact=now) ?~ text=(qos-update-text her old new kay.veb ships.bug.channel)
:: handle empty event-core
?: =(0 num.meow) :: print message
?> =(~ dat.meow)
(ke-done sig.meow ~)
:: update congestion, or fill details
:: ::
=? ke-core =(0 num-fragments.keen) (emit duct %pass /qos %d %flog %text u.text)
?> =(num 1)
(ke-first-rcv meow)
::
?. ?=([@ @ @ *] full-path)
~| fine-path-too-short+full-path
!!
?. =(`her (slaw %p i.full-path))
~| fine-path-bunk-ship+[full-path her]
!!
?. =(`life.peer-state (slaw %ud i.t.t.full-path))
~| fine-path-bunk-life+[full-path life.peer-state]
!!
?. =(`rift.peer-state (slaw %ud i.t.full-path))
~| fine-path-bunk-rift+[full-path rift.peer-state]
!!
?. (veri-fra:keys [full-path num [dat sig]:meow])
~| fine-purr-fail-signature/num^`@ux`sig.meow
~| life.peer-state
!!
::
=^ found=? ke-core (ke-on-ack num)
?. found
(ke-fast-retransmit:og num)
=: hav.keen [[num meow] hav.keen]
num-received.keen +(num-received.keen)
==
?. =(num-fragments num-received):keen
ke-continue
(ke-done [sig dat]:ke-sift-full)
:: ::
++ ke-fast-retransmit ++ ke-fast-retransmit
|= fra=@ud |= fra=@ud
@ -4079,17 +4075,11 @@
=. cor (ke-send:cor hoot.want) =. cor (ke-send:cor hoot.want)
[`want | cor] [`want | cor]
:: ::
++ ke-gauge (ga metrics.keen (wyt:ke-deq wan.keen))
::
++ ke-timer-wire
`wire`(welp /fine/behn/wake/(scot %p her) path)
::
++ ke-pass-timer ++ ke-pass-timer
|= =note |= =note
ke-core(event-core (emit unix-duct.ames-state %pass ke-timer-wire note)) =/ =wire (welp /fine/behn/wake/(scot %p her) path)
ke-core(event-core (emit unix-duct.ames-state %pass wire note))
:: ::
++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim))
++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim))
++ ke-set-wake ++ ke-set-wake
^+ ke-core ^+ ke-core
=/ next-wake=(unit @da) =/ next-wake=(unit @da)
@ -4137,7 +4127,7 @@
(ke-send hoot.u.want) (ke-send hoot.u.want)
-- --
-- --
:: +ga: construct |pump-gauge congestion control core :: +ga: constructor for |pump-gauge congestion control core
:: ::
++ ga ++ ga
|= [pump-metrics live-packets=@ud] |= [pump-metrics live-packets=@ud]