fix persistence

This commit is contained in:
hosted-fornet 2023-10-12 16:52:53 -07:00
parent 46fec4c6f7
commit aa0cdf084a

View File

@ -139,10 +139,15 @@ fn bytes_to_state(bytes: &Vec<u8>, state: &mut DriveToVfs) {
} }
} }
async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &DriveToVfs) { async fn send_persist_state_message(
our_node: String,
send_to_loop: MessageSender,
id: u64,
state: Vec<u8>,
) {
let _ = send_to_loop let _ = send_to_loop
.send(KernelMessage { .send(KernelMessage {
id: rand::random(), id,
source: Address { source: Address {
node: our_node.clone(), node: our_node.clone(),
process: VFS_PROCESS_ID.clone(), process: VFS_PROCESS_ID.clone(),
@ -162,13 +167,34 @@ async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &D
}), }),
payload: Some(Payload { payload: Some(Payload {
mime: None, mime: None,
bytes: state_to_bytes(state).await, bytes: state,
}), }),
signed_capabilities: None, signed_capabilities: None,
}) })
.await; .await;
} }
async fn persist_state(
send_to_persist: &tokio::sync::mpsc::Sender<u64>,
recv_response: &mut MessageReceiver,
id: u64,
) {
send_to_persist.send(id).await.unwrap();
let persist_response = recv_response.recv().await.unwrap();
let KernelMessage { message, .. } = persist_response;
let Message::Response((Response { ipc, .. }, None)) = message else {
panic!("");
};
let Some(ipc) = ipc else {
panic!("");
};
let Ok(FsResponse::SetState) =
serde_json::from_str::<Result<FsResponse, FsError>>(&ipc).unwrap()
else {
panic!("");
};
}
async fn load_state_from_reboot( async fn load_state_from_reboot(
our_node: String, our_node: String,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,
@ -238,8 +264,8 @@ pub async fn vfs(
tokio::sync::mpsc::Receiver<u64>, tokio::sync::mpsc::Receiver<u64>,
) = tokio::sync::mpsc::channel(VFS_TASK_DONE_CHANNEL_CAPACITY); ) = tokio::sync::mpsc::channel(VFS_TASK_DONE_CHANNEL_CAPACITY);
let (send_persist_state, mut recv_persist_state): ( let (send_persist_state, mut recv_persist_state): (
tokio::sync::mpsc::Sender<bool>, tokio::sync::mpsc::Sender<u64>,
tokio::sync::mpsc::Receiver<bool>, tokio::sync::mpsc::Receiver<u64>,
) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY); ) = tokio::sync::mpsc::channel(VFS_PERSIST_STATE_CHANNEL_CAPACITY);
load_state_from_reboot( load_state_from_reboot(
@ -264,8 +290,19 @@ pub async fn vfs(
let Some(id_done) = id_done else { continue }; let Some(id_done) = id_done else { continue };
response_router.remove(&id_done); response_router.remove(&id_done);
}, },
_ = recv_persist_state.recv() => { respond_to_id = recv_persist_state.recv() => {
persist_state(our_node.clone(), &send_to_loop, &drive_to_vfs).await; let Some(respond_to_id) = respond_to_id else { continue };
let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let serialized_state = state_to_bytes(&drive_to_vfs).await;
tokio::spawn(
send_persist_state_message(
our_node.clone(),
send_to_loop,
respond_to_id,
serialized_state,
)
);
}, },
km = recv_from_loop.recv() => { km = recv_from_loop.recv() => {
let Some(km) = km else { let Some(km) = km else {
@ -365,7 +402,6 @@ pub async fn vfs(
( (
Arc::clone(drive_to_vfs.get(&request.drive).unwrap()), Arc::clone(drive_to_vfs.get(&request.drive).unwrap()),
// vec![],
vec![read_cap, write_cap], vec![read_cap, write_cap],
) )
} }
@ -415,6 +451,7 @@ pub async fn vfs(
} }
}, },
} }
let _ = send_vfs_task_done.send(id).await;
return (); return ();
}, },
Some((km, response_receiver)) => { Some((km, response_receiver)) => {
@ -474,7 +511,6 @@ pub async fn vfs(
}, },
Ok(_) => {}, Ok(_) => {},
} }
let _ = send_vfs_task_done.send(id).await;
}, },
} }
} }
@ -501,7 +537,7 @@ async fn handle_request(
new_caps: Vec<Capability>, new_caps: Vec<Capability>,
vfs: Arc<Mutex<Vfs>>, vfs: Arc<Mutex<Vfs>>,
send_to_loop: MessageSender, send_to_loop: MessageSender,
send_to_persist: tokio::sync::mpsc::Sender<bool>, send_to_persist: tokio::sync::mpsc::Sender<u64>,
send_to_terminal: PrintSender, send_to_terminal: PrintSender,
send_to_caps_oracle: CapMessageSender, send_to_caps_oracle: CapMessageSender,
recv_response: MessageReceiver, recv_response: MessageReceiver,
@ -623,7 +659,7 @@ async fn match_request(
new_caps: Vec<Capability>, new_caps: Vec<Capability>,
vfs: Arc<Mutex<Vfs>>, vfs: Arc<Mutex<Vfs>>,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,
send_to_persist: &tokio::sync::mpsc::Sender<bool>, send_to_persist: &tokio::sync::mpsc::Sender<u64>,
send_to_terminal: &PrintSender, send_to_terminal: &PrintSender,
send_to_caps_oracle: &CapMessageSender, send_to_caps_oracle: &CapMessageSender,
mut recv_response: MessageReceiver, mut recv_response: MessageReceiver,
@ -642,7 +678,7 @@ async fn match_request(
.unwrap(); .unwrap();
let _ = recv_cap_bool.await.unwrap(); let _ = recv_cap_bool.await.unwrap();
} }
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::Add { VfsAction::Add {
@ -996,7 +1032,7 @@ async fn match_request(
} }
} }
} }
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::Rename { VfsAction::Rename {
@ -1061,7 +1097,7 @@ async fn match_request(
vfs.key_to_entry.insert(key, entry); vfs.key_to_entry.insert(key, entry);
} }
} }
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::Delete(full_path) => { VfsAction::Delete(full_path) => {
@ -1135,7 +1171,7 @@ async fn match_request(
} }
} }
} }
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::WriteOffset { full_path, offset } => { VfsAction::WriteOffset { full_path, offset } => {
@ -1189,7 +1225,7 @@ async fn match_request(
else { else {
panic!(""); panic!("");
}; };
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::SetSize { full_path, size } => { VfsAction::SetSize { full_path, size } => {
@ -1245,7 +1281,7 @@ async fn match_request(
panic!(""); panic!("");
}; };
assert_eq!(size, length); assert_eq!(size, length);
send_to_persist.send(true).await.unwrap(); persist_state(send_to_persist, &mut recv_response, id).await;
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None) (Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
} }
VfsAction::GetPath(hash) => { VfsAction::GetPath(hash) => {