diff --git a/src/filesystem/mod.rs b/src/filesystem/mod.rs index 03914e9e..eb70d5d1 100644 --- a/src/filesystem/mod.rs +++ b/src/filesystem/mod.rs @@ -16,8 +16,7 @@ pub async fn load_fs( home_directory_path: String, file_key: Vec, fs_config: FsConfig, - vfs_message_sender: MessageSender, -) -> Result<(ProcessMap, Manifest), FsError> { +) -> Result<(ProcessMap, Manifest, Vec), FsError> { // load/create fs directory, manifest + log if none. let fs_directory_path_str = format!("{}/fs", &home_directory_path); @@ -66,7 +65,7 @@ pub async fn load_fs( let mut process_map: ProcessMap = HashMap::new(); // get current processes' wasm_bytes handles. GetState(kernel) - match manifest.read(&kernel_process_id, None, None).await { + let vfs_messages = match manifest.read(&kernel_process_id, None, None).await { Err(_) => { // bootstrap filesystem bootstrap( @@ -74,17 +73,17 @@ pub async fn load_fs( &kernel_process_id, &mut process_map, &mut manifest, - &vfs_message_sender, ) .await - .expect("fresh bootstrap failed!"); + .expect("fresh bootstrap failed!") } Ok(bytes) => { process_map = bincode::deserialize(&bytes).expect("state map deserialization error!"); + vec![] } - } + }; - Ok((process_map, manifest)) + Ok((process_map, manifest, vfs_messages)) } /// function run only upon fresh boot. @@ -100,8 +99,7 @@ async fn bootstrap( kernel_process_id: &FileIdentifier, process_map: &mut ProcessMap, manifest: &mut Manifest, - vfs_message_sender: &MessageSender, -) -> Result<()> { +) -> Result> { println!("bootstrapping node...\r"); const RUNTIME_MODULES: [(&str, bool); 8] = [ ("filesystem:sys:uqbar", false), @@ -109,7 +107,7 @@ async fn bootstrap( ("http_client:sys:uqbar", false), ("encryptor:sys:uqbar", false), ("net:sys:uqbar", false), - ("vfs:sys:uqbar", false), + ("vfs:sys:uqbar", true), ("kernel:sys:uqbar", false), ("eth_rpc:sys:uqbar", true), // TODO evaluate ]; @@ -144,6 +142,7 @@ async fn bootstrap( public: runtime_module.1, }); } + println!("fs bs: runtime process_map {:#?}\r", process_map); let packages: Vec<(String, zip::ZipArchive>>)> = get_zipped_packages().await; @@ -151,38 +150,37 @@ async fn bootstrap( // need to grant all caps at the end, after process_map has been filled in! let mut caps_to_grant = Vec::<(ProcessId, Capability)>::new(); + let mut vfs_messages = Vec::new(); + for (package_name, mut package) in packages { println!("fs: handling package {package_name}...\r"); // create a new package in VFS - vfs_message_sender - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our_name.to_string(), - process: FILESYSTEM_PROCESS_ID.clone(), - }, - target: Address { - node: our_name.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: Some( - serde_json::to_string::(&VfsRequest { - drive: package_name.clone(), - action: VfsAction::New, - }) - .unwrap(), - ), - metadata: None, - }), - payload: None, - signed_capabilities: None, - }) - .await - .unwrap(); + vfs_messages.push(KernelMessage { + id: rand::random(), + source: Address { + node: our_name.to_string(), + process: FILESYSTEM_PROCESS_ID.clone(), + }, + target: Address { + node: our_name.to_string(), + process: VFS_PROCESS_ID.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: Some( + serde_json::to_string::(&VfsRequest { + drive: package_name.clone(), + action: VfsAction::New, + }) + .unwrap(), + ), + metadata: None, + }), + payload: None, + signed_capabilities: None, + }); // for each file in package.zip, recursively through all dirs, send a newfile KM to VFS for i in 0..package.len() { let mut file = package.by_index(i).unwrap(); @@ -198,41 +196,38 @@ async fn bootstrap( println!("fs: found file {}...\r", file_path); let mut file_content = Vec::new(); file.read_to_end(&mut file_content).unwrap(); - vfs_message_sender - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our_name.to_string(), - process: FILESYSTEM_PROCESS_ID.clone(), - }, - target: Address { - node: our_name.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: Some( - serde_json::to_string::(&VfsRequest { - drive: package_name.clone(), - action: VfsAction::Add { - full_path: file_path, - entry_type: AddEntryType::NewFile, - }, - }) - .unwrap(), - ), - metadata: None, - }), - payload: Some(Payload { - mime: None, - bytes: file_content, - }), - signed_capabilities: None, - }) - .await - .unwrap(); + vfs_messages.push(KernelMessage { + id: rand::random(), + source: Address { + node: our_name.to_string(), + process: FILESYSTEM_PROCESS_ID.clone(), + }, + target: Address { + node: our_name.to_string(), + process: VFS_PROCESS_ID.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: Some( + serde_json::to_string::(&VfsRequest { + drive: package_name.clone(), + action: VfsAction::Add { + full_path: file_path, + entry_type: AddEntryType::NewFile, + }, + }) + .unwrap(), + ), + metadata: None, + }), + payload: Some(Payload { + mime: None, + bytes: file_content, + }), + signed_capabilities: None, + }); } } @@ -376,7 +371,7 @@ async fn bootstrap( .write(&kernel_process_id, &serialized_process_map) .await; } - Ok(()) + Ok(vfs_messages) } /// go into /target folder and get all .zip package files diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index ad9728f4..c2f982c0 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -1751,7 +1751,7 @@ async fn make_event_loop( ); senders.insert( t::ProcessId::new(Some("vfs"), "sys", "uqbar"), - ProcessSender::Runtime(send_to_vfs.clone()), + ProcessSender::Runtime(send_to_vfs), ); // each running process is stored in this map @@ -2009,10 +2009,12 @@ async fn make_event_loop( } match senders.get(&kernel_message.target.process) { Some(ProcessSender::Userspace(sender)) => { + println!("el: sending to {}\r", kernel_message.target.process); // TODO: should this failing should crash kernel? probably not sender.send(Ok(kernel_message)).await.unwrap(); } Some(ProcessSender::Runtime(sender)) => { + println!("el: sending to {}\r", kernel_message.target.process); sender.send(kernel_message).await.expect("fatal: runtime module died"); } None => { diff --git a/src/main.rs b/src/main.rs index 84d21b0a..43b91bac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -373,12 +373,11 @@ async fn main() { ) }; - let (kernel_process_map, manifest) = filesystem::load_fs( + let (kernel_process_map, manifest, vfs_messages) = filesystem::load_fs( our.name.clone(), home_directory_path.clone(), file_key, fs_config, - vfs_message_sender.clone(), ) .await .expect("fs load failed!"); @@ -466,6 +465,7 @@ async fn main() { print_sender.clone(), vfs_message_receiver, caps_oracle_sender.clone(), + vfs_messages, )); tasks.spawn(encryptor::encryptor( our.name.clone(), diff --git a/src/vfs.rs b/src/vfs.rs index 126ef674..2457e8eb 100644 --- a/src/vfs.rs +++ b/src/vfs.rs @@ -25,8 +25,8 @@ struct Vfs { key_to_entry: KeyToEntry, path_to_key: PathToKey, } -type IdentifierToVfs = HashMap>>; -type IdentifierToVfsSerializable = HashMap; +type DriveToVfs = HashMap>>; +type DriveToVfsSerializable = HashMap; #[derive(Clone, Debug, Deserialize, Serialize)] struct Entry { @@ -120,8 +120,8 @@ fn make_error_message( } } -async fn state_to_bytes(state: &IdentifierToVfs) -> Vec { - let mut serializable: IdentifierToVfsSerializable = HashMap::new(); +async fn state_to_bytes(state: &DriveToVfs) -> Vec { + let mut serializable: DriveToVfsSerializable = HashMap::new(); for (id, vfs) in state.iter() { let vfs = vfs.lock().await; serializable.insert(id.clone(), (*vfs).clone()); @@ -129,14 +129,14 @@ async fn state_to_bytes(state: &IdentifierToVfs) -> Vec { bincode::serialize(&serializable).unwrap() } -fn bytes_to_state(bytes: &Vec, state: &mut IdentifierToVfs) { - let serializable: IdentifierToVfsSerializable = bincode::deserialize(&bytes).unwrap(); +fn bytes_to_state(bytes: &Vec, state: &mut DriveToVfs) { + let serializable: DriveToVfsSerializable = bincode::deserialize(&bytes).unwrap(); for (id, vfs) in serializable.into_iter() { state.insert(id, Arc::new(Mutex::new(vfs))); } } -async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &IdentifierToVfs) { +async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &DriveToVfs) { let _ = send_to_loop .send(KernelMessage { id: rand::random(), @@ -169,13 +169,12 @@ async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &I async fn load_state_from_reboot( our_node: String, send_to_loop: &MessageSender, - mut recv_from_loop: MessageReceiver, - drive_to_vfs: &mut IdentifierToVfs, - id: u64, -) -> bool { + mut recv_from_loop: &mut MessageReceiver, + drive_to_vfs: &mut DriveToVfs, +) { let _ = send_to_loop .send(KernelMessage { - id, + id: rand::random(), source: Address { node: our_node.clone(), process: VFS_PROCESS_ID.clone(), @@ -197,38 +196,47 @@ async fn load_state_from_reboot( signed_capabilities: None, }) .await; + println!("vfs lsfr 1\r"); let km = recv_from_loop.recv().await; + println!("vfs lsfr 2\r"); let Some(km) = km else { - return false; + return (); }; let KernelMessage { message, payload, .. } = km; let Message::Response((Response { ipc, .. }, None)) = message else { - return false; + println!("vfs lsfr f0\r"); + return (); }; let Ok(Ok(FsResponse::GetState)) = serde_json::from_str::>(&ipc.unwrap_or_default()) else { - return false; + println!("vfs lsfr f1\r"); + return (); }; let Some(payload) = payload else { panic!(""); }; - bytes_to_state(&payload.bytes, drive_to_vfs); + let mut drive_to_vfs: DriveToVfs = HashMap::new(); + bytes_to_state(&payload.bytes, &mut drive_to_vfs); - return true; + println!("vfs lsfr 4\r"); } +async fn noop_future() -> Option { None } + pub async fn vfs( our_node: String, send_to_loop: MessageSender, send_to_terminal: PrintSender, mut recv_from_loop: MessageReceiver, send_to_caps_oracle: CapMessageSender, + vfs_messages: Vec, ) -> anyhow::Result<()> { - let mut drive_to_vfs: IdentifierToVfs = HashMap::new(); + println!("vfs: begin\r"); + let mut drive_to_vfs: DriveToVfs = HashMap::new(); let mut response_router: ResponseRouter = HashMap::new(); let (send_vfs_task_done, mut recv_vfs_task_done): ( tokio::sync::mpsc::Sender, @@ -239,35 +247,33 @@ pub async fn vfs( tokio::sync::mpsc::Receiver, ) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY); - let (response_sender, response_receiver): (MessageSender, MessageReceiver) = - tokio::sync::mpsc::channel(VFS_RESPONSE_CHANNEL_CAPACITY); - let first_message_id = rand::random(); - response_router.insert(first_message_id, response_sender); - let is_reboot = load_state_from_reboot( + load_state_from_reboot( our_node.clone(), &send_to_loop, - response_receiver, + &mut recv_from_loop, &mut drive_to_vfs, - first_message_id, - ) - .await; - if !is_reboot { - // initial boot - // build_state_for_initial_boot(&process_map, &mut drive_to_vfs); - send_persist_state.send(true).await.unwrap(); + ).await; + + for vfs_message in vfs_messages { + send_to_loop.send(vfs_message).await.unwrap(); } + println!("vfs entering loop\r"); loop { tokio::select! { id_done = recv_vfs_task_done.recv() => { + println!("vfs got\r"); let Some(id_done) = id_done else { continue }; response_router.remove(&id_done); + continue; }, _ = recv_persist_state.recv() => { + println!("vfs got\r"); persist_state(our_node.clone(), &send_to_loop, &drive_to_vfs).await; continue; }, km = recv_from_loop.recv() => { + println!("vfs got\r"); let Some(km) = km else { continue }; if let Some(response_sender) = response_router.remove(&km.id) { response_sender.send(km).await.unwrap(); @@ -555,6 +561,7 @@ async fn match_request( ) -> Result<(Option, Option>), VfsError> { Ok(match request.action { VfsAction::New => { + println!("vfs: got New\r"); for new_cap in new_caps { let _ = send_to_loop .send(KernelMessage { @@ -586,6 +593,7 @@ async fn match_request( .await; } send_to_persist.send(true).await.unwrap(); + println!("vfs: done w New\r"); (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::Add {