Revert "ames: make +abet pure"

This is stil a WIP and has not been tested so reverting commit
e4182f52a9.
This commit is contained in:
yosoyubik 2023-03-23 10:41:56 +01:00
parent 3c158b2491
commit d214fad1bd

View File

@ -1730,6 +1730,16 @@
++ on-plea ++ on-plea
|= [=ship =plea] |= [=ship =plea]
^+ event-core ^+ event-core
:: since flow kill goes like:
:: client vane cork task -> client ames pass cork as plea ->
:: -> server ames sinks plea -> server ames +on-plea (we are here);
:: if it's %cork plea passed to ames from its sink,
:: give %done and process flow closing after +on-take-done call
::
?: =([%a /close ~] plea)
(emit duct %give %done ~)
:: .plea is from local vane to foreign ship
::
=/ ship-state (~(get by peers.ames-state) ship) =/ ship-state (~(get by peers.ames-state) ship)
:: ::
?. ?=([~ %known *] ship-state) ?. ?=([~ %known *] ship-state)
@ -1753,30 +1763,14 @@
:: ::
=/ =peer-state +.u.ship-state =/ =peer-state +.u.ship-state
=/ =channel [[our ship] now channel-state -.peer-state] =/ =channel [[our ship] now channel-state -.peer-state]
=/ peer-core (pe peer-state channel)
|^ ?+ plea foreign-plea
[%a [%kill ~] =bone] kill-plea
[%a [%cork ~] =bone] =~(cork-plea (emit duct %give %done ~))
==
:: .plea is from local vane to foreign ship
:: ::
++ foreign-plea =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) %- %^ ev-trace msg.veb ship
%- %^ ev-trace msg.veb ship |. ^- tape
|. ^- tape =/ sndr [our our-life.channel]
=/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel]
=/ rcvr [ship her-life.channel] "plea {<sndr rcvr bone=bone vane.plea path.plea>}"
"plea {<sndr rcvr bone=bone vane.plea path.plea>}" abet:(on-memo:(pe peer-state channel) bone plea %plea)
abet:(on-memo:peer-core bone plea %plea)
:: client ames [%cork as plea] -> server ames sinks plea,
:: passes a+/close to itself,
:: put flow in closing, and give %done
:: sink ack, pass a+/kill task <- after +on-take-done, ack %cork pea
:: to itself and delete the flow
::
++ cork-plea abet:(on-close-flow:peer-core ;;(@ +.payload.plea))
++ kill-plea abet:(on-kill-flow:peer-core ;;(@ +.payload.plea))
--
:: +on-cork: handle request to kill a flow :: +on-cork: handle request to kill a flow
:: ::
++ on-cork ++ on-cork
@ -2667,35 +2661,6 @@
:: maybe resend some timed out packets :: maybe resend some timed out packets
:: ::
abet:(call:abed:(pump-core bone) %wake ~) abet:(call:abed:(pump-core bone) %wake ~)
:: +on-close-flow: close flow on cork receiver side
::
++ on-close-flow
|= =bone
^+ peer-core
peer-core(closing.peer-state (~(put in closing.peer-state) bone))
:: +on-kill-flow: kill flow on cork sender side
::
++ on-kill-flow
|= =bone
^+ peer-core
?: (~(has in corked.peer-state) bone)
~> %slog.0^leaf/"ames: ignoring kill on corked bone {<bone>}"
peer-core
=. peer-state
=, peer-state
%_ peer-state
:: if the publisher was behind, preemptively remove any nacks
::
rcv (~(del by (~(del by rcv) bone)) (mix 0b10 bone))
snd (~(del by snd) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
:: since we got one cork ack, try the next one
::
recork-one
:: ::
+| %implementation +| %implementation
:: +dedup-message: replace with any existing copy of this message :: +dedup-message: replace with any existing copy of this message
@ -3074,6 +3039,7 @@
:: we have unsent fragments of the current message; feed them :: we have unsent fragments of the current message; feed them
:: ::
?. =(~ unsent-fragments.state) ?. =(~ unsent-fragments.state)
:: we have unsent fragments of the current message; feed them :: we have unsent fragments of the current message; feed them
:: ::
=^ unsent pump abut:(feed:packet-pump unsent-fragments.state) =^ unsent pump abut:(feed:packet-pump unsent-fragments.state)
@ -3127,7 +3093,8 @@
:: not a nack-trace bone; relay ack to client vane :: not a nack-trace bone; relay ack to client vane
:: ::
(pe-emit (got-duct bone) %give %done error) (pe-emit (got-duct bone) %give %done error)
:: +pump-cork: handle %cork on the publisher :: XX impure +abet pattern
:: +pump-cork: kill flow on cork sender side
:: ::
++ pump-cork ++ pump-cork
|= =message-num |= =message-num
@ -3139,8 +3106,24 @@
%- %+ pe-trace odd.veb %- %+ pe-trace odd.veb
|.("trying to delete a corked bone={<bone>}") |.("trying to delete a corked bone={<bone>}")
peer-core peer-core
=/ =wire (make-bone-wire her her-rift.channel bone) =/ nack-bone=^bone (mix 0b10 bone)
(pe-emit duct %pass wire %a %plea her [%a /kill bone]) =? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
:: if the publisher was behind we remove nacks on that bone
::
(~(del by rcv.peer-state) nack-bone)
=. peer-state
=, peer-state
%_ peer-state
snd (~(del by snd) bone)
rcv (~(del by rcv) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
:: since we got one cork ack, try the next one
::
recork-one
:: +pu: construct |packet-pump core :: +pu: construct |packet-pump core
:: ::
++ pu ++ pu
@ -3726,7 +3709,10 @@
:: account for publishers that still handle ames-to-ames %pleas :: account for publishers that still handle ames-to-ames %pleas
:: ::
?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
(pe-emit duct %pass wire %a %plea her [%a /close bone]) :: XX FIXME impure +abet pattern...
::
=. closing.peer-state (~(put in closing.peer-state) bone)
(pe-emit duct %pass wire %a %plea her [%a /close ~])
:: ::
:: +ha-boon: handle response message, acking unconditionally :: +ha-boon: handle response message, acking unconditionally
:: ::