mirror of
https://github.com/urbit/shrub.git
synced 2025-01-03 10:02:32 +03:00
*Much* cleaner http-server code.
This commit is contained in:
parent
6537950603
commit
5a61870851
@ -1,72 +1,55 @@
|
||||
{-# OPTIONS_GHC -Wwarn #-}
|
||||
|
||||
module Vere.Http.Server where
|
||||
|
||||
import Arvo hiding (ServerId, reqBody, reqUrl, secure)
|
||||
import Arvo hiding (ServerId, reqBody, reqUrl, secure)
|
||||
import Data.Conduit
|
||||
import Noun
|
||||
import UrbitPrelude hiding (Builder)
|
||||
|
||||
import Vere.Http hiding (Cancel, Continue, Method, ResponseHeader(..), Start)
|
||||
|
||||
import UrbitPrelude hiding (Builder)
|
||||
import Vere.Pier.Types
|
||||
|
||||
import Data.Binary.Builder (Builder, fromByteString)
|
||||
import Data.Bits (shiftL, (.|.))
|
||||
import Network.Socket (SockAddr(..))
|
||||
import System.Random (randomIO)
|
||||
import Vere.Http (convertHeaders, unconvertHeaders)
|
||||
|
||||
import qualified Network.HTTP.Types as H
|
||||
import qualified Network.Wai as W
|
||||
import qualified Network.Wai.Conduit as W
|
||||
import qualified Network.Wai.Handler.Warp as W
|
||||
import qualified Network.Wai.Handler.WarpTLS as W
|
||||
|
||||
|
||||
-- Live Requests ---------------------------------------------------------------
|
||||
-- RespAction -- Reorganized HttpEvent for Cleaner Processing ------------------
|
||||
|
||||
type ReqId = Word
|
||||
type SeqId = Word -- TODO Unused. Why is this a thing?
|
||||
{-
|
||||
The sequence of actions on a given request *should* be:
|
||||
|
||||
data LiveReqs = LiveReqs
|
||||
{ nextReqId :: ReqId
|
||||
, activeReqs :: Map ReqId (TMVar HttpEvent)
|
||||
}
|
||||
[%head .] [%bloc .]* %done
|
||||
|
||||
emptyLiveReqs :: LiveReqs
|
||||
emptyLiveReqs = LiveReqs 1 mempty
|
||||
But we will actually accept anything, and mostly do the right
|
||||
thing. There are two situations where we ignore ignore the data from
|
||||
some actions.
|
||||
|
||||
respondToLiveReq :: TVar LiveReqs -> ReqId -> HttpEvent -> STM ()
|
||||
respondToLiveReq var req ev = do
|
||||
mVar <- lookup req . activeReqs <$> readTVar var
|
||||
case mVar of
|
||||
Nothing -> pure ()
|
||||
Just tv -> putTMVar tv ev
|
||||
- If you send something *after* a %done action, it will be ignored.
|
||||
- If you send a %done before a %head, we will produce "444 No
|
||||
Response" with an empty response body.
|
||||
-}
|
||||
data RespAction
|
||||
= RAHead ResponseHeader
|
||||
| RABloc File
|
||||
| RADone
|
||||
|
||||
newLiveReq :: TVar LiveReqs -> STM (ReqId, TMVar HttpEvent)
|
||||
newLiveReq var = do
|
||||
liv <- readTVar var
|
||||
tmv <- newEmptyTMVar
|
||||
|
||||
let (nex, act) = (nextReqId liv, activeReqs liv)
|
||||
|
||||
writeTVar var (LiveReqs (nex+1) (insertMap nex tmv act))
|
||||
|
||||
pure (nex, tmv)
|
||||
reorgHttpEvent :: HttpEvent -> [RespAction]
|
||||
reorgHttpEvent = \case
|
||||
Start head mBlk isDone -> [RAHead head]
|
||||
<> toList (RABloc <$> mBlk)
|
||||
<> if isDone then [RADone] else []
|
||||
Cancel () -> [RADone]
|
||||
Continue mBlk isDone -> toList (RABloc <$> mBlk)
|
||||
<> if isDone then [RADone] else []
|
||||
|
||||
|
||||
-- Servers ---------------------------------------------------------------------
|
||||
|
||||
newtype Drv = Drv { unDrv :: MVar (Maybe Serv) }
|
||||
|
||||
data Serv = Serv
|
||||
{ sServId :: ServId
|
||||
, sConfig :: HttpServerConf
|
||||
, sHttpTid :: Async ()
|
||||
, sHttpsTid :: Async ()
|
||||
, sLiveReqs :: TVar LiveReqs
|
||||
}
|
||||
|
||||
|
||||
-- Generic Service Restart and Stop Logic --------------------------------------
|
||||
-- Generic Service Stop/Restart -- Using an MVar for Atomicity -----------------
|
||||
|
||||
{-
|
||||
Restart a running service.
|
||||
@ -112,201 +95,56 @@ stopService vServ kkill = do
|
||||
pure (Nothing, res)
|
||||
|
||||
|
||||
-- Live Requests Table -- All Requests Still Waiting for Responses -------------
|
||||
|
||||
type ReqId = Word
|
||||
type SeqId = Word -- TODO Unused. Why is this a thing?
|
||||
|
||||
data LiveReqs = LiveReqs
|
||||
{ nextReqId :: ReqId
|
||||
, activeReqs :: Map ReqId (TMVar RespAction)
|
||||
}
|
||||
|
||||
emptyLiveReqs :: LiveReqs
|
||||
emptyLiveReqs = LiveReqs 1 mempty
|
||||
|
||||
respondToLiveReq :: TVar LiveReqs -> ReqId -> RespAction -> STM ()
|
||||
respondToLiveReq var req ev = do
|
||||
mVar <- lookup req . activeReqs <$> readTVar var
|
||||
case mVar of
|
||||
Nothing -> pure ()
|
||||
Just tv -> putTMVar tv ev
|
||||
|
||||
rmLiveReq :: TVar LiveReqs -> ReqId -> STM ()
|
||||
rmLiveReq var reqId = do
|
||||
liv <- readTVar var
|
||||
writeTVar var (liv { activeReqs = deleteMap reqId (activeReqs liv) })
|
||||
|
||||
newLiveReq :: TVar LiveReqs -> STM (ReqId, TMVar RespAction)
|
||||
newLiveReq var = do
|
||||
liv <- readTVar var
|
||||
tmv <- newEmptyTMVar
|
||||
|
||||
let (nex, act) = (nextReqId liv, activeReqs liv)
|
||||
|
||||
writeTVar var (LiveReqs (nex+1) (insertMap nex tmv act))
|
||||
|
||||
pure (nex, tmv)
|
||||
|
||||
|
||||
-- Random Helpers --------------------------------------------------------------
|
||||
|
||||
cordBytes :: Cord -> ByteString
|
||||
cordBytes = encodeUtf8 . unCord
|
||||
|
||||
|
||||
-- Utilities for Constructing Events -------------------------------------------
|
||||
|
||||
servEv :: HttpServerEv -> Ev
|
||||
servEv = EvBlip . BlipEvHttpServer
|
||||
|
||||
bornEv :: KingId -> Ev
|
||||
bornEv king =
|
||||
servEv $ HttpServerEvBorn (fromIntegral king, ()) ()
|
||||
|
||||
liveEv :: ServId -> Port -> Maybe Port -> Ev
|
||||
liveEv sId non sec =
|
||||
servEv $ HttpServerEvLive (sId, ()) non sec
|
||||
|
||||
reqEv :: ServId -> ReqId -> Bool -> Address -> HttpRequest -> Ev
|
||||
reqEv sId reqId secure addr req =
|
||||
servEv $ HttpServerEvRequest (sId, reqId, 1, ())
|
||||
$ HttpServerReq secure addr req
|
||||
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
killServ :: Serv -> IO ()
|
||||
killServ Serv{sHttpsTid, sHttpTid} = do
|
||||
cancel sHttpTid
|
||||
cancel sHttpsTid
|
||||
wait sHttpTid
|
||||
wait sHttpsTid
|
||||
|
||||
kill :: Drv -> IO ()
|
||||
kill (Drv v) = stopService v killServ >>= fromEither
|
||||
|
||||
respond :: Drv -> ReqId -> HttpEvent -> IO ()
|
||||
respond (Drv v) req ev = do
|
||||
readMVar v >>= \case
|
||||
Nothing -> pure ()
|
||||
Just sv -> atomically (respondToLiveReq (sLiveReqs sv) req ev)
|
||||
|
||||
|
||||
-- Top-Level Driver Interface --------------------------------------------------
|
||||
|
||||
serv :: KingId
|
||||
-> QueueEv
|
||||
-> ([Ev], Acquire (EffCb HttpServerEf))
|
||||
serv king plan =
|
||||
(initialEvents, runHttpServer)
|
||||
where
|
||||
initialEvents :: [Ev]
|
||||
initialEvents = [bornEv king]
|
||||
|
||||
runHttpServer :: Acquire (EffCb HttpServerEf)
|
||||
runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill
|
||||
|
||||
restart :: Drv -> HttpServerConf -> IO (ServId, Port, Maybe Port)
|
||||
restart (Drv var) conf = do
|
||||
fromEither =<< restartService var (startServ conf plan) killServ
|
||||
|
||||
handleEf :: Drv -> HttpServerEf -> IO ()
|
||||
handleEf drv = \case
|
||||
HSESetConfig (i, ()) conf ->
|
||||
when (i == fromIntegral king) $ do
|
||||
(sId, insecurePort, securePort) <- restart drv conf
|
||||
atomically (plan (liveEv sId insecurePort securePort))
|
||||
HSEResponse (i, req, _seq, ()) ev ->
|
||||
when (i == fromIntegral king) $
|
||||
respond drv (fromIntegral req) ev
|
||||
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
{-
|
||||
TODO Need to find an open port.
|
||||
-}
|
||||
startServ :: HttpServerConf -> (Ev -> STM ())
|
||||
-> IO (Serv, (ServId, Port, Maybe Port))
|
||||
startServ conf plan = do
|
||||
tls <- case (hscSecure conf) of
|
||||
Nothing -> error "HACK: Implement support for missing PEMs"
|
||||
Just (PEM key, PEM cert) ->
|
||||
pure (W.tlsSettingsMemory (cordBytes cert) (cordBytes key))
|
||||
|
||||
sId <- ServId <$> randomIO
|
||||
liv <- newTVarIO emptyLiveReqs
|
||||
|
||||
httpsTid <- async $ W.runTLS tls W.defaultSettings (app sId liv plan True)
|
||||
|
||||
httpTid <- async $ W.run 80 (app sId liv plan False)
|
||||
|
||||
let res = (sId, Port 80, Just $ Port 443)
|
||||
|
||||
pure (Serv sId conf httpTid httpsTid liv, res)
|
||||
|
||||
respondLoop :: (W.Response -> IO W.ResponseReceived)
|
||||
-> TMVar HttpEvent
|
||||
-> IO W.ResponseReceived
|
||||
respondLoop respond tmv = start
|
||||
where
|
||||
start :: IO W.ResponseReceived
|
||||
start = do
|
||||
atomically (readTMVar tmv) >>= \case
|
||||
Cancel () ->
|
||||
fullCancel
|
||||
Continue _ _ -> do
|
||||
putStrLn "%continue before %start"
|
||||
start
|
||||
Start hdr init isDone -> do
|
||||
startStreaming hdr $ \s d -> do
|
||||
whenJust init (sendBlock s)
|
||||
stream isDone s d
|
||||
|
||||
stream :: Bool -> (Builder -> IO ()) -> IO ()
|
||||
-> IO ()
|
||||
stream isDone send done =
|
||||
case isDone of
|
||||
True -> closeStream done
|
||||
False -> do
|
||||
atomically (readTMVar tmv) >>= \case
|
||||
Start _ _ _ -> do
|
||||
putStrLn "%start after %continue"
|
||||
stream isDone send done
|
||||
Cancel () -> do
|
||||
streamingCancel done
|
||||
Continue blk doneNow -> do
|
||||
whenJust blk (sendBlock send)
|
||||
stream doneNow send done
|
||||
|
||||
startStreaming :: ResponseHeader
|
||||
-> ((Builder -> IO ()) -> IO () -> IO ())
|
||||
-> IO W.ResponseReceived
|
||||
startStreaming hdr cb = do
|
||||
let status = hdrStatus hdr
|
||||
headers = hdrHeaders hdr
|
||||
respond $ W.responseStream status headers cb
|
||||
|
||||
closeStream :: IO () -> IO ()
|
||||
closeStream killReq = killReq
|
||||
|
||||
streamingCancel :: IO () -> IO ()
|
||||
streamingCancel killReq = killReq
|
||||
|
||||
sendBlock :: (Builder -> IO ()) -> File -> IO ()
|
||||
sendBlock sendBlk = sendBlk . fromByteString . unOcts . unFile
|
||||
|
||||
fullCancel :: IO W.ResponseReceived
|
||||
fullCancel = respond $ W.responseLBS H.status500 [] "request canceled"
|
||||
|
||||
hdrHeaders :: ResponseHeader -> [H.Header]
|
||||
hdrHeaders = unconvertHeaders . headers
|
||||
|
||||
hdrStatus :: ResponseHeader -> H.Status
|
||||
hdrStatus = toEnum . fromIntegral . statusCode
|
||||
|
||||
whenJust :: Monad m => Maybe a -> (a -> m ()) -> m ()
|
||||
whenJust Nothing act = pure ()
|
||||
whenJust (Just a) act = act a
|
||||
|
||||
{-
|
||||
data HttpEvent
|
||||
= Start ResponseHeader (Maybe File) Bool
|
||||
| Continue (Maybe File) Bool
|
||||
| Cancel ()
|
||||
deriving (Eq, Ord, Show)
|
||||
-}
|
||||
|
||||
app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> Bool -> W.Application
|
||||
app sId liv plan secure req respond = do
|
||||
body <- reqBody req
|
||||
meth <- maybe (error "bad method") pure (cookMeth req)
|
||||
|
||||
let addr = reqAddr req
|
||||
hdrs = convertHeaders $ W.requestHeaders req
|
||||
evReq = HttpRequest meth (reqUrl req) hdrs body
|
||||
|
||||
respVar <- atomically $ do (reqId, var) <- newLiveReq liv
|
||||
sendReqEvent reqId addr evReq
|
||||
pure var
|
||||
|
||||
respondLoop respond respVar
|
||||
|
||||
where
|
||||
|
||||
sendReqEvent :: ReqId -> Address -> HttpRequest -> STM ()
|
||||
sendReqEvent reqId x y =
|
||||
plan (reqEv sId reqId secure x y)
|
||||
|
||||
cookMeth :: W.Request -> Maybe Method
|
||||
cookMeth re =
|
||||
case H.parseMethod (W.requestMethod re) of
|
||||
Left _ -> Nothing
|
||||
Right m -> Just m
|
||||
cookMeth = H.parseMethod . W.requestMethod >>> \case
|
||||
Left _ -> Nothing
|
||||
Right m -> Just m
|
||||
|
||||
reqIdCord :: ReqId -> Cord
|
||||
reqIdCord = Cord . tshow
|
||||
@ -335,22 +173,169 @@ mkIpv6 (p, q, r, s) = Ipv6 (pBits .|. qBits .|. rBits .|. sBits)
|
||||
reqUrl :: W.Request -> Cord
|
||||
reqUrl = Cord . decodeUtf8 . W.rawPathInfo
|
||||
|
||||
|
||||
-- Utilities for Constructing Events -------------------------------------------
|
||||
|
||||
servEv :: HttpServerEv -> Ev
|
||||
servEv = EvBlip . BlipEvHttpServer
|
||||
|
||||
bornEv :: KingId -> Ev
|
||||
bornEv king =
|
||||
servEv $ HttpServerEvBorn (king, ()) ()
|
||||
|
||||
liveEv :: ServId -> Port -> Maybe Port -> Ev
|
||||
liveEv sId non sec =
|
||||
servEv $ HttpServerEvLive (sId, ()) non sec
|
||||
|
||||
reqEv :: ServId -> ReqId -> Bool -> Address -> HttpRequest -> Ev
|
||||
reqEv sId reqId secure addr req =
|
||||
servEv $ HttpServerEvRequest (sId, reqId, 1, ())
|
||||
$ HttpServerReq secure addr req
|
||||
|
||||
|
||||
|
||||
-- Http Server Flows -----------------------------------------------------------
|
||||
|
||||
{-
|
||||
data ClientResponse
|
||||
= Progress ResponseHeader Int (Maybe Int) (Maybe ByteString)
|
||||
| Finished ResponseHeader (Maybe MimeData)
|
||||
| Cancel ()
|
||||
This accepts all action orderings so that there are no edge-cases
|
||||
to be handled:
|
||||
|
||||
data MimeData = MimeData Text ByteString
|
||||
|
||||
readEvents :: W.Request -> IO Request
|
||||
readEvents req = do
|
||||
let Just meth = cookMeth req
|
||||
url = Cord $ decodeUtf8 $ W.rawPathInfo req
|
||||
headers = convertHeaders (W.requestHeaders req)
|
||||
bodyLbs <- W.strictRequestBody req
|
||||
let body = if length bodyLbs == 0 then Nothing
|
||||
else Just $ Octs (toStrict bodyLbs)
|
||||
|
||||
pure (Request meth url headers body)
|
||||
- If %bloc before %head, collect it and wait for %head.
|
||||
- If %done before %head, ignore all chunks and produce Nothing.
|
||||
-}
|
||||
getHead :: TMVar RespAction -> IO (Maybe (ResponseHeader, [File]))
|
||||
getHead tmv = go []
|
||||
where
|
||||
go çunks = atomically (readTMVar tmv) >>= \case
|
||||
RAHead head -> pure $ Just (head, reverse çunks)
|
||||
RABloc çunk -> go (çunk : çunks)
|
||||
RADone -> pure Nothing
|
||||
|
||||
{-
|
||||
- Immediatly yield all of the initial chunks
|
||||
- Yield the data from %bloc action.
|
||||
- Close the stream when we hit a %done action.
|
||||
-}
|
||||
streamBlocks :: [File] -> TMVar RespAction -> ConduitT () (Flush Builder) IO ()
|
||||
streamBlocks init tmv =
|
||||
for_ init yieldÇunk >> go
|
||||
where
|
||||
yieldFlush = \x -> yield (Chunk x) >> yield Flush
|
||||
yieldÇunk = yieldFlush . fromByteString . unOcts . unFile
|
||||
logDupHead = putStrLn "Multiple %head actions on one request"
|
||||
|
||||
go = atomically (readTMVar tmv) >>= \case
|
||||
RAHead head -> logDupHead >> go
|
||||
RABloc çunk -> yieldÇunk çunk
|
||||
RADone -> pure ()
|
||||
|
||||
sendResponse :: (W.Response -> IO W.ResponseReceived)
|
||||
-> TMVar RespAction
|
||||
-> IO W.ResponseReceived
|
||||
sendResponse cb tmv = do
|
||||
getHead tmv >>= \case
|
||||
Nothing -> do cb $ W.responseLBS (H.mkStatus 444 "No Response") [] ""
|
||||
Just (h,i) -> do let çunks = streamBlocks i tmv
|
||||
cb $ W.responseSource (hdrStatus h) (hdrHeaders h) çunks
|
||||
where
|
||||
hdrHeaders :: ResponseHeader -> [H.Header]
|
||||
hdrHeaders = unconvertHeaders . headers
|
||||
|
||||
hdrStatus :: ResponseHeader -> H.Status
|
||||
hdrStatus = toEnum . fromIntegral . statusCode
|
||||
|
||||
app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> Bool -> W.Application
|
||||
app sId liv plan secure req respond = do
|
||||
body <- reqBody req
|
||||
meth <- maybe (error "bad method") pure (cookMeth req)
|
||||
|
||||
let addr = reqAddr req
|
||||
hdrs = convertHeaders $ W.requestHeaders req
|
||||
evReq = HttpRequest meth (reqUrl req) hdrs body
|
||||
|
||||
(reqId, respVar) <- atomically (newLiveReq liv)
|
||||
|
||||
atomically $ plan (reqEv sId reqId secure addr evReq)
|
||||
|
||||
done <- sendResponse respond respVar
|
||||
|
||||
atomically (rmLiveReq liv reqId)
|
||||
|
||||
pure done
|
||||
|
||||
|
||||
-- Top-Level Driver Interface --------------------------------------------------
|
||||
|
||||
newtype Drv = Drv { unDrv :: MVar (Maybe Serv) }
|
||||
|
||||
data Serv = Serv
|
||||
{ sServId :: ServId
|
||||
, sConfig :: HttpServerConf
|
||||
, sHttpTid :: Async ()
|
||||
, sHttpsTid :: Async ()
|
||||
, sLiveReqs :: TVar LiveReqs
|
||||
}
|
||||
|
||||
{-
|
||||
TODO Need to find an open port.
|
||||
-}
|
||||
startServ :: HttpServerConf -> (Ev -> STM ())
|
||||
-> IO (Serv, (ServId, Port, Maybe Port))
|
||||
startServ conf plan = do
|
||||
tls <- case (hscSecure conf) of
|
||||
Nothing -> error "HACK: Implement support for missing PEMs"
|
||||
Just (PEM key, PEM cert) ->
|
||||
pure (W.tlsSettingsMemory (cordBytes cert) (cordBytes key))
|
||||
|
||||
sId <- ServId <$> randomIO
|
||||
liv <- newTVarIO emptyLiveReqs
|
||||
|
||||
httpsTid <- async $ W.runTLS tls W.defaultSettings (app sId liv plan True)
|
||||
|
||||
httpTid <- async $ W.run 80 (app sId liv plan False)
|
||||
|
||||
let res = (sId, Port 80, Just $ Port 443)
|
||||
|
||||
pure (Serv sId conf httpTid httpsTid liv, res)
|
||||
|
||||
|
||||
killServ :: Serv -> IO ()
|
||||
killServ Serv{sHttpsTid, sHttpTid} = do
|
||||
cancel sHttpTid
|
||||
cancel sHttpsTid
|
||||
wait sHttpTid
|
||||
wait sHttpsTid
|
||||
|
||||
kill :: Drv -> IO ()
|
||||
kill (Drv v) = stopService v killServ >>= fromEither
|
||||
|
||||
respond :: Drv -> ReqId -> HttpEvent -> IO ()
|
||||
respond (Drv v) reqId ev = do
|
||||
readMVar v >>= \case
|
||||
Nothing -> pure ()
|
||||
Just sv -> atomically $ for_ (reorgHttpEvent ev) $
|
||||
respondToLiveReq (sLiveReqs sv) reqId
|
||||
|
||||
serv :: KingId -> QueueEv -> ([Ev], Acquire (EffCb HttpServerEf))
|
||||
serv king plan =
|
||||
(initialEvents, runHttpServer)
|
||||
where
|
||||
initialEvents :: [Ev]
|
||||
initialEvents = [bornEv king]
|
||||
|
||||
runHttpServer :: Acquire (EffCb HttpServerEf)
|
||||
runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill
|
||||
|
||||
restart :: Drv -> HttpServerConf -> IO (ServId, Port, Maybe Port)
|
||||
restart (Drv var) conf = do
|
||||
fromEither =<< restartService var (startServ conf plan) killServ
|
||||
|
||||
handleEf :: Drv -> HttpServerEf -> IO ()
|
||||
handleEf drv = \case
|
||||
HSESetConfig (i, ()) conf ->
|
||||
when (i == fromIntegral king) $ do
|
||||
(sId, insecurePort, securePort) <- restart drv conf
|
||||
atomically $ plan (liveEv sId insecurePort securePort)
|
||||
HSEResponse (i, req, _seq, ()) ev ->
|
||||
when (i == fromIntegral king) $
|
||||
respond drv (fromIntegral req) ev
|
||||
|
@ -79,6 +79,7 @@ dependencies:
|
||||
- unordered-containers
|
||||
- vector
|
||||
- wai
|
||||
- wai-conduit
|
||||
- warp
|
||||
- warp-tls
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user