diff --git a/RxPack/Sources/RxPack/RxMsgpackRpc.swift b/RxPack/Sources/RxPack/RxMsgpackRpc.swift index 0947158e..3359498e 100644 --- a/RxPack/Sources/RxPack/RxMsgpackRpc.swift +++ b/RxPack/Sources/RxPack/RxMsgpackRpc.swift @@ -40,16 +40,16 @@ public final class RxMsgpackRpc { } /** - Streams `Message.notification`s and `Message.error`s by default. - When `streamResponses` is set to `true`, then also `Message.response`s. + Streams `Message.notification`s and `Message.error`s by default. + When `streamResponses` is set to `true`, then also `Message.response`s. */ public var stream: Observable { self.streamSubject.asObservable() } /** - When `true`, all messages of type `MessageType.response` are also streamed - to `stream` as `Message.response`. When `false`, only the `Single`s - you get from `request(msgid, method, params, expectsReturnValue)` will - get the response as `Response`. + When `true`, all messages of type `MessageType.response` are also streamed + to `stream` as `Message.response`. When `false`, only the `Single`s + you get from `request(msgid, method, params, expectsReturnValue)` will + get the response as `Response`. */ public var streamResponses = false @@ -126,7 +126,7 @@ public final class RxMsgpackRpc { self?.queue.async { guard let msgid = self?.nextMsgid else { return } self?.nextMsgid += 1 - + let packed = pack( [ .uint(MessageType.request.rawValue), @@ -137,6 +137,7 @@ public final class RxMsgpackRpc { ) if expectsReturnValue { + // In streamQueue since we want to sync' access self.singles only in that queue. self?.streamQueue.async { self?.singles[msgid] = single } } @@ -163,6 +164,9 @@ public final class RxMsgpackRpc { } } + // MARK: Private + + // R/w only in self.queue private var nextMsgid: UInt32 = 0 private let queue: DispatchQueue @@ -237,6 +241,7 @@ public final class RxMsgpackRpc { } } + // Call only in self.streamQueue private func processMessage(_ unpacked: Value) { guard let array = unpacked.arrayValue else { self.streamSubject.onNext(.error( @@ -300,6 +305,21 @@ public final class RxMsgpackRpc { } } + // Call only in self.streamQueue + private func processResponse(msgid: UInt32, error: Value, result: Value) { + if let single = self.singles.removeValue(forKey: msgid) { + single(.success(Response(msgid: msgid, error: error, result: result))) + } + + if self.streamResponses { + self.streamSubject.onNext(.response(msgid: msgid, error: error, result: result)) + } + } +} + +// MARK: Private utilities + +extension RxMsgpackRpc { private func unpackAllWithRemainder(_ data: Data) throws -> (values: [Value], remainder: Data?) { var values = [Value]() var remainderData: Data? @@ -318,16 +338,6 @@ public final class RxMsgpackRpc { return (values, remainderData) } - - private func processResponse(msgid: UInt32, error: Value, result: Value) { - if let single = self.singles.removeValue(forKey: msgid) { - single(.success(Response(msgid: msgid, error: error, result: result))) - } - - if self.streamResponses { - self.streamSubject.onNext(.response(msgid: msgid, error: error, result: result)) - } - } } private typealias SingleResponseObserver = (SingleEvent) -> Void