urbit/pkg/arvo/app/eth-watcher.hoon

471 lines
14 KiB
Plaintext
Raw Normal View History

2019-10-29 22:51:58 +03:00
:: eth-watcher: ethereum event log collector
::
/- *eth-watcher
2019-07-05 04:15:53 +03:00
/+ tapp, stdio
=, ethereum-types
2019-08-07 01:42:37 +03:00
=, able:jael
2019-07-05 04:15:53 +03:00
=> |%
2019-07-19 01:26:15 +03:00
+$ app-state
$: %0
dogs=(map path watchdog)
==
::
2019-10-29 22:51:58 +03:00
+$ context [=path dog=watchdog]
+$ watchdog
$: config
2019-07-19 03:08:01 +03:00
=number:block
=pending-logs
=history
2019-07-19 03:08:01 +03:00
blocks=(list block)
2019-07-19 01:26:15 +03:00
==
::
+$ history (list loglist)
+$ pending-logs (map number:block loglist)
::
+$ peek-data
[%atom =next-block=number:block]
2019-07-05 04:15:53 +03:00
+$ in-poke-data
$: %eth-watcher-poke
2019-10-29 22:51:58 +03:00
poke
==
2019-07-05 04:15:53 +03:00
+$ out-poke-data ~
+$ in-peer-data ~
+$ out-peer-data
$: %eth-watcher-diff
2019-10-29 22:51:58 +03:00
diff
==
2019-07-05 04:15:53 +03:00
++ tapp
%: ^tapp
app-state
peek-data
in-poke-data
out-poke-data
in-peer-data
out-peer-data
==
++ tapp-async tapp-async:tapp
++ stdio (^stdio out-poke-data out-peer-data)
--
::
:: Async helpers
::
=> |%
++ request-rpc
|= [url=@ta id=(unit @t) req=request:rpc:ethereum]
=/ m (async:stdio ,json)
^- form:m
%+ (retry json) `10
=/ m (async:stdio ,(unit json))
^- form:m
|^
=/ =request:http
:* method=%'POST'
url=url
header-list=['Content-Type'^'application/json' ~]
^= body
%- some %- as-octt:mimes:html
%- en-json:html
(request-to-json:rpc:ethereum id req)
==
;< ~ bind:m (send-request:stdio request)
2019-07-05 23:59:29 +03:00
;< rep=(unit client-response:iris) bind:m
2019-07-05 04:15:53 +03:00
take-maybe-response:stdio
?~ rep
(pure:m ~)
(parse-response u.rep)
::
++ parse-response
2019-07-05 23:59:29 +03:00
|= =client-response:iris
2019-07-05 04:15:53 +03:00
=/ m (async:stdio ,(unit json))
^- form:m
?> ?=(%finished -.client-response)
2019-08-02 20:16:05 +03:00
?~ full-file.client-response
(pure:m ~)
=/ body=@t q.data.u.full-file.client-response
2019-07-05 04:15:53 +03:00
=/ jon=(unit json) (de-json:html body)
?~ jon
(pure:m ~)
=, dejs-soft:format
=/ array=(unit (list response:rpc:jstd))
((ar parse-one-response) u.jon)
?~ array
=/ res=(unit response:rpc:jstd) (parse-one-response u.jon)
?~ res
(async-fail:stdio %request-rpc-parse-error >id< ~)
?: ?=(%error -.u.res)
(async-fail:stdio %request-rpc-error >id< >+.res< ~)
?. ?=(%result -.u.res)
(async-fail:stdio %request-rpc-fail >u.res< ~)
(pure:m `res.u.res)
(async-fail:stdio %request-rpc-batch >%not-implemented< ~)
:: (pure:m `[%batch u.array])
::
++ parse-one-response
|= =json
^- (unit response:rpc:jstd)
=/ res=(unit [@t ^json])
%. json
=, dejs-soft:format
(ot id+so result+some ~)
?^ res `[%result u.res]
~| parse-one-response=json
:+ ~ %error %- need
%. json
=, dejs-soft:format
(ot id+so error+(ot code+no message+so ~) ~)
--
::
++ retry
|* result=mold
|= [crash-after=(unit @ud) computation=_*form:(async:stdio (unit result))]
=/ m (async:stdio ,result)
=| try=@ud
|^
|- ^- form:m
=* loop $
?: =(crash-after `try)
(async-fail:stdio %retry-too-many ~)
;< ~ bind:m (backoff try ~m1)
;< res=(unit result) bind:m computation
?^ res
(pure:m u.res)
loop(try +(try))
::
++ backoff
|= [try=@ud limit=@dr]
=/ m (async:stdio ,~)
^- form:m
;< eny=@uvJ bind:m get-entropy:stdio
;< now=@da bind:m get-time:stdio
%- wait:stdio
%+ add now
%+ min limit
?: =(0 try) ~s0
%+ add
(mul ~s1 (bex (dec try)))
(mul ~s0..0001 (~(rad og eny) 1.000))
--
::
++ get-latest-block
|= url=@ta
=/ m (async:stdio ,block)
^- form:m
;< =json bind:m (request-rpc url `'block number' %eth-block-number ~)
(get-block-by-number url (parse-eth-block-number:rpc:ethereum json))
::
++ get-block-by-number
|= [url=@ta =number:block]
=/ m (async:stdio ,block)
^- form:m
|^
;< =json bind:m
(request-rpc url `'block by number' %eth-get-block-by-number number |)
=/ =block (parse-block json)
?. =(number number.id.block)
(async-fail:stdio %reorg-detected >number< >block< ~)
(pure:m block)
::
++ parse-block
|= =json
^- block
=< [[&1 &2] |2]
^- [@ @ @]
~| json
%. json
=, dejs:format
%- ot
:~ hash+parse-hex-result:rpc:ethereum
number+parse-hex-result:rpc:ethereum
'parentHash'^parse-hex-result:rpc:ethereum
==
--
::
++ get-logs-by-hash
|= [url=@ta =hash:block contracts=(list address) =topics]
=/ m (async:stdio loglist)
2019-07-05 04:15:53 +03:00
^- form:m
;< =json bind:m
%+ request-rpc url
:* `'logs by hash'
%eth-get-logs-by-hash
hash
contracts
topics
2019-07-05 04:15:53 +03:00
==
%- pure:m
(parse-event-logs:rpc:ethereum json)
2019-07-19 23:13:14 +03:00
::
++ get-logs-by-range
|= $: url=@ta
contracts=(list address)
=topics
=from=number:block
=to=number:block
==
=/ m (async:stdio loglist)
2019-07-19 23:13:14 +03:00
^- form:m
;< =json bind:m
%+ request-rpc url
:* `'logs by range'
%eth-get-logs
`number+from-number
`number+to-number
contracts
topics
2019-07-19 23:13:14 +03:00
==
%- pure:m
(parse-event-logs:rpc:ethereum json)
2019-07-19 23:13:14 +03:00
::
++ send-logs
|= [=path =loglist]
2019-07-05 04:15:53 +03:00
=/ m (async:stdio ,~)
|- ^- form:m
=* loop $
?~ loglist
2019-07-05 04:15:53 +03:00
(pure:m ~)
;< ~ bind:m (send-update path %log i.loglist)
loop(loglist t.loglist)
::
++ send-update
2019-10-29 22:51:58 +03:00
|= [=path =diff]
=/ m (async:stdio ,~)
^- form:m
=. path [%logs path]
2019-10-29 22:51:58 +03:00
(give-result:stdio path %eth-watcher-diff diff)
2019-07-05 04:15:53 +03:00
--
::
:: Main loop
::
=> |%
2019-07-19 01:26:15 +03:00
::
:: Update watchdog configuration, then look for updates
2019-07-19 01:26:15 +03:00
::
++ configure
|= [context =config]
=/ m (async:stdio ,watchdog)
2019-07-05 04:15:53 +03:00
^- form:m
%+ get-updates path
%_ dog
- config
2019-10-29 22:51:58 +03:00
number from.config
==
2019-07-19 01:26:15 +03:00
::
:: Get updates since last checked
::
++ get-updates
|= context
=/ m (async:stdio ,watchdog)
2019-07-19 01:26:15 +03:00
^- form:m
;< =latest=block bind:m (get-latest-block url.dog)
;< dog=watchdog bind:m (zoom [path dog] number.id.latest-block)
2019-07-05 04:15:53 +03:00
|- ^- form:m
=* loop $
?: (gth number.dog number.id.latest-block)
(pure:m dog)
;< =block bind:m (get-block-by-number url.dog number.dog)
;< dog=watchdog bind:m
(take-block [path dog] block)
loop(dog dog)
2019-07-05 04:15:53 +03:00
::
2019-07-19 01:26:15 +03:00
:: Process a block, detecting and handling reorgs
::
2019-07-05 04:15:53 +03:00
++ take-block
|= [context =block]
=/ m (async:stdio ,watchdog)
2019-07-05 04:15:53 +03:00
^- form:m
:: if this next block isn't direct descendant of our logs, reorg happened
?: &(?=(^ blocks.dog) !=(parent-hash.block hash.id.i.blocks.dog))
(rewind [path dog] block)
;< [=new=pending-logs =released=loglist] bind:m
(release-old-events path pending-logs.dog number.id.block)
;< =new=loglist bind:m
(get-logs-by-hash url.dog hash.id.block contracts.dog topics.dog)
=. new-pending-logs
(~(put by new-pending-logs) number.id.block new-loglist)
%- pure:m
%_ dog
number +(number.id.block)
pending-logs new-pending-logs
history [released-loglist history.dog]
blocks [block blocks.dog]
==
2019-07-05 04:15:53 +03:00
::
2019-07-19 01:26:15 +03:00
:: Release events if they're more than 30 blocks ago
::
2019-07-05 04:15:53 +03:00
++ release-old-events
|= [=path =pending-logs =number:block]
=/ m (async:stdio ,[^pending-logs loglist])
2019-07-05 04:15:53 +03:00
^- form:m
?: (lth number 30) (pure:m pending-logs ~)
2019-08-06 09:11:40 +03:00
=/ rel-number (sub number 30)
=/ =loglist (~(get ja pending-logs) rel-number)
;< ~ bind:m (send-logs path loglist)
(pure:m (~(del by pending-logs) rel-number) loglist)
2019-07-05 04:15:53 +03:00
::
2019-07-19 01:26:15 +03:00
:: Reorg detected, so rewind until we're back in sync
::
2019-07-05 04:15:53 +03:00
++ rewind
:: block: wants to be head of blocks.dog, but might not match
|= [context =block]
=/ m (async:stdio ,watchdog)
=* blocks blocks.dog
2019-07-05 04:15:53 +03:00
|- ^- form:m
=* loop $
:: if we have no further history to rewind, we're done
2019-07-05 06:53:24 +03:00
?~ blocks
(pure:m dog(blocks [block blocks]))
:: if target block is directly after "latest", we're done
2019-07-05 06:53:24 +03:00
?: =(parent-hash.block hash.id.i.blocks)
(pure:m dog(blocks [block blocks]))
:: next-block: the new target block
;< =next=^block bind:m
(get-block-by-number url.dog number.id.i.blocks)
:: remove from either pending-logs or history
?: =(~ pending-logs.dog)
:: if no more pending logs, start deleting from history instead
::NOTE this assumes there's one history entry per item in blocks.
:: while +zoom breaks that assumption by clearing blocks, we won't
:: run out of history before running out of blocks, allowing us to
:: skip the =(number.id.block number.id.i.i.history) check.
?~ history.dog
loop(block next-block, blocks t.blocks)
;< ~ bind:m
:: don't bother sending a disavow if there were no logs there
?~ i.history.dog (pure:(async:stdio ,~) ~)
(disavow path block)
loop(block next-block, blocks t.blocks, history.dog t.history.dog)
=. pending-logs.dog
(~(del by pending-logs.dog) number.id.block)
2019-07-05 06:53:24 +03:00
loop(block next-block, blocks t.blocks)
2019-07-05 04:15:53 +03:00
::
2019-07-19 01:26:15 +03:00
:: Tell subscribers there was a deep reorg
::
2019-07-05 04:15:53 +03:00
++ disavow
|= [=path =block]
2019-07-05 04:15:53 +03:00
=/ m (async:stdio ,~)
^- form:m
(send-update path %disavow id.block)
2019-07-19 23:13:14 +03:00
::
:: Zoom forward to near a given block number.
::
:: Zooming doesn't go forward one block at a time. As a
:: consequence, it cannot detect and handle reorgs. Only use it
:: at a safe distance -- 500 blocks ago is probably sufficient.
::
++ zoom
|= [context =latest=number:block]
=/ m (async:stdio ,watchdog)
2019-07-19 23:13:14 +03:00
^- form:m
2019-08-05 21:40:13 +03:00
=/ zoom-margin=number:block 100
?: (lth latest-number (add number.dog zoom-margin))
(pure:m dog)
=/ to-number=number:block (sub latest-number zoom-margin)
;< =loglist bind:m
%: get-logs-by-range
url.dog
contracts.dog
topics.dog
number.dog
to-number
==
;< ~ bind:m (send-logs path loglist)
=. number.dog +(to-number)
=. blocks.dog ~
=. history.dog [loglist history.dog]
(pure:m dog)
2019-07-05 04:15:53 +03:00
--
::
:: Main
::
=* default-tapp default-tapp:tapp
%- create-tapp-all:tapp
^- tapp-core-all:tapp
2019-07-19 03:08:01 +03:00
|_ [=bowl:gall state=app-state]
++ handle-init
=/ m tapp-async
^- form:m
:: start update timer loop
;< now=@da bind:m get-time:stdio
;< ~ bind:m (wait-effect:stdio (add now ~m5))
(pure:m state)
::
++ handle-diff handle-diff:default-tapp
::
2019-07-05 04:15:53 +03:00
++ handle-poke
|= =in-poke-data
=/ m tapp-async
^- form:m
2019-07-19 01:26:15 +03:00
?- +<.in-poke-data
%watch
=/ dog=watchdog
(~(gut by dogs.state) path.in-poke-data *watchdog)
;< dog=watchdog bind:m
(configure [path.in-poke-data dog] config.in-poke-data)
=. dogs.state (~(put by dogs.state) path.in-poke-data dog)
(pure:m state)
::
%clear
=. dogs.state (~(del by dogs.state) path.in)
(pure:m state)
2019-07-05 04:15:53 +03:00
==
::
++ handle-take
|= =sign:tapp
2019-07-19 01:26:15 +03:00
=/ m tapp-async
^- form:m
2019-07-19 03:08:01 +03:00
?+ -.sign ~|([%strange-sign -.sign] !!)
%wake
;< ~ bind:m
;< now=@da bind:(async:tapp ,~) get-time:stdio
=/ next=@da (add now ~m5)
::NOTE we use +send-raw-card here to ensure we always set a new timer,
:: regardless of what happens further on in the flow.
(send-raw-card:stdio %wait /effect/(scot %da next) next)
::TODO ideally we'd process these in parallel. this seems possible,
:: but requires non-trivial work, as it deviates from tapp's flow.
:: (when making that change, take note of rpc request id's.)
=/ dogs=(list [=path dog=watchdog]) ~(tap by dogs.state)
|- ^- form:m
=* loop $
?~ dogs
(pure:m state)
=, i.dogs
;< dog=watchdog bind:m (get-updates path dog)
=. dogs.state (~(put by dogs.state) path dog)
loop(dogs t.dogs)
2019-07-19 01:26:15 +03:00
==
2019-07-05 04:15:53 +03:00
::
:: +handle-peer: subscribe & get initial subscription data
::
:: /logs/some-path:
::
++ handle-peer
|= =path
=/ m tapp-async
^- form:m
?. ?=([%logs ^] path)
~| [%invalid-subscription-path path]
!!
;< ~ bind:m
%+ send-effect-on-bone:stdio ost.bowl
:+ %diff %eth-watcher-diff
:- %history
^- loglist
~| [%no-such-watchdog t.path]
(zing history:(~(got by dogs.state) t.path))
(pure:m state)
::
:: +handle-peek: get diagnostics data
::
:: /block/some-path: get next block number to check for /some-path
::
++ handle-peek
|= =path
^- (unit (unit peek-data))
?. ?=([%x %block ^] path) ~
?. (~(has by dogs.state) t.t.path) ~
:+ ~ ~
:- %atom
number:(~(got by dogs.state) t.t.path)
2019-07-05 04:15:53 +03:00
--