Merge pull request #22 from uqbar-dao/dr/drop-peer-fix

networking: fix to drop offline peers faster + add 'delivered' response for !hi messages
This commit is contained in:
dr-frmr 2023-10-13 12:00:26 -04:00 committed by GitHub
commit 1e98b9209b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 18 deletions

View File

@ -29,7 +29,7 @@ fn parse_command(our_name: &str, line: String) {
},
&Request {
inherit: false,
expects_response: None,
expects_response: Some(5),
ipc: Some(message.into()),
metadata: None,
},
@ -62,9 +62,9 @@ fn parse_command(our_name: &str, line: String) {
} else {
target_node.into()
},
process: ProcessId::from_str(target_process).unwrap_or_else(|_|
ProcessId::from_str(&format!("{}:sys:uqbar", target_process)).unwrap(),
),
process: ProcessId::from_str(target_process).unwrap_or_else(|_| {
ProcessId::from_str(&format!("{}:sys:uqbar", target_process)).unwrap()
}),
},
&Request {
inherit: false,
@ -108,7 +108,11 @@ impl Guest for Component {
};
parse_command(&our.node, command);
}
_ => continue,
Message::Response((Response { ipc, metadata }, _)) => {
if let Some(txt) = &ipc {
print_to_terminal(0, &format!("net response: {}", txt));
}
}
}
}
}

View File

@ -217,10 +217,18 @@ pub async fn maintain_connection(
// with the matching peer handler "sender".
//
if let Some(peer) = peers.read().await.get(to) {
let _ = peer.sender.send((
let id = *id;
let to = to.clone();
match peer.sender.send((
PeerMessage::Net(net_message),
Some(forwarding_ack_tx.clone()),
));
)) {
Ok(_) => {}
Err(_) => {
peers.write().await.remove(&to);
message_tx.send((NetworkMessage::Nack(id), None)).unwrap();
}
}
} else {
// if we don't have the peer, throw a nack.
// println!("net: nacking message with id {id}\r");
@ -467,10 +475,10 @@ async fn peer_handler(
break;
}
println!("net: failed to deserialize message from {}\r", who);
continue;
break;
}
println!("net: failed to decrypt message from {}, could be spoofer\r", who);
continue;
break;
}
} => {
// println!("net: lost peer {who}\r");

View File

@ -87,6 +87,7 @@ pub async fn networking(
keys.clone(),
pki.clone(),
names.clone(),
kernel_message_tx.clone(),
print_tx.clone(),
)
.await;
@ -98,8 +99,16 @@ pub async fn networking(
//
if let Some(peer) = peers_read.get(target) {
// println!("net: direct send to known peer\r");
let _ = peer.sender.send((PeerMessage::Raw(km.clone()), None));
continue;
match peer.sender.send((PeerMessage::Raw(km.clone()), None)) {
Ok(_) => continue,
Err(_) => {
// println!("net: failed to send to known peer\r");
drop(peers_read);
peers.write().await.remove(target);
error_offline(km, &network_error_tx).await;
continue;
}
}
}
drop(peers_read);
//
@ -585,14 +594,14 @@ async fn connect_to_routers(
let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await {
Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake,
_ => {
println!("net: failed handshake with {router_name}\r");
// println!("net: failed handshake with {router_name}\r");
conn_handle.abort();
let _ = routers_to_try_tx.send(router_name);
continue;
}
};
let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &router_id) else {
println!("net: failed handshake with {router_name}\r");
// println!("net: failed handshake with {router_name}\r");
conn_handle.abort();
let _ = routers_to_try_tx.send(router_name);
continue;
@ -677,10 +686,13 @@ async fn handle_incoming_message(
keys: PeerKeys,
pki: OnchainPKI,
names: PKINames,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) {
let data = match km.message {
Message::Response(_) => return,
Message::Response(_) => {
return;
}
Message::Request(request) => match request.ipc {
None => return,
Some(ipc) => ipc,
@ -688,10 +700,31 @@ async fn handle_incoming_message(
};
if km.source.node != our.name {
// respond to a text message with a simple "delivered" response
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("\x1b[3;32m{}: {}\x1b[0m", km.source.node, data,),
content: format!("\x1b[3;32m{}: {}\x1b[0m", km.source.node, data),
})
.await;
let _ = kernel_message_tx
.send(KernelMessage {
id: km.id,
source: Address {
node: our.name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
target: km.rsvp.unwrap_or(km.source),
rsvp: None,
message: Message::Response((
Response {
ipc: Some("delivered".into()),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
} else {

View File

@ -112,9 +112,10 @@ async fn handle_post(
},
allowed_routers: if ip == "localhost" || !info.direct {
vec![
"uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(),
"uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(),
"uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(),
"testnode104.uq".into(),
// "uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(),
// "uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(),
// "uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(),
]
} else {
vec![]