move Upload* structs to mercurial crate

Summary: Move upload logic for mercurial data out of blobrepo crate

Reviewed By: StanislavGlebik

Differential Revision: D17366158

fbshipit-source-id: 9f1cdf4fbe67552b12fd1ef94f9c7de1be632988
This commit is contained in: parent 8d43883d93, commit 86c0892ff2
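The move is mechanical for consumers: UploadHgNodeHash, UploadHgTreeEntry, UploadHgFileContents, UploadHgFileEntry, ContentBlobInfo and ContentBlobMeta keep their definitions and only change crates, from blobrepo to mercurial_types::blobs. A hypothetical downstream crate (not one of the files in this diff) would be updated the same way the hunks below are:

    // before: upload types re-exported by blobrepo
    use blobrepo::{BlobRepo, UploadHgFileEntry, UploadHgNodeHash};

    // after: same types, now exported by mercurial_types
    use blobrepo::BlobRepo;
    use mercurial_types::blobs::{UploadHgFileEntry, UploadHgNodeHash};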
@@ -26,14 +26,14 @@ use scuba_ext::ScubaSampleBuilder;
 use tokio::executor::DefaultExecutor;
 use tracing::{trace_args, EventId, Traced};
 
-use blobrepo::{
-    BlobRepo, ChangesetHandle, ContentBlobMeta, CreateChangeset, UploadHgFileContents,
-    UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry,
-};
+use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset};
 use lfs_import_lib::lfs_upload;
 use mercurial_revlog::{manifest, RevlogChangeset, RevlogEntry, RevlogRepo};
 use mercurial_types::{
-    blobs::{ChangesetMetadata, File, HgBlobChangeset, HgBlobEntry, LFSContent},
+    blobs::{
+        ChangesetMetadata, ContentBlobMeta, File, HgBlobChangeset, HgBlobEntry, LFSContent,
+        UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry,
+    },
     HgBlob, HgChangesetId, HgFileNodeId, HgManifestId, HgNodeHash, MPath, RepoPath, Type,
     NULL_HASH,
 };
@@ -4,17 +4,17 @@
 // This software may be used and distributed according to the terms of the
 // GNU General Public License version 2 or any later version.
-
-use std::fmt;
+#![deny(warnings)]
 
 use ascii::AsciiString;
 use failure_ext::failure;
 use failure_ext::Fail;
 
 use mercurial_types::{
     blobs::HgBlobChangeset, HgBlob, HgChangesetId, HgFileNodeId, HgManifestId, HgNodeHash,
     HgParents, MPath, RepoPath, Type,
 };
-use mononoke_types::{hash::Sha256, ChangesetId, ContentId};
+use mononoke_types::{hash::Sha256, ChangesetId};
+use std::fmt;
 
 #[derive(Debug)]
 pub enum StateOpenError {
@@ -65,8 +65,6 @@ pub enum ErrorKind {
     )]
     FileContentsDeserializeFailed(String),
-    #[fail(display = "Content blob missing for id: {}", _0)]
-    ContentBlobMissing(ContentId),
     #[fail(display = "Content blob missing for id: {}", _0)]
     ContentBlobByAliasMissing(Sha256),
     #[fail(display = "Uploaded blob is incomplete {:?}", _0)]
     BadUploadBlob(HgBlob),
@@ -92,11 +90,6 @@ pub enum ErrorKind {
     MissingManifests,
     #[fail(display = "Expected {} to be a manifest, found a {} instead", _0, _1)]
     NotAManifest(HgNodeHash, Type),
-    #[fail(
-        display = "Inconsistent node hash for entry: path {}, provided: {}, computed: {}",
-        _0, _1, _2
-    )]
-    InconsistentEntryHash(RepoPath, HgNodeHash, HgNodeHash),
     #[fail(
         display = "Inconsistent node hash for changeset: provided: {}, \
                    computed: {} for blob: {:#?}",
@@ -5,9 +5,6 @@
 // GNU General Public License version 2 or any later version.
 
 use crate::errors::ErrorKind;
-use crate::repo::{
-    ContentBlobMeta, UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry,
-};
 use crate::utils::{IncompleteFilenodeInfo, IncompleteFilenodes};
 use blobstore::Blobstore;
 use cloned::cloned;
@@ -17,7 +14,10 @@ use futures::{future, Future, IntoFuture};
 use futures_ext::FutureExt;
 use manifest::{derive_manifest, Entry, LeafInfo, TreeInfo};
 use mercurial_types::{
-    blobs::{fetch_file_envelope, HgBlobEntry},
+    blobs::{
+        fetch_file_envelope, ContentBlobMeta, HgBlobEntry, UploadHgFileContents, UploadHgFileEntry,
+        UploadHgNodeHash, UploadHgTreeEntry,
+    },
     HgEntry, HgEntryId, HgFileNodeId, HgManifestId,
 };
 use mononoke_types::{FileType, MPath, RepoPath};
@@ -15,16 +15,12 @@
 mod bonsai_generation;
 pub mod derive_hg_manifest;
 pub mod file_history;
-mod filenode_lookup;
 mod repo;
 mod repo_commit;
 mod utils;
 
 pub use crate::errors::*;
-pub use crate::repo::{
-    save_bonsai_changesets, BlobRepo, ContentBlobInfo, ContentBlobMeta, CreateChangeset,
-    UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry,
-};
+pub use crate::repo::{save_bonsai_changesets, BlobRepo, CreateChangeset};
 pub use crate::repo_commit::ChangesetHandle;
 pub use changeset_fetcher::ChangesetFetcher;
 // TODO: This is exported for testing - is this the right place for it?
@@ -8,7 +8,6 @@ use super::utils::{DangerousOverride, IncompleteFilenodeInfo, IncompleteFilenode
 use crate::bonsai_generation::{create_bonsai_changeset_object, save_bonsai_changeset_object};
 use crate::derive_hg_manifest::derive_hg_manifest;
 use crate::errors::*;
-use crate::filenode_lookup::{lookup_filenode_id, store_filenode_id, FileNodeIdPointer};
 use crate::repo_commit::*;
 use blobstore::{Blobstore, Loadable, LoadableError};
 use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry, BonsaiOrHgChangesetIds};
@@ -21,13 +20,11 @@ use changeset_fetcher::{ChangesetFetcher, SimpleChangesetFetcher};
 use changesets::{ChangesetEntry, ChangesetInsert, Changesets};
 use cloned::cloned;
 use context::CoreContext;
-use failure_ext::{
-    bail_err, format_err, prelude::*, Error, FutureFailureErrorExt, FutureFailureExt, Result,
-};
+use failure_ext::{format_err, prelude::*, Error, FutureFailureErrorExt, FutureFailureExt};
 use filenodes::{FilenodeInfo, Filenodes};
 use filestore::{self, Alias, FetchKey, FilestoreConfig, StoreRequest};
-use futures::future::{self, loop_fn, ok, Either, Future, Loop};
-use futures::stream::{self, once, FuturesUnordered, Stream};
+use futures::future::{self, loop_fn, ok, Future, Loop};
+use futures::stream::{self, FuturesUnordered, Stream};
 use futures::sync::oneshot;
 use futures::IntoFuture;
 use futures_ext::{spawn_future, try_boxfuture, BoxFuture, BoxStream, FutureExt, StreamExt};
@@ -39,18 +36,17 @@ use mercurial_types::{
         fetch_file_content_from_blobstore, fetch_file_content_id_from_blobstore,
         fetch_file_content_sha256_from_blobstore, fetch_file_contents, fetch_file_envelope,
         fetch_file_metadata_from_blobstore, fetch_file_parents_from_blobstore,
-        fetch_file_size_from_blobstore, BlobManifest, ChangesetMetadata, File, HgBlobChangeset,
-        HgBlobEntry, HgBlobEnvelope, HgChangesetContent, META_SZ,
+        fetch_file_size_from_blobstore, BlobManifest, ChangesetMetadata, ContentBlobMeta,
+        HgBlobChangeset, HgBlobEntry, HgBlobEnvelope, HgChangesetContent, UploadHgFileContents,
+        UploadHgFileEntry, UploadHgNodeHash,
     },
-    calculate_hg_node_id_stream,
     manifest::Content,
-    Changeset, FileBytes, HgBlobNode, HgChangesetId, HgEntry, HgEntryId, HgFileEnvelope,
-    HgFileEnvelopeMut, HgFileNodeId, HgManifest, HgManifestEnvelopeMut, HgManifestId, HgNodeHash,
-    HgParents, RepoPath, Type,
+    Changeset, FileBytes, HgChangesetId, HgEntry, HgEntryId, HgFileEnvelope, HgFileNodeId,
+    HgManifest, HgManifestId, HgNodeHash, HgParents, RepoPath, Type,
 };
 use mononoke_types::{
     hash::Sha256, Blob, BlobstoreBytes, BlobstoreValue, BonsaiChangeset, ChangesetId, ContentId,
-    ContentMetadata, FileChange, FileType, Generation, MPath, MononokeId, RepositoryId, Timestamp,
+    ContentMetadata, FileChange, Generation, MPath, MononokeId, RepositoryId, Timestamp,
 };
 use repo_blobstore::{RepoBlobstore, RepoBlobstoreArgs};
 use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
@@ -102,8 +98,6 @@ define_stats! {
     get_generation_number: timeseries(RATE, SUM),
     get_generation_number_by_bonsai: timeseries(RATE, SUM),
     upload_blob: timeseries(RATE, SUM),
-    upload_hg_file_entry: timeseries(RATE, SUM),
-    upload_hg_tree_entry: timeseries(RATE, SUM),
     create_changeset: timeseries(RATE, SUM),
     create_changeset_compute_cf: timeseries("create_changeset.compute_changed_files"; RATE, SUM),
     create_changeset_expected_cf: timeseries("create_changeset.expected_changed_files"; RATE, SUM),
@@ -1674,456 +1668,6 @@ impl BlobRepo {
     }
 }
 
-/// Node hash handling for upload entries
-pub enum UploadHgNodeHash {
-    /// Generate the hash from the uploaded content
-    Generate,
-    /// This hash is used as the blobstore key, even if it doesn't match the hash of the
-    /// parents and raw content. This is done because in some cases like root tree manifests
-    /// in hybrid mode, Mercurial sends fake hashes.
-    Supplied(HgNodeHash),
-    /// As Supplied, but Verify the supplied hash - if it's wrong, you will get an error.
-    Checked(HgNodeHash),
-}
-
-/// Context for uploading a Mercurial manifest entry.
-pub struct UploadHgTreeEntry {
-    pub upload_node_id: UploadHgNodeHash,
-    pub contents: Bytes,
-    pub p1: Option<HgNodeHash>,
-    pub p2: Option<HgNodeHash>,
-    pub path: RepoPath,
-}
-
-impl UploadHgTreeEntry {
-    // Given the content of a manifest, ensure that there is a matching HgBlobEntry in the repo.
-    // This may not upload the entry or the data blob if the repo is aware of that data already
-    // existing in the underlying store.
-    //
-    // Note that the HgBlobEntry may not be consistent - parents do not have to be uploaded at this
-    // point, as long as you know their HgNodeHashes; this is also given to you as part of the
-    // result type, so that you can parallelise uploads. Consistency will be verified when
-    // adding the entries to a changeset.
-    // adding the entries to a changeset.
-    pub fn upload(
-        self,
-        ctx: CoreContext,
-        blobstore: Arc<dyn Blobstore>,
-    ) -> Result<(HgNodeHash, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
-        STATS::upload_hg_tree_entry.add_value(1);
-        let UploadHgTreeEntry {
-            upload_node_id,
-            contents,
-            p1,
-            p2,
-            path,
-        } = self;
-
-        let logger = ctx.logger().clone();
-        let computed_node_id = HgBlobNode::new(contents.clone(), p1, p2).nodeid();
-        let node_id: HgNodeHash = match upload_node_id {
-            UploadHgNodeHash::Generate => computed_node_id,
-            UploadHgNodeHash::Supplied(node_id) => node_id,
-            UploadHgNodeHash::Checked(node_id) => {
-                if node_id != computed_node_id {
-                    bail_err!(ErrorKind::InconsistentEntryHash(
-                        path,
-                        node_id,
-                        computed_node_id
-                    ));
-                }
-                node_id
-            }
-        };
-
-        // This is the blob that gets uploaded. Manifest contents are usually small so they're
-        // stored inline.
-        let envelope = HgManifestEnvelopeMut {
-            node_id,
-            p1,
-            p2,
-            computed_node_id,
-            contents,
-        };
-        let envelope_blob = envelope.freeze().into_blob();
-
-        let manifest_id = HgManifestId::new(node_id);
-        let blobstore_key = manifest_id.blobstore_key();
-
-        let blob_entry = match path.mpath().and_then(|m| m.into_iter().last()) {
-            Some(m) => {
-                let entry_path = m.clone();
-                HgBlobEntry::new(blobstore.clone(), entry_path, node_id, Type::Tree)
-            }
-            None => HgBlobEntry::new_root(blobstore.clone(), manifest_id),
-        };
-
-        fn log_upload_stats(
-            logger: Logger,
-            path: RepoPath,
-            node_id: HgNodeHash,
-            computed_node_id: HgNodeHash,
-            stats: FutureStats,
-        ) {
-            trace!(logger, "Upload HgManifestEnvelope stats";
-                "phase" => "manifest_envelope_uploaded".to_string(),
-                "path" => format!("{}", path),
-                "node_id" => format!("{}", node_id),
-                "computed_node_id" => format!("{}", computed_node_id),
-                "poll_count" => stats.poll_count,
-                "poll_time_us" => stats.poll_time.as_micros_unchecked(),
-                "completion_time_us" => stats.completion_time.as_micros_unchecked(),
-            );
-        }
-
-        // Upload the blob.
-        let upload = blobstore
-            .put(ctx, blobstore_key, envelope_blob.into())
-            .map({
-                let path = path.clone();
-                move |()| (blob_entry, path)
-            })
-            .timed({
-                let logger = logger.clone();
-                move |stats, result| {
-                    if result.is_ok() {
-                        log_upload_stats(logger, path, node_id, computed_node_id, stats);
-                    }
-                    Ok(())
-                }
-            });
-
-        Ok((node_id, upload.boxify()))
-    }
-}
-
-/// What sort of file contents are available to upload.
-pub enum UploadHgFileContents {
-    /// Content already uploaded (or scheduled to be uploaded). Metadata will be inlined in
-    /// the envelope.
-    ContentUploaded(ContentBlobMeta),
-    /// Raw bytes as would be sent by Mercurial, including any metadata prepended in the standard
-    /// Mercurial format.
-    RawBytes(Bytes),
-}
-
-impl UploadHgFileContents {
-    /// Upload the file contents if necessary, and asynchronously return the hash of the file node
-    /// and metadata.
-    fn execute(
-        self,
-        ctx: CoreContext,
-        blobstore: &Arc<dyn Blobstore>,
-        p1: Option<HgFileNodeId>,
-        p2: Option<HgFileNodeId>,
-        path: MPath,
-    ) -> (
-        ContentBlobInfo,
-        // The future that does the upload and the future that computes the node ID/metadata are
-        // split up to allow greater parallelism.
-        impl Future<Item = (), Error = Error> + Send,
-        impl Future<Item = (HgFileNodeId, Bytes, u64), Error = Error> + Send,
-    ) {
-        let (cbinfo, upload_fut, compute_fut) = match self {
-            UploadHgFileContents::ContentUploaded(cbmeta) => {
-                let upload_fut = future::ok(());
-
-                let size = cbmeta.size;
-                let cbinfo = ContentBlobInfo { path, meta: cbmeta };
-
-                let lookup_fut = lookup_filenode_id(
-                    ctx.clone(),
-                    &*blobstore,
-                    FileNodeIdPointer::new(&cbinfo.meta.id, &cbinfo.meta.copy_from, &p1, &p2),
-                );
-
-                let metadata_fut = Self::compute_metadata(
-                    ctx.clone(),
-                    blobstore,
-                    cbinfo.meta.id,
-                    cbinfo.meta.copy_from.clone(),
-                );
-
-                let content_id = cbinfo.meta.id;
-
-                // Attempt to lookup filenode ID by alias. Fallback to computing it if we cannot.
-                let compute_fut = (lookup_fut, metadata_fut).into_future().and_then({
-                    cloned!(ctx, blobstore);
-                    move |(res, metadata)| {
-                        res.ok_or(())
-                            .into_future()
-                            .or_else({
-                                cloned!(metadata);
-                                move |_| {
-                                    Self::compute_filenode_id(
-                                        ctx, &blobstore, content_id, metadata, p1, p2,
-                                    )
-                                }
-                            })
-                            .map(move |fnid| (fnid, metadata, size))
-                    }
-                });
-
-                (cbinfo, upload_fut.left_future(), compute_fut.left_future())
-            }
-            UploadHgFileContents::RawBytes(raw_content) => {
-                let node_id = HgFileNodeId::new(
-                    HgBlobNode::new(
-                        raw_content.clone(),
-                        p1.map(HgFileNodeId::into_nodehash),
-                        p2.map(HgFileNodeId::into_nodehash),
-                    )
-                    .nodeid(),
-                );
-
-                let f = File::new(raw_content, p1, p2);
-                let metadata = f.metadata();
-
-                let copy_from = match f.copied_from() {
-                    Ok(copy_from) => copy_from,
-                    // XXX error out if copy-from information couldn't be read?
-                    Err(_err) => None,
-                };
-                // Upload the contents separately (they'll be used for bonsai changesets as well).
-                let file_bytes = f.file_contents();
-
-                STATS::upload_blob.add_value(1);
-                let (contents, upload_fut) =
-                    filestore::store_bytes(blobstore.clone(), ctx.clone(), file_bytes.into_bytes());
-
-                let upload_fut = upload_fut.timed({
-                    cloned!(path);
-                    let logger = ctx.logger().clone();
-                    move |stats, result| {
-                        if result.is_ok() {
-                            UploadHgFileEntry::log_stats(
-                                logger,
-                                path,
-                                node_id,
-                                "content_uploaded",
-                                stats,
-                            );
-                        }
-                        Ok(())
-                    }
-                });
-
-                let id = contents.content_id();
-                let size = contents.size();
-
-                let cbinfo = ContentBlobInfo {
-                    path,
-                    meta: ContentBlobMeta {
-                        id,
-                        size,
-                        copy_from,
-                    },
-                };
-
-                let compute_fut = future::ok((node_id, metadata, size));
-
-                (
-                    cbinfo,
-                    upload_fut.right_future(),
-                    compute_fut.right_future(),
-                )
-            }
-        };
-
-        let key = FileNodeIdPointer::new(&cbinfo.meta.id, &cbinfo.meta.copy_from, &p1, &p2);
-
-        let compute_fut = compute_fut.and_then({
-            cloned!(ctx, blobstore);
-            move |(filenode_id, metadata, size)| {
-                store_filenode_id(ctx, &blobstore, key, &filenode_id)
-                    .map(move |_| (filenode_id, metadata, size))
-            }
-        });
-
-        (cbinfo, upload_fut, compute_fut)
-    }
-
-    fn compute_metadata(
-        ctx: CoreContext,
-        blobstore: &Arc<dyn Blobstore>,
-        content_id: ContentId,
-        copy_from: Option<(MPath, HgFileNodeId)>,
-    ) -> impl Future<Item = Bytes, Error = Error> {
-        filestore::peek(&*blobstore, ctx, &FetchKey::Canonical(content_id), META_SZ)
-            .and_then(move |bytes| bytes.ok_or(ErrorKind::ContentBlobMissing(content_id).into()))
-            .context("While computing metadata")
-            .from_err()
-            .map(move |bytes| {
-                let mut metadata = Vec::new();
-                File::generate_metadata(copy_from.as_ref(), &FileBytes(bytes), &mut metadata)
-                    .expect("Vec::write_all should never fail");
-
-                // TODO: Introduce Metadata bytes?
-                Bytes::from(metadata)
-            })
-    }
-
-    fn compute_filenode_id(
-        ctx: CoreContext,
-        blobstore: &Arc<dyn Blobstore>,
-        content_id: ContentId,
-        metadata: Bytes,
-        p1: Option<HgFileNodeId>,
-        p2: Option<HgFileNodeId>,
-    ) -> impl Future<Item = HgFileNodeId, Error = Error> {
-        let file_bytes = filestore::fetch(&*blobstore, ctx, &FetchKey::Canonical(content_id))
-            .and_then(move |stream| stream.ok_or(ErrorKind::ContentBlobMissing(content_id).into()))
-            .flatten_stream();
-
-        let all_bytes = once(Ok(metadata)).chain(file_bytes);
-
-        let hg_parents = HgParents::new(
-            p1.map(HgFileNodeId::into_nodehash),
-            p2.map(HgFileNodeId::into_nodehash),
-        );
-
-        calculate_hg_node_id_stream(all_bytes, &hg_parents)
-            .map(HgFileNodeId::new)
-            .context("While computing a filenode id")
-            .from_err()
-    }
-}
-
-/// Context for uploading a Mercurial file entry.
-pub struct UploadHgFileEntry {
-    pub upload_node_id: UploadHgNodeHash,
-    pub contents: UploadHgFileContents,
-    pub file_type: FileType,
-    pub p1: Option<HgFileNodeId>,
-    pub p2: Option<HgFileNodeId>,
-    pub path: MPath,
-}
-
-impl UploadHgFileEntry {
-    pub fn upload(
-        self,
-        ctx: CoreContext,
-        blobstore: Arc<dyn Blobstore>,
-    ) -> Result<(ContentBlobInfo, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
-        STATS::upload_hg_file_entry.add_value(1);
-        let UploadHgFileEntry {
-            upload_node_id,
-            contents,
-            file_type,
-            p1,
-            p2,
-            path,
-        } = self;
-
-        let (cbinfo, content_upload, compute_fut) =
-            contents.execute(ctx.clone(), &blobstore, p1, p2, path.clone());
-        let content_id = cbinfo.meta.id;
-        let logger = ctx.logger().clone();
-
-        let envelope_upload =
-            compute_fut.and_then(move |(computed_node_id, metadata, content_size)| {
-                let node_id = match upload_node_id {
-                    UploadHgNodeHash::Generate => computed_node_id,
-                    UploadHgNodeHash::Supplied(node_id) => HgFileNodeId::new(node_id),
-                    UploadHgNodeHash::Checked(node_id) => {
-                        let node_id = HgFileNodeId::new(node_id);
-                        if node_id != computed_node_id {
-                            return Either::A(future::err(
-                                ErrorKind::InconsistentEntryHash(
-                                    RepoPath::FilePath(path),
-                                    node_id.into_nodehash(),
-                                    computed_node_id.into_nodehash(),
-                                )
-                                .into(),
-                            ));
-                        }
-                        node_id
-                    }
-                };
-
-                let file_envelope = HgFileEnvelopeMut {
-                    node_id,
-                    p1,
-                    p2,
-                    content_id,
-                    content_size,
-                    metadata,
-                };
-                let envelope_blob = file_envelope.freeze().into_blob();
-
-                let blobstore_key = node_id.blobstore_key();
-
-                let blob_entry = HgBlobEntry::new(
-                    blobstore.clone(),
-                    path.basename().clone(),
-                    node_id.into_nodehash(),
-                    Type::File(file_type),
-                );
-
-                let envelope_upload = blobstore
-                    .put(ctx, blobstore_key, envelope_blob.into())
-                    .timed({
-                        let path = path.clone();
-                        move |stats, result| {
-                            if result.is_ok() {
-                                Self::log_stats(
-                                    logger,
-                                    path,
-                                    node_id,
-                                    "file_envelope_uploaded",
-                                    stats,
-                                );
-                            }
-                            Ok(())
-                        }
-                    })
-                    .map(move |()| (blob_entry, RepoPath::FilePath(path)));
-                Either::B(envelope_upload)
-            });
-
-        let fut = envelope_upload
-            .join(content_upload)
-            .map(move |(envelope_res, ())| envelope_res);
-        Ok((cbinfo, fut.boxify()))
-    }
-
-    fn log_stats(
-        logger: Logger,
-        path: MPath,
-        nodeid: HgFileNodeId,
-        phase: &str,
-        stats: FutureStats,
-    ) {
-        let path = format!("{}", path);
-        let nodeid = format!("{}", nodeid);
-        trace!(logger, "Upload blob stats";
-            "phase" => String::from(phase),
-            "path" => path,
-            "nodeid" => nodeid,
-            "poll_count" => stats.poll_count,
-            "poll_time_us" => stats.poll_time.as_micros_unchecked(),
-            "completion_time_us" => stats.completion_time.as_micros_unchecked(),
-        );
-    }
-}
-
-/// Information about a content blob associated with a push that is available in
-/// the blobstore. (This blob wasn't necessarily uploaded in this push.)
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct ContentBlobInfo {
-    pub path: MPath,
-    pub meta: ContentBlobMeta,
-}
-
-/// Metadata associated with a content blob being uploaded as part of changeset creation.
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct ContentBlobMeta {
-    pub id: ContentId,
-    pub size: u64,
-    // The copy info will later be stored as part of the commit.
-    pub copy_from: Option<(MPath, HgFileNodeId)>,
-}
-
 /// This function uploads bonsai changests object to blobstore in parallel, and then does
 /// sequential writes to changesets table. Parents of the changesets should already by saved
 /// in the repository.
@@ -10,10 +10,7 @@ mod tracing_blobstore;
 mod utils;
 
 use benchmark_lib::{new_benchmark_repo, DelaySettings, GenManifest};
-use blobrepo::{
-    compute_changed_files, BlobRepo, ContentBlobMeta, UploadHgFileContents, UploadHgFileEntry,
-    UploadHgNodeHash,
-};
+use blobrepo::{compute_changed_files, BlobRepo};
 use blobstore::Storable;
 use cloned::cloned;
 use context::CoreContext;
@@ -25,8 +22,9 @@ use futures_ext::{BoxFuture, FutureExt};
 use maplit::btreemap;
 use memblob::LazyMemblob;
 use mercurial_types::{
-    blobs::File, manifest, Changeset, FileType, HgChangesetId, HgEntry, HgFileNodeId, HgManifestId,
-    HgParents, MPath, MPathElement, RepoPath,
+    blobs::{ContentBlobMeta, File, UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash},
+    manifest, Changeset, FileType, HgChangesetId, HgEntry, HgFileNodeId, HgManifestId, HgParents,
+    MPath, MPathElement, RepoPath,
 };
 use mercurial_types_mocks::nodehash::ONES_FNID;
 use mononoke_types::bonsai_changeset::BonsaiChangesetMut;
@@ -16,15 +16,15 @@ use futures::stream::futures_unordered;
 use futures_ext::{BoxFuture, StreamExt};
 use scuba_ext::ScubaSampleBuilder;
 
-use blobrepo::{
-    BlobRepo, ChangesetHandle, CreateChangeset, UploadHgFileContents, UploadHgFileEntry,
-    UploadHgNodeHash, UploadHgTreeEntry,
-};
+use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset};
 use blobrepo_factory::new_memblob_empty;
 use context::CoreContext;
 use memblob::{EagerMemblob, LazyMemblob};
 use mercurial_types::{
-    blobs::{ChangesetMetadata, HgBlobEntry},
+    blobs::{
+        ChangesetMetadata, HgBlobEntry, UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash,
+        UploadHgTreeEntry,
+    },
     FileType, HgBlobNode, HgFileNodeId, HgNodeHash, MPath, RepoPath,
 };
 use mononoke_types::DateTime;
@@ -18,13 +18,13 @@ use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
 use heapsize::HeapSizeOf;
 use quickcheck::{Arbitrary, Gen};
 
-use blobrepo::{
-    BlobRepo, ContentBlobInfo, ContentBlobMeta, UploadHgFileContents, UploadHgFileEntry,
-    UploadHgNodeHash,
-};
+use blobrepo::BlobRepo;
 use mercurial_bundles::changegroup::CgDeltaChunk;
 use mercurial_types::{
-    blobs::{File, HgBlobEntry},
+    blobs::{
+        ContentBlobInfo, ContentBlobMeta, File, HgBlobEntry, UploadHgFileContents,
+        UploadHgFileEntry, UploadHgNodeHash,
+    },
     delta, parse_rev_flags, Delta, FileType, HgFileNodeId, HgNodeHash, HgNodeKey, MPath, RepoPath,
     RevFlags, NULL_HASH,
 };
@@ -13,7 +13,7 @@ use crate::stats::*;
 use crate::upload_blobs::{upload_hg_blobs, UploadBlobsType, UploadableHgBlob};
 use crate::upload_changesets::upload_changeset;
 use ascii::AsciiString;
-use blobrepo::{BlobRepo, ChangesetHandle, ContentBlobInfo};
+use blobrepo::{BlobRepo, ChangesetHandle};
 use bookmarks::{BookmarkName, BookmarkUpdateReason, BundleReplayData, Transaction};
 use bytes::{Bytes, BytesMut};
 use cloned::cloned;
@@ -32,7 +32,10 @@ use mercurial_bundles::{
     PartHeaderType, PartId,
 };
 use mercurial_revlog::changeset::RevlogChangeset;
-use mercurial_types::{blobs::HgBlobEntry, HgChangesetId, HgNodeKey, RepoPath};
+use mercurial_types::{
+    blobs::{ContentBlobInfo, HgBlobEntry},
+    HgChangesetId, HgNodeKey, RepoPath,
+};
 use metaconfig_types::{BookmarkAttrs, InfinitepushParams, PushrebaseParams, RepoReadOnly};
 use mononoke_types::{BlobstoreValue, ChangesetId, RawBundle2, RawBundle2Id};
 use phases::{self, Phases};
@@ -11,10 +11,13 @@ use failure_ext::ensure_msg;
 use futures::{future::Shared, Future, Stream};
 use futures_ext::{BoxFuture, FutureExt};
 
-use blobrepo::{BlobRepo, UploadHgNodeHash, UploadHgTreeEntry};
+use blobrepo::BlobRepo;
 use context::CoreContext;
 use mercurial_revlog::manifest::ManifestContent;
-use mercurial_types::{blobs::HgBlobEntry, HgNodeHash, HgNodeKey};
+use mercurial_types::{
+    blobs::{HgBlobEntry, UploadHgNodeHash, UploadHgTreeEntry},
+    HgNodeHash, HgNodeKey,
+};
 use mononoke_types::RepoPath;
 use wirepack::TreemanifestEntry;
 
@@ -7,7 +7,7 @@
 use crate::errors::*;
 use crate::stats::*;
 use crate::upload_blobs::UploadableHgBlob;
-use blobrepo::{BlobRepo, ChangesetHandle, ContentBlobInfo, CreateChangeset};
+use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset};
 use context::CoreContext;
 use failure::Compat;
 use failure_ext::bail_msg;
@@ -21,7 +21,7 @@ use mercurial_revlog::{
     manifest::{Details, ManifestContent},
 };
 use mercurial_types::{
-    blobs::{ChangesetMetadata, HgBlobEntry},
+    blobs::{ChangesetMetadata, ContentBlobInfo, HgBlobEntry},
     HgChangesetId, HgManifestId, HgNodeHash, HgNodeKey, MPath, RepoPath, NULL_HASH,
 };
 use scuba_ext::ScubaSampleBuilder;
@@ -6,7 +6,7 @@
 
 use crate::{HgFileNodeId, HgNodeHash, Type};
 use failure::Fail;
-use mononoke_types::ContentId;
+use mononoke_types::{ContentId, RepoPath};
 
 #[derive(Debug, Fail)]
 pub enum ErrorKind {
@@ -31,4 +31,9 @@ pub enum ErrorKind {
     ManifestDeserializeFailed(String),
     #[fail(display = "Incorrect LFS file content {}", _0)]
     IncorrectLfsFileContent(String),
+    #[fail(
+        display = "Inconsistent node hash for entry: path {}, provided: {}, computed: {}",
+        _0, _1, _2
+    )]
+    InconsistentEntryHash(RepoPath, HgNodeHash, HgNodeHash),
 }
@@ -18,12 +18,12 @@
 // slow (and use up quite a bit of RAM, though that's something we can mitigate by
 // streaming file contents).
 
+use crate::HgFileNodeId;
 use ascii::AsciiString;
 use blobstore::Blobstore;
 use context::CoreContext;
 use failure_ext::Error;
 use futures::Future;
-use mercurial_types::HgFileNodeId;
 use mononoke_types::{BlobstoreBytes, ContentId, MPath};
 
 #[derive(Debug, Eq, Hash, PartialEq)]
@@ -94,68 +94,3 @@ pub fn lookup_filenode_id<B: Blobstore>(
         maybe_blob.and_then(|blob| HgFileNodeId::from_bytes(blob.as_bytes()).ok())
     })
 }
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use failure_ext::{err_msg, Error};
-    use mercurial_types::RepoPath;
-    use mercurial_types_mocks::nodehash::{FOURS_FNID, ONES_FNID, THREES_FNID, TWOS_FNID};
-    use mononoke_types_mocks::contentid::{ONES_CTID, TWOS_CTID};
-    use std::collections::HashSet;
-
-    #[test]
-    fn test_hashes_are_unique() -> Result<(), Error> {
-        let mut h = HashSet::new();
-
-        for content_id in [ONES_CTID, TWOS_CTID].iter() {
-            for p1 in [Some(ONES_FNID), Some(TWOS_FNID), None].iter() {
-                for p2 in [Some(THREES_FNID), Some(FOURS_FNID), None].iter() {
-                    let path1 = RepoPath::file("path")?
-                        .into_mpath()
-                        .ok_or(err_msg("path1"))?;
-
-                    let path2 = RepoPath::file("path/2")?
-                        .into_mpath()
-                        .ok_or(err_msg("path2"))?;
-
-                    let path3 = RepoPath::file("path2")?
-                        .into_mpath()
-                        .ok_or(err_msg("path3"))?;
-
-                    for copy_path in [path1, path2, path3].iter() {
-                        for copy_parent in [ONES_FNID, TWOS_FNID, THREES_FNID].iter() {
-                            let copy_info = Some((copy_path.clone(), copy_parent.clone()));
-
-                            let ptr = FileNodeIdPointer::new(&content_id, &copy_info, p1, p2);
-                            assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
-                            h.insert(ptr);
-
-                            if p1 == p2 {
-                                continue;
-                            }
-
-                            let ptr = FileNodeIdPointer::new(&content_id, &copy_info, p2, p1);
-                            assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
-                            h.insert(ptr);
-                        }
-                    }
-
-                    let ptr = FileNodeIdPointer::new(&content_id, &None, p1, p2);
-                    assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
-                    h.insert(ptr);
-
-                    if p1 == p2 {
-                        continue;
-                    }
-
-                    let ptr = FileNodeIdPointer::new(&content_id, &None, p2, p1);
-                    assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
-                    h.insert(ptr);
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
@@ -28,3 +28,11 @@ pub use changeset::{
     serialize_cs, serialize_extras, ChangesetMetadata, Extra, HgBlobChangeset, HgChangesetContent,
     RevlogChangeset,
 };
+
+pub mod filenode_lookup;
+
+mod upload;
+pub use upload::{
+    ContentBlobInfo, ContentBlobMeta, UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash,
+    UploadHgTreeEntry,
+};
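Note that `mod upload;` stays private while `filenode_lookup` is public, so consumers reach the upload types through the re-export. Both import styles appear in the updated callers (see the test-file hunks near the end of this diff); for example:

    use mercurial_types::blobs::filenode_lookup::FileNodeIdPointer;
    use mercurial_types::blobs::{ContentBlobMeta, UploadHgFileContents, UploadHgFileEntry};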
mercurial/types/src/blobs/upload.rs (new file, 485 lines)
@@ -0,0 +1,485 @@
+// Copyright (c) 2004-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+use super::filenode_lookup::{lookup_filenode_id, store_filenode_id, FileNodeIdPointer};
+use super::{errors::ErrorKind, File, HgBlobEntry, META_SZ};
+use crate::{
+    calculate_hg_node_id_stream, FileBytes, HgBlobNode, HgFileEnvelopeMut, HgFileNodeId,
+    HgManifestEnvelopeMut, HgManifestId, HgNodeHash, HgParents, Type,
+};
+use blobstore::Blobstore;
+use bytes::Bytes;
+use cloned::cloned;
+use context::CoreContext;
+use failure::Error;
+use failure_ext::{bail_err, FutureFailureErrorExt, Result};
+use filestore::{self, FetchKey};
+use futures::{future, stream, Future, IntoFuture, Stream};
+use futures_ext::{BoxFuture, FutureExt};
+use futures_stats::{FutureStats, Timed};
+use mononoke_types::{ContentId, FileType, MPath, RepoPath};
+use slog::{trace, Logger};
+use stats::{define_stats, Timeseries};
+use std::sync::Arc;
+use time_ext::DurationExt;
+
+define_stats! {
+    prefix = "mononoke.blobrepo";
+    upload_hg_file_entry: timeseries(RATE, SUM),
+    upload_hg_tree_entry: timeseries(RATE, SUM),
+    upload_blob: timeseries(RATE, SUM),
+}
+
+/// Information about a content blob associated with a push that is available in
+/// the blobstore. (This blob wasn't necessarily uploaded in this push.)
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct ContentBlobInfo {
+    pub path: MPath,
+    pub meta: ContentBlobMeta,
+}
+
+/// Metadata associated with a content blob being uploaded as part of changeset creation.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct ContentBlobMeta {
+    pub id: ContentId,
+    pub size: u64,
+    // The copy info will later be stored as part of the commit.
+    pub copy_from: Option<(MPath, HgFileNodeId)>,
+}
+
+/// Node hash handling for upload entries
+pub enum UploadHgNodeHash {
+    /// Generate the hash from the uploaded content
+    Generate,
+    /// This hash is used as the blobstore key, even if it doesn't match the hash of the
+    /// parents and raw content. This is done because in some cases like root tree manifests
+    /// in hybrid mode, Mercurial sends fake hashes.
+    Supplied(HgNodeHash),
+    /// As Supplied, but Verify the supplied hash - if it's wrong, you will get an error.
+    Checked(HgNodeHash),
+}
+
+/// Context for uploading a Mercurial manifest entry.
+pub struct UploadHgTreeEntry {
+    pub upload_node_id: UploadHgNodeHash,
+    pub contents: Bytes,
+    pub p1: Option<HgNodeHash>,
+    pub p2: Option<HgNodeHash>,
+    pub path: RepoPath,
+}
+
+impl UploadHgTreeEntry {
+    // Given the content of a manifest, ensure that there is a matching HgBlobEntry in the repo.
+    // This may not upload the entry or the data blob if the repo is aware of that data already
+    // existing in the underlying store.
+    //
+    // Note that the HgBlobEntry may not be consistent - parents do not have to be uploaded at this
+    // point, as long as you know their HgNodeHashes; this is also given to you as part of the
+    // result type, so that you can parallelise uploads. Consistency will be verified when
+    // adding the entries to a changeset.
+    // adding the entries to a changeset.
+    pub fn upload(
+        self,
+        ctx: CoreContext,
+        blobstore: Arc<dyn Blobstore>,
+    ) -> Result<(HgNodeHash, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
+        STATS::upload_hg_tree_entry.add_value(1);
+        let UploadHgTreeEntry {
+            upload_node_id,
+            contents,
+            p1,
+            p2,
+            path,
+        } = self;
+
+        let logger = ctx.logger().clone();
+        let computed_node_id = HgBlobNode::new(contents.clone(), p1, p2).nodeid();
+        let node_id: HgNodeHash = match upload_node_id {
+            UploadHgNodeHash::Generate => computed_node_id,
+            UploadHgNodeHash::Supplied(node_id) => node_id,
+            UploadHgNodeHash::Checked(node_id) => {
+                if node_id != computed_node_id {
+                    bail_err!(ErrorKind::InconsistentEntryHash(
+                        path,
+                        node_id,
+                        computed_node_id
+                    ));
+                }
+                node_id
+            }
+        };
+
+        // This is the blob that gets uploaded. Manifest contents are usually small so they're
+        // stored inline.
+        let envelope = HgManifestEnvelopeMut {
+            node_id,
+            p1,
+            p2,
+            computed_node_id,
+            contents,
+        };
+        let envelope_blob = envelope.freeze().into_blob();
+
+        let manifest_id = HgManifestId::new(node_id);
+        let blobstore_key = manifest_id.blobstore_key();
+
+        let blob_entry = match path.mpath().and_then(|m| m.into_iter().last()) {
+            Some(m) => {
+                let entry_path = m.clone();
+                HgBlobEntry::new(blobstore.clone(), entry_path, node_id, Type::Tree)
+            }
+            None => HgBlobEntry::new_root(blobstore.clone(), manifest_id),
+        };
+
+        fn log_upload_stats(
+            logger: Logger,
+            path: RepoPath,
+            node_id: HgNodeHash,
+            computed_node_id: HgNodeHash,
+            stats: FutureStats,
+        ) {
+            trace!(logger, "Upload HgManifestEnvelope stats";
+                "phase" => "manifest_envelope_uploaded".to_string(),
+                "path" => format!("{}", path),
+                "node_id" => format!("{}", node_id),
+                "computed_node_id" => format!("{}", computed_node_id),
+                "poll_count" => stats.poll_count,
+                "poll_time_us" => stats.poll_time.as_micros_unchecked(),
+                "completion_time_us" => stats.completion_time.as_micros_unchecked(),
+            );
+        }
+
+        // Upload the blob.
+        let upload = blobstore
+            .put(ctx, blobstore_key, envelope_blob.into())
+            .map({
+                let path = path.clone();
+                move |()| (blob_entry, path)
+            })
+            .timed({
+                let logger = logger.clone();
+                move |stats, result| {
+                    if result.is_ok() {
+                        log_upload_stats(logger, path, node_id, computed_node_id, stats);
+                    }
+                    Ok(())
+                }
+            });
+
+        Ok((node_id, upload.boxify()))
+    }
+}
+
+/// What sort of file contents are available to upload.
+pub enum UploadHgFileContents {
+    /// Content already uploaded (or scheduled to be uploaded). Metadata will be inlined in
+    /// the envelope.
+    ContentUploaded(ContentBlobMeta),
+    /// Raw bytes as would be sent by Mercurial, including any metadata prepended in the standard
+    /// Mercurial format.
+    RawBytes(Bytes),
+}
+
+impl UploadHgFileContents {
+    /// Upload the file contents if necessary, and asynchronously return the hash of the file node
+    /// and metadata.
+    fn execute(
+        self,
+        ctx: CoreContext,
+        blobstore: &Arc<dyn Blobstore>,
+        p1: Option<HgFileNodeId>,
+        p2: Option<HgFileNodeId>,
+        path: MPath,
+    ) -> (
+        ContentBlobInfo,
+        // The future that does the upload and the future that computes the node ID/metadata are
+        // split up to allow greater parallelism.
+        impl Future<Item = (), Error = Error> + Send,
+        impl Future<Item = (HgFileNodeId, Bytes, u64), Error = Error> + Send,
+    ) {
+        let (cbinfo, upload_fut, compute_fut) = match self {
+            UploadHgFileContents::ContentUploaded(cbmeta) => {
+                let upload_fut = future::ok(());
+
+                let size = cbmeta.size;
+                let cbinfo = ContentBlobInfo { path, meta: cbmeta };
+
+                let lookup_fut = lookup_filenode_id(
+                    ctx.clone(),
+                    &*blobstore,
+                    FileNodeIdPointer::new(&cbinfo.meta.id, &cbinfo.meta.copy_from, &p1, &p2),
+                );
+
+                let metadata_fut = Self::compute_metadata(
+                    ctx.clone(),
+                    blobstore,
+                    cbinfo.meta.id,
+                    cbinfo.meta.copy_from.clone(),
+                );
+
+                let content_id = cbinfo.meta.id;
+
+                // Attempt to lookup filenode ID by alias. Fallback to computing it if we cannot.
+                let compute_fut = (lookup_fut, metadata_fut).into_future().and_then({
+                    cloned!(ctx, blobstore);
+                    move |(res, metadata)| {
+                        res.ok_or(())
+                            .into_future()
+                            .or_else({
+                                cloned!(metadata);
+                                move |_| {
+                                    Self::compute_filenode_id(
+                                        ctx, &blobstore, content_id, metadata, p1, p2,
+                                    )
+                                }
+                            })
+                            .map(move |fnid| (fnid, metadata, size))
+                    }
+                });
+
+                (cbinfo, upload_fut.left_future(), compute_fut.left_future())
+            }
+            UploadHgFileContents::RawBytes(raw_content) => {
+                let node_id = HgFileNodeId::new(
+                    HgBlobNode::new(
+                        raw_content.clone(),
+                        p1.map(HgFileNodeId::into_nodehash),
+                        p2.map(HgFileNodeId::into_nodehash),
+                    )
+                    .nodeid(),
+                );
+
+                let f = File::new(raw_content, p1, p2);
+                let metadata = f.metadata();
+
+                let copy_from = match f.copied_from() {
+                    Ok(copy_from) => copy_from,
+                    // XXX error out if copy-from information couldn't be read?
+                    Err(_err) => None,
+                };
+                // Upload the contents separately (they'll be used for bonsai changesets as well).
+                let file_bytes = f.file_contents();
+
+                STATS::upload_blob.add_value(1);
+                let (contents, upload_fut) =
+                    filestore::store_bytes(blobstore.clone(), ctx.clone(), file_bytes.into_bytes());
+
+                let upload_fut = upload_fut.timed({
+                    cloned!(path);
+                    let logger = ctx.logger().clone();
+                    move |stats, result| {
+                        if result.is_ok() {
+                            UploadHgFileEntry::log_stats(
+                                logger,
+                                path,
+                                node_id,
+                                "content_uploaded",
+                                stats,
+                            );
+                        }
+                        Ok(())
+                    }
+                });
+
+                let id = contents.content_id();
+                let size = contents.size();
+
+                let cbinfo = ContentBlobInfo {
+                    path,
+                    meta: ContentBlobMeta {
+                        id,
+                        size,
+                        copy_from,
+                    },
+                };
+
+                let compute_fut = future::ok((node_id, metadata, size));
+
+                (
+                    cbinfo,
+                    upload_fut.right_future(),
+                    compute_fut.right_future(),
+                )
+            }
+        };
+
+        let key = FileNodeIdPointer::new(&cbinfo.meta.id, &cbinfo.meta.copy_from, &p1, &p2);
+
+        let compute_fut = compute_fut.and_then({
+            cloned!(ctx, blobstore);
+            move |(filenode_id, metadata, size)| {
+                store_filenode_id(ctx, &blobstore, key, &filenode_id)
+                    .map(move |_| (filenode_id, metadata, size))
+            }
+        });
+
+        (cbinfo, upload_fut, compute_fut)
+    }
+
+    fn compute_metadata(
+        ctx: CoreContext,
+        blobstore: &Arc<dyn Blobstore>,
+        content_id: ContentId,
+        copy_from: Option<(MPath, HgFileNodeId)>,
+    ) -> impl Future<Item = Bytes, Error = Error> {
+        filestore::peek(&*blobstore, ctx, &FetchKey::Canonical(content_id), META_SZ)
+            .and_then(move |bytes| bytes.ok_or(ErrorKind::ContentBlobMissing(content_id).into()))
+            .context("While computing metadata")
+            .from_err()
+            .map(move |bytes| {
+                let mut metadata = Vec::new();
+                File::generate_metadata(copy_from.as_ref(), &FileBytes(bytes), &mut metadata)
+                    .expect("Vec::write_all should never fail");
+
+                // TODO: Introduce Metadata bytes?
+                Bytes::from(metadata)
+            })
+    }
+
+    fn compute_filenode_id(
+        ctx: CoreContext,
+        blobstore: &Arc<dyn Blobstore>,
+        content_id: ContentId,
+        metadata: Bytes,
+        p1: Option<HgFileNodeId>,
+        p2: Option<HgFileNodeId>,
+    ) -> impl Future<Item = HgFileNodeId, Error = Error> {
+        let file_bytes = filestore::fetch(&*blobstore, ctx, &FetchKey::Canonical(content_id))
+            .and_then(move |stream| stream.ok_or(ErrorKind::ContentBlobMissing(content_id).into()))
+            .flatten_stream();
+
+        let all_bytes = stream::once(Ok(metadata)).chain(file_bytes);
+
+        let hg_parents = HgParents::new(
+            p1.map(HgFileNodeId::into_nodehash),
+            p2.map(HgFileNodeId::into_nodehash),
+        );
+
+        calculate_hg_node_id_stream(all_bytes, &hg_parents)
+            .map(HgFileNodeId::new)
+            .context("While computing a filenode id")
+            .from_err()
+    }
+}
+
+/// Context for uploading a Mercurial file entry.
+pub struct UploadHgFileEntry {
+    pub upload_node_id: UploadHgNodeHash,
+    pub contents: UploadHgFileContents,
+    pub file_type: FileType,
+    pub p1: Option<HgFileNodeId>,
+    pub p2: Option<HgFileNodeId>,
+    pub path: MPath,
+}
+
+impl UploadHgFileEntry {
+    pub fn upload(
+        self,
+        ctx: CoreContext,
+        blobstore: Arc<dyn Blobstore>,
+    ) -> Result<(ContentBlobInfo, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
+        STATS::upload_hg_file_entry.add_value(1);
+        let UploadHgFileEntry {
+            upload_node_id,
+            contents,
+            file_type,
+            p1,
+            p2,
+            path,
+        } = self;
+
+        let (cbinfo, content_upload, compute_fut) =
+            contents.execute(ctx.clone(), &blobstore, p1, p2, path.clone());
+        let content_id = cbinfo.meta.id;
+        let logger = ctx.logger().clone();
+
+        let envelope_upload =
+            compute_fut.and_then(move |(computed_node_id, metadata, content_size)| {
+                let node_id = match upload_node_id {
+                    UploadHgNodeHash::Generate => computed_node_id,
+                    UploadHgNodeHash::Supplied(node_id) => HgFileNodeId::new(node_id),
+                    UploadHgNodeHash::Checked(node_id) => {
+                        let node_id = HgFileNodeId::new(node_id);
+                        if node_id != computed_node_id {
+                            return future::err(
+                                ErrorKind::InconsistentEntryHash(
+                                    RepoPath::FilePath(path),
+                                    node_id.into_nodehash(),
+                                    computed_node_id.into_nodehash(),
+                                )
+                                .into(),
+                            )
+                            .left_future();
+                        }
+                        node_id
+                    }
+                };
+
+                let file_envelope = HgFileEnvelopeMut {
+                    node_id,
+                    p1,
+                    p2,
+                    content_id,
+                    content_size,
+                    metadata,
+                };
+                let envelope_blob = file_envelope.freeze().into_blob();
+
+                let blobstore_key = node_id.blobstore_key();
+
+                let blob_entry = HgBlobEntry::new(
+                    blobstore.clone(),
+                    path.basename().clone(),
+                    node_id.into_nodehash(),
+                    Type::File(file_type),
+                );
+
+                blobstore
+                    .put(ctx, blobstore_key, envelope_blob.into())
+                    .timed({
+                        let path = path.clone();
+                        move |stats, result| {
+                            if result.is_ok() {
+                                Self::log_stats(
+                                    logger,
+                                    path,
+                                    node_id,
+                                    "file_envelope_uploaded",
+                                    stats,
+                                );
+                            }
+                            Ok(())
+                        }
+                    })
+                    .map(move |()| (blob_entry, RepoPath::FilePath(path)))
+                    .right_future()
+            });
+
+        let fut = envelope_upload
+            .join(content_upload)
+            .map(move |(envelope_res, ())| envelope_res);
+        Ok((cbinfo, fut.boxify()))
+    }
+
+    fn log_stats(
+        logger: Logger,
+        path: MPath,
+        nodeid: HgFileNodeId,
+        phase: &str,
+        stats: FutureStats,
+    ) {
+        let path = format!("{}", path);
+        let nodeid = format!("{}", nodeid);
+        trace!(logger, "Upload blob stats";
+            "phase" => String::from(phase),
+            "path" => path,
+            "nodeid" => nodeid,
+            "poll_count" => stats.poll_count,
+            "poll_time_us" => stats.poll_time.as_micros_unchecked(),
+            "completion_time_us" => stats.completion_time.as_micros_unchecked(),
+        );
+    }
+}
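Taken together, the new module is driven roughly as follows — a minimal sketch based only on the definitions above, with ctx, blobstore, path and the raw bytes assumed to come from the caller:

    let entry = UploadHgFileEntry {
        upload_node_id: UploadHgNodeHash::Generate,
        contents: UploadHgFileContents::RawBytes(raw_bytes),
        file_type: FileType::Regular,
        p1: None,
        p2: None,
        path, // MPath of the file being uploaded
    };
    // `upload` returns the content blob info plus a future that resolves to
    // (HgBlobEntry, RepoPath) once both envelope and content are written.
    let (cbinfo, entry_fut) = entry.upload(ctx, blobstore)?;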
@@ -10,13 +10,14 @@
 use blobrepo::BlobRepo;
 use bytes::Bytes;
 use context::CoreContext;
+use failure::{err_msg, Error};
 use fbinit::FacebookInit;
 use fixtures::{linear, many_files_dirs};
 use futures::executor::spawn;
 use futures::{Future, Stream};
 use maplit::{btreemap, hashset};
 use mercurial_types::{
-    blobs::{File, LFSContent, META_MARKER, META_SZ},
+    blobs::{filenode_lookup::FileNodeIdPointer, File, LFSContent, META_MARKER, META_SZ},
     manifest::{Content, HgEmptyManifest},
     manifest_utils::{
         changed_entry_stream_with_pruner, diff_sorted_vecs, recursive_entry_stream, ChangedEntry,
@@ -28,9 +29,10 @@ use mercurial_types::{
 };
 use mercurial_types_mocks::{
     manifest::{ContentFactory, MockEntry, MockManifest},
-    nodehash,
+    nodehash::{self, FOURS_FNID, ONES_FNID, THREES_FNID, TWOS_FNID},
 };
 use mononoke_types::hash::Sha256;
+use mononoke_types_mocks::contentid::{ONES_CTID, TWOS_CTID};
 use quickcheck::quickcheck;
 use std::{
     collections::{BTreeMap, HashMap, HashSet},
@@ -1210,3 +1212,58 @@ quickcheck! {
         }
     }
 }
+
+#[test]
+fn test_hashes_are_unique() -> Result<(), Error> {
+    let mut h = HashSet::new();
+
+    for content_id in [ONES_CTID, TWOS_CTID].iter() {
+        for p1 in [Some(ONES_FNID), Some(TWOS_FNID), None].iter() {
+            for p2 in [Some(THREES_FNID), Some(FOURS_FNID), None].iter() {
+                let path1 = RepoPath::file("path")?
+                    .into_mpath()
+                    .ok_or(err_msg("path1"))?;
+
+                let path2 = RepoPath::file("path/2")?
+                    .into_mpath()
+                    .ok_or(err_msg("path2"))?;
+
+                let path3 = RepoPath::file("path2")?
+                    .into_mpath()
+                    .ok_or(err_msg("path3"))?;
+
+                for copy_path in [path1, path2, path3].iter() {
+                    for copy_parent in [ONES_FNID, TWOS_FNID, THREES_FNID].iter() {
+                        let copy_info = Some((copy_path.clone(), copy_parent.clone()));
+
+                        let ptr = FileNodeIdPointer::new(&content_id, &copy_info, p1, p2);
+                        assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
+                        h.insert(ptr);
+
+                        if p1 == p2 {
+                            continue;
+                        }
+
+                        let ptr = FileNodeIdPointer::new(&content_id, &copy_info, p2, p1);
+                        assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
+                        h.insert(ptr);
+                    }
+                }
+
+                let ptr = FileNodeIdPointer::new(&content_id, &None, p1, p2);
+                assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
+                h.insert(ptr);
+
+                if p1 == p2 {
+                    continue;
+                }
+
+                let ptr = FileNodeIdPointer::new(&content_id, &None, p2, p1);
+                assert!(!h.contains(&ptr), format!("Duplicate entry: {:?}", ptr));
+                h.insert(ptr);
+            }
+        }
+    }
+
+    Ok(())
+}