derived_data: change BonsaiDerivedMapping to use new futures

Summary:
This changes the methods from ones that return old `BoxFuture`s to an async method
using `async_trait`.

Reviewed By: krallin

Differential Revision: D24689506

fbshipit-source-id: 7b13010924369f81681e6590898af703c5423385
This commit is contained in:
Mark Juggurnauth-Thomas 2020-11-03 09:13:45 -08:00 committed by Facebook GitHub Bot
parent 6a12bcc562
commit 6b8a832433
15 changed files with 190 additions and 170 deletions

View File

@ -19,7 +19,7 @@ use futures::{
future::{FutureExt, TryFutureExt},
StreamExt, TryStreamExt,
};
use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt, StreamExt as _};
use futures_ext::{spawn_future, StreamExt as _};
use futures_old::{future, stream, Future, IntoFuture, Stream};
use manifest::find_intersection_of_diffs;
use mononoke_types::{
@ -114,14 +114,15 @@ impl BlameRootMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for BlameRootMapping {
type Value = BlameRoot;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let futs = csids.into_iter().map(|csid| {
self.blobstore
.get(ctx.clone(), self.format_key(&csid))
@ -131,18 +132,23 @@ impl BonsaiDerivedMapping for BlameRootMapping {
stream::FuturesUnordered::from_iter(futs)
.filter_map(|v| v)
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> {
async fn put(
&self,
ctx: CoreContext,
csid: ChangesetId,
_id: Self::Value,
) -> Result<(), Error> {
self.blobstore
.put(
ctx,
self.format_key(&csid),
BlobstoreBytes::from_bytes(Bytes::new()),
)
.compat()
.boxify()
.await
}
}

View File

@ -17,7 +17,6 @@ derived_data = { path = ".." }
derived_data-thrift = { path = "if" }
mononoke_types = { path = "../../mononoke_types" }
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
futures = { version = "0.3.5", features = ["async-await", "compat"] }

View File

@ -16,8 +16,8 @@ use blobstore::Blobstore;
use context::CoreContext;
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use fbthrift::compact_protocol;
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;
use futures_ext::{BoxFuture, FutureExt};
use futures_old::{stream::FuturesUnordered, Future, Stream};
use mononoke_types::{BlobstoreBytes, BonsaiChangeset, ChangesetId};
@ -58,14 +58,15 @@ impl ChangesetInfoMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for ChangesetInfoMapping {
type Value = ChangesetInfo;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let futs = csids.into_iter().map(|csid| {
self.blobstore
.get(ctx.clone(), self.format_key(&csid))
@ -81,18 +82,21 @@ impl BonsaiDerivedMapping for ChangesetInfoMapping {
.filter_map(|maybe_info| maybe_info)
.collect()
.and_then(move |infos| infos.into_iter().collect::<Result<HashMap<_, _>, Error>>())
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, info: Self::Value) -> BoxFuture<(), Error> {
async fn put(
&self,
ctx: CoreContext,
csid: ChangesetId,
info: Self::Value,
) -> Result<(), Error> {
let data = {
let data = compact_protocol::serialize(&info.into_thrift());
BlobstoreBytes::from_bytes(data)
};
self.blobstore
.put(ctx, self.format_key(&csid), data)
.compat()
.boxify()
self.blobstore.put(ctx, self.format_key(&csid), data).await
}
}

View File

@ -15,7 +15,7 @@ use context::CoreContext;
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use futures_ext::StreamExt;
use futures_old::{
stream::{self, FuturesUnordered},
Future, Stream,
@ -117,14 +117,15 @@ impl RootDeletedManifestMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for RootDeletedManifestMapping {
type Value = RootDeletedManifestId;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let gets = csids.into_iter().map(|cs_id| {
self.fetch_deleted_manifest(ctx.clone(), cs_id)
.map(|maybe_root_mf_id| stream::iter_ok(maybe_root_mf_id.into_iter()))
@ -132,13 +133,13 @@ impl BonsaiDerivedMapping for RootDeletedManifestMapping {
FuturesUnordered::from_iter(gets)
.flatten()
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
self.blobstore
.put(ctx, self.format_key(csid), id.into())
.compat()
.boxify()
.await
}
}

View File

@ -16,7 +16,7 @@ use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;
use futures::stream::TryStreamExt;
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use futures_ext::StreamExt;
use futures_old::{future, stream::FuturesUnordered, Future, Stream};
use manifest::{find_intersection_of_diffs, Entry};
use mononoke_types::{BonsaiChangeset, ChangesetId, FileUnodeId, ManifestUnodeId};
@ -151,14 +151,15 @@ impl RootFastlogMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for RootFastlogMapping {
type Value = RootFastlog;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let gets = csids.into_iter().map(|cs_id| {
self.blobstore
.get(ctx.clone(), self.format_key(&cs_id))
@ -168,10 +169,16 @@ impl BonsaiDerivedMapping for RootFastlogMapping {
FuturesUnordered::from_iter(gets)
.filter_map(|x| x) // Remove None
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> {
async fn put(
&self,
ctx: CoreContext,
csid: ChangesetId,
_id: Self::Value,
) -> Result<(), Error> {
self.blobstore
.put(
ctx,
@ -179,8 +186,7 @@ impl BonsaiDerivedMapping for RootFastlogMapping {
// Value doesn't matter here, so just put empty Value
BlobstoreBytes::from_bytes(Bytes::new()),
)
.compat()
.boxify()
.await
}
}
@ -200,6 +206,7 @@ mod tests {
unshared_merge_even, unshared_merge_uneven,
};
use futures::StreamExt;
use futures_ext::{BoxFuture, FutureExt};
use manifest::ManifestOps;
use maplit::btreemap;
use mercurial_types::HgChangesetId;

View File

@ -19,7 +19,6 @@ filenodes = { path = "../../filenodes" }
manifest = { path = "../../manifest" }
mercurial_types = { path = "../../mercurial/types" }
mononoke_types = { path = "../../mononoke_types" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"

View File

@ -12,16 +12,14 @@ use async_trait::async_trait;
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::Loadable;
use cloned::cloned;
use context::CoreContext;
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use filenodes::{FilenodeInfo, FilenodeResult, PreparedFilenode};
use futures::{
compat::Future01CompatExt, future::try_join_all, stream, FutureExt, StreamExt, TryFutureExt,
TryStreamExt,
compat::Future01CompatExt, future::try_join_all, stream, StreamExt, TryFutureExt, TryStreamExt,
};
use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
use futures_old::{future as old_future, Future};
use futures_ext::FutureExt as OldFutureExt;
use futures_old::Future;
use futures_util::try_join;
use itertools::{Either, Itertools};
use manifest::{find_intersection_of_diffs_and_parents, Entry};
@ -312,19 +310,18 @@ impl FilenodesOnlyPublicMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping {
type Value = FilenodesOnlyPublic;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
cloned!(self.repo);
async move {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
stream::iter(csids.into_iter())
.map({
let repo = &repo;
let repo = &self.repo;
let ctx = &ctx;
move |cs_id| async move {
let filenode_res = fetch_root_filenode(&ctx, cs_id, &repo).await?;
@ -350,12 +347,13 @@ impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping {
.try_collect()
.await
}
.boxed()
.compat()
.boxify()
}
fn put(&self, ctx: CoreContext, _csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
async fn put(
&self,
ctx: CoreContext,
_csid: ChangesetId,
id: Self::Value,
) -> Result<(), Error> {
let filenodes = self.repo.get_filenodes();
let repo_id = self.repo.get_repoid();
@ -365,7 +363,8 @@ impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping {
};
match root_filenode {
Some(root_filenode) => filenodes
Some(root_filenode) => {
filenodes
.add_filenodes(ctx.clone(), vec![root_filenode.into()], repo_id)
.map(|res| match res {
// If filenodes are disabled then just return success
@ -373,8 +372,10 @@ impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping {
// to FilenodeResult enum
FilenodeResult::Present(()) | FilenodeResult::Disabled => {}
})
.boxify(),
None => old_future::ok(()).boxify(),
.compat()
.await
}
None => Ok(()),
}
}
}
@ -629,7 +630,6 @@ mod tests {
// Make sure they are in the mapping
let maps = FilenodesOnlyPublic::mapping(&ctx, &repo)
.get(ctx.clone(), vec![parent_empty, child_empty])
.compat()
.await?;
assert_eq!(maps.len(), 2);
@ -659,7 +659,6 @@ mod tests {
// Make sure they are in the mapping
let maps = mapping
.get(ctx.clone(), vec![child_empty, parent_empty])
.compat()
.await?;
assert_eq!(maps.len(), 2);
Ok(())
@ -692,7 +691,7 @@ mod tests {
assert_eq!(derived, FilenodesOnlyPublic::Disabled);
let mapping = FilenodesOnlyPublic::mapping(&ctx, &repo);
let res = mapping.get(ctx.clone(), vec![cs]).compat().await?;
let res = mapping.get(ctx.clone(), vec![cs]).await?;
assert_eq!(res.get(&cs).unwrap(), &FilenodesOnlyPublic::Disabled);

View File

@ -18,7 +18,7 @@ use futures::{
compat::Future01CompatExt, stream as new_stream, StreamExt as NewStreamExt, TryFutureExt,
TryStreamExt,
};
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use futures_ext::StreamExt;
use futures_old::{
stream::{self, FuturesUnordered},
Future, Stream,
@ -116,7 +116,6 @@ impl BonsaiDerived for RootFsnodeId {
let derived = RootFsnodeId(derived);
mapping
.put(ctx.clone(), cs_id.clone(), derived.clone())
.compat()
.await?;
Ok((cs_id, derived))
}
@ -157,14 +156,15 @@ impl RootFsnodeMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for RootFsnodeMapping {
type Value = RootFsnodeId;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let gets = csids.into_iter().map(|cs_id| {
self.fetch_fsnode(ctx.clone(), cs_id)
.map(|maybe_root_fsnode_id| stream::iter_ok(maybe_root_fsnode_id.into_iter()))
@ -172,14 +172,14 @@ impl BonsaiDerivedMapping for RootFsnodeMapping {
FuturesUnordered::from_iter(gets)
.flatten()
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
self.blobstore
.put(ctx, self.format_key(csid), id.into())
.compat()
.boxify()
.await
}
}

View File

@ -10,7 +10,7 @@ use async_trait::async_trait;
use blobrepo::BlobRepo;
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry};
use context::CoreContext;
use futures_ext::{BoxFuture, FutureExt as _};
use futures::compat::Future01CompatExt;
use futures_old::Future;
use mercurial_types::HgChangesetId;
use mononoke_types::{BonsaiChangeset, ChangesetId, RepositoryId};
@ -56,14 +56,15 @@ impl HgChangesetIdMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for HgChangesetIdMapping {
type Value = MappedHgChangesetId;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
self.mapping
.get(ctx, self.repo_id, csids.into())
.map(|v| {
@ -71,10 +72,11 @@ impl BonsaiDerivedMapping for HgChangesetIdMapping {
.map(|entry| (entry.bcs_id, MappedHgChangesetId(entry.hg_cs_id)))
.collect()
})
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
self.mapping
.add(
ctx,
@ -85,6 +87,7 @@ impl BonsaiDerivedMapping for HgChangesetIdMapping {
},
)
.map(|_| ())
.boxify()
.compat()
.await
}
}

View File

@ -342,7 +342,7 @@ where
Err(_) => (false, true),
};
let mut vs = mapping.get(ctx.clone(), vec![bcs_id]).compat().await?;
let mut vs = mapping.get(ctx.clone(), vec![bcs_id]).await?;
let derived = vs.remove(&bcs_id);
match derived {
@ -373,7 +373,7 @@ where
)
.compat()
.await?;
mapping.put(ctx.clone(), bcs_id, derived).compat().await?;
mapping.put(ctx.clone(), bcs_id, derived).await?;
let res: Result<_, Error> = Ok(());
res
};
@ -543,10 +543,7 @@ impl<Derived: BonsaiDerived> DeriveNode<Derived> {
{
// TODO: do not create intermediate hashmap, since this methods is going to be called
// most often, to get derived value
let csids_to_id = derived_mapping
.get(ctx.clone(), vec![*csid])
.compat()
.await?;
let csids_to_id = derived_mapping.get(ctx.clone(), vec![*csid]).await?;
match csids_to_id.get(csid) {
Some(id) => Ok(DeriveNode::Derived(id.clone())),
None => Ok(DeriveNode::Bonsai(*csid)),
@ -641,14 +638,15 @@ mod test {
}
}
#[async_trait]
impl BonsaiDerivedMapping for TestMapping {
type Value = TestGenNum;
fn get(
async fn get(
&self,
_ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture01<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let mut res = hashmap! {};
{
let mapping = self.mapping.lock().unwrap();
@ -659,20 +657,20 @@ mod test {
}
}
future01::ok(res).boxify()
Ok(res)
}
fn put(
async fn put(
&self,
_ctx: CoreContext,
csid: ChangesetId,
id: Self::Value,
) -> BoxFuture01<(), Error> {
) -> Result<(), Error> {
{
let mut mapping = self.mapping.lock().unwrap();
mapping.insert(csid, id);
}
future01::ok(()).boxify()
Ok(())
}
}
@ -696,7 +694,7 @@ mod test {
)
.unwrap();
let mapping = TestGenNum::mapping(&ctx, &repo);
let mapping = &TestGenNum::mapping(&ctx, &repo);
let actual = runtime
.block_on(TestGenNum::derive(ctx.clone(), repo.clone(), bcs_id))
.unwrap();
@ -712,7 +710,7 @@ mod test {
)
.and_then(move |new_bcs_id| {
let parents = changeset_fetcher.get_parents(ctx.clone(), new_bcs_id.clone());
let mapping = mapping.get(ctx.clone(), vec![new_bcs_id]);
let mapping = mapping.get(ctx.clone(), vec![new_bcs_id]).boxed().compat();
parents.join(mapping).map(move |(parents, mapping)| {
let gen_num = mapping.get(&new_bcs_id).unwrap();
@ -935,7 +933,7 @@ mod test {
// schedule derivation
runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?;
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
runtime.block_on_std(mapping.get(ctx.clone(), vec![csid]))?,
HashMap::new()
);
@ -950,7 +948,7 @@ mod test {
None => panic!("scheduled derivation should have been completed"),
};
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
runtime.block_on_std(mapping.get(ctx.clone(), vec![csid]))?,
hashmap! { csid => result.clone() }
);
@ -1022,7 +1020,7 @@ mod test {
// should succeed even though lease always fails
let result = runtime.block_on(TestGenNum::derive(ctx.clone(), repo.clone(), csid))?;
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
runtime.block_on_std(mapping.get(ctx, vec![csid]))?,
hashmap! { csid => result },
);

View File

@ -208,33 +208,35 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
/// After derived data was generated then it will be stored in BonsaiDerivedMapping, which is
/// normally a persistent store. This is used to avoid regenerating the same derived data over
/// and over again.
#[async_trait]
pub trait BonsaiDerivedMapping: Send + Sync + Clone {
type Value: BonsaiDerived;
/// Fetches mapping from bonsai changeset ids to generated value
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture01<HashMap<ChangesetId, Self::Value>, Error>;
) -> Result<HashMap<ChangesetId, Self::Value>, Error>;
/// Saves mapping between bonsai changeset and derived data id
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error>;
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error>;
}
#[async_trait]
impl<Mapping: BonsaiDerivedMapping> BonsaiDerivedMapping for Arc<Mapping> {
type Value = Mapping::Value;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture01<HashMap<ChangesetId, Self::Value>, Error> {
(**self).get(ctx, csids)
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
(**self).get(ctx, csids).await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error> {
(**self).put(ctx, csid, id)
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
(**self).put(ctx, csid, id).await
}
}
@ -260,24 +262,25 @@ impl<M> RegenerateMapping<M> {
}
}
#[async_trait]
impl<M> BonsaiDerivedMapping for RegenerateMapping<M>
where
M: BonsaiDerivedMapping,
{
type Value = M::Value;
fn get(
async fn get(
&self,
ctx: CoreContext,
mut csids: Vec<ChangesetId>,
) -> BoxFuture01<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
self.regenerate
.with(|regenerate| csids.retain(|id| !regenerate.contains(&id)));
self.base.get(ctx, csids)
self.base.get(ctx, csids).await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture01<(), Error> {
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
self.regenerate.with(|regenerate| regenerate.remove(&csid));
self.base.put(ctx, csid, id)
self.base.put(ctx, csid, id).await
}
}

View File

@ -15,7 +15,7 @@ use context::CoreContext;
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use futures_ext::StreamExt;
use futures_old::{
stream::{self, FuturesUnordered},
Future, Stream,
@ -132,14 +132,15 @@ impl RootUnodeManifestMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for RootUnodeManifestMapping {
type Value = RootUnodeManifestId;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let gets = csids.into_iter().map(|cs_id| {
self.fetch_unode(ctx.clone(), cs_id)
.map(|maybe_root_mf_id| stream::iter_ok(maybe_root_mf_id.into_iter()))
@ -147,14 +148,14 @@ impl BonsaiDerivedMapping for RootUnodeManifestMapping {
FuturesUnordered::from_iter(gets)
.flatten()
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> BoxFuture<(), Error> {
async fn put(&self, ctx: CoreContext, csid: ChangesetId, id: Self::Value) -> Result<(), Error> {
self.blobstore
.put(ctx, self.format_key(csid), id.into())
.compat()
.boxify()
.await
}
}

View File

@ -28,7 +28,6 @@ mononoke_types = { path = "../../mononoke_types" }
topo_sort = { path = "../../common/topo_sort" }
unodes = { path = "../unodes" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
lock_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"

View File

@ -31,8 +31,6 @@ use futures::{
stream::{self, futures_unordered::FuturesUnordered},
Future, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use futures_ext::{BoxFuture as BoxFutureOld, FutureExt as OldFutureExt};
use futures_old::{future as future_old, Future as OldFuture};
use lazy_static::lazy_static;
use lock_ext::LockExt;
use mercurial_derived_data::{HgChangesetIdMapping, MappedHgChangesetId};
@ -259,11 +257,7 @@ where
let buffer = in_memory_mapping.into_buffer();
let buffer = buffer.lock().unwrap();
for (cs_id, value) in buffer.iter() {
futs.push(
orig_mapping
.put(ctx.clone(), *cs_id, value.clone())
.compat(),
);
futs.push(orig_mapping.put(ctx.clone(), *cs_id, value.clone()));
}
}
futs.try_for_each(|_| future::ok(())).await?;
@ -278,7 +272,7 @@ where
_repo: BlobRepo,
mut csids: Vec<ChangesetId>,
) -> Result<Vec<ChangesetId>, Error> {
let derived = self.mapping.get(ctx, csids.clone()).compat().await?;
let derived = self.mapping.get(ctx, csids.clone()).await?;
csids.retain(|csid| !derived.contains_key(&csid));
Ok(csids)
}
@ -356,6 +350,7 @@ where
}
}
#[async_trait]
impl<M> BonsaiDerivedMapping for InMemoryMapping<M>
where
M: BonsaiDerivedMapping + Clone,
@ -363,13 +358,14 @@ where
{
type Value = M::Value;
fn get(
async fn get(
&self,
ctx: CoreContext,
mut csids: Vec<ChangesetId>,
) -> BoxFutureOld<HashMap<ChangesetId, Self::Value>, Error> {
let buffer = self.buffer.lock().unwrap();
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let mut ans = HashMap::new();
{
let buffer = self.buffer.lock().unwrap();
csids.retain(|cs_id| {
if let Some(v) = buffer.get(cs_id) {
ans.insert(*cs_id, v.clone());
@ -378,22 +374,21 @@ where
true
}
});
self.mapping
.get(ctx, csids)
.map(move |fetched| ans.into_iter().chain(fetched.into_iter()).collect())
.boxify()
}
fn put(
let fetched = self.mapping.get(ctx, csids).await?;
Ok(ans.into_iter().chain(fetched.into_iter()).collect())
}
async fn put(
&self,
_ctx: CoreContext,
csid: ChangesetId,
id: Self::Value,
) -> BoxFutureOld<(), Error> {
) -> Result<(), Error> {
let mut buffer = self.buffer.lock().unwrap();
buffer.insert(csid, id);
future_old::ok(()).boxify()
Ok(())
}
}

View File

@ -11,7 +11,7 @@ use cloned::cloned;
use context::CoreContext;
use futures::compat::Future01CompatExt;
use futures::future::{ready, FutureExt, TryFutureExt};
use futures_ext::{BoxFuture, FutureExt as _, StreamExt};
use futures_ext::{FutureExt as _, StreamExt};
use futures_old::{stream::futures_unordered, Future, IntoFuture, Stream};
use manifest::derive_manifest;
use std::collections::HashMap;
@ -58,14 +58,15 @@ impl TreeMapping {
}
}
#[async_trait]
impl BonsaiDerivedMapping for TreeMapping {
type Value = TreeHandle;
fn get(
async fn get(
&self,
ctx: CoreContext,
csids: Vec<ChangesetId>,
) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
let gets = csids
.into_iter()
.map(|cs_id| self.fetch_root(ctx.clone(), cs_id));
@ -73,14 +74,19 @@ impl BonsaiDerivedMapping for TreeMapping {
futures_unordered(gets)
.filter_map(|maybe_handle| maybe_handle)
.collect_to()
.boxify()
.compat()
.await
}
fn put(&self, ctx: CoreContext, csid: ChangesetId, root: Self::Value) -> BoxFuture<(), Error> {
async fn put(
&self,
ctx: CoreContext,
csid: ChangesetId,
root: Self::Value,
) -> Result<(), Error> {
self.blobstore
.put(ctx, self.root_key(csid), root.into())
.compat()
.boxify()
.await
}
}