1
1
mirror of https://github.com/qvacua/vimr.git synced 2024-12-25 23:02:35 +03:00
This commit is contained in:
Tae Won Ha 2020-07-05 20:17:31 +02:00
parent c3be5b6441
commit f9c5d4a601
No known key found for this signature in database
GPG Key ID: E40743465B5B8B44

View File

@ -4,37 +4,32 @@
*/
import Foundation
import RxSwift
import MessagePack
import RxSwift
import Socket
public final class RxMsgpackRpc {
public typealias Value = MessagePackValue
enum MessageType: UInt64 {
case request = 0
case response = 1
case notification = 2
}
public enum Message {
case response(msgid: UInt32, error: Value, result: Value)
case notification(method: String, params: [Value])
case error(value: Value, msg: String)
}
public struct Response {
public let msgid: UInt32
public let error: Value
public let result: Value
}
public struct Error: Swift.Error {
var msg: String
var cause: Swift.Error?
@ -45,17 +40,17 @@ public final class RxMsgpackRpc {
}
/**
Streams `Message.notification`s and `Message.error`s by default.
When `streamResponses` is set to `true`, then also `Message.response`s.
*/
Streams `Message.notification`s and `Message.error`s by default.
When `streamResponses` is set to `true`, then also `Message.response`s.
*/
public var stream: Observable<Message> { self.streamSubject.asObservable() }
/**
When `true`, all messages of type `MessageType.response` are also streamed
to `stream` as `Message.response`. When `false`, only the `Single`s
you get from `request(msgid, method, params, expectsReturnValue)` will
get the response as `Response`.
*/
When `true`, all messages of type `MessageType.response` are also streamed
to `stream` as `Message.response`. When `false`, only the `Single`s
you get from `request(msgid, method, params, expectsReturnValue)` will
get the response as `Response`.
*/
public var streamResponses = false
public func run(at path: String) -> Completable {
@ -119,14 +114,18 @@ public final class RxMsgpackRpc {
)
if self.stopped {
single(.error(Error(msg: "Connection stopped, " +
"but trying to send a request with msg id \(msgid)")))
single(.error(Error(
msg: "Connection stopped, " +
"but trying to send a request with msg id \(msgid)"
)))
return
}
guard let socket = self.socket else {
single(.error(Error(msg: "Socket is invalid, " +
"but trying to send a request with msg id \(msgid)")))
single(.error(Error(
msg: "Socket is invalid, " +
"but trying to send a request with msg id \(msgid)"
)))
return
}
@ -137,17 +136,19 @@ public final class RxMsgpackRpc {
if writtenBytes < packed.count {
single(.error(Error(
msg: "(Written) = \(writtenBytes) < \(packed.count) = " +
"(requested) for msg id: \(msgid)"
"(requested) for msg id: \(msgid)"
)))
return
}
} catch {
self.streamSubject.onError(Error(
msg: "Could not write to socket for msg id: \(msgid)", cause: error))
msg: "Could not write to socket for msg id: \(msgid)", cause: error
))
single(.error(Error(
msg: "Could not write to socket for msg id: \(msgid)", cause: error)))
msg: "Could not write to socket for msg id: \(msgid)", cause: error
)))
return
}
@ -162,6 +163,7 @@ public final class RxMsgpackRpc {
}
// MARK: - Private
private var nextMsgid: UInt32 = 0
private var socket: Socket?
@ -226,8 +228,8 @@ public final class RxMsgpackRpc {
}
guard let rawType = array[0].uint64Value,
let type = MessageType(rawValue: rawType)
else {
let type = MessageType(rawValue: rawType)
else {
self.streamSubject.onNext(.error(
value: unpacked, msg: "Could not get the type of the message"
))
@ -235,7 +237,6 @@ public final class RxMsgpackRpc {
}
switch type {
case .response:
guard array.count == 4 else {
self.streamSubject.onNext(.error(
@ -265,8 +266,8 @@ public final class RxMsgpackRpc {
}
guard let method = array[1].stringValue,
let params = array[2].arrayValue
else {
let params = array[2].arrayValue
else {
self.streamSubject.onNext(.error(
value: unpacked,
msg: "Could not get the method and params"