king: improve king subsite implementation

Make KingSubsite part of ServConf, handle 404 case more gracefully, make
slog endpoint send SSE headers immediately.

Remaining work mostly revolves around the slog endpoint's slog queue. It
builds up even if nobody is listening, and only the first to pull from
the queue gets to handle/emit the slog event.
This commit is contained in:
fang 2020-10-16 14:06:44 +02:00
parent b35f879502
commit 7853c7df9b
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972
7 changed files with 181 additions and 144 deletions

View File

@ -25,9 +25,9 @@ import Data.PEM (pemParseBS, pemWriteBS)
import RIO.Prelude (decodeUtf8Lenient)
import System.Random (randomIO)
import Urbit.Vere.Http (convertHeaders, unconvertHeaders)
import Urbit.Vere.Eyre.KingSubsite (KingSubsite)
import qualified Network.HTTP.Types as H
import qualified Network.Wai.Conduit as W
-- Types -----------------------------------------------------------------------
@ -177,7 +177,7 @@ startServ
-> HttpServerConf
-> (EvErr -> STM ())
-> (Text -> RIO e ())
-> W.Application
-> KingSubsite
-> RIO e Serv
startServ who isFake conf plan stderr sub = do
logInfo (displayShow ("EYRE", "startServ"))
@ -225,27 +225,27 @@ startServ who isFake conf plan stderr sub = do
logInfo (displayShow ("EYRE", "joinMultiEyre", who, mTls, mCre))
atomically (joinMultiEyre multi who mCre onReq onKilReq)
atomically (joinMultiEyre multi who mCre onReq onKilReq sub)
logInfo $ displayShow ("EYRE", "Starting loopback server")
lop <- serv sub vLive $ ServConf
lop <- serv vLive $ ServConf
{ scHost = soHost (pttLop ptt)
, scPort = soWhich (pttLop ptt)
, scRedi = Nothing
, scFake = False
, scType = STHttp who $ ReqApi
, scType = STHttp who sub $ ReqApi
{ rcReq = onReq Loopback
, rcKil = onKilReq
}
}
logInfo $ displayShow ("EYRE", "Starting insecure server")
ins <- serv sub vLive $ ServConf
ins <- serv vLive $ ServConf
{ scHost = soHost (pttIns ptt)
, scPort = soWhich (pttIns ptt)
, scRedi = secRedi
, scFake = noHttp
, scType = STHttp who $ ReqApi
, scType = STHttp who sub $ ReqApi
{ rcReq = onReq Insecure
, rcKil = onKilReq
}
@ -253,12 +253,12 @@ startServ who isFake conf plan stderr sub = do
mSec <- for mTls $ \tls -> do
logInfo "Starting secure server"
serv sub vLive $ ServConf
serv vLive $ ServConf
{ scHost = soHost (pttSec ptt)
, scPort = soWhich (pttSec ptt)
, scRedi = Nothing
, scFake = noHttps
, scType = STHttps who tls $ ReqApi
, scType = STHttps who tls sub $ ReqApi
{ rcReq = onReq Secure
, rcKil = onKilReq
}
@ -293,7 +293,7 @@ eyre'
=> Ship
-> Bool
-> (Text -> RIO e ())
-> W.Application
-> KingSubsite
-> RIO e ([Ev], RAcquire e (DriverApi HttpServerEf))
eyre' who isFake stderr sub = do
@ -330,9 +330,9 @@ eyre
-> (EvErr -> STM ())
-> Bool
-> (Text -> RIO e ())
-> W.Application
-> KingSubsite
-> ([Ev], RAcquire e (HttpServerEf -> IO ()))
eyre env who plan isFake sub stderr = (initialEvents, runHttpServer)
eyre env who plan isFake stderr sub = (initialEvents, runHttpServer)
where
king = fromIntegral (env ^. kingIdL)
multi = env ^. multiEyreApiL
@ -356,7 +356,7 @@ eyre env who plan isFake sub stderr = (initialEvents, runHttpServer)
restart :: Drv -> HttpServerConf -> RIO e Serv
restart (Drv var) conf = do
logInfo "Restarting http server"
let startAct = startServ who isFake conf plan sub stderr
let startAct = startServ who isFake conf plan stderr sub
res <- fromEither =<< restartService var startAct kill
logInfo "Done restating http server"
pure res

View File

@ -0,0 +1,75 @@
{-|
KingSubsite: runtime-exclusive HTTP request handling, for /~_~
-}
module Urbit.Vere.Eyre.KingSubsite
( KingSubsite
, kingSubsite
, runKingSubsite
, fourOhFourSubsite
) where
import Urbit.Prelude hiding (Builder)
import Data.ByteString.Builder
import Data.Conduit (ConduitT, Flush(..), yield)
import Data.Text.Encoding (encodeUtf8Builder)
import Urbit.Noun.Tank (wash)
import qualified Network.HTTP.Types as H
import qualified Network.Wai as W
import qualified Network.Wai.Conduit as W
newtype KingSubsite = KS { runKingSubsite :: W.Application }
data SlogAction
= KeepAlive
| Slog (Atom, Tank)
conduit :: SlogAction -> ConduitT () (Flush Builder) IO ()
conduit a = do
case a of
KeepAlive -> pure ()
Slog (_, t) -> for_ (wash (WashCfg 0 80) (tankTree t)) $ \l -> do
yield $ Chunk "data:"
yield $ Chunk $ encodeUtf8Builder $ unTape l
yield $ Chunk "\n"
yield $ Chunk "\n"
yield $ Flush
kingSubsite :: HasLogFunc e
=> TVar ((Atom, Tank) -> IO ())
-> RAcquire e KingSubsite
kingSubsite func = do
slogQ :: TQueue (Atom, Tank) <- newTQueueIO
baton :: TMVar () <- newEmptyTMVarIO
atomically $ writeTVar func (\s -> atomically $ writeTQueue slogQ s)
acquireWorker "Runtime subsite keep-alive" $ forever $ do
atomically $ putTMVar baton ()
threadDelay 20_000_000
let action = (KeepAlive <$ takeTMVar baton) -- every 20s
<|> (Slog <$> readTQueue slogQ)
--TODO queue builds even without listeners connected.
-- and with listeners connected, only one pops from queue!
-- need queue per connection, not global?
let loop = yield Flush >> forever (atomically action >>= conduit)
--TODO scry to verify cookie authentication
pure $ KS $ \req respond -> respond $ case W.pathInfo req of
("~_~":"slog":_) -> W.responseSource (H.mkStatus 200 "OK") heads loop
_ -> W.responseLBS (H.mkStatus 404 "Not Found") [] ""
where
heads = [ ("Content-Type" , "text/event-stream")
, ("Cache-Control", "no-cache")
, ("Connection" , "keep-alive")
]
fourOhFourSubsite :: Ship -> KingSubsite
fourOhFourSubsite who = KS $ \req respond ->
respond $ W.responseLBS (H.mkStatus 404 "Not Found") [] body
where
body = toLazyByteString $ foldMap charUtf8 $ msg
msg = "Ship " <> (show who) <> " not docked."

View File

@ -21,8 +21,7 @@ import Urbit.Vere.Eyre.Serv
import Urbit.Vere.Eyre.Wai
import Network.TLS (Credential)
import Network.Wai as W
import Urbit.Vere.Eyre.KingSubsite (KingSubsite, fourOhFourSubsite)
-- Types -----------------------------------------------------------------------
@ -47,6 +46,7 @@ data MultiEyreApi = MultiEyreApi
, meaPlan :: TVar (Map Ship OnMultiReq)
, meaCanc :: TVar (Map Ship OnMultiKil)
, meaTlsC :: TVar (Map Ship (TlsConfig, Credential))
, meaSite :: TVar (Map Ship KingSubsite)
, meaKill :: STM ()
}
@ -59,27 +59,36 @@ joinMultiEyre
-> Maybe (TlsConfig, Credential)
-> OnMultiReq
-> OnMultiKil
-> KingSubsite
-> STM ()
joinMultiEyre api who mTls onReq onKil = do
joinMultiEyre api who mTls onReq onKil sub = do
modifyTVar' (meaPlan api) (insertMap who onReq)
modifyTVar' (meaCanc api) (insertMap who onKil)
for_ mTls $ \creds -> do
modifyTVar' (meaTlsC api) (insertMap who creds)
modifyTVar' (meaSite api) (insertMap who sub)
leaveMultiEyre :: MultiEyreApi -> Ship -> STM ()
leaveMultiEyre MultiEyreApi {..} who = do
modifyTVar' meaCanc (deleteMap who)
modifyTVar' meaPlan (deleteMap who)
modifyTVar' meaTlsC (deleteMap who)
modifyTVar' meaSite (deleteMap who)
multiEyre :: HasLogFunc e => MultiEyreConf -> W.Application -> RIO e MultiEyreApi
multiEyre conf@MultiEyreConf {..} sub = do
multiEyre :: HasLogFunc e => MultiEyreConf -> RIO e MultiEyreApi
multiEyre conf@MultiEyreConf {..} = do
logInfo (displayShow ("EYRE", "MULTI", conf))
vLive <- io emptyLiveReqs >>= newTVarIO
vPlan <- newTVarIO mempty
vCanc <- newTVarIO (mempty :: Map Ship (Ship -> Word64 -> STM ()))
vTlsC <- newTVarIO mempty
vSite <- newTVarIO mempty
let site :: Ship -> STM KingSubsite
site who = do
sites <- readTVar vSite
pure $ maybe (fourOhFourSubsite who) id $ lookup who sites
let host = if mecLocalhostOnly then SHLocalhost else SHAnyHostOk
@ -99,12 +108,12 @@ multiEyre conf@MultiEyreConf {..} sub = do
mIns <- for mecHttpPort $ \por -> do
logInfo (displayShow ("EYRE", "MULTI", "HTTP", por))
serv sub vLive $ ServConf
serv vLive $ ServConf
{ scHost = host
, scPort = SPChoices $ singleton $ fromIntegral por
, scRedi = Nothing -- TODO
, scFake = False
, scType = STMultiHttp $ ReqApi
, scType = STMultiHttp site $ ReqApi
{ rcReq = onReq Insecure
, rcKil = onKil
}
@ -112,12 +121,12 @@ multiEyre conf@MultiEyreConf {..} sub = do
mSec <- for mecHttpsPort $ \por -> do
logInfo (displayShow ("EYRE", "MULTI", "HTTPS", por))
serv sub vLive $ ServConf
serv vLive $ ServConf
{ scHost = host
, scPort = SPChoices $ singleton $ fromIntegral por
, scRedi = Nothing
, scFake = False
, scType = STMultiHttps (MTC vTlsC) $ ReqApi
, scType = STMultiHttps (MTC vTlsC) site $ ReqApi
{ rcReq = onReq Secure
, rcKil = onKil
}
@ -128,6 +137,7 @@ multiEyre conf@MultiEyreConf {..} sub = do
, meaPlan = vPlan
, meaCanc = vCanc
, meaTlsC = vTlsC
, meaSite = vSite
, meaConf = conf
, meaKill = traverse_ saKil (toList mIns <> toList mSec)
}

View File

@ -37,9 +37,13 @@ import Urbit.Prelude hiding (Builder)
import Data.Default (def)
import Data.List.NonEmpty (NonEmpty((:|)))
import Network.TLS (Credential, Credentials(..), ServerHooks(..))
import Network.TLS ( Credential
, Credentials(..)
, ServerHooks(..)
)
import Network.TLS (credentialLoadX509ChainFromMemory)
import RIO.Prelude (decodeUtf8Lenient)
import Urbit.Vere.Eyre.KingSubsite (KingSubsite)
import qualified Control.Monad.STM as STM
import qualified Data.Char as C
@ -67,23 +71,23 @@ data TlsConfig = TlsConfig
newtype MultiTlsConfig = MTC (TVar (Map Ship (TlsConfig, Credential)))
instance Show MultiTlsConfig where
show = const "MultiTlsConfig"
data ReqApi = ReqApi
{ rcReq :: Ship -> Word64 -> E.ReqInfo -> STM ()
, rcKil :: Ship -> Word64 -> STM ()
}
instance Show ReqApi where
show = const "ReqApi"
data ServType
= STHttp Ship ReqApi
| STHttps Ship TlsConfig ReqApi
| STMultiHttp ReqApi
| STMultiHttps MultiTlsConfig ReqApi
deriving (Show)
= STHttp Ship KingSubsite ReqApi
| STHttps Ship TlsConfig KingSubsite ReqApi
| STMultiHttp (Ship -> STM KingSubsite) ReqApi
| STMultiHttps MultiTlsConfig (Ship -> STM KingSubsite) ReqApi
instance Show ServType where
show = \case
STHttp who _ _ -> "STHttp " <> show who
STHttps who tls _ _ -> "STHttps " <> show who <> " " <> show tls
STMultiHttp _ _ -> "STMultiHttp"
STMultiHttps tls _ _ -> "STMultiHttps"
data ServPort
= SPAnyPort
@ -247,10 +251,9 @@ startServer
-> W.Port
-> Net.Socket
-> Maybe W.Port
-> W.Application
-> TVar E.LiveReqs
-> RIO e ()
startServer typ hos por sok red sub vLive = do
startServer typ hos por sok red vLive = do
envir <- ask
let host = case hos of
@ -264,26 +267,27 @@ startServer typ hos por sok red sub vLive = do
& W.setTimeout (5 * 60)
-- TODO build Eyre.Site.app in pier, thread through here
let runAppl who = E.app envir who sub vLive
let runAppl who = E.app envir who vLive
reqShip = hostShip . W.requestHeaderHost
case typ of
STHttp who api -> do
let app = runAppl who (rcReq api who) (rcKil api who)
STHttp who sub api -> do
let app = runAppl who (rcReq api who) (rcKil api who) sub
io (W.runSettingsSocket opts sok app)
STHttps who TlsConfig {..} api -> do
STHttps who TlsConfig {..} sub api -> do
let tls = W.tlsSettingsChainMemory tcCerti tcChain tcPrKey
let app = runAppl who (rcReq api who) (rcKil api who)
let app = runAppl who (rcReq api who) (rcKil api who) sub
io (W.runTLSSocket tls opts sok app)
STMultiHttp api -> do
STMultiHttp fub api -> do
let app req resp = do
who <- reqShip req
runAppl who (rcReq api who) (rcKil api who) req resp
sub <- atomically $ fub who
runAppl who (rcReq api who) (rcKil api who) sub req resp
io (W.runSettingsSocket opts sok app)
STMultiHttps mtls api -> do
STMultiHttps mtls fub api -> do
TlsConfig {..} <- atomically (getFirstTlsConfig mtls)
let sni = def { onServerNameIndication = onSniHdr envir mtls }
@ -298,7 +302,8 @@ startServer typ hos por sok red sub vLive = do
runRIO envir $ logDbg ctx "Got request"
who <- reqShip req
runRIO envir $ logDbg ctx ("Parsed HOST", who)
runAppl who (rcReq api who) (rcKil api who) req resp
sub <- atomically $ fub who
runAppl who (rcReq api who) (rcKil api who) sub req resp
io (W.runTLSSocket tlsMany opts sok app)
@ -331,8 +336,8 @@ getFirstTlsConfig (MTC var) = do
[] -> STM.retry
x:_ -> pure (fst x)
realServ :: HasLogFunc e => W.Application -> TVar E.LiveReqs -> ServConf -> RIO e ServApi
realServ sub vLive conf@ServConf {..} = do
realServ :: HasLogFunc e => TVar E.LiveReqs -> ServConf -> RIO e ServApi
realServ vLive conf@ServConf {..} = do
logInfo (displayShow ("EYRE", "SERV", "Running Real Server"))
kil <- newEmptyTMVarIO
por <- newEmptyTMVarIO
@ -349,10 +354,10 @@ realServ sub vLive conf@ServConf {..} = do
logInfo (displayShow ("EYRE", "SERV", "runServ"))
rwith (forceOpenSocket scHost scPort) $ \(por, sok) -> do
atomically (putTMVar vPort por)
startServer scType scHost por sok scRedi sub vLive
startServer scType scHost por sok scRedi vLive
serv :: HasLogFunc e => W.Application -> TVar E.LiveReqs -> ServConf -> RIO e ServApi
serv sub vLive conf = do
serv :: HasLogFunc e => TVar E.LiveReqs -> ServConf -> RIO e ServApi
serv vLive conf = do
if scFake conf
then fakeServ conf
else realServ sub vLive conf
else realServ vLive conf

View File

@ -1,53 +0,0 @@
module Urbit.Vere.Eyre.Site (app) where
import Urbit.Prelude hiding (Builder)
import Data.ByteString.Builder
import Data.Conduit (ConduitT, Flush(..), yield)
import Data.Text.Encoding (encodeUtf8Builder)
import Urbit.Noun.Tank (wash)
import qualified Network.HTTP.Types as H
import qualified Network.Wai as W
import qualified Network.Wai.Conduit as W
data SlogAction
= KeepAlive
| Slog (Atom, Tank)
-- veify that if you have multiple open uwu slogs, you multiplex
-- thread TVar func and this server through from pier (loopback only)
-- LATER check cookies & scry, support on all servers
conduit :: SlogAction -> ConduitT () (Flush Builder) IO ()
conduit a = do
case a of
KeepAlive -> pure ()
Slog (_, t) -> for_ (wash (WashCfg 0 80) (tankTree t)) $ \l -> do
yield $ Chunk "data:"
yield $ Chunk $ encodeUtf8Builder $ unTape l
yield $ Chunk "\n"
yield $ Chunk "\n"
yield $ Flush
app :: HasLogFunc e
=> TVar ((Atom, Tank) -> IO ())
-> RAcquire e W.Application
app func = do
slogQ :: TQueue (Atom, Tank) <- newTQueueIO
baton :: TMVar () <- newEmptyTMVarIO
atomically $ writeTVar func (\s -> atomically $ writeTQueue slogQ s)
acquireWorker "Runtime subsite keep-alive" $ forever $ do
atomically $ putTMVar baton ()
threadDelay 30_000_000
let action = (KeepAlive <$ takeTMVar baton) -- every 30s
<|> (Slog <$> readTQueue slogQ)
-- TODO write more compactly
let loop = forever (atomically action >>= conduit)
pure $ \req respond -> respond $ case W.pathInfo req of
("~_~":"slog":_) -> W.responseSource (H.mkStatus 200 "OK") [] loop
_ -> W.responseLBS (H.mkStatus 404 "Not Found") [] ""

View File

@ -33,6 +33,7 @@ import Data.Conduit (ConduitT, Flush(Chunk, Flush), yield)
import Network.Socket (SockAddr(..))
import System.Random (newStdGen, randoms)
import Urbit.Arvo (Address(..), Ipv4(..), Ipv6(..), Method)
import Urbit.Vere.Eyre.KingSubsite (KingSubsite, runKingSubsite)
import qualified Network.HTTP.Types as H
import qualified Network.Wai as W
@ -207,14 +208,14 @@ app
:: HasLogFunc e
=> e
-> Ship
-> W.Application
-> TVar LiveReqs
-> (Word64 -> ReqInfo -> STM ())
-> (Word64 -> STM ())
-> KingSubsite
-> W.Application
app env who kingSubsite liv inform cancel req respond =
app env who liv inform cancel sub req respond =
case W.pathInfo req of
("~_~":_) -> kingSubsite req respond
("~_~":_) -> runKingSubsite sub req respond
_ ->
runRIO env $ rwith (liveReq who liv) $ \(reqId, respApi) -> do
bod <- io (toStrict <$> W.strictRequestBody req)

View File

@ -35,7 +35,6 @@ import Urbit.TermSize (TermSize(..), termSize)
import Urbit.Vere.Serf (Serf)
import qualified Data.Text as T
import qualified Network.Wai as W
import qualified System.Entropy as Ent
import qualified Urbit.EventLog.LMDB as Log
import qualified Urbit.King.API as King
@ -44,7 +43,7 @@ import qualified Urbit.Vere.Ames as Ames
import qualified Urbit.Vere.Behn as Behn
import qualified Urbit.Vere.Clay as Clay
import qualified Urbit.Vere.Eyre as Eyre
import qualified Urbit.Vere.Eyre.Site as Site
import qualified Urbit.Vere.Eyre.KingSubsite as Site
import qualified Urbit.Vere.Http.Client as Iris
import qualified Urbit.Vere.Serf as Serf
import qualified Urbit.Vere.Term as Term
@ -282,7 +281,7 @@ pier (serf, log) vSlog startedSig = do
-- Set up the runtime subsite server and its capability to slog
siteSlog <- newTVarIO (const $ pure ())
runtimeSubsite <- Site.app siteSlog
runtimeSubsite <- Site.kingSubsite siteSlog
-- Slogs go to stderr, to the runtime subsite, and to the terminal.
env <- ask
@ -412,7 +411,7 @@ drivers
-> (TermSize, Term.Client)
-> (Text -> RIO e ())
-> IO ()
-> W.Application
-> Site.KingSubsite
-> RAcquire e ([Ev], RAcquire e Drivers)
drivers env who isFake plan termSys stderr serfSIGINT sub = do
(behnBorn, runBehn) <- rio Behn.behn'