Merge pull request #3832 from urbit/la/thread-watcher

observe-hook: spin up a thread upon receiving a %fact from a subscription
This commit is contained in:
L 2020-11-09 12:37:13 -06:00 committed by GitHub
commit 9cbed7a4a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 276 additions and 2 deletions

View File

@ -2,7 +2,7 @@
/+ drum=hood-drum, helm=hood-helm, kiln=hood-kiln
|%
+$ state
$: %10
$: %11
drum=state:drum
helm=state:helm
kiln=state:kiln
@ -13,6 +13,7 @@
[%7 drum=state:drum helm=state:helm kiln=state:kiln]
[%8 drum=state:drum helm=state:helm kiln=state:kiln]
[%9 drum=state:drum helm=state:helm kiln=state:kiln]
[%10 drum=state:drum helm=state:helm kiln=state:kiln]
==
+$ any-state-tuple
$: drum=any-state:drum

View File

@ -0,0 +1,222 @@
:: observe-hook:
::
:: helper that observes an app at a particular path and forwards all facts
:: to a particular thread. kills the subscription if the thread crashes
::
/- sur=observe-hook
/+ default-agent, dbug
::
|%
+$ card card:agent:gall
+$ versioned-state
$% state-0
==
::
+$ serial @uv
+$ state-0 [%0 observers=(map serial observer:sur)]
++ got-by-val
|= [a=(map serial observer:sur) b=observer:sur]
^- serial
%- need
%+ roll ~(tap by a)
|= [[key=serial val=observer:sur] output=(unit serial)]
?:(=(val b) `key output)
--
::
%- agent:dbug
=| state-0
=* state -
::
^- agent:gall
|_ =bowl:gall
+* this .
def ~(. (default-agent this %|) bowl)
::
++ on-init
|^ ^- (quip card _this)
:_ this
:_ ~
(act /inv-gra [%watch %invite-store /invitatory/graph %invite-accepted-graph])
::
++ act
|= [=wire =action:sur]
^- card
:* %pass
wire
%agent
[our.bowl %observe-hook]
%poke
%observe-action
!> ^- action:sur
action
==
--
::
++ on-save !>(state)
++ on-load
|= old-vase=vase
^- (quip card _this)
`this(state !<(state-0 old-vase))
::
++ on-poke
|= [=mark =vase]
^- (quip card _this)
?> (team:title our.bowl src.bowl)
?. ?=(%observe-action mark)
(on-poke:def mark vase)
=/ =action:sur !<(action:sur vase)
=* observer observer.action
=/ vals (silt ~(val by observers))
?- -.action
%watch
?: ?|(=(app.observer %spider) =(app.observer %observe-hook))
~|('we avoid infinite loops' !!)
?: (~(has in vals) observer)
~|('duplicate observer' !!)
:_ this(observers (~(put by observers) (sham eny.bowl) observer))
:_ ~
:* %pass
/observer/(scot %uv (sham eny.bowl))
%agent
[our.bowl app.observer]
%watch
path.observer
==
::
%ignore
?. (~(has in vals) observer)
~|('cannot remove nonexistent observer' !!)
=/ key (got-by-val observers observer)
:_ this(observers (~(del by observers) key))
:_ ~
:* %pass
/observer/(scot %uv key)
%agent
[our.bowl app.observer]
%leave
~
==
==
::
++ on-agent
|= [=wire =sign:agent:gall]
^- (quip card _this)
|^
?+ wire (on-agent:def wire sign)
[%observer @ ~] on-observer
[%thread-result @ ~] on-thread-result
[%thread-start @ @ ~] on-thread-start
==
::
++ on-observer
?> ?=([%observer @ ~] wire)
?+ -.sign (on-agent:def wire sign)
%watch-ack
?~ p.sign [~ this]
=/ =serial (slav %uv i.t.wire)
~& watch-ack-deleting-observer+(~(got by observers) serial)
[~ this(observers (~(del by observers) serial))]
::
%kick
=/ =serial (slav %uv i.t.wire)
=/ =observer:sur (~(got by observers) serial)
:_ this
:_ ~
:* %pass
wire
%agent
[our.bowl app.observer]
%watch
path.observer
==
::
%fact
=/ =serial (slav %uv i.t.wire)
=/ =observer:sur (~(got by observers) serial)
=/ tid (scot %uv (sham eny.bowl))
:_ this
:~ :* %pass
[%thread-result i.t.wire ~]
%agent
[our.bowl %spider]
%watch
[%thread-result tid ~]
==
:* %pass
[%thread-start i.t.wire tid ~]
%agent
[our.bowl %spider]
%poke
%spider-start
!>([~ `tid thread.observer (slop q.cage.sign !>(~))])
== ==
==
::
++ on-thread-result
?> ?=([%thread-result @ ~] wire)
?+ -.sign (on-agent:def wire sign)
%kick [~ this]
%watch-ack [~ this]
::
%fact
?. =(p.cage.sign %thread-fail)
:_ this
:_ ~
:* %pass
wire
%agent
[our.bowl %spider]
%leave
~
==
=/ =serial (slav %uv i.t.wire)
=/ =observer:sur (~(got by observers) serial)
~& observer-failed+observer
:_ this(observers (~(del by observers) serial))
:~ :* %pass
[%observer i.t.wire ~]
%agent
[our.bowl app.observer]
%leave
~
==
:* %pass
wire
%agent
[our.bowl %spider]
%leave
~
==
==
==
::
++ on-thread-start
?> ?=([%thread-start @ @ ~] wire)
?. ?=(%poke-ack -.sign) (on-agent:def wire sign)
?~ p.sign [~ this]
=/ =serial (slav %uv i.t.wire)
=/ =observer:sur (~(got by observers) serial)
~& added-invalid-observer+observer
:_ this(observers (~(del by observers) serial))
:~ :* %pass
[%observer i.t.wire ~]
%agent
[our.bowl app.observer]
%leave
~
==
:* %pass
wire
%agent
[our.bowl app.observer]
%leave
~
== ==
--
::
++ on-watch on-watch:def
++ on-leave on-leave:def
++ on-peek on-peek:def
++ on-arvo on-arvo:def
++ on-fail on-fail:def
--

View File

@ -107,6 +107,7 @@
%graph-store
%graph-pull-hook
%graph-push-hook
%observe-hook
==
::
++ deft-fish :: default connects
@ -209,7 +210,7 @@
==
::
++ on-load
|= [hood-version=?(%1 %2 %3 %4 %5 %6 %7 %8 %9 %10) old=any-state]
|= [hood-version=?(%1 %2 %3 %4 %5 %6 %7 %8 %9 %10 %11) old=any-state]
=< se-abet =< se-view
=. sat old
=. dev (~(gut by bin) ost *source)
@ -241,6 +242,8 @@
=? ..on-load (lte hood-version %10)
=> (se-born | %home %graph-push-hook)
(se-born | %home %graph-pull-hook)
=? ..on-load (lte hood-version %11)
(se-born | %home %observe-hook)
..on-load
::
++ reap-phat :: ack connect

View File

@ -0,0 +1,13 @@
/- sur=observe-hook
|_ =action:sur
++ grad %noun
++ grow
|%
++ noun action
--
::
++ grab
|%
++ noun action:sur
--
--

View File

@ -0,0 +1,7 @@
|%
+$ observer [app=term =path thread=term]
+$ action
$% [%watch =observer]
[%ignore =observer]
==
--

View File

@ -0,0 +1,28 @@
/- spider, inv=invite-store, graph-view
/+ strandio
::
=* strand strand:spider
=* fail strand-fail:strand
=* poke-our poke-our:strandio
=* flog-text flog-text:strandio
::
^- thread:spider
|= arg=vase
=/ m (strand ,vase)
^- form:m
=+ !<([=update:inv ~] arg)
?. ?=(%accepted -.update)
(pure:m !>(~))
;< =bowl:spider bind:m get-bowl:strandio
=* invite invite.update
?: =(our.bowl entity.resource.invite)
:: do not crash because that will kill the invitatory subscription
(pure:m !>(~))
;< ~ bind:m
%+ poke-our %spider
=- spider-start+!>([`tid.bowl ~ %graph-join -])
%+ slop
!> ^- action:graph-view
[%join resource.invite ship.invite]
!>(~)
(pure:m !>(~))