diff --git a/src/vfs.rs b/src/vfs.rs index 64dff321..ce1c54c4 100644 --- a/src/vfs.rs +++ b/src/vfs.rs @@ -139,10 +139,15 @@ fn bytes_to_state(bytes: &Vec, state: &mut DriveToVfs) { } } -async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &DriveToVfs) { +async fn send_persist_state_message( + our_node: String, + send_to_loop: MessageSender, + id: u64, + state: Vec, +) { let _ = send_to_loop .send(KernelMessage { - id: rand::random(), + id, source: Address { node: our_node.clone(), process: VFS_PROCESS_ID.clone(), @@ -162,13 +167,34 @@ async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &D }), payload: Some(Payload { mime: None, - bytes: state_to_bytes(state).await, + bytes: state, }), signed_capabilities: None, }) .await; } +async fn persist_state( + send_to_persist: &tokio::sync::mpsc::Sender, + recv_response: &mut MessageReceiver, + id: u64, +) { + send_to_persist.send(id).await.unwrap(); + let persist_response = recv_response.recv().await.unwrap(); + let KernelMessage { message, .. } = persist_response; + let Message::Response((Response { ipc, .. }, None)) = message else { + panic!(""); + }; + let Some(ipc) = ipc else { + panic!(""); + }; + let Ok(FsResponse::SetState) = + serde_json::from_str::>(&ipc).unwrap() + else { + panic!(""); + }; +} + async fn load_state_from_reboot( our_node: String, send_to_loop: &MessageSender, @@ -238,8 +264,8 @@ pub async fn vfs( tokio::sync::mpsc::Receiver, ) = tokio::sync::mpsc::channel(VFS_TASK_DONE_CHANNEL_CAPACITY); let (send_persist_state, mut recv_persist_state): ( - tokio::sync::mpsc::Sender, - tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, ) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY); load_state_from_reboot( @@ -264,8 +290,19 @@ pub async fn vfs( let Some(id_done) = id_done else { continue }; response_router.remove(&id_done); }, - _ = recv_persist_state.recv() => { - persist_state(our_node.clone(), &send_to_loop, &drive_to_vfs).await; + respond_to_id = recv_persist_state.recv() => { + let Some(respond_to_id) = respond_to_id else { continue }; + let our_node = our_node.clone(); + let send_to_loop = send_to_loop.clone(); + let serialized_state = state_to_bytes(&drive_to_vfs).await; + tokio::spawn( + send_persist_state_message( + our_node.clone(), + send_to_loop, + respond_to_id, + serialized_state, + ) + ); }, km = recv_from_loop.recv() => { let Some(km) = km else { @@ -365,7 +402,6 @@ pub async fn vfs( ( Arc::clone(drive_to_vfs.get(&request.drive).unwrap()), - // vec![], vec![read_cap, write_cap], ) } @@ -415,6 +451,7 @@ pub async fn vfs( } }, } + let _ = send_vfs_task_done.send(id).await; return (); }, Some((km, response_receiver)) => { @@ -474,7 +511,6 @@ pub async fn vfs( }, Ok(_) => {}, } - let _ = send_vfs_task_done.send(id).await; }, } } @@ -501,7 +537,7 @@ async fn handle_request( new_caps: Vec, vfs: Arc>, send_to_loop: MessageSender, - send_to_persist: tokio::sync::mpsc::Sender, + send_to_persist: tokio::sync::mpsc::Sender, send_to_terminal: PrintSender, send_to_caps_oracle: CapMessageSender, recv_response: MessageReceiver, @@ -623,7 +659,7 @@ async fn match_request( new_caps: Vec, vfs: Arc>, send_to_loop: &MessageSender, - send_to_persist: &tokio::sync::mpsc::Sender, + send_to_persist: &tokio::sync::mpsc::Sender, send_to_terminal: &PrintSender, send_to_caps_oracle: &CapMessageSender, mut recv_response: MessageReceiver, @@ -642,7 +678,7 @@ async fn match_request( .unwrap(); let _ = recv_cap_bool.await.unwrap(); } - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::Add { @@ -996,7 +1032,7 @@ async fn match_request( } } } - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::Rename { @@ -1061,7 +1097,7 @@ async fn match_request( vfs.key_to_entry.insert(key, entry); } } - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::Delete(full_path) => { @@ -1135,7 +1171,7 @@ async fn match_request( } } } - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::WriteOffset { full_path, offset } => { @@ -1189,7 +1225,7 @@ async fn match_request( else { panic!(""); }; - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::SetSize { full_path, size } => { @@ -1245,7 +1281,7 @@ async fn match_request( panic!(""); }; assert_eq!(size, length); - send_to_persist.send(true).await.unwrap(); + persist_state(send_to_persist, &mut recv_response, id).await; (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) } VfsAction::GetPath(hash) => {