vfs: bootstrap directly onto disk

This commit is contained in:
bitful-pannul 2023-12-13 23:19:11 -03:00
parent 70b86f29be
commit 46057abcf4
4 changed files with 333 additions and 26 deletions

View File

@ -454,8 +454,8 @@ async fn handle_kernel_response(
.await;
return;
};
// ignore responses that aren't filesystem responses
if km.source.process != *STATE_PROCESS_ID {
// ignore responses that aren't filesystem or state responses
if km.source.process != *STATE_PROCESS_ID && km.source.process != *VFS_PROCESS_ID {
return;
}
let Some(ref metadata) = response.metadata else {

View File

@ -354,7 +354,7 @@ async fn main() {
true,
));
let (kernel_process_map, db, vfs_messages) = state::load_state(
let (kernel_process_map, db) = state::load_state(
our.name.clone(),
home_directory_path.clone(),
runtime_extensions.clone(),

View File

@ -1,7 +1,8 @@
use anyhow::Result;
use rocksdb::checkpoint::Checkpoint;
use rocksdb::{Options, DB};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use tokio::fs;
@ -12,7 +13,7 @@ pub async fn load_state(
our_name: String,
home_directory_path: String,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
) -> Result<(ProcessMap, DB, Vec<KernelMessage>), StateError> {
) -> Result<(ProcessMap, DB), StateError> {
let state_path = format!("{}/kernel", &home_directory_path);
if let Err(e) = fs::create_dir_all(&state_path).await {
@ -28,12 +29,31 @@ pub async fn load_state(
opts.create_if_missing(true);
// let cf_name = "kernel_state";
// let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default());
let mut db = DB::open_default(state_path).unwrap();
let db = DB::open_default(state_path).unwrap();
let mut process_map: ProcessMap = HashMap::new();
let vfs_messages = vec![];
println!("booted process map: {:?}", process_map);
Ok((process_map, db, vfs_messages))
let kernel_id = KERNEL_PROCESS_ID.to_hash();
match db.get(kernel_id) {
Ok(Some(value)) => {
process_map = bincode::deserialize::<ProcessMap>(&value).unwrap();
}
Ok(None) => {
bootstrap(
&our_name,
home_directory_path.clone(),
runtime_extensions.clone(),
&mut process_map,
).await.unwrap();
db.put(kernel_id, bincode::serialize(&process_map).unwrap())
.unwrap();
}
Err(e) => {
panic!("failed to load kernel state from db: {:?}", e);
}
}
Ok((process_map, db))
}
pub async fn state_sender(
@ -43,8 +63,6 @@ pub async fn state_sender(
mut recv_state: MessageReceiver,
db: DB,
home_directory_path: String,
// mut recv_kill: Receiver<()>,
// send_kill_confirm: Sender<()>,
) -> Result<(), anyhow::Error> {
let db = Arc::new(db);
// into main loop
@ -140,14 +158,12 @@ async fn handle_request(
let key = process_id.to_hash();
match db.get(key) {
Ok(Some(value)) => {
println!("found value");
(
serde_json::to_vec(&StateResponse::GetState).unwrap(),
Some(value),
)
}
Ok(None) => {
println!("nothing found");
return Err(StateError::NotFound {
process_id: process_id.clone(),
});
@ -231,6 +247,291 @@ async fn handle_request(
Ok(())
}
/// function run only upon fresh boot.
///
/// for each folder in /modules, looks for a package.zip file, extracts the contents,
/// sends the contents to VFS, and reads the manifest.json.
///
/// the manifest.json contains instructions for which processes to boot and what
/// capabilities to give them. since we are inside runtime, can spawn those out of
/// thin air.
async fn bootstrap(
our_name: &str,
home_directory_path: String,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
process_map: &mut ProcessMap,
) -> Result<()> {
println!("bootstrapping node...\r");
let mut runtime_caps: HashSet<Capability> = HashSet::new();
// kernel is a special case
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str("kernel:sys:uqbar").unwrap(),
},
params: "\"messaging\"".into(),
});
// net is a special case
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
params: "\"messaging\"".into(),
});
for runtime_module in runtime_extensions.clone() {
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: runtime_module.0,
},
params: "\"messaging\"".into(),
});
}
// give all runtime processes the ability to send messages across the network
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: KERNEL_PROCESS_ID.clone(),
},
params: "\"network\"".into(),
});
// finally, save runtime modules in state map as well, somewhat fakely
// special cases for kernel and net
process_map
.entry(ProcessId::from_str("kernel:sys:uqbar").unwrap())
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: false,
});
process_map
.entry(ProcessId::from_str("net:sys:uqbar").unwrap())
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: false,
});
for runtime_module in runtime_extensions {
process_map
.entry(runtime_module.0)
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: runtime_module.2,
});
}
let distro_path = format!("{}/vfs/kernel:sys:uqbar/", &home_directory_path);
fs::create_dir_all(&distro_path).await.expect("bootstrap vfs dir creation failed!");
let packages: Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> =
get_zipped_packages().await;
for (package_name, mut package) in packages {
// special case tester: only load it in if in simulation mode
if package_name == "tester" {
#[cfg(not(feature = "simulation-mode"))]
continue;
#[cfg(feature = "simulation-mode")]
{}
}
println!("fs: handling package {package_name}...\r");
// get and read metadata.json
let Ok(mut package_metadata_zip) = package.by_name("metadata.json") else {
println!(
"fs: missing metadata for package {}, skipping",
package_name
);
continue;
};
let mut metadata_content = Vec::new();
package_metadata_zip
.read_to_end(&mut metadata_content)
.unwrap();
drop(package_metadata_zip);
let package_metadata: serde_json::Value =
serde_json::from_slice(&metadata_content).expect("fs: metadata parse error");
println!("fs: found package metadata: {:?}\r", package_metadata);
let package_name = package_metadata["package"]
.as_str()
.expect("fs: metadata parse error: bad package name");
let package_publisher = package_metadata["publisher"]
.as_str()
.expect("fs: metadata parse error: bad publisher name");
// create a new package in VFS
let our_drive_name = [package_name, package_publisher].join(":");
let drive_path = format!("/kernel:sys:uqbar/{}", &our_drive_name);
let full_drive_path = format!("{}/{}", &distro_path, &our_drive_name);
fs::create_dir(&full_drive_path).await.expect("vfs dir creation failed!");
// for each file in package.zip, recursively through all dirs, send a newfile KM to VFS
for i in 0..package.len() {
let mut file = package.by_index(i).unwrap();
if file.is_file() {
let file_path = file
.enclosed_name()
.expect("fs: name error reading package.zip")
.to_owned();
let mut file_path = file_path.to_string_lossy().to_string();
if !file_path.starts_with('/') {
file_path = format!("/{}", file_path);
}
println!("fs: found file {}...\r", file_path);
let mut file_content = Vec::new();
file.read_to_end(&mut file_content).unwrap();
let path = format!("{}/{}", &full_drive_path, file_path);
fs::write(&path, file_content).await.unwrap();
}
}
// get and read manifest.json
let Ok(mut package_manifest_zip) = package.by_name("manifest.json") else {
println!(
"fs: missing manifest for package {}, skipping",
package_name
);
continue;
};
let mut manifest_content = Vec::new();
package_manifest_zip
.read_to_end(&mut manifest_content)
.unwrap();
drop(package_manifest_zip);
let package_manifest = String::from_utf8(manifest_content)?;
let package_manifest = serde_json::from_str::<Vec<PackageManifestEntry>>(&package_manifest)
.expect("fs: manifest parse error");
// for each process-entry in manifest.json:
for mut entry in package_manifest {
let wasm_bytes = &mut Vec::new();
let mut file_path = entry.process_wasm_path.to_string();
if file_path.starts_with('/') {
file_path = file_path[1..].to_string();
}
package
.by_name(&file_path)
.expect("fs: no wasm found in package!")
.read_to_end(wasm_bytes)
.unwrap();
// spawn the requested capabilities
// remember: out of thin air, because this is the root distro
let mut requested_caps = HashSet::new();
let our_process_id = format!(
"{}:{}:{}",
entry.process_name, package_name, package_publisher
);
entry.request_messaging = Some(entry.request_messaging.unwrap_or_default());
if let Some(ref mut request_messaging) = entry.request_messaging {
request_messaging.push(our_process_id.clone());
for process_name in request_messaging {
requested_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str(process_name).unwrap(),
},
params: "\"messaging\"".into(),
});
}
}
if entry.request_networking {
requested_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: KERNEL_PROCESS_ID.clone(),
},
params: "\"network\"".into(),
});
}
// give access to package_name vfs
requested_caps.insert(Capability {
issuer: Address {
node: our_name.into(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": drive_path,
}))
.unwrap(),
});
requested_caps.insert(Capability {
issuer: Address {
node: our_name.into(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": drive_path,
}))
.unwrap(),
});
let public_process = entry.public;
let wasm_bytes_handle = format!(
"{}/{}",
&drive_path, &file_path
);
process_map.insert(
ProcessId::new(Some(&entry.process_name), package_name, package_publisher),
PersistedProcess {
wasm_bytes_handle,
on_panic: entry.on_panic,
capabilities: requested_caps,
public: public_process,
},
);
}
}
Ok(())
}
/// go into /target folder and get all .zip package files
async fn get_zipped_packages() -> Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> {
println!("fs: reading distro packages...\r");
let target_path = std::path::Path::new("target");
let mut packages = Vec::new();
if let Ok(mut entries) = fs::read_dir(target_path).await {
while let Ok(Some(entry)) = entries.next_entry().await {
if entry.file_name().to_string_lossy().ends_with(".zip") {
let package_name = entry
.file_name()
.to_string_lossy()
.trim_end_matches(".zip")
.to_string();
if let Ok(bytes) = fs::read(entry.path()).await {
if let Ok(zip) = zip::ZipArchive::new(std::io::Cursor::new(bytes)) {
// add to list of packages
println!("fs: found package: {}\r", package_name);
packages.push((package_name, zip));
}
}
}
}
}
packages
}
fn make_error_message(our_name: String, km: &KernelMessage, error: StateError) -> KernelMessage {
KernelMessage {
id: km.id,

View File

@ -133,20 +133,24 @@ async fn handle_request(
// sort by package_id instead? pros/cons?
// current prepend to filepaths needs to be: /process_id/drive/path
let (process_id, drive) = parse_process_and_drive(&request.path).await?;
let (process_id, drive, rest) = parse_process_and_drive(&request.path).await?;
let drive = format!("/{}/{}", process_id, drive);
let path = PathBuf::from(request.path.clone()); // validate
if km.source.process != *KERNEL_PROCESS_ID {
check_caps(
our_node.clone(),
source.clone(),
send_to_caps_oracle.clone(),
&request,
path.clone(),
drive,
drive.clone(),
vfs_path.clone(),
)
.await?;
}
let path = PathBuf::from(format!("{}{}/{}", vfs_path, drive, rest));
let (ipc, bytes) = match request.action {
VfsAction::CreateDrive => {
@ -438,7 +442,7 @@ async fn handle_request(
Ok(())
}
async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsError> {
async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String, String), VfsError> {
if !path.starts_with('/') {
return Err(VfsError::ParseError {
error: "path does not start with /".into(),
@ -467,7 +471,9 @@ async fn parse_process_and_drive(path: &str) -> Result<(ProcessId, String), VfsE
}
};
Ok((process_id, drive))
let remaining_path = parts[3..].join("/");
Ok((process_id, drive, remaining_path))
}
async fn open_file<P: AsRef<Path>>(