mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 16:57:49 +03:00
mononoke/filestore: convert finalize to futures 0.3
Summary: Like it says in the title. This also lets us get rid of some macros we no longer need. Reviewed By: markbt Differential Revision: D24727259 fbshipit-source-id: 5e3211bc08fa5376b4cfce4bea0428ab7bf3dc0f
This commit is contained in:
parent
5adbccff53
commit
c2ad26f991
@ -8,9 +8,7 @@
|
||||
use anyhow::Error;
|
||||
use blobstore::{Blobstore, Storable};
|
||||
use context::CoreContext;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures_ext::{try_left_future, FutureExt};
|
||||
use futures_old::{Future, IntoFuture};
|
||||
use futures::future;
|
||||
use mononoke_types::{BlobstoreValue, ContentAlias, ContentMetadata};
|
||||
|
||||
use crate::errors::{ErrorKind, InvalidHash};
|
||||
@ -18,30 +16,28 @@ use crate::fetch_key::{Alias, AliasBlob};
|
||||
use crate::prepare::Prepared;
|
||||
use crate::StoreRequest;
|
||||
|
||||
// Verify that a given $expected hash matches the $effective hash, and otherwise return a left
|
||||
// future containing the $error.
|
||||
macro_rules! check_request_hash {
|
||||
($expected:expr, $effective:expr, $error:expr) => {
|
||||
if let Some(expected) = $expected {
|
||||
if *expected != $effective {
|
||||
return Err($error(InvalidHash {
|
||||
expected: *expected,
|
||||
effective: $effective,
|
||||
})
|
||||
.into())
|
||||
.into_future()
|
||||
.left_future();
|
||||
}
|
||||
fn check_hash<T: std::fmt::Debug + PartialEq + Copy>(
|
||||
expected: Option<T>,
|
||||
effective: T,
|
||||
) -> Result<(), InvalidHash<T>> {
|
||||
if let Some(expected) = expected {
|
||||
if expected != effective {
|
||||
return Err(InvalidHash {
|
||||
expected,
|
||||
effective,
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finalize<B: Blobstore + Clone>(
|
||||
pub async fn finalize<B: Blobstore + Clone>(
|
||||
blobstore: B,
|
||||
ctx: CoreContext,
|
||||
req: Option<&StoreRequest>,
|
||||
outcome: Prepared,
|
||||
) -> impl Future<Item = ContentMetadata, Error = Error> {
|
||||
) -> Result<ContentMetadata, Error> {
|
||||
let Prepared {
|
||||
sha1,
|
||||
sha256,
|
||||
@ -64,44 +60,24 @@ pub fn finalize<B: Blobstore + Clone>(
|
||||
git_sha1: req_git_sha1,
|
||||
} = req;
|
||||
|
||||
let _ = try_left_future!(expected_size.check_equals(total_size));
|
||||
expected_size.check_equals(total_size)?;
|
||||
|
||||
{
|
||||
use ErrorKind::*;
|
||||
check_request_hash!(req_content_id, content_id, InvalidContentId);
|
||||
check_request_hash!(req_sha1, sha1, InvalidSha1);
|
||||
check_request_hash!(req_sha256, sha256, InvalidSha256);
|
||||
check_request_hash!(req_git_sha1, git_sha1, InvalidGitSha1);
|
||||
check_hash(*req_content_id, content_id).map_err(InvalidContentId)?;
|
||||
check_hash(*req_sha1, sha1).map_err(InvalidSha1)?;
|
||||
check_hash(*req_sha256, sha256).map_err(InvalidSha256)?;
|
||||
check_hash(*req_git_sha1, git_sha1).map_err(InvalidGitSha1)?;
|
||||
}
|
||||
}
|
||||
|
||||
let put_contents = blob.store(ctx.clone(), &blobstore).compat();
|
||||
|
||||
let alias = ContentAlias::from_content_id(content_id);
|
||||
let put_sha1 = AliasBlob(Alias::Sha1(sha1), alias.clone()).store(ctx.clone(), &blobstore);
|
||||
let put_sha256 = AliasBlob(Alias::Sha256(sha256), alias.clone()).store(ctx.clone(), &blobstore);
|
||||
let put_git_sha1 =
|
||||
AliasBlob(Alias::GitSha1(git_sha1.sha1()), alias).store(ctx.clone(), &blobstore);
|
||||
|
||||
let put_sha1 = AliasBlob(Alias::Sha1(sha1), alias.clone())
|
||||
.store(ctx.clone(), &blobstore)
|
||||
.compat();
|
||||
|
||||
let put_sha256 = AliasBlob(Alias::Sha256(sha256), alias.clone())
|
||||
.store(ctx.clone(), &blobstore)
|
||||
.compat();
|
||||
|
||||
let put_git_sha1 = AliasBlob(Alias::GitSha1(git_sha1.sha1()), alias)
|
||||
.store(ctx.clone(), &blobstore)
|
||||
.compat();
|
||||
|
||||
let metadata = ContentMetadata {
|
||||
total_size,
|
||||
content_id,
|
||||
sha1,
|
||||
git_sha1,
|
||||
sha256,
|
||||
};
|
||||
|
||||
let put_metadata = metadata.clone().into_blob().store(ctx, &blobstore).compat();
|
||||
|
||||
// Since we don't have atomicity for puts, we need to make sure they're ordered
|
||||
// Since we don't have atomicity for multiple puts, we need to make sure they're ordered
|
||||
// correctly:
|
||||
//
|
||||
// - write the forward-mapping aliases
|
||||
@ -118,10 +94,20 @@ pub fn finalize<B: Blobstore + Clone>(
|
||||
// cache, as everything in it can be computed from the content id. Therefore, in principle,
|
||||
// if it doesn't get written we can fix it up later.
|
||||
|
||||
(put_sha1, put_sha256, put_git_sha1)
|
||||
.into_future()
|
||||
.and_then(move |_| put_contents)
|
||||
.and_then(move |_| put_metadata)
|
||||
.map(move |_| metadata)
|
||||
.right_future()
|
||||
|
||||
future::try_join3(put_sha1, put_sha256, put_git_sha1).await?;
|
||||
|
||||
blob.store(ctx.clone(), &blobstore).await?;
|
||||
|
||||
let metadata = ContentMetadata {
|
||||
total_size,
|
||||
content_id,
|
||||
sha1,
|
||||
git_sha1,
|
||||
sha256,
|
||||
};
|
||||
|
||||
metadata.clone().into_blob().store(ctx, &blobstore).await?;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ use std::convert::TryInto;
|
||||
use anyhow::Error;
|
||||
use cloned::cloned;
|
||||
use futures::{
|
||||
compat::{Future01CompatExt, Stream01CompatExt},
|
||||
compat::Stream01CompatExt,
|
||||
future::{FutureExt, TryFutureExt},
|
||||
stream::TryStreamExt,
|
||||
};
|
||||
@ -422,9 +422,7 @@ pub fn store<B: Blobstore + Clone>(
|
||||
}
|
||||
};
|
||||
|
||||
finalize::finalize(blobstore, ctx, Some(&req), prepared)
|
||||
.compat()
|
||||
.await
|
||||
finalize::finalize(blobstore, ctx, Some(&req), prepared).await
|
||||
}
|
||||
.boxed()
|
||||
.compat()
|
||||
|
Loading…
Reference in New Issue
Block a user