eth-watcher: store logs in state to implement peer

In order to give an initial response to incoming subscriptions (without
resorting to retrieving that data from chain again) we now store event
log history in state.

Instead of discarding pending-logs entirely after sending out updates,
we add them to the watchdog's history.
Just like pending-logs, we remove from the head during a rewind (though
not before exhausting the pending-logs).
This commit is contained in:
Fang 2019-10-30 00:37:12 +01:00
parent 277f2955c3
commit c3ac547b3b
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972

View File

@ -15,9 +15,11 @@
$: config
=number:block
=pending-logs
=history
blocks=(list block)
==
::
+$ history (list loglist)
+$ pending-logs (map number:block loglist)
::
+$ peek-data
@ -261,58 +263,79 @@
?: (gth number.dog number.id.latest-block)
(pure:m dog)
;< =block bind:m (get-block-by-number url.dog number.dog)
;< [=new=pending-logs new-blocks=(lest ^block)] bind:m
;< dog=watchdog bind:m
(take-block [path dog] block)
%_ loop
pending-logs.dog new-pending-logs
blocks.dog new-blocks
number.dog +(number.id.i.new-blocks)
==
loop(dog dog)
::
:: Process a block, detecting and handling reorgs
::
++ take-block
|= [context =block]
=/ m (async:stdio ,[pending-logs (lest ^block)])
=/ m (async:stdio ,watchdog)
^- form:m
:: if this next block isn't direct descendant of our logs, reorg happened
?: &(?=(^ blocks.dog) !=(parent-hash.block hash.id.i.blocks.dog))
(rewind path url.dog pending-logs.dog block blocks.dog)
;< =new=pending-logs bind:m
(rewind [path dog] block)
;< [=new=pending-logs =released=loglist] bind:m
(release-old-events path pending-logs.dog number.id.block)
;< =new=loglist bind:m
(get-logs-by-hash url.dog hash.id.block contracts.dog topics.dog)
=. new-pending-logs
(~(put by new-pending-logs) number.id.block new-loglist)
(pure:m new-pending-logs [block blocks.dog])
%- pure:m
%_ dog
number +(number.id.block)
pending-logs new-pending-logs
history [released-loglist history.dog]
blocks [block blocks.dog]
==
::
:: Release events if they're more than 30 blocks ago
::
++ release-old-events
|= [=path =pending-logs =number:block]
=/ m (async:stdio ,^pending-logs)
=/ m (async:stdio ,[^pending-logs loglist])
^- form:m
?: (lth number 30) (pure:m pending-logs)
?: (lth number 30) (pure:m pending-logs ~)
=/ rel-number (sub number 30)
=/ =loglist (~(get ja pending-logs) rel-number)
;< ~ bind:m (send-logs path loglist)
(pure:m (~(del by pending-logs) rel-number))
(pure:m (~(del by pending-logs) rel-number) loglist)
::
:: Reorg detected, so rewind until we're back in sync
::
++ rewind
|= [=path url=@ta =pending-logs =block blocks=(list block)]
=/ m (async:stdio ,[^pending-logs (lest ^block)])
:: block: wants to be head of blocks.dog, but might not match
|= [context =block]
=/ m (async:stdio ,watchdog)
=* blocks blocks.dog
|- ^- form:m
=* loop $
:: if we have no further history to rewind, we're done
?~ blocks
(pure:m pending-logs block blocks)
(pure:m dog(blocks [block blocks]))
:: if target block is directly after "latest", we're done
?: =(parent-hash.block hash.id.i.blocks)
(pure:m pending-logs block blocks)
;< =next=^block bind:m (get-block-by-number url number.id.i.blocks)
?: =(~ pending-logs)
;< ~ bind:m (disavow path block)
loop(block next-block, blocks t.blocks)
=. pending-logs (~(del by pending-logs) number.id.block)
(pure:m dog(blocks [block blocks]))
:: next-block: the new target block
;< =next=^block bind:m
(get-block-by-number url.dog number.id.i.blocks)
:: remove from either pending-logs or history
?: =(~ pending-logs.dog)
:: if no more pending logs, start deleting from history instead
::NOTE this assumes there's one history entry per item in blocks.
:: while +zoom breaks that assumption by clearing blocks, we won't
:: run out of history before running out of blocks, allowing us to
:: skip the =(number.id.block number.id.i.i.history) check.
?~ history.dog
loop(block next-block, blocks t.blocks)
;< ~ bind:m
:: don't bother sending a disavow if there were no logs there
?~ i.history.dog (pure:(async:stdio ,~) ~)
(disavow path block)
loop(block next-block, blocks t.blocks, history.dog t.history.dog)
=. pending-logs.dog
(~(del by pending-logs.dog) number.id.block)
loop(block next-block, blocks t.blocks)
::
:: Tell subscribers there was a deep reorg
@ -348,6 +371,7 @@
;< ~ bind:m (send-logs path loglist)
=. number.dog +(to-number)
=. blocks.dog ~
=. history.dog [loglist history.dog]
(pure:m dog)
--
::
@ -411,11 +435,24 @@
loop(dogs t.dogs)
==
::
:: +handle-peer: subscribe & get initial subscription data
::
:: /logs/some-path:
::
++ handle-peer
|= =path
=/ m tapp-async
^- form:m
::TODO
?. ?=([%logs ^] path)
~| [%invalid-subscription-path path]
!!
;< ~ bind:m
%+ send-effect-on-bone:stdio ost.bowl
:+ %diff %eth-watcher-diff
:- %history
^- loglist
~| [%no-such-watchdog t.path]
(zing history:(~(got by dogs.state) t.path))
(pure:m state)
::
:: +handle-peek: get diagnostics data