mononoke/blobrepo: repo_commit: update old future type names

Summary: I'm going to asyncify some things here. Let's start with this.

Reviewed By: farnz

Differential Revision: D21451761

fbshipit-source-id: 64c78de4ab640b826a3ec1d6d84149d46f225024
This commit is contained in:
Thomas Orozco 2020-05-12 06:41:30 -07:00 committed by Facebook GitHub Bot
parent 2d09c375dd
commit 58abcc937a

View File

@ -12,9 +12,14 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error, Result};
use cloned::cloned;
use failure_ext::{Compat, FutureFailureErrorExt, StreamFailureErrorExt};
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_old::future::{self, Future, Shared, SharedError, SharedItem};
use futures_old::stream::{self, Stream};
use futures_ext::{
BoxFuture as OldBoxFuture, BoxStream as OldBoxStream, FutureExt as OldFutureExt,
StreamExt as OldStreamExt,
};
use futures_old::future::{
self as old_future, Future as OldFuture, Shared, SharedError, SharedItem,
};
use futures_old::stream::{self as old_stream, Stream as OldStream};
use futures_old::sync::oneshot;
use futures_old::IntoFuture;
use futures_stats::Timed;
@ -56,19 +61,19 @@ define_stats! {
/// See `get_completed_changeset()` for the public API you can use to extract the final changeset
#[derive(Clone)]
pub struct ChangesetHandle {
can_be_parent: Shared<BoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
can_be_parent: Shared<OldBoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
// * Shared is required here because a single changeset can have more than one child, and
// all of those children will want to refer to the corresponding future for their parents.
// * The Compat<Error> here is because the error type for Shared (a cloneable wrapper called
// SharedError) doesn't implement Fail, and only implements Error if the wrapped type
// implements Error.
completion_future: Shared<BoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
completion_future: Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
}
impl ChangesetHandle {
pub fn new_pending(
can_be_parent: Shared<BoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
completion_future: Shared<BoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
can_be_parent: Shared<OldBoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
completion_future: Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
) -> Self {
Self {
can_be_parent,
@ -117,7 +122,7 @@ impl ChangesetHandle {
pub fn get_completed_changeset(
self,
) -> Shared<BoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>> {
) -> Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>> {
self.completion_future
}
}
@ -163,9 +168,9 @@ impl UploadEntries {
ctx: CoreContext,
entry: &HgBlobEntry,
path: RepoPath,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
if entry.get_type() != manifest::Type::Tree {
future::err(
old_future::err(
ErrorKind::NotAManifest(entry.get_hash().into_nodehash(), entry.get_type()).into(),
)
.boxify()
@ -179,7 +184,7 @@ impl UploadEntries {
ctx: CoreContext,
entry: &HgBlobEntry,
path: RepoPath,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
let inner_mutex = self.inner.clone();
entry
.get_parents(ctx)
@ -191,7 +196,7 @@ impl UploadEntries {
});
inner.parents.extend(node_keys);
future::ok(())
old_future::ok(())
})
.map(|_| ())
.boxify()
@ -206,9 +211,9 @@ impl UploadEntries {
&self,
ctx: CoreContext,
entry: &HgBlobEntry,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
if entry.get_type() != manifest::Type::Tree {
return future::err(
return old_future::err(
ErrorKind::NotAManifest(entry.get_hash().into_nodehash(), entry.get_type()).into(),
)
.boxify();
@ -221,7 +226,7 @@ impl UploadEntries {
ctx: CoreContext,
entry: &HgBlobEntry,
path: RepoPath,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
{
let mut inner = self.inner.lock().expect("Lock poisoned");
inner.uploaded_entries.insert(path.clone(), entry.clone());
@ -258,7 +263,7 @@ impl UploadEntries {
blobstore: RepoBlobstore,
node_id: HgNodeHash,
is_tree: bool,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
let key = if is_tree {
HgManifestId::new(node_id).blobstore_key()
} else {
@ -272,7 +277,7 @@ impl UploadEntries {
ctx: CoreContext,
mf_id: HgManifestId,
parent_manifest_ids: Vec<HgManifestId>,
) -> BoxFuture<(), Error> {
) -> OldBoxFuture<(), Error> {
let required_checks = {
let inner = self.inner.lock().expect("Lock poisoned");
let blobstore = inner.blobstore.clone();
@ -340,7 +345,7 @@ impl UploadEntries {
STATS::finalize_parent.add_value(checks.len() as i64);
future::join_all(checks).timed({
old_future::join_all(checks).timed({
let mut scuba_logger = self.scuba_logger();
move |stats, result| {
if result.is_ok() {
@ -385,7 +390,7 @@ fn compute_changed_files_pair(
to: HgManifestId,
from: HgManifestId,
repo: BlobRepo,
) -> BoxFuture<HashSet<MPath>, Error> {
) -> OldBoxFuture<HashSet<MPath>, Error> {
from.diff(ctx.clone(), repo.get_blobstore(), to)
.filter_map(|diff| {
let (path, entry) = match diff {
@ -400,7 +405,7 @@ fn compute_changed_files_pair(
})
.fold(HashSet::new(), |mut set, path| {
set.insert(path);
future::ok::<_, Error>(set)
old_future::ok::<_, Error>(set)
})
.boxify()
}
@ -422,7 +427,7 @@ pub fn compute_changed_files(
root: HgManifestId,
p1: Option<HgManifestId>,
p2: Option<HgManifestId>,
) -> BoxFuture<Vec<MPath>, Error> {
) -> OldBoxFuture<Vec<MPath>, Error> {
match (p1, p2) {
(None, None) => root
.list_leaf_entries(ctx.clone(), repo.get_blobstore())
@ -470,7 +475,7 @@ fn compute_removed_files(
child: HgManifestId,
parent: Option<HgManifestId>,
repo: BlobRepo,
) -> impl Future<Item = Vec<MPath>, Error = Error> {
) -> impl OldFuture<Item = Vec<MPath>, Error = Error> {
compute_files_with_status(ctx, child, parent, repo, move |diff| match diff {
Diff::Removed(path, entry) => match entry {
Entry::Leaf(_) => path,
@ -486,7 +491,7 @@ fn compute_files_with_status(
parent: Option<HgManifestId>,
repo: BlobRepo,
filter_map: impl Fn(Diff<Entry<HgManifestId, (FileType, HgFileNodeId)>>) -> Option<MPath>,
) -> impl Future<Item = Vec<MPath>, Error = Error> {
) -> impl OldFuture<Item = Vec<MPath>, Error = Error> {
let s = match parent {
Some(parent) => parent
.diff(ctx.clone(), repo.get_blobstore(), child)
@ -509,7 +514,7 @@ pub fn check_case_conflicts(
repo: BlobRepo,
child_root_mf: HgManifestId,
parent_root_mf: Option<HgManifestId>,
) -> impl Future<Item = (), Error = Error> {
) -> impl OldFuture<Item = (), Error = Error> {
compute_files_with_status(
ctx.clone(),
child_root_mf,
@ -530,7 +535,7 @@ pub fn check_case_conflicts(
cloned!(ctx);
move |added_files| match parent_root_mf {
Some(parent_root_mf) => {
let mut case_conflict_checks = stream::FuturesUnordered::new();
let mut case_conflict_checks = old_stream::FuturesUnordered::new();
for f in added_files {
case_conflict_checks.push(
repo.check_case_conflict_in_manifest(
@ -577,16 +582,16 @@ fn mercurial_mpath_comparator(a: &MPath, b: &MPath) -> ::std::cmp::Ordering {
pub fn process_entries(
ctx: CoreContext,
entry_processor: &UploadEntries,
root_manifest: BoxFuture<Option<(HgBlobEntry, RepoPath)>, Error>,
new_child_entries: BoxStream<(HgBlobEntry, RepoPath), Error>,
) -> BoxFuture<HgManifestId, Error> {
root_manifest: OldBoxFuture<Option<(HgBlobEntry, RepoPath)>, Error>,
new_child_entries: OldBoxStream<(HgBlobEntry, RepoPath), Error>,
) -> OldBoxFuture<HgManifestId, Error> {
let root_manifest_fut = root_manifest
.context("While uploading root manifest")
.from_err()
.and_then({
cloned!(ctx, entry_processor);
move |root_manifest| match root_manifest {
None => future::ok(None).boxify(),
None => old_future::ok(None).boxify(),
Some((entry, path)) => {
let hash = entry.get_hash().into_nodehash();
if entry.get_type() == manifest::Type::Tree && path == RepoPath::RootPath {
@ -595,7 +600,7 @@ pub fn process_entries(
.map(move |_| Some(hash))
.boxify()
} else {
future::err(Error::from(ErrorKind::BadRootManifest(entry.get_type())))
old_future::err(Error::from(ErrorKind::BadRootManifest(entry.get_type())))
.boxify()
}
}
@ -610,14 +615,14 @@ pub fn process_entries(
move |(entry, path)| entry_processor.process_one_entry(ctx.clone(), &entry, path)
})
.buffer_unordered(100)
.for_each(|()| future::ok(()));
.for_each(|()| old_future::ok(()));
let mut scuba_logger = entry_processor.scuba_logger();
root_manifest_fut
.join(child_entries_fut)
.and_then(move |(root_hash, ())| match root_hash {
None => future::ok(HgManifestId::new(NULL_HASH)).boxify(),
Some(root_hash) => future::ok(HgManifestId::new(root_hash)).boxify(),
None => old_future::ok(HgManifestId::new(NULL_HASH)).boxify(),
Some(root_hash) => old_future::ok(HgManifestId::new(root_hash)).boxify(),
})
.timed(move |stats, result| {
if result.is_ok() {
@ -633,13 +638,13 @@ pub fn process_entries(
pub fn extract_parents_complete(
p1: &Option<ChangesetHandle>,
p2: &Option<ChangesetHandle>,
) -> BoxFuture<SharedItem<()>, SharedError<Compat<Error>>> {
) -> OldBoxFuture<SharedItem<()>, SharedError<Compat<Error>>> {
match (p1.as_ref(), p2.as_ref()) {
(None, None) => future::ok(()).shared().boxify(),
(None, None) => old_future::ok(()).shared().boxify(),
(Some(p), None) | (None, Some(p)) => p
.completion_future
.clone()
.and_then(|_| future::ok(()).shared())
.and_then(|_| old_future::ok(()).shared())
.boxify(),
(Some(p1), Some(p2)) => p1
.completion_future
@ -667,7 +672,7 @@ pub fn extract_parents_complete(
let p2_completion_future = p2.completion_future.clone();
move |_| p2_completion_future
})
.and_then(|_| future::ok(()).shared())
.and_then(|_| old_future::ok(()).shared())
.boxify(),
}
.boxify()
@ -677,16 +682,16 @@ pub fn handle_parents(
mut scuba_logger: ScubaSampleBuilder,
p1: Option<ChangesetHandle>,
p2: Option<ChangesetHandle>,
) -> BoxFuture<(HgParents, Vec<HgManifestId>, Vec<ChangesetId>), Error> {
) -> OldBoxFuture<(HgParents, Vec<HgManifestId>, Vec<ChangesetId>), Error> {
let p1 = p1.map(|cs| cs.can_be_parent);
let p2 = p2.map(|cs| cs.can_be_parent);
let p1 = match p1 {
Some(p1) => p1.map(Some).boxify(),
None => future::ok(None).boxify(),
None => old_future::ok(None).boxify(),
};
let p2 = match p2 {
Some(p2) => p2.map(Some).boxify(),
None => future::ok(None).boxify(),
None => old_future::ok(None).boxify(),
};
// DO NOT replace and_then() with join() or futures_ordered()!