Slightly optimized how networking is... I think.

This commit is contained in:
ClementTsang 2020-01-20 01:28:30 -05:00
parent ae6e27d25a
commit 840b0cccc8
4 changed files with 157 additions and 116 deletions

View File

@ -23,7 +23,7 @@ fn push_if_valid<T: std::clone::Clone>(result: &Result<T>, vector_to_push: &mut
}
}
#[derive(Debug, Default, Clone)]
#[derive(Clone, Debug, Default)]
pub struct Data {
pub list_of_cpu_packages: Vec<cpu::CPUPackage>,
pub list_of_io: Vec<disks::IOPackage>,
@ -31,10 +31,10 @@ pub struct Data {
pub memory: Vec<mem::MemData>,
pub swap: Vec<mem::MemData>,
pub list_of_temperature_sensor: Vec<temperature::TempData>,
pub network: Vec<network::NetworkData>,
pub network: network::NetworkStorage,
pub list_of_processes: Vec<processes::ProcessData>,
pub grouped_list_of_processes: Option<Vec<processes::ProcessData>>,
pub list_of_disks: Vec<disks::DiskData>, // Only need to keep a list of disks and their data
pub list_of_disks: Vec<disks::DiskData>,
}
pub struct DataState {
@ -45,9 +45,6 @@ pub struct DataState {
prev_pid_stats: HashMap<String, (f64, Instant)>,
prev_idle: f64,
prev_non_idle: f64,
prev_net_rx_bytes: u64,
prev_net_tx_bytes: u64,
prev_net_access_time: Instant,
temperature_type: temperature::TemperatureType,
last_clean: Instant, // Last time stale data was cleared
use_current_cpu_total: bool,
@ -63,9 +60,6 @@ impl Default for DataState {
prev_pid_stats: HashMap::new(),
prev_idle: 0_f64,
prev_non_idle: 0_f64,
prev_net_rx_bytes: 0,
prev_net_tx_bytes: 0,
prev_net_access_time: Instant::now(),
temperature_type: temperature::TemperatureType::Celsius,
last_clean: Instant::now(),
use_current_cpu_total: false,
@ -97,18 +91,61 @@ impl DataState {
let current_instant = std::time::Instant::now();
// Network
let new_network_data = network::get_network_data(
&self.sys,
&self.data.network.last_collection_time,
&mut self.data.network.total_rx,
&mut self.data.network.total_tx,
&current_instant,
)
.await;
let joining_points: Option<Vec<network::NetworkJoinPoint>> =
if !self.data.network.data_points.is_empty() {
if let Some(prev_data) = self
.data
.network
.data_points
.get(&self.data.network.last_collection_time)
{
// If not empty, inject joining points
let rx_diff = new_network_data.rx as f64 - prev_data.0.rx as f64;
let tx_diff = new_network_data.tx as f64 - prev_data.0.tx as f64;
let time_gap = current_instant
.duration_since(self.data.network.last_collection_time)
.as_millis() as f64;
let mut new_joining_points = Vec::new();
for idx in 0..100 {
new_joining_points.push(network::NetworkJoinPoint {
rx: prev_data.0.rx as f64 + rx_diff / 100.0 * idx as f64,
tx: prev_data.0.tx as f64 + tx_diff / 100.0 * idx as f64,
time_offset_milliseconds: time_gap / 100.0 * (100 - idx) as f64,
});
}
Some(new_joining_points)
} else {
None
}
} else {
None
};
// Set values
self.data.network.rx = new_network_data.rx;
self.data.network.tx = new_network_data.tx;
self.data.network.last_collection_time = current_instant;
// Add new point
self.data
.network
.data_points
.insert(current_instant, (new_network_data, joining_points));
// What we want to do: For timed data, if there is an error, just do not add. For other data, just don't update!
push_if_valid(
&network::get_network_data(
&self.sys,
&mut self.prev_net_rx_bytes,
&mut self.prev_net_tx_bytes,
&mut self.prev_net_access_time,
&current_instant,
)
.await,
&mut self.data.network,
);
push_if_valid(
&cpu::get_cpu_data_list(&self.sys, &current_instant),
&mut self.data.list_of_cpu_packages,
@ -167,6 +204,8 @@ impl DataState {
self.prev_pid_stats.remove(&stale);
}
// TODO: [OPT] cleaning stale network
self.data.list_of_cpu_packages = self
.data
.list_of_cpu_packages
@ -197,16 +236,6 @@ impl DataState {
})
.collect::<Vec<_>>();
self.data.network = self
.data
.network
.iter()
.cloned()
.filter(|entry| {
clean_instant.duration_since(entry.instant).as_secs() <= self.stale_max_seconds
})
.collect::<Vec<_>>();
self.data.list_of_io = self
.data
.list_of_io

View File

@ -1,34 +1,58 @@
use futures::StreamExt;
use heim::net;
use heim::units::information::byte;
use std::collections::BTreeMap;
use std::time::Instant;
use sysinfo::{NetworkExt, System, SystemExt};
#[derive(Debug, Clone)]
/// Note all values are in bytes...
pub struct NetworkData {
#[derive(Clone, Debug)]
pub struct NetworkJoinPoint {
pub rx: f64,
pub tx: f64,
pub time_offset_milliseconds: f64,
}
#[derive(Clone, Debug)]
pub struct NetworkStorage {
pub data_points: BTreeMap<Instant, (NetworkData, Option<Vec<NetworkJoinPoint>>)>,
pub rx: u64,
pub tx: u64,
pub total_rx: u64,
pub total_tx: u64,
pub instant: Instant,
pub last_collection_time: Instant,
}
impl Default for NetworkStorage {
fn default() -> Self {
NetworkStorage {
data_points: BTreeMap::default(),
rx: 0,
tx: 0,
total_rx: 0,
total_tx: 0,
last_collection_time: Instant::now(),
}
}
}
#[derive(Clone, Debug)]
/// Note all values are in bytes...
pub struct NetworkData {
pub rx: u64,
pub tx: u64,
}
pub async fn get_network_data(
sys: &System, prev_net_rx_bytes: &mut u64, prev_net_tx_bytes: &mut u64,
prev_net_access_time: &mut Instant, curr_time: &Instant,
) -> crate::utils::error::Result<NetworkData> {
sys: &System, prev_net_access_time: &Instant, prev_net_rx: &mut u64, prev_net_tx: &mut u64,
curr_time: &Instant,
) -> NetworkData {
// FIXME: [WIN] Track current total bytes... also is this accurate?
if cfg!(target_os = "windows") {
let network_data = sys.get_network();
*prev_net_access_time = *curr_time;
Ok(NetworkData {
NetworkData {
rx: network_data.get_income(),
tx: network_data.get_outcome(),
total_rx: 0,
total_tx: 0,
instant: *prev_net_access_time,
})
}
} else {
let mut io_data = net::io_counters();
let mut net_rx: u64 = 0;
@ -40,21 +64,23 @@ pub async fn get_network_data(
net_tx += io.bytes_sent().get::<byte>();
}
}
let cur_time = Instant::now();
let elapsed_time = cur_time.duration_since(*prev_net_access_time).as_secs_f64();
let elapsed_time = curr_time
.duration_since(*prev_net_access_time)
.as_secs_f64();
let rx = ((net_rx - *prev_net_rx_bytes) as f64 / elapsed_time) as u64;
let tx = ((net_tx - *prev_net_tx_bytes) as f64 / elapsed_time) as u64;
if *prev_net_rx == 0 {
*prev_net_rx = net_rx;
}
*prev_net_rx_bytes = net_rx;
*prev_net_tx_bytes = net_tx;
*prev_net_access_time = cur_time;
Ok(NetworkData {
rx,
tx,
total_rx: *prev_net_rx_bytes,
total_tx: *prev_net_tx_bytes,
instant: *prev_net_access_time,
})
if *prev_net_tx == 0 {
*prev_net_tx = net_tx;
}
let rx = ((net_rx - *prev_net_rx) as f64 / elapsed_time) as u64;
let tx = ((net_tx - *prev_net_tx) as f64 / elapsed_time) as u64;
*prev_net_rx = net_rx;
*prev_net_tx = net_tx;
NetworkData { rx, tx }
}
}

View File

@ -399,64 +399,60 @@ pub fn update_network_data_points(app_data: &data_collection::Data) -> Converted
}
pub fn convert_network_data_points(
network_data: &[data_collection::network::NetworkData],
network_data: &data_collection::network::NetworkStorage,
) -> ConvertedNetworkData {
let mut rx: Vec<(f64, f64)> = Vec::new();
let mut tx: Vec<(f64, f64)> = Vec::new();
for data in network_data {
let current_time = std::time::Instant::now();
let current_time = network_data.last_collection_time;
for (time, data) in &network_data.data_points {
let time_from_start: f64 = ((TIME_STARTS_FROM as f64
- current_time.duration_since(*time).as_millis() as f64)
* 10_f64)
.floor();
// Insert in joiner points
if let Some(joiners) = &data.1 {
for joiner in joiners {
let offset_time = time_from_start - joiner.time_offset_milliseconds as f64 * 10_f64;
rx.push((
offset_time,
if joiner.rx > 0.0 {
(joiner.rx).log(2.0)
} else {
0.0
},
));
tx.push((
offset_time,
if joiner.tx > 0.0 {
(joiner.tx).log(2.0)
} else {
0.0
},
));
}
}
// Insert in main points
let rx_data = (
((TIME_STARTS_FROM as f64
- current_time.duration_since(data.instant).as_millis() as f64)
* 10_f64)
.floor(),
if data.rx > 0 {
(data.rx as f64).log(2.0)
time_from_start,
if data.0.rx > 0 {
(data.0.rx as f64).log(2.0)
} else {
0.0
},
);
let tx_data = (
((TIME_STARTS_FROM as f64
- current_time.duration_since(data.instant).as_millis() as f64)
* 10_f64)
.floor(),
if data.tx > 0 {
(data.tx as f64).log(2.0)
time_from_start,
if data.0.tx > 0 {
(data.0.tx as f64).log(2.0)
} else {
0.0
},
);
//debug!("Plotting: {:?} bytes rx, {:?} bytes tx", rx_data, tx_data);
// Now, inject our joining points...
if !rx.is_empty() {
let previous_element_data = *(rx.last().unwrap());
for idx in 0..50 {
rx.push((
previous_element_data.0
+ ((rx_data.0 - previous_element_data.0) / 50.0 * f64::from(idx)),
previous_element_data.1
+ ((rx_data.1 - previous_element_data.1) / 50.0 * f64::from(idx)),
));
}
}
// Now, inject our joining points...
if !tx.is_empty() {
let previous_element_data = *(tx.last().unwrap());
for idx in 0..50 {
tx.push((
previous_element_data.0
+ ((tx_data.0 - previous_element_data.0) / 50.0 * f64::from(idx)),
previous_element_data.1
+ ((tx_data.1 - previous_element_data.1) / 50.0 * f64::from(idx)),
));
}
}
rx.push(rx_data);
tx.push(tx_data);
}
@ -466,13 +462,8 @@ pub fn convert_network_data_points(
let total_tx_converted_result: (f64, String);
let tx_converted_result: (f64, String);
if let Some(last_num_bytes_entry) = network_data.last() {
rx_converted_result = get_exact_byte_values(last_num_bytes_entry.rx, false);
total_rx_converted_result = get_exact_byte_values(last_num_bytes_entry.total_rx, false)
} else {
rx_converted_result = get_exact_byte_values(0, false);
total_rx_converted_result = get_exact_byte_values(0, false);
}
rx_converted_result = get_exact_byte_values(network_data.rx, false);
total_rx_converted_result = get_exact_byte_values(network_data.total_rx, false);
let rx_display = format!("{:.*}{}", 1, rx_converted_result.0, rx_converted_result.1);
let total_rx_display = if cfg!(not(target_os = "windows")) {
format!(
@ -483,13 +474,8 @@ pub fn convert_network_data_points(
"N/A".to_string()
};
if let Some(last_num_bytes_entry) = network_data.last() {
tx_converted_result = get_exact_byte_values(last_num_bytes_entry.tx, false);
total_tx_converted_result = get_exact_byte_values(last_num_bytes_entry.total_tx, false);
} else {
tx_converted_result = get_exact_byte_values(0, false);
total_tx_converted_result = get_exact_byte_values(0, false);
}
tx_converted_result = get_exact_byte_values(network_data.tx, false);
total_tx_converted_result = get_exact_byte_values(network_data.total_tx, false);
let tx_display = format!("{:.*}{}", 1, tx_converted_result.0, tx_converted_result.1);
let total_tx_display = if cfg!(not(target_os = "windows")) {
format!(

View File

@ -199,7 +199,7 @@ fn main() -> error::Result<()> {
}
futures::executor::block_on(data_state.update_data());
tx.send(Event::Update(Box::from(data_state.data.clone())))
.unwrap();
.unwrap(); // TODO: [UNWRAP] Might be required, it's in a closure and idk how to deal with it
if first_run {
// Fix for if you set a really long time for update periods (and just gives a faster first value)