This commit is contained in:
dr-frmr 2024-06-05 17:38:38 -06:00
parent 63f8acce46
commit d31c65c8df
No known key found for this signature in database
3 changed files with 86 additions and 84 deletions

View File

@ -41,5 +41,8 @@ fn init(_our: Address) {
println!("no file found at {}", file_path);
return;
};
println!("{}", String::from_utf8(blob.bytes).unwrap());
match String::from_utf8(blob.bytes) {
Ok(s) => println!("{s}"),
Err(_e) => println!("error: file at {file_path} could not be parsed as utf-8 string!"),
}
}

View File

@ -1,5 +1,5 @@
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
// use std::collections::{HashMap, VecDeque};
use std::io::prelude::*;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
@ -18,7 +18,7 @@ pub async fn vfs(
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let vfs_path = format!("{}/vfs", &home_directory_path);
let vfs_path = format!("{home_directory_path}/vfs");
if let Err(e) = fs::create_dir_all(&vfs_path).await {
panic!("failed creating vfs dir! {:?}", e);
@ -27,70 +27,73 @@ pub async fn vfs(
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::new();
// let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
// HashMap::new();
loop {
let Some(km) = recv_from_loop.recv().await else {
continue;
};
if our_node.clone() != km.source.node {
println!(
"vfs: request must come from our_node={}, got: {}",
our_node, km.source.node,
);
while let Some(km) = recv_from_loop.recv().await {
if our_node != km.source.node {
let _ = send_to_terminal.send(Printout {
verbosity: 1,
content: format!(
"vfs: got request from {}, but requests must come from our node {our_node}\r",
km.source.node,
),
});
continue;
}
let queue = process_queues
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();
// let queue = process_queues
// .entry(km.source.process.clone())
// .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
// .clone();
{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
}
// {
// let mut queue_lock = queue.lock().await;
// queue_lock.push_back(km.clone());
// }
// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_loop = send_to_loop.clone();
let open_files = open_files.clone();
let vfs_path = vfs_path.clone();
// // clone Arcs
// let our_node = our_node.clone();
// let send_to_caps_oracle = send_to_caps_oracle.clone();
// let send_to_terminal = send_to_terminal.clone();
// let send_to_loop = send_to_loop.clone();
// let open_files = open_files.clone();
// let vfs_path = vfs_path.clone();
// tokio::spawn(async move {
// let mut queue_lock = queue.lock().await;
// if let Some(km) = queue_lock.pop_front() {
let (km_id, km_source) = (km.id.clone(), km.source.clone());
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
&our_node,
km,
open_files.clone(),
send_to_loop.clone(),
send_to_terminal.clone(),
send_to_caps_oracle.clone(),
vfs_path.clone(),
&send_to_loop,
&send_to_terminal,
&send_to_caps_oracle,
&vfs_path,
)
.await
{
let _ = send_to_loop
.send(make_error_message(our_node.clone(), km.id, km.source, e))
.send(make_error_message(our_node.clone(), km_id, km_source, e))
.await;
}
// }
// });
}
});
}
Ok(())
}
async fn handle_request(
our_node: String,
our_node: &str,
km: KernelMessage,
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
send_to_caps_oracle: CapMessageSender,
vfs_path: PathBuf,
send_to_loop: &MessageSender,
send_to_terminal: &PrintSender,
send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf,
) -> Result<(), VfsError> {
let KernelMessage {
id,
@ -114,7 +117,6 @@ async fn handle_request(
let request: VfsRequest = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
println!("vfs: got invalid Request: {}", e);
return Err(VfsError::BadJson {
error: e.to_string(),
});
@ -130,7 +132,7 @@ async fn handle_request(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
@ -161,7 +163,7 @@ async fn handle_request(
let response = KernelMessage {
id,
source: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target: source,
@ -196,7 +198,7 @@ async fn handle_request(
if km.source.process != *KERNEL_PROCESS_ID {
check_caps(
our_node.clone(),
our_node,
source.clone(),
send_to_caps_oracle.clone(),
&request,
@ -470,7 +472,6 @@ async fn handle_request(
} else if is_dir {
fs::create_dir_all(local_path).await?;
} else {
println!("vfs: zip with non-file non-dir");
return Err(VfsError::CreateDirError {
path: path.display().to_string(),
error: "vfs: zip with non-file non-dir".into(),
@ -483,14 +484,14 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| {
expects_response.map(|_| Address {
node: our_node.clone(),
node: our_node.to_string(),
process: source.process.clone(),
})
}) {
let response = KernelMessage {
id,
source: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target,
@ -512,7 +513,6 @@ async fn handle_request(
let _ = send_to_loop.send(response).await;
} else {
println!("vfs: not sending response: ");
send_to_terminal
.send(Printout {
verbosity: 2,
@ -538,11 +538,7 @@ async fn parse_package_and_drive(
let normalized_path = normalize_path(&joined_path);
if !normalized_path.starts_with(vfs_path) {
return Err(VfsError::BadRequest {
error: format!(
"input path tries to escape parent vfs directory: {:?}",
path
)
.into(),
error: format!("input path tries to escape parent vfs directory: {path}"),
})?;
}
@ -550,11 +546,7 @@ async fn parse_package_and_drive(
let path = normalized_path
.strip_prefix(vfs_path)
.map_err(|_| VfsError::BadRequest {
error: format!(
"input path tries to escape parent vfs directory: {:?}",
path
)
.into(),
error: format!("input path tries to escape parent vfs directory: {path}"),
})?
.display()
.to_string();
@ -567,7 +559,7 @@ async fn parse_package_and_drive(
if parts.len() < 2 {
return Err(VfsError::ParseError {
error: "malformed path".into(),
path: path.to_string(),
path,
});
}
@ -576,7 +568,7 @@ async fn parse_package_and_drive(
Err(e) => {
return Err(VfsError::ParseError {
error: e.to_string(),
path: path.to_string(),
path,
})
}
};
@ -617,7 +609,7 @@ async fn open_file<P: AsRef<Path>>(
}
async fn check_caps(
our_node: String,
our_node: &str,
source: Address,
mut send_to_caps_oracle: CapMessageSender,
request: &VfsRequest,
@ -635,7 +627,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
@ -676,7 +668,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
@ -718,7 +710,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
@ -761,7 +753,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
@ -792,7 +784,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({

View File

@ -252,15 +252,22 @@ pub enum ProcessIdParseError {
impl std::fmt::Display for ProcessIdParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_string())
write!(
f,
"{}",
match self {
ProcessIdParseError::TooManyColons => "Too many colons",
ProcessIdParseError::MissingField => "Missing field",
}
)
}
}
impl std::error::Error for ProcessIdParseError {
fn description(&self) -> &str {
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
ProcessIdParseError::TooManyColons => "Too many colons",
ProcessIdParseError::MissingField => "Missing field",
}
}
}
@ -1514,7 +1521,7 @@ pub enum VfsError {
BadBytes { action: String, path: String },
#[error("vfs: bad request error: {error}")]
BadRequest { error: String },
#[error("vfs: error parsing path: {path}, error: {error}")]
#[error("vfs: error parsing path: {path}: {error}")]
ParseError { error: String, path: String },
#[error("vfs: IO error: {error}, at path {path}")]
IOError { error: String, path: String },