shrub/pkg/king/lib/Vere/Pier.hs

326 lines
11 KiB
Haskell
Raw Normal View History

2019-07-16 03:01:45 +03:00
{-# OPTIONS_GHC -Wwarn #-}
2019-08-22 02:49:08 +03:00
module Vere.Pier
( booted, resumed, pier, runPersist, runCompute, generateBootSeq
) where
2019-07-16 03:01:45 +03:00
import UrbitPrelude
import Arvo
import Vere.Pier.Types
import System.Random
import System.Directory (createDirectoryIfMissing)
import System.Posix.Files (ownerModes, setFileMode)
import Vere.Ames (ames)
import Vere.Behn (behn)
import Vere.Http.Server (serv)
import Vere.Log (EventLog)
2019-09-04 01:17:20 +03:00
import Vere.Serf (Serf, sStderr, SerfState(..), doJob)
import Vere.Term
2019-07-16 03:01:45 +03:00
import qualified System.Entropy as Ent
import qualified Urbit.Time as Time
2019-07-16 03:01:45 +03:00
import qualified Vere.Log as Log
import qualified Vere.Serf as Serf
--------------------------------------------------------------------------------
-- | Placeholder list of IO drivers; it is currently empty.
_ioDrivers :: [IODriver]
_ioDrivers = []
2019-08-28 14:45:49 +03:00
-- | Create the working subdirectories of a pier.
--
-- For each of @put@, @get@, @log@ and @chk@, the directory
-- @\<shipPath\>/.urb/\<name\>@ is created (including missing parents) and
-- its mode is then set to 'ownerModes'.
setupPierDirectory :: FilePath -> RIO e ()
setupPierDirectory shipPath =
    for_ subdirs $ \name -> do
        let dir = shipPath <> "/.urb/" <> name
        io (createDirectoryIfMissing True dir)
        io (setFileMode dir ownerModes)
  where
    subdirs :: [FilePath]
    subdirs = ["put", "get", "log", "chk"]
-- Load pill into boot sequence. -----------------------------------------------
2019-07-16 03:01:45 +03:00
2019-08-28 14:45:49 +03:00
-- | Read 64 bytes from the system entropy source and convert them to a
-- 'Word512' (via the 'atomBytes' isomorphism, then 'fromIntegral').
genEntropy :: RIO e Word512
genEntropy = do
    bytes <- io (Ent.getEntropy 64)
    pure (fromIntegral (view (from atomBytes) bytes))
2019-07-16 03:01:45 +03:00
2019-08-28 14:45:49 +03:00
-- | Build the boot sequence for @ship@ from the contents of a pill.
--
-- The resulting 'BootSeq' carries the log identity, the pill's boot
-- formulas, and an ovum list made of three generated events (whom, wack
-- with fresh entropy, terminal boot) followed by the pill's kernel ovums
-- and then its userspace ovums.
generateBootSeq :: Ship -> Pill -> RIO e BootSeq
generateBootSeq ship Pill{..} =
    genEntropy <&> \entropy ->
        BootSeq bootIdent pBootFormulas
            (preKernel entropy <> pKernelOvums <> pUserspaceOvums)
  where
    -- The third field is the number of boot formulas in the pill.
    -- NOTE(review): the literal 'True' presumably flags a fake ship,
    -- matching the 'Fake' boot event below -- confirm against 'LogIdentity'.
    bootIdent = LogIdentity ship True (fromIntegral (length pBootFormulas))

    -- Events injected ahead of the pill's own kernel ovums.
    preKernel entropy =
        [ EvBlip (BlipEvArvo (ArvoEvWhom () ship))
        , EvBlip (BlipEvArvo (ArvoEvWack () entropy))
        , EvBlip (BlipEvTerm (TermEvBoot (1,()) (Fake (who bootIdent))))
        ]
-- Write a batch of jobs into the event log ------------------------------------
2019-08-28 14:45:49 +03:00
-- | Serialize a batch of jobs and append them to the event log.
--
-- Jobs are expected to carry consecutive event ids starting at the log's
-- next event id ('Log.nextEv'); a mismatch aborts with 'error'.
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
    firstId <- Log.nextEv log
    encoded <- for (zip [firstId ..] (toList jobs)) $ \(wantId, job) -> do
        unless (wantId == jobId job) $
            error $ show ("bad job id!", wantId, jobId job)
        pure (jamBS (payload job))
    Log.appendEvents log (fromList encoded)
  where
    -- The noun that is jammed for each kind of job; the first field of
    -- the inner record is not part of the payload.
    payload :: Job -> Noun
    payload = \case
        RunNok (LifeCyc _ m n) -> toNoun (m, n)
        DoWork (Work _ m d o)  -> toNoun (m, d, o)
-- Boot a new ship. ------------------------------------------------------------
2019-08-28 14:45:49 +03:00
-- | Boot a new ship from a pill.
--
-- Steps, in order: load the pill from @pillPath@ (throwing on a load
-- failure), generate the boot sequence, set up the pier directory, create
-- a new event log under @\<pierPath\>/.urb/log@, start the serf, run the
-- boot sequence on it, take a snapshot, and write the resulting events to
-- the log. Returns the serf, the event log and the serf state produced by
-- the boot sequence.
booted :: HasLogFunc e
       => FilePath -> FilePath -> Serf.Flags -> Ship
       -> RAcquire e (Serf e, EventLog, SerfState)
booted pillPath pierPath flags ship = do
    rio $ logTrace "LOADING PILL"

    pill <- io $ either throwIO pure =<< loadFile pillPath

    rio $ logTrace "PILL LOADED"

    bootSeq@(BootSeq ident _ _) <- rio $ generateBootSeq ship pill

    rio $ logTrace "BootSeq Computed"

    liftRIO (setupPierDirectory pierPath)

    rio $ logTrace "Directory Setup"

    evLog <- Log.new (pierPath <> "/.urb/log") ident

    rio $ logTrace "Event Log Initialized"

    serf <- Serf.run (Serf.Config pierPath flags)

    rio $ logTrace "Serf Started"

    -- NOTE(review): the snapshot is taken before the boot events are
    -- appended to the log -- confirm that this ordering is intended.
    rio $ do
        (events, serfSt) <- Serf.bootFromSeq serf bootSeq
        logTrace "Boot Sequence completed"
        Serf.snapshot serf serfSt
        logTrace "Snapshot taken"
        writeJobs evLog (fromList events)
        logTrace "Events written"
        pure (serf, evLog, serfSt)
-- Resume an existing ship. ----------------------------------------------------
2019-08-28 15:22:56 +03:00
-- | Resume an existing ship.
--
-- Opens the existing event log under @\<top\>/.urb/log@, starts a serf for
-- the pier at @top@, replays the log on the serf, and snapshots the
-- resulting state. Returns the serf, the log and that state.
resumed :: HasLogFunc e
        => FilePath -> Serf.Flags
        -> RAcquire e (Serf e, EventLog, SerfState)
resumed top flags = do
    evLog    <- Log.existing (top <> "/.urb/log")
    serf     <- Serf.run (Serf.Config top flags)
    replayed <- rio (Serf.replay serf evLog)
    rio (Serf.snapshot serf replayed)
    pure (serf, evLog, replayed)
-- Run Pier --------------------------------------------------------------------
2019-08-28 14:45:49 +03:00
-- | Run a pier: wire the compute, persist and effect threads together and
-- block until one of them (or the terminal reader thread) terminates.
--
-- Arguments: the pier directory, an optional port handed to the drivers,
-- and the @(serf, event log, serf state)@ triple as returned by 'booted'
-- or 'resumed'.
--
-- Data flow between the queues set up here:
--
--   * drivers write events to @computeQ@;
--   * 'runCompute' reads @computeQ@ and writes @(job, effects)@ pairs to
--     @persistQ@;
--   * 'runPersist' reads @persistQ@ and writes effects to @executeQ@;
--   * 'router' reads @executeQ@ and hands each effect to a driver.
--
-- The explicit @forall e.@ is required: the type annotations inside the
-- body refer to the signature's @e@.
pier :: forall e. HasLogFunc e
     => FilePath
     -> Maybe Port
     -> (Serf e, EventLog, SerfState)
     -> RAcquire e ()
pier pierPath mPort (serf, log, ss) = do
    computeQ <- newTQueueIO :: RAcquire e (TQueue Ev)
    persistQ <- newTQueueIO :: RAcquire e (TQueue (Job, FX))
    executeQ <- newTQueueIO :: RAcquire e (TQueue FX)

    -- Random identifier for this run, derived from a random 'Word16'.
    inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16)

    terminalSystem <- initializeLocalTerminal

    -- Point the serf's stderr handler at the terminal system.
    serf <- pure serf { sStderr = (tsStderr terminalSystem) }

    let ship = who (Log.identity log)

    let (bootEvents, startDrivers) =
          drivers pierPath inst ship mPort (writeTQueue computeQ) terminalSystem

    -- Enqueue the drivers' initial events before any thread is started.
    io $ atomically $ for_ bootEvents (writeTQueue computeQ)

    tExe  <- startDrivers >>= router (readTQueue executeQ)
    tDisk <- runPersist log persistQ (writeTQueue executeQ)
    tCpu  <- runCompute serf ss (readTQueue computeQ) (writeTQueue persistQ)

    -- Wait for something to die.
    let ded = asum [ death "effect thread" tExe
                   , death "persist thread" tDisk
                   , death "compute thread" tCpu
                   , death "terminal thread" (tsReaderThread terminalSystem)
                   ]

    atomically ded >>= \case
        Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
        Right tag       -> logError $ displayShow ("something simply exited", tag)
-- | STM action that completes once the given thread has finished, labelling
-- the outcome with @tag@: 'Left' with the exception if the thread died with
-- one, 'Right' if it returned normally.
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = label <$> waitCatchSTM tid
  where
    label = either (\exn -> Left (tag, exn)) (\() -> Right tag)
-- Start All Drivers -----------------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | One effect callback per kind of vane effect; 'router' dispatches each
-- parsed effect to the matching field.
data Drivers e = Drivers
    { dAmes       :: EffCb e AmesEf        -- ^ Callback for 'AmesEf' effects.
    , dBehn       :: EffCb e BehnEf        -- ^ Callback for 'BehnEf' effects.
    , dHttpClient :: EffCb e HttpClientEf  -- ^ Callback for 'HttpClientEf' effects.
    , dHttpServer :: EffCb e HttpServerEf  -- ^ Callback for 'HttpServerEf' effects.
    , dNewt       :: EffCb e NewtEf        -- ^ Callback for 'NewtEf' effects.
    , dSync       :: EffCb e SyncEf        -- ^ Callback for 'SyncEf' effects.
    , dTerm       :: EffCb e TermEf        -- ^ Callback for 'TermEf' effects.
    }
2019-08-29 03:26:59 +03:00
-- | Assemble all IO drivers.
--
-- Returns the concatenated initial events of the behn, ames, http-server
-- and terminal drivers (in that order), together with an action that
-- starts those drivers and yields their effect callbacks. The ames, http
-- client and sync callbacks ignore their effect.
drivers :: HasLogFunc e
        => FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ())
        -> TerminalSystem e
        -> ([Ev], RAcquire e (Drivers e))
drivers pierPath inst who mPort plan termSys =
    (behnBorn <> amesBorn <> httpBorn <> termBorn, startAll)
  where
    (behnBorn, runBehn) = behn inst plan
    (amesBorn, runAmes) = ames inst who mPort plan
    (httpBorn, runHttp) = serv pierPath inst plan
    (termBorn, runTerm) = term termSys pierPath inst plan

    -- Callback that discards its effect.
    ignore _ = pure ()

    -- Start order: ames, behn, http server, terminal.
    startAll = do
        newtCb <- liftAcquire runAmes
        behnCb <- liftAcquire runBehn
        httpCb <- runHttp
        termCb <- runTerm
        pure Drivers
            { dAmes       = ignore
            , dBehn       = behnCb
            , dHttpClient = ignore
            , dHttpServer = httpCb
            , dNewt       = newtCb
            , dSync       = ignore
            , dTerm       = termCb
            }
-- Route Effects to Drivers ----------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | Start the effect-routing thread.
--
-- The thread loops forever: it takes a batch of effects from @waitFx@,
-- logs each one, and hands it to the matching 'Drivers' callback. Effects
-- that failed to parse are logged as errors. The thread is cancelled when
-- the resource is released.
router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ())
router waitFx Drivers{..} =
    mkRAcquire spawn cancel
  where
    spawn = async $ forever $ do
        batch <- atomically waitFx
        for_ batch $ \eff -> do
            logEffect eff
            dispatch eff

    -- Vega and exit effects are not implemented yet and abort the thread.
    -- Boat, clay and sync effects all go to the sync callback.
    dispatch = \case
        GoodParse (EfVega _ _)              -> error "TODO"
        GoodParse (EfExit _ _)              -> error "TODO"
        GoodParse (EfVane (VEAmes x))       -> dAmes x
        GoodParse (EfVane (VEBehn x))       -> dBehn x
        GoodParse (EfVane (VEBoat x))       -> dSync x
        GoodParse (EfVane (VEClay x))       -> dSync x
        GoodParse (EfVane (VEHttpClient x)) -> dHttpClient x
        GoodParse (EfVane (VEHttpServer x)) -> dHttpServer x
        GoodParse (EfVane (VENewt x))       -> dNewt x
        GoodParse (EfVane (VESync x))       -> dSync x
        GoodParse (EfVane (VETerm x))       -> dTerm x
        FailParse n                         -> logError
                                             $ display
                                             $ pack @Text (ppShow n)
-- Compute Thread --------------------------------------------------------------
2019-08-28 14:45:49 +03:00
-- | Log an event at debug level: an @[EVENT]@ header line followed by the
-- pretty-printed event, each line prefixed with a tab.
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev =
    logDebug (display ("[EVENT]\n" <> body))
  where
    body :: Text
    body = (pack . unlines . fmap ("\t" <>) . lines . ppShow) ev
-- | Log an effect at debug level: an @[EFFECT]@ header line followed by the
-- pretty-printed effect (or, for a failed parse, the pretty-printed value
-- it carries), each line prefixed with a tab.
logEffect :: HasLogFunc e => Lenient Ef -> RIO e ()
logEffect ef =
    logDebug (display ("[EFFECT]\n" <> body))
  where
    body :: Text
    body = case ef of
        GoodParse e -> tabbed (ppShow e)
        FailParse n -> tabbed (ppShow n)

    -- Prefix every line with a tab and pack the result.
    tabbed :: String -> Text
    tabbed = pack . unlines . fmap ("\t" <>) . lines
-- | Start the compute thread.
--
-- The thread loops forever: it takes the next event from @getEvent@, logs
-- it, stamps it with the current time, and runs it on the serf as a
-- 'DoWork' job whose event id and mug come from the current 'SerfState'.
-- The resulting job and its effects are passed to @putResult@, and the
-- loop continues with the state returned by the serf. The thread is
-- cancelled when the resource is released.
--
-- The explicit @forall e.@ is required: the local signature of @go@
-- refers to the outer @e@ so that the 'HasLogFunc' constraint applies.
runCompute :: forall e. HasLogFunc e
           => Serf e -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
           -> RAcquire e (Async ())
runCompute serf ss getEvent putResult =
    mkRAcquire (async (go ss)) cancel
  where
    go :: SerfState -> RIO e ()
    go st = do
        ev  <- atomically getEvent
        logEvent ev
        wen <- io Time.now
        let eId = ssNextEv st
            mug = ssLastMug st
        (job', st', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
        atomically (putResult (job', fx))
        go st'
2019-07-20 06:00:23 +03:00
-- Persist Thread --------------------------------------------------------------
-- | Errors raised by the persist thread.
data PersistExn
    = BadEventId EventId EventId
      -- ^ A job's event id did not match the log's next id. The first
      -- field is the expected id, the second the id actually received.
  deriving (Show)

instance Exception PersistExn where
    displayException = \case
        BadEventId expected got ->
            unlines [ "Out-of-order event id send to persist thread."
                    , "\tExpected " <> show expected <> " but got " <> show got
                    ]
-- | Start the persist thread.
--
-- The thread loops forever: it takes every @(job, effects)@ pair currently
-- queued on @inpQ@ (blocking until there is at least one), checks that the
-- job ids continue the event log without gaps, appends the serialized
-- events to the log, and only then passes each job's effects to @out@.
--
-- Throws 'BadEventId' inside the thread when a job's id is not the next
-- expected one. On release the thread is cancelled and its termination is
-- awaited.
runPersist :: EventLog
           -> TQueue (Job, FX)
           -> (FX -> STM ())
           -> RAcquire e (Async ())
runPersist log inpQ out =
    mkRAcquire runThread cancelWait
  where
    -- Cancel the thread and wait for it to finish. The outcome is
    -- collected with 'waitCatchSTM' rather than 'wait': 'wait' rethrows
    -- the exception the thread ended with, which after a cancel would
    -- make the release action itself throw.
    cancelWait :: Async () -> RIO e ()
    cancelWait tid = do
        cancel tid
        () <$ atomically (waitCatchSTM tid)

    -- NOTE(review): 'asyncBound' runs the loop on a bound OS thread,
    -- presumably a requirement of the event-log backend -- confirm.
    runThread :: RIO e (Async ())
    runThread = asyncBound $ forever $ do
        writs  <- atomically getBatchFromQueue
        events <- validateJobsAndGetBytes (toNullable writs)
        Log.appendEvents log events
        atomically $ for_ writs $ \(_, fx) -> out fx

    -- Check that the batch's job ids are consecutive, starting at the
    -- log's next event id, and serialize each job's (mug, time, event).
    validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString)
    validateJobsAndGetBytes writs = do
        expect <- Log.nextEv log
        fmap fromList
            $ for (zip [expect ..] writs)
            $ \(expectedId, (j, _)) -> do
                unless (expectedId == jobId j) $
                    throwIO (BadEventId expectedId (jobId j))
                case j of
                    RunNok _ ->
                        error "This shouldn't happen here!"
                    DoWork (Work _ mug wen ev) ->
                        pure $ jamBS $ toNoun (mug, wen, ev)

    -- Block for the first item, then drain whatever else is queued
    -- without blocking. Items are prepended while draining and the
    -- batch is reversed at the end, so queue order is preserved.
    getBatchFromQueue :: STM (NonNull [(Job, FX)])
    getBatchFromQueue =
        readTQueue inpQ >>= go . singleton
      where
        go acc =
            tryReadTQueue inpQ >>= \case
                Nothing   -> pure (reverse acc)
                Just item -> go (item <| acc)