diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index 042d1669..3df9981d 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -454,8 +454,8 @@ async fn handle_kernel_response( .await; return; }; - // ignore responses that aren't filesystem responses - if km.source.process != *STATE_PROCESS_ID { + // ignore responses that aren't filesystem or state responses + if km.source.process != *STATE_PROCESS_ID && km.source.process != *VFS_PROCESS_ID { return; } let Some(ref metadata) = response.metadata else { diff --git a/src/main.rs b/src/main.rs index 7c973e0c..bc3cf14b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -354,7 +354,7 @@ async fn main() { true, )); - let (kernel_process_map, db, vfs_messages) = state::load_state( + let (kernel_process_map, db) = state::load_state( our.name.clone(), home_directory_path.clone(), runtime_extensions.clone(), diff --git a/src/state.rs b/src/state.rs index c9fcf9f1..9a552165 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,8 @@ use anyhow::Result; use rocksdb::checkpoint::Checkpoint; use rocksdb::{Options, DB}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::io::Read; use std::path::Path; use std::sync::Arc; use tokio::fs; @@ -12,7 +13,7 @@ pub async fn load_state( our_name: String, home_directory_path: String, runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, -) -> Result<(ProcessMap, DB, Vec), StateError> { +) -> Result<(ProcessMap, DB), StateError> { let state_path = format!("{}/kernel", &home_directory_path); if let Err(e) = fs::create_dir_all(&state_path).await { @@ -28,12 +29,31 @@ pub async fn load_state( opts.create_if_missing(true); // let cf_name = "kernel_state"; // let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default()); - let mut db = DB::open_default(state_path).unwrap(); + let db = DB::open_default(state_path).unwrap(); let mut process_map: ProcessMap = HashMap::new(); - let vfs_messages = vec![]; - println!("booted process map: {:?}", process_map); - Ok((process_map, db, vfs_messages)) + let kernel_id = KERNEL_PROCESS_ID.to_hash(); + match db.get(kernel_id) { + Ok(Some(value)) => { + process_map = bincode::deserialize::(&value).unwrap(); + } + Ok(None) => { + bootstrap( + &our_name, + home_directory_path.clone(), + runtime_extensions.clone(), + &mut process_map, + ).await.unwrap(); + + db.put(kernel_id, bincode::serialize(&process_map).unwrap()) + .unwrap(); + } + Err(e) => { + panic!("failed to load kernel state from db: {:?}", e); + } + } + + Ok((process_map, db)) } pub async fn state_sender( @@ -43,8 +63,6 @@ pub async fn state_sender( mut recv_state: MessageReceiver, db: DB, home_directory_path: String, - // mut recv_kill: Receiver<()>, - // send_kill_confirm: Sender<()>, ) -> Result<(), anyhow::Error> { let db = Arc::new(db); // into main loop @@ -140,14 +158,12 @@ async fn handle_request( let key = process_id.to_hash(); match db.get(key) { Ok(Some(value)) => { - println!("found value"); ( serde_json::to_vec(&StateResponse::GetState).unwrap(), Some(value), ) } Ok(None) => { - println!("nothing found"); return Err(StateError::NotFound { process_id: process_id.clone(), }); @@ -231,6 +247,291 @@ async fn handle_request( Ok(()) } +/// function run only upon fresh boot. +/// +/// for each folder in /modules, looks for a package.zip file, extracts the contents, +/// sends the contents to VFS, and reads the manifest.json. +/// +/// the manifest.json contains instructions for which processes to boot and what +/// capabilities to give them. since we are inside runtime, can spawn those out of +/// thin air. +async fn bootstrap( + our_name: &str, + home_directory_path: String, + runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, + process_map: &mut ProcessMap, +) -> Result<()> { + println!("bootstrapping node...\r"); + + let mut runtime_caps: HashSet = HashSet::new(); + // kernel is a special case + runtime_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: ProcessId::from_str("kernel:sys:uqbar").unwrap(), + }, + params: "\"messaging\"".into(), + }); + // net is a special case + runtime_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + params: "\"messaging\"".into(), + }); + for runtime_module in runtime_extensions.clone() { + runtime_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: runtime_module.0, + }, + params: "\"messaging\"".into(), + }); + } + // give all runtime processes the ability to send messages across the network + runtime_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: KERNEL_PROCESS_ID.clone(), + }, + params: "\"network\"".into(), + }); + + // finally, save runtime modules in state map as well, somewhat fakely + // special cases for kernel and net + process_map + .entry(ProcessId::from_str("kernel:sys:uqbar").unwrap()) + .or_insert(PersistedProcess { + wasm_bytes_handle: "".into(), + on_panic: OnPanic::Restart, + capabilities: runtime_caps.clone(), + public: false, + }); + process_map + .entry(ProcessId::from_str("net:sys:uqbar").unwrap()) + .or_insert(PersistedProcess { + wasm_bytes_handle: "".into(), + on_panic: OnPanic::Restart, + capabilities: runtime_caps.clone(), + public: false, + }); + for runtime_module in runtime_extensions { + process_map + .entry(runtime_module.0) + .or_insert(PersistedProcess { + wasm_bytes_handle: "".into(), + on_panic: OnPanic::Restart, + capabilities: runtime_caps.clone(), + public: runtime_module.2, + }); + } + + let distro_path = format!("{}/vfs/kernel:sys:uqbar/", &home_directory_path); + fs::create_dir_all(&distro_path).await.expect("bootstrap vfs dir creation failed!"); + + let packages: Vec<(String, zip::ZipArchive>>)> = + get_zipped_packages().await; + + for (package_name, mut package) in packages { + // special case tester: only load it in if in simulation mode + if package_name == "tester" { + #[cfg(not(feature = "simulation-mode"))] + continue; + #[cfg(feature = "simulation-mode")] + {} + } + + println!("fs: handling package {package_name}...\r"); + // get and read metadata.json + let Ok(mut package_metadata_zip) = package.by_name("metadata.json") else { + println!( + "fs: missing metadata for package {}, skipping", + package_name + ); + continue; + }; + let mut metadata_content = Vec::new(); + package_metadata_zip + .read_to_end(&mut metadata_content) + .unwrap(); + drop(package_metadata_zip); + let package_metadata: serde_json::Value = + serde_json::from_slice(&metadata_content).expect("fs: metadata parse error"); + + println!("fs: found package metadata: {:?}\r", package_metadata); + + let package_name = package_metadata["package"] + .as_str() + .expect("fs: metadata parse error: bad package name"); + + let package_publisher = package_metadata["publisher"] + .as_str() + .expect("fs: metadata parse error: bad publisher name"); + + // create a new package in VFS + let our_drive_name = [package_name, package_publisher].join(":"); + let drive_path = format!("/kernel:sys:uqbar/{}", &our_drive_name); + + let full_drive_path = format!("{}/{}", &distro_path, &our_drive_name); + fs::create_dir(&full_drive_path).await.expect("vfs dir creation failed!"); + + + // for each file in package.zip, recursively through all dirs, send a newfile KM to VFS + for i in 0..package.len() { + let mut file = package.by_index(i).unwrap(); + if file.is_file() { + let file_path = file + .enclosed_name() + .expect("fs: name error reading package.zip") + .to_owned(); + let mut file_path = file_path.to_string_lossy().to_string(); + if !file_path.starts_with('/') { + file_path = format!("/{}", file_path); + } + println!("fs: found file {}...\r", file_path); + let mut file_content = Vec::new(); + file.read_to_end(&mut file_content).unwrap(); + let path = format!("{}/{}", &full_drive_path, file_path); + fs::write(&path, file_content).await.unwrap(); + } + } + + // get and read manifest.json + let Ok(mut package_manifest_zip) = package.by_name("manifest.json") else { + println!( + "fs: missing manifest for package {}, skipping", + package_name + ); + continue; + }; + let mut manifest_content = Vec::new(); + package_manifest_zip + .read_to_end(&mut manifest_content) + .unwrap(); + drop(package_manifest_zip); + let package_manifest = String::from_utf8(manifest_content)?; + let package_manifest = serde_json::from_str::>(&package_manifest) + .expect("fs: manifest parse error"); + + // for each process-entry in manifest.json: + for mut entry in package_manifest { + let wasm_bytes = &mut Vec::new(); + let mut file_path = entry.process_wasm_path.to_string(); + if file_path.starts_with('/') { + file_path = file_path[1..].to_string(); + } + package + .by_name(&file_path) + .expect("fs: no wasm found in package!") + .read_to_end(wasm_bytes) + .unwrap(); + + // spawn the requested capabilities + // remember: out of thin air, because this is the root distro + let mut requested_caps = HashSet::new(); + let our_process_id = format!( + "{}:{}:{}", + entry.process_name, package_name, package_publisher + ); + entry.request_messaging = Some(entry.request_messaging.unwrap_or_default()); + if let Some(ref mut request_messaging) = entry.request_messaging { + request_messaging.push(our_process_id.clone()); + for process_name in request_messaging { + requested_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: ProcessId::from_str(process_name).unwrap(), + }, + params: "\"messaging\"".into(), + }); + } + } + + if entry.request_networking { + requested_caps.insert(Capability { + issuer: Address { + node: our_name.to_string(), + process: KERNEL_PROCESS_ID.clone(), + }, + params: "\"network\"".into(), + }); + } + + // give access to package_name vfs + requested_caps.insert(Capability { + issuer: Address { + node: our_name.into(), + process: VFS_PROCESS_ID.clone(), + }, + params: serde_json::to_string(&serde_json::json!({ + "kind": "read", + "drive": drive_path, + })) + .unwrap(), + }); + requested_caps.insert(Capability { + issuer: Address { + node: our_name.into(), + process: VFS_PROCESS_ID.clone(), + }, + params: serde_json::to_string(&serde_json::json!({ + "kind": "write", + "drive": drive_path, + })) + .unwrap(), + }); + + let public_process = entry.public; + + let wasm_bytes_handle = format!( + "{}/{}", + &drive_path, &file_path + ); + + process_map.insert( + ProcessId::new(Some(&entry.process_name), package_name, package_publisher), + PersistedProcess { + wasm_bytes_handle, + on_panic: entry.on_panic, + capabilities: requested_caps, + public: public_process, + }, + ); + } + } + Ok(()) +} + +/// go into /target folder and get all .zip package files +async fn get_zipped_packages() -> Vec<(String, zip::ZipArchive>>)> { + println!("fs: reading distro packages...\r"); + let target_path = std::path::Path::new("target"); + + let mut packages = Vec::new(); + + if let Ok(mut entries) = fs::read_dir(target_path).await { + while let Ok(Some(entry)) = entries.next_entry().await { + if entry.file_name().to_string_lossy().ends_with(".zip") { + let package_name = entry + .file_name() + .to_string_lossy() + .trim_end_matches(".zip") + .to_string(); + if let Ok(bytes) = fs::read(entry.path()).await { + if let Ok(zip) = zip::ZipArchive::new(std::io::Cursor::new(bytes)) { + // add to list of packages + println!("fs: found package: {}\r", package_name); + packages.push((package_name, zip)); + } + } + } + } + } + + packages +} + fn make_error_message(our_name: String, km: &KernelMessage, error: StateError) -> KernelMessage { KernelMessage { id: km.id, diff --git a/src/vfs.rs b/src/vfs.rs index 0c71ab17..38898aec 100644 --- a/src/vfs.rs +++ b/src/vfs.rs @@ -133,20 +133,24 @@ async fn handle_request( // sort by package_id instead? pros/cons? // current prepend to filepaths needs to be: /process_id/drive/path - let (process_id, drive) = parse_process_and_drive(&request.path).await?; + let (process_id, drive, rest) = parse_process_and_drive(&request.path).await?; let drive = format!("/{}/{}", process_id, drive); let path = PathBuf::from(request.path.clone()); // validate - check_caps( - our_node.clone(), - source.clone(), - send_to_caps_oracle.clone(), - &request, - path.clone(), - drive, - vfs_path.clone(), - ) - .await?; + if km.source.process != *KERNEL_PROCESS_ID { + check_caps( + our_node.clone(), + source.clone(), + send_to_caps_oracle.clone(), + &request, + path.clone(), + drive.clone(), + vfs_path.clone(), + ) + .await?; + } + + let path = PathBuf::from(format!("{}{}/{}", vfs_path, drive, rest)); let (ipc, bytes) = match request.action { VfsAction::CreateDrive => { @@ -438,7 +442,7 @@ async fn handle_request( Ok(()) } -async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsError> { +async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String, String), VfsError> { if !path.starts_with('/') { return Err(VfsError::ParseError { error: "path does not start with /".into(), @@ -467,7 +471,9 @@ async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsE } }; - Ok((process_id, drive)) + let remaining_path = parts[3..].join("/"); + + Ok((process_id, drive, remaining_path)) } async fn open_file>(