mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 00:45:18 +03:00
mononoke: start syncing globalrevs to darkstorm repos via hg sync job
Reviewed By: krallin Differential Revision: D27268740 fbshipit-source-id: d6688d3655b43d4a276c030bc9b0efa851273b7e
This commit is contained in:
parent
e6fae1b836
commit
b5d9e79c9c
@ -19,6 +19,7 @@ base64 = "0.11.0"
|
||||
blobrepo = { version = "0.1.0", path = "../blobrepo" }
|
||||
blobrepo_hg = { version = "0.1.0", path = "../blobrepo/blobrepo_hg" }
|
||||
blobstore = { version = "0.1.0", path = "../blobstore" }
|
||||
bonsai_globalrev_mapping = { version = "0.1.0", path = "../bonsai_globalrev_mapping" }
|
||||
bookmarks = { version = "0.1.0", path = "../bookmarks" }
|
||||
borrowed = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
bytes = { version = "0.5", features = ["serde"] }
|
||||
@ -72,7 +73,6 @@ tokio = { version = "0.2.25", features = ["full", "test-util"] }
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.5"
|
||||
async_unit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
bonsai_globalrev_mapping = { version = "0.1.0", path = "../bonsai_globalrev_mapping" }
|
||||
fbinit-tokio-02 = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
memblob = { version = "0.1.0", path = "../blobstore/memblob" }
|
||||
mercurial_types-mocks = { version = "0.1.0", path = "../mercurial/types/mocks" }
|
||||
|
@ -5,16 +5,20 @@
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use crate::CommitsInBundle;
|
||||
use anyhow::{format_err, Error};
|
||||
use blobrepo::BlobRepo;
|
||||
use bonsai_globalrev_mapping::BonsaiGlobalrevMappingEntry;
|
||||
use bookmarks::BookmarkName;
|
||||
use context::CoreContext;
|
||||
use fbinit::FacebookInit;
|
||||
use futures::{stream, StreamExt, TryStreamExt};
|
||||
use metaconfig_types::HgsqlGlobalrevsName;
|
||||
use mononoke_types::ChangesetId;
|
||||
use sql::{queries, Connection};
|
||||
use sql_construct::{facebook::FbSqlConstruct, SqlConstruct};
|
||||
use sql_ext::{facebook::MysqlOptions, SqlConnections};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -22,6 +26,12 @@ use std::sync::Arc;
|
||||
pub enum GlobalrevSyncer {
|
||||
Noop,
|
||||
Sql(Arc<SqlGlobalrevSyncer>),
|
||||
Darkstorm(Arc<DarkstormGlobalrevSyncer>),
|
||||
}
|
||||
|
||||
pub struct DarkstormGlobalrevSyncer {
|
||||
orig_repo: BlobRepo,
|
||||
darkstorm_repo: BlobRepo,
|
||||
}
|
||||
|
||||
pub struct SqlGlobalrevSyncer {
|
||||
@ -82,19 +92,73 @@ impl GlobalrevSyncer {
|
||||
Ok(GlobalrevSyncer::Sql(Arc::new(syncer)))
|
||||
}
|
||||
|
||||
pub fn darkstorm(orig_repo: &BlobRepo, darkstorm_repo: &BlobRepo) -> Self {
|
||||
Self::Darkstorm(Arc::new(DarkstormGlobalrevSyncer {
|
||||
orig_repo: orig_repo.clone(),
|
||||
darkstorm_repo: darkstorm_repo.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn sync(
|
||||
&self,
|
||||
ctx: &CoreContext,
|
||||
bookmark: &BookmarkName,
|
||||
bcs_id: ChangesetId,
|
||||
commits: &CommitsInBundle,
|
||||
) -> Result<(), Error> {
|
||||
match self {
|
||||
Self::Noop => Ok(()),
|
||||
Self::Sql(syncer) => syncer.sync(ctx, bookmark, bcs_id).await,
|
||||
Self::Darkstorm(syncer) => syncer.sync(ctx, commits).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DarkstormGlobalrevSyncer {
|
||||
pub async fn sync(&self, ctx: &CoreContext, commits: &CommitsInBundle) -> Result<(), Error> {
|
||||
let commits = match commits {
|
||||
CommitsInBundle::Commits(commits) => commits,
|
||||
CommitsInBundle::Unknown => {
|
||||
return Err(format_err!(
|
||||
"can't use darkstorm globalrev syncer because commits that were \
|
||||
sent in the bundle are not known"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let bcs_id_to_globalrev = stream::iter(commits.iter().map(|bcs_id| async move {
|
||||
let maybe_globalrev = self
|
||||
.orig_repo
|
||||
.get_globalrev_from_bonsai(ctx, *bcs_id)
|
||||
.await?;
|
||||
Result::<_, Error>::Ok((bcs_id, maybe_globalrev))
|
||||
}))
|
||||
.map(Ok)
|
||||
.try_buffer_unordered(100)
|
||||
.try_filter_map(|(bcs_id, maybe_globalrev)| async move {
|
||||
Ok(maybe_globalrev.map(|globalrev| (bcs_id, globalrev)))
|
||||
})
|
||||
.try_collect::<HashMap<_, _>>()
|
||||
.await?;
|
||||
|
||||
let entries = bcs_id_to_globalrev
|
||||
.into_iter()
|
||||
.map(|(bcs_id, globalrev)| BonsaiGlobalrevMappingEntry {
|
||||
repo_id: self.darkstorm_repo.get_repoid(),
|
||||
bcs_id: *bcs_id,
|
||||
globalrev,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.darkstorm_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.bulk_import(ctx, &entries[..])
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlGlobalrevSyncer {
|
||||
pub async fn sync(
|
||||
&self,
|
||||
@ -180,9 +244,11 @@ mod test {
|
||||
use super::*;
|
||||
use bonsai_globalrev_mapping::BonsaiGlobalrevMappingEntry;
|
||||
use mercurial_types_mocks::globalrev::{GLOBALREV_ONE, GLOBALREV_THREE, GLOBALREV_TWO};
|
||||
use mononoke_types::RepositoryId;
|
||||
use mononoke_types_mocks::changesetid::{ONES_CSID, TWOS_CSID};
|
||||
use mononoke_types_mocks::repo::REPO_ZERO;
|
||||
use sql::rusqlite::Connection as SqliteConnection;
|
||||
use test_repo_factory::TestRepoFactory;
|
||||
|
||||
queries! {
|
||||
write InitGlobalrevCounter(repo: String, rev: u64) {
|
||||
@ -290,4 +356,71 @@ mod test {
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_sync_darkstorm(fb: FacebookInit) -> Result<(), Error> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
|
||||
let orig_repo: BlobRepo = TestRepoFactory::new()?
|
||||
.with_id(RepositoryId::new(0))
|
||||
.build()?;
|
||||
let darkstorm_repo: BlobRepo = TestRepoFactory::new()?
|
||||
.with_id(RepositoryId::new(1))
|
||||
.build()?;
|
||||
|
||||
let e1 = BonsaiGlobalrevMappingEntry {
|
||||
repo_id: REPO_ZERO,
|
||||
bcs_id: ONES_CSID,
|
||||
globalrev: GLOBALREV_ONE,
|
||||
};
|
||||
|
||||
let e2 = BonsaiGlobalrevMappingEntry {
|
||||
repo_id: REPO_ZERO,
|
||||
bcs_id: TWOS_CSID,
|
||||
globalrev: GLOBALREV_TWO,
|
||||
};
|
||||
orig_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.bulk_import(&ctx, &[e1, e2])
|
||||
.await?;
|
||||
|
||||
let syncer = DarkstormGlobalrevSyncer {
|
||||
orig_repo,
|
||||
darkstorm_repo: darkstorm_repo.clone(),
|
||||
};
|
||||
|
||||
assert!(
|
||||
darkstorm_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.get_globalrev_from_bonsai(&ctx, darkstorm_repo.get_repoid(), ONES_CSID)
|
||||
.await?
|
||||
.is_none()
|
||||
);
|
||||
assert!(
|
||||
darkstorm_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.get_globalrev_from_bonsai(&ctx, darkstorm_repo.get_repoid(), TWOS_CSID)
|
||||
.await?
|
||||
.is_none()
|
||||
);
|
||||
syncer
|
||||
.sync(&ctx, &CommitsInBundle::Commits(vec![ONES_CSID, TWOS_CSID]))
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
Some(GLOBALREV_ONE),
|
||||
darkstorm_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.get_globalrev_from_bonsai(&ctx, darkstorm_repo.get_repoid(), ONES_CSID)
|
||||
.await?
|
||||
);
|
||||
assert_eq!(
|
||||
Some(GLOBALREV_TWO),
|
||||
darkstorm_repo
|
||||
.bonsai_globalrev_mapping()
|
||||
.get_globalrev_from_bonsai(&ctx, darkstorm_repo.get_repoid(), TWOS_CSID)
|
||||
.await?
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -379,7 +379,7 @@ pub struct CombinedBookmarkUpdateLogEntry {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum CommitsInBundle {
|
||||
pub enum CommitsInBundle {
|
||||
Commits(Vec<ChangesetId>),
|
||||
Unknown,
|
||||
}
|
||||
@ -425,7 +425,12 @@ async fn sync_single_combined_entry(
|
||||
) -> Result<RetryAttemptsCount, Error> {
|
||||
if let Some((cs_id, _hg_cs_id)) = combined_entry.cs_id {
|
||||
globalrev_syncer
|
||||
.sync(ctx, &combined_entry.bookmark, cs_id)
|
||||
.sync(
|
||||
ctx,
|
||||
&combined_entry.bookmark,
|
||||
cs_id,
|
||||
&combined_entry.commits,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
||||
@ -821,48 +826,56 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(
|
||||
}
|
||||
};
|
||||
|
||||
let globalrev_syncer = {
|
||||
let params = match (
|
||||
hgsql_db_addr.as_ref(),
|
||||
repo_config
|
||||
.pushrebase
|
||||
.globalrevs_publishing_bookmark
|
||||
.as_ref(),
|
||||
generate_bundles,
|
||||
) {
|
||||
(Some(addr), Some(book), true) => Some((addr.as_str(), book.clone())),
|
||||
(Some(..), Some(..), false) => {
|
||||
return Err(format_err!(
|
||||
"Syncing globalrevs ({}) requires generating bundles ({})",
|
||||
HGSQL_GLOBALREVS_DB_ADDR,
|
||||
GENERATE_BUNDLES
|
||||
));
|
||||
let globalrevs_publishing_bookmark = repo_config
|
||||
.pushrebase
|
||||
.globalrevs_publishing_bookmark
|
||||
.as_ref();
|
||||
borrowed!(mysql_options, maybe_darkstorm_backup_repo);
|
||||
let globalrev_syncer = async move {
|
||||
let globalrev_syncer = match globalrevs_publishing_bookmark {
|
||||
Some(bookmark) => {
|
||||
if !generate_bundles {
|
||||
return Err(format_err!(
|
||||
"Syncing globalrevs ({}) requires generating bundles ({})",
|
||||
HGSQL_GLOBALREVS_DB_ADDR,
|
||||
GENERATE_BUNDLES
|
||||
));
|
||||
}
|
||||
|
||||
match (hgsql_db_addr.as_ref(), maybe_darkstorm_backup_repo) {
|
||||
(Some(addr), None) => {
|
||||
GlobalrevSyncer::new(
|
||||
fb,
|
||||
// FIXME: this clone should go away once GlobalrevSyncer is asyncified
|
||||
repo.clone(),
|
||||
hgsql_use_sqlite,
|
||||
Some((addr.as_str(), bookmark.clone())),
|
||||
&mysql_options,
|
||||
readonly_storage.0,
|
||||
hgsql_globalrevs_name,
|
||||
)
|
||||
.await
|
||||
}
|
||||
(None, Some(darkstorm_backup_repo)) => {
|
||||
Ok(GlobalrevSyncer::darkstorm(&repo, &darkstorm_backup_repo))
|
||||
}
|
||||
(Some(_), Some(_)) => {
|
||||
return Err(format_err!(
|
||||
"This repository has both hgsql and darkstorm repo specified - this is unsupported",
|
||||
));
|
||||
}
|
||||
(None, None) => {
|
||||
return Err(format_err!(
|
||||
"This repository has Globalrevs enabled but syncing ({}) is not enabled",
|
||||
HGSQL_GLOBALREVS_DB_ADDR,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
(Some(..), None, ..) => {
|
||||
return Err(format_err!(
|
||||
"Syncing globalrevs ({}) requires a globalrevs_publishing_bookmark",
|
||||
HGSQL_GLOBALREVS_DB_ADDR,
|
||||
));
|
||||
}
|
||||
(None, Some(..), ..) => {
|
||||
return Err(format_err!(
|
||||
"This repository has Globalrevs enabled but syncing ({}) is not enabled",
|
||||
HGSQL_GLOBALREVS_DB_ADDR,
|
||||
));
|
||||
}
|
||||
(None, None, ..) => None,
|
||||
None => Ok(GlobalrevSyncer::Noop),
|
||||
};
|
||||
|
||||
GlobalrevSyncer::new(
|
||||
fb,
|
||||
// FIXME: this clone should go away once GlobalrevSyncer is asyncified
|
||||
repo.clone(),
|
||||
hgsql_use_sqlite,
|
||||
params,
|
||||
&mysql_options,
|
||||
readonly_storage.0,
|
||||
hgsql_globalrevs_name,
|
||||
)
|
||||
globalrev_syncer
|
||||
};
|
||||
|
||||
try_join3(preparer, overlay, globalrev_syncer)
|
||||
|
@ -0,0 +1,86 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
# This software may be used and distributed according to the terms of the
|
||||
# GNU General Public License found in the LICENSE file in the root
|
||||
# directory of this source tree.
|
||||
|
||||
$ . "${TEST_FIXTURES}/library.sh"
|
||||
|
||||
setup configuration
|
||||
|
||||
$ DISALLOW_NON_PUSHREBASE=1 GLOBALREVS_PUBLISHING_BOOKMARK=master_bookmark REPOID=0 REPONAME=orig setup_common_config blob_files
|
||||
$ REPOID=1 REPONAME=backup setup_common_config blob_files
|
||||
$ export BACKUP_REPO_ID=1
|
||||
$ cd $TESTTMP
|
||||
|
||||
setup repo
|
||||
|
||||
$ hginit_treemanifest repo-hg
|
||||
$ cd repo-hg
|
||||
$ echo foo > a
|
||||
$ echo foo > b
|
||||
$ hg addremove && hg ci -m 'initial'
|
||||
adding a
|
||||
adding b
|
||||
$ echo 'bar' > a
|
||||
$ hg addremove && hg ci -m 'a => bar'
|
||||
$ cat >> .hg/hgrc <<EOF
|
||||
> [extensions]
|
||||
> pushrebase =
|
||||
> EOF
|
||||
|
||||
create master bookmark
|
||||
|
||||
$ hg bookmark master_bookmark -r tip
|
||||
|
||||
blobimport them into Mononoke storage and start Mononoke
|
||||
$ cd ..
|
||||
$ REPOID=0 blobimport repo-hg/.hg orig
|
||||
$ REPONAME=orig
|
||||
$ REPOID=1 blobimport repo-hg/.hg backup
|
||||
|
||||
start mononoke
|
||||
$ mononoke
|
||||
$ wait_for_mononoke
|
||||
|
||||
Make client repo
|
||||
$ hgclone_treemanifest ssh://user@dummy/repo-hg client-push --noupdate --config extensions.remotenames= -q
|
||||
$ hgclone_treemanifest mononoke://$(mononoke_address)/backup backup --noupdate --config extensions.remotenames=
|
||||
|
||||
Push to Mononoke
|
||||
$ cd $TESTTMP/client-push
|
||||
$ cat >> .hg/hgrc <<EOF
|
||||
> [extensions]
|
||||
> pushrebase =
|
||||
> remotenames =
|
||||
> EOF
|
||||
$ hg up -q master_bookmark
|
||||
|
||||
$ mkcommit pushcommit
|
||||
$ hgmn push -r . --to master_bookmark -q
|
||||
$ hg up -q master_bookmark
|
||||
$ mkcommit pushcommit2
|
||||
$ mkcommit pushcommit3
|
||||
$ hgmn push -r . --to master_bookmark -q
|
||||
|
||||
Sync to backup repos
|
||||
$ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select repo_id, globalrev from bonsai_globalrev_mapping"
|
||||
0|1000147970
|
||||
0|1000147971
|
||||
0|1000147972
|
||||
|
||||
$ mononoke_backup_sync backup sync-loop 2 --generate-bundles 2>&1 | grep 'successful sync'
|
||||
* successful sync of entries [3] (glob)
|
||||
* successful sync of entries [4] (glob)
|
||||
|
||||
|
||||
Make sure correct mutable counter is used (it should be repoid = 1)
|
||||
$ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select * from mutable_counters" | grep latest
|
||||
1|latest-replayed-request|4
|
||||
$ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select repo_id, globalrev from bonsai_globalrev_mapping"
|
||||
0|1000147970
|
||||
0|1000147971
|
||||
0|1000147972
|
||||
1|1000147970
|
||||
1|1000147971
|
||||
1|1000147972
|
Loading…
Reference in New Issue
Block a user