feat(stream): implement more helpers and combinators

This commit is contained in:
Tylor Steinberger 2017-03-13 05:29:32 -04:00
parent 820bb35cd5
commit 8e8b5166ce
19 changed files with 1136 additions and 248 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@
/generated-docs/
/.psc*
/.psa*
src/Main.purs

View File

@ -9,8 +9,7 @@
"dependencies": {
"purescript-eff": "^2.0.0",
"purescript-prelude": "^2.5.0",
"purescript-aff-promise": "^0.4.0",
"purescript-aff": "^2.0.3"
"purescript-refs": "^2.0.0"
},
"devDependencies": {
"purescript-console": "^2.0.0",

View File

@ -0,0 +1,97 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { map, tail } from '@most/prelude'
import { IndexSink } from '../sink/IndexSink'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
export const combine2 = f => s1 => s2 => combineArray(f, [s1, s2])
export const combine3 = f => s1 => s2 => s3 => combineArray(f, [s1, s2, s3])
export const combine4 = f => s1 => s2 => s3 => s4 => combineArray(f, [s1, s2, s3, s4])
export const combine5 = f => s1 => s2 => s3 => s4 => s5 => combineArray(f, [s1, s2, s3, s4, s5])
function combineArray (f, streams) {
return new Stream(combineSources(f, streams))
}
function combineSources (f, streams) {
return new Combine(f, map(getSource, streams))
}
function getSource (stream) {
return stream.source
}
function Combine (f, sources) {
this.f = f
this.sources = sources
}
Combine.prototype.run = function (sink) {
return scheduler => {
var l = this.sources.length
var disposables = new Array(l)
var sinks = new Array(l)
var mergeSink = new CombineSink(disposables, sinks, sink, this.f)
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink)(scheduler)
}
return disposeAll(disposables)
}
}
function CombineSink (disposables, sinks, sink, f) {
this.sink = sink
this.disposables = disposables
this.sinks = sinks
this.f = f
var l = sinks.length
this.awaiting = l
this.values = new Array(l)
this.hasValue = new Array(l)
for (var i = 0; i < l; ++i) {
this.hasValue[i] = false
}
this.activeCount = sinks.length
}
CombineSink.prototype.event = function (t) {
return indexedValue => {
var i = indexedValue.index
var awaiting = this._updateReady(i)
this.values[i] = indexedValue.value
if (awaiting === 0) {
const value = tail(this.values).reduce((f, x) => f(x), this.f(this.values[0]))
this.sink.event(t)(value)
}
}
}
CombineSink.prototype._updateReady = function (index) {
if (this.awaiting > 0) {
if (!this.hasValue[index]) {
this.hasValue[index] = true
this.awaiting -= 1
}
}
return this.awaiting
}
CombineSink.prototype.end = function (t, indexedValue) {
this.disposables[indexedValue.index].dispose()
if (--this.activeCount === 0) {
this.sink.end(t)
}
}

View File

@ -0,0 +1,56 @@
import { MemoizedDisposable } from '../disposables/MemoizedDisposable'
import { Stream } from '../Stream'
export const concat = s1 => s2 => continueWith(() => s2)(s1)
export const continueWith = f => stream => new Stream(new ContinueWith(f, stream.source))
function ContinueWith (f, source) {
this.f = f
this.source = source
}
ContinueWith.prototype.run = function (sink) {
return scheduler => {
return new ContinueWithSink(this.f, this.source, sink, scheduler)
}
}
function ContinueWithSink (f, source, sink, scheduler) {
this.f = f
this.sink = sink
this.scheduler = scheduler
this.active = true
this.disposable = new MemoizedDisposable(source.run(this)(scheduler))
}
ContinueWithSink.prototype.event = function (t) {
return x => {
if (!this.active) {
return
}
this.sink.event(t)(x)
}
}
ContinueWithSink.prototype.end = function (t, x) {
if (!this.active) {
return
}
this.disposable.dispose()
this._startNext(t, x, this.sink)
}
ContinueWithSink.prototype._startNext = function (t, x, sink) {
this._continue(this.f, x, sink)
}
ContinueWithSink.prototype._continue = function (f, x, sink) {
return f(x).source.run(sink)(this.scheduler)
}
ContinueWithSink.prototype.dispose = function () {
this.active = false
return this.disposable.dispose()
}

View File

@ -1,2 +1,6 @@
export * from './tap'
export * from './drain'
export * from './mergeConcurrently'
export * from './combine'
export * from './merge'
export * from './concat'
export * from './switch'

View File

@ -0,0 +1,74 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { IndexSink } from '../sink/IndexSink'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
import { reduce } from '@most/prelude'
/**
* @param {Array} streams array of stream to merge
* @returns {Stream} stream containing events from all input observables
* in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
export function merge (streams) {
return new Stream(mergeSources(streams))
}
/**
* This implements fusion/flattening for merge. It will
* fuse adjacent merge operations. For example:
* - a.merge(b).merge(c) effectively becomes merge(a, b, c)
* - merge(a, merge(b, c)) effectively becomes merge(a, b, c)
* It does this by concatenating the sources arrays of
* any nested Merge sources, in effect "flattening" nested
* merge operations into a single merge.
*/
function mergeSources (streams) {
return new Merge(reduce(appendSources, [], streams))
}
function appendSources (sources, stream) {
var source = stream.source
return source instanceof Merge
? sources.concat(source.sources)
: sources.concat(source)
}
function Merge (sources) {
this.sources = sources
}
Merge.prototype.run = function (sink, scheduler) {
var l = this.sources.length
var disposables = new Array(l)
var sinks = new Array(l)
var mergeSink = new MergeSink(disposables, sinks, sink)
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink, scheduler)
}
return disposeAll(disposables)
}
function MergeSink (disposables, sinks, sink) {
this.sink = sink
this.disposables = disposables
this.activeCount = sinks.length
}
MergeSink.prototype.event = function (t, indexValue) {
this.sink.event(t, indexValue.value)
}
MergeSink.prototype.end = function (t, indexedValue) {
dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink)
if (--this.activeCount === 0) {
this.sink.end(t, indexedValue.value)
}
}

View File

@ -2,15 +2,23 @@
/** @author Brian Cavalier */
/** @author John Hann */
import { curry2, curry3, id as identity } from '@most/prelude'
import { LinkedList } from '../LinkedList'
import { MemoizedDisosable } from '../disposables/MemoizedDisposable'
import { id as identity } from '@most/prelude'
import { MemoizedDisposable } from '../disposables/MemoizedDisposable'
import { Stream } from '../Stream'
export const mergeConcurrently = (concurrency, stream) =>
mergeMapConcurrently(identity, concurrency, stream)
export const flatMap =
curry2((stream, f) => mergeMapConcurrently(f, Infinity, stream))
export const mergeMapConcurrently = (f, concurrency, stream) =>
new MergeConcurrently(f, concurrency, stream)
export const mergeConcurrently = curry2((concurrency, stream) =>
mergeMapConcurrently(identity, concurrency, stream))
export const join =
mergeConcurrently(Infinity)
export const mergeMapConcurrently = curry3((f, concurrency, stream) =>
new Stream(new MergeConcurrently(f, concurrency, stream.source)))
class MergeConcurrently {
constructor (f, concurrency, source) {
@ -19,8 +27,8 @@ class MergeConcurrently {
this.source = source
}
run (sink, scheduler) {
return new Outer(this.f, this.concurrency, this.source, sink, scheduler)
run (sink) {
return scheduler => new Outer(this.f, this.concurrency, this.source, sink, scheduler)
}
}
@ -32,12 +40,12 @@ class Outer {
this.scheduler = scheduler
this.pending = []
this.current = new LinkedList()
this.disposable = new MemoizedDisosable(source.run(this, scheduler))
this.disposable = new MemoizedDisposable(source.run(this)(scheduler))
this.active = true
}
event (t, x) {
this._addInner(t, x)
event (t) {
return x => this._addInner(t, x)
}
_addInner (t, x) {
@ -62,10 +70,10 @@ class Outer {
this.current.add(innerSink)
}
end (t, x) {
end (t) {
this.active = false
this.disposable.dispose()
this._checkEnd(t, x)
this._checkEnd(t)
}
dispose () {
@ -75,26 +83,25 @@ class Outer {
this.current.dispose()
}
_endInner (t, x, inner) {
_endInner (t, inner) {
this.current.remove(inner)
inner.dispose()
if (this.pending.length === 0) {
this._checkEnd(t, x)
this._checkEnd(t)
} else {
this._startInner(t, this.pending.shift())
}
}
_checkEnd (t, x) {
_checkEnd (t) {
if (!this.active && this.current.isEmpty()) {
this.sink.end(t, x)
this.sink.end(t)
}
}
}
const mapAndRun = (f, x, sink, scheduler) =>
f(x).run(sink, scheduler)
const mapAndRun = (f, x, sink, scheduler) => f(x).source.run(sink)(scheduler)
class Inner {
constructor (time, outer, sink) {
@ -105,16 +112,12 @@ class Inner {
this.disposable = void 0
}
event (t, x) {
this.sink.event(Math.max(t, this.time), x)
event (t) {
return x => this.sink.event(Math.max(t, this.time))(x)
}
end (t, x) {
this.outer._endInner(Math.max(t, this.time), x, this)
}
error (t, e) {
this.outer.error(Math.max(t, this.time), e)
end (t) {
this.outer._endInner(Math.max(t, this.time), this)
}
dispose () {

View File

@ -0,0 +1,99 @@
import { disposeAll, emptyDisposable } from '../disposables'
import { Stream } from '../Stream'
/**
* Given a stream of streams, return a new stream that adopts the behavior
* of the most recent inner stream.
* @param {Stream} stream of streams on which to switch
* @returns {Stream} switching stream
*/
function switchLatest (stream) {
return new Stream(new Switch(stream.source))
}
export { switchLatest as switch }
function Switch (source) {
this.source = source
}
Switch.prototype.run = function (sink) {
return scheduler => {
const switchSink = new SwitchSink(sink, scheduler)
return disposeAll([switchSink, this.source.run(switchSink)(scheduler)])
}
}
function SwitchSink (sink, scheduler) {
this.sink = sink
this.scheduler = scheduler
this.current = null
this.ended = false
}
SwitchSink.prototype.event = function (t) {
return stream => {
this._disposeCurrent(t) // TODO: capture the result of this dispose
this.current = new Segment(t, Infinity, this, this.sink)
this.current.disposable = stream.source.run(this.current)(this.scheduler)
}
}
SwitchSink.prototype.end = function (t, x) {
this.ended = true
this._checkEnd(t, x)
}
SwitchSink.prototype.dispose = function () {
return this._disposeCurrent(this.scheduler.now())
}
SwitchSink.prototype._disposeCurrent = function (t) {
if (this.current !== null) {
return this.current._dispose(t)
}
}
SwitchSink.prototype._disposeInner = function (t, inner) {
inner._dispose(t) // TODO: capture the result of this dispose
if (inner === this.current) {
this.current = null
}
}
SwitchSink.prototype._checkEnd = function (t, x) {
if (this.ended && this.current === null) {
this.sink.end(t, x)
}
}
SwitchSink.prototype._endInner = function (t, x, inner) {
this._disposeInner(t, inner)
this._checkEnd(t, x)
}
function Segment (min, max, outer, sink) {
this.min = min
this.max = max
this.outer = outer
this.sink = sink
this.disposable = emptyDisposable
}
Segment.prototype.event = function (t) {
return x => {
if (t < this.max) {
this.sink.event(Math.max(t, this.min))(x)
}
}
}
Segment.prototype.end = function (t, x) {
this.outer._endInner(Math.max(t, this.min), x, this)
}
Segment.prototype._dispose = function (t) {
this.max = t
this.disposable.dispose()
}

View File

@ -1,50 +0,0 @@
import { Stream } from '../Stream'
import { curry2 } from '@most/prelude'
export const tapEnd = curry2((f, stream) => new Stream(new TapEnd(f, stream.source)))
export const tapEvent = curry2((f, stream) => new Stream(new TapEvent(f, stream.source)))
class TapEvent {
constructor (f, source) {
this.source = source
this.f = f
this.run = sink => scheduler =>
source.run(new TapSink(f, sink))(scheduler)
}
}
class TapSink {
constructor (f, sink) {
this.event = t => x => {
f(x)()
sink.event(t)(x)
}
this.end = t => {
sink.end(t)
}
}
}
class TapEnd {
constructor (f, source) {
this.source = source
this.f = f
this.run = sink => scheduler =>
source.run(new TapEndSink(f, sink))(scheduler)
}
}
class TapEndSink {
constructor (f, sink) {
this.event = t => x => sink.event(t)(x)
this.end = t => {
f()
sink.end(t)
}
}
}

View File

@ -7,9 +7,7 @@ export class Disposable {
constructor (dispose, data) {
this._dispose = dispose
this._data = data
}
dispose () {
return this._dispose(this._data)
this.dispose = () => dispose(data)
}
}

View File

@ -8,7 +8,7 @@ export class MemoizedDisposable {
dispose () {
if (!this.disposed) {
this.disposed = true
this.value = disposeSafely(this.disposable)
this.value = this.disposable.dispose()
this.disposable = undefined
}

View File

@ -9,4 +9,4 @@ export const createDisposable = curry2((dispose, data) => new MemoizedDisposable
export const emptyDisposable = new Disposable(id, undefined)
export const disposeAll = disposables =>
createDisposable(map(disposeSafely, disposables), disposables)
createDisposable(curry2(map)(disposeSafely), disposables)

View File

@ -11,6 +11,8 @@ function runSource (source, scheduler) {
const observer = new RunEffectsSink(disposable)
disposable.setDisposable(source.run(observer)(scheduler))
return {}
}
var RunEffectsSink = function RunEffectsSink (disposable) {

View File

@ -1,14 +1,19 @@
/* @flow */
export function ScheduledTask (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
export class ScheduledTask {
constructor (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
}
this.run = () => task.run(this.time)
this.dispose = () => {
scheduler.cancel(this)
task.dispose()
run () {
this.task.run(this.time)
}
dispose () {
this.scheduler.cancel(this)
this.task.dispose()
}
}

View File

@ -25,24 +25,26 @@ export class Scheduler {
}
asap (task) {
return this.schedule(0, -1, task)
return this.schedule(0)(-1)(task)
}
delay (delay, task) {
return this.schedule(delay, -1, task)
delay (delay) {
return task => this.schedule(delay)(-1)(task)
}
periodic (period, task) {
return this.schedule(0, period, task)
periodic (period) {
return task => this.schedule(0)(period)(task)
}
schedule (delay, period, task) {
const now = this.now()
const st = new ScheduledTask(now + Math.max(0, delay), period, task, this)
schedule (delay) {
return period => task => {
const now = this.now()
const st = new ScheduledTask(now + Math.max(0, delay), period, task, this)
this.timeline.add(st)
this._scheduleNextRun(now)
return st
this.timeline.add(st)
this._scheduleNextRun(now)
return st
}
}
cancel (task) {

View File

@ -15,10 +15,10 @@ export class IndexSink {
}
}
end (t, x) {
end (t) {
if (!this.active) return
this.active = false
this.sink.end(t, { index: this.index, value: x })
this.sink.end(t, { index: this.index })
}
}

View File

@ -4,6 +4,7 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "yarn build && pulp run",
"build": "rollup -c && pulp build",
"changelog": "conventional-changelog -i CHANGELOG.md -s -r 0 -p angular",
"clean": "rimraf lib lib.es2015",

View File

@ -16,11 +16,38 @@ Object.defineProperty(exports, '__esModule', { value: true });
// drop :: Int -> [a] -> [a]
// drop first n elements
function drop (n, a) { // eslint-disable-line complexity
if (n < 0) {
throw new TypeError('n must be >= 0')
}
var l = a.length;
if (n === 0 || l === 0) {
return a
}
if (n >= l) {
return []
}
return unsafeDrop(n, a, l - n)
}
// unsafeDrop :: Int -> [a] -> Int -> [a]
// Internal helper for drop
function unsafeDrop (n, a, l) {
var b = new Array(l);
for (var i = 0; i < l; ++i) {
b[i] = a[n + i];
}
return b
}
// tail :: [a] -> [a]
// drop head element
function tail (a) {
return drop(1, a)
}
// copy :: [a] -> [a]
// duplicate a (shallow duplication)
@ -39,7 +66,13 @@ function map (f, a) {
// reduce :: (a -> b -> a) -> a -> [b] -> a
// accumulate via left-fold
function reduce (f, z, a) {
var r = z;
for (var i = 0, l = a.length; i < l; ++i) {
r = f(r, a[i], i);
}
return r
}
// replace :: a -> Int -> [a]
// replace element at index
@ -124,10 +157,8 @@ function curry3 (f) {
var Disposable = function Disposable (dispose, data) {
this._dispose = dispose;
this._data = data;
};
Disposable.prototype.dispose = function dispose () {
return this._dispose(this._data)
this.dispose = function () { return dispose(data); };
};
var MemoizedDisposable = function MemoizedDisposable (disposable) {
@ -139,14 +170,14 @@ var MemoizedDisposable = function MemoizedDisposable (disposable) {
MemoizedDisposable.prototype.dispose = function dispose () {
if (!this.disposed) {
this.disposed = true;
this.value = disposeSafely(this.disposable);
this.value = this.disposable.dispose();
this.disposable = undefined;
}
return this.value
};
function disposeSafely$1 (disposable) {
function disposeSafely (disposable) {
disposable.dispose();
}
@ -154,7 +185,7 @@ var createDisposable = curry2(function (dispose, data) { return new MemoizedDisp
var emptyDisposable = new Disposable(id, undefined);
var disposeAll = function (disposables) { return createDisposable(map(disposeSafely$1, disposables), disposables); };
var disposeAll = function (disposables) { return createDisposable(curry2(map)(disposeSafely), disposables); };
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
@ -202,21 +233,22 @@ function runAsap (f) {
}
/* */
function ScheduledTask (delay, period, task, scheduler) {
var this$1 = this;
var ScheduledTask = function ScheduledTask (delay, period, task, scheduler) {
this.time = delay;
this.period = period;
this.task = task;
this.scheduler = scheduler;
this.active = true;
};
this.run = function () { return task.run(this$1.time); };
this.dispose = function () {
scheduler.cancel(this$1);
task.dispose();
};
}
ScheduledTask.prototype.run = function run () {
this.task.run(this.time);
};
ScheduledTask.prototype.dispose = function dispose () {
this.scheduler.cancel(this);
this.task.dispose();
};
/* */
function runTask$1 (task) {
@ -244,24 +276,32 @@ Scheduler.prototype.now = function now () {
};
Scheduler.prototype.asap = function asap (task) {
return this.schedule(0, -1, task)
return this.schedule(0)(-1)(task)
};
Scheduler.prototype.delay = function delay (delay$1, task) {
return this.schedule(delay$1, -1, task)
Scheduler.prototype.delay = function delay (delay$1) {
var this$1 = this;
return function (task) { return this$1.schedule(delay$1)(-1)(task); }
};
Scheduler.prototype.periodic = function periodic (period, task) {
return this.schedule(0, period, task)
Scheduler.prototype.periodic = function periodic (period) {
var this$1 = this;
return function (task) { return this$1.schedule(0)(period)(task); }
};
Scheduler.prototype.schedule = function schedule (delay, period, task) {
var now = this.now();
var st = new ScheduledTask(now + Math.max(0, delay), period, task, this);
Scheduler.prototype.schedule = function schedule (delay) {
var this$1 = this;
this.timeline.add(st);
this._scheduleNextRun(now);
return st
return function (period) { return function (task) {
var now = this$1.now();
var st = new ScheduledTask(now + Math.max(0, delay), period, task, this$1);
this$1.timeline.add(st);
this$1._scheduleNextRun(now);
return st
}; }
};
Scheduler.prototype.cancel = function cancel (task) {
@ -478,48 +518,6 @@ var scheduleTasks = curry2(function (tasks, scheduler) {
}
});
var Stream = function Stream (source) {
this.source = source;
};
var tapEnd = curry2(function (f, stream) { return new Stream(new TapEnd(f, stream.source)); });
var tapEvent = curry2(function (f, stream) { return new Stream(new TapEvent(f, stream.source)); });
var TapEvent = function TapEvent (f, source) {
this.source = source;
this.f = f;
this.run = function (sink) { return function (scheduler) { return source.run(new TapSink(f, sink))(scheduler); }; };
};
var TapSink = function TapSink (f, sink) {
this.event = function (t) { return function (x) {
f(x)();
sink.event(t)(x);
}; };
this.end = function (t) {
sink.end(t);
};
};
var TapEnd = function TapEnd (f, source) {
this.source = source;
this.f = f;
this.run = function (sink) { return function (scheduler) { return source.run(new TapEndSink(f, sink))(scheduler); }; };
};
var TapEndSink = function TapEndSink (f, sink) {
this.event = function (t) { return function (x) { return sink.event(t)(x); }; };
this.end = function (t) {
f();
sink.end(t);
};
};
var SettableDisposable = function SettableDisposable () {
this.disposable = void 0;
this.disposed = false;
@ -559,6 +557,8 @@ function runSource (source, scheduler) {
var observer = new RunEffectsSink(disposable);
disposable.setDisposable(source.run(observer)(scheduler));
return {}
}
var RunEffectsSink = function RunEffectsSink (disposable) {
@ -577,100 +577,526 @@ RunEffectsSink.prototype.end = function end (t) {
var drain = function (stream) { return function () { return runEffects(stream, defaultScheduler); }; };
var combine =
curry3(function (f, stream1, stream2) { return new Stream(new Combine(f, stream1.source, stream2.source)); });
var Combine = function Combine (f, source1, source2) {
this.f = f;
this.source1 = source1;
this.source2 = source2;
var LinkedList = function LinkedList () {
this.head = null;
this.length = 0;
};
Combine.prototype.run = function run (sink) {
var this$1 = this;
LinkedList.prototype.add = function add (x) {
if (this.head !== null) {
this.head.prev = x;
x.next = this.head;
}
this.head = x;
++this.length;
};
return function (scheduler) {
var disposables = Array(2);
var sinks = Array(2);
var mergeSink = new CombineSink(disposables, sinks, sink, this$1.f);
sinks[0] = new IndexSink(0, mergeSink);
sinks[1] = new IndexSink(1, mergeSink);
disposables[0] = this$1.source1.run(sinks[0])(scheduler);
disposables[1] = this$1.source2.run(sinks[1])(scheduler);
return disposeAll(disposables)
LinkedList.prototype.remove = function remove (x) { // eslint-disable-linecomplexity
--this.length;
if (x === this.head) {
this.head = this.head.next;
}
if (x.next !== null) {
x.next.prev = x.prev;
x.next = null;
}
if (x.prev !== null) {
x.prev.next = x.next;
x.prev = null;
}
};
var IndexSink = function IndexSink () {};
LinkedList.prototype.isEmpty = function isEmpty () {
return this.length === 0
};
IndexSink.prototype.cosntructor = function cosntructor (i, sink) {
this.i = i;
LinkedList.prototype.dispose = function dispose () {
if (this.isEmpty()) { return }
var x = this.head;
this.head = null;
this.length = 0;
while (x !== null) {
x.dispose();
x = x.next;
}
};
var Stream = function Stream (source) {
this.source = source;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
var flatMap =
curry2(function (stream, f) { return mergeMapConcurrently(f, Infinity, stream); });
var mergeConcurrently = curry2(function (concurrency, stream) { return mergeMapConcurrently(id, concurrency, stream); });
var join =
mergeConcurrently(Infinity);
var mergeMapConcurrently = curry3(function (f, concurrency, stream) { return new Stream(new MergeConcurrently(f, concurrency, stream.source)); });
var MergeConcurrently = function MergeConcurrently (f, concurrency, source) {
this.f = f;
this.concurrency = concurrency;
this.source = source;
};
MergeConcurrently.prototype.run = function run (sink) {
var this$1 = this;
return function (scheduler) { return new Outer(this$1.f, this$1.concurrency, this$1.source, sink, scheduler); }
};
var Outer = function Outer (f, concurrency, source, sink, scheduler) {
this.f = f;
this.concurrency = concurrency;
this.sink = sink;
this.scheduler = scheduler;
this.pending = [];
this.current = new LinkedList();
this.disposable = new MemoizedDisposable(source.run(this)(scheduler));
this.active = true;
};
Outer.prototype.event = function event (t) {
var this$1 = this;
return function (x) { return this$1._addInner(t, x); }
};
Outer.prototype._addInner = function _addInner (t, x) {
if (this.current.length < this.concurrency) {
this._startInner(t, x);
} else {
this.pending.push(x);
}
};
Outer.prototype._startInner = function _startInner (t, x) {
try {
this._initInner(t, x);
} catch (e) {
this.error(t, e);
}
};
Outer.prototype._initInner = function _initInner (t, x) {
var innerSink = new Inner(t, this, this.sink);
innerSink.disposable = mapAndRun(this.f, x, innerSink, this.scheduler);
this.current.add(innerSink);
};
Outer.prototype.end = function end (t) {
this.active = false;
this.disposable.dispose();
this._checkEnd(t);
};
Outer.prototype.dispose = function dispose () {
this.active = false;
this.pending.length = 0;
this.disposable.dispose();
this.current.dispose();
};
Outer.prototype._endInner = function _endInner (t, inner) {
this.current.remove(inner);
inner.dispose();
if (this.pending.length === 0) {
this._checkEnd(t);
} else {
this._startInner(t, this.pending.shift());
}
};
Outer.prototype._checkEnd = function _checkEnd (t) {
if (!this.active && this.current.isEmpty()) {
this.sink.end(t);
}
};
var mapAndRun = function (f, x, sink, scheduler) { return f(x).source.run(sink)(scheduler); };
var Inner = function Inner (time, outer, sink) {
this.prev = this.next = null;
this.time = time;
this.outer = outer;
this.sink = sink;
this.disposable = void 0;
};
Inner.prototype.event = function event (t) {
var this$1 = this;
return function (x) { return this$1.sink.event(Math.max(t, this$1.time))(x); }
};
Inner.prototype.end = function end (t) {
this.outer._endInner(Math.max(t, this.time), this);
};
Inner.prototype.dispose = function dispose () {
return this.disposable.dispose()
};
var IndexSink = function IndexSink (i, sink) {
this.sink = sink;
this.index = i;
this.active = true;
this.value = void 0;
};
IndexSink.prototype.event = function event (t) {
var this$1 = this;
return function (x) {
this$1.sink.event(t)({ index: this$1.i, value: x });
if (!this$1.active) { return }
this$1.value = x;
this$1.sink.event(t)(this$1);
}
};
IndexSink.prototype.end = function end (t) {
this.sink.end(t, this.i);
if (!this.active) { return }
this.active = false;
this.sink.end(t, { index: this.index });
};
var CombineSink = function CombineSink (disposables, sinks, sink, f) {
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
var combine2 = function (f) { return function (s1) { return function (s2) { return combineArray(f, [s1, s2]); }; }; };
var combine3 = function (f) { return function (s1) { return function (s2) { return function (s3) { return combineArray(f, [s1, s2, s3]); }; }; }; };
var combine4 = function (f) { return function (s1) { return function (s2) { return function (s3) { return function (s4) { return combineArray(f, [s1, s2, s3, s4]); }; }; }; }; };
var combine5 = function (f) { return function (s1) { return function (s2) { return function (s3) { return function (s4) { return function (s5) { return combineArray(f, [s1, s2, s3, s4, s5]); }; }; }; }; }; };
function combineArray (f, streams) {
return new Stream(combineSources(f, streams))
}
function combineSources (f, streams) {
return new Combine(f, map(getSource, streams))
}
function getSource (stream) {
return stream.source
}
function Combine (f, sources) {
this.f = f;
this.sources = sources;
}
Combine.prototype.run = function (sink) {
var this$1 = this;
return function (scheduler) {
var l = this$1.sources.length;
var disposables = new Array(l);
var sinks = new Array(l);
var mergeSink = new CombineSink(disposables, sinks, sink, this$1.f);
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink);
disposables[i] = this$1.sources[i].run(indexSink)(scheduler);
}
return disposeAll(disposables)
}
};
function CombineSink (disposables, sinks, sink, f) {
var this$1 = this;
this.sink = sink;
this.disposables = disposables;
this.sinks = sinks;
this.sink = sink;
this.f = f;
this.awaiting = 2;
this.values = Array(2);
this.hasValue = [false, false];
this.activeCount = 2;
};
var l = sinks.length;
this.awaiting = l;
this.values = new Array(l);
this.hasValue = new Array(l);
CombineSink.prototype.event = function event (t) {
var this$1 = this;
for (var i = 0; i < l; ++i) {
this$1.hasValue[i] = false;
}
return function (ref) {
var index = ref.index;
var value = ref.value;
this.activeCount = sinks.length;
}
var f = this$1.f;
var awaiting = this$1._updateReady(index);
this$1.values[index] = value;
CombineSink.prototype.event = function (t) {
var this$1 = this;
return function (indexedValue) {
var i = indexedValue.index;
var awaiting = this$1._updateReady(i);
this$1.values[i] = indexedValue.value;
if (awaiting === 0) {
var result = f(this$1.values[0])(this$1.values[1]);
this$1.sink.event(t)(result);
var value = tail(this$1.values).reduce(function (f, x) { return f(x); }, this$1.f(this$1.values[0]));
this$1.sink.event(t)(value);
}
}
};
CombineSink.prototype._updateReady = function _updateReady (index) {
CombineSink.prototype._updateReady = function (index) {
if (this.awaiting > 0) {
if (!this.hasValue[index]) {
this.hasValue[index] = true;
this.awaiting -= 1;
}
}
return this.awaiting
};
CombineSink.prototype.end = function end (t, ref) {
var index = ref.index;
var value = ref.value;
CombineSink.prototype.end = function (t, indexedValue) {
this.disposables[indexedValue.index].dispose();
this.disposables[index].dispose();
if (--this.activeCount === 0) {
this.sink.end(t);
}
};
if (--this.activeCount === 0) { this.sink.end(t); }
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
* @param {Array} streams array of stream to merge
* @returns {Stream} stream containing events from all input observables
* in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
function merge (streams) {
return new Stream(mergeSources(streams))
}
/**
* This implements fusion/flattening for merge. It will
* fuse adjacent merge operations. For example:
* - a.merge(b).merge(c) effectively becomes merge(a, b, c)
* - merge(a, merge(b, c)) effectively becomes merge(a, b, c)
* It does this by concatenating the sources arrays of
* any nested Merge sources, in effect "flattening" nested
* merge operations into a single merge.
*/
function mergeSources (streams) {
return new Merge(reduce(appendSources, [], streams))
}
function appendSources (sources, stream) {
var source = stream.source;
return source instanceof Merge
? sources.concat(source.sources)
: sources.concat(source)
}
function Merge (sources) {
this.sources = sources;
}
Merge.prototype.run = function (sink, scheduler) {
var this$1 = this;
var l = this.sources.length;
var disposables = new Array(l);
var sinks = new Array(l);
var mergeSink = new MergeSink(disposables, sinks, sink);
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink);
disposables[i] = this$1.sources[i].run(indexSink, scheduler);
}
return disposeAll(disposables)
};
function MergeSink (disposables, sinks, sink) {
this.sink = sink;
this.disposables = disposables;
this.activeCount = sinks.length;
}
MergeSink.prototype.event = function (t, indexValue) {
this.sink.event(t, indexValue.value);
};
MergeSink.prototype.end = function (t, indexedValue) {
dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink);
if (--this.activeCount === 0) {
this.sink.end(t, indexedValue.value);
}
};
var concat = function (s1) { return function (s2) { return continueWith(function () { return s2; })(s1); }; };
var continueWith = function (f) { return function (stream) { return new Stream(new ContinueWith(f, stream.source)); }; };
function ContinueWith (f, source) {
this.f = f;
this.source = source;
}
ContinueWith.prototype.run = function (sink) {
var this$1 = this;
return function (scheduler) {
return new ContinueWithSink(this$1.f, this$1.source, sink, scheduler)
}
};
function ContinueWithSink (f, source, sink, scheduler) {
this.f = f;
this.sink = sink;
this.scheduler = scheduler;
this.active = true;
this.disposable = new MemoizedDisposable(source.run(this)(scheduler));
}
ContinueWithSink.prototype.event = function (t) {
var this$1 = this;
return function (x) {
if (!this$1.active) {
return
}
this$1.sink.event(t)(x);
}
};
ContinueWithSink.prototype.end = function (t, x) {
if (!this.active) {
return
}
this.disposable.dispose();
this._startNext(t, x, this.sink);
};
ContinueWithSink.prototype._startNext = function (t, x, sink) {
this._continue(this.f, x, sink);
};
ContinueWithSink.prototype._continue = function (f, x, sink) {
return f(x).source.run(sink)(this.scheduler)
};
ContinueWithSink.prototype.dispose = function () {
this.active = false;
return this.disposable.dispose()
};
/**
* Given a stream of streams, return a new stream that adopts the behavior
* of the most recent inner stream.
* @param {Stream} stream of streams on which to switch
* @returns {Stream} switching stream
*/
function switchLatest (stream) {
return new Stream(new Switch(stream.source))
}
function Switch (source) {
this.source = source;
}
Switch.prototype.run = function (sink) {
var this$1 = this;
return function (scheduler) {
var switchSink = new SwitchSink(sink, scheduler);
return disposeAll([switchSink, this$1.source.run(switchSink)(scheduler)])
}
};
function SwitchSink (sink, scheduler) {
this.sink = sink;
this.scheduler = scheduler;
this.current = null;
this.ended = false;
}
SwitchSink.prototype.event = function (t) {
var this$1 = this;
return function (stream) {
this$1._disposeCurrent(t); // TODO: capture the result of this dispose
this$1.current = new Segment(t, Infinity, this$1, this$1.sink);
this$1.current.disposable = stream.source.run(this$1.current)(this$1.scheduler);
}
};
SwitchSink.prototype.end = function (t, x) {
this.ended = true;
this._checkEnd(t, x);
};
SwitchSink.prototype.dispose = function () {
return this._disposeCurrent(this.scheduler.now())
};
SwitchSink.prototype._disposeCurrent = function (t) {
if (this.current !== null) {
return this.current._dispose(t)
}
};
SwitchSink.prototype._disposeInner = function (t, inner) {
inner._dispose(t); // TODO: capture the result of this dispose
if (inner === this.current) {
this.current = null;
}
};
SwitchSink.prototype._checkEnd = function (t, x) {
if (this.ended && this.current === null) {
this.sink.end(t, x);
}
};
SwitchSink.prototype._endInner = function (t, x, inner) {
this._disposeInner(t, inner);
this._checkEnd(t, x);
};
function Segment (min, max, outer, sink) {
this.min = min;
this.max = max;
this.outer = outer;
this.sink = sink;
this.disposable = emptyDisposable;
}
Segment.prototype.event = function (t) {
var this$1 = this;
return function (x) {
if (t < this$1.max) {
this$1.sink.event(Math.max(t, this$1.min))(x);
}
}
};
Segment.prototype.end = function (t, x) {
this.outer._endInner(Math.max(t, this.min), x, this);
};
Segment.prototype._dispose = function (t) {
this.max = t;
this.disposable.dispose();
};
/* */
@ -682,7 +1108,16 @@ exports.defaultScheduler = defaultScheduler;
exports.scheduleTasks = scheduleTasks;
exports.endTask = endTask;
exports.eventTask = eventTask;
exports.tapEnd = tapEnd;
exports.tapEvent = tapEvent;
exports.drain = drain;
exports.combine = combine;
exports.flatMap = flatMap;
exports.mergeConcurrently = mergeConcurrently;
exports.join = join;
exports.mergeMapConcurrently = mergeMapConcurrently;
exports.combine2 = combine2;
exports.combine3 = combine3;
exports.combine4 = combine4;
exports.combine5 = combine5;
exports.merge = merge;
exports.concat = concat;
exports.continueWith = continueWith;
exports.switch = switchLatest;

View File

@ -10,9 +10,9 @@ module Control.Stream
, ScheduledTask
, Disposable
, Time
, createStream
, getSource
, runSource
, EventFn
, EndFn
-- Scheduler-related functions and values
, defaultScheduler
, eventTask
, endTask
@ -20,19 +20,54 @@ module Control.Stream
, emptyDisposable
, disposeAll
, scheduleTasks
-- Stream-related helpers
, createStream
, createSink
, createCombinator
, createEventCombinator
, getSource
, runSource
-- Combinators
, drain
, tapEvent
, tapEnd
, filter
, mergeConcurrently
, mergeMapConcurrently
, join
, combine2
, combine3
, combine4
, combine5
, continueWith
, concat
, merge
, switch
, constant
, scan
-- Stream factories
, just
, fromArray
, empty
, periodic
)
where
import Control.Applicative (class Applicative)
import Control.Apply (class Apply)
import Control.Bind (class Bind)
import Control.Category (id)
import Control.Monad (class Monad)
import Control.Monad.Eff (Eff, Pure)
import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef)
import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff)
import Control.MonadPlus (class Alt, class Plus)
import Data.Array (snoc)
import Data.Function (flip, ($))
import Data.Functor (class Functor, map)
import Data.Unit (Unit)
import Prelude (flip)
import Data.Monoid (class Monoid)
import Data.Semigroup (class Semigroup)
import Data.Unit (Unit, unit)
newtype Time = Time Int
@ -50,11 +85,11 @@ type Sink a =
, end :: Time -> Unit
}
type Scheduler =
newtype Scheduler = Scheduler
{ now :: Pure Time
, asap :: Task -> ScheduledTask
, delay :: Number -> Task -> ScheduledTask
, period :: Number -> Task -> ScheduledTask
, periodic :: Number -> Task -> ScheduledTask
, schedule :: Number -> Number -> Task -> ScheduledTask
, cancel :: ScheduledTask -> Unit
, cancelAll :: (ScheduledTask -> Boolean) -> Unit
@ -72,9 +107,36 @@ type ScheduledTask =
type Disposable = { dispose :: Pure Unit }
type EventFn a b = (Sink b -> Scheduler -> Time -> a -> Unit)
type EndFn a = (Sink a -> Scheduler -> Time -> Unit)
instance functorStream :: Functor Stream where
map = _map
instance applyStream :: Apply Stream where
apply = combine2 id
instance applicativeStream :: Applicative Stream where
pure = just
instance bindStream :: Bind Stream where
bind = flatMap
instance monadStream :: Monad Stream
instance semigroupStream :: Semigroup (Stream a) where
append = concat
instance altStream :: Alt Stream where
alt = \s1 s2 -> merge [s1, s2]
instance monoidStream :: Monoid (Stream a) where
mempty = empty
instance plusStream :: Plus Stream where
empty = empty
-- Stream-related helpers
createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a
createStream run = Stream { source: { run } }
@ -84,6 +146,30 @@ getSource stream = case stream of Stream a -> a.source
runSource :: forall a. Stream a -> Sink a -> Scheduler -> Disposable
runSource stream sink scheduler = (getSource stream).run sink scheduler
createSink :: forall a. (Time -> a -> Unit) -> (Time -> Unit) -> Sink a
createSink event end = { event, end }
createCombinator :: forall a b. EventFn a b -> EndFn b -> Stream a -> Stream b
createCombinator event end stream = createStream runCombinator
where
runCombinator :: Sink b -> Scheduler -> Disposable
runCombinator sink scheduler =
runSource stream (createSink (event sink scheduler) (end sink scheduler)) scheduler
createEventCombinator :: forall a b. EventFn a b -> Stream a -> Stream b
createEventCombinator event stream = createCombinator event end stream
where
end :: EndFn b
end sink scheduler time =
sink.end time
createEndCombinator :: forall a. EndFn a -> Stream a -> Stream a
createEndCombinator end stream = createCombinator event end stream
where
event :: EventFn a a
event sink scheduler time value = sink.event time value
-- Stream factories
just :: forall a. a -> Stream a
just a = createStream (runJust a)
@ -91,6 +177,12 @@ runJust :: forall a. a -> Sink a -> Scheduler -> Disposable
runJust a sink scheduler =
scheduleTasks [ eventTask a sink, endTask sink ] scheduler
empty :: forall a. Stream a
empty = createStream \sink scheduler -> scheduleTasks [endTask sink] scheduler
never :: forall a. Stream a
never = createStream \sink scheduler -> emptyDisposable
-- TODO: remove fromArray because it's not a real event stream
-- replace with a fromArray combinator
fromArray :: forall a. Array a -> Stream a
@ -105,19 +197,78 @@ runFromArray arr sink scheduler = scheduleTasks tasks scheduler
eventTasks :: Array Task
eventTasks = map (flip eventTask sink) arr
periodic :: Number -> Stream Unit
periodic period = createStream (runPeriodic period)
runPeriodic :: Number -> Sink Unit -> Scheduler -> Disposable
runPeriodic period sink scheduler = { dispose: scheduledTask.dispose }
where
p = case scheduler of Scheduler a -> a.periodic
scheduledTask = p period (eventTask unit sink)
-- combinators
_map :: forall a b. (a -> b) -> Stream a -> Stream b
_map f stream = createStream runMap
_map f stream = createEventCombinator mapEvent stream
where
runMap :: Sink b -> Scheduler -> Disposable
runMap sink scheduler =
runSource stream (mapSink f sink) scheduler
mapEvent :: EventFn a b
mapEvent sink scheduler time value = sink.event time (f value)
mapSink :: forall a b. (a -> b) -> Sink b -> Sink a
mapSink f sink = { event: mapEvent, end: sink.end }
filter :: forall a. (a -> Boolean) -> Stream a -> Stream a
filter predicate stream = createEventCombinator event stream
where
mapEvent :: Time -> a -> Unit
mapEvent time value = sink.event time (f value)
event :: EventFn a a
event sink scheduler time value = if predicate value
then sink.event time value
else unit
tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a
tapEvent f stream = createEventCombinator event stream
where
event :: EventFn a a
event sink scheduler time value = sink.event time value
where -- find a better way to perform these side effects
x = unsafePerformEff (f value)
tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a
tapEnd f stream = createEndCombinator end stream
where
end :: EndFn a
end sink scheduler time = sink.end time
where -- find a better way to perform these side effects
x = unsafePerformEff f
startWith :: forall a. a -> Stream a -> Stream a
startWith value stream = concat (just value) stream
constant :: forall a b. b -> Stream a -> Stream b
constant value stream = createEventCombinator constantEvent stream
where
constantEvent sink scheduler time x = sink.event time value
scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b
scan f seed stream = startWith seed $ createEventCombinator scanEvent stream
where
state = createState seed
scanEvent :: Sink b -> Scheduler -> Time -> a -> Unit
scanEvent sink scheduler time value = sink.event time (state.set \acc -> f acc value)
-- find a better way to perform these side effects
createState :: forall a. a -> { get :: Pure a, set :: (a -> a) -> a }
createState seed = { set, get }
where
ref :: Ref a
ref = unsafePerformEff $ unsafeCoerceEff $ newRef seed
set :: (a -> a) -> a
set f = unsafePerformEff $ unsafeCoerceEff $
modifyRef' ref \x -> { state: (f x), value: (f x) }
get :: Pure a
get = unsafeCoerceEff $ readRef ref
-- Foreign imports
-- Scheduler-related
foreign import defaultScheduler :: Scheduler
foreign import eventTask :: forall a. a -> Sink a -> Task
foreign import endTask :: forall a. Sink a -> Task
@ -125,6 +276,17 @@ foreign import createDisposable :: forall a. (a -> Unit) -> a -> Disposable
foreign import emptyDisposable :: Disposable
foreign import disposeAll :: Array Disposable -> Disposable
foreign import scheduleTasks :: Array Task -> Scheduler -> Disposable
-- combinators
foreign import drain :: forall e a. Stream a -> EffStream e Unit
foreign import tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a
foreign import tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a
foreign import mergeConcurrently :: forall a. Int -> Stream (Stream a) -> Stream a
foreign import mergeMapConcurrently :: forall a b. (a -> Stream b) -> Stream a -> Stream b
foreign import combine2 :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
foreign import combine3 :: forall a b c d. (a -> b -> c -> d) -> Stream a -> Stream b -> Stream c -> Stream d
foreign import combine4 :: forall a b c d e. (a -> b -> c -> d -> e) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e
foreign import combine5 :: forall a b c d e f. (a -> b -> c -> d -> e -> f) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e -> Stream f
foreign import join :: forall a. Stream (Stream a) -> Stream a
foreign import flatMap :: forall a b. Stream a -> (a -> Stream b) -> Stream b
foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Stream a
foreign import concat :: forall a. Stream a -> Stream a -> Stream a
foreign import merge :: forall a. Array (Stream a) -> Stream a
foreign import switch :: forall a. Stream (Stream a) -> Stream a