Blocking operators run runloop while blocking.

This commit is contained in:
Krunoslav Zaher 2015-11-05 12:34:20 +01:00
parent e39f5dbce1
commit d4cda2430e
4 changed files with 135 additions and 109 deletions

View File

@ -358,6 +358,10 @@
C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254121B8A752B00B02D69 /* UITableView+Rx.swift */; };
C88254351B8A752B00B02D69 /* UITextField+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254131B8A752B00B02D69 /* UITextField+Rx.swift */; };
C88254361B8A752B00B02D69 /* UITextView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254141B8A752B00B02D69 /* UITextView+Rx.swift */; };
C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; };
C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; };
C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; };
C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; };
C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; };
C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; };
C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; };
@ -1001,6 +1005,7 @@
C88254131B8A752B00B02D69 /* UITextField+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextField+Rx.swift"; sourceTree = "<group>"; };
C88254141B8A752B00B02D69 /* UITextView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextView+Rx.swift"; sourceTree = "<group>"; };
C88BB8711B07E5ED0064D411 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RunLoopLock.swift; sourceTree = "<group>"; };
C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BlockingObservable.swift; sourceTree = "<group>"; };
C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "BlockingObservable+Operators.swift"; sourceTree = "<group>"; };
C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = ShareReplay1.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
@ -1400,6 +1405,7 @@
C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */,
C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */,
C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */,
C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */,
C8093F591B8A73A20088E94D /* README.md */,
);
path = RxBlocking;
@ -2121,6 +2127,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */,
C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */,
C8941BE41BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */,
C8093F5E1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */,
@ -2131,6 +2138,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */,
C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */,
C8941BE51BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */,
C8093F5F1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */,
@ -2595,6 +2603,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */,
C8941BE21BD5695C00A0E874 /* BlockingObservable.swift in Sources */,
C8941BE71BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */,
C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */,
@ -2803,6 +2812,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */,
C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */,
C8941BE61BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */,
D2EBEB8A1BB9B9EE003A27DC /* ObservableConvertibleType+Blocking.swift in Sources */,

View File

@ -13,43 +13,38 @@ import Foundation
extension BlockingObservable {
/**
Blocks current thread until sequence terminates.
Blocks current thread until sequence terminates.
If sequence terminates with error, terminating error will be thrown.
If sequence terminates with error, terminating error will be thrown.
- returns: All elements of sequence.
*/
- returns: All elements of sequence.
*/
public func toArray() throws -> [E] {
let condition = NSCondition()
var elements: [E] = Array<E>()
var error: ErrorType?
var ended = false
let lock = RunLoopLock()
_ = self.source.subscribe { e in
switch e {
case .Next(let element):
elements.append(element)
case .Error(let e):
error = e
condition.lock()
ended = true
condition.signal()
condition.unlock()
case .Completed:
condition.lock()
ended = true
condition.signal()
condition.unlock()
let d = SingleAssignmentDisposable()
lock.dispatch {
d.disposable = self.source.subscribe { e in
switch e {
case .Next(let element):
elements.append(element)
case .Error(let e):
error = e
lock.stop()
case .Completed:
lock.stop()
}
}
}
condition.lock()
while !ended {
condition.wait()
}
condition.unlock()
lock.run()
d.dispose()
if let error = error {
throw error
@ -61,48 +56,42 @@ extension BlockingObservable {
extension BlockingObservable {
/**
Blocks current thread until sequence produces first element.
Blocks current thread until sequence produces first element.
If sequence terminates with error before producing first element, terminating error will be thrown.
If sequence terminates with error before producing first element, terminating error will be thrown.
- returns: First element of sequence. If sequence is empty `nil` is returned.
*/
- returns: First element of sequence. If sequence is empty `nil` is returned.
*/
public func first() throws -> E? {
let condition = NSCondition()
var element: E?
var error: ErrorType?
var ended = false
let d = SingleAssignmentDisposable()
d.disposable = self.source.subscribe { e in
switch e {
case .Next(let e):
if element == nil {
element = e
let lock = RunLoopLock()
lock.dispatch {
d.disposable = self.source.subscribe { e in
switch e {
case .Next(let e):
if element == nil {
element = e
}
break
case .Error(let e):
error = e
default:
break
}
break
case .Error(let e):
error = e
default:
break
lock.stop()
}
condition.lock()
ended = true
condition.signal()
condition.unlock()
}
condition.lock()
while !ended {
condition.wait()
}
lock.run()
d.dispose()
condition.unlock()
if let error = error {
throw error
@ -114,46 +103,40 @@ extension BlockingObservable {
extension BlockingObservable {
/**
Blocks current thread until sequence terminates.
Blocks current thread until sequence terminates.
If sequence terminates with error, terminating error will be thrown.
If sequence terminates with error, terminating error will be thrown.
- returns: Last element in the sequence. If sequence is empty `nil` is returned.
*/
- returns: Last element in the sequence. If sequence is empty `nil` is returned.
*/
public func last() throws -> E? {
let condition = NSCondition()
var element: E?
var error: ErrorType?
var ended = false
let d = SingleAssignmentDisposable()
d.disposable = self.source.subscribe { e in
switch e {
case .Next(let e):
element = e
return
case .Error(let e):
error = e
default:
break
let lock = RunLoopLock()
lock.dispatch {
d.disposable = self.source.subscribe { e in
switch e {
case .Next(let e):
element = e
return
case .Error(let e):
error = e
default:
break
}
lock.stop()
}
condition.lock()
ended = true
condition.signal()
condition.unlock()
}
condition.lock()
while !ended {
condition.wait()
}
lock.run()
d.dispose()
condition.unlock()
if let error = error {
throw error

View File

@ -0,0 +1,33 @@
//
// RunLoopLock.swift
// Rx
//
// Created by Krunoslav Zaher on 11/5/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class RunLoopLock : NSObject {
let currentRunLoop: CFRunLoopRef
override init() {
currentRunLoop = CFRunLoopGetCurrent()
}
func dispatch(action: () -> ()) {
CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode, action)
CFRunLoopWakeUp(currentRunLoop)
}
func stop() {
CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode) {
CFRunLoopStop(self.currentRunLoop)
}
CFRunLoopWakeUp(currentRunLoop)
}
func run() {
CFRunLoopRun()
}
}

View File

@ -30,8 +30,8 @@ extension ObservableBlockingTest {
try (failWith(testError) as Observable<Int>).toBlocking().toArray()
XCTFail("It should fail")
}
catch {
catch let e {
XCTAssertTrue(e as NSError === testError)
}
}
@ -67,8 +67,8 @@ extension ObservableBlockingTest {
try (failWith(testError) as Observable<Int>).toBlocking().first()
XCTFail()
}
catch {
catch let e {
XCTAssertTrue(e as NSError === testError)
}
}
@ -104,8 +104,8 @@ extension ObservableBlockingTest {
try (failWith(testError) as Observable<Int>).toBlocking().last()
XCTFail()
}
catch {
catch let e {
XCTAssertTrue(e as NSError === testError)
}
}