Fix uniqBy and rollingMap

Fix hlint

Fix review comments

Add Functor contraint

Fix review comments
This commit is contained in:
Ranjeet Kumar Ranjan 2022-03-03 21:32:32 +05:30 committed by Harendra Kumar
parent f31f28c0d8
commit 4b49390286
4 changed files with 45 additions and 25 deletions

View File

@ -151,7 +151,7 @@ pollCounts n =
where
f = Internal.rollingMap (-) . Internal.delayPost 1
f = Internal.rollingMap2 (-) . Internal.delayPost 1
{-# INLINE timestamped #-}
timestamped :: (S.MonadAsync m) => SerialT m Int -> m ()

View File

@ -113,6 +113,7 @@ module Streamly.Internal.Data.Stream.StreamD.Transform
-- | Map using the previous element.
, rollingMap
, rollingMapM
, rollingMap2
-- * Maybe Streams
, mapMaybe
@ -1200,32 +1201,36 @@ slicesBy p (Stream step1 state1) = Stream step (Just (state1, 0, 0))
-- Rolling map
------------------------------------------------------------------------------
data RollingMapState s a = RollingMapInit s | RollingMapGo s a
data RollingMapState s a = RollingMapGo s a
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapInit state1)
where
step gst (RollingMapInit st) = do
r <- step1 (adaptState gst) st
return $ case r of
Yield x s -> Skip $ RollingMapGo s x
Skip s -> Skip $ RollingMapInit s
Stop -> Stop
rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapGo state1 Nothing)
step gst (RollingMapGo s1 x1) = do
where
step gst (RollingMapGo s1 curr) = do
r <- step1 (adaptState gst) s1
case r of
Yield x s -> do
!res <- f x x1
return $ Yield res $ RollingMapGo s x
Skip s -> return $ Skip $ RollingMapGo s x1
!res <- f curr x
return $ Yield res $ RollingMapGo s (Just x)
Skip s -> return $ Skip $ RollingMapGo s curr
Stop -> return Stop
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b
rollingMap f = rollingMapM (\x y -> return $ f x y)
{-# INLINE rollingMap2 #-}
rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap2 f = catMaybes . rollingMap g
where
g Nothing _ = Nothing
g (Just x) y = Just (f x y)
------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------
@ -1238,3 +1243,7 @@ mapMaybe f = fmap fromJust . filter isJust . map f
{-# INLINE_NORMAL mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = fmap fromJust . filter isJust . mapM f
{-# INLINE catMaybes #-}
catMaybes :: (Monad m) => Stream m (Maybe a) -> Stream m a
catMaybes = fmap fromJust . filter isJust

View File

@ -156,6 +156,7 @@ module Streamly.Internal.Data.Stream.IsStream.Transform
-- | Map using the previous element.
, rollingMapM
, rollingMap
, rollingMap2
-- * Maybe Streams
-- Move these to Streamly.Data.Maybe.Stream?
@ -921,8 +922,14 @@ filterM p m = fromStreamD $ D.filterM p $ toStreamD m
{-# INLINE uniqBy #-}
uniqBy :: (IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy eq =
catMaybes . rollingMap (\x y -> if x `eq` y then Nothing else Just y)
uniqBy eq = catMaybes . rollingMap f
where
f pre curr =
case pre of
Nothing -> Just curr
Just x -> if x `eq` curr then Nothing else Just curr
-- | Drop repeated elements that are adjacent to each other.
--
@ -1519,7 +1526,7 @@ elemIndices a = findIndices (== a)
-- /Pre-release/
--
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap :: (IsStream t, Monad m) => (Maybe a -> a -> b) -> t m a -> t m b
rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m
-- | Like 'rollingMap' but with an effectful map function.
@ -1527,9 +1534,13 @@ rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m
-- /Pre-release/
--
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
rollingMapM :: (IsStream t, Monad m) => (Maybe a -> a -> m b) -> t m a -> t m b
rollingMapM f m = fromStreamD $ D.rollingMapM f $ toStreamD m
{-# INLINE rollingMap2 #-}
rollingMap2 :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap2 f m = fromStreamD $ D.rollingMap2 f $ toStreamD m
------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------

View File

@ -178,10 +178,10 @@ intersectBy ::
)
-> (Int -> Int -> a)
-> Property
intersectBy _srt intersectFunc cmp =
intersectBy srt intersectFunc cmp =
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
monadicIO $ action (sort ls0) (sort ls1)
monadicIO $ action (srt ls0) (srt ls1)
where
@ -194,7 +194,7 @@ intersectBy _srt intersectFunc cmp =
(S.fromList ls0)
(S.fromList ls1)
let v2 = ls0 `intersect` ls1
assert (v1 == sort v2)
assert (sort v1 == sort v2)
-------------------------------------------------------------------------------
-- Main
@ -215,7 +215,7 @@ main = hspec $ do
prop "joinLeft" Main.joinLeft
prop "joinLeftMap" Main.joinLeftMap
-- intersect
-- XXX currently API is broken https://github.com/composewell/streamly/issues/1471
--prop "intersectBy" (intersectBy id Top.intersectBy (==))
prop "intersectBy"
(intersectBy id Top.intersectBy (==))
prop "intersectBySorted"
(intersectBy sort Top.intersectBySorted compare)