Hook up ~& to the terminal driver.

This commit is contained in:
Elliot Glaysher 2019-09-03 15:17:20 -07:00
parent 8af0d7bef9
commit c635abd58e
3 changed files with 58 additions and 49 deletions

View File

@ -16,7 +16,7 @@ import Vere.Ames (ames)
import Vere.Behn (behn)
import Vere.Http.Server (serv)
import Vere.Log (EventLog)
import Vere.Serf (Serf, SerfState(..), doJob)
import Vere.Serf (Serf, sStderr, SerfState(..), doJob)
import Vere.Term
import qualified System.Entropy as Ent
@ -78,7 +78,7 @@ writeJobs log !jobs = do
booted :: HasLogFunc e
=> FilePath -> FilePath -> Serf.Flags -> Ship
-> RAcquire e (Serf, EventLog, SerfState)
-> RAcquire e (Serf e, EventLog, SerfState)
booted pillPath pierPath flags ship = do
rio $ logTrace "LOADING PILL"
@ -116,7 +116,7 @@ booted pillPath pierPath flags ship = do
resumed :: HasLogFunc e
=> FilePath -> Serf.Flags
-> RAcquire e (Serf, EventLog, SerfState)
-> RAcquire e (Serf e, EventLog, SerfState)
resumed top flags = do
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run (Serf.Config top flags)
@ -132,7 +132,7 @@ resumed top flags = do
pier :: e. HasLogFunc e
=> FilePath
-> Maybe Port
-> (Serf, EventLog, SerfState)
-> (Serf e, EventLog, SerfState)
-> RAcquire e ()
pier pierPath mPort (serf, log, ss) = do
computeQ <- newTQueueIO :: RAcquire e (TQueue Ev)
@ -143,6 +143,8 @@ pier pierPath mPort (serf, log, ss) = do
terminalSystem <- initializeLocalTerminal
serf <- pure serf { sStderr = (tsStderr terminalSystem) }
let ship = who (Log.identity log)
let (bootEvents, startDrivers) =
@ -186,7 +188,7 @@ data Drivers e = Drivers
drivers :: HasLogFunc e
=> FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ())
-> TerminalSystem
-> TerminalSystem e
-> ([Ev], RAcquire e (Drivers e))
drivers pierPath inst who mPort plan termSys =
(initialEvents, runDrivers)
@ -253,7 +255,7 @@ logEffect ef =
FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n
runCompute :: e. HasLogFunc e
=> Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
=> Serf e -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
-> RAcquire e (Async ())
runCompute serf ss getEvent putResult =
mkRAcquire (async (go ss)) cancel

View File

@ -4,7 +4,7 @@
- TODO: `recvLen` is not big-endian safe.
-}
module Vere.Serf ( Serf, SerfState(..), doJob
module Vere.Serf ( Serf, sStderr, SerfState(..), doJob
, run, shutdown, kill
, replay, bootFromSeq, snapshot
, collectFX
@ -75,12 +75,13 @@ data SerfState = SerfState
ssLastEv :: SerfState -> EventId
ssLastEv = pred . ssNextEv
data Serf = Serf
data Serf e = Serf
{ sendHandle :: Handle
, recvHandle :: Handle
, errThread :: Async ()
, process :: ProcessHandle
, sState :: MVar SerfState
, sStderr :: Text -> RIO e ()
}
data ShipId = ShipId Ship Bool
@ -125,9 +126,9 @@ deriveNoun ''Plea
-- Utils -----------------------------------------------------------------------
printTank :: HasLogFunc e => Word32 -> Tank -> RIO e ()
printTank _pri tank =
(serf . unlines . fmap unTape . wash (WashCfg 0 80)) tank
printTank :: HasLogFunc e => (Text -> RIO e ()) -> Word32 -> Tank -> RIO e ()
printTank log _pri tank =
(log . unlines . fmap unTape . wash (WashCfg 0 80)) tank
guardExn :: (Exception e, MonadIO m) => Bool -> e -> m ()
guardExn ok = io . unless ok . throwIO
@ -139,10 +140,10 @@ fromRightExn (Right x) _ = pure x
-- Process Management ----------------------------------------------------------
run :: HasLogFunc e => Config -> RAcquire e Serf
run :: HasLogFunc e => Config -> RAcquire e (Serf e)
run config = mkRAcquire (startUp config) tearDown
startUp :: HasLogFunc e => Config -> RIO e Serf
startUp :: HasLogFunc e => Config -> RIO e (Serf e)
startUp conf@(Config pierPath flags) = do
logTrace "STARTING SERF"
logTrace (displayShow conf)
@ -153,7 +154,7 @@ startUp conf@(Config pierPath flags) = do
ss <- newEmptyMVar
et <- async (readStdErr e)
pure (Serf i o et p ss)
pure (Serf i o et p ss serf)
where
diskKey = ""
config = show (compileFlags flags)
@ -184,7 +185,7 @@ readStdErr h =
Left exn -> io (IO.ioError exn)
Right () -> loop
tearDown :: Serf -> RIO e ()
tearDown :: HasLogFunc e => Serf e -> RIO e ()
tearDown serf = do
io $ terminateProcess (process serf)
void $ waitForExit serf
@ -199,13 +200,13 @@ tearDown serf = do
-- debug killedMsg
-- terminateProcess (process serf)
waitForExit :: Serf -> RIO e ExitCode
waitForExit :: HasLogFunc e => Serf e -> RIO e ExitCode
waitForExit = io . waitForProcess . process
kill :: Serf -> RIO e ExitCode
kill :: HasLogFunc e => Serf e -> RIO e ExitCode
kill serf = io (terminateProcess $ process serf) >> waitForExit serf
_shutdownAndWait :: HasLogFunc e => Serf -> Word8 -> RIO e ExitCode
_shutdownAndWait :: HasLogFunc e => Serf e -> Word8 -> RIO e ExitCode
_shutdownAndWait serf code = do
shutdown serf code
waitForExit serf
@ -221,12 +222,12 @@ withWord64AsByteString w k = do
bs <- BS.unsafePackCStringLen (castPtr wp, 8)
runRIO env (k bs)
sendLen :: Serf -> Int -> RIO e ()
sendLen :: HasLogFunc e => Serf e -> Int -> RIO e ()
sendLen s i = do
w <- evaluate (fromIntegral i :: Word64)
withWord64AsByteString (fromIntegral i) (hPut (sendHandle s))
sendOrder :: HasLogFunc e => Serf -> Order -> RIO e ()
sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e ()
sendOrder w o = do
logDebug $ display ("[Serf.sendOrder.toNoun] " <> tshow o)
n <- evaluate (toNoun o)
@ -241,7 +242,7 @@ sendOrder w o = do
sendBytes w bs
logDebug "[Serf.sendOrder.sent]"
sendBytes :: Serf -> ByteString -> RIO e ()
sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e ()
sendBytes s bs = handle ioErr $ do
sendLen s (length bs)
hFlush (sendHandle s)
@ -260,18 +261,18 @@ sendBytes s bs = handle ioErr $ do
-- TODO WHY DOES THIS MATTER?????
hack = threadDelay 10000
recvLen :: MonadIO m => Serf -> m Word64
recvLen :: (MonadIO m, HasLogFunc e) => Serf e -> m Word64
recvLen w = io $ do
bs <- hGet (recvHandle w) 8
case length bs of
8 -> unsafeUseAsCString bs (peek . castPtr)
_ -> throwIO SerfConnectionClosed
recvBytes :: Serf -> Word64 -> RIO e ByteString
recvBytes :: HasLogFunc e => Serf e -> Word64 -> RIO e ByteString
recvBytes serf =
io . hGet (recvHandle serf) . fromIntegral
recvAtom :: Serf -> RIO e Atom
recvAtom :: HasLogFunc e => Serf e -> RIO e Atom
recvAtom w = do
len <- recvLen w
bs <- recvBytes w len
@ -286,16 +287,16 @@ cordText = T.strip . unCord
--------------------------------------------------------------------------------
snapshot :: HasLogFunc e => Serf -> SerfState -> RIO e ()
snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e ()
snapshot serf ss = sendOrder serf $ OSave $ ssLastEv ss
shutdown :: HasLogFunc e => Serf -> Word8 -> RIO e ()
shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e ()
shutdown serf code = sendOrder serf (OExit code)
{-
TODO Find a cleaner way to handle `PStdr` Pleas.
-}
recvPlea :: HasLogFunc e => Serf -> RIO e Plea
recvPlea :: HasLogFunc e => Serf e -> RIO e Plea
recvPlea w = do
logDebug "[Vere.Serf.recvPlea] waiting"
a <- recvAtom w
@ -303,9 +304,9 @@ recvPlea w = do
n <- fromRightExn (cue a) (const $ BadPleaAtom a)
p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun (traceShowId n) p m)
case p of PStdr e msg -> do serf ("[stdr-plea] " <> cordText msg)
case p of PStdr e msg -> do (sStderr w) (cordText msg)
recvPlea w
PSlog _ pri t -> do printTank pri t
PSlog _ pri t -> do printTank (sStderr w) pri t
recvPlea w
_ -> do logTrace $ display ("recvPlea got: " <> tshow p)
pure p
@ -313,7 +314,7 @@ recvPlea w = do
{-
Waits for initial plea, and then sends boot IPC if necessary.
-}
handshake :: HasLogFunc e => Serf -> LogIdentity -> RIO e SerfState
handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState
handshake serf ident = do
ss@SerfState{..} <- recvPlea serf >>= \case
PPlay Nothing -> pure $ SerfState 1 (Mug 0)
@ -325,7 +326,7 @@ handshake serf ident = do
pure ss
sendWork :: e. HasLogFunc e => Serf -> Job -> RIO e SerfResp
sendWork :: e. HasLogFunc e => Serf e -> Job -> RIO e SerfResp
sendWork w job =
do
sendOrder w (OWork job)
@ -350,25 +351,25 @@ sendWork w job =
PPlay p -> throwIO (UnexpectedPlay eId p)
PDone i m o -> produce (SerfState (i+1) m, o)
PWork work -> replace (DoWork work)
PStdr _ cord -> serf ("[stdr-plea] " <> cordText cord) >> loop
PSlog _ pri t -> printTank pri t >> loop
PStdr _ cord -> (sStderr w) (cordText cord) >> loop
PSlog _ pri t -> printTank (sStderr w) pri t >> loop
--------------------------------------------------------------------------------
doJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState, FX)
doJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState, FX)
doJob serf job = do
sendWork serf job >>= \case
Left replaced -> doJob serf replaced
Right (ss, fx) -> pure (job, ss, fx)
bootJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState)
bootJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState)
bootJob serf job = do
doJob serf job >>= \case
(job, ss, []) -> pure (job, ss)
(job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx)
replayJob :: HasLogFunc e => Serf -> Job -> RIO e SerfState
replayJob :: HasLogFunc e => Serf e -> Job -> RIO e SerfState
replayJob serf job = do
sendWork serf job >>= \case
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
@ -383,7 +384,7 @@ data BootExn = ShipAlreadyBooted
deriving stock (Eq, Ord, Show)
deriving anyclass (Exception)
bootFromSeq :: e. HasLogFunc e => Serf -> BootSeq -> RIO e ([Job], SerfState)
bootFromSeq :: e. HasLogFunc e => Serf e -> BootSeq -> RIO e ([Job], SerfState)
bootFromSeq serf (BootSeq ident nocks ovums) = do
handshake serf ident >>= \case
ss@(SerfState 1 (Mug 0)) -> loop [] ss bootSeqFns
@ -408,12 +409,12 @@ bootFromSeq serf (BootSeq ident nocks ovums) = do
until it is caught up.
-}
replayJobs :: HasLogFunc e
=> Serf -> SerfState -> ConduitT Job Void (RIO e) SerfState
=> Serf e -> SerfState -> ConduitT Job Void (RIO e) SerfState
replayJobs serf = go
where
go ss = await >>= maybe (pure ss) (lift . replayJob serf >=> go)
replay :: HasLogFunc e => Serf -> Log.EventLog -> RIO e SerfState
replay :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e SerfState
replay serf log = do
ss <- handshake serf (Log.identity log)
@ -445,7 +446,7 @@ toJobs ident eId =
-- Collect Effects for Parsing -------------------------------------------------
collectFX :: HasLogFunc e => Serf -> Log.EventLog -> RIO e ()
collectFX :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e ()
collectFX serf log = do
ss <- handshake serf (Log.identity log)
@ -464,7 +465,7 @@ persistFX log = loop
loop
doCollectFX :: e. HasLogFunc e
=> Serf -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
=> Serf e -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
doCollectFX serf = go
where
go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) ()

View File

@ -1,6 +1,6 @@
{-# OPTIONS_GHC -Wwarn #-}
module Vere.Term (initializeLocalTerminal, term, TerminalSystem, tsReaderThread) where
module Vere.Term (initializeLocalTerminal, term, TerminalSystem(..)) where
import UrbitPrelude
import Arvo hiding (Term)
@ -42,12 +42,14 @@ data ReadData = ReadData
-- view of the caller, a terminal has a thread which when exits indicates that
-- the session is over, and has a general in/out queue in the types of the
-- vere/arvo interface.
data TerminalSystem = TerminalSystem
data TerminalSystem e = TerminalSystem
-- | The reader can be waited on, as it shuts itself down when the console
-- goes away.
{ tsReaderThread :: Async()
, tsReadQueue :: TQueue Belt
, tsWriteQueue :: TQueue VereOutput
--
, tsStderr :: Text -> RIO e ()
}
-- Private data to the TerminalSystem that we keep around for stop().
@ -90,12 +92,12 @@ isTerminalBlit _ = True
-- Initializes the generalized input/output parts of the terminal.
--
initializeLocalTerminal :: HasLogFunc e => RAcquire e TerminalSystem
initializeLocalTerminal :: HasLogFunc e => RAcquire e (TerminalSystem e)
initializeLocalTerminal = do
(a, b) <- mkRAcquire start stop
pure a
where
start :: HasLogFunc e => RIO e (TerminalSystem, Private)
start :: HasLogFunc e => RIO e (TerminalSystem e, Private)
start = do
-- Initialize the writing side of the terminal
--
@ -118,9 +120,13 @@ initializeLocalTerminal = do
tsReadQueue <- newTQueueIO
tsReaderThread <- asyncBound (readTerminal tsReadQueue tsWriteQueue (bell tsWriteQueue))
let tsStderr = \txt ->
atomically $ writeTQueue tsWriteQueue $ VerePrintOutput $ unpack txt
pure (TerminalSystem{..}, Private{..})
stop :: (TerminalSystem, Private) -> RIO e ()
stop :: HasLogFunc e
=> (TerminalSystem e, Private) -> RIO e ()
stop (TerminalSystem{..}, Private{..}) = do
cancel tsReaderThread
cancel pWriterThread
@ -164,7 +170,6 @@ initializeLocalTerminal = do
io $ runTermOutput t $ termText "\r"
runMaybeTermOutput t vtClearToBegin
io $ runTermOutput t $ termText p
io $ runTermOutput t $ termText "\r\n"
s <- termRefreshLine t s
loop s
VereBlankLine -> do
@ -304,7 +309,7 @@ initializeLocalTerminal = do
-- ETX (^C)
logDebug $ displayShow "Ctrl-c interrupt"
atomically $ do
writeTQueue wq $ VerePrintOutput "interrupt"
writeTQueue wq $ VerePrintOutput "interrupt\r\n"
writeTQueue rq $ Ctl $ Cord "c"
loop rd
else if w <= 26 then do
@ -323,7 +328,8 @@ initializeLocalTerminal = do
--------------------------------------------------------------------------------
term :: TerminalSystem -> FilePath -> KingId -> QueueEv -> ([Ev], RAcquire e (EffCb e TermEf))
term :: HasLogFunc e
=> TerminalSystem e -> FilePath -> KingId -> QueueEv -> ([Ev], RAcquire e (EffCb e TermEf))
term TerminalSystem{..} pierPath king enqueueEv =
(initialEvents, runTerm)
where