shrub/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs

516 lines
16 KiB
Haskell

{-|
Serf Interface
TODO: `recvLen` is not big-endian safe.
-}
module Urbit.Vere.Serf ( Serf, sStderr, SerfState(..), doJob
, run, shutdown, kill
, replay, bootFromSeq, snapshot
, collectFX
, Config(..), Flags, Flag(..)
) where
import Urbit.Prelude
import Data.Conduit
import System.Process
import System.ProgressBar
import Urbit.Arvo
import Urbit.Vere.Pier.Types
import Data.Bits (setBit)
import Data.ByteString (hGet)
import Data.ByteString.Unsafe (unsafeUseAsCString)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (castPtr)
import Foreign.Storable (peek, poke)
import System.Exit (ExitCode)
import qualified Data.ByteString.Unsafe as BS
import qualified Data.Conduit.Combinators as CC
import qualified Data.Text as T
import qualified System.IO as IO
import qualified System.IO.Error as IO
import qualified Urbit.Ob as Ob
import qualified Urbit.Time as Time
import qualified Urbit.Vere.Log as Log
-- Serf Config -----------------------------------------------------------------
{-|
    The list of runtime options handed to the serf process.
-}
type Flags = [Flag]

{-|
    A single serf option.

    Declaration order matters: `compileFlags` uses each constructor's
    `fromEnum` index as its bit position, so reordering the constructors
    changes the bitmask passed to the worker.
-}
data Flag
  = DebugRam
  | DebugCpu
  | CheckCorrupt
  | CheckFatal
  | Verbose
  | DryRun
  | Quiet
  | Hashless
  | Trace
  deriving (Eq, Ord, Show, Enum, Bounded)

{-|
    Pack a list of flags into a bitmask: bit `fromEnum flag` is set for
    every flag in the list. Duplicates are harmless and the empty list
    yields 0.
-}
compileFlags :: [Flag] -> Word
compileFlags = foldl' setBit 0 . fmap fromEnum

{-|
    Serf launch configuration: the pier path followed by the flags to
    start the worker with.
-}
data Config = Config FilePath [Flag]
  deriving (Show)
{-|
    Log a message at info level, prefixed with @SERF: @.
-}
serf :: HasLogFunc e => Text -> RIO e ()
serf = logInfo . display . ("SERF: " <>)
-- Types -----------------------------------------------------------------------
{-|
    Where the serf currently is in the event sequence.
-}
data SerfState = SerfState
  { ssNextEv  :: EventId -- ^ Id of the next event the serf expects.
  , ssLastMug :: Mug     -- ^ Mug reported by the serf in its latest
                         --   `PPlay` or `PDone` plea.
  }
  deriving (Eq, Ord, Show)

{-|
    Id of the most recently processed event, i.e. the predecessor of
    `ssNextEv`.
-}
ssLastEv :: SerfState -> EventId
ssLastEv ss = pred (ssNextEv ss)
{-|
    Handle on a running serf (worker) process.
-}
data Serf e = Serf
  { sendHandle :: Handle        -- ^ Pipe to the worker's stdin; orders are
                                --   written here.
  , recvHandle :: Handle        -- ^ Pipe from the worker's stdout; pleas
                                --   are read from here.
  , process :: ProcessHandle    -- ^ The worker process itself.
  , sStderr :: MVar (Text -> RIO e ())
                                -- ^ Current callback for text the serf
                                --   wants printed (stderr lines, `PStdr`
                                --   and `PSlog` pleas).
  }
-- | A ship paired with a boolean flag.
-- NOTE(review): the meaning of the flag is not visible in this module
-- (presumably "is fake") -- confirm against the worker protocol.
data ShipId = ShipId Ship Bool
  deriving (Eq, Ord, Show)
{-|
    A message received from the serf, as decoded by `recvPlea`.
-}
data Plea
  = PPlay EventId Mug        -- ^ Initial plea: the serf's next event id
                             --   and mug (consumed by `handshake`).
  | PWork Work               -- ^ The serf asks for an event to be replaced
                             --   by this one.
  | PDone EventId Mug FX     -- ^ An event completed, with the resulting
                             --   mug and its effects.
  | PStdr EventId Cord       -- ^ Text to be printed on the serf's behalf.
  | PSlog EventId Word32 Tank
                             -- ^ A tank to be pretty-printed, with a
                             --   priority (currently ignored when printing).
  deriving (Eq, Show)
-- | A job the serf sent back in place of the one it was asked to run.
type ReplacementEv = Job

-- | The state after a successfully computed event, plus its effects.
type WorkResult = (SerfState, FX)

-- | Outcome of sending one job: either a replacement to retry with, or
-- the result of the computation.
type SerfResp = Either ReplacementEv WorkResult
{-|
    Failures in the conversation with the serf.
-}
data SerfExn
  = BadComputeId EventId WorkResult
    -- ^ The state after a `PDone` does not point at the event following
    --   the one that was sent.
  | BadReplacementId EventId ReplacementEv
    -- ^ A replacement job carries a different id than the job it replaces.
  | UnexpectedPlay EventId (EventId, Mug)
    -- ^ A `PPlay` arrived while waiting for the result of a job.
  | BadPleaAtom Atom
    -- ^ The received atom could not be `cue`d into a noun.
  | BadPleaNoun Noun [Text] Text
    -- ^ The received noun could not be decoded as a `Plea`.
  | ReplacedEventDuringReplay EventId ReplacementEv
    -- ^ The serf tried to replace an event while replaying the log.
  | ReplacedEventDuringBoot EventId ReplacementEv
    -- ^ Not thrown anywhere in the visible part of this module.
  | EffectsDuringBoot EventId FX
    -- ^ Only referenced from commented-out code in `bootJob`.
  | SerfConnectionClosed
    -- ^ Writing to the serf failed, or the length header was cut short.
  | UnexpectedPleaOnNewShip Plea
    -- ^ Not thrown anywhere in the visible part of this module.
  | InvalidInitialPlea Plea
    -- ^ The first plea received during `handshake` was not a `PPlay`.
  deriving (Show)
-- Instances -------------------------------------------------------------------
-- `SerfExn` values are raised with `throwIO`, which needs this instance.
instance Exception SerfExn

-- Template Haskell splices; presumably these generate the noun
-- conversions that `recvPlea` relies on when it decodes a `Plea` with
-- `fromNounErr` -- confirm against the definition of `deriveNoun`.
deriveNoun ''ShipId
deriveNoun ''Plea
-- Utils -----------------------------------------------------------------------
{-|
    Render a tank with `wash` (using `WashCfg 0 80`) and hand the
    resulting text to the current print callback. The priority argument
    is ignored.
-}
printTank :: HasLogFunc e
          => MVar (Text -> RIO e ()) -> Word32 -> Tank
          -> RIO e ()
printTank log _pri tank =
  printErr log (unlines (unTape <$> wash (WashCfg 0 80) tank))
{-|
    Throw the given exception unless the condition holds.
-}
guardExn :: (Exception e, MonadIO m) => Bool -> e -> m ()
guardExn ok exn = io (unless ok (throwIO exn))
{-|
    Unwrap a `Right`, or turn the `Left` payload into an exception with
    the supplied function and throw it.
-}
fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b
fromRightExn res exn = either (throwIO . exn) pure res
{-|
    Read the current print callback out of the `MVar` (without taking
    it) and apply it to the text.
-}
printErr :: MVar (Text -> RIO e ()) -> Text -> RIO e ()
printErr m txt = readMVar m >>= \emit -> emit txt
-- Process Management ----------------------------------------------------------
{-|
    Start a serf as a managed resource: acquired with `startUp`,
    released with `tearDown`.
-}
run :: HasLogFunc e => Config -> RAcquire e (Serf e)
run = flip mkRAcquire tearDown . startUp
{-|
    Spawn the @urbit-worker@ process with all three standard streams
    piped, start a background thread that forwards its stderr, and
    return the resulting `Serf` handle.
-}
startUp :: HasLogFunc e => Config -> RIO e (Serf e)
startUp conf@(Config pierPath flags) = do
    logTrace "STARTING SERF"
    logTrace (displayShow conf)

    (toSerf, fromSerf, serfErr, procHandle) <- io $ do
      -- All three streams are requested as pipes below, so the handles
      -- are expected to be present.
      (Just i, Just o, Just e, p) <- createProcess workerSpec
      pure (i, o, e, p)

    -- The print callback starts out as the plain `serf` logger.
    printer <- newMVar serf
    _ <- async (readStdErr serfErr printer)
    pure (Serf toSerf fromSerf procHandle printer)
  where
    -- Arguments: pier path, disk key (empty), compiled flag bitmask.
    workerArgs = [pierPath, "", show (compileFlags flags)]

    workerSpec = (proc "urbit-worker" workerArgs)
      { std_in  = CreatePipe
      , std_out = CreatePipe
      , std_err = CreatePipe
      }
{-|
    Forward the serf's stderr, line by line, until the handle hits EOF.

    Each line is stripped, passed to the current print callback and also
    logged through `serf` with a @[stderr]@ prefix. EOF ends the loop
    with a debug message; any other `IOError` is re-thrown.

    The explicit `forall e.` brings `e` into scope for the local
    signatures below, which need the outer `HasLogFunc e` constraint.
-}
readStdErr :: forall e. HasLogFunc e
           => Handle -> MVar (Text -> RIO e ()) -> RIO e ()
readStdErr h print =
    untilEOFExn $ do
      raw <- io $ IO.hGetLine h
      let ln = T.strip (pack raw)
      printErr print ln
      serf ("[stderr] " <> ln)
  where
    eofMsg = "[Serf.readStdErr] serf stderr closed"

    -- Repeat the action until it fails with an EOF error.
    untilEOFExn :: RIO e () -> RIO e ()
    untilEOFExn act = loop
      where
        loop :: RIO e ()
        loop = do
          env <- ask
          -- `tryIOError` works in `IO`, so run the action there.
          res <- io $ IO.tryIOError $ runRIO env act
          case res of
            Left exn | IO.isEOFError exn -> logDebug eofMsg
            Left exn                     -> io (IO.ioError exn)
            Right ()                     -> loop
{-|
    Release action for `run`: terminate the serf process and block until
    it has exited. The exit code is discarded.
-}
tearDown :: HasLogFunc e => Serf e -> RIO e ()
tearDown serf = do
  io (terminateProcess (process serf))
  _ <- waitForExit serf
  pure ()
  -- Disabled alternative: request a clean shutdown and only terminate
  -- the process if it has not exited after a second.
  --
  --   race_ waitThenKill (shutdownAndWait serf 0)
  --
  --   killedMsg =
  --     "[Serf.tearDown]: Serf didn't die when asked, killing it"
  --   waitThenKill = do
  --     threadDelay 1000000
  --     debug killedMsg
  --     terminateProcess (process serf)
{-|
    Block until the serf process exits and return its exit code.
-}
waitForExit :: HasLogFunc e => Serf e -> RIO e ExitCode
waitForExit serf = io (waitForProcess (process serf))
{-|
    Terminate the serf process, wait for it to exit, and return its
    exit code.
-}
kill :: HasLogFunc e => Serf e -> RIO e ExitCode
kill serf = do
  io (terminateProcess (process serf))
  waitForExit serf
{-|
    Send the serf an exit order with the given code, then wait for the
    process to exit. Currently unused.
-}
_shutdownAndWait :: HasLogFunc e => Serf e -> Word8 -> RIO e ExitCode
_shutdownAndWait serf code = shutdown serf code >> waitForExit serf
-- Basic Send and Receive Operations -------------------------------------------
{-|
    Run a continuation on the 8-byte in-memory representation of a
    `Word64`.

    The bytes are whatever `poke` writes, i.e. the host's representation
    of the word; no byte-order conversion is done here.
-}
withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a
withWord64AsByteString w k = do
  env <- ask
  -- `alloca` lives in `IO`, so capture the environment and re-enter
  -- `RIO` inside the callback.
  io $ alloca $ \wp -> do
    poke wp w
    -- The ByteString aliases the temporary buffer instead of copying it,
    -- so it must not be used once `k` has returned.
    bs <- BS.unsafePackCStringLen (castPtr wp, 8)
    runRIO env (k bs)
{-|
    Write a length header to the serf: the given length as a `Word64`,
    sent as 8 raw bytes. The handle is not flushed here.
-}
sendLen :: HasLogFunc e => Serf e -> Int -> RIO e ()
sendLen s i = do
  -- Force the conversion up front and send that very value; previously
  -- the evaluated word was discarded and the conversion redone.
  w <- evaluate (fromIntegral i :: Word64)
  withWord64AsByteString w (hPut (sendHandle s))
{-|
    Convert an order to a noun, jam it, and send the bytes to the serf.
-}
sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e ()
sendOrder w = sendBytes w . jamBS . toNoun
  -- Debug logging, currently disabled:
  --   logDebug $ display ("(sendOrder) " <> tshow o)
  --   logDebug "(sendOrder) Done"
{-|
    Send a length-prefixed message to the serf and flush the pipe.

    Any `IOError` raised while writing is reported as
    `SerfConnectionClosed`.
-}
sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e ()
sendBytes s bs = handle onIOError transmit
  where
    out = sendHandle s

    transmit = do
      sendLen s (length bs)
      hPut out bs
      hFlush out

    onIOError :: IOError -> RIO e ()
    onIOError = const (throwIO SerfConnectionClosed)
{-|
    Read the 8-byte length header of the next message.

    The bytes are reinterpreted in place as a `Word64`, so the result
    depends on host byte order (see the module-level TODO). Anything
    shorter than 8 bytes is reported as `SerfConnectionClosed`.
-}
recvLen :: (MonadIO m, HasLogFunc e) => Serf e -> m Word64
recvLen w = io $ do
  hdr <- hGet (recvHandle w) 8
  if length hdr == 8
    then unsafeUseAsCString hdr (peek . castPtr)
    else throwIO SerfConnectionClosed
{-|
    Read exactly the requested number of bytes from the serf.

    `hGet` returns fewer bytes than asked for only when the stream ends,
    so a short read is reported as `SerfConnectionClosed`, matching what
    `recvLen` does for a truncated header. Previously the truncated
    payload was returned as if it were complete.
-}
recvBytes :: HasLogFunc e => Serf e -> Word64 -> RIO e ByteString
recvBytes serf len = do
  bs <- io $ hGet (recvHandle serf) (fromIntegral len)
  guardExn (fromIntegral (length bs) == len) SerfConnectionClosed
  pure bs
{-|
    Receive one length-prefixed message and turn its bytes into an atom.
-}
recvAtom :: HasLogFunc e => Serf e -> RIO e Atom
recvAtom w = fmap bytesAtom (recvLen w >>= recvBytes w)
{-|
    The text of a cord with surrounding whitespace stripped.
-}
cordText :: Cord -> Text
cordText cord = T.strip (unCord cord)
--------------------------------------------------------------------------------
{-|
    Order the serf to save, passing the last processed event id of the
    given state.
-}
snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e ()
snapshot serf = sendOrder serf . OSave . ssLastEv
{-|
    Order the serf to exit with the given code. Does not wait for the
    process to terminate.
-}
shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e ()
shutdown serf = sendOrder serf . OExit
{-|
    Receive the next plea that is not plain output.

    `PStdr` and `PSlog` pleas are printed through the serf's print
    callback and then skipped, so callers only ever see the remaining
    constructors.

    TODO Find a cleaner way to handle `PStdr` Pleas.
-}
recvPlea :: HasLogFunc e => Serf e -> RIO e Plea
recvPlea w = do
  logDebug "(recvPlea) Waiting"
  atom <- recvAtom w
  logDebug "(recvPlea) Got atom"

  noun <- fromRightExn (cue atom) (\_ -> BadPleaAtom atom)
  plea <- fromRightExn (fromNounErr noun)
                       (\(path, msg) -> BadPleaNoun noun path msg)

  case plea of
    PStdr _ msg   -> printErr (sStderr w) (cordText msg) >> recvPlea w
    PSlog _ pri t -> printTank (sStderr w) pri t >> recvPlea w
    _             -> logTrace "recvPlea got something else" >> pure plea
{-|
    Wait for the serf's initial `PPlay` plea and return the state it
    reports. If the serf is at event 1, also send the boot order
    carrying the lifecycle length from the identity.

    Any other first plea is rejected with `InvalidInitialPlea`.
-}
handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState
handshake serf ident = do
  ss <- recvPlea serf >>= \case
    PPlay e m -> pure (SerfState e m)
    other     -> throwIO (InvalidInitialPlea other)

  when (ssNextEv ss == 1) $
    sendOrder serf (OBoot (lifecycleLen ident))

  pure ss
{-|
    Send one job to the serf and wait for its verdict.

    Returns `Right` with the new state and effects when the serf
    finishes the event, or `Left` with a replacement job when the serf
    asks for the event to be swapped out.

    Throws `BadComputeId` or `BadReplacementId` when the serf answers
    for a different event than the one sent, and `UnexpectedPlay` if it
    sends a `PPlay`.

    The explicit `forall e.` brings `e` into scope for the local
    signatures in the `where` clause.
-}
sendWork :: forall e. HasLogFunc e => Serf e -> Job -> RIO e SerfResp
sendWork w job = do
    sendOrder w (OWork job)
    res <- loop
    logTrace ("[sendWork] Got response")
    pure res
  where
    eId = jobId job

    -- A finished event must leave the serf expecting the next id.
    produce :: WorkResult -> RIO e SerfResp
    produce (ss@SerfState{..}, o) = do
      guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o))
      pure $ Right (ss, o)

    -- A replacement must be for the event that was sent.
    replace :: ReplacementEv -> RIO e SerfResp
    replace job = do
      guardExn (jobId job == eId) (BadReplacementId eId job)
      pure (Left job)

    loop :: RIO e SerfResp
    loop = recvPlea w >>= \case
      PPlay e m     -> throwIO (UnexpectedPlay eId (e, m))
      PDone i m o   -> produce (SerfState (i+1) m, o)
      PWork work    -> replace (DoWork work)
      PStdr _ cord  -> printErr (sStderr w) (cordText cord) >> loop
      PSlog _ pri t -> printTank (sStderr w) pri t >> loop
--------------------------------------------------------------------------------
{-|
    Run a job to completion, retrying with the serf's replacement job
    for as long as it keeps sending one. Returns the job that finally
    ran, together with the resulting state and effects.
-}
doJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState, FX)
doJob serf job = sendWork serf job >>= either retry done
  where
    retry replaced = doJob serf replaced
    done (ss, fx)  = pure (job, ss, fx)
{-|
    Run a job as part of the boot sequence. Effects are dropped.
-}
bootJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState)
bootJob serf job = do
  (ran, ss, _fx) <- doJob serf job
  pure (ran, ss)
  -- Disabled check: reject effects produced while booting.
  --   (job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx)
{-|
    Run a job during replay and return the resulting state; effects are
    dropped. A replacement from the serf is not allowed here and raises
    `ReplacedEventDuringReplay`.
-}
replayJob :: HasLogFunc e => Serf e -> Job -> RIO e SerfState
replayJob serf job = sendWork serf job >>= either replaced (pure . fst)
  where
    replaced ev = throwIO (ReplacedEventDuringReplay (jobId job) ev)
--------------------------------------------------------------------------------
{-|
    Create the progress bar on first use, or advance an existing one by
    a single step.

    The bar is only built once the first event is processed, so that no
    empty bar is shown when the snapshot is already caught up with the
    log. On creation the start message is logged and the bar is given
    `count` as its total.
-}
updateProgressBar :: HasLogFunc e
                  => Int -> Text -> Maybe (ProgressBar ())
                  -> RIO e (Maybe (ProgressBar ()))
updateProgressBar count startMsg mbBar =
  case mbBar of
    Just bar -> do
      incProgress bar 1
      pure mbBar
    Nothing -> do
      logTrace $ display startMsg
      let style = defStyle { stylePostfix = exact }
      Just <$> newProgressBar style 10 (Progress 0 count ())
--------------------------------------------------------------------------------
-- | Builds one boot-sequence job from the event id, the previous mug and
-- the current time.
type BootSeqFn = EventId -> Mug -> Time.Wen -> Job

-- | Thrown by `bootFromSeq` when the serf does not report a pristine
-- state (next event 1, mug 0).
data BootExn = ShipAlreadyBooted
  deriving stock    (Eq, Ord, Show)
  deriving anyclass (Exception)
{-|
    Boot a fresh ship by feeding the serf the boot sequence: first the
    lifecycle nocks, then the initial ovums.

    Returns the jobs that were actually run, in order, together with the
    final serf state. Throws `ShipAlreadyBooted` unless the handshake
    reports next event 1 with mug 0.

    The explicit `forall e.` brings `e` into scope for the signature of
    `loop`.
-}
bootFromSeq :: forall e. HasLogFunc e
            => Serf e -> BootSeq -> RIO e ([Job], SerfState)
bootFromSeq serf (BootSeq ident nocks ovums) = do
    handshake serf ident >>= \case
      ss@(SerfState 1 (Mug 0)) -> loop [] ss Nothing bootSeqFns
      _                        -> throwIO ShipAlreadyBooted
  where
    loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn]
         -> RIO e ([Job], SerfState)
    loop acc ss pb = \case
      [] -> do
        -- Final tick for the last event; the returned bar is not needed.
        _ <- updateProgressBar 0 bootMsg pb
        pure (reverse acc, ss)
      x:xs -> do
        wen <- io Time.now
        let job = x (ssNextEv ss) (ssLastMug ss) wen
        pb' <- updateProgressBar (1 + length xs) bootMsg pb
        -- The serf may have replaced the job; record the one that ran.
        (ran, ss') <- bootJob serf job
        loop (ran:acc) ss' pb' xs

    bootSeqFns :: [BootSeqFn]
    bootSeqFns = fmap muckNock nocks <> fmap muckOvum ovums
      where
        muckNock nok eId mug _   = RunNok $ LifeCyc eId mug nok
        muckOvum ov  eId mug wen = DoWork $ Work eId mug wen ov

    bootMsg = "Booting " ++ (fakeStr (isFake ident)) ++
              (Ob.renderPatp (Ob.patp (fromIntegral (who ident))))

    fakeStr True  = "fake "
    fakeStr False = ""
{-|
    The ship is booted, but it is behind. Shove events to the worker
    until it is caught up.

    Consumes jobs from upstream, replays each one on the serf, and
    returns the state after the last job (or the initial state if there
    were none). `lastEv` is only used for the progress display.
-}
replayJobs :: HasLogFunc e
           => Serf e -> Int -> SerfState -> ConduitT Job Void (RIO e) SerfState
replayJobs serf lastEv = go Nothing
  where
    go bar ss = await >>= \case
      Nothing  -> pure ss
      Just job -> do
        bar' <- lift (updatePb ss bar)
        ss'  <- lift (replayJob serf job)
        go bar' ss'

    updatePb ss = updateProgressBar remaining banner
      where
        remaining = lastEv - (fromIntegral (ssNextEv ss))
        banner    = pack ("Replaying events #" ++ (show (ssNextEv ss)) ++
                          " to #" ++ (show lastEv))
{-|
    Bring the serf up to date by replaying events from the log.

    Replays from the serf's next event through the requested last
    event, or through the log's last event when none is given, and
    returns the resulting state.

    The number of events to take is `lastEv - nextEv + 1`, clamped at
    zero when the serf is already at or past the target. Previously the
    no-target case used the next event id itself as the count, which
    truncated the replay, and the explicit-target case could underflow
    when the target lay behind the serf.

    NOTE(review): this assumes `Log.lastEv` yields the id of the final
    event in the log, as the progress message in `replayJobs` already
    does -- confirm against `Urbit.Vere.Log`.
-}
replay :: HasLogFunc e
       => Serf e -> Log.EventLog -> Maybe Word64 -> RIO e SerfState
replay serf log last = do
  ss <- handshake serf (Log.identity log)

  lastEv <- maybe (Log.lastEv log) pure last

  let nextEv = ssNextEv ss
      numEvs | lastEv < nextEv = 0
             | otherwise       = lastEv - nextEv + 1

  runConduit $ Log.streamEvents log nextEv
            .| CC.take (fromIntegral numEvs)
            .| toJobs (Log.identity log) nextEv
            .| replayJobs serf (fromIntegral lastEv) ss
{-|
    Decode a stream of serialized events into jobs, numbering them
    consecutively from the given event id.

    Events whose id is within the identity's lifecycle length are
    decoded as `RunNok` jobs, all later ones as `DoWork` jobs.
-}
toJobs :: HasLogFunc e
       => LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) ()
toJobs ident eId =
    await >>= \case
      Nothing -> lift $ logTrace "[toJobs] no more jobs"
      Just at -> do
        job <- lift (fromAtom at)
        yield job
        lift $ logTrace $ display ("[toJobs] " <> tshow eId)
        toJobs ident (eId+1)
  where
    isNock = eId <= fromIntegral (lifecycleLen ident)

    fromAtom :: ByteString -> RIO e Job
    fromAtom bs = do
      noun <- cueBSExn bs
      if isNock
        then do
          (mug, nok) <- fromNounExn noun
          pure $ RunNok (LifeCyc eId mug nok)
        else do
          (mug, wen, ovm) <- fromNounExn noun
          pure $ DoWork (Work eId mug wen ovm)
-- Collect Effects for Parsing -------------------------------------------------
{-|
    Run every event in the log from the serf's next event onwards and
    write the effects of each one back to the log.
-}
collectFX :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e ()
collectFX serf log = do
    ss <- handshake serf ident
    let firstEv = ssNextEv ss
    runConduit $ Log.streamEvents log firstEv
              .| toJobs ident firstEv
              .| doCollectFX serf ss
              .| persistFX log
  where
    ident = Log.identity log
{-|
    Sink that writes each event's effects, jammed, into the log's
    effects table under the event id.
-}
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) ()
persistFX log = awaitForever $ \(eId, fx) ->
  lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
{-|
    Run each incoming job on the serf and emit the job's id together
    with the effects it produced. The id of every 10,000th job is
    logged as a progress marker.

    The explicit `forall e.` brings `e` into scope for the signature of
    `go`.
-}
doCollectFX :: forall e. HasLogFunc e
            => Serf e -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
doCollectFX serf = go
  where
    go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
    go ss = await >>= \case
      Nothing -> pure ()
      Just jb -> do
        -- jb <- pure $ replaceMug jb (ssLastMug ss)
        (_, ss', fx) <- lift $ doJob serf jb
        when (0 == (jobId jb `mod` 10_000)) $ do
          lift $ logTrace $ displayShow (jobId jb)
        yield (jobId jb, fx)
        go ss'
{-|
    Replace the mug stored inside a job, leaving everything else as it
    is. Currently unused.
-}
_replaceMug :: Job -> Mug -> Job
_replaceMug (DoWork (Work eId _ w o))  mug = DoWork (Work eId mug w o)
_replaceMug (RunNok (LifeCyc eId _ n)) mug = RunNok (LifeCyc eId mug n)