Scheduler revamp (moving towards protocol extensions), internal optimizations, interface cleanup.

This commit is contained in:
Krunoslav Zaher 2015-08-31 10:59:37 +02:00
parent 1f0f5b9788
commit 6ac94ea5b6
44 changed files with 933 additions and 998 deletions

View File

@ -65,8 +65,6 @@
C8093CFE1B8A72BE0088E94D /* Observable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C681B8A72BE0088E94D /* Observable.swift */; };
C8093CFF1B8A72BE0088E94D /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6B1B8A72BE0088E94D /* Amb.swift */; };
C8093D001B8A72BE0088E94D /* Amb.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6B1B8A72BE0088E94D /* Amb.swift */; };
C8093D011B8A72BE0088E94D /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6C1B8A72BE0088E94D /* AnonymousObservable.swift */; };
C8093D021B8A72BE0088E94D /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6C1B8A72BE0088E94D /* AnonymousObservable.swift */; };
C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6D1B8A72BE0088E94D /* AsObservable.swift */; };
C8093D051B8A72BE0088E94D /* Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C6E1B8A72BE0088E94D /* Catch.swift */; };
@ -99,8 +97,6 @@
C8093D241B8A72BE0088E94D /* Merge.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C7D1B8A72BE0088E94D /* Merge.swift */; };
C8093D251B8A72BE0088E94D /* Multicast.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C7E1B8A72BE0088E94D /* Multicast.swift */; };
C8093D261B8A72BE0088E94D /* Multicast.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C7E1B8A72BE0088E94D /* Multicast.swift */; };
C8093D271B8A72BE0088E94D /* ObservableBase.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C7F1B8A72BE0088E94D /* ObservableBase.swift */; };
C8093D281B8A72BE0088E94D /* ObservableBase.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C7F1B8A72BE0088E94D /* ObservableBase.swift */; };
C8093D291B8A72BE0088E94D /* ObserveOn.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C801B8A72BE0088E94D /* ObserveOn.swift */; };
C8093D2A1B8A72BE0088E94D /* ObserveOn.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C801B8A72BE0088E94D /* ObserveOn.swift */; };
C8093D2B1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */; };
@ -185,8 +181,6 @@
C8093D7C1B8A72BE0088E94D /* ObserverType+Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAA1B8A72BE0088E94D /* ObserverType+Extensions.swift */; };
C8093D7D1B8A72BE0088E94D /* ObserverType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAB1B8A72BE0088E94D /* ObserverType.swift */; };
C8093D7E1B8A72BE0088E94D /* ObserverType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAB1B8A72BE0088E94D /* ObserverType.swift */; };
C8093D7F1B8A72BE0088E94D /* PeriodicScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAC1B8A72BE0088E94D /* PeriodicScheduler.swift */; };
C8093D801B8A72BE0088E94D /* PeriodicScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAC1B8A72BE0088E94D /* PeriodicScheduler.swift */; };
C8093D851B8A72BE0088E94D /* Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAF1B8A72BE0088E94D /* Rx.swift */; };
C8093D861B8A72BE0088E94D /* Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CAF1B8A72BE0088E94D /* Rx.swift */; };
C8093D871B8A72BE0088E94D /* RxBox.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CB01B8A72BE0088E94D /* RxBox.swift */; };
@ -320,6 +314,18 @@
C8A468CB1B8A894100BF917B /* RxSwift.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C88BB8711B07E5ED0064D411 /* RxSwift.framework */; };
C8C3D9FE1B935EDF004D233E /* Zip+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */; settings = {ASSET_TAGS = (); }; };
C8C3D9FF1B935EDF004D233E /* Zip+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA031B9390C4004D233E /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA021B9390C4004D233E /* Just.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA041B9390C4004D233E /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA021B9390C4004D233E /* Just.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA061B9393AC004D233E /* Empty.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA051B9393AC004D233E /* Empty.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA071B9393AC004D233E /* Empty.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA051B9393AC004D233E /* Empty.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA091B93941E004D233E /* FailWith.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA081B93941E004D233E /* FailWith.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA0A1B93941E004D233E /* FailWith.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA081B93941E004D233E /* FailWith.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA0C1B93959F004D233E /* Never.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA0B1B93959F004D233E /* Never.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA0D1B93959F004D233E /* Never.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA0B1B93959F004D233E /* Never.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA0F1B939767004D233E /* CurrentThreadScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA101B939767004D233E /* CurrentThreadScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA121B93A3EA004D233E /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */; settings = {ASSET_TAGS = (); }; };
C8C3DA131B93A3EA004D233E /* AnonymousObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */; settings = {ASSET_TAGS = (); }; };
/* End PBXBuildFile section */
/* Begin PBXFileReference section */
@ -359,7 +365,6 @@
C8093C671B8A72BE0088E94D /* Observable+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+Extensions.swift"; sourceTree = "<group>"; };
C8093C681B8A72BE0088E94D /* Observable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Observable.swift; sourceTree = "<group>"; };
C8093C6B1B8A72BE0088E94D /* Amb.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Amb.swift; sourceTree = "<group>"; };
C8093C6C1B8A72BE0088E94D /* AnonymousObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AnonymousObservable.swift; sourceTree = "<group>"; };
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AsObservable.swift; sourceTree = "<group>"; };
C8093C6E1B8A72BE0088E94D /* Catch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Catch.swift; sourceTree = "<group>"; };
C8093C6F1B8A72BE0088E94D /* CombineLatest+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+arity.swift"; sourceTree = "<group>"; };
@ -377,7 +382,6 @@
C8093C7C1B8A72BE0088E94D /* Map.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Map.swift; sourceTree = "<group>"; };
C8093C7D1B8A72BE0088E94D /* Merge.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Merge.swift; sourceTree = "<group>"; };
C8093C7E1B8A72BE0088E94D /* Multicast.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Multicast.swift; sourceTree = "<group>"; };
C8093C7F1B8A72BE0088E94D /* ObservableBase.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObservableBase.swift; sourceTree = "<group>"; };
C8093C801B8A72BE0088E94D /* ObserveOn.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserveOn.swift; sourceTree = "<group>"; };
C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserveOnSerialDispatchQueue.swift; sourceTree = "<group>"; };
C8093C821B8A72BE0088E94D /* ObserveSingleOn.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserveSingleOn.swift; sourceTree = "<group>"; };
@ -421,7 +425,6 @@
C8093CA91B8A72BE0088E94D /* TailRecursiveSink.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TailRecursiveSink.swift; sourceTree = "<group>"; };
C8093CAA1B8A72BE0088E94D /* ObserverType+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObserverType+Extensions.swift"; sourceTree = "<group>"; };
C8093CAB1B8A72BE0088E94D /* ObserverType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserverType.swift; sourceTree = "<group>"; };
C8093CAC1B8A72BE0088E94D /* PeriodicScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeriodicScheduler.swift; sourceTree = "<group>"; };
C8093CAF1B8A72BE0088E94D /* Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Rx.swift; sourceTree = "<group>"; };
C8093CB01B8A72BE0088E94D /* RxBox.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxBox.swift; sourceTree = "<group>"; };
C8093CB11B8A72BE0088E94D /* RxResult.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxResult.swift; sourceTree = "<group>"; };
@ -510,6 +513,12 @@
C88BB8711B07E5ED0064D411 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8A56AD71AD7424700B4673B /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Zip+CollectionType.swift"; sourceTree = "<group>"; };
C8C3DA021B9390C4004D233E /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = "<group>"; };
C8C3DA051B9393AC004D233E /* Empty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Empty.swift; sourceTree = "<group>"; };
C8C3DA081B93941E004D233E /* FailWith.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = FailWith.swift; sourceTree = "<group>"; };
C8C3DA0B1B93959F004D233E /* Never.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Never.swift; sourceTree = "<group>"; };
C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CurrentThreadScheduler.swift; sourceTree = "<group>"; };
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AnonymousObservable.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@ -580,7 +589,6 @@
C8093CA01B8A72BE0088E94D /* ObserverOf.swift */,
C8093CAB1B8A72BE0088E94D /* ObserverType.swift */,
C8093CAA1B8A72BE0088E94D /* ObserverType+Extensions.swift */,
C8093CAC1B8A72BE0088E94D /* PeriodicScheduler.swift */,
C8093CAF1B8A72BE0088E94D /* Rx.swift */,
C8093CB01B8A72BE0088E94D /* RxBox.swift */,
C8093CB11B8A72BE0088E94D /* RxResult.swift */,
@ -658,7 +666,6 @@
isa = PBXGroup;
children = (
C8093C6B1B8A72BE0088E94D /* Amb.swift */,
C8093C6C1B8A72BE0088E94D /* AnonymousObservable.swift */,
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */,
C8093C6E1B8A72BE0088E94D /* Catch.swift */,
C8093C711B8A72BE0088E94D /* CombineLatest.swift */,
@ -677,7 +684,6 @@
C8093C7C1B8A72BE0088E94D /* Map.swift */,
C8093C7D1B8A72BE0088E94D /* Merge.swift */,
C8093C7E1B8A72BE0088E94D /* Multicast.swift */,
C8093C7F1B8A72BE0088E94D /* ObservableBase.swift */,
C8093C801B8A72BE0088E94D /* ObserveOn.swift */,
C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */,
C8093C821B8A72BE0088E94D /* ObserveSingleOn.swift */,
@ -700,6 +706,11 @@
C8093C921B8A72BE0088E94D /* Zip+arity.swift */,
C8093C931B8A72BE0088E94D /* Zip+arity.tt */,
C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */,
C8C3DA021B9390C4004D233E /* Just.swift */,
C8C3DA051B9393AC004D233E /* Empty.swift */,
C8C3DA081B93941E004D233E /* FailWith.swift */,
C8C3DA0B1B93959F004D233E /* Never.swift */,
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -730,6 +741,7 @@
C8093CBA1B8A72BE0088E94D /* Scheduler+Extensions.swift */,
C8093CBB1B8A72BE0088E94D /* SchedulerServices+Emulation.swift */,
C8093CBC1B8A72BE0088E94D /* SerialDispatchQueueScheduler.swift */,
C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */,
);
path = Schedulers;
sourceTree = "<group>";
@ -1296,13 +1308,14 @@
C8093CDA1B8A72BE0088E94D /* BooleanDisposable.swift in Sources */,
C8093D5A1B8A72BE0088E94D /* Observable+Creation.swift in Sources */,
C8093CCC1B8A72BE0088E94D /* ConnectableObservableType.swift in Sources */,
C8C3DA041B9390C4004D233E /* Just.swift in Sources */,
C8093CE61B8A72BE0088E94D /* NopDisposable.swift in Sources */,
C8093CD41B8A72BE0088E94D /* Disposable.swift in Sources */,
C8093CC41B8A72BE0088E94D /* AnyObject+Rx.swift in Sources */,
C8093CEE1B8A72BE0088E94D /* SingleAssignmentDisposable.swift in Sources */,
C8093D681B8A72BE0088E94D /* Observer.swift in Sources */,
C8C3DA0A1B93941E004D233E /* FailWith.swift in Sources */,
C8093D781B8A72BE0088E94D /* ScheduledObserver.swift in Sources */,
C8093D801B8A72BE0088E94D /* PeriodicScheduler.swift in Sources */,
C8093D9C1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */,
C8093D6A1B8A72BE0088E94D /* ObserverOf.swift in Sources */,
C8093D3C1B8A72BE0088E94D /* Skip.swift in Sources */,
@ -1310,6 +1323,7 @@
C8093D2E1B8A72BE0088E94D /* ObserveSingleOn.swift in Sources */,
C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA071B9393AC004D233E /* Empty.swift in Sources */,
C8093D761B8A72BE0088E94D /* SafeObserver.swift in Sources */,
C8093D881B8A72BE0088E94D /* RxBox.swift in Sources */,
C8093D3A1B8A72BE0088E94D /* Sink.swift in Sources */,
@ -1329,10 +1343,8 @@
C8093D121B8A72BE0088E94D /* ConnectableObservable.swift in Sources */,
C8093D621B8A72BE0088E94D /* Observable+StandardSequenceOperators.swift in Sources */,
C8093D1A1B8A72BE0088E94D /* DistinctUntilChanged.swift in Sources */,
C8093D021B8A72BE0088E94D /* AnonymousObservable.swift in Sources */,
C8093D561B8A72BE0088E94D /* Observable+Binding.swift in Sources */,
C8093D8A1B8A72BE0088E94D /* RxResult.swift in Sources */,
C8093D281B8A72BE0088E94D /* ObservableBase.swift in Sources */,
C8093D7A1B8A72BE0088E94D /* TailRecursiveSink.swift in Sources */,
C8093CC81B8A72BE0088E94D /* AsyncLock.swift in Sources */,
C8093D9A1B8A72BE0088E94D /* Scheduler+Extensions.swift in Sources */,
@ -1341,6 +1353,7 @@
C8093D361B8A72BE0088E94D /* Sample.swift in Sources */,
C8093CEA1B8A72BE0088E94D /* ScopedDispose.swift in Sources */,
C8093D261B8A72BE0088E94D /* Multicast.swift in Sources */,
C8C3DA101B939767004D233E /* CurrentThreadScheduler.swift in Sources */,
C8093D861B8A72BE0088E94D /* Rx.swift in Sources */,
C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */,
C8093DA61B8A72BE0088E94D /* SubjectType.swift in Sources */,
@ -1358,6 +1371,7 @@
C8093D981B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */,
C8093D381B8A72BE0088E94D /* Scan.swift in Sources */,
C8093CD21B8A72BE0088E94D /* Queue.swift in Sources */,
C8C3DA131B93A3EA004D233E /* AnonymousObservable.swift in Sources */,
C8093D201B8A72BE0088E94D /* FlatMap.swift in Sources */,
C8093CE01B8A72BE0088E94D /* DisposeBase.swift in Sources */,
C8093CD61B8A72BE0088E94D /* AnonymousDisposable.swift in Sources */,
@ -1398,6 +1412,7 @@
C8093CFE1B8A72BE0088E94D /* Observable.swift in Sources */,
C8093CE21B8A72BE0088E94D /* NAryDisposable.swift in Sources */,
C8093CEC1B8A72BE0088E94D /* SerialDisposable.swift in Sources */,
C8C3DA0D1B93959F004D233E /* Never.swift in Sources */,
C8093D721B8A72BE0088E94D /* NopObserver.swift in Sources */,
C8093D7C1B8A72BE0088E94D /* ObserverType+Extensions.swift in Sources */,
C8093CF61B8A72BE0088E94D /* Event.swift in Sources */,
@ -1412,13 +1427,14 @@
C8093CD91B8A72BE0088E94D /* BooleanDisposable.swift in Sources */,
C8093D591B8A72BE0088E94D /* Observable+Creation.swift in Sources */,
C8093CCB1B8A72BE0088E94D /* ConnectableObservableType.swift in Sources */,
C8C3DA031B9390C4004D233E /* Just.swift in Sources */,
C8093CE51B8A72BE0088E94D /* NopDisposable.swift in Sources */,
C8093CD31B8A72BE0088E94D /* Disposable.swift in Sources */,
C8093CC31B8A72BE0088E94D /* AnyObject+Rx.swift in Sources */,
C8093CED1B8A72BE0088E94D /* SingleAssignmentDisposable.swift in Sources */,
C8093D671B8A72BE0088E94D /* Observer.swift in Sources */,
C8C3DA091B93941E004D233E /* FailWith.swift in Sources */,
C8093D771B8A72BE0088E94D /* ScheduledObserver.swift in Sources */,
C8093D7F1B8A72BE0088E94D /* PeriodicScheduler.swift in Sources */,
C8093D9B1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */,
C8093D691B8A72BE0088E94D /* ObserverOf.swift in Sources */,
C8093D3B1B8A72BE0088E94D /* Skip.swift in Sources */,
@ -1426,6 +1442,7 @@
C8093D2D1B8A72BE0088E94D /* ObserveSingleOn.swift in Sources */,
C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA061B9393AC004D233E /* Empty.swift in Sources */,
C8093D751B8A72BE0088E94D /* SafeObserver.swift in Sources */,
C8093D871B8A72BE0088E94D /* RxBox.swift in Sources */,
C8093D391B8A72BE0088E94D /* Sink.swift in Sources */,
@ -1445,10 +1462,8 @@
C8093D111B8A72BE0088E94D /* ConnectableObservable.swift in Sources */,
C8093D611B8A72BE0088E94D /* Observable+StandardSequenceOperators.swift in Sources */,
C8093D191B8A72BE0088E94D /* DistinctUntilChanged.swift in Sources */,
C8093D011B8A72BE0088E94D /* AnonymousObservable.swift in Sources */,
C8093D551B8A72BE0088E94D /* Observable+Binding.swift in Sources */,
C8093D891B8A72BE0088E94D /* RxResult.swift in Sources */,
C8093D271B8A72BE0088E94D /* ObservableBase.swift in Sources */,
C8093D791B8A72BE0088E94D /* TailRecursiveSink.swift in Sources */,
C8093CC71B8A72BE0088E94D /* AsyncLock.swift in Sources */,
C8093D991B8A72BE0088E94D /* Scheduler+Extensions.swift in Sources */,
@ -1457,6 +1472,7 @@
C8093D351B8A72BE0088E94D /* Sample.swift in Sources */,
C8093CE91B8A72BE0088E94D /* ScopedDispose.swift in Sources */,
C8093D251B8A72BE0088E94D /* Multicast.swift in Sources */,
C8C3DA0F1B939767004D233E /* CurrentThreadScheduler.swift in Sources */,
C8093D851B8A72BE0088E94D /* Rx.swift in Sources */,
C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */,
C8093DA51B8A72BE0088E94D /* SubjectType.swift in Sources */,
@ -1474,6 +1490,7 @@
C8093D971B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */,
C8093D371B8A72BE0088E94D /* Scan.swift in Sources */,
C8093CD11B8A72BE0088E94D /* Queue.swift in Sources */,
C8C3DA121B93A3EA004D233E /* AnonymousObservable.swift in Sources */,
C8093D1F1B8A72BE0088E94D /* FlatMap.swift in Sources */,
C8093CDF1B8A72BE0088E94D /* DisposeBase.swift in Sources */,
C8093CD51B8A72BE0088E94D /* AnonymousDisposable.swift in Sources */,
@ -1514,6 +1531,7 @@
C8093CFD1B8A72BE0088E94D /* Observable.swift in Sources */,
C8093CE11B8A72BE0088E94D /* NAryDisposable.swift in Sources */,
C8093CEB1B8A72BE0088E94D /* SerialDisposable.swift in Sources */,
C8C3DA0C1B93959F004D233E /* Never.swift in Sources */,
C8093D711B8A72BE0088E94D /* NopObserver.swift in Sources */,
C8093D7B1B8A72BE0088E94D /* ObserverType+Extensions.swift in Sources */,
C8093CF51B8A72BE0088E94D /* Event.swift in Sources */,

View File

@ -12,8 +12,7 @@ import RxSwift
#endif
extension ObservableType {
public func toArray()
-> RxResult<[E]> {
public func toArray() throws -> [E] {
let condition = NSCondition()
var elements = [E]()
@ -46,15 +45,15 @@ extension ObservableType {
condition.unlock()
if let error = error {
return failure(error)
throw error
}
return success(elements)
return elements
}
}
extension ObservableType {
public var first: RxResult<E?> {
public func first() throws -> E? {
let condition = NSCondition()
var element: E?
@ -92,15 +91,15 @@ extension ObservableType {
condition.unlock()
if let error = error {
return failure(error)
throw error
}
return success(element)
return element
}
}
extension ObservableType {
public var last: RxResult<E?> {
public func last() throws -> E? {
let condition = NSCondition()
var element: E?
@ -136,9 +135,9 @@ extension ObservableType {
condition.unlock()
if let error = error {
return failure(error)
throw error
}
return success(element)
return element
}
}

View File

@ -11,7 +11,7 @@ import Foundation
import RxSwift
#endif
class KVOObservable<Element> : Producer<Element?>
class KVOObservable<Element> : _Producer<Element?>
, KVOObservableProtocol {
unowned var target: AnyObject
var strongTarget: AnyObject?

File diff suppressed because it is too large Load Diff

View File

@ -8,26 +8,11 @@
import Foundation
public let NopDisposableResult = success(NopDisposable.instance)
// Disposable that does nothing
// Nop = No Operation
public class NopDisposable : Disposable {
struct Internal {
static let instance = NopDisposable()
}
public class var instance: Disposable {
get {
return Internal.instance
}
}
/*
public class func Instance() -> Disposable {
return Internal.instance
}*/
public static let instance: Disposable = NopDisposable()
init() {

View File

@ -37,7 +37,7 @@ public class ScheduledDisposable : Cancelable {
public func dispose() {
scheduler.schedule(()) {
self.disposeInner()
return NopDisposableResult
return NopDisposable.instance
}
}

View File

@ -10,14 +10,15 @@ import Foundation
public protocol ImmediateScheduler {
func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable>
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
}
public func scheduleRecursively<State>(scheduler: ImmediateScheduler, state: State,
action: (state: State, recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateSchedulerOf(action: action, scheduler: scheduler)
recursiveScheduler.schedule(state)
return recursiveScheduler
}
extension ImmediateScheduler {
public func scheduleRecursively<State>(state: State, action: (state: State, recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateSchedulerOf(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return recursiveScheduler
}
}

View File

@ -75,30 +75,8 @@ extension ObservableType {
}
public extension ObservableType {
/*
Observables can really be anything, implemented by anyone and hooked into large `Observable` chains.
Some of them maybe have flawed implementations that don't respect Rx message grammar.
To guard from rogue `Observable`s and `Observer`s Rx internal classes have safeguards in place.
Those safeguards will ensure that those rogue `Observables` or `Observers` don't cause
havoc in the system.
Unfortunately, that comes with significant performance penalty. To improve overall performance
internal Rx classes can drop their safety mechanisms when talking with other known implementations.
`Producers` are special kind of observables that need to make sure that message grammar is respected.
*/
// All internal subscribe calls go through this method
public func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
if let source = self as? Producer<O.E> {
return source.subscribeRaw(observer, enableSafeguard: false)
}
if let source = self as? ObservableBase<O.E> {
return source.subscribe(observer)
}
return self.subscribe(observer)
}
}

View File

@ -7,17 +7,53 @@
//
import Foundation
import Darwin
public class AnonymousObservable<Element> : ObservableBase<Element> {
class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
var isStopped: Int32 = 0
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<E>) {
switch event {
case .Next:
if isStopped == 1 {
return
}
self.observer?.on(event)
case .Error:
fallthrough
case .Completed:
if OSAtomicCompareAndSwap32(0, 1, &isStopped) {
self.observer?.on(event)
self.dispose()
}
}
}
func run(parent: Parent) -> Disposable {
return parent.subscribeHandler(ObserverOf(self))
}
}
public class AnonymousObservable<Element> : Producer<Element> {
public typealias SubscribeHandler = (ObserverOf<Element>) -> Disposable
public let subscribeHandler: SubscribeHandler
public init(_ subscribeHandler: SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
public override func subscribeCore(observer: ObserverOf<Element>) -> Disposable {
return subscribeHandler(observer)
public override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
setSink(sink)
return sink.run(self)
}
}

View File

@ -44,9 +44,8 @@ class DelaySubscription<Element, S: Scheduler>: Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DelaySubscriptionSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
let scheduledDisposable = scheduler.scheduleRelative((), dueTime: dueTime) { _ in
return success(self.source.subscribeSafe(sink))
return scheduler.scheduleRelative((), dueTime: dueTime) { _ in
return self.source.subscribeSafe(sink)
}
return getScheduledDisposable(scheduledDisposable)
}
}

View File

@ -0,0 +1,20 @@
//
// Empty.swift
// Rx
//
// Created by Krunoslav Zaher on 8/30/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class Empty<Element> : Producer<Element> {
override init() {
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}

View File

@ -0,0 +1,22 @@
//
// FailWith.swift
// Rx
//
// Created by Krunoslav Zaher on 8/30/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class FailWith<Element> : Producer<Element> {
let error: ErrorType
init(error: ErrorType) {
self.error = error
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
observer.on(.Error(error))
return NopDisposable.instance
}
}

View File

@ -0,0 +1,23 @@
//
// Just.swift
// Rx
//
// Created by Krunoslav Zaher on 8/30/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class Just<Element> : Producer<Element> {
let element: Element
init(element: Element) {
self.element = element
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
observer.on(.Next(element))
observer.on(.Completed)
return NopDisposable.instance
}
}

View File

@ -0,0 +1,15 @@
//
// Never.swift
// Rx
//
// Created by Krunoslav Zaher on 8/30/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class Never<Element> : Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
return NopDisposable.instance
}
}

View File

@ -1,25 +0,0 @@
//
// ObservableBase.swift
// Rx
//
// Created by Krunoslav Zaher on 2/15/15.
// Copyright (c) 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
public class ObservableBase<Element> : Observable<Element> {
override public func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let autoDetachObserver = AutoDetachObserver(observer: observer)
let disposable = subscribeCore(ObserverOf(autoDetachObserver))
autoDetachObserver.setDisposable(disposable)
return autoDetachObserver
}
func subscribeCore(observer: ObserverOf<Element>) -> Disposable {
return abstractMethod()
}
}

View File

@ -74,7 +74,7 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
}
if shouldStart {
scheduleDisposable.disposable = scheduleRecursively(self.scheduler, state: (), action: self.run)
scheduleDisposable.disposable = self.scheduler.scheduleRecursively((), action: self.run)
}
}

View File

@ -25,14 +25,14 @@ class ObserveOnSerialDispatchQueueSink<O: ObserverType> : ObserverBase<O.E> {
}
override func onCore(event: Event<E>) {
self.scheduler.schedule(()) { (_) -> RxResult<Disposable> in
self.scheduler.schedule(()) { (_) -> Disposable in
send(self.observer, event)
if event.isStopEvent {
self.dispose()
}
return NopDisposableResult
return NopDisposable.instance
}
}

View File

@ -61,7 +61,7 @@ class ObserveSingleOnObserver<O: ObserverType> : Sink<O>, ObserverType {
self.dispose()
return NopDisposableResult
return NopDisposable.instance
}
}
}

View File

@ -8,26 +8,18 @@
import Foundation
// Base class for implementation of query operators, providing performance benefits over the use of `create`
// They are also responsible for ensuring correct message grammar and disposal grammar.
//
// That assumption enables big performance gain, but rogue `Producers` can bring system into
// an invalid state.
//
// For that reason, extreme care is needed when subclassing `Producer`. It's code correctness must be proven
// and it needs to be thorougly tested.
public class Producer<Element> : Observable<Element> {
public class _Producer<Element> : Producer<Element> {
public override init() {
super.init()
}
}
public class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
return self.subscribeRaw(observer, enableSafeguard: true)
}
public func subscribeRaw<O : ObserverType where O.E == Element>(observer: O, enableSafeguard: Bool) -> Disposable {
let _: Observer<Element>
let sink = SingleAssignmentDisposable()
let subscription = SingleAssignmentDisposable()

View File

@ -101,12 +101,12 @@ class SkipTimeSink<ElementType, S: Scheduler, O: ObserverType where O.E == Eleme
func run() -> Disposable {
let disposeTimer = parent.scheduler.scheduleRelative((), dueTime: self.parent.duration) {
self.tick()
return NopDisposableResult
return NopDisposable.instance
}
let disposeSubscription = parent.source.subscribeSafe(self)
return BinaryDisposable(disposeTimer.get(), disposeSubscription)
return BinaryDisposable(disposeTimer, disposeSubscription)
}
}

View File

@ -33,14 +33,12 @@ class SubscribeOnSink<O: ObserverType> : Sink<O>, ObserverType {
disposeEverything.disposable = cancelSchedule
let scheduleResult = parent.scheduler.schedule(()) { (_) -> RxResult<Disposable> in
cancelSchedule.disposable = parent.scheduler.schedule(()) { (_) -> Disposable in
let subscription = self.parent.source.subscribeSafe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return NopDisposableResult
return NopDisposable.instance
}
cancelSchedule.disposable = getScheduledDisposable(scheduleResult)
return disposeEverything
}
}

View File

@ -104,12 +104,12 @@ class TakeTimeSink<ElementType, S: Scheduler, O: ObserverType where O.E == Eleme
func run() -> Disposable {
let disposeTimer = parent.scheduler.scheduleRelative((), dueTime: self.parent.duration) {
self.tick()
return NopDisposableResult
return NopDisposable.instance
}
let disposeSubscription = parent.source.subscribeSafe(self)
return BinaryDisposable(disposeTimer.get(), disposeSubscription)
return BinaryDisposable(disposeTimer, disposeSubscription)
}
}

View File

@ -17,7 +17,7 @@ class ThrottleSink<O: ObserverType, SchedulerType: Scheduler> : Sink<O>, Observe
var lock = NSRecursiveLock()
// state
var id = 0 as UInt64
let value = RxMutableBox<Element?>(nil)
var value: Element? = nil
let cancellable = SerialDisposable()
@ -46,19 +46,19 @@ class ThrottleSink<O: ObserverType, SchedulerType: Scheduler> : Sink<O>, Observe
let latestId = self.lock.calculateLocked { () -> UInt64 in
let observer = self.observer
let oldValue = self.value.value
let oldValue = self.value
self.id = self.id &+ 1
switch event {
case .Next(let element):
self.value.value = element
self.value = element
case .Error:
self.value.value = nil
self.value = nil
observer?.on(event)
self.dispose()
case .Completed:
self.value.value = nil
self.value = nil
if let value = oldValue {
observer?.on(.Next(value))
}
@ -78,27 +78,20 @@ class ThrottleSink<O: ObserverType, SchedulerType: Scheduler> : Sink<O>, Observe
let scheduler = self.parent.scheduler
let dueTime = self.parent.dueTime
let _ = scheduler.scheduleRelative(latestId, dueTime: dueTime) { (id) in
let disposeTimer = scheduler.scheduleRelative(latestId, dueTime: dueTime) { (id) in
self.propagate()
return NopDisposableResult
}.map { disposeTimer -> Disposable in
d.disposable = disposeTimer
return disposeTimer
}.recoverWith { e -> RxResult<Disposable> in
self.lock.performLocked {
observer?.on(.Error(e))
self.dispose()
}
return NopDisposableResult
return NopDisposable.instance
}
d.disposable = disposeTimer
default: break
}
}
func propagate() {
let originalValue: Element? = self.lock.calculateLocked {
let originalValue = self.value.value
self.value.value = nil
let originalValue = self.value
self.value = nil
return originalValue
}

View File

@ -19,12 +19,10 @@ class TimerSink<S: Scheduler, O: ObserverType where O.E == Int64> : Sink<O> {
}
func run() -> Disposable {
let result = self.parent.schedulePeriodic(state: 0, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
return self.parent.scheduler.schedulePeriodic(0 as Int64, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.observer?.on(.Next(state))
return state &+ 1
}
return result
}
}
@ -39,39 +37,26 @@ class TimerOneOffSink<S: Scheduler, O: ObserverType where O.E == Int64> : Sink<O
}
func run() -> Disposable {
let result = self.parent.scheduler.scheduleRelative((), dueTime: self.parent.dueTime) { (_) -> RxResult<Disposable> in
return self.parent.scheduler.scheduleRelative((), dueTime: self.parent.dueTime) { (_) -> Disposable in
self.observer?.on(.Next(0))
self.observer?.on(.Completed)
return NopDisposableResult
return NopDisposable.instance
}
ensureScheduledSuccessfully(result.map { _ in () })
return result.get()
}
}
class Timer<S: Scheduler>: Producer<Int64> {
typealias TimeInterval = S.TimeInterval
typealias SchedulePeriodic = (
state: Int64,
startAfter: S.TimeInterval,
period: S.TimeInterval,
action: (state: Int64) -> Int64
) -> Disposable
let scheduler: S
let dueTime: TimeInterval
let period: TimeInterval?
let schedulePeriodic: SchedulePeriodic
init(dueTime: TimeInterval, period: TimeInterval?, scheduler: S, schedulePeriodic: SchedulePeriodic) {
init(dueTime: TimeInterval, period: TimeInterval?, scheduler: S) {
self.scheduler = scheduler
self.dueTime = dueTime
self.period = period
self.schedulePeriodic = schedulePeriodic
}
override func run<O : ObserverType where O.E == Int64>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {

View File

@ -17,28 +17,19 @@ public func create<E>(subscribe: (ObserverOf<E>) -> Disposable) -> Observable<E>
// empty
public func empty<E>() -> Observable<E> {
return AnonymousObservable { observer in
sendCompleted(observer)
return NopDisposable.instance
}
return Empty<E>()
}
// never
public func never<E>() -> Observable<E> {
return AnonymousObservable { observer in
return NopDisposable.instance
}
return Never()
}
// return
public func just<E>(element: E) -> Observable<E> {
return AnonymousObservable { observer in
sendNext(observer, element)
sendCompleted(observer)
return NopDisposable.instance
}
return Just(element: element)
}
public func sequenceOf<E>(elements: E ...) -> Observable<E> {
@ -66,10 +57,7 @@ public func from<E, S where S: SequenceType, S.Generator.Element == E>(sequence:
// fail
public func failWith<E>(error: ErrorType) -> Observable<E> {
return AnonymousObservable { observer in
sendError(observer, error)
return NopDisposable.instance
}
return FailWith(error: error)
}
// defer

View File

@ -40,45 +40,22 @@ extension ObservableType {
// interval
// fallback {
public func interval<S: Scheduler>(period: S.TimeInterval, _ scheduler: S)
-> Observable<Int64> {
return Timer(dueTime: period,
period: period,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
scheduler: scheduler
)
}
// }
// periodic schedulers {
public func interval<S: PeriodicScheduler>(period: S.TimeInterval, _ scheduler: S)
-> Observable<Int64> {
return Timer(dueTime: period,
period: period,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
)
}
// }
// timer
// fallback {
public func timer<S: Scheduler>(dueTime: S.TimeInterval, _ period: S.TimeInterval, scheduler: S)
-> Observable<Int64> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
scheduler: scheduler
)
}
@ -87,37 +64,10 @@ public func timer<S: Scheduler>(dueTime: S.TimeInterval, scheduler: S)
return Timer(
dueTime: dueTime,
period: nil,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
scheduler: scheduler
)
}
// }
// periodic schedulers {
public func timer<S: PeriodicScheduler>(dueTime: S.TimeInterval, _ period: S.TimeInterval, scheduler: S)
-> Observable<Int64> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
)
}
public func timer<S: PeriodicScheduler>(dueTime: S.TimeInterval, scheduler: S)
-> Observable<Int64> {
return Timer(
dueTime: dueTime,
period: nil,
scheduler: scheduler,
schedulePeriodic: abstractSchedulePeriodic(scheduler)
)
}
// }
// take
extension ObservableType {

View File

@ -1,13 +0,0 @@
//
// PeriodicScheduler.swift
// RxSwift
//
// Created by Krunoslav Zaher on 6/6/15.
// Copyright (c) 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
public protocol PeriodicScheduler : Scheduler {
func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable>
}

View File

@ -16,45 +16,8 @@ public protocol Scheduler: ImmediateScheduler {
get
}
func scheduleRelative<StateType>(state: StateType, dueTime: TimeInterval, action: (StateType) -> RxResult<Disposable>) -> RxResult<Disposable>
func scheduleRelative<StateType>(state: StateType, dueTime: TimeInterval, action: (StateType) -> Disposable) -> Disposable
func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable
}
// This is being called every time `Rx` scheduler performs action to
// check the result of the computation.
//
// The default implementation will throw an Exception if the result failed.
//
// It's probably best to make sure all of the errors have been handled before
// the computation finishes, but it's not unreasonable to change the implementation
// for release builds to silently fail (although I would not recommend it).
//
// Changing default behavior is not recommended because possible data corruption
// is "usually" a lot worse than letting the program crash.
//
func ensureScheduledSuccessfully(result: RxResult<Void>) -> RxResult<Void> {
switch result {
case .Failure(let error):
return errorDuringScheduledAction(error);
default: break
}
return SuccessResult
}
func getScheduledDisposable(disposable: RxResult<Disposable>) -> Disposable {
switch disposable {
case .Failure(let error):
errorDuringScheduledAction(error);
return NopDisposable.instance
default:
return disposable.get()
}
}
func errorDuringScheduledAction(error: ErrorType) -> RxResult<Void> {
let exception = NSException(name: "ScheduledActionError", reason: "Error happened during scheduled action execution", userInfo: ["error": error as! AnyObject])
exception.raise()
return SuccessResult
}

View File

@ -8,7 +8,7 @@
import Foundation
public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
public class ConcurrentDispatchQueueScheduler: Scheduler {
public typealias TimeInterval = NSTimeInterval
public typealias Time = NSDate
@ -53,11 +53,11 @@ public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
return dispatch_time(DISPATCH_TIME_NOW, convertTimeIntervalToDispatchInterval(timeInterval))
}
public final func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
public final func schedule<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
func scheduleInternal<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
dispatch_async(self.queue) {
@ -65,15 +65,13 @@ public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
return
}
_ = ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
cancel.disposable = disposable
})
cancel.disposable = action(state)
}
return success(cancel)
return cancel
}
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (/*Scheduler<NSTimeInterval, NSDate>,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable {
let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue)
let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchTime(dueTime)
@ -85,9 +83,7 @@ public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
if compositeDisposable.disposed {
return
}
ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
compositeDisposable.addDisposable(disposable)
})
compositeDisposable.addDisposable(action(state))
})
dispatch_resume(timer)
@ -95,7 +91,7 @@ public class ConcurrentDispatchQueueScheduler: Scheduler, PeriodicScheduler {
dispatch_source_cancel(timer)
})
return success(compositeDisposable)
return compositeDisposable
}
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable> {

View File

@ -0,0 +1,39 @@
//
// CurrentThreadScheduler.swift
// Rx
//
// Created by Krunoslav Zaher on 8/30/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
let CurrentThreadSchedulerKeyInstance = CurrentThreadSchedulerKey()
class CurrentThreadSchedulerKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerKeyInstance
}
override var hashValue: Int { return -904739208 }
override func copy() -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
func copyWithZone(zone: NSZone) -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
}
// WIP
class CurrentThreadScheduler : ImmediateScheduler {
static var isScheduleRequired: Bool {
return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil
}
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
return NopDisposable.instance
}
}

View File

@ -22,7 +22,7 @@ public final class MainScheduler : SerialDispatchQueueScheduler {
}
}
override func scheduleInternal<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
override func scheduleInternal<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
if NSThread.currentThread().isMainThread {
return action(state)
}

View File

@ -15,7 +15,7 @@ public class OperationQueueScheduler: ImmediateScheduler {
self.operationQueue = operationQueue
}
public func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
let compositeDisposable = CompositeDisposable()
@ -26,10 +26,8 @@ public class OperationQueueScheduler: ImmediateScheduler {
return
}
ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
compositeDisposableWeak?.addDisposable(disposable)
return ()
})
let disposable = action(state)
compositeDisposableWeak?.addDisposable(disposable)
}
self.operationQueue.addOperation(operation)
@ -38,7 +36,7 @@ public class OperationQueueScheduler: ImmediateScheduler {
operation.cancel()
})
return success(compositeDisposable)
return compositeDisposable
}
}

View File

@ -16,11 +16,11 @@ class RecursiveScheduler<State, S: Scheduler>: RecursiveSchedulerOf<State, S.Tim
super.init(action: action)
}
override func scheduleRelativeAdapter(state: State, dueTime: S.TimeInterval, action: (State) -> RxResult<Disposable>) -> RxResult<Disposable> {
override func scheduleRelativeAdapter(state: State, dueTime: S.TimeInterval, action: State -> Disposable) -> Disposable {
return scheduler.scheduleRelative(state, dueTime: dueTime, action: action)
}
override func scheduleAdapter(state: State, action: (State) -> RxResult<Disposable>) -> RxResult<Disposable> {
override func scheduleAdapter(state: State, action: State -> Disposable) -> Disposable {
return scheduler.schedule(state, action: action)
}
}
@ -42,11 +42,11 @@ public class RecursiveSchedulerOf<State, TimeInterval> : Disposable {
// abstract methods
func scheduleRelativeAdapter(state: State, dueTime: TimeInterval, action: (State) -> RxResult<Disposable>) -> RxResult<Disposable> {
func scheduleRelativeAdapter(state: State, dueTime: TimeInterval, action: State -> Disposable) -> Disposable {
return abstractMethod()
}
func scheduleAdapter(state: State, action: (State) -> RxResult<Disposable>) -> RxResult<Disposable> {
func scheduleAdapter(state: State, action: State -> Disposable) -> Disposable {
return abstractMethod()
}
@ -58,10 +58,10 @@ public class RecursiveSchedulerOf<State, TimeInterval> : Disposable {
var isDone = false
var removeKey: CompositeDisposable.DisposeKey? = nil
let d = scheduleRelativeAdapter(state, dueTime: dueTime) { (state) -> RxResult<Disposable> in
let d = scheduleRelativeAdapter(state, dueTime: dueTime) { (state) -> Disposable in
// best effort
if self.group.disposed {
return NopDisposableResult
return NopDisposable.instance
}
let action = self.lock.calculateLocked { () -> Action? in
@ -79,18 +79,15 @@ public class RecursiveSchedulerOf<State, TimeInterval> : Disposable {
action(state: state, scheduler: self)
}
return NopDisposableResult
return NopDisposable.instance
}
ensureScheduledSuccessfully(d.map { disposable in
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d.get())
isAdded = true
}
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d)
isAdded = true
}
return ()
})
}
}
// immediate scheduling
@ -101,10 +98,10 @@ public class RecursiveSchedulerOf<State, TimeInterval> : Disposable {
var isDone = false
var removeKey: CompositeDisposable.DisposeKey? = nil
let d = scheduleAdapter(state) { (state) -> RxResult<Disposable> in
let d = scheduleAdapter(state) { (state) -> Disposable in
// best effort
if self.group.disposed {
return NopDisposableResult
return NopDisposable.instance
}
let action = self.lock.calculateLocked { () -> Action? in
@ -122,18 +119,15 @@ public class RecursiveSchedulerOf<State, TimeInterval> : Disposable {
action(state: state, scheduler: self)
}
return NopDisposableResult
return NopDisposable.instance
}
ensureScheduledSuccessfully(d.map { disposable in
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d.get())
isAdded = true
}
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d)
isAdded = true
}
return ()
})
}
}
public func dispose() {
@ -166,10 +160,10 @@ class RecursiveImmediateSchedulerOf<State> : Disposable {
var isDone = false
var removeKey: CompositeDisposable.DisposeKey? = nil
let d = self.scheduler.schedule(state) { (state) -> RxResult<Disposable> in
let d = self.scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self.group.disposed {
return NopDisposableResult
return NopDisposable.instance
}
let action = self.lock.calculateLocked { () -> Action? in
@ -187,18 +181,15 @@ class RecursiveImmediateSchedulerOf<State> : Disposable {
action(state: state, recurse: self.schedule)
}
return NopDisposableResult
return NopDisposable.instance
}
ensureScheduledSuccessfully(d.map { disposable in
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d.get())
isAdded = true
}
lock.performLocked {
if !isDone {
removeKey = group.addDisposable(d)
isAdded = true
}
return ()
})
}
}
func dispose() {

View File

@ -8,54 +8,19 @@
import Foundation
// periodic scheduling
// Compiler will choose correct implementation depending on scheduler capabilities.
//
// If scheduler has periodic scheduling capabilities, it will choose them.
// Fallback is periodic recursive scheduler SchedulePeriodicRecursive.
func abstractSchedulePeriodic<State, S: PeriodicScheduler>(
scheduler: S
)
-> (
state: State,
startAfter: S.TimeInterval,
period: S.TimeInterval,
action: (state: State) -> State
) -> Disposable {
return { state, startAfter, period, action in
let result = scheduler.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
ensureScheduledSuccessfully(result.map { _ in () })
return result.get()
}
}
func abstractSchedulePeriodic<State, S: Scheduler>(
scheduler: S
)
-> (
state: State,
startAfter: S.TimeInterval,
period: S.TimeInterval,
action: (state: State) -> State
) -> Disposable {
return { state, startAfter, period, action in
let schedule = SchedulePeriodicRecursive(scheduler: scheduler, startAfter: startAfter, period: period, action: action, state: state)
extension Scheduler {
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable {
let schedule = SchedulePeriodicRecursive(scheduler: self, startAfter: startAfter, period: period, action: action, state: state)
return schedule.start()
}
func scheduleRecursive<State>(state: State, dueTime: TimeInterval, action: (state: State, scheduler: RecursiveSchedulerOf<State, TimeInterval>) -> Void) -> Disposable {
let scheduler = RecursiveScheduler(scheduler: self, action: action)
scheduler.schedule(state, dueTime: dueTime)
return scheduler
}
}
// recursive scheduling
func scheduleRecursive<State, S: Scheduler>(scheduler: S, _ state: State, _ dueTime: S.TimeInterval,
_ action: (state: State, scheduler: RecursiveSchedulerOf<State, S.TimeInterval>) -> Void) -> Disposable {
let scheduler = RecursiveScheduler<State, S>(scheduler: scheduler, action: action)
scheduler.schedule(state, dueTime: dueTime)
return scheduler
}

View File

@ -36,7 +36,7 @@ class SchedulePeriodicRecursive<State, S: Scheduler> {
}
func start() -> Disposable {
return scheduleRecursive(scheduler, SchedulePeriodicRecursiveCommand.Tick, self.startAfter, self.tick)
return scheduler.scheduleRecursive(SchedulePeriodicRecursiveCommand.Tick, dueTime: self.startAfter, action: self.tick)
}
func tick(command: SchedulePeriodicRecursiveCommand, scheduler: RecursiveScheduler) -> Void {

View File

@ -23,7 +23,7 @@ import Foundation
// internal serial queue can be customized using `serialQueueConfiguration`
// callback.
//
public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
public class SerialDispatchQueueScheduler: Scheduler {
public typealias TimeInterval = NSTimeInterval
public typealias Time = NSDate
@ -90,11 +90,11 @@ public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
return dispatch_time(DISPATCH_TIME_NOW, convertTimeIntervalToDispatchInterval(timeInterval))
}
public final func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
public final func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
func scheduleInternal<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
dispatch_async(self.serialQueue) {
@ -102,15 +102,14 @@ public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
return
}
_ = ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
cancel.disposable = disposable
})
cancel.disposable = action(state)
}
return success(cancel)
return cancel
}
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (/*Scheduler<NSTimeInterval, NSDate>,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
public final func scheduleRelative<StateType>(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable {
let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.serialQueue)
let dispatchInterval = MainScheduler.convertTimeIntervalToDispatchTime(dueTime)
@ -122,9 +121,7 @@ public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
if compositeDisposable.disposed {
return
}
ensureScheduledSuccessfully(action(/*self,*/ state).map { disposable in
compositeDisposable.addDisposable(disposable)
})
compositeDisposable.addDisposable(action(state))
})
dispatch_resume(timer)
@ -132,10 +129,10 @@ public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
dispatch_source_cancel(timer)
})
return success(compositeDisposable)
return compositeDisposable
}
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable> {
public func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable {
let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.serialQueue)
let initial = MainScheduler.convertTimeIntervalToDispatchTime(startAfter)
@ -157,6 +154,6 @@ public class SerialDispatchQueueScheduler: Scheduler, PeriodicScheduler {
})
dispatch_resume(timer)
return success(cancel)
return cancel
}
}

View File

@ -37,7 +37,7 @@ class ColdObservable<Element: Equatable>: Observable<Element> {
for recordedEvent in recordedEvents {
testScheduler.scheduleRelative((), dueTime: recordedEvent.time, action: { (Int) in
self.observers.forEach { $0.on(recordedEvent.event) }
return NopDisposableResult
return NopDisposable.instance
})
}

View File

@ -32,7 +32,7 @@ class HotObservable<Element : Equatable> : Observable<Element> {
for recordedEvent in recordedEvents {
testScheduler.schedule((), time: recordedEvent.time) { t in
self.observers.forEach { $0.on(recordedEvent.event) }
return NopDisposableResult
return NopDisposable.instance
}
}
}

View File

@ -21,14 +21,6 @@ func createObserver<E>(scheduler: TestScheduler) -> MockObserver<E> {
return MockObserver(scheduler: scheduler)
}
class PeriodicTestScheduler : TestScheduler, PeriodicScheduler {
override init(initialClock: Time) {
super.init(initialClock: initialClock)
}
}
class TestScheduler : VirtualTimeSchedulerBase {
override init(initialClock: Time) {
@ -50,7 +42,7 @@ class TestScheduler : VirtualTimeSchedulerBase {
func scheduleAt(time: Time, action: () -> Void) {
self.schedule((), time: time) { _ in
action()
return NopDisposableResult
return NopDisposable.instance
}
}
@ -63,17 +55,17 @@ class TestScheduler : VirtualTimeSchedulerBase {
self.schedule(state, time: created) { (state) in
source = create()
return NopDisposableResult
return NopDisposable.instance
}
self.schedule(state, time: subscribed) { (state) in
subscription = source!.subscribe(observer)
return NopDisposableResult
return NopDisposable.instance
}
self.schedule(state, time: disposed) { (state) in
subscription!.dispose()
return NopDisposableResult
return NopDisposable.instance
}
start()

View File

@ -18,7 +18,7 @@ protocol ScheduledItemProtocol : Cancelable {
}
class ScheduledItem<T> : ScheduledItemProtocol {
typealias Action = (/*Scheduler<Int, Int>,*/ T) -> RxResult<Disposable>
typealias Action = T -> Disposable
let action: Action
let state: T
@ -39,7 +39,7 @@ class ScheduledItem<T> : ScheduledItemProtocol {
}
func invoke() {
self.disposable.disposable = (action(/*scheduler,*/ state).get())
self.disposable.disposable = action(state)
}
func dispose() {
@ -74,17 +74,17 @@ class VirtualTimeSchedulerBase : Scheduler, CustomStringConvertible {
self.enabled = false
}
func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
return self.scheduleRelative(state, dueTime: 0) { /*s,*/ a in
return action(/*s,*/ a)
func schedule<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
return self.scheduleRelative(state, dueTime: 0) { a in
return action(a)
}
}
func scheduleRelative<StateType>(state: StateType, dueTime: Int, action: (/*Scheduler<Int, Int>,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
func scheduleRelative<StateType>(state: StateType, dueTime: Int, action: StateType -> Disposable) -> Disposable {
return schedule(state, time: now + dueTime, action: action)
}
func schedule<StateType>(state: StateType, time: Int, action: (/*Scheduler<Int, Int>,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable> {
func schedule<StateType>(state: StateType, time: Int, action: StateType -> Disposable) -> Disposable {
let compositeDisposable = CompositeDisposable()
let scheduleTime: Int
@ -101,10 +101,10 @@ class VirtualTimeSchedulerBase : Scheduler, CustomStringConvertible {
compositeDisposable.addDisposable(item)
return success(compositeDisposable)
return compositeDisposable
}
func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable> {
func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable {
let compositeDisposable = CompositeDisposable()
let scheduleTime: Int
@ -117,7 +117,7 @@ class VirtualTimeSchedulerBase : Scheduler, CustomStringConvertible {
let item = ScheduledItem(action: { [unowned self] state in
if compositeDisposable.disposed {
return NopDisposableResult
return NopDisposable.instance
}
let nextState = action(state)
return self.schedulePeriodic(nextState, startAfter: period, period: period, action: action)
@ -127,7 +127,7 @@ class VirtualTimeSchedulerBase : Scheduler, CustomStringConvertible {
compositeDisposable.addDisposable(item)
return success(compositeDisposable)
return compositeDisposable
}
func start() {

View File

@ -24,7 +24,7 @@ extension AnonymousObservableTests {
var elements = [Int]()
let d = a .subscribeNext { n in
let d = a.subscribeNext { n in
elements.append(n)
}
@ -48,7 +48,7 @@ extension AnonymousObservableTests {
var elements = [Int]()
let d = a .subscribeNext { n in
let d = a.subscribeNext { n in
elements.append(n)
}
@ -72,7 +72,7 @@ extension AnonymousObservableTests {
var elements = [Int]()
let d = a .subscribeNext { n in
let d = a.subscribeNext { n in
elements.append(n)
}

View File

@ -22,29 +22,35 @@ class ObservableBlockingTest : RxTest {
extension ObservableBlockingTest {
func testToArray_empty() {
XCTAssert(((empty() as Observable<Int>).toArray()).get() == [])
XCTAssert(try! (empty() as Observable<Int>).toArray() == [])
}
func testToArray_return() {
XCTAssert((just(42).toArray()).get() == [42])
XCTAssert(try! just(42).toArray() == [42])
}
func testToArray_fail() {
XCTAssert(((failWith(testError) as Observable<Int>).toArray()).isFailure)
do {
try (failWith(testError) as Observable<Int>).toArray()
XCTFail("It should fail")
}
catch {
}
}
func testToArray_someData() {
XCTAssert((sequenceOf(42, 43, 44, 45).toArray()).get() == [42, 43, 44, 45])
XCTAssert(try! sequenceOf(42, 43, 44, 45).toArray() == [42, 43, 44, 45])
}
func testToArray_withRealScheduler() {
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let array = interval(0.001, scheduler)
let array = try! interval(0.001, scheduler)
.take(10)
.toArray()
XCTAssert(array.get() == Array(0..<10))
XCTAssert(array == Array(0..<10))
}
}
@ -52,29 +58,35 @@ extension ObservableBlockingTest {
extension ObservableBlockingTest {
func testFirst_empty() {
XCTAssert(((empty() as Observable<Int>).first).get() == nil)
XCTAssert(try! (empty() as Observable<Int>).first() == nil)
}
func testFirst_return() {
XCTAssert((just(42).first).get() == 42)
XCTAssert(try! just(42).first() == 42)
}
func testFirst_fail() {
XCTAssert(((failWith(testError) as Observable<Int>).first).isFailure)
do {
try (failWith(testError) as Observable<Int>).first()
XCTFail()
}
catch {
}
}
func testFirst_someData() {
XCTAssert((sequenceOf(42, 43, 44, 45).first).get() == 42)
XCTAssert(try! sequenceOf(42, 43, 44, 45).first() == 42)
}
func testFirst_withRealScheduler() {
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let array = interval(0.001, scheduler)
let array = try! interval(0.001, scheduler)
.take(10)
.first
.first()
XCTAssert(array.get() == 0)
XCTAssert(array == 0)
}
}
@ -82,29 +94,35 @@ extension ObservableBlockingTest {
extension ObservableBlockingTest {
func testLast_empty() {
XCTAssert(((empty() as Observable<Int>).last).get() == nil)
XCTAssert(try! (empty() as Observable<Int>).last() == nil)
}
func testLast_return() {
XCTAssert((just(42).last).get() == 42)
XCTAssert(try! just(42).last() == 42)
}
func testLast_fail() {
XCTAssert(((failWith(testError) as Observable<Int>).last).isFailure)
do {
try (failWith(testError) as Observable<Int>).last()
XCTFail()
}
catch {
}
}
func testLast_someData() {
XCTAssert((sequenceOf(42, 43, 44, 45).last).get() == 45)
XCTAssert(try! sequenceOf(42, 43, 44, 45).last() == 45)
}
func testLast_withRealScheduler() {
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let array = interval(0.001, scheduler)
let array = try! interval(0.001, scheduler)
.take(10)
.last
.last()
XCTAssert(array.get() == 9)
XCTAssert(array == 9)
}
}

View File

@ -209,7 +209,7 @@ extension ObservableConcurrencyTest {
scheduler.schedule(()) { s in
OSSpinLockUnlock(&wait)
return NopDisposableResult
return NopDisposable.instance
}
OSSpinLockLock(&wait)
@ -500,7 +500,7 @@ class ObservableConcurrentSchedulerConcurrencyTest: ObservableConcurrencyTestBas
var writtenStarted = 0
var writtenEnded = 0
var concurrent = { () -> RxResult<Disposable> in
var concurrent = { () -> Disposable in
self.performLocked {
events.append("Started")
}
@ -527,14 +527,14 @@ class ObservableConcurrentSchedulerConcurrencyTest: ObservableConcurrencyTestBas
stop.on(.Completed)
return NopDisposableResult
return NopDisposable.instance
}
_ = scheduler.schedule((), action: concurrent)
_ = scheduler.schedule((), action: concurrent)
let _ = stop.last
let _ = try! stop.last()
XCTAssertEqual(events, ["Started", "Started", "Ended", "Ended"])
}

View File

@ -270,14 +270,14 @@ extension ObservableTimeTest {
let start = NSDate()
let a = from([just(0), never()]).concat()
let a = try! from([just(0), never()]).concat()
.throttle(2.0, scheduler)
.first
.first()
let end = NSDate()
XCTAssertEqualWithAccuracy(2, end.timeIntervalSinceDate(start), accuracy: 0.5)
XCTAssertEqual(a.get()!, 0)
XCTAssertEqual(a, 0)
}
}
@ -742,7 +742,7 @@ extension ObservableTimeTest {
}
func testInterval_TimeSpan_Zero() {
let scheduler = PeriodicTestScheduler(initialClock: 0)
let scheduler = TestScheduler(initialClock: 0)
let res = scheduler.start(210) {
interval(0, scheduler)
@ -783,7 +783,7 @@ extension ObservableTimeTest {
scheduler.schedule(()) { _ in
OSSpinLockUnlock(&lock)
return NopDisposableResult
return NopDisposable.instance
}
// wait until dispatch queue cleans it's resources
@ -812,14 +812,14 @@ extension ObservableTimeTest {
let start = NSDate()
let a = interval(1, scheduler)
let a = try! interval(1, scheduler)
.take(2)
.toArray()
let end = NSDate()
XCTAssertEqualWithAccuracy(2, end.timeIntervalSinceDate(start), accuracy: 0.3)
XCTAssertEqual(a.get(), [0, 1])
XCTAssertEqual(a, [0, 1])
}
}