Implement min/max/range without dequeue

This commit is contained in:
Harendra Kumar 2022-06-07 17:02:58 +05:30
parent 40e549e605
commit d36f02ff12
3 changed files with 86 additions and 122 deletions

View File

@ -51,17 +51,18 @@ module Streamly.Internal.Data.Fold.Window
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor(bimap)
import Data.Function ((&))
import Data.Maybe (fromMaybe)
import Foreign (Storable(..))
import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Tuple.Strict
(Tuple'(..), Tuple3Fused' (Tuple3Fused'))
import Prelude hiding (length, sum, minimum, maximum)
import qualified Deque.Strict as DQ
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Ring.Foreign as Ring
-- $setup
-- >>> import Data.Bifunctor(bimap)
@ -213,111 +214,90 @@ powerSumFrac p = lmap (** p) sum
-- Location
-- Theoretically, we can approximate minimum in a rolling window by using a
-- 'powerMean' with sufficiently large negative power.
-- XXX Remove MonadIO constraint
-- | Determine the maximum and minimum in a rolling window.
-- XXX If we need to know the minimum in the window only once in a while then
-- we can use linear search when it is extracted and not pay the cost all the
-- time.
-- If you want to compute the range of the entire stream @Fold.teeWith (,)
-- Fold.maximum Fold.minimum@ would be much faster.
-- | The minimum element in a rolling window.
-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size.
-- If you want to compute the minimum of the entire stream Fold.minimum from
-- streamly package would be much faster.
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
{-# INLINE range #-}
range :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a))
-- @range n@ hands every input element to the ring via Ring.unsafeInsert and,
-- on extraction, folds over the ring to produce @Just (smallest, largest)@.
-- It produces @Nothing@ when no element has been consumed yet.
-- NOTE(review): this listing appears to have lost some tokens (there is no
-- @where@ before the local bindings and no @else@ in 'initial'); confirm
-- against the repository before editing the code.
range n = Fold step initial extract
-- XXX Use Ring unfold and then fold for composing maximum and minimum to
-- get the range.
-- Initial state: a non-positive window size is rejected with 'error';
-- otherwise a pair @(a, b)@ is wrapped together with an element count of 0.
-- NOTE(review): the action given to liftIO reads just @n@, which is an Int;
-- the ring allocation call (presumably something like @Ring.new n@) seems to
-- be missing from this line -- confirm.
initial =
if n <= 0
then error "range: window size must be > 0"
let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int)
in fmap f $ liftIO $ n
-- Pass the new element to Ring.unsafeInsert along with the ring and the
-- current head, keep the head it returns, and bump the element count.
step (Tuple3Fused' rb rh i) a = do
rh1 <- liftIO $ Ring.unsafeInsert rb rh a
return $ Partial $ Tuple3Fused' rb rh1 (i + 1)
-- XXX We need better Ring array APIs so that we can unfold the ring to a
-- stream and fold the stream using a fold of our choice.
-- We could just scan the stream to get a stream of ring buffers and then
-- map required folds over those, but we need to be careful that all those
-- rings refer to the same mutable ring, therefore, downstream needs to
-- process those strictly before it can change.
-- Select the ring fold by how many elements have been consumed: fewer than
-- @n@ uses unsafeFoldRingM, otherwise unsafeFoldRingFullM (presumably the
-- former only visits the filled part of the ring -- confirm).
foldFunc i
| i < n = Ring.unsafeFoldRingM
| otherwise = Ring.unsafeFoldRingFullM
-- With no input consumed there is no result. Otherwise the element read from
-- @rh@ seeds both the running minimum and maximum, and @accum@ is folded over
-- the ring starting from that seed.
-- NOTE(review): @rh@ is the head returned by the last unsafeInsert; confirm
-- that it refers to an initialized element when @i < n@, because the value
-- read from it takes part in the min/max result.
extract (Tuple3Fused' rb rh i) =
if i == 0
then return Nothing
else do
x <- liftIO $ peek rh
let accum (mn, mx) a = return (min mn a, max mx a)
fmap Just $ foldFunc i rh accum (x, x) rb
-- | Find the minimum element in a rolling window.
-- This implementation traverses the entire window buffer to compute the
-- minimum whenever we demand it. It performs better than the dequeue based
-- implementation in @streamly-statistics@ package when any of the following
-- holds:
-- * window size is small (< 30)
-- * we are using this as a fold instead of a scan.
-- For other cases the implementation in the @streamly-statistics@ package
-- performs better.
-- If you want to compute the minimum of the entire stream
-- 'Streamly.Data.Fold.minimum' is much faster.
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
{-# INLINE minimum #-}
-- NOTE(review): this view has lost its diff markers, so two definitions of
-- 'minimum' are interleaved below: a deque-based fold over @(a, Maybe a)@
-- inputs and a one-line definition in terms of 'range'. Presumably the
-- deque-based lines are the ones the commit removes -- confirm.
minimum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
minimum = Fold step initial extract
-- Window minimum for window size @n@: the first component of the
-- @(min, max)@ pair produced by 'range', or @Nothing@ when 'range' yields
-- @Nothing@.
minimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
minimum n = fmap (fmap fst) $ range n
-- Deque-based state: an input index, a window count, and a deque of
-- @(index, value)@ entries; all start out empty/zero.
initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int)
(mempty :: DQ.Deque (Int, a))
-- Each input is a pair @(a, ma)@. When @ma@ is @Nothing@ the window count
-- grows by one; when it is @Just _@ the count is left unchanged. In both cases
-- the front entry is checked for expiry, the new entry is pushed, and the
-- index advances.
step (Tuple3' i w q) (a, ma) =
case ma of
Nothing ->
return $ Partial $ Tuple3' (i + 1) (w + 1)
(headCheck i q (w + 1) & dqloop (i, a))
Just _ ->
return $ Partial $ Tuple3' (i + 1) w
(headCheck i q w & dqloop (i,a))
{-# INLINE headCheck #-}
-- Drop the front entry when its index is @<= i - w@; otherwise (or when the
-- deque is empty) return the deque unchanged. At most one entry is removed.
headCheck i q w =
case DQ.uncons q of
Nothing -> q
Just (ia', q') ->
if fst ia' <= i - w
then q'
else q
-- Pop entries from the back while the new value is @<=@ the back value, then
-- append the new entry. Values therefore increase from front to back, so the
-- front entry holds the smallest retained value.
dqloop ia q =
case DQ.unsnoc q of
Nothing -> DQ.snoc ia q
-- XXX This can be improved for the case of `=`
Just (ia', q') ->
if snd ia <= snd ia'
then dqloop ia q'
else DQ.snoc ia q
-- The result is the value of the front entry; with an empty deque the
-- fallback pair carries an 'error' thunk ("min: Empty stream") as its value.
extract (Tuple3' _ _ q) = return $ snd
$ fromMaybe (0, error "min: Empty stream")
$ DQ.head q
-- Theoretically, we can approximate maximum in a rolling window by using a
-- 'powerMean' with sufficiently large positive power.
-- | The maximum element in a rolling window.
-- If you want to compute the maximum of the entire stream Fold.maximum from
-- streamly package would be much faster.
-- See the performance related comments in 'minimum'.
-- If you want to compute the maximum of the entire stream 'Fold.maximum' would
-- be much faster.
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
{-# INLINE maximum #-}
-- NOTE(review): this view has lost its diff markers, so two definitions of
-- 'maximum' are interleaved here: a deque-based fold over @(a, Maybe a)@
-- inputs (immediately below) and a one-line definition in terms of 'range'
-- (at the end). Presumably the deque-based lines are the ones the commit
-- removes -- confirm.
maximum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
maximum = Fold step initial extract
-- Deque-based state: an input index, a window count, and a deque of
-- @(index, value)@ entries; all start out empty/zero.
initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int)
(mempty :: DQ.Deque (Int, a))
-- Each input is a pair @(a, ma)@. When @ma@ is @Nothing@ the window count
-- grows by one; when it is @Just _@ the count is left unchanged. In both cases
-- the front entry is checked for expiry, the new entry is pushed, and the
-- index advances.
step (Tuple3' i w q) (a, ma) =
case ma of
Nothing ->
return $ Partial $ Tuple3' (i + 1) (w + 1)
(headCheck i q (w + 1) & dqloop (i, a))
Just _ ->
return $ Partial $ Tuple3' (i + 1) w
(headCheck i q w & dqloop (i,a))
{-# INLINE headCheck #-}
-- Drop the front entry when its index is @<= i - w@; otherwise (or when the
-- deque is empty) return the deque unchanged. At most one entry is removed.
headCheck i q w =
case DQ.uncons q of
Nothing -> q
Just (ia', q') ->
if fst ia' <= i - w
then q'
else q
-- Pop entries from the back while the new value is @>=@ the back value, then
-- append the new entry. Values therefore decrease from front to back, so the
-- front entry holds the largest retained value.
dqloop ia q =
case DQ.unsnoc q of
Nothing -> DQ.snoc ia q
-- XXX This can be improved for the case of `=`
Just (ia', q') ->
if snd ia >= snd ia'
then dqloop ia q'
else DQ.snoc ia q
-- The result is the value of the front entry; with an empty deque the
-- fallback pair carries an 'error' thunk ("max: Empty stream") as its value.
-- NOTE(review): the right-hand side starts with a bare @$@ on the next line;
-- a leading @return@ (as in the corresponding 'minimum' code) seems to have
-- been lost from this listing -- confirm.
extract (Tuple3' _ _ q) =
$ snd
$ fromMaybe (0, error "max: Empty stream")
$ DQ.head q
-- Window maximum for window size @n@: the second component of the
-- @(min, max)@ pair produced by 'range', or @Nothing@ when 'range' yields
-- @Nothing@.
maximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
maximum n = fmap (fmap snd) $ range n
-- | Arithmetic mean of elements in a sliding window:
@ -335,19 +315,3 @@ maximum = Fold step initial extract
{-# INLINE mean #-}
mean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a
-- Combines this module's windowed 'sum' and 'length' folds (the Prelude
-- versions are hidden) with 'Fold.teeWith', dividing the sum by the length.
mean = Fold.teeWith (/) sum length
-- | The difference between the maximum and minimum elements of a rolling window.
-- >>> range = Fold.teeWith (-) maximum minimum
-- If you want to compute the range of the entire stream, @Fold.teeWith (-)
-- Fold.maximum Fold.minimum@ from the streamly package would be much faster.
-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size.
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
{-# INLINE range #-}
range :: (Monad m, Num a, Ord a) => Fold m (a, Maybe a) a
-- Combines the windowed 'maximum' and 'minimum' folds with 'Fold.teeWith',
-- subtracting the minimum from the maximum.
-- NOTE(review): this is the @(a, Maybe a)@-input variant; with the diff
-- markers lost it presumably belongs to the removed side of the commit --
-- confirm.
range = Fold.teeWith (-) maximum minimum

View File

@ -343,7 +343,6 @@ library
, transformers-base >= 0.4 && < 0.5
, primitive >= 0.5.4 && < 0.8
, heaps >= 0.3 && < 0.5
, deque >= 0.4 && < 0.5
, hashable >= 1.3 && < 1.5
, unordered-containers >= 0.2 && < 0.3
if flag(use-unliftio)

View File

@ -33,7 +33,9 @@ main = hspec $ do
describe "Correctness" $ do
let winSize = 3
testCase1 = [31, 41, 59, 26, 53, 58, 97] :: [Double]
testCase1 =
[1.0, 4.0, 3.0, 2.1, -5.1, -2.0, 7.0, 3.0, -2.5] :: [Double]
testCase2 = replicate 5 1.0 ++ [7.0]
testFunc tc f sI sW = do
@ -44,18 +46,17 @@ main = hspec $ do
it "Infinite" $ a == sI
it ("Finite " ++ show winSize) $ b == sW
-- Fold the whole input list @tc@ with @f winSize@ and check that the final
-- result equals @expec@; the test case is labelled with @show tc@.
testFunc2 tc expec f = do
let c = S.fromList tc
-- runIO performs the fold while the spec tree is being built, so @a@ is a
-- plain value by the time the 'it' assertion is defined.
a <- runIO $ S.fold (f winSize) c
it (show tc) $ a == expec
describe "minimum" $ do
let scanInf = [31, 31, 31, 26, 26, 26, 26] :: [Double]
scanWin = [31, 31, 31, 26, 26, 26, 53] :: [Double]
testFunc testCase1 minimum scanInf scanWin
testFunc2 testCase1 (Just (-2.5)) minimum
describe "maximum" $ do
let scanInf = [31, 41, 59, 59, 59, 59, 97] :: [Double]
scanWin = [31, 41, 59, 59, 59, 58, 97] :: [Double]
testFunc testCase1 maximum scanInf scanWin
testFunc2 testCase1 (Just 7.0) maximum
describe "range" $ do
let scanInf = [0, 10, 28, 33, 33, 33, 71] :: [Double]
scanWin = [0, 10, 28, 33, 33, 32, 44] :: [Double]
testFunc testCase1 range scanInf scanWin
testFunc2 testCase1 (Just (-2.5, 7.0)) range
describe "sum" $ do
let scanInf = [1, 2, 3, 4, 5, 12] :: [Double]
scanWin = [1, 2, 3, 3, 3, 9] :: [Double]