Split stdio ops into a Console.Stdio module

Change the FileSystem.Handle from* APIs to put*

Mutable write APIs would use a "put" prefix to make the name more intuitive.
This commit is contained in:
Harendra Kumar 2021-04-22 18:24:07 +05:30
parent 0cfc42394d
commit 6132eb3acb
8 changed files with 355 additions and 98 deletions

View File

@ -103,7 +103,7 @@ o_1_space_copy_read env =
-- | Send the file contents to /dev/null
readFromBytesNull :: Handle -> Handle -> IO ()
readFromBytesNull inh devNull = IFH.fromBytes devNull $ S.unfold FH.read inh
readFromBytesNull inh devNull = IFH.putBytes devNull $ S.unfold FH.read inh
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'readFromBytesNull
@ -116,7 +116,7 @@ inspect $ 'readFromBytesNull `hasNoType` ''D.FoldMany
-- | Send the file contents ('defaultChunkSize') to /dev/null
readWithBufferOfFromBytesNull :: Handle -> Handle -> IO ()
readWithBufferOfFromBytesNull inh devNull =
IFH.fromBytes devNull
IFH.putBytes devNull
$ S.unfold FH.readWithBufferOf (defaultChunkSize, inh)
#ifdef INSPECTION
@ -150,7 +150,7 @@ _readChunksWithBufferOf inh devNull = IUF.fold fld unf (defaultChunkSize, inh)
o_1_space_copy_fromBytes :: BenchEnv -> [Benchmark]
o_1_space_copy_fromBytes env =
[ bgroup "copy/fromBytes"
[ bgroup "copy/putBytes"
[ mkBench "rawToNull" env $ \inh _ ->
readFromBytesNull inh (nullH env)
, mkBench "FH.readWithBufferOf" env $ \inh _ ->

View File

@ -71,7 +71,7 @@ fromToBytesBracket_Stream :: Handle -> Handle -> IO ()
fromToBytesBracket_Stream inh devNull =
let readEx = IP.bracket_ (return ()) (\_ -> hClose inh)
(\_ -> IFH.toBytes inh)
in IFH.fromBytes devNull $ readEx
in IFH.putBytes devNull $ readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'fromToBytesBracket_Stream
@ -82,7 +82,7 @@ fromToBytesBracketStream :: Handle -> Handle -> IO ()
fromToBytesBracketStream inh devNull =
let readEx = S.bracket (return ()) (\_ -> hClose inh)
(\_ -> IFH.toBytes inh)
in IFH.fromBytes devNull $ readEx
in IFH.putBytes devNull $ readEx
readWriteBeforeAfterStream :: Handle -> Handle -> IO ()
readWriteBeforeAfterStream inh devNull =

65
design/container-api.md Normal file
View File

@ -0,0 +1,65 @@
# Data Containers
Containers are persistent containers of data e.g. files, arrays, maps. For
consistency we use similar API for all such containers where possible. Usually
the API names are relative to the current module e.g. toBytes in an array
module means converting the array to bytes, it sounds intuitive if read
qualified with the module name e.g. Array.toBytes. This doc lists some
conventions and guidelines to be followed.
## Unfolds and Folds
* Unfolds are named as "read" or with a "read" prefix (e.g. readChunks).
* Folds are named as "write" or with a "write" prefix (e.g. writeChunks).
## From and Put
* Immutable construction from some external source is named with a "from"
prefix (e.g. fromList).
* Mutation of an existing container uses a "put" prefix instead of "from". When
an API uses an existing container to write to and does not return a newly
constructed container then use "put".
* "from" vs "put": "from" assumes creation of a new object, it may fail if the
object being created already exists (e.g. the file exists), it may not take a
lock as it assumes immutability. "put" may create a new object or overwrite
an existing one, it may take a lock for writing as it assumes mutability.
## To and Get
* Converting the complete object to an external representation is prefixed with
"to" (e.g. toBytes).
* For mutable objects "get" APIs may be used instead of "to" APIs.
* "to" vs "get": "to" assumes immutable object so does not have to take a lock.
"get" assumes mutable object so may take a lock.
## Append
* Use "append" prefix for appending data at the end of a mutable container
## Random Access (Arrays)
* getIndex (for arrays)
* getSlice/getRange/getIndices
* putIndex (for mutable arrays)
* append (for mutable arrays)
## Key Access (Maps)
* getKey
* findKey (test existence)
* createKey (insert new key for mutable maps)
* putKey (insert or update for mutable maps)
* updateKey (update existing or fail)
* deleteKey (delete existing or fail)
* destroyKey (delete existing or not)
## Points to Consider
The fold and unfold APIs can be used to express the to/from stream APIs. So we
may not need both, it may just add to more APIs being proliferated.
We may need an "append" style fold as well? What would "stream" append
operations be called then?
We may need locked version of "write" folds for mutable containers for
concurrent access.

View File

@ -0,0 +1,31 @@
-- |
-- Module : Streamly.Console.Stdio
-- Copyright : (c) 2021 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Combinators to work with standard input, output and error streams.
--
-- See also: "Streamly.Internal.Console.Stdio"
module Streamly.Console.Stdio
(
-- * Read (stdin)
read
, readChunks
-- * Write (stdout)
, write
, writeChunks
-- * Write (stderr)
, writeErr
, writeErrChunks
)
where
import Streamly.Internal.Console.Stdio
import Prelude hiding (read)

View File

@ -0,0 +1,229 @@
-- |
-- Module : Streamly.Internal.Console.Stdio
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Console.Stdio
(
-- * Read
read
, getBytes
, getChars
, readChunks
, getChunks
-- , getChunksLn
-- , getStringsWith -- get strings using the supplied decoding
-- , getStrings -- get strings of complete chars,
-- leave any partial chars for next string
-- , getStringsLn -- get lines decoded as char strings
-- * Write
, write
, writeErr
, putBytes -- Buffered (32K)
, putChars
, writeChunks
, writeErrChunks
, putChunks -- Unbuffered
, putStringsWith
, putStrings
, putStringsLn
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import System.IO (stdin, stdout, stderr)
import Prelude hiding (read)
import Streamly.Internal.Data.Array.Foreign.Type (Array(..))
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Unfold (Unfold)
import Streamly.Internal.Data.Fold (Fold)
import qualified Streamly.Internal.Data.Array.Foreign as Array
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.FileSystem.Handle as Handle
import qualified Streamly.Internal.Unicode.Stream as Unicode
-------------------------------------------------------------------------------
-- Reads
-------------------------------------------------------------------------------
-- | Unfold standard input into a stream of 'Word8'.
--
-- /Pre-release/
{-# INLINE read #-}
read :: MonadIO m => Unfold m () Word8
read = Unfold.lmap (\() -> stdin) Handle.read
-- | Read a byte stream from standard input.
--
-- > getBytes = Handle.toBytes stdin
-- > getBytes = Stream.unfold Stdio.read ()
--
-- /Pre-release/
--
{-# INLINE getBytes #-}
getBytes :: MonadIO m => SerialT m Word8
getBytes = Handle.toBytes stdin
-- | Read a character stream from Utf8 encoded standard input.
--
-- > getChars = Unicode.decodeUtf8 Stdio.getBytes
--
-- /Pre-release/
--
{-# INLINE getChars #-}
getChars :: MonadIO m => SerialT m Char
getChars = Unicode.decodeUtf8 getBytes
-- | Unfolds standard input into a stream of 'Word8' arrays.
--
-- /Pre-release/
{-# INLINE readChunks #-}
readChunks :: MonadIO m => Unfold m () (Array Word8)
readChunks = Unfold.lmap (\() -> stdin) Handle.readChunks
-- | Read a stream of chunks from standard input. The maximum size of a single
-- chunk is limited to @defaultChunkSize@. The actual size read may be less
-- than @defaultChunkSize@.
--
-- > getChunks = Handle.toChunks stdin
-- > getChunks = Stream.unfold Stdio.readChunks ()
--
-- /Pre-release/
--
{-# INLINE getChunks #-}
getChunks :: MonadIO m => SerialT m (Array Word8)
getChunks = Handle.toChunks stdin
{-
-- | Read UTF8 encoded lines from standard input.
--
-- You may want to process the input byte stream directly using appropriate
-- folds for more efficient processing.
--
-- /Pre-release/
--
{-# INLINE getChunksLn #-}
getChunksLn :: MonadIO m => SerialT m (Array Word8)
getChunksLn = (Stream.splitWithSuffix (== '\n') f) getChars
-- XXX Need to implement Fold.unfoldMany, should be easy for
-- non-terminating folds, but may be tricky for terminating folds. See
-- Array Stream folds.
where f = Fold.unfoldMany Unicode.readCharUtf8 Array.write
-}
-------------------------------------------------------------------------------
-- Writes
-------------------------------------------------------------------------------
-- | Fold a stream of 'Word8' to standard output.
--
-- /Pre-release/
{-# INLINE write #-}
write :: MonadIO m => Fold m Word8 ()
write = Handle.write stdout
-- | Fold a stream of 'Word8' to standard error.
--
-- /Pre-release/
{-# INLINE writeErr #-}
writeErr :: MonadIO m => Fold m Word8 ()
writeErr = Handle.write stderr
-- | Write a stream of bytes to standard output.
--
-- > putBytes = Handle.putBytes stdout
-- > putBytes = Stream.fold Stdio.write
--
-- /Pre-release/
--
{-# INLINE putBytes #-}
putBytes :: MonadIO m => SerialT m Word8 -> m ()
putBytes = Handle.putBytes stdout
-- | Encode a character stream to Utf8 and write it to standard output.
--
-- > putChars = Stdio.putBytes . Unicode.encodeUtf8
--
-- /Pre-release/
--
{-# INLINE putChars #-}
putChars :: MonadIO m => SerialT m Char -> m ()
putChars = putBytes . Unicode.encodeUtf8
-- | Fold a stream of @Array Word8@ to standard output.
--
-- /Pre-release/
{-# INLINE writeChunks #-}
writeChunks :: MonadIO m => Fold m (Array Word8) ()
writeChunks = Handle.writeChunks stdout
-- | Fold a stream of @Array Word8@ to standard error.
--
-- /Pre-release/
{-# INLINE writeErrChunks #-}
writeErrChunks :: MonadIO m => Fold m (Array Word8) ()
writeErrChunks = Handle.writeChunks stderr
-- | Write a stream of chunks to standard output.
--
-- > putChunks = Handle.putChunks stdout
-- > putChunks = Stream.fold Stdio.writeChunks
--
-- /Pre-release/
--
{-# INLINE putChunks #-}
putChunks :: MonadIO m => SerialT m (Array Word8) -> m ()
putChunks = Handle.putChunks stdout
-------------------------------------------------------------------------------
-- Line buffered
-------------------------------------------------------------------------------
-- XXX We need to write transformations as pipes so that they can be applied to
-- folds as well as unfolds/streams. Non-backtracking (one-to-one, one-to-many,
-- filters, reducers) transformations may be easy so we can possibly start with
-- those.
--
-- | Write a stream of strings to standard output using the supplied encoding.
-- Output is flushed to the device for each string.
--
-- /Pre-release/
--
{-# INLINE putStringsWith #-}
putStringsWith :: MonadIO m
=> (SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putStringsWith encode = putChunks . Unicode.encodeStrings encode
-- | Write a stream of strings to standard output using UTF8 encoding. Output
-- is flushed to the device for each string.
--
-- /Pre-release/
--
{-# INLINE putStrings #-}
putStrings :: MonadIO m => SerialT m String -> m ()
putStrings = putStringsWith Unicode.encodeUtf8
-- | Like 'putStrings' but adds a newline at the end of each string.
--
-- XXX This is not portable, on Windows we need to use "\r\n" instead.
--
-- /Pre-release/
--
{-# INLINE putStringsLn #-}
putStringsLn :: MonadIO m => SerialT m String -> m ()
putStringsLn =
putChunks
. Stream.intersperseSuffix (return $ Array.fromList [10])
. Unicode.encodeStrings Unicode.encodeUtf8

View File

@ -1148,7 +1148,7 @@ intersperseBySpan _n _f _xs = undefined
--
-- /Pre-release/
{-# INLINE intersperseSuffix #-}
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
intersperseSuffix m = fromStreamD . D.intersperseSuffix m . toStreamD
-- | Insert a side effect after consuming an element of a stream.
@ -1174,7 +1174,7 @@ intersperseSuffix_ m = fromStreamD . D.intersperseSuffix_ m . toStreamD
-- /Pre-release/
--
{-# INLINE intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: (IsStream t, MonadAsync m)
intersperseSuffixBySpan :: (IsStream t, Monad m)
=> Int -> m a -> t m a -> t m a
intersperseSuffixBySpan n eff =
fromStreamD . D.intersperseSuffixBySpan n eff . toStreamD

View File

@ -20,7 +20,6 @@ module Streamly.Internal.FileSystem.Handle
, toBytes
, toBytesWithBufferOf
, getBytes
-- -- * Array Read
-- , readArrayUpto
@ -30,7 +29,6 @@ module Streamly.Internal.FileSystem.Handle
, toChunksWithBufferOf
, toChunks
, getChunks
-- ** Write to Handle
-- Byte stream write (Folds)
@ -43,8 +41,8 @@ module Streamly.Internal.FileSystem.Handle
, writeWithBufferOf
-- Byte stream write (Streams)
, fromBytes
, fromBytesWithBufferOf
, putBytes
, putBytesWithBufferOf
-- -- * Array Write
, writeArray
@ -52,12 +50,8 @@ module Streamly.Internal.FileSystem.Handle
, writeChunksWithBufferOf
-- -- * Array stream Write
, fromChunksWithBufferOf
, fromChunks
, putChunksWithBufferOf
, putChunks
, putStrings
, putBytes
, putLines
-- -- * Random Access (Seek)
-- -- | Unlike the streaming APIs listed above, these APIs apply to devices or
@ -106,10 +100,9 @@ import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import System.IO (Handle, hGetBufSome, hPutBuf, stdin, stdout)
import System.IO (Handle, hGetBufSome, hPutBuf)
import Prelude hiding (read)
import Streamly.Prelude (MonadAsync)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold.Type (Fold2(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
@ -124,7 +117,6 @@ import Streamly.Internal.Data.Array.Stream.Foreign (lpackArraysChunksOf)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array.Foreign as IA
import qualified Streamly.Internal.Data.Array.Stream.Foreign as AS
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Data.Array.Foreign as A
@ -231,28 +223,6 @@ readChunksWithBufferOf = Unfold step return
toChunks :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
toChunks = toChunksWithBufferOf defaultChunkSize
-- | Read a stream of chunks from standard input. The maximum size of a single
-- chunk is limited to @defaultChunkSize@. The actual size read may be less
-- than @defaultChunkSize@.
--
-- > getChunks = toChunks stdin
--
-- /Pre-release/
--
{-# INLINE getChunks #-}
getChunks :: (IsStream t, MonadIO m) => t m (Array Word8)
getChunks = toChunks stdin
-- | Read a stream of bytes from standard input.
--
-- > getBytes = toBytes stdin
--
-- /Pre-release/
--
{-# INLINE getBytes #-}
getBytes :: (IsStream t, MonadIO m) => t m Word8
getBytes = toBytes stdin
-- | Unfolds a handle into a stream of 'Word8' arrays. Requests to the IO
-- device are performed using a buffer of size
-- 'Streamly.Internal.Data.Array.Foreign.Type.defaultChunkSize'. The
@ -340,73 +310,31 @@ writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen
-- | Write a stream of arrays to a handle.
--
-- @since 0.7.0
{-# INLINE fromChunks #-}
fromChunks :: (MonadIO m, Storable a)
=> Handle -> SerialT m (Array a) -> m ()
fromChunks h = S.mapM_ (liftIO . writeArray h)
-- | Write a stream of chunks to standard output.
--
-- /Pre-release/
--
{-# INLINE putChunks #-}
putChunks :: (MonadIO m, Storable a) => SerialT m (Array a) -> m ()
putChunks = fromChunks stdout
putChunks :: (MonadIO m, Storable a)
=> Handle -> SerialT m (Array a) -> m ()
putChunks h = S.mapM_ (liftIO . writeArray h)
-- XXX use an unfold so that we can put any type of strings.
-- | Write a stream of strings to standard output using the supplied encoding.
-- Output is flushed to the device for each string.
--
-- /Pre-release/
--
{-# INLINE putStrings #-}
putStrings :: MonadAsync m
=> (SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putStrings encode = putChunks . S.mapM (IA.fromStream . encode . S.fromList)
-- XXX use an unfold so that we can put lines from any object
-- | Write a stream of strings as separate lines to standard output using the
-- supplied encoding. Output is line buffered i.e. the output is written to the
-- device as soon as a newline is encountered.
--
-- /Pre-release/
--
{-# INLINE putLines #-}
putLines :: MonadAsync m
=> (SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putLines encode = putChunks . S.mapM
(\xs -> IA.fromStream $ encode (S.fromList (xs ++ "\n")))
-- | Write a stream of bytes from standard output.
--
-- > putBytes = fromBytes stdout
--
-- /Pre-release/
--
{-# INLINE putBytes #-}
putBytes :: MonadIO m => SerialT m Word8 -> m ()
putBytes = fromBytes stdout
-- | @fromChunksWithBufferOf bufsize handle stream@ writes a stream of arrays
-- | @putChunksWithBufferOf bufsize handle stream@ writes a stream of arrays
-- to @handle@ after coalescing the adjacent arrays in chunks of @bufsize@.
-- The chunk size is only a maximum and the actual writes could be smaller as
-- we do not split the arrays to fit exactly to the specified size.
--
-- @since 0.7.0
{-# INLINE fromChunksWithBufferOf #-}
fromChunksWithBufferOf :: (MonadIO m, Storable a)
{-# INLINE putChunksWithBufferOf #-}
putChunksWithBufferOf :: (MonadIO m, Storable a)
=> Int -> Handle -> SerialT m (Array a) -> m ()
fromChunksWithBufferOf n h xs = fromChunks h $ AS.compact n xs
putChunksWithBufferOf n h xs = putChunks h $ AS.compact n xs
-- | @fromBytesWithBufferOf bufsize handle stream@ writes @stream@ to @handle@
-- | @putBytesWithBufferOf bufsize handle stream@ writes @stream@ to @handle@
-- in chunks of @bufsize@. A write is performed to the IO device as soon as we
-- collect the required input size.
--
-- @since 0.7.0
{-# INLINE fromBytesWithBufferOf #-}
fromBytesWithBufferOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
fromBytesWithBufferOf n h m = fromChunks h $ S.arraysOf n m
-- fromBytesWithBufferOf n h m = fromChunks h $ AS.arraysOf n m
{-# INLINE putBytesWithBufferOf #-}
putBytesWithBufferOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
putBytesWithBufferOf n h m = putChunks h $ S.arraysOf n m
-- putBytesWithBufferOf n h m = putChunks h $ AS.arraysOf n m
-- > write = 'writeWithBufferOf' A.defaultChunkSize
--
@ -417,9 +345,9 @@ fromBytesWithBufferOf n h m = fromChunks h $ S.arraysOf n m
-- need some extra perf boost.
--
-- @since 0.7.0
{-# INLINE fromBytes #-}
fromBytes :: MonadIO m => Handle -> SerialT m Word8 -> m ()
fromBytes = fromBytesWithBufferOf defaultChunkSize
{-# INLINE putBytes #-}
putBytes :: MonadIO m => Handle -> SerialT m Word8 -> m ()
putBytes = putBytesWithBufferOf defaultChunkSize
-- | Write a stream of arrays to a handle. Each array in the stream is written
-- to the device as a separate IO request.

View File

@ -364,6 +364,7 @@ library
-- Filesystem/IO
, Streamly.FileSystem.Handle
, Streamly.Console.Stdio
-- Network/IO
, Streamly.Network.Socket
@ -504,6 +505,9 @@ library
, Streamly.Internal.FileSystem.FDIO
, Streamly.Internal.FileSystem.FD
-- streamly-console
, Streamly.Internal.Console.Stdio
-- streamly-network
, Streamly.Internal.Network.Socket
, Streamly.Internal.Network.Inet.TCP