1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-23 21:32:13 +03:00

ssh: adopt dispatch helper for ssh file and dir requests too

@chipsenkbeil: I spotted a latent bug in here that got fixed as
a side effect of this change.  For `write_file` and possibly others,
reply.try_send was only called in the case where file_id was valid.
For an invalid id, I think the caller could hang.

Not sure if this was a problem in practice, but I wonder if it might
have contributed to some of the weird state issues you mentioned.
This commit is contained in:
Wez Furlong 2021-10-19 19:49:01 -07:00
parent 74b763a3f6
commit 5ada8e20cc
3 changed files with 149 additions and 262 deletions

View File

@ -5,12 +5,9 @@ use crate::filewrap::FileWrap;
use crate::pty::*; use crate::pty::*;
use crate::session::{Exec, ExecResult, SessionEvent, SessionRequest, SignalChannel}; use crate::session::{Exec, ExecResult, SessionEvent, SessionRequest, SignalChannel};
use crate::sessionwrap::SessionWrap; use crate::sessionwrap::SessionWrap;
use crate::sftp::dir::{CloseDir, Dir, DirId, DirRequest, ReadDirHandle}; use crate::sftp::dir::{Dir, DirId, DirRequest};
use crate::sftp::file::{ use crate::sftp::file::{File, FileId, FileRequest};
CloseFile, File, FileId, FileRequest, FlushFile, FsyncFile, MetadataFile, ReadFile, use crate::sftp::{OpenWithMode, SftpChannelResult, SftpRequest};
SetMetadataFile, WriteFile,
};
use crate::sftp::{OpenWithMode, SftpChannelError, SftpChannelResult, SftpRequest};
use crate::sftpwrap::SftpWrap; use crate::sftpwrap::SftpWrap;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
@ -437,60 +434,129 @@ impl SessionInner {
SessionRequest::Sftp(SftpRequest::OpenDir(path, reply)) => { SessionRequest::Sftp(SftpRequest::OpenDir(path, reply)) => {
dispatch(reply, || self.open_dir(sess, path), "OpenDir") dispatch(reply, || self.open_dir(sess, path), "OpenDir")
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::Write(msg))) => { SessionRequest::Sftp(SftpRequest::File(FileRequest::Write(msg, reply))) => {
if let Err(err) = self.write_file(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) let file = self
.files
.get_mut(&msg.file_id)
.ok_or_else(|| anyhow!("invalid file_id"))?;
file.writer().write_all(&msg.data)?;
Ok(())
},
"write_file",
)
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::Read(msg))) => { SessionRequest::Sftp(SftpRequest::File(FileRequest::Read(msg, reply))) => {
if let Err(err) = self.read_file(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) let file = self
.files
.get_mut(&msg.file_id)
.ok_or_else(|| anyhow!("invalid file_id"))?;
// TODO: Move this somewhere to avoid re-allocating buffer
let mut buf = vec![0u8; msg.max_bytes];
let n = file.reader().read(&mut buf)?;
buf.truncate(n);
Ok(buf)
},
"read_file",
)
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::Close(msg))) => { SessionRequest::Sftp(SftpRequest::File(FileRequest::Close(file_id, reply))) => {
if let Err(err) = self.close_file(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) self.files.remove(&file_id);
Ok(())
},
"close_file",
)
} }
SessionRequest::Sftp(SftpRequest::Dir(DirRequest::Close(msg))) => { SessionRequest::Sftp(SftpRequest::Dir(DirRequest::Close(dir_id, reply))) => {
if let Err(err) = self.close_dir(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) self.dirs
.remove(&dir_id)
.ok_or_else(|| anyhow!("invalid dir_id"))?;
Ok(())
},
"close_dir",
)
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::Flush(msg))) => { SessionRequest::Sftp(SftpRequest::Dir(DirRequest::ReadDir(dir_id, reply))) => {
if let Err(err) = self.flush_file(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) let dir = self
.dirs
.get_mut(&dir_id)
.ok_or_else(|| anyhow!("invalid dir_id"))?;
dir.read_dir()
},
"read_dir",
)
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::SetMetadata(msg))) => { SessionRequest::Sftp(SftpRequest::File(FileRequest::Flush(file_id, reply))) => {
if let Err(err) = self.set_metadata_file(sess, &msg) { dispatch(
log::error!("{:?} -> error: {:#}", msg, err); reply,
} || {
Ok(true) let file = self
.files
.get_mut(&file_id)
.ok_or_else(|| anyhow!("invalid file_id"))?;
file.writer().flush()?;
Ok(())
},
"flush_file",
)
} }
SessionRequest::Sftp(SftpRequest::File(FileRequest::Metadata(msg))) => { SessionRequest::Sftp(SftpRequest::File(FileRequest::SetMetadata(
if let Err(err) = self.metadata_file(sess, &msg) { msg,
log::error!("{:?} -> error: {:#}", msg, err); reply,
} ))) => dispatch(
Ok(true) reply,
} || {
SessionRequest::Sftp(SftpRequest::Dir(DirRequest::ReadDir(msg))) => { let file = self
if let Err(err) = self.read_dir_handle(sess, &msg) { .files
log::error!("{:?} -> error: {:#}", msg, err); .get_mut(&msg.file_id)
} .ok_or_else(|| anyhow!("invalid file_id"))?;
Ok(true) file.set_metadata(msg.metadata)
} },
SessionRequest::Sftp(SftpRequest::File(FileRequest::Fsync(msg))) => { "set_metadata_file",
if let Err(err) = self.fsync_file(sess, &msg) { ),
log::error!("{:?} -> error: {:#}", msg, err); SessionRequest::Sftp(SftpRequest::File(FileRequest::Metadata(
} file_id,
Ok(true) reply,
))) => dispatch(
reply,
|| {
let file = self
.files
.get_mut(&file_id)
.ok_or_else(|| anyhow!("invalid file_id"))?;
file.metadata()
},
"metadata_file",
),
SessionRequest::Sftp(SftpRequest::File(FileRequest::Fsync(file_id, reply))) => {
dispatch(
reply,
|| {
let file = self
.files
.get_mut(&file_id)
.ok_or_else(|| anyhow!("invalid file_id"))?;
file.fsync()
},
"fsync",
)
} }
SessionRequest::Sftp(SftpRequest::ReadDir(path, reply)) => { SessionRequest::Sftp(SftpRequest::ReadDir(path, reply)) => {
dispatch(reply, || self.init_sftp(sess)?.read_dir(&path), "read_dir") dispatch(reply, || self.init_sftp(sess)?.read_dir(&path), "read_dir")
} }
@ -668,131 +734,6 @@ impl SessionInner {
Ok(dir) Ok(dir)
} }
/// Writes to a loaded file.
fn write_file(&mut self, _sess: &mut SessionWrap, msg: &WriteFile) -> anyhow::Result<()> {
let WriteFile {
file_id,
data,
reply,
} = msg;
if let Some(file) = self.files.get_mut(file_id) {
let result = file
.writer()
.write_all(data)
.map_err(SftpChannelError::from);
reply.try_send(result)?;
}
Ok(())
}
/// Reads from a loaded file.
fn read_file(&mut self, _sess: &mut SessionWrap, msg: &ReadFile) -> anyhow::Result<()> {
let ReadFile {
file_id,
max_bytes,
reply,
} = msg;
if let Some(file) = self.files.get_mut(file_id) {
// TODO: Move this somewhere to avoid re-allocating buffer
let mut buf = vec![0u8; *max_bytes];
match file.reader().read(&mut buf).map_err(SftpChannelError::from) {
Ok(n) => {
buf.truncate(n);
reply.try_send(Ok(buf))?;
}
Err(x) => reply.try_send(Err(x))?,
}
}
Ok(())
}
fn close_dir(&mut self, _sess: &mut SessionWrap, msg: &CloseDir) -> anyhow::Result<()> {
self.dirs.remove(&msg.dir_id);
msg.reply.try_send(Ok(()))?;
Ok(())
}
/// Closes a file and removes it from the internal memory.
fn close_file(&mut self, _sess: &mut SessionWrap, msg: &CloseFile) -> anyhow::Result<()> {
self.files.remove(&msg.file_id);
msg.reply.try_send(Ok(()))?;
Ok(())
}
/// Flushes a file.
fn flush_file(&mut self, _sess: &mut SessionWrap, msg: &FlushFile) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&msg.file_id) {
let result = file.writer().flush().map_err(SftpChannelError::from);
msg.reply.try_send(result)?;
}
Ok(())
}
/// Sets file metadata.
fn set_metadata_file(
&mut self,
_sess: &mut SessionWrap,
msg: &SetMetadataFile,
) -> anyhow::Result<()> {
let SetMetadataFile {
file_id,
metadata,
reply,
} = msg;
if let Some(file) = self.files.get_mut(file_id) {
let result = file.set_metadata(*metadata).map_err(SftpChannelError::from);
reply.try_send(result)?;
}
Ok(())
}
/// Gets file stat.
fn metadata_file(&mut self, _sess: &mut SessionWrap, msg: &MetadataFile) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&msg.file_id) {
let result = file.metadata();
msg.reply.try_send(result)?;
}
Ok(())
}
/// Performs readdir for file.
fn read_dir_handle(
&mut self,
_sess: &mut SessionWrap,
msg: &ReadDirHandle,
) -> anyhow::Result<()> {
if let Some(dir) = self.dirs.get_mut(&msg.dir_id) {
let result = dir.read_dir();
msg.reply.try_send(result)?;
}
Ok(())
}
/// Fsync file.
fn fsync_file(
&mut self,
_sess: &mut SessionWrap,
fsync_file: &FsyncFile,
) -> anyhow::Result<()> {
if let Some(file) = self.files.get_mut(&fsync_file.file_id) {
let result = file.fsync();
fsync_file.reply.try_send(result)?;
}
Ok(())
}
/// Initialize the sftp channel if not already created, returning a mutable reference to it /// Initialize the sftp channel if not already created, returning a mutable reference to it
fn init_sftp<'a>(&mut self, sess: &'a mut SessionWrap) -> SftpChannelResult<&'a mut SftpWrap> { fn init_sftp<'a>(&mut self, sess: &'a mut SessionWrap) -> SftpChannelResult<&'a mut SftpWrap> {
match sess { match sess {

View File

@ -17,22 +17,10 @@ impl fmt::Debug for Dir {
} }
} }
#[derive(Debug)]
pub(crate) struct CloseDir {
pub dir_id: DirId,
pub reply: Sender<SftpChannelResult<()>>,
}
#[derive(Debug)]
pub(crate) struct ReadDirHandle {
pub dir_id: DirId,
pub reply: Sender<SftpChannelResult<(Utf8PathBuf, Metadata)>>,
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum DirRequest { pub(crate) enum DirRequest {
Close(CloseDir), Close(DirId, Sender<SftpChannelResult<()>>),
ReadDir(ReadDirHandle), ReadDir(DirId, Sender<SftpChannelResult<(Utf8PathBuf, Metadata)>>),
} }
impl Drop for Dir { impl Drop for Dir {
@ -41,10 +29,8 @@ impl Drop for Dir {
if let Some(tx) = self.tx.take() { if let Some(tx) = self.tx.take() {
let (reply, _) = bounded(1); let (reply, _) = bounded(1);
let _ = tx.try_send(SessionRequest::Sftp(SftpRequest::Dir(DirRequest::Close( let _ = tx.try_send(SessionRequest::Sftp(SftpRequest::Dir(DirRequest::Close(
CloseDir { self.dir_id,
dir_id: self.dir_id, reply,
reply,
},
)))); ))));
} }
} }
@ -76,10 +62,8 @@ impl Dir {
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(SessionRequest::Sftp(SftpRequest::Dir(DirRequest::ReadDir( .send(SessionRequest::Sftp(SftpRequest::Dir(DirRequest::ReadDir(
ReadDirHandle { self.dir_id,
dir_id: self.dir_id, reply,
reply,
},
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;

View File

@ -26,58 +26,31 @@ struct FileState {
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum FileRequest { pub(crate) enum FileRequest {
Write(WriteFile), Write(WriteFile, Sender<SftpChannelResult<()>>),
Read(ReadFile), Read(ReadFile, Sender<SftpChannelResult<Vec<u8>>>),
Close(CloseFile), Close(FileId, Sender<SftpChannelResult<()>>),
Flush(FlushFile), Flush(FileId, Sender<SftpChannelResult<()>>),
SetMetadata(SetMetadataFile), SetMetadata(SetMetadataFile, Sender<SftpChannelResult<()>>),
Metadata(MetadataFile), Metadata(FileId, Sender<SftpChannelResult<Metadata>>),
Fsync(FsyncFile), Fsync(FileId, Sender<SftpChannelResult<()>>),
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct WriteFile { pub(crate) struct WriteFile {
pub file_id: FileId, pub file_id: FileId,
pub data: Vec<u8>, pub data: Vec<u8>,
pub reply: Sender<SftpChannelResult<()>>,
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct ReadFile { pub(crate) struct ReadFile {
pub file_id: FileId, pub file_id: FileId,
pub max_bytes: usize, pub max_bytes: usize,
pub reply: Sender<SftpChannelResult<Vec<u8>>>,
}
#[derive(Debug)]
pub(crate) struct CloseFile {
pub file_id: FileId,
pub reply: Sender<SftpChannelResult<()>>,
}
#[derive(Debug)]
pub(crate) struct FlushFile {
pub file_id: FileId,
pub reply: Sender<SftpChannelResult<()>>,
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct SetMetadataFile { pub(crate) struct SetMetadataFile {
pub file_id: FileId, pub file_id: FileId,
pub metadata: Metadata, pub metadata: Metadata,
pub reply: Sender<SftpChannelResult<()>>,
}
#[derive(Debug)]
pub(crate) struct MetadataFile {
pub file_id: FileId,
pub reply: Sender<SftpChannelResult<Metadata>>,
}
#[derive(Debug)]
pub(crate) struct FsyncFile {
pub file_id: FileId,
pub reply: Sender<SftpChannelResult<()>>,
} }
impl fmt::Debug for File { impl fmt::Debug for File {
@ -94,10 +67,8 @@ impl Drop for File {
if let Some(tx) = self.tx.take() { if let Some(tx) = self.tx.take() {
let (reply, _) = bounded(1); let (reply, _) = bounded(1);
let _ = tx.try_send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Close( let _ = tx.try_send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Close(
CloseFile { self.file_id,
file_id: self.file_id, reply,
reply,
},
)))); ))));
} }
} }
@ -125,11 +96,13 @@ impl File {
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(SessionRequest::Sftp(SftpRequest::File( .send(SessionRequest::Sftp(SftpRequest::File(
FileRequest::SetMetadata(SetMetadataFile { FileRequest::SetMetadata(
file_id: self.file_id, SetMetadataFile {
metadata, file_id: self.file_id,
metadata,
},
reply, reply,
}), ),
))) )))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -145,10 +118,7 @@ impl File {
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(SessionRequest::Sftp(SftpRequest::File( .send(SessionRequest::Sftp(SftpRequest::File(
FileRequest::Metadata(MetadataFile { FileRequest::Metadata(self.file_id, reply),
file_id: self.file_id,
reply,
}),
))) )))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -165,10 +135,8 @@ impl File {
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Fsync( .send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Fsync(
FsyncFile { self.file_id,
file_id: self.file_id, reply,
reply,
},
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -293,11 +261,8 @@ impl smol::io::AsyncWrite for File {
async fn inner_write(tx: SessionSender, file_id: usize, data: Vec<u8>) -> SftpChannelResult<()> { async fn inner_write(tx: SessionSender, file_id: usize, data: Vec<u8>) -> SftpChannelResult<()> {
let (reply, rx) = bounded(1); let (reply, rx) = bounded(1);
tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Write( tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Write(
WriteFile { WriteFile { file_id, data },
file_id, reply,
data,
reply,
},
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -315,11 +280,8 @@ async fn inner_read(
) -> SftpChannelResult<Vec<u8>> { ) -> SftpChannelResult<Vec<u8>> {
let (reply, rx) = bounded(1); let (reply, rx) = bounded(1);
tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Read( tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Read(
ReadFile { ReadFile { file_id, max_bytes },
file_id, reply,
max_bytes,
reply,
},
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -330,7 +292,7 @@ async fn inner_read(
async fn inner_flush(tx: SessionSender, file_id: usize) -> SftpChannelResult<()> { async fn inner_flush(tx: SessionSender, file_id: usize) -> SftpChannelResult<()> {
let (reply, rx) = bounded(1); let (reply, rx) = bounded(1);
tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Flush( tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Flush(
FlushFile { file_id, reply }, file_id, reply,
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;
@ -341,7 +303,7 @@ async fn inner_flush(tx: SessionSender, file_id: usize) -> SftpChannelResult<()>
async fn inner_close(tx: SessionSender, file_id: usize) -> SftpChannelResult<()> { async fn inner_close(tx: SessionSender, file_id: usize) -> SftpChannelResult<()> {
let (reply, rx) = bounded(1); let (reply, rx) = bounded(1);
tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Close( tx.send(SessionRequest::Sftp(SftpRequest::File(FileRequest::Close(
CloseFile { file_id, reply }, file_id, reply,
)))) ))))
.await?; .await?;
let result = rx.recv().await??; let result = rx.recv().await??;