segmented_changelog: add comments around IdMap insert expectations

Summary:
Comments for why we don't need a lock when updating the SqlIdMap with multiple
writers. Structure can definitely be improved but I'll live with this for a
short time.

No fundamental change in logic. I added extra checks to the insert function and
changed from optimistic insert race logic to a pessimistic version. I
explain in the comments that this makes it easier to reason about what
happens and that, theoretically, the choice doesn't matter.

Reviewed By: quark-zju

Differential Revision: D25606290

fbshipit-source-id: ea21915fc797fe759b3fe481e8ad9e8cb594fb6a
This commit is contained in:
Stefan Filip 2020-12-23 16:49:29 -08:00 committed by Facebook GitHub Bot
parent dafd5c32bf
commit 65054f2044

View File

@ -8,7 +8,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{format_err, Result};
use anyhow::{format_err, Context, Result};
use async_trait::async_trait;
use futures::{compat::Future01CompatExt, future, TryFutureExt};
use sql::{queries, Connection};
@ -151,8 +151,77 @@ impl IdMap for SqlIdMap {
ctx: &CoreContext,
mut mappings: Vec<(Vertex, ChangesetId)>,
) -> Result<()> {
// On correctness. This code is slightly coupled with the IdMap update algorithm.
// We need to ensure algorithm correctness with multiple writers and potential failures.
// We need to "throttle" writes to prevent replication lag so big transactions are
// undesirable.
//
// The IdMap update happens before the IdDag update so if a process is killed in between
// those two steps, the update algorithm has to handle "a lagging" IdDag. The last IdDag
// computed may have fewer commits processed than the database IdMap.
//
// Since we cannot do updates in one transaction the IdMap may have partial data in the
// database from an update. To help with this problem we insert IdMap entries in increasing
// order by Vertex. This results in the invariant that all Vertexes between 0 and
// last_vertex are assigned. This means that the IdMap algorithm may have to deal with
// multiple "heads".
//
// Let's look at the situation where we have multiple update processes that start from
// different commits then race to update the database. If they insert the same results we
// may choose to be optimistic and allow them both to proceed with their process until some
// difference is encountered. Updating vertexes in order should leave the IdMap in a state
// that already has to be handled. That said, being pessimistic is easier to reason about
// so we rollback the transaction if any vertex in our batch is already present. What may
// happen is that one process updates a batch and a second process starts a new update and
// wins the race to update. The first process aborts and we are in a state that we
// previously described as a requirement for the update algorithm.
STATS::insert.add_value(mappings.len() as i64);
mappings.sort();
// Ensure that we have no gaps in the assignments in the IdMap by validating that mappings
// has consecutive Vertexes and they start with last_vertex+1.
// This isn't a great place for these checks. I feel pretty clowny adding them here but
// they don't hurt. Might remove them later.
if let Some(&(first, _)) = mappings.first() {
if let Some(&(last, _)) = mappings.last() {
if first + mappings.len() as u64 != last + 1 {
return Err(format_err!(
"repo {}: mappings sent for insertion are not consecutive",
self.repo_id
));
}
}
match self
.get_last_entry(ctx)
.await
.context("error fetching last entry")?
{
None => {
if first.0 != 0 {
return Err(format_err!(
"repo {}: first vertex being inserted into idmap is not 0 ({})",
self.repo_id,
first,
));
}
}
Some((last_stored, _)) => {
if first != last_stored + 1 {
return Err(format_err!(
"repo {}: smallest vertex being inserted does not follow last entry \
({} + 1 != {})",
self.repo_id,
last_stored,
first
));
}
}
}
}
// With validation passed, we split the mappings into batches that we write in separate
// transactions.
for (i, chunk) in mappings.chunks(INSERT_MAX).enumerate() {
if i > 0 {
let wait_config = WaitForReplicationConfig::default().with_logger(ctx.logger());
@ -179,56 +248,25 @@ impl IdMap for SqlIdMap {
Err(err) => {
// transaction is "lost" to the query
return Err(err.context(format_err!(
"inserting many IdMap entries for repository {}",
"repo {}: failed inserting IdMap entries",
self.repo_id
)));
}
Ok((t, insert_result)) => {
transaction = t;
if insert_result.affected_rows() != chunk.len() as u64 {
let to_select: Vec<_> = chunk.iter().map(|v| (v.0).0).collect();
let (t, rows) = SelectManyChangesetIds::query_with_transaction(
transaction,
&self.repo_id,
&self.version,
&to_select[..],
)
.compat()
.await?;
transaction = t;
let changeset_mappings = rows
.into_iter()
.map(|row| (Vertex(row.0), row.1))
.collect::<HashMap<_, _>>();
for (vertex, cs_id) in chunk {
match changeset_mappings.get(vertex) {
None => {
transaction.rollback().compat().await?;
return Err(format_err!(
"Failed to insert entry ({} -> {}) in Idmap",
vertex,
cs_id
));
}
Some(store_cs_id) => {
if store_cs_id != cs_id {
transaction.rollback().compat().await?;
return Err(format_err!(
"Duplicate segmented changelog idmap entry {} \
has different assignments: {} vs {}",
vertex,
cs_id,
store_cs_id
));
}
// TODO(sfilip): log redundant insert call
}
};
}
transaction.rollback().compat().await?;
return Err(format_err!(
"repo {}: failed insert race, total entries {}, batch {}",
self.repo_id,
mappings.len(),
i
));
} else {
transaction.commit().compat().await?;
}
}
}
transaction.commit().compat().await?;
}
Ok(())
}
@ -297,9 +335,12 @@ impl IdMap for SqlIdMap {
async fn get_last_entry(&self, ctx: &CoreContext) -> Result<Option<(Vertex, ChangesetId)>> {
STATS::get_last_entry.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
.increment_counter(PerfCounterType::SqlReadsMaster);
// From the update algorithm's perspective, it makes most sense to read from master. Because
// trying to insert a value that was already inserted will fail the whole process, reading an
// outdated entry will definitely lead to wasted work.
let rows = SelectLastEntry::query(
&self.connections.read_connection,
&self.connections.write_connection,
&self.repo_id,
&self.version,
)
@ -318,7 +359,7 @@ mod tests {
use fbinit::FacebookInit;
use mononoke_types_mocks::changesetid::{
FIVES_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID,
AS_CSID, FIVES_CSID, FOURS_CSID, ONES_CSID, THREES_CSID, TWOS_CSID,
};
use sql_construct::SqlConstruct;
@ -337,6 +378,7 @@ mod tests {
assert_eq!(idmap.get_last_entry(&ctx).await?, None);
idmap.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap.insert(&ctx, Vertex(1), ONES_CSID).await?;
idmap.insert(&ctx, Vertex(2), TWOS_CSID).await?;
idmap.insert(&ctx, Vertex(3), THREES_CSID).await?;
@ -357,6 +399,9 @@ mod tests {
assert_eq!(idmap.get_last_entry(&ctx).await?, None);
idmap.insert_many(&ctx, vec![]).await?;
assert!(idmap.get_changeset_id(&ctx, Vertex(1)).await.is_err());
idmap.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap
.insert_many(
&ctx,
@ -371,20 +416,22 @@ mod tests {
assert_eq!(idmap.get_changeset_id(&ctx, Vertex(1)).await?, ONES_CSID);
assert_eq!(idmap.get_changeset_id(&ctx, Vertex(3)).await?, THREES_CSID);
idmap
.insert_many(
&ctx,
vec![
(Vertex(1), ONES_CSID),
(Vertex(2), TWOS_CSID),
(Vertex(3), THREES_CSID),
],
)
.await?;
assert_eq!(idmap.get_changeset_id(&ctx, Vertex(2)).await?, TWOS_CSID);
assert!(
idmap
.insert_many(
&ctx,
vec![
(Vertex(1), ONES_CSID),
(Vertex(2), TWOS_CSID),
(Vertex(3), THREES_CSID),
],
)
.await
.is_err()
);
idmap
.insert_many(&ctx, vec![(Vertex(1), ONES_CSID), (Vertex(4), FOURS_CSID)])
.insert_many(&ctx, vec![(Vertex(4), FOURS_CSID)])
.await?;
assert_eq!(idmap.get_changeset_id(&ctx, Vertex(4)).await?, FOURS_CSID);
@ -408,6 +455,7 @@ mod tests {
.await?;
assert!(response.is_empty());
idmap.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap
.insert_many(
&ctx,
@ -466,6 +514,10 @@ mod tests {
.with_idmap_version(1)
.build_sql_idmap()?;
idmap11.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap12.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap21.insert(&ctx, Vertex(0), AS_CSID).await?;
idmap11.insert(&ctx, Vertex(1), ONES_CSID).await?;
idmap11.insert(&ctx, Vertex(2), TWOS_CSID).await?;
idmap12.insert(&ctx, Vertex(1), TWOS_CSID).await?;