urbit/pkg/king/lib/Vere/Pier.hs

371 lines
12 KiB
Haskell
Raw Normal View History

2019-07-16 03:01:45 +03:00
{-# OPTIONS_GHC -Wwarn #-}
2019-08-22 02:49:08 +03:00
module Vere.Pier
( booted, resumed, pier, runPersist, runCompute, generateBootSeq
) where
2019-07-16 03:01:45 +03:00
import UrbitPrelude
import Arvo
import Vere.Pier.Types
import System.Random
import System.Directory (createDirectoryIfMissing)
import System.Posix.Files (ownerModes, setFileMode)
import Vere.Ames (ames)
import Vere.Behn (behn)
import Vere.Http.Client (client)
import Vere.Http.Server (serv)
import Vere.Log (EventLog)
2019-09-04 01:17:20 +03:00
import Vere.Serf (Serf, sStderr, SerfState(..), doJob)
import Vere.Clay (clay)
import Vere.Term
2019-07-16 03:01:45 +03:00
import qualified System.Entropy as Ent
import qualified Urbit.Time as Time
2019-07-16 03:01:45 +03:00
import qualified Vere.Log as Log
import qualified Vere.Serf as Serf
--------------------------------------------------------------------------------
-- | An empty list of IO drivers.
_ioDrivers :: [IODriver]
_ioDrivers = []
2019-08-28 14:45:49 +03:00
-- | Create the @put@, @get@, @log@ and @chk@ directories under
-- @\<shipPath\>/.urb/@ and set the mode of each one to 'ownerModes'.
setupPierDirectory :: FilePath -> RIO e ()
setupPierDirectory shipPath =
    for_ ["put", "get", "log", "chk"] prepare
  where
    urbDir = shipPath <> "/.urb/"

    -- The 'True' flag asks for missing parent directories to be created too.
    prepare name = do
        let dir = urbDir <> name
        io (createDirectoryIfMissing True dir)
        io (setFileMode dir ownerModes)
-- Load pill into boot sequence. -----------------------------------------------
2019-07-16 03:01:45 +03:00
2019-08-28 14:45:49 +03:00
-- | Read 64 bytes from the system entropy source and convert them, via
-- 'atomBytes', into a 'Word512'.
genEntropy :: RIO e Word512
genEntropy = do
    bytes <- io (Ent.getEntropy 64)
    pure (fromIntegral (view (from atomBytes) bytes))
2019-07-16 03:01:45 +03:00
2019-08-28 14:45:49 +03:00
-- | Build the boot sequence for a ship from a pill.
--
-- Three events are placed in front of the pill's kernel and userspace ovums:
-- one naming the ship, one carrying freshly generated entropy, and a terminal
-- boot event for a 'Fake' ship. The log identity records the ship and the
-- number of boot formulas in the pill.
generateBootSeq :: Ship -> Pill -> RIO e BootSeq
generateBootSeq ship Pill{..} =
    assemble <$> genEntropy
  where
    -- NOTE(review): the 'True' field presumably marks the ship as fake, which
    -- would match the 'Fake' boot event below -- confirm against LogIdentity.
    ident = LogIdentity ship True (fromIntegral $ length pBootFormulas)

    assemble ent =
        BootSeq ident pBootFormulas
            (preKern ent <> pKernelOvums <> pUserspaceOvums)

    preKern ent =
        [ EvBlip $ BlipEvArvo $ ArvoEvWhom () ship
        , EvBlip $ BlipEvArvo $ ArvoEvWack () ent
        , EvBlip $ BlipEvTerm $ TermEvBoot (1,()) (Fake (who ident))
        ]
-- Write a batch of jobs into the event log ------------------------------------
2019-08-28 14:45:49 +03:00
-- | Serialize a batch of jobs and append them to the event log.
--
-- The jobs must carry consecutive ids starting at the log's next event id;
-- a mismatch aborts with an 'error' call.
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
    firstId <- Log.nextEv log
    encoded <- for (zip [firstId..] (toList jobs)) $ \(expectedId, job) -> do
        unless (expectedId == jobId job) $
            error $ show ("bad job id!", expectedId, jobId job)
        pure (jamBS (payload job))
    Log.appendEvents log (fromList encoded)
  where
    -- The noun written to disk leaves out each job's first field.
    payload :: Job -> Noun
    payload = \case
        RunNok (LifeCyc _ m n) -> toNoun (m, n)
        DoWork (Work _ m d o)  -> toNoun (m, d, o)
-- Boot a new ship. ------------------------------------------------------------
2019-08-28 14:45:49 +03:00
-- | Boot a new ship from a pill.
--
-- Steps, in order: load the pill (throwing if it fails to load), compute the
-- boot sequence, create the pier directories, create the event log, start the
-- serf, run the boot sequence through the serf, take a snapshot, and write
-- the resulting events to the log. A trace message is logged after each step.
--
-- Returns the running serf, the event log and the serf state after boot.
booted :: HasLogFunc e
       => FilePath -> FilePath -> Serf.Flags -> Ship
       -> RAcquire e (Serf e, EventLog, SerfState)
booted pillPath pierPath flags ship = do
    rio (logTrace "LOADING PILL")
    pill <- io (loadFile pillPath >>= either throwIO pure)
    rio (logTrace "PILL LOADED")

    bootSeq@(BootSeq ident _ _) <- rio (generateBootSeq ship pill)
    rio (logTrace "BootSeq Computed")

    liftRIO (setupPierDirectory pierPath)
    rio (logTrace "Directory Setup")

    evLog <- Log.new (pierPath <> "/.urb/log") ident
    rio (logTrace "Event Log Initialized")

    serf <- Serf.run (Serf.Config pierPath flags)
    rio (logTrace "Serf Started")

    rio $ do
        (events, serfSt) <- Serf.bootFromSeq serf bootSeq
        logTrace "Boot Sequence completed"

        Serf.snapshot serf serfSt
        logTrace "Snapshot taken"

        writeJobs evLog (fromList events)
        logTrace "Events written"

        pure (serf, evLog, serfSt)
-- Resume an existing ship. ----------------------------------------------------
2019-08-28 15:22:56 +03:00
-- | Resume an existing ship.
--
-- Opens the event log under @\<top\>/.urb/log@, starts the serf, replays the
-- log into it, and takes a snapshot of the resulting state.
--
-- Returns the running serf, the event log and the serf state after replay.
resumed :: HasLogFunc e
        => FilePath -> Serf.Flags
        -> RAcquire e (Serf e, EventLog, SerfState)
resumed top flags = do
    evLog    <- Log.existing (top <> "/.urb/log")
    serf     <- Serf.run (Serf.Config top flags)
    replayed <- rio (Serf.replay serf evLog)
    rio (Serf.snapshot serf replayed)
    pure (serf, evLog, replayed)
-- Run Pier --------------------------------------------------------------------
2019-08-28 14:45:49 +03:00
-- | Run a pier: wire the serf, the event log and the IO drivers together and
-- block until one of the worker threads stops.
--
-- Data flows through three queues:
--
--   * @computeQ@: events waiting for the compute thread ('runCompute');
--   * @persistQ@: (job, effects) pairs waiting for the persist thread
--     ('runPersist');
--   * @executeQ@: effects waiting to be routed to the drivers ('router').
--
-- @saveM@ is filled periodically by 'saveSignalThread' to request a snapshot,
-- and @shutdownM@ is filled through the @shutdownEvent@ action handed to the
-- drivers.
--
-- The explicit @forall e.@ is required: the annotations in the body refer to
-- the same @e@ as the signature.
pier :: forall e. HasLogFunc e
     => FilePath
     -> Maybe Port
     -> (Serf e, EventLog, SerfState)
     -> RAcquire e ()
pier pierPath mPort (serf, log, ss) = do
    computeQ  <- newTQueueIO :: RAcquire e (TQueue Ev)
    persistQ  <- newTQueueIO :: RAcquire e (TQueue (Job, FX))
    executeQ  <- newTQueueIO :: RAcquire e (TQueue FX)

    saveM     <- newEmptyTMVarIO :: RAcquire e (TMVar ())
    shutdownM <- newEmptyTMVarIO :: RAcquire e (TMVar ())

    let shutdownEvent = putTMVar shutdownM ()

    -- A random 16-bit identifier for this run of the king.
    inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16)

    terminalSystem <- initializeLocalTerminal

    -- Replace the contents of the serf's stderr MVar with the terminal
    -- system's stderr value; the previous contents are discarded.
    swapMVar (sStderr serf) (tsStderr terminalSystem)

    let ship = who (Log.identity log)

    let (bootEvents, startDrivers) =
            drivers pierPath inst ship mPort (writeTQueue computeQ)
                shutdownEvent terminalSystem

    -- Queue the drivers' initial events before any thread is started.
    io $ atomically $ for_ bootEvents (writeTQueue computeQ)

    tExe  <- startDrivers >>= router (readTQueue executeQ)
    tDisk <- runPersist log persistQ (writeTQueue executeQ)
    tCpu  <- runCompute serf ss (readTQueue computeQ) (takeTMVar saveM)
                 (takeTMVar shutdownM) (writeTQueue persistQ)

    -- Not watched below; acquired so that it is cancelled on release.
    tSaveSignal <- saveSignalThread saveM

    -- Wait for something to die.

    let ded = asum [ death "effect thread" tExe
                   , death "persist thread" tDisk
                   , death "compute thread" tCpu
                   ]

    atomically ded >>= \case
        Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
        Right tag       -> logError $ displayShow ("something simply exited", tag)
-- | Wait, in 'STM', for a thread to finish and label the outcome with @tag@:
-- 'Left' with the tag and the exception if it died, 'Right' with the tag if
-- it returned normally.
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = fmap outcome (waitCatchSTM tid)
  where
    outcome (Left exn) = Left (tag, exn)
    outcome (Right ()) = Right tag
-- | Start a thread that puts @()@ into the given 'TMVar' every 120 seconds.
-- The thread is cancelled when the resource is released.
saveSignalThread :: TMVar () -> RAcquire e (Async ())
saveSignalThread tm = mkRAcquire (async ticker) cancel
  where
    ticker = forever $ do
        threadDelay (120 * 1000000) -- 120 seconds
        atomically (putTMVar tm ())
-- Start All Drivers -----------------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | The effect callbacks of all running IO drivers, one field per effect
-- type. 'router' applies the matching field to each parsed effect.
data Drivers e = Drivers
    { dAmes       :: EffCb e AmesEf        -- ^ Callback for 'AmesEf' effects.
    , dBehn       :: EffCb e BehnEf        -- ^ Callback for 'BehnEf' effects.
    , dHttpClient :: EffCb e HttpClientEf  -- ^ Callback for 'HttpClientEf' effects.
    , dHttpServer :: EffCb e HttpServerEf  -- ^ Callback for 'HttpServerEf' effects.
    , dNewt       :: EffCb e NewtEf        -- ^ Callback for 'NewtEf' effects.
    , dSync       :: EffCb e SyncEf        -- ^ Callback for 'SyncEf' effects.
    , dTerm       :: EffCb e TermEf        -- ^ Callback for 'TermEf' effects.
    }
2019-08-29 03:26:59 +03:00
-- | Set up every IO driver.
--
-- Returns the drivers' initial events together with an action that acquires
-- all drivers and yields their effect callbacks. The drivers are acquired in
-- the order ames, behn, http client, http server, clay, terminal.
--
-- The callback from the ames driver is stored in 'dNewt'; 'dAmes' is a
-- callback that ignores its effect.
drivers :: HasLogFunc e
        => FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ()) -> STM()
        -> TerminalSystem e
        -> ([Ev], RAcquire e (Drivers e))
drivers pierPath inst who mPort plan shutdownSTM termSys =
    (bornEvents, acquireAll)
  where
    (behnBorn, runBehn) = behn inst plan
    (amesBorn, runAmes) = ames inst who mPort plan
    (httpBorn, runHttp) = serv pierPath inst plan
    (clayBorn, runClay) = clay pierPath inst plan
    (irisBorn, runIris) = client inst plan
    (termBorn, runTerm) = term termSys shutdownSTM pierPath inst plan

    bornEvents =
        mconcat [behnBorn, clayBorn, amesBorn, httpBorn, termBorn, irisBorn]

    acquireAll = do
        newtCb <- liftAcquire runAmes
        behnCb <- liftAcquire runBehn
        irisCb <- runIris
        httpCb <- runHttp
        clayCb <- runClay
        termCb <- runTerm
        pure Drivers
            { dAmes       = const (pure ())
            , dBehn       = behnCb
            , dHttpClient = irisCb
            , dHttpServer = httpCb
            , dNewt       = newtCb
            , dSync       = clayCb
            , dTerm       = termCb
            }
-- Route Effects to Drivers ----------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | Start the thread that routes effects to drivers.
--
-- The thread repeatedly takes a batch of effects from @waitFx@, logs each
-- one, and passes it to the matching driver callback. Effects that failed to
-- parse are logged as errors. @EfVega@ and @EfExit@ are not implemented and
-- abort the thread with @error "TODO"@.
router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ())
router waitFx Drivers{..} =
    mkRAcquire (async loop) cancel
  where
    loop = forever $ do
        batch <- atomically waitFx
        for_ batch $ \effect -> do
            logEffect effect
            dispatch effect

    -- Boat, clay and sync effects all go to the same callback.
    dispatch effect = case effect of
        GoodParse (EfVega _ _)               -> error "TODO"
        GoodParse (EfExit _ _)               -> error "TODO"
        GoodParse (EfVane (VEAmes x))        -> dAmes x
        GoodParse (EfVane (VEBehn x))        -> dBehn x
        GoodParse (EfVane (VEBoat x))        -> dSync x
        GoodParse (EfVane (VEClay x))        -> dSync x
        GoodParse (EfVane (VEHttpClient x))  -> dHttpClient x
        GoodParse (EfVane (VEHttpServer x))  -> dHttpServer x
        GoodParse (EfVane (VENewt x))        -> dNewt x
        GoodParse (EfVane (VESync x))        -> dSync x
        GoodParse (EfVane (VETerm x))        -> dTerm x
        FailParse n                          -> logError
                                              $ display
                                              $ pack @Text (ppShow n)
-- Compute Thread --------------------------------------------------------------
-- | A request handled by the compute loop in 'runCompute'.
data ComputeRequest
    = CREvent Ev      -- ^ Run this event through the serf.
    | CRSave ()       -- ^ Take a snapshot and keep going.
    | CRShutdown ()   -- ^ Take a snapshot and stop the loop.
  deriving (Eq, Show)
2019-08-28 14:45:49 +03:00
-- | Log an event at debug level: an @[EVENT]@ header followed by the
-- pretty-printed event, each line prefixed with a tab.
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev =
    logDebug (display ("[EVENT]\n" <> body))
  where
    body :: Text
    body = pack (unlines (fmap ("\t" <>) (lines (ppShow ev))))
-- | Log an effect at debug level: an @[EFFECT]@ header followed by the
-- pretty-printed effect (or, for a failed parse, the pretty-printed noun),
-- each line prefixed with a tab.
logEffect :: HasLogFunc e => Lenient Ef -> RIO e ()
logEffect ef =
    logDebug $ display $ "[EFFECT]\n" <> rendered
  where
    rendered :: Text
    rendered = case ef of
        GoodParse e -> indented (ppShow e)
        FailParse n -> indented (ppShow n)

    -- Prefix every line with a tab.
    indented :: String -> Text
    indented = pack . unlines . fmap ("\t" <>) . lines
-- | Start the compute thread.
--
-- The thread loops over three kinds of request, preferring them in this
-- order when several are available at once: shutdown, save, event.
--
--   * Event: stamp it with the current time, run it through the serf as the
--     next event, hand the resulting job and effects to @putResult@, and
--     continue with the new serf state.
--   * Save: take a snapshot and continue.
--   * Shutdown: take a snapshot and leave the loop.
--
-- The explicit @forall e.@ is required: the local signature of @go@ must
-- refer to the same @e@ so that it can use the outer @HasLogFunc e@.
runCompute :: forall e. HasLogFunc e
           => Serf e
           -> SerfState
           -> STM Ev
           -> STM ()
           -> STM ()
           -> ((Job, FX) -> STM ())
           -> RAcquire e (Async ())
runCompute serf ss getEvent getSaveSignal getShutdownSignal putResult =
    mkRAcquire (async (go ss)) cancel
  where
    go :: SerfState -> RIO e ()
    go st = do
        -- '<|>' tries the alternatives left to right, so a pending shutdown
        -- wins over a pending save, which wins over a pending event.
        cr <- atomically $
                CRShutdown <$> getShutdownSignal <|>
                CRSave     <$> getSaveSignal     <|>
                CREvent    <$> getEvent
        case cr of
            CREvent ev -> do
                logEvent ev
                wen <- io Time.now
                let eId = ssNextEv st
                    mug = ssLastMug st
                (job', st', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
                atomically (putResult (job', fx))
                go st'
            CRSave () -> do
                logDebug $ "Taking periodic snapshot"
                Serf.snapshot serf st
                go st
            CRShutdown () -> do
                -- When shutting down, we first request a snapshot, and then
                -- we just exit this recursive processing, which will cause
                -- the serf to exit from its RAcquire.
                logDebug $ "Shutting down compute system..."
                Serf.snapshot serf st
                pure ()
2019-07-20 06:00:23 +03:00
-- Persist Thread --------------------------------------------------------------
-- | Thrown by the persist thread when a job's event id is not the id the
-- event log expects next. The first field is the expected id, the second is
-- the id that was received.
data PersistExn = BadEventId EventId EventId
  deriving Show
-- | Renders the expected and received event ids on two lines.
instance Exception PersistExn where
    displayException = \case
        BadEventId expected got ->
            unlines
                [ "Out-of-order event id send to persist thread."
                , "\tExpected " <> show expected <> " but got " <> show got
                ]
-- | Start the persist thread.
--
-- The thread repeatedly takes every (job, effects) pair currently queued on
-- @inpQ@ (blocking until there is at least one), checks that the job ids
-- continue the event log's sequence, appends the serialized events to the
-- log, and only then passes each job's effects to @out@.
--
-- Throws 'BadEventId' inside the thread if a job arrives with an unexpected
-- event id.
runPersist :: EventLog
           -> TQueue (Job, FX)
           -> (FX -> STM ())
           -> RAcquire e (Async ())
runPersist log inpQ out =
    -- 'cancel' already blocks until the thread has terminated. A following
    -- 'wait' would rethrow the thread's 'AsyncCancelled' exception out of
    -- the release action, so it is deliberately not used here.
    mkRAcquire runThread cancel
  where
    runThread :: RIO e (Async ())
    runThread = asyncBound $ forever $ do
        writs  <- atomically getBatchFromQueue
        events <- validateJobsAndGetBytes (toNullable writs)
        Log.appendEvents log events
        -- Effects are released only after their events are in the log.
        atomically $ for_ writs $ \(_,fx) -> out fx

    validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString)
    validateJobsAndGetBytes writs = do
        expect <- Log.nextEv log
        fmap fromList
            $ for (zip [expect..] writs)
            $ \(expectedId, (j, _)) -> do
                unless (expectedId == jobId j) $
                    throwIO (BadEventId expectedId (jobId j))
                case j of
                    RunNok _ ->
                        error "This shouldn't happen here!"
                    DoWork (Work _ mug wen ev) ->
                        pure $ jamBS $ toNoun (mug, wen, ev)

    -- Block for the first item, then drain whatever else is already queued.
    -- Items are accumulated in reverse and flipped at the end, so the batch
    -- comes out in queue order.
    getBatchFromQueue :: STM (NonNull [(Job, FX)])
    getBatchFromQueue =
        readTQueue inpQ >>= go . singleton
      where
        go acc =
            tryReadTQueue inpQ >>= \case
                Nothing   -> pure (reverse acc)
                Just item -> go (item <| acc)