fsprobe: add --parallel

Summary:
This option allows to perfrom operations in parallel to test throughput
Using fairly simple approach of sending to first available worker to build parallelism. There is probably some crate that provides multi-consumer queue, but current implementation is likely sufficient for our purposes

Reviewed By: DurhamG

Differential Revision: D29997918

fbshipit-source-id: fe90b88b9c19546fefbb7643c5a8cd5ea8c565aa
This commit is contained in:
Andrey Chursin 2021-08-02 10:26:35 -07:00 committed by Facebook GitHub Bot
parent dc30a70bdd
commit 331b3d81f7

View File

@ -10,30 +10,47 @@ use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Instant;
use structopt::StructOpt;
#[derive(StructOpt)]
struct Cli {
path: PathBuf,
#[structopt(short = "p", long = "parallel")]
parallel: Option<usize>,
}
fn main() {
let args = Cli::from_args();
let plan = ProbePlan::load(&args.path).expect("Failed to load fsprobe plan");
let mut stats = Stats::default();
let stats = Arc::new(Stats::default());
let start = Instant::now();
plan.run(&mut stats);
if let Some(threads) = args.parallel {
plan.run_parallel(&stats, threads);
} else {
plan.run(&stats);
}
let duration = Instant::now() - start;
let rate = rate(stats.bytes as f64 / (duration.as_millis() as f64 / 1000.));
println!("{:?}: {}, {}", duration, stats, rate);
let duration_ms = duration.as_millis() as f64;
let rate = rate(stats.bytes.load(Ordering::Relaxed) as f64 / (duration_ms / 1000.));
let files = stats.files.load(Ordering::Relaxed) as f64;
let lat = duration_ms / files;
let qps = files / duration_ms;
println!(
"lat: {:.2} ms, qps: {:.0}, dur: {:?}, {}, rate {}",
lat, qps, duration, stats, rate
);
}
#[derive(Default)]
struct Stats {
files: u64,
bytes: u64,
errors: u64,
files: AtomicU64,
bytes: AtomicU64,
errors: AtomicU64,
}
struct ProbePlan(Vec<ProbeAction>);
@ -58,9 +75,40 @@ impl ProbePlan {
Ok(Self(actions))
}
pub fn run(self, stats: &mut Stats) {
pub fn run(self, stats: &Arc<Stats>) {
for action in self.0 {
action.run(stats);
action.run(&*stats);
}
}
pub fn run_parallel(self, stats: &Arc<Stats>, thread_count: usize) {
let mut threads = vec![];
let mut senders = vec![];
for _ in 0..thread_count {
let stats = stats.clone();
let (sender, recv) = mpsc::sync_channel::<ProbeAction>(8);
let thread = thread::spawn(move || {
for action in recv {
action.run(&*stats)
}
});
threads.push(thread);
senders.push(sender);
}
let mut idx = 0;
for mut action in self.0 {
loop {
idx = (idx + 1) % senders.len();
match senders[idx].try_send(action) {
Ok(_) => break,
Err(TrySendError::Disconnected(_)) => panic!("Worker terminated"),
Err(TrySendError::Full(ret)) => action = ret,
}
}
}
senders.clear();
for thread in threads {
thread.join().expect("Worker panic");
}
}
}
@ -85,22 +133,22 @@ impl ProbeAction {
}
}
pub fn run(&self, stats: &mut Stats) {
pub fn run(&self, stats: &Stats) {
let r = match self {
Self::Read(path) => Self::read(path, stats),
};
if let Err(err) = r {
stats.errors += 1;
stats.errors.fetch_add(1, Ordering::Relaxed);
eprintln!("{} failed: {}", self, err);
}
}
fn read(path: &Path, stats: &mut Stats) -> Result<()> {
fn read(path: &Path, stats: &Stats) -> Result<()> {
let mut file = File::open(path)?;
let mut v = vec![];
file.read_to_end(&mut v)?;
stats.bytes += v.len() as u64;
stats.files += 1;
stats.bytes.fetch_add(v.len() as u64, Ordering::Relaxed);
stats.files.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
@ -118,7 +166,9 @@ impl fmt::Display for Stats {
write!(
f,
"{} files, {} bytes, {} errors",
self.files, self.bytes, self.errors
self.files.load(Ordering::Relaxed),
self.bytes.load(Ordering::Relaxed),
self.errors.load(Ordering::Relaxed),
)
}
}