Add lastN combinator

This commit is contained in:
adithyaov 2019-12-06 11:53:17 +05:30 committed by Harendra Kumar
parent b83c2c28fc
commit 46643d15b3
3 changed files with 30 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import qualified Streamly.Memory.Array as A
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Sink as Sink
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Internal.Prelude as IP
import qualified Streamly.Internal.Data.Pipe as Pipe
@ -200,6 +201,9 @@ main =
-- this is too low and causes all benchmarks reported in ns
-- , benchIOSink "head" Ops.head
, benchIOSink "last" Ops.last
, benchIOSink "lastN.1" (S.fold (IA.lastN 1))
, benchIOSink "lastN.10" (S.fold (IA.lastN 10))
, benchIOSink "lastN.Max" (S.fold (IA.lastN Ops.maxValue))
-- , benchIOSink "lookup" Ops.lookup
, benchIOSink "find" Ops.find
, benchIOSink "findIndex" Ops.findIndex

View File

@ -305,6 +305,10 @@ module Streamly.Internal.Data.Stream.StreamD
, mkParallel
, applyParallel
, foldParallel
-- XXX Move this later
-- XXX Exported from Array again
, lastN
)
where
@ -346,6 +350,7 @@ import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple3'(..))
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
@ -4013,3 +4018,21 @@ tapAsync f (Stream step1 state1) = Stream step TapInit
Yield a s -> Yield a (TapDone s)
Skip s -> Skip (TapDone s)
Stop -> Stop
-- XXX Is there room for improvement?
-- | Take last 'n' elements from the stream and discard the rest.
{-# INLINABLE lastN #-}
lastN :: (Storable a, MonadIO m) => Int -> Fold m a (Array a)
lastN n = Fold step initial done
where
step (Tuple3' rb rh i) a = do
rh1 <- liftIO $ RB.unsafeInsert rb rh a
return $ Tuple3' rb rh1 (i + 1)
initial = fmap (\(a, b) -> Tuple3' a b 0) $ liftIO $ RB.new n
done (Tuple3' rb rh i) = do
arr <- liftIO $ A.newArray n
foldFunc i rh snoc' arr rb
snoc' b a = liftIO $ A.unsafeSnoc b a
foldFunc i
| i < n = RB.unsafeFoldRingM
| otherwise = RB.unsafeFoldRingFullM

View File

@ -124,6 +124,9 @@ module Streamly.Internal.Memory.Array
-- * Folding Arrays
, streamFold
, fold
-- * Folds with Array as the container
, D.lastN
)
where