segmented_changelog: update OverlayIdMap with assigned vertex ranges

Summary:
Pretty big bug here with the "Overlay" when we are updating both stores.  It
turns out that we don't really want a standard Overlay. We want the loaded
iddag to operate with the Ids in the shared IdMap and we want whatever is
updates to use the in process IdMap. The problem we have with the overlay is
that the shared IdMap may have more data than the in process IdMap. The shared
IdMap is always updated by the tailer, after all. This means that when we query
the overlay, we may get data from the shared store even if this is the first
time we are trying to update a changeset for the current process.

The solution here is to specify which vertexes are fetched from either store.

Reviewed By: quark-zju

Differential Revision: D27028367

fbshipit-source-id: e09f003d94100778eabd990724579c84b0f86541
This commit is contained in:
Stefan Filip 2021-03-16 09:28:20 -07:00 committed by Facebook GitHub Bot
parent c18b35a400
commit deae65979e
3 changed files with 133 additions and 42 deletions

View File

@ -18,10 +18,10 @@ pub use self::version::SqlIdMapVersionStore;
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{format_err, Result};
use anyhow::{bail, format_err, Context, Result};
use async_trait::async_trait;
use dag::Id as Vertex;
use dag::{Group, Id as Vertex, InProcessIdDag};
use sql_ext::replication::ReplicaLagMonitor;
use sql_ext::SqlConnections;
@ -121,15 +121,38 @@ impl IdMap for Arc<dyn IdMap> {
}
}
/// The idmap works in unison with the iddag. The idmap and the iddag need to be in sync for iddag
/// update operation to be consistent. When we download an iddag, it has the idmap described by
/// the shared store. The overlay allows us to update the iddag in process by adding an idmap that
/// stores all the changes that we made in process. It's important to note that the shared store is
/// updated constantly by other processes. Because vertexes are added in increasing order, we can
/// use the last entry in the downloaded iddag as the cutoff that delimitates the entries that are
/// fetched from the shared store and those that are fetched from the in process store. Note that
/// if we were to use the abcence of an entry from the in process store to fetch from the shared
/// store we would likely end up with inconsistent entries from an updated shared store.
// Vertexes greater than or equal to cutoff are associated with mem idmap.
// Vertexes less than cutoff are associated with shared idmap.
pub struct OverlayIdMap {
a: Arc<dyn IdMap>,
b: Arc<dyn IdMap>,
mem: ConcurrentMemIdMap,
shared: Arc<dyn IdMap>,
cutoff: Vertex,
}
impl OverlayIdMap {
#[allow(dead_code)]
pub fn new(a: Arc<dyn IdMap>, b: Arc<dyn IdMap>) -> Self {
Self { a, b }
pub fn new(shared: Arc<dyn IdMap>, cutoff: Vertex) -> Self {
let mem = ConcurrentMemIdMap::new();
Self {
mem,
shared,
cutoff,
}
}
pub fn from_iddag_and_idmap(iddag: &InProcessIdDag, shared: Arc<dyn IdMap>) -> Result<Self> {
let cutoff = iddag
.next_free_id(0, Group::MASTER)
.context("error fetching next iddag id")?;
Ok(Self::new(shared, cutoff))
}
}
@ -140,7 +163,16 @@ impl IdMap for OverlayIdMap {
ctx: &CoreContext,
mappings: Vec<(Vertex, ChangesetId)>,
) -> Result<()> {
self.a.insert_many(ctx, mappings).await
for (v, _) in mappings.iter() {
if v < &self.cutoff {
return Err(format_err!(
"overlay idmap asked to insert {} but cutoff is {}",
v,
self.cutoff
));
}
}
self.mem.insert_many(ctx, mappings).await
}
async fn find_many_changeset_ids(
@ -148,17 +180,25 @@ impl IdMap for OverlayIdMap {
ctx: &CoreContext,
vertexes: Vec<Vertex>,
) -> Result<HashMap<Vertex, ChangesetId>> {
let mut result = self
.a
.find_many_changeset_ids(ctx, vertexes.clone())
.await?;
let to_get_b = vertexes
.into_iter()
.filter(|v| !result.contains_key(&v))
let from_mem = vertexes
.iter()
.filter(|&v| v >= &self.cutoff)
.cloned()
.collect();
let from_b = self.b.find_many_changeset_ids(ctx, to_get_b).await?;
for (v, cs) in from_b {
result.insert(v, cs);
let mut result = self.mem.find_many_changeset_ids(ctx, from_mem).await?;
let from_shared: Vec<Vertex> = vertexes
.iter()
.filter(|&v| v < &self.cutoff)
.cloned()
.collect();
if !from_shared.is_empty() {
let shared_result = self
.shared
.find_many_changeset_ids(ctx, from_shared)
.await?;
for (v, cs) in shared_result {
result.insert(v, cs);
}
}
Ok(result)
}
@ -168,22 +208,34 @@ impl IdMap for OverlayIdMap {
ctx: &CoreContext,
cs_ids: Vec<ChangesetId>,
) -> Result<HashMap<ChangesetId, Vertex>> {
let mut result = self.a.find_many_vertexes(ctx, cs_ids.clone()).await?;
let to_get_b = cs_ids
let mut result = self.mem.find_many_vertexes(ctx, cs_ids.clone()).await?;
for (cs, v) in result.iter() {
if v < &self.cutoff {
bail!(
"unexpected assignment found in mem idmap: {} for {} but cutoff is {}",
v,
cs,
self.cutoff
);
}
}
let to_get_shared = cs_ids
.into_iter()
.filter(|cs_id| !result.contains_key(&cs_id))
.collect();
let from_b = self.b.find_many_vertexes(ctx, to_get_b).await?;
for (cs, v) in from_b {
result.insert(cs, v);
let from_shared = self.shared.find_many_vertexes(ctx, to_get_shared).await?;
for (cs, v) in from_shared {
if v < self.cutoff {
result.insert(cs, v);
}
}
Ok(result)
}
async fn get_last_entry(&self, ctx: &CoreContext) -> Result<Option<(Vertex, ChangesetId)>> {
match self.a.get_last_entry(ctx).await? {
match self.mem.get_last_entry(ctx).await? {
Some(x) => Ok(Some(x)),
None => self.b.get_last_entry(ctx).await,
None => self.shared.get_last_entry(ctx).await,
}
}
}
@ -239,11 +291,14 @@ impl IdMapFactory {
// Servers have an overlay idmap which means that all their updates to the idmap stay confined
// to the Dag that performed the updates.
pub fn for_server(&self, ctx: &CoreContext, version: IdMapVersion) -> Arc<dyn IdMap> {
Arc::new(OverlayIdMap::new(
Arc::new(ConcurrentMemIdMap::new()),
self.for_writer(ctx, version),
))
pub fn for_server(
&self,
ctx: &CoreContext,
version: IdMapVersion,
iddag: &InProcessIdDag,
) -> Result<Arc<dyn IdMap>> {
let overlay = OverlayIdMap::from_iddag_and_idmap(iddag, self.for_writer(ctx, version))?;
Ok(Arc::new(overlay))
}
pub fn with_cache_handlers(mut self, cache_handlers: CacheHandlers) -> Self {
@ -260,40 +315,65 @@ mod tests {
use fbinit::FacebookInit;
use mononoke_types_mocks::changesetid::{AS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID};
use mononoke_types_mocks::changesetid::{
AS_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID,
};
#[fbinit::test]
async fn test_write_a_read_ab(fb: FacebookInit) -> Result<()> {
async fn test_overlay_idmap(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let a = Arc::new(ConcurrentMemIdMap::new());
let b = Arc::new(ConcurrentMemIdMap::new());
let shared: Arc<dyn IdMap> = Arc::new(ConcurrentMemIdMap::new());
b.insert_many(&ctx, vec![(Vertex(0), AS_CSID), (Vertex(1), ONES_CSID)])
shared
.insert_many(&ctx, vec![(Vertex(0), AS_CSID), (Vertex(1), ONES_CSID)])
.await?;
let both = OverlayIdMap::new(a, b);
let overlay = OverlayIdMap::new(Arc::clone(&shared), Vertex(2));
assert_eq!(
both.find_many_changeset_ids(&ctx, vec![Vertex(0), Vertex(1), Vertex(2)])
overlay
.find_many_changeset_ids(&ctx, vec![Vertex(0), Vertex(1), Vertex(2)])
.await?,
hashmap![Vertex(0) => AS_CSID, Vertex(1) => ONES_CSID]
);
both.insert_many(&ctx, vec![(Vertex(2), TWOS_CSID), (Vertex(3), THREES_CSID)])
overlay
.insert_many(&ctx, vec![(Vertex(2), TWOS_CSID), (Vertex(3), THREES_CSID)])
.await?;
assert_eq!(
both.find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)])
overlay
.find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)])
.await?,
hashmap![Vertex(2) => TWOS_CSID, Vertex(3) => THREES_CSID]
);
assert_eq!(
both.b
overlay
.shared
.find_many_changeset_ids(&ctx, vec![Vertex(2), Vertex(3)])
.await?,
hashmap![]
);
shared
.insert_many(
&ctx,
vec![
(Vertex(2), THREES_CSID),
(Vertex(3), TWOS_CSID),
(Vertex(4), FOURS_CSID),
],
)
.await?;
assert_eq!(
overlay
.find_many_changeset_ids(&ctx, vec![Vertex(3), Vertex(4)])
.await?,
hashmap![Vertex(3) => THREES_CSID]
);
Ok(())
}
}

View File

@ -116,7 +116,9 @@ impl SegmentedChangelogManager {
.load(&ctx, sc_version.iddag_version)
.await
.with_context(|| format!("repo {}: failed to load iddag", self.repo_id))?;
let idmap = self.idmap_factory.for_server(ctx, sc_version.idmap_version);
let idmap = self
.idmap_factory
.for_server(ctx, sc_version.idmap_version, &iddag)?;
slog::debug!(
ctx.logger(),
"segmented changelog dag successfully loaded - repo_id: {}, idmap_version: {}, \

View File

@ -261,7 +261,16 @@ pub async fn prepare_incremental_iddag_update<'a>(
let (cs_id, parents, vertex) = entry?;
start_state.insert_parents(cs_id, parents.clone());
if let Some(v) = vertex {
start_state.insert_vertex_assignment(cs_id, v);
if v < id_map_next_id {
start_state.insert_vertex_assignment(cs_id, v);
} else {
return Err(format_err!(
"racing data while updating segmented changelog, \
next_id is {} but found {} assigned",
id_map_next_id,
v
));
}
}
let vertex_missing_from_iddag = match vertex {
Some(v) => !iddag.contains_id(v)?,