Merge remote-tracking branch 'origin/yu/ames-fixes' into yu/gall-rq-wire-ames-flow-kill

This commit is contained in:
yosoyubik 2022-05-03 13:57:39 +02:00
commit 53e1c86833
6 changed files with 439 additions and 140 deletions

View File

@ -206,13 +206,14 @@
'ship'^(ship s)
'path'^(path p)
==
:: TODO: display subscription nonce
::
++ outgoing
|= =boat:gall
^- json
:- %a
%+ turn ~(tap by boat)
|= [[w=wire s=^ship t=term] [a=? p=^path]]
|= [[w=wire s=^ship t=term] [a=? p=^path nonce=@]]
%- pairs
:~ 'wire'^(path w)
'ship'^(ship s)

View File

@ -607,7 +607,7 @@
==
:_ state
%+ murn ~(tap by wex.bowl)
|= [[=wire =ship =term] [acked=? =path]]
|= [[=wire =ship =term] [acked=? =path nonce=@]]
^- (unit card)
?. ?& ?=([%thread @ *] wire)
=(tid i.t.wire)

View File

@ -351,6 +351,7 @@
:: %hear: packet from unix
:: %heed: track peer's responsiveness; gives %clog if slow
:: %jilt: stop tracking peer's responsiveness
:: %cork: request to delete message flow
:: %plea: request to send message
::
:: System and Lifecycle Tasks
@ -366,6 +367,7 @@
$% [%hear =lane =blob]
[%heed =ship]
[%jilt =ship]
[%cork =ship]
$>(%plea vane-task)
::
$>(%born vane-task)
@ -524,6 +526,8 @@
rcv=(map bone message-sink-state)
nax=(set [=bone =message-num])
heeds=(set duct)
closing=(set bone)
corked=(set bone)
==
:: $qos: quality of service; how is our connection to a peer doing?
::
@ -1653,7 +1657,7 @@
+$ bitt (map duct (pair ship path)) :: incoming subs
+$ boat :: outgoing subs
%+ map [=wire =ship =term] ::
[acked=? =path] ::
[acked=? =path nonce=@] ::
+$ bowl :: standard app state
$: $: our=ship :: host
src=ship :: guest

View File

@ -586,6 +586,24 @@
::
+$ naxplanation [=message-num =error]
::
+| %statics
::
:: $ames-state: state for entire vane
::
:: peers: states of connections to other ships
:: unix-duct: handle to give moves to unix
:: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing
:: bug: debug printing configuration
::
+$ ames-state
$: peers=(map ship ship-state)
=unix=duct
=life
crypto-core=acru:ames
=bug
==
::
+$ ames-state-4 ames-state-5
+$ ames-state-5
$: peers=(map ship ship-state-5)
@ -616,23 +634,34 @@
heeds=(set duct)
==
::
+| %statics
::
:: $ames-state: state for entire vane
::
:: peers: states of connections to other ships
:: unix-duct: handle to give moves to unix
:: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing
:: bug: debug printing configuration
::
+$ ames-state
$: peers=(map ship ship-state)
+$ ames-state-6
$: peers=(map ship ship-state-6)
=unix=duct
=life
crypto-core=acru:ames
=bug
==
::
+$ ship-state-6
$% [%alien alien-agenda]
[%known peer-state-6]
==
::
+$ peer-state-6
$: $: =symmetric-key
=life
=rift
=public-key
sponsor=ship
==
route=(unit [direct=? =lane])
=qos
=ossuary
snd=(map bone message-pump-state)
rcv=(map bone message-sink-state)
nax=(set [=bone =message-num])
heeds=(set duct)
==
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
@ -716,12 +745,14 @@
:: $message-pump-gift: effect from |message-pump
::
:: %done: report message acknowledgment
:: %cork: kill flow
:: %send: emit message fragment
:: %wait: set a new timer at .date
:: %rest: cancel timer at .date
::
+$ message-pump-gift
$% [%done =message-num error=(unit error)]
[%cork ~]
[%send =static-fragment]
[%wait date=@da]
[%rest date=@da]
@ -758,7 +789,7 @@
:: .ok: %.y unless previous failed attempt
::
+$ message-sink-task
$% [%done ok=?]
$% [%done ok=? cork=?]
[%drop =message-num]
[%hear =lane =shut-packet ok=?]
==
@ -770,6 +801,7 @@
+$ message-sink-gift
$% [%memo =message-num message=*]
[%send =message-num =ack-meat]
[%cork ~]
==
--
:: external vane interface
@ -779,7 +811,7 @@
::
=< =* adult-gate .
=| queued-events=(qeu queued-event)
=| cached-state=(unit [%5 ames-state-5])
=| cached-state=(unit [%6 ames-state-6])
::
|= [now=@da eny=@ rof=roof]
=* larval-gate .
@ -803,7 +835,7 @@
?: update-ready
=. ames-state.adult-gate
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
(state-6-to-7:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.1^leaf/"ames: metamorphosis reload"
[~ adult-gate]
@ -886,7 +918,7 @@
?: update-ready
=. ames-state.adult-gate
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
(state-6-to-7:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.1^leaf/"ames: metamorphosis reload"
[moves adult-gate]
@ -920,6 +952,13 @@
[%adult state=ames-state-5]
== ==
$: %6
$% $: %larva
events=(qeu queued-event)
state=ames-state-6
==
[%adult state=ames-state-6]
== ==
$: %7
$% $: %larva
events=(qeu queued-event)
state=_ames-state.adult-gate
@ -935,23 +974,32 @@
=. adult-gate (load:adult-core %4 state.old)
larval-gate
::
[%5 %adult *]
=. cached-state `[%5 state.old]
~> %slog.1^leaf/"ames: larva reload"
larval-gate
[%5 %adult *] (load:adult-core %5 state.old)
::
[%5 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %5 state.old)
larval-gate
::
[%6 %adult *] (load:adult-core %6 state.old)
[%6 %adult *]
=. cached-state `[%6 state.old]
~> %slog.1^leaf/"ames: larva reload"
larval-gate
::
[%6 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %6 state.old)
larval-gate
::
[%7 %adult *] (load:adult-core %7 state.old)
::
[%7 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %7 state.old)
larval-gate
::
==
--
:: adult ames, after metamorphosis from larva
@ -993,6 +1041,7 @@
%trim on-trim:event-core
%vega on-vega:event-core
%plea (on-plea:event-core [ship plea]:task)
%cork (on-cork:event-core ship.task)
==
::
[moves ames-gate]
@ -1030,19 +1079,23 @@
=< |= $= old-state
$% [%4 ames-state-4]
[%5 ames-state-5]
[%6 ^ames-state]
[%6 ames-state-6]
[%7 ^ames-state]
==
^+ ames-gate
=? old-state ?=(%4 -.old-state) %5^(state-4-to-5 +.old-state)
:: XX this would crash with ames-state-5 but load is never
=? old-state ?=(%5 -.old-state) %6^(state-5-to-6 +.old-state)
?< ?=(%6 -.old-state)
:: XX this would crash with ames-state-6 but load is never
:: called with it -- the upgrade is handled by the larval load
::
?> ?=(%6 -.old-state)
?> ?=(%7 -.old-state)
ames-gate(ames-state +.old-state)
::
|%
++ state-4-to-5
|= ames-state=ames-state-4
^- ames-state-4
^- ames-state-5
=. peers.ames-state
%- ~(run by peers.ames-state)
|= ship-state=ship-state-4
@ -1059,11 +1112,11 @@
::
++ state-5-to-6
|= ames-state=ames-state-5
^- ^^ames-state
^- ames-state-6
:_ +.ames-state
%- ~(rut by peers.ames-state)
|= [=ship ship-state=ship-state-5]
^- ^ship-state
^- ship-state-6
?. ?=(%known -.ship-state)
ship-state
=/ peer-state=peer-state-5 +.ship-state
@ -1074,12 +1127,24 @@
;; @ud
=< q.q %- need %- need
(rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)])
=/ =^peer-state
:_ +.peer-state
=, -.peer-state
[symmetric-key life rift public-key sponsor]
:- -.ship-state
:_ +.peer-state
=, -.peer-state
[symmetric-key life rift public-key sponsor]
::
++ state-6-to-7
|= ames-state=ames-state-6
^- ^^ames-state
:_ +.ames-state
%- ~(run by peers.ames-state)
|= ship-state=ship-state-6
^- ^ship-state
[-.ship-state peer-state]
?. ?=(%known -.ship-state)
ship-state
:- %known
^- peer-state
:- +<.ship-state
[route qos ossuary snd rcv nax heeds ~ ~]:ship-state
--
:: +scry: dereference namespace
::
@ -1254,13 +1319,14 @@
++ send-ack
|= =bone
^+ event-core
abet:(run-message-sink:peer-core bone %done ok=%.y)
=/ cork=? (~(has in closing.peer-state) bone)
abet:(run-message-sink:peer-core bone %done ok=%.y cork)
:: failed; send message nack packet
::
++ send-nack
|= [=bone =^error]
^+ event-core
=. event-core abet:(run-message-sink:peer-core bone %done ok=%.n)
=. event-core abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n)
=/ =^peer-state (got-peer-state her)
=/ =^channel [[our her] now channel-state -.peer-state]
:: construct nack-trace message, referencing .failed $message-num
@ -1575,7 +1641,31 @@
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
:: since flow kill goes like:
:: client vane cork task -> client ames pass cork as plea ->
:: -> server ames sinks plea -> server ames +on-plea (we are here);
:: if it's %cork plea passed to ames from its sink,
:: give %done and process flow closing after +on-take-done call
::
?: &(=(vane.plea %a) =(path.plea `path`/flow) ?=([%cork *] payload.plea))
(emit duct %give %done ~)
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-cork: handle request to kill a flow
::
++ on-cork
|= =ship
^+ event-core
=/ ship-state (~(get by peers.ames-state) ship)
::
?> ?=([~ %known *] ship-state)
=/ =peer-state +.u.ship-state
=/ =channel [[our ship] now channel-state -.peer-state]
::
=/ =plea [%a /flow [%cork ~]]
::
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
::
=. closing.peer-state (~(put in closing.peer-state) bone)
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-take-wake: receive wakeup or error notification from behn
::
@ -1641,7 +1731,6 @@
::
?- public-keys-result
[%diff @ %rift *]
:: event-core
(on-publ-rift [who to.diff]:public-keys-result)
::
[%diff @ %keys *]
@ -2289,7 +2378,8 @@
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
::
=/ message-pump (make-message-pump message-pump-state channel)
=/ closing=? (~(has in closing.peer-state) bone)
=/ message-pump (make-message-pump message-pump-state channel closing)
=^ pump-gifts message-pump-state (work:message-pump task)
=. snd.peer-state (~(put by snd.peer-state) bone message-pump-state)
:: process effects from |message-pump
@ -2300,6 +2390,7 @@
=. peer-core
?- -.gift
%done (on-pump-done [message-num error]:gift)
%cork on-pump-cork
%send (on-pump-send static-fragment.gift)
%wait (on-pump-wait date.gift)
%rest (on-pump-rest date.gift)
@ -2325,10 +2416,26 @@
:: not a nack-trace bone; relay ack to client vane
::
(emit (got-duct bone) %give %done error)
:: +on-pump-cork: kill flow on cork sender side
::
++ on-pump-cork
^+ peer-core
=. peer-state
=, peer-state
%_ peer-state
snd (~(del by snd) bone)
rcv (~(del by rcv) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
peer-core
:: +on-pump-send: emit message fragment requested by |message-pump
::
++ on-pump-send
|=(f=static-fragment (send-shut-packet bone [message-num %& +]:f))
|= f=static-fragment
(send-shut-packet bone [message-num %& +]:f)
:: +on-pump-wait: relay |message-pump's set-timer request
::
++ on-pump-wait
@ -2353,6 +2460,7 @@
++ run-message-sink
|= [=bone task=message-sink-task]
^+ peer-core
?: (~(has in corked.peer-state) bone) peer-core
:: pass .task to the |message-sink and apply state mutations
::
=/ =message-sink-state
@ -2370,8 +2478,30 @@
?- -.gift
%memo (on-sink-memo [message-num message]:gift)
%send (on-sink-send [message-num ack-meat]:gift)
%cork on-sink-cork
==
$(sink-gifts t.sink-gifts)
:: +on-sink-cork: handle flow kill after server ames has taken %done
::
++ on-sink-cork
^+ peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
=? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state)
=* next-wake u.next-wake.packet-pump-state.message-pump-state
=/ =wire (make-pump-timer-wire her.channel bone)
:: resetting timer for boons
::
(emit [/ames]~ %pass wire %b %rest next-wake)
=. peer-state
=, peer-state
%_ peer-state
rcv (~(del by rcv) bone)
snd (~(del by snd) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
==
peer-core
:: +on-sink-send: emit ack packet as requested by |message-sink
::
++ on-sink-send
@ -2405,10 +2535,14 @@
++ on-sink-boon
|= [=message-num message=*]
^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
:: send ack unconditionally
::
=. peer-core (emit (got-duct bone) %give %boon message)
=. peer-core (run-message-sink bone %done ok=%.y)
=. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
::
?. ?=([%hear * * ok=%.n] task)
:: fresh boon; give message to client vane
@ -2446,7 +2580,7 @@
=+ ;; =naxplanation message
:: ack nack-trace message (only applied if we don't later crash)
::
=. peer-core (run-message-sink bone %done ok=%.y)
=. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
:: flip .bone's second bit to find referenced flow
::
=/ target-bone=^bone (mix 0b10 bone)
@ -2458,6 +2592,10 @@
++ on-sink-plea
|= [=message-num message=*]
^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
%- %+ trace msg.veb
=/ dat [her.channel bone=bone message-num=message-num]
|.("sink plea {<dat>}")
@ -2467,6 +2605,10 @@
:: fresh plea; pass to client vane
::
=+ ;; =plea message
:: if this plea is %cork, put to closing
::
=? closing.peer-state ?=([%cork *] payload.plea)
(~(put in closing.peer-state) bone)
::
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
::
@ -2477,7 +2619,7 @@
==
:: we previously crashed on this message; send nack
::
=. peer-core (run-message-sink bone %done ok=%.n)
=. peer-core (run-message-sink bone %done ok=%.n cork=%.n)
:: also send nack-trace with blank .error for security
::
=/ nack-trace-bone=^bone (mix 0b10 bone)
@ -2491,7 +2633,7 @@
:: +make-message-pump: constructor for |message-pump
::
++ make-message-pump
|= [state=message-pump-state =channel]
|= [state=message-pump-state =channel closing=?]
=* veb veb.bug.channel
=| gifts=(list message-pump-gift)
::
@ -2525,11 +2667,27 @@
%memo (on-memo message-blob.task)
%wake (run-packet-pump %wake current.state)
%hear
?- -.ack-meat.task
%& (on-hear [message-num fragment-num=p.ack-meat]:task)
%| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task)
?- -.ack-meat.task
%&
(on-hear [message-num fragment-num=p.ack-meat]:task)
::
%|
=/ cork=?
=/ top-live
(pry:packet-queue:*make-packet-pump live.packet-pump-state.state)
:: If we send a %cork and get an ack, we can know by
:: sequence number that the ack is for the %cork message
::
?& closing
?=(^ top-live)
=(1 ~(wyt by live.packet-pump-state.state))
=(message-num:task message-num.key.u.top-live)
==
=* ack p.ack-meat.task
%- on-done
[[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork]
==
%near (on-done [message-num %naxplanation error]:naxplanation.task)
%near (on-done [[message-num %naxplanation error]:naxplanation.task %&])
==
:: +on-memo: handle request to send a message
::
@ -2561,7 +2719,7 @@
:: flows.
::
++ on-done
|= [=message-num =ack]
|= [[=message-num =ack] cork=?]
^+ message-pump
:: unsent messages from the future should never get acked
::
@ -2623,6 +2781,7 @@
?- -.u.cur
%ok
=. message-pump (give %done current.state ~)
=? message-pump cork (give %cork ~)
$(current.state +(current.state))
::
%nack
@ -3154,7 +3313,7 @@
=- [(flop gifts) state]
::
?- -.task
%done (on-done ok.task)
%done (on-done ok.task cork.task)
%drop (on-drop message-num.task)
%hear (on-hear [lane shut-packet ok]:task)
==
@ -3303,7 +3462,7 @@
:: +on-done: handle confirmation of message processing from vane
::
++ on-done
|= ok=?
|= [ok=? cork=?]
^+ message-sink
::
=^ pending pending-vane-ack.state ~(get to pending-vane-ack.state)
@ -3313,6 +3472,7 @@
=? nax.state !ok (~(put in nax.state) message-num)
::
=. message-sink (give %send message-num %| ok lag=`@dr`0)
=? message-sink cork (give %cork ~)
=/ next ~(top to pending-vane-ack.state)
?~ next
message-sink

View File

@ -14,7 +14,7 @@
+$ move [=duct move=(wind note-arvo gift-arvo)]
:: $state-8: overall gall state, versioned
::
+$ state-8 [%8 state]
+$ state-9 [%9 state]
:: $state: overall gall state
::
:: system-duct: TODO document
@ -45,6 +45,8 @@
:: $yoke: agent runner state
::
:: control-duct: TODO document
:: run-nonce: unique for each rebuild
:: sub-nonce: app-wide global %watch nonce
:: live: is this agent running? TODO document better
:: stats: TODO document
:: watches: incoming and outgoing subscription state
@ -54,8 +56,9 @@
::
+$ yoke
$: control-duct=duct
nonce=@t
live=? ::TODO remove, replaced by -.agent
run-nonce=@t
sub-nonce=_1
live=?
=stats
=watches
agent=(each agent vase)
@ -108,6 +111,7 @@
%poke
%leave
%missing
%cork
==
:: |migrate: data structures for upgrades
::
@ -116,7 +120,7 @@
:: $spore: structures for update, produced by +stay
::
+$ spore
$: %8
$: %9
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
@ -127,7 +131,8 @@
::
+$ egg
$: control-duct=duct
nonce=@t
run-nonce=@t
sub-nonce=@
live=?
=stats
=watches
@ -164,7 +169,7 @@
[^duct %pass /whiz/gall %$ %whiz ~]~
=/ adult adult-core
=. state.adult
[%8 system-duct outstanding contacts yokes=~ blocked]:spore
[%9 system-duct outstanding contacts yokes=~ blocked]:spore
=/ mo-core (mo-abed:mo:adult duct)
=. mo-core
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
@ -223,9 +228,9 @@
::
++ load
|^ |= old=spore-any
=? old ?=(%7 -.old)
(spore-7-to-8 old)
?> ?=(%8 -.old)
=? old ?=(%7 -.old) (spore-7-to-8 old)
=? old ?=(%8 -.old) (spore-8-to-9 old)
?> ?=(%9 -.old)
=. spore old
?. =(~ eggs.spore)
pupal-gate
@ -234,32 +239,79 @@
state spore(eggs *(map term yoke))
==
::
+$ spore-any $%(^spore spore-7)
+$ spore-any $%(^spore spore-8 spore-7)
+$ spore-7
$: %7
wipe-eyre-subs=_| ::NOTE band-aid for #3196
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg)
eggs=(map term egg-7)
blocked=(map term (qeu blocked-move))
==
::
+$ spore-8
$: %8
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg-8)
blocked=(map term (qeu blocked-move))
==
::
+$ egg-7 egg-8
+$ egg-8
$: control-duct=duct
run-nonce=@t
live=?
=stats
watches=watches-8
old-state=(each vase vase)
=beak
marks=(map duct mark)
==
::
+$ watches-8 [inbound=bitt outbound=boat-8]
+$ boat-8 (map [wire ship term] [acked=? =path])
::
++ spore-7-to-8
|= old=spore-7
^- ^spore
^- spore-8
:- %8
=. eggs.old
%- ~(urn by eggs.old)
|= [a=term e=egg]
|= [a=term e=egg-7]
::NOTE kiln will kick off appropriate app revival
e(old-state [%| p.old-state.e])
+>.old
::
++ spore-8-to-9
|= old=spore-8
^- ^spore
=- old(- %9, eggs -)
%- ~(run by eggs.old)
|= =egg-8
^- egg
:* control-duct.egg-8
run-nonce.egg-8
sub-nonce=0
live.egg-8
stats.egg-8
[inbound.watches.egg-8 (boat-8-to-9 outbound.watches.egg-8)]
[old-state beak marks]:egg-8
==
::
++ boat-8-to-9
|= =boat-8
^- boat
%- ~(run by boat-8)
|= [acked=? =path]
[acked path nonce=0]
--
--
:: adult gall vane interface, for type compatibility with pupa
::
=| state=state-8
=| state=state-9
|= [now=@da eny=@uvJ rof=roof]
=* gall-payload .
=< ~% %gall-wrap ..mo ~
@ -357,7 +409,7 @@
control-duct hen
beak bek
agent &+agent
nonce (scot %uw (end 5 (shas %yoke-nonce eny)))
run-nonce (scot %uw (end 5 (shas %yoke-nonce eny)))
==
::
=/ old mo-core
@ -457,8 +509,12 @@
=. outstanding.state
=/ stand
(~(gut by outstanding.state) [wire hen] *(qeu remote-request))
(~(put by outstanding.state) [wire hen] (~(put to stand) -.deal))
(mo-pass wire note-arvo)
%+ ~(put by outstanding.state) [wire hen]
(~(gas to stand) ?.(?=(%leave -.deal) ~[-.deal] ~[%leave %cork]))
=. mo-core (mo-pass wire note-arvo)
?. ?=(%leave -.deal)
mo-core
(mo-pass wire [%a [%cork ship]])
:: +mo-track-ship: subscribe to ames and jael for notices about .ship
::
++ mo-track-ship
@ -680,7 +736,12 @@
(~(put to *(qeu remote-request)) %missing)
~| [full-wire=full-wire hen=hen stand=stand]
=^ rr stand ~(get to stand)
[rr (~(put by outstanding.state) [full-wire hen] stand)]
~? &(=(rr %cork) ?=(^ stand))
[%outstanding-queue-not-empty wire hen]
:- rr
?: ?=(%cork rr)
(~(del by outstanding.state) [full-wire hen])
(~(put by outstanding.state) [full-wire hen] stand)
:: non-null case of wire is old, remove on next breach after
:: 2019/12
::
@ -696,6 +757,7 @@
%watch (mo-give %unto %watch-ack err)
%poke (mo-give %unto %poke-ack err)
%leave mo-core
%cork mo-core
%missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err)
==
::
@ -738,7 +800,7 @@
?~ yoke
%- (slog leaf+"gall: {<dap>} dead, got {<+<.sign-arvo>}" ~)
mo-core
?. =(nonce.u.yoke i.t.wire)
?. =(run-nonce.u.yoke i.t.wire)
%- (slog leaf+"gall: got old {<+<.sign-arvo>} for {<dap>}" ~)
mo-core
?. ?=([?(%gall %behn) %unto *] sign-arvo)
@ -1036,7 +1098,7 @@
::
++ ap-nuke
^+ ap-core
=/ out=(list [[=wire =ship =term] ? =path])
=/ out=(list [[=wire =ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
=/ inbound-paths=(set path)
%- silt
@ -1049,7 +1111,7 @@
~
[%give %kick ~(tap in inbound-paths) ~]~
%+ turn ~(tap by outbound.watches.yoke)
|= [[=wire =ship =term] ? =path]
|= [[=wire =ship =term] ? =path nonce=@]
[%pass wire %agent [ship term] %leave ~]
=^ maybe-tang ap-core (ap-ingest ~ |.([will *agent]))
ap-core
@ -1133,12 +1195,13 @@
tang.neet
==
=. wire
:^ %use agent-name run-nonce.yoke
?- -.neet
%agent [%out (scot %p ship.neet) name.neet wire]
%huck [%out (scot %p ship.neet) name.neet wire]
%arvo [(scot %p attributing.agent-routes) wire]
==
=. wire [%use agent-name nonce.yoke wire]
::
=/ =note-arvo
?- -.neet
%arvo note-arvo.neet
@ -1161,7 +1224,7 @@
core(agent-duct agent-duct)
$(in t.in)
::
=/ out=(list [[=wire =^ship =term] ? =path])
=/ out=(list [[=wire =^ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
|- ^+ ap-core
?~ out
@ -1169,7 +1232,8 @@
=? ap-core =(ship ship.i.out)
=/ core
=. agent-duct system-duct.state
=/ way [%out (scot %p ship) term.i.out wire.i.out]
=/ way
[%out (scot %p ship) term.i.out (scot %ud nonce.i.out) wire.i.out]
(ap-specific-take way %kick ~)
core(agent-duct agent-duct)
$(out t.out)
@ -1284,15 +1348,6 @@
?: ?=(%& -.res)
``want^p.res
((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~)
:: +ap-update-subscription: update subscription.
::
++ ap-update-subscription
~/ %ap-update-subscription
|= [is-ok=? =other=ship other-agent=term =wire]
^+ ap-core
?: is-ok
ap-core
(ap-kill-down wire [other-ship other-agent])
:: +ap-move: send move
::
++ ap-move
@ -1404,6 +1459,7 @@
=/ other-agent i.t.t.wire
=/ =dock [other-ship other-agent]
=/ agent-wire t.t.t.wire
=/ nonce=@ 0
::
=^ =sign:agent ap-core
?. ?=(%raw-fact -.unto)
@ -1424,48 +1480,107 @@
%- ap-move :_ ~
:^ hen %pass /nowhere
[%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]]
::
:: if subscription ack or close, handle before calling user code
::
=? outbound.watches.yoke ?=(%kick -.sign)
%- ~(del by outbound.watches.yoke)
[agent-wire dock]
?: ?& ?=(%watch-ack -.sign)
!(~(has by outbound.watches.yoke) [agent-wire dock])
==
%- %: slog
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
~
|^ ^+ ap-core
:: %poke-ack has no nonce; ingest directly
::
?: ?=(%poke-ack -.sign)
ingest-and-check-error
:: if .agent-wire matches, it's an old pre-nonce subscription
::
?: (~(has by outbound.watches.yoke) sub-key)
run-sign
:: if an app happened to use a null wire, no-op
::
?: =(~ agent-wire)
on-missing
=/ has-nonce=(unit @ud) (slaw %ud (head agent-wire))
?: &(?=(~ has-nonce) ?=(%kick -.sign))
on-weird-kick
:: pop nonce off .agent-wire and match against stored subscription
::
?> ?=(^ has-nonce)
=: nonce u.has-nonce
agent-wire (tail agent-wire)
==
ap-core
=/ got (~(get by outbound.watches.yoke) sub-key)
?~ got
on-missing
?. =(nonce.u.got nonce)
(on-bad-nonce nonce.u.got)
run-sign
::
=? outbound.watches.yoke ?=(%watch-ack -.sign)
?^ p.sign
%- ~(del by outbound.watches.yoke)
[agent-wire dock]
%+ ~(jab by outbound.watches.yoke) [agent-wire dock]
|= [acked=? =path]
=. .
?. acked
.
%- =/ =tape
"{<agent-name>}: received 2nd watch-ack on {<wire dock path>}"
(slog leaf+tape ~)
.
[& path]
++ sub-key [agent-wire dock]
++ ingest (ap-ingest ~ |.((on-agent:ap-agent-core agent-wire sign)))
++ run-sign
?- -.sign
%poke-ack !!
%fact
=^ tan ap-core ingest
?~ tan ap-core
=. ap-core (ap-kill-down sub-key)
(ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan)
::
%kick
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) sub-key)
::
ingest-and-check-error
::
%watch-ack
?. (~(has by outbound.watches.yoke) sub-key)
%- %: slog
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
~
==
ap-core
=. outbound.watches.yoke
?^ p.sign
(~(del by outbound.watches.yoke) sub-key)
::
%+ ~(jab by outbound.watches.yoke) sub-key
|= val=[acked=? =path nonce=@]
=? . acked.val
%.(. (slog leaf+"{<agent-name>} 2nd watch-ack on {<val>}" ~))
val(acked &)
::
ingest-and-check-error
==
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
(on-agent:ap-agent-core agent-wire sign)
:: if failed %fact handling, kill subscription
++ on-missing
%. ap-core
%- slog :~
leaf+"{<agent-name>}: got {<-.sign>} for nonexistent subscription"
leaf+"{<dock>}: {<[nonce=nonce agent-wire]>}"
>wire=wire<
==
::
=? ap-core ?=(%fact -.sign)
(ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire)
?^ maybe-tang
(ap-error -.sign leaf/"closing subscription" u.maybe-tang)
ap-core
++ on-weird-kick
%. run-sign
%- slog :~
leaf+"{<agent-name>}: got %kick for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
::
++ on-bad-nonce
|= stored-nonce=@
%. ap-core
%- slog :~
=/ nonces [expected=stored-nonce got=nonce]
=/ ok |(?=(?(%fact %kick) -.sign) =(~ p.sign))
leaf+"{<agent-name>}: stale %watch-ack {<nonces>} ok={<ok>}"
::
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
::
++ ingest-and-check-error
^+ ap-core
=^ tan ap-core ingest
?~(tan ap-core (ap-error -.sign leaf/"take {<-.sign>} failed" u.tan))
--
:: +ap-install: install wrapper.
::
++ ap-install
@ -1656,34 +1771,53 @@
?: ?=([* %pass * %g %deal * * %leave *] move)
=/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire
=/ =dock [q.p q]:q.move.move
=/ =dock [q.p q]:q.move.move
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
::
?. (~(has by outbound.watches.yoke) sub-wire dock)
=; =tang
%- (slog tang)
$(moves t.moves)
[leaf+"gall: {<agent-name>} missing subscription, got %leave"]~
=/ have=[acked=? =path nonce=@]
(~(got by outbound.watches.yoke) sub-wire dock)
=. p.move.move
(weld sys-wire [(scot %ud nonce.have) sub-wire])
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) [short-wire dock])
(~(del by outbound.watches.yoke) [sub-wire dock])
$(moves t.moves, new-moves [move new-moves])
?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move)
$(moves t.moves, new-moves [move new-moves])
=/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire
=/ =dock [q.p q]:q.move.move
=/ =path
?- -.r.q.move.move
%watch path.r.q.move.move
%watch-as path.r.q.move.move
==
?: (~(has by outbound.watches.yoke) short-wire dock)
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
=/ [=dock =deal] [[q.p q] r]:q.move.move
::
?: (~(has by outbound.watches.yoke) sub-wire dock)
=. ap-core
=/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<]
~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<]
=/ have
(~(got by outbound.watches.yoke) short-wire dock)
(~(got by outbound.watches.yoke) sub-wire dock)
%- (slog >out=have< tang)
(ap-error %watch-not-unique tang) :: reentrant, maybe bad?
$(moves t.moves)
=. outbound.watches.yoke
(~(put by outbound.watches.yoke) [short-wire dock] [| path])
$(moves t.moves, new-moves [move new-moves])
::
=. p.move.move
(weld sys-wire [(scot %ud sub-nonce.yoke) sub-wire])
%_ $
moves t.moves
new-moves [move new-moves]
sub-nonce.yoke +(sub-nonce.yoke)
::
outbound.watches.yoke
%+ ~(put by outbound.watches.yoke) [sub-wire dock]
:+ acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
sub-nonce.yoke
==
--
--
:: +call: request

View File

@ -84,7 +84,7 @@
%+ sort ~(tap by wex.bowl)
|= [[[a=wire *] *] [[b=wire *] *]]
(aor a b)
|= [[=wire =ship =term] [acked=? =path]]
|= [[=wire =ship =term] [acked=? =path nonce=@]]
^- (unit tank)
=; relevant=?
?. relevant ~