Add framing/buffering of streams using Maybe

This commit is contained in:
Harendra Kumar 2021-11-07 19:55:00 +05:30
parent 09d4257c68
commit ae1d8b1404
2 changed files with 35 additions and 1 deletions

View File

@ -13,6 +13,11 @@
-- fundamental stream IO APIs built on top of those are -- fundamental stream IO APIs built on top of those are
-- 'readChunksWithBufferOf' and 'writeChunks'. Rest of this module is just -- 'readChunksWithBufferOf' and 'writeChunks'. Rest of this module is just
-- combinatorial programming using these. -- combinatorial programming using these.
--
-- We can achieve line buffering by folding lines in the input stream into a
-- stream of arrays using Stream.splitOn or Fold.takeEndBy_ and similar
-- operations. One can wrap the input stream in 'Maybe' type and then use
-- 'writeMaybesWithBufferOf' to achieve user controlled buffering.
-- TODO: Need a separate module for pread/pwrite based reading writing for -- TODO: Need a separate module for pread/pwrite based reading writing for
-- seekable devices. Stateless read/write can be helpful in multithreaded -- seekable devices. Stateless read/write can be helpful in multithreaded
@ -51,6 +56,7 @@ module Streamly.Internal.FileSystem.Handle
-- , writeByFrames -- , writeByFrames
-- , writeLines -- , writeLines
, writeWithBufferOf , writeWithBufferOf
, writeMaybesWithBufferOf
, putBytes , putBytes
, putBytesWithBufferOf , putBytesWithBufferOf
@ -101,6 +107,7 @@ where
import Control.Exception (assert) import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.IO.Class (MonadIO(..))
import Data.Function ((&)) import Data.Function ((&))
import Data.Maybe (isNothing, fromJust)
import Data.Word (Word8) import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr) import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (minusPtr, plusPtr) import Foreign.Ptr (minusPtr, plusPtr)
@ -509,6 +516,19 @@ writeChunksWithBufferOf n h = lpackArraysChunksOf n (writeChunks h)
writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 () writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 ()
writeWithBufferOf n h = FL.chunksOf n (writeNUnsafe n) (writeChunks h) writeWithBufferOf n h = FL.chunksOf n (writeNUnsafe n) (writeChunks h)
-- | Write a stream of 'Maybe' values. Keep buffering the just values in an
-- array until a 'Nothing' is encountered or the buffer size exceeds the
-- specified limit, at that point flush the buffer to the handle.
--
-- /Pre-release/
{-# INLINE writeMaybesWithBufferOf #-}
writeMaybesWithBufferOf :: (MonadIO m )
=> Int -> Handle -> Fold m (Maybe Word8) ()
writeMaybesWithBufferOf n h =
let writeNJusts = FL.lmap fromJust $ A.writeN n
writeOnNothing = FL.takeEndBy_ isNothing writeNJusts
in FL.many writeOnNothing (writeChunks h)
-- | Like 'writeWithBufferOf' but uses the experimental 'Refold' API. -- | Like 'writeWithBufferOf' but uses the experimental 'Refold' API.
-- --
-- /Internal/ -- /Internal/

View File

@ -47,6 +47,7 @@ module Streamly.Internal.Network.Socket
-- , writeUtf8ByLines -- , writeUtf8ByLines
-- , writeByFrames -- , writeByFrames
, writeWithBufferOf , writeWithBufferOf
, writeMaybesWithBufferOf
, putChunks , putChunks
, putBytesWithBufferOf , putBytesWithBufferOf
@ -66,8 +67,8 @@ import Control.Exception (onException)
import Control.Monad.Catch (MonadCatch, finally, MonadMask) import Control.Monad.Catch (MonadCatch, finally, MonadMask)
import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (forM_, when) import Control.Monad (forM_, when)
import Data.Maybe (isNothing, fromJust)
import Data.Word (Word8) import Data.Word (Word8)
import Foreign.Ptr (minusPtr, plusPtr, Ptr, castPtr) import Foreign.Ptr (minusPtr, plusPtr, Ptr, castPtr)
import Foreign.Storable (Storable(..)) import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes) import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
@ -497,6 +498,19 @@ putBytesWithBufferOf n h m = putChunks h $ AS.arraysOf n m
writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 () writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h) writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h)
-- | Write a stream of 'Maybe' values. Keep buffering the 'Just' values in an
-- array. Write the array to the 'Handle' as soon as a 'Nothing' is encountered
-- or the buffer size exceeds the specified limit.
--
-- /Pre-release/
{-# INLINE writeMaybesWithBufferOf #-}
writeMaybesWithBufferOf :: (MonadIO m )
=> Int -> Socket -> Fold m (Maybe Word8) ()
writeMaybesWithBufferOf n h =
let writeNJusts = FL.lmap fromJust $ A.writeN n
writeOnNothing = FL.takeEndBy_ isNothing writeNJusts
in FL.many writeOnNothing (writeChunks h)
-- > write = 'writeWithBufferOf' defaultChunkSize -- > write = 'writeWithBufferOf' defaultChunkSize
-- --
-- | Write a byte stream to a file handle. Combines the bytes in chunks of size -- | Write a byte stream to a file handle. Combines the bytes in chunks of size