1
1
mirror of https://github.com/qvacua/vimr.git synced 2024-11-24 03:25:03 +03:00

Merge pull request #1031 from qvacua/pipe

Use pipe for IPC
This commit is contained in:
Tae Won Ha 2023-12-11 19:29:12 +01:00 committed by GitHub
commit 9901ee40c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 218 additions and 274 deletions

View File

@ -5,7 +5,7 @@
import Foundation import Foundation
struct Defs { enum Defs {
static let loggerSubsystem = "com.qvacua.NvimView" static let loggerSubsystem = "com.qvacua.NvimView"
enum LoggerCategory { enum LoggerCategory {

View File

@ -5,7 +5,7 @@
import Cocoa import Cocoa
final class KeyUtils { enum KeyUtils {
static func isControlCode(key: String) -> Bool { static func isControlCode(key: String) -> Bool {
guard key.count == 1 else { guard key.count == 1 else {
return false return false

View File

@ -58,10 +58,18 @@ extension NvimView {
switch option { switch option {
case let .guifont(fontSpec): case let .guifont(fontSpec):
command = self.api.setOptionValue(name: "guifont", value: .string(fontSpec), opts: ["scope": .string("global")]) command = self.api.setOptionValue(
name: "guifont",
value: .string(fontSpec),
opts: ["scope": .string("global")]
)
case let .guifontWide(fontSpec): case let .guifontWide(fontSpec):
command = self.api.setOptionValue(name: "guifontwide", value: .string(fontSpec), opts: ["scope": .string("global")]) command = self.api.setOptionValue(
name: "guifontwide",
value: .string(fontSpec),
opts: ["scope": .string("global")]
)
} }
command command

View File

@ -71,120 +71,93 @@ extension NvimView {
self.log.info("NVIM_LISTEN_ADDRESS=\(sockPath)") self.log.info("NVIM_LISTEN_ADDRESS=\(sockPath)")
let inPipe: Pipe, outPipe: Pipe, errorPipe: Pipe
do { do {
try self.bridge.runLocalServerAndNvim(width: size.width, height: size.height) (inPipe, outPipe, errorPipe) = try self.bridge.runLocalServerAndNvim(
} width: size.width, height: size.height
catch let err as RxNeovimApi.Error { )
} catch let err as RxNeovimApi.Error {
self.eventsSubject.onNext(.ipcBecameInvalid( self.eventsSubject.onNext(.ipcBecameInvalid(
"Could not launch neovim (\(err))." "Could not launch neovim (\(err))."
)) ))
}
catch { return
} catch {
self.eventsSubject.onNext(.ipcBecameInvalid( self.eventsSubject.onNext(.ipcBecameInvalid(
"Could not launch neovim." "Could not launch neovim."
)) ))
}
// Wait for listen and socket creation to occur return
let timeout = Duration.seconds(4)
let start_time = ContinuousClock.now
while !FileManager.default.fileExists(atPath: sockPath) {
usleep(1000)
if ContinuousClock.now - start_time > timeout {
self.eventsSubject.onNext(.ipcBecameInvalid(
"Timeout waiting for neovim."
))
return
}
} }
// We wait here, since the user of NvimView cannot subscribe // We wait here, since the user of NvimView cannot subscribe
// on the Completable. We could demand that the user call launchNeoVim() // on the Completable. We could demand that the user call launchNeoVim()
// by themselves, but... // by themselves, but...
try?
self.api.run(at: sockPath) // See https://neovim.io/doc/user/ui.html#ui-startup for startup sequence
// When we call nvim_command("autocmd VimEnter * call rpcrequest(1, 'vimenter')")
// Neovim will send us a vimenter request and enter a blocking state.
// We do some autocmd setup and send a response to exit the blocking state in
// NvimView.swift
try? self.api.run(inPipe: inPipe, outPipe: outPipe, errorPipe: errorPipe)
.andThen( .andThen(
self.api.getApiInfo() self.api.getApiInfo(errWhenBlocked: false)
.do(onError: { err in .flatMapCompletable { value in
throw RxNeovimApi.Error guard let info = value.arrayValue,
.exception(message: "Could not connect to neovim (\(err)).") info.count == 2,
}) let channel = info[0].int32Value,
.map { let dict = info[1].dictionaryValue,
value in let version = dict["version"]?.dictionaryValue,
guard let info = value.arrayValue, let major = version["major"]?.intValue,
info.count == 2, let minor = version["minor"]?.intValue
let channel = info[0].int32Value else {
else { throw RxNeovimApi.Error.exception(message: "Could not convert values to api info.")
throw RxNeovimApi.Error }
.exception(message: "Could not convert values to api info.")
guard (major >= kMinAlphaVersion && minor >= kMinMinorVersion) || major >=
kMinMajorVersion
else {
self.eventsSubject.onNext(.ipcBecameInvalid(
"Incompatible neovim version \(major).\(minor)"
))
throw RxNeovimApi.Error.exception(message: "Incompatible neovim version.")
}
return self.api.exec2(src: """
let g:gui_vimr = 1
autocmd ExitPre * call rpcnotify(\(channel), 'autocommand', 'exitpre')
autocmd VimEnter * call rpcnotify(\(channel), 'autocommand', 'vimenter')
autocmd ColorScheme * call rpcnotify(\(channel), 'autocommand', 'colorscheme', get(nvim_get_hl(0, {'id': hlID('Normal')}), 'fg', -1), get(nvim_get_hl(0, {'id': hlID('Normal')}), 'bg', -1), get(nvim_get_hl(0, {'id': hlID('Visual')}), 'fg', -1), get(nvim_get_hl(0, {'id': hlID('Visual')}), 'bg', -1), get(nvim_get_hl(0, {'id': hlID('Directory')}), 'fg', -1))
autocmd VimEnter * call rpcrequest(\(channel), 'vimenter')
""", opts: [:], errWhenBlocked: false)
.asCompletable()
} }
)
return channel .andThen(
}.flatMapCompletable { self.api.uiAttach(width: size.width, height: size.height, options: [
// FIXME: make lua "ext_linegrid": true,
self.api.exec2(src: """ "ext_multigrid": false,
":augroup vimr "ext_tabline": MessagePackValue(self.usesCustomTabBar),
":augroup! "rgb": true,
:autocmd VimEnter * call rpcnotify(\($0), 'autocommand', 'vimenter') ])
:autocmd BufWinEnter * call rpcnotify(\( )
$0 .andThen(
), 'autocommand', 'bufwinenter', str2nr(expand('<abuf>'))) self.sourceFileUrls.reduce(.empty()) { prev, url in
:autocmd BufWinEnter * call rpcnotify(\( prev.andThen(
$0 self.api.exec2(src: "source \(url.shellEscapedPath)", opts: ["output": true])
), 'autocommand', 'bufwinleave', str2nr(expand('<abuf>'))) .map { retval in
:autocmd TabEnter * call rpcnotify(\( guard let output = retval["output"]?.stringValue else {
$0 throw RxNeovimApi.Error
), 'autocommand', 'tabenter', str2nr(expand('<abuf>'))) .exception(message: "Could not convert values to output.")
:autocmd BufWritePost * call rpcnotify(\( }
$0 return output
), 'autocommand', 'bufwritepost', str2nr(expand('<abuf>')))
:autocmd BufEnter * call rpcnotify(\(
$0
), 'autocommand', 'bufenter', str2nr(expand('<abuf>')))
:autocmd DirChanged * call rpcnotify(\(
$0
), 'autocommand', 'dirchanged', expand('<afile>'))
:autocmd ColorScheme * call rpcnotify(\($0), 'autocommand', 'colorscheme', \
get(nvim_get_hl(0, {'id': hlID('Normal')}), 'fg', -1), \
get(nvim_get_hl(0, {'id': hlID('Normal')}), 'bg', -1), \
get(nvim_get_hl(0, {'id': hlID('Visual')}), 'fg', -1), \
get(nvim_get_hl(0, {'id': hlID('Visual')}), 'bg', -1), \
get(nvim_get_hl(0, {'id': hlID('Directory')}), 'fg', -1))
:autocmd ExitPre * call rpcnotify(\($0), 'autocommand', 'exitpre')
:autocmd BufModifiedSet * call rpcnotify(\($0), 'autocommand', 'bufmodifiedset', \
str2nr(expand('<abuf>')), getbufinfo(str2nr(expand('<abuf>')))[0].changed)
:let g:gui_vimr = 1
":augroup END
""", opts: [:]).asCompletable()
.andThen(
self.sourceFileUrls.reduce(Completable.empty()) { prev, url in
prev
.andThen(
self.api.exec2(src: "source \(url.shellEscapedPath)", opts: ["output": true])
.map {
retval in
guard let output_value = retval["output"] ?? retval["output"],
let output = output_value.stringValue
else {
throw RxNeovimApi.Error
.exception(message: "Could not convert values to output.")
}
return output
}
.asCompletable()
)
} }
) .asCompletable()
.andThen(self.api.uiAttach(width: size.width, height: size.height, options: [ )
"ext_linegrid": true,
"ext_multigrid": false,
"ext_tabline": MessagePackValue(self.usesCustomTabBar),
"rgb": true,
]))
.andThen(self.api.subscribe(event: NvimView.rpcEventName))
} }
).wait() )
.andThen(self.api.subscribe(event: NvimView.rpcEventName))
.wait()
} }
private func randomEmoji() -> String { private func randomEmoji() -> String {

View File

@ -40,7 +40,7 @@ public protocol NvimViewDelegate: AnyObject {
public final class NvimView: NSView, NSUserInterfaceValidations, NSTextInputClient { public final class NvimView: NSView, NSUserInterfaceValidations, NSTextInputClient {
// MARK: - Public // MARK: - Public
public static let rpcEventName = "com.qvacua.NvimView" public static let rpcEventName = "com.qvacua.NvimView"
public static let minFontSize = 4.0 public static let minFontSize = 4.0
@ -182,13 +182,24 @@ public final class NvimView: NSView, NSUserInterfaceValidations, NSTextInputClie
self.api.msgpackRawStream self.api.msgpackRawStream
.subscribe(onNext: { [weak self] msg in .subscribe(onNext: { [weak self] msg in
switch msg { switch msg {
case let .request(msgid, method, _):
// See https://neovim.io/doc/user/ui.html#ui-startup
// "vimenter" RPC request will be sent to us
// which is the result of
// nvim_command("autocmd VimEnter * call rpcrequest(1, 'vimenter')") in
// NvimView+Resize.swift
// This is the only request sent from Neovim to the UI, afaics.
guard method == "vimenter" else { break }
self?.log.debug("Processing blocking vimenter request")
self?.setupAutocmdsAndSendResponse(forMsgid: msgid)
case let .notification(method, params): case let .notification(method, params):
self?.log.debug("NOTIFICATION: \(method): \(params)") self?.log.trace("NOTIFICATION: \(method): \(params)")
if method == NvimView.rpcEventName { if method == NvimView.rpcEventName {
self?.eventsSubject.onNext(.rpcEvent(params)) self?.eventsSubject.onNext(.rpcEvent(params))
} }
if method == "redraw" { if method == "redraw" {
self?.renderData(params) self?.renderData(params)
} else if method == "autocommand" { } else if method == "autocommand" {
@ -346,4 +357,33 @@ public final class NvimView: NSView, NSUserInterfaceValidations, NSTextInputClie
private var _linespacing = NvimView.defaultLinespacing private var _linespacing = NvimView.defaultLinespacing
private var _characterspacing = NvimView.defaultCharacterspacing private var _characterspacing = NvimView.defaultCharacterspacing
private func setupAutocmdsAndSendResponse(forMsgid msgid: UInt32) {
self.api.getApiInfo(errWhenBlocked: false)
.flatMapCompletable { value in
guard let info = value.arrayValue,
info.count == 2,
let channel = info[0].int32Value
else {
throw RxNeovimApi.Error.exception(message: "Could not convert values to api info.")
}
// swiftformat:disable all
return self.api.exec2(src: """
autocmd BufWinEnter * call rpcnotify(\(channel), 'autocommand', 'bufwinenter', str2nr(expand('<abuf>')))
autocmd BufWinLeave * call rpcnotify(\(channel), 'autocommand', 'bufwinleave', str2nr(expand('<abuf>')))
autocmd TabEnter * call rpcnotify(\(channel), 'autocommand', 'tabenter', str2nr(expand('<abuf>')))
autocmd BufWritePost * call rpcnotify(\(channel), 'autocommand', 'bufwritepost', str2nr(expand('<abuf>')))
autocmd BufEnter * call rpcnotify(\(channel), 'autocommand', 'bufenter', str2nr(expand('<abuf>')))
autocmd DirChanged * call rpcnotify(\( channel), 'autocommand', 'dirchanged', expand('<afile>'))
autocmd BufModifiedSet * call rpcnotify(\(channel), 'autocommand', 'bufmodifiedset', str2nr(expand('<abuf>')), getbufinfo(str2nr(expand('<abuf>')))[0].changed)
""", opts: [:], errWhenBlocked: false)
// swiftformat:enable all
.asCompletable()
.andThen(
self.api.sendResponse(msgid: msgid, error: .nil, result: .nil)
)
}
.subscribe().disposed(by: self.disposeBag)
}
} }

View File

@ -7,13 +7,12 @@ import Commons
import Foundation import Foundation
import MessagePack import MessagePack
import os import os
import RxPack
import RxSwift
import RxNeovim import RxNeovim
import RxPack import RxPack
import RxSwift
let kMinAlphaVersion = 0 let kMinAlphaVersion = 0
let kMinMinorVersion = 10 let kMinMinorVersion = 9
let kMinMajorVersion = 1 let kMinMajorVersion = 1
final class UiBridge { final class UiBridge {
@ -39,11 +38,11 @@ final class UiBridge {
} }
} }
func runLocalServerAndNvim(width: Int, height: Int) throws { func runLocalServerAndNvim(width: Int, height: Int) throws -> (Pipe, Pipe, Pipe) {
self.initialWidth = width self.initialWidth = width
self.initialHeight = height self.initialHeight = height
try self.launchNvimUsingLoginShellEnv() return try self.launchNvimUsingLoginShellEnv()
} }
func quit() -> Completable { func quit() -> Completable {
@ -70,7 +69,7 @@ final class UiBridge {
self.nvimServerProc?.terminate() self.nvimServerProc?.terminate()
} }
private func launchNvimUsingLoginShellEnv() throws { private func launchNvimUsingLoginShellEnv() throws -> (Pipe, Pipe, Pipe) {
var env = self.envDict var env = self.envDict
env["NVIM_LISTEN_ADDRESS"] = self.listenAddress env["NVIM_LISTEN_ADDRESS"] = self.listenAddress
@ -85,9 +84,7 @@ final class UiBridge {
process.standardOutput = outPipe process.standardOutput = outPipe
process.currentDirectoryPath = self.cwd.path process.currentDirectoryPath = self.cwd.path
if self.nvimBinary != "", if self.nvimBinary != "", FileManager.default.fileExists(atPath: self.nvimBinary) {
FileManager.default.fileExists(atPath: self.nvimBinary)
{
process.launchPath = self.nvimBinary process.launchPath = self.nvimBinary
} else { } else {
// We know that NvimServer is there. // We know that NvimServer is there.
@ -97,11 +94,7 @@ final class UiBridge {
} }
process.environment = env process.environment = env
process process .arguments = ["--embed"] + self.nvimArgs
.arguments =
["--embed",
"--listen",
self.listenAddress] + self.nvimArgs
self.log.debug( self.log.debug(
"Launching NvimServer \(String(describing: process.launchPath)) with args: \(String(describing: process.arguments))" "Launching NvimServer \(String(describing: process.launchPath)) with args: \(String(describing: process.arguments))"
@ -113,71 +106,8 @@ final class UiBridge {
.exception(message: "Could not run neovim process.") .exception(message: "Could not run neovim process.")
} }
try self.doInitialVersionCheck(inPipe: inPipe, outPipe: outPipe)
self.nvimServerProc = process self.nvimServerProc = process
} return (inPipe, outPipe, errorPipe)
private func doInitialVersionCheck(inPipe: Pipe, outPipe: Pipe) throws {
// Construct Msgpack query for api info
let packed = pack(
[
.uint(RxMsgpackRpc.MessageType.request.rawValue),
.uint(UInt64(0)),
.string("nvim_get_api_info"),
.array([]),
]
)
try inPipe.fileHandleForWriting.write(contentsOf: packed)
// Read responses from the pipe back
var accumulatedData : Data = Data()
var values : [MessagePackValue] = []
var remainderData: Data? = nil
while (true) {
let data = outPipe.fileHandleForReading.availableData
if data.count == 0 {
break
}
accumulatedData.append(data)
try (values, remainderData) = RxMsgpackRpc.unpackAllWithReminder(accumulatedData)
if let remainderData { accumulatedData = remainderData }
else { accumulatedData.count = 0 }
if values.count > 0 {
break
}
}
// Validate version response
guard values.count >= 1,
let firstResponse = values[0].arrayValue,
firstResponse.count == 4,
let rawType = firstResponse[0].uint64Value,
let type = RxMsgpackRpc.MessageType(rawValue: rawType),
type == RxMsgpackRpc.MessageType.response /* this is a response */,
let msgId = firstResponse[1].uint64Value,
msgId == 0 /* no confusion on stream */,
firstResponse[2] == nil /* no error */,
let info = firstResponse[3].arrayValue /* response value */,
info.count == 2,
let dict = info[1].dictionaryValue,
let version = dict["version"]?.dictionaryValue,
let major = version["major"]?.intValue,
let minor = version["minor"]?.intValue
else {
throw RxNeovimApi.Error
.exception(message: "Could not convert values to api info.")
}
guard (major >= kMinAlphaVersion && minor >= kMinMinorVersion) || major >= kMinMajorVersion
else {
throw RxNeovimApi.Error
.exception(message: "Incompatible neovim version.")
}
} }
private func interactive(for shell: URL) -> Bool { private func interactive(for shell: URL) -> Bool {

View File

@ -4,6 +4,7 @@ import PackageDescription
let package = Package( let package = Package(
name: "RxPack", name: "RxPack",
platforms: [.macOS(.v13)],
products: [ products: [
.library(name: "RxPack", targets: ["RxPack"]), .library(name: "RxPack", targets: ["RxPack"]),
.library(name: "RxNeovim", targets: ["RxNeovim"]), .library(name: "RxNeovim", targets: ["RxNeovim"]),
@ -11,14 +12,12 @@ let package = Package(
dependencies: [ dependencies: [
.package(url: "https://github.com/ReactiveX/RxSwift", from: "6.6.0"), .package(url: "https://github.com/ReactiveX/RxSwift", from: "6.6.0"),
.package(url: "https://github.com/a2/MessagePack.swift", .upToNextMinor(from: "4.0.0")), .package(url: "https://github.com/a2/MessagePack.swift", .upToNextMinor(from: "4.0.0")),
.package(url: "https://github.com/IBM-Swift/BlueSocket", from: "2.0.2"),
.package(url: "https://github.com/Quick/Nimble", from: "13.0.0"), .package(url: "https://github.com/Quick/Nimble", from: "13.0.0"),
], ],
targets: [ targets: [
.target(name: "RxPack", dependencies: [ .target(name: "RxPack", dependencies: [
.product(name: "RxSwift", package: "RxSwift"), .product(name: "RxSwift", package: "RxSwift"),
.product(name: "MessagePack", package: "MessagePack.swift"), .product(name: "MessagePack", package: "MessagePack.swift"),
.product(name: "Socket", package: "BlueSocket"),
]), ]),
.testTarget(name: "RxPackTests", dependencies: [ .testTarget(name: "RxPackTests", dependencies: [
"RxPack", "RxPack",

View File

@ -34,7 +34,9 @@ public final class RxNeovimApi {
public var msgpackRawStream: Observable<RxMsgpackRpc.Message> { self.msgpackRpc.stream } public var msgpackRawStream: Observable<RxMsgpackRpc.Message> { self.msgpackRpc.stream }
public func run(at path: String) -> Completable { self.msgpackRpc.run(at: path) } public func run(inPipe: Pipe, outPipe: Pipe, errorPipe: Pipe) -> Completable {
self.msgpackRpc.run(inPipe: inPipe, outPipe: outPipe, errorPipe: errorPipe)
}
public func stop() -> Completable { self.msgpackRpc.stop() } public func stop() -> Completable { self.msgpackRpc.stop() }
@ -64,6 +66,10 @@ public final class RxNeovimApi {
} }
} }
public func sendResponse(msgid: UInt32, error: Value, result: Value) -> Completable {
self.msgpackRpc.response(msgid: msgid, error: error, result: result)
}
public init() {} public init() {}
private let msgpackRpc = RxMsgpackRpc(queueQos: .userInteractive) private let msgpackRpc = RxMsgpackRpc(queueQos: .userInteractive)

View File

@ -4,7 +4,6 @@
import Foundation import Foundation
import MessagePack import MessagePack
import RxSwift import RxSwift
import Socket
public final class RxMsgpackRpc { public final class RxMsgpackRpc {
public static let defaultReadBufferSize = 10240 public static let defaultReadBufferSize = 10240
@ -21,6 +20,7 @@ public final class RxMsgpackRpc {
case response(msgid: UInt32, error: Value, result: Value) case response(msgid: UInt32, error: Value, result: Value)
case notification(method: String, params: [Value]) case notification(method: String, params: [Value])
case error(value: Value, msg: String) case error(value: Value, msg: String)
case request(msgid: UInt32, method: String, params: [Value])
} }
public struct Response { public struct Response {
@ -67,23 +67,14 @@ public final class RxMsgpackRpc {
) )
} }
public func run( public func run(inPipe: Pipe, outPipe: Pipe, errorPipe: Pipe) -> Completable {
at path: String, self.inPipe = inPipe
readBufferSize: Int = RxMsgpackRpc.defaultReadBufferSize self.outPipe = outPipe
) -> Completable { self.errorPipe = errorPipe
Completable.create { completable in
self.queue.async {
do {
try self.socket = Socket.create(family: .unix, type: .stream, proto: .unix)
self.socket?.readBufferSize = readBufferSize
try self.socket?.connect(to: path)
self.setUpThreadAndStartReading()
} catch {
self.socket = nil
self.streamSubject.onError(Error(msg: "Could not get socket", cause: error))
completable(.error(Error(msg: "Could not get socket at \(path)", cause: error)))
}
return Completable.create { completable in
self.queue.async {
self.startReading()
completable(.completed) completable(.completed)
} }
@ -94,7 +85,7 @@ public final class RxMsgpackRpc {
public func stop() -> Completable { public func stop() -> Completable {
Completable.create { completable in Completable.create { completable in
self.queue.async { self.queue.async {
self.cleanUpAndCloseSocket() self.cleanUp()
completable(.completed) completable(.completed)
} }
@ -102,6 +93,38 @@ public final class RxMsgpackRpc {
} }
} }
public func response(msgid: UInt32, error: Value, result: Value) -> Completable {
Completable.create { completable in
self.queue.async {
let packed = pack(
[
.uint(MessageType.response.rawValue),
.uint(UInt64(msgid)),
error,
result,
]
)
do {
try self.inPipe?.fileHandleForWriting.write(contentsOf: packed)
completable(.completed)
} catch {
self.streamSubject.onError(Error(
msg: "Could not write to socket for msg id: \(msgid)", cause: error
))
completable(.error(Error(
msg: "Could not write to socket for msg id: \(msgid)", cause: error
)))
return
}
}
return Disposables.create()
}
}
public func request( public func request(
method: String, method: String,
params: [Value], params: [Value],
@ -121,33 +144,10 @@ public final class RxMsgpackRpc {
] ]
) )
if self.socket?.remoteConnectionClosed == true {
single(.failure(Error(
msg: "Connection stopped, but trying to send a request with msg id \(msgid)"
)))
return
}
guard let socket = self.socket else {
single(.failure(Error(
msg: "Socket is invalid, but trying to send a request with " +
"msg id \(msgid): \(method) with \(params)"
)))
return
}
if expectsReturnValue { self.singles[msgid] = single } if expectsReturnValue { self.singles[msgid] = single }
do { do {
var remainder: Data? = packed try self.inPipe?.fileHandleForWriting.write(contentsOf: packed)
while let dataToSend = remainder {
let writtenBytes = try socket.write(from: dataToSend)
if writtenBytes < dataToSend.count {
remainder = packed.suffix(from: writtenBytes)
} else {
remainder = nil
}
}
} catch { } catch {
self.streamSubject.onError(Error( self.streamSubject.onError(Error(
msg: "Could not write to socket for msg id: \(msgid)", cause: error msg: "Could not write to socket for msg id: \(msgid)", cause: error
@ -169,10 +169,13 @@ public final class RxMsgpackRpc {
private var nextMsgid: UInt32 = 0 private var nextMsgid: UInt32 = 0
private var socket: Socket?
private let queue: DispatchQueue private let queue: DispatchQueue
private let dataQueue: DispatchQueue private let dataQueue: DispatchQueue
private var inPipe: Pipe?
private var outPipe: Pipe?
private var errorPipe: Pipe?
private var singles: [UInt32: SingleResponseObserver] = [:] private var singles: [UInt32: SingleResponseObserver] = [:]
private let streamSubject = PublishSubject<Message>() private let streamSubject = PublishSubject<Message>()
@ -181,52 +184,40 @@ public final class RxMsgpackRpc {
Response(msgid: msgid, error: .nil, result: .nil) Response(msgid: msgid, error: .nil, result: .nil)
} }
private func cleanUpAndCloseSocket() { private func cleanUp() {
self.inPipe = nil
self.outPipe = nil
self.errorPipe = nil
self.streamSubject.onCompleted() self.streamSubject.onCompleted()
self.singles.forEach { _, single in single(.failure(Error(msg: "Socket closed"))) } self.singles.forEach { _, single in single(.failure(Error(msg: "Socket closed"))) }
self.singles.removeAll()
self.socket?.close()
} }
private func setUpThreadAndStartReading() { private func startReading() {
self.dataQueue.async { [unowned self] in self.dataQueue.async { [unowned self] in
guard let socket = self.socket else { return } var readData: Data
var dataToUnmarshall = Data(capacity: Self.defaultReadBufferSize) var dataToUnmarshall = Data(capacity: Self.defaultReadBufferSize)
repeat { repeat {
do { do {
var readData = Data(capacity: Self.defaultReadBufferSize) guard let buffer = self.outPipe?.fileHandleForReading.availableData else { break }
let readBytes = try socket.read(into: &readData) readData = buffer
if readBytes > 0 { if readData.count > 0 {
dataToUnmarshall.append(readData) dataToUnmarshall.append(readData)
let (values, remainderData) = try RxMsgpackRpc.unpackAllWithReminder(dataToUnmarshall) let (values, remainderData) = try self.unpackAllWithReminder(dataToUnmarshall)
if let remainderData { dataToUnmarshall = remainderData } if let remainderData { dataToUnmarshall = remainderData }
else { dataToUnmarshall.count = 0 } else { dataToUnmarshall.count = 0 }
values.forEach(self.processMessage) values.forEach(self.processMessage)
} else if readBytes == 0 {
if socket.remoteConnectionClosed {
self.queue.async { self.cleanUpAndCloseSocket() }
return
}
continue
} }
} catch let error as Socket.Error {
self.streamSubject.onError(Error(msg: "Could not read from socket", cause: error))
self.queue.async { self.cleanUpAndCloseSocket() }
return
} catch { } catch {
self.streamSubject.onNext( self.streamSubject.onError(Error(msg: "Could not read from pipe", cause: error))
.error(value: .nil, msg: "Data from socket could not be unpacked")
)
self.queue.async { self.cleanUpAndCloseSocket() }
return
} }
} while self.socket?.remoteConnectionClosed == false } while readData.count > 0
self.streamSubject.onNext(.notification(method: "autocommand", params: ["exitpre"]))
self.cleanUp()
} }
} }
@ -286,15 +277,20 @@ public final class RxMsgpackRpc {
self.streamSubject.onNext(.notification(method: method, params: params)) self.streamSubject.onNext(.notification(method: method, params: params))
case .request: case .request:
self.streamSubject.onNext(.error( guard let msgid = array[1].uint32Value, let method = array[2].stringValue,
value: unpacked, let params = array[3].arrayValue
msg: "Got message type request from remote" else {
)) return
}
self.streamSubject.onNext(.request(msgid: msgid, method: method, params: params))
return return
} }
} }
public static func unpackAllWithReminder(_ data: Data) throws -> (values: [Value], remainder: Data?) { private func unpackAllWithReminder(_ data: Data) throws
-> (values: [Value], remainder: Data?)
{
var values = [Value]() var values = [Value]()
var remainderData: Data? var remainderData: Data?

View File

@ -1,14 +1,5 @@
{ {
"pins" : [ "pins" : [
{
"identity" : "bluesocket",
"kind" : "remoteSourceControl",
"location" : "https://github.com/IBM-Swift/BlueSocket",
"state" : {
"revision" : "7b23a867008e0027bfd6f4d398d44720707bc8ca",
"version" : "2.0.4"
}
},
{ {
"identity" : "cwlcatchexception", "identity" : "cwlcatchexception",
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",

View File

@ -13,7 +13,6 @@ import Workspace
extension MainWindow { extension MainWindow {
func rpcEventAction(params rawParams: [MessagePackValue]) { func rpcEventAction(params rawParams: [MessagePackValue]) {
Swift.print("################### \(rawParams)")
guard rawParams.count > 0 else { return } guard rawParams.count > 0 else { return }
guard let strEvent = rawParams[0].stringValue, guard let strEvent = rawParams[0].stringValue,

View File

@ -21,11 +21,13 @@ main() {
if [[ "${for_dev}" == true ]]; then if [[ "${for_dev}" == true ]]; then
pushd ./Neovim >/dev/null pushd ./Neovim >/dev/null
make CMAKE_BUILD_TYPE=Release mkdir -p ./build/install
make CMAKE_BUILD_TYPE=Release CMAKE_EXTRA_FLAGS="-DCMAKE_INSTALL_PREFIX=./install"
make install
popd >/dev/null popd >/dev/null
cp ./Neovim/build/bin/nvim "${resources_folder}/NvimServer" cp ./Neovim/build/install/bin/nvim "${resources_folder}/NvimServer"
cp -r ./Neovim/build/runtime "${resources_folder}" cp -r ./Neovim/build/install/share/nvim/runtime "${resources_folder}"
else else
./bin/neovim/bin/build_neovim.sh ./bin/neovim/bin/build_neovim.sh
pushd ./Neovim/build >/dev/null pushd ./Neovim/build >/dev/null