:: lib/pull-hook: helper for creating a push hook :: :: lib/pull-hook is a helper for automatically pulling data from a :: corresponding push-hook to a store. :: :: ## Interfacing notes: :: :: The inner door may interact with the library by producing cards. :: Do not pass any cards on a wire beginning with /helper as these :: wires are reserved by this library. Any watches/pokes/peeks not :: listed below will be routed to the inner door. :: :: ## Subscription paths :: :: /tracking: The set of resources we are pulling :: :: ## Pokes :: :: %pull-hook-action: Add/remove a resource from pulling. :: /- *pull-hook /+ default-agent, resource, versioning, agentio, pull-hook-virt |% :: JSON conversions ++ dejs =, dejs:format |% ++ action %- of :~ add+add == ++ add %- ot :~ ship+(su ;~(pfix sig fed:ag)) resource+dejs:resource == -- -- :: :: |% +$ card card:agent:gall :: :: $config: configuration for the pull hook :: :: .store-name: name of the store to send subscription updates to. :: .update-mark: mark that updates will be tagged with, without :: version number :: .push-hook-name: name of the corresponding push-hook :: .no-validate: If true, don't validate that resource/wire/src match :: up :: +$ config $: store-name=term update=mold update-mark=term push-hook-name=term version=@ud min-version=@ud no-validate=_| == :: :: $base-state-0: state for the pull hook :: :: .tracking: a map of resources we are pulling, and the ships that :: we are pulling them from. :: .inner-state: state given to internal door :: +$ base-state-0 $: tracking=(map resource ship) inner-state=vase == :: +$ base-state-1 $: base-state-0 failed-kicks=(map resource ship) == :: +$ track [=ship =status] :: +$ status $% [%active ~] [%failed-kick ~] [%pub-ver ver=@ud] [%sub-ver ver=@ud] == :: +$ base-state-2 $: tracking=(map resource track) inner-state=vase == :: +$ base-state-3 $: prev-version=@ud prev-min-version=@ud base-state-2 == :: +$ state-0 [%0 base-state-0] :: +$ state-1 [%1 base-state-0] :: +$ state-2 [%2 base-state-1] :: +$ state-3 [%3 base-state-2] :: +$ state-4 [%4 base-state-3] :: +$ versioned-state $% state-0 state-1 state-2 state-3 state-4 == :: +diplomatic: only renegotiate if versions changed :: :: If %.n please leave note as to why renegotiation necessary :: :: ++ diplomatic ^- ? %.y :: ++ default |* [pull-hook=* =config] |_ =bowl:gall :: ++ on-pull-nack |= [=resource =tang] =/ =tank leaf+"subscribe failed from {} for {}" %- (slog tank tang) [~ pull-hook] :: ++ on-pull-kick |= =resource *(unit path) -- :: ++ pull-hook |* config $_ ^| |_ bowl:gall :: +on-pull-nack: handle failed pull subscription :: :: This arm is called when a pull subscription fails. lib/pull-hook :: will automatically delete the resource from .tracking by the :: time this arm is called. :: ++ on-pull-nack |~ [resource tang] *[(list card) _^|(..on-init)] :: +on-pull-kick: produce any additional resubscribe path :: :: If non-null, the produced path is appended to the original :: subscription path. This should be used to encode extra :: information onto the path in order to reduce the payload of a :: kick and resubscribe. :: :: If null, a resubscribe is not attempted :: ++ on-pull-kick |~ resource *(unit path) :: +resource-for-update: get resources from vase :: :: This should be identical to the +resource-for-update arm in the :: corresponding push-hook :: ++ resource-for-update |~ vase *(list resource) :: :: from agent:gall ++ on-init *[(list card) _^|(..on-init)] :: ++ on-save *vase :: ++ on-load |~ vase *[(list card) _^|(..on-init)] :: ++ on-poke |~ [mark vase] *[(list card) _^|(..on-init)] :: ++ on-watch |~ path *[(list card) _^|(..on-init)] :: ++ on-leave |~ path *[(list card) _^|(..on-init)] :: ++ on-peek |~ path *(unit (unit cage)) :: ++ on-agent |~ [wire sign:agent:gall] *[(list card) _^|(..on-init)] :: ++ on-arvo |~ [wire sign-arvo] *[(list card) _^|(..on-init)] :: ++ on-fail |~ [term tang] *[(list card) _^|(..on-init)] -- ++ agent |* =config |= =(pull-hook config) =| state-4 =* state - ^- agent:gall =< |_ =bowl:gall +* this . og ~(. pull-hook bowl) hc ~(. +> bowl) def ~(. (default-agent this %|) bowl) ver ~(. versioning [bowl [update-mark version min-version]:config]) io ~(. agentio bowl) pass pass:io :: ++ on-init ^- [(list card:agent:gall) agent:gall] =^ cards pull-hook on-init:og [cards this] :: ++ on-load |= =old=vase =/ old !<(versioned-state old-vase) =| cards=(list card:agent:gall) |^ ?- -.old %4 =^ og-cards pull-hook (on-load:og inner-state.old) =. state old =/ kick=(list card) ?: ?& =(min-version.config prev-min-version.old) =(version.config prev-version.old) diplomatic == ~ (poke-self:pass kick+!>(%.n))^~ :_ this :(weld cards og-cards kick) :: %3 $(old [%4 0 0 +.old]) %2 $(old (state-to-3 old)) %1 $(old [%2 +.old ~]) %0 !! :: pre-breach == :: ++ state-to-3 |= old=state-2 %* . *state-3 tracking (tracking-to-3 tracking.old) inner-state inner-state.old == :: ++ tracking-to-3 |= trk=(map resource ship) %- ~(gas by *(map resource track)) %+ turn ~(tap by trk) |= [=resource =ship] :- resource [ship %active ~] :: -- :: ++ on-save ^- vase =: inner-state on-save:og prev-min-version min-version.config prev-version version.config == !>(state) :: ++ on-poke |= [=mark =vase] ^- [(list card:agent:gall) agent:gall] ?+ mark =^ cards pull-hook (on-poke:og mark vase) [cards this] :: %kick ?> (team:title [our src]:bowl) =+ !<(nice=? vase) =^ [cards=(list card:agent:gall) hook=_pull-hook] state (restart-subs:hc nice) =. pull-hook hook [cards this] :: %sane ?> (team:title [our src]:bowl) =^ cards state poke-sane:hc [cards this] :: %pull-hook-action ?> (team:title [our src]:bowl) =^ [cards=(list card) hook=_pull-hook] state tr-abet:(tr-hook-act:track-engine:hc !<(action vase)) =. pull-hook hook [cards this] == :: ++ on-watch |= =path ^- [(list card:agent:gall) agent:gall] ?> (team:title our.bowl src.bowl) ?+ path :: forward by default =^ cards pull-hook (on-watch:og path) [cards this] :: [%nack ~] `this :: [%tracking ~] :_ this ~[give-update] == :: ++ on-agent |= [=wire =sign:agent:gall] ^- [(list card:agent:gall) agent:gall] ?. ?=([%helper %pull-hook @ *] wire) =^ cards pull-hook (on-agent:og wire sign) [cards this] ?: ?=([%version ~] t.t.wire) =^ [cards=(list card) hook=_pull-hook] state (take-version:hc src.bowl sign) =. pull-hook hook [cards this] ?. ?=([%pull ?(%unver-resource %resource) *] t.t.wire) (on-agent:def wire sign) =/ rid=resource (de-path:resource t.t.t.t.wire) =/ versioned=? ?=(%resource i.t.t.t.wire) =^ [cards=(list card) hook=_pull-hook] state tr-abet:(tr-sign:(tr-abed:track-engine:hc rid) sign versioned) =. pull-hook hook [cards this] :: ++ on-leave |= =path ^- [(list card:agent:gall) agent:gall] =^ cards pull-hook (on-leave:og path) [cards this] :: ++ on-arvo |= [=wire =sign-arvo] ^- [(list card:agent:gall) agent:gall] =^ cards pull-hook (on-arvo:og wire sign-arvo) [cards this] :: ++ on-fail |= [=term =tang] ^- [(list card:agent:gall) agent:gall] =^ cards pull-hook (on-fail:og term tang) [cards this] :: ++ on-peek |= =path ^- (unit (unit cage)) ?: =(/x/dbug/state path) ``noun+(slop !>(state(inner-state *vase)) on-save:og) ?. =(/x/tracking path) (on-peek:og path) ``noun+!>(~(key by tracking)) -- |_ =bowl:gall +* og ~(. pull-hook bowl) io ~(. agentio bowl) pass pass:io virt ~(. pull-hook-virt bowl) ver ~(. versioning [bowl [update-mark version min-version]:config]) :: ++ restart-subs |= nice=? =| acc-cards=(list card) =/ subs=(list resource) ~(tap in ~(key by tracking)) |- ^- [[(list card) _pull-hook] _state] ?~ subs [[acc-cards pull-hook] state] =* rid i.subs =^ [crds=(list card) hook=_pull-hook] state tr-abet:(tr-on-load:(tr-abed:track-engine rid) nice) =. pull-hook hook $(subs t.subs, acc-cards (weld acc-cards crds)) :: ++ track-engine |_ [cards=(list card) rid=resource =ship =status gone=_|] :: +| %init: state machine setup and manipulation :: ++ tr-core . ++ tr-abed |= r=resource =/ [s=^ship sta=^status] (~(got by tracking) r) tr-core(rid r, ship s, status sta) :: ++ tr-abet ^- [[(list card) _pull-hook] _state] =. tracking ?: gone (~(del by tracking) rid) (~(put by tracking) rid [ship status]) [[(flop cards) pull-hook] state] :: ++ tr-emit |= =card tr-core(cards [card cards]) :: ++ tr-emis |= caz=(list card) tr-core(cards (welp (flop cards) cards)) :: ++ tr-ap-og |= ap=_^?(|.(*(quip card _pull-hook))) =^ caz pull-hook (ap) (tr-emis caz) :: ++ tr-sane ^- ? ?+ -.status %.y %active (~(has by wex.bowl) [tr-sub-wire tr-sub-dock]) == :: +| %sign: sign handling :: :: ++ tr-sign |= [=sign:agent:gall versioned=?] |^ ?+ -.sign !! %kick tr-kick %watch-ack (tr-wack +.sign) %fact (tr-fact +.sign) == :: ++ tr-wack |= tan=(unit tang) ?~ tan tr-core ?. versioned %- tr-ap-og:tr-cleanup:tr-give-nack |.((on-pull-nack:og rid u.tan)) %- (slog leaf+"versioned nack for {} in {}" u.tan) =/ pax (kick-mule:virt rid |.((on-pull-kick:og rid))) ?~ pax tr-failed-kick ?~ u.pax tr-cleanup (tr-watch-unver u.u.pax) :: ++ tr-fact |= =cage ?: ?=(%version p.cage) =/ req-ver=@ud !<(@ud q.cage) ?: (lth req-ver min-version.config) (tr-suspend-pub-ver min-version.config) (tr-suspend-sub-ver req-ver) ?> (is-root:ver p.cage) =/ fact-ver=@ud (read-version:ver p.cage) ?. (gte fact-ver min-version.config) ?. versioned :: don't process unversioned, unsupported facts :: just wait for publisher to upgrade and kick the :: subscription tr-core (tr-suspend-pub-ver min-version.config) =/ =^cage (convert-to:ver cage) =/ =wire (make-wire /store) =+ resources=(~(gas in *(set resource)) (resource-for-update:og q.cage)) ?> ?| no-validate.config ?& (check-src resources) (~(has in resources) rid) == == =/ =mark (append-version:ver version.config) (tr-emit (~(poke-our pass wire) store-name.config cage)) -- :: ++ tr-kick ?. ?=(%active -.status) tr-core =/ pax (kick-mule:virt rid |.((on-pull-kick:og rid))) ?~ pax tr-failed-kick ?~ u.pax tr-cleanup (tr-watch u.u.pax) :: +| %lifecycle: lifecycle management for tracked resource :: :: ++ tr-add |= [s=^ship r=resource] ?< =(s our.bowl) =: ship s rid r status [%active ~] == (tr-watch /) :: ++ tr-remove tr-leave:tr-cleanup :: ++ tr-hook-act |= =action ^+ tr-core ?- -.action %add (tr-add +.action) :: %remove ?. (~(has by tracking) resource.action) tr-core tr-remove:(tr-abed resource.action) == :: ++ tr-cleanup =. gone %.y (tr-emit give-update) :: ++ tr-failed-kick tr-core(status [%failed-kick ~]) :: ++ tr-suspend-pub-ver |= ver=@ud =. status [%pub-ver ver] tr-leave:tr-watch-ver :: :: ++ tr-suspend-sub-ver |= ver=@ud tr-core(status [%sub-ver ver]) :: ++ tr-on-load |= nice=? ?+ -.status tr-core %failed-kick tr-restart %active ?:(&(tr-sane nice) tr-core tr-rewatch) :: %sub-ver ?. (supported:ver (append-version:ver ver.status)) tr-core tr-restart == :: ++ tr-restart =. status [%active ~] tr-kick :: ++ tr-rewatch tr-kick:tr-leave :: :: :: +| %subscription: subscription cards :: :: ++ tr-give-nack (tr-emit (fact:io resource+!>(rid) /nack ~)) :: ++ tr-ver-wire (make-wire /version) :: ++ tr-watch-ver (tr-emit (watch-version ship)) :: ++ tr-leave-ver (tr-emit (~(leave pass tr-ver-wire) tr-sub-dock)) ++ tr-sub-wire (make-wire pull+resource+(en-path:resource rid)) ++ tr-unver-sub-wire (make-wire pull+unver-resource+(en-path:resource rid)) :: ++ tr-sub-dock ^- dock [ship push-hook-name.config] :: ++ tr-watch-unver |= pax=path =/ =path :- %resource (weld (en-path:resource rid) pax) (tr-emit (~(watch pass tr-unver-sub-wire) tr-sub-dock path)) :: ++ tr-watch |= pax=path ^+ tr-core =/ =path :+ %resource %ver %+ weld (snoc (en-path:resource rid) (scot %ud version.config)) pax (tr-emit (~(watch pass tr-sub-wire) tr-sub-dock path)) :: ++ tr-leave (tr-emit (~(leave pass tr-sub-wire) tr-sub-dock)) -- :: ++ take-version |= [who=ship =sign:agent:gall] ^- [[(list card) _pull-hook] _state] ?+ -.sign !! %watch-ack ?~ p.sign [~^pull-hook state] =/ =tank leaf+"subscribe failed from {} on wire {}" %- (slog tank u.p.sign) [~^pull-hook state] :: %kick :_ state [(watch-version who)^~ pull-hook] :: %fact ?. =(%version p.cage.sign) [~^pull-hook state] =+ !<(version=@ud q.cage.sign) =/ tracks=(list [rid=resource =track]) ~(tap by tracking) =| cards=(list card) =| leave=_& |- ?~ tracks =? cards leave :_(cards (leave-version who)) [[cards pull-hook] state] ?. ?=(%pub-ver -.status.track.i.tracks) $(tracks t.tracks) ?. =(who ship.track.i.tracks) $(tracks t.tracks) ?. =(ver.status.track.i.tracks version) =. leave %.n $(tracks t.tracks) =^ [caz=(list card) hook=_pull-hook] state tr-abet:tr-restart:(tr-abed:track-engine rid.i.tracks) =. pull-hook hook $(tracks t.tracks, cards (weld cards caz)) == :: ++ version-wir (make-wire /version) :: ++ version-dock |= =ship ^- dock [ship push-hook-name.config] :: ++ watch-version |= =ship (~(watch pass version-wir) [ship push-hook-name.config] /version) :: ++ leave-version |= =ship (~(leave pass version-wir) [ship push-hook-name.config]) :: ++ poke-sane ^- (quip card:agent:gall _state) =/ cards :: TODO revive ~ :: restart-subscriptions ~? > ?=(^ cards) "Fixed subscriptions in {}" [cards state] :: ++ check-subscription |= [rid=resource =ship] ^- ? %+ lien ~(tap in ~(key by wex.bowl)) |= [=wire her=^ship app=term] ^- ? ?& =(app push-hook-name.config) =(ship her) =((scag 4 wire) /helper/pull-hook/pull/resource) =(`rid (de-path-soft:resource (slag 4 wire))) == :: :: ++ make-wire |= =wire ^+ wire %+ weld /helper/pull-hook wire :: ++ give-update ^- card [%give %fact ~[/tracking] %pull-hook-update !>(tracking)] :: ++ check-src |= resources=(set resource) ^- ? %+ roll ~(tap in resources) |= [rid=resource out=_|] ?: out %.y ?~ status=(~(get by tracking) rid) %.n =(src.bowl ship.u.status) -- --