2024-07-10 18:36:22 +03:00
|
|
|
use std::fmt::Debug;
|
|
|
|
|
2022-03-05 00:32:28 +03:00
|
|
|
use clock::ReplicaId;
|
2024-07-10 18:36:22 +03:00
|
|
|
use collections::{BTreeMap, HashSet};
|
2022-03-05 00:32:28 +03:00
|
|
|
|
|
|
|
pub struct Network<T: Clone, R: rand::Rng> {
|
2024-07-10 18:36:22 +03:00
|
|
|
inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
|
|
|
|
disconnected_peers: HashSet<ReplicaId>,
|
2022-03-05 00:32:28 +03:00
|
|
|
rng: R,
|
|
|
|
}
|
|
|
|
|
2024-07-10 18:36:22 +03:00
|
|
|
/// A single queued message sitting in a peer's inbox.
#[derive(Debug, Clone)]
struct Envelope<T>
where
    T: Clone,
{
    /// The payload that is handed to the receiver.
    message: T,
}
|
|
|
|
|
|
|
|
impl<T: Clone, R: rand::Rng> Network<T, R> {
|
|
|
|
pub fn new(rng: R) -> Self {
|
|
|
|
Network {
|
2024-07-10 18:36:22 +03:00
|
|
|
inboxes: BTreeMap::default(),
|
|
|
|
disconnected_peers: HashSet::default(),
|
2022-03-05 00:32:28 +03:00
|
|
|
rng,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn add_peer(&mut self, id: ReplicaId) {
|
|
|
|
self.inboxes.insert(id, Vec::new());
|
|
|
|
}
|
|
|
|
|
2024-07-10 18:36:22 +03:00
|
|
|
pub fn disconnect_peer(&mut self, id: ReplicaId) {
|
|
|
|
self.disconnected_peers.insert(id);
|
|
|
|
self.inboxes.get_mut(&id).unwrap().clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn reconnect_peer(&mut self, id: ReplicaId, replicate_from: ReplicaId) {
|
|
|
|
assert!(self.disconnected_peers.remove(&id));
|
|
|
|
self.replicate(replicate_from, id);
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn is_disconnected(&self, id: ReplicaId) -> bool {
|
|
|
|
self.disconnected_peers.contains(&id)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn contains_disconnected_peers(&self) -> bool {
|
|
|
|
!self.disconnected_peers.is_empty()
|
|
|
|
}
|
|
|
|
|
2022-03-05 00:32:28 +03:00
|
|
|
pub fn replicate(&mut self, old_replica_id: ReplicaId, new_replica_id: ReplicaId) {
|
|
|
|
self.inboxes
|
|
|
|
.insert(new_replica_id, self.inboxes[&old_replica_id].clone());
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn is_idle(&self) -> bool {
|
|
|
|
self.inboxes.values().all(|i| i.is_empty())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
|
2024-07-10 18:36:22 +03:00
|
|
|
// Drop messages from disconnected peers.
|
|
|
|
if self.disconnected_peers.contains(&sender) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2022-03-05 00:32:28 +03:00
|
|
|
for (replica, inbox) in self.inboxes.iter_mut() {
|
2024-07-10 18:36:22 +03:00
|
|
|
if *replica != sender && !self.disconnected_peers.contains(replica) {
|
2022-03-05 00:32:28 +03:00
|
|
|
for message in &messages {
|
|
|
|
// Insert one or more duplicates of this message, potentially *before* the previous
|
|
|
|
// message sent by this peer to simulate out-of-order delivery.
|
|
|
|
for _ in 0..self.rng.gen_range(1..4) {
|
|
|
|
let insertion_index = self.rng.gen_range(0..inbox.len() + 1);
|
|
|
|
inbox.insert(
|
|
|
|
insertion_index,
|
|
|
|
Envelope {
|
|
|
|
message: message.clone(),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
|
|
|
|
!self.inboxes[&receiver].is_empty()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
|
|
|
|
let inbox = self.inboxes.get_mut(&receiver).unwrap();
|
|
|
|
let count = self.rng.gen_range(0..inbox.len() + 1);
|
|
|
|
inbox
|
|
|
|
.drain(0..count)
|
|
|
|
.map(|envelope| envelope.message)
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
}
|