diff --git a/.hlint.ignore b/.hlint.ignore index 918028b13..ef696f0bc 100644 --- a/.hlint.ignore +++ b/.hlint.ignore @@ -14,7 +14,7 @@ src/Streamly/Internal/Data/Stream/StreamD.hs src/Streamly/Internal/Data/Pipe/Types.hs src/Streamly/Internal/Data/SmallArray/Types.hs src/Streamly/Internal/Data/Unicode/Stream.hs -src/Streamly/Internal/Mutable/Prim/Var.hs +src/Streamly/Internal/Data/IORef/Prim.hs src/Streamly/Internal/Data/Array/Prim/Types.hs src/Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs src/Streamly/Internal/Data/Array/Prim/TypesInclude.hs diff --git a/src/Streamly/Internal/Data/IORef/Prim.hs b/src/Streamly/Internal/Data/IORef/Prim.hs new file mode 100644 index 000000000..6a4a1f311 --- /dev/null +++ b/src/Streamly/Internal/Data/IORef/Prim.hs @@ -0,0 +1,84 @@ +{-# LANGUAGE UnboxedTuples #-} + +#include "inline.hs" + +-- | +-- Module : Streamly.Internal.Data.IORef.Prim +-- Copyright : (c) 2019 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +-- A mutable variable in a mutation capable monad (IO) holding a 'Prim' +-- value. This allows fast modification because of unboxed storage. +-- +-- = Multithread Consistency Notes +-- +-- In general, any value that straddles a machine word cannot be guaranteed to +-- be consistently read from another thread without a lock. GHC heap objects +-- are always machine word aligned, therefore, a 'IORef' is also word aligned. +-- On a 64-bit platform, writing a 64-bit aligned type from one thread and +-- reading it from another thread should give consistent old or new value. The +-- same holds true for 32-bit values on a 32-bit platform. + +module Streamly.Internal.Data.IORef.Prim + ( + IORef + , Prim + + -- * Construction + , newIORef + + -- * Write + , writeIORef + , modifyIORef' + + -- * Read + , readIORef + ) +where + +import Control.Monad.Primitive (primitive_) +import Data.Primitive.Types (Prim, sizeOf#, readByteArray#, writeByteArray#) +import GHC.Exts (MutableByteArray#, newByteArray#, RealWorld) +import GHC.IO (IO(..)) + +-- | An 'IORef' holds a single 'Prim' value. +data IORef a = IORef (MutableByteArray# RealWorld) + +-- | Create a new 'IORef'. +-- +-- /Internal/ +{-# INLINE newIORef #-} +newIORef :: forall a. Prim a => a -> IO (IORef a) +newIORef x = IO (\s# -> + case newByteArray# (sizeOf# (undefined :: a)) s# of + (# s1#, arr# #) -> + case writeByteArray# arr# 0# x s1# of + s2# -> (# s2#, IORef arr# #) + ) + +-- | Write a value to an 'IORef'. +-- +-- /Internal/ +{-# INLINE writeIORef #-} +writeIORef :: Prim a => IORef a -> a -> IO () +writeIORef (IORef arr#) x = primitive_ (writeByteArray# arr# 0# x) + +-- | Read a value from an 'IORef'. +-- +-- /Internal/ +{-# INLINE readIORef #-} +readIORef :: Prim a => IORef a -> IO a +readIORef (IORef arr#) = IO (readByteArray# arr# 0#) + +-- | Modify the value of an 'IORef' using a function with strict application. +-- +-- /Internal/ +{-# INLINE modifyIORef' #-} +modifyIORef' :: Prim a => IORef a -> (a -> a) -> IO () +modifyIORef' (IORef arr#) g = primitive_ $ \s# -> + case readByteArray# arr# 0# s# of + (# s'#, a #) -> let a' = g a in a' `seq` writeByteArray# arr# 0# a' s'# diff --git a/src/Streamly/Internal/Data/Stream/IsStream.hs b/src/Streamly/Internal/Data/Stream/IsStream.hs index 7785f5829..3c68112e3 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream.hs @@ -66,7 +66,7 @@ module Streamly.Internal.Data.Stream.IsStream , fromListM , K.fromFoldable , fromFoldableM - , fromPrimVar + , fromPrimIORef , fromCallback -- * Elimination @@ -553,7 +553,7 @@ import Streamly.Internal.Data.Pipe.Types (Pipe (..)) import Streamly.Internal.Data.Time.Units ( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime , toAbsTime, TimeUnit64, RelTime64, addToAbsTime64) -import Streamly.Internal.Mutable.Prim.Var (Prim, Var) +import Streamly.Internal.Data.IORef.Prim (Prim, IORef) import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) import qualified Streamly.Internal.Memory.Array as A @@ -978,13 +978,13 @@ fromHandle h = go str <- liftIO $ IO.hGetLine h yld str go --- | Construct a stream by reading a 'Prim' 'Var' repeatedly. +-- | Construct a stream by reading a 'Prim' 'IORef' repeatedly. -- -- /Internal/ -- -{-# INLINE fromPrimVar #-} -fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a -fromPrimVar = fromStreamD . D.fromPrimVar +{-# INLINE fromPrimIORef #-} +fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a +fromPrimIORef = fromStreamD . D.fromPrimIORef -- | Takes a callback setter function and provides it with a callback. The -- callback when invoked adds a value at the tail of the stream. Returns a diff --git a/src/Streamly/Internal/Data/Stream/StreamD.hs b/src/Streamly/Internal/Data/Stream/StreamD.hs index 6495f1307..fae39618c 100644 --- a/src/Streamly/Internal/Data/Stream/StreamD.hs +++ b/src/Streamly/Internal/Data/Stream/StreamD.hs @@ -81,7 +81,7 @@ module Streamly.Internal.Data.Stream.StreamD , fromListM , fromStreamK , fromStreamD - , fromPrimVar + , fromPrimIORef , fromSVar -- * Elimination @@ -325,8 +325,7 @@ import Foreign.Storable (Storable(..)) import GHC.Types (SPEC(..)) import System.Mem (performMajorGC) import Fusion.Plugin.Types (Fuse(..)) -import Streamly.Internal.Mutable.Prim.Var - (Prim, Var, readVar, newVar, modifyVar') +import Streamly.Internal.Data.IORef.Prim (Prim) import Streamly.Internal.Data.Time.Units (TimeUnit64, toRelTime64, diffAbsTime64, RelTime64) import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_) @@ -341,6 +340,7 @@ import Streamly.Internal.Data.Unfold.Types (Unfold(..)) import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..)) import Streamly.Internal.Data.Stream.SVar (fromConsumer, pushToFold) +import qualified Streamly.Internal.Data.IORef.Prim as Prim import qualified Streamly.Internal.Data.Pipe.Types as Pipe import qualified Streamly.Internal.Memory.Array.Types as A import qualified Streamly.Internal.Memory.Mutable.Array.Types as MA @@ -677,12 +677,12 @@ fromListM = Stream step toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a toStreamD = fromStreamK . K.toStream -{-# INLINE_NORMAL fromPrimVar #-} -fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a -fromPrimVar var = Stream step () +{-# INLINE_NORMAL fromPrimIORef #-} +fromPrimIORef :: (MonadIO m, Prim a) => Prim.IORef a -> Stream m a +fromPrimIORef var = Stream step () where {-# INLINE_LATE step #-} - step _ () = liftIO (readVar var) >>= \x -> return $ Yield x () + step _ () = liftIO (Prim.readIORef var) >>= \x -> return $ Yield x () ------------------------------------------------------------------------------- -- Generation from SVar @@ -3811,17 +3811,17 @@ pollCounts predicate transf fld (Stream step state) = Stream step' Nothing -- As long as we are using an "Int" for counts lockfree reads from -- Var should work correctly on both 32-bit and 64-bit machines. -- However, an Int on a 32-bit machine may overflow quickly. - countVar <- liftIO $ newVar (0 :: Int) + countVar <- liftIO $ Prim.newIORef (0 :: Int) tid <- forkManaged $ void $ runFold fld - $ transf $ fromPrimVar countVar + $ transf $ fromPrimIORef countVar return $ Skip (Just (countVar, tid, state)) step' gst (Just (countVar, tid, st)) = do r <- step gst st case r of Yield x s -> do - when (predicate x) $ liftIO $ modifyVar' countVar (+ 1) + when (predicate x) $ liftIO $ Prim.modifyIORef' countVar (+ 1) return $ Yield x (Just (countVar, tid, s)) Skip s -> return $ Skip (Just (countVar, tid, s)) Stop -> do @@ -3842,12 +3842,12 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing i <- MC.catch (do liftIO $ threadDelay (round $ samplingRate * 1000000) - i <- liftIO $ readVar countVar + i <- liftIO $ Prim.readIORef countVar let !diff = i - prev void $ action diff return i) (\(e :: AsyncException) -> do - i <- liftIO $ readVar countVar + i <- liftIO $ Prim.readIORef countVar let !diff = i - prev void $ action diff throwM (MC.toException e)) @@ -3855,7 +3855,7 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing {-# INLINE_LATE step' #-} step' _ Nothing = do - countVar <- liftIO $ newVar 0 + countVar <- liftIO $ Prim.newIORef 0 tid <- fork $ loop countVar 0 ref <- liftIO $ newIORef () _ <- liftIO $ mkWeakIORef ref (killThread tid) @@ -3865,7 +3865,7 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing r <- step gst st case r of Yield x s -> do - liftIO $ modifyVar' countVar (+ 1) + liftIO $ Prim.modifyIORef' countVar (+ 1) return $ Yield x (Just (countVar, tid, s, ref)) Skip s -> return $ Skip (Just (countVar, tid, s, ref)) Stop -> do @@ -4628,13 +4628,13 @@ dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1) Stop -> Stop {-# INLINE updateTimeVar #-} -updateTimeVar :: Var IO Int64 -> IO () +updateTimeVar :: Prim.IORef Int64 -> IO () updateTimeVar timeVar = do MicroSecond64 t <- fromAbsTime <$> getTime Monotonic - modifyVar' timeVar (const t) + Prim.modifyIORef' timeVar (const t) {-# INLINE updateWithDelay #-} -updateWithDelay :: RealFrac a => a -> Var IO Int64 -> IO () +updateWithDelay :: RealFrac a => a -> Prim.IORef Int64 -> IO () updateWithDelay precision timeVar = do threadDelay (delayTime precision) updateTimeVar timeVar @@ -4666,14 +4666,14 @@ times g = Stream step Nothing -- XXX note that this is safe only on a 64-bit machine. On a 32-bit -- machine a 64-bit 'Var' cannot be read consistently without a lock -- while another thread is writing to it. - timeVar <- liftIO $ newVar (0 :: Int64) + timeVar <- liftIO $ Prim.newIORef (0 :: Int64) liftIO $ updateTimeVar timeVar tid <- forkManaged $ liftIO $ forever (updateWithDelay g timeVar) - a <- liftIO $ readVar timeVar + a <- liftIO $ Prim.readIORef timeVar return $ Skip $ Just (timeVar, tid, a) step _ s@(Just (timeVar, _, t0)) = do - a <- liftIO $ readVar timeVar + a <- liftIO $ Prim.readIORef timeVar -- XXX we can perhaps use an AbsTime64 using a 64 bit Int for -- efficiency. or maybe we can use a representation using Double for -- floating precision time diff --git a/src/Streamly/Internal/Mutable/Prim/Var.hs b/src/Streamly/Internal/Mutable/Prim/Var.hs deleted file mode 100644 index 77771ce94..000000000 --- a/src/Streamly/Internal/Mutable/Prim/Var.hs +++ /dev/null @@ -1,83 +0,0 @@ -{-# LANGUAGE UnboxedTuples #-} - -#include "inline.hs" - --- | --- Module : Streamly.Internal.Mutable.Prim.Var --- Copyright : (c) 2019 Composewell Technologies --- --- License : BSD3 --- Maintainer : streamly@composewell.com --- Stability : experimental --- Portability : GHC --- --- A mutable variable in a mutation capable monad (IO/ST) holding a 'Prim' --- value. This allows fast modification because of unboxed storage. --- --- = Multithread Consistency Notes --- --- In general, any value that straddles a machine word cannot be guaranteed to --- be consistently read from another thread without a lock. GHC heap objects --- are always machine word aligned, therefore, a 'Var' is also word aligned. On --- a 64-bit platform, writing a 64-bit aligned type from one thread and reading --- it from another thread should give consistent old or new value. The same --- holds true for 32-bit values on a 32-bit platform. - -module Streamly.Internal.Mutable.Prim.Var - ( - Var - , MonadMut - , Prim - - -- * Construction - , newVar - - -- * Write - , writeVar - , modifyVar' - - -- * Read - , readVar - ) -where - -import Control.Monad.Primitive (PrimMonad(..), primitive_) -import Data.Primitive.Types (Prim, sizeOf#, readByteArray#, writeByteArray#) -import GHC.Exts (MutableByteArray#, newByteArray#) - --- | A 'Var' holds a single 'Prim' value. -data Var m a = Var (MutableByteArray# (PrimState m)) - --- The name PrimMonad does not give a clue what it means, an explicit "Mut" --- suffix provides a better hint. MonadMut is just a generalization of MonadIO. --- --- | A monad that allows mutable operations using a state token. -type MonadMut = PrimMonad - --- | Create a new mutable variable. -{-# INLINE newVar #-} -newVar :: forall m a. (MonadMut m, Prim a) => a -> m (Var m a) -newVar x = primitive (\s# -> - case newByteArray# (sizeOf# (undefined :: a)) s# of - (# s1#, arr# #) -> - case writeByteArray# arr# 0# x s1# of - s2# -> (# s2#, Var arr# #) - ) - --- | Write a value to a mutable variable. -{-# INLINE writeVar #-} -writeVar :: (MonadMut m, Prim a) => Var m a -> a -> m () -writeVar (Var arr#) x = primitive_ (writeByteArray# arr# 0# x) - --- | Read a value from a variable. -{-# INLINE readVar #-} -readVar :: (MonadMut m, Prim a) => Var m a -> m a -readVar (Var arr#) = primitive (readByteArray# arr# 0#) - --- | Modify the value of a mutable variable using a function with strict --- application. -{-# INLINE modifyVar' #-} -modifyVar' :: (MonadMut m, Prim a) => Var m a -> (a -> a) -> m () -modifyVar' (Var arr#) g = primitive_ $ \s# -> - case readByteArray# arr# 0# s# of - (# s'#, a #) -> let a' = g a in a' `seq` writeByteArray# arr# 0# a' s'# diff --git a/streamly.cabal b/streamly.cabal index ba61a4dd1..ead7edf7c 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -386,7 +386,7 @@ library , Streamly.Internal.Data.SVar -- Mutable data - , Streamly.Internal.Mutable.Prim.Var + , Streamly.Internal.Data.IORef.Prim -- Arrays , Streamly.Internal.Data.Array