Make the compute thread take a request instead of an event.
Elliot Glaysher 2019-09-10 14:04:03 -07:00 committed by GitHub
commit 9b8ce8d9de
3 changed files with 73 additions and 41 deletions

@ -140,29 +140,35 @@ pier pierPath mPort (serf, log, ss) = do
persistQ <- newTQueueIO :: RAcquire e (TQueue (Job, FX))
executeQ <- newTQueueIO :: RAcquire e (TQueue FX)
saveM <- newEmptyTMVarIO :: RAcquire e (TMVar ())
shutdownM <- newEmptyTMVarIO :: RAcquire e (TMVar ())
let shutdownEvent = putTMVar shutdownM ()
inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16)
terminalSystem <- initializeLocalTerminal
serf <- pure serf { sStderr = (tsStderr terminalSystem) }
let ship = who (Log.identity log)
let (bootEvents, startDrivers) =
drivers pierPath inst ship mPort (writeTQueue computeQ) terminalSystem
drivers pierPath inst ship mPort (writeTQueue computeQ)
shutdownEvent terminalSystem
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
tExe <- startDrivers >>= router (readTQueue executeQ)
tDisk <- runPersist log persistQ (writeTQueue executeQ)
tCpu <- runCompute serf ss (readTQueue computeQ) (writeTQueue persistQ)
tCpu <- runCompute serf ss (readTQueue computeQ) (takeTMVar saveM)
(takeTMVar shutdownM) (writeTQueue persistQ)
tSaveSignal <- saveSignalThread saveM
-- Wait for something to die.
let ded = asum [ death "effect thread" tExe
, death "persist thread" tDisk
, death "compute thread" tCpu
, death "terminal thread" (tsReaderThread terminalSystem)
atomically ded >>= \case
@ -175,6 +181,13 @@ death tag tid = do
Left exn -> Left (tag, exn)
Right () -> Right tag
saveSignalThread :: TMVar () -> RAcquire e (Async ())
saveSignalThread tm = mkRAcquire start cancel
start = async $ forever $ do
threadDelay (120 * 1000000) -- 120 seconds
atomically $ putTMVar tm ()
-- Start All Drivers -----------------------------------------------------------
data Drivers e = Drivers
@ -188,17 +201,17 @@ data Drivers e = Drivers
drivers :: HasLogFunc e
=> FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ())
=> FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ()) -> STM()
-> TerminalSystem e
-> ([Ev], RAcquire e (Drivers e))
drivers pierPath inst who mPort plan termSys =
drivers pierPath inst who mPort plan shutdownSTM termSys =
(initialEvents, runDrivers)
(behnBorn, runBehn) = behn inst plan
(amesBorn, runAmes) = ames inst who mPort plan
(httpBorn, runHttp) = serv pierPath inst plan
(irisBorn, runIris) = client inst plan
(termBorn, runTerm) = term termSys pierPath inst plan
(termBorn, runTerm) = term termSys shutdownSTM pierPath inst plan
initialEvents = mconcat [behnBorn, amesBorn, httpBorn, termBorn, irisBorn]
runDrivers = do
dNewt <- liftAcquire $ runAmes
@ -240,6 +253,12 @@ router waitFx Drivers{..} =
-- Compute Thread --------------------------------------------------------------
data ComputeRequest
= CREvent Ev
| CRSave ()
| CRShutdown ()
deriving (Eq, Show)
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev =
logDebug $ display $ "[EVENT]\n" <> pretty
@ -257,22 +276,43 @@ logEffect ef =
FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n
runCompute :: e. HasLogFunc e
=> Serf e -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
=> Serf e
-> SerfState
-> STM Ev
-> STM ()
-> STM ()
-> ((Job, FX) -> STM ())
-> RAcquire e (Async ())
runCompute serf ss getEvent putResult =
runCompute serf ss getEvent getSaveSignal getShutdownSignal putResult =
mkRAcquire (async (go ss)) cancel
go :: SerfState -> RIO e ()
go ss = do
ev <- atomically getEvent
logEvent ev
wen <- io
eId <- pure (ssNextEv ss)
mug <- pure (ssLastMug ss)
cr <- atomically $
CRShutdown <$> getShutdownSignal <|>
CRSave <$> getSaveSignal <|>
CREvent <$> getEvent
case cr of
CREvent ev -> do
logEvent ev
wen <- io
eId <- pure (ssNextEv ss)
mug <- pure (ssLastMug ss)
(job', ss', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
atomically (putResult (job', fx))
go ss'
(job', ss', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
atomically (putResult (job', fx))
go ss'
CRSave () -> do
logDebug $ "Taking periodic snapshot"
Serf.snapshot serf ss
go ss
CRShutdown () -> do
-- When shutting down, we first request a snapshot, and then we
-- just exit this recursive processing, which will cause the serf
-- to exit from its RAcquire.
logDebug $ "Shutting down compute system..."
Serf.snapshot serf ss
pure ()
-- Persist Thread --------------------------------------------------------------

@ -93,6 +93,8 @@ data IODriver = IODriver
, startDriver :: (Ev -> STM ()) -> IO (Async (), Perform)
-- Instances -------------------------------------------------------------------

@ -41,18 +41,15 @@ data ReadData = ReadData
-- the session is over, and has a general in/out queue in the types of the
-- vere/arvo interface.
data TerminalSystem e = TerminalSystem
-- | The reader can be waited on, as it shuts itself down when the console
-- goes away.
{ tsReaderThread :: Async()
, tsReadQueue :: TQueue Belt
{ tsReadQueue :: TQueue Belt
, tsWriteQueue :: TQueue VereOutput
, tsStderr :: Text -> RIO e ()
-- Private data to the TerminalSystem that we keep around for stop().
data Private = Private
{ pWriterThread :: Async()
{ pReaderThread :: Async ()
, pWriterThread :: Async ()
, pPreviousConfiguration :: TerminalAttributes
@ -116,7 +113,7 @@ initializeLocalTerminal = do
io $ setTerminalAttributes stdInput newTermSettings Immediately
tsReadQueue <- newTQueueIO
tsReaderThread <- asyncBound (readTerminal tsReadQueue tsWriteQueue (bell tsWriteQueue))
pReaderThread <- asyncBound (readTerminal tsReadQueue tsWriteQueue (bell tsWriteQueue))
let tsStderr = \txt ->
atomically $ writeTQueue tsWriteQueue $ VerePrintOutput $ unpack txt
@ -126,7 +123,12 @@ initializeLocalTerminal = do
stop :: HasLogFunc e
=> (TerminalSystem e, Private) -> RIO e ()
stop (TerminalSystem{..}, Private{..}) = do
cancel tsReaderThread
-- Note that we don't `cancel pReaderThread` here. This is a deliberate
-- decision because fdRead calls into a native function which the runtime
-- can't kill. If we were to cancel here, the internal `waitCatch` would
-- block until the next piece of keyboard input. Since this only happens
-- at shutdown, just leak the file descriptor.
cancel pWriterThread
-- take the terminal out of raw mode
io $ setTerminalAttributes stdInput pPreviousConfiguration Immediately
@ -327,8 +329,9 @@ initializeLocalTerminal = do
term :: HasLogFunc e
=> TerminalSystem e -> FilePath -> KingId -> QueueEv -> ([Ev], RAcquire e (EffCb e TermEf))
term TerminalSystem{..} pierPath king enqueueEv =
=> TerminalSystem e -> (STM ()) -> FilePath -> KingId -> QueueEv
-> ([Ev], RAcquire e (EffCb e TermEf))
term TerminalSystem{..} shutdownSTM pierPath king enqueueEv =
(initialEvents, runTerm)
initialEvents = [(initialBlew 80 24), initialHail]
@ -358,20 +361,7 @@ term TerminalSystem{..} pierPath king enqueueEv =
for_ fsWrites handleFsWrite
TermEfInit _ _ -> pure ()
TermEfLogo path _ -> do
-- %logo is the shutdown path. A previous iteration just had the reader
-- thread exit when it saw a ^D, which was wrong because it didn't emit
-- a ^D to your Urbit, which does things and then sends us a %logo.
-- But this isn't optimal either. Right now, Pier spins forever,
-- waiting for some piece to exit or die, and I added the terminal
-- reading Async for expedience. But the terminal system ending should
-- additionally trigger taking a snapshot, along with any other clean
-- shutdown work.
-- If we have a separate terminal program which connects to the daemon,
-- this shouldn't be shutdown, but should be a sort of disconnect,
-- meaning it would be a VereOutput?
cancel tsReaderThread
atomically $ shutdownSTM
TermEfMass _ _ -> pure ()
handleFsWrite :: Blit -> RIO e ()