From 8b2561119a29b7d5fe45d2fb025db3d7ccab8dc3 Mon Sep 17 00:00:00 2001 From: Youssef Ibrahim Date: Mon, 22 May 2023 15:15:01 -0700 Subject: [PATCH] newadmin: add command for updating/building preloaded commit graph blob Summary: Adds a new command update-preloaded that tries to load a blob of the preloaded commit graph, update it with any newly added changeset edges, and re-upload it to the blobstore. Reviewed By: mitrandir77 Differential Revision: D45993560 fbshipit-source-id: 4d14cbf091d86f165ed58fbba9305f0fa6021632 --- .../sql_commit_graph_storage/src/lib.rs | 270 ++++++++++++++++++ eden/mononoke/tools/admin/Cargo.toml | 1 + .../admin/src/commands/commit_graph/mod.rs | 11 + .../commands/commit_graph/update_preloaded.rs | 191 +++++++++++++ 4 files changed, 473 insertions(+) create mode 100644 eden/mononoke/tools/admin/src/commands/commit_graph/update_preloaded.rs diff --git a/eden/mononoke/repo_attributes/commit_graph/sql_commit_graph_storage/src/lib.rs b/eden/mononoke/repo_attributes/commit_graph/sql_commit_graph_storage/src/lib.rs index 940d376e47..399474c1c1 100644 --- a/eden/mononoke/repo_attributes/commit_graph/sql_commit_graph_storage/src/lib.rs +++ b/eden/mononoke/repo_attributes/commit_graph/sql_commit_graph_storage/src/lib.rs @@ -561,6 +561,221 @@ mononoke_queries! 
{ " } + // The only difference between mysql and sqlite is the FORCE INDEX + read SelectManyChangesetsInIdRange(repo_id: RepositoryId, start_id: u64, end_id: u64, limit: u64) -> ( + ChangesetId, // cs_id + Option, // gen + Option, // skip_tree_depth + Option, // p1_linear_depth + Option, // parent_count + Option, // merge_ancestor + Option, // merge_ancestor_gen + Option, // merge_ancestor_skip_tree_depth + Option, // merge_ancestor_p1_linear_depth + Option, // skip_tree_parent + Option, // skip_tree_parent_gen + Option, // skip_tree_parent_skip_tree_depth + Option, // skip_tree_parent_p1_linear_depth + Option, // skip_tree_skew_ancestor + Option, // skip_tree_skew_ancestor_gen + Option, // skip_tree_skew_ancestor_skip_tree_depth + Option, // skip_tree_skew_ancestor_p1_linear_depth + Option, // p1_linear_skew_ancestor + Option, // p1_linear_skew_ancestor_gen + Option, // p1_linear_skew_ancestor_skip_tree_depth + Option, // p1_linear_skew_ancestor_p1_linear_depth + usize, // parent_num + Option, // parent + Option, // parent_gen + Option, // parent_skip_tree_depth + Option, // parent_p1_linear_depth + ) { + mysql("WITH csp AS ( + SELECT cs.id + FROM commit_graph_edges cs FORCE INDEX(repo_id_id) + WHERE cs.repo_id = {repo_id} AND cs.id >= {start_id} AND cs.id <= {end_id} + ORDER BY cs.id ASC + LIMIT {limit} + ) + + SELECT + cs0.cs_id AS cs_id, + NULL AS gen, + NULL AS skip_tree_depth, + NULL AS p1_linear_depth, + NULL AS parent_count, + NULL AS merge_ancestor, + NULL AS merge_ancestor_gen, + NULL AS merge_ancestor_skip_tree_depth, + NULL AS merge_ancestor_p1_linear_depth, + NULL AS skip_tree_parent, + NULL AS skip_tree_parent_gen, + NULL AS skip_tree_parent_skip_tree_depth, + NULL AS skip_tree_parent_p1_linear_depth, + NULL AS skip_tree_skew_ancestor, + NULL AS skip_tree_skew_ancestor_gen, + NULL AS skip_tree_skew_ancestor_skip_tree_depth, + NULL AS skip_tree_skew_ancestor_p1_linear_depth, + NULL AS p1_linear_skew_ancestor, + NULL AS p1_linear_skew_ancestor_gen, + 
NULL AS p1_linear_skew_ancestor_skip_tree_depth, + NULL AS p1_linear_skew_ancestor_p1_linear_depth, + cgmp.parent_num AS parent_num, + cs1.cs_id AS parent, + cs1.gen AS parent_gen, + cs1.skip_tree_depth AS parent_skip_tree_depth, + cs1.p1_linear_depth AS parent_p1_linear_depth + FROM csp + INNER JOIN commit_graph_merge_parents cgmp ON csp.id = cgmp.id + INNER JOIN commit_graph_edges cs0 ON cs0.id = cgmp.id + INNER JOIN commit_graph_edges cs1 ON cs1.id = cgmp.parent + WHERE cs0.parent_count >= 2 + + UNION + + SELECT + cs0.cs_id AS cs_id, + cs0.gen AS gen, + cs0.skip_tree_depth AS skip_tree_depth, + cs0.p1_linear_depth AS p1_linear_depth, + cs0.parent_count AS parent_count, + cs_merge_ancestor.cs_id AS merge_ancestor, + cs_merge_ancestor.gen AS merge_ancestor_gen, + cs_merge_ancestor.skip_tree_depth AS merge_ancestor_skip_tree_depth, + cs_merge_ancestor.p1_linear_depth AS merge_ancestor_p1_linear_depth, + cs_skip_tree_parent.cs_id AS skip_tree_parent, + cs_skip_tree_parent.gen AS skip_tree_parent_gen, + cs_skip_tree_parent.skip_tree_depth AS skip_tree_parent_skip_tree_depth, + cs_skip_tree_parent.p1_linear_depth AS skip_tree_parent_p1_linear_depth, + cs_skip_tree_skew_ancestor.cs_id AS skip_tree_skew_ancestor, + cs_skip_tree_skew_ancestor.gen AS skip_tree_skew_ancestor_gen, + cs_skip_tree_skew_ancestor.skip_tree_depth AS skip_tree_skew_ancestor_skip_tree_depth, + cs_skip_tree_skew_ancestor.p1_linear_depth AS skip_tree_skew_ancestor_p1_linear_depth, + cs_p1_linear_skew_ancestor.cs_id AS p1_linear_skew_ancestor, + cs_p1_linear_skew_ancestor.gen AS p1_linear_skew_ancestor_gen, + cs_p1_linear_skew_ancestor.skip_tree_depth AS p1_linear_skew_ancestor_skip_tree_depth, + cs_p1_linear_skew_ancestor.p1_linear_depth AS p1_linear_skew_ancestor_p1_linear_depth, + 0 AS parent_num, + cs_p1_parent.cs_id AS parent, + cs_p1_parent.gen AS parent_gen, + cs_p1_parent.skip_tree_depth AS parent_skip_tree_depth, + cs_p1_parent.p1_linear_depth AS parent_p1_linear_depth + FROM csp + INNER 
JOIN commit_graph_edges cs0 ON csp.id = cs0.id + LEFT JOIN commit_graph_edges cs_p1_parent ON cs_p1_parent.id = cs0.p1_parent + LEFT JOIN commit_graph_edges cs_merge_ancestor ON cs_merge_ancestor.id = cs0.merge_ancestor + LEFT JOIN commit_graph_edges cs_skip_tree_parent ON cs_skip_tree_parent.id = cs0.skip_tree_parent + LEFT JOIN commit_graph_edges cs_skip_tree_skew_ancestor ON cs_skip_tree_skew_ancestor.id = cs0.skip_tree_skew_ancestor + LEFT JOIN commit_graph_edges cs_p1_linear_skew_ancestor ON cs_p1_linear_skew_ancestor.id = cs0.p1_linear_skew_ancestor + ORDER BY parent_num ASC") + sqlite("WITH csp AS ( + SELECT cs.id + FROM commit_graph_edges cs + WHERE cs.repo_id = {repo_id} AND cs.id >= {start_id} AND cs.id <= {end_id} + ORDER BY cs.id ASC + LIMIT {limit} + ) + + SELECT + cs0.cs_id AS cs_id, + NULL AS gen, + NULL AS skip_tree_depth, + NULL AS p1_linear_depth, + NULL AS parent_count, + NULL AS merge_ancestor, + NULL AS merge_ancestor_gen, + NULL AS merge_ancestor_skip_tree_depth, + NULL AS merge_ancestor_p1_linear_depth, + NULL AS skip_tree_parent, + NULL AS skip_tree_parent_gen, + NULL AS skip_tree_parent_skip_tree_depth, + NULL AS skip_tree_parent_p1_linear_depth, + NULL AS skip_tree_skew_ancestor, + NULL AS skip_tree_skew_ancestor_gen, + NULL AS skip_tree_skew_ancestor_skip_tree_depth, + NULL AS skip_tree_skew_ancestor_p1_linear_depth, + NULL AS p1_linear_skew_ancestor, + NULL AS p1_linear_skew_ancestor_gen, + NULL AS p1_linear_skew_ancestor_skip_tree_depth, + NULL AS p1_linear_skew_ancestor_p1_linear_depth, + cgmp.parent_num AS parent_num, + cs1.cs_id AS parent, + cs1.gen AS parent_gen, + cs1.skip_tree_depth AS parent_skip_tree_depth, + cs1.p1_linear_depth AS parent_p1_linear_depth + FROM csp + INNER JOIN commit_graph_merge_parents cgmp ON csp.id = cgmp.id + INNER JOIN commit_graph_edges cs0 ON cs0.id = cgmp.id + INNER JOIN commit_graph_edges cs1 ON cs1.id = cgmp.parent + WHERE cs0.parent_count >= 2 + + UNION + + SELECT + cs0.cs_id AS cs_id, + cs0.gen AS 
gen, + cs0.skip_tree_depth AS skip_tree_depth, + cs0.p1_linear_depth AS p1_linear_depth, + cs0.parent_count AS parent_count, + cs_merge_ancestor.cs_id AS merge_ancestor, + cs_merge_ancestor.gen AS merge_ancestor_gen, + cs_merge_ancestor.skip_tree_depth AS merge_ancestor_skip_tree_depth, + cs_merge_ancestor.p1_linear_depth AS merge_ancestor_p1_linear_depth, + cs_skip_tree_parent.cs_id AS skip_tree_parent, + cs_skip_tree_parent.gen AS skip_tree_parent_gen, + cs_skip_tree_parent.skip_tree_depth AS skip_tree_parent_skip_tree_depth, + cs_skip_tree_parent.p1_linear_depth AS skip_tree_parent_p1_linear_depth, + cs_skip_tree_skew_ancestor.cs_id AS skip_tree_skew_ancestor, + cs_skip_tree_skew_ancestor.gen AS skip_tree_skew_ancestor_gen, + cs_skip_tree_skew_ancestor.skip_tree_depth AS skip_tree_skew_ancestor_skip_tree_depth, + cs_skip_tree_skew_ancestor.p1_linear_depth AS skip_tree_skew_ancestor_p1_linear_depth, + cs_p1_linear_skew_ancestor.cs_id AS p1_linear_skew_ancestor, + cs_p1_linear_skew_ancestor.gen AS p1_linear_skew_ancestor_gen, + cs_p1_linear_skew_ancestor.skip_tree_depth AS p1_linear_skew_ancestor_skip_tree_depth, + cs_p1_linear_skew_ancestor.p1_linear_depth AS p1_linear_skew_ancestor_p1_linear_depth, + 0 AS parent_num, + cs_p1_parent.cs_id AS parent, + cs_p1_parent.gen AS parent_gen, + cs_p1_parent.skip_tree_depth AS parent_skip_tree_depth, + cs_p1_parent.p1_linear_depth AS parent_p1_linear_depth + FROM csp + INNER JOIN commit_graph_edges cs0 ON csp.id = cs0.id + LEFT JOIN commit_graph_edges cs_p1_parent ON cs_p1_parent.id = cs0.p1_parent + LEFT JOIN commit_graph_edges cs_merge_ancestor ON cs_merge_ancestor.id = cs0.merge_ancestor + LEFT JOIN commit_graph_edges cs_skip_tree_parent ON cs_skip_tree_parent.id = cs0.skip_tree_parent + LEFT JOIN commit_graph_edges cs_skip_tree_skew_ancestor ON cs_skip_tree_skew_ancestor.id = cs0.skip_tree_skew_ancestor + LEFT JOIN commit_graph_edges cs_p1_linear_skew_ancestor ON cs_p1_linear_skew_ancestor.id = 
cs0.p1_linear_skew_ancestor + ORDER BY parent_num ASC") + } + + // The only difference between mysql and sqlite is the FORCE INDEX + read SelectMaxIdInRange(repo_id: RepositoryId, start_id: u64, end_id: u64, limit: u64) -> (u64) { + mysql("SELECT MAX(id) + FROM ( + SELECT id + FROM commit_graph_edges FORCE INDEX(repo_id_id) + WHERE repo_id = {repo_id} AND id >= {start_id} AND id <= {end_id} + ORDER BY id ASC + LIMIT {limit} + ) AS ids") + sqlite("SELECT MAX(id) + FROM ( + SELECT id + FROM commit_graph_edges + WHERE repo_id = {repo_id} AND id >= {start_id} AND id <= {end_id} + ORDER BY id ASC + LIMIT {limit} + ) AS ids") + } + + read SelectMaxId(repo_id: RepositoryId) -> (u64) { + " + SELECT MAX(id) + FROM commit_graph_edges + WHERE repo_id = {repo_id} + " + } + read SelectChangesetsInRange(repo_id: RepositoryId, min_id: ChangesetId, max_id: ChangesetId, limit: usize) -> (ChangesetId) { " SELECT cs_id @@ -782,6 +997,61 @@ impl SqlCommitGraphStorage { .collect()) } } + + // Lower level APIs for quickly iterating over all changeset edges + + /// Fetch a maximum of `limit` changeset edges for changesets having + /// auto-increment ids between `start_id` and `end_id`. + pub async fn fetch_many_edges_in_id_range( + &self, + ctx: &CoreContext, + start_id: u64, + end_id: u64, + limit: u64, + ) -> Result> { + Ok(Self::collect_changeset_edges( + &SelectManyChangesetsInIdRange::query( + &self.read_connection.conn, + &self.repo_id, + &start_id, + &end_id, + &limit, + ) + .await?, + )) + } + + /// Returns the maximum auto-increment id for any changeset in the repo, + /// or `None` if there are no changesets. + pub async fn max_id(&self, ctx: &CoreContext) -> Result> { + Ok( + SelectMaxId::query(&self.read_connection.conn, &self.repo_id) + .await? + .first() + .map(|(id,)| *id), + ) + } + + /// Returns the maximum auto-increment id for changesets having auto-increment + /// ids between `start_id` and `end_id`, or `None` if there are no changesets. 
+ pub async fn max_id_in_range( + &self, + ctx: &CoreContext, + start_id: u64, + end_id: u64, + limit: u64, + ) -> Result> { + Ok(SelectMaxIdInRange::query( + &self.read_connection.conn, + &self.repo_id, + &start_id, + &end_id, + &limit, + ) + .await? + .first() + .map(|(id,)| *id)) + } } #[async_trait] diff --git a/eden/mononoke/tools/admin/Cargo.toml b/eden/mononoke/tools/admin/Cargo.toml index b644bda122..2d7a4f1a78 100644 --- a/eden/mononoke/tools/admin/Cargo.toml +++ b/eden/mononoke/tools/admin/Cargo.toml @@ -73,6 +73,7 @@ mutable_counters = { version = "0.1.0", path = "../../mutable_counters" } mutable_renames = { version = "0.1.0", path = "../../mutable_renames" } once_cell = "1.12" phases = { version = "0.1.0", path = "../../phases" } +preloaded_commit_graph_storage = { version = "0.1.0", path = "../../repo_attributes/commit_graph/preloaded_commit_graph_storage" } prettytable-rs = "0.10" pushrebase = { version = "0.1.0", path = "../../pushrebase" } pushrebase_mutation_mapping = { version = "0.1.0", path = "../../pushrebase_mutation_mapping" } diff --git a/eden/mononoke/tools/admin/src/commands/commit_graph/mod.rs b/eden/mononoke/tools/admin/src/commands/commit_graph/mod.rs index 319ea23ae1..aa13c05109 100644 --- a/eden/mononoke/tools/admin/src/commands/commit_graph/mod.rs +++ b/eden/mononoke/tools/admin/src/commands/commit_graph/mod.rs @@ -10,6 +10,7 @@ mod backfill; mod backfill_one; mod checkpoints; mod range_stream; +mod update_preloaded; use ancestors_difference::AncestorsDifferenceArgs; use anyhow::Result; @@ -28,7 +29,9 @@ use metaconfig_types::RepoConfig; use mononoke_app::args::RepoArgs; use mononoke_app::MononokeApp; use range_stream::RangeStreamArgs; +use repo_blobstore::RepoBlobstore; use repo_identity::RepoIdentity; +use update_preloaded::UpdatePreloadedArgs; #[derive(Parser)] pub struct CommandArgs { @@ -49,6 +52,8 @@ pub enum CommitGraphSubcommand { AncestorsDifference(AncestorsDifferenceArgs), /// Display ids of all commits that are 
simultaneously a descendant of one commit (start) and an ancestor of another (end) in topological order. RangeStream(RangeStreamArgs), + /// Update preloaded commit graph and upload it to blobstore. + UpdatePreloaded(UpdatePreloadedArgs), } #[facet::container] @@ -79,6 +84,9 @@ pub struct Repo { #[facet] bonsai_svnrev_mapping: dyn BonsaiSvnrevMapping, + + #[facet] + repo_blobstore: RepoBlobstore, } pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> { @@ -96,5 +104,8 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> { CommitGraphSubcommand::RangeStream(args) => { range_stream::range_stream(&ctx, &repo, args).await } + CommitGraphSubcommand::UpdatePreloaded(args) => { + update_preloaded::update_preloaded(&ctx, &app, &repo, args).await + } } } diff --git a/eden/mononoke/tools/admin/src/commands/commit_graph/update_preloaded.rs b/eden/mononoke/tools/admin/src/commands/commit_graph/update_preloaded.rs new file mode 100644 index 0000000000..229fd80d5d --- /dev/null +++ b/eden/mononoke/tools/admin/src/commands/commit_graph/update_preloaded.rs @@ -0,0 +1,191 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This software may be used and distributed according to the terms of the + * GNU General Public License version 2. 
+ */ + +use std::collections::HashMap; + +use anyhow::anyhow; +use anyhow::Result; +use blobstore::Blobstore; +use clap::Args; +use commit_graph_types::edges::ChangesetEdges; +use context::CoreContext; +use fbthrift::compact_protocol; +use metaconfig_types::RepoConfigRef; +use mononoke_app::MononokeApp; +use mononoke_types::BlobstoreBytes; +use mononoke_types::ChangesetId; +use preloaded_commit_graph_storage::ExtendablePreloadedEdges; +use rendezvous::RendezVousOptions; +use repo_blobstore::RepoBlobstoreRef; +use repo_identity::RepoIdentityRef; +use sql_commit_graph_storage::SqlCommitGraphStorage; +use sql_commit_graph_storage::SqlCommitGraphStorageBuilder; +use tokio::time::sleep; +use tokio::time::Duration; + +use super::Repo; + +#[derive(Args)] +pub struct UpdatePreloadedArgs { + /// Blobstore key for the preloaded commit graph. + #[clap(long)] + blobstore_key: String, + + /// Whether to rebuild the preloaded commit graph or start + /// from the previous blob. + #[clap(long)] + rebuild: bool, + + /// Number of times to retry fetching changeset edges + /// from the database. + #[clap(long, default_value_t = 0)] + sql_retries: u64, + + /// Maximum number of changeset edges in a chunk + /// fetched from the database. + #[clap(long)] + chunk_size: u64, + + /// Sleep time between fetching changeset edges in milliseconds. 
+ #[clap(long)] + sleep_ms: u64, +} + +async fn try_fetch_chunk( + ctx: &CoreContext, + sql_storage: &SqlCommitGraphStorage, + start_id: u64, + end_id: u64, + chunk_size: u64, + mut sql_retries: u64, + sleep_ms: u64, +) -> Result> { + loop { + match sql_storage + .fetch_many_edges_in_id_range(ctx, start_id, end_id, chunk_size) + .await + { + Ok(edges) => return Ok(edges), + Err(err) => match sql_retries { + 0 => return Err(err), + _ => { + println!("{:?}", err); + println!("Retrying fetching changeset edges"); + + sql_retries -= 1; + sleep(Duration::from_millis(sleep_ms)).await; + } + }, + } + } +} + +pub(super) async fn update_preloaded( + ctx: &CoreContext, + app: &MononokeApp, + repo: &Repo, + args: UpdatePreloadedArgs, +) -> Result<()> { + let sql_storage = app + .repo_factory() + .sql_factory(&repo.repo_config().storage_config.metadata) + .await? + .open::() + .await? + .build( + RendezVousOptions { + free_connections: 5, + }, + repo.repo_identity().id(), + ); + + let preloaded_edges = match args.rebuild { + false => match repo.repo_blobstore().get(ctx, &args.blobstore_key).await? { + Some(bytes) => { + preloaded_commit_graph_storage::deserialize_preloaded_edges(bytes.into_raw_bytes())? + } + None => Default::default(), + }, + true => Default::default(), + }; + + // The newly added changesets all have higher sql ids than the maximum + // id from the previously preloaded changesets. + let mut start_id = preloaded_edges + .max_sql_id + .map_or(1, |id| id.saturating_add(1)); + // Query the maximum sql id for this repo only once to avoid tailing + // new changesets. + let end_id = sql_storage.max_id(ctx).await?.unwrap_or(0); + + println!( + "Updating with changesets having sql ids between {} and {} inclusive", + start_id, end_id + ); + + let mut extendable_preloaded_edges = + ExtendablePreloadedEdges::from_preloaded_edges(preloaded_edges); + + while start_id <= end_id { + // Tries to fetch the first chunk_size changeset edges between + // start_id and end_id. 
+ let edges_chunk = try_fetch_chunk( + ctx, + &sql_storage, + start_id, + end_id, + args.chunk_size, + args.sql_retries, + args.sleep_ms, + ) + .await?; + + if edges_chunk.is_empty() { + break; + } + + // Query the maximum sql id from the fetched chunk to fetch the next + // chunks from after it. + let max_id_in_chunk = sql_storage + .max_id_in_range(ctx, start_id, end_id, edges_chunk.len() as u64) + .await? + .ok_or_else(|| anyhow!("Chunk is not empty but couldn't find max id"))?; + + println!( + "Fetched chunk containing {} edges. Maximum sql id in chunk is {}", + edges_chunk.len(), + max_id_in_chunk + ); + + for (_cs_id, edges) in edges_chunk { + extendable_preloaded_edges.add(edges)?; + } + extendable_preloaded_edges.update_max_sql_id(max_id_in_chunk); + start_id = max_id_in_chunk + 1; + + println!("Extended preloaded edges with chunk"); + + sleep(Duration::from_millis(args.sleep_ms)).await; + } + println!("Serializing preloaded edges"); + + let bytes = compact_protocol::serialize( + &extendable_preloaded_edges + .into_preloaded_edges() + .to_thrift()?, + ); + + println!("Serialized preloaded edges into {} bytes", bytes.len()); + + repo.repo_blobstore() + .put(ctx, args.blobstore_key, BlobstoreBytes::from_bytes(bytes)) + .await?; + + println!("Uploaded updated preloaded edges to blobstore"); + + Ok(()) +}