1
1
mirror of https://github.com/qvacua/vimr.git synced 2024-09-19 04:57:14 +03:00

Reduce number of queues

This commit is contained in:
Tae Won Ha 2023-12-23 20:19:54 +01:00
parent e568295f68
commit 2517e90258
No known key found for this signature in database
GPG Key ID: E40743465B5B8B44

View File

@ -57,16 +57,11 @@ public final class RxMsgpackRpc {
public init(queueQos: DispatchQoS) {
let uuidStr = self.uuid.uuidString
self.queue = DispatchQueue(
label: "\(String(reflecting: RxMsgpackRpc.self))-\(uuidStr)",
qos: queueQos,
target: .global(qos: queueQos.qosClass)
)
self.pipeReadQueue = DispatchQueue(
label: "\(String(reflecting: RxMsgpackRpc.self))-pipeReadQueue-\(uuidStr)",
qos: queueQos
)
self.streamQueue = DispatchQueue(
self.queue = DispatchQueue(
label: "\(String(reflecting: RxMsgpackRpc.self))-streamSubjectQueue-\(uuidStr)",
qos: queueQos,
target: .global(qos: queueQos.qosClass)
@ -78,11 +73,11 @@ public final class RxMsgpackRpc {
self.outPipe = outPipe
self.errorPipe = errorPipe
self.queue.async { [weak self] in self?.startReading() }
self.startReading()
}
public func stop() {
self.queue.async { [weak self] in self?.cleanUp() }
self.cleanUp()
}
public func response(msgid: UInt32, error: Value, result: Value) -> Completable {
@ -138,7 +133,7 @@ public final class RxMsgpackRpc {
if expectsReturnValue {
// In streamQueue since we want to sync' access self.singles only in that queue.
self?.streamQueue.async { self?.singles[msgid] = single }
self?.queue.async { self?.singles[msgid] = single }
}
do {
@ -169,9 +164,8 @@ public final class RxMsgpackRpc {
// R/w only in self.queue
private var nextMsgid: UInt32 = 0
private let queue: DispatchQueue
private let pipeReadQueue: DispatchQueue
private let streamQueue: DispatchQueue
private let queue: DispatchQueue
private var inPipe: Pipe?
private var outPipe: Pipe?
@ -188,11 +182,11 @@ public final class RxMsgpackRpc {
}
private func cleanUp() {
self.inPipe = nil
self.outPipe = nil
self.errorPipe = nil
self.streamQueue.async { [weak self] in
self.queue.async { [weak self] in
self?.inPipe = nil
self?.outPipe = nil
self?.errorPipe = nil
self?.streamSubject.onCompleted()
self?.singles.forEach { _, single in single(.failure(Error(msg: "Pipe closed"))) }
}
@ -227,11 +221,11 @@ public final class RxMsgpackRpc {
else { dataToUnmarshall.removeAll(keepingCapacity: true) }
_ = consume remainderData
self?.streamQueue.async {
self?.queue.async {
values.forEach { value in self?.processMessage(value) }
}
} catch {
self?.streamQueue.async {
self?.queue.async {
self?.streamSubject.onError(Error(msg: "Could not read from pipe", cause: error))
}
}