Adds range operator.

This commit is contained in:
Krunoslav Zaher 2015-09-13 14:54:47 +02:00
parent ebe338675c
commit 4695ab9de7
9 changed files with 151 additions and 6 deletions

View File

@ -7,9 +7,12 @@ All notable changes to this project will be documented in this file.
* Renames `ImmediateScheduler` protocol to `ImmediateSchedulerType`
* Renames `Scheduler` protocol to `SchedulerType`
* Adds `CurrentThreadScheduler`
* Adds `generate` operator
* Cleanup of dead observer code.
* Removes `SpinLock`s in disposables in favor of more performant `OSAtomicCompareAndSwap32`.
* Adds `buffer` operator (version with time and count).
* Adds `range` operator.
## [2.0.0-alpha.2](https://github.com/ReactiveX/RxSwift/releases/tag/2.0.0-alpha.2)

View File

@ -262,6 +262,8 @@
C84B38EA1BA43380001B7D88 /* ScheduledItem.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38E71BA43380001B7D88 /* ScheduledItem.swift */; };
C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; };
C84B38EF1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; };
C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; };
C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; };
C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; };
C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; };
C88254181B8A752B00B02D69 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; };
@ -456,6 +458,7 @@
C821DBA11BA4DCAB008F3809 /* Buffer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Buffer.swift; sourceTree = "<group>"; };
C84B38E71BA43380001B7D88 /* ScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ScheduledItem.swift; sourceTree = "<group>"; };
C84B38ED1BA433CD001B7D88 /* Generate.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Generate.swift; sourceTree = "<group>"; };
C86409FB1BA593F500D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = "<group>"; };
C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F41B8A752B00B02D69 /* ItemEvents.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemEvents.swift; sourceTree = "<group>"; };
@ -667,6 +670,7 @@
C8093C801B8A72BE0088E94D /* ObserveOn.swift */,
C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */,
C8093C831B8A72BE0088E94D /* Producer.swift */,
C86409FB1BA593F500D3C4E8 /* Range.swift */,
C8093C841B8A72BE0088E94D /* Reduce.swift */,
C8093C851B8A72BE0088E94D /* RefCount.swift */,
C8093C861B8A72BE0088E94D /* Sample.swift */,
@ -1354,6 +1358,7 @@
C8093D421B8A72BE0088E94D /* Switch.swift in Sources */,
C8093DA01B8A72BE0088E94D /* BehaviorSubject.swift in Sources */,
C8093D181B8A72BE0088E94D /* DelaySubscription.swift in Sources */,
C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */,
C8093D221B8A72BE0088E94D /* Map.swift in Sources */,
C8093CD01B8A72BE0088E94D /* InfiniteSequence.swift in Sources */,
C8093D661B8A72BE0088E94D /* ObservableType.swift in Sources */,
@ -1464,6 +1469,7 @@
C8093D411B8A72BE0088E94D /* Switch.swift in Sources */,
C8093D9F1B8A72BE0088E94D /* BehaviorSubject.swift in Sources */,
C8093D171B8A72BE0088E94D /* DelaySubscription.swift in Sources */,
C86409FC1BA593F500D3C4E8 /* Range.swift in Sources */,
C8093D211B8A72BE0088E94D /* Map.swift in Sources */,
C8093CCF1B8A72BE0088E94D /* InfiniteSequence.swift in Sources */,
C8093D651B8A72BE0088E94D /* ObservableType.swift in Sources */,

View File

@ -240,6 +240,7 @@
C86409F81BA5909000D3C4E8 /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */; };
C86409F91BA5909000D3C4E8 /* SubjectType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098E1BA5909000D3C4E8 /* SubjectType.swift */; };
C86409FA1BA5909000D3C4E8 /* Variable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C864098F1BA5909000D3C4E8 /* Variable.swift */; };
C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FE1BA5A87200D3C4E8 /* Range.swift */; };
C86E2F3E1AE5A0CA00C31024 /* SearchResultViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */; };
C86E2F3F1AE5A0CA00C31024 /* SearchViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */; };
C86E2F451AE5A0CA00C31024 /* WikipediaAPI.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */; };
@ -520,6 +521,7 @@
C864098D1BA5909000D3C4E8 /* ReplaySubject.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = "<group>"; };
C864098E1BA5909000D3C4E8 /* SubjectType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubjectType.swift; sourceTree = "<group>"; };
C864098F1BA5909000D3C4E8 /* Variable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Variable.swift; sourceTree = "<group>"; };
C86409FE1BA5A87200D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = "<group>"; };
C86E2F321AE5A0CA00C31024 /* SearchResultViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchResultViewModel.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C86E2F331AE5A0CA00C31024 /* SearchViewModel.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = SearchViewModel.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C86E2F3B1AE5A0CA00C31024 /* WikipediaAPI.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaAPI.swift; sourceTree = "<group>"; };
@ -973,10 +975,10 @@
C864093F1BA5909000D3C4E8 /* AsObservable.swift */,
C86409401BA5909000D3C4E8 /* Buffer.swift */,
C86409411BA5909000D3C4E8 /* Catch.swift */,
C86409451BA5909000D3C4E8 /* CombineLatest.swift */,
C86409421BA5909000D3C4E8 /* CombineLatest+arity.swift */,
C86409431BA5909000D3C4E8 /* CombineLatest+arity.tt */,
C86409441BA5909000D3C4E8 /* CombineLatest+CollectionType.swift */,
C86409451BA5909000D3C4E8 /* CombineLatest.swift */,
C86409461BA5909000D3C4E8 /* Concat.swift */,
C86409471BA5909000D3C4E8 /* ConnectableObservable.swift */,
C86409481BA5909000D3C4E8 /* Debug.swift */,
@ -997,6 +999,7 @@
C86409571BA5909000D3C4E8 /* ObserveOn.swift */,
C86409581BA5909000D3C4E8 /* ObserveOnSerialDispatchQueue.swift */,
C86409591BA5909000D3C4E8 /* Producer.swift */,
C86409FE1BA5A87200D3C4E8 /* Range.swift */,
C864095A1BA5909000D3C4E8 /* Reduce.swift */,
C864095B1BA5909000D3C4E8 /* RefCount.swift */,
C864095C1BA5909000D3C4E8 /* Sample.swift */,
@ -1011,10 +1014,10 @@
C86409651BA5909000D3C4E8 /* TakeWhile.swift */,
C86409661BA5909000D3C4E8 /* Throttle.swift */,
C86409671BA5909000D3C4E8 /* Timer.swift */,
C864096B1BA5909000D3C4E8 /* Zip.swift */,
C86409681BA5909000D3C4E8 /* Zip+arity.swift */,
C86409691BA5909000D3C4E8 /* Zip+arity.tt */,
C864096A1BA5909000D3C4E8 /* Zip+CollectionType.swift */,
C864096B1BA5909000D3C4E8 /* Zip.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1405,6 +1408,7 @@
C86409B51BA5909000D3C4E8 /* ConnectableObservable.swift in Sources */,
C84B3A3A1BA4345A001B7D88 /* DeinitAction.swift in Sources */,
C84B3A621BA4345A001B7D88 /* UITableView+Rx.swift in Sources */,
C86409FF1BA5A87200D3C4E8 /* Range.swift in Sources */,
C86409E61BA5909000D3C4E8 /* ObserverBase.swift in Sources */,
C84B3A631BA4345A001B7D88 /* UITextField+Rx.swift in Sources */,
C84B3A571BA4345A001B7D88 /* UICollectionView+Rx.swift in Sources */,

View File

@ -113,9 +113,8 @@ public struct Queue<T>: SequenceType {
public mutating func enqueue(element: T) {
version++
_ = count == storage.count
if count == storage.count {
resizeTo(storage.count * resizeFactor)
resizeTo(max(storage.count, 1) * resizeFactor)
}
storage[pushNextIndex] = element

View File

@ -0,0 +1,51 @@
//
// Range.swift
// Rx
//
// Created by Krunoslav Zaher on 9/13/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class RangeProducer<_CompilerWorkaround> : Producer<Int> {
let start: Int
let count: Int
let scheduler: ImmediateSchedulerType
init(start: Int, count: Int, scheduler: ImmediateSchedulerType) {
self.start = start
self.count = count
self.scheduler = scheduler
}
override func run<O : ObserverType where O.E == Int>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
}
}
class RangeSink<_CompilerWorkaround, O: ObserverType where O.E == Int> : Sink<O> {
typealias Parent = RangeProducer<_CompilerWorkaround>
let parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self.parent.scheduler.scheduleRecursive(0) { i, recurse in
if i < self.parent.count {
self.observer?.on(.Next(self.parent.start + i))
recurse(i + 1)
}
else {
self.observer?.on(.Completed)
self.dispose()
}
}
}
}

View File

@ -128,4 +128,24 @@ to run the loop send out observer messages.
*/
public func generate<E>(initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
/**
Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to generate and send out observer messages.
- parameter start: The value of the first integer in the sequence.
- parameter count: The number of sequential integers to generate.
- parameter scheduler: Scheduler to run the generator loop on.
- returns: An observable sequence that contains a range of sequential integral numbers.
*/
public func range(start: Int, _ count: Int, _ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Int> {
if count < 0 {
rxFatalError("count can't be negative")
}
if start &+ (count - 1) < start {
rxFatalError("overflow of count")
}
return RangeProducer<Int>(start: start, count: count, scheduler: scheduler)
}

View File

@ -26,9 +26,19 @@ class CurrentThreadSchedulerKey : NSObject, NSCopying {
}
}
/**
Represents an object that schedules units of work on the current thread.
This is the default scheduler for operators that generate elements.
This scheduler is also sometimes called `trampoline scheduler`.
*/
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()
static var queue : ScheduleQueue? {
@ -40,10 +50,23 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
}
}
static var isScheduleRequired: Bool {
/**
Gets a value that indicates whether the caller must call a `schedule` method.
*/
public static var isScheduleRequired: Bool {
return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil
}
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
let queue = CurrentThreadScheduler.queue
@ -53,7 +76,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
return scheduledItem
}
let newQueue = RxMutableBox(Queue<ScheduledItemType>(capacity: 10))
let newQueue = RxMutableBox(Queue<ScheduledItemType>(capacity: 0))
CurrentThreadScheduler.queue = newQueue
action(state)

View File

@ -99,3 +99,32 @@ extension ObservableCreationTests {
XCTAssertEqual(count, 3)
}
}
extension ObservableCreationTests {
func testRange_Boundaries() {
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start {
range(Int.max, 1, scheduler)
}
XCTAssertEqual(res.messages, [
next(201, Int.max),
completed(202)
])
}
func testRange_Dispose() {
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start(204) {
range(-10, 5, scheduler)
}
XCTAssertEqual(res.messages, [
next(201, -10),
next(202, -9),
next(203, -8)
])
}
}

View File

@ -1289,4 +1289,14 @@ extension ObservableTimeTest {
])
}
func bufferWithTimeOrCount_Default() {
let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let result = try! range(1, 10, backgroundScheduler)
.buffer(timeSpan: 1000, count: 3, scheduler: backgroundScheduler)
.skip(1)
.first()
XCTAssertEqual(result!, [4, 5, 6])
}
}