diff --git a/Changelog.md b/Changelog.md index 661aa17a..ae1a3b76 100644 --- a/Changelog.md +++ b/Changelog.md @@ -35,16 +35,9 @@ multiple folds without breaking the stream. Combinators are provided for temporal and spatial window based fold operations, for example, to support folding and aggregating data for timeout or inactivity based sessions. -#### Composable Pipes - -`Streamly.Pipes` module provides composable pipes (stream consumers and -producers). Pipes can partition, split or distribute a stream into multiple -streams, apply transformations on each stream and merge back the results into a -single stream. - #### Streaming File IO -`Streamly.FileSystem.File` provides handle based streaming file IO +`Streamly.FileSystem.Handle` provides handle based streaming file IO operations. #### Streaming Network IO diff --git a/benchmark/Linear.hs b/benchmark/Linear.hs index e2346227..0c476b6d 100644 --- a/benchmark/Linear.hs +++ b/benchmark/Linear.hs @@ -20,7 +20,7 @@ import qualified LinearOps as Ops import Streamly import qualified Streamly.Fold as FL -import qualified Streamly.Pipe as Pipe +--import qualified Streamly.Pipe as Pipe import qualified Streamly.Mem.Array as A import qualified Streamly.Prelude as S -- import qualified Streamly.Sink as Sink @@ -242,15 +242,15 @@ main = , bgroup "folds-transforms" [ benchIOSink "drain" (FL.foldl' FL.drain) , benchIOSink "lmap" (FL.foldl' (FL.lmap (+1) FL.drain)) - , benchIOSink "pipe-mapM" - (FL.foldl' (FL.transform (Pipe.mapM (\x -> return $ x + 1)) FL.drain)) + {-, benchIOSink "pipe-mapM" + (FL.foldl' (FL.transform (Pipe.mapM (\x -> return $ x + 1)) FL.drain))-} ] , bgroup "folds-compositions" -- Applicative [ benchIOSink "all,any" (FL.foldl' ((,) <$> FL.all (<= Ops.maxValue) <*> FL.any (> Ops.maxValue))) , benchIOSink "sum,length" (FL.foldl' ((,) <$> FL.sum <*> FL.length)) - ] + ] {- , bgroup "pipes" [ benchIOSink "mapM" (Ops.transformMapM serially 1) , benchIOSink "compose" (Ops.transformComposeMapM serially 1) @@ -262,7 +262,7 @@ main = , benchIOSink "compose" (Ops.transformComposeMapM serially 4) , benchIOSink "tee" (Ops.transformTeeMapM serially 4) , benchIOSink "zip" (Ops.transformZipMapM serially 4) - ] + ] -} , bgroup "transformation" [ benchIOSink "scanl" (Ops.scan 1) , benchIOSink "scanl1'" (Ops.scanl1' 1) diff --git a/benchmark/LinearOps.hs b/benchmark/LinearOps.hs index d671ac1b..eeba18a7 100644 --- a/benchmark/LinearOps.hs +++ b/benchmark/LinearOps.hs @@ -30,7 +30,7 @@ import GHC.Generics (Generic) import qualified Streamly as S hiding (foldMapWith, runStream) import qualified Streamly.Prelude as S -import qualified Streamly.Pipe as Pipe +-- import qualified Streamly.Pipe as Pipe value, maxValue, value2 :: Int #ifdef LINEAR_ASYNC @@ -370,14 +370,9 @@ scan, scanl1', map, fmap, mapMaybe, filterEven, filterAllOut, mapMaybeM, intersperse :: S.MonadAsync m => Int -> Stream m Int -> m () {-# INLINE mapM #-} -{-# INLINE transformMapM #-} -{-# INLINE transformComposeMapM #-} -{-# INLINE transformTeeMapM #-} -{-# INLINE transformZipMapM #-} {-# INLINE map' #-} {-# INLINE fmap' #-} -mapM, map', transformMapM, transformComposeMapM, transformTeeMapM, - transformZipMapM :: (S.IsStream t, S.MonadAsync m) +mapM, map' :: (S.IsStream t, S.MonadAsync m) => (t m Int -> S.SerialT m Int) -> Int -> t m Int -> m () fmap' :: (S.IsStream t, S.MonadAsync m, P.Functor (t m)) @@ -395,6 +390,18 @@ map n = composeN n $ S.map (+1) map' t n = composeN' n $ t . S.map (+1) mapM t n = composeN' n $ t . S.mapM return +-- Temporarily commented these ops as they depend on the hidden +-- Pipe module. +{- +{-# INLINE transformMapM #-} +{-# INLINE transformComposeMapM #-} +{-# INLINE transformTeeMapM #-} +{-# INLINE transformZipMapM #-} + +transformMapM, transformComposeMapM, transformTeeMapM, + transformZipMapM :: (S.IsStream t, S.MonadAsync m) + => (t m Int -> S.SerialT m Int) -> Int -> t m Int -> m () + transformMapM t n = composeN' n $ t . S.transform (Pipe.mapM return) transformComposeMapM t n = composeN' n $ t . S.transform (Pipe.mapM (\x -> return (x + 1)) @@ -405,6 +412,7 @@ transformTeeMapM t n = composeN' n $ t . S.transform transformZipMapM t n = composeN' n $ t . S.transform (Pipe.zipWith (+) (Pipe.mapM (\x -> return (x + 1))) (Pipe.mapM (\x -> return (x + 2)))) +-} mapMaybe n = composeN n $ S.mapMaybe (\x -> if Prelude.odd x then Nothing else Just x) diff --git a/examples/FileSinkServer.hs b/examples/FileSinkServer.hs index 3ad3e1f9..927673fd 100644 --- a/examples/FileSinkServer.hs +++ b/examples/FileSinkServer.hs @@ -9,21 +9,24 @@ import System.Environment (getArgs) import Streamly import Streamly.String -import qualified Streamly.FileSystem.File as File +import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Fold as FL import qualified Streamly.Mem.Array as A import qualified Streamly.Network.Socket as NS import qualified Streamly.Network.Server as NS import qualified Streamly.Prelude as S +import System.IO (withFile, IOMode(..)) + main :: IO () main = do file <- fmap head getArgs - File.append file + withFile file AppendMode + (\src -> FH.write src $ encodeChar8Unchecked $ S.concatMap A.read $ S.concatMapBy parallel (flip NS.withSocketS recv) - $ NS.connectionsOnAllAddrs 8090 + $ NS.connectionsOnAllAddrs 8090) where diff --git a/examples/FromFileClient.hs b/examples/FromFileClient.hs index e9d3abd6..f36ec2a4 100644 --- a/examples/FromFileClient.hs +++ b/examples/FromFileClient.hs @@ -7,10 +7,14 @@ import System.Environment (getArgs) import Streamly import qualified Streamly.Prelude as S -import qualified Streamly.FileSystem.File as File +import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Network.Client as Client +import System.IO (withFile, IOMode(..)) + main :: IO () main = - let sendFile = Client.writeArrays (127,0,0,1) 8090 . File.readArrays - in getArgs >>= S.drain . parallely . S.mapM sendFile . S.fromList + let sendFile file = + withFile file ReadMode $ \src -> + Client.writeArrays (127, 0, 0, 1) 8090 $ FH.readArrays src + in getArgs >>= S.drain . parallely . S.mapM sendFile . S.fromList diff --git a/src/Streamly/Fold.hs b/src/Streamly/Fold.hs index 88b5a5f7..b623a0e9 100644 --- a/src/Streamly/Fold.hs +++ b/src/Streamly/Fold.hs @@ -136,7 +136,7 @@ module Streamly.Fold , mapM -- ** Mapping - , transform + --, transform , lmap --, lsequence , lmapM @@ -320,6 +320,7 @@ module Streamly.Fold -- , chunksOf , duplicate -- experimental + {- -- * Windowed Classification -- | Split the stream into windows or chunks in space or time. Each window -- can be associated with a key, all events associated with a particular @@ -333,13 +334,13 @@ module Streamly.Fold -- ** Tumbling Windows -- | A new window starts after the previous window is finished. -- , classifyChunksOf - , classifySessionsOf + -- , classifySessionsOf -- ** Keep Alive Windows -- | The window size is extended if an event arrives within the specified -- window size. This can represent sessions with idle or inactive timeout. -- , classifyKeepAliveChunks - , classifyKeepAliveSessions + -- , classifyKeepAliveSessions {- -- ** Sliding Windows @@ -351,6 +352,7 @@ module Streamly.Fold -- ** Sliding Window Buffers -- , slidingChunkBuffer -- , slidingSessionBuffer + -} ) where @@ -492,9 +494,9 @@ mapM f = sequence . fmap f -- | Apply a transformation on a 'Fold' using a 'Pipe'. -- -- @since 0.7.0 -{-# INLINE transform #-} -transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c -transform (Pipe pstep1 pstep2 pinitial) (Fold fstep finitial fextract) = +{-# INLINE _transform #-} +_transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c +_transform (Pipe pstep1 pstep2 pinitial) (Fold fstep finitial fextract) = Fold step initial extract where @@ -2636,14 +2638,14 @@ classifySessionsBy tick timeout reset (Fold step initial extract) str = -- only if no event is received within the specified session window size. -- -- @since 0.7.0 -{-# INLINABLE classifyKeepAliveSessions #-} -classifyKeepAliveSessions +{-# INLINABLE _classifyKeepAliveSessions #-} +_classifyKeepAliveSessions :: (IsStream t, MonadAsync m, Ord k) => Double -- ^ session inactive timeout -> Fold m a b -- ^ Fold to be applied to session payload data -> t m (k, a, Bool, AbsTime) -- ^ session key, data, close flag, timestamp -> t m (k, b) -classifyKeepAliveSessions timeout = classifySessionsBy 1 timeout True +_classifyKeepAliveSessions timeout = classifySessionsBy 1 timeout True ------------------------------------------------------------------------------ -- Keyed tumbling windows @@ -2685,11 +2687,11 @@ classifyChunksOf wsize = classifyChunksBy wsize False -- the timestamps with a clock resolution of 1 second. -- -- @since 0.7.0 -{-# INLINABLE classifySessionsOf #-} -classifySessionsOf +{-# INLINABLE _classifySessionsOf #-} +_classifySessionsOf :: (IsStream t, MonadAsync m, Ord k) => Double -- ^ time window size -> Fold m a b -- ^ Fold to be applied to window events -> t m (k, a, Bool, AbsTime) -- ^ window key, data, close flag, timestamp -> t m (k, b) -classifySessionsOf interval = classifySessionsBy 1 interval False +_classifySessionsOf interval = classifySessionsBy 1 interval False diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs index 179dd1d1..cdb6b74d 100644 --- a/src/Streamly/Prelude.hs +++ b/src/Streamly/Prelude.hs @@ -240,7 +240,7 @@ module Streamly.Prelude -- * Transformation - , transform + --, transform -- ** Mapping -- | In imperative terms a map operation can be considered as a loop over @@ -1581,9 +1581,9 @@ toHandle h m = go m ------------------------------------------------------------------------------ -- | Use a 'Pipe' to transform a stream. -{-# INLINE transform #-} -transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b -transform pipe xs = fromStreamD $ D.transform pipe (toStreamD xs) +{-# INLINE _transform #-} +_transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b +_transform pipe xs = fromStreamD $ D.transform pipe (toStreamD xs) ------------------------------------------------------------------------------ -- Transformation by Folding (Scans) diff --git a/streamly.cabal b/streamly.cabal index 6f88c306..eb93edde 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -204,28 +204,28 @@ library , Streamly.Sink.Types , Streamly.Sink , Streamly.Pipe.Types + , Streamly.Pipe , Streamly.FileSystem.IOVec , Streamly.FileSystem.FDIO - - exposed-modules: Streamly.Prelude - , Streamly.Time - , Streamly - , Streamly.Fold - , Streamly.Pipe - - , Streamly.String - - -- IO devices - , Streamly.Mem.Array , Streamly.FileSystem.FD - , Streamly.FileSystem.Handle , Streamly.FileSystem.File -- Time , Streamly.Time.Units , Streamly.Time.Clock + exposed-modules: Streamly.Prelude + , Streamly.Time + , Streamly + , Streamly.Fold + + , Streamly.String + + -- IO devices + , Streamly.Mem.Array + , Streamly.FileSystem.Handle + , Streamly.Tutorial , Streamly.Internal if !impl(ghcjs) @@ -1041,7 +1041,7 @@ executable FileIOExamples hs-source-dirs: examples ghc-options: -Wall if flag(examples) || flag(examples-sdl) - buildable: True + buildable: False build-Depends: streamly , base >= 4.8 && < 5