# Streamly

[![Hackage](https://img.shields.io/hackage/v/streamly.svg?style=flat)](https://hackage.haskell.org/package/streamly)
[![Gitter chat](https://badges.gitter.im/composewell/gitter.svg)](https://gitter.im/composewell/streamly)
[![Build Status](https://travis-ci.org/composewell/streamly.svg?branch=master)](https://travis-ci.org/composewell/streamly)
[![Windows Build status](https://ci.appveyor.com/api/projects/status/ajxg0c79raou9ned?svg=true)](https://ci.appveyor.com/project/harendra-kumar/streamly)
[![CircleCI](https://circleci.com/gh/composewell/streamly/tree/master.svg?style=svg)](https://circleci.com/gh/composewell/streamly/tree/master)
[![Coverage Status](https://coveralls.io/repos/composewell/streamly/badge.svg?branch=master&service=github)](https://coveralls.io/github/composewell/streamly?branch=master)

## Streaming Concurrently

Haskell lists express pure computations using composable stream operations like
`:`, `unfold`, `map`, `filter`, `zip` and `fold`. Streamly works just like
lists, except that it can express sequences of pure as well as monadic
computations, also known as streams. More importantly, it can express monadic
sequences with concurrent execution semantics without introducing any
additional APIs.

Streamly expresses concurrency using standard, well-known abstractions.
Concurrency semantics are defined for list operations and for semigroup,
applicative and monadic compositions. The programmer does not need to know any
low-level notions of concurrency like threads, locking or synchronization.
Concurrent and non-concurrent programs are fundamentally the same. A chosen
segment of the program can be made concurrent by annotating it with an
appropriate combinator. We can choose a combinator for lookahead-style or
asynchronous concurrency. Concurrency is automatically scaled up or down based
on the demand from the consumer application, so we can finally say goodbye to
managing thread pools and their sizing issues. The result is truly fearless
and declarative monadic concurrency.

## Where to use streamly?

Streamly is a general purpose programming framework. It can be used equally
efficiently for anything from a simple `Hello World!` program to a massively
concurrent application. The answer to the question "Where to use streamly?" is
similar to the answer to "Where to use Haskell lists or the IO monad?".

Streamly simplifies streaming and makes it as intuitive as plain lists. Unlike
other streaming libraries, no fancy types are required. Streamly is simply a
generalization of Haskell lists to monadic streaming, optionally with concurrent
composition. The basic stream type in streamly, `SerialT m a`, can be considered
a list type `[a]` parameterized by the monad `m`. For example, `SerialT IO a`
is a moral equivalent of `[a]` in the IO monad, and `SerialT Identity a` is
equivalent to pure lists. Streams are constructed very much like lists, except
that they use `nil` and `cons` instead of `[]` and `:`. Unlike lists, streams
can be constructed from monadic effects, not just pure elements. Streams are
processed just like lists, with list-like combinators, except that they are
monadic and work in a streaming fashion. In other words, streamly just completes
what lists lack; you do not need to learn anything new.
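
As a minimal sketch of the list analogy, the following builds one stream from
pure elements and one from monadic actions, then converts both to lists. It
assumes the `cons`, `consM`, `nil` and `toList` combinators exported by
`Streamly.Prelude`; the names `pureStream` and `effectful` are illustrative
only and are not part of the library.

``` haskell
import Data.Functor.Identity (Identity, runIdentity)
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- Built like the list 1 : 2 : 3 : [], but with cons and nil.
pureStream :: SerialT Identity Int
pureStream = 1 `S.cons` 2 `S.cons` 3 `S.cons` S.nil

-- Built from monadic actions instead of pure values (hypothetical example).
effectful :: SerialT IO Int
effectful =
    (putStrLn "producing 10" >> return 10) `S.consM` return 20 `S.consM` S.nil

main :: IO ()
main = do
    -- A stream in the Identity monad can be read back as a pure list.
    print (runIdentity (S.toList pureStream))
    -- A stream in IO runs its effects while the list is being built.
    S.toList effectful >>= print
```

The only visible difference from list code is the constructor names and the
monad in which the final `toList` runs.
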

Not surprisingly, the monad instance of streamly is a list transformer with
concurrency capability.

## Why data flow programming?

If you need convincing to use streaming or the data flow programming
paradigm itself, try to answer this question: why do we use lists in
Haskell? It boils down to why we use functional programming in the first place.
Haskell is successful in enforcing the functional data flow paradigm for pure
computations using lists, but not for monadic computations. In the absence of a
standard and easy-to-use data flow programming paradigm for monadic
computations, and with the IO monad providing an escape hatch to an imperative
model, we just love to fall into the imperative trap and start asking the same
fundamental question again: why do we have to use the streaming data model?

## Comparative Performance

High performance and simplicity are the two primary goals of streamly.
`Streamly` employs two different stream representations (CPS and direct style)
and interconverts between the two to get the best of both worlds on different
operations. It uses both foldr/build (for CPS style) and stream fusion (for
direct style) techniques to fuse operations. In terms of performance,
Streamly's goal is to compete with equivalent C programs. Streamly redefines
"blazing fast" for streaming libraries; it competes with lists and `vector`.
Other streaming libraries like "streaming", "pipes" and "conduit" are orders of
magnitude slower on most microbenchmarks. See [streaming
benchmarks](https://github.com/composewell/streaming-benchmarks) for a detailed
comparison.

The following chart compares those streamly and list operations whose
performance differs by more than 10%. The positive y-axis shows how many times
worse a list operation is than the same streamly operation; the negative y-axis
shows where streamly is worse than lists.

![Streamly vs Lists (time) comparison](charts-0/streamly-vs-list-time.svg)

Streamly uses lock-free synchronization for concurrent operations. It
auto-scales the degree of concurrency based on demand. For CPU-bound tasks
it tries to keep the number of threads close to the number of CPUs available,
whereas for IO-bound tasks more threads can be utilized. Parallelism can be
utilized with little overhead even if the task size is very small. See
[concurrency benchmarks](https://github.com/composewell/concurrency-benchmarks)
for detailed performance results and a comparison with the `async` package.

## Installing and using

Please see
[INSTALL.md](https://github.com/composewell/streamly/tree/master/INSTALL.md)
for instructions on how to use streamly with your Haskell build tool or package
manager. You may want to go through it before running the examples below.

The module `Streamly` provides just the core stream types, type casting and
concurrency control combinators. Stream construction, transformation, folding,
merging and zipping combinators are found in `Streamly.Prelude`.

## Streaming Pipelines

The following snippet provides a simple stream composition example that reads
numbers from stdin, prints the squares of even numbers and exits if an even
number greater than 9 is entered.

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Data.Function ((&))

main = S.drain $
       S.repeatM getLine
     & fmap read
     & S.filter even
     & S.takeWhile (<= 9)
     & fmap (\x -> x * x)
     & S.mapM print
```

Unlike `pipes` or `conduit`, and like `vector` and `streaming`, `streamly`
composes stream data instead of stream processors (functions). A stream is
just like a list and is explicitly passed around to functions that process the
stream. Therefore, no special operator is needed to join stages in a streaming
pipeline; the standard function application (`$`) or reverse function
application (`&`) operator is enough.
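
To make the point about plain function application concrete, here is a small
sketch of the same pipeline shape applied to a finite input, so that it can be
run without typing on stdin. The function name `squaresOfEvens` is made up for
this illustration; `fromFoldable`, `filter`, `takeWhile` and `toList` are
assumed to be the `Streamly.Prelude` combinators used elsewhere in this
document.

``` haskell
import Data.Function ((&))
import qualified Streamly.Prelude as S

-- Squares of the even inputs, stopping at the first even number above 9.
squaresOfEvens :: [Int] -> IO [Int]
squaresOfEvens xs =
    S.toList $
          S.fromFoldable xs
        & S.filter even
        & S.takeWhile (<= 9)
        & fmap (\x -> x * x)

main :: IO ()
main = squaresOfEvens [1 .. 20] >>= print
```

Each stage receives the stream produced by the previous stage as an ordinary
argument, which is why `&` is all that is needed to chain them.
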

## Concurrent Stream Generation

`consM` or its operator form `|:` can be used to construct a stream from
monadic actions. A stream constructed with `consM` can run the monadic actions
in the stream concurrently when used with an appropriate stream type combinator
(e.g. `asyncly`, `aheadly` or `parallely`).

The following code finishes in 3 seconds (6 seconds when serial). Note the
order of elements in the resulting output: the outputs are consumed as soon as
each action is finished (`asyncly`):

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.toList $ asyncly $ p 3 |: p 2 |: p 1 |: S.nil
[1,2,3]
```

Use `aheadly` if you want speculative concurrency, i.e. execute the actions in
the stream concurrently but consume the results in the specified order:

``` haskell
> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
[3,2,1]
```

Monadic stream generation functions such as `unfoldrM`, `replicateM`,
`repeatM`, `iterateM` and `fromFoldableM` can work concurrently.

The following finishes in 10 seconds (100 seconds when serial):

``` haskell
S.drain $ asyncly $ S.replicateM 10 $ p 10
```

## Concurrency Auto Scaling

Concurrency is auto-scaled, i.e. more actions are executed concurrently if the
consumer is consuming the stream at a higher speed. The number of tasks
executed concurrently can be controlled by `maxThreads`, and the number of
results buffered ahead of consumption can be controlled by `maxBuffer`. See the
documentation in the `Streamly` module.
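
The following sketch shows where the two limits would sit in a pipeline. It
assumes `maxThreads` and `maxBuffer` are applied to the stream like any other
combinator, as the `Streamly` module documents; the `task` and `runTasks`
names and the chosen limits are arbitrary illustrations.

``` haskell
import Control.Concurrent (threadDelay)
import Streamly (asyncly, maxBuffer, maxThreads)
import qualified Streamly.Prelude as S

-- An action that sleeps for one second and then returns its argument.
task :: Int -> IO Int
task n = threadDelay 1000000 >> return n

-- Run six tasks concurrently, but cap the threads and the result buffer.
runTasks :: IO [Int]
runTasks =
    S.toList
        $ asyncly
        $ maxThreads 2   -- at most two tasks in flight at a time
        $ maxBuffer 4    -- at most four results buffered ahead of the consumer
        $ S.fromFoldableM (map task [1 .. 6])

main :: IO ()
main = runTasks >>= print . length
```

With `asyncly` the order of the results is not fixed, so the example prints
only how many results arrived.
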

## Concurrent Streaming Pipelines

Use `|&` or `|$` to apply stream processing functions concurrently. The
following example prints "hello" every second; if you use `&` instead of
`|&`, you will see that the delay doubles to 2 seconds because of serial
application.

``` haskell
main = S.drain $
      S.repeatM (threadDelay 1000000 >> return "hello")
   |& S.mapM (\x -> threadDelay 1000000 >> putStrLn x)
```

## Mapping Concurrently

We can use the `mapM` or `sequence` functions concurrently on a stream.

``` haskell
> let p n = threadDelay (n * 1000000) >> return n
> S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ S.repeatM (p 1))
```
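
A finite variant of the same idea is sketched below, so that the result can be
inspected. It assumes, as in the snippet above, that `S.mapM` and `S.sequence`
take their concurrency style from the enclosing `aheadly`; the helper names
`p`, `mapped` and `sequenced` are illustrative.

``` haskell
import Control.Concurrent (threadDelay)
import Streamly (aheadly, serially)
import qualified Streamly.Prelude as S

-- Sleep for n seconds, then return n.
p :: Int -> IO Int
p n = threadDelay (n * 1000000) >> return n

-- Map an action over a stream; the delays may overlap under aheadly.
mapped :: IO [Int]
mapped = S.toList $ aheadly $ S.mapM p (serially $ S.fromFoldable [3, 2, 1])

-- Run a stream of actions; again the actions may overlap under aheadly.
sequenced :: IO [Int]
sequenced = S.toList $ aheadly $ S.sequence (serially $ S.fromFoldable [p 2, p 1])

main :: IO ()
main = do
    mapped >>= print
    sequenced >>= print
```

Because `aheadly` presents results in stream order, both lists come back in
the order of their inputs even though the shorter delays finish first.
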

## Serial and Concurrent Merging

Semigroup and Monoid instances can be used to fold streams serially or
concurrently. In the following example we compose ten actions in the
stream, each with a delay of 1 to 10 seconds, respectively. Since all the
actions are concurrent we see one output printed every second:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

main = S.toList $ parallely $ foldMap delay [1..10]
    where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n
```

Streams can be combined together in many ways. We provide some examples
below; see the tutorial for more. We use the following `delay`
function in the examples to demonstrate the concurrency aspects:

``` haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

delay n = S.yieldM $ do
    threadDelay (n * 1000000)
    tid <- myThreadId
    putStrLn (show tid ++ ": Delay " ++ show n)
```
### Serial

``` haskell
main = S.drain $ delay 3 <> delay 2 <> delay 1
```
```
ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 1
```

### Parallel

``` haskell
main = S.drain . parallely $ delay 3 <> delay 2 <> delay 1
```
```
ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
```

## Nested Loops (aka List Transformer)

The monad instance composes like a list monad.

``` haskell
import Streamly
import qualified Streamly.Prelude as S

loops = do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

main = S.drain loops
```
```
(1,3)
(1,4)
(2,3)
(2,4)
```

## Concurrent Nested Loops

To run the above code with speculative concurrency, i.e. each iteration in the
loop can run concurrently but the results are presented to the consumer of the
output in the same order as serial execution:

``` haskell
main = S.drain $ aheadly $ loops
```

Different stream types execute the loop iterations in different ways. For
example, `wSerially` interleaves the loop iterations. There are several
concurrent stream styles to execute the loop iterations concurrently in
different ways; see the `Streamly.Tutorial` module for a detailed treatment.
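
As a sketch of how one loop body can be reused under several stream types, the
version below gives the loop a polymorphic signature and returns the pairs
instead of printing them. The name `pairs` and the choice to collect results
with `toList` are assumptions made for this illustration; the signature style
follows the one used in the tutorial and needs `FlexibleContexts`.

``` haskell
{-# LANGUAGE FlexibleContexts #-}

import Streamly
import qualified Streamly.Prelude as S

-- The same nested loop, usable with any stream type.
pairs :: (IsStream t, Monad (t IO)) => t IO (Int, Int)
pairs = do
    x <- S.fromFoldable [1, 2]
    y <- S.fromFoldable [3, 4]
    return (x, y)

main :: IO ()
main = do
    S.toList (serially pairs)  >>= print  -- depth-first, like the list monad
    S.toList (aheadly pairs)   >>= print  -- concurrent, same order as serial
    S.toList (wSerially pairs) >>= print  -- interleaved iterations
```

Only the combinator applied to `pairs` changes between the three runs; the
loop body itself is untouched.
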

## Magical Concurrency

Streams can perform semigroup (`<>`) and monadic bind (`>>=`) operations
concurrently using combinators like `asyncly` and `parallely`. For example,
to concurrently generate squares of a stream of numbers and then concurrently
sum the square roots of all combinations of two streams:

``` haskell
import Streamly
import qualified Streamly.Prelude as S

main = do
    s <- S.sum $ asyncly $ do
        -- Each square is performed concurrently, (<>) is concurrent
        x2 <- foldMap (\x -> return $ x * x) [1..100]
        y2 <- foldMap (\y -> return $ y * y) [1..100]
        -- Each addition is performed concurrently, monadic bind is concurrent
        return $ sqrt (x2 + y2)
    print s
```

The concurrency facilities provided by streamly can be compared with
[OpenMP](https://en.wikipedia.org/wiki/OpenMP) and
[Cilk](https://en.wikipedia.org/wiki/Cilk) but with a more declarative
expression.

## Example: Listing Directories Recursively/Concurrently

The following code snippet lists a directory tree recursively, reading multiple
directories concurrently:

```haskell
import Control.Monad.IO.Class (liftIO)
import Path.IO (listDir, getCurrentDir) -- from path-io package
import Streamly (AsyncT, adapt)
import qualified Streamly.Prelude as S

listDirRecursive :: AsyncT IO ()
listDirRecursive = getCurrentDir >>= readdir >>= liftIO . mapM_ putStrLn
  where
    readdir dir = do
      (dirs, files) <- listDir dir
      S.yield (map show dirs ++ map show files) <> foldMap readdir dirs

main :: IO ()
main = S.drain $ adapt $ listDirRecursive
```

`AsyncT` is a stream monad transformer. If you are familiar with a list
transformer, it is nothing but `ListT` with concurrency semantics. For example,
the semigroup operation `<>` is concurrent, which makes `foldMap` concurrent
too. You can replace `AsyncT` with `SerialT` and the above code will become
serial, exactly equivalent to a `ListT`.

## Rate Limiting

For bounded concurrent streams, a stream yield rate can be specified. For
example, to print hello once every second you can simply write this:

``` haskell
import Streamly
import qualified Streamly.Prelude as S

main = S.drain $ asyncly $ avgRate 1 $ S.repeatM $ putStrLn "hello"
```

For some practical uses of rate control, see
[AcidRain.hs](https://github.com/composewell/streamly/tree/master/examples/AcidRain.hs)
and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs).
Concurrency of the stream is automatically controlled to match the specified
rate. Rate control works precisely even at throughputs as high as millions of
yields per second. For more sophisticated rate control see the haddock
documentation.

## Arrays

The `Streamly.Mem.Array` module provides mutable arrays. Arrays are the
computing duals of streams. Streams are good at sequential access and immutable
transformations of in-transit data, whereas arrays are good at random access
and in-place transformations of buffered data. Unlike streams, which are
potentially infinite, arrays are necessarily finite. Arrays can be used as an
efficient interface between streams and external storage systems like memory,
files and network. Streams and arrays complement each other to provide a
general purpose computing system. The design of streamly as a general purpose
computing framework is centered around these two fundamental aspects of
computing and storage.

Arrays in streamly use pinned memory outside the GC and therefore avoid any GC
overhead for the storage in arrays. The `ByteString` data type from the
`bytestring` package and the `Text` data type from the `text` package are just
special cases of arrays. `ByteString` is equivalent to `Array Word8` and `Text`
is equivalent to a `utf16` encoded `Array Word8`. All the bytestring and text
operations can be performed on arrays with equivalent or better performance by
converting them to and from streams.

## Folds

The `Streamly.Fold` module provides composable left folds. A `Fold` is a
consumer of a stream and can be used to fold it. Folds can be composed in many
ways: a stream can be distributed to multiple folds, partitioned across
multiple folds, demultiplexed over multiple folds, or unzipped to two folds. We
can also use folds to fold segments of a stream, generating a stream of the
folded results.

If you are familiar with the `foldl` library, these are the same composable
left folds but simpler and better integrated with streamly, and with many more
powerful ways of composing and applying them.
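
As a rough sketch of distributing one stream to two folds, the example below
combines `FL.sum` and `FL.length` applicatively and runs the result with
`FL.foldl'`. These names are taken from the File IO examples in this document
and are assumed to have the shapes used there; the module is still evolving,
so treat the exact API as an assumption. `sumAndLength` is a made-up name.

``` haskell
import qualified Streamly.Fold as FL
import qualified Streamly.Prelude as S

-- Compute the sum and the length of a stream in a single pass.
sumAndLength :: IO (Int, Int)
sumAndLength =
    FL.foldl' ((,) <$> FL.sum <*> FL.length) (S.fromFoldable [1 .. 10])

main :: IO ()
main = sumAndLength >>= print
```

The applicative combination feeds every element to both folds, so the stream
is traversed only once.
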

## File IO

The following code snippets implement some common Unix command line utilities
using streamly. You can compile these with `ghc -O2
-fspec-constr-recursive=10` and compare the performance with the regular GNU
coreutils available on your system. The source file
[HandleIO.hs](https://github.com/composewell/streamly/tree/master/examples/HandleIO.hs)
in the examples directory includes these examples.

``` haskell
module Main where

import qualified Streamly.Prelude as S
import qualified Streamly.Fold as FL
import qualified Streamly.Mem.Array as A
import qualified Streamly.FileSystem.Handle as FH
import qualified System.IO as FH

import Data.Char (ord)
import System.Environment (getArgs)
import System.IO (openFile, IOMode(..), stdout)

-- Open the file named by the first argument and pass its handle to f.
withArg f = do
    (name : _) <- getArgs
    src <- openFile name ReadMode
    f src

-- Open a source and a destination file and pass both handles to f.
withArg2 f = do
    (sname : dname : _) <- getArgs
    src <- openFile sname ReadMode
    dst <- openFile dname WriteMode
    f src dst
```

### cat

``` haskell
cat = FH.writeArrays stdout . FH.readArraysOfUpto (256*1024)
main = withArg cat
```

### cp

``` haskell
cp src dst = FH.writeArrays dst $ FH.readArraysOfUpto (256*1024) src
main = withArg2 cp
```

### wc -l

``` haskell
wcl =
      S.length
    . FL.splitBySuffix (== fromIntegral (ord '\n')) FL.drain
    . FH.read
main = withArg wcl >>= print
```

### Average Line Length

``` haskell
avgll =
      FL.foldl' avg
    . FL.splitBySuffix (== fromIntegral (ord '\n')) FL.length
    . FH.read

    where avg      = (/) <$> toDouble FL.sum <*> toDouble FL.length
          toDouble = fmap (fromIntegral :: Int -> Double)

main = withArg avgll >>= print
```

### Line Length Histogram

``` haskell
llhisto =
      FL.foldl' (FL.classify FL.length)
    . S.map bucket
    . FL.splitBySuffix (== fromIntegral (ord '\n')) FL.length
    . FH.read

    where
    bucket n = let i = n `div` 10 in if i > 9 then (9,n) else (i,n)

main = withArg llhisto >>= print
```

## Socket IO

It's easy to build concurrent client and server programs using streamly. The
`Streamly.Network.*` modules provide easy combinators to build network servers
and client programs. See
[FromFileClient.hs](https://github.com/composewell/streamly/tree/master/examples/FromFileClient.hs),
[EchoServer.hs](https://github.com/composewell/streamly/tree/master/examples/EchoServer.hs) and
[FileSinkServer.hs](https://github.com/composewell/streamly/tree/master/examples/FileSinkServer.hs)
in the examples directory.

## Exceptions

Exceptions can be thrown at any point using the `MonadThrow` instance. Standard
exception handling combinators like `bracket`, `finally`, `handle` and
`onException` are provided in the `Streamly.Prelude` module.
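
The following sketch illustrates throwing from inside a stream through the
`MonadThrow` instance and observing the exception where the stream is
consumed. It uses `throwM` from the `exceptions` package and plain `try` from
`base` rather than the stream-level combinators listed above, whose exact
signatures are not shown here; the stream `failing` is a made-up example.

``` haskell
import Control.Exception (ErrorCall (..), try)
import Control.Monad.Catch (throwM)   -- from the exceptions package
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- Yields 1 and then throws; the element after the throw is never produced.
failing :: SerialT IO Int
failing = S.yield 1 <> throwM (ErrorCall "boom") <> S.yield 2

main :: IO ()
main = do
    -- The exception surfaces in the IO action that consumes the stream.
    r <- try (S.drain failing) :: IO (Either ErrorCall ())
    case r of
        Left e   -> putStrLn ("caught: " ++ show e)
        Right () -> putStrLn "finished without an exception"
```

The consumer handles the exception exactly as it would for any other `IO`
action.
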

In the presence of concurrency, synchronous exceptions work just the way they
are supposed to work in non-concurrent code. When concurrent streams
are combined together, exceptions from the constituent streams are propagated
to the consumer stream. When an exception occurs in any of the constituent
streams, the other concurrent streams are promptly terminated.

There is no notion of explicit threads in streamly, and therefore no
asynchronous exceptions to deal with. You can just ignore the zillions of
blogs, talks and caveats about async exceptions. Async exceptions just don't
exist. Please don't use things like `myThreadId` and `throwTo` just for fun!

## Reactive Programming (FRP)

By virtue of integrating concurrency and streaming, Streamly is also a
foundation for first class reactive programming. See
[AcidRain.hs](https://github.com/composewell/streamly/tree/master/examples/AcidRain.hs)
for a console based FRP game example and
[CirclingSquare.hs](https://github.com/composewell/streamly/tree/master/examples/CirclingSquare.hs)
for an SDL based animation example.

## Conclusion

Streamly, short for streaming concurrently, provides monadic streams with a
simple API, almost identical to standard lists, and built-in
support for concurrency. By using stream-style combinators on stream
composition, streams can be generated, merged, chained, mapped, zipped, and
consumed concurrently, providing a generalized high-level programming
framework unifying streaming and concurrency. Controlled concurrency allows
even infinite streams to be evaluated concurrently. Concurrency is auto-scaled
based on feedback from the stream consumer. The programmer does not have to be
aware of threads, locking or synchronization to write scalable concurrent
programs.

Streamly is a programmer-first library, designed to be useful and friendly to
programmers for solving practical problems in a simple and concise manner. Some
key points in favor of streamly are:

  * _Simplicity_: Simple list-like streaming API. If you know how to use lists
    then you know how to use streamly. This library is built with simplicity
    and ease of use as a design goal.
  * _Concurrency_: Simple, powerful, and scalable concurrency. Concurrency is
    built-in and not intrusive; concurrent programs are written exactly the
    same way as non-concurrent ones.
  * _Generality_: Unifies functionality provided by several disparate packages
    (streaming, concurrency, list transformer, logic programming, reactive
    programming) in a concise API.
  * _Performance_: Streamly is designed for high performance. It employs stream
    fusion optimizations for best possible performance. Serial performance is
    equivalent to the venerable `vector` library in most cases and even better
    in some cases. Concurrent performance is unbeatable. See
    [streaming-benchmarks](https://github.com/composewell/streaming-benchmarks)
    for a comparison of popular streaming libraries on micro-benchmarks.

The basic streaming functionality of streamly is equivalent to that provided by
streaming libraries like
[vector](https://hackage.haskell.org/package/vector),
[streaming](https://hackage.haskell.org/package/streaming),
[pipes](https://hackage.haskell.org/package/pipes), and
[conduit](https://hackage.haskell.org/package/conduit).
In addition to providing streaming functionality, streamly subsumes
the functionality of list transformer libraries like `pipes` or
[list-t](https://hackage.haskell.org/package/list-t), and also the logic
programming library [logict](https://hackage.haskell.org/package/logict). On
the concurrency side, it subsumes the functionality of the
[async](https://hackage.haskell.org/package/async) package, and provides even
higher level concurrent composition. Because it supports
streaming with concurrency we can write FRP applications similar in concept to
[Yampa](https://hackage.haskell.org/package/Yampa) or
[reflex](https://hackage.haskell.org/package/reflex).

See the `Comparison with existing packages` section at the end of the
[tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html).

## Further Reading

For more information, see:

  * [Detailed tutorial](https://hackage.haskell.org/package/streamly/docs/Streamly-Tutorial.html)
  * [Reference documentation](https://hackage.haskell.org/package/streamly)
  * [Examples](https://github.com/composewell/streamly/tree/master/examples)
  * [Guides](https://github.com/composewell/streamly/blob/master/docs)
  * [Streaming benchmarks](https://github.com/composewell/streaming-benchmarks)
  * [Concurrency benchmarks](https://github.com/composewell/concurrency-benchmarks)

## Support

Please feel free to ask questions on the
[streamly gitter channel](https://gitter.im/composewell/streamly).
If you require professional support, consulting, training or timely
enhancements to the library, please contact
[support@composewell.com](mailto:support@composewell.com).

## Credits

The following authors/libraries have influenced or inspired this library in a
significant way:

  * Roman Leshchinskiy (vector)
  * Gabriel Gonzalez (foldl)
  * Alberto G. Corona (transient)

See the `credits` directory for the full list of contributors, credits and
licenses.

## Contributing

The code is available under the BSD-3 license
[on github](https://github.com/composewell/streamly). Join the
[gitter chat](https://gitter.im/composewell/streamly) channel for discussions.
You can find some of the
[todo items on the github wiki](https://github.com/composewell/streamly/wiki/Things-To-Do).
Please ask on the gitter channel or [contact the maintainer directly](mailto:harendra.kumar@gmail.com)
for more details on each item. All contributions are welcome!