From 39a87bfcfd6f563460406fef2ab1a5f14d670257 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Mon, 11 Apr 2016 00:04:58 +0200 Subject: [PATCH] Fixes anomaly with `multicast` subscription disposal. --- .../ConnectableObservable.swift | 24 ++- .../Tests/Observable+BindingTest.swift | 175 ++++++++++++++++++ 2 files changed, 189 insertions(+), 10 deletions(-) diff --git a/RxSwift/Observables/Implementations/ConnectableObservable.swift b/RxSwift/Observables/Implementations/ConnectableObservable.swift index 9be64f0c..ce78baba 100644 --- a/RxSwift/Observables/Implementations/ConnectableObservable.swift +++ b/RxSwift/Observables/Implementations/ConnectableObservable.swift @@ -26,32 +26,36 @@ public class ConnectableObservable } class Connection : Disposable { - + + private var _lock: NSRecursiveLock // state - private weak var _parent: ConnectableObservableAdapter? + private var _parent: ConnectableObservableAdapter? private var _subscription : Disposable? - - init(parent: ConnectableObservableAdapter, subscription: Disposable) { + + init(parent: ConnectableObservableAdapter, lock: NSRecursiveLock, subscription: Disposable) { _parent = parent _subscription = subscription + _lock = lock } func dispose() { - guard let parent = _parent else { return } - - parent._lock.performLocked { + _lock.lock(); defer { _lock.unlock() } // { + guard let parent = _parent else { + return + } + guard let oldSubscription = _subscription else { return } _subscription = nil - if _parent?._connection === self { + if parent._connection === self { parent._connection = nil } _parent = nil oldSubscription.dispose() - } + // } } } @@ -80,7 +84,7 @@ class ConnectableObservableAdapter } let disposable = _source.subscribe(_subject.asObserver()) - let connection = Connection(parent: self, subscription: disposable) + let connection = Connection(parent: self, lock: _lock, subscription: disposable) _connection = connection return connection } diff --git a/Tests/RxSwiftTests/Tests/Observable+BindingTest.swift b/Tests/RxSwiftTests/Tests/Observable+BindingTest.swift index 9a3981f3..0b741218 100644 --- a/Tests/RxSwiftTests/Tests/Observable+BindingTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+BindingTest.swift @@ -15,6 +15,181 @@ class ObservableBindingTest : RxTest { } +// multicast +extension ObservableBindingTest { + func testMulticast_Cold_Completed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(40, 0), + next(90, 1), + next(150, 2), + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + completed(390) + ]) + + let res = scheduler.start { + xs.multicast({ PublishSubject() }) { $0 } + } + + XCTAssertEqual(res.events, [ + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + completed(390) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 390) + ]) + } + + func testMulticast_Cold_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(40, 0), + next(90, 1), + next(150, 2), + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + error(390, testError) + ]) + + let res = scheduler.start { + xs.multicast({ PublishSubject() }) { $0 } + } + + XCTAssertEqual(res.events, [ + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + error(390, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 390) + ]) + } + + func testMulticast_Cold_Dispose() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(40, 0), + next(90, 1), + next(150, 2), + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + ]) + + let res = scheduler.start { + xs.multicast({ PublishSubject() }) { $0 } + } + + XCTAssertEqual(res.events, [ + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 1000) + ]) + } + + func testMulticast_Cold_Zip() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(40, 0), + next(90, 1), + next(150, 2), + next(210, 3), + next(240, 4), + next(270, 5), + next(330, 6), + next(340, 7), + completed(390) + ]) + + let res = scheduler.start { + xs.multicast({ PublishSubject() }) { Observable.zip($0, $0) { a, b in a + b } } + } + + XCTAssertEqual(res.events, [ + next(210, 6), + next(240, 8), + next(270, 10), + next(330, 12), + next(340, 14), + completed(390) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 390) + ]) + } + + func testMulticast_SubjectSelectorThrows() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(210, 1), + next(240, 2), + completed(300) + ]) + + let res = scheduler.start { + xs.multicast({ () throws -> PublishSubject in throw testError }) { $0 } + } + + XCTAssertEqual(res.events, [ + error(200, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + ]) + } + + func testMulticast_SelectorThrows() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(210, 1), + next(240, 2), + completed(300) + ]) + + let res = scheduler.start { + xs.multicast({ PublishSubject() }) { _ -> Observable in throw testError } + } + + XCTAssertEqual(res.events, [ + error(200, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + ]) + } +} + // refCount extension ObservableBindingTest { func testRefCount_DeadlockSimple() {