mirror of
https://github.com/composewell/streamly.git
synced 2024-10-26 19:50:19 +03:00
DUMMY BRANCH: DONT EVER MERGE ANYTHING FROM THIS PATH
This commit is contained in:
parent
a75d7345e6
commit
4d51888fd6
@ -321,6 +321,8 @@ main =
|
||||
, benchIOSink "tap" (Ops.tap 1)
|
||||
, benchIOSink "tapAsync" (Ops.tapAsync 1)
|
||||
, benchIOSink "tapAsyncS" (Ops.tapAsyncS 1)
|
||||
, bench "reassembleBy_Fav" $ nfIO $ randomRIO (5,5) >>= Ops.favBench
|
||||
, bench "reassembleBy_Unfav" $ nfIO $ randomRIO (5,5) >>= Ops.unfavBench
|
||||
]
|
||||
, bgroup "transformationX4"
|
||||
[ benchIOSink "scan" (Ops.scan 4)
|
||||
|
@ -33,7 +33,8 @@ import Prelude
|
||||
(Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=), (==), (>=),
|
||||
subtract, undefined, Maybe(..), odd, Bool, not, (>>=), mapM_, curry,
|
||||
maxBound, div, IO, compare, Double, fromIntegral, Integer, (<$>),
|
||||
(<*>), flip, (**), (/))
|
||||
(<*>), flip, (**), (/), Bounded(..), Num(..), Eq, Ord, Integral(..), print,
|
||||
(=<<))
|
||||
import qualified Prelude as P
|
||||
import qualified Data.Foldable as F
|
||||
import qualified GHC.Exts as GHC
|
||||
@ -936,3 +937,45 @@ foldableSum = P.sum
|
||||
{-# INLINE traversableMapM #-}
|
||||
traversableMapM :: Stream Identity Int -> IO (Stream Identity Int)
|
||||
traversableMapM = P.mapM return
|
||||
|
||||
-- Benchmark for reassembleBy
|
||||
newtype BoundedZ = BoundedZ Int deriving (Eq, Ord)
|
||||
|
||||
instance Num BoundedZ where
|
||||
(BoundedZ x) + (BoundedZ y) = BoundedZ (x + y)
|
||||
(BoundedZ x) * (BoundedZ y) = BoundedZ (x * y)
|
||||
(BoundedZ x) - (BoundedZ y) = BoundedZ (x - y)
|
||||
fromInteger = BoundedZ . fromIntegral
|
||||
|
||||
instance Bounded BoundedZ where
|
||||
minBound = BoundedZ 0
|
||||
maxBound = BoundedZ 100000000
|
||||
|
||||
diff :: BoundedZ -> BoundedZ -> Int
|
||||
diff (BoundedZ x) (BoundedZ y) = x - y
|
||||
|
||||
favBench :: Int -> IO ()
|
||||
favBench i = S.drain $ Internal.reassembleBy i diff favSrc
|
||||
where
|
||||
favSrc = S.unfoldr step (BoundedZ 0)
|
||||
where
|
||||
step cnt =
|
||||
if cnt > BoundedZ value
|
||||
then Nothing
|
||||
else Just (cnt, cnt + 1)
|
||||
|
||||
unfavBench :: Int -> IO ()
|
||||
unfavBench i = S.drain $ Internal.reassembleBy i diff unfavSrc
|
||||
where
|
||||
unfavSrc = S.unfoldr step (BoundedZ (i - 1))
|
||||
where
|
||||
step cnt
|
||||
| cnt > BoundedZ value = Nothing
|
||||
| unb cnt `rem` i == 0 = Just (cnt, cnt + 2 * BoundedZ i - 1)
|
||||
| P.otherwise = Just (cnt, cnt - 1)
|
||||
unb (BoundedZ x) = x
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -298,7 +298,6 @@ module Streamly.Internal.Data.Stream.StreamD
|
||||
, applyParallel
|
||||
, foldParallel
|
||||
|
||||
-- XXX Shift/Change?
|
||||
-- * Reordering
|
||||
, reassembleBy
|
||||
)
|
||||
@ -3855,68 +3854,48 @@ tapAsync f (Stream step1 state1) = Stream step TapInit
|
||||
-- Reorder in sequence
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- XXX Add tests and benchmarks
|
||||
-- XXX Replace heap with a list
|
||||
-- Buffer until the next element in sequence arrives. The function argument
|
||||
-- determines the difference in sequence numbers. This could be useful in
|
||||
-- implementing sequenced streams, for example, TCP reassembly.
|
||||
{-# INLINE reassembleBy #-}
|
||||
{-# INLINE_NORMAL reassembleBy #-}
|
||||
reassembleBy
|
||||
:: (Bounded a, Monad m)
|
||||
=> Int
|
||||
-> (a -> a -> Int)
|
||||
-> Stream m a
|
||||
-> Stream m a
|
||||
reassembleBy sz diff (Stream step state) = Stream step' state'
|
||||
reassembleBy sz diff (Stream step state) = Stream step' (H.empty, Nothing, state)
|
||||
where
|
||||
state' = (H.empty, Nothing, state)
|
||||
{-# INLINE_LATE step' #-}
|
||||
step' _ (h, Nothing, s) = do
|
||||
r <- step defState s
|
||||
case r of
|
||||
Yield a s' -> case diff a minBound of
|
||||
0 -> return $ Yield a (h, Just a, s')
|
||||
x | x < sz -> return $ Skip (H.insert (Entry x a) h, Nothing, s')
|
||||
| otherwise -> return $ Skip (h, Nothing, s')
|
||||
Skip s' -> return $ Skip (h, Nothing, s')
|
||||
Stop -> return Stop
|
||||
step' _ (h, Just c, s) = do
|
||||
r <- step defState s
|
||||
case r of
|
||||
Yield a s' -> case diff a c of
|
||||
0 -> return $ Skip (h, Just c, s')
|
||||
1 -> return $ Yield a (h, Just a, s')
|
||||
x | x < 0 -> return $ Skip (h, Just c, s')
|
||||
| x < sz ->
|
||||
let y = diff a minBound in
|
||||
case view of
|
||||
Just (Entry _ payH, delH) ->
|
||||
r <- step defState s
|
||||
case r of
|
||||
Yield a s' -> case diff a minBound of
|
||||
0 -> return $ Yield a (h, Just a, s')
|
||||
x | x < sz -> return $ Skip (hi x a h, Nothing, s')
|
||||
| otherwise -> return $ Skip (h, Nothing, s')
|
||||
Skip s' -> return $ Skip (h, Nothing, s')
|
||||
Stop -> return Stop
|
||||
step' _ (h, Just c, s) = exhaustH (h, c, s)
|
||||
hi p v = H.insert (Entry p v)
|
||||
{-# INLINE_EARLY processS #-}
|
||||
processS (h, c, s) = do
|
||||
r <- step defState s
|
||||
case r of
|
||||
Yield a s' ->
|
||||
return $ case diff a c of
|
||||
x | x <= 0 -> Skip (h, Just c, s')
|
||||
1 -> Yield a (h, Just a, s')
|
||||
x | x <= sz -> Skip (hi (diff a minBound) a h, Just c, s')
|
||||
_ -> Skip (h, Just c, s')
|
||||
Skip s' -> return $ Skip (h, Just c, s')
|
||||
Stop -> return Stop
|
||||
exhaustH (h, c, s) =
|
||||
case H.uncons h of
|
||||
Nothing -> processS (h, c, s)
|
||||
Just (Entry _ payH, delH) ->
|
||||
case diff payH c of
|
||||
0 -> return $ Skip (H.insert (Entry y a) delH, Just c, s')
|
||||
1 -> return $ Yield payH (H.insert (Entry y a) delH, Just payH, s')
|
||||
-- XXX Condition required?
|
||||
x_ | x_ < 0 -> return $ Skip (H.insert (Entry y a) delH, Just c, s')
|
||||
_ -> return $ Skip (H.insert (Entry y a) h, Just c, s')
|
||||
_ -> return $ Skip (h, Just c, s')
|
||||
| otherwise -> return $ Skip (h, Just c, s')
|
||||
Skip s' ->
|
||||
case view of
|
||||
Just (Entry _ payH, delH) ->
|
||||
case diff payH c of
|
||||
0 -> return $ Skip (delH, Just c, s')
|
||||
1 -> return $ Yield payH (delH, Just payH, s')
|
||||
-- XXX Condition required?
|
||||
x | x < 0 -> return $ Skip (delH, Just c, s')
|
||||
_ -> return $ Skip (h, Just c, s')
|
||||
_ -> return $ Skip (h, Just c, s')
|
||||
Stop ->
|
||||
case view of
|
||||
Just (Entry _ payH, delH) ->
|
||||
case diff payH c of
|
||||
0 -> return $ Skip (delH, Just c, s)
|
||||
1 -> return $ Yield payH (delH, Just payH, s)
|
||||
-- XXX Condition required?
|
||||
x | x < 0 -> return $ Skip (delH, Just c, s)
|
||||
-- XXX Do we want to yeild the rest?
|
||||
_ -> return Stop
|
||||
_ -> return Stop
|
||||
where
|
||||
view = H.uncons h
|
||||
x | x <= 0 -> return $ Skip (delH, Just c, s)
|
||||
1 -> return $ Yield payH (delH, Just payH, s)
|
||||
_ -> processS (h, c, s)
|
||||
|
@ -420,7 +420,7 @@ module Streamly.Internal.Prelude
|
||||
, fromHandle
|
||||
, toHandle
|
||||
|
||||
-- XXX Shift/Change?
|
||||
-- XXX Change the position
|
||||
-- * Reordering
|
||||
, reassembleBy
|
||||
)
|
||||
@ -3533,7 +3533,6 @@ splitInnerBySuffix splitter joiner xs =
|
||||
-- Reorder in sequence
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
|
||||
-- Buffer until the next element in sequence arrives. The function argument
|
||||
-- determines the difference in sequence numbers. This could be useful in
|
||||
-- implementing sequenced streams, for example, TCP reassembly.
|
||||
|
Loading…
Reference in New Issue
Block a user