1
1
mirror of https://github.com/qvacua/vimr.git synced 2024-12-25 23:02:35 +03:00
This commit is contained in:
Tae Won Ha 2020-12-10 21:42:10 +01:00
parent 2bae89165e
commit 5afb0cf9e1
No known key found for this signature in database
GPG Key ID: E40743465B5B8B44

View File

@ -65,23 +65,13 @@ public final class RxMsgpackRpc {
public func run(at path: String) -> Completable { public func run(at path: String) -> Completable {
Completable.create { completable in Completable.create { completable in
self.queue.async { self.queue.async {
self.stopped = false
do { do {
try self.socket = Socket.create( try self.socket = Socket.create(family: .unix, type: .stream, proto: .unix)
family: .unix,
type: .stream,
proto: .unix
)
try self.socket?.connect(to: path) try self.socket?.connect(to: path)
self.setUpThreadAndStartReading() self.setUpThreadAndStartReading()
} catch { } catch {
self.streamSubject.onError( self.streamSubject.onError(Error(msg: "Could not get socket", cause: error))
Error(msg: "Could not get socket", cause: error) completable(.error(Error(msg: "Could not get socket at \(path)", cause: error)))
)
completable(.error(
Error(msg: "Could not get socket at \(path)", cause: error)
))
} }
completable(.completed) completable(.completed)
@ -121,18 +111,16 @@ public final class RxMsgpackRpc {
] ]
) )
if self.stopped { if self.socket?.remoteConnectionClosed == true {
single(.error(Error( single(.error(Error(
msg: "Connection stopped, " + msg: "Connection stopped, but trying to send a request with msg id \(msgid)"
"but trying to send a request with msg id \(msgid)"
))) )))
return return
} }
guard let socket = self.socket else { guard let socket = self.socket else {
single(.error(Error( single(.error(Error(
msg: "Socket is invalid, " + msg: "Socket is invalid, but trying to send a request with msg id \(msgid)"
"but trying to send a request with msg id \(msgid)"
))) )))
return return
} }
@ -161,9 +149,7 @@ public final class RxMsgpackRpc {
return return
} }
if !expectsReturnValue { if !expectsReturnValue { single(.success(self.nilResponse(with: msgid))) }
single(.success(self.nilResponse(with: msgid)))
}
} }
return Disposables.create() return Disposables.create()
@ -176,7 +162,6 @@ public final class RxMsgpackRpc {
private var thread: Thread? private var thread: Thread?
private let queue: DispatchQueue private let queue: DispatchQueue
private var stopped = true
private var singles: [UInt32: SingleResponseObserver] = [:] private var singles: [UInt32: SingleResponseObserver] = [:]
private let streamSubject = PublishSubject<Message>() private let streamSubject = PublishSubject<Message>()
@ -191,7 +176,6 @@ public final class RxMsgpackRpc {
self.singles.forEach { msgid, single in single(.success(self.nilResponse(with: msgid))) } self.singles.forEach { msgid, single in single(.success(self.nilResponse(with: msgid))) }
self.singles.removeAll() self.singles.removeAll()
self.stopped = true
self.socket?.close() self.socket?.close()
} }
@ -226,7 +210,7 @@ public final class RxMsgpackRpc {
self.queue.async { self.cleanUpAndCloseSocket() } self.queue.async { self.cleanUpAndCloseSocket() }
return return
} }
} while !self.queue.sync { self.stopped } } while self.socket?.remoteConnectionClosed == false
} }
self.thread?.start() self.thread?.start()