batched tailing mode

Summary:
This diff adds a new mode of tailing based on the derived data graph; it uses the same functionality as backfill.
- `tail_batch_iteration` uses `bounded_traversal_dag` directly instead of leveraging `DeriveGraph::derive`, so that we can warm up dependencies for each node before calling `DerivedUtils::backfill_batch_dangerous`.
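
A simplified sketch of the per-node step this mode performs (illustrative only; `derive_node` is a hypothetical helper, and the imports/types are shown as they appear in the Mononoke crates touched in the diff below):

    // Sketch: what the batched tailer does for each node of the DeriveGraph.
    // First warm up caches for the node's changesets, then derive the whole
    // batch with the same entry point the backfiller uses.
    use anyhow::Error;
    use blobrepo::BlobRepo;
    use context::CoreContext;
    use derived_data_utils::DeriveGraph;
    use crate::warmup; // the tailer's own prefetch module (second file in the diff)

    async fn derive_node(ctx: &CoreContext, repo: &BlobRepo, node: &DeriveGraph) -> Result<(), Error> {
        if let Some(deriver) = &node.deriver {
            // Prefetch data this derived data type will read (bonsais, contents, unodes, ...)
            warmup::warmup(ctx, repo, deriver.name(), &node.csids).await?;
            // Derive the whole chunk of changesets in one batched call
            deriver
                .backfill_batch_dangerous(ctx.clone(), repo.clone(), node.csids.clone())
                .await?;
        }
        Ok(())
    }

The `tail` subcommand gains a `--batched` flag that switches from the per-changeset `::derive` loop to this batched deriver.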

Reviewed By: StanislavGlebik

Differential Revision: D24306156

fbshipit-source-id: 006cb6d4df9424cd6501cb4a381b95f096e70551
Pavel Aslanov 2020-10-19 07:28:24 -07:00 committed by Facebook GitHub Bot
parent 87db93687e
commit d752cfa651
4 changed files with 157 additions and 49 deletions

View File

@@ -101,6 +101,7 @@ blobstore_sync_queue = { path = "blobstore_sync_queue" }
bonsai_globalrev_mapping = { path = "bonsai_globalrev_mapping" }
bookmark_renaming = { path = "commit_rewriting/bookmark_renaming" }
bookmarks = { path = "bookmarks" }
bounded_traversal = { path = "common/bounded_traversal" }
bulkops = { path = "bulkops" }
cacheblob = { path = "blobstore/cacheblob" }
changeset_fetcher = { path = "blobrepo/changeset_fetcher" }

View File

@@ -28,7 +28,7 @@ use derived_data_utils::{
use fbinit::FacebookInit;
use fsnodes::RootFsnodeId;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
compat::Future01CompatExt,
future::{self, try_join},
stream::{self, StreamExt, TryStreamExt},
};
@@ -44,7 +44,7 @@ use std::{
fs,
path::Path,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};
use time_ext::DurationExt;
@@ -68,6 +68,7 @@ const ARG_REGENERATE: &str = "regenerate";
const ARG_PREFETCHED_COMMITS_PATH: &str = "prefetched-commits-path";
const ARG_CHANGESET: &str = "changeset";
const ARG_USE_SHARED_LEASES: &str = "use-shared-leases";
const ARG_BATCHED: &str = "batched";
const SUBCOMMAND_BACKFILL: &str = "backfill";
const SUBCOMMAND_BACKFILL_ALL: &str = "backfill-all";
@@ -183,6 +184,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
for derived data lease with other mononoke services and start deriving only if the lock \
is taken"
),
)
.arg(
Arg::with_name(ARG_BATCHED)
.long(ARG_BATCHED)
.takes_value(false)
.required(false)
.help("Use batched deriver instead of calling `::derive` periodically")
),
)
.subcommand(
@@ -365,7 +373,8 @@ async fn run_subcmd<'a>(
.compat()
.await?;
let use_shared_leases = sub_m.is_present(ARG_USE_SHARED_LEASES);
subcommand_tail(&ctx, unredacted_repo, use_shared_leases).await
let batched = sub_m.is_present(ARG_BATCHED);
subcommand_tail(&ctx, unredacted_repo, use_shared_leases, batched).await
}
(SUBCOMMAND_PREFETCH_COMMITS, Some(sub_m)) => {
let out_filename = sub_m
@@ -454,26 +463,7 @@ async fn subcommand_backfill_all(
.iter()
.map(|name| derived_data_utils_unsafe(repo.clone(), name.clone()))
.collect::<Result<Vec<_>, _>>()?;
let heads: Vec<_> = repo
.get_bonsai_heads_maybe_stale(ctx.clone())
.compat()
.try_collect()
.await?;
info!(ctx.logger(), "heads count: {}", heads.len());
info!(ctx.logger(), "building derived data graph",);
let derive_graph = derived_data_utils::build_derive_graph(
ctx,
repo,
heads,
derivers,
CHUNK_SIZE,
// This means that for the first 1000 commits it will inspect all changesets for underived data,
// then 1000 out of the next 1000 * 1.5 commits, then 1000 out of 1000 * 1.5 ^ 2 ... then 1000 out of 1000 * 1.5 ^ n
ThinOut::new(1000.0, 1.5),
)
.await?;
info!(ctx.logger(), "deriving data",);
derive_graph.derive(ctx.clone(), repo.clone()).await
tail_batch_iteration(ctx, repo, derivers.as_ref()).await
}
async fn subcommand_backfill(
@@ -507,7 +497,7 @@ async fn subcommand_backfill(
.await?;
let chunk_size = chunk.len();
warmup::warmup(ctx, repo, derived_data_type, &chunk).await?;
warmup::warmup(ctx, repo, derived_data_type.as_ref(), &chunk).await?;
derived_utils
.backfill_batch_dangerous(ctx.clone(), repo.clone(), chunk)
@@ -546,6 +536,7 @@ async fn subcommand_tail(
ctx: &CoreContext,
unredacted_repo: BlobRepo,
use_shared_leases: bool,
batched: bool,
) -> Result<(), Error> {
let unredacted_repo = if use_shared_leases {
// "shared" leases are the default - so we don't need to do anything.
@@ -567,19 +558,34 @@ async fn subcommand_tail(
.into_iter()
.map(|name| derived_data_utils(unredacted_repo.clone(), name))
.collect::<Result<_, Error>>()?;
slog::info!(
ctx.logger(),
"[{}] derived data: {:?}",
unredacted_repo.name(),
derive_utils
.iter()
.map(|d| d.name())
.collect::<BTreeSet<_>>(),
);
loop {
tail_one_iteration(ctx, &unredacted_repo, &derive_utils).await?;
if batched {
info!(ctx.logger(), "using batched deriver");
loop {
tail_batch_iteration(ctx, &unredacted_repo, &derive_utils).await?;
}
} else {
info!(ctx.logger(), "using simple deriver");
loop {
tail_one_iteration(ctx, &unredacted_repo, &derive_utils).await?;
}
}
}
async fn tail_one_iteration(
async fn get_most_recent_heads(
ctx: &CoreContext,
repo: &BlobRepo,
derive_utils: &[Arc<dyn DerivedUtils>],
) -> Result<(), Error> {
let heads = repo
.bookmarks()
) -> Result<Vec<ChangesetId>, Error> {
repo.bookmarks()
.list(
ctx.clone(),
Freshness::MostRecent,
@@ -590,7 +596,80 @@ async fn tail_one_iteration(
)
.map_ok(|(_name, csid)| csid)
.try_collect::<Vec<_>>()
.await?;
.await
}
async fn tail_batch_iteration(
ctx: &CoreContext,
repo: &BlobRepo,
derive_utils: &[Arc<dyn DerivedUtils>],
) -> Result<(), Error> {
let heads = get_most_recent_heads(ctx, repo).await?;
let derive_graph = derived_data_utils::build_derive_graph(
ctx,
repo,
heads,
derive_utils.to_vec(),
CHUNK_SIZE,
// This means that for the first 1000 commits it will inspect all changesets for underived data,
// then 1000 out of the next 1000 * 1.5 commits, then 1000 out of 1000 * 1.5 ^ 2 ... then 1000 out of 1000 * 1.5 ^ n
ThinOut::new(1000.0, 1.5),
)
.await?;
let size = derive_graph.size();
if size == 0 {
tokio::time::delay_for(Duration::from_millis(250)).await;
} else {
info!(ctx.logger(), "deriving data {}", size);
// We are using `bounded_traversal_dag` directly instead of `DeriveGraph::derive`
// so that we can run `warmup::warmup` on each node.
bounded_traversal::bounded_traversal_dag(
100,
derive_graph,
|node| async move {
let deps = node.dependencies.clone();
Ok((node, deps))
},
move |node, _| {
cloned!(ctx, repo);
async move {
if let Some(deriver) = &node.deriver {
warmup::warmup(&ctx, &repo, deriver.name(), &node.csids).await?;
let timestamp = Instant::now();
deriver
.backfill_batch_dangerous(ctx.clone(), repo, node.csids.clone())
.await?;
if let (Some(first), Some(last)) = (node.csids.first(), node.csids.last()) {
slog::info!(
ctx.logger(),
"[{}:{}] count:{} time:{:?} start:{} end:{}",
deriver.name(),
node.id,
node.csids.len(),
timestamp.elapsed(),
first,
last
);
}
}
Ok::<_, Error>(())
}
},
)
.await?
.ok_or_else(|| anyhow!("derive graph contains a cycle"))?;
}
Ok(())
}
async fn tail_one_iteration(
ctx: &CoreContext,
repo: &BlobRepo,
derive_utils: &[Arc<dyn DerivedUtils>],
) -> Result<(), Error> {
let heads = get_most_recent_heads(ctx, repo).await?;
// Find heads that need derivation and find their oldest underived ancestor
let find_pending_futs: Vec<_> = derive_utils

View File

@@ -35,7 +35,7 @@ const PREFETCH_UNODE_TYPES: &[&str] = &[RootFastlog::NAME, RootDeletedManifestId
pub(crate) async fn warmup(
ctx: &CoreContext,
repo: &BlobRepo,
derived_data_type: &String,
derived_data_type: &str,
chunk: &Vec<ChangesetId>,
) -> Result<(), Error> {
// Warmup bonsai changesets unconditionally because
@@ -55,21 +55,21 @@ pub(crate) async fn warmup(
};
let content_warmup = async {
if PREFETCH_CONTENT_TYPES.contains(&derived_data_type.as_ref()) {
if PREFETCH_CONTENT_TYPES.contains(&derived_data_type) {
content_warmup(ctx, repo, chunk).await?
}
Ok(())
};
let metadata_warmup = async {
if PREFETCH_CONTENT_METADATA_TYPES.contains(&derived_data_type.as_ref()) {
if PREFETCH_CONTENT_METADATA_TYPES.contains(&derived_data_type) {
content_metadata_warmup(ctx, repo, chunk).await?
}
Ok(())
};
let unode_warmup = async {
if PREFETCH_UNODE_TYPES.contains(&derived_data_type.as_ref()) {
if PREFETCH_UNODE_TYPES.contains(&derived_data_type) {
unode_warmup(ctx, repo, chunk).await?
}
Ok(())

View File

@@ -40,6 +40,7 @@ use mononoke_types::{BonsaiChangeset, ChangesetId};
use std::{
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
io::Write,
sync::{Arc, Mutex},
};
use topo_sort::sort_topological;
@@ -459,7 +460,7 @@ fn derived_data_utils_impl(
}
pub struct DeriveGraphInner {
id: usize,
pub id: usize,
// deriver can be None only for the root element, and csids for this element is empty.
pub deriver: Option<Arc<dyn DerivedUtils>>,
pub csids: Vec<ChangesetId>,
@@ -489,6 +490,27 @@ impl DeriveGraph {
}
}
pub fn is_empty(&self) -> bool {
self.size() == 0
}
/// Number of derivations to be carried out by this graph
pub fn size(&self) -> usize {
let mut stack = vec![self];
let mut visited = HashSet::new();
let mut count = 0;
while let Some(node) = stack.pop() {
count += node.csids.len();
for dep in node.dependencies.iter() {
if visited.insert(dep.id) {
stack.push(dep);
}
}
}
count
}
/// Derive all data in the graph
pub async fn derive(&self, ctx: CoreContext, repo: BlobRepo) -> Result<(), Error> {
bounded_traversal::bounded_traversal_dag(
100,
@@ -527,14 +549,20 @@ impl DeriveGraph {
// Render the derive graph as a digraph that can be visualized with graphviz,
// for debugging purposes.
// $ dot -Tpng <output> -o <image>
pub fn digraph(&self, w: &mut dyn std::io::Write) -> std::io::Result<()> {
pub fn digraph(&self, w: &mut dyn Write) -> std::io::Result<()> {
writeln!(w, "digraph DeriveGraph {{")?;
let mut stack = vec![self];
let mut visited = HashSet::new();
while let Some(node) = stack.pop() {
let deriver = node.deriver.as_ref().map_or_else(|| "root", |d| d.name());
writeln!(w, " {0} [label=\"{1}:{0}\"]", node.id, deriver)?;
writeln!(
w,
" {0} [label=\"[{1}] id:{0} csids:{2}\"]",
node.id,
deriver,
node.csids.len()
)?;
for dep in node.dependencies.iter() {
writeln!(w, " {} -> {}", node.id, dep.id)?;
if visited.insert(dep.id) {
@@ -619,7 +647,6 @@ pub async fn build_derive_graph(
find_underived_many(ctx.clone(), repo.clone(), csids, derivers.clone(), thin_out);
let mut found_changesets = 0usize;
let start = std::time::Instant::now();
slog::info!(ctx.logger(), "searching for underived changesets");
while let Some((csid, parents, derivers)) = underived_stream.try_next().await? {
underived_dag.insert(csid, parents);
underived_to_derivers.insert(csid, derivers);
@@ -633,12 +660,14 @@
);
}
}
slog::info!(
ctx.logger(),
"found changsets: {} {:.3}/s",
found_changesets,
found_changesets as f32 / start.elapsed().as_secs_f32(),
);
if found_changesets > 0 {
slog::info!(
ctx.logger(),
"found changsets: {} {:.3}/s",
found_changesets,
found_changesets as f32 / start.elapsed().as_secs_f32(),
);
}
// topologically sort changesets
let underived_ordered = sort_topological(&underived_dag).expect("commit graph has cycles!");
@@ -902,6 +931,7 @@ mod tests {
thin_out,
)
.await?;
assert_eq!(graph.size(), 16);
let (graph_ids, nodes) = derive_graph_unpack(&graph);
assert_eq!(
graph_ids,
@@ -941,9 +971,7 @@
thin_out,
)
.await?;
let (_, nodes) = derive_graph_unpack(&graph);
assert_eq!(nodes.len(), 1);
assert!(nodes[0].dependencies.is_empty());
assert!(graph.is_empty());
Ok::<_, Error>(())
}