From 582410afda1587005f9a65473d545ad255a225e7 Mon Sep 17 00:00:00 2001 From: Ranjeet Kumar Ranjan Date: Wed, 5 Jan 2022 16:35:50 +0530 Subject: [PATCH] Implement joinLeftMap API --- .../Benchmark/Prelude/Serial/NestedStream.hs | 4 ++ .../Internal/Data/Stream/IsStream/Top.hs | 45 +++++++++++------- test/Streamly/Test/Prelude/Top.hs | 47 +++++++++++++++++++ 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs b/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs index fb5b3ce6..45c2a9bf 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs @@ -451,6 +451,10 @@ o_n_heap_buffering value = $ joinWith Internal.joinInner sqrtVal sqrtVal , benchIOSrc1 "joinInnerMap" $ joinMapWith Internal.joinInnerMap sqrtVal sqrtVal + , benchIOSrc1 "joinLeft" + $ joinWith Internal.joinLeft sqrtVal sqrtVal + , benchIOSrc1 "joinLeftMap " + $ joinMapWith Internal.joinLeftMap sqrtVal sqrtVal , benchIOSrc1 "joinOuter" $ joinWith Internal.joinOuter sqrtVal sqrtVal , benchIOSrc1 "joinOuterMap" diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs index 2b2c1df3..7677412f 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs @@ -39,9 +39,9 @@ module Streamly.Internal.Data.Stream.IsStream.Top , joinInner , joinInnerMap , joinInnerMerge - , leftJoin + , joinLeft , mergeLeftJoin - , hashLeftJoin + , joinLeftMap , joinOuter , mergeOuterJoin , joinOuterMap @@ -291,6 +291,10 @@ joinInner eq s1 s2 = do ) s2 ) s1 +-- XXX Generate error if a duplicate insertion is attempted? 
+toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v) +toMap = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty + -- If the second stream is too big it can be partitioned based on hashes and -- then we can process one parition at a time. -- @@ -313,14 +317,11 @@ joinInnerMap :: (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b) joinInnerMap s1 s2 = Stream.concatM $ do - km <- kvFold $ IsStream.adapt s2 + km <- toMap $ IsStream.adapt s2 pure $ Stream.mapMaybe (joinAB km) s1 where - -- XXX Generate error if a duplicate insertion is attempted? - kvFold = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty - joinAB kvm (k, a) = case k `Map.lookup` kvm of Just b -> Just (k, a, b) @@ -353,7 +354,7 @@ joinInnerMerge = undefined -- stream is expensive to evaluate. -- -- @ --- rightJoin = flip leftJoin +-- rightJoin = flip joinLeft -- @ -- -- Space: O(n) assuming the second stream is cached in memory. @@ -361,10 +362,10 @@ joinInnerMerge = undefined -- Time: O(m x n) -- -- /Unimplemented/ -{-# INLINE leftJoin #-} -leftJoin :: Monad m => +{-# INLINE joinLeft #-} +joinLeft :: Monad m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b) -leftJoin eq s1 s2 = Stream.evalStateT (return False) $ do +joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do a <- Stream.liftInner s1 -- XXX should we use StreamD monad here? 
-- XXX Is there a better way to perform some action at the end of a loop @@ -391,13 +392,23 @@ leftJoin eq s1 s2 = Stream.evalStateT (return False) $ do -- -- Time: O(m + n) -- --- /Unimplemented/ -{-# INLINE hashLeftJoin #-} -hashLeftJoin :: -- Hashable b => - (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b) -hashLeftJoin = undefined +-- /Pre-release/ +{-# INLINE joinLeftMap #-} +joinLeftMap :: (IsStream t, Ord k, Monad m) => + t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b) +joinLeftMap s1 s2 = + Stream.concatM $ do + km <- toMap $ IsStream.adapt s2 + return $ Stream.map (joinAB km) s1 --- | Like 'leftJoin' but works only on sorted streams. + where + + joinAB km (k, a) = + case k `Map.lookup` km of + Just b -> (k, a, Just b) + Nothing -> (k, a, Nothing) + +-- | Like 'joinLeft' but works only on sorted streams. -- -- Space: O(1) -- @@ -575,7 +586,7 @@ mergeIntersectBy :: -- (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a mergeIntersectBy _eq _s1 _s2 = undefined --- Roughly leftJoin s1 s2 = s1 `difference` s2 + s1 `intersection` s2 +-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2 -- | Delete first occurrences of those elements from the first stream that are -- present in the second stream. 
If an element occurs multiple times in the diff --git a/test/Streamly/Test/Prelude/Top.hs b/test/Streamly/Test/Prelude/Top.hs index f78fd078..404b98d9 100644 --- a/test/Streamly/Test/Prelude/Top.hs +++ b/test/Streamly/Test/Prelude/Top.hs @@ -123,6 +123,51 @@ joinOuterMap = let v2 = joinOuterList ls0 ls1 assert (sort v1 == sort v2) +joinLeftList :: [(Int, Int)] -> [(Int, Int)] -> [(Int, Int, Maybe Int)] +joinLeftList ls0 ls1 = + let v = do + i <- ls0 + if i `elem` ls1 + then return (fst i, fst i, Just (fst i)) + else return (fst i, fst i, Nothing) + in v + +joinLeft :: Property +joinLeft = + forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 -> + forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 -> + -- nub the second list; list functions cannot validate duplicates + monadicIO $ action ls0 (nub ls1) + + where + + action ls0 ls1 = do + v1 <- + run + $ S.toList + $ Top.joinLeft eq (S.fromList ls0) (S.fromList ls1) + let v2 = joinLeftList (map (\a -> (a,a)) ls0) (map (\a -> (a,a)) ls1) + v3 = map (\ (_, x1, x2) -> (x1, x2)) v2 + assert (v1 == v3) + +joinLeftMap :: Property +joinLeftMap = + forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 -> + forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 -> + -- nub the second list; list functions cannot validate duplicates + monadicIO $ + action (map (\a -> (a,a)) ls0) (map (\a -> (a,a)) (nub ls1)) + + where + + action ls0 ls1 = do + v1 <- + run + $ S.toList + $ Top.joinLeftMap (S.fromList ls0) (S.fromList ls1) + let v2 = joinLeftList ls0 ls1 + assert (v1 == v2) + ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -140,3 +185,5 @@ main = hspec $ do -- XXX currently API is broken https://github.com/composewell/streamly/issues/1032 --prop "joinOuter" Main.joinOuter prop "joinOuterMap" Main.joinOuterMap + prop "joinLeft" Main.joinLeft + prop "joinLeftMap" Main.joinLeftMap