diff --git a/eden/scm/lib/checkout/src/lib.rs b/eden/scm/lib/checkout/src/lib.rs index 1dc2134a5e..a8cac5083c 100644 --- a/eden/scm/lib/checkout/src/lib.rs +++ b/eden/scm/lib/checkout/src/lib.rs @@ -6,7 +6,7 @@ */ use anyhow::{bail, format_err, Result}; -use futures::{stream, try_join, Stream, StreamExt}; +use futures::{stream, try_join, Stream, StreamExt, TryStreamExt}; use manifest::{DiffEntry, DiffType, FileMetadata, FileType}; use revisionstore::{HgIdDataStore, RemoteDataStore, StoreKey, StoreResult}; use std::fmt; @@ -15,6 +15,8 @@ use std::sync::Arc; use types::{HgId, Key, RepoPathBuf}; use vfs::{UpdateFlag, VFS}; +const PREFETCH_CHUNK_SIZE: usize = 1000; + /// Contains lists of files to be removed / updated during checkout. pub struct CheckoutPlan { /// Files to be removed. @@ -110,7 +112,7 @@ impl CheckoutPlan { /// Pending storage futures are dropped when error is returned pub async fn apply_stream< S: Stream>>> + Unpin, - F: FnOnce(Vec) -> Result, + F: FnOnce(Vec) -> S, >( self, vfs: &VFS, @@ -132,7 +134,7 @@ impl CheckoutPlan { .map(|u| Key::new(u.path.clone(), u.content_hgid)) .collect(); - let data_stream = f(keys)?; + let data_stream = f(keys); let update_content = data_stream .zip(stream::iter(self.update_content.into_iter())) @@ -171,9 +173,7 @@ impl CheckoutPlan { store: &DS, ) -> Result { self.apply_stream(vfs, |keys| { - Ok(stream::iter( - keys.into_iter().map(|key| store.get(StoreKey::HgId(key))), - )) + stream::iter(keys.into_iter().map(|key| store.get(StoreKey::HgId(key)))) }) .await } @@ -184,11 +184,13 @@ impl CheckoutPlan { store: &DS, ) -> Result { self.apply_stream(vfs, |keys| { - let store_keys: Vec<_> = keys.into_iter().map(StoreKey::HgId).collect(); - store.prefetch(&store_keys)?; - Ok(stream::iter( - store_keys.into_iter().map(|key| store.get(key)), - )) + stream::iter(keys.into_iter().map(StoreKey::HgId)) + .chunks(PREFETCH_CHUNK_SIZE) + .map(|chunk| -> Result<_> { + store.prefetch(&chunk)?; + Ok(stream::iter(chunk.into_iter().map(|key| store.get(key)))) + }) + .try_flatten() }) .await } @@ -544,8 +546,8 @@ mod test { Ok(()) } - fn dummy_fs(v: Vec) -> Result>>>> { - Ok(stream::iter(v).map(|key| Ok(StoreResult::Found(hgid_file(&key.hgid))))) + fn dummy_fs(v: Vec) -> impl Stream>>> { + stream::iter(v).map(|key| Ok(StoreResult::Found(hgid_file(&key.hgid)))) } fn hgid_file(hgid: &HgId) -> Vec {