diff --git a/benchmark/FileIO.hs b/benchmark/FileIO.hs index b36166492..1bfc845a5 100644 --- a/benchmark/FileIO.hs +++ b/benchmark/FileIO.hs @@ -18,12 +18,12 @@ import Gauge import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Mem.Array as A import qualified Streamly.Prelude as S -import qualified Streamly.String as SS #ifdef DEVBUILD import Data.Char (ord, chr) import System.IO (hSeek, SeekMode(..)) import qualified Streamly.Fold as FL +import qualified Streamly.String as SS #endif -- Input and output file handles @@ -94,8 +94,8 @@ main = do Handles inh _ <- readIORef href let s = FH.readArrays inh larr <- S.last s - return $ case larr of - Nothing -> Nothing + case larr of + Nothing -> return Nothing Just arr -> A.readIndex arr (A.length arr - 1) -- Note: this cannot be fairly compared with GNU wc -c or wc -m as -- wc uses lseek to just determine the file size rather than reading diff --git a/examples/HandleIO.hs b/examples/HandleIO.hs index 52e198411..2ddd724d2 100644 --- a/examples/HandleIO.hs +++ b/examples/HandleIO.hs @@ -2,40 +2,52 @@ import qualified Streamly.Prelude as S import qualified Streamly.Fold as FL import qualified Streamly.Mem.Array as A import qualified Streamly.FileSystem.Handle as FH +import qualified System.IO as FH +-- import qualified Streamly.FileSystem.FD as FH import qualified Streamly.String as SS import Data.Char (ord) import System.Environment (getArgs) -import System.IO (openFile, IOMode(..), stdout, Handle, hSeek, SeekMode(..)) +import System.IO (IOMode(..), hSeek, SeekMode(..)) -cat :: Handle -> IO () -cat src = FH.writeArrays stdout $ FH.readArraysUpto (256*1024) src +cat :: FH.Handle -> IO () +cat src = FH.writeArrays FH.stdout $ FH.readArraysOfUpto (256*1024) src +{- +cat src = + FH.writeByChunksOf (1024*1024) FH.stdout + $ FH.readByChunksUpto (16*1024) src +-} -cp :: Handle -> Handle -> IO () -cp src dst = FH.writeArrays dst $ FH.readArraysUpto (256*1024) src +cp :: FH.Handle -> FH.Handle -> IO () +cp src dst = FH.writeArrays dst $ FH.readArraysOfUpto (256*1024) src +{- +cp src dst = + FH.writeByChunksOf (1024*1024) dst + $ FH.readByChunksUpto (16*1024) src +-} ord' :: Num a => Char -> a ord' = (fromIntegral . ord) -wcl :: Handle -> IO () +wcl :: FH.Handle -> IO () wcl src = print =<< (S.length $ flip SS.foldLines FL.drain $ SS.decodeChar8 $ FH.read src) -grepc :: String -> Handle -> IO () +grepc :: String -> FH.Handle -> IO () grepc pat src = print . (subtract 1) =<< (S.length $ FL.splitOn (A.fromList (map ord' pat)) FL.drain $ FH.read src) -avgll :: Handle -> IO () +avgll :: FH.Handle -> IO () avgll src = print =<< (FL.foldl' avg $ FL.splitSuffixBy (== ord' '\n') FL.length $ FH.read src) where avg = (/) <$> toDouble FL.sum <*> toDouble FL.length toDouble = fmap (fromIntegral :: Int -> Double) -llhisto :: Handle -> IO () +llhisto :: FH.Handle -> IO () llhisto src = print =<< (FL.foldl' (FL.classify FL.length) $ S.map bucket $ FL.splitSuffixBy (== ord' '\n') FL.length @@ -46,7 +58,7 @@ llhisto src = print =<< (FL.foldl' (FL.classify FL.length) main :: IO () main = do name <- fmap head getArgs - src <- openFile name ReadMode + src <- FH.openFile name ReadMode let rewind = hSeek src AbsoluteSeek 0 rewind >> putStrLn "cat" >> cat src -- Unix cat program @@ -55,5 +67,5 @@ main = do rewind >> putStr "avgll " >> avgll src -- get average line length rewind >> putStr "llhisto " >> llhisto src -- get line length histogram - dst <- openFile "dst-xyz.txt" WriteMode + dst <- FH.openFile "dst-xyz.txt" WriteMode rewind >> putStr "cp " >> cp src dst -- Unix cp program diff --git a/src/Streamly/FileSystem/FD.hs b/src/Streamly/FileSystem/FD.hs new file mode 100644 index 000000000..1351cb501 --- /dev/null +++ b/src/Streamly/FileSystem/FD.hs @@ -0,0 +1,561 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MagicHash #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE UnboxedTuples #-} + +#include "inline.hs" + +-- | +-- Module : Streamly.FileSystem.FD +-- Copyright : (c) 2019 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- This module is a an experimental replacement for +-- "Streamly.FileSystem.Handle". The former module provides IO facilities based +-- on the GHC Handle type. The APIs in this module avoid the GHC handle layer +-- and provide more explicit control over buffering. +-- +-- Read and write data as streams and arrays to and from files. +-- +-- This module provides read and write APIs based on handles. Before reading or +-- writing, a file must be opened first using 'openFile'. The 'Handle' returned +-- by 'openFile' is then used to access the file. A 'Handle' is backed by an +-- operating system file descriptor. When the 'Handle' is garbage collected the +-- underlying file descriptor is automatically closed. A handle can be +-- explicitly closed using 'closeFile'. +-- +-- Reading and writing APIs are divided into two categories, sequential +-- streaming APIs and random or seekable access APIs. File IO APIs are quite +-- similar to "Streamly.Mem.Array" read write APIs. In that regard, arrays can +-- be considered as in-memory files or files can be considered as on-disk +-- arrays. +-- +-- > import qualified Streamly.FileSystem.FD as FD +-- + +module Streamly.FileSystem.FD + ( + -- * File Handles + Handle + , stdin + , stdout + , stderr + , openFile + + -- TODO file path based APIs + -- , readFile + -- , writeFile + + -- * Streaming IO + -- | Streaming APIs read or write data to or from a file or device + -- sequentially, they never perform a seek to a random location. When + -- reading, the stream is lazy and generated on-demand as the consumer + -- consumes it. Read IO requests to the IO device are performed in chunks + -- of 32KiB, this is referred to as @defaultChunkSize@ in the + -- documentation. One IO request may or may not read the full chunk. If the + -- whole stream is not consumed, it is possible that we may read slightly + -- more from the IO device than what the consumer needed. Unless specified + -- otherwise in the API, writes are collected into chunks of + -- @defaultChunkSize@ before they are written to the IO device. + + -- Streaming APIs work for all kind of devices, seekable or non-seekable; + -- including disks, files, memory devices, terminals, pipes, sockets and + -- fifos. While random access APIs work only for files or devices that have + -- random access or seek capability for example disks, memory devices. + -- Devices like terminals, pipes, sockets and fifos do not have random + -- access capability. + + -- ** Read File to Stream + , read + -- , readUtf8 + -- , readLines + -- , readFrames + , readByChunksUpto + + -- -- * Array Read + -- , readArrayUpto + -- , readArrayOf + + , readArrays + , readArraysOfUpto + -- , readArraysOf + + -- ** Write File from Stream + , write + -- , writeUtf8 + -- , writeUtf8Lines + -- , writeFrames + , writeByChunksOf + + -- -- * Array Write + -- , writeArray + , writeArrays + , writeArraysOfUpto + , writev + , writevArraysOfUpto + + -- -- * Random Access (Seek) + -- -- | Unlike the streaming APIs listed above, these APIs apply to devices or + -- files that have random access or seek capability. This type of devices + -- include disks, files, memory devices and exclude terminals, pipes, + -- sockets and fifos. + -- + -- , readIndex + -- , readSlice + -- , readSliceRev + -- , readAt -- read from a given position to th end of file + -- , readSliceArrayUpto + -- , readSliceArrayOf + + -- , writeIndex + -- , writeSlice + -- , writeSliceRev + -- , writeAt -- start writing at the given position + -- , writeSliceArray + ) +where + +import Control.Monad.IO.Class (MonadIO(..)) +import Data.Word (Word8) +import Foreign.ForeignPtr (withForeignPtr) +-- import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.Ptr (plusPtr, castPtr) +import Foreign.Storable (Storable(..)) +import GHC.ForeignPtr (mallocPlainForeignPtrBytes) +-- import System.IO (Handle, hGetBufSome, hPutBuf) +import System.IO (IOMode) +import Prelude hiding (read) + +import qualified GHC.IO.FD as FD +import qualified GHC.IO.Device as RawIO hiding (write) +import qualified Streamly.FileSystem.FDIO as RawIO + +import Streamly.Mem.Array.Types (Array(..)) +import Streamly.Streams.Serial (SerialT) +import Streamly.Streams.StreamD (toStreamD) +import Streamly.Streams.StreamD.Type (fromStreamD) +import Streamly.Streams.StreamK.Type (IsStream, mkStream) +import Streamly.Mem.Array.Types (byteLength, defaultChunkSize, groupIOVecsOf) +-- import Streamly.Fold (Fold) +-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines) + +import qualified Streamly.Mem.Array as A +import qualified Streamly.Prelude as S +import qualified Streamly.Streams.StreamD.Type as D + +------------------------------------------------------------------------------- +-- References +------------------------------------------------------------------------------- +-- +-- The following references may be useful to build an understanding about the +-- file API design: +-- +-- http://www.linux-mag.com/id/308/ for blocking/non-blocking IO on linux. +-- https://lwn.net/Articles/612483/ Non-blocking buffered file read operations +-- https://en.wikipedia.org/wiki/C_file_input/output for C APIs. +-- https://docs.oracle.com/javase/tutorial/essential/io/file.html for Java API. +-- https://www.w3.org/TR/FileAPI/ for http file API. + +------------------------------------------------------------------------------- +-- Handles +------------------------------------------------------------------------------- + +-- XXX attach a finalizer +-- | A 'Handle' is returned by 'openFile' and is subsequently used to perform +-- read and write operations on a file. +-- +newtype Handle = Handle FD.FD + +-- | File handle for standard input +stdin :: Handle +stdin = Handle FD.stdin + +-- | File handle for standard output +stdout :: Handle +stdout = Handle FD.stdout + +-- | File handle for standard error +stderr :: Handle +stderr = Handle FD.stderr + +-- XXX we can support all the flags that the "open" system call supports. +-- Instead of using RTS locking mechanism can we use system provided locking +-- instead? +-- +-- | Open a file that is not a directory and return a file handle. +-- 'openFile' enforces a multiple-reader single-writer locking on files. That +-- is, there may either be many handles on the same file which manage input, or +-- just one handle on the file which manages output. If any open handle is +-- managing a file for output, no new handle can be allocated for that file. If +-- any open handle is managing a file for input, new handles can only be +-- allocated if they do not manage output. Whether two files are the same is +-- implementation-dependent, but they should normally be the same if they have +-- the same absolute path name and neither has been renamed, for example. +-- +openFile :: FilePath -> IOMode -> IO Handle +openFile path mode = fmap (Handle . fst) $ FD.openFile path mode True + +------------------------------------------------------------------------------- +-- Array IO (Input) +------------------------------------------------------------------------------- + +-- | Read a 'ByteArray' from a file handle. If no data is available on the +-- handle it blocks until some data becomes available. If data is available +-- then it immediately returns that data without blocking. It reads a maximum +-- of up to the size requested. +{-# INLINABLE readArrayUpto #-} +readArrayUpto :: Int -> Handle -> IO (Array Word8) +readArrayUpto size (Handle fd) = do + ptr <- mallocPlainForeignPtrBytes size + -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) + withForeignPtr ptr $ \p -> do + -- n <- hGetBufSome h p size + n <- RawIO.read fd p size + let v = Array + { aStart = ptr + , aEnd = p `plusPtr` n + , aBound = p `plusPtr` size + } + -- XXX shrink only if the diff is significant + -- A.shrinkToFit v + return v + +------------------------------------------------------------------------------- +-- Array IO (output) +------------------------------------------------------------------------------- + +-- | Write an 'Array' to a file handle. +-- +-- @since 0.7.0 +{-# INLINABLE writeArray #-} +writeArray :: Storable a => Handle -> Array a -> IO () +writeArray _ arr | A.length arr == 0 = return () +writeArray (Handle fd) arr = withForeignPtr (aStart arr) $ \p -> do + RawIO.writeAll fd (castPtr p) aLen + {- + -- Experiment to compare "writev" based IO with "write" based IO. + iov <- A.newArray 1 + let iov' = iov {aEnd = aBound iov} + A.writeIndex iov' 0 (RawIO.IOVec (castPtr p) (fromIntegral aLen)) + RawIO.writevAll fd (unsafeForeignPtrToPtr (aStart iov')) 1 + -} + where + aLen = byteLength arr + +-- | Write an array of 'IOVec' to a file handle. +-- +-- @since 0.7.0 +{-# INLINABLE writeIOVec #-} +writeIOVec :: Handle -> Array RawIO.IOVec -> IO () +writeIOVec _ iov | A.length iov == 0 = return () +writeIOVec (Handle fd) iov = + withForeignPtr (aStart iov) $ \p -> + RawIO.writevAll fd p (A.length iov) + +------------------------------------------------------------------------------- +-- Stream of Arrays IO +------------------------------------------------------------------------------- + +-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@. +-- The maximum size of a single array is specified by @size@. The actual size +-- read may be less than or equal to @size@. +{-# INLINABLE _readArraysOfUpto #-} +_readArraysOfUpto :: (IsStream t, MonadIO m) + => Int -> Handle -> t m (Array Word8) +_readArraysOfUpto size h = go + where + -- XXX use cons/nil instead + go = mkStream $ \_ yld _ stp -> do + arr <- liftIO $ readArrayUpto size h + if A.length arr == 0 + then stp + else yld arr go + +{-# INLINE_NORMAL readArraysOfUpto #-} +readArraysOfUpto :: (IsStream t, MonadIO m) + => Int -> Handle -> t m (Array Word8) +readArraysOfUpto size h = D.fromStreamD (D.Stream step ()) + where + {-# INLINE_LATE step #-} + step _ _ = do + arr <- liftIO $ readArrayUpto size h + return $ + case A.length arr of + 0 -> D.Stop + _ -> D.Yield arr () + +-- XXX read 'Array a' instead of Word8 +-- +-- | @readArrays h@ reads a stream of arrays from file handle @h@. +-- The maximum size of a single array is limited to @defaultChunkSize@. +-- 'readArrays' ignores the prevailing 'TextEncoding' and 'NewlineMode' +-- on the 'Handle'. +-- +-- > readArrays = readArraysOfUpto defaultChunkSize +-- +-- @since 0.7.0 +{-# INLINE readArrays #-} +readArrays :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8) +readArrays = readArraysOfUpto defaultChunkSize + +------------------------------------------------------------------------------- +-- Read File to Stream +------------------------------------------------------------------------------- + +-- TODO for concurrent streams implement readahead IO. We can send multiple +-- read requests at the same time. For serial case we can use async IO. We can +-- also control the read throughput in mbps or IOPS. + +-- | @readByChunksUpto chunkSize handle@ reads a byte stream from a file handle, +-- reads are performed in chunks of up to @chunkSize@. The stream ends as soon +-- as EOF is encountered. +-- +{-# INLINE readByChunksUpto #-} +readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8 +readByChunksUpto chunkSize h = A.flattenArrays $ readArraysOfUpto chunkSize h + +-- TODO +-- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a +-- +-- > read = 'readByChunks' A.defaultChunkSize +-- | Generate a stream of elements of the given type from a file 'Handle'. The +-- stream ends when EOF is encountered. +-- +-- @since 0.7.0 +{-# INLINE read #-} +read :: (IsStream t, MonadIO m) => Handle -> t m Word8 +read = A.flattenArrays . readArrays + +------------------------------------------------------------------------------- +-- Writing +------------------------------------------------------------------------------- + +-- | Write a stream of arrays to a handle. +-- +-- @since 0.7.0 +{-# INLINE writeArrays #-} +writeArrays :: (MonadIO m, Storable a) => Handle -> SerialT m (Array a) -> m () +writeArrays h m = S.mapM_ (liftIO . writeArray h) m + +-- | Write a stream of 'IOVec' arrays to a handle. +-- +-- @since 0.7.0 +{-# INLINE writev #-} +writev :: MonadIO m => Handle -> SerialT m (Array RawIO.IOVec) -> m () +writev h m = S.mapM_ (liftIO . writeIOVec h) m + +-- | Write a stream of arrays to a handle after coalescing them in chunks of +-- specified size. The chunk size is only a maximum and the actual writes could +-- be smaller than that as we do not split the arrays to fit them to the +-- specified size. +-- +-- @since 0.7.0 +{-# INLINE writeArraysOfUpto #-} +writeArraysOfUpto :: (MonadIO m, Storable a) + => Int -> Handle -> SerialT m (Array a) -> m () +writeArraysOfUpto n h xs = writeArrays h $ A.coalesceChunksOf n xs + +-- | Write a stream of arrays to a handle after grouping them in 'IOVec' arrays +-- of up to a maximum total size. Writes are performed using gather IO via +-- @writev@ system call. The maximum number of entries in each 'IOVec' group +-- limited to 512. +-- +-- @since 0.7.0 +{-# INLINE writevArraysOfUpto #-} +writevArraysOfUpto :: MonadIO m + => Int -> Handle -> SerialT m (Array a) -> m () +writevArraysOfUpto n h xs = + writev h $ fromStreamD $ groupIOVecsOf n 512 (toStreamD xs) + +-- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes. +-- +-- XXX test this +-- Note that if you use a chunk size less than 8K (GHC's default buffer +-- size) then you are advised to use 'NOBuffering' mode on the 'Handle' in case you +-- do not want buffering to occur at GHC level as well. Same thing applies to +-- writes as well. + +-- | Like 'write' but provides control over the write buffer. Output will +-- be written to the IO device as soon as we collect the specified number of +-- input elements. +-- +-- @since 0.7.0 +{-# INLINE writeByChunksOf #-} +writeByChunksOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m () +writeByChunksOf n h m = writeArrays h $ A.arraysOf n m + +-- > write = 'writeByChunks' A.defaultChunkSize +-- +-- | Write a byte stream to a file handle. Combines the bytes in chunks of size +-- up to 'A.defaultChunkSize' before writing. Note that the write behavior +-- depends on the 'IOMode' and the current seek position of the handle. +-- +-- @since 0.7.0 +{-# INLINE write #-} +write :: MonadIO m => Handle -> SerialT m Word8 -> m () +write = writeByChunksOf defaultChunkSize + +{- +{-# INLINE write #-} +write :: (MonadIO m, Storable a) => Handle -> SerialT m a -> m () +write = toHandleWith A.defaultChunkSize +-} + +------------------------------------------------------------------------------- +-- IO with encoding/decoding Unicode characters +------------------------------------------------------------------------------- + +{- +-- | +-- > readUtf8 = decodeUtf8 . read +-- +-- Read a UTF8 encoded stream of unicode characters from a file handle. +-- +-- @since 0.7.0 +{-# INLINE readUtf8 #-} +readUtf8 :: (IsStream t, MonadIO m) => Handle -> t m Char +readUtf8 = decodeUtf8 . read + +-- | +-- > writeUtf8 h s = write h $ encodeUtf8 s +-- +-- Encode a stream of unicode characters to UTF8 and write it to the given file +-- handle. Default block buffering applies to the writes. +-- +-- @since 0.7.0 +{-# INLINE writeUtf8 #-} +writeUtf8 :: MonadIO m => Handle -> SerialT m Char -> m () +writeUtf8 h s = write h $ encodeUtf8 s + +-- | Write a stream of unicode characters after encoding to UTF-8 in chunks +-- separated by a linefeed character @'\n'@. If the size of the buffer exceeds +-- @defaultChunkSize@ and a linefeed is not yet found, the buffer is written +-- anyway. This is similar to writing to a 'Handle' with the 'LineBuffering' +-- option. +-- +-- @since 0.7.0 +{-# INLINE writeUtf8ByLines #-} +writeUtf8ByLines :: (IsStream t, MonadIO m) => Handle -> t m Char -> m () +writeUtf8ByLines = undefined + +-- | Read UTF-8 lines from a file handle and apply the specified fold to each +-- line. This is similar to reading a 'Handle' with the 'LineBuffering' option. +-- +-- @since 0.7.0 +{-# INLINE readLines #-} +readLines :: (IsStream t, MonadIO m) => Handle -> Fold m Char b -> t m b +readLines h f = foldLines (readUtf8 h) f + +------------------------------------------------------------------------------- +-- Framing on a sequence +------------------------------------------------------------------------------- + +-- | Read a stream from a file handle and split it into frames delimited by +-- the specified sequence of elements. The supplied fold is applied on each +-- frame. +-- +-- @since 0.7.0 +{-# INLINE readFrames #-} +readFrames :: (IsStream t, MonadIO m, Storable a) + => Array a -> Handle -> Fold m a b -> t m b +readFrames = undefined -- foldFrames . read + +-- | Write a stream to the given file handle buffering up to frames separated +-- by the given sequence or up to a maximum of @defaultChunkSize@. +-- +-- @since 0.7.0 +{-# INLINE writeByFrames #-} +writeByFrames :: (IsStream t, MonadIO m, Storable a) + => Array a -> Handle -> t m a -> m () +writeByFrames = undefined + +------------------------------------------------------------------------------- +-- Random Access IO (Seek) +------------------------------------------------------------------------------- + +-- XXX handles could be shared, so we may not want to use the handle state at +-- all for these APIs. we can use pread and pwrite instead. On windows we will +-- need to use readFile/writeFile with an offset argument. + +------------------------------------------------------------------------------- + +-- | Read the element at the given index treating the file as an array. +-- +-- @since 0.7.0 +{-# INLINE readIndex #-} +readIndex :: Storable a => Handle -> Int -> Maybe a +readIndex arr i = undefined + +-- NOTE: To represent a range to read we have chosen (start, size) instead of +-- (start, end). This helps in removing the ambiguity of whether "end" is +-- included in the range or not. +-- +-- We could avoid specifying the range to be read and instead use "take size" +-- on the stream, but it may end up reading more and then consume it partially. + +-- | @readSliceWith chunkSize handle pos len@ reads up to @len@ bytes +-- from @handle@ starting at the offset @pos@ from the beginning of the file. +-- +-- Reads are performed in chunks of size @chunkSize@. For block devices, to +-- avoid reading partial blocks @chunkSize@ must align with the block size of +-- the underlying device. If the underlying block size is unknown, it is a good +-- idea to keep it a multiple 4KiB. This API ensures that the start of each +-- chunk is aligned with @chunkSize@ from second chunk onwards. +-- +{-# INLINE readSliceWith #-} +readSliceWith :: (IsStream t, MonadIO m, Storable a) + => Int -> Handle -> Int -> Int -> t m a +readSliceWith chunkSize h pos len = undefined + +-- | @readSlice h i count@ streams a slice from the file handle @h@ starting +-- at index @i@ and reading up to @count@ elements in the forward direction +-- ending at the index @i + count - 1@. +-- +-- @since 0.7.0 +{-# INLINE readSlice #-} +readSlice :: (IsStream t, MonadIO m, Storable a) + => Handle -> Int -> Int -> t m a +readSlice = readSliceWith A.defaultChunkSize + +-- | @readSliceRev h i count@ streams a slice from the file handle @h@ starting +-- at index @i@ and reading up to @count@ elements in the reverse direction +-- ending at the index @i - count + 1@. +-- +-- @since 0.7.0 +{-# INLINE readSliceRev #-} +readSliceRev :: (IsStream t, MonadIO m, Storable a) + => Handle -> Int -> Int -> t m a +readSliceRev h i count = undefined + +-- | Write the given element at the given index in the file. +-- +-- @since 0.7.0 +{-# INLINE writeIndex #-} +writeIndex :: (MonadIO m, Storable a) => Handle -> Int -> a -> m () +writeIndex h i a = undefined + +-- | @writeSlice h i count stream@ writes a stream to the file handle @h@ +-- starting at index @i@ and writing up to @count@ elements in the forward +-- direction ending at the index @i + count - 1@. +-- +-- @since 0.7.0 +{-# INLINE writeSlice #-} +writeSlice :: (IsStream t, Monad m, Storable a) + => Handle -> Int -> Int -> t m a -> m () +writeSlice h i len s = undefined + +-- | @writeSliceRev h i count stream@ writes a stream to the file handle @h@ +-- starting at index @i@ and writing up to @count@ elements in the reverse +-- direction ending at the index @i - count + 1@. +-- +-- @since 0.7.0 +{-# INLINE writeSliceRev #-} +writeSliceRev :: (IsStream t, Monad m, Storable a) + => Handle -> Int -> Int -> t m a -> m () +writeSliceRev arr i len s = undefined +-} diff --git a/src/Streamly/FileSystem/FDIO.hs b/src/Streamly/FileSystem/FDIO.hs new file mode 100644 index 000000000..e68841573 --- /dev/null +++ b/src/Streamly/FileSystem/FDIO.hs @@ -0,0 +1,200 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MagicHash #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE UnboxedTuples #-} + +#include "inline.hs" + +-- | +-- Module : Streamly.FileSystem.FDIO +-- Copyright : (c) 2019 Composewell Technologies +-- Copyright : (c) 1994-2008 The University of Glasgow +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- Low level IO routines interfacing the operating system. +-- + +module Streamly.FileSystem.FDIO + ( write + , writeAll + , IOVec (..) + , writev + , writevAll + ) +where + +import Control.Concurrent (threadWaitWrite) +import Control.Monad (when) +import Data.Int (Int64) +import Data.Word (Word8) +import Foreign.C.Error (throwErrnoIfMinus1RetryMayBlock) +import Foreign.C.Types (CSize(..), CInt(..), CBool(..)) +import Foreign.Ptr (plusPtr, Ptr) +import System.Posix.Internals (c_write, c_safe_write) + +import GHC.IO.FD (FD(..)) + +import Streamly.FileSystem.IOVec (IOVec(..), c_writev, c_safe_writev) + +------------------------------------------------------------------------------- +-- IO Routines +------------------------------------------------------------------------------- + +-- See System.POSIX.Internals in GHC base package + +------------------------------------------------------------------------------- +-- Write without blocking the underlying OS thread +------------------------------------------------------------------------------- + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +isNonBlocking :: FD -> Bool +isNonBlocking fd = fdIsNonBlocking fd /= 0 + +#if !defined(mingw32_HOST_OS) + +-- "poll"s the fd for data to become available or timeout +-- See cbits/inputReady.c in base package +foreign import ccall unsafe "fdReady" + unsafe_fdReady :: CInt -> CBool -> Int64 -> CBool -> IO CInt + +writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt +writeNonBlocking loc !fd !buf !off !len + | isNonBlocking fd = unsafe_write -- unsafe is ok, it can't block + | otherwise = do + let isWrite = 1 + isSocket = 0 + msecs = 0 + r <- unsafe_fdReady (fdFD fd) isWrite msecs isSocket + when (r == 0) $ threadWaitWrite (fromIntegral (fdFD fd)) + if threaded then safe_write else unsafe_write + + where + + do_write call = fromIntegral `fmap` + throwErrnoIfMinus1RetryMayBlock loc call + (threadWaitWrite (fromIntegral (fdFD fd))) + unsafe_write = do_write (c_write (fdFD fd) (buf `plusPtr` off) len) + safe_write = do_write (c_safe_write (fdFD fd) (buf `plusPtr` off) len) + +writevNonBlocking :: String -> FD -> Ptr IOVec -> Int -> IO CInt +writevNonBlocking loc !fd !iov !cnt + | isNonBlocking fd = unsafe_write -- unsafe is ok, it can't block + | otherwise = do + let isWrite = 1 + isSocket = 0 + msecs = 0 + r <- unsafe_fdReady (fdFD fd) isWrite msecs isSocket + when (r == 0) $ threadWaitWrite (fromIntegral (fdFD fd)) + if threaded then safe_write else unsafe_write + + where + + do_write call = fromIntegral `fmap` + throwErrnoIfMinus1RetryMayBlock loc call + (threadWaitWrite (fromIntegral (fdFD fd))) + unsafe_write = do_write (c_writev (fdFD fd) iov (fromIntegral cnt)) + safe_write = do_write (c_safe_writev (fdFD fd) iov (fromIntegral cnt)) + +#else + +foreign import WINDOWS_CCONV safe "recv" + c_safe_recv :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt + +foreign import WINDOWS_CCONV safe "send" + c_safe_send :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt + +blockingWriteRawBufferPtr :: String -> FD -> Ptr Word8-> Int -> CSize -> IO CInt +blockingWriteRawBufferPtr loc !fd !buf !off !len + = throwErrnoIfMinus1Retry loc $ do + let start_ptr = buf `plusPtr` off + send_ret = c_safe_send (fdFD fd) start_ptr (fromIntegral len) 0 + write_ret = c_safe_write (fdFD fd) start_ptr (fromIntegral len) + r <- bool write_ret send_ret (fdIsSocket fd) + when (r == -1) c_maperrno + return r + -- We don't trust write() to give us the correct errno, and + -- instead do the errno conversion from GetLastError() + -- ourselves. The main reason is that we treat ERROR_NO_DATA + -- (pipe is closing) as EPIPE, whereas write() returns EINVAL + -- for this case. We need to detect EPIPE correctly, because it + -- shouldn't be reported as an error when it happens on stdout. + -- As for send()'s case, Winsock functions don't do errno + -- conversion in any case so we have to do it ourselves. + -- That means we're doing the errno conversion no matter if the + -- fd is from a socket or not. + +-- NOTE: "safe" versions of the read/write calls for use by the threaded RTS. +-- These calls may block, but that's ok. + +asyncWriteRawBufferPtr :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt +asyncWriteRawBufferPtr loc !fd !buf !off !len = do + (l, rc) <- asyncWrite (fromIntegral (fdFD fd)) (fdIsSocket_ fd) + (fromIntegral len) (buf `plusPtr` off) + if l == (-1) + then let sock_errno = c_maperrno_func (fromIntegral rc) + non_sock_errno = Errno (fromIntegral rc) + errno = bool non_sock_errno sock_errno (fdIsSocket fd) + in ioError (errnoToIOError loc errno Nothing Nothing) + else return (fromIntegral l) + +writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt +writeNonBlocking loc !fd !buf !off !len + | threaded = blockingWriteRawBufferPtr loc fd buf off len + | otherwise = asyncWriteRawBufferPtr loc fd buf off len + +#endif + +-- | @write FD buffer offset length@ tries to write data on the given +-- filesystem fd (cannot be a socket) up to sepcified length starting from the +-- given offset in the buffer. The write will not block the OS thread, it may +-- suspend the Haskell thread until write can proceed. Returns the actual +-- amount of data written. +write :: FD -> Ptr Word8 -> Int -> CSize -> IO CInt +write = writeNonBlocking "Streamly.FileSystem.FDIO" + +-- XXX sendAll for sockets has a similar code, we can deduplicate the two. +-- XXX we need to check the errno to determine if the loop should continue. For +-- example, write may return without writing all data if the process file-size +-- limit has reached, in that case keep writing in a loop is fruitless. +-- +-- | Keep writing in a loop until all data in the buffer has been written. +writeAll :: FD -> Ptr Word8 -> Int -> IO () +writeAll fd ptr bytes = do + res <- write fd ptr 0 (fromIntegral bytes) + let res' = fromIntegral res + if res' < bytes + then writeAll fd (ptr `plusPtr` res') (bytes - res') + else return () + +------------------------------------------------------------------------------- +-- Vector IO +------------------------------------------------------------------------------- + +-- | @write FD iovec count@ tries to write data on the given filesystem fd +-- (cannot be a socket) from an iovec with specified number of entries. The +-- write will not block the OS thread, it may suspend the Haskell thread until +-- write can proceed. Returns the actual amount of data written. +writev :: FD -> Ptr IOVec -> Int -> IO CInt +writev = writevNonBlocking "Streamly.FileSystem.FDIO" + +-- | Keep writing an iovec in a loop until all the iovec entries are written. +writevAll :: FD -> Ptr IOVec -> Int -> IO () +writevAll fd iovec count = do + _res <- writev fd iovec count + {- + let res' = fromIntegral res + totalBytes = countIOVecBytes + if res' < totalBytes + then do + let iovec' = createModifiedIOVec + count' = ... + writeAll fd iovec' count' + else return () + -} + return () diff --git a/src/Streamly/FileSystem/File.hs b/src/Streamly/FileSystem/File.hs index 9e983d0b6..3472e3088 100644 --- a/src/Streamly/FileSystem/File.hs +++ b/src/Streamly/FileSystem/File.hs @@ -63,7 +63,7 @@ module Streamly.FileSystem.File -- -- * Array Read -- , readArrayOf - , readArraysUpto + , readArraysOfUpto -- , readArraysOf , readArrays @@ -183,26 +183,26 @@ appendArray file arr = SIO.withFile file AppendMode (\h -> FH.writeArray h arr) -- Stream of Arrays IO ------------------------------------------------------------------------------- --- | @readArraysUpto size file@ reads a stream of arrays from file @file@. +-- | @readArraysOfUpto size file@ reads a stream of arrays from file @file@. -- The maximum size of a single array is specified by @size@. The actual size -- read may be less than or equal to @size@. -{-# INLINABLE readArraysUpto #-} -readArraysUpto :: (IsStream t, MonadCatch m, MonadIO m) +{-# INLINABLE readArraysOfUpto #-} +readArraysOfUpto :: (IsStream t, MonadCatch m, MonadIO m) => Int -> FilePath -> t m (Array Word8) -readArraysUpto size file = withFile file ReadMode (FH.readArraysUpto size) +readArraysOfUpto size file = withFile file ReadMode (FH.readArraysOfUpto size) -- XXX read 'Array a' instead of Word8 -- -- | @readArrays file@ reads a stream of arrays from file @file@. -- The maximum size of a single array is limited to @defaultChunkSize@. -- --- > readArrays = readArraysUpto defaultChunkSize +-- > readArrays = readArraysOfUpto defaultChunkSize -- -- @since 0.7.0 {-# INLINE readArrays #-} readArrays :: (IsStream t, MonadCatch m, MonadIO m) => FilePath -> t m (Array Word8) -readArrays = readArraysUpto A.defaultChunkSize +readArrays = readArraysOfUpto A.defaultChunkSize ------------------------------------------------------------------------------- -- Read File to Stream @@ -219,7 +219,7 @@ readArrays = readArraysUpto A.defaultChunkSize -- {-# INLINE readByChunksUpto #-} readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8 -readByChunksUpto chunkSize h = A.flattenArrays $ readArraysUpto chunkSize h +readByChunksUpto chunkSize h = A.flattenArrays $ readArraysOfUpto chunkSize h -} -- TODO diff --git a/src/Streamly/FileSystem/Handle.hs b/src/Streamly/FileSystem/Handle.hs index 8adc29bd9..cf1d41292 100644 --- a/src/Streamly/FileSystem/Handle.hs +++ b/src/Streamly/FileSystem/Handle.hs @@ -53,13 +53,13 @@ module Streamly.FileSystem.Handle -- , readUtf8 -- , readLines -- , readFrames - -- , readByChunks + , readByChunksUpto -- -- * Array Read -- , readArrayUpto -- , readArrayOf - , readArraysUpto + , readArraysOfUpto -- , readArraysOf , readArrays @@ -68,7 +68,7 @@ module Streamly.FileSystem.Handle -- , writeUtf8 -- , writeUtf8ByLines -- , writeByFrames - , writeByChunks + , writeByChunksOf -- -- * Array Write , writeArray @@ -172,13 +172,13 @@ writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen -- Stream of Arrays IO ------------------------------------------------------------------------------- --- | @readArraysUpto size h@ reads a stream of arrays from file handle @h@. +-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@. -- The maximum size of a single array is specified by @size@. The actual size -- read may be less than or equal to @size@. -{-# INLINABLE _readArraysUpto #-} -_readArraysUpto :: (IsStream t, MonadIO m) +{-# INLINABLE _readArraysOfUpto #-} +_readArraysOfUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8) -_readArraysUpto size h = go +_readArraysOfUpto size h = go where -- XXX use cons/nil instead go = mkStream $ \_ yld _ stp -> do @@ -187,9 +187,9 @@ _readArraysUpto size h = go then stp else yld arr go -{-# INLINE_NORMAL readArraysUpto #-} -readArraysUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8) -readArraysUpto size h = D.fromStreamD (D.Stream step ()) +{-# INLINE_NORMAL readArraysOfUpto #-} +readArraysOfUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8) +readArraysOfUpto size h = D.fromStreamD (D.Stream step ()) where {-# INLINE_LATE step #-} step _ _ = do @@ -206,12 +206,12 @@ readArraysUpto size h = D.fromStreamD (D.Stream step ()) -- 'readArrays' ignores the prevailing 'TextEncoding' and 'NewlineMode' -- on the 'Handle'. -- --- > readArrays = readArraysUpto defaultChunkSize +-- > readArrays = readArraysOfUpto defaultChunkSize -- -- @since 0.7.0 {-# INLINE readArrays #-} readArrays :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8) -readArrays = readArraysUpto A.defaultChunkSize +readArrays = readArraysOfUpto A.defaultChunkSize ------------------------------------------------------------------------------- -- Read File to Stream @@ -221,15 +221,13 @@ readArrays = readArraysUpto A.defaultChunkSize -- read requests at the same time. For serial case we can use async IO. We can -- also control the read throughput in mbps or IOPS. -{- -- | @readByChunksUpto chunkSize handle@ reads a byte stream from a file -- handle, reads are performed in chunks of up to @chunkSize@. The stream ends -- as soon as EOF is encountered. -- {-# INLINE readByChunksUpto #-} readByChunksUpto :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8 -readByChunksUpto chunkSize h = A.flattenArrays $ readArraysUpto chunkSize h --} +readByChunksUpto chunkSize h = A.flattenArrays $ readArraysOfUpto chunkSize h -- TODO -- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a @@ -267,11 +265,11 @@ writeArrays h m = S.mapM_ (liftIO . writeArray h) m -- input elements. -- -- @since 0.7.0 -{-# INLINE writeByChunks #-} -writeByChunks :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m () -writeByChunks n h m = writeArrays h $ A.arraysOf n m +{-# INLINE writeByChunksOf #-} +writeByChunksOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m () +writeByChunksOf n h m = writeArrays h $ A.arraysOf n m --- > write = 'writeByChunks' A.defaultChunkSize +-- > write = 'writeByChunksOf' A.defaultChunkSize -- -- | Write a byte stream to a file handle. Combines the bytes in chunks of size -- up to 'A.defaultChunkSize' before writing. Note that the write behavior @@ -280,7 +278,7 @@ writeByChunks n h m = writeArrays h $ A.arraysOf n m -- @since 0.7.0 {-# INLINE write #-} write :: MonadIO m => Handle -> SerialT m Word8 -> m () -write = writeByChunks A.defaultChunkSize +write = writeByChunksOf A.defaultChunkSize {- {-# INLINE write #-} diff --git a/src/Streamly/FileSystem/IOVec.hsc b/src/Streamly/FileSystem/IOVec.hsc new file mode 100644 index 000000000..931e36c3d --- /dev/null +++ b/src/Streamly/FileSystem/IOVec.hsc @@ -0,0 +1,66 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CApiFFI #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | +-- Module : Streamly.FileSystem.IOVec +-- Copyright : (c) 2019 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com +-- Stability : experimental +-- Portability : GHC +-- +-- Low level IO routines interfacing the operating system. +-- + +module Streamly.FileSystem.IOVec + ( IOVec(..) + , c_writev + , c_safe_writev + ) +where + +import Data.Word (Word8, Word64) +import Foreign.C.Types (CInt(..)) +import Foreign.Ptr (Ptr) +import Foreign.Storable (Storable(..)) +import System.Posix.Types (CSsize(..)) + +------------------------------------------------------------------------------- +-- IOVec +------------------------------------------------------------------------------- + +#if !defined(mingw32_HOST_OS) + +#include + +data IOVec = IOVec + { iovBase :: {-# UNPACK #-} !(Ptr Word8) + , iovLen :: {-# UNPACK #-} !Word64 + } deriving (Eq, Show) + +instance Storable IOVec where + sizeOf _ = #{size struct iovec} + alignment _ = #{alignment struct iovec} + peek ptr = do + base <- #{peek struct iovec, iov_base} ptr + len :: #{type size_t} <- #{peek struct iovec, iov_len} ptr + return $ IOVec base len + poke ptr vec = do + let base = iovBase vec + len :: #{type size_t} = iovLen vec + #{poke struct iovec, iov_base} ptr base + #{poke struct iovec, iov_len} ptr len + +foreign import capi unsafe "sys/uio.h writev" + c_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize + +foreign import capi safe "sys/uio.h writev" + c_safe_writev :: CInt -> Ptr IOVec -> CInt -> IO CSsize + +#else +c_writev = error "writev not implemented for windows" +c_safe_writev = error "writev not implemented for windows" +#endif diff --git a/src/Streamly/Mem/Array.hs b/src/Streamly/Mem/Array.hs index 35f938b79..bd2ce5df6 100644 --- a/src/Streamly/Mem/Array.hs +++ b/src/Streamly/Mem/Array.hs @@ -93,27 +93,32 @@ module Streamly.Mem.Array -- , newArray , writeN , write - , fromListN - , fromList + , A.fromListN + , A.fromList -- Folds - , toArrayN + , A.toArrayN -- , toArrays , toArray - -- Streams + -- Streams of arrays , arraysOf -- * Elimination -- 'GHC.Exts.toList' from "GHC.Exts" can be used to convert an array to a -- list. - , read + , A.read , readRev - , toList + , A.toList + + -- Streams of arrays , flattenArrays -- , flattenArraysRev - , spliceArrays + , coalesceArrays + , coalesceChunksOf + , unlinesArraysBy + , splitArraysOn -- * Random Access , length @@ -123,8 +128,10 @@ module Streamly.Mem.Array {- , readSlice , readSliceRev + -} , writeIndex + {- , writeSlice , writeSliceRev -} @@ -140,13 +147,14 @@ where import Control.Monad (when) import Control.Monad.IO.Class (MonadIO(..)) import Data.Functor.Identity (Identity) +import Data.Word (Word8) import Foreign.ForeignPtr (withForeignPtr) import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) import Foreign.Ptr (minusPtr, plusPtr, castPtr) import Foreign.Storable (Storable(..)) import Prelude hiding (length, null, last, map, (!!), read) -import Streamly.Mem.Array.Types hiding (flattenArrays, newArray) +import Streamly.Mem.Array.Types (Array(..), length) import Streamly.Streams.Serial (SerialT) import Streamly.Streams.StreamK.Type (IsStream) import Streamly.Fold.Types (Fold(..)) @@ -178,7 +186,7 @@ newArray len = undefined writeN :: (MonadIO m, Storable a) => Int -> SerialT m a -> m (Array a) writeN n m = do if n < 0 then error "writeN: negative write count specified" else return () - fromStreamDN n $ D.toStreamD m + A.fromStreamDN n $ D.toStreamD m ------------------------------------------------------------------------------- -- Elimination @@ -189,7 +197,7 @@ writeN n m = do -- @since 0.7.0 {-# INLINE_EARLY readRev #-} readRev :: (Monad m, IsStream t, Storable a) => Array a -> t m a -readRev = D.fromStreamD . toStreamDRev +readRev = D.fromStreamD . A.toStreamDRev -- XXX add fallback to StreamK rule -- {-# RULES "Streamly.Array.readRev fallback to StreamK" [1] -- forall a. S.toStreamK (readRev a) = K.revFromArray a #-} @@ -199,7 +207,7 @@ _null :: Storable a => Array a -> Bool _null arr = length arr == 0 {-# INLINE _last #-} -_last :: forall a. Storable a => Array a -> Maybe a +_last :: (MonadIO m, Storable a) => Array a -> m (Maybe a) _last arr = readIndex arr (length arr - 1) ------------------------------------------------------------------------------- @@ -306,11 +314,12 @@ foldbWith level f = undefined -- -- @since 0.7.0 {-# INLINE readIndex #-} -readIndex :: Storable a => Array a -> Int -> Maybe a +readIndex :: (MonadIO m, Storable a) => Array a -> Int -> m (Maybe a) readIndex arr i = if i < 0 || i > length arr - 1 - then Nothing - else Just $ unsafeIndex arr i + then return Nothing + else liftIO $ withForeignPtr (aStart arr) $ \p -> + fmap Just $ peekElemOff p i {- -- | @readSlice arr i count@ streams a slice of the array @arr@ starting @@ -332,14 +341,25 @@ readSlice arr i len = undefined readSliceRev :: (IsStream t, Monad m, Storable a) => Array a -> Int -> Int -> t m a readSliceRev arr i len = undefined +-} -- | /O(1)/ Write the given element at the given index in the array. -- -- @since 0.7.0 {-# INLINE writeIndex #-} writeIndex :: (MonadIO m, Storable a) => Array a -> Int -> a -> m () -writeIndex arr i a = undefined +writeIndex arr i a = do + let maxIndex = length arr - 1 + if i < 0 + then error "writeIndex: negative array index" + else if i > maxIndex + then error $ "writeIndex: specified array index " ++ show i + ++ " is beyond the maximum index " ++ show maxIndex + else + liftIO $ withForeignPtr (aStart arr) $ \p -> + pokeElemOff p i a +{- -- | @writeSlice arr i count stream@ writes a stream to the array @arr@ -- starting at index @i@ and writing up to @count@ elements in the forward -- direction ending at the index @i + count - 1@. @@ -381,7 +401,7 @@ toArraysInRange low high (Fold step initial extract) = {-# INLINE _toArraysOf #-} _toArraysOf :: (MonadIO m, Storable a) => Int -> Fold m a (SerialT Identity (Array a)) -_toArraysOf n = FL.lchunksOf n (toArrayN n) FL.toStream +_toArraysOf n = FL.lchunksOf n (A.toArrayN n) FL.toStream -- XXX The realloc based implementation needs to make one extra copy if we use -- shrinkToFit. On the other hand, the stream of arrays implementation may @@ -401,7 +421,7 @@ bytesToCount x n = let elemSize = sizeOf x in n + elemSize - 1 `div` elemSize -{-# INLINE toArrayMinChunk #-} +{-# INLINE_NORMAL toArrayMinChunk #-} toArrayMinChunk :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a) -- toArrayMinChunk n = FL.mapM spliceArrays $ toArraysOf n @@ -416,11 +436,14 @@ toArrayMinChunk elemCount = Fold step initial extract initial = do when (elemCount < 0) $ error "toArrayMinChunk: elemCount is negative" liftIO $ A.newArray elemCount - step arr@(Array _ end bound) x | end == bound = do - arr1 <- liftIO $ reallocDouble 1 arr + step arr@(Array start end bound) x | end == bound = do + let p = unsafeForeignPtrToPtr start + oldSize = end `minusPtr` p + newSize = max (oldSize * 2) 1 + arr1 <- liftIO $ A.realloc newSize arr insertElem arr1 x step arr x = insertElem arr x - extract = liftIO . shrinkToFit + extract = liftIO . A.shrinkToFit -- | Fold the whole input to a single array. -- @@ -429,7 +452,7 @@ toArrayMinChunk elemCount = Fold step initial extract -- @since 0.7.0 {-# INLINE toArray #-} toArray :: forall m a. (MonadIO m, Storable a) => Fold m a (Array a) -toArray = toArrayMinChunk (bytesToCount (undefined :: a) (mkChunkSize 1024)) +toArray = toArrayMinChunk (bytesToCount (undefined :: a) (A.mkChunkSize 1024)) -- | Convert a stream of arrays into a stream of their elements. -- @@ -449,6 +472,39 @@ flattenArrays m = D.fromStreamD $ A.flattenArrays (D.toStreamD m) _flattenArraysRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a _flattenArraysRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m) +-- XXX use an Array instead as separator? Or use a separate unlinesArraysBySeq +-- API for that? +-- +-- | Flatten a stream of arrays appending the given element after each +-- array. +-- +-- @since 0.7.0 +{-# INLINE unlinesArraysBy #-} +unlinesArraysBy :: (MonadIO m, IsStream t, Storable a) + => a -> t m (Array a) -> t m a +unlinesArraysBy x = D.fromStreamD . A.unlines x . D.toStreamD + +-- | Split a stream of arrays on a given separator byte, dropping the separator +-- and coalescing all the arrays between two separators into a single array. +-- +-- @since 0.7.0 +{-# INLINE splitArraysOn #-} +splitArraysOn + :: (IsStream t, MonadIO m) + => Word8 + -> t m (Array Word8) + -> t m (Array Word8) +splitArraysOn byte s = D.fromStreamD $ A.splitOn byte $ D.toStreamD s + +-- | Coalesce adajcent arrays in incoming stream to form bigger arrays of a +-- maximum specified size. +-- +-- @since 0.7.0 +{-# INLINE coalesceChunksOf #-} +coalesceChunksOf :: (MonadIO m, Storable a) + => Int -> SerialT m (Array a) -> SerialT m (Array a) +coalesceChunksOf n xs = D.fromStreamD $ A.coalesceChunksOf n (D.toStreamD xs) + -- | -- > arraysOf n = FL.groupsOf n (FL.toArrayN n) -- @@ -459,18 +515,17 @@ _flattenArraysRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m) arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a) arraysOf n str = - D.fromStreamD $ fromStreamDArraysOf n (D.toStreamD str) + D.fromStreamD $ A.fromStreamDArraysOf n (D.toStreamD str) -- XXX Both of these implementations of splicing seem to perform equally well. -- We need to perform benchmarks over a range of sizes though. -{-# INLINE _spliceArraysRealloced #-} -_spliceArraysRealloced :: (MonadIO m, Storable a) - => SerialT m (Array a) -> m (Array a) -_spliceArraysRealloced s = do - buffered <- P.foldr S.cons S.nil s - len <- S.sum (S.map length buffered) - +-- CAUTION! length must more than equal to lengths of all the arrays in the +-- stream. +{-# INLINE spliceArraysLenUnsafe #-} +spliceArraysLenUnsafe :: (MonadIO m, Storable a) + => Int -> SerialT m (Array a) -> m (Array a) +spliceArraysLenUnsafe len buffered = do arr <- liftIO $ A.newArray len end <- S.foldlM' writeArr (aEnd arr) buffered return $ arr {aEnd = end} @@ -479,42 +534,36 @@ _spliceArraysRealloced s = do writeArr dst Array{..} = liftIO $ withForeignPtr aStart $ \src -> do - let len = aEnd `minusPtr` src - memcpy (castPtr dst) (castPtr src) len - return $ dst `plusPtr` len + let count = aEnd `minusPtr` src + A.memcpy (castPtr dst) (castPtr src) count + return $ dst `plusPtr` count -{-# INLINE spliceArraysBuffered #-} -spliceArraysBuffered :: forall m a. (MonadIO m, Storable a) +{-# INLINE _spliceArraysBuffered #-} +_spliceArraysBuffered :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a) -spliceArraysBuffered s = do +_spliceArraysBuffered s = do + buffered <- P.foldr S.cons S.nil s + len <- S.sum (S.map length buffered) + spliceArraysLenUnsafe len s + +{-# INLINE spliceArraysRealloced #-} +spliceArraysRealloced :: forall m a. (MonadIO m, Storable a) + => SerialT m (Array a) -> m (Array a) +spliceArraysRealloced s = do idst <- liftIO $ A.newArray (bytesToCount (undefined :: a) - (mkChunkSizeKB 4)) - arr <- S.foldlM' appendArr idst s - liftIO $ shrinkToFit arr + (A.mkChunkSizeKB 4)) - where - - appendArr dst@(Array _ end bound) src = liftIO $ do - let srcLen = aEnd src `minusPtr` unsafeForeignPtrToPtr (aStart src) - dst1 <- - if end `plusPtr` srcLen >= bound - then reallocDouble srcLen dst - else return dst - - withForeignPtr (aStart dst1) $ \_ -> do - withForeignPtr (aStart src) $ \psrc -> do - let pdst = aEnd dst1 - memcpy (castPtr pdst) (castPtr psrc) srcLen - return $ dst1 { aEnd = pdst `plusPtr` srcLen } + arr <- S.foldlM' A.spliceWithDoubling idst s + liftIO $ A.shrinkToFit arr -- | Given a stream of arrays, splice them all together to generate a single -- array. The stream must be /finite/. -- -- @since 0.7.0 -{-# INLINE spliceArrays #-} -spliceArrays :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a) -spliceArrays = spliceArraysBuffered --- spliceArrays = _spliceArraysRealloced +{-# INLINE coalesceArrays #-} +coalesceArrays :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a) +coalesceArrays = spliceArraysRealloced +-- spliceArrays = _spliceArraysBuffered -- | Create an 'Array' from a stream. This is useful when we want to create a -- single array from a stream of unknown size. 'writeN' is at least twice @@ -548,18 +597,18 @@ write = FL.foldl' toArray {-# INLINE transformWith #-} transformWith :: (MonadIO m, Storable a, Storable b) => (SerialT m a -> SerialT m b) -> Array a -> m (Array b) -transformWith f arr = FL.foldl' (toArrayMinChunk (length arr)) $ f (read arr) +transformWith f arr = FL.foldl' (toArrayMinChunk (length arr)) $ f (A.read arr) -- | Fold an array using a 'Fold'. -- -- @since 0.7.0 {-# INLINE foldArray #-} foldArray :: (MonadIO m, Storable a) => Fold m a b -> Array a -> m b -foldArray f arr = FL.foldl' f (read arr) +foldArray f arr = FL.foldl' f (A.read arr) -- | Fold an array using a stream fold operation. -- -- @since 0.7.0 {-# INLINE foldWith #-} foldWith :: (MonadIO m, Storable a) => (SerialT m a -> m b) -> Array a -> m b -foldWith f arr = f (read arr) +foldWith f arr = f (A.read arr) diff --git a/src/Streamly/Mem/Array/Types.hs b/src/Streamly/Mem/Array/Types.hs index f1c9efae0..d1618280e 100644 --- a/src/Streamly/Mem/Array/Types.hs +++ b/src/Streamly/Mem/Array/Types.hs @@ -23,29 +23,34 @@ module Streamly.Mem.Array.Types Array (..) -- * Construction - , unsafeInlineIO , withNewArray , newArray , unsafeSnoc , snoc - , shrinkToFit - , memcpy - , memcmp + , spliceWithDoubling , fromList , fromListN , fromStreamDN -- , fromStreamD + + -- * Streams of arrays , fromStreamDArraysOf , flattenArrays , flattenArraysRev + , coalesceChunksOf + , groupIOVecsOf + , splitOn -- * Elimination , unsafeIndexIO , unsafeIndex , length + , byteLength + , byteCapacity , foldl' , foldr + , splitAt , toStreamD , toStreamDRev @@ -59,7 +64,11 @@ module Streamly.Mem.Array.Types , defaultChunkSize , mkChunkSize , mkChunkSizeKB - , reallocDouble + , unsafeInlineIO + , realloc + , shrinkToFit + , memcpy + , memcmp , unlines ) @@ -74,11 +83,11 @@ import Data.Word (Word8) import Foreign.C.String (CString) import Foreign.C.Types (CSize(..), CInt(..)) import Foreign.ForeignPtr - (ForeignPtr, withForeignPtr, touchForeignPtr) + (ForeignPtr, withForeignPtr, touchForeignPtr, plusForeignPtr) import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) -import Foreign.Ptr (plusPtr, minusPtr, castPtr) +import Foreign.Ptr (plusPtr, minusPtr, castPtr, nullPtr) import Foreign.Storable (Storable(..)) -import Prelude hiding (length, foldr, read, unlines, unwords) +import Prelude hiding (length, foldr, read, unlines, splitAt) import Text.Read (readPrec, readListPrec, readListPrecDefault) import GHC.Base (Addr#, realWorld#) @@ -89,6 +98,7 @@ import GHC.Ptr (Ptr(..)) import Streamly.Fold.Types (Fold(..)) import Streamly.SVar (adaptState) +import Streamly.FileSystem.FDIO (IOVec(..)) import qualified Streamly.Streams.StreamD.Type as D import qualified Streamly.Streams.StreamK as K @@ -165,6 +175,9 @@ foreign import ccall unsafe "string.h memcpy" c_memcpy foreign import ccall unsafe "string.h strlen" c_strlen :: CString -> IO CSize +foreign import ccall unsafe "string.h memchr" c_memchr + :: Ptr Word8 -> Word8 -> CSize -> IO (Ptr Word8) + -- XXX we are converting Int to CSize memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO () memcpy dst src len = c_memcpy dst src (fromIntegral len) >> return () @@ -222,14 +235,14 @@ withNewArray count f = do unsafeSnoc :: forall a. Storable a => Array a -> a -> IO (Array a) unsafeSnoc arr@Array{..} x = do when (aEnd == aBound) $ - error "BUG: unsafeAppend: writing beyond array bounds" + error "BUG: unsafeSnoc: writing beyond array bounds" poke aEnd x touchForeignPtr aStart return $ arr {aEnd = aEnd `plusPtr` (sizeOf (undefined :: a))} {-# INLINE snoc #-} -snoc :: forall a. Storable a => Array a -> a -> Array a -snoc arr@Array {..} x = unsafePerformIO $ +snoc :: forall a. Storable a => Array a -> a -> IO (Array a) +snoc arr@Array {..} x = do if (aEnd == aBound) then do let oldStart = unsafeForeignPtrToPtr aStart @@ -250,37 +263,14 @@ snoc arr@Array {..} x = unsafePerformIO $ touchForeignPtr aStart return $ arr {aEnd = aEnd `plusPtr` (sizeOf (undefined :: a))} - --- | Remove the free space from an Array. -shrinkToFit :: forall a. Storable a => Array a -> IO (Array a) -shrinkToFit arr@Array{..} = do - assert (aEnd <= aBound) (return ()) - if aEnd /= aBound - then do - let oldStart = unsafeForeignPtrToPtr aStart - let size = aEnd `minusPtr` oldStart - newPtr <- mallocPlainForeignPtrAlignedBytes - size (alignment (undefined :: a)) - withForeignPtr newPtr $ \pNew -> do - memcpy (castPtr pNew) (castPtr oldStart) size - touchForeignPtr aStart - let end = pNew `plusPtr` size - return $ Array - { aStart = newPtr - , aEnd = end - , aBound = end - } - else return arr - --- | Expand the free space in the array doubling the size with a minimum --- exapnsion of the specified amount of bytes. -{-# NOINLINE reallocDouble #-} -reallocDouble :: forall a. Storable a => Int -> Array a -> IO (Array a) -reallocDouble minIncrease Array{..} = do +-- | Reallocate the array to the specified size in bytes. If the size is less +-- than the original array the array gets truncated. +{-# NOINLINE realloc #-} +realloc :: forall a. Storable a => Int -> Array a -> IO (Array a) +realloc newSize Array{..} = do assert (aEnd <= aBound) (return ()) let oldStart = unsafeForeignPtrToPtr aStart let size = aEnd `minusPtr` oldStart - newSize = max (size * 2) (size + minIncrease) newPtr <- mallocPlainForeignPtrAlignedBytes newSize (alignment (undefined :: a)) withForeignPtr newPtr $ \pNew -> do @@ -292,6 +282,17 @@ reallocDouble minIncrease Array{..} = do , aBound = pNew `plusPtr` newSize } +-- | Remove the free space from an Array. +shrinkToFit :: forall a. Storable a => Array a -> IO (Array a) +shrinkToFit arr@Array{..} = do + assert (aEnd <= aBound) (return ()) + if aEnd /= aBound + then do + let oldStart = unsafeForeignPtrToPtr aStart + let size = aEnd `minusPtr` oldStart + realloc size arr + else return arr + -- XXX when converting an array of Word8 from a literal string we can simply -- refer to the literal string. Is it possible to write rules such that -- fromList Word8 can be rewritten so that GHC does not first convert the @@ -340,15 +341,30 @@ unsafeIndexIO Array {..} i = unsafeIndex :: forall a. Storable a => Array a -> Int -> a unsafeIndex arr i = let !r = unsafeInlineIO $ unsafeIndexIO arr i in r --- | /O(1)/ Get the length of the array. +-- | /O(1)/ Get the byte length of the array. +-- +-- @since 0.7.0 +{-# INLINE byteLength #-} +byteLength :: Array a -> Int +byteLength Array{..} = + let p = unsafeForeignPtrToPtr aStart + len = aEnd `minusPtr` p + in assert (len >= 0) len + +-- | /O(1)/ Get the length of the array i.e. the number of elements in the +-- array. -- -- @since 0.7.0 {-# INLINE length #-} length :: forall a. Storable a => Array a -> Int -length Array{..} = +length arr = byteLength arr `div` sizeOf (undefined :: a) + +{-# INLINE byteCapacity #-} +byteCapacity :: Array a -> Int +byteCapacity Array{..} = let p = unsafeForeignPtrToPtr aStart - aLen = aEnd `minusPtr` p - in assert (aLen >= 0) (aLen `div` sizeOf (undefined :: a)) + len = aBound `minusPtr` p + in assert (len >= 0) len {-# INLINE_NORMAL toStreamD #-} toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a @@ -729,8 +745,9 @@ defaultChunkSize :: Int defaultChunkSize = mkChunkSizeKB 32 {-# INLINE_NORMAL unlines #-} -unlines :: MonadIO m => D.Stream m (Array Char) -> D.Stream m Char -unlines (D.Stream step state) = D.Stream step' (OuterLoop state) +unlines :: forall m a. (MonadIO m, Storable a) + => a -> D.Stream m (Array a) -> D.Stream m a +unlines sep (D.Stream step state) = D.Stream step' (OuterLoop state) where {-# INLINE_LATE step' #-} step' gst (OuterLoop st) = do @@ -743,7 +760,7 @@ unlines (D.Stream step state) = D.Stream step' (OuterLoop state) D.Stop -> D.Stop step' _ (InnerLoop st _ p end) | p == end = - return $ D.Yield '\n' $ OuterLoop st + return $ D.Yield sep $ OuterLoop st step' _ (InnerLoop st startf p end) = do x <- liftIO $ do @@ -751,4 +768,272 @@ unlines (D.Stream step state) = D.Stream step' (OuterLoop state) touchForeignPtr startf return r return $ D.Yield x (InnerLoop st startf - (p `plusPtr` (sizeOf (undefined :: Char))) end) + (p `plusPtr` (sizeOf (undefined :: a))) end) + +{-# INLINE spliceTwo #-} +spliceTwo :: (MonadIO m, Storable a) => Array a -> Array a -> m (Array a) +spliceTwo arr1 arr2 = do + let src1 = unsafeForeignPtrToPtr (aStart arr1) + src2 = unsafeForeignPtrToPtr (aStart arr2) + len1 = aEnd arr1 `minusPtr` src1 + len2 = aEnd arr2 `minusPtr` src2 + + arr <- liftIO $ newArray (len1 + len2) + let dst = unsafeForeignPtrToPtr (aStart arr) + + liftIO $ do + memcpy (castPtr dst) (castPtr src1) len1 + touchForeignPtr (aStart arr1) + memcpy (castPtr (dst `plusPtr` len1)) (castPtr src2) len2 + touchForeignPtr (aStart arr2) + return arr { aEnd = dst `plusPtr` (len1 + len2) } + +-- Splice a new array into a pre-reserved array. The user must ensure that +-- there is enough space in the array. +{-# INLINE spliceWith #-} +spliceWith :: (MonadIO m) => Array a -> Array a -> m (Array a) +spliceWith dst@(Array _ end bound) src = liftIO $ do + let srcLen = byteLength src + if end `plusPtr` srcLen > bound + then error "Bug: spliceIntoUnsafe: Not enough space in the target array" + else + withForeignPtr (aStart dst) $ \_ -> do + withForeignPtr (aStart src) $ \psrc -> do + let pdst = aEnd dst + memcpy (castPtr pdst) (castPtr psrc) srcLen + return $ dst { aEnd = pdst `plusPtr` srcLen } + +-- Splice a new array into a preallocated array, doubling the space if there is +-- no space in the target array. +{-# INLINE spliceWithDoubling #-} +spliceWithDoubling :: (MonadIO m, Storable a) + => Array a -> Array a -> m (Array a) +spliceWithDoubling dst@(Array start end bound) src = do + assert (end <= bound) (return ()) + let srcLen = aEnd src `minusPtr` unsafeForeignPtrToPtr (aStart src) + + dst1 <- + if end `plusPtr` srcLen >= bound + then do + let oldStart = unsafeForeignPtrToPtr start + oldSize = end `minusPtr` oldStart + newSize = max (oldSize * 2) (oldSize + srcLen) + liftIO $ realloc newSize dst + else return dst + spliceWith dst1 src + +data SpliceState s arr + = SpliceInitial s + | SpliceBuffering s arr + | SpliceYielding arr (SpliceState s arr) + | SpliceFinish + +-- XXX can use general grouping combinators to achieve this? +-- | Coalesce adajcent arrays in incoming stream to form bigger arrays of a +-- maximum specified size. +-- +-- @since 0.7.0 +{-# INLINE_NORMAL coalesceChunksOf #-} +coalesceChunksOf :: (MonadIO m, Storable a) + => Int -> D.Stream m (Array a) -> D.Stream m (Array a) +coalesceChunksOf n (D.Stream step state) = D.Stream step' (SpliceInitial state) + + where + + {-# INLINE_LATE step' #-} + step' gst (SpliceInitial st) = do + r <- step gst st + case r of + D.Yield arr s -> return $ + let len = byteLength arr + in if len >= n + then D.Skip (SpliceYielding arr (SpliceInitial s)) + else D.Skip (SpliceBuffering s arr) + D.Skip s -> return $ D.Skip (SpliceInitial s) + D.Stop -> return $ D.Stop + + step' gst (SpliceBuffering st buf) = do + r <- step gst st + case r of + D.Yield arr s -> do + let len = byteLength buf + byteLength arr + if len > n + then return $ D.Skip (SpliceYielding buf (SpliceBuffering s arr)) + else do + buf' <- if byteCapacity buf < n + then liftIO $ realloc n buf + else return buf + buf'' <- spliceWith buf' arr + return $ D.Skip (SpliceBuffering s buf'') + D.Skip s -> return $ D.Skip (SpliceBuffering s buf) + D.Stop -> return $ D.Skip (SpliceYielding buf SpliceFinish) + + step' _ SpliceFinish = return D.Stop + + step' _ (SpliceYielding arr next) = return $ D.Yield arr next + +data GatherState s arr + = GatherInitial s + | GatherBuffering s arr Int + | GatherYielding arr (GatherState s arr) + | GatherFinish + +-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream +-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in +-- each array and a maximum of @maxEntries@ entries in each array. +-- +-- @since 0.7.0 +{-# INLINE_NORMAL groupIOVecsOf #-} +groupIOVecsOf :: MonadIO m + => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec) +groupIOVecsOf n maxIOVLen (D.Stream step state) = + D.Stream step' (GatherInitial state) + + where + + {-# INLINE_LATE step' #-} + step' gst (GatherInitial st) = do + r <- step (adaptState gst) st + case r of + D.Yield arr s -> do + let p = unsafeForeignPtrToPtr (aStart arr) + len = byteLength arr + iov <- liftIO $ newArray maxIOVLen + iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p) + (fromIntegral len)) + if len >= n + then return $ D.Skip (GatherYielding iov' (GatherInitial s)) + else return $ D.Skip (GatherBuffering s iov' len) + D.Skip s -> return $ D.Skip (GatherInitial s) + D.Stop -> return $ D.Stop + + step' gst (GatherBuffering st iov len) = do + r <- step (adaptState gst) st + case r of + D.Yield arr s -> do + let p = unsafeForeignPtrToPtr (aStart arr) + alen = byteLength arr + len' = len + alen + if len' > n || length iov >= maxIOVLen + then do + iov' <- liftIO $ newArray maxIOVLen + iov'' <- liftIO $ unsafeSnoc iov' (IOVec (castPtr p) + (fromIntegral alen)) + return $ D.Skip (GatherYielding iov + (GatherBuffering s iov'' alen)) + else do + iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p) + (fromIntegral alen)) + return $ D.Skip (GatherBuffering s iov' len') + D.Skip s -> return $ D.Skip (GatherBuffering s iov len) + D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish) + + step' _ GatherFinish = return D.Stop + + step' _ (GatherYielding iov next) = return $ D.Yield iov next + +-- | Create two slices of an array without copying the original array. The +-- specified index @i@ is the first index of the second slice. +-- +-- @since 0.7.0 +splitAt :: forall a. Storable a => Int -> Array a -> (Array a, Array a) +splitAt i arr@Array{..} = + let maxIndex = length arr - 1 + in if i < 0 + then error "sliceAt: negative array index" + else if i > maxIndex + then error $ "sliceAt: specified array index " ++ show i + ++ " is beyond the maximum index " ++ show maxIndex + else let off = i * sizeOf (undefined :: a) + p = unsafeForeignPtrToPtr aStart `plusPtr` off + in ( Array + { aStart = aStart + , aEnd = p + , aBound = p + } + , Array + { aStart = aStart `plusForeignPtr` off + , aEnd = aEnd + , aBound = aBound + } + ) + +-- Drops the separator byte +{-# INLINE breakOn #-} +breakOn :: MonadIO m + => Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8)) +breakOn sep arr@Array{..} = liftIO $ do + let p = unsafeForeignPtrToPtr aStart + loc <- c_memchr p sep (fromIntegral $ aEnd `minusPtr` p) + return $ + if loc == nullPtr + then (arr, Nothing) + else + ( Array + { aStart = aStart + , aEnd = loc + , aBound = loc + } + , Just $ Array + { aStart = aStart `plusForeignPtr` (loc `minusPtr` p + 1) + , aEnd = aEnd + , aBound = aBound + } + ) + +data SplitState s arr + = Initial s + | Buffering s arr + | Splitting s arr + | Yielding arr (SplitState s arr) + | Finishing + +-- | Split a stream of arrays on a given separator byte, dropping the separator +-- and coalescing all the arrays between two separators into a single array. +-- +-- @since 0.7.0 +{-# INLINE_NORMAL splitOn #-} +splitOn + :: MonadIO m + => Word8 + -> D.Stream m (Array Word8) + -> D.Stream m (Array Word8) +splitOn byte (D.Stream step state) = D.Stream step' (Initial state) + + where + + {-# INLINE_LATE step' #-} + step' gst (Initial st) = do + r <- step gst st + case r of + D.Yield arr s -> do + (arr1, marr2) <- breakOn byte arr + return $ case marr2 of + Nothing -> D.Skip (Buffering s arr1) + Just arr2 -> D.Skip (Yielding arr1 (Splitting s arr2)) + D.Skip s -> return $ D.Skip (Initial s) + D.Stop -> return $ D.Stop + + step' gst (Buffering st buf) = do + r <- step gst st + case r of + D.Yield arr s -> do + (arr1, marr2) <- breakOn byte arr + buf' <- spliceTwo buf arr1 + return $ case marr2 of + Nothing -> D.Skip (Buffering s buf') + Just x -> D.Skip (Yielding buf' (Splitting s x)) + D.Skip s -> return $ D.Skip (Buffering s buf) + D.Stop -> return $ + if byteLength buf == 0 + then D.Stop + else D.Skip (Yielding buf Finishing) + + step' _ (Splitting st buf) = do + (arr1, marr2) <- breakOn byte buf + return $ case marr2 of + Nothing -> D.Skip $ Buffering st arr1 + Just arr2 -> D.Skip $ Yielding arr1 (Splitting st arr2) + + step' _ (Yielding arr next) = return $ D.Yield arr next + step' _ Finishing = return $ D.Stop diff --git a/src/Streamly/String.hs b/src/Streamly/String.hs index 92a072c22..5ca9d2c66 100644 --- a/src/Streamly/String.hs +++ b/src/Streamly/String.hs @@ -187,7 +187,7 @@ words = FL.wordsBy isSpace toArray {-# INLINE unlines #-} unlines :: (MonadIO m, IsStream t) => t m (Array Char) -> t m Char -unlines = D.fromStreamD . A.unlines . D.toStreamD +unlines = D.fromStreamD . A.unlines '\n' . D.toStreamD {-# INLINE unwords #-} unwords :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char diff --git a/streamly.cabal b/streamly.cabal index 7e4081f67..490bb818e 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -207,6 +207,9 @@ library , Streamly.Sink , Streamly.Pipe.Types + , Streamly.FileSystem.IOVec + , Streamly.FileSystem.FDIO + exposed-modules: Streamly.Prelude , Streamly.Time , Streamly @@ -217,8 +220,9 @@ library -- IO devices , Streamly.Mem.Array - , Streamly.FileSystem.File + , Streamly.FileSystem.FD , Streamly.FileSystem.Handle + , Streamly.FileSystem.File , Streamly.Network.Socket , Streamly.Network.Server , Streamly.Network.Client