urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs

478 lines
13 KiB
Haskell
Raw Normal View History

2020-05-21 00:20:01 +03:00
{-
|%
:: +writ: from king to serf
::
2020-05-28 01:57:34 +03:00
+$ gang (unit (set ship))
2020-05-21 00:20:01 +03:00
+$ writ
$% $: %live
$% [%exit cod=@]
[%save eve=@]
[%pack eve=@]
== ==
[%peek now=date lyc=gang pat=path]
[%play eve=@ lit=(list ?((pair date ovum) *))]
[%work job=(pair date ovum)]
==
:: +plea: from serf to king
::
+$ plea
$% [%live ~]
[%ripe [pro=@ hon=@ nok=@] eve=@ mug=@]
[%slog pri=@ ?(cord tank)]
[%peek dat=(unit (cask))]
$: %play
$% [%done mug=@]
[%bail eve=@ mug=@ dud=goof]
== ==
$: %work
$% [%done eve=@ mug=@ fec=(list ovum)]
[%swap eve=@ mug=@ job=(pair date ovum) fec=(list ovum)]
[%bail lud=(list goof)]
== ==
==
-}
2020-05-28 01:57:34 +03:00
module Urbit.Vere.Serf.IPC
( Serf
, Config(..)
, PlayBail(..)
, Flag(..)
, RunError(..)
, RunInput(..)
, RunOutput(..)
, start
, serfLastEventBlocking
, shutdown
, snapshot
, bootSeq
, replay
, running
)
where
import Urbit.Prelude hiding ((<|))
2020-05-28 01:57:34 +03:00
import Data.Bits
import Data.Conduit
2020-05-28 01:57:34 +03:00
import System.Process
import Urbit.Arvo
import Urbit.Vere.Pier.Types hiding (Work)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (castPtr)
import Foreign.Storable (peek, poke)
import RIO.Prelude (decodeUtf8Lenient)
2020-05-28 01:57:34 +03:00
import System.Posix.Signals (sigKILL, signalProcess)
import Urbit.Time (Wen)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Unsafe as BS
import qualified System.IO.Error as IO
import qualified Urbit.Time as Time
-- IPC Types -------------------------------------------------------------------
type Gang = Maybe (HoonSet Ship)
type Goof = (Term, [Tank])
data Live
2020-05-28 01:57:34 +03:00
= LExit Atom -- exit status code
| LSave EventId
| LPack EventId
deriving (Show)
type PlayBail = (EventId, Mug, Goof)
data Play
= PDone Mug
| PBail PlayBail
deriving (Show)
data Work
= WDone EventId Mug [Ef]
| WSwap EventId Mug (Wen, Noun) [Ef]
| WBail [Goof]
deriving (Show)
data Writ
= WLive Live
| WPeek Wen Gang Path
| WPlay EventId [Noun]
| WWork Wen Ev
deriving (Show)
data RipeInfo = RipeInfo
{ riProt :: Atom
, riHoon :: Atom
, riNock :: Atom
}
deriving (Show)
data SerfState = SerfState
{ ssLast :: EventId
, ssHash :: Mug
}
deriving (Show)
data SerfInfo = SerfInfo
{ siRipe :: RipeInfo
, siStat :: SerfState
}
deriving (Show)
type Slog = (Atom, Tank)
data Plea
= PLive ()
| PRipe SerfInfo
| PSlog Slog
| PPeek (Maybe (Term, Noun))
| PPlay Play
| PWork Work
deriving (Show)
deriveNoun ''Live
deriveNoun ''Play
deriveNoun ''Work
deriveNoun ''Writ
deriveNoun ''RipeInfo
deriveNoun ''SerfState
deriveNoun ''SerfInfo
deriveNoun ''Plea
2020-05-28 01:57:34 +03:00
-- Serf API Types --------------------------------------------------------------
data Serf = Serf
{ serfSend :: Handle
, serfRecv :: Handle
, serfProc :: ProcessHandle
, serfSlog :: Slog -> IO ()
, serfLock :: MVar SerfState
2020-05-21 00:20:01 +03:00
}
data Flag
= DebugRam
| DebugCpu
| CheckCorrupt
| CheckFatal
| Verbose
| DryRun
| Quiet
| Hashless
| Trace
deriving (Eq, Ord, Show, Enum, Bounded)
2020-05-28 01:57:34 +03:00
data Config = Config
{ scSerf :: FilePath -- Where is the urbit-worker executable?
, scPier :: FilePath -- Where is the pier directory?
, scFlag :: [Flag] -- Serf execution flags.
, scSlog :: Slog -> IO () -- What to do with slogs?
, scStdr :: Text -> IO () -- What to do with lines from stderr?
, scDead :: IO () -- What to do when the serf process goes down?
}
2020-05-21 00:20:01 +03:00
data RunError
= RunBail [Goof]
| RunSwap EventId Mug Wen Noun [Ef]
2020-05-21 00:20:01 +03:00
data RunInput
= RunSnap
| RunPack
| RunPeek Wen Gang Path (Maybe (Term, Noun) -> IO ())
| RunWork Ev (RunError -> IO ())
2020-05-21 00:20:01 +03:00
data RunOutput = RunOutput EventId Mug Wen Noun [Ef]
-- Exceptions ------------------------------------------------------------------
data SerfExn
-- = BadComputeId EventId WorkResult
-- | BadReplacementId EventId ReplacementEv
-- | UnexpectedPlay EventId (EventId, Mug)
= UnexpectedPlea Plea Text
| BadPleaAtom Atom
| BadPleaNoun Noun [Text] Text
-- | ReplacedEventDuringReplay EventId ReplacementEv
-- | ReplacedEventDuringBoot EventId ReplacementEv
-- | EffectsDuringBoot EventId FX
| SerfConnectionClosed
-- | UnexpectedPleaOnNewShip Plea
-- | InvalidInitialPlea Plea
deriving (Show, Exception)
-- Access Current Serf State ---------------------------------------------------
2020-05-28 01:57:34 +03:00
serfLastEventBlocking :: Serf -> IO EventId
serfLastEventBlocking Serf{serfLock} = ssLast <$> readMVar serfLock
-- Low Level IPC Functions -----------------------------------------------------
fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b
fromRightExn (Left m) exn = throwIO (exn m)
fromRightExn (Right x) _ = pure x
2020-05-28 01:57:34 +03:00
-- TODO Support Big Endian
sendLen :: Serf -> Int -> IO ()
sendLen s i = do
w <- evaluate (fromIntegral i :: Word64)
2020-05-28 01:57:34 +03:00
withWord64AsByteString w (hPut (serfSend s))
where
withWord64AsByteString :: Word64 -> (ByteString -> IO a) -> IO a
withWord64AsByteString w k = alloca $ \wp -> do
poke wp w
bs <- BS.unsafePackCStringLen (castPtr wp, 8)
k bs
sendBytes :: Serf -> ByteString -> IO ()
sendBytes s bs = handle onIOError $ do
sendLen s (length bs)
hPut (serfSend s) bs
hFlush (serfSend s)
where
onIOError :: IOError -> IO ()
2020-05-28 01:57:34 +03:00
onIOError = const (throwIO SerfConnectionClosed) -- TODO call death callback?
recvBytes :: Serf -> Word64 -> IO ByteString
2020-05-28 01:57:34 +03:00
recvBytes serf = BS.hGet (serfRecv serf) . fromIntegral
recvLen :: Serf -> IO Word64
recvLen w = do
bs <- BS.hGet (serfRecv w) 8
case length bs of
2020-05-28 01:57:34 +03:00
8 -> BS.unsafeUseAsCString bs (peek @Word64 . castPtr)
_ -> throwIO SerfConnectionClosed -- TODO kill worker process and call the death callback.
2020-05-28 01:57:34 +03:00
recvResp :: Serf -> IO ByteString
recvResp serf = do
len <- recvLen serf
recvBytes serf len
-- Send Writ / Recv Plea -------------------------------------------------------
sendWrit :: Serf -> Writ -> IO ()
2020-05-28 01:57:34 +03:00
sendWrit s = sendBytes s . jamBS . toNoun
recvPlea :: Serf -> IO Plea
recvPlea w = do
2020-05-28 01:57:34 +03:00
b <- recvResp w
n <- fromRightExn (cueBS b) (const $ BadPleaAtom $ bytesAtom b)
p <- fromRightExn (fromNounErr @Plea n) (\(p, m) -> BadPleaNoun n p m)
pure p
recvPleaHandlingSlog :: Serf -> IO Plea
recvPleaHandlingSlog serf = loop
where
loop = recvPlea serf >>= \case
PSlog info -> serfSlog serf info >> loop
other -> pure other
-- Higher-Level IPC Functions --------------------------------------------------
recvRipe :: Serf -> IO SerfInfo
recvRipe serf = recvPleaHandlingSlog serf >>= \case
PRipe ripe -> pure ripe
plea -> throwIO (UnexpectedPlea plea "expecting %play")
recvPlay :: Serf -> IO Play
recvPlay serf = recvPleaHandlingSlog serf >>= \case
PPlay play -> pure play
plea -> throwIO (UnexpectedPlea plea "expecting %play")
recvLive :: Serf -> IO ()
recvLive serf = recvPleaHandlingSlog serf >>= \case
PLive () -> pure ()
plea -> throwIO (UnexpectedPlea plea "expecting %live")
recvWork :: Serf -> IO Work
recvWork serf = do
recvPleaHandlingSlog serf >>= \case
PWork work -> pure work
plea -> throwIO (UnexpectedPlea plea "expecting %work")
recvPeek :: Serf -> IO (Maybe (Term, Noun))
recvPeek serf = do
recvPleaHandlingSlog serf >>= \case
PPeek peek -> pure peek
plea -> throwIO (UnexpectedPlea plea "expecting %peek")
2020-05-28 01:57:34 +03:00
-- Request-Response Points -- These don't touch the lock -----------------------
sendSnapshotRequest :: Serf -> EventId -> IO ()
sendSnapshotRequest serf eve = do
sendWrit serf (WLive $ LSave eve)
recvLive serf
2020-05-28 01:57:34 +03:00
sendCompactRequest :: Serf -> EventId -> IO ()
sendCompactRequest serf eve = do
sendWrit serf (WLive $ LPack eve)
recvLive serf
2020-05-28 01:57:34 +03:00
sendScryRequest :: Serf -> Wen -> Gang -> Path -> IO (Maybe (Term, Noun))
sendScryRequest serf w g p = do
sendWrit serf (WPeek w g p)
recvPeek serf
2020-05-28 01:57:34 +03:00
sendShutdownRequest :: Serf -> Atom -> IO ()
sendShutdownRequest serf exitCode = do
sendWrit serf (WLive $ LExit exitCode)
pure ()
-- Serf Usage Flows ------------------------------------------------------------
compileFlags :: [Flag] -> Word
compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0
readStdErr :: Handle -> (Text -> IO ()) -> IO () -> IO ()
readStdErr h onLine onClose = loop
where
loop = do
IO.tryIOError (BS.hGetLine h >>= onLine . decodeUtf8Lenient) >>= \case
Left exn -> onClose
Right () -> loop
start :: Config -> IO (Serf, SerfInfo)
start (Config exePax pierPath flags onSlog onStdr onDead) = do
(Just i, Just o, Just e, p) <- createProcess pSpec
2020-05-28 01:57:34 +03:00
void $ async (readStdErr e onStdr onDead)
vLock <- newEmptyMVar
let serf = Serf i o p onSlog vLock
2020-05-28 01:57:34 +03:00
info <- recvRipe serf
putMVar vLock (siStat info)
pure (serf, info)
where
diskKey = ""
config = show (compileFlags flags)
args = [pierPath, diskKey, config]
pSpec = (proc exePax args) { std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
2020-05-28 01:57:34 +03:00
snapshot :: HasLogFunc e => Serf -> RIO e ()
snapshot serf = do
logTrace "execSnapshot: taking lock"
serfState <- takeMVar (serfLock serf)
io (sendSnapshotRequest serf (ssLast serfState))
logTrace "execSnapshot: releasing lock"
putMVar (serfLock serf) serfState
shutdown :: HasLogFunc e => Serf -> RIO e ()
shutdown serf = do
race_ (wait2sec >> forceKill) $ do
logTrace "Getting current serf state (taking lock, might block if in use)."
finalState <- takeMVar (serfLock serf)
logTrace "Got serf state (and took lock). Requesting shutdown."
io (sendShutdownRequest serf 0)
logTrace "Sent shutdown request. Waiting for process to die."
io $ waitForProcess (serfProc serf)
logTrace "RIP Serf process."
where
wait2sec = threadDelay 2_000_000
forceKill = do
logTrace "Serf taking too long to go down, kill with fire (SIGTERM)."
io (getPid $ serfProc serf) >>= \case
Nothing -> do
logTrace "Serf process already dead."
Just pid -> do
io $ signalProcess sigKILL pid
io $ waitForProcess (serfProc serf)
logTrace "Finished killing serf process with fire."
bootSeq :: Serf -> [Noun] -> IO (Maybe PlayBail) -- TODO should this be an exception?
bootSeq serf@Serf{..} seq = do
oldInfo <- takeMVar serfLock
sendWrit serf (WPlay 1 seq)
(res, newInfo) <- recvPlay serf >>= \case
PBail bail -> pure (Just bail, oldInfo)
PDone newMug -> pure (Nothing, SerfState (fromIntegral $ length seq) newMug)
putMVar serfLock newInfo
pure res
{-
If this throws an exception, the serf will be in an unusable state. Kill
the process.
2020-05-28 01:57:34 +03:00
TODO *we* should probably kill the serf on exception?
TODO Take advantage of IPC support for batching.
TODO Maybe take snapshots
-}
2020-05-28 01:57:34 +03:00
replay :: forall m . MonadIO m => Serf -> ConduitT Noun Void m (Maybe PlayBail)
replay serf = do
2020-05-28 01:57:34 +03:00
initState <- takeMVar (serfLock serf)
(mErr, newState) <- loop initState
putMVar (serfLock serf) newState
pure mErr
where
loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState)
2020-05-28 01:57:34 +03:00
loop (SerfState lastEve lastMug) = await >>= \case
Nothing -> pure (Nothing, SerfState lastEve lastMug)
Just ev -> do
let newEve = lastEve + 1
io $ sendWrit serf (WPlay newEve [ev])
2020-05-28 01:57:34 +03:00
io (recvPlay serf) >>= \case
PBail bail -> pure (Just bail, SerfState lastEve lastMug)
PDone newMug -> loop (SerfState newEve newMug)
{-
If this throws an exception, the serf will be in an unusable state. Kill
the process.
2020-05-28 01:57:34 +03:00
TODO *we* should probably kill the serf on exception?
TODO callbacks on snapshot and compaction?
TODO Take advantage of async IPC to fill pipe with more than one thing.
-}
running
:: forall m
. MonadIO m
=> Serf
-> (Maybe RunInput -> IO ())
-> ConduitT RunInput RunOutput m ()
running serf notice = do
SerfState {..} <- takeMVar (serfLock serf)
newState <- loop ssHash ssLast
putMVar (serfLock serf) newState
pure ()
where
loop :: Mug -> EventId -> ConduitT RunInput RunOutput m SerfState
loop mug eve = do
io (notice Nothing)
nex <- await
io (notice nex)
nex & \case
Nothing -> do
pure $ SerfState eve mug
Just RunSnap -> do
io (sendSnapshotRequest serf eve)
loop mug eve
Just RunPack -> do
2020-05-28 01:57:34 +03:00
io (sendCompactRequest serf eve)
loop mug eve
Just (RunPeek wen gang pax act) -> do
2020-05-28 01:57:34 +03:00
io (sendScryRequest serf wen gang pax >>= act)
loop mug eve
Just (RunWork evn err) -> do
wen <- io Time.now
io (sendWrit serf (WWork wen evn))
io (recvWork serf) >>= \case
WDone eid hash fx -> do
yield (RunOutput eid hash wen (toNoun evn) fx)
loop hash eid
WSwap eid hash (wen, noun) fx -> do
io $ err (RunSwap eid hash wen noun fx)
yield (RunOutput eid hash wen noun fx)
loop hash eid
WBail goofs -> do
io $ err (RunBail goofs)
loop mug eve