diff --git a/.gitignore b/.gitignore index ce25bb8..4607e3e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ /generated-docs/ /.psc* /.psa* +src/Main.purs diff --git a/bower.json b/bower.json index 266cfac..0936138 100644 --- a/bower.json +++ b/bower.json @@ -9,8 +9,7 @@ "dependencies": { "purescript-eff": "^2.0.0", "purescript-prelude": "^2.5.0", - "purescript-aff-promise": "^0.4.0", - "purescript-aff": "^2.0.3" + "purescript-refs": "^2.0.0" }, "devDependencies": { "purescript-console": "^2.0.0", diff --git a/ffi/Control/Stream/combinators/combine.js b/ffi/Control/Stream/combinators/combine.js new file mode 100644 index 0000000..f13f6ba --- /dev/null +++ b/ffi/Control/Stream/combinators/combine.js @@ -0,0 +1,97 @@ +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +import { map, tail } from '@most/prelude' + +import { IndexSink } from '../sink/IndexSink' +import { Stream } from '../Stream' +import { disposeAll } from '../disposables' + +export const combine2 = f => s1 => s2 => combineArray(f, [s1, s2]) +export const combine3 = f => s1 => s2 => s3 => combineArray(f, [s1, s2, s3]) +export const combine4 = f => s1 => s2 => s3 => s4 => combineArray(f, [s1, s2, s3, s4]) +export const combine5 = f => s1 => s2 => s3 => s4 => s5 => combineArray(f, [s1, s2, s3, s4, s5]) + +function combineArray (f, streams) { + return new Stream(combineSources(f, streams)) +} + +function combineSources (f, streams) { + return new Combine(f, map(getSource, streams)) +} + +function getSource (stream) { + return stream.source +} + +function Combine (f, sources) { + this.f = f + this.sources = sources +} + +Combine.prototype.run = function (sink) { + return scheduler => { + var l = this.sources.length + var disposables = new Array(l) + var sinks = new Array(l) + + var mergeSink = new CombineSink(disposables, sinks, sink, this.f) + + for (var indexSink, i = 0; i < l; ++i) { + indexSink = sinks[i] = new IndexSink(i, mergeSink) + disposables[i] = this.sources[i].run(indexSink)(scheduler) + } + + return disposeAll(disposables) + } +} + +function CombineSink (disposables, sinks, sink, f) { + this.sink = sink + this.disposables = disposables + this.sinks = sinks + this.f = f + + var l = sinks.length + this.awaiting = l + this.values = new Array(l) + this.hasValue = new Array(l) + + for (var i = 0; i < l; ++i) { + this.hasValue[i] = false + } + + this.activeCount = sinks.length +} + +CombineSink.prototype.event = function (t) { + return indexedValue => { + var i = indexedValue.index + var awaiting = this._updateReady(i) + + this.values[i] = indexedValue.value + if (awaiting === 0) { + const value = tail(this.values).reduce((f, x) => f(x), this.f(this.values[0])) + this.sink.event(t)(value) + } + } +} + +CombineSink.prototype._updateReady = function (index) { + if (this.awaiting > 0) { + if (!this.hasValue[index]) { + this.hasValue[index] = true + this.awaiting -= 1 + } + } + return this.awaiting +} + +CombineSink.prototype.end = function (t, indexedValue) { + this.disposables[indexedValue.index].dispose() + + if (--this.activeCount === 0) { + this.sink.end(t) + } +} diff --git a/ffi/Control/Stream/combinators/concat.js b/ffi/Control/Stream/combinators/concat.js new file mode 100644 index 0000000..95529ff --- /dev/null +++ b/ffi/Control/Stream/combinators/concat.js @@ -0,0 +1,56 @@ +import { MemoizedDisposable } from '../disposables/MemoizedDisposable' +import { Stream } from '../Stream' + +export const concat = s1 => s2 => continueWith(() => s2)(s1) + +export const continueWith = f => stream => new Stream(new ContinueWith(f, stream.source)) + +function ContinueWith (f, source) { + this.f = f + this.source = source +} + +ContinueWith.prototype.run = function (sink) { + return scheduler => { + return new ContinueWithSink(this.f, this.source, sink, scheduler) + } +} + +function ContinueWithSink (f, source, sink, scheduler) { + this.f = f + this.sink = sink + this.scheduler = scheduler + this.active = true + this.disposable = new MemoizedDisposable(source.run(this)(scheduler)) +} + +ContinueWithSink.prototype.event = function (t) { + return x => { + if (!this.active) { + return + } + this.sink.event(t)(x) + } +} + +ContinueWithSink.prototype.end = function (t, x) { + if (!this.active) { + return + } + + this.disposable.dispose() + this._startNext(t, x, this.sink) +} + +ContinueWithSink.prototype._startNext = function (t, x, sink) { + this._continue(this.f, x, sink) +} + +ContinueWithSink.prototype._continue = function (f, x, sink) { + return f(x).source.run(sink)(this.scheduler) +} + +ContinueWithSink.prototype.dispose = function () { + this.active = false + return this.disposable.dispose() +} diff --git a/ffi/Control/Stream/combinators/index.js b/ffi/Control/Stream/combinators/index.js index 5365bd9..f99d175 100644 --- a/ffi/Control/Stream/combinators/index.js +++ b/ffi/Control/Stream/combinators/index.js @@ -1,2 +1,6 @@ -export * from './tap' export * from './drain' +export * from './mergeConcurrently' +export * from './combine' +export * from './merge' +export * from './concat' +export * from './switch' diff --git a/ffi/Control/Stream/combinators/merge.js b/ffi/Control/Stream/combinators/merge.js new file mode 100644 index 0000000..3e6781d --- /dev/null +++ b/ffi/Control/Stream/combinators/merge.js @@ -0,0 +1,74 @@ +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +import { IndexSink } from '../sink/IndexSink' +import { Stream } from '../Stream' +import { disposeAll } from '../disposables' +import { reduce } from '@most/prelude' + +/** + * @param {Array} streams array of stream to merge + * @returns {Stream} stream containing events from all input observables + * in time order. If two events are simultaneous they will be merged in + * arbitrary order. + */ +export function merge (streams) { + return new Stream(mergeSources(streams)) +} + +/** + * This implements fusion/flattening for merge. It will + * fuse adjacent merge operations. For example: + * - a.merge(b).merge(c) effectively becomes merge(a, b, c) + * - merge(a, merge(b, c)) effectively becomes merge(a, b, c) + * It does this by concatenating the sources arrays of + * any nested Merge sources, in effect "flattening" nested + * merge operations into a single merge. + */ +function mergeSources (streams) { + return new Merge(reduce(appendSources, [], streams)) +} + +function appendSources (sources, stream) { + var source = stream.source + return source instanceof Merge + ? sources.concat(source.sources) + : sources.concat(source) +} + +function Merge (sources) { + this.sources = sources +} + +Merge.prototype.run = function (sink, scheduler) { + var l = this.sources.length + var disposables = new Array(l) + var sinks = new Array(l) + + var mergeSink = new MergeSink(disposables, sinks, sink) + + for (var indexSink, i = 0; i < l; ++i) { + indexSink = sinks[i] = new IndexSink(i, mergeSink) + disposables[i] = this.sources[i].run(indexSink, scheduler) + } + + return disposeAll(disposables) +} + +function MergeSink (disposables, sinks, sink) { + this.sink = sink + this.disposables = disposables + this.activeCount = sinks.length +} + +MergeSink.prototype.event = function (t, indexValue) { + this.sink.event(t, indexValue.value) +} + +MergeSink.prototype.end = function (t, indexedValue) { + dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink) + if (--this.activeCount === 0) { + this.sink.end(t, indexedValue.value) + } +} diff --git a/ffi/Control/Stream/combinators/mergeConcurrently.js b/ffi/Control/Stream/combinators/mergeConcurrently.js index d2f5a2b..c1e0c0a 100644 --- a/ffi/Control/Stream/combinators/mergeConcurrently.js +++ b/ffi/Control/Stream/combinators/mergeConcurrently.js @@ -2,15 +2,23 @@ /** @author Brian Cavalier */ /** @author John Hann */ +import { curry2, curry3, id as identity } from '@most/prelude' + import { LinkedList } from '../LinkedList' -import { MemoizedDisosable } from '../disposables/MemoizedDisposable' -import { id as identity } from '@most/prelude' +import { MemoizedDisposable } from '../disposables/MemoizedDisposable' +import { Stream } from '../Stream' -export const mergeConcurrently = (concurrency, stream) => - mergeMapConcurrently(identity, concurrency, stream) +export const flatMap = + curry2((stream, f) => mergeMapConcurrently(f, Infinity, stream)) -export const mergeMapConcurrently = (f, concurrency, stream) => - new MergeConcurrently(f, concurrency, stream) +export const mergeConcurrently = curry2((concurrency, stream) => + mergeMapConcurrently(identity, concurrency, stream)) + +export const join = + mergeConcurrently(Infinity) + +export const mergeMapConcurrently = curry3((f, concurrency, stream) => + new Stream(new MergeConcurrently(f, concurrency, stream.source))) class MergeConcurrently { constructor (f, concurrency, source) { @@ -19,8 +27,8 @@ class MergeConcurrently { this.source = source } - run (sink, scheduler) { - return new Outer(this.f, this.concurrency, this.source, sink, scheduler) + run (sink) { + return scheduler => new Outer(this.f, this.concurrency, this.source, sink, scheduler) } } @@ -32,12 +40,12 @@ class Outer { this.scheduler = scheduler this.pending = [] this.current = new LinkedList() - this.disposable = new MemoizedDisosable(source.run(this, scheduler)) + this.disposable = new MemoizedDisposable(source.run(this)(scheduler)) this.active = true } - event (t, x) { - this._addInner(t, x) + event (t) { + return x => this._addInner(t, x) } _addInner (t, x) { @@ -62,10 +70,10 @@ class Outer { this.current.add(innerSink) } - end (t, x) { + end (t) { this.active = false this.disposable.dispose() - this._checkEnd(t, x) + this._checkEnd(t) } dispose () { @@ -75,26 +83,25 @@ class Outer { this.current.dispose() } - _endInner (t, x, inner) { + _endInner (t, inner) { this.current.remove(inner) inner.dispose() if (this.pending.length === 0) { - this._checkEnd(t, x) + this._checkEnd(t) } else { this._startInner(t, this.pending.shift()) } } - _checkEnd (t, x) { + _checkEnd (t) { if (!this.active && this.current.isEmpty()) { - this.sink.end(t, x) + this.sink.end(t) } } } -const mapAndRun = (f, x, sink, scheduler) => - f(x).run(sink, scheduler) +const mapAndRun = (f, x, sink, scheduler) => f(x).source.run(sink)(scheduler) class Inner { constructor (time, outer, sink) { @@ -105,16 +112,12 @@ class Inner { this.disposable = void 0 } - event (t, x) { - this.sink.event(Math.max(t, this.time), x) + event (t) { + return x => this.sink.event(Math.max(t, this.time))(x) } - end (t, x) { - this.outer._endInner(Math.max(t, this.time), x, this) - } - - error (t, e) { - this.outer.error(Math.max(t, this.time), e) + end (t) { + this.outer._endInner(Math.max(t, this.time), this) } dispose () { diff --git a/ffi/Control/Stream/combinators/switch.js b/ffi/Control/Stream/combinators/switch.js new file mode 100644 index 0000000..f578c6d --- /dev/null +++ b/ffi/Control/Stream/combinators/switch.js @@ -0,0 +1,99 @@ +import { disposeAll, emptyDisposable } from '../disposables' + +import { Stream } from '../Stream' + +/** + * Given a stream of streams, return a new stream that adopts the behavior + * of the most recent inner stream. + * @param {Stream} stream of streams on which to switch + * @returns {Stream} switching stream + */ +function switchLatest (stream) { + return new Stream(new Switch(stream.source)) +} + +export { switchLatest as switch } + +function Switch (source) { + this.source = source +} + +Switch.prototype.run = function (sink) { + return scheduler => { + const switchSink = new SwitchSink(sink, scheduler) + return disposeAll([switchSink, this.source.run(switchSink)(scheduler)]) + } +} + +function SwitchSink (sink, scheduler) { + this.sink = sink + this.scheduler = scheduler + this.current = null + this.ended = false +} + +SwitchSink.prototype.event = function (t) { + return stream => { + this._disposeCurrent(t) // TODO: capture the result of this dispose + this.current = new Segment(t, Infinity, this, this.sink) + this.current.disposable = stream.source.run(this.current)(this.scheduler) + } +} + +SwitchSink.prototype.end = function (t, x) { + this.ended = true + this._checkEnd(t, x) +} + +SwitchSink.prototype.dispose = function () { + return this._disposeCurrent(this.scheduler.now()) +} + +SwitchSink.prototype._disposeCurrent = function (t) { + if (this.current !== null) { + return this.current._dispose(t) + } +} + +SwitchSink.prototype._disposeInner = function (t, inner) { + inner._dispose(t) // TODO: capture the result of this dispose + if (inner === this.current) { + this.current = null + } +} + +SwitchSink.prototype._checkEnd = function (t, x) { + if (this.ended && this.current === null) { + this.sink.end(t, x) + } +} + +SwitchSink.prototype._endInner = function (t, x, inner) { + this._disposeInner(t, inner) + this._checkEnd(t, x) +} + +function Segment (min, max, outer, sink) { + this.min = min + this.max = max + this.outer = outer + this.sink = sink + this.disposable = emptyDisposable +} + +Segment.prototype.event = function (t) { + return x => { + if (t < this.max) { + this.sink.event(Math.max(t, this.min))(x) + } + } +} + +Segment.prototype.end = function (t, x) { + this.outer._endInner(Math.max(t, this.min), x, this) +} + +Segment.prototype._dispose = function (t) { + this.max = t + this.disposable.dispose() +} diff --git a/ffi/Control/Stream/combinators/tap.js b/ffi/Control/Stream/combinators/tap.js deleted file mode 100644 index 51e7b21..0000000 --- a/ffi/Control/Stream/combinators/tap.js +++ /dev/null @@ -1,50 +0,0 @@ -import { Stream } from '../Stream' -import { curry2 } from '@most/prelude' - -export const tapEnd = curry2((f, stream) => new Stream(new TapEnd(f, stream.source))) - -export const tapEvent = curry2((f, stream) => new Stream(new TapEvent(f, stream.source))) - -class TapEvent { - constructor (f, source) { - this.source = source - this.f = f - - this.run = sink => scheduler => - source.run(new TapSink(f, sink))(scheduler) - } -} - -class TapSink { - constructor (f, sink) { - this.event = t => x => { - f(x)() - sink.event(t)(x) - } - - this.end = t => { - sink.end(t) - } - } -} - -class TapEnd { - constructor (f, source) { - this.source = source - this.f = f - - this.run = sink => scheduler => - source.run(new TapEndSink(f, sink))(scheduler) - } -} - -class TapEndSink { - constructor (f, sink) { - this.event = t => x => sink.event(t)(x) - - this.end = t => { - f() - sink.end(t) - } - } -} diff --git a/ffi/Control/Stream/disposables/Disposable.js b/ffi/Control/Stream/disposables/Disposable.js index afbb45e..5659341 100644 --- a/ffi/Control/Stream/disposables/Disposable.js +++ b/ffi/Control/Stream/disposables/Disposable.js @@ -7,9 +7,7 @@ export class Disposable { constructor (dispose, data) { this._dispose = dispose this._data = data - } - dispose () { - return this._dispose(this._data) + this.dispose = () => dispose(data) } } diff --git a/ffi/Control/Stream/disposables/MemoizedDisposable.js b/ffi/Control/Stream/disposables/MemoizedDisposable.js index 5b84266..5a33058 100644 --- a/ffi/Control/Stream/disposables/MemoizedDisposable.js +++ b/ffi/Control/Stream/disposables/MemoizedDisposable.js @@ -8,7 +8,7 @@ export class MemoizedDisposable { dispose () { if (!this.disposed) { this.disposed = true - this.value = disposeSafely(this.disposable) + this.value = this.disposable.dispose() this.disposable = undefined } diff --git a/ffi/Control/Stream/disposables/index.js b/ffi/Control/Stream/disposables/index.js index 5289cbf..3997410 100644 --- a/ffi/Control/Stream/disposables/index.js +++ b/ffi/Control/Stream/disposables/index.js @@ -9,4 +9,4 @@ export const createDisposable = curry2((dispose, data) => new MemoizedDisposable export const emptyDisposable = new Disposable(id, undefined) export const disposeAll = disposables => - createDisposable(map(disposeSafely, disposables), disposables) + createDisposable(curry2(map)(disposeSafely), disposables) diff --git a/ffi/Control/Stream/runEffects.js b/ffi/Control/Stream/runEffects.js index 94a3019..1c128f2 100644 --- a/ffi/Control/Stream/runEffects.js +++ b/ffi/Control/Stream/runEffects.js @@ -11,6 +11,8 @@ function runSource (source, scheduler) { const observer = new RunEffectsSink(disposable) disposable.setDisposable(source.run(observer)(scheduler)) + + return {} } var RunEffectsSink = function RunEffectsSink (disposable) { diff --git a/ffi/Control/Stream/scheduler/ScheduledTask.js b/ffi/Control/Stream/scheduler/ScheduledTask.js index fb88107..fa0140a 100644 --- a/ffi/Control/Stream/scheduler/ScheduledTask.js +++ b/ffi/Control/Stream/scheduler/ScheduledTask.js @@ -1,14 +1,19 @@ /* @flow */ -export function ScheduledTask (delay, period, task, scheduler) { - this.time = delay - this.period = period - this.task = task - this.scheduler = scheduler - this.active = true +export class ScheduledTask { + constructor (delay, period, task, scheduler) { + this.time = delay + this.period = period + this.task = task + this.scheduler = scheduler + this.active = true + } - this.run = () => task.run(this.time) - this.dispose = () => { - scheduler.cancel(this) - task.dispose() + run () { + this.task.run(this.time) + } + + dispose () { + this.scheduler.cancel(this) + this.task.dispose() } } diff --git a/ffi/Control/Stream/scheduler/Scheduler.js b/ffi/Control/Stream/scheduler/Scheduler.js index 6dc626f..67e7e41 100644 --- a/ffi/Control/Stream/scheduler/Scheduler.js +++ b/ffi/Control/Stream/scheduler/Scheduler.js @@ -25,24 +25,26 @@ export class Scheduler { } asap (task) { - return this.schedule(0, -1, task) + return this.schedule(0)(-1)(task) } - delay (delay, task) { - return this.schedule(delay, -1, task) + delay (delay) { + return task => this.schedule(delay)(-1)(task) } - periodic (period, task) { - return this.schedule(0, period, task) + periodic (period) { + return task => this.schedule(0)(period)(task) } - schedule (delay, period, task) { - const now = this.now() - const st = new ScheduledTask(now + Math.max(0, delay), period, task, this) + schedule (delay) { + return period => task => { + const now = this.now() + const st = new ScheduledTask(now + Math.max(0, delay), period, task, this) - this.timeline.add(st) - this._scheduleNextRun(now) - return st + this.timeline.add(st) + this._scheduleNextRun(now) + return st + } } cancel (task) { diff --git a/ffi/Control/Stream/sink/IndexSink.js b/ffi/Control/Stream/sink/IndexSink.js index 01cb7d9..b62f5ee 100644 --- a/ffi/Control/Stream/sink/IndexSink.js +++ b/ffi/Control/Stream/sink/IndexSink.js @@ -15,10 +15,10 @@ export class IndexSink { } } - end (t, x) { + end (t) { if (!this.active) return this.active = false - this.sink.end(t, { index: this.index, value: x }) + this.sink.end(t, { index: this.index }) } } diff --git a/package.json b/package.json index 8242b64..73acb16 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "description": "", "main": "index.js", "scripts": { + "start": "yarn build && pulp run", "build": "rollup -c && pulp build", "changelog": "conventional-changelog -i CHANGELOG.md -s -r 0 -p angular", "clean": "rimraf lib lib.es2015", diff --git a/src/Control/Stream.js b/src/Control/Stream.js index d54467f..bb94ba1 100644 --- a/src/Control/Stream.js +++ b/src/Control/Stream.js @@ -16,11 +16,38 @@ Object.defineProperty(exports, '__esModule', { value: true }); // drop :: Int -> [a] -> [a] // drop first n elements +function drop (n, a) { // eslint-disable-line complexity + if (n < 0) { + throw new TypeError('n must be >= 0') + } + var l = a.length; + if (n === 0 || l === 0) { + return a + } + + if (n >= l) { + return [] + } + + return unsafeDrop(n, a, l - n) +} + +// unsafeDrop :: Int -> [a] -> Int -> [a] +// Internal helper for drop +function unsafeDrop (n, a, l) { + var b = new Array(l); + for (var i = 0; i < l; ++i) { + b[i] = a[n + i]; + } + return b +} // tail :: [a] -> [a] // drop head element - +function tail (a) { + return drop(1, a) +} // copy :: [a] -> [a] // duplicate a (shallow duplication) @@ -39,7 +66,13 @@ function map (f, a) { // reduce :: (a -> b -> a) -> a -> [b] -> a // accumulate via left-fold - +function reduce (f, z, a) { + var r = z; + for (var i = 0, l = a.length; i < l; ++i) { + r = f(r, a[i], i); + } + return r +} // replace :: a -> Int -> [a] // replace element at index @@ -124,10 +157,8 @@ function curry3 (f) { var Disposable = function Disposable (dispose, data) { this._dispose = dispose; this._data = data; -}; -Disposable.prototype.dispose = function dispose () { - return this._dispose(this._data) + this.dispose = function () { return dispose(data); }; }; var MemoizedDisposable = function MemoizedDisposable (disposable) { @@ -139,14 +170,14 @@ var MemoizedDisposable = function MemoizedDisposable (disposable) { MemoizedDisposable.prototype.dispose = function dispose () { if (!this.disposed) { this.disposed = true; - this.value = disposeSafely(this.disposable); + this.value = this.disposable.dispose(); this.disposable = undefined; } return this.value }; -function disposeSafely$1 (disposable) { +function disposeSafely (disposable) { disposable.dispose(); } @@ -154,7 +185,7 @@ var createDisposable = curry2(function (dispose, data) { return new MemoizedDisp var emptyDisposable = new Disposable(id, undefined); -var disposeAll = function (disposables) { return createDisposable(map(disposeSafely$1, disposables), disposables); }; +var disposeAll = function (disposables) { return createDisposable(curry2(map)(disposeSafely), disposables); }; /** @license MIT License (c) copyright 2010-2016 original author or authors */ /** @author Brian Cavalier */ @@ -202,21 +233,22 @@ function runAsap (f) { } /* */ -function ScheduledTask (delay, period, task, scheduler) { - var this$1 = this; - +var ScheduledTask = function ScheduledTask (delay, period, task, scheduler) { this.time = delay; this.period = period; this.task = task; this.scheduler = scheduler; this.active = true; +}; - this.run = function () { return task.run(this$1.time); }; - this.dispose = function () { - scheduler.cancel(this$1); - task.dispose(); - }; -} +ScheduledTask.prototype.run = function run () { + this.task.run(this.time); +}; + +ScheduledTask.prototype.dispose = function dispose () { + this.scheduler.cancel(this); + this.task.dispose(); +}; /* */ function runTask$1 (task) { @@ -244,24 +276,32 @@ Scheduler.prototype.now = function now () { }; Scheduler.prototype.asap = function asap (task) { - return this.schedule(0, -1, task) + return this.schedule(0)(-1)(task) }; -Scheduler.prototype.delay = function delay (delay$1, task) { - return this.schedule(delay$1, -1, task) +Scheduler.prototype.delay = function delay (delay$1) { + var this$1 = this; + + return function (task) { return this$1.schedule(delay$1)(-1)(task); } }; -Scheduler.prototype.periodic = function periodic (period, task) { - return this.schedule(0, period, task) +Scheduler.prototype.periodic = function periodic (period) { + var this$1 = this; + + return function (task) { return this$1.schedule(0)(period)(task); } }; -Scheduler.prototype.schedule = function schedule (delay, period, task) { - var now = this.now(); - var st = new ScheduledTask(now + Math.max(0, delay), period, task, this); +Scheduler.prototype.schedule = function schedule (delay) { + var this$1 = this; - this.timeline.add(st); - this._scheduleNextRun(now); - return st + return function (period) { return function (task) { + var now = this$1.now(); + var st = new ScheduledTask(now + Math.max(0, delay), period, task, this$1); + + this$1.timeline.add(st); + this$1._scheduleNextRun(now); + return st + }; } }; Scheduler.prototype.cancel = function cancel (task) { @@ -478,48 +518,6 @@ var scheduleTasks = curry2(function (tasks, scheduler) { } }); -var Stream = function Stream (source) { - this.source = source; -}; - -var tapEnd = curry2(function (f, stream) { return new Stream(new TapEnd(f, stream.source)); }); - -var tapEvent = curry2(function (f, stream) { return new Stream(new TapEvent(f, stream.source)); }); - -var TapEvent = function TapEvent (f, source) { - this.source = source; - this.f = f; - - this.run = function (sink) { return function (scheduler) { return source.run(new TapSink(f, sink))(scheduler); }; }; -}; - -var TapSink = function TapSink (f, sink) { - this.event = function (t) { return function (x) { - f(x)(); - sink.event(t)(x); - }; }; - - this.end = function (t) { - sink.end(t); - }; -}; - -var TapEnd = function TapEnd (f, source) { - this.source = source; - this.f = f; - - this.run = function (sink) { return function (scheduler) { return source.run(new TapEndSink(f, sink))(scheduler); }; }; -}; - -var TapEndSink = function TapEndSink (f, sink) { - this.event = function (t) { return function (x) { return sink.event(t)(x); }; }; - - this.end = function (t) { - f(); - sink.end(t); - }; -}; - var SettableDisposable = function SettableDisposable () { this.disposable = void 0; this.disposed = false; @@ -559,6 +557,8 @@ function runSource (source, scheduler) { var observer = new RunEffectsSink(disposable); disposable.setDisposable(source.run(observer)(scheduler)); + + return {} } var RunEffectsSink = function RunEffectsSink (disposable) { @@ -577,100 +577,526 @@ RunEffectsSink.prototype.end = function end (t) { var drain = function (stream) { return function () { return runEffects(stream, defaultScheduler); }; }; -var combine = - curry3(function (f, stream1, stream2) { return new Stream(new Combine(f, stream1.source, stream2.source)); }); - -var Combine = function Combine (f, source1, source2) { - this.f = f; - this.source1 = source1; - this.source2 = source2; +var LinkedList = function LinkedList () { + this.head = null; + this.length = 0; }; -Combine.prototype.run = function run (sink) { - var this$1 = this; +LinkedList.prototype.add = function add (x) { + if (this.head !== null) { + this.head.prev = x; + x.next = this.head; + } + this.head = x; + ++this.length; +}; - return function (scheduler) { - var disposables = Array(2); - var sinks = Array(2); - - var mergeSink = new CombineSink(disposables, sinks, sink, this$1.f); - - sinks[0] = new IndexSink(0, mergeSink); - sinks[1] = new IndexSink(1, mergeSink); - disposables[0] = this$1.source1.run(sinks[0])(scheduler); - disposables[1] = this$1.source2.run(sinks[1])(scheduler); - - return disposeAll(disposables) +LinkedList.prototype.remove = function remove (x) { // eslint-disable-linecomplexity + --this.length; + if (x === this.head) { + this.head = this.head.next; + } + if (x.next !== null) { + x.next.prev = x.prev; + x.next = null; + } + if (x.prev !== null) { + x.prev.next = x.next; + x.prev = null; } }; -var IndexSink = function IndexSink () {}; +LinkedList.prototype.isEmpty = function isEmpty () { + return this.length === 0 +}; -IndexSink.prototype.cosntructor = function cosntructor (i, sink) { - this.i = i; +LinkedList.prototype.dispose = function dispose () { + if (this.isEmpty()) { return } + + var x = this.head; + this.head = null; + this.length = 0; + + while (x !== null) { + x.dispose(); + x = x.next; + } +}; + +var Stream = function Stream (source) { + this.source = source; +}; + +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +var flatMap = + curry2(function (stream, f) { return mergeMapConcurrently(f, Infinity, stream); }); + +var mergeConcurrently = curry2(function (concurrency, stream) { return mergeMapConcurrently(id, concurrency, stream); }); + +var join = + mergeConcurrently(Infinity); + +var mergeMapConcurrently = curry3(function (f, concurrency, stream) { return new Stream(new MergeConcurrently(f, concurrency, stream.source)); }); + +var MergeConcurrently = function MergeConcurrently (f, concurrency, source) { + this.f = f; + this.concurrency = concurrency; + this.source = source; +}; + +MergeConcurrently.prototype.run = function run (sink) { + var this$1 = this; + + return function (scheduler) { return new Outer(this$1.f, this$1.concurrency, this$1.source, sink, scheduler); } +}; + +var Outer = function Outer (f, concurrency, source, sink, scheduler) { + this.f = f; + this.concurrency = concurrency; this.sink = sink; + this.scheduler = scheduler; + this.pending = []; + this.current = new LinkedList(); + this.disposable = new MemoizedDisposable(source.run(this)(scheduler)); + this.active = true; +}; + +Outer.prototype.event = function event (t) { + var this$1 = this; + + return function (x) { return this$1._addInner(t, x); } +}; + +Outer.prototype._addInner = function _addInner (t, x) { + if (this.current.length < this.concurrency) { + this._startInner(t, x); + } else { + this.pending.push(x); + } +}; + +Outer.prototype._startInner = function _startInner (t, x) { + try { + this._initInner(t, x); + } catch (e) { + this.error(t, e); + } +}; + +Outer.prototype._initInner = function _initInner (t, x) { + var innerSink = new Inner(t, this, this.sink); + innerSink.disposable = mapAndRun(this.f, x, innerSink, this.scheduler); + this.current.add(innerSink); +}; + +Outer.prototype.end = function end (t) { + this.active = false; + this.disposable.dispose(); + this._checkEnd(t); +}; + +Outer.prototype.dispose = function dispose () { + this.active = false; + this.pending.length = 0; + this.disposable.dispose(); + this.current.dispose(); +}; + +Outer.prototype._endInner = function _endInner (t, inner) { + this.current.remove(inner); + inner.dispose(); + + if (this.pending.length === 0) { + this._checkEnd(t); + } else { + this._startInner(t, this.pending.shift()); + } +}; + +Outer.prototype._checkEnd = function _checkEnd (t) { + if (!this.active && this.current.isEmpty()) { + this.sink.end(t); + } +}; + +var mapAndRun = function (f, x, sink, scheduler) { return f(x).source.run(sink)(scheduler); }; + +var Inner = function Inner (time, outer, sink) { + this.prev = this.next = null; + this.time = time; + this.outer = outer; + this.sink = sink; + this.disposable = void 0; +}; + +Inner.prototype.event = function event (t) { + var this$1 = this; + + return function (x) { return this$1.sink.event(Math.max(t, this$1.time))(x); } +}; + +Inner.prototype.end = function end (t) { + this.outer._endInner(Math.max(t, this.time), this); +}; + +Inner.prototype.dispose = function dispose () { + return this.disposable.dispose() +}; + +var IndexSink = function IndexSink (i, sink) { + this.sink = sink; + this.index = i; + this.active = true; + this.value = void 0; }; IndexSink.prototype.event = function event (t) { var this$1 = this; return function (x) { - this$1.sink.event(t)({ index: this$1.i, value: x }); + if (!this$1.active) { return } + + this$1.value = x; + this$1.sink.event(t)(this$1); } }; IndexSink.prototype.end = function end (t) { - this.sink.end(t, this.i); + if (!this.active) { return } + + this.active = false; + this.sink.end(t, { index: this.index }); }; -var CombineSink = function CombineSink (disposables, sinks, sink, f) { +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +var combine2 = function (f) { return function (s1) { return function (s2) { return combineArray(f, [s1, s2]); }; }; }; +var combine3 = function (f) { return function (s1) { return function (s2) { return function (s3) { return combineArray(f, [s1, s2, s3]); }; }; }; }; +var combine4 = function (f) { return function (s1) { return function (s2) { return function (s3) { return function (s4) { return combineArray(f, [s1, s2, s3, s4]); }; }; }; }; }; +var combine5 = function (f) { return function (s1) { return function (s2) { return function (s3) { return function (s4) { return function (s5) { return combineArray(f, [s1, s2, s3, s4, s5]); }; }; }; }; }; }; + +function combineArray (f, streams) { + return new Stream(combineSources(f, streams)) +} + +function combineSources (f, streams) { + return new Combine(f, map(getSource, streams)) +} + +function getSource (stream) { + return stream.source +} + +function Combine (f, sources) { + this.f = f; + this.sources = sources; +} + +Combine.prototype.run = function (sink) { + var this$1 = this; + + return function (scheduler) { + var l = this$1.sources.length; + var disposables = new Array(l); + var sinks = new Array(l); + + var mergeSink = new CombineSink(disposables, sinks, sink, this$1.f); + + for (var indexSink, i = 0; i < l; ++i) { + indexSink = sinks[i] = new IndexSink(i, mergeSink); + disposables[i] = this$1.sources[i].run(indexSink)(scheduler); + } + + return disposeAll(disposables) + } +}; + +function CombineSink (disposables, sinks, sink, f) { + var this$1 = this; + + this.sink = sink; this.disposables = disposables; this.sinks = sinks; - this.sink = sink; this.f = f; - this.awaiting = 2; - this.values = Array(2); - this.hasValue = [false, false]; - this.activeCount = 2; -}; + var l = sinks.length; + this.awaiting = l; + this.values = new Array(l); + this.hasValue = new Array(l); -CombineSink.prototype.event = function event (t) { - var this$1 = this; + for (var i = 0; i < l; ++i) { + this$1.hasValue[i] = false; + } - return function (ref) { - var index = ref.index; - var value = ref.value; + this.activeCount = sinks.length; +} - var f = this$1.f; - var awaiting = this$1._updateReady(index); - this$1.values[index] = value; +CombineSink.prototype.event = function (t) { + var this$1 = this; + return function (indexedValue) { + var i = indexedValue.index; + var awaiting = this$1._updateReady(i); + + this$1.values[i] = indexedValue.value; if (awaiting === 0) { - var result = f(this$1.values[0])(this$1.values[1]); - this$1.sink.event(t)(result); + var value = tail(this$1.values).reduce(function (f, x) { return f(x); }, this$1.f(this$1.values[0])); + this$1.sink.event(t)(value); } } }; -CombineSink.prototype._updateReady = function _updateReady (index) { +CombineSink.prototype._updateReady = function (index) { if (this.awaiting > 0) { if (!this.hasValue[index]) { this.hasValue[index] = true; this.awaiting -= 1; } } - return this.awaiting }; -CombineSink.prototype.end = function end (t, ref) { - var index = ref.index; - var value = ref.value; +CombineSink.prototype.end = function (t, indexedValue) { + this.disposables[indexedValue.index].dispose(); - this.disposables[index].dispose(); + if (--this.activeCount === 0) { + this.sink.end(t); + } +}; - if (--this.activeCount === 0) { this.sink.end(t); } +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +/** + * @param {Array} streams array of stream to merge + * @returns {Stream} stream containing events from all input observables + * in time order. If two events are simultaneous they will be merged in + * arbitrary order. + */ +function merge (streams) { + return new Stream(mergeSources(streams)) +} + +/** + * This implements fusion/flattening for merge. It will + * fuse adjacent merge operations. For example: + * - a.merge(b).merge(c) effectively becomes merge(a, b, c) + * - merge(a, merge(b, c)) effectively becomes merge(a, b, c) + * It does this by concatenating the sources arrays of + * any nested Merge sources, in effect "flattening" nested + * merge operations into a single merge. + */ +function mergeSources (streams) { + return new Merge(reduce(appendSources, [], streams)) +} + +function appendSources (sources, stream) { + var source = stream.source; + return source instanceof Merge + ? sources.concat(source.sources) + : sources.concat(source) +} + +function Merge (sources) { + this.sources = sources; +} + +Merge.prototype.run = function (sink, scheduler) { + var this$1 = this; + + var l = this.sources.length; + var disposables = new Array(l); + var sinks = new Array(l); + + var mergeSink = new MergeSink(disposables, sinks, sink); + + for (var indexSink, i = 0; i < l; ++i) { + indexSink = sinks[i] = new IndexSink(i, mergeSink); + disposables[i] = this$1.sources[i].run(indexSink, scheduler); + } + + return disposeAll(disposables) +}; + +function MergeSink (disposables, sinks, sink) { + this.sink = sink; + this.disposables = disposables; + this.activeCount = sinks.length; +} + +MergeSink.prototype.event = function (t, indexValue) { + this.sink.event(t, indexValue.value); +}; + +MergeSink.prototype.end = function (t, indexedValue) { + dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink); + if (--this.activeCount === 0) { + this.sink.end(t, indexedValue.value); + } +}; + +var concat = function (s1) { return function (s2) { return continueWith(function () { return s2; })(s1); }; }; + +var continueWith = function (f) { return function (stream) { return new Stream(new ContinueWith(f, stream.source)); }; }; + +function ContinueWith (f, source) { + this.f = f; + this.source = source; +} + +ContinueWith.prototype.run = function (sink) { + var this$1 = this; + + return function (scheduler) { + return new ContinueWithSink(this$1.f, this$1.source, sink, scheduler) + } +}; + +function ContinueWithSink (f, source, sink, scheduler) { + this.f = f; + this.sink = sink; + this.scheduler = scheduler; + this.active = true; + this.disposable = new MemoizedDisposable(source.run(this)(scheduler)); +} + +ContinueWithSink.prototype.event = function (t) { + var this$1 = this; + + return function (x) { + if (!this$1.active) { + return + } + this$1.sink.event(t)(x); + } +}; + +ContinueWithSink.prototype.end = function (t, x) { + if (!this.active) { + return + } + + this.disposable.dispose(); + this._startNext(t, x, this.sink); +}; + +ContinueWithSink.prototype._startNext = function (t, x, sink) { + this._continue(this.f, x, sink); +}; + +ContinueWithSink.prototype._continue = function (f, x, sink) { + return f(x).source.run(sink)(this.scheduler) +}; + +ContinueWithSink.prototype.dispose = function () { + this.active = false; + return this.disposable.dispose() +}; + +/** + * Given a stream of streams, return a new stream that adopts the behavior + * of the most recent inner stream. + * @param {Stream} stream of streams on which to switch + * @returns {Stream} switching stream + */ +function switchLatest (stream) { + return new Stream(new Switch(stream.source)) +} + +function Switch (source) { + this.source = source; +} + +Switch.prototype.run = function (sink) { + var this$1 = this; + + return function (scheduler) { + var switchSink = new SwitchSink(sink, scheduler); + return disposeAll([switchSink, this$1.source.run(switchSink)(scheduler)]) + } +}; + +function SwitchSink (sink, scheduler) { + this.sink = sink; + this.scheduler = scheduler; + this.current = null; + this.ended = false; +} + +SwitchSink.prototype.event = function (t) { + var this$1 = this; + + return function (stream) { + this$1._disposeCurrent(t); // TODO: capture the result of this dispose + this$1.current = new Segment(t, Infinity, this$1, this$1.sink); + this$1.current.disposable = stream.source.run(this$1.current)(this$1.scheduler); + } +}; + +SwitchSink.prototype.end = function (t, x) { + this.ended = true; + this._checkEnd(t, x); +}; + +SwitchSink.prototype.dispose = function () { + return this._disposeCurrent(this.scheduler.now()) +}; + +SwitchSink.prototype._disposeCurrent = function (t) { + if (this.current !== null) { + return this.current._dispose(t) + } +}; + +SwitchSink.prototype._disposeInner = function (t, inner) { + inner._dispose(t); // TODO: capture the result of this dispose + if (inner === this.current) { + this.current = null; + } +}; + +SwitchSink.prototype._checkEnd = function (t, x) { + if (this.ended && this.current === null) { + this.sink.end(t, x); + } +}; + +SwitchSink.prototype._endInner = function (t, x, inner) { + this._disposeInner(t, inner); + this._checkEnd(t, x); +}; + +function Segment (min, max, outer, sink) { + this.min = min; + this.max = max; + this.outer = outer; + this.sink = sink; + this.disposable = emptyDisposable; +} + +Segment.prototype.event = function (t) { + var this$1 = this; + + return function (x) { + if (t < this$1.max) { + this$1.sink.event(Math.max(t, this$1.min))(x); + } + } +}; + +Segment.prototype.end = function (t, x) { + this.outer._endInner(Math.max(t, this.min), x, this); +}; + +Segment.prototype._dispose = function (t) { + this.max = t; + this.disposable.dispose(); }; /* */ @@ -682,7 +1108,16 @@ exports.defaultScheduler = defaultScheduler; exports.scheduleTasks = scheduleTasks; exports.endTask = endTask; exports.eventTask = eventTask; -exports.tapEnd = tapEnd; -exports.tapEvent = tapEvent; exports.drain = drain; -exports.combine = combine; +exports.flatMap = flatMap; +exports.mergeConcurrently = mergeConcurrently; +exports.join = join; +exports.mergeMapConcurrently = mergeMapConcurrently; +exports.combine2 = combine2; +exports.combine3 = combine3; +exports.combine4 = combine4; +exports.combine5 = combine5; +exports.merge = merge; +exports.concat = concat; +exports.continueWith = continueWith; +exports.switch = switchLatest; diff --git a/src/Control/Stream.purs b/src/Control/Stream.purs index e1f4db7..d6af4f9 100644 --- a/src/Control/Stream.purs +++ b/src/Control/Stream.purs @@ -10,9 +10,9 @@ module Control.Stream , ScheduledTask , Disposable , Time - , createStream - , getSource - , runSource + , EventFn + , EndFn + -- Scheduler-related functions and values , defaultScheduler , eventTask , endTask @@ -20,19 +20,54 @@ module Control.Stream , emptyDisposable , disposeAll , scheduleTasks + -- Stream-related helpers + , createStream + , createSink + , createCombinator + , createEventCombinator + , getSource + , runSource + -- Combinators , drain , tapEvent , tapEnd + , filter + , mergeConcurrently + , mergeMapConcurrently + , join + , combine2 + , combine3 + , combine4 + , combine5 + , continueWith + , concat + , merge + , switch + , constant + , scan + -- Stream factories , just , fromArray + , empty + , periodic ) where +import Control.Applicative (class Applicative) +import Control.Apply (class Apply) +import Control.Bind (class Bind) +import Control.Category (id) +import Control.Monad (class Monad) import Control.Monad.Eff (Eff, Pure) +import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef) +import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff) +import Control.MonadPlus (class Alt, class Plus) import Data.Array (snoc) +import Data.Function (flip, ($)) import Data.Functor (class Functor, map) -import Data.Unit (Unit) -import Prelude (flip) +import Data.Monoid (class Monoid) +import Data.Semigroup (class Semigroup) +import Data.Unit (Unit, unit) newtype Time = Time Int @@ -50,11 +85,11 @@ type Sink a = , end :: Time -> Unit } -type Scheduler = +newtype Scheduler = Scheduler { now :: Pure Time , asap :: Task -> ScheduledTask , delay :: Number -> Task -> ScheduledTask - , period :: Number -> Task -> ScheduledTask + , periodic :: Number -> Task -> ScheduledTask , schedule :: Number -> Number -> Task -> ScheduledTask , cancel :: ScheduledTask -> Unit , cancelAll :: (ScheduledTask -> Boolean) -> Unit @@ -72,9 +107,36 @@ type ScheduledTask = type Disposable = { dispose :: Pure Unit } +type EventFn a b = (Sink b -> Scheduler -> Time -> a -> Unit) +type EndFn a = (Sink a -> Scheduler -> Time -> Unit) + instance functorStream :: Functor Stream where map = _map +instance applyStream :: Apply Stream where + apply = combine2 id + +instance applicativeStream :: Applicative Stream where + pure = just + +instance bindStream :: Bind Stream where + bind = flatMap + +instance monadStream :: Monad Stream + +instance semigroupStream :: Semigroup (Stream a) where + append = concat + +instance altStream :: Alt Stream where + alt = \s1 s2 -> merge [s1, s2] + +instance monoidStream :: Monoid (Stream a) where + mempty = empty + +instance plusStream :: Plus Stream where + empty = empty + +-- Stream-related helpers createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a createStream run = Stream { source: { run } } @@ -84,6 +146,30 @@ getSource stream = case stream of Stream a -> a.source runSource :: forall a. Stream a -> Sink a -> Scheduler -> Disposable runSource stream sink scheduler = (getSource stream).run sink scheduler +createSink :: forall a. (Time -> a -> Unit) -> (Time -> Unit) -> Sink a +createSink event end = { event, end } + +createCombinator :: forall a b. EventFn a b -> EndFn b -> Stream a -> Stream b +createCombinator event end stream = createStream runCombinator + where + runCombinator :: Sink b -> Scheduler -> Disposable + runCombinator sink scheduler = + runSource stream (createSink (event sink scheduler) (end sink scheduler)) scheduler + +createEventCombinator :: forall a b. EventFn a b -> Stream a -> Stream b +createEventCombinator event stream = createCombinator event end stream + where + end :: EndFn b + end sink scheduler time = + sink.end time + +createEndCombinator :: forall a. EndFn a -> Stream a -> Stream a +createEndCombinator end stream = createCombinator event end stream + where + event :: EventFn a a + event sink scheduler time value = sink.event time value + +-- Stream factories just :: forall a. a -> Stream a just a = createStream (runJust a) @@ -91,6 +177,12 @@ runJust :: forall a. a -> Sink a -> Scheduler -> Disposable runJust a sink scheduler = scheduleTasks [ eventTask a sink, endTask sink ] scheduler +empty :: forall a. Stream a +empty = createStream \sink scheduler -> scheduleTasks [endTask sink] scheduler + +never :: forall a. Stream a +never = createStream \sink scheduler -> emptyDisposable + -- TODO: remove fromArray because it's not a real event stream -- replace with a fromArray combinator fromArray :: forall a. Array a -> Stream a @@ -105,19 +197,78 @@ runFromArray arr sink scheduler = scheduleTasks tasks scheduler eventTasks :: Array Task eventTasks = map (flip eventTask sink) arr +periodic :: Number -> Stream Unit +periodic period = createStream (runPeriodic period) + +runPeriodic :: Number -> Sink Unit -> Scheduler -> Disposable +runPeriodic period sink scheduler = { dispose: scheduledTask.dispose } + where + p = case scheduler of Scheduler a -> a.periodic + scheduledTask = p period (eventTask unit sink) + +-- combinators _map :: forall a b. (a -> b) -> Stream a -> Stream b -_map f stream = createStream runMap +_map f stream = createEventCombinator mapEvent stream where - runMap :: Sink b -> Scheduler -> Disposable - runMap sink scheduler = - runSource stream (mapSink f sink) scheduler + mapEvent :: EventFn a b + mapEvent sink scheduler time value = sink.event time (f value) -mapSink :: forall a b. (a -> b) -> Sink b -> Sink a -mapSink f sink = { event: mapEvent, end: sink.end } +filter :: forall a. (a -> Boolean) -> Stream a -> Stream a +filter predicate stream = createEventCombinator event stream where - mapEvent :: Time -> a -> Unit - mapEvent time value = sink.event time (f value) + event :: EventFn a a + event sink scheduler time value = if predicate value + then sink.event time value + else unit +tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a +tapEvent f stream = createEventCombinator event stream + where + event :: EventFn a a + event sink scheduler time value = sink.event time value + where -- find a better way to perform these side effects + x = unsafePerformEff (f value) + +tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a +tapEnd f stream = createEndCombinator end stream + where + end :: EndFn a + end sink scheduler time = sink.end time + where -- find a better way to perform these side effects + x = unsafePerformEff f + +startWith :: forall a. a -> Stream a -> Stream a +startWith value stream = concat (just value) stream + +constant :: forall a b. b -> Stream a -> Stream b +constant value stream = createEventCombinator constantEvent stream + where + constantEvent sink scheduler time x = sink.event time value + +scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b +scan f seed stream = startWith seed $ createEventCombinator scanEvent stream + where + state = createState seed + + scanEvent :: Sink b -> Scheduler -> Time -> a -> Unit + scanEvent sink scheduler time value = sink.event time (state.set \acc -> f acc value) + +-- find a better way to perform these side effects +createState :: forall a. a -> { get :: Pure a, set :: (a -> a) -> a } +createState seed = { set, get } + where + ref :: Ref a + ref = unsafePerformEff $ unsafeCoerceEff $ newRef seed + + set :: (a -> a) -> a + set f = unsafePerformEff $ unsafeCoerceEff $ + modifyRef' ref \x -> { state: (f x), value: (f x) } + + get :: Pure a + get = unsafeCoerceEff $ readRef ref + +-- Foreign imports +-- Scheduler-related foreign import defaultScheduler :: Scheduler foreign import eventTask :: forall a. a -> Sink a -> Task foreign import endTask :: forall a. Sink a -> Task @@ -125,6 +276,17 @@ foreign import createDisposable :: forall a. (a -> Unit) -> a -> Disposable foreign import emptyDisposable :: Disposable foreign import disposeAll :: Array Disposable -> Disposable foreign import scheduleTasks :: Array Task -> Scheduler -> Disposable +-- combinators foreign import drain :: forall e a. Stream a -> EffStream e Unit -foreign import tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a -foreign import tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a +foreign import mergeConcurrently :: forall a. Int -> Stream (Stream a) -> Stream a +foreign import mergeMapConcurrently :: forall a b. (a -> Stream b) -> Stream a -> Stream b +foreign import combine2 :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c +foreign import combine3 :: forall a b c d. (a -> b -> c -> d) -> Stream a -> Stream b -> Stream c -> Stream d +foreign import combine4 :: forall a b c d e. (a -> b -> c -> d -> e) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e +foreign import combine5 :: forall a b c d e f. (a -> b -> c -> d -> e -> f) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e -> Stream f +foreign import join :: forall a. Stream (Stream a) -> Stream a +foreign import flatMap :: forall a b. Stream a -> (a -> Stream b) -> Stream b +foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Stream a +foreign import concat :: forall a. Stream a -> Stream a -> Stream a +foreign import merge :: forall a. Array (Stream a) -> Stream a +foreign import switch :: forall a. Stream (Stream a) -> Stream a