Merge pull request #117 from uqbar-dao/bp/backup

backup
dr-frmr 2023-12-27 01:38:28 -05:00 committed by GitHub
commit 6d12014c8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 888 additions and 797 deletions

View File

@ -206,6 +206,22 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -387,6 +403,15 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicase"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.14"
@ -423,11 +448,12 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "uqbar_process_lib"
version = "0.4.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=8342b1a#8342b1a131401fb5d141dab8c90e79aa6d2bc909"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=2d17d75#2d17d75152e55ef3ed417c79312e209ca45b8dbb"
dependencies = [
"anyhow",
"bincode",
"http",
"mime_guess",
"rand",
"serde",
"serde_json",

View File

@ -3,8 +3,6 @@ name = "app_store"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[profile.release]
panic = "abort"
opt-level = "s"
@ -17,7 +15,7 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.8"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "8342b1a" }
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "2d17d75" }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
[lib]

File diff suppressed because it is too large

View File

@ -58,7 +58,7 @@ dependencies = [
[[package]]
name = "ft_worker"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"bincode",
@ -157,6 +157,22 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -321,6 +337,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "unicase"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.14"
@ -357,11 +382,12 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "uqbar_process_lib"
version = "0.4.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=b09d987#b09d9875edce1a230549cf56cf088f95e38d4abd"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=2d17d75#2d17d75152e55ef3ed417c79312e209ca45b8dbb"
dependencies = [
"anyhow",
"bincode",
"http",
"mime_guess",
"rand",
"serde",
"serde_json",
@ -381,6 +407,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View File

@ -1,6 +1,6 @@
[package]
name = "ft_worker"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -16,7 +16,7 @@ bincode = "1.3.3"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "b09d987" }
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "2d17d75" }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
[lib]

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use uqbar_process_lib::uqbar::process::standard::*;
use uqbar_process_lib::*;
#[derive(Debug, Serialize, Deserialize)]
pub struct FileTransferContext {
@ -13,9 +13,9 @@ pub struct FileTransferContext {
/// in order to prompt them to spawn a worker
#[derive(Debug, Serialize, Deserialize)]
pub enum FTWorkerCommand {
/// make sure to attach file itself as payload
Send {
// make sure to attach file itself as payload
target: String, // annoying, but this is Address
target: Address,
file_name: String,
timeout: u64,
},
@ -32,10 +32,12 @@ pub enum FTWorkerCommand {
#[derive(Debug, Serialize, Deserialize)]
pub enum FTWorkerResult {
SendSuccess,
ReceiveSuccess(String), // name of file, bytes in payload
/// string is name of file. bytes in payload
ReceiveSuccess(String),
Err(TransferError),
}
/// the possible errors that can be returned to the parent inside `FTWorkerResult`
#[derive(Debug, Serialize, Deserialize)]
pub enum TransferError {
TargetOffline,
@ -44,47 +46,48 @@ pub enum TransferError {
SourceFailed,
}
/// A helper function to spawn a worker and initialize a file transfer.
/// The outcome will be sent as an [`FTWorkerResult`] to the caller process.
///
/// if `file_bytes` is None, expects to inherit payload!
#[allow(dead_code)]
pub fn spawn_transfer(
our: &Address,
file_name: &str,
file_bytes: Option<Vec<u8>>, // if None, expects to inherit payload!
file_bytes: Option<Vec<u8>>,
timeout: u64,
to_addr: &Address,
) {
) -> anyhow::Result<()> {
let transfer_id: u64 = rand::random();
// spawn a worker and tell it to send the file
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
"/ft_worker.wasm".into(),
&OnExit::None, // can set message-on-panic here
Some(transfer_id.to_string().as_str()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None, // can set message-on-panic here
&Capabilities::All,
false, // not public
) else {
print_to_terminal(0, "file_transfer: failed to spawn worker!");
return;
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
// tell the worker what to do
let payload_or_inherit = match file_bytes {
Some(bytes) => Some(Payload { mime: None, bytes }),
None => None,
};
send_request(
&Address {
node: our.node.clone(),
process: worker_process_id,
},
&Request {
inherit: !payload_or_inherit.is_some(),
expects_response: Some(61),
ipc: serde_json::to_vec(&FTWorkerCommand::Send {
target: to_addr.to_string(),
let mut req = Request::new()
.target((our.node.as_ref(), worker_process_id))
.inherit(!payload_or_inherit.is_some())
.expects_response(timeout + 1) // don't call with 2^64 lol
.ipc(
serde_json::to_vec(&FTWorkerCommand::Send {
target: to_addr.clone(),
file_name: file_name.into(),
timeout: 60,
timeout,
})
.unwrap(),
metadata: None,
},
Some(
&serde_json::to_vec(&FileTransferContext {
)
.context(
serde_json::to_vec(&FileTransferContext {
file_name: file_name.into(),
file_size: match &payload_or_inherit {
Some(p) => Some(p.bytes.len() as u64),
@ -93,39 +96,36 @@ pub fn spawn_transfer(
start_time: std::time::SystemTime::now(),
})
.unwrap(),
),
payload_or_inherit.as_ref(),
);
);
if let Some(payload) = payload_or_inherit {
req = req.payload(payload);
}
req.send()
}
pub fn spawn_receive_transfer(our: &Address, ipc: &[u8]) {
/// A helper function to allow a process to easily handle an incoming transfer
/// from an ft_worker. Call this when you get the initial [`FTWorkerCommand::Receive`]
/// and let it do the rest. The outcome will be sent as an [`FTWorkerResult`] inside
/// a Response to the caller.
#[allow(dead_code)]
pub fn spawn_receive_transfer(our: &Address, ipc: &[u8]) -> anyhow::Result<()> {
let Ok(FTWorkerCommand::Receive { transfer_id, .. }) = serde_json::from_slice(ipc) else {
print_to_terminal(0, "file_transfer: got weird request");
return;
return Err(anyhow::anyhow!("spawn_receive_transfer: got malformed request"));
};
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
"/ft_worker.wasm".into(),
&OnExit::None, // can set message-on-panic here
Some(transfer_id.to_string().as_str()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None, // can set message-on-panic here
&Capabilities::All,
false, // not public
) else {
print_to_terminal(0, "file_transfer: failed to spawn worker!");
return;
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
// forward receive command to worker
send_request(
&Address {
node: our.node.clone(),
process: worker_process_id,
},
&Request {
inherit: true,
expects_response: None,
ipc: ipc.to_vec(),
metadata: None,
},
None,
None,
);
Request::new()
.target((our.node.as_ref(), worker_process_id))
.inherit(true)
.ipc(ipc)
.send()
}
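For orientation, here is a minimal sketch of how a parent process might call these helpers after this change (the caller, file name, and timeout values are illustrative assumptions, not part of this diff):
// hypothetical parent process; both helpers now return anyhow::Result<()>
fn start_send(our: &Address, target: &Address) -> anyhow::Result<()> {
    // pass Some(bytes) to attach a payload, or None to inherit the caller's
    spawn_transfer(our, "example.txt", Some(b"hello".to_vec()), 60, target)
}
fn on_receive_request(our: &Address, ipc: &[u8]) -> anyhow::Result<()> {
    // call when an FTWorkerCommand::Receive arrives; the worker does the rest
    spawn_receive_transfer(our, ipc)
}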

View File

@ -1,8 +1,6 @@
use serde::{Deserialize, Serialize};
//use uqbar_process_lib::uqbar::process::standard::*;
use uqbar_process_lib::uqbar::process::standard::{Message as StdMessage, Request as StdRequest, Response as StdResponse, SendErrorKind};
use uqbar_process_lib::{await_message, get_payload, print_to_terminal, send_and_await_response, send_request, send_response, Address, Message, Payload};
use uqbar_process_lib::*;
use uqbar_process_lib::println;
mod ft_worker_lib;
use ft_worker_lib::*;
@ -22,204 +20,151 @@ pub enum FTWorkerProtocol {
Finished,
}
struct Component;
impl Guest for Component {
fn init(our: String) {
let our = Address::from_str(&our).unwrap();
print_to_terminal(1, &format!("{}: start", our.process));
call_init!(init);
fn init(our: Address) {
let Ok(Message::Request { source: parent_process, ipc, .. }) = await_message() else {
panic!("ft_worker: got bad init message");
};
let command = serde_json::from_slice::<FTWorkerCommand>(&ipc)
.expect("ft_worker: got unparseable init message");
match command {
FTWorkerCommand::Send {
target,
file_name,
timeout,
} => {
let transfer_id: u64 = our.process.process().parse().unwrap();
let Some(payload) = get_payload() else {
print_to_terminal(0, "FTWorker wasn't given payload, exiting");
return
};
let file_bytes = payload.bytes;
let mut file_size = file_bytes.len() as u64;
let mut offset: u64 = 0;
let mut chunk_size: u64 = 1048576; // 1MB
let total_chunks = (file_size as f64 / chunk_size as f64).ceil() as u64;
// send a file to another worker
// start by telling target to expect a file,
// then upon receiving an affirmative response,
// send contents in chunks and wait for
// acknowledgement.
match send_and_await_response(
&Address::from_str(&target).unwrap(),
&StdRequest {
inherit: false,
expects_response: Some(timeout),
ipc: serde_json::to_vec(&FTWorkerCommand::Receive {
transfer_id,
file_name,
file_size,
total_chunks,
timeout,
})
.unwrap(),
metadata: None,
},
None,
) {
Err(send_error) => {
respond_to_parent(FTWorkerResult::Err(match send_error.kind {
SendErrorKind::Offline => TransferError::TargetOffline,
SendErrorKind::Timeout => TransferError::TargetTimeout,
}))
}
Ok((opp_worker, StdMessage::Response((response, _)))) => {
let Ok(FTWorkerProtocol::Ready) = serde_json::from_slice(&response.ipc) else {
respond_to_parent(FTWorkerResult::Err(TransferError::TargetRejected));
return;
};
// send file in chunks
loop {
if file_size < chunk_size {
// this is the last chunk, so we should expect a Finished response
chunk_size = file_size;
let payload = Payload {
mime: None,
bytes: file_bytes
[offset as usize..offset as usize + chunk_size as usize]
.to_vec(),
};
send_request(
&opp_worker,
&StdRequest {
inherit: false,
expects_response: Some(timeout),
ipc: vec![],
metadata: None,
},
None,
Some(&payload),
);
break;
}
let payload = Payload {
mime: None,
bytes: file_bytes
[offset as usize..offset as usize + chunk_size as usize]
.to_vec(),
};
send_request(
&opp_worker,
&StdRequest {
inherit: false,
expects_response: None,
ipc: vec![],
metadata: None,
},
None,
Some(&payload),
);
file_size -= chunk_size;
offset += chunk_size;
}
// now wait for Finished response
let Ok(Message::Response { ipc, .. }) = await_message() else {
respond_to_parent(FTWorkerResult::Err(TransferError::TargetRejected));
return;
};
let Ok(FTWorkerProtocol::Finished) = serde_json::from_slice(&ipc) else {
respond_to_parent(FTWorkerResult::Err(TransferError::TargetRejected));
return;
};
// return success to parent
respond_to_parent(FTWorkerResult::SendSuccess);
}
_ => respond_to_parent(FTWorkerResult::Err(TransferError::TargetRejected)),
}
}
FTWorkerCommand::Receive {
file_name,
total_chunks,
timeout,
..
} => {
// send Ready response to counterparty
send_response(
&StdResponse {
inherit: false,
ipc: serde_json::to_vec(&FTWorkerProtocol::Ready).unwrap(),
metadata: None,
},
None,
);
// receive a file from a worker, then send it to parent
// all messages will be chunks of file. when we receive the
// last chunk, send a Finished message to sender and Success to parent.
let mut file_bytes = Vec::new();
let mut chunks_received = 0;
let start_time = std::time::Instant::now();
loop {
let Ok(Message::Request { .. }) = await_message() else {
respond_to_parent(FTWorkerResult::Err(TransferError::SourceFailed));
return;
};
if start_time.elapsed().as_secs() > timeout {
respond_to_parent(FTWorkerResult::Err(TransferError::SourceFailed));
return;
}
let Some(payload) = get_payload() else {
respond_to_parent(FTWorkerResult::Err(TransferError::SourceFailed));
return;
};
chunks_received += 1;
file_bytes.extend(payload.bytes);
if chunks_received == total_chunks {
break;
}
}
// send Finished message to sender
send_response(
&StdResponse {
inherit: false,
ipc: serde_json::to_vec(&FTWorkerProtocol::Finished).unwrap(),
metadata: None,
},
None,
);
// send Success message to parent
send_request(
&parent_process,
&StdRequest {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&FTWorkerResult::ReceiveSuccess(file_name))
.unwrap(),
metadata: None,
},
None,
Some(&Payload {
mime: None,
bytes: file_bytes,
}),
);
}
let Some(result) = (match command {
FTWorkerCommand::Send {
target,
file_name,
timeout,
} => Some(handle_send(&our, &target, &file_name, timeout)),
FTWorkerCommand::Receive {
file_name,
total_chunks,
timeout,
..
} => handle_receive(parent_process, &file_name, total_chunks, timeout),
}) else { return };
Response::new()
.ipc(serde_json::to_vec(&result).unwrap())
.send()
.unwrap();
// job is done
}
fn handle_send(our: &Address, target: &Address, file_name: &str, timeout: u64) -> FTWorkerResult {
let transfer_id: u64 = our.process().parse().unwrap();
let Some(payload) = get_payload() else {
println!("ft_worker: wasn't given payload!");
return FTWorkerResult::Err(TransferError::SourceFailed)
};
let file_bytes = payload.bytes;
let mut file_size = file_bytes.len() as u64;
let mut offset: u64 = 0;
let chunk_size: u64 = 1048576; // 1MB, can be changed
let total_chunks = (file_size as f64 / chunk_size as f64).ceil() as u64;
// send a file to another worker
// start by telling target to expect a file,
// then upon receiving an affirmative response,
// send contents in chunks and wait for
// acknowledgement.
let Ok(Ok(response)) = Request::to(target.clone())
.ipc(serde_json::to_vec(&FTWorkerCommand::Receive {
transfer_id,
file_name: file_name.to_string(),
file_size,
total_chunks,
timeout,
}).unwrap())
.send_and_await_response(timeout) else {
return FTWorkerResult::Err(TransferError::TargetOffline)
};
let opp_worker = response.source();
let Ok(FTWorkerProtocol::Ready) = serde_json::from_slice(&response.ipc()) else {
return FTWorkerResult::Err(TransferError::TargetRejected)
};
// send file in chunks
loop {
if file_size < chunk_size {
// this is the last chunk, so we should expect a Finished response
let _ = Request::to(opp_worker.clone())
.ipc(vec![])
.payload(Payload {
mime: None,
bytes: file_bytes[offset as usize..offset as usize + file_size as usize]
.to_vec(),
})
.expects_response(timeout)
.send();
break;
}
let _ = Request::to(opp_worker.clone())
.ipc(vec![])
.payload(Payload {
mime: None,
bytes: file_bytes[offset as usize..offset as usize + chunk_size as usize].to_vec(),
})
.send();
file_size -= chunk_size;
offset += chunk_size;
}
// now wait for Finished response
let Ok(Message::Response { ipc, .. }) = await_message() else {
return FTWorkerResult::Err(TransferError::TargetRejected)
};
let Ok(FTWorkerProtocol::Finished) = serde_json::from_slice(&ipc) else {
return FTWorkerResult::Err(TransferError::TargetRejected)
};
// return success to parent
return FTWorkerResult::SendSuccess;
}
fn handle_receive(
parent_process: Address,
file_name: &str,
total_chunks: u64,
timeout: u64,
) -> Option<FTWorkerResult> {
// send Ready response to counterparty
Response::new()
.ipc(serde_json::to_vec(&FTWorkerProtocol::Ready).unwrap())
.send()
.unwrap();
// receive a file from a worker, then send it to parent
// all messages will be chunks of file. when we receive the
// last chunk, send a Finished message to sender and Success to parent.
let mut file_bytes = Vec::new();
let mut chunks_received = 0;
let start_time = std::time::Instant::now();
loop {
let Ok(Message::Request { .. }) = await_message() else {
return Some(FTWorkerResult::Err(TransferError::SourceFailed))
};
if start_time.elapsed().as_secs() > timeout {
return Some(FTWorkerResult::Err(TransferError::SourceFailed))
}
let Some(payload) = get_payload() else {
return Some(FTWorkerResult::Err(TransferError::SourceFailed))
};
chunks_received += 1;
file_bytes.extend(payload.bytes);
if chunks_received == total_chunks {
break;
}
}
// send Finished message to sender
Response::new()
.ipc(serde_json::to_vec(&FTWorkerProtocol::Finished).unwrap())
.send()
.unwrap();
// send Success message to parent
Request::to(parent_process)
.ipc(serde_json::to_vec(&FTWorkerResult::ReceiveSuccess(file_name.to_string())).unwrap())
.payload(Payload {
mime: None,
bytes: file_bytes,
})
.send()
.unwrap();
None
}
fn respond_to_parent(result: FTWorkerResult) {
send_response(
&StdResponse {
inherit: false,
ipc: serde_json::to_vec(&result).unwrap(),
metadata: None,
},
None,
);
}
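One small note on the chunk arithmetic above: computing `total_chunks` through `f64` is fine for realistic sizes but loses exactness past 2^53 bytes. An integer-only equivalent (a sketch, not part of this commit) would be:
// ceil(file_size / chunk_size) in pure integer arithmetic
let total_chunks = (file_size + chunk_size - 1) / chunk_size;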

View File

@ -9,7 +9,6 @@
"filesystem:sys:uqbar",
"http_server:sys:uqbar",
"http_client:sys:uqbar",
"encryptor:sys:uqbar",
"net:sys:uqbar",
"vfs:sys:uqbar",
"kernel:sys:uqbar",
@ -21,6 +20,11 @@
}
}
],
"grant_messaging": [
"http_server:sys:uqbar",
"terminal:terminal:uqbar",
"vfs:sys:uqbar"
],
"public": false
}
]

View File

@ -1,6 +1,6 @@
{
"package": "app_store",
"publisher": "uqbar",
"version": [0, 1, 0],
"description": "A package manager + app store. This JSON field is optional and you can add whatever you want in addition to this."
"version": [0, 2, 0],
"description": "A package manager + app store."
}

View File

@ -6,8 +6,7 @@
"request_networking": false,
"request_messaging": [
"http_bindings:http_bindings:uqbar",
"http_server:sys:uqbar",
"encryptor:sys:uqbar"
"http_server:sys:uqbar"
],
"public": false
}

View File

@ -69,7 +69,7 @@ pub async fn send_and_await_response(
}
let id = process
.process
.handle_request(source, target, request, None, payload)
.send_request(source, target, request, None, payload)
.await;
match id {
Ok(id) => match process.process.get_specific_message_for_process(id).await {
@ -103,7 +103,7 @@ impl ProcessState {
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
/// that timeout and return timeout error if it expires.
pub async fn handle_request(
pub async fn send_request(
&mut self,
fake_source: Option<t::Address>, // only used when kernel steps in to get/set state
target: wit::Address,

View File

@ -579,7 +579,7 @@ impl StandardHost for process::ProcessWasi {
) -> Result<()> {
let id = self
.process
.handle_request(None, target, request, context, payload)
.send_request(None, target, request, context, payload)
.await;
match id {
Ok(_id) => Ok(()),
@ -599,7 +599,7 @@ impl StandardHost for process::ProcessWasi {
for request in requests {
let id = self
.process
.handle_request(None, request.0, request.1, request.2, request.3)
.send_request(None, request.0, request.1, request.2, request.3)
.await;
match id {
Ok(_id) => continue,

View File

@ -267,8 +267,11 @@ async fn handle_request(
}
}
KvAction::Backup => {
// loop through all db directories and backup.
//
// looping through open dbs and flushing their memtables
for db_ref in open_kvs.iter() {
let db = db_ref.value();
db.flush()?;
}
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
};
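From the caller's side, triggering this flush might look like the following sketch (the kv process address and timeout are assumptions; `KvAction` is assumed to serialize with serde_json as elsewhere in this diff, and the `Ok(Ok(..))` pattern mirrors how `send_and_await_response` is used in ft_worker above):
// hypothetical backup trigger sent to the kv runtime module
let Ok(Ok(_resp)) = Request::to((our.node.as_ref(), "kv:sys:uqbar"))
    .ipc(serde_json::to_vec(&KvAction::Backup).unwrap())
    .send_and_await_response(5) else {
    return Err(anyhow::anyhow!("kv backup failed"));
};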
@ -428,11 +431,7 @@ async fn check_caps(
Ok(())
}
KvAction::Backup { .. } => {
if source.process != *STATE_PROCESS_ID {
return Err(KvError::NoCap {
error: request.action.to_string(),
});
}
// caps
Ok(())
}
}

View File

@ -284,8 +284,17 @@ async fn handle_request(
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Backup => {
// execute WAL flush.
//
for db_ref in open_dbs.iter() {
let db = db_ref.value().lock().await;
let result: rusqlite::Result<()> = db
.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_| Ok(()))
.map(|_| ());
if let Err(e) = result {
return Err(SqliteError::RusqliteError {
error: e.to_string(),
});
}
}
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
};
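Worth noting: `PRAGMA wal_checkpoint(TRUNCATE)` returns a row of three counters, `(busy, log, checkpointed)`, which the code above discards. A variant that surfaces them for diagnostics could look like this sketch:
// same checkpoint, but keep the result row for logging
let (busy, log, checkpointed): (i64, i64, i64) = db.query_row(
    "PRAGMA wal_checkpoint(TRUNCATE)",
    [],
    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)?;
// busy == 1 means a concurrent reader blocked the checkpoint from completing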
@ -448,11 +457,7 @@ async fn check_caps(
Ok(())
}
SqliteAction::Backup => {
if source.process != *STATE_PROCESS_ID {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
// flushing WALs for backup
Ok(())
}
}
@ -531,7 +536,7 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError)
id: km.id,
source: Address {
node: our_name.clone(),
process: KV_PROCESS_ID.clone(),
process: SQLITE_PROCESS_ID.clone(),
},
target: match &km.rsvp {
None => km.source.clone(),

View File

@ -218,15 +218,23 @@ async fn handle_request(
}
}
StateAction::Backup => {
// handle Backup action
println!("got backup");
let checkpoint_dir = format!("{}/kernel/checkpoint", &home_directory_path);
let checkpoint_dir = format!("{}/kernel/backup", &home_directory_path);
if Path::new(&checkpoint_dir).exists() {
let _ = fs::remove_dir_all(&checkpoint_dir).await;
fs::remove_dir_all(&checkpoint_dir).await?;
}
let checkpoint = Checkpoint::new(&db).unwrap();
checkpoint.create_checkpoint(&checkpoint_dir).unwrap();
let checkpoint = Checkpoint::new(&db).map_err(|e| StateError::RocksDBError {
action: "BackupCheckpointNew".into(),
error: e.to_string(),
})?;
checkpoint.create_checkpoint(&checkpoint_dir).map_err(|e| {
StateError::RocksDBError {
action: "BackupCheckpointCreate".into(),
error: e.to_string(),
}
})?;
(serde_json::to_vec(&StateResponse::Backup).unwrap(), None)
}
};
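Since a RocksDB checkpoint is a complete, openable copy of the database (built mostly from hard links), restoring amounts to opening or copying that directory. A sketch, assuming the same `rocksdb` crate:
// hypothetical restore: open the checkpoint directory read-only
let backup_db = rocksdb::DB::open_for_read_only(
    &rocksdb::Options::default(),
    format!("{}/kernel/backup", home_directory_path),
    false, // don't error if a log file exists
)?;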
@ -348,7 +356,7 @@ async fn bootstrap(
let packages = get_zipped_packages().await;
for (package_name, mut package) in packages {
for (package_name, mut package) in packages.clone() {
// special case tester: only load it in if in simulation mode
if package_name == "tester" {
#[cfg(not(feature = "simulation-mode"))]
@ -488,58 +496,6 @@ async fn bootstrap(
}
}
// grant capabilities to other initially spawned processes, distro
if let Some(to_grant) = &entry.grant_messaging {
for value in to_grant {
let mut capability = None;
let mut to_process = None;
match value {
serde_json::Value::String(process_name) => {
if let Ok(parsed_process_id) = ProcessId::from_str(process_name) {
capability = Some(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str(process_name).unwrap(),
},
params: "\"messaging\"".into(),
});
to_process = Some(parsed_process_id);
}
}
serde_json::Value::Object(map) => {
if let Some(process_name) = map.get("process") {
if let Ok(parsed_process_id) =
ProcessId::from_str(&process_name.as_str().unwrap())
{
if let Some(params) = map.get("params") {
capability = Some(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str(
process_name.as_str().unwrap(),
)
.unwrap(),
},
params: params.to_string(),
});
to_process = Some(parsed_process_id);
}
}
}
}
_ => {
continue;
}
}
if let Some(cap) = capability {
if let Some(process) = process_map.get_mut(&to_process.unwrap()) {
process.capabilities.insert(cap);
}
}
}
}
if entry.request_networking {
requested_caps.insert(Capability {
issuer: Address {
@ -589,6 +545,114 @@ async fn bootstrap(
);
}
}
// second loop: go and grant_capabilities to processes
// can't do this in first loop because we need to have all processes in the map first
for (package_name, mut package) in packages {
// special case tester: only load it in if in simulation mode
if package_name == "tester" {
#[cfg(not(feature = "simulation-mode"))]
continue;
#[cfg(feature = "simulation-mode")]
{}
}
// get and read manifest.json
let Ok(mut package_manifest_zip) = package.by_name("manifest.json") else {
println!(
"fs: missing manifest for package {}, skipping",
package_name
);
continue;
};
let mut manifest_content = Vec::new();
package_manifest_zip
.read_to_end(&mut manifest_content)
.unwrap();
drop(package_manifest_zip);
let package_manifest = String::from_utf8(manifest_content)?;
let package_manifest = serde_json::from_str::<Vec<PackageManifestEntry>>(&package_manifest)
.expect("fs: manifest parse error");
// get and read metadata.json
let Ok(mut package_metadata_zip) = package.by_name("metadata.json") else {
println!(
"fs: missing metadata for package {}, skipping",
package_name
);
continue;
};
let mut metadata_content = Vec::new();
package_metadata_zip
.read_to_end(&mut metadata_content)
.unwrap();
drop(package_metadata_zip);
let package_metadata: serde_json::Value =
serde_json::from_slice(&metadata_content).expect("fs: metadata parse error");
println!("fs: found package metadata: {:?}\r", package_metadata);
let package_name = package_metadata["package"]
.as_str()
.expect("fs: metadata parse error: bad package name");
let package_publisher = package_metadata["publisher"]
.as_str()
.expect("fs: metadata parse error: bad publisher name");
// for each process-entry in manifest.json:
for entry in package_manifest {
let our_process_id = format!(
"{}:{}:{}",
entry.process_name, package_name, package_publisher
);
// grant capabilities to other initially spawned processes, distro
if let Some(to_grant) = &entry.grant_messaging {
for value in to_grant {
match value {
serde_json::Value::String(process_name) => {
if let Ok(parsed_process_id) = ProcessId::from_str(process_name) {
if let Some(process) = process_map.get_mut(&parsed_process_id) {
process.capabilities.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str(&our_process_id).unwrap(),
},
params: "\"messaging\"".into(),
});
}
}
}
serde_json::Value::Object(map) => {
if let Some(process_name) = map.get("process") {
if let Ok(parsed_process_id) =
ProcessId::from_str(&process_name.as_str().unwrap())
{
if let Some(params) = map.get("params") {
if let Some(process) =
process_map.get_mut(&parsed_process_id)
{
process.capabilities.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::from_str(&our_process_id)
.unwrap(),
},
params: params.to_string(),
});
}
}
}
}
}
_ => {
continue;
}
}
}
}
}
}
Ok(())
}
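For reference, the two entry shapes this loop accepts in a manifest's `grant_messaging` array, sketched with `serde_json::json!` (the specific processes and params are illustrative):
// string form: grants the standard "messaging" capability to the named process
let simple = serde_json::json!("terminal:terminal:uqbar");
// object form: grants a capability with custom params to the named process
let custom = serde_json::json!({
    "process": "vfs:sys:uqbar",
    "params": { "root": true }
});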
@ -609,6 +673,14 @@ async fn get_zipped_packages() -> Vec<(String, zip::ZipArchive<std::io::Cursor<&
packages
}
impl From<std::io::Error> for StateError {
fn from(err: std::io::Error) -> Self {
StateError::IOError {
error: err.to_string(),
}
}
}
fn make_error_message(our_name: String, km: &KernelMessage, error: StateError) -> KernelMessage {
KernelMessage {
id: km.id,

View File

@ -5,7 +5,6 @@ use std::collections::{HashMap, HashSet};
use thiserror::Error;
lazy_static::lazy_static! {
pub static ref ENCRYPTOR_PROCESS_ID: ProcessId = ProcessId::new(Some("encryptor"), "sys", "uqbar");
pub static ref ETH_RPC_PROCESS_ID: ProcessId = ProcessId::new(Some("eth_rpc"), "sys", "uqbar");
pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "sys", "uqbar");
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar");

View File

@ -558,6 +558,25 @@ async fn check_caps(
let src_package_id = PackageId::new(source.process.package(), source.process.publisher());
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
// check for root cap (todo make temp buffer so this is more efficient?)
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"root": true,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_root_cap = recv_cap_bool.await?;
match &request.action {
VfsAction::CreateDir
| VfsAction::CreateDirAll
@ -579,25 +598,7 @@ async fn check_caps(
if src_package_id == package_id {
return Ok(());
}
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
if !has_root_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
@ -617,6 +618,10 @@ async fn check_caps(
if src_package_id == package_id {
return Ok(());
}
if has_root_cap {
return Ok(());
}
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
@ -645,25 +650,7 @@ async fn check_caps(
}
VfsAction::CreateDrive => {
if src_package_id != package_id {
// might have root caps
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"root": true,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
if !has_root_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),