mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-23 08:32:23 +03:00
Merge branch 'main' into jf/invite
This commit is contained in:
commit
ac7e413fba
@ -7,6 +7,7 @@ Last updated: 10/02/23
|
||||
# Clone the repo.
|
||||
|
||||
git clone git@github.com:uqbar-dao/uqbar.git
|
||||
git clone git@github.com:uqbar-dao/redb.git
|
||||
|
||||
# Get some stuff so we can build wasm.
|
||||
|
||||
@ -22,6 +23,7 @@ git submodule update --init --recursive
|
||||
|
||||
# Build the runtime, along with a number of booted-at-startup WASM modules including terminal and key_value
|
||||
# OPTIONAL: --release flag
|
||||
cd uqbar
|
||||
cargo +nightly build --release
|
||||
|
||||
# Create the home directory for your node
|
||||
|
8
build.rs
8
build.rs
@ -181,10 +181,10 @@ fn main() {
|
||||
for entry in std::fs::read_dir(&modules_dir).unwrap() {
|
||||
let entry_path = entry.unwrap().path();
|
||||
let package_name = entry_path.file_name().unwrap().to_str().unwrap();
|
||||
// NOT YET building KV, waiting for deps to be ready
|
||||
if package_name == "key_value" {
|
||||
return;
|
||||
}
|
||||
// // NOT YET building KV, waiting for deps to be ready
|
||||
// if package_name == "key_value" {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// If Cargo.toml is present, build the app
|
||||
let parent_pkg_path = format!("{}/pkg", entry_path.display());
|
||||
|
@ -198,25 +198,21 @@ fn parse_command(our: &Address, request_string: String) -> anyhow::Result<Apptra
|
||||
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
|
||||
}
|
||||
|
||||
// // TODO fix request?
|
||||
// for process_name in &entry.request_messaging {
|
||||
// let Ok(parsed_process_id) = ProcessId::from_str(process_name) else {
|
||||
// continue;
|
||||
// };
|
||||
// let Some(messaging_cap) = get_capability(
|
||||
// &Address {
|
||||
// node: our.node.clone(),
|
||||
// process: parsed_process_id.clone(),
|
||||
// },
|
||||
// &serde_json::to_string(&serde_json::json!({
|
||||
// "messaging": kt::ProcessId::de_wit(parsed_process_id),
|
||||
// })).unwrap(),
|
||||
// ) else {
|
||||
// return Err(anyhow::anyhow!(format!("app_tracker: no cap for {}", process_name)));
|
||||
// };
|
||||
// initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
|
||||
// }
|
||||
|
||||
for process_name in &entry.request_messaging {
|
||||
let Ok(parsed_process_id) = ProcessId::from_str(process_name) else {
|
||||
continue;
|
||||
};
|
||||
let Some(messaging_cap) = get_capability(
|
||||
&Address {
|
||||
node: our.node.clone(),
|
||||
process: parsed_process_id.clone(),
|
||||
},
|
||||
&"\"messaging\"".into()
|
||||
) else {
|
||||
return Err(anyhow::anyhow!(format!("app_tracker: no cap for {}", process_name)));
|
||||
};
|
||||
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
|
||||
}
|
||||
|
||||
let process_id = format!("{}:{}", entry.process_name, package.clone());
|
||||
let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
|
||||
|
21
modules/key_value/key_value/Cargo.lock
generated
21
modules/key_value/key_value/Cargo.lock
generated
@ -125,6 +125,7 @@ dependencies = [
|
||||
"cargo-component-bindings",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"wit-bindgen",
|
||||
]
|
||||
|
||||
@ -250,6 +251,26 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.49"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.49"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec"
|
||||
version = "1.6.0"
|
||||
|
@ -16,6 +16,7 @@ bincode = "1.3.3"
|
||||
cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" }
|
||||
serde = {version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
wit-bindgen = { version = "0.11.0", default_features = false }
|
||||
|
||||
[lib]
|
||||
|
1
modules/key_value/key_value/src/key_value_types.rs
Symbolic link
1
modules/key_value/key_value/src/key_value_types.rs
Symbolic link
@ -0,0 +1 @@
|
||||
../../key_value_types.rs
|
@ -5,55 +5,94 @@ use std::collections::HashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use bindings::component::uq_process::types::*;
|
||||
use bindings::{get_capability, has_capability, Guest, print_to_terminal, receive, send_request, send_requests, spawn};
|
||||
use bindings::{create_capability, get_capability, has_capability, Guest, print_to_terminal, receive, send_request, send_response, spawn};
|
||||
|
||||
mod kernel_types;
|
||||
use kernel_types as kt;
|
||||
mod key_value_types;
|
||||
use key_value_types as kv;
|
||||
mod process_lib;
|
||||
|
||||
struct Component;
|
||||
|
||||
const PREFIX: &str = "key_value-";
|
||||
|
||||
fn make_cap(kind: &str, drive: &str) -> String {
|
||||
type DbToProcess = HashMap<String, ProcessId>;
|
||||
|
||||
fn make_vfs_cap(kind: &str, drive: &str) -> String {
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"kind": kind,
|
||||
"drive": drive,
|
||||
})).unwrap()
|
||||
}
|
||||
|
||||
fn make_db_cap(kind: &str, db: &str) -> String {
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"kind": kind,
|
||||
"db": db,
|
||||
})).unwrap()
|
||||
}
|
||||
|
||||
fn forward_if_have_cap(
|
||||
our: &Address,
|
||||
operation_type: &str,
|
||||
// operation_type: OperationType,
|
||||
db: &str,
|
||||
ipc: Option<String>,
|
||||
db_to_process: &mut DbToProcess,
|
||||
) -> anyhow::Result<()> {
|
||||
if has_capability(&make_db_cap(operation_type, db)) {
|
||||
// forward
|
||||
let Some(process_id) = db_to_process.get(db) else {
|
||||
return Err(kv::KeyValueError::DbDoesNotExist.into());
|
||||
};
|
||||
send_request(
|
||||
&Address {
|
||||
node: our.node.clone(),
|
||||
process: process_id.clone(),
|
||||
},
|
||||
&Request {
|
||||
inherit: true,
|
||||
expects_response: None,
|
||||
ipc,
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
);
|
||||
return Ok(());
|
||||
} else {
|
||||
// reject
|
||||
return Err(kv::KeyValueError::NoCap.into());
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_message (
|
||||
our: &Address,
|
||||
drive_to_process: &mut HashMap<String, ProcessId>,
|
||||
db_to_process: &mut DbToProcess,
|
||||
) -> anyhow::Result<()> {
|
||||
let (source, message) = receive().unwrap();
|
||||
// let (source, message) = receive()?;
|
||||
|
||||
if our.node != source.node {
|
||||
return Err(anyhow::anyhow!(
|
||||
"rejecting foreign Message from {:?}",
|
||||
source,
|
||||
));
|
||||
return Err(kv::KeyValueError::RejectForeign.into());
|
||||
}
|
||||
|
||||
match message {
|
||||
Message::Response(r) => {
|
||||
return Err(anyhow::anyhow!("key_value: unexpected Response: {:?}", r));
|
||||
return Err(kv::KeyValueError::UnexpectedResponse.into());
|
||||
},
|
||||
Message::Request(Request { inherit: _ , expects_response: _, ipc, metadata: _ }) => {
|
||||
Message::Request(Request { ipc, .. }) => {
|
||||
match process_lib::parse_message_ipc(ipc.clone())? {
|
||||
kt::KeyValueMessage::New { ref drive } => {
|
||||
kv::KeyValueMessage::New { ref db } => {
|
||||
// TODO: make atomic
|
||||
// (1): create vfs
|
||||
// (2): spin up worker, granting vfs caps
|
||||
// (3): issue new caps
|
||||
// (4): persist
|
||||
|
||||
if drive_to_process.contains_key(drive) {
|
||||
return Err(anyhow::anyhow!(
|
||||
"rejecting New for drive that already exists: {}",
|
||||
drive,
|
||||
))
|
||||
if db_to_process.contains_key(db) {
|
||||
return Err(kv::KeyValueError::DbAlreadyExists.into());
|
||||
}
|
||||
|
||||
// (1)
|
||||
@ -61,7 +100,7 @@ fn handle_message (
|
||||
node: our.node.clone(),
|
||||
process: kt::ProcessId::new("vfs", "sys", "uqbar").en_wit(),
|
||||
};
|
||||
let vfs_drive = format!("{}{}", PREFIX, drive);
|
||||
let vfs_drive = format!("{}{}", PREFIX, db);
|
||||
let _ = process_lib::send_and_await_response(
|
||||
&vfs_address,
|
||||
false,
|
||||
@ -77,11 +116,11 @@ fn handle_message (
|
||||
// (2)
|
||||
let vfs_read = get_capability(
|
||||
&vfs_address,
|
||||
&make_cap("read", &vfs_drive),
|
||||
&make_vfs_cap("read", &vfs_drive),
|
||||
).ok_or(anyhow::anyhow!("New failed: no vfs 'read' capability found"))?;
|
||||
let vfs_write = get_capability(
|
||||
&vfs_address,
|
||||
&make_cap("write", &vfs_drive),
|
||||
&make_vfs_cap("write", &vfs_drive),
|
||||
).ok_or(anyhow::anyhow!("New failed: no vfs 'write' capability found"))?;
|
||||
let spawned_process_id = match spawn(
|
||||
None,
|
||||
@ -97,8 +136,8 @@ fn handle_message (
|
||||
},
|
||||
};
|
||||
// grant caps
|
||||
bindings::create_capability(&source.process, &make_cap("read", drive));
|
||||
bindings::create_capability(&source.process, &make_cap("write", drive));
|
||||
create_capability(&source.process, &make_db_cap("read", db));
|
||||
create_capability(&source.process, &make_db_cap("write", db));
|
||||
// initialize worker
|
||||
send_request(
|
||||
&Address {
|
||||
@ -116,75 +155,18 @@ fn handle_message (
|
||||
);
|
||||
|
||||
// (4)
|
||||
drive_to_process.insert(drive.into(), spawned_process_id);
|
||||
db_to_process.insert(db.into(), spawned_process_id);
|
||||
// TODO
|
||||
},
|
||||
kt::KeyValueMessage::Write { ref drive, .. } => {
|
||||
// if has_capability(&make_cap("write", &drive)) {
|
||||
// forward
|
||||
let Some(process_id) = drive_to_process.get(drive) else {
|
||||
// TODO
|
||||
return Err(anyhow::anyhow!(
|
||||
"cannot write to non-existent drive {}",
|
||||
drive,
|
||||
));
|
||||
};
|
||||
send_request(
|
||||
&Address {
|
||||
node: our.node.clone(),
|
||||
process: process_id.clone(),
|
||||
},
|
||||
&Request {
|
||||
inherit: true,
|
||||
expects_response: None,
|
||||
ipc,
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
);
|
||||
// } else {
|
||||
// // reject
|
||||
// // TODO
|
||||
// return Err(anyhow::anyhow!(
|
||||
// "cannot write to drive: missing 'write' capability; {}",
|
||||
// drive,
|
||||
// ));
|
||||
// }
|
||||
kv::KeyValueMessage::Write { ref db, .. } => {
|
||||
forward_if_have_cap(our, "write", db, ipc, db_to_process)?;
|
||||
},
|
||||
kt::KeyValueMessage::Read { ref drive, .. } => {
|
||||
// if has_capability(&make_cap("read", &drive)) {
|
||||
// forward
|
||||
let Some(process_id) = drive_to_process.get(drive) else {
|
||||
// TODO
|
||||
return Err(anyhow::anyhow!(
|
||||
"cannot read from non-existent drive {}",
|
||||
drive,
|
||||
));
|
||||
};
|
||||
send_request(
|
||||
&Address {
|
||||
node: our.node.clone(),
|
||||
process: process_id.clone(),
|
||||
},
|
||||
&Request {
|
||||
inherit: true,
|
||||
expects_response: None,
|
||||
ipc,
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
);
|
||||
// } else {
|
||||
// // reject
|
||||
// // TODO
|
||||
// return Err(anyhow::anyhow!(
|
||||
// "cannot read from drive: missing 'read' capability; {}",
|
||||
// drive,
|
||||
// ));
|
||||
// }
|
||||
kv::KeyValueMessage::Read { ref db, .. } => {
|
||||
forward_if_have_cap(our, "read", db, ipc, db_to_process)?;
|
||||
},
|
||||
kv::KeyValueMessage::Err { error } => {
|
||||
return Err(error.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -196,17 +178,25 @@ impl Guest for Component {
|
||||
fn init(our: Address) {
|
||||
print_to_terminal(0, "key_value: begin");
|
||||
|
||||
let mut drive_to_process: HashMap<String, ProcessId> = HashMap::new();
|
||||
let mut db_to_process: HashMap<String, ProcessId> = HashMap::new();
|
||||
|
||||
loop {
|
||||
match handle_message(&our, &mut drive_to_process) {
|
||||
match handle_message(&our, &mut db_to_process) {
|
||||
Ok(()) => {},
|
||||
Err(e) => {
|
||||
// TODO: should we send an error on failure?
|
||||
print_to_terminal(0, format!(
|
||||
"key_value: error: {:?}",
|
||||
e,
|
||||
).as_str());
|
||||
if let Some(e) = e.downcast_ref::<kv::KeyValueError>() {
|
||||
send_response(
|
||||
&Response {
|
||||
ipc: Some(serde_json::to_string(&e).unwrap()),
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
23
modules/key_value/key_value_types.rs
Normal file
23
modules/key_value/key_value_types.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KeyValueMessage {
|
||||
New { db: String },
|
||||
Write { db: String, key: Vec<u8> },
|
||||
Read { db: String, key: Vec<u8> },
|
||||
Err { error: KeyValueError },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, thiserror::Error)]
|
||||
pub enum KeyValueError {
|
||||
#[error("DbDoesNotExist")]
|
||||
DbDoesNotExist,
|
||||
#[error("DbAlreadyExists")]
|
||||
DbAlreadyExists,
|
||||
#[error("NoCap")]
|
||||
NoCap,
|
||||
#[error("RejectForeign")]
|
||||
RejectForeign,
|
||||
#[error("UnexpectedResponse")]
|
||||
UnexpectedResponse,
|
||||
}
|
1
modules/key_value/key_value_worker/Cargo.lock
generated
1
modules/key_value/key_value_worker/Cargo.lock
generated
@ -188,6 +188,7 @@ dependencies = [
|
||||
"redb",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"wit-bindgen",
|
||||
]
|
||||
|
||||
|
@ -17,6 +17,7 @@ cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-co
|
||||
redb = { path = "../../../../redb" }
|
||||
serde = {version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
wit-bindgen = { version = "0.11.0", default_features = false }
|
||||
|
||||
[lib]
|
||||
|
1
modules/key_value/key_value_worker/key_value_types.rs
Symbolic link
1
modules/key_value/key_value_worker/key_value_types.rs
Symbolic link
@ -0,0 +1 @@
|
||||
../key_value_types.rs
|
1
modules/key_value/key_value_worker/src/key_value_types.rs
Symbolic link
1
modules/key_value/key_value_worker/src/key_value_types.rs
Symbolic link
@ -0,0 +1 @@
|
||||
../../key_value_types.rs
|
@ -10,6 +10,8 @@ use bindings::{get_payload, Guest, print_to_terminal, receive, send_and_await_re
|
||||
|
||||
mod kernel_types;
|
||||
use kernel_types as kt;
|
||||
mod key_value_types;
|
||||
use key_value_types as kv;
|
||||
mod process_lib;
|
||||
|
||||
struct Component;
|
||||
@ -68,35 +70,29 @@ fn send_and_await_response_wrapped(
|
||||
|
||||
fn handle_message (
|
||||
our: &Address,
|
||||
db: &mut Option<redb::Database>,
|
||||
db_handle: &mut Option<redb::Database>,
|
||||
) -> anyhow::Result<()> {
|
||||
let (source, message) = receive().unwrap();
|
||||
// let (source, message) = receive()?;
|
||||
|
||||
if our.node != source.node {
|
||||
return Err(anyhow::anyhow!(
|
||||
"rejecting foreign Message from {:?}",
|
||||
source,
|
||||
));
|
||||
return Err(kv::KeyValueError::RejectForeign.into());
|
||||
}
|
||||
|
||||
match message {
|
||||
Message::Response(_) => { unimplemented!() },
|
||||
Message::Request(Request { inherit: _ , expects_response: _, ipc, metadata: _ }) => {
|
||||
match process_lib::parse_message_ipc(ipc.clone())? {
|
||||
kt::KeyValueMessage::New { drive: kv_drive } => {
|
||||
let vfs_drive = format!("{}{}", PREFIX, kv_drive);
|
||||
match db {
|
||||
kv::KeyValueMessage::New { db } => {
|
||||
let vfs_drive = format!("{}{}", PREFIX, db);
|
||||
match db_handle {
|
||||
Some(_) => {
|
||||
return Err(anyhow::anyhow!("cannot send New more than once"));
|
||||
return Err(kv::KeyValueError::DbAlreadyExists.into());
|
||||
},
|
||||
None => {
|
||||
print_to_terminal(0, "key_value_worker: Create");
|
||||
*db = Some(redb::Database::create(
|
||||
format!(
|
||||
"/{}.redb",
|
||||
kv_drive,
|
||||
),
|
||||
*db_handle = Some(redb::Database::create(
|
||||
format!("/{}.redb", db),
|
||||
our.node.clone(),
|
||||
vfs_drive,
|
||||
get_payload_wrapped,
|
||||
@ -106,15 +102,14 @@ fn handle_message (
|
||||
},
|
||||
}
|
||||
},
|
||||
// kt::KeyValueMessage::Write { ref key, .. } => {
|
||||
kt::KeyValueMessage::Write { ref key, .. } => {
|
||||
let Some(db) = db else {
|
||||
return Err(anyhow::anyhow!("cannot send New more than once"));
|
||||
kv::KeyValueMessage::Write { ref key, .. } => {
|
||||
let Some(db_handle) = db_handle else {
|
||||
return Err(kv::KeyValueError::DbDoesNotExist.into());
|
||||
};
|
||||
|
||||
let Payload { mime: _, ref bytes } = get_payload().ok_or(anyhow::anyhow!("couldnt get bytes for Write"))?;
|
||||
|
||||
let write_txn = db.begin_write()?;
|
||||
let write_txn = db_handle.begin_write()?;
|
||||
{
|
||||
let mut table = write_txn.open_table(TABLE)?;
|
||||
table.insert(&key[..], &bytes[..])?;
|
||||
@ -129,12 +124,12 @@ fn handle_message (
|
||||
None,
|
||||
);
|
||||
},
|
||||
kt::KeyValueMessage::Read { ref key, .. } => {
|
||||
let Some(db) = db else {
|
||||
return Err(anyhow::anyhow!("cannot send New more than once"));
|
||||
kv::KeyValueMessage::Read { ref key, .. } => {
|
||||
let Some(db_handle) = db_handle else {
|
||||
return Err(kv::KeyValueError::DbDoesNotExist.into());
|
||||
};
|
||||
|
||||
let read_txn = db.begin_read()?;
|
||||
let read_txn = db_handle.begin_read()?;
|
||||
|
||||
let table = read_txn.open_table(TABLE)?;
|
||||
|
||||
@ -149,6 +144,15 @@ fn handle_message (
|
||||
);
|
||||
},
|
||||
Some(v) => {
|
||||
let bytes = v.value().to_vec();
|
||||
print_to_terminal(
|
||||
1,
|
||||
&format!(
|
||||
"key_value_worker: key, val: {:?}, {}",
|
||||
key,
|
||||
if bytes.len() < 100 { format!("{:?}", bytes) } else { "<elided>".into() },
|
||||
),
|
||||
);
|
||||
send_response(
|
||||
&Response {
|
||||
ipc,
|
||||
@ -156,12 +160,15 @@ fn handle_message (
|
||||
},
|
||||
Some(&Payload {
|
||||
mime: None,
|
||||
bytes: v.value().to_vec(),
|
||||
bytes,
|
||||
}),
|
||||
);
|
||||
},
|
||||
};
|
||||
},
|
||||
kv::KeyValueMessage::Err { error } => {
|
||||
return Err(error.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -173,17 +180,26 @@ impl Guest for Component {
|
||||
fn init(our: Address) {
|
||||
print_to_terminal(1, "key_value_worker: begin");
|
||||
|
||||
let mut db: Option<redb::Database> = None;
|
||||
let mut db_handle: Option<redb::Database> = None;
|
||||
|
||||
loop {
|
||||
match handle_message(&our, &mut db) {
|
||||
match handle_message(&our, &mut db_handle) {
|
||||
Ok(()) => {},
|
||||
Err(e) => {
|
||||
// TODO: should we send an error on failure?
|
||||
print_to_terminal(0, format!(
|
||||
"key_value_worker: error: {:?}",
|
||||
e,
|
||||
).as_str());
|
||||
if let Some(e) = e.downcast_ref::<kv::KeyValueError>() {
|
||||
send_response(
|
||||
&Response {
|
||||
ipc: Some(serde_json::to_string(&e).unwrap()),
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
);
|
||||
}
|
||||
panic!("");
|
||||
},
|
||||
};
|
||||
}
|
||||
|
@ -413,7 +413,9 @@ impl Manifest {
|
||||
|
||||
while let Some(chunk) = chunks.next() {
|
||||
if memory_buffer.len() + chunk.len() > self.memory_limit {
|
||||
manifest.insert(file.clone(), in_memory_file);
|
||||
self.flush_to_wal(&mut manifest, &mut memory_buffer).await?;
|
||||
in_memory_file = manifest.get(file).unwrap().clone();
|
||||
}
|
||||
|
||||
self.write_chunk(
|
||||
@ -804,7 +806,9 @@ impl Manifest {
|
||||
.copy_from_slice(data_to_write);
|
||||
|
||||
if memory_buffer.len() + chunk_data.len() > self.memory_limit {
|
||||
manifest.insert(file_id.clone(), file);
|
||||
self.flush_to_wal(&mut manifest, &mut memory_buffer).await?;
|
||||
file = manifest.get(file_id).unwrap().clone();
|
||||
}
|
||||
|
||||
self.write_chunk(
|
||||
|
@ -636,7 +636,7 @@ impl UqProcessImports for ProcessWasi {
|
||||
if prompt.source.node == self.process.metadata.our.node {
|
||||
// if local, need to ask them
|
||||
let cap = t::Capability {
|
||||
issuer: prompt.source.clone(),
|
||||
issuer: self.process.metadata.our.clone(),
|
||||
params,
|
||||
};
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
@ -644,7 +644,7 @@ impl UqProcessImports for ProcessWasi {
|
||||
.process
|
||||
.caps_oracle
|
||||
.send(t::CapMessage::Has {
|
||||
on: self.process.metadata.our.process.clone(),
|
||||
on: prompt.source.process.clone(),
|
||||
cap: cap.clone(),
|
||||
responder: tx,
|
||||
})
|
||||
|
@ -310,30 +310,6 @@ impl VfsError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KeyValueMessage {
|
||||
New { drive: String },
|
||||
Write { drive: String, key: Vec<u8> },
|
||||
Read { drive: String, key: Vec<u8> },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KeyValueError {
|
||||
BadDriveName,
|
||||
NoCap,
|
||||
NoBytes,
|
||||
}
|
||||
|
||||
impl KeyValueError {
|
||||
pub fn kind(&self) -> &str {
|
||||
match *self {
|
||||
KeyValueError::BadDriveName => "BadDriveName",
|
||||
KeyValueError::NoCap => "NoCap",
|
||||
KeyValueError::NoBytes => "NoBytes",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// conversions between wit types and kernel types (annoying!)
|
||||
//
|
||||
|
25
src/types.rs
25
src/types.rs
@ -629,31 +629,6 @@ impl VfsError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KeyValueMessage {
|
||||
New { drive: String },
|
||||
Write { drive: String, key: Vec<u8> },
|
||||
Read { drive: String, key: Vec<u8> },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KeyValueError {
|
||||
BadDriveName,
|
||||
NoCap,
|
||||
NoBytes,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl KeyValueError {
|
||||
pub fn kind(&self) -> &str {
|
||||
match *self {
|
||||
KeyValueError::BadDriveName => "BadDriveName",
|
||||
KeyValueError::NoCap => "NoCap",
|
||||
KeyValueError::NoBytes => "NoBytes",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// http_client.rs types
|
||||
//
|
||||
|
@ -1181,8 +1181,8 @@ async fn match_request(
|
||||
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
|
||||
}
|
||||
VfsAction::WriteOffset { full_path, offset } => {
|
||||
let mut vfs = vfs.lock().await;
|
||||
let file_hash = {
|
||||
let mut vfs = vfs.lock().await;
|
||||
let Some(key) = vfs.path_to_key.remove(&full_path) else {
|
||||
panic!("");
|
||||
};
|
||||
@ -1429,6 +1429,7 @@ async fn match_request(
|
||||
let Ok(FsResponse::Read(read_hash)) =
|
||||
serde_json::from_str::<Result<FsResponse, FsError>>(&ipc).unwrap()
|
||||
else {
|
||||
println!("vfs: GetEntry fail fs error: {}\r", ipc);
|
||||
panic!("");
|
||||
};
|
||||
// TODO get rid of PANICS!
|
||||
|
Loading…
Reference in New Issue
Block a user