edenapi: make multi_request take callback

Summary: Refactor `multi_request` to take a callback to process each received response. This will allow us to write responses as soon as we get them rather than buffering them (this will happen in a later diff).

Reviewed By: xavierd

Differential Revision: D15405299

fbshipit-source-id: 1f7317f5c1ae4a6570c2e5c354b1fbcdf1f1127a
This commit is contained in:
Arun Kulshreshtha 2019-05-20 11:49:20 -07:00 committed by Facebook Github Bot
parent 157325e38d
commit 64cfee5321

View File

@ -145,7 +145,11 @@ impl EdenApi for EdenApiCurlClient {
keys: batch.into_iter().collect(), keys: batch.into_iter().collect(),
}); });
let responses: Vec<DataResponse> = self.multi_request(&url, requests, progress)?; let mut responses = Vec::new();
self.multi_request(&url, requests, progress, |res: DataResponse| {
responses.push(res);
Ok(())
})?;
log::debug!( log::debug!(
"Received {} responses with {} total entries", "Received {} responses with {} total entries",
@ -190,15 +194,16 @@ impl EdenApi for EdenApiCurlClient {
depth: max_depth, depth: max_depth,
}); });
let responses: Vec<HistoryResponse> = self.multi_request(&url, requests, progress)?; let mut responses = Vec::new();
self.multi_request(&url, requests, progress, |res: HistoryResponse| {
responses.push(res);
Ok(())
})?;
log::debug!( log::debug!(
"Received {} responses with {} total entries", "Received {} responses with {} total entries",
responses.len(), responses.len(),
responses responses.iter().map(|res| res.entries.len()).sum::<usize>(),
.iter()
.map(|entry| entry.entries.len())
.sum::<usize>(),
); );
write_to_historystore(store, responses.into_iter().flatten()) write_to_historystore(store, responses.into_iter().flatten())
@ -228,18 +233,21 @@ impl EdenApiCurlClient {
} }
/// Send multiple concurrent POST requests using the given requests as the /// Send multiple concurrent POST requests using the given requests as the
/// JSON payload of each respective request. Assumes that the responses are /// CBOR payload of each respective request. Assumes that the responses are
/// CBOR encoded, and automatically deserializes and returns them. /// CBOR encoded, and automatically deserializes them before passing
fn multi_request<I, T, R>( /// them to the given callback.
fn multi_request<R, I, T, F>(
&self, &self,
url: &Url, url: &Url,
requests: I, requests: I,
progress_cb: Option<ProgressFn>, progress_cb: Option<ProgressFn>,
) -> Fallible<Vec<T>> mut response_cb: F,
) -> Fallible<()>
where where
R: Serialize, R: Serialize,
T: DeserializeOwned,
I: IntoIterator<Item = R>, I: IntoIterator<Item = R>,
T: DeserializeOwned,
F: FnMut(T) -> Fallible<()>,
{ {
let requests = requests.into_iter().collect::<Vec<_>>(); let requests = requests.into_iter().collect::<Vec<_>>();
let num_requests = requests.len(); let num_requests = requests.len();
@ -259,8 +267,6 @@ impl EdenApiCurlClient {
progress.set_callback(progress_cb); progress.set_callback(progress_cb);
driver.set_progress_manager(progress); driver.set_progress_manager(progress);
let mut responses = Vec::with_capacity(num_requests);
log::debug!("Performing {} requests", num_requests); log::debug!("Performing {} requests", num_requests);
let start = Instant::now(); let start = Instant::now();
@ -268,15 +274,14 @@ impl EdenApiCurlClient {
let easy = res?; let easy = res?;
let data = easy.get_ref().data(); let data = easy.get_ref().data();
let response = serde_cbor::from_slice::<T>(data)?; let response = serde_cbor::from_slice::<T>(data)?;
responses.push(response); response_cb(response)
Ok(())
})?; })?;
let elapsed = start.elapsed(); let elapsed = start.elapsed();
let total_bytes = driver.progress().unwrap().stats().downloaded; let total_bytes = driver.progress().unwrap().stats().downloaded;
print_download_stats(total_bytes, elapsed); print_download_stats(total_bytes, elapsed);
Ok(responses) Ok(())
} }
} }