Implement joinOuterHash API

This commit is contained in:
Ranjeet Kumar Ranjan 2022-01-05 18:52:42 +05:30 committed by Harendra Kumar
parent 7b0a34348e
commit 72a2b1fa43
4 changed files with 102 additions and 15 deletions

View File

@ -436,12 +436,27 @@ joinInnerMap val1 val2 _ =
(fmap toKvMap (mkStreamLen val1))
(fmap toKvMap (mkStreamLen val2))
-- Benchmark driver: builds two streams with 'mkStreamLen' from the first two
-- arguments, outer-joins them using '(==)' as the match predicate and drains
-- the result. The third argument is ignored.
{-# INLINE joinOuter #-}
joinOuter :: Int -> Int -> Int -> IO ()
joinOuter val1 val2 _ =
    S.drain (Internal.joinOuter (==) left right)

    where

    left = mkStreamLen val1
    right = mkStreamLen val2
-- Benchmark driver: builds two streams with 'mkStreamLen' from the first two
-- arguments, maps 'toKvMap' over each of them, joins them with
-- 'Internal.joinOuterMap' and drains the result. The third argument is
-- ignored.
{-# INLINE joinOuterMap #-}
joinOuterMap :: Int -> Int -> Int -> IO ()
joinOuterMap val1 val2 _ =
    S.drain (Internal.joinOuterMap left right)

    where

    left = toKvMap <$> mkStreamLen val1
    right = toKvMap <$> mkStreamLen val2
-- Benchmark group "buffered": one benchmark each for joinInner, joinInnerMap,
-- joinOuter and joinOuterMap, every one of them invoked with sqrtVal as both
-- of its first two arguments.
--
-- NOTE(review): 'sqrtVal' is not bound in the lines shown here and 'value' is
-- otherwise unused; presumably sqrtVal is derived from value in a where
-- clause further down -- confirm.
o_n_heap_buffering :: Int -> [Benchmark]
o_n_heap_buffering value =
[ bgroup "buffered"
[
benchIOSrc1 "joinInner" (joinInner sqrtVal sqrtVal)
, benchIOSrc1 "joinInnerMap" (joinInnerMap sqrtVal sqrtVal)
, benchIOSrc1 "joinOuter" (joinOuter sqrtVal sqrtVal)
, benchIOSrc1 "joinOuterMap" (joinOuterMap sqrtVal sqrtVal)
]
]

View File

@ -42,9 +42,9 @@ module Streamly.Internal.Data.Stream.IsStream.Top
, leftJoin
, mergeLeftJoin
, hashLeftJoin
, outerJoin
, joinOuter
, mergeOuterJoin
, hashOuterJoin
, joinOuterMap
)
where
@ -385,7 +385,7 @@ leftJoin eq s1 s2 = Stream.evalStateT (return False) $ do
else Stream.nil
Nothing -> return (a, Nothing)
-- | Like 'outerJoin' but uses a hashmap for efficiency.
-- | Like 'joinOuter' but uses a hashmap for efficiency.
--
-- Space: O(n)
--
@ -423,13 +423,13 @@ mergeLeftJoin _eq _s1 _s2 = undefined
-- Time: O(m x n)
--
-- /Unimplemented/
{-# INLINE outerJoin #-}
outerJoin :: MonadIO m =>
{-# INLINE joinOuter #-}
joinOuter :: MonadIO m =>
(a -> b -> Bool)
-> SerialT m a
-> SerialT m b
-> SerialT m (Maybe a, Maybe b)
outerJoin eq s1 s =
joinOuter eq s1 s =
Stream.concatM $ do
arr <- Array.fromStream $ fmap (,False) s
return $ go arr <> leftOver arr
@ -470,7 +470,7 @@ outerJoin eq s1 s =
-- a flag. At the end go through @t m b@ and find those that are not in that
-- hash to return (Nothing, b).
--
-- | Like 'outerJoin' but uses a hashmap for efficiency.
-- | Like 'joinOuter' but uses a hashmap for efficiency.
--
-- For space efficiency use the smaller stream as the second stream.
--
@ -478,13 +478,38 @@ outerJoin eq s1 s =
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE hashOuterJoin #-}
hashOuterJoin :: -- (Monad m, Hashable b) =>
(a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
-- Placeholder only: all three arguments are ignored and the result is
-- 'undefined', so evaluating a call to this function fails at runtime.
hashOuterJoin _eq _s1 _s2 = undefined
-- /Pre-release/
{-# INLINE joinOuterMap #-}
joinOuterMap ::
(IsStream t, Ord k, MonadIO m) =>
t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
-- Outer join of two key-value streams on their keys.
--
-- Both inputs are first folded into maps keyed by @k@ (see kvFold below) and
-- are then traversed a second time to produce the output, so each input
-- stream is evaluated twice.
--
-- The result is res1 followed by res2:
--
-- * res1: one row per element @(k, a)@ of the first stream, carrying
--   @Just b@ when @k@ is present in the map built from the second stream and
--   @Nothing@ otherwise.
-- * res2: one row @(k, Nothing, Just b)@ per element of the second stream
--   whose key is absent from the map built from the first stream.
joinOuterMap s1 s2 =
Stream.concatM $ do
-- Materialize both streams as key -> value maps.
km1 <- kvFold $ IsStream.adapt s1
km2 <- kvFold $ IsStream.adapt s2
-- | Like 'outerJoin' but works only on sorted streams.
-- Rows driven by the first stream; the lookup goes against the second
-- stream's map.
let res1 = Stream.map (joinAB km2) s1
where
joinAB km (k, a) =
case k `Map.lookup` km of
Just b -> (k, Just a, Just b)
Nothing -> (k, Just a, Nothing)
-- Rows of the second stream that have no partner in the first stream;
-- matched keys are dropped here because res1 already reported them.
let res2 = Stream.mapMaybe (joinAB km1) s2
where
joinAB km (k, b) =
case k `Map.lookup` km of
Just _ -> Nothing
Nothing -> Just (k, Nothing, Just b)
return $ Stream.serial res1 res2
where
-- Folds a stream of (key, value) pairs into a map. Map.insert replaces the
-- value of a key that is already present, so for duplicate keys the value
-- seen last in the stream is the one kept.
-- XXX Generate error if a duplicate insertion is attempted?
kvFold = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
-- | Like 'joinOuter' but works only on sorted streams.
--
-- Space: O(1)
--
@ -607,7 +632,7 @@ mergeDifferenceBy _eq _s1 _s2 = undefined
-- unionBy eq s1 s2 = s1 \`serial` (s2 `differenceBy eq` s1)
-- @
--
-- Similar to 'outerJoin' but not the same.
-- Similar to 'joinOuter' but not the same.
--
-- Space: O(n)
--

View File

@ -102,6 +102,7 @@ extra-source-files:
test/Streamly/Test/Data/Array/Foreign.hs
test/Streamly/Test/Data/Array/Stream/Foreign.hs
test/Streamly/Test/Data/Parser/ParserD.hs
test/Streamly/Test/Data/Stream/Top.hs
test/Streamly/Test/FileSystem/Event.hs
test/Streamly/Test/FileSystem/Event/Common.hs
test/Streamly/Test/FileSystem/Event/Darwin.hs

View File

@ -1,6 +1,7 @@
module Main (main) where
import Data.List (nub, sort)
import Data.List (elem, nub, sort)
import Data.Maybe (isNothing)
import Test.QuickCheck
( Gen
, Property
@ -68,6 +69,49 @@ joinInnerMap =
]
assert (sort v1 == sort v2)
-- XXX A bug needs to be fixed in the joinOuter function
-- Property: for two random lists of Ints drawn from (min_value, max_value),
-- the list produced by 'Top.joinOuter' over the corresponding streams is
-- compared against a list comprehension over the same inputs.
--
-- NOTE(review): the expected list contains only the matched pairs
-- @(Just i, Just j)@; rows with a 'Nothing' side are not part of the
-- expectation. Confirm whether this is intended for an outer join.
joinOuter :: Property
joinOuter =
    forAll genList $ \ls0 ->
        forAll genList $ \ls1 ->
            monadicIO $ check ls0 ls1

    where

    genList = listOf (chooseInt (min_value, max_value))

    check ls0 ls1 = do
        actual <-
            run
                $ S.toList
                $ Top.joinOuter eq (S.fromList ls0) (S.fromList ls1)
        let expected = [(Just i, Just j) | i <- ls0, j <- ls1, i == j]
        assert (actual == expected)
-- Property: 'Top.joinOuterMap' over two key-value streams agrees with a
-- list-based model of the same join.
--
-- Every generated Int @a@ is turned into the pair @(a, a)@, so key and value
-- coincide. The second list is de-duplicated with 'nub' before pairing; the
-- first list is used as generated.
joinOuterMap :: Property
joinOuterMap =
    forAll genList $ \ls0 ->
        forAll genList $ \ls1 ->
            monadicIO $ check (map dup ls0) (map dup (nub ls1))

    where

    genList = listOf (chooseInt (min_value, max_value))

    dup a = (a, a)

    check ls0 ls1 = do
        actual <-
            run
                $ S.toList
                $ Top.joinOuterMap (S.fromList ls0) (S.fromList ls1)
        let -- One row per element of the first list, with the right side
            -- filled in only when the same pair occurs in the second list.
            leftRows =
                [ (k, Just k, if kv `elem` ls1 then Just k else Nothing)
                | kv@(k, _) <- ls0
                ]
            -- Rows seen from the second list; only those whose left side is
            -- missing are kept, since the others are already in leftRows.
            rightRows =
                [ (k, if kv `elem` ls0 then Just k else Nothing, Just k)
                | kv@(k, _) <- ls1
                ]
            rightOnly = filter (\(_, l, _) -> isNothing l) rightRows
        assert (actual == leftRows ++ rightOnly)
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
@ -82,3 +126,5 @@ main = hspec $ do
prop "joinInner" Main.joinInner
prop "joinInnerMap" Main.joinInnerMap
prop "joinOuter" Main.joinOuter
prop "joinOuterMap" Main.joinOuterMap