feat(stream): start of a stream library

This commit is contained in:
Tylor Steinberger 2017-03-12 19:48:37 -04:00
commit 6b75d2f2a7
36 changed files with 2696 additions and 0 deletions

14
.editorconfig Normal file
View File

@ -0,0 +1,14 @@
root = true
[*]
indent_style = space
indent_size = 2
tab_width = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
[*.md]
trim_trailing_whitespace = false
insert_final_newline = false

2
.eslintignore Normal file
View File

@ -0,0 +1,2 @@
output/
src/

6
.eslintrc Normal file
View File

@ -0,0 +1,6 @@
{
"extends": "@most/eslint-config-most",
"rules": {
"comma-dangle": "off"
}
}

16
.github/ISSUE_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,16 @@
<!--
Do you need help or have a question? Please ensure your question is thorough.
Found a bug? Please fill out the sections below 👍.
Show some empathy to the person reading your issue. They are volunteers.
-->
**Code to reproduce the issue:**
**Expected behavior:**
**Actual behavior:**
**Versions of packages used:**

8
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,8 @@
<!--
Thank you for your contribution!
To help speed up the process of merging your code, check the following:
-->
- [ ] I added new tests for the issue I fixed or the feature I built
- [ ] I ran `yarn test` for the package I'm modifying
- [ ] I used `yarn commit` instead of `git commit`

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
/bower_components/
/node_modules/
/.pulp-cache/
/output/
/generated-docs/
/.psc*
/.psa*

6
.travis.yml Normal file
View File

@ -0,0 +1,6 @@
language: node_js
node_js:
- 7
cache: yarn

11
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,11 @@
# Contributing
First of all, thank you so much, we need your help.
## Contributing a fix or feature
1. Fork the repository
2. Switch to a new branch `git checkout -b [branchName]`
3. Produce your fix or feature
4. Use `yarn commit` instead of `git commit` PLEASE!
5. Submit a pull request for review

20
LICENSE.md Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2016 Tylor Steinberger
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

19
bower.json Normal file
View File

@ -0,0 +1,19 @@
{
"name": "tempest",
"ignore": [
"**/.*",
"node_modules",
"bower_components",
"output"
],
"dependencies": {
"purescript-eff": "^2.0.0",
"purescript-prelude": "^2.5.0",
"purescript-aff-promise": "^0.4.0",
"purescript-aff": "^2.0.3"
},
"devDependencies": {
"purescript-console": "^2.0.0",
"purescript-psci-support": "^2.0.0"
}
}

View File

@ -0,0 +1,47 @@
export class LinkedList {
constructor () {
this.head = null
this.length = 0
}
add (x) {
if (this.head !== null) {
this.head.prev = x
x.next = this.head
}
this.head = x
++this.length
}
remove (x) { // eslint-disable-line complexity
--this.length
if (x === this.head) {
this.head = this.head.next
}
if (x.next !== null) {
x.next.prev = x.prev
x.next = null
}
if (x.prev !== null) {
x.prev.next = x.next
x.prev = null
}
}
isEmpty () {
return this.length === 0
}
dispose () {
if (this.isEmpty()) return
var x = this.head
this.head = null
this.length = 0
while (x !== null) {
x.dispose()
x = x.next
}
}
}

View File

@ -0,0 +1,5 @@
export class Stream {
constructor (source) {
this.source = source
}
}

View File

@ -0,0 +1,4 @@
import { defaultScheduler } from '../scheduler'
import { runEffects } from '../runEffects'
export const drain = stream => () => runEffects(stream, defaultScheduler)

View File

@ -0,0 +1,2 @@
export * from './tap'
export * from './drain'

View File

@ -0,0 +1,123 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import { LinkedList } from '../LinkedList'
import { MemoizedDisosable } from '../disposables/MemoizedDisposable'
import { id as identity } from '@most/prelude'
export const mergeConcurrently = (concurrency, stream) =>
mergeMapConcurrently(identity, concurrency, stream)
export const mergeMapConcurrently = (f, concurrency, stream) =>
new MergeConcurrently(f, concurrency, stream)
class MergeConcurrently {
constructor (f, concurrency, source) {
this.f = f
this.concurrency = concurrency
this.source = source
}
run (sink, scheduler) {
return new Outer(this.f, this.concurrency, this.source, sink, scheduler)
}
}
class Outer {
constructor (f, concurrency, source, sink, scheduler) {
this.f = f
this.concurrency = concurrency
this.sink = sink
this.scheduler = scheduler
this.pending = []
this.current = new LinkedList()
this.disposable = new MemoizedDisosable(source.run(this, scheduler))
this.active = true
}
event (t, x) {
this._addInner(t, x)
}
_addInner (t, x) {
if (this.current.length < this.concurrency) {
this._startInner(t, x)
} else {
this.pending.push(x)
}
}
_startInner (t, x) {
try {
this._initInner(t, x)
} catch (e) {
this.error(t, e)
}
}
_initInner (t, x) {
const innerSink = new Inner(t, this, this.sink)
innerSink.disposable = mapAndRun(this.f, x, innerSink, this.scheduler)
this.current.add(innerSink)
}
end (t, x) {
this.active = false
this.disposable.dispose()
this._checkEnd(t, x)
}
dispose () {
this.active = false
this.pending.length = 0
this.disposable.dispose()
this.current.dispose()
}
_endInner (t, x, inner) {
this.current.remove(inner)
inner.dispose()
if (this.pending.length === 0) {
this._checkEnd(t, x)
} else {
this._startInner(t, this.pending.shift())
}
}
_checkEnd (t, x) {
if (!this.active && this.current.isEmpty()) {
this.sink.end(t, x)
}
}
}
const mapAndRun = (f, x, sink, scheduler) =>
f(x).run(sink, scheduler)
class Inner {
constructor (time, outer, sink) {
this.prev = this.next = null
this.time = time
this.outer = outer
this.sink = sink
this.disposable = void 0
}
event (t, x) {
this.sink.event(Math.max(t, this.time), x)
}
end (t, x) {
this.outer._endInner(Math.max(t, this.time), x, this)
}
error (t, e) {
this.outer.error(Math.max(t, this.time), e)
}
dispose () {
return this.disposable.dispose()
}
}

View File

@ -0,0 +1,50 @@
import { Stream } from '../Stream'
import { curry2 } from '@most/prelude'
export const tapEnd = curry2((f, stream) => new Stream(new TapEnd(f, stream.source)))
export const tapEvent = curry2((f, stream) => new Stream(new TapEvent(f, stream.source)))
class TapEvent {
constructor (f, source) {
this.source = source
this.f = f
this.run = sink => scheduler =>
source.run(new TapSink(f, sink))(scheduler)
}
}
class TapSink {
constructor (f, sink) {
this.event = t => x => {
f(x)()
sink.event(t)(x)
}
this.end = t => {
sink.end(t)
}
}
}
class TapEnd {
constructor (f, source) {
this.source = source
this.f = f
this.run = sink => scheduler =>
source.run(new TapEndSink(f, sink))(scheduler)
}
}
class TapEndSink {
constructor (f, sink) {
this.event = t => x => sink.event(t)(x)
this.end = t => {
f()
sink.end(t)
}
}
}

View File

@ -0,0 +1,15 @@
export class Disposable {
/**
* Create a new Disposable which will dispose its underlying resource.
* @param {function} dispose function
* @param {*?} data any data to be passed to disposer function
*/
constructor (dispose, data) {
this._dispose = dispose
this._data = data
}
dispose () {
return this._dispose(this._data)
}
}

View File

@ -0,0 +1,17 @@
export class MemoizedDisposable {
constructor (disposable) {
this.disposed = false
this.value = undefined
this.disposable = disposable
}
dispose () {
if (!this.disposed) {
this.disposed = true
this.value = disposeSafely(this.disposable)
this.disposable = undefined
}
return this.value
}
}

View File

@ -0,0 +1,29 @@
export class SettableDisposable {
constructor () {
this.disposable = void 0
this.disposed = false
this.result = void 0
}
setDisposable (disposable) {
this.disposable = disposable
if (this.disposed) {
this.result = disposable.dispose()
}
}
dispose () {
if (this.disposed) {
return this.result
}
this.disposed = true
if (this.disposable !== void 0) {
this.result = this.disposable.dispose()
}
return this.result
}
}

View File

@ -0,0 +1,3 @@
export function disposeSafely (disposable) {
disposable.dispose()
}

View File

@ -0,0 +1,12 @@
import { curry2, id, map } from '@most/prelude'
import { Disposable } from './Disposable'
import { MemoizedDisposable } from './MemoizedDisposable'
import { disposeSafely } from './disposeSafely'
export const createDisposable = curry2((dispose, data) => new MemoizedDisposable(new Disposable(dispose, data)))
export const emptyDisposable = new Disposable(id, undefined)
export const disposeAll = disposables =>
createDisposable(map(disposeSafely, disposables), disposables)

View File

@ -0,0 +1,4 @@
/* @flow */
export * from './disposables'
export * from './scheduler'
export * from './combinators'

View File

@ -0,0 +1,28 @@
import { SettableDisposable } from './disposables/SettableDisposable'
export function runEffects (stream, scheduler) {
return runSourceEffects(stream.source, scheduler)
}
const runSourceEffects = (source, scheduler) => runSource(source, scheduler)
function runSource (source, scheduler) {
const disposable = new SettableDisposable()
const observer = new RunEffectsSink(disposable)
disposable.setDisposable(source.run(observer)(scheduler))
}
var RunEffectsSink = function RunEffectsSink (disposable) {
this._disposable = disposable
this.active = true
}
RunEffectsSink.prototype.event = function event (t) {
return function (x) {}
}
RunEffectsSink.prototype.end = function end (t) {
this.active = false
Promise.resolve(this._disposable).then(d => d.dispose())
}

View File

@ -0,0 +1,48 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/*global setTimeout, clearTimeout*/
export function defer (task) {
return Promise.resolve(task).then(runTask)
}
export function runTask (task) {
return task.run()
}
export class ClockTimer {
constructor () {
this.now = Date.now
}
setTimer (f, dt) {
return dt <= 0 ? runAsap(f) : setTimeout(f, dt)
}
clearTimer (t) {
return t instanceof Asap ? t.cancel() : clearTimeout(t)
}
}
class Asap {
constructor (f) {
this.f = f
this.active = true
}
run () {
return this.active && this.f()
}
cancel () {
this.active = false
}
}
function runAsap (f) {
const task = new Asap(f)
defer(task)
return task
}

View File

@ -0,0 +1,30 @@
import { curry2, curry3 } from '@most/prelude'
const propagateTask =
curry3((run, value, sink) => new PropagateTask(run, value, sink))
export const eventTask =
curry2((value, sink) => propagateTask(runEvent, value, sink))
export const endTask = sink => propagateTask(runEnd, void 0, sink)
class PropagateTask {
constructor (run, value, sink) {
let active = true
this.dispose = () => { active = false }
this.run = t => {
if (!active) return
run(t, value, sink)
}
}
}
function runEvent (time, value, sink) {
sink.event(time)(value)
}
function runEnd (time, value, sink) {
sink.end(time, value)
}

View File

@ -0,0 +1,14 @@
/* @flow */
export function ScheduledTask (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
this.run = () => task.run(this.time)
this.dispose = () => {
scheduler.cancel(this)
task.dispose()
}
}

View File

@ -0,0 +1,99 @@
/* @flow */
import { ScheduledTask } from './ScheduledTask'
function runTask (task) {
try {
return task.run()
} catch (e) {
console.error(e)
}
}
export class Scheduler {
constructor (timer, timeline) {
this.timer = timer
this.timeline = timeline
this._timer = null
this._nextArrival = Infinity
this._runReadyTasksBound = () => this._runReadyTasks(this.now())
}
now () {
return this.timer.now()
}
asap (task) {
return this.schedule(0, -1, task)
}
delay (delay, task) {
return this.schedule(delay, -1, task)
}
periodic (period, task) {
return this.schedule(0, period, task)
}
schedule (delay, period, task) {
const now = this.now()
const st = new ScheduledTask(now + Math.max(0, delay), period, task, this)
this.timeline.add(st)
this._scheduleNextRun(now)
return st
}
cancel (task) {
task.active = false
if (this.timeline.remove(task)) {
this._reschedule()
}
}
cancelAll (f) {
this.timeline.removeAll(f)
this._reschedule()
}
_reschedule () {
if (this.timeline.isEmpty()) {
this._unschedule()
} else {
this._scheduleNextRun(this.now())
}
}
_unschedule () {
this.timer.clearTimer(this._timer)
this._timer = null
}
_scheduleNextRun (now) { // eslint-disable-line complexity
if (this.timeline.isEmpty()) {
return
}
const nextArrival = this.timeline.nextArrival()
if (this._timer === null) {
this._scheduleNextArrival(nextArrival, now)
} else if (nextArrival < this._nextArrival) {
this._unschedule()
this._scheduleNextArrival(nextArrival, now)
}
}
_scheduleNextArrival (nextArrival, now) {
this._nextArrival = nextArrival
const delay = Math.max(0, nextArrival - now)
this._timer = this.timer.setTimer(this._runReadyTasksBound, delay)
}
_runReadyTasks (now) {
this._timer = null
this.timeline.runTasks(now, runTask)
this._scheduleNextRun(this.now())
}
}

View File

@ -0,0 +1,124 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import * as base from '@most/prelude'
export class Timeline {
constructor () {
this.tasks = []
}
nextArrival () {
return this.isEmpty() ? Infinity : this.tasks[0].time
}
isEmpty () {
return this.tasks.length === 0
}
add (st) {
insertByTime(st, this.tasks)
}
remove (st) {
const i = binarySearch(st.time, this.tasks)
if (i >= 0 && i < this.tasks.length) {
const at = base.findIndex(st, this.tasks[i].events)
if (at >= 0) {
this.tasks[i].events.splice(at, 1)
return true
}
}
return false
}
removeAll (f) {
for (let i = 0; i < this.tasks.length; ++i) {
removeAllFrom(f, this.tasks[i])
}
}
runTasks (t, runTask) {
const tasks = this.tasks
const l = tasks.length
let i = 0
while (i < l && tasks[i].time <= t) {
++i
}
this.tasks = tasks.slice(i)
// Run all ready tasks
for (let j = 0; j < i; ++j) {
this.tasks = runReadyTasks(runTask, tasks[j].events, this.tasks)
}
}
}
function runReadyTasks (runTask, events, tasks) { // eslint-disable-line complexity
for (let i = 0; i < events.length; ++i) {
const task = events[i]
if (task.active) {
runTask(task)
// Reschedule periodic repeating tasks
// Check active again, since a task may have canceled itself
if (task.period >= 0 && task.active) {
task.time = task.time + task.period
insertByTime(task, tasks)
}
}
}
return tasks
}
function insertByTime (task, timeslots) { // eslint-disable-line complexity
const l = timeslots.length
if (l === 0) {
timeslots.push(newTimeslot(task.time, [task]))
return
}
const i = binarySearch(task.time, timeslots)
if (i >= l) {
timeslots.push(newTimeslot(task.time, [task]))
} else if (task.time === timeslots[i].time) {
timeslots[i].events.push(task)
} else {
timeslots.splice(i, 0, newTimeslot(task.time, [task]))
}
}
function removeAllFrom (f, timeslot) {
timeslot.events = base.removeAll(f, timeslot.events)
}
function binarySearch (t, sortedArray) { // eslint-disable-line complexity
let lo = 0
let hi = sortedArray.length
let mid, y
while (lo < hi) {
mid = Math.floor((lo + hi) / 2)
y = sortedArray[mid]
if (t === y.time) {
return mid
} else if (t < y.time) {
hi = mid
} else {
lo = mid + 1
}
}
return hi
}
const newTimeslot = (t, events) => ({ time: t, events: events })

View File

@ -0,0 +1,16 @@
import { ClockTimer } from './ClockTimer'
import { Scheduler } from './Scheduler'
import { Timeline } from './Timeline'
import { curry2 } from '@most/prelude'
export { endTask, eventTask } from './PropagateTask'
export const defaultScheduler = new Scheduler(new ClockTimer(), new Timeline())
export const scheduleTasks = curry2((tasks, scheduler) => {
const scheduledTasks = tasks.map(task => scheduler.asap(task))
return {
dispose: () => scheduledTasks.forEach(task => task.dispose())
}
})

View File

@ -0,0 +1,24 @@
export class IndexSink {
constructor (i, sink) {
this.sink = sink
this.index = i
this.active = true
this.value = void 0
}
event (t) {
return x => {
if (!this.active) return
this.value = x
this.sink.event(t)(this)
}
}
end (t, x) {
if (!this.active) return
this.active = false
this.sink.end(t, { index: this.index, value: x })
}
}

View File

@ -0,0 +1 @@
export * from './IndexSink'

27
package.json Normal file
View File

@ -0,0 +1,27 @@
{
"name": "purescript-tempest",
"version": "0.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"build": "rollup -c && pulp build"
},
"author": "Tylor Steinberger",
"license": "MIT",
"devDependencies": {
"@most/eslint-config-most": "^1.0.3",
"eslint": "^3.17.1",
"eslint-config-standard": "^7.0.1",
"eslint-plugin-promise": "^3.5.0",
"eslint-plugin-standard": "^2.1.1",
"rollup": "^0.41.4",
"rollup-plugin-buble": "^0.15.0",
"rollup-plugin-flow": "^1.1.1",
"rollup-plugin-node-resolve": "^2.0.0",
"uglify-js": "^2.8.11"
},
"dependencies": {
"@most/prelude": "^1.5.0"
}
}

14
rollup.config.js Normal file
View File

@ -0,0 +1,14 @@
import buble from 'rollup-plugin-buble'
import flow from 'rollup-plugin-flow'
import resolve from 'rollup-plugin-node-resolve'
export default {
entry: 'ffi/Control/Stream/index.js',
dest: 'src/Control/Stream.js',
format: 'cjs',
plugins: [
flow(),
buble(),
resolve({ module: true, jsnext: true })
]
}

688
src/Control/Stream.js Normal file
View File

@ -0,0 +1,688 @@
'use strict';
Object.defineProperty(exports, '__esModule', { value: true });
/** @license MIT License (c) copyright 2010-2016 original author or authors */
// Non-mutating array operations
// cons :: a -> [a] -> [a]
// a with x prepended
// append :: a -> [a] -> [a]
// a with x appended
// drop :: Int -> [a] -> [a]
// drop first n elements
// tail :: [a] -> [a]
// drop head element
// copy :: [a] -> [a]
// duplicate a (shallow duplication)
// map :: (a -> b) -> [a] -> [b]
// transform each element with f
function map (f, a) {
var l = a.length;
var b = new Array(l);
for (var i = 0; i < l; ++i) {
b[i] = f(a[i]);
}
return b
}
// reduce :: (a -> b -> a) -> a -> [b] -> a
// accumulate via left-fold
// replace :: a -> Int -> [a]
// replace element at index
// remove :: Int -> [a] -> [a]
// remove element at index
// removeAll :: (a -> boolean) -> [a] -> [a]
// remove all elements matching a predicate
function removeAll (f, a) {
var l = a.length;
var b = new Array(l);
var j = 0;
for (var x = (void 0), i = 0; i < l; ++i) {
x = a[i];
if (!f(x)) {
b[j] = x;
++j;
}
}
b.length = j;
return b
}
// findIndex :: a -> [a] -> Int
// find index of x in a, from the left
function findIndex (x, a) {
for (var i = 0, l = a.length; i < l; ++i) {
if (x === a[i]) {
return i
}
}
return -1
}
// isArrayLike :: * -> boolean
// Return true iff x is array-like
/** @license MIT License (c) copyright 2010-2016 original author or authors */
// id :: a -> a
var id = function (x) { return x; };
// compose :: (b -> c) -> (a -> b) -> (a -> c)
// apply :: (a -> b) -> a -> b
// curry2 :: ((a, b) -> c) -> (a -> b -> c)
function curry2 (f) {
function curried (a, b) {
switch (arguments.length) {
case 0: return curried
case 1: return function (b) { return f(a, b); }
default: return f(a, b)
}
}
return curried
}
// curry3 :: ((a, b, c) -> d) -> (a -> b -> c -> d)
function curry3 (f) {
function curried (a, b, c) { // eslint-disable-line complexity
switch (arguments.length) {
case 0: return curried
case 1: return curry2(function (b, c) { return f(a, b, c); })
case 2: return function (c) { return f(a, b, c); }
default:return f(a, b, c)
}
}
return curried
}
// curry4 :: ((a, b, c, d) -> e) -> (a -> b -> c -> d -> e)
/** @license MIT License (c) copyright 2016 original author or authors */
var Disposable = function Disposable (dispose, data) {
this._dispose = dispose;
this._data = data;
};
Disposable.prototype.dispose = function dispose () {
return this._dispose(this._data)
};
var MemoizedDisposable = function MemoizedDisposable (disposable) {
this.disposed = false;
this.value = undefined;
this.disposable = disposable;
};
MemoizedDisposable.prototype.dispose = function dispose () {
if (!this.disposed) {
this.disposed = true;
this.value = disposeSafely(this.disposable);
this.disposable = undefined;
}
return this.value
};
function disposeSafely$1 (disposable) {
disposable.dispose();
}
var createDisposable = curry2(function (dispose, data) { return new MemoizedDisposable(new Disposable(dispose, data)); });
var emptyDisposable = new Disposable(id, undefined);
var disposeAll = function (disposables) { return createDisposable(map(disposeSafely$1, disposables), disposables); };
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/*global setTimeout, clearTimeout*/
function defer (task) {
return Promise.resolve(task).then(runTask)
}
function runTask (task) {
return task.run()
}
var ClockTimer = function ClockTimer () {
this.now = Date.now;
};
ClockTimer.prototype.setTimer = function setTimer (f, dt) {
return dt <= 0 ? runAsap(f) : setTimeout(f, dt)
};
ClockTimer.prototype.clearTimer = function clearTimer (t) {
return t instanceof Asap ? t.cancel() : clearTimeout(t)
};
var Asap = function Asap (f) {
this.f = f;
this.active = true;
};
Asap.prototype.run = function run () {
return this.active && this.f()
};
Asap.prototype.cancel = function cancel () {
this.active = false;
};
function runAsap (f) {
var task = new Asap(f);
defer(task);
return task
}
/* */
function ScheduledTask (delay, period, task, scheduler) {
var this$1 = this;
this.time = delay;
this.period = period;
this.task = task;
this.scheduler = scheduler;
this.active = true;
this.run = function () { return task.run(this$1.time); };
this.dispose = function () {
scheduler.cancel(this$1);
task.dispose();
};
}
/* */
function runTask$1 (task) {
try {
return task.run()
} catch (e) {
console.error(e);
}
}
var Scheduler = function Scheduler (timer, timeline) {
var this$1 = this;
this.timer = timer;
this.timeline = timeline;
this._timer = null;
this._nextArrival = Infinity;
this._runReadyTasksBound = function () { return this$1._runReadyTasks(this$1.now()); };
};
Scheduler.prototype.now = function now () {
return this.timer.now()
};
Scheduler.prototype.asap = function asap (task) {
return this.schedule(0, -1, task)
};
Scheduler.prototype.delay = function delay (delay$1, task) {
return this.schedule(delay$1, -1, task)
};
Scheduler.prototype.periodic = function periodic (period, task) {
return this.schedule(0, period, task)
};
Scheduler.prototype.schedule = function schedule (delay, period, task) {
var now = this.now();
var st = new ScheduledTask(now + Math.max(0, delay), period, task, this);
this.timeline.add(st);
this._scheduleNextRun(now);
return st
};
Scheduler.prototype.cancel = function cancel (task) {
task.active = false;
if (this.timeline.remove(task)) {
this._reschedule();
}
};
Scheduler.prototype.cancelAll = function cancelAll (f) {
this.timeline.removeAll(f);
this._reschedule();
};
Scheduler.prototype._reschedule = function _reschedule () {
if (this.timeline.isEmpty()) {
this._unschedule();
} else {
this._scheduleNextRun(this.now());
}
};
Scheduler.prototype._unschedule = function _unschedule () {
this.timer.clearTimer(this._timer);
this._timer = null;
};
Scheduler.prototype._scheduleNextRun = function _scheduleNextRun (now) { // eslint-disable-line complexity
if (this.timeline.isEmpty()) {
return
}
var nextArrival = this.timeline.nextArrival();
if (this._timer === null) {
this._scheduleNextArrival(nextArrival, now);
} else if (nextArrival < this._nextArrival) {
this._unschedule();
this._scheduleNextArrival(nextArrival, now);
}
};
Scheduler.prototype._scheduleNextArrival = function _scheduleNextArrival (nextArrival, now) {
this._nextArrival = nextArrival;
var delay = Math.max(0, nextArrival - now);
this._timer = this.timer.setTimer(this._runReadyTasksBound, delay);
};
Scheduler.prototype._runReadyTasks = function _runReadyTasks (now) {
this._timer = null;
this.timeline.runTasks(now, runTask$1);
this._scheduleNextRun(this.now());
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
var Timeline = function Timeline () {
this.tasks = [];
};
Timeline.prototype.nextArrival = function nextArrival () {
return this.isEmpty() ? Infinity : this.tasks[0].time
};
Timeline.prototype.isEmpty = function isEmpty () {
return this.tasks.length === 0
};
Timeline.prototype.add = function add (st) {
insertByTime(st, this.tasks);
};
Timeline.prototype.remove = function remove$$1 (st) {
var i = binarySearch(st.time, this.tasks);
if (i >= 0 && i < this.tasks.length) {
var at = findIndex(st, this.tasks[i].events);
if (at >= 0) {
this.tasks[i].events.splice(at, 1);
return true
}
}
return false
};
Timeline.prototype.removeAll = function removeAll$$1 (f) {
var this$1 = this;
for (var i = 0; i < this.tasks.length; ++i) {
removeAllFrom(f, this$1.tasks[i]);
}
};
Timeline.prototype.runTasks = function runTasks (t, runTask) {
var this$1 = this;
var tasks = this.tasks;
var l = tasks.length;
var i = 0;
while (i < l && tasks[i].time <= t) {
++i;
}
this.tasks = tasks.slice(i);
// Run all ready tasks
for (var j = 0; j < i; ++j) {
this$1.tasks = runReadyTasks(runTask, tasks[j].events, this$1.tasks);
}
};
function runReadyTasks (runTask, events, tasks) { // eslint-disable-line complexity
for (var i = 0; i < events.length; ++i) {
var task = events[i];
if (task.active) {
runTask(task);
// Reschedule periodic repeating tasks
// Check active again, since a task may have canceled itself
if (task.period >= 0 && task.active) {
task.time = task.time + task.period;
insertByTime(task, tasks);
}
}
}
return tasks
}
function insertByTime (task, timeslots) { // eslint-disable-line complexity
var l = timeslots.length;
if (l === 0) {
timeslots.push(newTimeslot(task.time, [task]));
return
}
var i = binarySearch(task.time, timeslots);
if (i >= l) {
timeslots.push(newTimeslot(task.time, [task]));
} else if (task.time === timeslots[i].time) {
timeslots[i].events.push(task);
} else {
timeslots.splice(i, 0, newTimeslot(task.time, [task]));
}
}
function removeAllFrom (f, timeslot) {
timeslot.events = removeAll(f, timeslot.events);
}
function binarySearch (t, sortedArray) { // eslint-disable-line complexity
var lo = 0;
var hi = sortedArray.length;
var mid, y;
while (lo < hi) {
mid = Math.floor((lo + hi) / 2);
y = sortedArray[mid];
if (t === y.time) {
return mid
} else if (t < y.time) {
hi = mid;
} else {
lo = mid + 1;
}
}
return hi
}
var newTimeslot = function (t, events) { return ({ time: t, events: events }); };
var propagateTask =
curry3(function (run, value, sink) { return new PropagateTask(run, value, sink); });
var eventTask =
curry2(function (value, sink) { return propagateTask(runEvent, value, sink); });
var endTask = function (sink) { return propagateTask(runEnd, void 0, sink); };
var PropagateTask = function PropagateTask (run, value, sink) {
var active = true;
this.dispose = function () { active = false; };
this.run = function (t) {
if (!active) { return }
run(t, value, sink);
};
};
function runEvent (time, value, sink) {
sink.event(time)(value);
}
function runEnd (time, value, sink) {
sink.end(time, value);
}
var defaultScheduler = new Scheduler(new ClockTimer(), new Timeline());
var scheduleTasks = curry2(function (tasks, scheduler) {
var scheduledTasks = tasks.map(function (task) { return scheduler.asap(task); });
return {
dispose: function () { return scheduledTasks.forEach(function (task) { return task.dispose(); }); }
}
});
var Stream = function Stream (source) {
this.source = source;
};
var tapEnd = curry2(function (f, stream) { return new Stream(new TapEnd(f, stream.source)); });
var tapEvent = curry2(function (f, stream) { return new Stream(new TapEvent(f, stream.source)); });
var TapEvent = function TapEvent (f, source) {
this.source = source;
this.f = f;
this.run = function (sink) { return function (scheduler) { return source.run(new TapSink(f, sink))(scheduler); }; };
};
var TapSink = function TapSink (f, sink) {
this.event = function (t) { return function (x) {
f(x)();
sink.event(t)(x);
}; };
this.end = function (t) {
sink.end(t);
};
};
var TapEnd = function TapEnd (f, source) {
this.source = source;
this.f = f;
this.run = function (sink) { return function (scheduler) { return source.run(new TapEndSink(f, sink))(scheduler); }; };
};
var TapEndSink = function TapEndSink (f, sink) {
this.event = function (t) { return function (x) { return sink.event(t)(x); }; };
this.end = function (t) {
f();
sink.end(t);
};
};
var SettableDisposable = function SettableDisposable () {
this.disposable = void 0;
this.disposed = false;
this.result = void 0;
};
SettableDisposable.prototype.setDisposable = function setDisposable (disposable) {
this.disposable = disposable;
if (this.disposed) {
this.result = disposable.dispose();
}
};
SettableDisposable.prototype.dispose = function dispose () {
if (this.disposed) {
return this.result
}
this.disposed = true;
if (this.disposable !== void 0) {
this.result = this.disposable.dispose();
}
return this.result
};
function runEffects (stream, scheduler) {
return runSourceEffects(stream.source, scheduler)
}
var runSourceEffects = function (source, scheduler) { return runSource(source, scheduler); };
function runSource (source, scheduler) {
var disposable = new SettableDisposable();
var observer = new RunEffectsSink(disposable);
disposable.setDisposable(source.run(observer)(scheduler));
}
var RunEffectsSink = function RunEffectsSink (disposable) {
this._disposable = disposable;
this.active = true;
};
RunEffectsSink.prototype.event = function event (t) {
return function (x) {}
};
RunEffectsSink.prototype.end = function end (t) {
this.active = false;
Promise.resolve(this._disposable).then(function (d) { return d.dispose(); });
};
var drain = function (stream) { return function () { return runEffects(stream, defaultScheduler); }; };
var combine =
curry3(function (f, stream1, stream2) { return new Stream(new Combine(f, stream1.source, stream2.source)); });
var Combine = function Combine (f, source1, source2) {
this.f = f;
this.source1 = source1;
this.source2 = source2;
};
Combine.prototype.run = function run (sink) {
var this$1 = this;
return function (scheduler) {
var disposables = Array(2);
var sinks = Array(2);
var mergeSink = new CombineSink(disposables, sinks, sink, this$1.f);
sinks[0] = new IndexSink(0, mergeSink);
sinks[1] = new IndexSink(1, mergeSink);
disposables[0] = this$1.source1.run(sinks[0])(scheduler);
disposables[1] = this$1.source2.run(sinks[1])(scheduler);
return disposeAll(disposables)
}
};
var IndexSink = function IndexSink () {};
IndexSink.prototype.cosntructor = function cosntructor (i, sink) {
this.i = i;
this.sink = sink;
};
IndexSink.prototype.event = function event (t) {
var this$1 = this;
return function (x) {
this$1.sink.event(t)({ index: this$1.i, value: x });
}
};
IndexSink.prototype.end = function end (t) {
this.sink.end(t, this.i);
};
var CombineSink = function CombineSink (disposables, sinks, sink, f) {
this.disposables = disposables;
this.sinks = sinks;
this.sink = sink;
this.f = f;
this.awaiting = 2;
this.values = Array(2);
this.hasValue = [false, false];
this.activeCount = 2;
};
CombineSink.prototype.event = function event (t) {
var this$1 = this;
return function (ref) {
var index = ref.index;
var value = ref.value;
var f = this$1.f;
var awaiting = this$1._updateReady(index);
this$1.values[index] = value;
if (awaiting === 0) {
var result = f(this$1.values[0])(this$1.values[1]);
this$1.sink.event(t)(result);
}
}
};
CombineSink.prototype._updateReady = function _updateReady (index) {
if (this.awaiting > 0) {
if (!this.hasValue[index]) {
this.hasValue[index] = true;
this.awaiting -= 1;
}
}
return this.awaiting
};
CombineSink.prototype.end = function end (t, ref) {
var index = ref.index;
var value = ref.value;
this.disposables[index].dispose();
if (--this.activeCount === 0) { this.sink.end(t); }
};
/* */
exports.createDisposable = createDisposable;
exports.emptyDisposable = emptyDisposable;
exports.disposeAll = disposeAll;
exports.defaultScheduler = defaultScheduler;
exports.scheduleTasks = scheduleTasks;
exports.endTask = endTask;
exports.eventTask = eventTask;
exports.tapEnd = tapEnd;
exports.tapEvent = tapEvent;
exports.drain = drain;
exports.combine = combine;

130
src/Control/Stream.purs Normal file
View File

@ -0,0 +1,130 @@
module Control.Stream
( STREAM
, EffStream
, PureStream
, Stream
, Source
, Sink
, Scheduler
, Task
, ScheduledTask
, Disposable
, Time
, createStream
, getSource
, runSource
, defaultScheduler
, eventTask
, endTask
, createDisposable
, emptyDisposable
, disposeAll
, scheduleTasks
, drain
, tapEvent
, tapEnd
, just
, fromArray
)
where
import Control.Monad.Eff (Eff, Pure)
import Data.Array (snoc)
import Data.Functor (class Functor, map)
import Data.Unit (Unit)
import Prelude (flip)
newtype Time = Time Int
foreign import data STREAM :: !
type EffStream e a = Eff (stream :: STREAM | e) a
type PureStream a = EffStream () a
newtype Stream a = Stream { source :: Source a }
type Source a = { run :: Sink a -> Scheduler -> Disposable }
type Sink a =
{ event :: Time -> a -> Unit
, end :: Time -> Unit
}
type Scheduler =
{ now :: Pure Time
, asap :: Task -> ScheduledTask
, delay :: Number -> Task -> ScheduledTask
, period :: Number -> Task -> ScheduledTask
, schedule :: Number -> Number -> Task -> ScheduledTask
, cancel :: ScheduledTask -> Unit
, cancelAll :: (ScheduledTask -> Boolean) -> Unit
}
type Task =
{ run :: Time -> Unit
, dispose :: Pure Unit
}
type ScheduledTask =
{ run :: Pure Unit
, dispose :: Pure Unit
}
type Disposable = { dispose :: Pure Unit }
instance functorStream :: Functor Stream where
map = _map
createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a
createStream run = Stream { source: { run } }
getSource :: forall a. Stream a -> Source a
getSource stream = case stream of Stream a -> a.source
runSource :: forall a. Stream a -> Sink a -> Scheduler -> Disposable
runSource stream sink scheduler = (getSource stream).run sink scheduler
just :: forall a. a -> Stream a
just a = createStream (runJust a)
runJust :: forall a. a -> Sink a -> Scheduler -> Disposable
runJust a sink scheduler =
scheduleTasks [ eventTask a sink, endTask sink ] scheduler
-- TODO: remove fromArray because it's not a real event stream
-- replace with a fromArray combinator
fromArray :: forall a. Array a -> Stream a
fromArray arr = createStream (runFromArray arr)
runFromArray :: forall a. Array a -> Sink a -> Scheduler -> Disposable
runFromArray arr sink scheduler = scheduleTasks tasks scheduler
where
tasks :: Array Task
tasks = snoc eventTasks (endTask sink)
eventTasks :: Array Task
eventTasks = map (flip eventTask sink) arr
_map :: forall a b. (a -> b) -> Stream a -> Stream b
_map f stream = createStream runMap
where
runMap :: Sink b -> Scheduler -> Disposable
runMap sink scheduler =
runSource stream (mapSink f sink) scheduler
mapSink :: forall a b. (a -> b) -> Sink b -> Sink a
mapSink f sink = { event: mapEvent, end: sink.end }
where
mapEvent :: Time -> a -> Unit
mapEvent time value = sink.event time (f value)
foreign import defaultScheduler :: Scheduler
foreign import eventTask :: forall a. a -> Sink a -> Task
foreign import endTask :: forall a. Sink a -> Task
foreign import createDisposable :: forall a. (a -> Unit) -> a -> Disposable
foreign import emptyDisposable :: Disposable
foreign import disposeAll :: Array Disposable -> Disposable
foreign import scheduleTasks :: Array Task -> Scheduler -> Disposable
foreign import drain :: forall e a. Stream a -> EffStream e Unit
foreign import tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a
foreign import tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a

1033
yarn.lock Normal file

File diff suppressed because it is too large Load Diff