Raw IO w/o buffering, add some array APIs

* Streamly.FileSystem.FD module for unbuffered IO. Buffering can be controlled
  from the stream itself.
* Support IO using writev as well

Array APIs include:

* coalesceChunksOf to coalesce smaller arrays in a stream into bigger ones
* unlinesArraysBy to join a stream of arrays using a line separator
* splitArraysOn to split a stream of arrays on a separator byte
This commit is contained in:
Harendra Kumar 2019-06-13 23:43:49 +05:30
parent e57ed67089
commit 694eab4e6a
11 changed files with 1325 additions and 150 deletions

View File

@ -18,12 +18,12 @@ import Gauge
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Mem.Array as A
import qualified Streamly.Prelude as S
import qualified Streamly.String as SS
#ifdef DEVBUILD
import Data.Char (ord, chr)
import System.IO (hSeek, SeekMode(..))
import qualified Streamly.Fold as FL
import qualified Streamly.String as SS
#endif
-- Input and output file handles
@ -94,8 +94,8 @@ main = do
Handles inh _ <- readIORef href
let s = FH.readArrays inh
larr <- S.last s
return $ case larr of
Nothing -> Nothing
case larr of
Nothing -> return Nothing
Just arr -> A.readIndex arr (A.length arr - 1)
-- Note: this cannot be fairly compared with GNU wc -c or wc -m as
-- wc uses lseek to just determine the file size rather than reading

View File

@ -2,40 +2,52 @@ import qualified Streamly.Prelude as S
import qualified Streamly.Fold as FL
import qualified Streamly.Mem.Array as A
import qualified Streamly.FileSystem.Handle as FH
import qualified System.IO as FH
-- import qualified Streamly.FileSystem.FD as FH
import qualified Streamly.String as SS
import Data.Char (ord)
import System.Environment (getArgs)
import System.IO (openFile, IOMode(..), stdout, Handle, hSeek, SeekMode(..))
import System.IO (IOMode(..), hSeek, SeekMode(..))
cat :: Handle -> IO ()
cat src = FH.writeArrays stdout $ FH.readArraysUpto (256*1024) src
cat :: FH.Handle -> IO ()
cat src = FH.writeArrays FH.stdout $ FH.readArraysOfUpto (256*1024) src
{-
cat src =
FH.writeByChunksOf (1024*1024) FH.stdout
$ FH.readByChunksUpto (16*1024) src
-}
cp :: Handle -> Handle -> IO ()
cp src dst = FH.writeArrays dst $ FH.readArraysUpto (256*1024) src
cp :: FH.Handle -> FH.Handle -> IO ()
cp src dst = FH.writeArrays dst $ FH.readArraysOfUpto (256*1024) src
{-
cp src dst =
FH.writeByChunksOf (1024*1024) dst
$ FH.readByChunksUpto (16*1024) src
-}
ord' :: Num a => Char -> a
ord' = (fromIntegral . ord)
wcl :: Handle -> IO ()
wcl :: FH.Handle -> IO ()
wcl src = print =<< (S.length
$ flip SS.foldLines FL.drain
$ SS.decodeChar8
$ FH.read src)
grepc :: String -> Handle -> IO ()
grepc :: String -> FH.Handle -> IO ()
grepc pat src = print . (subtract 1) =<< (S.length
$ FL.splitOn (A.fromList (map ord' pat)) FL.drain
$ FH.read src)
avgll :: Handle -> IO ()
avgll :: FH.Handle -> IO ()
avgll src = print =<< (FL.foldl' avg
$ FL.splitSuffixBy (== ord' '\n') FL.length
$ FH.read src)
where avg = (/) <$> toDouble FL.sum <*> toDouble FL.length
toDouble = fmap (fromIntegral :: Int -> Double)
llhisto :: Handle -> IO ()
llhisto :: FH.Handle -> IO ()
llhisto src = print =<< (FL.foldl' (FL.classify FL.length)
$ S.map bucket
$ FL.splitSuffixBy (== ord' '\n') FL.length
@ -46,7 +58,7 @@ llhisto src = print =<< (FL.foldl' (FL.classify FL.length)
main :: IO ()
main = do
name <- fmap head getArgs
src <- openFile name ReadMode
src <- FH.openFile name ReadMode
let rewind = hSeek src AbsoluteSeek 0
rewind >> putStrLn "cat" >> cat src -- Unix cat program
@ -55,5 +67,5 @@ main = do
rewind >> putStr "avgll " >> avgll src -- get average line length
rewind >> putStr "llhisto " >> llhisto src -- get line length histogram
dst <- openFile "dst-xyz.txt" WriteMode
dst <- FH.openFile "dst-xyz.txt" WriteMode
rewind >> putStr "cp " >> cp src dst -- Unix cp program

View File

@ -0,0 +1,561 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.FileSystem.FD
-- Copyright : (c) 2019 Composewell Technologies
--
-- License : BSD3
-- Maintainer : harendra.kumar@gmail.com
-- Stability : experimental
-- Portability : GHC
--
-- This module is an experimental replacement for
-- "Streamly.FileSystem.Handle". That module provides IO facilities based
-- on the GHC Handle type. The APIs in this module avoid the GHC handle layer
-- and provide more explicit control over buffering.
--
-- Read and write data as streams and arrays to and from files.
--
-- This module provides read and write APIs based on handles. Before reading or
-- writing, a file must be opened first using 'openFile'. The 'Handle' returned
-- by 'openFile' is then used to access the file. A 'Handle' is backed by an
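-- operating system file descriptor. NOTE: no finalizer is attached to the
-- 'Handle' yet (see the XXX note on the 'Handle' type), so the underlying file
-- descriptor is not closed automatically when the 'Handle' is garbage
-- collected, and this module does not yet export a 'closeFile' operation.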
--
-- Reading and writing APIs are divided into two categories, sequential
-- streaming APIs and random or seekable access APIs. File IO APIs are quite
-- similar to "Streamly.Mem.Array" read write APIs. In that regard, arrays can
-- be considered as in-memory files or files can be considered as on-disk
-- arrays.
--
-- > import qualified Streamly.FileSystem.FD as FD
--
module Streamly.FileSystem.FD
(
-- * File Handles
Handle
, stdin
, stdout
, stderr
, openFile
-- TODO file path based APIs
-- , readFile
-- , writeFile
-- * Streaming IO
-- | Streaming APIs read or write data to or from a file or device
-- sequentially, they never perform a seek to a random location. When
-- reading, the stream is lazy and generated on-demand as the consumer
-- consumes it. Read IO requests to the IO device are performed in chunks
-- of 32KiB, this is referred to as @defaultChunkSize@ in the
-- documentation. One IO request may or may not read the full chunk. If the
-- whole stream is not consumed, it is possible that we may read slightly
-- more from the IO device than what the consumer needed. Unless specified
-- otherwise in the API, writes are collected into chunks of
-- @defaultChunkSize@ before they are written to the IO device.
-- Streaming APIs work for all kind of devices, seekable or non-seekable;
-- including disks, files, memory devices, terminals, pipes, sockets and
-- fifos. While random access APIs work only for files or devices that have
-- random access or seek capability for example disks, memory devices.
-- Devices like terminals, pipes, sockets and fifos do not have random
-- access capability.
-- ** Read File to Stream
, read
-- , readUtf8
-- , readLines
-- , readFrames
, readByChunksUpto
-- -- * Array Read
-- , readArrayUpto
-- , readArrayOf
, readArrays
, readArraysOfUpto
-- , readArraysOf
-- ** Write File from Stream
, write
-- , writeUtf8
-- , writeUtf8Lines
-- , writeFrames
, writeByChunksOf
-- -- * Array Write
-- , writeArray
, writeArrays
, writeArraysOfUpto
, writev
, writevArraysOfUpto
-- -- * Random Access (Seek)
-- -- | Unlike the streaming APIs listed above, these APIs apply to devices or
-- files that have random access or seek capability. This type of devices
-- include disks, files, memory devices and exclude terminals, pipes,
-- sockets and fifos.
--
-- , readIndex
-- , readSlice
-- , readSliceRev
-- , readAt -- read from a given position to the end of file
-- , readSliceArrayUpto
-- , readSliceArrayOf
-- , writeIndex
-- , writeSlice
-- , writeSliceRev
-- , writeAt -- start writing at the given position
-- , writeSliceArray
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
-- import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
-- import System.IO (Handle, hGetBufSome, hPutBuf)
import System.IO (IOMode)
import Prelude hiding (read)
import qualified GHC.IO.FD as FD
import qualified GHC.IO.Device as RawIO hiding (write)
import qualified Streamly.FileSystem.FDIO as RawIO
import Streamly.Mem.Array.Types (Array(..))
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamD (toStreamD)
import Streamly.Streams.StreamD.Type (fromStreamD)
import Streamly.Streams.StreamK.Type (IsStream, mkStream)
import Streamly.Mem.Array.Types (byteLength, defaultChunkSize, groupIOVecsOf)
-- import Streamly.Fold (Fold)
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)
import qualified Streamly.Mem.Array as A
import qualified Streamly.Prelude as S
import qualified Streamly.Streams.StreamD.Type as D
-------------------------------------------------------------------------------
-- References
-------------------------------------------------------------------------------
--
-- The following references may be useful to build an understanding about the
-- file API design:
--
-- http://www.linux-mag.com/id/308/ for blocking/non-blocking IO on linux.
-- https://lwn.net/Articles/612483/ Non-blocking buffered file read operations
-- https://en.wikipedia.org/wiki/C_file_input/output for C APIs.
-- https://docs.oracle.com/javase/tutorial/essential/io/file.html for Java API.
-- https://www.w3.org/TR/FileAPI/ for http file API.
-------------------------------------------------------------------------------
-- Handles
-------------------------------------------------------------------------------
-- XXX attach a finalizer
-- | A 'Handle' is returned by 'openFile' and is subsequently used to perform
-- read and write operations on a file.
--
-- It is a thin newtype wrapper around the raw file descriptor type 'FD.FD'
-- from "GHC.IO.FD". This definition does not attach a finalizer to it.
--
newtype Handle = Handle FD.FD
-- | File handle for standard input
-- (wraps 'FD.stdin' from "GHC.IO.FD").
stdin :: Handle
stdin = Handle FD.stdin
-- | File handle for standard output
-- (wraps 'FD.stdout' from "GHC.IO.FD").
stdout :: Handle
stdout = Handle FD.stdout
-- | File handle for standard error
-- (wraps 'FD.stderr' from "GHC.IO.FD").
stderr :: Handle
stderr = Handle FD.stderr
-- XXX we can support all the flags that the "open" system call supports.
-- Instead of using RTS locking mechanism can we use system provided locking
-- instead?
--
-- | Open a file that is not a directory and return a file 'Handle' for it.
--
-- The file is opened through 'FD.openFile' from "GHC.IO.FD" with the
-- non-blocking flag set to 'True'; the device type that call also returns is
-- discarded.
--
-- 'openFile' enforces a multiple-reader single-writer locking on files. That
-- is, there may either be many handles on the same file which manage input, or
-- just one handle on the file which manages output. If any open handle is
-- managing a file for output, no new handle can be allocated for that file. If
-- any open handle is managing a file for input, new handles can only be
-- allocated if they do not manage output. Whether two files are the same is
-- implementation-dependent, but they should normally be the same if they have
-- the same absolute path name and neither has been renamed, for example.
--
openFile :: FilePath -> IOMode -> IO Handle
openFile path mode = do
    (fd, _deviceType) <- FD.openFile path mode True
    return (Handle fd)
-------------------------------------------------------------------------------
-- Array IO (Input)
-------------------------------------------------------------------------------
-- | Read an 'Array' of 'Word8' from a file handle, performing a single read
-- request of at most @size@ bytes into a freshly allocated buffer of @size@
-- bytes. If no data is available on the handle it blocks until some data
-- becomes available. If data is available then it immediately returns that
-- data without blocking.
--
-- The returned array ends ('aEnd') after the bytes actually read, while its
-- capacity ('aBound') remains the full @size@; the buffer is not shrunk.
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto size (Handle fd) = do
    buffer <- mallocPlainForeignPtrBytes size
    -- buffer <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
    withForeignPtr buffer $ \start -> do
        -- nread <- hGetBufSome h start size
        nread <- RawIO.read fd start size
        -- XXX shrink only if the diff is significant
        -- A.shrinkToFit v
        return $ Array
            { aStart = buffer
            , aEnd = start `plusPtr` nread
            , aBound = start `plusPtr` size
            }
-------------------------------------------------------------------------------
-- Array IO (output)
-------------------------------------------------------------------------------
-- | Write an 'Array' to a file handle.
--
-- An array with no elements is a no-op. Otherwise the array's whole byte
-- length is handed to 'RawIO.writeAll', which keeps writing until every byte
-- has been written.
--
-- @since 0.7.0
{-# INLINABLE writeArray #-}
writeArray :: Storable a => Handle -> Array a -> IO ()
writeArray (Handle fd) arr
    | A.length arr == 0 = return ()
    | otherwise =
        withForeignPtr (aStart arr) $ \start ->
            RawIO.writeAll fd (castPtr start) (byteLength arr)
    {-
    -- Experiment to compare "writev" based IO with "write" based IO.
    iov <- A.newArray 1
    let iov' = iov {aEnd = aBound iov}
    A.writeIndex iov' 0 (RawIO.IOVec (castPtr p) (fromIntegral aLen))
    RawIO.writevAll fd (unsafeForeignPtrToPtr (aStart iov')) 1
    -}
-- | Write an array of 'IOVec' to a file handle.
--
-- An array with no entries is a no-op. Otherwise all entries of the array are
-- passed to 'RawIO.writevAll' in one call.
--
-- @since 0.7.0
{-# INLINABLE writeIOVec #-}
writeIOVec :: Handle -> Array RawIO.IOVec -> IO ()
writeIOVec (Handle fd) iov
    | entries == 0 = return ()
    | otherwise =
        withForeignPtr (aStart iov) $ \vecs ->
            RawIO.writevAll fd vecs entries
  where
    entries = A.length iov
-------------------------------------------------------------------------------
-- Stream of Arrays IO
-------------------------------------------------------------------------------
-- | @_readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@. The stream stops at the first
-- read that returns an empty array.
--
-- This variant builds the stream directly with 'mkStream'.
{-# INLINABLE _readArraysOfUpto #-}
_readArraysOfUpto :: (IsStream t, MonadIO m)
    => Int -> Handle -> t m (Array Word8)
_readArraysOfUpto size h = loop
  where
    -- XXX use cons/nil instead
    loop = mkStream $ \_ yieldk _ stop -> do
        chunk <- liftIO (readArrayUpto size h)
        case A.length chunk of
            0 -> stop
            _ -> yieldk chunk loop
-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- Each array comes from one 'readArrayUpto' call, so it holds at most @size@
-- bytes. The stream stops at the first read that returns an empty array.
{-# INLINE_NORMAL readArraysOfUpto #-}
readArraysOfUpto :: (IsStream t, MonadIO m)
    => Int -> Handle -> t m (Array Word8)
readArraysOfUpto size h = D.fromStreamD (D.Stream step ())
  where
    -- The stream carries no state of its own; the read position lives in the
    -- file descriptor.
    {-# INLINE_LATE step #-}
    step _ _ = do
        chunk <- liftIO (readArrayUpto size h)
        return $
            if A.length chunk == 0
                then D.Stop
                else D.Yield chunk ()
-- XXX read 'Array a' instead of Word8
--
-- | @readArrays h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is limited to @defaultChunkSize@.
-- NOTE(review): the original text here said that 'readArrays' ignores the
-- prevailing 'TextEncoding' and 'NewlineMode' on the 'Handle'; the 'Handle'
-- of this module only wraps a raw file descriptor, so that sentence looks
-- carried over from "Streamly.FileSystem.Handle" -- confirm and drop.
--
-- > readArrays = readArraysOfUpto defaultChunkSize
--
-- @since 0.7.0
{-# INLINE readArrays #-}
readArrays :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
readArrays = readArraysOfUpto defaultChunkSize
-------------------------------------------------------------------------------
-- Read File to Stream
-------------------------------------------------------------------------------
-- TODO for concurrent streams implement readahead IO. We can send multiple
-- read requests at the same time. For serial case we can use async IO. We can
-- also control the read throughput in mbps or IOPS.
-- | @readByChunksUpto chunkSize handle@ reads a byte stream from a file
-- handle. The bytes are obtained by flattening the arrays produced by
-- 'readArraysOfUpto', so reads are performed in chunks of up to @chunkSize@.
-- The stream ends as soon as EOF is encountered.
--
{-# INLINE readByChunksUpto #-}
readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
readByChunksUpto chunkSize h =
    A.flattenArrays (readArraysOfUpto chunkSize h)
-- TODO
-- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a
--
-- > read = 'readByChunks' A.defaultChunkSize
-- | Generate a stream of bytes ('Word8') from a file 'Handle'. The
-- stream ends when EOF is encountered.
--
-- The bytes are obtained by flattening the arrays produced by 'readArrays'.
--
-- @since 0.7.0
{-# INLINE read #-}
read :: (IsStream t, MonadIO m) => Handle -> t m Word8
read = A.flattenArrays . readArrays
-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------
-- | Write a stream of arrays to a handle. Every array of the stream is
-- written with 'writeArray', in stream order.
--
-- @since 0.7.0
{-# INLINE writeArrays #-}
writeArrays :: (MonadIO m, Storable a) => Handle -> SerialT m (Array a) -> m ()
writeArrays h chunks = S.mapM_ (\arr -> liftIO (writeArray h arr)) chunks
-- | Write a stream of 'IOVec' arrays to a handle. Every 'IOVec' array of the
-- stream is written with 'writeIOVec', in stream order.
--
-- @since 0.7.0
{-# INLINE writev #-}
writev :: MonadIO m => Handle -> SerialT m (Array RawIO.IOVec) -> m ()
writev h vecs = S.mapM_ (\iov -> liftIO (writeIOVec h iov)) vecs
-- | Write a stream of arrays to a handle after coalescing them, with
-- 'A.coalesceChunksOf', in chunks of the specified size. The chunk size is
-- only a maximum and the actual writes could be smaller than that as we do
-- not split the arrays to fit them to the specified size.
--
-- @since 0.7.0
{-# INLINE writeArraysOfUpto #-}
writeArraysOfUpto :: (MonadIO m, Storable a)
    => Int -> Handle -> SerialT m (Array a) -> m ()
writeArraysOfUpto n h xs = writeArrays h (A.coalesceChunksOf n xs)
-- | Write a stream of arrays to a handle after grouping them in 'IOVec' arrays
-- of up to a maximum total size @n@. Writes are performed using gather IO via
-- the @writev@ system call. The maximum number of entries in each 'IOVec'
-- group is limited to 512.
--
-- @since 0.7.0
{-# INLINE writevArraysOfUpto #-}
writevArraysOfUpto :: MonadIO m
    => Int -> Handle -> SerialT m (Array a) -> m ()
writevArraysOfUpto n h xs = writev h (fromStreamD grouped)
  where
    -- Upper bound on the number of entries in a single IOVec group.
    maxEntries = 512
    grouped = groupIOVecsOf n maxEntries (toStreamD xs)
-- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes.
--
-- XXX test this
-- Note that if you use a chunk size less than 8K (GHC's default buffer
-- size) then you are advised to use 'NoBuffering' mode on the 'Handle' in case you
-- do not want buffering to occur at GHC level as well. Same thing applies to
-- writes as well.
-- | Like 'write' but provides control over the write buffer. The input bytes
-- are grouped into arrays with @A.arraysOf n@ and each array is then written
-- to the handle, so output goes to the IO device as soon as the specified
-- number of input elements has been collected.
--
-- @since 0.7.0
{-# INLINE writeByChunksOf #-}
writeByChunksOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
writeByChunksOf n h m = writeArrays h (A.arraysOf n m)
-- > write = 'writeByChunksOf' defaultChunkSize
--
-- | Write a byte stream to a file handle. Combines the bytes in chunks of size
-- up to 'A.defaultChunkSize' before writing. Note that the write behavior
-- depends on the 'IOMode' and the current seek position of the handle.
--
-- @since 0.7.0
{-# INLINE write #-}
write :: MonadIO m => Handle -> SerialT m Word8 -> m ()
write = writeByChunksOf defaultChunkSize
{-
{-# INLINE write #-}
write :: (MonadIO m, Storable a) => Handle -> SerialT m a -> m ()
write = toHandleWith A.defaultChunkSize
-}
-------------------------------------------------------------------------------
-- IO with encoding/decoding Unicode characters
-------------------------------------------------------------------------------
{-
-- |
-- > readUtf8 = decodeUtf8 . read
--
-- Read a UTF8 encoded stream of unicode characters from a file handle.
--
-- @since 0.7.0
{-# INLINE readUtf8 #-}
readUtf8 :: (IsStream t, MonadIO m) => Handle -> t m Char
readUtf8 = decodeUtf8 . read
-- |
-- > writeUtf8 h s = write h $ encodeUtf8 s
--
-- Encode a stream of unicode characters to UTF8 and write it to the given file
-- handle. Default block buffering applies to the writes.
--
-- @since 0.7.0
{-# INLINE writeUtf8 #-}
writeUtf8 :: MonadIO m => Handle -> SerialT m Char -> m ()
writeUtf8 h s = write h $ encodeUtf8 s
-- | Write a stream of unicode characters after encoding to UTF-8 in chunks
-- separated by a linefeed character @'\n'@. If the size of the buffer exceeds
-- @defaultChunkSize@ and a linefeed is not yet found, the buffer is written
-- anyway. This is similar to writing to a 'Handle' with the 'LineBuffering'
-- option.
--
-- @since 0.7.0
{-# INLINE writeUtf8ByLines #-}
writeUtf8ByLines :: (IsStream t, MonadIO m) => Handle -> t m Char -> m ()
writeUtf8ByLines = undefined
-- | Read UTF-8 lines from a file handle and apply the specified fold to each
-- line. This is similar to reading a 'Handle' with the 'LineBuffering' option.
--
-- @since 0.7.0
{-# INLINE readLines #-}
readLines :: (IsStream t, MonadIO m) => Handle -> Fold m Char b -> t m b
readLines h f = foldLines (readUtf8 h) f
-------------------------------------------------------------------------------
-- Framing on a sequence
-------------------------------------------------------------------------------
-- | Read a stream from a file handle and split it into frames delimited by
-- the specified sequence of elements. The supplied fold is applied on each
-- frame.
--
-- @since 0.7.0
{-# INLINE readFrames #-}
readFrames :: (IsStream t, MonadIO m, Storable a)
=> Array a -> Handle -> Fold m a b -> t m b
readFrames = undefined -- foldFrames . read
-- | Write a stream to the given file handle buffering up to frames separated
-- by the given sequence or up to a maximum of @defaultChunkSize@.
--
-- @since 0.7.0
{-# INLINE writeByFrames #-}
writeByFrames :: (IsStream t, MonadIO m, Storable a)
=> Array a -> Handle -> t m a -> m ()
writeByFrames = undefined
-------------------------------------------------------------------------------
-- Random Access IO (Seek)
-------------------------------------------------------------------------------
-- XXX handles could be shared, so we may not want to use the handle state at
-- all for these APIs. we can use pread and pwrite instead. On windows we will
-- need to use readFile/writeFile with an offset argument.
-------------------------------------------------------------------------------
-- | Read the element at the given index treating the file as an array.
--
-- @since 0.7.0
{-# INLINE readIndex #-}
readIndex :: Storable a => Handle -> Int -> Maybe a
readIndex arr i = undefined
-- NOTE: To represent a range to read we have chosen (start, size) instead of
-- (start, end). This helps in removing the ambiguity of whether "end" is
-- included in the range or not.
--
-- We could avoid specifying the range to be read and instead use "take size"
-- on the stream, but it may end up reading more and then consume it partially.
-- | @readSliceWith chunkSize handle pos len@ reads up to @len@ bytes
-- from @handle@ starting at the offset @pos@ from the beginning of the file.
--
-- Reads are performed in chunks of size @chunkSize@. For block devices, to
-- avoid reading partial blocks @chunkSize@ must align with the block size of
-- the underlying device. If the underlying block size is unknown, it is a good
-- idea to keep it a multiple of 4KiB. This API ensures that the start of each
-- chunk is aligned with @chunkSize@ from second chunk onwards.
--
{-# INLINE readSliceWith #-}
readSliceWith :: (IsStream t, MonadIO m, Storable a)
=> Int -> Handle -> Int -> Int -> t m a
readSliceWith chunkSize h pos len = undefined
-- | @readSlice h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the forward direction
-- ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE readSlice #-}
readSlice :: (IsStream t, MonadIO m, Storable a)
=> Handle -> Int -> Int -> t m a
readSlice = readSliceWith A.defaultChunkSize
-- | @readSliceRev h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the reverse direction
-- ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE readSliceRev #-}
readSliceRev :: (IsStream t, MonadIO m, Storable a)
=> Handle -> Int -> Int -> t m a
readSliceRev h i count = undefined
-- | Write the given element at the given index in the file.
--
-- @since 0.7.0
{-# INLINE writeIndex #-}
writeIndex :: (MonadIO m, Storable a) => Handle -> Int -> a -> m ()
writeIndex h i a = undefined
-- | @writeSlice h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the forward
-- direction ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE writeSlice #-}
writeSlice :: (IsStream t, Monad m, Storable a)
=> Handle -> Int -> Int -> t m a -> m ()
writeSlice h i len s = undefined
-- | @writeSliceRev h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the reverse
-- direction ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE writeSliceRev #-}
writeSliceRev :: (IsStream t, Monad m, Storable a)
=> Handle -> Int -> Int -> t m a -> m ()
writeSliceRev arr i len s = undefined
-}

View File

@ -0,0 +1,200 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.FileSystem.FDIO
-- Copyright : (c) 2019 Composewell Technologies
-- Copyright : (c) 1994-2008 The University of Glasgow
--
-- License : BSD3
-- Maintainer : harendra.kumar@gmail.com
-- Stability : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--
module Streamly.FileSystem.FDIO
( write
, writeAll
, IOVec (..)
, writev
, writevAll
)
where
import Control.Concurrent (threadWaitWrite)
import Control.Monad (when)
import Data.Int (Int64)
import Data.Word (Word8)
import Foreign.C.Error (throwErrnoIfMinus1RetryMayBlock)
import Foreign.C.Types (CSize(..), CInt(..), CBool(..))
import Foreign.Marshal.Array (advancePtr)
import Foreign.Ptr (plusPtr, Ptr)
import Foreign.Storable (peek)
import System.Posix.Internals (c_write, c_safe_write)
import GHC.IO.FD (FD(..))
import Streamly.FileSystem.IOVec (IOVec(..), c_writev, c_safe_writev)
-------------------------------------------------------------------------------
-- IO Routines
-------------------------------------------------------------------------------
-- See System.POSIX.Internals in GHC base package
-------------------------------------------------------------------------------
-- Write without blocking the underlying OS thread
-------------------------------------------------------------------------------
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
-- | 'True' when the descriptor's 'fdIsNonBlocking' field is non-zero.
isNonBlocking :: FD -> Bool
isNonBlocking fd =
    case fdIsNonBlocking fd of
        0 -> False
        _ -> True
#if !defined(mingw32_HOST_OS)
-- "poll"s the fd for data to become available or timeout
-- See cbits/inputReady.c in base package
foreign import ccall unsafe "fdReady"
unsafe_fdReady :: CInt -> CBool -> Int64 -> CBool -> IO CInt
-- | @writeNonBlocking loc fd buf off len@ issues one write of up to @len@
-- bytes starting at @buf + off@ and returns the result of the write call.
-- @loc@ is the location string passed to the errno-throwing wrapper.
--
-- When the fd is marked non-blocking the unsafe foreign call is used
-- directly. Otherwise the fd is first polled with a zero timeout; if it is
-- not ready the Haskell thread waits with 'threadWaitWrite', and the write is
-- then made through the safe foreign call on the threaded RTS and through the
-- unsafe one otherwise.
writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
writeNonBlocking loc !fd !buf !off !len =
    if isNonBlocking fd
        then unsafeWrite -- unsafe is ok, it can't block
        else do
            let isWrite = 1
                isSocket = 0
                msecs = 0
            ready <- unsafe_fdReady rawFd isWrite msecs isSocket
            when (ready == 0) waitWritable
            if threaded then safeWrite else unsafeWrite
  where
    rawFd = fdFD fd
    start = buf `plusPtr` off
    waitWritable = threadWaitWrite (fromIntegral rawFd)
    -- Retry on interruption and wait for writability when the call would
    -- block; any other failure is thrown as an errno-based exception.
    retrying call =
        fmap fromIntegral
            (throwErrnoIfMinus1RetryMayBlock loc call waitWritable)
    unsafeWrite = retrying (c_write rawFd start len)
    safeWrite = retrying (c_safe_write rawFd start len)
-- | @writevNonBlocking loc fd iov cnt@ issues one @writev@ call for the @cnt@
-- entries starting at @iov@ and returns the result of that call. @loc@ is the
-- location string passed to the errno-throwing wrapper.
--
-- The choice between the safe and unsafe foreign call, and the readiness
-- polling before it, follow the same scheme as 'writeNonBlocking'.
writevNonBlocking :: String -> FD -> Ptr IOVec -> Int -> IO CInt
writevNonBlocking loc !fd !iov !cnt =
    if isNonBlocking fd
        then unsafeWritev -- unsafe is ok, it can't block
        else do
            let isWrite = 1
                isSocket = 0
                msecs = 0
            ready <- unsafe_fdReady rawFd isWrite msecs isSocket
            when (ready == 0) waitWritable
            if threaded then safeWritev else unsafeWritev
  where
    rawFd = fdFD fd
    entries = fromIntegral cnt
    waitWritable = threadWaitWrite (fromIntegral rawFd)
    -- Retry on interruption and wait for writability when the call would
    -- block; any other failure is thrown as an errno-based exception.
    retrying call =
        fmap fromIntegral
            (throwErrnoIfMinus1RetryMayBlock loc call waitWritable)
    unsafeWritev = retrying (c_writev rawFd iov entries)
    safeWritev = retrying (c_safe_writev rawFd iov entries)
#else
-- NOTE(review): this Windows-only branch refers to names that the import list
-- of this module does not appear to bring into scope
-- ('throwErrnoIfMinus1Retry', 'bool', 'c_maperrno', 'c_maperrno_func',
-- 'asyncWrite', 'Errno', 'errnoToIOError', 'fdIsSocket', 'fdIsSocket_'), and
-- it defines no 'writevNonBlocking' although 'writev' further down uses it
-- unconditionally -- confirm that this branch builds on Windows.
foreign import WINDOWS_CCONV safe "recv"
    c_safe_recv :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt
foreign import WINDOWS_CCONV safe "send"
    c_safe_send :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt
-- Write through the safe foreign calls: @send@ when the fd is a socket,
-- @write@ otherwise. On failure the errno is remapped via 'c_maperrno' before
-- the retrying wrapper inspects it.
blockingWriteRawBufferPtr :: String -> FD -> Ptr Word8-> Int -> CSize -> IO CInt
blockingWriteRawBufferPtr loc !fd !buf !off !len
  = throwErrnoIfMinus1Retry loc $ do
        let start_ptr = buf `plusPtr` off
            send_ret = c_safe_send (fdFD fd) start_ptr (fromIntegral len) 0
            write_ret = c_safe_write (fdFD fd) start_ptr (fromIntegral len)
        r <- bool write_ret send_ret (fdIsSocket fd)
        when (r == -1) c_maperrno
        return r
-- We don't trust write() to give us the correct errno, and
-- instead do the errno conversion from GetLastError()
-- ourselves. The main reason is that we treat ERROR_NO_DATA
-- (pipe is closing) as EPIPE, whereas write() returns EINVAL
-- for this case. We need to detect EPIPE correctly, because it
-- shouldn't be reported as an error when it happens on stdout.
-- As for send()'s case, Winsock functions don't do errno
-- conversion in any case so we have to do it ourselves.
-- That means we're doing the errno conversion no matter if the
-- fd is from a socket or not.
-- NOTE: "safe" versions of the read/write calls for use by the threaded RTS.
-- These calls may block, but that's ok.
-- Write via 'asyncWrite'; a length result of -1 is turned into an 'IOError'
-- built from the returned code (mapped through 'c_maperrno_func' for sockets).
asyncWriteRawBufferPtr :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
asyncWriteRawBufferPtr loc !fd !buf !off !len = do
    (l, rc) <- asyncWrite (fromIntegral (fdFD fd)) (fdIsSocket_ fd)
                  (fromIntegral len) (buf `plusPtr` off)
    if l == (-1)
      then let sock_errno = c_maperrno_func (fromIntegral rc)
               non_sock_errno = Errno (fromIntegral rc)
               errno = bool non_sock_errno sock_errno (fdIsSocket fd)
           in ioError (errnoToIOError loc errno Nothing Nothing)
      else return (fromIntegral l)
-- Windows counterpart of the POSIX 'writeNonBlocking': uses the blocking safe
-- call on the threaded RTS and the async write otherwise.
writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
writeNonBlocking loc !fd !buf !off !len
    | threaded = blockingWriteRawBufferPtr loc fd buf off len
    | otherwise = asyncWriteRawBufferPtr loc fd buf off len
#endif
-- | @write FD buffer offset length@ tries to write data on the given
-- filesystem fd (cannot be a socket) up to specified length starting from the
-- given offset in the buffer. The write will not block the OS thread, it may
-- suspend the Haskell thread until write can proceed. Returns the actual
-- amount of data written.
--
-- This is 'writeNonBlocking' with the error location string fixed to this
-- module's name.
write :: FD -> Ptr Word8 -> Int -> CSize -> IO CInt
write = writeNonBlocking "Streamly.FileSystem.FDIO"
-- XXX sendAll for sockets has a similar code, we can deduplicate the two.
-- XXX we need to check the errno to determine if the loop should continue. For
-- example, write may return without writing all data if the process file-size
-- limit has been reached; in that case continuing to write in a loop is
-- fruitless.
--
-- | Keep writing in a loop until all data in the buffer has been written.
--
-- @writeAll fd ptr bytes@ calls 'write' for the @bytes@ bytes at @ptr@ and,
-- whenever fewer bytes than requested were written, recurses on the remaining
-- tail of the buffer.
writeAll :: FD -> Ptr Word8 -> Int -> IO ()
writeAll fd ptr bytes = do
    written <- fromIntegral <$> write fd ptr 0 (fromIntegral bytes)
    when (written < bytes) $
        writeAll fd (ptr `plusPtr` written) (bytes - written)
-------------------------------------------------------------------------------
-- Vector IO
-------------------------------------------------------------------------------
-- | @writev FD iovec count@ tries to write data on the given filesystem fd
-- (cannot be a socket) from an iovec with specified number of entries. The
-- write will not block the OS thread, it may suspend the Haskell thread until
-- write can proceed. Returns the actual amount of data written.
--
-- This is 'writevNonBlocking' with the error location string fixed to this
-- module's name.
writev :: FD -> Ptr IOVec -> Int -> IO CInt
writev = writevNonBlocking "Streamly.FileSystem.FDIO"
-- | Keep writing an iovec in a loop until all the iovec entries are written.
--
-- @writevAll fd iovec count@ writes the @count@ entries starting at @iovec@.
-- The previous implementation issued a single 'writev' and discarded the
-- returned byte count, so a partial write silently dropped the rest of the
-- data. This version consumes the byte count:
--
-- * entries that were written completely are skipped;
-- * an entry that was written only partially is completed with 'writeAll';
-- * the remaining entries are written with further 'writev' calls.
--
-- The caller's iovec memory is only read, never modified. A @count@ of zero
-- or less is a no-op.
--
-- XXX like 'writeAll', this keeps retrying if the system call keeps reporting
-- that zero bytes were written.
writevAll :: FD -> Ptr IOVec -> Int -> IO ()
writevAll fd iovec count = go iovec count
  where
    -- Issue one writev for the entries that are still pending.
    go :: Ptr IOVec -> Int -> IO ()
    go iov cnt
        | cnt <= 0 = return ()
        | otherwise = do
            written <- writev fd iov cnt
            consume iov cnt (fromIntegral written)

    -- Account for @n@ written bytes against the pending entries.
    consume :: Ptr IOVec -> Int -> Int -> IO ()
    consume iov cnt n
        | cnt <= 0 = return ()
        | otherwise = do
            IOVec base len <- peek iov
            let entryLen = fromIntegral len :: Int
                next = iov `advancePtr` 1
            if n >= entryLen
                -- This entry went out completely (also covers empty entries).
                then consume next (cnt - 1) (n - entryLen)
                else if n == 0
                    -- The write stopped exactly at an entry boundary.
                    then go iov cnt
                    else do
                        -- Finish the partially written entry, then carry on
                        -- with gather writes for the remaining entries.
                        writeAll fd (base `plusPtr` n) (entryLen - n)
                        go next (cnt - 1)

View File

@ -63,7 +63,7 @@ module Streamly.FileSystem.File
-- -- * Array Read
-- , readArrayOf
, readArraysUpto
, readArraysOfUpto
-- , readArraysOf
, readArrays
@ -183,26 +183,26 @@ appendArray file arr = SIO.withFile file AppendMode (\h -> FH.writeArray h arr)
-- Stream of Arrays IO
-------------------------------------------------------------------------------
-- | @readArraysUpto size file@ reads a stream of arrays from file @file@.
-- | @readArraysOfUpto size file@ reads a stream of arrays from file @file@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@.
{-# INLINABLE readArraysUpto #-}
readArraysUpto :: (IsStream t, MonadCatch m, MonadIO m)
{-# INLINABLE readArraysOfUpto #-}
readArraysOfUpto :: (IsStream t, MonadCatch m, MonadIO m)
=> Int -> FilePath -> t m (Array Word8)
readArraysUpto size file = withFile file ReadMode (FH.readArraysUpto size)
readArraysOfUpto size file = withFile file ReadMode (FH.readArraysOfUpto size)
-- XXX read 'Array a' instead of Word8
--
-- | @readArrays file@ reads a stream of arrays from file @file@.
-- The maximum size of a single array is limited to @defaultChunkSize@.
--
-- > readArrays = readArraysUpto defaultChunkSize
-- > readArrays = readArraysOfUpto defaultChunkSize
--
-- @since 0.7.0
{-# INLINE readArrays #-}
readArrays :: (IsStream t, MonadCatch m, MonadIO m)
=> FilePath -> t m (Array Word8)
readArrays = readArraysUpto A.defaultChunkSize
readArrays = readArraysOfUpto A.defaultChunkSize
-------------------------------------------------------------------------------
-- Read File to Stream
@ -219,7 +219,7 @@ readArrays = readArraysUpto A.defaultChunkSize
--
{-# INLINE readByChunksUpto #-}
readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
readByChunksUpto chunkSize h = A.flattenArrays $ readArraysUpto chunkSize h
readByChunksUpto chunkSize h = A.flattenArrays $ readArraysOfUpto chunkSize h
-}
-- TODO

View File

@ -53,13 +53,13 @@ module Streamly.FileSystem.Handle
-- , readUtf8
-- , readLines
-- , readFrames
-- , readByChunks
, readByChunksUpto
-- -- * Array Read
-- , readArrayUpto
-- , readArrayOf
, readArraysUpto
, readArraysOfUpto
-- , readArraysOf
, readArrays
@ -68,7 +68,7 @@ module Streamly.FileSystem.Handle
-- , writeUtf8
-- , writeUtf8ByLines
-- , writeByFrames
, writeByChunks
, writeByChunksOf
-- -- * Array Write
, writeArray
@ -172,13 +172,13 @@ writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen
-- Stream of Arrays IO
-------------------------------------------------------------------------------
-- | @readArraysUpto size h@ reads a stream of arrays from file handle @h@.
-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@.
{-# INLINABLE _readArraysUpto #-}
_readArraysUpto :: (IsStream t, MonadIO m)
{-# INLINABLE _readArraysOfUpto #-}
_readArraysOfUpto :: (IsStream t, MonadIO m)
=> Int -> Handle -> t m (Array Word8)
_readArraysUpto size h = go
_readArraysOfUpto size h = go
where
-- XXX use cons/nil instead
go = mkStream $ \_ yld _ stp -> do
@ -187,9 +187,9 @@ _readArraysUpto size h = go
then stp
else yld arr go
{-# INLINE_NORMAL readArraysUpto #-}
readArraysUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8)
readArraysUpto size h = D.fromStreamD (D.Stream step ())
{-# INLINE_NORMAL readArraysOfUpto #-}
readArraysOfUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8)
readArraysOfUpto size h = D.fromStreamD (D.Stream step ())
where
{-# INLINE_LATE step #-}
step _ _ = do
@ -206,12 +206,12 @@ readArraysUpto size h = D.fromStreamD (D.Stream step ())
-- 'readArrays' ignores the prevailing 'TextEncoding' and 'NewlineMode'
-- on the 'Handle'.
--
-- > readArrays = readArraysUpto defaultChunkSize
-- > readArrays = readArraysOfUpto defaultChunkSize
--
-- @since 0.7.0
{-# INLINE readArrays #-}
readArrays :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
readArrays = readArraysUpto A.defaultChunkSize
readArrays = readArraysOfUpto A.defaultChunkSize
-------------------------------------------------------------------------------
-- Read File to Stream
@ -221,15 +221,13 @@ readArrays = readArraysUpto A.defaultChunkSize
-- read requests at the same time. For serial case we can use async IO. We can
-- also control the read throughput in mbps or IOPS.
{-
-- | @readByChunksUpto chunkSize handle@ reads a byte stream from a file
-- handle, reads are performed in chunks of up to @chunkSize@. The stream ends
-- as soon as EOF is encountered.
--
{-# INLINE readByChunksUpto #-}
readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
readByChunksUpto chunkSize h = A.flattenArrays $ readArraysUpto chunkSize h
-}
readByChunksUpto chunkSize h = A.flattenArrays $ readArraysOfUpto chunkSize h
-- TODO
-- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a
@ -267,11 +265,11 @@ writeArrays h m = S.mapM_ (liftIO . writeArray h) m
-- input elements.
--
-- @since 0.7.0
{-# INLINE writeByChunks #-}
writeByChunks :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
writeByChunks n h m = writeArrays h $ A.arraysOf n m
{-# INLINE writeByChunksOf #-}
writeByChunksOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
writeByChunksOf n h m = writeArrays h $ A.arraysOf n m
-- > write = 'writeByChunks' A.defaultChunkSize
-- > write = 'writeByChunksOf' A.defaultChunkSize
--
-- | Write a byte stream to a file handle. Combines the bytes in chunks of size
-- up to 'A.defaultChunkSize' before writing. Note that the write behavior
@ -280,7 +278,7 @@ writeByChunks n h m = writeArrays h $ A.arraysOf n m
-- @since 0.7.0
{-# INLINE write #-}
write :: MonadIO m => Handle -> SerialT m Word8 -> m ()
write = writeByChunks A.defaultChunkSize
write = writeByChunksOf A.defaultChunkSize
{-
{-# INLINE write #-}

View File

@ -0,0 +1,66 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CApiFFI #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- |
-- Module : Streamly.FileSystem.IOVec
-- Copyright : (c) 2019 Composewell Technologies
--
-- License : BSD3
-- Maintainer : harendra.kumar@gmail.com
-- Stability : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--
module Streamly.FileSystem.IOVec
( IOVec(..)
, c_writev
, c_safe_writev
)
where
import Data.Word (Word8, Word64)
import Foreign.C.Types (CInt(..), CSize(..))
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import System.Posix.Types (CSsize(..))
-------------------------------------------------------------------------------
-- IOVec
-------------------------------------------------------------------------------
-- | One entry of the scatter/gather vector passed to @writev@: the address of
-- a buffer and the number of bytes in it.
--
-- The type is defined on every platform because it is exported
-- unconditionally; only its 'Storable' instance and the real FFI bindings
-- depend on @sys/uio.h@.
data IOVec = IOVec
  { iovBase :: {-# UNPACK #-} !(Ptr Word8)
  , iovLen  :: {-# UNPACK #-} !Word64
  } deriving (Eq, Show)

#if !defined(mingw32_HOST_OS)

#include <sys/uio.h>

-- | Marshals to and from the C @struct iovec@. The length is converted
-- explicitly between 'Word64' and @size_t@ ('CSize') so that the instance also
-- type checks on platforms where @size_t@ is narrower than 64 bits.
instance Storable IOVec where
    sizeOf _ = #{size struct iovec}
    alignment _ = #{alignment struct iovec}
    peek ptr = do
        base <- #{peek struct iovec, iov_base} ptr
        len :: CSize <- #{peek struct iovec, iov_len} ptr
        return $ IOVec base (fromIntegral len)
    poke ptr vec = do
        let base = iovBase vec
            len :: CSize = fromIntegral (iovLen vec)
        #{poke struct iovec, iov_base} ptr base
        #{poke struct iovec, iov_len} ptr len

-- | @writev@ via an unsafe foreign call.
foreign import capi unsafe "sys/uio.h writev"
    c_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize

-- | @writev@ via a safe foreign call.
foreign import capi safe "sys/uio.h writev"
    c_safe_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize

#else

-- | Placeholder: @writev@ is not available on Windows.
c_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize
c_writev = error "writev not implemented for windows"

-- | Placeholder: @writev@ is not available on Windows.
c_safe_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize
c_safe_writev = error "writev not implemented for windows"

#endif

View File

@ -93,27 +93,32 @@ module Streamly.Mem.Array
-- , newArray
, writeN
, write
, fromListN
, fromList
, A.fromListN
, A.fromList
-- Folds
, toArrayN
, A.toArrayN
-- , toArrays
, toArray
-- Streams
-- Streams of arrays
, arraysOf
-- * Elimination
-- 'GHC.Exts.toList' from "GHC.Exts" can be used to convert an array to a
-- list.
, read
, A.read
, readRev
, toList
, A.toList
-- Streams of arrays
, flattenArrays
-- , flattenArraysRev
, spliceArrays
, coalesceArrays
, coalesceChunksOf
, unlinesArraysBy
, splitArraysOn
-- * Random Access
, length
@ -123,8 +128,10 @@ module Streamly.Mem.Array
{-
, readSlice
, readSliceRev
-}
, writeIndex
{-
, writeSlice
, writeSliceRev
-}
@ -140,13 +147,14 @@ where
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Functor.Identity (Identity)
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read)
import Streamly.Mem.Array.Types hiding (flattenArrays, newArray)
import Streamly.Mem.Array.Types (Array(..), length)
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK.Type (IsStream)
import Streamly.Fold.Types (Fold(..))
@ -178,7 +186,7 @@ newArray len = undefined
writeN :: (MonadIO m, Storable a) => Int -> SerialT m a -> m (Array a)
writeN n m = do
if n < 0 then error "writeN: negative write count specified" else return ()
fromStreamDN n $ D.toStreamD m
A.fromStreamDN n $ D.toStreamD m
-------------------------------------------------------------------------------
-- Elimination
@ -189,7 +197,7 @@ writeN n m = do
-- @since 0.7.0
{-# INLINE_EARLY readRev #-}
readRev :: (Monad m, IsStream t, Storable a) => Array a -> t m a
readRev = D.fromStreamD . toStreamDRev
readRev = D.fromStreamD . A.toStreamDRev
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.readRev fallback to StreamK" [1]
-- forall a. S.toStreamK (readRev a) = K.revFromArray a #-}
@ -199,7 +207,7 @@ _null :: Storable a => Array a -> Bool
_null arr = length arr == 0
{-# INLINE _last #-}
_last :: forall a. Storable a => Array a -> Maybe a
_last :: (MonadIO m, Storable a) => Array a -> m (Maybe a)
_last arr = readIndex arr (length arr - 1)
-------------------------------------------------------------------------------
@ -306,11 +314,12 @@ foldbWith level f = undefined
--
-- @since 0.7.0
{-# INLINE readIndex #-}
readIndex :: Storable a => Array a -> Int -> Maybe a
readIndex :: (MonadIO m, Storable a) => Array a -> Int -> m (Maybe a)
readIndex arr i =
if i < 0 || i > length arr - 1
then Nothing
else Just $ unsafeIndex arr i
then return Nothing
else liftIO $ withForeignPtr (aStart arr) $ \p ->
fmap Just $ peekElemOff p i
{-
-- | @readSlice arr i count@ streams a slice of the array @arr@ starting
@ -332,14 +341,25 @@ readSlice arr i len = undefined
readSliceRev :: (IsStream t, Monad m, Storable a)
=> Array a -> Int -> Int -> t m a
readSliceRev arr i len = undefined
-}
-- | /O(1)/ Write the given element at the given index in the array.
--
-- @since 0.7.0
{-# INLINE writeIndex #-}
writeIndex :: (MonadIO m, Storable a) => Array a -> Int -> a -> m ()
writeIndex arr i a = undefined
writeIndex arr i a = do
let maxIndex = length arr - 1
if i < 0
then error "writeIndex: negative array index"
else if i > maxIndex
then error $ "writeIndex: specified array index " ++ show i
++ " is beyond the maximum index " ++ show maxIndex
else
liftIO $ withForeignPtr (aStart arr) $ \p ->
pokeElemOff p i a
{-
-- | @writeSlice arr i count stream@ writes a stream to the array @arr@
-- starting at index @i@ and writing up to @count@ elements in the forward
-- direction ending at the index @i + count - 1@.
@ -381,7 +401,7 @@ toArraysInRange low high (Fold step initial extract) =
{-# INLINE _toArraysOf #-}
_toArraysOf :: (MonadIO m, Storable a)
=> Int -> Fold m a (SerialT Identity (Array a))
_toArraysOf n = FL.lchunksOf n (toArrayN n) FL.toStream
_toArraysOf n = FL.lchunksOf n (A.toArrayN n) FL.toStream
-- XXX The realloc based implementation needs to make one extra copy if we use
-- shrinkToFit. On the other hand, the stream of arrays implementation may
@ -401,7 +421,7 @@ bytesToCount x n =
let elemSize = sizeOf x
-- Round up to a whole number of elements. The parentheses are required:
-- `div` binds tighter than + and -, so without them only the literal 1 would
-- be divided and the result would be roughly n + elemSize.
in (n + elemSize - 1) `div` elemSize
{-# INLINE toArrayMinChunk #-}
{-# INLINE_NORMAL toArrayMinChunk #-}
toArrayMinChunk :: forall m a. (MonadIO m, Storable a)
=> Int -> Fold m a (Array a)
-- toArrayMinChunk n = FL.mapM spliceArrays $ toArraysOf n
@ -416,11 +436,14 @@ toArrayMinChunk elemCount = Fold step initial extract
initial = do
when (elemCount < 0) $ error "toArrayMinChunk: elemCount is negative"
liftIO $ A.newArray elemCount
step arr@(Array _ end bound) x | end == bound = do
arr1 <- liftIO $ reallocDouble 1 arr
step arr@(Array start end bound) x | end == bound = do
let p = unsafeForeignPtrToPtr start
oldSize = end `minusPtr` p
newSize = max (oldSize * 2) 1
arr1 <- liftIO $ A.realloc newSize arr
insertElem arr1 x
step arr x = insertElem arr x
extract = liftIO . shrinkToFit
extract = liftIO . A.shrinkToFit
-- | Fold the whole input to a single array.
--
@ -429,7 +452,7 @@ toArrayMinChunk elemCount = Fold step initial extract
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: forall m a. (MonadIO m, Storable a) => Fold m a (Array a)
toArray = toArrayMinChunk (bytesToCount (undefined :: a) (mkChunkSize 1024))
toArray = toArrayMinChunk (bytesToCount (undefined :: a) (A.mkChunkSize 1024))
-- | Convert a stream of arrays into a stream of their elements.
--
@ -449,6 +472,39 @@ flattenArrays m = D.fromStreamD $ A.flattenArrays (D.toStreamD m)
_flattenArraysRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
_flattenArraysRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m)
-- XXX use an Array instead as separator? Or use a separate unlinesArraysBySeq
-- API for that?
--
-- | Convert a stream of arrays into a stream of their elements, emitting the
-- supplied separator element after the contents of every array.
--
-- @since 0.7.0
{-# INLINE unlinesArraysBy #-}
unlinesArraysBy :: (MonadIO m, IsStream t, Storable a)
    => a -> t m (Array a) -> t m a
unlinesArraysBy sep = D.fromStreamD . joinWithSep . D.toStreamD
  where
    joinWithSep = A.unlines sep
-- | Re-chunk a stream of byte arrays at every occurrence of the given
-- separator byte. The separator itself is dropped, and the pieces lying
-- between two consecutive separators are joined into one array even when they
-- arrive in different input arrays.
--
-- @since 0.7.0
{-# INLINE splitArraysOn #-}
splitArraysOn
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
splitArraysOn sep input = D.fromStreamD (A.splitOn sep (D.toStreamD input))
-- | Coalesce adjacent arrays in the incoming stream to form bigger arrays of
-- at most the specified size. The size is a byte count: it is compared
-- against the byte length of the arrays. An incoming array that is already at
-- least that big is passed through as it is.
--
-- Works for any stream type, like the other stream-of-arrays combinators in
-- this module.
--
-- @since 0.7.0
{-# INLINE coalesceChunksOf #-}
coalesceChunksOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m (Array a) -> t m (Array a)
coalesceChunksOf n xs = D.fromStreamD $ A.coalesceChunksOf n (D.toStreamD xs)
-- |
-- > arraysOf n = FL.groupsOf n (FL.toArrayN n)
--
@ -459,18 +515,17 @@ _flattenArraysRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m)
arraysOf :: (IsStream t, MonadIO m, Storable a)
=> Int -> t m a -> t m (Array a)
arraysOf n str =
D.fromStreamD $ fromStreamDArraysOf n (D.toStreamD str)
D.fromStreamD $ A.fromStreamDArraysOf n (D.toStreamD str)
-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.
{-# INLINE _spliceArraysRealloced #-}
_spliceArraysRealloced :: (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
_spliceArraysRealloced s = do
buffered <- P.foldr S.cons S.nil s
len <- S.sum (S.map length buffered)
-- CAUTION! The specified length must be greater than or equal to the sum of
-- the lengths of all the arrays in the stream.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
=> Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe len buffered = do
arr <- liftIO $ A.newArray len
end <- S.foldlM' writeArr (aEnd arr) buffered
return $ arr {aEnd = end}
@ -479,42 +534,36 @@ _spliceArraysRealloced s = do
writeArr dst Array{..} =
liftIO $ withForeignPtr aStart $ \src -> do
let len = aEnd `minusPtr` src
memcpy (castPtr dst) (castPtr src) len
return $ dst `plusPtr` len
let count = aEnd `minusPtr` src
A.memcpy (castPtr dst) (castPtr src) count
return $ dst `plusPtr` count
{-# INLINE spliceArraysBuffered #-}
spliceArraysBuffered :: forall m a. (MonadIO m, Storable a)
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
spliceArraysBuffered s = do
_spliceArraysBuffered s = do
buffered <- P.foldr S.cons S.nil s
len <- S.sum (S.map length buffered)
spliceArraysLenUnsafe len s
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
spliceArraysRealloced s = do
idst <- liftIO $ A.newArray (bytesToCount (undefined :: a)
(mkChunkSizeKB 4))
arr <- S.foldlM' appendArr idst s
liftIO $ shrinkToFit arr
(A.mkChunkSizeKB 4))
where
appendArr dst@(Array _ end bound) src = liftIO $ do
let srcLen = aEnd src `minusPtr` unsafeForeignPtrToPtr (aStart src)
dst1 <-
if end `plusPtr` srcLen >= bound
then reallocDouble srcLen dst
else return dst
withForeignPtr (aStart dst1) $ \_ -> do
withForeignPtr (aStart src) $ \psrc -> do
let pdst = aEnd dst1
memcpy (castPtr pdst) (castPtr psrc) srcLen
return $ dst1 { aEnd = pdst `plusPtr` srcLen }
arr <- S.foldlM' A.spliceWithDoubling idst s
liftIO $ A.shrinkToFit arr
-- | Given a stream of arrays, splice them all together to generate a single
-- array. The stream must be /finite/.
--
-- @since 0.7.0
{-# INLINE spliceArrays #-}
spliceArrays :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
spliceArrays = spliceArraysBuffered
-- spliceArrays = _spliceArraysRealloced
{-# INLINE coalesceArrays #-}
coalesceArrays :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
coalesceArrays = spliceArraysRealloced
-- spliceArrays = _spliceArraysBuffered
-- | Create an 'Array' from a stream. This is useful when we want to create a
-- single array from a stream of unknown size. 'writeN' is at least twice
@ -548,18 +597,18 @@ write = FL.foldl' toArray
{-# INLINE transformWith #-}
transformWith :: (MonadIO m, Storable a, Storable b)
=> (SerialT m a -> SerialT m b) -> Array a -> m (Array b)
transformWith f arr = FL.foldl' (toArrayMinChunk (length arr)) $ f (read arr)
transformWith f arr = FL.foldl' (toArrayMinChunk (length arr)) $ f (A.read arr)
-- | Fold an array using a 'Fold'.
--
-- @since 0.7.0
{-# INLINE foldArray #-}
foldArray :: (MonadIO m, Storable a) => Fold m a b -> Array a -> m b
foldArray f arr = FL.foldl' f (read arr)
foldArray f arr = FL.foldl' f (A.read arr)
-- | Fold an array using a stream fold operation.
--
-- @since 0.7.0
{-# INLINE foldWith #-}
foldWith :: (MonadIO m, Storable a) => (SerialT m a -> m b) -> Array a -> m b
foldWith f arr = f (read arr)
foldWith f arr = f (A.read arr)

View File

@ -23,29 +23,34 @@ module Streamly.Mem.Array.Types
Array (..)
-- * Construction
, unsafeInlineIO
, withNewArray
, newArray
, unsafeSnoc
, snoc
, shrinkToFit
, memcpy
, memcmp
, spliceWithDoubling
, fromList
, fromListN
, fromStreamDN
-- , fromStreamD
-- * Streams of arrays
, fromStreamDArraysOf
, flattenArrays
, flattenArraysRev
, coalesceChunksOf
, groupIOVecsOf
, splitOn
-- * Elimination
, unsafeIndexIO
, unsafeIndex
, length
, byteLength
, byteCapacity
, foldl'
, foldr
, splitAt
, toStreamD
, toStreamDRev
@ -59,7 +64,11 @@ module Streamly.Mem.Array.Types
, defaultChunkSize
, mkChunkSize
, mkChunkSizeKB
, reallocDouble
, unsafeInlineIO
, realloc
, shrinkToFit
, memcpy
, memcmp
, unlines
)
@ -74,11 +83,11 @@ import Data.Word (Word8)
import Foreign.C.String (CString)
import Foreign.C.Types (CSize(..), CInt(..))
import Foreign.ForeignPtr
(ForeignPtr, withForeignPtr, touchForeignPtr)
(ForeignPtr, withForeignPtr, touchForeignPtr, plusForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (plusPtr, minusPtr, castPtr)
import Foreign.Ptr (plusPtr, minusPtr, castPtr, nullPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, foldr, read, unlines, unwords)
import Prelude hiding (length, foldr, read, unlines, splitAt)
import Text.Read (readPrec, readListPrec, readListPrecDefault)
import GHC.Base (Addr#, realWorld#)
@ -89,6 +98,7 @@ import GHC.Ptr (Ptr(..))
import Streamly.Fold.Types (Fold(..))
import Streamly.SVar (adaptState)
import Streamly.FileSystem.FDIO (IOVec(..))
import qualified Streamly.Streams.StreamD.Type as D
import qualified Streamly.Streams.StreamK as K
@ -165,6 +175,9 @@ foreign import ccall unsafe "string.h memcpy" c_memcpy
foreign import ccall unsafe "string.h strlen" c_strlen
:: CString -> IO CSize
foreign import ccall unsafe "string.h memchr" c_memchr
:: Ptr Word8 -> Word8 -> CSize -> IO (Ptr Word8)
-- XXX we are converting Int to CSize
memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO ()
memcpy dst src len = c_memcpy dst src (fromIntegral len) >> return ()
@ -222,14 +235,14 @@ withNewArray count f = do
unsafeSnoc :: forall a. Storable a => Array a -> a -> IO (Array a)
unsafeSnoc arr@Array{..} x = do
when (aEnd == aBound) $
error "BUG: unsafeAppend: writing beyond array bounds"
error "BUG: unsafeSnoc: writing beyond array bounds"
poke aEnd x
touchForeignPtr aStart
return $ arr {aEnd = aEnd `plusPtr` (sizeOf (undefined :: a))}
{-# INLINE snoc #-}
snoc :: forall a. Storable a => Array a -> a -> Array a
snoc arr@Array {..} x = unsafePerformIO $
snoc :: forall a. Storable a => Array a -> a -> IO (Array a)
snoc arr@Array {..} x = do
if (aEnd == aBound)
then do
let oldStart = unsafeForeignPtrToPtr aStart
@ -250,37 +263,14 @@ snoc arr@Array {..} x = unsafePerformIO $
touchForeignPtr aStart
return $ arr {aEnd = aEnd `plusPtr` (sizeOf (undefined :: a))}
-- | Remove the free space from an Array.
shrinkToFit :: forall a. Storable a => Array a -> IO (Array a)
shrinkToFit arr@Array{..} = do
assert (aEnd <= aBound) (return ())
if aEnd /= aBound
then do
let oldStart = unsafeForeignPtrToPtr aStart
let size = aEnd `minusPtr` oldStart
newPtr <- mallocPlainForeignPtrAlignedBytes
size (alignment (undefined :: a))
withForeignPtr newPtr $ \pNew -> do
memcpy (castPtr pNew) (castPtr oldStart) size
touchForeignPtr aStart
let end = pNew `plusPtr` size
return $ Array
{ aStart = newPtr
, aEnd = end
, aBound = end
}
else return arr
-- | Expand the free space in the array doubling the size with a minimum
-- exapnsion of the specified amount of bytes.
{-# NOINLINE reallocDouble #-}
reallocDouble :: forall a. Storable a => Int -> Array a -> IO (Array a)
reallocDouble minIncrease Array{..} = do
-- | Reallocate the array to the specified size in bytes. If the size is less
-- than the original array the array gets truncated.
{-# NOINLINE realloc #-}
realloc :: forall a. Storable a => Int -> Array a -> IO (Array a)
realloc newSize Array{..} = do
assert (aEnd <= aBound) (return ())
let oldStart = unsafeForeignPtrToPtr aStart
let size = aEnd `minusPtr` oldStart
newSize = max (size * 2) (size + minIncrease)
newPtr <- mallocPlainForeignPtrAlignedBytes
newSize (alignment (undefined :: a))
withForeignPtr newPtr $ \pNew -> do
@ -292,6 +282,17 @@ reallocDouble minIncrease Array{..} = do
, aBound = pNew `plusPtr` newSize
}
-- | Drop the unused capacity of an array. When the array has no free space
-- (its end coincides with its bound) it is returned unchanged; otherwise it is
-- reallocated to exactly the number of bytes currently in use.
shrinkToFit :: forall a. Storable a => Array a -> IO (Array a)
shrinkToFit arr@Array{..} = do
    assert (aEnd <= aBound) (return ())
    if aEnd == aBound
        then return arr
        else realloc (aEnd `minusPtr` unsafeForeignPtrToPtr aStart) arr
-- XXX when converting an array of Word8 from a literal string we can simply
-- refer to the literal string. Is it possible to write rules such that
-- fromList Word8 can be rewritten so that GHC does not first convert the
@ -340,15 +341,30 @@ unsafeIndexIO Array {..} i =
unsafeIndex :: forall a. Storable a => Array a -> Int -> a
unsafeIndex arr i = let !r = unsafeInlineIO $ unsafeIndexIO arr i in r
-- | /O(1)/ Get the length of the array.
-- | /O(1)/ Number of bytes occupied by the contents of the array, i.e. the
-- distance from the start of the array to its end pointer.
--
-- @since 0.7.0
{-# INLINE byteLength #-}
byteLength :: Array a -> Int
byteLength Array{..} = assert (used >= 0) used
  where
    used = aEnd `minusPtr` unsafeForeignPtrToPtr aStart
-- | /O(1)/ Get the length of the array i.e. the number of elements in the
-- array.
--
-- @since 0.7.0
{-# INLINE length #-}
length :: forall a. Storable a => Array a -> Int
length Array{..} =
length arr = byteLength arr `div` sizeOf (undefined :: a)
{-# INLINE byteCapacity #-}
byteCapacity :: Array a -> Int
byteCapacity Array{..} =
let p = unsafeForeignPtrToPtr aStart
aLen = aEnd `minusPtr` p
in assert (aLen >= 0) (aLen `div` sizeOf (undefined :: a))
len = aBound `minusPtr` p
in assert (len >= 0) len
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
@ -729,8 +745,9 @@ defaultChunkSize :: Int
defaultChunkSize = mkChunkSizeKB 32
{-# INLINE_NORMAL unlines #-}
unlines :: MonadIO m => D.Stream m (Array Char) -> D.Stream m Char
unlines (D.Stream step state) = D.Stream step' (OuterLoop state)
unlines :: forall m a. (MonadIO m, Storable a)
=> a -> D.Stream m (Array a) -> D.Stream m a
unlines sep (D.Stream step state) = D.Stream step' (OuterLoop state)
where
{-# INLINE_LATE step' #-}
step' gst (OuterLoop st) = do
@ -743,7 +760,7 @@ unlines (D.Stream step state) = D.Stream step' (OuterLoop state)
D.Stop -> D.Stop
step' _ (InnerLoop st _ p end) | p == end =
return $ D.Yield '\n' $ OuterLoop st
return $ D.Yield sep $ OuterLoop st
step' _ (InnerLoop st startf p end) = do
x <- liftIO $ do
@ -751,4 +768,272 @@ unlines (D.Stream step state) = D.Stream step' (OuterLoop state)
touchForeignPtr startf
return r
return $ D.Yield x (InnerLoop st startf
(p `plusPtr` (sizeOf (undefined :: Char))) end)
(p `plusPtr` (sizeOf (undefined :: a))) end)
-- Copy the contents of two arrays, in order, into a newly allocated array.
-- Neither input array is modified.
--
-- 'newArray' takes an element count, so the combined byte length is converted
-- to a number of elements (rounded up) before allocating. Passing the byte
-- count directly would allocate @sizeOf a@ times more memory than needed.
{-# INLINE spliceTwo #-}
spliceTwo :: forall m a. (MonadIO m, Storable a)
    => Array a -> Array a -> m (Array a)
spliceTwo arr1 arr2 = do
    let src1 = unsafeForeignPtrToPtr (aStart arr1)
        src2 = unsafeForeignPtrToPtr (aStart arr2)
        len1 = aEnd arr1 `minusPtr` src1
        len2 = aEnd arr2 `minusPtr` src2
        elemSize = sizeOf (undefined :: a)
        -- round up so the allocation can never be smaller than the data
        count = (len1 + len2 + elemSize - 1) `div` elemSize
    arr <- liftIO $ newArray count
    let dst = unsafeForeignPtrToPtr (aStart arr)
    liftIO $ do
        memcpy (castPtr dst) (castPtr src1) len1
        -- keep each source buffer alive until its copy has completed
        touchForeignPtr (aStart arr1)
        memcpy (castPtr (dst `plusPtr` len1)) (castPtr src2) len2
        touchForeignPtr (aStart arr2)
        return arr { aEnd = dst `plusPtr` (len1 + len2) }
-- Append the contents of @src@ to the free space at the end of @dst@ and
-- return @dst@ with its end pointer advanced. Nothing is allocated: the caller
-- must ensure that @dst@ has at least @byteLength src@ bytes of free space,
-- otherwise an error is raised.
{-# INLINE spliceWith #-}
spliceWith :: (MonadIO m) => Array a -> Array a -> m (Array a)
spliceWith dst@(Array _ end bound) src = liftIO $ do
    let srcLen = byteLength src
    if end `plusPtr` srcLen > bound
        then error "Bug: spliceWith: Not enough space in the target array"
        else
            -- the outer 'withForeignPtr' only keeps dst alive during the copy
            withForeignPtr (aStart dst) $ \_ ->
                withForeignPtr (aStart src) $ \psrc -> do
                    let pdst = aEnd dst
                    memcpy (castPtr pdst) (castPtr psrc) srcLen
                    return $ dst { aEnd = pdst `plusPtr` srcLen }
-- Append @src@ to @dst@, growing @dst@ first when its free space is too small.
-- When growing, the new size is the larger of twice the bytes currently in use
-- and the bytes in use plus the size of @src@.
--
-- The array is reallocated only when @src@ does not fit. A source that fills
-- the free space exactly is spliced in place, which is the same capacity
-- check that 'spliceWith' applies.
{-# INLINE spliceWithDoubling #-}
spliceWithDoubling :: (MonadIO m, Storable a)
    => Array a -> Array a -> m (Array a)
spliceWithDoubling dst@(Array start end bound) src = do
    assert (end <= bound) (return ())
    let srcLen = byteLength src
    dst1 <-
        if end `plusPtr` srcLen > bound
            then do
                let oldStart = unsafeForeignPtrToPtr start
                    oldSize = end `minusPtr` oldStart
                    newSize = max (oldSize * 2) (oldSize + srcLen)
                liftIO $ realloc newSize dst
            else return dst
    spliceWith dst1 src
-- States of the 'coalesceChunksOf' stream state machine. @s@ is the state of
-- the upstream stream and @arr@ is the type of the arrays being coalesced.
data SpliceState s arr
-- Nothing is buffered; the next upstream array starts a new buffer or is
-- emitted directly.
= SpliceInitial s
-- An array is being accumulated; further upstream arrays are appended to it as
-- long as the combined size stays within the limit.
| SpliceBuffering s arr
-- Emit the given array, then continue in the given state.
| SpliceYielding arr (SpliceState s arr)
-- Upstream is exhausted and everything has been emitted.
| SpliceFinish
-- XXX can use general grouping combinators to achieve this?
--
-- | Coalesce adjacent arrays in the incoming stream to form bigger arrays of a
-- maximum specified size.
--
-- The size @n@ is a byte count: it is compared against 'byteLength' of the
-- arrays. An array that is already @n@ bytes or more when nothing is buffered
-- is emitted unchanged. The array buffered at the end of the input is emitted
-- whatever its size.
--
-- @since 0.7.0
{-# INLINE_NORMAL coalesceChunksOf #-}
coalesceChunksOf :: (MonadIO m, Storable a)
=> Int -> D.Stream m (Array a) -> D.Stream m (Array a)
coalesceChunksOf n (D.Stream step state) = D.Stream step' (SpliceInitial state)
where
{-# INLINE_LATE step' #-}
-- Nothing buffered yet: emit a big enough array directly, otherwise start
-- buffering with it.
step' gst (SpliceInitial st) = do
r <- step gst st
case r of
D.Yield arr s -> return $
let len = byteLength arr
in if len >= n
then D.Skip (SpliceYielding arr (SpliceInitial s))
else D.Skip (SpliceBuffering s arr)
D.Skip s -> return $ D.Skip (SpliceInitial s)
D.Stop -> return $ D.Stop
-- An array is buffered: either append the next array to it or flush it.
step' gst (SpliceBuffering st buf) = do
r <- step gst st
case r of
D.Yield arr s -> do
let len = byteLength buf + byteLength arr
if len > n
-- Appending would exceed the limit: emit the buffer and make the new array
-- the buffer.
then return $ D.Skip (SpliceYielding buf (SpliceBuffering s arr))
else do
-- Grow the buffer to a capacity of n bytes if it is smaller, so that the
-- splice below has room.
buf' <- if byteCapacity buf < n
then liftIO $ realloc n buf
else return buf
buf'' <- spliceWith buf' arr
return $ D.Skip (SpliceBuffering s buf'')
D.Skip s -> return $ D.Skip (SpliceBuffering s buf)
-- Upstream ended: flush whatever is buffered, then stop.
D.Stop -> return $ D.Skip (SpliceYielding buf SpliceFinish)
step' _ SpliceFinish = return D.Stop
step' _ (SpliceYielding arr next) = return $ D.Yield arr next
-- States of the 'groupIOVecsOf' stream state machine. @s@ is the state of the
-- upstream stream and @arr@ is the type of the IOVec array being built.
data GatherState s arr
-- No IOVec array is being built yet.
= GatherInitial s
-- An IOVec array is being filled; the 'Int' is the total number of data bytes
-- referenced by its entries so far.
| GatherBuffering s arr Int
-- Emit the given IOVec array, then continue in the given state.
| GatherYielding arr (GatherState s arr)
-- Upstream is exhausted and everything has been emitted.
| GatherFinish
-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- Each input array contributes exactly one 'IOVec' entry; input arrays are
-- never split. An input array larger than @maxBytes@ therefore still becomes
-- an entry, so the group holding it can exceed the byte limit.
--
-- NOTE(review): every entry stores the raw address of the input array's
-- buffer obtained with 'unsafeForeignPtrToPtr'; the input array itself is not
-- part of the output. It looks like the consumer must keep the input arrays
-- alive by other means until the IOVec has been used -- confirm against the
-- callers.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
=> Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOf n maxIOVLen (D.Stream step state) =
D.Stream step' (GatherInitial state)
where
{-# INLINE_LATE step' #-}
-- No group in progress: start a new IOVec array with the next input array.
step' gst (GatherInitial st) = do
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
len = byteLength arr
iov <- liftIO $ newArray maxIOVLen
iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral len))
-- A single array that already reaches the byte limit is emitted on its own.
if len >= n
then return $ D.Skip (GatherYielding iov' (GatherInitial s))
else return $ D.Skip (GatherBuffering s iov' len)
D.Skip s -> return $ D.Skip (GatherInitial s)
D.Stop -> return $ D.Stop
-- A group is in progress; len is the number of bytes it references so far.
step' gst (GatherBuffering st iov len) = do
r <- step (adaptState gst) st
case r of
D.Yield arr s -> do
let p = unsafeForeignPtrToPtr (aStart arr)
alen = byteLength arr
len' = len + alen
-- Adding this array would exceed the byte limit, or the group has no free
-- entry left: emit the current group and start a new one with this array.
if len' > n || length iov >= maxIOVLen
then do
iov' <- liftIO $ newArray maxIOVLen
iov'' <- liftIO $ unsafeSnoc iov' (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherYielding iov
(GatherBuffering s iov'' alen))
else do
iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p)
(fromIntegral alen))
return $ D.Skip (GatherBuffering s iov' len')
D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
-- Upstream ended: flush the group in progress, then stop.
D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)
step' _ GatherFinish = return D.Stop
step' _ (GatherYielding iov next) = return $ D.Yield iov next
-- | Create two slices of an array without copying the original array. The
-- specified index @i@ is the first index of the second slice.
--
-- Both slices refer to the memory of the original array. The first slice has
-- no free space (its bound equals its end), while the second slice keeps the
-- end and bound of the original array.
--
-- Calls 'error' if @i@ is negative or greater than the last valid index of
-- the array.
--
-- @since 0.7.0
splitAt :: forall a. Storable a => Int -> Array a -> (Array a, Array a)
splitAt i arr@Array{..}
    | i < 0 = error "splitAt: negative array index"
    | i > maxIndex =
        error $ "splitAt: specified array index " ++ show i
            ++ " is beyond the maximum index " ++ show maxIndex
    | otherwise =
        ( Array
            { aStart = aStart
            , aEnd = p
            , aBound = p
            }
        , Array
            { aStart = aStart `plusForeignPtr` off
            , aEnd = aEnd
            , aBound = aBound
            }
        )
  where
    maxIndex = length arr - 1
    -- byte offset of element i from the start of the array
    off = i * sizeOf (undefined :: a)
    p = unsafeForeignPtrToPtr aStart `plusPtr` off
-- Break a byte array at the first occurrence of the separator byte, located
-- with memchr. When the separator is absent the array is returned as it is,
-- paired with Nothing. Otherwise the result is the part before the separator
-- and Just the part after it; the separator byte itself belongs to neither.
-- Both parts share the memory of the original array.
{-# INLINE breakOn #-}
breakOn :: MonadIO m
    => Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
breakOn sep arr@Array{..} = liftIO $ do
    let start = unsafeForeignPtrToPtr aStart
    found <- c_memchr start sep (fromIntegral $ aEnd `minusPtr` start)
    return $
        if found /= nullPtr
            then
                ( Array
                    { aStart = aStart
                    , aEnd = found
                    , aBound = found
                    }
                , Just $ Array
                    { aStart =
                        aStart `plusForeignPtr` (found `minusPtr` start + 1)
                    , aEnd = aEnd
                    , aBound = aBound
                    }
                )
            else (arr, Nothing)
-- State of the 'splitOn' state machine. @s@ is the state of the input stream
-- and @arr@ is the type of the arrays being split.
data SplitState s arr
    -- Nothing is pending; the next array is pulled from the input stream. If
    -- the input stops in this state the output stops as well.
    = Initial s
    -- The array holds the data seen since the last separator and contains no
    -- separator itself; further input is spliced onto it. If the input stops
    -- in this state the array is emitted, unless it is empty.
    | Buffering s arr
    -- The array is the remainder of an input array following a separator; it
    -- is yet to be searched for more separators before pulling more input.
    | Splitting s arr
    -- Emit the array downstream and then continue with the given state.
    | Yielding arr (SplitState s arr)
    -- Everything has been emitted; the output stream stops.
    | Finishing
-- | Split a stream of arrays on a given separator byte. The separator bytes
-- are dropped from the output and all the input data found between two
-- consecutive separators is coalesced into a single output array,
-- irrespective of how it was chunked in the input. Data remaining after the
-- last separator is emitted when the input ends, unless it is empty.
--
-- @since 0.7.0
{-# INLINE_NORMAL splitOn #-}
splitOn
    :: MonadIO m
    => Word8
    -> D.Stream m (Array Word8)
    -> D.Stream m (Array Word8)
splitOn byte (D.Stream step state) = D.Stream step' (Initial state)

    where

    -- Choose the next state after an array has been broken at the first
    -- separator. Without a separator the prefix stays buffered, otherwise the
    -- prefix is emitted and the remainder is searched for more separators.
    {-# INLINE afterBreak #-}
    afterBreak st prefix =
        D.Skip . maybe (Buffering st prefix) (Yielding prefix . Splitting st)

    {-# INLINE_LATE step' #-}
    -- Nothing buffered yet, pull an array from the input
    step' gst (Initial st) = do
        res <- step gst st
        case res of
            D.Yield chunk st' -> do
                (prefix, rest) <- breakOn byte chunk
                return $ afterBreak st' prefix rest
            D.Skip st' -> return $ D.Skip (Initial st')
            D.Stop -> return D.Stop

    -- A partial segment is pending, extend it with the next input array
    step' gst (Buffering st acc) = do
        res <- step gst st
        case res of
            D.Yield chunk st' -> do
                (prefix, rest) <- breakOn byte chunk
                joined <- spliceTwo acc prefix
                return $ afterBreak st' joined rest
            D.Skip st' -> return $ D.Skip (Buffering st' acc)
            D.Stop
                -- An empty pending segment is not emitted at the end
                | byteLength acc == 0 -> return D.Stop
                | otherwise -> return $ D.Skip (Yielding acc Finishing)

    -- The remainder of an input array may hold more separators, keep
    -- breaking it up before pulling more input
    step' _ (Splitting st pending) = do
        (prefix, rest) <- breakOn byte pending
        return $ afterBreak st prefix rest

    step' _ (Yielding out next) = return $ D.Yield out next
    step' _ Finishing = return D.Stop

View File

@ -187,7 +187,7 @@ words = FL.wordsBy isSpace toArray
{-# INLINE unlines #-}
unlines :: (MonadIO m, IsStream t) => t m (Array Char) -> t m Char
unlines = D.fromStreamD . A.unlines . D.toStreamD
unlines = D.fromStreamD . A.unlines '\n' . D.toStreamD
{-# INLINE unwords #-}
unwords :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char

View File

@ -207,6 +207,9 @@ library
, Streamly.Sink
, Streamly.Pipe.Types
, Streamly.FileSystem.IOVec
, Streamly.FileSystem.FDIO
exposed-modules: Streamly.Prelude
, Streamly.Time
, Streamly
@ -217,8 +220,9 @@ library
-- IO devices
, Streamly.Mem.Array
, Streamly.FileSystem.File
, Streamly.FileSystem.FD
, Streamly.FileSystem.Handle
, Streamly.FileSystem.File
, Streamly.Network.Socket
, Streamly.Network.Server
, Streamly.Network.Client