Changes nested SharedSequence strategy to use inner sharing strategy for result.

This commit is contained in:
Krunoslav Zaher 2017-09-03 15:19:14 +02:00
parent c8f410b5f5
commit 6d2d15cdea
No known key found for this signature in database
GPG Key ID: 74BC718B68EA3842
2 changed files with 41 additions and 41 deletions

View File

@ -44,7 +44,7 @@ extension SharedSequenceConvertibleType {
}
// MARK: switchLatest
extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType, E.SharingStrategy == SharingStrategy {
extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType {
/**
Transforms an observable sequence of observable sequences into an observable sequence
@ -55,12 +55,12 @@ extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType,
- returns: The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
public func switchLatest() -> SharedSequence<SharingStrategy, E.E> {
public func switchLatest() -> SharedSequence<E.SharingStrategy, E.E> {
let source: Observable<E.E> = self
.asObservable()
.map { $0.asSharedSequence() }
.switchLatest()
return SharedSequence<SharingStrategy, E.E>(source)
return SharedSequence<E.SharingStrategy, E.E>(source)
}
}
@ -76,12 +76,12 @@ extension SharedSequenceConvertibleType {
- returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an
Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
public func flatMapLatest<R>(_ selector: @escaping (E) -> SharedSequence<SharingStrategy, R>)
-> SharedSequence<SharingStrategy, R> {
public func flatMapLatest<Sharing, R>(_ selector: @escaping (E) -> SharedSequence<Sharing, R>)
-> SharedSequence<Sharing, R> {
let source: Observable<R> = self
.asObservable()
.flatMapLatest(selector)
return SharedSequence<SharingStrategy, R>(source)
return SharedSequence<Sharing, R>(source)
}
}
@ -95,12 +95,12 @@ extension SharedSequenceConvertibleType {
- parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel.
- returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated.
*/
public func flatMapFirst<R>(_ selector: @escaping (E) -> SharedSequence<SharingStrategy, R>)
-> SharedSequence<SharingStrategy, R> {
public func flatMapFirst<Sharing, R>(_ selector: @escaping (E) -> SharedSequence<Sharing, R>)
-> SharedSequence<Sharing, R> {
let source: Observable<R> = self
.asObservable()
.flatMapFirst(selector)
return SharedSequence<SharingStrategy, R>(source)
return SharedSequence<Sharing, R>(source)
}
}
@ -208,7 +208,7 @@ extension SharedSequenceConvertibleType {
- parameter selector: A transform function to apply to each element.
- returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.
*/
public func flatMap<R>(_ selector: @escaping (E) -> SharedSequence<SharingStrategy, R>) -> SharedSequence<SharingStrategy, R> {
public func flatMap<Sharing, R>(_ selector: @escaping (E) -> SharedSequence<Sharing, R>) -> SharedSequence<Sharing, R> {
let source = self.asObservable()
.flatMap(selector)
@ -261,17 +261,17 @@ extension SharedSequenceConvertibleType {
}
// MARK: merge
extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType, E.SharingStrategy == SharingStrategy {
extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType {
/**
Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence.
- returns: The observable sequence that merges the elements of the observable sequences.
*/
public func merge() -> SharedSequence<SharingStrategy, E.E> {
public func merge() -> SharedSequence<E.SharingStrategy, E.E> {
let source = self.asObservable()
.map { $0.asSharedSequence() }
.merge()
return SharedSequence<SharingStrategy, E.E>(source)
return SharedSequence<E.SharingStrategy, E.E>(source)
}
/**
@ -281,11 +281,11 @@ extension SharedSequenceConvertibleType where E : SharedSequenceConvertibleType,
- returns: The observable sequence that merges the elements of the inner sequences.
*/
public func merge(maxConcurrent: Int)
-> SharedSequence<SharingStrategy, E.E> {
-> SharedSequence<E.SharingStrategy, E.E> {
let source = self.asObservable()
.map { $0.asSharedSequence() }
.merge(maxConcurrent: maxConcurrent)
return SharedSequence<SharingStrategy, E.E>(source)
return SharedSequence<E.SharingStrategy, E.E>(source)
}
}

View File

@ -84,22 +84,22 @@ extension SharedSequenceOperatorTests {
// MARK: switch latest
extension SharedSequenceOperatorTests {
func testAsDriver_switchLatest() {
let hotObservable = BackgroundThreadPrimitiveHotObservable<Driver<Int>>()
let hotObservable = BackgroundThreadPrimitiveHotObservable<Signal<Int>>()
let hotObservable1 = MainThreadPrimitiveHotObservable<Int>()
let hotObservable2 = MainThreadPrimitiveHotObservable<Int>()
let driver = hotObservable.asDriver(onErrorJustReturn: hotObservable1.asDriver(onErrorJustReturn: -1)).switchLatest()
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: hotObservable1.asSignal(onErrorJustReturn: -1)).switchLatest()
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(hotObservable1.asDriver(onErrorJustReturn: -2)))
hotObservable.on(.next(hotObservable1.asSignal(onErrorJustReturn: -2)))
hotObservable1.on(.next(1))
hotObservable1.on(.next(2))
hotObservable1.on(.error(testError))
hotObservable.on(.next(hotObservable2.asDriver(onErrorJustReturn: -3)))
hotObservable.on(.next(hotObservable2.asSignal(onErrorJustReturn: -3)))
hotObservable2.on(.next(10))
hotObservable2.on(.next(11))
@ -129,15 +129,15 @@ extension SharedSequenceOperatorTests {
let hotObservable2 = MainThreadPrimitiveHotObservable<Int>()
let errorHotObservable = MainThreadPrimitiveHotObservable<Int>()
let drivers: [Driver<Int>] = [
hotObservable1.asDriver(onErrorJustReturn: -2),
hotObservable2.asDriver(onErrorJustReturn: -3),
errorHotObservable.asDriver(onErrorJustReturn: -4),
let signals: [Signal<Int>] = [
hotObservable1.asSignal(onErrorJustReturn: -2),
hotObservable2.asSignal(onErrorJustReturn: -3),
errorHotObservable.asSignal(onErrorJustReturn: -4),
]
let driver = hotObservable.asDriver(onErrorJustReturn: 2).flatMapLatest { drivers[$0] }
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: 2).flatMapLatest { signals[$0] }
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(0))
@ -175,15 +175,15 @@ extension SharedSequenceOperatorTests {
let hotObservable2 = MainThreadPrimitiveHotObservable<Int>()
let errorHotObservable = MainThreadPrimitiveHotObservable<Int>()
let drivers: [Driver<Int>] = [
hotObservable1.asDriver(onErrorJustReturn: -2),
hotObservable2.asDriver(onErrorJustReturn: -3),
errorHotObservable.asDriver(onErrorJustReturn: -4),
let signals: [Signal<Int>] = [
hotObservable1.asSignal(onErrorJustReturn: -2),
hotObservable2.asSignal(onErrorJustReturn: -3),
errorHotObservable.asSignal(onErrorJustReturn: -4),
]
let driver = hotObservable.asDriver(onErrorJustReturn: 2).flatMapFirst { drivers[$0] }
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: 2).flatMapFirst { signals[$0] }
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(0))
@ -393,12 +393,12 @@ extension SharedSequenceOperatorTests {
extension SharedSequenceOperatorTests {
func testAsDriver_flatMap() {
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
let driver = hotObservable.asDriver(onErrorJustReturn: -1).flatMap { (n: Int) -> Driver<Int> in
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: -1).flatMap { (n: Int) -> Signal<Int> in
XCTAssertTrue(DispatchQueue.isMain)
return Driver.just(n + 1)
return Signal.just(n + 1)
}
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(1))
@ -447,12 +447,12 @@ extension SharedSequenceOperatorTests {
extension SharedSequenceOperatorTests {
func testAsDriver_merge() {
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
let driver = hotObservable.asDriver(onErrorJustReturn: -1).map { (n: Int) -> Driver<Int> in
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: -1).map { (n: Int) -> Signal<Int> in
XCTAssertTrue(DispatchQueue.isMain)
return Driver.just(n + 1)
return Signal.just(n + 1)
}.merge()
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(1))
@ -467,12 +467,12 @@ extension SharedSequenceOperatorTests {
func testAsDriver_merge2() {
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
let driver = hotObservable.asDriver(onErrorJustReturn: -1).map { (n: Int) -> Driver<Int> in
let xs: Signal<Int> = hotObservable.asDriver(onErrorJustReturn: -1).map { (n: Int) -> Signal<Int> in
XCTAssertTrue(DispatchQueue.isMain)
return Driver.just(n + 1)
return Signal.just(n + 1)
}.merge(maxConcurrent: 1)
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(xs) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
hotObservable.on(.next(1))