mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-09-20 15:08:34 +03:00
king: Better handling of edge-cases around IPC failure.
This commit is contained in:
parent
8e78266d74
commit
ca13d3f79b
@ -2,7 +2,8 @@ Stubbed out:
|
||||
|
||||
- [x] Handle replacement events (stubbed out now b/c interface can't
|
||||
handle unparsed nouns)
|
||||
- [ ] Handle IPC errors by killing serf process.
|
||||
- [x] Handle IPC errors by killing serf process.
|
||||
- [ ] PlayBail should be an exception.
|
||||
- [ ] Write haddock docs for `Urbit.Vere.Serf.IPC`.
|
||||
- [ ] Unstub slog/stder/dead callbacks on serf config.
|
||||
- [ ] GoodParse hack in newRunCompute.
|
||||
|
@ -150,7 +150,7 @@ data Serf = Serf
|
||||
, serfRecv :: Handle
|
||||
, serfProc :: ProcessHandle
|
||||
, serfSlog :: Slog -> IO ()
|
||||
, serfLock :: MVar SerfState
|
||||
, serfLock :: MVar (Either SomeException SerfState)
|
||||
}
|
||||
|
||||
data Flag
|
||||
@ -208,7 +208,9 @@ data SerfExn
|
||||
-- Access Current Serf State ---------------------------------------------------
|
||||
|
||||
serfLastEventBlocking :: Serf -> IO EventId
|
||||
serfLastEventBlocking Serf{serfLock} = ssLast <$> readMVar serfLock
|
||||
serfLastEventBlocking Serf{serfLock} = readMVar serfLock >>= \case
|
||||
Left err -> throwIO err
|
||||
Right ss -> pure (ssLast ss)
|
||||
|
||||
|
||||
-- Low Level IPC Functions -----------------------------------------------------
|
||||
@ -347,7 +349,7 @@ start (Config exePax pierPath flags onSlog onStdr onDead) = do
|
||||
vLock <- newEmptyMVar
|
||||
let serf = Serf i o p onSlog vLock
|
||||
info <- recvRipe serf
|
||||
putMVar vLock (siStat info)
|
||||
putMVar vLock (Right $ siStat info)
|
||||
pure (serf, info)
|
||||
where
|
||||
diskKey = ""
|
||||
@ -358,13 +360,33 @@ start (Config exePax pierPath flags onSlog onStdr onDead) = do
|
||||
, std_err = CreatePipe
|
||||
}
|
||||
|
||||
withSerfLock
|
||||
:: MonadIO m
|
||||
=> (m (SerfState, a) -> m (Either SomeException (SerfState, a)))
|
||||
-> Serf
|
||||
-> (SerfState -> m (SerfState, a))
|
||||
-> m a
|
||||
withSerfLock tryGen s f = do
|
||||
ss <- takeLock
|
||||
tryGen (f ss) >>= \case
|
||||
Left e -> do
|
||||
io (forceKillSerf s)
|
||||
putMVar (serfLock s) (Left e)
|
||||
throwIO e
|
||||
Right (ss', x) -> do
|
||||
putMVar (serfLock s) (Right ss')
|
||||
pure x
|
||||
where
|
||||
takeLock = do
|
||||
takeMVar (serfLock s) >>= \case
|
||||
Left exn -> putMVar (serfLock s) (Left exn) >> throwIO exn
|
||||
Right ss -> pure ss
|
||||
|
||||
snapshot :: HasLogFunc e => Serf -> RIO e ()
|
||||
snapshot serf = do
|
||||
logTrace "execSnapshot: taking lock"
|
||||
serfState <- takeMVar (serfLock serf)
|
||||
io (sendSnapshotRequest serf (ssLast serfState))
|
||||
logTrace "execSnapshot: releasing lock"
|
||||
putMVar (serfLock serf) serfState
|
||||
snapshot serf =
|
||||
withSerfLock try serf \ss -> do
|
||||
io (sendSnapshotRequest serf (ssLast ss))
|
||||
pure (ss, ())
|
||||
|
||||
shutdown :: HasLogFunc e => Serf -> RIO e ()
|
||||
shutdown serf = do
|
||||
@ -380,39 +402,37 @@ shutdown serf = do
|
||||
wait2sec = threadDelay 2_000_000
|
||||
forceKill = do
|
||||
logTrace "Serf taking too long to go down, kill with fire (SIGTERM)."
|
||||
io (getPid $ serfProc serf) >>= \case
|
||||
Nothing -> do
|
||||
logTrace "Serf process already dead."
|
||||
Just pid -> do
|
||||
io $ signalProcess sigKILL pid
|
||||
io $ waitForProcess (serfProc serf)
|
||||
logTrace "Finished killing serf process with fire."
|
||||
io (forceKillSerf serf)
|
||||
logTrace "Serf process killed with SIGTERM."
|
||||
|
||||
forceKillSerf :: Serf -> IO ()
|
||||
forceKillSerf serf = do
|
||||
getPid (serfProc serf) >>= \case
|
||||
Nothing -> pure ()
|
||||
Just pid -> do
|
||||
io $ signalProcess sigKILL pid
|
||||
io $ void $ waitForProcess (serfProc serf)
|
||||
|
||||
bootSeq :: Serf -> [Noun] -> IO (Maybe PlayBail) -- TODO should this be an exception?
|
||||
bootSeq serf@Serf{..} seq = do
|
||||
oldInfo <- takeMVar serfLock
|
||||
sendWrit serf (WPlay 1 seq)
|
||||
(res, newInfo) <- recvPlay serf >>= \case
|
||||
PBail bail -> pure (Just bail, oldInfo)
|
||||
PDone newMug -> pure (Nothing, SerfState (fromIntegral $ length seq) newMug)
|
||||
putMVar serfLock newInfo
|
||||
pure res
|
||||
withSerfLock try serf \ss -> do
|
||||
recvPlay serf >>= \case
|
||||
PBail bail -> pure (ss, Just bail)
|
||||
PDone newMug -> pure (SerfState (fromIntegral $ length seq) newMug, Nothing)
|
||||
|
||||
{-
|
||||
If this throws an exception, the serf will be in an unusable state. Kill
|
||||
the process.
|
||||
|
||||
TODO *we* should probably kill the serf on exception?
|
||||
TODO Take advantage of IPC support for batching.
|
||||
TODO Maybe take snapshots
|
||||
-}
|
||||
replay :: forall m . MonadIO m => Serf -> ConduitT Noun Void m (Maybe PlayBail)
|
||||
replay
|
||||
:: forall m
|
||||
. (MonadUnliftIO m, MonadIO m)
|
||||
=> Serf
|
||||
-> ConduitT Noun Void m (Maybe PlayBail)
|
||||
replay serf = do
|
||||
initState <- takeMVar (serfLock serf)
|
||||
(mErr, newState) <- loop initState
|
||||
putMVar (serfLock serf) newState
|
||||
pure mErr
|
||||
withSerfLock tryC serf \ss -> do
|
||||
(r, ss') <- loop ss
|
||||
pure (ss', r)
|
||||
where
|
||||
loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState)
|
||||
loop (SerfState lastEve lastMug) = await >>= \case
|
||||
@ -425,24 +445,20 @@ replay serf = do
|
||||
PDone newMug -> loop (SerfState newEve newMug)
|
||||
|
||||
{-
|
||||
If this throws an exception, the serf will be in an unusable state. Kill
|
||||
the process.
|
||||
|
||||
TODO *we* should probably kill the serf on exception?
|
||||
TODO callbacks on snapshot and compaction?
|
||||
TODO Take advantage of async IPC to fill pipe with more than one thing.
|
||||
-}
|
||||
running
|
||||
:: forall m
|
||||
. MonadIO m
|
||||
. (MonadIO m, MonadUnliftIO m)
|
||||
=> Serf
|
||||
-> (Maybe RunInput -> IO ())
|
||||
-> ConduitT RunInput RunOutput m ()
|
||||
running serf notice = do
|
||||
SerfState {..} <- takeMVar (serfLock serf)
|
||||
newState <- loop ssHash ssLast
|
||||
putMVar (serfLock serf) newState
|
||||
pure ()
|
||||
withSerfLock tryC serf $ \SerfState{..} -> do
|
||||
newState <- loop ssHash ssLast
|
||||
pure (newState, ())
|
||||
where
|
||||
loop :: Mug -> EventId -> ConduitT RunInput RunOutput m SerfState
|
||||
loop mug eve = do
|
||||
|
Loading…
Reference in New Issue
Block a user