mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-11-10 10:05:09 +03:00
da4de1579d
If there were pending-logs in an existing watchdog that was not fully restarted, and the number of the starting block is newer than the first in pending, when starting a new thread, those logs will be carried over to the new thread, which will then be re-downloaded and will fail to be verified in /lib/naive
613 lines
15 KiB
Plaintext
613 lines
15 KiB
Plaintext
:: eth-watcher: ethereum event log collector
|
|
::
|
|
/- *eth-watcher, spider
|
|
/+ ethereum, default-agent, verb, dbug
|
|
=, ethereum-types
|
|
=, jael
|
|
::
|
|
=> |%
|
|
+$ card card:agent:gall
|
|
+$ app-state
|
|
$: %6
|
|
dogs=(map path watchdog)
|
|
==
|
|
::
|
|
+$ context [=path dog=watchdog]
|
|
+$ watchdog
|
|
$: config
|
|
running=(unit [since=@da =tid:spider])
|
|
=number:block
|
|
=pending-logs
|
|
=history
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
:: history: newest block first, oldest event first
|
|
+$ history (list loglist)
|
|
--
|
|
::
|
|
:: Helpers
|
|
::
|
|
=> |%
|
|
++ wait
|
|
|= [=path now=@da time=@dr]
|
|
^- card
|
|
[%pass [%timer path] %arvo %b %wait (add now time)]
|
|
::
|
|
++ wait-shortcut
|
|
|= [=path now=@da]
|
|
^- card
|
|
[%pass [%timer path] %arvo %b %wait now]
|
|
::
|
|
++ poke-spider
|
|
|= [=path our=@p =cage]
|
|
^- card
|
|
[%pass [%running path] %agent [our %spider] %poke cage]
|
|
::
|
|
++ watch-spider
|
|
|= [=path our=@p =sub=path]
|
|
^- card
|
|
[%pass [%running path] %agent [our %spider] %watch sub-path]
|
|
::
|
|
++ leave-spider
|
|
|= [=path our=@p]
|
|
^- card
|
|
[%pass [%running path] %agent [our %spider] %leave ~]
|
|
--
|
|
::
|
|
:: Main
|
|
::
|
|
%- agent:dbug
|
|
^- agent:gall
|
|
=| state=app-state
|
|
%+ verb |
|
|
|_ =bowl:gall
|
|
+* this .
|
|
def ~(. (default-agent this %|) bowl)
|
|
bec byk.bowl(r da+now.bowl)
|
|
::
|
|
++ on-init
|
|
^- (quip card _this)
|
|
[~ this]
|
|
::
|
|
++ on-save !>(state)
|
|
++ on-load
|
|
|= old=vase
|
|
|^
|
|
=+ !<(old-state=app-states old)
|
|
=? old-state ?=(%0 -.old-state)
|
|
%- (slog leaf+"upgrading eth-watcher from %0" ~)
|
|
^- app-state-1
|
|
%= old-state
|
|
- %1
|
|
dogs
|
|
%- ~(run by dogs.old-state)
|
|
|= dog=watchdog-0
|
|
%= dog
|
|
-> [~m5 ->.dog]
|
|
==
|
|
==
|
|
::
|
|
=^ cards-1=(list card) old-state
|
|
?. ?=(%1 -.old-state)
|
|
`old-state
|
|
%- (slog leaf+"upgrading eth-watcher from %1" ~)
|
|
:_ old-state(- %2)
|
|
%+ turn ~(tap by dogs.old-state)
|
|
|= [=path dog=watchdog-1]
|
|
(wait-shortcut path now.bowl)
|
|
::
|
|
=? old-state ?=(%2 -.old-state)
|
|
%- (slog leaf+"upgrading eth-watcher from %2" ~)
|
|
^- app-state-3
|
|
%= old-state
|
|
- %3
|
|
dogs
|
|
%- ~(run by dogs.old-state)
|
|
|= dog=watchdog-1
|
|
%= dog
|
|
-> [| ->.dog]
|
|
==
|
|
==
|
|
::
|
|
=? old-state ?=(%3 -.old-state)
|
|
%- (slog leaf+"upgrading eth-watcher from %3" ~)
|
|
^- app-state-4
|
|
%= old-state
|
|
- %4
|
|
dogs
|
|
%- ~(run by dogs.old-state)
|
|
|= dog=watchdog-3
|
|
^- watchdog-4
|
|
%= dog
|
|
-
|
|
^- config-4
|
|
=, -.dog
|
|
[url eager refresh-rate (mul refresh-rate 6) from contracts topics]
|
|
::
|
|
running
|
|
?~ running.dog ~
|
|
`[now.bowl u.running.dog]
|
|
==
|
|
==
|
|
::
|
|
=? old-state ?=(%4 -.old-state)
|
|
%- (slog leaf+"upgrading eth-watcher from %4" ~)
|
|
^- app-state-5
|
|
%= old-state
|
|
- %5
|
|
dogs
|
|
%- ~(run by dogs.old-state)
|
|
|= dog=watchdog-4
|
|
^- watchdog-5
|
|
%= dog
|
|
-
|
|
^- config-5
|
|
=, -.dog
|
|
[url eager refresh-rate timeout-time from contracts ~ topics]
|
|
::
|
|
pending-logs-4
|
|
%- ~(run by pending-logs-4.dog)
|
|
|= =loglist-4
|
|
%+ turn loglist-4
|
|
|= =event-log-4
|
|
event-log-4(mined ?~(mined.event-log-4 ~ `mined.event-log-4))
|
|
::
|
|
history-4
|
|
%+ turn history-4.dog
|
|
|= =loglist-4
|
|
%+ turn loglist-4
|
|
|= =event-log-4
|
|
event-log-4(mined ?~(mined.event-log-4 ~ `mined.event-log-4))
|
|
==
|
|
==
|
|
::
|
|
=? old-state ?=(%5 -.old-state)
|
|
%- (slog leaf+"upgrading eth-watcher from %5" ~)
|
|
^- app-state
|
|
%= old-state
|
|
- %6
|
|
dogs
|
|
%- ~(run by dogs.old-state)
|
|
|= dog=watchdog-5
|
|
^- watchdog
|
|
%= dog
|
|
-
|
|
^- config
|
|
=, -.dog
|
|
[url eager refresh-rate refresh-rate from ~ contracts batchers topics]
|
|
::
|
|
running
|
|
?~ running.dog ~
|
|
`[now.bowl tid.u.running.dog]
|
|
==
|
|
==
|
|
::
|
|
[cards-1 this(state ?>(?=(%6 -.old-state) old-state))]
|
|
::
|
|
+$ app-states
|
|
$%(app-state-0 app-state-1 app-state-2 app-state-3 app-state-4 app-state-5 app-state)
|
|
::
|
|
+$ app-state-5
|
|
$: %5
|
|
dogs=(map path watchdog-5)
|
|
==
|
|
::
|
|
+$ watchdog-5
|
|
$: config-5
|
|
running=(unit [since=@da =tid:spider])
|
|
=number:block
|
|
=pending-logs
|
|
=history
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
+$ config-5
|
|
$: url=@ta
|
|
eager=?
|
|
refresh-rate=@dr
|
|
timeout-time=@dr
|
|
from=number:block
|
|
contracts=(list address:ethereum)
|
|
batchers=(list address:ethereum)
|
|
=topics
|
|
==
|
|
::
|
|
+$ app-state-4
|
|
$: %4
|
|
dogs=(map path watchdog-4)
|
|
==
|
|
::
|
|
+$ watchdog-4
|
|
$: config-4
|
|
running=(unit [since=@da =tid:spider])
|
|
=number:block
|
|
=pending-logs-4
|
|
=history-4
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
+$ config-4
|
|
$: url=@ta
|
|
eager=?
|
|
refresh-rate=@dr
|
|
timeout-time=@dr
|
|
from=number:block
|
|
contracts=(list address:ethereum)
|
|
=topics
|
|
==
|
|
+$ pending-logs-4 (map number:block loglist-4)
|
|
+$ history-4 (list loglist-4)
|
|
+$ loglist-4 (list event-log-4)
|
|
+$ event-log-4
|
|
$: $= mined %- unit
|
|
$: log-index=@ud
|
|
transaction-index=@ud
|
|
transaction-hash=@ux
|
|
block-number=@ud
|
|
block-hash=@ux
|
|
removed=?
|
|
==
|
|
::
|
|
address=@ux
|
|
data=@t
|
|
topics=(lest @ux)
|
|
==
|
|
::
|
|
+$ app-state-3
|
|
$: %3
|
|
dogs=(map path watchdog-3)
|
|
==
|
|
::
|
|
+$ watchdog-3
|
|
$: config-3
|
|
running=(unit =tid:spider)
|
|
=number:block
|
|
=pending-logs-4
|
|
=history-4
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
+$ config-3
|
|
$: url=@ta
|
|
eager=?
|
|
refresh-rate=@dr
|
|
from=number:block
|
|
contracts=(list address:ethereum)
|
|
=topics
|
|
==
|
|
::
|
|
+$ app-state-2
|
|
$: %2
|
|
dogs=(map path watchdog-1)
|
|
==
|
|
::
|
|
+$ app-state-1
|
|
$: %1
|
|
dogs=(map path watchdog-1)
|
|
==
|
|
::
|
|
+$ watchdog-1
|
|
$: config-1
|
|
running=(unit =tid:spider)
|
|
=number:block
|
|
=pending-logs-4
|
|
=history-4
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
+$ config-1
|
|
$: url=@ta
|
|
refresh-rate=@dr
|
|
from=number:block
|
|
contracts=(list address:ethereum)
|
|
=topics
|
|
==
|
|
::
|
|
+$ app-state-0
|
|
$: %0
|
|
dogs=(map path watchdog-0)
|
|
==
|
|
::
|
|
+$ watchdog-0
|
|
$: config-0
|
|
running=(unit =tid:spider)
|
|
=number:block
|
|
=pending-logs-4
|
|
=history-4
|
|
blocks=(list block)
|
|
==
|
|
::
|
|
+$ config-0
|
|
$: url=@ta
|
|
from=number:block
|
|
contracts=(list address:ethereum)
|
|
=topics
|
|
==
|
|
--
|
|
::
|
|
++ on-poke
|
|
|= [=mark =vase]
|
|
?: ?=(%noun mark)
|
|
~& state
|
|
`this
|
|
?. ?=(%eth-watcher-poke mark)
|
|
(on-poke:def mark vase)
|
|
::
|
|
=+ !<(=poke vase)
|
|
?- -.poke
|
|
%watch
|
|
:: fully restart the watchdog if it doesn't exist yet,
|
|
:: or if result-altering parts of the config changed.
|
|
=/ restart=?
|
|
?| !(~(has by dogs.state) path.poke)
|
|
?! .= ->+>+:(~(got by dogs.state) path.poke)
|
|
+>+>.config.poke
|
|
==
|
|
::
|
|
=/ already (~(has by dogs.state) path.poke)
|
|
~? &(already restart)
|
|
[dap.bowl 'overwriting existing watchdog on' path.poke]
|
|
=/ wait-cards=(list card)
|
|
?: already
|
|
~
|
|
[(wait-shortcut path.poke now.bowl) ~]
|
|
::
|
|
=/ restart-cards=(list card)
|
|
=/ dog (~(get by dogs.state) path.poke)
|
|
?. ?& restart
|
|
?=(^ dog)
|
|
?=(^ running.u.dog)
|
|
==
|
|
~
|
|
=/ =cage [%spider-stop !>([tid.u.running.u.dog &])]
|
|
:_ ~
|
|
`card`[%pass [%starting path.poke] %agent [our.bowl %spider] %poke cage]
|
|
=/ new-dog
|
|
=/ dog=watchdog
|
|
?: restart *watchdog
|
|
(~(got by dogs.state) path.poke)
|
|
=+ pending=(sort ~(tap in ~(key by pending-logs.dog)) lth)
|
|
=? pending-logs.dog
|
|
?: restart |
|
|
?~ pending |
|
|
(gte i.pending from.config.poke)
|
|
?> ?=(^ pending)
|
|
:: if there are pending logs newer than what we poke with,
|
|
:: we need to clear those too avoid processing duplicates
|
|
::
|
|
~& %dropping-unreleased-logs^[from+i.pending n+(lent pending)]
|
|
~
|
|
%_ dog
|
|
- config.poke
|
|
number from.config.poke
|
|
==
|
|
=. dogs.state (~(put by dogs.state) path.poke new-dog)
|
|
[(weld wait-cards restart-cards) this]
|
|
::
|
|
%clear
|
|
=. dogs.state (~(del by dogs.state) path.poke)
|
|
[~ this]
|
|
==
|
|
::
|
|
:: +on-watch: subscribe & get initial subscription data
|
|
::
|
|
:: /logs/some-path:
|
|
::
|
|
++ on-watch
|
|
|= =path
|
|
^- (quip card agent:gall)
|
|
?. ?=([%logs ^] path)
|
|
~| [%invalid-subscription-path path]
|
|
!!
|
|
:_ this :_ ~
|
|
:* %give %fact ~ %eth-watcher-diff !>
|
|
:- %history
|
|
^- loglist
|
|
%- zing
|
|
%- flop
|
|
=< history
|
|
(~(gut by dogs.state) t.path *watchdog)
|
|
==
|
|
::
|
|
++ on-leave on-leave:def
|
|
::
|
|
:: +on-peek: get diagnostics data
|
|
::
|
|
:: /block/some-path: get next block number to check for /some-path
|
|
::
|
|
++ on-peek
|
|
|= =path
|
|
^- (unit (unit cage))
|
|
?+ path ~
|
|
[%x %block ^]
|
|
?. (~(has by dogs.state) t.t.path) ~
|
|
:+ ~ ~
|
|
:- %atom
|
|
!>(number:(~(got by dogs.state) t.t.path))
|
|
::
|
|
[%x %dogs ~]
|
|
``noun+!>(~(key by dogs.state))
|
|
::
|
|
[%x %dogs %configs ~]
|
|
``noun+!>((~(run by dogs.state) |=(=watchdog -.watchdog)))
|
|
==
|
|
::
|
|
++ on-agent
|
|
|= [=wire =sign:agent:gall]
|
|
|^
|
|
^- (quip card agent:gall)
|
|
?. ?=([%running *] wire)
|
|
(on-agent:def wire sign)
|
|
?- -.sign
|
|
%poke-ack
|
|
?~ p.sign
|
|
[~ this]
|
|
%- (slog leaf+"eth-watcher couldn't start thread" u.p.sign)
|
|
:_ (clear-running t.wire) :_ ~
|
|
(leave-spider t.wire our.bowl)
|
|
::
|
|
%watch-ack
|
|
?~ p.sign
|
|
[~ this]
|
|
%- (slog leaf+"eth-watcher couldn't start listening to thread" u.p.sign)
|
|
:: TODO: kill thread that may have started, although it may not
|
|
:: have started yet since we get this response before the
|
|
:: %start-spider poke is processed
|
|
::
|
|
[~ (clear-running t.wire)]
|
|
::
|
|
%kick [~ (clear-running t.wire)]
|
|
%fact
|
|
=* path t.wire
|
|
=/ dog (~(get by dogs.state) path)
|
|
?~ dog
|
|
[~ this]
|
|
?+ p.cage.sign (on-agent:def wire sign)
|
|
%thread-fail
|
|
=+ !<([=term =tang] q.cage.sign)
|
|
%- (slog leaf+"eth-watcher failed; will retry" leaf+<term> tang)
|
|
[~ this(dogs.state (~(put by dogs.state) path u.dog(running ~)))]
|
|
::
|
|
%thread-done
|
|
=+ !<([vows=disavows pup=watchpup] q.cage.sign)
|
|
=. u.dog
|
|
%_ u.dog
|
|
- -.pup
|
|
number number.pup
|
|
blocks blocks.pup
|
|
pending-logs pending-logs.pup
|
|
==
|
|
=^ cards-1 u.dog (disavow path u.dog vows)
|
|
=^ cards-2 u.dog (release-logs path u.dog)
|
|
=. dogs.state (~(put by dogs.state) path u.dog(running ~))
|
|
[(weld cards-1 cards-2) this]
|
|
==
|
|
==
|
|
::
|
|
++ clear-running
|
|
|= =path
|
|
=/ dog (~(get by dogs.state) path)
|
|
?~ dog
|
|
this
|
|
this(dogs.state (~(put by dogs.state) path u.dog(running ~)))
|
|
::
|
|
++ disavow
|
|
|= [=path dog=watchdog vows=disavows]
|
|
^- (quip card watchdog)
|
|
=/ history-ids=(list [id:block loglist])
|
|
%+ murn history.dog
|
|
|= logs=loglist
|
|
^- (unit [id:block loglist])
|
|
?~ logs
|
|
~
|
|
`[[block-hash block-number]:(need mined.i.logs) logs]
|
|
=/ actual-vows=disavows
|
|
%+ skim vows
|
|
|= =id:block
|
|
(lien history-ids |=([=history=id:block *] =(id history-id)))
|
|
=/ actual-history=history
|
|
%+ murn history-ids
|
|
|= [=id:block logs=loglist]
|
|
^- (unit loglist)
|
|
?: (lien actual-vows |=(=vow=id:block =(id vow-id)))
|
|
~
|
|
`logs
|
|
:_ dog(history actual-history)
|
|
%+ turn actual-vows
|
|
|= =id:block
|
|
[%give %fact [%logs path]~ %eth-watcher-diff !>([%disavow id])]
|
|
::
|
|
++ release-logs
|
|
|= [=path dog=watchdog]
|
|
^- (quip card watchdog)
|
|
?: (lth number.dog 30)
|
|
`dog
|
|
=/ numbers=(list number:block) ~(tap in ~(key by pending-logs.dog))
|
|
=. numbers (sort numbers lth)
|
|
=^ logs=(list event-log:rpc:ethereum) dog
|
|
|- ^- (quip event-log:rpc:ethereum watchdog)
|
|
?~ numbers
|
|
`dog
|
|
=^ rel-logs-1 dog
|
|
=/ =loglist (~(get ja pending-logs.dog) i.numbers)
|
|
=. pending-logs.dog (~(del by pending-logs.dog) i.numbers)
|
|
?~ loglist
|
|
`dog
|
|
=. history.dog [loglist history.dog]
|
|
[loglist dog]
|
|
=^ rel-logs-2 dog $(numbers t.numbers)
|
|
[(weld rel-logs-1 rel-logs-2) dog]
|
|
:_ dog
|
|
?~ logs
|
|
~
|
|
^- (list card:agent:gall)
|
|
[%give %fact [%logs path]~ %eth-watcher-diff !>([%logs logs])]~
|
|
--
|
|
::
|
|
++ on-arvo
|
|
|= [=wire =sign-arvo]
|
|
^- (quip card agent:gall)
|
|
?+ +<.sign-arvo ~|([%strange-sign-arvo -.sign-arvo] !!)
|
|
%wake
|
|
?. ?=([%timer *] wire) ~& weird-wire=wire [~ this]
|
|
=* path t.wire
|
|
?. (~(has by dogs.state) path)
|
|
[~ this]
|
|
=/ dog=watchdog
|
|
(~(got by dogs.state) path)
|
|
?^ error.sign-arvo
|
|
:: failed, try again. maybe should tell user if fails more than
|
|
:: 5 times.
|
|
::
|
|
%- (slog leaf+"eth-watcher failed; will retry" ~)
|
|
[[(wait path now.bowl refresh-rate.dog)]~ this]
|
|
:: maybe kill a timed-out update thread, maybe start a new one
|
|
::
|
|
=^ stop-cards=(list card) dog
|
|
:: if still running beyond timeout time, kill it
|
|
::
|
|
?. ?& ?=(^ running.dog)
|
|
::
|
|
%+ gth now.bowl
|
|
(add since.u.running.dog timeout-time.dog)
|
|
==
|
|
`dog
|
|
::
|
|
%- (slog leaf+"eth-watcher {(spud path)} timed out; will restart" ~)
|
|
=/ =cage [%spider-stop !>([tid.u.running.dog |])]
|
|
:_ dog(running ~)
|
|
:~ (leave-spider path our.bowl)
|
|
[%pass [%starting path] %agent [our.bowl %spider] %poke cage]
|
|
==
|
|
::
|
|
=^ start-cards=(list card) dog
|
|
:: if not (or no longer) running, start a new thread
|
|
::
|
|
?^ running.dog
|
|
`dog
|
|
:: if reached the to-block, don't start a new thread
|
|
::
|
|
?: ?& ?=(^ to.dog)
|
|
(gte number.dog u.to.dog)
|
|
==
|
|
`dog
|
|
::
|
|
=/ new-tid=@ta
|
|
(cat 3 'eth-watcher--' (scot %uv eny.bowl))
|
|
:_ dog(running `[now.bowl new-tid])
|
|
=/ args
|
|
:* ~ `new-tid bec %eth-watcher
|
|
!>([~ `watchpup`[- number pending-logs blocks]:dog])
|
|
==
|
|
:~ (watch-spider path our.bowl /thread-result/[new-tid])
|
|
(poke-spider path our.bowl %spider-start !>(args))
|
|
==
|
|
::
|
|
:- [(wait path now.bowl refresh-rate.dog) (weld stop-cards start-cards)]
|
|
this(dogs.state (~(put by dogs.state) path dog))
|
|
==
|
|
::
|
|
++ on-fail on-fail:def
|
|
--
|