Tidy up locking work

This commit is contained in:
freak4pc 2020-10-02 11:20:59 +03:00 committed by Shai Mishali
parent 9d93141582
commit 62e6139209
24 changed files with 301 additions and 375 deletions

View File

@ -16,21 +16,8 @@ typealias SpinLock = RecursiveLock
extension RecursiveLock : Lock {
@inline(__always)
final func performLocked(_ action: () -> Void) {
self.lock(); defer { self.unlock() }
action()
}
@inline(__always)
final func calculateLocked<T>(_ action: () -> T) -> T {
final func performLocked<T>(_ action: () -> T) -> T {
self.lock(); defer { self.unlock() }
return action()
}
@inline(__always)
final func calculateLockedOrFail<T>(_ action: () throws -> T) throws -> T {
self.lock(); defer { self.unlock() }
let result = try action()
return result
}
}

View File

@ -6,16 +6,11 @@
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
protocol LockOwnerType : class, Lock {
protocol LockOwnerType: class, Lock {
var lock: RecursiveLock { get }
}
extension LockOwnerType {
func lock() {
self.lock.lock()
}
func unlock() {
self.lock.unlock()
}
func lock() { self.lock.lock() }
func unlock() { self.lock.unlock() }
}

View File

@ -22,8 +22,7 @@ public final class CompositeDisposable : DisposeBase, Cancelable {
private var disposables: Bag<Disposable>? = Bag()
public var isDisposed: Bool {
self.lock.lock(); defer { self.lock.unlock() }
return self.disposables == nil
self.lock.performLocked { self.disposables == nil }
}
public override init() {
@ -82,16 +81,15 @@ public final class CompositeDisposable : DisposeBase, Cancelable {
}
private func _insert(_ disposable: Disposable) -> DisposeKey? {
self.lock.lock(); defer { self.lock.unlock() }
let bagKey = self.disposables?.insert(disposable)
return bagKey.map(DisposeKey.init)
self.lock.performLocked {
let bagKey = self.disposables?.insert(disposable)
return bagKey.map(DisposeKey.init)
}
}
/// - returns: Gets the number of disposables contained in the `CompositeDisposable`.
public var count: Int {
self.lock.lock(); defer { self.lock.unlock() }
return self.disposables?.count ?? 0
self.lock.performLocked { self.disposables?.count ?? 0 }
}
/// Removes and disposes the disposable identified by `disposeKey` from the CompositeDisposable.
@ -102,8 +100,7 @@ public final class CompositeDisposable : DisposeBase, Cancelable {
}
private func _remove(for disposeKey: DisposeKey) -> Disposable? {
self.lock.lock(); defer { self.lock.unlock() }
return self.disposables?.removeKey(disposeKey.key)
self.lock.performLocked { self.disposables?.removeKey(disposeKey.key) }
}
/// Disposes all disposables in the group and removes them from the group.
@ -114,12 +111,11 @@ public final class CompositeDisposable : DisposeBase, Cancelable {
}
private func _dispose() -> Bag<Disposable>? {
self.lock.lock(); defer { self.lock.unlock() }
let disposeBag = self.disposables
self.disposables = nil
return disposeBag
self.lock.performLocked {
let current = self.disposables
self.disposables = nil
return current
}
}
}

View File

@ -48,14 +48,15 @@ public final class DisposeBag: DisposeBase {
}
private func _insert(_ disposable: Disposable) -> Disposable? {
self.lock.lock(); defer { self.lock.unlock() }
if self.isDisposed {
return disposable
self.lock.performLocked {
if self.isDisposed {
return disposable
}
self.disposables.append(disposable)
return nil
}
self.disposables.append(disposable)
return nil
}
/// This is internal on purpose, take a look at `CompositeDisposable` instead.
@ -68,14 +69,14 @@ public final class DisposeBag: DisposeBase {
}
private func _dispose() -> [Disposable] {
self.lock.lock(); defer { self.lock.unlock() }
let disposables = self.disposables
self.disposables.removeAll(keepingCapacity: false)
self.isDisposed = true
return disposables
self.lock.performLocked {
let disposables = self.disposables
self.disposables.removeAll(keepingCapacity: false)
self.isDisposed = true
return disposables
}
}
deinit {
@ -114,11 +115,12 @@ extension DisposeBag {
/// Convenience function allows an array of disposables to be gathered for disposal.
public func insert(_ disposables: [Disposable]) {
self.lock.lock(); defer { self.lock.unlock() }
if self.isDisposed {
disposables.forEach { $0.dispose() }
} else {
self.disposables += disposables
self.lock.performLocked {
if self.isDisposed {
disposables.forEach { $0.dispose() }
} else {
self.disposables += disposables
}
}
}

View File

@ -15,8 +15,7 @@ public final class RefCountDisposable : DisposeBase, Cancelable {
/// - returns: Was resource disposed.
public var isDisposed: Bool {
self.lock.lock(); defer { self.lock.unlock() }
return self.disposable == nil
self.lock.performLocked { self.disposable == nil }
}
/// Initializes a new instance of the `RefCountDisposable`.
@ -31,7 +30,7 @@ public final class RefCountDisposable : DisposeBase, Cancelable {
When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned.
*/
public func retain() -> Disposable {
self.lock.calculateLocked {
self.lock.performLocked {
if self.disposable != nil {
do {
_ = try incrementChecked(&self.count)
@ -48,7 +47,7 @@ public final class RefCountDisposable : DisposeBase, Cancelable {
/// Disposes the underlying disposable only when all dependent disposables have been disposed.
public func dispose() {
let oldDisposable: Disposable? = self.lock.calculateLocked {
let oldDisposable: Disposable? = self.lock.performLocked {
if let oldDisposable = self.disposable, !self.primaryDisposed {
self.primaryDisposed = true
@ -67,7 +66,7 @@ public final class RefCountDisposable : DisposeBase, Cancelable {
}
fileprivate func release() {
let oldDisposable: Disposable? = self.lock.calculateLocked {
let oldDisposable: Disposable? = self.lock.performLocked {
if let oldDisposable = self.disposable {
do {
_ = try decrementChecked(&self.count)

View File

@ -33,12 +33,12 @@ public final class SerialDisposable : DisposeBase, Cancelable {
*/
public var disposable: Disposable {
get {
self.lock.calculateLocked {
self.lock.performLocked {
self.current ?? Disposables.create()
}
}
set (newDisposable) {
let disposable: Disposable? = self.lock.calculateLocked {
let disposable: Disposable? = self.lock.performLocked {
if self.isDisposed {
return newDisposable
}
@ -61,11 +61,9 @@ public final class SerialDisposable : DisposeBase, Cancelable {
}
private func _dispose() -> Disposable? {
self.lock.lock(); defer { self.lock.unlock() }
if self.isDisposed {
return nil
}
else {
self.lock.performLocked {
guard !self.isDisposed else { return nil }
self.disposed = true
let current = self.current
self.current = nil

View File

@ -109,24 +109,20 @@ extension Hooks {
/// Error handler called in case onError handler wasn't provided.
public static var defaultErrorHandler: DefaultErrorHandler {
get {
lock.lock(); defer { lock.unlock() }
return _defaultErrorHandler
lock.performLocked { _defaultErrorHandler }
}
set {
lock.lock(); defer { lock.unlock() }
_defaultErrorHandler = newValue
lock.performLocked { _defaultErrorHandler = newValue }
}
}
/// Subscription callstack block to fetch custom callstack information.
public static var customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack {
get {
lock.lock(); defer { lock.unlock() }
return _customCaptureSubscriptionCallstack
lock.performLocked { _customCaptureSubscriptionCallstack }
}
set {
lock.lock(); defer { lock.unlock() }
_customCaptureSubscriptionCallstack = newValue
lock.performLocked { _customCaptureSubscriptionCallstack = newValue }
}
}
}

View File

@ -65,53 +65,52 @@ final private class CombineLatestCollectionTypeSink<Collection: Swift.Collection
}
func on(_ event: Event<SourceElement>, atIndex: Int) {
self.lock.lock(); defer { self.lock.unlock() } // {
switch event {
case .next(let element):
if self.values[atIndex] == nil {
self.numberOfValues += 1
}
self.values[atIndex] = element
if self.numberOfValues < self.parent.count {
let numberOfOthersThatAreDone = self.numberOfDone - (self.isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self.parent.count - 1 {
self.forwardOn(.completed)
self.dispose()
}
return
}
do {
let result = try self.parent.resultSelector(self.values.map { $0! })
self.forwardOn(.next(result))
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
if self.isDone[atIndex] {
return
}
self.isDone[atIndex] = true
self.numberOfDone += 1
if self.numberOfDone == self.parent.count {
self.lock.lock(); defer { self.lock.unlock() }
switch event {
case .next(let element):
if self.values[atIndex] == nil {
self.numberOfValues += 1
}
self.values[atIndex] = element
if self.numberOfValues < self.parent.count {
let numberOfOthersThatAreDone = self.numberOfDone - (self.isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self.parent.count - 1 {
self.forwardOn(.completed)
self.dispose()
}
else {
self.subscriptions[atIndex].dispose()
}
return
}
// }
do {
let result = try self.parent.resultSelector(self.values.map { $0! })
self.forwardOn(.next(result))
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
if self.isDone[atIndex] {
return
}
self.isDone[atIndex] = true
self.numberOfDone += 1
if self.numberOfDone == self.parent.count {
self.forwardOn(.completed)
self.dispose()
}
else {
self.subscriptions[atIndex].dispose()
}
}
}
func run() -> Disposable {

View File

@ -86,15 +86,16 @@ final private class DebounceSink<Observer: ObserverType>
}
func propagate(_ currentId: UInt64) -> Disposable {
self.lock.lock(); defer { self.lock.unlock() } // {
let originalValue = self.value
self.lock.performLocked {
let originalValue = self.value
if let value = originalValue, self.id == currentId {
self.value = nil
self.forwardOn(.next(value))
if let value = originalValue, self.id == currentId {
self.value = nil
self.forwardOn(.next(value))
}
return Disposables.create()
}
// }
return Disposables.create()
}
}

View File

@ -61,13 +61,12 @@ final private class DelaySink<Observer: ObserverType>
//
// Another complication is that scheduler is potentially concurrent so internal queue is used.
func drainQueue(state: (), scheduler: AnyRecursiveScheduler<()>) {
self.lock.lock() // {
let hasFailed = self.errorEvent != nil
if !hasFailed {
self.running = true
}
self.lock.unlock() // }
self.lock.lock()
let hasFailed = self.errorEvent != nil
if !hasFailed {
self.running = true
}
self.lock.unlock()
if hasFailed {
return
@ -76,24 +75,24 @@ final private class DelaySink<Observer: ObserverType>
var ranAtLeastOnce = false
while true {
self.lock.lock() // {
let errorEvent = self.errorEvent
self.lock.lock()
let errorEvent = self.errorEvent
let eventToForwardImmediately = ranAtLeastOnce ? nil : self.queue.dequeue()?.event
let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !self.queue.isEmpty ? self.queue.peek().eventTime : nil
let eventToForwardImmediately = ranAtLeastOnce ? nil : self.queue.dequeue()?.event
let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !self.queue.isEmpty ? self.queue.peek().eventTime : nil
if errorEvent == nil {
if eventToForwardImmediately != nil {
}
else if nextEventToScheduleOriginalTime != nil {
self.running = false
}
else {
self.running = false
self.active = false
}
if errorEvent == nil {
if eventToForwardImmediately != nil {
}
self.lock.unlock() // {
else if nextEventToScheduleOriginalTime != nil {
self.running = false
}
else {
self.running = false
self.active = false
}
}
self.lock.unlock()
if let errorEvent = errorEvent {
self.forwardOn(errorEvent)
@ -127,22 +126,22 @@ final private class DelaySink<Observer: ObserverType>
switch event {
case .error:
self.lock.lock() // {
let shouldSendImmediately = !self.running
self.queue = Queue(capacity: 0)
self.errorEvent = event
self.lock.unlock() // }
self.lock.lock()
let shouldSendImmediately = !self.running
self.queue = Queue(capacity: 0)
self.errorEvent = event
self.lock.unlock()
if shouldSendImmediately {
self.forwardOn(event)
self.dispose()
}
default:
self.lock.lock() // {
let shouldSchedule = !self.active
self.active = true
self.queue.enqueue((self.scheduler.now, event))
self.lock.unlock() // }
self.lock.lock()
let shouldSchedule = !self.active
self.active = true
self.queue.enqueue((self.scheduler.now, event))
self.lock.unlock()
if shouldSchedule {
self.cancelable.disposable = self.scheduler.scheduleRecursive((), dueTime: self.dueTime, action: self.drainQueue)

View File

@ -255,7 +255,7 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
self.lock.lock(); defer { self.lock.unlock() } // {
self.lock.performLocked {
let subscribe: Bool
if self.activeCount < self.maxConcurrent {
self.activeCount += 1
@ -282,7 +282,7 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
}
return nil
// }
}
}
func on(_ event: Event<SourceElement>) {
@ -292,22 +292,22 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
self.subscribe(sequence, group: self.group)
}
case .error(let error):
self.lock.lock(); defer { self.lock.unlock() }
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.lock.lock(); defer { self.lock.unlock() }
if self.activeCount == 0 {
self.forwardOn(.completed)
self.lock.performLocked {
self.forwardOn(.error(error))
self.dispose()
}
else {
self.sourceSubscription.dispose()
}
case .completed:
self.lock.performLocked {
if self.activeCount == 0 {
self.forwardOn(.completed)
self.dispose()
}
else {
self.sourceSubscription.dispose()
}
self.stopped = true
self.stopped = true
}
}
}
}
@ -388,7 +388,7 @@ private final class MergeSinkIter<SourceElement, SourceSequence: ObservableConve
}
func on(_ event: Event<Element>) {
self.parent.lock.lock(); defer { self.parent.lock.unlock() } // lock {
self.parent.lock.performLocked {
switch event {
case .next(let value):
self.parent.forwardOn(.next(value))
@ -400,7 +400,7 @@ private final class MergeSinkIter<SourceElement, SourceSequence: ObservableConve
self.parent.activeCount -= 1
self.parent.checkCompleted()
}
// }
}
}
}
@ -434,7 +434,7 @@ private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType
@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
self.lock.lock(); defer { self.lock.unlock() } // {
self.lock.performLocked {
if !self.subscribeNext {
return nil
}
@ -449,7 +449,7 @@ private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType
self.dispose()
return nil
}
// }
}
}
func on(_ event: Event<SourceElement>) {
@ -459,14 +459,16 @@ private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType
self.subscribeInner(value.asObservable())
}
case .error(let error):
self.lock.lock(); defer { self.lock.unlock() }
self.forwardOn(.error(error))
self.dispose()
self.lock.performLocked {
self.forwardOn(.error(error))
self.dispose()
}
case .completed:
self.lock.lock(); defer { self.lock.unlock() }
self.stopped = true
self.sourceSubscription.dispose()
self.checkCompleted()
self.lock.performLocked {
self.stopped = true
self.sourceSubscription.dispose()
self.checkCompleted()
}
}
}

View File

@ -176,7 +176,7 @@ final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
}
func dispose() {
lock.lock(); defer { lock.unlock() } // {
lock.lock(); defer { lock.unlock() }
fetchOr(self.disposed, 1)
guard let parent = self.parent else {
return
@ -190,7 +190,6 @@ final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
self.subscription?.dispose()
self.subscription = nil
// }
}
}
@ -215,7 +214,7 @@ final private class ConnectableObservableAdapter<Subject: SubjectType>
}
override func connect() -> Disposable {
return self.lock.calculateLocked {
return self.lock.performLocked {
if let connection = self.connection {
return connection
}
@ -261,7 +260,7 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
func run() -> Disposable {
let subscription = self.parent.source.subscribe(self)
self.parent.lock.lock(); defer { self.parent.lock.unlock() } // {
self.parent.lock.lock(); defer { self.parent.lock.unlock() }
self.connectionIdSnapshot = self.parent.connectionId
@ -276,11 +275,10 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
else {
self.parent.count += 1
}
// }
return Disposables.create {
subscription.dispose()
self.parent.lock.lock(); defer { self.parent.lock.unlock() } // {
self.parent.lock.lock(); defer { self.parent.lock.unlock() }
if self.parent.connectionId != self.connectionIdSnapshot {
return
}
@ -299,7 +297,6 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
else {
rxFatalError("Something went wrong with RefCount disposing mechanism")
}
// }
}
}
@ -308,15 +305,14 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
case .next:
self.forwardOn(event)
case .error, .completed:
self.parent.lock.lock() // {
if self.parent.connectionId == self.connectionIdSnapshot {
let connection = self.parent.connectableSubscription
defer { connection?.dispose() }
self.parent.count = 0
self.parent.connectionId = self.parent.connectionId &+ 1
self.parent.connectableSubscription = nil
}
// }
self.parent.lock.lock()
if self.parent.connectionId == self.connectionIdSnapshot {
let connection = self.parent.connectableSubscription
defer { connection?.dispose() }
self.parent.count = 0
self.parent.connectionId = self.parent.connectionId &+ 1
self.parent.connectableSubscription = nil
}
self.parent.lock.unlock()
self.forwardOn(event)
self.dispose()

View File

@ -85,7 +85,7 @@ final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer
}
override func onCore(_ event: Event<Element>) {
let shouldStart = self.lock.calculateLocked { () -> Bool in
let shouldStart = self.lock.performLocked { () -> Bool in
self.queue.enqueue(event)
switch self.state {
@ -103,7 +103,7 @@ final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer
}
func run(_ state: (), _ recurse: (()) -> Void) {
let (nextEvent, observer) = self.lock.calculateLocked { () -> (Event<Element>?, Observer) in
let (nextEvent, observer) = self.lock.performLocked { () -> (Event<Element>?, Observer) in
if !self.queue.isEmpty {
return (self.queue.dequeue(), self.observer)
}
@ -131,13 +131,10 @@ final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer
}
func shouldContinue_synchronized() -> Bool {
self.lock.lock(); defer { self.lock.unlock() }
if !self.queue.isEmpty {
return true
}
else {
self.state = .stopped
return false
self.lock.performLocked {
let isEmpty = self.queue.isEmpty
if isEmpty { self.state = .stopped }
return !isEmpty
}
}

View File

@ -181,9 +181,7 @@ private final class ShareReplay1WhileConnectedConnection<Element>
}
final func on(_ event: Event<Element>) {
self.lock.lock()
let observers = self.synchronized_on(event)
self.lock.unlock()
let observers = self.lock.performLocked { self.synchronized_on(event) }
dispatch(observers, event)
}
@ -208,14 +206,15 @@ private final class ShareReplay1WhileConnectedConnection<Element>
}
final func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock(); defer { self.lock.unlock() }
if let element = self.element {
observer.on(.next(element))
self.lock.performLocked {
if let element = self.element {
observer.on(.next(element))
}
let disposeKey = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: disposeKey)
}
let disposeKey = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: disposeKey)
}
final private func synchronized_dispose() {
@ -227,10 +226,7 @@ private final class ShareReplay1WhileConnectedConnection<Element>
}
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock()
let shouldDisconnect = self.synchronized_unsubscribe(disposeKey)
self.lock.unlock()
if shouldDisconnect {
if self.lock.performLocked({ self.synchronized_unsubscribe(disposeKey) }) {
self.subscription.dispose()
}
}
@ -275,12 +271,10 @@ final private class ShareReplay1WhileConnected<Element>
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let connection = self.synchronized_subscribe(observer)
let count = connection.observers.count
let disposable = connection.synchronized_subscribe(observer)
self.lock.unlock()
if count == 0 {
@ -332,9 +326,7 @@ private final class ShareWhileConnectedConnection<Element>
}
final func on(_ event: Event<Element>) {
self.lock.lock()
let observers = self.synchronized_on(event)
self.lock.unlock()
let observers = self.lock.performLocked { self.synchronized_on(event) }
dispatch(observers, event)
}
@ -358,11 +350,11 @@ private final class ShareWhileConnectedConnection<Element>
}
final func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock(); defer { self.lock.unlock() }
self.lock.performLocked {
let disposeKey = self.observers.insert(observer.on)
let disposeKey = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: disposeKey)
return SubscriptionDisposable(owner: self, key: disposeKey)
}
}
final private func synchronized_dispose() {
@ -374,10 +366,7 @@ private final class ShareWhileConnectedConnection<Element>
}
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock()
let shouldDisconnect = self.synchronized_unsubscribe(disposeKey)
self.lock.unlock()
if shouldDisconnect {
if self.lock.performLocked({ self.synchronized_unsubscribe(disposeKey) }) {
self.subscription.dispose()
}
}
@ -422,12 +411,10 @@ final private class ShareWhileConnected<Element>
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let connection = self.synchronized_subscribe(observer)
let count = connection.observers.count
let disposable = connection.synchronized_subscribe(observer)
self.lock.unlock()
if count == 0 {

View File

@ -140,10 +140,10 @@ final private class TakeTimeSink<Element, Observer: ObserverType>
}
func tick() {
self.lock.lock(); defer { self.lock.unlock() }
self.forwardOn(.completed)
self.dispose()
self.lock.performLocked {
self.forwardOn(.completed)
self.dispose()
}
}
func run() -> Disposable {

View File

@ -123,7 +123,7 @@ final private class ThrottleSink<Observer: ObserverType>
}
func propagate(_: Int) -> Disposable {
self.lock.lock(); defer { self.lock.unlock() } // {
self.lock.performLocked {
if let lastUnsentElement = self.lastUnsentElement {
self.sendNow(element: lastUnsentElement)
}
@ -132,7 +132,8 @@ final private class ThrottleSink<Observer: ObserverType>
self.forwardOn(.completed)
self.dispose()
}
// }
}
return Disposables.create()
}
}

View File

@ -62,9 +62,10 @@ final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Obs
func run() -> Disposable {
return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.lock.lock(); defer { self.lock.unlock() }
self.forwardOn(.next(state))
return state &+ 1
self.lock.performLocked {
self.forwardOn(.next(state))
return state &+ 1
}
}
}
}

View File

@ -66,65 +66,64 @@ final private class ZipCollectionTypeSink<Collection: Swift.Collection, Observer
}
func on(_ event: Event<SourceElement>, atIndex: Int) {
self.lock.lock(); defer { self.lock.unlock() } // {
switch event {
case .next(let element):
self.values[atIndex].enqueue(element)
if self.values[atIndex].count == 1 {
self.numberOfValues += 1
}
if self.numberOfValues < self.parent.count {
if self.numberOfDone == self.parent.count - 1 {
self.forwardOn(.completed)
self.dispose()
}
return
}
do {
var arguments = [SourceElement]()
arguments.reserveCapacity(self.parent.count)
// recalculate number of values
self.numberOfValues = 0
for i in 0 ..< self.values.count {
arguments.append(self.values[i].dequeue()!)
if !self.values[i].isEmpty {
self.numberOfValues += 1
}
}
let result = try self.parent.resultSelector(arguments)
self.forwardOn(.next(result))
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
if self.isDone[atIndex] {
return
}
self.isDone[atIndex] = true
self.numberOfDone += 1
if self.numberOfDone == self.parent.count {
self.lock.lock(); defer { self.lock.unlock() }
switch event {
case .next(let element):
self.values[atIndex].enqueue(element)
if self.values[atIndex].count == 1 {
self.numberOfValues += 1
}
if self.numberOfValues < self.parent.count {
if self.numberOfDone == self.parent.count - 1 {
self.forwardOn(.completed)
self.dispose()
}
else {
self.subscriptions[atIndex].dispose()
}
return
}
// }
do {
var arguments = [SourceElement]()
arguments.reserveCapacity(self.parent.count)
// recalculate number of values
self.numberOfValues = 0
for i in 0 ..< self.values.count {
arguments.append(self.values[i].dequeue()!)
if !self.values[i].isEmpty {
self.numberOfValues += 1
}
}
let result = try self.parent.resultSelector(arguments)
self.forwardOn(.next(result))
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
if self.isDone[atIndex] {
return
}
self.isDone[atIndex] = true
self.numberOfDone += 1
if self.numberOfDone == self.parent.count {
self.forwardOn(.completed)
self.dispose()
}
else {
self.subscriptions[atIndex].dispose()
}
}
}
func run() -> Disposable {

View File

@ -121,11 +121,12 @@ func decrementChecked(_ i: inout Int) throws -> Int {
}
func unregister() {
self.lock.lock(); defer { self.lock.unlock() }
let pointer = Unmanaged.passUnretained(Thread.current).toOpaque()
self.threads[pointer] = (self.threads[pointer] ?? 1) - 1
if self.threads[pointer] == 0 {
self.threads[pointer] = nil
self.lock.performLocked {
let pointer = Unmanaged.passUnretained(Thread.current).toOpaque()
self.threads[pointer] = (self.threads[pointer] ?? 1) - 1
if self.threads[pointer] == 0 {
self.threads[pointer] = nil
}
}
}
}

View File

@ -45,7 +45,7 @@ final class AnyRecursiveScheduler<State> {
return Disposables.create()
}
let action = self.lock.calculateLocked { () -> Action? in
let action = self.lock.performLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self.group.remove(for: removeKey)
@ -96,7 +96,7 @@ final class AnyRecursiveScheduler<State> {
return Disposables.create()
}
let action = self.lock.calculateLocked { () -> Action? in
let action = self.lock.performLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self.group.remove(for: removeKey)
@ -172,7 +172,7 @@ final class RecursiveImmediateScheduler<State> {
return Disposables.create()
}
let action = self.lock.calculateLocked { () -> Action? in
let action = self.lock.performLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self.group.remove(for: removeKey)

View File

@ -22,8 +22,9 @@ public final class AsyncSubject<Element>
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
self.lock.lock(); defer { self.lock.unlock() }
return self.observers.count > 0
self.lock.performLocked {
self.observers.count > 0
}
}
let lock = RecursiveLock()
@ -109,8 +110,7 @@ public final class AsyncSubject<Element>
/// - parameter observer: Observer to subscribe to the subject.
/// - returns: Disposable object that can be used to unsubscribe the observer from the subject.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock(); defer { self.lock.unlock() }
return self.synchronized_subscribe(observer)
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
@ -133,8 +133,7 @@ public final class AsyncSubject<Element>
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock(); defer { self.lock.unlock() }
self.synchronized_unsubscribe(disposeKey)
self.lock.performLocked { self.synchronized_unsubscribe(disposeKey) }
}
func synchronized_unsubscribe(_ disposeKey: DisposeKey) {

View File

@ -22,10 +22,7 @@ public final class BehaviorSubject<Element>
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
self.lock.lock()
let value = self.observers.count > 0
self.lock.unlock()
return value
self.lock.performLocked { self.observers.count > 0 }
}
let lock = RecursiveLock()
@ -60,19 +57,18 @@ public final class BehaviorSubject<Element>
///
/// - returns: Latest value.
public func value() throws -> Element {
self.lock.lock(); defer { self.lock.unlock() } // {
if self.isDisposed {
throw RxError.disposed(object: self)
}
if let error = self.stoppedEvent?.error {
// intentionally throw exception
throw error
}
else {
return self.element
}
//}
self.lock.lock(); defer { self.lock.unlock() }
if self.isDisposed {
throw RxError.disposed(object: self)
}
if let error = self.stoppedEvent?.error {
// intentionally throw exception
throw error
}
else {
return self.element
}
}
/// Notifies all subscribed observers about next event.
@ -107,10 +103,7 @@ public final class BehaviorSubject<Element>
/// - parameter observer: Observer to subscribe to the subject.
/// - returns: Disposable object that can be used to unsubscribe the observer from the subject.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let subscription = self.synchronized_subscribe(observer)
self.lock.unlock()
return subscription
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
@ -131,9 +124,7 @@ public final class BehaviorSubject<Element>
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock()
self.synchronized_unsubscribe(disposeKey)
self.lock.unlock()
self.lock.performLocked { self.synchronized_unsubscribe(disposeKey) }
}
func synchronized_unsubscribe(_ disposeKey: DisposeKey) {
@ -151,11 +142,11 @@ public final class BehaviorSubject<Element>
/// Unsubscribe all observers and release resources.
public func dispose() {
self.lock.lock()
self.disposed = true
self.observers.removeAll()
self.stoppedEvent = nil
self.lock.unlock()
self.lock.performLocked {
self.disposed = true
self.observers.removeAll()
self.stoppedEvent = nil
}
}
#if TRACE_RESOURCES

View File

@ -22,10 +22,7 @@ public final class PublishSubject<Element>
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
self.lock.lock()
let count = self.observers.count > 0
self.lock.unlock()
return count
self.lock.performLocked { self.observers.count > 0 }
}
private let lock = RecursiveLock()
@ -93,10 +90,7 @@ public final class PublishSubject<Element>
- returns: Disposable object that can be used to unsubscribe the observer from the subject.
*/
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let subscription = self.synchronized_subscribe(observer)
self.lock.unlock()
return subscription
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
@ -115,9 +109,7 @@ public final class PublishSubject<Element>
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock()
self.synchronized_unsubscribe(disposeKey)
self.lock.unlock()
self.lock.performLocked { self.synchronized_unsubscribe(disposeKey) }
}
func synchronized_unsubscribe(_ disposeKey: DisposeKey) {
@ -131,9 +123,7 @@ public final class PublishSubject<Element>
/// Unsubscribe all observers and release resources.
public func dispose() {
self.lock.lock()
self.synchronized_dispose()
self.lock.unlock()
self.lock.performLocked { self.synchronized_dispose() }
}
final func synchronized_dispose() {

View File

@ -21,10 +21,7 @@ public class ReplaySubject<Element>
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
self.lock.lock()
let value = self.observers.count > 0
self.lock.unlock()
return value
self.lock.performLocked { self.observers.count > 0 }
}
fileprivate let lock = RecursiveLock()
@ -147,10 +144,7 @@ private class ReplayBufferBase<Element>
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.lock()
let subscription = self.synchronized_subscribe(observer)
self.lock.unlock()
return subscription
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
@ -173,9 +167,7 @@ private class ReplayBufferBase<Element>
}
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self.lock.lock()
self.synchronized_unsubscribe(disposeKey)
self.lock.unlock()
self.lock.performLocked { self.synchronized_unsubscribe(disposeKey) }
}
func synchronized_unsubscribe(_ disposeKey: DisposeKey) {
@ -193,9 +185,7 @@ private class ReplayBufferBase<Element>
}
func synchronizedDispose() {
self.lock.lock()
self.synchronized_dispose()
self.lock.unlock()
self.lock.performLocked { self.synchronized_dispose() }
}
func synchronized_dispose() {