mirror of
https://github.com/composewell/streamly.git
synced 2024-09-17 14:37:27 +03:00
Rename Internal.Mutable.Prim.Var to Internal.Data.IORef.Prim.
- Rename function fromPrimVar to fromPrimIORef.
This commit is contained in:
parent
971da77528
commit
1e5192145b
@ -14,7 +14,7 @@ src/Streamly/Internal/Data/Stream/StreamD.hs
|
||||
src/Streamly/Internal/Data/Pipe/Types.hs
|
||||
src/Streamly/Internal/Data/SmallArray/Types.hs
|
||||
src/Streamly/Internal/Data/Unicode/Stream.hs
|
||||
src/Streamly/Internal/Mutable/Prim/Var.hs
|
||||
src/Streamly/Internal/Data/IORef/Prim.hs
|
||||
src/Streamly/Internal/Data/Array/Prim/Types.hs
|
||||
src/Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs
|
||||
src/Streamly/Internal/Data/Array/Prim/TypesInclude.hs
|
||||
|
84
src/Streamly/Internal/Data/IORef/Prim.hs
Normal file
84
src/Streamly/Internal/Data/IORef/Prim.hs
Normal file
@ -0,0 +1,84 @@
|
||||
{-# LANGUAGE UnboxedTuples #-}
|
||||
|
||||
#include "inline.hs"
|
||||
|
||||
-- |
|
||||
-- Module : Streamly.Internal.Data.IORef.Prim
|
||||
-- Copyright : (c) 2019 Composewell Technologies
|
||||
--
|
||||
-- License : BSD3
|
||||
-- Maintainer : streamly@composewell.com
|
||||
-- Stability : experimental
|
||||
-- Portability : GHC
|
||||
--
|
||||
-- A mutable variable in a mutation capable monad (IO) holding a 'Prim'
|
||||
-- value. This allows fast modification because of unboxed storage.
|
||||
--
|
||||
-- = Multithread Consistency Notes
|
||||
--
|
||||
-- In general, any value that straddles a machine word cannot be guaranteed to
|
||||
-- be consistently read from another thread without a lock. GHC heap objects
|
||||
-- are always machine word aligned, therefore, a 'IORef' is also word aligned.
|
||||
-- On a 64-bit platform, writing a 64-bit aligned type from one thread and
|
||||
-- reading it from another thread should give consistent old or new value. The
|
||||
-- same holds true for 32-bit values on a 32-bit platform.
|
||||
|
||||
module Streamly.Internal.Data.IORef.Prim
|
||||
(
|
||||
IORef
|
||||
, Prim
|
||||
|
||||
-- * Construction
|
||||
, newIORef
|
||||
|
||||
-- * Write
|
||||
, writeIORef
|
||||
, modifyIORef'
|
||||
|
||||
-- * Read
|
||||
, readIORef
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Primitive (primitive_)
|
||||
import Data.Primitive.Types (Prim, sizeOf#, readByteArray#, writeByteArray#)
|
||||
import GHC.Exts (MutableByteArray#, newByteArray#, RealWorld)
|
||||
import GHC.IO (IO(..))
|
||||
|
||||
-- | An 'IORef' holds a single 'Prim' value.
|
||||
data IORef a = IORef (MutableByteArray# RealWorld)
|
||||
|
||||
-- | Create a new 'IORef'.
|
||||
--
|
||||
-- /Internal/
|
||||
{-# INLINE newIORef #-}
|
||||
newIORef :: forall a. Prim a => a -> IO (IORef a)
|
||||
newIORef x = IO (\s# ->
|
||||
case newByteArray# (sizeOf# (undefined :: a)) s# of
|
||||
(# s1#, arr# #) ->
|
||||
case writeByteArray# arr# 0# x s1# of
|
||||
s2# -> (# s2#, IORef arr# #)
|
||||
)
|
||||
|
||||
-- | Write a value to an 'IORef'.
|
||||
--
|
||||
-- /Internal/
|
||||
{-# INLINE writeIORef #-}
|
||||
writeIORef :: Prim a => IORef a -> a -> IO ()
|
||||
writeIORef (IORef arr#) x = primitive_ (writeByteArray# arr# 0# x)
|
||||
|
||||
-- | Read a value from an 'IORef'.
|
||||
--
|
||||
-- /Internal/
|
||||
{-# INLINE readIORef #-}
|
||||
readIORef :: Prim a => IORef a -> IO a
|
||||
readIORef (IORef arr#) = IO (readByteArray# arr# 0#)
|
||||
|
||||
-- | Modify the value of an 'IORef' using a function with strict application.
|
||||
--
|
||||
-- /Internal/
|
||||
{-# INLINE modifyIORef' #-}
|
||||
modifyIORef' :: Prim a => IORef a -> (a -> a) -> IO ()
|
||||
modifyIORef' (IORef arr#) g = primitive_ $ \s# ->
|
||||
case readByteArray# arr# 0# s# of
|
||||
(# s'#, a #) -> let a' = g a in a' `seq` writeByteArray# arr# 0# a' s'#
|
@ -66,7 +66,7 @@ module Streamly.Internal.Data.Stream.IsStream
|
||||
, fromListM
|
||||
, K.fromFoldable
|
||||
, fromFoldableM
|
||||
, fromPrimVar
|
||||
, fromPrimIORef
|
||||
, fromCallback
|
||||
|
||||
-- * Elimination
|
||||
@ -553,7 +553,7 @@ import Streamly.Internal.Data.Pipe.Types (Pipe (..))
|
||||
import Streamly.Internal.Data.Time.Units
|
||||
( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
|
||||
, toAbsTime, TimeUnit64, RelTime64, addToAbsTime64)
|
||||
import Streamly.Internal.Mutable.Prim.Var (Prim, Var)
|
||||
import Streamly.Internal.Data.IORef.Prim (Prim, IORef)
|
||||
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
|
||||
|
||||
import qualified Streamly.Internal.Memory.Array as A
|
||||
@ -978,13 +978,13 @@ fromHandle h = go
|
||||
str <- liftIO $ IO.hGetLine h
|
||||
yld str go
|
||||
|
||||
-- | Construct a stream by reading a 'Prim' 'Var' repeatedly.
|
||||
-- | Construct a stream by reading a 'Prim' 'IORef' repeatedly.
|
||||
--
|
||||
-- /Internal/
|
||||
--
|
||||
{-# INLINE fromPrimVar #-}
|
||||
fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a
|
||||
fromPrimVar = fromStreamD . D.fromPrimVar
|
||||
{-# INLINE fromPrimIORef #-}
|
||||
fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a
|
||||
fromPrimIORef = fromStreamD . D.fromPrimIORef
|
||||
|
||||
-- | Takes a callback setter function and provides it with a callback. The
|
||||
-- callback when invoked adds a value at the tail of the stream. Returns a
|
||||
|
@ -81,7 +81,7 @@ module Streamly.Internal.Data.Stream.StreamD
|
||||
, fromListM
|
||||
, fromStreamK
|
||||
, fromStreamD
|
||||
, fromPrimVar
|
||||
, fromPrimIORef
|
||||
, fromSVar
|
||||
|
||||
-- * Elimination
|
||||
@ -325,8 +325,7 @@ import Foreign.Storable (Storable(..))
|
||||
import GHC.Types (SPEC(..))
|
||||
import System.Mem (performMajorGC)
|
||||
import Fusion.Plugin.Types (Fuse(..))
|
||||
import Streamly.Internal.Mutable.Prim.Var
|
||||
(Prim, Var, readVar, newVar, modifyVar')
|
||||
import Streamly.Internal.Data.IORef.Prim (Prim)
|
||||
import Streamly.Internal.Data.Time.Units
|
||||
(TimeUnit64, toRelTime64, diffAbsTime64, RelTime64)
|
||||
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
|
||||
@ -341,6 +340,7 @@ import Streamly.Internal.Data.Unfold.Types (Unfold(..))
|
||||
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
|
||||
import Streamly.Internal.Data.Stream.SVar (fromConsumer, pushToFold)
|
||||
|
||||
import qualified Streamly.Internal.Data.IORef.Prim as Prim
|
||||
import qualified Streamly.Internal.Data.Pipe.Types as Pipe
|
||||
import qualified Streamly.Internal.Memory.Array.Types as A
|
||||
import qualified Streamly.Internal.Memory.Mutable.Array.Types as MA
|
||||
@ -677,12 +677,12 @@ fromListM = Stream step
|
||||
toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
|
||||
toStreamD = fromStreamK . K.toStream
|
||||
|
||||
{-# INLINE_NORMAL fromPrimVar #-}
|
||||
fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a
|
||||
fromPrimVar var = Stream step ()
|
||||
{-# INLINE_NORMAL fromPrimIORef #-}
|
||||
fromPrimIORef :: (MonadIO m, Prim a) => Prim.IORef a -> Stream m a
|
||||
fromPrimIORef var = Stream step ()
|
||||
where
|
||||
{-# INLINE_LATE step #-}
|
||||
step _ () = liftIO (readVar var) >>= \x -> return $ Yield x ()
|
||||
step _ () = liftIO (Prim.readIORef var) >>= \x -> return $ Yield x ()
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Generation from SVar
|
||||
@ -3811,17 +3811,17 @@ pollCounts predicate transf fld (Stream step state) = Stream step' Nothing
|
||||
-- As long as we are using an "Int" for counts lockfree reads from
|
||||
-- Var should work correctly on both 32-bit and 64-bit machines.
|
||||
-- However, an Int on a 32-bit machine may overflow quickly.
|
||||
countVar <- liftIO $ newVar (0 :: Int)
|
||||
countVar <- liftIO $ Prim.newIORef (0 :: Int)
|
||||
tid <- forkManaged
|
||||
$ void $ runFold fld
|
||||
$ transf $ fromPrimVar countVar
|
||||
$ transf $ fromPrimIORef countVar
|
||||
return $ Skip (Just (countVar, tid, state))
|
||||
|
||||
step' gst (Just (countVar, tid, st)) = do
|
||||
r <- step gst st
|
||||
case r of
|
||||
Yield x s -> do
|
||||
when (predicate x) $ liftIO $ modifyVar' countVar (+ 1)
|
||||
when (predicate x) $ liftIO $ Prim.modifyIORef' countVar (+ 1)
|
||||
return $ Yield x (Just (countVar, tid, s))
|
||||
Skip s -> return $ Skip (Just (countVar, tid, s))
|
||||
Stop -> do
|
||||
@ -3842,12 +3842,12 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing
|
||||
i <-
|
||||
MC.catch
|
||||
(do liftIO $ threadDelay (round $ samplingRate * 1000000)
|
||||
i <- liftIO $ readVar countVar
|
||||
i <- liftIO $ Prim.readIORef countVar
|
||||
let !diff = i - prev
|
||||
void $ action diff
|
||||
return i)
|
||||
(\(e :: AsyncException) -> do
|
||||
i <- liftIO $ readVar countVar
|
||||
i <- liftIO $ Prim.readIORef countVar
|
||||
let !diff = i - prev
|
||||
void $ action diff
|
||||
throwM (MC.toException e))
|
||||
@ -3855,7 +3855,7 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing
|
||||
|
||||
{-# INLINE_LATE step' #-}
|
||||
step' _ Nothing = do
|
||||
countVar <- liftIO $ newVar 0
|
||||
countVar <- liftIO $ Prim.newIORef 0
|
||||
tid <- fork $ loop countVar 0
|
||||
ref <- liftIO $ newIORef ()
|
||||
_ <- liftIO $ mkWeakIORef ref (killThread tid)
|
||||
@ -3865,7 +3865,7 @@ tapRate samplingRate action (Stream step state) = Stream step' Nothing
|
||||
r <- step gst st
|
||||
case r of
|
||||
Yield x s -> do
|
||||
liftIO $ modifyVar' countVar (+ 1)
|
||||
liftIO $ Prim.modifyIORef' countVar (+ 1)
|
||||
return $ Yield x (Just (countVar, tid, s, ref))
|
||||
Skip s -> return $ Skip (Just (countVar, tid, s, ref))
|
||||
Stop -> do
|
||||
@ -4628,13 +4628,13 @@ dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
|
||||
Stop -> Stop
|
||||
|
||||
{-# INLINE updateTimeVar #-}
|
||||
updateTimeVar :: Var IO Int64 -> IO ()
|
||||
updateTimeVar :: Prim.IORef Int64 -> IO ()
|
||||
updateTimeVar timeVar = do
|
||||
MicroSecond64 t <- fromAbsTime <$> getTime Monotonic
|
||||
modifyVar' timeVar (const t)
|
||||
Prim.modifyIORef' timeVar (const t)
|
||||
|
||||
{-# INLINE updateWithDelay #-}
|
||||
updateWithDelay :: RealFrac a => a -> Var IO Int64 -> IO ()
|
||||
updateWithDelay :: RealFrac a => a -> Prim.IORef Int64 -> IO ()
|
||||
updateWithDelay precision timeVar = do
|
||||
threadDelay (delayTime precision)
|
||||
updateTimeVar timeVar
|
||||
@ -4666,14 +4666,14 @@ times g = Stream step Nothing
|
||||
-- XXX note that this is safe only on a 64-bit machine. On a 32-bit
|
||||
-- machine a 64-bit 'Var' cannot be read consistently without a lock
|
||||
-- while another thread is writing to it.
|
||||
timeVar <- liftIO $ newVar (0 :: Int64)
|
||||
timeVar <- liftIO $ Prim.newIORef (0 :: Int64)
|
||||
liftIO $ updateTimeVar timeVar
|
||||
tid <- forkManaged $ liftIO $ forever (updateWithDelay g timeVar)
|
||||
a <- liftIO $ readVar timeVar
|
||||
a <- liftIO $ Prim.readIORef timeVar
|
||||
return $ Skip $ Just (timeVar, tid, a)
|
||||
|
||||
step _ s@(Just (timeVar, _, t0)) = do
|
||||
a <- liftIO $ readVar timeVar
|
||||
a <- liftIO $ Prim.readIORef timeVar
|
||||
-- XXX we can perhaps use an AbsTime64 using a 64 bit Int for
|
||||
-- efficiency. or maybe we can use a representation using Double for
|
||||
-- floating precision time
|
||||
|
@ -1,83 +0,0 @@
|
||||
{-# LANGUAGE UnboxedTuples #-}
|
||||
|
||||
#include "inline.hs"
|
||||
|
||||
-- |
|
||||
-- Module : Streamly.Internal.Mutable.Prim.Var
|
||||
-- Copyright : (c) 2019 Composewell Technologies
|
||||
--
|
||||
-- License : BSD3
|
||||
-- Maintainer : streamly@composewell.com
|
||||
-- Stability : experimental
|
||||
-- Portability : GHC
|
||||
--
|
||||
-- A mutable variable in a mutation capable monad (IO/ST) holding a 'Prim'
|
||||
-- value. This allows fast modification because of unboxed storage.
|
||||
--
|
||||
-- = Multithread Consistency Notes
|
||||
--
|
||||
-- In general, any value that straddles a machine word cannot be guaranteed to
|
||||
-- be consistently read from another thread without a lock. GHC heap objects
|
||||
-- are always machine word aligned, therefore, a 'Var' is also word aligned. On
|
||||
-- a 64-bit platform, writing a 64-bit aligned type from one thread and reading
|
||||
-- it from another thread should give consistent old or new value. The same
|
||||
-- holds true for 32-bit values on a 32-bit platform.
|
||||
|
||||
module Streamly.Internal.Mutable.Prim.Var
|
||||
(
|
||||
Var
|
||||
, MonadMut
|
||||
, Prim
|
||||
|
||||
-- * Construction
|
||||
, newVar
|
||||
|
||||
-- * Write
|
||||
, writeVar
|
||||
, modifyVar'
|
||||
|
||||
-- * Read
|
||||
, readVar
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Primitive (PrimMonad(..), primitive_)
|
||||
import Data.Primitive.Types (Prim, sizeOf#, readByteArray#, writeByteArray#)
|
||||
import GHC.Exts (MutableByteArray#, newByteArray#)
|
||||
|
||||
-- | A 'Var' holds a single 'Prim' value.
|
||||
data Var m a = Var (MutableByteArray# (PrimState m))
|
||||
|
||||
-- The name PrimMonad does not give a clue what it means, an explicit "Mut"
|
||||
-- suffix provides a better hint. MonadMut is just a generalization of MonadIO.
|
||||
--
|
||||
-- | A monad that allows mutable operations using a state token.
|
||||
type MonadMut = PrimMonad
|
||||
|
||||
-- | Create a new mutable variable.
|
||||
{-# INLINE newVar #-}
|
||||
newVar :: forall m a. (MonadMut m, Prim a) => a -> m (Var m a)
|
||||
newVar x = primitive (\s# ->
|
||||
case newByteArray# (sizeOf# (undefined :: a)) s# of
|
||||
(# s1#, arr# #) ->
|
||||
case writeByteArray# arr# 0# x s1# of
|
||||
s2# -> (# s2#, Var arr# #)
|
||||
)
|
||||
|
||||
-- | Write a value to a mutable variable.
|
||||
{-# INLINE writeVar #-}
|
||||
writeVar :: (MonadMut m, Prim a) => Var m a -> a -> m ()
|
||||
writeVar (Var arr#) x = primitive_ (writeByteArray# arr# 0# x)
|
||||
|
||||
-- | Read a value from a variable.
|
||||
{-# INLINE readVar #-}
|
||||
readVar :: (MonadMut m, Prim a) => Var m a -> m a
|
||||
readVar (Var arr#) = primitive (readByteArray# arr# 0#)
|
||||
|
||||
-- | Modify the value of a mutable variable using a function with strict
|
||||
-- application.
|
||||
{-# INLINE modifyVar' #-}
|
||||
modifyVar' :: (MonadMut m, Prim a) => Var m a -> (a -> a) -> m ()
|
||||
modifyVar' (Var arr#) g = primitive_ $ \s# ->
|
||||
case readByteArray# arr# 0# s# of
|
||||
(# s'#, a #) -> let a' = g a in a' `seq` writeByteArray# arr# 0# a' s'#
|
@ -386,7 +386,7 @@ library
|
||||
, Streamly.Internal.Data.SVar
|
||||
|
||||
-- Mutable data
|
||||
, Streamly.Internal.Mutable.Prim.Var
|
||||
, Streamly.Internal.Data.IORef.Prim
|
||||
|
||||
-- Arrays
|
||||
, Streamly.Internal.Data.Array
|
||||
|
Loading…
Reference in New Issue
Block a user