Add some combinators to scan using folds

This commit is contained in:
Harendra Kumar 2024-08-13 16:52:48 +05:30
parent 690870277b
commit 345e21aa32

View File

@ -285,7 +285,7 @@ countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial
-- | This is the most general of all demux, classify operations.
-- See 'demux' for documentation.
{-# DEPRECATED demuxGeneric "Use demuxGeneric from Scanl module" #-}
{-# DEPRECATED demuxGeneric "Use demuxScanGeneric instead" #-}
{-# INLINE demuxGeneric #-}
demuxGeneric :: (Monad m, IsMap f, Traversable f) =>
(a -> Key f)
@ -405,15 +405,64 @@ demuxerToContainer getKey getFold =
Partial s -> fin s
_ -> error "demuxerToContainer: unreachable code"
-- | Replacement for demuxGeneric when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- /Unimplemented/
demuxScanGeneric :: -- (Monad m, IsMap f, Traversable f) =>
(a -> k) -- k ~ Key f
-> (k -> m (Fold m a b))
-> Scanl m a (k, b)
demuxScanGeneric = undefined
-- | Scanning variant of 'demuxerToContainer'.
{-# INLINE demuxScanGeneric #-}
demuxScanGeneric :: (Monad m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxScanGeneric getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
initial = return $ Tuple' IsMap.mapEmpty Nothing
{-# INLINE runFold #-}
runFold kv (Fold step1 initial1 extract1 final1) (k, a) = do
res <- initial1
case res of
Partial s -> do
res1 <- step1 s a
$ case res1 of
Partial _ ->
let fld = Fold step1 (return res1) extract1 final1
in Tuple' (IsMap.mapInsert k fld kv) Nothing
Done b -> Tuple' (IsMap.mapDelete k kv) (Just (k, b))
Done b ->
-- Done in "initial" is possible only for the very first time
-- the fold is initialized, and in that case we have not yet
-- inserted it in the Map, so we do not need to delete it.
return $ Tuple' kv (Just (k, b))
step (Tuple' kv _) a = do
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
fld <- getFold k
runFold kv fld (k, a)
Just f -> runFold kv f (k, a)
extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
f (Fold _ i e _) = do
r <- i
case r of
Partial s -> e s
_ -> error "demuxGeneric: unreachable code"
final (Tuple' kv x) = return (Prelude.mapM f kv, x)
f (Fold _ i _ fin) = do
r <- i
case r of
Partial s -> fin s
_ -> error "demuxGeneric: unreachable code"
-- | @demux getKey getFold@: In a key value stream, fold values corresponding
-- to each key using a key specific fold. @getFold@ is invoked to generate a
@ -441,7 +490,7 @@ demuxScanGeneric = undefined
-- /Pre-release/
{-# DEPRECATED demux "Use demux from Scanl module" #-}
{-# DEPRECATED demux "Use demuxScan instead" #-}
{-# INLINE demux #-}
demux :: (Monad m, Ord k) =>
(a -> k)
@ -449,19 +498,28 @@ demux :: (Monad m, Ord k) =>
-> Fold m a (m (Map k b), Maybe (k, b))
demux = demuxGeneric
-- | Replacement for demux when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- /Unimplemented/
demuxScan :: -- (Monad m, Ord k) =>
{-# INLINE demuxUsingMap #-}
demuxUsingMap :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> Scanl m a (k, b)
demuxScan = undefined
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMap = demuxScanGeneric
-- | Scanning variant of 'demuxerToMap'.
-- TODO: To drain the final in-progress folds this requires the drain step of
-- Scanl to be streaming.
{-# INLINE demuxScan #-}
demuxScan :: (Monad m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> Scanl m a (Maybe (k, b))
demuxScan getKey = fmap snd . demuxUsingMap getKey
-- | This is specialized version of 'demuxGeneric' that uses mutable IO cells
-- as fold accumulators for better performance.
{-# DEPRECATED demuxGenericIO "Use demuxGenericIO from Scanl module" #-}
{-# DEPRECATED demuxGenericIO "Use demuxScanGenericIO instead" #-}
{-# INLINE demuxGenericIO #-}
demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
(a -> Key f)
@ -609,15 +667,88 @@ demuxerToContainerIO getKey getFold =
Partial s -> fin s
_ -> error "demuxGenericIO: unreachable code"
-- | Replacement for demuxGenericIO when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- | This is a specialized version of 'demux' that uses mutable IO cells as
-- fold accumulators for better performance.
-- /Unimplemented/
demuxScanGenericIO :: -- (Monad m, IsMap f, Traversable f) =>
(a -> k) -- k ~ Key f
-> (k -> m (Fold m a b))
-> Scanl m a (k, b)
demuxScanGenericIO = undefined
-- Keep in mind that the values in the returned Map may be changed by the
-- ongoing fold if you are using those concurrently in another thread.
{-# INLINE demuxScanGenericIO #-}
demuxScanGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
(a -> Key f)
-> (Key f -> m (Fold m a b))
-> Scanl m a (m (f b), Maybe (Key f, b))
demuxScanGenericIO getKey getFold =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
initial = return $ Tuple' IsMap.mapEmpty Nothing
{-# INLINE initFold #-}
initFold kv (Fold step1 initial1 extract1 final1) (k, a) = do
res <- initial1
case res of
Partial s -> do
res1 <- step1 s a
case res1 of
Partial _ -> do
-- XXX Instead of using a Fold type here use a custom
-- type with an IORef (possibly unboxed) for the
-- accumulator. That will reduce the allocations.
let fld = Fold step1 (return res1) extract1 final1
ref <- liftIO $ newIORef fld
return $ Tuple' (IsMap.mapInsert k ref kv) Nothing
Done b -> return $ Tuple' kv (Just (k, b))
Done b -> return $ Tuple' kv (Just (k, b))
{-# INLINE runFold #-}
runFold kv ref (Fold step1 initial1 extract1 final1) (k, a) = do
res <- initial1
case res of
Partial s -> do
res1 <- step1 s a
case res1 of
Partial _ -> do
let fld = Fold step1 (return res1) extract1 final1
liftIO $ writeIORef ref fld
return $ Tuple' kv Nothing
Done b ->
let kv1 = IsMap.mapDelete k kv
in return $ Tuple' kv1 (Just (k, b))
Done _ -> error "demuxGenericIO: unreachable"
step (Tuple' kv _) a = do
let k = getKey a
case IsMap.mapLookup k kv of
Nothing -> do
f <- getFold k
initFold kv f (k, a)
Just ref -> do
f <- liftIO $ readIORef ref
runFold kv ref f (k, a)
extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
f ref = do
Fold _ i e _ <- liftIO $ readIORef ref
r <- i
case r of
Partial s -> e s
_ -> error "demuxGenericIO: unreachable code"
final (Tuple' kv x) = return (Prelude.mapM f kv, x)
f ref = do
Fold _ i _ fin <- liftIO $ readIORef ref
r <- i
case r of
Partial s -> fin s
_ -> error "demuxGenericIO: unreachable code"
-- | This is specialized version of 'demux' that uses mutable IO cells as
-- fold accumulators for better performance.
@ -625,7 +756,7 @@ demuxScanGenericIO = undefined
-- Keep in mind that the values in the returned Map may be changed by the
-- ongoing fold if you are using those concurrently in another thread.
{-# DEPRECATED demuxIO "Use demuxIO from Scanl module" #-}
{-# DEPRECATED demuxIO "Use demuxScanIO instead" #-}
{-# INLINE demuxIO #-}
demuxIO :: (MonadIO m, Ord k) =>
(a -> k)
@ -633,15 +764,25 @@ demuxIO :: (MonadIO m, Ord k) =>
-> Fold m a (m (Map k b), Maybe (k, b))
demuxIO = demuxGenericIO
-- | Replacement for demuxIO when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- /Unimplemented/
demuxScanIO :: -- (Monad m, Ord k) =>
{-# INLINE demuxUsingMapIO #-}
demuxUsingMapIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> Scanl m a (k, b)
demuxScanIO = undefined
-> Scanl m a (m (Map k b), Maybe (k, b))
demuxUsingMapIO = demuxScanGenericIO
-- | This is a specialized version of 'demuxScan' that uses mutable IO cells as
-- scan accumulators for better performance.
-- TODO: To drain the final in-progress folds this requires the drain step of
-- Scanl to be streaming.
{-# INLINE demuxScanIO #-}
demuxScanIO :: (MonadIO m, Ord k) =>
(a -> k)
-> (k -> m (Fold m a b))
-> Scanl m a (Maybe (k, b))
demuxScanIO getKey = fmap snd . demuxUsingMapIO getKey
-- | Fold a key value stream to a key-value Map. If the same key appears
-- multiple times, only the last value is retained.
@ -773,7 +914,7 @@ demuxKvToMap = demuxKvToContainer
-- XXX Use a Refold m k a b so that we can make the fold key specifc.
-- XXX Is using a function (a -> k) better than using the input (k,a)?
{-# DEPRECATED classifyGeneric "Use classifyGeneric from Scanl module" #-}
{-# DEPRECATED classifyGeneric "Use classifyScanGeneric instead" #-}
{-# INLINE classifyGeneric #-}
classifyGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
-- Note: we need to return the Map itself to display the in-progress values
@ -877,15 +1018,65 @@ toContainer f (Fold step1 initial1 _ final1) =
r <- Prelude.mapM final1 kv
return $ IsMap.mapUnion r kv1
-- | Replacement for classifyGeneric when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- | Scanning variant of 'toContainer'.
-- /Unimplemented/
classifyScanGeneric :: -- (Monad m, IsMap f, Traversable f) =>
(a -> k) -- k ~ Key f
-> Fold m a b
-> Scanl m a (k, b)
classifyScanGeneric = undefined
{-# INLINE classifyScanGeneric #-}
classifyScanGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
-- Note: we need to return the Map itself to display the in-progress values
-- e.g. to implement top. We could possibly create a separate abstraction
-- for that use case. We return an action because we want it to be lazy so
-- that the downstream consumers can choose to process or discard it.
(a -> Key f) -> Fold m a b -> Scanl m a (m (f b), Maybe (Key f, b))
classifyScanGeneric f (Fold step1 initial1 extract1 final1) =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing
{-# INLINE initFold #-}
initFold kv set k a = do
x <- initial1
case x of
Partial s -> do
r <- step1 s a
$ case r of
Partial s1 ->
Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
Done b ->
Tuple3' kv set (Just (k, b))
Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))
step (Tuple3' kv set _) a = do
let k = f a
case IsMap.mapLookup k kv of
Nothing -> do
if Set.member k set
then return (Tuple3' kv set Nothing)
else initFold kv set k a
Just s -> do
r <- step1 s a
$ case r of
Partial s1 ->
Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
Done b ->
let kv1 = IsMap.mapDelete k kv
in Tuple3' kv1 (Set.insert k set) (Just (k, b))
extract (Tuple3' kv _ x) = return (Prelude.mapM extract1 kv, x)
final (Tuple3' kv set x) = return (IsMap.mapTraverseWithKey f1 kv, x)
f1 k s = do
if Set.member k set
-- XXX Why are we doing this? If it is in the set then it will not
-- be in the map and vice-versa.
then extract1 s
else final1 s
-- | Folds the values for each key using the supplied fold. When scanning, as
-- soon as the fold is complete, its result is available in the second
@ -898,21 +1089,25 @@ classifyScanGeneric = undefined
-- >> classify f fld = Fold.demux f (const fld)
{-# DEPRECATED classify "Use classify from Scanl module" #-}
{-# DEPRECATED classify "Use classifyScan instead" #-}
{-# INLINE classify #-}
classify :: (Monad m, Ord k) =>
(a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
classify = classifyGeneric
-- | Replacement for classify when Folds will not have the 'extract' function.
-- Note that this requires the drain step of Scanl to be streaming.
{-# INLINE classifyUsingMap #-}
classifyUsingMap :: (Monad m, Ord k) =>
(a -> k) -> Fold m a b -> Scanl m a (m (Map k b), Maybe (k, b))
classifyUsingMap = classifyScanGeneric
-- XXX Make it consistent with demux.
-- | Scanning variant of 'toMap'.
-- /Unimplemented/
classifyScan :: -- (Monad m, Ord k) =>
(a -> k)
-> Fold m a b
-> Scanl m a (k, b)
classifyScan = undefined
{-# INLINE classifyScan #-}
classifyScan :: (MonadIO m, Ord k) =>
(a -> k) -> Fold m a b -> Scanl m a (Maybe (k, b))
classifyScan getKey = fmap snd . classifyUsingMap getKey
-- XXX we can use a Prim IORef if we can constrain the state "s" to be Prim
@ -1033,15 +1228,66 @@ toContainerIO f (Fold step1 initial1 _ final1) =
g ref = liftIO (readIORef ref) >>= final1
-- | Replacement for classifyGenericIO when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
-- | Scanning variant of 'classifyGenericIO'.
-- /Unimplemented/
classifyScanGenericIO :: -- (Monad m, IsMap f, Traversable f) =>
(a -> k) -- k ~ Key f
-> Fold m a b
-> Scanl m a (k, b)
classifyScanGenericIO = undefined
{-# INLINE classifyScanGenericIO #-}
classifyScanGenericIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) =>
(a -> Key f) -> Fold m a b -> Scanl m a (m (f b), Maybe (Key f, b))
classifyScanGenericIO f (Fold step1 initial1 extract1 final1) =
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing
{-# INLINE initFold #-}
initFold kv set k a = do
x <- initial1
case x of
Partial s -> do
r <- step1 s a
case r of
Partial s1 -> do
ref <- liftIO $ newIORef s1
return $ Tuple3' (IsMap.mapInsert k ref kv) set Nothing
Done b ->
return $ Tuple3' kv set (Just (k, b))
Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))
step (Tuple3' kv set _) a = do
let k = f a
case IsMap.mapLookup k kv of
Nothing -> do
if Set.member k set
then return (Tuple3' kv set Nothing)
else initFold kv set k a
Just ref -> do
s <- liftIO $ readIORef ref
r <- step1 s a
case r of
Partial s1 -> do
liftIO $ writeIORef ref s1
return $ Tuple3' kv set Nothing
Done b ->
let kv1 = IsMap.mapDelete k kv
in return
$ Tuple3' kv1 (Set.insert k set) (Just (k, b))
extract (Tuple3' kv _ x) = return (Prelude.mapM g kv, x)
g ref = liftIO (readIORef ref) >>= extract1
final (Tuple3' kv set x) = return (IsMap.mapTraverseWithKey g kv, x)
g k ref = do
s <- liftIO $ readIORef ref
if Set.member k set
then extract1 s
else final1 s
-- | Same as classify except that it uses mutable IORef cells in the
-- Map providing better performance. Be aware that if this is used as a scan,
@ -1051,21 +1297,27 @@ classifyScanGenericIO = undefined
-- >> classifyIO f fld = Fold.demuxIO f (const fld)
{-# DEPRECATED classifyIO "Use classifyIO from Scanl module" #-}
{-# DEPRECATED classifyIO "Use classifyScanIO instead" #-}
{-# INLINE classifyIO #-}
classifyIO :: (MonadIO m, Ord k) =>
(a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
classifyIO = classifyGenericIO
-- | Replacement for classifyIO when Folds will not have the 'extract'
-- function. Note that this requires the drain step of Scanl to be streaming.
{-# INLINE classifyUsingMapIO #-}
classifyUsingMapIO :: (MonadIO m, Ord k) =>
(a -> k) -> Fold m a b -> Scanl m a (m (Map k b), Maybe (k, b))
classifyUsingMapIO = classifyScanGenericIO
-- | This is a specialized version of 'classifyScan' that uses mutable IO cells
-- as scan accumulators for better performance.
-- /Unimplemented/
classifyScanIO :: -- (MonadIO m, Ord k) =>
(a -> k)
-> Fold m a b
-> Scanl m a (k, b)
classifyScanIO = undefined
-- TODO: To drain the final in-progress folds this requires the drain step of
-- Scanl to be streaming.
{-# INLINE classifyScanIO #-}
classifyScanIO :: (MonadIO m, Ord k) =>
(a -> k) -> Fold m a b -> Scanl m a (Maybe (k, b))
classifyScanIO getKey = fmap snd . classifyUsingMapIO getKey
{-# INLINE toContainer #-}