newadmin: add command for updating/building preloaded commit graph blob

Summary: Adds a new update-preloaded command that loads the preloaded commit graph blob from the blobstore (if present), updates it with edges for any newly added changesets, and re-uploads it.
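For reference, an invocation would look roughly like: newadmin commit-graph update-preloaded --blobstore-key KEY --chunk-size N --sleep-ms MS [--rebuild] [--sql-retries R]. The flag names follow the UpdatePreloadedArgs definition below; the exact binary name and repo-selection arguments are assumptions, as they are not shown in this diff.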

Reviewed By: mitrandir77

Differential Revision: D45993560

fbshipit-source-id: 4d14cbf091d86f165ed58fbba9305f0fa6021632
Author: Youssef Ibrahim, 2023-05-22 15:15:01 -07:00 (committed by Facebook GitHub Bot)
Commit 8b2561119a (parent 1b8ae05406); 4 changed files with 473 additions and 0 deletions


@@ -561,6 +561,221 @@ mononoke_queries! {
"
}
// The only difference between mysql and sqlite is the FORCE INDEX
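// Results come in two shapes: for every changeset in the id range, one
// row with parent_num 0 carrying the changeset's own edge data together
// with its first parent, and, for changesets with two or more parents,
// additional rows from commit_graph_merge_parents carrying only parent
// data (all other columns NULL). Ordering by parent_num keeps the
// parents in order when the rows are reassembled into ChangesetEdges.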
read SelectManyChangesetsInIdRange(repo_id: RepositoryId, start_id: u64, end_id: u64, limit: u64) -> (
ChangesetId, // cs_id
Option<u64>, // gen
Option<u64>, // skip_tree_depth
Option<u64>, // p1_linear_depth
Option<usize>, // parent_count
Option<ChangesetId>, // merge_ancestor
Option<u64>, // merge_ancestor_gen
Option<u64>, // merge_ancestor_skip_tree_depth
Option<u64>, // merge_ancestor_p1_linear_depth
Option<ChangesetId>, // skip_tree_parent
Option<u64>, // skip_tree_parent_gen
Option<u64>, // skip_tree_parent_skip_tree_depth
Option<u64>, // skip_tree_parent_p1_linear_depth
Option<ChangesetId>, // skip_tree_skew_ancestor
Option<u64>, // skip_tree_skew_ancestor_gen
Option<u64>, // skip_tree_skew_ancestor_skip_tree_depth
Option<u64>, // skip_tree_skew_ancestor_p1_linear_depth
Option<ChangesetId>, // p1_linear_skew_ancestor
Option<u64>, // p1_linear_skew_ancestor_gen
Option<u64>, // p1_linear_skew_ancestor_skip_tree_depth
Option<u64>, // p1_linear_skew_ancestor_p1_linear_depth
usize, // parent_num
Option<ChangesetId>, // parent
Option<u64>, // parent_gen
Option<u64>, // parent_skip_tree_depth
Option<u64>, // parent_p1_linear_depth
) {
mysql("WITH csp AS (
SELECT cs.id
FROM commit_graph_edges cs FORCE INDEX(repo_id_id)
WHERE cs.repo_id = {repo_id} AND cs.id >= {start_id} AND cs.id <= {end_id}
ORDER BY cs.id ASC
LIMIT {limit}
)
SELECT
cs0.cs_id AS cs_id,
NULL AS gen,
NULL AS skip_tree_depth,
NULL AS p1_linear_depth,
NULL AS parent_count,
NULL AS merge_ancestor,
NULL AS merge_ancestor_gen,
NULL AS merge_ancestor_skip_tree_depth,
NULL AS merge_ancestor_p1_linear_depth,
NULL AS skip_tree_parent,
NULL AS skip_tree_parent_gen,
NULL AS skip_tree_parent_skip_tree_depth,
NULL AS skip_tree_parent_p1_linear_depth,
NULL AS skip_tree_skew_ancestor,
NULL AS skip_tree_skew_ancestor_gen,
NULL AS skip_tree_skew_ancestor_skip_tree_depth,
NULL AS skip_tree_skew_ancestor_p1_linear_depth,
NULL AS p1_linear_skew_ancestor,
NULL AS p1_linear_skew_ancestor_gen,
NULL AS p1_linear_skew_ancestor_skip_tree_depth,
NULL AS p1_linear_skew_ancestor_p1_linear_depth,
cgmp.parent_num AS parent_num,
cs1.cs_id AS parent,
cs1.gen AS parent_gen,
cs1.skip_tree_depth AS parent_skip_tree_depth,
cs1.p1_linear_depth AS parent_p1_linear_depth
FROM csp
INNER JOIN commit_graph_merge_parents cgmp ON csp.id = cgmp.id
INNER JOIN commit_graph_edges cs0 ON cs0.id = cgmp.id
INNER JOIN commit_graph_edges cs1 ON cs1.id = cgmp.parent
WHERE cs0.parent_count >= 2
UNION
SELECT
cs0.cs_id AS cs_id,
cs0.gen AS gen,
cs0.skip_tree_depth AS skip_tree_depth,
cs0.p1_linear_depth AS p1_linear_depth,
cs0.parent_count AS parent_count,
cs_merge_ancestor.cs_id AS merge_ancestor,
cs_merge_ancestor.gen AS merge_ancestor_gen,
cs_merge_ancestor.skip_tree_depth AS merge_ancestor_skip_tree_depth,
cs_merge_ancestor.p1_linear_depth AS merge_ancestor_p1_linear_depth,
cs_skip_tree_parent.cs_id AS skip_tree_parent,
cs_skip_tree_parent.gen AS skip_tree_parent_gen,
cs_skip_tree_parent.skip_tree_depth AS skip_tree_parent_skip_tree_depth,
cs_skip_tree_parent.p1_linear_depth AS skip_tree_parent_p1_linear_depth,
cs_skip_tree_skew_ancestor.cs_id AS skip_tree_skew_ancestor,
cs_skip_tree_skew_ancestor.gen AS skip_tree_skew_ancestor_gen,
cs_skip_tree_skew_ancestor.skip_tree_depth AS skip_tree_skew_ancestor_skip_tree_depth,
cs_skip_tree_skew_ancestor.p1_linear_depth AS skip_tree_skew_ancestor_p1_linear_depth,
cs_p1_linear_skew_ancestor.cs_id AS p1_linear_skew_ancestor,
cs_p1_linear_skew_ancestor.gen AS p1_linear_skew_ancestor_gen,
cs_p1_linear_skew_ancestor.skip_tree_depth AS p1_linear_skew_ancestor_skip_tree_depth,
cs_p1_linear_skew_ancestor.p1_linear_depth AS p1_linear_skew_ancestor_p1_linear_depth,
0 AS parent_num,
cs_p1_parent.cs_id AS parent,
cs_p1_parent.gen AS parent_gen,
cs_p1_parent.skip_tree_depth AS parent_skip_tree_depth,
cs_p1_parent.p1_linear_depth AS parent_p1_linear_depth
FROM csp
INNER JOIN commit_graph_edges cs0 ON csp.id = cs0.id
LEFT JOIN commit_graph_edges cs_p1_parent ON cs_p1_parent.id = cs0.p1_parent
LEFT JOIN commit_graph_edges cs_merge_ancestor ON cs_merge_ancestor.id = cs0.merge_ancestor
LEFT JOIN commit_graph_edges cs_skip_tree_parent ON cs_skip_tree_parent.id = cs0.skip_tree_parent
LEFT JOIN commit_graph_edges cs_skip_tree_skew_ancestor ON cs_skip_tree_skew_ancestor.id = cs0.skip_tree_skew_ancestor
LEFT JOIN commit_graph_edges cs_p1_linear_skew_ancestor ON cs_p1_linear_skew_ancestor.id = cs0.p1_linear_skew_ancestor
ORDER BY parent_num ASC")
sqlite("WITH csp AS (
SELECT cs.id
FROM commit_graph_edges cs
WHERE cs.repo_id = {repo_id} AND cs.id >= {start_id} AND cs.id <= {end_id}
ORDER BY cs.id ASC
LIMIT {limit}
)
SELECT
cs0.cs_id AS cs_id,
NULL AS gen,
NULL AS skip_tree_depth,
NULL AS p1_linear_depth,
NULL AS parent_count,
NULL AS merge_ancestor,
NULL AS merge_ancestor_gen,
NULL AS merge_ancestor_skip_tree_depth,
NULL AS merge_ancestor_p1_linear_depth,
NULL AS skip_tree_parent,
NULL AS skip_tree_parent_gen,
NULL AS skip_tree_parent_skip_tree_depth,
NULL AS skip_tree_parent_p1_linear_depth,
NULL AS skip_tree_skew_ancestor,
NULL AS skip_tree_skew_ancestor_gen,
NULL AS skip_tree_skew_ancestor_skip_tree_depth,
NULL AS skip_tree_skew_ancestor_p1_linear_depth,
NULL AS p1_linear_skew_ancestor,
NULL AS p1_linear_skew_ancestor_gen,
NULL AS p1_linear_skew_ancestor_skip_tree_depth,
NULL AS p1_linear_skew_ancestor_p1_linear_depth,
cgmp.parent_num AS parent_num,
cs1.cs_id AS parent,
cs1.gen AS parent_gen,
cs1.skip_tree_depth AS parent_skip_tree_depth,
cs1.p1_linear_depth AS parent_p1_linear_depth
FROM csp
INNER JOIN commit_graph_merge_parents cgmp ON csp.id = cgmp.id
INNER JOIN commit_graph_edges cs0 ON cs0.id = cgmp.id
INNER JOIN commit_graph_edges cs1 ON cs1.id = cgmp.parent
WHERE cs0.parent_count >= 2
UNION
SELECT
cs0.cs_id AS cs_id,
cs0.gen AS gen,
cs0.skip_tree_depth AS skip_tree_depth,
cs0.p1_linear_depth AS p1_linear_depth,
cs0.parent_count AS parent_count,
cs_merge_ancestor.cs_id AS merge_ancestor,
cs_merge_ancestor.gen AS merge_ancestor_gen,
cs_merge_ancestor.skip_tree_depth AS merge_ancestor_skip_tree_depth,
cs_merge_ancestor.p1_linear_depth AS merge_ancestor_p1_linear_depth,
cs_skip_tree_parent.cs_id AS skip_tree_parent,
cs_skip_tree_parent.gen AS skip_tree_parent_gen,
cs_skip_tree_parent.skip_tree_depth AS skip_tree_parent_skip_tree_depth,
cs_skip_tree_parent.p1_linear_depth AS skip_tree_parent_p1_linear_depth,
cs_skip_tree_skew_ancestor.cs_id AS skip_tree_skew_ancestor,
cs_skip_tree_skew_ancestor.gen AS skip_tree_skew_ancestor_gen,
cs_skip_tree_skew_ancestor.skip_tree_depth AS skip_tree_skew_ancestor_skip_tree_depth,
cs_skip_tree_skew_ancestor.p1_linear_depth AS skip_tree_skew_ancestor_p1_linear_depth,
cs_p1_linear_skew_ancestor.cs_id AS p1_linear_skew_ancestor,
cs_p1_linear_skew_ancestor.gen AS p1_linear_skew_ancestor_gen,
cs_p1_linear_skew_ancestor.skip_tree_depth AS p1_linear_skew_ancestor_skip_tree_depth,
cs_p1_linear_skew_ancestor.p1_linear_depth AS p1_linear_skew_ancestor_p1_linear_depth,
0 AS parent_num,
cs_p1_parent.cs_id AS parent,
cs_p1_parent.gen AS parent_gen,
cs_p1_parent.skip_tree_depth AS parent_skip_tree_depth,
cs_p1_parent.p1_linear_depth AS parent_p1_linear_depth
FROM csp
INNER JOIN commit_graph_edges cs0 ON csp.id = cs0.id
LEFT JOIN commit_graph_edges cs_p1_parent ON cs_p1_parent.id = cs0.p1_parent
LEFT JOIN commit_graph_edges cs_merge_ancestor ON cs_merge_ancestor.id = cs0.merge_ancestor
LEFT JOIN commit_graph_edges cs_skip_tree_parent ON cs_skip_tree_parent.id = cs0.skip_tree_parent
LEFT JOIN commit_graph_edges cs_skip_tree_skew_ancestor ON cs_skip_tree_skew_ancestor.id = cs0.skip_tree_skew_ancestor
LEFT JOIN commit_graph_edges cs_p1_linear_skew_ancestor ON cs_p1_linear_skew_ancestor.id = cs0.p1_linear_skew_ancestor
ORDER BY parent_num ASC")
}
// The only difference between mysql and sqlite is the FORCE INDEX
read SelectMaxIdInRange(repo_id: RepositoryId, start_id: u64, end_id: u64, limit: u64) -> (u64) {
mysql("SELECT MAX(id)
FROM (
SELECT id
FROM commit_graph_edges FORCE INDEX(repo_id_id)
WHERE repo_id = {repo_id} AND id >= {start_id} AND id <= {end_id}
ORDER BY id ASC
LIMIT {limit}
) AS ids")
sqlite("SELECT MAX(id)
FROM (
SELECT id
FROM commit_graph_edges
WHERE repo_id = {repo_id} AND id >= {start_id} AND id <= {end_id}
ORDER BY id ASC
LIMIT {limit}
) AS ids")
}
read SelectMaxId(repo_id: RepositoryId) -> (u64) {
"
SELECT MAX(id)
FROM commit_graph_edges
WHERE repo_id = {repo_id}
"
}
read SelectChangesetsInRange(repo_id: RepositoryId, min_id: ChangesetId, max_id: ChangesetId, limit: usize) -> (ChangesetId) {
"
SELECT cs_id
@@ -782,6 +997,61 @@ impl SqlCommitGraphStorage {
.collect())
}
}
// Lower-level APIs for quickly iterating over all changeset edges
/// Fetch a maximum of `limit` changeset edges for changesets having
/// auto-increment ids between `start_id` and `end_id`.
pub async fn fetch_many_edges_in_id_range(
&self,
ctx: &CoreContext,
start_id: u64,
end_id: u64,
limit: u64,
) -> Result<HashMap<ChangesetId, ChangesetEdges>> {
Ok(Self::collect_changeset_edges(
&SelectManyChangesetsInIdRange::query(
&self.read_connection.conn,
&self.repo_id,
&start_id,
&end_id,
&limit,
)
.await?,
))
}
/// Returns the maximum auto-increment id for any changeset in the repo,
/// or `None` if there are no changesets.
pub async fn max_id(&self, ctx: &CoreContext) -> Result<Option<u64>> {
Ok(
SelectMaxId::query(&self.read_connection.conn, &self.repo_id)
.await?
.first()
.map(|(id,)| *id),
)
}
/// Returns the maximum auto-increment id among the first `limit` changesets
/// having auto-increment ids between `start_id` and `end_id`, or `None` if
/// there are no such changesets.
pub async fn max_id_in_range(
&self,
ctx: &CoreContext,
start_id: u64,
end_id: u64,
limit: u64,
) -> Result<Option<u64>> {
Ok(SelectMaxIdInRange::query(
&self.read_connection.conn,
&self.repo_id,
&start_id,
&end_id,
&limit,
)
.await?
.first()
.map(|(id,)| *id))
}
}
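Taken together, these three lower-level APIs support chunked iteration over all of a repo's changeset edges. A minimal sketch of that pattern (not part of this diff; for_each_edge_chunk and process_chunk are hypothetical, and the imports match the file above), which is essentially the loop the new update-preloaded command runs:

    async fn for_each_edge_chunk(
        ctx: &CoreContext,
        sql_storage: &SqlCommitGraphStorage,
        chunk_size: u64,
        mut process_chunk: impl FnMut(HashMap<ChangesetId, ChangesetEdges>) -> Result<()>,
    ) -> Result<()> {
        // Snapshot the maximum id once so the loop has a fixed end point.
        let end_id = sql_storage.max_id(ctx).await?.unwrap_or(0);
        let mut start_id = 1;
        while start_id <= end_id {
            let chunk = sql_storage
                .fetch_many_edges_in_id_range(ctx, start_id, end_id, chunk_size)
                .await?;
            if chunk.is_empty() {
                break;
            }
            // Passing the chunk's length as the limit makes this return the
            // maximum id within the chunk that was just fetched.
            let max_id = sql_storage
                .max_id_in_range(ctx, start_id, end_id, chunk.len() as u64)
                .await?
                .ok_or_else(|| anyhow::anyhow!("non-empty chunk but no max id"))?;
            process_chunk(chunk)?;
            start_id = max_id + 1;
        }
        Ok(())
    }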
#[async_trait]


@@ -73,6 +73,7 @@ mutable_counters = { version = "0.1.0", path = "../../mutable_counters" }
mutable_renames = { version = "0.1.0", path = "../../mutable_renames" }
once_cell = "1.12"
phases = { version = "0.1.0", path = "../../phases" }
preloaded_commit_graph_storage = { version = "0.1.0", path = "../../repo_attributes/commit_graph/preloaded_commit_graph_storage" }
prettytable-rs = "0.10"
pushrebase = { version = "0.1.0", path = "../../pushrebase" }
pushrebase_mutation_mapping = { version = "0.1.0", path = "../../pushrebase_mutation_mapping" }


@@ -10,6 +10,7 @@ mod backfill;
mod backfill_one;
mod checkpoints;
mod range_stream;
mod update_preloaded;
use ancestors_difference::AncestorsDifferenceArgs;
use anyhow::Result;
@@ -28,7 +29,9 @@ use metaconfig_types::RepoConfig;
use mononoke_app::args::RepoArgs;
use mononoke_app::MononokeApp;
use range_stream::RangeStreamArgs;
use repo_blobstore::RepoBlobstore;
use repo_identity::RepoIdentity;
use update_preloaded::UpdatePreloadedArgs;
#[derive(Parser)]
pub struct CommandArgs {
@@ -49,6 +52,8 @@ pub enum CommitGraphSubcommand {
AncestorsDifference(AncestorsDifferenceArgs),
/// Display ids of all commits that are simultaneously a descendant of one commit (start) and an ancestor of another (end) in topological order.
RangeStream(RangeStreamArgs),
/// Update preloaded commit graph and upload it to blobstore.
UpdatePreloaded(UpdatePreloadedArgs),
}
#[facet::container]
@@ -79,6 +84,9 @@ pub struct Repo {
#[facet]
bonsai_svnrev_mapping: dyn BonsaiSvnrevMapping,
#[facet]
repo_blobstore: RepoBlobstore,
}
pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
@@ -96,5 +104,8 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
CommitGraphSubcommand::RangeStream(args) => {
range_stream::range_stream(&ctx, &repo, args).await
}
CommitGraphSubcommand::UpdatePreloaded(args) => {
update_preloaded::update_preloaded(&ctx, &app, &repo, args).await
}
}
}


@@ -0,0 +1,191 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::collections::HashMap;
use anyhow::anyhow;
use anyhow::Result;
use blobstore::Blobstore;
use clap::Args;
use commit_graph_types::edges::ChangesetEdges;
use context::CoreContext;
use fbthrift::compact_protocol;
use metaconfig_types::RepoConfigRef;
use mononoke_app::MononokeApp;
use mononoke_types::BlobstoreBytes;
use mononoke_types::ChangesetId;
use preloaded_commit_graph_storage::ExtendablePreloadedEdges;
use rendezvous::RendezVousOptions;
use repo_blobstore::RepoBlobstoreRef;
use repo_identity::RepoIdentityRef;
use sql_commit_graph_storage::SqlCommitGraphStorage;
use sql_commit_graph_storage::SqlCommitGraphStorageBuilder;
use tokio::time::sleep;
use tokio::time::Duration;
use super::Repo;
#[derive(Args)]
pub struct UpdatePreloadedArgs {
/// Blobstore key for the preloaded commit graph.
#[clap(long)]
blobstore_key: String,
/// Whether to rebuild the preloaded commit graph or start
/// from the previous blob.
#[clap(long)]
rebuild: bool,
/// Number of times to retry fetching changeset edges
/// from the database.
#[clap(long, default_value_t = 0)]
sql_retries: u64,
/// Maximum number of changeset edges in a chunk
/// fetched from the database.
#[clap(long)]
chunk_size: u64,
/// Sleep time between fetching changeset edges in milliseconds.
#[clap(long)]
sleep_ms: u64,
}
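/// Fetch one chunk of changeset edges via the lower-level storage API,
/// retrying up to `sql_retries` times on error and sleeping `sleep_ms`
/// milliseconds between attempts.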
async fn try_fetch_chunk(
ctx: &CoreContext,
sql_storage: &SqlCommitGraphStorage,
start_id: u64,
end_id: u64,
chunk_size: u64,
mut sql_retries: u64,
sleep_ms: u64,
) -> Result<HashMap<ChangesetId, ChangesetEdges>> {
loop {
match sql_storage
.fetch_many_edges_in_id_range(ctx, start_id, end_id, chunk_size)
.await
{
Ok(edges) => return Ok(edges),
Err(err) => match sql_retries {
0 => return Err(err),
_ => {
println!("{:?}", err);
println!("Retrying fetching changeset edges");
sql_retries -= 1;
sleep(Duration::from_millis(sleep_ms)).await;
}
},
}
}
}
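/// Load the existing preloaded commit graph blob (unless `rebuild` is set),
/// extend it chunk by chunk with edges for any newly added changesets, and
/// re-upload the serialized result to the blobstore under `blobstore_key`.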
pub(super) async fn update_preloaded(
ctx: &CoreContext,
app: &MononokeApp,
repo: &Repo,
args: UpdatePreloadedArgs,
) -> Result<()> {
let sql_storage = app
.repo_factory()
.sql_factory(&repo.repo_config().storage_config.metadata)
.await?
.open::<SqlCommitGraphStorageBuilder>()
.await?
.build(
RendezVousOptions {
free_connections: 5,
},
repo.repo_identity().id(),
);
let preloaded_edges = match args.rebuild {
false => match repo.repo_blobstore().get(ctx, &args.blobstore_key).await? {
Some(bytes) => {
preloaded_commit_graph_storage::deserialize_preloaded_edges(bytes.into_raw_bytes())?
}
None => Default::default(),
},
true => Default::default(),
};
// The newly added changesets all have higher sql ids than the maximum
// id from the previously preloaded changesets.
let mut start_id = preloaded_edges
.max_sql_id
.map_or(1, |id| id.saturating_add(1));
// Query the maximum sql id for this repo only once, so that the command
// doesn't keep chasing changesets added while it runs.
let end_id = sql_storage.max_id(ctx).await?.unwrap_or(0);
println!(
"Updating with changesets having sql ids between {} and {} inclusive",
start_id, end_id
);
let mut extendable_preloaded_edges =
ExtendablePreloadedEdges::from_preloaded_edges(preloaded_edges);
while start_id <= end_id {
// Try to fetch up to `chunk_size` changeset edges for changesets with
// sql ids between start_id and end_id.
let edges_chunk = try_fetch_chunk(
ctx,
&sql_storage,
start_id,
end_id,
args.chunk_size,
args.sql_retries,
args.sleep_ms,
)
.await?;
if edges_chunk.is_empty() {
break;
}
// Query the maximum sql id in the fetched chunk so that the next
// chunk starts right after it.
let max_id_in_chunk = sql_storage
.max_id_in_range(ctx, start_id, end_id, edges_chunk.len() as u64)
.await?
.ok_or_else(|| anyhow!("Chunk is not empty but couldn't find max id"))?;
println!(
"Fetched chunk containing {} edges. Maximum sql id in chunk is {}",
edges_chunk.len(),
max_id_in_chunk
);
for (_cs_id, edges) in edges_chunk {
extendable_preloaded_edges.add(edges)?;
}
extendable_preloaded_edges.update_max_sql_id(max_id_in_chunk);
start_id = max_id_in_chunk + 1;
println!("Extended preloaded edges with chunk");
sleep(Duration::from_millis(args.sleep_ms)).await;
}
println!("Deserializing preloaded edges");
let bytes = compact_protocol::serialize(
&extendable_preloaded_edges
.into_preloaded_edges()
.to_thrift()?,
);
println!("Deserialized preloaded edges into {} bytes", bytes.len());
repo.repo_blobstore()
.put(ctx, args.blobstore_key, BlobstoreBytes::from_bytes(bytes))
.await?;
println!("Uploaded updated preloaded edges to blobstore");
Ok(())
}