shrub/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs

609 lines
20 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
2020-05-22 21:12:28 +03:00
Top-Level Pier Management
2020-01-23 07:16:09 +03:00
This is the code that starts the IO drivers and deals with communication
between the serf, the event log, and the IO drivers.
2020-01-23 07:16:09 +03:00
-}
module Urbit.Vere.Pier
( booted
, runSerf
, resumed
, getSnapshot
, pier
, runPersist
, runCompute
2020-06-07 02:34:27 +03:00
, genBootSeq
)
where
import Urbit.Prelude
2020-05-13 22:35:57 +03:00
import Control.Monad.Trans.Maybe
import RIO.Directory
import Urbit.Arvo
import Urbit.King.App
import Urbit.Vere.Pier.Types
import Control.Monad.STM (retry)
2020-07-25 07:05:23 +03:00
import System.Environment (getExecutablePath)
import System.FilePath (splitFileName)
import System.Posix.Files (ownerModes, setFileMode)
import Urbit.EventLog.LMDB (EventLog)
import Urbit.EventLog.Event (buildLogEvent)
2020-06-09 01:20:21 +03:00
import Urbit.King.API (TermConn)
import Urbit.Noun.Time (Wen)
import Urbit.TermSize (TermSize(..), termSize)
import Urbit.Vere.Serf (Serf)
import qualified Data.Text as T
import qualified System.Entropy as Ent
import qualified Urbit.EventLog.LMDB as Log
import qualified Urbit.King.API as King
import qualified Urbit.Noun.Time as Time
import qualified Urbit.Vere.Ames as Ames
import qualified Urbit.Vere.Behn as Behn
import qualified Urbit.Vere.Clay as Clay
import qualified Urbit.Vere.Eyre as Eyre
import qualified Urbit.Vere.Eyre.KingSubsite as Site
import qualified Urbit.Vere.Http.Client as Iris
import qualified Urbit.Vere.Serf as Serf
import qualified Urbit.Vere.Term as Term
import qualified Urbit.Vere.Term.API as Term
import qualified Urbit.Vere.Term.Demux as Term
2020-06-07 02:34:27 +03:00
-- Initialize pier directory. --------------------------------------------------
2020-06-08 02:35:54 +03:00
data PierDirectoryAlreadyExists = PierDirectoryAlreadyExists
deriving (Show, Exception)
2019-08-28 14:45:49 +03:00
setupPierDirectory :: FilePath -> RIO e ()
2019-08-15 05:42:48 +03:00
setupPierDirectory shipPath = do
2020-06-08 02:35:54 +03:00
-- shipPath will already exist because we put a lock file there.
alreadyExists <- doesPathExist (shipPath </> ".urb")
when alreadyExists $ do
throwIO PierDirectoryAlreadyExists
2020-05-22 21:12:28 +03:00
for_ ["put", "get", "log", "chk"] $ \seg -> do
2020-06-08 02:35:54 +03:00
let pax = shipPath </> ".urb" </> seg
2020-05-22 21:12:28 +03:00
createDirectoryIfMissing True pax
io $ setFileMode pax ownerModes
-- Load pill into boot sequence. -----------------------------------------------
2019-07-16 03:01:45 +03:00
genEntropy :: MonadIO m => m Entropy
genEntropy = Entropy . fromIntegral . bytesAtom <$> io (Ent.getEntropy 64)
2019-07-16 03:01:45 +03:00
genBootSeq :: MonadIO m => Ship -> Pill -> Bool -> LegacyBootEvent -> m BootSeq
genBootSeq ship Pill {..} lite boot = io $ do
2020-05-22 21:12:28 +03:00
ent <- genEntropy
let ovums = preKern ent <> pKernelOvums <> postKern <> pUserspaceOvums
pure $ BootSeq ident pBootFormulas ovums
where
ident = LogIdentity ship isFake (fromIntegral $ length pBootFormulas)
preKern ent =
[ EvBlip $ BlipEvArvo $ ArvoEvWhom () ship
, EvBlip $ BlipEvArvo $ ArvoEvWack () ent
]
postKern = [EvBlip $ BlipEvTerm $ TermEvBoot (1, ()) lite boot]
isFake = case boot of
Fake _ -> True
_ -> False
2019-07-16 03:01:45 +03:00
-- Write to the log. -----------------------------------------------------------
-- | Write a batch of jobs to the event log.
2019-08-28 14:45:49 +03:00
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
expect <- atomically (Log.nextEv log)
2020-05-22 21:12:28 +03:00
events <- fmap fromList $ traverse fromJob (zip [expect ..] $ toList jobs)
Log.appendEvents log events
where
fromJob :: (EventId, Job) -> RIO e ByteString
fromJob (expectedId, job) = do
unless (expectedId == jobId job) $ error $ show
("bad job id!", expectedId, jobId job)
pure $ buildLogEvent (jobMug job) (jobPayload job)
jobMug :: Job -> Mug
jobMug (RunNok (LifeCyc _ m _)) = m
jobMug (DoWork (Work _ m _ _)) = m
2020-05-22 21:12:28 +03:00
jobPayload :: Job -> Noun
jobPayload (RunNok (LifeCyc _ _ n)) = toNoun n
jobPayload (DoWork (Work _ _ d o)) = toNoun (d, o)
2020-06-07 02:34:27 +03:00
-- Acquire a running serf. -----------------------------------------------------
runSerf
:: HasPierEnv e
=> TVar ((Atom, Tank) -> IO ())
-> FilePath
-> RAcquire e Serf
runSerf vSlog pax = do
env <- ask
2020-07-25 07:05:23 +03:00
serfProg <- io getSerfProg
Serf.withSerf (config env serfProg)
where
slog s = atomically (readTVar vSlog) >>= (\f -> f s)
2020-07-25 07:05:23 +03:00
config env serfProg = Serf.Config
{ scSerf = env ^. pierConfigL . pcSerfExe . to (maybe serfProg unpack)
, scPier = pax
, scFlag = env ^. pierConfigL . pcSerfFlags
2020-09-28 17:56:51 +03:00
, scSlog = slog
, scStdr = \txt -> slog (0, (textToTank txt))
, scDead = pure () -- TODO: What can be done?
}
2020-07-25 07:05:23 +03:00
getSerfProg :: IO FilePath
getSerfProg = do
(path, filename) <- splitFileName <$> getExecutablePath
pure $ case filename of
"urbit" -> path </> "urbit-worker"
"urbit-king" -> path </> "urbit-worker"
_ -> "urbit-worker"
-- Boot a new ship. ------------------------------------------------------------
2020-05-22 21:12:28 +03:00
booted
:: TVar ((Atom, Tank) -> IO ())
-> Pill
2020-05-22 21:12:28 +03:00
-> Bool
-> Ship
-> LegacyBootEvent
-> RAcquire PierEnv (Serf, EventLog)
booted vSlog pill lite ship boot = do
rio $ bootNewShip pill lite ship boot
resumed vSlog Nothing
2019-08-15 05:42:48 +03:00
bootSeqJobs :: Time.Wen -> BootSeq -> [Job]
bootSeqJobs now (BootSeq ident nocks ovums) = zipWith ($) bootSeqFns [1 ..]
where
wen :: EventId -> Time.Wen
wen off = Time.addGap now ((fromIntegral off - 1) ^. from Time.microSecs)
bootSeqFns :: [EventId -> Job]
bootSeqFns = fmap nockJob nocks <> fmap ovumJob ovums
where
nockJob nok eId = RunNok $ LifeCyc eId 0 nok
ovumJob ov eId = DoWork $ Work eId 0 (wen eId) ov
bootNewShip
:: HasPierEnv e
=> Pill
-> Bool
-> Ship
-> LegacyBootEvent
-> RIO e ()
bootNewShip pill lite ship bootEv = do
2020-06-07 02:34:27 +03:00
seq@(BootSeq ident x y) <- genBootSeq ship pill lite bootEv
logInfo "BootSeq Computed"
2019-08-15 05:42:48 +03:00
pierPath <- view pierPathL
2019-10-18 01:32:06 +03:00
rio (setupPierDirectory pierPath)
logInfo "Directory setup."
2019-08-15 05:42:48 +03:00
let logPath = (pierPath </> ".urb/log")
2019-08-15 05:42:48 +03:00
rwith (Log.new logPath ident) $ \log -> do
2020-10-22 20:06:57 +03:00
logInfo "Event log initialized."
jobs <- (\now -> bootSeqJobs now seq) <$> io Time.now
writeJobs log (fromList jobs)
2019-08-15 05:42:48 +03:00
logInfo "Finsihed populating event log with boot sequence"
-- Resume an existing ship. ----------------------------------------------------
resumed
:: TVar ((Atom, Tank) -> IO ())
-> Maybe Word64
-> RAcquire PierEnv (Serf, EventLog)
resumed vSlog replayUntil = do
rio $ logTrace "Resuming ship"
top <- view pierPathL
tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do
ev <- MaybeT (pure replayUntil)
MaybeT (getSnapshot top ev)
2020-02-06 02:20:32 +03:00
rio $ do
logTrace $ display @Text ("pier: " <> pack top)
logTrace $ display @Text ("running serf in: " <> pack tap)
log <- Log.existing (top </> ".urb/log")
serf <- runSerf vSlog tap
rio $ do
logInfo "Replaying events"
Serf.execReplay serf log replayUntil >>= \case
Left err -> error (show err)
Right 0 -> do
logInfo "No work during replay so no snapshot"
pure ()
Right _ -> do
logInfo "Taking snapshot"
io (Serf.snapshot serf)
logInfo "SNAPSHOT TAKEN"
pure (serf, log)
-- | Get a fake pier directory for partial snapshots.
2020-06-07 02:34:27 +03:00
getSnapshot :: forall e . FilePath -> Word64 -> RIO e (Maybe FilePath)
getSnapshot top last = do
2020-06-07 02:34:27 +03:00
lastSnapshot <- lastMay <$> listReplays
pure (replayToPath <$> lastSnapshot)
where
replayDir = top </> ".partial-replay"
replayToPath eId = replayDir </> show eId
2019-09-04 01:17:20 +03:00
2020-06-07 02:34:27 +03:00
listReplays :: RIO e [Word64]
listReplays = do
createDirectoryIfMissing True replayDir
snapshotNums <- mapMaybe readMay <$> listDirectory replayDir
pure $ sort (filter (<= fromIntegral last) snapshotNums)
2020-06-07 02:34:27 +03:00
-- Run Pier --------------------------------------------------------------------
2020-05-13 22:35:57 +03:00
pier
:: (Serf, EventLog)
-> TVar ((Atom, Tank) -> IO ())
2020-05-13 22:35:57 +03:00
-> MVar ()
-> [Ev]
-> RAcquire PierEnv ()
pier (serf, log) vSlog startedSig injected = do
2020-06-09 01:20:21 +03:00
let logId = Log.identity log :: LogIdentity
let ship = who logId :: Ship
-- TODO Instead of using a TMVar, pull directly from the IO driver
-- event sources.
computeQ :: TMVar RunReq <- newEmptyTMVarIO
2020-06-09 01:20:21 +03:00
persistQ :: TQueue (Fact, FX) <- newTQueueIO
executeQ :: TQueue FX <- newTQueueIO
saveSig :: TMVar () <- newEmptyTMVarIO
kingApi :: King.King <- King.kingAPI
termApiQ :: TQueue TermConn <- atomically $ do
2020-06-07 02:34:27 +03:00
q <- newTQueue
writeTVar (King.kTermConn kingApi) (Just $ writeTQueue q)
pure q
initialTermSize <- io $ termSize
2020-06-09 01:20:21 +03:00
(demux :: Term.Demux, muxed :: Term.Client) <- atomically $ do
res <- Term.mkDemux initialTermSize
2020-06-07 02:34:27 +03:00
pure (res, Term.useDemux res)
2020-06-09 01:20:21 +03:00
void $ acquireWorker "TERMSERV Listener" $ forever $ do
logInfo "TERMSERV Waiting for external terminal."
atomically $ do
2020-06-07 02:34:27 +03:00
ext <- Term.connClient <$> readTQueue termApiQ
Term.addDemux ext demux
logInfo "TERMSERV External terminal connected."
2020-06-07 02:34:27 +03:00
2020-10-13 07:57:01 +03:00
scryQ <- newTQueueIO
2020-09-27 01:55:10 +03:00
onKill <- view onKillPierSigL
2020-06-07 02:34:27 +03:00
-- Our call above to set the logging function which echos errors from the
-- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually
-- add them.
let compute = putTMVar computeQ
2020-06-09 01:20:21 +03:00
let execute = writeTQueue executeQ
let persist = writeTQueue persistQ
let sigint = Serf.sendSIGINT serf
2020-10-16 07:22:48 +03:00
let scry = \w b g -> do
res <- newEmptyMVar
atomically $ writeTQueue scryQ (w, b, g, putMVar res)
takeMVar res
2020-05-13 22:35:57 +03:00
-- Set up the runtime subsite server and its capability to slog
siteSlog <- newTVarIO (const $ pure ())
runtimeSubsite <- Site.kingSubsite ship scry siteSlog
-- Slogs go to stderr, to the runtime subsite, and to the terminal.
env <- ask
atomically $ writeTVar vSlog $ \s@(_, tank) -> runRIO env $ do
atomically $ Term.slog muxed s
io $ readTVarIO siteSlog >>= ($ s)
logOther "serf" (display $ T.strip $ tankToText tank)
2020-06-09 01:20:21 +03:00
(bootEvents, startDrivers) <- do
env <- ask
let err = atomically . Term.trace muxed . (<> "\r\n")
siz <- atomically $ Term.curDemuxSize demux
2020-06-09 01:20:21 +03:00
let fak = isFake logId
drivers env ship fak compute scry (siz, muxed) err sigint runtimeSubsite
let computeConfig = ComputeConfig { ccOnWork = takeTMVar computeQ
2020-06-07 02:34:27 +03:00
, ccOnKill = onKill
2020-06-09 01:20:21 +03:00
, ccOnSave = takeTMVar saveSig
2020-10-13 07:57:01 +03:00
, ccOnScry = readTQueue scryQ
2020-06-09 01:20:21 +03:00
, ccPutResult = persist
2020-06-07 02:34:27 +03:00
, ccShowSpinner = Term.spin muxed
, ccHideSpinner = Term.stopSpin muxed
, ccLastEvInLog = Log.lastEv log
}
2020-05-28 21:21:43 +03:00
tSerf <- acquireWorker "Serf" (runCompute serf computeConfig)
-- Run all born events and retry them until they succeed.
wackEv <- EvBlip . BlipEvArvo . ArvoEvWack () <$> genEntropy
rio $ for_ (wackEv : bootEvents) $ \ev -> do
okaySig <- newEmptyMVar
let inject n = atomically $ compute $ RRWork $ EvErr ev $ cb n
-- TODO Make sure this dies cleanly.
cb :: Int -> WorkError -> IO ()
cb n | n >= 3 = error ("boot event failed: " <> show ev)
cb n = \case
RunOkay _ -> putMVar okaySig ()
RunSwap _ _ _ _ _ -> putMVar okaySig ()
RunBail _ -> inject (n + 1)
2020-06-11 05:02:09 +03:00
-- logTrace ("[BOOT EVENT]: " <> display (summarizeEvent ev))
io (inject 0)
let slog :: Text -> IO ()
slog txt = do
fn <- atomically (readTVar vSlog)
fn (0, textToTank txt)
2020-06-09 01:20:21 +03:00
drivz <- startDrivers
tExec <- acquireWorker "Effects" (router slog (readTQueue executeQ) drivz)
2020-06-09 01:20:21 +03:00
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute)
2020-10-22 17:53:28 +03:00
-- Now that the Serf is configured, the IO drivers are hooked up, their
-- starting events have been dispatched, and the terminal is live, we can now
-- handle injecting events requested from the command line.
2020-10-22 17:53:28 +03:00
for_ (zip [1..] injected) $ \(num, ev) -> rio $ do
logTrace $ display @Text ("Injecting event " ++ (tshow num) ++ " of " ++
(tshow $ length injected) ++ "...")
okaySig :: MVar (Either [Goof] ()) <- newEmptyMVar
let inject = atomically $ compute $ RRWork $ EvErr ev $ cb
cb :: WorkError -> IO ()
cb = \case
RunOkay _ -> putMVar okaySig (Right ())
RunSwap _ _ _ _ _ -> putMVar okaySig (Right ())
RunBail goofs -> putMVar okaySig (Left goofs)
io inject
takeMVar okaySig >>= \case
Left goof -> logError $ display @Text ("Goof in injected event: " <>
tshow goof)
2020-10-22 17:53:28 +03:00
Right () -> pure ()
2020-06-09 01:20:21 +03:00
let snapshotEverySecs = 120
2020-06-09 01:20:21 +03:00
void $ acquireWorker "Save" $ forever $ do
threadDelay (snapshotEverySecs * 1_000_000)
void $ atomically $ tryPutTMVar saveSig ()
2020-06-09 01:20:21 +03:00
putMVar startedSig ()
2020-06-07 02:34:27 +03:00
-- Wait for something to die.
2019-07-20 06:00:23 +03:00
2020-06-07 02:34:27 +03:00
let ded = asum
[ death "effects thread" tExec
, death "persist thread" tDisk
, death "compute thread" tSerf
]
2020-06-07 02:34:27 +03:00
atomically ded >>= \case
Left (tag, exn) -> logError $ displayShow (tag, "crashed", exn)
Right "compute thread" -> pure ()
Right tag -> logError $ displayShow (tag, "exited unexpectly")
2020-06-07 02:34:27 +03:00
atomically $ (Term.spin muxed) (Just "shutdown")
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = do
waitCatchSTM tid <&> \case
Left exn -> Left (tag, exn)
Right () -> Right tag
-- Start All Drivers -----------------------------------------------------------
data Drivers = Drivers
{ dBehn :: BehnEf -> IO ()
2020-06-07 02:34:27 +03:00
, dIris :: HttpClientEf -> IO ()
, dEyre :: HttpServerEf -> IO ()
, dNewt :: NewtEf -> IO ()
, dSync :: SyncEf -> IO ()
, dTerm :: TermEf -> IO ()
}
drivers
2020-05-22 21:12:28 +03:00
:: HasPierEnv e
2020-05-13 22:35:57 +03:00
=> e
-> Ship
-> Bool
-> (RunReq -> STM ())
2020-10-16 07:22:48 +03:00
-> (Wen -> Gang -> Path -> IO (Maybe (Term, Noun)))
-> (TermSize, Term.Client)
-> (Text -> RIO e ())
-> IO ()
-> Site.KingSubsite
-> RAcquire e ([Ev], RAcquire e Drivers)
drivers env who isFake plan scry termSys stderr serfSIGINT sub = do
(behnBorn, runBehn) <- rio Behn.behn'
(termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT)
2020-09-27 01:55:10 +03:00
(amesBorn, runAmes) <- rio (Ames.ames' who isFake scry stderr)
(httpBorn, runEyre) <- rio (Eyre.eyre' who isFake stderr sub)
(clayBorn, runClay) <- rio Clay.clay'
(irisBorn, runIris) <- rio Iris.client'
2020-09-27 01:55:10 +03:00
putStrLn ("ship is " <> tshow who)
let initialEvents = mconcat [behnBorn,clayBorn,amesBorn,httpBorn,irisBorn,termBorn]
let runDrivers = do
behn <- runBehn
term <- runTerm
ames <- runAmes
iris <- runIris
eyre <- runEyre
clay <- runClay
-- Sources lower in the list are starved until sources above them
-- have no events to offer.
acquireWorker "Event Prioritization" $ forever $ atomically $ do
let x = diEventSource
let eventSources = [x term, x clay, x behn, x iris, x eyre, x ames]
pullEvent eventSources >>= \case
Nothing -> retry
Just rr -> plan rr
pure $ Drivers
{ dTerm = diOnEffect term
, dBehn = diOnEffect behn
, dNewt = diOnEffect ames
, dIris = diOnEffect iris
, dEyre = diOnEffect eyre
, dSync = diOnEffect clay
}
pure (initialEvents, runDrivers)
2020-06-07 02:34:27 +03:00
where
pullEvent :: [STM (Maybe a)] -> STM (Maybe a)
pullEvent [] = pure Nothing
pullEvent (d:ds) = d >>= \case
Just r -> pure (Just r)
Nothing -> pullEvent ds
-- Route Effects to Drivers ----------------------------------------------------
router :: HasPierEnv e => (Text -> IO ()) -> STM FX -> Drivers -> RIO e ()
router slog waitFx Drivers {..} = do
kill <- view killPierActionL
let exit = io (slog "<<<shutdown>>>\r\n") >> atomically kill
let vega = io (slog "<<<reset>>>\r\n")
forever $ do
fx <- atomically waitFx
for_ fx $ \ef -> do
logEffect ef
case ef of
GoodParse (EfVega _ _ ) -> vega
GoodParse (EfExit _ _ ) -> exit
GoodParse (EfVane (VEBehn ef)) -> io (dBehn ef)
GoodParse (EfVane (VEBoat ef)) -> io (dSync ef)
GoodParse (EfVane (VEClay ef)) -> io (dSync ef)
GoodParse (EfVane (VEHttpClient ef)) -> io (dIris ef)
GoodParse (EfVane (VEHttpServer ef)) -> io (dEyre ef)
GoodParse (EfVane (VENewt ef)) -> io (dNewt ef)
GoodParse (EfVane (VESync ef)) -> io (dSync ef)
GoodParse (EfVane (VETerm ef)) -> io (dTerm ef)
FailParse n -> logError $ display $ pack @Text (ppShow n)
2020-06-07 02:34:27 +03:00
-- Compute (Serf) Thread -------------------------------------------------------
2019-08-28 14:45:49 +03:00
logEvent :: HasLogFunc e => Ev -> RIO e ()
2020-06-11 05:02:09 +03:00
logEvent ev = do
--logInfo $ "<- " <> display (summarizeEvent ev)
2020-06-11 05:02:09 +03:00
logDebug $ "[EVENT]\n" <> display pretty
where
pretty :: Text
pretty = pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow ev
2019-08-28 14:45:49 +03:00
logEffect :: HasLogFunc e => Lenient Ef -> RIO e ()
2020-06-11 05:02:09 +03:00
logEffect ef = do
--logInfo $ " -> " <> display (summarizeEffect ef)
2020-06-11 05:02:09 +03:00
logDebug $ display $ "[EFFECT]\n" <> pretty ef
where
pretty :: Lenient Ef -> Text
pretty = \case
GoodParse e -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow e
FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n
2019-08-28 14:45:49 +03:00
data ComputeConfig = ComputeConfig
{ ccOnWork :: STM RunReq
, ccOnKill :: STM ()
, ccOnSave :: STM ()
, ccOnScry :: STM (Wen, Gang, Path, Maybe (Term, Noun) -> IO ())
, ccPutResult :: (Fact, FX) -> STM ()
, ccShowSpinner :: Maybe Text -> STM ()
, ccHideSpinner :: STM ()
, ccLastEvInLog :: STM EventId
}
runCompute :: forall e . HasKingEnv e => Serf.Serf -> ComputeConfig -> RIO e ()
2020-05-28 21:21:43 +03:00
runCompute serf ComputeConfig {..} = do
2020-06-09 01:20:21 +03:00
logDebug "runCompute"
let onRR = asum [ ccOnKill <&> Serf.RRKill
, ccOnSave <&> Serf.RRSave
, ccOnWork
, ccOnScry <&> \(w,g,p,k) -> Serf.RRScry w g p k
]
vEvProcessing :: TMVar Ev <- newEmptyTMVarIO
void $ async $ forever (atomically (takeTMVar vEvProcessing) >>= logEvent)
let onSpin :: Maybe Ev -> STM ()
2020-06-07 02:34:27 +03:00
onSpin = \case
Nothing -> ccHideSpinner
Just ev -> do
ccShowSpinner (getSpinnerNameForEvent ev)
putTMVar vEvProcessing ev
let maxBatchSize = 10
io (Serf.run serf maxBatchSize ccLastEvInLog onRR ccPutResult onSpin)
2020-06-07 02:34:27 +03:00
-- Event-Log Persistence Thread ------------------------------------------------
2019-07-20 06:00:23 +03:00
data PersistExn = BadEventId EventId EventId
deriving Show
instance Exception PersistExn where
displayException (BadEventId expected got) =
unlines [ "Out-of-order event id send to persist thread."
, "\tExpected " <> show expected <> " but got " <> show got
]
runPersist
:: forall e
. HasPierEnv e
=> EventLog
-> TQueue (Fact, FX)
-> (FX -> STM ())
2020-05-28 21:21:43 +03:00
-> RIO e ()
runPersist log inpQ out = do
dryRun <- view dryRunL
forever $ do
writs <- atomically getBatchFromQueue
events <- validateFactsAndGetBytes (fst <$> toNullable writs)
unless dryRun (Log.appendEvents log events)
atomically $ for_ writs $ \(_, fx) -> do
out fx
2020-05-28 21:21:43 +03:00
where
validateFactsAndGetBytes :: [Fact] -> RIO e (Vector ByteString)
validateFactsAndGetBytes facts = do
expect <- atomically (Log.nextEv log)
lis <- for (zip [expect ..] facts) $ \(expectedId, Fact eve mug wen non) ->
do
unless (expectedId == eve) $ do
throwIO (BadEventId expectedId eve)
pure $ buildLogEvent mug $ toNoun (wen, non)
pure (fromList lis)
getBatchFromQueue :: STM (NonNull [(Fact, FX)])
getBatchFromQueue = readTQueue inpQ >>= go . singleton
where
go acc = tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)