Fixed bug in HTTP Server (only first block from stream was handled).

This commit is contained in:
Benjamin Summers 2019-12-20 13:47:20 -08:00
parent 5dd3cdde91
commit 5d66c39d02

View File

@ -1,3 +1,7 @@
{-
TODO Make sure that HTTP sockets get closed on shutdown.
-}
{-# OPTIONS_GHC -Wwarn #-} {-# OPTIONS_GHC -Wwarn #-}
{- {-
@ -282,10 +286,11 @@ reqEv sId reqId which addr req =
-- Http Server Flows ----------------------------------------------------------- -- Http Server Flows -----------------------------------------------------------
data Req data Resp
= RHead ResponseHeader [File] = RHead ResponseHeader [File]
| RFull ResponseHeader [File] | RFull ResponseHeader [File]
| RNone | RNone
deriving (Show)
{- {-
This accepts all action orderings so that there are no edge-cases This accepts all action orderings so that there are no edge-cases
@ -293,9 +298,11 @@ data Req
- If %bloc before %head, collect it and wait for %head. - If %bloc before %head, collect it and wait for %head.
- If %done before %head, ignore all chunks and produce Nothing. - If %done before %head, ignore all chunks and produce Nothing.
TODO Be strict about this instead. Ignore invalid request streams.
-} -}
getReq :: TQueue RespAction -> RIO e Req getResp :: TQueue RespAction -> RIO e Resp
getReq tmv = go [] getResp tmv = go []
where where
go çunks = atomically (readTQueue tmv) >>= \case go çunks = atomically (readTQueue tmv) >>= \case
RAHead head ç -> pure $ RHead head $ reverse (ç : çunks) RAHead head ç -> pure $ RHead head $ reverse (ç : çunks)
@ -325,9 +332,8 @@ streamBlocks env init tmv =
go = atomically (readTQueue tmv) >>= \case go = atomically (readTQueue tmv) >>= \case
RAHead head c -> logDupHead >> yieldÇunk c >> go RAHead head c -> logDupHead >> yieldÇunk c >> go
RAFull head c -> logDupHead >> yieldÇunk c >> go RAFull head c -> logDupHead >> yieldÇunk c >> go
RABloc c -> yieldÇunk c RABloc c -> yieldÇunk c >> go
RADone -> do runRIO env (logTrace "Stream finished") RADone -> pure ()
pure ()
sendResponse :: HasLogFunc e sendResponse :: HasLogFunc e
=> (W.Response -> IO W.ResponseReceived) => (W.Response -> IO W.ResponseReceived)
@ -335,7 +341,7 @@ sendResponse :: HasLogFunc e
-> RIO e W.ResponseReceived -> RIO e W.ResponseReceived
sendResponse cb tmv = do sendResponse cb tmv = do
env <- ask env <- ask
getReq tmv >>= \case getResp tmv >>= \case
RNone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") [] RNone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") []
$ "" $ ""
RFull h f -> io $ cb $ W.responseLBS (hdrStatus h) (hdrHeaders h) RFull h f -> io $ cb $ W.responseLBS (hdrStatus h) (hdrHeaders h)
@ -499,7 +505,7 @@ respond :: HasLogFunc e
=> Drv -> ReqId -> HttpEvent -> RIO e () => Drv -> ReqId -> HttpEvent -> RIO e ()
respond (Drv v) reqId ev = do respond (Drv v) reqId ev = do
readMVar v >>= \case readMVar v >>= \case
Nothing -> pure () Nothing -> logWarn "Got a response to a request that does not exist."
Just sv -> do logDebug $ displayShow $ reorgHttpEvent ev Just sv -> do logDebug $ displayShow $ reorgHttpEvent ev
for_ (reorgHttpEvent ev) $ for_ (reorgHttpEvent ev) $
atomically . respondToLiveReq (sLiveReqs sv) reqId atomically . respondToLiveReq (sLiveReqs sv) reqId