http-client: add stats reporting hook

Summary:
Add `with_stats_reporting` to HttpClient. It takes a closure that will be
called with all `Stats` objects generated. We then use this function in
the hg-http crate to integrate with the metrics backend used in Mercurial.

Reviewed By: kulshrax

Differential Revision: D23577869

fbshipit-source-id: 5ac23f00183f3c3d956627a869393cd4b27610d4
This commit is contained in:
Stefan Filip 2020-09-09 17:32:44 -07:00 committed by Facebook GitHub Bot
parent 008d0c82df
commit c1ab6a4e92
6 changed files with 131 additions and 6 deletions

View File

@ -0,0 +1,12 @@
[package]
name = "hg-http"
edition = "2018"
version = "0.1.0"
include = ["src/**/*.rs", "src/bin/cli.rs"]
[lib]
path = "src/lib.rs"
[dependencies]
http-client = { path = "../http-client" }
hg-metrics = { path = "../hg-metrics" }

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
///! The hg-http crate provides common utilities for dealing setting up and
///! managing http requests for the hg application. This crate specifies how
///! a topic should be treated. Topics may include monitoring, request setup,
///! paths, error handling, etc.
use hg_metrics::increment_counter;
use http_client::{HttpClient, Stats};
pub fn http_client(client_id: impl ToString) -> HttpClient {
let client_id = client_id.to_string();
let reporter = move |stats: &Stats| {
bump_counters(&client_id, stats);
};
HttpClient::new().with_stats_reporting(reporter)
}
fn bump_counters(client_id: &str, stats: &Stats) {
let n = |suffix: &'static str| -> String { format!("http/{}/{}", client_id, suffix) };
// TODO: gauges: rx_bytes and tx_bytes; histograms: request_time_ms, response_delay_ms
increment_counter(n("total_rx_bytes"), stats.downloaded);
increment_counter(n("total_tx_bytes"), stats.uploaded);
increment_counter(n("num_requests"), stats.requests);
increment_counter(n("total_request_time_ms"), stats.time.as_millis() as usize);
increment_counter(
n("total_response_delay_ms"),
stats.latency.as_millis() as usize,
)
}

View File

@ -33,5 +33,6 @@ tokio = { version = "=0.2.13", features = ["full"] }
url = "2.1.0"
[dev-dependencies]
crossbeam = "0.7"
mockito = "0.25"
tempdir = "0.3"

View File

@ -7,6 +7,7 @@
use std::convert::{TryFrom, TryInto};
use std::pin::Pin;
use std::sync::Arc;
use curl::easy::Easy2;
use futures::prelude::*;
@ -40,11 +41,31 @@ pub type StatsFuture =
#[derive(Clone)]
pub struct HttpClient {
pool: Pool,
report_stats: Option<Arc<dyn Fn(&Stats) + Send + Sync + 'static>>,
}
impl HttpClient {
pub fn new() -> Self {
Self { pool: Pool::new() }
Self {
pool: Pool::new(),
report_stats: None,
}
}
/// Automatically report stats using the provided function.
///
/// For all functions that return `Stats`, the `report_stats`
/// function will be called with the same `Stats` struct that
/// is returned. `report_stats` will be invoked just before
/// the struct is handed to the caller.
pub fn with_stats_reporting<F>(self, report_stats: F) -> Self
where
F: Fn(&Stats) + Send + Sync + 'static,
{
Self {
report_stats: Some(Arc::new(report_stats)),
..self
}
}
/// Perform multiple HTTP requests concurrently.
@ -86,12 +107,18 @@ impl HttpClient {
driver.add(handle)?;
}
driver.perform(|res| {
let stats = driver.perform(|res| {
let res = res
.map_err(|(_, e)| e.into())
.and_then(|mut easy| Response::try_from(easy.get_mut()));
response_cb(res)
})
})?;
if let Some(report_stats) = &self.report_stats {
report_stats(&stats);
}
Ok(stats)
}
/// Async version of `send` which runs all of the given request concurrently
@ -185,7 +212,14 @@ impl HttpClient {
driver.add(handle)?;
}
driver.perform(report_result_and_drop_receiver)
driver
.perform(report_result_and_drop_receiver)
.map(|stats| {
if let Some(report_stats) = &self.report_stats {
report_stats(&stats);
}
stats
})
}
}
@ -400,4 +434,48 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_report_stats() -> Result<()> {
let server_url = Url::parse(&mockito::server_url())?;
// this is actually used, it changes how mockito behaves
let _mock1 = mock("GET", "/test1")
.with_status(201)
.with_body(&b"body")
.create();
let url = server_url.join("test1")?;
let request = Request::get(url);
let (tx, rx) = crossbeam::channel::unbounded();
let client = HttpClient::new().with_stats_reporting(move |stats| {
tx.send(stats.clone()).expect("send stats over channel")
});
let stats = client.send(vec![request.clone()], |_| Ok(()))?;
assert_eq!(stats, rx.recv()?);
let stats = client.send_with_progress(vec![request.clone()], |_| Ok(()), |_| ())?;
assert_eq!(stats, rx.recv()?);
let (_stream, stats) = client.send_async(vec![request.clone()])?;
let stats = stats.await?;
assert_eq!(stats, rx.recv()?);
let (_stream, stats) = client.send_async_with_progress(vec![request.clone()], |_| ())?;
let stats = stats.await?;
assert_eq!(stats, rx.recv()?);
let my_stream_req = || request.clone().into_streaming(TestReceiver::new());
let stats = client.stream(vec![my_stream_req()])?;
assert_eq!(stats, rx.recv()?);
let stats = client.stream_with_progress(vec![my_stream_req()], |_| ())?;
assert_eq!(stats, rx.recv()?);
Ok(())
}
}

View File

@ -48,7 +48,7 @@ impl fmt::Display for Method {
/// A builder struct for HTTP requests, designed to be
/// a more egonomic API for setting up a curl handle.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Request {
url: Url,
method: Method,

View File

@ -7,7 +7,7 @@
use std::{fmt, time::Duration};
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Stats {
pub downloaded: usize,
pub uploaded: usize,