Revert "Revert "ames: make +abet pure""

This reverts commit d214fad1bd.

https://github.com/urbit/urbit/pull/6403 got closed, probably due to its
previous inclusion in the Remote Scry PR, so we reopen it (as a draft).
This commit is contained in:
yosoyubik 2023-04-27 14:42:34 +02:00
parent 568994e300
commit 4728ee68c6

View File

@ -1718,16 +1718,6 @@
++ on-plea
|= [=ship =plea]
^+ event-core
:: since flow kill goes like:
:: client vane cork task -> client ames pass cork as plea ->
:: -> server ames sinks plea -> server ames +on-plea (we are here);
:: if it's %cork plea passed to ames from its sink,
:: give %done and process flow closing after +on-take-done call
::
?: =([%a /close ~] plea) (emit duct %give %done ~)
::
:: .plea is from local vane to foreign ship
::
=/ ship-state (~(get by peers.ames-state) ship)
::
?. ?=([~ %known *] ship-state)
@ -1736,15 +1726,30 @@
todos(messages [[duct plea] messages.todos])
::
=/ =peer-state +.u.ship-state
=/ =channel [[our ship] now channel-state -.peer-state]
=/ peer-core (abed-peer:pe ship peer-state)
|^ ?+ plea foreign-plea
[%a [%kill ~] =bone] kill-plea
[%a [%cork ~] =bone] =~(cork-plea (emit duct %give %done ~))
==
:: .plea is from local vane to foreign ship
::
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
%- %^ ev-trace msg.veb ship
|. ^- tape
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"plea {<sndr rcvr bone=bone vane.plea path.plea>}"
abet:(~(on-memo pe [peer-state channel]) bone plea %plea)
++ foreign-plea
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
%- %^ ev-trace msg.veb ship
|. ^- tape
=/ sndr [our our-life.channel.peer-core]
=/ rcvr [ship her-life.channel.peer-core]
"plea {<sndr rcvr bone=bone vane.plea path.plea>}"
abet:(on-memo:peer-core bone plea %plea)
:: client ames [%cork as plea] -> server ames sinks plea,
:: passes a+/close to itself,
:: put flow in closing, and give %done
:: sink ack, pass a+/kill task <- after +on-take-done, ack %cork pea
:: to itself and delete the flow
::
++ cork-plea abet:(on-close-flow:peer-core ;;(@ +.payload.plea))
++ kill-plea abet:(on-kill-flow:peer-core ;;(@ +.payload.plea))
--
:: +on-cork: handle request to kill a flow
::
++ on-cork
@ -2614,6 +2619,36 @@
=. keens (~(put by keens) path *keen-state)
fi-abet:(fi-start:(abed:fi path) duct)
::
:: +on-close-flow: close flow on cork receiver side
::
++ on-close-flow
|= =bone
^+ peer-core
peer-core(closing.peer-state (~(put in closing.peer-state) bone))
:: +on-kill-flow: kill flow on cork sender side
::
++ on-kill-flow
|= =bone
^+ peer-core
?: (~(has in corked.peer-state) bone)
~> %slog.0^leaf/"ames: ignoring kill on corked bone {<bone>}"
peer-core
=. peer-state
=, peer-state
%_ peer-state
:: if the publisher was behind, preemptively remove any nacks
::
rcv (~(del by (~(del by rcv) bone)) (mix 0b10 bone))
snd (~(del by snd) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
:: since we got one cork ack, try the next one
::
recork-one
::
+| %implementation
:: +dedup-message: replace with any existing copy of this message
::
@ -3042,8 +3077,7 @@
:: not a nack-trace bone; relay ack to client vane
::
(pe-emit (got-duct bone) %give %done error)
:: XX impure +abet pattern
:: +pump-cork: kill flow on cork sender side
:: +pump-cork: handle %cork on the publisher
::
++ pump-cork
|= =message-num
@ -3055,24 +3089,8 @@
%- %+ pe-trace odd.veb
|.("trying to delete a corked bone={<bone>}")
peer-core
=/ nack-bone=^bone (mix 0b10 bone)
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
:: if the publisher was behind we remove nacks on that bone
::
(~(del by rcv.peer-state) nack-bone)
=. peer-state
=, peer-state
%_ peer-state
snd (~(del by snd) bone)
rcv (~(del by rcv) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
:: since we got one cork ack, try the next one
::
recork-one
=/ =wire (make-bone-wire her her-rift.channel bone)
(pe-emit duct %pass wire %a %plea her [%a /kill bone])
:: +pu: construct |packet-pump core
::
++ pu
@ -3651,10 +3669,7 @@
:: account for publishers that still handle ames-to-ames %pleas
::
?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
:: XX FIXME impure +abet pattern...
::
=. closing.peer-state (~(put in closing.peer-state) bone)
(pe-emit duct %pass wire %a %plea her [%a /close ~])
(pe-emit duct %pass wire %a %plea her [%a /close bone])
::
:: +ha-boon: handle response message, acking unconditionally
::