king: Finish factoring out HTTP server lifecycle from Eyre.

This commit is contained in:
Benjamin Summers 2020-05-10 15:27:02 -07:00
parent 37855fd704
commit c1454b1366
2 changed files with 128 additions and 337 deletions

View File

@ -1,31 +1,9 @@
{-| {-|
Http Server Driver Eyre: Http Server Driver
TODO Make sure that HTTP sockets get closed on shutdown.
TODO What is this about?
// if we don't explicitly set this field, h2o will send with
// transfer-encoding: chunked
//
if ( 1 == has_len_i ) {
rec_u->res.content_length = ( 0 == gen_u->bod_u ) ?
0 : gen_u->bod_u->len_w;
}
TODO Does this matter, is is using WAI's default behavior ok?
rec_u->res.reason = (status < 200) ? "weird" :
(status < 300) ? "ok" :
(status < 400) ? "moved" :
(status < 500) ? "missing" :
"hosed";
-} -}
module Urbit.Vere.Eyre module Urbit.Vere.Eyre
( eyre ( eyre
, multiServ
, ShipAPI(..)
) )
where where
@ -34,20 +12,17 @@ import Urbit.Prelude hiding (Builder)
import Urbit.Arvo hiding (ServerId, reqUrl, secure) import Urbit.Arvo hiding (ServerId, reqUrl, secure)
import Urbit.King.Config import Urbit.King.Config
import Urbit.Vere.Eyre.Wai hiding (ReqId) import Urbit.Vere.Eyre.Wai hiding (ReqId)
import Urbit.Vere.Eyre.Serv
import Urbit.Vere.Pier.Types import Urbit.Vere.Pier.Types
import Data.PEM (pemParseBS, pemWriteBS) import Data.List.NonEmpty (NonEmpty((:|)))
import Network.Socket (SockAddr(..)) import Data.PEM (pemParseBS, pemWriteBS)
import RIO.Prelude (decodeUtf8Lenient) import RIO.Prelude (decodeUtf8Lenient)
import System.Directory (doesFileExist, removeFile) import System.Directory (doesFileExist, removeFile)
import System.Random (randomIO) import System.Random (randomIO)
import Urbit.Vere.Http (convertHeaders, unconvertHeaders) import Urbit.Vere.Http (convertHeaders, unconvertHeaders)
import qualified Network.HTTP.Types as H import qualified Network.HTTP.Types as H
import qualified Network.Socket as Net
import qualified Network.Wai as W
import qualified Network.Wai.Handler.Warp as W
import qualified Network.Wai.Handler.WarpTLS as W
-- Internal Types -------------------------------------------------------------- -- Internal Types --------------------------------------------------------------
@ -57,27 +32,24 @@ type HasShipEnv e = (HasLogFunc e, HasNetworkConfig e, HasPierConfig e)
type ReqId = UD type ReqId = UD
data Ports = Ports data Ports = Ports
{ pHttps :: Maybe Port { pHttps :: Maybe Port
, pHttp :: Port , pHttp :: Port
, pLoop :: Port , pLoop :: Port
} }
deriving (Eq, Ord, Show) deriving (Eq, Ord, Show)
newtype Drv = Drv { _unDrv :: MVar (Maybe Serv) } newtype Drv = Drv (MVar (Maybe Serv))
data Serv = Serv data Serv = Serv
{ sServId :: ServId { sServId :: ServId
, sConfig :: HttpServerConf , sConfig :: HttpServerConf
, sLoopTid :: Async () , sLop :: ServApi
, sHttpTid :: Async () , sIns :: ServApi
, sHttpsTid :: Maybe (Async ()) , sSec :: Maybe ServApi
, sLoopSock :: Net.Socket , sPorts :: Ports
, sHttpSock :: Net.Socket , sPortsFile :: FilePath
, sHttpsSock :: Net.Socket , sLiveReqs :: TVar LiveReqs
, sPorts :: Ports }
, sPortsFile :: FilePath
, sLiveReqs :: TVar LiveReqs
}
-- Generic Service Stop/Restart -- Using an MVar for Atomicity ----------------- -- Generic Service Stop/Restart -- Using an MVar for Atomicity -----------------
@ -182,16 +154,9 @@ reqEv sId reqId which addr req =
-- Top-Level Driver Interface -------------------------------------------------- -- Top-Level Driver Interface --------------------------------------------------
data CantOpenPort = CantOpenPort W.Port
deriving (Eq, Ord, Show, Exception)
data WhichPort
= WPSpecific W.Port
| WPChoices [W.Port]
data SockOpts = SockOpts data SockOpts = SockOpts
{ soLocalhost :: Bool { soLocalhost :: Bool
, soWhich :: WhichPort , soWhich :: ServPort
} }
data PortsToTry = PortsToTry data PortsToTry = PortsToTry
@ -200,73 +165,6 @@ data PortsToTry = PortsToTry
, pttLop :: SockOpts , pttLop :: SockOpts
} }
{-|
Opens a socket on some port, accepting connections from `127.0.0.1`
if fake and `0.0.0.0` if real.
It will attempt to open a socket on each of the supplied ports in
order. If they all fail, it will ask the operating system to give
us an open socket on *any* open port. If that fails, it will throw
an exception.
-}
openPort :: forall e . HasLogFunc e => SockOpts -> RIO e (W.Port, Net.Socket)
openPort SockOpts {..} = case soWhich of
WPSpecific x -> insist (fromIntegral x)
WPChoices xs -> loop (fromIntegral <$> xs)
where
loop :: [W.Port] -> RIO e (W.Port, Net.Socket)
loop = \case
[] -> do
logTrace "Fallback: asking the OS to give us some free port."
ps <- io W.openFreePort
logTrace (display ("Opened port " <> tshow (fst ps)))
pure ps
x : xs -> do
logTrace (display ("Trying to open port " <> tshow x))
io (tryOpen x) >>= \case
Left (err :: IOError) -> do
logWarn (display ("Failed to open port " <> tshow x))
logWarn (display (tshow err))
loop xs
Right ps -> do
logTrace (display ("Opened port " <> tshow (fst ps)))
pure ps
insist :: W.Port -> RIO e (W.Port, Net.Socket)
insist p = do
logTrace (display ("Opening configured port " <> tshow p))
io (tryOpen p) >>= \case
Left (err :: IOError) -> do
logWarn (display ("Failed to open port " <> tshow p))
logWarn (display (tshow err))
throwIO (CantOpenPort p)
Right ps -> do
logTrace (display ("Opened port " <> tshow (fst ps)))
pure ps
bindTo = if soLocalhost then "127.0.0.1" else "0.0.0.0"
getBindAddr :: W.Port -> IO SockAddr
getBindAddr por =
Net.getAddrInfo Nothing (Just bindTo) (Just (show por)) >>= \case
[] -> error "this should never happen."
x : _ -> pure (Net.addrAddress x)
bindListenPort :: W.Port -> Net.Socket -> IO Net.PortNumber
bindListenPort por sok = do
Net.bind sok =<< getBindAddr por
Net.listen sok 1
Net.socketPort sok
-- `inet_addr`, `bind`, and `listen` all throw `IOError` if they fail.
tryOpen :: W.Port -> IO (Either IOError (W.Port, Net.Socket))
tryOpen por = do
sok <- Net.socket Net.AF_INET Net.Stream Net.defaultProtocol
try (bindListenPort por sok) >>= \case
Left exn -> Net.close sok $> Left exn
Right por -> pure (Right (fromIntegral por, sok))
httpServerPorts :: HasShipEnv e => Bool -> RIO e PortsToTry httpServerPorts :: HasShipEnv e => Bool -> RIO e PortsToTry
httpServerPorts fak = do httpServerPorts fak = do
ins <- view (networkConfigL . ncHttpPort . to (fmap fromIntegral)) ins <- view (networkConfigL . ncHttpPort . to (fmap fromIntegral))
@ -277,49 +175,21 @@ httpServerPorts fak = do
let local = localMode || fak let local = localMode || fak
let pttSec = case (sec, fak) of let pttSec = case (sec, fak) of
(Just p , _ ) -> SockOpts local (WPSpecific p) (Just p , _ ) -> SockOpts local (SPChoices $ singleton p)
(Nothing, False) -> SockOpts local (WPChoices (443 : [8443 .. 8448])) (Nothing, False) -> SockOpts local (SPChoices (443 :| [8443 .. 8453]))
(Nothing, True ) -> SockOpts local (WPChoices ([8443 .. 8448])) (Nothing, True ) -> SockOpts local (SPChoices (8443 :| [8444 .. 8453]))
let pttIns = case (ins, fak) of let pttIns = case (ins, fak) of
(Just p , _ ) -> SockOpts local (WPSpecific p) (Just p , _ ) -> SockOpts local (SPChoices $ singleton p)
(Nothing, False) -> SockOpts local (WPChoices (80 : [8080 .. 8085])) (Nothing, False) -> SockOpts local (SPChoices (80 :| [8080 .. 8090]))
(Nothing, True ) -> SockOpts local (WPChoices [8080 .. 8085]) (Nothing, True ) -> SockOpts local (SPChoices (8080 :| [8081 .. 8090]))
let pttLop = case (lop, fak) of let pttLop = case (lop, fak) of
(Just p , _) -> SockOpts local (WPSpecific p) (Just p , _) -> SockOpts local (SPChoices $ singleton p)
(Nothing, _) -> SockOpts local (WPChoices [12321 .. 12326]) (Nothing, _) -> SockOpts local SPAnyPort
pure (PortsToTry { .. }) pure (PortsToTry { .. })
eyreApp
:: HasLogFunc e
=> e
-> ServId
-> TVar LiveReqs
-> (Ev -> STM ())
-> WhichServer
-> W.Application
eyreApp env sId vLive plan which =
app env vLive onReq onCancel
where
bodFile "" = Nothing
bodFile bs = Just $ File $ Octs bs
onReq :: Word64 -> ReqInfo -> STM ()
onReq reqId ReqInfo{..} = do
let evBod = bodFile riBod
evHdr = convertHeaders riHdr
evUrl = Cord (decodeUtf8Lenient riUrl)
evReq = HttpRequest riMet evUrl evHdr evBod
reqUd = fromIntegral reqId
event = reqEv sId reqUd which riAdr evReq
plan event
onCancel :: Word64 -> STM ()
onCancel reqId = plan (cancelEv sId (fromIntegral reqId))
parseCerts :: ByteString -> Maybe (ByteString, [ByteString]) parseCerts :: ByteString -> Maybe (ByteString, [ByteString])
parseCerts bs = do parseCerts bs = do
pems <- pemParseBS bs & either (const Nothing) Just pems <- pemParseBS bs & either (const Nothing) Just
@ -344,13 +214,12 @@ reorgHttpEvent = \case
hSta :: ResponseHeader -> H.Status hSta :: ResponseHeader -> H.Status
hSta = toEnum . fromIntegral . statusCode hSta = toEnum . fromIntegral . statusCode
respond :: HasLogFunc e respond :: HasLogFunc e
=> Drv -> Word64 -> HttpEvent -> RIO e () => Drv -> Word64 -> HttpEvent -> RIO e ()
respond (Drv v) reqId ev = do respond (Drv v) reqId ev = do
readMVar v >>= \case readMVar v >>= \case
Nothing -> logError "Got a response to a request that does not exist." Nothing -> logError "Got a response to a request that does not exist."
Just sv -> do logDebug $ displayShow ev Just sv -> do logTrace $ displayShow ev
for_ (reorgHttpEvent ev) $ for_ (reorgHttpEvent ev) $
atomically . routeRespAct (sLiveReqs sv) reqId atomically . routeRespAct (sLiveReqs sv) reqId
@ -361,76 +230,92 @@ startServ :: (HasPierConfig e, HasLogFunc e, HasNetworkConfig e)
=> Bool -> HttpServerConf -> (Ev -> STM ()) => Bool -> HttpServerConf -> (Ev -> STM ())
-> RIO e Serv -> RIO e Serv
startServ isFake conf plan = do startServ isFake conf plan = do
logDebug "startServ" logTrace "startServ"
let tls = do (PEM key, PEM certs) <- hscSecure conf srvId <- io $ ServId . UV . fromIntegral <$> (randomIO :: IO Word32)
(cert, chain) <- parseCerts (wainBytes certs)
pure $ W.tlsSettingsChainMemory cert chain $ wainBytes key
sId <- io $ ServId . UV . fromIntegral <$> (randomIO :: IO Word32) let mTls = do
liv <- newTVarIO emptyLiveReqs (PEM key, PEM certs) <- hscSecure conf
(cert, chain) <- parseCerts (wainBytes certs)
pure $ TlsConfig (wainBytes key) cert chain
ptt <- httpServerPorts isFake ptt <- httpServerPorts isFake
(httpPortInt, httpSock) <- openPort (pttIns ptt) let secRedi = Nothing -- TODO
(httpsPortInt, httpsSock) <- openPort (pttSec ptt)
(loopPortInt, loopSock) <- openPort (pttLop ptt)
let httpPort = Port (fromIntegral httpPortInt) let soHost :: SockOpts -> ServHost
httpsPort = Port (fromIntegral httpsPortInt) soHost so = if soLocalhost so then SHLocalhost else SHAnyHostOk
loopPort = Port (fromIntegral loopPortInt)
let loopOpts = W.defaultSettings & W.setPort (fromIntegral loopPort) vLive <- newTVarIO emptyLiveReqs
& W.setHost "127.0.0.1"
& W.setTimeout (5 * 60)
httpOpts = W.defaultSettings & W.setHost "*"
& W.setPort (fromIntegral httpPort)
httpsOpts = W.defaultSettings & W.setHost "*"
& W.setPort (fromIntegral httpsPort)
env <- ask let bodFile "" = Nothing
bodFile bs = Just $ File $ Octs bs
logDebug "Starting loopback server" let onReq :: WhichServer -> Word64 -> ReqInfo -> STM ()
loopTid <- async $ io onReq which reqId ReqInfo{..} = do
$ W.runSettingsSocket loopOpts loopSock let evBod = bodFile riBod
$ eyreApp env sId liv plan Loopback let evHdr = convertHeaders riHdr
let evUrl = Cord (decodeUtf8Lenient riUrl)
let evReq = HttpRequest riMet evUrl evHdr evBod
let reqUd = fromIntegral reqId
let event = reqEv srvId reqUd which riAdr evReq
plan event
logDebug "Starting HTTP server" let onKilReq = plan . cancelEv srvId . fromIntegral
httpTid <- async $ io
$ W.runSettingsSocket httpOpts httpSock
$ eyreApp env sId liv plan Insecure
logDebug "Starting HTTPS server" logTrace "Starting loopback server"
httpsTid <- for tls $ \tlsOpts -> lop <- serv vLive $ ServConf
async $ io { scHost = soHost (pttLop ptt)
$ W.runTLSSocket tlsOpts httpsOpts httpsSock , scPort = soWhich (pttLop ptt)
$ eyreApp env sId liv plan Secure , scRedi = Nothing
, scType = STHttp $ ReqApi
{ rcReq = \() -> onReq Loopback
, rcKil = onKilReq
}
}
logTrace "Starting insecure server"
ins <- serv vLive $ ServConf
{ scHost = soHost (pttIns ptt)
, scPort = soWhich (pttIns ptt)
, scRedi = secRedi
, scType = STHttp $ ReqApi
{ rcReq = \() -> onReq Insecure
, rcKil = onKilReq
}
}
mSec <- for mTls $ \tls -> do
logTrace "Starting secure server"
serv vLive $ ServConf
{ scHost = soHost (pttSec ptt)
, scPort = soWhich (pttSec ptt)
, scRedi = Nothing
, scType = STHttps tls $ ReqApi
{ rcReq = \() -> onReq Secure
, rcKil = onKilReq
}
}
pierPath <- view pierPathL pierPath <- view pierPathL
let por = Ports (tls <&> const httpsPort) httpPort loopPort
lopPor <- atomically (fmap fromIntegral $ saPor lop)
insPor <- atomically (fmap fromIntegral $ saPor ins)
secPor <- for mSec (fmap fromIntegral . atomically . saPor)
let por = Ports secPor insPor lopPor
fil = pierPath <> "/.http.ports" fil = pierPath <> "/.http.ports"
logDebug $ displayShow (sId, por, fil) logTrace $ displayShow ("EYRE", "All Servers Started.", srvId, por, fil)
logDebug "Finished started HTTP Servers" pure $ Serv srvId conf lop ins mSec por fil vLive
pure $ Serv sId conf
loopTid httpTid httpsTid
httpSock httpsSock loopSock
por fil liv
killServ :: HasLogFunc e => Serv -> RIO e () killServ :: HasLogFunc e => Serv -> RIO e ()
killServ Serv{..} = do killServ Serv{..} = do
cancel sLoopTid atomically (saKil sLop)
cancel sHttpTid atomically (saKil sIns)
traverse_ cancel sHttpsTid for_ sSec (\sec -> atomically (saKil sec))
io $ Net.close sHttpSock removePortsFile sPortsFile
io $ Net.close sHttpsSock
io $ Net.close sLoopSock
removePortsFile sPortsFile
(void . waitCatch) sLoopTid
(void . waitCatch) sHttpTid
traverse_ (void . waitCatch) sHttpsTid
kill :: HasLogFunc e => Drv -> RIO e () kill :: HasLogFunc e => Drv -> RIO e ()
kill (Drv v) = stopService v killServ >>= fromEither kill (Drv v) = stopService v killServ >>= fromEither
@ -471,111 +356,3 @@ eyre king plan isFake =
-- when (i == fromIntegral king) $ do -- when (i == fromIntegral king) $ do
logDebug "respond" logDebug "respond"
respond drv (fromIntegral req) ev respond drv (fromIntegral req) ev
-- Multi-Tenet HTTP ------------------------------------------------------------
{-
# Very First Phase: Shared HTTP, no SSL.
- Global configuration flag for shared HTTP port.
- Shared server starts before ships.
- Shared server is informed when ships go up and come down.
- Shared server delivers requests to existing HTTP driver.
- Existing HTTP driver can send responses to shared HTTP server.
-}
type ShareRequ = (ServId, ReqId, WhichServer, Address, HttpRequest)
type ShareResp = (ServId, UD, UD, HttpEvent)
data ShipAPI = ShipAPI
{ sapiReq :: ShareRequ -> STM ()
, sapiRes :: STM ShareResp
}
data MultiServ = MultiServ
{ msPort :: Maybe Word16
, msShip :: TVar (Map Ship ShipAPI)
, msBoot :: TMVar (Ship, ShipAPI)
, msDead :: TMVar Ship
, msKill :: STM ()
}
data Hap = Deþ Ship
| Lif (Ship, ShipAPI)
| Res ShareResp
| Kil ()
multiServ :: HasLogFunc e => MultiServ -> RIO e ()
multiServ ms = do
case msPort ms of
Nothing -> doNothing ms
Just po -> doSomething ms po
{-
If the port is set, we do things for real. We run an HTTP server,
sends requests to the appropriate ship, respond to requests when
responses are given, and shuts down when the king shuts down.
-}
doSomething :: HasLogFunc e => MultiServ -> Word16 -> RIO e ()
doSomething MultiServ{..} httpPort = do
logDebug "Starting HTTP server"
let httpOpts = W.defaultSettings & W.setHost "*"
& W.setPort (fromIntegral httpPort)
sId <- io $ ServId . UV . fromIntegral <$> (randomIO :: IO Word32)
vShips :: TVar (Map Ship ShipAPI) <- newTVarIO mempty
liv <- newTVarIO emptyLiveReqs
env <- ask
plan <- error "TODO"
httpTid <- async $ io
$ W.runSettings httpOpts
$ eyreApp env sId liv plan Insecure
let onHapn :: STM Hap
onHapn = asum [ Lif <$> takeTMVar msBoot
, Deþ <$> takeTMVar msDead
, Res <$> (readTVar vShips >>= asum . fmap sapiRes . toList)
, Kil <$> msKill
]
let loop = join $ atomically $ onHapn >>= \case
Deþ s -> modifyTVar' vShips (deleteMap s) >> pure loop
Lif (s,api) -> modifyTVar' vShips (insertMap s api) >> pure loop
Res _ -> error "TODO"
Kil _ -> pure (cancel httpTid)
loop
{-
If the port is not set, we still run a thread for the shared server. It
doesn't run an HTTP server, it ignores all responses, and it shuts
down when the king shuts down.
-}
doNothing :: MultiServ -> RIO e ()
doNothing MultiServ{..} = do
vShips :: TVar (Map Ship ShipAPI) <- newTVarIO mempty
let onHapn :: STM Hap
onHapn = asum [ Lif <$> takeTMVar msBoot
, Deþ <$> takeTMVar msDead
, Res <$> (readTVar vShips >>= asum . fmap sapiRes . toList)
, Kil <$> msKill
]
let loop = join $ atomically $ onHapn >>= \case
Deþ s -> modifyTVar' vShips (deleteMap s) >> pure loop
Lif (s,api) -> modifyTVar' vShips (insertMap s api) >> pure loop
Res _ -> pure loop
Kil _ -> pure (pure ())
loop

View File

@ -19,7 +19,8 @@
{-# OPTIONS_GHC -Wno-deprecations #-} {-# OPTIONS_GHC -Wno-deprecations #-}
module Urbit.Vere.Eyre.Serv module Urbit.Vere.Eyre.Serv
( TlsConfig(..) ( ServApi(..)
, TlsConfig(..)
, MultiTlsConfig , MultiTlsConfig
, ReqApi(..) , ReqApi(..)
, ServType(..) , ServType(..)
@ -50,6 +51,11 @@ import qualified Urbit.Vere.Eyre.Wai as E
-- Internal Types -------------------------------------------------------------- -- Internal Types --------------------------------------------------------------
data ServApi = ServApi
{ saKil :: STM ()
, saPor :: STM W.Port
}
data TlsConfig = TlsConfig data TlsConfig = TlsConfig
{ tcPrKey :: ByteString { tcPrKey :: ByteString
, tcCerti :: ByteString , tcCerti :: ByteString
@ -82,8 +88,6 @@ data ServConf = ServConf
, scHost :: ServHost , scHost :: ServHost
, scPort :: ServPort , scPort :: ServPort
, scRedi :: Maybe W.Port , scRedi :: Maybe W.Port
, scOpnd :: W.Port -> STM ()
, scDeth :: STM ()
} }
@ -214,10 +218,10 @@ startServer
-> W.Port -> W.Port
-> Net.Socket -> Net.Socket
-> Maybe W.Port -> Maybe W.Port
-> TVar E.LiveReqs
-> RIO e () -> RIO e ()
startServer typ hos por sok red = do startServer typ hos por sok red vLive = do
envir <- ask envir <- ask
vLive <- newTVarIO E.emptyLiveReqs
let host = case hos of let host = case hos of
SHLocalhost -> "127.0.0.1" SHLocalhost -> "127.0.0.1"
@ -266,12 +270,22 @@ configCreds TlsConfig {..} =
Left str -> Left (pack str) Left str -> Left (pack str)
Right rs -> Right rs Right rs -> Right rs
serv :: HasLogFunc e => ServConf -> RIO e () serv :: HasLogFunc e => TVar E.LiveReqs -> ServConf -> RIO e ServApi
serv ServConf {..} = do serv vLive ServConf {..} = do
tid <- async runServ kil <- newEmptyTMVarIO
atomically scDeth por <- newEmptyTMVarIO
cancel tid
void $ async $ do
tid <- async (runServ por)
atomically (takeTMVar kil)
cancel tid
pure $ ServApi
{ saKil = void (tryPutTMVar kil ())
, saPor = readTMVar por
}
where where
runServ = rwith (forceOpenSocket scHost scPort) $ \(por, sok) -> do runServ vPort = do
atomically (scOpnd por) rwith (forceOpenSocket scHost scPort) $ \(por, sok) -> do
startServer scType scHost por sok scRedi atomically (putTMVar vPort por)
startServer scType scHost por sok scRedi vLive