Implement concatSequence for parser

concatSequence collects sequential parses of
parsers in a serial stream using a fold.

* Wrapper and internal implementation
* Benchmark and tests
* Documentation update
This commit is contained in:
Ishan Bhanuka 2022-05-26 21:03:16 +05:30 committed by Harendra Kumar
parent 4fa2d548fa
commit 9d68f58a21
5 changed files with 111 additions and 16 deletions

View File

@ -413,6 +413,10 @@ parseBreak s = do
Left (_ :: SomeException) -> return ()
Right (_, s1) -> parseBreak s1
{-# INLINE concatSequence #-}
concatSequence :: MonadCatch m => SerialT m Int -> m ()
concatSequence = IP.parse $ PR.concatSequence FL.drain $ S.repeat PR.one
-------------------------------------------------------------------------------
-- Benchmarks
-------------------------------------------------------------------------------
@ -455,6 +459,7 @@ o_1_space_serial value =
, benchIOSink value "parseMany (take all)" (parseMany value)
, benchIOSink value "parseIterate (take 1)" (parseIterate 1)
, benchIOSink value "parseIterate (take all)" (parseIterate value)
, benchIOSink value "concatSequence" concatSequence
]
o_1_space_filesystem :: BenchEnv -> [Benchmark]

View File

@ -32,6 +32,7 @@ import qualified Streamly.Internal.Data.Parser.ParserD as PR
import qualified Streamly.Internal.Data.Producer as Producer
import qualified Streamly.Internal.Data.Producer.Source as Source
import qualified Streamly.Internal.Data.Stream.IsStream as IP
import qualified Streamly.Internal.Data.Stream.StreamD as D
import Gauge
import Streamly.Prelude (SerialT, MonadAsync, IsStream)
@ -196,6 +197,10 @@ longestAllAny value =
(drainWhile (<= value))
)
{-# INLINE sequenceParser #-}
sequenceParser :: MonadCatch m => SerialT m Int -> m ()
sequenceParser = IP.parseD (PR.sequence FL.drain (D.repeat (PR.satisfy $ const True)))
-------------------------------------------------------------------------------
-- Spanning
-------------------------------------------------------------------------------
@ -337,6 +342,7 @@ o_1_space_serial value =
, benchIOSink value "teeFst (all,any)" $ teeFstAllAny value
, benchIOSink value "shortest (all,any)" $ shortestAllAny value
, benchIOSink value "longest (all,any)" $ longestAllAny value
, benchIOSink value "sequenceParser" sequenceParser
]
o_1_space_serial_spanning :: Int -> [Benchmark]

View File

@ -245,10 +245,12 @@ import Prelude hiding
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Parser.ParserK.Type (Parser)
import Streamly.Internal.Data.Stream.Serial (SerialT(getSerialT))
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Parser.ParserD as D
import qualified Streamly.Internal.Data.Parser.ParserK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as SD
--
-- $setup
@ -1193,19 +1195,22 @@ takeP i p = D.toParserK $ D.takeP i $ D.fromParserK p
-- Sequential Collection
-------------------------------------------------------------------------------
--
-- | @concatSequence f t@ collects sequential parses of parsers in the
-- container @t@ using the fold @f@. Fails if the input ends or any of the
-- parsers fail.
-- | @concatSequence f p@ collects sequential parses of parsers in a
-- serial stream @p@ using the fold @f@. Fails if the input ends or any
-- of the parsers fail.
--
-- This is same as 'Data.Traversable.sequence' but more efficient.
-- An even more efficient implementation can use ParserD type Parser in
-- the SerialT stream.
--
-- /Unimplemented/
-- /Pre-release/
--
{-# INLINE concatSequence #-}
concatSequence ::
-- Foldable t =>
Fold m b c -> t (Parser m a b) -> Parser m a c
concatSequence _f _p = undefined
MonadCatch m =>
Fold m b c -> SerialT m (Parser m a b) -> Parser m a c
concatSequence f p =
let sp = fmap D.fromParserK $ SD.fromStreamK $ getSerialT p
in D.toParserK $ D.sequence f sp
-- | Map a 'Parser' returning function on the result of a 'Parser'.
--

View File

@ -1901,14 +1901,79 @@ sepBy
-------------------------------------------------------------------------------
--
-- | See 'Streamly.Internal.Data.Parser.sequence'.
--
-- /Unimplemented/
--
{-# INLINE sequence #-}
sequence ::
-- Foldable t =>
Fold m b c -> t (Parser m a b) -> Parser m a c
sequence _f _p = undefined
sequence :: MonadThrow m =>
Fold m b c -> D.Stream m (Parser m a b) -> Parser m a c
sequence (Fold fstep finitial fextract) (D.Stream sstep sstate) =
Parser step initial extract
where
initial = do
fres <- finitial
case fres of
FL.Partial fs -> return $ IPartial (Nothing', sstate, fs)
FL.Done c -> return $ IDone c
-- state does not contain any parser
-- yield a new parser from the stream
step (Nothing', ss, fs) _ = do
sres <- sstep defState ss
case sres of
D.Yield p ss1 -> return $ Continue 1 (Just' p, ss1, fs)
D.Stop -> do
c <- fextract fs
return $ Done 1 c
D.Skip ss1 -> return $ Continue 1 (Nothing', ss1, fs)
-- state holds a parser that may or may not have been
-- initialized. pinit holds the initial parser state
-- or modified parser state respectively
step (Just' (Parser pstep pinit pextr), ss, fs) a = do
ps <- pinit
case ps of
IPartial ps1 -> do
pres <- pstep ps1 a
case pres of
Partial n ps2 ->
let newP =
Just' $ Parser pstep (return $ IPartial ps2) pextr
in return $ Partial n (newP, ss, fs)
Continue n ps2 ->
let newP =
Just' $ Parser pstep (return $ IPartial ps2) pextr
in return $ Continue n (newP, ss, fs)
Done n b -> do
fres <- fstep fs b
case fres of
FL.Partial fs1 ->
return $ Partial n (Nothing', ss, fs1)
FL.Done c -> return $ Done n c
Error msg -> return $ Error msg
IDone b -> do
fres <- fstep fs b
case fres of
FL.Partial fs1 ->
return $ Partial 1 (Nothing', ss, fs1)
FL.Done c -> return $ Done 1 c
IError err -> return $ Error err
extract (Nothing', _, fs) = fextract fs
extract (Just' (Parser _ pinit pextr), _, fs) = do
ps <- pinit
case ps of
IPartial ps1 -> do
b <- pextr ps1
fres <- fstep fs b
case fres of
FL.Partial fs1 -> fextract fs1
FL.Done c -> return c
IDone b -> do
fres <- fstep fs b
case fres of
FL.Partial fs1 -> fextract fs1
FL.Done c -> return c
IError err -> throwM $ ParseError err
-------------------------------------------------------------------------------
-- Alternative Collection

View File

@ -2,7 +2,7 @@ module Main (main) where
import Control.Exception (SomeException(..))
import Data.Word (Word8, Word32, Word64)
import Streamly.Test.Common (listEquals, checkListEqual, chooseInt)
import Streamly.Test.Common (listEquals, checkListEqual, chooseInt, equals)
import Test.Hspec (Spec, hspec, describe)
import Test.Hspec.QuickCheck
import Test.QuickCheck
@ -18,6 +18,7 @@ import qualified Streamly.Internal.Data.Parser.ParserD as P
import qualified Streamly.Internal.Data.Producer.Source as Source
import qualified Streamly.Internal.Data.Producer as Producer
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Test.Hspec as H
@ -635,6 +636,18 @@ parseUnfold = do
listEquals (==) xs ls
parserSequence :: Property
parserSequence =
forAll (vectorOf 11 (listOf (chooseAny :: Gen Int))) $ \ins ->
monadicIO $ do
let parsers = D.fromList
$ fmap (\xs -> P.fromFold $ FL.take (length xs) FL.sum) ins
let sequencedParser = P.sequence FL.sum parsers
outs <-
run $
S.parseD sequencedParser $ S.concatMap S.fromList (S.fromList ins)
equals (==) outs (sum $ map sum ins)
-------------------------------------------------------------------------------
-- Test for a particular case hit during fs events testing
-------------------------------------------------------------------------------
@ -720,6 +733,7 @@ main =
prop "parseMany" parseMany
prop "parseMany2Events" parseMany2Events
prop "parseUnfold" parseUnfold
prop "parserSequence" parserSequence
describe "test for accumulator" $ do
prop "P.fromFold FL.sum = FL.sum" fromFold