shrub/pkg/arvo/lib/push-hook.hoon

513 lines
13 KiB
Plaintext

:: lib/push-hook: helper for creating a push hook
::
:: lib/push-hook is a helper for automatically pushing data from a
:: local store to the corresponding pull-hook on remote ships. It also
:: proxies remote pokes to the store.
::
:: ## Interfacing notes:
::
:: The inner door may interact with the library by producing cards.
:: Do not pass any cards on a wire beginning with /helper as these
:: wires are reserved by this library. Any watches/pokes/peeks not
:: listed below will be routed to the inner door.
::
:: ## Subscription paths
::
:: /resource/[resource]: Receive initial state and updates to
:: .resource. .resource should be encoded with en-path:resource from
:: /lib/resource. Facts on this path will be of mark
:: update-mark.config
::
:: ## Pokes
::
:: %push-hook-action: Add/remove a resource from pushing.
:: [update-mark.config]: A poke to proxy to the local store or a
:: foreign push-hook
::
/- *push-hook
/+ default-agent, resource, verb, versioning, agentio
|%
+$ card card:agent:gall
::
:: $config: configuration for the push hook
::
:: .store-name: name of the store to proxy pokes and
:: subscriptions to
:: .store-path: subscription path to receive updates on
:: .update-mark: mark that updates will be tagged with
:: .pull-hook-name: name of the corresponding pull-hook
::
+$ config
$: store-name=term
store-path=path
update=mold
update-mark=term
pull-hook-name=term
version=@ud
min-version=@ud
==
::
:: $base-state-0: state for the push hook
::
:: .sharing: resources that the push hook is proxying
:: .inner-state: state given to internal door
::
+$ base-state-0
$: sharing=(set resource)
inner-state=vase
==
::
+$ state-0 [%0 base-state-0]
::
+$ state-1 [%1 base-state-0]
::
+$ versioned-state
$% state-0
state-1
==
++ push-hook
|* =config
$_ ^|
|_ bowl:gall
::
:: +resource-for-update: get affected resources from an update
::
:: Given a vase of the update, the mark of which is
:: update-mark.config, produce the affected resources, if any.
::
++ resource-for-update
|~ vase
*(list resource)
::
:: +take-update: handle update from store
::
:: Given an update from the store, do other things after proxying
:: the update
::
++ take-update
|~ vase
*[(list card) _^|(..on-init)]
:: +transform-proxy-update: optionally transform update
::
:: If ^ is produced, then the update is forwarded to the local
:: store. If ~ is produced, the update is not forwarded and the
:: poke fails.
::
++ transform-proxy-update
|~ vase
*(unit vase)
:: +initial-watch: produce initial state for a subscription
::
:: .resource is the resource being subscribed to.
:: .path is any additional information in the subscription wire.
:: This would typically be used to encode state that the subscriber
:: already has. For example, a chat client might encode
:: the number of messages that it already has, or the date it last
:: received an update.
::
:: If +initial-watch crashes, the subscription fails.
::
++ initial-watch
|~ [path resource]
*vase
:: from agent:gall
::
++ on-init
*[(list card) _^|(..on-init)]
::
++ on-save
*vase
::
++ on-load
|~ vase
*[(list card) _^|(..on-init)]
::
++ on-poke
|~ cage
*[(list card) _^|(..on-init)]
::
++ on-watch
|~ path
*[(list card) _^|(..on-init)]
::
++ on-leave
|~ path
*[(list card) _^|(..on-init)]
::
++ on-peek
|~ path
*(unit (unit cage))
::
++ on-agent
|~ [wire sign:agent:gall]
*[(list card) _^|(..on-init)]
::
++ on-arvo
|~ [wire sign-arvo]
*[(list card) _^|(..on-init)]
::
++ on-fail
|~ [term tang]
*[(list card) _^|(..on-init)]
--
++ agent
|* =config
|= =(push-hook config)
=| state-1
=* state -
^- agent:gall
=<
|_ =bowl:gall
+* this .
og ~(. push-hook bowl)
hc ~(. +> bowl)
def ~(. (default-agent this %|) bowl)
io ~(. agentio bowl)
pass pass:io
ver ~(. versioning [bowl [update-mark version min-version]:config])
::
++ on-init
=^ cards push-hook
on-init:og
:_ this
[watch-store:hc cards]
::
++ on-load
|= =old=vase
=/ old
!<(versioned-state old-vase)
=| cards=(list card:agent:gall)
|^
?- -.old
%1
=^ og-cards push-hook
(on-load:og inner-state.old)
=/ old-subs
find-old-subs
=/ version-cards
:- (fact:io version+!>(version.config) /version ~)
?~ old-subs ~
(kick:io old-subs)^~
[:(weld cards og-cards version-cards) this(state old)]
::
::
%0
%_ $
-.old %1
::
cards
=/ paths=(list path)
kicked-watches
?~ paths cards
:_ cards
[%give %kick paths ~]
==
==
::
++ find-old-subs
%~ tap in
%+ roll
~(val by sup.bowl)
|= [[=ship =path] out=(set path)]
?. ?=([%resource *] path) out
?. ?=([%resource %ver] path)
(~(put in out) path)
=/ path-ver=@ud
(ver-from-path:hc path)
?: (supported:ver (append-version:ver path-ver)) out
(~(put in out) path)
::
++ kicked-watches
^- (list path)
%~ tap in
%+ roll
~(val by sup.bowl)
|= [[=ship =path] out=(set path)]
?~ path out
?. (lth 4 (lent path))
out
(~(put in out) path)
--
::
++ on-save
=. inner-state
on-save:og
!>(state)
::
++ on-poke
|= [=mark =vase]
^- (quip card:agent:gall agent:gall)
?: =(mark %push-hook-action)
?> (team:title our.bowl src.bowl)
=^ cards state
(poke-hook-action:hc !<(action vase))
[cards this]
::
?: (is-root:ver mark)
:_ this
(forward-update:hc mark vase)
::
=^ cards push-hook
(on-poke:og mark vase)
[cards this]
::
++ on-watch
|= =path
^- (quip card:agent:gall agent:gall)
?: ?=([%version ~] path)
:_ this
(fact-init:io version+!>(min-version.config))^~
?. ?=([%resource *] path)
=^ cards push-hook
(on-watch:og path)
[cards this]
|^
?. ?=([%ver %ship @ @ @ *] t.path)
unversioned
=/ =resource
(de-path:resource t.t.path)
=/ =mark
(append-version:ver (slav %ud i.t.t.t.t.t.path))
?. (supported:ver mark)
:_ this
(fact-init-kick:io version+!>(min-version.config))
=/ =vase
(convert-to:ver mark (initial-watch:og t.t.t.t.t.t.path resource))
:_ this
[%give %fact ~ mark vase]~
::
++ unversioned
?> ?=([%ship @ @ *] t.path)
=/ =resource
(de-path:resource t.path)
=/ =vase
%+ convert-to:ver update-mark.config
(initial-watch:og t.t.t.t.path resource)
:_ this
[%give %fact ~ update-mark.config vase]~
--
::
++ on-agent
|= [=wire =sign:agent:gall]
^- (quip card:agent:gall agent:gall)
?. ?=([%helper %push-hook @ *] wire)
=^ cards push-hook
(on-agent:og wire sign)
[cards this]
?. ?=(%store i.t.t.wire)
(on-agent:def wire sign)
?+ -.sign (on-agent:def wire sign)
%kick [~[watch-store:hc] this]
::
%fact
?. (is-root:ver p.cage.sign)
=^ cards push-hook
(on-agent:og wire sign)
[cards this]
=^ cards push-hook
(take-update:og q.cage.sign)
:_ this
%+ weld
(push-updates:hc cage.sign)
cards
==
::
++ on-leave
|= =path
=^ cards push-hook
(on-leave:og path)
[cards this]
::
++ on-arvo
|= [=wire =sign-arvo]
=^ cards push-hook
(on-arvo:og wire sign-arvo)
[cards this]
::
++ on-fail
|= [=term =tang]
=^ cards push-hook
(on-fail:og term tang)
[cards this]
::
++ on-peek
|= =path
^- (unit (unit cage))
?: =(/x/dbug/state path)
``noun+(slop !>(state(inner-state *vase)) on-save:og)
?. =(/x/sharing path)
(on-peek:og path)
``noun+!>(sharing)
--
|_ =bowl:gall
+* og ~(. push-hook bowl)
ver ~(. versioning [bowl [update-mark version min-version]:config])
io ~(. agentio bowl)
pass pass:io
::
++ poke-hook-action
|= =action
^- (quip card:agent:gall _state)
|^
?- -.action
%add (add +.action)
%remove (remove +.action)
%revoke (revoke +.action)
==
::
++ add
|= rid=resource
=. sharing
(~(put in sharing) rid)
`state
::
++ remove
|= rid=resource
=/ pax=path
[%resource (en-path:resource rid)]
=/ paths=(set path)
%- silt
%+ turn
(incoming-subscriptions pax)
|=([ship pox=path] pox)
=. sharing
(~(del in sharing) rid)
:_ state
[%give %kick ~(tap in paths) ~]~
::
++ revoke
|= [ships=(set ship) rid=resource]
=/ pax=path
[%resource (en-path:resource rid)]
:_ state
%+ murn
(incoming-subscriptions pax)
|= [her=ship =path]
^- (unit card)
?. (~(has in ships) her)
~
`[%give %kick ~[path] `her]
--
::
++ incoming-subscriptions
|= prefix=path
^- (list (pair ship path))
%+ skim
~(val by sup.bowl)
|= [him=ship pax=path]
=/ idx=(unit @)
(find prefix pax)
?~ idx %.n
=(u.idx 0)
::
++ make-wire
|= =wire
^+ wire
%+ weld
/helper/push-hook
wire
::
++ watch-store
^- card:agent:gall
=/ =wire
(make-wire /store)
[%pass wire %agent [our.bowl store-name.config] %watch store-path.config]
::
++ push-updates
|= =cage
^- (list card:agent:gall)
%+ roll (resource-for-update q.cage)
|= [rid=resource cards=(list card)]
|^
:(weld cards versioned unversioned)
::
++ versioned
^- (list card:agent:gall)
=/ prefix=path
resource+ver+(en-path:resource rid)
=/ paths=(jug @ud path)
%+ roll
(incoming-subscriptions prefix)
|= [[ship =path] out=(jug @ud path)]
=/ path-ver=@ud
(ver-from-path path)
(~(put ju out) path-ver path)
%+ turn ~(tap by paths)
|= [fact-ver=@ud paths=(set path)]
=/ =mark
(append-version:ver fact-ver)
=/ =^cage
:- mark
(convert-from:ver mark q.cage)
(fact:io cage ~(tap in paths))
:: TODO: deprecate
++ unversioned
=/ prefix=path
resource+(en-path:resource rid)
=/ unversioned=(set path)
%- ~(gas in *(set path))
(turn (incoming-subscriptions prefix) tail)
?: =(0 ~(wyt in unversioned)) ~
=/ =^cage
:- update-mark.config
(convert-from:ver update-mark.config q.cage)
(fact:io cage ~(tap in unversioned))^~
--
::
++ forward-update
|= =cage
^- (list card:agent:gall)
=- lis
=/ vas
(convert-to:ver cage)
%+ roll (resource-for-update q.cage)
|= [rid=resource [lis=(list card:agent:gall) tf-vas=(unit vase)]]
^- [(list card:agent:gall) (unit vase)]
=/ =path
resource+(en-path:resource rid)
=/ =wire (make-wire path)
=* ship entity.rid
=. tf-vas
?. =(our.bowl ship)
:: do not transform before forwarding
::
`vas
:: use cached transform
::
?^ tf-vas tf-vas
:: transform before poking store
::
(transform-proxy-update:og vas)
~| "forwarding failed during transform. mark: {<p.cage>} resource: {<rid>}"
?> ?=(^ tf-vas)
=/ =dock
:- ship
?. =(our.bowl ship)
:: forward to host
::
dap.bowl
:: poke our store
::
store-name.config
=/ cag=^cage
:- current-version:ver
u.tf-vas
:_ tf-vas
[[%pass wire %agent dock %poke cag] lis]
::
++ ver-from-path
|= =path
=/ extra=^path
(slag 5 path)
?> ?=(^ extra)
(slav %ud i.extra)
::
++ resource-for-update
|= =vase
^- (list resource)
%~ tap in
%- silt
(resource-for-update:og vase)
--
--