generate bundles concurrently with sending them

Summary:
Currently the mononoke_hg_sync_job either prepares a bundle or sends it to hg
servers. Those steps could be done concurrently. This diff creates a stream of
bundles that is buffered so the syncing to hg servers doesn't have to wait for
new bundle generation.

The implementation is a bit gross; unfortunately I couldn't get the lifetimes
around the `scan()` transformer to work without a mutex, and I hit a higher-order
lifetime exception a few times, which caused me to wrap some things in Arc.

Reviewed By: yancouto

Differential Revision: D43306918

fbshipit-source-id: b2ec3da21f4791c57a02fcfe76c735f4fdb16efd
This commit is contained in:
Mateusz Kwapich 2023-02-17 12:05:09 -08:00 committed by Facebook GitHub Bot
parent ac6d40c320
commit f3ae18126b
3 changed files with 212 additions and 69 deletions

View File

@ -22,6 +22,7 @@ use futures::future::try_join_all;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::lock::Mutex;
use futures_watchdog::WatchdogExt;
use getbundle_response::SessionLfsParams;
use itertools::Itertools;
@ -89,7 +90,7 @@ impl BundlePreparer {
pub async fn prepare_batches(
&self,
ctx: &CoreContext,
overlay: &mut BookmarkOverlay,
overlay: Arc<Mutex<BookmarkOverlay>>,
entries: Vec<BookmarkUpdateLogEntry>,
) -> Result<Vec<BookmarkLogEntryBatch>, Error> {
use BookmarkUpdateReason::*;
@ -423,15 +424,16 @@ async fn split_in_batches(
ctx: &CoreContext,
lca_hint: &dyn LeastCommonAncestorsHint,
changeset_fetcher: &ArcChangesetFetcher,
overlay: &mut BookmarkOverlay,
overlay: Arc<Mutex<BookmarkOverlay>>,
entries: Vec<BookmarkUpdateLogEntry>,
) -> Result<Vec<BookmarkLogEntryBatch>, Error> {
let mut batches: Vec<BookmarkLogEntryBatch> = vec![];
let mut overlay = overlay.lock().await;
for entry in entries {
let entry = match batches.last_mut() {
Some(batch) => match batch
.try_append(ctx, lca_hint, changeset_fetcher, overlay, entry)
.try_append(ctx, lca_hint, changeset_fetcher, &mut overlay, entry)
.watched(ctx.logger())
.await?
{
@ -442,7 +444,7 @@ async fn split_in_batches(
},
None => entry,
};
batches.push(BookmarkLogEntryBatch::new(overlay, entry));
batches.push(BookmarkLogEntryBatch::new(&mut overlay, entry));
}
Ok(batches)
@ -515,6 +517,7 @@ impl BookmarkLogEntryBatch {
#[cfg(test)]
mod test {
use fbinit::FacebookInit;
use maplit::hashmap;
use mononoke_types::RepositoryId;
@ -548,12 +551,12 @@ mod test {
None,
Some(commit),
)];
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = Arc::new(Mutex::new(BookmarkOverlay::new(Arc::new(hashmap! {}))));
let res = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
overlay.clone(),
entries.clone(),
)
.await?;
@ -566,6 +569,7 @@ mod test {
// The overlay should've been mutated so that the bookmark points to
// the latest value.
let overlay = overlay.lock().await;
assert_eq!(overlay.get(&main), Some(commit));
Ok(())
@ -596,12 +600,12 @@ mod test {
create_bookmark_log_entry(1, main.clone(), Some(commit_a), Some(commit_b)),
create_bookmark_log_entry(2, main.clone(), Some(commit_b), Some(commit_c)),
];
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let res = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?;
@ -645,12 +649,12 @@ mod test {
log_entry_2.clone(),
log_entry_3.clone(),
];
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let res = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?;
@ -704,12 +708,12 @@ mod test {
log_entry_2.clone(),
log_entry_3.clone(),
];
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let res = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?;
@ -752,12 +756,12 @@ mod test {
let log_entry_2 =
create_bookmark_log_entry(1, main.clone(), Some(commit_b), Some(commit_c));
let entries = vec![log_entry_1.clone(), log_entry_2.clone()];
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let res = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?;
@ -806,12 +810,12 @@ mod test {
let entries = vec![log_entry_1, log_entry_2.clone(), log_entry_3.clone()];
// Default case: no adjustment
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {}));
let mut batch = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?
@ -823,14 +827,14 @@ mod test {
assert_eq!(batch, original);
// Skip a single entry
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {
main.clone() => commit_a,
}));
let mut batch = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?
@ -845,14 +849,14 @@ mod test {
assert_eq!(batch.entries, vec![log_entry_2, log_entry_3.clone()]);
// Skip two entries
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {
main.clone() => commit_b,
}));
let mut batch = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?
@ -867,14 +871,14 @@ mod test {
assert_eq!(batch.entries, vec![log_entry_3]);
// The whole batch was already synced - nothing to do!
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {
main.clone() => commit_c,
}));
let mut batch = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?
@ -888,14 +892,14 @@ mod test {
// Bookmark is not in the batch at all - in that case just do nothing and
// return existing bundle
let mut overlay = BookmarkOverlay::new(Arc::new(hashmap! {
let overlay = BookmarkOverlay::new(Arc::new(hashmap! {
main => commit_d,
}));
let mut batch = split_in_batches(
&ctx,
&sli,
&repo.changeset_fetcher_arc(),
&mut overlay,
Arc::new(Mutex::new(overlay)),
entries.clone(),
)
.await?

View File

@ -12,6 +12,7 @@
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -60,6 +61,7 @@ use futures::future::try_join3;
use futures::future::BoxFuture;
use futures::future::FutureExt as _;
use futures::future::TryFutureExt;
use futures::lock::Mutex;
use futures::pin_mut;
use futures::stream;
use futures::stream::StreamExt;
@ -367,7 +369,17 @@ impl HgSyncProcess {
.long("bundle-prefetch")
.takes_value(true)
.required(false)
.help("How many bundles to prefetch"),
.help("How many bundles to prefetch (NOOP left for backwards compat)"),
)
.arg(
Arg::with_name("bundle-buffer-size")
.long("bundle-buffer-size")
.takes_value(true)
.required(false)
.help(
"How many bundles should be gnererated and buffered in \
advance of replaying (min 1, default 5)",
),
)
.arg(
Arg::with_name("exit-file")
@ -1264,7 +1276,7 @@ async fn run<'a>(
let start_id = args::get_usize_opt(&sub_m, "start-id")
.ok_or_else(|| Error::msg("--start-id must be specified"))?;
let (maybe_log_entry, (bundle_preparer, mut overlay, globalrev_syncer)) = try_join(
let (maybe_log_entry, (bundle_preparer, overlay, globalrev_syncer)) = try_join(
bookmarks
.read_next_bookmark_log_entries(
ctx.clone(),
@ -1279,7 +1291,11 @@ async fn run<'a>(
if let Some(log_entry) = maybe_log_entry {
let (stats, res) = async {
let batches = bundle_preparer
.prepare_batches(ctx, &mut overlay, vec![log_entry.clone()])
.prepare_batches(
&ctx.clone(),
Arc::new(Mutex::new(overlay)),
vec![log_entry.clone()],
)
.await?;
let mut combined_entries = bundle_preparer
.prepare_bundles(ctx.clone(), batches, filter_changesets)
@ -1314,13 +1330,14 @@ async fn run<'a>(
}
(MODE_SYNC_LOOP, Some(sub_m)) => {
let start_id = args::get_i64_opt(&sub_m, "start-id");
let bundle_buffer_size = args::get_usize_opt(&sub_m, "bundle-buffer-size").unwrap_or(5);
let combine_bundles = args::get_u64_opt(&sub_m, "combine-bundles").unwrap_or(1);
let loop_forever = sub_m.is_present("loop-forever");
let replayed_sync_counter = LatestReplayedSyncCounter::new(
&repo,
maybe_darkstorm_backup_repo.as_ref().map(|r| r.as_ref()),
)?;
let exit_path = sub_m
let exit_path: Option<PathBuf> = sub_m
.value_of("exit-file")
.map(|name| Path::new(name).to_path_buf());
@ -1352,14 +1369,12 @@ async fn run<'a>(
}))
});
let (start_id, (bundle_preparer, mut overlay, globalrev_syncer)) =
let (start_id, (bundle_preparer, overlay, globalrev_syncer)) =
try_join(counter, repo_parts).watched(ctx.logger()).await?;
let outcome_handler = build_outcome_handler(ctx, lock_via);
borrowed!(bundle_preparer: &BundlePreparer);
let overlay = &mut overlay;
let mut seen_first = false;
cloned!(filter_changesets);
let s = loop_over_log_entries(
ctx,
&bookmarks,
@ -1368,28 +1383,28 @@ async fn run<'a>(
&scuba_sample,
combine_bundles,
unlock_via,
);
pin_mut!(s);
while let Some(entries) = s.try_next().await? {
if !can_continue() {
break;
)
.try_filter(|entries| future::ready(!entries.is_empty()))
.scan(Arc::new(Mutex::new(overlay)), |overlay, entries_res| {
cloned!(ctx, overlay, bundle_preparer);
async move {
let entries = match entries_res {
Ok(entries) => entries,
Err(err) => {
return Some(Err(AnonymousError { cause: err }));
}
if entries.is_empty() {
continue;
}
let mut batches = bundle_preparer
.prepare_batches(ctx, overlay, entries)
.watched(ctx.logger())
};
let batches = bundle_preparer
.prepare_batches(&ctx, overlay, entries)
.await
.map_err(|cause| AnonymousError { cause })?;
if batches.is_empty() {
continue;
.map_err(|cause| AnonymousError { cause });
Some(batches)
}
if !seen_first {
})
.try_filter(|batches| future::ready(!batches.is_empty()))
.scan(false, move |seen_first, mut batches_res| {
if let Ok(batches) = batches_res.as_mut() {
if !*seen_first {
// In the case that the sync job failed to update the
// "latest-replayed-request" counter during its previous
// run, the first batch might contain entries that were
@ -1399,15 +1414,31 @@ async fn run<'a>(
// commits, if possible.
if let Some(first) = batches.first_mut() {
first.maybe_adjust(ctx);
seen_first = true;
*seen_first = true;
}
}
}
future::ready(Some(batches_res))
})
.map_ok(|batches| {
cloned!(filter_changesets, bundle_preparer, ctx);
// Spawn is here so the bundling makes progress even when the
// latter sync stages are going on.
tokio::spawn(async move {
let bundles = bundle_preparer
.prepare_bundles(ctx.clone(), batches, filter_changesets.clone())
.prepare_bundles(ctx.clone(), batches, filter_changesets)
.watched(ctx.logger())
.await?;
Ok::<_, PipelineError>(bundles)
})
.map_err(|cause| AnonymousError {
cause: cause.into(),
})
})
.try_buffered(bundle_buffer_size);
pin_mut!(s);
while let Some(bundles) = s.try_next().await? {
let bundles: Vec<CombinedBookmarkUpdateLogEntry> = bundles?;
for bundle in bundles {
if !can_continue() {
break;

View File

@ -0,0 +1,108 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License found in the LICENSE file in the root
# directory of this source tree.
$ . "${TEST_FIXTURES}/library.sh"
setup configuration
$ setup_common_config blob_files
$ cd $TESTTMP
setup repo
$ hginit_treemanifest repo-hg
$ cd repo-hg
$ echo foo > a
$ echo foo > b
$ hg addremove && hg ci -m 'initial'
adding a
adding b
$ echo 'bar' > a
$ hg addremove && hg ci -m 'a => bar'
$ cat >> .hg/hgrc <<EOF
> [extensions]
> pushrebase =
> EOF
create master bookmark
$ hg bookmark master_bookmark -r tip
blobimport them into Mononoke storage and start Mononoke
$ cd ..
$ blobimport repo-hg/.hg repo
start mononoke
$ start_and_wait_for_mononoke_server
Make client repo
$ hgclone_treemanifest ssh://user@dummy/repo-hg client-push --noupdate --config extensions.remotenames= -q
Push to Mononoke
$ cd $TESTTMP/client-push
$ cat >> .hg/hgrc <<EOF
> [extensions]
> pushrebase =
> remotenames =
> EOF
$ hg up -q tip
Two pushes synced one after another
$ hg up -q master_bookmark
$ mkcommit commit_1
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_2
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_3
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_4
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_5
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_6
$ hgmn push -r . --to master_bookmark -q
$ hg up -q master_bookmark
$ mkcommit commit_7
$ hgmn push -r . --to master_bookmark -q
Sync it to another client
$ cd $TESTTMP/repo-hg
$ enable_replay_verification_hook
$ cat >> .hg/hgrc <<EOF
> [treemanifest]
> treeonly=True
> EOF
$ cd $TESTTMP
Sync a pushrebase bookmark move
$ mononoke_hg_sync_loop_regenerate repo-hg 1 --combine-bundles 2 --bundle-buffer-size 2 2>&1 | grep 'ful sync\|prepare' | cut -d " " -f 6- > out
(the actual syncs need to happen in-order)
$ cat out | grep sync
successful sync of entries [2, 3], repo: repo
successful sync of entries [4, 5], repo: repo
successful sync of entries [6, 7], repo: repo
successful sync of entries [8], repo: repo
(but the preparation of log entries doesn't have to be in order so we sort it)
$ cat out | grep prepare | sort
successful prepare of entries #[2, 3], repo: repo
successful prepare of entries #[4, 5], repo: repo
successful prepare of entries #[6, 7], repo: repo
successful prepare of entries #[8], repo: repo
$ cd "$TESTTMP"/repo-hg
$ hg log -r tip -T '{desc}\n'
commit_7