convert derived data utils to new futures

Summary: Convert derived data utils to use new style futures
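
The whole change applies one mechanical pattern: futures 0.1 values are bridged into
std futures with `.compat()`, boxed return types move from the old
`futures_ext::BoxFuture<T, Error>` to `futures::future::BoxFuture<'static, Result<T, Error>>`,
and combinator chains become `async` blocks. A minimal standalone sketch of the pattern
(it assumes `futures_old` is a Cargo rename of the futures 0.1 crate and that the
futures 0.3 `compat` feature is enabled, as in this codebase; `old_style` and
`new_style` are illustrative names, not functions from this diff):

    use anyhow::Error;
    use futures::compat::Future01CompatExt; // bridges futures 0.1 -> std futures
    use futures::future::{BoxFuture, FutureExt};

    // Old style: a boxed futures 0.1 future with separate Item/Error types.
    fn old_style() -> Box<dyn futures_old::Future<Item = u32, Error = Error> + Send> {
        Box::new(futures_old::future::ok(42))
    }

    // New style: an async block boxed into a std-futures BoxFuture; the
    // Item/Error pair becomes a single Result output.
    fn new_style() -> BoxFuture<'static, Result<u32, Error>> {
        async move {
            // During migration, remaining 0.1 futures are awaited via `.compat()`.
            let value = old_style().compat().await?;
            Ok(value)
        }
        .boxed()
    }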

Reviewed By: StanislavGlebik

Differential Revision: D24331068

fbshipit-source-id: ad658b278802afa1e4ecd44c5a24164135748790
Pavel Aslanov 2020-10-16 07:44:19 -07:00 committed by Facebook GitHub Bot
parent 274d46b7a0
commit 4190463c33
5 changed files with 129 additions and 138 deletions

View File

@@ -160,7 +160,6 @@ async fn check_derived_data_exists(
     let pending = derived_utils
         .pending(ctx.clone(), repo.clone(), cs_ids.clone())
-        .compat()
         .await?;
     for cs_id in cs_ids {

View File

@@ -34,7 +34,6 @@ use futures::{
 };
 use futures_ext::FutureExt as OldFutureExt;
 use futures_old::Future as OldFuture;
-use futures_stats::Timed;
 use futures_stats::TimedFutureExt;
 use metaconfig_types::DerivedDataConfig;
 use mononoke_types::{ChangesetId, DateTime};
@@ -494,7 +493,7 @@ async fn subcommand_backfill(
     );
     let total_count = changesets.len();
-    let mut generated_count = 0;
+    let mut generated_count = 0usize;
     let mut total_duration = Duration::from_secs(0);
     if regenerate {
@@ -505,7 +504,6 @@ async fn subcommand_backfill(
         let (stats, chunk_size) = async {
             let chunk = derived_utils
                 .pending(ctx.clone(), repo.clone(), chunk.to_vec())
-                .compat()
                 .await?;
             let chunk_size = chunk.len();
@@ -513,7 +511,6 @@
             derived_utils
                 .backfill_batch_dangerous(ctx.clone(), repo.clone(), chunk)
-                .compat()
                 .await?;
             Result::<_, Error>::Ok(chunk_size)
         }
@@ -604,10 +601,7 @@ async fn tail_one_iteration(
         async move {
             // create new context so each derivation would have its own trace
             let ctx = CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
-            let pending = derive
-                .pending(ctx.clone(), repo.clone(), heads)
-                .compat()
-                .await?;
+            let pending = derive.pending(ctx.clone(), repo.clone(), heads).await?;
             let oldest_underived =
                 derive.find_oldest_underived(&ctx, &repo, &pending).await?;
@@ -634,7 +628,7 @@ async fn tail_one_iteration(
         let pending_futs = pending.into_iter().map(|(derive, pending, _)| {
             pending
                 .into_iter()
-                .map(|csid| derive.derive(ctx.clone(), repo.clone(), csid).compat())
+                .map(|csid| derive.derive(ctx.clone(), repo.clone(), csid))
                 .collect::<Vec<_>>()
         });
@@ -682,23 +676,21 @@ async fn subcommand_single(
     stream::iter(derived_utils)
         .map(Ok)
         .try_for_each_concurrent(100, |derived_utils| {
-            derived_utils
-                .derive(ctx.clone(), repo.clone(), csid)
-                .timed({
-                    cloned!(ctx);
-                    move |stats, result| {
-                        info!(
-                            ctx.logger(),
-                            "derived {} in {:?}: {:?}",
-                            derived_utils.name(),
-                            stats.completion_time,
-                            result
-                        );
-                        Ok(())
-                    }
-                })
-                .map(|_| ())
-                .compat()
+            cloned!(ctx, repo);
+            async move {
+                let (stats, result) = derived_utils
+                    .derive(ctx.clone(), repo.clone(), csid)
+                    .timed()
+                    .await;
+                info!(
+                    ctx.logger(),
+                    "derived {} in {:?}: {:?}",
+                    derived_utils.name(),
+                    stats.completion_time,
+                    result
+                );
+                Ok(())
+            }
         })
         .await
 }
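
The subcommand_single hunk above also swaps how stats are collected: the old
futures_stats::Timed combinator took a callback, while the new TimedFutureExt::timed()
wraps a std future and resolves to the stats alongside the output, exactly as used in
the hunk. A standalone sketch of the new shape (the tokio main wrapper and the computed
value are illustrative only):

    use futures_stats::TimedFutureExt;

    #[tokio::main]
    async fn main() {
        // `.timed()` turns a future into one that resolves to (stats, output).
        let (stats, sum) = async { 2 + 2 }.timed().await;
        println!("computed {} in {:?}", sum, stats.completion_time);
    }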
@@ -716,7 +708,6 @@ mod tests {
         sync::atomic::{AtomicUsize, Ordering},
     };
     use tests_utils::resolve_cs_id;
-    use tokio_compat::runtime::Runtime;
     use unodes::RootUnodeManifestId;

     #[fbinit::compat_test]
@@ -766,29 +757,30 @@ mod tests {
         Ok(())
     }

-    #[fbinit::test]
-    fn test_backfill_data_latest(fb: FacebookInit) -> Result<(), Error> {
-        let mut runtime = Runtime::new()?;
+    #[fbinit::compat_test]
+    async fn test_backfill_data_latest(fb: FacebookInit) -> Result<(), Error> {
         let ctx = CoreContext::test_mock(fb);
-        let repo = runtime.block_on_std(linear::getrepo(fb));
+        let repo = linear::getrepo(fb).await;

         let hg_cs_id = HgChangesetId::from_str("79a13814c5ce7330173ec04d279bf95ab3f652fb")?;
-        let maybe_bcs_id = runtime.block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_cs_id))?;
+        let maybe_bcs_id = repo
+            .get_bonsai_from_hg(ctx.clone(), hg_cs_id)
+            .compat()
+            .await?;
         let bcs_id = maybe_bcs_id.unwrap();
         let derived_utils = derived_data_utils(repo.clone(), RootUnodeManifestId::NAME)?;
-        runtime.block_on(derived_utils.backfill_batch_dangerous(ctx, repo, vec![bcs_id]))?;
+        derived_utils
+            .backfill_batch_dangerous(ctx, repo, vec![bcs_id])
+            .await?;
         Ok(())
     }

-    #[fbinit::test]
-    fn test_backfill_data_batch(fb: FacebookInit) -> Result<(), Error> {
-        let mut runtime = Runtime::new()?;
+    #[fbinit::compat_test]
+    async fn test_backfill_data_batch(fb: FacebookInit) -> Result<(), Error> {
         let ctx = CoreContext::test_mock(fb);
-        let repo = runtime.block_on_std(linear::getrepo(fb));
+        let repo = linear::getrepo(fb).await;

         let mut batch = vec![];
         let hg_cs_ids = vec![
@@ -799,50 +791,50 @@ mod tests {
         ];
         for hg_cs_id in &hg_cs_ids {
             let hg_cs_id = HgChangesetId::from_str(hg_cs_id)?;
-            let maybe_bcs_id = runtime.block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_cs_id))?;
+            let maybe_bcs_id = repo
+                .get_bonsai_from_hg(ctx.clone(), hg_cs_id)
+                .compat()
+                .await?;
             batch.push(maybe_bcs_id.unwrap());
         }

         let derived_utils = derived_data_utils(repo.clone(), RootUnodeManifestId::NAME)?;
-        let pending =
-            runtime.block_on(derived_utils.pending(ctx.clone(), repo.clone(), batch.clone()))?;
+        let pending = derived_utils
+            .pending(ctx.clone(), repo.clone(), batch.clone())
+            .await?;
         assert_eq!(pending.len(), hg_cs_ids.len());

-        runtime.block_on(derived_utils.backfill_batch_dangerous(
-            ctx.clone(),
-            repo.clone(),
-            batch.clone(),
-        ))?;
-        let pending = runtime.block_on(derived_utils.pending(ctx, repo, batch))?;
+        derived_utils
+            .backfill_batch_dangerous(ctx.clone(), repo.clone(), batch.clone())
+            .await?;
+        let pending = derived_utils.pending(ctx, repo, batch).await?;
         assert_eq!(pending.len(), 0);

         Ok(())
     }

-    #[fbinit::test]
-    fn test_backfill_data_failing_blobstore(fb: FacebookInit) -> Result<(), Error> {
+    #[fbinit::compat_test]
+    async fn test_backfill_data_failing_blobstore(fb: FacebookInit) -> Result<(), Error> {
         // The test exercises that derived data mapping entries are written only after
         // all other blobstore writes were successful i.e. mapping entry shouldn't exist
         // if any of the corresponding blobs weren't successfully saved
-        let mut runtime = Runtime::new()?;
         let ctx = CoreContext::test_mock(fb);
-        let origrepo = runtime.block_on_std(linear::getrepo(fb));
+        let origrepo = linear::getrepo(fb).await;

         let repo = origrepo.dangerous_override(|blobstore| -> Arc<dyn Blobstore> {
             Arc::new(FailingBlobstore::new("manifest".to_string(), blobstore))
         });

         let first_hg_cs_id = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?;
-        let maybe_bcs_id =
-            runtime.block_on(repo.get_bonsai_from_hg(ctx.clone(), first_hg_cs_id))?;
+        let maybe_bcs_id = repo
+            .get_bonsai_from_hg(ctx.clone(), first_hg_cs_id)
+            .compat()
+            .await?;
         let bcs_id = maybe_bcs_id.unwrap();
         let derived_utils = derived_data_utils(repo.clone(), RootUnodeManifestId::NAME)?;
-        let res = runtime.block_on(derived_utils.backfill_batch_dangerous(
-            ctx.clone(),
-            repo.clone(),
-            vec![bcs_id],
-        ));
+        let res = derived_utils
+            .backfill_batch_dangerous(ctx.clone(), repo.clone(), vec![bcs_id])
+            .await;

         // Deriving should fail because blobstore writes fail
         assert!(res.is_err());
@@ -851,10 +843,14 @@ mod tests {
         // is now safe
         let repo = origrepo;
         let second_hg_cs_id = HgChangesetId::from_str("3e0e761030db6e479a7fb58b12881883f9f8c63f")?;
-        let maybe_bcs_id =
-            runtime.block_on(repo.get_bonsai_from_hg(ctx.clone(), second_hg_cs_id))?;
+        let maybe_bcs_id = repo
+            .get_bonsai_from_hg(ctx.clone(), second_hg_cs_id)
+            .compat()
+            .await?;
         let bcs_id = maybe_bcs_id.unwrap();
-        runtime.block_on(derived_utils.backfill_batch_dangerous(ctx, repo, vec![bcs_id]))?;
+        derived_utils
+            .backfill_batch_dangerous(ctx, repo, vec![bcs_id])
+            .await?;
         Ok(())
     }
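
Every test conversion in this file follows the same recipe: drop the hand-managed
tokio_compat::runtime::Runtime with its block_on/block_on_std calls, make the test fn
async under #[fbinit::compat_test], and await directly, bridging any remaining
futures 0.1 APIs with `.compat()`. A standalone analogue of the recipe
(#[tokio::test] stands in for the internal #[fbinit::compat_test] macro, and
`fetch_old` is an illustrative placeholder for an API that still returns a
futures 0.1 future):

    use anyhow::Error;
    use futures::compat::Future01CompatExt;

    // Placeholder for an API that still speaks futures 0.1.
    fn fetch_old() -> impl futures_old::Future<Item = u32, Error = Error> {
        futures_old::future::ok(7)
    }

    #[tokio::test]
    async fn test_fetch() -> Result<(), Error> {
        // No Runtime::new()/block_on: the test attribute supplies the
        // executor and the body awaits directly.
        let value = fetch_old().compat().await?;
        assert_eq!(value, 7);
        Ok(())
    }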

View File

@@ -27,14 +27,12 @@ use fastlog::{RootFastlog, RootFastlogMapping};
 use fsnodes::{RootFsnodeId, RootFsnodeMapping};
 use futures::{
     compat::Future01CompatExt,
-    future::{self, ready, try_join_all},
+    future::{self, ready, try_join_all, BoxFuture, FutureExt},
     stream::{self, futures_unordered::FuturesUnordered},
-    Future, Stream, StreamExt, TryStreamExt,
+    Future, Stream, StreamExt, TryFutureExt, TryStreamExt,
 };
-use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
-use futures_old::{
-    future as future_old, stream as stream_old, Future as OldFuture, Stream as OldStream,
-};
+use futures_ext::{BoxFuture as BoxFutureOld, FutureExt as OldFutureExt};
+use futures_old::{future as future_old, Future as OldFuture};
 use lazy_static::lazy_static;
 use lock_ext::LockExt;
 use mercurial_derived_data::{HgChangesetIdMapping, MappedHgChangesetId};
@@ -108,8 +106,7 @@ pub fn derive_data_for_csids(
     for csid in &csids {
         let fut = derived_utils
             .derive(ctx.clone(), repo.clone(), *csid)
-            .map(|_| ())
-            .compat();
+            .map_ok(|_| ());
         futs.push(fut);
     }
@@ -137,22 +134,22 @@ pub trait DerivedUtils: Send + Sync + 'static {
         ctx: CoreContext,
         repo: BlobRepo,
         csid: ChangesetId,
-    ) -> BoxFuture<String, Error>;
+    ) -> BoxFuture<'static, Result<String, Error>>;

     fn backfill_batch_dangerous(
         &self,
         ctx: CoreContext,
         repo: BlobRepo,
         csids: Vec<ChangesetId>,
-    ) -> BoxFuture<(), Error>;
+    ) -> BoxFuture<'static, Result<(), Error>>;

     /// Find pending changeset (changesets for which data have not been derived)
-    fn pending(
+    async fn pending(
         &self,
         ctx: CoreContext,
         repo: BlobRepo,
         csids: Vec<ChangesetId>,
-    ) -> BoxFuture<Vec<ChangesetId>, Error>;
+    ) -> Result<Vec<ChangesetId>, Error>;

     /// Regenerate derived data for specified set of commits
     fn regenerate(&self, csids: &Vec<ChangesetId>);
@@ -192,15 +189,19 @@ where
         ctx: CoreContext,
         repo: BlobRepo,
         csid: ChangesetId,
-    ) -> BoxFuture<String, Error> {
+    ) -> BoxFuture<'static, Result<String, Error>> {
         // We call derive_impl directly so that we can pass
         // `self.mapping` there. This will allow us to
         // e.g. regenerate derived data for the commit
         // even if it was already generated (see RegenerateMapping call).
-        derive_impl::<M::Value, _>(ctx, repo, self.mapping.clone(), csid, self.mode)
-            .map(|result| format!("{:?}", result))
-            .from_err()
-            .boxify()
+        cloned!(self.mapping, self.mode);
+        async move {
+            let result = derive_impl::<M::Value, _>(ctx, repo, mapping, csid, mode)
+                .compat()
+                .await?;
+            Ok(format!("{:?}", result))
+        }
+        .boxed()
     }

     /// !!!!This function is dangerous and should be used with care!!!!
@@ -219,7 +220,7 @@ where
         ctx: CoreContext,
         repo: BlobRepo,
         csids: Vec<ChangesetId>,
-    ) -> BoxFuture<(), Error> {
+    ) -> BoxFuture<'static, Result<(), Error>> {
         let orig_mapping = self.mapping.clone();
         // With InMemoryMapping we can ensure that mapping entries are written only after
         // all corresponding blobs were successfully saved
@@ -237,52 +238,51 @@
         });
         let memblobstore = memblobstore.expect("memblobstore should have been updated");

-        stream_old::iter_ok(csids)
-            .for_each({
-                cloned!(ctx, in_memory_mapping, repo);
-                move |csid| {
-                    // create new context so each derivation would have its own trace
-                    let ctx = CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
-                    derive_impl::<M::Value, _>(
-                        ctx.clone(),
-                        repo.clone(),
-                        in_memory_mapping.clone(),
-                        csid,
-                        DeriveMode::Unsafe,
-                    )
-                    .map(|_| ())
-                    .from_err()
-                }
-            })
-            .and_then({
-                cloned!(ctx, memblobstore);
-                move |_| memblobstore.persist(ctx)
-            })
-            .and_then(move |_| {
+        async move {
+            for csid in csids {
+                // create new context so each derivation would have its own trace
+                let ctx = CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
+                derive_impl::<M::Value, _>(
+                    ctx.clone(),
+                    repo.clone(),
+                    in_memory_mapping.clone(),
+                    csid,
+                    DeriveMode::Unsafe,
+                )
+                .compat()
+                .await?;
+            }
+            // flush blobstore
+            memblobstore.persist(ctx.clone()).compat().await?;
             // flush mapping
+            let futs = FuturesUnordered::new();
+            {
                 let buffer = in_memory_mapping.into_buffer();
                 let buffer = buffer.lock().unwrap();
-                let mut futs = vec![];
                 for (cs_id, value) in buffer.iter() {
-                    futs.push(orig_mapping.put(ctx.clone(), *cs_id, value.clone()));
+                    futs.push(
+                        orig_mapping
+                            .put(ctx.clone(), *cs_id, value.clone())
+                            .compat(),
+                    );
                 }
-                stream_old::futures_unordered(futs).for_each(|_| Ok(()))
-            })
-            .boxify()
+            }
+            futs.try_for_each(|_| future::ok(())).await?;
+            Ok(())
+        }
+        .boxed()
     }

-    fn pending(
+    async fn pending(
         &self,
         ctx: CoreContext,
         _repo: BlobRepo,
         mut csids: Vec<ChangesetId>,
-    ) -> BoxFuture<Vec<ChangesetId>, Error> {
-        self.mapping
-            .get(ctx, csids.clone())
-            .map(move |derived| {
-                csids.retain(|csid| !derived.contains_key(&csid));
-                csids
-            })
-            .boxify()
+    ) -> Result<Vec<ChangesetId>, Error> {
+        let derived = self.mapping.get(ctx, csids.clone()).compat().await?;
+        csids.retain(|csid| !derived.contains_key(&csid));
+        Ok(csids)
     }

     async fn find_oldest_underived<'a>(
@@ -369,7 +369,7 @@ where
         &self,
         ctx: CoreContext,
         mut csids: Vec<ChangesetId>,
-    ) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
+    ) -> BoxFutureOld<HashMap<ChangesetId, Self::Value>, Error> {
         let buffer = self.buffer.lock().unwrap();
         let mut ans = HashMap::new();
         csids.retain(|cs_id| {
@@ -387,7 +387,12 @@ where
             .boxify()
     }

-    fn put(&self, _ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
+    fn put(
+        &self,
+        _ctx: CoreContext,
+        csid: ChangesetId,
+        id: Self::Value,
+    ) -> BoxFutureOld<(), Error> {
         let mut buffer = self.buffer.lock().unwrap();
         buffer.insert(csid, id);
         future_old::ok(()).boxify()
@@ -498,7 +503,6 @@ impl DeriveGraph {
             if let Some(deriver) = &node.deriver {
                 deriver
                     .backfill_batch_dangerous(ctx.clone(), repo, node.csids.clone())
-                    .compat()
                     .await?;
                 if let (Some(first), Some(last)) = (node.csids.first(), node.csids.last()) {
                     slog::info!(
@@ -785,7 +789,6 @@ pub fn find_underived_many(
     let derivers = derivers.iter().map(|deriver| async move {
         if deriver
             .pending(ctx.clone(), repo.clone(), vec![csid])
-            .compat()
             .await?
             .is_empty()
         {
@@ -994,7 +997,7 @@ mod tests {
             ctx: CoreContext,
             repo: BlobRepo,
             csid: ChangesetId,
-        ) -> BoxFuture<String, Error> {
+        ) -> BoxFuture<'static, Result<String, Error>> {
             self.deriver.derive(ctx, repo, csid)
         }
@@ -1003,18 +1006,18 @@ mod tests {
             ctx: CoreContext,
             repo: BlobRepo,
             csids: Vec<ChangesetId>,
-        ) -> BoxFuture<(), Error> {
+        ) -> BoxFuture<'static, Result<(), Error>> {
             self.deriver.backfill_batch_dangerous(ctx, repo, csids)
         }

-        fn pending(
+        async fn pending(
             &self,
             ctx: CoreContext,
             repo: BlobRepo,
             csids: Vec<ChangesetId>,
-        ) -> BoxFuture<Vec<ChangesetId>, Error> {
+        ) -> Result<Vec<ChangesetId>, Error> {
             self.count.fetch_add(1, Ordering::SeqCst);
-            self.deriver.pending(ctx, repo, csids)
+            self.deriver.pending(ctx, repo, csids).await
         }

         fn regenerate(&self, _csids: &Vec<ChangesetId>) {
@@ -1075,14 +1078,8 @@ mod tests {
             }
         );

-        unodes_deriver
-            .derive(ctx.clone(), repo.clone(), b)
-            .compat()
-            .await?;
-        blame_deriver
-            .derive(ctx.clone(), repo.clone(), a)
-            .compat()
-            .await?;
+        unodes_deriver.derive(ctx.clone(), repo.clone(), b).await?;
+        blame_deriver.derive(ctx.clone(), repo.clone(), a).await?;

         let entries: BTreeMap<_, _> = find_underived_many(
             ctx.clone(),
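
One detail of this file's trait change is implicit in the hunks: DerivedUtils now
declares `async fn pending(...)` directly in the trait, which stable Rust of this era
cannot express natively, so the trait presumably relies on the async_trait attribute
macro (an assumption; the attribute itself sits outside the hunks shown), which
rewrites each async method into one returning a boxed future. A minimal sketch of that
shape, with placeholder trait and types:

    use anyhow::Error;
    use async_trait::async_trait;

    #[async_trait]
    trait Pending {
        /// Return the subset of ids that still need deriving.
        async fn pending(&self, ids: Vec<u64>) -> Result<Vec<u64>, Error>;
    }

    struct NothingDerived;

    #[async_trait]
    impl Pending for NothingDerived {
        async fn pending(&self, ids: Vec<u64>) -> Result<Vec<u64>, Error> {
            Ok(ids) // nothing derived yet, so every id is still pending
        }
    }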

View File

@@ -255,7 +255,6 @@ async fn derive_bonsais_single_repo(
     for csid in bcs_ids {
         derived_util
             .derive(ctx.clone(), repo.clone(), csid.clone())
-            .compat()
             .map_ok(|_| ())
             .await?;
     }
@@ -748,7 +747,8 @@
                 "Bookmark {:?} unexpectedly dropped in {:?} when trying to generate large_dest_bookmark",
                 dest_bookmark,
                 commit_syncer
-            )})?;
+            )
+        })?;

     info!(
         ctx.logger(),
         "Set large repo's destination bookmark to {}", large_dest_bookmark

View File

@@ -731,7 +731,6 @@ mod tests {
         let derived_utils = derived_data_utils(repo.clone(), derived_data_type)?;
         let pending = derived_utils
             .pending(ctx.clone(), repo.clone(), cs_ids.to_vec())
-            .compat()
             .await?;
         assert!(pending.is_empty());
     }