mirror of
https://github.com/urbit/shrub.git
synced 2025-01-03 01:54:43 +03:00
572 lines
14 KiB
Plaintext
572 lines
14 KiB
Plaintext
:: watcher: ethereum event log collector
|
|
::
|
|
/+ *eth-watcher
|
|
::
|
|
=, ethereum
|
|
=, rpc
|
|
::
|
|
|%
|
|
++ state
|
|
$: eyes=(map name eye)
|
|
==
|
|
::
|
|
++ eye
|
|
$: config
|
|
latest-block=@ud
|
|
filter-id=@ud
|
|
poll-timer=(unit @da)
|
|
snapshot
|
|
sap=history
|
|
==
|
|
::
|
|
++ history
|
|
$: interval=_100
|
|
max-count=_10
|
|
count=@ud
|
|
latest-block=@ud
|
|
snaps=(qeu snapshot)
|
|
==
|
|
::
|
|
++ move (pair bone card)
|
|
++ card
|
|
$% [%hiss wire (unit user:eyre) mark %hiss hiss:eyre]
|
|
[%wait wire @da]
|
|
[%rest @da]
|
|
[%info wire desk nori:clay]
|
|
[%diff %eth-watcher-update update]
|
|
[%quit ~]
|
|
==
|
|
--
|
|
::
|
|
|_ [bowl:gall state]
|
|
::
|
|
++ prep
|
|
|= old=(unit state)
|
|
?~ old
|
|
[~ ..prep]
|
|
[~ ..prep(+<+ u.old)]
|
|
::
|
|
++ poke-noun
|
|
|= [what=?(%save %load) =name]
|
|
^- (quip move _+>)
|
|
=+ eye=(fall (~(get by eyes) name) *eye)
|
|
?- what
|
|
%save
|
|
=/ pax=path
|
|
/(scot %p our)/home/(scot %da now)/watcher/[name]/jam
|
|
:_ +>.$
|
|
:_ ~
|
|
^- move
|
|
:* ost
|
|
%info
|
|
/jamfile
|
|
(foal:space:userlib pax [%jam !>((jam eye))])
|
|
==
|
|
::
|
|
%load
|
|
=. eyes
|
|
%+ ~(put by eyes) name
|
|
=- (^eye (cue .^(@ %cx -)))
|
|
/(scot %p our)/home/(scot %da now)/watcher/[name]/jam
|
|
done:new-filter:(open:watcher name)
|
|
==
|
|
::
|
|
++ poke-eth-watcher-action
|
|
|= act=action
|
|
^- (quip move _+>)
|
|
?- -.act
|
|
%watch
|
|
done:(init:watcher +.act)
|
|
::
|
|
%clear
|
|
wipe:(open:watcher +.act)
|
|
==
|
|
::
|
|
++ peek-x
|
|
|= pax=path
|
|
^- (unit (unit [%noun *]))
|
|
?. ?=([@ *] pax) ~
|
|
=+ eye=(~(get by eyes) i.pax)
|
|
?~ eye [~ ~]
|
|
:: /name: all logs
|
|
::
|
|
?~ t.pax ``[%noun logs.u.eye]
|
|
:: /name/num: most recent num logs
|
|
::
|
|
=+ num=(slaw %ud i.t.pax)
|
|
?^ num ``[%noun (scag u.num logs.u.eye)]
|
|
:: /name/debug: debug information
|
|
::
|
|
?. ?=(%debug i.t.pax) ~
|
|
=- ``[%noun -]
|
|
=, u.eye
|
|
:* node=(en-purl:html node)
|
|
last=last-heard-block
|
|
lent=(lent logs)
|
|
time=poll-timer
|
|
==
|
|
::
|
|
++ peer
|
|
|= pax=path
|
|
^- (quip move _+>)
|
|
?> ?=([@ ~] pax)
|
|
done:(put-snapshot-diff:(open:watcher i.pax) ost)
|
|
::
|
|
++ wake
|
|
|= [wir=wire error=(unit tang)]
|
|
^- (quip move _+>)
|
|
?^ error
|
|
%- (slog u.error)
|
|
[~ ..wake]
|
|
?> ?=([@ %poll ~] wir)
|
|
done:poll-filter:(open:watcher i.wir)
|
|
::
|
|
++ sigh-tang
|
|
|= [wir=wire res=tang]
|
|
^- (quip move _+>)
|
|
~& ['something went wrong!' wir]
|
|
~_ res
|
|
[~ +>.$]
|
|
::
|
|
++ sigh-json-rpc-response
|
|
|= [wir=wire res=response:rpc:jstd]
|
|
^- (quip move _+>)
|
|
?> ?=([@ *] wir)
|
|
=< done
|
|
%- sigh-json-rpc-response:(open:watcher i.wir)
|
|
[t.wir res]
|
|
::
|
|
++ watcher
|
|
|_ $: =name
|
|
=eye
|
|
rewind-block=(unit @ud)
|
|
new-logs=loglist
|
|
moves=(list move)
|
|
==
|
|
::
|
|
:: +open: initialize core
|
|
::
|
|
++ open
|
|
|= nom=^name
|
|
^+ +>
|
|
+>.$(name nom, eye (~(got by eyes) nom))
|
|
::
|
|
:: +init: set up eye and initialize core
|
|
::
|
|
++ init
|
|
|= [nom=^name =config]
|
|
^+ +>
|
|
=. name nom
|
|
=. eye
|
|
%*(. *^eye - config, last-heard-block from-block.config)
|
|
get-latest-block
|
|
::
|
|
:: +| outward
|
|
::
|
|
:: +wipe: delete eye
|
|
::
|
|
++ wipe
|
|
=> cancel-wait-poll
|
|
=> cancel-subscribers
|
|
:- (flop moves)
|
|
..watcher(eyes (~(del by eyes) name))
|
|
::
|
|
:: +done: store changes, update subscribers
|
|
::
|
|
++ done
|
|
^- [(list move) _..watcher]
|
|
=? . ?=(^ rewind-block)
|
|
:: if we're rewinding to a block, then we throw away any moves
|
|
:: and changes we were going to make.
|
|
::
|
|
=: moves *(list move)
|
|
new-logs *loglist
|
|
==
|
|
(restore-block u.rewind-block)
|
|
:: if we have any updates, send them
|
|
::
|
|
=? . !=(~ new-logs)
|
|
(fan-diff %logs new-logs)
|
|
:: produce moves, store updated state
|
|
::
|
|
:- (flop moves)
|
|
..watcher(eyes (~(put by eyes) name eye))
|
|
::
|
|
:: +put-move: store side-effect
|
|
::
|
|
++ put-move
|
|
|= =card
|
|
%_(+> moves [[ost card] moves])
|
|
::
|
|
++ put-moves
|
|
|= moz=(list move)
|
|
%_(+> moves (weld (flop moz) moves))
|
|
::
|
|
:: +put-rpc-request: store rpc request to ethereum node
|
|
::
|
|
++ put-rpc-request
|
|
|= [wir=wire id=(unit @t) req=request]
|
|
^+ +>
|
|
%- put-move
|
|
^- card
|
|
:* %hiss
|
|
[name wir]
|
|
~
|
|
%json-rpc-response
|
|
%hiss
|
|
%+ json-request node.eye
|
|
(request-to-json id req)
|
|
==
|
|
::
|
|
:: +put-log: store change made by event
|
|
::
|
|
++ put-log
|
|
|= log=event-log
|
|
%_ +>
|
|
new-logs (store-new-logs ~[log] new-logs)
|
|
logs.eye (store-new-logs ~[log] logs.eye)
|
|
heard.eye (~(put in heard.eye) (log-to-id log))
|
|
==
|
|
::
|
|
:: +| subscriptions
|
|
::
|
|
++ put-diff
|
|
|= [for=bone dif=update]
|
|
%_(+> moves [[for %diff %eth-watcher-update dif] moves])
|
|
::
|
|
++ put-snapshot-diff
|
|
|= for=bone
|
|
(put-diff for %snap last-heard-block.eye heard.eye logs.eye)
|
|
::
|
|
++ get-subscribers
|
|
^- (list bone)
|
|
%+ murn ~(tap by sup)
|
|
|= [b=bone s=ship p=path]
|
|
^- (unit bone)
|
|
?> ?=([@ *] p)
|
|
?:(=(name i.p) `b ~)
|
|
::
|
|
++ fan-diff
|
|
|= dif=update
|
|
%- put-moves
|
|
%+ turn get-subscribers
|
|
|= b=bone
|
|
^- move
|
|
[b %diff %eth-watcher-update dif]
|
|
::
|
|
++ cancel-subscribers
|
|
%- put-moves
|
|
%+ turn get-subscribers
|
|
|=(b=bone [b %quit ~])
|
|
::
|
|
:: +| catch-up-operations
|
|
::
|
|
:: +get-latest-block
|
|
::
|
|
:: Get latest block from eth node and compare to our own latest block.
|
|
:: Get intervening blocks in chunks until we're caught up, then set
|
|
:: up a filter going forward.
|
|
::
|
|
++ get-latest-block
|
|
=> cancel-wait-poll
|
|
(put-rpc-request /catch-up/block-number `'block number' %eth-block-number ~)
|
|
::
|
|
:: +catch-up: get next chunk
|
|
::
|
|
++ catch-up
|
|
|= from-block=@ud
|
|
^+ +>
|
|
?: (gte from-block latest-block.eye)
|
|
new-filter
|
|
=/ next-block (min latest-block.eye (add from-block 5.760)) :: ~d1
|
|
~? debug=|
|
|
[%catching-up from=from-block to=latest-block.eye]
|
|
%- put-rpc-request
|
|
:+ /catch-up/step/(scot %ud from-block)/(scot %ud next-block)
|
|
`'catch up'
|
|
:* %eth-get-logs
|
|
`number+from-block
|
|
`number+next-block
|
|
contracts.eye
|
|
topics.eye
|
|
==
|
|
::
|
|
:: +| filter-operations
|
|
::
|
|
:: +new-filter: request a new polling filter
|
|
::
|
|
:: Listens from the last-heard block onward.
|
|
::
|
|
++ new-filter
|
|
%- put-rpc-request
|
|
:+ /filter/new `'new filter'
|
|
^- request:rpc
|
|
:* %eth-new-filter
|
|
`number+last-heard-block.eye
|
|
?~(to-block.eye ~ `number+u.to-block.eye)
|
|
contracts.eye
|
|
topics.eye
|
|
==
|
|
::
|
|
:: +read-filter: get all events the filter captures
|
|
::
|
|
++ read-filter
|
|
%- put-rpc-request
|
|
:+ /filter/logs `'filter logs'
|
|
[%eth-get-filter-logs filter-id.eye]
|
|
::
|
|
:: +poll-filter: get all new events since last poll (or filter creation)
|
|
::
|
|
++ poll-filter
|
|
?: =(0 filter-id.eye)
|
|
~& %no-filter-bad-poll
|
|
.
|
|
%- put-rpc-request
|
|
:+ /filter/changes `'poll filter'
|
|
[%eth-get-filter-changes filter-id.eye]
|
|
::
|
|
:: +wait-poll: remind us to poll in four minutes
|
|
::
|
|
:: Four minutes because Ethereum RPC filters time out after five.
|
|
:: We don't check for an existing timer or clear an old one here,
|
|
:: sane flows shouldn't see this being called superfluously.
|
|
::
|
|
++ wait-poll
|
|
=+ wen=(add now ~m4)
|
|
%- put-move(poll-timer.eye `wen)
|
|
[%wait name^/poll wen]
|
|
::
|
|
:: +cancel-wait-poll: remove poll reminder
|
|
::
|
|
++ cancel-wait-poll
|
|
?~ poll-timer.eye ..cancel-wait-poll
|
|
%- put-move(poll-timer.eye ~)
|
|
[%rest u.poll-timer.eye]
|
|
::
|
|
:: +| filter-results
|
|
::
|
|
:: +sigh-json-rpc-response: process rpc response
|
|
::
|
|
++ sigh-json-rpc-response
|
|
|= [wir=wire res=response:rpc:jstd]
|
|
^+ +>
|
|
~! -.res
|
|
?: ?=(%fail -.res)
|
|
?: =(405 p.hit.res)
|
|
~& 'HTTP 405 error (expected if using infura)'
|
|
+>.$
|
|
?. =(5 (div p.hit.res 100))
|
|
~& [%http-error hit.res]
|
|
+>.$
|
|
?+ wir
|
|
~& [%retrying-node ~] ::((soft tang) q.res)]
|
|
wait-poll
|
|
[%catch-up %step @ta @ta ~]
|
|
~& %retrying-catch-up
|
|
(catch-up (slav %ud `@ta`i.t.t.wir))
|
|
==
|
|
?+ wir ~|([%weird-sigh-wire wir] !!)
|
|
[%filter %new *]
|
|
(take-new-filter res)
|
|
::
|
|
[%filter *]
|
|
(take-filter-results res)
|
|
::
|
|
[%catch-up %block-number ~]
|
|
(take-block-number res)
|
|
::
|
|
[%catch-up %step @ta @ta ~]
|
|
=/ from-block (slav %ud `@ta`i.t.t.wir)
|
|
=/ next-block (slav %ud `@ta`i.t.t.t.wir)
|
|
(take-catch-up-step res from-block next-block)
|
|
==
|
|
::
|
|
:: +take-new-filter: store filter-id and read it
|
|
::
|
|
++ take-new-filter
|
|
|= rep=response:rpc:jstd
|
|
^+ +>
|
|
~| rep
|
|
?< ?=(%batch -.rep)
|
|
?< ?=(%fail -.rep)
|
|
?: ?=(%error -.rep)
|
|
~& [%filter-error--retrying message.rep]
|
|
new-filter
|
|
=- read-filter(filter-id.eye -)
|
|
(parse-eth-new-filter-res res.rep)
|
|
::
|
|
:: +take-filter-results: parse results into event-logs and process them
|
|
::
|
|
++ take-filter-results
|
|
|= rep=response:rpc:jstd
|
|
^+ +>
|
|
?< ?=(%batch -.rep)
|
|
?< ?=(%fail -.rep)
|
|
?: ?=(%error -.rep)
|
|
?. ?| =('filter not found' message.rep) :: geth
|
|
=('Filter not found' message.rep) :: parity
|
|
==
|
|
~& [%unhandled-filter-error +.rep]
|
|
+>
|
|
~& [%filter-timed-out--recreating block=last-heard-block.eye +.rep]
|
|
:: arguably should rewind 40 blocks on the off chance the chain reorganized
|
|
:: when we blinked. this will also restart the filter.
|
|
::
|
|
:: (restore-block ?:((lth last-heard-block 40) 0 (sub.add last-heard-block 40)))
|
|
::
|
|
:: counter-argument: it's a royal pain to restore from a snapshot
|
|
:: every time you can't ping the node for 5 minutes. this is likely
|
|
:: to destabilize the network. better to manually restore if we
|
|
:: notice an anomaly.
|
|
::
|
|
:: third way: don't trust anything that doesn't have 40 confirmations
|
|
::
|
|
new-filter
|
|
:: kick polling timer, only if it hasn't already been.
|
|
=? +> |(?=(~ poll-timer.eye) (gth now u.poll-timer.eye))
|
|
wait-poll
|
|
(take-events rep)
|
|
::
|
|
:: +take-block-number: take block number and start catching up
|
|
::
|
|
++ take-block-number
|
|
|= rep=response:rpc:jstd
|
|
^+ +>
|
|
?< ?=(%batch -.rep)
|
|
?< ?=(%fail -.rep)
|
|
?: ?=(%error -.rep)
|
|
~& [%take-block-number-error--retrying message.rep]
|
|
get-latest-block
|
|
=. latest-block.eye (parse-eth-block-number res.rep)
|
|
(catch-up last-heard-block.eye)
|
|
::
|
|
:: +take-catch-up-step: process chunk
|
|
::
|
|
++ take-catch-up-step
|
|
|= [rep=response:rpc:jstd from-block=@ud next-block=@ud]
|
|
^+ +>
|
|
?< ?=(%batch -.rep)
|
|
?< ?=(%fail -.rep)
|
|
?: ?=(%error -.rep)
|
|
~& [%catch-up-step-error--retrying message.rep]
|
|
(catch-up from-block)
|
|
=. +>.$ (take-events rep)
|
|
(catch-up next-block)
|
|
::
|
|
:: +take-events: process events
|
|
::
|
|
++ take-events
|
|
|= rep=response:rpc:jstd
|
|
^+ +>
|
|
?< ?=(%batch -.rep)
|
|
?< ?=(%fail -.rep)
|
|
?< ?=(%error -.rep)
|
|
?. ?=(%a -.res.rep)
|
|
~& [%events-not-array rep]
|
|
!!
|
|
=* changes p.res.rep
|
|
~? &(debug=| (gth (lent changes) 0))
|
|
:* %processing-changes
|
|
changes=(lent changes)
|
|
block=last-heard-block.eye
|
|
id=filter-id.eye
|
|
==
|
|
|- ^+ +>.^$
|
|
?~ changes +>.^$
|
|
=. +>.^$
|
|
(take-event-log (parse-event-log i.changes))
|
|
$(changes t.changes)
|
|
::
|
|
:: +take-event-log: obtain changes from event-log
|
|
::
|
|
++ take-event-log
|
|
|= log=event-log
|
|
^+ +>
|
|
?~ mined.log
|
|
~& %ignoring-unmined-event
|
|
+>
|
|
=* place u.mined.log
|
|
?: (~(has in heard.eye) block-number.place log-index.place)
|
|
?. removed.u.mined.log
|
|
~? debug=|
|
|
[%ignoring-duplicate-event tx=transaction-hash.u.mined.log]
|
|
+>
|
|
:: block was reorganized away, so rewind to this block and
|
|
:: start syncing again.
|
|
::
|
|
~& :* 'removed event! Perhaps chain has reorganized?'
|
|
tx-hash=transaction-hash.u.mined.log
|
|
block-number=block-number.u.mined.log
|
|
block-hash=block-hash.u.mined.log
|
|
==
|
|
%= +>
|
|
rewind-block
|
|
:- ~
|
|
?~ rewind-block
|
|
block-number.place
|
|
(min block-number.place u.rewind-block)
|
|
==
|
|
=. last-heard-block.eye
|
|
(max block-number.place last-heard-block.eye)
|
|
?: ?& (gte block-number.place from-block.eye)
|
|
?| ?=(~ to-block.eye)
|
|
(lte block-number.place u.to-block.eye)
|
|
==
|
|
==
|
|
(put-log log)
|
|
~& :* %event-block-out-of-range
|
|
got=block-number.place
|
|
from=from-block.eye
|
|
to=to-block.eye
|
|
==
|
|
+>.$
|
|
::
|
|
:: +restore-block: rewind to block or earlier
|
|
::
|
|
++ restore-block
|
|
|= block=@ud
|
|
^+ +>
|
|
=/ old-qeu snaps.sap.eye
|
|
:: clear history
|
|
::
|
|
=: snaps.sap.eye ~
|
|
count.sap.eye 0
|
|
latest-block.sap.eye 0
|
|
==
|
|
:: find a snapshot we can use, remove ones that are too new
|
|
::
|
|
=^ snap=snapshot +>.$
|
|
?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu))))
|
|
[%*(. *snapshot last-heard-block from-block.eye) +>.$]
|
|
|- ^- [snapshot _+>.^$]
|
|
=^ snap=snapshot old-qeu
|
|
~(get to old-qeu)
|
|
=: count.sap.eye +(count.sap.eye)
|
|
latest-block.sap.eye last-heard-block.snap
|
|
snaps.sap.eye (~(put to snaps.sap.eye) snap)
|
|
==
|
|
?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu))))
|
|
[snap +>.^$]
|
|
$
|
|
~& [%restoring-block block last-heard-block.snap]
|
|
(restore-snap snap)
|
|
::
|
|
:: +restore-snap: revert state to snapshot
|
|
::
|
|
++ restore-snap
|
|
|= snap=snapshot
|
|
^+ +>
|
|
:: notify subscribers
|
|
::TODO be more nuanced about what changed, maybe
|
|
::
|
|
=. +>.$ (fan-diff snap+snap)
|
|
:: restore state and kick new fetch cycle
|
|
::
|
|
%= get-latest-block
|
|
last-heard-block.eye last-heard-block.snap
|
|
heard.eye heard.snap
|
|
logs.eye logs.snap
|
|
==
|
|
--
|
|
--
|