diff --git a/eden/mononoke/segmented_changelog/src/idmap/sql.rs b/eden/mononoke/segmented_changelog/src/idmap/sql.rs index 32955756b3..479336ffe3 100644 --- a/eden/mononoke/segmented_changelog/src/idmap/sql.rs +++ b/eden/mononoke/segmented_changelog/src/idmap/sql.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::{format_err, Result}; +use anyhow::{format_err, Context, Result}; use async_trait::async_trait; use futures::{compat::Future01CompatExt, future, TryFutureExt}; use sql::{queries, Connection}; @@ -151,8 +151,77 @@ impl IdMap for SqlIdMap { ctx: &CoreContext, mut mappings: Vec<(Vertex, ChangesetId)>, ) -> Result<()> { + // On correctness. This code is slightly coupled with the IdMap update algorithm. + // We need to ensure algorithm correctness with multiple writers and potential failures. + // We need to "throttle" writes to prevent replication lag so big transactions are + // undesirable. + // + // The IdMap update happens before the IdDag update so if a process is killed in between + // those two steps, the update algorithm has to handle "a lagging" IdDag. The last IdDag + // computed may have fewer commits processed than the database IdMap. + // + // Since we cannot do updates in one transaction the IdMap may have partial data in the + // database from an update. To help with this problem we insert IdMap entries in increasing + // order by Vertex. This results in the invariant that all Vertexes between 0 and + // last_vertex are assigned. This means that the IdMap algorithm may have to deal with + // multiple "heads". + // + // Let's look at the situation where we have multiple update processes that start from + // different commits then race to update the database. If they insert the same results we + // may choose to be optimistic and allow them both to proceed with their process until some + // difference is encountered. Updating vertexes in order should leave the IdMap in a state + // that already has to be handled. 
That said, being pessimistic is easier to reason about + // so we roll back the transaction if any vertex in our batch is already present. What may + // happen is that one process updates a batch and a second process starts a new update and + // wins the race to update. The first process aborts and we are in a state that we + // previously described as a requirement for the update algorithm. STATS::insert.add_value(mappings.len() as i64); mappings.sort(); + + // Ensure that we have no gaps in the assignments in the IdMap by validating that mappings + // has consecutive Vertexes and they start with last_vertex+1. + // This isn't a great place for these checks. I feel pretty clowny adding them here but + // they don't hurt. Might remove them later. + if let Some(&(first, _)) = mappings.first() { + if let Some(&(last, _)) = mappings.last() { + if first + mappings.len() as u64 != last + 1 { + return Err(format_err!( + "repo {}: mappings sent for insertion are not consecutive", + self.repo_id + )); + } + } + match self + .get_last_entry(ctx) + .await + .context("error fetching last entry")? + { + None => { + if first.0 != 0 { + return Err(format_err!( + "repo {}: first vertex being inserted into idmap is not 0 ({})", + self.repo_id, + first, + )); + } + } + Some((last_stored, _)) => { + if first != last_stored + 1 { + return Err(format_err!( + "repo {}: smallest vertex being inserted does not follow last entry \ + ({} + 1 != {})", + self.repo_id, + last_stored, + first + )); + } + } + } + } + + + // With validation passed, we split the mappings into batches that we write in separate + // transactions. 
for (i, chunk) in mappings.chunks(INSERT_MAX).enumerate() { if i > 0 { let wait_config = WaitForReplicationConfig::default().with_logger(ctx.logger()); @@ -179,56 +248,25 @@ impl IdMap for SqlIdMap { Err(err) => { // transaction is "lost" to the query return Err(err.context(format_err!( - "inserting many IdMap entries for repository {}", + "repo {}: failed inserting IdMap entries", self.repo_id ))); } Ok((t, insert_result)) => { transaction = t; if insert_result.affected_rows() != chunk.len() as u64 { - let to_select: Vec<_> = chunk.iter().map(|v| (v.0).0).collect(); - let (t, rows) = SelectManyChangesetIds::query_with_transaction( - transaction, - &self.repo_id, - &self.version, - &to_select[..], - ) - .compat() - .await?; - transaction = t; - let changeset_mappings = rows - .into_iter() - .map(|row| (Vertex(row.0), row.1)) - .collect::>(); - for (vertex, cs_id) in chunk { - match changeset_mappings.get(vertex) { - None => { - transaction.rollback().compat().await?; - return Err(format_err!( - "Failed to insert entry ({} -> {}) in Idmap", - vertex, - cs_id - )); - } - Some(store_cs_id) => { - if store_cs_id != cs_id { - transaction.rollback().compat().await?; - return Err(format_err!( - "Duplicate segmented changelog idmap entry {} \ - has different assignments: {} vs {}", - vertex, - cs_id, - store_cs_id - )); - } - // TODO(sfilip): log redundant insert call - } - }; - } + transaction.rollback().compat().await?; + return Err(format_err!( + "repo {}: failed insert race, total entries {}, batch {}", + self.repo_id, + mappings.len(), + i + )); + } else { + transaction.commit().compat().await?; } } } - transaction.commit().compat().await?; } Ok(()) } @@ -297,9 +335,12 @@ impl IdMap for SqlIdMap { async fn get_last_entry(&self, ctx: &CoreContext) -> Result> { STATS::get_last_entry.add_value(1); ctx.perf_counters() - .increment_counter(PerfCounterType::SqlReadsReplica); + .increment_counter(PerfCounterType::SqlReadsMaster); + // From the update algorithm perspective, 
it makes most sense to read from master. Because + // trying to insert a value that was already inserted will fail the whole process, an + // outdated entry will definitely lead to wasted work. let rows = SelectLastEntry::query( - &self.connections.read_connection, + &self.connections.write_connection, &self.repo_id, &self.version, ) @@ -318,7 +359,7 @@ mod tests { use fbinit::FacebookInit; use mononoke_types_mocks::changesetid::{ - FIVES_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID, + AS_CSID, FIVES_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID, }; use sql_construct::SqlConstruct; @@ -337,6 +378,7 @@ mod tests { assert_eq!(idmap.get_last_entry(&ctx).await?, None); + idmap.insert(&ctx, Vertex(0), AS_CSID).await?; idmap.insert(&ctx, Vertex(1), ONES_CSID).await?; idmap.insert(&ctx, Vertex(2), TWOS_CSID).await?; idmap.insert(&ctx, Vertex(3), THREES_CSID).await?; @@ -357,6 +399,9 @@ mod tests { assert_eq!(idmap.get_last_entry(&ctx).await?, None); idmap.insert_many(&ctx, vec![]).await?; + assert!(idmap.get_changeset_id(&ctx, Vertex(1)).await.is_err()); + + idmap.insert(&ctx, Vertex(0), AS_CSID).await?; idmap .insert_many( &ctx, @@ -371,20 +416,22 @@ mod tests { assert_eq!(idmap.get_changeset_id(&ctx, Vertex(1)).await?, ONES_CSID); assert_eq!(idmap.get_changeset_id(&ctx, Vertex(3)).await?, THREES_CSID); - idmap - .insert_many( - &ctx, - vec![ - (Vertex(1), ONES_CSID), - (Vertex(2), TWOS_CSID), - (Vertex(3), THREES_CSID), - ], - ) - .await?; - assert_eq!(idmap.get_changeset_id(&ctx, Vertex(2)).await?, TWOS_CSID); + assert!( + idmap + .insert_many( + &ctx, + vec![ + (Vertex(1), ONES_CSID), + (Vertex(2), TWOS_CSID), + (Vertex(3), THREES_CSID), + ], + ) + .await + .is_err() + ); idmap - .insert_many(&ctx, vec![(Vertex(1), ONES_CSID), (Vertex(4), FOURS_CSID)]) + .insert_many(&ctx, vec![(Vertex(4), FOURS_CSID)]) .await?; assert_eq!(idmap.get_changeset_id(&ctx, Vertex(4)).await?, FOURS_CSID); @@ -408,6 +455,7 @@ mod tests { .await?; assert!(response.is_empty()); 
+ idmap.insert(&ctx, Vertex(0), AS_CSID).await?; idmap .insert_many( &ctx, @@ -466,6 +514,10 @@ mod tests { .with_idmap_version(1) .build_sql_idmap()?; + idmap11.insert(&ctx, Vertex(0), AS_CSID).await?; + idmap12.insert(&ctx, Vertex(0), AS_CSID).await?; + idmap21.insert(&ctx, Vertex(0), AS_CSID).await?; + idmap11.insert(&ctx, Vertex(1), ONES_CSID).await?; idmap11.insert(&ctx, Vertex(2), TWOS_CSID).await?; idmap12.insert(&ctx, Vertex(1), TWOS_CSID).await?;