feat(stream): reimplement

This commit is contained in:
Tylor Steinberger 2017-03-16 13:45:06 -04:00
parent a50a246e69
commit 0813a54fb8
44 changed files with 2080 additions and 2935 deletions

View File

@ -1,2 +1 @@
output/
src/

View File

@ -1,47 +0,0 @@
export class LinkedList {
constructor () {
this.head = null
this.length = 0
}
add (x) {
if (this.head !== null) {
this.head.prev = x
x.next = this.head
}
this.head = x
++this.length
}
remove (x) { // eslint-disable-line complexity
--this.length
if (x === this.head) {
this.head = this.head.next
}
if (x.next !== null) {
x.next.prev = x.prev
x.next = null
}
if (x.prev !== null) {
x.prev.next = x.next
x.prev = null
}
}
isEmpty () {
return this.length === 0
}
dispose () {
if (this.isEmpty()) return
var x = this.head
this.head = null
this.length = 0
while (x !== null) {
x.dispose()
x = x.next
}
}
}

View File

@ -1,5 +0,0 @@
export class Stream {
constructor (source) {
this.source = source
}
}

View File

@ -1,97 +0,0 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { map, tail } from '@most/prelude'
import { IndexSink } from '../sink/IndexSink'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
export const combine2 = f => s1 => s2 => combineArray(f, [s1, s2])
export const combine3 = f => s1 => s2 => s3 => combineArray(f, [s1, s2, s3])
export const combine4 = f => s1 => s2 => s3 => s4 => combineArray(f, [s1, s2, s3, s4])
export const combine5 = f => s1 => s2 => s3 => s4 => s5 => combineArray(f, [s1, s2, s3, s4, s5])
function combineArray (f, streams) {
return new Stream(combineSources(f, streams))
}
function combineSources (f, streams) {
return new Combine(f, map(getSource, streams))
}
function getSource (stream) {
return stream.source
}
function Combine (f, sources) {
this.f = f
this.sources = sources
}
Combine.prototype.run = function (sink) {
return scheduler => {
var l = this.sources.length
var disposables = new Array(l)
var sinks = new Array(l)
var mergeSink = new CombineSink(disposables, sinks, sink, this.f)
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink)(scheduler)
}
return disposeAll(disposables)
}
}
function CombineSink (disposables, sinks, sink, f) {
this.sink = sink
this.disposables = disposables
this.sinks = sinks
this.f = f
var l = sinks.length
this.awaiting = l
this.values = new Array(l)
this.hasValue = new Array(l)
for (var i = 0; i < l; ++i) {
this.hasValue[i] = false
}
this.activeCount = sinks.length
}
CombineSink.prototype.event = function (t) {
return indexedValue => {
var i = indexedValue.index
var awaiting = this._updateReady(i)
this.values[i] = indexedValue.value
if (awaiting === 0) {
const value = tail(this.values).reduce((f, x) => f(x), this.f(this.values[0]))
this.sink.event(t)(value)
}
}
}
CombineSink.prototype._updateReady = function (index) {
if (this.awaiting > 0) {
if (!this.hasValue[index]) {
this.hasValue[index] = true
this.awaiting -= 1
}
}
return this.awaiting
}
CombineSink.prototype.end = function (t, indexedValue) {
this.disposables[indexedValue.index].dispose()
if (--this.activeCount === 0) {
this.sink.end(t)
}
}

View File

@ -1,56 +0,0 @@
import { MemoizedDisposable } from '../disposables/MemoizedDisposable'
import { Stream } from '../Stream'
export const concat = s1 => s2 => continueWith(() => s2)(s1)
export const continueWith = f => stream => new Stream(new ContinueWith(f, stream.source))
function ContinueWith (f, source) {
this.f = f
this.source = source
}
ContinueWith.prototype.run = function (sink) {
return scheduler => {
return new ContinueWithSink(this.f, this.source, sink, scheduler)
}
}
function ContinueWithSink (f, source, sink, scheduler) {
this.f = f
this.sink = sink
this.scheduler = scheduler
this.active = true
this.disposable = new MemoizedDisposable(source.run(this)(scheduler))
}
ContinueWithSink.prototype.event = function (t) {
return x => {
if (!this.active) {
return
}
this.sink.event(t)(x)
}
}
ContinueWithSink.prototype.end = function (t, x) {
if (!this.active) {
return
}
this.disposable.dispose()
this._startNext(t, x, this.sink)
}
ContinueWithSink.prototype._startNext = function (t, x, sink) {
this._continue(this.f, x, sink)
}
ContinueWithSink.prototype._continue = function (f, x, sink) {
return f(x).source.run(sink)(this.scheduler)
}
ContinueWithSink.prototype.dispose = function () {
this.active = false
return this.disposable.dispose()
}

View File

@ -1,41 +0,0 @@
import { endTask, eventTask } from '../scheduler'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
export const delay = delayTime => stream =>
delayTime <= 0 ? stream : new Stream(new Delay(delayTime, stream.source))
class Delay {
constructor (dt, source) {
this.dt = dt
this.source = source
}
run (sink) {
return scheduler => {
const delaySink = new DelaySink(this.dt, sink, scheduler)
return disposeAll([delaySink, this.source.run(delaySink)(scheduler)])
}
}
}
class DelaySink {
constructor (dt, sink, scheduler) {
this.dt = dt
this.sink = sink
this.scheduler = scheduler
}
dispose () {
this.scheduler.cancelAll(task => task.sink === this.sink)
}
event (t) {
return x => this.scheduler.delay(this.dt)(eventTask(x)(this.sink))
}
end (t) {
this.scheduler.delay(this.dt)(endTask(this.sink))
}
}

View File

@ -1,4 +0,0 @@
import { defaultScheduler } from '../scheduler'
import { runEffects } from '../runEffects'
export const drain = stream => () => runEffects(stream, defaultScheduler)

View File

@ -1,8 +0,0 @@
export * from './drain'
export * from './mergeConcurrently'
export * from './combine'
export * from './merge'
export * from './concat'
export * from './switch'
export * from './skipRepeatsWith'
export * from './delay'

View File

@ -1,74 +0,0 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { IndexSink } from '../sink/IndexSink'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
import { reduce } from '@most/prelude'
/**
* @param {Array} streams array of stream to merge
* @returns {Stream} stream containing events from all input observables
* in time order. If two events are simultaneous they will be merged in
* arbitrary order.
*/
export function merge (streams) {
return new Stream(mergeSources(streams))
}
/**
* This implements fusion/flattening for merge. It will
* fuse adjacent merge operations. For example:
* - a.merge(b).merge(c) effectively becomes merge(a, b, c)
* - merge(a, merge(b, c)) effectively becomes merge(a, b, c)
* It does this by concatenating the sources arrays of
* any nested Merge sources, in effect "flattening" nested
* merge operations into a single merge.
*/
function mergeSources (streams) {
return new Merge(reduce(appendSources, [], streams))
}
function appendSources (sources, stream) {
var source = stream.source
return source instanceof Merge
? sources.concat(source.sources)
: sources.concat(source)
}
function Merge (sources) {
this.sources = sources
}
Merge.prototype.run = function (sink, scheduler) {
var l = this.sources.length
var disposables = new Array(l)
var sinks = new Array(l)
var mergeSink = new MergeSink(disposables, sinks, sink)
for (var indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink, scheduler)
}
return disposeAll(disposables)
}
function MergeSink (disposables, sinks, sink) {
this.sink = sink
this.disposables = disposables
this.activeCount = sinks.length
}
MergeSink.prototype.event = function (t, indexValue) {
this.sink.event(t, indexValue.value)
}
MergeSink.prototype.end = function (t, indexedValue) {
dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink)
if (--this.activeCount === 0) {
this.sink.end(t, indexedValue.value)
}
}

View File

@ -1,126 +0,0 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { curry2, curry3, id as identity } from '@most/prelude'
import { LinkedList } from '../LinkedList'
import { MemoizedDisposable } from '../disposables/MemoizedDisposable'
import { Stream } from '../Stream'
export const flatMap =
curry2((stream, f) => mergeMapConcurrently(f, Infinity, stream))
export const mergeConcurrently = curry2((concurrency, stream) =>
mergeMapConcurrently(identity, concurrency, stream))
export const join =
mergeConcurrently(Infinity)
export const mergeMapConcurrently = curry3((f, concurrency, stream) =>
new Stream(new MergeConcurrently(f, concurrency, stream.source)))
class MergeConcurrently {
constructor (f, concurrency, source) {
this.f = f
this.concurrency = concurrency
this.source = source
}
run (sink) {
return scheduler => new Outer(this.f, this.concurrency, this.source, sink, scheduler)
}
}
class Outer {
constructor (f, concurrency, source, sink, scheduler) {
this.f = f
this.concurrency = concurrency
this.sink = sink
this.scheduler = scheduler
this.pending = []
this.current = new LinkedList()
this.disposable = new MemoizedDisposable(source.run(this)(scheduler))
this.active = true
}
event (t) {
return x => this._addInner(t, x)
}
_addInner (t, x) {
if (this.current.length < this.concurrency) {
this._startInner(t, x)
} else {
this.pending.push(x)
}
}
_startInner (t, x) {
try {
this._initInner(t, x)
} catch (e) {
this.error(t, e)
}
}
_initInner (t, x) {
const innerSink = new Inner(t, this, this.sink)
innerSink.disposable = mapAndRun(this.f, x, innerSink, this.scheduler)
this.current.add(innerSink)
}
end (t) {
this.active = false
this.disposable.dispose()
this._checkEnd(t)
}
dispose () {
this.active = false
this.pending.length = 0
this.disposable.dispose()
this.current.dispose()
}
_endInner (t, inner) {
this.current.remove(inner)
inner.dispose()
if (this.pending.length === 0) {
this._checkEnd(t)
} else {
this._startInner(t, this.pending.shift())
}
}
_checkEnd (t) {
if (!this.active && this.current.isEmpty()) {
this.sink.end(t)
}
}
}
const mapAndRun = (f, x, sink, scheduler) => f(x).source.run(sink)(scheduler)
class Inner {
constructor (time, outer, sink) {
this.prev = this.next = null
this.time = time
this.outer = outer
this.sink = sink
this.disposable = void 0
}
event (t) {
return x => this.sink.event(Math.max(t, this.time))(x)
}
end (t) {
this.outer._endInner(Math.max(t, this.time), this)
}
dispose () {
return this.disposable.dispose()
}
}

View File

@ -1,40 +0,0 @@
import { Stream } from '../Stream'
export const skipRepeatsWith = equals => stream =>
new Stream(new SkipRepeats(equals, stream))
class SkipRepeats {
constructor (equals, source) {
this.equals = equals
this.source = source
}
run (sink) {
return scheduler => this.source.run(new SkipRepeatsSink(this.equals, sink), scheduler)
}
}
class SkipRepeatsSink {
constructor (equals, sink) {
this.equals = equals
this.value = void 0
this.init = true
}
event (t) {
return x => {
if (this.init) {
this.init = false
this.value = x
this.sink.event(t, x)
} else if (!this.equals(this.value)(x)) {
this.value = x
this.sink.event(t)(x)
}
}
}
end (t) {
this.sink.end(t)
}
}

View File

@ -1,99 +0,0 @@
import { disposeAll, emptyDisposable } from '../disposables'
import { Stream } from '../Stream'
/**
* Given a stream of streams, return a new stream that adopts the behavior
* of the most recent inner stream.
* @param {Stream} stream of streams on which to switch
* @returns {Stream} switching stream
*/
function switchLatest (stream) {
return new Stream(new Switch(stream.source))
}
export { switchLatest as switch }
function Switch (source) {
this.source = source
}
Switch.prototype.run = function (sink) {
return scheduler => {
const switchSink = new SwitchSink(sink, scheduler)
return disposeAll([switchSink, this.source.run(switchSink)(scheduler)])
}
}
function SwitchSink (sink, scheduler) {
this.sink = sink
this.scheduler = scheduler
this.current = null
this.ended = false
}
SwitchSink.prototype.event = function (t) {
return stream => {
this._disposeCurrent(t) // TODO: capture the result of this dispose
this.current = new Segment(t, Infinity, this, this.sink)
this.current.disposable = stream.source.run(this.current)(this.scheduler)
}
}
SwitchSink.prototype.end = function (t, x) {
this.ended = true
this._checkEnd(t, x)
}
SwitchSink.prototype.dispose = function () {
return this._disposeCurrent(this.scheduler.now())
}
SwitchSink.prototype._disposeCurrent = function (t) {
if (this.current !== null) {
return this.current._dispose(t)
}
}
SwitchSink.prototype._disposeInner = function (t, inner) {
inner._dispose(t) // TODO: capture the result of this dispose
if (inner === this.current) {
this.current = null
}
}
SwitchSink.prototype._checkEnd = function (t, x) {
if (this.ended && this.current === null) {
this.sink.end(t, x)
}
}
SwitchSink.prototype._endInner = function (t, x, inner) {
this._disposeInner(t, inner)
this._checkEnd(t, x)
}
function Segment (min, max, outer, sink) {
this.min = min
this.max = max
this.outer = outer
this.sink = sink
this.disposable = emptyDisposable
}
Segment.prototype.event = function (t) {
return x => {
if (t < this.max) {
this.sink.event(Math.max(t, this.min))(x)
}
}
}
Segment.prototype.end = function (t, x) {
this.outer._endInner(Math.max(t, this.min), x, this)
}
Segment.prototype._dispose = function (t) {
this.max = t
this.disposable.dispose()
}

View File

@ -1,13 +0,0 @@
export class Disposable {
/**
* Create a new Disposable which will dispose its underlying resource.
* @param {function} dispose function
* @param {*?} data any data to be passed to disposer function
*/
constructor (dispose, data) {
this._dispose = dispose
this._data = data
this.dispose = () => dispose(data)
}
}

View File

@ -1,17 +0,0 @@
export class MemoizedDisposable {
constructor (disposable) {
this.disposed = false
this.value = undefined
this.disposable = disposable
}
dispose () {
if (!this.disposed) {
this.disposed = true
this.value = this.disposable.dispose()
this.disposable = undefined
}
return this.value
}
}

View File

@ -1,29 +0,0 @@
export class SettableDisposable {
constructor () {
this.disposable = void 0
this.disposed = false
this.result = void 0
}
setDisposable (disposable) {
this.disposable = disposable
if (this.disposed) {
this.result = disposable.dispose()
}
}
dispose () {
if (this.disposed) {
return this.result
}
this.disposed = true
if (this.disposable !== void 0) {
this.result = this.disposable.dispose()
}
return this.result
}
}

View File

@ -1,3 +0,0 @@
export function disposeSafely (disposable) {
disposable.dispose()
}

View File

@ -1,12 +0,0 @@
import { curry2, id, map } from '@most/prelude'
import { Disposable } from './Disposable'
import { MemoizedDisposable } from './MemoizedDisposable'
import { disposeSafely } from './disposeSafely'
export const createDisposable = curry2((dispose, data) => new MemoizedDisposable(new Disposable(dispose, data)))
export const emptyDisposable = new Disposable(id, undefined)
export const disposeAll = disposables =>
createDisposable(curry2(map)(disposeSafely), disposables)

View File

@ -1,5 +0,0 @@
/* @flow */
export * from './disposables'
export * from './scheduler'
export * from './combinators'
export * from './multicast'

View File

@ -1,45 +0,0 @@
import { append, drop } from '@most/prelude'
import { MulticastSource } from './MulticastSource'
export class HoldSource extends MulticastSource {
constructor (source, bufferSize) {
super(source)
this.bufferSize = bufferSize
this.has = false
this.buffer = []
}
add (sink, scheduler) {
if (this.has) {
pushEvents(this.buffer, sink, scheduler)
}
return super.add(sink, scheduler)
}
event (time) {
return value => {
this.has = true
this.buffer = dropAndAppend(value, this.buffer, this.bufferSize)
return super.event(time)(value)
}
}
}
function pushEvents (buffer, sink, scheduler) {
const length = buffer.length
for (let i = 0; i < length; ++i) {
sink.event(scheduler.now(), buffer[i])
}
}
export function dropAndAppend (value, buffer, bufferSize) {
if (buffer.length === bufferSize) {
return append(value, drop(1, buffer))
}
return append(value, buffer)
}

View File

@ -1,16 +0,0 @@
export class MulticastDisposable {
constructor (source, sink) {
this.source = source
this.sink = sink
this.disposed = false
}
dispose () {
if (this.disposed) {
return
}
this.disposed = true
const remaining = this.source.remove(this.sink)
return remaining === 0 && this.source._dispose()
}
}

View File

@ -1,62 +0,0 @@
import { append, findIndex, remove } from '@most/prelude'
import { dispose, emptyDisposable } from './dispose'
import { MulticastDisposable } from './MulticastDisposable'
export class MulticastSource {
constructor (source) {
this.source = source
this.sinks = []
this._disposable = emptyDisposable
}
run (sink) {
return scheduler => {
const n = this.add(sink, scheduler)
if (n === 1) {
this._disposable = this.source.run(this)(scheduler)
}
return new MulticastDisposable(this, sink)
}
}
_dispose () {
const disposable = this._disposable
this._disposable = emptyDisposable
return Promise.resolve(disposable).then(dispose)
}
add (sink) {
this.sinks = append(sink, this.sinks)
return this.sinks.length
}
remove (sink) {
const i = findIndex(sink, this.sinks)
// istanbul ignore next
if (i >= 0) {
this.sinks = remove(i, this.sinks)
}
return this.sinks.length
}
event (time) {
return value => {
const s = this.sinks
if (s.length === 1) {
return s[0].event(time)(value)
}
for (let i = 0; i < s.length; ++i) {
s[i].event(time)(value)
}
}
}
end (time) {
const s = this.sinks
for (let i = 0; i < s.length; ++i) {
s[i].end(time)
}
}
}

View File

@ -1,5 +0,0 @@
export const dispose = disposable => disposable.dispose()
export const emptyDisposable = {
dispose () {}
}

View File

@ -1,14 +0,0 @@
import { HoldSource } from './HoldSource'
import { MulticastSource } from './MulticastSource'
import { Stream } from '../Stream'
export function multicast (stream) {
const source = stream.source
return source instanceof MulticastSource
? stream
: new Stream(new MulticastSource(source))
}
export const hold = bufferSize => stream =>
new Stream(new HoldSource(stream.source, bufferSize))

View File

@ -1,30 +0,0 @@
import { SettableDisposable } from './disposables/SettableDisposable'
export function runEffects (stream, scheduler) {
return runSourceEffects(stream.source, scheduler)
}
const runSourceEffects = (source, scheduler) => runSource(source, scheduler)
function runSource (source, scheduler) {
const disposable = new SettableDisposable()
const observer = new RunEffectsSink(disposable)
disposable.setDisposable(source.run(observer)(scheduler))
return {}
}
var RunEffectsSink = function RunEffectsSink (disposable) {
this._disposable = disposable
this.active = true
}
RunEffectsSink.prototype.event = function event (t) {
return function (x) {}
}
RunEffectsSink.prototype.end = function end (t) {
this.active = false
Promise.resolve(this._disposable).then(d => d.dispose())
}

View File

@ -1,48 +0,0 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/*global setTimeout, clearTimeout*/
export function defer (task) {
return Promise.resolve(task).then(runTask)
}
export function runTask (task) {
return task.run()
}
export class ClockTimer {
constructor () {
this.now = Date.now
}
setTimer (f, dt) {
return dt <= 0 ? runAsap(f) : setTimeout(f, dt)
}
clearTimer (t) {
return t instanceof Asap ? t.cancel() : clearTimeout(t)
}
}
class Asap {
constructor (f) {
this.f = f
this.active = true
}
run () {
return this.active && this.f()
}
cancel () {
this.active = false
}
}
function runAsap (f) {
const task = new Asap(f)
defer(task)
return task
}

View File

@ -1,31 +0,0 @@
import { curry2, curry3 } from '@most/prelude'
const propagateTask =
curry3((run, value, sink) => new PropagateTask(run, value, sink))
export const eventTask =
curry2((value, sink) => propagateTask(runEvent, value, sink))
export const endTask = sink => propagateTask(runEnd, void 0, sink)
class PropagateTask {
constructor (run, value, sink) {
let active = true
this.sink = sink
this.dispose = () => { active = false }
this.run = t => {
if (!active) return
run(t, value, sink)
}
}
}
function runEvent (time, value, sink) {
sink.event(time)(value)
}
function runEnd (time, value, sink) {
sink.end(time, value)
}

View File

@ -1,19 +0,0 @@
/* @flow */
export class ScheduledTask {
constructor (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
}
run () {
this.task.run(this.time)
}
dispose () {
this.scheduler.cancel(this)
this.task.dispose()
}
}

View File

@ -1,101 +0,0 @@
/* @flow */
import { ScheduledTask } from './ScheduledTask'
function runTask (task) {
try {
return task.run()
} catch (e) {
console.error(e)
}
}
export class Scheduler {
constructor (timer, timeline) {
this.timer = timer
this.timeline = timeline
this._timer = null
this._nextArrival = Infinity
this._runReadyTasksBound = () => this._runReadyTasks(this.now())
}
now () {
return this.timer.now()
}
asap (task) {
return this.schedule(0)(-1)(task)
}
delay (delay) {
return task => this.schedule(delay)(-1)(task)
}
periodic (period) {
return task => this.schedule(0)(period)(task)
}
schedule (delay) {
return period => task => {
const now = this.now()
const st = new ScheduledTask(now + Math.max(0, delay), period, task, this)
this.timeline.add(st)
this._scheduleNextRun(now)
return st
}
}
cancel (task) {
task.active = false
if (this.timeline.remove(task)) {
this._reschedule()
}
}
cancelAll (f) {
this.timeline.removeAll(f)
this._reschedule()
}
_reschedule () {
if (this.timeline.isEmpty()) {
this._unschedule()
} else {
this._scheduleNextRun(this.now())
}
}
_unschedule () {
this.timer.clearTimer(this._timer)
this._timer = null
}
_scheduleNextRun (now) { // eslint-disable-line complexity
if (this.timeline.isEmpty()) {
return
}
const nextArrival = this.timeline.nextArrival()
if (this._timer === null) {
this._scheduleNextArrival(nextArrival, now)
} else if (nextArrival < this._nextArrival) {
this._unschedule()
this._scheduleNextArrival(nextArrival, now)
}
}
_scheduleNextArrival (nextArrival, now) {
this._nextArrival = nextArrival
const delay = Math.max(0, nextArrival - now)
this._timer = this.timer.setTimer(this._runReadyTasksBound, delay)
}
_runReadyTasks (now) {
this._timer = null
this.timeline.runTasks(now, runTask)
this._scheduleNextRun(this.now())
}
}

View File

@ -1,124 +0,0 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import * as base from '@most/prelude'
export class Timeline {
constructor () {
this.tasks = []
}
nextArrival () {
return this.isEmpty() ? Infinity : this.tasks[0].time
}
isEmpty () {
return this.tasks.length === 0
}
add (st) {
insertByTime(st, this.tasks)
}
remove (st) {
const i = binarySearch(st.time, this.tasks)
if (i >= 0 && i < this.tasks.length) {
const at = base.findIndex(st, this.tasks[i].events)
if (at >= 0) {
this.tasks[i].events.splice(at, 1)
return true
}
}
return false
}
removeAll (f) {
for (let i = 0; i < this.tasks.length; ++i) {
removeAllFrom(f, this.tasks[i])
}
}
runTasks (t, runTask) {
const tasks = this.tasks
const l = tasks.length
let i = 0
while (i < l && tasks[i].time <= t) {
++i
}
this.tasks = tasks.slice(i)
// Run all ready tasks
for (let j = 0; j < i; ++j) {
this.tasks = runReadyTasks(runTask, tasks[j].events, this.tasks)
}
}
}
function runReadyTasks (runTask, events, tasks) { // eslint-disable-line complexity
for (let i = 0; i < events.length; ++i) {
const task = events[i]
if (task.active) {
runTask(task)
// Reschedule periodic repeating tasks
// Check active again, since a task may have canceled itself
if (task.period >= 0 && task.active) {
task.time = task.time + task.period
insertByTime(task, tasks)
}
}
}
return tasks
}
function insertByTime (task, timeslots) { // eslint-disable-line complexity
const l = timeslots.length
if (l === 0) {
timeslots.push(newTimeslot(task.time, [task]))
return
}
const i = binarySearch(task.time, timeslots)
if (i >= l) {
timeslots.push(newTimeslot(task.time, [task]))
} else if (task.time === timeslots[i].time) {
timeslots[i].events.push(task)
} else {
timeslots.splice(i, 0, newTimeslot(task.time, [task]))
}
}
function removeAllFrom (f, timeslot) {
timeslot.events = base.removeAll(f, timeslot.events)
}
function binarySearch (t, sortedArray) { // eslint-disable-line complexity
let lo = 0
let hi = sortedArray.length
let mid, y
while (lo < hi) {
mid = Math.floor((lo + hi) / 2)
y = sortedArray[mid]
if (t === y.time) {
return mid
} else if (t < y.time) {
hi = mid
} else {
lo = mid + 1
}
}
return hi
}
const newTimeslot = (t, events) => ({ time: t, events: events })

View File

@ -1,16 +0,0 @@
import { ClockTimer } from './ClockTimer'
import { Scheduler } from './Scheduler'
import { Timeline } from './Timeline'
import { curry2 } from '@most/prelude'
export { endTask, eventTask } from './PropagateTask'
export const defaultScheduler = new Scheduler(new ClockTimer(), new Timeline())
export const scheduleTasks = curry2((tasks, scheduler) => {
const scheduledTasks = tasks.map(task => scheduler.asap(task))
return {
dispose: () => scheduledTasks.forEach(task => task.dispose())
}
})

View File

@ -1,24 +0,0 @@
export class IndexSink {
constructor (i, sink) {
this.sink = sink
this.index = i
this.active = true
this.value = void 0
}
event (t) {
return x => {
if (!this.active) return
this.value = x
this.sink.event(t)(this)
}
}
end (t) {
if (!this.active) return
this.active = false
this.sink.end(t, { index: this.index })
}
}

View File

@ -1 +0,0 @@
export * from './IndexSink'

View File

@ -5,7 +5,7 @@
"main": "index.js",
"scripts": {
"start": "yarn build && pulp run",
"build": "rollup -c && pulp build",
"build": "pulp build",
"changelog": "conventional-changelog -i CHANGELOG.md -s -r 0 -p angular",
"clean": "rimraf lib lib.es2015",
"commit": "git-cz",
@ -32,10 +32,6 @@
"husky": "^0.13.2",
"pulp": "^10.0.4",
"purescript": "^0.10.7",
"rollup": "^0.41.4",
"rollup-plugin-buble": "^0.15.0",
"rollup-plugin-flow": "^1.1.1",
"rollup-plugin-node-resolve": "^2.0.0",
"uglify-js": "^2.8.11",
"validate-commit-msg": "^2.11.2"
},

View File

@ -1,14 +0,0 @@
import buble from 'rollup-plugin-buble'
import flow from 'rollup-plugin-flow'
import resolve from 'rollup-plugin-node-resolve'
export default {
entry: 'ffi/Control/Stream/index.js',
dest: 'src/Control/Stream.js',
format: 'cjs',
plugins: [
flow(),
buble(),
resolve({ module: true, jsnext: true })
]
}

View File

@ -0,0 +1,32 @@
exports.emptyDisposable = { dispose: function () { return {} } }
exports.disposeAll = function (disposables) {
var disposed = false
function dispose () {
if (disposed) return
disposables.forEach(function (disposable) {
disposable.dispose()
})
disposed = true
}
return { dispose: dispose }
}
exports.lazyDisposable = function (f) {
var disposed = false
function dispose () {
if (disposed) return
var disposable = f()
disposable.dispose()
disposed = true
}
return { dispose: dispose }
}

View File

@ -0,0 +1,10 @@
module Control.Stream.Disposable where
import Control.Monad.Eff (Pure)
import Data.Unit (Unit)
type Disposable = { dispose :: Pure Unit }
foreign import emptyDisposable :: Disposable
foreign import disposeAll :: Array Disposable -> Disposable
foreign import lazyDisposable :: (Unit -> Disposable) -> Disposable

View File

@ -0,0 +1,370 @@
exports.scheduleTasks = function (f) {
return function (tasks) {
var scheduledTasks = tasks.map(f.bind(scheduler))
function dispose () {
scheduledTasks.forEach(function (task) {
task.dispose()
})
}
return { dispose: dispose }
}
}
exports.eventTask = function (sink) {
return function (value) {
return new PropagateTask(runEvent, value, sink)
}
}
exports.endTask = function (sink) {
return new PropagateTask(runEnd, void 0, sink)
}
function PropagateTask (run, value, sink) {
this.active = true
this._run = run
this.value = value
this.sink = sink
}
PropagateTask.prototype.dispose = function () {
this.active = false
}
PropagateTask.prototype.run = function (time) {
if (!this.active) return
this._run(time, this.value, this.sink)
}
function runEvent (time, value, sink) {
sink.event(time)(value)
}
function runEnd (time, value, sink) {
sink.end(time)
}
function defer (task) {
return Promise.resolve(task).then(runTask)
}
function runTask (task) {
return task.run()
}
var ClockTimer = function ClockTimer () {
this.now = Date.now
}
ClockTimer.prototype.setTimer = function setTimer (f, dt) {
return dt <= 0 ? runAsap(f) : setTimeout(f, dt)
}
ClockTimer.prototype.clearTimer = function clearTimer (t) {
return t instanceof Asap ? t.cancel() : clearTimeout(t)
}
var Asap = function Asap (f) {
this.f = f
this.active = true
}
Asap.prototype.run = function run () {
return this.active && this.f()
}
Asap.prototype.cancel = function cancel () {
this.active = false
}
function runAsap (f) {
var task = new Asap(f)
defer(task)
return task
}
var ScheduledTask = function ScheduledTask (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
}
ScheduledTask.prototype.run = function run () {
this.task.run(this.time)
}
ScheduledTask.prototype.dispose = function dispose () {
this.scheduler.cancel(this)
this.task.dispose()
}
function runTask$1 (task) {
try {
return task.run()
} catch (e) {
console.error(e)
}
}
var Scheduler = function Scheduler (timer, timeline) {
var this$1 = this
this.timer = timer
this.timeline = timeline
this._timer = null
this._nextArrival = Infinity
this._runReadyTasksBound = function () { return this$1._runReadyTasks(this$1.now()) }
}
Scheduler.prototype.now = function now () {
return this.timer.now()
}
Scheduler.prototype.asap = function asap (task) {
return this.schedule(0)(-1)(task)
}
Scheduler.prototype.delay = function delay (delay$1) {
var this$1 = this
return function (task) { return this$1.schedule(delay$1)(-1)(task) }
}
Scheduler.prototype.periodic = function periodic (period) {
var this$1 = this
return function (task) { return this$1.schedule(0)(period)(task) }
}
Scheduler.prototype.schedule = function schedule (delay) {
var this$1 = this
return function (period) {
return function (task) {
var now = this$1.now()
var st = new ScheduledTask(now + Math.max(0, delay), period, task, this$1)
st.run = st.run.bind(st)
st.dispose = st.dispose.bind(st)
this$1.timeline.add(st)
this$1._scheduleNextRun(now)
return st
}
}
}
Scheduler.prototype.cancel = function cancel (task) {
task.active = false
if (this.timeline.remove(task)) {
this._reschedule()
}
}
Scheduler.prototype.cancelAll = function cancelAll (f) {
this.timeline.removeAll(f)
this._reschedule()
}
Scheduler.prototype._reschedule = function _reschedule () {
if (this.timeline.isEmpty()) {
this._unschedule()
} else {
this._scheduleNextRun(this.now())
}
}
Scheduler.prototype._unschedule = function _unschedule () {
this.timer.clearTimer(this._timer)
this._timer = null
}
Scheduler.prototype._scheduleNextRun = function _scheduleNextRun (now) { // eslint-disable-line complexity
if (this.timeline.isEmpty()) {
return
}
var nextArrival = this.timeline.nextArrival()
if (this._timer === null) {
this._scheduleNextArrival(nextArrival, now)
} else if (nextArrival < this._nextArrival) {
this._unschedule()
this._scheduleNextArrival(nextArrival, now)
}
}
Scheduler.prototype._scheduleNextArrival = function _scheduleNextArrival (nextArrival, now) {
this._nextArrival = nextArrival
var delay = Math.max(0, nextArrival - now)
this._timer = this.timer.setTimer(this._runReadyTasksBound, delay)
}
Scheduler.prototype._runReadyTasks = function _runReadyTasks (now) {
this._timer = null
this.timeline.runTasks(now, runTask$1)
this._scheduleNextRun(this.now())
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
var Timeline = function Timeline () {
this.tasks = []
}
Timeline.prototype.nextArrival = function nextArrival () {
return this.isEmpty() ? Infinity : this.tasks[0].time
}
Timeline.prototype.isEmpty = function isEmpty () {
return this.tasks.length === 0
}
Timeline.prototype.add = function add (st) {
insertByTime(st, this.tasks)
}
Timeline.prototype.remove = function remove$$1 (st) {
var i = binarySearch(st.time, this.tasks)
if (i >= 0 && i < this.tasks.length) {
var at = findIndex(st, this.tasks[i].events)
if (at >= 0) {
this.tasks[i].events.splice(at, 1)
return true
}
}
return false
}
Timeline.prototype.removeAll = function removeAll$$1 (f) {
var this$1 = this
for (var i = 0; i < this.tasks.length; ++i) {
removeAllFrom(f, this$1.tasks[i])
}
}
Timeline.prototype.runTasks = function runTasks (t, runTask) {
var this$1 = this
var tasks = this.tasks
var l = tasks.length
var i = 0
while (i < l && tasks[i].time <= t) {
++i
}
this.tasks = tasks.slice(i)
// Run all ready tasks
for (var j = 0; j < i; ++j) {
this$1.tasks = runReadyTasks(runTask, tasks[j].events, this$1.tasks)
}
}
function runReadyTasks (runTask, events, tasks) { // eslint-disable-line complexity
for (var i = 0; i < events.length; ++i) {
var task = events[i]
if (task.active) {
runTask(task)
// Reschedule periodic repeating tasks
// Check active again, since a task may have canceled itself
if (task.period >= 0 && task.active) {
task.time = task.time + task.period
insertByTime(task, tasks)
}
}
}
return tasks
}
function insertByTime (task, timeslots) { // eslint-disable-line complexity
var l = timeslots.length
if (l === 0) {
timeslots.push(newTimeslot(task.time, [task]))
return
}
var i = binarySearch(task.time, timeslots)
if (i >= l) {
timeslots.push(newTimeslot(task.time, [task]))
} else if (task.time === timeslots[i].time) {
timeslots[i].events.push(task)
} else {
timeslots.splice(i, 0, newTimeslot(task.time, [task]))
}
}
function removeAllFrom (f, timeslot) {
timeslot.events = removeAll(f, timeslot.events)
}
function binarySearch (t, sortedArray) { // eslint-disable-line complexity
var lo = 0
var hi = sortedArray.length
var mid, y
while (lo < hi) {
mid = Math.floor((lo + hi) / 2)
y = sortedArray[mid]
if (t === y.time) {
return mid
} else if (t < y.time) {
hi = mid
} else {
lo = mid + 1
}
}
return hi
}
var newTimeslot = function (t, events) { return ({ time: t, events: events }) }
function removeAll (f, a) {
var l = a.length
var b = new Array(l)
var j = 0
for (var x = (void 0), i = 0; i < l; ++i) {
x = a[i]
if (!f(x)) {
b[j] = x
++j
}
}
b.length = j
return b
}
// findIndex :: a -> [a] -> Int
// find index of x in a, from the left
function findIndex (x, a) {
for (var i = 0, l = a.length; i < l; ++i) {
if (x === a[i]) {
return i
}
}
return -1
}
var scheduler = new Scheduler(new ClockTimer(), new Timeline())
exports.scheduler = scheduler

View File

@ -0,0 +1,32 @@
module Control.Stream.Scheduler where
import Control.Monad.Eff (Pure)
import Control.Stream.Disposable (Disposable)
import Control.Stream.Sink (Sink)
import Control.Stream.Time (Time)
import Data.Unit (Unit)
newtype Scheduler = Scheduler
{ now :: Pure Time
, asap :: Task -> ScheduledTask
, delay :: Number -> Task -> ScheduledTask
, periodic :: Number -> Task -> ScheduledTask
, schedule :: Number -> Number -> Task -> ScheduledTask
, cancel :: ScheduledTask -> Unit
, cancelAll :: (ScheduledTask -> Boolean) -> Unit
}
type Task =
{ run :: Time -> Unit
, dispose :: Pure Unit
}
type ScheduledTask =
{ run :: Pure Unit
, dispose :: Pure Unit
}
foreign import scheduler :: Scheduler
foreign import scheduleTasks :: (Task -> ScheduledTask) -> Array Task -> Disposable
foreign import eventTask :: forall a. Sink a -> a -> Task
foreign import endTask :: forall a. Sink a -> Task

View File

@ -0,0 +1,9 @@
module Control.Stream.Sink (Sink(..)) where
import Control.Stream.Time (Time)
import Data.Unit (Unit)
newtype Sink a = Sink
{ event :: Time -> a -> Unit
, end :: Time -> Unit
}

File diff suppressed because it is too large Load Diff

View File

@ -1,337 +1,58 @@
module Control.Stream
( STREAM
( module Control.Stream.Stream
, module Control.Stream.Disposable
, module Control.Stream.Scheduler
, module Control.Stream.Sink
, module Control.Stream.Time
) where
import Control.Stream.Stream
(STREAM
, EffStream
, Stream
, Source
, EventFn
, EndFn
, RunFn
, Sink
, Scheduler
, Task
, ScheduledTask
, Disposable
, Time
-- Scheduler-related functions and values
, defaultScheduler
, eventTask
, endTask
, createDisposable
, emptyDisposable
, disposeAll
, scheduleTasks
-- Stream-related helpers
, createStream
, createSink
, Stream(..)
, just
, empty
, never
, periodic
, createCombinator
, createEventCombinator
, createEndCombinator
, getSource
, runSource
-- Combinators
, drain
, tapEvent
, tapEnd
, filter
, fromArray
, take
, skip
, constant
, continueWith
, concat
, startWith
, scan
, loop
, chain
, mergeConcurrently
, mergeMapConcurrently
, join
, mergeMapConcurrently
, concatMap
, switch
, combine2
, combine3
, combine4
, combine5
, continueWith
, concat
, combineArray
, merge
, switch
, constant
, scan
, startWith
, multicast
, hold
, skipRepeats
, skipRepeatsWith
, delay
, skipRepeats
, until
, since
, during
, sample
-- Stream factories
, just
, fromArray
, empty
, never
, periodic
, debounce
, throttle
, delay
, multicast
)
where
import Control.Alt ((<|>))
import Control.Applicative (class Applicative)
import Control.Apply (class Apply)
import Control.Bind (class Bind)
import Control.Category (id)
import Control.Monad (class Monad)
import Control.Monad.Eff (Eff, Pure, runPure)
import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef)
import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff)
import Control.MonadPlus (class Alt, class Plus)
import Data.Eq (class Eq, eq)
import Data.Function (flip, ($))
import Data.Functor (class Functor, map)
import Data.Maybe (Maybe(Just, Nothing), fromJust, isJust)
import Data.Monoid (class Monoid)
import Data.Semigroup (class Semigroup)
import Data.Unit (Unit, unit)
import Partial.Unsafe (unsafePartial)
newtype Time = Time Int
foreign import data STREAM :: !
type EffStream e a = Eff (stream :: STREAM | e) a
newtype Stream a = Stream { source :: Source a }
newtype Source a = Source { run :: RunFn a }
type RunFn a = Sink a -> Scheduler -> Disposable
type EventFn a = Time -> a -> Unit
type EndFn = Time -> Unit
newtype Sink a = Sink
{ event :: Time -> a -> Unit
, end :: Time -> Unit
}
newtype Scheduler = Scheduler
{ now :: Pure Time
, asap :: Task -> ScheduledTask
, delay :: Number -> Task -> ScheduledTask
, periodic :: Number -> Task -> ScheduledTask
, schedule :: Number -> Number -> Task -> ScheduledTask
, cancel :: ScheduledTask -> Unit
, cancelAll :: (ScheduledTask -> Boolean) -> Unit
}
type Task =
{ run :: Time -> Unit
, dispose :: Pure Unit
}
type ScheduledTask =
{ run :: Pure Unit
, dispose :: Pure Unit
}
type Disposable = { dispose :: Pure Unit }
instance functorStream :: Functor Stream where
map = _map
instance applyStream :: Apply Stream where
apply = combine2 id
instance applicativeStream :: Applicative Stream where
pure = just
instance bindStream :: Bind Stream where
bind = flatMap
instance monadStream :: Monad Stream
instance semigroupStream :: Semigroup (Stream a) where
append = concat
instance altStream :: Alt Stream where
alt = \s1 s2 -> merge [s1, s2]
instance monoidStream :: Monoid (Stream a) where
mempty = empty
instance plusStream :: Plus Stream where
empty = empty
-- Stream-related helpers
createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a
createStream run = Stream { source: Source { run } }
getSource :: forall a. Stream a -> Source a
getSource (Stream stream) = stream.source
runSource :: forall a. Source a -> Sink a -> Scheduler -> Disposable
runSource (Source source) sink scheduler = source.run sink scheduler
runStream :: forall a. Stream a -> Sink a -> Scheduler -> Disposable
runStream stream sink scheduler = runSource (getSource stream) sink scheduler
createSink :: forall a. (Time -> a -> Unit) -> (Time -> Unit) -> Sink a
createSink event end = Sink { event, end }
createCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> (Sink b -> Scheduler -> EndFn) -> Stream a -> Stream b
createCombinator event end stream = createStream runCombinator
where
runCombinator :: Sink b -> Scheduler -> Disposable
runCombinator sink scheduler =
runStream stream (createSink (event sink scheduler) (end sink scheduler)) scheduler
createEventCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> Stream a -> Stream b
createEventCombinator event stream = createCombinator event end stream
where
end (Sink sink) (Scheduler scheduler) time = sink.end time
createEndCombinator :: forall a. (Sink a -> Scheduler -> EndFn) -> Stream a -> Stream a
createEndCombinator end stream = createCombinator event end stream
where
event :: Sink a -> Scheduler -> EventFn a
event (Sink sink) scheduler time value = sink.event time value
-- Stream factories
just :: forall a. a -> Stream a
just a = createStream (runJust a)
runJust :: forall a. a -> Sink a -> Scheduler -> Disposable
runJust a sink scheduler =
scheduleTasks [ eventTask a sink, endTask sink ] scheduler
empty :: forall a. Stream a
empty = createStream \sink scheduler -> scheduleTasks [endTask sink] scheduler
never :: forall a. Stream a
never = createStream \sink scheduler -> emptyDisposable
-- TODO: remove fromArray because it's not a real event stream
-- replace with a fromArray combinator
fromArray :: forall a. Array a -> Stream a
fromArray arr = createStream (runFromArray arr)
runFromArray :: forall a. Array a -> Sink a -> Scheduler -> Disposable
runFromArray arr sink scheduler = scheduleTasks tasks scheduler
where
tasks :: Array Task
tasks = eventTasks <|> [ (endTask sink) ]
eventTasks :: Array Task
eventTasks = map (flip eventTask sink) arr
periodic :: Number -> Stream Unit
periodic period = createStream (runPeriodic period)
runPeriodic :: Number -> Sink Unit -> Scheduler -> Disposable
runPeriodic period sink (Scheduler scheduler) = { dispose: scheduledTask.dispose }
where
scheduledTask = scheduler.periodic period (eventTask unit sink)
-- combinators
_map :: forall a b. (a -> b) -> Stream a -> Stream b
_map f stream = createEventCombinator mapEvent stream
where
mapEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (f value)
filter :: forall a. (a -> Boolean) -> Stream a -> Stream a
filter predicate stream = createEventCombinator event stream
where
event (Sink sink) (Scheduler scheduler) time value = if predicate value
then sink.event time value
else unit
tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a
tapEvent f stream = createEventCombinator event stream
where
event (Sink sink) (Scheduler scheduler) time value = sink.event time value
where -- find a better way to perform these side effects
x = unsafePerformEff (f value)
tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a
tapEnd f stream = createEndCombinator end stream
where
end (Sink sink) (Scheduler scheduler) time = sink.end time
where -- find a better way to perform these side effects
x = unsafePerformEff f
startWith :: forall a. a -> Stream a -> Stream a
startWith value stream = concat (just value) stream
constant :: forall a b. b -> Stream a -> Stream b
constant value stream = createEventCombinator constantEvent stream
where
constantEvent (Sink sink) (Scheduler scheduler) time x = sink.event time value
scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b
scan f seed stream = startWith seed $ createEventCombinator scanEvent stream
where
state :: State b
state = createState seed
scanEvent :: Sink b -> Scheduler -> Time -> a -> Unit
scanEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (state.set \acc -> f acc value)
skipRepeats :: forall a. (Eq a) => Stream a -> Stream a
skipRepeats = skipRepeatsWith eq
sample :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
sample f sampler stream = createStream $ runSample (createState Nothing)
where
runSample :: State (Maybe b) -> Sink c -> Scheduler -> Disposable
runSample state sink scheduler = disposeAll
[ runStream stream (createHoldSink state) scheduler
, runStream sampler (createSampleSink state sink f) scheduler
]
createHoldSink :: forall a. State (Maybe a) -> Sink a
createHoldSink state = createSink event end
where
event time value = always unit (state.set \_ -> Just value)
end time = unit
createSampleSink :: forall a b c. State (Maybe b) -> Sink c -> (a -> b -> c) -> Sink a
createSampleSink state (Sink sink) f = createSink event end
where
end time = sink.end time
event time value =
if isJust (runPure state.get)
then sink.event time $ f value (unsafePartial $ fromJust (runPure state.get))
else unit
always :: forall a b. a -> b -> a
always a b = a
-- find a better way to perform these side effects
type State a = { get:: Pure a, set :: (a -> a) -> a }
createState :: forall a. a -> State a
createState seed = { set, get }
where
ref :: Ref a
ref = unsafePerformEff $ unsafeCoerceEff $ newRef seed
set :: (a -> a) -> a
set f = unsafePerformEff $ unsafeCoerceEff $
modifyRef' ref \x -> { state: (f x), value: (f x) }
get :: Pure a
get = unsafeCoerceEff $ readRef ref
-- Foreign imports
-- Scheduler-related
foreign import defaultScheduler :: Scheduler
foreign import eventTask :: forall a. a -> Sink a -> Task
foreign import endTask :: forall a. Sink a -> Task
foreign import createDisposable :: forall a. (a -> Unit) -> a -> Disposable
foreign import emptyDisposable :: Disposable
foreign import disposeAll :: Array Disposable -> Disposable
foreign import scheduleTasks :: Array Task -> Scheduler -> Disposable
-- combinators
foreign import drain :: forall e a. Stream a -> EffStream e Unit
foreign import mergeConcurrently :: forall a. Int -> Stream (Stream a) -> Stream a
foreign import mergeMapConcurrently :: forall a b. (a -> Stream b) -> Stream a -> Stream b
foreign import combine2 :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
foreign import combine3 :: forall a b c d. (a -> b -> c -> d) -> Stream a -> Stream b -> Stream c -> Stream d
foreign import combine4 :: forall a b c d e. (a -> b -> c -> d -> e) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e
foreign import combine5 :: forall a b c d e f. (a -> b -> c -> d -> e -> f) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e -> Stream f
foreign import join :: forall a. Stream (Stream a) -> Stream a
foreign import flatMap :: forall a b. Stream a -> (a -> Stream b) -> Stream b
foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Stream a
foreign import concat :: forall a. Stream a -> Stream a -> Stream a
foreign import merge :: forall a. Array (Stream a) -> Stream a
foreign import switch :: forall a. Stream (Stream a) -> Stream a
foreign import multicast :: forall a. Stream a -> Stream a
foreign import hold :: forall a. Int -> Stream a -> Stream a
foreign import skipRepeatsWith :: forall a. (a -> a -> Boolean) -> Stream a -> Stream a
foreign import delay :: forall a. Number -> Stream a -> Stream a
import Control.Stream.Disposable (Disposable, emptyDisposable)
import Control.Stream.Scheduler (Scheduler(..), Task, ScheduledTask, scheduler, scheduleTasks, eventTask, endTask)
import Control.Stream.Sink (Sink(..))
import Control.Stream.Time (Time)

1244
src/Control/Stream/Stream.js Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,339 @@
module Control.Stream.Stream
( STREAM
, EffStream
, Stream(..)
, just
, empty
, never
, periodic
, createCombinator
, createEventCombinator
, createEndCombinator
, drain
, tapEvent
, tapEnd
, fromArray
, take
, skip
, constant
, continueWith
, concat
, startWith
, scan
, loop
, chain
, mergeConcurrently
, join
, mergeMapConcurrently
, concatMap
, switch
, combine2
, combine3
, combine4
, combine5
, combineArray
, merge
, skipRepeatsWith
, skipRepeats
, until
, since
, during
, sample
, debounce
, throttle
, delay
, multicast
, hold
, Subject
, toSubject
, toHoldSubject
, fromSubject
, subjectEvent
, subjectEnd
, subscribe
) where
import Control.Alternative (class Apply)
import Control.Bind (class Bind)
import Control.Category (id)
import Control.Monad.Eff (Eff, Pure, runPure)
import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef)
import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff)
import Control.MonadPlus (class Alt, class Alternative, class MonadPlus, class Plus)
import Control.MonadZero (class Applicative, class MonadZero)
import Control.Stream.Disposable (Disposable, emptyDisposable, disposeAll)
import Control.Stream.Scheduler (Scheduler(..), endTask, eventTask, scheduleTasks)
import Control.Stream.Sink (Sink(..))
import Control.Stream.Stream (Stream(..), EffStream, empty, just)
import Control.Stream.Time (Time)
import Data.Eq ((==))
import Data.EuclideanRing ((-))
import Data.Functor (class Functor)
import Data.Maybe (Maybe(..), fromJust, isNothing)
import Data.Monoid (class Monoid)
import Data.Unit (Unit, unit)
import Partial.Unsafe (unsafePartial)
import Prelude (class Eq, class Monad, class Semigroup, eq)
foreign import data STREAM :: !
type EffStream e a = Eff (stream :: STREAM | e) a
foreign import data Subject :: * -> *
newtype Stream a = Stream { run :: Sink a -> Scheduler -> Disposable }
instance functorStream :: Functor Stream where
map = mapStream
instance applyStream :: Apply Stream where
apply = combine2 id
instance applicativeStream :: Applicative Stream where
pure = just
instance bindStream :: Bind Stream where
bind = chain
instance monadStream :: Monad Stream
instance semigroupStream :: Semigroup (Stream a) where
append = concat
instance altStream :: Alt Stream where
alt = \s1 s2 -> merge [s1, s2]
instance monoidStream :: Monoid (Stream a) where
mempty = empty
instance plusStream :: Plus Stream where
empty = empty
instance alternativeStream :: Alternative Stream
instance monadZeroStream :: MonadZero Stream
instance monadPlusStream :: MonadPlus Stream
just :: forall a. a -> Stream a
just a = Stream { run } where
run :: Sink a -> Scheduler -> Disposable
run sink (Scheduler scheduler) =
scheduleTasks scheduler.asap [ (eventTask sink a), (endTask sink) ]
empty :: forall a. Stream a
empty = Stream { run } where
run :: Sink a -> Scheduler -> Disposable
run sink (Scheduler scheduler) = scheduleTasks scheduler.asap [ endTask sink ]
never :: forall a. Stream a
never = Stream { run: \sink scheduler -> emptyDisposable }
periodic :: Number -> Stream Unit
periodic period = Stream { run } where
run :: Sink Unit -> Scheduler -> Disposable
run sink (Scheduler scheduler) = do
let task = eventTask sink unit
let scheduledTask = scheduler.periodic period task
{ dispose : scheduledTask.dispose }
createCombinator :: forall a b. (Sink b -> Time -> a -> Unit) -> (Sink b -> Time -> Unit) -> Stream a -> Stream b
createCombinator event end (Stream stream) = Stream { run } where
run :: Sink b -> Scheduler -> Disposable
run sink scheduler = stream.run (Sink { event: (event sink), end: (end sink) }) scheduler
createEventCombinator :: forall a b. (Sink b -> Time -> a -> Unit) -> Stream a -> Stream b
createEventCombinator event stream = createCombinator event end stream where
end (Sink sink) time = sink.end time
createEndCombinator :: forall a. (Sink a -> Time -> Unit) -> Stream a -> Stream a
createEndCombinator end stream = createCombinator event end stream where
event (Sink sink) time value = sink.event time value
tapEvent :: forall e a. (a -> Eff e Unit) -> Stream a -> Stream a
tapEvent f stream = createEventCombinator event stream where
event :: Sink a -> Time -> a -> Unit
event (Sink sink) time value = sink.event time (always value (unsafePerformEff (f value)))
tapEnd :: forall e a. Eff e Unit -> Stream a -> Stream a
tapEnd f stream = createEndCombinator end stream where
end :: Sink a -> Time -> Unit
end (Sink sink) time = sink.end (always time (unsafePerformEff f))
take :: forall a. Int -> Stream a -> Stream a
take 0 stream = empty
take amount (Stream stream) = Stream { run } where
run (Sink sink) scheduler = disposable where
disposable = stream.run (Sink { event, end }) scheduler where
state :: State Int
state = createState amount
event ::Time -> a -> Unit
event time value = do
let amountLeft = (getState state)
if amountLeft == 0
then unit
else do
let newAmountLeft = state.set \x -> x - 1
let x = sink.event time value
if newAmountLeft == 0
then sink.end (always time (runPure disposable.dispose))
else unit
end time = sink.end time
skip :: forall a. Int -> Stream a -> Stream a
skip 0 stream = stream
skip amount stream = createEventCombinator event stream where
state :: State Int
state = createState amount
event :: Sink a -> Time -> a -> Unit
event (Sink sink) time value = do
let amountLeft = runPure state.get
if (amountLeft == 0)
then sink.event time value
else always unit (state.set \x -> x - 1)
mapStream :: forall a b. (a -> b) -> Stream a -> Stream b
mapStream f stream = createEventCombinator event stream where
event :: Sink b -> Time -> a -> Unit
event (Sink sink) time value = sink.event time (f value)
filter :: forall a. (a -> Boolean) -> Stream a -> Stream a
filter predicate stream = createEventCombinator event stream where
event (Sink sink) time value = if predicate value
then sink.event time value
else unit
skipRepeatsWith :: forall a. (a -> a -> Boolean) -> Stream a -> Stream a
skipRepeatsWith compare stream = createEventCombinator event stream where
state :: State (Maybe a)
state = createState Nothing
event (Sink sink) time value = do
let currentState = (getState state)
if (isNothing currentState)
then sink.event time value
else do
let currentValue = unsafePartial (fromJust currentState)
if (compare currentValue value)
then unit
else sink.event time value
skipRepeats :: forall a. (Eq a) => Stream a -> Stream a
skipRepeats = skipRepeatsWith (eq)
constant :: forall a b. b -> Stream a -> Stream b
constant b stream = createEventCombinator event stream where
event :: Sink b -> Time -> a -> Unit
event (Sink sink) time value = sink.event time b
concat :: forall a. Stream a -> Stream a -> Stream a
concat s1 s2 = continueWith(\_ -> s2) s1
startWith :: forall a. a -> Stream a -> Stream a
startWith seed stream = concat (just seed) stream
scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b
scan f initial stream = startWith initial (createEventCombinator event stream) where
seed :: State b
seed = createState initial
event :: Sink b -> Time -> a -> Unit
event (Sink sink) time value = sink.event time (seed.set \x -> f x value)
loop :: forall a b c. (b -> a -> { seed :: b, value :: c }) -> b -> Stream a -> Stream c
loop f initial stream = createEventCombinator event stream where
seed :: State b
seed = createState initial
event :: Sink c -> Time -> a -> Unit
event (Sink sink) time x = do
let seedValue = f (getState seed) x
sink.event time (always seedValue.value (seed.set \_ -> seedValue.seed))
concatMap :: forall a b. (a -> Stream b) -> Stream a -> Stream b
concatMap f stream = mergeMapConcurrently f 1 stream
ap :: forall a b. (Stream (a -> b)) -> Stream a -> Stream b
ap fs xs = combine2 (\f x -> f x) fs xs
during :: forall a b. Stream (Stream b) -> Stream a -> Stream a
during timeWindow stream = until (join timeWindow) (since timeWindow stream)
sample :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
sample f (Stream sampler) (Stream stream) = Stream { run: runSample (createState Nothing) } where
runSample :: State (Maybe b) -> Sink c -> Scheduler -> Disposable
runSample state sink scheduler = disposeAll
[ stream.run (createHoldSink state) scheduler
, sampler.run (createSampleSink state sink f) scheduler
]
createHoldSink :: forall a. State (Maybe a) -> Sink a
createHoldSink state = Sink { event, end } where
event time value = always unit (state.set \_ -> Just value)
end time = unit
createSampleSink :: forall a b c. State (Maybe b) -> Sink c -> (a -> b -> c) -> Sink a
createSampleSink state (Sink sink) f = Sink { event, end } where
end time = sink.end time
event time value =
if isNothing (runPure state.get)
then unit
else sink.event time (f value (unsafePartial (fromJust (runPure state.get))))
sampleWith :: forall a b. Stream b -> Stream a -> Stream a
sampleWith signal stream = sample (\_ y -> y) signal stream
-- find a better way to perform these side effects
always :: forall a b. a -> b -> a
always a b = a
type State a = { get:: Pure a, set :: (a -> a) -> a }
getState :: forall a. State a -> a
getState state = runPure state.get
createState :: forall a. a -> State a
createState seed = { set, get }
where
ref :: Ref a
ref = unsafePerformEff (unsafeCoerceEff (newRef seed))
set :: (a -> a) -> a
set f = unsafePerformEff (unsafeCoerceEff (modifyRef' ref \x -> { state: (f x), value: (f x) }))
get :: Pure a
get = unsafeCoerceEff (readRef ref)
foreign import drain :: forall e a. Stream a -> EffStream e Unit
foreign import subscribe :: forall e a. (a -> Eff e Unit) -> (Unit -> Eff e Unit) -> Stream a -> EffStream e Unit
foreign import fromArray :: forall a b. Array a -> Stream b -> Stream a
foreign import chain :: forall a b. Stream a -> (a -> Stream b) -> Stream b
foreign import mergeConcurrently :: forall a. Int -> Stream (Stream a) -> Stream a
foreign import join :: forall a. Stream (Stream a) -> Stream a
foreign import mergeMapConcurrently :: forall a b. (a -> Stream b) -> Int -> Stream a -> Stream b
foreign import switch :: forall a. Stream (Stream a) -> Stream a
foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Stream a
foreign import combine2 :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
foreign import combine3 :: forall a b c d. (a -> b -> c -> d) -> Stream a -> Stream b -> Stream c -> Stream d
foreign import combine4 :: forall a b c d e. (a -> b -> c -> d -> e) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e
foreign import combine5 :: forall a b c d e f. (a -> b -> c -> d -> e -> f) -> Stream a -> Stream b -> Stream c -> Stream d -> Stream e -> Stream f
foreign import combineArray :: forall a. Array (Stream a) -> Stream (Array a)
foreign import merge :: forall a. Array (Stream a) -> Stream a
foreign import until :: forall a b. Stream b -> Stream a -> Stream a
foreign import since :: forall a b. Stream b -> Stream a -> Stream a
foreign import debounce :: forall a. Number -> Stream a -> Stream a
foreign import throttle :: forall a. Number -> Stream a -> Stream a
foreign import delay :: forall a. Number -> Stream a -> Stream a
foreign import multicast :: forall a. Stream a -> Stream a
foreign import hold :: forall a. Stream a -> Stream a
foreign import proxy :: forall a. a -> { attach :: (Stream a -> Stream a), stream :: Stream a }
-- Sad subject types :/
foreign import toSubject :: forall a. Stream a -> Subject a
foreign import toHoldSubject :: forall a. Int -> Stream a -> Subject a
foreign import fromSubject :: forall a. Subject a -> Stream a
foreign import subjectEvent :: forall a. a -> Subject a -> Subject a
foreign import subjectEnd :: forall a. a -> Subject a -> Subject a

3
src/Control/Time.purs Normal file
View File

@ -0,0 +1,3 @@
module Control.Stream.Time where
newtype Time = Time Int