Add ReplayRelay (#2111)

This commit is contained in:
Zsolt Kovács 2020-01-09 08:15:50 +01:00 committed by Shai Mishali
parent e7a3fcb800
commit 4e969fa385
16 changed files with 889 additions and 20 deletions

View File

@ -127,6 +127,7 @@ custom_categories:
- BehaviorRelay
- Observable+Bind
- PublishRelay
- ReplayRelay
- Utils
- name: RxSwift
children:

View File

@ -6,10 +6,10 @@ All of behave exactly the same like described [here](http://reactivex.io/documen
Relays
======
RxRelay provides two kinds of Relays: `PublishRelay` and `BehaviorRelay`.
RxRelay provides three kinds of Relays: `PublishRelay`, `BehaviorRelay` and `ReplayRelay`.
They behave exactly like their parallel `Subject`s, with two changes:
- Relays never complete.
- Relays never emit errors.
In essence, Relays only emit `.next` events, and never terminate.
In essence, Relays only emit `.next` events, and never terminate.

View File

@ -68,7 +68,7 @@ RxSwift comprises five separate components depending on each other in the follow
* **RxSwift**: The core of RxSwift, providing the Rx standard as (mostly) defined by [ReactiveX](https://reactivex.io). It has no other dependencies.
* **RxCocoa**: Provides Cocoa-specific capabilities for general iOS/macOS/watchOS & tvOS app development, such as Shared Sequences, Traits, and much more. It depends on both `RxSwift` and `RxRelay`.
* **RxRelay**: Provides `PublishRelay` and `BehaviorRelay`, two [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`.
* **RxRelay**: Provides `PublishRelay`, `BehaviorRelay` and `ReplayRelay`, three [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`.
* **RxTest** and **RxBlocking**: Provides testing capabilities for Rx-based systems. It depends on `RxSwift`.
###### ... find compatible

View File

@ -47,6 +47,10 @@
54700CA11CE37E1900EF3A8F /* UINavigationItem+RxTests.swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */; };
54D2138E1CE0824E0028D5B4 /* UINavigationItem+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */; };
601AE3DA1EE24E4F00617386 /* SwiftSupport.swift in Sources */ = {isa = PBXBuildFile; fileRef = 601AE3D91EE24E4F00617386 /* SwiftSupport.swift */; };
6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A94254923AFC2F300B7A24C /* ReplayRelay.swift */; };
78B6157523B69F49009C2AD9 /* Binder.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8E65EFA1F6E91D1004478C3 /* Binder.swift */; };
78B6157723B6A035009C2AD9 /* Binder+Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78B6157623B6A035009C2AD9 /* Binder+Tests.swift */; };
7EDBAEB41C89B1A6006CBE67 /* UITabBarItem+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */; };
@ -941,6 +945,8 @@
54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+RxTests.swift.swift"; sourceTree = "<group>"; };
54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+Rx.swift"; sourceTree = "<group>"; };
601AE3D91EE24E4F00617386 /* SwiftSupport.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SwiftSupport.swift; sourceTree = "<group>"; };
6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelayTests.swift; sourceTree = "<group>"; };
6A94254923AFC2F300B7A24C /* ReplayRelay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelay.swift; sourceTree = "<group>"; };
78B6157623B6A035009C2AD9 /* Binder+Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Binder+Tests.swift"; sourceTree = "<group>"; };
7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITabBarItem+RxTests.swift"; sourceTree = "<group>"; };
7F600F3D1C5D0C0100535B1D /* UIRefreshControl+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIRefreshControl+Rx.swift"; sourceTree = "<group>"; };
@ -1517,6 +1523,7 @@
children = (
C8B0F7101F530CA700548EBE /* PublishRelay.swift */,
C8C8BCCE1F8944B800501D4D /* BehaviorRelay.swift */,
6A94254923AFC2F300B7A24C /* ReplayRelay.swift */,
A2897D61225CA3F3004EA481 /* Observable+Bind.swift */,
A2897D68225D023A004EA481 /* Utils.swift */,
A2FD4EA4225D0A8100288525 /* Info.plist */,
@ -1528,6 +1535,7 @@
isa = PBXGroup;
children = (
A2FD4E9B225D04FF00288525 /* Observable+RelayBindTests.swift */,
6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */,
);
path = RxRelayTests;
sourceTree = "<group>";
@ -2904,6 +2912,7 @@
buildActionMask = 2147483647;
files = (
A2897D57225CA236004EA481 /* PublishRelay.swift in Sources */,
6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */,
A2897D58225CA236004EA481 /* BehaviorRelay.swift in Sources */,
A2897D62225CA3F3004EA481 /* Observable+Bind.swift in Sources */,
A2897D69225D023A004EA481 /* Utils.swift in Sources */,
@ -3072,6 +3081,7 @@
C83509351C38706E0027C24C /* KVOObservableTests.swift in Sources */,
C89046581DC5F6F70041C7D8 /* UISearchBar+RxTests.swift in Sources */,
C85218011E33FC160015DD38 /* RecursiveLock.swift in Sources */,
6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C822BACA1DB4058000F98810 /* Event+Test.swift in Sources */,
C83509421C38706E0027C24C /* MainThreadPrimitiveHotObservable.swift in Sources */,
C801DE4A1F6EBB84008DB060 /* Observable+PrimitiveSequenceTest.swift in Sources */,
@ -3317,6 +3327,7 @@
C820A9AF1EB5073E00D431BC /* Observable+FilterTests.swift in Sources */,
C820A9DF1EB50CF800D431BC /* Observable+ThrottleTests.swift in Sources */,
C83509F01C3875580027C24C /* PrimitiveMockObserver.swift in Sources */,
6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C820A9C31EB509FC00D431BC /* Observable+SkipTests.swift in Sources */,
C83509C31C3875220027C24C /* KVOObservableTests.swift in Sources */,
C8A9B6F51DAD752200C9B027 /* Observable+BindTests.swift in Sources */,
@ -3416,6 +3427,7 @@
C8D970F11F532FD30058F2FE /* SharedSequence+Extensions.swift in Sources */,
C820A9941EB4FD1400D431BC /* Observable+SwitchIfEmptyTests.swift in Sources */,
C83509CF1C3875260027C24C /* NSView+RxTests.swift in Sources */,
6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C820A9501EB4EC3C00D431BC /* Observable+ReduceTests.swift in Sources */,
C820A9841EB4FB0400D431BC /* Observable+UsingTests.swift in Sources */,
C820A9881EB4FB5B00D431BC /* Observable+DebugTests.swift in Sources */,

View File

@ -78,6 +78,34 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
})
}
/**
Creates new subscription and sends elements to `ReplayRelay`.
This method can be only called from `MainThread`.
- parameter relay: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func drive(_ relays: ReplayRelay<Element>...) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.drive(onNext: { e in
relays.forEach { $0.accept(e) }
})
}
/**
Creates new subscription and sends elements to `ReplayRelay`.
This method can be only called from `MainThread`.
- parameter relay: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func drive(_ relays: ReplayRelay<Element?>...) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.drive(onNext: { e in
relays.forEach { $0.accept(e) }
})
}
/**
Subscribes to observable sequence using custom binder function.
This method can be only called from `MainThread`.

View File

@ -66,7 +66,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
}
/**
Creates new subscription and sends elements to relay.
Creates new subscription and sends elements to `PublishRelay`.
- parameter to: Target relays for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
@ -78,7 +78,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
}
/**
Creates new subscription and sends elements to relay.
Creates new subscription and sends elements to `PublishRelay`.
- parameter to: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
@ -89,6 +89,30 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
})
}
/**
Creates new subscription and sends elements to `ReplayRelay`.
- parameter to: Target relays for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func emit(to relays: ReplayRelay<Element>...) -> Disposable {
return self.emit(onNext: { e in
relays.forEach { $0.accept(e) }
})
}
/**
Creates new subscription and sends elements to `ReplayRelay`.
- parameter to: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func emit(to relays: ReplayRelay<Element?>...) -> Disposable {
return self.emit(onNext: { e in
relays.forEach { $0.accept(e) }
})
}
/**
Subscribes an element handler, a completion handler and disposed handler to an observable sequence.
@ -105,6 +129,3 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed)
}
}

View File

@ -100,4 +100,50 @@ extension ObservableType {
}
}
}
/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
public func bind(to relays: ReplayRelay<Element>...) -> Disposable {
self.bind(to: relays)
}
/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
public func bind(to relays: ReplayRelay<Element?>...) -> Disposable {
self.map { $0 as Element? }.bind(to: relays)
}
/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
private func bind(to relays: [ReplayRelay<Element>]) -> Disposable {
subscribe { e in
switch e {
case let .next(element):
relays.forEach {
$0.accept(element)
}
case let .error(error):
rxFatalErrorInDebug("Binding error to behavior relay: \(error)")
case .completed:
break
}
}
}
}

50
RxRelay/ReplayRelay.swift Normal file
View File

@ -0,0 +1,50 @@
//
// ReplayRelay.swift
// RxRelay
//
// Created by Zsolt Kovacs on 12/22/19.
// Copyright © 2019 Krunoslav Zaher. All rights reserved.
//
import RxSwift
/// ReplayRelay is a wrapper for `ReplaySubject`.
///
/// Unlike `ReplaySubject` it can't terminate with an error or complete.
public final class ReplayRelay<Element>: ObservableType {
private let subject: ReplaySubject<Element>
// Accepts `event` and emits it to subscribers
public func accept(_ event: Element) {
self.subject.onNext(event)
}
private init(subject: ReplaySubject<Element>) {
self.subject = subject
}
/// Creates new instance of `ReplayRelay` that replays at most `bufferSize` last elements sent to it.
///
/// - parameter bufferSize: Maximal number of elements to replay to observers after subscription.
/// - returns: New instance of replay relay.
public static func create(bufferSize: Int) -> ReplayRelay<Element> {
ReplayRelay(subject: ReplaySubject.create(bufferSize: bufferSize))
}
/// Creates a new instance of `ReplayRelay` that buffers all the sent to it.
/// To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable'
/// number of elements.
public static func createUnbound() -> ReplayRelay<Element> {
ReplayRelay(subject: ReplaySubject.createUnbounded())
}
/// Subscribes observer
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.subject.subscribe(observer)
}
/// - returns: Canonical interface for push style sequence
public func asObservable() -> Observable<Element> {
self.subject.asObserver()
}
}

View File

@ -0,0 +1 @@
../../Tests/RxRelayTests/ReplayRelayTests.swift

View File

@ -258,13 +258,20 @@ final class DriverTest_ : DriverTest, RxTestCase {
("testDriveOptionalObserver", DriverTest.testDriveOptionalObserver),
("testDriveOptionalObservers", DriverTest.testDriveOptionalObservers),
("testDriveNoAmbiguity", DriverTest.testDriveNoAmbiguity),
("testDriveRelay", DriverTest.testDriveRelay),
("testDriveRelays", DriverTest.testDriveRelays),
("testDriveOptionalRelay1", DriverTest.testDriveOptionalRelay1),
("testDriveBehaviorRelay", DriverTest.testDriveBehaviorRelay),
("testDriveBehaviorRelays", DriverTest.testDriveBehaviorRelays),
("testDriveOptionalBehaviorRelay1", DriverTest.testDriveOptionalBehaviorRelay1),
("testDriveOptionalBehaviorRelays1", DriverTest.testDriveOptionalBehaviorRelays1),
("testDriveOptionalRelay2", DriverTest.testDriveOptionalRelay2),
("testDriveOptionalBehaviorRelay2", DriverTest.testDriveOptionalBehaviorRelay2),
("testDriveOptionalBehaviorRelays2", DriverTest.testDriveOptionalBehaviorRelays2),
("testDriveRelayNoAmbiguity", DriverTest.testDriveRelayNoAmbiguity),
("testDriveBehaviorRelayNoAmbiguity", DriverTest.testDriveBehaviorRelayNoAmbiguity),
("testDriveReplayRelay", DriverTest.testDriveReplayRelay),
("testDriveReplayRelays", DriverTest.testDriveReplayRelays),
("testDriveOptionalReplayRelay1", DriverTest.testDriveOptionalReplayRelay1),
("testDriveOptionalReplayRelays", DriverTest.testDriveOptionalReplayRelays),
("testDriveOptionalReplayRelay2", DriverTest.testDriveOptionalReplayRelay2),
("testDriveReplayRelays2", DriverTest.testDriveReplayRelays2),
("testDriveReplayRelayNoAmbiguity", DriverTest.testDriveReplayRelayNoAmbiguity),
] }
}
@ -1186,6 +1193,11 @@ final class ObservableRelayBindTest_ : ObservableRelayBindTest, RxTestCase {
("testBindToOptionalBehaviorRelay", ObservableRelayBindTest.testBindToOptionalBehaviorRelay),
("testBindToOptionalBehaviorRelays", ObservableRelayBindTest.testBindToOptionalBehaviorRelays),
("testBindToBehaviorRelayNoAmbiguity", ObservableRelayBindTest.testBindToBehaviorRelayNoAmbiguity),
("testBindToReplayRelay", ObservableRelayBindTest.testBindToReplayRelay),
("testBindToReplayRelays", ObservableRelayBindTest.testBindToReplayRelays),
("testBindToOptionalReplayRelay", ObservableRelayBindTest.testBindToOptionalReplayRelay),
("testBindToOptionalReplayRelays", ObservableRelayBindTest.testBindToOptionalReplayRelays),
("testBindToReplayRelayNoAmbiguity", ObservableRelayBindTest.testBindToReplayRelayNoAmbiguity),
] }
}
@ -1855,6 +1867,21 @@ final class RecursiveLockTests_ : RecursiveLockTests, RxTestCase {
] }
}
final class ReplayRelayTests_ : ReplayRelayTests, RxTestCase {
#if os(macOS)
required override init() {
super.init()
}
#endif
static var allTests: [(String, (ReplayRelayTests_) -> () -> Void)] { return [
("test_noEvents", ReplayRelayTests.test_noEvents),
("test_fewerEventsThanBufferSize", ReplayRelayTests.test_fewerEventsThanBufferSize),
("test_moreEventsThanBufferSize", ReplayRelayTests.test_moreEventsThanBufferSize),
("test_moreEventsThanBufferSizeMultipleObservers", ReplayRelayTests.test_moreEventsThanBufferSizeMultipleObservers),
] }
}
final class ReplaySubjectTest_ : ReplaySubjectTest, RxTestCase {
#if os(macOS)
required override init() {
@ -1866,6 +1893,14 @@ final class ReplaySubjectTest_ : ReplaySubjectTest, RxTestCase {
("test_hasObserversNoObservers", ReplaySubjectTest.test_hasObserversNoObservers),
("test_hasObserversOneObserver", ReplaySubjectTest.test_hasObserversOneObserver),
("test_hasObserversManyObserver", ReplaySubjectTest.test_hasObserversManyObserver),
("test_noEvents", ReplaySubjectTest.test_noEvents),
("test_fewerEventsThanBufferSize", ReplaySubjectTest.test_fewerEventsThanBufferSize),
("test_moreEventsThanBufferSize", ReplaySubjectTest.test_moreEventsThanBufferSize),
("test_moreEventsThanBufferSizeMultipleObservers", ReplaySubjectTest.test_moreEventsThanBufferSizeMultipleObservers),
("test_subscribingBeforeComplete", ReplaySubjectTest.test_subscribingBeforeComplete),
("test_subscribingAfterComplete", ReplaySubjectTest.test_subscribingAfterComplete),
("test_subscribingBeforeError", ReplaySubjectTest.test_subscribingBeforeError),
("test_subscribingAfterError", ReplaySubjectTest.test_subscribingAfterError),
] }
}
@ -1970,6 +2005,13 @@ final class SignalTests_ : SignalTests, RxTestCase {
("testEmitOptionalPublishRelay2", SignalTests.testEmitOptionalPublishRelay2),
("testEmitPublishRelays2", SignalTests.testEmitPublishRelays2),
("testEmitPublishRelayNoAmbiguity", SignalTests.testEmitPublishRelayNoAmbiguity),
("testEmitReplayRelay", SignalTests.testEmitReplayRelay),
("testEmitReplayRelays", SignalTests.testEmitReplayRelays),
("testEmitOptionalReplayRelay1", SignalTests.testEmitOptionalReplayRelay1),
("testEmitOptionalReplayRelays", SignalTests.testEmitOptionalReplayRelays),
("testEmitOptionalReplayRelay2", SignalTests.testEmitOptionalReplayRelay2),
("testEmitReplayRelays2", SignalTests.testEmitReplayRelays2),
("testEmitReplayRelayNoAmbiguity", SignalTests.testEmitReplayRelayNoAmbiguity),
] }
}
@ -2171,6 +2213,7 @@ func XCTMain(_ tests: [() -> Void]) {
testCase(PublishSubjectTest_.allTests),
testCase(ReactiveTests_.allTests),
testCase(RecursiveLockTests_.allTests),
testCase(ReplayRelayTests_.allTests),
testCase(ReplaySubjectTest_.allTests),
testCase(SharedSequenceOperatorTests_.allTests),
testCase(SharingSchedulerTest_.allTests),

View File

@ -0,0 +1 @@
../../RxRelay/ReplayRelay.swift

View File

@ -397,7 +397,7 @@ extension DriverTest {
// MARK: drive optional behavior relay
extension DriverTest {
func testDriveRelay() {
func testDriveBehaviorRelay() {
let relay = BehaviorRelay<Int>(value: 0)
let subscription = (Driver.just(1) as Driver<Int>).drive(relay)
@ -406,7 +406,7 @@ extension DriverTest {
subscription.dispose()
}
func testDriveRelays() {
func testDriveBehaviorRelays() {
let relay1 = BehaviorRelay<Int>(value: 0)
let relay2 = BehaviorRelay<Int>(value: 0)
@ -416,7 +416,7 @@ extension DriverTest {
XCTAssertEqual(relay2.value, 1)
}
func testDriveOptionalRelay1() {
func testDriveOptionalBehaviorRelay1() {
let relay = BehaviorRelay<Int?>(value: 0)
_ = (Driver.just(1) as Driver<Int>).drive(relay)
@ -434,7 +434,7 @@ extension DriverTest {
XCTAssertEqual(relay2.value, 1)
}
func testDriveOptionalRelay2() {
func testDriveOptionalBehaviorRelay2() {
let relay = BehaviorRelay<Int?>(value: 0)
_ = (Driver.just(1) as Driver<Int?>).drive(relay)
@ -452,7 +452,7 @@ extension DriverTest {
XCTAssertEqual(relay2.value, 1)
}
func testDriveRelayNoAmbiguity() {
func testDriveBehaviorRelayNoAmbiguity() {
let relay = BehaviorRelay<Int?>(value: 0)
// shouldn't cause compile time error
@ -461,3 +461,122 @@ extension DriverTest {
XCTAssertEqual(relay.value, 1)
}
}
// MARK: drive optional behavior relay
extension DriverTest {
func testDriveReplayRelay() {
let relay = ReplayRelay<Int>.create(bufferSize: 1)
var latest: Int?
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Driver.just(1) as Driver<Int>).drive(relay)
XCTAssertEqual(latest, 1)
}
func testDriveReplayRelays() {
let relay1 = ReplayRelay<Int>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Driver.just(1) as Driver<Int>).drive(relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testDriveOptionalReplayRelay1() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int? = nil
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Driver.just(1) as Driver<Int>).drive(relay)
XCTAssertEqual(latest, 1)
}
func testDriveOptionalReplayRelays() {
let relay1 = ReplayRelay<Int?>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int?>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Driver.just(1) as Driver<Int>).drive(relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testDriveOptionalReplayRelay2() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int?
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Driver.just(1) as Driver<Int?>).drive(relay)
XCTAssertEqual(latest, 1)
}
func testDriveReplayRelays2() {
let relay1 = ReplayRelay<Int?>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int?>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Driver.just(1) as Driver<Int?>).drive(relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testDriveReplayRelayNoAmbiguity() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int? = nil
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
// shouldn't cause compile time error
_ = Driver.just(1).drive(relay)
XCTAssertEqual(latest, 1)
}
}

View File

@ -408,7 +408,7 @@ extension SignalTests {
}
}
// MARK: Emit to relay
// MARK: Emit to publish relay
extension SignalTests {
func testEmitPublishRelay() {
@ -527,3 +527,123 @@ extension SignalTests {
XCTAssertEqual(latest, 1)
}
}
// MARK: Emit to replay relay
extension SignalTests {
func testEmitReplayRelay() {
let relay = ReplayRelay<Int>.create(bufferSize: 1)
var latest: Int?
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Signal.just(1) as Signal<Int>).emit(to: relay)
XCTAssertEqual(latest, 1)
}
func testEmitReplayRelays() {
let relay1 = ReplayRelay<Int>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Signal.just(1) as Signal<Int>).emit(to: relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testEmitOptionalReplayRelay1() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int? = nil
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Signal.just(1) as Signal<Int>).emit(to: relay)
XCTAssertEqual(latest, 1)
}
func testEmitOptionalReplayRelays() {
let relay1 = ReplayRelay<Int?>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int?>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Signal.just(1) as Signal<Int>).emit(to: relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testEmitOptionalReplayRelay2() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int?
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
_ = (Signal.just(1) as Signal<Int?>).emit(to: relay)
XCTAssertEqual(latest, 1)
}
func testEmitReplayRelays2() {
let relay1 = ReplayRelay<Int?>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int?>.create(bufferSize: 1)
var latest1: Int?
var latest2: Int?
_ = relay1.subscribe(onNext: { latestElement in
latest1 = latestElement
})
_ = relay2.subscribe(onNext: { latestElement in
latest2 = latestElement
})
_ = (Signal.just(1) as Signal<Int?>).emit(to: relay1, relay2)
XCTAssertEqual(latest1, 1)
XCTAssertEqual(latest2, 1)
}
func testEmitReplayRelayNoAmbiguity() {
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
var latest: Int? = nil
_ = relay.subscribe(onNext: { latestElement in
latest = latestElement
})
// shouldn't cause compile time error
_ = Signal.just(1).emit(to: relay)
XCTAssertEqual(latest, 1)
}
}

View File

@ -22,7 +22,7 @@ extension ObservableRelayBindTest {
let relay = PublishRelay<Int>()
_ = relay.subscribe{ event in
_ = relay.subscribe { event in
events.append(Recorded(time: 0, value: event))
}
@ -164,3 +164,106 @@ extension ObservableRelayBindTest {
XCTAssertEqual(relay.value, 1)
}
}
// MARK: bind(to:) replay relay
extension ObservableRelayBindTest {
func testBindToReplayRelay() {
var events: [Recorded<Event<Int>>] = []
let relay = ReplayRelay<Int>.create(bufferSize: 1)
_ = relay.subscribe { event in
events.append(Recorded(time: 0, value: event))
}
_ = Observable.just(1).bind(to: relay)
XCTAssertEqual(events, [
.next(1),
])
}
func testBindToReplayRelays() {
var events1: [Recorded<Event<Int>>] = []
var events2: [Recorded<Event<Int>>] = []
let relay1 = ReplayRelay<Int>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int>.create(bufferSize: 1)
_ = relay1.subscribe { event in
events1.append(Recorded(time: 0, value: event))
}
_ = relay2.subscribe { event in
events2.append(Recorded(time: 0, value: event))
}
_ = Observable.just(1).bind(to: relay1, relay2)
XCTAssertEqual(events1, [
.next(1),
])
XCTAssertEqual(events2, [
.next(1),
])
}
func testBindToOptionalReplayRelay() {
var events: [Recorded<Event<Int?>>] = []
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
_ = relay.subscribe { event in
events.append(Recorded(time: 0, value: event))
}
_ = (Observable.just(1) as Observable<Int>).bind(to: relay)
XCTAssertEqual(events, [
.next(1),
])
}
func testBindToOptionalReplayRelays() {
var events1: [Recorded<Event<Int?>>] = []
var events2: [Recorded<Event<Int?>>] = []
let relay1 = ReplayRelay<Int?>.create(bufferSize: 1)
let relay2 = ReplayRelay<Int?>.create(bufferSize: 1)
_ = relay1.subscribe { event in
events1.append(Recorded(time: 0, value: event))
}
_ = relay2.subscribe { event in
events2.append(Recorded(time: 0, value: event))
}
_ = (Observable.just(1) as Observable<Int>).bind(to: relay1, relay2)
XCTAssertEqual(events1, [
.next(1),
])
XCTAssertEqual(events2, [
.next(1),
])
}
func testBindToReplayRelayNoAmbiguity() {
var events: [Recorded<Event<Int?>>] = []
let relay = ReplayRelay<Int?>.create(bufferSize: 1)
_ = relay.subscribe { event in
events.append(Recorded(time: 0, value: event))
}
_ = Observable.just(1).bind(to: relay)
XCTAssertEqual(events, [
.next(1),
])
}
}

View File

@ -0,0 +1,113 @@
//
// ReplayRelayTests.swift
// Tests
//
// Created by Zsolt Kovacs on 12/31/19.
// Copyright © 2019 Krunoslav Zaher. All rights reserved.
//
import XCTest
import RxSwift
import RxRelay
import RxTest
class ReplayRelayTests: RxTest {
func test_noEvents() {
let scheduler = TestScheduler(initialClock: 0)
let relay = ReplayRelay<Int>.create(bufferSize: 3)
let result = scheduler.createObserver(Int.self)
_ = relay.subscribe(result)
XCTAssertTrue(result.events.isEmpty)
}
func test_fewerEventsThanBufferSize() {
let scheduler = TestScheduler(initialClock: 0)
var relay: ReplayRelay<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) }
scheduler.scheduleAt(150) { relay.accept(1) }
scheduler.scheduleAt(200) { relay.accept(2) }
scheduler.scheduleAt(300) { subscription = relay.subscribe(result) }
scheduler.scheduleAt(350) {
XCTAssertEqual(result.events, [
.next(300, 1),
.next(300, 2),
])
}
scheduler.scheduleAt(400) { subscription.dispose() }
scheduler.start()
}
func test_moreEventsThanBufferSize() {
let scheduler = TestScheduler(initialClock: 0)
var relay: ReplayRelay<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) }
scheduler.scheduleAt(150) { relay.accept(1) }
scheduler.scheduleAt(200) { relay.accept(2) }
scheduler.scheduleAt(250) { relay.accept(3) }
scheduler.scheduleAt(300) { relay.accept(4) }
scheduler.scheduleAt(350) { relay.accept(5) }
scheduler.scheduleAt(400) { subscription = relay.subscribe(result) }
scheduler.scheduleAt(450) {
XCTAssertEqual(result.events, [
.next(400, 3),
.next(400, 4),
.next(400, 5),
])
}
scheduler.scheduleAt(500) { subscription.dispose() }
scheduler.start()
}
func test_moreEventsThanBufferSizeMultipleObservers() {
let scheduler = TestScheduler(initialClock: 0)
var relay: ReplayRelay<Int>! = nil
let result1 = scheduler.createObserver(Int.self)
var subscription1: Disposable! = nil
let result2 = scheduler.createObserver(Int.self)
var subscription2: Disposable! = nil
scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subscription1 = relay.subscribe(result1) }
scheduler.scheduleAt(200) { relay.accept(1) }
scheduler.scheduleAt(250) { relay.accept(2) }
scheduler.scheduleAt(300) { relay.accept(3) }
scheduler.scheduleAt(350) { relay.accept(4) }
scheduler.scheduleAt(400) { relay.accept(5) }
scheduler.scheduleAt(450) { subscription2 = relay.subscribe(result2) }
scheduler.scheduleAt(500) {
XCTAssertEqual(result1.events, [
.next(200, 1),
.next(250, 2),
.next(300, 3),
.next(350, 4),
.next(400, 5),
])
XCTAssertEqual(result2.events, [
.next(450, 3),
.next(450, 4),
.next(450, 5),
])
}
scheduler.scheduleAt(550) {
subscription1.dispose()
subscription2.dispose()
}
scheduler.start()
}
}

View File

@ -70,4 +70,215 @@ class ReplaySubjectTest: RxTest {
scheduler.start()
}
func test_noEvents() {
let scheduler = TestScheduler(initialClock: 0)
let subject = ReplaySubject<Int>.create(bufferSize: 3)
let result = scheduler.createObserver(Int.self)
_ = subject.subscribe(result)
XCTAssertTrue(result.events.isEmpty)
}
func test_fewerEventsThanBufferSize() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(300) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(350) {
XCTAssertEqual(result.events, [
.next(300, 1),
.next(300, 2),
])
}
scheduler.scheduleAt(400) { subscription.dispose() }
scheduler.start()
}
func test_moreEventsThanBufferSize() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(250) { subject.onNext(3) }
scheduler.scheduleAt(300) { subject.onNext(4) }
scheduler.scheduleAt(350) { subject.onNext(5) }
scheduler.scheduleAt(400) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(450) {
XCTAssertEqual(result.events, [
.next(400, 3),
.next(400, 4),
.next(400, 5),
])
}
scheduler.scheduleAt(500) { subscription.dispose() }
scheduler.start()
}
func test_moreEventsThanBufferSizeMultipleObservers() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result1 = scheduler.createObserver(Int.self)
var subscription1: Disposable! = nil
let result2 = scheduler.createObserver(Int.self)
var subscription2: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subscription1 = subject.subscribe(result1) }
scheduler.scheduleAt(200) { subject.onNext(1) }
scheduler.scheduleAt(250) { subject.onNext(2) }
scheduler.scheduleAt(300) { subject.onNext(3) }
scheduler.scheduleAt(350) { subject.onNext(4) }
scheduler.scheduleAt(400) { subject.onNext(5) }
scheduler.scheduleAt(450) { subscription2 = subject.subscribe(result2) }
scheduler.scheduleAt(500) {
XCTAssertEqual(result1.events, [
.next(200, 1),
.next(250, 2),
.next(300, 3),
.next(350, 4),
.next(400, 5),
])
XCTAssertEqual(result2.events, [
.next(450, 3),
.next(450, 4),
.next(450, 5),
])
}
scheduler.scheduleAt(550) {
subscription1.dispose()
subscription2.dispose()
}
scheduler.start()
}
func test_subscribingBeforeComplete() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(250) { subject.onNext(3) }
scheduler.scheduleAt(300) { subject.onNext(4) }
scheduler.scheduleAt(350) { subject.onNext(5) }
scheduler.scheduleAt(400) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(450) { subject.onCompleted() }
scheduler.scheduleAt(500) {
XCTAssertEqual(result.events, [
.next(400, 3),
.next(400, 4),
.next(400, 5),
.completed(450),
])
}
scheduler.scheduleAt(550) { subscription.dispose() }
scheduler.start()
}
func test_subscribingAfterComplete() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(250) { subject.onNext(3) }
scheduler.scheduleAt(300) { subject.onNext(4) }
scheduler.scheduleAt(350) { subject.onNext(5) }
scheduler.scheduleAt(400) { subject.onCompleted() }
scheduler.scheduleAt(450) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(500) {
XCTAssertEqual(result.events, [
.next(450, 3),
.next(450, 4),
.next(450, 5),
.completed(450),
])
}
scheduler.scheduleAt(550) { subscription.dispose() }
scheduler.start()
}
func test_subscribingBeforeError() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(250) { subject.onNext(3) }
scheduler.scheduleAt(300) { subject.onNext(4) }
scheduler.scheduleAt(350) { subject.onNext(5) }
scheduler.scheduleAt(400) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(450) { subject.onError(testError) }
scheduler.scheduleAt(500) {
XCTAssertEqual(result.events, [
.next(400, 3),
.next(400, 4),
.next(400, 5),
.error(450, testError),
])
}
scheduler.scheduleAt(550) { subscription.dispose() }
scheduler.start()
}
func test_subscribingAfterError() {
let scheduler = TestScheduler(initialClock: 0)
var subject: ReplaySubject<Int>! = nil
let result = scheduler.createObserver(Int.self)
var subscription: Disposable! = nil
scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) }
scheduler.scheduleAt(150) { subject.onNext(1) }
scheduler.scheduleAt(200) { subject.onNext(2) }
scheduler.scheduleAt(250) { subject.onNext(3) }
scheduler.scheduleAt(300) { subject.onNext(4) }
scheduler.scheduleAt(350) { subject.onNext(5) }
scheduler.scheduleAt(400) { subject.onError(testError) }
scheduler.scheduleAt(450) { subscription = subject.subscribe(result) }
scheduler.scheduleAt(500) {
XCTAssertEqual(result.events, [
.next(450, 3),
.next(450, 4),
.next(450, 5),
.error(450, testError),
])
}
scheduler.scheduleAt(550) { subscription.dispose() }
scheduler.start()
}
}