Fixes anomaly with multicast subscription disposal.

This commit is contained in:
Krunoslav Zaher 2016-04-11 00:04:58 +02:00
parent 18cf20820f
commit 39a87bfcfd
2 changed files with 189 additions and 10 deletions

View File

@ -27,31 +27,35 @@ public class ConnectableObservable<Element>
class Connection<S: SubjectType> : Disposable {
private var _lock: NSRecursiveLock
// state
private weak var _parent: ConnectableObservableAdapter<S>?
private var _parent: ConnectableObservableAdapter<S>?
private var _subscription : Disposable?
init(parent: ConnectableObservableAdapter<S>, subscription: Disposable) {
init(parent: ConnectableObservableAdapter<S>, lock: NSRecursiveLock, subscription: Disposable) {
_parent = parent
_subscription = subscription
_lock = lock
}
func dispose() {
guard let parent = _parent else { return }
_lock.lock(); defer { _lock.unlock() } // {
guard let parent = _parent else {
return
}
parent._lock.performLocked {
guard let oldSubscription = _subscription else {
return
}
_subscription = nil
if _parent?._connection === self {
if parent._connection === self {
parent._connection = nil
}
_parent = nil
oldSubscription.dispose()
}
// }
}
}
@ -80,7 +84,7 @@ class ConnectableObservableAdapter<S: SubjectType>
}
let disposable = _source.subscribe(_subject.asObserver())
let connection = Connection(parent: self, subscription: disposable)
let connection = Connection(parent: self, lock: _lock, subscription: disposable)
_connection = connection
return connection
}

View File

@ -15,6 +15,181 @@ class ObservableBindingTest : RxTest {
}
// multicast
extension ObservableBindingTest {
func testMulticast_Cold_Completed() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(40, 0),
next(90, 1),
next(150, 2),
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
completed(390)
])
let res = scheduler.start {
xs.multicast({ PublishSubject<Int>() }) { $0 }
}
XCTAssertEqual(res.events, [
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
completed(390)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 390)
])
}
func testMulticast_Cold_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(40, 0),
next(90, 1),
next(150, 2),
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
error(390, testError)
])
let res = scheduler.start {
xs.multicast({ PublishSubject<Int>() }) { $0 }
}
XCTAssertEqual(res.events, [
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
error(390, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 390)
])
}
func testMulticast_Cold_Dispose() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(40, 0),
next(90, 1),
next(150, 2),
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
])
let res = scheduler.start {
xs.multicast({ PublishSubject<Int>() }) { $0 }
}
XCTAssertEqual(res.events, [
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 1000)
])
}
func testMulticast_Cold_Zip() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(40, 0),
next(90, 1),
next(150, 2),
next(210, 3),
next(240, 4),
next(270, 5),
next(330, 6),
next(340, 7),
completed(390)
])
let res = scheduler.start {
xs.multicast({ PublishSubject<Int>() }) { Observable.zip($0, $0) { a, b in a + b } }
}
XCTAssertEqual(res.events, [
next(210, 6),
next(240, 8),
next(270, 10),
next(330, 12),
next(340, 14),
completed(390)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 390)
])
}
func testMulticast_SubjectSelectorThrows() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(210, 1),
next(240, 2),
completed(300)
])
let res = scheduler.start {
xs.multicast({ () throws -> PublishSubject<Int> in throw testError }) { $0 }
}
XCTAssertEqual(res.events, [
error(200, testError)
])
XCTAssertEqual(xs.subscriptions, [
])
}
func testMulticast_SelectorThrows() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(210, 1),
next(240, 2),
completed(300)
])
let res = scheduler.start {
xs.multicast({ PublishSubject<Int>() }) { _ -> Observable<Int> in throw testError }
}
XCTAssertEqual(res.events, [
error(200, testError)
])
XCTAssertEqual(xs.subscriptions, [
])
}
}
// refCount
extension ObservableBindingTest {
func testRefCount_DeadlockSimple() {