From a7ecb972e86c81ad9122330520318a4cb9806aeb Mon Sep 17 00:00:00 2001 From: Fang Date: Fri, 22 Feb 2019 17:43:37 +0100 Subject: [PATCH] Implement %eth-watcher, an app for tracking Ethereum events Poke with [%watch %sometag config:eth-watcher] to initialize, then subscribe at /sometag to receive updates in the shape of [%snap snapshot:eth-watcher] for initial and on-reorg logs, [%logs loglist] for logs as they happen. --- app/eth-watcher.hoon | 556 ++++++++++++++++++++++++++++++++++++ lib/eth-watcher.hoon | 38 +++ mar/eth-watcher/action.hoon | 7 + mar/eth-watcher/update.hoon | 7 + sur/eth-watcher.hoon | 39 +++ 5 files changed, 647 insertions(+) create mode 100644 app/eth-watcher.hoon create mode 100644 lib/eth-watcher.hoon create mode 100644 mar/eth-watcher/action.hoon create mode 100644 mar/eth-watcher/update.hoon create mode 100644 sur/eth-watcher.hoon diff --git a/app/eth-watcher.hoon b/app/eth-watcher.hoon new file mode 100644 index 000000000..872e95a1e --- /dev/null +++ b/app/eth-watcher.hoon @@ -0,0 +1,556 @@ +:: watcher: ethereum event log collector +:: +/+ *eth-watcher +:: +=, ethereum +=, rpc +:: +|% +++ state + $: eyes=(map name eye) + == +:: +++ eye + $: config + latest-block=@ud + filter-id=@ud + poll-timer=@da + snapshot + sap=history + == +:: +++ history + $: interval=_100 + max-count=_10 + count=@ud + latest-block=@ud + snaps=(qeu snapshot) + == +:: +++ move (pair bone card) +++ card + $% [%hiss wire (unit user:eyre) mark %hiss hiss:eyre] + [%wait wire @da] + [%rest @da] + [%info wire desk nori:clay] + [%diff %eth-watcher-update update] + [%quit ~] + == +-- +:: +|_ [bowl:gall state] +:: +++ prep + |= old=(unit state) + ?~ old + [~ ..prep] + [~ ..prep(+<+ u.old)] +:: +++ poke-noun + |= [what=?(%debug %debug-init %save %load) =name] + ^- (quip move _+>) + =+ eye=(fall (~(get by eyes) name) *eye) + ?- what + %debug + ~& [%log-lent (lent logs.eye)] + ~& [%last-heard last-heard-block.eye] + [~ +>.$] + :: + %debug-init + =- done:(init:watcher name -) + ^- config + :* (need (de-purl:html 'http://104.198.35.227:8545')) :: parity + ::(need (de-purl:html 'http://35.226.110.143:8545')) :: geth + 7.100.000 + ~ + ~[azimuth:contracts:azimuth] + ~ + == + :: + %save + =/ pax=path + /(scot %p our)/home/(scot %da now)/watcher/[name]/jam + :_ +>.$ + :_ ~ + ^- move + :* ost + %info + /jamfile + (foal:space:userlib pax [%jam !>((jam eye))]) + == + :: + %load + =. eyes + %+ ~(put by eyes) name + =- (^eye (cue .^(@ %cx -))) + /(scot %p our)/home/(scot %da now)/watcher/[name]/jam + done:new-filter:(open:watcher name) + == +:: +++ poke-eth-watcher-action + |= act=action + ^- (quip move _+>) + ?- -.act + %watch + done:(init:watcher +.act) + :: + %clear + wipe:(open:watcher +.act) + == +:: +++ peer + |= pax=path + ^- (quip move _+>) + ?> ?=([@ ~] pax) + done:(put-snapshot-diff:(open:watcher i.pax) ost) +:: +++ wake + |= [wir=wire ~] + ^- (quip move _+>) + ?> ?=([@ %poll ~] wir) + done:poll-filter:(open:watcher i.wir) +:: +++ sigh-tang + |= [wir=wire res=tang] + ^- (quip move _+>) + ~& ['something went wrong!' wir] + ~_ res + [~ +>.$] +:: +++ sigh-json-rpc-response + |= [wir=wire res=response:rpc:jstd] + ^- (quip move _+>) + ?> ?=([@ *] wir) + =< done + %- sigh-json-rpc-response:(open:watcher i.wir) + [t.wir res] +:: +++ watcher + |_ $: =name + =eye + rewind-block=(unit @ud) + new-logs=loglist + moves=(list move) + == + :: + ++ open + |= nom=^name + ^+ +> + +>.$(name nom, eye (~(got by eyes) nom)) + :: + :: +| outward + :: + ++ wipe + => cancel-wait-poll + => cancel-subscribers + :- (flop moves) + ..watcher(eyes (~(del by eyes) name)) + :: + ++ done + ^- [(list move) _..watcher] + =? . ?=(^ rewind-block) + :: if we're rewinding to a block, then we throw away any moves + :: and changes we were going to make. + :: + =: moves *(list move) + new-logs *loglist + == + (restore-block u.rewind-block) + :: if we have any updates, send them + :: + =? . !=(~ new-logs) + (fan-diff %logs new-logs) + :: produce moves, store updated state + :: + :- (flop moves) + ..watcher(eyes (~(put by eyes) name eye)) + :: + :: +put-move: store side-effect + :: + ++ put-move + |= =card + %_(+> moves [[ost card] moves]) + :: + ++ put-moves + |= moz=(list move) + %_(+> moves (weld (flop moz) moves)) + :: + :: +put-rpc-request: store rpc request to ethereum node + :: + ++ put-rpc-request + |= [wir=wire id=(unit @t) req=request] + ^+ +> + %- put-move + ^- card + :* %hiss + [name wir] + ~ + %json-rpc-response + %hiss + %+ json-request node.eye + (request-to-json id req) + == + :: + :: +put-log: store change made by event + :: + ++ put-log + |= log=event-log + %_ +> + new-logs (store-new-logs ~[log] new-logs) + logs.eye (store-new-logs ~[log] logs.eye) + heard.eye (~(put in heard.eye) (log-to-id log)) + == + :: + :: +| subscriptions + :: + ++ put-diff + |= [for=bone dif=update] + %_(+> moves [[for %diff %eth-watcher-update dif] moves]) + :: + ++ put-snapshot-diff + |= for=bone + (put-diff for %snap last-heard-block.eye heard.eye logs.eye) + :: + ++ get-subscribers + ^- (list bone) + %+ murn ~(tap by sup) + |= [b=bone s=ship p=path] + ^- (unit bone) + ?> ?=([@ *] p) + ?:(=(name i.p) `b ~) + :: + ++ fan-diff + |= dif=update + %- put-moves + %+ turn get-subscribers + |= b=bone + ^- move + [b %diff %eth-watcher-update dif] + :: + ++ cancel-subscribers + %- put-moves + %+ turn get-subscribers + |=(b=bone [b %quit ~]) + :: + :: +| configuration + :: + :: +init: set up eye + :: + ++ init + |= [nom=^name =config] + ^+ +> + =. name nom + =. eye + %*(. *^eye - config, last-heard-block from-block.config) + get-latest-block + :: + :: +| catch-up-operations + :: + :: +get-latest-block + :: + :: Get latest block from eth node and compare to our own latest block. + :: Get intervening blocks in chunks until we're caught up, then set + :: up a filter going forward. + :: + ++ get-latest-block + => cancel-wait-poll + (put-rpc-request /catch-up/block-number `'block number' %eth-block-number ~) + :: + :: +catch-up: get next chunk + :: + ++ catch-up + |= from-block=@ud + ^+ +> + ?: (gte from-block latest-block.eye) + new-filter + =/ next-block (min latest-block.eye (add from-block 5.760)) :: ~d1 + ~? debug=| + [%catching-up from=from-block to=latest-block.eye] + %- put-rpc-request + :+ /catch-up/step/(scot %ud from-block)/(scot %ud next-block) + `'catch up' + :* %eth-get-logs + `number+from-block + `number+next-block + contracts.eye + topics.eye + == + :: + :: +| filter-operations + :: + :: +new-filter: request a new polling filter + :: + :: Listens from the last-heard block onward. + :: + ++ new-filter + %- put-rpc-request + :+ /filter/new `'new filter' + ^- request:rpc + :* %eth-new-filter + `number+last-heard-block.eye + ?~(to-block.eye ~ `number+u.to-block.eye) + contracts.eye + topics.eye + == + :: + :: +read-filter: get all events the filter captures + :: + ++ read-filter + %- put-rpc-request + :+ /filter/logs `'filter logs' + [%eth-get-filter-logs filter-id.eye] + :: + :: +poll-filter: get all new events since last poll (or filter creation) + :: + ++ poll-filter + ?: =(0 filter-id.eye) + ~& %no-filter-bad-poll + . + %- put-rpc-request + :+ /filter/changes `'poll filter' + [%eth-get-filter-changes filter-id.eye] + :: + :: +wait-poll: remind us to poll in four minutes + :: + :: Four minutes because Ethereum RPC filters time out after five. + :: We don't check for an existing timer or clear an old one here, + :: sane flows shouldn't see this being called superfluously. + :: + ++ wait-poll + =+ wen=(add now ~m4) + %- put-move(poll-timer.eye wen) + [%wait name^/poll wen] + :: + :: +cancel-wait-poll: remove poll reminder + :: + ++ cancel-wait-poll + ?: =(*@da poll-timer.eye) ..cancel-wait-poll + %- put-move(poll-timer.eye *@da) + [%rest poll-timer.eye] + :: + :: +| filter-results + :: + :: +sigh-json-rpc-response: process rpc response + :: + ++ sigh-json-rpc-response + |= [wir=wire res=response:rpc:jstd] + ^+ +> + ~! -.res + ?: ?=(%fail -.res) + ?: =(405 p.hit.res) + ~& 'HTTP 405 error (expected if using infura)' + +>.$ + ?. =(5 (div p.hit.res 100)) + ~& [%http-error hit.res] + +>.$ + ?+ wir + ~& [%retrying-node ~] ::((soft tang) q.res)] + wait-poll + [%catch-up %step @ta @ta ~] + ~& %retrying-catch-up + (catch-up (slav %ud `@ta`i.t.t.wir)) + == + ?+ wir ~|([%weird-sigh-wire wir] !!) + [%filter %new *] + (take-new-filter res) + :: + [%filter *] + (take-filter-results res) + :: + [%catch-up %block-number ~] + (take-block-number res) + :: + [%catch-up %step @ta @ta ~] + =/ from-block (slav %ud `@ta`i.t.t.wir) + =/ next-block (slav %ud `@ta`i.t.t.t.wir) + (take-catch-up-step res from-block next-block) + == + :: + :: +take-new-filter: store filter-id and read it + :: + ++ take-new-filter + |= rep=response:rpc:jstd + ^+ +> + ~| rep + ?< ?=(%batch -.rep) + ?< ?=(%fail -.rep) + ?: ?=(%error -.rep) + ~& [%filter-error--retrying message.rep] + new-filter + =- read-filter(filter-id.eye -) + (parse-eth-new-filter-res res.rep) + :: + :: +take-filter-results: parse results into event-logs and process them + :: + ++ take-filter-results + |= rep=response:rpc:jstd + ^+ +> + ?< ?=(%batch -.rep) + ?< ?=(%fail -.rep) + ?: ?=(%error -.rep) + ?. ?| =('filter not found' message.rep) :: geth + =('Filter not found' message.rep) :: parity + == + ~& [%unhandled-filter-error +.rep] + +> + ~& [%filter-timed-out--recreating block=last-heard-block.eye +.rep] + :: arguably should rewind 40 blocks on the off chance the chain reorganized + :: when we blinked. this will also restart the filter. + :: + :: (restore-block ?:((lth last-heard-block 40) 0 (sub.add last-heard-block 40))) + :: + :: counter-argument: it's a royal pain to restore from a snapshot + :: every time you can't ping the node for 5 minutes. this is likely + :: to destabilize the network. better to manually restore if we + :: notice an anomaly. + :: + :: third way: don't trust anything that doesn't have 40 confirmations + :: + new-filter + :: kick polling timer, only if it hasn't already been. + =? +> (gth now poll-timer.eye) + wait-poll + (take-events rep) + :: + :: +take-block-number: take block number and start catching up + :: + ++ take-block-number + |= rep=response:rpc:jstd + ^+ +> + ?< ?=(%batch -.rep) + ?< ?=(%fail -.rep) + ?: ?=(%error -.rep) + ~& [%take-block-number-error--retrying message.rep] + get-latest-block + =. latest-block.eye (parse-eth-block-number res.rep) + (catch-up last-heard-block.eye) + :: + :: +take-catch-up-step: process chunk + :: + ++ take-catch-up-step + |= [rep=response:rpc:jstd from-block=@ud next-block=@ud] + ^+ +> + ?< ?=(%batch -.rep) + ?< ?=(%fail -.rep) + ?: ?=(%error -.rep) + ~& [%catch-up-step-error--retrying message.rep] + (catch-up from-block) + =. +>.$ (take-events rep) + (catch-up next-block) + :: + :: +take-events: process events + :: + ++ take-events + |= rep=response:rpc:jstd + ^+ +> + ?< ?=(%batch -.rep) + ?< ?=(%fail -.rep) + ?< ?=(%error -.rep) + ?. ?=(%a -.res.rep) + ~& [%events-not-array rep] + !! + =* changes p.res.rep + ~? &(debug=| (gth (lent changes) 0)) + :* %processing-changes + changes=(lent changes) + block=last-heard-block.eye + id=filter-id.eye + == + |- ^+ +>.^$ + ?~ changes +>.^$ + =. +>.^$ + (take-event-log (parse-event-log i.changes)) + $(changes t.changes) + :: + :: +take-event-log: obtain changes from event-log + :: + ++ take-event-log + |= log=event-log + ^+ +> + ?~ mined.log + ~& %ignoring-unmined-event + +> + =* place u.mined.log + ?: (~(has in heard.eye) block-number.place log-index.place) + ?. removed.u.mined.log + ~? debug=| + [%ignoring-duplicate-event tx=transaction-hash.u.mined.log] + +> + :: block was reorganized away, so rewind to this block and + :: start syncing again. + :: + ~& :* 'removed event! Perhaps chain has reorganized?' + tx-hash=transaction-hash.u.mined.log + block-number=block-number.u.mined.log + block-hash=block-hash.u.mined.log + == + %= +> + rewind-block + :- ~ + ?~ rewind-block + block-number.place + (min block-number.place u.rewind-block) + == + =. last-heard-block.eye + (max block-number.place last-heard-block.eye) + ?: ?& (gte block-number.place from-block.eye) + ?| ?=(~ to-block.eye) + (lte block-number.place u.to-block.eye) + == + == + (put-log log) + ~& :* %event-block-out-of-range + got=block-number.place + from=from-block.eye + to=to-block.eye + == + +>.$ + :: + :: +restore-block: rewind to block or earlier + :: + ++ restore-block + |= block=@ud + ^+ +> + =/ old-qeu snaps.sap.eye + :: clear history + :: + =: snaps.sap.eye ~ + count.sap.eye 0 + latest-block.sap.eye 0 + == + :: find a snapshot we can use, remove ones that are too new + :: + =^ snap=snapshot +>.$ + ?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu)))) + [%*(. *snapshot last-heard-block from-block.eye) +>.$] + |- ^- [snapshot _+>.^$] + =^ snap=snapshot old-qeu + ~(get to old-qeu) + =: count.sap.eye +(count.sap.eye) + latest-block.sap.eye last-heard-block.snap + snaps.sap.eye (~(put to snaps.sap.eye) snap) + == + ?: |(=(~ old-qeu) (lth block last-heard-block:(need ~(top to old-qeu)))) + [snap +>.^$] + $ + ~& [%restoring-block block last-heard-block.snap] + (restore-snap snap) + :: + :: +restore-snap: revert state to snapshot + :: + ++ restore-snap + |= snap=snapshot + ^+ +> + :: notify subscribers + ::TODO be more nuanced about what changed, maybe + :: + =. +>.$ (fan-diff snap+snap) + :: restore state and kick new fetch cycle + :: + %= get-latest-block + last-heard-block.eye last-heard-block.snap + heard.eye heard.snap + logs.eye logs.snap + == + -- +-- \ No newline at end of file diff --git a/lib/eth-watcher.hoon b/lib/eth-watcher.hoon new file mode 100644 index 000000000..6fa905ee5 --- /dev/null +++ b/lib/eth-watcher.hoon @@ -0,0 +1,38 @@ +:: eth-watcher utilities +:: +/- *eth-watcher +:: +|% +:: +log-to-id: extract the event-id from an event-log +:: +++ log-to-id + |= log=event-log:rpc:ethereum + ^- event-id:ethereum + ?> ?=(^ mined.log) + :- block-number.u.mined.log + log-index.u.mined.log +:: +:: +store-new-logs: add logs to an old loglist, ensuring newest-first ordering +:: +:: assumes :new is already ordered newest-first +:: +++ store-new-logs + |= [new=loglist old=loglist] + ^- loglist + ?~ new old + =+ new-place=(log-to-id i.new) + |- + ?~ old [i.new old] + =+ old-place=(log-to-id i.old) + :: if the :old-place is older than :new-place, + :: put :new-place down, and grab the next one from :new + :: otherwise, keep looking through :old + :: + ?: ?| (gth block.new-place block.old-place) + ?& =(block.new-place block.old-place) + (gth log.new-place log.old-place) + == + == + [i.new ^$(new t.new)] + [i.old $(old t.old)] +-- \ No newline at end of file diff --git a/mar/eth-watcher/action.hoon b/mar/eth-watcher/action.hoon new file mode 100644 index 000000000..7ccca1985 --- /dev/null +++ b/mar/eth-watcher/action.hoon @@ -0,0 +1,7 @@ +/- *eth-watcher +|_ act=action +++ grab + |% + ++ noun action + -- +-- diff --git a/mar/eth-watcher/update.hoon b/mar/eth-watcher/update.hoon new file mode 100644 index 000000000..1a2d09343 --- /dev/null +++ b/mar/eth-watcher/update.hoon @@ -0,0 +1,7 @@ +/- *eth-watcher +|_ upd=update +++ grab + |% + ++ noun update + -- +-- diff --git a/sur/eth-watcher.hoon b/sur/eth-watcher.hoon new file mode 100644 index 000000000..0b0805a1c --- /dev/null +++ b/sur/eth-watcher.hoon @@ -0,0 +1,39 @@ +:: watcher: ethereum event log collector +:: +|% +++ name @tas +:: +++ config + $: node=purl:eyre + from-block=@ud + to-block=(unit @ud) + contracts=(list address:ethereum) + topics=(list $@(@ux (list @ux))) + == +:: +++ action + $% [%watch =name =config] + ::TODO support modifying existing config for future polling + [%clear =name] + == +:: +++ update + $% :: %snap: all known-good logs, sent on-subscribe and on-reorg + ::TODO there's probably a way to be more nuanced about what we forgot + :: to cope with a reorg + :: + [%snap =snapshot] + :: %vent: newly added logs + :: + [%logs =loglist] + == +:: +++ snapshot + $: last-heard-block=@ud + heard=(set event-id:ethereum) + logs=loglist + == +:: +++ loglist + (list event-log:rpc:ethereum) :: newest first +-- \ No newline at end of file