Move IOVec from Array streams to System.IOVec

This commit is contained in:
Harendra Kumar 2021-08-17 23:49:24 +05:30
parent 86f6c4eed4
commit 8268299277
6 changed files with 133 additions and 111 deletions

View File

@ -38,9 +38,6 @@ module Streamly.Internal.Data.Array.Stream.Foreign
-- * Compaction
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
, groupIOVecsOf
#endif
, compact
-- * Splitting
@ -70,10 +67,6 @@ import GHC.Ptr (Ptr(..))
import GHC.Types (SPEC(..))
import Prelude hiding (null, last, (!!), read, concat, unlines)
#if !defined(mingw32_HOST_OS)
import Streamly.Internal.System.IOVec.Type (IOVec(..))
#endif
import Streamly.Internal.BaseCompat
import Streamly.Internal.Data.Array.Foreign.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
@ -220,22 +213,6 @@ lpackArraysChunksOf :: (MonadIO m, Storable a)
lpackArraysChunksOf n fld =
FL.map A.unsafeThaw $ AS.lpackArraysChunksOf n (FL.map A.unsafeFreeze fld)
#if !defined(mingw32_HOST_OS)
-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
=> Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOf n maxIOVLen str =
D.map A.unsafeFreeze
$ AS.groupIOVecsOf n maxIOVLen
$ D.map A.unsafeThaw str
#endif
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--

View File

@ -17,9 +17,6 @@ module Streamly.Internal.Data.Array.Stream.Mut.Foreign
, packArraysChunksOf
, SpliceState (..)
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
, groupIOVecsOf
#endif
, compact
, compactLE
, compactEQ
@ -33,13 +30,6 @@ import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (when)
import Data.Bifunctor (first)
import Foreign.Storable (Storable(..))
#if !defined(mingw32_HOST_OS)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (castPtr)
import Streamly.Internal.System.IOVec.Type (IOVec(..))
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar (adaptState)
#endif
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT)
@ -50,8 +40,6 @@ import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD as D
import Prelude hiding (length)
-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
@ -245,77 +233,3 @@ compactGE :: -- (MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
compactGE _n _xs = undefined
-- D.fromStreamD $ D.foldMany (compactGEFold n) (D.toStreamD xs)
-------------------------------------------------------------------------------
-- IOVec
-------------------------------------------------------------------------------
#if !defined(mingw32_HOST_OS)
data GatherState s arr
= GatherInitial s
| GatherBuffering s arr Int
| GatherYielding arr (GatherState s arr)
| GatherFinish
-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
=> Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOf n maxIOVLen (D.Stream step state) =
D.Stream step' (GatherInitial state)
where
{-# INLINE_LATE step' #-}
step' gst (GatherInitial st) = do
when (n <= 0) $
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
++ "groups [" ++ show n ++ "] must be a natural number"
when (maxIOVLen <= 0) $
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
++ "IOVec entries [" ++ show n ++ "] must be a natural number"
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
len = MArray.byteLength arr
iov <- liftIO $ MArray.newArray maxIOVLen
iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral len))
if len >= n
then return $ D.Skip (GatherYielding iov' (GatherInitial s))
else return $ D.Skip (GatherBuffering s iov' len)
D.Skip s -> return $ D.Skip (GatherInitial s)
D.Stop -> return D.Stop
step' gst (GatherBuffering st iov len) = do
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
alen = MArray.byteLength arr
len' = len + alen
if len' > n || length iov >= maxIOVLen
then do
iov' <- liftIO $ MArray.newArray maxIOVLen
iov'' <- liftIO $ MArray.unsafeSnoc iov' (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherYielding iov
(GatherBuffering s iov'' alen))
else do
iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherBuffering s iov' len')
D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)
step' _ GatherFinish = return D.Stop
step' _ (GatherYielding iov next) = return $ D.Yield iov next
#endif

View File

@ -138,7 +138,7 @@ import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
#if !defined(mingw32_HOST_OS)
import Streamly.Internal.Data.Array.Stream.Foreign (groupIOVecsOf)
import Streamly.Internal.System.IOVec (groupIOVecsOf)
import Streamly.Internal.Data.Stream.StreamD (toStreamD)
import Streamly.Internal.Data.Stream.StreamD.Type (fromStreamD)
import qualified Streamly.Internal.FileSystem.FDIO as RawIO hiding (write)

View File

@ -22,6 +22,7 @@ module Streamly.Internal.FileSystem.FDIO
where
import Control.Monad (when)
import Streamly.Internal.System.IOVec.Type (IOVec)
#if !defined(mingw32_HOST_OS)
import Control.Concurrent (threadWaitWrite)
import Data.Int (Int64)
@ -30,7 +31,7 @@ import Foreign.C.Error (throwErrnoIfMinus1RetryMayBlock)
import Foreign.C.Types (CBool(..))
#endif
import System.Posix.Internals (c_write, c_safe_write)
import Streamly.Internal.System.IOVec.Type (IOVec, c_writev, c_safe_writev)
import Streamly.Internal.System.IOVec.Type (c_writev, c_safe_writev)
#endif
import Foreign.C.Types (CSize(..), CInt(..))

View File

@ -0,0 +1,127 @@
-- |
-- Module : Streamly.Internal.System.IOVec
-- Copyright : (c) 2019 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--
module Streamly.Internal.System.IOVec
( IOVec(..)
, c_writev
, c_safe_writev
#if !defined(mingw32_HOST_OS)
, groupIOVecsOf
, groupIOVecsOfMut
#endif
)
where
#include "inline.hs"
#if !defined(mingw32_HOST_OS)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (castPtr)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar (adaptState)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))
import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif
import Streamly.Internal.System.IOVec.Type
import Prelude hiding (length)
#if !defined(mingw32_HOST_OS)
data GatherState s arr
= GatherInitial s
| GatherBuffering s arr Int
| GatherYielding arr (GatherState s arr)
| GatherFinish
-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOfMut #-}
groupIOVecsOfMut :: MonadIO m
=> Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOfMut n maxIOVLen (D.Stream step state) =
D.Stream step' (GatherInitial state)
where
{-# INLINE_LATE step' #-}
step' gst (GatherInitial st) = do
when (n <= 0) $
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
++ "groups [" ++ show n ++ "] must be a natural number"
when (maxIOVLen <= 0) $
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
++ "IOVec entries [" ++ show n ++ "] must be a natural number"
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
len = MArray.byteLength arr
iov <- liftIO $ MArray.newArray maxIOVLen
iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral len))
if len >= n
then return $ D.Skip (GatherYielding iov' (GatherInitial s))
else return $ D.Skip (GatherBuffering s iov' len)
D.Skip s -> return $ D.Skip (GatherInitial s)
D.Stop -> return D.Stop
step' gst (GatherBuffering st iov len) = do
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
alen = MArray.byteLength arr
len' = len + alen
if len' > n || length iov >= maxIOVLen
then do
iov' <- liftIO $ MArray.newArray maxIOVLen
iov'' <- liftIO $ MArray.unsafeSnoc iov' (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherYielding iov
(GatherBuffering s iov'' alen))
else do
iov' <- liftIO $ MArray.unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherBuffering s iov' len')
D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)
step' _ GatherFinish = return D.Stop
step' _ (GatherYielding iov next) = return $ D.Yield iov next
-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
=> Int
-> Int
-> D.Stream m (Array.Array a) -> D.Stream m (Array.Array IOVec)
groupIOVecsOf n maxIOVLen str =
D.map Array.unsafeFreeze
$ groupIOVecsOfMut n maxIOVLen
$ D.map Array.unsafeThaw str
#endif

View File

@ -458,6 +458,9 @@ library
-- Memory storage
, Streamly.Internal.Ring.Foreign
-- IOVec (depends on arrays/streams)
, Streamly.Internal.System.IOVec
-- streamly-unicode
, Streamly.Internal.Unicode.Stream
, Streamly.Internal.Unicode.Utf8