From 6b8a8324335e113b88ce68b379d4f980e97d1176 Mon Sep 17 00:00:00 2001 From: Mark Juggurnauth-Thomas Date: Tue, 3 Nov 2020 09:13:45 -0800 Subject: [PATCH] derived_data: change BonsaiDerivedMapping to use new futures Summary: This changes the methods from ones that return old `BoxFuture`s to an async method using `async_trait`. Reviewed By: krallin Differential Revision: D24689506 fbshipit-source-id: 7b13010924369f81681e6590898af703c5423385 --- eden/mononoke/derived_data/blame/derived.rs | 20 ++-- .../derived_data/changeset_info/Cargo.toml | 1 - .../derived_data/changeset_info/derive.rs | 22 ++-- .../deleted_files_manifest/mapping.rs | 15 +-- eden/mononoke/derived_data/fastlog/mapping.rs | 21 ++-- .../derived_data/filenodes/Cargo.toml | 1 - eden/mononoke/derived_data/filenodes/lib.rs | 105 +++++++++--------- eden/mononoke/derived_data/fsnodes/mapping.rs | 16 +-- .../mercurial_derived_data/mapping.rs | 15 ++- eden/mononoke/derived_data/src/derive_impl.rs | 32 +++--- eden/mononoke/derived_data/src/lib.rs | 29 ++--- eden/mononoke/derived_data/unodes/mapping.rs | 15 +-- eden/mononoke/derived_data/utils/Cargo.toml | 1 - eden/mononoke/derived_data/utils/lib.rs | 47 ++++---- .../mononoke/git/git_types/src/derive_tree.rs | 20 ++-- 15 files changed, 190 insertions(+), 170 deletions(-) diff --git a/eden/mononoke/derived_data/blame/derived.rs b/eden/mononoke/derived_data/blame/derived.rs index e13e72d7b4..c15984c6af 100644 --- a/eden/mononoke/derived_data/blame/derived.rs +++ b/eden/mononoke/derived_data/blame/derived.rs @@ -19,7 +19,7 @@ use futures::{ future::{FutureExt, TryFutureExt}, StreamExt, TryStreamExt, }; -use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt, StreamExt as _}; +use futures_ext::{spawn_future, StreamExt as _}; use futures_old::{future, stream, Future, IntoFuture, Stream}; use manifest::find_intersection_of_diffs; use mononoke_types::{ @@ -114,14 +114,15 @@ impl BlameRootMapping { } } +#[async_trait] impl BonsaiDerivedMapping for BlameRootMapping { type Value = BlameRoot; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let futs = csids.into_iter().map(|csid| { self.blobstore .get(ctx.clone(), self.format_key(&csid)) @@ -131,18 +132,23 @@ impl BonsaiDerivedMapping for BlameRootMapping { stream::FuturesUnordered::from_iter(futs) .filter_map(|v| v) .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> { + async fn put( + &self, + ctx: CoreContext, + csid: ChangesetId, + _id: Self::Value, + ) -> Result<(), Error> { self.blobstore .put( ctx, self.format_key(&csid), BlobstoreBytes::from_bytes(Bytes::new()), ) - .compat() - .boxify() + .await } } diff --git a/eden/mononoke/derived_data/changeset_info/Cargo.toml b/eden/mononoke/derived_data/changeset_info/Cargo.toml index 1dcd04200d..3f4f4bcc22 100644 --- a/eden/mononoke/derived_data/changeset_info/Cargo.toml +++ b/eden/mononoke/derived_data/changeset_info/Cargo.toml @@ -17,7 +17,6 @@ derived_data = { path = ".." } derived_data-thrift = { path = "if" } mononoke_types = { path = "../../mononoke_types" } fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" } -futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } anyhow = "1.0" async-trait = "0.1.29" futures = { version = "0.3.5", features = ["async-await", "compat"] } diff --git a/eden/mononoke/derived_data/changeset_info/derive.rs b/eden/mononoke/derived_data/changeset_info/derive.rs index 77b71d23a9..66c834694e 100644 --- a/eden/mononoke/derived_data/changeset_info/derive.rs +++ b/eden/mononoke/derived_data/changeset_info/derive.rs @@ -16,8 +16,8 @@ use blobstore::Blobstore; use context::CoreContext; use derived_data::{BonsaiDerived, BonsaiDerivedMapping}; use fbthrift::compact_protocol; +use futures::compat::Future01CompatExt; use futures::future::TryFutureExt; -use futures_ext::{BoxFuture, FutureExt}; use futures_old::{stream::FuturesUnordered, Future, Stream}; use mononoke_types::{BlobstoreBytes, BonsaiChangeset, ChangesetId}; @@ -58,14 +58,15 @@ impl ChangesetInfoMapping { } } +#[async_trait] impl BonsaiDerivedMapping for ChangesetInfoMapping { type Value = ChangesetInfo; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let futs = csids.into_iter().map(|csid| { self.blobstore .get(ctx.clone(), self.format_key(&csid)) @@ -81,18 +82,21 @@ impl BonsaiDerivedMapping for ChangesetInfoMapping { .filter_map(|maybe_info| maybe_info) .collect() .and_then(move |infos| infos.into_iter().collect::, Error>>()) - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, info: Self::Value) -> BoxFuture<(), Error> { + async fn put( + &self, + ctx: CoreContext, + csid: ChangesetId, + info: Self::Value, + ) -> Result<(), Error> { let data = { let data = compact_protocol::serialize(&info.into_thrift()); BlobstoreBytes::from_bytes(data) }; - self.blobstore - .put(ctx, self.format_key(&csid), data) - .compat() - .boxify() + self.blobstore.put(ctx, self.format_key(&csid), data).await } } diff --git a/eden/mononoke/derived_data/deleted_files_manifest/mapping.rs b/eden/mononoke/derived_data/deleted_files_manifest/mapping.rs index 78e5630e3b..9eed4871e1 100644 --- a/eden/mononoke/derived_data/deleted_files_manifest/mapping.rs +++ b/eden/mononoke/derived_data/deleted_files_manifest/mapping.rs @@ -15,7 +15,7 @@ use context::CoreContext; use derived_data::{BonsaiDerived, BonsaiDerivedMapping}; use futures::compat::Future01CompatExt; use futures::future::TryFutureExt; -use futures_ext::{BoxFuture, FutureExt, StreamExt}; +use futures_ext::StreamExt; use futures_old::{ stream::{self, FuturesUnordered}, Future, Stream, @@ -117,14 +117,15 @@ impl RootDeletedManifestMapping { } } +#[async_trait] impl BonsaiDerivedMapping for RootDeletedManifestMapping { type Value = RootDeletedManifestId; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let gets = csids.into_iter().map(|cs_id| { self.fetch_deleted_manifest(ctx.clone(), cs_id) .map(|maybe_root_mf_id| stream::iter_ok(maybe_root_mf_id.into_iter())) @@ -132,13 +133,13 @@ impl BonsaiDerivedMapping for RootDeletedManifestMapping { FuturesUnordered::from_iter(gets) .flatten() .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> { + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { self.blobstore .put(ctx, self.format_key(csid), id.into()) - .compat() - .boxify() + .await } } diff --git a/eden/mononoke/derived_data/fastlog/mapping.rs b/eden/mononoke/derived_data/fastlog/mapping.rs index 365cab0593..037cc2e5a2 100644 --- a/eden/mononoke/derived_data/fastlog/mapping.rs +++ b/eden/mononoke/derived_data/fastlog/mapping.rs @@ -16,7 +16,7 @@ use derived_data::{BonsaiDerived, BonsaiDerivedMapping}; use futures::compat::Future01CompatExt; use futures::future::TryFutureExt; use futures::stream::TryStreamExt; -use futures_ext::{BoxFuture, FutureExt, StreamExt}; +use futures_ext::StreamExt; use futures_old::{future, stream::FuturesUnordered, Future, Stream}; use manifest::{find_intersection_of_diffs, Entry}; use mononoke_types::{BonsaiChangeset, ChangesetId, FileUnodeId, ManifestUnodeId}; @@ -151,14 +151,15 @@ impl RootFastlogMapping { } } +#[async_trait] impl BonsaiDerivedMapping for RootFastlogMapping { type Value = RootFastlog; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let gets = csids.into_iter().map(|cs_id| { self.blobstore .get(ctx.clone(), self.format_key(&cs_id)) @@ -168,10 +169,16 @@ impl BonsaiDerivedMapping for RootFastlogMapping { FuturesUnordered::from_iter(gets) .filter_map(|x| x) // Remove None .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> { + async fn put( + &self, + ctx: CoreContext, + csid: ChangesetId, + _id: Self::Value, + ) -> Result<(), Error> { self.blobstore .put( ctx, @@ -179,8 +186,7 @@ impl BonsaiDerivedMapping for RootFastlogMapping { // Value doesn't matter here, so just put empty Value BlobstoreBytes::from_bytes(Bytes::new()), ) - .compat() - .boxify() + .await } } @@ -200,6 +206,7 @@ mod tests { unshared_merge_even, unshared_merge_uneven, }; use futures::StreamExt; + use futures_ext::{BoxFuture, FutureExt}; use manifest::ManifestOps; use maplit::btreemap; use mercurial_types::HgChangesetId; diff --git a/eden/mononoke/derived_data/filenodes/Cargo.toml b/eden/mononoke/derived_data/filenodes/Cargo.toml index fde79403e9..53da977107 100644 --- a/eden/mononoke/derived_data/filenodes/Cargo.toml +++ b/eden/mononoke/derived_data/filenodes/Cargo.toml @@ -19,7 +19,6 @@ filenodes = { path = "../../filenodes" } manifest = { path = "../../manifest" } mercurial_types = { path = "../../mercurial/types" } mononoke_types = { path = "../../mononoke_types" } -cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } anyhow = "1.0" async-trait = "0.1.29" diff --git a/eden/mononoke/derived_data/filenodes/lib.rs b/eden/mononoke/derived_data/filenodes/lib.rs index 88d1ffca9e..2b1cb69fce 100644 --- a/eden/mononoke/derived_data/filenodes/lib.rs +++ b/eden/mononoke/derived_data/filenodes/lib.rs @@ -12,16 +12,14 @@ use async_trait::async_trait; use blobrepo::BlobRepo; use blobrepo_hg::BlobRepoHg; use blobstore::Loadable; -use cloned::cloned; use context::CoreContext; use derived_data::{BonsaiDerived, BonsaiDerivedMapping}; use filenodes::{FilenodeInfo, FilenodeResult, PreparedFilenode}; use futures::{ - compat::Future01CompatExt, future::try_join_all, stream, FutureExt, StreamExt, TryFutureExt, - TryStreamExt, + compat::Future01CompatExt, future::try_join_all, stream, StreamExt, TryFutureExt, TryStreamExt, }; -use futures_ext::{BoxFuture, FutureExt as OldFutureExt}; -use futures_old::{future as old_future, Future}; +use futures_ext::FutureExt as OldFutureExt; +use futures_old::Future; use futures_util::try_join; use itertools::{Either, Itertools}; use manifest::{find_intersection_of_diffs_and_parents, Entry}; @@ -312,50 +310,50 @@ impl FilenodesOnlyPublicMapping { } } +#[async_trait] impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping { type Value = FilenodesOnlyPublic; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { - cloned!(self.repo); - async move { - stream::iter(csids.into_iter()) - .map({ - let repo = &repo; - let ctx = &ctx; - move |cs_id| async move { - let filenode_res = fetch_root_filenode(&ctx, cs_id, &repo).await?; - let maybe_root_filenode = match filenode_res { - FilenodeResult::Present(maybe_root_filenode) => maybe_root_filenode, - FilenodeResult::Disabled => { - return Ok(Some((cs_id, FilenodesOnlyPublic::Disabled))); - } - }; + ) -> Result, Error> { + stream::iter(csids.into_iter()) + .map({ + let repo = &self.repo; + let ctx = &ctx; + move |cs_id| async move { + let filenode_res = fetch_root_filenode(&ctx, cs_id, &repo).await?; + let maybe_root_filenode = match filenode_res { + FilenodeResult::Present(maybe_root_filenode) => maybe_root_filenode, + FilenodeResult::Disabled => { + return Ok(Some((cs_id, FilenodesOnlyPublic::Disabled))); + } + }; - Ok(maybe_root_filenode.map(move |filenode| { - ( - cs_id, - FilenodesOnlyPublic::Present { - root_filenode: Some(filenode), - }, - ) - })) - } - }) - .buffer_unordered(100) - .try_filter_map(|x| async { Ok(x) }) - .try_collect() - .await - } - .boxed() - .compat() - .boxify() + Ok(maybe_root_filenode.map(move |filenode| { + ( + cs_id, + FilenodesOnlyPublic::Present { + root_filenode: Some(filenode), + }, + ) + })) + } + }) + .buffer_unordered(100) + .try_filter_map(|x| async { Ok(x) }) + .try_collect() + .await } - fn put(&self, ctx: CoreContext, _csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> { + async fn put( + &self, + ctx: CoreContext, + _csid: ChangesetId, + id: Self::Value, + ) -> Result<(), Error> { let filenodes = self.repo.get_filenodes(); let repo_id = self.repo.get_repoid(); @@ -365,16 +363,19 @@ impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping { }; match root_filenode { - Some(root_filenode) => filenodes - .add_filenodes(ctx.clone(), vec![root_filenode.into()], repo_id) - .map(|res| match res { - // If filenodes are disabled then just return success - // but use explicit match here in case we add more variants - // to FilenodeResult enum - FilenodeResult::Present(()) | FilenodeResult::Disabled => {} - }) - .boxify(), - None => old_future::ok(()).boxify(), + Some(root_filenode) => { + filenodes + .add_filenodes(ctx.clone(), vec![root_filenode.into()], repo_id) + .map(|res| match res { + // If filenodes are disabled then just return success + // but use explicit match here in case we add more variants + // to FilenodeResult enum + FilenodeResult::Present(()) | FilenodeResult::Disabled => {} + }) + .compat() + .await + } + None => Ok(()), } } } @@ -629,7 +630,6 @@ mod tests { // Make sure they are in the mapping let maps = FilenodesOnlyPublic::mapping(&ctx, &repo) .get(ctx.clone(), vec![parent_empty, child_empty]) - .compat() .await?; assert_eq!(maps.len(), 2); @@ -659,7 +659,6 @@ mod tests { // Make sure they are in the mapping let maps = mapping .get(ctx.clone(), vec![child_empty, parent_empty]) - .compat() .await?; assert_eq!(maps.len(), 2); Ok(()) @@ -692,7 +691,7 @@ mod tests { assert_eq!(derived, FilenodesOnlyPublic::Disabled); let mapping = FilenodesOnlyPublic::mapping(&ctx, &repo); - let res = mapping.get(ctx.clone(), vec![cs]).compat().await?; + let res = mapping.get(ctx.clone(), vec![cs]).await?; assert_eq!(res.get(&cs).unwrap(), &FilenodesOnlyPublic::Disabled); diff --git a/eden/mononoke/derived_data/fsnodes/mapping.rs b/eden/mononoke/derived_data/fsnodes/mapping.rs index 4569ba851e..2582c3456d 100644 --- a/eden/mononoke/derived_data/fsnodes/mapping.rs +++ b/eden/mononoke/derived_data/fsnodes/mapping.rs @@ -18,7 +18,7 @@ use futures::{ compat::Future01CompatExt, stream as new_stream, StreamExt as NewStreamExt, TryFutureExt, TryStreamExt, }; -use futures_ext::{BoxFuture, FutureExt, StreamExt}; +use futures_ext::StreamExt; use futures_old::{ stream::{self, FuturesUnordered}, Future, Stream, @@ -116,7 +116,6 @@ impl BonsaiDerived for RootFsnodeId { let derived = RootFsnodeId(derived); mapping .put(ctx.clone(), cs_id.clone(), derived.clone()) - .compat() .await?; Ok((cs_id, derived)) } @@ -157,14 +156,15 @@ impl RootFsnodeMapping { } } +#[async_trait] impl BonsaiDerivedMapping for RootFsnodeMapping { type Value = RootFsnodeId; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let gets = csids.into_iter().map(|cs_id| { self.fetch_fsnode(ctx.clone(), cs_id) .map(|maybe_root_fsnode_id| stream::iter_ok(maybe_root_fsnode_id.into_iter())) @@ -172,14 +172,14 @@ impl BonsaiDerivedMapping for RootFsnodeMapping { FuturesUnordered::from_iter(gets) .flatten() .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> { + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { self.blobstore .put(ctx, self.format_key(csid), id.into()) - .compat() - .boxify() + .await } } diff --git a/eden/mononoke/derived_data/mercurial_derived_data/mapping.rs b/eden/mononoke/derived_data/mercurial_derived_data/mapping.rs index e521f9ea94..27a611d078 100644 --- a/eden/mononoke/derived_data/mercurial_derived_data/mapping.rs +++ b/eden/mononoke/derived_data/mercurial_derived_data/mapping.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use blobrepo::BlobRepo; use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry}; use context::CoreContext; -use futures_ext::{BoxFuture, FutureExt as _}; +use futures::compat::Future01CompatExt; use futures_old::Future; use mercurial_types::HgChangesetId; use mononoke_types::{BonsaiChangeset, ChangesetId, RepositoryId}; @@ -56,14 +56,15 @@ impl HgChangesetIdMapping { } } +#[async_trait] impl BonsaiDerivedMapping for HgChangesetIdMapping { type Value = MappedHgChangesetId; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { self.mapping .get(ctx, self.repo_id, csids.into()) .map(|v| { @@ -71,10 +72,11 @@ impl BonsaiDerivedMapping for HgChangesetIdMapping { .map(|entry| (entry.bcs_id, MappedHgChangesetId(entry.hg_cs_id))) .collect() }) - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> { + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { self.mapping .add( ctx, @@ -85,6 +87,7 @@ impl BonsaiDerivedMapping for HgChangesetIdMapping { }, ) .map(|_| ()) - .boxify() + .compat() + .await } } diff --git a/eden/mononoke/derived_data/src/derive_impl.rs b/eden/mononoke/derived_data/src/derive_impl.rs index 2edd89c1ca..7f7630dadb 100644 --- a/eden/mononoke/derived_data/src/derive_impl.rs +++ b/eden/mononoke/derived_data/src/derive_impl.rs @@ -342,7 +342,7 @@ where Err(_) => (false, true), }; - let mut vs = mapping.get(ctx.clone(), vec![bcs_id]).compat().await?; + let mut vs = mapping.get(ctx.clone(), vec![bcs_id]).await?; let derived = vs.remove(&bcs_id); match derived { @@ -373,7 +373,7 @@ where ) .compat() .await?; - mapping.put(ctx.clone(), bcs_id, derived).compat().await?; + mapping.put(ctx.clone(), bcs_id, derived).await?; let res: Result<_, Error> = Ok(()); res }; @@ -543,10 +543,7 @@ impl DeriveNode { { // TODO: do not create intermediate hashmap, since this methods is going to be called // most often, to get derived value - let csids_to_id = derived_mapping - .get(ctx.clone(), vec![*csid]) - .compat() - .await?; + let csids_to_id = derived_mapping.get(ctx.clone(), vec![*csid]).await?; match csids_to_id.get(csid) { Some(id) => Ok(DeriveNode::Derived(id.clone())), None => Ok(DeriveNode::Bonsai(*csid)), @@ -641,14 +638,15 @@ mod test { } } + #[async_trait] impl BonsaiDerivedMapping for TestMapping { type Value = TestGenNum; - fn get( + async fn get( &self, _ctx: CoreContext, csids: Vec, - ) -> BoxFuture01, Error> { + ) -> Result, Error> { let mut res = hashmap! {}; { let mapping = self.mapping.lock().unwrap(); @@ -659,20 +657,20 @@ mod test { } } - future01::ok(res).boxify() + Ok(res) } - fn put( + async fn put( &self, _ctx: CoreContext, csid: ChangesetId, id: Self::Value, - ) -> BoxFuture01<(), Error> { + ) -> Result<(), Error> { { let mut mapping = self.mapping.lock().unwrap(); mapping.insert(csid, id); } - future01::ok(()).boxify() + Ok(()) } } @@ -696,7 +694,7 @@ mod test { ) .unwrap(); - let mapping = TestGenNum::mapping(&ctx, &repo); + let mapping = &TestGenNum::mapping(&ctx, &repo); let actual = runtime .block_on(TestGenNum::derive(ctx.clone(), repo.clone(), bcs_id)) .unwrap(); @@ -712,7 +710,7 @@ mod test { ) .and_then(move |new_bcs_id| { let parents = changeset_fetcher.get_parents(ctx.clone(), new_bcs_id.clone()); - let mapping = mapping.get(ctx.clone(), vec![new_bcs_id]); + let mapping = mapping.get(ctx.clone(), vec![new_bcs_id]).boxed().compat(); parents.join(mapping).map(move |(parents, mapping)| { let gen_num = mapping.get(&new_bcs_id).unwrap(); @@ -935,7 +933,7 @@ mod test { // schedule derivation runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?; assert_eq!( - runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + runtime.block_on_std(mapping.get(ctx.clone(), vec![csid]))?, HashMap::new() ); @@ -950,7 +948,7 @@ mod test { None => panic!("scheduled derivation should have been completed"), }; assert_eq!( - runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + runtime.block_on_std(mapping.get(ctx.clone(), vec![csid]))?, hashmap! { csid => result.clone() } ); @@ -1022,7 +1020,7 @@ mod test { // should succeed even though lease always fails let result = runtime.block_on(TestGenNum::derive(ctx.clone(), repo.clone(), csid))?; assert_eq!( - runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + runtime.block_on_std(mapping.get(ctx, vec![csid]))?, hashmap! { csid => result }, ); diff --git a/eden/mononoke/derived_data/src/lib.rs b/eden/mononoke/derived_data/src/lib.rs index 33bfe8dad8..9c772d3550 100644 --- a/eden/mononoke/derived_data/src/lib.rs +++ b/eden/mononoke/derived_data/src/lib.rs @@ -208,33 +208,35 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone { /// After derived data was generated then it will be stored in BonsaiDerivedMapping, which is /// normally a persistent store. This is used to avoid regenerating the same derived data over /// and over again. +#[async_trait] pub trait BonsaiDerivedMapping: Send + Sync + Clone { type Value: BonsaiDerived; /// Fetches mapping from bonsai changeset ids to generated value - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture01, Error>; + ) -> Result, Error>; /// Saves mapping between bonsai changeset and derived data id - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error>; + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error>; } +#[async_trait] impl BonsaiDerivedMapping for Arc { type Value = Mapping::Value; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture01, Error> { - (**self).get(ctx, csids) + ) -> Result, Error> { + (**self).get(ctx, csids).await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error> { - (**self).put(ctx, csid, id) + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { + (**self).put(ctx, csid, id).await } } @@ -260,24 +262,25 @@ impl RegenerateMapping { } } +#[async_trait] impl BonsaiDerivedMapping for RegenerateMapping where M: BonsaiDerivedMapping, { type Value = M::Value; - fn get( + async fn get( &self, ctx: CoreContext, mut csids: Vec, - ) -> BoxFuture01, Error> { + ) -> Result, Error> { self.regenerate .with(|regenerate| csids.retain(|id| !regenerate.contains(&id))); - self.base.get(ctx, csids) + self.base.get(ctx, csids).await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error> { + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { self.regenerate.with(|regenerate| regenerate.remove(&csid)); - self.base.put(ctx, csid, id) + self.base.put(ctx, csid, id).await } } diff --git a/eden/mononoke/derived_data/unodes/mapping.rs b/eden/mononoke/derived_data/unodes/mapping.rs index d039e62ea9..4dca9ac3d3 100644 --- a/eden/mononoke/derived_data/unodes/mapping.rs +++ b/eden/mononoke/derived_data/unodes/mapping.rs @@ -15,7 +15,7 @@ use context::CoreContext; use derived_data::{BonsaiDerived, BonsaiDerivedMapping}; use futures::compat::Future01CompatExt; use futures::future::TryFutureExt; -use futures_ext::{BoxFuture, FutureExt, StreamExt}; +use futures_ext::StreamExt; use futures_old::{ stream::{self, FuturesUnordered}, Future, Stream, @@ -132,14 +132,15 @@ impl RootUnodeManifestMapping { } } +#[async_trait] impl BonsaiDerivedMapping for RootUnodeManifestMapping { type Value = RootUnodeManifestId; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let gets = csids.into_iter().map(|cs_id| { self.fetch_unode(ctx.clone(), cs_id) .map(|maybe_root_mf_id| stream::iter_ok(maybe_root_mf_id.into_iter())) @@ -147,14 +148,14 @@ impl BonsaiDerivedMapping for RootUnodeManifestMapping { FuturesUnordered::from_iter(gets) .flatten() .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> { + async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> { self.blobstore .put(ctx, self.format_key(csid), id.into()) - .compat() - .boxify() + .await } } diff --git a/eden/mononoke/derived_data/utils/Cargo.toml b/eden/mononoke/derived_data/utils/Cargo.toml index fabb0cef8c..cac857cdd9 100644 --- a/eden/mononoke/derived_data/utils/Cargo.toml +++ b/eden/mononoke/derived_data/utils/Cargo.toml @@ -28,7 +28,6 @@ mononoke_types = { path = "../../mononoke_types" } topo_sort = { path = "../../common/topo_sort" } unodes = { path = "../unodes" } cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } -futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } lock_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } anyhow = "1.0" async-trait = "0.1.29" diff --git a/eden/mononoke/derived_data/utils/lib.rs b/eden/mononoke/derived_data/utils/lib.rs index 817ba42953..dd0cd71940 100644 --- a/eden/mononoke/derived_data/utils/lib.rs +++ b/eden/mononoke/derived_data/utils/lib.rs @@ -31,8 +31,6 @@ use futures::{ stream::{self, futures_unordered::FuturesUnordered}, Future, Stream, StreamExt, TryFutureExt, TryStreamExt, }; -use futures_ext::{BoxFuture as BoxFutureOld, FutureExt as OldFutureExt}; -use futures_old::{future as future_old, Future as OldFuture}; use lazy_static::lazy_static; use lock_ext::LockExt; use mercurial_derived_data::{HgChangesetIdMapping, MappedHgChangesetId}; @@ -259,11 +257,7 @@ where let buffer = in_memory_mapping.into_buffer(); let buffer = buffer.lock().unwrap(); for (cs_id, value) in buffer.iter() { - futs.push( - orig_mapping - .put(ctx.clone(), *cs_id, value.clone()) - .compat(), - ); + futs.push(orig_mapping.put(ctx.clone(), *cs_id, value.clone())); } } futs.try_for_each(|_| future::ok(())).await?; @@ -278,7 +272,7 @@ where _repo: BlobRepo, mut csids: Vec, ) -> Result, Error> { - let derived = self.mapping.get(ctx, csids.clone()).compat().await?; + let derived = self.mapping.get(ctx, csids.clone()).await?; csids.retain(|csid| !derived.contains_key(&csid)); Ok(csids) } @@ -356,6 +350,7 @@ where } } +#[async_trait] impl BonsaiDerivedMapping for InMemoryMapping where M: BonsaiDerivedMapping + Clone, @@ -363,37 +358,37 @@ where { type Value = M::Value; - fn get( + async fn get( &self, ctx: CoreContext, mut csids: Vec, - ) -> BoxFutureOld, Error> { - let buffer = self.buffer.lock().unwrap(); + ) -> Result, Error> { let mut ans = HashMap::new(); - csids.retain(|cs_id| { - if let Some(v) = buffer.get(cs_id) { - ans.insert(*cs_id, v.clone()); - false - } else { - true - } - }); + { + let buffer = self.buffer.lock().unwrap(); + csids.retain(|cs_id| { + if let Some(v) = buffer.get(cs_id) { + ans.insert(*cs_id, v.clone()); + false + } else { + true + } + }); + } - self.mapping - .get(ctx, csids) - .map(move |fetched| ans.into_iter().chain(fetched.into_iter()).collect()) - .boxify() + let fetched = self.mapping.get(ctx, csids).await?; + Ok(ans.into_iter().chain(fetched.into_iter()).collect()) } - fn put( + async fn put( &self, _ctx: CoreContext, csid: ChangesetId, id: Self::Value, - ) -> BoxFutureOld<(), Error> { + ) -> Result<(), Error> { let mut buffer = self.buffer.lock().unwrap(); buffer.insert(csid, id); - future_old::ok(()).boxify() + Ok(()) } } diff --git a/eden/mononoke/git/git_types/src/derive_tree.rs b/eden/mononoke/git/git_types/src/derive_tree.rs index e74e09e7be..a96f1ab686 100644 --- a/eden/mononoke/git/git_types/src/derive_tree.rs +++ b/eden/mononoke/git/git_types/src/derive_tree.rs @@ -11,7 +11,7 @@ use cloned::cloned; use context::CoreContext; use futures::compat::Future01CompatExt; use futures::future::{ready, FutureExt, TryFutureExt}; -use futures_ext::{BoxFuture, FutureExt as _, StreamExt}; +use futures_ext::{FutureExt as _, StreamExt}; use futures_old::{stream::futures_unordered, Future, IntoFuture, Stream}; use manifest::derive_manifest; use std::collections::HashMap; @@ -58,14 +58,15 @@ impl TreeMapping { } } +#[async_trait] impl BonsaiDerivedMapping for TreeMapping { type Value = TreeHandle; - fn get( + async fn get( &self, ctx: CoreContext, csids: Vec, - ) -> BoxFuture, Error> { + ) -> Result, Error> { let gets = csids .into_iter() .map(|cs_id| self.fetch_root(ctx.clone(), cs_id)); @@ -73,14 +74,19 @@ impl BonsaiDerivedMapping for TreeMapping { futures_unordered(gets) .filter_map(|maybe_handle| maybe_handle) .collect_to() - .boxify() + .compat() + .await } - fn put(&self, ctx: CoreContext, csid: ChangesetId, root: Self::Value) -> BoxFuture<(), Error> { + async fn put( + &self, + ctx: CoreContext, + csid: ChangesetId, + root: Self::Value, + ) -> Result<(), Error> { self.blobstore .put(ctx, self.root_key(csid), root.into()) - .compat() - .boxify() + .await } }