mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 16:31:02 +03:00
use derived data infra to derive mercurial changesets
Summary: This completely converts mercurial changeset to be an instance of derived data: - Custom lease logic is removed - Custom changeset traversal logic is removed Naming scheme of keys for leases has been changed to conform with other derived data types. This might cause temporary spike of cpu usage during rollout. Reviewed By: farnz Differential Revision: D23575777 fbshipit-source-id: 8eb878b2b0a57312c69f865f4c5395d98df7141c
This commit is contained in:
parent
b92c64af7d
commit
463acc581d
@ -30,6 +30,7 @@ filestore = { path = "../../filestore" }
|
||||
fsnodes = { path = "../../derived_data/fsnodes" }
|
||||
git_types = { path = "../../git/git_types" }
|
||||
memblob = { path = "../../blobstore/memblob" }
|
||||
mercurial_derived_data = { path = "../../derived_data/mercurial_derived_data" }
|
||||
mercurial_mutation = { path = "../../mercurial/mutation" }
|
||||
metaconfig_types = { path = "../../metaconfig/types" }
|
||||
mononoke_types = { path = "../../mononoke_types" }
|
||||
|
@ -35,6 +35,7 @@ use futures::{compat::Future01CompatExt, future, try_join};
|
||||
use git_types::TreeHandle;
|
||||
use maplit::btreeset;
|
||||
use memblob::EagerMemblob;
|
||||
use mercurial_derived_data::MappedHgChangesetId;
|
||||
use mercurial_mutation::{HgMutationStore, SqlHgMutationStoreBuilder};
|
||||
use metaconfig_types::{
|
||||
self, CensoredScubaParams, DerivedDataConfig, FilestoreParams, Redaction, RepoConfig,
|
||||
@ -415,6 +416,7 @@ pub fn init_all_derived_data() -> DerivedDataConfig {
|
||||
RootDeletedManifestId::NAME.to_string(),
|
||||
RootUnodeManifestId::NAME.to_string(),
|
||||
TreeHandle::NAME.to_string(),
|
||||
MappedHgChangesetId::NAME.to_string(),
|
||||
},
|
||||
unode_version: UnodeVersion::V2,
|
||||
override_blame_filesize_limit: None,
|
||||
|
@ -33,7 +33,7 @@ use futures_ext::{BoxFuture, FutureExt};
|
||||
use futures_old::{Future, Stream};
|
||||
use maplit::btreemap;
|
||||
use memblob::LazyMemblob;
|
||||
use mercurial_derived_data::{get_hg_from_bonsai_changeset_with_impl, get_manifest_from_bonsai};
|
||||
use mercurial_derived_data::get_manifest_from_bonsai;
|
||||
use mercurial_types::{
|
||||
blobs::{
|
||||
BlobManifest, ContentBlobMeta, File, HgBlobChangeset, UploadHgFileContents,
|
||||
@ -1140,10 +1140,20 @@ fn test_hg_commit_generation_simple(fb: FacebookInit) {
|
||||
repo.clone(),
|
||||
))
|
||||
.unwrap();
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(&repo, ctx, bcs_id))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id))
|
||||
.unwrap();
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"5c31d1196c64c93cb5bcf8bca3a24860f103d69f"
|
||||
))
|
||||
);
|
||||
// make sure bonsai hg mapping is updated
|
||||
let map_bcs_id = runtime
|
||||
.block_on(repo.get_bonsai_from_hg(ctx, hg_cs_id))
|
||||
.unwrap();
|
||||
assert_eq!(map_bcs_id, Some(bcs_id));
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1174,14 +1184,15 @@ fn test_hg_commit_generation_stack(fb: FacebookInit) {
|
||||
repo.clone(),
|
||||
))
|
||||
.unwrap();
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(
|
||||
&repo,
|
||||
ctx,
|
||||
top_of_stack,
|
||||
))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx, top_of_stack))
|
||||
.unwrap();
|
||||
assert_eq!(count, stack_size);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"b15a980d805db1646422dbf02016aa8a9f8aacd3",
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1204,23 +1215,25 @@ fn test_hg_commit_generation_one_after_another(fb: FacebookInit) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(
|
||||
&repo,
|
||||
ctx.clone(),
|
||||
first_bcs_id,
|
||||
))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx.clone(), first_bcs_id))
|
||||
.unwrap();
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"5c31d1196c64c93cb5bcf8bca3a24860f103d69f",
|
||||
))
|
||||
);
|
||||
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(
|
||||
&repo,
|
||||
ctx,
|
||||
second_bcs_id,
|
||||
))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx, second_bcs_id))
|
||||
.unwrap();
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"09e9a31873e07ad483aa64e4dfd2cc705de40276",
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1237,14 +1250,15 @@ fn test_hg_commit_generation_diamond(fb: FacebookInit) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(
|
||||
&repo,
|
||||
ctx.clone(),
|
||||
last_bcs_id,
|
||||
))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx.clone(), last_bcs_id))
|
||||
.unwrap();
|
||||
assert_eq!(count, 4);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"5d69478d73e67e5270550e44f2acfd93f456d74a",
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1258,14 +1272,15 @@ fn test_hg_commit_generation_many_diamond(fb: FacebookInit) {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let (_, count) = runtime
|
||||
.block_on(get_hg_from_bonsai_changeset_with_impl(
|
||||
&repo,
|
||||
ctx.clone(),
|
||||
bcs_id,
|
||||
))
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx, bcs_id))
|
||||
.unwrap();
|
||||
assert_eq!(count, 200);
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"6b43556e77b7312cabd16ac5f0a85cd920d95272",
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1299,9 +1314,15 @@ fn test_hg_commit_generation_uneven_branch(fb: FacebookInit) {
|
||||
);
|
||||
|
||||
runtime.block_on(f).unwrap();
|
||||
runtime
|
||||
let hg_cs_id = runtime
|
||||
.block_on(repo.get_hg_from_bonsai_changeset(ctx.clone(), merge.get_changeset_id()))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
hg_cs_id,
|
||||
HgChangesetId::new(string_to_nodehash(
|
||||
"62b3de4cbd1bc4bf8422c6588234c28842476d3b",
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(fbcode_build)]
|
||||
|
@ -20,8 +20,6 @@ derived_data = { path = ".." }
|
||||
manifest = { path = "../../manifest" }
|
||||
mercurial_types = { path = "../../mercurial/types" }
|
||||
mononoke_types = { path = "../../mononoke_types" }
|
||||
scuba_ext = { path = "../../common/scuba_ext" }
|
||||
topo_sort = { path = "../../common/topo_sort" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
@ -31,7 +29,3 @@ tracing = { git = "https://github.com/facebookexperimental/rust-shed.git", branc
|
||||
anyhow = "1.0"
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
maplit = "1.0"
|
||||
rand = { version = "0.7", features = ["small_rng"] }
|
||||
slog = { version = "2.5", features = ["max_level_debug"] }
|
||||
tokio = { version = "=0.2.13", features = ["full"] }
|
||||
|
@ -5,24 +5,22 @@
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use crate::derive_hg_manifest::derive_hg_manifest;
|
||||
use crate::{derive_hg_manifest::derive_hg_manifest, mapping::MappedHgChangesetId};
|
||||
use anyhow::Error;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobrepo_common::changed_files::compute_changed_files;
|
||||
use blobstore::Loadable;
|
||||
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry};
|
||||
use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use futures::future::{self as new_future, FutureExt as NewFutureExt, TryFutureExt};
|
||||
use futures_ext::{try_boxfuture, BoxFuture, FutureExt, StreamExt};
|
||||
use futures_old::sync::oneshot;
|
||||
use futures_old::{
|
||||
future::{self, loop_fn, Loop},
|
||||
stream, Future, IntoFuture, Stream,
|
||||
use derived_data::{BonsaiDerived, DeriveError};
|
||||
use futures::{
|
||||
compat::Future01CompatExt,
|
||||
future::{try_join_all, TryFutureExt},
|
||||
};
|
||||
use futures_ext::{try_boxfuture, BoxFuture, FutureExt, StreamExt};
|
||||
use futures_old::{future, stream, Future, IntoFuture, Stream};
|
||||
use futures_stats::futures01::Timed;
|
||||
use manifest::ManifestOps;
|
||||
use maplit::hashmap;
|
||||
use mercurial_types::{
|
||||
blobs::{
|
||||
ChangesetMetadata, ContentBlobMeta, HgBlobChangeset, HgBlobEntry, HgChangesetContent,
|
||||
@ -31,16 +29,9 @@ use mercurial_types::{
|
||||
HgChangesetId, HgFileNodeId, HgManifestId, HgParents, Type,
|
||||
};
|
||||
use mononoke_types::{BonsaiChangeset, ChangesetId, FileChange, MPath};
|
||||
use scuba_ext::ScubaSampleBuilderExt;
|
||||
use slog::debug;
|
||||
use stats::prelude::*;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use time_ext::DurationExt;
|
||||
use topo_sort::sort_topological;
|
||||
use tracing::{trace_args, EventId, Traced};
|
||||
|
||||
define_stats! {
|
||||
@ -324,91 +315,23 @@ pub fn get_manifest_from_bonsai(
|
||||
.boxify()
|
||||
}
|
||||
|
||||
fn generate_lease_key(repo: &BlobRepo, bcs_id: &ChangesetId) -> String {
|
||||
let repoid = repo.get_repoid();
|
||||
format!("repoid.{}.hg-changeset.{}", repoid.id(), bcs_id)
|
||||
}
|
||||
|
||||
fn take_hg_generation_lease(
|
||||
pub(crate) async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
ctx: CoreContext,
|
||||
bcs_id: ChangesetId,
|
||||
) -> impl Future<Item = Option<HgChangesetId>, Error = Error> + Send {
|
||||
let key = generate_lease_key(&repo, &bcs_id);
|
||||
let repoid = repo.get_repoid();
|
||||
|
||||
let derived_data_lease = repo.get_derived_data_lease_ops();
|
||||
let bonsai_hg_mapping = get_bonsai_hg_mapping(&repo).clone();
|
||||
|
||||
let backoff_ms = 200;
|
||||
loop_fn(backoff_ms, move |mut backoff_ms| {
|
||||
cloned!(ctx, key);
|
||||
derived_data_lease
|
||||
.try_add_put_lease(&key)
|
||||
.or_else(|_| Ok(false))
|
||||
.and_then({
|
||||
cloned!(bcs_id, bonsai_hg_mapping, repo);
|
||||
move |leased| {
|
||||
let maybe_hg_cs =
|
||||
bonsai_hg_mapping.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id);
|
||||
if leased {
|
||||
maybe_hg_cs
|
||||
.and_then(move |maybe_hg_cs| {
|
||||
match maybe_hg_cs {
|
||||
Some(hg_cs) => release_hg_generation_lease(&repo, bcs_id)
|
||||
.then(move |_| Ok(Loop::Break(Some(hg_cs))))
|
||||
.left_future(),
|
||||
None => future::ok(Loop::Break(None)).right_future(),
|
||||
}
|
||||
})
|
||||
.left_future()
|
||||
} else {
|
||||
maybe_hg_cs
|
||||
.and_then(move |maybe_hg_cs_id| {
|
||||
match maybe_hg_cs_id {
|
||||
Some(hg_cs_id) => {
|
||||
future::ok(Loop::Break(Some(hg_cs_id))).left_future()
|
||||
}
|
||||
None => {
|
||||
let sleep = rand::random::<u64>() % backoff_ms;
|
||||
tokio::time::delay_for(Duration::from_millis(sleep))
|
||||
.then(|_| new_future::ready(Ok(())))
|
||||
.compat()
|
||||
.then(move |_: Result<(), Error>| {
|
||||
backoff_ms *= 2;
|
||||
if backoff_ms >= 1000 {
|
||||
backoff_ms = 1000;
|
||||
}
|
||||
Ok(Loop::Continue(backoff_ms))
|
||||
})
|
||||
.right_future()
|
||||
}
|
||||
}
|
||||
})
|
||||
.right_future()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn renew_hg_generation_lease_forever(
|
||||
repo: &BlobRepo,
|
||||
ctx: CoreContext,
|
||||
bcs_id: ChangesetId,
|
||||
done: BoxFuture<(), ()>,
|
||||
) {
|
||||
let key = generate_lease_key(repo, &bcs_id);
|
||||
repo.get_derived_data_lease_ops()
|
||||
.renew_lease_until(ctx, &key, done)
|
||||
}
|
||||
|
||||
fn release_hg_generation_lease(
|
||||
repo: &BlobRepo,
|
||||
bcs_id: ChangesetId,
|
||||
) -> impl Future<Item = (), Error = ()> + Send {
|
||||
let key = generate_lease_key(repo, &bcs_id);
|
||||
repo.get_derived_data_lease_ops().release_lease(&key)
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<MappedHgChangesetId>,
|
||||
) -> Result<MappedHgChangesetId, Error> {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
let parents = try_join_all(
|
||||
parents
|
||||
.into_iter()
|
||||
.map(|id| id.0.load(ctx.clone(), repo.blobstore())),
|
||||
)
|
||||
.await?;
|
||||
let hg_cs_id = generate_hg_changeset(repo, ctx, bcs_id, bonsai, parents)
|
||||
.compat()
|
||||
.await?;
|
||||
Ok(MappedHgChangesetId(hg_cs_id))
|
||||
}
|
||||
|
||||
fn generate_hg_changeset(
|
||||
@ -473,19 +396,6 @@ fn generate_hg_changeset(
|
||||
let cs_id = cs.get_changeset_id();
|
||||
|
||||
cs.save(ctx.clone(), repo.get_blobstore())
|
||||
.and_then({
|
||||
cloned!(ctx, repo);
|
||||
move |_| {
|
||||
get_bonsai_hg_mapping(&repo).add(
|
||||
ctx,
|
||||
BonsaiHgMappingEntry {
|
||||
repo_id: repo.get_repoid(),
|
||||
hg_cs_id: cs_id,
|
||||
bcs_id,
|
||||
},
|
||||
)
|
||||
}
|
||||
})
|
||||
.map(move |_| cs_id)
|
||||
.boxify()
|
||||
}
|
||||
@ -498,251 +408,24 @@ fn generate_hg_changeset(
|
||||
.timed(move |stats, _| {
|
||||
STATS::generate_hg_from_bonsai_single_latency_ms
|
||||
.add_value(stats.completion_time.as_millis_unchecked() as i64);
|
||||
STATS::generate_hg_from_bonsai_generated_commit_num.add_value(1);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
// Converts Bonsai changesets to hg changesets. It either fetches hg changeset id from
|
||||
// bonsai-hg mapping or it generates hg changeset and puts hg changeset id in bonsai-hg mapping.
|
||||
// Note that it generates parent hg changesets first.
|
||||
// This function takes care of making sure the same changeset is not generated at the same time
|
||||
// by taking leases. It also avoids using recursion to prevents stackoverflow
|
||||
pub fn get_hg_from_bonsai_changeset_with_impl(
|
||||
repo: &BlobRepo,
|
||||
ctx: CoreContext,
|
||||
bcs_id: ChangesetId,
|
||||
) -> impl Future<Item = (HgChangesetId, usize), Error = Error> + Send {
|
||||
// Finds parent bonsai commits which do not have corresponding hg changeset generated
|
||||
// Avoids using recursion
|
||||
fn find_toposorted_bonsai_cs_with_no_hg_cs_generated(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bcs_id: ChangesetId,
|
||||
bonsai_hg_mapping: Arc<dyn BonsaiHgMapping>,
|
||||
) -> impl Future<Item = Vec<BonsaiChangeset>, Error = Error> {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut visited: HashSet<ChangesetId> = HashSet::new();
|
||||
visited.insert(bcs_id);
|
||||
queue.push_back(bcs_id);
|
||||
|
||||
let repoid = repo.get_repoid();
|
||||
loop_fn(
|
||||
(queue, vec![], visited),
|
||||
move |(mut queue, mut commits_to_generate, mut visited)| {
|
||||
cloned!(ctx, repo);
|
||||
match queue.pop_front() {
|
||||
Some(bcs_id) => bonsai_hg_mapping
|
||||
.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id)
|
||||
.and_then(move |maybe_hg| {
|
||||
match maybe_hg {
|
||||
Some(_hg_cs_id) => future::ok(Loop::Continue((
|
||||
queue,
|
||||
commits_to_generate,
|
||||
visited,
|
||||
)))
|
||||
.left_future(),
|
||||
None => bcs_id
|
||||
.load(ctx.clone(), repo.blobstore())
|
||||
.compat()
|
||||
.from_err()
|
||||
.map(move |bcs| {
|
||||
commits_to_generate.push(bcs.clone());
|
||||
queue.extend(bcs.parents().filter(|p| visited.insert(*p)));
|
||||
Loop::Continue((queue, commits_to_generate, visited))
|
||||
})
|
||||
.right_future(),
|
||||
}
|
||||
})
|
||||
.left_future(),
|
||||
None => future::ok(Loop::Break(commits_to_generate)).right_future(),
|
||||
}
|
||||
},
|
||||
)
|
||||
.map(|changesets| {
|
||||
let mut graph = hashmap! {};
|
||||
let mut id_to_bcs = hashmap! {};
|
||||
for cs in changesets {
|
||||
graph.insert(cs.get_changeset_id(), cs.parents().collect());
|
||||
id_to_bcs.insert(cs.get_changeset_id(), cs);
|
||||
}
|
||||
sort_topological(&graph)
|
||||
.expect("commit graph has cycles!")
|
||||
.into_iter()
|
||||
.map(|cs_id| id_to_bcs.remove(&cs_id))
|
||||
.filter_map(|x| x)
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
// Panics if changeset not found
|
||||
fn fetch_hg_changeset_from_mapping(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bcs_id: ChangesetId,
|
||||
) -> impl Future<Item = HgBlobChangeset, Error = Error> {
|
||||
let bonsai_hg_mapping = get_bonsai_hg_mapping(&repo).clone();
|
||||
let repoid = repo.get_repoid();
|
||||
bonsai_hg_mapping
|
||||
.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id)
|
||||
.and_then(move |maybe_hg| {
|
||||
match maybe_hg {
|
||||
Some(hg_cs_id) => hg_cs_id.load(ctx, repo.blobstore()).compat().from_err(),
|
||||
None => panic!("hg changeset must be generated already"),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Panics if parent hg changesets are not generated
|
||||
// Returns whether a commit was generated or not
|
||||
fn generate_single_hg_changeset(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bcs: BonsaiChangeset,
|
||||
) -> impl Future<Item = (HgChangesetId, bool), Error = Error> {
|
||||
let bcs_id = bcs.get_changeset_id();
|
||||
|
||||
take_hg_generation_lease(repo.clone(), ctx.clone(), bcs_id.clone())
|
||||
.traced(
|
||||
&ctx.trace(),
|
||||
"create_hg_from_bonsai::wait_for_lease",
|
||||
trace_args! {},
|
||||
)
|
||||
.and_then({
|
||||
cloned!(ctx, repo);
|
||||
move |maybe_hg_cs_id| {
|
||||
match maybe_hg_cs_id {
|
||||
Some(hg_cs_id) => future::ok((hg_cs_id, false)).left_future(),
|
||||
None => {
|
||||
// We have the lease
|
||||
STATS::generate_hg_from_bonsai_changeset.add_value(1);
|
||||
|
||||
let mut hg_parents = vec![];
|
||||
for p in bcs.parents() {
|
||||
hg_parents.push(fetch_hg_changeset_from_mapping(
|
||||
ctx.clone(),
|
||||
repo.clone(),
|
||||
p,
|
||||
));
|
||||
}
|
||||
|
||||
future::join_all(hg_parents)
|
||||
.and_then({
|
||||
cloned!(repo);
|
||||
move |hg_parents| {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
renew_hg_generation_lease_forever(
|
||||
&repo,
|
||||
ctx.clone(),
|
||||
bcs_id,
|
||||
receiver.map_err(|_| ()).boxify(),
|
||||
);
|
||||
|
||||
generate_hg_changeset(
|
||||
repo.clone(),
|
||||
ctx.clone(),
|
||||
bcs_id,
|
||||
bcs,
|
||||
hg_parents,
|
||||
)
|
||||
.then(move |res| {
|
||||
let _ = sender.send(());
|
||||
res
|
||||
})
|
||||
}
|
||||
})
|
||||
.map(|hg_cs_id| (hg_cs_id, true))
|
||||
.right_future()
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.timed(move |stats, _| {
|
||||
ctx.scuba()
|
||||
.clone()
|
||||
.add_future_stats(&stats)
|
||||
.log_with_msg("Generating hg changeset", Some(format!("{}", bcs_id)));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
let repoid = repo.get_repoid();
|
||||
let bonsai_hg_mapping = get_bonsai_hg_mapping(&repo).clone();
|
||||
find_toposorted_bonsai_cs_with_no_hg_cs_generated(
|
||||
ctx.clone(),
|
||||
repo.clone(),
|
||||
bcs_id.clone(),
|
||||
bonsai_hg_mapping.clone(),
|
||||
)
|
||||
.and_then({
|
||||
cloned!(ctx, repo);
|
||||
move |commits_to_generate: Vec<BonsaiChangeset>| {
|
||||
let start = (0, commits_to_generate.into_iter());
|
||||
|
||||
loop_fn(
|
||||
start,
|
||||
move |(mut generated_count, mut commits_to_generate)| match commits_to_generate
|
||||
.next()
|
||||
{
|
||||
Some(bcs) => {
|
||||
let bcs_id = bcs.get_changeset_id();
|
||||
|
||||
generate_single_hg_changeset(ctx.clone(), repo.clone(), bcs)
|
||||
.map({
|
||||
cloned!(ctx);
|
||||
move |(hg_cs_id, generated)| {
|
||||
if generated {
|
||||
debug!(
|
||||
ctx.logger(),
|
||||
"generated hg changeset for {}: {} ({} left to visit)",
|
||||
bcs_id,
|
||||
hg_cs_id,
|
||||
commits_to_generate.len(),
|
||||
);
|
||||
generated_count += 1;
|
||||
}
|
||||
Loop::Continue((generated_count, commits_to_generate))
|
||||
}
|
||||
})
|
||||
.left_future()
|
||||
}
|
||||
None => {
|
||||
return bonsai_hg_mapping
|
||||
.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id)
|
||||
.map({
|
||||
cloned!(ctx);
|
||||
move |maybe_hg_cs_id| match maybe_hg_cs_id {
|
||||
Some(hg_cs_id) => {
|
||||
if generated_count > 0 {
|
||||
debug!(
|
||||
ctx.logger(),
|
||||
"generation complete for {}", bcs_id,
|
||||
);
|
||||
}
|
||||
Loop::Break((hg_cs_id, generated_count))
|
||||
}
|
||||
None => panic!("hg changeset must be generated already"),
|
||||
}
|
||||
})
|
||||
.right_future();
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_hg_from_bonsai_changeset(
|
||||
repo: &BlobRepo,
|
||||
ctx: CoreContext,
|
||||
bcs_id: ChangesetId,
|
||||
) -> impl Future<Item = HgChangesetId, Error = Error> + Send {
|
||||
STATS::get_hg_from_bonsai_changeset.add_value(1);
|
||||
get_hg_from_bonsai_changeset_with_impl(repo, ctx, bcs_id)
|
||||
.map(|(hg_cs_id, generated_commit_num)| {
|
||||
STATS::generate_hg_from_bonsai_generated_commit_num
|
||||
.add_value(generated_commit_num as i64);
|
||||
hg_cs_id
|
||||
MappedHgChangesetId::derive(ctx, repo.clone(), bcs_id)
|
||||
.then(|result| match result {
|
||||
Ok(id) => Ok(id.0),
|
||||
Err(err) => match err {
|
||||
DeriveError::Disabled(..) => Err(err.into()),
|
||||
DeriveError::Error(err) => Err(err),
|
||||
},
|
||||
})
|
||||
.timed(move |stats, _| {
|
||||
STATS::generate_hg_from_bonsai_total_latency_ms
|
||||
@ -750,7 +433,3 @@ pub fn get_hg_from_bonsai_changeset(
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn get_bonsai_hg_mapping(repo: &BlobRepo) -> &Arc<dyn BonsaiHgMapping> {
|
||||
repo.attribute_expected::<dyn BonsaiHgMapping>()
|
||||
}
|
||||
|
@ -11,8 +11,6 @@ pub mod derive_hg_changeset;
|
||||
pub mod derive_hg_manifest;
|
||||
mod mapping;
|
||||
|
||||
pub use derive_hg_changeset::{
|
||||
get_hg_from_bonsai_changeset, get_hg_from_bonsai_changeset_with_impl, get_manifest_from_bonsai,
|
||||
};
|
||||
pub use derive_hg_changeset::{get_hg_from_bonsai_changeset, get_manifest_from_bonsai};
|
||||
pub use derive_hg_manifest::derive_hg_manifest;
|
||||
pub use mapping::{HgChangesetIdMapping, MappedHgChangesetId};
|
||||
|
@ -5,13 +5,13 @@
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use crate::get_hg_from_bonsai_changeset;
|
||||
use anyhow::Error;
|
||||
use blobrepo::BlobRepo;
|
||||
use bonsai_hg_mapping::BonsaiHgMapping;
|
||||
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry};
|
||||
use context::CoreContext;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use futures_old::{future, Future};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use futures_ext::{BoxFuture, FutureExt as _};
|
||||
use futures_old::Future;
|
||||
use mercurial_types::HgChangesetId;
|
||||
use mononoke_types::{BonsaiChangeset, ChangesetId, RepositoryId};
|
||||
|
||||
@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct MappedHgChangesetId(HgChangesetId);
|
||||
pub struct MappedHgChangesetId(pub HgChangesetId);
|
||||
|
||||
impl BonsaiDerived for MappedHgChangesetId {
|
||||
const NAME: &'static str = "hgchangesets";
|
||||
@ -34,11 +34,11 @@ impl BonsaiDerived for MappedHgChangesetId {
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
_parents: Vec<Self>,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
get_hg_from_bonsai_changeset(&repo, ctx, bcs_id)
|
||||
.map(|hg_cs_id| MappedHgChangesetId(hg_cs_id))
|
||||
crate::derive_hg_changeset::derive_from_parents(ctx, repo, bonsai, parents)
|
||||
.boxed()
|
||||
.compat()
|
||||
.boxify()
|
||||
}
|
||||
}
|
||||
@ -76,8 +76,17 @@ impl BonsaiDerivedMapping for HgChangesetIdMapping {
|
||||
.boxify()
|
||||
}
|
||||
|
||||
// This just succeeds, because generation of the derived data also saves the mapping
|
||||
fn put(&self, _ctx: CoreContext, _csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> {
|
||||
future::ok(()).boxify()
|
||||
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
|
||||
self.mapping
|
||||
.add(
|
||||
ctx,
|
||||
BonsaiHgMappingEntry {
|
||||
repo_id: self.repo_id,
|
||||
hg_cs_id: id.0,
|
||||
bcs_id: csid,
|
||||
},
|
||||
)
|
||||
.map(|_| ())
|
||||
.boxify()
|
||||
}
|
||||
}
|
||||
|
@ -156,7 +156,11 @@ fn fail_if_disabled<Derived: BonsaiDerived>(repo: &BlobRepo) -> Result<(), Deriv
|
||||
.contains(Derived::NAME)
|
||||
{
|
||||
STATS::derived_data_disabled.add_value(1, (repo.get_repoid().id(), Derived::NAME));
|
||||
return Err(DeriveError::Disabled(Derived::NAME, repo.get_repoid()));
|
||||
return Err(DeriveError::Disabled(
|
||||
Derived::NAME,
|
||||
repo.get_repoid(),
|
||||
repo.name().clone(),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -34,8 +34,8 @@ pub enum Mode {
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DeriveError {
|
||||
#[error("Derivation of {0} is not enabled for repo {1}")]
|
||||
Disabled(&'static str, RepositoryId),
|
||||
#[error("Derivation of {0} is not enabled for repo={2} repoid={1}")]
|
||||
Disabled(&'static str, RepositoryId, String),
|
||||
#[error("{0}")]
|
||||
Error(#[from] Error),
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ impl From<LoadableError> for MononokeError {
|
||||
impl From<DeriveError> for MononokeError {
|
||||
fn from(e: DeriveError) -> Self {
|
||||
match e {
|
||||
e @ DeriveError::Disabled(_, _) => MononokeError::NotAvailable(e.to_string()),
|
||||
e @ DeriveError::Disabled(..) => MononokeError::NotAvailable(e.to_string()),
|
||||
DeriveError::Error(e) => MononokeError::from(e),
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@
|
||||
# directory of this source tree.
|
||||
|
||||
$ . "${TEST_FIXTURES}/library.sh"
|
||||
$ ENABLED_DERIVED_DATA='["git_trees", "filenodes"]' setup_common_config "blob_files"
|
||||
$ ENABLED_DERIVED_DATA='["git_trees", "filenodes", "hgchangesets"]' setup_common_config "blob_files"
|
||||
$ GIT_REPO="${TESTTMP}/repo-git"
|
||||
$ HG_REPO="${TESTTMP}/repo-hg"
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
# directory of this source tree.
|
||||
|
||||
$ . "${TEST_FIXTURES}/library.sh"
|
||||
$ ENABLED_DERIVED_DATA='["git_trees", "filenodes"]' setup_common_config
|
||||
$ ENABLED_DERIVED_DATA='["git_trees", "filenodes", "hgchangesets"]' setup_common_config
|
||||
$ GIT_REPO="${TESTTMP}/repo-git"
|
||||
$ HG_REPO="${TESTTMP}/repo-hg"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user