From a920e71acac95d8b3cc65c5ebc596c7e6b30dbce Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Tue, 17 Dec 2019 10:06:20 -0800 Subject: [PATCH] External terminals working well now (Lots more janky cherry picking from king-daemon branch) --- pkg/king/lib/King/API.hs | 71 +++++++++++----------- pkg/king/lib/Vere/NounServ.hs | 58 ++++++++++-------- pkg/king/lib/Vere/Pier.hs | 29 +++++---- pkg/king/lib/Vere/Term.hs | 102 +++++++++++++++----------------- pkg/king/lib/Vere/Term/API.hs | 2 +- pkg/king/lib/Vere/Term/Demux.hs | 52 ++++++++++++++-- 6 files changed, 175 insertions(+), 139 deletions(-) diff --git a/pkg/king/lib/King/API.hs b/pkg/king/lib/King/API.hs index 580464a4a0..cc1acdcff4 100644 --- a/pkg/king/lib/King/API.hs +++ b/pkg/king/lib/King/API.hs @@ -4,15 +4,17 @@ ships. Do it or strip it out. -} -module King.API (kingAPI, readPortsFile) where +module King.API (King(..), kingAPI, readPortsFile) where import UrbitPrelude -import Data.Aeson +-- ort Data.Aeson import RIO.Directory +import Arvo (Belt) import King.App (HasConfigDir(..)) import Network.Socket (Socket) import Prelude (read) + -- rt Vere.LockFile (lockFile) import qualified Network.HTTP.Types as H @@ -20,30 +22,24 @@ import qualified Network.Wai as W import qualified Network.Wai.Handler.Warp as W import qualified Network.Wai.Handler.WebSockets as WS import qualified Network.WebSockets as WS -import qualified Urbit.Ob as Ob +import qualified Vere.NounServ as NounServ +import qualified Vere.Term.API as Term -- Types ----------------------------------------------------------------------- +type TermConn = NounServ.Conn Belt [Term.Ev] + +type TermConnAPI = TVar (Maybe (TermConn -> STM ())) + {- Daemon state. -} data King = King - { kServer :: Async () + { kServer :: Async () + , kTermConn :: TermConnAPI } -data ShipStatus = Halted | Booting | Booted | Running | LandscapeUp - deriving (Generic, ToJSON, FromJSON) - -data KingStatus = Starting | Started - deriving (Generic, ToJSON, FromJSON) - -data StatusResp = StatusResp - { king :: KingStatus - , ships :: Map Text ShipStatus - } - deriving (Generic, ToJSON, FromJSON) - -------------------------------------------------------------------------------- @@ -84,9 +80,11 @@ kingServer is = where startKing :: HasLogFunc e => (Int, Socket) -> RIO e King startKing (port, sock) = do + api <- newTVarIO Nothing let opts = W.defaultSettings & W.setPort port - tid <- async $ io $ W.runSettingsSocket opts sock $ app - pure (King tid) + env <- ask + tid <- async $ io $ W.runSettingsSocket opts sock $ app env api + pure (King tid api) {- Start the HTTP server and write to the ports file. @@ -99,36 +97,35 @@ kingAPI = do -- lockFile dir kingServer (port, sock) -stubStatus :: StatusResp -stubStatus = StatusResp Started $ mapFromList [("zod", Running)] - -serveTerminal :: Ship -> Word -> W.Application -serveTerminal ship word = - WS.websocketsOr WS.defaultConnectionOptions placeholderWSApp fallback +serveTerminal :: HasLogFunc e => e -> TermConnAPI -> Word -> W.Application +serveTerminal env api word = + WS.websocketsOr WS.defaultConnectionOptions wsApp fallback where fallback req respond = respond $ W.responseLBS H.status500 [] $ "This endpoint uses websockets" -placeholderWSApp :: WS.ServerApp -placeholderWSApp _ = pure () + wsApp pen = + atomically (readTVar api) >>= \case + Nothing -> WS.rejectRequest pen "Ship not running" + Just sp -> do + wsc <- io $ WS.acceptRequest pen + inp <- io $ newTBMChanIO 5 + out <- io $ newTBMChanIO 5 + atomically $ sp $ NounServ.mkConn inp out + runRIO env $ + NounServ.wsConn "NOUNSERV (wsServ) " inp out wsc data BadShip = BadShip Text deriving (Show, Exception) -readShip :: Text -> IO Ship -readShip t = Ob.parsePatp t & \case - Left err -> throwIO (BadShip t) - Right pp -> pure $ Ship $ fromIntegral $ Ob.fromPatp pp - -app :: W.Application -app req respond = +app :: HasLogFunc e => e -> TermConnAPI -> W.Application +app env api req respond = case W.pathInfo req of - ["terminal", ship, session] -> do + ["terminal", session] -> do session :: Word <- evaluate $ read $ unpack session - ship <- readShip ship - serveTerminal ship session req respond + serveTerminal env api session req respond ["status"] -> - respond $ W.responseLBS H.status200 [] $ encode stubStatus + respond $ W.responseLBS H.status200 [] "{}" _ -> respond $ W.responseLBS H.status404 [] "No implemented" diff --git a/pkg/king/lib/Vere/NounServ.hs b/pkg/king/lib/Vere/NounServ.hs index 2d9907ee3d..f77097ce0a 100644 --- a/pkg/king/lib/Vere/NounServ.hs +++ b/pkg/king/lib/Vere/NounServ.hs @@ -5,6 +5,9 @@ module Vere.NounServ , wsServer , wsClient , testIt + , wsServApp + , mkConn + , wsConn ) where import UrbitPrelude @@ -37,7 +40,7 @@ data Server i o a = Server -------------------------------------------------------------------------------- -wsConn :: (FromNoun i, ToNoun o, Show o, HasLogFunc e) +wsConn :: (FromNoun i, ToNoun o) -- , HasLogFunc e) => Utf8Builder -> TBMChan i -> TBMChan o -> WS.Connection @@ -45,21 +48,21 @@ wsConn :: (FromNoun i, ToNoun o, Show o, HasLogFunc e) wsConn pre inp out wsc = do env <- ask - -- logWarn (pre <> "(wcConn) Connected!") + -- logWarn (pre <> "(wcConn) Connected!") writer <- io $ async $ runRIO env $ forever $ do - -- logWarn (pre <> "(wsConn) Waiting for data.") + -- logWarn (pre <> "(wsConn) Waiting for data.") byt <- io $ toStrict <$> WS.receiveData wsc - -- logWarn (pre <> "Got data") + -- logWarn (pre <> "Got data") dat <- cueBSExn byt >>= fromNounExn - -- logWarn (pre <> "(wsConn) Decoded data, writing to chan") + -- logWarn (pre <> "(wsConn) Decoded data, writing to chan") atomically $ writeTBMChan inp dat reader <- io $ async $ runRIO env $ forever $ do - -- logWarn (pre <> "Waiting for data from chan") + -- logWarn (pre <> "Waiting for data from chan") atomically (readTBMChan out) >>= \case Nothing -> do - -- logWarn (pre <> "(wsConn) Connection closed") + -- logWarn (pre <> "(wsConn) Connection closed") error "dead-conn" Just msg -> do -- logWarn (pre <> "(wsConn) Got message! " <> displayShow msg) @@ -67,7 +70,7 @@ wsConn pre inp out wsc = do res <- atomically (waitCatchSTM writer <|> waitCatchSTM reader) - -- logWarn $ displayShow (res :: Either SomeException ()) + -- logWarn $ displayShow (res :: Either SomeException ()) atomically (closeTBMChan inp >> closeTBMChan out) @@ -76,9 +79,9 @@ wsConn pre inp out wsc = do -------------------------------------------------------------------------------- -wsClient :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e) - => W.Port -> RIO e (Client i o) -wsClient por = do +wsClient :: ∀i o e. (ToNoun o, FromNoun i, HasLogFunc e) + => Text -> W.Port -> RIO e (Client i o) +wsClient pax por = do env <- ask inp <- io $ newTBMChanIO 5 out <- io $ newTBMChanIO 5 @@ -87,31 +90,36 @@ wsClient por = do -- logDebug "NOUNSERV (wsClie) Trying to connect" tid <- io $ async - $ WS.runClient "127.0.0.1" por "/terminal/~zod/1" + $ WS.runClient "127.0.0.1" por (unpack pax) $ runRIO env . wsConn "NOUNSERV (wsClie) " inp out pure $ Client con tid -------------------------------------------------------------------------------- -wsServer :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e) +wsServApp :: (HasLogFunc e, ToNoun o, FromNoun i) + => (Conn i o -> STM ()) + -> WS.PendingConnection + -> RIO e () +wsServApp cb pen = do + logError "NOUNSERV (wsServer) Got connection!" + wsc <- io $ WS.acceptRequest pen + inp <- io $ newTBMChanIO 5 + out <- io $ newTBMChanIO 5 + atomically $ cb (mkConn inp out) + wsConn "NOUNSERV (wsServ) " inp out wsc + +wsServer :: ∀i o e. (ToNoun o, FromNoun i, HasLogFunc e) => RIO e (Server i o W.Port) wsServer = do con <- io $ newTBMChanIO 5 - let app pen = do - logTrace "NOUNSERV (wsServer) Got connection! Accepting" - wsc <- io $ WS.acceptRequest pen - inp <- io $ newTBMChanIO 5 - out <- io $ newTBMChanIO 5 - atomically $ writeTBMChan con (mkConn inp out) - wsConn "NOUNSERV (wsServ) " inp out wsc - tid <- async $ do env <- ask - logTrace "NOUNSERV (wsServer) Starting server" - io $ WS.runServer "127.0.0.1" 9999 (runRIO env . app) - logWarn "NOUNSERV (wsServer) Server died" + logError "NOUNSERV (wsServer) Starting server" + io $ WS.runServer "127.0.0.1" 9999 + $ runRIO env . wsServApp (writeTBMChan con) + logError "NOUNSERV (wsServer) Server died" atomically $ closeTBMChan con pure $ Server (readTBMChan con) tid 9999 @@ -133,7 +141,7 @@ testIt = do logTrace "(testIt) Starting Server" Server{..} <- wsServer @Example @Example logTrace "(testIt) Connecting" - Client{..} <- wsClient @Example @Example sData + Client{..} <- wsClient @Example @Example "/" sData logTrace "(testIt) Accepting connection" sConn <- fromJust "accept" =<< atomically sAccept diff --git a/pkg/king/lib/Vere/Pier.hs b/pkg/king/lib/Vere/Pier.hs index 44d810b253..211ceb1a8e 100644 --- a/pkg/king/lib/Vere/Pier.hs +++ b/pkg/king/lib/Vere/Pier.hs @@ -152,7 +152,12 @@ pier (serf, log, ss) = do saveM <- newEmptyTMVarIO shutdownM <- newEmptyTMVarIO - _api ← King.kingAPI + kapi ← King.kingAPI + + termApiQ <- atomically $ do + q <- newTQueue + writeTVar (King.kTermConn kapi) (Just $ writeTQueue q) + pure q let shutdownEvent = putTMVar shutdownM () @@ -160,28 +165,22 @@ pier (serf, log, ss) = do -- (sz, local) <- Term.localClient - (waitExternalTerm, termServPort) <- Term.termServer + -- (waitExternalTerm, termServPort) <- Term.termServer (demux, muxed) <- atomically $ do res <- Term.mkDemux -- Term.addDemux local res pure (res, Term.useDemux res) - rio $ logInfo $ display $ - "TERMSERV Terminal Server running on port: " <> tshow termServPort + -- rio $ logInfo $ display $ + -- "TERMSERV Terminal Server running on port: " <> tshow termServPort - let listenLoop = do + acquireWorker $ forever $ do logTrace "TERMSERV Waiting for external terminal." - ok <- atomically $ do - waitExternalTerm >>= \case - Nothing -> pure False - Just ext -> Term.addDemux ext demux >> pure True - if ok - then do logTrace "TERMSERV External terminal connected" - listenLoop - else logTrace "TERMSERV Termainal server is dead" - - acquireWorker listenLoop + atomically $ do + ext <- Term.connClient <$> readTQueue termApiQ + Term.addDemux ext demux + logTrace "TERMSERV External terminal connected." swapMVar (sStderr serf) (atomically . Term.trace muxed) diff --git a/pkg/king/lib/Vere/Term.hs b/pkg/king/lib/Vere/Term.hs index 7f95faf403..cde0e2ed0d 100644 --- a/pkg/king/lib/Vere/Term.hs +++ b/pkg/king/lib/Vere/Term.hs @@ -3,7 +3,7 @@ module Vere.Term , localClient , connectToRemote , runTerminalClient - , termServer + , connClient , term ) where @@ -113,31 +113,11 @@ isTerminalBlit _ = True -------------------------------------------------------------------------------- -{- - TODO XX HACK: We don't have any good way of handling client - disconnect, so we just retry. This will probably waste CPU. --} -termServer :: ∀e. HasLogFunc e - => RAcquire e (STM (Maybe Client), Port) -termServer = fst <$> mkRAcquire start stop - where - stop = cancel . snd - start = do - serv <- Serv.wsServer @Belt @[Term.Ev] - - let getClient = do - Serv.sAccept serv <&> \case - Nothing -> Nothing - Just c -> Just $ Client - { give = Serv.cSend c - , take = Serv.cRecv c >>= \case - Nothing -> empty - Just ev -> pure ev - } - - pure ( (getClient, Port $ fromIntegral $ Serv.sData serv) - , Serv.sAsync serv - ) +connClient :: Serv.Conn Belt [Term.Ev] -> Client +connClient c = Client + { give = Serv.cSend c + , take = Serv.cRecv c + } connectToRemote :: ∀e. HasLogFunc e => Port @@ -147,10 +127,13 @@ connectToRemote port local = mkRAcquire start stop where stop (x, y) = cancel x >> cancel y start = do - Serv.Client{..} <- Serv.wsClient (fromIntegral port) + Serv.Client{..} <- Serv.wsClient "/terminal/0" (fromIntegral port) + -- TODO XX Handle disconnect more cleanly. ferry <- async $ forever $ atomically $ asum - [ Term.take local >>= Serv.cSend cConn + [ Term.take local >>= \case + Nothing -> empty + Just ev -> Serv.cSend cConn ev , Serv.cRecv cConn >>= \case Nothing -> empty Just ev -> Term.give local ev @@ -164,11 +147,13 @@ instance HasConfigDir HackConfigDir where configDirL = hcdPax runTerminalClient :: ∀e. HasLogFunc e => FilePath -> RIO e () runTerminalClient pier = runRAcquire $ do - mPort <- runRIO (HCD pier) readPortsFile - port <- maybe (error "Can't connect") pure mPort - (tsize, local) <- localClient - (tid1, tid2) <- connectToRemote (Port $ fromIntegral port) local - atomically $ waitSTM tid1 <|> waitSTM tid2 + mPort <- runRIO (HCD pier) readPortsFile + port <- maybe (error "Can't connect") pure mPort + mExit <- io newEmptyTMVarIO + (siz, cli) <- localClient (putTMVar mExit ()) + (tid, sid) <- connectToRemote (Port $ fromIntegral port) cli + atomically $ waitSTM tid <|> waitSTM sid <|> takeTMVar mExit + where runRAcquire :: RAcquire e () -> RIO e () runRAcquire act = rwith act $ const $ pure () @@ -176,8 +161,10 @@ runTerminalClient pier = runRAcquire $ do {- Initializes the generalized input/output parts of the terminal. -} -localClient :: ∀e. HasLogFunc e => RAcquire e (TSize.Window Word, Client) -localClient = fst <$> mkRAcquire start stop +localClient :: ∀e. HasLogFunc e + => STM () + -> RAcquire e (TSize.Window Word, Client) +localClient doneSignal = fst <$> mkRAcquire start stop where start :: HasLogFunc e => RIO e ((TSize.Window Word, Client), Private) start = do @@ -202,7 +189,7 @@ localClient = fst <$> mkRAcquire start stop pReaderThread <- asyncBound (readTerminal tsReadQueue tsWriteQueue (bell tsWriteQueue)) - let client = Client { take = readTQueue tsReadQueue + let client = Client { take = Just <$> readTQueue tsReadQueue , give = writeTQueue tsWriteQueue } @@ -504,8 +491,10 @@ localClient = fst <$> mkRAcquire start stop writeTQueue rq $ Ctl $ Cord "c" loop rd else if w <= 26 then do - sendBelt $ Ctl $ Cord $ pack [BS.w2c (w + 97 - 1)] - loop rd + case pack [BS.w2c (w + 97 - 1)] of + "d" -> atomically doneSignal + c -> do sendBelt $ Ctl $ Cord c + loop rd else if w == 27 then do loop rd { rdEscape = True } else do @@ -537,28 +526,31 @@ term (tsize, Client{..}) shutdownSTM king enqueueEv = runTerm :: RAcquire e (EffCb e TermEf) runTerm = do - tim <- mkRAcquire start cancel + tim <- mkRAcquire (async readLoop) cancel pure handleEffect - start :: RIO e (Async ()) - start = async readBelt - - readBelt :: RIO e () - readBelt = forever $ do - b <- atomically take - let blip = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b - atomically $ enqueueEv $ blip + {- + Because our terminals are always `Demux`ed, we don't have to + care about disconnections. + -} + readLoop :: RIO e () + readLoop = forever $ do + atomically take >>= \case + Nothing -> pure () + Just b -> do + let blip = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b + atomically $ enqueueEv $ blip handleEffect :: TermEf -> RIO e () handleEffect = \case - TermEfBlit _ blits -> do - let (termBlits, fsWrites) = partition isTerminalBlit blits - atomically $ give [Term.Blits termBlits] - for_ fsWrites handleFsWrite - TermEfInit _ _ -> pure () - TermEfLogo path _ -> do - atomically $ shutdownSTM - TermEfMass _ _ -> pure () + TermEfInit _ _ -> pure () + TermEfMass _ _ -> pure () + TermEfLogo _ _ -> atomically shutdownSTM + TermEfBlit _ blits -> do + let (termBlits, fsWrites) = partition isTerminalBlit blits + atomically $ give [Term.Blits termBlits] + for_ fsWrites handleFsWrite + handleFsWrite :: Blit -> RIO e () handleFsWrite (Sag path noun) = performPut path (jamBS noun) diff --git a/pkg/king/lib/Vere/Term/API.hs b/pkg/king/lib/Vere/Term/API.hs index e18859ea0e..d4b3ce7528 100644 --- a/pkg/king/lib/Vere/Term/API.hs +++ b/pkg/king/lib/Vere/Term/API.hs @@ -22,7 +22,7 @@ data Ev = Blits [Blit] deriving (Show) data Client = Client - { take :: STM Belt + { take :: STM (Maybe Belt) , give :: [Ev] -> STM () } diff --git a/pkg/king/lib/Vere/Term/Demux.hs b/pkg/king/lib/Vere/Term/Demux.hs index 9cbde87101..8926277f4b 100644 --- a/pkg/king/lib/Vere/Term/Demux.hs +++ b/pkg/king/lib/Vere/Term/Demux.hs @@ -17,17 +17,40 @@ import qualified Vere.Term.Logic as Logic -- External -------------------------------------------------------------------- +data KeyedSet a = KeyedSet + { _ksTable :: IntMap a + , _nextKey :: Int + } + +instance Semigroup (KeyedSet a) where + KeyedSet t1 k1 <> KeyedSet t2 k2 = KeyedSet (t1 <> t2) (max k1 k2) + +instance Monoid (KeyedSet a) where + mempty = KeyedSet mempty 0 + +ksInsertKey :: a -> KeyedSet a -> (Int, KeyedSet a) +ksInsertKey x (KeyedSet tbl nex) = + (nex, KeyedSet (insertMap nex x tbl) (succ nex)) + +ksInsert :: a -> KeyedSet a -> KeyedSet a +ksInsert x s = snd $ ksInsertKey x s + +ksDelete :: Int -> KeyedSet a -> KeyedSet a +ksDelete k (KeyedSet t n) = KeyedSet (deleteMap k t) n + +-------------------------------------------------------------------------------- + data Demux = Demux - { dConns :: TVar [Client] + { dConns :: TVar (KeyedSet Client) , dStash :: TVar Logic.St } mkDemux :: STM Demux -mkDemux = Demux <$> newTVar [] <*> newTVar Logic.init +mkDemux = Demux <$> newTVar mempty <*> newTVar Logic.init addDemux :: Client -> Demux -> STM () addDemux conn Demux{..} = do - modifyTVar' dConns (conn:) + modifyTVar' dConns (ksInsert conn) stash <- readTVar dStash Term.give conn (Logic.toTermEv <$> Logic.drawState stash) @@ -44,9 +67,26 @@ dGive :: Demux -> [Term.Ev] -> STM () dGive Demux{..} evs = do modifyTVar' dStash (force $ steps evs) conns <- readTVar dConns - for_ conns $ \c -> Term.give c evs + for_ (_ksTable conns) $ \c -> Term.give c evs -dTake :: Demux -> STM Belt +{- + Returns Nothing if any connected client disconnected. A `Demux` + terminal lives forever, so you can continue to call this after it + returns `Nothing`. + + If there are no attached clients, this will not return until one + is attached. +-} +dTake :: Demux -> STM (Maybe Belt) dTake Demux{..} = do conns <- readTVar dConns - asum (Term.take <$> conns) + waitForBelt conns >>= \case + (_, Just b ) -> pure (Just b) + (k, Nothing) -> do writeTVar dConns (ksDelete k conns) + pure Nothing + where + waitForBelt :: KeyedSet Client -> STM (Int, Maybe Belt) + waitForBelt ks = asum + $ fmap (\(k,c) -> (k,) <$> Term.take c) + $ mapToList + $ _ksTable ks