add chunking and start snapshotting

This commit is contained in:
Philip Monk 2018-10-25 14:37:04 -07:00
parent 345cc92578
commit dc1888e112
No known key found for this signature in database
GPG Key ID: B66E1F02604E44EC
2 changed files with 191 additions and 34 deletions

View File

@ -48,6 +48,7 @@
urb/state-absolute :: all absolute state
sub/state-relative :: all relative state
etn=state-eth-node :: eth connection state
sap=state-snapshots :: state snapshots
== ::
++ state-relative :: urbit metadata
$: $= bal :: balance sheet (vest)
@ -84,7 +85,20 @@
$: source=(each ship node-src) :: learning from
heard=(set event-id) :: processed events
latest-block=@ud :: last heard block
foreign-block=@ud :: node's latest block
== ::
++ state-snapshots :: rewind points
$: interval=_100 :: block interval
max-count=_10 :: max snaps
count=@ud :: length of snaps
last-block=@ud :: number of last snap
snaps=(qeu [block-number=@ud snap=snapshot]) :: old states
==
++ snapshot :: rewind point
$: urb/state-absolute :: all absolute state
sub/state-relative :: all relative state
etn=state-eth-node :: eth connection state
==
++ node-src :: ethereum node comms
$: node=purl:eyre :: node url
filter-id=@ud :: current filter
@ -701,7 +715,7 @@
|=([=life =pass] `public`[live=| life (my [life pass] ~)])
=. +>.$
%- curd =< abet
(pubs:~(feel su hen our urb sub etn) kyz)
(pubs:~(feel su hen our urb sub etn sap) kyz)
:: XX save sponsor in .own.sub
:: XX reconcile with .dns.eth
:: set initial domains
@ -789,7 +803,7 @@
::
%look
%^ cute hen our.tac =< abet
(~(look et our.tac now.sys etn.lex) src.tac)
(~(look et our.tac now.sys urb.lex sub.lex etn.lex sap.lex) src.tac)
::
:: create promises
:: {$mint p/ship q/safe}
@ -831,7 +845,7 @@
::
%pubs
%- curd =< abet
(~(pubs ~(feed su hen our.tac urb sub etn) hen) who.tac)
(~(pubs ~(feed su hen our.tac urb sub etn sap) hen) who.tac)
::
:: seen after breach
:: [%meet our=ship who=ship]
@ -856,20 +870,20 @@
:: {$vein $~}
::
$vein
(curd abet:~(vein ~(feed su hen our.tac urb sub etn) hen))
(curd abet:~(vein ~(feed su hen our.tac urb sub etn sap) hen))
::
:: watch ethereum events
:: [%vent ~]
::
%vent
=. moz [[hen %give %mack ~] moz]
(curd abet:~(vent ~(feed su hen our.tac urb sub etn) hen))
(curd abet:~(vent ~(feed su hen our.tac urb sub etn sap) hen))
::
:: monitor assets
:: {$vest $~}
::
$vest
(curd abet:~(vest ~(feed su hen our.tac urb sub etn) hen))
(curd abet:~(vest ~(feed su hen our.tac urb sub etn sap) hen))
::
:: monitor all
:: {$vine $~}
@ -913,7 +927,7 @@
+>.$
=. moz [[hen %give %mack ~] moz]
%^ cute hen our =< abet
(~(hear-vent et our now.sys etn.lex) p.mes)
(~(hear-vent et our now.sys urb.lex sub.lex etn.lex sap.lex) p.mes)
==
==
::
@ -934,39 +948,46 @@
::
[%e %sigh *]
%^ cute hen our =< abet
(~(sigh et our now.sys etn.lex) wir p.hin)
(~(sigh et our now.sys urb.lex sub.lex etn.lex sap.lex) wir p.hin)
::
[%b %wake ~]
%^ cute hen our
:: XX cleanup
::
?. ?=([%init ~] wir)
abet:~(wake et our now.sys etn.lex)
abet:(~(init et our now.sys etn.lex) our (sein our))
abet:~(wake et our now.sys urb.lex sub.lex etn.lex sap.lex)
abet:(~(init et our now.sys [urb sub etn sap]:lex) our (sein our))
::
[%j %vent *]
%^ cute hen our =< abet
(~(hear-vent et our now.sys etn.lex) p.hin)
(~(hear-vent et our now.sys urb.lex sub.lex etn.lex sap.lex) p.hin)
==
:: :: ++curd:of
++ curd :: relative moves
|= {moz/(list move) sub/state-relative etn/state-eth-node}
+>(sub sub, etn etn, moz (weld (flop moz) ^moz))
|= $: moz/(list move)
sub/state-relative
etn/state-eth-node
sap/state-snapshots
==
+>(sub sub, etn etn, sap sap, moz (weld (flop moz) ^moz))
:: :: ++cure:of
++ cure :: absolute edits
|= {hen/duct our/ship hab/(list change) urb/state-absolute}
^+ +>
(curd(urb urb) abet:(~(apex su hen our urb sub etn) hab))
(curd(urb urb) abet:(~(apex su hen our urb sub etn sap) hab))
:: :: ++cute:of
++ cute :: ethereum changes
|= $: hen=duct
our=ship
mos=(list move)
ven=chain
net=state-eth-node
urb=state-absolute
sub=state-relative
etn=state-eth-node
sap=state-snapshots
==
^+ +>
%- cure(etn net, moz (weld (flop mos) moz))
%- cure(urb urb, sub sub, etn etn, sap sap, moz (weld (flop mos) moz))
[hen our abet:(link:(burb our) ven)]
--
:: :: ++su
@ -992,6 +1013,7 @@
state-absolute
state-relative
state-eth-node
state-snapshots
==
:: moz: moves in reverse order
:: urb: absolute urbit state
@ -999,7 +1021,8 @@
::
=* urb ->+<
=* sub ->+>-
=* etn ->+>+
=* etn ->+>+<
=* sap ->+>+>
|%
:: :: ++abet:su
++ abet :: resolve
@ -1007,7 +1030,7 @@
:: => (exec yen.eth [%give %vent |+evs])
=> ?~ evs .
(vent-pass yen.eth |+evs)
[(flop moz) sub etn]
[(flop moz) sub etn sap]
:: :: ++apex:su
++ apex :: apply changes
|= hab/(list change)
@ -1323,10 +1346,12 @@
heard (~(put in heard) wer)
latest-block (max latest-block block.wer)
==
?- -.dif
%hull (file-hull +.dif)
%dns [kyz (file-dns +.dif)]
==
=^ kyz ..file
?- -.dif
%hull (file-hull +.dif)
%dns [kyz (file-dns +.dif)]
==
[kyz (file-snap wer)]
::
++ file-hull
|= [who=ship dif=diff-hull]
@ -1383,6 +1408,47 @@
++ file-dns
|= dns=dnses
..file(dns.eth dns)
::
++ file-snap :: save snapshot
|= wer=event-id
^+ ..file
=? sap
%+ lth 2
%+ sub.add
(div block.wer interval.sap)
(div last-block.sap interval.sap)
~& :* %snap count=count.sap max-count=max-count.sap
last-block=last-block.sap interval=interval.sap
lent=(lent ~(tap to snaps.sap))
==
%= sap
snaps (~(put to snaps.sap) block.wer extract-snap)
count +(count.sap)
last-block block.wer
==
=? sap (gth count.sap max-count.sap)
~& :* %dump count=count.sap max-count=max-count.sap
lent=(lent ~(tap to snaps.sap))
==
%= sap
snaps +:~(get to snaps.sap)
count (dec count)
==
..file
::
++ extract-snap :: extract rewind point
^- snapshot
:* urb
%= sub
yen.bal ~
yen.own ~
yen.puk ~
yen.eth ~
==
%= etn
source *(each ship node-src)
==
==
--
--
:: :: ++ur
@ -1514,19 +1580,28 @@
=| moves=(list move)
=+ reset=|
=| changes=logs
|_ $: our=ship
=| $: our=ship
now=@da
state-absolute
state-relative
state-eth-node
state-snapshots
==
+* etn +<+>
=* urb ->+<
=* sub ->+>-
=* etn ->+>+<
=* sap ->+>+>
::
:: +| outward
|%
::
:: +abet: produce results
::
++ abet
^- [(list move) chain state-eth-node]
[(flop moves) ?:(reset &+changes |+changes) etn]
^- $: (list move) chain state-absolute state-relative
state-eth-node state-snapshots
==
[(flop moves) ?:(reset &+changes |+changes) urb sub etn sap]
::
:: +put-move: store side-effect
::
@ -1589,9 +1664,40 @@
:: or do we have a unique duct here?
[%a %want [our p.source] /j/(scot %p our)/vent `*`[%nuke ~]]
::
:: +listen-to-node: start syncing from a node
::
:: Get latest block from eth node and compare to our own latest block.
:: Get intervening blocks in chunks until we're caught up, then set
:: up a filter going forward.
::
++ listen-to-node
|= url=purl:eyre
new-filter(source |+%*(. *node-src node url))
get-latest-block(source |+%*(. *node-src node url))
::
:: +| catch-up-operations
::
:: +get-latest-block
::
:: Get latest known block number from eth node.
::
++ get-latest-block
(put-request /catch-up/block-number `'block number' %eth-block-number ~)
::
:: +catch-up: get next chunk
::
++ catch-up
?: (gte latest-block foreign-block)
new-filter
=/ next-block (min foreign-block (add latest-block 5.760)) :: ~d1
~& [%catching-up from=latest-block to=foreign-block]
%- put-request
:+ /catch-up/step/(scot %ud next-block) `'catch up'
:* %eth-get-logs
`number+latest-block
`number+next-block
~[ships:contracts]
~
==
::
:: +| filter-operations
::
@ -1604,7 +1710,10 @@
%- put-request
:+ /filter/new `'new filter'
:* %eth-new-filter
`[%number ?:((lte latest-block 40) 0 (sub latest-block 40))]
`number+latest-block
:: XX We want to load from a snapshot at least 40 blocks behind, then
:: replay to the present
:: `[%number ?:((lte latest-block 40) 0 (sub.add latest-block 40))]
::TODO or Ships origin block when 0
~ ::TODO we should probably chunck these, maybe?
:: https://stackoverflow.com/q/49339489
@ -1660,6 +1769,7 @@
^+ +>
:: TODO: ship or node as sample?
::
=. latest-block launch:contracts
?. =(our bos)
(listen-to-ship our bos)
=+ (need (de-purl:html 'http://localhost:8545'))
@ -1738,14 +1848,19 @@
~_ q.res
+>
?> ?=(%json-rpc-response mar)
:: ~| res
=+ rep=((hard response:rpc:jstd) q.res)
=+ rep=~|(res ((hard response:rpc:jstd) q.res))
?+ cuz ~|([%weird-sigh-wire cuz] !!)
[%filter %new *]
(take-new-filter rep)
::
[%filter *]
(take-filter-results rep)
::
[%catch-up %block-number ~]
(take-block-number rep)
::
[%catch-up %step @ta ~]
(take-catch-up-step rep (slav %ud `@ta`i.t.t.cuz))
==
::
:: +take-new-filter: store filter-id and read it
@ -1779,9 +1894,47 @@
(gth now poll-timer.p.source)
==
wait-poll
?> ?=(%a -.res.rep)
(take-events rep)
::
:: +take-block-number: take block number and start catching up
::
++ take-block-number
|= rep=response:rpc:jstd
^+ +>
?< ?=(%batch -.rep)
?: ?=(%error -.rep)
~& [%take-block-number-error--retrying message.rep]
get-latest-block
=. foreign-block (parse-eth-block-number res.rep)
~& [%setting-foreign-block foreign-block]
catch-up
::
:: +take-catch-up-step: process chunk
::
++ take-catch-up-step
|= [rep=response:rpc:jstd next-block=@ud]
^+ +>
?< ?=(%batch -.rep)
?: ?=(%error -.rep)
~& [%catch-up-step-error--retrying message.rep]
catch-up
:: XX file
=. +>.$ (take-events rep)
=. latest-block next-block
catch-up
::
:: +take-events: process events
::
++ take-events
|= rep=response:rpc:jstd
^+ +>
?< ?=(%batch -.rep)
?< ?=(%error -.rep)
?. ?=(%a -.res.rep)
~& [%events-not-array rep]
!!
=* changes p.res.rep
~& :* %filter-changes
~& :* %processing-changes
changes=(lent changes)
block=latest-block
id=?.(?=(%| -.source) ~ `@ux`filter-id.p.source)

View File

@ -375,6 +375,8 @@
:: 0xab87.24a7.a953.ef14.e940.b358.6b21.a889.b62f.3d56 :: ropsten
0x84b3.7fbc.6188.da8a.e866.1eae.322a.f4d9.2db4.5ecc :: joe
:: 0x7134.3566.74e4.0c93.8736.8699.1af8.86dd.2ae8.e642 :: philip
++ launch
4.230.928
--
::
:: hashes of ship event signatures
@ -2160,11 +2162,11 @@
++ pile (tree (pair @ @)) :: efficient ship set
++ rite :: urbit commitment
$% {$apple p/(map site @)} :: web api key
{$block ~} :: banned
{$block ~} :: banned
{$email p/(set @t)} :: email addresses
{$final p/(map ship @pG)} :: ticketed ships
{$fungi p/(map term @ud)} :: fungibles
{$guest ~} :: refugee visa
{$guest ~} :: refugee visa
{$hotel p/(map dorm pile)} :: reserved block
{$jewel p/(map life ring)} :: private keyring
{$login p/(set @pG)} :: login secret
@ -7455,6 +7457,8 @@
::
++ parse-eth-new-filter-res parse-hex-result
::
++ parse-eth-block-number parse-hex-result
::
++ parse-transaction-hash parse-hex-result
::
++ parse-event-logs