# Streamly

[![Hackage](https://img.shields.io/hackage/v/streamly.svg?style=flat)](https://hackage.haskell.org/package/streamly)
[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly)
[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly)
[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly)
[![CircleCI](https://circleci.com/gh/composewell/streamly/tree/master.svg?style=svg)](https://circleci.com/gh/composewell/streamly/tree/master)
[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master)

## Streaming Concurrently

Haskell lists express pure computations using composable stream operations like
`:`, `unfold`, `map`, `filter`, `zip` and `fold`. Streamly is exactly like
lists, except that it can express sequences of pure as well as monadic
computations, aka streams. More importantly, it can express monadic sequences
with concurrent execution semantics without introducing any additional APIs.

Streamly expresses concurrency using standard, well-known abstractions.
Concurrency semantics are defined for list operations, as well as for
semigroup, applicative and monadic compositions. The programmer does not need
to know any low-level notions of concurrency like threads, locking or
synchronization. Concurrent and non-concurrent programs are fundamentally the
same. A chosen segment of the program can be made concurrent by annotating it
with an appropriate combinator. We can choose a combinator for lookahead style
or asynchronous concurrency. Concurrency is automatically scaled up or down
based on the demand from the consumer application, so we can finally say
goodbye to managing thread pools and the associated sizing issues. The result
is truly fearless and declarative monadic concurrency.

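For reference, here is the pure-list baseline the paragraph starts from. It uses only `base` and chains the list operations named above: `:`, `unfoldr` from `Data.List` (standing in for `unfold`), `map`, `filter`, `zip` and a strict left fold. The names `evenSquares` and `sumOfSquares` are only illustrative and are not part of streamly. Streamly's own versions of these combinators live in `Streamly.Prelude` and are not shown here.

```haskell
import Data.List (foldl', unfoldr)

-- Generate 1..n with unfoldr, keep the even numbers, and pair each
-- even number with its square using zip.
evenSquares :: Int -> [(Int, Int)]
evenSquares n = zip evens (map (\x -> x * x) evens)
  where
    evens = filter even (unfoldr step 1)
    step i = if i > n then Nothing else Just (i, i + 1)

-- Fold the squares into a single sum.
sumOfSquares :: [(Int, Int)] -> Int
sumOfSquares = foldl' (\acc (_, sq) -> acc + sq) 0

main :: IO ()
main = do
  print (evenSquares 6)                -- [(2,4),(4,16),(6,36)]
  print (sumOfSquares (evenSquares 6)) -- 56
  print (0 : map fst (evenSquares 6))  -- (:) prepends: [0,2,4,6]
```
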
## Where to use streamly?

Streamly is a general purpose programming framework. It can be used equally
efficiently for anything from a simple `Hello World!` program to a massively
concurrent application. The answer to the question "where to use streamly?" is
similar to the answer to "where to use Haskell lists or the IO monad?".
Streamly generalizes lists to monadic streams, and the `IO` monad to
non-deterministic and concurrent stream composition. The `IO` monad is a
special case of streamly: if we use single element streams, the behavior of
streamly becomes identical to the IO monad. IO monad code can be replaced
with streamly by just prefixing the IO actions with `liftIO`, without any other
changes and without any loss of performance. Pure lists too are a special
case of streamly: if we use `Identity` as the underlying monad, streamly
streams turn into pure lists. Non-concurrent programs are just a special case
of concurrent ones; simply adding a combinator turns a non-concurrent program
into a concurrent one.

In other words, streamly combines the functionality of lists and IO, with
built-in concurrency. If you want to write a program that involves IO,
concurrent or not, you can just use streamly as the base monad. In fact,
you could even use streamly for pure computations, as streamly performs on par
with pure lists or `vector`.

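Here is a minimal sketch of the two special cases described above. It assumes the streamly API used elsewhere in this README: `Streamly.Prelude` imported as `S`, and `SerialT` exported from `Streamly`. It also uses `S.fromList` and `S.map`, which do not appear elsewhere in this README and are an assumption here. `doubled` runs a stream over `Identity` and gets a plain list back. `greet` is ordinary IO code with each action prefixed by `liftIO`.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Functor.Identity (runIdentity)
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- With Identity as the underlying monad, a stream behaves like a pure list.
doubled :: [Int]
doubled = runIdentity $ S.toList $ S.map (* 2) (S.fromList [1, 2, 3])

-- Plain IO actions, each prefixed with liftIO, run one after the other
-- exactly as they would in IO.
greet :: SerialT IO ()
greet = do
  liftIO $ putStrLn "hello"
  liftIO $ putStrLn "world"

main :: IO ()
main = do
  print doubled -- [2,4,6]
  S.drain greet
```
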
## Why data flow programming?

If you need some convincing to use the streaming or data flow programming
paradigm itself, try to answer this question: why do we use lists in
Haskell? It boils down to why we use functional programming in the first place.
Haskell is successful in enforcing the functional data flow paradigm for pure
computations using lists, but not for monadic computations. In the absence of a
standard and easy to use data flow programming paradigm for monadic
computations, and with the IO monad providing an escape hatch to an imperative
model, we just love to fall into the imperative trap, and start asking the same
fundamental question again: why do we have to use the streaming data model?

## Show me an example

Here is some IO monad code that lists a directory recursively:

```haskell
{-# LANGUAGE FlexibleContexts #-}

import Control.Monad.IO.Class (MonadIO, liftIO)
import Path.IO (listDir, getCurrentDir) -- from path-io package

-- Works in any monad that can perform IO and whose actions form a Monoid;
-- IO () is one such monad, and so are streamly's stream types.
listDirRecursive :: (MonadIO m, Monoid (m ())) => m ()
listDirRecursive = getCurrentDir >>= readdir
  where
    readdir dir = do
      (dirs, files) <- listDir dir
      liftIO $ mapM_ putStrLn
             $ map show dirs ++ map show files
      -- Recurse into each subdirectory, combining the results with (<>)
      foldMap readdir dirs
```

This is your usual IO monad code, with no streamly-specific code whatsoever.
This is how you can run it:

``` haskell
main :: IO ()
main = listDirRecursive
```

And this is how you can run exactly the same code using streamly with
lookahead style concurrency. The only difference is that this time multiple
directories are read concurrently:

``` haskell
import Streamly (aheadly)
import qualified Streamly.Prelude as S

main :: IO ()
main = S.drain $ aheadly $ listDirRecursive
```

Isn't that magical? What's going on here? Streamly does not introduce any new
abstractions; it just uses standard abstractions like `Semigroup` or
`Monoid` to combine monadic streams concurrently, the way lists combine a
sequence of pure values non-concurrently. The `foldMap` in the code
above turns into a concurrent monoidal composition of a stream of `readdir`
computations.

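The same code can run serially or concurrently because `foldMap` relies only on `(<>)`: `foldMap f [a, b, c]` is `f a <> (f b <> (f c <> mempty))`. The sketch below uses only `base` and shows what that means for plain `IO ()`, where `(<>)` runs the left action and then the right one. Under `aheadly`, the stream type's `(<>)` is used instead, and that is where the concurrency comes from. `visitOrder` is a hypothetical helper for illustration.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Record which elements were visited, in the order the actions ran.
visitOrder :: [Int] -> IO [Int]
visitOrder xs = do
  ref <- newIORef []
  -- foldMap builds one IO () action out of many, joined with (<>);
  -- for IO (), (<>) runs the left action first, then the right one.
  foldMap (\x -> modifyIORef ref (x :)) xs
  reverse <$> readIORef ref

main :: IO ()
main = visitOrder [1, 2, 3] >>= print -- prints [1,2,3]
```
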
## How does it perform?

Providing monadic streaming and high level declarative concurrency does not
mean that `streamly` compromises on performance. The non-concurrent
performance of `streamly` competes with lists and the `vector` library. For
concurrent performance, see the [concurrency
benchmarks](https://github.com/composewell/concurrency-benchmarks) for detailed
results and a comparison with the `async` package.

The following chart summarizes the cost of key streaming operations
processing a million elements. The timings for `streamly` and `vector` are in
the 600-700 microseconds range and can therefore barely be seen in the graph.
For more details, see [streaming
benchmarks](https://github.com/composewell/streaming-benchmarks).

![Streaming Operations at a Glance](charts-0/KeyOperations-time.svg)

## File IO

The following code snippet implements some common Unix command line utilities
using streamly. To get an idea of IO streaming performance, you can
benchmark these against the regular Unix utilities using the `time` command.
Make sure to use a big enough input file and compile with
`ghc -O2 -fspec-constr-recursive=10` when benchmarking. To check space usage,
run the executable with `+RTS -s` and look for `maximum residency` in the
output (the program must be linked with `-rtsopts` for the RTS to accept `-s`).

Note that `grep -c` counts the number of lines where the pattern occurs, whereas
the snippet below counts the total number of occurrences of the pattern;
therefore, the output may differ.

``` haskell
import qualified Streamly.Prelude as S
import qualified Streamly.Fold as FL
import qualified Streamly.Mem.Array as A
import qualified Streamly.FileSystem.File as File

import Data.Char (ord)
import System.Environment (getArgs)
import System.IO (openFile, IOMode(..), stdout)

cat src = File.writeArrays stdout $ File.readArraysUpto (256*1024) src
cp src dst = File.writeArrays dst $ File.readArraysUpto (256*1024) src
wcl src = print =<<
    ( S.length
    $ FL.splitSuffixBy (== fromIntegral (ord '\n')) FL.drain
    $ File.read src)
grepc pat src = print . (subtract 1) =<<
    ( S.length
    $ FL.splitOn (A.fromList (map (fromIntegral . ord) pat)) FL.drain
    $ File.read src)
avgll src = print =<<
    ( FL.foldl' avg
    $ FL.splitSuffixBy (== fromIntegral (ord '\n')) FL.length
    $ File.read src)
  where avg      = (/) <$> toDouble FL.sum <*> toDouble FL.length
        toDouble = fmap (fromIntegral :: Int -> Double)
llhisto src = print =<<
    ( FL.foldl' (FL.classify FL.length)
    $ S.map bucket
    $ FL.splitSuffixBy (== fromIntegral (ord '\n')) FL.length
    $ File.read src)
  where
    bucket n = let i = n `div` 10 in if i > 9 then (9,n) else (i,n)

main = do
    name <- fmap head getArgs
    src <- openFile name ReadMode
    -- cat src          -- Unix cat program
    -- wcl src          -- Unix wc -l program
    -- grepc "aaaa" src -- Unix grep -c program

    -- dst <- openFile "dst.txt" WriteMode
    -- cp src dst       -- Unix cp program

    -- avgll src        -- get average line length
    llhisto src         -- get line length histogram
```

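The sketch below is a pure model of what `grepc`, `avgll` and `llhisto` compute, using only `base` and `containers`. It can help you check their output on small files, and it makes the `grep -c` difference concrete. It is not streamly code. `lineLengths`, `grepCount`, `occurrences`, `avgLineLength` and `lineHisto` are hypothetical helpers. The model assumes that `FL.splitSuffixBy` yields one segment per line, as `lines` does, and that `FL.splitOn` matches leftmost, non-overlapping occurrences of the pattern.

```haskell
import Data.List (isInfixOf, isPrefixOf)
import qualified Data.Map.Strict as Map

-- Line lengths; like `lines`, a trailing newline adds no extra empty line.
lineLengths :: String -> [Int]
lineLengths = map length . lines

-- What `grep -c pat` reports: the number of lines that contain pat.
grepCount :: String -> String -> Int
grepCount pat = length . filter (pat `isInfixOf`) . lines

-- What the grepc snippet reports: non-overlapping occurrences of pat,
-- matched from left to right.
occurrences :: String -> String -> Int
occurrences [] _ = 0 -- an empty pattern would otherwise loop forever
occurrences pat s = go s
  where
    go [] = 0
    go str@(_ : rest)
      | pat `isPrefixOf` str = 1 + go (drop (length pat) str)
      | otherwise            = go rest

-- avgll: total length of all lines divided by the number of lines
-- (NaN for an empty input).
avgLineLength :: String -> Double
avgLineLength s = fromIntegral (sum ls) / fromIntegral (length ls)
  where
    ls = lineLengths s

-- llhisto: bucket i counts lines of length 10*i to 10*i+9; bucket 9 also
-- collects every longer line.
lineHisto :: String -> Map.Map Int Int
lineHisto s = Map.fromListWith (+) [ (min 9 (n `div` 10), 1) | n <- lineLengths s ]

main :: IO ()
main = do
  let input = "aaaaaaaa\nbb\naaaa\n"
  print (grepCount "aaaa" input)   -- 2 matching lines
  print (occurrences "aaaa" input) -- 3 occurrences
  print (avgLineLength input)
  print (lineHisto input)          -- fromList [(0,3)]
```
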
## Streaming Pipelines

The following snippet provides a simple stream composition example that reads
numbers from stdin, prints the squares of even numbers and exits if an even
number greater than 9 is entered.

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Data.Function ((&))

main = S.drain $
       S.repeatM getLine
     & fmap read
     & S.filter even
     & S.takeWhile (<= 9)
     & fmap (\x -> x * x)
     & S.mapM print
```

Unlike `pipes` or `conduit`, and like `vector` and `streaming`, `streamly`
composes stream data instead of stream processors (functions). A stream is
just like a list and is explicitly passed around to functions that process the
stream. Therefore, no special operator is needed to join stages in a streaming
pipeline; the standard function application (`$`) or reverse function
application (`&`) operator is enough. Combinators are provided in
`Streamly.Prelude` to transform or fold streams.

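To make the point about plain function application concrete, here is the same chain of stages applied to a pure list with `&` from `Data.Function`, using only `base`. The input is a fixed list rather than `stdin`, so `squaresOfSmallEvens` (an illustrative name) takes the lines as an argument. `takeWhile` stops at the first even number greater than 9, just as the streaming version exits.

```haskell
import Data.Function ((&))

-- The same stages as the streamly pipeline, applied to a list of input lines.
squaresOfSmallEvens :: [String] -> [Integer]
squaresOfSmallEvens inputs =
    inputs
  & map read
  & filter even
  & takeWhile (<= 9)
  & map (\x -> x * x)

main :: IO ()
main = print (squaresOfSmallEvens ["1", "2", "3", "4", "12", "6"])
-- prints [4,16]; "6" is never reached because 12 ends the pipeline
```
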
## Concurrent Stream Generation

Monadic construction and generation functions, e.g. `consM`, `unfoldrM`,
`replicateM`, `repeatM`, `iterateM` and `fromFoldableM`, work concurrently
when used with an appropriate stream type combinator (e.g. `asyncly`, `aheadly`
or `parallely`).

In GHCi, with `Control.Concurrent`, `Streamly` and `Streamly.Prelude` (as `S`)
in scope, the following code finishes in 3 seconds (6 seconds when serial):

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
[3,2,1]

> S.toList $ parallely $ p 3 |: p 2 |: p 1 |: S.nil
[1,2,3]
```

The following finishes in 10 seconds (100 seconds when serial):

``` haskell
S.drain $ asyncly $ S.replicateM 10 $ p 10
```

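Here is the same experiment as a complete program, building the stream with `fromFoldableM` instead of `|:`. This is a sketch that assumes the imports shown, in particular that `aheadly` is exported from `Streamly` and `S.fromFoldableM` from `Streamly.Prelude`. `getMonotonicTime` from `base`'s `GHC.Clock` is used only to print the elapsed wall-clock time. That time should be about 3 seconds, not the 6 seconds a serial run would take.

```haskell
import Control.Concurrent (threadDelay)
import GHC.Clock (getMonotonicTime)
import Streamly (aheadly)
import qualified Streamly.Prelude as S

-- Sleep for n seconds, then return n (same as `p` in the session above).
p :: Int -> IO Int
p n = threadDelay (n * 1000000) >> return n

main :: IO ()
main = do
  start <- getMonotonicTime
  -- The three actions run concurrently; aheadly keeps the original order.
  xs <- S.toList $ aheadly $ S.fromFoldableM [p 3, p 2, p 1]
  end <- getMonotonicTime
  print xs -- [3,2,1]
  putStrLn $ "elapsed: " ++ show (end - start) ++ " s"
```
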
## Concurrent Streaming Pipelines

Use `|&` or `|$` to apply stream processing functions concurrently. The
following example prints "hello" every second. If you use `&` instead of
`|&`, the delay doubles to 2 seconds because of serial application.

``` haskell
import Control.Concurrent (threadDelay)
import Streamly
import qualified Streamly.Prelude as S

main = S.drain $
      S.repeatM (threadDelay 1000000 >> return "hello")
   |& S.mapM (\x -> threadDelay 1000000 >> putStrLn x)
```

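`|$` is the function-application counterpart of `|&`. The sketch below assumes that `SerialT` and `(|$)` are exported from `Streamly`. It uses a finite producer (`S.replicateM 3`) so that the program terminates. `hellos` and `printSlowly` are illustrative names. With `|$`, the producer and the printing stage overlap, so the run should take roughly 4 seconds instead of the 6 seconds of fully serial application.

```haskell
import Control.Concurrent (threadDelay)
import Streamly (SerialT, (|$))
import qualified Streamly.Prelude as S

-- Producer: emits "hello" three times, one per second.
hellos :: SerialT IO String
hellos = S.replicateM 3 (threadDelay 1000000 >> return "hello")

-- Consumer stage: takes a second to print each element.
printSlowly :: SerialT IO String -> SerialT IO ()
printSlowly = S.mapM (\x -> threadDelay 1000000 >> putStrLn x)

main :: IO ()
main = S.drain (printSlowly |$ hellos)
```
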
## Mapping Concurrently

We can use the `mapM` or `sequence` functions concurrently on a stream. In the
following example, `S.repeatM` generates the stream serially, while `aheadly`
makes the `S.mapM` stage run its actions concurrently:

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ S.repeatM (p 1))
```

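`sequence` works the same way. The sketch below builds a serial stream of IO actions and runs them with `S.sequence` under `aheadly`. It assumes `S.sequence` and `S.fromList` from `Streamly.Prelude` and `serially` from `Streamly`, and it redefines the `p` helper from above. The actions should run concurrently and finish in about 3 seconds, while `aheadly` keeps the results in their original order.

```haskell
import Control.Concurrent (threadDelay)
import Streamly (aheadly, serially)
import qualified Streamly.Prelude as S

-- Sleep for n seconds, then return n.
p :: Int -> IO Int
p n = threadDelay (n * 1000000) >> return n

main :: IO ()
main = do
  -- A serially generated stream whose elements are IO actions.
  let actions = serially $ S.fromList [p 3, p 2, p 1]
  -- S.sequence runs the actions concurrently under aheadly.
  xs <- S.toList $ aheadly $ S.sequence actions
  print xs -- [3,2,1]
```
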
## Serial and Concurrent Merging

Semigroup and Monoid instances can be used to fold streams serially or
concurrently. In the following example, we compose ten actions in the
stream, each with a delay of 1 to 10 seconds, respectively. Since all the
actions are concurrent, we see one output printed every second:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

main = S.drain $ parallely $ foldMap delay [1..10]
 where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n
```

Streams can be combined together in many ways. We provide some examples
below; see the tutorial for more. We use the following `delay`
function in the examples to demonstrate the concurrency aspects:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

delay n = S.yieldM $ do
    threadDelay (n * 1000000)
    tid <- myThreadId
    putStrLn (show tid ++ ": Delay " ++ show n)
```

### Serial

``` haskell
main = S.drain $ delay 3 <> delay 2 <> delay 1
```
```
ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 1
```

### Parallel

``` haskell
main = S.drain . parallely $ delay 3 <> delay 2 <> delay 1
```
```
ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
```

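The stream type determines what `(<>)` does even when no concurrency is involved. The sketch below merges two pure streams over `Identity`. It assumes `serially`, `wSerially`, `S.fromList` and `S.toList` as found in streamly's API of this era. `serially` appends the two streams, while `wSerially` interleaves them one element at a time.

```haskell
import Data.Functor.Identity (runIdentity)
import Streamly (serially, wSerially)
import qualified Streamly.Prelude as S

-- Serial (<>): all of the first stream, then all of the second.
appended :: [Int]
appended = runIdentity $ S.toList $ serially $
  S.fromList [1, 2, 3] <> S.fromList [4, 5, 6]

-- Interleaved (<>): alternate between the two streams.
interleaved :: [Int]
interleaved = runIdentity $ S.toList $ wSerially $
  S.fromList [1, 2, 3] <> S.fromList [4, 5, 6]

main :: IO ()
main = do
  print appended    -- [1,2,3,4,5,6]
  print interleaved -- [1,4,2,5,3,6]
```
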
## Nested Loops (aka List Transformer)

The monad instance composes like a list monad.

``` haskell
import Streamly
import qualified Streamly.Prelude as S

loops = do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

main = S.drain loops
```
```
(1,3)
(1,4)
(2,3)
(2,4)
```

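For comparison, here is the same nested loop in the ordinary list monad, using only `base`. The outer loop (`x`) advances only after the inner loop (`y`) is exhausted, which is why the streamly version prints the pairs in the order above. `pairs` is an illustrative name.

```haskell
-- The same nested loop in the list monad; x is the outer loop.
pairs :: [(Int, Int)]
pairs = do
  x <- [1, 2]
  y <- [3, 4]
  return (x, y)

main :: IO ()
main = mapM_ print pairs -- (1,3) (1,4) (2,3) (2,4), one per line
```
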
## Concurrent Nested Loops

To run the above code with lookahead style concurrency, i.e. each iteration in
the loop can run concurrently but the results are presented in the same
order as serial execution:

``` haskell
main = S.drain $ aheadly $ loops
```

To run it with depth first concurrency, yielding results asynchronously in the
order in which they become available (deep async composition):

``` haskell
main = S.drain $ asyncly $ loops
```

To run it with breadth first concurrency, yielding results asynchronously
(wide async composition):

``` haskell
main = S.drain $ wAsyncly $ loops
```

The above streams provide lazy/demand-driven concurrency, which is automatically
scaled as per demand and is controlled/bounded so that it can be used on
infinite streams. The following combinator provides strict, unbounded
concurrency irrespective of demand:

``` haskell
main = S.drain $ parallely $ loops
```

To run it serially but interleaving the outer and inner loop iterations
(breadth first serial):

``` haskell
main = S.drain $ wSerially $ loops
```

## Magical Concurrency

Streams can perform semigroup (`<>`) and monadic bind (`>>=`) operations
concurrently using combinators like `asyncly` or `parallely`. For example,
to concurrently generate squares of a stream of numbers and then concurrently
sum the square roots of `x2 + y2` over all combinations of squares from the
two streams:

``` haskell
import Streamly
import qualified Streamly.Prelude as S

main = do
    s <- S.sum $ asyncly $ do
        -- Each square is performed concurrently, (<>) is concurrent
        x2 <- foldMap (\x -> return $ x * x) [1..100]
        y2 <- foldMap (\y -> return $ y * y) [1..100]
        -- Each addition is performed concurrently, monadic bind is concurrent
        return $ sqrt (x2 + y2)
    print s
```

The concurrency facilities provided by streamly can be compared with
[OpenMP](https://en.wikipedia.org/wiki/OpenMP) and
[Cilk](https://en.wikipedia.org/wiki/Cilk), but with a more declarative
expression.

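As a cross-check, the computation above can be written serially as a pure list comprehension using only `base`. It sums `sqrt (x*x + y*y)` over all pairs. `hypotSum` is an illustrative name. The concurrent version may add the terms in a different order, so the two results can differ in the last few bits of the `Double`.

```haskell
-- Serial, pure reference: the sum of sqrt (x*x + y*y) over every pair
-- (x, y) drawn from [1..n].
hypotSum :: Int -> Double
hypotSum n = sum [ sqrt (x * x + y * y) | x <- xs, y <- xs ]
  where
    xs = map fromIntegral [1 .. n]

main :: IO ()
main = print (hypotSum 100)
```
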
## Rate Limiting

For bounded concurrent streams, the stream yield rate can be specified. For
example, to print "hello" once every second, you can simply write this:

``` haskell
import Streamly
import qualified Streamly.Prelude as S

main = S.drain $ asyncly $ avgRate 1 $ S.repeatM $ putStrLn "hello"
```

For some practical uses of rate control, see
[AcidRain.hs](https://github.com/composewell/streamly/tree/master/examples/AcidRain.hs)
and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs).
Concurrency of the stream is automatically controlled to match the specified
rate. Rate control works precisely even at throughputs as high as millions of
yields per second. For more sophisticated rate control, see the haddock
documentation.

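Here is a finite variant of the example above. It is sketched under the assumption that `avgRate` takes the desired rate in yields per second as a `Double`, as `avgRate 1` above suggests. It prints six lines at an average of two per second, so the run should take roughly 3 seconds; the exact timing depends on the scheduler. `getMonotonicTime` from `base`'s `GHC.Clock` is used only to measure elapsed time.

```haskell
import GHC.Clock (getMonotonicTime)
import Streamly (asyncly, avgRate)
import qualified Streamly.Prelude as S

main :: IO ()
main = do
  start <- getMonotonicTime
  -- Six actions, yielded at an average rate of two per second.
  S.drain $ asyncly $ avgRate 2 $ S.replicateM 6 $ putStrLn "hello"
  end <- getMonotonicTime
  putStrLn $ "elapsed: " ++ show (end - start) ++ " s"
```
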
## Exceptions

From a library user's point of view, there is not much to learn or talk about
regarding exceptions. Synchronous exceptions work just the way they are supposed
to work in any standard non-concurrent code. When concurrent streams are combined
together, exceptions from the constituent streams are propagated to the
consumer stream. When an exception occurs in any of the constituent streams,
the other concurrent streams are promptly terminated. Exceptions can be thrown
using the `MonadThrow` instance.

There is no notion of explicit threads in streamly; therefore, there are no
asynchronous exceptions to deal with. You can just ignore the zillions of
blogs, talks and caveats about async exceptions. Async exceptions just don't
exist. Please don't use things like `myThreadId` and `throwTo` just for fun!

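Here is a minimal sketch of the behavior described above. It assumes the stream types have a `MonadThrow` instance, as stated. It uses `throwM` from the `exceptions` package and `try` from `Control.Exception`. One constituent stream prints and the other throws; the exception reaches the consumer (`S.drain`) and is caught with `try`. Whether `working` is printed before the failure depends on scheduling, so no output is shown.

```haskell
import Control.Exception (ErrorCall (..), try)
import Control.Monad.Catch (throwM)
import Streamly (asyncly)
import qualified Streamly.Prelude as S

main :: IO ()
main = do
  -- Two constituent streams combined concurrently with (<>):
  -- the first prints, the second throws.
  r <- try $ S.drain $ asyncly $
         S.yieldM (putStrLn "working") <> throwM (ErrorCall "boom")
  case r of
    Left (ErrorCall msg) -> putStrLn ("caught: " ++ msg)
    Right ()             -> putStrLn "no exception"
```
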
## Reactive Programming (FRP)

By virtue of integrating concurrency and streaming, streamly is also a
foundation for first class reactive programming. See
[AcidRain.hs](https://github.com/composewell/streamly/tree/master/examples/AcidRain.hs)
for a console based FRP game example and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs)
for an SDL based animation example.

## Conclusion

Streamly, short for streaming concurrently, provides monadic streams with a
simple API, almost identical to standard lists, and built-in
support for concurrency. By using stream-style combinators on stream
composition, streams can be generated, merged, chained, mapped, zipped, and
consumed concurrently, providing a generalized high level programming
framework that unifies streaming and concurrency. Controlled concurrency allows
even infinite streams to be evaluated concurrently. Concurrency is auto-scaled
based on feedback from the stream consumer. The programmer does not have to be
aware of threads, locking or synchronization to write scalable concurrent
programs.

Streamly is a programmer first library, designed to be useful and friendly to
programmers for solving practical problems in a simple and concise manner. Some
key points in favor of streamly are:

* _Simplicity_: Simple list like streaming API; if you know how to use lists
  then you know how to use streamly. This library is built with simplicity
  and ease of use as a design goal.
* _Concurrency_: Simple, powerful, and scalable concurrency. Concurrency is
  built-in and not intrusive; concurrent programs are written exactly the
  same way as non-concurrent ones.
* _Generality_: Unifies functionality provided by several disparate packages
  (streaming, concurrency, list transformer, logic programming, reactive
  programming) in a concise API.
* _Performance_: Streamly is designed for high performance. It employs stream
  fusion optimizations for best possible performance. Serial performance is
  equivalent to the venerable `vector` library in most cases and even better
  in some cases. Concurrent performance is unbeatable. See
  [streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
  for a comparison of popular streaming libraries on micro-benchmarks.

The basic streaming functionality of streamly is equivalent to that provided by
streaming libraries like
[vector](https://hackage.haskell.org/package/vector),
[streaming](https://hackage.haskell.org/package/streaming),
[pipes](https://hackage.haskell.org/package/pipes), and
[conduit](https://hackage.haskell.org/package/conduit).
In addition to providing streaming functionality, streamly subsumes
the functionality of list transformer libraries like `pipes` or
[list-t](https://hackage.haskell.org/package/list-t), and also the logic
programming library [logict](https://hackage.haskell.org/package/logict). On
the concurrency side, it subsumes the functionality of the
[async](https://hackage.haskell.org/package/async) package, and provides even
higher level concurrent composition. Because it supports
streaming with concurrency, we can write FRP applications similar in concept to
[Yampa](https://hackage.haskell.org/package/Yampa) or
[reflex](https://hackage.haskell.org/package/reflex).

See the `Comparison with existing packages` section at the end of the
[tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html).

## Further Reading

For more information, see:

* [Detailed tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
* [Reference documentation](https://hackage.haskell.org/package/streamly)
* [Examples](https://github.com/composewell/streamly/tree/master/examples)
* [Guides](https://github.com/composewell/streamly/blob/master/docs)
* [Streaming benchmarks](https://github.com/composewell/streaming-benchmarks)
* [Concurrency benchmarks](https://github.com/composewell/concurrency-benchmarks)

## Support

If you require professional support, consulting, training or timely
enhancements to the library, please contact
[support@composewell.com](mailto:support@composewell.com).

## Contributing

The code is available under the BSD-3 license
[on GitHub](https://github.com/composewell/streamly). Join the
[gitter chat](https://gitter.im/composewell/streamly) channel for discussions.
You can find some of the
[todo items on the github wiki](https://github.com/composewell/streamly/wiki/Things-To-Do).
Please ask on the gitter channel or [contact the maintainer directly](mailto:harendra.kumar@gmail.com)
for more details on each item. All contributions are welcome!

This library was originally inspired by the `transient` package authored by
Alberto G. Corona.