2019-07-16 03:01:45 +03:00
{-# OPTIONS_GHC -Wwarn #-}
2019-07-21 22:56:18 +03:00
module Vere.Pier (booted, resumed, runPersist, runCompute) where
2019-05-30 23:19:26 +03:00
2019-07-19 03:52:53 +03:00
import Data.Acquire
2019-07-16 03:01:45 +03:00
import UrbitPrelude
2019-07-21 22:56:18 +03:00
import Vere.Ovum
import Vere.FX
2019-05-30 23:19:26 +03:00
import Vere.Pier.Types
2019-07-19 03:52:53 +03:00
2019-07-21 04:29:39 +03:00
import System.Directory (createDirectoryIfMissing)
import System.Posix.Files (ownerModes, setFileMode)
import Vere.Log (EventLog)
import Vere.Serf (Serf, SerfState(..))
2019-06-19 01:38:24 +03:00
2019-07-16 03:01:45 +03:00
import qualified System.Entropy as Ent
2019-07-21 22:56:18 +03:00
import qualified Urbit.Time as Time
2019-07-16 03:01:45 +03:00
import qualified Vere.Log as Log
import qualified Vere.Serf as Serf
2019-05-30 23:19:26 +03:00
2019-06-18 02:47:20 +03:00
2019-06-25 04:10:41 +03:00
2019-06-18 02:47:20 +03:00
2019-07-21 22:56:18 +03:00
_ioDrivers = [] :: [IODriver]
2019-06-18 02:47:20 +03:00
2019-07-21 22:56:18 +03:00
_setupPierDirectory :: FilePath -> IO ()
_setupPierDirectory shipPath = do
2019-07-21 04:29:39 +03:00
for_ ["put", "get", "log", "chk"] $ \seg -> do
let pax = shipPath <> "/.urb/" <> seg
createDirectoryIfMissing True pax
setFileMode pax ownerModes
2019-06-18 02:47:20 +03:00
2019-07-21 22:56:18 +03:00
-- Load pill into boot sequence. -----------------------------------------------
2019-07-16 03:01:45 +03:00
genEntropy :: IO Word512
genEntropy = fromIntegral . view (from atomBytes) <$> Ent.getEntropy 64
generateBootSeq :: Ship -> Pill -> IO BootSeq
generateBootSeq ship Pill{..} = do
ent <- genEntropy
let ovums = preKern ent <> pKernelOvums <> pUserspaceOvums
pure $ BootSeq ident pBootFormulas ovums
ident = LogIdentity ship True (fromIntegral $ length pBootFormulas)
2019-07-22 02:33:26 +03:00
preKern ent = [ OvumBlip $ GoodParse $ BlipTerm $ TermBoot (1,()) (Fake (who ident))
, OvumBlip $ GoodParse $ BlipArvo $ ArvoWhom () ship
, OvumBlip $ GoodParse $ BlipArvo $ ArvoWack () ent
2019-07-16 03:01:45 +03:00
2019-07-21 22:56:18 +03:00
-- Write a batch of jobs into the event log ------------------------------------
2019-07-19 03:52:53 +03:00
writeJobs :: EventLog -> Vector Job -> IO ()
writeJobs log !jobs = do
expect <- Log.nextEv log
events <- fmap fromList $ traverse fromJob (zip [expect..] $ toList jobs)
Log.appendEvents log events
2019-07-17 08:32:36 +03:00
2019-07-21 22:56:18 +03:00
fromJob :: (EventId, Job) -> IO ByteString
2019-07-21 04:29:39 +03:00
fromJob (expectedId, job) = do
guard (expectedId == jobId job)
2019-07-21 22:56:18 +03:00
pure $ jamBS $ jobPayload job
2019-07-21 04:29:39 +03:00
jobPayload :: Job -> Noun
jobPayload (RunNok (LifeCyc _ m n)) = toNoun (m, n)
jobPayload (DoWork (Work _ m d o)) = toNoun (m, d, o)
2019-07-19 03:52:53 +03:00
2019-06-29 04:46:33 +03:00
2019-07-21 22:56:18 +03:00
-- Boot a new ship. ------------------------------------------------------------
2019-07-22 00:24:07 +03:00
booted :: FilePath -> FilePath -> Serf.Flags -> Ship
-> Acquire (Serf, EventLog, SerfState)
booted pillPath top flags ship = do
2019-07-21 22:56:18 +03:00
pill <- liftIO $ loadFile @Pill pillPath >>= \case
Left l -> error (show l)
Right p -> pure p
seq@(BootSeq ident x y) <- liftIO $ generateBootSeq ship pill
log <- Log.new (top <> "/.urb/log") ident
2019-07-22 00:24:07 +03:00
serf <- Serf.run (Serf.Config top flags)
2019-07-21 22:56:18 +03:00
liftIO $ do
(events, serfSt) <- Serf.bootFromSeq serf seq
Serf.snapshot serf serfSt
writeJobs log (fromList events)
pure (serf, log, serfSt)
-- Resume an existing ship. ----------------------------------------------------
2019-07-22 00:24:07 +03:00
resumed :: FilePath -> Serf.Flags -> Acquire (Serf, EventLog, SerfState)
resumed top flags = do
2019-07-21 22:56:18 +03:00
log <- Log.existing (top <> "/.urb/log")
2019-07-22 00:24:07 +03:00
serf <- Serf.run (Serf.Config top flags)
2019-07-21 22:56:18 +03:00
serfSt <- liftIO (Serf.replay serf log)
liftIO (Serf.snapshot serf serfSt)
pure (serf, log, serfSt)
2019-06-29 04:46:33 +03:00
-- Run Pier --------------------------------------------------------------------
2019-06-25 04:10:41 +03:00
performCommonPierStartup :: Serf.Serf
2019-06-19 03:04:57 +03:00
-> TQueue Ovum
2019-07-21 22:56:18 +03:00
-> TQueue (Writ, FX)
-> TQueue (Writ, FX)
2019-06-19 01:38:24 +03:00
-> LogState
-> IO Pier
2019-06-25 04:10:41 +03:00
performCommonPierStartup serf computeQ persistQ releaseQ logState = do
2019-06-19 01:38:24 +03:00
for ioDrivers $ \x -> do
bootMessage <- bornEvent x
2019-06-25 04:10:41 +03:00
atomically $ writeTQueue computeQ bootMessage
2019-06-01 01:55:21 +03:00
2019-06-19 01:38:24 +03:00
driverThreads <- for ioDrivers $ \x -> do
2019-06-25 04:10:41 +03:00
startDriver x (writeTQueue computeQ)
2019-06-19 01:38:24 +03:00
2019-07-20 06:00:23 +03:00
-- TODO: Don't do a bunch of extra work; we send all effects to all drivers
2019-06-19 01:38:24 +03:00
portingThread <- async $ do
forever $ do
2019-06-25 04:10:41 +03:00
r <- atomically (readTQueue releaseQ)
2019-06-19 01:38:24 +03:00
for_ driverThreads $ \(_, k) ->
for_ (payload r) $ \eff ->
k eff
2019-06-25 04:10:41 +03:00
Serf.workerThread serf (readTQueue computeQ) undefined
2019-06-19 01:38:24 +03:00
pure (Pier{..})
2019-06-25 04:10:41 +03:00
2019-07-20 06:00:23 +03:00
2019-07-21 22:56:18 +03:00
-- Compute Thread --------------------------------------------------------------
runCompute :: Serf -> STM Ovum -> (EventId, Mug) -> IO (Async ())
runCompute w getEvent (evendId, mug) = async $ forever $ do
ovum <- atomically $ getEvent
currentDate <- Time.now
let _mat = jam (undefined (mug, currentDate, ovum))
2019-07-20 06:00:23 +03:00
-- Persist Thread --------------------------------------------------------------
data PersistExn = BadEventId EventId EventId
deriving Show
instance Exception PersistExn where
displayException (BadEventId expected got) =
unlines [ "Out-of-order event id send to persist thread."
, "\tExpected " <> show expected <> " but got " <> show got
runPersist :: EventLog
2019-07-21 22:56:18 +03:00
-> TQueue (Writ, FX)
-> ((Writ, FX) -> STM ())
2019-07-20 06:00:23 +03:00
-> Acquire ()
runPersist log inpQ out = do
mkAcquire runThread cancelWait
pure ()
cancelWait :: Async () -> IO ()
cancelWait tid = cancel tid >> wait tid
runThread :: IO (Async ())
runThread = asyncBound $ forever $ do
2019-07-21 22:56:18 +03:00
writs <- atomically getBatchFromQueue
events <- validateWritsAndGetBytes (toNullable writs)
2019-07-20 06:00:23 +03:00
Log.appendEvents log events
atomically $ traverse_ out writs
2019-07-21 22:56:18 +03:00
validateWritsAndGetBytes :: [(Writ, FX)] -> IO (Vector ByteString)
validateWritsAndGetBytes writs = do
2019-07-20 06:00:23 +03:00
expect <- Log.nextEv log
fmap fromList
$ for (zip [expect..] writs)
2019-07-21 22:56:18 +03:00
$ \(expectedId, (w, fx)) -> do
unless (expectedId == writId w) $
throwIO (BadEventId expectedId (writId w))
pure (writEv w)
2019-07-20 06:00:23 +03:00
2019-07-21 22:56:18 +03:00
getBatchFromQueue :: STM (NonNull [(Writ, FX)])
2019-07-20 06:00:23 +03:00
getBatchFromQueue =
readTQueue inpQ >>= go . singleton
go acc =
tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)