Initial gutting of conduit

This commit is contained in:
Michael Snoyman 2014-04-23 14:35:19 +03:00
parent 9b240bd820
commit 54319e9697
14 changed files with 438 additions and 362 deletions

View File

@ -62,18 +62,17 @@ module Network.Wai
-- * Response
, Response
, FilePart (..)
, WithSource
-- ** Response composers
, responseFile
, responseBuilder
, responseLBS
, responseSource
, responseSourceBracket
, responseStream
, responseStreamBracket
, responseRaw
-- * Response accessors
, responseStatus
, responseHeaders
, responseToSource
, responseToStream
) where
import Blaze.ByteString.Builder (Builder, fromLazyByteString)
@ -82,10 +81,6 @@ import Control.Exception (bracket, bracketOnError)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.ByteString.Lazy.Char8 ()
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Lazy (lazyConsume)
import qualified Data.Conduit.List as CL
import Data.Monoid (mempty)
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr (SockAddrInet))
@ -130,8 +125,8 @@ responseLBS :: H.Status -> H.ResponseHeaders -> L.ByteString -> Response
responseLBS s h = ResponseBuilder s h . fromLazyByteString
-- | Creating 'Response' from 'C.Source'.
responseSource :: H.Status -> H.ResponseHeaders -> C.Source IO (C.Flush Builder) -> Response
responseSource st hs src = ResponseSource st hs ($ src)
responseStream :: H.Status -> H.ResponseHeaders -> ((Maybe Builder -> IO ()) -> IO ()) -> Response
responseStream = ResponseStream
-- | Creating 'Response' with allocated resource safely released.
--
@ -142,17 +137,17 @@ responseSource st hs src = ResponseSource st hs ($ src)
-- * The third argument is a function to create
-- ('H.Status','H.ResponseHeaders','C.Source' 'IO' ('C.Flush' 'Builder'))
-- from the resource.
responseSourceBracket :: IO a
responseStreamBracket :: IO a
-> (a -> IO ())
-> (a -> IO (H.Status
,H.ResponseHeaders
,C.Source IO (C.Flush Builder)))
,(Maybe Builder -> IO ()) -> IO ()))
-> IO Response
responseSourceBracket setup teardown action =
responseStreamBracket setup teardown action =
bracketOnError setup teardown $ \resource -> do
(st,hdr,src) <- action resource
return $ ResponseSource st hdr $ \f ->
bracket (return resource) teardown (\_ -> f src)
(st,hdr,stream) <- action resource
return $ ResponseStream st hdr $ \f ->
bracket (return resource) teardown (\_ -> stream f)
-- | Create a response for a raw application. This is useful for \"upgrade\"
-- situations such as WebSockets, where an application requests for the server
@ -165,10 +160,10 @@ responseSourceBracket setup teardown action =
-- @responseRaw@, behavior is undefined.
--
-- Since 2.1.0
responseRaw :: (C.Source IO B.ByteString -> C.Sink B.ByteString IO () -> IO ())
responseRaw :: (IO B.ByteString -> (B.ByteString -> IO ()) -> IO ())
-> Response
-> Response
responseRaw rawApp fallback = ResponseRaw ($ rawApp) fallback
responseRaw = ResponseRaw
----------------------------------------------------------------
@ -176,31 +171,35 @@ responseRaw rawApp fallback = ResponseRaw ($ rawApp) fallback
responseStatus :: Response -> H.Status
responseStatus (ResponseFile s _ _ _) = s
responseStatus (ResponseBuilder s _ _ ) = s
responseStatus (ResponseSource s _ _ ) = s
responseStatus (ResponseStream s _ _ ) = s
responseStatus (ResponseRaw _ res ) = responseStatus res
-- | Accessing 'H.Status' in 'Response'.
responseHeaders :: Response -> H.ResponseHeaders
responseHeaders (ResponseFile _ hs _ _) = hs
responseHeaders (ResponseBuilder _ hs _ ) = hs
responseHeaders (ResponseSource _ hs _ ) = hs
responseHeaders (ResponseStream _ hs _ ) = hs
responseHeaders (ResponseRaw _ res) = responseHeaders res
-- | Converting the body information in 'Response' to 'Source'.
responseToSource :: Response
-> (H.Status, H.ResponseHeaders, WithSource IO (C.Flush Builder) b)
responseToSource (ResponseSource s h b) = (s, h, b)
responseToSource (ResponseFile s h fp (Just part)) =
(s, h, \f -> IO.withFile fp IO.ReadMode $ \handle -> f $ sourceFilePart handle part C.$= CL.map (C.Chunk . fromByteString))
responseToSource (ResponseFile s h fp Nothing) =
(s, h, \f -> IO.withFile fp IO.ReadMode $ \handle -> f $ CB.sourceHandle handle C.$= CL.map (C.Chunk . fromByteString))
responseToSource (ResponseBuilder s h b) =
(s, h, ($ CL.sourceList [C.Chunk b]))
responseToSource (ResponseRaw _ res) = responseToSource res
responseToStream :: Response
-> (H.Status, H.ResponseHeaders, (Maybe Builder -> IO ()) -> IO ())
responseToStream (ResponseStream s h b) = (s, h, b)
responseToStream (ResponseFile s h fp (Just part)) =
error "FIXME responseToStream"
--(s, h, \f -> IO.withFile fp IO.ReadMode $ \handle -> f $ sourceFilePart handle part C.$= CL.map (C.Chunk . fromByteString))
responseToStream (ResponseFile s h fp Nothing) =
error "FIXME responseToStream"
--(s, h, \f -> IO.withFile fp IO.ReadMode $ \handle -> f $ CB.sourceHandle handle C.$= CL.map (C.Chunk . fromByteString))
responseToStream (ResponseBuilder s h b) =
(s, h, ($ Just b))
responseToStream (ResponseRaw _ res) = responseToStream res
{- FIXME
sourceFilePart :: IO.Handle -> FilePart -> C.Source IO B.ByteString
sourceFilePart handle (FilePart offset count _) =
CB.sourceHandleRange handle (Just offset) (Just count)
-}
----------------------------------------------------------------
@ -237,7 +236,7 @@ defaultRequest = Request
, remoteHost = SockAddrInet 0 0
, pathInfo = []
, queryString = []
, requestBody = return ()
, requestBody = return B.empty
, vault = mempty
, requestBodyLength = KnownLength 0
, requestHeaderHost = Nothing
@ -249,4 +248,4 @@ defaultRequest = Request
--
-- Since 1.4.1
lazyRequestBody :: Request -> IO L.ByteString
lazyRequestBody = fmap L.fromChunks . lazyConsume . requestBody
lazyRequestBody = error "lazyRequestBody" -- fmap L.fromChunks . lazyConsume . requestBody

View File

@ -8,7 +8,6 @@ module Network.Wai.Internal where
import Blaze.ByteString.Builder (Builder)
import qualified Data.ByteString as B
import qualified Data.Conduit as C
import Data.Text (Text)
import Data.Typeable (Typeable)
import Data.Vault.Lazy (Vault)
@ -51,8 +50,9 @@ data Request = Request {
, pathInfo :: [Text]
-- | Parsed query string information
, queryString :: H.Query
-- | A request body provided as 'Source'.
, requestBody :: C.Source IO B.ByteString
-- | Get the next chunk of the body. Returns an empty bytestring when the
-- body is fully consumed.
, requestBody :: IO B.ByteString
-- | A location for arbitrary data to be shared by applications and middleware.
, vault :: Vault
-- | The size of the request body. In the case of a chunked request body,
@ -84,16 +84,13 @@ data Request = Request {
data Response
= ResponseFile H.Status H.ResponseHeaders FilePath (Maybe FilePart)
| ResponseBuilder H.Status H.ResponseHeaders Builder
| ResponseSource H.Status H.ResponseHeaders (forall b. WithSource IO (C.Flush Builder) b)
| ResponseRaw (forall b. WithRawApp b) Response
-- FIXME perhaps need a better distinction between "allocate/free
-- resources" and "start running". See, for example, restore ugliness in
-- Warp
| ResponseStream H.Status H.ResponseHeaders ((Maybe Builder -> IO ()) -> IO ())
| ResponseRaw (IO B.ByteString -> (B.ByteString -> IO ()) -> IO ()) Response
deriving Typeable
type RawApp = C.Source IO B.ByteString -> C.Sink B.ByteString IO () -> IO ()
type WithRawApp b = (RawApp -> IO b) -> IO b
-- | Auxiliary type for 'ResponseSource'.
type WithSource m a b = (C.Source m a -> m b) -> m b
-- | The size of the request body. In the case of chunked bodies, the size will
-- not be known.
--

View File

@ -1,5 +1,5 @@
Name: wai
Version: 2.1.0.2
Version: 3.0.0
Synopsis: Web Application Interface.
Description: Provides a common protocol for communication between web applications and web servers.
License: MIT
@ -20,8 +20,6 @@ Library
Build-Depends: base >= 4 && < 5
, bytestring >= 0.9.1.4
, blaze-builder >= 0.2.1.4 && < 0.4
, conduit >= 1.0.8 && < 1.2
, conduit-extra >= 1.0
, network >= 2.2.1.5
, http-types >= 0.7
, text >= 0.7

View File

@ -35,7 +35,6 @@ module Network.Wai.Handler.Warp (
, setOnOpen
, setOnClose
, setTimeout
, setIntercept
, setManager
, setFdCacheDuration
, setBeforeMainLoop
@ -52,7 +51,6 @@ module Network.Wai.Handler.Warp (
, settingsOnOpen
, settingsOnClose
, settingsTimeout
, settingsIntercept
, settingsManager
, settingsFdCacheDuration
, settingsBeforeMainLoop
@ -87,7 +85,6 @@ module Network.Wai.Handler.Warp (
, sendResponse
) where
import Data.Conduit.Network (HostPreference(..))
import Network.Wai.Handler.Warp.Date
import Network.Wai.Handler.Warp.FdCache
import Network.Wai.Handler.Warp.Header
@ -100,8 +97,8 @@ import Network.Wai.Handler.Warp.Types
import Control.Exception (SomeException)
import Network.Wai (Request, Response)
import Network.Socket (SockAddr)
import Data.Conduit (Source)
import Data.ByteString (ByteString)
import Data.Streaming.Network (HostPreference)
-- | Port to listen on. Default value: 3000
--
@ -151,16 +148,6 @@ setOnClose x y = y { settingsOnClose = x }
setTimeout :: Int -> Settings -> Settings
setTimeout x y = y { settingsTimeout = x }
-- | Register some intercept handler to deal with specific requests. Prime use
-- case: websockets.
--
-- Default: always return @Nothing@.
--
-- Since 2.1.0
setIntercept :: (Request -> IO (Maybe (Source IO ByteString -> Connection -> IO ())))
-> Settings -> Settings
setIntercept x y = y { settingsIntercept = x }
-- | Use an existing timeout manager instead of spawning a new one. If used,
-- 'settingsTimeout' is ignored.
--

View File

@ -2,17 +2,14 @@ module Network.Wai.Handler.Warp.Conduit where
import Control.Applicative
import Control.Exception
import Control.Monad (unless)
import Control.Monad (when, unless)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy.Char8 (pack)
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Internal (ResumableSource (..))
import qualified Data.Conduit.List as CL
import qualified Data.IORef as I
import Data.Word (Word, Word8)
import Network.Wai.Handler.Warp.Types
@ -20,22 +17,29 @@ import Network.Wai.Handler.Warp.Types
----------------------------------------------------------------
-- | Contains a @Source@ and a byte count that is still to be read in.
newtype IsolatedBSSource = IsolatedBSSource (I.IORef (Int, ResumableSource IO ByteString))
data ISource = ISource !Source !(I.IORef Int)
mkISource :: Source -> Int -> IO ISource
mkISource src cnt = do
ref <- I.newIORef cnt
return $! ISource src ref
-- | Given an @IsolatedBSSource@ provide a @Source@ that only allows up to the
-- specified number of bytes to be passed downstream. All leftovers should be
-- retained within the @Source@. If there are not enough bytes available,
-- throws a @ConnectionClosedByPeer@ exception.
ibsIsolate :: IsolatedBSSource -> Source IO ByteString
ibsIsolate ibs@(IsolatedBSSource ref) = do
(count, src) <- liftIO $ I.readIORef ref
unless (count == 0) $ do
-- Get the next chunk (if available) and the updated source
(src', mbs) <- lift $ src $$++ CL.head
readISource :: ISource -> IO ByteString
readISource (ISource src ref) = do
count <- I.readIORef ref
if count == 0
then return S.empty
else do
bs <- readSource src
-- If no chunk available, then there aren't enough bytes in the
-- stream. Throw a ConnectionClosedByPeer
bs <- maybe (liftIO $ throwIO ConnectionClosedByPeer) return mbs
when (S.null bs) $ liftIO $ throwIO ConnectionClosedByPeer
let -- How many of the bytes in this chunk to send downstream
toSend = min count (S.length bs)
@ -48,16 +52,8 @@ ibsIsolate ibs@(IsolatedBSSource ref) = do
-- downstream, and then loop on this function for the
-- next chunk.
| count' > 0 -> do
liftIO $ I.writeIORef ref (count', src')
yield bs
ibsIsolate ibs
-- The expected count is the total size of the chunk we
-- just read. Send this chunk downstream, and then
-- terminate the stream.
| count == S.length bs -> do
liftIO $ I.writeIORef ref (count', src')
yield bs
liftIO $ I.writeIORef ref count'
return bs
-- Some of the bytes in this chunk should not be sent
-- downstream. Split up the chunk into the sent and
@ -65,84 +61,109 @@ ibsIsolate ibs@(IsolatedBSSource ref) = do
-- source, and send the rest of the chunk downstream.
| otherwise -> do
let (x, y) = S.splitAt toSend bs
liftIO $ I.writeIORef ref (count', fmapResume (yield y >>) src')
yield x
leftoverSource src y
assert (count' == 0) $ I.writeIORef ref count'
return x
-- | Extract the underlying @Source@ from an @IsolatedBSSource@, which will not
-- perform any more isolation.
ibsDone :: IsolatedBSSource -> IO (ResumableSource IO ByteString)
ibsDone (IsolatedBSSource ref) = snd <$> I.readIORef ref
isourceDone :: ISource -> Source
isourceDone (ISource src _) = src
----------------------------------------------------------------
data CSource = CSource !Source !(I.IORef ChunkState)
data ChunkState = NeedLen
| NeedLenNewline
| HaveLen Word
| DoneChunking
deriving Show
bsCRLF :: L.ByteString
bsCRLF = pack "\r\n"
chunkedSource :: MonadIO m
=> I.IORef (ResumableSource m ByteString, ChunkState)
-> Source m ByteString
chunkedSource ipair = do
(src, mlen) <- liftIO $ I.readIORef ipair
go src mlen
mkCSource :: Source -> IO CSource
mkCSource src = do
ref <- I.newIORef NeedLen
return $! CSource src ref
readCSource :: CSource -> IO ByteString
readCSource (CSource src ref) = do
mlen <- I.readIORef ref
go mlen
where
go' src front = do
(src', (len, bs)) <- lift $ src $$++ front getLen
let src''
| S.null bs = src'
| otherwise = fmapResume (yield bs >>) src'
go src'' $ HaveLen len
withLen 0 bs = do
leftoverSource src bs
dropCRLF
yield' S.empty DoneChunking
withLen len bs
| S.null bs = do
-- FIXME should this throw an exception if len > 0?
I.writeIORef ref DoneChunking
return S.empty
| otherwise =
case S.length bs `compare` fromIntegral len of
EQ -> yield' bs NeedLenNewline
LT -> yield' bs $ HaveLen $ len - fromIntegral (S.length bs)
GT -> do
let (x, y) = S.splitAt (fromIntegral len) bs
leftoverSource src y
yield' x NeedLenNewline
go src NeedLen = go' src id
go src NeedLenNewline = go' src (CB.take 2 >>)
go src (HaveLen 0) = do
yield' bs mlen = do
I.writeIORef ref mlen
return bs
dropCRLF = do
bs <- readSource src
case S.uncons bs of
Nothing -> return ()
Just (13, bs') -> dropLF bs'
Just (10, bs') -> leftoverSource src bs'
Just _ -> leftoverSource src bs
dropLF bs =
case S.uncons bs of
Nothing -> do
bs2 <- readSource' src
unless (S.null bs2) $ dropLF bs2
Just (10, bs') -> leftoverSource src bs'
Just _ -> leftoverSource src bs
go NeedLen = getLen
go NeedLenNewline = dropCRLF >> getLen
go (HaveLen 0) = do
-- Drop the final CRLF
(src', ()) <- lift $ src $$++ do
crlf <- CB.take 2
unless (crlf == bsCRLF) $ leftover $ S.concat $ L.toChunks crlf
liftIO $ I.writeIORef ipair (src', HaveLen 0)
go src (HaveLen len) = do
(src', mbs) <- lift $ src $$++ CL.head
case mbs of
Nothing -> liftIO $ I.writeIORef ipair (src', HaveLen 0)
Just bs ->
case S.length bs `compare` fromIntegral len of
EQ -> yield' src' NeedLenNewline bs
LT -> do
let mlen = HaveLen $ len - fromIntegral (S.length bs)
yield' src' mlen bs
GT -> do
let (x, y) = S.splitAt (fromIntegral len) bs
let src'' = fmapResume (yield y >>) src'
yield' src'' NeedLenNewline x
dropCRLF
I.writeIORef ref DoneChunking
return S.empty
go (HaveLen len) = do
bs <- readSource src
withLen len bs
go DoneChunking = return S.empty
yield' src mlen bs = do
liftIO $ I.writeIORef ipair (src, mlen)
yield bs
go src mlen
getLen :: Monad m => Sink ByteString m (Word, ByteString)
-- Get the length from the source, and then pass off control to withLen
getLen = do
mbs <- CL.head
case mbs of
Nothing -> return (0, S.empty)
Just bs -> do
bs <- readSource src
if S.null bs
then do
I.writeIORef ref $ assert False $ HaveLen 0
return S.empty
else do
(x, y) <-
case S.breakByte 10 bs of
(x, y)
| S.null y -> do
mbs2 <- CL.head
case mbs2 of
Nothing -> return (x, y)
Just bs2 -> return $ S.breakByte 10 $ bs `S.append` bs2
bs2 <- readSource' src
if S.null bs2
then return (x, y)
else return $ S.breakByte 10 $ bs `S.append` bs2
| otherwise -> return (x, y)
let w =
S.foldl' (\i c -> i * 16 + fromIntegral (hexToWord c)) 0
$ S.takeWhile isHexDigit x
return (w, S.drop 1 y)
withLen w $ S.drop 1 y
hexToWord w
| w < 58 = w - 48
@ -153,8 +174,3 @@ isHexDigit :: Word8 -> Bool
isHexDigit w = w >= 48 && w <= 57
|| w >= 65 && w <= 70
|| w >= 97 && w <= 102
----------------------------------------------------------------
fmapResume :: (Source m o1 -> Source m o2) -> ResumableSource m o1 -> ResumableSource m o2
fmapResume f (ResumableSource src m) = ResumableSource (f src) m

View File

@ -17,7 +17,6 @@ import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import qualified Data.ByteString.Unsafe as SU
import qualified Data.CaseInsensitive as CI
import Data.Conduit
import qualified Data.IORef as I
import Data.Monoid (mempty)
import qualified Network.HTTP.Types as H
@ -32,6 +31,7 @@ import qualified Network.Wai.Handler.Warp.Timeout as Timeout
import Network.Wai.Handler.Warp.Types
import Network.Wai.Internal
import Prelude hiding (lines)
import Control.Monad (when)
----------------------------------------------------------------
@ -47,25 +47,22 @@ recvRequest :: Settings
-> Connection
-> InternalInfo
-> SockAddr -- ^ Peer's address.
-> Source IO ByteString -- ^ Where HTTP request comes from.
-> Source -- ^ Where HTTP request comes from.
-> IO (Request
,IndexedHeader
,IO (ResumableSource IO ByteString)
,Maybe ByteString) -- ^
,IndexedHeader) -- ^
-- 'Request' passed to 'Application',
-- 'IndexedHeader' of HTTP request for internal use,
-- leftover source (i.e. body and other HTTP reqeusts in HTTP pipelining),
-- leftovers from request header parsing (used for raw responses)
recvRequest settings conn ii addr src0 = do
(src, (leftover', hdrlines)) <- src0 $$+ headerLines
recvRequest settings conn ii addr src = do
hdrlines <- headerLines src
(method, unparsedPath, path, query, httpversion, hdr) <- parseHeaderLines hdrlines
let idxhdr = indexRequestHeader hdr
expect = idxhdr ! idxExpect
cl = idxhdr ! idxContentLength
te = idxhdr ! idxTransferEncoding
liftIO $ handleExpect conn httpversion expect
(rbody, bodyLength, getSource) <- bodyAndSource src cl te
(rbody, bodyLength) <- bodyAndSource src cl te
rbody' <- timeoutBody th rbody
let req = Request {
requestMethod = method
, httpVersion = httpversion
@ -76,21 +73,24 @@ recvRequest settings conn ii addr src0 = do
, requestHeaders = hdr
, isSecure = False
, remoteHost = addr
, requestBody = timeoutBody th rbody
, requestBody = rbody'
, vault = mempty
, requestBodyLength = bodyLength
, requestHeaderHost = idxhdr ! idxHost
, requestHeaderRange = idxhdr ! idxRange
}
return (req, idxhdr, getSource, leftover')
return (req, idxhdr)
where
th = threadHandle ii
----------------------------------------------------------------
headerLines :: Sink ByteString IO (Maybe ByteString, [ByteString])
headerLines =
await >>= maybe (throwIO (NotEnoughLines [])) (push (THStatus 0 id id))
headerLines :: Source -> IO [ByteString]
headerLines src = do
bs <- readSource src
if S.null bs
then throwIO $ NotEnoughLines []
else push src (THStatus 0 id id) bs
----------------------------------------------------------------
@ -109,19 +109,19 @@ handleExpect _ _ _ = return ()
----------------------------------------------------------------
bodyAndSource :: ResumableSource IO ByteString
-> Maybe HeaderValue
-> Maybe HeaderValue
-> IO (Source IO ByteString
bodyAndSource :: Source
-> Maybe HeaderValue -- ^ content length
-> Maybe HeaderValue -- ^ transfer-encoding
-> IO (IO ByteString
,RequestBodyLength
,IO (ResumableSource IO ByteString))
)
bodyAndSource src cl te
| chunked = do
ref <- I.newIORef (src, NeedLen)
return (chunkedSource ref, ChunkedBody, fst <$> I.readIORef ref)
csrc <- mkCSource src
return (readCSource csrc, ChunkedBody)
| otherwise = do
ibs <- IsolatedBSSource <$> I.newIORef (len, src)
return (ibsIsolate ibs, bodyLen, ibsDone ibs)
isrc <- mkISource src len
return (readISource isrc, bodyLen)
where
len = toLength cl
bodyLen = KnownLength $ fromIntegral len
@ -137,17 +137,28 @@ isChunked _ = False
----------------------------------------------------------------
timeoutBody :: Timeout.Handle -> Source IO ByteString -> Source IO ByteString
timeoutBody :: Timeout.Handle -> IO ByteString -> IO (IO ByteString)
timeoutBody timeoutHandle rbody = do
-- Timeout handling was paused after receiving the full request
-- headers. Now we need to resume it to avoid a slowloris
-- attack during request body sending.
liftIO $ Timeout.resume timeoutHandle
-- As soon as we finish receiving the request body, whether
-- because the application is not interested in more bytes, or
-- because there is no more data available, pause the timeout
-- handler again.
addCleanup (const $ liftIO $ Timeout.pause timeoutHandle) rbody
isFirstRef <- I.newIORef True
return $ do
isFirst <- I.readIORef isFirstRef
when isFirst $
-- Timeout handling was paused after receiving the full request
-- headers. Now we need to resume it to avoid a slowloris
-- attack during request body sending.
Timeout.resume timeoutHandle
bs <- rbody
-- As soon as we finish receiving the request body, whether
-- because the application is not interested in more bytes, or
-- because there is no more data available, pause the timeout
-- handler again.
when (S.null bs) (Timeout.pause timeoutHandle)
return bs
----------------------------------------------------------------
@ -161,11 +172,13 @@ data THStatus = THStatus
----------------------------------------------------------------
{- FIXME
close :: Sink ByteString IO a
close = throwIO IncompleteHeaders
-}
push :: THStatus -> ByteString -> Sink ByteString IO (Maybe ByteString, [ByteString])
push (THStatus len lines prepend) bs'
push :: Source -> THStatus -> ByteString -> IO [ByteString]
push src (THStatus len lines prepend) bs'
-- Too many bytes
| len > maxTotalHeaderLength = throwIO OverLargeHeader
| otherwise = push' mnl
@ -187,15 +200,19 @@ push (THStatus len lines prepend) bs'
Just (nl, False)
{-# INLINE push' #-}
push' :: Maybe (Int, Bool) -> IO [ByteString]
-- No newline find in this chunk. Add it to the prepend,
-- update the length, and continue processing.
push' Nothing = await >>= maybe close (push status)
push' Nothing = do
bs <- readSource' src
when (S.null bs) $ throwIO IncompleteHeaders
push src status bs
where
len' = len + bsLen
prepend' = S.append bs
status = THStatus len' lines prepend'
-- Found a newline, but next line continues as a multiline header
push' (Just (end, True)) = push status rest
push' (Just (end, True)) = push src status rest
where
rest = S.drop (end + 1) bs
prepend' = S.append (SU.unsafeTake (checkCR bs end) bs)
@ -204,12 +221,9 @@ push (THStatus len lines prepend) bs'
-- Found a newline at position end.
push' (Just (end, False))
-- leftover
| S.null line = let lines' = lines []
rest = if start < bsLen then
Just (SU.unsafeDrop start bs)
else
Nothing
in maybe (return ()) leftover rest >> return (rest, lines')
| S.null line = do
when (start < bsLen) $ leftoverSource src (SU.unsafeDrop start bs)
return (lines [])
-- more headers
| otherwise = let len' = len + start
lines' = lines . (line:)
@ -217,10 +231,12 @@ push (THStatus len lines prepend) bs'
in if start < bsLen then
-- more bytes in this chunk, push again
let bs'' = SU.unsafeDrop start bs
in push status bs''
else
in push src status bs''
else do
-- no more bytes in this chunk, ask for more
await >>= maybe close (push status)
bs <- readSource' src
when (S.null bs) $ throwIO IncompleteHeaders
push src status bs
where
start = end + 1 -- start of next chunk
line = SU.unsafeTake (checkCR bs end) bs

View File

@ -19,13 +19,11 @@ import Control.Exception
import Control.Monad.IO.Class (liftIO)
import Data.Array ((!))
import Data.ByteString (ByteString)
import Data.Streaming.Blaze (newBlazeRecv, reuseBufferStrategy, allocBuffer)
import qualified Data.ByteString as S
import Control.Monad (unless)
import Control.Monad (unless, when)
import qualified Data.ByteString.Char8 as B (pack)
import qualified Data.CaseInsensitive as CI
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Blaze (builderToByteStringWith, allocBuffer, reuseBufferStrategy)
import Data.Function (on)
import Data.List (deleteBy)
import Data.Maybe (isJust, listToMaybe)
@ -161,10 +159,10 @@ sendResponse :: Connection
-> (forall a. IO a -> IO a) -- ^ Restore masking state.
-> Request -- ^ HTTP request.
-> IndexedHeader -- ^ Indexed header of HTTP request.
-> Maybe ByteString -- ^ leftovers from header parsing
-> IO ByteString -- ^ source from client, for raw response
-> Response -- ^ HTTP response including status code and response header.
-> IO Bool -- ^ Returing True if the connection is persistent.
sendResponse conn ii restore req reqidxhdr leftover' response = do
sendResponse conn ii restore req reqidxhdr src response = do
hs <- addServerAndDate hs0
if hasBody s req then do
sendRsp conn ver s hs restore rsp
@ -188,20 +186,20 @@ sendResponse conn ii restore req reqidxhdr leftover' response = do
rsp = case response of
ResponseFile _ _ path mPart -> RspFile path mPart mRange (T.tickle th)
ResponseBuilder _ _ b -> RspBuilder b needsChunked
ResponseSource _ _ fb -> RspSource fb needsChunked th
ResponseRaw raw _ -> RspRaw raw leftover' (T.tickle th)
ResponseStream _ _ fb -> RspStream fb needsChunked th
ResponseRaw raw _ -> RspRaw raw src (T.tickle th)
ret = case response of
ResponseFile {} -> isPersist
ResponseBuilder {} -> isKeepAlive
ResponseSource {} -> isKeepAlive
ResponseStream {} -> isKeepAlive
ResponseRaw {} -> False
----------------------------------------------------------------
data Rsp = RspFile FilePath (Maybe FilePart) (Maybe HeaderValue) (IO ())
| RspBuilder Builder Bool
| RspSource (forall b. WithSource IO (Flush Builder) b) Bool T.Handle
| RspRaw (forall b. WithRawApp b) (Maybe ByteString) (IO ())
| RspStream ((Maybe Builder -> IO ()) -> IO ()) Bool T.Handle
| RspRaw (IO ByteString -> (ByteString -> IO ()) -> IO ()) (IO ByteString) (IO ())
----------------------------------------------------------------
@ -243,37 +241,36 @@ sendRsp conn ver s hs restore (RspBuilder body needsChunked) = restore $ do
----------------------------------------------------------------
sendRsp conn ver s hs restore (RspSource withBodyFlush needsChunked th) =
withBodyFlush $ \bodyFlush -> restore $ do
sendRsp conn ver s hs restore (RspStream withBodyFlush needsChunked th) = do
header <- composeHeaderBuilder ver s hs needsChunked
let src = yield header >> cbody bodyFlush
strategy = reuseBufferStrategy $ allocBuffer defaultBufferSize
src $$ builderToByteStringWith strategy =$ connSink conn th
where
cbody bodyFlush = if needsChunked then body $= chunk else body
where
body = mapOutput (\x -> case x of
Flush -> flush
Chunk builder -> builder)
bodyFlush
chunk :: Conduit Builder IO Builder
chunk = await >>= maybe (yield chunkedTransferTerminator) (\x -> yield (chunkedTransferEncoding x) >> chunk)
(recv, finish) <- newBlazeRecv $ reuseBufferStrategy $ allocBuffer defaultBufferSize
let addChunk builder = do
popper <- recv builder
let loop = do
bs <- popper
unless (S.null bs) $ do
connSink conn th bs
loop
loop
addChunk header
restore $ do
withBodyFlush $ \mbuilder -> addChunk $ case mbuilder of
Nothing -> flush
Just builder -> chunkedTransferEncoding builder
when needsChunked $ addChunk chunkedTransferTerminator
mbs <- finish
maybe (return ()) (connSink conn th) mbs
----------------------------------------------------------------
sendRsp conn _ _ _ restore (RspRaw withApp mbs tickle) =
withApp $ \app -> restore $ app src sink
sendRsp conn _ _ _ restore (RspRaw withApp src tickle) =
withApp recv send
where
sink = CL.mapM_ (\bs -> connSendAll conn bs >> tickle)
src = do
maybe (return ()) yield mbs
loop
where
loop = do
bs <- liftIO $ connRecv conn
unless (S.null bs) $ do
liftIO tickle
yield bs >> loop
recv = do
bs <- src
unless (S.null bs) tickle
return bs
send bs = connSendAll conn bs >> tickle
----------------------------------------------------------------
@ -284,12 +281,16 @@ sendResponseNoBody :: Connection
-> (forall a. IO a -> IO a) -- ^ restore
-> Response
-> IO ()
sendResponseNoBody conn ver s hs restore (ResponseSource _ _ withBodyFlush) =
withBodyFlush $ \_bodyFlush ->
restore $ composeHeader ver s hs >>= connSendAll conn
sendResponseNoBody conn ver s hs restore (ResponseRaw withRaw _) =
withRaw $ \_raw ->
restore $ composeHeader ver s hs >>= connSendAll conn
sendResponseNoBody conn ver s hs restore (ResponseStream _ _ withBodyFlush) = restore $ do
-- Allow the application to free resources
withBodyFlush $ const $ return ()
composeHeader ver s hs >>= connSendAll conn
sendResponseNoBody conn ver s hs restore (ResponseRaw withRaw _) = restore $ do
-- Allow the application to free resources
withRaw (return S.empty) (const $ return ())
composeHeader ver s hs >>= connSendAll conn
sendResponseNoBody conn ver s hs restore ResponseBuilder{} =
restore $ composeHeader ver s hs >>= connSendAll conn
sendResponseNoBody conn ver s hs restore ResponseFile{} =
@ -299,17 +300,11 @@ sendResponseNoBody conn ver s hs restore ResponseFile{} =
----------------------------------------------------------------
-- | Use 'connSendAll' to send this data while respecting timeout rules.
connSink :: Connection -> T.Handle -> Sink ByteString IO ()
connSink Connection { connSendAll = send } th = sink
where
sink = await >>= maybe close push
close = liftIO (T.resume th)
push x = do
liftIO $ do
T.resume th
send x
T.pause th
sink
connSink :: Connection -> T.Handle -> ByteString -> IO ()
connSink Connection { connSendAll = send } th bs = do
T.resume th
send bs
T.pause th
-- We pause timeouts before passing control back to user code. This ensures
-- that a timeout will only ever be executed when Warp is in control. We
-- also make sure to resume the timeout after the completion of user code

View File

@ -13,15 +13,7 @@ import Control.Monad (forever, when, unless, void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit
import Data.Conduit.Internal (ResumableSource (..))
import qualified Data.Conduit.List as CL
#if MIN_VERSION_conduit(1,1,0)
import Data.Streaming.Network (bindPortTCP)
#else
import Data.Conduit.Network (bindPort)
#define bindPortTCP bindPort
#endif
import Network (sClose, Socket)
import Network.Socket (accept, SockAddr)
import qualified Network.Socket.ByteString as Sock
@ -237,7 +229,8 @@ serveConnection :: Connection
-> IO ()
serveConnection conn ii addr isSecure' settings app = do
istatus <- newIORef False
recvSendLoop istatus (connSource conn th istatus) `E.catch` \e -> do
src <- mkSource (connSource conn th istatus)
recvSendLoop istatus src `E.catch` \e -> do
sendErrorResponse istatus e
throwIO (e :: SomeException)
@ -247,65 +240,61 @@ serveConnection conn ii addr isSecure' settings app = do
sendErrorResponse istatus e = do
status <- readIORef istatus
when status $ void $ mask $ \restore ->
sendResponse conn ii restore dummyreq defaultIndexRequestHeader Nothing (errorResponse e)
sendResponse conn ii restore dummyreq defaultIndexRequestHeader (return S.empty) (errorResponse e)
dummyreq = defaultRequest { remoteHost = addr }
errorResponse e = settingsOnExceptionResponse settings e
recvSendLoop istatus fromClient = do
(req', idxhdr, getSource, leftover') <- recvRequest settings conn ii addr fromClient
(req', idxhdr) <- recvRequest settings conn ii addr fromClient
let req = req' { isSecure = isSecure' }
intercept' <- settingsIntercept settings req
case intercept' of
Nothing -> do
-- Let the application run for as long as it wants
T.pause th
-- Let the application run for as long as it wants
T.pause th
-- In the event that some scarce resource was acquired during
-- creating the request, we need to make sure that we don't get
-- an async exception before calling the ResponseSource.
keepAlive <- mask $ \restore -> do
res <- restore $ app req
T.resume th
-- FIXME consider forcing evaluation of the res here to
-- send more meaningful error messages to the user.
-- However, it may affect performance.
writeIORef istatus False
sendResponse conn ii restore req idxhdr leftover' res
-- In the event that some scarce resource was acquired during
-- creating the request, we need to make sure that we don't get
-- an async exception before calling the ResponseSource.
keepAlive <- mask $ \restore -> do
res <- restore $ app req
T.resume th
-- FIXME consider forcing evaluation of the res here to
-- send more meaningful error messages to the user.
-- However, it may affect performance.
writeIORef istatus False
sendResponse conn ii restore req idxhdr (readSource fromClient) res
-- We just send a Response and it takes a time to
-- receive a Request again. If we immediately call recv,
-- it is likely to fail and the IO manager works.
-- It is very costy. So, we yield to another Haskell
-- thread hoping that the next Request will arraive
-- when this Haskell thread will be re-scheduled.
-- This improves performance at least when
-- the number of cores is small.
Conc.yield
-- We just send a Response and it takes a time to
-- receive a Request again. If we immediately call recv,
-- it is likely to fail and the IO manager works.
-- It is very costy. So, we yield to another Haskell
-- thread hoping that the next Request will arraive
-- when this Haskell thread will be re-scheduled.
-- This improves performance at least when
-- the number of cores is small.
Conc.yield
when keepAlive $ do
-- flush the rest of the request body
requestBody req $$ CL.sinkNull
ResumableSource fromClient' _ <- getSource
T.resume th
recvSendLoop istatus fromClient'
Just intercept -> do
T.pause th
ResumableSource fromClient' _ <- getSource
intercept fromClient' conn
when keepAlive $ do
-- flush the rest of the request body
flushBody $ requestBody req
T.resume th
recvSendLoop istatus fromClient
connSource :: Connection -> T.Handle -> IORef Bool -> Source IO ByteString
connSource Connection { connRecv = recv } th istatus = src
flushBody :: IO ByteString -> IO ()
flushBody src =
loop
where
src = do
bs <- liftIO recv
unless (S.null bs) $ do
liftIO $ do
writeIORef istatus True
when (S.length bs >= 2048) $ T.tickle th
yield bs
src
loop = do
bs <- src
unless (S.null bs) loop
connSource :: Connection -> T.Handle -> IORef Bool -> IO ByteString
connSource Connection { connRecv = recv } th istatus = do
bs <- recv
unless (S.null bs) $ do
writeIORef istatus True
when (S.length bs >= 2048) $ T.tickle th
return bs
-- Copied from: https://github.com/mzero/plush/blob/master/src/Plush/Server/Warp.hs
setSocketCloseOnExec :: Socket -> IO ()

View File

@ -8,8 +8,7 @@ import Control.Monad (when)
import qualified Data.ByteString as S
import qualified Data.Text as T
import qualified Data.Text.IO as TIO
import Data.Conduit
import Data.Conduit.Network (HostPreference)
import Data.Streaming.Network (HostPreference)
import GHC.IO.Exception (IOErrorType(..))
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
@ -40,7 +39,6 @@ data Settings = Settings
, settingsOnOpen :: SockAddr -> IO Bool -- ^ What to do when a connection is open. When 'False' is returned, the connection is closed immediately. Otherwise, the connection is going on. Default: always returns 'True'.
, settingsOnClose :: SockAddr -> IO () -- ^ What to do when a connection is close. Default: do nothing.
, settingsTimeout :: Int -- ^ Timeout value in seconds. Default value: 30
, settingsIntercept :: Request -> IO (Maybe (Source IO S.ByteString -> Connection -> IO ()))
, settingsManager :: Maybe Manager -- ^ Use an existing timeout manager instead of spawning a new one. If used, 'settingsTimeout' is ignored. Default is 'Nothing'
, settingsFdCacheDuration :: Int -- ^ Cache duratoin time of file descriptors in seconds. 0 means that the cache mechanism is not used. Default value: 10
, settingsBeforeMainLoop :: IO ()
@ -72,7 +70,6 @@ defaultSettings = Settings
, settingsOnOpen = const $ return True
, settingsOnClose = const $ return ()
, settingsTimeout = 30
, settingsIntercept = const (return Nothing)
, settingsManager = Nothing
, settingsFdCacheDuration = 10
, settingsBeforeMainLoop = return ()
@ -112,7 +109,6 @@ exceptionResponseForDebug e = responseLBS H.internalServerError500 [(H.hContentT
{-# DEPRECATED settingsOnOpen "Use setOnOpen instead" #-}
{-# DEPRECATED settingsOnClose "Use setOnClose instead" #-}
{-# DEPRECATED settingsTimeout "Use setTimeout instead" #-}
{-# DEPRECATED settingsIntercept "Use setIntercept instead" #-}
{-# DEPRECATED settingsManager "Use setManager instead" #-}
{-# DEPRECATED settingsFdCacheDuration "Use setFdCacheDuration instead" #-}
{-# DEPRECATED settingsBeforeMainLoop "Use setBeforeMainLoop instead" #-}

View File

@ -13,6 +13,8 @@ import qualified Network.Wai.Handler.Warp.Date as D
import qualified Network.Wai.Handler.Warp.FdCache as F
import qualified Network.Wai.Handler.Warp.Timeout as T
import Network.Wai.Handler.Warp.Buffer (Buffer,BufSize)
import qualified Data.ByteString as S
import Data.IORef (IORef, readIORef, writeIORef, newIORef)
----------------------------------------------------------------
@ -87,3 +89,31 @@ data InternalInfo = InternalInfo {
, fdCacher :: Maybe F.MutableFdCache
, dateCacher :: D.DateCache
}
----------------------------------------------------------------
data Source = Source !(IORef ByteString) !(IO ByteString)
mkSource :: IO ByteString -> IO Source
mkSource func = do
ref <- newIORef S.empty
return $! Source ref func
readSource :: Source -> IO ByteString
readSource (Source ref func) = do
bs <- readIORef ref
if S.null bs
then func
else do
writeIORef ref S.empty
return bs
-- | Read from a Source, ignoring any leftovers.
readSource' :: Source -> IO ByteString
readSource' (Source _ func) = func
leftoverSource :: Source -> ByteString -> IO ()
leftoverSource (Source ref _) bs = writeIORef ref bs
readLeftoverSource :: Source -> IO ByteString
readLeftoverSource (Source ref _) = readIORef ref

View File

@ -2,10 +2,10 @@
module ConduitSpec (main, spec) where
import Network.Wai.Handler.Warp.Conduit
import Data.Conduit
import qualified Data.Conduit.List as CL
import Network.Wai.Handler.Warp.Types
import Control.Monad (replicateM)
import Test.Hspec
import qualified Data.IORef as I
import Data.IORef as I
import qualified Data.ByteString as S
main :: IO ()
@ -14,19 +14,33 @@ main = hspec spec
spec :: Spec
spec = describe "conduit" $ do
it "IsolatedBSSource" $ do
(rsrc, ()) <- mapM_ yield (map S.singleton [1..50]) $$+ return ()
ibs <- fmap IsolatedBSSource $ I.newIORef (40, rsrc)
x <- ibsIsolate ibs $$ CL.take 20
ref <- newIORef $ map S.singleton [1..50]
src <- mkSource $ do
x <- readIORef ref
case x of
[] -> return S.empty
y:z -> do
writeIORef ref z
return y
isrc <- mkISource src 40
x <- replicateM 20 $ readISource isrc
S.concat x `shouldBe` S.pack [1..20]
y <- ibsIsolate ibs $$ CL.consume
y <- replicateM 40 $ readISource isrc
S.concat y `shouldBe` S.pack [21..40]
rsrc' <- ibsDone ibs
z <- rsrc' $$+- CL.consume
z <- replicateM 40 $ readSource src
S.concat z `shouldBe` S.pack [41..50]
it "chunkedSource" $ do
(rsrc, ()) <- yield "5\r\n12345\r\n3\r\n678\r\n0\r\nBLAH" $$+ return ()
ref <- I.newIORef (rsrc, NeedLen)
x <- chunkedSource ref $$ CL.consume
ref <- newIORef $ "5\r\n12345\r\n3\r\n678\r\n0\r\n\r\nBLAH"
src <- mkSource $ do
x <- readIORef ref
writeIORef ref S.empty
return x
csrc <- mkCSource src
x <- replicateM 15 $ readCSource csrc
S.concat x `shouldBe` "12345678"
y <- replicateM 15 $ readSource src
S.concat y `shouldBe` "BLAH"

View File

@ -4,17 +4,16 @@
module RequestSpec (main, spec) where
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.List (sourceList)
import Network.Wai.Handler.Warp.Request
import Network.Wai.Handler.Warp.RequestHeader (parseByteRanges)
import Network.Wai.Handler.Warp.Types
import Test.Hspec
import Test.Hspec.QuickCheck
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import qualified Data.ByteString.Lazy as L
import qualified Network.HTTP.Types.Header as HH
import Data.IORef
main :: IO ()
main = hspec spec
@ -23,9 +22,9 @@ spec :: Spec
spec = do
describe "headerLines" $ do
it "takes until blank" $
blankSafe >>= (`shouldBe` (Nothing, ["foo", "bar", "baz"]))
blankSafe >>= (`shouldBe` ("", ["foo", "bar", "baz"]))
it "ignored leading whitespace in bodies" $
whiteSafe >>= (`shouldBe` (Just " hi there", ["foo", "bar", "baz"]))
whiteSafe >>= (`shouldBe` (" hi there", ["foo", "bar", "baz"]))
it "throws OverLargeHeader when too many" $
tooMany `shouldThrow` overLargeHeader
it "throws OverLargeHeader when too large" $
@ -35,8 +34,9 @@ spec = do
[ "GET / HTTP/1.1\r\nConnection: Close\r"
, "\n\r\n"
]
(mleftover, actual) <- mapM_ yield chunks $$ headerLines
mleftover `shouldBe` Nothing
(actual, src) <- headerLinesList' chunks
leftover <- readLeftoverSource src
leftover `shouldBe` S.empty
actual `shouldBe` ["GET / HTTP/1.1", "Connection: Close"]
prop "random chunking" $ \breaks extraS -> do
let bsFull = "GET / HTTP/1.1\r\nConnection: Close\r\n\r\n" `S8.append` extra
@ -47,11 +47,9 @@ spec = do
bs1 : loop xs bs2
where
(bs1, bs2) = S8.splitAt ((x `mod` 10) + 1) bs
(leftover, actual) <- mapM_ yield chunks $$ do
(_, actual) <- headerLines
x' <- CB.take (length extraS)
let x = S8.concat $ L.toChunks x'
return (x, actual)
(actual, src) <- headerLinesList' chunks
leftover <- consumeLen (length extraS) src
actual `shouldBe` ["GET / HTTP/1.1", "Connection: Close"]
leftover `shouldBe` extra
describe "parseByteRanges" $ do
@ -63,10 +61,41 @@ spec = do
test "foobytes=9500-" Nothing
test "bytes=0-0,-1" $ Just [HH.ByteRangeFromTo 0 0, HH.ByteRangeSuffix 1]
where
blankSafe = (sourceList ["f", "oo\n", "bar\nbaz\n\r\n"]) $$ headerLines
whiteSafe = (sourceList ["foo\r\nbar\r\nbaz\r\n\r\n hi there"]) $$ headerLines
tooMany = (sourceList $ repeat "f\n") $$ headerLines
tooLarge = (sourceList $ repeat "f") $$ headerLines
blankSafe = headerLinesList ["f", "oo\n", "bar\nbaz\n\r\n"]
whiteSafe = headerLinesList ["foo\r\nbar\r\nbaz\r\n\r\n hi there"]
tooMany = headerLinesList $ repeat "f\n"
tooLarge = headerLinesList $ repeat "f"
headerLinesList orig = do
(res, src) <- headerLinesList' orig
leftover <- readLeftoverSource src
return (leftover, res)
headerLinesList' orig = do
ref <- newIORef orig
let src = do
x <- readIORef ref
case x of
[] -> return S.empty
y:z -> do
writeIORef ref z
return y
src' <- mkSource src
res <- headerLines src'
return (res, src')
consumeLen len0 src =
loop id len0
where
loop front len
| len <= 0 = return $ S.concat $ front []
| otherwise = do
bs <- readSource src
if S.null bs
then loop front 0
else do
let (x, y) = S.splitAt len bs
loop (front . (x:)) (len - S.length x)
overLargeHeader :: Selector InvalidRequest
overLargeHeader e = e == OverLargeHeader

View File

@ -5,15 +5,14 @@
module RunSpec (main, spec, withApp) where
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Monad (forM_, replicateM_)
import Control.Monad (forM_, replicateM_, unless)
import System.Timeout (timeout)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr, hGetSome)
import qualified Control.Exception as E
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import qualified Data.ByteString.Lazy as L
import Data.Conduit (($$), (=$))
import qualified Data.Conduit.List
import qualified Data.IORef as I
import Network (connectTo, PortID (PortNumber))
import Network.HTTP.Types
@ -27,8 +26,6 @@ import Control.Exception.Lifted (bracket, try, IOException, onException)
import Data.Streaming.Network (bindPortTCP, getSocketTCP, safeRecv)
import Network.Socket (sClose)
import qualified Network.HTTP as HTTP
import Data.Conduit (Flush (Chunk), ($=))
import qualified Data.Conduit.List as CL
import Blaze.ByteString.Builder (fromByteString)
import Network.Socket.ByteString (sendAll)
@ -49,7 +46,7 @@ err icount msg = liftIO $ I.writeIORef icount $ Left $ show msg
readBody :: CounterApplication
readBody icount req = do
body <- requestBody req $$ Data.Conduit.List.consume
body <- consumeBody $ requestBody req
case () of
()
| pathInfo req == ["hello"] && L.fromChunks body /= "Hello"
@ -70,8 +67,8 @@ ignoreBody icount req = do
doubleConnect :: CounterApplication
doubleConnect icount req = do
_ <- requestBody req $$ Data.Conduit.List.consume
_ <- requestBody req $$ Data.Conduit.List.consume
_ <- consumeBody $ requestBody req
_ <- consumeBody $ requestBody req
incr icount
return $ responseLBS status200 [] "double connect"
@ -235,7 +232,7 @@ spec = do
it "works" $ do
ifront <- I.newIORef id
let app req = do
bss <- requestBody req $$ Data.Conduit.List.consume
bss <- consumeBody $ requestBody req
liftIO $ I.atomicModifyIORef ifront $ \front -> (front . (S.concat bss:), ())
return $ responseLBS status200 [] ""
withApp defaultSettings app $ \port -> do
@ -258,8 +255,8 @@ spec = do
it "lots of chunks" $ do
ifront <- I.newIORef id
let app req = do
bss <- requestBody req $$ Data.Conduit.List.consume
liftIO $ I.atomicModifyIORef ifront $ \front -> (front . (S.concat bss:), ())
bss <- consumeBody $ requestBody req
I.atomicModifyIORef ifront $ \front -> (front . (S.concat bss:), ())
return $ responseLBS status200 [] ""
withApp defaultSettings app $ \port -> do
handle <- connectTo "127.0.0.1" $ PortNumber $ fromIntegral port
@ -269,13 +266,13 @@ spec = do
["0\r\n\r\n"]
mapM_ (\bs -> hPutStr handle bs >> hFlush handle) input
hClose handle
threadDelay 1000
threadDelay 100000 -- FIXME why does this delay need to be so high?
front <- I.readIORef ifront
front [] `shouldBe` replicate 2 (S.concat $ replicate 50 "12345")
it "in chunks" $ do
ifront <- I.newIORef id
let app req = do
bss <- requestBody req $$ Data.Conduit.List.consume
bss <- consumeBody $ requestBody req
liftIO $ I.atomicModifyIORef ifront $ \front -> (front . (S.concat bss:), ())
return $ responseLBS status200 [] ""
withApp defaultSettings app $ \port -> do
@ -297,10 +294,13 @@ spec = do
it "timeout in request body" $ do
ifront <- I.newIORef id
let app req = do
bss <- (requestBody req $$ Data.Conduit.List.consume) `onException`
bss <- (consumeBody $ requestBody req) `onException`
liftIO (I.atomicModifyIORef ifront (\front -> (front . ("consume interrupted":), ())))
liftIO $ threadDelay 4000000 `onException`
I.atomicModifyIORef ifront (\front -> (front . ("threadDelay interrupted":), ()))
liftIO $ threadDelay 4000000 `E.catch` \e -> do
I.atomicModifyIORef ifront (\front ->
( front . ((S8.pack $ "threadDelay interrupted: " ++ show e):)
, ()))
E.throwIO (e :: E.SomeException)
liftIO $ I.atomicModifyIORef ifront $ \front -> (front . (S.concat bss:), ())
return $ responseLBS status200 [] ""
withApp (setTimeout 1 defaultSettings) app $ \port -> do
@ -324,10 +324,13 @@ spec = do
it "works" $ do
let app _req = do
let backup = responseLBS status200 [] "Not raw"
return $ flip responseRaw backup $ \src sink ->
src
$$ Data.Conduit.List.map doubleBS
=$ sink
return $ flip responseRaw backup $ \src sink -> do
let loop = do
bs <- src
unless (S.null bs) $ do
sink $ doubleBS bs
loop
loop
doubleBS = S.concatMap $ \w -> S.pack [w, w]
withApp defaultSettings app $ \port -> do
handle <- connectTo "127.0.0.1" $ PortNumber $ fromIntegral port
@ -351,8 +354,13 @@ spec = do
`shouldBe` ["date"]
it "streaming echo #249" $ do
let app req = return $ responseSource status200 []
$ requestBody req $= CL.map (Chunk . fromByteString)
let app req = return $ responseStream status200 [] $ \write -> do
let loop = do
bs <- requestBody req
unless (S.null bs) $ do
write $ Just $ fromByteString bs
loop
loop
withApp defaultSettings app $ \port -> do
(socket, _addr) <- getSocketTCP "127.0.0.1" port
sendAll socket "POST / HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n"
@ -360,3 +368,13 @@ spec = do
sendAll socket "5\r\nhello\r\n0\r\n\r\n"
bs <- safeRecv socket 4096
S.takeWhile (/= 13) bs `shouldBe` "HTTP/1.1 200 OK"
consumeBody :: IO ByteString -> IO [ByteString]
consumeBody body =
loop id
where
loop front = do
bs <- body
if S.null bs
then return $ front []
else loop $ front . (bs:)

View File

@ -1,5 +1,5 @@
Name: warp
Version: 2.1.4.1
Version: 3.0.0
Synopsis: A fast, light-weight web server for WAI applications.
License: MIT
License-file: LICENSE
@ -39,20 +39,16 @@ Library
Build-Depends: base >= 3 && < 5
, array
, blaze-builder >= 0.3.3 && < 0.4
, blaze-builder-conduit >= 0.5 && < 1.2
, bytestring >= 0.9.1.4
, case-insensitive >= 0.2
, conduit >= 0.5 && < 1.2
, conduit-extra
, ghc-prim
, http-types >= 0.7
, lifted-base >= 0.1
, network-conduit >= 0.5 && < 1.2
, simple-sendfile >= 0.2.7 && < 0.3
, transformers >= 0.2.2 && < 0.4
, unix-compat >= 0.2
, void
, wai >= 2.1 && < 2.2
, wai >= 3.0 && < 3.1
, text
, streaming-commons
if flag(network-bytestring)
@ -125,15 +121,12 @@ Test-Suite spec
Build-Depends: base >= 4 && < 5
, array
, blaze-builder >= 0.3.3 && < 0.4
, blaze-builder-conduit >= 0.5
, bytestring >= 0.9.1.4
, case-insensitive >= 0.2
, conduit >= 0.5
, ghc-prim
, HTTP
, http-types >= 0.8.4
, lifted-base >= 0.1
, network-conduit
, simple-sendfile >= 0.2.4 && < 0.3
, transformers >= 0.2.2 && < 0.4
, unix-compat >= 0.2
@ -146,7 +139,6 @@ Test-Suite spec
, time
, old-locale
, text
, conduit-extra
, streaming-commons >= 0.1.1
, async