Initial conduit 0.4 migration

Michael Snoyman 2012-03-27 10:32:58 +02:00
parent 335414874a
commit e16466acae
3 changed files with 102 additions and 95 deletions

View File

@@ -20,7 +20,7 @@ Library
Build-Depends: base >= 4 && < 5
, bytestring >= 0.9.1.4 && < 0.10
, blaze-builder >= 0.2.1.4 && < 0.4
, conduit >= 0.3 && < 0.4
, conduit >= 0.4 && < 0.5
, network >= 2.2.1.5 && < 2.4
, http-types >= 0.6 && < 0.7
, text >= 0.7 && < 0.12

View File

@@ -95,7 +95,7 @@ import Network.Sendfile
import qualified System.PosixCompat.Files as P
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Timeout as T
import Timeout (Manager, registerKillThread, pause, resume)
import Data.Word (Word8)
@@ -210,6 +210,65 @@ runSettingsConnection set getConn app = do
T.cancel th
return ()
-- | Contains a @Source@ and the number of bytes still to be read from it.
newtype IsolatedBSSource = IsolatedBSSource (I.IORef (Int, C.Source (ResourceT IO) ByteString))
-- | Given an @IsolatedBSSource@, provide a @Source@ that only allows up to
-- the specified number of bytes to be passed downstream. All leftovers
-- should be retained within the @Source@. If there are not enough bytes
-- available, a @ConnectionClosedByPeer@ exception is thrown.
ibsIsolate :: IsolatedBSSource -> C.Source (ResourceT IO) ByteString
ibsIsolate ibs@(IsolatedBSSource ref) =
C.PipeM pull (return ())
where
pull = do
(count, src) <- liftIO $ I.readIORef ref
if count == 0
-- No more bytes wanted downstream, so we're done.
then return $ C.Done Nothing ()
else do
-- Get the next chunk (if available) and the updated source
(src', mbs) <- src C.$$& CL.head
-- If no chunk is available, then there aren't enough bytes in the
-- stream; throw a ConnectionClosedByPeer exception.
bs <- maybe (liftIO $ throwIO ConnectionClosedByPeer) return mbs
let -- How many of the bytes in this chunk to send downstream
toSend = min count (S.length bs)
-- How many bytes will still remain to be sent downstream
count' = count - toSend
case () of
()
-- The expected count is greater than the size of the
-- chunk we just read. Send the entire chunk
-- downstream, and then loop on this function for the
-- next chunk.
| count' > 0 -> do
liftIO $ I.writeIORef ref (count', src')
return $ C.HaveOutput (ibsIsolate ibs) (return ()) bs
-- The expected count is the total size of the chunk we
-- just read. Send this chunk downstream, and then
-- terminate the stream.
| count == S.length bs -> do
liftIO $ I.writeIORef ref (count', src')
return $ C.HaveOutput (C.Done Nothing ()) (return ()) bs
-- Some of the bytes in this chunk should not be sent
-- downstream. Split up the chunk into the sent and
-- not-sent parts, add the not-sent parts onto the new
-- source, and send the rest of the chunk downstream.
| otherwise -> do
let (x, y) = S.splitAt toSend bs
liftIO $ I.writeIORef ref (count', C.HaveOutput src' (return ()) y)
return $ C.HaveOutput (C.Done Nothing ()) (return ()) x
-- | Extract the underlying @Source@ from an @IsolatedBSSource@; the returned
-- @Source@ performs no further isolation.
ibsDone :: IsolatedBSSource -> IO (C.Source (ResourceT IO) ByteString)
ibsDone (IsolatedBSSource ref) = fmap snd $ I.readIORef ref
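
The case analysis above is driven by one invariant: never hand more than the recorded count downstream, and keep any split-off remainder with the source. A minimal sketch of that counting logic, modelled with an IORef over a plain list of chunks rather than a conduit Source (pullIsolated, ModelSource, and main are illustrative names, not part of this commit):

import qualified Data.ByteString as S
import Data.ByteString (ByteString)
import Data.IORef

-- (bytes still wanted downstream, chunks not yet consumed)
type ModelSource = IORef (Int, [ByteString])

-- Pull at most the remaining number of bytes, keeping any leftover in the IORef.
pullIsolated :: ModelSource -> IO [ByteString]
pullIsolated ref = do
    (count, chunks) <- readIORef ref
    if count == 0
        then return []                           -- downstream wants nothing more
        else case chunks of
            []      -> ioError (userError "ConnectionClosedByPeer")
            bs:rest -> do
                let toSend = min count (S.length bs)   -- bytes to pass downstream
                    count' = count - toSend
                    (x, y) = S.splitAt toSend bs       -- y stays with the source
                writeIORef ref (count', if S.null y then rest else y : rest)
                fmap (x :) (pullIsolated ref)

main :: IO ()
main = do
    ref <- newIORef (5, map S.pack [[1,2,3],[4,5,6]])
    out <- pullIsolated ref
    (left, rest) <- readIORef ref
    print (map S.unpack out, left, map S.unpack rest)  -- ([[1,2,3],[4,5]],0,[[6]])
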
serveConnection :: Settings
-> T.Handle
-> (SomeException -> IO ())
@@ -223,11 +282,11 @@ serveConnection settings th onException port app conn remoteHost' =
where
serveConnection' :: ResourceT IO ()
serveConnection' = do
fromClient <- C.bufferSource $ connSource conn th
let fromClient = connSource conn th
serveConnection'' fromClient
serveConnection'' fromClient = do
(env, requireDone) <- parseRequest conn port remoteHost' fromClient
(env, ibs) <- parseRequest conn port remoteHost' fromClient
case settingsIntercept settings env of
Nothing -> do
-- Let the application run for as long as it wants
@@ -235,25 +294,22 @@ serveConnection settings th onException port app conn remoteHost' =
res <- app env
-- flush the rest of the request body
requestBody env C.$$ CL.sinkNull
liftIO requireDone
ibsIsolate ibs C.$$ CL.sinkNull
fromClient' <- liftIO $ ibsDone ibs
liftIO $ T.resume th
keepAlive <- sendResponse th env conn res
when keepAlive $ serveConnection'' fromClient
when keepAlive $ serveConnection'' fromClient'
Just intercept -> do
liftIO $ T.pause th
intercept fromClient conn
-- Require that the entire request body has been consumed
type RequireDone = IO ()
parseRequest :: Connection -> Port -> SockAddr
-> C.BufferedSource (ResourceT IO) S.ByteString
-> ResourceT IO (Request, RequireDone)
parseRequest conn port remoteHost' src = do
headers' <- src C.$$ takeHeaders
parseRequest' conn port headers' remoteHost' src
-> C.Source (ResourceT IO) S.ByteString
-> ResourceT IO (Request, IsolatedBSSource)
parseRequest conn port remoteHost' src1 = do
(src2, headers') <- src1 C.$$& takeHeaders
parseRequest' conn port headers' remoteHost' src2
-- FIXME come up with good values here
bytesPerRead, maxTotalHeaderLength :: Int
@@ -289,8 +345,8 @@ parseRequest' :: Connection
-> Port
-> [ByteString]
-> SockAddr
-> C.BufferedSource (ResourceT IO) S.ByteString
-> ResourceT IO (Request, RequireDone)
-> C.Source (ResourceT IO) S.ByteString -- FIXME was buffered
-> ResourceT IO (Request, IsolatedBSSource)
parseRequest' _ _ [] _ _ = throwIO $ NotEnoughLines []
parseRequest' conn port (firstLine:otherLines) remoteHost' src = do
(method, rpath', gets, httpversion) <- parseFirst firstLine
@@ -307,64 +363,9 @@ parseRequest' conn port (firstLine:otherLines) remoteHost' src = do
Nothing -> 0
Just bs -> readInt bs
let serverName' = takeUntil 58 host -- ':'
(rbody, requireDone) <-
if len0 == 0
then return (mempty, return ())
else do
-- We can't use the standard isolate, as its counter is not
-- kept in a mutable variable.
lenRef <- liftIO $ I.newIORef len0
let isolate = C.NeedInput push C.Closed
push bs = flip C.ConduitM (return ()) $ do
len <- liftIO $ I.readIORef lenRef
let (a, b) = S.splitAt len bs
len' = len - S.length a
liftIO $ I.writeIORef lenRef len'
return $ if len' == 0
then
let mleftover = if S.null b then Nothing else Just b
final = C.Finished mleftover
in if S.null a
then final
else C.HaveOutput final (return ()) a
else C.HaveOutput isolate (return ()) a
-- Make sure that we don't connect to the source after the
-- isolate conduit closes.
--
-- Here's the issue: we fuse our buffered request body with
-- an isolate conduit which ensures no more than X bytes
-- are read. Suppose we read all X bytes, and then we call
-- requestBody again. What happens?
--
-- Previously, we would try to read one more chunk from the
-- buffered source. This is inherent to conduit: we
-- wouldn't know that the isolate Conduit isn't accepting
-- more data until after we've pushed some data to it. This
-- results in hanging, since there's no data available on
-- the wire.
--
-- Instead, we add a wrapper that checks if the request
-- body has already been depleted before making that first
-- pull.
--
-- Possible optimization: do away with the Conduit
-- entirely. However, this may be less efficient overall,
-- as we'd now have to check the BufferedSource status on
-- each call. Worth looking into.
wrap src' = C.SourceM
(do
len <- liftIO $ I.readIORef lenRef
if len <= 0
then return C.Closed
else return src')
(return ())
requireDone = do
len <- liftIO $ I.readIORef lenRef
when (len > 0) $ throwIO ConnectionClosedByPeer
return (wrap $ src C.$= isolate, requireDone)
(rbody, ibs) <- liftIO $ do
ibs <- fmap IsolatedBSSource $ I.newIORef (len0, src)
return (ibsIsolate ibs, ibs)
return (Request
{ requestMethod = method
, httpVersion = httpversion
@@ -379,7 +380,7 @@ parseRequest' conn port (firstLine:otherLines) remoteHost' src = do
, remoteHost = remoteHost'
, requestBody = rbody
, vault = mempty
}, requireDone)
}, ibs)
takeUntil :: Word8 -> ByteString -> ByteString
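
The comment block removed above explains why the old code needed a depletion check before pulling: reading from the socket after the request body is already exhausted would block forever, since no more data is coming. The new ibsIsolate keeps the same guard by checking its counter before pulling. A minimal sketch of that guard in isolation (guardedPull and the IORef counter are illustrative, not Warp code):

import Data.IORef
import Data.ByteString (ByteString)
import qualified Data.ByteString as S

-- Consult the remaining-byte counter before calling the underlying read,
-- which may block waiting for data that will never arrive.
guardedPull :: IORef Int            -- bytes of request body still expected
            -> IO ByteString        -- underlying read action (may block)
            -> IO (Maybe ByteString)
guardedPull lenRef recv = do
    len <- readIORef lenRef
    if len <= 0
        then return Nothing         -- body depleted: never touch the socket again
        else do
            bs <- recv
            writeIORef lenRef (len - S.length bs)
            return (Just bs)
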
@@ -533,7 +534,7 @@ sendResponse th req conn r = sendResponse' r
T.tickle th
return isPersist
where
body = fmap (\x -> case x of
body = fmap2 (\x -> case x of
C.Flush -> flush
C.Chunk builder -> builder) bodyFlush
headers' = headers version s hs
@@ -543,7 +544,13 @@ sendResponse th req conn r = sendResponse' r
chunk :: C.Conduit Builder (ResourceT IO) Builder
chunk = C.NeedInput push close
push x = C.HaveOutput chunk (return ()) (chunkedTransferEncoding x)
close = C.Open C.Closed (return ()) chunkedTransferTerminator
close = C.HaveOutput (C.Done Nothing ()) (return ()) chunkedTransferTerminator
fmap2 :: Functor m => (o1 -> o2) -> C.Pipe i o1 m r -> C.Pipe i o2 m r
fmap2 f (C.HaveOutput p c o) = C.HaveOutput (fmap2 f p) c (f o)
fmap2 f (C.NeedInput p c) = C.NeedInput (fmap2 f . p) (fmap2 f c)
fmap2 f (C.PipeM mp c) = C.PipeM (fmap (fmap2 f) mp) c
fmap2 _ (C.Done i x) = C.Done i x
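
The fmap2 helper above maps a function over everything the pipe outputs, presumably because Pipe's own Functor instance maps the final result rather than the output stream. A self-contained miniature of the same recursion (MiniPipe is an illustrative stand-in, not conduit's actual type):

-- The four shapes handled by fmap2 above.
data MiniPipe i o m r
    = HaveOutput (MiniPipe i o m r) (m ()) o   -- yield o, with a finalizer
    | NeedInput (i -> MiniPipe i o m r) (MiniPipe i o m r)
    | Done (Maybe i) r                         -- finished, maybe with leftover input
    | PipeM (m (MiniPipe i o m r)) (m ())      -- a monadic step, with a finalizer

-- Map over every value the pipe produces, leaving inputs and the result alone.
mapOutput :: Functor m => (o1 -> o2) -> MiniPipe i o1 m r -> MiniPipe i o2 m r
mapOutput f (HaveOutput p c o) = HaveOutput (mapOutput f p) c (f o)
mapOutput f (NeedInput p c)    = NeedInput (mapOutput f . p) (mapOutput f c)
mapOutput f (PipeM mp c)       = PipeM (fmap (mapOutput f) mp) c
mapOutput _ (Done i r)         = Done i r
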
parseHeaderNoAttr :: ByteString -> H.Header
parseHeaderNoAttr s =
@@ -559,27 +566,27 @@ connSource :: Connection -> T.Handle -> C.Source (ResourceT IO) ByteString
connSource Connection { connRecv = recv } th =
src
where
src = C.SourceM (do
src = C.PipeM (do
bs <- liftIO recv
if S.null bs
then return C.Closed
then return $ C.Done Nothing ()
else do
when (S.length bs >= 2048) $ liftIO $ T.tickle th
return (C.Open src (return ()) bs))
return (C.HaveOutput src (return ()) bs))
(return ())
-- | Use 'connSendAll' to send this data while respecting timeout rules.
connSink :: Connection -> T.Handle -> C.Sink B.ByteString (ResourceT IO) ()
connSink Connection { connSendAll = send } th =
C.Processing push close
sink
where
sink = C.NeedInput push close
close = liftIO (T.resume th)
push x = C.SinkM $ do
liftIO $ do
T.resume th
send x
T.pause th
return (C.Processing push close)
push x = C.PipeM (liftIO $ do
T.resume th
send x
T.pause th
return sink) (liftIO $ T.resume th)
-- We pause timeouts before passing control back to user code. This ensures
-- that a timeout will only ever be executed when Warp is in control. We
-- also make sure to resume the timeout after the completion of user code
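
The comment above describes the discipline connSink follows: the timeout stays paused while user code runs and is only armed around Warp's own send. A minimal model of that discipline, using a plain IORef flag in place of Warp's Timeout handle (ModelHandle and withTimeoutArmed are illustrative names):

import Control.Exception (bracket_)
import Data.IORef

-- A stand-in for a timeout handle: True means the timeout is armed.
newtype ModelHandle = ModelHandle (IORef Bool)

pauseM, resumeM :: ModelHandle -> IO ()
pauseM  (ModelHandle ref) = writeIORef ref False
resumeM (ModelHandle ref) = writeIORef ref True

-- Arm the timeout only for the duration of a socket operation, so it can
-- never fire while control is with user code.
-- Usage in the spirit of connSink: withTimeoutArmed h (send bytes)
withTimeoutArmed :: ModelHandle -> IO a -> IO a
withTimeoutArmed h = bracket_ (resumeM h) (pauseM h)
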
@@ -599,7 +606,7 @@ data Settings = Settings
, settingsHost :: HostPreference -- ^ Default value: HostIPv4
, settingsOnException :: SomeException -> IO () -- ^ What to do with exceptions thrown by either the application or server. Default: ignore server-generated exceptions (see 'InvalidRequest') and print application-generated exceptions to stderr.
, settingsTimeout :: Int -- ^ Timeout value in seconds. Default value: 30
, settingsIntercept :: Request -> Maybe (C.BufferedSource (ResourceT IO) S.ByteString -> Connection -> ResourceT IO ())
, settingsIntercept :: Request -> Maybe (C.Source (ResourceT IO) S.ByteString -> Connection -> ResourceT IO ())
, settingsManager :: Maybe Manager -- ^ Use an existing timeout manager instead of spawning a new one. If used, 'settingsTimeout' is ignored. Default is 'Nothing'
}
@@ -635,7 +642,7 @@ data THStatus = THStatus
takeHeaders :: C.Sink ByteString (ResourceT IO) [ByteString]
takeHeaders =
C.Processing (push (THStatus 0 id id)) close
C.NeedInput (push (THStatus 0 id id)) close
where
close = throwIO IncompleteHeaders
@@ -650,7 +657,7 @@ takeHeaders =
let len' = len + bsLen
prepend' = prepend . S.append bs
status = THStatus len' lines prepend'
in C.Processing (push status) close
in C.NeedInput (push status) close
-- Found a newline at position end.
Just end ->
let start = end + 1 -- start of next chunk
@@ -678,7 +685,7 @@ takeHeaders =
then let bs' = SU.unsafeDrop start bs
in push status bs'
-- no more bytes in this chunk, ask for more
else C.Processing (push status) close
else C.NeedInput (push status) close
where
bsLen = S.length bs
mnl = S.elemIndex 10 bs
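
takeHeaders accumulates incoming chunks, peeling off a header line at every newline and stopping at the blank line that ends the header block; THStatus carries the running byte count, the finished lines, and the partial line so far. A simplified, pure model of that accumulation (takeHeadersModel is illustrative only; the real sink also tracks the total length, presumably against maxTotalHeaderLength, uses unsafe slicing for speed, and keeps bytes after the blank line around for the request body, which this model discards):

import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import Data.ByteString (ByteString)

-- Feed chunks one at a time; a complete line is emitted at every newline, and
-- an empty line ends the header block. Nothing models IncompleteHeaders.
takeHeadersModel :: [ByteString] -> Maybe [ByteString]
takeHeadersModel = go S.empty []
  where
    go pending hdrs chunks =
        case S.elemIndex 10 pending of                   -- 10 is '\n'
            Just end ->
                let line = S.takeWhile (/= 13) (S.take end pending)  -- drop '\r'
                    rest = S.drop (end + 1) pending
                in if S.null line
                       then Just (reverse hdrs)          -- blank line: headers finished
                       else go rest (line : hdrs) chunks
            Nothing ->
                case chunks of
                    []   -> Nothing                      -- ran out of input mid-headers
                    c:cs -> go (S.append pending c) hdrs cs

main :: IO ()
main = print $ takeHeadersModel
    (map S8.pack ["GET / HT", "TP/1.1\r\nHost: example.com\r\n", "\r\nbody"])
    -- Just ["GET / HTTP/1.1","Host: example.com"]
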

View File

@@ -21,9 +21,9 @@ Library
, bytestring >= 0.9.1.4 && < 0.10
, wai >= 1.2 && < 1.3
, transformers >= 0.2.2 && < 0.3
, conduit >= 0.3 && < 0.4
, network-conduit >= 0.3 && < 0.4
, blaze-builder-conduit >= 0.3 && < 0.4
, conduit >= 0.4 && < 0.5
, network-conduit >= 0.4 && < 0.5
, blaze-builder-conduit >= 0.4 && < 0.5
, lifted-base >= 0.1 && < 0.2
, blaze-builder >= 0.2.1.4 && < 0.4
, simple-sendfile >= 0.1 && < 0.3