urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs

163 lines
4.6 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
  High-Level Serf Interface
-}
module Urbit.Vere.Serf
( withSerf
, execReplay
, collectFX
, module X
)
where
import Urbit.Prelude
import Data.Conduit
import Urbit.Vere.Pier.Types
import Urbit.Vere.Serf.IPC
2020-05-28 21:21:43 +03:00
import Control.Monad.Trans.Resource (runResourceT)
import Urbit.Arvo (FX)
2020-06-09 01:20:21 +03:00
import Urbit.King.App (HasStderrLogFunc(..))
2020-05-28 21:21:43 +03:00
import qualified Data.Conduit.Combinators as CC
2020-06-07 03:26:59 +03:00
import qualified System.ProgressBar as PB
import qualified Urbit.EventLog.LMDB as Log
import qualified Urbit.Vere.Serf.IPC as X (Config (..), EvErr (..), Flag (..),
RunReq (..), Serf, WorkError (..),
run, sendSIGINT, snapshot, start,
stop)
--------------------------------------------------------------------------------
{-|
  Decode one raw event-log row into a @(Mug, Noun)@ pair.

  The bytes are first passed through 'cueBSExn' and the result is then
  converted with 'fromNounExn'. Both steps run in @m@; judging by the @Exn@
  suffix they presumably throw on malformed input (confirm against their
  definitions).
-}
parseLogRow :: MonadIO m => ByteString -> m (Mug, Noun)
parseLogRow rowBytes = do
  cued <- cueBSExn rowBytes
  fromNounExn cued
2020-05-28 01:57:34 +03:00
{-|
  Acquire a serf as a managed resource.

  Acquisition calls 'start' with the given configuration, traces the state
  value returned alongside the serf, and yields the serf. Release calls
  'stop' on that serf and discards whatever it returns.
-}
withSerf :: HasLogFunc e => Config -> RAcquire e Serf
withSerf config = mkRAcquire acquire release
 where
  -- Launch the serf and log the state it reports on startup.
  acquire = do
    (worker, initState) <- io (start config)
    logTrace (displayShow initState)
    pure worker

  -- Shut the serf down; the result of 'stop' is not needed.
  release worker = void (rio (stop worker))
{-|
  Bring a serf up to date by feeding it events from the event log.

  The serf is first asked for its last event ('serfLastEventBlocking'). If
  that is 0 the boot sequence is run ('doBoot') and then the remaining
  events are replayed; otherwise replay ('doReplay') starts directly.

  Arguments:

  * @serf@ - the serf to feed events to.
  * @log@  - the event log to read from.
  * @last@ - optional upper bound on the event number to replay up to; it
    is clamped to the last event present in the log.

  Result: @Right n@ with the number of events sent (boot events plus
  replayed events), or @Left bail@ if 'boot' or 'replay' reported a
  'PlayBail'.

  Throws 'MissingBootEventsInEventLog' when the log holds fewer events than
  the boot sequence needs, and 'SnapshotAheadOfLog' when the computed number
  of events to replay is negative.
-}
execReplay
  :: forall e
   . (HasLogFunc e, HasStderrLogFunc e)
  => Serf
  -> Log.EventLog
  -> Maybe Word64
  -> RIO e (Either PlayBail Word)
execReplay serf log last = do
  lastEventInSnap <- io (serfLastEventBlocking serf)
  if lastEventInSnap == 0 then doBoot else doReplay
 where
  -- Send the boot sequence (the first 'lifecycleLen' events of the log) to
  -- the serf, then continue with a regular replay of whatever follows.
  doBoot :: RIO e (Either PlayBail Word)
  doBoot = do
    logTrace "Beginning boot sequence"

    let bootSeqLen = lifecycleLen (Log.identity log)

    evs <- runConduit $ Log.streamEvents log 1
                     .| CC.take (fromIntegral bootSeqLen)
                     .| CC.mapM (fmap snd . parseLogRow)
                     .| CC.sinkList

    let numEvs = fromIntegral (length evs)

    -- The log must contain the complete boot sequence.
    when (numEvs /= bootSeqLen) $ do
      throwIO (MissingBootEventsInEventLog numEvs bootSeqLen)

    logTrace $ display ("Sending " <> tshow numEvs <> " boot events to serf")

    io (boot serf evs) >>= \case
      Just err -> do
        logTrace "Error on replay, exiting"
        pure (Left err)
      Nothing -> do
        logTrace "Finished boot events, moving on to more events from log."
        -- Report boot events and replayed events as a single total.
        fmap (+ numEvs) <$> doReplay

  -- Replay events following the serf's current last event, up to the
  -- requested limit or the end of the log, whichever comes first.
  doReplay :: RIO e (Either PlayBail Word)
  doReplay = do
    logTrace "Beginning event log replay"

    -- Queried again here because 'doBoot' may have advanced the serf.
    lastEventInSnap <- io (serfLastEventBlocking serf)

    last & \case
      Nothing -> pure ()
      Just lt -> logTrace $ display $
                   "User requested to replay up to event #" <> tshow lt

    logLastEv :: Word64 <- atomically $ fromIntegral <$> Log.lastEv log

    logTrace $ display $ "Last event in event log is #" <> tshow logLastEv

    let replayUpTo = min (fromMaybe logLastEv last) logLastEv

    let numEvs :: Int = fromIntegral replayUpTo - fromIntegral lastEventInSnap

    -- NOTE(review): this also fires when the user-requested limit is below
    -- the serf's last event, in which case the log itself is not behind the
    -- snapshot; confirm whether that case deserves a distinct error.
    when (numEvs < 0) $ do
      throwIO (SnapshotAheadOfLog logLastEv lastEventInSnap)

    incProgress <- logStderr (trackProgress (fromIntegral numEvs))

    logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo
    logTrace $ display $ "Will replay " <> tshow numEvs <> " in total."

    -- The literal 5 is passed straight to 'replay'; presumably a batch
    -- size -- confirm against its definition.
    res <- runResourceT
         $ runConduit
         $  Log.streamEvents log (lastEventInSnap + 1)
         .| CC.take (fromIntegral numEvs)
         .| CC.mapM (fmap snd . parseLogRow)
         .| replay 5 incProgress serf

    res & \case
      Nothing -> pure (Right $ fromIntegral numEvs)
      Just er -> pure (Left er)
2020-06-07 03:26:59 +03:00
{-|
  Run an action whose environment is a bare 'LogFunc', supplying the log
  function found under 'stderrLogFuncL' in the current environment.
-}
logStderr :: HasStderrLogFunc e => RIO LogFunc a -> RIO e a
logStderr action =
  view stderrLogFuncL >>= \stderrLog -> runRIO stderrLog action
{-|
  Build a callback that reports progress towards a total of the given size.

  For a total of 0 no progress bar is created and the returned callback does
  nothing. Otherwise a progress bar is created, starting at 0 out of the
  given total, and the returned callback advances it by the amount it is
  given, running in the environment captured when the bar was created.
-}
trackProgress
  :: HasLogFunc e
  => Word64
  -> RIO e (Int -> IO ())
trackProgress 0     = pure (const (pure ()))
trackProgress total = do
  env <- ask
  let barStyle = PB.defStyle { PB.stylePostfix = PB.exact }
      initial  = PB.Progress 0 (fromIntegral total) ()
  -- 10 is the refresh argument of 'PB.newProgressBar'.
  bar <- PB.newProgressBar barStyle 10 initial
  pure $ \step -> runRIO env (PB.incProgress bar step)
-- Collect FX ------------------------------------------------------------------
{-|
  Stream every log event after the serf's last event through the serf and
  store the resulting effects.

  Each row is parsed with 'parseLogRow', its noun component is converted
  with 'fromNounExn', the result is piped through 'swim', and the output is
  written back to the log by 'persistFX'.
-}
collectFX :: HasLogFunc e => Serf -> Log.EventLog -> RIO e ()
collectFX serf log = do
  snapEv <- io (serfLastEventBlocking serf)
  runResourceT . runConduit $
       Log.streamEvents log (snapEv + 1)
    .| CC.mapM (\row -> parseLogRow row >>= fromNounExn . snd)
    .| swim serf
    .| persistFX log
{-|
  Conduit sink that writes each incoming @(EventId, FX)@ pair to the event
  log: the effects are converted with 'toNoun', serialized with 'jamBS', and
  stored under the event id via 'Log.writeEffectsRow'.
-}
persistFX :: MonadIO m => Log.EventLog -> ConduitT (EventId, FX) Void m ()
persistFX log = CC.mapM_ writeRow
 where
  writeRow (eventId, effects) =
    Log.writeEffectsRow log eventId (jamBS (toNoun effects))