mirror of
https://github.com/urbit/shrub.git
synced 2024-11-28 05:22:27 +03:00
External terminals working well now (Lots more janky cherry picking from king-daemon branch)
This commit is contained in:
parent
c579335288
commit
a920e71aca
@ -4,15 +4,17 @@
|
||||
ships. Do it or strip it out.
|
||||
-}
|
||||
|
||||
module King.API (kingAPI, readPortsFile) where
|
||||
module King.API (King(..), kingAPI, readPortsFile) where
|
||||
|
||||
import UrbitPrelude
|
||||
import Data.Aeson
|
||||
-- ort Data.Aeson
|
||||
import RIO.Directory
|
||||
|
||||
import Arvo (Belt)
|
||||
import King.App (HasConfigDir(..))
|
||||
import Network.Socket (Socket)
|
||||
import Prelude (read)
|
||||
|
||||
-- rt Vere.LockFile (lockFile)
|
||||
|
||||
import qualified Network.HTTP.Types as H
|
||||
@ -20,30 +22,24 @@ import qualified Network.Wai as W
|
||||
import qualified Network.Wai.Handler.Warp as W
|
||||
import qualified Network.Wai.Handler.WebSockets as WS
|
||||
import qualified Network.WebSockets as WS
|
||||
import qualified Urbit.Ob as Ob
|
||||
import qualified Vere.NounServ as NounServ
|
||||
import qualified Vere.Term.API as Term
|
||||
|
||||
|
||||
-- Types -----------------------------------------------------------------------
|
||||
|
||||
type TermConn = NounServ.Conn Belt [Term.Ev]
|
||||
|
||||
type TermConnAPI = TVar (Maybe (TermConn -> STM ()))
|
||||
|
||||
{-
|
||||
Daemon state.
|
||||
-}
|
||||
data King = King
|
||||
{ kServer :: Async ()
|
||||
{ kServer :: Async ()
|
||||
, kTermConn :: TermConnAPI
|
||||
}
|
||||
|
||||
data ShipStatus = Halted | Booting | Booted | Running | LandscapeUp
|
||||
deriving (Generic, ToJSON, FromJSON)
|
||||
|
||||
data KingStatus = Starting | Started
|
||||
deriving (Generic, ToJSON, FromJSON)
|
||||
|
||||
data StatusResp = StatusResp
|
||||
{ king :: KingStatus
|
||||
, ships :: Map Text ShipStatus
|
||||
}
|
||||
deriving (Generic, ToJSON, FromJSON)
|
||||
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
@ -84,9 +80,11 @@ kingServer is =
|
||||
where
|
||||
startKing :: HasLogFunc e => (Int, Socket) -> RIO e King
|
||||
startKing (port, sock) = do
|
||||
api <- newTVarIO Nothing
|
||||
let opts = W.defaultSettings & W.setPort port
|
||||
tid <- async $ io $ W.runSettingsSocket opts sock $ app
|
||||
pure (King tid)
|
||||
env <- ask
|
||||
tid <- async $ io $ W.runSettingsSocket opts sock $ app env api
|
||||
pure (King tid api)
|
||||
|
||||
{-
|
||||
Start the HTTP server and write to the ports file.
|
||||
@ -99,36 +97,35 @@ kingAPI = do
|
||||
-- lockFile dir
|
||||
kingServer (port, sock)
|
||||
|
||||
stubStatus :: StatusResp
|
||||
stubStatus = StatusResp Started $ mapFromList [("zod", Running)]
|
||||
|
||||
serveTerminal :: Ship -> Word -> W.Application
|
||||
serveTerminal ship word =
|
||||
WS.websocketsOr WS.defaultConnectionOptions placeholderWSApp fallback
|
||||
serveTerminal :: HasLogFunc e => e -> TermConnAPI -> Word -> W.Application
|
||||
serveTerminal env api word =
|
||||
WS.websocketsOr WS.defaultConnectionOptions wsApp fallback
|
||||
where
|
||||
fallback req respond =
|
||||
respond $ W.responseLBS H.status500 []
|
||||
$ "This endpoint uses websockets"
|
||||
|
||||
placeholderWSApp :: WS.ServerApp
|
||||
placeholderWSApp _ = pure ()
|
||||
wsApp pen =
|
||||
atomically (readTVar api) >>= \case
|
||||
Nothing -> WS.rejectRequest pen "Ship not running"
|
||||
Just sp -> do
|
||||
wsc <- io $ WS.acceptRequest pen
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
atomically $ sp $ NounServ.mkConn inp out
|
||||
runRIO env $
|
||||
NounServ.wsConn "NOUNSERV (wsServ) " inp out wsc
|
||||
|
||||
data BadShip = BadShip Text
|
||||
deriving (Show, Exception)
|
||||
|
||||
readShip :: Text -> IO Ship
|
||||
readShip t = Ob.parsePatp t & \case
|
||||
Left err -> throwIO (BadShip t)
|
||||
Right pp -> pure $ Ship $ fromIntegral $ Ob.fromPatp pp
|
||||
|
||||
app :: W.Application
|
||||
app req respond =
|
||||
app :: HasLogFunc e => e -> TermConnAPI -> W.Application
|
||||
app env api req respond =
|
||||
case W.pathInfo req of
|
||||
["terminal", ship, session] -> do
|
||||
["terminal", session] -> do
|
||||
session :: Word <- evaluate $ read $ unpack session
|
||||
ship <- readShip ship
|
||||
serveTerminal ship session req respond
|
||||
serveTerminal env api session req respond
|
||||
["status"] ->
|
||||
respond $ W.responseLBS H.status200 [] $ encode stubStatus
|
||||
respond $ W.responseLBS H.status200 [] "{}"
|
||||
_ ->
|
||||
respond $ W.responseLBS H.status404 [] "No implemented"
|
||||
|
@ -5,6 +5,9 @@ module Vere.NounServ
|
||||
, wsServer
|
||||
, wsClient
|
||||
, testIt
|
||||
, wsServApp
|
||||
, mkConn
|
||||
, wsConn
|
||||
) where
|
||||
|
||||
import UrbitPrelude
|
||||
@ -37,7 +40,7 @@ data Server i o a = Server
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsConn :: (FromNoun i, ToNoun o, Show o, HasLogFunc e)
|
||||
wsConn :: (FromNoun i, ToNoun o) -- , HasLogFunc e)
|
||||
=> Utf8Builder
|
||||
-> TBMChan i -> TBMChan o
|
||||
-> WS.Connection
|
||||
@ -45,21 +48,21 @@ wsConn :: (FromNoun i, ToNoun o, Show o, HasLogFunc e)
|
||||
wsConn pre inp out wsc = do
|
||||
env <- ask
|
||||
|
||||
-- logWarn (pre <> "(wcConn) Connected!")
|
||||
-- logWarn (pre <> "(wcConn) Connected!")
|
||||
|
||||
writer <- io $ async $ runRIO env $ forever $ do
|
||||
-- logWarn (pre <> "(wsConn) Waiting for data.")
|
||||
-- logWarn (pre <> "(wsConn) Waiting for data.")
|
||||
byt <- io $ toStrict <$> WS.receiveData wsc
|
||||
-- logWarn (pre <> "Got data")
|
||||
-- logWarn (pre <> "Got data")
|
||||
dat <- cueBSExn byt >>= fromNounExn
|
||||
-- logWarn (pre <> "(wsConn) Decoded data, writing to chan")
|
||||
-- logWarn (pre <> "(wsConn) Decoded data, writing to chan")
|
||||
atomically $ writeTBMChan inp dat
|
||||
|
||||
reader <- io $ async $ runRIO env $ forever $ do
|
||||
-- logWarn (pre <> "Waiting for data from chan")
|
||||
-- logWarn (pre <> "Waiting for data from chan")
|
||||
atomically (readTBMChan out) >>= \case
|
||||
Nothing -> do
|
||||
-- logWarn (pre <> "(wsConn) Connection closed")
|
||||
-- logWarn (pre <> "(wsConn) Connection closed")
|
||||
error "dead-conn"
|
||||
Just msg -> do
|
||||
-- logWarn (pre <> "(wsConn) Got message! " <> displayShow msg)
|
||||
@ -67,7 +70,7 @@ wsConn pre inp out wsc = do
|
||||
|
||||
res <- atomically (waitCatchSTM writer <|> waitCatchSTM reader)
|
||||
|
||||
-- logWarn $ displayShow (res :: Either SomeException ())
|
||||
-- logWarn $ displayShow (res :: Either SomeException ())
|
||||
|
||||
atomically (closeTBMChan inp >> closeTBMChan out)
|
||||
|
||||
@ -76,9 +79,9 @@ wsConn pre inp out wsc = do
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsClient :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e)
|
||||
=> W.Port -> RIO e (Client i o)
|
||||
wsClient por = do
|
||||
wsClient :: ∀i o e. (ToNoun o, FromNoun i, HasLogFunc e)
|
||||
=> Text -> W.Port -> RIO e (Client i o)
|
||||
wsClient pax por = do
|
||||
env <- ask
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
@ -87,31 +90,36 @@ wsClient por = do
|
||||
-- logDebug "NOUNSERV (wsClie) Trying to connect"
|
||||
|
||||
tid <- io $ async
|
||||
$ WS.runClient "127.0.0.1" por "/terminal/~zod/1"
|
||||
$ WS.runClient "127.0.0.1" por (unpack pax)
|
||||
$ runRIO env . wsConn "NOUNSERV (wsClie) " inp out
|
||||
|
||||
pure $ Client con tid
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsServer :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e)
|
||||
wsServApp :: (HasLogFunc e, ToNoun o, FromNoun i)
|
||||
=> (Conn i o -> STM ())
|
||||
-> WS.PendingConnection
|
||||
-> RIO e ()
|
||||
wsServApp cb pen = do
|
||||
logError "NOUNSERV (wsServer) Got connection!"
|
||||
wsc <- io $ WS.acceptRequest pen
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
atomically $ cb (mkConn inp out)
|
||||
wsConn "NOUNSERV (wsServ) " inp out wsc
|
||||
|
||||
wsServer :: ∀i o e. (ToNoun o, FromNoun i, HasLogFunc e)
|
||||
=> RIO e (Server i o W.Port)
|
||||
wsServer = do
|
||||
con <- io $ newTBMChanIO 5
|
||||
|
||||
let app pen = do
|
||||
logTrace "NOUNSERV (wsServer) Got connection! Accepting"
|
||||
wsc <- io $ WS.acceptRequest pen
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
atomically $ writeTBMChan con (mkConn inp out)
|
||||
wsConn "NOUNSERV (wsServ) " inp out wsc
|
||||
|
||||
tid <- async $ do
|
||||
env <- ask
|
||||
logTrace "NOUNSERV (wsServer) Starting server"
|
||||
io $ WS.runServer "127.0.0.1" 9999 (runRIO env . app)
|
||||
logWarn "NOUNSERV (wsServer) Server died"
|
||||
logError "NOUNSERV (wsServer) Starting server"
|
||||
io $ WS.runServer "127.0.0.1" 9999
|
||||
$ runRIO env . wsServApp (writeTBMChan con)
|
||||
logError "NOUNSERV (wsServer) Server died"
|
||||
atomically $ closeTBMChan con
|
||||
|
||||
pure $ Server (readTBMChan con) tid 9999
|
||||
@ -133,7 +141,7 @@ testIt = do
|
||||
logTrace "(testIt) Starting Server"
|
||||
Server{..} <- wsServer @Example @Example
|
||||
logTrace "(testIt) Connecting"
|
||||
Client{..} <- wsClient @Example @Example sData
|
||||
Client{..} <- wsClient @Example @Example "/" sData
|
||||
|
||||
logTrace "(testIt) Accepting connection"
|
||||
sConn <- fromJust "accept" =<< atomically sAccept
|
||||
|
@ -152,7 +152,12 @@ pier (serf, log, ss) = do
|
||||
saveM <- newEmptyTMVarIO
|
||||
shutdownM <- newEmptyTMVarIO
|
||||
|
||||
_api ← King.kingAPI
|
||||
kapi ← King.kingAPI
|
||||
|
||||
termApiQ <- atomically $ do
|
||||
q <- newTQueue
|
||||
writeTVar (King.kTermConn kapi) (Just $ writeTQueue q)
|
||||
pure q
|
||||
|
||||
let shutdownEvent = putTMVar shutdownM ()
|
||||
|
||||
@ -160,28 +165,22 @@ pier (serf, log, ss) = do
|
||||
|
||||
-- (sz, local) <- Term.localClient
|
||||
|
||||
(waitExternalTerm, termServPort) <- Term.termServer
|
||||
-- (waitExternalTerm, termServPort) <- Term.termServer
|
||||
|
||||
(demux, muxed) <- atomically $ do
|
||||
res <- Term.mkDemux
|
||||
-- Term.addDemux local res
|
||||
pure (res, Term.useDemux res)
|
||||
|
||||
rio $ logInfo $ display $
|
||||
"TERMSERV Terminal Server running on port: " <> tshow termServPort
|
||||
-- rio $ logInfo $ display $
|
||||
-- "TERMSERV Terminal Server running on port: " <> tshow termServPort
|
||||
|
||||
let listenLoop = do
|
||||
acquireWorker $ forever $ do
|
||||
logTrace "TERMSERV Waiting for external terminal."
|
||||
ok <- atomically $ do
|
||||
waitExternalTerm >>= \case
|
||||
Nothing -> pure False
|
||||
Just ext -> Term.addDemux ext demux >> pure True
|
||||
if ok
|
||||
then do logTrace "TERMSERV External terminal connected"
|
||||
listenLoop
|
||||
else logTrace "TERMSERV Termainal server is dead"
|
||||
|
||||
acquireWorker listenLoop
|
||||
atomically $ do
|
||||
ext <- Term.connClient <$> readTQueue termApiQ
|
||||
Term.addDemux ext demux
|
||||
logTrace "TERMSERV External terminal connected."
|
||||
|
||||
swapMVar (sStderr serf) (atomically . Term.trace muxed)
|
||||
|
||||
|
@ -3,7 +3,7 @@ module Vere.Term
|
||||
, localClient
|
||||
, connectToRemote
|
||||
, runTerminalClient
|
||||
, termServer
|
||||
, connClient
|
||||
, term
|
||||
) where
|
||||
|
||||
@ -113,31 +113,11 @@ isTerminalBlit _ = True
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
{-
|
||||
TODO XX HACK: We don't have any good way of handling client
|
||||
disconnect, so we just retry. This will probably waste CPU.
|
||||
-}
|
||||
termServer :: ∀e. HasLogFunc e
|
||||
=> RAcquire e (STM (Maybe Client), Port)
|
||||
termServer = fst <$> mkRAcquire start stop
|
||||
where
|
||||
stop = cancel . snd
|
||||
start = do
|
||||
serv <- Serv.wsServer @Belt @[Term.Ev]
|
||||
|
||||
let getClient = do
|
||||
Serv.sAccept serv <&> \case
|
||||
Nothing -> Nothing
|
||||
Just c -> Just $ Client
|
||||
{ give = Serv.cSend c
|
||||
, take = Serv.cRecv c >>= \case
|
||||
Nothing -> empty
|
||||
Just ev -> pure ev
|
||||
}
|
||||
|
||||
pure ( (getClient, Port $ fromIntegral $ Serv.sData serv)
|
||||
, Serv.sAsync serv
|
||||
)
|
||||
connClient :: Serv.Conn Belt [Term.Ev] -> Client
|
||||
connClient c = Client
|
||||
{ give = Serv.cSend c
|
||||
, take = Serv.cRecv c
|
||||
}
|
||||
|
||||
connectToRemote :: ∀e. HasLogFunc e
|
||||
=> Port
|
||||
@ -147,10 +127,13 @@ connectToRemote port local = mkRAcquire start stop
|
||||
where
|
||||
stop (x, y) = cancel x >> cancel y
|
||||
start = do
|
||||
Serv.Client{..} <- Serv.wsClient (fromIntegral port)
|
||||
Serv.Client{..} <- Serv.wsClient "/terminal/0" (fromIntegral port)
|
||||
|
||||
-- TODO XX Handle disconnect more cleanly.
|
||||
ferry <- async $ forever $ atomically $ asum
|
||||
[ Term.take local >>= Serv.cSend cConn
|
||||
[ Term.take local >>= \case
|
||||
Nothing -> empty
|
||||
Just ev -> Serv.cSend cConn ev
|
||||
, Serv.cRecv cConn >>= \case
|
||||
Nothing -> empty
|
||||
Just ev -> Term.give local ev
|
||||
@ -164,11 +147,13 @@ instance HasConfigDir HackConfigDir where configDirL = hcdPax
|
||||
|
||||
runTerminalClient :: ∀e. HasLogFunc e => FilePath -> RIO e ()
|
||||
runTerminalClient pier = runRAcquire $ do
|
||||
mPort <- runRIO (HCD pier) readPortsFile
|
||||
port <- maybe (error "Can't connect") pure mPort
|
||||
(tsize, local) <- localClient
|
||||
(tid1, tid2) <- connectToRemote (Port $ fromIntegral port) local
|
||||
atomically $ waitSTM tid1 <|> waitSTM tid2
|
||||
mPort <- runRIO (HCD pier) readPortsFile
|
||||
port <- maybe (error "Can't connect") pure mPort
|
||||
mExit <- io newEmptyTMVarIO
|
||||
(siz, cli) <- localClient (putTMVar mExit ())
|
||||
(tid, sid) <- connectToRemote (Port $ fromIntegral port) cli
|
||||
atomically $ waitSTM tid <|> waitSTM sid <|> takeTMVar mExit
|
||||
|
||||
where
|
||||
runRAcquire :: RAcquire e () -> RIO e ()
|
||||
runRAcquire act = rwith act $ const $ pure ()
|
||||
@ -176,8 +161,10 @@ runTerminalClient pier = runRAcquire $ do
|
||||
{-
|
||||
Initializes the generalized input/output parts of the terminal.
|
||||
-}
|
||||
localClient :: ∀e. HasLogFunc e => RAcquire e (TSize.Window Word, Client)
|
||||
localClient = fst <$> mkRAcquire start stop
|
||||
localClient :: ∀e. HasLogFunc e
|
||||
=> STM ()
|
||||
-> RAcquire e (TSize.Window Word, Client)
|
||||
localClient doneSignal = fst <$> mkRAcquire start stop
|
||||
where
|
||||
start :: HasLogFunc e => RIO e ((TSize.Window Word, Client), Private)
|
||||
start = do
|
||||
@ -202,7 +189,7 @@ localClient = fst <$> mkRAcquire start stop
|
||||
pReaderThread <- asyncBound
|
||||
(readTerminal tsReadQueue tsWriteQueue (bell tsWriteQueue))
|
||||
|
||||
let client = Client { take = readTQueue tsReadQueue
|
||||
let client = Client { take = Just <$> readTQueue tsReadQueue
|
||||
, give = writeTQueue tsWriteQueue
|
||||
}
|
||||
|
||||
@ -504,8 +491,10 @@ localClient = fst <$> mkRAcquire start stop
|
||||
writeTQueue rq $ Ctl $ Cord "c"
|
||||
loop rd
|
||||
else if w <= 26 then do
|
||||
sendBelt $ Ctl $ Cord $ pack [BS.w2c (w + 97 - 1)]
|
||||
loop rd
|
||||
case pack [BS.w2c (w + 97 - 1)] of
|
||||
"d" -> atomically doneSignal
|
||||
c -> do sendBelt $ Ctl $ Cord c
|
||||
loop rd
|
||||
else if w == 27 then do
|
||||
loop rd { rdEscape = True }
|
||||
else do
|
||||
@ -537,28 +526,31 @@ term (tsize, Client{..}) shutdownSTM king enqueueEv =
|
||||
|
||||
runTerm :: RAcquire e (EffCb e TermEf)
|
||||
runTerm = do
|
||||
tim <- mkRAcquire start cancel
|
||||
tim <- mkRAcquire (async readLoop) cancel
|
||||
pure handleEffect
|
||||
|
||||
start :: RIO e (Async ())
|
||||
start = async readBelt
|
||||
|
||||
readBelt :: RIO e ()
|
||||
readBelt = forever $ do
|
||||
b <- atomically take
|
||||
let blip = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b
|
||||
atomically $ enqueueEv $ blip
|
||||
{-
|
||||
Because our terminals are always `Demux`ed, we don't have to
|
||||
care about disconnections.
|
||||
-}
|
||||
readLoop :: RIO e ()
|
||||
readLoop = forever $ do
|
||||
atomically take >>= \case
|
||||
Nothing -> pure ()
|
||||
Just b -> do
|
||||
let blip = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b
|
||||
atomically $ enqueueEv $ blip
|
||||
|
||||
handleEffect :: TermEf -> RIO e ()
|
||||
handleEffect = \case
|
||||
TermEfBlit _ blits -> do
|
||||
let (termBlits, fsWrites) = partition isTerminalBlit blits
|
||||
atomically $ give [Term.Blits termBlits]
|
||||
for_ fsWrites handleFsWrite
|
||||
TermEfInit _ _ -> pure ()
|
||||
TermEfLogo path _ -> do
|
||||
atomically $ shutdownSTM
|
||||
TermEfMass _ _ -> pure ()
|
||||
TermEfInit _ _ -> pure ()
|
||||
TermEfMass _ _ -> pure ()
|
||||
TermEfLogo _ _ -> atomically shutdownSTM
|
||||
TermEfBlit _ blits -> do
|
||||
let (termBlits, fsWrites) = partition isTerminalBlit blits
|
||||
atomically $ give [Term.Blits termBlits]
|
||||
for_ fsWrites handleFsWrite
|
||||
|
||||
|
||||
handleFsWrite :: Blit -> RIO e ()
|
||||
handleFsWrite (Sag path noun) = performPut path (jamBS noun)
|
||||
|
@ -22,7 +22,7 @@ data Ev = Blits [Blit]
|
||||
deriving (Show)
|
||||
|
||||
data Client = Client
|
||||
{ take :: STM Belt
|
||||
{ take :: STM (Maybe Belt)
|
||||
, give :: [Ev] -> STM ()
|
||||
}
|
||||
|
||||
|
@ -17,17 +17,40 @@ import qualified Vere.Term.Logic as Logic
|
||||
|
||||
-- External --------------------------------------------------------------------
|
||||
|
||||
data KeyedSet a = KeyedSet
|
||||
{ _ksTable :: IntMap a
|
||||
, _nextKey :: Int
|
||||
}
|
||||
|
||||
instance Semigroup (KeyedSet a) where
|
||||
KeyedSet t1 k1 <> KeyedSet t2 k2 = KeyedSet (t1 <> t2) (max k1 k2)
|
||||
|
||||
instance Monoid (KeyedSet a) where
|
||||
mempty = KeyedSet mempty 0
|
||||
|
||||
ksInsertKey :: a -> KeyedSet a -> (Int, KeyedSet a)
|
||||
ksInsertKey x (KeyedSet tbl nex) =
|
||||
(nex, KeyedSet (insertMap nex x tbl) (succ nex))
|
||||
|
||||
ksInsert :: a -> KeyedSet a -> KeyedSet a
|
||||
ksInsert x s = snd $ ksInsertKey x s
|
||||
|
||||
ksDelete :: Int -> KeyedSet a -> KeyedSet a
|
||||
ksDelete k (KeyedSet t n) = KeyedSet (deleteMap k t) n
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
data Demux = Demux
|
||||
{ dConns :: TVar [Client]
|
||||
{ dConns :: TVar (KeyedSet Client)
|
||||
, dStash :: TVar Logic.St
|
||||
}
|
||||
|
||||
mkDemux :: STM Demux
|
||||
mkDemux = Demux <$> newTVar [] <*> newTVar Logic.init
|
||||
mkDemux = Demux <$> newTVar mempty <*> newTVar Logic.init
|
||||
|
||||
addDemux :: Client -> Demux -> STM ()
|
||||
addDemux conn Demux{..} = do
|
||||
modifyTVar' dConns (conn:)
|
||||
modifyTVar' dConns (ksInsert conn)
|
||||
stash <- readTVar dStash
|
||||
Term.give conn (Logic.toTermEv <$> Logic.drawState stash)
|
||||
|
||||
@ -44,9 +67,26 @@ dGive :: Demux -> [Term.Ev] -> STM ()
|
||||
dGive Demux{..} evs = do
|
||||
modifyTVar' dStash (force $ steps evs)
|
||||
conns <- readTVar dConns
|
||||
for_ conns $ \c -> Term.give c evs
|
||||
for_ (_ksTable conns) $ \c -> Term.give c evs
|
||||
|
||||
dTake :: Demux -> STM Belt
|
||||
{-
|
||||
Returns Nothing if any connected client disconnected. A `Demux`
|
||||
terminal lives forever, so you can continue to call this after it
|
||||
returns `Nothing`.
|
||||
|
||||
If there are no attached clients, this will not return until one
|
||||
is attached.
|
||||
-}
|
||||
dTake :: Demux -> STM (Maybe Belt)
|
||||
dTake Demux{..} = do
|
||||
conns <- readTVar dConns
|
||||
asum (Term.take <$> conns)
|
||||
waitForBelt conns >>= \case
|
||||
(_, Just b ) -> pure (Just b)
|
||||
(k, Nothing) -> do writeTVar dConns (ksDelete k conns)
|
||||
pure Nothing
|
||||
where
|
||||
waitForBelt :: KeyedSet Client -> STM (Int, Maybe Belt)
|
||||
waitForBelt ks = asum
|
||||
$ fmap (\(k,c) -> (k,) <$> Term.take c)
|
||||
$ mapToList
|
||||
$ _ksTable ks
|
||||
|
Loading…
Reference in New Issue
Block a user