urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs

468 lines
16 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
Top-Level Pier Management
This is the code that starts the IO drivers and deals with
communication between the serf, the log, and the IO drivers.
-}
module Urbit.Vere.Pier
( booted, resumed, getSnapshot, pier, runPersist, runCompute, generateBootSeq
2019-08-22 02:49:08 +03:00
) where
import Urbit.Prelude
import RIO.Directory
import System.Random
import Urbit.Arvo
import Urbit.King.Config
import Urbit.Vere.Pier.Types
2020-01-25 03:20:13 +03:00
import Control.Monad.Trans.Maybe
import Data.Text (append)
import System.Posix.Files (ownerModes, setFileMode)
2020-02-04 04:27:16 +03:00
import Urbit.King.App (HasConfigDir(..), HasStderrLogFunc(..))
import Urbit.Vere.Ames (ames)
import Urbit.Vere.Behn (behn)
import Urbit.Vere.Clay (clay)
2020-05-08 21:29:18 +03:00
import Urbit.Vere.Eyre (eyre)
import Urbit.Vere.Eyre.Multi (MultiEyreApi)
import Urbit.Vere.Http.Client (client)
import Urbit.Vere.Log (EventLog)
import Urbit.Vere.Serf (Serf, SerfState(..), doJob, sStderr)
import qualified System.Entropy as Ent
import qualified Urbit.King.API as King
import qualified Urbit.Time as Time
import qualified Urbit.Vere.Log as Log
import qualified Urbit.Vere.Serf as Serf
import qualified Urbit.Vere.Term as Term
import qualified Urbit.Vere.Term.API as Term
import qualified Urbit.Vere.Term.Demux as Term
import qualified Urbit.Vere.Term.Render as Term
--------------------------------------------------------------------------------
-- | An empty list of IO drivers.  Nothing in the visible part of this module
-- refers to it; the leading underscore suppresses the unused-binding warning.
_ioDrivers :: [IODriver]
_ioDrivers = []
2019-08-28 14:45:49 +03:00
-- | Create the @put@, @get@, @log@ and @chk@ directories under
-- @\<shipPath\>/.urb/@ (including missing parents) and set the mode of each
-- one to 'ownerModes'.
setupPierDirectory :: FilePath -> RIO e ()
setupPierDirectory shipPath =
    for_ subdirs $ \name -> do
        let dir = urbRoot <> name
        createDirectoryIfMissing True dir
        io (setFileMode dir ownerModes)
  where
    urbRoot :: FilePath
    urbRoot = shipPath <> "/.urb/"

    subdirs :: [FilePath]
    subdirs = ["put", "get", "log", "chk"]
-- Load pill into boot sequence. -----------------------------------------------
2019-07-16 03:01:45 +03:00
2019-08-28 14:45:49 +03:00
-- | Read 64 bytes from the system entropy source and convert them, via
-- 'bytesAtom', into a 'Word512'.
genEntropy :: RIO e Word512
genEntropy = do
    raw <- io (Ent.getEntropy 64)
    pure (fromIntegral (bytesAtom raw))
2019-07-16 03:01:45 +03:00
2019-10-03 21:31:15 +03:00
-- | Assemble the boot sequence for a ship from a pill.
--
-- The resulting ovum list is, in order: the two Arvo setup events (@whom@
-- carrying the ship and @wack@ carrying fresh entropy), the pill's kernel
-- ovums, the terminal boot event, and the pill's userspace ovums.  The log
-- identity marks the ship as fake exactly when the boot event is a 'Fake',
-- and records the number of boot formulas in the pill.
generateBootSeq :: Ship -> Pill -> Bool -> LegacyBootEvent -> RIO e BootSeq
generateBootSeq ship Pill{..} lite boot = do
    entropy <- genEntropy
    pure $ BootSeq identity pBootFormulas $ mconcat
        [ arvoInit entropy
        , pKernelOvums
        , termBoot
        , pUserspaceOvums
        ]
  where
    identity = LogIdentity ship fakeShip formulaCount

    formulaCount = fromIntegral (length pBootFormulas)

    fakeShip = case boot of
        Fake _ -> True
        _      -> False

    -- Events injected before the kernel ovums.
    arvoInit entropy =
        [ EvBlip (BlipEvArvo (ArvoEvWhom () ship))
        , EvBlip (BlipEvArvo (ArvoEvWack () entropy))
        ]

    -- Event injected between the kernel and userspace ovums.
    termBoot = [EvBlip (BlipEvTerm (TermEvBoot (1, ()) lite boot))]
2019-07-16 03:01:45 +03:00
-- Write a batch of jobs into the event log ------------------------------------
2019-08-28 14:45:49 +03:00
-- | Serialize a batch of jobs and append them to the event log.
--
-- Jobs are paired with consecutive event ids starting at the log's next
-- event id; a job whose own id differs from the id it is paired with aborts
-- the write with an 'error'.
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
    firstId <- Log.nextEv log
    encoded <- traverse encodeJob (zip [firstId ..] (toList jobs))
    Log.appendEvents log (fromList encoded)
  where
    -- Check the job's id against the expected one, then jam its payload.
    encodeJob :: (EventId, Job) -> RIO e ByteString
    encodeJob (wantId, job)
        | wantId == jobId job = pure (jamBS (payloadOf job))
        | otherwise           = error $ show ("bad job id!", wantId, jobId job)

    -- The noun that is stored for a job; the first field of each job is
    -- not part of the stored payload.
    payloadOf :: Job -> Noun
    payloadOf = \case
        RunNok (LifeCyc _ m n) -> toNoun (m, n)
        DoWork (Work _ m d o)  -> toNoun (m, d, o)
-- Boot a new ship. ------------------------------------------------------------
2020-02-04 04:27:16 +03:00
-- | Boot a new ship from a pill.
--
-- Steps, in order: compute the boot sequence, create the pier's @.urb@
-- directories, create a fresh event log, start the serf, run the boot
-- sequence through the serf, take a snapshot, and finally write the
-- resulting events to the log.  Returns the running serf, the event log, and
-- the serf state after boot.
booted :: (HasPierConfig e, HasStderrLogFunc e, HasLogFunc e)
       => Pill -> Bool -> Serf.Flags -> Ship -> LegacyBootEvent
       -> RAcquire e (Serf e, EventLog, SerfState)
booted pill lite flags ship boot = do
    bootSeq@(BootSeq ident _ _) <- rio (generateBootSeq ship pill lite boot)
    rio (logTrace "BootSeq Computed")

    pierPath <- view pierPathL
    liftRIO (setupPierDirectory pierPath)
    rio (logTrace "Directory Setup")

    log <- Log.new (pierPath <> "/.urb/log") ident
    rio (logTrace "Event Log Initialized")

    serf <- Serf.run (Serf.Config pierPath flags)
    rio (logTrace "Serf Started")

    rio $ do
        (events, serfSt) <- Serf.bootFromSeq serf bootSeq
        logTrace "Boot Sequence completed"

        Serf.snapshot serf serfSt
        logTrace "Snapshot taken"

        writeJobs log (fromList events)
        logTrace "Events written"

        pure (serf, log, serfSt)
-- Resume an existing ship. ----------------------------------------------------
2020-02-04 04:27:16 +03:00
-- | Resume an existing ship.
--
-- When an event number is supplied, 'getSnapshot' is asked for a
-- partial-replay snapshot directory at or before that event, and the serf is
-- run from that directory if one exists; otherwise the serf runs in the pier
-- directory itself.  The event log is always opened from the pier directory.
-- After replaying the log into the serf, a snapshot is taken.
resumed :: (HasStderrLogFunc e, HasPierConfig e, HasLogFunc e)
        => Maybe Word64 -> Serf.Flags
        -> RAcquire e (Serf e, EventLog, SerfState)
resumed event flags = do
    rio (logTrace "Resuming ship")

    pierDir <- view pierPathL
    serfDir <- rio $ case event of
        Nothing -> pure pierDir
        Just ev -> fromMaybe pierDir <$> getSnapshot pierDir ev

    rio $ logTrace $ display @Text ("pier: " <> pack pierDir)
    rio $ logTrace $ display @Text ("running serf in: " <> pack serfDir)

    log    <- Log.existing (pierDir <> "/.urb/log")
    serf   <- Serf.run (Serf.Config serfDir flags)
    serfSt <- rio (Serf.replay serf log event)

    rio (Serf.snapshot serf serfSt)

    pure (serf, log, serfSt)
2020-01-25 03:20:13 +03:00
-- | Find the most recent partial-replay snapshot at or before an event.
--
-- Snapshots live in @\<top\>/.partial-replay/@ (created if missing), one
-- entry per snapshot, named by event number.  Entries whose names do not
-- parse as numbers are ignored.  Returns the path of the entry with the
-- largest number not exceeding @last@, or 'Nothing' when there is none.
getSnapshot :: forall e. FilePath -> Word64 -> RIO e (Maybe FilePath)
getSnapshot top last = do
    createDirectoryIfMissing True replayDir
    entries <- listDirectory replayDir
    let eligible = sort [n | n <- mapMaybe readMay entries, n <= last]
    pure (toPath <$> lastMay eligible)
  where
    replayDir :: FilePath
    replayDir = top </> ".partial-replay"

    toPath :: Word64 -> FilePath
    toPath eId = replayDir </> show eId
-- Run Pier --------------------------------------------------------------------
-- | Run an action on a background thread for the lifetime of the enclosing
-- 'RAcquire' scope; the thread is cancelled on release.
acquireWorker :: RIO e () -> RAcquire e (Async ())
acquireWorker act = mkRAcquire spawn cancel
  where
    spawn = async act
-- | Run a pier.
--
-- Sets up the queues that connect the compute, persist and effect threads,
-- hooks the terminal multiplexer up to the King API and to the serf's stderr
-- callback, starts the IO drivers, and launches the worker threads.  Once
-- everything is running @mStart@ is filled, and the call then blocks until
-- the effect, persist or compute thread terminates.
--
-- Data flow between the threads:
--
-- * drivers write events to @computeQ@;
-- * the compute thread writes @(job, effects)@ pairs to @persistQ@;
-- * the persist thread forwards the effects to @executeQ@;
-- * the router reads @executeQ@ and hands each effect to its driver.
pier :: forall e. (HasConfigDir e, HasLogFunc e, HasNetworkConfig e, HasPierConfig e)
     => (Serf e, EventLog, SerfState)
     -> MVar ()
     -> MultiEyreApi
     -> RAcquire e ()
pier (serf, log, ss) mStart multi = do
    computeQ  <- newTQueueIO
    persistQ  <- newTQueueIO
    executeQ  <- newTQueueIO
    saveM     <- newEmptyTMVarIO
    shutdownM <- newEmptyTMVarIO

    kapi <- King.kingAPI

    -- Register a callback with the King API that pushes incoming terminal
    -- connections onto a queue we own.
    termApiQ <- atomically $ do
        q <- newTQueue
        writeTVar (King.kTermConn kapi) (Just $ writeTQueue q)
        pure q

    let shutdownEvent = putTMVar shutdownM ()

    inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16)

    -- (sz, local) <- Term.localClient

    -- (waitExternalTerm, termServPort) <- Term.termServer

    (demux, muxed) <- atomically $ do
        res <- Term.mkDemux
        -- Term.addDemux local res
        pure (res, Term.useDemux res)

    -- rio $ logInfo $ display $
    --     "TERMSERV Terminal Server running on port: " <> tshow termServPort

    -- Background worker: attach every terminal connection that arrives on
    -- the queue to the demultiplexer.
    acquireWorker $ forever $ do
        logTrace "TERMSERV Waiting for external terminal."
        atomically $ do
            ext <- Term.connClient <$> readTQueue termApiQ
            Term.addDemux ext demux
        logTrace "TERMSERV External terminal connected."

    swapMVar (sStderr serf) (atomically . Term.trace muxed)

    let logId = Log.identity log
    let ship  = who logId

    -- Our call above to set the logging function which echoes errors from
    -- the Serf doesn't have the appended \r\n because those \r\n s are added
    -- in the c serf code. Logging output from our haskell process must
    -- manually add them.
    let showErr = atomically . Term.trace muxed . (flip append "\r\n")

    let (bootEvents, startDrivers) =
            drivers inst multi ship (isFake logId)
                (writeTQueue computeQ)
                shutdownEvent
                (Term.TSize{tsWide=80, tsTall=24}, muxed)
                showErr

    io $ atomically $ for_ bootEvents (writeTQueue computeQ)

    tExe  <- startDrivers >>= router (readTQueue executeQ)
    tDisk <- runPersist log persistQ (writeTQueue executeQ)
    tCpu  <- runCompute serf ss
                 (readTQueue computeQ)
                 (takeTMVar saveM)
                 (takeTMVar shutdownM)
                 (Term.spin muxed)
                 (Term.stopSpin muxed)
                 (writeTQueue persistQ)

    -- Held only for its release action, which cancels the timer thread.
    tSaveSignal <- saveSignalThread saveM

    putMVar mStart ()

    -- Wait for something to die.
    let ded = asum [ death "effect thread" tExe
                   , death "persist thread" tDisk
                   , death "compute thread" tCpu
                   ]

    atomically ded >>= \case
        Left (txt, exn) -> logError $ displayShow ("Something died", txt, exn)
        Right tag       -> logError $ displayShow ("Something simply exited", tag)

    atomically $ (Term.spin muxed) (Just "shutdown")
-- | Wait, in 'STM', for a thread to finish.  Yields 'Left' with the tag and
-- the exception if the thread ended with an exception, or 'Right' with the
-- tag if it returned normally.
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = classify <$> waitCatchSTM tid
  where
    classify (Left exn) = Left (tag, exn)
    classify (Right ()) = Right tag
-- | Start a thread that, in an endless loop, sleeps for 120 seconds and then
-- puts @()@ into the given 'TMVar'.  The thread is cancelled on release.
saveSignalThread :: TMVar () -> RAcquire e (Async ())
saveSignalThread tm = mkRAcquire (async ticker) cancel
  where
    ticker = forever $ do
        threadDelay (120 * 1000000) -- 120 seconds
        atomically (putTMVar tm ())
-- Start All Drivers -----------------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | The effect callbacks obtained by starting the IO drivers, one field per
-- effect type.
data Drivers e = Drivers
    { dAmes       :: EffCb e AmesEf        -- ^ Callback for 'AmesEf' effects.
    , dBehn       :: EffCb e BehnEf        -- ^ Callback for 'BehnEf' effects.
    , dHttpClient :: EffCb e HttpClientEf  -- ^ Callback for 'HttpClientEf' effects.
    , dHttpServer :: EffCb e HttpServerEf  -- ^ Callback for 'HttpServerEf' effects.
    , dNewt       :: EffCb e NewtEf        -- ^ Callback for 'NewtEf' effects.
    , dSync       :: EffCb e SyncEf        -- ^ Callback for 'SyncEf' effects.
    , dTerm       :: EffCb e TermEf        -- ^ Callback for 'TermEf' effects.
    }
-- | Configure all IO drivers.
--
-- Returns the initial events contributed by the drivers (in the order behn,
-- clay, ames, http server, term, http client) together with an action that
-- starts every driver and collects the resulting effect callbacks.  The
-- 'dAmes' callback is a no-op; the callback produced by the ames driver is
-- stored in 'dNewt'.
drivers
    :: (HasLogFunc e, HasNetworkConfig e, HasPierConfig e)
    => KingId
    -> MultiEyreApi
    -> Ship
    -> Bool
    -> (Ev -> STM ())
    -> STM ()
    -> (Term.TSize, Term.Client)
    -> (Text -> RIO e ())
    -> ([Ev], RAcquire e (Drivers e))
drivers inst multi who isFake plan shutdownSTM termSys stderr =
    (bornEvents, startAll)
  where
    (behnEvs, startBehn) = behn inst plan
    (amesEvs, startAmes) = ames inst who isFake plan stderr
    (eyreEvs, startEyre) = eyre inst multi who plan isFake
    (clayEvs, startClay) = clay inst plan
    (irisEvs, startIris) = client inst plan
    (termEvs, startTerm) = Term.term termSys shutdownSTM inst plan

    bornEvents = mconcat
        [ behnEvs
        , clayEvs
        , amesEvs
        , eyreEvs
        , termEvs
        , irisEvs
        ]

    -- Start order: ames, behn, http client, http server, clay, term.
    startAll = do
        newtCb <- startAmes
        behnCb <- liftAcquire startBehn
        irisCb <- startIris
        eyreCb <- startEyre
        clayCb <- startClay
        termCb <- startTerm
        pure Drivers
            { dAmes       = const (pure ())
            , dBehn       = behnCb
            , dHttpClient = irisCb
            , dHttpServer = eyreCb
            , dNewt       = newtCb
            , dSync       = clayCb
            , dTerm       = termCb
            }
-- Route Effects to Drivers ----------------------------------------------------
2019-08-29 03:26:59 +03:00
-- | Start the effect-routing thread.
--
-- The thread repeatedly takes a batch of effects from @waitFx@, logs each
-- effect, and passes it to the driver callback for its type.  Boat, clay and
-- sync effects all go to 'dSync'.  Effects that failed to parse are logged as
-- errors.  @EfVega@ and @EfExit@ are not implemented and call 'error'.  The
-- thread is cancelled on release.
router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ())
router waitFx Drivers{..} =
    mkRAcquire (async loop) cancel
  where
    loop = forever $ do
        batch <- atomically waitFx
        for_ batch $ \eff -> do
            logEffect eff
            dispatch eff

    dispatch eff = case eff of
        GoodParse (EfVega _ _)               -> error "TODO"
        GoodParse (EfExit _ _)               -> error "TODO"
        GoodParse (EfVane (VEAmes x))        -> dAmes x
        GoodParse (EfVane (VEBehn x))        -> dBehn x
        GoodParse (EfVane (VEBoat x))        -> dSync x
        GoodParse (EfVane (VEClay x))        -> dSync x
        GoodParse (EfVane (VEHttpClient x))  -> dHttpClient x
        GoodParse (EfVane (VEHttpServer x))  -> dHttpServer x
        GoodParse (EfVane (VENewt x))        -> dNewt x
        GoodParse (EfVane (VESync x))        -> dSync x
        GoodParse (EfVane (VETerm x))        -> dTerm x
        FailParse n                          ->
            logError $ display $ pack @Text (ppShow n)
-- Compute Thread --------------------------------------------------------------
-- | What the compute thread has been asked to do next: process an event,
-- take a snapshot, or shut down.
data ComputeRequest
    = CREvent Ev
    | CRSave ()
    | CRShutdown ()
  deriving (Eq, Show)
2019-08-28 14:45:49 +03:00
-- | Log an event at debug level: an @[EVENT]@ header followed by the
-- pretty-printed event, each line prefixed with a tab.
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev =
    logDebug (display ("[EVENT]\n" <> indented))
  where
    indented :: Text
    indented = pack (unlines ["\t" <> ln | ln <- lines (ppShow ev)])
-- | Log an effect at debug level: an @[EFFECT]@ header followed by the
-- pretty-printed effect (or, for an effect that failed to parse, the
-- pretty-printed unparsed value), each line prefixed with a tab.
logEffect :: HasLogFunc e => Lenient Ef -> RIO e ()
logEffect ef =
    logDebug (display ("[EFFECT]\n" <> body))
  where
    body :: Text
    body = case ef of
        GoodParse e -> indent (ppShow e)
        FailParse n -> indent (ppShow n)

    indent :: String -> Text
    indent = pack . unlines . fmap ("\t" <>) . lines
-- | Start the compute thread.
--
-- The thread loops, each time waiting for whichever of the following is
-- available, checked in this order of preference:
--
-- 1. a shutdown signal: take a snapshot and stop looping;
-- 2. a save signal: take a snapshot and continue;
-- 3. an event: show the spinner, run the event through the serf as a
--    'DoWork' job stamped with the current time, hide the spinner, pass the
--    resulting job and effects to @putResult@, and continue with the new
--    serf state.
--
-- The thread is cancelled on release.
runCompute :: forall e. HasLogFunc e
           => Serf e
           -> SerfState
           -> STM Ev
           -> STM ()
           -> STM ()
           -> (Maybe Text -> STM ())
           -> STM ()
           -> ((Job, FX) -> STM ())
           -> RAcquire e (Async ())
runCompute serf ss getEvent getSaveSignal getShutdownSignal
           showSpinner hideSpinner putResult =
    mkRAcquire (async (go ss)) cancel
  where
    go :: SerfState -> RIO e ()
    go ss = do
        cr <- atomically $
                  CRShutdown <$> getShutdownSignal
              <|> CRSave     <$> getSaveSignal
              <|> CREvent    <$> getEvent
        case cr of
            CREvent ev -> do
                logEvent ev
                wen <- io Time.now
                let eId = ssNextEv ss
                    mug = ssLastMug ss

                atomically $ showSpinner (getSpinnerNameForEvent ev)
                (job', ss', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
                atomically $ hideSpinner
                atomically (putResult (job', fx))
                go ss'

            CRSave () -> do
                logDebug $ "Taking periodic snapshot"
                Serf.snapshot serf ss
                go ss

            CRShutdown () -> do
                -- When shutting down, we first request a snapshot, and then
                -- we just exit this recursive processing, which will cause
                -- the serf to exit from its RAcquire.
                logDebug $ "Shutting down compute system..."
                Serf.snapshot serf ss
                pure ()
2019-07-20 06:00:23 +03:00
-- Persist Thread --------------------------------------------------------------
-- | Thrown by the persist thread when a job's event id is not the one the
-- event log expects next.  Fields: expected id, then the id received.
data PersistExn = BadEventId EventId EventId
  deriving Show

instance Exception PersistExn where
  displayException (BadEventId expected got) =
    unlines [ "Out-of-order event id sent to persist thread."
            , "\tExpected " <> show expected <> " but got " <> show got
            ]
-- | Start the persist thread.
--
-- The thread (started with 'asyncBound') loops forever: it waits for at least
-- one @(job, effects)@ pair on the input queue, drains whatever else is
-- queued into the same batch, appends the batch's events to the event log
-- (skipped entirely when the dry-run setting is on), and then passes each
-- item's effects to @out@ in queue order.  The thread is cancelled on
-- release.
--
-- Throws 'BadEventId' if a job's id is not the next id the log expects.
runPersist :: forall e. (HasPierConfig e, HasLogFunc e)
           => EventLog
           -> TQueue (Job, FX)
           -> (FX -> STM ())
           -> RAcquire e (Async ())
runPersist log inpQ out =
    mkRAcquire runThread cancel
  where
    runThread :: RIO e (Async ())
    runThread = asyncBound $ do
        dryRun <- view dryRunL
        forever $ do
            writs <- atomically getBatchFromQueue
            unless dryRun $ do
                events <- validateJobsAndGetBytes (toNullable writs)
                Log.appendEvents log events
            atomically $ for_ writs $ \(_, fx) -> out fx

    -- Check that the batch's job ids are consecutive starting from the
    -- log's next event id, and jam each job into its stored form.
    validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString)
    validateJobsAndGetBytes writs = do
        expect <- Log.nextEv log
        fmap fromList
            $ for (zip [expect..] writs)
            $ \(expectedId, (j, _)) -> do
                unless (expectedId == jobId j) $
                    throwIO (BadEventId expectedId (jobId j))
                case j of
                    RunNok _ ->
                        error "This shouldn't happen here!"
                    DoWork (Work _ mug wen ev) ->
                        pure $ jamBS $ toNoun (mug, wen, ev)

    -- Block for the first item, then take everything else that is already
    -- queued without blocking.  Items are accumulated newest-first and
    -- reversed at the end so the batch is in queue order.
    getBatchFromQueue :: STM (NonNull [(Job, FX)])
    getBatchFromQueue =
        readTQueue inpQ >>= go . singleton
      where
        go acc =
            tryReadTQueue inpQ >>= \case
                Nothing   -> pure (reverse acc)
                Just item -> go (item <| acc)