Add support for new line boundary split and handle re position till new line

This commit is contained in:
Ranjeet Kumar Ranjan 2021-01-18 12:18:21 +05:30
parent 86745a756d
commit bd046dfc6d

View File

@ -30,6 +30,7 @@ module Streamly.Internal.FileSystem.Handle
, toChunksWithBufferOf
, toChunks
, toChunksTillNewLine
, getChunks
-- ** Write to Handle
@ -103,10 +104,10 @@ import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr)
import Foreign.Ptr (minusPtr, plusPtr, Ptr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import System.IO (Handle, hGetBufSome, hPutBuf, stdin, stdout)
import System.IO (Handle, SeekMode(RelativeSeek), hSeek, hGetBufSome, hPutBuf, stdin, stdout)
import Prelude hiding (read)
import Streamly.Prelude (MonadAsync)
@ -163,6 +164,29 @@ readArrayUpto size h = do
unsafeFreezeWithShrink $
mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
{-# INLINABLE readArrayUptoNL #-}
readArrayUptoNL :: Int -> Handle -> IO (Array Word8)
readArrayUptoNL size h = do
ptr <- mallocPlainForeignPtrBytes size
withForeignPtr ptr $ \p -> do
n <- hGetBufSome h p size
x <- getNewlinePos p (n-1)
hSeek h RelativeSeek $ fromIntegral (x-n)
return $
unsafeFreezeWithShrink $
mutableArray ptr (p `plusPtr` x) (p `plusPtr` size)
getNewlinePos :: Ptr Word8 -> Int -> IO Int
getNewlinePos = go
where
go _ (-1) = return 0
go p0 end0 = do
nn <- peek (p0 `plusPtr` end0) :: IO Word8
if nn == (10 :: Word8)
then return (end0+1)
else go p0 (end0-1)
-------------------------------------------------------------------------------
-- Stream of Arrays IO
-------------------------------------------------------------------------------
@ -187,10 +211,22 @@ _toChunksWithBufferOf size h = go
-- The actual size read may be less than or equal to @size@.
--
-- @since 0.7.0
{-# INLINE_NORMAL toChunksWithBufferOfNL #-}
toChunksWithBufferOfNL :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8)
toChunksWithBufferOfNL size h = D.fromStreamD (D.Stream step ())
where
{-# INLINE_LATE step #-}
step _ _ = do
arr <- liftIO $ readArrayUptoNL size h
return $
case A.length arr of
0 -> D.Stop
_ -> D.Yield arr ()
{-# INLINE_NORMAL toChunksWithBufferOf #-}
toChunksWithBufferOf :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8)
toChunksWithBufferOf size h = D.fromStreamD (D.Stream step ())
where
where
{-# INLINE_LATE step #-}
step _ _ = do
arr <- liftIO $ readArrayUpto size h
@ -231,6 +267,10 @@ readChunksWithBufferOf = Unfold step return
toChunks :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
toChunks = toChunksWithBufferOf defaultChunkSize
{-# INLINE toChunksTillNewLine #-}
toChunksTillNewLine :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
toChunksTillNewLine = toChunksWithBufferOfNL defaultChunkSize
-- | Read a stream of chunks from standard input. The maximum size of a single
-- chunk is limited to @defaultChunkSize@. The actual size read may be less
-- than @defaultChunkSize@.