Use namedag to update segmented changelogs

Summary:
Now that namedag recovers gracefully when the IdMap has more assigned ids than the IdDag, use it to build segmented changelogs.

This removes one source of divergence between client and Mononoke.

Reviewed By: quark-zju

Differential Revision: D32783287

fbshipit-source-id: 94a030b94e810d406102db92bab249dc76675dac
Author: Simon Farnsworth, 2021-12-07 05:45:23 -08:00 (committed by Facebook GitHub Bot)
Parent: a390131c80
Commit: 6ab2a33ed0
14 changed files with 171 additions and 244 deletions
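To orient the reader: this diff replaces the bespoke update_sc/rebuild_iddag path with the dag crate's namedag, so id assignment and segment building now go through NameDag::add_heads, with IdMap writes buffered until flushed. Below is a minimal sketch of the new server update path; it mirrors the_actual_update in the ondemand diff and uses only crate-internal helpers that appear in this commit (ServerNameDag, FetchParents, head_with_options), so it is not a public API. The function name update_to_head is hypothetical.

use std::sync::Arc;

use anyhow::Result;
use changeset_fetcher::ChangesetFetcher;
use context::CoreContext;
use mononoke_types::ChangesetId;

use crate::dag::ops::DagAddHeads;
use crate::dag::VertexListWithOptions;
use crate::parents::FetchParents;
use crate::update::{head_with_options, ServerNameDag};

async fn update_to_head(
    ctx: &CoreContext,
    namedag: &mut ServerNameDag,
    changeset_fetcher: Arc<dyn ChangesetFetcher>,
    head: ChangesetId,
) -> Result<()> {
    let parent_fetcher = FetchParents::new(ctx.clone(), changeset_fetcher);
    // A single head, assigned into the MASTER group.
    let heads = VertexListWithOptions::from(vec![head_with_options(head)]);
    // namedag assigns any missing ids and builds segments; it now tolerates an
    // IdMap that has more assigned ids than the IdDag, which is what makes it
    // usable on the server.
    namedag.add_heads(&parent_fetcher, &heads).await?;
    // Id assignments are buffered in memory until explicitly flushed.
    namedag.map().flush_writes().await?;
    Ok(())
}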


@ -69,13 +69,14 @@ pub async fn new_server_segmented_changelog<'a>(
// All other configuration is ignored, for example there won't be periodic updates
// following a bookmark.
return Ok(Arc::new(OnDemandUpdateSegmentedChangelog::new(
ctx.clone(),
repo_id,
InProcessIdDag::new_in_process(),
Arc::new(ConcurrentMemIdMap::new()),
changeset_fetcher,
bookmarks,
bookmarks_name,
)));
)?));
}
let mut idmap_factory = IdMapFactory::new(
connections.0.clone(),


@ -5,7 +5,6 @@
* GNU General Public License version 2.
*/
pub mod rebuild;
mod save_store;
pub use self::save_store::IdDagSaveStore;


@ -1,90 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::collections::{HashMap, VecDeque};
use anyhow::{anyhow, Result};
use context::CoreContext;
use mononoke_types::ChangesetId;
use slog::warn;
use crate::dag::errors::NotFoundError;
use crate::dag::{Id, InProcessIdDag};
use crate::{parents::FetchParents, IdMap};
pub async fn rebuild_iddag(
ctx: &CoreContext,
parent_fetcher: &FetchParents,
idmap: &dyn IdMap,
iddag: &mut InProcessIdDag,
head: ChangesetId,
missing_head_expected: bool,
) -> Result<usize> {
let head_id = idmap
.find_dag_id(ctx, head)
.await?
.ok_or_else(|| anyhow!("Just added head {} is not in IdMap", head))?;
if !iddag.contains_id(head_id)? {
if !missing_head_expected {
warn!(
ctx.logger(),
"IdDag does not contain {} which is in the IdMap. Building IdDag", head
);
}
let parents =
load_idmap_parents_not_in_iddag(ctx, parent_fetcher, idmap, iddag, head_id).await?;
iddag
.build_segments(head_id, &|id| {
parents
.get(&id)
.cloned()
.ok_or_else(|| id.not_found_error())
})
.map_err(anyhow::Error::from)
} else {
Ok(0)
}
}
async fn load_idmap_parents_not_in_iddag(
ctx: &CoreContext,
parent_fetcher: &FetchParents,
idmap: &dyn IdMap,
iddag: &InProcessIdDag,
head_id: Id,
) -> Result<HashMap<Id, Vec<Id>>> {
let changeset_fetcher = parent_fetcher.get_changeset_fetcher();
// We're going to load all the Ids in the IdMap that aren't in the iddag,
// and track their parents.
let mut res = HashMap::new();
let mut ids_to_find = VecDeque::new();
ids_to_find.push_back(head_id);
while let Some(head) = ids_to_find.pop_front() {
let head_cs_id = idmap
.find_changeset_id(ctx, head)
.await?
.ok_or_else(|| head.not_found_error())?;
let parents = changeset_fetcher
.get_parents(ctx.clone(), head_cs_id)
.await?;
let parent_ids = idmap.find_many_dag_ids(ctx, parents.clone()).await?;
let parents: Vec<Id> = parents
.into_iter()
.map(|id| parent_ids.get(&id).copied().ok_or_else(|| anyhow!("Changeset {} not found in segmented changelog, yet should be present - reseed!", id)))
.collect::<Result<_>>()?;
for parent in &parents {
let known_id = iddag.contains_id(*parent)? || res.contains_key(parent);
if !known_id {
ids_to_find.push_back(*parent);
}
}
res.insert(head, parents);
}
Ok(res)
}


@ -9,7 +9,6 @@ use async_trait::async_trait;
use context::CoreContext;
use mononoke_types::ChangesetId;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use crate::dag::errors::{self, BackendError, DagError};
@ -28,10 +27,6 @@ define_stats! {
flush_writes: timeseries(Sum),
}
/// When the wrapper has this many entries to write out, stop waiting for more
/// and write immediately
const WRITE_BATCH_SIZE: usize = 1000;
/// Type conversion - `VertexName` is used in the `dag` crate to abstract over different ID types
/// such as Mercurial IDs, Bonsai, a theoretical git ID and more.
/// but should always name a Bonsai `ChangesetId` on the server.
@ -54,18 +49,19 @@ pub fn vertex_name_from_cs_id(cs_id: &ChangesetId) -> VertexName {
VertexName::copy_from(cs_id.blake2().as_ref())
}
struct IdMapMemWrites<'a> {
#[derive(Clone)]
struct IdMapMemWrites {
/// The actual IdMap
inner: &'a dyn IdMap,
inner: Arc<dyn IdMap>,
/// Stores recent writes that haven't yet been persisted to the backing store
mem: ConcurrentMemIdMap,
mem: Arc<ConcurrentMemIdMap>,
}
impl<'a> IdMapMemWrites<'a> {
pub fn new(inner: &'a dyn IdMap) -> Self {
impl IdMapMemWrites {
pub fn new(inner: Arc<dyn IdMap>) -> Self {
Self {
inner,
mem: ConcurrentMemIdMap::new(),
mem: Arc::new(ConcurrentMemIdMap::new()),
}
}
@ -82,7 +78,7 @@ impl<'a> IdMapMemWrites<'a> {
}
#[async_trait]
impl<'a> IdMap for IdMapMemWrites<'a> {
impl IdMap for IdMapMemWrites {
async fn insert_many(
&self,
ctx: &CoreContext,
@ -95,9 +91,6 @@ impl<'a> IdMap for IdMapMemWrites<'a> {
.expect("More than an i64 of writes in one go!"),
);
let res = self.mem.insert_many(ctx, mappings).await;
if self.mem.len() >= WRITE_BATCH_SIZE {
self.flush_writes(ctx).await?;
}
res
}
@ -196,37 +189,45 @@ impl<'a> IdMap for IdMapMemWrites<'a> {
/// outside the `closure` to avoid performance issues. Having all `NameSet`
/// calculations inside the `closure` is fine, even if the final result is
/// passed out to the enclosing scope
pub struct IdMapWrapper<'a> {
#[derive(Clone)]
pub struct IdMapWrapper {
verlink: VerLink,
inner: Arc<IdMapMemWrites<'a>>,
inner: IdMapMemWrites,
ctx: CoreContext,
}
impl<'a> IdMapWrapper<'a> {
/// Run the given closure with a [`IdMapWrapper`] around the supplied [`IdMap`] and [`CoreContext`]
/// This lets you use `dag` crate methods on a server `IdMap`
pub async fn run<Fut, T>(
ctx: CoreContext,
idmap: &'a dyn IdMap,
closure: impl FnOnce(IdMapWrapper<'a>) -> Fut,
) -> anyhow::Result<T>
where
Fut: Future<Output = anyhow::Result<T>>,
{
let idmap_memwrites = Arc::new(IdMapMemWrites::new(idmap));
let wrapper = Self {
impl IdMapWrapper {
/// Create a new wrapper around the server IdMap, using CoreContext
/// for calling update functions
pub fn new(ctx: CoreContext, idmap: Arc<dyn IdMap>) -> Self {
let idmap_memwrites = IdMapMemWrites::new(idmap);
Self {
verlink: VerLink::new(),
inner: idmap_memwrites.clone(),
inner: idmap_memwrites,
ctx: ctx.clone(),
};
let res = closure(wrapper).await;
idmap_memwrites.flush_writes(&ctx).await?;
res
}
}
/// If not called, IdMap changes are discarded when this is dropped
pub async fn flush_writes(&self) -> anyhow::Result<()> {
self.inner.flush_writes(&self.ctx).await
}
/// Flushes writes and then returns the original IdMap
pub async fn finish(self) -> anyhow::Result<Arc<dyn IdMap>> {
self.flush_writes().await?;
Ok(self.inner.inner)
}
/// Get a clone of the original IdMap fed in.
/// If `flush_writes` has not been called, this will not be updated
pub fn clone_idmap(&self) -> Arc<dyn IdMap> {
self.inner.inner.clone()
}
}
#[async_trait]
impl<'a> PrefixLookup for IdMapWrapper<'a> {
impl PrefixLookup for IdMapWrapper {
async fn vertexes_by_hex_prefix(
&self,
_hex_prefix: &[u8],
@ -236,7 +237,7 @@ impl<'a> PrefixLookup for IdMapWrapper<'a> {
}
}
#[async_trait]
impl<'a> IdConvert for IdMapWrapper<'a> {
impl IdConvert for IdMapWrapper {
async fn vertex_id(&self, name: VertexName) -> Result<Id> {
// NOTE: The server implementation puts all Ids in the "master" group.
self.vertex_id_with_max_group(&name, Group::MASTER)
@ -347,7 +348,7 @@ impl<'a> IdConvert for IdMapWrapper<'a> {
}
#[async_trait]
impl<'a> IdMapWrite for IdMapWrapper<'a> {
impl IdMapWrite for IdMapWrapper {
async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
// NB: This is only suitable for tailing right now, as it writes on every call
// Eventually, this needs to use a batching interface

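The closure-based IdMapWrapper::run is replaced by a plain wrapper that buffers writes until asked to persist them. A rough sketch of the new lifecycle, using only the methods added above (new, flush_writes, finish); the function name assign_and_persist is hypothetical and this is not a public API.

use std::sync::Arc;

use anyhow::Result;
use context::CoreContext;

use crate::idmap::{IdMap, IdMapWrapper};

async fn assign_and_persist(ctx: CoreContext, idmap: Arc<dyn IdMap>) -> Result<Arc<dyn IdMap>> {
    let wrapper = IdMapWrapper::new(ctx, idmap);
    // ... the dag crate calls IdMapWrite::insert on `wrapper` here (for example
    // via add_heads); new id <-> changeset entries land only in the in-memory
    // ConcurrentMemIdMap at this point ...
    // flush_writes persists the buffered entries to the backing store; if it is
    // never called, the buffered entries are dropped along with the wrapper.
    wrapper.flush_writes().await?;
    // finish() flushes again (a no-op here) and hands back the original IdMap.
    wrapper.finish().await
}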

@ -95,13 +95,14 @@ impl SegmentedChangelogManager {
format!("repo {}: failed to load segmented changelog", self.repo_id)
})?;
Ok(Arc::new(OnDemandUpdateSegmentedChangelog::new(
ctx.clone(),
self.repo_id,
owned.iddag,
owned.idmap,
Arc::clone(&self.changeset_fetcher),
Arc::clone(&self.bookmarks),
self.bookmark_name.clone(),
)))
)?))
}
// public for builder only


@ -27,10 +27,12 @@ use changeset_fetcher::ChangesetFetcher;
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use crate::dag::ops::DagAddHeads;
use crate::dag::VertexListWithOptions;
use crate::idmap::IdMap;
use crate::parents::FetchParents;
use crate::read_only::ReadOnlySegmentedChangelog;
use crate::update::update_sc;
use crate::update::{head_with_options, server_namedag, ServerNameDag};
use crate::{
segmented_changelog_delegate, CloneData, Group, InProcessIdDag, Location, MismatchedHeadsError,
SegmentedChangelog,
@ -88,8 +90,7 @@ mod actual_update {
pub struct OnDemandUpdateSegmentedChangelog {
repo_id: RepositoryId,
iddag: Arc<RwLock<InProcessIdDag>>,
idmap: Arc<dyn IdMap>,
namedag: Arc<RwLock<ServerNameDag>>,
changeset_fetcher: Arc<dyn ChangesetFetcher>,
bookmarks: Arc<dyn Bookmarks>,
master_bookmark: BookmarkName,
@ -98,22 +99,24 @@ pub struct OnDemandUpdateSegmentedChangelog {
impl OnDemandUpdateSegmentedChangelog {
pub fn new(
ctx: CoreContext,
repo_id: RepositoryId,
iddag: InProcessIdDag,
idmap: Arc<dyn IdMap>,
changeset_fetcher: Arc<dyn ChangesetFetcher>,
bookmarks: Arc<dyn Bookmarks>,
master_bookmark: BookmarkName,
) -> Self {
Self {
) -> Result<Self> {
let namedag = server_namedag(ctx, iddag, idmap)?;
let namedag = Arc::new(RwLock::new(namedag));
Ok(Self {
repo_id,
iddag: Arc::new(RwLock::new(iddag)),
idmap,
namedag,
changeset_fetcher,
bookmarks,
master_bookmark,
ongoing_update: Arc::new(Mutex::new(None)),
}
})
}
pub fn with_periodic_update_to_master_bookmark(
@ -158,18 +161,11 @@ impl OnDemandUpdateSegmentedChangelog {
if let Some(fut) = &*ongoing_update {
fut.clone().map(|_| Ok(false)).boxed()
} else {
cloned!(
ctx,
self.repo_id,
self.iddag,
self.idmap,
self.changeset_fetcher
);
cloned!(ctx, self.repo_id, self.namedag, self.changeset_fetcher);
let task_ongoing_update = self.ongoing_update.clone();
let update_task = async move {
let result =
the_actual_update(ctx, repo_id, iddag, idmap, changeset_fetcher, head)
.await;
the_actual_update(ctx, repo_id, namedag, changeset_fetcher, head).await;
let mut ongoing_update = task_ongoing_update.lock();
*ongoing_update = None;
result
@ -270,7 +266,12 @@ impl OnDemandUpdateSegmentedChangelog {
}
async fn are_heads_assigned(&self, ctx: &CoreContext, heads: &[ChangesetId]) -> Result<bool> {
let dag_id_map = self.idmap.find_many_dag_ids(ctx, heads.to_vec()).await?;
let namedag = self.namedag.read().await;
let idmap_wrapper = namedag.map();
let dag_id_map = idmap_wrapper
.clone_idmap()
.find_many_dag_ids(ctx, heads.to_vec())
.await?;
if dag_id_map.len() != heads.len() {
// Maybe heads have duplicated items? Double check.
let mut heads = heads.to_vec();
@ -281,9 +282,8 @@ impl OnDemandUpdateSegmentedChangelog {
}
}
// It is safer to check that the dag_ids we got are also in the iddag.
let iddag = self.iddag.read().await;
for (_cs_id, dag_id) in dag_id_map {
if !iddag.contains_id(dag_id)? {
if !namedag.dag().contains_id(dag_id)? {
return Ok(false);
}
}
@ -291,8 +291,10 @@ impl OnDemandUpdateSegmentedChangelog {
}
async fn is_cs_assigned(&self, ctx: &CoreContext, cs_id: ChangesetId) -> Result<bool> {
if let Some(dag_id) = self
.idmap
let namedag = self.namedag.read().await;
if let Some(dag_id) = namedag
.map()
.clone_idmap()
.find_dag_id(ctx, cs_id)
.await
.context("fetching dag_id for csid")?
@ -300,8 +302,7 @@ impl OnDemandUpdateSegmentedChangelog {
// Note. This will result in two read locks being acquired for functions that call
// into build_up. It would be nice to get to one lock being acquired. I tried but
// had some issues with lifetimes :).
let iddag = self.iddag.read().await;
if iddag.contains_id(dag_id)? {
if namedag.dag().contains_id(dag_id)? {
return Ok(true);
}
}
@ -312,17 +313,17 @@ impl OnDemandUpdateSegmentedChangelog {
async fn the_actual_update(
ctx: CoreContext,
repo_id: RepositoryId,
iddag: Arc<RwLock<InProcessIdDag>>,
idmap: Arc<dyn IdMap>,
namedag: Arc<RwLock<ServerNameDag>>,
changeset_fetcher: Arc<dyn ChangesetFetcher>,
head: ChangesetId,
) -> Result<()> {
let monitored = async {
let mut iddag = iddag.write().await;
let parents_fetcher = FetchParents::new(ctx.clone(), changeset_fetcher);
update_sc(&ctx, &parents_fetcher, &mut iddag, &idmap, head).await?;
let mut namedag = namedag.write().await;
let parent_fetcher = FetchParents::new(ctx.clone(), changeset_fetcher);
let heads = VertexListWithOptions::from(vec![head_with_options(head)]);
namedag.add_heads(&parent_fetcher, &heads).await?;
namedag.map().flush_writes().await?;
Ok(())
};
actual_update::STATS::count.add_value(1);
@ -362,8 +363,8 @@ impl SegmentedChangelog for OnDemandUpdateSegmentedChangelog {
self.build_up_to_heads(ctx, &[location.descendant])
.await
.context("error while getting an up to date dag")?;
let iddag = self.iddag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(&iddag, self.idmap.clone());
let namedag = self.namedag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(namedag.dag(), namedag.map().clone_idmap());
read_dag
.location_to_many_changeset_ids(ctx, location, count)
.await
@ -379,16 +380,16 @@ impl SegmentedChangelog for OnDemandUpdateSegmentedChangelog {
self.build_up_to_heads(ctx, &master_heads)
.await
.context("error while getting an up to date dag")?;
let iddag = self.iddag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(&iddag, self.idmap.clone());
let namedag = self.namedag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(namedag.dag(), namedag.map().clone_idmap());
read_dag
.many_changeset_ids_to_locations(ctx, master_heads, cs_ids)
.await
}
async fn clone_data(&self, ctx: &CoreContext) -> Result<CloneData<ChangesetId>> {
let iddag = self.iddag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(&iddag, self.idmap.clone());
let namedag = self.namedag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(namedag.dag(), namedag.map().clone_idmap());
read_dag.clone_data(ctx).await
}
@ -401,8 +402,8 @@ impl SegmentedChangelog for OnDemandUpdateSegmentedChangelog {
self.build_up_to_heads(ctx, &[old_master, new_master])
.await
.context("error while getting an up to date dag")?;
let iddag = self.iddag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(&iddag, self.idmap.clone());
let namedag = self.namedag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(namedag.dag(), namedag.map().clone_idmap());
read_dag
.pull_fast_forward_master(ctx, old_master, new_master)
.await
@ -417,8 +418,8 @@ impl SegmentedChangelog for OnDemandUpdateSegmentedChangelog {
ancestor: ChangesetId,
descendant: ChangesetId,
) -> Result<Option<bool>> {
let iddag = self.iddag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(&iddag, self.idmap.clone());
let namedag = self.namedag.read().await;
let read_dag = ReadOnlySegmentedChangelog::new(namedag.dag(), namedag.map().clone_idmap());
read_dag.is_ancestor(ctx, ancestor, descendant).await
}
}


@ -28,10 +28,6 @@ impl FetchParents {
changeset_fetcher,
}
}
pub fn get_changeset_fetcher(&self) -> &dyn ChangesetFetcher {
&self.changeset_fetcher
}
}
#[async_trait::async_trait]


@ -18,12 +18,14 @@ use changeset_fetcher::ChangesetFetcher;
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use crate::dag::ops::DagAddHeads;
use crate::dag::VertexListWithOptions;
use crate::iddag::IdDagSaveStore;
use crate::idmap::IdMapFactory;
use crate::idmap::SqlIdMapVersionStore;
use crate::parents::FetchParents;
use crate::types::{IdMapVersion, SegmentedChangelogVersion};
use crate::update::update_sc;
use crate::update::{head_with_options, server_namedag};
use crate::version_store::SegmentedChangelogVersionStore;
use crate::{InProcessIdDag, SegmentedChangelogSqlConnections};
@ -90,13 +92,24 @@ impl SegmentedChangelogSeeder {
);
let idmap = self.idmap_factory.for_writer(ctx, idmap_version);
let mut iddag = InProcessIdDag::new_in_process();
let iddag = InProcessIdDag::new_in_process();
let parents_fetcher = FetchParents::new(ctx.clone(), self.changeset_fetcher.clone());
// Create a segmented changelog by updating the empty set to a full set
for head in heads {
update_sc(ctx, &parents_fetcher, &mut iddag, &idmap, head).await?;
}
let mut namedag = server_namedag(ctx.clone(), iddag, idmap)?;
let heads_with_options = VertexListWithOptions::from(
heads
.into_iter()
.map(|head| head_with_options(head))
.collect::<Vec<_>>(),
);
namedag
.add_heads(&parents_fetcher, &heads_with_options)
.await?;
let (idmap, iddag) = namedag.into_idmap_dag();
idmap.finish().await?;
// Update IdMapVersion
self.idmap_version_store


@ -22,12 +22,14 @@ use changeset_fetcher::ChangesetFetcher;
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use crate::dag::ops::DagAddHeads;
use crate::dag::VertexListWithOptions;
use crate::iddag::IdDagSaveStore;
use crate::idmap::{CacheHandlers, IdMapFactory};
use crate::owned::OwnedSegmentedChangelog;
use crate::parents::FetchParents;
use crate::types::SegmentedChangelogVersion;
use crate::update::update_sc;
use crate::update::{head_with_options, server_namedag};
use crate::version_store::SegmentedChangelogVersionStore;
use crate::SegmentedChangelogSqlConnections;
@ -169,22 +171,26 @@ impl SegmentedChangelogTailer {
"repo {}: bookmark {} resolved to {}", self.repo_id, self.bookmark_name, head
);
let mut iddag = self
let iddag = self
.iddag_save_store
.load(&ctx, sc_version.iddag_version)
.await
.with_context(|| format!("repo {}: failed to load iddag", self.repo_id))?;
let new_segment_count = update_sc(
&ctx,
&FetchParents::new(ctx.clone(), self.changeset_fetcher.clone()),
&mut iddag,
&idmap,
head,
)
.await?;
let mut namedag = server_namedag(ctx.clone(), iddag, idmap)?;
let parent_fetcher = FetchParents::new(ctx.clone(), self.changeset_fetcher.clone());
if new_segment_count == 0 {
let heads = VertexListWithOptions::from(vec![head_with_options(head)]);
// Note on memory use: we do not flush the changes out in the middle
// of writing to the IdMap.
// Thus, if OOMs happen here, the IdMap may need to flush writes to the DB
// at interesting points.
let changed = namedag.add_heads(&parent_fetcher, &heads).await?;
let (idmap, iddag) = namedag.into_idmap_dag();
let idmap = idmap.finish().await?;
if !changed {
info!(
ctx.logger(),
"repo {}: segmented changelog already up to date, skipping update to iddag",


@ -185,9 +185,13 @@ async fn load_owned(
Ok(OwnedSegmentedChangelog::new(iddag, idmap))
}
fn new_isolated_on_demand_update(blobrepo: &BlobRepo) -> OnDemandUpdateSegmentedChangelog {
fn new_isolated_on_demand_update(
ctx: CoreContext,
blobrepo: &BlobRepo,
) -> Result<OnDemandUpdateSegmentedChangelog> {
// feel free to add bookmark_name as a parameter when the need appears
OnDemandUpdateSegmentedChangelog::new(
ctx,
blobrepo.get_repoid(),
InProcessIdDag::new_in_process(),
Arc::new(ConcurrentMemIdMap::new()),
@ -576,7 +580,7 @@ async fn test_build_incremental_from_scratch(fb: FacebookInit) -> Result<()> {
{
// linear
let blobrepo = linear::getrepo(fb).await;
let sc = new_isolated_on_demand_update(&blobrepo);
let sc = new_isolated_on_demand_update(ctx.clone(), &blobrepo)?;
let known_cs =
resolve_cs_id(&ctx, &blobrepo, "79a13814c5ce7330173ec04d279bf95ab3f652fb").await?;
@ -591,7 +595,7 @@ async fn test_build_incremental_from_scratch(fb: FacebookInit) -> Result<()> {
{
// merge_uneven
let blobrepo = merge_uneven::getrepo(fb).await;
let sc = new_isolated_on_demand_update(&blobrepo);
let sc = new_isolated_on_demand_update(ctx.clone(), &blobrepo)?;
let known_cs =
resolve_cs_id(&ctx, &blobrepo, "264f01429683b3dd8042cb3979e8bf37007118bc").await?;
@ -658,7 +662,7 @@ async fn test_on_demand_update_commit_location_to_changeset_ids(fb: FacebookInit
// commit 5
let cs5 = resolve_cs_id(&ctx, &blobrepo, "cb15ca4a43a59acff5388cea9648c162afde8372").await?;
let sc = new_isolated_on_demand_update(&blobrepo);
let sc = new_isolated_on_demand_update(ctx.clone(), &blobrepo)?;
let answer = try_join_all(vec![
sc.location_to_changeset_id(&ctx, Location::new(cs10, 5)),
sc.location_to_changeset_id(&ctx, Location::new(cs6, 1)),
@ -667,7 +671,7 @@ async fn test_on_demand_update_commit_location_to_changeset_ids(fb: FacebookInit
.await?;
assert_eq!(answer, vec![cs5, cs5, cs5]);
let sc = new_isolated_on_demand_update(&blobrepo);
let sc = new_isolated_on_demand_update(ctx.clone(), &blobrepo)?;
let answer = try_join_all(vec![
sc.changeset_id_to_location(&ctx, vec![cs10], cs5),
sc.changeset_id_to_location(&ctx, vec![cs6], cs5),
@ -708,6 +712,7 @@ async fn test_incremental_update_with_desync_iddag(fb: FacebookInit) -> Result<(
));
let new_sc = || {
OnDemandUpdateSegmentedChangelog::new(
ctx.clone(),
blobrepo.get_repoid(),
InProcessIdDag::new_in_process(),
Arc::clone(&idmap),
@ -720,7 +725,7 @@ async fn test_incremental_update_with_desync_iddag(fb: FacebookInit) -> Result<(
let master_cs =
resolve_cs_id(&ctx, &blobrepo, "79a13814c5ce7330173ec04d279bf95ab3f652fb").await?;
let initial = new_sc();
let initial = new_sc()?;
let cs7 = resolve_cs_id(&ctx, &blobrepo, "0ed509bf086fadcb8a8a5384dc3b550729b0fc17").await?;
let distance: u64 = 4;
@ -729,7 +734,7 @@ async fn test_incremental_update_with_desync_iddag(fb: FacebookInit) -> Result<(
.await?;
assert_eq!(answer, cs7);
let second = new_sc();
let second = new_sc()?;
let cs3 = resolve_cs_id(&ctx, &blobrepo, "607314ef579bd2407752361ba1b0c1729d08b281").await?;
let answer = second
@ -856,13 +861,14 @@ async fn test_periodic_update(fb: FacebookInit) -> Result<()> {
tokio::time::pause(); // TODO: pause only works with the `current_thread` Runtime.
let on_demand = OnDemandUpdateSegmentedChangelog::new(
ctx.clone(),
blobrepo.get_repoid(),
InProcessIdDag::new_in_process(),
Arc::new(ConcurrentMemIdMap::new()),
blobrepo.get_changeset_fetcher(),
Arc::clone(blobrepo.bookmarks()) as Arc<dyn Bookmarks>,
bookmark_name.clone(),
);
)?;
let sc =
Arc::new(on_demand).with_periodic_update_to_master_bookmark(&ctx, Duration::from_secs(5));
@ -961,7 +967,7 @@ async fn test_mismatched_heads(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let blobrepo = branch_even::getrepo(fb).await;
let dag = new_isolated_on_demand_update(&blobrepo);
let dag = new_isolated_on_demand_update(ctx.clone(), &blobrepo)?;
let h1 = resolve_cs_id(&ctx, &blobrepo, "4f7f3fd428bec1a48f9314414b063c706d9c1aed").await?;
let h1_parent =
resolve_cs_id(&ctx, &blobrepo, "b65231269f651cfe784fd1d97ef02a049a37b8a0").await?;


@ -5,51 +5,35 @@
* GNU General Public License version 2.
*/
use std::future::Future;
use std::sync::Arc;
use anyhow::Result;
use context::CoreContext;
use mononoke_types::ChangesetId;
use crate::dag::idmap::IdMapAssignHead;
use crate::dag::IdSet;
use crate::iddag::rebuild::rebuild_iddag;
use crate::dag::{NameDagBuilder, VertexName, VertexOptions};
use crate::idmap::{vertex_name_from_cs_id, IdMap, IdMapWrapper};
use crate::parents::FetchParents;
use crate::{Group, InProcessIdDag};
//TODO(simonfar): For some reason, building the IdDag from prepared flat segments
//doesn't work reliably. For now, we always rebuild the IdDag from commit history instead.
const REBUILD_IDDAG: bool = true;
pub type ServerNameDag = crate::dag::namedag::AbstractNameDag<InProcessIdDag, IdMapWrapper, (), ()>;
pub fn update_sc<'a>(
ctx: &'a CoreContext,
parent_fetcher: &'a FetchParents,
iddag: &'a mut InProcessIdDag,
idmap: &'a dyn IdMap,
head: ChangesetId,
) -> impl Future<Output = Result<usize>> + 'a {
async move {
let mut covered_ids = iddag.all()?;
let flat_segments = IdMapWrapper::run(ctx.clone(), idmap, move |mut idmap| async move {
idmap
.assign_head(
vertex_name_from_cs_id(&head),
parent_fetcher,
Group::MASTER,
&mut covered_ids,
&IdSet::empty(),
)
.await
.map_err(anyhow::Error::from)
})
.await?;
if REBUILD_IDDAG || flat_segments.segment_count() == 0 {
return rebuild_iddag(ctx, parent_fetcher, idmap, iddag, head, REBUILD_IDDAG).await;
}
iddag.build_segments_from_prepared_flat_segments(&flat_segments)?;
Ok(flat_segments.segment_count())
}
/// Convert a server IdDag and IdMap to a NameDag
/// Note: you will need to call NameDag::as_idmap().flush_writes
/// to write out updates to the IdMap
pub fn server_namedag(
ctx: CoreContext,
iddag: InProcessIdDag,
idmap: Arc<dyn IdMap>,
) -> Result<ServerNameDag> {
let idmap = IdMapWrapper::new(ctx, idmap);
NameDagBuilder::new_with_idmap_dag(idmap, iddag)
.build()
.map_err(anyhow::Error::from)
}
pub fn head_with_options(head: ChangesetId) -> (VertexName, VertexOptions) {
let mut options = VertexOptions::default();
options.highest_group = Group::MASTER;
(vertex_name_from_cs_id(&head), options)
}


@ -390,7 +390,7 @@ where
&mut self,
parents: &dyn Parents,
heads: &VertexListWithOptions,
) -> Result<()> {
) -> Result<bool> {
self.invalidate_snapshot();
// Populate vertex negative cache to reduce round-trips doing remote lookups.
@ -445,7 +445,14 @@ where
let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
for (head, opts) in heads.vertex_options() {
if !self.contains_vertex_name(&head).await? {
let need_assigning = match self
.vertex_id_with_max_group(&head, opts.highest_group)
.await?
{
Some(id) => !self.dag.contains_id(id)?,
None => true,
};
if need_assigning {
let group = opts.highest_group;
let prepared_segments = self
.assign_head(head.clone(), parents, group, &mut covered, &reserved)
@ -463,7 +470,7 @@ where
self.dag
.build_segments_from_prepared_flat_segments(&outcome)?;
Ok(())
Ok(outcome.segment_count() > 0)
}
}

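Since add_heads now reports whether anything was actually added, callers can skip redundant persistence, as the tailer above does. A small sketch of that pattern follows (crate-internal types; the function name update_and_maybe_save is hypothetical).

use crate::dag::ops::DagAddHeads;
use crate::dag::VertexListWithOptions;
use crate::parents::FetchParents;
use crate::update::ServerNameDag;

async fn update_and_maybe_save(
    mut namedag: ServerNameDag,
    parent_fetcher: &FetchParents,
    heads: &VertexListWithOptions,
) -> anyhow::Result<()> {
    let changed = namedag.add_heads(parent_fetcher, heads).await?;
    let (idmap, iddag) = namedag.into_idmap_dag();
    let _idmap = idmap.finish().await?; // flush buffered IdMap writes either way
    if !changed {
        // Nothing new was assigned or segmented: skip saving a new IdDag version.
        return Ok(());
    }
    // ... persist `iddag` and bump the segmented changelog version here ...
    let _ = iddag; // silence the unused-variable warning in this sketch
    Ok(())
}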

@ -275,7 +275,7 @@ pub trait DagAddHeads {
&mut self,
parents: &dyn Parents,
heads: &VertexListWithOptions,
) -> Result<()>;
) -> Result<bool>;
}
/// Import a generated `CloneData` object into an empty DAG.


@ -1827,7 +1827,7 @@ impl RevlogIndex {
&mut self,
parents_func: &dyn Parents,
heads: &VertexListWithOptions,
) -> dag::Result<()> {
) -> dag::Result<bool> {
if !cfg!(test) {
panic!(
"add_heads should only works for testing \
@ -1837,6 +1837,7 @@ impl RevlogIndex {
);
}
let mut updated = false;
// Update IdMap. Keep track of what heads are added.
for head in heads.vertexes() {
if !non_blocking_result(self.contains_vertex_name(&head))? {
@ -1858,11 +1859,12 @@ impl RevlogIndex {
}
let text = Bytes::from_static(b"DUMMY COMMIT MESSAGE FOR TESTING");
self.insert(head.clone(), parent_revs, text);
updated = true;
}
}
}
Ok(())
Ok(updated)
}
}
@ -1872,7 +1874,7 @@ impl DagAddHeads for RevlogIndex {
&mut self,
parents_func: &dyn Parents,
heads: &VertexListWithOptions,
) -> dag::Result<()> {
) -> dag::Result<bool> {
self.add_heads_for_testing(parents_func, heads)
}
}