cli_rs: Loop fetching access counts

Summary: Loop fetching access counts and update rendering.

Reviewed By: fanzeyi

Differential Revision: D32109917

fbshipit-source-id: 83eae25bc7938497f2f99cb966e0fd7ce907f017
This commit is contained in:
Grace Ku 2021-11-05 16:21:19 -07:00 committed by Facebook GitHub Bot
parent 62315132a1
commit a324477274
2 changed files with 97 additions and 75 deletions

View File

@ -10,6 +10,7 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.51"
crossterm = { version = "0.20.0", features = ["event-stream"] }
dirs = "2.0"
edenfs-client = { version = "0.1.0", path = "../edenfs-client" }
edenfs-error = { version = "0.1.0", path = "../edenfs-error" }

View File

@ -8,7 +8,9 @@
//! edenfsctl minitop
use async_trait::async_trait;
use crossterm::{cursor, QueueableCommand};
use std::collections::BTreeMap;
use std::io::{stdout, Write};
use std::path::Path;
use std::time::Duration;
use std::time::SystemTime;
@ -47,6 +49,20 @@ fn parse_refresh_rate(arg: &str) -> Duration {
}
const UNKNOWN_COMMAND: &str = "<unknown>";
const COLUMN_TITLES: &[&str] = &[
"TOP PID",
"MOUNT",
"FUSE R",
"FUSE W",
"FUSE COUNT",
"FUSE FETCH",
"MEMORY",
"DISK",
"IMPORTS",
"FUSE TIME",
"FUSE LAST",
"CMD",
];
trait GetAccessCountsResultExt {
fn get_cmd_for_pid(&self, pid: &i32) -> Result<String>;
@ -199,88 +215,93 @@ impl TrackedProcesses {
impl crate::Subcommand for MinitopCmd {
async fn run(&self, instance: EdenFsInstance) -> Result<ExitCode> {
let client = instance.connect(None).await?;
let counts = client
.getAccessCounts(self.refresh_rate.as_secs().try_into().from_err()?)
.await
.from_err()?;
// Update currently tracked processes (and add new ones if they haven't been tracked yet)
let mut tracked_processes = TrackedProcesses::new();
for (mount, accesses) in &counts.accessesByMount {
for (pid, access_counts) in &accesses.accessCountsByPid {
tracked_processes.update_process(
pid,
mount,
counts.get_cmd_for_pid(pid)?,
access_counts.clone(),
accesses.fetchCountsByPid.get(pid).unwrap_or(&0),
)?;
let mut stdout = stdout();
loop {
// Update currently tracked processes (and add new ones if they haven't been tracked yet)
let counts = client
.getAccessCounts(self.refresh_rate.as_secs().try_into().from_err()?)
.await
.from_err()?;
for (mount, accesses) in &counts.accessesByMount {
for (pid, access_counts) in &accesses.accessCountsByPid {
tracked_processes.update_process(
pid,
mount,
counts.get_cmd_for_pid(pid)?,
access_counts.clone(),
accesses.fetchCountsByPid.get(pid).unwrap_or(&0),
)?;
}
for (pid, fetch_counts) in &accesses.fetchCountsByPid {
tracked_processes.update_process(
pid,
mount,
counts.get_cmd_for_pid(pid)?,
AccessCounts::default(),
fetch_counts,
)?;
}
}
for (pid, fetch_counts) in &accesses.fetchCountsByPid {
tracked_processes.update_process(
pid,
mount,
counts.get_cmd_for_pid(pid)?,
AccessCounts::default(),
fetch_counts,
)?;
// Render aggregated processes
stdout.queue(cursor::SavePosition).from_err()?;
stdout
.write(format!("{}\n", COLUMN_TITLES.join("\t")).as_bytes())
.from_err()?;
for aggregated_process in tracked_processes.aggregated_processes() {
stdout
.write(
format!(
"{top_pid}\t \
{mount}\t \
{fuse_reads}\t \
{fuse_writes}\t \
{fuse_total}\t \
{fuse_fetch}\t \
{fuse_memory_cache_imports}\t \
{fuse_disk_cache_imports}\t \
{fuse_backing_store_imports}\t \
{fuse_duration}\t \
{fuse_last_access}\t \
{command}\n",
top_pid = aggregated_process.pid,
mount = aggregated_process.mount,
fuse_reads = aggregated_process.access_counts.fsChannelReads,
fuse_writes = aggregated_process.access_counts.fsChannelWrites,
fuse_total = aggregated_process.access_counts.fsChannelTotal,
fuse_fetch = aggregated_process.fetch_counts,
fuse_memory_cache_imports =
aggregated_process.access_counts.fsChannelMemoryCacheImports,
fuse_disk_cache_imports =
aggregated_process.access_counts.fsChannelDiskCacheImports,
fuse_backing_store_imports = aggregated_process
.access_counts
.fsChannelBackingStoreImports,
fuse_duration = aggregated_process.access_counts.fsChannelDurationNs,
fuse_last_access = aggregated_process
.last_access_time
.elapsed()
.from_err()?
.as_nanos(),
command = aggregated_process.cmd,
)
.as_bytes(),
)
.from_err()?;
}
stdout.queue(cursor::RestorePosition).from_err()?;
stdout.flush().from_err()?;
tokio::time::sleep(self.refresh_rate).await;
}
const COLUMN_TITLES: &[&str] = &[
"TOP PID",
"MOUNT",
"FUSE R",
"FUSE W",
"FUSE COUNT",
"FUSE FETCH",
"MEMORY",
"DISK",
"IMPORTS",
"FUSE TIME",
"FUSE LAST",
"CMD",
];
unreachable!("minitop is unable to start");
println!("{}", COLUMN_TITLES.join("\t"));
for aggregated_process in tracked_processes.aggregated_processes() {
println!(
"{top_pid}\t \
{mount}\t \
{fuse_reads}\t \
{fuse_writes}\t \
{fuse_total}\t \
{fuse_fetch}\t \
{fuse_memory_cache_imports}\t \
{fuse_disk_cache_imports}\t \
{fuse_backing_store_imports}\t \
{fuse_duration}\t \
{fuse_last_access}\t \
{command}",
top_pid = aggregated_process.pid,
mount = aggregated_process.mount,
fuse_reads = aggregated_process.access_counts.fsChannelReads,
fuse_writes = aggregated_process.access_counts.fsChannelWrites,
fuse_total = aggregated_process.access_counts.fsChannelTotal,
fuse_fetch = aggregated_process.fetch_counts,
fuse_memory_cache_imports =
aggregated_process.access_counts.fsChannelMemoryCacheImports,
fuse_disk_cache_imports =
aggregated_process.access_counts.fsChannelDiskCacheImports,
fuse_backing_store_imports = aggregated_process
.access_counts
.fsChannelBackingStoreImports,
fuse_duration = aggregated_process.access_counts.fsChannelDurationNs,
fuse_last_access = aggregated_process
.last_access_time
.elapsed()
.from_err()?
.as_nanos(),
command = aggregated_process.cmd,
)
}
Ok(0)
}
}