gall: add $beat map for subscription nonces

Previously we stored the nonce in $boat, which changed the $bowl of each
agent. This compiles and all agents reload, but more testing is needed.
It also renames inbound/outbound watches to $bitt/$boat.
This commit is contained in:
yosoyubik 2022-05-16 09:13:40 +02:00
parent a1dcc5a8cd
commit 83356f02ca
2 changed files with 64 additions and 57 deletions

View File

@ -1657,9 +1657,8 @@
[%sift dudes=(list dude)] :: per agent
== ::
+$ bitt (map duct (pair ship path)) :: incoming subs
+$ boat :: outgoing subs
%+ map [=wire =ship =term] ::
[acked=? =path nonce=@] ::
+$ boat (map [=wire =ship =term] [acked=? =path]) :: outgoing subs
+$ beat (map [=wire =ship =term] nonce=@) ::
+$ bowl :: standard app state
$: $: our=ship :: host
src=ship :: guest

View File

@ -65,9 +65,9 @@
:: $watches: subscribers and publications
::
:: TODO: rename this, to $ties?
:: TODO: rename $boat and $bitt and document
:: TODO: document
::
+$ watches [inbound=bitt outbound=boat]
+$ watches [=bitt =boat =beat]
:: $routes: new cuff; TODO: document
::
+$ routes
@ -330,16 +330,14 @@
sub-nonce=0
live.egg-8
stats.egg-8
[inbound.watches.egg-8 (boat-8-to-9 outbound.watches.egg-8)]
(watches-8-to-9 watches.egg-8)
[old-state beak marks]:egg-8
==
::
++ boat-8-to-9
|= =boat-8
^- boat
%- ~(run by boat-8)
|= [acked=? =path]
[acked path nonce=0]
++ watches-8-to-9
|= watches-8
^- watches
[inbound outbound (~(run by outbound) |=([acked=? =path] nonce=0))]
--
--
:: adult gall vane interface, for type compatibility with pupa
@ -1165,11 +1163,11 @@
::
++ ap-nuke
^+ ap-core
=/ out=(list [[=wire =ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
=/ out=(list [[=wire =ship =term] ? =path])
~(tap by boat.watches.yoke)
=/ inbound-paths=(set path)
%- silt
%+ turn ~(tap by inbound.watches.yoke)
%+ turn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
path
=/ will=(list card:agent:gall)
@ -1177,8 +1175,8 @@
?: =(~ inbound-paths)
~
[%give %kick ~(tap in inbound-paths) ~]~
%+ turn ~(tap by outbound.watches.yoke)
|= [[=wire =ship =term] ? =path nonce=@]
%+ turn ~(tap by boat.watches.yoke)
|= [[=wire =ship =term] ? =path]
[%pass wire %agent [ship term] %leave ~]
=^ maybe-tang ap-core (ap-ingest ~ |.([will *agent]))
ap-core
@ -1283,7 +1281,7 @@
|= =ship
^+ ap-core
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
~(tap by bitt.watches.yoke)
|- ^+ ap-core
?^ in
=? ap-core =(ship ship.i.in)
@ -1292,7 +1290,10 @@
$(in t.in)
::
=/ out=(list [[=wire =^ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
%+ turn ~(tap by boat.watches.yoke)
|= [key=[wire ^ship term] val=[? path]]
:- key
val(+ [+.val (~(got by beat.watches.yoke) key)])
|- ^+ ap-core
?~ out
ap-core
@ -1316,7 +1317,7 @@
^+ ap-core
::
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
~(tap by bitt.watches.yoke)
|- ^+ ap-core
?~ in ap-core
::
@ -1337,7 +1338,7 @@
?~ target-paths
?~ target-ship
~[agent-duct]
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: =(target-ship `ship)
@ -1352,7 +1353,7 @@
++ ap-ducts-from-path
|= [target-path=path target-ship=(unit ship)]
^- (list duct)
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: ?& =(target-path path)
@ -1438,8 +1439,8 @@
attributing.agent-routes :: guest
agent-name :: agent
== ::
:* wex=outbound.watches.yoke :: outgoing
sup=inbound.watches.yoke :: incoming
:* wex=boat.watches.yoke :: outgoing
sup=bitt.watches.yoke :: incoming
== ::
:* act=change.stats.yoke :: tick
eny=eny.stats.yoke :: nonce
@ -1475,8 +1476,8 @@
|= pax=path
^+ ap-core
=/ incoming [attributing.agent-routes pax]
=. inbound.watches.yoke
(~(put by inbound.watches.yoke) agent-duct incoming)
=. bitt.watches.yoke
(~(put by bitt.watches.yoke) agent-duct incoming)
=^ maybe-tang ap-core
%+ ap-ingest %watch-ack |.
(on-watch:ap-agent-core pax)
@ -1554,7 +1555,7 @@
ingest-and-check-error
:: if .agent-wire matches, it's an old pre-nonce subscription
::
?: (~(has by outbound.watches.yoke) sub-key)
?: (~(has by boat.watches.yoke) sub-key)
run-sign
:: if an app happened to use a null wire, no-op
::
@ -1569,7 +1570,7 @@
=: nonce u.has-nonce
agent-wire (tail agent-wire)
==
=/ got (~(get by outbound.watches.yoke) sub-key)
=/ got (~(get by beat.watches.yoke) sub-key)
?~ got
on-missing
?. =(nonce.u.got nonce)
@ -1588,25 +1589,29 @@
(ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan)
::
%kick
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) sub-key)
=: beat.watches.yoke (~(del by beat.watches.yoke) sub-key)
boat.watches.yoke (~(del by boat.watches.yoke) sub-key)
==
::
ingest-and-check-error
::
%watch-ack
?. (~(has by outbound.watches.yoke) sub-key)
?. (~(has by boat.watches.yoke) sub-key)
%. ap-core
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
=. outbound.watches.yoke
=? beat.watches.yoke ?=(^ p.sign)
(~(del by beat.watches.yoke) sub-key)
::
=. boat.watches.yoke
?^ p.sign
(~(del by outbound.watches.yoke) sub-key)
(~(del by boat.watches.yoke) sub-key)
::
%+ ~(jab by outbound.watches.yoke) sub-key
|= val=[acked=? =path nonce=@]
%+ ~(jab by boat.watches.yoke) sub-key
|= val=[acked=? =path]
%. val(acked &)
%^ trace &(odd.veb.bug.state acked.val)
leaf/"{<agent-name>} 2nd watch-ack on {<val>}" ~
@ -1682,8 +1687,8 @@
^+ ap-core
::
%= ap-core
inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
bitt.watches.yoke
(~(del by bitt.watches.yoke) agent-duct)
==
:: +ap-load-delete: load delete.
::
@ -1691,13 +1696,13 @@
^+ ap-core
::
=/ maybe-incoming
(~(get by inbound.watches.yoke) agent-duct)
(~(get by bitt.watches.yoke) agent-duct)
?~ maybe-incoming
ap-core
::
=/ incoming u.maybe-incoming
=. inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
=. bitt.watches.yoke
(~(del by bitt.watches.yoke) agent-duct)
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
@ -1803,10 +1808,10 @@
::
=. agent.yoke &++.p.result
=/ moves (zing (turn -.p.result ap-from-internal))
=. inbound.watches.yoke
=. bitt.watches.yoke
(ap-handle-kicks moves)
(ap-handle-peers moves)
:: +ap-handle-kicks: handle cancels of inbound.watches
:: +ap-handle-kicks: handle cancels of bitt.watches
::
++ ap-handle-kicks
~/ %ap-handle-kicks
@ -1822,8 +1827,8 @@
::
=/ quit-map=bitt
(malt (turn quits |=(=duct [duct *[ship path]])))
(~(dif by inbound.watches.yoke) quit-map)
:: +ap-handle-peers: handle new outbound.watches
(~(dif by bitt.watches.yoke) quit-map)
:: +ap-handle-peers: handle new boat.watches
::
++ ap-handle-peers
~/ %ap-handle-peers
@ -1841,16 +1846,18 @@
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
::
?. (~(has by outbound.watches.yoke) sub-wire dock)
?. (~(has by boat.watches.yoke) sub-wire dock)
%. $(moves t.moves)
%^ trace odd.veb.bug.state
leaf/"gall: {<agent-name>} missing subscription, got %leave" ~
=/ have=[acked=? =path nonce=@]
(~(got by outbound.watches.yoke) sub-wire dock)
=/ have=[nonce=@ acked=? =path]
:- (~(got by beat.watches.yoke) sub-wire dock)
(~(got by boat.watches.yoke) sub-wire dock)
=. p.move.move
(weld sys-wire [(scot %ud nonce.have) sub-wire])
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) [sub-wire dock])
=: boat.watches.yoke (~(del by boat.watches.yoke) [sub-wire dock])
beat.watches.yoke (~(del by beat.watches.yoke) [sub-wire dock])
==
$(moves t.moves, new-moves [move new-moves])
?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move)
$(moves t.moves, new-moves [move new-moves])
@ -1860,12 +1867,11 @@
=/ sub-wire=^wire (slag 6 `^wire`wire)
=/ [=dock =deal] [[q.p q] r]:q.move.move
::
?: (~(has by outbound.watches.yoke) sub-wire dock)
?: (~(has by boat.watches.yoke) sub-wire dock)
=. ap-core
=/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<]
=/ have
(~(got by outbound.watches.yoke) sub-wire dock)
=/ have (~(got by boat.watches.yoke) sub-wire dock)
%- (slog >out=have< tang)
(ap-error %watch-not-unique tang) :: reentrant, maybe bad?
$(moves t.moves)
@ -1877,11 +1883,13 @@
new-moves [move new-moves]
sub-nonce.yoke +(sub-nonce.yoke)
::
outbound.watches.yoke
%+ ~(put by outbound.watches.yoke) [sub-wire dock]
:+ acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
sub-nonce.yoke
boat.watches.yoke
%+ ~(put by boat.watches.yoke) [sub-wire dock]
:- acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
::
beat.watches.yoke
(~(put by beat.watches.yoke) [sub-wire dock] sub-nonce.yoke)
==
--
--