shrub/pkg/arvo/app/eth-watcher.hoon
Jared Tobin b3901ab42f Add 'pkg/arvo/' from commit 'c20e2a185f131ff3f5d3961829bd7a3fe0f227f8'
git-subtree-dir: pkg/arvo
git-subtree-mainline: 9c8f40bf6c
git-subtree-split: c20e2a185f
2019-06-28 12:48:05 +08:00

572 lines
14 KiB
Plaintext

:: watcher: ethereum event log collector
::
/+ *eth-watcher
::
=, ethereum
=, rpc
::
|%
++ state
$: eyes=(map name eye)
==
::
++ eye
$: config
latest-block=@ud
filter-id=@ud
poll-timer=(unit @da)
snapshot
sap=history
==
::
++ history
$: interval=_100
max-count=_10
count=@ud
latest-block=@ud
snaps=(qeu snapshot)
==
::
++ move (pair bone card)
++ card
$% [%hiss wire (unit user:eyre) mark %hiss hiss:eyre]
[%wait wire @da]
[%rest @da]
[%info wire desk nori:clay]
[%diff %eth-watcher-update update]
[%quit ~]
==
--
::
|_ [bowl:gall state]
::
++ prep
|= old=(unit state)
?~ old
[~ ..prep]
[~ ..prep(+<+ u.old)]
::
++ poke-noun
|= [what=?(%save %load) =name]
^- (quip move _+>)
=+ eye=(fall (~(get by eyes) name) *eye)
?- what
%save
=/ pax=path
/(scot %p our)/home/(scot %da now)/watcher/[name]/jam
:_ +>.$
:_ ~
^- move
:* ost
%info
/jamfile
(foal:space:userlib pax [%jam !>((jam eye))])
==
::
%load
=. eyes
%+ ~(put by eyes) name
=- (^eye (cue .^(@ %cx -)))
/(scot %p our)/home/(scot %da now)/watcher/[name]/jam
done:new-filter:(open:watcher name)
==
::
++ poke-eth-watcher-action
|= act=action
^- (quip move _+>)
?- -.act
%watch
done:(init:watcher +.act)
::
%clear
wipe:(open:watcher +.act)
==
::
++ peek-x
|= pax=path
^- (unit (unit [%noun *]))
?. ?=([@ *] pax) ~
=+ eye=(~(get by eyes) i.pax)
?~ eye [~ ~]
:: /name: all logs
::
?~ t.pax ``[%noun logs.u.eye]
:: /name/num: most recent num logs
::
=+ num=(slaw %ud i.t.pax)
?^ num ``[%noun (scag u.num logs.u.eye)]
:: /name/debug: debug information
::
?. ?=(%debug i.t.pax) ~
=- ``[%noun -]
=, u.eye
:* node=(en-purl:html node)
last=last-heard-block
lent=(lent logs)
time=poll-timer
==
::
++ peer
|= pax=path
^- (quip move _+>)
?> ?=([@ ~] pax)
done:(put-snapshot-diff:(open:watcher i.pax) ost)
::
++ wake
|= [wir=wire error=(unit tang)]
^- (quip move _+>)
?^ error
%- (slog u.error)
[~ ..wake]
?> ?=([@ %poll ~] wir)
done:poll-filter:(open:watcher i.wir)
::
++ sigh-tang
|= [wir=wire res=tang]
^- (quip move _+>)
~& ['something went wrong!' wir]
~_ res
[~ +>.$]
::
++ sigh-json-rpc-response
|= [wir=wire res=response:rpc:jstd]
^- (quip move _+>)
?> ?=([@ *] wir)
=< done
%- sigh-json-rpc-response:(open:watcher i.wir)
[t.wir res]
::
++ watcher
|_ $: =name
=eye
rewind-block=(unit @ud)
new-logs=loglist
moves=(list move)
==
::
:: +open: initialize core
::
++ open
|= nom=^name
^+ +>
+>.$(name nom, eye (~(got by eyes) nom))
::
:: +init: set up eye and initialize core
::
++ init
|= [nom=^name =config]
^+ +>
=. name nom
=. eye
%*(. *^eye - config, last-heard-block from-block.config)
get-latest-block
::
:: +| outward
::
:: +wipe: delete eye
::
++ wipe
=> cancel-wait-poll
=> cancel-subscribers
:- (flop moves)
..watcher(eyes (~(del by eyes) name))
::
:: +done: store changes, update subscribers
::
++ done
^- [(list move) _..watcher]
=? . ?=(^ rewind-block)
:: if we're rewinding to a block, then we throw away any moves
:: and changes we were going to make.
::
=: moves *(list move)
new-logs *loglist
==
(restore-block u.rewind-block)
:: if we have any updates, send them
::
=? . !=(~ new-logs)
(fan-diff %logs new-logs)
:: produce moves, store updated state
::
:- (flop moves)
..watcher(eyes (~(put by eyes) name eye))
::
:: +put-move: store side-effect
::
++ put-move
|= =card
%_(+> moves [[ost card] moves])
::
++ put-moves
|= moz=(list move)
%_(+> moves (weld (flop moz) moves))
::
:: +put-rpc-request: store rpc request to ethereum node
::
++ put-rpc-request
|= [wir=wire id=(unit @t) req=request]
^+ +>
%- put-move
^- card
:* %hiss
[name wir]
~
%json-rpc-response
%hiss
%+ json-request node.eye
(request-to-json id req)
==
::
:: +put-log: store change made by event
::
++ put-log
|= log=event-log
%_ +>
new-logs (store-new-logs ~[log] new-logs)
logs.eye (store-new-logs ~[log] logs.eye)
heard.eye (~(put in heard.eye) (log-to-id log))
==
::
:: +| subscriptions
::
++ put-diff
|= [for=bone dif=update]
%_(+> moves [[for %diff %eth-watcher-update dif] moves])
::
++ put-snapshot-diff
|= for=bone
(put-diff for %snap last-heard-block.eye heard.eye logs.eye)
::
++ get-subscribers
^- (list bone)
%+ murn ~(tap by sup)
|= [b=bone s=ship p=path]
^- (unit bone)
?> ?=([@ *] p)
?:(=(name i.p) `b ~)
::
++ fan-diff
|= dif=update
%- put-moves
%+ turn get-subscribers
|= b=bone
^- move
[b %diff %eth-watcher-update dif]
::
++ cancel-subscribers
%- put-moves
%+ turn get-subscribers
|=(b=bone [b %quit ~])
::
:: +| catch-up-operations
::
:: +get-latest-block
::
:: Get latest block from eth node and compare to our own latest block.
:: Get intervening blocks in chunks until we're caught up, then set
:: up a filter going forward.
::
++ get-latest-block
=> cancel-wait-poll
(put-rpc-request /catch-up/block-number `'block number' %eth-block-number ~)
::
:: +catch-up: get next chunk
::
++ catch-up
|= from-block=@ud
^+ +>
?: (gte from-block latest-block.eye)
new-filter
=/ next-block (min latest-block.eye (add from-block 5.760)) :: ~d1
~? debug=|
[%catching-up from=from-block to=latest-block.eye]
%- put-rpc-request
:+ /catch-up/step/(scot %ud from-block)/(scot %ud next-block)
`'catch up'
:* %eth-get-logs
`number+from-block
`number+next-block
contracts.eye
topics.eye
==
::
:: +| filter-operations
::
:: +new-filter: request a new polling filter
::
:: Listens from the last-heard block onward.
::
++ new-filter
%- put-rpc-request
:+ /filter/new `'new filter'
^- request:rpc
:* %eth-new-filter
`number+last-heard-block.eye
?~(to-block.eye ~ `number+u.to-block.eye)
contracts.eye
topics.eye
==
::
:: +read-filter: get all events the filter captures
::
++ read-filter
%- put-rpc-request
:+ /filter/logs `'filter logs'
[%eth-get-filter-logs filter-id.eye]
::
:: +poll-filter: get all new events since last poll (or filter creation)
::
++ poll-filter
?: =(0 filter-id.eye)
~& %no-filter-bad-poll
.
%- put-rpc-request
:+ /filter/changes `'poll filter'
[%eth-get-filter-changes filter-id.eye]
::
:: +wait-poll: remind us to poll in four minutes
::
:: Four minutes because Ethereum RPC filters time out after five.
:: We don't check for an existing timer or clear an old one here,
:: sane flows shouldn't see this being called superfluously.
::
++ wait-poll
=+ wen=(add now ~m4)
%- put-move(poll-timer.eye `wen)
[%wait name^/poll wen]
::
:: +cancel-wait-poll: remove poll reminder
::
++ cancel-wait-poll
?~ poll-timer.eye ..cancel-wait-poll
%- put-move(poll-timer.eye ~)
[%rest u.poll-timer.eye]
::
:: +| filter-results
::
:: +sigh-json-rpc-response: process rpc response
::
++ sigh-json-rpc-response
|= [wir=wire res=response:rpc:jstd]
^+ +>
~! -.res
?: ?=(%fail -.res)
?: =(405 p.hit.res)
~& 'HTTP 405 error (expected if using infura)'
+>.$
?. =(5 (div p.hit.res 100))
~& [%http-error hit.res]
+>.$
?+ wir
~& [%retrying-node ~] ::((soft tang) q.res)]
wait-poll
[%catch-up %step @ta @ta ~]
~& %retrying-catch-up
(catch-up (slav %ud `@ta`i.t.t.wir))
==
?+ wir ~|([%weird-sigh-wire wir] !!)
[%filter %new *]
(take-new-filter res)
::
[%filter *]
(take-filter-results res)
::
[%catch-up %block-number ~]
(take-block-number res)
::
[%catch-up %step @ta @ta ~]
=/ from-block (slav %ud `@ta`i.t.t.wir)
=/ next-block (slav %ud `@ta`i.t.t.t.wir)
(take-catch-up-step res from-block next-block)
==
::
:: +take-new-filter: store filter-id and read it
::
++ take-new-filter
|= rep=response:rpc:jstd
^+ +>
~| rep
?< ?=(%batch -.rep)
?< ?=(%fail -.rep)
?: ?=(%error -.rep)
~& [%filter-error--retrying message.rep]
new-filter
=- read-filter(filter-id.eye -)
(parse-eth-new-filter-res res.rep)
::
:: +take-filter-results: parse results into event-logs and process them
::
++ take-filter-results
|= rep=response:rpc:jstd
^+ +>
?< ?=(%batch -.rep)
?< ?=(%fail -.rep)
?: ?=(%error -.rep)
?. ?| =('filter not found' message.rep) :: geth
=('Filter not found' message.rep) :: parity
==
~& [%unhandled-filter-error +.rep]
+>
~& [%filter-timed-out--recreating block=last-heard-block.eye +.rep]
:: arguably should rewind 40 blocks on the off chance the chain reorganized
:: when we blinked. this will also restart the filter.
::
:: (restore-block ?:((lth last-heard-block 40) 0 (sub.add last-heard-block 40)))
::
:: counter-argument: it's a royal pain to restore from a snapshot
:: every time you can't ping the node for 5 minutes. this is likely
:: to destabilize the network. better to manually restore if we
:: notice an anomaly.
::
:: third way: don't trust anything that doesn't have 40 confirmations
::
new-filter
:: kick polling timer, only if it hasn't already been.
=? +> |(?=(~ poll-timer.eye) (gth now u.poll-timer.eye))
wait-poll
(take-events rep)
::
:: +take-block-number: take block number and start catching up
::
++ take-block-number
|= rep=response:rpc:jstd
^+ +>
?< ?=(%batch -.rep)
?< ?=(%fail -.rep)
?: ?=(%error -.rep)
~& [%take-block-number-error--retrying message.rep]
get-latest-block
=. latest-block.eye (parse-eth-block-number res.rep)
(catch-up last-heard-block.eye)
::
:: +take-catch-up-step: process chunk
::
++ take-catch-up-step
|= [rep=response:rpc:jstd from-block=@ud next-block=@ud]
^+ +>
?< ?=(%batch -.rep)
?< ?=(%fail -.rep)
?: ?=(%error -.rep)
~& [%catch-up-step-error--retrying message.rep]
(catch-up from-block)
=. +>.$ (take-events rep)
(catch-up next-block)
::
:: +take-events: process events
::
++ take-events
|= rep=response:rpc:jstd
^+ +>
?< ?=(%batch -.rep)
?< ?=(%fail -.rep)
?< ?=(%error -.rep)
?. ?=(%a -.res.rep)
~& [%events-not-array rep]
!!
=* changes p.res.rep
~? &(debug=| (gth (lent changes) 0))
:* %processing-changes
changes=(lent changes)
block=last-heard-block.eye
id=filter-id.eye
==
|- ^+ +>.^$
?~ changes +>.^$
=. +>.^$
(take-event-log (parse-event-log i.changes))
$(changes t.changes)
::
:: +take-event-log: obtain changes from event-log
::
++ take-event-log
|= log=event-log
^+ +>
?~ mined.log
~& %ignoring-unmined-event
+>
=* place u.mined.log
?: (~(has in heard.eye) block-number.place log-index.place)
?. removed.u.mined.log
~? debug=|
[%ignoring-duplicate-event tx=transaction-hash.u.mined.log]
+>
:: block was reorganized away, so rewind to this block and
:: start syncing again.
::
~& :* 'removed event! Perhaps chain has reorganized?'
tx-hash=transaction-hash.u.mined.log
block-number=block-number.u.mined.log
block-hash=block-hash.u.mined.log
==
%= +>
rewind-block
:- ~
?~ rewind-block
block-number.place
(min block-number.place u.rewind-block)
==
=. last-heard-block.eye
(max block-number.place last-heard-block.eye)
?: ?& (gte block-number.place from-block.eye)
?| ?=(~ to-block.eye)
(lte block-number.place u.to-block.eye)
==
==
(put-log log)
~& :* %event-block-out-of-range
got=block-number.place
from=from-block.eye
to=to-block.eye
==
+>.$
::
:: +restore-block: rewind to block or earlier
::
++ restore-block
|= block=@ud
^+ +>
=/ old-qeu snaps.sap.eye
:: clear history
::
=: snaps.sap.eye ~
count.sap.eye 0
latest-block.sap.eye 0
==
:: find a snapshot we can use, remove ones that are too new
::
=^ snap=snapshot +>.$
?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu))))
[%*(. *snapshot last-heard-block from-block.eye) +>.$]
|- ^- [snapshot _+>.^$]
=^ snap=snapshot old-qeu
~(get to old-qeu)
=: count.sap.eye +(count.sap.eye)
latest-block.sap.eye last-heard-block.snap
snaps.sap.eye (~(put to snaps.sap.eye) snap)
==
?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu))))
[snap +>.^$]
$
~& [%restoring-block block last-heard-block.snap]
(restore-snap snap)
::
:: +restore-snap: revert state to snapshot
::
++ restore-snap
|= snap=snapshot
^+ +>
:: notify subscribers
::TODO be more nuanced about what changed, maybe
::
=. +>.$ (fan-diff snap+snap)
:: restore state and kick new fetch cycle
::
%= get-latest-block
last-heard-block.eye last-heard-block.snap
heard.eye heard.snap
logs.eye logs.snap
==
--
--