!: :: %gall, agent execution !? 163 !: :::: |= pit=vase =, gall => =~ |% :: +ames-response: network response message (%boon) :: :: %d: fact :: %x: quit :: ++ ames-response $% [%d =mark noun=*] [%x ~] == :: +ames-request: network request (%plea) :: :: %m: poke :: %l: watch-as :: %s: watch :: %u: leave :: ++ ames-request $% [%m =mark noun=*] [%l =mark =path] [%s =path] [%u ~] == :: +remote-request: kinds of agent actions that can cross the network :: :: Used in wires to identify the kind of remote request we made. :: Bijective with the tags of $ames-request. :: ++ remote-request $? %watch %watch-as %poke %leave == -- |% :: +move: Arvo-level move :: ++ move $: =duct move=(wind note-arvo gift-arvo) == -- |% :: +state-old: upgrade path :: ++ state-old ?(state) :: +state: all state :: ++ state $: :: state version :: %0 :: agents by ship :: =agents == :: +subscribers: subscriber data :: ++ subscribers $: :: incoming subscribers :: incoming=bitt :: outgoing subscribers :: outgoing=boat == :: +agents: ship state :: ++ agents $: :: system duct :: system-duct=duct :: foreign contacts :: contacts=(set ship) :: running agents :: running=(map term running-agent) :: waiting queue :: blocked=(map term blocked) == :: +routes: new cuff :: ++ routes $: :: disclosing to :: disclosing=(unit (set ship)) :: attributed to :: attributing=ship == :: +running-agent: agent state :: ++ running-agent $: :: cache :: cache=worm :: control duct :: control-duct=duct :: unstopped :: live=? :: statistics :: =stats :: subscribers :: =subscribers :: agent core :: =agent :: update control :: =beak :: req'd translations :: marks=(map duct mark) == :: +blocked: blocked tasks :: ++ blocked (qeu (trel duct routes deal)) :: +stats: statistics :: ++ stats $: :: change number :: change=@ud :: entropy :: eny=@uvJ :: time :: time=@da == -- . == =| =state |= $: :: identity :: our=ship :: urban time :: now=@da :: entropy :: eny=@uvJ :: activate :: ski=sley == ~% %gall-top ..is ~ |% :: +gall-payload: gall payload :: ++ gall-payload + :: +mo: Arvo-level move handling :: :: An outer core responsible for routing moves to and from Arvo; it calls :: an inner core, +ap, to route internal moves to and from agents. :: ++ mo ~% %gall-mo +> ~ |_ $: hen=duct moves=(list move) == ++ mo-core . :: +mo-abed: initialise state with the provided duct. :: ++ mo-abed |= =duct ^+ mo-core :: mo-core(hen duct) :: +mo-abet: resolve moves. :: ++ mo-abet ^- [(list move) _gall-payload] :: =/ resolved (flop moves) [resolved gall-payload] :: :: +mo-boot: ask %ford to build us a core for the specified agent. :: ++ mo-boot |= [=term =ship =desk] ^+ mo-core :: =/ =case [%da now] =/ =wire =/ ship (scot %p ship) =/ case (scot case) /sys/cor/[term]/[ship]/[desk]/[case] :: =/ =note-arvo =/ =schematic:ford [%core [ship desk] /hoon/[term]/app] [%f %build live=%.y schematic] :: (mo-pass wire note-arvo) :: :: +mo-reboot: ask %ford to rebuild the specified agent :: ++ mo-reboot |= [force=? =term =ship] ^+ mo-core =/ gent (~(got by running.agents.state) term) =. hen control-duct.gent =* desk q.beak.gent :: if we're forcing a reboot, we don't try to %kill the old build :: ?: force (mo-boot term ship desk) :: =/ =wire =/ ship (scot %p ship) =/ case (scot r.beak.gent) /sys/cor/[term]/[ship]/[desk]/[case] %. [term ship desk] =< mo-boot =/ =note-arvo [%f %kill ~] (mo-pass wire note-arvo) :: :: :: +mo-goad: rebuild agent(s) :: ++ mo-goad |= [force=? agent=(unit dude)] ^+ mo-core ?^ agent ~| goad-gone+u.agent (mo-reboot force u.agent our) :: =/ agents=(list term) ~(tap in ~(key by running.agents.state)) |- ^+ mo-core ?~ agents mo-core %= $ agents t.agents ..mo-core (mo-reboot force i.agents our) == :: :: +mo-pass: prepend a standard %pass to the current list of moves. :: ++ mo-pass |= pass=(pair wire note-arvo) ^+ mo-core :: =/ =move [hen [%pass pass]] mo-core(moves [move moves]) :: +mo-give: prepend a standard %give to the current list of moves. :: ++ mo-give |= =gift:able ^+ mo-core :: =/ =move [hen [%give gift]] mo-core(moves [move moves]) :: +mo-receive-core: receives an app core built by %ford. :: :: Presuming we receive a good core, we first check to see if the agent :: is already running. If so, we update its beak in %gall's state, :: initialise an +ap core for the agent, install the core we got from :: %ford, and then resolve any moves associated with it. :: :: If we're dealing with a new agent, we create one using the result we :: got from %ford, add it to the collection of agents %gall is keeping :: track of, and then do more or less the same procedure as we did for the :: running agent case. :: ++ mo-receive-core ~/ %mo-receive-core |= [=term =beak =made-result:ford] ^+ mo-core :: ?: ?=([%incomplete *] made-result) (mo-give %onto %.n tang.made-result) :: =/ build-result build-result.made-result :: ?: ?=([%error *] build-result) (mo-give %onto %.n message.build-result) :: =/ =cage (result-to-cage:ford build-result) =/ result-vase q.cage =/ maybe-agent=(unit running-agent) (~(get by running.agents.state) term) :: ?^ maybe-agent =/ agent u.maybe-agent(beak beak) =. running.agents.state (~(put by running.agents.state) term agent) =/ =routes [disclosing=~ attributing=our] =/ app (ap-abed:ap term routes) =. app (ap-reinstall:app result-vase) ap-abet:app :: =/ maybe-new-agent (mule |.(!<(agent result-vase))) ?: ?=(%| -.maybe-new-agent) =/ err [[%leaf "{}: not valid agent"] p.maybe-new-agent] (mo-give %onto %.n err) =. mo-core (mo-new-agent term beak p.maybe-new-agent) =/ old mo-core =/ wag =/ =routes [disclosing=~ attributing=our] =/ app (ap-abed:ap term routes) (ap-upgrade-state:app ~) :: =/ maybe-tang -.wag =/ app +.wag ?^ maybe-tang =. mo-core old (mo-give %onto %.n u.maybe-tang) :: =. mo-core ap-abet:app =. mo-core (mo-clear-queue term) =/ =suss [term %boot now] (mo-give %onto [%.y suss]) :: +mo-new-agent: create a new agent and add it to %gall's state. :: :: %gall maintains a collection of running agents. This arm creates a :: new one with the provided name, beak, and state (held in a vase). :: ++ mo-new-agent |= [=term =beak =agent] ^+ mo-core :: =/ running-agent =/ default-agent *running-agent %_ default-agent control-duct hen beak beak agent agent == :: %_ mo-core running.agents.state (~(put by running.agents.state) term running-agent) == :: +mo-send-foreign-request: handle local request to .ship :: ++ mo-send-foreign-request ~/ %mo-send-foreign-request |= [=ship foreign-agent=term =deal] ^+ mo-core :: =. mo-core (mo-track-ship ship) =/ =ames-request ?- -.deal %poke [%m p.cage.deal q.q.cage.deal] %leave [%u ~] %raw-poke !! %poke-as !! %watch-as [%l deal] %watch [%s path.deal] == :: =/ wire =/ action -.deal /sys/way/(scot %p ship)/[foreign-agent]/[action] :: =/ =note-arvo =/ =path /ge/[foreign-agent] [%a %plea ship %g path ames-request] :: (mo-pass wire note-arvo) :: +mo-track-ship: subscribe to ames and jael for notices about .ship :: ++ mo-track-ship |= =ship ^+ mo-core :: if already contacted, no-op :: ?: (~(has in contacts.agents.state) ship) mo-core :: first contact; update state and subscribe to notifications :: =. contacts.agents.state (~(put in contacts.agents.state) ship) :: ask ames to track .ship's connectivity :: =. moves [[system-duct.agents.state %pass /sys/lag %a %heed ship] moves] :: ask jael to track .ship's breaches :: =/ =note-arvo [%j %public-keys (silt ship ~)] =. moves [[system-duct.agents.state %pass /sys/era note-arvo] moves] mo-core :: +mo-untrack-ship: cancel subscriptions to ames and jael for .ship :: ++ mo-untrack-ship |= =ship ^+ mo-core :: if already canceled, no-op :: ?. (~(has in contacts.agents.state) ship) mo-core :: delete .ship from state and kill subscriptions :: =. contacts.agents.state (~(del in contacts.agents.state) ship) :: =. moves [[system-duct.agents.state %pass /sys/lag %a %jilt ship] moves] :: =/ =note-arvo [%j %nuke (silt ship ~)] =. moves [[system-duct.agents.state %pass /sys/era note-arvo] moves] mo-core :: +mo-breach: ship breached, so forget about them :: ++ mo-breach |= =ship ^+ mo-core =. mo-core (mo-untrack-ship ship) =/ agents=(list [name=term =running-agent]) ~(tap by running.agents.state) |- ^+ mo-core ?~ agents mo-core =. mo-core =/ =routes [disclosing=~ attributing=ship] =/ app (ap-abed:ap name.i.agents routes) ap-abet:(ap-breach:app ship) $(agents t.agents) :: +mo-handle-sys: handle a +sign incoming over /sys. :: :: (Note that /sys implies the +sign should be routed to a vane.) :: ++ mo-handle-sys ~/ %mo-handle-sys |= [=path =sign-arvo] ^+ mo-core :: ?+ -.path !! %era (mo-handle-sys-era path sign-arvo) %cor (mo-handle-sys-cor path sign-arvo) %lag (mo-handle-sys-lag path sign-arvo) %pel (mo-handle-sys-pel path sign-arvo) %rep (mo-handle-sys-rep path sign-arvo) %req (mo-handle-sys-req path sign-arvo) %val (mo-handle-sys-val path sign-arvo) %way (mo-handle-sys-way path sign-arvo) == :: +mo-handle-sys-era: receive update about contact :: ++ mo-handle-sys-era |= [=path =sign-arvo] ^+ mo-core ?> ?=([%j %public-keys *] sign-arvo) ?> ?=([%era ~] path) ?. ?=(%breach -.public-keys-result.sign-arvo) mo-core (mo-breach who.public-keys-result.sign-arvo) :: +mo-handle-sys-cor: receive a cor from %ford. :: ++ mo-handle-sys-cor |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%cor @ @ @ @ ~] path) ?> ?=([%f %made *] sign-arvo) =/ beak-path t.t.path =/ =beak =/ =ship (slav %p i.beak-path) =/ =desk i.t.beak-path =/ =case [%da (slav %da i.t.t.beak-path)] [ship desk case] (mo-receive-core i.t.path beak result.sign-arvo) :: +mo-handle-sys-lag: handle an ames %clog notification :: ++ mo-handle-sys-lag |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%lag ~] path) ?> ?=([%a %clog *] sign-arvo) :: =/ agents=(list term) ~(tap in ~(key by running.agents.state)) |- ^+ mo-core ?~ agents mo-core :: =. mo-core =/ =routes [disclosing=~ attributing=our] =/ app (ap-abed:ap i.agents routes) ap-abet:(ap-clog:app ship.sign-arvo) :: $(agents t.agents) :: +mo-handle-sys-pel: translated peer. :: :: Validates a received %ford result and %gives an internal :: %fact. :: ++ mo-handle-sys-pel |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%pel @ ~] path) ?> ?=([%f %made *] sign-arvo) :: ?- result.sign-arvo [%incomplete *] (mo-give %unto %poke-ack `tang.result.sign-arvo) :: [%complete %error *] (mo-give %unto %poke-ack `message.build-result.result.sign-arvo) :: [%complete %success *] (mo-give %unto %fact (result-to-cage:ford build-result.result.sign-arvo)) == :: +mo-handle-sys-rep: reverse request. :: :: On receipt of a valid +sign from %ford, sets state to the :: appropriate duct and gives an internal %fact :: containing the +sign payload. :: ++ mo-handle-sys-rep |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%rep ~] path) ?> ?=([%f %made *] sign-arvo) :: ?- result.sign-arvo [%incomplete *] (mo-give %done `[%gall-fail tang.result.sign-arvo]) :: [%complete %error *] (mo-give %done `[%gall-fail message.build-result.result.sign-arvo]) :: [%complete %success *] (mo-give %unto %fact (result-to-cage:ford build-result.result.sign-arvo)) == :: +mo-handle-sys-req: TODO description :: :: TODO: what should we do if the remote nacks our %pull? ++ mo-handle-sys-req |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%req @ @ ~] path) =/ him (slav %p i.t.path) =/ dap i.t.t.path :: ?> ?=([?(%g %b) %unto *] sign-arvo) =/ =sign:agent +>.sign-arvo :: ?- -.sign %poke-ack =/ err=(unit error:ames) ?~ p.sign ~ `[%poke-ack u.p.sign] (mo-give %done err) :: %fact =+ [mark noun]=[p q.q]:cage.sign (mo-give %boon %d mark noun) :: %kick (mo-give %boon %x ~) :: %watch-ack =/ err=(unit error:ames) ?~ p.sign ~ `[%watch-ack u.p.sign] (mo-give %done err) == :: +mo-handle-sys-val: inbound validate. :: :: Validates an incoming +sign from %ford and applies it to the :: specified agent. :: ++ mo-handle-sys-val |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%val @ @ ~] path) ?> ?=([%f %made *] sign-arvo) =/ =ship (slav %p i.t.path) =/ =term i.t.t.path ?: ?=([%incomplete *] result.sign-arvo) =/ err (some tang.result.sign-arvo) (mo-give %unto %poke-ack err) :: =/ build-result build-result.result.sign-arvo ?: ?=([%error *] build-result) =/ err (some message.build-result) (mo-give %unto %poke-ack err) :: =/ =routes [disclosing=~ attributing=ship] =/ =cage (result-to-cage:ford build-result) =/ =deal [%poke cage] (mo-apply term routes deal) :: +mo-handle-sys-way: handle response to outgoing remote request :: ++ mo-handle-sys-way |= [=path =sign-arvo] ^+ mo-core :: ?> ?=([%way @ @ @ ~] path) =/ =ship (slav %p i.t.path) =/ foreign-agent i.t.t.path =/ remote-request (remote-request i.t.t.t.path) :: ?+ sign-arvo !! [%a %done *] =/ err=(unit tang) ?~ error=error.sign-arvo ~ `[[%leaf (trip tag.u.error)] tang.u.error] :: ?- remote-request %watch-as (mo-give %unto %watch-ack err) %watch (mo-give %unto %watch-ack err) %poke (mo-give %unto %poke-ack err) %leave mo-core == :: [%a %boon *] =/ =ames-response ;;(ames-response payload.sign-arvo) (mo-handle-ames-response ames-response) :: [%a %lost *] =/ sys-wire [%sys path] :: TODO: %drip %kick so app crash can't kill the remote %pull :: =. mo-core (mo-give %unto %kick ~) =. mo-core (mo-pass sys-wire %a %plea ship %g /ge/[foreign-agent] %u ~) mo-core == :: +mo-handle-use: handle a typed +sign incoming on /use. :: :: (Note that /use implies the +sign should be routed to an agent.) :: :: Initialises the specified agent and then performs an agent-level :: +take on the supplied +sign. :: ++ mo-handle-use ~/ %mo-handle-use |= [=path hin=(hypo sign-arvo)] ^+ mo-core :: ?. ?=([@ @ *] path) ~& [%mo-handle-use-bad-path path] !! :: =/ =sign-arvo q.hin ?. ?=([?(%g %b) %unto *] sign-arvo) =/ app =/ =term i.path =/ =ship (slav %p i.t.path) =/ =routes [disclosing=~ attributing=ship] (ap-abed:ap term routes) :: =. app (ap-generic-take:app t.t.path sign-arvo) ap-abet:app =/ =sign:agent +>.sign-arvo =/ app ?> ?=([%out @ @ *] t.t.path) =/ =term i.path =/ =ship (slav %p i.t.t.t.path) =/ =routes [disclosing=~ attributing=ship] (ap-abed:ap term routes) =. app (ap-specific-take:app t.t.path sign) ap-abet:app :: +mo-clear-queue: clear blocked tasks from the specified running agent. :: ++ mo-clear-queue |= =term ^+ mo-core :: ?. (~(has by running.agents.state) term) mo-core =/ maybe-blocked (~(get by blocked.agents.state) term) ?~ maybe-blocked mo-core :: =/ =blocked u.maybe-blocked :: |- ^+ mo-core ?: =(~ blocked) =/ blocked (~(del by blocked.agents.state) term) %_ mo-core blocked.agents.state blocked == =^ task blocked [p q]:~(get to blocked) =/ =duct p.task =/ =routes q.task =/ =deal r.task :: =/ move =/ =sock [attributing.routes our] =/ card [%slip %g %deal sock term deal] [duct card] $(moves [move moves]) :: +mo-beak: assemble a beak for the specified agent. :: ++ mo-beak |= =term ^- beak :: ?~ running=(~(get by running.agents.state) term) :: XX this fallback is necessary, as .term could be either the source :: or the destination app. ie, it might not exist locally ... :: [our %home %da now] beak.u.running :: +mo-peek: call to +ap-peek (which is not accessible outside of +mo). :: ++ mo-peek ~/ %mo-peek |= [agent=term =routes =term =path] ^- (unit (unit cage)) :: =/ app (ap-abed:ap agent routes) (ap-peek:app term path) :: +mo-apply: apply the supplied action to the specified agent. :: ++ mo-apply |= [agent=term =routes =deal] ^+ mo-core :: =/ =path =/ ship (scot %p attributing.routes) /sys/val/[ship]/[agent] :: =/ ship-desk =/ =beak (mo-beak agent) [p q]:beak :: ?: ?=(%raw-poke -.deal) =/ =schematic:ford [%vale ship-desk +.deal] =/ =note-arvo [%f %build live=%.n schematic] (mo-pass path note-arvo) :: ?: ?=(%poke-as -.deal) =/ =schematic:ford [%cast ship-desk mark.deal [%$ cage.deal]] =/ =note-arvo [%f %build live=%.n schematic] (mo-pass path note-arvo) :: =/ app (ap-abed:ap agent routes) =. app (ap-apply:app deal) ap-abet:app :: +mo-handle-local: handle locally. :: :: If the agent is running or blocked, assign it the supplied +deal. :: Otherwise simply apply the action to the agent. :: ++ mo-handle-local |= [=ship agent=term =deal] ^+ mo-core :: =/ =routes [disclosing=~ attributing=ship] =/ is-running (~(has by running.agents.state) agent) =/ is-blocked (~(has by blocked.agents.state) agent) :: ?: |(!is-running is-blocked) =/ =blocked =/ waiting (~(get by blocked.agents.state) agent) =/ deals (fall waiting *blocked) =/ deal [hen routes deal] (~(put to deals) deal) :: ~& >> [%gall-not-running agent -.deal] %_ mo-core blocked.agents.state (~(put by blocked.agents.state) agent blocked) == (mo-apply agent routes deal) :: +mo-handle-ames-request: handle %ames request message. :: ++ mo-handle-ames-request |= [=ship agent-name=term =ames-request] ^+ mo-core :: %u/%leave gets automatically acked :: =. mo-core (mo-track-ship ship) =? mo-core ?=(%u -.ames-request) (mo-give %done ~) :: =/ =wire /sys/req/(scot %p ship)/[agent-name] :: =/ =deal ?- -.ames-request %m [%raw-poke [mark noun]:ames-request] %l [%watch-as [mark path]:ames-request] %s [%watch path.ames-request] %u [%leave ~] == (mo-pass wire %g %deal [ship our] agent-name deal) :: +mo-handle-ames-response: handle ames response message. :: ++ mo-handle-ames-response |= =ames-response ^+ mo-core :: ?- -.ames-response :: %d: diff; ask ford to validate .noun as .mark :: %d =/ =wire /sys/rep :: agents load their code from the %home desk, including marks :: =/ =note-arvo =/ =disc:ford [our %home] [%f %build live=%.n %vale disc [mark noun]:ames-response] :: (mo-pass wire note-arvo) :: :: %x: kick; tell agent the publisher canceled the subscription :: %x (mo-give %unto %kick ~) == :: +ap: agent engine :: :: An inner, agent-level core. The sample refers to the agent we're :: currently focused on. :: ++ ap ~% %gall-ap +> ~ |_ $: agent-name=term agent-routes=routes agent-duct=duct agent-moves=(list move) agent-config=(list (each suss tang)) current-agent=running-agent == ++ ap-core . :: +ap-abed: initialise state for an agent, with the supplied routes. :: :: The agent must already be running in +gall -- here we simply update :: +ap's state to focus on it. :: ++ ap-abed ~/ %ap-abed |= [=term =routes] ^+ ap-core :: =/ =running-agent =/ running (~(got by running.agents.state) term) =/ =stats :+ +(change.stats.running) (shaz (mix (add term change.stats.running) eny)) now running(stats stats) :: =. agent-name term =. agent-routes routes =. current-agent running-agent =. agent-duct hen ap-core :: +ap-abet: resolve moves. :: ++ ap-abet ^+ mo-core :: =/ running (~(put by running.agents.state) agent-name current-agent) =/ moves =/ giver |=(report=(each suss tang) [hen %give %onto report]) =/ from-suss (turn agent-config giver) :(weld agent-moves from-suss moves) :: %_ mo-core running.agents.state running moves moves == :: +ap-from-internal: internal move to move. :: :: We convert from cards to duct-indexed moves when resolving :: them in Arvo. :: ++ ap-from-internal ~/ %ap-from-internal |= card=(wind neat gift:agent) ^- (list move) :: ?- -.card %slip !! :: %give =/ =gift:agent p.card ?: ?=(%kick -.gift) =/ ducts=(list duct) (ap-ducts-from-path path.gift ship.gift) %+ turn ducts |= =duct ~? &(=(duct system-duct.agents.state) !=(agent-name %hood)) [%agent-giving-on-system-duct agent-name -.gift] [duct %give %unto %kick ~] :: ?. ?=(%fact -.gift) [agent-duct %give %unto gift]~ :: =/ ducts=(list duct) (ap-ducts-from-path path.gift ~) =/ =cage cage.gift %+ turn ducts |= =duct ~? &(=(duct system-duct.agents.state) !=(agent-name %hood)) [%agent-giving-on-system-duct agent-name -.gift] ^- move =/ =mark (~(gut by marks.current-agent) duct p.cage) :: ?: =(mark p.cage) [duct %give %unto %fact cage.gift] =/ =path /sys/pel/[agent-name] =/ =note-arvo =/ =schematic:ford =/ =beak (mo-beak agent-name) [%cast [p q]:beak mark [%$ cage]] [%f %build live=%.n schematic] :: [duct %pass path note-arvo] :: %pass =/ =duct system-duct.agents.state =/ =wire p.card =/ =neat:agent q.card =? wire ?=(%agent -.neat) [%out (scot %p ship.neat) name.neat wire] =. wire :: Is it bad that this includes attributing ship? May create :: spurious duct mismatches :: [%use agent-name (scot %p attributing.agent-routes) wire] =/ =note-arvo ?- -.neat %arvo note-arvo.neat %agent =/ =task:able =/ =sock [our ship.neat] [%deal sock [name deal]:neat] [%g task] == [duct %pass wire note-arvo]~ == :: +ap-breach: ship breached, so forget about them :: ++ ap-breach |= =ship ^+ ap-core =/ in=(list [=duct =^ship =path]) ~(tap by incoming.subscribers.current-agent) |- ^+ ap-core ?^ in =? ap-core =(ship ship.i.in) =/ core ap-load-delete(agent-duct duct.i.in) core(agent-duct agent-duct) $(in t.in) :: =/ out=(list [[=wire =^ship =term] ? =path]) ~(tap by outgoing.subscribers.current-agent) |- ^+ ap-core ?~ out ap-core =? ap-core =(ship ship.i.out) =/ core =. agent-duct system-duct.agents.state (ap-specific-take wire.i.out %kick ~) core(agent-duct agent-duct) $(out t.out) :: +ap-clog: handle %clog notification from ames :: :: Kills subscriptions from .ship in both directions: :: - notifies local app that subscription is dead :: - gives remote %quit to notify subscriber ship :: TODO: %drip local app notification for error isolation :: ++ ap-clog |= =ship ^+ ap-core :: =/ in=(list [=duct =^ship =path]) ~(tap by incoming.subscribers.current-agent) |- ^+ ap-core ?~ in ap-core :: =? ap-core =(ship ship.i.in) =/ core ap-kill-up(agent-duct duct.i.in) core(agent-duct agent-duct) $(in t.in) :: +ap-agent-core: agent core with current bowl and state :: ++ ap-agent-core ~(. agent.current-agent ap-construct-bowl) :: +ap-ducts-from-path: get ducts subscribed to path :: ++ ap-ducts-from-path |= [target-path=(unit path) target-ship=(unit ship)] ^- (list duct) ?: &(?=(~ target-path) ?=(~ target-ship)) ~[agent-duct] %+ murn ~(tap by incoming.subscribers.current-agent) |= [=duct =ship =path] ^- (unit ^duct) ?~ target-ship ?: =(target-path `path) `duct ~ ?~ target-path ?: =(target-ship `ship) `duct ~ ?: &(=(target-path `path) =(target-ship `ship)) `duct ~ :: +ap-apply: apply effect. :: ++ ap-apply |= =deal ^+ ap-core :: ?- -.deal %watch-as (ap-subscribe-as +.deal) %poke (ap-poke +.deal) %watch (ap-subscribe +.deal) %raw-poke !! %poke-as !! %leave ap-load-delete == :: +ap-peek: peek. :: ++ ap-peek ~/ %ap-peek |= [=term tyl=path] ^- (unit (unit cage)) :: =/ marked ?. ?=(%x term) [mark=%$ tyl=tyl] :: =/ =path (flop tyl) ?> ?=(^ path) [mark=i.path tyl=(flop t.path)] :: =/ =mark mark.marked =/ tyl tyl.marked :: =/ peek-result=(each (unit (unit cage)) tang) (ap-mule-peek |.((on-peek:ap-agent-core [term tyl]))) :: ?- -.peek-result %& p.peek-result %| ((slog leaf+"peek bad result" p.peek-result) [~ ~]) == :: +ap-update-subscription: update subscription. :: ++ ap-update-subscription ~/ %ap-update-subscription |= [is-ok=? =other=ship other-agent=term =wire] ^+ ap-core :: ?: is-ok ap-core (ap-kill-down wire [other-ship other-agent]) :: +ap-give: return result. :: ++ ap-give |= =gift:agent ^+ ap-core :: =/ internal-moves (weld (ap-from-internal %give gift) agent-moves) ap-core(agent-moves internal-moves) :: +ap-construct-bowl: set up bowl. :: ++ ap-construct-bowl ^- bowl :* :* our :: host attributing.agent-routes :: guest agent-name :: agent == :: :* wex=outgoing.subscribers.current-agent :: outgoing sup=incoming.subscribers.current-agent :: incoming == :: :* act=change.stats.current-agent :: tick eny=eny.stats.current-agent :: nonce now=time.stats.current-agent :: time byk=beak.current-agent :: source == == :: +ap-pass: request action. :: ++ ap-pass |= [=path =neat] ^+ ap-core :: =/ internal-moves (ap-from-internal %pass path neat) ap-core(agent-moves (weld internal-moves agent-moves)) :: +ap-reinstall: reinstall. :: ++ ap-reinstall ~/ %ap-reinstall |= =vase ^+ ap-core :: =/ maybe-agent (mule |.(!<(agent vase))) ?: ?=(%| -.maybe-agent) (ap-error %new-core-not-agent p.maybe-agent) :: =/ prep =/ =agent p.maybe-agent =/ running %- some ~(on-save agent.current-agent ap-construct-bowl) =/ installed ap-install(agent.current-agent agent) (installed running) :: =^ maybe-tang ap-core prep ?~ maybe-tang ap-core (ap-error %load-failed u.maybe-tang) :: +ap-subscribe-as: apply %watch-as. :: ++ ap-subscribe-as |= [=mark =path] ^+ ap-core :: =. marks.current-agent (~(put by marks.current-agent) agent-duct mark) (ap-subscribe path) :: +ap-subscribe: apply %watch. :: ++ ap-subscribe ~/ %ap-subscribe |= pax=path ^+ ap-core :: =/ incoming [attributing.agent-routes pax] =. incoming.subscribers.current-agent (~(put by incoming.subscribers.current-agent) agent-duct incoming) :: =^ maybe-tang ap-core %+ ap-ingest %watch-ack |. (on-watch:ap-agent-core pax) ?^ maybe-tang ap-silent-delete ap-core :: +ap-poke: apply %poke. :: ++ ap-poke ~/ %ap-poke |= =cage ^+ ap-core :: =^ maybe-tang ap-core %+ ap-ingest %poke-ack |. (on-poke:ap-agent-core cage) ap-core :: +ap-error: pour error. :: ++ ap-error |= [=term =tang] ^+ ap-core :: =/ form |=(=tank [%rose [~ "! " ~] tank ~]) =^ maybe-tang ap-core %+ ap-ingest ~ |. (on-fail:ap-agent-core term (turn tang form)) ap-core :: +ap-generic-take: generic take. :: ++ ap-generic-take ~/ %ap-generic-take |= [=wire =sign-arvo] ^+ ap-core :: =^ maybe-tang ap-core %+ ap-ingest ~ |. (on-arvo:ap-agent-core wire sign-arvo) ?^ maybe-tang (ap-error %arvo-response u.maybe-tang) ap-core :: +ap-specific-take: specific take. :: ++ ap-specific-take |= [=wire =sign:agent] ^+ ap-core :: ~| wire=wire ?> ?=([%out @ @ *] wire) =/ other-ship (slav %p i.t.wire) =/ other-agent i.t.t.wire =/ =dock [other-ship other-agent] =/ agent-wire t.t.t.wire :: if subscription ack or close, handle before calling user code :: =? outgoing.subscribers.current-agent ?=(%kick -.sign) %- ~(del by outgoing.subscribers.current-agent) [agent-wire dock] ?: ?& ?=(%watch-ack -.sign) !(~(has by outgoing.subscribers.current-agent) [agent-wire dock]) == %- %: slog leaf+"{}: got ack for nonexistent subscription" leaf+"{}: {}" >wire=wire< >out=outgoing.subscribers.current-agent< ~ == ap-core :: =? outgoing.subscribers.current-agent ?=(%watch-ack -.sign) ?^ p.sign %- ~(del by outgoing.subscribers.current-agent) [agent-wire dock] %+ ~(jab by outgoing.subscribers.current-agent) [agent-wire dock] |= [acked=? =path] ~| [%already-acked agent-name wire dock path] ?< acked [& path] :: =^ maybe-tang ap-core %+ ap-ingest ~ |. (on-agent:ap-agent-core agent-wire sign) :: =? ap-core ?=(%fact -.sign) (ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire) ?^ maybe-tang (ap-error -.sign leaf/"closing subscription" u.maybe-tang) ap-core :: +ap-install: install wrapper. :: ++ ap-install |= maybe-vase=(unit vase) ^- [(unit tang) _ap-core] :: =^ maybe-tang ap-core (ap-upgrade-state maybe-vase) :: =/ new-agent-config =/ =term ?~(maybe-vase %boot %bump) =/ possibly-suss ?~ maybe-tang =/ =suss [agent-name term now] [%.y suss] [%.n u.maybe-tang] [possibly-suss agent-config] :: =/ next ap-core(agent-config new-agent-config) :: [maybe-tang next] :: +ap-upgrade-state: low-level install. :: ++ ap-upgrade-state ~/ %ap-upgrade-state |= maybe-vase=(unit vase) ^- [(unit tang) _ap-core] :: =^ maybe-tang ap-core %+ ap-ingest ~ ?~ maybe-vase |. on-init:ap-agent-core |. (on-load:ap-agent-core u.maybe-vase) [maybe-tang ap-core] :: +ap-silent-delete: silent delete. :: ++ ap-silent-delete ^+ ap-core :: %= ap-core incoming.subscribers.current-agent (~(del by incoming.subscribers.current-agent) agent-duct) == :: +ap-load-delete: load delete. :: ++ ap-load-delete ^+ ap-core :: =/ maybe-incoming (~(get by incoming.subscribers.current-agent) agent-duct) ?~ maybe-incoming ap-core :: =/ incoming u.maybe-incoming =. incoming.subscribers.current-agent (~(del by incoming.subscribers.current-agent) agent-duct) :: =^ maybe-tang ap-core %+ ap-ingest ~ |. (on-leave:ap-agent-core q.incoming) ?^ maybe-tang (ap-error %leave u.maybe-tang) ap-core :: +ap-kill-up: 2-sided kill from publisher side :: ++ ap-kill-up ^+ ap-core :: => ap-load-delete (ap-give %kick ~ ~) :: +ap-kill-down: 2-sided kill from subscriber side :: :: Must process leave first in case kick handler rewatches. :: ++ ap-kill-down |= [=wire =dock] ^+ ap-core :: =. ap-core (ap-pass wire %agent dock %leave ~) =/ way [%out (scot %p p.dock) q.dock wire] (ap-pass way %arvo %b %huck !>([%unto %kick ~])) :: +ap-mule: run virtualized with intercepted scry, preserving type :: :: Compare +mute and +mule. Those pass through scry, which :: doesn't allow us to catch crashes due to blocking scry. If :: you intercept scry, you can't preserve the type :: polymorphically. By monomorphizing, we are able to do so :: safely. :: ++ ap-mule |= run=_^?(|.(*step:agent)) ^- (each step:agent tang) =/ res (mock [run %9 2 %0 1] (sloy ski)) ?- -.res %0 [%& !<(step:agent [-:!>(*step:agent) p.res])] %1 [%| (turn p.res |=(a=* (smyt (path a))))] %2 [%| p.res] == :: +ap-mule-peek: same as +ap-mule but for (unit (unit cage)) :: ++ ap-mule-peek |= run=_^?(|.(*(unit (unit cage)))) ^- (each (unit (unit cage)) tang) =/ res (mock [run %9 2 %0 1] (sloy ski)) ?- -.res %0 [%& !<((unit (unit cage)) [-:!>(*(unit (unit cage))) p.res])] %1 [%| (turn p.res |=(a=* (smyt (path a))))] %2 [%| p.res] == :: +ap-ingest: call agent arm :: :: Handle acks here because they need to be emitted before the :: rest of the moves. :: ++ ap-ingest |= [ack=?(%poke-ack %watch-ack ~) run=_^?(|.(*step:agent))] ^- [(unit tang) _ap-core] =/ result (ap-mule run) =^ new-moves ap-core (ap-handle-result result) =/ maybe-tang=(unit tang) ?: ?=(%& -.result) ~ `p.result =/ ack-moves=(list move) %- zing %- turn :_ ap-from-internal ^- (list card:agent) ?- ack ~ ~ %poke-ack [%give %poke-ack maybe-tang]~ %watch-ack [%give %watch-ack maybe-tang]~ == :: =. agent-moves :(weld (flop new-moves) ack-moves agent-moves) [maybe-tang ap-core] :: +ap-handle-result: handle result. :: ++ ap-handle-result ~/ %ap-handle-result |= result=(each step:agent tang) ^- [(list move) _ap-core] ?: ?=(%| -.result) `ap-core :: =. agent.current-agent +.p.result =/ moves (zing (turn -.p.result ap-from-internal)) =. incoming.subscribers.current-agent (ap-handle-kicks moves) (ap-handle-peers moves) :: +ap-handle-kicks: handle cancels of incoming subscriptions :: ++ ap-handle-kicks ~/ %ap-handle-kicks |= moves=(list move) ^- bitt =/ quits=(list duct) %+ murn moves |= =move ^- (unit duct) ?. ?=([* %give %unto %kick *] move) ~ `duct.move :: =/ quit-map=bitt (malt (turn quits |=(=duct [duct *[ship path]]))) (~(dif by incoming.subscribers.current-agent) quit-map) :: +ap-handle-peers: handle new outgoing subscriptions :: ++ ap-handle-peers ~/ %ap-handle-peers |= moves=(list move) ^- [(list move) _ap-core] =| new-moves=(list move) |- ^- [(list move) _ap-core] ?~ moves [(flop new-moves) ap-core] =/ =move i.moves ?: ?=([* %pass * %g %deal * * %leave *] move) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) =/ short-wire t.t.t.t.t.t.wire =/ =dock [q.p q]:q.move.move =. outgoing.subscribers.current-agent (~(del by outgoing.subscribers.current-agent) [short-wire dock]) $(moves t.moves, new-moves [move new-moves]) ?. ?=([* %pass * %g %deal * * %watch *] move) $(moves t.moves, new-moves [move new-moves]) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) =/ short-wire t.t.t.t.t.t.wire =/ =dock [q.p q]:q.move.move =/ =path path.r.q.move.move ?: (~(has by outgoing.subscribers.current-agent) short-wire dock) =. ap-core =/ way [%out (scot %p p.dock) q.dock short-wire] =/ =tang ~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<] %- (slog >out=outgoing.subscribers.current-agent< tang) (ap-error %watch-not-unique tang) $(moves t.moves) =. outgoing.subscribers.current-agent (~(put by outgoing.subscribers.current-agent) [short-wire dock] [| path]) $(moves t.moves, new-moves [move new-moves]) -- -- :: +call: request :: ++ call ~% %gall-call +> ~ |= [=duct hic=(hypo (hobo task:able))] ^- [(list move) _gall-payload] :: ~| [%gall-call-failed duct q.hic] :: make sure our task is hard :: =/ =task:able ?. ?=(%soft -.q.hic) q.hic ;; task:able p.q.hic :: =/ initialised (mo-abed:mo duct) ?- -.task %conf =/ =dock p.task =/ =ship p.dock ?. =(our ship) ~& [%gall-not-ours ship] [~ gall-payload] :: => (mo-boot:initialised q.dock q.task) mo-abet :: %deal =/ =sock p.task =/ =term q.task =/ =deal r.task ?. =(q.sock our) ?> =(p.sock our) => (mo-send-foreign-request:initialised q.sock term deal) mo-abet :: => (mo-handle-local:initialised p.sock term deal) mo-abet :: %goad mo-abet:(mo-goad:initialised force.task agent.task) :: %init =/ payload gall-payload(system-duct.agents.state duct) [~ payload] :: %trim :: reuse %wash task to clear caches on memory-pressure :: :: XX cancel subscriptions if =(0 trim-priority) ? :: ~> %slog.[0 leaf+"gall: trim: clearing caches"] =/ =move [duct %pass / %g [%wash ~]] [[move ~] gall-payload] :: %vega [~ gall-payload] :: %plea =/ =ship ship.task =/ =path path.plea.task =/ =noun payload.plea.task :: ~| [ship=ship plea-path=path] ?> ?=([%ge @ ~] path) =/ agent-name i.t.path :: =/ =ames-request ;;(ames-request noun) => (mo-handle-ames-request:initialised ship agent-name ames-request) mo-abet :: %wash =. running.agents.state %- ~(run by running.agents.state) |= =running-agent running-agent(cache *worm) [~ gall-payload] :: %wegh =/ blocked =/ queued (~(run by blocked.agents.state) |=(blocked [%.y +<])) (sort ~(tap by queued) aor) :: =/ running =/ active (~(run by running.agents.state) |=(running-agent [%.y +<])) (sort ~(tap by active) aor) :: =/ =mass :+ %gall %.n :~ [%foreign %.y contacts.agents.state] [%blocked %.n blocked] [%active %.n running] [%dot %.y state] == :: [[duct %give %mass mass]~ gall-payload] == :: +load: recreate vane :: ++ load :: |= * :: gall-payload |= =state-old ^+ gall-payload :: ?- -.state-old %0 gall-payload(state state-old) == :: +scry: standard scry :: ++ scry ~/ %gall-scry |= [fur=(unit (set monk)) =term =shop =desk =coin =path] ^- (unit (unit cage)) ?. ?=(%.y -.shop) ~ :: =/ =ship p.shop ?: ?& =(%u term) =(~ path) =([%$ %da now] coin) =(our ship) == =/ =vase !>((~(has by running.agents.state) desk)) =/ =cage [%noun vase] (some (some cage)) :: ?. =(our ship) ~ :: ?. =([%$ %da now] coin) ~ :: ?. (~(has by running.agents.state) desk) (some ~) :: ?. ?=(^ path) ~ :: =/ initialised mo-abed:mo =/ =routes [~ ship] (mo-peek:initialised desk routes term path) :: +stay: save without cache :: ++ stay state :: +take: response :: ++ take ~/ %gall-take |= [=wire =duct hin=(hypo sign-arvo)] ^- [(list move) _gall-payload] :: ~| [%gall-take-failed wire] :: ?> ?=([?(%sys %use) *] wire) =/ initialised (mo-abed:mo duct) =/ =sign-arvo q.hin => ?- i.wire %sys (mo-handle-sys:initialised t.wire sign-arvo) %use (mo-handle-use:initialised t.wire hin) == mo-abet --