Efficiently do Changesets::add_many in commit_graph_compat

Summary:
This makes Changesets::add_many more efficient in the compat struct (assuming the inner implementation is also efficient) by:
- Calling the inner `add_many` implementation instead of looping over single `add` calls.
- Modifying `CommitGraph::add_recursive` so it can add multiple commits at the same time.
  - This also fixes an inefficiency where the traversal could visit nodes more than once (the `contains_key` check happened before pushing parents rather than when popping); see the sketch after the diff stats below.
- Writing to the old and new changeset stores in parallel. There is no reason not to.

Reviewed By: YousefSalama

Differential Revision: D42605099

fbshipit-source-id: a3fb5f99668345c816bec66298255e4bbcc671f3
Yan Soares Couto 2023-01-23 05:55:08 -08:00 committed by Facebook GitHub Bot
parent e500207f4f
commit 4aa816b58c
6 changed files with 101 additions and 84 deletions
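
A minimal standalone sketch of the traversal fix mentioned in the summary (illustrative types and names, not the real Mononoke ones): with several starting commits, the same ancestor can be pushed onto the stack twice before either copy is processed, so the visited check must also happen when a node is popped, not only before it is pushed.

```rust
use std::collections::HashSet;

/// Count all ancestors reachable from `starts`, visiting each node once.
fn count_reachable(starts: Vec<u64>, parents_of: impl Fn(u64) -> Vec<u64>) -> usize {
    let mut visited: HashSet<u64> = HashSet::new();
    let mut stack = starts;
    while let Some(id) = stack.pop() {
        // The pop-time check is the correctness fix: a node pushed by two
        // different children would otherwise be processed twice.
        if !visited.insert(id) {
            continue;
        }
        for parent in parents_of(id) {
            if !visited.contains(&parent) {
                stack.push(parent);
            }
        }
    }
    visited.len()
}
```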


@@ -30,6 +30,7 @@ use mononoke_types::ChangesetIdsResolvedFromPrefix;
use mononoke_types::Generation;
use smallvec::SmallVec;
use smallvec::ToSmallVec;
use vec1::Vec1;
use crate::edges::ChangesetEdges;
use crate::edges::ChangesetFrontier;
@@ -92,14 +93,17 @@ impl CommitGraph {
&self,
ctx: &CoreContext,
changeset_fetcher: ArcChangesetFetcher,
cs_id: ChangesetId,
parents: ChangesetParents,
changesets: Vec1<(ChangesetId, ChangesetParents)>,
) -> Result<usize> {
let mut edges_map: HashMap<ChangesetId, ChangesetEdges> = Default::default();
let mut search_stack: Vec<(ChangesetId, ChangesetParents)> = vec![(cs_id, parents)];
let mut search_stack: Vec<(ChangesetId, ChangesetParents)> = changesets.into_vec();
let mut to_add_stack: Vec<(ChangesetId, ChangesetParents)> = Default::default();
while let Some((cs_id, parents)) = search_stack.pop() {
if edges_map.contains_key(&cs_id) {
continue;
}
to_add_stack.push((cs_id, parents.clone()));
edges_map.extend(
@@ -110,15 +114,13 @@ impl CommitGraph {
);
for parent in parents {
if !edges_map.contains_key(&parent) {
search_stack.push((
parent,
changeset_fetcher
.get_parents(ctx, parent)
.await?
.to_smallvec(),
));
}
search_stack.push((
parent,
changeset_fetcher
.get_parents(ctx, parent)
.await?
.to_smallvec(),
));
}
}


@@ -8,7 +8,6 @@
use std::sync::Arc;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use changeset_fetcher::ArcChangesetFetcher;
@@ -19,6 +18,7 @@ use changesets::ChangesetInsert;
use changesets::Changesets;
use changesets::SortOrder;
use commit_graph::ArcCommitGraph;
use commit_graph::ChangesetParents;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::stream::BoxStream;
@@ -31,6 +31,7 @@ use mononoke_types::RepositoryId;
use scuba_ext::MononokeScubaSampleBuilder;
use smallvec::SmallVec;
use tunables::tunables;
use vec1::vec1;
use vec1::Vec1;
pub struct ChangesetsCommitGraphCompat {
@@ -69,6 +70,61 @@ impl ChangesetsCommitGraphCompat {
scuba,
})
}
async fn maybe_write_to_new_commit_graph(
&self,
ctx: &CoreContext,
css: Vec1<(ChangesetId, ChangesetParents)>,
) {
if !tunables()
.get_by_repo_enable_writing_to_new_commit_graph(&self.repo_name)
.unwrap_or(false)
{
return;
}
let mut scuba = self.scuba.clone();
scuba.add_common_server_data();
// Only the last id, which is good enough for logging.
scuba.add("changeset_id", css.last().0.to_string());
scuba.add("changeset_count", css.len());
scuba.add("repo_name", self.repo_name.as_str());
let write_timeout = tunables().get_commit_graph_writes_timeout_ms() as u64;
// We use add_recursive because some parents might be missing
// from the new commit graph.
match tokio::time::timeout(
tokio::time::Duration::from_millis(write_timeout),
self.commit_graph
.add_recursive(ctx, self.changeset_fetcher.clone(), css)
.timed(),
)
.await
{
Err(_) => {
scuba.add("timeout_ms", write_timeout);
scuba.log_with_msg("Insertion timed out", None);
}
Ok((stats, Err(err))) => {
scuba.add("error", err.to_string());
scuba.add("time_s", stats.completion_time.as_secs_f64());
scuba.log_with_msg("Insertion failed", None);
}
Ok((stats, Ok(added_to_commit_graph))) => {
scuba.add("time_s", stats.completion_time.as_secs_f64());
scuba.add("num_added", added_to_commit_graph);
if added_to_commit_graph > 0 {
scuba.log_with_msg("Insertion succeeded", None);
} else {
scuba.log_with_msg("Changeset already stored", None);
}
}
}
}
}
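
For reference, a standalone sketch of the timeout handling used in maybe_write_to_new_commit_graph above, with a dummy future standing in for the commit graph write; the `.timed()` wrapper from `futures_stats` is omitted here, so the match arms are one tuple shallower than in the real code:

```rust
use std::time::Duration;

// tokio::time::timeout returns Err(Elapsed) if the deadline fires,
// otherwise Ok with the inner future's own output.
async fn bounded_write() {
    let write = async { Ok::<usize, anyhow::Error>(3) };
    match tokio::time::timeout(Duration::from_millis(500), write).await {
        Err(_elapsed) => eprintln!("Insertion timed out"),
        Ok(Err(err)) => eprintln!("Insertion failed: {err}"),
        Ok(Ok(num_added)) if num_added > 0 => println!("Insertion succeeded: {num_added}"),
        Ok(Ok(_)) => println!("Changeset already stored"),
    }
}
```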
#[async_trait]
@@ -77,59 +133,16 @@ impl Changesets for ChangesetsCommitGraphCompat {
self.changesets.repo_id()
}
async fn add(&self, ctx: &CoreContext, cs: ChangesetInsert) -> Result<bool, Error> {
let added_to_changesets = self.changesets.add(ctx, cs.clone()).await?;
if tunables()
.get_by_repo_enable_writing_to_new_commit_graph(&self.repo_name)
.unwrap_or(false)
{
let mut scuba = self.scuba.clone();
scuba.add_common_server_data();
scuba.add("changeset_id", cs.cs_id.to_string());
scuba.add("repo_name", self.repo_name.as_str());
let write_timeout = tunables().get_commit_graph_writes_timeout_ms() as u64;
// We use add_recursive because some parents might be missing
// from the new commit graph.
match tokio::time::timeout(
tokio::time::Duration::from_millis(write_timeout),
self.commit_graph
.add_recursive(
&ctx,
self.changeset_fetcher.clone(),
cs.cs_id,
SmallVec::from_vec(cs.parents),
)
.timed(),
)
.await
{
Err(_) => {
scuba.add("timeout_ms", write_timeout);
scuba.log_with_msg("Insertion timed out", None);
}
Ok((stats, Err(err))) => {
scuba.add("error", err.to_string());
scuba.add("time_s", stats.completion_time.as_secs_f64());
scuba.log_with_msg("Insertion failed", None);
}
Ok((stats, Ok(added_to_commit_graph))) => {
scuba.add("time_s", stats.completion_time.as_secs_f64());
scuba.add("num_added", added_to_commit_graph);
if added_to_commit_graph > 0 {
scuba.log_with_msg("Insertion succeeded", None);
} else {
scuba.log_with_msg("Changeset already stored", None);
}
}
}
}
async fn add(&self, ctx: &CoreContext, cs: ChangesetInsert) -> Result<bool> {
let (added_to_changesets, ()) =
futures::try_join!(self.changesets.add(ctx, cs.clone()), async move {
self.maybe_write_to_new_commit_graph(
ctx,
vec1![(cs.cs_id, SmallVec::from_vec(cs.parents))],
)
.await;
Ok(())
})?;
Ok(added_to_changesets)
}
@@ -137,19 +150,19 @@ impl Changesets for ChangesetsCommitGraphCompat {
&self,
ctx: &CoreContext,
css: Vec1<(ChangesetInsert, Generation)>,
) -> Result<(), Error> {
// TODO(yancouto): optimise this by doing few inner calls
for (cs, _gen) in css {
self.add(ctx, cs).await?;
}
) -> Result<()> {
futures::try_join!(self.changesets.add_many(ctx, css.clone()), async move {
self.maybe_write_to_new_commit_graph(
ctx,
css.mapped(|(cs, _)| (cs.cs_id, SmallVec::from_vec(cs.parents))),
)
.await;
Ok(())
})?;
Ok(())
}
async fn get(
&self,
ctx: &CoreContext,
cs_id: ChangesetId,
) -> Result<Option<ChangesetEntry>, Error> {
async fn get(&self, ctx: &CoreContext, cs_id: ChangesetId) -> Result<Option<ChangesetEntry>> {
self.changesets.get(ctx, cs_id).await
}
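
Both methods above lean on the `vec1` crate's non-empty vector (the Cargo.toml additions below pull it in). A quick illustration of the two methods used, `last` and `mapped`, with throwaway types:

```rust
use vec1::{vec1, Vec1};

fn main() {
    // Vec1 is a vector guaranteed non-empty, so last() returns &T directly
    // rather than Option<&T> (no unwrap needed when logging css.last().0).
    let css: Vec1<(u32, &str)> = vec1![(1, "a"), (2, "b")];
    assert_eq!(css.last().0, 2);

    // mapped() consumes the Vec1 and yields another non-empty Vec1, the
    // same shape used to turn inserts into (id, parents) pairs above.
    let ids: Vec1<u32> = css.mapped(|(id, _)| id);
    assert_eq!(ids, vec1![1, 2]);
}
```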
@@ -157,7 +170,7 @@ impl Changesets for ChangesetsCommitGraphCompat {
&self,
ctx: &CoreContext,
cs_ids: Vec<ChangesetId>,
) -> Result<Vec<ChangesetEntry>, Error> {
) -> Result<Vec<ChangesetEntry>> {
self.changesets.get_many(ctx, cs_ids).await
}
@@ -166,7 +179,7 @@ impl Changesets for ChangesetsCommitGraphCompat {
ctx: &CoreContext,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
) -> Result<ChangesetIdsResolvedFromPrefix> {
self.changesets
.get_many_by_prefix(ctx, cs_prefix, limit)
.await
@@ -194,7 +207,7 @@ impl Changesets for ChangesetsCommitGraphCompat {
max_id: u64,
sort_and_limit: Option<(SortOrder, u64)>,
read_from_master: bool,
) -> BoxStream<'_, Result<(ChangesetId, u64), Error>> {
) -> BoxStream<'_, Result<(ChangesetId, u64)>> {
self.changesets.list_enumeration_range(
ctx,
min_id,

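The `add` and `add_many` bodies above share one concurrency pattern: the fallible legacy write and the best-effort new-graph write run in parallel through `futures::try_join!`. A reduced sketch, with hypothetical stand-ins for the two writes:

```rust
use anyhow::{Error, Result};
use std::future::Future;

// The legacy write decides the overall result, while the new-graph write
// never fails the call (it logs internally), so it is wrapped in an async
// block whose output is always Ok(()).
async fn write_both(
    legacy: impl Future<Output = Result<bool>>,
    best_effort: impl Future<Output = ()>,
) -> Result<bool> {
    let (added_to_changesets, ()) = futures::try_join!(legacy, async {
        best_effort.await;
        Ok::<(), Error>(())
    })?;
    Ok(added_to_changesets)
}
```
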

@@ -15,3 +15,4 @@ drawdag = { version = "0.1.0", path = "../../../../scm/lib/drawdag" }
in_memory_commit_graph_storage = { version = "0.1.0", path = "../in_memory_commit_graph_storage" }
mononoke_types = { version = "0.1.0", path = "../../../mononoke_types" }
smallvec = { version = "1.6.1", features = ["serde", "specialization", "union"] }
vec1 = { version = "1", features = ["serde"] }


@@ -18,6 +18,7 @@ use mononoke_types::ChangesetIdPrefix;
use mononoke_types::ChangesetIdsResolvedFromPrefix;
use mononoke_types::RepositoryId;
use smallvec::smallvec;
use vec1::vec1;
use crate::utils::*;
@@ -445,8 +446,7 @@ pub async fn test_add_recursive(
.add_recursive(
ctx,
reference_graph.clone(),
name_cs_id("I"),
smallvec![name_cs_id("H")],
vec1![(name_cs_id("I"), smallvec![name_cs_id("H")])],
)
.await?,
9
@@ -456,8 +456,7 @@ pub async fn test_add_recursive(
.add_recursive(
ctx,
reference_graph,
name_cs_id("J"),
smallvec![name_cs_id("F")],
vec1![(name_cs_id("J"), smallvec![name_cs_id("F")])]
)
.await?,
1


@@ -80,6 +80,7 @@ strum_macros = "0.21"
tokio = { version = "1.21.2", features = ["full", "test-util", "tracing"] }
tokio-util = { version = "0.6", features = ["full"] }
unodes = { version = "0.1.0", path = "../../derived_data/unodes" }
vec1 = { version = "1", features = ["serde"] }
[dev-dependencies]
fbinit-tokio = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }


@@ -13,6 +13,7 @@ use context::CoreContext;
use futures_stats::TimedFutureExt;
use mononoke_types::ChangesetId;
use smallvec::ToSmallVec;
use vec1::vec1;
use super::Repo;
@@ -36,7 +37,7 @@ pub(super) async fn backfill_one(
let (stats, result) = repo
.commit_graph()
.add_recursive(ctx, changeset_fetcher, args.commit_id, parents)
.add_recursive(ctx, changeset_fetcher, vec1![(args.commit_id, parents)])
.timed()
.await;