Rename subscribeOn to subscribe(on:)

This commit is contained in:
freak4pc 2020-10-03 19:07:20 +03:00 committed by Shai Mishali
parent fa9c971d43
commit 4cde7d69ff
14 changed files with 66 additions and 25 deletions

View File

@ -274,7 +274,7 @@
self.result = dispatcher
.do(onSubscribed: { weakDelegateProxy?.checkSelectorIsObservable(selector); weakDelegateProxy?.reset() }, onDispose: { weakDelegateProxy?.reset() })
.share()
.subscribeOn(mainScheduler)
.subscribe(on: mainScheduler)
}
var on: (Event<[Any]>) -> Void {

View File

@ -26,7 +26,7 @@ public protocol ControlEventType : ObservableType {
- it delivers events on `MainScheduler.instance`.
**The implementation of `ControlEvent` will ensure that sequence of events is being subscribed on main scheduler
(`subscribeOn(ConcurrentMainScheduler.instance)` behavior).**
(`subscribe(on: ConcurrentMainScheduler.instance)` behavior).**
**It is the implementors responsibility to make sure that all other properties enumerated above are satisfied.**
@ -45,7 +45,7 @@ public struct ControlEvent<PropertyType> : ControlEventType {
/// - parameter events: Observable sequence that represents events.
/// - returns: Control event created with a observable sequence of events.
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self.events = events.subscribeOn(ConcurrentMainScheduler.instance)
self.events = events.subscribe(on: ConcurrentMainScheduler.instance)
}
/// Subscribes an observer to control events.

View File

@ -30,7 +30,7 @@ public protocol ControlPropertyType : ObservableType, ObserverType {
- it delivers events on `MainScheduler.instance`
**The implementation of `ControlProperty` will ensure that sequence of values is being subscribed on main scheduler
(`subscribeOn(ConcurrentMainScheduler.instance)` behavior).**
(`subscribe(on: ConcurrentMainScheduler.instance)` behavior).**
**It is implementor's responsibility to make sure that that all other properties enumerated above are satisfied.**
@ -53,7 +53,7 @@ public struct ControlProperty<PropertyType> : ControlPropertyType {
/// - returns: Control property created with a observable sequence of values and an observer that enables binding values
/// to property.
public init<Values: ObservableType, Sink: ObserverType>(values: Values, valueSink: Sink) where Element == Values.Element, Element == Sink.Element {
self.values = values.subscribeOn(ConcurrentMainScheduler.instance)
self.values = values.subscribe(on: ConcurrentMainScheduler.instance)
self.valueSink = valueSink.asObserver()
}

View File

@ -103,7 +103,7 @@ extension SharedSequence {
- returns: An observable sequence with no elements.
*/
public static func empty() -> SharedSequence<SharingStrategy, Element> {
SharedSequence(raw: Observable.empty().subscribeOn(SharingStrategy.scheduler))
SharedSequence(raw: Observable.empty().subscribe(on: SharingStrategy.scheduler))
}
/**
@ -122,7 +122,7 @@ extension SharedSequence {
- returns: An observable sequence containing the single specified element.
*/
public static func just(_ element: Element) -> SharedSequence<SharingStrategy, Element> {
SharedSequence(raw: Observable.just(element).subscribeOn(SharingStrategy.scheduler))
SharedSequence(raw: Observable.just(element).subscribe(on: SharingStrategy.scheduler))
}
/**

View File

@ -271,7 +271,7 @@ extension Observable {
.observe(on:scheduler.async)
// subscribe on is here because side-effects also need to be cancelable
// (smooths out any glitches caused by start-cancel immediately)
.subscribeOn(scheduler.async)
.subscribe(on: scheduler.async)
}
}

View File

@ -52,7 +52,7 @@ extension ObservableType where Element == Any {
}, onSubscribed: {
replaySubject.onNext(initialState)
})
.subscribeOn(scheduler)
.subscribe(on: scheduler)
.startWith(initialState)
.observe(on:scheduler)
}

View File

@ -7,6 +7,25 @@
//
extension ObservableType {
/**
Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
scheduler.
This operation is not commonly used.
This only performs the side-effects of subscription and unsubscription on the specified scheduler.
In order to invoke observer callbacks on a `scheduler`, use `observeOn`.
- seealso: [subscribeOn operator on reactivex.io](http://reactivex.io/documentation/operators/subscribeon.html)
- parameter scheduler: Scheduler to perform subscription and unsubscription actions on.
- returns: The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
/**
Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
@ -23,9 +42,10 @@ extension ObservableType {
- parameter scheduler: Scheduler to perform subscription and unsubscription actions on.
- returns: The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
@available(*, deprecated, renamed: "subscribe(on:)")
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
subscribe(on: scheduler)
}
}

View File

@ -139,9 +139,30 @@ extension PrimitiveSequence {
- parameter scheduler: Scheduler to perform subscription and unsubscription actions on.
- returns: The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public func subscribe(on scheduler: ImmediateSchedulerType)
-> PrimitiveSequence<Trait, Element> {
PrimitiveSequence(raw: self.source.subscribe(on: scheduler))
}
/**
Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
scheduler.
This operation is not commonly used.
This only performs the side-effects of subscription and unsubscription on the specified scheduler.
In order to invoke observer callbacks on a `scheduler`, use `observeOn`.
- seealso: [subscribeOn operator on reactivex.io](http://reactivex.io/documentation/operators/subscribeon.html)
- parameter scheduler: Scheduler to perform subscription and unsubscription actions on.
- returns: The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
@available(*, deprecated, renamed: "subscribe(on:)")
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> PrimitiveSequence<Trait, Element> {
PrimitiveSequence(raw: self.source.subscribeOn(scheduler))
subscribe(on: scheduler)
}
/**

View File

@ -48,7 +48,7 @@ extension ObservableBlockingTest {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
func operation1()->Observable<Int>{
return Observable.of(1, 2).subscribeOn(scheduler)
return Observable.of(1, 2).subscribe(on: scheduler)
}
let a = try! operation1().toBlocking().toArray()
@ -105,7 +105,7 @@ extension ObservableBlockingTest {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
func operation1()->Observable<Int>{
return Observable.just(1).subscribeOn(scheduler)
return Observable.just(1).subscribe(on: scheduler)
}
let a = try! operation1().toBlocking().first()
@ -162,7 +162,7 @@ extension ObservableBlockingTest {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .background)
func operation1()->Observable<Int>{
return Observable.just(1).subscribeOn(scheduler)
return Observable.just(1).subscribe(on: scheduler)
}
let a = try! operation1().toBlocking().last()
@ -310,7 +310,7 @@ extension ObservableBlockingTest {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
func operation1()->Observable<Int>{
return Observable.just(1).subscribeOn(scheduler)
return Observable.just(1).subscribe(on: scheduler)
}
let a = try! operation1().toBlocking().single()

View File

@ -22,7 +22,7 @@ class SharedSequenceTest: RxTest {
// test helpers that make sure that resulting driver operator honors definition
// * only one subscription is made and shared - shareReplay(1)
// * subscription is made on main thread - subscribeOn(ConcurrentMainScheduler.instance)
// * subscription is made on main thread - subscribe(on: ConcurrentMainScheduler.instance)
// * events are observed on main thread - observe(on:MainScheduler.instance)
// * it can't error out - it needs to have catch somewhere
extension SharedSequenceTest {

View File

@ -260,7 +260,7 @@ extension CompletableTest {
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start {
Completable.empty().subscribeOn(scheduler)
Completable.empty().subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [

View File

@ -337,7 +337,7 @@ extension MaybeTest {
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start {
Maybe.just(1).subscribeOn(scheduler)
Maybe.just(1).subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [

View File

@ -28,7 +28,7 @@ extension ObservableSubscribeOnTest {
}
let res = scheduler.start {
xs.subscribeOn(scheduler)
xs.subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [
@ -47,7 +47,7 @@ extension ObservableSubscribeOnTest {
])
let res = scheduler.start {
xs.subscribeOn(scheduler)
xs.subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [
@ -67,7 +67,7 @@ extension ObservableSubscribeOnTest {
])
let res = scheduler.start {
xs.subscribeOn(scheduler)
xs.subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [
@ -88,7 +88,7 @@ extension ObservableSubscribeOnTest {
])
let res = scheduler.start {
xs.subscribeOn(scheduler)
xs.subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [
@ -103,13 +103,13 @@ extension ObservableSubscribeOnTest {
#if TRACE_RESOURCES
func testSubscribeOnSerialReleasesResourcesOnComplete() {
let testScheduler = TestScheduler(initialClock: 0)
_ = Observable<Int>.just(1).subscribeOn(testScheduler).subscribe()
_ = Observable<Int>.just(1).subscribe(on: testScheduler).subscribe()
testScheduler.start()
}
func testSubscribeOnSerialReleasesResourcesOnError() {
let testScheduler = TestScheduler(initialClock: 0)
_ = Observable<Int>.error(testError).subscribeOn(testScheduler).subscribe()
_ = Observable<Int>.error(testError).subscribe(on: testScheduler).subscribe()
testScheduler.start()
}
#endif

View File

@ -274,7 +274,7 @@ extension SingleTest {
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start {
Single.just(1).subscribeOn(scheduler)
Single.just(1).subscribe(on: scheduler)
}
XCTAssertEqual(res.events, [