segmented_changelog: add MismatchedHeadsError

Summary:
This error occurs when the client sends us a set of heads that we cannot match.
For example, when the client forces a commit into the master group; this was
possible with revlogs but is considered a bug with Segmented Changelog. This can
also happen when master moves backwards, or when clients and the server have
multiple heads and the server then reseeds.

Clients that get this error should reclone.

Reviewed By: quark-zju

Differential Revision: D27786602

fbshipit-source-id: 9854ccee929ae0a845236ebd83ed68158c93fc7a
This commit is contained in:
Stefan Filip 2021-04-20 20:19:20 -07:00 committed by Facebook GitHub Bot
parent 9c9e18568c
commit c8aaece37b
8 changed files with 125 additions and 51 deletions

View File

@ -116,12 +116,21 @@ impl From<BookmarkMovementError> for MononokeError {
impl From<MononokeError> for edenapi_types::ServerError {
fn from(e: MononokeError) -> Self {
Self::new(format!("{:?}", e))
edenapi_types::ServerError::from(&e)
}
}
impl From<&MononokeError> for edenapi_types::ServerError {
fn from(e: &MononokeError) -> Self {
Self::new(format!("{:?}", e))
let message = format!("{:?}", e);
let code = match e {
MononokeError::InternalError(e)
if e.0.is::<segmented_changelog::MismatchedHeadsError>() =>
{
1
}
_ => 0,
};
Self::new(message, code)
}
}

View File

@ -39,7 +39,8 @@ mod tests;
pub use segmented_changelog_types::{
dag, ArcSegmentedChangelog, CloneData, DagIdSet, FirstAncestorConstraint, FlatSegment, Group,
InProcessIdDag, Location, PreparedFlatSegments, SegmentedChangelog, StreamCloneData, Vertex,
InProcessIdDag, Location, MismatchedHeadsError, PreparedFlatSegments, SegmentedChangelog,
StreamCloneData, Vertex,
};
pub use crate::builder::{new_server_segmented_changelog, SegmentedChangelogSqlConnections};

View File

@ -30,8 +30,10 @@ use mononoke_types::{ChangesetId, RepositoryId};
use crate::idmap::IdMap;
use crate::read_only::ReadOnlySegmentedChangelog;
use crate::update::{prepare_incremental_iddag_update, update_iddag};
use crate::{segmented_changelog_delegate, SegmentedChangelog, StreamCloneData};
use crate::{CloneData, Group, InProcessIdDag, Location};
use crate::{
segmented_changelog_delegate, CloneData, Group, InProcessIdDag, Location, MismatchedHeadsError,
SegmentedChangelog, StreamCloneData,
};
define_stats! {
prefix = "mononoke.segmented_changelog.ondemand";
@ -248,7 +250,7 @@ impl OnDemandUpdateSegmentedChangelog {
}
async fn build_up_to_heads(&self, ctx: &CoreContext, heads: &[ChangesetId]) -> Result<()> {
if !self.is_one_cs_assigned(ctx, heads).await? {
if !self.are_heads_assigned(ctx, heads).await? {
self.build_up_to_bookmark(ctx).await?;
// The IdDag has two groups, the MASTER group and the NON_MASTER group. The MASTER
// group is reserved for the ancestors of the master bookmark. The MASTER group should
@ -258,29 +260,27 @@ impl OnDemandUpdateSegmentedChangelog {
// to. At the moment we only handle updating the MASTER group.
// Note for the future. We should pay attention to potential races between a changeset
// being used and the bookmark being updated.
if !self.is_one_cs_assigned(ctx, heads).await? {
bail!(
"repo {}: failed to assign all cs {:?}, not ancestor of bookmark {}",
self.repo_id,
heads,
self.master_bookmark
);
if !self.are_heads_assigned(ctx, heads).await? {
let err = MismatchedHeadsError::new(self.repo_id, heads.to_vec());
return Err(err.into());
}
}
Ok(())
}
async fn is_one_cs_assigned(&self, ctx: &CoreContext, cs_ids: &[ChangesetId]) -> Result<bool> {
let vertex_map = self.idmap.find_many_vertexes(ctx, cs_ids.to_vec()).await?;
if !vertex_map.is_empty() {
let iddag = self.iddag.read().await;
for (_cs_id, vertex) in vertex_map {
if iddag.contains_id(vertex)? {
return Ok(true);
}
async fn are_heads_assigned(&self, ctx: &CoreContext, heads: &[ChangesetId]) -> Result<bool> {
let vertex_map = self.idmap.find_many_vertexes(ctx, heads.to_vec()).await?;
if vertex_map.len() != heads.len() {
return Ok(false);
}
// It is safer to check that the vertexes we got are also in the iddag.
let iddag = self.iddag.read().await;
for (_cs_id, vertex) in vertex_map {
if !iddag.contains_id(vertex)? {
return Ok(false);
}
}
Ok(false)
Ok(true)
}
async fn is_cs_assigned(&self, ctx: &CoreContext, cs_id: ChangesetId) -> Result<bool> {

View File

@ -928,3 +928,29 @@ async fn test_periodic_reload(fb: FacebookInit) -> Result<()> {
Ok(())
}
#[fbinit::test]
async fn test_mismatched_heads(fb: FacebookInit) -> Result<()> {
    // When the server cannot match all heads the client sent, the lookup must
    // fail with MismatchedHeadsError (the client is expected to reclone).
    let ctx = CoreContext::test_mock(fb);
    let repo = branch_even::getrepo(fb).await;
    let dag = new_isolated_on_demand_update(&repo);

    let head = resolve_cs_id(&ctx, &repo, "4f7f3fd428bec1a48f9314414b063c706d9c1aed").await?;
    let head_parent =
        resolve_cs_id(&ctx, &repo, "b65231269f651cfe784fd1d97ef02a049a37b8a0").await?;

    // Sanity check: with a single matchable head the location lookup succeeds.
    let location = dag
        .changeset_id_to_location(&ctx, vec![head], head_parent)
        .await?;
    assert_eq!(location, Some(Location::new(head, 1)));

    // Add a second head the server cannot match; the call must now fail and
    // the failure must be typed as MismatchedHeadsError.
    let other_head =
        resolve_cs_id(&ctx, &repo, "16839021e338500b3cf7c9b871c8a07351697d68").await?;
    let err = match dag
        .changeset_id_to_location(&ctx, vec![head, other_head], head_parent)
        .await
    {
        Ok(_) => panic!("expected changeset_id_to_location to fail"),
        Err(e) => e,
    };
    assert!(err.is::<crate::MismatchedHeadsError>());
    Ok(())
}

View File

@ -17,3 +17,4 @@ dag = { version = "0.1.0", path = "../../../scm/lib/dag", features = ["for-tests
facet = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures = { version = "0.3.13", features = ["async-await", "compat"] }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
thiserror = "1.0"

View File

@ -16,7 +16,8 @@ use async_trait::async_trait;
use auto_impl::auto_impl;
use context::CoreContext;
use futures::stream::BoxStream;
use mononoke_types::ChangesetId;
use mononoke_types::{ChangesetId, RepositoryId};
use thiserror::Error;
pub use dag;
pub use dag::{
@ -24,12 +25,6 @@ pub use dag::{
InProcessIdDag, Location, PreparedFlatSegments,
};
pub struct StreamCloneData<T> {
pub head_id: Vertex,
pub flat_segments: PreparedFlatSegments,
pub idmap_stream: BoxStream<'static, Result<(Vertex, T)>>,
}
#[facet::facet]
#[async_trait]
#[auto_impl(Arc)]
@ -117,3 +112,25 @@ pub trait SegmentedChangelog: Send + Sync {
ctx: &CoreContext,
) -> Result<StreamCloneData<ChangesetId>>;
}
/// Payload returned to a client performing a segmented changelog clone,
/// with the idmap delivered as a stream rather than fully materialized.
pub struct StreamCloneData<T> {
    /// Vertex of the head the cloned data covers.
    pub head_id: Vertex,
    /// Flat-segment representation of the IdDag being cloned.
    pub flat_segments: PreparedFlatSegments,
    /// Stream of (vertex, T) idmap entries; in the `SegmentedChangelog`
    /// trait `T` is `ChangesetId`.
    pub idmap_stream: BoxStream<'static, Result<(Vertex, T)>>,
}
#[derive(Debug, Error)]
#[error("server cannot match the clients heads, repo {repo_id}, client_heads: {client_heads:?}")]
pub struct MismatchedHeadsError {
pub repo_id: RepositoryId,
pub client_heads: Vec<ChangesetId>,
}
impl MismatchedHeadsError {
pub fn new(repo_id: RepositoryId, client_heads: Vec<ChangesetId>) -> Self {
Self {
repo_id,
client_heads,
}
}
}

View File

@ -8,38 +8,43 @@
use serde::Serialize;
use thiserror::Error;
use crate::wire::{ToWire, WireError};
#[derive(Clone, Debug, Error, Eq, PartialEq)]
#[derive(Serialize)] // used to convert to Python
#[error("server error: {message}")]
#[error("server error (code {code}): {message}")]
/// Common error structure between Mononoke and Mercurial.
/// The `message` field is self-explanatory: a natural-language description of the issue that was
/// encountered.
/// The `code` field represents a numeric identifier of the type of issue that was encountered. In
/// most situations the code will be `0`, meaning that there is nothing special about the error.
/// Non-zero codes are used for situations where the client wants to take a specific action (when
/// the client needs to handle that error).
///
/// Error code list:
/// ---------------
/// 1: SegmentedChangelogMismatchedHeads
/// Fatal inconsistency between client and server. The client will want to reclone in this
/// situation.
pub struct ServerError {
message: String,
pub message: String,
pub code: u64,
}
impl ServerError {
pub fn new<M: Into<String>>(m: M) -> Self {
Self { message: m.into() }
pub fn new<M: Into<String>>(m: M, code: u64) -> Self {
Self {
message: m.into(),
code,
}
}
}
impl From<anyhow::Error> for ServerError {
fn from(e: anyhow::Error) -> Self {
Self::new(format!("{:?}", e))
}
}
impl ToWire for ServerError {
type Wire = WireError;
fn to_wire(self) -> Self::Wire {
WireError::new(self.message)
pub fn generic<M: Into<String>>(m: M) -> Self {
Self::new(m, 0)
}
}
#[cfg(any(test, feature = "for-tests"))]
impl quickcheck::Arbitrary for ServerError {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
ServerError::new(String::arbitrary(g))
ServerError::new(String::arbitrary(g), u64::arbitrary(g))
}
}

View File

@ -16,12 +16,26 @@ pub type WireResult<T> = Result<T, WireError>;
pub struct WireError {
#[serde(rename = "1")]
pub message: Option<String>,
#[serde(rename = "2")]
pub code: Option<u64>,
}
impl WireError {
pub fn new<M: Into<String>>(m: M) -> Self {
pub fn new<M: Into<String>>(m: M, code: u64) -> Self {
Self {
message: Some(m.into()),
code: Some(code),
}
}
}
impl ToWire for ServerError {
type Wire = WireError;
fn to_wire(self) -> Self::Wire {
WireError {
message: Some(self.message),
code: Some(self.code),
}
}
}
@ -34,7 +48,8 @@ impl ToApi for WireError {
let message = self
.message
.ok_or_else(|| WireToApiConversionError::CannotPopulateRequiredField("message"))?;
Ok(ServerError::new(message))
let code = self.code.unwrap_or(0);
Ok(ServerError::new(message, code))
}
}
@ -75,7 +90,7 @@ where
#[cfg(any(test, feature = "for-tests"))]
impl quickcheck::Arbitrary for WireError {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
WireError::new(String::arbitrary(g))
WireError::new(String::arbitrary(g), u64::arbitrary(g))
}
}