edenapi: make MultiDriver take a callback to handle results

Summary:
Previously `MultiDriver` (a struct which manages a `curl::Multi` session) would collect all of the received HTTP responses and return them after all transfers had completed. This was problematic in situations where the number of requests is large; the process can run out of memory buffering this data. A better approach would be to deal with the data the moment it is received.

This diff changes `MultiDriver` to take a callback which can handle each response as each individual transfer completes, rather than waiting until the end. For now, the EdenAPI client simply sets a callback that adds the data to a Vec, meaning that we're still buffering. In a later diff, we'll update the callback to directly write the data to storage instead.

Differential Revision: D15379276

fbshipit-source-id: ab4f98e49141f6582b6e2d366045eda514404197
This commit is contained in:
Arun Kulshreshtha 2019-05-16 16:20:22 -07:00 committed by Facebook Github Bot
parent c9c4175981
commit 0cc687df58
2 changed files with 62 additions and 63 deletions

View File

@ -259,21 +259,23 @@ impl EdenApiCurlClient {
progress.set_callback(progress_cb);
driver.set_progress_manager(progress);
let mut responses = Vec::with_capacity(num_requests);
log::debug!("Performing {} requests", num_requests);
let start = Instant::now();
let handles = driver.perform()?.into_result()?;
driver.perform(|res| {
let easy = res?;
let data = easy.get_ref().data();
let response = serde_cbor::from_slice::<T>(data)?;
responses.push(response);
Ok(())
})?;
let elapsed = start.elapsed();
let total_bytes = driver.progress().unwrap().stats().downloaded;
print_download_stats(total_bytes, elapsed);
let mut responses = Vec::with_capacity(handles.len());
for easy in handles {
let data = easy.get_ref().data();
let response = serde_cbor::from_slice::<T>(data)?;
responses.push(response);
}
Ok(responses)
}
}

View File

@ -1,13 +1,13 @@
// Copyright Facebook, Inc. 2019
use std::{fmt::Write, mem, time::Duration};
use std::{cell::RefCell, time::Duration};
use curl::{
self,
easy::{Easy2, Handler},
multi::{Easy2Handle, Multi},
};
use failure::{err_msg, Fallible};
use failure::Fallible;
use crate::progress::ProgressManager;
@ -15,35 +15,13 @@ use crate::progress::ProgressManager;
/// on any active transfer in a curl::Multi session.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// The result of using a MultiDriver to manage a curl::Multi session.
/// Contains all of the Easy2 handles for the session along with
/// information about which (if any) of the transfers failed.
pub struct MultiDriverResult<H> {
handles: Vec<Easy2<H>>,
failed: Vec<(usize, curl::Error)>,
}
impl<H> MultiDriverResult<H> {
pub fn into_result(self) -> Fallible<Vec<Easy2<H>>> {
if self.failed.is_empty() {
return Ok(self.handles);
}
let mut msg = "The following transfers failed:\n".to_string();
for (i, e) in self.failed {
write!(&mut msg, "{}: {}\n", i, e)?;
}
Err(err_msg(msg))
}
}
/// Struct that manages a curl::Multi session, synchronously driving
/// all of the transfers therein to completion.
pub struct MultiDriver<H> {
multi: Multi,
handles: Vec<Easy2Handle<H>>,
handles: RefCell<Vec<Option<Easy2Handle<H>>>>,
progress: Option<ProgressManager>,
num_transfers: usize,
fail_early: bool,
}
@ -51,8 +29,9 @@ impl<H: Handler> MultiDriver<H> {
pub fn with_capacity(capacity: usize) -> Self {
Self {
multi: Multi::new(),
handles: Vec::with_capacity(capacity),
handles: RefCell::new(Vec::with_capacity(capacity)),
progress: None,
num_transfers: 0,
fail_early: false,
}
}
@ -70,24 +49,15 @@ impl<H: Handler> MultiDriver<H> {
// Assign a token to this Easy2 handle so we can correlate messages
// for this handle with the corresponding Easy2Handle while the
// Easy2 is owned by the Multi handle.
let token = self.handles.len();
let mut handles = self.handles.borrow_mut();
let token = handles.len();
let mut handle = self.multi.add2(easy)?;
handle.set_token(token)?;
self.handles.push(handle);
handles.push(Some(handle));
self.num_transfers += 1;
Ok(())
}
/// Remove and return all of the Easy2 handles in the Multi stack.
pub fn remove_all(&mut self) -> Fallible<Vec<Easy2<H>>> {
let handles = mem::replace(&mut self.handles, Vec::with_capacity(0));
let mut easy_vec = Vec::with_capacity(handles.len());
for handle in handles {
let easy = self.multi.remove2(handle)?;
easy_vec.push(easy);
}
Ok(easy_vec)
}
/// If `fail_early` is set to true, then the driver will return early if
/// any transfers fail (leaving the remaining transfers in an unfinished
/// state); otherwise, the driver will only return once all transfers
@ -98,21 +68,21 @@ impl<H: Handler> MultiDriver<H> {
/// Drive all of the Easy2 handles in the Multi stack to completion.
///
/// Returns all of the Easy2 handles in the Multi stack in the order
/// they were added, along with the indices of any failed transfers
/// (along with the corresponding error code).
pub(super) fn perform(&mut self) -> Fallible<MultiDriverResult<H>> {
let num_transfers = self.handles.len();
let mut in_progress = num_transfers;
let mut failed = Vec::new();
/// The caller-supplied callback will be called whenever a transfer
/// completes, successfully or otherwise.
pub fn perform<F>(&mut self, mut callback: F) -> Fallible<()>
where
F: FnMut(Result<Easy2<H>, curl::Error>) -> Fallible<()>,
{
let mut in_progress = self.num_transfers;
let mut i = 0;
loop {
log::trace!(
"Iteration {}: {}/{} transfers complete",
i,
num_transfers - in_progress,
num_transfers
self.num_transfers - in_progress,
self.num_transfers
);
i += 1;
@ -120,6 +90,7 @@ impl<H: Handler> MultiDriver<H> {
// Check for messages; a message indicates a transfer completed (successfully or not).
let mut should_report_progress = false;
let mut error = None;
self.multi.messages(|msg| {
let token = msg.token().unwrap();
log::trace!("Got message for transfer {}", token);
@ -129,10 +100,25 @@ impl<H: Handler> MultiDriver<H> {
match msg.result() {
Some(Ok(())) => {
log::trace!("Transfer {} complete", token);
match self.take_handle(token) {
Ok(Some(handle)) => {
if let Err(e) = callback(Ok(handle)) {
error = Some(e);
}
}
Ok(None) => {
log::trace!("Handle already taken");
}
Err(e) => {
error = Some(e);
}
}
}
Some(Err(e)) => {
log::trace!("Transfer {} failed: {}", token, &e);
failed.push((token, e));
if let Err(e) = callback(Err(e)) {
error = Some(e);
}
}
None => {
// Theoretically this should never happen because
@ -142,9 +128,11 @@ impl<H: Handler> MultiDriver<H> {
}
});
if self.fail_early && failed.len() > 0 {
log::debug!("At least one transfer failed; aborting.");
break;
if let Some(e) = error {
if self.fail_early {
log::debug!("At least one transfer failed; aborting.");
return Err(e);
}
}
if should_report_progress {
@ -167,7 +155,16 @@ impl<H: Handler> MultiDriver<H> {
}
}
let handles = self.remove_all()?;
Ok(MultiDriverResult { handles, failed })
Ok(())
}
/// Take the handle stored at `index` out of the driver's handle list and
/// remove it from the Multi session, yielding the underlying `Easy2`.
///
/// Returns `Ok(Some(easy))` if the slot still held a handle, `Ok(None)` if
/// the slot had already been emptied, and an error if `Multi::remove2`
/// fails. In the error case the slot has already been emptied, so the
/// handle is not put back.
///
/// Panics if `index` is out of bounds for the handle list, or if the
/// `RefCell` holding the list is already borrowed.
fn take_handle(&self, index: usize) -> Fallible<Option<Easy2<H>>> {
// The method takes `&self`; the handle list lives in a `RefCell`, so it
// is borrowed mutably at runtime here.
let mut handles = self.handles.borrow_mut();
// `take()` leaves `None` behind in the slot, so a second call for the
// same index falls through to the `Ok(None)` branch.
if let Some(handle) = handles[index].take() {
let easy = self.multi.remove2(handle)?;
Ok(Some(easy))
} else {
Ok(None)
}
}
}