Removes unnecessary subscribeSafe in case there is already an Observable source.

This commit is contained in:
Krunoslav Zaher 2015-10-24 23:45:09 +02:00
parent a861028d4f
commit d1666db4d1
41 changed files with 377 additions and 391 deletions

View File

@ -59,8 +59,6 @@
C8093CFE1B8A72BE0088E94D /* Observable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C681B8A72BE0088E94D /* Observable.swift */; };
C8093CFF1B8A72BE0088E94D /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6B1B8A72BE0088E94D /* Amb.swift */; };
C8093D001B8A72BE0088E94D /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6B1B8A72BE0088E94D /* Amb.swift */; };
C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
C8093D051B8A72BE0088E94D /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6E1B8A72BE0088E94D /* Catch.swift */; };
C8093D061B8A72BE0088E94D /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6E1B8A72BE0088E94D /* Catch.swift */; };
C8093D071B8A72BE0088E94D /* CombineLatest+arity.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6F1B8A72BE0088E94D /* CombineLatest+arity.swift */; };
@ -393,7 +391,6 @@
C8F0BFA81BBBFB8B001B112F /* Observable+Time.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C9D1B8A72BE0088E94D /* Observable+Time.swift */; };
C8F0BFA91BBBFB8B001B112F /* Observable+Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C671B8A72BE0088E94D /* Observable+Extensions.swift */; };
C8F0BFAA1BBBFB8B001B112F /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C901B8A72BE0088E94D /* Throttle.swift */; };
C8F0BFAB1BBBFB8B001B112F /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
C8F0BFAC1BBBFB8B001B112F /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6E1B8A72BE0088E94D /* Catch.swift */; };
C8F0BFAD1BBBFB8B001B112F /* CombineLatest.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C711B8A72BE0088E94D /* CombineLatest.swift */; };
C8F0BFAE1BBBFB8B001B112F /* Observable+Multiple.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C9A1B8A72BE0088E94D /* Observable+Multiple.swift */; };
@ -647,7 +644,6 @@
D2EBEAFB1BB9B6B2003A27DC /* StableCompositeDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C611B8A72BE0088E94D /* StableCompositeDisposable.swift */; };
D2EBEAFC1BB9B6BA003A27DC /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6B1B8A72BE0088E94D /* Amb.swift */; };
D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */; };
D2EBEAFE1BB9B6BA003A27DC /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
D2EBEAFF1BB9B6BA003A27DC /* Buffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = C821DBA11BA4DCAB008F3809 /* Buffer.swift */; };
D2EBEB001BB9B6BA003A27DC /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6E1B8A72BE0088E94D /* Catch.swift */; };
D2EBEB011BB9B6BA003A27DC /* CombineLatest.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C711B8A72BE0088E94D /* CombineLatest.swift */; };
@ -818,7 +814,6 @@
C8093C671B8A72BE0088E94D /* Observable+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+Extensions.swift"; sourceTree = "<group>"; };
C8093C681B8A72BE0088E94D /* Observable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Observable.swift; sourceTree = "<group>"; };
C8093C6B1B8A72BE0088E94D /* Amb.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Amb.swift; sourceTree = "<group>"; };
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AsObservable.swift; sourceTree = "<group>"; };
C8093C6E1B8A72BE0088E94D /* Catch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Catch.swift; sourceTree = "<group>"; };
C8093C6F1B8A72BE0088E94D /* CombineLatest+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+arity.swift"; sourceTree = "<group>"; };
C8093C701B8A72BE0088E94D /* CombineLatest+arity.tt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = "CombineLatest+arity.tt"; sourceTree = "<group>"; };
@ -1180,7 +1175,6 @@
D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */,
C8093C6B1B8A72BE0088E94D /* Amb.swift */,
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */,
C821DBA11BA4DCAB008F3809 /* Buffer.swift */,
C8093C6E1B8A72BE0088E94D /* Catch.swift */,
C8093C711B8A72BE0088E94D /* CombineLatest.swift */,
@ -2133,7 +2127,6 @@
C8093D641B8A72BE0088E94D /* Observable+Time.swift in Sources */,
C8093CFC1B8A72BE0088E94D /* Observable+Extensions.swift in Sources */,
C8093D4A1B8A72BE0088E94D /* Throttle.swift in Sources */,
C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */,
C8B145011BD2D80100267DCE /* ImmediateScheduler.swift in Sources */,
C8093D061B8A72BE0088E94D /* Catch.swift in Sources */,
C8093D0C1B8A72BE0088E94D /* CombineLatest.swift in Sources */,
@ -2255,7 +2248,6 @@
C8093D631B8A72BE0088E94D /* Observable+Time.swift in Sources */,
C8093CFB1B8A72BE0088E94D /* Observable+Extensions.swift in Sources */,
C8093D491B8A72BE0088E94D /* Throttle.swift in Sources */,
C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */,
C8B145001BD2D80100267DCE /* ImmediateScheduler.swift in Sources */,
C8093D051B8A72BE0088E94D /* Catch.swift in Sources */,
C8093D0B1B8A72BE0088E94D /* CombineLatest.swift in Sources */,
@ -2377,7 +2369,6 @@
C8F0BFA81BBBFB8B001B112F /* Observable+Time.swift in Sources */,
C8F0BFA91BBBFB8B001B112F /* Observable+Extensions.swift in Sources */,
C8F0BFAA1BBBFB8B001B112F /* Throttle.swift in Sources */,
C8F0BFAB1BBBFB8B001B112F /* AsObservable.swift in Sources */,
C8B145031BD2D80100267DCE /* ImmediateScheduler.swift in Sources */,
C8F0BFAC1BBBFB8B001B112F /* Catch.swift in Sources */,
C8F0BFAD1BBBFB8B001B112F /* CombineLatest.swift in Sources */,
@ -2722,7 +2713,6 @@
D2EBEAED1BB9B6A4003A27DC /* Bag.swift in Sources */,
D2EBEB1A1BB9B6C1003A27DC /* RefCount.swift in Sources */,
D2EBEB141BB9B6C1003A27DC /* Never.swift in Sources */,
D2EBEAFE1BB9B6BA003A27DC /* AsObservable.swift in Sources */,
D2EBEAE01BB9B697003A27DC /* Event.swift in Sources */,
D2EBEB411BB9B6DE003A27DC /* PublishSubject.swift in Sources */,
D2EBEB431BB9B6DE003A27DC /* SubjectType.swift in Sources */,

View File

@ -144,7 +144,6 @@
C89464B91BC6C2B00055219D /* ObservableConvertibleType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89464461BC6C2B00055219D /* ObservableConvertibleType.swift */; };
C89464BA1BC6C2B00055219D /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89464491BC6C2B00055219D /* Amb.swift */; };
C89464BB1BC6C2B00055219D /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C894644A1BC6C2B00055219D /* AnonymousObservable.swift */; };
C89464BC1BC6C2B00055219D /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C894644B1BC6C2B00055219D /* AsObservable.swift */; };
C89464BD1BC6C2B00055219D /* Buffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = C894644C1BC6C2B00055219D /* Buffer.swift */; };
C89464BE1BC6C2B00055219D /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C894644D1BC6C2B00055219D /* Catch.swift */; };
C89464BF1BC6C2B00055219D /* CombineLatest+arity.swift in Sources */ = {isa = PBXBuildFile; fileRef = C894644E1BC6C2B00055219D /* CombineLatest+arity.swift */; };
@ -524,7 +523,6 @@
C89464461BC6C2B00055219D /* ObservableConvertibleType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObservableConvertibleType.swift; sourceTree = "<group>"; };
C89464491BC6C2B00055219D /* Amb.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Amb.swift; sourceTree = "<group>"; };
C894644A1BC6C2B00055219D /* AnonymousObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = AnonymousObservable.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C894644B1BC6C2B00055219D /* AsObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AsObservable.swift; sourceTree = "<group>"; };
C894644C1BC6C2B00055219D /* Buffer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Buffer.swift; sourceTree = "<group>"; };
C894644D1BC6C2B00055219D /* Catch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Catch.swift; sourceTree = "<group>"; };
C894644E1BC6C2B00055219D /* CombineLatest+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+arity.swift"; sourceTree = "<group>"; };
@ -1091,7 +1089,6 @@
C84CC52D1BDC344100E06A64 /* ElementAt.swift */,
C89464491BC6C2B00055219D /* Amb.swift */,
C894644A1BC6C2B00055219D /* AnonymousObservable.swift */,
C894644B1BC6C2B00055219D /* AsObservable.swift */,
C894644C1BC6C2B00055219D /* Buffer.swift */,
C894644D1BC6C2B00055219D /* Catch.swift */,
C89464511BC6C2B00055219D /* CombineLatest.swift */,
@ -1756,7 +1753,6 @@
C89465981BC6C2BC0055219D /* UISearchBar+Rx.swift in Sources */,
C89464E21BC6C2B00055219D /* Take.swift in Sources */,
C89465901BC6C2BC0055219D /* UIButton+Rx.swift in Sources */,
C89464BC1BC6C2B00055219D /* AsObservable.swift in Sources */,
C89464DD1BC6C2B00055219D /* Sink.swift in Sources */,
C89464FE1BC6C2B00055219D /* CurrentThreadScheduler.swift in Sources */,
C89464BE1BC6C2B00055219D /* Catch.swift in Sources */,

View File

@ -123,6 +123,6 @@ public extension ObservableType {
*/
@warn_unused_result(message="http://git.io/rxs.ud")
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
return self.subscribe(observer)
return self.asObservable().subscribe(observer)
}
}

View File

@ -98,8 +98,8 @@ class AmbSink<ElementType, O: ObserverType where O.E == ElementType> : Sink<O> {
decide(o, e, .Right, subscription1)
}
subscription1.disposable = _parent._left.subscribeSafe(sink1)
subscription2.disposable = _parent._right.subscribeSafe(sink2)
subscription1.disposable = _parent._left.subscribe(sink1)
subscription2.disposable = _parent._right.subscribe(sink2)
return disposeAll
}

View File

@ -49,7 +49,7 @@ class BufferTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E =
func run() -> Disposable {
createTimer(_windowID)
return StableCompositeDisposable.create(_timerD, _parent._source.subscribeSafe(self))
return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(self))
}
func startNewWindowAndSendCurrentOne() {

View File

@ -47,8 +47,8 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
func run() -> Disposable {
let d1 = SingleAssignmentDisposable()
_subscription.disposable = d1
d1.disposable = _parent._source.subscribeSafe(self)
d1.disposable = _parent._source.subscribe(self)
return _subscription
}
@ -65,7 +65,7 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
let observer = CatchSinkProxy(parent: self)
_subscription.disposable = catchSequence.subscribeSafe(observer)
_subscription.disposable = catchSequence.subscribe(observer)
}
catch let e {
observer?.on(.Error(e))

View File

@ -92,7 +92,7 @@ class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType wher
for i in parent.sources.startIndex ..< parent.sources.endIndex {
let index = j
let source = self.parent.sources[i].asObservable()
self.subscriptions[j].disposable = source.subscribeSafe(AnyObserver { event in
self.subscriptions[j].disposable = source.subscribe(AnyObserver { event in
self.on(event, atIndex: index)
})

View File

@ -34,10 +34,10 @@ class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
typealias R = O.E
typealias Parent = CombineLatest2<E1, E2, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -51,8 +51,8 @@ class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
return CompositeDisposable(disposables: [
subscription1,
@ -68,10 +68,10 @@ class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
class CombineLatest2<E1, E2, R> : Producer<R> {
typealias ResultSelector = (E1, E2) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
let _source1: Observable<E1>
let _source2: Observable<E2>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: ResultSelector) {
_source1 = source1
@ -111,11 +111,11 @@ class CombineLatestSink3_<E1, E2, E3, O: ObserverType> : CombineLatestSink<O> {
typealias R = O.E
typealias Parent = CombineLatest3<E1, E2, E3, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -131,9 +131,9 @@ class CombineLatestSink3_<E1, E2, E3, O: ObserverType> : CombineLatestSink<O> {
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
return CompositeDisposable(disposables: [
subscription1,
@ -150,11 +150,11 @@ class CombineLatestSink3_<E1, E2, E3, O: ObserverType> : CombineLatestSink<O> {
class CombineLatest3<E1, E2, E3, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, resultSelector: ResultSelector) {
_source1 = source1
@ -195,12 +195,12 @@ class CombineLatestSink4_<E1, E2, E3, E4, O: ObserverType> : CombineLatestSink<O
typealias R = O.E
typealias Parent = CombineLatest4<E1, E2, E3, E4, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
private var _latestElement4: E4! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -218,10 +218,10 @@ class CombineLatestSink4_<E1, E2, E3, E4, O: ObserverType> : CombineLatestSink<O
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription4.disposable = _parent._source4.subscribeSafe(observer4)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
subscription4.disposable = _parent._source4.subscribe(observer4)
return CompositeDisposable(disposables: [
subscription1,
@ -239,12 +239,12 @@ class CombineLatestSink4_<E1, E2, E3, E4, O: ObserverType> : CombineLatestSink<O
class CombineLatest4<E1, E2, E3, E4, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3, E4) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
private let _source4: Observable<E4>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
let _source4: Observable<E4>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, resultSelector: ResultSelector) {
_source1 = source1
@ -286,13 +286,13 @@ class CombineLatestSink5_<E1, E2, E3, E4, E5, O: ObserverType> : CombineLatestSi
typealias R = O.E
typealias Parent = CombineLatest5<E1, E2, E3, E4, E5, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
private var _latestElement4: E4! = nil
private var _latestElement5: E5! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
var _latestElement5: E5! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -312,11 +312,11 @@ class CombineLatestSink5_<E1, E2, E3, E4, E5, O: ObserverType> : CombineLatestSi
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription4.disposable = _parent._source4.subscribeSafe(observer4)
subscription5.disposable = _parent._source5.subscribeSafe(observer5)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
subscription4.disposable = _parent._source4.subscribe(observer4)
subscription5.disposable = _parent._source5.subscribe(observer5)
return CompositeDisposable(disposables: [
subscription1,
@ -335,13 +335,13 @@ class CombineLatestSink5_<E1, E2, E3, E4, E5, O: ObserverType> : CombineLatestSi
class CombineLatest5<E1, E2, E3, E4, E5, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3, E4, E5) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
private let _source4: Observable<E4>
private let _source5: Observable<E5>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
let _source4: Observable<E4>
let _source5: Observable<E5>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, resultSelector: ResultSelector) {
_source1 = source1
@ -384,14 +384,14 @@ class CombineLatestSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : CombineLate
typealias R = O.E
typealias Parent = CombineLatest6<E1, E2, E3, E4, E5, E6, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
private var _latestElement4: E4! = nil
private var _latestElement5: E5! = nil
private var _latestElement6: E6! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
var _latestElement5: E5! = nil
var _latestElement6: E6! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -413,12 +413,12 @@ class CombineLatestSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : CombineLate
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription4.disposable = _parent._source4.subscribeSafe(observer4)
subscription5.disposable = _parent._source5.subscribeSafe(observer5)
subscription6.disposable = _parent._source6.subscribeSafe(observer6)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
subscription4.disposable = _parent._source4.subscribe(observer4)
subscription5.disposable = _parent._source5.subscribe(observer5)
subscription6.disposable = _parent._source6.subscribe(observer6)
return CompositeDisposable(disposables: [
subscription1,
@ -438,14 +438,14 @@ class CombineLatestSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : CombineLate
class CombineLatest6<E1, E2, E3, E4, E5, E6, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3, E4, E5, E6) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
private let _source4: Observable<E4>
private let _source5: Observable<E5>
private let _source6: Observable<E6>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
let _source4: Observable<E4>
let _source5: Observable<E5>
let _source6: Observable<E6>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, resultSelector: ResultSelector) {
_source1 = source1
@ -489,15 +489,15 @@ class CombineLatestSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : Combine
typealias R = O.E
typealias Parent = CombineLatest7<E1, E2, E3, E4, E5, E6, E7, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
private var _latestElement4: E4! = nil
private var _latestElement5: E5! = nil
private var _latestElement6: E6! = nil
private var _latestElement7: E7! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
var _latestElement5: E5! = nil
var _latestElement6: E6! = nil
var _latestElement7: E7! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -521,13 +521,13 @@ class CombineLatestSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : Combine
let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription4.disposable = _parent._source4.subscribeSafe(observer4)
subscription5.disposable = _parent._source5.subscribeSafe(observer5)
subscription6.disposable = _parent._source6.subscribeSafe(observer6)
subscription7.disposable = _parent._source7.subscribeSafe(observer7)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
subscription4.disposable = _parent._source4.subscribe(observer4)
subscription5.disposable = _parent._source5.subscribe(observer5)
subscription6.disposable = _parent._source6.subscribe(observer6)
subscription7.disposable = _parent._source7.subscribe(observer7)
return CompositeDisposable(disposables: [
subscription1,
@ -548,15 +548,15 @@ class CombineLatestSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : Combine
class CombineLatest7<E1, E2, E3, E4, E5, E6, E7, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3, E4, E5, E6, E7) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
private let _source4: Observable<E4>
private let _source5: Observable<E5>
private let _source6: Observable<E6>
private let _source7: Observable<E7>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
let _source4: Observable<E4>
let _source5: Observable<E5>
let _source6: Observable<E6>
let _source7: Observable<E7>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, source7: Observable<E7>, resultSelector: ResultSelector) {
_source1 = source1
@ -601,16 +601,16 @@ class CombineLatestSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : Com
typealias R = O.E
typealias Parent = CombineLatest8<E1, E2, E3, E4, E5, E6, E7, E8, R>
private let _parent: Parent
let _parent: Parent
private var _latestElement1: E1! = nil
private var _latestElement2: E2! = nil
private var _latestElement3: E3! = nil
private var _latestElement4: E4! = nil
private var _latestElement5: E5! = nil
private var _latestElement6: E6! = nil
private var _latestElement7: E7! = nil
private var _latestElement8: E8! = nil
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
var _latestElement5: E5! = nil
var _latestElement6: E6! = nil
var _latestElement7: E7! = nil
var _latestElement8: E8! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
@ -636,14 +636,14 @@ class CombineLatestSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : Com
let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
let observer8 = CombineLatestObserver(lock: lock, parent: self, index: 7, setLatestValue: { (e: E8) -> Void in self._latestElement8 = e }, this: subscription8)
subscription1.disposable = _parent._source1.subscribeSafe(observer1)
subscription2.disposable = _parent._source2.subscribeSafe(observer2)
subscription3.disposable = _parent._source3.subscribeSafe(observer3)
subscription4.disposable = _parent._source4.subscribeSafe(observer4)
subscription5.disposable = _parent._source5.subscribeSafe(observer5)
subscription6.disposable = _parent._source6.subscribeSafe(observer6)
subscription7.disposable = _parent._source7.subscribeSafe(observer7)
subscription8.disposable = _parent._source8.subscribeSafe(observer8)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
subscription3.disposable = _parent._source3.subscribe(observer3)
subscription4.disposable = _parent._source4.subscribe(observer4)
subscription5.disposable = _parent._source5.subscribe(observer5)
subscription6.disposable = _parent._source6.subscribe(observer6)
subscription7.disposable = _parent._source7.subscribe(observer7)
subscription8.disposable = _parent._source8.subscribe(observer8)
return CompositeDisposable(disposables: [
subscription1,
@ -665,16 +665,16 @@ class CombineLatestSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : Com
class CombineLatest8<E1, E2, E3, E4, E5, E6, E7, E8, R> : Producer<R> {
typealias ResultSelector = (E1, E2, E3, E4, E5, E6, E7, E8) throws -> R
private let _source1: Observable<E1>
private let _source2: Observable<E2>
private let _source3: Observable<E3>
private let _source4: Observable<E4>
private let _source5: Observable<E5>
private let _source6: Observable<E6>
private let _source7: Observable<E7>
private let _source8: Observable<E8>
let _source1: Observable<E1>
let _source2: Observable<E2>
let _source3: Observable<E3>
let _source4: Observable<E4>
let _source5: Observable<E5>
let _source6: Observable<E6>
let _source7: Observable<E7>
let _source8: Observable<E8>
private let _resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, source7: Observable<E7>, source8: Observable<E8>, resultSelector: ResultSelector) {
_source1 = source1

View File

@ -32,14 +32,14 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep
typealias R = O.E
typealias Parent = CombineLatest<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %>, R>
let parent: Parent
let _parent: Parent
<%= (Array(1...i).map {
" var latestElement\($0): E\($0)! = nil"
" var _latestElement\($0): E\($0)! = nil"
}).joinWithSeparator("\n") %>
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: <%= i %>, observer: observer, cancel: cancel)
}
@ -49,11 +49,11 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" let observer\($0) = CombineLatestObserver(lock: lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self.latestElement\($0) = e }, this: subscription\($0))"
" let observer\($0) = CombineLatestObserver(lock: lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self._latestElement\($0) = e }, this: subscription\($0))"
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" subscription\($0).disposable = parent.source\($0).subscribeSafe(observer\($0))"
" subscription\($0).disposable = _parent._source\($0).subscribe(observer\($0))"
}).joinWithSeparator("\n") %>
return CompositeDisposable(disposables: [
@ -62,7 +62,7 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep
}
override func getResult() throws -> R {
return try self.parent.resultSelector(<%= (Array(1...i).map { "latestElement\($0)" }).joinWithSeparator(", ") %>)
return try _parent._resultSelector(<%= (Array(1...i).map { "_latestElement\($0)" }).joinWithSeparator(", ") %>)
}
}
@ -70,17 +70,17 @@ class CombineLatest<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparato
typealias ResultSelector = (<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %>) throws -> R
<%= (Array(1...i).map {
" let source\($0): Observable<E\($0)>"
" let _source\($0): Observable<E\($0)>"
}).joinWithSeparator("\n") %>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(<%= (Array(1...i).map { "source\($0): Observable<E\($0)>" }).joinWithSeparator(", ") %>, resultSelector: ResultSelector) {
<%= (Array(1...i).map {
" self.source\($0) = source\($0)"
" _source\($0) = source\($0)"
}).joinWithSeparator("\n") %>
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {

View File

@ -50,7 +50,7 @@ public class ConnectableObservable<S: SubjectType> : Observable<S.E>, Connectabl
private var _connection: ConnectionType?
public init(source: Observable<S.SubjectObserverType.E>, subject: S) {
_source = AsObservable(source: source)
_source = source
_subject = subject
_connection = nil
}
@ -61,7 +61,7 @@ public class ConnectableObservable<S: SubjectType> : Observable<S.E>, Connectabl
return connection
}
let disposable = _source.subscribeSafe(_subject.asObserver())
let disposable = _source.subscribe(_subject.asObserver())
let connection = Connection(parent: self, subscription: disposable)
_connection = connection
return connection
@ -69,6 +69,6 @@ public class ConnectableObservable<S: SubjectType> : Observable<S.E>, Connectabl
}
public override func subscribe<O : ObserverType where O.E == S.E>(observer: O) -> Disposable {
return _subject.subscribeSafe(observer)
return _subject.subscribe(observer)
}
}

View File

@ -49,6 +49,6 @@ class Debug<Element> : Producer<Element> {
print("[\(_identifier)] subscribed")
let sink = Debug_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -22,7 +22,7 @@ class DeferredSink<O: ObserverType> : Sink<O>, ObserverType {
func run() -> Disposable {
do {
let result = try _parent.eval()
return result.subscribeSafe(self)
return result.subscribe(self)
}
catch let e {
observer?.on(.Error(e))

View File

@ -45,7 +45,7 @@ class DelaySubscription<Element, S: SchedulerType>: Producer<Element> {
let sink = DelaySubscriptionSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in
return self._source.subscribeSafe(sink)
return self._source.subscribe(sink)
}
}
}

View File

@ -67,6 +67,6 @@ class DistinctUntilChanged<Element, Key>: Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DistinctUntilChangedSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -50,6 +50,6 @@ class Do<Element> : Producer<Element> {
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -54,6 +54,6 @@ class Filter<Element> : Producer<Element> {
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = FilterSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -118,7 +118,7 @@ class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType wher
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = _group.addDisposable(iterDisposable) {
let iter = FlatMapSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribeSafe(iter)
let subscription = source.subscribe(iter)
iterDisposable.disposable = subscription
}
}
@ -126,7 +126,7 @@ class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType wher
func run() -> Disposable {
_group.addDisposable(_sourceSubscription)
let subscription = _parent._source.subscribeSafe(self)
let subscription = _parent._source.subscribe(self)
_sourceSubscription.disposable = subscription
return _group

View File

@ -97,12 +97,12 @@ class Map<SourceType, ResultType>: Producer<ResultType> {
if let _ = _selector1 {
let sink = MapSink1(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
else {
let sink = MapSink2(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -66,9 +66,9 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
func run() -> Disposable {
_group.addDisposable(_sourceSubscription)
let disposable = _parent._sources.subscribeSafe(self)
let disposable = _parent._sources.subscribe(self)
_sourceSubscription.disposable = disposable
return _group
}
@ -80,7 +80,7 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
if let key = maybeKey {
let observer = MergeSinkIter(parent: self, disposeKey: key)
let disposable = value.asObservable().subscribeSafe(observer)
let disposable = value.asObservable().subscribe(observer)
innerSubscription.disposable = disposable
}
case .Error(let error):
@ -174,7 +174,7 @@ class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.
func run() -> Disposable {
_group.addDisposable(_sourceSubscription)
let disposable = _parent._sources.subscribeSafe(self)
let disposable = _parent._sources.subscribe(self)
_sourceSubscription.disposable = disposable
return _group
}
@ -187,7 +187,7 @@ class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.
if let key = key {
let observer = MergeConcurrentSinkIter(parent: self, disposeKey: key)
let disposable = innerSource.asObservable().subscribeSafe(observer)
let disposable = innerSource.asObservable().subscribe(observer)
subscription.disposable = disposable
}
}

View File

@ -27,7 +27,7 @@ class MulticastSink<S: SubjectType, O: ObserverType>: Sink<O>, ObserverType {
let observable = try _parent._selector(connectable)
let subscription = observable.subscribeSafe(self)
let subscription = observable.subscribe(self)
let connection = connectable.connect()
return BinaryDisposable(subscription, connection)

View File

@ -24,7 +24,7 @@ class ObserveOn<E> : Producer<E> {
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ObserveOnSink(scheduler: scheduler, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribeSafe(sink)
return source.subscribe(sink)
}
#if TRACE_RESOURCES

View File

@ -75,7 +75,7 @@ class ObserveOnSerialDispatchQueue<E> : Producer<E> {
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribeSafe(sink)
return source.subscribe(sink)
}
#if TRACE_RESOURCES

View File

@ -69,6 +69,6 @@ class Reduce<SourceType, AccumulateType, ResultType> : Producer<ResultType> {
override func run<O: ObserverType where O.E == ResultType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ReduceSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -72,8 +72,8 @@ class SampleSequenceSink<O: ObserverType, SampleType> : Sink<O>, ObserverType {
}
func run() -> Disposable {
_sourceSubscription.disposable = _parent._source.subscribeSafe(self)
let samplerSubscription = _parent._sampler.subscribeSafe(SamplerSink(parent: self))
_sourceSubscription.disposable = _parent._source.subscribe(self)
let samplerSubscription = _parent._sampler.subscribe(SamplerSink(parent: self))
return CompositeDisposable(_sourceSubscription, samplerSubscription)
}

View File

@ -59,6 +59,6 @@ class Scan<Element, Accumulate>: Producer<Accumulate> {
override func run<O : ObserverType where O.E == Accumulate>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ScanSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -57,7 +57,7 @@ class SkipCount<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribeSafe(sink)
return source.subscribe(sink)
}
}
@ -102,7 +102,7 @@ class SkipTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
return NopDisposable.instance
}
let disposeSubscription = parent.source.subscribeSafe(self)
let disposeSubscription = parent.source.subscribe(self)
return BinaryDisposable(disposeTimer, disposeSubscription)
}

View File

@ -102,9 +102,9 @@ class SkipUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
}
func run() -> Disposable {
let sourceSubscription = _parent._source.subscribeSafe(self)
let sourceSubscription = _parent._source.subscribe(self)
let otherObserver = SkipUntilSinkOther(parent: self)
let otherSubscription = _parent._other.subscribeSafe(otherObserver)
let otherSubscription = _parent._other.subscribe(otherObserver)
disposable = sourceSubscription
otherObserver.disposable = otherSubscription

View File

@ -104,12 +104,12 @@ class SkipWhile<Element>: Producer<Element> {
if let _ = _predicate {
let sink = SkipWhileSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
else {
let sink = SkipWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}
}

View File

@ -23,6 +23,6 @@ class StartWith<Element>: Producer<Element> {
observer.on(.Next(e))
}
return source.subscribeSafe(observer)
return source.subscribe(observer)
}
}

View File

@ -30,7 +30,7 @@ class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
}
func run() -> Disposable {
let subscription = _parent._sources.subscribeSafe(self)
let subscription = _parent._sources.subscribe(self)
_subscriptions.disposable = subscription
return CompositeDisposable(_subscriptions, _innerSubscription)
}
@ -48,7 +48,7 @@ class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
_innerSubscription.disposable = d
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
let disposable = observable.asObservable().subscribeSafe(observer)
let disposable = observable.asObservable().subscribe(observer)
d.disposable = disposable
case .Error(let error):
_lock.performLocked {

View File

@ -61,7 +61,7 @@ class TakeCount<Element>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = TakeCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}
@ -108,7 +108,7 @@ class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
return NopDisposable.instance
}
let disposeSubscription = _parent._source.subscribeSafe(self)
let disposeSubscription = _parent._source.subscribe(self)
return BinaryDisposable(disposeTimer, disposeSubscription)
}

View File

@ -97,9 +97,9 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
func run() -> Disposable {
let otherObserver = TakeUntilSinkOther(parent: self)
let otherSubscription = _parent._other.subscribeSafe(otherObserver)
let otherSubscription = _parent._other.subscribe(otherObserver)
otherObserver.disposable = otherSubscription
let sourceSubscription = _parent._source.subscribeSafe(self)
let sourceSubscription = _parent._source.subscribe(self)
return CompositeDisposable(sourceSubscription, otherSubscription)
}

View File

@ -118,11 +118,11 @@ class TakeWhile<Element>: Producer<Element> {
if let _ = _predicate {
let sink = TakeWhileSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
} else {
let sink = TakeWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}
}

View File

@ -29,7 +29,7 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
}
func run() -> Disposable {
let subscription = _parent._source.subscribeSafe(self)
let subscription = _parent._source.subscribe(self)
return CompositeDisposable(subscription, cancellable)
}

View File

@ -45,6 +45,6 @@ class ToArray<SourceType> : Producer<[SourceType]> {
override func run<O: ObserverType where O.E == [SourceType]>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ToArraySink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
return _source.subscribe(sink)
}
}

View File

@ -29,12 +29,12 @@ class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType where O.E
let source = try _parent._observableFactory(resource)
return StableCompositeDisposable.create(
source.subscribeSafe(self),
source.subscribe(self),
disposable
)
} catch let error {
return StableCompositeDisposable.create(
failWith(error).subscribeSafe(self),
failWith(error).subscribe(self),
disposable
)
}

View File

@ -30,8 +30,8 @@ class WithLatestFromSink<FirstO: ObservableType, SecondO: ObservableType, Result
let sndSubscription = SingleAssignmentDisposable()
let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
let fstSubscription = _parent._first.subscribeSafe(self)
sndSubscription.disposable = _parent._second.subscribeSafe(sndO)
let fstSubscription = _parent._first.subscribe(self)
sndSubscription.disposable = _parent._second.subscribe(sndO)
return StableCompositeDisposable.create(fstSubscription, sndSubscription)
}

View File

@ -105,7 +105,7 @@ class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Genera
for i in _parent.sources.startIndex ..< _parent.sources.endIndex {
let index = j
let source = _parent.sources[i].asObservable()
_subscriptions[j].disposable = source.subscribeSafe(AnyObserver { event in
_subscriptions[j].disposable = source.subscribe(AnyObserver { event in
self.on(event, atIndex: index)
})
j++

View File

@ -21,7 +21,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, resultSelector: (O1.E, O2.E) throws -> R)
-> Observable<R> {
return Zip2(
@ -34,20 +34,20 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip2<E1, E2, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 2, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -60,11 +60,11 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
return CompositeDisposable(disposables: [
subscription1,
@ -73,7 +73,7 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue())
}
}
@ -83,13 +83,13 @@ class Zip2<E1, E2, R> : Producer<R> {
let source1: Observable<E1>
let source2: Observable<E2>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: ResultSelector) {
self.source1 = source1
self.source2 = source2
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -110,7 +110,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, resultSelector: (O1.E, O2.E, O3.E) throws -> R)
-> Observable<R> {
return Zip3(
@ -123,22 +123,22 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip3<E1, E2, E3, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 3, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -152,13 +152,13 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
let subscription2 = SingleAssignmentDisposable()
let subscription3 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
return CompositeDisposable(disposables: [
subscription1,
@ -168,7 +168,7 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue())
}
}
@ -179,14 +179,14 @@ class Zip3<E1, E2, E3, R> : Producer<R> {
let source2: Observable<E2>
let source3: Observable<E3>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, resultSelector: ResultSelector) {
self.source1 = source1
self.source2 = source2
self.source3 = source3
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -207,7 +207,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, O4: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, O4: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, resultSelector: (O1.E, O2.E, O3.E, O4.E) throws -> R)
-> Observable<R> {
return Zip4(
@ -220,24 +220,24 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip4<E1, E2, E3, E4, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var values4: Queue<E4> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 4, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 3: return values4.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
case 3: return _values4.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -252,15 +252,15 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
let subscription3 = SingleAssignmentDisposable()
let subscription4 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self.values4.enqueue($0) }, this: subscription4)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription4.disposable = parent.source4.subscribeSafe(observer4)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
subscription4.disposable = _parent.source4.subscribe(observer4)
return CompositeDisposable(disposables: [
subscription1,
@ -271,7 +271,7 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue(), values4.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue(), _values4.dequeue())
}
}
@ -283,7 +283,7 @@ class Zip4<E1, E2, E3, E4, R> : Producer<R> {
let source3: Observable<E3>
let source4: Observable<E4>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, resultSelector: ResultSelector) {
self.source1 = source1
@ -291,7 +291,7 @@ class Zip4<E1, E2, E3, E4, R> : Producer<R> {
self.source3 = source3
self.source4 = source4
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -312,7 +312,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, O4: ObservableType, O5: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, O4: ObservableConvertibleType, O5: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, resultSelector: (O1.E, O2.E, O3.E, O4.E, O5.E) throws -> R)
-> Observable<R> {
return Zip5(
@ -325,26 +325,26 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip5<E1, E2, E3, E4, E5, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var values4: Queue<E4> = Queue(capacity: 2)
var values5: Queue<E5> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
var _values5: Queue<E5> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 5, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 3: return values4.count > 0
case 4: return values5.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
case 3: return _values4.count > 0
case 4: return _values5.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -360,17 +360,17 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
let subscription4 = SingleAssignmentDisposable()
let subscription5 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self.values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self.values5.enqueue($0) }, this: subscription5)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription4.disposable = parent.source4.subscribeSafe(observer4)
subscription5.disposable = parent.source5.subscribeSafe(observer5)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
subscription4.disposable = _parent.source4.subscribe(observer4)
subscription5.disposable = _parent.source5.subscribe(observer5)
return CompositeDisposable(disposables: [
subscription1,
@ -382,7 +382,7 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue(), values4.dequeue(), values5.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue(), _values4.dequeue(), _values5.dequeue())
}
}
@ -395,7 +395,7 @@ class Zip5<E1, E2, E3, E4, E5, R> : Producer<R> {
let source4: Observable<E4>
let source5: Observable<E5>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, resultSelector: ResultSelector) {
self.source1 = source1
@ -404,7 +404,7 @@ class Zip5<E1, E2, E3, E4, E5, R> : Producer<R> {
self.source4 = source4
self.source5 = source5
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -425,7 +425,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, O4: ObservableType, O5: ObservableType, O6: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, O4: ObservableConvertibleType, O5: ObservableConvertibleType, O6: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, _ source6: O6, resultSelector: (O1.E, O2.E, O3.E, O4.E, O5.E, O6.E) throws -> R)
-> Observable<R> {
return Zip6(
@ -438,28 +438,28 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip6<E1, E2, E3, E4, E5, E6, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var values4: Queue<E4> = Queue(capacity: 2)
var values5: Queue<E5> = Queue(capacity: 2)
var values6: Queue<E6> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
var _values5: Queue<E5> = Queue(capacity: 2)
var _values6: Queue<E6> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 6, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 3: return values4.count > 0
case 4: return values5.count > 0
case 5: return values6.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
case 3: return _values4.count > 0
case 4: return _values5.count > 0
case 5: return _values6.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -476,19 +476,19 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
let subscription5 = SingleAssignmentDisposable()
let subscription6 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self.values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self.values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self.values6.enqueue($0) }, this: subscription6)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription4.disposable = parent.source4.subscribeSafe(observer4)
subscription5.disposable = parent.source5.subscribeSafe(observer5)
subscription6.disposable = parent.source6.subscribeSafe(observer6)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
subscription4.disposable = _parent.source4.subscribe(observer4)
subscription5.disposable = _parent.source5.subscribe(observer5)
subscription6.disposable = _parent.source6.subscribe(observer6)
return CompositeDisposable(disposables: [
subscription1,
@ -501,7 +501,7 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue(), values4.dequeue(), values5.dequeue(), values6.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue(), _values4.dequeue(), _values5.dequeue(), _values6.dequeue())
}
}
@ -515,7 +515,7 @@ class Zip6<E1, E2, E3, E4, E5, E6, R> : Producer<R> {
let source5: Observable<E5>
let source6: Observable<E6>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, resultSelector: ResultSelector) {
self.source1 = source1
@ -525,7 +525,7 @@ class Zip6<E1, E2, E3, E4, E5, E6, R> : Producer<R> {
self.source5 = source5
self.source6 = source6
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -546,7 +546,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, O4: ObservableType, O5: ObservableType, O6: ObservableType, O7: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, O4: ObservableConvertibleType, O5: ObservableConvertibleType, O6: ObservableConvertibleType, O7: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, _ source6: O6, _ source7: O7, resultSelector: (O1.E, O2.E, O3.E, O4.E, O5.E, O6.E, O7.E) throws -> R)
-> Observable<R> {
return Zip7(
@ -559,30 +559,30 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip7<E1, E2, E3, E4, E5, E6, E7, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var values4: Queue<E4> = Queue(capacity: 2)
var values5: Queue<E5> = Queue(capacity: 2)
var values6: Queue<E6> = Queue(capacity: 2)
var values7: Queue<E7> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
var _values5: Queue<E5> = Queue(capacity: 2)
var _values6: Queue<E6> = Queue(capacity: 2)
var _values7: Queue<E7> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 7, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 3: return values4.count > 0
case 4: return values5.count > 0
case 5: return values6.count > 0
case 6: return values7.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
case 3: return _values4.count > 0
case 4: return _values5.count > 0
case 5: return _values6.count > 0
case 6: return _values7.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -600,21 +600,21 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
let subscription6 = SingleAssignmentDisposable()
let subscription7 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self.values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self.values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self.values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self.values7.enqueue($0) }, this: subscription7)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription4.disposable = parent.source4.subscribeSafe(observer4)
subscription5.disposable = parent.source5.subscribeSafe(observer5)
subscription6.disposable = parent.source6.subscribeSafe(observer6)
subscription7.disposable = parent.source7.subscribeSafe(observer7)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
subscription4.disposable = _parent.source4.subscribe(observer4)
subscription5.disposable = _parent.source5.subscribe(observer5)
subscription6.disposable = _parent.source6.subscribe(observer6)
subscription7.disposable = _parent.source7.subscribe(observer7)
return CompositeDisposable(disposables: [
subscription1,
@ -628,7 +628,7 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue(), values4.dequeue(), values5.dequeue(), values6.dequeue(), values7.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue(), _values4.dequeue(), _values5.dequeue(), _values6.dequeue(), _values7.dequeue())
}
}
@ -643,7 +643,7 @@ class Zip7<E1, E2, E3, E4, E5, E6, E7, R> : Producer<R> {
let source6: Observable<E6>
let source7: Observable<E7>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, source7: Observable<E7>, resultSelector: ResultSelector) {
self.source1 = source1
@ -654,7 +654,7 @@ class Zip7<E1, E2, E3, E4, E5, E6, E7, R> : Producer<R> {
self.source6 = source6
self.source7 = source7
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
@ -675,7 +675,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<O1: ObservableType, O2: ObservableType, O3: ObservableType, O4: ObservableType, O5: ObservableType, O6: ObservableType, O7: ObservableType, O8: ObservableType, R>
public func zip<O1: ObservableConvertibleType, O2: ObservableConvertibleType, O3: ObservableConvertibleType, O4: ObservableConvertibleType, O5: ObservableConvertibleType, O6: ObservableConvertibleType, O7: ObservableConvertibleType, O8: ObservableConvertibleType, R>
(source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, _ source6: O6, _ source7: O7, _ source8: O8, resultSelector: (O1.E, O2.E, O3.E, O4.E, O5.E, O6.E, O7.E, O8.E) throws -> R)
-> Observable<R> {
return Zip8(
@ -688,32 +688,32 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
typealias R = O.E
typealias Parent = Zip8<E1, E2, E3, E4, E5, E6, E7, E8, R>
let parent: Parent
let _parent: Parent
var values1: Queue<E1> = Queue(capacity: 2)
var values2: Queue<E2> = Queue(capacity: 2)
var values3: Queue<E3> = Queue(capacity: 2)
var values4: Queue<E4> = Queue(capacity: 2)
var values5: Queue<E5> = Queue(capacity: 2)
var values6: Queue<E6> = Queue(capacity: 2)
var values7: Queue<E7> = Queue(capacity: 2)
var values8: Queue<E8> = Queue(capacity: 2)
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
var _values5: Queue<E5> = Queue(capacity: 2)
var _values6: Queue<E6> = Queue(capacity: 2)
var _values7: Queue<E7> = Queue(capacity: 2)
var _values8: Queue<E8> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: 8, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
case 0: return values1.count > 0
case 1: return values2.count > 0
case 2: return values3.count > 0
case 3: return values4.count > 0
case 4: return values5.count > 0
case 5: return values6.count > 0
case 6: return values7.count > 0
case 7: return values8.count > 0
case 0: return _values1.count > 0
case 1: return _values2.count > 0
case 2: return _values3.count > 0
case 3: return _values4.count > 0
case 4: return _values5.count > 0
case 5: return _values6.count > 0
case 6: return _values7.count > 0
case 7: return _values8.count > 0
default:
rxFatalError("Unhandled case (Function)")
@ -732,23 +732,23 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
let subscription7 = SingleAssignmentDisposable()
let subscription8 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self.values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self.values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self.values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self.values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self.values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self.values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self.values7.enqueue($0) }, this: subscription7)
let observer8 = ZipObserver(lock: lock, parent: self, index: 7, setNextValue: { self.values8.enqueue($0) }, this: subscription8)
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
let observer8 = ZipObserver(lock: lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8)
subscription1.disposable = parent.source1.subscribeSafe(observer1)
subscription2.disposable = parent.source2.subscribeSafe(observer2)
subscription3.disposable = parent.source3.subscribeSafe(observer3)
subscription4.disposable = parent.source4.subscribeSafe(observer4)
subscription5.disposable = parent.source5.subscribeSafe(observer5)
subscription6.disposable = parent.source6.subscribeSafe(observer6)
subscription7.disposable = parent.source7.subscribeSafe(observer7)
subscription8.disposable = parent.source8.subscribeSafe(observer8)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
subscription3.disposable = _parent.source3.subscribe(observer3)
subscription4.disposable = _parent.source4.subscribe(observer4)
subscription5.disposable = _parent.source5.subscribe(observer5)
subscription6.disposable = _parent.source6.subscribe(observer6)
subscription7.disposable = _parent.source7.subscribe(observer7)
subscription8.disposable = _parent.source8.subscribe(observer8)
return CompositeDisposable(disposables: [
subscription1,
@ -763,7 +763,7 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
}
override func getResult() throws -> R {
return try self.parent.resultSelector(values1.dequeue(), values2.dequeue(), values3.dequeue(), values4.dequeue(), values5.dequeue(), values6.dequeue(), values7.dequeue(), values8.dequeue())
return try _parent._resultSelector(_values1.dequeue(), _values2.dequeue(), _values3.dequeue(), _values4.dequeue(), _values5.dequeue(), _values6.dequeue(), _values7.dequeue(), _values8.dequeue())
}
}
@ -779,7 +779,7 @@ class Zip8<E1, E2, E3, E4, E5, E6, E7, E8, R> : Producer<R> {
let source7: Observable<E7>
let source8: Observable<E8>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, source3: Observable<E3>, source4: Observable<E4>, source5: Observable<E5>, source6: Observable<E6>, source7: Observable<E7>, source8: Observable<E8>, resultSelector: ResultSelector) {
self.source1 = source1
@ -791,7 +791,7 @@ class Zip8<E1, E2, E3, E4, E5, E6, E7, E8, R> : Producer<R> {
self.source7 = source7
self.source8 = source8
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {

View File

@ -19,7 +19,7 @@ Merges the specified observable sequences into one observable sequence by using
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func zip<<%= (Array(1...i).map { "O\($0): ObservableType" }).joinWithSeparator(", ") %>, R>
public func zip<<%= (Array(1...i).map { "O\($0): ObservableConvertibleType" }).joinWithSeparator(", ") %>, R>
(<%= (Array(1...i).map { "source\($0): O\($0)" }).joinWithSeparator(", _ ") %>, resultSelector: (<%= (Array(1...i).map { "O\($0).E" }).joinWithSeparator(", ") %>) throws -> R)
-> Observable<R> {
return Zip<%= i %>(
@ -32,21 +32,21 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
typealias R = O.E
typealias Parent = Zip<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %>, R>
let parent: Parent
let _parent: Parent
<%= (Array(1...i).map {
" var values\($0): Queue<E\($0)> = Queue(capacity: 2)"
" var _values\($0): Queue<E\($0)> = Queue(capacity: 2)"
}).joinWithSeparator("\n") %>
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
_parent = parent
super.init(arity: <%= i %>, observer: observer, cancel: cancel)
}
override func hasElements(index: Int) -> Bool {
switch (index) {
<%= (Array(0..<i).map {
" case \($0): return values\($0 + 1).count > 0\n"
" case \($0): return _values\($0 + 1).count > 0\n"
}).joinWithSeparator("") %>
default:
rxFatalError("Unhandled case \(index)")
@ -61,11 +61,11 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" let observer\($0) = ZipObserver(lock: lock, parent: self, index: \($0 - 1), setNextValue: { self.values\($0).enqueue($0) }, this: subscription\($0))"
" let observer\($0) = ZipObserver(lock: lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))"
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" subscription\($0).disposable = parent.source\($0).subscribeSafe(observer\($0))" }).joinWithSeparator("\n")
" subscription\($0).disposable = _parent.source\($0).subscribe(observer\($0))" }).joinWithSeparator("\n")
%>
return CompositeDisposable(disposables: [
@ -74,7 +74,7 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
}
override func getResult() throws -> R {
return try self.parent.resultSelector(<%= (Array(1...i).map { "values\($0).dequeue()" }).joinWithSeparator(", ") %>)
return try _parent._resultSelector(<%= (Array(1...i).map { "_values\($0).dequeue()" }).joinWithSeparator(", ") %>)
}
}
@ -83,14 +83,14 @@ class Zip<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %>
<%= (Array(1...i).map { " let source\($0): Observable<E\($0)>" }).joinWithSeparator("\n") %>
let resultSelector: ResultSelector
let _resultSelector: ResultSelector
init(<%= (Array(1...i).map { "source\($0): Observable<E\($0)>" }).joinWithSeparator(", ") %>, resultSelector: ResultSelector) {
<%= (Array(1...i).map {
" self.source\($0) = source\($0)" }).joinWithSeparator("\n")
%>
self.resultSelector = resultSelector
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {

View File

@ -101,7 +101,7 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
return
}
let subscription2 = next!.subscribeSafe(self)
let subscription2 = next!.subscribe(self)
_subscription.disposable = subscription2
}