edenapi: improve progress reporting

Summary:
This diff adds a new progress reporting framework to the Eden API crate and uses it to power progress bars for HTTP file downloads in Mercurial.

The new `ProgressManager` type is designed to aggregate progress values across multiple concurrent HTTP transfers. The API is currently designed to integrate well with libcurl's progress callback API, allowing all of the curl handles within a curl multi session to concurrently report their progress.

This progress can then be reported (in aggregate) to a user-provided callback. In most cases, this callback will be a Rust wrapper around a callback provided by the Python code. The `EdenAPI` trait and FFI bindings have been updated accordingly to allow optionally passing in a callback for long-running operations.

Lastly, in `remotefilelog`'s Python code, the callback is specified as a Python closure that simply updates the progress bar.

Reviewed By: quark-zju

Differential Revision: D15179983

fbshipit-source-id: ee677b71beff730f91aafe0364124f7ea0671387
This commit is contained in:
Arun Kulshreshtha 2019-05-02 14:16:25 -07:00 committed by Facebook Github Bot
parent 398d8bcc1f
commit 60f8a0938f
9 changed files with 324 additions and 46 deletions

View File

@ -768,27 +768,47 @@ class fileserverclient(object):
# will result in a type error, so convert them here.
fileids = [tuple(i) for i in fileids]
if edenapi.debug(self.ui):
self.ui.warn(_("fetching %d files over HTTP\n") % len(fileids))
if fetchdata:
with progress.spinner(self.ui, _("Fetching file content over HTTP")):
self.ui.metrics.gauge("http_getfiles_revs", len(fileids))
self.ui.metrics.gauge("http_getfiles_calls", 1)
datapackpath = self.repo.edenapi.get_files(fileids)
else:
datapackpath = None
if fetchhistory:
with progress.spinner(self.ui, _("Fetching file history over HTTP")):
self.ui.metrics.gauge("http_gethistory_revs", len(fileids))
self.ui.metrics.gauge("http_gethistory_calls", 1)
histpackpath = self.repo.edenapi.get_history(fileids)
else:
histpackpath = None
datapackpath = self._httpfetchdata(fileids) if fetchdata else None
histpackpath = self._httpfetchhistory(fileids) if fetchhistory else None
return datapackpath, histpackpath
def _httpfetchdata(self, fileids):
"""Fetch file data over HTTP using the Eden API.

Displays a byte-based progress bar while the request runs and returns the
value produced by ``self.repo.edenapi.get_files`` (the caller stores it as
the datapack path).
"""
# Extra diagnostics, emitted only when Eden API debugging is enabled.
if edenapi.debug(self.ui):
self.ui.warn(_("fetching data for %d files over HTTP\n") % len(fileids))
# start=None: the bar begins without a value; the callback below fills it in.
with progress.bar(
self.ui, _("Fetching file content over HTTP"), start=None, unit="bytes"
) as prog:
# Metrics: number of file ids requested, and one get_files call.
self.ui.metrics.gauge("http_getfiles_revs", len(fileids))
self.ui.metrics.gauge("http_getfiles_calls", 1)
# Progress callback handed to the Eden API client. The arguments are
# (downloaded, download total, uploaded, upload total); the two upload
# values are not used here.
def progcallback(dl, dlt, ul, ult):
# Nothing is done for reports where no bytes have been downloaded yet.
if dl > 0:
# NOTE(review): assigns the bar's private `_total` attribute directly.
prog._total = dlt
prog.value = dl
return self.repo.edenapi.get_files(fileids, progcallback)
def _httpfetchhistory(self, fileids, depth=None):
"""Fetch file history over HTTP using the Eden API.

``depth`` is forwarded unchanged to ``self.repo.edenapi.get_history``.
Displays a byte-based progress bar while the request runs and returns the
value produced by ``get_history`` (the caller stores it as the historypack
path).
"""
# Extra diagnostics, emitted only when Eden API debugging is enabled.
if edenapi.debug(self.ui):
self.ui.warn(_("fetching history for %d files over HTTP\n") % len(fileids))
# start=None: the bar begins without a value; the callback below fills it in.
with progress.bar(
self.ui, _("Fetching file history over HTTP"), start=None, unit="bytes"
) as prog:
# Metrics: number of file ids requested, and one get_history call.
self.ui.metrics.gauge("http_gethistory_revs", len(fileids))
self.ui.metrics.gauge("http_gethistory_calls", 1)
# Progress callback handed to the Eden API client. The arguments are
# (downloaded, download total, uploaded, upload total); the two upload
# values are not used here.
def progcallback(dl, dlt, ul, ult):
# Nothing is done for reports where no bytes have been downloaded yet.
if dl > 0:
# NOTE(review): assigns the bar's private `_total` attribute directly.
prog._total = dlt
prog.value = dl
return self.repo.edenapi.get_history(fileids, depth, progcallback)
def connect(self):
if self.cacheprocess:
options = ""

View File

@ -11,7 +11,7 @@ use encoding::{local_bytes_to_path, path_to_local_bytes};
use failure::format_err;
use log;
use edenapi::{Config, EdenApi, EdenApiCurlClient, EdenApiHyperClient};
use edenapi::{Config, EdenApi, EdenApiCurlClient, EdenApiHyperClient, ProgressFn, ProgressStats};
use types::{Key, Node, RepoPath};
pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
@ -84,14 +84,15 @@ py_class!(class client |py| {
self.inner(py).hostname().map_pyerr::<exc::RuntimeError>(py)
}
def get_files(&self, keys: Vec<(PyBytes, PyBytes)>) -> PyResult<PyBytes> {
def get_files(&self, keys: Vec<(PyBytes, PyBytes)>, progress_fn: Option<PyObject> = None) -> PyResult<PyBytes> {
let keys = keys.into_iter()
.map(|(path, node)| make_key(py, &path, &node))
.collect::<PyResult<Vec<Key>>>()?;
let client = self.inner(py);
let progress_fn = progress_fn.map(wrap_callback);
let out_path = py.allow_threads(move || {
client.get_files(keys)
client.get_files(keys, progress_fn)
}).map_pyerr::<exc::RuntimeError>(py)?;
let out_path = path_to_local_bytes(&out_path)
@ -103,15 +104,17 @@ py_class!(class client |py| {
def get_history(
&self,
keys: Vec<(PyBytes, PyBytes)>,
depth: Option<u32> = None
depth: Option<u32> = None,
progress_cb: Option<PyObject> = None
) -> PyResult<PyBytes> {
let keys = keys.into_iter()
.map(|(path, node)| make_key(py, &path, &node))
.collect::<PyResult<Vec<Key>>>()?;
let client = self.inner(py);
let progress_cb = progress_cb.map(wrap_callback);
let out_path = py.allow_threads(move || {
client.get_history(keys, depth)
client.get_history(keys, depth, progress_cb)
}).map_pyerr::<exc::RuntimeError>(py)?;
let out_path = path_to_local_bytes(&out_path)
@ -129,3 +132,11 @@ fn make_key(py: Python, path: &PyBytes, node: &PyBytes) -> PyResult<Key> {
let node = Node::from_str(node).map_pyerr::<exc::RuntimeError>(py)?;
Ok(Key::new(path, node))
}
fn wrap_callback(callback: PyObject) -> ProgressFn {
Box::new(move |stats: ProgressStats| {
let gil = Python::acquire_gil();
let py = gil.python();
let _ = callback.call(py, stats.as_tuple(), None);
})
}

View File

@ -6,6 +6,8 @@ use failure::Fallible;
use types::Key;
use crate::progress::ProgressFn;
pub trait EdenApi: Send + Sync {
/// Hit the API server's /health_check endpoint.
/// Returns Ok(()) if the expected response is received, or an Error otherwise
@ -17,17 +19,22 @@ pub trait EdenApi: Send + Sync {
/// Fetch the content of the specified files from the API server and write
/// them to a datapack in the configured cache directory. Returns the path
/// of the resulting packfile.
/// of the resulting packfile. Optionally takes a callback to report progress.
///
/// Note that the keys are passed in as a `Vec` rather than using `IntoIterator`
/// in order to keep this trait object-safe.
fn get_files(&self, keys: Vec<Key>) -> Fallible<PathBuf>;
fn get_files(&self, keys: Vec<Key>, progress: Option<ProgressFn>) -> Fallible<PathBuf>;
/// Fetch the history of the specified files from the API server and write
/// them to a historypack in the configured cache directory. Returns the path
/// of the resulting packfile.
/// of the resulting packfile. Optionally takes a callback to report progress.
///
/// Note that the keys are passed in as a `Vec` rather than using `IntoIterator`
/// in order to keep this trait object-safe.
fn get_history(&self, keys: Vec<Key>, max_depth: Option<u32>) -> Fallible<PathBuf>;
fn get_history(
&self,
keys: Vec<Key>,
max_depth: Option<u32>,
progress: Option<ProgressFn>,
) -> Fallible<PathBuf>;
}

View File

@ -23,6 +23,7 @@ use types::{FileDataRequest, FileDataResponse, FileHistoryRequest, FileHistoryRe
use crate::api::EdenApi;
use crate::config::{ClientCreds, Config};
use crate::packs::{write_datapack, write_historypack};
use crate::progress::{ProgressFn, ProgressHandle, ProgressManager, ProgressStats};
mod driver;
@ -85,7 +86,8 @@ impl EdenApiCurlClient {
impl EdenApi for EdenApiCurlClient {
fn health_check(&self) -> Fallible<()> {
let mut handle = self.easy()?;
let handler = Collector::new();
let mut handle = self.easy(handler)?;
let url = self.base_url.join(paths::HEALTH_CHECK)?;
handle.url(url.as_str())?;
handle.get(true)?;
@ -94,7 +96,7 @@ impl EdenApi for EdenApiCurlClient {
let code = handle.response_code()?;
ensure!(code == 200, "Received HTTP status code {}", code);
let response = String::from_utf8_lossy(&handle.get_ref().0);
let response = String::from_utf8_lossy(&handle.get_ref().data());
ensure!(
response == "I_AM_ALIVE",
"Unexpected response: {:?}",
@ -105,7 +107,8 @@ impl EdenApi for EdenApiCurlClient {
}
fn hostname(&self) -> Fallible<String> {
let mut handle = self.easy()?;
let handler = Collector::new();
let mut handle = self.easy(handler)?;
let url = self.base_url.join(paths::HOSTNAME)?;
handle.url(url.as_str())?;
handle.get(true)?;
@ -114,11 +117,11 @@ impl EdenApi for EdenApiCurlClient {
let code = handle.response_code()?;
ensure!(code == 200, "Received HTTP status code {}", code);
let response = String::from_utf8(handle.get_ref().0.to_vec())?;
let response = String::from_utf8(handle.get_ref().data().to_vec())?;
Ok(response)
}
fn get_files(&self, keys: Vec<Key>) -> Fallible<PathBuf> {
fn get_files(&self, keys: Vec<Key>, progress: Option<ProgressFn>) -> Fallible<PathBuf> {
log::debug!("Fetching {} files", keys.len());
let url = self.repo_base_url()?.join(paths::DATA)?;
@ -133,7 +136,7 @@ impl EdenApi for EdenApiCurlClient {
keys: batch.into_iter().collect(),
});
let responses: Vec<FileDataResponse> = self.multi_request(&url, requests)?;
let responses: Vec<FileDataResponse> = self.multi_request(&url, requests, progress)?;
log::debug!(
"Received {} responses with {} total entries",
@ -158,7 +161,12 @@ impl EdenApi for EdenApiCurlClient {
write_datapack(cache_path, files)
}
fn get_history(&self, keys: Vec<Key>, max_depth: Option<u32>) -> Fallible<PathBuf> {
fn get_history(
&self,
keys: Vec<Key>,
max_depth: Option<u32>,
progress: Option<ProgressFn>,
) -> Fallible<PathBuf> {
log::debug!("Fetching {} files", keys.len());
let url = self.repo_base_url()?.join(paths::HISTORY)?;
@ -174,7 +182,7 @@ impl EdenApi for EdenApiCurlClient {
depth: max_depth,
});
let responses: Vec<FileHistoryResponse> = self.multi_request(&url, requests)?;
let responses: Vec<FileHistoryResponse> = self.multi_request(&url, requests, progress)?;
log::debug!(
"Received {} responses with {} total entries",
@ -202,20 +210,26 @@ impl EdenApiCurlClient {
}
/// Configure a new curl::Easy2 handle using this client's settings.
fn easy(&self) -> Fallible<Easy2<Collector>> {
let mut handle = Easy2::new(Collector::new());
fn easy<H: Handler>(&self, handler: H) -> Fallible<Easy2<H>> {
let mut handle = Easy2::new(handler);
if let Some(ClientCreds { ref certs, ref key }) = &self.creds {
handle.ssl_cert(certs)?;
handle.ssl_key(key)?;
}
handle.http_version(HttpVersion::V2)?;
handle.progress(true)?;
Ok(handle)
}
/// Send multiple concurrent POST requests using the given requests as the
/// JSON payload of each respective request. Assumes that the responses are
/// CBOR encoded, and automatically deserializes and returns them.
fn multi_request<I, T, R>(&self, url: &Url, requests: I) -> Fallible<Vec<T>>
fn multi_request<I, T, R>(
&self,
url: &Url,
requests: I,
progress_cb: Option<ProgressFn>,
) -> Fallible<Vec<T>>
where
R: Serialize,
T: DeserializeOwned,
@ -224,13 +238,20 @@ impl EdenApiCurlClient {
let requests = requests.into_iter().collect::<Vec<_>>();
let num_requests = requests.len();
let mut progress = ProgressManager::with_capacity(num_requests);
let mut driver = MultiDriver::with_capacity(num_requests);
for request in requests {
let mut easy = self.easy()?;
let handle = progress.register();
let handler = Collector::with_progress(handle);
let mut easy = self.easy(handler)?;
prepare_cbor_post(&mut easy, &url, &request)?;
driver.add(easy)?;
}
progress.set_callback(progress_cb);
driver.set_progress_manager(progress);
log::debug!("Performing {} requests", num_requests);
let start = Instant::now();
let handles = driver.perform(true)?.into_result()?;
@ -239,7 +260,7 @@ impl EdenApiCurlClient {
let mut responses = Vec::with_capacity(handles.len());
let mut total_bytes = 0;
for easy in handles {
let data = &easy.get_ref().0;
let data = &easy.get_ref().data();
total_bytes += data.len();
let response = serde_cbor::from_slice::<T>(data)?;
@ -252,20 +273,48 @@ impl EdenApiCurlClient {
}
/// Simple Handler that just writes all received data to an internal buffer.
#[derive(Default)]
struct Collector(Vec<u8>);
struct Collector {
data: Vec<u8>,
progress: Option<ProgressHandle>,
}
impl Collector {
fn new() -> Self {
Default::default()
Self {
data: Vec::new(),
progress: None,
}
}
fn with_progress(progress: ProgressHandle) -> Self {
Self {
data: Vec::new(),
progress: Some(progress),
}
}
fn data(&self) -> &[u8] {
&self.data
}
}
impl Handler for Collector {
fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
self.0.extend_from_slice(data);
self.data.extend_from_slice(data);
Ok(data.len())
}
fn progress(&mut self, dltotal: f64, dlnow: f64, ultotal: f64, ulnow: f64) -> bool {
if let Some(ref progress) = self.progress {
let dltotal = dltotal as u64;
let dlnow = dlnow as u64;
let ultotal = ultotal as u64;
let ulnow = ulnow as u64;
let stats = ProgressStats::new(dlnow, ulnow, dltotal, ultotal);
progress.update(stats);
}
true
}
}
/// Configure the given Easy2 handle to perform a POST request.

View File

@ -9,6 +9,8 @@ use curl::{
};
use failure::{err_msg, Fallible};
use crate::progress::ProgressManager;
/// Timeout for a single iteration of waiting for activity
/// on any active transfer in a curl::Multi session.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@ -41,6 +43,7 @@ impl<H> MultiDriverResult<H> {
pub struct MultiDriver<H> {
multi: Multi,
handles: Vec<Easy2Handle<H>>,
progress: Option<ProgressManager>,
}
impl<H: Handler> MultiDriver<H> {
@ -48,9 +51,14 @@ impl<H: Handler> MultiDriver<H> {
Self {
multi: Multi::new(),
handles: Vec::with_capacity(capacity),
progress: None,
}
}
pub fn set_progress_manager(&mut self, progress: ProgressManager) {
self.progress = Some(progress);
}
/// Add an Easy2 handle to the Multi stack.
pub fn add(&mut self, easy: Easy2<H>) -> Fallible<()> {
// Assign a token to this Easy2 handle so we can correlate messages
@ -102,9 +110,13 @@ impl<H: Handler> MultiDriver<H> {
in_progress = self.multi.perform()? as usize;
// Check for messages; a message indicates a transfer completed (successfully or not).
let mut should_report_progress = false;
self.multi.messages(|msg| {
let token = msg.token().unwrap();
log::trace!("Got message for transfer {}", token);
should_report_progress = true;
match msg.result() {
Some(Ok(())) => {
log::trace!("Transfer {} complete", token);
@ -126,6 +138,12 @@ impl<H: Handler> MultiDriver<H> {
break;
}
if should_report_progress {
if let Some(ref mut progress) = self.progress {
progress.report();
}
}
if in_progress == 0 {
log::debug!("All transfers finished successfully.");
break;

View File

@ -25,6 +25,7 @@ use url_ext::UrlExt;
use crate::api::EdenApi;
use crate::config::{ClientCreds, Config};
use crate::packs::{write_datapack, write_historypack};
use crate::progress::ProgressFn;
pub(crate) type HyperClient = Client<HttpsConnector<HttpConnector>, Body>;
@ -154,7 +155,7 @@ impl EdenApi for EdenApiHyperClient {
/// Fetch the content of the specified file from the API server and write
/// it to a datapack in the configured cache directory. Returns the path
/// of the resulting packfile.
fn get_files(&self, keys: Vec<Key>) -> Fallible<PathBuf> {
fn get_files(&self, keys: Vec<Key>, _: Option<ProgressFn>) -> Fallible<PathBuf> {
let client = Arc::clone(&self.client);
let prefix = self.repo_base_url()?.join(paths::GET_FILE)?;
@ -173,7 +174,12 @@ impl EdenApi for EdenApiHyperClient {
/// Fetch the history of the specified file from the API server and write
/// it to a historypack in the configured cache directory. Returns the path
/// of the resulting packfile.
fn get_history(&self, keys: Vec<Key>, max_depth: Option<u32>) -> Fallible<PathBuf> {
fn get_history(
&self,
keys: Vec<Key>,
max_depth: Option<u32>,
_: Option<ProgressFn>,
) -> Fallible<PathBuf> {
let client = Arc::clone(&self.client);
let prefix = self.repo_base_url()?.join(paths::GET_HISTORY)?;

View File

@ -5,8 +5,10 @@ mod config;
mod curl;
mod hyper;
mod packs;
mod progress;
pub use crate::api::EdenApi;
pub use crate::config::Config;
pub use crate::curl::EdenApiCurlClient;
pub use crate::hyper::EdenApiHyperClient;
pub use crate::progress::{ProgressFn, ProgressStats};

View File

@ -0,0 +1,81 @@
// Copyright Facebook, Inc. 2019
use std::{cell::RefCell, rc::Rc};
pub use stats::ProgressStats;
mod stats;
pub type ProgressFn = Box<dyn FnMut(ProgressStats) + Send + 'static>;
/// Handle through which one transfer publishes its progress to the
/// `ProgressManager` that issued it.
pub struct ProgressHandle {
    /// State shared with the issuing manager.
    inner: Rc<RefCell<ProgressManagerInner>>,
    /// Position of this handle's entry in the shared stats list.
    index: usize,
}

impl ProgressHandle {
    /// Replace this handle's entry with the latest `stats`.
    pub fn update(&self, stats: ProgressStats) {
        let mut shared = self.inner.borrow_mut();
        shared.update(self.index, stats);
    }
}
pub struct ProgressManager {
inner: Rc<RefCell<ProgressManagerInner>>,
callback: Option<ProgressFn>,
}
impl ProgressManager {
pub fn with_capacity(capacity: usize) -> Self {
Self {
inner: Rc::new(RefCell::new(ProgressManagerInner::with_capacity(capacity))),
callback: None,
}
}
pub fn set_callback(&mut self, f: Option<ProgressFn>) {
self.callback = f;
}
pub fn register(&self) -> ProgressHandle {
let inner = Rc::clone(&self.inner);
let index = inner.borrow_mut().register();
ProgressHandle { inner, index }
}
pub fn stats(&self) -> ProgressStats {
self.inner.borrow().stats()
}
pub fn report(&mut self) {
let stats = self.stats();
if let Some(ref mut callback) = self.callback {
callback(stats);
}
}
}
struct ProgressManagerInner {
stats: Vec<ProgressStats>,
}
impl ProgressManagerInner {
fn with_capacity(capacity: usize) -> Self {
Self {
stats: Vec::with_capacity(capacity),
}
}
fn register(&mut self) -> usize {
let index = self.stats.len();
self.stats.push(Default::default());
index
}
fn update(&mut self, index: usize, stats: ProgressStats) {
self.stats[index] = stats;
}
fn stats(&self) -> ProgressStats {
self.stats.iter().cloned().sum()
}
}

View File

@ -0,0 +1,84 @@
// Copyright Facebook, Inc. 2019
use std::{
fmt,
iter::Sum,
ops::{Add, AddAssign, Sub, SubAssign},
};
/// Byte counters describing the progress of one or more HTTP transfers.
///
/// Values can be combined with `+` / `-` (and their assigning forms) or
/// summed from an iterator to aggregate several transfers. All arithmetic is
/// saturating, so combining values never panics or wraps around.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct ProgressStats {
    /// Bytes downloaded so far.
    pub downloaded: u64,
    /// Bytes uploaded so far.
    pub uploaded: u64,
    /// Total number of bytes expected to be downloaded.
    pub dltotal: u64,
    /// Total number of bytes expected to be uploaded.
    pub ultotal: u64,
}

impl ProgressStats {
    /// Build a value from its four counters.
    ///
    /// Note the argument order: both "so far" counters come first, followed
    /// by both totals.
    pub fn new(downloaded: u64, uploaded: u64, dltotal: u64, ultotal: u64) -> Self {
        Self {
            downloaded,
            uploaded,
            dltotal,
            ultotal,
        }
    }

    /// Return the counters as `(downloaded, dltotal, uploaded, ultotal)`.
    ///
    /// This ordering differs from the argument order of `new`: each "so far"
    /// counter is immediately followed by its total.
    pub fn as_tuple(&self) -> (u64, u64, u64, u64) {
        (self.downloaded, self.dltotal, self.uploaded, self.ultotal)
    }
}

impl fmt::Display for ProgressStats {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "Downloaded: {}/{} bytes; Uploaded {}/{} bytes",
            self.downloaded, self.dltotal, self.uploaded, self.ultotal
        )
    }
}

impl Add for ProgressStats {
    type Output = Self;

    /// Field-wise sum; each field saturates at `u64::max_value()` instead of
    /// overflowing.
    fn add(self, other: Self) -> Self {
        Self {
            downloaded: self.downloaded.saturating_add(other.downloaded),
            uploaded: self.uploaded.saturating_add(other.uploaded),
            dltotal: self.dltotal.saturating_add(other.dltotal),
            ultotal: self.ultotal.saturating_add(other.ultotal),
        }
    }
}

impl AddAssign for ProgressStats {
    fn add_assign(&mut self, other: ProgressStats) {
        *self = *self + other
    }
}

impl Sub for ProgressStats {
    type Output = Self;

    /// Field-wise difference; each field saturates at zero instead of
    /// underflowing when `other` holds the larger value.
    fn sub(self, other: Self) -> Self {
        Self {
            downloaded: self.downloaded.saturating_sub(other.downloaded),
            uploaded: self.uploaded.saturating_sub(other.uploaded),
            dltotal: self.dltotal.saturating_sub(other.dltotal),
            ultotal: self.ultotal.saturating_sub(other.ultotal),
        }
    }
}

impl SubAssign for ProgressStats {
    fn sub_assign(&mut self, other: ProgressStats) {
        *self = *self - other
    }
}

impl Sum for ProgressStats {
    /// Sum all items, starting from the all-zero default; an empty iterator
    /// yields the default value.
    fn sum<I: Iterator<Item = ProgressStats>>(iter: I) -> ProgressStats {
        iter.fold(ProgressStats::default(), |total, item| total + item)
    }
}