Fixing swift 3.0 warnings.

This commit is contained in:
Krunoslav Zaher 2016-04-21 13:33:53 +02:00
parent c4ca775bdc
commit 1b5cd9ecb5
70 changed files with 427 additions and 427 deletions

View File

@ -19,9 +19,9 @@ let derivedData = Process.arguments[2]
let fileManager = NSFileManager() let fileManager = NSFileManager()
func escape(value: String) -> String { func escape(value: String) -> String {
let escapedString = value.stringByReplacingOccurrencesOfString("\n", withString: "\\n") let escapedString = value.replacingOccurrences(of: "\n", with: "\\n")
let escapedString1 = escapedString.stringByReplacingOccurrencesOfString("\r", withString: "\\r") let escapedString1 = escapedString.replacingOccurrences(of: "\r", with: "\\r")
let escapedString2 = escapedString1.stringByReplacingOccurrencesOfString("\"", withString: "\\\"") let escapedString2 = escapedString1.replacingOccurrences(of: "\"", with: "\\\"")
return "\"\(escapedString2)\"" return "\"\(escapedString2)\""
} }
@ -30,14 +30,14 @@ func processFile(path: String, outputPath: String) -> String {
let rawContent = NSData(contentsOfFile: path)! let rawContent = NSData(contentsOfFile: path)!
let content = NSString(data: rawContent, encoding: NSUTF8StringEncoding)! as String let content = NSString(data: rawContent, encoding: NSUTF8StringEncoding)! as String
let components = content.componentsSeparatedByString("<%") let components = content.components(separatedBy: "<%")
var functionContentComponents: [String] = [] var functionContentComponents: [String] = []
functionContentComponents.append("var components: [String] = [\"// This file is autogenerated. Take a look at `Preprocessor` target in RxSwift project \\n\"]\n") functionContentComponents.append("var components: [String] = [\"// This file is autogenerated. Take a look at `Preprocessor` target in RxSwift project \\n\"]\n")
functionContentComponents.append("components.append(\(escape(components[0])))\n") functionContentComponents.append("components.append(\(escape(value: components[0])))\n")
for codePlusSuffix in (components[1 ..< components.count]) { for codePlusSuffix in (components[1 ..< components.count]) {
let codePlusSuffixSeparated = codePlusSuffix.componentsSeparatedByString("%>") let codePlusSuffixSeparated = codePlusSuffix.components(separatedBy: "%>")
if codePlusSuffixSeparated.count != 2 { if codePlusSuffixSeparated.count != 2 {
fatalError("Error in \(path) near \(codePlusSuffix)") fatalError("Error in \(path) near \(codePlusSuffix)")
} }
@ -46,18 +46,18 @@ func processFile(path: String, outputPath: String) -> String {
let suffix = codePlusSuffixSeparated[1] let suffix = codePlusSuffixSeparated[1]
if code.hasPrefix("=") { if code.hasPrefix("=") {
functionContentComponents.append("components.append(String(\(code.substringFromIndex(code.startIndex.successor()))))\n") functionContentComponents.append("components.append(String(\(code.substring(from: code.startIndex.successor()))))\n")
} }
else { else {
functionContentComponents.append("\(code)\n") functionContentComponents.append("\(code)\n")
} }
functionContentComponents.append("components.append(\(escape(suffix)));\n") functionContentComponents.append("components.append(\(escape(value: suffix)));\n")
} }
functionContentComponents.append("try! components.joinWithSeparator(\"\").writeToFile(\"\(outputPath)\", atomically: false, encoding: NSUTF8StringEncoding)") functionContentComponents.append("try! components.joinWithSeparator(\"\").writeToFile(\"\(outputPath)\", atomically: false, encoding: NSUTF8StringEncoding)")
return functionContentComponents.joinWithSeparator("") return functionContentComponents.joined(separator: "")
} }
func runCommand(path: String) { func runCommand(path: String) {
@ -71,34 +71,34 @@ func runCommand(path: String) {
task.waitUntilExit() task.waitUntilExit()
if task.terminationReason != NSTaskTerminationReason.Exit { if task.terminationReason != NSTaskTerminationReason.exit {
exit(-1) exit(-1)
} }
} }
let files = fileManager.subpathsAtPath(sourceFilesRoot) let files = try fileManager.subpathsOfDirectory(atPath: sourceFilesRoot)
var generateAllFiles = ["// Generated code\n", "import Foundation\n"] var generateAllFiles = ["// Generated code\n", "import Foundation\n"]
for file in files! { for file in files {
if ((file as NSString).pathExtension ?? "") != "tt" { if ((file as NSString).pathExtension ?? "") != "tt" {
continue continue
} }
print(file) print(file)
let path = (sourceFilesRoot as NSString).stringByAppendingPathComponent(file as String) let path = (sourceFilesRoot as NSString).appendingPathComponent(file as String)
let outputPath = path.substringToIndex(path.endIndex.predecessor().predecessor().predecessor()) + ".swift" let outputPath = path.substring(to: path.endIndex.predecessor().predecessor().predecessor()) + ".swift"
generateAllFiles.append("_ = { () -> Void in\n\(processFile(path, outputPath: outputPath))\n}()\n") generateAllFiles.append("_ = { () -> Void in\n\(processFile(path: path, outputPath: outputPath))\n}()\n")
} }
let script = generateAllFiles.joinWithSeparator("") let script = generateAllFiles.joined(separator: "")
let scriptPath = (derivedData as NSString).stringByAppendingPathComponent("_preprocessor.sh") let scriptPath = (derivedData as NSString).appendingPathComponent("_preprocessor.sh")
do { do {
try script.writeToFile(scriptPath, atomically: true, encoding: NSUTF8StringEncoding) try script.write(toFile: scriptPath, atomically: true, encoding: NSUTF8StringEncoding)
} catch _ { } catch _ {
} }
runCommand(scriptPath) runCommand(path: scriptPath)

View File

@ -335,7 +335,7 @@ extension NSError {
#if !RX_NO_MODULE #if !RX_NO_MODULE
@noreturn func rxFatalError(lastMessage: String) { @noreturn func rxFatalError(_ lastMessage: String) {
// The temptation to comment this line is great, but please don't, it's for your own good. The choice is yours. // The temptation to comment this line is great, but please don't, it's for your own good. The choice is yours.
fatalError(lastMessage) fatalError(lastMessage)
} }

View File

@ -18,7 +18,7 @@ struct PriorityQueue<Element: AnyObject> {
mutating func enqueue(element: Element) { mutating func enqueue(element: Element) {
_elements.append(element) _elements.append(element)
bubbleToHigherPriority(_elements.count - 1) bubbleToHigherPriority(initialUnbalancedIndex: _elements.count - 1)
} }
func peek() -> Element? { func peek() -> Element? {
@ -34,7 +34,7 @@ struct PriorityQueue<Element: AnyObject> {
return nil return nil
} }
removeAt(0) removeAt(index: 0)
return front return front
} }
@ -42,7 +42,7 @@ struct PriorityQueue<Element: AnyObject> {
mutating func remove(element: Element) { mutating func remove(element: Element) {
for i in 0 ..< _elements.count { for i in 0 ..< _elements.count {
if _elements[i] === element { if _elements[i] === element {
removeAt(i) removeAt(index: i)
return return
} }
} }
@ -57,8 +57,8 @@ struct PriorityQueue<Element: AnyObject> {
_elements.popLast() _elements.popLast()
if !removingLast { if !removingLast {
bubbleToHigherPriority(index) bubbleToHigherPriority(initialUnbalancedIndex: index)
bubbleToLowerPriority(index) bubbleToLowerPriority(initialUnbalancedIndex: index)
} }
} }

View File

@ -43,7 +43,7 @@ public class RefCountDisposable : DisposeBase, Cancelable {
if let _ = _disposable { if let _ = _disposable {
do { do {
try incrementChecked(&_count) try incrementChecked(i: &_count)
} catch (_) { } catch (_) {
rxFatalError("RefCountDisposable increment failed") rxFatalError("RefCountDisposable increment failed")
} }
@ -83,7 +83,7 @@ public class RefCountDisposable : DisposeBase, Cancelable {
let oldDisposable: Disposable? = _lock.calculateLocked { let oldDisposable: Disposable? = _lock.calculateLocked {
if let oldDisposable = _disposable { if let oldDisposable = _disposable {
do { do {
try decrementChecked(&_count) try decrementChecked(i: &_count)
} catch (_) { } catch (_) {
rxFatalError("RefCountDisposable decrement on release failed") rxFatalError("RefCountDisposable decrement on release failed")
} }

View File

@ -46,7 +46,7 @@ public class SingleAssignmentDisposable : DisposeBase, Disposable, Cancelable {
return _disposable ?? NopDisposable.instance return _disposable ?? NopDisposable.instance
} }
set { set {
_setDisposable(newValue)?.dispose() _setDisposable(newValue: newValue)?.dispose()
} }
} }

View File

@ -9,7 +9,7 @@
import Foundation import Foundation
public final class StableCompositeDisposable { public final class StableCompositeDisposable {
public static func create(disposable1: Disposable, _ disposable2: Disposable) -> Disposable { public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Disposable {
return BinaryDisposable(disposable1, disposable2) return BinaryDisposable(disposable1, disposable2)
} }
} }

View File

@ -21,7 +21,7 @@ extension ObservableType {
let observer = AnonymousObserver { e in let observer = AnonymousObserver { e in
on(event: e) on(event: e)
} }
return self.subscribeSafe(observer) return self.subscribeSafe(observer: observer)
} }
/** /**
@ -60,7 +60,7 @@ extension ObservableType {
} }
} }
return BinaryDisposable( return BinaryDisposable(
self.subscribeSafe(observer), self.subscribeSafe(observer: observer),
disposable disposable
) )
} }
@ -79,7 +79,7 @@ extension ObservableType {
onNext(value) onNext(value)
} }
} }
return self.subscribeSafe(observer) return self.subscribeSafe(observer: observer)
} }
/** /**
@ -96,7 +96,7 @@ extension ObservableType {
onError(error) onError(error)
} }
} }
return self.subscribeSafe(observer) return self.subscribeSafe(observer: observer)
} }
/** /**
@ -113,7 +113,7 @@ extension ObservableType {
onCompleted() onCompleted()
} }
} }
return self.subscribeSafe(observer) return self.subscribeSafe(observer: observer)
} }
} }
@ -123,6 +123,6 @@ public extension ObservableType {
*/ */
@warn_unused_result(message: "http://git.io/rxs.ud") @warn_unused_result(message: "http://git.io/rxs.ud")
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable { func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
return self.asObservable().subscribe(observer) return self.asObservable().subscribe(observer: observer)
} }
} }

View File

@ -53,7 +53,7 @@ class BufferTimeCountSink<Element, O: ObserverType where O.E == [Element]>
func run() -> Disposable { func run() -> Disposable {
createTimer(_windowID) createTimer(_windowID)
return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(self)) return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(observer: self))
} }
func startNewWindowAndSendCurrentOne() { func startNewWindowAndSendCurrentOne() {

View File

@ -21,7 +21,7 @@ class CatchSinkProxy<O: ObserverType> : ObserverType {
} }
func on(event: Event<E>) { func on(event: Event<E>) {
_parent.forwardOn(event) _parent.forwardOn(event: event)
switch event { switch event {
case .Next: case .Next:
@ -47,7 +47,7 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
func run() -> Disposable { func run() -> Disposable {
let d1 = SingleAssignmentDisposable() let d1 = SingleAssignmentDisposable()
_subscription.disposable = d1 _subscription.disposable = d1
d1.disposable = _parent._source.subscribe(self) d1.disposable = _parent._source.subscribe(observer: self)
return _subscription return _subscription
} }
@ -55,9 +55,9 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
func on(event: Event<E>) { func on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
forwardOn(event) forwardOn(event: event)
case .Completed: case .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Error(let error): case .Error(let error):
do { do {
@ -68,7 +68,7 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
_subscription.disposable = catchSequence.subscribe(observer) _subscription.disposable = catchSequence.subscribe(observer)
} }
catch let e { catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
} }
} }
@ -110,26 +110,26 @@ class CatchSequenceSink<S: Sequence, O: ObserverType where S.Iterator.Element :
func on(event: Event<Element>) { func on(event: Event<Element>) {
switch event { switch event {
case .Next: case .Next:
forwardOn(event) forwardOn(event: event)
case .Error(let error): case .Error(let error):
_lastError = error _lastError = error
schedule(.MoveNext) schedule(command: .MoveNext)
case .Completed: case .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
} }
} }
override func subscribeToNext(source: Observable<E>) -> Disposable { override func subscribeToNext(source: Observable<E>) -> Disposable {
return source.subscribe(self) return source.subscribe(observer: self)
} }
override func done() { override func done() {
if let lastError = _lastError { if let lastError = _lastError {
forwardOn(.Error(lastError)) forwardOn(event: .Error(lastError))
} }
else { else {
forwardOn(.Completed) forwardOn(event: .Completed)
} }
self.dispose() self.dispose()
@ -156,7 +156,7 @@ class CatchSequence<S: Sequence where S.Iterator.Element : ObservableConvertible
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = CatchSequenceSink<S, O>(observer: observer) let sink = CatchSequenceSink<S, O>(observer: observer)
sink.disposable = sink.run((self.sources.makeIterator(), nil)) sink.disposable = sink.run(sources: (self.sources.makeIterator(), nil))
return sink return sink
} }
} }

View File

@ -51,7 +51,7 @@ class CombineLatestCollectionTypeSink<C: Collection, R, O: ObserverType where C.
if _numberOfValues < _parent._count { if _numberOfValues < _parent._count {
let numberOfOthersThatAreDone = self._numberOfDone - (_isDone[atIndex] ? 1 : 0) let numberOfOthersThatAreDone = self._numberOfDone - (_isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self._parent._count - 1 { if numberOfOthersThatAreDone == self._parent._count - 1 {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
return return
@ -59,15 +59,15 @@ class CombineLatestCollectionTypeSink<C: Collection, R, O: ObserverType where C.
do { do {
let result = try _parent._resultSelector(_values.map { $0! }) let result = try _parent._resultSelector(_values.map { $0! })
forwardOn(.Next(result)) forwardOn(event: .Next(result))
} }
catch let error { catch let error {
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
} }
case .Error(let error): case .Error(let error):
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
case .Completed: case .Completed:
if _isDone[atIndex] { if _isDone[atIndex] {
@ -78,7 +78,7 @@ class CombineLatestCollectionTypeSink<C: Collection, R, O: ObserverType where C.
_numberOfDone += 1 _numberOfDone += 1
if _numberOfDone == self._parent._count { if _numberOfDone == self._parent._count {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
else { else {
@ -93,8 +93,8 @@ class CombineLatestCollectionTypeSink<C: Collection, R, O: ObserverType where C.
for i in _parent._sources.startIndex ..< _parent._sources.endIndex { for i in _parent._sources.startIndex ..< _parent._sources.endIndex {
let index = j let index = j
let source = _parent._sources[i].asObservable() let source = _parent._sources[i].asObservable()
_subscriptions[j].disposable = source.subscribe(AnyObserver { event in _subscriptions[j].disposable = source.subscribe(observer: AnyObserver { event in
self.on(event, atIndex: index) self.on(event: event, atIndex: index)
}) })
j += 1 j += 1

View File

@ -48,10 +48,10 @@ class CombineLatestSink<O: ObserverType>
if _numberOfValues == _arity { if _numberOfValues == _arity {
do { do {
let result = try getResult() let result = try getResult()
forwardOn(.Next(result)) forwardOn(event: .Next(result))
} }
catch let e { catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
} }
} }
@ -66,14 +66,14 @@ class CombineLatestSink<O: ObserverType>
} }
if allOthersDone { if allOthersDone {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
} }
} }
func fail(error: ErrorProtocol) { func fail(error: ErrorProtocol) {
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
} }
@ -86,7 +86,7 @@ class CombineLatestSink<O: ObserverType>
_numberOfDone += 1 _numberOfDone += 1
if _numberOfDone == _arity { if _numberOfDone == _arity {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
} }
@ -115,20 +115,20 @@ class CombineLatestObserver<ElementType>
} }
func on(event: Event<Element>) { func on(event: Event<Element>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<Element>) { func _synchronized_on(event: Event<Element>) {
switch event { switch event {
case .Next(let value): case .Next(let value):
_setLatestValue(value) _setLatestValue(value)
_parent.next(_index) _parent.next(index: _index)
case .Error(let error): case .Error(let error):
_this.dispose() _this.dispose()
_parent.fail(error) _parent.fail(error: error)
case .Completed: case .Completed:
_this.dispose() _this.dispose()
_parent.done(_index) _parent.done(index: _index)
} }
} }
} }

View File

@ -31,7 +31,7 @@ class ConcatSink<S: Sequence, O: ObserverType where S.Iterator.Element : Observa
} }
override func subscribeToNext(source: Observable<E>) -> Disposable { override func subscribeToNext(source: Observable<E>) -> Disposable {
return source.subscribe(self) return source.subscribe(observer: self)
} }
override func extract(observable: Observable<E>) -> SequenceGenerator? { override func extract(observable: Observable<E>) -> SequenceGenerator? {

View File

@ -83,7 +83,7 @@ class ConnectableObservableAdapter<S: SubjectType>
return connection return connection
} }
let disposable = _source.subscribe(_subject.asObserver()) let disposable = _source.subscribe(observer: _subject.asObserver())
let connection = Connection(parent: self, lock: _lock, subscription: disposable) let connection = Connection(parent: self, lock: _lock, subscription: disposable)
_connection = connection _connection = connection
return connection return connection
@ -91,6 +91,6 @@ class ConnectableObservableAdapter<S: SubjectType>
} }
override func subscribe<O : ObserverType where O.E == S.E>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == S.E>(observer: O) -> Disposable {
return _subject.subscribe(observer) return _subject.subscribe(observer: observer)
} }
} }

View File

@ -71,7 +71,7 @@ class Debug<Element> : Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = Debug_(parent: self, observer: observer) let sink = Debug_(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -21,17 +21,17 @@ class DeferredSink<S: ObservableType, O: ObserverType where S.E == O.E> : Sink<O
func run() -> Disposable { func run() -> Disposable {
do { do {
let result = try _observableFactory() let result = try _observableFactory()
return result.subscribe(self) return result.subscribe(observer: self)
} }
catch let e { catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
return NopDisposable.instance return NopDisposable.instance
} }
} }
func on(event: Event<E>) { func on(event: Event<E>) {
forwardOn(event) forwardOn(event: event)
switch event { switch event {
case .Next: case .Next:

View File

@ -44,7 +44,7 @@ class DelaySubscription<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DelaySubscriptionSink(parent: self, observer: observer) let sink = DelaySubscriptionSink(parent: self, observer: observer)
sink.disposable = _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in sink.disposable = _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in
return self._source.subscribe(sink) return self._source.subscribe(observer: sink)
} }
return sink return sink

View File

@ -35,14 +35,14 @@ class DistinctUntilChangedSink<O: ObserverType, Key>: Sink<O>, ObserverType {
_currentKey = key _currentKey = key
forwardOn(event) forwardOn(event: event)
} }
catch let error { catch let error {
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
} }
case .Error, .Completed: case .Error, .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
} }
} }
@ -64,7 +64,7 @@ class DistinctUntilChanged<Element, Key>: Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DistinctUntilChangedSink(parent: self, observer: observer) let sink = DistinctUntilChangedSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -47,7 +47,7 @@ class Do<Element> : Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DoSink(parent: self, observer: observer) let sink = DoSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -27,27 +27,27 @@ class ElementAtSink<SourceType, O: ObserverType where O.E == SourceType> : Sink<
case .Next(_): case .Next(_):
if (_i == 0) { if (_i == 0) {
forwardOn(event) forwardOn(event: event)
forwardOn(.Completed) forwardOn(event: .Completed)
self.dispose() self.dispose()
} }
do { do {
try decrementChecked(&_i) try decrementChecked(i: &_i)
} catch(let e) { } catch(let e) {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
return return
} }
case .Error(let e): case .Error(let e):
forwardOn(.Error(e)) forwardOn(event: .Error(e))
self.dispose() self.dispose()
case .Completed: case .Completed:
if (_parent._throwOnEmpty) { if (_parent._throwOnEmpty) {
forwardOn(.Error(RxError.ArgumentOutOfRange)) forwardOn(event: .Error(RxError.ArgumentOutOfRange))
} else { } else {
forwardOn(.Completed) forwardOn(event: .Completed)
} }
self.dispose() self.dispose()
@ -73,7 +73,7 @@ class ElementAt<SourceType> : Producer<SourceType> {
override func run<O: ObserverType where O.E == SourceType>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == SourceType>(observer: O) -> Disposable {
let sink = ElementAtSink(parent: self, observer: observer) let sink = ElementAtSink(parent: self, observer: observer)
sink.disposable = _source.subscribeSafe(sink) sink.disposable = _source.subscribeSafe(observer: sink)
return sink return sink
} }
} }

View File

@ -10,7 +10,7 @@ import Foundation
class Empty<Element> : Producer<Element> { class Empty<Element> : Producer<Element> {
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed) observer.on(event: .Completed)
return NopDisposable.instance return NopDisposable.instance
} }
} }

View File

@ -16,7 +16,7 @@ class Error<Element> : Producer<Element> {
} }
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Error(_error)) observer.on(event: .Error(_error))
return NopDisposable.instance return NopDisposable.instance
} }
} }

View File

@ -54,7 +54,7 @@ class Filter<Element> : Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = FilterSink(predicate: _predicate, observer: observer) let sink = FilterSink(predicate: _predicate, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -22,7 +22,7 @@ class GenerateSink<S, O: ObserverType> : Sink<O> {
} }
func run() -> Disposable { func run() -> Disposable {
return _parent._scheduler.scheduleRecursive(true) { (isFirst, recurse) -> Void in return _parent._scheduler.scheduleRecursive(state: true) { (isFirst, recurse) -> Void in
do { do {
if !isFirst { if !isFirst {
self._state = try self._parent._iterate(self._state) self._state = try self._parent._iterate(self._state)
@ -30,17 +30,17 @@ class GenerateSink<S, O: ObserverType> : Sink<O> {
if try self._parent._condition(self._state) { if try self._parent._condition(self._state) {
let result = try self._parent._resultSelector(self._state) let result = try self._parent._resultSelector(self._state)
self.forwardOn(.Next(result)) self.forwardOn(event: .Next(result))
recurse(false) recurse(false)
} }
else { else {
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
self.dispose() self.dispose()
} }
} }
catch let error { catch let error {
self.forwardOn(.Error(error)) self.forwardOn(event: .Error(error))
self.dispose() self.dispose()
} }
} }

View File

@ -20,10 +20,10 @@ class JustScheduledSink<O: ObserverType> : Sink<O> {
func run() -> Disposable { func run() -> Disposable {
let scheduler = _parent._scheduler let scheduler = _parent._scheduler
return scheduler.schedule(_parent._element) { element in return scheduler.schedule(state: _parent._element) { element in
self.forwardOn(.Next(element)) self.forwardOn(event: .Next(element))
return scheduler.schedule(()) { _ in return scheduler.schedule(state: ()) { _ in
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
return NopDisposable.instance return NopDisposable.instance
} }
} }
@ -54,8 +54,8 @@ class Just<Element> : Producer<Element> {
} }
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Next(_element)) observer.on(event: .Next(_element))
observer.on(.Completed) observer.on(event: .Completed)
return NopDisposable.instance return NopDisposable.instance
} }
} }

View File

@ -93,7 +93,7 @@ class MapWithIndex<SourceType, ResultType> : Producer<ResultType> {
override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable {
let sink = MapWithIndexSink(selector: _selector, observer: observer) let sink = MapWithIndexSink(selector: _selector, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }
@ -128,7 +128,7 @@ class Map<SourceType, ResultType>: Producer<ResultType> {
override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable {
let sink = MapSink(selector: _selector, observer: observer) let sink = MapSink(selector: _selector, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }

View File

@ -31,7 +31,7 @@ class MergeLimitedSinkIter<S: ObservableConvertibleType, O: ObserverType where S
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -39,18 +39,18 @@ class MergeLimitedSinkIter<S: ObservableConvertibleType, O: ObserverType where S
case .Next: case .Next:
_parent.forwardOn(event) _parent.forwardOn(event)
case .Error: case .Error:
_parent.forwardOn(event) _parent.forwardOn(event: event)
_parent.dispose() _parent.dispose()
case .Completed: case .Completed:
_parent._group.removeDisposable(_disposeKey) _parent._group.removeDisposable(disposeKey: _disposeKey)
if let next = _parent._queue.dequeue() { if let next = _parent._queue.dequeue() {
_parent.subscribe(next, group: _parent._group) _parent.subscribe(innerSource: next, group: _parent._group)
} }
else { else {
_parent._activeCount = _parent._activeCount - 1 _parent._activeCount = _parent._activeCount - 1
if _parent._stopped && _parent._activeCount == 0 { if _parent._stopped && _parent._activeCount == 0 {
_parent.forwardOn(.Completed) _parent.forwardOn(event: .Completed)
_parent.dispose() _parent.dispose()
} }
} }
@ -81,14 +81,14 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
init(maxConcurrent: Int, observer: O) { init(maxConcurrent: Int, observer: O) {
_maxConcurrent = maxConcurrent _maxConcurrent = maxConcurrent
_group.addDisposable(_sourceSubscription) _group.addDisposable(disposable: _sourceSubscription)
super.init(observer: observer) super.init(observer: observer)
} }
func run(source: Observable<S>) -> Disposable { func run(source: Observable<S>) -> Disposable {
_group.addDisposable(_sourceSubscription) _group.addDisposable(disposable: _sourceSubscription)
let disposable = source.subscribe(self) let disposable = source.subscribe(observer: self)
_sourceSubscription.disposable = disposable _sourceSubscription.disposable = disposable
return _group return _group
} }
@ -96,7 +96,7 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
func subscribe(innerSource: E, group: CompositeDisposable) { func subscribe(innerSource: E, group: CompositeDisposable) {
let subscription = SingleAssignmentDisposable() let subscription = SingleAssignmentDisposable()
let key = group.addDisposable(subscription) let key = group.addDisposable(disposable: subscription)
if let key = key { if let key = key {
let observer = MergeLimitedSinkIter(parent: self, disposeKey: key) let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
@ -107,7 +107,7 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -119,19 +119,19 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
subscribe = true subscribe = true
} }
else { else {
_queue.enqueue(value) _queue.enqueue(element: value)
subscribe = false subscribe = false
} }
if subscribe { if subscribe {
self.subscribe(value, group: _group) self.subscribe(innerSource: value, group: _group)
} }
case .Error(let error): case .Error(let error):
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
case .Completed: case .Completed:
if _activeCount == 0 { if _activeCount == 0 {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
else { else {
@ -154,7 +154,7 @@ class MergeLimited<S: ObservableConvertibleType> : Producer<S.E> {
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer) let sink = MergeLimitedSink<S, O>(maxConcurrent: _maxConcurrent, observer: observer)
sink.disposable = sink.run(_source) sink.disposable = sink.run(source: _source)
return sink return sink
} }
} }
@ -200,7 +200,7 @@ final class FlatMapWithIndexSink<SourceType, S: ObservableConvertibleType, O: Ob
} }
override func performMap(element: SourceType) throws -> S { override func performMap(element: SourceType) throws -> S {
return try _selector(element, try incrementChecked(&_index)) return try _selector(element, try incrementChecked(i: &_index))
} }
} }
@ -245,15 +245,15 @@ class MergeSinkIter<SourceType, S: ObservableConvertibleType, O: ObserverType wh
switch event { switch event {
case .Next(let value): case .Next(let value):
_parent._lock.lock(); defer { _parent._lock.unlock() } // lock { _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
_parent.forwardOn(.Next(value)) _parent.forwardOn(event: .Next(value))
// } // }
case .Error(let error): case .Error(let error):
_parent._lock.lock(); defer { _parent._lock.unlock() } // lock { _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
_parent.forwardOn(.Error(error)) _parent.forwardOn(event: .Error(error))
_parent.dispose() _parent.dispose()
// } // }
case .Completed: case .Completed:
_parent._group.removeDisposable(_disposeKey) _parent._group.removeDisposable(disposeKey: _disposeKey)
// If this has returned true that means that `Completed` should be sent. // If this has returned true that means that `Completed` should be sent.
// In case there is a race who will sent first completed, // In case there is a race who will sent first completed,
// lock will sort it out. When first Completed message is sent // lock will sort it out. When first Completed message is sent
@ -261,7 +261,7 @@ class MergeSinkIter<SourceType, S: ObservableConvertibleType, O: ObserverType wh
// to be sent, and thus preserving the sequence grammar. // to be sent, and thus preserving the sequence grammar.
if _parent._stopped && _parent._group.count == MergeNoIterators { if _parent._stopped && _parent._group.count == MergeNoIterators {
_parent._lock.lock(); defer { _parent._lock.unlock() } // lock { _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
_parent.forwardOn(.Completed) _parent.forwardOn(event: .Completed)
_parent.dispose() _parent.dispose()
// } // }
} }
@ -303,23 +303,23 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
return return
} }
do { do {
let value = try performMap(element) let value = try performMap(element: element)
subscribeInner(value.asObservable()) subscribeInner(value.asObservable())
} }
catch let e { catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
} }
case .Error(let error): case .Error(let error):
_lock.lock(); defer { _lock.unlock() } // lock { _lock.lock(); defer { _lock.unlock() } // lock {
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
// } // }
case .Completed: case .Completed:
_lock.lock(); defer { _lock.unlock() } // lock { _lock.lock(); defer { _lock.unlock() } // lock {
_stopped = true _stopped = true
if _group.count == MergeNoIterators { if _group.count == MergeNoIterators {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
else { else {
@ -331,7 +331,7 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
func subscribeInner(source: Observable<O.E>) { func subscribeInner(source: Observable<O.E>) {
let iterDisposable = SingleAssignmentDisposable() let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = _group.addDisposable(iterDisposable) { if let disposeKey = _group.addDisposable(disposable: iterDisposable) {
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey) let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter) let subscription = source.subscribe(iter)
iterDisposable.disposable = subscription iterDisposable.disposable = subscription
@ -339,9 +339,9 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
} }
func run(source: Observable<SourceType>) -> Disposable { func run(source: Observable<SourceType>) -> Disposable {
_group.addDisposable(_sourceSubscription) _group.addDisposable(disposable: _sourceSubscription)
let subscription = source.subscribe(self) let subscription = source.subscribe(observer: self)
_sourceSubscription.disposable = subscription _sourceSubscription.disposable = subscription
return _group return _group
@ -364,7 +364,7 @@ final class FlatMap<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = FlatMapSink(selector: _selector, observer: observer) let sink = FlatMapSink(selector: _selector, observer: observer)
sink.disposable = sink.run(_source) sink.disposable = sink.run(source: _source)
return sink return sink
} }
} }
@ -383,7 +383,7 @@ final class FlatMapWithIndex<SourceType, S: ObservableConvertibleType>: Producer
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer) let sink = FlatMapWithIndexSink<SourceType, S, O>(selector: _selector, observer: observer)
sink.disposable = sink.run(_source) sink.disposable = sink.run(source: _source)
return sink return sink
} }
@ -403,7 +403,7 @@ final class FlatMapFirst<SourceType, S: ObservableConvertibleType>: Producer<S.E
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer) let sink = FlatMapFirstSink<SourceType, S, O>(selector: _selector, observer: observer)
sink.disposable = sink.run(_source) sink.disposable = sink.run(source: _source)
return sink return sink
} }
} }
@ -417,7 +417,7 @@ final class Merge<S: ObservableConvertibleType> : Producer<S.E> {
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = MergeBasicSink<S, O>(observer: observer) let sink = MergeBasicSink<S, O>(observer: observer)
sink.disposable = sink.run(_source) sink.disposable = sink.run(source: _source)
return sink return sink
} }
} }

View File

@ -27,20 +27,20 @@ class MulticastSink<S: SubjectType, O: ObserverType>: Sink<O>, ObserverType {
let observable = try _parent._selector(connectable) let observable = try _parent._selector(connectable)
let subscription = observable.subscribe(self) let subscription = observable.subscribe(observer: self)
let connection = connectable.connect() let connection = connectable.connect()
return BinaryDisposable(subscription, connection) return BinaryDisposable(subscription, connection)
} }
catch let e { catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
return NopDisposable.instance return NopDisposable.instance
} }
} }
func on(event: Event<ResultType>) { func on(event: Event<ResultType>) {
forwardOn(event) forwardOn(event: event)
switch event { switch event {
case .Next: break case .Next: break
case .Error, .Completed: case .Error, .Completed:

View File

@ -23,7 +23,7 @@ class ObserveOn<E> : Producer<E> {
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = ObserveOnSink(scheduler: scheduler, observer: observer) let sink = ObserveOnSink(scheduler: scheduler, observer: observer)
sink._subscription.disposable = source.subscribe(sink) sink._subscription.disposable = source.subscribe(observer: sink)
return sink return sink
} }
@ -63,7 +63,7 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
override func onCore(event: Event<E>) { override func onCore(event: Event<E>) {
let shouldStart = _lock.calculateLocked { () -> Bool in let shouldStart = _lock.calculateLocked { () -> Bool in
self._queue.enqueue(event) self._queue.enqueue(element: event)
switch self._state { switch self._state {
case .Stopped: case .Stopped:
@ -75,7 +75,7 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
} }
if shouldStart { if shouldStart {
_scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run) _scheduleDisposable.disposable = self._scheduler.scheduleRecursive(state: (), action: self.run)
} }
} }
@ -91,7 +91,7 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
} }
if let nextEvent = nextEvent { if let nextEvent = nextEvent {
observer?.on(nextEvent) observer?.on(event: nextEvent)
if nextEvent.isStopEvent { if nextEvent.isStopEvent {
dispose() dispose()
} }

View File

@ -68,7 +68,7 @@ class ObserveOnSerialDispatchQueue<E> : Producer<E> {
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer) let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer)
sink.subscription.disposable = source.subscribe(sink) sink.subscription.disposable = source.subscribe(observer: sink)
return sink return sink
} }

View File

@ -68,7 +68,7 @@ class Reduce<SourceType, AccumulateType, ResultType> : Producer<ResultType> {
override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable {
let sink = ReduceSink(parent: self, observer: observer) let sink = ReduceSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -24,12 +24,12 @@ class RetryTriggerSink<S: Sequence, O: ObserverType, TriggerObservable: Observab
switch event { switch event {
case .Next: case .Next:
_parent._parent._lastError = nil _parent._parent._lastError = nil
_parent._parent.schedule(.MoveNext) _parent._parent.schedule(command: .MoveNext)
case .Error(let e): case .Error(let e):
_parent._parent.forwardOn(.Error(e)) _parent._parent.forwardOn(event: .Error(e))
_parent._parent.dispose() _parent._parent.dispose()
case .Completed: case .Completed:
_parent._parent.forwardOn(.Completed) _parent._parent.forwardOn(event: .Completed)
_parent._parent.dispose() _parent._parent.dispose()
} }
} }
@ -51,7 +51,7 @@ class RetryWhenSequenceSinkIter<S: Sequence, O: ObserverType, TriggerObservable:
func on(event: Event<E>) { func on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
_parent.forwardOn(event) _parent.forwardOn(event: event)
case .Error(let error): case .Error(let error):
_parent._lastError = error _parent._lastError = error
@ -59,16 +59,16 @@ class RetryWhenSequenceSinkIter<S: Sequence, O: ObserverType, TriggerObservable:
// dispose current subscription // dispose current subscription
super.dispose() super.dispose()
let errorHandlerSubscription = _parent._notifier.subscribe(RetryTriggerSink(parent: self)) let errorHandlerSubscription = _parent._notifier.subscribe(observer: RetryTriggerSink(parent: self))
_errorHandlerSubscription.disposable = errorHandlerSubscription _errorHandlerSubscription.disposable = errorHandlerSubscription
_parent._errorSubject.on(.Next(failedWith)) _parent._errorSubject.on(event: .Next(failedWith))
} }
else { else {
_parent.forwardOn(.Error(error)) _parent.forwardOn(event: .Error(error))
_parent.dispose() _parent.dispose()
} }
case .Completed: case .Completed:
_parent.forwardOn(event) _parent.forwardOn(event: event)
_parent.dispose() _parent.dispose()
} }
} }
@ -101,11 +101,11 @@ class RetryWhenSequenceSink<S: Sequence, O: ObserverType, TriggerObservable: Obs
override func done() { override func done() {
if let lastError = _lastError { if let lastError = _lastError {
forwardOn(.Error(lastError)) forwardOn(event: .Error(lastError))
_lastError = nil _lastError = nil
} }
else { else {
forwardOn(.Completed) forwardOn(event: .Completed)
} }
dispose() dispose()
@ -120,13 +120,13 @@ class RetryWhenSequenceSink<S: Sequence, O: ObserverType, TriggerObservable: Obs
override func subscribeToNext(source: Observable<E>) -> Disposable { override func subscribeToNext(source: Observable<E>) -> Disposable {
let iter = RetryWhenSequenceSinkIter(parent: self) let iter = RetryWhenSequenceSinkIter(parent: self)
iter.disposable = source.subscribe(iter) iter.disposable = source.subscribe(observer: iter)
return iter return iter
} }
override func run(sources: SequenceGenerator) -> Disposable { override func run(sources: SequenceGenerator) -> Disposable {
let triggerSubscription = _handler.subscribe(_notifier.asObserver()) let triggerSubscription = _handler.subscribe(observer: _notifier.asObserver())
let superSubscription = super.run(sources) let superSubscription = super.run(sources: sources)
return StableCompositeDisposable.create(superSubscription, triggerSubscription) return StableCompositeDisposable.create(superSubscription, triggerSubscription)
} }
} }
@ -144,7 +144,7 @@ class RetryWhenSequence<S: Sequence, TriggerObservable: ObservableType, Error wh
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = RetryWhenSequenceSink<S, O, TriggerObservable, Error>(parent: self, observer: observer) let sink = RetryWhenSequenceSink<S, O, TriggerObservable, Error>(parent: self, observer: observer)
sink.disposable = sink.run((self._sources.makeIterator(), nil)) sink.disposable = sink.run(sources: (self._sources.makeIterator(), nil))
return sink return sink
} }
} }

View File

@ -27,7 +27,7 @@ class SamplerSink<O: ObserverType, ElementType, SampleType where O.E == ElementT
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -38,23 +38,23 @@ class SamplerSink<O: ObserverType, ElementType, SampleType where O.E == ElementT
_parent._element = nil _parent._element = nil
} }
_parent.forwardOn(.Next(element)) _parent.forwardOn(event: .Next(element))
} }
if _parent._atEnd { if _parent._atEnd {
_parent.forwardOn(.Completed) _parent.forwardOn(event: .Completed)
_parent.dispose() _parent.dispose()
} }
case .Error(let e): case .Error(let e):
_parent.forwardOn(.Error(e)) _parent.forwardOn(event: .Error(e))
_parent.dispose() _parent.dispose()
case .Completed: case .Completed:
if let element = _parent._element { if let element = _parent._element {
_parent._element = nil _parent._element = nil
_parent.forwardOn(.Next(element)) _parent.forwardOn(event: .Next(element))
} }
if _parent._atEnd { if _parent._atEnd {
_parent.forwardOn(.Completed) _parent.forwardOn(event: .Completed)
_parent.dispose() _parent.dispose()
} }
} }
@ -85,14 +85,14 @@ class SampleSequenceSink<O: ObserverType, SampleType>
} }
func run() -> Disposable { func run() -> Disposable {
_sourceSubscription.disposable = _parent._source.subscribe(self) _sourceSubscription.disposable = _parent._source.subscribe(observer: self)
let samplerSubscription = _parent._sampler.subscribe(SamplerSink(parent: self)) let samplerSubscription = _parent._sampler.subscribe(SamplerSink(parent: self))
return StableCompositeDisposable.create(_sourceSubscription, samplerSubscription) return StableCompositeDisposable.create(_sourceSubscription, samplerSubscription)
} }
func on(event: Event<Element>) { func on(event: Event<Element>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<Element>) { func _synchronized_on(event: Event<Element>) {
@ -100,7 +100,7 @@ class SampleSequenceSink<O: ObserverType, SampleType>
case .Next(let element): case .Next(let element):
_element = element _element = element
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Completed: case .Completed:
_atEnd = true _atEnd = true

View File

@ -58,7 +58,7 @@ class Scan<Element, Accumulate>: Producer<Accumulate> {
override func run<O : ObserverType where O.E == Accumulate>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Accumulate>(observer: O) -> Disposable {
let sink = ScanSink(parent: self, observer: observer) let sink = ScanSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -19,13 +19,13 @@ class ObservableSequenceSink<O: ObserverType> : Sink<O> {
} }
func run() -> Disposable { func run() -> Disposable {
return _parent._scheduler!.scheduleRecursive((0, _parent._elements)) { (state, recurse) in return _parent._scheduler!.scheduleRecursive(state: (0, _parent._elements)) { (state, recurse) in
if state.0 < state.1.count { if state.0 < state.1.count {
self.forwardOn(.Next(state.1[state.0])) self.forwardOn(event: .Next(state.1[state.0]))
recurse((state.0 + 1, state.1)) recurse((state.0 + 1, state.1))
} }
else { else {
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
} }
} }
} }
@ -44,10 +44,10 @@ class ObservableSequence<E> : Producer<E> {
// optimized version without scheduler // optimized version without scheduler
guard _scheduler != nil else { guard _scheduler != nil else {
for element in _elements { for element in _elements {
observer.on(.Next(element)) observer.on(event: .Next(element))
} }
observer.on(.Completed) observer.on(event: .Completed)
return NopDisposable.instance return NopDisposable.instance
} }

View File

@ -32,7 +32,7 @@ final class ShareReplay1<Element>
override func subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer) return _synchronized_subscribe(observer: observer)
} }
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
@ -41,19 +41,19 @@ final class ShareReplay1<Element>
} }
if let stopEvent = self._stopEvent { if let stopEvent = self._stopEvent {
observer.on(stopEvent) observer.on(event: stopEvent)
return NopDisposable.instance return NopDisposable.instance
} }
let initialCount = self._observers.count let initialCount = self._observers.count
let disposeKey = self._observers.insert(AnyObserver(observer)) let disposeKey = self._observers.insert(element: AnyObserver(observer))
if initialCount == 0 { if initialCount == 0 {
let connection = SingleAssignmentDisposable() let connection = SingleAssignmentDisposable()
_connection = connection _connection = connection
connection.disposable = self._source.subscribe(self) connection.disposable = self._source.subscribe(observer: self)
} }
return SubscriptionDisposable(owner: self, key: disposeKey) return SubscriptionDisposable(owner: self, key: disposeKey)
@ -61,12 +61,12 @@ final class ShareReplay1<Element>
func synchronizedUnsubscribe(disposeKey: DisposeKey) { func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey) _synchronized_unsubscribe(disposeKey: disposeKey)
} }
func _synchronized_unsubscribe(disposeKey: DisposeKey) { func _synchronized_unsubscribe(disposeKey: DisposeKey) {
// if already unsubscribed, just return // if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil { if self._observers.removeKey(key: disposeKey) == nil {
return return
} }
@ -78,7 +78,7 @@ final class ShareReplay1<Element>
func on(event: Event<E>) { func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_on(event) _synchronized_on(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -96,6 +96,6 @@ final class ShareReplay1<Element>
_connection = nil _connection = nil
} }
_observers.on(event) _observers.on(event: event)
} }
} }

View File

@ -30,23 +30,23 @@ final class ShareReplay1WhileConnected<Element>
override func subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer) return _synchronized_subscribe(observer: observer)
} }
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if let element = self._element { if let element = self._element {
observer.on(.Next(element)) observer.on(event: .Next(element))
} }
let initialCount = self._observers.count let initialCount = self._observers.count
let disposeKey = self._observers.insert(AnyObserver(observer)) let disposeKey = self._observers.insert(element: AnyObserver(observer))
if initialCount == 0 { if initialCount == 0 {
let connection = SingleAssignmentDisposable() let connection = SingleAssignmentDisposable()
_connection = connection _connection = connection
connection.disposable = self._source.subscribe(self) connection.disposable = self._source.subscribe(observer: self)
} }
return SubscriptionDisposable(owner: self, key: disposeKey) return SubscriptionDisposable(owner: self, key: disposeKey)
@ -54,12 +54,12 @@ final class ShareReplay1WhileConnected<Element>
func synchronizedUnsubscribe(disposeKey: DisposeKey) { func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey) _synchronized_unsubscribe(disposeKey: disposeKey)
} }
func _synchronized_unsubscribe(disposeKey: DisposeKey) { func _synchronized_unsubscribe(disposeKey: DisposeKey) {
// if already unsubscribed, just return // if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil { if self._observers.removeKey(key: disposeKey) == nil {
return return
} }
@ -72,21 +72,21 @@ final class ShareReplay1WhileConnected<Element>
func on(event: Event<E>) { func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_on(event) _synchronized_on(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
switch event { switch event {
case .Next(let element): case .Next(let element):
_element = element _element = element
_observers.on(event) _observers.on(event: event)
case .Error, .Completed: case .Error, .Completed:
_element = nil _element = nil
_connection?.dispose() _connection?.dispose()
_connection = nil _connection = nil
let observers = _observers let observers = _observers
_observers = Bag() _observers = Bag()
observers.on(event) observers.on(event: event)
} }
} }
} }

View File

@ -70,7 +70,7 @@ class SingleAsync<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SingleAsyncSink(parent: self, observer: observer) let sink = SingleAsyncSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -22,7 +22,7 @@ class Sink<O : ObserverType> : SingleAssignmentDisposable {
if disposed { if disposed {
return return
} }
_observer.on(event) _observer.on(event: event)
} }
final func forwarder() -> SinkForward<O> { final func forwarder() -> SinkForward<O> {
@ -48,9 +48,9 @@ class SinkForward<O: ObserverType>: ObserverType {
func on(event: Event<E>) { func on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
_forward._observer.on(event) _forward._observer.on(event: event)
case .Error, .Completed: case .Error, .Completed:
_forward._observer.on(event) _forward._observer.on(event: event)
_forward.dispose() _forward.dispose()
} }
} }

View File

@ -29,16 +29,16 @@ class SkipCountSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
case .Next(let value): case .Next(let value):
if remaining <= 0 { if remaining <= 0 {
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
else { else {
remaining -= 1 remaining -= 1
} }
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
self.dispose() self.dispose()
case .Completed: case .Completed:
forwardOn(event) forwardOn(event: event)
self.dispose() self.dispose()
} }
} }
@ -56,7 +56,7 @@ class SkipCount<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SkipCountSink(parent: self, observer: observer) let sink = SkipCountSink(parent: self, observer: observer)
sink.disposable = source.subscribe(sink) sink.disposable = source.subscribe(observer: sink)
return sink return sink
} }
@ -82,13 +82,13 @@ class SkipTimeSink<ElementType, O: ObserverType where O.E == ElementType> : Sink
switch event { switch event {
case .Next(let value): case .Next(let value):
if open { if open {
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
self.dispose() self.dispose()
case .Completed: case .Completed:
forwardOn(event) forwardOn(event: event)
self.dispose() self.dispose()
} }
} }
@ -98,12 +98,12 @@ class SkipTimeSink<ElementType, O: ObserverType where O.E == ElementType> : Sink
} }
func run() -> Disposable { func run() -> Disposable {
let disposeTimer = parent.scheduler.scheduleRelative((), dueTime: self.parent.duration) { let disposeTimer = parent.scheduler.scheduleRelative(state: (), dueTime: self.parent.duration) {
self.tick() self.tick()
return NopDisposable.instance return NopDisposable.instance
} }
let disposeSubscription = parent.source.subscribe(self) let disposeSubscription = parent.source.subscribe(observer: self)
return BinaryDisposable(disposeTimer, disposeSubscription) return BinaryDisposable(disposeTimer, disposeSubscription)
} }

View File

@ -31,7 +31,7 @@ class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -40,7 +40,7 @@ class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
_parent._forwardElements = true _parent._forwardElements = true
_subscription.dispose() _subscription.dispose()
case .Error(let e): case .Error(let e):
_parent.forwardOn(.Error(e)) _parent.forwardOn(event: .Error(e))
_parent.dispose() _parent.dispose()
case .Completed: case .Completed:
_subscription.dispose() _subscription.dispose()
@ -76,30 +76,30 @@ class SkipUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
if _forwardElements { if _forwardElements {
forwardOn(event) forwardOn(event: event)
} }
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Completed: case .Completed:
if _forwardElements { if _forwardElements {
forwardOn(event) forwardOn(event: event)
} }
_sourceSubscription.dispose() _sourceSubscription.dispose()
} }
} }
func run() -> Disposable { func run() -> Disposable {
let sourceSubscription = _parent._source.subscribe(self) let sourceSubscription = _parent._source.subscribe(observer: self)
let otherObserver = SkipUntilSinkOther(parent: self) let otherObserver = SkipUntilSinkOther(parent: self)
let otherSubscription = _parent._other.subscribe(otherObserver) let otherSubscription = _parent._other.subscribe(observer: otherObserver)
_sourceSubscription.disposable = sourceSubscription _sourceSubscription.disposable = sourceSubscription
otherObserver._subscription.disposable = otherSubscription otherObserver._subscription.disposable = otherSubscription

View File

@ -26,17 +26,17 @@ class SkipWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
do { do {
_running = try !_parent._predicate(value) _running = try !_parent._predicate(value)
} catch let e { } catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
return return
} }
} }
if _running { if _running {
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
case .Error, .Completed: case .Error, .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
} }
} }
@ -62,19 +62,19 @@ class SkipWhileSinkWithIndex<ElementType, O: ObserverType where O.E == ElementTy
if !_running { if !_running {
do { do {
_running = try !_parent._predicateWithIndex(value, _index) _running = try !_parent._predicateWithIndex(value, _index)
try incrementChecked(&_index) try incrementChecked(i: &_index)
} catch let e { } catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
return return
} }
} }
if _running { if _running {
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
case .Error, .Completed: case .Error, .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
} }
} }
@ -103,12 +103,12 @@ class SkipWhile<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if let _ = _predicate { if let _ = _predicate {
let sink = SkipWhileSink(parent: self, observer: observer) let sink = SkipWhileSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
else { else {
let sink = SkipWhileSinkWithIndex(parent: self, observer: observer) let sink = SkipWhileSinkWithIndex(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -34,7 +34,7 @@ class SubscribeOnSink<Ob: ObservableType, O: ObserverType where Ob.E == O.E> : S
disposeEverything.disposable = cancelSchedule disposeEverything.disposable = cancelSchedule
cancelSchedule.disposable = parent.scheduler.schedule(()) { (_) -> Disposable in cancelSchedule.disposable = parent.scheduler.schedule(()) { (_) -> Disposable in
let subscription = self.parent.source.subscribe(self) let subscription = self.parent.source.subscribe(observer: self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription) disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return NopDisposable.instance return NopDisposable.instance
} }

View File

@ -30,7 +30,7 @@ class SwitchSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
} }
func run(source: Observable<SourceType>) -> Disposable { func run(source: Observable<SourceType>) -> Disposable {
let subscription = source.subscribe(self) let subscription = source.subscribe(observer: self)
_subscriptions.disposable = subscription _subscriptions.disposable = subscription
return StableCompositeDisposable.create(_subscriptions, _innerSubscription) return StableCompositeDisposable.create(_subscriptions, _innerSubscription)
} }

View File

@ -63,7 +63,7 @@ class TakeCount<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeCountSink(parent: self, observer: observer) let sink = TakeCountSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }
@ -117,7 +117,7 @@ class TakeTimeSink<ElementType, O: ObserverType where O.E == ElementType>
return NopDisposable.instance return NopDisposable.instance
} }
let disposeSubscription = _parent._source.subscribe(self) let disposeSubscription = _parent._source.subscribe(observer: self)
return BinaryDisposable(disposeTimer, disposeSubscription) return BinaryDisposable(disposeTimer, disposeSubscription)
} }

View File

@ -26,18 +26,18 @@ class TakeLastSink<ElementType, O: ObserverType where O.E == ElementType> : Sink
func on(event: Event<E>) { func on(event: Event<E>) {
switch event { switch event {
case .Next(let value): case .Next(let value):
_elements.enqueue(value) _elements.enqueue(element: value)
if _elements.count > self._parent._count { if _elements.count > self._parent._count {
_elements.dequeue() _elements.dequeue()
} }
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Completed: case .Completed:
for e in _elements { for e in _elements {
forwardOn(.Next(e)) forwardOn(event: .Next(e))
} }
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
} }
@ -57,7 +57,7 @@ class TakeLast<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeLastSink(parent: self, observer: observer) let sink = TakeLastSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -31,16 +31,16 @@ class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
_parent.forwardOn(.Completed) _parent.forwardOn(event: .Completed)
_parent.dispose() _parent.dispose()
case .Error(let e): case .Error(let e):
_parent.forwardOn(.Error(e)) _parent.forwardOn(event: .Error(e))
_parent.dispose() _parent.dispose()
case .Completed: case .Completed:
_parent._open = true _parent._open = true
@ -76,27 +76,27 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
switch event { switch event {
case .Next: case .Next:
forwardOn(event) forwardOn(event: event)
case .Error: case .Error:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Completed: case .Completed:
forwardOn(event) forwardOn(event: event)
dispose() dispose()
} }
} }
func run() -> Disposable { func run() -> Disposable {
let otherObserver = TakeUntilSinkOther(parent: self) let otherObserver = TakeUntilSinkOther(parent: self)
let otherSubscription = _parent._other.subscribe(otherObserver) let otherSubscription = _parent._other.subscribe(observer: otherObserver)
otherObserver._subscription.disposable = otherSubscription otherObserver._subscription.disposable = otherSubscription
let sourceSubscription = _parent._source.subscribe(self) let sourceSubscription = _parent._source.subscribe(observer: self)
return StableCompositeDisposable.create(sourceSubscription, otherObserver._subscription) return StableCompositeDisposable.create(sourceSubscription, otherObserver._subscription)
} }

View File

@ -121,11 +121,11 @@ class TakeWhile<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if let _ = _predicate { if let _ = _predicate {
let sink = TakeWhileSink(parent: self, observer: observer) let sink = TakeWhileSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} else { } else {
let sink = TakeWhileSinkWithIndex(parent: self, observer: observer) let sink = TakeWhileSinkWithIndex(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -33,13 +33,13 @@ class ThrottleSink<O: ObserverType>
} }
func run() -> Disposable { func run() -> Disposable {
let subscription = _parent._source.subscribe(self) let subscription = _parent._source.subscribe(observer: self)
return StableCompositeDisposable.create(subscription, cancellable) return StableCompositeDisposable.create(subscription, cancellable)
} }
func on(event: Event<Element>) { func on(event: Event<Element>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<Element>) { func _synchronized_on(event: Event<Element>) {
@ -55,17 +55,17 @@ class ThrottleSink<O: ObserverType>
let d = SingleAssignmentDisposable() let d = SingleAssignmentDisposable()
self.cancellable.disposable = d self.cancellable.disposable = d
d.disposable = scheduler.scheduleRelative(currentId, dueTime: dueTime, action: self.propagate) d.disposable = scheduler.scheduleRelative(state: currentId, dueTime: dueTime, action: self.propagate)
case .Error: case .Error:
_value = nil _value = nil
forwardOn(event) forwardOn(event: event)
dispose() dispose()
case .Completed: case .Completed:
if let value = _value { if let value = _value {
_value = nil _value = nil
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
} }
@ -76,7 +76,7 @@ class ThrottleSink<O: ObserverType>
if let value = originalValue where _id == currentId { if let value = originalValue where _id == currentId {
_value = nil _value = nil
forwardOn(.Next(value)) forwardOn(event: .Next(value))
} }
// } // }
return NopDisposable.instance return NopDisposable.instance

View File

@ -19,8 +19,8 @@ class TimerSink<O: ObserverType where O.E : SignedInteger > : Sink<O> {
} }
func run() -> Disposable { func run() -> Disposable {
return _parent._scheduler.schedulePeriodic(0 as O.E, startAfter: _parent._dueTime, period: _parent._period!) { state in return _parent._scheduler.schedulePeriodic(state: 0 as O.E, startAfter: _parent._dueTime, period: _parent._period!) { state in
self.forwardOn(.Next(state)) self.forwardOn(event: .Next(state))
return state &+ 1 return state &+ 1
} }
} }
@ -37,9 +37,9 @@ class TimerOneOffSink<O: ObserverType where O.E : SignedInteger> : Sink<O> {
} }
func run() -> Disposable { func run() -> Disposable {
return _parent._scheduler.scheduleRelative((), dueTime: _parent._dueTime) { (_) -> Disposable in return _parent._scheduler.scheduleRelative(state: (), dueTime: _parent._dueTime) { (_) -> Disposable in
self.forwardOn(.Next(0)) self.forwardOn(event: .Next(0))
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
return NopDisposable.instance return NopDisposable.instance
} }

View File

@ -44,7 +44,7 @@ class ToArray<SourceType> : Producer<[SourceType]> {
override func run<O: ObserverType where O.E == [SourceType]>(observer: O) -> Disposable { override func run<O: ObserverType where O.E == [SourceType]>(observer: O) -> Disposable {
let sink = ToArraySink(parent: self, observer: observer) let sink = ToArraySink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink) sink.disposable = _source.subscribe(observer: sink)
return sink return sink
} }
} }

View File

@ -29,12 +29,12 @@ class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType where O.E
let source = try _parent._observableFactory(resource) let source = try _parent._observableFactory(resource)
return StableCompositeDisposable.create( return StableCompositeDisposable.create(
source.subscribe(self), source.subscribe(observer: self),
disposable disposable
) )
} catch let error { } catch let error {
return StableCompositeDisposable.create( return StableCompositeDisposable.create(
Observable.error(error).subscribe(self), Observable.error(error).subscribe(observer: self),
disposable disposable
) )
} }

View File

@ -31,7 +31,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
init(parent: Parent, observer: O) { init(parent: Parent, observer: O) {
_parent = parent _parent = parent
_groupDisposable.addDisposable(_timerD) _groupDisposable.addDisposable(disposable: _timerD)
_refCountDisposable = RefCountDisposable(disposable: _groupDisposable) _refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
super.init(observer: observer) super.init(observer: observer)
@ -39,22 +39,22 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
func run() -> Disposable { func run() -> Disposable {
forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) forwardOn(event: .Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
createTimer(_windowId) createTimer(windowId: _windowId)
_groupDisposable.addDisposable(_parent._source.subscribeSafe(self)) _groupDisposable.addDisposable(disposable: _parent._source.subscribeSafe(observer: self))
return _refCountDisposable return _refCountDisposable
} }
func startNewWindowAndCompleteCurrentOne() { func startNewWindowAndCompleteCurrentOne() {
_subject.on(.Completed) _subject.on(event: .Completed)
_subject = PublishSubject<Element>() _subject = PublishSubject<Element>()
forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) forwardOn(event: .Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -63,12 +63,12 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
switch event { switch event {
case .Next(let element): case .Next(let element):
_subject.on(.Next(element)) _subject.on(event: .Next(element))
do { do {
try incrementChecked(&_count) try incrementChecked(i: &_count)
} catch (let e) { } catch (let e) {
_subject.on(.Error(e as ErrorProtocol)) _subject.on(event: .Error(e as ErrorProtocol))
dispose() dispose()
} }
@ -81,17 +81,17 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
} }
case .Error(let error): case .Error(let error):
_subject.on(.Error(error)) _subject.on(event: .Error(error))
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
case .Completed: case .Completed:
_subject.on(.Completed) _subject.on(event: .Completed)
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
if newWindow { if newWindow {
createTimer(newId) createTimer(windowId: newId)
} }
} }
@ -108,7 +108,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
_timerD.disposable = nextTimer _timerD.disposable = nextTimer
nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in nextTimer.disposable = _parent._scheduler.scheduleRelative(state: windowId, dueTime: _parent._timeSpan) { previousWindowId in
var newId = 0 var newId = 0
@ -123,7 +123,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
self.startNewWindowAndCompleteCurrentOne() self.startNewWindowAndCompleteCurrentOne()
} }
self.createTimer(newId) self.createTimer(windowId: newId)
return NopDisposable.instance return NopDisposable.instance
} }

View File

@ -32,14 +32,14 @@ class WithLatestFromSink<FirstType, SecondType, ResultType, O: ObserverType wher
let sndSubscription = SingleAssignmentDisposable() let sndSubscription = SingleAssignmentDisposable()
let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription) let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
sndSubscription.disposable = _parent._second.subscribe(sndO) sndSubscription.disposable = _parent._second.subscribe(observer: sndO)
let fstSubscription = _parent._first.subscribe(self) let fstSubscription = _parent._first.subscribe(observer: self)
return StableCompositeDisposable.create(fstSubscription, sndSubscription) return StableCompositeDisposable.create(fstSubscription, sndSubscription)
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -49,16 +49,16 @@ class WithLatestFromSink<FirstType, SecondType, ResultType, O: ObserverType wher
do { do {
let res = try _parent._resultSelector(value, latest) let res = try _parent._resultSelector(value, latest)
forwardOn(.Next(res)) forwardOn(event: .Next(res))
} catch let e { } catch let e {
forwardOn(.Error(e)) forwardOn(event: .Error(e))
dispose() dispose()
} }
case .Completed: case .Completed:
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
case let .Error(error): case let .Error(error):
forwardOn(.Error(error)) forwardOn(event: .Error(error))
dispose() dispose()
} }
} }
@ -85,7 +85,7 @@ class WithLatestFromSecond<FirstType, SecondType, ResultType, O: ObserverType wh
} }
func on(event: Event<E>) { func on(event: Event<E>) {
synchronizedOn(event) synchronizedOn(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -95,7 +95,7 @@ class WithLatestFromSecond<FirstType, SecondType, ResultType, O: ObserverType wh
case .Completed: case .Completed:
_disposable.dispose() _disposable.dispose()
case let .Error(error): case let .Error(error):
_parent.forwardOn(.Error(error)) _parent.forwardOn(event: .Error(error))
_parent.dispose() _parent.dispose()
} }
} }

View File

@ -42,7 +42,7 @@ class ZipCollectionTypeSink<C: Collection, R, O: ObserverType where C.Iterator.E
_lock.lock(); defer { _lock.unlock() } // { _lock.lock(); defer { _lock.unlock() } // {
switch event { switch event {
case .Next(let element): case .Next(let element):
_values[atIndex].enqueue(element) _values[atIndex].enqueue(element: element)
if _values[atIndex].count == 1 { if _values[atIndex].count == 1 {
_numberOfValues += 1 _numberOfValues += 1
@ -51,7 +51,7 @@ class ZipCollectionTypeSink<C: Collection, R, O: ObserverType where C.Iterator.E
if _numberOfValues < _parent.count { if _numberOfValues < _parent.count {
let numberOfOthersThatAreDone = _numberOfDone - (_isDone[atIndex] ? 1 : 0) let numberOfOthersThatAreDone = _numberOfDone - (_isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == _parent.count - 1 { if numberOfOthersThatAreDone == _parent.count - 1 {
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
self.dispose() self.dispose()
} }
return return
@ -72,15 +72,15 @@ class ZipCollectionTypeSink<C: Collection, R, O: ObserverType where C.Iterator.E
} }
let result = try _parent.resultSelector(arguments) let result = try _parent.resultSelector(arguments)
self.forwardOn(.Next(result)) self.forwardOn(event: .Next(result))
} }
catch let error { catch let error {
self.forwardOn(.Error(error)) self.forwardOn(event: .Error(error))
self.dispose() self.dispose()
} }
case .Error(let error): case .Error(let error):
self.forwardOn(.Error(error)) self.forwardOn(event: .Error(error))
self.dispose() self.dispose()
case .Completed: case .Completed:
if _isDone[atIndex] { if _isDone[atIndex] {
@ -91,7 +91,7 @@ class ZipCollectionTypeSink<C: Collection, R, O: ObserverType where C.Iterator.E
_numberOfDone += 1 _numberOfDone += 1
if _numberOfDone == _parent.count { if _numberOfDone == _parent.count {
self.forwardOn(.Completed) self.forwardOn(event: .Completed)
self.dispose() self.dispose()
} }
else { else {
@ -106,8 +106,8 @@ class ZipCollectionTypeSink<C: Collection, R, O: ObserverType where C.Iterator.E
for i in _parent.sources.startIndex ..< _parent.sources.endIndex { for i in _parent.sources.startIndex ..< _parent.sources.endIndex {
let index = j let index = j
let source = _parent.sources[i].asObservable() let source = _parent.sources[i].asObservable()
_subscriptions[j].disposable = source.subscribe(AnyObserver { event in _subscriptions[j].disposable = source.subscribe(observer: AnyObserver { event in
self.on(event, atIndex: index) self.on(event: event, atIndex: index)
}) })
j += 1 j += 1
} }

View File

@ -63,11 +63,11 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
let subscription1 = SingleAssignmentDisposable() let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable() let subscription2 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -159,13 +159,13 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
let subscription2 = SingleAssignmentDisposable() let subscription2 = SingleAssignmentDisposable()
let subscription3 = SingleAssignmentDisposable() let subscription3 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -263,15 +263,15 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
let subscription3 = SingleAssignmentDisposable() let subscription3 = SingleAssignmentDisposable()
let subscription4 = SingleAssignmentDisposable() let subscription4 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue(element: $0) }, this: subscription4)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
subscription4.disposable = _parent.source4.subscribe(observer4) subscription4.disposable = _parent.source4.subscribe(observer: observer4)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -375,17 +375,17 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
let subscription4 = SingleAssignmentDisposable() let subscription4 = SingleAssignmentDisposable()
let subscription5 = SingleAssignmentDisposable() let subscription5 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue(element: $0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue(element: $0) }, this: subscription5)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
subscription4.disposable = _parent.source4.subscribe(observer4) subscription4.disposable = _parent.source4.subscribe(observer: observer4)
subscription5.disposable = _parent.source5.subscribe(observer5) subscription5.disposable = _parent.source5.subscribe(observer: observer5)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -495,19 +495,19 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
let subscription5 = SingleAssignmentDisposable() let subscription5 = SingleAssignmentDisposable()
let subscription6 = SingleAssignmentDisposable() let subscription6 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue(element: $0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue(element: $0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue(element: $0) }, this: subscription6)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
subscription4.disposable = _parent.source4.subscribe(observer4) subscription4.disposable = _parent.source4.subscribe(observer: observer4)
subscription5.disposable = _parent.source5.subscribe(observer5) subscription5.disposable = _parent.source5.subscribe(observer: observer5)
subscription6.disposable = _parent.source6.subscribe(observer6) subscription6.disposable = _parent.source6.subscribe(observer: observer6)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -623,21 +623,21 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
let subscription6 = SingleAssignmentDisposable() let subscription6 = SingleAssignmentDisposable()
let subscription7 = SingleAssignmentDisposable() let subscription7 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue(element: $0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue(element: $0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue(element: $0) }, this: subscription6)
let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue(element: $0) }, this: subscription7)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
subscription4.disposable = _parent.source4.subscribe(observer4) subscription4.disposable = _parent.source4.subscribe(observer: observer4)
subscription5.disposable = _parent.source5.subscribe(observer5) subscription5.disposable = _parent.source5.subscribe(observer: observer5)
subscription6.disposable = _parent.source6.subscribe(observer6) subscription6.disposable = _parent.source6.subscribe(observer: observer6)
subscription7.disposable = _parent.source7.subscribe(observer7) subscription7.disposable = _parent.source7.subscribe(observer: observer7)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,
@ -759,23 +759,23 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
let subscription7 = SingleAssignmentDisposable() let subscription7 = SingleAssignmentDisposable()
let subscription8 = SingleAssignmentDisposable() let subscription8 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue(element: $0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue(element: $0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue(element: $0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue(element: $0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue(element: $0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue(element: $0) }, this: subscription6)
let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue(element: $0) }, this: subscription7)
let observer8 = ZipObserver(lock: _lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8) let observer8 = ZipObserver(lock: _lock, parent: self, index: 7, setNextValue: { self._values8.enqueue(element: $0) }, this: subscription8)
subscription1.disposable = _parent.source1.subscribe(observer1) subscription1.disposable = _parent.source1.subscribe(observer: observer1)
subscription2.disposable = _parent.source2.subscribe(observer2) subscription2.disposable = _parent.source2.subscribe(observer: observer2)
subscription3.disposable = _parent.source3.subscribe(observer3) subscription3.disposable = _parent.source3.subscribe(observer: observer3)
subscription4.disposable = _parent.source4.subscribe(observer4) subscription4.disposable = _parent.source4.subscribe(observer: observer4)
subscription5.disposable = _parent.source5.subscribe(observer5) subscription5.disposable = _parent.source5.subscribe(observer: observer5)
subscription6.disposable = _parent.source6.subscribe(observer6) subscription6.disposable = _parent.source6.subscribe(observer: observer6)
subscription7.disposable = _parent.source7.subscribe(observer7) subscription7.disposable = _parent.source7.subscribe(observer: observer7)
subscription8.disposable = _parent.source8.subscribe(observer8) subscription8.disposable = _parent.source8.subscribe(observer: observer8)
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [
subscription1, subscription1,

View File

@ -65,11 +65,11 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
}).joinWithSeparator("\n") %> }).joinWithSeparator("\n") %>
<%= (Array(1...i).map { <%= (Array(1...i).map {
" let observer\($0) = ZipObserver(lock: _lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))" " let observer\($0) = ZipObserver(lock: _lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue(element: $0) }, this: subscription\($0))"
}).joinWithSeparator("\n") %> }).joinWithSeparator("\n") %>
<%= (Array(1...i).map { <%= (Array(1...i).map {
" subscription\($0).disposable = _parent.source\($0).subscribe(observer\($0))" }).joinWithSeparator("\n") " subscription\($0).disposable = _parent.source\($0).subscribe(observer: observer\($0))" }).joinWithSeparator("\n")
%> %>
return CompositeDisposable(disposables: [ return CompositeDisposable(disposables: [

View File

@ -69,7 +69,7 @@ extension ObservableType {
*/ */
@warn_unused_result(message: "http://git.io/rxs.uo") @warn_unused_result(message: "http://git.io/rxs.uo")
public func publish() -> ConnectableObservable<E> { public func publish() -> ConnectableObservable<E> {
return self.multicast(PublishSubject()) return self.multicast(subject: PublishSubject())
} }
} }
@ -90,7 +90,7 @@ extension ObservableType {
@warn_unused_result(message: "http://git.io/rxs.uo") @warn_unused_result(message: "http://git.io/rxs.uo")
public func replay(bufferSize: Int) public func replay(bufferSize: Int)
-> ConnectableObservable<E> { -> ConnectableObservable<E> {
return self.multicast(ReplaySubject.create(bufferSize: bufferSize)) return self.multicast(subject: ReplaySubject.create(bufferSize: bufferSize))
} }
/** /**
@ -105,7 +105,7 @@ extension ObservableType {
@warn_unused_result(message: "http://git.io/rxs.uo") @warn_unused_result(message: "http://git.io/rxs.uo")
public func replayAll() public func replayAll()
-> ConnectableObservable<E> { -> ConnectableObservable<E> {
return self.multicast(ReplaySubject.createUnbounded()) return self.multicast(subject: ReplaySubject.createUnbounded())
} }
} }
@ -166,7 +166,7 @@ extension ObservableType {
return ShareReplay1(source: self.asObservable()) return ShareReplay1(source: self.asObservable())
} }
else { else {
return self.replay(bufferSize).refCount() return self.replay(bufferSize: bufferSize).refCount()
} }
} }

View File

@ -203,7 +203,7 @@ extension ObservableType {
@warn_unused_result(message: "http://git.io/rxs.uo") @warn_unused_result(message: "http://git.io/rxs.uo")
public func catchErrorJustReturn(element: E) public func catchErrorJustReturn(element: E)
-> Observable<E> { -> Observable<E> {
return Catch(source: asObservable(), handler: { _ in Observable.just(element) }) return Catch(source: asObservable(), handler: { _ in Observable.just(element: element) })
} }
} }
@ -294,7 +294,7 @@ extension Sequence where Iterator.Element : ObservableType {
public func amb() public func amb()
-> Observable<Iterator.Element.E> { -> Observable<Iterator.Element.E> {
return self.reduce(Observable.never()) { a, o in return self.reduce(Observable.never()) { a, o in
return a.amb(o.asObservable()) return a.amb(right: o.asObservable())
} }
} }
} }

View File

@ -253,7 +253,7 @@ extension ObservableType {
@warn_unused_result(message: "http://git.io/rxs.uo") @warn_unused_result(message: "http://git.io/rxs.uo")
public func timeout(dueTime: RxTimeInterval, scheduler: SchedulerType) public func timeout(dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> { -> Observable<E> {
return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.Timeout), scheduler: scheduler) return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(error: RxError.Timeout), scheduler: scheduler)
} }
/** /**

View File

@ -17,7 +17,7 @@ class ObserverBase<ElementType> : Disposable, ObserverType {
switch event { switch event {
case .Next: case .Next:
if _isStopped == 0 { if _isStopped == 0 {
onCore(event) onCore(event: event)
} }
case .Error, .Completed: case .Error, .Completed:
@ -25,7 +25,7 @@ class ObserverBase<ElementType> : Disposable, ObserverType {
return return
} }
onCore(event) onCore(event: event)
} }
} }

View File

@ -39,12 +39,12 @@ class TailRecursiveSink<S: Sequence, O: ObserverType where S.Iterator.Element: O
func run(sources: SequenceGenerator) -> Disposable { func run(sources: SequenceGenerator) -> Disposable {
_generators.append(sources) _generators.append(sources)
schedule(.MoveNext) schedule(command: .MoveNext)
return _subscription return _subscription
} }
func invoke(command: TailRecursiveSinkCommand) { func invoke(value command: TailRecursiveSinkCommand) {
switch command { switch command {
case .Dispose: case .Dispose:
disposeCommand() disposeCommand()
@ -55,11 +55,11 @@ class TailRecursiveSink<S: Sequence, O: ObserverType where S.Iterator.Element: O
// simple implementation for now // simple implementation for now
func schedule(command: TailRecursiveSinkCommand) { func schedule(command: TailRecursiveSinkCommand) {
_gate.invoke(InvocableScheduledItem(invocable: self, state: command)) _gate.invoke(action: InvocableScheduledItem(invocable: self, state: command))
} }
func done() { func done() {
forwardOn(.Completed) forwardOn(event: .Completed)
dispose() dispose()
} }
@ -107,7 +107,7 @@ class TailRecursiveSink<S: Sequence, O: ObserverType where S.Iterator.Element: O
_generators.append((e, nil)) _generators.append((e, nil))
} }
let nextGenerator = extract(nextCandidate) let nextGenerator = extract(observable: nextCandidate)
if let nextGenerator = nextGenerator { if let nextGenerator = nextGenerator {
_generators.append(nextGenerator) _generators.append(nextGenerator)
@ -129,7 +129,7 @@ class TailRecursiveSink<S: Sequence, O: ObserverType where S.Iterator.Element: O
let disposable = SingleAssignmentDisposable() let disposable = SingleAssignmentDisposable()
_subscription.disposable = disposable _subscription.disposable = disposable
disposable.disposable = subscribeToNext(next!) disposable.disposable = subscribeToNext(source: next!)
} }
func subscribeToNext(source: Observable<E>) -> Disposable { func subscribeToNext(source: Observable<E>) -> Disposable {
@ -146,7 +146,7 @@ class TailRecursiveSink<S: Sequence, O: ObserverType where S.Iterator.Element: O
_subscription.dispose() _subscription.dispose()
schedule(.Dispose) schedule(command: .Dispose)
} }
} }

View File

@ -23,7 +23,7 @@ public var resourceCount: AtomicInt = 0
rxFatalError("Abstract method") rxFatalError("Abstract method")
} }
@noreturn func rxFatalError(lastMessage: String) { @noreturn func rxFatalError(_ lastMessage: String) {
// The temptation to comment this line is great, but please don't, it's for your own good. The choice is yours. // The temptation to comment this line is great, but please don't, it's for your own good. The choice is yours.
fatalError(lastMessage) fatalError(lastMessage)
} }

View File

@ -72,7 +72,7 @@ public final class ConcurrentMainScheduler : SchedulerType {
- returns: The disposable object used to cancel the scheduled action (best effort). - returns: The disposable object used to cancel the scheduled action (best effort).
*/ */
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable { public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable {
return _mainScheduler.scheduleRelative(state, dueTime: dueTime, action: action) return _mainScheduler.scheduleRelative(state: state, dueTime: dueTime, action: action)
} }
/** /**
@ -85,6 +85,6 @@ public final class ConcurrentMainScheduler : SchedulerType {
- returns: The disposable object used to cancel the scheduled action (best effort). - returns: The disposable object used to cancel the scheduled action (best effort).
*/ */
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable { public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable {
return _mainScheduler.schedulePeriodic(state, startAfter: startAfter, period: period, action: action) return _mainScheduler.schedulePeriodic(state: state, startAfter: startAfter, period: period, action: action)
} }
} }

View File

@ -22,7 +22,7 @@ import Foundation
let CurrentThreadSchedulerValueInstance = CurrentThreadSchedulerKeyInstance let CurrentThreadSchedulerValueInstance = CurrentThreadSchedulerKeyInstance
@objc class CurrentThreadSchedulerKey : NSObject, NSCopying { @objc class CurrentThreadSchedulerKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool { override func isEqual(_ object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerKeyInstance return object === CurrentThreadSchedulerKeyInstance
} }
@ -38,7 +38,7 @@ import Foundation
} }
@objc class CurrentThreadSchedulerQueueKey : NSObject, NSCopying { @objc class CurrentThreadSchedulerQueueKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool { override func isEqual(_ object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerQueueKeyInstance return object === CurrentThreadSchedulerQueueKeyInstance
} }
@ -71,10 +71,10 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
static var queue : ScheduleQueue? { static var queue : ScheduleQueue? {
get { get {
return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance) return NSThread.getThreadLocalStorageValueForKey(key: CurrentThreadSchedulerQueueKeyInstance)
} }
set { set {
NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance) NSThread.setThreadLocalStorageValue(value: newValue, forKey: CurrentThreadSchedulerQueueKeyInstance)
} }
} }
@ -83,11 +83,11 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
*/ */
public static private(set) var isScheduleRequired: Bool { public static private(set) var isScheduleRequired: Bool {
get { get {
let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(key: CurrentThreadSchedulerKeyInstance)
return value == nil return value == nil
} }
set(isScheduleRequired) { set(isScheduleRequired) {
NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance) NSThread.setThreadLocalStorageValue(value: isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance)
} }
} }
@ -138,7 +138,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
} }
let scheduledItem = ScheduledItem(action: action, state: state) let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem) queue.value.enqueue(element: scheduledItem)
// In Xcode 7.3, `return scheduledItem` causes segmentation fault 11 on release build. // In Xcode 7.3, `return scheduledItem` causes segmentation fault 11 on release build.
// To workaround this compiler issue, returns AnonymousDisposable that disposes scheduledItem. // To workaround this compiler issue, returns AnonymousDisposable that disposes scheduledItem.

View File

@ -27,7 +27,7 @@ private class ImmediateScheduler : ImmediateSchedulerType {
*/ */
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable { func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
let disposable = SingleAssignmentDisposable() let disposable = SingleAssignmentDisposable()
_asyncLock.invoke(AnonymousInvocable { _asyncLock.invoke(action: AnonymousInvocable {
if disposable.disposed { if disposable.disposed {
return return
} }

View File

@ -19,6 +19,6 @@ struct InvocableScheduledItem<I: InvocableWithValueType> : InvocableType {
} }
func invoke() { func invoke() {
_invocable.invoke(_state) _invocable.invoke(value: _state)
} }
} }

View File

@ -34,7 +34,7 @@ class SchedulePeriodicRecursive<State> {
} }
func start() -> Disposable { func start() -> Disposable {
return _scheduler.scheduleRecursive(SchedulePeriodicRecursiveCommand.Tick, dueTime: _startAfter, action: self.tick) return _scheduler.scheduleRecursive(state: SchedulePeriodicRecursiveCommand.Tick, dueTime: _startAfter, action: self.tick)
} }
func tick(command: SchedulePeriodicRecursiveCommand, scheduler: RecursiveScheduler) -> Void { func tick(command: SchedulePeriodicRecursiveCommand, scheduler: RecursiveScheduler) -> Void {
@ -43,12 +43,12 @@ class SchedulePeriodicRecursive<State> {
// tick interval is short. // tick interval is short.
switch command { switch command {
case .Tick: case .Tick:
scheduler.schedule(.Tick, dueTime: _period) scheduler.schedule(state: .Tick, dueTime: _period)
// The idea is that if on tick there wasn't any item enqueued, schedule to perform work immediatelly. // The idea is that if on tick there wasn't any item enqueued, schedule to perform work immediatelly.
// Else work will be scheduled after previous enqueued work completes. // Else work will be scheduled after previous enqueued work completes.
if AtomicIncrement(&_pendingTickCount) == 1 { if AtomicIncrement(&_pendingTickCount) == 1 {
self.tick(.DispatchStart, scheduler: scheduler) self.tick(command: .DispatchStart, scheduler: scheduler)
} }
case .DispatchStart: case .DispatchStart:
@ -56,7 +56,7 @@ class SchedulePeriodicRecursive<State> {
// Start work and schedule check is this last batch of work // Start work and schedule check is this last batch of work
if AtomicDecrement(&_pendingTickCount) > 0 { if AtomicDecrement(&_pendingTickCount) > 0 {
// This gives priority to scheduler emulation, it's not perfect, but helps // This gives priority to scheduler emulation, it's not perfect, but helps
scheduler.schedule(SchedulePeriodicRecursiveCommand.DispatchStart) scheduler.schedule(state: SchedulePeriodicRecursiveCommand.DispatchStart)
} }
} }
} }

View File

@ -31,7 +31,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
- returns: Current time. - returns: Current time.
*/ */
public var now: RxTime { public var now: RxTime {
return _converter.convertFromVirtualTime(clock) return _converter.convertFromVirtualTime(virtualTime: clock)
} }
/** /**
@ -51,7 +51,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
_running = false _running = false
_converter = converter _converter = converter
_schedulerQueue = PriorityQueue(hasHigherPriority: { _schedulerQueue = PriorityQueue(hasHigherPriority: {
switch converter.compareVirtualTime($0.time, $1.time) { switch converter.compareVirtualTime(lhs: $0.time, $1.time) {
case .LessThan: case .LessThan:
return true return true
case .Equal: case .Equal:
@ -73,7 +73,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
- returns: The disposable object used to cancel the scheduled action (best effort). - returns: The disposable object used to cancel the scheduled action (best effort).
*/ */
public func schedule<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable { public func schedule<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
return self.scheduleRelative(state, dueTime: 0.0) { a in return self.scheduleRelative(state: state, dueTime: 0.0) { a in
return action(a) return action(a)
} }
} }
@ -88,8 +88,8 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
*/ */
public func scheduleRelative<StateType>(state: StateType, dueTime: RxTimeInterval, action: StateType -> Disposable) -> Disposable { public func scheduleRelative<StateType>(state: StateType, dueTime: RxTimeInterval, action: StateType -> Disposable) -> Disposable {
let time = self.now.addingTimeInterval(dueTime) let time = self.now.addingTimeInterval(dueTime)
let absoluteTime = _converter.convertToVirtualTime(time) let absoluteTime = _converter.convertToVirtualTime(time: time)
let adjustedTime = self.adjustScheduledTime(absoluteTime) let adjustedTime = self.adjustScheduledTime(time: absoluteTime)
return scheduleAbsoluteVirtual(state, time: adjustedTime, action: action) return scheduleAbsoluteVirtual(state, time: adjustedTime, action: action)
} }
@ -103,7 +103,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
*/ */
public func scheduleRelativeVirtual<StateType>(state: StateType, dueTime: VirtualTimeInterval, action: StateType -> Disposable) -> Disposable { public func scheduleRelativeVirtual<StateType>(state: StateType, dueTime: VirtualTimeInterval, action: StateType -> Disposable) -> Disposable {
let time = _converter.offsetVirtualTime(time: self.clock, offset: dueTime) let time = _converter.offsetVirtualTime(time: self.clock, offset: dueTime)
return scheduleAbsoluteVirtual(state, time: time, action: action) return scheduleAbsoluteVirtual(state: state, time: time, action: action)
} }
/** /**
@ -126,9 +126,9 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
_nextId += 1 _nextId += 1
_schedulerQueue.enqueue(item) _schedulerQueue.enqueue(element: item)
compositeDisposable.addDisposable(item) compositeDisposable.addDisposable(disposable: item)
return compositeDisposable return compositeDisposable
} }
@ -156,12 +156,12 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
break break
} }
if _converter.compareVirtualTime(next.time, self.clock).greaterThan { if _converter.compareVirtualTime(lhs: next.time, self.clock).greaterThan {
_clock = next.time _clock = next.time
} }
next.invoke() next.invoke()
_schedulerQueue.remove(next) _schedulerQueue.remove(element: next)
} while _running } while _running
_running = false _running = false
@ -170,7 +170,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
func findNext() -> VirtualSchedulerItem<VirtualTime>? { func findNext() -> VirtualSchedulerItem<VirtualTime>? {
while let front = _schedulerQueue.peek() { while let front = _schedulerQueue.peek() {
if front.disposed { if front.disposed {
_schedulerQueue.remove(front) _schedulerQueue.remove(element: front)
continue continue
} }
@ -198,16 +198,16 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
break break
} }
if _converter.compareVirtualTime(next.time, virtualTime).greaterThan { if _converter.compareVirtualTime(lhs: next.time, virtualTime).greaterThan {
break break
} }
if _converter.compareVirtualTime(next.time, self.clock).greaterThan { if _converter.compareVirtualTime(lhs: next.time, self.clock).greaterThan {
_clock = next.time _clock = next.time
} }
next.invoke() next.invoke()
_schedulerQueue.remove(next) _schedulerQueue.remove(element: next)
} while _running } while _running
_clock = virtualTime _clock = virtualTime
@ -221,7 +221,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
MainScheduler.ensureExecutingOnScheduler() MainScheduler.ensureExecutingOnScheduler()
let sleepTo = _converter.offsetVirtualTime(time: clock, offset: virtualInterval) let sleepTo = _converter.offsetVirtualTime(time: clock, offset: virtualInterval)
if _converter.compareVirtualTime(sleepTo, clock).lessThen { if _converter.compareVirtualTime(lhs: sleepTo, clock).lessThen {
fatalError("Can't sleep to past.") fatalError("Can't sleep to past.")
} }

View File

@ -52,7 +52,7 @@ final public class PublishSubject<Element>
*/ */
public func on(event: Event<Element>) { public func on(event: Event<Element>) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_on(event) _synchronized_on(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -62,12 +62,12 @@ final public class PublishSubject<Element>
return return
} }
_observers.on(event) _observers.on(event: event)
case .Completed, .Error: case .Completed, .Error:
if _stoppedEvent == nil { if _stoppedEvent == nil {
_stoppedEvent = event _stoppedEvent = event
_stopped = true _stopped = true
_observers.on(event) _observers.on(event: event)
_observers.removeAll() _observers.removeAll()
} }
} }
@ -81,31 +81,31 @@ final public class PublishSubject<Element>
*/ */
public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer) return _synchronized_subscribe(observer: observer)
} }
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if let stoppedEvent = _stoppedEvent { if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent) observer.on(event: stoppedEvent)
return NopDisposable.instance return NopDisposable.instance
} }
if _disposed { if _disposed {
observer.on(.Error(RxError.Disposed(object: self))) observer.on(event: .Error(RxError.Disposed(object: self)))
return NopDisposable.instance return NopDisposable.instance
} }
let key = _observers.insert(observer.asObserver()) let key = _observers.insert(element: observer.asObserver())
return SubscriptionDisposable(owner: self, key: key) return SubscriptionDisposable(owner: self, key: key)
} }
func synchronizedUnsubscribe(disposeKey: DisposeKey) { func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey) _synchronized_unsubscribe(disposeKey: disposeKey)
} }
func _synchronized_unsubscribe(disposeKey: DisposeKey) { func _synchronized_unsubscribe(disposeKey: DisposeKey) {
_ = _observers.removeKey(disposeKey) _ = _observers.removeKey(key: disposeKey)
} }
/** /**

View File

@ -98,7 +98,7 @@ class ReplayBufferBase<Element>
override func on(event: Event<Element>) { override func on(event: Event<Element>) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_on(event) _synchronized_on(event: event)
} }
func _synchronized_on(event: Event<E>) { func _synchronized_on(event: Event<E>) {
@ -112,44 +112,44 @@ class ReplayBufferBase<Element>
switch event { switch event {
case .Next(let value): case .Next(let value):
addValueToBuffer(value) addValueToBuffer(value: value)
trim() trim()
_observers.on(event) _observers.on(event: event)
case .Error, .Completed: case .Error, .Completed:
_stoppedEvent = event _stoppedEvent = event
trim() trim()
_observers.on(event) _observers.on(event: event)
_observers.removeAll() _observers.removeAll()
} }
} }
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer) return _synchronized_subscribe(observer: observer)
} }
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if _disposed { if _disposed {
observer.on(.Error(RxError.Disposed(object: self))) observer.on(event: .Error(RxError.Disposed(object: self)))
return NopDisposable.instance return NopDisposable.instance
} }
let AnyObserver = observer.asObserver() let AnyObserver = observer.asObserver()
replayBuffer(AnyObserver) replayBuffer(observer: AnyObserver)
if let stoppedEvent = _stoppedEvent { if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent) observer.on(event: stoppedEvent)
return NopDisposable.instance return NopDisposable.instance
} }
else { else {
let key = _observers.insert(AnyObserver) let key = _observers.insert(element: AnyObserver)
return SubscriptionDisposable(owner: self, key: key) return SubscriptionDisposable(owner: self, key: key)
} }
} }
func synchronizedUnsubscribe(disposeKey: DisposeKey) { func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() } _lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey) _synchronized_unsubscribe(disposeKey: disposeKey)
} }
func _synchronized_unsubscribe(disposeKey: DisposeKey) { func _synchronized_unsubscribe(disposeKey: DisposeKey) {
@ -157,7 +157,7 @@ class ReplayBufferBase<Element>
return return
} }
_ = _observers.removeKey(disposeKey) _ = _observers.removeKey(key: disposeKey)
} }
override func dispose() { override func dispose() {
@ -195,7 +195,7 @@ final class ReplayOne<Element> : ReplayBufferBase<Element> {
override func replayBuffer(observer: AnyObserver<Element>) { override func replayBuffer(observer: AnyObserver<Element>) {
if let value = _value { if let value = _value {
observer.on(.Next(value)) observer.on(event: .Next(value))
} }
} }
@ -213,12 +213,12 @@ class ReplayManyBase<Element> : ReplayBufferBase<Element> {
} }
override func addValueToBuffer(value: Element) { override func addValueToBuffer(value: Element) {
_queue.enqueue(value) _queue.enqueue(element: value)
} }
override func replayBuffer(observer: AnyObserver<E>) { override func replayBuffer(observer: AnyObserver<E>) {
for item in _queue { for item in _queue {
observer.on(.Next(item)) observer.on(event: .Next(item))
} }
} }