mall: populate wex.bowl with outgoing subscriptions

fixes #1466
This commit is contained in:
Philip Monk 2019-09-30 20:53:02 -07:00
parent 9c9115a7e0
commit 9fc28a9538
No known key found for this signature in database
GPG Key ID: B66E1F02604E44EC
8 changed files with 109 additions and 37 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d2bb354f316b850a21bf4717d8440d7bbb6bf9f34536c53d7f84435c719c0224
size 16455581
oid sha256:466901e624c1515a69a63a7cd11fab0afffeb6caef18ac640f8de44b555f3a00
size 16472260

View File

@ -84,7 +84,7 @@
!!
`this
::
++ handle-unsubscribe ~& >> %k handle-unsubscribe:def
++ handle-unsubscribe handle-unsubscribe:def
++ handle-peek
|= =path
^- (unit (unit cage))

View File

@ -26,7 +26,7 @@
?+ mark (handle-poke:def mark vase)
%spider-imput (handle-poke-imput:sc !<(imput vase))
%spider-start (handle-start-imp:sc !<([imp-name term] vase))
%spider-stop (handle-stop-imp:sc !<(imp-name vase))
%spider-stop (handle-stop-imp:sc !<([imp-name ?] vase))
==
[cards this]
::
@ -129,25 +129,34 @@
++ start-imp
|= [=imp-name =imp]
^- (quip card ^state)
=^ cards-1 state
?. (~(has by state) imp-name)
`state
(imp-done imp-name)
=/ m (thread ,~)
=/ =eval-form:eval:m (from-form:eval:m (imp bowl))
=. state (~(put by state) imp-name eval-form)
=^ cards-2 state
(take-input imp-name ~)
[(weld cards-1 cards-2) state]
::
++ handle-stop-imp
|= =imp-name
|= [=imp-name nice=?]
^- (quip card ^state)
?. (~(has by state) imp-name)
~| [%not-started imp-name]
!!
~? !(~(has by state) imp-name)
[%not-started imp-name]
?: nice
(imp-done imp-name)
(imp-fail imp-name %cancelled ~)
::
++ take-input
|= [=imp-name input=(unit input:thread)]
^- (quip card ^state)
=/ m (thread ,~)
?. (~(has by state) imp-name)
%- (slog leaf+"spider got input for non-existent {<imp-name>}" ~)
`state
=/ =eval-form:eval:m
~| [%no-imp imp-name]
(~(got by state) imp-name)
=| cards=(list card)
|- ^- (quip card ^state)
@ -189,13 +198,25 @@
|= [=imp-name =term =tang]
^- (quip card ^state)
%- (slog leaf+"thread {<imp-name>} failed" leaf+<term> tang)
:- [%pass /build/[imp-name] %arvo %f %kill ~]~
(~(del by state) imp-name)
(imp-clean imp-name)
::
++ imp-done
|= =imp-name
^- (quip card ^state)
%- (slog leaf+"thread {<imp-name>} finished" ~)
:- [%pass /build/[imp-name] %arvo %f %kill ~]~
(~(del by state) imp-name)
(imp-clean imp-name)
::
++ imp-clean
|= =imp-name
^- (quip card ^state)
:_ (~(del by state) imp-name)
:- [%pass /build/[imp-name] %arvo %f %kill ~]
%+ murn ~(tap by wex.bowl)
|= [[=wire =ship =term] [acked=? =path]]
^- (unit card)
?. ?& ?=([%imp @ *] wire)
=(imp-name i.t.wire)
==
~
`[%pass wire %agent [ship term] %unsubscribe ~]
--

View File

@ -1,3 +1,3 @@
:- %say
|= [* [name=term ~] ~]
[%spider-stop name]
[%spider-stop name |]

View File

@ -41,7 +41,6 @@
++ handle-unix-effect
|= [who=@p ue=unix-effect]
^- (quip card:agent:mall _this)
~& >> 'feeling affected'
=^ cards ships
?+ -.q.ue `ships
%restore (handle-restore our.bowl who)

View File

@ -22,6 +22,15 @@
~[%aqua-ames %aqua-behn %aqua-dill %aqua-eyre]
;< ~ bind:m (start-imps vane-imps)
;< ~ bind:m start-watching
:: We want to wait for the vane imps to actually start and get their
:: subscriptions started. Other ways to do this are delaying the ack
:: from spider until the build is finished (does that guarantee the
:: subscriptions have started?) or subscribe to the imps themselves
:: for a notification when they're done. This is probably the best
:: option because the imp can delay until it gets a positive ack on
:: the subscription.
::
;< ~ bind:m (sleep ~s0)
(pure:m ~)
::
++ end-simple
@ -29,6 +38,11 @@
^- form:m
=/ vane-imps=(list term)
~[%aqua-ames %aqua-behn %aqua-dill %aqua-eyre]
:: Get our very own event with mistakes in it... yet. Specifically,
:: this avoids the situation where updates going to the vane imps
:: crash in spider because the imp is already gone.
::
;< ~ bind:m (sleep ~s0)
;< ~ bind:m (stop-imps vane-imps)
;< ~ bind:m stop-watching
(pure:m ~)
@ -65,7 +79,7 @@
.^(? %mx /(scot %p our)/spider/(scot %da now)/started/[i.imps]/noun)
?. imp-started
loop(imps t.imps)
=/ poke-vase !>(i.imps)
=/ poke-vase !>([i.imps &])
;< ~ bind:m (poke-our %spider %spider-stop poke-vase)
loop(imps t.imps)
::

View File

@ -1174,8 +1174,7 @@
attributing.agent-routes :: guest
agent-name :: agent
== ::
:* :: NB (jtobin): see urbit/urbit#1466
wex=~ :: outgoing
:* wex=outgoing.subscribers.current-agent :: outgoing
sup=incoming.subscribers.current-agent :: incoming
== ::
:* act=change.stats.current-agent :: tick
@ -1285,12 +1284,29 @@
?> ?=([%out @ @ *] wire)
=/ other-ship (slav %p i.t.wire)
=/ other-agent i.t.t.wire
=/ =dock [other-ship other-agent]
=/ agent-wire t.t.t.wire
:: if subscription ack or close, handle before calling user code
::
=? outgoing.subscribers.current-agent ?=(%subscription-close -.gift)
%- ~(del by outgoing.subscribers.current-agent)
[wire dock]
=? outgoing.subscribers.current-agent ?=(%subscription-ack -.gift)
?^ p.gift
%- ~(del by outgoing.subscribers.current-agent)
[wire dock]
%+ ~(jab by outgoing.subscribers.current-agent) [agent-wire dock]
|= [acked=? =path]
~| [%already-acked agent-name wire dock path]
?< acked
[& path]
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
(handle-agent-response:ap-agent-core agent-wire gift)
::
=? ap-core ?=(%subscription-update -.gift)
(ap-update-subscription =(~ maybe-tang) other-ship other-agent agent-wire)
(ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire)
?^ maybe-tang
(ap-error -.gift leaf/"closing subscription" u.maybe-tang)
ap-core
@ -1403,7 +1419,7 @@
==
::
=. agent-cards
:(weld new-cards ack-cards agent-cards)
:(weld (flop new-cards) ack-cards agent-cards)
[maybe-tang ap-core]
:: +ap-handle-result: handle result.
::
@ -1414,13 +1430,10 @@
?: ?=(%| -.result)
`ap-core
::
=/ new-subs (ap-handle-quits -.p.result)
::
:- (flop -.p.result)
%_ ap-core
agent.current-agent +.p.result
incoming.subscribers.current-agent new-subs
==
=. agent.current-agent +.p.result
=. incoming.subscribers.current-agent
(ap-handle-quits -.p.result)
(ap-handle-peers -.p.result)
:: +ap-handle-quits: handle cancels of incoming subscriptions
::
++ ap-handle-quits
@ -1440,14 +1453,39 @@
=/ quit-map=bitt
(malt (turn quits |=(=duct [duct *[ship path]])))
(~(dif by incoming.subscribers.current-agent) quit-map)
:: +ap-tang: standard tang.
:: +ap-handle-peers: handle new outgoing subscriptions
::
++ ap-tang
|= =tape
^- tang
::
=/ =tank [%leaf (weld "mall: {<agent-name>}: " tape)]
[tank ~]
++ ap-handle-peers
~/ %ap-handle-peers
|= moves=(list card:agent)
^- [(list card:agent) _ap-core]
=| cards=(list card:agent)
|- ^- [(list card:agent) _ap-core]
?~ moves
[(flop cards) ap-core]
=/ =card:agent i.moves
?: ?=([%pass * %agent * %unsubscribe *] card)
=/ =wire p.card
=/ =dock [ship name]:q.card
=. outgoing.subscribers.current-agent
(~(del by outgoing.subscribers.current-agent) [wire dock])
$(moves t.moves, cards [card cards])
?. ?=([%pass * %agent * %subscribe *] card)
$(moves t.moves, cards [card cards])
=/ =wire p.card
=/ =dock [ship name]:q.card
=/ =path path.task.q.card
?: (~(has by outgoing.subscribers.current-agent) wire dock)
=. ap-core
=/ way [%out (scot %p p.dock) q.dock wire]
=/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >wire< >dock<]
%- (slog leaf/"XXX remove" tang)
(ap-specific-take way %subscription-ack `tang)
$(moves t.moves)
=. outgoing.subscribers.current-agent
(~(put by outgoing.subscribers.current-agent) [wire dock] [| path])
$(moves t.moves, cards [card cards])
--
--
:: +call: request

View File

@ -1880,8 +1880,8 @@
-- ::able
++ bitt (map duct (pair ship path)) :: incoming subs
++ boat :: outgoing subs
%+ map wire ::
(trel bean ship path) ::
%+ map ,[=wire =ship =term] ::
,[acked=? =path] ::
++ bowl :: standard app state
$: $: our/ship :: host
src/ship :: guest