mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-02 08:02:23 +03:00
vfs: 0.5.0
This commit is contained in:
parent
e9172423d2
commit
2bc926574f
@ -384,7 +384,7 @@ fn handle_new_package(
|
||||
.inherit(true)
|
||||
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
||||
path: zip_path,
|
||||
action: kt::VfsAction::ReWrite,
|
||||
action: kt::VfsAction::Write,
|
||||
})?)
|
||||
.payload(payload)
|
||||
.send_and_await_response(5)??;
|
||||
|
12
src/types.rs
12
src/types.rs
@ -976,12 +976,10 @@ pub enum VfsAction {
|
||||
CreateDir,
|
||||
CreateDirAll,
|
||||
CreateFile,
|
||||
OpenFile,
|
||||
OpenFile { create: bool },
|
||||
CloseFile,
|
||||
WriteAll,
|
||||
Write,
|
||||
ReWrite,
|
||||
WriteAt(u64),
|
||||
WriteAt,
|
||||
Append,
|
||||
SyncAll,
|
||||
Read,
|
||||
@ -989,13 +987,14 @@ pub enum VfsAction {
|
||||
ReadToEnd,
|
||||
ReadExact(u64),
|
||||
ReadToString,
|
||||
Seek(SeekFrom),
|
||||
Seek { seek_from: SeekFrom},
|
||||
RemoveFile,
|
||||
RemoveDir,
|
||||
RemoveDirAll,
|
||||
Rename(String),
|
||||
Rename { new_path: String},
|
||||
Metadata,
|
||||
AddZip,
|
||||
CopyFile { new_path: String },
|
||||
Len,
|
||||
SetLen(u64),
|
||||
Hash,
|
||||
@ -1033,6 +1032,7 @@ pub enum VfsResponse {
|
||||
Ok,
|
||||
Err(VfsError),
|
||||
Read,
|
||||
SeekFrom(u64),
|
||||
ReadDir(Vec<DirEntry>),
|
||||
ReadToString(String),
|
||||
Metadata(FileMetadata),
|
||||
|
228
src/vfs.rs
228
src/vfs.rs
@ -158,27 +158,31 @@ async fn handle_request(
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::CreateFile => {
|
||||
let _file = open_file(open_files.clone(), path, true).await?;
|
||||
// create truncates any file that might've existed before
|
||||
open_files.remove(&path);
|
||||
let _file = open_file(open_files.clone(), path, true, true).await?;
|
||||
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::OpenFile => {
|
||||
let _file = open_file(open_files.clone(), path, false).await?;
|
||||
VfsAction::OpenFile { create } => {
|
||||
// open file opens an existing file, or creates a new one if create is true
|
||||
let _file = open_file(open_files.clone(), path, create, false).await?;
|
||||
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::CloseFile => {
|
||||
// removes file from scope, resets file_handle and cursor.
|
||||
open_files.remove(&path);
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::WriteAll => {
|
||||
// should expect open file.
|
||||
VfsAction::WriteAt => {
|
||||
// doesn't create a file, writes at exact cursor.
|
||||
let Some(payload) = payload else {
|
||||
return Err(VfsError::BadRequest {
|
||||
error: "payload needs to exist for WriteAll".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.write_all(&payload.bytes).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
@ -189,45 +193,19 @@ async fn handle_request(
|
||||
error: "payload needs to exist for Write".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files.clone(), path, true).await?;
|
||||
open_files.remove(&path);
|
||||
let file = open_file(open_files.clone(), path, true, true).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.write_all(&payload.bytes).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::ReWrite => {
|
||||
let Some(payload) = payload else {
|
||||
return Err(VfsError::BadRequest {
|
||||
error: "payload needs to exist for Write".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files.clone(), path, true).await?;
|
||||
let mut file = file.lock().await;
|
||||
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&payload.bytes).await?;
|
||||
file.set_len(payload.bytes.len() as u64).await?;
|
||||
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::WriteAt(offset) => {
|
||||
let Some(payload) = payload else {
|
||||
return Err(VfsError::BadRequest {
|
||||
error: "payload needs to exist for WriteAt".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.seek(SeekFrom::Start(offset)).await?;
|
||||
file.write_all(&payload.bytes).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::Append => {
|
||||
let Some(payload) = payload else {
|
||||
return Err(VfsError::BadRequest {
|
||||
error: "payload needs to exist for Append".into(),
|
||||
});
|
||||
};
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.seek(SeekFrom::End(0)).await?;
|
||||
file.write_all(&payload.bytes).await?;
|
||||
@ -235,13 +213,13 @@ async fn handle_request(
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::SyncAll => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
file.sync_all().await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::Read => {
|
||||
let file = open_file(open_files.clone(), path.clone(), false).await?;
|
||||
let file = open_file(open_files.clone(), path.clone(), false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = Vec::new();
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
@ -253,7 +231,7 @@ async fn handle_request(
|
||||
)
|
||||
}
|
||||
VfsAction::ReadToEnd => {
|
||||
let file = open_file(open_files.clone(), path.clone(), false).await?;
|
||||
let file = open_file(open_files.clone(), path.clone(), false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = Vec::new();
|
||||
|
||||
@ -264,6 +242,16 @@ async fn handle_request(
|
||||
Some(contents),
|
||||
)
|
||||
}
|
||||
VfsAction::ReadExact(length) => {
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = vec![0; length as usize];
|
||||
file.read_exact(&mut contents).await?;
|
||||
(
|
||||
serde_json::to_vec(&VfsResponse::Read).unwrap(),
|
||||
Some(contents),
|
||||
)
|
||||
}
|
||||
VfsAction::ReadDir => {
|
||||
let mut dir = fs::read_dir(path).await?;
|
||||
let mut entries = Vec::new();
|
||||
@ -284,18 +272,8 @@ async fn handle_request(
|
||||
None,
|
||||
)
|
||||
}
|
||||
VfsAction::ReadExact(length) => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = vec![0; length as usize];
|
||||
file.read_exact(&mut contents).await?;
|
||||
(
|
||||
serde_json::to_vec(&VfsResponse::Read).unwrap(),
|
||||
Some(contents),
|
||||
)
|
||||
}
|
||||
VfsAction::ReadToString => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents).await?;
|
||||
@ -304,8 +282,8 @@ async fn handle_request(
|
||||
None,
|
||||
)
|
||||
}
|
||||
VfsAction::Seek(seek_from) => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
VfsAction::Seek { seek_from } => {
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
// same type, rust tingz
|
||||
let seek_from = match seek_from {
|
||||
@ -313,8 +291,11 @@ async fn handle_request(
|
||||
crate::types::SeekFrom::End(offset) => std::io::SeekFrom::End(offset),
|
||||
crate::types::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset),
|
||||
};
|
||||
file.seek(seek_from).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
let response = file.seek(seek_from).await?;
|
||||
(
|
||||
serde_json::to_vec(&VfsResponse::SeekFrom(response)).unwrap(),
|
||||
None,
|
||||
)
|
||||
}
|
||||
VfsAction::RemoveFile => {
|
||||
fs::remove_file(path).await?;
|
||||
@ -328,13 +309,16 @@ async fn handle_request(
|
||||
fs::remove_dir_all(path).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::Rename(new_path) => {
|
||||
// doublecheck permission weirdness, sanitize new path
|
||||
VfsAction::Rename { new_path } => {
|
||||
fs::rename(path, new_path).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::CopyFile { new_path } => {
|
||||
fs::copy(path, new_path).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::Metadata => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
let metadata = file.metadata().await?;
|
||||
|
||||
@ -350,19 +334,19 @@ async fn handle_request(
|
||||
)
|
||||
}
|
||||
VfsAction::Len => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
let len = file.metadata().await?.len();
|
||||
(serde_json::to_vec(&VfsResponse::Len(len)).unwrap(), None)
|
||||
}
|
||||
VfsAction::SetLen(len) => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
file.set_len(len).await?;
|
||||
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
|
||||
}
|
||||
VfsAction::Hash => {
|
||||
let file = open_file(open_files.clone(), path, false).await?;
|
||||
let file = open_file(open_files.clone(), path, false, false).await?;
|
||||
let mut file = file.lock().await;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let mut hasher = blake3::Hasher::new();
|
||||
@ -410,18 +394,19 @@ async fn handle_request(
|
||||
// Before any `.await`s are called since ZipFile is not
|
||||
// Send and so does not play nicely with await
|
||||
let (is_file, is_dir, local_path, file_contents) = {
|
||||
let mut file = zip.by_index(i).unwrap();
|
||||
let mut file = zip.by_index(i)?;
|
||||
let is_file = file.is_file();
|
||||
let is_dir = file.is_dir();
|
||||
let mut file_contents = Vec::new();
|
||||
if is_file {
|
||||
file.read_to_end(&mut file_contents).unwrap();
|
||||
file.read_to_end(&mut file_contents)?;
|
||||
};
|
||||
let local_path = path.join(file.name());
|
||||
(is_file, is_dir, local_path, file_contents)
|
||||
};
|
||||
if is_file {
|
||||
let file = open_file(open_files.clone(), local_path, true).await?;
|
||||
open_files.remove(&local_path);
|
||||
let file = open_file(open_files.clone(), local_path, true, true).await?;
|
||||
let mut file = file.lock().await;
|
||||
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
@ -522,6 +507,7 @@ async fn open_file<P: AsRef<Path>>(
|
||||
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
|
||||
path: P,
|
||||
create: bool,
|
||||
truncate: bool,
|
||||
) -> Result<Arc<Mutex<fs::File>>, VfsError> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
Ok(match open_files.get(&path) {
|
||||
@ -532,6 +518,7 @@ async fn open_file<P: AsRef<Path>>(
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(create)
|
||||
.truncate(truncate)
|
||||
.open(&path)
|
||||
.await
|
||||
.map_err(|e| VfsError::IOError {
|
||||
@ -581,24 +568,44 @@ async fn check_caps(
|
||||
VfsAction::CreateDir
|
||||
| VfsAction::CreateDirAll
|
||||
| VfsAction::CreateFile
|
||||
| VfsAction::OpenFile
|
||||
| VfsAction::OpenFile { .. }
|
||||
| VfsAction::CloseFile
|
||||
| VfsAction::WriteAll
|
||||
| VfsAction::Write
|
||||
| VfsAction::ReWrite
|
||||
| VfsAction::WriteAt(_)
|
||||
| VfsAction::WriteAt
|
||||
| VfsAction::Append
|
||||
| VfsAction::SyncAll
|
||||
| VfsAction::RemoveFile
|
||||
| VfsAction::RemoveDir
|
||||
| VfsAction::RemoveDirAll
|
||||
| VfsAction::Rename(_)
|
||||
| VfsAction::AddZip
|
||||
| VfsAction::SetLen(_) => {
|
||||
if src_package_id == package_id {
|
||||
return Ok(());
|
||||
}
|
||||
if !has_root_cap {
|
||||
|
||||
if has_root_cap {
|
||||
return Ok(());
|
||||
}
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.clone(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
"kind": "write",
|
||||
"drive": drive,
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
let has_cap = recv_cap_bool.await?;
|
||||
if !has_cap {
|
||||
return Err(VfsError::NoCap {
|
||||
action: request.action.to_string(),
|
||||
path: path.display().to_string(),
|
||||
@ -611,7 +618,7 @@ async fn check_caps(
|
||||
| VfsAction::ReadExact(_)
|
||||
| VfsAction::ReadToEnd
|
||||
| VfsAction::ReadToString
|
||||
| VfsAction::Seek(_)
|
||||
| VfsAction::Seek { .. }
|
||||
| VfsAction::Hash
|
||||
| VfsAction::Metadata
|
||||
| VfsAction::Len => {
|
||||
@ -648,6 +655,80 @@ async fn check_caps(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
VfsAction::CopyFile { new_path }
|
||||
| VfsAction::Rename { new_path } => {
|
||||
// these have 2 paths to validate
|
||||
if has_root_cap {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (new_package_id, new_drive, _rest) = parse_package_and_drive(&new_path).await?;
|
||||
let new_drive = format!("/{}/{}", new_package_id, new_drive);
|
||||
// if both new and old path are within the package_id path, ok
|
||||
if (src_package_id == package_id) && (src_package_id == new_package_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// otherwise check write caps.
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.clone(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
"kind": "write",
|
||||
"drive": drive,
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
let has_cap = recv_cap_bool.await?;
|
||||
if !has_cap {
|
||||
return Err(VfsError::NoCap {
|
||||
action: request.action.to_string(),
|
||||
path: path.display().to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// if they're within the same drive, no need for 2 caps checks
|
||||
if new_drive == drive {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.clone(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
"kind": "write",
|
||||
"drive": new_drive,
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
let has_cap = recv_cap_bool.await?;
|
||||
if !has_cap {
|
||||
return Err(VfsError::NoCap {
|
||||
action: request.action.to_string(),
|
||||
path: path.display().to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
VfsAction::CreateDrive => {
|
||||
if src_package_id != package_id {
|
||||
if !has_root_cap {
|
||||
@ -757,6 +838,15 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for VfsError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<zip::result::ZipError> for VfsError {
|
||||
fn from(err: zip::result::ZipError) -> Self {
|
||||
VfsError::IOError {
|
||||
error: err.to_string(),
|
||||
path: "".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for VfsError {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
VfsError::IOError {
|
||||
|
Loading…
Reference in New Issue
Block a user