From 1caa30d812f3003371e4a0a5707b3b45f92c6b20 Mon Sep 17 00:00:00 2001 From: Philip Monk Date: Mon, 11 Nov 2019 21:36:32 -0800 Subject: [PATCH] mall: convert eth-watcher to mall/imp --- pkg/arvo/age/eth-watcher.hoon | 360 +++++++++++++++++++++++++++++ pkg/arvo/age/spider.hoon | 29 ++- pkg/arvo/gen/spider/start.hoon | 2 +- pkg/arvo/gen/spider/tree.hoon | 11 + pkg/arvo/imp/eth-watcher.hoon | 98 ++++++++ pkg/arvo/imp/ph.hoon | 49 ---- pkg/arvo/lib/ethio.hoon | 68 ++---- pkg/arvo/lib/threadio.hoon | 38 ++- pkg/arvo/mar/eth-watcher-poke.hoon | 7 + pkg/arvo/sur/eth-watcher.hoon | 10 + 10 files changed, 552 insertions(+), 120 deletions(-) create mode 100644 pkg/arvo/age/eth-watcher.hoon create mode 100644 pkg/arvo/gen/spider/tree.hoon create mode 100644 pkg/arvo/imp/eth-watcher.hoon delete mode 100644 pkg/arvo/imp/ph.hoon create mode 100644 pkg/arvo/mar/eth-watcher-poke.hoon diff --git a/pkg/arvo/age/eth-watcher.hoon b/pkg/arvo/age/eth-watcher.hoon new file mode 100644 index 0000000000..96735686f5 --- /dev/null +++ b/pkg/arvo/age/eth-watcher.hoon @@ -0,0 +1,360 @@ +:: eth-watcher: ethereum event log collector +:: +/- *eth-watcher, spider +/+ default-agent, verb +=, ethereum-types +=, able:jael +:: +=> |% + ++ refresh-rate ~m5 + -- +:: +=> |% + +$ card card:agent:mall + +$ app-state + $: %0 + dogs=(map path watchdog) + == + :: + +$ context [=path dog=watchdog] + +$ watchdog + $: config + running=(unit (unit =iid:spider)) + =number:block + =pending-logs + =history + blocks=(list block) + == + :: + :: history: newest block first, oldest event first + +$ history (list loglist) + -- +:: +:: Helpers +:: +=> |% + ++ wait + |= now=@da + ^- card + [%pass /timer %arvo %b %wait (add now refresh-rate)] + :: + ++ wait-shortcut + |= now=@da + ^- card + [%pass /shortcut %arvo %b %wait now] + :: + ++ poke-spider + |= [=path our=@p =cage] + ^- card + [%pass [%running path] %agent [our %spider] %poke cage] + :: + ++ watch-spider + |= [=path our=@p =sub=path] + ^- card + [%pass [%running path] %agent [our %spider] %watch sub-path] + -- +:: +:: Main +:: +^- agent:mall +=| state=app-state +%+ verb & +|_ =bowl:mall ++* this . + def ~(. (default-agent this %&) bowl) +:: +++ on-init + ^- (quip card _this) + :: start update timer loop + [[(wait now.bowl) ~] this] +:: +++ on-save !>(state) +++ on-load + |= old=vase + =+ !<(old-state=app-state old) + `this(state old-state) +:: +++ on-poke + |= [=mark =vase] + ?: ?=(%noun mark) + ~& state + `this + ?. ?=(%eth-watcher-poke mark) + (on-poke:def mark vase) + :: + =+ !<(=poke vase) + ?- -.poke + %watch + :: fully restart the watchdog if it doesn't exist yet, + :: or if the new config changes more than just the url. + ~& > %ouch + =/ restart=? + ?| !(~(has by dogs.state) path.poke) + ?! .= ->:(~(got by dogs.state) path.poke) + +.config.poke + == + ~? &((~(has by dogs.state) path.poke) restart) + [dap.bowl 'overwriting existing watchdog on' path.poke] + =/ restart-cards + =/ dog (~(get by dogs.state) path.poke) + ?. ?& restart + ?=(^ dog) + ?=([~ ~ *] running.u.dog) + == + ~ + =/ =cage [%spider-stop !>([u.u.running.u.dog &])] + [%pass [%starting path] %agent [our.bowl %spider] %poke cage] + =/ new-dog + =/ dog=watchdog + ?: restart *watchdog + (~(got by dogs.state) path.poke) + %_ dog + - config.poke + number from.config.poke + == + =. dogs.state (~(put by dogs.state) path.poke new-dog) + [[(wait-shortcut now.bowl) ~] this] + :: + %clear + =. dogs.state (~(del by dogs.state) path.poke) + [~ this] + == +:: +:: +on-watch: subscribe & get initial subscription data +:: +:: /logs/some-path: +:: +++ on-watch + |= =path + ?. ?=([%logs ^] path) + ~| [%invalid-subscription-path path] + !! + !! + :: ;< ~ bind:m + :: %+ send-effect-on-bone:stdio ost.bowl + :: :+ %diff %eth-watcher-diff + :: :- %history + :: ^- loglist + :: %- zing + :: %- flop + :: =< history + :: (~(gut by dogs.state) t.path *watchdog) + :: (pure:m state) +:: +++ on-leave on-leave:def +:: +:: +on-peek: get diagnostics data +:: +:: /block/some-path: get next block number to check for /some-path +:: +++ on-peek + |= =path + ^- (unit (unit cage)) + ?. ?=([%x %block ^] path) ~ + ?. (~(has by dogs.state) t.t.path) ~ + :+ ~ ~ + :- %atom + !>(number:(~(got by dogs.state) t.t.path)) +:: +++ on-agent + |= [=wire =sign:agent:mall] + |^ + ^- (quip card agent:mall) + ?+ wire (on-agent:def wire sign) + [%starting *] + ?+ -.sign (on-agent:def wire sign) + %watch-ack + ?~ p.sign + [~ this] + %- (slog leaf+"eth-watcher failed to get iid" u.p.sign) + [~ (clear-running t.wire)] + :: + %kick + =* path t.wire + =/ dog (~(get by dogs.state) path) + ?~ dog + [~ this] + ?~ running.u.dog + [~ this] + ?^ u.running.u.dog + [~ this] + [~ this(dogs.state (~(put by dogs.state) path u.dog(running ~)))] + :: + %fact + =* path t.wire + ?> ?=(%iid p.cage.sign) + =+ !<(=new=iid:spider q.cage.sign) + =/ dog (~(get by dogs.state) path) + :: watchdog already cancelled + :: + ?~ dog + [~ this] + :: not looking for imp + :: + ?~ running.u.dog + [~ this] + :: already running imp + :: + ?^ u.running.u.dog + [~ this] + => .(running.u.dog ``new-iid) + =/ args + :^ ~ `new-iid %eth-watcher + !>(`watchpup`[- number pending-logs blocks]:u.dog) + :_ this(dogs.state (~(put by dogs.state) path u.dog)) + :~ (watch-spider path our.bowl /imp-result/[new-iid]) + (poke-spider path our.bowl %spider-start !>(args)) + == + == + :: + [%running *] + ?- -.sign + %poke-ack + ?~ p.sign + [~ this] + %- (slog leaf+"eth-watcher couldn't start imp" u.p.sign) + [~ (clear-running t.wire)] + :: + %watch-ack + ?~ p.sign + [~ this] + %- (slog leaf+"eth-watcher couldn't start listen to imp" u.p.sign) + [~ (clear-running t.wire)] + :: + %kick [~ (clear-running t.wire)] + %fact + =* path t.wire + =/ dog (~(get by dogs.state) path) + ?~ dog + [~ this] + ?+ p.cage.sign (on-agent:def wire sign) + %imp-fail + =+ !<([=term =tang] q.cage.sign) + %- (slog leaf+"eth-watcher failed; will retry" leaf+ tang) + [~ this(dogs.state (~(put by dogs.state) path u.dog(running ~)))] + :: + %imp-done + =+ !<([vows=disavows pup=watchpup] q.cage.sign) + =. u.dog + %_ u.dog + - -.pup + number number.pup + blocks blocks.pup + pending-logs pending-logs.pup + == + =^ cards-1 u.dog (disavow path u.dog vows) + =^ cards-2 u.dog (release-logs path u.dog) + =. dogs.state (~(put by dogs.state) path u.dog(running ~)) + `this + :: [(weld cards-1 cards-2) this] + == + == + == + :: + ++ clear-running + |= =path + =/ dog (~(get by dogs.state) path) + ?~ dog + this + this(dogs.state (~(put by dogs.state) path u.dog(running ~))) + :: + ++ disavow + |= [=path dog=watchdog vows=disavows] + ^- (quip card watchdog) + =/ history-ids=(list [id:block loglist]) + %+ murn history.dog + |= logs=loglist + ^- (unit [id:block loglist]) + ?~ logs + ~ + `[[block-hash block-number]:(need mined.i.logs) logs] + =/ actual-vows=disavows + %+ skim vows + |= =id:block + (lien history-ids |=([=history=id:block *] =(id history-id))) + =/ actual-history=history + %+ murn history-ids + |= [=id:block logs=loglist] + ^- (unit loglist) + ?: (lien actual-vows |=(=vow=id:block =(id vow-id))) + ~ + `logs + :_ dog(history actual-history) + %+ turn actual-vows + |= =id:block + [%give %fact `path %eth-watcher-diff !>([%disavow id])] + :: + ++ release-logs + |= [=path dog=watchdog] + ^- (quip card watchdog) + ?: (lth number.dog 30) + `dog + =/ rel-number (sub number.dog 30) + =/ numbers ~(tap in ~(key by pending-logs.dog)) + |- ^- (quip card watchdog) + ?~ numbers + `dog + ?: (gth i.numbers rel-number) + $(numbers t.numbers) + =^ cards-1 dog + =/ =loglist (~(get ja pending-logs.dog) i.numbers) + =. pending-logs.dog (~(del by pending-logs.dog) i.numbers) + ?~ loglist + `dog + =. history.dog [loglist history.dog] + :_ dog + %+ turn loglist + |= =event-log:rpc:ethereum + ^- card + [%give %fact `path %eth-watcher-diff !>([%log event-log])] + =^ cards-2 dog $(numbers t.numbers) + [(weld cards-1 cards-2) dog] + -- +:: +++ on-arvo + |= [=wire =sign-arvo] + ^- (quip card agent:mall) + ?+ +<.sign-arvo ~|([%strange-sign-arvo -.sign-arvo] !!) + %wake + =; rest + ?. =(/timer wire) + rest + [[(wait now.bowl) -.rest] +.rest] + ?^ error.sign-arvo + :: failed, try again. maybe should tell user if fails more than + :: 5 times. + :: + [[(wait now.bowl) ~] this] + :: start all updates in parallel + :: + =/ dogs=(list [=path dog=watchdog]) ~(tap by dogs.state) + =| cards=(list card) + ^- (quip card agent:mall) + =- [(flop -<) ->] + |- ^- (quip card agent:mall) + =* loop $ + ?~ dogs + [cards this] + =, i.dogs + ?^ running.dog.i.dogs + ?~ u.running.dog.i.dogs + %- (slog leaf+"eth-watcher delayed getting iid" ~) + loop(dogs t.dogs) + :: if still running, kill it and restart + :: + =/ =cage [%spider-stop !>([u.u.running.dog |])] + =. cards + :_ cards + [%pass [%starting path] %agent [our.bowl %spider] %poke cage] + loop(i.dogs i.dogs(running.dog ~)) + :: + => .(running.dog.i.dogs [~ ~]) + =. cards + :_ cards + [%pass [%starting path] %agent [our.bowl %spider] %watch /next-iid] + =. dogs.state (~(put by dogs.state) path dog) + loop(dogs t.dogs) + == +:: +++ on-fail on-fail:def +-- diff --git a/pkg/arvo/age/spider.hoon b/pkg/arvo/age/spider.hoon index e9f5785638..6187cceb8c 100644 --- a/pkg/arvo/age/spider.hoon +++ b/pkg/arvo/age/spider.hoon @@ -28,7 +28,7 @@ == :: +$ start-args - [parent=(unit iid) file=term =vase] + [parent=(unit iid) use=(unit iid) file=term =vase] -- :: :: Trie operations @@ -147,6 +147,7 @@ ^- (quip card _this) =^ cards state ?+ path (on-watch:def path) + [%next-iid ~] on-watch-next-iid [%imp @ *] (on-watch:sc t.path) [%imp-result @ ~] (on-watch-result:sc i.t.path) == @@ -202,14 +203,21 @@ ++ on-watch-result |= =iid ^- (quip card ^state) - ?> (~(has by started.state) (~(got by iid.state) iid)) + ?> (lth (slav %ud iid) count.state) :: (~(has by started.state) (~(got by iid.state) iid)) `state :: +++ on-watch-next-iid + ^- (quip card ^state) + :_ state(count +(count.state)) + :~ [%give %fact ~ %iid !>((scot %ud count.state))] + [%give %kick ~ ~] + == +:: ++ handle-sign |= [=iid =wire =sign-arvo] =/ imp (~(get by iid.state) iid) ?~ imp - %- (slog leaf+"spider got sign for non-existent {}" ~) + %- (slog leaf+"spider got sign for non-existent {}" ~) `state (take-input u.imp ~ %sign wire sign-arvo) :: @@ -217,18 +225,22 @@ |= [=iid =wire =sign:agent:mall] =/ imp (~(get by iid.state) iid) ?~ imp - %- (slog leaf+"spider got agent for non-existent {}" ~) + %- (slog leaf+"spider got agent for non-existent {}" ~) `state (take-input u.imp ~ %agent wire sign) :: ++ handle-start-imp - |= [parent-iid=(unit iid) file=term =vase] + |= [parent-iid=(unit iid) use=(unit iid) file=term =vase] ^- (quip card ^state) =/ parent-imp=imp ?~ parent-iid / (~(got by iid.state) u.parent-iid) - =/ =imp (snoc parent-imp count.state) + =^ new-iid count.state + ?~ use + [(scot %ud count.state) +(count.state)] + [u.use count.state] + =/ =imp (snoc parent-imp (slav %ud new-iid)) :: ?: (has-imp running.state imp) ~| [%already-started imp] @@ -236,10 +248,9 @@ ?: (~(has by started.state) imp) ~| [%already-starting imp] !! - =/ new-iid (scot %ud count.state) + :: =: started.state (~(put by started.state) imp vase) iid.state (~(put by iid.state) new-iid imp) - count.state +(count.state) == =/ =card =/ =schematic:ford [%path [our.bowl %home] %imp file] @@ -378,7 +389,7 @@ ++ imp-done |= [=imp =vase] ^- (quip card ^state) - %- (slog leaf+"thread {} finished" (sell vase) ~) + :: %- (slog leaf+"thread {} finished" (sell vase) ~) =/ =iid (imp-to-iid imp) =/ done-cards=(list card) :~ [%give %fact `/imp-result/[iid] %imp-done vase] diff --git a/pkg/arvo/gen/spider/start.hoon b/pkg/arvo/gen/spider/start.hoon index 6e59a93624..b3af6b16f4 100644 --- a/pkg/arvo/gen/spider/start.hoon +++ b/pkg/arvo/gen/spider/start.hoon @@ -1,3 +1,3 @@ :- %say |= [* [name=term vase=$@(~ [vase ~])] ~] -[%spider-start ~ name ?~(vase *^vase -.vase)] +[%spider-start ~ ~ name ?~(vase *^vase -.vase)] diff --git a/pkg/arvo/gen/spider/tree.hoon b/pkg/arvo/gen/spider/tree.hoon new file mode 100644 index 0000000000..c5a6a497ba --- /dev/null +++ b/pkg/arvo/gen/spider/tree.hoon @@ -0,0 +1,11 @@ +/- spider +:- %say +|= [[now=@da *] ~ *] +:- %tang +=/ tree + .^((list (list @ud)) %mx /=spider/(scot %da now)/tree/noun) +%+ turn tree +|= imp=(list @ud) +=/ =path + (turn imp |=(=@ud (scot %ud ud))) +>path< diff --git a/pkg/arvo/imp/eth-watcher.hoon b/pkg/arvo/imp/eth-watcher.hoon new file mode 100644 index 0000000000..31e95c1fcb --- /dev/null +++ b/pkg/arvo/imp/eth-watcher.hoon @@ -0,0 +1,98 @@ +:: eth-watcher: ethereum event log collector +:: +/- spider, *eth-watcher +/+ tapp, threadio, ethio +=, ethereum-types +=, able:jael +:: +:: Main loop: get updates since last checked +:: +|= [=bowl:spider args=vase] +|^ +=+ !<(pup=watchpup args) +=/ m (thread:threadio ,vase) +^- form:m +;< =latest=block bind:m (get-latest-block:ethio url.pup) +;< pup=watchpup bind:m (zoom pup number.id.latest-block) +=| vows=disavows +|- ^- form:m +=* loop $ +?: (gth number.pup number.id.latest-block) + (pure:m !>([vows pup])) +;< =block bind:m (get-block-by-number:ethio url.pup number.pup) +;< [=new=disavows pup=watchpup] bind:m (take-block pup block) +%= loop + pup pup + vows (weld vows new-disavows) +== +:: +:: Process a block, detecting and handling reorgs +:: +++ take-block + |= [pup=watchpup =block] + =/ m (thread:threadio ,[disavows watchpup]) + ^- form:m + :: if this next block isn't direct descendant of our logs, reorg happened + ?: &(?=(^ blocks.pup) !=(parent-hash.block hash.id.i.blocks.pup)) + (rewind pup block) + ;< =new=loglist bind:m :: oldest first + (get-logs-by-hash:ethio url.pup hash.id.block contracts.pup topics.pup) + %- pure:m + :- ~ + %_ pup + number +(number.id.block) + pending-logs (~(put by pending-logs.pup) number.id.block new-loglist) + blocks [block blocks.pup] + == +:: +:: Reorg detected, so rewind until we're back in sync +:: +++ rewind + :: block: wants to be head of blocks.pup, but might not match + |= [pup=watchpup =block] + =/ m (thread:threadio ,[disavows watchpup]) + =* blocks blocks.pup + =| vows=disavows + |- ^- form:m + =* loop $ + :: if we have no further history to rewind, we're done + ?~ blocks + (pure:m (flop vows) pup(blocks [block blocks])) + :: if target block is directly after "latest", we're done + ?: =(parent-hash.block hash.id.i.blocks) + (pure:m (flop vows) pup(blocks [block blocks])) + :: next-block: the new target block + ;< =next=^block bind:m + (get-block-by-number:ethio url.pup number.id.i.blocks) + =. pending-logs.pup (~(del by pending-logs.pup) number.id.i.blocks) + =. vows [id.block vows] + loop(block next-block, blocks t.blocks) +:: +:: Zoom forward to near a given block number. +:: +:: Zooming doesn't go forward one block at a time. As a +:: consequence, it cannot detect and handle reorgs. Only use it +:: at a safe distance -- 500 blocks ago is probably sufficient. +:: +++ zoom + |= [pup=watchpup =latest=number:block] + =/ m (thread:threadio ,watchpup) + ^- form:m + =/ zoom-margin=number:block 100 + ?: (lth latest-number (add number.pup zoom-margin)) + (pure:m pup) + =/ to-number=number:block (sub latest-number zoom-margin) + ;< =loglist bind:m :: oldest first + %: get-logs-by-range:ethio + url.pup + contracts.pup + topics.pup + number.pup + to-number + == + =? pending-logs.pup ?=(^ loglist) + (~(put by pending-logs.pup) to-number loglist) + =. number.pup +(to-number) + =. blocks.pup ~ + (pure:m pup) +-- diff --git a/pkg/arvo/imp/ph.hoon b/pkg/arvo/imp/ph.hoon deleted file mode 100644 index 26ca9f8b33..0000000000 --- a/pkg/arvo/imp/ph.hoon +++ /dev/null @@ -1,49 +0,0 @@ -/- spider -/+ *threadio -=, thread=thread:spider -=< ^- imp:spider - |= [=bowl:spider vase] - =/ m (thread ,vase) - ^- form:m - ~& > 'Entering pH loop' - ;< ~ bind:m - %- (main-loop ,~) - :~ handle-run - handle-stop - handle-run-all - == - (pure:m *vase) -:: -|% -++ handle-run - |= ~ - =/ m (thread ,~) - ^- form:m - ;< =vase bind:m ((handle ,vase) (take-poke %ph-run)) - =/ ph-name !<(term vase) - =/ poke-vase !>([%ph-active (cat 3 %ph- ph-name) *^vase]) - ;< ~ bind:m (poke-our %spider %spider-start poke-vase) - ;< ~ bind:m (watch-our /active %spider /imp-result/ph-active) - ;< =cage bind:m (take-fact /active) - ~& > got-fact=-.cage - (pure:m ~) -:: -++ handle-stop - |= ~ - =/ m (thread ,~) - ^- form:m - ;< =vase bind:m ((handle ,vase) (take-poke %ph-stop)) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%ph-active &])) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%aqua-ames &])) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%aqua-behn &])) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%aqua-dill &])) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%aqua-eyre &])) - ;< ~ bind:m (poke-our %spider %spider-stop !>([%aqua-eyre-azimuth &])) - (pure:m ~) -:: -++ handle-run-all - |= ~ - =/ m (thread ,~) - ^- form:m - (pure:m ~) --- diff --git a/pkg/arvo/lib/ethio.hoon b/pkg/arvo/lib/ethio.hoon index 3a214aeba2..655b713a0a 100644 --- a/pkg/arvo/lib/ethio.hoon +++ b/pkg/arvo/lib/ethio.hoon @@ -1,12 +1,10 @@ :: ethio: Asynchronous Ethereum input/output functions. ::. -/+ stdio +/+ threadio =, ethereum-types =, able:jael :: -|* [out-poke-data=mold out-peer-data=mold] => |% - ++ stdio (^stdio out-poke-data out-peer-data) +$ topics (list ?(@ux (list @ux))) -- |% @@ -14,10 +12,10 @@ :: ++ request-rpc |= [url=@ta id=(unit @t) req=request:rpc:ethereum] - =/ m (async:stdio ,json) + =/ m (thread:threadio ,json) ^- form:m - |^ %+ (retry json) `10 - =/ m (async:stdio ,(unit json)) + |^ %+ (retry:threadio json) `10 + =/ m (thread:threadio ,(unit json)) ^- form:m =/ =request:http :* method=%'POST' @@ -28,46 +26,16 @@ %- en-json:html (request-to-json:rpc:ethereum id req) == - ;< ~ bind:m (send-request:stdio request) + ;< ~ bind:m (send-request:threadio request) ;< rep=(unit client-response:iris) bind:m - take-maybe-response:stdio + take-maybe-response:threadio ?~ rep (pure:m ~) (parse-response u.rep) :: - ++ retry - |* result=mold - |= [crash-after=(unit @ud) computation=_*form:(async:stdio (unit result))] - =/ m (async:stdio ,result) - =| try=@ud - |^ |- ^- form:m - =* loop $ - ?: =(crash-after `try) - (async-fail:stdio %retry-too-many ~) - ;< ~ bind:m (backoff try ~m1) - ;< res=(unit result) bind:m computation - ?^ res - (pure:m u.res) - loop(try +(try)) - :: - ++ backoff - |= [try=@ud limit=@dr] - =/ m (async:stdio ,~) - ^- form:m - ;< eny=@uvJ bind:m get-entropy:stdio - ;< now=@da bind:m get-time:stdio - %- wait:stdio - %+ add now - %+ min limit - ?: =(0 try) ~s0 - %+ add - (mul ~s1 (bex (dec try))) - (mul ~s0..0001 (~(rad og eny) 1.000)) - -- - :: ++ parse-response |= =client-response:iris - =/ m (async:stdio ,(unit json)) + =/ m (thread:threadio ,(unit json)) ^- form:m ?> ?=(%finished -.client-response) ?~ full-file.client-response @@ -82,13 +50,13 @@ ?~ array =/ res=(unit response:rpc:jstd) (parse-one-response u.jon) ?~ res - (async-fail:stdio %request-rpc-parse-error >id< ~) + (thread-fail:threadio %request-rpc-parse-error >id< ~) ?: ?=(%error -.u.res) - (async-fail:stdio %request-rpc-error >id< >+.res< ~) + (thread-fail:threadio %request-rpc-error >id< >+.res< ~) ?. ?=(%result -.u.res) - (async-fail:stdio %request-rpc-fail >u.res< ~) + (thread-fail:threadio %request-rpc-fail >u.res< ~) (pure:m `res.u.res) - (async-fail:stdio %request-rpc-batch >%not-implemented< ~) + (thread-fail:threadio %request-rpc-batch >%not-implemented< ~) :: (pure:m `[%batch u.array]) :: ++ parse-one-response @@ -109,19 +77,19 @@ :: ++ read-contract |= [url=@t proto-read-request:rpc:ethereum] - =/ m (async:stdio ,@t) + =/ m (thread:threadio ,@t) ;< =json bind:m %^ request-rpc url id :+ %eth-call ^- call:rpc:ethereum [~ to ~ ~ ~ `tape`(encode-call:rpc:ethereum function arguments)] [%label %latest] - ?. ?=(%s -.json) (async-fail:stdio %request-rpc-fail >json< ~) + ?. ?=(%s -.json) (thread-fail:threadio %request-rpc-fail >json< ~) (pure:m p.json) :: ++ get-latest-block |= url=@ta - =/ m (async:stdio ,block) + =/ m (thread:threadio ,block) ^- form:m ;< =json bind:m (request-rpc url `'block number' %eth-block-number ~) @@ -129,7 +97,7 @@ :: ++ get-block-by-number |= [url=@ta =number:block] - =/ m (async:stdio ,block) + =/ m (thread:threadio ,block) ^- form:m |^ ;< =json bind:m @@ -138,7 +106,7 @@ [%eth-get-block-by-number number |] =/ =block (parse-block json) ?. =(number number.id.block) - (async-fail:stdio %reorg-detected >number< >block< ~) + (thread-fail:threadio %reorg-detected >number< >block< ~) (pure:m block) :: ++ parse-block @@ -158,7 +126,7 @@ :: ++ get-logs-by-hash |= [url=@ta =hash:block contracts=(list address) =topics] - =/ m (async:stdio (list event-log:rpc:ethereum)) + =/ m (thread:threadio (list event-log:rpc:ethereum)) ^- form:m ;< =json bind:m %+ request-rpc url @@ -178,7 +146,7 @@ =from=number:block =to=number:block == - =/ m (async:stdio (list event-log:rpc:ethereum)) + =/ m (thread:threadio (list event-log:rpc:ethereum)) ^- form:m ;< =json bind:m %+ request-rpc url diff --git a/pkg/arvo/lib/threadio.hoon b/pkg/arvo/lib/threadio.hoon index ea5627c6d8..ae31f0415e 100644 --- a/pkg/arvo/lib/threadio.hoon +++ b/pkg/arvo/lib/threadio.hoon @@ -423,17 +423,33 @@ [(weld cards.res cards.output) next.output] -- :: -++ backoff - |= [try=@ud limit=@dr] - =/ m (thread ,~) - ^- form:m - ;< eny=@uvJ bind:m get-entropy - %- sleep - %+ min limit - ?: =(0 try) ~s0 - %+ add - (mul ~s1 (bex (dec try))) - (mul ~s0..0001 (~(rad og eny) 1.000)) +++ retry + |* result=mold + |= [crash-after=(unit @ud) computation=_*form:(thread (unit result))] + =/ m (thread ,result) + =| try=@ud + |^ |- ^- form:m + =* loop $ + ?: =(crash-after `try) + (thread-fail %retry-too-many ~) + ;< ~ bind:m (backoff try ~m1) + ;< res=(unit result) bind:m computation + ?^ res + (pure:m u.res) + loop(try +(try)) + :: + ++ backoff + |= [try=@ud limit=@dr] + =/ m (thread ,~) + ^- form:m + ;< eny=@uvJ bind:m get-entropy + %- sleep + %+ min limit + ?: =(0 try) ~s0 + %+ add + (mul ~s1 (bex (dec try))) + (mul ~s0..0001 (~(rad og eny) 1.000)) + -- :: :: ---- :: diff --git a/pkg/arvo/mar/eth-watcher-poke.hoon b/pkg/arvo/mar/eth-watcher-poke.hoon new file mode 100644 index 0000000000..b1658d2c12 --- /dev/null +++ b/pkg/arvo/mar/eth-watcher-poke.hoon @@ -0,0 +1,7 @@ +/- *eth-watcher +|_ poke +++ grab + |% + ++ noun poke + -- +-- diff --git a/pkg/arvo/sur/eth-watcher.hoon b/pkg/arvo/sur/eth-watcher.hoon index 731a9c38a0..2e8cc23a1d 100644 --- a/pkg/arvo/sur/eth-watcher.hoon +++ b/pkg/arvo/sur/eth-watcher.hoon @@ -11,6 +11,16 @@ :: +$ loglist (list event-log:rpc:ethereum) +$ topics (list ?(@ux (list @ux))) ++$ watchpup + $: config + =number:block + =pending-logs + blocks=(list block) + == +:: +:: disavows: newest block first ++$ disavows (list id:block) ++$ pending-logs (map number:block loglist) :: +$ poke $% :: %watch: configure a watchdog and fetch initial logs