diff --git a/eden/mononoke/segmented_changelog/src/idmap/mod.rs b/eden/mononoke/segmented_changelog/src/idmap/mod.rs index f773c6c5ed..a3502f9d4c 100644 --- a/eden/mononoke/segmented_changelog/src/idmap/mod.rs +++ b/eden/mononoke/segmented_changelog/src/idmap/mod.rs @@ -18,10 +18,10 @@ pub use self::version::SqlIdMapVersionStore; use std::collections::HashMap; use std::sync::Arc; -use anyhow::{format_err, Result}; +use anyhow::{bail, format_err, Context, Result}; use async_trait::async_trait; -use dag::Id as Vertex; +use dag::{Group, Id as Vertex, InProcessIdDag}; use sql_ext::replication::ReplicaLagMonitor; use sql_ext::SqlConnections; @@ -121,15 +121,38 @@ impl IdMap for Arc { } } +/// The idmap works in unison with the iddag. The idmap and the iddag need to be in sync for iddag +/// update operation to be consistent. When we download an iddag, it has the idmap described by +/// the shared store. The overlay allows us to update the iddag in process by adding an idmap that +/// stores all the changes that we made in process. It's important to note that the shared store is +/// updated constantly by other processes. Because vertexes are added in increasing order, we can +/// use the last entry in the downloaded iddag as the cutoff that delimitates the entries that are +/// fetched from the shared store and those that are fetched from the in process store. Note that +/// if we were to use the abcence of an entry from the in process store to fetch from the shared +/// store we would likely end up with inconsistent entries from an updated shared store. +// Vertexes greater than or equal to cutoff are associated with mem idmap. +// Vertexes less than cutoff are associated with shared idmap. pub struct OverlayIdMap { - a: Arc, - b: Arc, + mem: ConcurrentMemIdMap, + shared: Arc, + cutoff: Vertex, } impl OverlayIdMap { - #[allow(dead_code)] - pub fn new(a: Arc, b: Arc) -> Self { - Self { a, b } + pub fn new(shared: Arc, cutoff: Vertex) -> Self { + let mem = ConcurrentMemIdMap::new(); + Self { + mem, + shared, + cutoff, + } + } + + pub fn from_iddag_and_idmap(iddag: &InProcessIdDag, shared: Arc) -> Result { + let cutoff = iddag + .next_free_id(0, Group::MASTER) + .context("error fetching next iddag id")?; + Ok(Self::new(shared, cutoff)) } } @@ -140,7 +163,16 @@ impl IdMap for OverlayIdMap { ctx: &CoreContext, mappings: Vec<(Vertex, ChangesetId)>, ) -> Result<()> { - self.a.insert_many(ctx, mappings).await + for (v, _) in mappings.iter() { + if v < &self.cutoff { + return Err(format_err!( + "overlay idmap asked to insert {} but cutoff is {}", + v, + self.cutoff + )); + } + } + self.mem.insert_many(ctx, mappings).await } async fn find_many_changeset_ids( @@ -148,17 +180,25 @@ impl IdMap for OverlayIdMap { ctx: &CoreContext, vertexes: Vec, ) -> Result> { - let mut result = self - .a - .find_many_changeset_ids(ctx, vertexes.clone()) - .await?; - let to_get_b = vertexes - .into_iter() - .filter(|v| !result.contains_key(&v)) + let from_mem = vertexes + .iter() + .filter(|&v| v >= &self.cutoff) + .cloned() .collect(); - let from_b = self.b.find_many_changeset_ids(ctx, to_get_b).await?; - for (v, cs) in from_b { - result.insert(v, cs); + let mut result = self.mem.find_many_changeset_ids(ctx, from_mem).await?; + let from_shared: Vec = vertexes + .iter() + .filter(|&v| v < &self.cutoff) + .cloned() + .collect(); + if !from_shared.is_empty() { + let shared_result = self + .shared + .find_many_changeset_ids(ctx, from_shared) + .await?; + for (v, cs) in shared_result { + result.insert(v, cs); + } } Ok(result) } @@ -168,22 +208,34 @@ impl IdMap for OverlayIdMap { ctx: &CoreContext, cs_ids: Vec, ) -> Result> { - let mut result = self.a.find_many_vertexes(ctx, cs_ids.clone()).await?; - let to_get_b = cs_ids + let mut result = self.mem.find_many_vertexes(ctx, cs_ids.clone()).await?; + for (cs, v) in result.iter() { + if v < &self.cutoff { + bail!( + "unexpected assignment found in mem idmap: {} for {} but cutoff is {}", + v, + cs, + self.cutoff + ); + } + } + let to_get_shared = cs_ids .into_iter() .filter(|cs_id| !result.contains_key(&cs_id)) .collect(); - let from_b = self.b.find_many_vertexes(ctx, to_get_b).await?; - for (cs, v) in from_b { - result.insert(cs, v); + let from_shared = self.shared.find_many_vertexes(ctx, to_get_shared).await?; + for (cs, v) in from_shared { + if v < self.cutoff { + result.insert(cs, v); + } } Ok(result) } async fn get_last_entry(&self, ctx: &CoreContext) -> Result> { - match self.a.get_last_entry(ctx).await? { + match self.mem.get_last_entry(ctx).await? { Some(x) => Ok(Some(x)), - None => self.b.get_last_entry(ctx).await, + None => self.shared.get_last_entry(ctx).await, } } } @@ -239,11 +291,14 @@ impl IdMapFactory { // Servers have an overlay idmap which means that all their updates to the idmap stay confined // to the Dag that performed the updates. - pub fn for_server(&self, ctx: &CoreContext, version: IdMapVersion) -> Arc { - Arc::new(OverlayIdMap::new( - Arc::new(ConcurrentMemIdMap::new()), - self.for_writer(ctx, version), - )) + pub fn for_server( + &self, + ctx: &CoreContext, + version: IdMapVersion, + iddag: &InProcessIdDag, + ) -> Result> { + let overlay = OverlayIdMap::from_iddag_and_idmap(iddag, self.for_writer(ctx, version))?; + Ok(Arc::new(overlay)) } pub fn with_cache_handlers(mut self, cache_handlers: CacheHandlers) -> Self { @@ -260,40 +315,65 @@ mod tests { use fbinit::FacebookInit; - use mononoke_types_mocks::changesetid::{AS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID}; + use mononoke_types_mocks::changesetid::{ + AS_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID, + }; #[fbinit::test] - async fn test_write_a_read_ab(fb: FacebookInit) -> Result<()> { + async fn test_overlay_idmap(fb: FacebookInit) -> Result<()> { let ctx = CoreContext::test_mock(fb); - let a = Arc::new(ConcurrentMemIdMap::new()); - let b = Arc::new(ConcurrentMemIdMap::new()); + let shared: Arc = Arc::new(ConcurrentMemIdMap::new()); - b.insert_many(&ctx, vec![(Vertex(0), AS_CSID), (Vertex(1), ONES_CSID)]) + shared + .insert_many(&ctx, vec![(Vertex(0), AS_CSID), (Vertex(1), ONES_CSID)]) .await?; - let both = OverlayIdMap::new(a, b); + let overlay = OverlayIdMap::new(Arc::clone(&shared), Vertex(2)); assert_eq!( - both.find_many_changeset_ids(&ctx, vec![Vertex(0), Vertex(1), Vertex(2)]) + overlay + .find_many_changeset_ids(&ctx, vec![Vertex(0), Vertex(1), Vertex(2)]) .await?, hashmap![Vertex(0) => AS_CSID, Vertex(1) => ONES_CSID] ); - both.insert_many(&ctx, vec![(Vertex(2), TWOS_CSID), (Vertex(3), THREES_CSID)]) + overlay + .insert_many(&ctx, vec![(Vertex(2), TWOS_CSID), (Vertex(3), THREES_CSID)]) .await?; assert_eq!( - both.find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)]) + overlay + .find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)]) .await?, hashmap![Vertex(2) => TWOS_CSID, Vertex(3) => THREES_CSID] ); assert_eq!( - both.b + overlay + .shared .find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)]) .await?, hashmap![] ); + + shared + .insert_many( + &ctx, + vec![ + (Vertex(2), THREES_CSID), + (Vertex(3), TWOS_CSID), + (Vertex(4), FOURS_CSID), + ], + ) + .await?; + + assert_eq!( + overlay + .find_many_changeset_ids(&ctx, vec![Vertex(3), Vertex(4)]) + .await?, + hashmap![Vertex(3) => THREES_CSID] + ); + Ok(()) } } diff --git a/eden/mononoke/segmented_changelog/src/manager.rs b/eden/mononoke/segmented_changelog/src/manager.rs index fc55f322ac..960d2f8d69 100644 --- a/eden/mononoke/segmented_changelog/src/manager.rs +++ b/eden/mononoke/segmented_changelog/src/manager.rs @@ -116,7 +116,9 @@ impl SegmentedChangelogManager { .load(&ctx, sc_version.iddag_version) .await .with_context(|| format!("repo {}: failed to load iddag", self.repo_id))?; - let idmap = self.idmap_factory.for_server(ctx, sc_version.idmap_version); + let idmap = self + .idmap_factory + .for_server(ctx, sc_version.idmap_version, &iddag)?; slog::debug!( ctx.logger(), "segmented changelog dag successfully loaded - repo_id: {}, idmap_version: {}, \ diff --git a/eden/mononoke/segmented_changelog/src/update.rs b/eden/mononoke/segmented_changelog/src/update.rs index 1ddf91b769..8e154be601 100644 --- a/eden/mononoke/segmented_changelog/src/update.rs +++ b/eden/mononoke/segmented_changelog/src/update.rs @@ -261,7 +261,16 @@ pub async fn prepare_incremental_iddag_update<'a>( let (cs_id, parents, vertex) = entry?; start_state.insert_parents(cs_id, parents.clone()); if let Some(v) = vertex { - start_state.insert_vertex_assignment(cs_id, v); + if v < id_map_next_id { + start_state.insert_vertex_assignment(cs_id, v); + } else { + return Err(format_err!( + "racing data while updating segmented changelog, \ + next_id is {} but found {} assigned", + id_map_next_id, + v + )); + } } let vertex_missing_from_iddag = match vertex { Some(v) => !iddag.contains_id(v)?,