Split the server combinators in a separate module

This commit is contained in:
Harendra Kumar 2019-05-31 12:43:26 +05:30
parent e347812091
commit a2e76766c3
3 changed files with 13 additions and 79 deletions

View File

@ -13,6 +13,7 @@ import qualified Streamly.FileSystem.File as File
import qualified Streamly.Fold as FL
import qualified Streamly.Mem.Array as A
import qualified Streamly.Network.Socket as NS
import qualified Streamly.Network.Server as NS
import qualified Streamly.Prelude as S
main :: IO ()
@ -22,7 +23,7 @@ main = do
$ encodeChar8Unchecked
$ S.concatMap A.read
$ S.concatMapBy parallel recv
$ NS.recvConnectionsOn 8090
$ NS.connectionsOnAllAddrs 8090
where

View File

@ -32,13 +32,8 @@
module Streamly.Network.Socket
(
-- ** Listen for Connections
ServerSpec(..)
, recvConnectionsWith
, recvConnectionsOn
-- ** Read a stream from a connection
, fromSocket
-- * Read from connection
fromSocket
, read
-- , readUtf8
-- , readLines
@ -53,7 +48,7 @@ module Streamly.Network.Socket
-- , readArraysOf
, readArrays
-- ** Write a stream to a connection
-- * Write to connection
, write
-- , writeUtf8
-- , writeUtf8ByLines
@ -63,6 +58,8 @@ module Streamly.Network.Socket
-- -- * Array Write
, writeArray
, writeArrays
-- reading/writing datagrams
)
where
@ -76,10 +73,7 @@ import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr, Ptr, castPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import Network.Socket
(Socket, PortNumber, SocketOption(..), Family(..),
SockAddr(..), withSocketsDo, SocketType(..), socket, accept, bind,
defaultProtocol, setSocketOption, maxListenQueue, sendBuf, recvBuf)
import Network.Socket (Socket, sendBuf, recvBuf)
#if MIN_VERSION_network(3,1,0)
import Network.Socket (withFdSocket)
#else
@ -89,7 +83,6 @@ import Prelude hiding (read)
import qualified Network.Socket as Net
import Streamly (MonadAsync)
import Streamly.Mem.Array.Types (Array(..))
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK.Type (IsStream, mkStream)
@ -100,71 +93,6 @@ import qualified Streamly.Mem.Array as A
import qualified Streamly.Mem.Array.Types as A hiding (flattenArrays)
import qualified Streamly.Prelude as S
-------------------------------------------------------------------------------
-- Listen
-------------------------------------------------------------------------------
-- | Specify the configuration of a server.
data ServerSpec = ServerSpec
{
serverAddressFamily :: !Family
, serverAddress :: !SockAddr
, serverSockOpts :: ![(SocketOption, Int)]
}
initListener :: Int -> ServerSpec -> IO Socket
initListener tcpListenQ ServerSpec{..} =
withSocketsDo $ do
sock <- socket serverAddressFamily Stream defaultProtocol
mapM_ (\(opt, val) -> setSocketOption sock opt val) serverSockOpts
bind sock serverAddress
Net.listen sock tcpListenQ
return sock
-- | Start a TCP stream server. The server listens for connections on the
-- supplied address (address family, local interface IP address and port) and
-- generates a stream of connected sockets and the endpoint they are connected
-- to. The first argument is the maximum number of pending connections in the
-- backlog.
{-# INLINE recvConnectionTuplesWith #-}
recvConnectionTuplesWith :: MonadAsync m
=> Int -> ServerSpec -> SerialT m (Socket, SockAddr)
recvConnectionTuplesWith tcpListenQ opts = S.unfoldrM step Nothing
where
step Nothing = do
listener <- liftIO $ initListener tcpListenQ opts
r <- liftIO $ accept listener
-- XXX error handling
return $ Just (r, Just listener)
step (Just listener) = do
r <- liftIO $ accept listener
-- XXX error handling
return $ Just (r, Just listener)
{-# INLINE recvConnectionsWith #-}
recvConnectionsWith :: MonadAsync m => Int -> ServerSpec -> SerialT m Socket
recvConnectionsWith tcpListenQ opts = fmap fst $
recvConnectionTuplesWith tcpListenQ opts
{-# INLINE recvConnectionTuplesOn #-}
recvConnectionTuplesOn :: MonadAsync m
=> PortNumber -> SerialT m (Socket, SockAddr)
recvConnectionTuplesOn port =
recvConnectionTuplesWith maxListenQueue ServerSpec
{ serverAddressFamily = AF_INET
, serverAddress = SockAddrInet port 0
, serverSockOpts = [(NoDelay,1), (ReuseAddr,1)]
}
{-# INLINE recvConnectionsOn #-}
recvConnectionsOn :: MonadAsync m => PortNumber -> SerialT m Socket
recvConnectionsOn = fmap fst . recvConnectionTuplesOn
-- | Read a stream of Word8 from a socket, closing the socket when done.
fromSocket :: (MonadCatch m, MonadIO m) => Socket -> SerialT m Word8
fromSocket sk = S.finally (liftIO (Net.close sk)) (read sk)
-------------------------------------------------------------------------------
-- Array IO (Input)
-------------------------------------------------------------------------------
@ -307,6 +235,10 @@ readByChunksUpto chunkSize h = A.flattenArrays $ readArraysUpto chunkSize h
read :: (IsStream t, MonadIO m) => Socket -> t m Word8
read = A.flattenArrays . readArrays
-- | Read a stream of Word8 from a socket, closing the socket when done.
fromSocket :: (MonadCatch m, MonadIO m) => Socket -> SerialT m Word8
fromSocket sk = S.finally (liftIO (Net.close sk)) (read sk)
-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------

View File

@ -218,6 +218,7 @@ library
, Streamly.FileSystem.File
, Streamly.FileSystem.Handle
, Streamly.Network.Socket
, Streamly.Network.Server
-- Time
, Streamly.Time.Units