mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-21 15:41:47 +03:00
kernel: add snapshots
This commit is contained in:
parent
df13c524a8
commit
9b48bf10e8
@ -502,6 +502,7 @@ async fn main() {
|
||||
print_sender.clone(),
|
||||
state_receiver,
|
||||
db,
|
||||
home_directory_path.clone(),
|
||||
));
|
||||
tasks.spawn(http::server::http_server(
|
||||
our.name.clone(),
|
||||
|
37
src/state.rs
37
src/state.rs
@ -1,9 +1,13 @@
|
||||
use crate::filesystem::manifest::{FileIdentifier, Manifest};
|
||||
use crate::types::*;
|
||||
use anyhow::Result;
|
||||
use rocksdb::{ColumnFamilyDescriptor, Options, DB};
|
||||
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
|
||||
use rocksdb::checkpoint::Checkpoint;
|
||||
use rocksdb::{ColumnFamilyDescriptor, Options, DB, Env};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::{io::Read, sync::Arc};
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
|
||||
pub async fn load_state(
|
||||
@ -11,10 +15,10 @@ pub async fn load_state(
|
||||
home_directory_path: String,
|
||||
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
|
||||
) -> Result<(ProcessMap, DB, Vec<KernelMessage>), FsError> {
|
||||
let state_directory_path_str = format!("{}/kernel", &home_directory_path);
|
||||
let state_path = format!("{}/kernel", &home_directory_path);
|
||||
|
||||
if let Err(e) = fs::create_dir_all(&state_directory_path_str).await {
|
||||
panic!("failed creating fs dir! {:?}", e);
|
||||
if let Err(e) = fs::create_dir_all(&state_path).await {
|
||||
panic!("failed creating kernel state dir! {:?}", e);
|
||||
}
|
||||
|
||||
// more granular kernel_state in column families
|
||||
@ -26,7 +30,7 @@ pub async fn load_state(
|
||||
opts.create_if_missing(true);
|
||||
// let cf_name = "kernel_state";
|
||||
// let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default());
|
||||
let mut db = DB::open_default(state_directory_path_str).unwrap();
|
||||
let mut db = DB::open_default(state_path).unwrap();
|
||||
let mut process_map: ProcessMap = HashMap::new();
|
||||
|
||||
// kernel hash?
|
||||
@ -41,7 +45,7 @@ pub async fn load_state(
|
||||
.expect("fresh bootstrap failed!"),
|
||||
Err(e) => panic!("operational problem encountered: {}", e),
|
||||
};
|
||||
|
||||
println!("booted process map: {:?}", process_map);
|
||||
Ok((process_map, db, vfs_messages))
|
||||
}
|
||||
|
||||
@ -51,6 +55,7 @@ pub async fn state_sender(
|
||||
send_to_terminal: PrintSender,
|
||||
mut recv_state: MessageReceiver,
|
||||
db: DB,
|
||||
home_directory_path: String,
|
||||
// mut recv_kill: Receiver<()>,
|
||||
// send_kill_confirm: Sender<()>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
@ -67,11 +72,11 @@ pub async fn state_sender(
|
||||
);
|
||||
continue;
|
||||
}
|
||||
println!("hello");
|
||||
let db_clone = db.clone();
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let send_to_terminal = send_to_terminal.clone();
|
||||
let our_name = our_name.clone();
|
||||
let home_directory_path = home_directory_path.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
@ -81,6 +86,7 @@ pub async fn state_sender(
|
||||
db_clone,
|
||||
send_to_loop.clone(),
|
||||
send_to_terminal,
|
||||
home_directory_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@ -96,7 +102,6 @@ pub async fn state_sender(
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
@ -105,6 +110,7 @@ async fn handle_request(
|
||||
db: Arc<DB>,
|
||||
send_to_loop: MessageSender,
|
||||
_send_to_terminal: PrintSender,
|
||||
home_directory_path: String,
|
||||
) -> Result<(), FsError> {
|
||||
let KernelMessage {
|
||||
id,
|
||||
@ -140,7 +146,6 @@ async fn handle_request(
|
||||
let key = handle.to_le_bytes();
|
||||
match db.get(key) {
|
||||
Ok(Some(value)) => {
|
||||
println!("found value wasm");
|
||||
(StateResponse::Read(handle), Some(value))
|
||||
}
|
||||
Ok(None) => {
|
||||
@ -204,6 +209,18 @@ async fn handle_request(
|
||||
}
|
||||
}
|
||||
}
|
||||
StateAction::Backup => {
|
||||
// handle Backup action
|
||||
println!("got backup");
|
||||
let checkpoint_dir = format!("{}/kernel/checkpoint", &home_directory_path);
|
||||
|
||||
if Path::new(&checkpoint_dir).exists() {
|
||||
let _ = fs::remove_dir_all(&checkpoint_dir).await;
|
||||
}
|
||||
let checkpoint = Checkpoint::new(&db).unwrap();
|
||||
checkpoint.create_checkpoint(&checkpoint_dir).unwrap();
|
||||
(StateResponse::Backup, None)
|
||||
}
|
||||
};
|
||||
|
||||
if expects_response.is_some() {
|
||||
|
@ -901,6 +901,7 @@ pub enum StateAction {
|
||||
GetState(ProcessId),
|
||||
SetState(ProcessId),
|
||||
DeleteState(ProcessId),
|
||||
Backup,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
@ -909,6 +910,7 @@ pub enum StateResponse {
|
||||
GetState,
|
||||
SetState,
|
||||
DeleteState,
|
||||
Backup,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
Loading…
Reference in New Issue
Block a user