Merge branch 'main' into da/runtime-extensions

This commit is contained in:
tadad 2023-11-14 15:38:33 +02:00 committed by GitHub
commit b9a5c008ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1021 additions and 973 deletions

View File

@ -46,15 +46,15 @@ fn build_app(target_path: &str, name: &str, parent_pkg_path: Option<&str>) {
.unwrap_or(true)
{
// create target/bindings directory
fs::create_dir_all(&format!("{}/target/bindings/{}", target_path, name,)).unwrap();
fs::create_dir_all(format!("{}/target/bindings/{}", target_path, name,)).unwrap();
// copy newly-made target.wasm into target/bindings
run_command(Command::new("cp").args(&[
run_command(Command::new("cp").args([
"target.wasm",
&format!("{}/target/bindings/{}/", target_path, name,),
]))
.unwrap();
// copy newly-made world into target/bindings
run_command(Command::new("cp").args(&[
run_command(Command::new("cp").args([
"world",
&format!("{}/target/bindings/{}/", target_path, name,),
]))
@ -65,10 +65,10 @@ fn build_app(target_path: &str, name: &str, parent_pkg_path: Option<&str>) {
if std::path::Path::new(&bash_build_path).exists() {
let cwd = std::env::current_dir().unwrap();
std::env::set_current_dir(target_path).unwrap();
run_command(&mut Command::new("/bin/bash").arg("build.sh")).unwrap();
run_command(Command::new("/bin/bash").arg("build.sh")).unwrap();
std::env::set_current_dir(cwd).unwrap();
} else {
run_command(Command::new("cargo").args(&[
run_command(Command::new("cargo").args([
"+nightly",
"build",
"--release",
@ -80,7 +80,7 @@ fn build_app(target_path: &str, name: &str, parent_pkg_path: Option<&str>) {
.unwrap();
}
// Adapt module to component with adapter based on wasi_snapshot_preview1.wasm
run_command(Command::new("wasm-tools").args(&[
run_command(Command::new("wasm-tools").args([
"component",
"new",
&format!("{}/target/wasm32-wasi/release/{}.wasm", target_path, name),
@ -99,12 +99,12 @@ fn build_app(target_path: &str, name: &str, parent_pkg_path: Option<&str>) {
format!("{}/{}.wasm", parent_pkg, name)
} else {
let pkg_folder = format!("{}/pkg/", target_path);
let _ = run_command(Command::new("mkdir").args(&["-p", &pkg_folder]));
let _ = run_command(Command::new("mkdir").args(["-p", &pkg_folder]));
format!("{}/{}.wasm", pkg_folder, name)
};
// Embed "wit" into the component
run_command(Command::new("wasm-tools").args(&[
run_command(Command::new("wasm-tools").args([
"component",
"embed",
"wit",
@ -136,7 +136,7 @@ fn main() {
let pwd = std::env::current_dir().unwrap();
// Create target.wasm (compiled .wit) & world
run_command(Command::new("wasm-tools").args(&[
run_command(Command::new("wasm-tools").args([
"component",
"wit",
&format!("{}/wit/", pwd.display()),
@ -145,18 +145,18 @@ fn main() {
"--wasm",
]))
.unwrap();
run_command(Command::new("touch").args(&[&format!("{}/world", pwd.display())])).unwrap();
run_command(Command::new("touch").args([&format!("{}/world", pwd.display())])).unwrap();
// Build wasm32-wasi apps.
let modules_dir = format!("{}/modules", pwd.display());
for entry in std::fs::read_dir(&modules_dir).unwrap() {
for entry in std::fs::read_dir(modules_dir).unwrap() {
let entry_path = entry.unwrap().path();
let package_name = entry_path.file_name().unwrap().to_str().unwrap();
// If Cargo.toml is present, build the app
let parent_pkg_path = format!("{}/pkg", entry_path.display());
if entry_path.join("Cargo.toml").exists() {
build_app(&entry_path.display().to_string(), &package_name, None);
build_app(&entry_path.display().to_string(), package_name, None);
} else if entry_path.is_dir() {
fs::create_dir_all(&parent_pkg_path).unwrap();
@ -166,7 +166,7 @@ fn main() {
if sub_entry_path.join("Cargo.toml").exists() {
build_app(
&sub_entry_path.display().to_string(),
&sub_entry_path.file_name().unwrap().to_str().unwrap(),
sub_entry_path.file_name().unwrap().to_str().unwrap(),
Some(&parent_pkg_path),
);
}
@ -199,7 +199,7 @@ fn main() {
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).unwrap();
zip.write_all(&buffer).unwrap();
} else if name.as_os_str().len() != 0 {
} else if !name.as_os_str().is_empty() {
zip.add_directory(name.to_string_lossy().into_owned(), options)
.unwrap();
}

View File

@ -6,8 +6,8 @@ extern crate pleco;
use pleco::Board;
use uqbar_process_lib::uqbar::process::standard as wit;
use uqbar_process_lib::{
get_payload, get_typed_state, println, receive, set_state, Address, Message, Payload, Request,
Response,
get_payload, get_typed_state, grant_messaging, println, receive, set_state, Address, Message,
Payload, ProcessId, Request, Response,
};
wit_bindgen::generate!({
@ -174,11 +174,17 @@ const CHESS_CSS: &str = include_str!("../pkg/index.css");
impl Guest for Component {
fn init(our: String) {
let our = Address::from_str(&our).unwrap();
println!("CHESS: start");
println!("chess: start");
grant_messaging(
&our,
&Vec::from([ProcessId::from_str("http_server:sys:uqbar").unwrap()]),
);
for path in ["/", "/games"] {
Request::new()
.target(Address::new(&our.node, "http_server:sys:uqbar").unwrap()).unwrap()
.target(Address::new(&our.node, "http_server:sys:uqbar").unwrap())
.unwrap()
.ipc_bytes(
serde_json::json!({
"BindPath": {
@ -194,47 +200,47 @@ impl Guest for Component {
.send();
}
let mut state: ChessState = match get_typed_state(|bytes| Ok(bincode::deserialize::<StoredChessState>(bytes)?))
{
Some(state) => {
let mut games = HashMap::new();
for (id, game) in state.games {
if let Ok(board) = Board::from_fen(&game.board) {
games.insert(
id,
Game {
id: game.id.clone(),
turns: game.turns,
board,
white: game.white.clone(),
black: game.black.clone(),
ended: game.ended,
},
);
} else {
games.insert(
id,
Game {
id: game.id.clone(),
turns: 0,
board: Board::start_pos(),
white: game.white.clone(),
black: game.black.clone(),
ended: game.ended,
},
);
let mut state: ChessState =
match get_typed_state(|bytes| Ok(bincode::deserialize::<StoredChessState>(bytes)?)) {
Some(state) => {
let mut games = HashMap::new();
for (id, game) in state.games {
if let Ok(board) = Board::from_fen(&game.board) {
games.insert(
id,
Game {
id: game.id.clone(),
turns: game.turns,
board,
white: game.white.clone(),
black: game.black.clone(),
ended: game.ended,
},
);
} else {
games.insert(
id,
Game {
id: game.id.clone(),
turns: 0,
board: Board::start_pos(),
white: game.white.clone(),
black: game.black.clone(),
ended: game.ended,
},
);
}
}
ChessState {
games,
records: state.records,
}
}
ChessState {
games,
records: state.records,
}
}
None => ChessState {
games: HashMap::new(),
records: HashMap::new(),
},
};
None => ChessState {
games: HashMap::new(),
records: HashMap::new(),
},
};
loop {
let Ok((source, message)) = receive() else {
@ -284,7 +290,7 @@ fn handle_request(
mime: Some("application/octet-stream".to_string()),
bytes: "conflict".as_bytes().to_vec(),
})
.send()
.send();
}
}
let game = Game {
@ -324,7 +330,7 @@ fn handle_request(
mime: Some("application/octet-stream".to_string()),
bytes: "not found".as_bytes().to_vec(),
})
.send()
.send();
};
let valid_move = game
.board
@ -399,7 +405,7 @@ fn handle_request(
mime: Some("application/octet-stream".to_string()),
bytes: "not found".as_bytes().to_vec(),
})
.send()
.send();
};
game.ended = true;
@ -589,13 +595,9 @@ fn handle_request(
}
if let Some(game) = state.games.get_mut(&game_id) {
if game.turns % 2 == 0 && game.white != our.node {
return send_http_response(
403,
default_headers.clone(),
"Forbidden".to_string().as_bytes().to_vec(),
);
} else if game.turns % 2 == 1 && game.black != our.node {
if (game.turns % 2 == 0 && game.white != our.node)
|| (game.turns % 2 == 1 && game.black != our.node)
{
return send_http_response(
403,
default_headers.clone(),

View File

@ -1,5 +1,8 @@
use serde_json::json;
use uqbar_process_lib::{get_payload, receive, Address, Message, Payload, Request, Response};
use uqbar_process_lib::{
get_payload, grant_messaging, println, receive, Address, Message, Payload, ProcessId, Request,
Response,
};
wit_bindgen::generate!({
path: "../../wit",
@ -22,6 +25,11 @@ impl Guest for Component {
let our = Address::from_str(&our).unwrap();
println!("homepage: start");
grant_messaging(
&our,
&Vec::from([ProcessId::from_str("http_server:sys:uqbar").unwrap()]),
);
match main(our) {
Ok(_) => {}
Err(e) => {
@ -66,7 +74,6 @@ fn main(our: Address) -> anyhow::Result<()> {
};
if message_json["path"] == "/" && message_json["method"] == "GET" {
println!("homepage: sending response");
Response::new()
.ipc(
&json!({

View File

@ -1,7 +1,8 @@
use serde_json::json;
use std::collections::HashMap;
use uqbar_process_lib::{
get_payload, receive, println, Address, Message, Payload, Request, Response,
get_payload, grant_messaging, println, receive, Address, Message, Payload, ProcessId, Request,
Response,
};
wit_bindgen::generate!({
@ -16,7 +17,11 @@ struct Component;
impl Guest for Component {
fn init(our: String) {
let our = Address::from_str(&our).unwrap();
//print_to_terminal(1, "http_proxy: start");
grant_messaging(
&our,
&Vec::from([ProcessId::from_str("http_server:sys:uqbar").unwrap()]),
);
match main(our) {
Ok(_) => {}

View File

@ -162,7 +162,7 @@ checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "libsqlite3-sys"
version = "0.26.0"
source = "git+https://github.com/uqbar-dao/rusqlite?rev=fa6ed84#fa6ed843b65f7dd78d53a1b74c7e5095d71c2bd4"
source = "git+https://github.com/uqbar-dao/rusqlite?rev=8fb20a9#8fb20a9dedf4bea626036af8b43c92b050a24d9f"
dependencies = [
"cc",
"pkg-config",
@ -281,7 +281,7 @@ dependencies = [
[[package]]
name = "rusqlite"
version = "0.29.0"
source = "git+https://github.com/uqbar-dao/rusqlite?rev=fa6ed84#fa6ed843b65f7dd78d53a1b74c7e5095d71c2bd4"
source = "git+https://github.com/uqbar-dao/rusqlite?rev=8fb20a9#8fb20a9dedf4bea626036af8b43c92b050a24d9f"
dependencies = [
"bitflags",
"fallible-iterator",

View File

@ -14,7 +14,7 @@ lto = true
anyhow = "1.0"
bincode = "1.3.3"
rmp-serde = "1.1"
rusqlite = { git = "https://github.com/uqbar-dao/rusqlite", rev = "fa6ed84", features = ["bundled", "wasm32-wasi-vfs"] }
rusqlite = { git = "https://github.com/uqbar-dao/rusqlite", rev = "8fb20a9", features = ["bundled", "wasm32-wasi-vfs"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

View File

@ -14,7 +14,7 @@ use generic_array::GenericArray;
use rand::{thread_rng, Rng};
use ring::signature::Ed25519KeyPair;
use rsa::{BigUint, Oaep, RsaPublicKey};
use serde_json;
use std::collections::HashMap;
use std::sync::Arc;
@ -41,7 +41,7 @@ fn decrypt_data(secret_key_bytes: [u8; 32], data: Vec<u8>) -> Vec<u8> {
let nonce_bytes = data[data.len() - 12..].to_vec();
let encrypted_bytes = data[..data.len() - 12].to_vec();
let key = Key::<Aes256Gcm>::from_slice(&secret_key_bytes);
let cipher = Aes256Gcm::new(&key);
let cipher = Aes256Gcm::new(key);
let nonce = GenericArray::from_slice(&nonce_bytes);
let decrypted_bytes = cipher
.decrypt(nonce, encrypted_bytes.as_ref())
@ -89,7 +89,7 @@ pub async fn encryptor(
match serde_json::from_slice::<EncryptorMessage>(&ipc) {
Ok(message) => {
match message {
EncryptorMessage::GetKeyAction(GetKeyAction {
EncryptorMessage::GetKey(GetKeyAction {
channel_id,
public_key_hex,
}) => {
@ -183,7 +183,7 @@ pub async fn encryptor(
}
}
}
EncryptorMessage::DecryptAndForwardAction(DecryptAndForwardAction {
EncryptorMessage::DecryptAndForward(DecryptAndForwardAction {
channel_id,
forward_to,
json,
@ -193,7 +193,7 @@ pub async fn encryptor(
verbosity: 1,
content: format!(
"DECRYPTOR TO FORWARD: {}",
json.clone().unwrap_or_default().to_string()
json.clone().unwrap_or_default()
),
})
.await;
@ -212,12 +212,12 @@ pub async fn encryptor(
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let decrypted_bytes = decrypt_data(secret_key_bytes.clone(), data);
let decrypted_bytes = decrypt_data(*secret_key_bytes, data);
// Forward the unencrypted data to the target
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
@ -241,7 +241,7 @@ pub async fn encryptor(
panic!("No secret found");
}
}
EncryptorMessage::EncryptAndForwardAction(EncryptAndForwardAction {
EncryptorMessage::EncryptAndForward(EncryptAndForwardAction {
channel_id,
forward_to,
json,
@ -249,7 +249,7 @@ pub async fn encryptor(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ENCRYPTOR TO FORWARD"),
content: "ENCRYPTOR TO FORWARD".to_string(),
})
.await;
@ -266,7 +266,7 @@ pub async fn encryptor(
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let encrypted_bytes = encrypt_data(secret_key_bytes.clone(), data);
let encrypted_bytes = encrypt_data(*secret_key_bytes, data);
// Forward the ciphertext and nonce_hex to the specified process
let id: u64 = rand::random();
@ -296,16 +296,16 @@ pub async fn encryptor(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ERROR: No secret found"),
content: "ERROR: No secret found".to_string(),
})
.await;
}
}
EncryptorMessage::DecryptAction(DecryptAction { channel_id }) => {
EncryptorMessage::Decrypt(DecryptAction { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ENCRYPTOR TO DECRYPT"),
content: "ENCRYPTOR TO DECRYPT".to_string(),
})
.await;
@ -322,7 +322,7 @@ pub async fn encryptor(
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let decrypted_bytes = decrypt_data(secret_key_bytes.clone(), data);
let decrypted_bytes = decrypt_data(*secret_key_bytes, data);
let message = KernelMessage {
id: *id,
@ -352,16 +352,16 @@ pub async fn encryptor(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ERROR: No secret found"),
content: "ERROR: No secret found".to_string(),
})
.await;
}
}
EncryptorMessage::EncryptAction(EncryptAction { channel_id }) => {
EncryptorMessage::Encrypt(EncryptAction { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ENCRYPTOR TO ENCRYPT"),
content: "ENCRYPTOR TO ENCRYPT".to_string(),
})
.await;
@ -378,7 +378,7 @@ pub async fn encryptor(
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let encrypted_bytes = encrypt_data(secret_key_bytes.clone(), data);
let encrypted_bytes = encrypt_data(*secret_key_bytes, data);
let message = KernelMessage {
id: *id,
@ -408,7 +408,7 @@ pub async fn encryptor(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("ERROR: No secret found"),
content: "ERROR: No secret found".to_string(),
})
.await;
}

View File

@ -94,7 +94,7 @@ pub async fn eth_rpc(
// let call_data = content.payload.bytes.content.clone().unwrap_or(vec![]);
let Ok(action) = serde_json::from_slice::<EthRpcAction>(&json_bytes) else {
let Ok(action) = serde_json::from_slice::<EthRpcAction>(json_bytes) else {
send_to_loop
.send(make_error_message(
our.clone(),
@ -179,7 +179,8 @@ pub async fn eth_rpc(
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: connection failed, retrying in 5s"),
content: "eth_rpc: connection failed, retrying in 5s"
.to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
@ -203,7 +204,7 @@ pub async fn eth_rpc(
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: connection established"),
content: "eth_rpc: connection established".to_string(),
})
.await;
@ -234,9 +235,9 @@ pub async fn eth_rpc(
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!(
content:
"eth_rpc: subscription connection lost, reconnecting"
),
.to_string(),
})
.await;
}

View File

@ -13,12 +13,11 @@ use rusoto_s3::{
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::sync::RwLock;
use uuid;
/// Contains interface for filesystem manifest log, and write ahead log.
@ -45,7 +44,7 @@ pub struct ChunkEntry {
#[derive(Debug, Serialize, Deserialize, Clone, Eq, Hash, PartialEq)]
pub enum FileIdentifier {
UUID(u128),
Uuid(u128),
Process(ProcessId),
}
@ -66,7 +65,7 @@ pub struct BackupEntry {
#[derive(Debug, Clone, Copy)]
pub enum ChunkLocation {
ColdStorage(bool), // bool local
WAL(u64), // offset in wal,
Wal(u64), // offset in wal,
Memory(u64), // offset in memory buffer
}
@ -74,7 +73,7 @@ const NONCE_SIZE: usize = 24;
const TAG_SIZE: usize = 16;
const ENCRYPTION_OVERHEAD: usize = NONCE_SIZE + TAG_SIZE;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct InMemoryFile {
// chunks: (start) -> (hash, length, chunk_location, encrypted) [commited txs]
pub chunks: BTreeMap<u64, ([u8; 32], u64, ChunkLocation, bool)>,
@ -90,7 +89,7 @@ pub struct InMemoryFile {
impl InMemoryFile {
pub fn hash(&self) -> [u8; 32] {
let mut hasher = Hasher::new();
for (_, (hash, _, _, _)) in &self.chunks {
for (hash, _, _, _) in self.chunks.values() {
hasher.update(hash);
}
hasher.finalize().into()
@ -138,25 +137,14 @@ impl InMemoryFile {
}
}
impl Default for InMemoryFile {
fn default() -> Self {
Self {
chunks: BTreeMap::new(),
active_txs: HashMap::new(),
mem_chunks: Vec::new(),
wal_chunks: Vec::new(),
}
}
}
impl FileIdentifier {
pub fn new_uuid() -> Self {
Self::UUID(uuid::Uuid::new_v4().as_u128())
Self::Uuid(uuid::Uuid::new_v4().as_u128())
}
pub fn to_uuid(&self) -> Option<u128> {
match self {
Self::UUID(uuid) => Some(*uuid),
Self::Uuid(uuid) => Some(*uuid),
_ => None,
}
}
@ -186,7 +174,7 @@ impl Manifest {
pub async fn load(
manifest_file: fs::File,
wal_file: fs::File,
fs_directory_path: &PathBuf,
fs_directory_path: &Path,
file_key: Vec<u8>,
fs_config: FsConfig,
) -> io::Result<Self> {
@ -220,7 +208,7 @@ impl Manifest {
hash_index: Arc::new(RwLock::new(hash_index)),
manifest_file: Arc::new(RwLock::new(manifest_file)),
wal_file: Arc::new(RwLock::new(wal_file)),
fs_directory_path: fs_directory_path.clone(),
fs_directory_path: fs_directory_path.to_path_buf(),
flush_cold_freq: fs_config.flush_to_cold_interval,
memory_buffer: Arc::new(RwLock::new(Vec::new())),
memory_limit: fs_config.mem_buffer_limit,
@ -234,12 +222,12 @@ impl Manifest {
pub async fn get(&self, file: &FileIdentifier) -> Option<InMemoryFile> {
let read_lock = self.manifest.read().await;
read_lock.get(&file).cloned()
read_lock.get(file).cloned()
}
pub async fn get_length(&self, file: &FileIdentifier) -> Option<u64> {
let read_lock = self.manifest.read().await;
read_lock.get(&file).map(|f| f.get_len())
read_lock.get(file).map(|f| f.get_len())
}
pub async fn _get_memory_buffer_size(&self) -> usize {
@ -266,7 +254,7 @@ impl Manifest {
pub async fn _get_chunk_hashes(&self) -> HashSet<[u8; 32]> {
let mut in_use_hashes = HashSet::new();
for file in self.manifest.read().await.values() {
for (_start, (hash, _length, _wal_position, _encrypted)) in &file.chunks {
for (hash, _length, _wal_position, _encrypted) in file.chunks.values() {
in_use_hashes.insert(*hash);
}
}
@ -307,10 +295,10 @@ impl Manifest {
for (start, hash, length, location, encrypted) in tx_chunks {
in_memory_file
.chunks
.insert(start, (hash, length, location.clone(), encrypted));
.insert(start, (hash, length, location, encrypted));
match &location {
&ChunkLocation::Memory(..) => in_memory_file.mem_chunks.push(start),
&ChunkLocation::WAL(..) => in_memory_file.wal_chunks.push(start),
ChunkLocation::Memory(..) => in_memory_file.mem_chunks.push(start),
ChunkLocation::Wal(..) => in_memory_file.wal_chunks.push(start),
_ => {}
}
}
@ -325,7 +313,7 @@ impl Manifest {
let mut wal_file = self.wal_file.write().await;
let wal_length_before_flush = wal_file.seek(SeekFrom::End(0)).await?;
wal_file.write_all(&memory_buffer).await?;
wal_file.write_all(memory_buffer).await?;
for in_memory_file in manifest.values_mut() {
// update the locations of the in-memory chunks
@ -334,7 +322,7 @@ impl Manifest {
in_memory_file.chunks.get_mut(&start)
{
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
in_memory_file.wal_chunks.push(start);
}
}
@ -344,7 +332,7 @@ impl Manifest {
for tx_chunks in in_memory_file.active_txs.values_mut() {
for (_start, _hash, _length, location, _encrypted) in tx_chunks {
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
}
}
}
@ -374,7 +362,7 @@ impl Manifest {
in_memory_file.chunks.get_mut(&start)
{
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
in_memory_file.wal_chunks.push(start);
}
}
@ -384,7 +372,7 @@ impl Manifest {
for tx_chunks in in_memory_file.active_txs.values_mut() {
for (_start, _hash, _length, location, _encrypted) in tx_chunks {
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
}
}
}
@ -406,12 +394,12 @@ impl Manifest {
None
};
let mut chunks = data.chunks(self.chunk_size);
let chunks = data.chunks(self.chunk_size);
let mut chunk_start = 0u64;
let tx_id = rand::random::<u64>(); // uuid instead?
while let Some(chunk) = chunks.next() {
for chunk in chunks {
if memory_buffer.len() + chunk.len() > self.memory_limit {
manifest.insert(file.clone(), in_memory_file);
self.flush_to_wal(&mut manifest, &mut memory_buffer).await?;
@ -452,7 +440,7 @@ impl Manifest {
) -> Result<(), FsError> {
let chunk_hashes = self.chunk_hashes.read().await;
let chunk_hash: [u8; 32] = blake3::hash(&chunk).into();
let chunk_hash: [u8; 32] = blake3::hash(chunk).into();
let chunk_length = chunk.len() as u64;
let (copy, is_local) = if let Some(is_local) = chunk_hashes.get(&chunk_hash) {
(true, *is_local)
@ -464,7 +452,7 @@ impl Manifest {
let mut chunk_data = chunk.to_vec();
if let Some(cipher) = cipher {
chunk_data = encrypt(&cipher, &chunk_data)?;
chunk_data = encrypt(cipher, &chunk_data)?;
encrypted = true;
}
@ -498,11 +486,13 @@ impl Manifest {
};
// update the in_memory_file directly
in_memory_file
.active_txs
.entry(tx_id)
.or_insert_with(Vec::new)
.push((start, chunk_hash, chunk_length, proper_position, encrypted));
in_memory_file.active_txs.entry(tx_id).or_default().push((
start,
chunk_hash,
chunk_length,
proper_position,
encrypted,
));
Ok(())
}
@ -526,7 +516,7 @@ impl Manifest {
} else {
file.chunks
.iter()
.map(|(&start, value)| (start, value.clone()))
.map(|(&start, value)| (start, *value))
.collect()
};
@ -552,11 +542,11 @@ impl Manifest {
let mut chunk_data =
memory_buffer[offset as usize..(offset + len) as usize].to_vec();
if encrypted {
chunk_data = decrypt(&cipher, &chunk_data)?;
chunk_data = decrypt(cipher, &chunk_data)?;
}
chunk_data
}
ChunkLocation::WAL(offset) => {
ChunkLocation::Wal(offset) => {
let mut wal_file = self.wal_file.write().await;
wal_file
.seek(SeekFrom::Start(offset))
@ -578,7 +568,7 @@ impl Manifest {
error: format!("Local WAL read failed: {}", e),
})?;
if encrypted {
buffer = decrypt(&cipher, &buffer)?;
buffer = decrypt(cipher, &buffer)?;
}
buffer
}
@ -589,7 +579,7 @@ impl Manifest {
error: format!("Local Cold read failed: {}", e),
})?;
if encrypted {
buffer = decrypt(&*self.cipher, &buffer)?;
buffer = decrypt(&self.cipher, &buffer)?;
}
buffer
} else {
@ -606,7 +596,7 @@ impl Manifest {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).await?;
if encrypted {
buffer = decrypt(&*self.cipher, &buffer)?;
buffer = decrypt(&self.cipher, &buffer)?;
}
buffer
}
@ -653,7 +643,7 @@ impl Manifest {
} else {
file.chunks
.iter()
.map(|(&start, value)| (start, value.clone()))
.map(|(&start, value)| (start, *value))
.collect()
};
@ -680,11 +670,11 @@ impl Manifest {
let mut chunk_data =
memory_buffer[offset as usize..(offset + len) as usize].to_vec();
if encrypted {
chunk_data = decrypt(&cipher, &chunk_data)?;
chunk_data = decrypt(cipher, &chunk_data)?;
}
chunk_data
}
ChunkLocation::WAL(offset) => {
ChunkLocation::Wal(offset) => {
let mut wal_file = self.wal_file.write().await;
wal_file
.seek(SeekFrom::Start(offset))
@ -706,7 +696,7 @@ impl Manifest {
error: format!("Local WAL read failed: {}", e),
})?;
if encrypted {
buffer = decrypt(&cipher, &buffer)?;
buffer = decrypt(cipher, &buffer)?;
}
buffer
}
@ -717,7 +707,7 @@ impl Manifest {
error: format!("Local Cold read failed: {}", e),
})?;
if encrypted {
buffer = decrypt(&*self.cipher, &buffer)?;
buffer = decrypt(&self.cipher, &buffer)?;
}
buffer
} else {
@ -734,7 +724,7 @@ impl Manifest {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).await?;
if encrypted {
buffer = decrypt(&*self.cipher, &buffer)?;
buffer = decrypt(&self.cipher, &buffer)?;
}
buffer
}
@ -805,8 +795,8 @@ impl Manifest {
0,
); // extend the chunk data if necessary
let data_to_write = &data[data_offset..data_offset + write_length as usize];
chunk_data[chunk_data_start..chunk_data_start + write_length as usize]
let data_to_write = &data[data_offset..data_offset + write_length];
chunk_data[chunk_data_start..chunk_data_start + write_length]
.copy_from_slice(data_to_write);
if memory_buffer.len() + chunk_data.len() > self.memory_limit {
@ -966,32 +956,30 @@ impl Manifest {
let mut chunks_to_flush: Vec<([u8; 32], u64, u64, ChunkLocation, bool)> = Vec::new();
for &start in &in_memory_file.mem_chunks {
if let Some((hash, length, location, encrypted)) = in_memory_file.chunks.get(&start)
if let Some((hash, length, ChunkLocation::Memory(mem_pos), encrypted)) =
in_memory_file.chunks.get(&start)
{
if let ChunkLocation::Memory(mem_pos) = location {
chunks_to_flush.push((
*hash,
start,
*length,
ChunkLocation::Memory(*mem_pos),
*encrypted,
));
}
chunks_to_flush.push((
*hash,
start,
*length,
ChunkLocation::Memory(*mem_pos),
*encrypted,
));
}
}
for &start in &in_memory_file.wal_chunks {
if let Some((hash, length, location, encrypted)) = in_memory_file.chunks.get(&start)
if let Some((hash, length, ChunkLocation::Wal(wal_pos), encrypted)) =
in_memory_file.chunks.get(&start)
{
if let ChunkLocation::WAL(wal_pos) = location {
chunks_to_flush.push((
*hash,
start,
*length,
ChunkLocation::WAL(*wal_pos),
*encrypted,
));
}
chunks_to_flush.push((
*hash,
start,
*length,
ChunkLocation::Wal(*wal_pos),
*encrypted,
));
}
}
if !chunks_to_flush.is_empty() {
@ -1009,7 +997,7 @@ impl Manifest {
};
let buffer = match location {
ChunkLocation::WAL(wal_pos) => {
ChunkLocation::Wal(wal_pos) => {
// seek to the chunk in the WAL file
wal_file.seek(SeekFrom::Start(*wal_pos)).await?;
// read the chunk data from the WAL file
@ -1028,9 +1016,8 @@ impl Manifest {
});
}
// copy the chunk data from the memory buffer
let temp_buffer =
memory_buffer[mem_pos..mem_pos + total_len as usize].to_vec();
temp_buffer
memory_buffer[mem_pos..mem_pos + total_len as usize].to_vec()
}
_ => vec![],
};
@ -1290,13 +1277,12 @@ async fn load_wal(
match record {
Ok(WALRecord::CommitTx(tx_id)) => {
if let Some((file_id, chunks)) = tx_chunks.remove(&tx_id) {
let in_memory_file =
manifest.entry(file_id).or_insert(InMemoryFile::default());
let in_memory_file = manifest.entry(file_id).or_default();
for (start, hash, length, location, encrypted) in chunks {
in_memory_file
.chunks
.insert(start, (hash, length, location.clone(), encrypted));
if let ChunkLocation::WAL(_) = location {
.insert(start, (hash, length, location, encrypted));
if let ChunkLocation::Wal(_) = location {
in_memory_file.wal_chunks.push(start);
}
}
@ -1310,7 +1296,7 @@ async fn load_wal(
let location = if entry.copy {
ChunkLocation::ColdStorage(entry.local)
} else {
ChunkLocation::WAL(data_position)
ChunkLocation::Wal(data_position)
};
let chunks = tx_chunks
.entry(entry.tx_id)
@ -1327,15 +1313,14 @@ async fn load_wal(
// if encrypted data, add encryption overhead (nonce 24 + tag 16)
current_position += 8 + record_length as u64;
if !entry.copy {
current_position += data_length as u64;
current_position += data_length;
if entry.encrypted {
current_position += ENCRYPTION_OVERHEAD as u64;
}
}
}
Ok(WALRecord::SetLength(file_id, new_length)) => {
let in_memory_file =
manifest.entry(file_id).or_insert(InMemoryFile::default());
let in_memory_file = manifest.entry(file_id).or_default();
in_memory_file.chunks.retain(|&start, _| start < new_length);
// update mem_chunks and wal_chunks
@ -1376,11 +1361,8 @@ async fn verify_manifest(
let file_hash = in_memory_file.hash();
for (chunk_hash, _, location, _encrypted) in in_memory_file.chunks.values() {
match location {
ChunkLocation::ColdStorage(local) => {
chunk_hashes.insert(*chunk_hash, *local);
}
_ => {}
if let ChunkLocation::ColdStorage(local) = location {
chunk_hashes.insert(*chunk_hash, *local);
}
}
hash_index.insert(file_hash, file.clone());
@ -1398,7 +1380,7 @@ fn generate_nonce() -> [u8; 24] {
fn encrypt(cipher: &XChaCha20Poly1305, bytes: &[u8]) -> Result<Vec<u8>, FsError> {
let nonce = generate_nonce();
let ciphertext = cipher.encrypt(&XNonce::from_slice(&nonce), bytes)?;
let ciphertext = cipher.encrypt(XNonce::from_slice(&nonce), bytes)?;
Ok([nonce.to_vec(), ciphertext].concat())
}

View File

@ -237,7 +237,7 @@ async fn bootstrap(
.expect("fs: name error reading package.zip")
.to_owned();
let mut file_path = file_path.to_string_lossy().to_string();
if !file_path.starts_with("/") {
if !file_path.starts_with('/') {
file_path = format!("/{}", file_path);
}
println!("fs: found file {}...\r", file_path);
@ -296,9 +296,9 @@ async fn bootstrap(
// for each process-entry in manifest.json:
for mut entry in package_manifest {
let wasm_bytes = &mut Vec::new();
let mut file_path = format!("{}", entry.process_wasm_path);
if file_path.starts_with("/") {
file_path = format!("{}", &file_path[1..]);
let mut file_path = entry.process_wasm_path.to_string();
if file_path.starts_with('/') {
file_path = file_path[1..].to_string();
}
package
.by_name(&file_path)
@ -362,7 +362,7 @@ async fn bootstrap(
// save in process map
let file = FileIdentifier::new_uuid();
manifest.write(&file, &wasm_bytes).await.unwrap();
manifest.write(&file, wasm_bytes).await.unwrap();
let wasm_bytes_handle = file.to_uuid().unwrap();
process_map.insert(
@ -384,7 +384,7 @@ async fn bootstrap(
if manifest.get_by_hash(&process_map_hash).await.is_none() {
let _ = manifest
.write(&kernel_process_id, &serialized_process_map)
.write(kernel_process_id, &serialized_process_map)
.await;
}
Ok(vfs_messages)
@ -416,7 +416,7 @@ async fn get_zipped_packages() -> Vec<(String, zip::ZipArchive<std::io::Cursor<V
}
}
return packages;
packages
}
pub async fn fs_sender(
@ -565,7 +565,7 @@ async fn handle_request(
});
};
let file_uuid = match maybe_file_id {
Some(id) => FileIdentifier::UUID(id),
Some(id) => FileIdentifier::Uuid(id),
None => FileIdentifier::new_uuid(),
};
@ -588,7 +588,7 @@ async fn handle_request(
});
};
let file_uuid = FileIdentifier::UUID(file_uuid);
let file_uuid = FileIdentifier::Uuid(file_uuid);
match manifest.write_at(&file_uuid, offset, &payload.bytes).await {
Ok(_) => (),
@ -603,7 +603,7 @@ async fn handle_request(
(FsResponse::Write(file_uuid.to_uuid().unwrap()), None)
}
FsAction::Read(file_uuid) => {
let file = FileIdentifier::UUID(file_uuid);
let file = FileIdentifier::Uuid(file_uuid);
match manifest.read(&file, None, None).await {
Err(e) => {
@ -616,7 +616,7 @@ async fn handle_request(
}
}
FsAction::ReadChunk(req) => {
let file = FileIdentifier::UUID(req.file);
let file = FileIdentifier::Uuid(req.file);
match manifest
.read(&file, Some(req.start), Some(req.length))
@ -632,7 +632,7 @@ async fn handle_request(
}
}
FsAction::Delete(del) => {
let file = FileIdentifier::UUID(del);
let file = FileIdentifier::Uuid(del);
manifest.delete(&file).await?;
(FsResponse::Delete(del), None)
@ -645,7 +645,7 @@ async fn handle_request(
};
let file_uuid = match maybe_file_uuid {
Some(uuid) => FileIdentifier::UUID(uuid),
Some(uuid) => FileIdentifier::Uuid(uuid),
None => FileIdentifier::new_uuid(),
};
@ -662,7 +662,7 @@ async fn handle_request(
(FsResponse::Append(file_uuid.to_uuid().unwrap()), None)
}
FsAction::Length(file_uuid) => {
let file = FileIdentifier::UUID(file_uuid);
let file = FileIdentifier::Uuid(file_uuid);
let length = manifest.get_length(&file).await;
match length {
Some(len) => (FsResponse::Length(len), None),
@ -674,7 +674,7 @@ async fn handle_request(
}
}
FsAction::SetLength((file_uuid, length)) => {
let file = FileIdentifier::UUID(file_uuid);
let file = FileIdentifier::Uuid(file_uuid);
manifest.set_length(&file, length).await?;
// doublecheck if this is the type of return statement we want.
@ -720,7 +720,7 @@ async fn handle_request(
if expects_response.is_some() {
let response = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our_name.clone(),
process: FILESYSTEM_PROCESS_ID.clone(),
@ -738,10 +738,7 @@ async fn handle_request(
},
None,
)),
payload: match bytes {
Some(bytes) => Some(Payload { mime: None, bytes }),
None => None,
},
payload: bytes.map(|bytes| Payload { mime: None, bytes }),
signed_capabilities: None,
};

View File

@ -112,7 +112,7 @@ async fn handle_message(
let request = request_builder
.headers(deserialize_headers(req.headers))
.body(body.unwrap_or(vec![]))
.body(body.unwrap_or_default())
.build()
.unwrap();
@ -180,7 +180,7 @@ fn to_pascal_case(s: &str) -> String {
fn serialize_headers(headers: &HeaderMap) -> HashMap<String, String> {
let mut hashmap = HashMap::new();
for (key, value) in headers.iter() {
let key_str = to_pascal_case(&key.to_string());
let key_str = to_pascal_case(key.as_ref());
let value_str = value.to_str().unwrap_or("").to_string();
hashmap.insert(key_str, value_str);
}

View File

@ -3,10 +3,8 @@ use crate::register;
use crate::types::*;
use anyhow::Result;
use base64;
use futures::SinkExt;
use futures::StreamExt;
use serde_urlencoded;
use route_recognizer::Router;
use std::collections::HashMap;
@ -85,7 +83,7 @@ pub async fn http_server(
if let Err(e) = http_handle_messages(
our_name.clone(),
id.clone(),
id,
source.clone(),
message,
payload,
@ -100,12 +98,7 @@ pub async fn http_server(
.await
{
send_to_loop
.send(make_error_message(
our_name.clone(),
id.clone(),
source.clone(),
e,
))
.send(make_error_message(our_name.clone(), id, source.clone(), e))
.await
.unwrap();
}
@ -134,7 +127,7 @@ async fn handle_websocket(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("GOT WEBSOCKET BYTES"),
content: "GOT WEBSOCKET BYTES".to_string(),
})
.await;
let bytes = msg.as_bytes();
@ -147,7 +140,7 @@ async fn handle_websocket(
),
})
.await;
match serde_json::from_slice::<WebSocketClientMessage>(&bytes) {
match serde_json::from_slice::<WebSocketClientMessage>(bytes) {
Ok(parsed_msg) => {
handle_incoming_ws(
parsed_msg,
@ -157,7 +150,7 @@ async fn handle_websocket(
send_to_loop.clone(),
print_tx.clone(),
write_stream.clone(),
ws_id.clone(),
ws_id,
)
.await;
}
@ -171,39 +164,33 @@ async fn handle_websocket(
}
}
} else if msg.is_text() {
match msg.to_str() {
Ok(msg_str) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str),
})
.await;
match serde_json::from_str(&msg_str) {
Ok(parsed_msg) => {
handle_incoming_ws(
parsed_msg,
our.clone(),
jwt_secret_bytes.clone().to_vec(),
websockets.clone(),
send_to_loop.clone(),
print_tx.clone(),
write_stream.clone(),
ws_id.clone(),
)
.await;
}
_ => (),
}
if let Ok(msg_str) = msg.to_str() {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str),
})
.await;
if let Ok(parsed_msg) = serde_json::from_str(msg_str) {
handle_incoming_ws(
parsed_msg,
our.clone(),
jwt_secret_bytes.clone().to_vec(),
websockets.clone(),
send_to_loop.clone(),
print_tx.clone(),
write_stream.clone(),
ws_id,
)
.await;
}
_ => (),
}
} else if msg.is_close() {
// Delete the websocket from the map
let mut ws_map = websockets.lock().await;
for (node, node_map) in ws_map.iter_mut() {
for (channel_id, id_map) in node_map.iter_mut() {
if let Some(_) = id_map.remove(&ws_id) {
if id_map.remove(&ws_id).is_some() {
// Send disconnect message
send_ws_disconnect(
node.clone(),
@ -242,10 +229,10 @@ async fn http_handle_messages(
None => {}
Some((path, channel)) => {
// if path is /rpc/message, return accordingly with base64 encoded payload
if path == "/rpc:sys:uqbar/message".to_string() {
if path == *"/rpc:sys:uqbar/message" {
let payload = payload.map(|p| {
let bytes = p.bytes;
let base64_bytes = base64::encode(&bytes);
let base64_bytes = base64::encode(bytes);
Payload {
mime: p.mime,
bytes: base64_bytes.into_bytes(),
@ -292,7 +279,7 @@ async fn http_handle_messages(
let mut ws_auth_username = our.clone();
if segments.len() == 4
&& matches!(segments.get(0), Some(&"http-proxy"))
&& matches!(segments.first(), Some(&"http-proxy"))
&& matches!(segments.get(1), Some(&"serve"))
{
if let Some(segment) = segments.get(2) {
@ -341,7 +328,7 @@ async fn http_handle_messages(
status: 503,
headers: error_headers,
body: Some(
format!("Internal Server Error").as_bytes().to_vec(),
"Internal Server Error".to_string().as_bytes().to_vec(),
),
});
}
@ -351,65 +338,64 @@ async fn http_handle_messages(
}
}
Message::Request(Request { ipc, .. }) => {
match serde_json::from_slice(&ipc) {
Ok(message) => {
match message {
HttpServerMessage::BindPath {
path,
if let Ok(message) = serde_json::from_slice(&ipc) {
match message {
HttpServerMessage::BindPath {
path,
authenticated,
local_only,
} => {
let mut path_bindings = path_bindings.write().await;
let app = source.process.clone().to_string();
let mut path = path.clone();
if app != "homepage:homepage:uqbar" {
path = if path.starts_with('/') {
format!("/{}{}", app, path)
} else {
format!("/{}/{}", app, path)
};
}
// trim trailing "/"
path = normalize_path(&path);
let bound_path = BoundPath {
app: source.process,
authenticated,
local_only,
} => {
let mut path_bindings = path_bindings.write().await;
let app = source.process.clone().to_string();
original_path: path.clone(),
};
let mut path = path.clone();
if app != "homepage:homepage:uqbar" {
path = if path.starts_with("/") {
format!("/{}{}", app, path)
} else {
format!("/{}/{}", app, path)
};
}
// trim trailing "/"
path = normalize_path(&path);
path_bindings.add(&path, bound_path);
}
HttpServerMessage::WebSocketPush(WebSocketPush { target, is_text }) => {
let Some(payload) = payload else {
return Err(HttpServerError::NoBytes);
};
let bytes = payload.bytes;
let bound_path = BoundPath {
app: source.process,
authenticated: authenticated,
local_only: local_only,
original_path: path.clone(),
};
let mut ws_map = websockets.lock().await;
let send_text = is_text.unwrap_or(false);
let response_data = if send_text {
warp::ws::Message::text(
String::from_utf8(bytes.clone()).unwrap_or_default(),
)
} else {
warp::ws::Message::binary(bytes.clone())
};
path_bindings.add(&path, bound_path);
}
HttpServerMessage::WebSocketPush(WebSocketPush { target, is_text }) => {
let Some(payload) = payload else {
return Err(HttpServerError::NoBytes);
};
let bytes = payload.bytes;
// Send to the proxy, if registered
if let Some(channel_id) = target.id.clone() {
let locked_proxies = ws_proxies.lock().await;
let mut ws_map = websockets.lock().await;
let send_text = is_text.unwrap_or(false);
let response_data = if send_text {
warp::ws::Message::text(
String::from_utf8(bytes.clone()).unwrap_or_default(),
)
} else {
warp::ws::Message::binary(bytes.clone())
};
if let Some(proxy_nodes) = locked_proxies.get(&channel_id) {
for proxy_node in proxy_nodes {
let id: u64 = rand::random();
let bytes_content = bytes.clone();
// Send to the proxy, if registered
if let Some(channel_id) = target.id.clone() {
let locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get(&channel_id) {
for proxy_node in proxy_nodes {
let id: u64 = rand::random();
let bytes_content = bytes.clone();
// Send a message to the encryptor
let message = KernelMessage {
id: id.clone(),
// Send a message to the encryptor
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
@ -440,32 +426,21 @@ async fn http_handle_messages(
signed_capabilities: None,
};
send_to_loop.send(message).await.unwrap();
}
send_to_loop.send(message).await.unwrap();
}
}
}
// Send to the websocket if registered
if let Some(node_map) = ws_map.get_mut(&target.node) {
if let Some(socket_id) = &target.id {
if let Some(ws_map) = node_map.get_mut(socket_id) {
// Iterate over ws_map values and send message to all websockets
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ = locked_write_stream
.send(response_data.clone())
.await; // TODO: change this to binary
}
} else {
// Send to all websockets
for ws_map in node_map.values_mut() {
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ = locked_write_stream
.send(response_data.clone())
.await;
}
}
// Send to the websocket if registered
if let Some(node_map) = ws_map.get_mut(&target.node) {
if let Some(socket_id) = &target.id {
if let Some(ws_map) = node_map.get_mut(socket_id) {
// Iterate over ws_map values and send message to all websockets
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ =
locked_write_stream.send(response_data.clone()).await;
// TODO: change this to binary
}
} else {
// Send to all websockets
@ -479,130 +454,136 @@ async fn http_handle_messages(
}
}
} else {
// Do nothing because we don't have a WS for that node
// Send to all websockets
for ws_map in node_map.values_mut() {
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ =
locked_write_stream.send(response_data.clone()).await;
}
}
}
} else {
// Do nothing because we don't have a WS for that node
}
HttpServerMessage::ServerAction(ServerAction { action }) => {
if action == "get-jwt-secret" && source.node == our {
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: source,
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!({
"action": "set-jwt-secret"
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: jwt_secret_bytes.clone(),
}),
signed_capabilities: None,
};
}
HttpServerMessage::ServerAction(ServerAction { action }) => {
if action == "get-jwt-secret" && source.node == our {
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: source,
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!({
"action": "set-jwt-secret"
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: jwt_secret_bytes.clone(),
}),
signed_capabilities: None,
};
send_to_loop.send(message).await.unwrap();
}
send_to_loop.send(message).await.unwrap();
}
HttpServerMessage::WsRegister(WsRegister {
auth_token,
ws_auth_token: _,
channel_id,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone())
.await;
}
}
HttpServerMessage::WsRegister(WsRegister {
auth_token,
ws_auth_token: _,
channel_id,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;
}
HttpServerMessage::WsProxyDisconnect(WsProxyDisconnect { channel_id }) => {
}
HttpServerMessage::WsProxyDisconnect(WsProxyDisconnect { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "WsDisconnect".to_string(),
})
.await;
// Check the ws_proxies for this channel_id, if it exists, delete the node that forwarded
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WsDisconnect"),
content: "disconnected".to_string(),
})
.await;
// Check the ws_proxies for this channel_id, if it exists, delete the node that forwarded
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("disconnected"),
})
.await;
proxy_nodes.remove(&source.node);
}
proxy_nodes.remove(&source.node);
}
HttpServerMessage::WsMessage(WsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
json,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone())
.await;
}
HttpServerMessage::WsMessage(WsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
json,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
HttpServerMessage::EncryptedWsMessage(EncryptedWsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
encrypted,
nonce,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(
ws_proxies.clone(),
channel_id.clone(),
source.node.clone(),
)
.await;
}
HttpServerMessage::EncryptedWsMessage(EncryptedWsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
encrypted,
nonce,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(
ws_proxies.clone(),
channel_id.clone(),
source.node.clone(),
)
.await;
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
}
Err(_) => (),
}
}
}
@ -661,7 +642,7 @@ async fn http_serve(
.and(warp::filters::header::headers_cloned())
.and(
warp::filters::query::raw()
.or(warp::any().map(|| String::default()))
.or(warp::any().map(String::default))
.unify()
.map(|query_string: String| {
if query_string.is_empty() {
@ -747,7 +728,7 @@ async fn handler(
// we extract message from base64 encoded bytes in data
// and send it to the correct app.
let message = if app == "rpc:sys:uqbar".to_string() {
let message = if app == *"rpc:sys:uqbar" {
let rpc_message: RpcMessage = match serde_json::from_slice(&body) {
// to_vec()?
Ok(v) => v,
@ -767,7 +748,7 @@ async fn handler(
}
};
let payload = match base64::decode(&rpc_message.data.unwrap_or("".to_string())) {
let payload = match base64::decode(rpc_message.data.unwrap_or("".to_string())) {
Ok(bytes) => Some(Payload {
mime: rpc_message.mime,
bytes,
@ -807,7 +788,7 @@ async fn handler(
payload,
signed_capabilities: None,
}
} else if app == "encryptor:sys:uqbar".to_string() {
} else if app == *"encryptor:sys:uqbar" {
let body_json = match String::from_utf8(body.to_vec()) {
Ok(s) => s,
Err(_) => {

View File

@ -36,7 +36,7 @@ pub struct RpcMessage {
}
pub fn parse_auth_token(auth_token: String, jwt_secret: Vec<u8>) -> Result<String, Error> {
let secret: Hmac<Sha256> = match Hmac::new_from_slice(&jwt_secret.as_slice()) {
let secret: Hmac<Sha256> = match Hmac::new_from_slice(jwt_secret.as_slice()) {
Ok(secret) => secret,
Err(_) => {
return Ok("Error recovering jwt secret".to_string());
@ -56,7 +56,7 @@ pub fn auth_cookie_valid(our_node: String, cookie: &str, jwt_secret: Vec<u8>) ->
let mut auth_token = None;
for cookie_part in cookie_parts {
let cookie_part_parts: Vec<&str> = cookie_part.split("=").collect();
let cookie_part_parts: Vec<&str> = cookie_part.split('=').collect();
if cookie_part_parts.len() == 2
&& cookie_part_parts[0] == format!("uqbar-auth_{}", our_node)
{
@ -144,14 +144,14 @@ pub async fn handle_incoming_ws(
send_to_loop.clone(),
print_tx.clone(),
write_stream.clone(),
ws_id.clone(),
ws_id,
)
.await;
} else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("Auth token parsing failed for WsRegister"),
content: "Auth token parsing failed for WsRegister".to_string(),
})
.await;
}
@ -262,10 +262,7 @@ pub fn deserialize_headers(hashmap: HashMap<String, String>) -> HeaderMap {
}
pub async fn is_port_available(bind_addr: &str) -> bool {
match TcpListener::bind(bind_addr).await {
Ok(_) => true,
Err(_) => false,
}
TcpListener::bind(bind_addr).await.is_ok()
}
pub fn binary_encoded_string_to_bytes(s: &str) -> Vec<u8> {
@ -288,13 +285,13 @@ pub async fn handle_ws_register(
let mut ws_map = websockets.lock().await;
let node_map = ws_map.entry(node.clone()).or_insert(HashMap::new());
let id_map = node_map.entry(channel_id.clone()).or_insert(HashMap::new());
id_map.insert(ws_id.clone(), write_stream.clone());
id_map.insert(ws_id, write_stream.clone());
// Send a message to the target node to add to let it know we are proxying
if node != our {
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
@ -321,7 +318,7 @@ pub async fn handle_ws_register(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WEBSOCKET CHANNEL FORWARDED!"),
content: "WEBSOCKET CHANNEL FORWARDED!".to_string(),
})
.await;
}
@ -329,7 +326,7 @@ pub async fn handle_ws_register(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WEBSOCKET CHANNEL REGISTERED!"),
content: "WEBSOCKET CHANNEL REGISTERED!".to_string(),
})
.await;
}
@ -343,7 +340,7 @@ pub async fn handle_ws_message(
) {
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
@ -385,7 +382,7 @@ pub async fn handle_encrypted_ws_message(
// Send a message to the encryptor
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
@ -433,7 +430,7 @@ pub async fn proxy_ws_message(
) {
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
@ -481,7 +478,7 @@ pub async fn send_ws_disconnect(
) {
let id: u64 = rand::random();
let message = KernelMessage {
id: id.clone(),
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),

View File

@ -57,7 +57,7 @@ struct ProcessWasi {
#[derive(Serialize, Deserialize)]
struct StartProcessMetadata {
source: t::Address,
process_id: Option<t::ProcessId>,
process_id: t::ProcessId,
persisted: t::PersistedProcess,
reboot: bool,
}
@ -262,7 +262,7 @@ impl StandardHost for ProcessWasi {
};
let our_drive_name = [
self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher_node(),
self.process.metadata.our.process.publisher(),
]
.join(":");
let Ok(Ok((_, hash_response))) = send_and_await_response(
@ -333,7 +333,7 @@ impl StandardHost for ProcessWasi {
let new_process_id = t::ProcessId::new(
Some(&name),
self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher_node(),
self.process.metadata.our.process.publisher(),
);
let Ok(Ok((_, response))) = send_and_await_response(
self,
@ -374,7 +374,7 @@ impl StandardHost for ProcessWasi {
signature: self
.process
.keypair
.sign(&bincode::serialize(&cap).unwrap())
.sign(&rmp_serde::to_vec(&cap).unwrap())
.as_ref()
.to_vec(),
})
@ -473,7 +473,7 @@ impl StandardHost for ProcessWasi {
signature: self
.process
.keypair
.sign(&bincode::serialize(&cap).unwrap())
.sign(&rmp_serde::to_vec(&cap).unwrap())
.as_ref()
.to_vec(),
})
@ -503,7 +503,7 @@ impl StandardHost for ProcessWasi {
let sig = self
.process
.keypair
.sign(&bincode::serialize(&cap).unwrap());
.sign(&rmp_serde::to_vec(&cap).unwrap_or_default());
return Ok(Some(wit::SignedCapability {
issuer: cap.issuer.en_wit().to_owned(),
params: cap.params.clone(),
@ -539,7 +539,10 @@ impl StandardHost for ProcessWasi {
issuer: t::Address::de_wit(signed_cap.issuer),
params: signed_cap.params,
};
pk.verify(&bincode::serialize(&cap).unwrap(), &signed_cap.signature)?;
pk.verify(
&rmp_serde::to_vec(&cap).unwrap_or_default(),
&signed_cap.signature,
)?;
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
@ -575,7 +578,7 @@ impl StandardHost for ProcessWasi {
.caps_oracle
.send(t::CapMessage::Has {
on: prompt.source.process.clone(),
cap: cap.clone(),
cap,
responder: tx,
})
.await;
@ -626,7 +629,10 @@ impl StandardHost for ProcessWasi {
issuer: t::Address::de_wit(signed_cap.issuer),
params: signed_cap.params,
};
pk.verify(&bincode::serialize(&cap).unwrap(), &signed_cap.signature)?;
pk.verify(
&rmp_serde::to_vec(&cap).unwrap_or_default(),
&signed_cap.signature,
)?;
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.process
@ -881,7 +887,7 @@ impl ProcessState {
content: "kernel: prompting_message has no rsvp".into(),
})
.await;
return None;
None
}
Some(address) => Some((prompting_message.id, address.clone())),
}
@ -998,13 +1004,13 @@ impl ProcessState {
let (id, target) = match self.make_response_id_target().await {
Some(r) => r,
None => {
self.send_to_terminal
let _ = self
.send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!("kernel: dropping Response {:?}", response),
})
.await
.unwrap();
.await;
return;
}
};
@ -1029,14 +1035,14 @@ impl ProcessState {
signed_capabilities: None,
})
.await
.unwrap();
.expect("fatal: kernel couldn't send response");
}
}
/// persist process_map state for next bootup
/// and wait for filesystem to respond in the affirmative
async fn persist_state(
our_name: &String,
our_name: &str,
send_to_loop: &t::MessageSender,
process_map: &t::ProcessMap,
) -> Result<()> {
@ -1045,11 +1051,11 @@ async fn persist_state(
.send(t::KernelMessage {
id: rand::random(),
source: t::Address {
node: our_name.clone(),
node: our_name.to_string(),
process: KERNEL_PROCESS_ID.clone(),
},
target: t::Address {
node: our_name.clone(),
node: our_name.to_string(),
process: FILESYSTEM_PROCESS_ID.clone(),
},
rsvp: None,
@ -1083,58 +1089,43 @@ async fn make_process_loop(
if !booted.load(Ordering::Relaxed) {
let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new();
while let Some(message) = recv_in_process.recv().await {
if let Err(_) = &message {
pre_boot_queue.push(message);
continue;
match message {
Err(_) => {
pre_boot_queue.push(message);
continue;
}
Ok(message) => {
if (message.source
== t::Address {
node: metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
})
&& (message.message
== t::Message::Request(t::Request {
inherit: false,
expects_response: None,
ipc: "booted".as_bytes().to_vec(),
metadata: None,
}))
{
break;
}
pre_boot_queue.push(Ok(message));
}
}
let message = message.unwrap();
if (message.source
== t::Address {
node: metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
})
&& (message.message
== t::Message::Request(t::Request {
inherit: false,
expects_response: None,
ipc: "booted".as_bytes().to_vec(),
metadata: None,
}))
{
break;
}
pre_boot_queue.push(Ok(message));
}
}
let component =
Component::new(&engine, wasm_bytes).expect("make_process_loop: couldn't read file");
Component::new(engine, wasm_bytes).expect("make_process_loop: couldn't read file");
let mut linker = Linker::new(&engine);
let mut linker = Linker::new(engine);
Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
let table = Table::new();
let wasi = WasiCtxBuilder::new().build(); // (&mut table).unwrap();
let wasi = WasiCtxBuilder::new().build();
wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap();
// wasmtime_wasi::preview2::bindings::clocks::wall_clock::add_to_linker(&mut linker, |t| t)
// .unwrap();
// wasmtime_wasi::preview2::bindings::clocks::monotonic_clock::add_to_linker(&mut linker, |t| t)
// .unwrap();
// wasmtime_wasi::preview2::bindings::clocks::timezone::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::filesystem::filesystem::add_to_linker(&mut linker, |t| t)
// .unwrap();
// wasmtime_wasi::preview2::bindings::poll::poll::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::io::streams::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::random::random::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::exit::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::environment::add_to_linker(&mut linker, |t| t)
// .unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::preopens::add_to_linker(&mut linker, |t| t)
// .unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::stdin::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::stdout::add_to_linker(&mut linker, |t| t).unwrap();
// wasmtime_wasi::preview2::bindings::cli_base::stderr::add_to_linker(&mut linker, |t| t).unwrap();
let mut store = Store::new(
engine,
@ -1250,7 +1241,7 @@ async fn make_process_loop(
signed_capabilities: None,
})
.await
.unwrap();
.expect("event loop: fatal: sender died");
// fulfill the designated OnPanic behavior
match metadata.on_panic {
@ -1282,26 +1273,30 @@ async fn make_process_loop(
signed_capabilities: None,
})
.await
.unwrap();
.expect("event loop: fatal: sender died");
}
// if requests, fire them
// check that dying process had capability to send each message
// even in death, a process can only message processes it has capabilities for
t::OnPanic::Requests(requests) => {
for (address, mut request, payload) in requests {
request.expects_response = None;
// TODO caps check
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: metadata.our.clone(),
target: address,
rsvp: None,
message: t::Message::Request(request),
payload,
signed_capabilities: None,
})
.await
.unwrap();
if initial_capabilities.contains(&t::Capability {
issuer: address.clone(),
params: "\"messaging\"".into(),
}) {
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: metadata.our.clone(),
target: address,
rsvp: None,
message: t::Message::Request(request),
payload,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
}
}
}
}
@ -1329,13 +1324,12 @@ async fn handle_kernel_request(
};
let command: t::KernelCommand = match serde_json::from_slice(&request.ipc) {
Err(e) => {
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!("kernel: couldn't parse command: {:?}", e),
})
.await
.unwrap();
.await;
return;
}
Ok(c) => c,
@ -1387,13 +1381,12 @@ async fn handle_kernel_request(
public,
} => {
let Some(ref payload) = km.payload else {
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: "kernel: process startup requires bytes".into(),
})
.await
.unwrap();
.await;
// fire an error back
send_to_loop
.send(t::KernelMessage {
@ -1417,7 +1410,7 @@ async fn handle_kernel_request(
signed_capabilities: None,
})
.await
.unwrap();
.expect("event loop: fatal: sender died");
return;
};
@ -1429,11 +1422,13 @@ async fn handle_kernel_request(
issuer: signed_cap.issuer,
params: signed_cap.params,
};
match pk.verify(&bincode::serialize(&cap).unwrap(), &signed_cap.signature) {
match pk.verify(
&rmp_serde::to_vec(&cap).unwrap_or_default(),
&signed_cap.signature,
) {
Ok(_) => {}
Err(e) => {
println!("kernel: StartProcess no cap: {}", e);
// TODO should this make the spawn fail??? could go either way
continue;
}
}
@ -1449,14 +1444,14 @@ async fn handle_kernel_request(
params: "\"messaging\"".into(),
});
// fires "success" response back
start_process(
our_name,
// fires "success" response back if successful
match start_process(
our_name.clone(),
booted,
keypair.clone(),
km.id,
&payload.bytes,
send_to_loop,
send_to_loop.clone(),
send_to_terminal,
senders,
process_handles,
@ -1467,9 +1462,9 @@ async fn handle_kernel_request(
source: if let Some(rsvp) = km.rsvp {
rsvp
} else {
km.source
km.source.clone()
},
process_id: Some(id),
process_id: id,
persisted: t::PersistedProcess {
wasm_bytes_handle,
on_panic,
@ -1479,7 +1474,35 @@ async fn handle_kernel_request(
reboot: false,
},
)
.await;
.await
{
Ok(()) => (),
Err(_e) => {
send_to_loop
.send(t::KernelMessage {
id: km.id,
source: t::Address {
node: our_name.clone(),
process: KERNEL_PROCESS_ID.clone(),
},
target: km.source,
rsvp: None,
message: t::Message::Response((
t::Response {
inherit: false,
ipc: serde_json::to_vec(&t::KernelResponse::StartProcessError)
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
}
}
}
// reboot from persisted process.
t::KernelCommand::RebootProcess {
@ -1510,7 +1533,7 @@ async fn handle_kernel_request(
metadata: Some(
serde_json::to_string(&StartProcessMetadata {
source: km.source,
process_id: Some(process_id),
process_id,
persisted,
reboot: true,
})
@ -1521,30 +1544,28 @@ async fn handle_kernel_request(
signed_capabilities: None,
})
.await
.unwrap()
.expect("event loop: fatal: sender died");
}
t::KernelCommand::KillProcess(process_id) => {
// brutal and savage killing: aborting the task.
// do not do this to a process if you don't want to risk
// dropped messages / un-replied-to-requests
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!("kernel: killing process {:?}", process_id),
verbosity: 0,
content: format!("kernel: killing process {}", process_id),
})
.await
.unwrap();
.await;
let _ = senders.remove(&process_id);
let process_handle = match process_handles.remove(&process_id) {
Some(ph) => ph,
None => {
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!("kernel: no such process {:?} to kill", process_id),
})
.await
.unwrap();
.await;
return;
}
};
@ -1555,7 +1576,7 @@ async fn handle_kernel_request(
}
process_map.remove(&process_id);
let _ = persist_state(&our_name, &send_to_loop, &process_map).await;
let _ = persist_state(&our_name, &send_to_loop, process_map).await;
send_to_loop
.send(t::KernelMessage {
@ -1579,7 +1600,7 @@ async fn handle_kernel_request(
signed_capabilities: None,
})
.await
.unwrap();
.expect("event loop: fatal: sender died");
}
}
}
@ -1624,7 +1645,7 @@ async fn handle_kernel_response(
return;
};
let meta: StartProcessMetadata = match serde_json::from_str(&metadata) {
let meta: StartProcessMetadata = match serde_json::from_str(metadata) {
Err(_) => {
let _ = send_to_terminal
.send(t::Printout {
@ -1638,27 +1659,26 @@ async fn handle_kernel_response(
};
let Some(ref payload) = km.payload else {
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"kernel: process {:?} seemingly could not be read from filesystem. km: {}",
"kernel: process {} seemingly could not be read from filesystem. km: {}",
meta.process_id, km
),
})
.await
.unwrap();
.await;
return;
};
start_process(
match start_process(
our_name,
booted,
keypair.clone(),
km.id,
&payload.bytes,
send_to_loop,
send_to_terminal,
send_to_terminal.clone(),
senders,
process_handles,
process_map,
@ -1666,7 +1686,18 @@ async fn handle_kernel_response(
caps_oracle,
meta,
)
.await;
.await
{
Ok(()) => (),
Err(e) => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel: process start fail: {:?}", e),
})
.await;
}
}
}
async fn start_process(
@ -1683,47 +1714,34 @@ async fn start_process(
engine: &Engine,
caps_oracle: t::CapMessageSender,
process_metadata: StartProcessMetadata,
) {
) -> Result<()> {
let (send_to_process, recv_in_process) =
mpsc::channel::<Result<t::KernelMessage, t::WrappedSendError>>(PROCESS_CHANNEL_CAPACITY);
let process_id = match process_metadata.process_id {
Some(id) => {
if senders.contains_key(&id) {
// TODO: make a Response to indicate failure?
send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel: process with ID {} already exists", id),
})
.await
.unwrap();
return;
} else {
id
}
}
// first cases was for reboot or start with defined name, this is for start without name
None => {
// TODO change signature of outer fn
unimplemented!()
}
};
let id = process_metadata.process_id;
if senders.contains_key(&id) {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel: process with ID {} already exists", id),
})
.await;
return Err(anyhow::anyhow!("process with ID {} already exists", id));
}
senders.insert(
process_id.clone(),
id.clone(),
ProcessSender::Userspace(send_to_process.clone()),
);
let metadata = t::ProcessMetadata {
our: t::Address {
node: our_name.clone(),
process: process_id.clone(),
process: id.clone(),
},
wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle.clone(),
wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle,
on_panic: process_metadata.persisted.on_panic.clone(),
public: process_metadata.persisted.public,
};
process_handles.insert(
process_id.clone(),
id.clone(),
tokio::spawn(
make_process_loop(
booted,
@ -1733,7 +1751,7 @@ async fn start_process(
send_to_terminal.clone(),
recv_in_process,
send_to_process,
&km_payload_bytes,
km_payload_bytes,
caps_oracle,
engine,
)
@ -1741,10 +1759,10 @@ async fn start_process(
),
);
process_map.insert(process_id, process_metadata.persisted);
process_map.insert(id, process_metadata.persisted);
if !process_metadata.reboot {
// if new, persist
let _ = persist_state(&our_name, &send_to_loop, &process_map).await;
persist_state(&our_name, &send_to_loop, process_map).await?;
}
send_to_loop
@ -1759,7 +1777,7 @@ async fn start_process(
message: t::Message::Response((
t::Response {
inherit: false,
ipc: serde_json::to_vec(&t::KernelResponse::StartedProcess).unwrap(),
ipc: serde_json::to_vec(&t::KernelResponse::StartedProcess)?,
metadata: None,
},
None,
@ -1767,8 +1785,8 @@ async fn start_process(
payload: None,
signed_capabilities: None,
})
.await
.unwrap();
.await?;
Ok(())
}
/// process event loop. allows WASM processes to send messages to various runtime modules.
@ -1834,30 +1852,35 @@ async fn make_event_loop(
signed_capabilities: None,
})
.await
.unwrap();
.expect("fatal: kernel event loop died");
}
if let t::OnPanic::Requests(requests) = &persisted.on_panic {
// if a persisted process had on-death-requests, we should perform them now
// TODO check for caps here
// even in death, a process can only message processes it has capabilities for
for (address, request, payload) in requests {
// the process that made the request is dead, so never expects response
let mut request = request.clone();
let mut request = request.to_owned();
request.expects_response = None;
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: t::Address {
node: our_name.clone(),
process: process_id.clone(),
},
target: address.clone(),
rsvp: None,
message: t::Message::Request(request),
payload: payload.clone(),
signed_capabilities: None,
})
.await
.unwrap();
if persisted.capabilities.contains(&t::Capability {
issuer: address.clone(),
params: "\"messaging\"".into(),
}) {
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: t::Address {
node: our_name.clone(),
process: process_id.clone(),
},
target: address.clone(),
rsvp: None,
message: t::Message::Request(request),
payload: payload.clone(),
signed_capabilities: None,
})
.await
.expect("fatal: kernel event loop died");
}
}
}
}
@ -1886,7 +1909,7 @@ async fn make_event_loop(
signed_capabilities: None,
})
.await
.unwrap();
.expect("fatal: kernel event loop died");
// main message loop
loop {
@ -1897,8 +1920,7 @@ async fn make_event_loop(
is_debug = !is_debug;
}
},
ne = network_error_recv.recv() => {
let Some(wrapped_network_error) = ne else { return Ok(()) };
Some(wrapped_network_error) = network_error_recv.recv() => {
let _ = send_to_terminal.send(
t::Printout {
verbosity: 1,
@ -1916,16 +1938,15 @@ async fn make_event_loop(
// a message directed to not-our-node
}
None => {
send_to_terminal
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"event loop: don't have {:?} amongst registered processes (got message for it from net)",
"event loop: don't have {} amongst registered processes (got net error for it)",
wrapped_network_error.source.process,
)
})
.await
.unwrap();
.await;
}
}
},
@ -1938,16 +1959,20 @@ async fn make_event_loop(
// enforce that if message is directed over the network, process has capability to do so
if kernel_message.source.node == our_name
&& kernel_message.target.node != our_name {
if !process_map.get(&kernel_message.source.process).unwrap().capabilities.contains(
&t::Capability {
issuer: t::Address {
let Some(proc) = process_map.get(&kernel_message.source.process) else {
continue
};
if !proc.capabilities.contains(
&t::Capability {
issuer: t::Address {
node: our_name.clone(),
process: KERNEL_PROCESS_ID.clone(),
},
params: "\"network\"".into(),
}) {
}
) {
// capabilities are not correct! skip this message.
// TODO some kind of error thrown back at process
// TODO: some kind of error thrown back at process?
let _ = send_to_terminal.send(
t::Printout {
verbosity: 0,
@ -1960,10 +1985,19 @@ async fn make_event_loop(
continue;
}
} else if kernel_message.source.node != our_name {
// note that messaging restrictions only apply to *local* processes, if your
// process has networking capabilities, it can be messaged by any process remotely..
// note that messaging restrictions only apply to *local* processes:
// your process can be messaged by any process remotely if it has
// networking capabilities.
let Some(persisted) = process_map.get(&kernel_message.target.process) else {
println!("kernel: did not find process in process_map: {}\r", kernel_message.target.process);
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"event loop: don't have {} amongst registered processes (got message for it from network)",
kernel_message.source.process,
)
})
.await;
continue;
};
if !persisted.capabilities.contains(
@ -1979,7 +2013,7 @@ async fn make_event_loop(
t::Printout {
verbosity: 0,
content: format!(
"event loop: process {} doesn't have capability to receive networked messages",
"event loop: process {} got a message from over the network, but doesn't have capability to receive networked messages",
kernel_message.target.process
)
}
@ -1993,40 +2027,37 @@ async fn make_event_loop(
&& kernel_message.source.process != *FILESYSTEM_PROCESS_ID
{
let Some(persisted_source) = process_map.get(&kernel_message.source.process) else {
println!("kernel: did not find process in process_map: {}\r", kernel_message.source.process);
continue;
continue
};
let Some(persisted_target) = process_map.get(&kernel_message.target.process) else {
println!("kernel: did not find process in process_map: {}\r", kernel_message.target.process);
continue;
continue
};
if !persisted_target.public {
if !persisted_source.capabilities.contains(&t::Capability {
if !persisted_target.public && !persisted_source.capabilities.contains(&t::Capability {
issuer: t::Address {
node: our_name.clone(),
process: kernel_message.target.process.clone(),
},
params: "\"messaging\"".into(),
}) {
// capabilities are not correct! skip this message.
// TODO some kind of error thrown back at process
let _ = send_to_terminal.send(
t::Printout {
verbosity: 0,
content: format!(
"event loop: process {} doesn't have capability to message process {}",
kernel_message.source.process, kernel_message.target.process
)
}
).await;
continue;
}
// capabilities are not correct! skip this message.
// TODO some kind of error thrown back at process?
let _ = send_to_terminal.send(
t::Printout {
verbosity: 0,
content: format!(
"event loop: process {} doesn't have capability to message process {}",
kernel_message.source.process, kernel_message.target.process
)
}
).await;
continue;
}
}
}
// end capabilities checks
// if debug mode is on, wait for user to step through
while is_debug {
let debug = recv_debug_in_loop.recv().await.unwrap();
let debug = recv_debug_in_loop.recv().await.expect("event loop: debug channel died");
match debug {
t::DebugCommand::Toggle => is_debug = !is_debug,
t::DebugCommand::Step => break,
@ -2040,7 +2071,6 @@ async fn make_event_loop(
}
).await;
if our_name != kernel_message.target.node {
// unrecoverable if fails
send_to_net.send(kernel_message).await.expect("fatal: net module died");
} else if kernel_message.target.process.process() == "kernel" {
// kernel only accepts messages from our own node
@ -2081,34 +2111,26 @@ async fn make_event_loop(
}
} else {
// pass message to appropriate runtime module or process
// the receiving process is automatically granted
// capability to communicate with the sending process.
if our_name == kernel_message.source.node {
match process_map.get_mut(&kernel_message.target.process) {
None => {
// this should never be hit?
println!("got message for process {:?} but it doesn't exist?", kernel_message.target.process);
}
Some(p) => {
let cap = t::Capability {
issuer: kernel_message.source.clone(),
params: "\"messaging\"".into(),
};
if !p.capabilities.contains(&cap) {
// insert cap in process if it doesn't already have it
p.capabilities.insert(cap);
let _ = persist_state(&our_name, &send_to_loop, &process_map).await;
match senders.get(&kernel_message.target.process) {
Some(ProcessSender::Userspace(sender)) => {
let target = kernel_message.target.process.clone();
match sender.send(Ok(kernel_message)).await {
Ok(()) => continue,
Err(_e) => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"event loop: process {} appears to have died",
target
)
})
.await;
}
}
}
}
match senders.get(&kernel_message.target.process) {
Some(ProcessSender::Userspace(sender)) => {
// TODO: should this failing should crash kernel? probably not
sender.send(Ok(kernel_message)).await.unwrap();
}
Some(ProcessSender::Runtime(sender)) => {
sender.send(kernel_message).await.expect("fatal: runtime module died");
sender.send(kernel_message).await.expect("event loop: fatal: runtime module died");
}
None => {
send_to_terminal
@ -2121,7 +2143,7 @@ async fn make_event_loop(
)
})
.await
.unwrap();
.expect("event loop: fatal: terminal sender died");
}
}
}

View File

@ -43,7 +43,7 @@ pub fn encode_keyfile(
);
let key = Key::<Aes256Gcm>::from_slice(&disk_key);
let cipher = Aes256Gcm::new(&key);
let cipher = Aes256Gcm::new(key);
let network_nonce = Aes256Gcm::generate_nonce(&mut OsRng); // 96-bits; unique per message
let jwt_nonce = Aes256Gcm::generate_nonce(&mut OsRng);
@ -82,7 +82,7 @@ pub fn decode_keyfile(keyfile: Vec<u8>, password: &str) -> Result<Keyfile, &'sta
);
let cipher_key = Key::<Aes256Gcm>::from_slice(&disk_key);
let cipher = Aes256Gcm::new(&cipher_key);
let cipher = Aes256Gcm::new(cipher_key);
let net_nonce = generic_array::GenericArray::from_slice(&key_enc[..12]);
let jwt_nonce = generic_array::GenericArray::from_slice(&jwt_enc[..12]);
@ -112,6 +112,14 @@ pub fn decode_keyfile(keyfile: Vec<u8>, password: &str) -> Result<Keyfile, &'sta
})
}
pub fn get_username(keyfile: Vec<u8>) -> Result<String, &'static str> {
let (username, _routers, _salt, _key_enc, _jwt_enc, _file_enc) =
bincode::deserialize::<(String, Vec<String>, Vec<u8>, Vec<u8>, Vec<u8>, Vec<u8>)>(&keyfile)
.map_err(|_| "failed to deserialize keyfile")?;
Ok(username)
}
/// # Returns
/// a pair of (public key (encoded as a hex string), serialized key as a pkcs8 Document)
pub fn generate_networking_key() -> (String, Document) {

View File

@ -1,6 +1,6 @@
use crate::types::*;
use anyhow::Result;
use dotenv;
use std::env;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
@ -88,7 +88,7 @@ async fn main() {
mpsc::channel(WEBSOCKET_SENDER_CHANNEL_CAPACITY);
// filesystem receives request messages via this channel, kernel sends messages
let (fs_message_sender, fs_message_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(FILESYSTEM_CHANNEL_CAPACITY.clone());
mpsc::channel(FILESYSTEM_CHANNEL_CAPACITY);
// http server channel w/ websockets (eyre)
let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
@ -194,6 +194,7 @@ async fn main() {
// if any do not match, we should prompt user to create a "transaction"
// that updates their PKI info on-chain.
let http_server_port = http_server::find_open_port(8080).await.unwrap();
println!("login or register at http://localhost:{}", http_server_port);
let (kill_tx, kill_rx) = oneshot::channel::<bool>();
let disk_keyfile = match fs::read(format!("{}/.keys", home_directory_path)).await {
@ -206,8 +207,7 @@ async fn main() {
_ = register::register(tx, kill_rx, our_ip.to_string(), http_server_port, disk_keyfile)
=> panic!("registration failed"),
(our, decoded_keyfile, encoded_keyfile) = async {
while let Some(fin) = rx.recv().await { return fin }
panic!("registration failed")
rx.recv().await.expect("registration failed")
} => (our, decoded_keyfile, encoded_keyfile),
};
@ -373,19 +373,11 @@ async fn main() {
// if a runtime task exits, try to recover it,
// unless it was terminal signaling a quit
let quit_msg: String = tokio::select! {
Some(res) = tasks.join_next() => {
if let Err(e) = res {
format!("what does this mean? {:?}", e)
} else if let Ok(Err(e)) = res {
format!(
"\x1b[38;5;196muh oh, a kernel process crashed: {}\x1b[0m",
e
)
// TODO restart the task?
} else {
format!("what does this mean???")
// TODO restart the task?
}
Some(Ok(res)) = tasks.join_next() => {
format!(
"\x1b[38;5;196muh oh, a kernel process crashed -- this should never happen: {:?}\x1b[0m",
res
)
}
quit = terminal::terminal(
our.clone(),
@ -402,10 +394,10 @@ async fn main() {
}
}
};
// shutdown signal to fs for flush
let _ = fs_kill_send.send(());
let _ = fs_kill_confirm_recv.await;
// println!("fs shutdown complete.");
// gracefully abort all running processes in kernel
let _ = kernel_message_sender
@ -430,10 +422,10 @@ async fn main() {
signed_capabilities: None,
})
.await;
// abort all remaining tasks
tasks.shutdown().await;
let _ = crossterm::terminal::disable_raw_mode();
println!("");
println!("\x1b[38;5;196m{}\x1b[0m", quit_msg);
println!("\r\n\x1b[38;5;196m{}\x1b[0m", quit_msg);
return;
}

View File

@ -354,65 +354,70 @@ async fn direct_networking(
}
}
}
// 3. receive incoming TCP connections
// 3. join any closed forwarding connection tasks and destroy them
// TODO can do more here if desired
Some(res) = forwarding_connections.join_next() => {
match res {
Ok(()) => continue,
Err(_e) => continue,
}
}
// 4. receive incoming TCP connections
Ok((stream, _socket_addr)) = tcp.accept() => {
// TODO we can perform some amount of validation here
// to prevent some amount of potential DDoS attacks.
// can also block based on socket_addr
match accept_async(MaybeTlsStream::Plain(stream)).await {
Ok(websocket) => {
print_debug(&print_tx, "net: received new websocket connection").await;
let (peer_id, routing_for, conn) =
match recv_connection(
&our,
&our_ip,
&pki,
&peers,
&mut pending_passthroughs,
&keypair,
websocket).await
{
Ok(res) => res,
Err(e) => {
print_tx.send(Printout {
verbosity: 0,
content: format!("net: recv_connection failed: {e}"),
}).await?;
continue;
}
};
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here!
// if conn is direct, add peer. if passthrough, add to our
// forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
save_new_peer(
&peer_id,
routing_for,
peers.clone(),
peer_conn,
None,
&kernel_message_tx,
&print_tx
).await;
}
Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough(
passthrough_conn,
));
}
Connection::PendingPassthrough(pending_conn) => {
pending_passthroughs.insert(
(peer_id.name.clone(), pending_conn.target.clone()),
pending_conn
);
// ignore connections we failed to accept...?
if let Ok(websocket) = accept_async(MaybeTlsStream::Plain(stream)).await {
print_debug(&print_tx, "net: received new websocket connection").await;
let (peer_id, routing_for, conn) =
match recv_connection(
&our,
&our_ip,
&pki,
&peers,
&mut pending_passthroughs,
&keypair,
websocket).await
{
Ok(res) => res,
Err(e) => {
print_tx.send(Printout {
verbosity: 0,
content: format!("net: recv_connection failed: {e}"),
}).await?;
continue;
}
};
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here!
// if conn is direct, add peer. if passthrough, add to our
// forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
save_new_peer(
&peer_id,
routing_for,
peers.clone(),
peer_conn,
None,
&kernel_message_tx,
&print_tx
).await;
}
Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough(
passthrough_conn,
));
}
Connection::PendingPassthrough(pending_conn) => {
pending_passthroughs.insert(
(peer_id.name.clone(), pending_conn.target.clone()),
pending_conn
);
}
}
// ignore connections we failed to accept...?
Err(_) => {}
}
}
}
@ -543,7 +548,7 @@ async fn init_connection_via_router(
None => continue,
Some(id) => id,
};
match init_connection(&our, &our_ip, peer_id, &keypair, Some(&router_id), false).await {
match init_connection(our, our_ip, peer_id, keypair, Some(&router_id), false).await {
Ok(direct_conn) => {
save_new_peer(
peer_id,
@ -560,7 +565,7 @@ async fn init_connection_via_router(
Err(_) => continue,
}
}
return false;
false
}
async fn recv_connection(
@ -584,7 +589,7 @@ async fn recv_connection(
// and create a Passthrough connection if so.
// a Noise 'e' message with have len 32
if first_message.len() != 32 {
let (their_id, target_name) = validate_routing_request(&our.name, &first_message, pki)?;
let (their_id, target_name) = validate_routing_request(&our.name, first_message, pki)?;
let (id, conn) = create_passthrough(
our,
our_ip,
@ -605,7 +610,7 @@ async fn recv_connection(
// -> e, ee, s, es
send_uqbar_handshake(
&our,
our,
keypair,
&our_static_key,
&mut noise,
@ -684,7 +689,7 @@ async fn recv_connection_via_router(
// -> e, ee, s, es
send_uqbar_handshake(
&our,
our,
keypair,
&our_static_key,
&mut noise,
@ -788,7 +793,7 @@ async fn init_connection(
// -> s, se
send_uqbar_handshake(
&our,
our,
keypair,
&our_static_key,
&mut noise,
@ -820,9 +825,9 @@ async fn handle_local_message(
kernel_message_tx: &MessageSender,
print_tx: &PrintSender,
) -> Result<()> {
print_debug(&print_tx, "net: handling local message").await;
print_debug(print_tx, "net: handling local message").await;
let ipc = match km.message {
Message::Request(request) => request.ipc,
Message::Request(ref request) => &request.ipc,
Message::Response((response, _context)) => {
// these are received as a router, when we send ConnectionRequests
// to a node we do routing for.
@ -846,7 +851,7 @@ async fn handle_local_message(
};
if km.source.node != our.name {
if let Ok(act) = rmp_serde::from_slice::<NetActions>(&ipc) {
if let Ok(act) = rmp_serde::from_slice::<NetActions>(ipc) {
match act {
NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => {
// for now, we don't get these from remote.
@ -875,8 +880,8 @@ async fn handle_local_message(
peers,
peer_conn,
None,
&kernel_message_tx,
&print_tx,
kernel_message_tx,
print_tx,
)
.await;
Ok(NetResponses::Accepted(from.clone()))
@ -912,43 +917,13 @@ async fn handle_local_message(
};
// if we can't parse this to a netaction, treat it as a hello and print it
// respond to a text message with a simple "delivered" response
print_tx
.send(Printout {
verbosity: 0,
content: format!(
"\x1b[3;32m{}: {}\x1b[0m",
km.source.node,
std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!")
),
})
.await?;
kernel_message_tx
.send(KernelMessage {
id: km.id,
source: Address {
node: our.name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
target: km.rsvp.unwrap_or(km.source),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: "delivered".as_bytes().to_vec(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await?;
parse_hello_message(our, &km, ipc, kernel_message_tx, print_tx).await?;
Ok(())
} else {
// available commands: "peers", "pki", "names", "diagnostics"
// first parse as raw string, then deserialize to NetActions object
let mut printout = String::new();
match std::str::from_utf8(&ipc) {
match std::str::from_utf8(ipc) {
Ok("peers") => {
printout.push_str(&format!(
"{:#?}",
@ -966,7 +941,7 @@ async fn handle_local_message(
}
Ok("diagnostics") => {
printout.push_str(&format!("our Identity: {:#?}\r\n", our));
printout.push_str(&format!("we have connections with peers:\r\n"));
printout.push_str("we have connections with peers:\r\n");
for peer in peers.iter() {
printout.push_str(&format!(
" {}, routing_for={}\r\n",
@ -988,18 +963,18 @@ async fn handle_local_message(
}
}
_ => {
match rmp_serde::from_slice::<NetActions>(&ipc)? {
NetActions::ConnectionRequest(_) => {
match rmp_serde::from_slice::<NetActions>(ipc) {
Ok(NetActions::ConnectionRequest(_)) => {
// we shouldn't receive these from ourselves.
}
NetActions::QnsUpdate(log) => {
Ok(NetActions::QnsUpdate(log)) => {
// printout.push_str(&format!("net: got QNS update for {}", log.name));
pki.insert(
log.name.clone(),
Identity {
name: log.name.clone(),
networking_key: log.public_key,
ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0 {
ws_routing: if log.ip == *"0.0.0.0" || log.port == 0 {
None
} else {
Some((log.ip, log.port))
@ -1009,7 +984,7 @@ async fn handle_local_message(
);
names.insert(log.node, log.name);
}
NetActions::QnsBatchUpdate(log_list) => {
Ok(NetActions::QnsBatchUpdate(log_list)) => {
// printout.push_str(&format!(
// "net: got QNS update with {} peers",
// log_list.len()
@ -1020,8 +995,7 @@ async fn handle_local_message(
Identity {
name: log.name.clone(),
networking_key: log.public_key,
ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0
{
ws_routing: if log.ip == *"0.0.0.0" || log.port == 0 {
None
} else {
Some((log.ip, log.port))
@ -1032,6 +1006,10 @@ async fn handle_local_message(
names.insert(log.node, log.name);
}
}
_ => {
parse_hello_message(our, &km, ipc, kernel_message_tx, print_tx).await?;
return Ok(());
}
}
}
}

View File

@ -25,14 +25,10 @@ pub async fn save_new_peer(
kernel_message_tx: &MessageSender,
print_tx: &PrintSender,
) {
print_debug(
&print_tx,
&format!("net: saving new peer {}", identity.name),
)
.await;
print_debug(print_tx, &format!("net: saving new peer {}", identity.name)).await;
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
if km.is_some() {
peer_tx.send(km.unwrap()).unwrap()
if let Some(km) = km {
peer_tx.send(km).unwrap()
}
let peer = Peer {
identity: identity.clone(),
@ -130,7 +126,6 @@ pub async fn maintain_connection(
print_debug(&print_tx, &format!("net: connection with {peer_name} died")).await;
peers.remove(&peer_name);
return;
}
/// cross the streams
@ -267,10 +262,10 @@ pub fn validate_routing_request(
.ok_or(anyhow!("unknown QNS name"))?;
let their_networking_key = signature::UnparsedPublicKey::new(
&signature::ED25519,
hex::decode(&strip_0x(&their_id.networking_key))?,
hex::decode(strip_0x(&their_id.networking_key))?,
);
their_networking_key.verify(
&[&routing_request.target, our_name].concat().as_bytes(),
[&routing_request.target, our_name].concat().as_bytes(),
&routing_request.signature,
)?;
if routing_request.target == routing_request.source {
@ -290,7 +285,7 @@ pub fn validate_handshake(
// verify their signature of their static key
let their_networking_key = signature::UnparsedPublicKey::new(
&signature::ED25519,
hex::decode(&strip_0x(&their_id.networking_key))?,
hex::decode(strip_0x(&their_id.networking_key))?,
);
their_networking_key.verify(their_static_key, &handshake.signature)?;
Ok(())
@ -351,7 +346,7 @@ pub async fn send_uqbar_handshake(
keypair: &Ed25519KeyPair,
noise_static_key: &[u8],
noise: &mut snow::HandshakeState,
buf: &mut Vec<u8>,
buf: &mut [u8],
write_stream: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
proxy_request: bool,
) -> Result<()> {
@ -372,7 +367,7 @@ pub async fn send_uqbar_handshake(
pub async fn recv_uqbar_handshake(
noise: &mut snow::HandshakeState,
buf: &mut Vec<u8>,
buf: &mut [u8],
read_stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
write_stream: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
) -> Result<HandshakePayload> {
@ -454,11 +449,51 @@ pub async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSen
}
fn strip_0x(s: &str) -> String {
if s.starts_with("0x") {
s[2..].to_string()
} else {
s.to_string()
if let Some(stripped) = s.strip_prefix("0x") {
return stripped.to_string();
}
s.to_string()
}
pub async fn parse_hello_message(
our: &Identity,
km: &KernelMessage,
ipc: &[u8],
kernel_message_tx: &MessageSender,
print_tx: &PrintSender,
) -> Result<()> {
print_tx
.send(Printout {
verbosity: 0,
content: format!(
"\x1b[3;32m{}: {}\x1b[0m",
km.source.node,
std::str::from_utf8(ipc).unwrap_or("!!message parse error!!")
),
})
.await?;
kernel_message_tx
.send(KernelMessage {
id: km.id,
source: Address {
node: our.name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
target: km.rsvp.as_ref().unwrap_or(&km.source).clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: "delivered".as_bytes().to_vec(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await?;
Ok(())
}
pub async fn print_debug(print_tx: &PrintSender, content: &str) {

@ -1 +1 @@
Subproject commit 81654a0ed724a62748e49feb0c97fad50f1e243a
Subproject commit 7120168626f4e9b09008844174c63fc30af8711f

View File

@ -1,5 +1,5 @@
use aes_gcm::aead::KeyInit;
use base64;
use hmac::Hmac;
use jwt::SignWithKey;
use ring::pkcs8::Document;
@ -11,7 +11,7 @@ use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use warp::{
http::{
header::{HeaderValue, SET_COOKIE},
header::{HeaderMap, HeaderValue, SET_COOKIE},
StatusCode,
},
Filter, Rejection, Reply,
@ -24,7 +24,7 @@ use crate::types::*;
type RegistrationSender = mpsc::Sender<(Identity, Keyfile, Vec<u8>)>;
pub fn generate_jwt(jwt_secret_bytes: &[u8], username: String) -> Option<String> {
let jwt_secret: Hmac<Sha256> = match Hmac::new_from_slice(&jwt_secret_bytes) {
let jwt_secret: Hmac<Sha256> = match Hmac::new_from_slice(jwt_secret_bytes) {
Ok(secret) => secret,
Err(_) => return None,
};
@ -59,10 +59,13 @@ pub async fn register(
let keyfile_vet = keyfile_arc.clone();
let static_files = warp::path("static").and(warp::fs::dir("./src/register/build/static/"));
let react_app = warp::path::end()
.and(warp::get())
.and(warp::fs::file("./src/register/build/index.html"));
let keyfile_vet_copy = keyfile_vet.clone();
let api = warp::path("has-keyfile")
.and(
warp::get()
@ -74,6 +77,7 @@ pub async fn register(
.and(warp::any().map(move || ip.clone()))
.and(warp::any().map(move || our_ws_info.clone()))
.and(warp::any().map(move || net_keypair_ws_info.clone()))
.and(warp::any().map(move || keyfile_vet_copy.clone()))
.and_then(handle_info),
))
.or(warp::path("vet-keyfile").and(
@ -94,7 +98,16 @@ pub async fn register(
.and_then(handle_boot),
));
let routes = static_files.or(react_app).or(api);
let mut headers = HeaderMap::new();
headers.insert(
"Cache-Control",
HeaderValue::from_static("no-store, no-cache, must-revalidate, proxy-revalidate"),
);
let routes = static_files
.or(react_app)
.or(api)
.with(warp::reply::with::headers(headers));
let _ = open::that(format!("http://localhost:{}/", port));
warp::serve(routes)
@ -115,10 +128,13 @@ async fn handle_has_keyfile(keyfile: Arc<Mutex<Option<Vec<u8>>>>) -> Result<impl
let encoded_keyfile = keyfile_lock.as_ref().unwrap();
let username: String = match encoded_keyfile.is_empty() {
true => "".to_string(),
false => {
let (user, ..): (String,) = bincode::deserialize(encoded_keyfile).unwrap();
user
}
false => match bincode::deserialize(encoded_keyfile) {
Ok(k) => {
let (user, ..): (String,) = k;
user
}
Err(_) => "".to_string(),
},
};
Ok(warp::reply::json(&username))
@ -155,9 +171,11 @@ async fn handle_boot(
sender: RegistrationSender,
mut our: Identity,
networking_keypair: Document,
mut encoded_keyfile: Vec<u8>,
encoded_keyfile: Vec<u8>,
) -> Result<impl Reply, Rejection> {
our.name = info.username;
if !info.username.is_empty() {
our.name = info.username;
}
if info.direct {
our.allowed_routers = vec![];
@ -166,12 +184,20 @@ async fn handle_boot(
}
// if keyfile was not present in node and is present from user upload
if encoded_keyfile.is_empty() && !info.keyfile.clone().is_empty() {
let mut encoded_keyfile = if !info.keyfile.clone().is_empty() {
match base64::decode(info.keyfile.clone()) {
Ok(k) => encoded_keyfile = k,
Err(_) => return Err(warp::reject()),
Ok(k) => k,
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&"Keyfile not valid base64".to_string()),
StatusCode::BAD_REQUEST,
)
.into_response())
}
}
}
} else {
encoded_keyfile
};
// if keyfile was not in node or upload or if networking required reset
let decoded_keyfile = if info.reset || encoded_keyfile.is_empty() {
@ -190,13 +216,20 @@ async fn handle_boot(
} else {
match keygen::decode_keyfile(encoded_keyfile.clone(), &info.password) {
Ok(k) => {
our.name = k.username.clone();
our.networking_key = format!(
"0x{}",
hex::encode(k.networking_keypair.public_key().as_ref())
);
k
}
Err(_) => return Err(warp::reject()),
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&"Failed to decode keyfile".to_string()),
StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response())
}
}
};
@ -213,7 +246,13 @@ async fn handle_boot(
let token = match generate_jwt(&decoded_keyfile.jwt_secret_bytes, our.name.clone()) {
Some(token) => token,
None => return Err(warp::reject()),
None => {
return Ok(warp::reply::with_status(
warp::reply::json(&"Failed to generate JWT".to_string()),
StatusCode::SERVICE_UNAVAILABLE,
)
.into_response())
}
};
sender
@ -247,11 +286,22 @@ async fn handle_info(
ip: String,
our_arc: Arc<Mutex<Option<Identity>>>,
networking_keypair_arc: Arc<Mutex<Option<Document>>>,
keyfile_arc: Arc<Mutex<Option<Vec<u8>>>>,
) -> Result<impl Reply, Rejection> {
// 1. Generate networking keys
let (public_key, serialized_networking_keypair) = keygen::generate_networking_key();
*networking_keypair_arc.lock().unwrap() = Some(serialized_networking_keypair);
let username = {
match keyfile_arc.lock().unwrap().clone() {
None => String::new(),
Some(encoded_keyfile) => match keygen::get_username(encoded_keyfile) {
Ok(k) => k,
Err(_) => String::new(),
},
}
};
// 2. set our...
// TODO: if IP is localhost, assign a router...
let ws_port = http_server::find_open_port(9000).await.unwrap();
@ -260,7 +310,7 @@ async fn handle_info(
// to match on
let our = Identity {
networking_key: format!("0x{}", public_key),
name: String::new(),
name: username,
ws_routing: Some((ip.clone(), ws_port)),
allowed_routers: vec![
"uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(),

View File

@ -55,7 +55,7 @@ impl CommandHistory {
}
fn get_prev(&mut self, working_line: &str) -> Option<String> {
if self.lines.len() == 0 || self.index == self.lines.len() {
if self.lines.is_empty() || self.index == self.lines.len() {
return None;
}
self.index += 1;
@ -67,7 +67,7 @@ impl CommandHistory {
}
fn get_next(&mut self) -> Option<String> {
if self.lines.len() == 0 || self.index == 0 || self.index == 1 {
if self.lines.is_empty() || self.index == 0 || self.index == 1 {
self.index = 0;
if let Some(line) = self.working_line.clone() {
self.working_line = None;
@ -83,12 +83,10 @@ impl CommandHistory {
/// provided string. otherwise, skip the first <depth> matches.
/// yes this is O(n) to provide desired ordering, can revisit if slow
fn search(&mut self, find: &str, depth: usize) -> Option<String> {
let mut skips = 0;
for line in &self.lines {
for (skips, line) in self.lines.iter().enumerate() {
if line.contains(find) && skips == depth {
return Some(line.to_string());
}
skips += 1;
}
None
}
@ -116,7 +114,7 @@ pub async fn terminal(
// print initial splash screen
println!(
"\x1b[38;5;128m{}\x1b[0m",
format!(
format_args!(
r#"
,, UU

View File

@ -73,21 +73,13 @@ impl ProcessId {
publisher_node,
})
}
pub fn to_string(&self) -> String {
[
self.process_name.as_str(),
self.package_name.as_str(),
self.publisher_node.as_str(),
]
.join(":")
}
/// Returns the process-name component of this `ProcessId` as a borrowed `&str`.
pub fn process(&self) -> &str {
&self.process_name
}
/// Returns the package-name component of this `ProcessId` as a borrowed `&str`.
pub fn package(&self) -> &str {
&self.package_name
}
pub fn publisher_node(&self) -> &str {
pub fn publisher(&self) -> &str {
&self.publisher_node
}
pub fn en_wit(&self) -> wit::ProcessId {
@ -214,13 +206,19 @@ impl OnPanic {
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.to_string())
write!(
f,
"{}:{}:{}",
self.process(),
self.package(),
self.publisher()
)
}
}
impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}@{}", self.node, self.process.to_string(),)
write!(f, "{}@{}", self.node, self.process)
}
}
@ -250,7 +248,7 @@ impl std::fmt::Display for Message {
if context.is_none() {
"None".into()
} else {
match serde_json::from_slice::<serde_json::Value>(&context.as_ref().unwrap()) {
match serde_json::from_slice::<serde_json::Value>(context.as_ref().unwrap()) {
Ok(json) => format!("{}", json),
Err(_) => format!("{:?}", context.as_ref().unwrap()),
}
@ -1055,11 +1053,11 @@ pub struct EncryptAction {
/// Messages accepted by the encryptor module. Variant names drop the
/// redundant `Action` suffix (the payload types keep it); the old
/// `*Action`-suffixed variants are the pre-image of this rename.
/// NOTE(review): the rename changes the serde wire format of this enum —
/// confirm no persisted/remote messages still use the old variant names.
#[derive(Debug, Serialize, Deserialize)]
pub enum EncryptorMessage {
    GetKey(GetKeyAction),
    DecryptAndForward(DecryptAndForwardAction),
    EncryptAndForward(EncryptAndForwardAction),
    Decrypt(DecryptAction),
    Encrypt(EncryptAction),
}
// encryptor End

View File

@ -76,7 +76,7 @@ fn clean_path(path: &str) -> String {
}
fn get_parent_path(path: &str) -> String {
let mut split_path: Vec<&str> = path.split("/").collect();
let mut split_path: Vec<&str> = path.split('/').collect();
split_path.pop();
let parent_path = split_path.join("/");
if parent_path.is_empty() {
@ -105,9 +105,9 @@ async fn create_entry(vfs: &mut MutexGuard<Vfs>, path: &str, key: Key) -> Result
},
};
let entry = Entry {
name: path.split("/").last().unwrap().to_string(),
name: path.split('/').last().unwrap().to_string(),
full_path: path.to_string(),
entry_type: entry_type,
entry_type,
};
vfs.key_to_entry.insert(key.clone(), entry);
vfs.path_to_key.insert(path.to_string(), key.clone());
@ -137,7 +137,7 @@ async fn rename_entry(
Some(entry) => entry,
None => return Err(VfsError::EntryNotFound),
};
entry.name = new_path.split("/").last().unwrap().to_string();
entry.name = new_path.split('/').last().unwrap().to_string();
entry.full_path = new_path.to_string();
let children = if let EntryType::Dir { children, .. } = &entry.entry_type {
@ -200,8 +200,8 @@ async fn state_to_bytes(state: &DriveToVfs) -> Vec<u8> {
bincode::serialize(&serializable).unwrap()
}
/// Deserialize persisted bytes back into the drive→VFS map.
/// Takes `&[u8]` (not `&Vec<u8>`) so any byte slice can be passed directly.
/// NOTE(review): the function's closing brace sits just past this hunk and is
/// intentionally not emitted here.
fn bytes_to_state(bytes: &[u8], state: &mut DriveToVfs) {
    // panics on corrupt state — assumes persisted data was written by state_to_bytes
    let serializable: DriveToVfsSerializable = bincode::deserialize(bytes).unwrap();
    for (id, vfs) in serializable.into_iter() {
        state.insert(id, Arc::new(Mutex::new(vfs)));
    }
@ -293,21 +293,21 @@ async fn load_state_from_reboot(
.await;
let km = recv_from_loop.recv().await;
let Some(km) = km else {
return ();
return;
};
let KernelMessage {
message, payload, ..
} = km;
let Message::Response((Response { ipc, .. }, None)) = message else {
return ();
return;
};
let Ok(Ok(FsResponse::GetState)) = serde_json::from_slice::<Result<FsResponse, FsError>>(&ipc)
else {
return ();
return;
};
let Some(payload) = payload else {
return ();
return;
};
bytes_to_state(&payload.bytes, drive_to_vfs);
}
@ -407,7 +407,7 @@ pub async fn vfs(
MessageSender,
MessageReceiver,
) = tokio::sync::mpsc::channel(VFS_RESPONSE_CHANNEL_CAPACITY);
response_router.insert(km.id.clone(), response_sender);
response_router.insert(km.id, response_sender);
let mut drive_to_queue_lock = drive_to_queue.lock().await;
match drive_to_queue_lock.remove(&request.drive) {
@ -520,7 +520,7 @@ pub async fn vfs(
},
}
let _ = send_vfs_task_done.send(id).await;
return ();
return ;
},
Some((km, response_receiver)) => {
// handle next item
@ -549,7 +549,7 @@ pub async fn vfs(
continue;
}
};
match handle_request(
if let Err(e) = handle_request(
our_node.clone(),
id,
source.clone(),
@ -566,18 +566,15 @@ pub async fn vfs(
send_to_caps_oracle.clone(),
response_receiver,
).await {
Err(e) => {
send_to_loop
.send(make_error_message(
our_node.into(),
id,
source,
e,
))
.await
.unwrap();
},
Ok(_) => {},
send_to_loop
.send(make_error_message(
our_node,
id,
source,
e,
))
.await
.unwrap();
}
},
}
@ -617,7 +614,7 @@ async fn handle_request(
| VfsAction::WriteOffset { .. }
| VfsAction::Append { .. }
| VfsAction::SetSize { .. } => {
let _ = send_to_caps_oracle
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
@ -645,7 +642,7 @@ async fn handle_request(
| VfsAction::GetEntry { .. }
| VfsAction::GetFileChunk { .. }
| VfsAction::GetEntryLength { .. } => {
let _ = send_to_caps_oracle
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
@ -673,7 +670,7 @@ async fn handle_request(
let (ipc, bytes) = match_request(
our_node.clone(),
id.clone(),
id,
source.clone(),
request,
payload,
@ -709,19 +706,16 @@ async fn handle_request(
},
None,
)),
payload: match bytes {
Some(bytes) => Some(Payload {
mime: Some("application/octet-stream".into()),
bytes,
}),
None => None,
},
payload: bytes.map(|bytes| Payload {
mime: Some("application/octet-stream".into()),
bytes,
}),
signed_capabilities: None,
};
let _ = send_to_loop.send(response).await;
} else {
let _ = send_to_terminal
send_to_terminal
.send(Printout {
verbosity: 1,
content: format!(
@ -755,7 +749,7 @@ async fn match_request(
VfsAction::New => {
for new_cap in new_caps {
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
let _ = send_to_caps_oracle
send_to_caps_oracle
.send(CapMessage::Add {
on: source.process.clone(),
cap: new_cap,
@ -1035,73 +1029,76 @@ async fn match_request(
VfsAction::Delete(mut full_path) => {
full_path = clean_path(&full_path);
let mut vfs = vfs.lock().await;
let Some(key) = vfs.path_to_key.remove(&full_path) else {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!("vfs: can't delete: nonexistent entry {}", full_path),
})
.await
.unwrap();
return Err(VfsError::EntryNotFound);
};
let Some(entry) = vfs.key_to_entry.remove(&key) else {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!("vfs: can't delete: nonexistent entry {}", full_path),
})
.await
.unwrap();
return Err(VfsError::EntryNotFound);
};
match entry.entry_type {
EntryType::Dir {
parent: _,
ref children,
} => {
if !children.is_empty() {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!(
"vfs: can't delete: non-empty directory {}",
full_path
),
})
.await
.unwrap();
vfs.path_to_key.insert(full_path.clone(), key.clone());
vfs.key_to_entry.insert(key.clone(), entry);
{
let mut vfs = vfs.lock().await;
let Some(key) = vfs.path_to_key.remove(&full_path) else {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!("vfs: can't delete: nonexistent entry {}", full_path),
})
.await
.unwrap();
return Err(VfsError::EntryNotFound);
};
let Some(entry) = vfs.key_to_entry.remove(&key) else {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!("vfs: can't delete: nonexistent entry {}", full_path),
})
.await
.unwrap();
return Err(VfsError::EntryNotFound);
};
match entry.entry_type {
EntryType::Dir {
parent: _,
ref children,
} => {
if !children.is_empty() {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!(
"vfs: can't delete: non-empty directory {}",
full_path
),
})
.await
.unwrap();
vfs.path_to_key.insert(full_path.clone(), key.clone());
vfs.key_to_entry.insert(key.clone(), entry);
}
}
}
EntryType::File { parent } => match vfs.key_to_entry.get_mut(&parent) {
None => {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!(
"vfs: delete: unexpected file with no parent dir: {}",
full_path
),
})
.await
.unwrap();
return Err(VfsError::InternalError);
}
Some(parent) => {
let EntryType::Dir {
parent: _,
ref mut children,
} = parent.entry_type
else {
EntryType::File { parent } => match vfs.key_to_entry.get_mut(&parent) {
None => {
send_to_terminal
.send(Printout {
verbosity: 0,
content: format!(
"vfs: delete: unexpected file with no parent dir: {}",
full_path
),
})
.await
.unwrap();
return Err(VfsError::InternalError);
};
children.remove(&key);
}
},
}
Some(parent) => {
let EntryType::Dir {
parent: _,
ref mut children,
} = parent.entry_type
else {
return Err(VfsError::InternalError);
};
children.remove(&key);
}
},
}
}
persist_state(send_to_persist, &mut recv_response, id).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
@ -1263,7 +1260,7 @@ async fn match_request(
}
VfsAction::GetPath(hash) => {
let mut vfs = vfs.lock().await;
let key = Key::File { id: hash.clone() };
let key = Key::File { id: hash };
let ipc =
serde_json::to_vec(&VfsResponse::GetPath(match vfs.key_to_entry.remove(&key) {
None => None,
@ -1282,7 +1279,7 @@ async fn match_request(
return Err(VfsError::EntryNotFound);
};
let ipc = serde_json::to_vec(&VfsResponse::GetHash(match key {
Key::File { id } => Some(id.clone()),
Key::File { id } => Some(*id),
Key::Dir { .. } => None,
}))
.unwrap();
@ -1337,10 +1334,8 @@ async fn match_request(
message: Message::Request(Request {
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: serde_json::to_vec(&FsAction::Read(
file_hash.clone(),
))
.unwrap(),
ipc: serde_json::to_vec(&FsAction::Read(*file_hash))
.unwrap(),
metadata: None,
}),
payload: None,
@ -1412,7 +1407,7 @@ async fn match_request(
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: serde_json::to_vec(&FsAction::ReadChunk(ReadChunkRequest {
file: file_hash.clone(),
file: file_hash,
start: offset,
length,
}))
@ -1450,7 +1445,7 @@ async fn match_request(
}
VfsAction::GetEntryLength(mut full_path) => {
full_path = clean_path(&full_path);
if full_path.chars().last() == Some('/') {
if full_path.ends_with('/') {
(
serde_json::to_vec(&VfsResponse::GetEntryLength(None)).unwrap(),
None,