diff --git a/bower.json b/bower.json index 0936138..2be2dda 100644 --- a/bower.json +++ b/bower.json @@ -9,7 +9,8 @@ "dependencies": { "purescript-eff": "^2.0.0", "purescript-prelude": "^2.5.0", - "purescript-refs": "^2.0.0" + "purescript-refs": "^2.0.0", + "purescript-maybe": "^2.1.1" }, "devDependencies": { "purescript-console": "^2.0.0", diff --git a/ffi/Control/Stream/combinators/delay.js b/ffi/Control/Stream/combinators/delay.js new file mode 100644 index 0000000..da8a232 --- /dev/null +++ b/ffi/Control/Stream/combinators/delay.js @@ -0,0 +1,41 @@ +import { endTask, eventTask } from '../scheduler' + +import { Stream } from '../Stream' +import { disposeAll } from '../disposables' + +export const delay = delayTime => stream => + delayTime <= 0 ? stream : new Stream(new Delay(delayTime, stream.source)) + +class Delay { + constructor (dt, source) { + this.dt = dt + this.source = source + } + + run (sink) { + return scheduler => { + const delaySink = new DelaySink(this.dt, sink, scheduler) + return disposeAll([delaySink, this.source.run(delaySink)(scheduler)]) + } + } +} + +class DelaySink { + constructor (dt, sink, scheduler) { + this.dt = dt + this.sink = sink + this.scheduler = scheduler + } + + dispose () { + this.scheduler.cancelAll(task => task.sink === this.sink) + } + + event (t) { + return x => this.scheduler.delay(this.dt)(eventTask(x)(this.sink)) + } + + end (t) { + this.scheduler.delay(this.dt)(endTask(this.sink)) + } +} diff --git a/ffi/Control/Stream/combinators/index.js b/ffi/Control/Stream/combinators/index.js index f99d175..f7f854d 100644 --- a/ffi/Control/Stream/combinators/index.js +++ b/ffi/Control/Stream/combinators/index.js @@ -4,3 +4,5 @@ export * from './combine' export * from './merge' export * from './concat' export * from './switch' +export * from './skipRepeatsWith' +export * from './delay' diff --git a/ffi/Control/Stream/combinators/skipRepeatsWith.js b/ffi/Control/Stream/combinators/skipRepeatsWith.js new file mode 100644 index 0000000..61a439c --- /dev/null +++ b/ffi/Control/Stream/combinators/skipRepeatsWith.js @@ -0,0 +1,40 @@ +import { Stream } from '../Stream' + +export const skipRepeatsWith = equals => stream => + new Stream(new SkipRepeats(equals, stream)) + +class SkipRepeats { + constructor (equals, source) { + this.equals = equals + this.source = source + } + + run (sink) { + return scheduler => this.source.run(new SkipRepeatsSink(this.equals, sink), scheduler) + } +} + +class SkipRepeatsSink { + constructor (equals, sink) { + this.equals = equals + this.value = void 0 + this.init = true + } + + event (t) { + return x => { + if (this.init) { + this.init = false + this.value = x + this.sink.event(t, x) + } else if (!this.equals(this.value)(x)) { + this.value = x + this.sink.event(t)(x) + } + } + } + + end (t) { + this.sink.end(t) + } +} diff --git a/ffi/Control/Stream/index.js b/ffi/Control/Stream/index.js index f17d98a..7e793c2 100644 --- a/ffi/Control/Stream/index.js +++ b/ffi/Control/Stream/index.js @@ -2,3 +2,4 @@ export * from './disposables' export * from './scheduler' export * from './combinators' +export * from './multicast' diff --git a/ffi/Control/Stream/multicast/HoldSource.js b/ffi/Control/Stream/multicast/HoldSource.js new file mode 100644 index 0000000..c3ad323 --- /dev/null +++ b/ffi/Control/Stream/multicast/HoldSource.js @@ -0,0 +1,45 @@ +import { append, drop } from '@most/prelude' + +import { MulticastSource } from './MulticastSource' + +export class HoldSource extends MulticastSource { + constructor (source, bufferSize) { + super(source) + this.bufferSize = bufferSize + this.has = false + this.buffer = [] + } + + add (sink, scheduler) { + if (this.has) { + pushEvents(this.buffer, sink, scheduler) + } + + return super.add(sink, scheduler) + } + + event (time) { + return value => { + this.has = true + this.buffer = dropAndAppend(value, this.buffer, this.bufferSize) + + return super.event(time)(value) + } + } +} + +function pushEvents (buffer, sink, scheduler) { + const length = buffer.length + + for (let i = 0; i < length; ++i) { + sink.event(scheduler.now(), buffer[i]) + } +} + +export function dropAndAppend (value, buffer, bufferSize) { + if (buffer.length === bufferSize) { + return append(value, drop(1, buffer)) + } + + return append(value, buffer) +} diff --git a/ffi/Control/Stream/multicast/MulticastDisposable.js b/ffi/Control/Stream/multicast/MulticastDisposable.js new file mode 100644 index 0000000..d38b9b5 --- /dev/null +++ b/ffi/Control/Stream/multicast/MulticastDisposable.js @@ -0,0 +1,16 @@ +export class MulticastDisposable { + constructor (source, sink) { + this.source = source + this.sink = sink + this.disposed = false + } + + dispose () { + if (this.disposed) { + return + } + this.disposed = true + const remaining = this.source.remove(this.sink) + return remaining === 0 && this.source._dispose() + } +} diff --git a/ffi/Control/Stream/multicast/MulticastSource.js b/ffi/Control/Stream/multicast/MulticastSource.js new file mode 100644 index 0000000..2e0d341 --- /dev/null +++ b/ffi/Control/Stream/multicast/MulticastSource.js @@ -0,0 +1,62 @@ +import { append, findIndex, remove } from '@most/prelude' +import { dispose, emptyDisposable } from './dispose' + +import { MulticastDisposable } from './MulticastDisposable' + +export class MulticastSource { + constructor (source) { + this.source = source + this.sinks = [] + this._disposable = emptyDisposable + } + + run (sink) { + return scheduler => { + const n = this.add(sink, scheduler) + if (n === 1) { + this._disposable = this.source.run(this)(scheduler) + } + return new MulticastDisposable(this, sink) + } + } + + _dispose () { + const disposable = this._disposable + this._disposable = emptyDisposable + return Promise.resolve(disposable).then(dispose) + } + + add (sink) { + this.sinks = append(sink, this.sinks) + return this.sinks.length + } + + remove (sink) { + const i = findIndex(sink, this.sinks) + // istanbul ignore next + if (i >= 0) { + this.sinks = remove(i, this.sinks) + } + + return this.sinks.length + } + + event (time) { + return value => { + const s = this.sinks + if (s.length === 1) { + return s[0].event(time)(value) + } + for (let i = 0; i < s.length; ++i) { + s[i].event(time)(value) + } + } + } + + end (time) { + const s = this.sinks + for (let i = 0; i < s.length; ++i) { + s[i].end(time) + } + } +} diff --git a/ffi/Control/Stream/multicast/dispose.js b/ffi/Control/Stream/multicast/dispose.js new file mode 100644 index 0000000..34ed5ab --- /dev/null +++ b/ffi/Control/Stream/multicast/dispose.js @@ -0,0 +1,5 @@ +export const dispose = disposable => disposable.dispose() + +export const emptyDisposable = { + dispose () {} +} diff --git a/ffi/Control/Stream/multicast/index.js b/ffi/Control/Stream/multicast/index.js new file mode 100644 index 0000000..cb2cfaf --- /dev/null +++ b/ffi/Control/Stream/multicast/index.js @@ -0,0 +1,14 @@ +import { HoldSource } from './HoldSource' +import { MulticastSource } from './MulticastSource' +import { Stream } from '../Stream' + +export function multicast (stream) { + const source = stream.source + + return source instanceof MulticastSource + ? stream + : new Stream(new MulticastSource(source)) +} + +export const hold = bufferSize => stream => + new Stream(new HoldSource(stream.source, bufferSize)) diff --git a/ffi/Control/Stream/scheduler/PropagateTask.js b/ffi/Control/Stream/scheduler/PropagateTask.js index 0883d75..4941dbe 100644 --- a/ffi/Control/Stream/scheduler/PropagateTask.js +++ b/ffi/Control/Stream/scheduler/PropagateTask.js @@ -11,6 +11,7 @@ export const endTask = sink => propagateTask(runEnd, void 0, sink) class PropagateTask { constructor (run, value, sink) { let active = true + this.sink = sink this.dispose = () => { active = false } this.run = t => { diff --git a/src/Control/Stream.js b/src/Control/Stream.js index bb94ba1..ce131a9 100644 --- a/src/Control/Stream.js +++ b/src/Control/Stream.js @@ -12,7 +12,16 @@ Object.defineProperty(exports, '__esModule', { value: true }); // append :: a -> [a] -> [a] // a with x appended +function append (x, a) { + var l = a.length; + var b = new Array(l + 1); + for (var i = 0; i < l; ++i) { + b[i] = a[i]; + } + b[l] = x; + return b +} // drop :: Int -> [a] -> [a] // drop first n elements @@ -80,7 +89,37 @@ function reduce (f, z, a) { // remove :: Int -> [a] -> [a] // remove element at index +function remove (i, a) { // eslint-disable-line complexity + if (i < 0) { + throw new TypeError('i must be >= 0') + } + var l = a.length; + if (l === 0 || i >= l) { // exit early if index beyond end of array + return a + } + + if (l === 1) { // exit early if index in bounds and length === 1 + return [] + } + + return unsafeRemove(i, a, l - 1) +} + +// unsafeRemove :: Int -> [a] -> Int -> [a] +// Internal helper to remove element at index +function unsafeRemove (i, a, l) { + var b = new Array(l); + var j; + for (j = 0; j < i; ++j) { + b[j] = a[j]; + } + for (j = i; j < l; ++j) { + b[j] = a[j + 1]; + } + + return b +} // removeAll :: (a -> boolean) -> [a] -> [a] // remove all elements matching a predicate @@ -491,6 +530,7 @@ var endTask = function (sink) { return propagateTask(runEnd, void 0, sink); }; var PropagateTask = function PropagateTask (run, value, sink) { var active = true; + this.sink = sink; this.dispose = function () { active = false; }; this.run = function (t) { @@ -1099,6 +1139,223 @@ Segment.prototype._dispose = function (t) { this.disposable.dispose(); }; +var skipRepeatsWith = function (equals) { return function (stream) { return new Stream(new SkipRepeats(equals, stream)); }; }; + +var SkipRepeats = function SkipRepeats (equals, source) { + this.equals = equals; + this.source = source; +}; + +SkipRepeats.prototype.run = function run (sink) { + var this$1 = this; + + return function (scheduler) { return this$1.source.run(new SkipRepeatsSink(this$1.equals, sink), scheduler); } +}; + +var SkipRepeatsSink = function SkipRepeatsSink (equals, sink) { + this.equals = equals; + this.value = void 0; + this.init = true; +}; + +SkipRepeatsSink.prototype.event = function event (t) { + var this$1 = this; + + return function (x) { + if (this$1.init) { + this$1.init = false; + this$1.value = x; + this$1.sink.event(t, x); + } else if (!this$1.equals(this$1.value)(x)) { + this$1.value = x; + this$1.sink.event(t)(x); + } + } +}; + +SkipRepeatsSink.prototype.end = function end (t) { + this.sink.end(t); +}; + +var delay = function (delayTime) { return function (stream) { return delayTime <= 0 ? stream : new Stream(new Delay(delayTime, stream.source)); }; }; + +var Delay = function Delay (dt, source) { + this.dt = dt; + this.source = source; +}; + +Delay.prototype.run = function run (sink) { + var this$1 = this; + + return function (scheduler) { + var delaySink = new DelaySink(this$1.dt, sink, scheduler); + return disposeAll([delaySink, this$1.source.run(delaySink)(scheduler)]) + } +}; + +var DelaySink = function DelaySink (dt, sink, scheduler) { + this.dt = dt; + this.sink = sink; + this.scheduler = scheduler; +}; + +DelaySink.prototype.dispose = function dispose () { + var this$1 = this; + + this.scheduler.cancelAll(function (task) { return task.sink === this$1.sink; }); +}; + +DelaySink.prototype.event = function event (t) { + var this$1 = this; + + return function (x) { return this$1.scheduler.delay(this$1.dt)(eventTask(x)(this$1.sink)); } +}; + +DelaySink.prototype.end = function end (t) { + this.scheduler.delay(this.dt)(endTask(this.sink)); +}; + +var dispose$1 = function (disposable) { return disposable.dispose(); }; + +var emptyDisposable$1 = { + dispose: function dispose () {} +}; + +var MulticastDisposable = function MulticastDisposable (source, sink) { + this.source = source; + this.sink = sink; + this.disposed = false; +}; + +MulticastDisposable.prototype.dispose = function dispose () { + if (this.disposed) { + return + } + this.disposed = true; + var remaining = this.source.remove(this.sink); + return remaining === 0 && this.source._dispose() +}; + +var MulticastSource = function MulticastSource (source) { + this.source = source; + this.sinks = []; + this._disposable = emptyDisposable$1; +}; + +MulticastSource.prototype.run = function run (sink) { + var this$1 = this; + + return function (scheduler) { + var n = this$1.add(sink, scheduler); + if (n === 1) { + this$1._disposable = this$1.source.run(this$1)(scheduler); + } + return new MulticastDisposable(this$1, sink) + } +}; + +MulticastSource.prototype._dispose = function _dispose () { + var disposable = this._disposable; + this._disposable = emptyDisposable$1; + return Promise.resolve(disposable).then(dispose$1) +}; + +MulticastSource.prototype.add = function add (sink) { + this.sinks = append(sink, this.sinks); + return this.sinks.length +}; + +MulticastSource.prototype.remove = function remove$1 (sink) { + var i = findIndex(sink, this.sinks); + // istanbul ignore next + if (i >= 0) { + this.sinks = remove(i, this.sinks); + } + + return this.sinks.length +}; + +MulticastSource.prototype.event = function event (time) { + var this$1 = this; + + return function (value) { + var s = this$1.sinks; + if (s.length === 1) { + return s[0].event(time)(value) + } + for (var i = 0; i < s.length; ++i) { + s[i].event(time)(value); + } + } +}; + +MulticastSource.prototype.end = function end (time) { + var s = this.sinks; + for (var i = 0; i < s.length; ++i) { + s[i].end(time); + } +}; + +var HoldSource = (function (MulticastSource$$1) { + function HoldSource (source, bufferSize) { + MulticastSource$$1.call(this, source); + this.bufferSize = bufferSize; + this.has = false; + this.buffer = []; + } + + if ( MulticastSource$$1 ) HoldSource.__proto__ = MulticastSource$$1; + HoldSource.prototype = Object.create( MulticastSource$$1 && MulticastSource$$1.prototype ); + HoldSource.prototype.constructor = HoldSource; + + HoldSource.prototype.add = function add (sink, scheduler) { + if (this.has) { + pushEvents(this.buffer, sink, scheduler); + } + + return MulticastSource$$1.prototype.add.call(this, sink, scheduler) + }; + + HoldSource.prototype.event = function event (time) { + var this$1 = this; + + return function (value) { + this$1.has = true; + this$1.buffer = dropAndAppend(value, this$1.buffer, this$1.bufferSize); + + return MulticastSource$$1.prototype.event.call(this$1, time)(value) + } + }; + + return HoldSource; +}(MulticastSource)); + +function pushEvents (buffer, sink, scheduler) { + var length = buffer.length; + + for (var i = 0; i < length; ++i) { + sink.event(scheduler.now(), buffer[i]); + } +} + +function dropAndAppend (value, buffer, bufferSize) { + if (buffer.length === bufferSize) { + return append(value, drop(1, buffer)) + } + + return append(value, buffer) +} + +function multicast (stream) { + var source = stream.source; + + return source instanceof MulticastSource + ? stream + : new Stream(new MulticastSource(source)) +} + +var hold = function (bufferSize) { return function (stream) { return new Stream(new HoldSource(stream.source, bufferSize)); }; }; + /* */ exports.createDisposable = createDisposable; @@ -1121,3 +1378,7 @@ exports.merge = merge; exports.concat = concat; exports.continueWith = continueWith; exports.switch = switchLatest; +exports.skipRepeatsWith = skipRepeatsWith; +exports.delay = delay; +exports.multicast = multicast; +exports.hold = hold; diff --git a/src/Control/Stream.purs b/src/Control/Stream.purs index 7e6b04c..1913814 100644 --- a/src/Control/Stream.purs +++ b/src/Control/Stream.purs @@ -3,14 +3,15 @@ module Control.Stream , EffStream , Stream , Source + , EventFn + , EndFn + , RunFn , Sink , Scheduler , Task , ScheduledTask , Disposable , Time - , EventFn - , EndFn -- Scheduler-related functions and values , defaultScheduler , eventTask @@ -46,29 +47,39 @@ module Control.Stream , constant , scan , startWith + , multicast + , hold + , skipRepeats + , skipRepeatsWith + , delay + , sample -- Stream factories , just , fromArray , empty + , never , periodic ) where +import Control.Alt ((<|>)) import Control.Applicative (class Applicative) import Control.Apply (class Apply) import Control.Bind (class Bind) import Control.Category (id) import Control.Monad (class Monad) -import Control.Monad.Eff (Eff, Pure) +import Control.Monad.Eff (Eff, Pure, runPure) import Control.Monad.Eff.Ref (Ref, modifyRef', newRef, readRef) import Control.Monad.Eff.Unsafe (unsafeCoerceEff, unsafePerformEff) import Control.MonadPlus (class Alt, class Plus) -import Data.Array (snoc) +import Data.Eq (class Eq, eq) import Data.Function (flip, ($)) import Data.Functor (class Functor, map) +import Data.Maybe (Maybe(Just, Nothing), fromJust, isJust) import Data.Monoid (class Monoid) import Data.Semigroup (class Semigroup) import Data.Unit (Unit, unit) +import Partial.Unsafe (unsafePartial) newtype Time = Time Int @@ -78,9 +89,13 @@ type EffStream e a = Eff (stream :: STREAM | e) a newtype Stream a = Stream { source :: Source a } -type Source a = { run :: Sink a -> Scheduler -> Disposable } +newtype Source a = Source { run :: RunFn a } -type Sink a = +type RunFn a = Sink a -> Scheduler -> Disposable +type EventFn a = Time -> a -> Unit +type EndFn = Time -> Unit + +newtype Sink a = Sink { event :: Time -> a -> Unit , end :: Time -> Unit } @@ -107,9 +122,6 @@ type ScheduledTask = type Disposable = { dispose :: Pure Unit } -type EventFn a b = (Sink b -> Scheduler -> Time -> a -> Unit) -type EndFn a = (Sink a -> Scheduler -> Time -> Unit) - instance functorStream :: Functor Stream where map = _map @@ -138,36 +150,37 @@ instance plusStream :: Plus Stream where -- Stream-related helpers createStream :: forall a. (Sink a -> Scheduler -> Disposable) -> Stream a -createStream run = Stream { source: { run } } +createStream run = Stream { source: Source { run } } getSource :: forall a. Stream a -> Source a -getSource stream = case stream of Stream a -> a.source +getSource (Stream stream) = stream.source -runSource :: forall a. Stream a -> Sink a -> Scheduler -> Disposable -runSource stream sink scheduler = (getSource stream).run sink scheduler +runSource :: forall a. Source a -> Sink a -> Scheduler -> Disposable +runSource (Source source) sink scheduler = source.run sink scheduler + +runStream :: forall a. Stream a -> Sink a -> Scheduler -> Disposable +runStream stream sink scheduler = runSource (getSource stream) sink scheduler createSink :: forall a. (Time -> a -> Unit) -> (Time -> Unit) -> Sink a -createSink event end = { event, end } +createSink event end = Sink { event, end } -createCombinator :: forall a b. EventFn a b -> EndFn b -> Stream a -> Stream b +createCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> (Sink b -> Scheduler -> EndFn) -> Stream a -> Stream b createCombinator event end stream = createStream runCombinator where runCombinator :: Sink b -> Scheduler -> Disposable runCombinator sink scheduler = - runSource stream (createSink (event sink scheduler) (end sink scheduler)) scheduler + runStream stream (createSink (event sink scheduler) (end sink scheduler)) scheduler -createEventCombinator :: forall a b. EventFn a b -> Stream a -> Stream b +createEventCombinator :: forall a b. (Sink b -> Scheduler -> EventFn a) -> Stream a -> Stream b createEventCombinator event stream = createCombinator event end stream where - end :: EndFn b - end sink scheduler time = - sink.end time + end (Sink sink) (Scheduler scheduler) time = sink.end time -createEndCombinator :: forall a. EndFn a -> Stream a -> Stream a +createEndCombinator :: forall a. (Sink a -> Scheduler -> EndFn) -> Stream a -> Stream a createEndCombinator end stream = createCombinator event end stream where - event :: EventFn a a - event sink scheduler time value = sink.event time value + event :: Sink a -> Scheduler -> EventFn a + event (Sink sink) scheduler time value = sink.event time value -- Stream factories just :: forall a. a -> Stream a @@ -192,7 +205,7 @@ runFromArray :: forall a. Array a -> Sink a -> Scheduler -> Disposable runFromArray arr sink scheduler = scheduleTasks tasks scheduler where tasks :: Array Task - tasks = snoc eventTasks (endTask sink) + tasks = eventTasks <|> [ (endTask sink) ] eventTasks :: Array Task eventTasks = map (flip eventTask sink) arr @@ -201,39 +214,34 @@ periodic :: Number -> Stream Unit periodic period = createStream (runPeriodic period) runPeriodic :: Number -> Sink Unit -> Scheduler -> Disposable -runPeriodic period sink scheduler = { dispose: scheduledTask.dispose } +runPeriodic period sink (Scheduler scheduler) = { dispose: scheduledTask.dispose } where - p = case scheduler of Scheduler a -> a.periodic - scheduledTask = p period (eventTask unit sink) + scheduledTask = scheduler.periodic period (eventTask unit sink) -- combinators _map :: forall a b. (a -> b) -> Stream a -> Stream b _map f stream = createEventCombinator mapEvent stream where - mapEvent :: EventFn a b - mapEvent sink scheduler time value = sink.event time (f value) + mapEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (f value) filter :: forall a. (a -> Boolean) -> Stream a -> Stream a filter predicate stream = createEventCombinator event stream where - event :: EventFn a a - event sink scheduler time value = if predicate value + event (Sink sink) (Scheduler scheduler) time value = if predicate value then sink.event time value else unit tapEvent :: forall e a. (a -> EffStream e Unit) -> Stream a -> Stream a tapEvent f stream = createEventCombinator event stream where - event :: EventFn a a - event sink scheduler time value = sink.event time value + event (Sink sink) (Scheduler scheduler) time value = sink.event time value where -- find a better way to perform these side effects x = unsafePerformEff (f value) tapEnd :: forall e a. EffStream e Unit -> Stream a -> Stream a tapEnd f stream = createEndCombinator end stream where - end :: EndFn a - end sink scheduler time = sink.end time + end (Sink sink) (Scheduler scheduler) time = sink.end time where -- find a better way to perform these side effects x = unsafePerformEff f @@ -243,18 +251,51 @@ startWith value stream = concat (just value) stream constant :: forall a b. b -> Stream a -> Stream b constant value stream = createEventCombinator constantEvent stream where - constantEvent sink scheduler time x = sink.event time value + constantEvent (Sink sink) (Scheduler scheduler) time x = sink.event time value scan :: forall a b. (b -> a -> b) -> b -> Stream a -> Stream b scan f seed stream = startWith seed $ createEventCombinator scanEvent stream where + state :: State b state = createState seed scanEvent :: Sink b -> Scheduler -> Time -> a -> Unit - scanEvent sink scheduler time value = sink.event time (state.set \acc -> f acc value) + scanEvent (Sink sink) (Scheduler scheduler) time value = sink.event time (state.set \acc -> f acc value) + +skipRepeats :: forall a. (Eq a) => Stream a -> Stream a +skipRepeats = skipRepeatsWith eq + +sample :: forall a b c. (a -> b -> c) -> Stream a -> Stream b -> Stream c +sample f sampler stream = createStream $ runSample (createState Nothing) + where + runSample :: State (Maybe b) -> Sink c -> Scheduler -> Disposable + runSample state sink scheduler = disposeAll + [ runStream stream (createHoldSink state) scheduler + , runStream sampler (createSampleSink state sink f) scheduler + ] + +createHoldSink :: forall a. State (Maybe a) -> Sink a +createHoldSink state = createSink event end + where + event time value = always unit (state.set \_ -> Just value) + end time = unit + +createSampleSink :: forall a b c. State (Maybe b) -> Sink c -> (a -> b -> c) -> Sink a +createSampleSink state (Sink sink) f = createSink event end + where + end time = sink.end time + event time value = + if isJust (runPure state.get) + then sink.event time $ f value (unsafePartial $ fromJust (runPure state.get)) + else unit + +always :: forall a b. a -> b -> a +always a b = a -- find a better way to perform these side effects -createState :: forall a. a -> { get :: Pure a, set :: (a -> a) -> a } +type State a = { get:: Pure a, set :: (a -> a) -> a } + +createState :: forall a. a -> State a createState seed = { set, get } where ref :: Ref a @@ -290,3 +331,7 @@ foreign import continueWith :: forall a. (Unit -> Stream a) -> Stream a -> Strea foreign import concat :: forall a. Stream a -> Stream a -> Stream a foreign import merge :: forall a. Array (Stream a) -> Stream a foreign import switch :: forall a. Stream (Stream a) -> Stream a +foreign import multicast :: forall a. Stream a -> Stream a +foreign import hold :: forall a. Int -> Stream a -> Stream a +foreign import skipRepeatsWith :: forall a. (a -> a -> Boolean) -> Stream a -> Stream a +foreign import delay :: forall a. Number -> Stream a -> Stream a