add in transfer logic

This commit is contained in:
dr-frmr 2023-10-24 15:18:38 -04:00
parent 767a7b04f9
commit 2735c0f04e
No known key found for this signature in database
6 changed files with 561 additions and 399 deletions

View File

@ -18,6 +18,7 @@ dependencies = [
"rand",
"serde",
"serde_json",
"sha2",
"wit-bindgen",
]
@ -42,6 +43,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "cargo-component-bindings"
version = "0.1.0"
@ -72,6 +82,35 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cpufeatures"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@ -87,6 +126,16 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.10"
@ -284,6 +333,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "smallvec"
version = "1.11.0"
@ -325,6 +385,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "typenum"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicase"
version = "2.7.0"

View File

@ -17,6 +17,7 @@ cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-co
rand = "0.8.5"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.8"
wit-bindgen = { version = "0.11.0", default_features = false }
[lib]

View File

@ -4,6 +4,7 @@ use bindings::{
send_request, send_response, Guest,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256, Sha512};
use std::collections::{HashMap, HashSet};
#[allow(dead_code)]
@ -17,6 +18,9 @@ use process_lib::PackageId;
#[allow(dead_code)]
mod ft_worker_lib;
use ft_worker_lib::{
spawn_receive_transfer, spawn_transfer, FTWorkerCommand, FTWorkerResult, FileTransferContext,
};
struct Component;
@ -43,15 +47,15 @@ struct Component;
#[derive(Debug, Serialize, Deserialize)]
struct State {
pub packages: HashMap<PackageId, PackageState>,
pub requested_packages: HashMap<PackageId, NodeId>,
}
/// state of an individual package we have downloaded
#[derive(Debug, Serialize, Deserialize)]
struct PackageState {
pub mirrored_from: NodeId,
pub listing_data: Option<PackageListing>, // None if package is unlisted
pub installed_version: Option<kt::PackageVersion>, // None if downloaded but not installed
pub mirroring: bool, // are we serving this package to others?
pub listing_data: PackageListing,
pub mirroring: bool, // are we serving this package to others?
pub auto_update: bool, // if we get a listing data update, will we try to download it?
}
@ -75,16 +79,19 @@ struct PackageListing {
pub enum Req {
LocalRequest(LocalRequest),
RemoteRequest(RemoteRequest),
FTWorkerCommand(ft_worker_lib::FTWorkerCommand),
FTWorkerCommand(FTWorkerCommand),
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)] // untagged as a meta-type for all responses
pub enum Resp {
RemoteResponse(RemoteResponse),
FTWorkerResult(FTWorkerResult),
// note that we do not need to ourselves handle local responses, as
// those are given to others rather than received.
RemoteResponse(RemoteResponse),
FTWorkerResult(ft_worker_lib::FTWorkerResult),
NewPackageResponse(NewPackageResponse),
DownloadResponse(DownloadResponse),
InstallResponse(InstallResponse),
}
/// Local requests take this form.
@ -93,6 +100,7 @@ pub enum LocalRequest {
/// expects a zipped package as payload: create a new package from it
/// if requested, will return a NewPackageResponse indicating success/failure
NewPackage {
package: PackageId,
mirror: bool, // sets whether we will mirror this package
},
/// no payload; try to download a package from a specified node
@ -139,7 +147,7 @@ pub enum NewPackageResponse {
#[derive(Debug, Serialize, Deserialize)]
pub enum DownloadResponse {
Success,
Started,
Failure,
}
@ -171,6 +179,7 @@ impl Guest for Component {
// load in our saved state or initalize a new one if none exists
let mut state = process_lib::get_state::<State>().unwrap_or(State {
packages: HashMap::new(),
requested_packages: HashMap::new(),
});
// active the main messaging loop: handle requests and responses
@ -183,6 +192,10 @@ impl Guest for Component {
continue;
}
};
print_to_terminal(
0,
&format!("app-store: got message from {}: {:?}", source.to_string(), message),
);
match message {
Message::Request(req) => {
let Some(ref ipc) = req.ipc else {
@ -190,16 +203,14 @@ impl Guest for Component {
};
match &serde_json::from_str::<Req>(ipc) {
Ok(Req::LocalRequest(local_request)) => {
match handle_local_request(local_request) {
match handle_local_request(&our, &source, local_request, &mut state) {
Ok(None) => continue,
Ok(Some(resp)) => {
if req.expects_response.is_some() {
send_response(
&Response {
inherit: false,
ipc: Some(
serde_json::to_string(&resp).unwrap(),
),
ipc: Some(serde_json::to_string(&resp).unwrap()),
metadata: None,
},
None,
@ -215,16 +226,14 @@ impl Guest for Component {
}
}
Ok(Req::RemoteRequest(remote_request)) => {
match handle_remote_request(remote_request) {
match handle_remote_request(&our, &source, remote_request, &mut state) {
Ok(None) => continue,
Ok(Some(resp)) => {
if req.expects_response.is_some() {
send_response(
&Response {
inherit: false,
ipc: Some(
serde_json::to_string(&resp).unwrap(),
),
ipc: Some(serde_json::to_string(&resp).unwrap()),
metadata: None,
},
None,
@ -234,15 +243,19 @@ impl Guest for Component {
Err(err) => {
print_to_terminal(
0,
&format!("app-store: local request error: {:?}", err),
&format!("app-store: remote request error: {:?}", err),
);
}
}
}
Ok(Req::FTWorkerCommand(ft_worker_command)) => {
// TODO handle ft_worker commands
Ok(Req::FTWorkerCommand(_)) => {
spawn_receive_transfer(&our, ipc);
}
Err(_) => {
e => {
print_to_terminal(
0,
&format!("app store bad request: {}, error {:?}", ipc, e),
);
continue;
}
}
@ -252,384 +265,461 @@ impl Guest for Component {
continue;
};
match &serde_json::from_str::<Resp>(ipc) {
Ok(Resp::RemoteResponse(remote_response)) => {
// TODO handle remote response
}
Ok(Resp::RemoteResponse(remote_response)) => match remote_response {
RemoteResponse::DownloadApproved => {
print_to_terminal(
0,
"app store: download approved, should be starting",
);
}
RemoteResponse::DownloadDenied => {
print_to_terminal(
0,
"app store: could not download package from that node!",
);
}
},
Ok(Resp::FTWorkerResult(ft_worker_result)) => {
// TODO handle ft_worker result
let Ok(context) = serde_json::from_str::<FileTransferContext>(&context.unwrap_or_default()) else {
print_to_terminal(0, "file_transfer: got weird local request");
continue;
};
match ft_worker_result {
FTWorkerResult::SendSuccess => {
print_to_terminal(
0,
&format!(
"file_transfer: successfully shared app {} in {:.4}s",
context.file_name,
std::time::SystemTime::now()
.duration_since(context.start_time)
.unwrap()
.as_secs_f64(),
),
);
}
FTWorkerResult::ReceiveSuccess(name) => {
// do with file what you'd like here
print_to_terminal(
0,
&format!("file_transfer: successfully received {:?}", name,),
);
// remove .zip from name
let package_id =
match PackageId::from_str(name.trim_end_matches(".zip")) {
Ok(package_id) => package_id,
Err(_) => {
print_to_terminal(
0,
&format!(
"app store: bad package filename: {}",
name
),
);
continue;
}
};
if let Some(install_from) =
state.requested_packages.remove(&package_id)
{
if install_from == source.node {
// auto-take zip from payload and request ourself with New
let _ = send_request(
&our,
&Request {
inherit: true, // will inherit payload!
expects_response: None,
ipc: Some(
serde_json::to_string(&Req::LocalRequest(
LocalRequest::NewPackage {
package: package_id,
mirror: true,
},
))
.unwrap(),
),
metadata: None,
},
None,
None,
);
} else {
print_to_terminal(
0,
&format!(
"app-store: got install response from bad source: {}",
install_from
),
);
}
}
}
FTWorkerResult::Err(e) => {
print_to_terminal(
0,
&format!("app store file transfer: error {:?}", e),
);
}
}
}
Err(_) => {
e => {
print_to_terminal(
0,
&format!("app store bad response: {}, error {:?}", ipc, e),
);
continue;
}
}
// // only expecting NewFromRemote for apps we've requested
// match serde_json::from_str(&response.ipc.unwrap_or_default()) {
// Ok(AppTrackerResponse::NewFromRemote { package_id }) => {
// if let Some(install_from) = state.requested_packages.remove(&package_id)
// {
// if install_from == source.node {
// // auto-take zip from payload and request ourself with New
// let _ = send_request(
// &our,
// &Request {
// inherit: true, // will inherit payload!
// expects_response: None,
// ipc: Some(
// serde_json::to_string(&AppTrackerRequest::New {
// package: package_id,
// mirror: true,
// })
// .unwrap(),
// ),
// metadata: None,
// },
// None,
// None,
// );
// } else {
// print_to_terminal(
// 0,
// &format!(
// "app-store: got install response from bad source: {}",
// install_from
// ),
// );
// }
// }
// }
// err => {
// print_to_terminal(
// 0,
// &format!("app-store: got unexpected response {:?}", err),
// );
// }
// }
}
}
}
}
}
fn handle_local_request(request: &LocalRequest) -> anyhow::Result<Option<Resp>> {
// TODO
Ok(None)
fn handle_local_request(
our: &Address,
source: &Address,
request: &LocalRequest,
state: &mut State,
) -> anyhow::Result<Option<Resp>> {
if our.node != source.node {
return Err(anyhow::anyhow!("local request from non-local node"));
}
match request {
LocalRequest::NewPackage { package, mirror } => {
let Some(mut payload) = get_payload() else {
return Err(anyhow::anyhow!("no payload"));
};
let vfs_address = Address {
node: our.node.clone(),
process: ProcessId::from_str("vfs:sys:uqbar")?,
};
// produce the version hash for this new package
let mut hasher = sha2::Sha256::new();
hasher.update(&payload.bytes);
let version_hash = format!("{:x}", hasher.finalize());
let _ = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::New,
})?),
None,
None,
5,
)?;
// add zip bytes
payload.mime = Some("application/zip".to_string());
let _ = process_lib::send_and_await_response(
&vfs_address,
true,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::Add {
full_path: package.to_string(),
entry_type: kt::AddEntryType::ZipArchive,
},
})?),
None,
Some(&payload),
5,
)?;
// save the zip file itself in VFS for sharing with other nodes
// call it <package>.zip
let _ = process_lib::send_and_await_response(
&vfs_address,
true,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::Add {
full_path: format!("/{}.zip", package.to_string()),
entry_type: kt::AddEntryType::NewFile,
},
})?),
None,
Some(&payload),
5,
)?;
let _ = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::GetEntry("/metadata.json".into()),
})?),
None,
None,
5,
)?;
let Some(payload) = get_payload() else {
return Err(anyhow::anyhow!("no metadata payload"));
};
let metadata = String::from_utf8(payload.bytes)?;
let metadata = serde_json::from_str::<PackageMetadata>(&metadata)?;
let listing_data = PackageListing {
name: metadata.package,
publisher: our.node.clone(),
description: metadata.description,
website: metadata.website,
version: metadata.version,
version_hash,
};
let package_state = PackageState {
mirrored_from: our.node.clone(),
listing_data,
mirroring: *mirror,
auto_update: true,
};
state.packages.insert(package.clone(), package_state);
process_lib::set_state::<State>(&state);
Ok(Some(Resp::NewPackageResponse(NewPackageResponse::Success)))
}
LocalRequest::Download {
package,
install_from,
} => Ok(Some(Resp::DownloadResponse(
match process_lib::send_and_await_response(
&Address {
node: install_from.clone(),
process: our.process.clone(),
},
true,
Some(serde_json::to_string(&RemoteRequest::Download(
package.clone(),
))?),
None,
None,
5,
) {
Ok((_source, Message::Response((resp, _context)))) => {
let Some(ipc) = resp.ipc else {
return Err(anyhow::anyhow!("no ipc in response"))
};
let resp = serde_json::from_str::<Resp>(&ipc)?;
match resp {
Resp::RemoteResponse(RemoteResponse::DownloadApproved) => {
state
.requested_packages
.insert(package.clone(), install_from.to_string());
process_lib::set_state::<State>(&state);
DownloadResponse::Started
}
_ => DownloadResponse::Failure,
}
}
_ => DownloadResponse::Failure,
},
))),
LocalRequest::Install(package) => {
let vfs_address = Address {
node: our.node.clone(),
process: ProcessId::from_str("vfs:sys:uqbar")?,
};
let _ = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::GetEntry("/manifest.json".into()),
})?),
None,
None,
5,
)?;
let Some(payload) = get_payload() else {
return Err(anyhow::anyhow!("no payload"));
};
let manifest = String::from_utf8(payload.bytes)?;
let manifest = serde_json::from_str::<Vec<PackageManifestEntry>>(&manifest)?;
for entry in manifest {
let path = if entry.process_wasm_path.starts_with("/") {
entry.process_wasm_path
} else {
format!("/{}", entry.process_wasm_path)
};
let (_, hash_response) = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::GetHash(path.clone()),
})?),
None,
None,
5,
)?;
let Message::Response((Response { ipc: Some(ipc), .. }, _)) = hash_response else {
return Err(anyhow::anyhow!("bad vfs response"));
};
let kt::VfsResponse::GetHash(Some(hash)) = serde_json::from_str(&ipc)? else {
return Err(anyhow::anyhow!("no hash in vfs"));
};
// build initial caps
let mut initial_capabilities: HashSet<kt::SignedCapability> = HashSet::new();
if entry.request_networking {
let Some(networking_cap) = get_capability(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("kernel:sys:uqbar")?,
},
&"\"network\"".to_string(),
) else {
return Err(anyhow::anyhow!("app-store: no net cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap));
}
let Some(read_cap) = get_capability(
&vfs_address.clone(),
&serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no read cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(read_cap));
let Some(write_cap) = get_capability(
&vfs_address.clone(),
&serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no write cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(write_cap));
for process_name in &entry.request_messaging {
let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
// TODO handle arbitrary caps here
continue;
};
let Some(messaging_cap) = get_capability(
&Address {
node: our.node.clone(),
process: parsed_process_id.clone(),
},
&"\"messaging\"".into()
) else {
print_to_terminal(0, &format!("app-store: no cap for {} to give away!", process_name));
continue;
};
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
}
let process_id = format!("{}:{}", entry.process_name, package.to_string());
let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
return Err(anyhow::anyhow!("app-store: invalid process id!"));
};
let _ = process_lib::send_request(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("kernel:sys:uqbar")?,
},
false,
Some(serde_json::to_string(&kt::KernelCommand::KillProcess(
kt::ProcessId::de_wit(parsed_new_process_id.clone()),
))?),
None,
None,
None,
);
// kernel start process takes bytes as payload + wasm_bytes_handle...
// reconsider perhaps
let (_, _bytes_response) = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::GetEntry(path),
})?),
None,
None,
5,
)?;
let Some(payload) = get_payload() else {
return Err(anyhow::anyhow!("no wasm bytes payload."));
};
let _ = process_lib::send_and_await_response(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("kernel:sys:uqbar")?,
},
false,
Some(serde_json::to_string(&kt::KernelCommand::StartProcess {
id: kt::ProcessId::de_wit(parsed_new_process_id),
wasm_bytes_handle: hash,
on_panic: entry.on_panic,
initial_capabilities,
public: entry.public,
})?),
None,
Some(&payload),
5,
)?;
}
Ok(Some(Resp::InstallResponse(InstallResponse::Success)))
}
LocalRequest::Uninstall(package) => {
// TODO
Ok(None)
}
LocalRequest::Delete(package) => {
// TODO
Ok(None)
}
}
}
fn handle_remote_request(request: &RemoteRequest) -> anyhow::Result<Option<Resp>> {
// TODO
Ok(None)
fn handle_remote_request(
our: &Address,
source: &Address,
request: &RemoteRequest,
state: &mut State,
) -> anyhow::Result<Option<Resp>> {
match request {
RemoteRequest::Download(package) => {
print_to_terminal(0, &format!("app store: got download request for {:?}", package));
print_to_terminal(0, &format!("app store: state: {:?}", state));
let Some(package_state) = state.packages.get(&package) else {
return Ok(Some(Resp::RemoteResponse(RemoteResponse::DownloadDenied)))
};
if !package_state.mirroring {
return Ok(Some(Resp::RemoteResponse(RemoteResponse::DownloadDenied)));
}
// get the .zip from VFS and attach as payload to response
let vfs_address = Address {
node: our.node.clone(),
process: ProcessId::from_str("vfs:sys:uqbar")?,
};
let file_name = format!("/{}.zip", package.to_string());
let _ = process_lib::send_and_await_response(
&vfs_address,
false,
Some(serde_json::to_string(&kt::VfsRequest {
drive: package.to_string(),
action: kt::VfsAction::GetEntry(file_name.clone()),
})?),
None,
None,
5,
)?;
// transfer will inherit the payload bytes we receive from VFS
spawn_transfer(&our, &file_name, None, &source);
Ok(Some(Resp::RemoteResponse(RemoteResponse::DownloadApproved)))
}
}
}
// fn parse_command(
// our: &Address,
// source: &Address,
// request_string: String,
// state: &mut AppTrackerState,
// ) -> anyhow::Result<Option<AppTrackerResponse>> {
// match serde_json::from_str(&request_string)? {
// // create a new package based on local payload
// AppTrackerRequest::New { package, mirror } => {
// if our.node != source.node {
// return Err(anyhow::anyhow!("new package request from non-local node"));
// }
// let Some(mut payload) = get_payload() else {
// return Err(anyhow::anyhow!("no payload"));
// };
// let vfs_address = Address {
// node: our.node.clone(),
// process: ProcessId::from_str("vfs:sys:uqbar")?,
// };
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::New,
// })?),
// None,
// None,
// 5,
// )?;
// // add zip bytes
// payload.mime = Some("application/zip".to_string());
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// true,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::Add {
// full_path: package.to_string(),
// entry_type: kt::AddEntryType::ZipArchive,
// },
// })?),
// None,
// Some(&payload),
// 5,
// )?;
// // save the zip file itself in VFS for sharing with other nodes
// // call it <package>.zip
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// true,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::Add {
// full_path: format!("/{}.zip", package.to_string()),
// entry_type: kt::AddEntryType::NewFile,
// },
// })?),
// None,
// Some(&payload),
// 5,
// )?;
// // if mirror, save in our state
// if mirror {
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::GetEntry("/metadata.json".into()),
// })?),
// None,
// None,
// 5,
// )?;
// let Some(payload) = get_payload() else {
// return Err(anyhow::anyhow!("no metadata payload"));
// };
// let metadata = String::from_utf8(payload.bytes)?;
// let metadata = serde_json::from_str::<PackageMetadata>(&metadata)?;
// state
// .mirrored_packages
// .insert(PackageId::new(&metadata.package, &metadata.publisher));
// process_lib::set_state::<AppTrackerState>(&state);
// }
// Ok(Some(AppTrackerResponse::New {
// package: package.to_string(),
// }))
// }
// // if we are the source, forward to install_from target.
// // if we install_from, respond with package if we have it
// AppTrackerRequest::NewFromRemote {
// package_id,
// install_from,
// } => {
// if our.node == source.node {
// let _ = send_request(
// &Address {
// node: install_from.clone(),
// process: our.process.clone(),
// },
// &Request {
// inherit: true,
// expects_response: Some(5), // TODO
// ipc: Some(serde_json::to_string(&AppTrackerRequest::NewFromRemote {
// package_id: package_id.clone(),
// install_from: install_from.clone(),
// })?),
// metadata: None,
// },
// None,
// None,
// );
// state.requested_packages.insert(package_id, install_from);
// process_lib::set_state::<AppTrackerState>(&state);
// Ok(None)
// } else if our.node == install_from {
// let Some(_mirror) = state.mirrored_packages.get(&package_id) else {
// return Ok(Some(AppTrackerResponse::Error { error: "package not mirrored here!".into() }))
// };
// // get the .zip from VFS and attach as payload to response
// let vfs_address = Address {
// node: our.node.clone(),
// process: ProcessId::from_str("vfs:sys:uqbar")?,
// };
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package_id.to_string(),
// action: kt::VfsAction::GetEntry(format!("/{}.zip", package_id.to_string())),
// })?),
// None,
// None,
// 5,
// )?;
// Ok(Some(AppTrackerResponse::NewFromRemote { package_id }))
// } else {
// // TODO what to do here?
// Ok(None)
// }
// }
// AppTrackerRequest::Install { package } => {
// if our.node != source.node {
// return Err(anyhow::anyhow!("install request from non-local node"));
// }
// let vfs_address = Address {
// node: our.node.clone(),
// process: ProcessId::from_str("vfs:sys:uqbar")?,
// };
// let _ = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::GetEntry("/manifest.json".into()),
// })?),
// None,
// None,
// 5,
// )?;
// let Some(payload) = get_payload() else {
// return Err(anyhow::anyhow!("no payload"));
// };
// let manifest = String::from_utf8(payload.bytes)?;
// let manifest = serde_json::from_str::<Vec<PackageManifestEntry>>(&manifest)?;
// for entry in manifest {
// let path = if entry.process_wasm_path.starts_with("/") {
// entry.process_wasm_path
// } else {
// format!("/{}", entry.process_wasm_path)
// };
// let (_, hash_response) = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::GetHash(path.clone()),
// })?),
// None,
// None,
// 5,
// )?;
// let Message::Response((Response { ipc: Some(ipc), .. }, _)) = hash_response else {
// return Err(anyhow::anyhow!("bad vfs response"));
// };
// let kt::VfsResponse::GetHash(Some(hash)) = serde_json::from_str(&ipc)? else {
// return Err(anyhow::anyhow!("no hash in vfs"));
// };
// // build initial caps
// let mut initial_capabilities: HashSet<kt::SignedCapability> = HashSet::new();
// if entry.request_networking {
// let Some(networking_cap) = get_capability(
// &Address {
// node: our.node.clone(),
// process: ProcessId::from_str("kernel:sys:uqbar")?,
// },
// &"\"network\"".to_string(),
// ) else {
// return Err(anyhow::anyhow!("app-store: no net cap"));
// };
// initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap));
// }
// let Some(read_cap) = get_capability(
// &vfs_address.clone(),
// &serde_json::to_string(&serde_json::json!({
// "kind": "read",
// "drive": package.to_string(),
// }))?,
// ) else {
// return Err(anyhow::anyhow!("app-store: no read cap"));
// };
// initial_capabilities.insert(kt::de_wit_signed_capability(read_cap));
// let Some(write_cap) = get_capability(
// &vfs_address.clone(),
// &serde_json::to_string(&serde_json::json!({
// "kind": "write",
// "drive": package.to_string(),
// }))?,
// ) else {
// return Err(anyhow::anyhow!("app-store: no write cap"));
// };
// initial_capabilities.insert(kt::de_wit_signed_capability(write_cap));
// for process_name in &entry.request_messaging {
// let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
// // TODO handle arbitrary caps here
// continue;
// };
// let Some(messaging_cap) = get_capability(
// &Address {
// node: our.node.clone(),
// process: parsed_process_id.clone(),
// },
// &"\"messaging\"".into()
// ) else {
// print_to_terminal(0, &format!("app-store: no cap for {} to give away!", process_name));
// continue;
// };
// initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
// }
// let process_id = format!("{}:{}", entry.process_name, package.to_string());
// let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
// return Err(anyhow::anyhow!("app-store: invalid process id!"));
// };
// let _ = process_lib::send_request(
// &Address {
// node: our.node.clone(),
// process: ProcessId::from_str("kernel:sys:uqbar")?,
// },
// false,
// Some(serde_json::to_string(&kt::KernelCommand::KillProcess(
// kt::ProcessId::de_wit(parsed_new_process_id.clone()),
// ))?),
// None,
// None,
// None,
// );
// // kernel start process takes bytes as payload + wasm_bytes_handle...
// // reconsider perhaps
// let (_, _bytes_response) = process_lib::send_and_await_response(
// &vfs_address,
// false,
// Some(serde_json::to_string(&kt::VfsRequest {
// drive: package.to_string(),
// action: kt::VfsAction::GetEntry(path),
// })?),
// None,
// None,
// 5,
// )?;
// let Some(payload) = get_payload() else {
// return Err(anyhow::anyhow!("no wasm bytes payload."));
// };
// let _ = process_lib::send_and_await_response(
// &Address {
// node: our.node.clone(),
// process: ProcessId::from_str("kernel:sys:uqbar")?,
// },
// false,
// Some(serde_json::to_string(&kt::KernelCommand::StartProcess {
// id: kt::ProcessId::de_wit(parsed_new_process_id),
// wasm_bytes_handle: hash,
// on_panic: entry.on_panic,
// initial_capabilities,
// public: entry.public,
// })?),
// None,
// Some(&payload),
// 5,
// )?;
// }
// Ok(Some(AppTrackerResponse::Install {
// package: package.to_string(),
// }))
// }
// }
// }

View File

@ -1,11 +1,11 @@
use super::bindings::component::uq_process::types::*;
use super::bindings::{Address, Payload, print_to_terminal, send_request, spawn};
use super::bindings::{print_to_terminal, send_request, spawn, Address, Payload};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct FileTransferContext {
pub file_name: String,
pub file_size: u64,
pub file_size: Option<u64>,
pub start_time: std::time::SystemTime,
}
@ -48,7 +48,7 @@ pub enum TransferError {
pub fn spawn_transfer(
our: &Address,
file_name: &str,
file_bytes: Vec<u8>,
file_bytes: Option<Vec<u8>>, // if None, expects to inherit payload!
to_addr: &Address,
) {
let transfer_id: u64 = rand::random();
@ -64,13 +64,17 @@ pub fn spawn_transfer(
return;
};
// tell the worker what to do
let payload_or_inherit = match file_bytes {
Some(bytes) => Some(Payload { mime: None, bytes }),
None => None,
};
send_request(
&Address {
node: our.node.clone(),
process: worker_process_id,
},
&Request {
inherit: false,
inherit: !payload_or_inherit.is_some(),
expects_response: Some(61),
ipc: Some(
serde_json::to_string(&FTWorkerCommand::Send {
@ -85,19 +89,19 @@ pub fn spawn_transfer(
Some(
&serde_json::to_string(&FileTransferContext {
file_name: file_name.into(),
file_size: file_bytes.len() as u64,
file_size: match &payload_or_inherit {
Some(p) => Some(p.bytes.len() as u64),
None => None, // TODO
},
start_time: std::time::SystemTime::now(),
})
.unwrap(),
),
Some(&Payload { mime: None, bytes: file_bytes }),
payload_or_inherit.as_ref(),
);
}
pub fn spawn_receive_transfer(
our: &Address,
ipc: &str,
) {
pub fn spawn_receive_transfer(our: &Address, ipc: &str) {
let Ok(FTWorkerCommand::Receive { transfer_id, .. }) = serde_json::from_str(ipc) else {
print_to_terminal(0, "file_transfer: got weird request");
return;

View File

@ -5,7 +5,6 @@
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"http_bindings:http_bindings:uqbar",
"terminal:terminal:uqbar",
"filesystem:sys:uqbar",
"http_server:sys:uqbar",

View File

@ -403,12 +403,14 @@ pub struct ProcessContext {
// filesystem.rs types
//
pub type PackageVersion = (u32, u32, u32);
/// the type that gets deserialized from `metadata.json` in a package
#[derive(Debug, Serialize, Deserialize)]
pub struct PackageMetadata {
pub package: String,
pub publisher: String,
pub version: Version,
pub version: PackageVersion,
pub description: Option<String>,
pub website: Option<String>,
}
@ -418,7 +420,7 @@ pub struct PackageMetadata {
pub struct PackageManifestEntry {
pub process_name: String,
pub process_wasm_path: String,
pub on_panic: kt::OnPanic,
pub on_panic: OnPanic,
pub request_networking: bool,
pub request_messaging: Vec<String>,
pub public: bool,