From 5afb0cf9e18af209062d25350251034cbb27de5b Mon Sep 17 00:00:00 2001 From: Tae Won Ha Date: Thu, 10 Dec 2020 21:42:10 +0100 Subject: [PATCH] Refactor --- RxPack/Sources/RxPack/RxMsgpackRpc.swift | 32 ++++++------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/RxPack/Sources/RxPack/RxMsgpackRpc.swift b/RxPack/Sources/RxPack/RxMsgpackRpc.swift index 31d38012..4d492ea0 100644 --- a/RxPack/Sources/RxPack/RxMsgpackRpc.swift +++ b/RxPack/Sources/RxPack/RxMsgpackRpc.swift @@ -65,23 +65,13 @@ public final class RxMsgpackRpc { public func run(at path: String) -> Completable { Completable.create { completable in self.queue.async { - self.stopped = false - do { - try self.socket = Socket.create( - family: .unix, - type: .stream, - proto: .unix - ) + try self.socket = Socket.create(family: .unix, type: .stream, proto: .unix) try self.socket?.connect(to: path) self.setUpThreadAndStartReading() } catch { - self.streamSubject.onError( - Error(msg: "Could not get socket", cause: error) - ) - completable(.error( - Error(msg: "Could not get socket at \(path)", cause: error) - )) + self.streamSubject.onError(Error(msg: "Could not get socket", cause: error)) + completable(.error(Error(msg: "Could not get socket at \(path)", cause: error))) } completable(.completed) @@ -121,18 +111,16 @@ public final class RxMsgpackRpc { ] ) - if self.stopped { + if self.socket?.remoteConnectionClosed == true { single(.error(Error( - msg: "Connection stopped, " + - "but trying to send a request with msg id \(msgid)" + msg: "Connection stopped, but trying to send a request with msg id \(msgid)" ))) return } guard let socket = self.socket else { single(.error(Error( - msg: "Socket is invalid, " + - "but trying to send a request with msg id \(msgid)" + msg: "Socket is invalid, but trying to send a request with msg id \(msgid)" ))) return } @@ -161,9 +149,7 @@ public final class RxMsgpackRpc { return } - if !expectsReturnValue { - single(.success(self.nilResponse(with: msgid))) - } + if !expectsReturnValue { single(.success(self.nilResponse(with: msgid))) } } return Disposables.create() @@ -176,7 +162,6 @@ public final class RxMsgpackRpc { private var thread: Thread? private let queue: DispatchQueue - private var stopped = true private var singles: [UInt32: SingleResponseObserver] = [:] private let streamSubject = PublishSubject() @@ -191,7 +176,6 @@ public final class RxMsgpackRpc { self.singles.forEach { msgid, single in single(.success(self.nilResponse(with: msgid))) } self.singles.removeAll() - self.stopped = true self.socket?.close() } @@ -226,7 +210,7 @@ public final class RxMsgpackRpc { self.queue.async { self.cleanUpAndCloseSocket() } return } - } while !self.queue.sync { self.stopped } + } while self.socket?.remoteConnectionClosed == false } self.thread?.start()