mirror of
https://github.com/facebook/sapling.git
synced 2024-10-11 01:07:15 +03:00
mononoke: warmup content metadata for fsnodes
Summary: It makes it backfill a great deal faster Reviewed By: krallin Differential Revision: D21040292 fbshipit-source-id: f6d06cbc76e710b4812f15e85eba73b24cdbbd3e
This commit is contained in:
parent
fec12c95f1
commit
584728bd56
@ -30,10 +30,10 @@ use derived_data_utils::{
|
|||||||
};
|
};
|
||||||
use fastlog::{fetch_parent_root_unodes, RootFastlog};
|
use fastlog::{fetch_parent_root_unodes, RootFastlog};
|
||||||
use fbinit::FacebookInit;
|
use fbinit::FacebookInit;
|
||||||
use fsnodes::RootFsnodeId;
|
use fsnodes::{prefetch_content_metadata, RootFsnodeId};
|
||||||
use futures::{
|
use futures::{
|
||||||
compat::{Future01CompatExt, Stream01CompatExt},
|
compat::{Future01CompatExt, Stream01CompatExt},
|
||||||
future::{self, ready, try_join, try_join3, TryFutureExt},
|
future::{self, ready, try_join, try_join3, try_join4, TryFutureExt},
|
||||||
stream::{self, FuturesUnordered, Stream, StreamExt, TryStreamExt},
|
stream::{self, FuturesUnordered, Stream, StreamExt, TryStreamExt},
|
||||||
};
|
};
|
||||||
use futures_ext::FutureExt as OldFutureExt;
|
use futures_ext::FutureExt as OldFutureExt;
|
||||||
@ -47,6 +47,7 @@ use phases::SqlPhases;
|
|||||||
use slog::{info, Logger};
|
use slog::{info, Logger};
|
||||||
use stats::prelude::*;
|
use stats::prelude::*;
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
fs,
|
fs,
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{
|
sync::{
|
||||||
@ -95,6 +96,7 @@ const UNREDACTED_TYPES: &[&str] = &[
|
|||||||
/// Types of derived data for which prefetching content for changed files
|
/// Types of derived data for which prefetching content for changed files
|
||||||
/// migth speed up derivation.
|
/// migth speed up derivation.
|
||||||
const PREFETCH_CONTENT_TYPES: &[&str] = &[BlameRoot::NAME];
|
const PREFETCH_CONTENT_TYPES: &[&str] = &[BlameRoot::NAME];
|
||||||
|
const PREFETCH_CONTENT_METADATA_TYPES: &[&str] = &[RootFsnodeId::NAME];
|
||||||
const PREFETCH_UNODE_TYPES: &[&str] = &[RootFastlog::NAME, RootDeletedManifestId::NAME];
|
const PREFETCH_UNODE_TYPES: &[&str] = &[RootFastlog::NAME, RootDeletedManifestId::NAME];
|
||||||
|
|
||||||
fn open_repo_maybe_unredacted<'a>(
|
fn open_repo_maybe_unredacted<'a>(
|
||||||
@ -488,6 +490,13 @@ async fn warmup(
|
|||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let metadata_warmup = async {
|
||||||
|
if PREFETCH_CONTENT_METADATA_TYPES.contains(&derived_data_type.as_ref()) {
|
||||||
|
content_metadata_warmup(ctx, repo, chunk).await?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
let unode_warmup = async {
|
let unode_warmup = async {
|
||||||
if PREFETCH_UNODE_TYPES.contains(&derived_data_type.as_ref()) {
|
if PREFETCH_UNODE_TYPES.contains(&derived_data_type.as_ref()) {
|
||||||
unode_warmup(ctx, repo, chunk).await?
|
unode_warmup(ctx, repo, chunk).await?
|
||||||
@ -495,7 +504,7 @@ async fn warmup(
|
|||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
try_join3(bcs_warmup, content_warmup, unode_warmup).await?;
|
try_join4(bcs_warmup, content_warmup, metadata_warmup, unode_warmup).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -512,6 +521,35 @@ async fn content_warmup(
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn content_metadata_warmup(
|
||||||
|
ctx: &CoreContext,
|
||||||
|
repo: &BlobRepo,
|
||||||
|
chunk: &Vec<ChangesetId>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
stream::iter(chunk)
|
||||||
|
.map({
|
||||||
|
|cs_id| async move {
|
||||||
|
let bcs = cs_id.load(ctx.clone(), repo.blobstore()).compat().await?;
|
||||||
|
|
||||||
|
let mut content_ids = HashSet::new();
|
||||||
|
for (_, maybe_file_change) in bcs.file_changes() {
|
||||||
|
if let Some(file_change) = maybe_file_change {
|
||||||
|
content_ids.insert(file_change.content_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prefetch_content_metadata(ctx.clone(), repo.blobstore().clone(), content_ids)
|
||||||
|
.compat()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Result::<_, Error>::Ok(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(Result::<_, Error>::Ok)
|
||||||
|
.try_for_each_concurrent(100, |f| f)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn unode_warmup(
|
async fn unode_warmup(
|
||||||
ctx: &CoreContext,
|
ctx: &CoreContext,
|
||||||
repo: &BlobRepo,
|
repo: &BlobRepo,
|
||||||
|
@ -88,7 +88,7 @@ pub(crate) fn derive_fsnode(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Prefetch metadata for all content IDs introduced by a changeset.
|
// Prefetch metadata for all content IDs introduced by a changeset.
|
||||||
fn prefetch_content_metadata(
|
pub fn prefetch_content_metadata(
|
||||||
ctx: CoreContext,
|
ctx: CoreContext,
|
||||||
blobstore: RepoBlobstore,
|
blobstore: RepoBlobstore,
|
||||||
content_ids: HashSet<ContentId>,
|
content_ids: HashSet<ContentId>,
|
||||||
|
@ -14,6 +14,7 @@ use thiserror::Error;
|
|||||||
mod derive;
|
mod derive;
|
||||||
mod mapping;
|
mod mapping;
|
||||||
|
|
||||||
|
pub use derive::prefetch_content_metadata;
|
||||||
pub use mapping::{RootFsnodeId, RootFsnodeMapping};
|
pub use mapping::{RootFsnodeId, RootFsnodeMapping};
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
|
Loading…
Reference in New Issue
Block a user