mirror of
https://github.com/urbit/shrub.git
synced 2025-01-04 18:43:46 +03:00
510 lines
15 KiB
Haskell
510 lines
15 KiB
Haskell
{-# OPTIONS_GHC -Wwarn #-}
|
|
|
|
{-
|
|
- TODO: `recvLen` is not big-endian safe.
|
|
-}
|
|
|
|
module Vere.Serf ( Serf, sStderr, SerfState(..), doJob
|
|
, run, shutdown, kill
|
|
, replay, bootFromSeq, snapshot
|
|
, collectFX
|
|
, Config(..), Flags, Flag(..)
|
|
) where
|
|
|
|
import UrbitPrelude
|
|
|
|
import Arvo
|
|
import Data.Conduit
|
|
import System.Process
|
|
import System.ProgressBar
|
|
import Vere.Pier.Types
|
|
|
|
import Data.Bits (setBit)
|
|
import Data.ByteString (hGet)
|
|
import Data.ByteString.Unsafe (unsafeUseAsCString)
|
|
import Foreign.Marshal.Alloc (alloca)
|
|
import Foreign.Ptr (castPtr)
|
|
import Foreign.Storable (peek, poke)
|
|
import System.Exit (ExitCode)
|
|
|
|
import qualified Urbit.Ob as Ob
|
|
|
|
import qualified Data.ByteString.Unsafe as BS
|
|
import qualified Data.Text as T
|
|
import qualified System.IO as IO
|
|
import qualified System.IO.Error as IO
|
|
import qualified Urbit.Time as Time
|
|
import qualified Vere.Log as Log
|
|
|
|
|
|
-- Serf Config -----------------------------------------------------------------
|
|
|
|
type Flags = [Flag]
|
|
|
|
data Flag
|
|
= DebugRam
|
|
| DebugCpu
|
|
| CheckCorrupt
|
|
| CheckFatal
|
|
| Verbose
|
|
| DryRun
|
|
| Quiet
|
|
| Hashless
|
|
| Trace
|
|
deriving (Eq, Ord, Show, Enum, Bounded)
|
|
|
|
compileFlags :: [Flag] -> Word
|
|
compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0
|
|
|
|
data Config = Config FilePath [Flag]
|
|
deriving (Show)
|
|
|
|
serf :: HasLogFunc e => Text -> RIO e ()
|
|
serf msg = logInfo $ display ("SERF: " <> msg)
|
|
|
|
|
|
-- Types -----------------------------------------------------------------------
|
|
|
|
data SerfState = SerfState
|
|
{ ssNextEv :: EventId
|
|
, ssLastMug :: Mug
|
|
}
|
|
deriving (Eq, Ord, Show)
|
|
|
|
ssLastEv :: SerfState -> EventId
|
|
ssLastEv = pred . ssNextEv
|
|
|
|
data Serf e = Serf
|
|
{ sendHandle :: Handle
|
|
, recvHandle :: Handle
|
|
, process :: ProcessHandle
|
|
, sStderr :: MVar (Text -> RIO e ())
|
|
}
|
|
|
|
data ShipId = ShipId Ship Bool
|
|
deriving (Eq, Ord, Show)
|
|
|
|
data Plea
|
|
= PPlay EventId Mug
|
|
| PWork Work
|
|
| PDone EventId Mug FX
|
|
| PStdr EventId Cord
|
|
| PSlog EventId Word32 Tank
|
|
deriving (Eq, Show)
|
|
|
|
type ReplacementEv = Job
|
|
type WorkResult = (SerfState, FX)
|
|
type SerfResp = Either ReplacementEv WorkResult
|
|
|
|
data SerfExn
|
|
= BadComputeId EventId WorkResult
|
|
| BadReplacementId EventId ReplacementEv
|
|
| UnexpectedPlay EventId (EventId, Mug)
|
|
| BadPleaAtom Atom
|
|
| BadPleaNoun Noun [Text] Text
|
|
| ReplacedEventDuringReplay EventId ReplacementEv
|
|
| ReplacedEventDuringBoot EventId ReplacementEv
|
|
| EffectsDuringBoot EventId FX
|
|
| SerfConnectionClosed
|
|
| UnexpectedPleaOnNewShip Plea
|
|
| InvalidInitialPlea Plea
|
|
deriving (Show)
|
|
|
|
|
|
-- Instances -------------------------------------------------------------------
|
|
|
|
instance Exception SerfExn
|
|
|
|
deriveNoun ''ShipId
|
|
deriveNoun ''Plea
|
|
|
|
|
|
-- Utils -----------------------------------------------------------------------
|
|
|
|
printTank :: HasLogFunc e => MVar (Text -> RIO e ()) -> Word32 -> Tank -> RIO e ()
|
|
printTank log _pri tank =
|
|
((printErr log) . unlines . fmap unTape . wash (WashCfg 0 80)) tank
|
|
|
|
guardExn :: (Exception e, MonadIO m) => Bool -> e -> m ()
|
|
guardExn ok = io . unless ok . throwIO
|
|
|
|
fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b
|
|
fromRightExn (Left m) exn = throwIO (exn m)
|
|
fromRightExn (Right x) _ = pure x
|
|
|
|
printErr :: MVar (Text -> RIO e ()) -> Text -> RIO e ()
|
|
printErr m txt = do
|
|
f <- readMVar m
|
|
f txt
|
|
|
|
-- Process Management ----------------------------------------------------------
|
|
|
|
run :: HasLogFunc e => Config -> RAcquire e (Serf e)
|
|
run config = mkRAcquire (startUp config) tearDown
|
|
|
|
startUp :: HasLogFunc e => Config -> RIO e (Serf e)
|
|
startUp conf@(Config pierPath flags) = do
|
|
logTrace "STARTING SERF"
|
|
logTrace (displayShow conf)
|
|
|
|
(i, o, e, p) <- io $ do
|
|
(Just i, Just o, Just e, p) <- createProcess pSpec
|
|
pure (i, o, e, p)
|
|
|
|
stderr <- newMVar serf
|
|
async (readStdErr e stderr)
|
|
pure (Serf i o p stderr)
|
|
where
|
|
diskKey = ""
|
|
config = show (compileFlags flags)
|
|
args = [pierPath, diskKey, config]
|
|
pSpec = (proc "urbit-worker" args)
|
|
{ std_in = CreatePipe
|
|
, std_out = CreatePipe
|
|
, std_err = CreatePipe
|
|
}
|
|
|
|
readStdErr :: ∀e. HasLogFunc e => Handle -> MVar (Text -> RIO e ()) -> RIO e ()
|
|
readStdErr h print =
|
|
untilEOFExn $ do
|
|
raw <- io $ IO.hGetLine h
|
|
let ln = T.strip (pack raw)
|
|
printErr print ln
|
|
serf ("[stderr] " <> ln)
|
|
where
|
|
eofMsg = "[Serf.readStdErr] serf stderr closed"
|
|
|
|
untilEOFExn :: RIO e () -> RIO e ()
|
|
untilEOFExn act = loop
|
|
where
|
|
loop :: RIO e ()
|
|
loop = do
|
|
env <- ask
|
|
res <- io $ IO.tryIOError $ runRIO env act
|
|
case res of
|
|
Left exn | IO.isEOFError exn -> logDebug eofMsg
|
|
Left exn -> io (IO.ioError exn)
|
|
Right () -> loop
|
|
|
|
tearDown :: HasLogFunc e => Serf e -> RIO e ()
|
|
tearDown serf = do
|
|
io $ terminateProcess (process serf)
|
|
void $ waitForExit serf
|
|
|
|
-- race_ waitThenKill (shutdownAndWait serf 0)
|
|
where
|
|
-- killedMsg =
|
|
-- "[Serf.tearDown]: Serf didn't die when asked, killing it"
|
|
|
|
-- waitThenKill = do
|
|
-- threadDelay 1000000
|
|
-- debug killedMsg
|
|
-- terminateProcess (process serf)
|
|
|
|
waitForExit :: HasLogFunc e => Serf e -> RIO e ExitCode
|
|
waitForExit = io . waitForProcess . process
|
|
|
|
kill :: HasLogFunc e => Serf e -> RIO e ExitCode
|
|
kill serf = io (terminateProcess $ process serf) >> waitForExit serf
|
|
|
|
_shutdownAndWait :: HasLogFunc e => Serf e -> Word8 -> RIO e ExitCode
|
|
_shutdownAndWait serf code = do
|
|
shutdown serf code
|
|
waitForExit serf
|
|
|
|
|
|
-- Basic Send and Receive Operations -------------------------------------------
|
|
|
|
withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a
|
|
withWord64AsByteString w k = do
|
|
env <- ask
|
|
io $ alloca $ \wp -> do
|
|
poke wp w
|
|
bs <- BS.unsafePackCStringLen (castPtr wp, 8)
|
|
runRIO env (k bs)
|
|
|
|
sendLen :: HasLogFunc e => Serf e -> Int -> RIO e ()
|
|
sendLen s i = do
|
|
w <- evaluate (fromIntegral i :: Word64)
|
|
withWord64AsByteString (fromIntegral i) (hPut (sendHandle s))
|
|
|
|
sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e ()
|
|
sendOrder w o = do
|
|
-- logDebug $ display ("(sendOrder) " <> tshow o)
|
|
sendBytes w $ jamBS $ toNoun o
|
|
-- logDebug "(sendOrder) Done"
|
|
|
|
sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e ()
|
|
sendBytes s bs = handle ioErr $ do
|
|
sendLen s (length bs)
|
|
hPut (sendHandle s) bs
|
|
hFlush (sendHandle s)
|
|
|
|
where
|
|
ioErr :: IOError -> RIO e ()
|
|
ioErr _ = throwIO SerfConnectionClosed
|
|
|
|
recvLen :: (MonadIO m, HasLogFunc e) => Serf e -> m Word64
|
|
recvLen w = io $ do
|
|
bs <- hGet (recvHandle w) 8
|
|
case length bs of
|
|
8 -> unsafeUseAsCString bs (peek . castPtr)
|
|
_ -> throwIO SerfConnectionClosed
|
|
|
|
recvBytes :: HasLogFunc e => Serf e -> Word64 -> RIO e ByteString
|
|
recvBytes serf =
|
|
io . hGet (recvHandle serf) . fromIntegral
|
|
|
|
recvAtom :: HasLogFunc e => Serf e -> RIO e Atom
|
|
recvAtom w = do
|
|
len <- recvLen w
|
|
bs <- recvBytes w len
|
|
pure (packAtom bs)
|
|
where
|
|
packAtom :: ByteString -> Atom
|
|
packAtom = view (from atomBytes)
|
|
|
|
cordText :: Cord -> Text
|
|
cordText = T.strip . unCord
|
|
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e ()
|
|
snapshot serf ss = sendOrder serf $ OSave $ ssLastEv ss
|
|
|
|
shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e ()
|
|
shutdown serf code = sendOrder serf (OExit code)
|
|
|
|
{-
|
|
TODO Find a cleaner way to handle `PStdr` Pleas.
|
|
-}
|
|
recvPlea :: HasLogFunc e => Serf e -> RIO e Plea
|
|
recvPlea w = do
|
|
logDebug "(recvPlea) Waiting"
|
|
a <- recvAtom w
|
|
logDebug "(recvPlea) Got atom"
|
|
n <- fromRightExn (cue a) (const $ BadPleaAtom a)
|
|
p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun n p m)
|
|
|
|
case p of PStdr e msg -> do printErr (sStderr w) (cordText msg)
|
|
recvPlea w
|
|
PSlog _ pri t -> do printTank (sStderr w) pri t
|
|
recvPlea w
|
|
_ -> do logTrace "recvPlea got something else"
|
|
pure p
|
|
|
|
{-
|
|
Waits for initial plea, and then sends boot IPC if necessary.
|
|
-}
|
|
handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState
|
|
handshake serf ident = do
|
|
ss@SerfState{..} <- recvPlea serf >>= \case
|
|
PPlay e m -> pure $ SerfState e m
|
|
x -> throwIO (InvalidInitialPlea x)
|
|
|
|
when (ssNextEv == 1) $ do
|
|
sendOrder serf (OBoot (lifecycleLen ident))
|
|
|
|
pure ss
|
|
|
|
sendWork :: ∀e. HasLogFunc e => Serf e -> Job -> RIO e SerfResp
|
|
sendWork w job =
|
|
do
|
|
sendOrder w (OWork job)
|
|
res <- loop
|
|
logTrace ("[sendWork] Got response")
|
|
pure res
|
|
where
|
|
eId = jobId job
|
|
|
|
produce :: WorkResult -> RIO e SerfResp
|
|
produce (ss@SerfState{..}, o) = do
|
|
guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o))
|
|
pure $ Right (ss, o)
|
|
|
|
replace :: ReplacementEv -> RIO e SerfResp
|
|
replace job = do
|
|
guardExn (jobId job == eId) (BadReplacementId eId job)
|
|
pure (Left job)
|
|
|
|
loop :: RIO e SerfResp
|
|
loop = recvPlea w >>= \case
|
|
PPlay e m -> throwIO (UnexpectedPlay eId (e, m))
|
|
PDone i m o -> produce (SerfState (i+1) m, o)
|
|
PWork work -> replace (DoWork work)
|
|
PStdr _ cord -> printErr (sStderr w) (cordText cord) >> loop
|
|
PSlog _ pri t -> printTank (sStderr w) pri t >> loop
|
|
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
doJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState, FX)
|
|
doJob serf job = do
|
|
sendWork serf job >>= \case
|
|
Left replaced -> doJob serf replaced
|
|
Right (ss, fx) -> pure (job, ss, fx)
|
|
|
|
bootJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState)
|
|
bootJob serf job = do
|
|
doJob serf job >>= \case
|
|
(job, ss, _) -> pure (job, ss)
|
|
-- (job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx)
|
|
|
|
replayJob :: HasLogFunc e => Serf e -> Job -> RIO e SerfState
|
|
replayJob serf job = do
|
|
sendWork serf job >>= \case
|
|
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
|
|
Right (ss, _) -> pure ss
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
updateProgressBar :: Int -> Text -> Maybe (ProgressBar ())
|
|
-> RIO e (Maybe (ProgressBar ()))
|
|
updateProgressBar count startMsg = \case
|
|
Nothing -> do
|
|
-- We only construct the progress bar on the first time that we
|
|
-- process an event so that we don't display an empty progress
|
|
-- bar when the snapshot is caught up to the log.
|
|
putStrLn startMsg
|
|
let style = defStyle { stylePostfix = exact }
|
|
pb <- io $ newProgressBar style 10 (Progress 0 count ())
|
|
pure (Just pb)
|
|
Just pb -> do
|
|
io $ incProgress pb 1
|
|
pure (Just pb)
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
type BootSeqFn = EventId -> Mug -> Time.Wen -> Job
|
|
|
|
data BootExn = ShipAlreadyBooted
|
|
deriving stock (Eq, Ord, Show)
|
|
deriving anyclass (Exception)
|
|
|
|
bootFromSeq :: ∀e. HasLogFunc e => Serf e -> BootSeq -> RIO e ([Job], SerfState)
|
|
bootFromSeq serf (BootSeq ident nocks ovums) = do
|
|
handshake serf ident >>= \case
|
|
ss@(SerfState 1 (Mug 0)) -> loop [] ss Nothing bootSeqFns
|
|
_ -> throwIO ShipAlreadyBooted
|
|
|
|
where
|
|
loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn]
|
|
-> RIO e ([Job], SerfState)
|
|
loop acc ss pb = \case
|
|
[] -> do
|
|
pb <- updateProgressBar 0 bootMsg pb
|
|
pure (reverse acc, ss)
|
|
x:xs -> do
|
|
wen <- io Time.now
|
|
job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen
|
|
pb <- updateProgressBar (1 + length xs) bootMsg pb
|
|
(job, ss) <- bootJob serf job
|
|
loop (job:acc) ss pb xs
|
|
|
|
bootSeqFns :: [BootSeqFn]
|
|
bootSeqFns = fmap muckNock nocks <> fmap muckOvum ovums
|
|
where
|
|
muckNock nok eId mug _ = RunNok $ LifeCyc eId mug nok
|
|
muckOvum ov eId mug wen = DoWork $ Work eId mug wen ov
|
|
|
|
bootMsg = "Booting " ++ (fakeStr (isFake ident)) ++
|
|
(Ob.renderPatp (Ob.patp (fromIntegral (who ident))))
|
|
fakeStr True = "fake "
|
|
fakeStr False = ""
|
|
|
|
{-
|
|
The ship is booted, but it is behind. shove events to the worker
|
|
until it is caught up.
|
|
-}
|
|
replayJobs :: HasLogFunc e
|
|
=> Serf e -> Int -> SerfState -> ConduitT Job Void (RIO e) SerfState
|
|
replayJobs serf lastEv = go Nothing
|
|
where
|
|
go pb ss = do
|
|
await >>= \case
|
|
Nothing -> pure ss
|
|
Just job -> do
|
|
pb <- lift $ updatePb ss pb
|
|
played <- lift $ replayJob serf job
|
|
go pb played
|
|
|
|
updatePb ss = do
|
|
let start = lastEv - (fromIntegral (ssNextEv ss))
|
|
let msg = pack ("Replaying events #" ++ (show (ssNextEv ss)) ++
|
|
" to #" ++ (show lastEv))
|
|
updateProgressBar start msg
|
|
|
|
|
|
replay :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e SerfState
|
|
replay serf log = do
|
|
ss <- handshake serf (Log.identity log)
|
|
|
|
lastEv <- Log.lastEv log
|
|
runConduit $ Log.streamEvents log (ssNextEv ss)
|
|
.| toJobs (Log.identity log) (ssNextEv ss)
|
|
.| replayJobs serf (fromIntegral lastEv) ss
|
|
|
|
toJobs :: HasLogFunc e
|
|
=> LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) ()
|
|
toJobs ident eId =
|
|
await >>= \case
|
|
Nothing -> lift $ logTrace "[toJobs] no more jobs"
|
|
Just at -> do yield =<< lift (fromAtom at)
|
|
lift $ logTrace $ display ("[toJobs] " <> tshow eId)
|
|
toJobs ident (eId+1)
|
|
where
|
|
isNock = eId <= fromIntegral (lifecycleLen ident)
|
|
|
|
fromAtom :: ByteString -> RIO e Job
|
|
fromAtom bs | isNock = do
|
|
noun <- cueBSExn bs
|
|
(mug, nok) <- fromNounExn noun
|
|
pure $ RunNok (LifeCyc eId mug nok)
|
|
fromAtom bs = do
|
|
noun <- cueBSExn bs
|
|
(mug, wen, ovm) <- fromNounExn noun
|
|
pure $ DoWork (Work eId mug wen ovm)
|
|
|
|
|
|
-- Collect Effects for Parsing -------------------------------------------------
|
|
|
|
collectFX :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e ()
|
|
collectFX serf log = do
|
|
ss <- handshake serf (Log.identity log)
|
|
|
|
runConduit $ Log.streamEvents log (ssNextEv ss)
|
|
.| toJobs (Log.identity log) (ssNextEv ss)
|
|
.| doCollectFX serf ss
|
|
.| persistFX log
|
|
|
|
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) ()
|
|
persistFX log = loop
|
|
where
|
|
loop = await >>= \case
|
|
Nothing -> pure ()
|
|
Just (eId, fx) -> do
|
|
lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
|
|
loop
|
|
|
|
doCollectFX :: ∀e. HasLogFunc e
|
|
=> Serf e -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
|
|
doCollectFX serf = go
|
|
where
|
|
go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
|
|
go ss = await >>= \case
|
|
Nothing -> pure ()
|
|
Just jb -> do
|
|
-- jb <- pure $ replaceMug jb (ssLastMug ss)
|
|
(_, ss, fx) <- lift $ doJob serf jb
|
|
when (0 == (jobId jb `mod` 10_000)) $ do
|
|
lift $ logTrace $ displayShow (jobId jb)
|
|
yield (jobId jb, fx)
|
|
go ss
|
|
|
|
_replaceMug :: Job -> Mug -> Job
|
|
_replaceMug jb mug =
|
|
case jb of
|
|
DoWork (Work eId _ w o) -> DoWork (Work eId mug w o)
|
|
RunNok (LifeCyc eId _ n) -> RunNok (LifeCyc eId mug n)
|