From 09d993eb30c859ac2e3dd9b16d2168ef70ba41de Mon Sep 17 00:00:00 2001
From: bitful-pannul
Date: Wed, 13 Dec 2023 20:27:22 -0300
Subject: [PATCH] vfs: refactor!

---
 src/types.rs |   7 +-
 src/vfs.rs   | 348 ++++++++++++++++++++++-----------------------
 2 files changed, 158 insertions(+), 197 deletions(-)

diff --git a/src/types.rs b/src/types.rs
index 47be53bf..7b480344 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -1035,8 +1035,10 @@ pub enum VfsError {
     BadRequest { error: String },
     #[error("vfs: error parsing path: {path}, error: {error}")]
     ParseError { error: String, path: String },
-    #[error("vfs: IO error: {source}, at path {path}.")]
-    IOError { source: tokio::io::Error, path: String },
+    #[error("vfs: IO error: {error}, at path {path}.")]
+    IOError { error: String, path: String },
+    #[error("vfs: kernel capability channel error: {error}")]
+    CapChannelFail { error: String },
     #[error("vfs: Bad JSON payload: {error}.")]
     BadJson { error: String },
     #[error("vfs: File not found at path {path}.")]
@@ -1054,6 +1056,7 @@ impl VfsError {
             VfsError::BadRequest { .. } => "BadRequest",
             VfsError::ParseError { .. } => "ParseError",
             VfsError::IOError { .. } => "IOError",
+            VfsError::CapChannelFail { .. } => "CapChannelFail",
             VfsError::BadJson { .. } => "NoJson",
             VfsError::NotFound { .. } => "NotFound",
             VfsError::CreateDirError { .. } => "CreateDirError",
diff --git a/src/vfs.rs b/src/vfs.rs
index af96f000..df324e48 100644
--- a/src/vfs.rs
+++ b/src/vfs.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use tokio::fs;
 use tokio::fs::OpenOptions;
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::Mutex;
 
 use crate::types::*;
 
@@ -24,11 +24,6 @@ pub async fn vfs(
         panic!("failed creating vfs dir! {:?}", e);
     }
 
-    //process_A has drive A
-    //process_B creates drive A, conflict.
-
-    //process_A has drive A, your processId => drive => folder =>
-
     let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
 
     let process_queues = Arc::new(Mutex::new(
@@ -132,17 +127,24 @@ async fn handle_request(
         }
     };
 
+    // sort by package_id instead? pros/cons?
+    // current prepend to filepaths needs to be: /process_id/drive/path
     let (process_id, drive) = parse_process_and_drive(&request.path).await?;
+    let drive = format!("/{}/{}", process_id, drive);
+    let path = PathBuf::from(request.path.clone());
 
     // validate
     check_caps(
         our_node.clone(),
         source.clone(),
         send_to_caps_oracle.clone(),
         &request,
+        path.clone(),
+        drive,
+        vfs_path.clone(),
     )
     .await?;
 
-    let path = request.path; // validate
+
@@ -160,13 +162,12 @@
     let (ipc, bytes) = match request.action {
         VfsAction::CreateDrive => {
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::CreateFile => {
-            let file = open_file(open_files.clone(), path).await?;
+            let _file = open_file(open_files.clone(), path, true).await?;
 
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::OpenFile => {
-            // shouldn't create?
-            let file = open_file(open_files.clone(), path).await?;
+            let _file = open_file(open_files.clone(), path, false).await?;
 
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
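Reviewer note on the new addressing scheme: every request now carries a single absolute path that encodes both the owning process and the drive, replacing the old separate drive field. A sketch of the decomposition handle_request performs (VfsRequest and VfsAction come from this codebase; the literal process and drive names are made up, and VfsRequest is assumed to carry just a path and an action):

    // Hypothetical request under the refactored scheme:
    let request = VfsRequest {
        path: "/my_process:my_package:node/notes/todo.txt".to_string(),
        action: VfsAction::CreateFile,
    };
    // handle_request then derives:
    //   process_id = "my_process:my_package:node"            (owner, parsed from the path)
    //   drive      = "/my_process:my_package:node/notes"     (key used for capability checks)
    //   path       = PathBuf::from("/my_process:my_package:node/notes/todo.txt")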
@@ -176,26 +177,38 @@
         }
         VfsAction::WriteAll => {
             // should expect open file.
-            let file = open_file(open_files.clone(), path).await?;
+            let Some(payload) = payload else {
+                return Err(VfsError::BadRequest { error: "payload needs to exist for WriteAll".into() })
+            };
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             file.write_all(&payload.bytes).await?;
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::Write => {
-            let file = open_file(open_files.clone(), path).await?;
+            let Some(payload) = payload else {
+                return Err(VfsError::BadRequest { error: "payload needs to exist for Write".into() })
+            };
+            let file = open_file(open_files.clone(), path, true).await?;
             let mut file = file.lock().await;
             file.write_all(&payload.bytes).await?;
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::WriteAt(offset) => {
-            let file = open_file(open_files.clone(), path).await?;
+            let Some(payload) = payload else {
+                return Err(VfsError::BadRequest { error: "payload needs to exist for WriteAt".into() })
+            };
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             file.seek(SeekFrom::Start(offset)).await?;
             file.write_all(&payload.bytes).await?;
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::Append => {
-            let file = open_file(open_files.clone(), path).await?;
+            let Some(payload) = payload else {
+                return Err(VfsError::BadRequest { error: "payload needs to exist for Append".into() })
+            };
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             file.seek(SeekFrom::End(0)).await?;
             file.write_all(&payload.bytes).await?;
@@ -203,13 +216,13 @@
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::SyncAll => {
-            let file = open_file(open_files.clone(), path).await?;
-            let mut file = file.lock().await;
+            let file = open_file(open_files.clone(), path, false).await?;
+            let file = file.lock().await;
             file.sync_all().await?;
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::Read => {
-            let file = open_file(open_files.clone(), path).await?;
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             let mut contents = Vec::new();
             file.read_to_end(&mut contents).await?;
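All four write-style arms above repeat the same payload guard; if more arms grow one, a small helper could collapse the repetition. A sketch, assuming the payload type is named Payload and that BadRequest remains the right error (neither is pinned down by this diff):

    // Hypothetical helper: reject write-style actions that arrive without
    // bytes, naming the offending action in the error message.
    fn require_payload(payload: Option<Payload>, action: &str) -> Result<Payload, VfsError> {
        payload.ok_or_else(|| VfsError::BadRequest {
            error: format!("payload needs to exist for {}", action),
        })
    }

    // e.g. in the WriteAll arm: let payload = require_payload(payload, "WriteAll")?;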
@@ -223,7 +236,7 @@
             let mut entries = Vec::new();
             while let Some(entry) = dir.next_entry().await? {
                 // risky non-unicode
-                entries.push(entry.path().to_str()?);
+                entries.push(entry.path().display().to_string());
             }
             (
                 serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(),
@@ -231,30 +244,35 @@
                 None,
             )
         }
         VfsAction::ReadExact(length) => {
-            let file = open_file(open_files.clone(), path).await?;
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             let mut contents = vec![0; length as usize];
             file.read_exact(&mut contents).await?;
             (
-                serde_json::to_vec(&VfsResponse::ReadExact).unwrap(),
+                serde_json::to_vec(&VfsResponse::Read).unwrap(),
                 Some(contents),
             )
         }
         VfsAction::ReadToString => {
-            let file = open_file(open_files.clone(), path).await?;
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             let mut contents = String::new();
             file.read_to_string(&mut contents).await?;
             (
-                serde_json::to_vec(&VfsResponse::ReadToString).unwrap(),
-                Some(contents.into_bytes()),
+                serde_json::to_vec(&VfsResponse::ReadToString(contents)).unwrap(),
+                None,
             )
         }
         VfsAction::Seek(seek_from) => {
-            let file = open_file(open_files.clone(), path).await?;
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
+            // our SeekFrom mirrors std::io::SeekFrom; convert between the two
+            let seek_from = match seek_from {
+                crate::types::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset),
+                crate::types::SeekFrom::End(offset) => std::io::SeekFrom::End(offset),
+                crate::types::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset),
+            };
             file.seek(seek_from).await?;
-
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::RemoveFile => {
@@ -275,22 +293,22 @@
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::Len => {
-            let file = open_file(open_files.clone(), path).await?;
-            let mut file = file.lock().await;
+            let file = open_file(open_files.clone(), path, false).await?;
+            let file = file.lock().await;
             let len = file.metadata().await?.len();
             (
-                serde_json::to_vec(&VfsResponse::Len(Some(len))).unwrap(),
+                serde_json::to_vec(&VfsResponse::Len(len)).unwrap(),
                 None,
             )
         }
         VfsAction::SetLen(len) => {
-            let file = open_file(open_files.clone(), path).await?;
-            let mut file = file.lock().await;
+            let file = open_file(open_files.clone(), path, false).await?;
+            let file = file.lock().await;
             file.set_len(len).await?;
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
         VfsAction::Hash => {
-            let file = open_file(open_files.clone(), path).await?;
+            let file = open_file(open_files.clone(), path, false).await?;
             let mut file = file.lock().await;
             let mut hasher = blake3::Hasher::new();
             let mut buffer = [0; 1024];
             loop {
                 let bytes_read = file.read(&mut buffer).await?;
                 if bytes_read == 0 {
                     break;
                 }
                 hasher.update(&buffer[..bytes_read]);
             }
-            let hash = hasher.finalize();
+            let hash: [u8; 32] = hasher.finalize().into();
             (
-                serde_json::to_vec(&VfsResponse::Hash(Some(hash.to_vec()))).unwrap(),
+                serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(),
                 None,
             )
         }
         VfsAction::AddZip => {
+            let Some(payload) = payload else {
+                return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
+            };
             let Some(mime) = payload.mime else {
                 return Err(VfsError::BadRequest { error: "payload mime type needs to exist for AddZip".into() })
             };
             let file = std::io::Cursor::new(&payload.bytes);
             let mut zip = match zip::ZipArchive::new(file) {
                 Ok(f) => f,
-                Err(_) => return Err(VfsError::InternalError),
+                Err(e) => return Err(VfsError::ParseError { error: e.to_string(), path: path.display().to_string() }),
             };
 
             // loop through items in archive; recursively add to root
                 (is_file, is_dir, full_path, file_contents)
             };
             if is_file {
-                let file = open_file(open_files.clone(), path).await?;
+                let file = open_file(open_files.clone(), path, true).await?;
                 let mut file = file.lock().await;
-                file.write_all(&file_contents).await.unwrap();
+                file.write_all(&file_contents).await?;
             } else if is_dir {
-                let path = validate_path(
-                    vfs_path.clone(),
-                    request.drive.clone(),
-                    path.clone(),
-                )
-                .await?;
-
-                // If it's a directory, create it
-                fs::create_dir_all(path).await.unwrap();
+                fs::create_dir_all(path).await?;
             } else {
                 println!("vfs: zip with non-file non-dir");
                 return Err(VfsError::CreateDirError { path: path.display().to_string(), error: "vfs: zip with non-file non-dir".into() });
             }
         }
             (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
         }
-    }
+    };
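An aside on the Seek(_) arm above: since crate::types::SeekFrom mirrors std::io::SeekFrom variant for variant, the inline match could later move into types.rs as a conversion impl, shrinking the arm to file.seek(seek_from.into()). A sketch, not part of this patch:

    // Hypothetical impl for types.rs, assuming the mirrored variants stay identical.
    impl From<crate::types::SeekFrom> for std::io::SeekFrom {
        fn from(seek_from: crate::types::SeekFrom) -> Self {
            match seek_from {
                crate::types::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset),
                crate::types::SeekFrom::End(offset) => std::io::SeekFrom::End(offset),
                crate::types::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset),
            }
        }
    }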
 
     if let Some(target) = km.rsvp.or_else(|| {
         expects_response.map(|_| Address {
@@ -425,30 +439,12 @@ async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsError> {
     Ok((process_id, drive))
 }
 
-async fn validate_path(
-    vfs_path: String,
-    drive: String,
-    request_path: String,
-) -> Result<PathBuf, VfsError> {
-    let drive_base = Path::new(&vfs_path).join(&drive);
-    if let Err(e) = fs::create_dir_all(&drive_base).await {
-        println!("failed creating drive dir! {:?}", e);
-    }
-    let request_path = request_path.strip_prefix("/").unwrap_or(&request_path);
-
-    let combined = drive_base.join(&request_path);
-    if combined.starts_with(&drive_base) {
-        Ok(combined)
-    } else {
-        println!("didn't start with base, combined: {:?}", combined);
-        Err(VfsError::InternalError)
-    }
-}
-
-async fn open_file(
+async fn open_file<P: AsRef<Path>>(
     open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
-    path: PathBuf,
+    path: P,
+    create: bool,
 ) -> Result<Arc<Mutex<fs::File>>, VfsError> {
+    let path = path.as_ref().to_path_buf();
     Ok(match open_files.get(&path) {
         Some(file) => Arc::clone(file.value()),
         None => {
             let file = Arc::new(Mutex::new(
                 OpenOptions::new()
                     .read(true)
                     .write(true)
-                    .create(true)
+                    .create(create)
                     .open(&path)
                     .await
-                    .map_err(|_| VfsError::InternalError)?,
+                    .map_err(|e| VfsError::IOError { error: e.to_string(), path: path.display().to_string() })?,
             ));
-            open_files.insert(path, Arc::clone(&file));
+            open_files.insert(path.clone(), Arc::clone(&file));
             file
         }
     })
 }
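What the open_files cache buys: repeated opens of one path share a single Arc'd, Mutex-guarded handle, so concurrent writers serialize on one descriptor instead of racing on two. A toy demonstration (the function and path literal are invented for illustration):

    // Hypothetical check: the second lookup must hit the cache.
    async fn demo(open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>) -> Result<(), VfsError> {
        let a = open_file(open_files.clone(), "/proc:pkg:node/drive/log.txt", true).await?;
        let b = open_file(open_files, "/proc:pkg:node/drive/log.txt", false).await?;
        assert!(Arc::ptr_eq(&a, &b)); // same underlying tokio::fs::File
        Ok(())
    }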
@@ -470,123 +466,16 @@ async fn check_caps(
     our_node: String,
     source: Address,
-    send_to_caps_oracle: CapMessageSender,
+    mut send_to_caps_oracle: CapMessageSender,
     request: &VfsRequest,
+    path: PathBuf,
+    drive: String,
+    vfs_dir_path: String,
 ) -> Result<(), VfsError> {
     let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
 
     // check caps
     // process_id + drive. + kernel needs auto_access.
     match &request.action {
-        VfsAction::Add { .. }
-        | VfsAction::Delete { .. }
-        | VfsAction::WriteOffset { .. }
-        | VfsAction::Append { .. }
-        | VfsAction::SetSize { .. } => {
-            send_to_caps_oracle
-                .send(CapMessage::Has {
-                    on: source.process.clone(),
-                    cap: Capability {
-                        issuer: Address {
-                            node: our_node.clone(),
-                            process: VFS_PROCESS_ID.clone(),
-                        },
-                        params: serde_json::to_string(&serde_json::json!({
-                            "kind": "write",
-                            "process_id": process_id.to_string(),
-                            "drive": request.drive,
-                        }))
-                        .unwrap(),
-                    },
-                    responder: send_cap_bool,
-                })
-                .await
-                .unwrap();
-            let has_cap = recv_cap_bool.await.unwrap();
-            if !has_cap {
-                return Err(VfsError::NoCap);
-            }
-            Ok(())
-        }
-        VfsAction::GetEntry { .. }
-        | VfsAction::GetFileChunk { .. }
-        | VfsAction::GetEntryLength { .. } => {
-            send_to_caps_oracle
-                .send(CapMessage::Has {
-                    on: source.process.clone(),
-                    cap: Capability {
-                        issuer: Address {
-                            node: our_node.clone(),
-                            process: VFS_PROCESS_ID.clone(),
-                        },
-                        params: serde_json::to_string(&serde_json::json!({
-                            "kind": "read",
-                            "drive": request.drive,
-                        }))
-                        .unwrap(),
-                    },
-                    responder: send_cap_bool,
-                })
-                .await
-                .unwrap();
-            let has_cap = recv_cap_bool.await.unwrap();
-            if !has_cap {
-                return Err(VfsError::NoCap);
-            }
-            Ok(())
-        }
-        VfsAction::New { .. } => {
-            let read_cap = Capability {
-                issuer: Address {
-                    node: our_node.clone(),
-                    process: VFS_PROCESS_ID.clone(),
-                },
-                params: serde_json::to_string(
-                    &serde_json::json!({"kind": "read", "drive": request.drive}),
-                )
-                .unwrap(),
-            };
-            let write_cap = Capability {
-                issuer: Address {
-                    node: our_node.clone(),
-                    process: VFS_PROCESS_ID.clone(),
-                },
-                params: serde_json::to_string(
-                    &serde_json::json!({"kind": "write", "drive": request.drive}),
-                )
-                .unwrap(),
-            };
-            let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
-            send_to_caps_oracle
-                .send(CapMessage::Add {
-                    on: source.process.clone(),
-                    cap: read_cap,
-                    responder: send_cap_bool,
-                })
-                .await
-                .unwrap();
-            let _ = recv_cap_bool.await.unwrap();
-            let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
-            send_to_caps_oracle
-                .send(CapMessage::Add {
-                    on: source.process.clone(),
-                    cap: write_cap,
-                    responder: send_cap_bool,
-                })
-                .await
-                .unwrap();
-            let _ = recv_cap_bool.await.unwrap();
-            Ok(())
-        }
-    }
-}
-
-async fn check_capss(
-    our_node: String,
-    source: Address,
-    send_to_caps_oracle: CapMessageSender,
-    request: &VfsRequest,
-) -> Result<(), VfsError> {
-    match request.action {
         VfsAction::CreateDir
         | VfsAction::CreateDirAll
         | VfsAction::CreateFile
@@ -601,6 +490,7 @@
         | VfsAction::RemoveDir
         | VfsAction::RemoveDirAll
         | VfsAction::Rename(_)
+        | VfsAction::AddZip
         | VfsAction::SetLen(_) => {
             send_to_caps_oracle
                 .send(CapMessage::Has {
@@ -612,18 +502,16 @@
                     on: source.process.clone(),
                     cap: Capability {
                         issuer: Address {
                             node: our_node.clone(),
                             process: VFS_PROCESS_ID.clone(),
                         },
                         params: serde_json::to_string(&serde_json::json!({
                             "kind": "write",
-                            "process_id": process_id.to_string(),
-                            "drive": request.drive,
+                            "drive": drive,
                         }))
                         .unwrap(),
                     },
                     responder: send_cap_bool,
                 })
-                .await
-                .unwrap();
-            let has_cap = recv_cap_bool.await.unwrap();
+                .await?;
+            let has_cap = recv_cap_bool.await?;
             if !has_cap {
-                return Err(VfsError::NoCap);
+                return Err(VfsError::NoCap { action: request.action.to_string(), path: path.display().to_string() });
             }
             Ok(())
         }
@@ -633,18 +521,64 @@
         VfsAction::Read
         | VfsAction::ReadDir
         | VfsAction::ReadExact(_)
         | VfsAction::ReadToString
         | VfsAction::Seek(_)
         | VfsAction::Hash
-        | VfsAction::Len => {
-            if !caps.read {
-                return Err(VfsError::NoCap {
-                    action: format!("{:?}", action),
-                    path: caps.path.clone(),
-                });
+        | VfsAction::Len => {
+            send_to_caps_oracle
+                .send(CapMessage::Has {
+                    on: source.process.clone(),
+                    cap: Capability {
+                        issuer: Address {
+                            node: our_node.clone(),
+                            process: VFS_PROCESS_ID.clone(),
+                        },
+                        params: serde_json::to_string(&serde_json::json!({
+                            "kind": "read",
+                            "drive": drive,
+                        }))
+                        .unwrap(),
+                    },
+                    responder: send_cap_bool,
+                })
+                .await?;
+            let has_cap = recv_cap_bool.await?;
+            if !has_cap {
+                return Err(VfsError::NoCap { action: request.action.to_string(), path: path.display().to_string() });
             }
+            Ok(())
         }
-        VfsAction::CreateDrive {
-
+        VfsAction::CreateDrive => {
+            add_capability("read", &drive, &our_node, &source, &mut send_to_caps_oracle).await?;
+            add_capability("write", &drive, &our_node, &source, &mut send_to_caps_oracle).await?;
+
+            let drive_path = format!("{}{}", vfs_dir_path, drive);
+            fs::create_dir_all(drive_path).await?;
+            Ok(())
         }
     }
 }
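For reviewers tracing the capability checks: the params the oracle compares are plain serialized JSON, keyed on the kind and the drive prefix derived earlier in handle_request. A sketch of the write capability for a hypothetical drive (note that serde_json's default map serializes keys in sorted order):

    let params = serde_json::to_string(&serde_json::json!({
        "kind": "write",
        "drive": "/my_process:my_package:node/notes",
    }))
    .unwrap();
    println!("{}", params); // {"drive":"/my_process:my_package:node/notes","kind":"write"}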
 
+async fn add_capability(
+    kind: &str,
+    drive: &str,
+    our_node: &str,
+    source: &Address,
+    send_to_caps_oracle: &mut CapMessageSender,
+) -> Result<(), VfsError> {
+    let cap = Capability {
+        issuer: Address {
+            node: our_node.to_string(),
+            process: VFS_PROCESS_ID.clone(),
+        },
+        params: serde_json::to_string(&serde_json::json!({ "kind": kind, "drive": drive })).unwrap(),
+    };
+    let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
+    send_to_caps_oracle
+        .send(CapMessage::Add {
+            on: source.process.clone(),
+            cap,
+            responder: send_cap_bool,
+        })
+        .await?;
+    let _ = recv_cap_bool.await?;
     Ok(())
 }
 
@@ -674,3 +608,27 @@ fn make_error_message(
         signed_capabilities: None,
     }
 }
+
+impl From<std::io::Error> for VfsError {
+    fn from(err: std::io::Error) -> Self {
+        // note: a bare io::Error carries no path; callers that know the
+        // path should construct VfsError::IOError themselves
+        VfsError::IOError { error: err.to_string(), path: "".to_string() }
+    }
+}
+
+impl From<tokio::sync::oneshot::error::RecvError> for VfsError {
+    fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
+        VfsError::CapChannelFail { error: err.to_string() }
+    }
+}
+
+impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for VfsError {
+    fn from(err: tokio::sync::mpsc::error::SendError<CapMessage>) -> Self {
+        VfsError::CapChannelFail { error: err.to_string() }
+    }
+}
+
+impl std::fmt::Display for VfsAction {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
\ No newline at end of file
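Closing note on the new From impls: they are what let the handler arms above swap .unwrap() for ?, funneling tokio IO errors and capability-channel failures alike into VfsError. A toy illustration of the resulting ergonomics (the function and its use are invented, not part of this patch):

    // io::Error from tokio::fs converts to VfsError::IOError via the From impl,
    // so `?` works directly in any function returning Result<_, VfsError>.
    async fn read_len(path: std::path::PathBuf) -> Result<u64, VfsError> {
        let meta = tokio::fs::metadata(&path).await?;
        Ok(meta.len())
    }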