diff --git a/src/Streamly/Internal/Data/Array/Stream/Foreign.hs b/src/Streamly/Internal/Data/Array/Stream/Foreign.hs index 35077ab5a..b6dc04867 100644 --- a/src/Streamly/Internal/Data/Array/Stream/Foreign.hs +++ b/src/Streamly/Internal/Data/Array/Stream/Foreign.hs @@ -38,9 +38,6 @@ module Streamly.Internal.Data.Array.Stream.Foreign -- * Compaction , lpackArraysChunksOf -#if !defined(mingw32_HOST_OS) - , groupIOVecsOf -#endif , compact -- * Splitting @@ -70,10 +67,6 @@ import GHC.Ptr (Ptr(..)) import GHC.Types (SPEC(..)) import Prelude hiding (null, last, (!!), read, concat, unlines) -#if !defined(mingw32_HOST_OS) -import Streamly.Internal.System.IOVec.Type (IOVec(..)) -#endif - import Streamly.Internal.BaseCompat import Streamly.Internal.Data.Array.Foreign.Type (Array(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) @@ -220,22 +213,6 @@ lpackArraysChunksOf :: (MonadIO m, Storable a) lpackArraysChunksOf n fld = FL.map A.unsafeThaw $ AS.lpackArraysChunksOf n (FL.map A.unsafeFreeze fld) -#if !defined(mingw32_HOST_OS) - --- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream --- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in --- each array and a maximum of @maxEntries@ entries in each array. --- --- @since 0.7.0 -{-# INLINE_NORMAL groupIOVecsOf #-} -groupIOVecsOf :: MonadIO m - => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec) -groupIOVecsOf n maxIOVLen str = - D.map A.unsafeFreeze - $ AS.groupIOVecsOf n maxIOVLen - $ D.map A.unsafeThaw str -#endif - -- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a -- maximum specified size in bytes. -- diff --git a/src/Streamly/Internal/Data/Array/Stream/Mut/Foreign.hs b/src/Streamly/Internal/Data/Array/Stream/Mut/Foreign.hs index b93a3ba77..f55f92ae3 100644 --- a/src/Streamly/Internal/Data/Array/Stream/Mut/Foreign.hs +++ b/src/Streamly/Internal/Data/Array/Stream/Mut/Foreign.hs @@ -17,9 +17,6 @@ module Streamly.Internal.Data.Array.Stream.Mut.Foreign , packArraysChunksOf , SpliceState (..) , lpackArraysChunksOf -#if !defined(mingw32_HOST_OS) - , groupIOVecsOf -#endif , compact , compactLE , compactEQ @@ -33,13 +30,6 @@ import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad (when) import Data.Bifunctor (first) import Foreign.Storable (Storable(..)) -#if !defined(mingw32_HOST_OS) -import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) -import Foreign.Ptr (castPtr) -import Streamly.Internal.System.IOVec.Type (IOVec(..)) -import Streamly.Internal.Data.Array.Foreign.Mut.Type (length) -import Streamly.Internal.Data.SVar (adaptState) -#endif import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.Stream.Serial (SerialT) @@ -50,8 +40,6 @@ import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.Stream.StreamD as D -import Prelude hiding (length) - -- | @arraysOf n stream@ groups the elements in the input stream into arrays of -- @n@ elements each. -- @@ -245,77 +233,3 @@ compactGE :: -- (MonadIO m, Storable a) => Int -> SerialT m (Array a) -> SerialT m (Array a) compactGE _n _xs = undefined -- D.fromStreamD $ D.foldMany (compactGEFold n) (D.toStreamD xs) - -------------------------------------------------------------------------------- --- IOVec -------------------------------------------------------------------------------- - -#if !defined(mingw32_HOST_OS) -data GatherState s arr - = GatherInitial s - | GatherBuffering s arr Int - | GatherYielding arr (GatherState s arr) - | GatherFinish - --- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream --- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in --- each array and a maximum of @maxEntries@ entries in each array. --- --- @since 0.7.0 -{-# INLINE_NORMAL groupIOVecsOf #-} -groupIOVecsOf :: MonadIO m - => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec) -groupIOVecsOf n maxIOVLen (D.Stream step state) = - D.Stream step' (GatherInitial state) - - where - - {-# INLINE_LATE step' #-} - step' gst (GatherInitial st) = do - when (n <= 0) $ - -- XXX we can pass the module string from the higher level API - error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of " - ++ "groups [" ++ show n ++ "] must be a natural number" - when (maxIOVLen <= 0) $ - -- XXX we can pass the module string from the higher level API - error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of " - ++ "IOVec entries [" ++ show n ++ "] must be a natural number" - r <- step (adaptState gst) st - case r of - D.Yield arr s -> do - let p = unsafeForeignPtrToPtr (aStart arr) - len = MArray.byteLength arr - iov <- liftIO $ MArray.newArray maxIOVLen - iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p) - (fromIntegral len)) - if len >= n - then return $ D.Skip (GatherYielding iov' (GatherInitial s)) - else return $ D.Skip (GatherBuffering s iov' len) - D.Skip s -> return $ D.Skip (GatherInitial s) - D.Stop -> return D.Stop - - step' gst (GatherBuffering st iov len) = do - r <- step (adaptState gst) st - case r of - D.Yield arr s -> do - let p = unsafeForeignPtrToPtr (aStart arr) - alen = MArray.byteLength arr - len' = len + alen - if len' > n || length iov >= maxIOVLen - then do - iov' <- liftIO $ MArray.newArray maxIOVLen - iov'' <- liftIO $ MArray.unsafeSnoc iov' (IOVec (castPtr p) - (fromIntegral alen)) - return $ D.Skip (GatherYielding iov - (GatherBuffering s iov'' alen)) - else do - iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p) - (fromIntegral alen)) - return $ D.Skip (GatherBuffering s iov' len') - D.Skip s -> return $ D.Skip (GatherBuffering s iov len) - D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish) - - step' _ GatherFinish = return D.Stop - - step' _ (GatherYielding iov next) = return $ D.Yield iov next -#endif diff --git a/src/Streamly/Internal/FileSystem/FD.hs b/src/Streamly/Internal/FileSystem/FD.hs index 6e5cab440..81e027d78 100644 --- a/src/Streamly/Internal/FileSystem/FD.hs +++ b/src/Streamly/Internal/FileSystem/FD.hs @@ -138,7 +138,7 @@ import Streamly.Internal.Data.Stream.Serial (SerialT) import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream) #if !defined(mingw32_HOST_OS) -import Streamly.Internal.Data.Array.Stream.Foreign (groupIOVecsOf) +import Streamly.Internal.System.IOVec (groupIOVecsOf) import Streamly.Internal.Data.Stream.StreamD (toStreamD) import Streamly.Internal.Data.Stream.StreamD.Type (fromStreamD) import qualified Streamly.Internal.FileSystem.FDIO as RawIO hiding (write) diff --git a/src/Streamly/Internal/FileSystem/FDIO.hs b/src/Streamly/Internal/FileSystem/FDIO.hs index 3bde1695e..b83daba29 100644 --- a/src/Streamly/Internal/FileSystem/FDIO.hs +++ b/src/Streamly/Internal/FileSystem/FDIO.hs @@ -22,6 +22,7 @@ module Streamly.Internal.FileSystem.FDIO where import Control.Monad (when) +import Streamly.Internal.System.IOVec.Type (IOVec) #if !defined(mingw32_HOST_OS) import Control.Concurrent (threadWaitWrite) import Data.Int (Int64) @@ -30,7 +31,7 @@ import Foreign.C.Error (throwErrnoIfMinus1RetryMayBlock) import Foreign.C.Types (CBool(..)) #endif import System.Posix.Internals (c_write, c_safe_write) -import Streamly.Internal.System.IOVec.Type (IOVec, c_writev, c_safe_writev) +import Streamly.Internal.System.IOVec.Type (c_writev, c_safe_writev) #endif import Foreign.C.Types (CSize(..), CInt(..)) diff --git a/src/Streamly/Internal/System/IOVec.hs b/src/Streamly/Internal/System/IOVec.hs new file mode 100644 index 000000000..10d706045 --- /dev/null +++ b/src/Streamly/Internal/System/IOVec.hs @@ -0,0 +1,127 @@ +-- | +-- Module : Streamly.Internal.System.IOVec +-- Copyright : (c) 2019 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +-- Low level IO routines interfacing the operating system. +-- + +module Streamly.Internal.System.IOVec + ( IOVec(..) + , c_writev + , c_safe_writev +#if !defined(mingw32_HOST_OS) + , groupIOVecsOf + , groupIOVecsOfMut +#endif + ) +where + +#include "inline.hs" + +#if !defined(mingw32_HOST_OS) +import Control.Monad (when) +import Control.Monad.IO.Class (MonadIO(..)) +import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.Ptr (castPtr) +import Streamly.Internal.Data.Array.Foreign.Mut.Type (length) +import Streamly.Internal.Data.SVar (adaptState) +import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..)) + +import qualified Streamly.Internal.Data.Array.Foreign.Type as Array +import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray +import qualified Streamly.Internal.Data.Stream.StreamD as D +#endif + +import Streamly.Internal.System.IOVec.Type + +import Prelude hiding (length) + +#if !defined(mingw32_HOST_OS) +data GatherState s arr + = GatherInitial s + | GatherBuffering s arr Int + | GatherYielding arr (GatherState s arr) + | GatherFinish + +-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream +-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in +-- each array and a maximum of @maxEntries@ entries in each array. +-- +-- @since 0.7.0 +{-# INLINE_NORMAL groupIOVecsOfMut #-} +groupIOVecsOfMut :: MonadIO m + => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec) +groupIOVecsOfMut n maxIOVLen (D.Stream step state) = + D.Stream step' (GatherInitial state) + + where + + {-# INLINE_LATE step' #-} + step' gst (GatherInitial st) = do + when (n <= 0) $ + -- XXX we can pass the module string from the higher level API + error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of " + ++ "groups [" ++ show n ++ "] must be a natural number" + when (maxIOVLen <= 0) $ + -- XXX we can pass the module string from the higher level API + error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of " + ++ "IOVec entries [" ++ show n ++ "] must be a natural number" + r <- step (adaptState gst) st + case r of + D.Yield arr s -> do + let p = unsafeForeignPtrToPtr (aStart arr) + len = MArray.byteLength arr + iov <- liftIO $ MArray.newArray maxIOVLen + iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p) + (fromIntegral len)) + if len >= n + then return $ D.Skip (GatherYielding iov' (GatherInitial s)) + else return $ D.Skip (GatherBuffering s iov' len) + D.Skip s -> return $ D.Skip (GatherInitial s) + D.Stop -> return D.Stop + + step' gst (GatherBuffering st iov len) = do + r <- step (adaptState gst) st + case r of + D.Yield arr s -> do + let p = unsafeForeignPtrToPtr (aStart arr) + alen = MArray.byteLength arr + len' = len + alen + if len' > n || length iov >= maxIOVLen + then do + iov' <- liftIO $ MArray.newArray maxIOVLen + iov'' <- liftIO $ MArray.unsafeSnoc iov' (IOVec (castPtr p) + (fromIntegral alen)) + return $ D.Skip (GatherYielding iov + (GatherBuffering s iov'' alen)) + else do + iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p) + (fromIntegral alen)) + return $ D.Skip (GatherBuffering s iov' len') + D.Skip s -> return $ D.Skip (GatherBuffering s iov len) + D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish) + + step' _ GatherFinish = return D.Stop + + step' _ (GatherYielding iov next) = return $ D.Yield iov next + +-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream +-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in +-- each array and a maximum of @maxEntries@ entries in each array. +-- +-- @since 0.7.0 +{-# INLINE_NORMAL groupIOVecsOf #-} +groupIOVecsOf :: MonadIO m + => Int + -> Int + -> D.Stream m (Array.Array a) -> D.Stream m (Array.Array IOVec) +groupIOVecsOf n maxIOVLen str = + D.map Array.unsafeFreeze + $ groupIOVecsOfMut n maxIOVLen + $ D.map Array.unsafeThaw str +#endif diff --git a/streamly.cabal b/streamly.cabal index 336a9c466..2628eaff8 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -458,6 +458,9 @@ library -- Memory storage , Streamly.Internal.Ring.Foreign + -- IOVec (depends on arrays/streams) + , Streamly.Internal.System.IOVec + -- streamly-unicode , Streamly.Internal.Unicode.Stream , Streamly.Internal.Unicode.Utf8