mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-03 19:57:08 +03:00
Rename, document, update joins/set operations
This commit is contained in:
parent
f9a81d441e
commit
a6a15e431d
@ -11,16 +11,17 @@
|
||||
|
||||
module Streamly.Internal.Data.Stream.Container
|
||||
(
|
||||
nub
|
||||
-- * Deduplication
|
||||
ordNub
|
||||
|
||||
-- * Joins for unconstrained types
|
||||
, joinLeftGeneric
|
||||
, joinOuterGeneric
|
||||
-- * Joins
|
||||
, leftJoin
|
||||
, outerJoin
|
||||
|
||||
-- * Joins with Ord constraint
|
||||
, joinInner
|
||||
, joinLeft
|
||||
, joinOuter
|
||||
-- * Ord Joins
|
||||
, innerOrdJoin
|
||||
, leftOrdJoin
|
||||
, outerOrdJoin
|
||||
)
|
||||
where
|
||||
|
||||
@ -47,12 +48,15 @@ import qualified Streamly.Internal.Data.Stream.Transformer as Stream
|
||||
|
||||
#include "DocTestDataStream.hs"
|
||||
|
||||
-- | The memory used is proportional to the number of unique elements in the
|
||||
-- stream. If we want to limit the memory we can just use "take" to limit the
|
||||
-- uniq elements in the stream.
|
||||
{-# INLINE_NORMAL nub #-}
|
||||
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
|
||||
nub (Stream step1 state1) = Stream step (Set.empty, state1)
|
||||
-- | @nub@ specialized to 'Ord' types for better performance. Returns a
|
||||
-- subsequence of the stream removing any duplicate elements.
|
||||
--
|
||||
-- The memory used is proportional to the number of unique elements in the
|
||||
-- stream. One way to limit the memory is to use @take@ on the resulting
|
||||
-- stream to limit the unique elements in the stream.
|
||||
{-# INLINE_NORMAL ordNub #-}
|
||||
ordNub :: (Monad m, Ord a) => Stream m a -> Stream m a
|
||||
ordNub (Stream step1 state1) = Stream step (Set.empty, state1)
|
||||
|
||||
where
|
||||
|
||||
@ -78,8 +82,8 @@ toMap =
|
||||
--
|
||||
-- XXX An IntMap may be faster when the keys are Int.
|
||||
-- XXX Use hashmap instead of map?
|
||||
--
|
||||
-- | Like 'joinInner' but uses a 'Map' for efficiency.
|
||||
|
||||
-- | 'innerJoin' specialized to 'Ord' types for better performance.
|
||||
--
|
||||
-- If the input streams have duplicate keys, the behavior is undefined.
|
||||
--
|
||||
@ -90,10 +94,10 @@ toMap =
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinInner #-}
|
||||
joinInner :: (Monad m, Ord k) =>
|
||||
{-# INLINE innerOrdJoin #-}
|
||||
innerOrdJoin :: (Monad m, Ord k) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
|
||||
joinInner s1 s2 =
|
||||
innerOrdJoin s1 s2 =
|
||||
Stream.concatEffect $ do
|
||||
km <- toMap s2
|
||||
pure $ Stream.mapMaybe (joinAB km) s1
|
||||
@ -106,32 +110,33 @@ joinInner s1 s2 =
|
||||
Nothing -> Nothing
|
||||
|
||||
-- XXX We can do this concurrently.
|
||||
-- XXX Check performance of StreamD vs StreamK
|
||||
-- XXX If the second stream is sorted and passed as an Array or a seek capable
|
||||
-- stream then we could use binary search if we have an Ord instance or
|
||||
-- Ordering returning function. The time complexity would then become (m x log
|
||||
-- n).
|
||||
|
||||
-- XXX Check performance of StreamD vs StreamK
|
||||
|
||||
-- | Like 'joinInner' but emit @(a, Just b)@, and additionally, for those @a@'s
|
||||
-- that are not equal to any @b@ emit @(a, Nothing)@.
|
||||
-- | Like 'innerJoin' but emits @(a, Just b)@ whenever a and b are equal, for
|
||||
-- those @a@'s that are not equal to any @b@ emits @(a, Nothing)@.
|
||||
--
|
||||
-- The second stream is evaluated multiple times. If the stream is a
|
||||
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
|
||||
-- before calling this function. Caching may also improve performance if the
|
||||
-- stream is expensive to evaluate.
|
||||
-- This is a generalization of 'innerJoin' to include all elements from the
|
||||
-- left stream and not just those which have an equal in the right stream. This
|
||||
-- is not a commutative operation, the order of the stream arguments matters.
|
||||
--
|
||||
-- >>> joinRightGeneric eq = flip (Stream.joinLeftGeneric eq)
|
||||
-- All the caveats mentioned in 'innerJoin' apply here as well. Right join is
|
||||
-- not provided because it is just a flipped left join:
|
||||
--
|
||||
-- >>> rightJoin eq = flip (Stream.leftJoin eq)
|
||||
--
|
||||
-- Space: O(n) assuming the second stream is cached in memory.
|
||||
--
|
||||
-- Time: O(m x n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE joinLeftGeneric #-}
|
||||
joinLeftGeneric :: Monad m =>
|
||||
{-# INLINE leftJoin #-}
|
||||
leftJoin :: Monad m =>
|
||||
(a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
|
||||
joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
|
||||
leftJoin eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
|
||||
a <- mkCross (Stream.liftInner s1)
|
||||
-- XXX should we use StreamD monad here?
|
||||
-- XXX Is there a better way to perform some action at the end of a loop
|
||||
@ -152,19 +157,17 @@ joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
|
||||
else mkCross Stream.nil
|
||||
Nothing -> return (a, Nothing)
|
||||
|
||||
-- XXX rename to joinLeftOrd?
|
||||
|
||||
-- | A more efficient 'joinLeft' using a hashmap for efficiency.
|
||||
-- | 'leftJoin' specialized to 'Ord' types for better performance.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinLeft #-}
|
||||
joinLeft :: (Ord k, Monad m) =>
|
||||
{-# INLINE leftOrdJoin #-}
|
||||
leftOrdJoin :: (Ord k, Monad m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
|
||||
joinLeft s1 s2 =
|
||||
leftOrdJoin s1 s2 =
|
||||
Stream.concatEffect $ do
|
||||
km <- toMap s2
|
||||
return $ fmap (joinAB km) s1
|
||||
@ -177,14 +180,19 @@ joinLeft s1 s2 =
|
||||
Nothing -> (k, a, Nothing)
|
||||
|
||||
-- XXX We can do this concurrently.
|
||||
-- XXX Check performance of StreamD vs StreamK cross operation.
|
||||
|
||||
-- XXX Check performance of StreamD vs StreamK
|
||||
|
||||
-- | Like 'joinLeft' but emits a @(Just a, Just b)@. Like 'joinLeft', for those
|
||||
-- | Like 'leftJoin' but emits a @(Just a, Just b)@. Like 'leftJoin', for those
|
||||
-- @a@'s that are not equal to any @b@ emit @(Just a, Nothing)@, but
|
||||
-- additionally, for those @b@'s that are not equal to any @a@ emit @(Nothing,
|
||||
-- Just b)@.
|
||||
--
|
||||
-- This is a generalization of left join to include all the elements from the
|
||||
-- right stream as well, in other words it is a combination of left and right
|
||||
-- joins. This is a commutative operation. The order of stream arguments can be
|
||||
-- changed without affecting results, except for the ordering of elements in
|
||||
-- the resulting tuple.
|
||||
--
|
||||
-- For space efficiency use the smaller stream as the second stream.
|
||||
--
|
||||
-- Space: O(n)
|
||||
@ -192,15 +200,15 @@ joinLeft s1 s2 =
|
||||
-- Time: O(m x n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinOuterGeneric #-}
|
||||
joinOuterGeneric :: MonadIO m =>
|
||||
{-# INLINE outerJoin #-}
|
||||
outerJoin :: MonadIO m =>
|
||||
(a -> b -> Bool)
|
||||
-> Stream m a
|
||||
-> Stream m b
|
||||
-> Stream m (Maybe a, Maybe b)
|
||||
joinOuterGeneric eq s1 s =
|
||||
outerJoin eq s1 s2 =
|
||||
Stream.concatEffect $ do
|
||||
inputArr <- Array.fromStream s
|
||||
inputArr <- Array.fromStream s2
|
||||
let len = Array.length inputArr
|
||||
foundArr <-
|
||||
Stream.fold
|
||||
@ -254,18 +262,18 @@ joinOuterGeneric eq s1 s =
|
||||
-- a flag. At the end go through @Stream m b@ and find those that are not in that
|
||||
-- hash to return (Nothing, b).
|
||||
|
||||
-- | Like 'joinOuter' but uses a 'Map' for efficiency.
|
||||
-- | 'outerJoin' specialized to 'Ord' types for better performance.
|
||||
--
|
||||
-- Space: O(m + n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinOuter #-}
|
||||
joinOuter ::
|
||||
{-# INLINE outerOrdJoin #-}
|
||||
outerOrdJoin ::
|
||||
(Ord k, MonadIO m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
|
||||
joinOuter s1 s2 =
|
||||
outerOrdJoin s1 s2 =
|
||||
Stream.concatEffect $ do
|
||||
km1 <- kvFold s1
|
||||
km2 <- kvFold s2
|
||||
|
@ -141,9 +141,8 @@ module Streamly.Internal.Data.Stream.Nesting
|
||||
|
||||
-- * Transform (Nested Containers)
|
||||
-- | Opposite to compact in ArrayStream
|
||||
, splitInnerBy
|
||||
, splitInnerBySuffix
|
||||
, intersectBySorted
|
||||
, splitInnerBy -- XXX innerSplitOn
|
||||
, splitInnerBySuffix -- XXX innerSplitOnSuffix
|
||||
|
||||
-- * Reduce By Streams
|
||||
, dropPrefix
|
||||
@ -593,49 +592,6 @@ mergeFstBy :: -- Monad m =>
|
||||
mergeFstBy _f _m1 _m2 = undefined
|
||||
-- fromStreamK $ D.mergeFstBy f (toStreamD m1) (toStreamD m2)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Intersection of sorted streams
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-- Assuming the streams are sorted in ascending order
|
||||
{-# INLINE_NORMAL intersectBySorted #-}
|
||||
intersectBySorted :: Monad m
|
||||
=> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
intersectBySorted cmp (Stream stepa ta) (Stream stepb tb) =
|
||||
Stream step
|
||||
( ta -- left stream state
|
||||
, tb -- right stream state
|
||||
, Nothing -- left value
|
||||
, Nothing -- right value
|
||||
)
|
||||
|
||||
where
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
-- step 1, fetch the first value
|
||||
step gst (sa, sb, Nothing, b) = do
|
||||
r <- stepa gst sa
|
||||
return $ case r of
|
||||
Yield a sa' -> Skip (sa', sb, Just a, b) -- step 2/3
|
||||
Skip sa' -> Skip (sa', sb, Nothing, b)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 2, fetch the second value
|
||||
step gst (sa, sb, a@(Just _), Nothing) = do
|
||||
r <- stepb gst sb
|
||||
return $ case r of
|
||||
Yield b sb' -> Skip (sa, sb', a, Just b) -- step 3
|
||||
Skip sb' -> Skip (sa, sb', a, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 3, compare the two values
|
||||
step _ (sa, sb, Just a, Just b) = do
|
||||
let res = cmp a b
|
||||
return $ case res of
|
||||
GT -> Skip (sa, sb, Just a, Nothing) -- step 2
|
||||
LT -> Skip (sa, sb, Nothing, Just b) -- step 1
|
||||
EQ -> Yield a (sa, sb, Nothing, Just b) -- step 1
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Combine N Streams - unfoldMany
|
||||
------------------------------------------------------------------------------
|
||||
|
@ -8,38 +8,73 @@
|
||||
-- Portability : GHC
|
||||
--
|
||||
-- Top level module that can depend on all other lower level Stream modules.
|
||||
--
|
||||
-- Design notes:
|
||||
--
|
||||
-- The order of arguments in the join operations should ideally be opposite. It
|
||||
-- should be such that the infinite stream is the last one. The transformation
|
||||
-- should be on the last argument, so if you curry the functions with all other
|
||||
-- arguments we get a @Stream -> Stream@ function. The first stream argument
|
||||
-- may be considered as a config or modifier for the operation.
|
||||
--
|
||||
-- Benefit of changing the order is that we get a more intuitive Stream ->
|
||||
-- Stream transformation after currying all other arguments. The inner loop
|
||||
-- streams become arguments for the transformation, more like local modifiers
|
||||
-- for the global outer stream as the last argument. Thus we can continue using
|
||||
-- transformations on the outer stream in a composed pipeline. Otherwise we can
|
||||
-- use flip to flip the order.
|
||||
--
|
||||
-- The fact that the inner stream can be used in the loop multiple times also
|
||||
-- tells that this is not the real effectful stream, it is more like a pure
|
||||
-- stream or an array. In fact we may consider using Identity streams as
|
||||
-- inner streams in which case these functions will not look nice.
|
||||
--
|
||||
-- Downsides:
|
||||
--
|
||||
-- * Maybe less intuitive to think about, because we usually think the first
|
||||
-- stream as the outer loop and second as the inner.
|
||||
-- * Zip and merge operations will continue using the opposite order.
|
||||
-- * Need to change the order of cross, crossWith operations as well
|
||||
-- * It will be inconsistent with Data.List. The functions cannot be used as
|
||||
-- intuitive operators.
|
||||
--
|
||||
-- The choice is similar to concatMap vs bind. concatMap is pipeline
|
||||
-- composition friendly but bind is user intuition friendly. Another option is
|
||||
-- to have other functions with a different argument order e.g. flippedCross
|
||||
-- instead of cross.
|
||||
--
|
||||
-- If we change the order we have to make sure that we have a consistent
|
||||
-- convention for set-like and the cross join operations.
|
||||
|
||||
module Streamly.Internal.Data.Stream.Top
|
||||
(
|
||||
-- * Transformation
|
||||
-- ** Sampling
|
||||
-- * Sampling
|
||||
-- | Value agnostic filtering.
|
||||
strideFromThen
|
||||
|
||||
-- * Nesting
|
||||
-- ** Set like operations
|
||||
-- | These are not exactly set operations because streams are not
|
||||
-- necessarily sets, they may have duplicated elements. These operations
|
||||
-- are generic i.e. they work on streams of unconstrained types, therefore,
|
||||
-- they have quadratic performance characteristics. For better performance
|
||||
-- using Set structures see the Streamly.Internal.Data.Stream.Container
|
||||
-- module.
|
||||
, filterInStreamGenericBy
|
||||
, deleteInStreamGenericBy
|
||||
, unionWithStreamGenericBy
|
||||
-- * Straight Joins
|
||||
-- | These are set-like operations but not exactly set operations because
|
||||
-- streams are not necessarily sets, they may have duplicated elements.
|
||||
-- These operations are generic i.e. they work on streams of unconstrained
|
||||
-- types, therefore, they have quadratic performance characteristics. For
|
||||
-- better performance using Set or Map structures see the
|
||||
-- Streamly.Internal.Data.Stream.Container module.
|
||||
, intersectBy
|
||||
, deleteFirstsBy
|
||||
, unionBy
|
||||
|
||||
-- ** Set like operations on sorted streams
|
||||
, filterInStreamAscBy
|
||||
, deleteInStreamAscBy
|
||||
, unionWithStreamAscBy
|
||||
-- Set like operations on sorted streams
|
||||
, sortedIntersectBy
|
||||
, sortedDeleteFirstsBy
|
||||
, sortedUnionBy
|
||||
|
||||
-- ** Join operations
|
||||
, joinInnerGeneric
|
||||
-- * Cross Joins
|
||||
, innerJoin
|
||||
|
||||
-- * Joins on sorted stream
|
||||
, joinInnerAscBy
|
||||
, joinLeftAscBy
|
||||
, joinOuterAscBy
|
||||
-- Joins on sorted stream
|
||||
, innerSortedJoin
|
||||
, leftSortedJoin
|
||||
, outerSortedJoin
|
||||
)
|
||||
where
|
||||
|
||||
@ -48,7 +83,7 @@ where
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Data.IORef (newIORef, readIORef, modifyIORef')
|
||||
import Streamly.Internal.Data.Fold.Type (Fold)
|
||||
import Streamly.Internal.Data.Stream.Type (Stream, cross)
|
||||
import Streamly.Internal.Data.Stream.Type (Stream(..), Step(..), cross)
|
||||
|
||||
import qualified Data.List as List
|
||||
import qualified Streamly.Internal.Data.Fold as Fold
|
||||
@ -110,29 +145,40 @@ strideFromThen offset stride =
|
||||
-- binary search if we have an Ord instance or Ordering returning function. The
|
||||
-- time complexity would then become (m x log n).
|
||||
|
||||
-- | Like 'cross' but emits only those tuples where @a == b@ using the
|
||||
-- supplied equality predicate.
|
||||
-- | Like 'cross' but emits only those tuples where @a == b@ using the supplied
|
||||
-- equality predicate. This is essentially a @cross intersection@ of two
|
||||
-- streams.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2
|
||||
-- >>> innerJoin eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2
|
||||
--
|
||||
-- You should almost always prefer @joinInnerOrd@ over 'joinInnerGeneric' if
|
||||
-- possible. @joinInnerOrd@ is an order of magnitude faster but may take more
|
||||
-- space for caching the second stream.
|
||||
-- The second (inner) stream must be finite. Moreover, it must be either pure
|
||||
-- or capable of multiple evaluations. If not then the caller should cache it
|
||||
-- in an 'Data.Array.Array', if the type does not have an 'Unbox' instance then
|
||||
-- use the Generic 'Data.Array.Generic.Array'. Convert the array to stream
|
||||
-- before calling this function. Caching may also improve performance if the
|
||||
-- stream is expensive to evaluate.
|
||||
--
|
||||
-- See 'Streamly.Internal.Data.Unfold.joinInnerGeneric' for a much faster fused
|
||||
-- alternative.
|
||||
-- If you care about performance this function should be your last choice among
|
||||
-- all inner joins. 'Streamly.Internal.Data.Unfold.innerJoin' is a much faster
|
||||
-- fused alternative. 'innerSortedJoin' is a faster alternative when streams
|
||||
-- are sorted. 'innerOrdJoin' is an order of magnitude faster alternative when
|
||||
-- the type has an 'Ord' instance.
|
||||
--
|
||||
-- Note: Conceptually, this is a commutative operation. Result includes all the
|
||||
-- elements from the left and the right stream. The order of streams can be
|
||||
-- changed without affecting results, except for the ordering within the tuple.
|
||||
--
|
||||
-- Time: O(m x n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinInnerGeneric #-}
|
||||
joinInnerGeneric :: Monad m =>
|
||||
{-# INLINE innerJoin #-}
|
||||
innerJoin :: Monad m =>
|
||||
(a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
|
||||
joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ cross s1 s2
|
||||
innerJoin eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ cross s1 s2
|
||||
{-
|
||||
joinInnerGeneric eq s1 s2 = do
|
||||
innerJoin eq s1 s2 = do
|
||||
-- ConcatMap works faster than bind
|
||||
Stream.concatMap (\a ->
|
||||
Stream.concatMap (\b ->
|
||||
@ -143,44 +189,44 @@ joinInnerGeneric eq s1 s2 = do
|
||||
) s1
|
||||
-}
|
||||
|
||||
-- | A more efficient 'joinInner' for sorted streams.
|
||||
-- | A more efficient 'innerJoin' for sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE joinInnerAscBy #-}
|
||||
joinInnerAscBy ::
|
||||
{-# INLINE innerSortedJoin #-}
|
||||
innerSortedJoin ::
|
||||
(a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
|
||||
joinInnerAscBy = undefined
|
||||
innerSortedJoin = undefined
|
||||
|
||||
-- | A more efficient 'joinLeft' for sorted streams.
|
||||
-- | A more efficient 'leftJoin' for sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE joinLeftAscBy #-}
|
||||
joinLeftAscBy :: -- Monad m =>
|
||||
{-# INLINE leftSortedJoin #-}
|
||||
leftSortedJoin :: -- Monad m =>
|
||||
(a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
|
||||
joinLeftAscBy _eq _s1 _s2 = undefined
|
||||
leftSortedJoin _eq _s1 _s2 = undefined
|
||||
|
||||
-- | A more efficient 'joinOuter' for sorted streams.
|
||||
-- | A more efficient 'outerJoin' for sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE joinOuterAscBy #-}
|
||||
joinOuterAscBy :: -- Monad m =>
|
||||
{-# INLINE outerSortedJoin #-}
|
||||
outerSortedJoin :: -- Monad m =>
|
||||
(a -> b -> Ordering)
|
||||
-> Stream m a
|
||||
-> Stream m b
|
||||
-> Stream m (Maybe a, Maybe b)
|
||||
joinOuterAscBy _eq _s1 _s2 = undefined
|
||||
outerSortedJoin _eq _s1 _s2 = undefined
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Set operations (special joins)
|
||||
@ -190,8 +236,8 @@ joinOuterAscBy _eq _s1 _s2 = undefined
|
||||
-- the best would be to use an Array with linear search. If the second stream
|
||||
-- is sorted we can also use a binary search, using Ord constraint.
|
||||
|
||||
-- | Keep only those elements in the second stream that are present in the
|
||||
-- first stream too. The first stream is folded to a container using the
|
||||
-- | Keep only those elements in the first stream that are present in the
|
||||
-- second stream too. The second stream is folded to a container using the
|
||||
-- supplied fold and then the elements in the container are looked up using the
|
||||
-- supplied lookup function.
|
||||
--
|
||||
@ -206,23 +252,30 @@ filterStreamWith :: Monad m =>
|
||||
filterStreamWith fld member s1 s2 =
|
||||
Stream.concatEffect
|
||||
$ do
|
||||
xs <- Stream.fold fld s1
|
||||
return $ Stream.filter (`member` xs) s2
|
||||
xs <- Stream.fold fld s2
|
||||
return $ Stream.filter (`member` xs) s1
|
||||
|
||||
-- | 'filterInStreamGenericBy' retains only those elements in the second stream that
|
||||
-- are present in the first stream.
|
||||
-- XXX instead of folding the second stream to a list we could use it directly.
|
||||
-- If the user wants they can generate the stream from an array and also call
|
||||
-- uniq or nub on it. We can provide a convenience Stream -> Stream to cache
|
||||
-- a finite stream in an array and serve it from the cache. The user can decide
|
||||
-- what is best based on the context. They can also choose to use a boxed or
|
||||
-- unboxed array for caching. To force caching we can make the second stream
|
||||
-- monad type Identity. But that may be less flexible. One option is to use
|
||||
-- cachedIntersectBy etc for automatic caching.
|
||||
|
||||
-- | 'intersectBy' returns a subsequence of the first stream which intersects
|
||||
-- with the second stream. Note that this is not a commutative operation unlike
|
||||
-- a set intersection, because of duplicate elements in the stream the order of
|
||||
-- the streams matters. This is similar to 'Data.List.intersectBy'. Note that
|
||||
-- intersectBy is a special case of 'innerJoin'.
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
|
||||
-- [2,1,1]
|
||||
-- >>> f s1 s2 = Stream.fold Fold.toList $ Stream.intersectBy (==) (Stream.fromList s1) (Stream.fromList s2)
|
||||
-- >>> f [1,3,4,4,5] [2,3,4,5,5]
|
||||
-- [3,4,4,5]
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
|
||||
-- [1,2,2]
|
||||
--
|
||||
-- Similar to the list intersectBy operation but with the stream argument order
|
||||
-- flipped.
|
||||
--
|
||||
-- The first stream must be finite and must not block. Second stream is
|
||||
-- processed only after the first stream is fully realized.
|
||||
-- First stream can be infinite, the second stream must be finite and must be
|
||||
-- capable of multiple evaluations.
|
||||
--
|
||||
-- Space: O(n) where @n@ is the number of elements in the second stream.
|
||||
--
|
||||
@ -230,46 +283,87 @@ filterStreamWith fld member s1 s2 =
|
||||
-- @n@ is the number of elements in the second stream.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE filterInStreamGenericBy #-}
|
||||
filterInStreamGenericBy :: Monad m =>
|
||||
{-# INLINE intersectBy #-}
|
||||
intersectBy :: Monad m =>
|
||||
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
|
||||
filterInStreamGenericBy eq =
|
||||
intersectBy eq =
|
||||
-- XXX Use an (unboxed) array instead.
|
||||
filterStreamWith
|
||||
(Fold.scanMaybe (Fold.uniqBy eq) Fold.toListRev)
|
||||
(List.any . eq)
|
||||
|
||||
-- | Like 'filterInStreamGenericBy' but assumes that the input streams are sorted in
|
||||
-------------------------------------------------------------------------------
|
||||
-- Intersection of sorted streams
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-- XXX The sort order is not important as long as both the streams have the same
|
||||
-- sort order. We need to move only in one direction in each stream.
|
||||
-- XXX Fix the argument order to use the same behavior as intersectBy.
|
||||
|
||||
-- | Like 'intersectBy' but assumes that the input streams are sorted in
|
||||
-- ascending order. To use it on streams sorted in descending order pass an
|
||||
-- inverted comparison function returning GT for less than and LT for greater
|
||||
-- than.
|
||||
--
|
||||
-- Both streams can be infinite.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- Time: O(m+n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE filterInStreamAscBy #-}
|
||||
filterInStreamAscBy :: Monad m =>
|
||||
{-# INLINE_NORMAL sortedIntersectBy #-}
|
||||
sortedIntersectBy :: Monad m =>
|
||||
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
filterInStreamAscBy eq s1 s2 = Stream.intersectBySorted eq s2 s1
|
||||
sortedIntersectBy cmp (Stream stepa ta) (Stream stepb tb) =
|
||||
Stream step
|
||||
( ta -- left stream state
|
||||
, tb -- right stream state
|
||||
, Nothing -- left value
|
||||
, Nothing -- right value
|
||||
)
|
||||
|
||||
-- | Delete all elements of the first stream from the second stream. If an
|
||||
-- element occurs multiple times in the first stream as many occurrences of it
|
||||
-- are deleted from the second stream.
|
||||
where
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
-- step 1, fetch the first value
|
||||
step gst (sa, sb, Nothing, b) = do
|
||||
r <- stepa gst sa
|
||||
return $ case r of
|
||||
Yield a sa' -> Skip (sa', sb, Just a, b) -- step 2/3
|
||||
Skip sa' -> Skip (sa', sb, Nothing, b)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 2, fetch the second value
|
||||
step gst (sa, sb, a@(Just _), Nothing) = do
|
||||
r <- stepb gst sb
|
||||
return $ case r of
|
||||
Yield b sb' -> Skip (sa, sb', a, Just b) -- step 3
|
||||
Skip sb' -> Skip (sa, sb', a, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
-- step 3, compare the two values
|
||||
step _ (sa, sb, Just a, Just b) = do
|
||||
let res = cmp a b
|
||||
return $ case res of
|
||||
GT -> Skip (sa, sb, Just a, Nothing) -- step 2
|
||||
LT -> Skip (sa, sb, Nothing, Just b) -- step 1
|
||||
EQ -> Yield a (sa, sb, Nothing, Just b) -- step 1
|
||||
|
||||
-- | Returns a subsequence of the first stream, deleting first occurrences of
|
||||
-- those elements that are present in the second stream. Note that this is not
|
||||
-- a commutative operation. This is similar to the 'Data.List.deleteFirstsBy'.
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList $ Stream.deleteInStreamGenericBy (==) (Stream.fromList [1,2,3]) (Stream.fromList [1,2,2])
|
||||
-- [2]
|
||||
-- >>> f xs ys = Stream.fold Fold.toList $ Stream.deleteFirstsBy (==) (Stream.fromList xs) (Stream.fromList ys)
|
||||
-- >>> f [1,2,2,3,3,5] [1,2,2,3,4]
|
||||
-- [2,3,5]
|
||||
--
|
||||
-- The following laws hold:
|
||||
-- The following holds:
|
||||
--
|
||||
-- > deleteInStreamGenericBy (==) s1 (s1 `append` s2) === s2
|
||||
-- > deleteInStreamGenericBy (==) s1 (s1 `interleave` s2) === s2
|
||||
-- > deleteFirstsBy (==) (Stream.nub s2 `append` s1) s2 === s1
|
||||
-- > deleteFirstsBy (==) (Stream.nub s2 `interleave` s1) s2 === s1
|
||||
--
|
||||
-- Same as the list 'Data.List.\\' operation but with argument order flipped.
|
||||
--
|
||||
-- The first stream must be finite and must not block. Second stream is
|
||||
-- processed only after the first stream is fully realized.
|
||||
-- First stream can be infinite, second stream must be finite.
|
||||
--
|
||||
-- Space: O(m) where @m@ is the number of elements in the first stream.
|
||||
--
|
||||
@ -277,58 +371,83 @@ filterInStreamAscBy eq s1 s2 = Stream.intersectBySorted eq s2 s1
|
||||
-- @n@ is the number of elements in the second stream.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE deleteInStreamGenericBy #-}
|
||||
deleteInStreamGenericBy :: Monad m =>
|
||||
{-# INLINE deleteFirstsBy #-}
|
||||
deleteFirstsBy :: Monad m =>
|
||||
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
|
||||
deleteInStreamGenericBy eq s1 s2 =
|
||||
Stream.concatEffect
|
||||
$ do
|
||||
-- This may work well if s1 is small
|
||||
-- If s1 is big we can go through s1, deleting elements from s2 and
|
||||
-- not emitting an element if it was successfully deleted from s2.
|
||||
-- we will need a deleteBy that can return whether the element was
|
||||
-- deleted or not.
|
||||
xs <- Stream.fold Fold.toList s2
|
||||
let f = Fold.foldl' (flip (List.deleteBy eq)) xs
|
||||
fmap Stream.fromList $ Stream.fold f s1
|
||||
deleteFirstsBy eq s2 s1 =
    -- NOTE: the argument names are swapped relative to their position: @s2@
    -- is the FIRST stream argument (the one being filtered and emitted) and
    -- @s1@ is the SECOND stream argument (the elements to delete).

    -- XXX s2 can be a sorted mutable array and we can use binary
    -- search to find. Mark the element deleted, count the deletions
    -- and reconsolidate the array when a min number of elements is
    -- deleted.

    -- XXX Use StreamK or list as second argument instead of Stream to avoid
    -- concatEffect?
    Stream.concatEffect $ do
        -- The stream of elements to delete is fully evaluated and cached as a
        -- list before any output is produced.
        xs <- Stream.toList s1
        let -- @del x ys@ drops every element of @ys@ that is equal to @x@ and
            -- reports whether anything was dropped. The surviving elements
            -- are accumulated in reverse order; that is fine because the list
            -- is only used for membership lookups.
            --
            -- The element kept on a mismatch must be @y@ (the list element),
            -- not @x@ (the element being looked up); consing @x@ would
            -- replace the remaining candidates with copies of @x@.
            del x =
                List.foldl'
                    (\(ys, found) y ->
                        if x `eq` y
                        then (ys, True)
                        else (y : ys, found))
                    ([], False)

            -- Scan step: thread the remaining deletion candidates through the
            -- stream. An element that matched a candidate is suppressed
            -- (Nothing), any other element is passed through (Just).
            g (ys, _) x =
                case del x ys of
                    (ys1, True) -> (ys1, Nothing)
                    (ys1, False) -> (ys1, Just x)

        return
            $ Stream.catMaybes
            $ fmap snd
            $ Stream.postscanl' g (xs, Nothing) s2
|
||||
|
||||
-- | A more efficient 'deleteFirstsBy' for streams sorted in ascending order.
|
||||
--
|
||||
-- Both streams can be infinite.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE deleteInStreamAscBy #-}
|
||||
deleteInStreamAscBy :: -- (Monad m) =>
|
||||
{-# INLINE sortedDeleteFirstsBy #-}
|
||||
sortedDeleteFirstsBy :: -- (Monad m) =>
|
||||
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
deleteInStreamAscBy _eq _s1 _s2 = undefined
|
||||
sortedDeleteFirstsBy _eq _s1 _s2 = undefined
|
||||
|
||||
-- XXX Remove the MonadIO constraint. We can just cache one stream and then
|
||||
-- implement using differenceEqBy.
|
||||
|
||||
-- | This essentially appends to the second stream all the occurrences of
|
||||
-- elements in the first stream that are not already present in the second
|
||||
-- stream.
|
||||
-- | Returns the first stream appended with those unique elements from the
|
||||
-- second stream that are not already present in the first stream. Note that
|
||||
-- this is not a commutative operation unlike a set union, argument order
|
||||
-- matters. The behavior is similar to the 'Data.List.unionBy'.
|
||||
--
|
||||
-- Equivalent to the following except that @s2@ is evaluated only once:
|
||||
--
|
||||
-- >>> unionWithStreamGenericBy eq s1 s2 = s2 `Stream.append` (Stream.deleteInStreamGenericBy eq s2 s1)
|
||||
-- >>> unionBy eq s1 s2 = s1 `Stream.append` Stream.deleteFirstsBy eq s1 (Stream.nub s2)
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList $ Stream.unionWithStreamGenericBy (==) (Stream.fromList [1,1,2,3]) (Stream.fromList [1,2,2,4])
|
||||
-- >>> f s1 s2 = Stream.fold Fold.toList $ Stream.unionBy (==) (Stream.fromList s1) (Stream.fromList s2)
|
||||
-- >>> f [1,2,2,4] [1,1,2,3,3]
|
||||
-- [1,2,2,4,3]
|
||||
--
|
||||
-- First stream can be infinite, but second stream must be finite. Note that if
|
||||
-- the first stream is infinite the union means just the first stream. Thus
|
||||
-- union is useful only when both streams are finite. See 'sortedUnionBy' where
|
||||
-- union can work on infinite streams if they are sorted.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m x n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE unionWithStreamGenericBy #-}
|
||||
unionWithStreamGenericBy :: MonadIO m =>
|
||||
{-# INLINE unionBy #-}
|
||||
unionBy :: MonadIO m =>
|
||||
(a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
|
||||
unionWithStreamGenericBy eq s1 s2 =
|
||||
unionBy eq s2 s1 =
|
||||
Stream.concatEffect
|
||||
$ do
|
||||
-- XXX use a rewrite rule such that if a list converted to stream
|
||||
-- is passed to unionBy then this becomes an identity operation.
|
||||
xs <- Stream.fold Fold.toList s1
|
||||
-- XXX we can use postscanlMAfter' instead of IORef
|
||||
ref <- liftIO $ newIORef $! List.nubBy eq xs
|
||||
@ -341,12 +460,21 @@ unionWithStreamGenericBy eq s1 s2 =
|
||||
return $ Stream.fromList xs1
|
||||
return $ Stream.mapM f s2 `Stream.append` s3
|
||||
|
||||
-- | A more efficient 'unionWithStreamGenericBy' for sorted streams.
|
||||
-- | A more efficient 'unionBy' for sorted streams.
|
||||
--
|
||||
-- Note that the behavior is different from 'unionBy'. In 'unionBy' we append
|
||||
-- the unique elements from second stream only after exhausting the first one
|
||||
-- whereas in sorted streams we can determine unique elements early even when
|
||||
-- we are going through the first stream. Thus the result is an interleaving of
|
||||
-- the two streams, merging those elements from the second stream that are not
|
||||
-- present in the first.
|
||||
--
|
||||
-- Space: O(1)
|
||||
--
|
||||
-- Both streams can be infinite.
|
||||
--
|
||||
-- /Unimplemented/
|
||||
{-# INLINE unionWithStreamAscBy #-}
|
||||
unionWithStreamAscBy :: -- (Monad m) =>
|
||||
{-# INLINE sortedUnionBy #-}
|
||||
sortedUnionBy :: -- (Monad m) =>
|
||||
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
|
||||
unionWithStreamAscBy _eq _s1 _s2 = undefined
|
||||
sortedUnionBy _eq _s1 _s2 = undefined
|
||||
|
@ -82,7 +82,7 @@ module Streamly.Internal.Data.Unfold
|
||||
, dropWhileM
|
||||
|
||||
-- ** Cross product
|
||||
, joinInnerGeneric
|
||||
, innerJoin
|
||||
|
||||
-- ** Resource Management
|
||||
-- | 'bracket' is the most general resource management operation, all other
|
||||
@ -677,10 +677,12 @@ dropWhileM f (Unfold step inject) = Unfold step' inject'
|
||||
dropWhile :: Monad m => (b -> Bool) -> Unfold m a b -> Unfold m a b
|
||||
dropWhile f = dropWhileM (return . f)
|
||||
|
||||
{-# INLINE_NORMAL joinInnerGeneric #-}
|
||||
joinInnerGeneric :: Monad m =>
|
||||
-- | Cross intersection of two unfolds. See
|
||||
-- 'Streamly.Internal.Data.Stream.innerJoin' for more details.
|
||||
{-# INLINE_NORMAL innerJoin #-}
|
||||
innerJoin :: Monad m =>
|
||||
(b -> c -> Bool) -> Unfold m a b -> Unfold m a c -> Unfold m a (b, c)
|
||||
joinInnerGeneric eq s1 s2 = filter (\(a, b) -> a `eq` b) $ cross s1 s2
|
||||
innerJoin eq s1 s2 = filter (\(a, b) -> a `eq` b) $ cross s1 s2
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Exceptions
|
||||
|
@ -649,7 +649,7 @@ intersectBySorted :: (IsStream t, Monad m) =>
|
||||
(a -> a -> Ordering) -> t m a -> t m a -> t m a
|
||||
intersectBySorted eq s1 =
|
||||
IsStream.fromStreamD
|
||||
. StreamD.intersectBySorted eq (IsStream.toStreamD s1)
|
||||
. StreamD.sortedIntersectBy eq (IsStream.toStreamD s1)
|
||||
. IsStream.toStreamD
|
||||
|
||||
-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2
|
||||
|
Loading…
Reference in New Issue
Block a user