Merge branch 'ted/kiln-warp' into m/distpilled, and

modify Kiln to listen for %writ's from Clay to track commits to Clay
that were not generated by Kiln itself.
This commit is contained in:
Ted Blackman 2021-09-16 12:25:02 -04:00
commit 3b30b67a3e
3 changed files with 194 additions and 116 deletions

View File

@ -2,8 +2,8 @@
/+ drum=hood-drum, helm=hood-helm, kiln=hood-kiln
|%
+$ state
$~ [%19 *state:drum *state:helm *state:kiln]
$>(%19 any-state)
$~ [%20 *state:drum *state:helm *state:kiln]
$>(%20 any-state)
::
+$ any-state
$% [ver=?(%1 %2 %3 %4 %5 %6) lac=(map @tas fin-any-state)]
@ -20,6 +20,7 @@
[%17 drum=state-4:drum helm=state:helm kiln=state-4:kiln]
[%18 drum=state-4:drum helm=state:helm kiln=state-5:kiln]
[%19 drum=state-4:drum helm=state:helm kiln=state-6:kiln]
[%20 drum=state-4:drum helm=state:helm kiln=state-7:kiln]
==
+$ any-state-tuple
$: drum=any-state:drum

View File

@ -5,7 +5,8 @@
=, format
=* dude dude:gall
|%
+$ state state-6
+$ state state-7
+$ state-7 [%7 pith-7]
+$ state-6 [%6 pith-6]
+$ state-5 [%5 pith-5]
+$ state-4 [%4 pith-4]
@ -15,7 +16,8 @@
+$ state-0 [%0 pith-0]
+$ any-state
$~ *state
$% state-6
$% state-7
state-6
state-5
state-4
state-3
@ -24,7 +26,7 @@
state-0
==
::
+$ pith-6
+$ pith-7
$: wef=(unit weft)
rem=(map desk per-desk) ::
syn=(map kiln-sync let=@ud) ::
@ -40,10 +42,29 @@
hxs=(map desk @ud)
== ::
::
+$ pith-6
$: wef=(unit weft)
rem=(map desk per-desk) ::
syn=(map kiln-sync let=@ud) ::
ark=(map desk arak-6) ::
commit-timer=[way=wire nex=@da tim=@dr mon=term] ::
:: map desk to the currently ongoing fuse request
:: and the latest version numbers for beaks to
fus=(map desk per-fuse)
:: used for fuses - every time we get a fuse we
:: bump this. used when calculating hashes to
:: ensure they're unique even when the same
:: request is made multiple times.
hxs=(map desk @ud)
== ::
::
+$ arak-6 [rail=rail-6 next=(list rung) =rein]
+$ rail-6 [paused=? =ship =desk =aeon]
::
+$ pith-5
$: rem=(map desk per-desk) ::
syn=(map kiln-sync let=@ud) ::
ark=(map desk arak) ::
ark=(map desk arak-6) ::
commit-timer=[way=wire nex=@da tim=@dr mon=term] ::
:: map desk to the currently ongoing fuse request
:: and the latest version numbers for beaks to
@ -232,7 +253,7 @@
|-
?~ dez ..on-init
=. ..on-init
abet:(install:vats i.dez our i.dez)
abet:(install-local:vats i.dez)
=? ..on-init !=(sop our)
abet:(install:vats i.dez sop i.dez)
$(dez t.dez)
@ -289,12 +310,23 @@
=- +.old(ark -)
%- ~(run by ark.old)
|= a=arak-4
^- arak
^- arak-6
[[paused=| ship desk aeon] next rein]:a
::
=? old ?=(%5 -.old)
[%6 ~ +.old]
::
?> ?=(%6 -.old)
=? old ?=(%6 -.old)
:- %7
=- +.old(ark -)
%- ~(run by ark.old)
|= a=arak-6
^- arak
:_ rein.a
^- (unit rail)
`[paused.rail ship.rail desk.rail aeon.rail next]:a
::
?> ?=(%7 -.old)
=. +<+.$.abet old
=< abet
=? kiln ?=(^ old-ota)
@ -343,6 +375,7 @@
::
++ vats
|_ [loc=desk rak=arak]
++ ral (need rail.rak)
++ vats .
++ abet kiln(ark (~(put by ark) loc rak))
++ abed
@ -350,7 +383,7 @@
~_ leaf/"kiln: {<lac>} not installed"
vats(loc lac, rak (~(got by ark) lac))
::
++ here "{<loc>} from {<[ship desk]:rail.rak>}"
++ here "{<loc>} from {<[ship desk]:ral>}"
++ make-wire |=(step=@tas /kiln/vats/[loc]/[step])
++ from-wire
|= =wire
@ -370,21 +403,23 @@
++ pyre |=(=tang [%pass /kiln/vats %pyre tang])
++ find (warp %find [%sing %y ud+1 /])
++ sync-da (warp %sync [%sing %w da+now /])
++ sync-ud (warp %sync [%sing %w ud+aeon.rail.rak /])
++ download (warp %download [%sing %v ud+aeon.rail.rak /])
++ sync-ud (warp %sync [%sing %w ud+aeon:ral /])
++ download (warp %download [%sing %v ud+aeon:ral /])
++ warp
|= [s=term r=rave]
(clay-card s %warp ship.rail.rak desk.rail.rak `r)
(clay-card s %warp ship:ral desk:ral `r)
++ merge-main
=/ germ (get-germ loc)
=/ =aeon (dec aeon.rail.rak)
=/ =aeon (dec aeon:ral)
%+ clay-card %merge-main
[%merg loc ship.rail.rak desk.rail.rak ud+aeon germ]
[%merg loc ship:ral desk:ral ud+aeon germ]
++ merge-kids
=/ germ (get-germ %kids)
=/ =aeon (dec aeon.rail.rak)
=/ =aeon (dec aeon:ral)
%+ clay-card %merge-kids
[%merg %kids ship.rail.rak desk.rail.rak ud+aeon germ]
[%merg %kids ship:ral desk:ral ud+aeon germ]
++ listen
(clay-card %listen %warp our loc `[%next %z da+now /])
++ clay-card
|= [step=@tas =task:clay]
^- card:agent:gall
@ -430,49 +465,66 @@
++ install
|= [lac=desk her=ship rem=desk]
^+ vats
?: =([her rem] [our lac])
(install-local lac)
=/ got (~(get by ark) lac)
?: =(`[her rem] got)
~> %slog.0^leaf/"kiln: already tracking {here:(abed lac)}, ignoring"
vats
=: loc lac
rak [[paused=| her rem *aeon] next=~ rein:(fall got *arak)]
rak [`[paused=| her rem *aeon next=~] rein:(fall got *arak)]
==
?. ?& =(our her)
::
=- (~(has in -) rem)
.^((set desk) %cd /(scot %p our)//(scot %da now))
==
~> %slog.0^leaf/"kiln: beginning install into {here}"
(emit find:pass)
~> %slog.0^leaf/"kiln: beginning local install into {here}"
%- take-find
:+ %clay %writ
:- ~
:+ [%y ud+1 rem] /
:- %arch
!> .^(arch %cy /(scot %p our)/[rem]/1)
~> %slog.0^leaf/"kiln: beginning install into {here}"
(emil find:pass listen:pass ~)
:: +install-local: install from a local desk, with no remote
::
++ install-local
|= lac=desk
^+ vats
?: (~(has by ark) loc)
~> %slog.0^leaf/"kiln: already tracking {here:(abed lac)}, ignoring"
vats
=: loc lac
rak [~ *rein]
==
~> %slog.0^leaf/"kiln: local install {here}"
=. vats (update-running-apps (get-apps-diff our loc now rein.rak))
=. vats (emit listen:pass)
vats
:: +reset: resync after failure
::
:: TODO: instead of jumping all the way back to find:pass,
:: which will end up skipping all the way until the latest
:: remote commit, increment the aeon so we skip only the problematic
:: commit and try the commit immediately after it.
::
++ reset
^+ vats
~> %slog.0^leaf/"kiln: resetting tracking for {here}"
=/ cad (diff:give %reset loc rak)
=. aeon.rail.rak 0
=. next.rak ~
=/ rel ral
=. rail.rak `rel(aeon 0, next ~)
(emil find:pass cad ~)
:: +pause: stop syncing from upstream
::
++ pause
|= lac=desk
^+ vats
?. is-tracking
~> %slog.0^leaf/"kiln: {<lac>} already paused, ignoring"
vats
=. vats (abed lac)
~> %slog. :+ %0 %leaf
?: paused.rail.rak
"kiln: {<lac>} already paused, ignoring"
"kiln: {<lac>} pausing updates"
=: paused.rail.rak &
aeon.rail.rak 0
==
~> %slog.0^leaf/"kiln: {<lac>} pausing updates"
=/ rel ral
=. rail.rak `rel(paused &, aeon 0)
vats
:: +remove-upstream: stop listening to an upstream for changes
::
++ remove-upstream
|= lac=desk
^+ vats
=. vats (abed lac)
=. rail.rak ~
vats
:: +resume: restart tracking from upstream
::
@ -483,7 +535,7 @@
^+ vats
=. vats (abed lac)
~> %slog. :+ %0 %leaf
?. paused.rail.rak
?. paused:ral
"kiln: {<lac>} already tracking, ignoring"
"kiln: {<lac>} resuming updates"
reset
@ -584,7 +636,7 @@
?: =([~ kel] (read-kelvin-local our desk now))
~> %slog.0^leaf/"kiln: {here} already at {<[- +]:kel>}, ignoring"
vats
=^ tem next.rak (crank-next %| kel)
=^ tem rail.rak (crank-next %| kel)
?^ tem
(emit merge-main:pass)
=- (emit (pyre:pass leaf/- ~))
@ -613,6 +665,7 @@
%find (take-find syn)
%sync (take-sync syn)
%download (take-download syn)
%listen (take-listen syn)
%merge-main (take-merge-main syn)
%merge-kids (take-merge-kids syn)
==
@ -621,62 +674,47 @@
|= syn=sign-arvo
^+ vats
?> ?=(%writ +<.syn)
?: paused.rail.rak
?. is-tracking
vats
?~ p.syn
~> %slog.0^leaf/"kiln: cancelled (1) install into {here}, aborting"
vats(ark (~(del by ark) loc))
?. =(our ship.rail.rak)
~> %slog.0^leaf/"kiln: activated install into {here}"
(emit sync-da:pass)
~> %slog.0^leaf/"kiln: activated local install into {here}"
%- take-sync
:+ %clay %writ
:- ~
:+ [%w da+now desk.rail.rak] /
:- %cass
!> .^(cass %cw /(scot %p our)/[desk.rail.rak]/(scot %da now))
~> %slog.0^leaf/"kiln: activated install into {here}"
(emit sync-da:pass)
::
++ take-sync
|= syn=sign-arvo
^+ vats
?> ?=(%writ +<.syn)
?: paused.rail.rak
=* rit u.p.syn
?. is-tracking
vats
?~ p.syn
~> %slog.0^leaf/"kiln: cancelled (1) install into {here}, retrying"
reset
=? aeon.rail.rak ?=(%w p.p.u.p.syn) ud:;;(cass:clay q.q.r.u.p.syn)
?. =(our ship.rail.rak)
~> %slog.0^leaf/"kiln: downloading update for {here}"
(emit download:pass)
~> %slog.0^leaf/"kiln: loading update for {here}"
%- take-download
:+ %clay %writ
:- ~
:+ [%v ud+aeon.rail.rak desk.rail.rak] /
:- %dome
!> .^(dome:clay %cv /(scot %p our)/[desk.rail.rak]/(scot %ud aeon.rail.rak))
=? rail.rak ?=(%w p.p.rit) `%*(. ral aeon ud:;;(cass:clay q.q.r.rit))
~> %slog.0^leaf/"kiln: downloading update for {here}"
(emit download:pass)
::
++ take-download
|= syn=sign-arvo
^+ vats
?> ?=(%writ +<.syn)
?: paused.rail.rak
?. is-tracking
vats
?~ p.syn
~> %slog.0^leaf/"kiln: cancelled (2) install into {here}, retrying"
reset
~> %slog.0^leaf/"kiln: finished downloading update for {here}"
=/ old-weft `weft`[%zuse zuse]
=/ new-weft (read-kelvin-foreign [ship desk aeon]:rail.rak)
=/ new-weft (read-kelvin-foreign [ship desk aeon]:ral)
=? vats liv.rein.rak
=/ bill (read-bill-foreign [ship desk aeon]:rail.rak)
=/ bill (read-bill-foreign [ship desk aeon]:ral)
=/ wan (sy (get-apps-want bill rein.rak))
=/ hav (sy (get-apps-live our loc now))
=/ ded ~(tap in (~(dif in hav) wan))
(stop-dudes ded)
=. aeon.rail.rak +(aeon.rail.rak)
=. rail.rak `%*(. ral aeon +(aeon:ral))
|^ ^+ vats
?: =(%base loc)
do-base
@ -697,22 +735,24 @@
^+ vats
~> %slog.0^leaf/"kiln: future version {<new-weft>}, enqueueing"
:: retry upgrade if not blocked anymore
=/ base=arak (~(got by ark) %base)
=. next.rak (snoc next.rak [(dec aeon.rail.rak) new-weft])
=. rail.rak `%*(. ral next (snoc next:ral [(dec aeon:ral) new-weft]))
=. ark (~(put by ark) loc rak)
=/ =diff [%block loc rak new-weft blockers=(sy %base ~)]
=. vats (emil sync-ud:pass (diff:give diff) ~)
?. &(?=(^ next.base) =(~ (get-blockers weft.i.next.base)))
=/ base=arak (~(got by ark) %base)
?~ rail.base
vats
=/ rel u.rail.base
?. &(?=(^ next.rel) =(~ (get-blockers weft.i.next.rel)))
vats
~> %slog.0^leaf/"kiln: unblocked system update, updating"
=. kiln
(bump-one weft.i.next.base %base)
=. kiln (bump-one weft.i.next.rel %base)
vats
::
++ kelvin-same
^+ vats
~> %slog.0^leaf/"kiln: merging into {here}"
=. next.rak +:(crank-next %& (dec aeon.rail.rak))
=. rail.rak +:(crank-next %& (dec aeon:ral))
(emil ~[merge-main sync-ud]:pass)
::
++ do-base
@ -724,17 +764,36 @@
::
?. =(~ blockers)
~> %slog.0^leaf/"kiln: OTA blocked on {<blockers>}"
=. next.rak (snoc next.rak [(dec aeon.rail.rak) new-weft])
=. rail.rak `%*(. ral next (snoc next:ral [(dec aeon:ral) new-weft]))
=/ =diff [%block loc rak new-weft blockers]
(emil sync-ud:pass (diff:give diff) ~)
~> %slog.0^leaf/"kiln: applying OTA to {here}, kelvin: {<new-weft>}"
=. next.rak +:(crank-next %& (dec aeon.rail.rak))
=. rail.rak +:(crank-next %& (dec aeon:ral))
=. wef
?: =(old-weft new-weft) ~
`new-weft
(emil ~[merge-main sync-ud]:pass)
--
::
++ take-listen
|= syn=sign-arvo
^+ vats
?> ?=([@ %writ ~ *] syn)
=. vats (emit listen:pass)
take-commit
::
++ take-commit
^+ vats
~> %slog.0^leaf/"kiln: commit detected at {here}"
=. vats (emit (diff:give %commit loc rak))
=? vats liv.rein.rak
(update-running-apps (get-apps-diff our loc now rein.rak))
?. =(%base loc)
vats
=/ kel [- +]:weft:(head next:ral)
~> %slog.0^leaf/"kiln: merging %base into %kids at {<kel>}"
(emit merge-kids:pass)
::
++ take-merge-main
|= syn=sign-arvo
^+ vats
@ -749,14 +808,7 @@
%- (slog leaf/- p.p.syn)
=. vats (emit (diff:give %merge-fail loc rak p.p.syn))
vats
~> %slog.0^leaf/"kiln: merge into {here} succeeded"
=. vats (emit (diff:give %merge loc rak))
=? vats liv.rein.rak
(update-running-apps (get-apps-diff our loc now rein.rak))
?. =(%base loc)
vats
~> %slog.0^leaf/"kiln: bumping {<zuse>}" :: TODO print next
(emit merge-kids:pass)
take-commit
::
++ take-merge-kids
|= syn=sign-arvo
@ -768,7 +820,7 @@
reset
?- -.p.syn
%& ~> %slog.0^leaf/"kiln: OTA to %kids succeeded"
(emit (diff:give %merge %kids rak))
(emit (diff:give %commit %kids rak))
%| ~> %slog.0^leaf/"kiln: OTA to %kids failed {<p.p.syn>}"
(emit (diff:give %merge-fail %kids rak p.p.syn))
==
@ -797,20 +849,27 @@
|= daz=(list dude)
~> %slog.0^leaf/"kiln: stopping {<daz>}"
(emil `(list card:agent:gall)`(zing (turn daz stop-dude:pass)))
:: +crank-next: pop stale items from .next.rak until one matches
:: +crank-next: pop stale items from .next until one matches
::
++ crank-next
|= new=(each aeon weft)
^+ [match=*(unit rung) next.rak]
=/ rog next.rak
|- ^+ [match=*(unit rung) next.rak]
?~ rog [~ next.rak]
^+ [match=*(unit rung) rail.rak]
?~ rail.rak !!
=/ rog next.u.rail.rak
=- [match `u.rail.rak(next next)]
|- ^- [match=(unit rung) next=(list rung)]
?~ rog [~ next.u.rail.rak]
?: ?- -.new
%& =(p.new aeon.i.rog)
%| =(p.new weft.i.rog)
==
[`i.rog t.rog]
$(rog t.rog)
::
++ is-tracking
^- ?
?~ rail.rak |
!paused.u.rail.rak
--
:: +get-blockers: find desks that would block a kernel update
::
@ -824,7 +883,9 @@
~
?. liv.rein.arak
~
?: (lien next.arak |=([* k=weft] =(k kel)))
?~ rail.arak
`desk
?: (lien next.u.rail.arak |=([* k=weft] =(k kel)))
~
`desk
:: +get-germ: select merge strategy into local desk
@ -892,7 +953,9 @@
=/ =arak
(~(got by ark) %base)
=/ kel=weft
?~(next.arak zuse+zuse weft.i.next.arak)
?~ rail.arak zuse+zuse
?~ next.u.rail.arak zuse+zuse
weft.i.next.u.rail.arak
abet:(bump:vats kel except force)
::
++ poke-cancel

View File

@ -7,7 +7,7 @@
+$ diff
$% [%block =desk =arak =weft blockers=(set desk)]
[%reset =desk =arak]
[%merge =desk =arak]
[%commit =desk =arak]
[%merge-sunk =desk =arak =tang]
[%merge-fail =desk =arak =tang]
[%suspend =desk =arak]
@ -16,28 +16,33 @@
:: $arak: foreign vat tracker
::
:: .rail: upstream tracking state, if any
:: .next: list of pending commits with future kelvins
:: .rein: configuration for agents
::
+$ arak
$: =rail
next=(list rung)
$: rail=(unit rail)
=rein
==
:: $rail: upstream tracking state
::
:: .paused: is tracking paused? or live
:: .ship: upstream ship (could be .our)
:: .desk: name of upstream desk
:: .aeon: next aeon to pull from upstream
:: .next: list of pending commits with future kelvins
::
+$ rail
$: paused=?
=ship
=desk
=aeon
next=(list rung)
==
:: $rung: reference to upstream commit
::
+$ rung [=aeon =weft]
:: $rein: diff from desk manifest
::
:: .liv: suspended?
:: .liv: suspended? if suspended, no agents should run
:: .add: agents not in manifest that should be running
:: .sub: agents in manifest that should not be running
::
@ -65,23 +70,25 @@
=+ .^(=weft %cx /(scot %p our)/[desk]/(scot %da now)/sys/kelvin)
:+ %rose ["" "{<desk>}" "::"]
^- tang
=- ?: =(~ next.arak) -
%+ snoc -
leaf/"pending: {<(turn next.arak |=([@ lal=@tas num=@] [lal num]))>}"
^- tang
=/ meb (mergebase-hashes our desk now arak)
=/ poz ?:(paused.rail.arak "paused" "tracking")
=/ poz
?~ rail.arak "local"
?:(paused.u.rail.arak "paused" "tracking")
=/ sat ?:(liv.rein.arak "running" "suspended")
=/ pen
?~ rail.arak "~"
<(turn next.u.rail.arak |=([@ lal=@tas num=@] [lal num]))>
:~ leaf/"/sys/kelvin: {<[lal num]:weft>}"
leaf/"base hash: {?.(=(1 (lent meb)) <meb> <(head meb)>)}"
leaf/"%cz hash: {<hash>}"
leaf/"updates: {sat}"
leaf/"source ship: {<ship.rail.arak>}"
leaf/"source desk: {<desk.rail.arak>}"
leaf/"source aeon: {<aeon.rail.arak>}"
leaf/"source ship: {?~(rail.arak <~> <ship.u.rail.arak>)}"
leaf/"source desk: {?~(rail.arak <~> <desk.u.rail.arak>)}"
leaf/"source aeon: {?~(rail.arak <~> <aeon.u.rail.arak>)}"
leaf/"agent status: {sat}"
leaf/"force on: {?:(=(~ add.rein.arak) "~" <add.rein.arak>)}"
leaf/"force off: {?:(=(~ sub.rein.arak) "~" <sub.rein.arak>)}"
leaf/"pending: {pen}"
==
:: +read-kelvin-foreign: read /sys/kelvin from a foreign desk
::
@ -109,7 +116,7 @@
|= [our=ship =desk now=@da]
^- (unit weft)
=/ pax (en-beam [our desk da+now] /sys/kelvin)
?~ =<(fil .^(arch cy/pax))
?. .^(? cu/pax)
~
[~ .^(weft cx/pax)]
:: +read-bill-foreign: read /desk/bill from a foreign desk
@ -137,7 +144,7 @@
++ read-bill
|= [our=ship =desk now=@da]
=/ pax (en-beam [our desk da+now] /desk/bill)
?~ =<(fil .^(arch cy/pax))
?. .^(? cu/pax)
*bill
.^(bill cx/pax)
:: +is-fish: should dill link .dude?
@ -179,10 +186,11 @@
::
++ mergebase-hashes
|= [our=@p =desk now=@da =arak]
=/ her (scot %p ship.rail.arak)
?> ?=(^ rail.arak)
=/ her (scot %p ship.u.rail.arak)
=/ ego (scot %p our)
=/ wen (scot %da now)
%+ turn .^((list tako) %cs ~[ego desk wen %base her desk.rail.arak])
%+ turn .^((list tako) %cs ~[ego desk wen %base her desk.u.rail.arak])
|=(=tako .^(@uv %cs ~[ego desk wen %hash (scot %uv tako)]))
::
++ enjs
@ -241,12 +249,18 @@
++ arak
|= a=^arak
%- pairs
:~ ship+s+(scot %p ship.rail.a)
desk+s+desk.rail.a
paused+b+paused.rail.a
aeon+(numb aeon.rail.a)
next+a+(turn next.a rung)
:~ rail+?~(rail.a ~ (rail u.rail.a))
rein+(rein rein.a)
==
::
++ rail
|= r=^rail
%- pairs
:~ ship+s+(scot %p ship.r)
desk+s+desk.r
paused+b+paused.r
aeon+(numb aeon.r)
next+a+(turn next.r rung)
==
--
--