feat(stream): more combinators

This commit is contained in:
Tylor Steinberger 2017-03-16 00:38:43 -04:00
parent 13c18b7077
commit a50a246e69
13 changed files with 573 additions and 39 deletions

View File

@ -9,7 +9,8 @@
"dependencies": {
"purescript-eff": "^2.0.0",
"purescript-prelude": "^2.5.0",
"purescript-refs": "^2.0.0"
"purescript-refs": "^2.0.0",
"purescript-maybe": "^2.1.1"
},
"devDependencies": {
"purescript-console": "^2.0.0",

View File

@ -0,0 +1,41 @@
import { endTask, eventTask } from '../scheduler'
import { Stream } from '../Stream'
import { disposeAll } from '../disposables'
export const delay = delayTime => stream =>
delayTime <= 0 ? stream : new Stream(new Delay(delayTime, stream.source))
class Delay {
constructor (dt, source) {
this.dt = dt
this.source = source
}
run (sink) {
return scheduler => {
const delaySink = new DelaySink(this.dt, sink, scheduler)
return disposeAll([delaySink, this.source.run(delaySink)(scheduler)])
}
}
}
class DelaySink {
constructor (dt, sink, scheduler) {
this.dt = dt
this.sink = sink
this.scheduler = scheduler
}
dispose () {
this.scheduler.cancelAll(task => task.sink === this.sink)
}
event (t) {
return x => this.scheduler.delay(this.dt)(eventTask(x)(this.sink))
}
end (t) {
this.scheduler.delay(this.dt)(endTask(this.sink))
}
}

View File

@ -4,3 +4,5 @@ export * from './combine'
export * from './merge'
export * from './concat'
export * from './switch'
export * from './skipRepeatsWith'
export * from './delay'

View File

@ -0,0 +1,40 @@
import { Stream } from '../Stream'
export const skipRepeatsWith = equals => stream =>
new Stream(new SkipRepeats(equals, stream))
class SkipRepeats {
constructor (equals, source) {
this.equals = equals
this.source = source
}
run (sink) {
return scheduler => this.source.run(new SkipRepeatsSink(this.equals, sink), scheduler)
}
}
class SkipRepeatsSink {
constructor (equals, sink) {
this.equals = equals
this.value = void 0
this.init = true
}
event (t) {
return x => {
if (this.init) {
this.init = false
this.value = x
this.sink.event(t, x)
} else if (!this.equals(this.value)(x)) {
this.value = x
this.sink.event(t)(x)
}
}
}
end (t) {
this.sink.end(t)
}
}

View File

@ -2,3 +2,4 @@
export * from './disposables'
export * from './scheduler'
export * from './combinators'
export * from './multicast'

View File

@ -0,0 +1,45 @@
import { append, drop } from '@most/prelude'
import { MulticastSource } from './MulticastSource'
export class HoldSource extends MulticastSource {
constructor (source, bufferSize) {
super(source)
this.bufferSize = bufferSize
this.has = false
this.buffer = []
}
add (sink, scheduler) {
if (this.has) {
pushEvents(this.buffer, sink, scheduler)
}
return super.add(sink, scheduler)
}
event (time) {
return value => {
this.has = true
this.buffer = dropAndAppend(value, this.buffer, this.bufferSize)
return super.event(time)(value)
}
}
}
function pushEvents (buffer, sink, scheduler) {
const length = buffer.length
for (let i = 0; i < length; ++i) {
sink.event(scheduler.now(), buffer[i])
}
}
export function dropAndAppend (value, buffer, bufferSize) {
if (buffer.length === bufferSize) {
return append(value, drop(1, buffer))
}
return append(value, buffer)
}

View File

@ -0,0 +1,16 @@
export class MulticastDisposable {
constructor (source, sink) {
this.source = source
this.sink = sink
this.disposed = false
}
dispose () {
if (this.disposed) {
return
}
this.disposed = true
const remaining = this.source.remove(this.sink)
return remaining === 0 && this.source._dispose()
}
}

View File

@ -0,0 +1,62 @@
import { append, findIndex, remove } from '@most/prelude'
import { dispose, emptyDisposable } from './dispose'
import { MulticastDisposable } from './MulticastDisposable'
export class MulticastSource {
constructor (source) {
this.source = source
this.sinks = []
this._disposable = emptyDisposable
}
run (sink) {
return scheduler => {
const n = this.add(sink, scheduler)
if (n === 1) {
this._disposable = this.source.run(this)(scheduler)
}
return new MulticastDisposable(this, sink)
}
}
_dispose () {
const disposable = this._disposable
this._disposable = emptyDisposable
return Promise.resolve(disposable).then(dispose)
}
add (sink) {
this.sinks = append(sink, this.sinks)
return this.sinks.length
}
remove (sink) {
const i = findIndex(sink, this.sinks)
// istanbul ignore next
if (i >= 0) {
this.sinks = remove(i, this.sinks)
}
return this.sinks.length
}
event (time) {
return value => {
const s = this.sinks
if (s.length === 1) {
return s[0].event(time)(value)
}
for (let i = 0; i < s.length; ++i) {
s[i].event(time)(value)
}
}
}
end (time) {
const s = this.sinks
for (let i = 0; i < s.length; ++i) {
s[i].end(time)
}
}
}

View File

@ -0,0 +1,5 @@
export const dispose = disposable => disposable.dispose()
export const emptyDisposable = {
dispose () {}
}

View File

@ -0,0 +1,14 @@
import { HoldSource } from './HoldSource'
import { MulticastSource } from './MulticastSource'
import { Stream } from '../Stream'
export function multicast (stream) {
const source = stream.source
return source instanceof MulticastSource
? stream
: new Stream(new MulticastSource(source))
}
export const hold = bufferSize => stream =>
new Stream(new HoldSource(stream.source, bufferSize))

View File

@ -11,6 +11,7 @@ export const endTask = sink => propagateTask(runEnd, void 0, sink)
class PropagateTask {
constructor (run, value, sink) {
let active = true
this.sink = sink
this.dispose = () => { active = false }
this.run = t => {

View File

@ -12,7 +12,16 @@ Object.defineProperty(exports, '__esModule', { value: true });
// append :: a -> [a] -> [a]
// a with x appended
function append (x, a) {
var l = a.length;
var b = new Array(l + 1);
for (var i = 0; i < l; ++i) {
b[i] = a[i];
}
b[l] = x;
return b
}
// drop :: Int -> [a] -> [a]
// drop first n elements
@ -80,7 +89,37 @@ function reduce (f, z, a) {
// remove :: Int -> [a] -> [a]
// remove element at index
function remove (i, a) { // eslint-disable-line complexity
if (i < 0) {
throw new TypeError('i must be >= 0')
}
var l = a.length;
if (l === 0 || i >= l) { // exit early if index beyond end of array
return a
}
if (l === 1) { // exit early if index in bounds and length === 1
return []
}
return unsafeRemove(i, a, l - 1)
}
// unsafeRemove :: Int -> [a] -> Int -> [a]
// Internal helper to remove element at index
function unsafeRemove (i, a, l) {
var b = new Array(l);
var j;
for (j = 0; j < i; ++j) {
b[j] = a[j];
}
for (j = i; j < l; ++j) {
b[j] = a[j + 1];
}
return b
}
// removeAll :: (a -> boolean) -> [a] -> [a]
// remove all elements matching a predicate
@ -491,6 +530,7 @@ var endTask = function (sink) { return propagateTask(runEnd, void 0, sink); };
var PropagateTask = function PropagateTask (run, value, sink) {
var active = true;
this.sink = sink;
this.dispose = function () { active = false; };
this.run = function (t) {
@ -1099,6 +1139,223 @@ Segment.prototype._dispose = function (t) {
this.disposable.dispose();
};
var skipRepeatsWith = function (equals) { return function (stream) { return new Stream(new SkipRepeats(equals, stream)); }; };
var SkipRepeats = function SkipRepeats (equals, source) {
this.equals = equals;
this.source = source;
};
SkipRepeats.prototype.run = function run (sink) {
var this$1 = this;
return function (scheduler) { return this$1.source.run(new SkipRepeatsSink(this$1.equals, sink), scheduler); }
};
var SkipRepeatsSink = function SkipRepeatsSink (equals, sink) {
this.equals = equals;
this.value = void 0;
this.init = true;
};
SkipRepeatsSink.prototype.event = function event (t) {
var this$1 = this;
return function (x) {
if (this$1.init) {
this$1.init = false;
this$1.value = x;
this$1.sink.event(t, x);
} else if (!this$1.equals(this$1.value)(x)) {
this$1.value = x;
this$1.sink.event(t)(x);
}
}
};
SkipRepeatsSink.prototype.end = function end (t) {
this.sink.end(t);
};
var delay = function (delayTime) { return function (stream) { return delayTime <= 0 ? stream : new Stream(new Delay(delayTime, stream.source)); }; };
var Delay = function Delay (dt, source) {
this.dt = dt;
this.source = source;
};
Delay.prototype.run = function run (sink) {
var this$1 = this;
return function (scheduler) {
var delaySink = new DelaySink(this$1.dt, sink, scheduler);
return disposeAll([delaySink, this$1.source.run(delaySink)(scheduler)])
}
};
var DelaySink = function DelaySink (dt, sink, scheduler) {
this.dt = dt;
this.sink = sink;
this.scheduler = scheduler;
};
DelaySink.prototype.dispose = function dispose () {
var this$1 = this;
this.scheduler.cancelAll(function (task) { return task.sink === this$1.sink; });
};
DelaySink.prototype.event = function event (t) {
var this$1 = this;
return function (x) { return this$1.scheduler.delay(this$1.dt)(eventTask(x)(this$1.sink)); }
};
DelaySink.prototype.end = function end (t) {
this.scheduler.delay(this.dt)(endTask(this.sink));
};
var dispose$1 = function (disposable) { return disposable.dispose(); };
var emptyDisposable$1 = {
dispose: function dispose () {}
};
var MulticastDisposable = function MulticastDisposable (source, sink) {
this.source = source;
this.sink = sink;
this.disposed = false;
};
MulticastDisposable.prototype.dispose = function dispose () {
if (this.disposed) {
return
}
this.disposed = true;
var remaining = this.source.remove(this.sink);
return remaining === 0 && this.source._dispose()
};
var MulticastSource = function MulticastSource (source) {
this.source = source;
this.sinks = [];
this._disposable = emptyDisposable$1;
};
MulticastSource.prototype.run = function run (sink) {
var this$1 = this;
return function (scheduler) {
var n = this$1.add(sink, scheduler);
if (n === 1) {
this$1._disposable = this$1.source.run(this$1)(scheduler);
}
return new MulticastDisposable(this$1, sink)
}
};
MulticastSource.prototype._dispose = function _dispose () {
var disposable = this._disposable;
this._disposable = emptyDisposable$1;
return Promise.resolve(disposable).then(dispose$1)
};
MulticastSource.prototype.add = function add (sink) {
this.sinks = append(sink, this.sinks);
return this.sinks.length
};
MulticastSource.prototype.remove = function remove$1 (sink) {
var i = findIndex(sink, this.sinks);
// istanbul ignore next
if (i >= 0) {
this.sinks = remove(i, this.sinks);
}
return this.sinks.length
};
MulticastSource.prototype.event = function event (time) {
var this$1 = this;
return function (value) {
var s = this$1.sinks;
if (s.length === 1) {
return s[0].event(time)(value)
}
for (var i = 0; i < s.length; ++i) {
s[i].event(time)(value);
}
}
};
MulticastSource.prototype.end = function end (time) {
var s = this.sinks;
for (var i = 0; i < s.length; ++i) {
s[i].end(time);
}
};
var HoldSource = (function (MulticastSource$$1) {
function HoldSource (source, bufferSize) {
MulticastSource$$1.call(this, source);
this.bufferSize = bufferSize;
this.has = false;
this.buffer = [];
}
if ( MulticastSource$$1 ) HoldSource.__proto__ = MulticastSource$$1;
HoldSource.prototype = Object.create( MulticastSource$$1 && MulticastSource$$1.prototype );
HoldSource.prototype.constructor = HoldSource;
HoldSource.prototype.add = function add (sink, scheduler) {
if (this.has) {
pushEvents(this.buffer, sink, scheduler);
}
return MulticastSource$$1.prototype.add.call(this, sink, scheduler)
};
HoldSource.prototype.event = function event (time) {
var this$1 = this;
return function (value) {
this$1.has = true;
this$1.buffer = dropAndAppend(value, this$1.buffer, this$1.bufferSize);
return MulticastSource$$1.prototype.event.call(this$1, time)(value)
}
};
return HoldSource;
}(MulticastSource));
function pushEvents (buffer, sink, scheduler) {
var length = buffer.length;
for (var i = 0; i < length; ++i) {
sink.event(scheduler.now(), buffer[i]);
}
}
function dropAndAppend (value, buffer, bufferSize) {
if (buffer.length === bufferSize) {
return append(value, drop(1, buffer))
}
return append(value, buffer)
}
function multicast (stream) {
var source = stream.source;
return source instanceof MulticastSource
? stream
: new Stream(new MulticastSource(source))
}
var hold = function (bufferSize) { return function (stream) { return new Stream(new HoldSource(stream.source, bufferSize)); }; };
/* */
exports.createDisposable = createDisposable;
@ -1121,3 +1378,7 @@ exports.merge = merge;
exports.concat = concat;
exports.continueWith = continueWith;
exports.switch = switchLatest;
exports.skipRepeatsWith = skipRepeatsWith;
exports.delay = delay;
exports.multicast = multicast;
exports.hold = hold;

View File

@ -3,14 +3,15 @@ module Control.Stream
, EffStream
, Stream
, Source
, EventFn
, EndFn
, RunFn
, Sink
, Scheduler
, Task
, ScheduledTask
, Disposable
, Time
, EventFn
, EndFn
-- Scheduler-related functions and values
, defaultScheduler
, eventTask
@ -46,29 +47,39 @@ module Control.Stream
, constant
, scan
, startWith
, multicast
, hold
, skipRepeats
, skipRepeatsWith
, delay
, sample
-- Stream factories
, just
, fromArray
, empty
, never
, periodic
)
where
import Control.Alt ((<|>))
import Control.Applicative (class Applicative)
import Control.Apply (class Apply)
import Control.Bind (class Bind)
import Control.Category (id)
import Control.Monad (class Monad)
import Control.Monad.Eff (Eff, Pure)
import Control.Monad.Eff (Eff, Pure, runPure)
import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef)
import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff)
import Control.MonadPlus (class Alt, class Plus)
import Data.Array (snoc)
import Data.Eq (class Eq, eq)
import Data.Function (flip, ($))
import Data.Functor (class Functor, map)
import Data.Maybe (Maybe(Just, Nothing), fromJust, isJust)
import Data.Monoid (class Monoid)
import Data.Semigroup (class Semigroup)
import Data.Unit (Unit, unit)
import Partial.Unsafe (unsafePartial)
newtype Time = Time Int
@ -78,9 +89,13 @@ type EffStream e a = Eff (stream :: STREAM | e) a
newtype Stream a = Stream { source :: Source a }
type Source a = { run :: Sink a -> Scheduler -> Disposable }
newtype Source a = Source { run :: RunFn a }
type Sink a =
type RunFn a = Sink a -> Scheduler -> Disposable
type EventFn a = Time -> a -> Unit
type EndFn = Time -> Unit
newtype Sink a = Sink
{ event :: Time -> a -> Unit
, end :: Time -> Unit
}
@ -107,9 +122,6 @@ type ScheduledTask =
type Disposable = { dispose :: Pure Unit }
type EventFn a b = (Sink b -> Scheduler -> Time -> a -> Unit)
type EndFn a = (Sink a -> Scheduler -> Time -> Unit)
instance functorStream :: Functor Stream where
map = _map
@ -138,36 +150,37 @@ instance plusStream :: Plus Stream where
-- Stream-related helpers
createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a
createStream run = Stream { source: { run } }
createStream run = Stream { source: Source { run } }
getSource :: forall a. Stream a -> Source a
getSource stream = case stream of Stream a -> a.source
getSource (Stream stream) = stream.source
runSource :: forall a. Stream a -> Sink a -> Scheduler -> Disposable
runSource stream sink scheduler = (getSource stream).run sink scheduler
runSource :: forall a. Source a -> Sink a -> Scheduler -> Disposable
runSource (Source source) sink scheduler = source.run sink scheduler
runStream :: forall a. Stream a -> Sink a -> Scheduler -> Disposable
runStream stream sink scheduler = runSource (getSource stream) sink scheduler
createSink :: forall a. (Time -> a -> Unit) -> (Time -> Unit) -> Sink a
createSink event end = { event, end }
createSink event end = Sink { event, end }
createCombinator :: forall a b. EventFn a b -> EndFn b -> Stream a -> Stream b
createCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> (Sink b -> Scheduler -> EndFn) -> Stream a -> Stream b
createCombinator event end stream = createStream runCombinator
where
runCombinator :: Sink b -> Scheduler -> Disposable
runCombinator sink scheduler =
runSource stream (createSink (event sink scheduler) (end sink scheduler)) scheduler
runStream stream (createSink (event sink scheduler) (end sink scheduler)) scheduler
createEventCombinator :: forall a b. EventFn a b -> Stream a -> Stream b
createEventCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> Stream a -> Stream b
createEventCombinator event stream = createCombinator event end stream
where
end :: EndFn b
end sink scheduler time =
sink.end time
end (Sink sink) (Scheduler scheduler) time = sink.end time
createEndCombinator :: forall a. EndFn a -> Stream a -> Stream a
createEndCombinator :: forall a. (Sink a -> Scheduler -> EndFn) -> Stream a -> Stream a
createEndCombinator end stream = createCombinator event end stream
where
event :: EventFn a a
event sink scheduler time value = sink.event time value
event :: Sink a -> Scheduler -> EventFn a
event (Sink sink) scheduler time value = sink.event time value
-- Stream factories
just :: forall a. a -> Stream a
@ -192,7 +205,7 @@ runFromArray :: forall a. Array a -> Sink a -> Scheduler -> Disposable
runFromArray arr sink scheduler = scheduleTasks tasks scheduler
where
tasks :: Array Task
tasks = snoc eventTasks (endTask sink)
tasks = eventTasks <|> [ (endTask sink) ]
eventTasks :: Array Task
eventTasks = map (flip eventTask sink) arr
@ -201,39 +214,34 @@ periodic :: Number -> Stream Unit
periodic period = createStream (runPeriodic period)
runPeriodic :: Number -> Sink Unit -> Scheduler -> Disposable
runPeriodic period sink scheduler = { dispose: scheduledTask.dispose }
runPeriodic period sink (Scheduler scheduler) = { dispose: scheduledTask.dispose }
where
p = case scheduler of Scheduler a -> a.periodic
scheduledTask = p period (eventTask unit sink)
scheduledTask = scheduler.periodic period (eventTask unit sink)
-- combinators
_map :: forall a b. (a -> b) -> Stream a -> Stream b
_map f stream = createEventCombinator mapEvent stream
where
mapEvent :: EventFn a b
mapEvent sink scheduler time value = sink.event time (f value)
mapEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (f value)
filter :: forall a. (a -> Boolean) -> Stream a -> Stream a
filter predicate stream = createEventCombinator event stream
where
event :: EventFn a a
event sink scheduler time value = if predicate value
event (Sink sink) (Scheduler scheduler) time value = if predicate value
then sink.event time value
else unit
tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a
tapEvent f stream = createEventCombinator event stream
where
event :: EventFn a a
event sink scheduler time value = sink.event time value
event (Sink sink) (Scheduler scheduler) time value = sink.event time value
where -- find a better way to perform these side effects
x = unsafePerformEff (f value)
tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a
tapEnd f stream = createEndCombinator end stream
where
end :: EndFn a
end sink scheduler time = sink.end time
end (Sink sink) (Scheduler scheduler) time = sink.end time
where -- find a better way to perform these side effects
x = unsafePerformEff f
@ -243,18 +251,51 @@ startWith value stream = concat (just value) stream
constant :: forall a b. b -> Stream a -> Stream b
constant value stream = createEventCombinator constantEvent stream
where
constantEvent sink scheduler time x = sink.event time value
constantEvent (Sink sink) (Scheduler scheduler) time x = sink.event time value
scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b
scan f seed stream = startWith seed $ createEventCombinator scanEvent stream
where
state :: State b
state = createState seed
scanEvent :: Sink b -> Scheduler -> Time -> a -> Unit
scanEvent sink scheduler time value = sink.event time (state.set \acc -> f acc value)
scanEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (state.set \acc -> f acc value)
skipRepeats :: forall a. (Eq a) => Stream a -> Stream a
skipRepeats = skipRepeatsWith eq
sample :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c
sample f sampler stream = createStream $ runSample (createState Nothing)
where
runSample :: State (Maybe b) -> Sink c -> Scheduler -> Disposable
runSample state sink scheduler = disposeAll
[ runStream stream (createHoldSink state) scheduler
, runStream sampler (createSampleSink state sink f) scheduler
]
createHoldSink :: forall a. State (Maybe a) -> Sink a
createHoldSink state = createSink event end
where
event time value = always unit (state.set \_ -> Just value)
end time = unit
createSampleSink :: forall a b c. State (Maybe b) -> Sink c -> (a -> b -> c) -> Sink a
createSampleSink state (Sink sink) f = createSink event end
where
end time = sink.end time
event time value =
if isJust (runPure state.get)
then sink.event time $ f value (unsafePartial $ fromJust (runPure state.get))
else unit
always :: forall a b. a -> b -> a
always a b = a
-- find a better way to perform these side effects
createState :: forall a. a -> { get :: Pure a, set :: (a -> a) -> a }
type State a = { get:: Pure a, set :: (a -> a) -> a }
createState :: forall a. a -> State a
createState seed = { set, get }
where
ref :: Ref a
@ -290,3 +331,7 @@ foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Strea
foreign import concat :: forall a. Stream a -> Stream a -> Stream a
foreign import merge :: forall a. Array (Stream a) -> Stream a
foreign import switch :: forall a. Stream (Stream a) -> Stream a
foreign import multicast :: forall a. Stream a -> Stream a
foreign import hold :: forall a. Int -> Stream a -> Stream a
foreign import skipRepeatsWith :: forall a. (a -> a -> Boolean) -> Stream a -> Stream a
foreign import delay :: forall a. Number -> Stream a -> Stream a