Add pipeChunks transformation function

This commit is contained in:
He Zhenxing 2024-04-24 18:50:24 +08:00 committed by Harendra Kumar
parent b60aef3d51
commit 653127aa50

View File

@ -73,6 +73,7 @@ module Streamly.Internal.Network.Inet.TCP
-- ** Transformation
, pipeBytes
, pipeChunks
{-
-- ** Sink Servers
@ -461,19 +462,20 @@ withInputConnect
:: (MonadCatch m, MonadAsync m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m Word8
-> (Socket -> Stream m a)
-> Stream m a
withInputConnect addr port input f = S.bracket pre post handler
-> (Socket -> Stream m a)
-> (Socket -> Stream m a -> m ())
-> Stream m a
withInputConnect addr port input fread fwrite = S.bracket pre post handler
where
pre = do
sk <- liftIO $ connect addr port
tid <- fork (ISK.putBytes sk input)
tid <- fork (fwrite sk input)
return (sk, tid)
handler (sk, _) = f sk
handler (sk, _) = fread sk
-- XXX kill the thread immediately?
post (sk, _) = liftIO $ Net.close sk
@ -491,4 +493,17 @@ pipeBytes
-> PortNumber
-> Stream m Word8
-> Stream m Word8
pipeBytes addr port input = withInputConnect addr port input ISK.read
pipeBytes addr port input = withInputConnect addr port input ISK.read ISK.putBytes
-- | This is similar to pipeBytes, but works on chunks of data.
--
-- /Pre-release/
--
{-# INLINE pipeChunks #-}
pipeChunks
:: (MonadAsync m, MonadCatch m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> Stream m (Array Word8)
-> Stream m (Array Word8)
pipeChunks addr port input = withInputConnect addr port input ISK.readChunks ISK.putChunks