Mirror of https://github.com/uqbar-dao/nectar.git (synced 2025-01-02 05:28:22 +03:00)

clippy fixes

This commit is contained in:
parent 05a6747835
commit f64c18256d
@@ -89,7 +89,7 @@ pub async fn encryptor(
 match serde_json::from_slice::<EncryptorMessage>(&ipc) {
 Ok(message) => {
 match message {
-EncryptorMessage::GetKeyAction(GetKeyAction {
+EncryptorMessage::GetKey(GetKeyAction {
 channel_id,
 public_key_hex,
 }) => {
@@ -183,7 +183,7 @@ pub async fn encryptor(
 }
 }
 }
-EncryptorMessage::DecryptAndForwardAction(DecryptAndForwardAction {
+EncryptorMessage::DecryptAndForward(DecryptAndForwardAction {
 channel_id,
 forward_to,
 json,
@@ -241,7 +241,7 @@ pub async fn encryptor(
 panic!("No secret found");
 }
 }
-EncryptorMessage::EncryptAndForwardAction(EncryptAndForwardAction {
+EncryptorMessage::EncryptAndForward(EncryptAndForwardAction {
 channel_id,
 forward_to,
 json,
@@ -301,7 +301,7 @@ pub async fn encryptor(
 .await;
 }
 }
-EncryptorMessage::DecryptAction(DecryptAction { channel_id }) => {
+EncryptorMessage::Decrypt(DecryptAction { channel_id }) => {
 let _ = print_tx
 .send(Printout {
 verbosity: 1,
@@ -357,7 +357,7 @@ pub async fn encryptor(
 .await;
 }
 }
-EncryptorMessage::EncryptAction(EncryptAction { channel_id }) => {
+EncryptorMessage::Encrypt(EncryptAction { channel_id }) => {
 let _ = print_tx
 .send(Printout {
 verbosity: 1,
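
These renames address clippy's enum_variant_names lint, which fires when every variant of an enum shares the same prefix or suffix (here the trailing "Action" that already appears in each wrapped payload type). A minimal sketch of the before/after shape, with illustrative types rather than this repo's real ones:

    // Hypothetical payload type, standing in for GetKeyAction and friends.
    struct GetKeyAction;

    // Before: every variant repeats "Action", so call sites read
    // VerboseMsg::GetKeyAction(GetKeyAction { .. }).
    enum VerboseMsg {
        GetKeyAction(GetKeyAction),
    }

    // After: the variant names the operation; the payload type is unchanged.
    enum Msg {
        GetKey(GetKeyAction),
    }

    fn main() {
        let _ = VerboseMsg::GetKeyAction(GetKeyAction);
        let _ = Msg::GetKey(GetKeyAction);
    }
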
@@ -13,7 +13,7 @@ use rusoto_s3::{
 };
 use serde::{Deserialize, Serialize};
 use std::collections::{BTreeMap, HashMap, HashSet};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tokio::fs;
 use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
@@ -44,7 +44,7 @@ pub struct ChunkEntry {

 #[derive(Debug, Serialize, Deserialize, Clone, Eq, Hash, PartialEq)]
 pub enum FileIdentifier {
-UUID(u128),
+Uuid(u128),
 Process(ProcessId),
 }

@@ -65,7 +65,7 @@ pub struct BackupEntry {
 #[derive(Debug, Clone, Copy)]
 pub enum ChunkLocation {
 ColdStorage(bool), // bool local
-WAL(u64), // offset in wal,
+Wal(u64), // offset in wal,
 Memory(u64), // offset in memory buffer
 }

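
The UUID -> Uuid and WAL -> Wal renames follow clippy's upper_case_acronyms lint, which asks for acronyms in type and variant names to be written in ordinary camel case. A small sketch of the pattern (illustrative enum, not the repo's):

    // Before: all-caps acronyms in variant names trip upper_case_acronyms.
    #[allow(clippy::upper_case_acronyms)]
    enum LocationBefore {
        WAL(u64),
        UUID(u128),
    }

    // After: the spelling clippy suggests.
    enum Location {
        Wal(u64),
        Uuid(u128),
    }

    fn main() {
        let _ = (LocationBefore::WAL(0), LocationBefore::UUID(0));
        let _ = (Location::Wal(0), Location::Uuid(0));
    }
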
@@ -89,7 +89,7 @@ pub struct InMemoryFile {
 impl InMemoryFile {
 pub fn hash(&self) -> [u8; 32] {
 let mut hasher = Hasher::new();
-for (_, (hash, _, _, _)) in &self.chunks {
+for (hash, _, _, _) in self.chunks.values() {
 hasher.update(hash);
 }
 hasher.finalize().into()
@@ -139,12 +139,12 @@ impl InMemoryFile {

 impl FileIdentifier {
 pub fn new_uuid() -> Self {
-Self::UUID(uuid::Uuid::new_v4().as_u128())
+Self::Uuid(uuid::Uuid::new_v4().as_u128())
 }

 pub fn to_uuid(&self) -> Option<u128> {
 match self {
-Self::UUID(uuid) => Some(*uuid),
+Self::Uuid(uuid) => Some(*uuid),
 _ => None,
 }
 }
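
The .values() changes here and below are clippy's for_kv_map lint: iterating a map with a (_, value) pattern binds a key only to discard it. A sketch with a plain BTreeMap standing in for the chunk map:

    use std::collections::BTreeMap;

    fn main() {
        let chunks: BTreeMap<u64, ([u8; 32], u64)> = BTreeMap::new();

        // Before: the key is bound and immediately ignored.
        for (_, (hash, _len)) in &chunks {
            let _ = hash;
        }

        // After: .values() skips the keys entirely, which is what
        // clippy's for_kv_map suggests.
        for (hash, _len) in chunks.values() {
            let _ = hash;
        }
    }
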
@@ -174,7 +174,7 @@ impl Manifest {
 pub async fn load(
 manifest_file: fs::File,
 wal_file: fs::File,
-fs_directory_path: &PathBuf,
+fs_directory_path: &Path,
 file_key: Vec<u8>,
 fs_config: FsConfig,
 ) -> io::Result<Self> {
@@ -208,7 +208,7 @@ impl Manifest {
 hash_index: Arc::new(RwLock::new(hash_index)),
 manifest_file: Arc::new(RwLock::new(manifest_file)),
 wal_file: Arc::new(RwLock::new(wal_file)),
-fs_directory_path: fs_directory_path.clone(),
+fs_directory_path: fs_directory_path.to_path_buf(),
 flush_cold_freq: fs_config.flush_to_cold_interval,
 memory_buffer: Arc::new(RwLock::new(Vec::new())),
 memory_limit: fs_config.mem_buffer_limit,
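
Changing the parameter from &PathBuf to &Path (with .to_path_buf() where an owned copy is stored) is clippy's ptr_arg lint: the borrowed form accepts strictly more callers. A rough sketch under illustrative names:

    use std::path::{Path, PathBuf};

    // Before: callers must already hold a PathBuf.
    fn load_before(dir: &PathBuf) -> PathBuf {
        dir.clone()
    }

    // After: &Path accepts PathBuf, Path, and literal paths alike;
    // .to_path_buf() produces an owned copy only where one is kept.
    fn load_after(dir: &Path) -> PathBuf {
        dir.to_path_buf()
    }

    fn main() {
        let dir = PathBuf::from("/tmp/fs");
        let _ = load_before(&dir);
        let _ = load_after(&dir); // &PathBuf coerces to &Path
        let _ = load_after(Path::new("other"));
    }
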
@@ -254,7 +254,7 @@ impl Manifest {
 pub async fn _get_chunk_hashes(&self) -> HashSet<[u8; 32]> {
 let mut in_use_hashes = HashSet::new();
 for file in self.manifest.read().await.values() {
-for (_start, (hash, _length, _wal_position, _encrypted)) in &file.chunks {
+for (hash, _length, _wal_position, _encrypted) in file.chunks.values() {
 in_use_hashes.insert(*hash);
 }
 }
@@ -297,8 +297,8 @@ impl Manifest {
 .chunks
 .insert(start, (hash, length, location, encrypted));
 match &location {
-&ChunkLocation::Memory(..) => in_memory_file.mem_chunks.push(start),
-&ChunkLocation::WAL(..) => in_memory_file.wal_chunks.push(start),
+ChunkLocation::Memory(..) => in_memory_file.mem_chunks.push(start),
+ChunkLocation::Wal(..) => in_memory_file.wal_chunks.push(start),
 _ => {}
 }
 }
@@ -322,7 +322,7 @@ impl Manifest {
 in_memory_file.chunks.get_mut(&start)
 {
 if let ChunkLocation::Memory(offset) = location {
-*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
+*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
 in_memory_file.wal_chunks.push(start);
 }
 }
@@ -332,7 +332,7 @@ impl Manifest {
 for tx_chunks in in_memory_file.active_txs.values_mut() {
 for (_start, _hash, _length, location, _encrypted) in tx_chunks {
 if let ChunkLocation::Memory(offset) = location {
-*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
+*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
 }
 }
 }
@@ -362,7 +362,7 @@ impl Manifest {
 in_memory_file.chunks.get_mut(&start)
 {
 if let ChunkLocation::Memory(offset) = location {
-*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
+*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
 in_memory_file.wal_chunks.push(start);
 }
 }
@@ -372,7 +372,7 @@ impl Manifest {
 for tx_chunks in in_memory_file.active_txs.values_mut() {
 for (_start, _hash, _length, location, _encrypted) in tx_chunks {
 if let ChunkLocation::Memory(offset) = location {
-*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
+*location = ChunkLocation::Wal(wal_length_before_flush + *offset);
 }
 }
 }
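
Dropping the leading & from the ChunkLocation patterns above relies on match ergonomics: when the scrutinee is already a reference, plain variant patterns match through it, which is the fix clippy's match_ref_pats style lint (and related needless-reference warnings) points at. Sketch with stand-in types:

    enum Loc {
        Memory(u64),
        Wal(u64),
    }

    fn describe(loc: &Loc) -> &'static str {
        // Before (flagged): every arm repeats the & of the scrutinee:
        //     match loc {
        //         &Loc::Memory(..) => "memory",
        //         &Loc::Wal(..) => "wal",
        //     }

        // After: plain patterns match through the reference.
        match loc {
            Loc::Memory(..) => "memory",
            Loc::Wal(..) => "wal",
        }
    }

    fn main() {
        println!("{}", describe(&Loc::Memory(0)));
        println!("{}", describe(&Loc::Wal(1)));
    }
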
@@ -546,7 +546,7 @@ impl Manifest {
 }
 chunk_data
 }
-ChunkLocation::WAL(offset) => {
+ChunkLocation::Wal(offset) => {
 let mut wal_file = self.wal_file.write().await;
 wal_file
 .seek(SeekFrom::Start(offset))
@@ -674,7 +674,7 @@ impl Manifest {
 }
 chunk_data
 }
-ChunkLocation::WAL(offset) => {
+ChunkLocation::Wal(offset) => {
 let mut wal_file = self.wal_file.write().await;
 wal_file
 .seek(SeekFrom::Start(offset))
@@ -956,32 +956,30 @@ impl Manifest {
 let mut chunks_to_flush: Vec<([u8; 32], u64, u64, ChunkLocation, bool)> = Vec::new();

 for &start in &in_memory_file.mem_chunks {
-if let Some((hash, length, location, encrypted)) = in_memory_file.chunks.get(&start)
+if let Some((hash, length, ChunkLocation::Memory(mem_pos), encrypted)) =
+in_memory_file.chunks.get(&start)
 {
-if let ChunkLocation::Memory(mem_pos) = location {
-chunks_to_flush.push((
-*hash,
-start,
-*length,
-ChunkLocation::Memory(*mem_pos),
-*encrypted,
-));
-}
+chunks_to_flush.push((
+*hash,
+start,
+*length,
+ChunkLocation::Memory(*mem_pos),
+*encrypted,
+));
 }
 }

 for &start in &in_memory_file.wal_chunks {
-if let Some((hash, length, location, encrypted)) = in_memory_file.chunks.get(&start)
+if let Some((hash, length, ChunkLocation::Wal(wal_pos), encrypted)) =
+in_memory_file.chunks.get(&start)
 {
-if let ChunkLocation::WAL(wal_pos) = location {
-chunks_to_flush.push((
-*hash,
-start,
-*length,
-ChunkLocation::WAL(*wal_pos),
-*encrypted,
-));
-}
+chunks_to_flush.push((
+*hash,
+start,
+*length,
+ChunkLocation::Wal(*wal_pos),
+*encrypted,
+));
 }
 }
 if !chunks_to_flush.is_empty() {
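
The flush loop above folds a nested if-let into the outer binding: instead of fetching the chunk tuple and then testing its location, the ChunkLocation::Memory / ChunkLocation::Wal pattern moves inside the Some(...) pattern, the shape clippy's collapsible_match lint suggests. A reduced sketch (simplified types, not the repo's):

    use std::collections::BTreeMap;

    enum Loc {
        Memory(u64),
        Wal(u64),
    }

    fn main() {
        let mut chunks: BTreeMap<u64, ([u8; 32], u64, Loc)> = BTreeMap::new();
        chunks.insert(0, ([0u8; 32], 16, Loc::Memory(4)));
        let start = 0u64;

        // Before: outer if-let, then a second if-let on one field.
        if let Some((hash, len, loc)) = chunks.get(&start) {
            if let Loc::Memory(mem_pos) = loc {
                println!("{:?} {} {}", hash, len, mem_pos);
            }
        }

        // After: the enum pattern sits inside the outer binding, so the
        // body only runs for in-memory chunks and the inner if-let goes away.
        if let Some((hash, len, Loc::Memory(mem_pos))) = chunks.get(&start) {
            println!("{:?} {} {}", hash, len, mem_pos);
        }
    }
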
@@ -999,7 +997,7 @@ impl Manifest {
 };

 let buffer = match location {
-ChunkLocation::WAL(wal_pos) => {
+ChunkLocation::Wal(wal_pos) => {
 // seek to the chunk in the WAL file
 wal_file.seek(SeekFrom::Start(*wal_pos)).await?;
 // read the chunk data from the WAL file
@@ -1284,7 +1282,7 @@ async fn load_wal(
 in_memory_file
 .chunks
 .insert(start, (hash, length, location, encrypted));
-if let ChunkLocation::WAL(_) = location {
+if let ChunkLocation::Wal(_) = location {
 in_memory_file.wal_chunks.push(start);
 }
 }
@@ -1298,7 +1296,7 @@ async fn load_wal(
 let location = if entry.copy {
 ChunkLocation::ColdStorage(entry.local)
 } else {
-ChunkLocation::WAL(data_position)
+ChunkLocation::Wal(data_position)
 };
 let chunks = tx_chunks
 .entry(entry.tx_id)
@@ -1363,11 +1361,8 @@ async fn verify_manifest(
 let file_hash = in_memory_file.hash();

 for (chunk_hash, _, location, _encrypted) in in_memory_file.chunks.values() {
-match location {
-ChunkLocation::ColdStorage(local) => {
-chunk_hashes.insert(*chunk_hash, *local);
-}
-_ => {}
+if let ChunkLocation::ColdStorage(local) = location {
+chunk_hashes.insert(*chunk_hash, *local);
+}
 }
 hash_index.insert(file_hash, file.clone());
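
The verify_manifest change is clippy's single_match lint: a match with one meaningful arm and an empty catch-all reads better as if-let. Sketch with stand-in types:

    use std::collections::HashMap;

    enum Loc {
        ColdStorage(bool),
        Wal(u64),
    }

    fn main() {
        let mut chunk_hashes: HashMap<[u8; 32], bool> = HashMap::new();
        let chunk_hash = [0u8; 32];
        let location = Loc::ColdStorage(true);

        // Before: one interesting arm plus an empty `_ => {}` filler.
        match &location {
            Loc::ColdStorage(local) => {
                chunk_hashes.insert(chunk_hash, *local);
            }
            _ => {}
        }

        // After: if-let states the same thing without the filler arm.
        if let Loc::ColdStorage(local) = &location {
            chunk_hashes.insert(chunk_hash, *local);
        }
    }
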
@@ -539,7 +539,7 @@ async fn handle_request(
 });
 };
 let file_uuid = match maybe_file_id {
-Some(id) => FileIdentifier::UUID(id),
+Some(id) => FileIdentifier::Uuid(id),
 None => FileIdentifier::new_uuid(),
 };

@@ -562,7 +562,7 @@ async fn handle_request(
 });
 };

-let file_uuid = FileIdentifier::UUID(file_uuid);
+let file_uuid = FileIdentifier::Uuid(file_uuid);

 match manifest.write_at(&file_uuid, offset, &payload.bytes).await {
 Ok(_) => (),
@@ -577,7 +577,7 @@ async fn handle_request(
 (FsResponse::Write(file_uuid.to_uuid().unwrap()), None)
 }
 FsAction::Read(file_uuid) => {
-let file = FileIdentifier::UUID(file_uuid);
+let file = FileIdentifier::Uuid(file_uuid);

 match manifest.read(&file, None, None).await {
 Err(e) => {
@@ -590,7 +590,7 @@ async fn handle_request(
 }
 }
 FsAction::ReadChunk(req) => {
-let file = FileIdentifier::UUID(req.file);
+let file = FileIdentifier::Uuid(req.file);

 match manifest
 .read(&file, Some(req.start), Some(req.length))
@@ -606,7 +606,7 @@ async fn handle_request(
 }
 }
 FsAction::Delete(del) => {
-let file = FileIdentifier::UUID(del);
+let file = FileIdentifier::Uuid(del);
 manifest.delete(&file).await?;

 (FsResponse::Delete(del), None)
@@ -619,7 +619,7 @@ async fn handle_request(
 };

 let file_uuid = match maybe_file_uuid {
-Some(uuid) => FileIdentifier::UUID(uuid),
+Some(uuid) => FileIdentifier::Uuid(uuid),
 None => FileIdentifier::new_uuid(),
 };

@@ -636,7 +636,7 @@ async fn handle_request(
 (FsResponse::Append(file_uuid.to_uuid().unwrap()), None)
 }
 FsAction::Length(file_uuid) => {
-let file = FileIdentifier::UUID(file_uuid);
+let file = FileIdentifier::Uuid(file_uuid);
 let length = manifest.get_length(&file).await;
 match length {
 Some(len) => (FsResponse::Length(len), None),
@@ -648,7 +648,7 @@ async fn handle_request(
 }
 }
 FsAction::SetLength((file_uuid, length)) => {
-let file = FileIdentifier::UUID(file_uuid);
+let file = FileIdentifier::Uuid(file_uuid);
 manifest.set_length(&file, length).await?;

 // doublecheck if this is the type of return statement we want.
@@ -164,39 +164,33 @@ async fn handle_websocket(
 }
 }
 } else if msg.is_text() {
-match msg.to_str() {
-Ok(msg_str) => {
-let _ = print_tx
-.send(Printout {
-verbosity: 1,
-content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str),
-})
-.await;
-match serde_json::from_str(msg_str) {
-Ok(parsed_msg) => {
-handle_incoming_ws(
-parsed_msg,
-our.clone(),
-jwt_secret_bytes.clone().to_vec(),
-websockets.clone(),
-send_to_loop.clone(),
-print_tx.clone(),
-write_stream.clone(),
-ws_id,
-)
-.await;
-}
-_ => (),
-}
+if let Ok(msg_str) = msg.to_str() {
+let _ = print_tx
+.send(Printout {
+verbosity: 1,
+content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str),
+})
+.await;
+if let Ok(parsed_msg) = serde_json::from_str(msg_str) {
+handle_incoming_ws(
+parsed_msg,
+our.clone(),
+jwt_secret_bytes.clone().to_vec(),
+websockets.clone(),
+send_to_loop.clone(),
+print_tx.clone(),
+write_stream.clone(),
+ws_id,
+)
+.await;
+}
-_ => (),
-}
 } else if msg.is_close() {
 // Delete the websocket from the map
 let mut ws_map = websockets.lock().await;
 for (node, node_map) in ws_map.iter_mut() {
 for (channel_id, id_map) in node_map.iter_mut() {
-if let Some(_) = id_map.remove(&ws_id) {
+if id_map.remove(&ws_id).is_some() {
 // Send disconnect message
 send_ws_disconnect(
 node.clone(),
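
The is_some() change is clippy's redundant_pattern_matching lint: binding a value with Some(_) only to ignore it can be asked directly. A tiny sketch:

    use std::collections::HashMap;

    fn main() {
        let mut id_map: HashMap<u64, String> = HashMap::new();
        id_map.insert(7, "socket".to_string());
        let ws_id = 7u64;

        // Before: the pattern binds nothing useful.
        if let Some(_) = id_map.remove(&ws_id) {
            println!("removed");
        }

        // After: ask the question directly.
        if id_map.remove(&ws_id).is_some() {
            println!("removed");
        }
    }
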
@@ -344,64 +338,63 @@ async fn http_handle_messages(
}
}
Message::Request(Request { ipc, .. }) => {
match serde_json::from_slice(&ipc) {
Ok(message) => {
match message {
HttpServerMessage::BindPath {
path,
if let Ok(message) = serde_json::from_slice(&ipc) {
match message {
HttpServerMessage::BindPath {
path,
authenticated,
local_only,
} => {
let mut path_bindings = path_bindings.write().await;
let app = source.process.clone().to_string();

let mut path = path.clone();
if app != "homepage:homepage:uqbar" {
path = if path.starts_with('/') {
format!("/{}{}", app, path)
} else {
format!("/{}/{}", app, path)
};
}
// trim trailing "/"
path = normalize_path(&path);

let bound_path = BoundPath {
app: source.process,
authenticated,
local_only,
} => {
let mut path_bindings = path_bindings.write().await;
let app = source.process.clone().to_string();
original_path: path.clone(),
};

let mut path = path.clone();
if app != "homepage:homepage:uqbar" {
path = if path.starts_with('/') {
format!("/{}{}", app, path)
} else {
format!("/{}/{}", app, path)
};
}
// trim trailing "/"
path = normalize_path(&path);
path_bindings.add(&path, bound_path);
}
HttpServerMessage::WebSocketPush(WebSocketPush { target, is_text }) => {
let Some(payload) = payload else {
return Err(HttpServerError::NoBytes);
};
let bytes = payload.bytes;

let bound_path = BoundPath {
app: source.process,
authenticated,
local_only,
original_path: path.clone(),
};
let mut ws_map = websockets.lock().await;
let send_text = is_text.unwrap_or(false);
let response_data = if send_text {
warp::ws::Message::text(
String::from_utf8(bytes.clone()).unwrap_or_default(),
)
} else {
warp::ws::Message::binary(bytes.clone())
};

path_bindings.add(&path, bound_path);
}
HttpServerMessage::WebSocketPush(WebSocketPush { target, is_text }) => {
let Some(payload) = payload else {
return Err(HttpServerError::NoBytes);
};
let bytes = payload.bytes;
// Send to the proxy, if registered
if let Some(channel_id) = target.id.clone() {
let locked_proxies = ws_proxies.lock().await;

let mut ws_map = websockets.lock().await;
let send_text = is_text.unwrap_or(false);
let response_data = if send_text {
warp::ws::Message::text(
String::from_utf8(bytes.clone()).unwrap_or_default(),
)
} else {
warp::ws::Message::binary(bytes.clone())
};
if let Some(proxy_nodes) = locked_proxies.get(&channel_id) {
for proxy_node in proxy_nodes {
let id: u64 = rand::random();
let bytes_content = bytes.clone();

// Send to the proxy, if registered
if let Some(channel_id) = target.id.clone() {
let locked_proxies = ws_proxies.lock().await;

if let Some(proxy_nodes) = locked_proxies.get(&channel_id) {
for proxy_node in proxy_nodes {
let id: u64 = rand::random();
let bytes_content = bytes.clone();

// Send a message to the encryptor
let message = KernelMessage {
// Send a message to the encryptor
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
@@ -433,32 +426,21 @@ async fn http_handle_messages(
signed_capabilities: None,
};

send_to_loop.send(message).await.unwrap();
}
send_to_loop.send(message).await.unwrap();
}
}
}

// Send to the websocket if registered
if let Some(node_map) = ws_map.get_mut(&target.node) {
if let Some(socket_id) = &target.id {
if let Some(ws_map) = node_map.get_mut(socket_id) {
// Iterate over ws_map values and send message to all websockets
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ = locked_write_stream
.send(response_data.clone())
.await; // TODO: change this to binary
}
} else {
// Send to all websockets
for ws_map in node_map.values_mut() {
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ = locked_write_stream
.send(response_data.clone())
.await;
}
}
// Send to the websocket if registered
if let Some(node_map) = ws_map.get_mut(&target.node) {
if let Some(socket_id) = &target.id {
if let Some(ws_map) = node_map.get_mut(socket_id) {
// Iterate over ws_map values and send message to all websockets
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ =
locked_write_stream.send(response_data.clone()).await;
// TODO: change this to binary
}
} else {
// Send to all websockets
@@ -472,130 +454,136 @@ async fn http_handle_messages(
}
}
} else {
// Do nothing because we don't have a WS for that node
// Send to all websockets
for ws_map in node_map.values_mut() {
for ws in ws_map.values_mut() {
let mut locked_write_stream = ws.lock().await;
let _ =
locked_write_stream.send(response_data.clone()).await;
}
}
}
} else {
// Do nothing because we don't have a WS for that node
}
HttpServerMessage::ServerAction(ServerAction { action }) => {
if action == "get-jwt-secret" && source.node == our {
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: source,
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!({
"action": "set-jwt-secret"
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: jwt_secret_bytes.clone(),
}),
signed_capabilities: None,
};
}
HttpServerMessage::ServerAction(ServerAction { action }) => {
if action == "get-jwt-secret" && source.node == our {
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: source,
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!({
"action": "set-jwt-secret"
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: jwt_secret_bytes.clone(),
}),
signed_capabilities: None,
};

send_to_loop.send(message).await.unwrap();
}
send_to_loop.send(message).await.unwrap();
}
HttpServerMessage::WsRegister(WsRegister {
auth_token,
ws_auth_token: _,
channel_id,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone())
.await;
}
}
HttpServerMessage::WsRegister(WsRegister {
auth_token,
ws_auth_token: _,
channel_id,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;
}
HttpServerMessage::WsProxyDisconnect(WsProxyDisconnect { channel_id }) => {
}
HttpServerMessage::WsProxyDisconnect(WsProxyDisconnect { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "WsDisconnect".to_string(),
})
.await;
// Check the ws_proxies for this channel_id, if it exists, delete the node that forwarded
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "WsDisconnect".to_string(),
content: "disconnected".to_string(),
})
.await;
// Check the ws_proxies for this channel_id, if it exists, delete the node that forwarded
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "disconnected".to_string(),
})
.await;
proxy_nodes.remove(&source.node);
}
proxy_nodes.remove(&source.node);
}
HttpServerMessage::WsMessage(WsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
json,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone())
.await;
}
HttpServerMessage::WsMessage(WsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
json,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;

handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
HttpServerMessage::EncryptedWsMessage(EncryptedWsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
encrypted,
nonce,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(
ws_proxies.clone(),
channel_id.clone(),
source.node.clone(),
)
.await;
}
HttpServerMessage::EncryptedWsMessage(EncryptedWsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
encrypted,
nonce,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(
ws_proxies.clone(),
channel_id.clone(),
source.node.clone(),
)
.await;

handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
}
Err(_) => (),
}
}
}
@@ -262,7 +262,7 @@ impl StandardHost for ProcessWasi {
 };
 let our_drive_name = [
 self.process.metadata.our.process.package(),
-self.process.metadata.our.process.publisher_node(),
+self.process.metadata.our.process.publisher(),
 ]
 .join(":");
 let Ok(Ok((_, hash_response))) = send_and_await_response(
@@ -333,7 +333,7 @@ impl StandardHost for ProcessWasi {
 let new_process_id = t::ProcessId::new(
 Some(&name),
 self.process.metadata.our.process.package(),
-self.process.metadata.our.process.publisher_node(),
+self.process.metadata.our.process.publisher(),
 );
 let Ok(Ok((_, response))) = send_and_await_response(
 self,
@@ -1042,7 +1042,7 @@ impl ProcessState {
 /// persist process_map state for next bootup
 /// and wait for filesystem to respond in the affirmative
 async fn persist_state(
-our_name: &String,
+our_name: &str,
 send_to_loop: &t::MessageSender,
 process_map: &t::ProcessMap,
 ) -> Result<()> {
@@ -1051,11 +1051,11 @@ async fn persist_state(
 .send(t::KernelMessage {
 id: rand::random(),
 source: t::Address {
-node: our_name.clone(),
+node: our_name.to_string(),
 process: KERNEL_PROCESS_ID.clone(),
 },
 target: t::Address {
-node: our_name.clone(),
+node: our_name.to_string(),
 process: FILESYSTEM_PROCESS_ID.clone(),
 },
 rsvp: None,
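
Taking our_name as &str instead of &String is the string counterpart of the earlier &PathBuf -> &Path change: clippy's ptr_arg lint prefers the borrowed slice type, with an owned String produced only where the message actually stores one. A small sketch (illustrative function, not the kernel's):

    // Before: &String forces callers to have an owned String on hand.
    fn persist_before(our_name: &String) -> String {
        our_name.clone()
    }

    // After: &str accepts String, &str, and literals; .to_string() makes
    // the owned copy only where it is needed.
    fn persist_after(our_name: &str) -> String {
        our_name.to_string()
    }

    fn main() {
        let name = String::from("our-node.uq");
        let _ = persist_before(&name);
        let _ = persist_after(&name); // &String coerces to &str
        let _ = persist_after("literal-node");
    }
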
@@ -199,8 +199,7 @@ async fn main() {
 _ = register::register(tx, kill_rx, our_ip.to_string(), http_server_port, disk_keyfile)
 => panic!("registration failed"),
 (our, decoded_keyfile, encoded_keyfile) = async {
-while let Some(fin) = rx.recv().await { return fin }
-panic!("registration failed")
+rx.recv().await.expect("registration failed")
 } => (our, decoded_keyfile, encoded_keyfile),
 };

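
The while-let here could only ever run once (its body returns on the first received value), which is what clippy's never_loop lint flags; a single recv plus expect says the same thing. A sketch of the shape, assuming a tokio mpsc channel like the one the select arm reads from:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<u32>(2);
        tx.send(42).await.unwrap();
        tx.send(43).await.unwrap();

        // Before: a loop whose body always returns on its first pass.
        let first = async {
            while let Some(fin) = rx.recv().await {
                return fin;
            }
            panic!("registration failed")
        }
        .await;

        // After: receive once; a closed channel becomes the same panic.
        let second = rx.recv().await.expect("registration failed");
        println!("{first} {second}");
    }
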
@@ -367,60 +367,57 @@ async fn direct_networking(
// TODO we can perform some amount of validation here
// to prevent some amount of potential DDoS attacks.
// can also block based on socket_addr
match accept_async(MaybeTlsStream::Plain(stream)).await {
Ok(websocket) => {
print_debug(&print_tx, "net: received new websocket connection").await;
let (peer_id, routing_for, conn) =
match recv_connection(
&our,
&our_ip,
&pki,
&peers,
&mut pending_passthroughs,
&keypair,
websocket).await
{
Ok(res) => res,
Err(e) => {
print_tx.send(Printout {
verbosity: 0,
content: format!("net: recv_connection failed: {e}"),
}).await?;
continue;
}
};
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here!
// if conn is direct, add peer. if passthrough, add to our
// forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
save_new_peer(
&peer_id,
routing_for,
peers.clone(),
peer_conn,
None,
&kernel_message_tx,
&print_tx
).await;
}
Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough(
passthrough_conn,
));
}
Connection::PendingPassthrough(pending_conn) => {
pending_passthroughs.insert(
(peer_id.name.clone(), pending_conn.target.clone()),
pending_conn
);
// ignore connections we failed to accept...?
if let Ok(websocket) = accept_async(MaybeTlsStream::Plain(stream)).await {
print_debug(&print_tx, "net: received new websocket connection").await;
let (peer_id, routing_for, conn) =
match recv_connection(
&our,
&our_ip,
&pki,
&peers,
&mut pending_passthroughs,
&keypair,
websocket).await
{
Ok(res) => res,
Err(e) => {
print_tx.send(Printout {
verbosity: 0,
content: format!("net: recv_connection failed: {e}"),
}).await?;
continue;
}
};
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here!
// if conn is direct, add peer. if passthrough, add to our
// forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
save_new_peer(
&peer_id,
routing_for,
peers.clone(),
peer_conn,
None,
&kernel_message_tx,
&print_tx
).await;
}
Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough(
passthrough_conn,
));
}
Connection::PendingPassthrough(pending_conn) => {
pending_passthroughs.insert(
(peer_id.name.clone(), pending_conn.target.clone()),
pending_conn
);
}
}
// ignore connections we failed to accept...?
Err(_) => {}
}
}
}
@@ -27,8 +27,8 @@ pub async fn save_new_peer(
 ) {
 print_debug(print_tx, &format!("net: saving new peer {}", identity.name)).await;
 let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
-if km.is_some() {
-peer_tx.send(km.unwrap()).unwrap()
+if let Some(km) = km {
+peer_tx.send(km).unwrap()
 }
 let peer = Peer {
 identity: identity.clone(),
@@ -346,7 +346,7 @@ pub async fn send_uqbar_handshake(
 keypair: &Ed25519KeyPair,
 noise_static_key: &[u8],
 noise: &mut snow::HandshakeState,
-buf: &mut Vec<u8>,
+buf: &mut [u8],
 write_stream: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
 proxy_request: bool,
 ) -> Result<()> {
@@ -367,7 +367,7 @@ pub async fn send_uqbar_handshake(

 pub async fn recv_uqbar_handshake(
 noise: &mut snow::HandshakeState,
-buf: &mut Vec<u8>,
+buf: &mut [u8],
 read_stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
 write_stream: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
 ) -> Result<HandshakePayload> {
@@ -449,11 +449,10 @@ pub async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSen
 }

 fn strip_0x(s: &str) -> String {
-if s.starts_with("0x") {
-s[2..].to_string()
-} else {
-s.to_string()
+if let Some(stripped) = s.strip_prefix("0x") {
+return stripped.to_string();
+}
+s.to_string()
 }

 pub async fn parse_hello_message(
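
strip_0x is clippy's manual_strip lint: checking for a prefix and then slicing it off by hand keeps a hard-coded length (the 2 in s[2..]) that has to stay in sync with the prefix, while strip_prefix does the check and the slicing in one step. The change restated with a tiny test harness:

    fn strip_0x_before(s: &str) -> String {
        if s.starts_with("0x") {
            s[2..].to_string()
        } else {
            s.to_string()
        }
    }

    fn strip_0x_after(s: &str) -> String {
        // strip_prefix returns Some(rest) only when the prefix was present.
        if let Some(stripped) = s.strip_prefix("0x") {
            return stripped.to_string();
        }
        s.to_string()
    }

    fn main() {
        assert_eq!(strip_0x_before("0xabc"), "abc");
        assert_eq!(strip_0x_after("0xabc"), "abc");
        assert_eq!(strip_0x_after("abc"), "abc");
    }
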
@@ -83,12 +83,10 @@ impl CommandHistory {
 /// provided string. otherwise, skip the first <depth> matches.
 /// yes this is O(n) to provide desired ordering, can revisit if slow
 fn search(&mut self, find: &str, depth: usize) -> Option<String> {
-let mut skips = 0;
-for line in &self.lines {
+for (skips, line) in self.lines.iter().enumerate() {
 if line.contains(find) && skips == depth {
 return Some(line.to_string());
 }
-skips += 1;
 }
 None
 }
@@ -116,7 +114,7 @@ pub async fn terminal(
 // print initial splash screen
 println!(
 "\x1b[38;5;128m{}\x1b[0m",
-format!(
+format_args!(
 r#"

 ,, UU
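
The search change is clippy's explicit_counter_loop lint: a counter incremented by hand on every pass is just the loop index, which enumerate already provides. A simplified sketch of the same function over a slice:

    fn search_before(lines: &[String], find: &str, depth: usize) -> Option<String> {
        let mut skips = 0;
        for line in lines {
            if line.contains(find) && skips == depth {
                return Some(line.to_string());
            }
            skips += 1;
        }
        None
    }

    fn search_after(lines: &[String], find: &str, depth: usize) -> Option<String> {
        // enumerate yields the index together with each line.
        for (skips, line) in lines.iter().enumerate() {
            if line.contains(find) && skips == depth {
                return Some(line.to_string());
            }
        }
        None
    }

    fn main() {
        let lines = vec!["a b".to_string(), "b c".to_string(), "c d".to_string()];
        assert_eq!(search_before(&lines, "c", 1), Some("b c".to_string()));
        assert_eq!(search_after(&lines, "c", 1), Some("b c".to_string()));
    }
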
src/types.rs (24 changed lines)
@@ -73,21 +73,13 @@ impl ProcessId {
 publisher_node,
 })
 }
-pub fn to_string(&self) -> String {
-[
-self.process_name.as_str(),
-self.package_name.as_str(),
-self.publisher_node.as_str(),
-]
-.join(":")
-}
 pub fn process(&self) -> &str {
 &self.process_name
 }
 pub fn package(&self) -> &str {
 &self.package_name
 }
-pub fn publisher_node(&self) -> &str {
+pub fn publisher(&self) -> &str {
 &self.publisher_node
 }
 pub fn en_wit(&self) -> wit::ProcessId {
@@ -214,13 +206,13 @@ impl OnPanic {

 impl std::fmt::Display for ProcessId {
 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-write!(f, "{}", self.to_string())
+write!(f, "{}:{}:{}", self.process(), self.package(), self.publisher())
 }
 }

 impl std::fmt::Display for Address {
 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-write!(f, "{}@{}", self.node, self.process.to_string(),)
+write!(f, "{}@{}", self.node, self.process)
 }
 }

@@ -1055,10 +1047,10 @@ pub struct EncryptAction {

 #[derive(Debug, Serialize, Deserialize)]
 pub enum EncryptorMessage {
-GetKeyAction(GetKeyAction),
-DecryptAndForwardAction(DecryptAndForwardAction),
-EncryptAndForwardAction(EncryptAndForwardAction),
-DecryptAction(DecryptAction),
-EncryptAction(EncryptAction),
+GetKey(GetKeyAction),
+DecryptAndForward(DecryptAndForwardAction),
+EncryptAndForward(EncryptAndForwardAction),
+Decrypt(DecryptAction),
+Encrypt(EncryptAction),
 }
 // encryptor End
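
Removing the inherent to_string and writing the joined form directly in Display addresses clippy's inherent_to_string_shadow_display lint: an inherent fn to_string on a type that also implements Display shadows the blanket ToString impl, so the formatting logic is better kept in Display and to_string comes along for free. A condensed sketch using the same field names:

    use std::fmt;

    struct ProcessId {
        process_name: String,
        package_name: String,
        publisher_node: String,
    }

    impl fmt::Display for ProcessId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(
                f,
                "{}:{}:{}",
                self.process_name, self.package_name, self.publisher_node
            )
        }
    }

    fn main() {
        let id = ProcessId {
            process_name: "terminal".into(),
            package_name: "terminal".into(),
            publisher_node: "uqbar".into(),
        };
        println!("{id}");
        // ToString now comes from the Display impl rather than an
        // inherent method.
        let _joined: String = id.to_string();
    }
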
src/vfs.rs (25 changed lines)
@@ -200,7 +200,7 @@ async fn state_to_bytes(state: &DriveToVfs) -> Vec<u8> {
 bincode::serialize(&serializable).unwrap()
 }

-fn bytes_to_state(bytes: &Vec<u8>, state: &mut DriveToVfs) {
+fn bytes_to_state(bytes: &[u8], state: &mut DriveToVfs) {
 let serializable: DriveToVfsSerializable = bincode::deserialize(bytes).unwrap();
 for (id, vfs) in serializable.into_iter() {
 state.insert(id, Arc::new(Mutex::new(vfs)));
@@ -549,7 +549,7 @@ pub async fn vfs(
 continue;
 }
 };
-match handle_request(
+if let Err(e) = handle_request(
 our_node.clone(),
 id,
 source.clone(),
@@ -566,18 +566,15 @@ pub async fn vfs(
 send_to_caps_oracle.clone(),
 response_receiver,
 ).await {
-Err(e) => {
-send_to_loop
-.send(make_error_message(
-our_node,
-id,
-source,
-e,
-))
-.await
-.unwrap();
-},
-Ok(_) => {},
+send_to_loop
+.send(make_error_message(
+our_node,
+id,
+source,
+e,
+))
+.await
+.unwrap();
 }
 },
 }