Export forSocketM, rename transformBytesWith

This commit is contained in:
Harendra Kumar 2021-04-22 00:04:21 +05:30
parent 22a42d081a
commit ae1ffbf520
6 changed files with 39 additions and 32 deletions

View File

@ -68,7 +68,7 @@ module Streamly.Internal.Network.Inet.TCP
, fromChunks
-- ** Transformation
, transformBytesWith
, processBytes
{-
-- ** Sink Servers
@ -446,12 +446,11 @@ withInputConnect addr port input f = S.bracket pre post handler
--
-- /Pre-release/
--
{-# INLINABLE transformBytesWith #-}
transformBytesWith
{-# INLINABLE processBytes #-}
processBytes
:: (IsStream t, MonadAsync m, MonadCatch m)
=> (Word8, Word8, Word8, Word8)
-> PortNumber
-> SerialT m Word8
-> t m Word8
transformBytesWith addr port input =
withInputConnect addr port input ISK.toBytes
processBytes addr port input = withInputConnect addr port input ISK.toBytes

View File

@ -13,8 +13,8 @@ module Streamly.Internal.Network.Socket
(
SockSpec (..)
-- * Use a socket
, handleWithM
, handleWith
, forSocketM
, withSocket
-- * Accept connections
, accept
@ -105,25 +105,27 @@ import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
-- | @'handleWithM' socket act@ runs the monadic computation @act@ passing the
-- socket handle to it. The handle will be closed on exit from 'handleWithM',
-- whether by normal termination or by raising an exception. If closing the
-- handle raises an exception, then this exception will be raised by
-- 'handleWithM' rather than any exception raised by 'act'.
-- | @'forSocketM' action socket@ runs the monadic computation @action@ passing
-- the socket handle to it. The handle will be closed on exit from
-- 'forSocketM', whether by normal termination or by raising an exception. If
-- closing the handle raises an exception, then this exception will be raised
-- by 'forSocketM' rather than any exception raised by 'action'.
--
-- @since 0.7.0
{-# INLINE handleWithM #-}
handleWithM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
handleWithM f sk = finally (f sk) (liftIO (Net.close sk))
-- @since 0.8.0
{-# INLINE forSocketM #-}
forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
forSocketM f sk = finally (f sk) (liftIO (Net.close sk))
-- | Like 'handleWithM' but runs a streaming computation instead of a monadic
-- | Like 'forSocketM' but runs a streaming computation instead of a monadic
-- computation.
--
-- @since 0.7.0
{-# INLINE handleWith #-}
handleWith :: (IsStream t, MonadAsync m, MonadCatch m)
-- /Inhibits stream fusion/
--
-- /Internal/
{-# INLINE withSocket #-}
withSocket :: (IsStream t, MonadAsync m, MonadCatch m)
=> Socket -> (Socket -> t m a) -> t m a
handleWith sk f = S.finally (liftIO $ Net.close sk) (f sk)
withSocket sk f = S.finally (liftIO $ Net.close sk) (f sk)
-------------------------------------------------------------------------------
-- Accept (Unfolds)

View File

@ -22,6 +22,10 @@ module Streamly.Network.Inet.TCP
-- * Connect to Servers
, connect
-- XXX Expose this as a pipe when we have pipes.
-- * Transformation
-- , processBytes
{-
-- ** Sink Servers

View File

@ -2,7 +2,7 @@
-- Module : Streamly.Network.Socket
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
@ -32,7 +32,6 @@
--
-- import Data.Function ((&))
-- import Network.Socket
-- import Streamly.Internal.Network.Socket (handleWithM)
-- import Streamly.Network.Socket (SockSpec(..))
--
-- import qualified Streamly.Prelude as Stream
@ -52,8 +51,8 @@
--
-- server spec addr =
-- Stream.unfold Socket.accept (maxListenQueue, spec, addr) -- ParallelT IO Socket
-- & Stream.mapM (handleWithM echo) -- ParallelT IO ()
-- & fromParallel -- SerialT IO ()
-- & Stream.mapM (Socket.forSocketM echo) -- ParallelT IO ()
-- & Stream.fromParallel -- SerialT IO ()
-- & Stream.drain -- IO ()
--
-- echo sk =
@ -64,12 +63,12 @@
-- = Programmer Notes
--
-- Read IO requests to connected stream sockets are performed in chunks of
-- 'Streamly.Internal.Data.Array.Foreign.Type.defaultChunkSize'. Unless specified
-- otherwise in the API, writes are collected into chunks of
-- 'Streamly.Internal.Data.Array.Foreign.Type.defaultChunkSize'. Unless
-- specified otherwise in the API, writes are collected into chunks of
-- 'Streamly.Internal.Data.Array.Foreign.Type.defaultChunkSize' before they are
-- written to the socket. APIs are provided to control the chunking behavior.
--
-- > import qualified Streamly.Network.Socket as SK
-- > import qualified Streamly.Network.Socket as Socket
--
-- = See Also
--
@ -113,6 +112,9 @@ module Streamly.Network.Socket
, writeChunks
, writeChunksWithBufferOf
, writeChunk
-- * Exceptions
, forSocketM
)
where

View File

@ -72,7 +72,7 @@ server
server listener port sem handler = do
putMVar sem ()
Stream.fromSerial (Stream.unfold listener port)
& (Stream.fromAsync . Stream.mapM (Socket.handleWithM handler))
& (Stream.fromAsync . Stream.mapM (Socket.forSocketM handler))
& Stream.drain
remoteAddr :: (Word8,Word8,Word8,Word8)
@ -85,7 +85,7 @@ sender port sem = do
Stream.replicate 1000 testData -- SerialT IO String
& Stream.concatMap Stream.fromList -- SerialT IO Char
& Unicode.encodeLatin1 -- SerialT IO Word8
& TCP.transformBytesWith remoteAddr port -- SerialT IO Word8
& TCP.processBytes remoteAddr port -- SerialT IO Word8
& Unicode.decodeLatin1 -- SerialT IO Char
execute

View File

@ -83,7 +83,7 @@ server :: PortNumber -> MVar () -> (Socket -> IO ()) -> IO ()
server port sem handler = do
putMVar sem ()
Stream.fromSerial (Stream.unfold TCP.acceptOnPort port)
& Stream.fromAsync . Stream.mapM (Socket.handleWithM handler)
& Stream.fromAsync . Stream.mapM (Socket.forSocketM handler)
& Stream.drain
remoteAddr :: (Word8,Word8,Word8,Word8)
@ -96,7 +96,7 @@ sender port sem = do
Stream.replicate 1000 testData -- SerialT IO String
& Stream.concatMap Stream.fromList -- SerialT IO Char
& Unicode.encodeLatin1 -- SerialT IO Word8
& TCP.transformBytesWith remoteAddr port -- SerialT IO Word8
& TCP.processBytes remoteAddr port -- SerialT IO Word8
& Unicode.decodeLatin1 -- SerialT IO Char
execute :: PortNumber -> Int -> (Socket -> IO ()) -> IO (SerialT IO Char)