mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-08-15 19:30:26 +03:00
Implement joinLeftHash API
This commit is contained in:
parent
870958d766
commit
582410afda
@ -451,6 +451,10 @@ o_n_heap_buffering value =
|
||||
$ joinWith Internal.joinInner sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinInnerMap"
|
||||
$ joinMapWith Internal.joinInnerMap sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinLeft"
|
||||
$ joinWith Internal.joinLeft sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinLeftMap "
|
||||
$ joinMapWith Internal.joinLeftMap sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinOuter"
|
||||
$ joinWith Internal.joinOuter sqrtVal sqrtVal
|
||||
, benchIOSrc1 "joinOuterMap"
|
||||
|
@ -39,9 +39,9 @@ module Streamly.Internal.Data.Stream.IsStream.Top
|
||||
, joinInner
|
||||
, joinInnerMap
|
||||
, joinInnerMerge
|
||||
, leftJoin
|
||||
, joinLeft
|
||||
, mergeLeftJoin
|
||||
, hashLeftJoin
|
||||
, joinLeftMap
|
||||
, joinOuter
|
||||
, mergeOuterJoin
|
||||
, joinOuterMap
|
||||
@ -291,6 +291,10 @@ joinInner eq s1 s2 = do
|
||||
) s2
|
||||
) s1
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
|
||||
toMap = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
|
||||
-- If the second stream is too big it can be partitioned based on hashes and
|
||||
-- then we can process one partition at a time.
|
||||
--
|
||||
@ -313,14 +317,11 @@ joinInnerMap :: (IsStream t, Monad m, Ord k) =>
|
||||
t m (k, a) -> t m (k, b) -> t m (k, a, b)
|
||||
joinInnerMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- kvFold $ IsStream.adapt s2
|
||||
km <- toMap $ IsStream.adapt s2
|
||||
pure $ Stream.mapMaybe (joinAB km) s1
|
||||
|
||||
where
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
kvFold = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
|
||||
joinAB kvm (k, a) =
|
||||
case k `Map.lookup` kvm of
|
||||
Just b -> Just (k, a, b)
|
||||
@ -353,7 +354,7 @@ joinInnerMerge = undefined
|
||||
-- stream is expensive to evaluate.
|
||||
--
|
||||
-- @
|
||||
-- rightJoin = flip leftJoin
|
||||
-- rightJoin = flip joinLeft
|
||||
-- @
|
||||
--
|
||||
-- Space: O(n) assuming the second stream is cached in memory.
|
||||
@ -361,10 +362,10 @@ joinInnerMerge = undefined
|
||||
-- Time: O(m x n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE leftJoin #-}
|
||||
leftJoin :: Monad m =>
|
||||
{-# INLINE joinLeft #-}
|
||||
joinLeft :: Monad m =>
|
||||
(a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
|
||||
leftJoin eq s1 s2 = Stream.evalStateT (return False) $ do
|
||||
joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
|
||||
a <- Stream.liftInner s1
|
||||
-- XXX should we use StreamD monad here?
|
||||
-- XXX Is there a better way to perform some action at the end of a loop
|
||||
@ -391,13 +392,23 @@ leftJoin eq s1 s2 = Stream.evalStateT (return False) $ do
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE hashLeftJoin #-}
|
||||
hashLeftJoin :: -- Hashable b =>
|
||||
(a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b)
|
||||
hashLeftJoin = undefined
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinLeftMap #-}
|
||||
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
|
||||
t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
|
||||
joinLeftMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- toMap $ IsStream.adapt s2
|
||||
return $ Stream.map (joinAB km) s1
|
||||
|
||||
-- | Like 'leftJoin' but works only on sorted streams.
|
||||
where
|
||||
|
||||
joinAB km (k, a) =
|
||||
case k `Map.lookup` km of
|
||||
Just b -> (k, a, Just b)
|
||||
Nothing -> (k, a, Nothing)
|
||||
|
||||
-- | Like 'joinLeft' but works only on sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
@ -575,7 +586,7 @@ mergeIntersectBy :: -- (IsStream t, Monad m) =>
|
||||
(a -> a -> Ordering) -> t m a -> t m a -> t m a
|
||||
mergeIntersectBy _eq _s1 _s2 = undefined
|
||||
|
||||
-- Roughly leftJoin s1 s2 = s1 `difference` s2 + s1 `intersection` s2
|
||||
-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2
|
||||
|
||||
-- | Delete first occurrences of those elements from the first stream that are
|
||||
-- present in the second stream. If an element occurs multiple times in the
|
||||
|
@ -123,6 +123,51 @@ joinOuterMap =
|
||||
let v2 = joinOuterList ls0 ls1
|
||||
assert (sort v1 == sort v2)
|
||||
|
||||
joinLeftList :: [(Int, Int)] -> [(Int, Int)] -> [(Int, Int, Maybe Int)]
|
||||
joinLeftList ls0 ls1 =
|
||||
let v = do
|
||||
i <- ls0
|
||||
if i `elem` ls1
|
||||
then return (fst i, fst i, Just (fst i))
|
||||
else return (fst i, fst i, Nothing)
|
||||
in v
|
||||
|
||||
joinLeft :: Property
|
||||
joinLeft =
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
|
||||
-- nub the second list, as there is no way to validate using list functions
|
||||
monadicIO $ action ls0 (nub ls1)
|
||||
|
||||
where
|
||||
|
||||
action ls0 ls1 = do
|
||||
v1 <-
|
||||
run
|
||||
$ S.toList
|
||||
$ Top.joinLeft eq (S.fromList ls0) (S.fromList ls1)
|
||||
let v2 = joinLeftList (map (\a -> (a,a)) ls0) (map (\a -> (a,a)) ls1)
|
||||
v3 = map (\ (_, x1, x2) -> (x1, x2)) v2
|
||||
assert (v1 == v3)
|
||||
|
||||
joinLeftMap :: Property
|
||||
joinLeftMap =
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
|
||||
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
|
||||
-- nub the second list, as there is no way to validate using list functions
|
||||
monadicIO $
|
||||
action (map (\a -> (a,a)) ls0) (map (\a -> (a,a)) (nub ls1))
|
||||
|
||||
where
|
||||
|
||||
action ls0 ls1 = do
|
||||
v1 <-
|
||||
run
|
||||
$ S.toList
|
||||
$ Top.joinLeftMap (S.fromList ls0) (S.fromList ls1)
|
||||
let v2 = joinLeftList ls0 ls1
|
||||
assert (v1 == v2)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Main
|
||||
-------------------------------------------------------------------------------
|
||||
@ -140,3 +185,5 @@ main = hspec $ do
|
||||
-- XXX currently API is broken https://github.com/composewell/streamly/issues/1032
|
||||
--prop "joinOuter" Main.joinOuter
|
||||
prop "joinOuterMap" Main.joinOuterMap
|
||||
prop "joinLeft" Main.joinLeft
|
||||
prop "joinLeftMap" Main.joinLeftMap
|
||||
|
Loading…
Reference in New Issue
Block a user