diff --git a/RxPack/Sources/RxPack/RxMsgpackRpc.swift b/RxPack/Sources/RxPack/RxMsgpackRpc.swift index 3359498e..94dbae44 100644 --- a/RxPack/Sources/RxPack/RxMsgpackRpc.swift +++ b/RxPack/Sources/RxPack/RxMsgpackRpc.swift @@ -57,16 +57,11 @@ public final class RxMsgpackRpc { public init(queueQos: DispatchQoS) { let uuidStr = self.uuid.uuidString - self.queue = DispatchQueue( - label: "\(String(reflecting: RxMsgpackRpc.self))-\(uuidStr)", - qos: queueQos, - target: .global(qos: queueQos.qosClass) - ) self.pipeReadQueue = DispatchQueue( label: "\(String(reflecting: RxMsgpackRpc.self))-pipeReadQueue-\(uuidStr)", qos: queueQos ) - self.streamQueue = DispatchQueue( + self.queue = DispatchQueue( label: "\(String(reflecting: RxMsgpackRpc.self))-streamSubjectQueue-\(uuidStr)", qos: queueQos, target: .global(qos: queueQos.qosClass) @@ -78,11 +73,11 @@ public final class RxMsgpackRpc { self.outPipe = outPipe self.errorPipe = errorPipe - self.queue.async { [weak self] in self?.startReading() } + self.startReading() } public func stop() { - self.queue.async { [weak self] in self?.cleanUp() } + self.cleanUp() } public func response(msgid: UInt32, error: Value, result: Value) -> Completable { @@ -138,7 +133,7 @@ public final class RxMsgpackRpc { if expectsReturnValue { // In streamQueue since we want to sync' access self.singles only in that queue. - self?.streamQueue.async { self?.singles[msgid] = single } + self?.queue.async { self?.singles[msgid] = single } } do { @@ -169,9 +164,8 @@ public final class RxMsgpackRpc { // R/w only in self.queue private var nextMsgid: UInt32 = 0 - private let queue: DispatchQueue private let pipeReadQueue: DispatchQueue - private let streamQueue: DispatchQueue + private let queue: DispatchQueue private var inPipe: Pipe? private var outPipe: Pipe? @@ -188,11 +182,11 @@ public final class RxMsgpackRpc { } private func cleanUp() { - self.inPipe = nil - self.outPipe = nil - self.errorPipe = nil - - self.streamQueue.async { [weak self] in + self.queue.async { [weak self] in + self?.inPipe = nil + self?.outPipe = nil + self?.errorPipe = nil + self?.streamSubject.onCompleted() self?.singles.forEach { _, single in single(.failure(Error(msg: "Pipe closed"))) } } @@ -227,11 +221,11 @@ public final class RxMsgpackRpc { else { dataToUnmarshall.removeAll(keepingCapacity: true) } _ = consume remainderData - self?.streamQueue.async { + self?.queue.async { values.forEach { value in self?.processMessage(value) } } } catch { - self?.streamQueue.async { + self?.queue.async { self?.streamSubject.onError(Error(msg: "Could not read from pipe", cause: error)) } }