mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-23 03:44:04 +03:00
manifest: remove grant_messaging
in favor of public
This commit is contained in:
parent
2da972fd31
commit
b85413339a
8
build.rs
8
build.rs
@ -180,10 +180,10 @@ fn main() {
|
||||
let entry_path = entry.unwrap().path();
|
||||
let package_name = entry_path.file_name().unwrap().to_str().unwrap();
|
||||
|
||||
// // NOT YET building KV, waiting for deps to be ready
|
||||
// if package_name == "key_value" {
|
||||
// continue;
|
||||
// }
|
||||
// NOT YET building KV, waiting for deps to be ready
|
||||
if package_name == "key_value" {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If Cargo.toml is present, build the app
|
||||
let parent_pkg_path = format!("{}/pkg", entry_path.display());
|
||||
|
@ -16,6 +16,6 @@
|
||||
"kernel:sys:uqbar",
|
||||
"eth_rpc:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
]
|
@ -13,6 +13,7 @@ use kernel_types as kt;
|
||||
|
||||
#[allow(dead_code)]
|
||||
mod process_lib;
|
||||
mod transfer_lib;
|
||||
|
||||
struct Component;
|
||||
|
||||
@ -23,7 +24,7 @@ pub enum AppTrackerRequest {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ApptrackerResponse {
|
||||
pub enum AppTrackerResponse {
|
||||
New { package: String },
|
||||
Install { package: String },
|
||||
Error { error: String },
|
||||
@ -36,10 +37,10 @@ pub struct PackageManifestEntry {
|
||||
pub on_panic: kt::OnPanic,
|
||||
pub request_networking: bool,
|
||||
pub request_messaging: Vec<String>,
|
||||
pub grant_messaging: Vec<String>, // special logic for the string "all": makes process public
|
||||
pub public: bool,
|
||||
}
|
||||
|
||||
fn parse_command(our: &Address, request_string: String) -> anyhow::Result<ApptrackerResponse> {
|
||||
fn parse_command(our: &Address, request_string: String) -> anyhow::Result<AppTrackerResponse> {
|
||||
match serde_json::from_str(&request_string)? {
|
||||
AppTrackerRequest::New { package } => {
|
||||
let Some(payload) = get_payload() else {
|
||||
@ -84,7 +85,7 @@ fn parse_command(our: &Address, request_string: String) -> anyhow::Result<Apptra
|
||||
Some(&payload),
|
||||
5,
|
||||
)?;
|
||||
Ok(ApptrackerResponse::New { package })
|
||||
Ok(AppTrackerResponse::New { package })
|
||||
}
|
||||
AppTrackerRequest::Install { package } => {
|
||||
let vfs_address = Address {
|
||||
@ -186,30 +187,6 @@ fn parse_command(our: &Address, request_string: String) -> anyhow::Result<Apptra
|
||||
}
|
||||
};
|
||||
|
||||
// let Some(messaging_cap) = get_capability(
|
||||
// &Address {
|
||||
// node: our.node.clone(),
|
||||
// process: entry_process_id,
|
||||
// },
|
||||
// &"\"messaging\"".into()
|
||||
// ) else {
|
||||
// return Err(anyhow::anyhow!(
|
||||
// "app_tracker: no messaging cap for {} to give away!",
|
||||
// entry.process_name,
|
||||
// ));
|
||||
// };
|
||||
// for process_name in &entry.grant_messaging {
|
||||
// if process_name == "all" {
|
||||
// public = true;
|
||||
// continue;
|
||||
// }
|
||||
// let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
|
||||
// // TODO handle arbitrary caps here
|
||||
// continue;
|
||||
// };
|
||||
// bindings::share_capability(&parsed_process_id, &messaging_cap);
|
||||
// }
|
||||
|
||||
for process_name in &entry.request_messaging {
|
||||
let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
|
||||
// TODO handle arbitrary caps here
|
||||
@ -228,22 +205,6 @@ fn parse_command(our: &Address, request_string: String) -> anyhow::Result<Apptra
|
||||
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
|
||||
}
|
||||
|
||||
for process_name in &entry.request_messaging {
|
||||
let Ok(parsed_process_id) = ProcessId::from_str(process_name) else {
|
||||
continue;
|
||||
};
|
||||
let Some(messaging_cap) = get_capability(
|
||||
&Address {
|
||||
node: our.node.clone(),
|
||||
process: parsed_process_id.clone(),
|
||||
},
|
||||
&"\"messaging\"".into()
|
||||
) else {
|
||||
return Err(anyhow::anyhow!(format!("app_tracker: no cap for {}", process_name)));
|
||||
};
|
||||
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
|
||||
}
|
||||
|
||||
let process_id = format!("{}:{}", entry.process_name, package.clone());
|
||||
let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
|
||||
return Err(anyhow::anyhow!("app_tracker: invalid process id!"));
|
||||
@ -307,7 +268,7 @@ fn parse_command(our: &Address, request_string: String) -> anyhow::Result<Apptra
|
||||
5,
|
||||
)?;
|
||||
}
|
||||
Ok(ApptrackerResponse::Install { package })
|
||||
Ok(AppTrackerResponse::Install { package })
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -354,7 +315,7 @@ impl Guest for Component {
|
||||
Err(e) => {
|
||||
print_to_terminal(0, &format!("app_tracker: got error {}", e));
|
||||
if let Some(_) = expects_response {
|
||||
let error = ApptrackerResponse::Error {
|
||||
let error = AppTrackerResponse::Error {
|
||||
error: format!("{}", e),
|
||||
};
|
||||
let _ = send_response(
|
||||
|
226
modules/app_tracker/src/transfer_lib.rs
Normal file
226
modules/app_tracker/src/transfer_lib.rs
Normal file
@ -0,0 +1,226 @@
|
||||
use super::bindings::component::uq_process::types::*;
|
||||
use crate::bindings::wasi::random::random;
|
||||
use crate::bindings::{get_payload, get_unix_time, receive, send_request, send_response};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TransferError {
|
||||
// in all errors, u64 is number of bytes successfully transferred
|
||||
TargetOffline(u64),
|
||||
TargetTimeout(u64),
|
||||
TargetRejected(u64),
|
||||
SourceFailed(u64),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum TransferMetadata {
|
||||
Begin {
|
||||
file_name: String,
|
||||
file_size: u64,
|
||||
total_chunks: u64,
|
||||
},
|
||||
}
|
||||
|
||||
pub fn transfer(
|
||||
to_addr: Address,
|
||||
bytes: Vec<u8>,
|
||||
max_timeout: u64,
|
||||
) -> (
|
||||
Result<(), TransferError>,
|
||||
Vec<Result<(Address, Message), (SendError, Option<Context>)>>,
|
||||
) {
|
||||
let transfer_context_id = random::get_random_u64();
|
||||
let mut bytes_remaining: u64 = bytes.len() as u64;
|
||||
let mut offset: u64 = 0;
|
||||
let mut chunk_size: u64 = 1048576; // 1MB
|
||||
let mut chunks_sent = 0;
|
||||
let total_chunks = (bytes.len() as f64 / chunk_size as f64).ceil() as u64;
|
||||
loop {
|
||||
chunks_sent += 1;
|
||||
if bytes_remaining < chunk_size {
|
||||
chunk_size = bytes_remaining;
|
||||
}
|
||||
let payload = Payload {
|
||||
mime: None,
|
||||
bytes: bytes[offset as usize..offset as usize + chunk_size as usize].to_vec(),
|
||||
};
|
||||
send_request(
|
||||
&to_addr,
|
||||
&Request {
|
||||
inherit: false,
|
||||
expects_response: Some(max_timeout),
|
||||
ipc: None,
|
||||
metadata: Some(if chunks_sent == 1 {
|
||||
serde_json::to_string(&TransferMetadata::Begin {
|
||||
file_name: "test".to_string(),
|
||||
file_size: bytes.len() as u64,
|
||||
total_chunks,
|
||||
})
|
||||
.unwrap()
|
||||
} else {
|
||||
chunks_sent.to_string()
|
||||
}),
|
||||
},
|
||||
Some(&&transfer_context_id.to_string()),
|
||||
Some(&payload),
|
||||
);
|
||||
bytes_remaining -= chunk_size;
|
||||
offset += chunk_size;
|
||||
if bytes_remaining == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let mut chunks_confirmed = 0;
|
||||
let mut non_transfer_message_queue = Vec::new();
|
||||
loop {
|
||||
let next = receive();
|
||||
if let Err((send_error, context)) = &next {
|
||||
match context {
|
||||
Some(_) => match send_error.kind {
|
||||
SendErrorKind::Offline => {
|
||||
return (
|
||||
Err(TransferError::TargetOffline(chunks_confirmed * chunk_size)),
|
||||
non_transfer_message_queue,
|
||||
)
|
||||
}
|
||||
SendErrorKind::Timeout => {
|
||||
return (
|
||||
Err(TransferError::TargetTimeout(chunks_confirmed * chunk_size)),
|
||||
non_transfer_message_queue,
|
||||
)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
non_transfer_message_queue.push(next);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Ok((source, message)) = &next {
|
||||
if source.process == to_addr.process {
|
||||
match message {
|
||||
Message::Request(_) => {
|
||||
non_transfer_message_queue.push(next);
|
||||
continue;
|
||||
}
|
||||
Message::Response((response, context)) => {
|
||||
if transfer_context_id
|
||||
== context
|
||||
.as_ref()
|
||||
.unwrap_or(&"".into())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(0)
|
||||
{
|
||||
chunks_confirmed += 1;
|
||||
if response
|
||||
.metadata
|
||||
.as_ref()
|
||||
.unwrap_or(&"".into())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(0)
|
||||
!= chunks_confirmed
|
||||
{
|
||||
return (
|
||||
Err(TransferError::TargetRejected(
|
||||
chunks_confirmed * chunk_size,
|
||||
)),
|
||||
non_transfer_message_queue,
|
||||
);
|
||||
}
|
||||
if chunks_confirmed == chunks_sent {
|
||||
return (Ok(()), non_transfer_message_queue);
|
||||
}
|
||||
} else {
|
||||
non_transfer_message_queue.push(next);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
non_transfer_message_queue.push(next);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn receive_transfer(
|
||||
transfer_source: Address,
|
||||
total_chunks: u64,
|
||||
max_timeout: u64,
|
||||
) -> (
|
||||
Result<Vec<u8>, TransferError>,
|
||||
Vec<Result<(Address, Message), (SendError, Option<Context>)>>,
|
||||
) {
|
||||
let start_time: u64 = get_unix_time();
|
||||
// get first payload then loop and receive rest
|
||||
let mut file = match get_payload() {
|
||||
Some(payload) => payload.bytes,
|
||||
None => {
|
||||
return (Err(TransferError::SourceFailed(0)), vec![]);
|
||||
}
|
||||
};
|
||||
// respond to first request
|
||||
send_response(
|
||||
&Response {
|
||||
ipc: None,
|
||||
metadata: Some(1.to_string()),
|
||||
},
|
||||
None,
|
||||
);
|
||||
if total_chunks == 1 {
|
||||
return (Ok(file), vec![]);
|
||||
}
|
||||
let mut chunk_num = 1;
|
||||
let mut non_transfer_message_queue = Vec::new();
|
||||
loop {
|
||||
let next = receive();
|
||||
if start_time + max_timeout < get_unix_time() {
|
||||
return (
|
||||
Err(TransferError::TargetTimeout(file.len() as u64)),
|
||||
non_transfer_message_queue,
|
||||
);
|
||||
}
|
||||
if let Err(_) = &next {
|
||||
non_transfer_message_queue.push(next);
|
||||
} else if let Ok((source, message)) = &next {
|
||||
// we know all messages from source process will be for this transfer,
|
||||
// since they are sent sequentially and it's a single-file queue.
|
||||
if source.process == transfer_source.process {
|
||||
match message {
|
||||
Message::Request(_) => {
|
||||
let payload = match get_payload() {
|
||||
Some(payload) => payload,
|
||||
None => {
|
||||
return (
|
||||
Err(TransferError::SourceFailed(file.len() as u64)),
|
||||
non_transfer_message_queue,
|
||||
);
|
||||
}
|
||||
};
|
||||
chunk_num += 1;
|
||||
file.extend(payload.bytes);
|
||||
send_response(
|
||||
&Response {
|
||||
ipc: None,
|
||||
metadata: Some(chunk_num.to_string()),
|
||||
},
|
||||
None,
|
||||
);
|
||||
if chunk_num == total_chunks {
|
||||
return (Ok(file), non_transfer_message_queue);
|
||||
}
|
||||
}
|
||||
Message::Response(_) => {
|
||||
return (
|
||||
Err(TransferError::SourceFailed(file.len() as u64)),
|
||||
non_transfer_message_queue,
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
non_transfer_message_queue.push(next);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -9,6 +9,6 @@
|
||||
"encryptor:sys:uqbar",
|
||||
"http_server:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -9,6 +9,6 @@
|
||||
"http_server:sys:uqbar",
|
||||
"encryptor:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -10,8 +10,6 @@
|
||||
"encryptor:sys:uqbar",
|
||||
"vfs:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": [
|
||||
"http_server:sys:uqbar"
|
||||
]
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -9,6 +9,6 @@
|
||||
"encryptor:sys:uqbar",
|
||||
"http_server:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -7,8 +7,6 @@
|
||||
"request_messaging": [
|
||||
"vfs:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": [
|
||||
"all"
|
||||
]
|
||||
"public": true
|
||||
}
|
||||
]
|
||||
|
@ -7,6 +7,6 @@
|
||||
"request_messaging": [
|
||||
"http_bindings:http_bindings:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -9,9 +9,6 @@
|
||||
"http_bindings:http_bindings:uqbar",
|
||||
"eth_rpc:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": [
|
||||
"eth_rpc:sys:uqbar",
|
||||
"filesystem:sys:uqbar"
|
||||
]
|
||||
"public": true
|
||||
}
|
||||
]
|
||||
|
@ -9,6 +9,6 @@
|
||||
"app_tracker:app_tracker:uqbar",
|
||||
"http_server:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": []
|
||||
"public": false
|
||||
}
|
||||
]
|
||||
|
@ -7,8 +7,6 @@
|
||||
"request_messaging": [
|
||||
"net:sys:uqbar"
|
||||
],
|
||||
"grant_messaging": [
|
||||
"all"
|
||||
]
|
||||
"public": true
|
||||
}
|
||||
]
|
||||
|
@ -338,26 +338,26 @@ async fn bootstrap(
|
||||
.unwrap(),
|
||||
});
|
||||
|
||||
let mut public_process = false;
|
||||
let public_process = entry.public;
|
||||
|
||||
// queue the granted capabilities
|
||||
for process_name in &entry.grant_messaging {
|
||||
if process_name == "all" {
|
||||
public_process = true;
|
||||
continue;
|
||||
}
|
||||
let process_id = ProcessId::from_str(process_name).unwrap();
|
||||
caps_to_grant.push((
|
||||
process_id.clone(),
|
||||
Capability {
|
||||
issuer: Address {
|
||||
node: our_name.to_string(),
|
||||
process: ProcessId::from_str(&our_process_id).unwrap(),
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
},
|
||||
));
|
||||
}
|
||||
// for process_name in &entry.public {
|
||||
// if process_name == "all" {
|
||||
// public_process = true;
|
||||
// continue;
|
||||
// }
|
||||
// let process_id = ProcessId::from_str(process_name).unwrap();
|
||||
// caps_to_grant.push((
|
||||
// process_id.clone(),
|
||||
// Capability {
|
||||
// issuer: Address {
|
||||
// node: our_name.to_string(),
|
||||
// process: ProcessId::from_str(&our_process_id).unwrap(),
|
||||
// },
|
||||
// params: "\"messaging\"".into(),
|
||||
// },
|
||||
// ));
|
||||
// }
|
||||
|
||||
// save in process map
|
||||
let file = FileIdentifier::new_uuid();
|
||||
|
@ -408,7 +408,7 @@ pub struct PackageManifestEntry {
|
||||
pub on_panic: OnPanic,
|
||||
pub request_networking: bool,
|
||||
pub request_messaging: Vec<String>,
|
||||
pub grant_messaging: Vec<String>, // special logic for the string "all": makes process public
|
||||
pub public: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
Loading…
Reference in New Issue
Block a user