Added ConcurrentDispatchQueue scheduler.

This commit is contained in:
Krunoslav Zaher 2015-07-05 19:17:29 +02:00
parent d5c977109a
commit 8683b6a883
15 changed files with 219 additions and 54 deletions

View File

@ -363,7 +363,7 @@
isa = PBXProject;
attributes = {
LastSwiftUpdateCheck = 0700;
LastUpgradeCheck = 0610;
LastUpgradeCheck = 0640;
ORGANIZATIONNAME = "Krunoslav Zaher";
TargetAttributes = {
C81553DD1A98AB4A00C63152 = {
@ -578,7 +578,6 @@
"$(inherited)",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Debug-iphoneos",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Release-iphoneos",
"/Users/kzaher/Projects/Rx/RxSwift/build/Debug-iphoneos",
);
INSTALL_PATH = "$(LOCAL_LIBRARY_DIR)/Frameworks";
IPHONEOS_DEPLOYMENT_TARGET = 8.0;
@ -602,7 +601,6 @@
"$(inherited)",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Debug-iphoneos",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Release-iphoneos",
"/Users/kzaher/Projects/Rx/RxSwift/build/Debug-iphoneos",
);
INSTALL_PATH = "$(LOCAL_LIBRARY_DIR)/Frameworks";
IPHONEOS_DEPLOYMENT_TARGET = 8.0;
@ -668,7 +666,6 @@
"$(inherited)",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Debug-iphoneos",
"$(USER_LIBRARY_DIR)/Developer/Xcode/DerivedData/Rx-cfkyozdvlaegqibzixjokeysigeo/Build/Products/Release-iphoneos",
"/Users/kzaher/Projects/Rx/RxSwift/build/Debug-iphoneos",
);
INSTALL_PATH = "$(LOCAL_LIBRARY_DIR)/Frameworks";
IPHONEOS_DEPLOYMENT_TARGET = 8.0;
@ -683,6 +680,7 @@
isa = XCBuildConfiguration;
buildSettings = {
CLANG_ENABLE_MODULES = YES;
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;
@ -701,6 +699,7 @@
isa = XCBuildConfiguration;
buildSettings = {
CLANG_ENABLE_MODULES = YES;
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;
@ -720,6 +719,7 @@
isa = XCBuildConfiguration;
buildSettings = {
CLANG_ENABLE_MODULES = YES;
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;

View File

@ -15,7 +15,7 @@ class Dependencies {
let URLSession = NSURLSession.sharedSession()
let backgroundWorkScheduler: ImmediateScheduler
let mainScheduler: DispatchQueueScheduler
let mainScheduler: SerialDispatchQueueScheduler
let wireframe: Wireframe
private init() {

View File

@ -24,9 +24,6 @@ class WikipediaSearchViewController: ViewController {
override func viewDidLoad() {
super.viewDidLoad()
let a = Subject<Int>()
sendNext(a, 1)
let resultsTableView = self.searchDisplayController!.searchResultsTableView
let searchBar = self.searchDisplayController!.searchBar

View File

@ -262,8 +262,8 @@
<rect key="frame" x="0.0" y="0.0" width="55" height="35"/>
<autoresizingMask key="autoresizingMask"/>
<subviews>
<label opaque="NO" userInteractionEnabled="NO" contentMode="left" horizontalHuggingPriority="251" verticalHuggingPriority="251" misplaced="YES" text="Label" lineBreakMode="tailTruncation" baselineAdjustment="alignBaselines" adjustsFontSizeToFit="NO" translatesAutoresizingMaskIntoConstraints="NO" id="vIm-V4-xJI">
<rect key="frame" x="4" y="14.5" width="42" height="20.5"/>
<label opaque="NO" userInteractionEnabled="NO" contentMode="left" horizontalHuggingPriority="251" verticalHuggingPriority="251" text="Label" lineBreakMode="tailTruncation" baselineAdjustment="alignBaselines" adjustsFontSizeToFit="NO" translatesAutoresizingMaskIntoConstraints="NO" id="vIm-V4-xJI">
<rect key="frame" x="6.5" y="7.5" width="42" height="20.5"/>
<fontDescription key="fontDescription" type="system" pointSize="17"/>
<color key="textColor" red="0.0" green="0.0" blue="0.0" alpha="1" colorSpace="calibratedRGB"/>
<nil key="highlightedColor"/>
@ -284,8 +284,8 @@
<rect key="frame" x="0.0" y="0.0" width="107.5" height="25"/>
<autoresizingMask key="autoresizingMask"/>
<subviews>
<label opaque="NO" userInteractionEnabled="NO" contentMode="left" horizontalHuggingPriority="251" verticalHuggingPriority="251" misplaced="YES" text="Label" lineBreakMode="tailTruncation" baselineAdjustment="alignBaselines" adjustsFontSizeToFit="NO" translatesAutoresizingMaskIntoConstraints="NO" id="Dob-Ct-qBk">
<rect key="frame" x="24" y="14.5" width="42" height="20.5"/>
<label opaque="NO" userInteractionEnabled="NO" contentMode="left" horizontalHuggingPriority="251" verticalHuggingPriority="251" text="Label" lineBreakMode="tailTruncation" baselineAdjustment="alignBaselines" adjustsFontSizeToFit="NO" translatesAutoresizingMaskIntoConstraints="NO" id="Dob-Ct-qBk">
<rect key="frame" x="33" y="2.5" width="42" height="20.5"/>
<fontDescription key="fontDescription" type="system" pointSize="17"/>
<color key="textColor" red="0.98431372549999996" green="0.98431372549999996" blue="0.94901960780000005" alpha="1" colorSpace="calibratedRGB"/>
<nil key="highlightedColor"/>

View File

@ -29,8 +29,8 @@
C83CEA321B2495D7006828AC /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = C83CEA311B2495D7006828AC /* TakeWhile.swift */; };
C83CEA331B2495D7006828AC /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = C83CEA311B2495D7006828AC /* TakeWhile.swift */; };
C86AE05C1AE3F0ED00C8A2A6 /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86AE05B1AE3F0ED00C8A2A6 /* Catch.swift */; };
C87CF5FD1B1B7EC300283912 /* DispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F41B1B7EC300283912 /* DispatchQueueScheduler.swift */; };
C87CF5FE1B1B7EC300283912 /* DispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F41B1B7EC300283912 /* DispatchQueueScheduler.swift */; };
C87CF5FD1B1B7EC300283912 /* SerialDispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F41B1B7EC300283912 /* SerialDispatchQueueScheduler.swift */; };
C87CF5FE1B1B7EC300283912 /* SerialDispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F41B1B7EC300283912 /* SerialDispatchQueueScheduler.swift */; };
C87CF6051B1B7EC300283912 /* MainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F81B1B7EC300283912 /* MainScheduler.swift */; };
C87CF6061B1B7EC300283912 /* MainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F81B1B7EC300283912 /* MainScheduler.swift */; };
C87CF6071B1B7EC300283912 /* OperationQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C87CF5F91B1B7EC300283912 /* OperationQueueScheduler.swift */; };
@ -190,6 +190,10 @@
C8AED7AA1B3C2ACA00678DDE /* Skip.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AED7A81B3C2ACA00678DDE /* Skip.swift */; };
C8AED7B21B3C66B000678DDE /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AED7B11B3C66B000678DDE /* ReplaySubject.swift */; };
C8AED7B31B3C66B000678DDE /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AED7B11B3C66B000678DDE /* ReplaySubject.swift */; };
C8AF26E91B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AF26E81B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift */; };
C8AF26EA1B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AF26E81B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift */; };
C8AF26EC1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AF26EB1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift */; };
C8AF26ED1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8AF26EB1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift */; };
C8B35B831B234BF7009851DA /* PeriodicScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B35B821B234BF7009851DA /* PeriodicScheduler.swift */; };
C8B35B841B234BF7009851DA /* PeriodicScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B35B821B234BF7009851DA /* PeriodicScheduler.swift */; };
C8B35B8A1B2393A1009851DA /* SchedulerServices+Emulation.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B35B891B2393A1009851DA /* SchedulerServices+Emulation.swift */; };
@ -236,7 +240,7 @@
C83CEA2E1B23D27E006828AC /* Timer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
C83CEA311B2495D7006828AC /* TakeWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TakeWhile.swift; sourceTree = "<group>"; };
C86AE05B1AE3F0ED00C8A2A6 /* Catch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Catch.swift; sourceTree = "<group>"; };
C87CF5F41B1B7EC300283912 /* DispatchQueueScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DispatchQueueScheduler.swift; sourceTree = "<group>"; };
C87CF5F41B1B7EC300283912 /* SerialDispatchQueueScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SerialDispatchQueueScheduler.swift; sourceTree = "<group>"; };
C87CF5F81B1B7EC300283912 /* MainScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = MainScheduler.swift; sourceTree = "<group>"; };
C87CF5F91B1B7EC300283912 /* OperationQueueScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = OperationQueueScheduler.swift; sourceTree = "<group>"; };
C87CF60F1B1B916500283912 /* ImmediateScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ImmediateScheduler.swift; sourceTree = "<group>"; };
@ -316,6 +320,8 @@
C8A60C3B1AF4191B007923F0 /* Sample.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Sample.swift; sourceTree = "<group>"; };
C8AED7A81B3C2ACA00678DDE /* Skip.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Skip.swift; sourceTree = "<group>"; };
C8AED7B11B3C66B000678DDE /* ReplaySubject.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = "<group>"; };
C8AF26E81B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentDispatchQueueScheduler.swift; sourceTree = "<group>"; };
C8AF26EB1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DispatchQueueSchedulerPriority.swift; sourceTree = "<group>"; };
C8B35B821B234BF7009851DA /* PeriodicScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeriodicScheduler.swift; sourceTree = "<group>"; };
C8B35B891B2393A1009851DA /* SchedulerServices+Emulation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SchedulerServices+Emulation.swift"; sourceTree = "<group>"; };
C8B787F51AF544A200206D02 /* Observable+Debug.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+Debug.swift"; sourceTree = "<group>"; };
@ -358,12 +364,14 @@
C87CF5F31B1B7EC300283912 /* Schedulers */ = {
isa = PBXGroup;
children = (
C83CEA2B1B23B267006828AC /* Scheduler+Extensions.swift */,
C87CF5F41B1B7EC300283912 /* DispatchQueueScheduler.swift */,
C8AF26E81B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift */,
C87CF5F81B1B7EC300283912 /* MainScheduler.swift */,
C87CF5F91B1B7EC300283912 /* OperationQueueScheduler.swift */,
C8B35B891B2393A1009851DA /* SchedulerServices+Emulation.swift */,
C83CEA281B23AFEE006828AC /* RecursiveScheduler.swift */,
C83CEA2B1B23B267006828AC /* Scheduler+Extensions.swift */,
C8B35B891B2393A1009851DA /* SchedulerServices+Emulation.swift */,
C87CF5F41B1B7EC300283912 /* SerialDispatchQueueScheduler.swift */,
C8AF26EB1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift */,
);
path = Schedulers;
sourceTree = "<group>";
@ -371,7 +379,6 @@
C897EC2A1B10D3F4009C2CB0 /* Subjects */ = {
isa = PBXGroup;
children = (
C8AED7B71B3C855A00678DDE /* ReplaySubject.swift */,
C897EC311B10D426009C2CB0 /* BehaviorSubject.swift */,
C897EC2B1B10D3F4009C2CB0 /* ConnectableObservableType.swift */,
C897EC341B10D8F2009C2CB0 /* PublishSubject.swift */,
@ -630,7 +637,7 @@
isa = PBXProject;
attributes = {
LastSwiftUpdateCheck = 0700;
LastUpgradeCheck = 0630;
LastUpgradeCheck = 0640;
ORGANIZATIONNAME = "Krunoslav Zaher";
TargetAttributes = {
C8A56AD61AD7424700B4673B = {
@ -681,7 +688,7 @@
buildActionMask = 2147483647;
files = (
C88BB81C1B07E5ED0064D411 /* ConcatSink.swift in Sources */,
C87CF5FE1B1B7EC300283912 /* DispatchQueueScheduler.swift in Sources */,
C87CF5FE1B1B7EC300283912 /* SerialDispatchQueueScheduler.swift in Sources */,
C80B561A1B2DA441008F915D /* DelaySubscription.swift in Sources */,
C87CF6141B1B916500283912 /* Scheduler.swift in Sources */,
C88BB81D1B07E5ED0064D411 /* Sample.swift in Sources */,
@ -696,6 +703,7 @@
C83CEA2A1B23AFEE006828AC /* RecursiveScheduler.swift in Sources */,
C88BB8241B07E5ED0064D411 /* Observable+Single.swift in Sources */,
C897EC451B1119A4009C2CB0 /* Zip+arity.swift in Sources */,
C8AF26ED1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift in Sources */,
C83CEA2D1B23B267006828AC /* Scheduler+Extensions.swift in Sources */,
C88BB8251B07E5ED0064D411 /* Sink.swift in Sources */,
C88BB8261B07E5ED0064D411 /* SerialDisposable.swift in Sources */,
@ -712,6 +720,7 @@
C8FDC5F61B2B3D280065F8D9 /* BinaryDisposable.swift in Sources */,
C88BB82D1B07E5ED0064D411 /* ScopedDispose.swift in Sources */,
C88BB82E1B07E5ED0064D411 /* Queue.swift in Sources */,
C8AF26EA1B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift in Sources */,
C88BB82F1B07E5ED0064D411 /* AnyObject+Rx.swift in Sources */,
C88BB8301B07E5ED0064D411 /* TailRecursiveSink.swift in Sources */,
C88BB8321B07E5ED0064D411 /* AutoDetachObserver.swift in Sources */,
@ -791,7 +800,7 @@
buildActionMask = 2147483647;
files = (
C8A56B631AD7435900B4673B /* ConcatSink.swift in Sources */,
C87CF5FD1B1B7EC300283912 /* DispatchQueueScheduler.swift in Sources */,
C87CF5FD1B1B7EC300283912 /* SerialDispatchQueueScheduler.swift in Sources */,
C80B56191B2DA441008F915D /* DelaySubscription.swift in Sources */,
C87CF6131B1B916500283912 /* Scheduler.swift in Sources */,
C8A60C3C1AF4191B007923F0 /* Sample.swift in Sources */,
@ -806,6 +815,7 @@
C83CEA291B23AFEE006828AC /* RecursiveScheduler.swift in Sources */,
C8A56B7B1AD7435900B4673B /* Observable+Single.swift in Sources */,
C897EC441B1119A4009C2CB0 /* Zip+arity.swift in Sources */,
C8AF26EC1B49960F00131C03 /* DispatchQueueSchedulerPriority.swift in Sources */,
C83CEA2C1B23B267006828AC /* Scheduler+Extensions.swift in Sources */,
C8A56B6F1AD7435900B4673B /* Sink.swift in Sources */,
C8A56B581AD7435900B4673B /* SerialDisposable.swift in Sources */,
@ -822,6 +832,7 @@
C8FDC5F51B2B3D280065F8D9 /* BinaryDisposable.swift in Sources */,
C814CEA51AF56E6400E98087 /* ScopedDispose.swift in Sources */,
C8A56B511AD7435900B4673B /* Queue.swift in Sources */,
C8AF26E91B4995EE00131C03 /* ConcurrentDispatchQueueScheduler.swift in Sources */,
C8A56B4C1AD7435900B4673B /* AnyObject+Rx.swift in Sources */,
C8A56B721AD7435900B4673B /* TailRecursiveSink.swift in Sources */,
C8A56B811AD7435900B4673B /* AutoDetachObserver.swift in Sources */,
@ -963,6 +974,7 @@
C8633A961B08FA5500375D60 /* Release-Tests */ = {
isa = XCBuildConfiguration;
buildSettings = {
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;
@ -979,6 +991,7 @@
C88BB86F1B07E5ED0064D411 /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;
@ -995,6 +1008,7 @@
C88BB8701B07E5ED0064D411 /* Release */ = {
isa = XCBuildConfiguration;
buildSettings = {
COMBINE_HIDPI_IMAGES = YES;
DEFINES_MODULE = YES;
DYLIB_COMPATIBILITY_VERSION = 1;
DYLIB_CURRENT_VERSION = 1;

View File

@ -13,7 +13,7 @@ class ObserveOnDispatchQueueSink<O: ObserverType> : ScheduledSerialSchedulerObse
var cancel: Disposable
init(scheduler: DispatchQueueScheduler, observer: O, cancel: Disposable) {
init(scheduler: SerialDispatchQueueScheduler, observer: O, cancel: Disposable) {
self.cancel = cancel
super.init(scheduler: scheduler, observer: observer)
}
@ -36,10 +36,10 @@ public var numberOfDispatchQueueObservables: Int32 = 0
#endif
class ObserveOnDispatchQueue<E> : Producer<E> {
let scheduler: DispatchQueueScheduler
let scheduler: SerialDispatchQueueScheduler
let source: Observable<E>
init(source: Observable<E>, scheduler: DispatchQueueScheduler) {
init(source: Observable<E>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source

View File

@ -9,10 +9,10 @@
import Foundation
class ScheduledSerialSchedulerObserver<O: ObserverType> : ObserverBase<O.Element> {
let scheduler: DispatchQueueScheduler
let scheduler: SerialDispatchQueueScheduler
let observer: O
init(scheduler: DispatchQueueScheduler, observer: O) {
init(scheduler: SerialDispatchQueueScheduler, observer: O) {
self.scheduler = scheduler
self.observer = observer
super.init()

View File

@ -48,7 +48,7 @@ public func observeOn<E>
(scheduler: ImmediateScheduler)
-> Observable<E> -> Observable<E> {
return { source in
if let scheduler = scheduler as? DispatchQueueScheduler {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnDispatchQueue(source: source, scheduler: scheduler)
}
else {

View File

@ -0,0 +1,125 @@
//
// ConcurrentDispatchQueueScheduler.swift
// RxSwift
//
// Created by Krunoslav Zaher on 7/5/15.
// Copyright (c) 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
public typealias TimeInterval = NSTimeInterval
public typealias Time = NSDate
private let queue : dispatch_queue_t
public var now : NSDate {
get {
return NSDate()
}
}
// leeway for scheduling timers
var leeway: Int64 = 0
public init(queue: dispatch_queue_t) {
self.queue = queue
}
// Convenience init for scheduler that wraps one of the global concurrent dispatch queues.
//
// DISPATCH_QUEUE_PRIORITY_DEFAULT
// DISPATCH_QUEUE_PRIORITY_HIGH
// DISPATCH_QUEUE_PRIORITY_LOW
public convenience init(globalConcurrentQueuePriority: DispatchQueueSchedulerPriority) {
var priority: Int = 0
switch globalConcurrentQueuePriority {
case .High:
priority = DISPATCH_QUEUE_PRIORITY_HIGH
case .Default:
priority = DISPATCH_QUEUE_PRIORITY_DEFAULT
case .Low:
priority = DISPATCH_QUEUE_PRIORITY_LOW
}
self.init(queue: dispatch_get_global_queue(priority, UInt(0)))
}
class func convertTimeIntervalToDispatchInterval(timeInterval: NSTimeInterval) -> Int64 {
return Int64(timeInterval * Double(NSEC_PER_SEC))
}
class func convertTimeIntervalToDispatchTime(timeInterval: NSTimeInterval) -> dispatch_time_t {
return dispatch_time(DISPATCH_TIME_NOW, convertTimeIntervalToDispatchInterval(timeInterval))
}
public final func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
let cancel = SingleAssignmentDisposable()
dispatch_async(self.queue) {
if cancel.disposed {
return
}
_ = ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
cancel.disposable = disposable
})
}
return success(cancel)
}
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (/*Scheduler<NSTimeInterval, NSDate>,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue)
let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchTime(dueTime)
let compositeDisposable = CompositeDisposable()
dispatch_source_set_timer(timer, dispatchInterval, DISPATCH_TIME_FOREVER, 0)
dispatch_source_set_event_handler(timer, {
if compositeDisposable.disposed {
return
}
ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
compositeDisposable.addDisposable(disposable)
})
})
dispatch_resume(timer)
compositeDisposable.addDisposable(AnonymousDisposable {
dispatch_source_cancel(timer)
})
return success(compositeDisposable)
}
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable> {
let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue)
let initial = MainScheduler.convertTimeIntervalToDispatchTime(startAfter)
let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchInterval(period)
var timerState = state
let validDispatchInterval = dispatchInterval < 0 ? 0 : UInt64(dispatchInterval)
dispatch_source_set_timer(timer, initial, validDispatchInterval, 0)
let cancel = AnonymousDisposable {
dispatch_source_cancel(timer)
}
dispatch_source_set_event_handler(timer, {
if cancel.disposed {
return
}
timerState = action(timerState)
})
dispatch_resume(timer)
return success(cancel)
}
}

View File

@ -0,0 +1,15 @@
//
// DispatchQueueSchedulerPriority.swift
// RxSwift
//
// Created by Krunoslav Zaher on 7/5/15.
// Copyright (c) 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
public enum DispatchQueueSchedulerPriority {
case High
case Default
case Low
}

View File

@ -12,7 +12,7 @@ struct MainSchedulerSingleton {
static let sharedInstance = MainScheduler()
}
public final class MainScheduler : DispatchQueueScheduler {
public final class MainScheduler : SerialDispatchQueueScheduler {
private init() {
super.init(serialQueue: dispatch_get_main_queue())

View File

@ -1,5 +1,5 @@
//
// DispatchQueueScheduler.swift
// SerialQueueScheduler.swift
// Rx
//
// Created by Krunoslav Zaher on 2/8/15.
@ -8,11 +8,8 @@
import Foundation
public enum DispatchQueueSchedulerPriority {
case High
case Default
case Low
}
@availability(*, deprecated=1.7, message="Replaced by `SerialDispatchQueueScheduler`")
public typealias DispatchQueueScheduler = SerialDispatchQueueScheduler
// This is a scheduler that wraps dispatch queue.
// It can wrap both serial and concurrent dispatch queues.
@ -29,7 +26,7 @@ public enum DispatchQueueSchedulerPriority {
// internal serial queue can be customized using `serialQueueConfiguration`
// callback.
//
public class DispatchQueueScheduler: Scheduler, PeriodicScheduler {
public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
public typealias TimeInterval = NSTimeInterval
public typealias Time = NSDate

View File

@ -10,7 +10,7 @@ import Foundation
import XCTest
import RxSwift
class ObservableConcurrencyTest : RxTest {
class ObservableConcurrencyTestBase : RxTest {
var lock = OS_SPINLOCK_INIT
func performLocked(action: () -> Void) {
@ -27,6 +27,9 @@ class ObservableConcurrencyTest : RxTest {
}
}
class ObservableConcurrencyTest : ObservableConcurrencyTestBase {
}
// observeSingleOn
extension ObservableConcurrencyTest {
func testObserveSingleOn_DeadlockSimple() {
@ -188,12 +191,12 @@ extension ObservableConcurrencyTest {
// observeOn serial scheduler
extension ObservableConcurrencyTest {
func runDispatchQueueSchedulerTests(tests: (scheduler: DispatchQueueScheduler) -> Disposable) {
let scheduler = DispatchQueueScheduler(internalSerialQueueName: "testQueue1")
func runDispatchQueueSchedulerTests(tests: (scheduler: SerialDispatchQueueScheduler) -> Disposable) {
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "testQueue1")
let _ = runDispatchQueueSchedulerTests(scheduler, tests: tests) >- scopedDispose
}
func runDispatchQueueSchedulerTests(scheduler: DispatchQueueScheduler, tests: (scheduler: DispatchQueueScheduler) -> Disposable) -> Disposable {
func runDispatchQueueSchedulerTests(scheduler: SerialDispatchQueueScheduler, tests: (scheduler: SerialDispatchQueueScheduler) -> Disposable) -> Disposable {
// simplest possible solution, even though it has horrible efficiency in this case probably
var wait = OS_SPINLOCK_INIT
OSSpinLockLock(&wait)
@ -210,8 +213,8 @@ extension ObservableConcurrencyTest {
return disposable
}
func runDispatchQueueSchedulerMultiplexedTests(tests: [(scheduler: DispatchQueueScheduler) -> Disposable]) {
let scheduler = DispatchQueueScheduler(internalSerialQueueName: "testQueue1")
func runDispatchQueueSchedulerMultiplexedTests(tests: [(scheduler: SerialDispatchQueueScheduler) -> Disposable]) {
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "testQueue1")
let compositeDisposable = CompositeDisposable()
@ -458,28 +461,32 @@ extension ObservableConcurrencyTest {
}
// observeOn concurrent scheduler
extension ObservableConcurrencyTest {
func runConcurentSchedulerTest(test: (scheduler: OperationQueueScheduler) -> Disposable) {
class ObservableConcurrentSchedulerConcurrencyTest: ObservableConcurrencyTestBase {
func createScheduler() -> ImmediateScheduler {
let operationQueue = NSOperationQueue()
operationQueue.maxConcurrentOperationCount = 8
let scheduler = OperationQueueScheduler(operationQueue: operationQueue)
return OperationQueueScheduler(operationQueue: operationQueue)
}
func runConcurentSchedulerTest(test: (scheduler: ImmediateScheduler) -> Disposable) {
let scheduler = createScheduler()
let d1 = runConcurentSchedulerTest(scheduler, test: test) >- scopedDispose
}
func runConcurentSchedulerTest(scheduler: OperationQueueScheduler, test: (scheduler: OperationQueueScheduler) -> Disposable) -> Disposable {
func runConcurentSchedulerTest(scheduler: ImmediateScheduler, test: (scheduler: ImmediateScheduler) -> Disposable) -> Disposable {
let disposable = test(scheduler: scheduler)
scheduler.operationQueue.waitUntilAllOperationsAreFinished()
//scheduler.operationQueue.waitUntilAllOperationsAreFinished()
// stupid solution, but works
NSThread.sleepForTimeInterval(0.01)
return disposable
}
func runConcurentSchedulerMutiplexedTests(tests: [(scheduler: OperationQueueScheduler) -> Disposable]) {
let operationQueue = NSOperationQueue()
operationQueue.maxConcurrentOperationCount = 8
let scheduler = OperationQueueScheduler(operationQueue: operationQueue)
func runConcurentSchedulerMutiplexedTests(tests: [(scheduler: ImmediateScheduler) -> Disposable]) {
let scheduler = createScheduler()
let compositeDisposable = CompositeDisposable()
@ -757,6 +764,12 @@ extension ObservableConcurrencyTest {
}
}
class ObservableConcurrentSchedulerConcurrencyTest2 : ObservableConcurrentSchedulerConcurrencyTest {
override func createScheduler() -> ImmediateScheduler {
return ConcurrentDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
}
}
// subscribeOn
extension ObservableConcurrencyTest {

View File

@ -744,7 +744,7 @@ extension ObservableTimeTest {
}
func testInterval_TimeSpan_Zero_DefaultScheduler() {
var scheduler = DispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
var scheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let observer = PrimitiveMockObserver<Int64>()

View File

@ -327,7 +327,7 @@
isa = PBXProject;
attributes = {
LastSwiftUpdateCheck = 0700;
LastUpgradeCheck = 0630;
LastUpgradeCheck = 0640;
TargetAttributes = {
C811081F1AF50E11001C13E4 = {
CreatedOnToolsVersion = 6.3;
@ -469,6 +469,7 @@
buildSettings = {
EMBEDDED_CONTENT_CONTAINS_SWIFT = YES;
INFOPLIST_FILE = Tests/Info.plist;
ONLY_ACTIVE_ARCH = YES;
OTHER_LDFLAGS = (
"-framework",
XCTest,
@ -641,6 +642,7 @@
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COMBINE_HIDPI_IMAGES = YES;
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
ENABLE_NS_ASSERTIONS = NO;
@ -682,6 +684,7 @@
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COMBINE_HIDPI_IMAGES = YES;
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
ENABLE_STRICT_OBJC_MSGSEND = YES;
@ -729,6 +732,7 @@
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COMBINE_HIDPI_IMAGES = YES;
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
ENABLE_NS_ASSERTIONS = NO;