Merge first and firstOrError operators.

The new operator returns an optional value.
This commit is contained in:
Vinícius Soares 2017-08-02 21:36:11 -03:00 committed by Vinícius Soares
parent d4a30201a9
commit ec7e9527c2
4 changed files with 21 additions and 177 deletions

View File

@ -6,17 +6,9 @@
// Copyright © 2017 Krunoslav Zaher. All rights reserved.
//
fileprivate final class FirstSink<O: ObserverType> : Sink<O>, ObserverType {
typealias ElementType = O.E
typealias Parent = First<ElementType>
typealias E = ElementType
private var _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
_parent = parent
super.init(observer: observer, cancel: cancel)
}
fileprivate final class FirstSink<Element, O: ObserverType> : Sink<O>, ObserverType where O.E == Element? {
typealias E = Element
typealias Parent = First<E>
func on(_ event: Event<E>) {
switch event {
@ -24,32 +16,26 @@ fileprivate final class FirstSink<O: ObserverType> : Sink<O>, ObserverType {
forwardOn(.next(value))
forwardOn(.completed)
dispose()
case .error:
forwardOn(event)
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
if let defaultElement = _parent._defaultItem {
forwardOn(.next(defaultElement))
forwardOn(.completed)
} else {
forwardOn(.error(RxError.noElements))
}
forwardOn(.next(nil))
forwardOn(.completed)
dispose()
}
}
}
final class First<Element>: Producer<Element> {
final class First<Element>: Producer<Element?> {
fileprivate let _source: Observable<Element>
fileprivate let _defaultItem: E?
init(source: Observable<Element>, defaultItem: E? = nil) {
init(source: Observable<Element>) {
_source = source
_defaultItem = defaultItem
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = FirstSink(parent: self, observer: observer, cancel: cancel)
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element? {
let sink = FirstSink(observer: observer, cancel: cancel)
let subscription = _source.subscribe(sink)
return (sink: sink, subscription: subscription)
}

View File

@ -774,26 +774,13 @@ extension ObservableType {
/**
The `first` operator emits only the very first item emitted by this Observable,
or a default item if this Observable completes without emitting anything.
or nil if this Observable completes without emitting anything.
- seealso: [single operator on reactivex.io](http://reactivex.io/documentation/operators/first.html)
- parameter defaultItem: the default item to emit if the source doesn't emit anything
- returns: An observable sequence that emits a single element or a default item if the source Publisher completes without emitting any items.
- returns: An observable sequence that emits a single element or nil if the source Publisher completes without emitting any items.
*/
public func first(_ defaultItem: E) -> Single<E> {
return PrimitiveSequence(raw: First(source: self.asObservable(), defaultItem: defaultItem))
}
/**
The `firstOrError` operator emits only the very first item emitted by this Observable or
throws a `RxError.noElements` if this Observable is empty.
- seealso: [single operator on reactivex.io](http://reactivex.io/documentation/operators/first.html)
- returns: An observable sequence that emits a single element or a default item if the source Publisher completes without emitting any items.
*/
public func firstOrError() -> Single<E> {
public func first() -> Single<E?> {
return PrimitiveSequence(raw: First(source: self.asObservable()))
}

View File

@ -159,11 +159,6 @@ final class PrimitiveSequenceTest_ : PrimitiveSequenceTest, RxTestCase {
("testFirst_Many", PrimitiveSequenceTest.testFirst_Many),
("testFirst_ManyWithoutCompletion", PrimitiveSequenceTest.testFirst_ManyWithoutCompletion),
("testFirst_Error", PrimitiveSequenceTest.testFirst_Error),
("testFirstOrError_Empty", PrimitiveSequenceTest.testFirstOrError_Empty),
("testFirstOrError_One", PrimitiveSequenceTest.testFirstOrError_One),
("testFirstOrError_Many", PrimitiveSequenceTest.testFirstOrError_Many),
("testFirstOrError_ManyWithoutCompletion", PrimitiveSequenceTest.testFirstOrError_ManyWithoutCompletion),
("testFirstOrError_Error", PrimitiveSequenceTest.testFirstOrError_Error),
("testAsMaybe_Empty", PrimitiveSequenceTest.testAsMaybe_Empty),
("testAsMaybe_One", PrimitiveSequenceTest.testAsMaybe_One),
("testAsMaybe_Many", PrimitiveSequenceTest.testAsMaybe_Many),

View File

@ -1055,13 +1055,13 @@ extension PrimitiveSequenceTest {
error(260, testError)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first(0)
let res: TestableObserver<Int> = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first().map { $0 ?? -1 }
return single.asObservable()
}
XCTAssertEqual(res.events, [
next(250, 0),
next(250, -1),
completed(250)
])
@ -1081,7 +1081,7 @@ extension PrimitiveSequenceTest {
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first(0)
let single: Single<Int> = xs.first().map { $0 ?? -1 }
return single.asObservable()
}
@ -1107,7 +1107,7 @@ extension PrimitiveSequenceTest {
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first(0)
let single: Single<Int> = xs.first().map { $0 ?? -1 }
return single.asObservable()
}
@ -1133,7 +1133,7 @@ extension PrimitiveSequenceTest {
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first(0)
let single: Single<Int> = xs.first().map { $0 ?? -1 }
return single.asObservable()
}
@ -1156,131 +1156,7 @@ extension PrimitiveSequenceTest {
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.first(0)
return single.asObservable()
}
XCTAssertEqual(res.events, [
error(210, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 210)
])
}
}
extension PrimitiveSequenceTest {
func testFirstOrError_Empty() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
completed(250),
error(260, testError)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.firstOrError()
return single.asObservable()
}
XCTAssertEqual(res.events, [
error(250, RxError.noElements)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testFirstOrError_One() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
completed(250),
error(260, testError)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.firstOrError()
return single.asObservable()
}
XCTAssertEqual(res.events, [
next(210, 2),
completed(210)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 210)
])
}
func testFirstOrError_Many() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
next(220, 3),
completed(250),
error(260, testError)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.firstOrError()
return single.asObservable()
}
XCTAssertEqual(res.events, [
next(210, 2),
completed(210)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 210)
])
}
func testFirstOrError_ManyWithoutCompletion() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(160, 2),
next(280, 3),
next(250, 4),
next(300, 5)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.firstOrError()
return single.asObservable()
}
XCTAssertEqual(res.events, [
next(250, 4),
completed(250)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testFirstOrError_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
error(210, testError)
])
let res = scheduler.start { () -> Observable<Int> in
let single: Single<Int> = xs.firstOrError()
let single: Single<Int> = xs.first().map { $0 ?? -1 }
return single.asObservable()
}