WS Client connections

This commit is contained in:
Will Galebach 2024-01-03 15:28:38 +00:00
parent e9172423d2
commit d424a95ea4
2 changed files with 474 additions and 14 deletions

View File

@ -1,15 +1,26 @@
use crate::http::types::*;
use crate::types::*;
use anyhow::Result;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use futures::stream::SplitSink;
use futures::SinkExt;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite};
use tokio_tungstenite::tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use ethers_providers::StreamExt;
// Test http_client with these commands in the terminal
// !message our http_client {"method": "GET", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {}}
// !message our http_client {"method": "POST", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
// !message our http_client {"method": "PUT", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
// Outgoing WebSocket connections are stored by the source process ID and the channel_id
type WebSocketId = (ProcessId, u32);
type WebSocketMap = HashMap<WebSocketId, SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>>;
pub async fn http_client(
our_name: String,
send_to_loop: MessageSender,
@ -19,6 +30,9 @@ pub async fn http_client(
let client = reqwest::Client::new();
let our_name = Arc::new(our_name);
let ws_streams: Arc<Mutex<WebSocketMap>> =
Arc::new(Mutex::new(HashMap::new()));
while let Some(KernelMessage {
id,
source,
@ -33,22 +47,287 @@ pub async fn http_client(
..
}) = recv_in_client.recv().await
{
tokio::spawn(handle_message(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ipc,
payload,
client.clone(),
send_to_loop.clone(),
print_tx.clone(),
));
// First check if a WebSocketClientAction, otherwise assume it's an OutgoingHttpRequest
if let Ok(ws_action) = serde_json::from_slice::<WebSocketClientAction>(&ipc) {
let ws_streams_clone = Arc::clone(&ws_streams);
tokio::spawn(handle_websocket_action(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ws_action,
payload,
ws_streams_clone,
send_to_loop.clone(),
print_tx.clone(),
));
} else {
tokio::spawn(handle_http_request(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ipc,
payload,
client.clone(),
send_to_loop.clone(),
print_tx.clone(),
));
}
}
Err(anyhow::anyhow!("http_client: loop died"))
}
async fn handle_message(
async fn handle_websocket_action(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
ws_action: WebSocketClientAction,
payload: Option<Payload>,
ws_streams: Arc<Mutex<WebSocketMap>>,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
match ws_action {
WebSocketClientAction::Open { url, headers, channel_id } => {
connect_websocket(
our,
id,
target.clone(),
expects_response,
&url,
headers,
channel_id,
ws_streams,
send_to_loop,
print_tx,
).await;
},
WebSocketClientAction::Push { channel_id, message_type } => {
send_ws_push(
our,
id,
target,
expects_response,
channel_id,
message_type,
payload,
ws_streams,
send_to_loop,
).await;
},
WebSocketClientAction::Close { channel_id } => {
close_ws_connection(
our,
id,
target,
expects_response,
channel_id,
ws_streams,
send_to_loop,
).await;
}
}
}
async fn insert_ws (
ws_streams: &Arc<Mutex<WebSocketMap>>,
sink: SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>,
source: Address,
channel_id: u32,
) {
let mut ws_streams = ws_streams.lock().await;
ws_streams.insert((source.process, channel_id), sink);
}
async fn connect_websocket(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
url: &str,
headers: HashMap<String, String>,
channel_id: u32,
ws_streams: Arc<Mutex<WebSocketMap>>,
send_to_loop: MessageSender,
_print_tx: PrintSender,
) {
let Ok(url) = url::Url::parse(url) else {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: "failed to parse url".into(),
},
send_to_loop,
).await;
return;
};
let Ok(mut req) = url.clone().into_client_request() else {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: "failed to parse url into client request".into(),
},
send_to_loop,
).await;
return;
};
let req_headers = req.headers_mut();
for (key, value) in headers.clone() {
req_headers.insert(
HeaderName::from_bytes(key.as_bytes()).unwrap(),
HeaderValue::from_str(&value).unwrap(),
);
}
let ws_stream = match connect_async(req).await {
Ok((ws_stream, _)) => ws_stream,
Err(_) => {
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::RequestFailed {
error: "failed to connect to websocket".into(),
},
send_to_loop,
)
.await;
return;
}
};
let (sink, mut stream) = ws_stream.split();
insert_ws(&ws_streams, sink, target.clone(), channel_id).await;
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<WebSocketClientAction>(&WebSocketClientAction::Open {
url: url.to_string(),
headers,
channel_id,
})
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
while let Some(message) = stream.next().await {
match message {
Ok(msg) => {
// Handle different types of messages here
match msg {
TungsteniteMessage::Text(text) => {
// send a Request to the target with the text as payload
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Push {
channel_id,
message_type: WsMessageType::Text,
},
Some(Payload {
mime: Some("text/plain".into()),
bytes: text.into_bytes(),
}),
send_to_loop.clone(),
).await;
},
TungsteniteMessage::Binary(bytes) => {
// send a Request to the target with the binary as payload
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Push {
channel_id,
message_type: WsMessageType::Binary,
},
Some(Payload {
mime: Some("application/octet-stream".into()),
bytes,
}),
send_to_loop.clone(),
).await;
},
TungsteniteMessage::Close(_) => {
// send a websocket close Request to the target
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Close {
channel_id,
},
None,
send_to_loop.clone(),
).await;
// remove the websocket from the map
let mut ws_streams = ws_streams.lock().await;
ws_streams.remove(&(target.process.clone(), channel_id));
},
_ => (), // Handle other message types as needed
}
}
Err(e) => {
println!("WebSocket Client Error ({}): {:?}", channel_id, e);
// The connection was closed/reset by the remote server, so we'll remove and close it
match ws_streams.lock().await.get_mut(&(target.process.clone(), channel_id)) {
Some(ws_sink) => {
let _ = ws_sink.close().await;
},
None => {}
}
ws_streams.lock().await.remove(&(target.process.clone(), channel_id));
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Close {
channel_id,
},
None,
send_to_loop.clone(),
).await;
break;
}
}
};
}
async fn handle_http_request(
our: Arc<String>,
id: u64,
target: Address,
@ -277,3 +556,166 @@ async fn make_error_message(
.await;
}
}
async fn send_ws_push(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
channel_id: u32,
message_type: WsMessageType,
payload: Option<Payload>,
ws_streams: Arc<Mutex<WebSocketMap>>,
send_to_loop: MessageSender,
) {
let mut streams = ws_streams.lock().await;
let Some(ws_stream) = streams.get_mut(&(target.process.clone(), channel_id)) else {
// send an error message to the target
return;
};
let result = match message_type {
WsMessageType::Text => {
let Some(payload) = payload else {
// send an error message to the target
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: "no payload".into()
},
send_to_loop,
).await;
return;
};
let Ok(text) = String::from_utf8(payload.bytes) else {
// send an error message to the target
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: "failed to convert payload to string".into()
},
send_to_loop,
).await;
return;
};
ws_stream.send(TungsteniteMessage::Text(text)).await
},
WsMessageType::Binary => {
let Some(payload) = payload else {
// send an error message to the target
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::BadRequest {
req: "no payload".into()
},
send_to_loop,
).await;
return;
};
ws_stream.send(TungsteniteMessage::Binary(payload.bytes)).await
},
WsMessageType::Ping => {
// send a Request to the target with the ping as payload
ws_stream.send(TungsteniteMessage::Ping(vec![])).await
},
WsMessageType::Pong => {
// send a Request to the target with the pong as payload
ws_stream.send(TungsteniteMessage::Pong(vec![])).await
},
};
match result {
Ok(_) => {},
Err(_) => {
// send an error message to the target
make_error_message(
our,
id,
target,
expects_response,
HttpClientError::RequestFailed {
error: "failed to send message".into()
},
send_to_loop,
).await;
}
}
}
async fn close_ws_connection(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
channel_id: u32,
ws_streams: Arc<Mutex<WebSocketMap>>,
send_to_loop: MessageSender,
) {
let mut streams = ws_streams.lock().await;
let Some(ws_sink) = streams.get_mut(&(target.process.clone(), channel_id)) else {
// send an error message to the target
make_error_message(
our,
id,
target.clone(),
expects_response,
HttpClientError::BadRequest {
req: format!("No open WebSocket matching {}, {}", target.process.to_string(), channel_id),
},
send_to_loop,
).await;
return;
};
// Close the stream. The stream is closed even on error.
match ws_sink.close().await {
Ok(_) => {},
Err(_) => {}
}
streams.remove(&(target.process, channel_id));
}
async fn handle_ws_message(
our: Arc<String>,
id: u64,
target: Address,
action: WebSocketClientAction,
payload: Option<Payload>,
send_to_loop: MessageSender,
) {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Request(
Request {
inherit: false,
ipc: serde_json::to_vec::<WebSocketClientAction>(&action)
.unwrap(),
expects_response: None,
metadata: None,
}
),
payload,
signed_capabilities: None,
})
.await;
}

View File

@ -58,6 +58,24 @@ pub struct HttpResponse {
// BODY is stored in the payload, as bytes
}
/// WebSocket Client Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the `http_client:sys:uqbar` service.
#[derive(Debug, Serialize, Deserialize)]
pub enum WebSocketClientAction {
Open {
url: String,
headers: HashMap<String, String>,
channel_id: u32,
},
Push {
channel_id: u32,
message_type: WsMessageType,
},
Close {
channel_id: u32,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RpcResponseBody {
pub ipc: Vec<u8>,