1
1
mirror of https://github.com/wez/wezterm.git synced 2024-09-17 17:57:28 +03:00

ssh: introduce wrapper enum

This is a pre-cursor step to adding libssh support
This commit is contained in:
Wez Furlong 2021-10-12 20:50:00 -07:00
parent 24875004f6
commit f1e5c59566
6 changed files with 393 additions and 203 deletions

97
Cargo.lock generated
View File

@ -31,9 +31,9 @@ checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e"
[[package]]
name = "ahash"
version = "0.7.4"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom 0.2.3",
"once_cell",
@ -703,7 +703,7 @@ checksum = "7ade49b65d560ca58c403a479bb396592b155c0185eada742ee323d1d68d6318"
dependencies = [
"bitflags",
"block",
"core-foundation 0.9.1",
"core-foundation 0.9.2",
"core-graphics-types",
"foreign-types",
"libc",
@ -843,11 +843,11 @@ dependencies = [
[[package]]
name = "core-foundation"
version = "0.9.1"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62"
checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3"
dependencies = [
"core-foundation-sys 0.8.2",
"core-foundation-sys 0.8.3",
"libc",
]
@ -859,9 +859,9 @@ checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
[[package]]
name = "core-foundation-sys"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "core-graphics"
@ -882,7 +882,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "269f35f69b542b80e736a20a89a05215c0ce80c2c03c514abb2e318b78379d86"
dependencies = [
"bitflags",
"core-foundation 0.9.1",
"core-foundation 0.9.2",
"core-graphics-types",
"foreign-types",
"libc",
@ -895,7 +895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a68b68b3446082644c91ac778bf50cd4104bfb002b5a6a7c44cca5a2c70788b"
dependencies = [
"bitflags",
"core-foundation 0.9.1",
"core-foundation 0.9.2",
"foreign-types",
"libc",
]
@ -906,7 +906,7 @@ version = "19.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d74ada66e07c1cefa18f8abfba765b486f250de2e4a999e5727fc0dd4b4a25"
dependencies = [
"core-foundation 0.9.1",
"core-foundation 0.9.2",
"core-graphics 0.22.2",
"foreign-types",
"libc",
@ -1269,9 +1269,9 @@ checksum = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569"
[[package]]
name = "encoding_rs"
version = "0.8.28"
version = "0.8.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065"
checksum = "a74ea89a0a1b98f6332de42c95baff457ada66d1cb4030f9ff151b2041a1c746"
dependencies = [
"cfg-if 1.0.0",
]
@ -1887,9 +1887,9 @@ dependencies = [
[[package]]
name = "gif"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a668f699973d0f573d15749b7002a9ac9e1f9c6b220e7b165601334c173d8de"
checksum = "c3a7187e78088aead22ceedeee99779455b23fc231fe13ec443f99bb71694e5b"
dependencies = [
"color_quant",
"weezl",
@ -2059,7 +2059,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash 0.7.4",
"ahash 0.7.6",
]
[[package]]
@ -2386,9 +2386,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.103"
version = "0.2.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
[[package]]
name = "libloading"
@ -2410,6 +2410,27 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "libssh-rs"
version = "0.1.0"
source = "git+https://github.com/wez/libssh-rs.git?rev=3b74ea94143725f3411fa649d7f6f6b5178d4b1b#3b74ea94143725f3411fa649d7f6f6b5178d4b1b"
dependencies = [
"bitflags",
"libssh-rs-sys",
"thiserror",
]
[[package]]
name = "libssh-rs-sys"
version = "0.1.0"
source = "git+https://github.com/wez/libssh-rs.git?rev=3b74ea94143725f3411fa649d7f6f6b5178d4b1b#3b74ea94143725f3411fa649d7f6f6b5178d4b1b"
dependencies = [
"cc",
"libz-sys",
"openssl-sys",
"pkg-config",
]
[[package]]
name = "libssh2-sys"
version = "0.2.23"
@ -2615,7 +2636,7 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a00f42f354a2ed4894db863b3a4db47aef2d2e4435b937221749bd37a8a7aaa8"
dependencies = [
"ahash 0.7.4",
"ahash 0.7.6",
"metrics-macros",
"proc-macro-hack",
]
@ -3119,9 +3140,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
[[package]]
name = "openssl-src"
version = "111.16.0+1.1.1l"
version = "300.0.2+3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab2173f69416cf3ec12debb5823d244127d23a9b127d5a5189aa97c5fa2859f"
checksum = "14a760a11390b1a5daf72074d4f6ff1a6e772534ae191f999f57e9ee8146d1fb"
dependencies = [
"cc",
]
@ -3129,8 +3150,7 @@ dependencies = [
[[package]]
name = "openssl-sys"
version = "0.9.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058"
source = "git+https://github.com/wez/rust-openssl.git?branch=openssl-src-300#173970ddebf98bdaf8c28481a84092c85a3a9376"
dependencies = [
"autocfg",
"cc",
@ -3247,9 +3267,9 @@ checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd"
[[package]]
name = "pem"
version = "0.8.3"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb"
checksum = "3f2373df5233932a893d3bc2c78a0bf3f6d12590a1edd546b4fbefcac32c5c0f"
dependencies = [
"base64",
"once_cell",
@ -3468,9 +3488,9 @@ dependencies = [
[[package]]
name = "ppv-lite86"
version = "0.2.10"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741"
[[package]]
name = "predicates"
@ -3572,9 +3592,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.29"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d"
checksum = "edc3358ebc67bc8b7fa0c007f945b0b18226f78437d61bec735a9eb96b61ee70"
dependencies = [
"unicode-xid",
]
@ -3823,9 +3843,9 @@ dependencies = [
[[package]]
name = "rcgen"
version = "0.8.13"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2351cbef4bf91837f5ff7face6091cb277ba960d1638d2c5ae2327859912fbba"
checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7"
dependencies = [
"chrono",
"pem",
@ -4070,8 +4090,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87"
dependencies = [
"bitflags",
"core-foundation 0.9.1",
"core-foundation-sys 0.8.2",
"core-foundation 0.9.2",
"core-foundation-sys 0.8.3",
"libc",
"security-framework-sys",
]
@ -4082,7 +4102,7 @@ version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e"
dependencies = [
"core-foundation-sys 0.8.2",
"core-foundation-sys 0.8.3",
"libc",
]
@ -4293,9 +4313,9 @@ checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b"
[[package]]
name = "slab"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "slotmap"
@ -4536,7 +4556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567e910ef0207be81a4e1bb0491e9a8d9866cf45b20fe1a52c03d347da9ea51b"
dependencies = [
"cfg-if 1.0.0",
"core-foundation-sys 0.8.2",
"core-foundation-sys 0.8.3",
"doc-comment",
"libc",
"ntapi",
@ -5280,7 +5300,7 @@ dependencies = [
"anyhow",
"color-types",
"config",
"core-foundation 0.9.1",
"core-foundation 0.9.2",
"core-text",
"dwrote",
"enum-display-derive",
@ -5459,6 +5479,7 @@ dependencies = [
"filenamegen",
"indoc",
"k9",
"libssh-rs",
"log",
"once_cell",
"portable-pty",

View File

@ -12,6 +12,10 @@ resolver = "2"
opt-level = 3
# debug = 1
[patch.crates-io]
# We can remove my fork once openssl-sys points at openssl-src 300+
openssl-sys = { git="https://github.com/wez/rust-openssl.git", branch="openssl-src-300" }
[profile.dev]
# https://jakedeichert.com/blog/reducing-rust-incremental-compilation-times-on-macos-by-70-percent/
split-debuginfo = "unpacked"

View File

@ -26,6 +26,7 @@ portable-pty = { version="0.5", path = "../pty" }
regex = "1"
smol = "1.2"
ssh2 = {version="0.9.3", features=["openssl-on-win32"]}
libssh-rs = {git="https://github.com/wez/libssh-rs.git", rev="3b74ea94143725f3411fa649d7f6f6b5178d4b1b", features=["vendored", "vendored-openssl"]}
thiserror = "1.0"
# Not used directly, but is used to centralize the openssl vendor feature selection

View File

@ -1,4 +1,6 @@
use crate::session::{ChannelId, ChannelInfo, DescriptorState, SessionRequest, SessionSender};
use crate::session::{
ChannelId, ChannelInfo, DescriptorState, SessionRequest, SessionSender, SessionWrap,
};
use filedescriptor::{socketpair, FileDescriptor};
use portable_pty::{ExitStatus, PtySize};
use smol::channel::{bounded, Receiver, Sender, TryRecvError};
@ -158,12 +160,10 @@ impl portable_pty::Child for SshChildProcess {
}
impl crate::session::SessionInner {
pub fn new_pty(&mut self, sess: &ssh2::Session, newpty: &NewPty) -> anyhow::Result<()> {
pub fn new_pty(&mut self, sess: &mut SessionWrap, newpty: &NewPty) -> anyhow::Result<()> {
sess.set_blocking(true);
let mut channel = sess.channel_session()?;
channel.handle_extended_data(ssh2::ExtendedData::Merge)?;
let mut channel = sess.open_session()?;
/* libssh2 doesn't properly support agent forwarding
* at this time:
@ -177,20 +177,11 @@ impl crate::session::SessionInner {
}
*/
channel.request_pty(
&newpty.term,
None,
Some((
newpty.size.cols.into(),
newpty.size.rows.into(),
newpty.size.pixel_width.into(),
newpty.size.pixel_height.into(),
)),
)?;
channel.request_pty(newpty)?;
if let Some(env) = &newpty.env {
for (key, val) in env {
if let Err(err) = channel.setenv(key, val) {
if let Err(err) = channel.request_env(key, val) {
// Depending on the server configuration, a given
// setenv request may not succeed, but that doesn't
// prevent the connection from being set up.
@ -200,9 +191,9 @@ impl crate::session::SessionInner {
}
if let Some(cmd) = &newpty.command_line {
channel.exec(cmd)?;
channel.request_exec(cmd)?;
} else {
channel.shell()?;
channel.request_shell()?;
}
let channel_id = self.next_channel_id;
@ -257,19 +248,12 @@ impl crate::session::SessionInner {
Ok(())
}
pub fn resize_pty(&mut self, sess: &ssh2::Session, resize: &ResizePty) -> anyhow::Result<()> {
sess.set_blocking(true);
pub fn resize_pty(&mut self, resize: &ResizePty) -> anyhow::Result<()> {
let info = self
.channels
.get_mut(&resize.channel)
.ok_or_else(|| anyhow::anyhow!("invalid channel id {}", resize.channel))?;
info.channel.request_pty_size(
resize.size.cols.into(),
resize.size.rows.into(),
Some(resize.size.pixel_width.into()),
Some(resize.size.pixel_height.into()),
)?;
info.channel.resize_pty(resize)?;
if let Some(reply) = resize.reply.as_ref() {
reply.try_send(())?;
}

View File

@ -5,14 +5,15 @@ use crate::pty::*;
use anyhow::{anyhow, Context};
use camino::Utf8PathBuf;
use filedescriptor::{
poll, pollfd, socketpair, AsRawSocketDescriptor, FileDescriptor, POLLIN, POLLOUT,
poll, pollfd, socketpair, AsRawSocketDescriptor, FileDescriptor, SocketDescriptor, POLLIN,
POLLOUT,
};
use portable_pty::{ExitStatus, PtySize};
use smol::channel::{bounded, Receiver, Sender, TryRecvError};
use ssh2::BlockDirections;
use std::collections::{HashMap, VecDeque};
use std::convert::TryFrom;
use std::io::{self, Read, Write};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@ -78,9 +79,218 @@ pub(crate) struct DescriptorState {
pub buf: VecDeque<u8>,
}
pub(crate) enum FileWrap {
Ssh2(ssh2::File),
}
impl FileWrap {
pub fn reader(&mut self) -> impl std::io::Read + '_ {
match self {
Self::Ssh2(file) => file,
}
}
pub fn writer(&mut self) -> impl std::io::Write + '_ {
match self {
Self::Ssh2(file) => file,
}
}
pub fn set_metadata(&mut self, metadata: Metadata) -> SftpChannelResult<()> {
match self {
Self::Ssh2(file) => file
.setstat(metadata.into())
.map_err(SftpChannelError::from),
}
}
pub fn metadata(&mut self) -> SftpChannelResult<Metadata> {
match self {
Self::Ssh2(file) => file
.stat()
.map(Metadata::from)
.map_err(SftpChannelError::from),
}
}
pub fn read_dir(&mut self) -> SftpChannelResult<(Utf8PathBuf, Metadata)> {
match self {
Self::Ssh2(file) => {
file.readdir()
.map_err(SftpChannelError::from)
.and_then(|(path, stat)| match Utf8PathBuf::try_from(path) {
Ok(path) => Ok((path, Metadata::from(stat))),
Err(x) => Err(SftpChannelError::from(std::io::Error::new(
std::io::ErrorKind::InvalidData,
x,
))),
})
}
}
}
pub fn fsync(&mut self) -> SftpChannelResult<()> {
match self {
Self::Ssh2(file) => file.fsync().map_err(SftpChannelError::from),
}
}
}
pub(crate) struct Ssh2Session {
sess: ssh2::Session,
sftp: Option<ssh2::Sftp>,
}
pub(crate) enum SessionWrap {
Ssh2(Ssh2Session),
}
impl SessionWrap {
pub fn with_ssh2(sess: ssh2::Session) -> Self {
Self::Ssh2(Ssh2Session { sess, sftp: None })
}
pub fn set_blocking(&mut self, blocking: bool) {
match self {
Self::Ssh2(sess) => sess.sess.set_blocking(blocking),
}
}
pub fn is_blocking(&mut self) -> bool {
match self {
Self::Ssh2(sess) => sess.sess.is_blocking(),
}
}
pub fn get_poll_flags(&self) -> i16 {
match self {
Self::Ssh2(sess) => match sess.sess.block_directions() {
BlockDirections::None => 0,
BlockDirections::Inbound => POLLIN,
BlockDirections::Outbound => POLLOUT,
BlockDirections::Both => POLLIN | POLLOUT,
},
}
}
pub fn as_socket_descriptor(&self) -> SocketDescriptor {
match self {
Self::Ssh2(sess) => sess.sess.as_socket_descriptor(),
}
}
pub fn open_session(&self) -> anyhow::Result<ChannelWrap> {
match self {
Self::Ssh2(sess) => {
let channel = sess.sess.channel_session()?;
// FIXME: remove this concept
// channel.handle_extended_data(ssh2::ExtendedData::Merge)?;
Ok(ChannelWrap::Ssh2(channel))
}
}
}
}
pub(crate) enum ChannelWrap {
Ssh2(ssh2::Channel),
}
fn has_signal(chan: &ssh2::Channel) -> Option<ssh2::ExitSignal> {
if let Ok(sig) = chan.exit_signal() {
if sig.exit_signal.is_some() {
return Some(sig);
}
}
None
}
impl ChannelWrap {
pub fn exit_status(&mut self) -> Option<ExitStatus> {
match self {
Self::Ssh2(chan) => {
if chan.eof() && chan.wait_close().is_ok() {
if let Some(_sig) = has_signal(chan) {
Some(ExitStatus::with_exit_code(1))
} else if let Ok(status) = chan.exit_status() {
Some(ExitStatus::with_exit_code(status as _))
} else {
None
}
} else {
None
}
}
}
}
pub fn reader(&mut self, idx: usize) -> impl std::io::Read + '_ {
match self {
Self::Ssh2(chan) => chan.stream(idx as i32),
}
}
pub fn writer(&mut self) -> impl std::io::Write + '_ {
match self {
Self::Ssh2(chan) => chan,
}
}
pub fn close(&mut self) {
match self {
Self::Ssh2(chan) => {
let _ = chan.close();
}
}
}
pub fn request_pty(&mut self, newpty: &NewPty) -> anyhow::Result<()> {
match self {
Self::Ssh2(chan) => Ok(chan.request_pty(
&newpty.term,
None,
Some((
newpty.size.cols.into(),
newpty.size.rows.into(),
newpty.size.pixel_width.into(),
newpty.size.pixel_height.into(),
)),
)?),
}
}
pub fn request_env(&mut self, name: &str, value: &str) -> anyhow::Result<()> {
match self {
Self::Ssh2(chan) => Ok(chan.setenv(name, value)?),
}
}
pub fn request_exec(&mut self, command_line: &str) -> anyhow::Result<()> {
match self {
Self::Ssh2(chan) => Ok(chan.exec(command_line)?),
}
}
pub fn request_shell(&mut self) -> anyhow::Result<()> {
match self {
Self::Ssh2(chan) => Ok(chan.shell()?),
}
}
pub fn resize_pty(&mut self, resize: &ResizePty) -> anyhow::Result<()> {
match self {
Self::Ssh2(chan) => Ok(chan.request_pty_size(
resize.size.cols.into(),
resize.size.rows.into(),
Some(resize.size.pixel_width.into()),
Some(resize.size.pixel_height.into()),
)?),
}
}
}
pub(crate) struct ChannelInfo {
pub channel_id: ChannelId,
pub channel: ssh2::Channel,
pub channel: ChannelWrap,
pub exit: Option<Sender<ExitStatus>>,
pub descriptors: [DescriptorState; 3],
}
@ -92,8 +302,7 @@ pub(crate) struct SessionInner {
pub tx_event: Sender<SessionEvent>,
pub rx_req: Receiver<SessionRequest>,
pub channels: HashMap<ChannelId, ChannelInfo>,
pub files: HashMap<FileId, ssh2::File>,
pub sftp: Option<ssh2::Sftp>,
pub files: HashMap<FileId, FileWrap>,
pub next_channel_id: ChannelId,
pub next_file_id: FileId,
pub sender_read: FileDescriptor,
@ -200,16 +409,16 @@ impl SessionInner {
.context("notifying user that session is authenticated")?;
sess.set_blocking(false);
self.request_loop(sess)
self.request_loop(SessionWrap::with_ssh2(sess))
}
fn request_loop(&mut self, sess: ssh2::Session) -> anyhow::Result<()> {
fn request_loop(&mut self, mut sess: SessionWrap) -> anyhow::Result<()> {
let mut sleep_delay = Duration::from_millis(100);
loop {
self.tick_io()?;
self.drain_request_pipe();
self.dispatch_pending_requests(&sess)?;
self.dispatch_pending_requests(&mut sess)?;
let mut poll_array = vec![
pollfd {
@ -219,12 +428,7 @@ impl SessionInner {
},
pollfd {
fd: sess.as_socket_descriptor(),
events: match sess.block_directions() {
BlockDirections::None => 0,
BlockDirections::Inbound => POLLIN,
BlockDirections::Outbound => POLLOUT,
BlockDirections::Both => POLLIN | POLLOUT,
},
events: sess.get_poll_flags(),
revents: 0,
},
];
@ -270,7 +474,7 @@ impl SessionInner {
Ok(_) => {}
Err(err) => {
log::debug!("error reading from stdin pipe: {:#}", err);
let _ = info.channel.close();
info.channel.close();
state.fd.take();
}
}
@ -301,34 +505,16 @@ impl SessionInner {
fn tick_io(&mut self) -> anyhow::Result<()> {
for chan in self.channels.values_mut() {
if chan.exit.is_some() {
if chan.channel.eof() && chan.channel.wait_close().is_ok() {
fn has_signal(chan: &ssh2::Channel) -> Option<ssh2::ExitSignal> {
if let Ok(sig) = chan.exit_signal() {
if sig.exit_signal.is_some() {
return Some(sig);
}
}
None
}
let status = if let Some(_sig) = has_signal(&chan.channel) {
Some(ExitStatus::with_exit_code(1))
} else if let Ok(status) = chan.channel.exit_status() {
Some(ExitStatus::with_exit_code(status as _))
} else {
None
};
if let Some(status) = status {
let exit = chan.exit.take().unwrap();
smol::block_on(exit.send(status)).ok();
}
if let Some(status) = chan.channel.exit_status() {
let exit = chan.exit.take().unwrap();
smol::block_on(exit.send(status)).ok();
}
}
let stdin = &mut chan.descriptors[0];
if stdin.fd.is_some() && !stdin.buf.is_empty() {
write_from_buf(&mut chan.channel, &mut stdin.buf).context("writing to channel")?;
write_from_buf(&mut chan.channel.writer(), &mut stdin.buf)
.context("writing to channel")?;
}
for (idx, out) in chan
@ -346,7 +532,7 @@ impl SessionInner {
if room == 0 {
continue;
}
match read_into_buf(&mut chan.channel.stream(idx as i32), &mut out.buf) {
match read_into_buf(&mut chan.channel.reader(idx), &mut out.buf) {
Ok(_) => {}
Err(err) => {
if out.buf.is_empty() {
@ -370,12 +556,12 @@ impl SessionInner {
let _ = self.sender_read.read(&mut buf);
}
fn dispatch_pending_requests(&mut self, sess: &ssh2::Session) -> anyhow::Result<()> {
fn dispatch_pending_requests(&mut self, sess: &mut SessionWrap) -> anyhow::Result<()> {
while self.dispatch_one_request(sess)? {}
Ok(())
}
fn dispatch_one_request(&mut self, sess: &ssh2::Session) -> anyhow::Result<bool> {
fn dispatch_one_request(&mut self, sess: &mut SessionWrap) -> anyhow::Result<bool> {
match self.rx_req.try_recv() {
Err(TryRecvError::Closed) => anyhow::bail!("all clients are closed"),
Err(TryRecvError::Empty) => Ok(false),
@ -383,157 +569,157 @@ impl SessionInner {
sess.set_blocking(true);
let res = match req {
SessionRequest::NewPty(newpty) => {
if let Err(err) = self.new_pty(&sess, &newpty) {
if let Err(err) = self.new_pty(sess, &newpty) {
log::error!("{:?} -> error: {:#}", newpty, err);
}
Ok(true)
}
SessionRequest::ResizePty(resize) => {
if let Err(err) = self.resize_pty(&sess, &resize) {
if let Err(err) = self.resize_pty(&resize) {
log::error!("{:?} -> error: {:#}", resize, err);
}
Ok(true)
}
SessionRequest::Exec(exec) => {
if let Err(err) = self.exec(&sess, &exec) {
if let Err(err) = self.exec(sess, &exec) {
log::error!("{:?} -> error: {:#}", exec, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::OpenWithMode(msg)) => {
if let Err(err) = self.open_with_mode(&sess, &msg) {
if let Err(err) = self.open_with_mode(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Open(msg)) => {
if let Err(err) = self.open(&sess, &msg) {
if let Err(err) = self.open(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Create(msg)) => {
if let Err(err) = self.create(&sess, &msg) {
if let Err(err) = self.create(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::OpenDir(msg)) => {
if let Err(err) = self.open_dir(&sess, &msg) {
if let Err(err) = self.open_dir(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Write(msg))) => {
if let Err(err) = self.write_file(&sess, &msg) {
if let Err(err) = self.write_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Read(msg))) => {
if let Err(err) = self.read_file(&sess, &msg) {
if let Err(err) = self.read_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Close(msg))) => {
if let Err(err) = self.close_file(&sess, &msg) {
if let Err(err) = self.close_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Flush(msg))) => {
if let Err(err) = self.flush_file(&sess, &msg) {
if let Err(err) = self.flush_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::SetMetadata(msg))) => {
if let Err(err) = self.set_metadata_file(&sess, &msg) {
if let Err(err) = self.set_metadata_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Metadata(msg))) => {
if let Err(err) = self.metadata_file(&sess, &msg) {
if let Err(err) = self.metadata_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::ReadDir(msg))) => {
if let Err(err) = self.read_dir_file(&sess, &msg) {
if let Err(err) = self.read_dir_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::File(FileRequest::Fsync(msg))) => {
if let Err(err) = self.fsync_file(&sess, &msg) {
if let Err(err) = self.fsync_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::ReadDir(msg)) => {
if let Err(err) = self.read_dir(&sess, &msg) {
if let Err(err) = self.read_dir(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::CreateDir(msg)) => {
if let Err(err) = self.create_dir(&sess, &msg) {
if let Err(err) = self.create_dir(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::RemoveDir(msg)) => {
if let Err(err) = self.remove_dir(&sess, &msg) {
if let Err(err) = self.remove_dir(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Metadata(msg)) => {
if let Err(err) = self.metadata(&sess, &msg) {
if let Err(err) = self.metadata(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::SymlinkMetadata(msg)) => {
if let Err(err) = self.symlink_metadata(&sess, &msg) {
if let Err(err) = self.symlink_metadata(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::SetMetadata(msg)) => {
if let Err(err) = self.set_metadata(&sess, &msg) {
if let Err(err) = self.set_metadata(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Symlink(msg)) => {
if let Err(err) = self.symlink(&sess, &msg) {
if let Err(err) = self.symlink(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::ReadLink(msg)) => {
if let Err(err) = self.read_link(&sess, &msg) {
if let Err(err) = self.read_link(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Canonicalize(msg)) => {
if let Err(err) = self.canonicalize(&sess, &msg) {
if let Err(err) = self.canonicalize(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::Rename(msg)) => {
if let Err(err) = self.rename(&sess, &msg) {
if let Err(err) = self.rename(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
}
SessionRequest::Sftp(SftpRequest::RemoveFile(msg)) => {
if let Err(err) = self.remove_file(&sess, &msg) {
if let Err(err) = self.remove_file(sess, &msg) {
log::error!("{:?} -> error: {:#}", msg, err);
}
Ok(true)
@ -545,14 +731,12 @@ impl SessionInner {
}
}
pub fn exec(&mut self, sess: &ssh2::Session, exec: &Exec) -> anyhow::Result<()> {
sess.set_blocking(true);
let mut channel = sess.channel_session()?;
pub fn exec(&mut self, sess: &mut SessionWrap, exec: &Exec) -> anyhow::Result<()> {
let mut channel = sess.open_session()?;
if let Some(env) = &exec.env {
for (key, val) in env {
if let Err(err) = channel.setenv(key, val) {
if let Err(err) = channel.request_env(key, val) {
// Depending on the server configuration, a given
// setenv request may not succeed, but that doesn't
// prevent the connection from being set up.
@ -567,7 +751,7 @@ impl SessionInner {
}
}
channel.exec(&exec.command_line)?;
channel.request_exec(&exec.command_line)?;
let channel_id = self.next_channel_id;
self.next_channel_id += 1;
@ -625,7 +809,7 @@ impl SessionInner {
/// Open a handle to a file.
pub fn open_with_mode(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::OpenWithMode,
) -> anyhow::Result<()> {
let flags: ssh2::OpenFlags = msg.opts.into();
@ -641,7 +825,7 @@ impl SessionInner {
Ok(ssh_file) => {
let (file_id, file) = self.make_file();
msg.reply.try_send(Ok(file))?;
self.files.insert(file_id, ssh_file);
self.files.insert(file_id, FileWrap::Ssh2(ssh_file));
}
Err(x) => msg.reply.try_send(Err(x))?,
}
@ -650,7 +834,7 @@ impl SessionInner {
}
/// Helper to open a file in the `Read` mode.
pub fn open(&mut self, sess: &ssh2::Session, msg: &sftp::Open) -> anyhow::Result<()> {
pub fn open(&mut self, sess: &mut SessionWrap, msg: &sftp::Open) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.open(msg.filename.as_std_path())
.map_err(SftpChannelError::from)
@ -660,7 +844,7 @@ impl SessionInner {
Ok(ssh_file) => {
let (file_id, file) = self.make_file();
msg.reply.try_send(Ok(file))?;
self.files.insert(file_id, ssh_file);
self.files.insert(file_id, FileWrap::Ssh2(ssh_file));
}
Err(x) => msg.reply.try_send(Err(x))?,
}
@ -669,7 +853,7 @@ impl SessionInner {
}
/// Helper to create a file in write-only mode with truncation.
pub fn create(&mut self, sess: &ssh2::Session, msg: &sftp::Create) -> anyhow::Result<()> {
pub fn create(&mut self, sess: &mut SessionWrap, msg: &sftp::Create) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.create(msg.filename.as_std_path())
.map_err(SftpChannelError::from)
@ -679,7 +863,7 @@ impl SessionInner {
Ok(ssh_file) => {
let (file_id, file) = self.make_file();
msg.reply.try_send(Ok(file))?;
self.files.insert(file_id, ssh_file);
self.files.insert(file_id, FileWrap::Ssh2(ssh_file));
}
Err(x) => msg.reply.try_send(Err(x))?,
}
@ -688,7 +872,7 @@ impl SessionInner {
}
/// Helper to open a directory for reading its contents.
pub fn open_dir(&mut self, sess: &ssh2::Session, msg: &sftp::OpenDir) -> anyhow::Result<()> {
pub fn open_dir(&mut self, sess: &mut SessionWrap, msg: &sftp::OpenDir) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.opendir(msg.filename.as_std_path())
.map_err(SftpChannelError::from)
@ -698,7 +882,7 @@ impl SessionInner {
Ok(ssh_file) => {
let (file_id, file) = self.make_file();
msg.reply.try_send(Ok(file))?;
self.files.insert(file_id, ssh_file);
self.files.insert(file_id, FileWrap::Ssh2(ssh_file));
}
Err(x) => msg.reply.try_send(Err(x))?,
}
@ -715,7 +899,7 @@ impl SessionInner {
}
/// Writes to a loaded file.
fn write_file(&mut self, _sess: &ssh2::Session, msg: &sftp::WriteFile) -> anyhow::Result<()> {
fn write_file(&mut self, _sess: &mut SessionWrap, msg: &sftp::WriteFile) -> anyhow::Result<()> {
let sftp::WriteFile {
file_id,
data,
@ -723,7 +907,10 @@ impl SessionInner {
} = msg;
if let Some(file) = self.files.get_mut(file_id) {
let result = file.write_all(data).map_err(SftpChannelError::from);
let result = file
.writer()
.write_all(data)
.map_err(SftpChannelError::from);
reply.try_send(result)?;
}
@ -731,7 +918,7 @@ impl SessionInner {
}
/// Reads from a loaded file.
fn read_file(&mut self, _sess: &ssh2::Session, msg: &sftp::ReadFile) -> anyhow::Result<()> {
fn read_file(&mut self, _sess: &mut SessionWrap, msg: &sftp::ReadFile) -> anyhow::Result<()> {
let sftp::ReadFile {
file_id,
max_bytes,
@ -741,7 +928,7 @@ impl SessionInner {
if let Some(file) = self.files.get_mut(file_id) {
// TODO: Move this somewhere to avoid re-allocating buffer
let mut buf = vec![0u8; *max_bytes];
match file.read(&mut buf).map_err(SftpChannelError::from) {
match file.reader().read(&mut buf).map_err(SftpChannelError::from) {
Ok(n) => {
buf.truncate(n);
reply.try_send(Ok(buf))?;
@ -754,7 +941,7 @@ impl SessionInner {
}
/// Closes a file and removes it from the internal memory.
fn close_file(&mut self, _sess: &ssh2::Session, msg: &sftp::CloseFile) -> anyhow::Result<()> {
fn close_file(&mut self, _sess: &mut SessionWrap, msg: &sftp::CloseFile) -> anyhow::Result<()> {
self.files.remove(&msg.file_id);
msg.reply.try_send(Ok(()))?;
@ -762,9 +949,9 @@ impl SessionInner {
}
/// Flushes a file.
fn flush_file(&mut self, _sess: &ssh2::Session, msg: &sftp::FlushFile) -> anyhow::Result<()> {
fn flush_file(&mut self, _sess: &mut SessionWrap, msg: &sftp::FlushFile) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&msg.file_id) {
let result = file.flush().map_err(SftpChannelError::from);
let result = file.writer().flush().map_err(SftpChannelError::from);
msg.reply.try_send(result)?;
}
@ -774,7 +961,7 @@ impl SessionInner {
/// Sets file metadata.
fn set_metadata_file(
&mut self,
_sess: &ssh2::Session,
_sess: &mut SessionWrap,
msg: &sftp::SetMetadataFile,
) -> anyhow::Result<()> {
let sftp::SetMetadataFile {
@ -784,9 +971,7 @@ impl SessionInner {
} = msg;
if let Some(file) = self.files.get_mut(file_id) {
let result = file
.setstat((*metadata).into())
.map_err(SftpChannelError::from);
let result = file.set_metadata(*metadata).map_err(SftpChannelError::from);
reply.try_send(result)?;
}
@ -796,14 +981,11 @@ impl SessionInner {
/// Gets file stat.
fn metadata_file(
&mut self,
_sess: &ssh2::Session,
_sess: &mut SessionWrap,
msg: &sftp::MetadataFile,
) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&msg.file_id) {
let result = file
.stat()
.map(Metadata::from)
.map_err(SftpChannelError::from);
let result = file.metadata();
msg.reply.try_send(result)?;
}
@ -813,20 +995,11 @@ impl SessionInner {
/// Performs readdir for file.
fn read_dir_file(
&mut self,
_sess: &ssh2::Session,
_sess: &mut SessionWrap,
msg: &sftp::ReadDirFile,
) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&msg.file_id) {
let result = file
.readdir()
.map_err(SftpChannelError::from)
.and_then(|(path, stat)| match Utf8PathBuf::try_from(path) {
Ok(path) => Ok((path, Metadata::from(stat))),
Err(x) => Err(SftpChannelError::from(io::Error::new(
io::ErrorKind::InvalidData,
x,
))),
});
let result = file.read_dir();
msg.reply.try_send(result)?;
}
@ -836,11 +1009,11 @@ impl SessionInner {
/// Fsync file.
fn fsync_file(
&mut self,
_sess: &ssh2::Session,
_sess: &mut SessionWrap,
fsync_file: &sftp::FsyncFile,
) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&fsync_file.file_id) {
let result = file.fsync().map_err(SftpChannelError::from);
let result = file.fsync();
fsync_file.reply.try_send(result)?;
}
@ -848,7 +1021,7 @@ impl SessionInner {
}
/// Convenience function to read the files in a directory.
pub fn read_dir(&mut self, sess: &ssh2::Session, msg: &sftp::ReadDir) -> anyhow::Result<()> {
pub fn read_dir(&mut self, sess: &mut SessionWrap, msg: &sftp::ReadDir) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.readdir(msg.filename.as_std_path())
.map_err(SftpChannelError::from)
@ -858,8 +1031,8 @@ impl SessionInner {
match Utf8PathBuf::try_from(path) {
Ok(path) => mapped_entries.push((path, Metadata::from(stat))),
Err(x) => {
return Err(SftpChannelError::from(io::Error::new(
io::ErrorKind::InvalidData,
return Err(SftpChannelError::from(std::io::Error::new(
std::io::ErrorKind::InvalidData,
x,
)));
}
@ -877,7 +1050,7 @@ impl SessionInner {
/// Create a directory on the remote filesystem.
pub fn create_dir(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::CreateDir,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -892,7 +1065,7 @@ impl SessionInner {
/// Remove a directory from the remote filesystem.
pub fn remove_dir(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::RemoveDir,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -907,7 +1080,7 @@ impl SessionInner {
/// Get the metadata for a file, performed by stat(2).
pub fn metadata(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::GetMetadata,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -923,7 +1096,7 @@ impl SessionInner {
/// Get the metadata for a file, performed by lstat(2).
pub fn symlink_metadata(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::SymlinkMetadata,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -939,7 +1112,7 @@ impl SessionInner {
/// Set the metadata for a file.
pub fn set_metadata(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::SetMetadata,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -952,7 +1125,7 @@ impl SessionInner {
}
/// Create symlink at `target` pointing at `path`.
pub fn symlink(&mut self, sess: &ssh2::Session, msg: &sftp::Symlink) -> anyhow::Result<()> {
pub fn symlink(&mut self, sess: &mut SessionWrap, msg: &sftp::Symlink) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.symlink(msg.path.as_std_path(), msg.target.as_std_path())
.map_err(SftpChannelError::from)
@ -963,13 +1136,20 @@ impl SessionInner {
}
/// Read a symlink at `path`.
pub fn read_link(&mut self, sess: &ssh2::Session, msg: &sftp::ReadLink) -> anyhow::Result<()> {
pub fn read_link(
&mut self,
sess: &mut SessionWrap,
msg: &sftp::ReadLink,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.readlink(msg.path.as_std_path())
.map_err(SftpChannelError::from)
.and_then(|path| {
Utf8PathBuf::try_from(path).map_err(|x| {
SftpChannelError::from(io::Error::new(io::ErrorKind::InvalidData, x))
SftpChannelError::from(std::io::Error::new(
std::io::ErrorKind::InvalidData,
x,
))
})
})
});
@ -981,7 +1161,7 @@ impl SessionInner {
/// Resolve the real path for `path`.
pub fn canonicalize(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::Canonicalize,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -989,7 +1169,10 @@ impl SessionInner {
.map_err(SftpChannelError::from)
.and_then(|path| {
Utf8PathBuf::try_from(path).map_err(|x| {
SftpChannelError::from(io::Error::new(io::ErrorKind::InvalidData, x))
SftpChannelError::from(std::io::Error::new(
std::io::ErrorKind::InvalidData,
x,
))
})
})
});
@ -999,7 +1182,7 @@ impl SessionInner {
}
/// Rename the filesystem object on the remote filesystem.
pub fn rename(&mut self, sess: &ssh2::Session, msg: &sftp::Rename) -> anyhow::Result<()> {
pub fn rename(&mut self, sess: &mut SessionWrap, msg: &sftp::Rename) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
sftp.rename(
msg.src.as_std_path(),
@ -1016,7 +1199,7 @@ impl SessionInner {
/// Remove a file on the remote filesystem.
pub fn remove_file(
&mut self,
sess: &ssh2::Session,
sess: &mut SessionWrap,
msg: &sftp::RemoveFile,
) -> anyhow::Result<()> {
let result = self.init_sftp(sess).and_then(|sftp| {
@ -1029,20 +1212,18 @@ impl SessionInner {
}
/// Initialize the sftp channel if not already created, returning a mutable reference to it
fn init_sftp<'a, 'b>(
&'a mut self,
sess: &'b ssh2::Session,
fn init_sftp<'a>(
&mut self,
sess: &'a mut SessionWrap,
) -> SftpChannelResult<&'a mut ssh2::Sftp> {
if self.sftp.is_none() {
let blocking = sess.is_blocking();
sess.set_blocking(true);
self.sftp = Some(sess.sftp()?);
sess.set_blocking(blocking);
match sess {
SessionWrap::Ssh2(sess) => {
if sess.sftp.is_none() {
sess.sftp = Some(sess.sess.sftp()?);
}
Ok(sess.sftp.as_mut().expect("sftp should have been set above"))
}
}
Ok(self.sftp.as_mut().expect("sftp should have been set above"))
}
}
@ -1076,7 +1257,6 @@ impl Session {
rx_req,
channels: HashMap::new(),
files: HashMap::new(),
sftp: None,
next_channel_id: 1,
next_file_id: 1,
sender_read,