vfs: fix hang

This commit is contained in:
hosted-fornet 2023-10-09 21:26:08 -07:00
parent 872b15fda2
commit 384760b23c
4 changed files with 114 additions and 109 deletions

View File

@ -16,8 +16,7 @@ pub async fn load_fs(
home_directory_path: String, home_directory_path: String,
file_key: Vec<u8>, file_key: Vec<u8>,
fs_config: FsConfig, fs_config: FsConfig,
vfs_message_sender: MessageSender, ) -> Result<(ProcessMap, Manifest, Vec<KernelMessage>), FsError> {
) -> Result<(ProcessMap, Manifest), FsError> {
// load/create fs directory, manifest + log if none. // load/create fs directory, manifest + log if none.
let fs_directory_path_str = format!("{}/fs", &home_directory_path); let fs_directory_path_str = format!("{}/fs", &home_directory_path);
@ -66,7 +65,7 @@ pub async fn load_fs(
let mut process_map: ProcessMap = HashMap::new(); let mut process_map: ProcessMap = HashMap::new();
// get current processes' wasm_bytes handles. GetState(kernel) // get current processes' wasm_bytes handles. GetState(kernel)
match manifest.read(&kernel_process_id, None, None).await { let vfs_messages = match manifest.read(&kernel_process_id, None, None).await {
Err(_) => { Err(_) => {
// bootstrap filesystem // bootstrap filesystem
bootstrap( bootstrap(
@ -74,17 +73,17 @@ pub async fn load_fs(
&kernel_process_id, &kernel_process_id,
&mut process_map, &mut process_map,
&mut manifest, &mut manifest,
&vfs_message_sender,
) )
.await .await
.expect("fresh bootstrap failed!"); .expect("fresh bootstrap failed!")
} }
Ok(bytes) => { Ok(bytes) => {
process_map = bincode::deserialize(&bytes).expect("state map deserialization error!"); process_map = bincode::deserialize(&bytes).expect("state map deserialization error!");
vec![]
} }
} };
Ok((process_map, manifest)) Ok((process_map, manifest, vfs_messages))
} }
/// function run only upon fresh boot. /// function run only upon fresh boot.
@ -100,8 +99,7 @@ async fn bootstrap(
kernel_process_id: &FileIdentifier, kernel_process_id: &FileIdentifier,
process_map: &mut ProcessMap, process_map: &mut ProcessMap,
manifest: &mut Manifest, manifest: &mut Manifest,
vfs_message_sender: &MessageSender, ) -> Result<Vec<KernelMessage>> {
) -> Result<()> {
println!("bootstrapping node...\r"); println!("bootstrapping node...\r");
const RUNTIME_MODULES: [(&str, bool); 8] = [ const RUNTIME_MODULES: [(&str, bool); 8] = [
("filesystem:sys:uqbar", false), ("filesystem:sys:uqbar", false),
@ -109,7 +107,7 @@ async fn bootstrap(
("http_client:sys:uqbar", false), ("http_client:sys:uqbar", false),
("encryptor:sys:uqbar", false), ("encryptor:sys:uqbar", false),
("net:sys:uqbar", false), ("net:sys:uqbar", false),
("vfs:sys:uqbar", false), ("vfs:sys:uqbar", true),
("kernel:sys:uqbar", false), ("kernel:sys:uqbar", false),
("eth_rpc:sys:uqbar", true), // TODO evaluate ("eth_rpc:sys:uqbar", true), // TODO evaluate
]; ];
@ -144,6 +142,7 @@ async fn bootstrap(
public: runtime_module.1, public: runtime_module.1,
}); });
} }
println!("fs bs: runtime process_map {:#?}\r", process_map);
let packages: Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> = let packages: Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> =
get_zipped_packages().await; get_zipped_packages().await;
@ -151,11 +150,12 @@ async fn bootstrap(
// need to grant all caps at the end, after process_map has been filled in! // need to grant all caps at the end, after process_map has been filled in!
let mut caps_to_grant = Vec::<(ProcessId, Capability)>::new(); let mut caps_to_grant = Vec::<(ProcessId, Capability)>::new();
let mut vfs_messages = Vec::new();
for (package_name, mut package) in packages { for (package_name, mut package) in packages {
println!("fs: handling package {package_name}...\r"); println!("fs: handling package {package_name}...\r");
// create a new package in VFS // create a new package in VFS
vfs_message_sender vfs_messages.push(KernelMessage {
.send(KernelMessage {
id: rand::random(), id: rand::random(),
source: Address { source: Address {
node: our_name.to_string(), node: our_name.to_string(),
@ -180,9 +180,7 @@ async fn bootstrap(
}), }),
payload: None, payload: None,
signed_capabilities: None, signed_capabilities: None,
}) });
.await
.unwrap();
// for each file in package.zip, recursively through all dirs, send a newfile KM to VFS // for each file in package.zip, recursively through all dirs, send a newfile KM to VFS
for i in 0..package.len() { for i in 0..package.len() {
let mut file = package.by_index(i).unwrap(); let mut file = package.by_index(i).unwrap();
@ -198,8 +196,7 @@ async fn bootstrap(
println!("fs: found file {}...\r", file_path); println!("fs: found file {}...\r", file_path);
let mut file_content = Vec::new(); let mut file_content = Vec::new();
file.read_to_end(&mut file_content).unwrap(); file.read_to_end(&mut file_content).unwrap();
vfs_message_sender vfs_messages.push(KernelMessage {
.send(KernelMessage {
id: rand::random(), id: rand::random(),
source: Address { source: Address {
node: our_name.to_string(), node: our_name.to_string(),
@ -230,9 +227,7 @@ async fn bootstrap(
bytes: file_content, bytes: file_content,
}), }),
signed_capabilities: None, signed_capabilities: None,
}) });
.await
.unwrap();
} }
} }
@ -376,7 +371,7 @@ async fn bootstrap(
.write(&kernel_process_id, &serialized_process_map) .write(&kernel_process_id, &serialized_process_map)
.await; .await;
} }
Ok(()) Ok(vfs_messages)
} }
/// go into /target folder and get all .zip package files /// go into /target folder and get all .zip package files

View File

@ -1751,7 +1751,7 @@ async fn make_event_loop(
); );
senders.insert( senders.insert(
t::ProcessId::new(Some("vfs"), "sys", "uqbar"), t::ProcessId::new(Some("vfs"), "sys", "uqbar"),
ProcessSender::Runtime(send_to_vfs.clone()), ProcessSender::Runtime(send_to_vfs),
); );
// each running process is stored in this map // each running process is stored in this map
@ -2009,10 +2009,12 @@ async fn make_event_loop(
} }
match senders.get(&kernel_message.target.process) { match senders.get(&kernel_message.target.process) {
Some(ProcessSender::Userspace(sender)) => { Some(ProcessSender::Userspace(sender)) => {
println!("el: sending to {}\r", kernel_message.target.process);
// TODO: should this failing should crash kernel? probably not // TODO: should this failing should crash kernel? probably not
sender.send(Ok(kernel_message)).await.unwrap(); sender.send(Ok(kernel_message)).await.unwrap();
} }
Some(ProcessSender::Runtime(sender)) => { Some(ProcessSender::Runtime(sender)) => {
println!("el: sending to {}\r", kernel_message.target.process);
sender.send(kernel_message).await.expect("fatal: runtime module died"); sender.send(kernel_message).await.expect("fatal: runtime module died");
} }
None => { None => {

View File

@ -373,12 +373,11 @@ async fn main() {
) )
}; };
let (kernel_process_map, manifest) = filesystem::load_fs( let (kernel_process_map, manifest, vfs_messages) = filesystem::load_fs(
our.name.clone(), our.name.clone(),
home_directory_path.clone(), home_directory_path.clone(),
file_key, file_key,
fs_config, fs_config,
vfs_message_sender.clone(),
) )
.await .await
.expect("fs load failed!"); .expect("fs load failed!");
@ -466,6 +465,7 @@ async fn main() {
print_sender.clone(), print_sender.clone(),
vfs_message_receiver, vfs_message_receiver,
caps_oracle_sender.clone(), caps_oracle_sender.clone(),
vfs_messages,
)); ));
tasks.spawn(encryptor::encryptor( tasks.spawn(encryptor::encryptor(
our.name.clone(), our.name.clone(),

View File

@ -25,8 +25,8 @@ struct Vfs {
key_to_entry: KeyToEntry, key_to_entry: KeyToEntry,
path_to_key: PathToKey, path_to_key: PathToKey,
} }
type IdentifierToVfs = HashMap<String, Arc<Mutex<Vfs>>>; type DriveToVfs = HashMap<String, Arc<Mutex<Vfs>>>;
type IdentifierToVfsSerializable = HashMap<String, Vfs>; type DriveToVfsSerializable = HashMap<String, Vfs>;
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
struct Entry { struct Entry {
@ -120,8 +120,8 @@ fn make_error_message(
} }
} }
async fn state_to_bytes(state: &IdentifierToVfs) -> Vec<u8> { async fn state_to_bytes(state: &DriveToVfs) -> Vec<u8> {
let mut serializable: IdentifierToVfsSerializable = HashMap::new(); let mut serializable: DriveToVfsSerializable = HashMap::new();
for (id, vfs) in state.iter() { for (id, vfs) in state.iter() {
let vfs = vfs.lock().await; let vfs = vfs.lock().await;
serializable.insert(id.clone(), (*vfs).clone()); serializable.insert(id.clone(), (*vfs).clone());
@ -129,14 +129,14 @@ async fn state_to_bytes(state: &IdentifierToVfs) -> Vec<u8> {
bincode::serialize(&serializable).unwrap() bincode::serialize(&serializable).unwrap()
} }
fn bytes_to_state(bytes: &Vec<u8>, state: &mut IdentifierToVfs) { fn bytes_to_state(bytes: &Vec<u8>, state: &mut DriveToVfs) {
let serializable: IdentifierToVfsSerializable = bincode::deserialize(&bytes).unwrap(); let serializable: DriveToVfsSerializable = bincode::deserialize(&bytes).unwrap();
for (id, vfs) in serializable.into_iter() { for (id, vfs) in serializable.into_iter() {
state.insert(id, Arc::new(Mutex::new(vfs))); state.insert(id, Arc::new(Mutex::new(vfs)));
} }
} }
async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &IdentifierToVfs) { async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &DriveToVfs) {
let _ = send_to_loop let _ = send_to_loop
.send(KernelMessage { .send(KernelMessage {
id: rand::random(), id: rand::random(),
@ -169,13 +169,12 @@ async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &I
async fn load_state_from_reboot( async fn load_state_from_reboot(
our_node: String, our_node: String,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,
mut recv_from_loop: MessageReceiver, mut recv_from_loop: &mut MessageReceiver,
drive_to_vfs: &mut IdentifierToVfs, drive_to_vfs: &mut DriveToVfs,
id: u64, ) {
) -> bool {
let _ = send_to_loop let _ = send_to_loop
.send(KernelMessage { .send(KernelMessage {
id, id: rand::random(),
source: Address { source: Address {
node: our_node.clone(), node: our_node.clone(),
process: VFS_PROCESS_ID.clone(), process: VFS_PROCESS_ID.clone(),
@ -197,38 +196,47 @@ async fn load_state_from_reboot(
signed_capabilities: None, signed_capabilities: None,
}) })
.await; .await;
println!("vfs lsfr 1\r");
let km = recv_from_loop.recv().await; let km = recv_from_loop.recv().await;
println!("vfs lsfr 2\r");
let Some(km) = km else { let Some(km) = km else {
return false; return ();
}; };
let KernelMessage { let KernelMessage {
message, payload, .. message, payload, ..
} = km; } = km;
let Message::Response((Response { ipc, .. }, None)) = message else { let Message::Response((Response { ipc, .. }, None)) = message else {
return false; println!("vfs lsfr f0\r");
return ();
}; };
let Ok(Ok(FsResponse::GetState)) = let Ok(Ok(FsResponse::GetState)) =
serde_json::from_str::<Result<FsResponse, FsError>>(&ipc.unwrap_or_default()) serde_json::from_str::<Result<FsResponse, FsError>>(&ipc.unwrap_or_default())
else { else {
return false; println!("vfs lsfr f1\r");
return ();
}; };
let Some(payload) = payload else { let Some(payload) = payload else {
panic!(""); panic!("");
}; };
bytes_to_state(&payload.bytes, drive_to_vfs); let mut drive_to_vfs: DriveToVfs = HashMap::new();
bytes_to_state(&payload.bytes, &mut drive_to_vfs);
return true; println!("vfs lsfr 4\r");
} }
async fn noop_future() -> Option<DriveToVfs> { None }
pub async fn vfs( pub async fn vfs(
our_node: String, our_node: String,
send_to_loop: MessageSender, send_to_loop: MessageSender,
send_to_terminal: PrintSender, send_to_terminal: PrintSender,
mut recv_from_loop: MessageReceiver, mut recv_from_loop: MessageReceiver,
send_to_caps_oracle: CapMessageSender, send_to_caps_oracle: CapMessageSender,
vfs_messages: Vec<KernelMessage>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut drive_to_vfs: IdentifierToVfs = HashMap::new(); println!("vfs: begin\r");
let mut drive_to_vfs: DriveToVfs = HashMap::new();
let mut response_router: ResponseRouter = HashMap::new(); let mut response_router: ResponseRouter = HashMap::new();
let (send_vfs_task_done, mut recv_vfs_task_done): ( let (send_vfs_task_done, mut recv_vfs_task_done): (
tokio::sync::mpsc::Sender<u64>, tokio::sync::mpsc::Sender<u64>,
@ -239,35 +247,33 @@ pub async fn vfs(
tokio::sync::mpsc::Receiver<bool>, tokio::sync::mpsc::Receiver<bool>,
) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY); ) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY);
let (response_sender, response_receiver): (MessageSender, MessageReceiver) = load_state_from_reboot(
tokio::sync::mpsc::channel(VFS_RESPONSE_CHANNEL_CAPACITY);
let first_message_id = rand::random();
response_router.insert(first_message_id, response_sender);
let is_reboot = load_state_from_reboot(
our_node.clone(), our_node.clone(),
&send_to_loop, &send_to_loop,
response_receiver, &mut recv_from_loop,
&mut drive_to_vfs, &mut drive_to_vfs,
first_message_id, ).await;
)
.await; for vfs_message in vfs_messages {
if !is_reboot { send_to_loop.send(vfs_message).await.unwrap();
// initial boot
// build_state_for_initial_boot(&process_map, &mut drive_to_vfs);
send_persist_state.send(true).await.unwrap();
} }
println!("vfs entering loop\r");
loop { loop {
tokio::select! { tokio::select! {
id_done = recv_vfs_task_done.recv() => { id_done = recv_vfs_task_done.recv() => {
println!("vfs got\r");
let Some(id_done) = id_done else { continue }; let Some(id_done) = id_done else { continue };
response_router.remove(&id_done); response_router.remove(&id_done);
continue;
}, },
_ = recv_persist_state.recv() => { _ = recv_persist_state.recv() => {
println!("vfs got\r");
persist_state(our_node.clone(), &send_to_loop, &drive_to_vfs).await; persist_state(our_node.clone(), &send_to_loop, &drive_to_vfs).await;
continue; continue;
}, },
km = recv_from_loop.recv() => { km = recv_from_loop.recv() => {
println!("vfs got\r");
let Some(km) = km else { continue }; let Some(km) = km else { continue };
if let Some(response_sender) = response_router.remove(&km.id) { if let Some(response_sender) = response_router.remove(&km.id) {
response_sender.send(km).await.unwrap(); response_sender.send(km).await.unwrap();
@ -555,6 +561,7 @@ async fn match_request(
) -> Result<(Option<String>, Option<Vec<u8>>), VfsError> { ) -> Result<(Option<String>, Option<Vec<u8>>), VfsError> {
Ok(match request.action { Ok(match request.action {
VfsAction::New => { VfsAction::New => {
println!("vfs: got New\r");
for new_cap in new_caps { for new_cap in new_caps {
let _ = send_to_loop let _ = send_to_loop
.send(KernelMessage { .send(KernelMessage {
@ -586,6 +593,7 @@ async fn match_request(
.await; .await;
} }
send_to_persist.send(true).await.unwrap(); send_to_persist.send(true).await.unwrap();
println!("vfs: done w New\r");
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::Add { VfsAction::Add {