diff --git a/src/app/data_collection.rs b/src/app/data_collection.rs index 54a76bb4..5042c6d6 100644 --- a/src/app/data_collection.rs +++ b/src/app/data_collection.rs @@ -23,7 +23,7 @@ fn push_if_valid(result: &Result, vector_to_push: &mut } } -#[derive(Debug, Default, Clone)] +#[derive(Clone, Debug, Default)] pub struct Data { pub list_of_cpu_packages: Vec, pub list_of_io: Vec, @@ -31,10 +31,10 @@ pub struct Data { pub memory: Vec, pub swap: Vec, pub list_of_temperature_sensor: Vec, - pub network: Vec, + pub network: network::NetworkStorage, pub list_of_processes: Vec, pub grouped_list_of_processes: Option>, - pub list_of_disks: Vec, // Only need to keep a list of disks and their data + pub list_of_disks: Vec, } pub struct DataState { @@ -45,9 +45,6 @@ pub struct DataState { prev_pid_stats: HashMap, prev_idle: f64, prev_non_idle: f64, - prev_net_rx_bytes: u64, - prev_net_tx_bytes: u64, - prev_net_access_time: Instant, temperature_type: temperature::TemperatureType, last_clean: Instant, // Last time stale data was cleared use_current_cpu_total: bool, @@ -63,9 +60,6 @@ impl Default for DataState { prev_pid_stats: HashMap::new(), prev_idle: 0_f64, prev_non_idle: 0_f64, - prev_net_rx_bytes: 0, - prev_net_tx_bytes: 0, - prev_net_access_time: Instant::now(), temperature_type: temperature::TemperatureType::Celsius, last_clean: Instant::now(), use_current_cpu_total: false, @@ -97,18 +91,61 @@ impl DataState { let current_instant = std::time::Instant::now(); + // Network + let new_network_data = network::get_network_data( + &self.sys, + &self.data.network.last_collection_time, + &mut self.data.network.total_rx, + &mut self.data.network.total_tx, + ¤t_instant, + ) + .await; + + let joining_points: Option> = + if !self.data.network.data_points.is_empty() { + if let Some(prev_data) = self + .data + .network + .data_points + .get(&self.data.network.last_collection_time) + { + // If not empty, inject joining points + + let rx_diff = new_network_data.rx as f64 - prev_data.0.rx as f64; + let tx_diff = new_network_data.tx as f64 - prev_data.0.tx as f64; + let time_gap = current_instant + .duration_since(self.data.network.last_collection_time) + .as_millis() as f64; + + let mut new_joining_points = Vec::new(); + + for idx in 0..100 { + new_joining_points.push(network::NetworkJoinPoint { + rx: prev_data.0.rx as f64 + rx_diff / 100.0 * idx as f64, + tx: prev_data.0.tx as f64 + tx_diff / 100.0 * idx as f64, + time_offset_milliseconds: time_gap / 100.0 * (100 - idx) as f64, + }); + } + Some(new_joining_points) + } else { + None + } + } else { + None + }; + + // Set values + self.data.network.rx = new_network_data.rx; + self.data.network.tx = new_network_data.tx; + self.data.network.last_collection_time = current_instant; + + // Add new point + self.data + .network + .data_points + .insert(current_instant, (new_network_data, joining_points)); + // What we want to do: For timed data, if there is an error, just do not add. For other data, just don't update! - push_if_valid( - &network::get_network_data( - &self.sys, - &mut self.prev_net_rx_bytes, - &mut self.prev_net_tx_bytes, - &mut self.prev_net_access_time, - ¤t_instant, - ) - .await, - &mut self.data.network, - ); push_if_valid( &cpu::get_cpu_data_list(&self.sys, ¤t_instant), &mut self.data.list_of_cpu_packages, @@ -167,6 +204,8 @@ impl DataState { self.prev_pid_stats.remove(&stale); } + // TODO: [OPT] cleaning stale network + self.data.list_of_cpu_packages = self .data .list_of_cpu_packages @@ -197,16 +236,6 @@ impl DataState { }) .collect::>(); - self.data.network = self - .data - .network - .iter() - .cloned() - .filter(|entry| { - clean_instant.duration_since(entry.instant).as_secs() <= self.stale_max_seconds - }) - .collect::>(); - self.data.list_of_io = self .data .list_of_io diff --git a/src/app/data_collection/network.rs b/src/app/data_collection/network.rs index 1ba7c90f..c9b97de6 100644 --- a/src/app/data_collection/network.rs +++ b/src/app/data_collection/network.rs @@ -1,34 +1,58 @@ use futures::StreamExt; use heim::net; use heim::units::information::byte; +use std::collections::BTreeMap; use std::time::Instant; use sysinfo::{NetworkExt, System, SystemExt}; -#[derive(Debug, Clone)] -/// Note all values are in bytes... -pub struct NetworkData { +#[derive(Clone, Debug)] +pub struct NetworkJoinPoint { + pub rx: f64, + pub tx: f64, + pub time_offset_milliseconds: f64, +} + +#[derive(Clone, Debug)] +pub struct NetworkStorage { + pub data_points: BTreeMap>)>, pub rx: u64, pub tx: u64, pub total_rx: u64, pub total_tx: u64, - pub instant: Instant, + pub last_collection_time: Instant, +} + +impl Default for NetworkStorage { + fn default() -> Self { + NetworkStorage { + data_points: BTreeMap::default(), + rx: 0, + tx: 0, + total_rx: 0, + total_tx: 0, + last_collection_time: Instant::now(), + } + } +} + +#[derive(Clone, Debug)] +/// Note all values are in bytes... +pub struct NetworkData { + pub rx: u64, + pub tx: u64, } pub async fn get_network_data( - sys: &System, prev_net_rx_bytes: &mut u64, prev_net_tx_bytes: &mut u64, - prev_net_access_time: &mut Instant, curr_time: &Instant, -) -> crate::utils::error::Result { + sys: &System, prev_net_access_time: &Instant, prev_net_rx: &mut u64, prev_net_tx: &mut u64, + curr_time: &Instant, +) -> NetworkData { + // FIXME: [WIN] Track current total bytes... also is this accurate? if cfg!(target_os = "windows") { let network_data = sys.get_network(); - - *prev_net_access_time = *curr_time; - Ok(NetworkData { + NetworkData { rx: network_data.get_income(), tx: network_data.get_outcome(), - total_rx: 0, - total_tx: 0, - instant: *prev_net_access_time, - }) + } } else { let mut io_data = net::io_counters(); let mut net_rx: u64 = 0; @@ -40,21 +64,23 @@ pub async fn get_network_data( net_tx += io.bytes_sent().get::(); } } - let cur_time = Instant::now(); - let elapsed_time = cur_time.duration_since(*prev_net_access_time).as_secs_f64(); + let elapsed_time = curr_time + .duration_since(*prev_net_access_time) + .as_secs_f64(); - let rx = ((net_rx - *prev_net_rx_bytes) as f64 / elapsed_time) as u64; - let tx = ((net_tx - *prev_net_tx_bytes) as f64 / elapsed_time) as u64; + if *prev_net_rx == 0 { + *prev_net_rx = net_rx; + } - *prev_net_rx_bytes = net_rx; - *prev_net_tx_bytes = net_tx; - *prev_net_access_time = cur_time; - Ok(NetworkData { - rx, - tx, - total_rx: *prev_net_rx_bytes, - total_tx: *prev_net_tx_bytes, - instant: *prev_net_access_time, - }) + if *prev_net_tx == 0 { + *prev_net_tx = net_tx; + } + + let rx = ((net_rx - *prev_net_rx) as f64 / elapsed_time) as u64; + let tx = ((net_tx - *prev_net_tx) as f64 / elapsed_time) as u64; + + *prev_net_rx = net_rx; + *prev_net_tx = net_tx; + NetworkData { rx, tx } } } diff --git a/src/data_conversion.rs b/src/data_conversion.rs index 5def8bf0..58322d97 100644 --- a/src/data_conversion.rs +++ b/src/data_conversion.rs @@ -399,64 +399,60 @@ pub fn update_network_data_points(app_data: &data_collection::Data) -> Converted } pub fn convert_network_data_points( - network_data: &[data_collection::network::NetworkData], + network_data: &data_collection::network::NetworkStorage, ) -> ConvertedNetworkData { let mut rx: Vec<(f64, f64)> = Vec::new(); let mut tx: Vec<(f64, f64)> = Vec::new(); - for data in network_data { - let current_time = std::time::Instant::now(); + let current_time = network_data.last_collection_time; + for (time, data) in &network_data.data_points { + let time_from_start: f64 = ((TIME_STARTS_FROM as f64 + - current_time.duration_since(*time).as_millis() as f64) + * 10_f64) + .floor(); + + // Insert in joiner points + if let Some(joiners) = &data.1 { + for joiner in joiners { + let offset_time = time_from_start - joiner.time_offset_milliseconds as f64 * 10_f64; + rx.push(( + offset_time, + if joiner.rx > 0.0 { + (joiner.rx).log(2.0) + } else { + 0.0 + }, + )); + + tx.push(( + offset_time, + if joiner.tx > 0.0 { + (joiner.tx).log(2.0) + } else { + 0.0 + }, + )); + } + } + + // Insert in main points let rx_data = ( - ((TIME_STARTS_FROM as f64 - - current_time.duration_since(data.instant).as_millis() as f64) - * 10_f64) - .floor(), - if data.rx > 0 { - (data.rx as f64).log(2.0) + time_from_start, + if data.0.rx > 0 { + (data.0.rx as f64).log(2.0) } else { 0.0 }, ); let tx_data = ( - ((TIME_STARTS_FROM as f64 - - current_time.duration_since(data.instant).as_millis() as f64) - * 10_f64) - .floor(), - if data.tx > 0 { - (data.tx as f64).log(2.0) + time_from_start, + if data.0.tx > 0 { + (data.0.tx as f64).log(2.0) } else { 0.0 }, ); - //debug!("Plotting: {:?} bytes rx, {:?} bytes tx", rx_data, tx_data); - - // Now, inject our joining points... - if !rx.is_empty() { - let previous_element_data = *(rx.last().unwrap()); - for idx in 0..50 { - rx.push(( - previous_element_data.0 - + ((rx_data.0 - previous_element_data.0) / 50.0 * f64::from(idx)), - previous_element_data.1 - + ((rx_data.1 - previous_element_data.1) / 50.0 * f64::from(idx)), - )); - } - } - - // Now, inject our joining points... - if !tx.is_empty() { - let previous_element_data = *(tx.last().unwrap()); - for idx in 0..50 { - tx.push(( - previous_element_data.0 - + ((tx_data.0 - previous_element_data.0) / 50.0 * f64::from(idx)), - previous_element_data.1 - + ((tx_data.1 - previous_element_data.1) / 50.0 * f64::from(idx)), - )); - } - } - rx.push(rx_data); tx.push(tx_data); } @@ -466,13 +462,8 @@ pub fn convert_network_data_points( let total_tx_converted_result: (f64, String); let tx_converted_result: (f64, String); - if let Some(last_num_bytes_entry) = network_data.last() { - rx_converted_result = get_exact_byte_values(last_num_bytes_entry.rx, false); - total_rx_converted_result = get_exact_byte_values(last_num_bytes_entry.total_rx, false) - } else { - rx_converted_result = get_exact_byte_values(0, false); - total_rx_converted_result = get_exact_byte_values(0, false); - } + rx_converted_result = get_exact_byte_values(network_data.rx, false); + total_rx_converted_result = get_exact_byte_values(network_data.total_rx, false); let rx_display = format!("{:.*}{}", 1, rx_converted_result.0, rx_converted_result.1); let total_rx_display = if cfg!(not(target_os = "windows")) { format!( @@ -483,13 +474,8 @@ pub fn convert_network_data_points( "N/A".to_string() }; - if let Some(last_num_bytes_entry) = network_data.last() { - tx_converted_result = get_exact_byte_values(last_num_bytes_entry.tx, false); - total_tx_converted_result = get_exact_byte_values(last_num_bytes_entry.total_tx, false); - } else { - tx_converted_result = get_exact_byte_values(0, false); - total_tx_converted_result = get_exact_byte_values(0, false); - } + tx_converted_result = get_exact_byte_values(network_data.tx, false); + total_tx_converted_result = get_exact_byte_values(network_data.total_tx, false); let tx_display = format!("{:.*}{}", 1, tx_converted_result.0, tx_converted_result.1); let total_tx_display = if cfg!(not(target_os = "windows")) { format!( diff --git a/src/main.rs b/src/main.rs index 47b55fb6..043ddcf5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -199,7 +199,7 @@ fn main() -> error::Result<()> { } futures::executor::block_on(data_state.update_data()); tx.send(Event::Update(Box::from(data_state.data.clone()))) - .unwrap(); + .unwrap(); // TODO: [UNWRAP] Might be required, it's in a closure and idk how to deal with it if first_run { // Fix for if you set a really long time for update periods (and just gives a faster first value)