Merge pull request #24 from uqbar-dao/hf/key-value-fr

key_value fr
This commit is contained in:
hosted-fornet 2023-10-13 23:57:50 -07:00 committed by GitHub
commit 25e0ca23ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 182 additions and 173 deletions

View File

@ -7,6 +7,7 @@ Last updated: 10/02/23
# Clone the repo.
git clone git@github.com:uqbar-dao/uqbar.git
git clone git@github.com:uqbar-dao/redb.git
# Get some stuff so we can build wasm.
@ -19,6 +20,7 @@ cargo install --git https://github.com/bytecodealliance/cargo-component --locked
# Build the runtime, along with a number of booted-at-startup WASM modules including terminal and key_value
# OPTIONAL: --release flag
cd uqbar
cargo +nightly build --release
# Create the home directory for your node

View File

@ -177,10 +177,10 @@ fn main() {
for entry in std::fs::read_dir(&modules_dir).unwrap() {
let entry_path = entry.unwrap().path();
let package_name = entry_path.file_name().unwrap().to_str().unwrap();
// NOT YET building KV, waiting for deps to be ready
if package_name == "key_value" {
continue;
}
// // NOT YET building KV, waiting for deps to be ready
// if package_name == "key_value" {
// continue;
// }
// If Cargo.toml is present, build the app
let parent_pkg_path = format!("{}/pkg", entry_path.display());

View File

@ -125,6 +125,7 @@ dependencies = [
"cargo-component-bindings",
"serde",
"serde_json",
"thiserror",
"wit-bindgen",
]
@ -250,6 +251,26 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tinyvec"
version = "1.6.0"

View File

@ -16,6 +16,7 @@ bincode = "1.3.3"
cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" }
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
wit-bindgen = { version = "0.11.0", default_features = false }
[lib]

View File

@ -0,0 +1 @@
../../key_value_types.rs

View File

@ -5,55 +5,94 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use bindings::component::uq_process::types::*;
use bindings::{get_capability, has_capability, Guest, print_to_terminal, receive, send_request, send_requests, spawn};
use bindings::{create_capability, get_capability, has_capability, Guest, print_to_terminal, receive, send_request, send_response, spawn};
mod kernel_types;
use kernel_types as kt;
mod key_value_types;
use key_value_types as kv;
mod process_lib;
struct Component;
const PREFIX: &str = "key_value-";
fn make_cap(kind: &str, drive: &str) -> String {
type DbToProcess = HashMap<String, ProcessId>;
fn make_vfs_cap(kind: &str, drive: &str) -> String {
serde_json::to_string(&serde_json::json!({
"kind": kind,
"drive": drive,
})).unwrap()
}
fn make_db_cap(kind: &str, db: &str) -> String {
serde_json::to_string(&serde_json::json!({
"kind": kind,
"db": db,
})).unwrap()
}
fn forward_if_have_cap(
our: &Address,
operation_type: &str,
// operation_type: OperationType,
db: &str,
ipc: Option<String>,
db_to_process: &mut DbToProcess,
) -> anyhow::Result<()> {
if has_capability(&make_db_cap(operation_type, db)) {
// forward
let Some(process_id) = db_to_process.get(db) else {
return Err(kv::KeyValueError::DbDoesNotExist.into());
};
send_request(
&Address {
node: our.node.clone(),
process: process_id.clone(),
},
&Request {
inherit: true,
expects_response: None,
ipc,
metadata: None,
},
None,
None,
);
return Ok(());
} else {
// reject
return Err(kv::KeyValueError::NoCap.into());
}
}
fn handle_message (
our: &Address,
drive_to_process: &mut HashMap<String, ProcessId>,
db_to_process: &mut DbToProcess,
) -> anyhow::Result<()> {
let (source, message) = receive().unwrap();
// let (source, message) = receive()?;
if our.node != source.node {
return Err(anyhow::anyhow!(
"rejecting foreign Message from {:?}",
source,
));
return Err(kv::KeyValueError::RejectForeign.into());
}
match message {
Message::Response(r) => {
return Err(anyhow::anyhow!("key_value: unexpected Response: {:?}", r));
return Err(kv::KeyValueError::UnexpectedResponse.into());
},
Message::Request(Request { inherit: _ , expects_response: _, ipc, metadata: _ }) => {
Message::Request(Request { ipc, .. }) => {
match process_lib::parse_message_ipc(ipc.clone())? {
kt::KeyValueMessage::New { ref drive } => {
kv::KeyValueMessage::New { ref db } => {
// TODO: make atomic
// (1): create vfs
// (2): spin up worker, granting vfs caps
// (3): issue new caps
// (4): persist
if drive_to_process.contains_key(drive) {
return Err(anyhow::anyhow!(
"rejecting New for drive that already exists: {}",
drive,
))
if db_to_process.contains_key(db) {
return Err(kv::KeyValueError::DbAlreadyExists.into());
}
// (1)
@ -61,7 +100,7 @@ fn handle_message (
node: our.node.clone(),
process: kt::ProcessId::new("vfs", "sys", "uqbar").en_wit(),
};
let vfs_drive = format!("{}{}", PREFIX, drive);
let vfs_drive = format!("{}{}", PREFIX, db);
let _ = process_lib::send_and_await_response(
&vfs_address,
false,
@ -77,11 +116,11 @@ fn handle_message (
// (2)
let vfs_read = get_capability(
&vfs_address,
&make_cap("read", &vfs_drive),
&make_vfs_cap("read", &vfs_drive),
).ok_or(anyhow::anyhow!("New failed: no vfs 'read' capability found"))?;
let vfs_write = get_capability(
&vfs_address,
&make_cap("write", &vfs_drive),
&make_vfs_cap("write", &vfs_drive),
).ok_or(anyhow::anyhow!("New failed: no vfs 'write' capability found"))?;
let spawned_process_id = match spawn(
None,
@ -97,8 +136,8 @@ fn handle_message (
},
};
// grant caps
bindings::create_capability(&source.process, &make_cap("read", drive));
bindings::create_capability(&source.process, &make_cap("write", drive));
create_capability(&source.process, &make_db_cap("read", db));
create_capability(&source.process, &make_db_cap("write", db));
// initialize worker
send_request(
&Address {
@ -116,75 +155,18 @@ fn handle_message (
);
// (4)
drive_to_process.insert(drive.into(), spawned_process_id);
db_to_process.insert(db.into(), spawned_process_id);
// TODO
},
kt::KeyValueMessage::Write { ref drive, .. } => {
// if has_capability(&make_cap("write", &drive)) {
// forward
let Some(process_id) = drive_to_process.get(drive) else {
// TODO
return Err(anyhow::anyhow!(
"cannot write to non-existent drive {}",
drive,
));
};
send_request(
&Address {
node: our.node.clone(),
process: process_id.clone(),
kv::KeyValueMessage::Write { ref db, .. } => {
forward_if_have_cap(our, "write", db, ipc, db_to_process)?;
},
&Request {
inherit: true,
expects_response: None,
ipc,
metadata: None,
},
None,
None,
);
// } else {
// // reject
// // TODO
// return Err(anyhow::anyhow!(
// "cannot write to drive: missing 'write' capability; {}",
// drive,
// ));
// }
},
kt::KeyValueMessage::Read { ref drive, .. } => {
// if has_capability(&make_cap("read", &drive)) {
// forward
let Some(process_id) = drive_to_process.get(drive) else {
// TODO
return Err(anyhow::anyhow!(
"cannot read from non-existent drive {}",
drive,
));
};
send_request(
&Address {
node: our.node.clone(),
process: process_id.clone(),
},
&Request {
inherit: true,
expects_response: None,
ipc,
metadata: None,
},
None,
None,
);
// } else {
// // reject
// // TODO
// return Err(anyhow::anyhow!(
// "cannot read from drive: missing 'read' capability; {}",
// drive,
// ));
// }
kv::KeyValueMessage::Read { ref db, .. } => {
forward_if_have_cap(our, "read", db, ipc, db_to_process)?;
},
kv::KeyValueMessage::Err { error } => {
return Err(error.into());
}
}
Ok(())
@ -196,17 +178,25 @@ impl Guest for Component {
fn init(our: Address) {
print_to_terminal(0, "key_value: begin");
let mut drive_to_process: HashMap<String, ProcessId> = HashMap::new();
let mut db_to_process: HashMap<String, ProcessId> = HashMap::new();
loop {
match handle_message(&our, &mut drive_to_process) {
match handle_message(&our, &mut db_to_process) {
Ok(()) => {},
Err(e) => {
// TODO: should we send an error on failure?
print_to_terminal(0, format!(
"key_value: error: {:?}",
e,
).as_str());
if let Some(e) = e.downcast_ref::<kv::KeyValueError>() {
send_response(
&Response {
ipc: Some(serde_json::to_string(&e).unwrap()),
metadata: None,
},
None,
);
}
},
};
}

View File

@ -0,0 +1,24 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum KeyValueMessage {
New { db: String },
Write { db: String, key: Vec<u8>, val: Vec<u8> },
// Write { db: String, key: Vec<u8> },
Read { db: String, key: Vec<u8> },
Err { error: KeyValueError },
}
#[derive(Debug, Serialize, Deserialize, thiserror::Error)]
pub enum KeyValueError {
#[error("DbDoesNotExist")]
DbDoesNotExist,
#[error("DbAlreadyExists")]
DbAlreadyExists,
#[error("NoCap")]
NoCap,
#[error("RejectForeign")]
RejectForeign,
#[error("UnexpectedResponse")]
UnexpectedResponse,
}

View File

@ -188,6 +188,7 @@ dependencies = [
"redb",
"serde",
"serde_json",
"thiserror",
"wit-bindgen",
]

View File

@ -17,6 +17,7 @@ cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-co
redb = { path = "../../../../redb" }
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
wit-bindgen = { version = "0.11.0", default_features = false }
[lib]

View File

@ -0,0 +1 @@
../key_value_types.rs

View File

@ -0,0 +1 @@
../../key_value_types.rs

View File

@ -10,6 +10,8 @@ use bindings::{get_payload, Guest, print_to_terminal, receive, send_and_await_re
mod kernel_types;
use kernel_types as kt;
mod key_value_types;
use key_value_types as kv;
mod process_lib;
struct Component;
@ -68,35 +70,29 @@ fn send_and_await_response_wrapped(
fn handle_message (
our: &Address,
db: &mut Option<redb::Database>,
db_handle: &mut Option<redb::Database>,
) -> anyhow::Result<()> {
let (source, message) = receive().unwrap();
// let (source, message) = receive()?;
if our.node != source.node {
return Err(anyhow::anyhow!(
"rejecting foreign Message from {:?}",
source,
));
return Err(kv::KeyValueError::RejectForeign.into());
}
match message {
Message::Response(_) => { unimplemented!() },
Message::Request(Request { inherit: _ , expects_response: _, ipc, metadata: _ }) => {
match process_lib::parse_message_ipc(ipc.clone())? {
kt::KeyValueMessage::New { drive: kv_drive } => {
let vfs_drive = format!("{}{}", PREFIX, kv_drive);
match db {
kv::KeyValueMessage::New { db } => {
let vfs_drive = format!("{}{}", PREFIX, db);
match db_handle {
Some(_) => {
return Err(anyhow::anyhow!("cannot send New more than once"));
return Err(kv::KeyValueError::DbAlreadyExists.into());
},
None => {
print_to_terminal(0, "key_value_worker: Create");
*db = Some(redb::Database::create(
format!(
"/{}.redb",
kv_drive,
),
*db_handle = Some(redb::Database::create(
format!("/{}.redb", db),
our.node.clone(),
vfs_drive,
get_payload_wrapped,
@ -106,18 +102,19 @@ fn handle_message (
},
}
},
// kt::KeyValueMessage::Write { ref key, .. } => {
kt::KeyValueMessage::Write { ref key, .. } => {
let Some(db) = db else {
return Err(anyhow::anyhow!("cannot send New more than once"));
// kv::KeyValueMessage::Write { ref key, .. } => {
kv::KeyValueMessage::Write { ref key, ref val, .. } => {
let Some(db_handle) = db_handle else {
return Err(kv::KeyValueError::DbDoesNotExist.into());
};
let Payload { mime: _, ref bytes } = get_payload().ok_or(anyhow::anyhow!("couldnt get bytes for Write"))?;
// let Payload { mime: _, ref bytes } = get_payload().ok_or(anyhow::anyhow!("couldnt get bytes for Write"))?;
let write_txn = db.begin_write()?;
let write_txn = db_handle.begin_write()?;
{
let mut table = write_txn.open_table(TABLE)?;
table.insert(&key[..], &bytes[..])?;
// table.insert(&key[..], &bytes[..])?;
table.insert(&key[..], &val[..])?;
}
write_txn.commit()?;
@ -129,12 +126,12 @@ fn handle_message (
None,
);
},
kt::KeyValueMessage::Read { ref key, .. } => {
let Some(db) = db else {
return Err(anyhow::anyhow!("cannot send New more than once"));
kv::KeyValueMessage::Read { ref key, .. } => {
let Some(db_handle) = db_handle else {
return Err(kv::KeyValueError::DbDoesNotExist.into());
};
let read_txn = db.begin_read()?;
let read_txn = db_handle.begin_read()?;
let table = read_txn.open_table(TABLE)?;
@ -149,6 +146,11 @@ fn handle_message (
);
},
Some(v) => {
let bytes = v.value().to_vec();
print_to_terminal(
1,
&format!("key_value_worker: key, val: {:?}, {:?}", key, bytes),
);
send_response(
&Response {
ipc,
@ -156,12 +158,15 @@ fn handle_message (
},
Some(&Payload {
mime: None,
bytes: v.value().to_vec(),
bytes,
}),
);
},
};
},
kv::KeyValueMessage::Err { error } => {
return Err(error.into());
}
}
Ok(())
@ -173,17 +178,26 @@ impl Guest for Component {
fn init(our: Address) {
print_to_terminal(1, "key_value_worker: begin");
let mut db: Option<redb::Database> = None;
let mut db_handle: Option<redb::Database> = None;
loop {
match handle_message(&our, &mut db) {
match handle_message(&our, &mut db_handle) {
Ok(()) => {},
Err(e) => {
// TODO: should we send an error on failure?
print_to_terminal(0, format!(
"key_value_worker: error: {:?}",
e,
).as_str());
if let Some(e) = e.downcast_ref::<kv::KeyValueError>() {
send_response(
&Response {
ipc: Some(serde_json::to_string(&e).unwrap()),
metadata: None,
},
None,
);
}
panic!("");
},
};
}

View File

@ -636,7 +636,7 @@ impl UqProcessImports for ProcessWasi {
if prompt.source.node == self.process.metadata.our.node {
// if local, need to ask them
let cap = t::Capability {
issuer: prompt.source.clone(),
issuer: self.process.metadata.our.clone(),
params,
};
let (tx, rx) = tokio::sync::oneshot::channel();
@ -644,7 +644,7 @@ impl UqProcessImports for ProcessWasi {
.process
.caps_oracle
.send(t::CapMessage::Has {
on: self.process.metadata.our.process.clone(),
on: prompt.source.process.clone(),
cap: cap.clone(),
responder: tx,
})

View File

@ -310,30 +310,6 @@ impl VfsError {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KeyValueMessage {
New { drive: String },
Write { drive: String, key: Vec<u8> },
Read { drive: String, key: Vec<u8> },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KeyValueError {
BadDriveName,
NoCap,
NoBytes,
}
impl KeyValueError {
pub fn kind(&self) -> &str {
match *self {
KeyValueError::BadDriveName => "BadDriveName",
KeyValueError::NoCap => "NoCap",
KeyValueError::NoBytes => "NoBytes",
}
}
}
//
// conversions between wit types and kernel types (annoying!)
//

View File

@ -605,31 +605,6 @@ impl VfsError {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KeyValueMessage {
New { drive: String },
Write { drive: String, key: Vec<u8> },
Read { drive: String, key: Vec<u8> },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KeyValueError {
BadDriveName,
NoCap,
NoBytes,
}
#[allow(dead_code)]
impl KeyValueError {
pub fn kind(&self) -> &str {
match *self {
KeyValueError::BadDriveName => "BadDriveName",
KeyValueError::NoCap => "NoCap",
KeyValueError::NoBytes => "NoBytes",
}
}
}
//
// http_client.rs types
//

View File

@ -1181,8 +1181,8 @@ async fn match_request(
(Some(serde_json::to_string(&VfsResponse::Ok).unwrap()), None)
}
VfsAction::WriteOffset { full_path, offset } => {
let mut vfs = vfs.lock().await;
let file_hash = {
let mut vfs = vfs.lock().await;
let Some(key) = vfs.path_to_key.remove(&full_path) else {
panic!("");
};
@ -1429,6 +1429,7 @@ async fn match_request(
let Ok(FsResponse::Read(read_hash)) =
serde_json::from_str::<Result<FsResponse, FsError>>(&ipc).unwrap()
else {
println!("vfs: GetEntry fail fs error: {}\r", ipc);
panic!("");
};
// TODO get rid of PANICS!