shrub/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs

548 lines
17 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
Serf Interface
TODO: `recvLen` is not big-endian safe.
2019-07-17 02:14:46 +03:00
-}
module Urbit.Vere.Serf ( Serf, sStderr, SerfState(..), doJob
, run, shutdown, kill
, replay, bootFromSeq, snapshot
, collectFX
, Config(..), Flags, Flag(..)
) where
import Urbit.Prelude
import Data.Conduit
import System.Process
import System.ProgressBar
import Urbit.Arvo
import Urbit.Vere.Pier.Types
2019-07-22 00:24:07 +03:00
import Data.Bits (setBit)
2019-07-12 04:16:40 +03:00
import Data.ByteString (hGet)
import Data.ByteString.Unsafe (unsafeUseAsCString)
2019-07-12 04:16:40 +03:00
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (castPtr)
2019-07-12 22:24:44 +03:00
import Foreign.Storable (peek, poke)
2019-07-12 04:16:40 +03:00
import System.Exit (ExitCode)
2020-02-04 04:27:16 +03:00
import Urbit.King.App (HasStderrLogFunc(..))
import qualified Data.ByteString.Unsafe as BS
import qualified Data.Conduit.Combinators as CC
import qualified Data.Text as T
import qualified System.IO as IO
import qualified System.IO.Error as IO
import qualified Urbit.Ob as Ob
import qualified Urbit.Time as Time
import qualified Urbit.Vere.Log as Log
2019-07-22 00:24:07 +03:00
-- Serf Config -----------------------------------------------------------------
type Flags = [Flag]
data Flag
= DebugRam
| DebugCpu
| CheckCorrupt
| CheckFatal
| Verbose
| DryRun
| Quiet
| Hashless
| Trace
deriving (Eq, Ord, Show, Enum, Bounded)
compileFlags :: [Flag] -> Word
compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0
data Config = Config FilePath [Flag]
deriving (Show)
2019-08-28 15:22:56 +03:00
serf :: HasLogFunc e => Text -> RIO e ()
2019-12-19 12:42:49 +03:00
serf msg = logInfo $ display ("SERF: " <> msg)
2019-07-22 00:24:07 +03:00
2019-07-17 02:14:46 +03:00
-- Types -----------------------------------------------------------------------
data SerfState = SerfState
{ ssNextEv :: EventId
, ssLastMug :: Mug
}
deriving (Eq, Ord, Show)
2019-08-15 05:42:48 +03:00
ssLastEv :: SerfState -> EventId
ssLastEv = pred . ssNextEv
2019-09-04 01:17:20 +03:00
data Serf e = Serf
{ sendHandle :: Handle
, recvHandle :: Handle
, process :: ProcessHandle
2019-09-13 02:14:57 +03:00
, sStderr :: MVar (Text -> RIO e ())
}
data ShipId = ShipId Ship Bool
deriving (Eq, Ord, Show)
data Plea
= PPlay EventId Mug
| PWork Work
| PDone EventId Mug FX
| PStdr EventId Cord
| PSlog EventId Word32 Tank
2019-06-01 03:21:44 +03:00
deriving (Eq, Show)
type ReplacementEv = Job
type WorkResult = (SerfState, FX)
type SerfResp = Either ReplacementEv WorkResult
2019-06-01 03:21:44 +03:00
data SerfExn
2019-06-01 03:21:44 +03:00
= BadComputeId EventId WorkResult
| BadReplacementId EventId ReplacementEv
| UnexpectedPlay EventId (EventId, Mug)
2019-06-01 03:21:44 +03:00
| BadPleaAtom Atom
| BadPleaNoun Noun [Text] Text
| ReplacedEventDuringReplay EventId ReplacementEv
2019-07-16 03:01:45 +03:00
| ReplacedEventDuringBoot EventId ReplacementEv
| EffectsDuringBoot EventId FX
| SerfConnectionClosed
| UnexpectedPleaOnNewShip Plea
| InvalidInitialPlea Plea
2019-06-01 03:21:44 +03:00
deriving (Show)
2019-07-17 02:14:46 +03:00
-- Instances -------------------------------------------------------------------
instance Exception SerfExn
2019-06-01 03:21:44 +03:00
deriveNoun ''ShipId
2019-07-17 02:14:46 +03:00
deriveNoun ''Plea
2019-06-01 03:21:44 +03:00
-- Utils -----------------------------------------------------------------------
2020-01-23 07:16:09 +03:00
printTank :: HasLogFunc e
=> MVar (Text -> RIO e ()) -> Word32 -> Tank
-> RIO e ()
printTank log _pri = printErr log . unlines . fmap unTape . wash (WashCfg 0 80)
2019-08-28 15:22:56 +03:00
guardExn :: (Exception e, MonadIO m) => Bool -> e -> m ()
guardExn ok = io . unless ok . throwIO
2019-06-01 03:21:44 +03:00
2019-08-28 15:22:56 +03:00
fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b
fromRightExn (Left m) exn = throwIO (exn m)
fromRightExn (Right x) _ = pure x
2019-09-13 02:14:57 +03:00
printErr :: MVar (Text -> RIO e ()) -> Text -> RIO e ()
printErr m txt = do
f <- readMVar m
f txt
2019-07-17 02:14:46 +03:00
2020-01-23 07:16:09 +03:00
2019-07-17 02:14:46 +03:00
-- Process Management ----------------------------------------------------------
2019-09-04 01:17:20 +03:00
run :: HasLogFunc e => Config -> RAcquire e (Serf e)
2019-08-28 15:22:56 +03:00
run config = mkRAcquire (startUp config) tearDown
2019-09-04 01:17:20 +03:00
startUp :: HasLogFunc e => Config -> RIO e (Serf e)
startUp conf@(Config pierPath flags) = do
2019-08-28 15:22:56 +03:00
logTrace "STARTING SERF"
logTrace (displayShow conf)
(i, o, e, p) <- io $ do
(Just i, Just o, Just e, p) <- createProcess pSpec
pure (i, o, e, p)
stderr <- newMVar serf
2019-09-13 02:14:57 +03:00
async (readStdErr e stderr)
pure (Serf i o p stderr)
2019-07-17 02:14:46 +03:00
where
diskKey = ""
2019-07-22 00:24:07 +03:00
config = show (compileFlags flags)
2019-07-21 23:30:30 +03:00
args = [pierPath, diskKey, config]
pSpec = (proc "urbit-worker" args)
2019-07-17 02:14:46 +03:00
{ std_in = CreatePipe
, std_out = CreatePipe
2019-07-21 23:30:30 +03:00
, std_err = CreatePipe
2019-07-17 02:14:46 +03:00
}
2019-09-13 02:14:57 +03:00
readStdErr :: e. HasLogFunc e => Handle -> MVar (Text -> RIO e ()) -> RIO e ()
readStdErr h print =
2019-07-21 23:30:30 +03:00
untilEOFExn $ do
2019-09-13 02:14:57 +03:00
raw <- io $ IO.hGetLine h
let ln = T.strip (pack raw)
printErr print ln
serf ("[stderr] " <> ln)
2019-07-21 23:30:30 +03:00
where
eofMsg = "[Serf.readStdErr] serf stderr closed"
2019-07-22 00:24:07 +03:00
2019-08-28 15:22:56 +03:00
untilEOFExn :: RIO e () -> RIO e ()
2019-07-21 23:30:30 +03:00
untilEOFExn act = loop
where
2019-08-28 15:22:56 +03:00
loop :: RIO e ()
2019-07-21 23:30:30 +03:00
loop = do
2019-08-28 15:22:56 +03:00
env <- ask
res <- io $ IO.tryIOError $ runRIO env act
case res of
Left exn | IO.isEOFError exn -> logDebug eofMsg
Left exn -> io (IO.ioError exn)
Right () -> loop
2019-09-04 01:17:20 +03:00
tearDown :: HasLogFunc e => Serf e -> RIO e ()
tearDown serf = do
2019-08-28 15:22:56 +03:00
io $ terminateProcess (process serf)
void $ waitForExit serf
2019-08-13 08:56:31 +03:00
-- race_ waitThenKill (shutdownAndWait serf 0)
2019-07-22 00:24:07 +03:00
where
2019-08-13 08:56:31 +03:00
-- killedMsg =
-- "[Serf.tearDown]: Serf didn't die when asked, killing it"
2019-07-22 00:24:07 +03:00
2019-08-13 08:56:31 +03:00
-- waitThenKill = do
-- threadDelay 1000000
-- debug killedMsg
-- terminateProcess (process serf)
2019-09-04 01:17:20 +03:00
waitForExit :: HasLogFunc e => Serf e -> RIO e ExitCode
2019-08-28 15:22:56 +03:00
waitForExit = io . waitForProcess . process
2019-09-04 01:17:20 +03:00
kill :: HasLogFunc e => Serf e -> RIO e ExitCode
2019-08-28 15:22:56 +03:00
kill serf = io (terminateProcess $ process serf) >> waitForExit serf
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
_shutdownAndWait :: HasLogFunc e => Serf e -> Word8 -> RIO e ExitCode
2019-08-28 15:22:56 +03:00
_shutdownAndWait serf code = do
shutdown serf code
waitForExit serf
2019-07-17 02:14:46 +03:00
-- Basic Send and Receive Operations -------------------------------------------
2019-08-28 15:22:56 +03:00
withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a
2019-07-17 02:14:46 +03:00
withWord64AsByteString w k = do
2019-08-28 15:22:56 +03:00
env <- ask
io $ alloca $ \wp -> do
poke wp w
bs <- BS.unsafePackCStringLen (castPtr wp, 8)
runRIO env (k bs)
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
sendLen :: HasLogFunc e => Serf e -> Int -> RIO e ()
2019-07-17 02:14:46 +03:00
sendLen s i = do
w <- evaluate (fromIntegral i :: Word64)
withWord64AsByteString (fromIntegral i) (hPut (sendHandle s))
2019-09-04 01:17:20 +03:00
sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e ()
2019-07-17 02:14:46 +03:00
sendOrder w o = do
-- logDebug $ display ("(sendOrder) " <> tshow o)
sendBytes w $ jamBS $ toNoun o
-- logDebug "(sendOrder) Done"
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e ()
2019-08-13 08:56:31 +03:00
sendBytes s bs = handle ioErr $ do
2019-07-17 02:14:46 +03:00
sendLen s (length bs)
hPut (sendHandle s) bs
hFlush (sendHandle s)
2019-08-13 08:56:31 +03:00
where
2019-08-28 15:22:56 +03:00
ioErr :: IOError -> RIO e ()
2019-08-13 08:56:31 +03:00
ioErr _ = throwIO SerfConnectionClosed
2019-09-04 01:17:20 +03:00
recvLen :: (MonadIO m, HasLogFunc e) => Serf e -> m Word64
2019-08-28 15:22:56 +03:00
recvLen w = io $ do
2019-07-17 02:14:46 +03:00
bs <- hGet (recvHandle w) 8
case length bs of
8 -> unsafeUseAsCString bs (peek . castPtr)
_ -> throwIO SerfConnectionClosed
2019-09-04 01:17:20 +03:00
recvBytes :: HasLogFunc e => Serf e -> Word64 -> RIO e ByteString
2019-08-28 15:22:56 +03:00
recvBytes serf =
io . hGet (recvHandle serf) . fromIntegral
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
recvAtom :: HasLogFunc e => Serf e -> RIO e Atom
2019-07-17 02:14:46 +03:00
recvAtom w = do
len <- recvLen w
bytesAtom <$> recvBytes w len
2019-07-17 02:14:46 +03:00
cordText :: Cord -> Text
2019-07-22 21:10:27 +03:00
cordText = T.strip . unCord
2019-07-17 02:14:46 +03:00
2019-06-01 03:21:44 +03:00
--------------------------------------------------------------------------------
2019-09-04 01:17:20 +03:00
snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e ()
2020-02-06 02:20:32 +03:00
snapshot serf ss = do
logTrace $ display ("Taking snapshot at event " <> tshow (ssLastEv ss))
sendOrder serf $ OSave $ ssLastEv ss
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e ()
shutdown serf code = sendOrder serf (OExit code)
2019-07-17 02:14:46 +03:00
2020-01-23 07:16:09 +03:00
{-|
TODO Find a cleaner way to handle `PStdr` Pleas.
2019-07-17 02:14:46 +03:00
-}
2019-09-04 01:17:20 +03:00
recvPlea :: HasLogFunc e => Serf e -> RIO e Plea
2019-07-17 02:14:46 +03:00
recvPlea w = do
2019-12-19 12:42:49 +03:00
logDebug "(recvPlea) Waiting"
2019-07-17 02:14:46 +03:00
a <- recvAtom w
2019-12-19 12:42:49 +03:00
logDebug "(recvPlea) Got atom"
2019-07-17 02:14:46 +03:00
n <- fromRightExn (cue a) (const $ BadPleaAtom a)
p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun n p m)
2019-07-17 02:14:46 +03:00
2019-09-13 02:14:57 +03:00
case p of PStdr e msg -> do printErr (sStderr w) (cordText msg)
recvPlea w
2019-09-04 01:17:20 +03:00
PSlog _ pri t -> do printTank (sStderr w) pri t
recvPlea w
2019-12-19 12:42:49 +03:00
_ -> do logTrace "recvPlea got something else"
pure p
2019-07-17 02:14:46 +03:00
2020-01-23 07:16:09 +03:00
{-|
2019-07-17 02:14:46 +03:00
Waits for initial plea, and then sends boot IPC if necessary.
-}
2019-09-04 01:17:20 +03:00
handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState
2019-07-17 02:14:46 +03:00
handshake serf ident = do
2020-02-06 02:20:32 +03:00
logTrace "Serf Handshake"
ss@SerfState{..} <- recvPlea serf >>= \case
PPlay e m -> pure $ SerfState e m
x -> throwIO (InvalidInitialPlea x)
2019-07-17 02:14:46 +03:00
2020-02-06 02:20:32 +03:00
logTrace $ display ("Handshake result: " <> tshow ss)
when (ssNextEv == 1) $ do
2020-02-06 02:20:32 +03:00
let ev = OBoot (lifecycleLen ident)
logTrace $ display ("No snapshot. Sending boot event: " <> tshow ev)
sendOrder serf ev
logTrace "Finished handshake"
2019-07-17 02:14:46 +03:00
pure ss
2019-07-17 02:14:46 +03:00
2019-09-04 01:17:20 +03:00
sendWork :: e. HasLogFunc e => Serf e -> Job -> RIO e SerfResp
sendWork w job =
do
sendOrder w (OWork job)
res <- loop
2019-12-19 12:42:49 +03:00
logTrace ("[sendWork] Got response")
pure res
where
eId = jobId job
2019-08-28 15:22:56 +03:00
produce :: WorkResult -> RIO e SerfResp
produce (ss@SerfState{..}, o) = do
guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o))
pure $ Right (ss, o)
2019-06-01 03:21:44 +03:00
2019-08-28 15:22:56 +03:00
replace :: ReplacementEv -> RIO e SerfResp
replace job = do
guardExn (jobId job == eId) (BadReplacementId eId job)
pure (Left job)
2019-08-28 15:22:56 +03:00
loop :: RIO e SerfResp
2019-06-01 03:21:44 +03:00
loop = recvPlea w >>= \case
PPlay e m -> throwIO (UnexpectedPlay eId (e, m))
PDone i m o -> produce (SerfState (i+1) m, o)
PWork work -> replace (DoWork work)
2019-09-13 02:14:57 +03:00
PStdr _ cord -> printErr (sStderr w) (cordText cord) >> loop
2019-09-04 01:17:20 +03:00
PSlog _ pri t -> printTank (sStderr w) pri t >> loop
--------------------------------------------------------------------------------
2019-09-04 01:17:20 +03:00
doJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState, FX)
doJob serf job = do
sendWork serf job >>= \case
Left replaced -> doJob serf replaced
Right (ss, fx) -> pure (job, ss, fx)
2019-09-04 01:17:20 +03:00
bootJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState)
bootJob serf job = do
doJob serf job >>= \case
2019-09-04 03:11:24 +03:00
(job, ss, _) -> pure (job, ss)
-- (job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx)
2019-09-04 01:17:20 +03:00
replayJob :: HasLogFunc e => Serf e -> Job -> RIO e SerfState
replayJob serf job = do
sendWork serf job >>= \case
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
Right (ss, _) -> pure ss
--------------------------------------------------------------------------------
updateProgressBar :: HasLogFunc e
=> Int -> Text -> Maybe (ProgressBar ())
-> RIO e (Maybe (ProgressBar ()))
updateProgressBar count startMsg = \case
Nothing -> do
-- We only construct the progress bar on the first time that we
-- process an event so that we don't display an empty progress
-- bar when the snapshot is caught up to the log.
2020-02-04 04:27:16 +03:00
let style = defStyle { stylePrefix = msg (fromStrict startMsg) }
pb <- newProgressBar style 10 (Progress 0 count ())
pure (Just pb)
Just pb -> do
incProgress pb 1
pure (Just pb)
--------------------------------------------------------------------------------
type BootSeqFn = EventId -> Mug -> Time.Wen -> Job
data BootExn = ShipAlreadyBooted
deriving stock (Eq, Ord, Show)
deriving anyclass (Exception)
2020-02-04 04:27:16 +03:00
logStderr :: HasStderrLogFunc e => RIO LogFunc a -> RIO e a
logStderr action = do
logFunc <- view stderrLogFuncL
runRIO logFunc action
bootFromSeq :: e. (HasStderrLogFunc e, HasLogFunc e)
=> Serf e -> BootSeq -> RIO e ([Job], SerfState)
2019-07-16 03:23:48 +03:00
bootFromSeq serf (BootSeq ident nocks ovums) = do
handshake serf ident >>= \case
ss@(SerfState 1 (Mug 0)) -> loop [] ss Nothing bootSeqFns
_ -> throwIO ShipAlreadyBooted
2019-07-16 03:01:45 +03:00
where
loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn]
-> RIO e ([Job], SerfState)
loop acc ss pb = \case
[] -> do
2020-02-04 04:27:16 +03:00
pb <- logStderr (updateProgressBar 0 bootMsg pb)
pure (reverse acc, ss)
x:xs -> do
wen <- io Time.now
job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen
2020-02-04 04:27:16 +03:00
pb <- logStderr (updateProgressBar (1 + length xs) bootMsg pb)
(job, ss) <- bootJob serf job
loop (job:acc) ss pb xs
bootSeqFns :: [BootSeqFn]
bootSeqFns = fmap muckNock nocks <> fmap muckOvum ovums
2019-07-16 03:23:48 +03:00
where
muckNock nok eId mug _ = RunNok $ LifeCyc eId mug nok
muckOvum ov eId mug wen = DoWork $ Work eId mug wen ov
bootMsg = "Booting " ++ (fakeStr (isFake ident)) ++
2019-09-21 02:10:03 +03:00
(Ob.renderPatp (Ob.patp (fromIntegral (who ident))))
fakeStr True = "fake "
fakeStr False = ""
2020-01-23 07:16:09 +03:00
{-|
2019-07-17 02:14:46 +03:00
The ship is booted, but it is behind. shove events to the worker
until it is caught up.
-}
2020-02-04 04:27:16 +03:00
replayJobs :: (HasStderrLogFunc e, HasLogFunc e)
=> Serf e -> Int -> SerfState -> ConduitT Job Void (RIO e) SerfState
replayJobs serf lastEv = go Nothing
where
go pb ss = do
2020-02-06 02:20:32 +03:00
await >>= \case
Nothing -> pure ss
Just job -> do
pb <- lift $ logStderr (updatePb ss pb)
played <- lift $ replayJob serf job
go pb played
updatePb ss = do
2020-02-06 02:20:32 +03:00
let start = lastEv - fromIntegral (ssNextEv ss)
let msg = pack ( "Replaying events #" ++ (show (ssNextEv ss))
<> " to #" ++ (show lastEv)
)
updateProgressBar start msg
2019-09-13 02:20:15 +03:00
2020-02-04 04:27:16 +03:00
replay :: (HasStderrLogFunc e, HasLogFunc e)
=> Serf e -> Log.EventLog -> Maybe Word64 -> RIO e SerfState
replay serf log last = do
2020-02-06 02:20:32 +03:00
logTrace "Beginning event log replay"
last & \case
Nothing -> pure ()
Just lt -> logTrace $ display $
"User requested to replay up to event #" <> tshow lt
ss <- handshake serf (Log.identity log)
2020-02-06 02:20:32 +03:00
logLastEv :: Word64 <- fromIntegral <$> Log.lastEv log
let serfNextEv = ssNextEv ss
lastEventInSnap = serfNextEv - 1
logTrace $ display $ "Last event in event log is #" <> tshow logLastEv
let replayUpTo = fromMaybe logLastEv last
let numEvs :: Int = fromIntegral replayUpTo - fromIntegral lastEventInSnap
2020-02-06 02:20:32 +03:00
logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo
logTrace $ display $ "Will replay " <> tshow numEvs <> " in total."
2020-02-06 02:20:32 +03:00
runConduit $ Log.streamEvents log serfNextEv
.| CC.take (fromIntegral numEvs)
.| toJobs (Log.identity log) serfNextEv
.| replayJobs serf (fromIntegral replayUpTo) ss
2019-08-28 15:22:56 +03:00
toJobs :: HasLogFunc e
=> LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) ()
toJobs ident eId =
await >>= \case
2019-08-28 15:22:56 +03:00
Nothing -> lift $ logTrace "[toJobs] no more jobs"
Just at -> do yield =<< lift (fromAtom at)
2019-12-19 12:42:49 +03:00
lift $ logTrace $ display ("[toJobs] " <> tshow eId)
toJobs ident (eId+1)
where
2019-08-28 15:22:56 +03:00
isNock = eId <= fromIntegral (lifecycleLen ident)
2019-08-28 15:22:56 +03:00
fromAtom :: ByteString -> RIO e Job
fromAtom bs | isNock = do
noun <- cueBSExn bs
(mug, nok) <- fromNounExn noun
pure $ RunNok (LifeCyc eId mug nok)
fromAtom bs = do
noun <- cueBSExn bs
(mug, wen, ovm) <- fromNounExn noun
pure $ DoWork (Work eId mug wen ovm)
-- Collect Effects for Parsing -------------------------------------------------
2019-09-04 01:17:20 +03:00
collectFX :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e ()
collectFX serf log = do
ss <- handshake serf (Log.identity log)
2019-07-17 02:14:46 +03:00
runConduit $ Log.streamEvents log (ssNextEv ss)
.| toJobs (Log.identity log) (ssNextEv ss)
.| doCollectFX serf ss
.| persistFX log
2019-08-28 15:22:56 +03:00
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) ()
persistFX log = loop
where
loop = await >>= \case
Nothing -> pure ()
Just (eId, fx) -> do
2019-08-29 03:26:59 +03:00
lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
loop
2019-08-28 15:22:56 +03:00
doCollectFX :: e. HasLogFunc e
2019-09-04 01:17:20 +03:00
=> Serf e -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
doCollectFX serf = go
where
2019-08-28 15:22:56 +03:00
go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
go ss = await >>= \case
Nothing -> pure ()
Just jb -> do
-- jb <- pure $ replaceMug jb (ssLastMug ss)
2019-08-28 15:22:56 +03:00
(_, ss, fx) <- lift $ doJob serf jb
when (0 == (jobId jb `mod` 10_000)) $ do
lift $ logTrace $ displayShow (jobId jb)
yield (jobId jb, fx)
go ss
_replaceMug :: Job -> Mug -> Job
_replaceMug jb mug =
case jb of
DoWork (Work eId _ w o) -> DoWork (Work eId mug w o)
RunNok (LifeCyc eId _ n) -> RunNok (LifeCyc eId mug n)