1
1
mirror of https://github.com/qvacua/vimr.git synced 2024-11-24 03:25:03 +03:00
This commit is contained in:
Tae Won Ha 2023-12-23 20:12:35 +01:00
parent a5814366b5
commit e568295f68
No known key found for this signature in database
GPG Key ID: E40743465B5B8B44

View File

@ -40,16 +40,16 @@ public final class RxMsgpackRpc {
}
/**
Streams `Message.notification`s and `Message.error`s by default.
When `streamResponses` is set to `true`, then also `Message.response`s.
Streams `Message.notification`s and `Message.error`s by default.
When `streamResponses` is set to `true`, then also `Message.response`s.
*/
public var stream: Observable<Message> { self.streamSubject.asObservable() }
/**
When `true`, all messages of type `MessageType.response` are also streamed
to `stream` as `Message.response`. When `false`, only the `Single`s
you get from `request(msgid, method, params, expectsReturnValue)` will
get the response as `Response`.
When `true`, all messages of type `MessageType.response` are also streamed
to `stream` as `Message.response`. When `false`, only the `Single`s
you get from `request(msgid, method, params, expectsReturnValue)` will
get the response as `Response`.
*/
public var streamResponses = false
@ -137,6 +137,7 @@ public final class RxMsgpackRpc {
)
if expectsReturnValue {
// In streamQueue since we want to sync' access self.singles only in that queue.
self?.streamQueue.async { self?.singles[msgid] = single }
}
@ -163,6 +164,9 @@ public final class RxMsgpackRpc {
}
}
// MARK: Private
// R/w only in self.queue
private var nextMsgid: UInt32 = 0
private let queue: DispatchQueue
@ -237,6 +241,7 @@ public final class RxMsgpackRpc {
}
}
// Call only in self.streamQueue
private func processMessage(_ unpacked: Value) {
guard let array = unpacked.arrayValue else {
self.streamSubject.onNext(.error(
@ -300,6 +305,21 @@ public final class RxMsgpackRpc {
}
}
// Call only in self.streamQueue
private func processResponse(msgid: UInt32, error: Value, result: Value) {
if let single = self.singles.removeValue(forKey: msgid) {
single(.success(Response(msgid: msgid, error: error, result: result)))
}
if self.streamResponses {
self.streamSubject.onNext(.response(msgid: msgid, error: error, result: result))
}
}
}
// MARK: Private utilities
extension RxMsgpackRpc {
private func unpackAllWithRemainder(_ data: Data) throws -> (values: [Value], remainder: Data?) {
var values = [Value]()
var remainderData: Data?
@ -318,16 +338,6 @@ public final class RxMsgpackRpc {
return (values, remainderData)
}
private func processResponse(msgid: UInt32, error: Value, result: Value) {
if let single = self.singles.removeValue(forKey: msgid) {
single(.success(Response(msgid: msgid, error: error, result: result)))
}
if self.streamResponses {
self.streamSubject.onNext(.response(msgid: msgid, error: error, result: result))
}
}
}
private typealias SingleResponseObserver = (SingleEvent<RxMsgpackRpc.Response>) -> Void