mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-09-17 11:37:20 +03:00
Implement intersectBySorted API
This commit is contained in:
parent
4bc714fe14
commit
160393c8e0
@ -459,6 +459,10 @@ o_n_heap_buffering value =
|
||||
$ joinWith Internal.joinOuter sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinOuterMap"
|
||||
$ joinMapWith Internal.joinOuterMap sqrtVal sqrtVal
|
||||
, benchIOSrc1 "intersectBy"
|
||||
$ joinWith Internal.intersectBy sqrtVal sqrtVal
|
||||
, benchIOSrc1 "intersectBySorted"
|
||||
$ joinMapWith Internal.intersectBySorted sqrtVal sqrtVal
|
||||
]
|
||||
]
|
||||
|
||||
|
@ -28,7 +28,7 @@ module Streamly.Internal.Data.Stream.IsStream.Top
|
||||
-- | These are not exactly set operations because streams are not
|
||||
-- necessarily sets, they may have duplicated elements.
|
||||
, intersectBy
|
||||
, mergeIntersectBy
|
||||
, intersectBySorted
|
||||
, differenceBy
|
||||
, mergeDifferenceBy
|
||||
, unionBy
|
||||
@ -65,6 +65,7 @@ import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
|
||||
import Streamly.Internal.Data.Stream.IsStream.Type
|
||||
(IsStream(..), adapt, foldl', fromList)
|
||||
import Streamly.Internal.Data.Stream.Serial (SerialT)
|
||||
--import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
|
||||
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
|
||||
|
||||
import qualified Data.List as List
|
||||
@ -79,6 +80,7 @@ import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
|
||||
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
|
||||
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
|
||||
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
|
||||
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
|
||||
|
||||
import Prelude hiding (filter, zipWith, concatMap, concat)
|
||||
|
||||
@ -580,11 +582,12 @@ intersectBy eq s1 s2 =
|
||||
--
|
||||
-- Time: O(m+n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE mergeIntersectBy #-}
|
||||
mergeIntersectBy :: -- (IsStream t, Monad m) =>
|
||||
-- /Pre-release/
|
||||
{-# INLINE intersectBySorted #-}
|
||||
intersectBySorted :: (IsStream t, MonadIO m, Eq a) =>
|
||||
(a -> a -> Ordering) -> t m a -> t m a -> t m a
|
||||
mergeIntersectBy _eq _s1 _s2 = undefined
|
||||
intersectBySorted eq s1 =
|
||||
IsStream.fromStreamD . StreamD.intersectBySorted eq (IsStream.toStreamD s1) . IsStream.toStreamD
|
||||
|
||||
-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2
|
||||
|
||||
|
@ -142,6 +142,7 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
|
||||
-- | Opposite to compact in ArrayStream
|
||||
, splitInnerBy
|
||||
, splitInnerBySuffix
|
||||
, intersectBySorted
|
||||
)
|
||||
where
|
||||
|
||||
@ -482,6 +483,59 @@ mergeBy
|
||||
=> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Intersection of sorted streams ---------------------------------------------
|
||||
-------------------------------------------------------------------------------
|
||||
{-# INLINE_NORMAL intersectBySorted #-}
|
||||
intersectBySorted
|
||||
:: (MonadIO m, Eq a)
|
||||
=> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
intersectBySorted cmp (Stream stepa ta) (Stream stepb tb) =
|
||||
Stream step (Just ta, Just tb, Nothing, Nothing, Nothing)
|
||||
|
||||
where
|
||||
{-# INLINE_LATE step #-}
|
||||
|
||||
-- step 1
|
||||
step gst (Just sa, sb, Nothing, b, Nothing) = do
|
||||
r <- stepa gst sa
|
||||
return $ case r of
|
||||
Yield a sa' -> Skip (Just sa', sb, Just a, b, Nothing)
|
||||
Skip sa' -> Skip (Just sa', sb, Nothing, b, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 2
|
||||
step gst (sa, Just sb, a, Nothing, Nothing) = do
|
||||
r <- stepb gst sb
|
||||
return $ case r of
|
||||
Yield b sb' -> Skip (sa, Just sb', a, Just b, Nothing)
|
||||
Skip sb' -> Skip (sa, Just sb', a, Nothing, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 3
|
||||
-- both the values are available compare it
|
||||
step _ (sa, sb, Just a, Just b, Nothing) = do
|
||||
let res = cmp a b
|
||||
return $ case res of
|
||||
GT -> Skip (sa, sb, Just a, Nothing, Nothing)
|
||||
LT -> Skip (sa, sb, Nothing, Just b, Nothing)
|
||||
EQ -> Yield a (sa, sb, Nothing, Just a, Just b) -- step 4
|
||||
|
||||
-- step 4
|
||||
-- Matching element
|
||||
step gst (Just sa, Just sb, Nothing, Just _, Just b) = do
|
||||
r1 <- stepa gst sa
|
||||
return $ case r1 of
|
||||
Yield a' sa' -> do
|
||||
if a' == b -- match with prev a
|
||||
then Yield a' (Just sa', Just sb, Nothing, Just b, Just b) --step 1
|
||||
else Skip (Just sa', Just sb, Just a', Nothing, Nothing)
|
||||
|
||||
Skip sa' -> Skip (Just sa', Just sb, Nothing, Nothing, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
step _ (_, _, _, _, _) = return Stop
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Combine N Streams - unfoldMany
|
||||
------------------------------------------------------------------------------
|
||||
|
@ -101,7 +101,7 @@ extra-source-files:
|
||||
test/Streamly/Test/Data/Array/Prim/Pinned.hs
|
||||
test/Streamly/Test/Data/Array/Foreign.hs
|
||||
test/Streamly/Test/Data/Array/Stream/Foreign.hs
|
||||
test/Streamly/Test/Data/Parser/ParserD.hs
|
||||
test/Streamly/Test/Data/Parser/ParserD.hs
|
||||
test/Streamly/Test/FileSystem/Event.hs
|
||||
test/Streamly/Test/FileSystem/Event/Common.hs
|
||||
test/Streamly/Test/FileSystem/Event/Darwin.hs
|
||||
|
@ -1,6 +1,7 @@
|
||||
module Main (main) where
|
||||
module Main (main)
|
||||
where
|
||||
|
||||
import Data.List (elem, nub, sort)
|
||||
import Data.List (elem, intersect, nub, sort)
|
||||
import Data.Maybe (isNothing)
|
||||
import Test.QuickCheck
|
||||
( Gen
|
||||
@ -168,10 +169,45 @@ joinLeftMap =
|
||||
let v2 = joinLeftList ls0 ls1
|
||||
assert (v1 == v2)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Main
|
||||
-------------------------------------------------------------------------------
|
||||
intersectBy :: Property
|
||||
intersectBy =
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
|
||||
monadicIO $ action (sort ls0) (sort ls1)
|
||||
|
||||
where
|
||||
|
||||
action ls0 ls1 = do
|
||||
v1 <-
|
||||
run
|
||||
$ S.toList
|
||||
$ Top.intersectBy
|
||||
(==)
|
||||
(S.fromList ls0)
|
||||
(S.fromList ls1)
|
||||
let v2 = intersect ls0 ls1
|
||||
assert (v1 == sort v2)
|
||||
|
||||
intersectBySorted :: Property
|
||||
intersectBySorted =
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
|
||||
monadicIO $ action (sort ls0) (sort ls1)
|
||||
|
||||
where
|
||||
|
||||
action ls0 ls1 = do
|
||||
v1 <-
|
||||
run
|
||||
$ S.toList
|
||||
$ Top.intersectBySorted
|
||||
compare
|
||||
(S.fromList ls0)
|
||||
(S.fromList ls1)
|
||||
let v2 = intersect ls0 ls1
|
||||
assert (v1 == sort v2)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
moduleName :: String
|
||||
moduleName = "Prelude.Top"
|
||||
|
||||
@ -187,3 +223,6 @@ main = hspec $ do
|
||||
prop "joinOuterMap" Main.joinOuterMap
|
||||
prop "joinLeft" Main.joinLeft
|
||||
prop "joinLeftMap" Main.joinLeftMap
|
||||
-- intersect
|
||||
prop "intersectBy" Main.intersectBy
|
||||
prop "intersectBySorted" Main.intersectBySorted
|
||||
|
Loading…
Reference in New Issue
Block a user