From d214fad1bdb77e08c97b52a84f45955c9a9f5df3 Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Thu, 23 Mar 2023 10:41:56 +0100 Subject: [PATCH] Revert "ames: make +abet pure" This is stil a WIP and has not been tested so reverting commit e4182f52a98a6f1d26bafac76773df1a22a23362. --- pkg/arvo/sys/vane/ames.hoon | 98 ++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 56 deletions(-) diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index d8bbb3c52..5ae100ddf 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1730,6 +1730,16 @@ ++ on-plea |= [=ship =plea] ^+ event-core + :: since flow kill goes like: + :: client vane cork task -> client ames pass cork as plea -> + :: -> server ames sinks plea -> server ames +on-plea (we are here); + :: if it's %cork plea passed to ames from its sink, + :: give %done and process flow closing after +on-take-done call + :: + ?: =([%a /close ~] plea) + (emit duct %give %done ~) + :: .plea is from local vane to foreign ship + :: =/ ship-state (~(get by peers.ames-state) ship) :: ?. ?=([~ %known *] ship-state) @@ -1753,30 +1763,14 @@ :: =/ =peer-state +.u.ship-state =/ =channel [[our ship] now channel-state -.peer-state] - =/ peer-core (pe peer-state channel) - |^ ?+ plea foreign-plea - [%a [%kill ~] =bone] kill-plea - [%a [%cork ~] =bone] =~(cork-plea (emit duct %give %done ~)) - == - :: .plea is from local vane to foreign ship :: - ++ foreign-plea - =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) - %- %^ ev-trace msg.veb ship - |. ^- tape - =/ sndr [our our-life.channel] - =/ rcvr [ship her-life.channel] - "plea {}" - abet:(on-memo:peer-core bone plea %plea) - :: client ames [%cork as plea] -> server ames sinks plea, - :: passes a+/close to itself, - :: put flow in closing, and give %done - :: sink ack, pass a+/kill task <- after +on-take-done, ack %cork pea - :: to itself and delete the flow - :: - ++ cork-plea abet:(on-close-flow:peer-core ;;(@ +.payload.plea)) - ++ kill-plea abet:(on-kill-flow:peer-core ;;(@ +.payload.plea)) - -- + =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) + %- %^ ev-trace msg.veb ship + |. ^- tape + =/ sndr [our our-life.channel] + =/ rcvr [ship her-life.channel] + "plea {}" + abet:(on-memo:(pe peer-state channel) bone plea %plea) :: +on-cork: handle request to kill a flow :: ++ on-cork @@ -2667,35 +2661,6 @@ :: maybe resend some timed out packets :: abet:(call:abed:(pump-core bone) %wake ~) - :: +on-close-flow: close flow on cork receiver side - :: - ++ on-close-flow - |= =bone - ^+ peer-core - peer-core(closing.peer-state (~(put in closing.peer-state) bone)) - :: +on-kill-flow: kill flow on cork sender side - :: - ++ on-kill-flow - |= =bone - ^+ peer-core - ?: (~(has in corked.peer-state) bone) - ~> %slog.0^leaf/"ames: ignoring kill on corked bone {}" - peer-core - =. peer-state - =, peer-state - %_ peer-state - :: if the publisher was behind, preemptively remove any nacks - :: - rcv (~(del by (~(del by rcv) bone)) (mix 0b10 bone)) - snd (~(del by snd) bone) - corked (~(put in corked) bone) - closing (~(del in closing) bone) - by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) - by-bone.ossuary (~(del by by-bone.ossuary) bone) - == - :: since we got one cork ack, try the next one - :: - recork-one :: +| %implementation :: +dedup-message: replace with any existing copy of this message @@ -3074,6 +3039,7 @@ :: we have unsent fragments of the current message; feed them :: ?. =(~ unsent-fragments.state) + :: we have unsent fragments of the current message; feed them :: =^ unsent pump abut:(feed:packet-pump unsent-fragments.state) @@ -3127,7 +3093,8 @@ :: not a nack-trace bone; relay ack to client vane :: (pe-emit (got-duct bone) %give %done error) - :: +pump-cork: handle %cork on the publisher + :: XX impure +abet pattern + :: +pump-cork: kill flow on cork sender side :: ++ pump-cork |= =message-num @@ -3139,8 +3106,24 @@ %- %+ pe-trace odd.veb |.("trying to delete a corked bone={}") peer-core - =/ =wire (make-bone-wire her her-rift.channel bone) - (pe-emit duct %pass wire %a %plea her [%a /kill bone]) + =/ nack-bone=^bone (mix 0b10 bone) + =? rcv.peer-state (~(has by rcv.peer-state) nack-bone) + :: if the publisher was behind we remove nacks on that bone + :: + (~(del by rcv.peer-state) nack-bone) + =. peer-state + =, peer-state + %_ peer-state + snd (~(del by snd) bone) + rcv (~(del by rcv) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) + by-bone.ossuary (~(del by by-bone.ossuary) bone) + == + :: since we got one cork ack, try the next one + :: + recork-one :: +pu: construct |packet-pump core :: ++ pu @@ -3726,7 +3709,10 @@ :: account for publishers that still handle ames-to-ames %pleas :: ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) - (pe-emit duct %pass wire %a %plea her [%a /close bone]) + :: XX FIXME impure +abet pattern... + :: + =. closing.peer-state (~(put in closing.peer-state) bone) + (pe-emit duct %pass wire %a %plea her [%a /close ~]) :: :: +ha-boon: handle response message, acking unconditionally ::