From c3ac547b3bb43785ae0e7bd8511c3dd7c89c9406 Mon Sep 17 00:00:00 2001 From: Fang Date: Wed, 30 Oct 2019 00:37:12 +0100 Subject: [PATCH] eth-watcher: store logs in state to implement peer In order to give an initial response to incoming subscriptions (without resorting to retrieving that data from chain again) we now store event log history in state. Instead of discarding pending-logs entirely after sending out updates, we add them to the watchdog's history. Just like pending-logs, we remove from the head during a rewind (though not before exhausting the pending-logs). --- pkg/arvo/app/eth-watcher.hoon | 83 +++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/pkg/arvo/app/eth-watcher.hoon b/pkg/arvo/app/eth-watcher.hoon index 5d09b4c2f..f65d19e0b 100644 --- a/pkg/arvo/app/eth-watcher.hoon +++ b/pkg/arvo/app/eth-watcher.hoon @@ -15,9 +15,11 @@ $: config =number:block =pending-logs + =history blocks=(list block) == :: + +$ history (list loglist) +$ pending-logs (map number:block loglist) :: +$ peek-data @@ -261,58 +263,79 @@ ?: (gth number.dog number.id.latest-block) (pure:m dog) ;< =block bind:m (get-block-by-number url.dog number.dog) - ;< [=new=pending-logs new-blocks=(lest ^block)] bind:m + ;< dog=watchdog bind:m (take-block [path dog] block) - %_ loop - pending-logs.dog new-pending-logs - blocks.dog new-blocks - number.dog +(number.id.i.new-blocks) - == + loop(dog dog) :: :: Process a block, detecting and handling reorgs :: ++ take-block |= [context =block] - =/ m (async:stdio ,[pending-logs (lest ^block)]) + =/ m (async:stdio ,watchdog) ^- form:m + :: if this next block isn't direct descendant of our logs, reorg happened ?: &(?=(^ blocks.dog) !=(parent-hash.block hash.id.i.blocks.dog)) - (rewind path url.dog pending-logs.dog block blocks.dog) - ;< =new=pending-logs bind:m + (rewind [path dog] block) + ;< [=new=pending-logs =released=loglist] bind:m (release-old-events path pending-logs.dog number.id.block) ;< =new=loglist bind:m (get-logs-by-hash url.dog hash.id.block contracts.dog topics.dog) =. new-pending-logs (~(put by new-pending-logs) number.id.block new-loglist) - (pure:m new-pending-logs [block blocks.dog]) + %- pure:m + %_ dog + number +(number.id.block) + pending-logs new-pending-logs + history [released-loglist history.dog] + blocks [block blocks.dog] + == :: :: Release events if they're more than 30 blocks ago :: ++ release-old-events |= [=path =pending-logs =number:block] - =/ m (async:stdio ,^pending-logs) + =/ m (async:stdio ,[^pending-logs loglist]) ^- form:m - ?: (lth number 30) (pure:m pending-logs) + ?: (lth number 30) (pure:m pending-logs ~) =/ rel-number (sub number 30) =/ =loglist (~(get ja pending-logs) rel-number) ;< ~ bind:m (send-logs path loglist) - (pure:m (~(del by pending-logs) rel-number)) + (pure:m (~(del by pending-logs) rel-number) loglist) :: :: Reorg detected, so rewind until we're back in sync :: ++ rewind - |= [=path url=@ta =pending-logs =block blocks=(list block)] - =/ m (async:stdio ,[^pending-logs (lest ^block)]) + :: block: wants to be head of blocks.dog, but might not match + |= [context =block] + =/ m (async:stdio ,watchdog) + =* blocks blocks.dog |- ^- form:m =* loop $ + :: if we have no further history to rewind, we're done ?~ blocks - (pure:m pending-logs block blocks) + (pure:m dog(blocks [block blocks])) + :: if target block is directly after "latest", we're done ?: =(parent-hash.block hash.id.i.blocks) - (pure:m pending-logs block blocks) - ;< =next=^block bind:m (get-block-by-number url number.id.i.blocks) - ?: =(~ pending-logs) - ;< ~ bind:m (disavow path block) - loop(block next-block, blocks t.blocks) - =. pending-logs (~(del by pending-logs) number.id.block) + (pure:m dog(blocks [block blocks])) + :: next-block: the new target block + ;< =next=^block bind:m + (get-block-by-number url.dog number.id.i.blocks) + :: remove from either pending-logs or history + ?: =(~ pending-logs.dog) + :: if no more pending logs, start deleting from history instead + ::NOTE this assumes there's one history entry per item in blocks. + :: while +zoom breaks that assumption by clearing blocks, we won't + :: run out of history before running out of blocks, allowing us to + :: skip the =(number.id.block number.id.i.i.history) check. + ?~ history.dog + loop(block next-block, blocks t.blocks) + ;< ~ bind:m + :: don't bother sending a disavow if there were no logs there + ?~ i.history.dog (pure:(async:stdio ,~) ~) + (disavow path block) + loop(block next-block, blocks t.blocks, history.dog t.history.dog) + =. pending-logs.dog + (~(del by pending-logs.dog) number.id.block) loop(block next-block, blocks t.blocks) :: :: Tell subscribers there was a deep reorg @@ -348,6 +371,7 @@ ;< ~ bind:m (send-logs path loglist) =. number.dog +(to-number) =. blocks.dog ~ + =. history.dog [loglist history.dog] (pure:m dog) -- :: @@ -411,11 +435,24 @@ loop(dogs t.dogs) == :: +:: +handle-peer: subscribe & get initial subscription data +:: +:: /logs/some-path: +:: ++ handle-peer |= =path =/ m tapp-async ^- form:m - ::TODO + ?. ?=([%logs ^] path) + ~| [%invalid-subscription-path path] + !! + ;< ~ bind:m + %+ send-effect-on-bone:stdio ost.bowl + :+ %diff %eth-watcher-diff + :- %history + ^- loglist + ~| [%no-such-watchdog t.path] + (zing history:(~(got by dogs.state) t.path)) (pure:m state) :: :: +handle-peek: get diagnostics data