Use MutableByteArray as the primitive for the ring buffer

This commit is contained in:
Adithya Kumar 2024-07-04 16:59:30 +05:30
parent 390a06f003
commit ca881c620d
12 changed files with 146 additions and 197 deletions

View File

@ -11,7 +11,6 @@
module Main (main) where
import Control.Monad (void)
import GHC.Ptr (Ptr(..))
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Ring as Ring
import qualified Data.Foldable as P
@ -25,10 +24,10 @@ import Prelude as P
-- Benchmark ops
-------------------------------------------------------------------------------
unsafeEqArrayN :: (Int, Array.Array Int, (Ring.Ring Int, Ptr Int)) -> Bool
-- Benchmark wrapper: unpack the benchmark environment tuple and compare the
-- ring, starting at the given head index, against the array.
--
-- NOTE(review): the last parameter of Ring.unsafeEqArrayN is named nBytes in
-- its definition, while the number passed here is the element count used to
-- build the input; confirm that comparing only that many bytes is intended.
unsafeEqArrayN :: (Int, Array.Array Int, (Ring.Ring Int, Int)) -> Bool
unsafeEqArrayN (nBytes, expected, (rb, ringHead)) =
    Ring.unsafeEqArrayN rb ringHead expected nBytes
unsafeEqArray :: (Array.Array Int, (Ring.Ring Int, Ptr Int)) -> Bool
-- Benchmark wrapper: unpack the benchmark environment tuple and compare the
-- whole ring, starting at the given head index, against the array.
unsafeEqArray :: (Array.Array Int, (Ring.Ring Int, Int)) -> Bool
unsafeEqArray (expected, (rb, ringHead)) =
    Ring.unsafeEqArray rb ringHead expected
-------------------------------------------------------------------------------
@ -36,7 +35,7 @@ unsafeEqArray (arr, (ring, rh)) = Ring.unsafeEqArray ring rh arr
-------------------------------------------------------------------------------
o_1_space_serial ::
Int -> Array.Array Int -> (Ring.Ring Int, Ptr Int) -> [Benchmark]
Int -> Array.Array Int -> (Ring.Ring Int, Int) -> [Benchmark]
o_1_space_serial value arr (ring, rh) =
[ bench "unsafeEqArrayN" $ nf unsafeEqArrayN (value, arr, (ring, rh))
, bench "unsafeEqArray" $ nf unsafeEqArray (arr, (ring, rh))
@ -58,12 +57,12 @@ main = do
alloc value = do
let input = [1 .. value] :: [Int]
let arr = Array.fromList input
(ring, rh) <- Ring.new value
void $ P.foldlM (Ring.unsafeInsert ring) rh input
return (arr, (ring, rh))
ring <- Ring.new value
void $ P.foldlM (Ring.unsafeInsert ring) 0 input
return (arr, ring)
allBenchmarks (arr, (ring, rh)) value =
allBenchmarks (arr, ring) value =
[ bgroup
(o_1_space_prefix moduleName)
(o_1_space_serial value arr (ring, rh))
(o_1_space_serial value arr (ring, 0))
]

View File

@ -109,7 +109,6 @@ import Data.Proxy (Proxy(..))
import Data.Word (Word8)
import Foreign.C.String (CString)
import Foreign.Ptr (castPtr)
import Foreign.Storable (Storable)
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.Unbox (Unbox(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)
@ -212,7 +211,7 @@ last = getIndexRev 0
--
{-# INLINE writeLastN #-}
writeLastN ::
(Storable a, Unbox a, MonadIO m) => Int -> Fold m a (Array a)
(Unbox a, MonadIO m) => Int -> Fold m a (Array a)
writeLastN n
| n <= 0 = fmap (const mempty) FL.drain
| otherwise = unsafeFreeze <$> Fold step initial done done
@ -224,7 +223,7 @@ writeLastN n
return $ FL.Partial $ Tuple3Fused' rb rh1 (i + 1)
initial =
let f (a, b) = FL.Partial $ Tuple3Fused' a b (0 :: Int)
let f a = FL.Partial $ Tuple3Fused' a 0 (0 :: Int)
in fmap f $ liftIO $ RB.new n
done (Tuple3Fused' rb rh i) = do

View File

@ -41,3 +41,10 @@
#define INDEX_VALID(i,end,a) i + SIZE_OF(a) <= end
#define INDEX_INVALID(i,end,a) i + SIZE_OF(a) > end
-------------------------------------------------------------------------------
-- Macros to use Unbox with element indices
-------------------------------------------------------------------------------
-- PEEK_ELEM reads and POKE_ELEM writes the element at element index "i" of
-- the byte array "arr"; the byte offset is the index times SIZE_OF(a).
--
-- Every value argument is parenthesised in the expansion so that compound
-- arguments keep their grouping. Without this, an index such as "rh + 1"
-- would expand to "rh + 1 * SIZE_OF(a)" and address the wrong byte offset,
-- and an array argument that is a function application would be split into
-- separate arguments of peekAt/pokeAt.
#define PEEK_ELEM(a,i,arr) peekAt ((i) * SIZE_OF(a)) (arr)
#define POKE_ELEM(a,i,arr,val) pokeAt ((i) * SIZE_OF(a)) (arr) (val)

View File

@ -228,14 +228,13 @@ import Data.Either (isLeft, isRight, fromLeft, fromRight)
import Data.Int (Int64)
import Data.Proxy (Proxy(..))
import Data.Word (Word32)
import Foreign.Storable (Storable, peek)
import Streamly.Internal.Data.Unbox (Unbox(..))
import Streamly.Internal.Data.MutArray.Type (MutArray(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Scan (Scan (..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unbox (Unbox, sizeOf)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import qualified Prelude
@ -1564,7 +1563,7 @@ data SplitOnSeqState acc a rb rh w ck =
--
-- /Pre-release/
{-# INLINE takeEndBySeq #-}
takeEndBySeq :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
takeEndBySeq :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) =>
Array.Array a
-> Fold m a b
-> Fold m a b
@ -1590,8 +1589,8 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(rb, rhead) <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb rhead
rb <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb 0
Done b -> return $ Done b
-- Word pattern related
@ -1664,7 +1663,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
if idx == maxIndex
then do
let fld = Ring.unsafeFoldRing (Ring.ringBound rb)
let fld = Ring.unsafeFoldRing (Ring.ringCapacity rb)
let !ringHash = fld addCksum 0 rb
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
then Done <$> ffinal s1
@ -1676,7 +1675,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
res <- fstep s x
case res of
Partial s1 -> do
old <- liftIO $ peek rh
(old :: a) <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
let ringHash = deltaCksum cksum old x
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
@ -1704,7 +1703,7 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
-- /Pre-release/
--
{-# INLINE takeEndBySeq_ #-}
takeEndBySeq_ :: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
takeEndBySeq_ :: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a) =>
Array.Array a
-> Fold m a b
-> Fold m a b
@ -1731,8 +1730,8 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(rb, rhead) <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb rhead
rb <- liftIO $ Ring.new patLen
return $ Partial $ SplitOnSeqKR acc 0 rb 0
Done b -> return $ Done b
-- Word pattern related
@ -1804,14 +1803,14 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
rh1 <- liftIO $ Ring.unsafeInsert rb rh x
if idx == maxIndex
then do
let fld = Ring.unsafeFoldRing (Ring.ringBound rb)
let fld = Ring.unsafeFoldRing (Ring.ringCapacity rb)
let !ringHash = fld addCksum 0 rb
if ringHash == patHash && Ring.unsafeEqArray rb rh1 patArr
then Done <$> ffinal s
else return $ Partial $ SplitOnSeqKRLoop s ringHash rb rh1
else return $ Partial $ SplitOnSeqKR s (idx + 1) rb rh1
step (SplitOnSeqKRLoop s cksum rb rh) x = do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
res <- fstep s old
case res of
Partial s1 -> do
@ -1841,7 +1840,7 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
if n == 0
then fex s
else do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a, rh, (Ring.ringContents rb))
let rh1 = Ring.advance rb rh
r <- fstep s old
case r of
@ -1853,7 +1852,7 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
SplitOnSeqSingle s _ -> fex s
SplitOnSeqWord s idx wrd -> consumeWord s idx wrd
SplitOnSeqWordLoop s wrd -> consumeWord s patLen wrd
SplitOnSeqKR s idx rb _ -> consumeRing s idx rb (Ring.startOf rb)
SplitOnSeqKR s idx rb _ -> consumeRing s idx rb 0
SplitOnSeqKRLoop s _ rb rh -> consumeRing s patLen rb rh
extract = extractFunc fextract
@ -1888,7 +1887,7 @@ tee = teeWith (,)
-- XXX use "List" instead of "[]"?, use Array for output to scale it to a large
-- number of consumers? For polymorphic case a vector could be helpful. For
-- Storables we can use arrays. Will need separate APIs for those.
-- Unboxed types we can use arrays. Will need separate APIs for those.
--
-- | Distribute one copy of the stream to each fold and collect the results in
-- a container.

View File

@ -59,14 +59,12 @@ where
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor(bimap)
import Foreign.Storable (Storable, peek)
import Streamly.Internal.Data.Unbox (Unbox(..))
import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict
(Tuple'(..), Tuple3Fused' (Tuple3Fused'))
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Ring as Ring
@ -268,7 +266,7 @@ windowPowerSumFrac p = windowLmap (** p) windowSum
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowRange #-}
windowRange :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a))
windowRange :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe (a, a))
windowRange n = Fold step initial extract extract
where
@ -280,7 +278,7 @@ windowRange n = Fold step initial extract extract
if n <= 0
then error "range: window size must be > 0"
else
let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int)
let f a = Partial $ Tuple3Fused' a 0 (0 :: Int)
in fmap f $ liftIO $ Ring.new n
step (Tuple3Fused' rb rh i) a = do
@ -306,7 +304,7 @@ windowRange n = Fold step initial extract extract
-- uninitialized if the ring is not full.
-- The first slot is read with "peekAt" on the ring's MutByteArray, so
-- there is no raw pointer that must be kept alive for this read.
x <- liftIO $ peek (unsafeForeignPtrToPtr (Ring.ringStart rb))
x <- liftIO $ peekAt 0 (Ring.ringContents rb)
let accum (mn, mx) a = return (min mn a, max mx a)
fmap Just $ foldFunc i rh accum (x, x) rb
@ -323,7 +321,7 @@ windowRange n = Fold step initial extract extract
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMinimum #-}
windowMinimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
windowMinimum :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe a)
windowMinimum n = fmap (fmap fst) $ windowRange n
-- | The maximum element in a rolling window.
@ -336,7 +334,7 @@ windowMinimum n = fmap (fmap fst) $ windowRange n
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMaximum #-}
windowMaximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
windowMaximum :: (MonadIO m, Unbox a, Ord a) => Int -> Fold m a (Maybe a)
windowMaximum n = fmap (fmap snd) $ windowRange n
-- | Arithmetic mean of elements in a sliding window:

View File

@ -17,12 +17,10 @@ module Streamly.Internal.Data.Ring
-- * Construction
, new
, newRing
, writeN
, advance
, moveBy
, startOf
-- * Random writes
, unsafeInsert
@ -72,22 +70,19 @@ module Streamly.Internal.Data.Ring
#include "ArrayMacros.h"
#include "inline.hs"
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.Storable
import Foreign.ForeignPtr (ForeignPtr, withForeignPtr, touchForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (plusPtr, minusPtr, castPtr)
import Streamly.Internal.Data.Unbox as Unboxed (Unbox(peekAt))
import GHC.ForeignPtr (mallocPlainForeignPtrAlignedBytes)
import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Unbox as Unboxed (Unbox(..))
import Streamly.Internal.Data.MutArray.Type (MutArray)
import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..), lmap)
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Stream.Step (Step(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.System.IO (unsafeInlineIO)
import Streamly.Internal.Data.MutByteArray.Type (MutByteArray)
import Data.Proxy (Proxy(..))
import qualified Streamly.Internal.Data.MutByteArray.Type as MBA
import qualified Streamly.Internal.Data.MutArray.Type as MA
import qualified Streamly.Internal.Data.Array.Type as A
@ -111,68 +106,38 @@ import Prelude hiding (length, concat, read)
-- structure. We should not leak out references to it for immutable use.
--
data Ring a = Ring
{ ringStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
, ringBound :: {-# UNPACK #-} !(Ptr a) -- first address beyond allocated memory
{ ringContents :: {-# UNPACK #-} !MutByteArray
, ringCapacity :: {-# UNPACK #-} !Int
}
-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------
-- | Get the first address of the ring as a pointer.
startOf :: Ring a -> Ptr a
startOf = unsafeForeignPtrToPtr . ringStart
-- | Create a new ring buffer that can hold @count@ elements and return it.
-- The ring head is an element index tracked by the caller; it starts at 0.
{-# INLINE new #-}
new :: forall a. Storable a => Int -> IO (Ring a, Ptr a)
new :: forall a. Unbox a => Int -> IO (Ring a)
new count = do
let size = count * max 1 (sizeOf (undefined :: a))
fptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: a))
let p = unsafeForeignPtrToPtr fptr
return (Ring
{ ringStart = fptr
, ringBound = p `plusPtr` size
}, p)
-- XXX Rename this to "new".
--
-- | @newRing count@ allocates an empty array that can hold 'count' items. The
-- memory of the array is uninitialized and the allocation is aligned as per
-- the 'Storable' instance of the type.
--
-- /Unimplemented/
{-# INLINE newRing #-}
newRing :: Int -> m (Ring a)
newRing = undefined
arr <- MBA.new (count * SIZE_OF(a))
pure $ Ring arr count
-- | Advance the ringHead by 1 item, wrap around if we hit the end of the
-- array.
{-# INLINE advance #-}
advance :: forall a. Storable a => Ring a -> Ptr a -> Ptr a
advance Ring{..} ringHead =
let ptr = PTR_NEXT(ringHead,a)
in if ptr < ringBound
then ptr
else unsafeForeignPtrToPtr ringStart
-- The ring head is an element index. Step it forward by one slot and wrap
-- to index 0 once the incremented index reaches the ring capacity.
advance :: Ring a -> Int -> Int
advance ring cur
    | next < ringCapacity ring = next
    | otherwise = 0

    where

    next = cur + 1
-- | Move the ringHead by n items. The direction depends on the sign on whether
-- n is positive or negative. Wrap around if we hit the beginning or end of the
-- array.
{-# INLINE moveBy #-}
moveBy :: forall a. Storable a => Int -> Ring a -> Ptr a -> Ptr a
moveBy by Ring {..} ringHead = ringStartPtr `plusPtr` advanceFromHead
where
elemSize = STORABLE_SIZE_OF(a)
ringStartPtr = unsafeForeignPtrToPtr ringStart
lenInBytes = ringBound `minusPtr` ringStartPtr
offInBytes = ringHead `minusPtr` ringStartPtr
len = assert (lenInBytes `mod` elemSize == 0) $ lenInBytes `div` elemSize
off = assert (offInBytes `mod` elemSize == 0) $ offInBytes `div` elemSize
advanceFromHead = (off + by `mod` len) * elemSize
-- The ring head is an element index. Shift it by "by" slots (forward when
-- positive, backward when negative) and reduce the result modulo the ring
-- capacity. Haskell's "mod" takes the sign of the divisor, so with a positive
-- capacity the result is always a valid non-negative index. A capacity of 0
-- makes "mod" raise a divide-by-zero exception.
moveBy :: Int -> Ring a -> Int -> Int
moveBy by rb ringHead = shifted `mod` ringCapacity rb

    where

    shifted = ringHead + by
-- XXX Move the writeLastN from array module here.
--
@ -181,7 +146,7 @@ moveBy by Ring {..} ringHead = ringStartPtr `plusPtr` advanceFromHead
--
-- /Unimplemented/
{-# INLINE writeN #-}
writeN :: -- (Storable a, MonadIO m) =>
writeN :: -- (Unbox a, MonadIO m) =>
Int -> Fold m a (Ring a)
writeN = undefined
@ -200,7 +165,7 @@ fromArray = undefined
-- | Modify a given index of a ring array using a modifier function.
--
-- /Unimplemented/
modifyIndex :: -- forall m a b. (MonadIO m, Storable a) =>
modifyIndex :: -- forall m a b. (MonadIO m, Unbox a) =>
Ring a -> Int -> (a -> (a, b)) -> m b
modifyIndex = undefined
@ -211,7 +176,7 @@ modifyIndex = undefined
--
-- /Unimplemented/
{-# INLINE putIndex #-}
putIndex :: -- (MonadIO m, Storable a) =>
putIndex :: -- (MonadIO m, Unbox a) =>
Ring a -> Int -> a -> m ()
putIndex = undefined
@ -220,17 +185,16 @@ putIndex = undefined
-- because the supplied ringHead index is not verified to be within the Ring.
-- (The ring is backed by a MutByteArray; there is no ringStart ForeignPtr
-- for the caller to keep alive.)
{-# INLINE unsafeInsert #-}
unsafeInsert :: Storable a => Ring a -> Ptr a -> a -> IO (Ptr a)
unsafeInsert :: forall a. Unbox a => Ring a -> Int -> a -> IO Int
unsafeInsert rb ringHead newVal = do
poke ringHead newVal
-- touchForeignPtr (ringStart rb)
return $ advance rb ringHead
pokeAt (ringHead * SIZE_OF(a)) (ringContents rb) newVal
pure $ advance rb ringHead
-- | Insert an item at the head of the ring, when the ring is full this
-- replaces the oldest item in the ring with the new item.
--
-- /Unimplemented/
slide :: -- forall m a. (MonadIO m, Storable a) =>
slide :: -- forall m a. (MonadIO m, Unbox a) =>
Ring a -> a -> m (Ring a)
slide = undefined
@ -242,14 +206,14 @@ slide = undefined
--
-- Unsafe because it does not check the bounds of the ring array.
{-# INLINE_NORMAL getIndexUnsafe #-}
getIndexUnsafe :: -- forall m a. (MonadIO m, Storable a) =>
getIndexUnsafe :: -- forall m a. (MonadIO m, Unbox a) =>
Ring a -> Int -> m a
getIndexUnsafe = undefined
-- | /O(1)/ Lookup the element at the given index. Index starts from 0.
--
{-# INLINE getIndex #-}
getIndex :: -- (MonadIO m, Storable a) =>
getIndex :: -- (MonadIO m, Unbox a) =>
Ring a -> Int -> m a
getIndex = undefined
@ -259,7 +223,7 @@ getIndex = undefined
-- Slightly faster than computing the forward index and using getIndex.
--
{-# INLINE getIndexRev #-}
getIndexRev :: -- (MonadIO m, Storable a) =>
getIndexRev :: -- (MonadIO m, Unbox a) =>
Ring a -> Int -> m a
getIndexRev = undefined
@ -282,7 +246,7 @@ byteLength = undefined
--
-- /Unimplemented/
{-# INLINE length #-}
length :: -- forall a. Storable a =>
length :: -- forall a. Unbox a =>
Ring a -> Int
length = undefined
@ -317,18 +281,16 @@ bytesFree = undefined
--
-- /Internal/
{-# INLINE_NORMAL read #-}
read :: forall m a. (MonadIO m, Storable a) => Unfold m (Ring a, Ptr a, Int) a
read :: forall m a. (MonadIO m, Unbox a) => Unfold m (Ring a, Int, Int) a
read = Unfold step return
where
step (rb, rh, n) = do
if n <= 0
then do
liftIO $ touchForeignPtr (ringStart rb)
return Stop
then return Stop
else do
x <- liftIO $ peek rh
x <- liftIO $ PEEK_ELEM(a, rh, (ringContents rb))
let rh1 = advance rb rh
return $ Yield x (rb, rh1, n - 1)
@ -336,7 +298,7 @@ read = Unfold step return
--
-- /Unimplemented/
{-# INLINE_NORMAL readRev #-}
readRev :: -- forall m a. (MonadIO m, Storable a) =>
readRev :: -- forall m a. (MonadIO m, Unbox a) =>
Unfold m (MutArray a) a
readRev = undefined
@ -352,7 +314,7 @@ readRev = undefined
--
-- /Unimplemented/
{-# INLINE_NORMAL ringsOf #-}
ringsOf :: -- forall m a. (MonadIO m, Storable a) =>
ringsOf :: -- forall m a. (MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (MutArray a)
ringsOf = undefined -- Stream.scan (writeN n)
@ -381,10 +343,10 @@ asBytes = castUnsafe
--
-- /Pre-release/
--
cast :: forall a b. Storable b => Ring a -> Maybe (Ring b)
cast :: forall a b. Unbox b => Ring a -> Maybe (Ring b)
cast arr =
let len = byteLength arr
r = len `mod` STORABLE_SIZE_OF(b)
r = len `mod` SIZE_OF(b)
in if r /= 0
then Nothing
else Just $ castUnsafe arr
@ -399,28 +361,25 @@ cast arr =
-- the ring buffer. This is unsafe because the ringHead index is not checked
-- to be in range.
{-# INLINE unsafeEqArrayN #-}
unsafeEqArrayN :: Ring a -> Ptr a -> A.Array a -> Int -> Bool
unsafeEqArrayN :: forall a. Unbox a => Ring a -> Int -> A.Array a -> Int -> Bool
unsafeEqArrayN Ring{..} rh A.Array{..} nBytes
| nBytes < 0 = error "unsafeEqArrayN: n should be >= 0"
| nBytes == 0 = True
| otherwise = unsafeInlineIO $ check (castPtr rh) 0
| otherwise = unsafeInlineIO $ check (rh * SIZE_OF(a)) 0
where
w8Contents = arrContents
check p i = do
(relem :: Word8) <- peek p
aelem <- peekAt i w8Contents
(relem :: Word8) <- peekAt p ringContents
aelem <- peekAt i arrContents
if relem == aelem
then go (p `plusPtr` 1) (i + 1)
then go (p + 1) (i + 1)
else return False
go p i
| i == nBytes = return True
| castPtr p == ringBound =
go (castPtr (unsafeForeignPtrToPtr ringStart)) i
| castPtr p == rh = touchForeignPtr ringStart >> return True
| p == (ringCapacity * SIZE_OF(a)) = go 0 i
| p == (rh * SIZE_OF(a)) = return True
| otherwise = check p i
-- XXX This is not modular. We should probably just convert the array and the
@ -435,31 +394,31 @@ unsafeEqArrayN Ring{..} rh A.Array{..} nBytes
-- supplied array must be equal to or bigger than the ringBuffer, ARRAY BOUNDS
-- ARE NOT CHECKED.
{-# INLINE unsafeEqArray #-}
unsafeEqArray :: Ring a -> Ptr a -> A.Array a -> Bool
unsafeEqArray :: forall a. Unbox a => Ring a -> Int -> A.Array a -> Bool
unsafeEqArray Ring{..} rh A.Array{..} =
unsafeInlineIO $ check (castPtr rh) 0
unsafeInlineIO $ check (rh * SIZE_OF(a)) 0
where
w8Contents = arrContents
check p i = do
(relem :: Word8) <- peek p
aelem <- peekAt i w8Contents
(relem :: Word8) <- peekAt p ringContents
aelem <- peekAt i arrContents
if relem == aelem
then go (p `plusPtr` 1) (i + 1)
then go (p + 1) (i + 1)
else return False
go p i
| castPtr p ==
ringBound = go (castPtr (unsafeForeignPtrToPtr ringStart)) i
| castPtr p == rh = touchForeignPtr ringStart >> return True
| p == (ringCapacity * SIZE_OF(a)) = go 0 i
| p == (rh * SIZE_OF(a)) = return True
| otherwise = check p i
-------------------------------------------------------------------------------
-- Folding
-------------------------------------------------------------------------------
-- XXX How does repeated multiplication affect performance? Should we track the
-- byte index instead?
-- XXX We can unfold it into a stream and fold the stream instead.
-- XXX use MonadIO
--
@ -469,39 +428,30 @@ unsafeEqArray Ring{..} rh A.Array{..} =
--
-- Unsafe because the supplied length is not checked to be in range.
{-# INLINE unsafeFoldRing #-}
unsafeFoldRing :: forall a b. Storable a
=> Ptr a -> (b -> a -> b) -> b -> Ring a -> b
unsafeFoldRing ptr f z Ring{..} =
let !res = unsafeInlineIO $ withForeignPtr ringStart $ \p ->
go z p ptr
unsafeFoldRing :: forall a b. Unbox a
=> Int -> (b -> a -> b) -> b -> Ring a -> b
unsafeFoldRing len f z Ring{..} =
let !res = unsafeInlineIO $ go z 0 len
in res
where
go !acc !p !q
| p == q = return acc
| otherwise = do
x <- peek p
go (f acc x) (PTR_NEXT(p,a)) q
-- XXX Can we remove MonadIO here?
withForeignPtrM :: MonadIO m => ForeignPtr a -> (Ptr a -> m b) -> m b
withForeignPtrM fp fn = do
r <- fn $ unsafeForeignPtrToPtr fp
liftIO $ touchForeignPtr fp
return r
x <- PEEK_ELEM(a, p, ringContents)
go (f acc x) (p + 1) q
-- | Like unsafeFoldRing but with a monadic step function.
{-# INLINE unsafeFoldRingM #-}
unsafeFoldRingM :: forall m a b. (MonadIO m, Storable a)
=> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingM ptr f z Ring {..} =
withForeignPtrM ringStart $ \x -> go z x ptr
unsafeFoldRingM :: forall m a b. (MonadIO m, Unbox a)
=> Int -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingM len f z Ring {..} = go z 0 len
where
go !acc !start !end
| start == end = return acc
| otherwise = do
let !x = unsafeInlineIO $ peek start
let !x = unsafeInlineIO $ PEEK_ELEM(a, start, ringContents)
acc1 <- f acc x
go acc1 (PTR_NEXT(start,a)) end
go acc1 (start + 1) end
-- | Fold the entire length of a ring buffer starting at the supplied ringHead
-- index. Assuming the supplied ringHead index points to the oldest item,
@ -511,13 +461,12 @@ unsafeFoldRingM ptr f z Ring {..} =
-- Note, this will crash on ring of 0 size.
--
{-# INLINE unsafeFoldRingFullM #-}
unsafeFoldRingFullM :: forall m a b. (MonadIO m, Storable a)
=> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingFullM rh f z rb@Ring {..} =
withForeignPtrM ringStart $ \_ -> go z rh
unsafeFoldRingFullM :: forall m a b. (MonadIO m, Unbox a)
=> Int -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingFullM rh f z rb@Ring {..} = go z rh
where
go !acc !start = do
let !x = unsafeInlineIO $ peek start
let !x = unsafeInlineIO $ PEEK_ELEM(a, start, ringContents)
acc' <- f acc x
let ptr = advance rb start
if ptr == rh
@ -530,16 +479,15 @@ unsafeFoldRingFullM rh f z rb@Ring {..} =
-- Note, this will crash on ring of 0 size.
--
{-# INLINE unsafeFoldRingNM #-}
unsafeFoldRingNM :: forall m a b. (MonadIO m, Storable a)
=> Int -> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingNM count rh f z rb@Ring {..} =
withForeignPtrM ringStart $ \_ -> go count z rh
unsafeFoldRingNM :: forall m a b. (MonadIO m, Unbox a)
=> Int -> Int -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingNM count rh f z rb@Ring {..} = go count z rh
where
go 0 acc _ = return acc
go !n !acc !start = do
let !x = unsafeInlineIO $ peek start
let !x = unsafeInlineIO $ PEEK_ELEM(a, start, ringContents)
acc' <- f acc x
let ptr = advance rb start
if ptr == rh || n == 0
@ -556,7 +504,7 @@ data Tuple4' a b c d = Tuple4' !a !b !c !d deriving Show
-- a))@ action depends on when it is executed. It does not capture the snapshot
-- of the ring at a particular time.
{-# INLINE slidingWindowWith #-}
slidingWindowWith :: forall m a b. (MonadIO m, Storable a, Unbox a)
slidingWindowWith :: forall m a b. (MonadIO m, Unbox a)
=> Int -> Fold m ((a, Maybe a), m (MutArray a)) b -> Fold m a b
slidingWindowWith n (Fold step1 initial1 extract1 final1) =
Fold step initial extract final
@ -568,10 +516,10 @@ slidingWindowWith n (Fold step1 initial1 extract1 final1) =
then error "Window size must be > 0"
else do
r <- initial1
(rb, rh) <- liftIO $ new n
rb <- liftIO $ new n
return $
case r of
Partial s -> Partial $ Tuple4' rb rh (0 :: Int) s
Partial s -> Partial $ Tuple4' rb 0 (0 :: Int) s
Done b -> Done b
toArray foldRing rb rh = do
@ -583,17 +531,15 @@ slidingWindowWith n (Fold step1 initial1 extract1 final1) =
step (Tuple4' rb rh i st) a
    -- The ring is still filling up: @i@ elements have been inserted so far
    -- and there is no old element to report.
    | i < n = do
        rh1 <- liftIO $ unsafeInsert rb rh a
        -- The head starts at index 0 and moves one slot per insert, so
        -- after this insert the valid elements occupy indices [0 .. rh] and
        -- the snapshot has to fold @rh + 1@ elements from the start.
        --
        -- @rh1@ must not be used as the length: 'advance' wraps it to 0 on
        -- the insert that fills the last free slot, and 'unsafeFoldRingM'
        -- with a length of 0 folds nothing, yielding an empty array exactly
        -- when the window first becomes full.
        --
        -- NOTE(review): this assumes 'toArray' forwards its last argument
        -- as the first argument of the supplied fold - confirm.
        let action = toArray unsafeFoldRingM rb (rh + 1)
        r <- step1 st ((a, Nothing), action)
        return $
            case r of
                Partial s -> Partial $ Tuple4' rb rh1 (i + 1) s
                Done b -> Done b
| otherwise = do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a, rh, (ringContents rb))
rh1 <- liftIO $ unsafeInsert rb rh a
liftIO $ touchForeignPtr (ringStart rb)
r <- step1 st ((a, Just old), toArray unsafeFoldRingFullM rb rh1)
return $
case r of
@ -614,6 +560,6 @@ slidingWindowWith n (Fold step1 initial1 extract1 final1) =
-- there is no old element.
--
{-# INLINE slidingWindow #-}
slidingWindow :: forall m a b. (MonadIO m, Storable a, Unbox a)
slidingWindow :: forall m a b. (MonadIO m, Unbox a)
=> Int -> Fold m (a, Maybe a) b -> Fold m a b
slidingWindow n f = slidingWindowWith n (lmap fst f)

View File

@ -67,7 +67,6 @@ where
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Storable (Storable)
import GHC.Exts (SpecConstrAnnotation(..))
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.Parser (ParseError(..))
@ -772,7 +771,7 @@ stripPrefix (Stream stepa ta) (Stream stepb tb) = go SPEC Nothing' ta tb
-- /Requires 'Unbox' constraint/
--
{-# INLINE isInfixOf #-}
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a)
isInfixOf :: (MonadIO m, Eq a, Enum a, Unbox a)
=> Stream m a -> Stream m a -> m Bool
isInfixOf infx stream = do
arr <- fold Array.write infx

View File

@ -159,7 +159,7 @@ import Control.Monad.IO.Class (MonadIO(..))
import Data.Bits (shiftR, shiftL, (.|.), (.&.))
import Data.Proxy (Proxy(..))
import Data.Word (Word32)
import Foreign.Storable (Storable, peek)
import Streamly.Internal.Data.Unbox (Unbox(..))
import Fusion.Plugin.Types (Fuse(..))
import GHC.Types (SPEC(..))
@ -167,7 +167,6 @@ import Streamly.Internal.Data.Array.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Parser (ParseError(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Unbox (Unbox, sizeOf)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import qualified Streamly.Internal.Data.Array.Type as A
@ -2110,7 +2109,7 @@ data SplitOnSeqState rb rh ck w fs s b x =
{-# INLINE_NORMAL splitOnSeq #-}
splitOnSeq
:: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a)
:: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a)
=> Array a
-> Fold m a b
-> Stream m a
@ -2174,8 +2173,8 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
<= sizeOf (Proxy :: Proxy Word)
then return $ Skip $ SplitOnSeqWordInit acc state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSeqKRInit 0 acc state rb rhead
rb <- liftIO $ RB.new patLen
skip $ SplitOnSeqKRInit 0 acc state rb 0
FL.Done b -> skip $ SplitOnSeqYield b SplitOnSeqInit
stepOuter _ (SplitOnSeqYield x next) = return $ Yield x next
@ -2310,7 +2309,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
rh1 <- liftIO $ RB.unsafeInsert rb rh x
if idx == maxIndex
then do
let fld = RB.unsafeFoldRing (RB.ringBound rb)
let fld = RB.unsafeFoldRing (RB.ringCapacity rb)
let !ringHash = fld addCksum 0 rb
if ringHash == patHash
then skip $ SplitOnSeqKRCheck fs s rb rh1
@ -2318,7 +2317,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
else skip $ SplitOnSeqKRInit (idx + 1) fs s rb rh1
Skip s -> skip $ SplitOnSeqKRInit idx fs s rb rh
Stop -> do
skip $ SplitOnSeqKRDone idx fs rb (RB.startOf rb)
skip $ SplitOnSeqKRDone idx fs rb 0
-- XXX The recursive "go" is more efficient than the state based recursion
-- code commented out below. Perhaps its more efficient because of
@ -2333,7 +2332,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a,rh,(RB.ringContents rb))
let cksum1 = deltaCksum cksum old x
r <- fstep fs old
case r of
@ -2343,7 +2342,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
then skip $ SplitOnSeqKRCheck fs1 s rb rh1
else go SPEC fs1 s rh1 cksum1
FL.Done b -> do
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC fs s rh cksum
@ -2379,7 +2378,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
if RB.unsafeEqArray rb rh patArr
then do
r <- final fs
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSeqKRInit 0 c st rb rst
yieldProceed jump r
else skip $ SplitOnSeqKRLoop fs st rb rh patHash
@ -2388,7 +2387,7 @@ splitOnSeq patArr (Fold fstep initial _ final) (Stream step state) =
r <- final fs
skip $ SplitOnSeqYield r SplitOnSeqDone
stepOuter _ (SplitOnSeqKRDone n fs rb rh) = do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a,rh,(RB.ringContents rb))
let rh1 = RB.advance rb rh
r <- fstep fs old
case r of
@ -2423,7 +2422,7 @@ data SplitOnSuffixSeqState rb rh ck w fs s b x =
{-# INLINE_NORMAL splitOnSuffixSeq #-}
splitOnSuffixSeq
:: forall m a b. (MonadIO m, Storable a, Unbox a, Enum a, Eq a)
:: forall m a b. (MonadIO m, Unbox a, Enum a, Eq a)
=> Bool
-> Array a
-> Fold m a b
@ -2507,8 +2506,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
<= sizeOf (Proxy :: Proxy Word)
then skip $ SplitOnSuffixSeqWordInit fs state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSuffixSeqKRInit 0 fs state rb rhead
rb <- liftIO $ RB.new patLen
skip $ SplitOnSuffixSeqKRInit 0 fs state rb 0
FL.Done fb -> skip $ SplitOnSuffixSeqYield fb SplitOnSuffixSeqInit
stepOuter _ (SplitOnSuffixSeqYield x next) = return $ Yield x next
@ -2663,7 +2662,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
FL.Partial fs1 ->
skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1
FL.Done b -> do
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> skip $ SplitOnSuffixSeqKRInit idx0 fs s rb rh0
@ -2685,14 +2684,14 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
if idx /= maxIndex
then go SPEC (idx + 1) rh1 s fs1
else skip $
let fld = RB.unsafeFoldRing (RB.ringBound rb)
let fld = RB.unsafeFoldRing (RB.ringCapacity rb)
!ringHash = fld addCksum 0 rb
in if ringHash == patHash
then SplitOnSuffixSeqKRCheck fs1 s rb rh1
else SplitOnSuffixSeqKRLoop
fs1 s rb rh1 ringHash
FL.Done b -> do
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC idx rh s fs
@ -2704,7 +2703,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
then do
r <- final fs
skip $ SplitOnSuffixSeqYield r SplitOnSuffixSeqDone
else skip $ SplitOnSuffixSeqKRDone idx fs rb (RB.startOf rb)
else skip $ SplitOnSuffixSeqKRDone idx fs rb 0
stepOuter gst (SplitOnSuffixSeqKRLoop fs0 st0 rb rh0 cksum0) =
go SPEC fs0 st0 rh0 cksum0
@ -2715,7 +2714,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
res <- step (adaptState gst) st
case res of
Yield x s -> do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a,rh,(RB.ringContents rb))
rh1 <- liftIO (RB.unsafeInsert rb rh x)
let cksum1 = deltaCksum cksum old x
r <- if withSep then fstep fs x else fstep fs old
@ -2725,7 +2724,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
then go SPEC fs1 s rh1 cksum1
else skip $ SplitOnSuffixSeqKRCheck fs1 s rb rh1
FL.Done b -> do
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC fs s rh cksum
@ -2742,7 +2741,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
if RB.unsafeEqArray rb rh patArr
then do
r <- final fs
let rst = RB.startOf rb
let rst = 0
jump c = SplitOnSuffixSeqKRInit 0 c st rb rst
yieldProceed jump r
else skip $ SplitOnSuffixSeqKRLoop fs st rb rh patHash
@ -2751,7 +2750,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial _ final) (Stream step state)
r <- final fs
skip $ SplitOnSuffixSeqYield r SplitOnSuffixSeqDone
stepOuter _ (SplitOnSuffixSeqKRDone n fs rb rh) = do
old <- liftIO $ peek rh
old <- liftIO $ PEEK_ELEM(a,rh,(RB.ringContents rb))
let rh1 = RB.advance rb rh
r <- fstep fs old
case r of

View File

@ -1,5 +1,6 @@
{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
-- |
-- Module : Streamly.Internal.Data.Stream.IsStream.Common

View File

@ -1,4 +1,5 @@
{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
-- |
-- Module : Streamly.Internal.Data.Stream.IsStream.Reduce

View File

@ -21,7 +21,8 @@ import Test.Hspec as H
-- | Test helper for 'Ring.unsafeEqArrayN'.
--
-- Builds an array from @lstArr@, creates a ring sized to @length lstRing@,
-- inserts every element of @lstRing@ into it, and then expects
-- 'Ring.unsafeEqArrayN' (called with the ring head moved by @startR@, the
-- array, and @nelem@) to equal @expected@.
unsafeEqArrayN :: [Int] -> [Int] -> Int -> Int -> Bool -> IO ()
unsafeEqArrayN lstArr lstRing startR nelem expected = do
    let arr = Array.fromList lstArr
    -- NOTE(review): the next line is the earlier form of the setup, where
    -- 'Ring.new' returned the ring together with its head.
    (ring, rh) <- Ring.new (length lstRing)
    -- The two lines below are the replacement form: 'Ring.new' returns only
    -- the ring and the head is the literal 0.
    ring <- Ring.new (length lstRing)
    let rh = 0
    -- The head returned by the inserts is discarded; the comparison below
    -- is made relative to the initial head @rh@.
    void $ P.foldlM (Ring.unsafeInsert ring) rh lstRing
    Ring.unsafeEqArrayN ring (Ring.moveBy startR ring rh) arr nelem
        `shouldBe` expected
@ -29,7 +30,8 @@ unsafeEqArrayN lstArr lstRing startR nelem expected = do
-- | Test helper for 'Ring.unsafeEqArray'.
--
-- Builds an array from @lstArr@, creates a ring sized to @length lstRing@,
-- inserts every element of @lstRing@ into it, and then expects
-- 'Ring.unsafeEqArray' (called with the ring head moved by @startR@ and the
-- array) to equal @expected@.
unsafeEqArray :: [Int] -> [Int] -> Int -> Bool -> IO ()
unsafeEqArray lstArr lstRing startR expected = do
    let arr = Array.fromList lstArr
    -- NOTE(review): the next line is the earlier form of the setup, where
    -- 'Ring.new' returned the ring together with its head.
    (ring, rh) <- Ring.new (length lstRing)
    -- The two lines below are the replacement form: 'Ring.new' returns only
    -- the ring and the head is the literal 0.
    ring <- Ring.new (length lstRing)
    let rh = 0
    -- The head returned by the inserts is discarded; the comparison below
    -- is made relative to the initial head @rh@.
    void $ P.foldlM (Ring.unsafeInsert ring) rh lstRing
    Ring.unsafeEqArray ring (Ring.moveBy startR ring rh) arr
        `shouldBe` expected

View File

@ -25,7 +25,6 @@ import Data.List (sort, group, intercalate)
import Data.Maybe ( isJust, fromJust )
import Data.Word (Word8)
import Data.Semigroup (Sum(..), getSum)
import Foreign.Storable (Storable)
import Streamly.Internal.Data.MutByteArray (Unbox)
import Test.Hspec.QuickCheck
import Test.QuickCheck
@ -80,11 +79,11 @@ splitOnSuffix :: Monad m =>
(a -> Bool) -> FL.Fold m a b -> S.Stream m a -> S.Stream m b
splitOnSuffix predicate f = S.foldMany (FL.takeEndBy_ predicate f)
-- | Test wrapper that runs the fold @FL.takeEndBySeq_ patt f@ over the
-- stream @m@ using 'IS.foldManyPost'.
--
-- NOTE(review): two signatures are shown; the first carries a @Storable a@
-- constraint and the second is the same signature without it.
splitOnSeq' :: (MonadIO m, Unbox a, Storable a, Enum a, Eq a) =>
splitOnSeq' :: (MonadIO m, Unbox a, Enum a, Eq a) =>
    A.Array a -> FL.Fold m a b -> S.Stream m a -> S.Stream m b
splitOnSeq' patt f m = IS.foldManyPost (FL.takeEndBySeq_ patt f) m
-- | Test wrapper that runs the fold @FL.takeEndBySeq_ patt f@ over the
-- stream @m@ using 'S.foldMany'.
--
-- NOTE(review): two signatures are shown; the first carries a @Storable a@
-- constraint and the second is the same signature without it.
splitOnSuffixSeq' :: (MonadIO m, Unbox a, Storable a, Enum a, Eq a) =>
splitOnSuffixSeq' :: (MonadIO m, Unbox a, Enum a, Eq a) =>
    A.Array a -> FL.Fold m a b -> S.Stream m a -> S.Stream m b
splitOnSuffixSeq' patt f m = S.foldMany (FL.takeEndBySeq_ patt f) m
@ -154,7 +153,7 @@ splitOnSuffixSeq = do
splitOnSuffixSeq' (A.fromList pat) FL.toList (S.fromList xs)
splitterProperties ::
forall a. (Arbitrary a, Eq a, Show a, Storable a, Unbox a, Enum a)
forall a. (Arbitrary a, Eq a, Show a, Unbox a, Enum a)
=> a
-> String
-> Spec