Fixes deadlock with shareReplayWhileLatest. #1323

This commit is contained in:
Krunoslav Zaher 2017-07-12 22:22:48 +02:00
parent d215a2a8b0
commit d5a754d6dd
3 changed files with 63 additions and 120 deletions

View File

@ -190,12 +190,7 @@ extension ObservableType {
*/
public func shareReplay(_ bufferSize: Int)
-> Observable<E> {
if bufferSize == 1 {
return ShareReplay1(source: self.asObservable())
}
else {
return self.replay(bufferSize).refCount()
}
return self.replay(bufferSize).refCount()
}
}
@ -268,25 +263,30 @@ fileprivate final class ShareReplay1WhileConnectedConnection<Element>
_parent._connection = nil
}
_observers = Observers()
_subscription.dispose()
}
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
_lock.lock()
_synchronized_unsubscribe(disposeKey)
let shouldDisconnect = _synchronized_unsubscribe(disposeKey)
_lock.unlock()
if shouldDisconnect {
_subscription.dispose()
}
}
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
return false
}
if _observers.count == 0 {
_synchronized_dispose()
return true
}
return false
}
#if TRACE_RESOURCES
@ -319,13 +319,13 @@ final fileprivate class ShareReplay1WhileConnected<Element>
let count = connection._observers.count
let disposable = connection._synchronized_subscribe(observer)
_lock.unlock()
if count == 0 {
connection.connect()
}
_lock.unlock()
return disposable
}
@ -411,25 +411,30 @@ fileprivate final class ShareWhileConnectedConnection<Element>
_parent._connection = nil
}
_observers = Observers()
_subscription.dispose()
}
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
_lock.lock()
_synchronized_unsubscribe(disposeKey)
let shouldDisconnect = _synchronized_unsubscribe(disposeKey)
_lock.unlock()
if shouldDisconnect {
_subscription.dispose()
}
}
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
return false
}
if _observers.count == 0 {
_synchronized_dispose()
return true
}
return false
}
#if TRACE_RESOURCES
@ -463,12 +468,12 @@ final fileprivate class ShareWhileConnected<Element>
let disposable = connection._synchronized_subscribe(observer)
_lock.unlock()
if count == 0 {
connection.connect()
}
_lock.unlock()
return disposable
}
@ -489,101 +494,3 @@ final fileprivate class ShareWhileConnected<Element>
return connection
}
}
// optimized version of share replay for most common case
final fileprivate class ShareReplay1<Element>
: Observable<Element>
, ObserverType
, SynchronizedUnsubscribeType {
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
private let _source: Observable<Element>
private let _lock = RecursiveLock()
private var _connection: SingleAssignmentDisposable?
private var _element: Element?
private var _stopped = false
private var _stopEvent = nil as Event<Element>?
private var _observers = Observers()
init(source: Observable<Element>) {
self._source = source
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
_lock.lock()
let result = _synchronized_subscribe(observer)
_lock.unlock()
return result
}
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let element = self._element {
observer.on(.next(element))
}
if let stopEvent = self._stopEvent {
observer.on(stopEvent)
return Disposables.create()
}
let initialCount = self._observers.count
let disposeKey = self._observers.insert(observer.on)
if initialCount == 0 {
let connection = SingleAssignmentDisposable()
_connection = connection
connection.setDisposable(self._source.subscribe(self))
}
return SubscriptionDisposable(owner: self, key: disposeKey)
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
_lock.lock()
_synchronized_unsubscribe(disposeKey)
_lock.unlock()
}
func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
}
if _observers.count == 0 {
_connection?.dispose()
_connection = nil
}
}
func on(_ event: Event<E>) {
dispatch(_synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
_lock.lock(); defer { _lock.unlock() }
if _stopped {
return Observers()
}
switch event {
case .next(let element):
_element = element
return _observers
case .error, .completed:
_stopEvent = event
_stopped = true
let observers = _observers
_observers = Observers()
_connection?.dispose()
_connection = nil
return observers
}
}
}

View File

@ -55,6 +55,7 @@ final class AnomaliesTest_ : AnomaliesTest, RxTestCase {
static var allTests: [(String, (AnomaliesTest_) -> () -> ())] { return [
("test936", AnomaliesTest.test936),
("test1323", AnomaliesTest.test1323),
("testSeparationBetweenOnAndSubscriptionLocks", AnomaliesTest.testSeparationBetweenOnAndSubscriptionLocks),
] }
}

View File

@ -68,6 +68,39 @@ extension AnomaliesTest {
}
}
func test1323() {
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
_ = share(Observable<Int>.create({ observer in
observer.on(.next(1))
Thread.sleep(forTimeInterval: 0.1)
observer.on(.completed)
return Disposables.create()
})
.flatMap { (int) -> Observable<Int> in
return Observable.create { (observer) -> Disposable in
DispatchQueue.global().async {
observer.onNext(int)
observer.onCompleted()
}
return Disposables.create()
}
})
.subscribe { (e) in
}
}
for op in [
{ $0.share(replay: 0, scope: .whileConnected) },
{ $0.share(replay: 0, scope: .forever) },
{ $0.share(replay: 1, scope: .whileConnected) },
{ $0.share(replay: 1, scope: .forever) },
{ $0.share(replay: 2, scope: .whileConnected) },
{ $0.share(replay: 2, scope: .forever) },
] as [(Observable<Int>) -> Observable<Int>] {
performSharingOperatorsTest(share: op)
}
}
func testSeparationBetweenOnAndSubscriptionLocks() {
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
for i in 0 ..< 1 {
@ -112,10 +145,12 @@ extension AnomaliesTest {
}
for op in [
{ $0.shareReplay(1) },
{ $0.replay(1).refCount() },
{ $0.publish().refCount() },
{ $0.shareReplayLatestWhileConnected() }
{ $0.share(replay: 0, scope: .whileConnected) },
{ $0.share(replay: 0, scope: .forever) },
{ $0.share(replay: 1, scope: .whileConnected) },
{ $0.share(replay: 1, scope: .forever) },
{ $0.share(replay: 2, scope: .whileConnected) },
{ $0.share(replay: 2, scope: .forever) },
] as [(Observable<Int>) -> Observable<Int>] {
performSharingOperatorsTest(share: op)
}