ames: defer mutual calls between |pump and |sink

|pump and |sink call into each other in three places
related to nacks and naxplanations (sending a nack,
notifying the |pump of a naxplanation, or dropping a
nack from the |sink). This intra calls are making implicit
updates to more parts of the state than the core should
manage. To avoid that we emit a move to %arvo, encoded
as an %ames plea, to handle that in the next event.
This commit is contained in:
yosoyubik 2023-05-08 15:28:13 +02:00
parent 6213e0bbb3
commit f53fee723a
2 changed files with 82 additions and 45 deletions

View File

@ -1649,14 +1649,14 @@
^+ peer-core
:: handle cork only deals with bones that are in closing
::
(handle-cork:abet:(call:(abed:mi:peer-core bone) %done ok=%.y) bone)
%. bone
handle-cork:abet:(call:(abed:mi:peer-core bone) %done ok=%.y)
:: failed; send message nack packet
::
++ send-nack
|= [=bone =^error]
^+ peer-core
=. peer-core abet:(call:(abed:mi:peer-core bone) %done ok=%.n)
=. event-core abet:peer-core :: XX extraneous?
=. peer-core abet:(call:(abed:mi:peer-core bone) %done ok=%.n)
:: construct nack-trace message, referencing .failed $message-num
::
=/ failed=message-num
@ -1666,7 +1666,7 @@
:: send nack-trace message on associated .nack-bone
::
=/ nack-bone=^bone (mix 0b10 bone)
abet:(call:(abed:mu:(abed-got:pe her) nack-bone) %memo message-blob)
abet:(call:(abed:mu:peer-core nack-bone) %memo message-blob)
--
:: +on-sift: handle request to filter debug output by ship
::
@ -2027,9 +2027,14 @@
todos(messages [[duct plea] messages.todos])
::
=+ peer-core=(abed-peer:pe ship +.u.ship-state)
:: %ames pleas are internal, called from self, to keep +abet cores pure
::
|^ ?+ plea foreign-plea
[%a [%kill ~] =bone] kill-plea
[%a [%cork ~] =bone] =~(cork-plea (emit duct %give %done ~))
[%a [%nack ~] *] abet:send-nack-trace
[%a [%sink ~] *] abet:sink-naxplanation
[%a [%drop ~] *] abet:clear-nack
[%a [%cork ~] *] =~(cork-plea (emit duct %give %done ~))
[%a [%kill ~] *] kill-plea
==
:: .plea is from local vane to foreign ship
::
@ -2041,15 +2046,28 @@
=/ rcvr [ship her-life.channel.peer-core]
"plea {<sndr rcvr bone=bone vane.plea path.plea>}"
abet:(on-memo:peer-core bone plea %plea)
::
++ send-nack-trace
=+ ;;([=nack=bone =message-blob] payload.plea)
abet:(call:(abed:mu:peer-core nack-bone) %memo message-blob)
::
++ sink-naxplanation
=+ ;;([=target=bone =naxplanation] payload.plea)
abet:(call:(abed:mu:peer-core target-bone) %near naxplanation)
::
++ clear-nack
=+ ;;([=nack=bone =message-num] payload.plea)
abet:(call:(abed:mi:peer-core nack-bone) %drop message-num)
:: client ames [%cork as plea] -> server ames [sinks %cork plea],
:: pass a+/close task to self
:: put flow in closing, and give %done
:: pass a+/cork task to self
:: put flow in closing (+cork-plea),
:: and give %done
:: sink %ack, pass a+/kill task <- after +on-take-done, ack %cork plea
:: to self, and delete the flow and delete the flow in +handle-cork
:: (+kill-plea)
::
::
++ cork-plea abet:(on-cork-flow:peer-core ;;(@ payload.plea))
++ kill-plea abet:(on-kill-flow:peer-core ;;(@ payload.plea))
++ cork-plea abet:(on-cork-flow:peer-core ;;(bone payload.plea))
++ kill-plea abet:(on-kill-flow:peer-core ;;(bone payload.plea))
--
:: +on-cork: handle request to kill a flow
::
@ -2931,7 +2949,6 @@
fi-abet:(fi-sub:(abed:fi path) duct)
=. keens (~(put by keens) path *keen-state)
fi-abet:(fi-start:(abed:fi path) duct)
::
:: +on-cork-flow: mark .bone as closing
::
++ on-cork-flow
@ -3360,16 +3377,16 @@
:: +pump-done: handle |message-pump's report of message (n)ack
::
++ pump-done
|= [=message-num error=(unit error)]
|= [num=message-num error=(unit error)]
^+ peer-core
?: ?& =(1 (end 0 bone))
=(1 (end 0 (rsh 0 bone)))
(~(has in corked.peer-state) (mix 0b10 bone))
==
%- %+ pe-trace msg.veb
=/ dat [her bone=bone message-num=message-num -.task]
=/ dat [her bone=bone message-num=num -.task]
|.("remove naxplanation flow {<dat>}")
:: XX we avoid re-adding the bone in abet:mu; test that it works
:: we avoid re-adding the bone in abet:mu
::
=. snd.peer-state (~(del by snd.peer-state) bone)
peer-core
@ -3379,9 +3396,10 @@
:: even bone; is this bone a nack-trace bone?
::
?: =(1 (end 0 (rsh 0 bone)))
:: nack-trace bone; assume .ok, clear nack from |message-sink
:: nack-trace bone; assume .ok, clear nack from |sink
::
abet:(call:(abed:mi (mix 0b10 bone)) %drop message-num)
%+ pe-emit duct
[%pass /clear-nack %a %plea her %a /drop (mix 0b10 bone) num]
?: &(closing ?=(%near -.task))
:: if the bone belongs to a closing flow and we got a
:: naxplanation, don't relay ack to the client vane
@ -3959,30 +3977,33 @@
%- %+ pe-trace msg.veb
=/ dat [her bone=bone message-num=message-num]
|.("sink plea {<dat>}")
=; pe=_peer-core
=. peer-core pe
=? sink !ok (call %done ok=%.n)
sink
?. ok
:: send nack-trace with blank .error for security
::
=/ nack-bone=^bone (mix 0b10 bone)
=/ =message-blob (jam [message-num *error])
abet:(call:(abed:mu nack-bone) %memo message-blob)
=+ ;; =plea message
=/ =wire (make-bone-wire her her-rift.channel nack-bone)
:: send nack-trace with blank .error for security
::
=. peer-core
%+ pe-emit duct
[%pass wire %a %plea her [%a /nack nack-bone message-blob]]
::
(done ok=%.n)
::
=/ =wire (make-bone-wire her her-rift.channel bone)
::
?. =(vane.plea %$)
?+ vane.plea ~| %ames-evil-vane^our^her^vane.plea !!
%c (pe-emit duct %pass wire %c %plea her plea)
%g (pe-emit duct %pass wire %g %plea her plea)
%j (pe-emit duct %pass wire %j %plea her plea)
==
:: a %cork plea is handled using %$ as the recipient vane to
:: account for publishers that still handle ames-to-ames %pleas
::
?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
(pe-emit duct %pass wire %a %plea her [%a /cork bone])
=. peer-core
=+ ;; =plea message
?. =(vane.plea %$)
?+ vane.plea ~| %ames-evil-vane^our^her^vane.plea !!
%c (pe-emit duct %pass wire %c %plea her plea)
%g (pe-emit duct %pass wire %g %plea her plea)
%j (pe-emit duct %pass wire %j %plea her plea)
==
:: a %cork plea is handled using %$ as the recipient vane to
:: account for publishers that still handle ames-to-ames %pleas
::
?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
(pe-emit duct %pass wire %a %plea her [%a /cork bone])
sink
::
:: +ha-boon: handle response message, acking unconditionally
::
@ -4017,7 +4038,7 @@
=. peer-core (pe-emit (got-duct bone) %give %boon message)
:: send ack unconditionally
::
(call %done ok=%.y)
(done ok=%.y)
::
++ ha-nack
^+ sink
@ -4031,12 +4052,15 @@
::
=/ target=^bone (mix 0b10 bone)
=. peer-core
:: notify |message-pump that this message got naxplained
:: will notify |message-pump that this message got naxplained
::
abet:(call:(abed:mu target) %near ;;(naxplanation message))
=/ =wire (make-bone-wire her her-rift.channel target)
=+ ;;(=naxplanation message)
%+ pe-emit duct
[%pass wire %a %plea her [%a /sink target naxplanation]]
:: ack nack-trace message (only applied if we don't later crash)
::
(call %done ok=%.y)
(done ok=%.y)
--
--
:: +fi: constructor for |fine remote scry core

View File

@ -495,13 +495,26 @@
:: ~bud -> nack-trace -> ~nec
::
=^ moves5 nec (call nec ~[//unix] %hear (snag-packet 1 moves3))
:: ~nec -> naxplanation -> ~nec
::
:: [%pass wire %a %plea her [%a /sink target naxplanation]]
=/ sink-naxplanation-plea
[%plea ~bud %a /sink bone=0 message-num=1 error]
=^ moves6 nec (call nec ~[//unix] sink-naxplanation-plea)
:: ~nec -> ack nack-trace -> ~bud
::
=^ moves6 bud (call bud ~[//unix] %hear (snag-packet 0 moves5))
=^ moves7 bud (call bud ~[//unix] %hear (snag-packet 0 moves5))
::
%+ expect-eq
!> [~[/g/talk] %give %done `error]
!> (snag 0 `(list move:ames)`moves5)
;: welp
%+ expect-eq
!> [~[/g/talk] %give %done `error]
!> (snag 0 `(list move:ames)`moves6)
::
%+ expect-eq
!> [~[//unix] %pass /bone/~bud/0/0 %a sink-naxplanation-plea]
!> (snag 0 `(list move:ames)`moves5)
::
==
::
++ test-fine-request
^- tang