Modify unsafe(Pinned)AsPtr & introduce unsafe(Pinned)CreateUsingPtr

unsafe(Pinned)AsPtr is used to read and overwrite
unsafe(Pinned)CreateUsingPtr is used to create
This commit is contained in:
Adithya Kumar 2024-07-19 19:49:08 +05:30
parent 090e8e61af
commit aa27315c43
9 changed files with 104 additions and 73 deletions

View File

@ -511,7 +511,7 @@ asCStringUnsafe :: Array a -> (CString -> IO b) -> IO b
asCStringUnsafe arr act = do
let arr1 = asBytes arr <> fromList [0]
-- unsafePinnedAsPtr makes sure the array is pinned
unsafePinnedAsPtr arr1 $ \ptr -> act (castPtr ptr)
unsafePinnedAsPtr arr1 $ \ptr _ -> act (castPtr ptr)
-------------------------------------------------------------------------------
-- Folds

View File

@ -249,13 +249,13 @@ data Array a =
-- /Pre-release/
--
{-# INLINE unsafePinnedAsPtr #-}
unsafePinnedAsPtr :: MonadIO m => Array a -> (Ptr a -> m b) -> m b
unsafePinnedAsPtr :: MonadIO m => Array a -> (Ptr a -> Int -> m b) -> m b
unsafePinnedAsPtr arr = MA.unsafePinnedAsPtr (unsafeThaw arr)
{-# DEPRECATED asPtrUnsafe "Please use unsafePinnedAsPtr instead." #-}
{-# INLINE asPtrUnsafe #-}
asPtrUnsafe :: MonadIO m => Array a -> (Ptr a -> m b) -> m b
asPtrUnsafe = unsafePinnedAsPtr
asPtrUnsafe arr f = unsafePinnedAsPtr arr (\p _ -> f p)
-------------------------------------------------------------------------------
-- Freezing and Thawing

View File

@ -100,6 +100,9 @@ module Streamly.Internal.Data.MutArray.Type
, fromChunksK
, fromChunksRealloced -- fromSmallChunks
, unsafeCreateUsingPtr
, unsafePinnedCreateUsingPtr
-- ** Random writes
, putIndex
, putIndexUnsafe -- XXX unsafePutIndex
@ -323,7 +326,7 @@ import Data.Functor.Identity (Identity(..))
import Data.Proxy (Proxy(..))
import Data.Word (Word8, Word16)
import Foreign.C.Types (CSize(..), CInt(..))
import Foreign.Ptr (plusPtr, minusPtr, nullPtr, castPtr)
import Foreign.Ptr (plusPtr, minusPtr, nullPtr)
import Streamly.Internal.Data.MutByteArray.Type
( MutByteArray(..)
, PinnedState(..)
@ -2441,14 +2444,13 @@ fromPureStream xs =
{-# INLINABLE fromPtrN #-}
fromPtrN :: MonadIO m => Int -> Ptr Word8 -> m (MutArray Word8)
fromPtrN len addr = do
fromPtrN len addr =
-- memcpy is better than stream copy when the size is known.
-- XXX We can implement a stream copy in a similar way by streaming Word64
-- first and then remaining Word8.
arr <- new len
_ <- unsafeAsPtr arr
(\ptr -> liftIO $ c_memcpy ptr addr (fromIntegral len))
return (arr {arrEnd = len})
unsafeCreateUsingPtr len
$ \ptr -> liftIO $ c_memcpy ptr addr (fromIntegral len) >> pure len
{-# INLINABLE fromCString# #-}
fromCString# :: MonadIO m => Addr# -> m (MutArray Word8)
@ -2462,7 +2464,7 @@ fromCString# addr = do
len <- liftIO $ c_strlen (Ptr addr)
let lenInt = fromIntegral len
arr <- new lenInt
_ <- unsafeAsPtr arr (\ptr -> liftIO $ c_memcpy ptr (Ptr addr) len)
_ <- unsafeAsPtr arr (\ptr _ -> liftIO $ c_memcpy ptr (Ptr addr) len)
return (arr {arrEnd = lenInt})
{-# DEPRECATED fromByteStr# "Please fromCString# instead." #-}
@ -2482,12 +2484,13 @@ fromW16CString# addr = do
let bytes = w16len * 2
-- The array type is inferred from c_memcpy type, therefore, it is not the
-- same as the returned array type.
arr :: MutArray Word8 <- emptyOf bytes
_ <- unsafeAsPtr arr (\ptr -> liftIO
$ c_memcpy (castPtr ptr) (Ptr addr) (fromIntegral bytes))
-- CAUTION! The array type is inferred from the return type and may be
-- different from the arr type.
return (arr {arrEnd = bytes})
arr <-
unsafeCreateUsingPtr
bytes
(\ptr ->
liftIO $
c_memcpy ptr (Ptr addr) (fromIntegral bytes) >> pure bytes)
pure $ unsafeCast arr
-------------------------------------------------------------------------------
-- convert a stream of arrays to a single array by reallocating and copying
@ -2774,10 +2777,10 @@ splitOn predicate arr =
{-# INLINE breakOn #-}
breakOn :: MonadIO m
=> Word8 -> MutArray Word8 -> m (MutArray Word8, Maybe (MutArray Word8))
breakOn sep arr@MutArray{..} = unsafeAsPtr arr $ \p -> liftIO $ do
breakOn sep arr@MutArray{..} = unsafeAsPtr arr $ \p byteLen -> liftIO $ do
-- XXX We do not need memchr here, we can use a Haskell equivalent.
-- Need efficient stream based primitives that work on Word64.
loc <- c_memchr p sep (fromIntegral $ byteLength arr)
loc <- c_memchr p sep (fromIntegral byteLen)
let sepIndex = loc `minusPtr` p
return $
if loc == nullPtr
@ -2906,21 +2909,68 @@ cast arr =
-- /Pre-release/
--
{-# INLINE unsafePinnedAsPtr #-}
unsafePinnedAsPtr :: MonadIO m => MutArray a -> (Ptr a -> m b) -> m b
unsafePinnedAsPtr :: MonadIO m => MutArray a -> (Ptr a -> Int -> m b) -> m b
unsafePinnedAsPtr arr f =
Unboxed.unsafePinnedAsPtr
(arrContents arr) (\ptr -> f (ptr `plusPtr` arrStart arr))
(arrContents arr)
(\ptr -> f (ptr `plusPtr` arrStart arr) (byteLength arr))
{-# DEPRECATED asPtrUnsafe "Please use unsafePinnedAsPtr instead." #-}
{-# INLINE asPtrUnsafe #-}
asPtrUnsafe :: MonadIO m => MutArray a -> (Ptr a -> m b) -> m b
asPtrUnsafe = unsafePinnedAsPtr
asPtrUnsafe a f = unsafePinnedAsPtr a (\p _ -> f p)
-- NOTE: unsafeAsPtr is safe to use unsafe with ffi given that the ffi function
-- does not need the pointer to be valid after the call has completed.
{-# INLINE unsafeAsPtr #-}
unsafeAsPtr :: MonadIO m => MutArray a -> (Ptr a -> m b) -> m b
unsafeAsPtr :: MonadIO m => MutArray a -> (Ptr a -> Int -> m b) -> m b
unsafeAsPtr arr f =
Unboxed.unsafeAsPtr
(arrContents arr) (\ptr -> f (ptr `plusPtr` arrStart arr))
(arrContents arr)
(\ptr -> f (ptr `plusPtr` arrStart arr) (byteLength arr))
-- | @unsafeCreateUsingPtr capacity populater@ creates an array of @capacity@
-- bytes lets the @populater@ function populate it. The @populater@ get the
-- pointer to the array and should return the amount of the capacity populated
-- in bytes.
--
-- /Unsafe/ because the pointer given should be used with care. Bytes populated
-- should be lesser than the total capacity.
{-# INLINE unsafeCreateUsingPtr #-}
unsafeCreateUsingPtr
:: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8)
unsafeCreateUsingPtr cap pop = do
(arr :: MutArray Word8) <- emptyOf cap
len <- Unboxed.unsafeAsPtr (arrContents arr) pop
when (len > cap) (error (errMsg len))
-- arrStart == 0
pure (arr { arrEnd = len })
where
errMsg len =
"unsafeCreateUsingPtr: length > capacity, "
++ "length = " ++ show len ++ ", "
++ "capacity = " ++ show cap
-- | Similar to "unsafeCreateUsingPtr" but creates a pinned array.
{-# INLINE unsafePinnedCreateUsingPtr #-}
unsafePinnedCreateUsingPtr
:: MonadIO m => Int -> (Ptr Word8 -> m Int) -> m (MutArray Word8)
unsafePinnedCreateUsingPtr cap pop = do
(arr :: MutArray Word8) <- pinnedEmptyOf cap
len <- Unboxed.unsafeAsPtr (arrContents arr) pop
when (len > cap) (error (errMsg len))
-- arrStart == 0
pure (arr { arrEnd = len })
where
errMsg len =
"unsafePinnedCreateUsingPtr: length > capacity, "
++ "length = " ++ show len ++ ", "
++ "capacity = " ++ show cap
-------------------------------------------------------------------------------
-- Equality

View File

@ -177,14 +177,10 @@ import qualified Streamly.Internal.Data.StreamK.Type as K (mkStream)
{-# INLINABLE getChunk #-}
getChunk :: MonadIO m => Int -> Handle -> m (Array Word8)
getChunk size h = liftIO $ do
arr :: MArray.MutArray Word8 <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
MArray.unsafePinnedAsPtr arr $ \p -> do
n <- hGetBufSome h p size
-- XXX shrink only if the diff is significant
return $
unsafeFreezeWithShrink $
arr { MArray.arrEnd = n, MArray.arrBound = size }
arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> hGetBufSome h p size
-- XXX shrink only if the diff is significant
pure $ unsafeFreezeWithShrink arr
-- This could be useful in implementing the "reverse" read APIs or if you want
-- to read arrays of exact size instead of compacting them later. Compacting
@ -379,13 +375,8 @@ read = A.concat . readChunks
{-# INLINABLE putChunk #-}
putChunk :: MonadIO m => Handle -> Array a -> m ()
putChunk _ arr | byteLength arr == 0 = return ()
putChunk h arr = A.unsafePinnedAsPtr arr $ \ptr ->
liftIO $ hPutBuf h ptr aLen
where
-- XXX We should have the length passed by unsafePinnedAsPtr itself.
aLen = A.byteLength arr
putChunk h arr = A.unsafePinnedAsPtr arr $ \ptr byteLen ->
liftIO $ hPutBuf h ptr byteLen
-------------------------------------------------------------------------------
-- Stream of Arrays IO

View File

@ -451,8 +451,8 @@ openWatch Config{..} paths = do
withPathName :: Array Word8 -> (PathName -> IO a) -> IO a
withPathName arr act = do
A.unsafePinnedAsPtr arr $ \ptr ->
let pname = PathName (castPtr ptr) (fromIntegral (A.length arr))
A.unsafePinnedAsPtr arr $ \ptr byteLen ->
let pname = PathName (castPtr ptr) (fromIntegral byteLen)
in act pname
withPathNames = contListMap withPathName withArray

View File

@ -870,7 +870,7 @@ readOneEvent cfg wt@(Watch _ wdMap) = do
where
readHeader (ptr :: Ptr Word8) = do
readHeader (ptr :: Ptr Word8) _ = do
let len = sizeOf (undefined :: CInt)
ewd <- peek ptr
eflags <- peekByteOff ptr len

View File

@ -218,20 +218,17 @@ openFile path mode = Handle . fst <$> FD.openFile path mode True
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto size (Handle fd) = do
arr <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
MArray.unsafePinnedAsPtr arr $ \p -> do
-- n <- hGetBufSome h p size
arr <-
MArray.unsafePinnedCreateUsingPtr size $ \p ->
-- n <- hGetBufSome h p size
#if MIN_VERSION_base(4,15,0)
n <- RawIO.read fd p 0 size
RawIO.read fd p 0 size
#else
n <- RawIO.read fd p size
RawIO.read fd p size
#endif
-- XXX shrink only if the diff is significant
-- Use unsafeFreezeWithShrink
return
$ unsafeFreeze
$ arr { MArray.arrEnd = n, MArray.arrBound = size }
-- XXX shrink only if the diff is significant
-- Use unsafeFreezeWithShrink
pure $ unsafeFreeze arr
-------------------------------------------------------------------------------
-- Array IO (output)

View File

@ -98,10 +98,10 @@ import qualified Streamly.Data.Fold as FL
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array as A
( unsafeFreeze, unsafePinnedAsPtr, byteLength, pinnedChunksOf,
( unsafeFreeze, unsafePinnedAsPtr, pinnedChunksOf,
pinnedCreateOf, unsafePinnedCreateOf, lCompactGE )
import qualified Streamly.Internal.Data.MutArray as MArray
(MutArray(..), unsafePinnedAsPtr, pinnedEmptyOf)
(unsafePinnedCreateUsingPtr)
import qualified Streamly.Internal.Data.Stream as S (fromStreamK, Stream(..), Step(..))
import qualified Streamly.Internal.Data.StreamK as K (mkStream)
@ -261,16 +261,10 @@ readArrayUptoWith
-> h
-> IO (Array Word8)
readArrayUptoWith f size h = do
arr <- MArray.pinnedEmptyOf size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
MArray.unsafePinnedAsPtr arr $ \p -> do
n <- f h p size
let v = A.unsafeFreeze
$ arr { MArray.arrEnd = n, MArray.arrBound = size }
-- XXX shrink only if the diff is significant
-- A.shrinkToFit v
return v
arr <- MArray.unsafePinnedCreateUsingPtr size $ \p -> f h p size
-- XXX shrink only if the diff is significant
-- unsafeFreezeWithShrink
pure $ A.unsafeFreeze arr
-- | Read a byte array from a file handle up to a maximum of the requested
-- size. If no data is available on the handle it blocks until some data
@ -311,11 +305,8 @@ writeArrayWith :: Unbox a
-> Array a
-> IO ()
writeArrayWith _ _ arr | A.length arr == 0 = return ()
writeArrayWith f h arr = A.unsafePinnedAsPtr arr $ \ptr -> f h (castPtr ptr) aLen
where
aLen = A.byteLength arr
writeArrayWith f h arr =
A.unsafePinnedAsPtr arr $ \ptr byteLen -> f h (castPtr ptr) byteLen
-- | Write an Array to a socket.
--

View File

@ -185,19 +185,21 @@ testUnsafeIndxedFromList inp =
testAsPtrUnsafeMA :: IO ()
testAsPtrUnsafeMA = do
arr <- MA.fromList ([0 .. 99] :: [Int])
MA.unsafePinnedAsPtr arr (getList (0 :: Int)) `shouldReturn` [0 .. 99]
MA.unsafePinnedAsPtr arr getList0 `shouldReturn` [0 .. 99]
where
sizeOfInt = sizeOf (Proxy :: Proxy Int)
getList0 p byteLen = getList p (p `plusPtr` byteLen)
-- We need to be careful here. We assume Unboxed and Storable are compatible
-- with each other. For Int, they are compatible.
getList i _
| i >= 100 = return []
getList i p = do
getList p limitP
| p >= limitP = return []
getList p limitP = do
val <- peek p
rest <- getList (i + 1) (p `plusPtr` sizeOfInt)
rest <- getList (p `plusPtr` sizeOfInt) limitP
return $ val : rest
reallocMA :: Property