clay: rewrite new protocol

Use an explicit state machine to carefully manage pending downloads.
This commit is contained in:
Philip Monk 2020-07-23 01:36:38 -07:00
parent 553a9db843
commit 51983e5480
No known key found for this signature in database
GPG Key ID: B66E1F02604E44EC
3 changed files with 160 additions and 116 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:8317fe02192638c38467831ed9629d941414ae3a1c3506092bc60d8e4ee4051a
size 6278972
oid sha256:5ed0687965fa6df0a31c4c7622cdce3543c99b505b259d355288fbe08a1cd43d
size 6363294

View File

@ -208,7 +208,7 @@
::
++ get-germ
|= =desk
=+ .^(=cass:clay %cw /(scot %p our)/home/(scot %da now))
=+ .^(=cass:clay %cw /(scot %p our)/[desk]/(scot %da now))
?- ud.cass
%0 %init
%1 %that

View File

@ -111,7 +111,7 @@
::
:: Over-the-wire backfill request
::
+$ fill [=desk =tako =lobe]
+$ fill [=desk =lobe]
::
:: Ford cache
::
@ -220,13 +220,21 @@
::
+$ rind :: request manager
$: nix=@ud :: request index
bom=(map @ud [p=duct q=rave]) :: outstanding
mob=(map @ud down) :: active downloads
bom=(map @ud update-state) :: outstanding
fod=(map duct @ud) :: current requests
haw=(map mood (unit cage)) :: simple cache
== ::
+$ down
[=nako need=(list [=lobe =tako])]
::
:: Active downloads
::
+$ update-state
$: =duct
=rave
have=(map lobe blob)
need=(list lobe)
nako=(qeu (unit nako))
busy=_|
==
::
:: Result of a subscription
::
@ -1217,7 +1225,7 @@
(send-over-ames hen her inx syd `rave)
%= +>+.$
nix.u.ref +(nix.u.ref)
bom.u.ref (~(put by bom.u.ref) inx [hen rave])
bom.u.ref (~(put by bom.u.ref) inx [hen rave ~ ~ ~ |])
fod.u.ref (~(put by fod.u.ref) hen inx)
==
::
@ -2620,9 +2628,9 @@
?> ?=(^ ref)
=+ ruv=(~(get by bom.u.ref) inx)
?~ ruv +>.$
=/ rav=rave q.u.ruv
=/ rav=rave rave.u.ruv
?: ?=(%many -.rav)
(take-foreign-update inx rut)
abet:(apex:(foreign-update inx) rut)
?~ rut
:: nothing here, so cache that
::
@ -2697,126 +2705,148 @@
!>(;;(@uvI q.page))
--
::
:: A full foreign update. Validate and apply to our local cache of
:: their state.
:: Respond to backfill request
::
++ take-foreign-update
|= [inx=@ud rut=(unit rand)]
^+ ..take-foreign-update
:: Maybe should verify the requester is allowed to access this blob?
::
++ give-backfill
|= =lobe
^+ ..give-backfill
(emit hen %give %boon (~(got by lat.ran) lobe))
::
:: Ingest foreign update, requesting missing blobs if necessary
::
++ foreign-update
|= inx=@ud
?> ?=(^ ref)
=/ ruv (~(get by bom.u.ref) inx)
?~ ruv
~& [%clay-foreign-update-lost her syd inx]
..take-foreign-update
=. hen p.u.ruv
=/ =rave q.u.ruv
?> ?=(%many -.rave)
|^
?~ rut
=: bom.u.ref (~(del by bom.u.ref) inx)
fod.u.ref (~(del by fod.u.ref) hen)
==
=<(?>(?=(^ ref) .) wake)
=. lim ?.(?=(%da -.to.moat.rave) lim p.to.moat.rave)
?> ?=(%nako p.r.u.rut)
=/ nako ;;(nako q.r.u.rut)
=/ missing (missing-blobs nako)
=. mob.u.ref (~(put by mob.u.ref) inx nako ~(tap by missing))
(backfill inx)
=/ [sat=update-state lost=?]
=/ ruv (~(get by bom.u.ref) inx)
?~ ruv
~& [%clay-foreign-update-lost her syd inx]
[*update-state &]
[u.ruv |]
=/ done=? |
=. hen duct.sat
|%
++ abet
^+ ..foreign-update
?: lost
..foreign-update
?: done
=: bom.u.ref (~(del by bom.u.ref) inx)
fod.u.ref (~(del by fod.u.ref) hen)
==
=<(?>(?=(^ ref) .) wake)
=. bom.u.ref (~(put by bom.u.ref) inx sat)
..foreign-update
::
++ apex
|= rut=(unit rand)
^+ ..abet
?: lost ..abet
~& > taking-update=inx
?~ rut
=. nako.sat (~(put to nako.sat) ~)
work
?> ?=(%nako p.r.u.rut)
=/ nako ;;(nako q.r.u.rut)
=/ missing (missing-blobs nako)
=. need.sat `(list lobe)`(welp need.sat ~(tap in missing))
=. nako.sat (~(put to nako.sat) ~ nako)
work
::
++ missing-blobs
|= =nako
^- (map lobe tako)
^- (set lobe)
=/ yakis ~(tap in lar.nako)
|- ^- (map lobe tako)
|- ^- (set lobe)
=* yaki-loop $
?~ yakis
~
=/ lobes=(list [=path =lobe]) ~(tap by q.i.yakis)
|- ^- (map lobe tako)
|- ^- (set lobe)
=* blob-loop $
?~ lobes
yaki-loop(yakis t.yakis)
?: (~(has by lat.ran) lobe.i.lobes)
blob-loop(lobes t.lobes)
(~(put by blob-loop(lobes t.lobes)) lobe.i.lobes r.i.yakis)
--
::
:: Respond to backfill request
::
++ give-backfill
|= [=tako =lobe]
^+ ..give-backfill
|^
?> tako-in-desk
?> lobe-in-tako
(emit hen %give %boon (~(got by hut.ran) lobe))
(~(put in blob-loop(lobes t.lobes)) lobe.i.lobes)
::
++ tako-in-desk
=/ desk-tako (~(got by hit.dom) let.dom)
|- ^- ?
=/ desk-yaki (~(got by hut.ran) desk-tako)
?: =(tako r.desk-yaki)
&
(lien p.desk-yaki |=(t=^tako $(desk-tako t)))
:: Receive backfill response
::
++ lobe-in-tako
=/ =yaki (~(got by hut.ran) tako)
(lien ~(tap by q.yaki) |=([path l=^lobe] =(l lobe)))
--
::
:: Receive backfill response
::
++ take-backfill
|= [inx=@ud =blob]
^+ ..take-backfill
?> ?=(^ ref)
=/ mob (~(get by mob.u.ref) inx)
?~ mob
~& [%clay-take-backfill-lost her syd inx]
..take-backfill
:: In reverse order so we don't overwrite existing blobs
++ take-backfill
|= =blob
~& > [%taking-backfill p.blob]
^+ ..abet
?: lost ..abet
=? need.sat
?& ?=(%delta -.blob)
!(~(has by lat.ran) q.q.blob)
!(~(has by have.sat) q.q.blob)
==
[q.q.blob need.sat]
~& > [%putting-backfill p.blob]
:: We can't put a blob in lat.ran if its parent isn't already
:: there. Unions are in reverse order so we don't overwrite
:: existing blobs.
::
=. ..abet
?: &(?=(%delta -.blob) !(~(has by lat.ran) q.q.blob))
..abet(have.sat (~(uni by (malt [p.blob `^blob`blob] ~)) have.sat))
..abet(lat.ran (~(uni by (malt [p.blob blob] ~)) lat.ran))
work(busy.sat |)
::
=. lat.ran (~(uni by (malt [p.blob blob] ~)) lat.ran)
(backfill inx)
::
:: Fill in missing blobs from nako
::
++ backfill
|= inx=@ud
^+ ..backfill
?> ?=(^ ref)
|^
=/ mob (~(get by mob.u.ref) inx)
?~ mob
~& [%clay-backfill-lost her syd inx]
..backfill
:: If we have everything we need, apply it
:: Fetch next blob
::
|- ^+ ..backfill
?~ need.u.mob
=. ..backfill
++ work
^+ ..abet
?: busy.sat
..abet
|- ^+ ..abet
?: =(~ need.sat)
~& > %done-work
:: NB: if you change to release nakos as we get enough blobs
:: for them instead of all at the end, you *must* store the
:: `lim` that should be applied after the nako is complete and
:: not use the one in the rave, since that will apply to the
:: end of subscription.
::
=. lat.ran (~(uni by have.sat) lat.ran)
|- ^+ ..abet
?: =(~ nako.sat)
~& > %pausing-work
..abet
=^ next=(unit nako) nako.sat ~(get to nako.sat)
?~ next
~& > %really-done-work
..abet(done &)
~& > %applying-work
=. ..abet (apply-foreign-update u.next)
=. ..foreign-update =<(?>(?=(^ ref) .) wake)
$
~& > %need-work
?> ?=(^ need.sat)
:: This is what removes an item from `need`. This happens every
:: time we take a backfill response, but it could happen more than
:: once if we somehow got this data in the meantime (maybe from
:: another desk updating concurrently, or a previous update on this
:: same desk).
::
?: ?| (~(has by lat.ran) i.need.sat)
(~(has by have.sat) i.need.sat)
==
~& > [%skip-work i.need.sat]
$(need.sat t.need.sat)
~& > [%get-work i.need.sat]
:: Otherwise, fetch the next blob
::
=/ =fill [syd i.need.sat]
=/ =wire /back-index/(scot %p her)/[syd]/(scot %ud inx)
=/ =path [%backfill syd (scot %ud inx) ~]
=. ..foreign-update
=< ?>(?=(^ ref) .)
(apply-foreign-update nako.u.mob)
=: bom.u.ref (~(del by bom.u.ref) inx)
fod.u.ref (~(del by fod.u.ref) hen)
mob.u.ref (~(del by mob.u.ref) inx)
==
=<(?>(?=(^ ref) .) wake)
:: This is what removes an item from `need`. This happens every
:: time we take a backfill response, but it could happen more than
:: once if we somehow got this data in the meantime (maybe from
:: another desk updating concurrently).
::
?: (~(has by lat.ran) lobe.i.need.u.mob)
$(need.u.mob t.need.u.mob)
:: Otherwise, fetch the next blob
::
=/ =fill [syd [tako lobe]:i.need.u.mob]
=/ =wire /back-index/(scot %p her)/[syd]/(scot %ud inx)
=/ =path [%backfill syd (scot %ud inx) ~]
=. mob.u.ref (~(put by mob.u.ref) inx u.mob)
(emit `move`[hen %pass wire %a %plea her %c path fill])
(emit hen %pass wire %a %plea her %c path fill)
..abet(busy.sat &)
::
:: When we get a %w foreign update, store this in our state.
::
@ -2826,7 +2856,7 @@
::
++ apply-foreign-update
|= =nako
^+ ..backfill
^+ ..abet
:: hit: updated commit-hashes by @ud case
:: nut: new commit-hash/commit pairs
:: hut: updated commits by hash
@ -2863,12 +2893,20 @@
$(aeon +(aeon))
:: produce updated state
::
~& > inx=inx
=/ =rave rave:(~(got by bom.u.ref) inx)
?> ?=(%many -.rave)
=: let.dom (max let.nako let.dom)
hit.dom hit
hut.ran hut
lat.ran lat
:: Is this correct? Seeems like it should only go to `to` if
:: we've gotten all the way to the end. Leaving this
:: behavior unchanged for now, but I believe it's wrong.
::
lim ?.(?=(%da -.to.moat.rave) lim p.to.moat.rave)
==
..backfill
..abet
--
::
:: fire function if request is in future
@ -4058,7 +4096,7 @@
=* pax path.plea.req
=* res payload.plea.req
::
?: ?=([%backfill * *] pax)
?: ?=([%backfill *] pax)
=+ ;;(=fill res)
=^ mos ruf
=/ den ((de our now ski hen ruf) our desk.fill)
@ -4100,7 +4138,12 @@
^- ref=(unit rind)
?~ ref.rede-3
~
`[nix bom ~ fod haw]:u.ref.rede-3
=- `u.ref.rede-3(bom bom.-)
^- bom=(map @ud update-state)
%- ~(run by bom.u.ref.rede-3)
|= [=duct =rave]
^- update-state
[duct rave ~ ~ ~ |]
::
++ room-3-to-4
|= =room-3
@ -4507,6 +4550,7 @@
::
%boon
=+ ;; =blob payload.q.hin
~& blob=p.blob
::
=/ her=ship (slav %p i.t.tea)
=/ =desk (slav %tas i.t.t.tea)
@ -4514,7 +4558,7 @@
::
=^ mos ruf
=/ den ((de our now ski hen ruf) her desk)
abet:(take-backfill:den index blob)
abet:abet:(take-backfill:(foreign-update:den index) blob)
[mos ..^$]
==
::