mirror of
https://github.com/uqbar-dao/nectar.git
synced 2025-01-02 05:28:22 +03:00
quickfix: net dispose of old forwarding connection tasks, refactor !hi responses to DRY
This commit is contained in:
parent
87d0afa950
commit
77338b9c31
@ -354,7 +354,15 @@ async fn direct_networking(
|
||||
}
|
||||
}
|
||||
}
|
||||
// 3. receive incoming TCP connections
|
||||
// 3. join any closed forwarding connection tasks and destroy them
|
||||
// TODO can do more here if desired
|
||||
Some(res) = forwarding_connections.join_next() => {
|
||||
match res {
|
||||
Ok(()) => continue,
|
||||
Err(_e) => continue,
|
||||
}
|
||||
}
|
||||
// 4. receive incoming TCP connections
|
||||
Ok((stream, _socket_addr)) = tcp.accept() => {
|
||||
// TODO we can perform some amount of validation here
|
||||
// to prevent some amount of potential DDoS attacks.
|
||||
@ -822,7 +830,7 @@ async fn handle_local_message(
|
||||
) -> Result<()> {
|
||||
print_debug(&print_tx, "net: handling local message").await;
|
||||
let ipc = match km.message {
|
||||
Message::Request(request) => request.ipc,
|
||||
Message::Request(ref request) => &request.ipc,
|
||||
Message::Response((response, _context)) => {
|
||||
// these are received as a router, when we send ConnectionRequests
|
||||
// to a node we do routing for.
|
||||
@ -846,7 +854,7 @@ async fn handle_local_message(
|
||||
};
|
||||
|
||||
if km.source.node != our.name {
|
||||
if let Ok(act) = rmp_serde::from_slice::<NetActions>(&ipc) {
|
||||
if let Ok(act) = rmp_serde::from_slice::<NetActions>(ipc) {
|
||||
match act {
|
||||
NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => {
|
||||
// for now, we don't get these from remote.
|
||||
@ -912,43 +920,13 @@ async fn handle_local_message(
|
||||
};
|
||||
// if we can't parse this to a netaction, treat it as a hello and print it
|
||||
// respond to a text message with a simple "delivered" response
|
||||
print_tx
|
||||
.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"\x1b[3;32m{}: {}\x1b[0m",
|
||||
km.source.node,
|
||||
std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!")
|
||||
),
|
||||
})
|
||||
.await?;
|
||||
kernel_message_tx
|
||||
.send(KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.name.clone(),
|
||||
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
|
||||
},
|
||||
target: km.rsvp.unwrap_or(km.source),
|
||||
rsvp: None,
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
ipc: "delivered".as_bytes().to_vec(),
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
)),
|
||||
payload: None,
|
||||
signed_capabilities: None,
|
||||
})
|
||||
.await?;
|
||||
parse_hello_message(&our, &km, ipc, &kernel_message_tx, &print_tx).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
// available commands: "peers", "pki", "names", "diagnostics"
|
||||
// first parse as raw string, then deserialize to NetActions object
|
||||
let mut printout = String::new();
|
||||
match std::str::from_utf8(&ipc) {
|
||||
match std::str::from_utf8(ipc) {
|
||||
Ok("peers") => {
|
||||
printout.push_str(&format!(
|
||||
"{:#?}",
|
||||
@ -988,7 +966,7 @@ async fn handle_local_message(
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
match rmp_serde::from_slice::<NetActions>(&ipc) {
|
||||
match rmp_serde::from_slice::<NetActions>(ipc) {
|
||||
Ok(NetActions::ConnectionRequest(_)) => {
|
||||
// we shouldn't receive these from ourselves.
|
||||
}
|
||||
@ -1033,16 +1011,8 @@ async fn handle_local_message(
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
print_tx
|
||||
.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"\x1b[3;32m{}: {}\x1b[0m",
|
||||
km.source.node,
|
||||
std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!")
|
||||
),
|
||||
})
|
||||
.await?;
|
||||
parse_hello_message(&our, &km, ipc, &kernel_message_tx, &print_tx).await?;
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -461,6 +461,47 @@ fn strip_0x(s: &str) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn parse_hello_message(
|
||||
our: &Identity,
|
||||
km: &KernelMessage,
|
||||
ipc: &[u8],
|
||||
kernel_message_tx: &MessageSender,
|
||||
print_tx: &PrintSender,
|
||||
) -> Result<()> {
|
||||
print_tx
|
||||
.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"\x1b[3;32m{}: {}\x1b[0m",
|
||||
km.source.node,
|
||||
std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!")
|
||||
),
|
||||
})
|
||||
.await?;
|
||||
kernel_message_tx
|
||||
.send(KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.name.clone(),
|
||||
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
|
||||
},
|
||||
target: km.rsvp.as_ref().unwrap_or(&km.source).clone(),
|
||||
rsvp: None,
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
ipc: "delivered".as_bytes().to_vec(),
|
||||
metadata: None,
|
||||
},
|
||||
None,
|
||||
)),
|
||||
payload: None,
|
||||
signed_capabilities: None,
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn print_debug(print_tx: &PrintSender, content: &str) {
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
|
Loading…
Reference in New Issue
Block a user