mirror of
https://github.com/ReactiveX/RxSwift.git
synced 2024-10-04 22:17:41 +03:00
Fix #1778.
This commit is contained in:
parent
212e9d1cc6
commit
bac8634608
@ -179,6 +179,8 @@ final fileprivate class ObserveOnSerialDispatchQueueSink<O: ObserverType> : Obse
|
||||
super.init()
|
||||
|
||||
cachedScheduleLambda = { pair in
|
||||
guard !cancel.isDisposed else { return Disposables.create() }
|
||||
|
||||
pair.sink.observer.on(pair.event)
|
||||
|
||||
if pair.event.isStopEvent {
|
||||
|
@ -87,8 +87,6 @@ extension ObservableObserveOnTest {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
XCTAssert(didExecute)
|
||||
}
|
||||
|
||||
@ -320,6 +318,109 @@ extension ObservableObserveOnTest {
|
||||
#endif
|
||||
}
|
||||
|
||||
// Because of `self.wait(for: [blockingTheSerialScheduler], timeout: 1.0)`.
|
||||
// Testing this on Unix should be enough.
|
||||
#if !os(Linux)
|
||||
// Test event is cancelled properly.
|
||||
extension ObservableObserveOnTest {
|
||||
func testDisposeWithEnqueuedElement() {
|
||||
let emit = PublishSubject<Int>()
|
||||
let blockingTheSerialScheduler = self.expectation(description: "blocking")
|
||||
let unblock = self.expectation(description: "unblock")
|
||||
let testDone = self.expectation(description: "test done")
|
||||
let scheduler = SerialDispatchQueueScheduler(qos: .default)
|
||||
var events: [Event<Int>] = []
|
||||
let subscription = emit.observeOn(scheduler).subscribe { update in
|
||||
switch update {
|
||||
case .next(let value):
|
||||
if value == 0 {
|
||||
blockingTheSerialScheduler.fulfill()
|
||||
self.wait(for: [unblock], timeout: 1.0)
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
events.append(update)
|
||||
}
|
||||
emit.on(.next(0))
|
||||
self.wait(for: [blockingTheSerialScheduler], timeout: 1.0)
|
||||
emit.on(.next(1))
|
||||
_ = scheduler.schedule(()) { _ in
|
||||
testDone.fulfill()
|
||||
return Disposables.create()
|
||||
}
|
||||
subscription.dispose()
|
||||
unblock.fulfill()
|
||||
self.wait(for: [testDone], timeout: 1.0)
|
||||
XCTAssertEqual(events, [.next(0)])
|
||||
}
|
||||
|
||||
func testDisposeWithEnqueuedError() {
|
||||
let emit = PublishSubject<Int>()
|
||||
let blockingTheSerialScheduler = self.expectation(description: "blocking")
|
||||
let unblock = self.expectation(description: "unblock")
|
||||
let testDone = self.expectation(description: "test done")
|
||||
let scheduler = SerialDispatchQueueScheduler(qos: .default)
|
||||
var events: [Event<Int>] = []
|
||||
let subscription = emit.observeOn(scheduler).subscribe { update in
|
||||
switch update {
|
||||
case .next(let value):
|
||||
if value == 0 {
|
||||
blockingTheSerialScheduler.fulfill()
|
||||
self.wait(for: [unblock], timeout: 1.0)
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
events.append(update)
|
||||
}
|
||||
emit.on(.next(0))
|
||||
self.wait(for: [blockingTheSerialScheduler], timeout: 1.0)
|
||||
emit.on(.error(TestError.dummyError))
|
||||
_ = scheduler.schedule(()) { _ in
|
||||
testDone.fulfill()
|
||||
return Disposables.create()
|
||||
}
|
||||
subscription.dispose()
|
||||
unblock.fulfill()
|
||||
self.wait(for: [testDone], timeout: 1.0)
|
||||
XCTAssertEqual(events, [.next(0)])
|
||||
}
|
||||
|
||||
func testDisposeWithEnqueuedCompleted() {
|
||||
let emit = PublishSubject<Int>()
|
||||
let blockingTheSerialScheduler = self.expectation(description: "blocking")
|
||||
let unblock = self.expectation(description: "unblock")
|
||||
let testDone = self.expectation(description: "test done")
|
||||
let scheduler = SerialDispatchQueueScheduler(qos: .default)
|
||||
var events: [Event<Int>] = []
|
||||
let subscription = emit.observeOn(scheduler).subscribe { update in
|
||||
switch update {
|
||||
case .next(let value):
|
||||
if value == 0 {
|
||||
blockingTheSerialScheduler.fulfill()
|
||||
self.wait(for: [unblock], timeout: 1.0)
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
events.append(update)
|
||||
}
|
||||
emit.on(.next(0))
|
||||
self.wait(for: [blockingTheSerialScheduler], timeout: 1.0)
|
||||
emit.on(.completed)
|
||||
_ = scheduler.schedule(()) { _ in
|
||||
testDone.fulfill()
|
||||
return Disposables.create()
|
||||
}
|
||||
subscription.dispose()
|
||||
unblock.fulfill()
|
||||
self.wait(for: [testDone], timeout: 1.0)
|
||||
XCTAssertEqual(events, [.next(0)])
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// observeOn concurrent scheduler
|
||||
class ObservableObserveOnTestConcurrentSchedulerTest: ObservableObserveOnTestBase {
|
||||
|
||||
|
@ -46,7 +46,10 @@ let excludedTests: [String] = [
|
||||
"testShareReplayLatestWhileConnectedDisposableDoesntRetainAnything",
|
||||
"testSingle_DecrementCountsFirst",
|
||||
"testSinglePredicate_DecrementCountsFirst",
|
||||
"testLockUnlockCountsResources"
|
||||
"testLockUnlockCountsResources",
|
||||
"testDisposeWithEnqueuedElement",
|
||||
"testDisposeWithEnqueuedError",
|
||||
"testDisposeWithEnqueuedCompleted",
|
||||
]
|
||||
|
||||
func excludeTest(_ name: String) -> Bool {
|
||||
|
Loading…
Reference in New Issue
Block a user