From 5a6187085170fd8304512d835f99bd5acd1b8634 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Sat, 3 Aug 2019 12:26:45 -0700 Subject: [PATCH] *Much* cleaner http-server code. --- pkg/hs-urbit/lib/Vere/Http/Server.hs | 479 +++++++++++++-------------- pkg/hs-urbit/package.yaml | 1 + 2 files changed, 233 insertions(+), 247 deletions(-) diff --git a/pkg/hs-urbit/lib/Vere/Http/Server.hs b/pkg/hs-urbit/lib/Vere/Http/Server.hs index c811f6b6d6..421e9d15d1 100644 --- a/pkg/hs-urbit/lib/Vere/Http/Server.hs +++ b/pkg/hs-urbit/lib/Vere/Http/Server.hs @@ -1,72 +1,55 @@ -{-# OPTIONS_GHC -Wwarn #-} - module Vere.Http.Server where -import Arvo hiding (ServerId, reqBody, reqUrl, secure) +import Arvo hiding (ServerId, reqBody, reqUrl, secure) +import Data.Conduit import Noun -import UrbitPrelude hiding (Builder) - -import Vere.Http hiding (Cancel, Continue, Method, ResponseHeader(..), Start) - +import UrbitPrelude hiding (Builder) import Vere.Pier.Types import Data.Binary.Builder (Builder, fromByteString) import Data.Bits (shiftL, (.|.)) import Network.Socket (SockAddr(..)) import System.Random (randomIO) +import Vere.Http (convertHeaders, unconvertHeaders) import qualified Network.HTTP.Types as H import qualified Network.Wai as W +import qualified Network.Wai.Conduit as W import qualified Network.Wai.Handler.Warp as W import qualified Network.Wai.Handler.WarpTLS as W --- Live Requests --------------------------------------------------------------- +-- RespAction -- Reorganized HttpEvent for Cleaner Processing ------------------ -type ReqId = Word -type SeqId = Word -- TODO Unused. Why is this a thing? +{- + The sequence of actions on a given request *should* be: -data LiveReqs = LiveReqs - { nextReqId :: ReqId - , activeReqs :: Map ReqId (TMVar HttpEvent) - } + [%head .] [%bloc .]* %done -emptyLiveReqs :: LiveReqs -emptyLiveReqs = LiveReqs 1 mempty + But we will actually accept anything, and mostly do the right + thing. There are two situations where we ignore ignore the data from + some actions. -respondToLiveReq :: TVar LiveReqs -> ReqId -> HttpEvent -> STM () -respondToLiveReq var req ev = do - mVar <- lookup req . activeReqs <$> readTVar var - case mVar of - Nothing -> pure () - Just tv -> putTMVar tv ev + - If you send something *after* a %done action, it will be ignored. + - If you send a %done before a %head, we will produce "444 No + Response" with an empty response body. +-} +data RespAction + = RAHead ResponseHeader + | RABloc File + | RADone -newLiveReq :: TVar LiveReqs -> STM (ReqId, TMVar HttpEvent) -newLiveReq var = do - liv <- readTVar var - tmv <- newEmptyTMVar - - let (nex, act) = (nextReqId liv, activeReqs liv) - - writeTVar var (LiveReqs (nex+1) (insertMap nex tmv act)) - - pure (nex, tmv) +reorgHttpEvent :: HttpEvent -> [RespAction] +reorgHttpEvent = \case + Start head mBlk isDone -> [RAHead head] + <> toList (RABloc <$> mBlk) + <> if isDone then [RADone] else [] + Cancel () -> [RADone] + Continue mBlk isDone -> toList (RABloc <$> mBlk) + <> if isDone then [RADone] else [] --- Servers --------------------------------------------------------------------- - -newtype Drv = Drv { unDrv :: MVar (Maybe Serv) } - -data Serv = Serv - { sServId :: ServId - , sConfig :: HttpServerConf - , sHttpTid :: Async () - , sHttpsTid :: Async () - , sLiveReqs :: TVar LiveReqs - } - - --- Generic Service Restart and Stop Logic -------------------------------------- +-- Generic Service Stop/Restart -- Using an MVar for Atomicity ----------------- {- Restart a running service. @@ -112,201 +95,56 @@ stopService vServ kkill = do pure (Nothing, res) +-- Live Requests Table -- All Requests Still Waiting for Responses ------------- + +type ReqId = Word +type SeqId = Word -- TODO Unused. Why is this a thing? + +data LiveReqs = LiveReqs + { nextReqId :: ReqId + , activeReqs :: Map ReqId (TMVar RespAction) + } + +emptyLiveReqs :: LiveReqs +emptyLiveReqs = LiveReqs 1 mempty + +respondToLiveReq :: TVar LiveReqs -> ReqId -> RespAction -> STM () +respondToLiveReq var req ev = do + mVar <- lookup req . activeReqs <$> readTVar var + case mVar of + Nothing -> pure () + Just tv -> putTMVar tv ev + +rmLiveReq :: TVar LiveReqs -> ReqId -> STM () +rmLiveReq var reqId = do + liv <- readTVar var + writeTVar var (liv { activeReqs = deleteMap reqId (activeReqs liv) }) + +newLiveReq :: TVar LiveReqs -> STM (ReqId, TMVar RespAction) +newLiveReq var = do + liv <- readTVar var + tmv <- newEmptyTMVar + + let (nex, act) = (nextReqId liv, activeReqs liv) + + writeTVar var (LiveReqs (nex+1) (insertMap nex tmv act)) + + pure (nex, tmv) + + -- Random Helpers -------------------------------------------------------------- cordBytes :: Cord -> ByteString cordBytes = encodeUtf8 . unCord - --- Utilities for Constructing Events ------------------------------------------- - -servEv :: HttpServerEv -> Ev -servEv = EvBlip . BlipEvHttpServer - -bornEv :: KingId -> Ev -bornEv king = - servEv $ HttpServerEvBorn (fromIntegral king, ()) () - -liveEv :: ServId -> Port -> Maybe Port -> Ev -liveEv sId non sec = - servEv $ HttpServerEvLive (sId, ()) non sec - -reqEv :: ServId -> ReqId -> Bool -> Address -> HttpRequest -> Ev -reqEv sId reqId secure addr req = - servEv $ HttpServerEvRequest (sId, reqId, 1, ()) - $ HttpServerReq secure addr req - - --------------------------------------------------------------------------------- - -killServ :: Serv -> IO () -killServ Serv{sHttpsTid, sHttpTid} = do - cancel sHttpTid - cancel sHttpsTid - wait sHttpTid - wait sHttpsTid - -kill :: Drv -> IO () -kill (Drv v) = stopService v killServ >>= fromEither - -respond :: Drv -> ReqId -> HttpEvent -> IO () -respond (Drv v) req ev = do - readMVar v >>= \case - Nothing -> pure () - Just sv -> atomically (respondToLiveReq (sLiveReqs sv) req ev) - - --- Top-Level Driver Interface -------------------------------------------------- - -serv :: KingId - -> QueueEv - -> ([Ev], Acquire (EffCb HttpServerEf)) -serv king plan = - (initialEvents, runHttpServer) - where - initialEvents :: [Ev] - initialEvents = [bornEv king] - - runHttpServer :: Acquire (EffCb HttpServerEf) - runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill - - restart :: Drv -> HttpServerConf -> IO (ServId, Port, Maybe Port) - restart (Drv var) conf = do - fromEither =<< restartService var (startServ conf plan) killServ - - handleEf :: Drv -> HttpServerEf -> IO () - handleEf drv = \case - HSESetConfig (i, ()) conf -> - when (i == fromIntegral king) $ do - (sId, insecurePort, securePort) <- restart drv conf - atomically (plan (liveEv sId insecurePort securePort)) - HSEResponse (i, req, _seq, ()) ev -> - when (i == fromIntegral king) $ - respond drv (fromIntegral req) ev - - --------------------------------------------------------------------------------- - -{- - TODO Need to find an open port. --} -startServ :: HttpServerConf -> (Ev -> STM ()) - -> IO (Serv, (ServId, Port, Maybe Port)) -startServ conf plan = do - tls <- case (hscSecure conf) of - Nothing -> error "HACK: Implement support for missing PEMs" - Just (PEM key, PEM cert) -> - pure (W.tlsSettingsMemory (cordBytes cert) (cordBytes key)) - - sId <- ServId <$> randomIO - liv <- newTVarIO emptyLiveReqs - - httpsTid <- async $ W.runTLS tls W.defaultSettings (app sId liv plan True) - - httpTid <- async $ W.run 80 (app sId liv plan False) - - let res = (sId, Port 80, Just $ Port 443) - - pure (Serv sId conf httpTid httpsTid liv, res) - -respondLoop :: (W.Response -> IO W.ResponseReceived) - -> TMVar HttpEvent - -> IO W.ResponseReceived -respondLoop respond tmv = start - where - start :: IO W.ResponseReceived - start = do - atomically (readTMVar tmv) >>= \case - Cancel () -> - fullCancel - Continue _ _ -> do - putStrLn "%continue before %start" - start - Start hdr init isDone -> do - startStreaming hdr $ \s d -> do - whenJust init (sendBlock s) - stream isDone s d - - stream :: Bool -> (Builder -> IO ()) -> IO () - -> IO () - stream isDone send done = - case isDone of - True -> closeStream done - False -> do - atomically (readTMVar tmv) >>= \case - Start _ _ _ -> do - putStrLn "%start after %continue" - stream isDone send done - Cancel () -> do - streamingCancel done - Continue blk doneNow -> do - whenJust blk (sendBlock send) - stream doneNow send done - - startStreaming :: ResponseHeader - -> ((Builder -> IO ()) -> IO () -> IO ()) - -> IO W.ResponseReceived - startStreaming hdr cb = do - let status = hdrStatus hdr - headers = hdrHeaders hdr - respond $ W.responseStream status headers cb - - closeStream :: IO () -> IO () - closeStream killReq = killReq - - streamingCancel :: IO () -> IO () - streamingCancel killReq = killReq - - sendBlock :: (Builder -> IO ()) -> File -> IO () - sendBlock sendBlk = sendBlk . fromByteString . unOcts . unFile - - fullCancel :: IO W.ResponseReceived - fullCancel = respond $ W.responseLBS H.status500 [] "request canceled" - -hdrHeaders :: ResponseHeader -> [H.Header] -hdrHeaders = unconvertHeaders . headers - -hdrStatus :: ResponseHeader -> H.Status -hdrStatus = toEnum . fromIntegral . statusCode - whenJust :: Monad m => Maybe a -> (a -> m ()) -> m () whenJust Nothing act = pure () whenJust (Just a) act = act a -{- -data HttpEvent - = Start ResponseHeader (Maybe File) Bool - | Continue (Maybe File) Bool - | Cancel () - deriving (Eq, Ord, Show) --} - -app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> Bool -> W.Application -app sId liv plan secure req respond = do - body <- reqBody req - meth <- maybe (error "bad method") pure (cookMeth req) - - let addr = reqAddr req - hdrs = convertHeaders $ W.requestHeaders req - evReq = HttpRequest meth (reqUrl req) hdrs body - - respVar <- atomically $ do (reqId, var) <- newLiveReq liv - sendReqEvent reqId addr evReq - pure var - - respondLoop respond respVar - - where - - sendReqEvent :: ReqId -> Address -> HttpRequest -> STM () - sendReqEvent reqId x y = - plan (reqEv sId reqId secure x y) - cookMeth :: W.Request -> Maybe Method -cookMeth re = - case H.parseMethod (W.requestMethod re) of - Left _ -> Nothing - Right m -> Just m +cookMeth = H.parseMethod . W.requestMethod >>> \case + Left _ -> Nothing + Right m -> Just m reqIdCord :: ReqId -> Cord reqIdCord = Cord . tshow @@ -335,22 +173,169 @@ mkIpv6 (p, q, r, s) = Ipv6 (pBits .|. qBits .|. rBits .|. sBits) reqUrl :: W.Request -> Cord reqUrl = Cord . decodeUtf8 . W.rawPathInfo + +-- Utilities for Constructing Events ------------------------------------------- + +servEv :: HttpServerEv -> Ev +servEv = EvBlip . BlipEvHttpServer + +bornEv :: KingId -> Ev +bornEv king = + servEv $ HttpServerEvBorn (king, ()) () + +liveEv :: ServId -> Port -> Maybe Port -> Ev +liveEv sId non sec = + servEv $ HttpServerEvLive (sId, ()) non sec + +reqEv :: ServId -> ReqId -> Bool -> Address -> HttpRequest -> Ev +reqEv sId reqId secure addr req = + servEv $ HttpServerEvRequest (sId, reqId, 1, ()) + $ HttpServerReq secure addr req + + + +-- Http Server Flows ----------------------------------------------------------- + {- -data ClientResponse - = Progress ResponseHeader Int (Maybe Int) (Maybe ByteString) - | Finished ResponseHeader (Maybe MimeData) - | Cancel () + This accepts all action orderings so that there are no edge-cases + to be handled: -data MimeData = MimeData Text ByteString - -readEvents :: W.Request -> IO Request -readEvents req = do - let Just meth = cookMeth req - url = Cord $ decodeUtf8 $ W.rawPathInfo req - headers = convertHeaders (W.requestHeaders req) - bodyLbs <- W.strictRequestBody req - let body = if length bodyLbs == 0 then Nothing - else Just $ Octs (toStrict bodyLbs) - - pure (Request meth url headers body) + - If %bloc before %head, collect it and wait for %head. + - If %done before %head, ignore all chunks and produce Nothing. -} +getHead :: TMVar RespAction -> IO (Maybe (ResponseHeader, [File])) +getHead tmv = go [] + where + go çunks = atomically (readTMVar tmv) >>= \case + RAHead head -> pure $ Just (head, reverse çunks) + RABloc çunk -> go (çunk : çunks) + RADone -> pure Nothing + +{- + - Immediatly yield all of the initial chunks + - Yield the data from %bloc action. + - Close the stream when we hit a %done action. +-} +streamBlocks :: [File] -> TMVar RespAction -> ConduitT () (Flush Builder) IO () +streamBlocks init tmv = + for_ init yieldÇunk >> go + where + yieldFlush = \x -> yield (Chunk x) >> yield Flush + yieldÇunk = yieldFlush . fromByteString . unOcts . unFile + logDupHead = putStrLn "Multiple %head actions on one request" + + go = atomically (readTMVar tmv) >>= \case + RAHead head -> logDupHead >> go + RABloc çunk -> yieldÇunk çunk + RADone -> pure () + +sendResponse :: (W.Response -> IO W.ResponseReceived) + -> TMVar RespAction + -> IO W.ResponseReceived +sendResponse cb tmv = do + getHead tmv >>= \case + Nothing -> do cb $ W.responseLBS (H.mkStatus 444 "No Response") [] "" + Just (h,i) -> do let çunks = streamBlocks i tmv + cb $ W.responseSource (hdrStatus h) (hdrHeaders h) çunks + where + hdrHeaders :: ResponseHeader -> [H.Header] + hdrHeaders = unconvertHeaders . headers + + hdrStatus :: ResponseHeader -> H.Status + hdrStatus = toEnum . fromIntegral . statusCode + +app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> Bool -> W.Application +app sId liv plan secure req respond = do + body <- reqBody req + meth <- maybe (error "bad method") pure (cookMeth req) + + let addr = reqAddr req + hdrs = convertHeaders $ W.requestHeaders req + evReq = HttpRequest meth (reqUrl req) hdrs body + + (reqId, respVar) <- atomically (newLiveReq liv) + + atomically $ plan (reqEv sId reqId secure addr evReq) + + done <- sendResponse respond respVar + + atomically (rmLiveReq liv reqId) + + pure done + + +-- Top-Level Driver Interface -------------------------------------------------- + +newtype Drv = Drv { unDrv :: MVar (Maybe Serv) } + +data Serv = Serv + { sServId :: ServId + , sConfig :: HttpServerConf + , sHttpTid :: Async () + , sHttpsTid :: Async () + , sLiveReqs :: TVar LiveReqs + } + +{- + TODO Need to find an open port. +-} +startServ :: HttpServerConf -> (Ev -> STM ()) + -> IO (Serv, (ServId, Port, Maybe Port)) +startServ conf plan = do + tls <- case (hscSecure conf) of + Nothing -> error "HACK: Implement support for missing PEMs" + Just (PEM key, PEM cert) -> + pure (W.tlsSettingsMemory (cordBytes cert) (cordBytes key)) + + sId <- ServId <$> randomIO + liv <- newTVarIO emptyLiveReqs + + httpsTid <- async $ W.runTLS tls W.defaultSettings (app sId liv plan True) + + httpTid <- async $ W.run 80 (app sId liv plan False) + + let res = (sId, Port 80, Just $ Port 443) + + pure (Serv sId conf httpTid httpsTid liv, res) + + +killServ :: Serv -> IO () +killServ Serv{sHttpsTid, sHttpTid} = do + cancel sHttpTid + cancel sHttpsTid + wait sHttpTid + wait sHttpsTid + +kill :: Drv -> IO () +kill (Drv v) = stopService v killServ >>= fromEither + +respond :: Drv -> ReqId -> HttpEvent -> IO () +respond (Drv v) reqId ev = do + readMVar v >>= \case + Nothing -> pure () + Just sv -> atomically $ for_ (reorgHttpEvent ev) $ + respondToLiveReq (sLiveReqs sv) reqId + +serv :: KingId -> QueueEv -> ([Ev], Acquire (EffCb HttpServerEf)) +serv king plan = + (initialEvents, runHttpServer) + where + initialEvents :: [Ev] + initialEvents = [bornEv king] + + runHttpServer :: Acquire (EffCb HttpServerEf) + runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill + + restart :: Drv -> HttpServerConf -> IO (ServId, Port, Maybe Port) + restart (Drv var) conf = do + fromEither =<< restartService var (startServ conf plan) killServ + + handleEf :: Drv -> HttpServerEf -> IO () + handleEf drv = \case + HSESetConfig (i, ()) conf -> + when (i == fromIntegral king) $ do + (sId, insecurePort, securePort) <- restart drv conf + atomically $ plan (liveEv sId insecurePort securePort) + HSEResponse (i, req, _seq, ()) ev -> + when (i == fromIntegral king) $ + respond drv (fromIntegral req) ev diff --git a/pkg/hs-urbit/package.yaml b/pkg/hs-urbit/package.yaml index c73819cdca..e9814407f3 100644 --- a/pkg/hs-urbit/package.yaml +++ b/pkg/hs-urbit/package.yaml @@ -79,6 +79,7 @@ dependencies: - unordered-containers - vector - wai + - wai-conduit - warp - warp-tls