king: Pier cleanup pass.

This commit is contained in:
~siprel 2020-06-06 23:34:27 +00:00
parent 648b0743c8
commit 95df4b0764
9 changed files with 180 additions and 182 deletions

View File

@ -383,7 +383,7 @@ testPill pax showPil showSeq = do
pill <- fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure
logTrace "Using pill to generate boot sequence."
bootSeq <- generateBootSeq (Ship 0) pill False (Fake $ Ship 0)
bootSeq <- genBootSeq (Ship 0) pill False (Fake (Ship 0))
logTrace "Validate jam/cue and toNoun/fromNoun on pill value"
reJam <- validateNounVal pill

View File

@ -118,7 +118,7 @@ ames
-> Bool
-> (EvErr -> STM ())
-> (Text -> RIO e ())
-> ([EvErr], RAcquire e (EffCb e NewtEf))
-> ([EvErr], RAcquire e (NewtEf -> IO ()))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
king = fromIntegral (env ^. kingIdL)
@ -126,7 +126,7 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runAmes :: RAcquire e (EffCb e NewtEf)
runAmes :: RAcquire e (NewtEf -> IO ())
runAmes = do
mode <- rio (netMode isFake)
drv <- mkRAcquire start stop
@ -153,8 +153,8 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
rsKill aResolvr
cancel aRecvTid
handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> RIO e ()
handleEffect drv@AmesDrv {..} mode = \case
handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO ()
handleEffect drv@AmesDrv {..} mode = runRIO env . \case
NewtEfTurf (_id, ()) turfs -> do
atomically $ writeTVar aTurfs (Just turfs)

View File

@ -34,7 +34,7 @@ wakeErr :: WorkError -> IO ()
wakeErr _ = pure ()
:: HasKingId e => e -> (EvErr -> STM ()) -> ([EvErr], Acquire (EffCb e BehnEf))
:: HasKingId e => e -> (EvErr -> STM ()) -> ([EvErr], Acquire (BehnEf -> IO ()))
behn env enqueueEv =
(initialEvents, runBehn)
@ -42,10 +42,10 @@ behn env enqueueEv =
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runBehn :: Acquire (EffCb e BehnEf)
runBehn :: Acquire (BehnEf -> IO ())
runBehn = do
tim <- mkAcquire Timer.init Timer.stop
pure (handleEf tim)
pure (runRIO env . handleEf tim)
handleEf :: Timer -> BehnEf -> RIO e ()
handleEf b = io . \case

View File

@ -122,7 +122,7 @@ clay
. (HasPierConfig e, HasLogFunc e, HasKingId e)
=> e
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e SyncEf))
-> ([EvErr], RAcquire e (SyncEf -> IO ()))
clay env plan =
(initialEvents, runSync)
@ -134,15 +134,15 @@ clay env plan =
-- specified directory and shove it into an %into event.
initialEvents = [EvErr boatEv (boatFailed env)]
runSync :: RAcquire e (EffCb e SyncEf)
runSync :: RAcquire e (SyncEf -> IO ())
runSync = handleEffect <$> mkRAcquire start stop
start :: RIO e ClayDrv
start = ClayDrv <$> newTVarIO mempty
stop c = pure ()
handleEffect :: ClayDrv -> SyncEf -> RIO e ()
handleEffect cd = \case
handleEffect :: ClayDrv -> SyncEf -> IO ()
handleEffect cd = runRIO env . \case
SyncEfHill _ mountPoints -> do
logDebug $ displayShow ("(clay) known mount points:", mountPoints)
pierPath <- view pierPathL

View File

@ -287,7 +287,7 @@ eyre
-> Ship
-> (EvErr -> STM ())
-> Bool
-> ([EvErr], RAcquire e (EffCb e HttpServerEf))
-> ([EvErr], RAcquire e (HttpServerEf -> IO ()))
eyre env multi who plan isFake = (initialEvents, runHttpServer)
king = fromIntegral (env ^. kingIdL)
@ -295,7 +295,7 @@ eyre env multi who plan isFake = (initialEvents, runHttpServer)
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runHttpServer :: RAcquire e (EffCb e HttpServerEf)
runHttpServer :: RAcquire e (HttpServerEf -> IO ())
runHttpServer = handleEf <$> mkRAcquire
(Drv <$> newMVar Nothing)
(\(Drv v) -> stopService v kill >>= fromEither)
@ -318,8 +318,8 @@ eyre env multi who plan isFake = (initialEvents, runHttpServer)
liveFailed _ = pure ()
handleEf :: Drv -> HttpServerEf -> RIO e ()
handleEf drv = \case
handleEf :: Drv -> HttpServerEf -> IO ()
handleEf drv = runRIO env . \case
HSESetConfig (i, ()) conf -> do
logDebug (displayShow ("EYRE", "%set-config"))
Serv {..} <- restart drv conf

View File

@ -66,7 +66,7 @@ client
. (HasLogFunc e, HasKingId e)
=> e
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e HttpClientEf))
-> ([EvErr], RAcquire e (HttpClientEf -> IO ()))
client env plan = (initialEvents, runHttpClient)
kingId = view (kingIdL . to fromIntegral) env
@ -74,7 +74,7 @@ client env plan = (initialEvents, runHttpClient)
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv kingId) (bornFailed env)]
runHttpClient :: RAcquire e (EffCb e HttpClientEf)
runHttpClient :: RAcquire e (HttpClientEf -> IO ())
runHttpClient = handleEffect <$> mkRAcquire start stop
start :: RIO e (HttpClientDrv)
@ -88,10 +88,10 @@ client env plan = (initialEvents, runHttpClient)
liveThreads <- atomically $ readTVar hcdLive
mapM_ cancel liveThreads
handleEffect :: HttpClientDrv -> HttpClientEf -> RIO e ()
handleEffect :: HttpClientDrv -> HttpClientEf -> IO ()
handleEffect drv = \case
HCERequest _ id req -> newReq drv id req
HCECancelRequest _ id -> cancelReq drv id
HCERequest _ id req -> runRIO env (newReq drv id req)
HCECancelRequest _ id -> runRIO env (cancelReq drv id)
newReq :: HttpClientDrv -> ReqId -> HttpClientReq -> RIO e ()
newReq drv id req = do

View File

@ -12,7 +12,7 @@ module Urbit.Vere.Pier
, pier
, runPersist
, runCompute
, generateBootSeq
, genBootSeq
@ -49,7 +49,7 @@ import qualified Urbit.Vere.Term.Demux as Term
import qualified Urbit.Vere.Term.Render as Term
-- Initialize pier directory. --------------------------------------------------
setupPierDirectory :: FilePath -> RIO e ()
setupPierDirectory shipPath = do
@ -64,8 +64,8 @@ setupPierDirectory shipPath = do
genEntropy :: RIO e Word512
genEntropy = fromIntegral . bytesAtom <$> io (Ent.getEntropy 64)
generateBootSeq :: Ship -> Pill -> Bool -> LegacyBootEvent -> RIO e BootSeq
generateBootSeq ship Pill {..} lite boot = do
genBootSeq :: Ship -> Pill -> Bool -> LegacyBootEvent -> RIO e BootSeq
genBootSeq ship Pill {..} lite boot = do
ent <- genEntropy
let ovums = preKern ent <> pKernelOvums <> postKern <> pUserspaceOvums
pure $ BootSeq ident pBootFormulas ovums
@ -100,7 +100,7 @@ writeJobs log !jobs = do
jobPayload (DoWork (Work _ m d o )) = toNoun (m, d, o)
-- Boot a new ship. ------------------------------------------------------------
-- Acquire a running serf. -----------------------------------------------------
printTank :: (Text -> IO ()) -> Atom -> Tank -> IO ()
printTank f _ = io . f . unlines . fmap unTape . wash (WashCfg 0 80)
@ -125,6 +125,9 @@ runSerf vSlog pax fax = do
, scDead = pure () -- TODO: What can be done?
-- Boot a new ship. ------------------------------------------------------------
:: TVar (Text -> IO ())
-> Pill
@ -158,7 +161,7 @@ bootNewShip
-> LegacyBootEvent
-> RIO e ()
bootNewShip pill lite flags ship bootEv = do
seq@(BootSeq ident x y) <- generateBootSeq ship pill lite bootEv
seq@(BootSeq ident x y) <- genBootSeq ship pill lite bootEv
logTrace "BootSeq Computed"
pierPath <- view pierPathL
@ -203,22 +206,22 @@ resumed vSlog replayUntil flags = do
pure (serf, log)
getSnapshot :: forall e. FilePath -> Word64 -> RIO e (Maybe FilePath)
getSnapshot :: forall e . FilePath -> Word64 -> RIO e (Maybe FilePath)
getSnapshot top last = do
lastSnapshot <- lastMay <$> listReplays
pure (replayToPath <$> lastSnapshot)
replayDir = top </> ".partial-replay"
replayToPath eId = replayDir </> show eId
lastSnapshot <- lastMay <$> listReplays
pure (replayToPath <$> lastSnapshot)
replayDir = top </> ".partial-replay"
replayToPath eId = replayDir </> show eId
listReplays :: RIO e [Word64]
listReplays = do
createDirectoryIfMissing True replayDir
snapshotNums <- mapMaybe readMay <$> listDirectory replayDir
pure $ sort (filter (<= fromIntegral last) snapshotNums)
listReplays :: RIO e [Word64]
listReplays = do
createDirectoryIfMissing True replayDir
snapshotNums <- mapMaybe readMay <$> listDirectory replayDir
pure $ sort (filter (<= fromIntegral last) snapshotNums)
-- Run Pier --------------------------------------------------------------------
-- Utils for Spawning Worker Threads -------------------------------------------
acquireWorker :: HasLogFunc e => Text -> RIO e () -> RAcquire e (Async ())
acquireWorker nam act = mkRAcquire (async act) kill
@ -236,6 +239,9 @@ acquireWorkerBound nam act = mkRAcquire (asyncBound act) kill
cancel tid
logTrace ("Killed worker thread: " <> display nam)
-- Run Pier --------------------------------------------------------------------
:: (Serf, EventLog)
-> TVar (Text -> IO ())
@ -243,112 +249,106 @@ pier
-> MultiEyreApi
-> RAcquire PierEnv ()
pier (serf, log) vSlog mStart multi = do
computeQ <- newTQueueIO @_ @Serf.EvErr
persistQ <- newTQueueIO
executeQ <- newTQueueIO
saveM <- newEmptyTMVarIO
computeQ <- newTQueueIO @_ @Serf.EvErr
persistQ <- newTQueueIO
executeQ <- newTQueueIO
saveM <- newEmptyTMVarIO
kingApi <- King.kingAPI
kapi <- King.kingAPI
termApiQ <- atomically $ do
q <- newTQueue
writeTVar (King.kTermConn kingApi) (Just $ writeTQueue q)
pure q
termApiQ <- atomically $ do
q <- newTQueue
writeTVar (King.kTermConn kapi) (Just $ writeTQueue q)
pure q
(demux, muxed) <- atomically $ do
res <- Term.mkDemux
pure (res, Term.useDemux res)
-- (sz, local) <- Term.localClient
-- (waitExternalTerm, termServPort) <- Term.termServer
(demux, muxed) <- atomically $ do
res <- Term.mkDemux
-- Term.addDemux local res
pure (res, Term.useDemux res)
-- rio $ logInfo $ display $
-- "TERMSERV Terminal Server running on port: " <> tshow termServPort
acquireWorker "TERMINAL" $ forever $ do
logTrace "TERMSERV Waiting for external terminal."
atomically $ do
ext <- Term.connClient <$> readTQueue termApiQ
Term.addDemux ext demux
logTrace "TERMSERV External terminal connected."
-- Slogs go to both stderr and to the terminal.
acquireWorker "TERMSERV" $ forever $ do
logTrace "TERMSERV Waiting for external terminal."
atomically $ do
oldSlog <- readTVar vSlog
writeTVar vSlog $ \txt -> do
atomically $ Term.trace muxed txt
oldSlog txt
ext <- Term.connClient <$> readTQueue termApiQ
Term.addDemux ext demux
logTrace "TERMSERV External terminal connected."
let logId = Log.identity log
let ship = who logId
-- Slogs go to both stderr and to the terminal.
atomically $ do
oldSlog <- readTVar vSlog
writeTVar vSlog $ \txt -> do
atomically $ Term.trace muxed txt
oldSlog txt
-- Our call above to set the logging function which echos errors from the
-- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually
-- add them.
let showErr = atomically . Term.trace muxed . (flip append "\r\n")
let logId = Log.identity log
let ship = who logId
-- Our call above to set the logging function which echos errors from the
-- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually
-- add them.
let showErr = atomically . Term.trace muxed . (flip append "\r\n")
env <- ask
let (bootEvents, startDrivers) = drivers
(isFake logId)
(writeTQueue computeQ)
(Term.TSize { tsWide = 80, tsTall = 24 }, muxed)
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
scryM <- newEmptyTMVarIO
onKill <- view onKillPierSigL
let computeConfig = ComputeConfig { ccOnWork = readTQueue computeQ
, ccOnKill = onKill
, ccOnSave = takeTMVar saveM
, ccOnScry = takeTMVar scryM
, ccPutResult = writeTQueue persistQ
, ccShowSpinner = Term.spin muxed
, ccHideSpinner = Term.stopSpin muxed
, ccLastEvInLog = Log.lastEv log
let plan = writeTQueue executeQ
drivz <- startDrivers
tExec <- acquireWorker "Effects" (router (readTQueue executeQ) drivz)
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ plan)
tSerf <- acquireWorker "Serf" (runCompute serf computeConfig)
tSaveSignal <- saveSignalThread saveM
-- TODO bullshit scry tester
void $ acquireWorker "bullshit scry tester" $ forever $ do
env <- ask
threadDelay 1_000_000
wen <- io
let kal = \mTermNoun -> runRIO env $ do
logTrace $ displayShow ("scry result: ", mTermNoun)
let nkt = MkKnot $ tshow $ Time.MkDate wen
let pax = Path ["j", "~zod", "life", nkt, "~zod"]
atomically $ putTMVar scryM (wen, Nothing, pax, kal)
let (bootEvents, startDrivers) =
drivers env multi ship (isFake logId)
(writeTQueue computeQ)
(Term.TSize{tsWide=80, tsTall=24}, muxed)
putMVar mStart ()
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
-- Wait for something to die.
scryM <- newEmptyTMVarIO
let ded = asum
[ death "effects thread" tExec
, death "persist thread" tDisk
, death "compute thread" tSerf
onKill <- view onKillPierSigL
atomically ded >>= \case
Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
Right tag -> logError $ displayShow ("Something simply exited", tag)
let computeConfig = ComputeConfig
{ ccOnWork = readTQueue computeQ
, ccOnKill = onKill
, ccOnSave = takeTMVar saveM
, ccOnScry = takeTMVar scryM
, ccPutResult = writeTQueue persistQ
, ccShowSpinner = Term.spin muxed
, ccHideSpinner = Term.stopSpin muxed
, ccLastEvInLog = Log.lastEv log
let plan = writeTQueue executeQ
drivz <- startDrivers
tExec <- acquireWorker "Effects" (router (readTQueue executeQ) drivz)
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ plan)
tSerf <- acquireWorker "Serf" (runCompute serf computeConfig)
tSaveSignal <- saveSignalThread saveM
-- bullshit scry tester
void $ acquireWorker "bullshit scry tester" $ forever $ do
env <- ask
threadDelay 1_000_000
wen <- io
let kal = \mTermNoun -> runRIO env $ do
logTrace $ displayShow ("scry result: ", mTermNoun)
let nkt = MkKnot $ tshow $ Time.MkDate wen
let pax = Path ["j", "~zod", "life", nkt, "~zod"]
atomically $ putTMVar scryM (wen, Nothing, pax, kal)
putMVar mStart ()
-- Wait for something to die.
let ded = asum [ death "effects thread" tExec
, death "persist thread" tDisk
, death "compute thread" tSerf
atomically ded >>= \case
Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
Right tag -> logError $ displayShow ("Something simply exited", tag)
atomically $ (Term.spin muxed) (Just "shutdown")
atomically $ (Term.spin muxed) (Just "shutdown")
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
@ -368,14 +368,14 @@ saveSignalThread tm = mkRAcquire start cancel
-- Start All Drivers -----------------------------------------------------------
data Drivers e = Drivers
{ dAmes :: EffCb e AmesEf
, dBehn :: EffCb e BehnEf
, dHttpClient :: EffCb e HttpClientEf
, dHttpServer :: EffCb e HttpServerEf
, dNewt :: EffCb e NewtEf
, dSync :: EffCb e SyncEf
, dTerm :: EffCb e TermEf
{ dAmes :: AmesEf -> IO ()
, dBehn :: BehnEf -> IO ()
, dIris :: HttpClientEf -> IO ()
, dEyre :: HttpServerEf -> IO ()
, dNewt :: NewtEf -> IO ()
, dSync :: SyncEf -> IO ()
, dTerm :: TermEf -> IO ()
:: HasPierEnv e
@ -388,25 +388,26 @@ drivers
-> (Text -> RIO e ())
-> ([EvErr], RAcquire e (Drivers e))
drivers env multi who isFake plan termSys stderr =
(initialEvents, runDrivers) -- TODO
(behnBorn, runBehn) = behn env plan
(amesBorn, runAmes) = ames env who isFake plan stderr
(httpBorn, runHttp) = eyre env multi who plan isFake
(clayBorn, runClay) = clay env plan
(irisBorn, runIris) = client env plan
(termBorn, runTerm) = Term.term env termSys plan
initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn,
termBorn, irisBorn]
runDrivers = do
dNewt <- runAmes
dBehn <- liftAcquire $ runBehn
dAmes <- pure $ const $ pure ()
dHttpClient <- runIris
dHttpServer <- runHttp
dSync <- runClay
dTerm <- runTerm
pure (Drivers{..})
(initialEvents, runDrivers)
(behnBorn, runBehn) = behn env plan
(amesBorn, runAmes) = ames env who isFake plan stderr
(httpBorn, runEyre) = eyre env multi who plan isFake
(clayBorn, runClay) = clay env plan
(irisBorn, runIris) = client env plan
(termBorn, runTerm) = Term.term env termSys plan
initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn,
termBorn, irisBorn]
runDrivers = do
dNewt <- runAmes
dBehn <- liftAcquire $ runBehn
dAmes <- pure $ const $ pure ()
dIris <- runIris
dEyre <- runEyre
dSync <- runClay
dTerm <- runTerm
pure (Drivers{..})
-- Route Effects to Drivers ----------------------------------------------------
@ -419,19 +420,19 @@ router waitFx Drivers {..} = forever $ do
case ef of
GoodParse (EfVega _ _ ) -> error "TODO"
GoodParse (EfExit _ _ ) -> error "TODO"
GoodParse (EfVane (VEAmes ef)) -> dAmes ef
GoodParse (EfVane (VEBehn ef)) -> dBehn ef
GoodParse (EfVane (VEBoat ef)) -> dSync ef
GoodParse (EfVane (VEClay ef)) -> dSync ef
GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef
GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef
GoodParse (EfVane (VENewt ef)) -> dNewt ef
GoodParse (EfVane (VESync ef)) -> dSync ef
GoodParse (EfVane (VETerm ef)) -> dTerm ef
GoodParse (EfVane (VEAmes ef)) -> io (dAmes ef)
GoodParse (EfVane (VEBehn ef)) -> io (dBehn ef)
GoodParse (EfVane (VEBoat ef)) -> io (dSync ef)
GoodParse (EfVane (VEClay ef)) -> io (dSync ef)
GoodParse (EfVane (VEHttpClient ef)) -> io (dIris ef)
GoodParse (EfVane (VEHttpServer ef)) -> io (dEyre ef)
GoodParse (EfVane (VENewt ef)) -> io (dNewt ef)
GoodParse (EfVane (VESync ef)) -> io (dSync ef)
GoodParse (EfVane (VETerm ef)) -> io (dTerm ef)
FailParse n -> logError $ display $ pack @Text (ppShow n)
-- Compute Thread --------------------------------------------------------------
-- Compute (Serf) Thread -------------------------------------------------------
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev = logDebug $ display $ "[EVENT]\n" <> pretty
@ -473,8 +474,9 @@ runCompute serf ComputeConfig {..} = do
void $ async $ forever (atomically (takeTMVar vEvProcessing) >>= logEvent)
let onSpin :: Maybe Ev -> STM ()
onSpin Nothing = ccHideSpinner
onSpin (Just ev) = do
onSpin = \case
Nothing -> ccHideSpinner
Just ev -> do
ccShowSpinner (getSpinnerNameForEvent ev)
putTMVar vEvProcessing ev
@ -483,7 +485,7 @@ runCompute serf ComputeConfig {..} = do
io ( serf maxBatchSize ccLastEvInLog onCR ccPutResult onSpin)
-- Persist Thread --------------------------------------------------------------
-- Event-Log Persistence Thread ------------------------------------------------
data PersistExn = BadEventId EventId EventId
deriving Show

View File

@ -107,14 +107,10 @@ data Order
deriveToNoun ''Order
type EffCb e a = a -> RIO e ()
type Perform = Ef -> IO ()
data IODriver = IODriver
{ bornEvent :: IO Ev
, startDriver :: (Ev -> STM ()) -> IO (Async (), Perform)
{ bornEvent :: IO Ev
, startDriver :: (Ev -> STM ()) -> IO (Async (), Ef -> IO ())
data Fact = Fact
{ factEve :: EventId

View File

@ -505,7 +505,7 @@ term :: forall e. (HasPierEnv e)
=> e
-> (T.TSize, Client)
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e TermEf))
-> ([EvErr], RAcquire e (TermEf -> IO ()))
term env (tsize, Client{..}) plan =
(initialEvents, runTerm)
@ -516,10 +516,10 @@ term env (tsize, Client{..}) plan =
, EvErr initialHail (initialHailFailed env)
runTerm :: RAcquire e (EffCb e TermEf)
runTerm :: RAcquire e (TermEf -> IO ())
runTerm = do
tim <- mkRAcquire (async readLoop) cancel
pure handleEffect
pure (runRIO env . handleEffect)
Because our terminals are always `Demux`ed, we don't have to