mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 08:47:12 +03:00
derived_data: change BonsaiDerived::derive_from_parents to use new futures
Summary: This changes the trait method from one that returns an old `BoxFuture` to an async method using `async_trait`. Reviewed By: krallin Differential Revision: D24686888 fbshipit-source-id: 0ac231cdbb60d256b6d5ad5aafbe8779b96905f3
This commit is contained in:
parent
54fe3f8c65
commit
6a12bcc562
@ -21,6 +21,7 @@ unodes = { path = "../unodes" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
bytes = { version = "0.5", features = ["serde"] }
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
|
@ -6,6 +6,7 @@
|
||||
*/
|
||||
|
||||
use anyhow::{format_err, Error};
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobstore::{Blobstore, BlobstoreBytes, Loadable};
|
||||
use bytes::Bytes;
|
||||
@ -34,6 +35,7 @@ pub const BLAME_FILESIZE_LIMIT: u64 = 10 * 1024 * 1024;
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct BlameRoot(ChangesetId);
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for BlameRoot {
|
||||
const NAME: &'static str = "blame";
|
||||
type Mapping = BlameRootMapping;
|
||||
@ -42,12 +44,12 @@ impl BonsaiDerived for BlameRoot {
|
||||
BlameRootMapping::new(repo.blobstore().boxed())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
_parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let csid = bonsai.get_changeset_id();
|
||||
let root_manifest = RootUnodeManifestId::derive(ctx.clone(), repo.clone(), csid)
|
||||
.from_err()
|
||||
@ -92,7 +94,8 @@ impl BonsaiDerived for BlameRoot {
|
||||
.for_each(|_| Ok(()))
|
||||
.map(move |_| BlameRoot(csid))
|
||||
})
|
||||
.boxify()
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ mononoke_types = { path = "../../mononoke_types" }
|
||||
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
unicode-segmentation = "1.6.0"
|
||||
|
@ -5,9 +5,12 @@
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use anyhow::Error;
|
||||
use std::{collections::HashMap, iter::FromIterator, sync::Arc};
|
||||
use std::collections::HashMap;
|
||||
use std::iter::FromIterator;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobstore::Blobstore;
|
||||
use context::CoreContext;
|
||||
@ -15,11 +18,12 @@ use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
use fbthrift::compact_protocol;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use futures_old::{future, stream::FuturesUnordered, Future, Stream};
|
||||
use futures_old::{stream::FuturesUnordered, Future, Stream};
|
||||
use mononoke_types::{BlobstoreBytes, BonsaiChangeset, ChangesetId};
|
||||
|
||||
use crate::ChangesetInfo;
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for ChangesetInfo {
|
||||
const NAME: &'static str = "changeset_info";
|
||||
type Mapping = ChangesetInfoMapping;
|
||||
@ -28,14 +32,14 @@ impl BonsaiDerived for ChangesetInfo {
|
||||
ChangesetInfoMapping::new(repo.blobstore().boxed())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
_ctx: CoreContext,
|
||||
_repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
_parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let csid = bonsai.get_changeset_id();
|
||||
future::ok(ChangesetInfo::new(csid, bonsai)).boxify()
|
||||
Ok(ChangesetInfo::new(csid, bonsai))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ unodes = { path = "../unodes" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
bytes = { version = "0.5", features = ["serde"] }
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
|
@ -7,12 +7,14 @@
|
||||
|
||||
use crate::derive::{derive_deleted_files_manifest, get_changes};
|
||||
use anyhow::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobstore::{Blobstore, BlobstoreGetData};
|
||||
use bytes::Bytes;
|
||||
use context::CoreContext;
|
||||
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
use futures::future::{FutureExt as NewFutureExt, TryFutureExt};
|
||||
use futures::compat::Future01CompatExt;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures_ext::{BoxFuture, FutureExt, StreamExt};
|
||||
use futures_old::{
|
||||
stream::{self, FuturesUnordered},
|
||||
@ -55,6 +57,7 @@ impl From<RootDeletedManifestId> for BlobstoreBytes {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for RootDeletedManifestId {
|
||||
const NAME: &'static str = "deleted_manifest";
|
||||
type Mapping = RootDeletedManifestMapping;
|
||||
@ -63,30 +66,27 @@ impl BonsaiDerived for RootDeletedManifestId {
|
||||
RootDeletedManifestMapping::new(repo.blobstore().clone())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
get_changes(ctx.clone(), repo.clone(), bonsai)
|
||||
.boxed()
|
||||
.compat()
|
||||
.and_then(move |changes| {
|
||||
derive_deleted_files_manifest(
|
||||
ctx,
|
||||
repo,
|
||||
bcs_id,
|
||||
parents
|
||||
.into_iter()
|
||||
.map(|root_mf_id| root_mf_id.deleted_manifest_id().clone())
|
||||
.collect(),
|
||||
changes,
|
||||
)
|
||||
})
|
||||
.map(RootDeletedManifestId)
|
||||
.boxify()
|
||||
let changes = get_changes(ctx.clone(), repo.clone(), bonsai).await?;
|
||||
derive_deleted_files_manifest(
|
||||
ctx,
|
||||
repo,
|
||||
bcs_id,
|
||||
parents
|
||||
.into_iter()
|
||||
.map(|root_mf_id| root_mf_id.deleted_manifest_id().clone())
|
||||
.collect(),
|
||||
changes,
|
||||
)
|
||||
.map(RootDeletedManifestId)
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,17 +6,16 @@
|
||||
*/
|
||||
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobstore::{Blobstore, BlobstoreBytes, Loadable};
|
||||
use bytes::Bytes;
|
||||
use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
use futures::{
|
||||
compat::Future01CompatExt,
|
||||
future::{FutureExt as NewFutureExt, TryFutureExt},
|
||||
stream::TryStreamExt,
|
||||
};
|
||||
use futures::compat::Future01CompatExt;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures::stream::TryStreamExt;
|
||||
use futures_ext::{BoxFuture, FutureExt, StreamExt};
|
||||
use futures_old::{future, stream::FuturesUnordered, Future, Stream};
|
||||
use manifest::{find_intersection_of_diffs, Entry};
|
||||
@ -51,6 +50,7 @@ pub enum ErrorKind {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RootFastlog(ChangesetId);
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for RootFastlog {
|
||||
const NAME: &'static str = "fastlog";
|
||||
type Mapping = RootFastlogMapping;
|
||||
@ -59,51 +59,43 @@ impl BonsaiDerived for RootFastlog {
|
||||
RootFastlogMapping::new(repo.blobstore().boxed())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
_parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
async move {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
let (root_unode_mf_id, parents) =
|
||||
RootUnodeManifestId::derive(ctx.clone(), repo.clone(), bcs_id)
|
||||
.from_err()
|
||||
.join(fetch_parent_root_unodes(ctx.clone(), repo.clone(), bonsai))
|
||||
.compat()
|
||||
.await?;
|
||||
|
||||
let blobstore = repo.get_blobstore().boxed();
|
||||
let unode_mf_id = root_unode_mf_id.manifest_unode_id().clone();
|
||||
|
||||
find_intersection_of_diffs(ctx.clone(), blobstore.clone(), unode_mf_id, parents)
|
||||
.map_ok(move |(_, entry)| {
|
||||
cloned!(blobstore, ctx);
|
||||
async move {
|
||||
let res = tokio::spawn(async move {
|
||||
let parents = fetch_unode_parents(&ctx, &blobstore, entry).await?;
|
||||
|
||||
let fastlog_batch =
|
||||
create_new_batch(&ctx, &blobstore, parents, bcs_id).await?;
|
||||
|
||||
save_fastlog_batch_by_unode_id(&ctx, &blobstore, entry, fastlog_batch)
|
||||
.await
|
||||
})
|
||||
.await?;
|
||||
|
||||
res
|
||||
}
|
||||
})
|
||||
.try_buffer_unordered(100)
|
||||
.try_for_each(|_| async { Ok(()) })
|
||||
) -> Result<Self, Error> {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
let (root_unode_mf_id, parents) =
|
||||
RootUnodeManifestId::derive(ctx.clone(), repo.clone(), bcs_id)
|
||||
.from_err()
|
||||
.join(fetch_parent_root_unodes(ctx.clone(), repo.clone(), bonsai))
|
||||
.compat()
|
||||
.await?;
|
||||
|
||||
Ok(RootFastlog(bcs_id))
|
||||
}
|
||||
.boxed()
|
||||
.compat()
|
||||
.boxify()
|
||||
let blobstore = repo.get_blobstore().boxed();
|
||||
let unode_mf_id = root_unode_mf_id.manifest_unode_id().clone();
|
||||
|
||||
find_intersection_of_diffs(ctx.clone(), blobstore.clone(), unode_mf_id, parents)
|
||||
.map_ok(move |(_, entry)| {
|
||||
cloned!(blobstore, ctx);
|
||||
async move {
|
||||
tokio::spawn(async move {
|
||||
let parents = fetch_unode_parents(&ctx, &blobstore, entry).await?;
|
||||
|
||||
let fastlog_batch =
|
||||
create_new_batch(&ctx, &blobstore, parents, bcs_id).await?;
|
||||
|
||||
save_fastlog_batch_by_unode_id(&ctx, &blobstore, entry, fastlog_batch).await
|
||||
})
|
||||
.await?
|
||||
}
|
||||
})
|
||||
.try_buffer_unordered(100)
|
||||
.try_for_each(|_| async { Ok(()) })
|
||||
.await?;
|
||||
|
||||
Ok(RootFastlog(bcs_id))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@ mononoke_types = { path = "../../mononoke_types" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
futures-util = "0.3"
|
||||
|
@ -8,6 +8,7 @@
|
||||
#![deny(warnings)]
|
||||
|
||||
use anyhow::{format_err, Error};
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobrepo_hg::BlobRepoHg;
|
||||
use blobstore::Loadable;
|
||||
@ -103,6 +104,7 @@ pub enum FilenodesOnlyPublic {
|
||||
Disabled,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for FilenodesOnlyPublic {
|
||||
const NAME: &'static str = "filenodes";
|
||||
type Mapping = FilenodesOnlyPublicMapping;
|
||||
@ -111,51 +113,46 @@ impl BonsaiDerived for FilenodesOnlyPublic {
|
||||
FilenodesOnlyPublicMapping::new(repo.clone())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
_parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
async move {
|
||||
let filenodes = generate_all_filenodes(&ctx, &repo, bonsai.get_changeset_id()).await?;
|
||||
) -> Result<Self, Error> {
|
||||
let filenodes = generate_all_filenodes(&ctx, &repo, bonsai.get_changeset_id()).await?;
|
||||
|
||||
if filenodes.is_empty() {
|
||||
// This commit didn't create any new filenodes, and it's root manifest is the
|
||||
// same as one of the parents (that can happen if this commit is empty).
|
||||
// In that case we don't need to insert a root filenode - it will be inserted
|
||||
// when parent is derived.
|
||||
Ok(FilenodesOnlyPublic::Present {
|
||||
root_filenode: None,
|
||||
})
|
||||
} else {
|
||||
let (roots, non_roots): (Vec<_>, Vec<_>) =
|
||||
filenodes.into_iter().partition_map(classify_filenode);
|
||||
let mut roots = roots.into_iter();
|
||||
if filenodes.is_empty() {
|
||||
// This commit didn't create any new filenodes, and it's root manifest is the
|
||||
// same as one of the parents (that can happen if this commit is empty).
|
||||
// In that case we don't need to insert a root filenode - it will be inserted
|
||||
// when parent is derived.
|
||||
Ok(FilenodesOnlyPublic::Present {
|
||||
root_filenode: None,
|
||||
})
|
||||
} else {
|
||||
let (roots, non_roots): (Vec<_>, Vec<_>) =
|
||||
filenodes.into_iter().partition_map(classify_filenode);
|
||||
let mut roots = roots.into_iter();
|
||||
|
||||
match (roots.next(), roots.next()) {
|
||||
(Some(root_filenode), None) => {
|
||||
let filenodes = repo.get_filenodes();
|
||||
let repo_id = repo.get_repoid();
|
||||
let filenode_res = filenodes
|
||||
.add_filenodes(ctx.clone(), non_roots, repo_id)
|
||||
.compat()
|
||||
.await?;
|
||||
match (roots.next(), roots.next()) {
|
||||
(Some(root_filenode), None) => {
|
||||
let filenodes = repo.get_filenodes();
|
||||
let repo_id = repo.get_repoid();
|
||||
let filenode_res = filenodes
|
||||
.add_filenodes(ctx.clone(), non_roots, repo_id)
|
||||
.compat()
|
||||
.await?;
|
||||
|
||||
match filenode_res {
|
||||
FilenodeResult::Present(()) => Ok(FilenodesOnlyPublic::Present {
|
||||
root_filenode: Some(root_filenode),
|
||||
}),
|
||||
FilenodeResult::Disabled => Ok(FilenodesOnlyPublic::Disabled),
|
||||
}
|
||||
match filenode_res {
|
||||
FilenodeResult::Present(()) => Ok(FilenodesOnlyPublic::Present {
|
||||
root_filenode: Some(root_filenode),
|
||||
}),
|
||||
FilenodeResult::Disabled => Ok(FilenodesOnlyPublic::Disabled),
|
||||
}
|
||||
_ => Err(format_err!("expected exactly one root, found {:?}", roots)),
|
||||
}
|
||||
_ => Err(format_err!("expected exactly one root, found {:?}", roots)),
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
.compat()
|
||||
.boxify()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,12 +76,12 @@ impl BonsaiDerived for RootFsnodeId {
|
||||
RootFsnodeMapping::new(repo.blobstore().clone())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
derive_fsnode(
|
||||
ctx,
|
||||
repo,
|
||||
@ -92,7 +92,8 @@ impl BonsaiDerived for RootFsnodeId {
|
||||
get_file_changes(&bonsai),
|
||||
)
|
||||
.map(RootFsnodeId)
|
||||
.boxify()
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn batch_derive<'a, Iter>(
|
||||
|
@ -27,5 +27,6 @@ stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch
|
||||
time_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
tracing = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
|
@ -6,10 +6,10 @@
|
||||
*/
|
||||
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry};
|
||||
use context::CoreContext;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use futures_ext::{BoxFuture, FutureExt as _};
|
||||
use futures_old::Future;
|
||||
use mercurial_types::HgChangesetId;
|
||||
@ -22,6 +22,7 @@ use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct MappedHgChangesetId(pub HgChangesetId);
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for MappedHgChangesetId {
|
||||
const NAME: &'static str = "hgchangesets";
|
||||
type Mapping = HgChangesetIdMapping;
|
||||
@ -30,16 +31,13 @@ impl BonsaiDerived for MappedHgChangesetId {
|
||||
HgChangesetIdMapping::new(repo)
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
crate::derive_hg_changeset::derive_from_parents(ctx, repo, bonsai, parents)
|
||||
.boxed()
|
||||
.compat()
|
||||
.boxify()
|
||||
) -> Result<Self, Error> {
|
||||
crate::derive_hg_changeset::derive_from_parents(ctx, repo, bonsai, parents).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@ use cacheblob::LeaseOps;
|
||||
use context::CoreContext;
|
||||
use futures::{
|
||||
compat::Future01CompatExt,
|
||||
future::{try_join, try_join_all, TryFutureExt},
|
||||
future::{try_join, try_join_all, FutureExt, TryFutureExt},
|
||||
TryStreamExt,
|
||||
};
|
||||
use futures_ext::FutureExt as FutureExt01;
|
||||
@ -360,6 +360,8 @@ where
|
||||
let deriver = async {
|
||||
let derived =
|
||||
Derived::derive_from_parents(ctx.clone(), repo.clone(), bcs, parents)
|
||||
.boxed()
|
||||
.compat()
|
||||
.traced_with_id(
|
||||
&ctx.trace(),
|
||||
"derive::derive_from_parents",
|
||||
@ -557,6 +559,7 @@ mod test {
|
||||
use super::*;
|
||||
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use blobrepo_hg::BlobRepoHg;
|
||||
use blobrepo_override::DangerousOverride;
|
||||
use bookmarks::BookmarkName;
|
||||
@ -595,6 +598,7 @@ mod test {
|
||||
#[derive(Clone, Hash, Eq, Ord, PartialEq, PartialOrd, Debug)]
|
||||
struct TestGenNum(u64, ChangesetId, Vec<ChangesetId>);
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for TestGenNum {
|
||||
const NAME: &'static str = "test_gen_num";
|
||||
type Mapping = TestMapping;
|
||||
@ -604,20 +608,19 @@ mod test {
|
||||
MAPPINGS.with(|m| m.entry(session).or_insert_with(TestMapping::new).clone())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
_ctx: CoreContext,
|
||||
_repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture01<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let parent_commits = parents.iter().map(|x| x.1).collect();
|
||||
|
||||
future01::ok(Self(
|
||||
Ok(Self(
|
||||
parents.into_iter().max().map(|x| x.0).unwrap_or(0) + 1,
|
||||
bonsai.get_changeset_id(),
|
||||
parent_commits,
|
||||
))
|
||||
.boxify()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,9 +11,8 @@ use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use context::CoreContext;
|
||||
use futures::{
|
||||
compat::Future01CompatExt, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
|
||||
};
|
||||
use futures::future::{FutureExt, TryFutureExt};
|
||||
use futures::stream::{self, StreamExt, TryStreamExt};
|
||||
use futures_ext::{BoxFuture as BoxFuture01, FutureExt as _};
|
||||
use lock_ext::LockExt;
|
||||
use mononoke_types::{BonsaiChangeset, ChangesetId, RepositoryId};
|
||||
@ -65,12 +64,12 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
|
||||
/// For example, to derive HgChangesetId we also need to derive all filenodes and all manifests
|
||||
/// and then store them in blobstore. Derived data library is only responsible for
|
||||
/// updating BonsaiDerivedMapping.
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture01<Self, Error>;
|
||||
) -> Result<Self, Error>;
|
||||
|
||||
/// TODO(ahornby) delete onces all callsites using ::derive03
|
||||
///
|
||||
@ -197,9 +196,7 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
|
||||
{
|
||||
let iter = csids.into_iter();
|
||||
stream::iter(iter.map(|cs_id| async move {
|
||||
let derived = Self::derive(ctx.clone(), repo.clone(), cs_id)
|
||||
.compat()
|
||||
.await?;
|
||||
let derived = Self::derive03(ctx, repo, cs_id).await?;
|
||||
Ok((cs_id, derived))
|
||||
}))
|
||||
.buffered(100)
|
||||
|
@ -21,6 +21,7 @@ repo_blobstore = { path = "../../blobrepo/repo_blobstore" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
bytes = { version = "0.5", features = ["serde"] }
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
|
@ -7,11 +7,13 @@
|
||||
|
||||
use crate::derive::derive_unode_manifest;
|
||||
use anyhow::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use blobrepo::BlobRepo;
|
||||
use blobstore::{Blobstore, BlobstoreGetData};
|
||||
use bytes::Bytes;
|
||||
use context::CoreContext;
|
||||
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
|
||||
use futures::compat::Future01CompatExt;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures_ext::{BoxFuture, FutureExt, StreamExt};
|
||||
use futures_old::{
|
||||
@ -60,6 +62,7 @@ impl From<RootUnodeManifestId> for BlobstoreBytes {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for RootUnodeManifestId {
|
||||
const NAME: &'static str = "unodes";
|
||||
type Mapping = RootUnodeManifestMapping;
|
||||
@ -71,12 +74,12 @@ impl BonsaiDerived for RootUnodeManifestId {
|
||||
)
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let bcs_id = bonsai.get_changeset_id();
|
||||
derive_unode_manifest(
|
||||
ctx,
|
||||
@ -89,7 +92,8 @@ impl BonsaiDerived for RootUnodeManifestId {
|
||||
get_file_changes(&bonsai),
|
||||
)
|
||||
.map(RootUnodeManifestId)
|
||||
.boxify()
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch
|
||||
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
|
||||
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
digest = "0.8"
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
futures-old = { package = "futures", version = "0.1" }
|
||||
|
@ -6,8 +6,10 @@
|
||||
*/
|
||||
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use futures::compat::Future01CompatExt;
|
||||
use futures::future::{ready, FutureExt, TryFutureExt};
|
||||
use futures_ext::{BoxFuture, FutureExt as _, StreamExt};
|
||||
use futures_old::{stream::futures_unordered, Future, IntoFuture, Stream};
|
||||
@ -82,6 +84,7 @@ impl BonsaiDerivedMapping for TreeMapping {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BonsaiDerived for TreeHandle {
|
||||
const NAME: &'static str = "git_trees";
|
||||
type Mapping = TreeMapping;
|
||||
@ -90,17 +93,17 @@ impl BonsaiDerived for TreeHandle {
|
||||
TreeMapping::new(repo.blobstore().boxed())
|
||||
}
|
||||
|
||||
fn derive_from_parents(
|
||||
async fn derive_from_parents(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<Self>,
|
||||
) -> BoxFuture<Self, Error> {
|
||||
) -> Result<Self, Error> {
|
||||
let blobstore = repo.get_blobstore();
|
||||
let changes = get_file_changes(&blobstore, &ctx, bonsai);
|
||||
changes
|
||||
.and_then(move |changes| derive_git_manifest(ctx, blobstore, parents, changes))
|
||||
.boxify()
|
||||
let changes = get_file_changes(&blobstore, &ctx, bonsai).compat().await?;
|
||||
derive_git_manifest(ctx, blobstore, parents, changes)
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user