vfs: refactor!

This commit is contained in:
bitful-pannul 2023-12-13 20:27:22 -03:00
parent 3cc06271fe
commit 09d993eb30
2 changed files with 158 additions and 197 deletions

View File

@ -1035,8 +1035,10 @@ pub enum VfsError {
BadRequest { error: String },
#[error("vfs: error parsing path: {path}, error: {error}")]
ParseError { error: String, path: String },
#[error("vfs: IO error: {source}, at path {path}.")]
IOError { source: tokio::io::Error, path: String },
#[error("vfs: IO error: {error}, at path {path}.")]
IOError { error: String, path: String },
#[error("vfs: kernel capability channel error: {error}")]
CapChannelFail { error: String },
#[error("vfs: Bad JSON payload: {error}.")]
BadJson { error: String },
#[error("vfs: File not found at path {path}.")]
@ -1054,6 +1056,7 @@ impl VfsError {
VfsError::BadRequest { .. } => "BadRequest",
VfsError::ParseError { .. } => "ParseError",
VfsError::IOError { .. } => "IOError",
VfsError::CapChannelFail { .. } => "CapChannelFail",
VfsError::BadJson { .. } => "NoJson",
VfsError::NotFound { .. } => "NotFound",
VfsError::CreateDirError { .. } => "CreateDirError",

View File

@ -6,7 +6,7 @@ use std::sync::Arc;
use tokio::fs;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::Mutex;
use crate::types::*;
@ -24,11 +24,6 @@ pub async fn vfs(
panic!("failed creating vfs dir! {:?}", e);
}
//process_A has drive A
//process_B creates drive A, conflict.
//process_A has drive A, your processId => drive => folder =>
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
let process_queues = Arc::new(Mutex::new(
@ -132,17 +127,24 @@ async fn handle_request(
}
};
// sort by package_id instead? pros/cons?
// current prepend to filepaths needs to be: /process_id/drive/path
let (process_id, drive) = parse_process_and_drive(&request.path).await?;
let drive = format!("/{}/{}", process_id, drive);
let path = PathBuf::from(request.path.clone()); // validate
check_caps(
our_node.clone(),
source.clone(),
send_to_caps_oracle.clone(),
&request,
path.clone(),
drive,
vfs_path.clone(),
)
.await?;
let path = request.path; // validate
let (ipc, bytes) = match request.action {
VfsAction::CreateDrive => {
@ -160,13 +162,12 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::CreateFile => {
let file = open_file(open_files.clone(), path).await?;
let _file = open_file(open_files.clone(), path, true).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::OpenFile => {
// shouldn't create?
let file = open_file(open_files.clone(), path).await?;
let _file = open_file(open_files.clone(), path, false).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
@ -176,26 +177,38 @@ async fn handle_request(
}
VfsAction::WriteAll => {
// should expect open file.
let file = open_file(open_files.clone(), path).await?;
let Some(payload) = payload else {
return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
};
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Write => {
let file = open_file(open_files.clone(), path).await?;
let Some(payload) = payload else {
return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
};
let file = open_file(open_files.clone(), path, true).await?;
let mut file = file.lock().await;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::WriteAt(offset) => {
let file = open_file(open_files.clone(), path).await?;
let Some(payload) = payload else {
return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
};
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(offset)).await?;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Append => {
let file = open_file(open_files.clone(), path).await?;
let Some(payload) = payload else {
return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
};
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::End(0)).await?;
file.write_all(&payload.bytes).await?;
@ -203,13 +216,13 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::SyncAll => {
let file = open_file(open_files.clone(), path).await?;
let mut file = file.lock().await;
let file = open_file(open_files.clone(), path, false).await?;
let file = file.lock().await;
file.sync_all().await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Read => {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
@ -223,7 +236,7 @@ async fn handle_request(
let mut entries = Vec::new();
while let Some(entry) = dir.next_entry().await? {
// risky non-unicode
entries.push(entry.path().to_str()?);
entries.push(entry.path().display().to_string());
}
(
serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(),
@ -231,30 +244,35 @@ async fn handle_request(
)
}
VfsAction::ReadExact(length) => {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
let mut contents = vec![0; length as usize];
file.read_exact(&mut contents).await?;
(
serde_json::to_vec(&VfsResponse::ReadExact).unwrap(),
serde_json::to_vec(&VfsResponse::Read).unwrap(),
Some(contents),
)
}
VfsAction::ReadToString => {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
(
serde_json::to_vec(&VfsResponse::ReadToString).unwrap(),
Some(contents.into_bytes()),
serde_json::to_vec(&VfsResponse::ReadToString(contents)).unwrap(),
None,
)
}
VfsAction::Seek(seek_from) => {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
// same type, rust tingz
let seek_from = match seek_from {
crate::types::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset),
crate::types::SeekFrom::End(offset) => std::io::SeekFrom::End(offset),
crate::types::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset),
};
file.seek(seek_from).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::RemoveFile => {
@ -275,22 +293,22 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Len => {
let file = open_file(open_files.clone(), path).await?;
let mut file = file.lock().await;
let file = open_file(open_files.clone(), path, false).await?;
let file = file.lock().await;
let len = file.metadata().await?.len();
(
serde_json::to_vec(&VfsResponse::Len(Some(len))).unwrap(),
serde_json::to_vec(&VfsResponse::Len(len)).unwrap(),
None,
)
}
VfsAction::SetLen(len) => {
let file = open_file(open_files.clone(), path).await?;
let mut file = file.lock().await;
let file = open_file(open_files.clone(), path, false).await?;
let file = file.lock().await;
file.set_len(len).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Hash => {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
let mut hasher = blake3::Hasher::new();
let mut buffer = [0; 1024];
@ -301,13 +319,16 @@ async fn handle_request(
}
hasher.update(&buffer[..bytes_read]);
}
let hash = hasher.finalize();
let hash: [u8; 32] = hasher.finalize().into();
(
serde_json::to_vec(&VfsResponse::Hash(Some(hash.to_vec()))).unwrap(),
serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(),
None,
)
}
VfsAction::AddZip => {
let Some(payload) = payload else {
return Err(VfsError::BadRequest { error: "payload needs to exist for AddZip".into() })
};
let Some(mime) = payload.mime else {
return Err(VfsError::BadRequest { error: "payload mime type needs to exist for AddZip".into() })
};
@ -317,7 +338,7 @@ async fn handle_request(
let file = std::io::Cursor::new(&payload.bytes);
let mut zip = match zip::ZipArchive::new(file) {
Ok(f) => f,
Err(_) => return Err(VfsError::InternalError),
Err(e) => return Err(VfsError::ParseError { error: e.to_string(), path: path.display().to_string() }),
};
// loop through items in archive; recursively add to root
@ -337,19 +358,12 @@ async fn handle_request(
(is_file, is_dir, full_path, file_contents)
};
if is_file {
let file = open_file(open_files.clone(), path).await?;
let file = open_file(open_files.clone(), path, true).await?;
let mut file = file.lock().await;
file.write_all(&file_contents).await.unwrap();
file.write_all(&file_contents).await?;
} else if is_dir {
let path = validate_path(
vfs_path.clone(),
request.drive.clone(),
path.clone(),
)
.await?;
// If it's a directory, create it
fs::create_dir_all(path).await.unwrap();
fs::create_dir_all(path).await?;
} else {
println!("vfs: zip with non-file non-dir");
return Err(VfsError::CreateDirError { path: path, error: "vfs: zip with non-file non-dir".into() });
@ -357,7 +371,7 @@ async fn handle_request(
}
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
}
};
if let Some(target) = km.rsvp.or_else(|| {
expects_response.map(|_| Address {
@ -425,30 +439,12 @@ async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsE
Ok((process_id, drive))
}
async fn validate_path(
vfs_path: String,
drive: String,
request_path: String,
) -> Result<PathBuf, VfsError> {
let drive_base = Path::new(&vfs_path).join(&drive);
if let Err(e) = fs::create_dir_all(&drive_base).await {
println!("failed creating drive dir! {:?}", e);
}
let request_path = request_path.strip_prefix("/").unwrap_or(&request_path);
let combined = drive_base.join(&request_path);
if combined.starts_with(&drive_base) {
Ok(combined)
} else {
println!("didn't start with base, combined: {:?}", combined);
Err(VfsError::InternalError)
}
}
async fn open_file(
async fn open_file<P: AsRef<Path>>(
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
path: PathBuf,
path: P,
create: bool,
) -> Result<Arc<Mutex<fs::File>>, VfsError> {
let path = path.as_ref().to_path_buf();
Ok(match open_files.get(&path) {
Some(file) => Arc::clone(file.value()),
None => {
@ -456,12 +452,12 @@ async fn open_file(
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.create(create)
.open(&path)
.await
.map_err(|_| VfsError::InternalError)?,
.map_err(|e| VfsError::IOError { error: e.to_string(), path: path.display().to_string() })?,
));
open_files.insert(path, Arc::clone(&file));
open_files.insert(path.clone(), Arc::clone(&file));
file
}
})
@ -470,123 +466,16 @@ async fn open_file(
async fn check_caps(
our_node: String,
source: Address,
send_to_caps_oracle: CapMessageSender,
mut send_to_caps_oracle: CapMessageSender,
request: &VfsRequest,
path: PathBuf,
drive: String,
vfs_dir_path: String,
) -> Result<(), VfsError> {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
// check caps
// process_id + drive. + kernel needs auto_access.
match &request.action {
VfsAction::Add { .. }
| VfsAction::Delete { .. }
| VfsAction::WriteOffset { .. }
| VfsAction::Append { .. }
| VfsAction::SetSize { .. } => {
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"process_id": process_id.to_string(),
"drive": request.drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await
.unwrap();
let has_cap = recv_cap_bool.await.unwrap();
if !has_cap {
return Err(VfsError::NoCap);
}
Ok(())
}
VfsAction::GetEntry { .. }
| VfsAction::GetFileChunk { .. }
| VfsAction::GetEntryLength { .. } => {
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": request.drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await
.unwrap();
let has_cap = recv_cap_bool.await.unwrap();
if !has_cap {
return Err(VfsError::NoCap);
}
Ok(())
}
VfsAction::New { .. } => {
let read_cap = Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(
&serde_json::json!({"kind": "read", "drive": request.drive}),
)
.unwrap(),
};
let write_cap = Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(
&serde_json::json!({"kind": "write", "drive": request.drive}),
)
.unwrap(),
};
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Add {
on: source.process.clone(),
cap: read_cap,
responder: send_cap_bool,
})
.await
.unwrap();
let _ = recv_cap_bool.await.unwrap();
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Add {
on: source.process.clone(),
cap: write_cap,
responder: send_cap_bool,
})
.await
.unwrap();
let _ = recv_cap_bool.await.unwrap();
Ok(())
}
}
}
async fn check_capss(
our_node: String,
source: Address,
send_to_caps_oracle: CapMessageSender,
request: &VfsRequest,
) -> Result<(), VfsError> {
match request.action {
VfsAction::CreateDir
| VfsAction::CreateDirAll
| VfsAction::CreateFile
@ -601,6 +490,7 @@ async fn check_capss(
| VfsAction::RemoveDir
| VfsAction::RemoveDirAll
| VfsAction::Rename(_)
| VfsAction::AddZip
| VfsAction::SetLen(_) => {
send_to_caps_oracle
.send(CapMessage::Has {
@ -612,18 +502,16 @@ async fn check_capss(
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"process_id": process_id.to_string(),
"drive": request.drive,
"drive": drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await
.unwrap();
let has_cap = recv_cap_bool.await.unwrap();
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(VfsError::NoCap);
return Err(VfsError::NoCap { action: request.action.to_string(), path: path.display().to_string() });
}
Ok(())
}
@ -634,17 +522,63 @@ async fn check_capss(
| VfsAction::Seek(_)
| VfsAction::Hash
| VfsAction::Len => {
if !caps.read {
return Err(VfsError::NoCap {
action: format!("{:?}", action),
path: caps.path.clone(),
});
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(VfsError::NoCap { action: request.action.to_string(), path: path.display().to_string() });
}
Ok(())
}
VfsAction::CreateDrive {
VfsAction::CreateDrive => {
add_capability("read", &drive, &our_node, &source, &mut send_to_caps_oracle).await?;
add_capability("write", &drive, &our_node, &source, &mut send_to_caps_oracle).await?;
let drive_path = format!("{}{}", vfs_dir_path, drive);
fs::create_dir_all(drive_path).await?;
Ok(())
}
}
}
async fn add_capability(
kind: &str,
drive: &str,
our_node: &str,
source: &Address,
send_to_caps_oracle: &mut CapMessageSender,
) -> Result<(), VfsError> {
let cap = Capability {
issuer: Address {
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({ "kind": kind, "drive": drive })).unwrap(),
};
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Add {
on: source.process.clone(),
cap,
responder: send_cap_bool,
})
.await?;
let _ = recv_cap_bool.await?;
Ok(())
}
@ -674,3 +608,27 @@ fn make_error_message(
signed_capabilities: None,
}
}
impl From<std::io::Error> for VfsError {
fn from(err: std::io::Error) -> Self {
VfsError::IOError { error: err.to_string(), path: "".to_string() } // replace with appropriate VfsError variant and fields
}
}
impl From<tokio::sync::oneshot::error::RecvError> for VfsError {
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
VfsError::CapChannelFail { error: err.to_string() }
}
}
impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for VfsError {
fn from(err: tokio::sync::mpsc::error::SendError<CapMessage>) -> Self {
VfsError::CapChannelFail { error: err.to_string() }
}
}
impl std::fmt::Display for VfsAction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}