Add a module for sliding window folds

Moved some basic incremental folds from the "streamly-statistics" package.
This commit is contained in:
Ranjeet Kumar Ranjan 2022-03-15 15:03:33 +05:30 committed by Harendra Kumar
parent ee85b7a980
commit f95fcbd82f
9 changed files with 462 additions and 0 deletions

View File

@ -0,0 +1,115 @@
module Main (main) where
import Control.DeepSeq (NFData)
import Streamly.Data.Fold (Fold)
import Streamly.Prelude (SerialT)
import System.Random (randomRIO)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Ring.Foreign as Ring
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Fold.Window as Window
import Gauge
-- | Ascending stream that starts at @from@, advances in steps of 1 and is
-- bounded above by @from + len@.
{-# INLINE source #-}
source :: (Monad m, Stream.IsStream t, Num a, Stream.Enumerable a) =>
    Int -> a -> t m a
source len from = Stream.enumerateFromThenTo from next final
  where
    next = from + 1
    final = from + fromIntegral len
-- | Descending stream that starts at @from + len@, moves in steps of 1 and is
-- bounded below by @from@.
{-# INLINE sourceDescending #-}
sourceDescending :: (Monad m, Stream.IsStream t, Num a, Stream.Enumerable a) =>
    Int -> a -> t m a
sourceDescending len from = Stream.enumerateFromThenTo top next from
  where
    top = from + fromIntegral len
    next = from + fromIntegral (len - 1)
-- | 'sourceDescending' with the element type fixed to 'Int'.
{-# INLINE sourceDescendingInt #-}
sourceDescendingInt :: (Monad m, Stream.IsStream t) => Int -> Int -> t m Int
sourceDescendingInt = sourceDescending
-- | Build a benchmark called @name@ that folds the stream produced by @src@
-- with @f@ and forces the result to normal form.
--
-- The stream's start value is read through 'randomRIO' (over the degenerate
-- range @(1, 1)@) and converted with 'fromIntegral' before being handed to
-- @src@ together with @len@.
{-# INLINE benchWith #-}
benchWith :: (Num a, NFData a) =>
    (Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a a -> Benchmark
benchWith src len name f = bench name $ nfIO run
  where
    run = do
        seed <- randomRIO (1, 1 :: Int)
        Stream.fold f (src len (fromIntegral seed))
-- | 'benchWith' over the ascending 'source' stream of 'Double' values.
{-# INLINE benchWithFold #-}
benchWithFold :: Int -> String -> Fold IO Double Double -> Benchmark
benchWithFold = benchWith (source :: Int -> Double -> SerialT IO Double)
-- | 'benchWith' over the ascending 'source' stream of 'Int' values.
{-# INLINE benchWithFoldInt #-}
benchWithFoldInt :: Int -> String -> Fold IO Int Int -> Benchmark
benchWithFoldInt = benchWith (source :: Int -> Int -> SerialT IO Int)
-- | Build a benchmark called @name@ that runs @f@ as a postscan over the
-- ascending 'source' stream and drains the scanned output.
--
-- The stream's start value is read through 'randomRIO' over the degenerate
-- range @(1, 1)@.
{-# INLINE benchWithPostscan #-}
benchWithPostscan :: Int -> String -> Fold IO Double Double -> Benchmark
benchWithPostscan len name f = bench name (nfIO run)
  where
    run = do
        start <- randomRIO (1, 1)
        Stream.drain (Stream.postscan f (source len start))
-- | Length parameter handed to the stream generators by every benchmark in
-- 'main'.
{-# INLINE numElements #-}
numElements :: Int
numElements = 100000
-- | Benchmark entry point.
--
-- Registers two groups with gauge: "fold", which reduces the whole stream to
-- a single value, and "scan", which runs the same window folds as postscans
-- and drains the output.
main :: IO ()
main =
defaultMain
[ bgroup
"fold"
-- Window folds over ascending Double input, plus descending Int input for
-- minimum/maximum and Int input for the two sum variants.
[ benchWithFold numElements "minimum (window size 100)"
(Ring.slidingWindow 100 Window.minimum)
, benchWithFold numElements "minimum (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
, benchWith sourceDescendingInt numElements
"minimum descending (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
, benchWithFold numElements "maximum (window size 100)"
(Ring.slidingWindow 100 Window.maximum)
, benchWithFold numElements "maximum (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
, benchWith sourceDescendingInt numElements
"maximum descending (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
, benchWithFold numElements "range (window size 100)"
(Ring.slidingWindow 100 Window.range)
, benchWithFold numElements "range (window size 1000)"
(Ring.slidingWindow 1000 Window.range)
, benchWithFoldInt numElements "sumInt (window size 100)"
(Ring.slidingWindow 100 Window.sumInt)
, benchWithFoldInt numElements "sum for Int (window size 100)"
(Ring.slidingWindow 100 Window.sum)
, benchWithFold numElements "sum (window size 100)"
(Ring.slidingWindow 100 Window.sum)
, benchWithFold numElements "sum (window size 1000)"
(Ring.slidingWindow 1000 Window.sum)
-- Baselines: the window sum over the entire stream, and the plain
-- Fold.sum, for comparison with the windowed variants above.
, benchWithFold numElements "sum (entire stream)"
(Window.whole Window.sum)
, benchWithFold numElements "sum (Data.Fold)"
Fold.sum
]
, bgroup
"scan"
-- Same window folds, run as postscans over ascending Double input.
[ benchWithPostscan numElements "minimum (window size 100)"
(Ring.slidingWindow 100 Window.minimum)
, benchWithPostscan numElements "minimum (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
, benchWithPostscan numElements "maximum (window size 100)"
(Ring.slidingWindow 100 Window.maximum)
, benchWithPostscan numElements "maximum (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
, benchWithPostscan numElements "range (window size 100)"
(Ring.slidingWindow 100 Window.range)
, benchWithPostscan numElements "range (window size 1000)"
(Ring.slidingWindow 1000 Window.range)
, benchWithPostscan numElements "sum (window size 100)"
(Ring.slidingWindow 100 Window.sum)
, benchWithPostscan numElements "sum (window size 1000)"
(Ring.slidingWindow 1000 Window.sum)
]
]

View File

@ -322,6 +322,12 @@ benchmark Data.Fold
else
buildable: True
benchmark Data.Fold.Window
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Fold
main-is: Window.hs
benchmark Data.Parser.ParserD
import: bench-options
type: exitcode-stdio-1.0

View File

@ -0,0 +1,263 @@
module Streamly.Internal.Data.Fold.Window
( lmap
, length
, whole
, sum
, sumInt
, minimum
, maximum
, range
)
where
import Data.Bifunctor(bimap)
import Data.Function ((&))
import Data.Maybe (fromMaybe)
import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Prelude hiding (length, sum, minimum, maximum)
import qualified Deque.Strict as DQ
import qualified Streamly.Data.Fold as Fold
-- $setup
-- >>> import Data.Bifunctor(bimap)
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import Prelude hiding (length, sum, minimum, maximum)
-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------
-- | Apply a function to both components of a rolling window fold's input:
-- the incoming element and, when present, the outgoing element.
--
-- >>> lmap f = Fold.lmap (bimap f (f <$>))
--
{-# INLINE lmap #-}
lmap :: (c -> a) -> Fold m (a, Maybe a) b -> Fold m (c, Maybe c) b
lmap f = Fold.lmap (bimap f (fmap f))
-- | Turn a rolling fold into an ordinary fold by treating the entire input
-- stream as one window: every input is paired with 'Nothing', so no element
-- is ever reported as leaving the window.
--
-- >>> whole f = Fold.lmap (\x -> (x, Nothing)) f
--
{-# INLINE whole #-}
whole :: Fold m (a, Maybe a) b -> Fold m a b
whole = Fold.lmap (\x -> (x, Nothing))
-------------------------------------------------------------------------------
-- Sum
-------------------------------------------------------------------------------
-- XXX Overflow.
--
-- | The sum of all the elements in a rolling window. The input elements are
-- required to be integral numbers.
--
-- This was written in the hope that it would be a tiny bit faster than 'sum'
-- for 'Integral' values. But it turns out that 'sum' is 2% faster than this
-- even for integral values!
--
-- /Internal/
--
{-# INLINE sumInt #-}
sumInt :: forall m a. (Monad m, Integral a) => Fold m (a, Maybe a) a
sumInt = Fold step initial extract
  where
    initial = return $ Partial (0 :: a)

    -- Add the incoming element; if an element left the window, subtract it.
    step acc (new, evicted) =
        return $ Partial $ maybe (acc + new) (\old -> acc + new - old) evicted

    extract = return
-- XXX Overflow.
--
-- | Sum of all the elements in a rolling window:
--
-- Uses Kahan-style compensated summation (a running total plus a carried
-- rounding-error term) for numerical stability of floating precision
-- arithmetic.
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
--
{-# INLINE sum #-}
sum :: forall m a. (Monad m, Num a) => Fold m (a, Maybe a) a
sum = Fold step initial extract
  where
    -- State: running sum and accumulated rounding error, both starting at 0.
    initial = return $ Partial $ Tuple' (0 :: a) (0 :: a)

    step (Tuple' total err) (new, mOld) =
        return $ Partial $ Tuple' total1 err1
      where
        -- Net change to the window: the new element, less the element that
        -- left the window (if any).
        delta = maybe new (\old -> new - old) mOld
        -- XXX delta may be large and err may be small, in which case err can
        -- be lost in this subtraction.
        incr = delta - err
        -- total is large and incr may be small, so incr may get rounded
        -- here; the rounding error is captured in err1 and compensated for
        -- in the next step.
        total1 = total + incr
        -- XXX Because err can be lost when computing incr, a more robust
        -- form would be ((total1 - total) - delta) + err.
        err1 = (total1 - total) - incr

    extract (Tuple' total _) = return total
-- | The number of elements in the rolling window.
--
-- The count goes up by one for every input that arrives without an outgoing
-- element and stays unchanged otherwise.
--
{-# INLINE length #-}
length :: (Monad m, Num b) => Fold m (a, Maybe a) b
length = Fold.foldl' step 0
  where
    step n (_, evicted) =
        case evicted of
            Nothing -> n + 1
            Just _ -> n
-------------------------------------------------------------------------------
-- Location
-------------------------------------------------------------------------------
-- Theoretically, we can approximate minimum in a rolling window by using a
-- 'powerMean' with sufficiently large negative power.
--
-- XXX If we need to know the minimum in the window only once in a while then
-- we can use linear search when it is extracted and not pay the cost all the
-- time.
--
-- | The minimum element in a rolling window.
--
-- If you want to compute the minimum of the entire stream Fold.minimum from
-- streamly package would be much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
minimum = Fold step initial extract
  where
    -- State: index of the next input, current window size, and a deque of
    -- (index, value) candidates whose front holds the current minimum.
    initial =
        return
            $ Partial
            $ Tuple3' (0 :: Int) (0 :: Int) (mempty :: DQ.Deque (Int, a))

    step (Tuple3' i w q) (a, ma) =
        return $ Partial $ Tuple3' (i + 1) w1 q1
      where
        -- The window grows only while no element is leaving it.
        w1 =
            case ma of
                Nothing -> w + 1
                Just _ -> w
        q1 = dropExpired i w1 q & pushBack (i, a)

    -- Drop the front candidate once its index has fallen out of the window.
    {-# INLINE dropExpired #-}
    dropExpired i w q =
        case DQ.uncons q of
            Just (front, rest)
                | fst front <= i - w -> rest
            _ -> q

    -- Remove every candidate at the back that is not smaller than the new
    -- element, then append the new element.
    -- XXX This can be improved for the case of `=`
    pushBack new q =
        case DQ.unsnoc q of
            Just (back, rest)
                | snd new <= snd back -> pushBack new rest
            _ -> DQ.snoc new q

    extract (Tuple3' _ _ q) =
        let (_, lowest) = fromMaybe (0, error "min: Empty stream") (DQ.head q)
         in return lowest
-- Theoretically, we can approximate maximum in a rolling window by using a
-- 'powerMean' with sufficiently large positive power.
--
-- | The maximum element in a rolling window.
--
-- If you want to compute the maximum of the entire stream Fold.maximum from
-- streamly package would be much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
maximum = Fold step initial extract
  where
    -- State: index of the next input, current window size, and a deque of
    -- (index, value) candidates whose front holds the current maximum.
    initial =
        return
            $ Partial
            $ Tuple3' (0 :: Int) (0 :: Int) (mempty :: DQ.Deque (Int, a))

    step (Tuple3' i w q) (a, ma) =
        return $ Partial $ Tuple3' (i + 1) w1 q1
      where
        -- The window grows only while no element is leaving it.
        w1 =
            case ma of
                Nothing -> w + 1
                Just _ -> w
        q1 = dropExpired i w1 q & pushBack (i, a)

    -- Drop the front candidate once its index has fallen out of the window.
    {-# INLINE dropExpired #-}
    dropExpired i w q =
        case DQ.uncons q of
            Just (front, rest)
                | fst front <= i - w -> rest
            _ -> q

    -- Remove every candidate at the back that is not larger than the new
    -- element, then append the new element.
    -- XXX This can be improved for the case of `=`
    pushBack new q =
        case DQ.unsnoc q of
            Just (back, rest)
                | snd new >= snd back -> pushBack new rest
            _ -> DQ.snoc new q

    extract (Tuple3' _ _ q) =
        let (_, highest) = fromMaybe (0, error "max: Empty stream") (DQ.head q)
         in return highest
-- | The difference between the maximum and minimum elements of a rolling
-- window.
--
-- >>> range = Fold.teeWith (-) maximum minimum
--
-- If you want to compute the range of the entire stream @Fold.teeWith (-)
-- Fold.maximum Fold.minimum@ from the streamly package would be much faster.
--
-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE range #-}
range :: (Monad m, Num a, Ord a) => Fold m (a, Maybe a) a
range = Fold.teeWith (\hi lo -> hi - lo) maximum minimum

View File

@ -244,6 +244,7 @@ library
, Streamly.Internal.Data.Fold.Step
, Streamly.Internal.Data.Refold.Type
, Streamly.Internal.Data.Fold.Type
, Streamly.Internal.Data.Fold.Window
, Streamly.Internal.Data.Stream.StreamD.Step
, Streamly.Internal.Data.Stream.StreamD.Type
, Streamly.Internal.Data.Stream.StreamDK.Type
@ -335,6 +336,8 @@ library
, transformers-base >= 0.4 && < 0.5
, primitive >= 0.5.4 && < 0.8
, heaps >= 0.3 && < 0.5
, deque >= 0.4 && < 0.5
if flag(use-unliftio)
build-depends: unliftio-core >= 0.2 && < 0.3
else

View File

@ -129,6 +129,7 @@ let haskellPackages =
# Use a better prompt
shellHook = ''
export CABAL_DIR="$(pwd)/.cabal.nix"
cabal user-config update -a "jobs: 1"
if test -n "$PS_SHELL"
then
export PS1="$PS_SHELL\[$bldred\](nix:streamly)\[$txtrst\] "

View File

@ -68,6 +68,8 @@ cradle:
component: "bench:Unicode.Stream"
- path: "./benchmark/lib/"
component: "lib:streamly-benchmarks"
- path: "./benchmark/Streamly/Benchmark/Data/Fold/Window.hs"
component: "bench:Data.Fold.Window"
- path: "./test"
config:
cradle:
@ -134,6 +136,8 @@ cradle:
component: "lib:streamly-tests"
- path: "./test/version-bounds.hs"
component: "test:version-bounds"
- path: "./test/Streamly/Test/Data/Fold/Window.hs"
component: "test:Data.Fold.Window"
dependencies:
- streamly.cabal

View File

@ -63,6 +63,7 @@ extra-source-files:
benchmark/Streamly/Benchmark/Data/Array/Prim.hs
benchmark/Streamly/Benchmark/Data/Array/Prim/Pinned.hs
benchmark/Streamly/Benchmark/Data/Array/Stream/Foreign.hs
benchmark/Streamly/Benchmark/Data/Fold/Window.hs
benchmark/Streamly/Benchmark/Data/Parser/*.hs
benchmark/Streamly/Benchmark/Data/Stream/*.hs
benchmark/Streamly/Benchmark/FileSystem/*.hs
@ -113,6 +114,7 @@ extra-source-files:
test/Streamly/Test/Prelude.hs
test/Streamly/Test/Prelude/*.hs
test/Streamly/Test/Unicode/*.hs
test/Streamly/Test/Data/Fold/*.hs
test/lib/Streamly/Test/Common.hs
test/lib/Streamly/Test/Prelude/Common.hs
test/streamly-tests.cabal
@ -204,6 +206,7 @@ extra-source-files:
core/src/Streamly/Internal/Data/Stream/StreamK.hs
core/src/Streamly/Data/Fold.hs
core/src/Streamly/Data/Fold/Tee.hs
core/src/Streamly/Internal/Data/Fold/Window.hs
core/src/Streamly/Data/Array/Foreign.hs
core/src/Streamly/Internal/Data/Time/Clock/Darwin.c
core/src/Streamly/Internal/Data/Time/Clock/Windows.c

View File

@ -0,0 +1,61 @@
module Streamly.Test.Data.Fold.Window (main) where
import Streamly.Internal.Data.Fold.Window
import Test.Hspec (hspec, describe, it, runIO)
import qualified Streamly.Internal.Data.Ring.Foreign as Ring
import qualified Streamly.Prelude as S
import Prelude hiding (sum, maximum, minimum)
-- | Label passed to the top-level 'describe' in 'main'.
moduleName :: String
moduleName = "Window"
-- | Test entry point for the window folds.
--
-- Two groups of checks:
--
-- * A numerical-stability check for 'sum': the sliding-window result over a
-- long stream must stay within @deviationLimit@ of the same fold run only
-- over the last @winSize@ elements.
-- * "Correctness" checks comparing postscan output, both unbounded (every
-- input paired with 'Nothing') and with a window of size 3, against expected
-- lists.
main :: IO ()
main = hspec $ do
describe moduleName $ do
-- 9007199254740992 is 2^53; adding 1 to it is not representable in a Double,
-- so the input mixes very large and very small values.
let numElem = 80000
winSize = 800
testCaseChunk = [9007199254740992, 1, 1.0 :: Double,
9007199254740992, 1, 1, 1, 9007199254740992]
testCase = take numElem $ cycle testCaseChunk
deviationLimit = 1
testFunc f = do
let c = S.fromList testCase
-- a: fold through a sliding window over the whole stream.
a <- runIO $ S.fold (Ring.slidingWindow winSize f) c
-- b: reference value, the same fold fed only the final winSize elements.
b <- runIO $ S.fold f $ S.drop (numElem - winSize)
$ S.map (, Nothing) c
let c1 = a - b
it ("should not deviate more than " ++ show deviationLimit)
$ c1 >= -1 * deviationLimit && c1 <= deviationLimit
describe "Sum" $ testFunc sum
describe "Correctness" $ do
let winSize = 3
testCase1 = [31, 41, 59, 26, 53, 58, 97] :: [Double]
testCase2 = replicate 5 1.0 ++ [7.0]
-- sI: expected scan with an unbounded window; sW: expected scan with a
-- window of winSize elements.
testFunc tc f sI sW = do
let c = S.fromList tc
a <- runIO $ S.toList $ S.postscan f $ S.map (, Nothing) c
b <- runIO $ S.toList $ S.postscan
(Ring.slidingWindow winSize f) c
it "Infinite" $ a == sI
it ("Finite " ++ show winSize) $ b == sW
describe "minimum" $ do
let scanInf = [31, 31, 31, 26, 26, 26, 26] :: [Double]
scanWin = [31, 31, 31, 26, 26, 26, 53] :: [Double]
testFunc testCase1 minimum scanInf scanWin
describe "maximum" $ do
let scanInf = [31, 41, 59, 59, 59, 59, 97] :: [Double]
scanWin = [31, 41, 59, 59, 59, 58, 97] :: [Double]
testFunc testCase1 maximum scanInf scanWin
describe "range" $ do
let scanInf = [0, 10, 28, 33, 33, 33, 71] :: [Double]
scanWin = [0, 10, 28, 33, 33, 32, 44] :: [Double]
testFunc testCase1 range scanInf scanWin
describe "sum" $ do
let scanInf = [1, 2, 3, 4, 5, 12] :: [Double]
scanWin = [1, 2, 3, 3, 3, 9] :: [Double]
testFunc testCase2 sum scanInf scanWin

View File

@ -251,6 +251,12 @@ test-suite Data.Fold
type: exitcode-stdio-1.0
main-is: Streamly/Test/Data/Fold.hs
test-suite Data.Fold.Window
import: test-options
type: exitcode-stdio-1.0
main-is: Streamly/Test/Data/Fold/Window.hs
ghc-options: -main-is Streamly.Test.Data.Fold.Window.main
test-suite Data.Parser
import: test-options
type: exitcode-stdio-1.0