checkout: chunk prefetches during apply_remote_data_store

Summary: This helps with pipelining (once a chunk is prefetched, it will start rolling out to the fs) and also helps with limiting the number of keys prefetched at once.

Reviewed By: quark-zju

Differential Revision: D26496789

fbshipit-source-id: 9aa6b08c1605ab2a06a02347897f5dcfa22297fd
This commit is contained in:
Andrey Chursin 2021-02-19 10:27:15 -08:00 committed by Facebook GitHub Bot
parent c7269df47e
commit 72dfb176b5

View File

@ -6,7 +6,7 @@
*/ */
use anyhow::{bail, format_err, Result}; use anyhow::{bail, format_err, Result};
use futures::{stream, try_join, Stream, StreamExt}; use futures::{stream, try_join, Stream, StreamExt, TryStreamExt};
use manifest::{DiffEntry, DiffType, FileMetadata, FileType}; use manifest::{DiffEntry, DiffType, FileMetadata, FileType};
use revisionstore::{HgIdDataStore, RemoteDataStore, StoreKey, StoreResult}; use revisionstore::{HgIdDataStore, RemoteDataStore, StoreKey, StoreResult};
use std::fmt; use std::fmt;
@ -15,6 +15,8 @@ use std::sync::Arc;
use types::{HgId, Key, RepoPathBuf}; use types::{HgId, Key, RepoPathBuf};
use vfs::{UpdateFlag, VFS}; use vfs::{UpdateFlag, VFS};
const PREFETCH_CHUNK_SIZE: usize = 1000;
/// Contains lists of files to be removed / updated during checkout. /// Contains lists of files to be removed / updated during checkout.
pub struct CheckoutPlan { pub struct CheckoutPlan {
/// Files to be removed. /// Files to be removed.
@ -110,7 +112,7 @@ impl CheckoutPlan {
/// Pending storage futures are dropped when error is returned /// Pending storage futures are dropped when error is returned
pub async fn apply_stream< pub async fn apply_stream<
S: Stream<Item = Result<StoreResult<Vec<u8>>>> + Unpin, S: Stream<Item = Result<StoreResult<Vec<u8>>>> + Unpin,
F: FnOnce(Vec<Key>) -> Result<S>, F: FnOnce(Vec<Key>) -> S,
>( >(
self, self,
vfs: &VFS, vfs: &VFS,
@ -132,7 +134,7 @@ impl CheckoutPlan {
.map(|u| Key::new(u.path.clone(), u.content_hgid)) .map(|u| Key::new(u.path.clone(), u.content_hgid))
.collect(); .collect();
let data_stream = f(keys)?; let data_stream = f(keys);
let update_content = data_stream let update_content = data_stream
.zip(stream::iter(self.update_content.into_iter())) .zip(stream::iter(self.update_content.into_iter()))
@ -171,9 +173,7 @@ impl CheckoutPlan {
store: &DS, store: &DS,
) -> Result<CheckoutStats> { ) -> Result<CheckoutStats> {
self.apply_stream(vfs, |keys| { self.apply_stream(vfs, |keys| {
Ok(stream::iter( stream::iter(keys.into_iter().map(|key| store.get(StoreKey::HgId(key))))
keys.into_iter().map(|key| store.get(StoreKey::HgId(key))),
))
}) })
.await .await
} }
@ -184,11 +184,13 @@ impl CheckoutPlan {
store: &DS, store: &DS,
) -> Result<CheckoutStats> { ) -> Result<CheckoutStats> {
self.apply_stream(vfs, |keys| { self.apply_stream(vfs, |keys| {
let store_keys: Vec<_> = keys.into_iter().map(StoreKey::HgId).collect(); stream::iter(keys.into_iter().map(StoreKey::HgId))
store.prefetch(&store_keys)?; .chunks(PREFETCH_CHUNK_SIZE)
Ok(stream::iter( .map(|chunk| -> Result<_> {
store_keys.into_iter().map(|key| store.get(key)), store.prefetch(&chunk)?;
)) Ok(stream::iter(chunk.into_iter().map(|key| store.get(key))))
})
.try_flatten()
}) })
.await .await
} }
@ -544,8 +546,8 @@ mod test {
Ok(()) Ok(())
} }
fn dummy_fs(v: Vec<Key>) -> Result<impl Stream<Item = Result<StoreResult<Vec<u8>>>>> { fn dummy_fs(v: Vec<Key>) -> impl Stream<Item = Result<StoreResult<Vec<u8>>>> {
Ok(stream::iter(v).map(|key| Ok(StoreResult::Found(hgid_file(&key.hgid))))) stream::iter(v).map(|key| Ok(StoreResult::Found(hgid_file(&key.hgid))))
} }
fn hgid_file(hgid: &HgId) -> Vec<u8> { fn hgid_file(hgid: &HgId) -> Vec<u8> {