WIP: refactor http_client, remove encryptor

This commit is contained in:
dr-frmr 2023-11-16 23:17:27 -05:00
parent f11d323586
commit 18df6332b4
No known key found for this signature in database
8 changed files with 254 additions and 927 deletions

View File

@ -112,7 +112,7 @@ fn send_ws_update(our: Address, game: Game) -> anyhow::Result<()> {
.target(Address::new(&our.node, "encryptor:sys:uqbar").unwrap())?
.ipc_bytes(
serde_json::json!({
"EncryptAndForwardAction": {
"EncryptAndForward": {
"channel_id": our.process.to_string(),
"forward_to": {
"node": our.node.clone(),

View File

@ -5,7 +5,8 @@
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"net:sys:uqbar"
"net:sys:uqbar",
"http_client:sys:uqbar"
],
"public": true
}

View File

@ -1,429 +0,0 @@
extern crate generic_array;
extern crate num_traits;
extern crate rand;
use crate::types::*;
use aes_gcm::{
aead::{Aead, KeyInit},
Aes256Gcm,
Key, // Or `Aes128Gcm`
Nonce,
};
use anyhow::Result;
use generic_array::GenericArray;
use rand::{thread_rng, Rng};
use ring::signature::Ed25519KeyPair;
use rsa::{BigUint, Oaep, RsaPublicKey};
use std::collections::HashMap;
use std::sync::Arc;
use crate::encryptor::num_traits::Num;
fn encrypt_data(secret_key_bytes: [u8; 32], data: Vec<u8>) -> Vec<u8> {
let key = Key::<Aes256Gcm>::from_slice(&secret_key_bytes);
let cipher = Aes256Gcm::new(key);
let mut nonce_bytes: [u8; 12] = [0; 12];
thread_rng().fill(&mut nonce_bytes);
let nonce = Nonce::from_slice(&nonce_bytes);
let ciphertext = cipher
.encrypt(nonce, data.as_ref())
.expect("encryption failure!");
let mut data = ciphertext;
data.extend(nonce_bytes);
data
}
fn decrypt_data(secret_key_bytes: [u8; 32], data: Vec<u8>) -> Vec<u8> {
let nonce_bytes = data[data.len() - 12..].to_vec();
let encrypted_bytes = data[..data.len() - 12].to_vec();
let key = Key::<Aes256Gcm>::from_slice(&secret_key_bytes);
let cipher = Aes256Gcm::new(key);
let nonce = GenericArray::from_slice(&nonce_bytes);
let decrypted_bytes = cipher
.decrypt(nonce, encrypted_bytes.as_ref())
.expect("decryption failure!");
decrypted_bytes
}
pub async fn encryptor(
our: String,
keypair: Arc<Ed25519KeyPair>,
message_tx: MessageSender,
mut recv_in_encryptor: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
// Generally, the secret_id will be the ID that corresponds to a particular app or websocket connection
// For authenticated + encrypted HTTP routes, the secret_id will always be "http_bindings"
let mut secrets: HashMap<String, [u8; 32]> = HashMap::new(); // Store secrets as hex strings? Or as bytes?
while let Some(kernel_message) = recv_in_encryptor.recv().await {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ENCRYPTOR MESSAGE".to_string(),
})
.await;
let KernelMessage {
ref id,
source,
rsvp,
message,
payload,
..
} = kernel_message;
let Message::Request(Request { ipc, .. }) = message else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "encryptor: bad message".to_string(),
})
.await;
continue;
};
match serde_json::from_slice::<EncryptorMessage>(&ipc) {
Ok(message) => {
match message {
EncryptorMessage::GetKey(GetKeyAction {
channel_id,
public_key_hex,
}) => {
let n = BigUint::from_str_radix(&public_key_hex.clone(), 16)
.expect("failed to parse hex string");
let e = BigUint::from(65537u32);
match RsaPublicKey::new(n, e) {
Ok(public_key) => {
let padding = Oaep::new::<sha2::Sha256>();
let mut rng = rand::rngs::OsRng;
let public_key_bytes = hex::decode(public_key_hex)
.expect("failed to decode hex string");
let signed_public_key =
keypair.sign(&public_key_bytes).as_ref().to_vec();
let encrypted_secret: Vec<u8>;
if let Some(secret) = secrets.get(&channel_id) {
// Secret already exists
// Encrypt the secret with the public key and return it
encrypted_secret = public_key
.encrypt(&mut rng, padding, secret)
.expect("failed to encrypt message");
} else {
// Secret does not exist, must create
// Create a new secret, store it, encrypt it with the public key, and return it
let mut secret = [0u8; 32];
thread_rng().fill(&mut secret);
secrets.insert(channel_id, secret);
// Create a new AES-GCM cipher with the given key
// So do I encrypt the
encrypted_secret = public_key
.encrypt(&mut rng, padding, &secret)
.expect("failed to encrypt message");
}
let mut headers = HashMap::new();
headers.insert(
"Content-Type".to_string(),
"application/json".to_string(),
);
let target = match rsvp {
Some(rsvp) => rsvp,
None => Address {
node: source.node.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
};
// Generate and send the response
let response = KernelMessage {
id: *id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::json!({
"status": 201,
"headers": headers,
}).to_string().into_bytes(),
metadata: None,
},
None,
)),
payload: Some(Payload {
mime: Some("application/json".to_string()),
bytes: serde_json::json!({
"encrypted_secret": hex::encode(encrypted_secret).to_string(),
"signed_public_key": hex::encode(&signed_public_key).to_string(),
}).to_string().as_bytes().to_vec(),
}),
signed_capabilities: None,
};
message_tx.send(response).await.unwrap();
}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("Error: {}", e),
})
.await;
}
}
}
EncryptorMessage::DecryptAndForward(DecryptAndForwardAction {
channel_id,
forward_to,
json,
}) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!(
"DECRYPTOR TO FORWARD: {}",
json.clone().unwrap_or_default()
),
})
.await;
// The payload.bytes should be the encrypted data, with the last 12 bytes being the nonce
let Some(payload) = payload else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "No payload".to_string(),
})
.await;
continue;
};
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let decrypted_bytes = decrypt_data(*secret_key_bytes, data);
// Forward the unencrypted data to the target
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
},
target: forward_to,
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None, // A forwarded message does not expect a response
ipc: json.unwrap_or_default().to_string().into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: decrypted_bytes,
}),
signed_capabilities: None,
};
message_tx.send(message).await.unwrap();
} else {
panic!("No secret found");
}
}
EncryptorMessage::EncryptAndForward(EncryptAndForwardAction {
channel_id,
forward_to,
json,
}) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ENCRYPTOR TO FORWARD".to_string(),
})
.await;
let Some(payload) = payload else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "No payload".to_string(),
})
.await;
continue;
};
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let encrypted_bytes = encrypt_data(*secret_key_bytes, data);
// Forward the ciphertext and nonce_hex to the specified process
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
},
target: forward_to,
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None, // A forwarded message does not expect a response
ipc: json.unwrap_or_default().to_string().into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: encrypted_bytes,
}),
signed_capabilities: None,
};
message_tx.send(message).await.unwrap();
} else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ERROR: No secret found".to_string(),
})
.await;
}
}
EncryptorMessage::Decrypt(DecryptAction { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ENCRYPTOR TO DECRYPT".to_string(),
})
.await;
let Some(payload) = payload else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "No payload".to_string(),
})
.await;
continue;
};
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let decrypted_bytes = decrypt_data(*secret_key_bytes, data);
let message = KernelMessage {
id: *id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
},
target: source,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: vec![],
metadata: None,
},
None,
)),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: decrypted_bytes,
}),
signed_capabilities: None,
};
message_tx.send(message).await.unwrap();
} else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ERROR: No secret found".to_string(),
})
.await;
}
}
EncryptorMessage::Encrypt(EncryptAction { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ENCRYPTOR TO ENCRYPT".to_string(),
})
.await;
let Some(payload) = payload else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "No payload".to_string(),
})
.await;
continue;
};
let data = payload.bytes.clone();
if let Some(secret_key_bytes) = secrets.get(&channel_id) {
let encrypted_bytes = encrypt_data(*secret_key_bytes, data);
let message = KernelMessage {
id: *id,
source: Address {
node: our.clone(),
process: ENCRYPTOR_PROCESS_ID.clone(),
},
target: source,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: vec![],
metadata: None,
},
None,
)),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: encrypted_bytes,
}),
signed_capabilities: None,
};
message_tx.send(message).await.unwrap();
} else {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "ERROR: No secret found".to_string(),
})
.await;
}
}
}
}
Err(_) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "Not a valid EncryptorMessage".to_string(),
})
.await;
}
}
}
Err(anyhow::anyhow!("encryptor: exited"))
}

View File

@ -1,169 +1,230 @@
use crate::types::*;
use anyhow::Result;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
// Test http_client with these commands in the terminal
// !message tuna http_client {"method": "GET", "uri": "https://jsonplaceholder.typicode.com/posts", "headers": {}, "body": ""}
// !message tuna http_client {"method": "POST", "uri": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}, "body": "{\"title\": \"foo\", \"body\": \"bar\"}"}
// !message tuna http_client {"method": "PUT", "uri": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}, "body": "{\"title\": \"foo\", \"body\": \"bar\"}"}
// !message our http_client {"method": "GET", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {}}
// !message our http_client {"method": "POST", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
// !message our http_client {"method": "PUT", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
//
// http_client.rs types
//
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpRequest {
pub method: String, // must parse to http::Method
pub version: Option<String>, // must parse to http::Version
pub url: String, // must parse to url::Url
pub headers: HashMap<String, String>,
// BODY is stored in the payload, as bytes
// TIMEOUT is stored in the message expect_response
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpResponse {
pub status: u16,
pub headers: HashMap<String, String>,
// BODY is stored in the payload, as bytes
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpClientError {
#[error("http_client: request could not be parsed to HttpRequest: {}.", req)]
BadRequest { req: String },
#[error("http_client: http method not supported: {}", method)]
BadMethod { method: String },
#[error("http_client: url could not be parsed: {}", url)]
BadUrl { url: String },
#[error("http_client: http version not supported: {}", version)]
BadVersion { version: String },
#[error("http_client: failed to execute request {}", error)]
RequestFailed { error: String },
}
pub async fn http_client(
our_name: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
_print_tx: PrintSender,
) -> Result<()> {
while let Some(message) = recv_in_client.recv().await {
let KernelMessage {
id,
source,
rsvp,
message:
Message::Request(Request {
expects_response,
ipc,
..
}),
payload,
..
} = message.clone()
else {
return Err(anyhow::anyhow!("http_client: bad message"));
};
let client = reqwest::Client::new();
let our_name = Arc::new(our_name);
let our_name = our_name.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_message(
our_name.clone(),
send_to_loop.clone(),
id,
rsvp,
while let Some(KernelMessage {
id,
source,
rsvp,
message:
Message::Request(Request {
expects_response,
source.clone(),
ipc,
{
if let Some(payload) = payload {
Some(payload.bytes)
} else {
None
}
},
print_tx.clone(),
)
.await
{
send_to_loop
.send(make_error_message(our_name.clone(), id, source, e))
.await
.unwrap();
}
});
..
}),
payload,
..
}) = recv_in_client.recv().await
{
tokio::spawn(handle_message(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ipc,
payload,
client.clone(),
send_to_loop.clone(),
));
}
Err(anyhow::anyhow!("http_client: exited"))
Err(anyhow::anyhow!("http_client: loop died"))
}
async fn handle_message(
our: String,
send_to_loop: MessageSender,
our: Arc<String>,
id: u64,
rsvp: Option<Address>,
target: Address,
expects_response: Option<u64>,
source: Address,
json: Vec<u8>,
body: Option<Vec<u8>>,
_print_tx: PrintSender,
) -> Result<(), HttpClientError> {
let target = if expects_response.is_some() {
source.clone()
} else {
let Some(rsvp) = rsvp else {
return Err(HttpClientError::BadRsvp);
};
rsvp.clone()
};
let req: HttpClientRequest = match serde_json::from_slice(&json) {
body: Option<Payload>,
client: reqwest::Client,
send_to_loop: MessageSender,
) {
let req: HttpRequest = match serde_json::from_slice(&json) {
Ok(req) => req,
Err(e) => {
return Err(HttpClientError::BadJson {
json: String::from_utf8(json).unwrap_or_default(),
error: format!("{}", e),
})
Err(_e) => {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: String::from_utf8(json).unwrap_or_default(),
},
send_to_loop,
)
.await;
return;
}
};
let client = reqwest::Client::new();
let request_builder = match req.method.to_uppercase()[..].to_string().as_str() {
"GET" => client.get(req.uri),
"PUT" => client.put(req.uri),
"POST" => client.post(req.uri),
"DELETE" => client.delete(req.uri),
method => {
return Err(HttpClientError::BadMethod {
method: method.into(),
});
}
let Ok(req_method) = http::Method::from_bytes(req.method.as_bytes()) else {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadMethod { method: req.method },
send_to_loop,
)
.await;
return;
};
let request = request_builder
let mut request_builder = client.request(req_method, req.url);
if let Some(version) = req.version {
request_builder = match version.as_str() {
"HTTP/0.9" => request_builder.version(http::Version::HTTP_09),
"HTTP/1.0" => request_builder.version(http::Version::HTTP_10),
"HTTP/1.1" => request_builder.version(http::Version::HTTP_11),
"HTTP/2.0" => request_builder.version(http::Version::HTTP_2),
"HTTP/3.0" => request_builder.version(http::Version::HTTP_3),
_ => {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadVersion { version },
send_to_loop,
)
.await;
return;
}
}
}
if let Some(payload) = body {
request_builder = request_builder.body(payload.bytes);
}
let Ok(request) = request_builder
.headers(deserialize_headers(req.headers))
.body(body.unwrap_or_default())
.build()
.unwrap();
let response = match client.execute(request).await {
Ok(response) => response,
Err(e) => {
return Err(HttpClientError::RequestFailed {
error: format!("{}", e),
});
}
};
let http_client_response = HttpClientResponse {
status: response.status().as_u16(),
headers: serialize_headers(&response.headers().clone()),
};
let message = KernelMessage {
id,
source: Address {
node: our,
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<HttpClientResponse, HttpClientError>>(&Ok(
http_client_response,
))
.unwrap(),
metadata: None,
else {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::RequestFailed {
error: "failed to build request".into(),
},
None,
)),
payload: Some(Payload {
mime: Some("application/json".into()),
bytes: response.bytes().await.unwrap().to_vec(),
}),
signed_capabilities: None,
send_to_loop,
)
.await;
return;
};
send_to_loop.send(message).await.unwrap();
Ok(())
match client.execute(request).await {
Ok(response) => {
if expects_response.is_some() {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<HttpResponse, HttpClientError>>(
&Ok(HttpResponse {
status: response.status().as_u16(),
headers: serialize_headers(&response.headers()),
}),
)
.unwrap(),
metadata: None,
},
None,
)),
payload: Some(Payload {
mime: None,
bytes: response.bytes().await.unwrap_or_default().to_vec(),
}),
signed_capabilities: None,
})
.await;
}
}
Err(e) => {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::RequestFailed {
error: e.to_string(),
},
send_to_loop,
)
.await;
}
}
}
//
// helpers
//
fn to_pascal_case(s: &str) -> String {
s.split('-')
.map(|word| {
@ -198,30 +259,38 @@ fn deserialize_headers(hashmap: HashMap<String, String>) -> HeaderMap {
header_map
}
fn make_error_message(
our_name: String,
async fn make_error_message(
our: Arc<String>,
id: u64,
source: Address,
target: Address,
expects_response: Option<u64>,
error: HttpClientError,
) -> KernelMessage {
KernelMessage {
id,
source: source.clone(),
target: Address {
node: our_name.clone(),
process: source.process.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<HttpClientResponse, HttpClientError>>(&Err(error))
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
send_to_loop: MessageSender,
) {
if expects_response.is_some() {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<HttpResponse, HttpClientError>>(&Err(
error,
))
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
}
}

View File

@ -501,88 +501,6 @@ async fn http_handle_messages(
send_to_loop.send(message).await.unwrap();
}
}
HttpServerMessage::WsRegister(WsRegister {
auth_token,
ws_auth_token: _,
channel_id,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;
}
}
HttpServerMessage::WsProxyDisconnect(WsProxyDisconnect { channel_id }) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "WsDisconnect".to_string(),
})
.await;
// Check the ws_proxies for this channel_id, if it exists, delete the node that forwarded
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "disconnected".to_string(),
})
.await;
proxy_nodes.remove(&source.node);
}
}
HttpServerMessage::WsMessage(WsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
json,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(ws_proxies.clone(), channel_id, source.node.clone()).await;
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
HttpServerMessage::EncryptedWsMessage(EncryptedWsMessage {
auth_token,
ws_auth_token: _,
channel_id,
target,
encrypted,
nonce,
}) => {
if let Ok(_node) =
parse_auth_token(auth_token, jwt_secret_bytes.clone().to_vec())
{
add_ws_proxy(
ws_proxies.clone(),
channel_id.clone(),
source.node.clone(),
)
.await;
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
}
}
}

View File

@ -114,7 +114,6 @@ pub async fn handle_incoming_ws(
write_stream: SharedWriteStream,
ws_id: u64,
) {
let cloned_parsed_msg = parsed_msg.clone();
match parsed_msg {
WebSocketClientMessage::WsRegister(WsRegister {
ws_auth_token,
@ -129,19 +128,13 @@ pub async fn handle_incoming_ws(
.await;
// Get node from auth token
if let Ok(node) = parse_auth_token(ws_auth_token, jwt_secret_bytes.clone()) {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("NODE: {}", node),
})
.await;
if node != our {
return;
}
handle_ws_register(
node,
cloned_parsed_msg,
channel_id.clone(),
our.clone(),
&our,
&channel_id,
websockets.clone(),
send_to_loop.clone(),
print_tx.clone(),
write_stream.clone(),
ws_id,
@ -170,29 +163,16 @@ pub async fn handle_incoming_ws(
content: format!("ACTION: {}", target.node.clone()),
})
.await;
// TODO: restrict sending actions to ourself and nodes for which we are proxying
// TODO: use the channel_id
if let Ok(node) = parse_auth_token(ws_auth_token, jwt_secret_bytes.clone()) {
if node == target.node {
if target.node == our {
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
} else {
proxy_ws_message(
node,
cloned_parsed_msg,
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
if our == target.node && node == target.node {
handle_ws_message(
target.clone(),
json.clone(),
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
}
@ -212,28 +192,17 @@ pub async fn handle_incoming_ws(
})
.await;
if let Ok(node) = parse_auth_token(ws_auth_token, jwt_secret_bytes.clone()) {
if node == target.node {
if target.node == our {
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
} else {
proxy_ws_message(
node,
cloned_parsed_msg,
our.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
if node == target.node && our == target.node {
handle_encrypted_ws_message(
target.clone(),
our.clone(),
channel_id.clone(),
encrypted.clone(),
nonce.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}
}
}
@ -270,12 +239,9 @@ pub fn binary_encoded_string_to_bytes(s: &str) -> Vec<u8> {
}
pub async fn handle_ws_register(
node: String,
parsed_msg: WebSocketClientMessage,
channel_id: String,
our: String,
our_name: &str,
channel_id: &str,
websockets: WebSockets,
send_to_loop: MessageSender,
print_tx: PrintSender,
write_stream: SharedWriteStream,
ws_id: u64,
@ -283,46 +249,12 @@ pub async fn handle_ws_register(
// let _ = print_tx.send(Printout { verbosity: 1, content: format!("1.2 {}", node) }).await;
// TODO: restrict registration to ourself and nodes for which we are proxying
let mut ws_map = websockets.lock().await;
let node_map = ws_map.entry(node.clone()).or_insert(HashMap::new());
let id_map = node_map.entry(channel_id.clone()).or_insert(HashMap::new());
let node_map = ws_map.entry(our_name.to_string()).or_insert(HashMap::new());
let id_map = node_map
.entry(channel_id.to_string())
.or_insert(HashMap::new());
id_map.insert(ws_id, write_stream.clone());
// Send a message to the target node to add to let it know we are proxying
if node != our {
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: node.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!(parsed_msg).to_string().into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()),
bytes: vec![],
}),
signed_capabilities: None,
};
send_to_loop.send(message).await.unwrap();
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "WEBSOCKET CHANNEL FORWARDED!".to_string(),
})
.await;
}
let _ = print_tx
.send(Printout {
verbosity: 1,
@ -421,54 +353,6 @@ pub async fn handle_encrypted_ws_message(
send_to_loop.send(message).await.unwrap();
}
pub async fn proxy_ws_message(
node: String,
parsed_msg: WebSocketClientMessage,
our: String,
send_to_loop: MessageSender,
_print_tx: PrintSender,
) {
let id: u64 = rand::random();
let message = KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node,
process: HTTP_SERVER_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!(parsed_msg).to_string().into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()),
bytes: vec![],
}),
signed_capabilities: None,
};
send_to_loop.send(message).await.unwrap();
}
pub async fn add_ws_proxy(ws_proxies: WebSocketProxies, channel_id: String, source_node: String) {
let mut locked_proxies = ws_proxies.lock().await;
if let Some(proxy_nodes) = locked_proxies.get_mut(&channel_id) {
if !proxy_nodes.contains(&source_node) {
proxy_nodes.insert(source_node);
}
} else {
let mut proxy_nodes = HashSet::new();
proxy_nodes.insert(source_node);
locked_proxies.insert(channel_id, proxy_nodes);
}
}
pub async fn send_ws_disconnect(
node: String,
our: String,

View File

@ -8,7 +8,6 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio::{fs, time::timeout};
mod encryptor;
mod eth_rpc;
mod filesystem;
mod http_client;
@ -35,7 +34,6 @@ const HTTP_CHANNEL_CAPACITY: usize = 32;
const HTTP_CLIENT_CHANNEL_CAPACITY: usize = 32;
const ETH_RPC_CHANNEL_CAPACITY: usize = 32;
const VFS_CHANNEL_CAPACITY: usize = 1_000;
const ENCRYPTOR_CHANNEL_CAPACITY: usize = 32;
const CAP_CHANNEL_CAPACITY: usize = 1_000;
#[cfg(feature = "llm")]
const LLM_CHANNEL_CAPACITY: usize = 32;
@ -104,9 +102,6 @@ async fn main() {
// vfs maintains metadata about files in fs for processes
let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(VFS_CHANNEL_CAPACITY);
// encryptor handles end-to-end encryption for client messages
let (encryptor_sender, encryptor_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ENCRYPTOR_CHANNEL_CAPACITY);
// terminal receives prints via this channel, all other modules send prints
let (print_sender, print_receiver): (PrintSender, PrintReceiver) =
mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
@ -253,11 +248,6 @@ async fn main() {
vfs_message_sender,
true,
),
(
ProcessId::new(Some("encryptor"), "sys", "uqbar"),
encryptor_sender,
false,
),
];
#[cfg(feature = "llm")]
@ -362,13 +352,6 @@ async fn main() {
caps_oracle_sender.clone(),
vfs_messages,
));
tasks.spawn(encryptor::encryptor(
our.name.clone(),
networking_keypair_arc.clone(),
kernel_message_sender.clone(),
encryptor_receiver,
print_sender.clone(),
));
#[cfg(feature = "llm")]
{
tasks.spawn(llm::llm(

View File

@ -833,53 +833,6 @@ impl VfsError {
}
}
//
// http_client.rs types
//
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpClientRequest {
pub uri: String,
pub method: String,
pub headers: HashMap<String, String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpClientResponse {
pub status: u16,
pub headers: HashMap<String, String>,
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpClientError {
#[error("http_client: rsvp is None but message is expecting response")]
BadRsvp,
#[error("http_client: no json in request")]
NoJson,
#[error(
"http_client: JSON payload could not be parsed to HttpClientRequest: {error}. Got {:?}.",
json
)]
BadJson { json: String, error: String },
#[error("http_client: http method not supported: {:?}", method)]
BadMethod { method: String },
#[error("http_client: failed to execute request {:?}", error)]
RequestFailed { error: String },
}
#[allow(dead_code)]
impl HttpClientError {
pub fn kind(&self) -> &str {
match *self {
HttpClientError::BadRsvp { .. } => "BadRsvp",
HttpClientError::NoJson { .. } => "NoJson",
HttpClientError::BadJson { .. } => "BadJson",
HttpClientError::BadMethod { .. } => "BadMethod",
HttpClientError::RequestFailed { .. } => "RequestFailed",
}
}
}
//
// custom kernel displays
//
@ -975,10 +928,6 @@ pub enum HttpServerMessage {
},
WebSocketPush(WebSocketPush),
ServerAction(ServerAction),
WsRegister(WsRegister), // Coming from a proxy
WsProxyDisconnect(WsProxyDisconnect), // Coming from a proxy
WsMessage(WsMessage), // Coming from a proxy
EncryptedWsMessage(EncryptedWsMessage), // Coming from a proxy
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -988,12 +937,6 @@ pub struct WsRegister {
pub channel_id: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WsProxyDisconnect {
// Doesn't require auth because it's coming from the proxy
pub channel_id: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WsMessage {
pub ws_auth_token: String,
@ -1019,45 +962,3 @@ pub enum WebSocketClientMessage {
WsMessage(WsMessage),
EncryptedWsMessage(EncryptedWsMessage),
}
// http_server End
// encryptor Start
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetKeyAction {
pub channel_id: String,
pub public_key_hex: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DecryptAndForwardAction {
pub channel_id: String,
pub forward_to: Address, // node, process
pub json: Option<serde_json::Value>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptAndForwardAction {
pub channel_id: String,
pub forward_to: Address, // node, process
pub json: Option<serde_json::Value>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DecryptAction {
pub channel_id: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptAction {
pub channel_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum EncryptorMessage {
GetKey(GetKeyAction),
DecryptAndForward(DecryptAndForwardAction),
EncryptAndForward(EncryptAndForwardAction),
Decrypt(DecryptAction),
Encrypt(EncryptAction),
}
// encryptor End