blobrepo: simplify code for converting bonsai FileChange to hg

Summary:
Fetching the blob is still required to compute the node hash, but we don't have
to reupload it.
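
A minimal sketch of the new call pattern, using names from this diff; the
bindings repo, content_id, p1, p2, and path are assumed to be in scope:

    // The content blob is already in the blobstore, so only its metadata is
    // handed over: upload() no longer re-sends the bytes themselves.
    let upload_entry = UploadHgFileEntry {
        upload_node_id: UploadHgNodeHash::Generate,
        contents: UploadHgFileContents::ContentUploaded(ContentBlobMeta {
            id: content_id,
            // per the FIXME in the diff, copy_from needs an external
            // {ChangesetId -> HgNodeHash} mapping, so it stays None here
            copy_from: None,
        }),
        file_type: FileType::Regular,
        p1,
        p2,
        path,
    };
    // upload() now returns (ContentBlobInfo, future) instead of
    // (HgNodeHash, ContentBlobInfo, future): the node hash is computed
    // asynchronously by fetching the blob and hashing it together with the
    // generated metadata.
    let (cbinfo, entry_fut) = upload_entry.upload(&repo)?;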

Reviewed By: farnz

Differential Revision: D8508462

fbshipit-source-id: 341a1a2f82d8f8b939ebf3990b3467ed7ad9244c
Rain 2018-06-20 13:18:25 -07:00 committed by Facebook Github Bot
parent 4eca2ec2d9
commit 972822e218
7 changed files with 295 additions and 171 deletions


@@ -70,8 +70,8 @@ pub use errors::*;
pub use changeset::BlobChangeset;
pub use file::HgBlobEntry;
pub use manifest::BlobManifest;
pub use repo::{BlobRepo, ContentBlobInfo, ContentBlobMeta, CreateChangeset, UploadHgFileEntry,
UploadHgNodeHash, UploadHgTreeEntry};
pub use repo::{BlobRepo, ContentBlobInfo, ContentBlobMeta, CreateChangeset, UploadHgFileContents,
UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
pub use repo_commit::ChangesetHandle;
// TODO: This is exported for testing - is this the right place for it?
pub use repo_commit::compute_changed_files;


@@ -5,7 +5,6 @@
// GNU General Public License version 2 or any later version.
use std::collections::{BTreeMap, HashSet};
use std::io::Write;
use std::mem;
use std::path::Path;
use std::sync::{mpsc, Arc};
@@ -40,8 +39,8 @@ use mercurial_types::{Changeset, Entry, HgBlob, HgBlobNode, HgChangesetId, HgFil
HgFileNodeId, HgManifestEnvelopeMut, HgManifestId, HgNodeHash, HgParents,
Manifest, RepoPath, RepositoryId, Type};
use mercurial_types::manifest::Content;
use mononoke_types::{Blob, BlobstoreValue, BonsaiChangeset, ChangesetId, ContentId, DateTime,
FileChange, FileContents, FileType, MPath, MPathElement, MononokeId};
use mononoke_types::{Blob, BlobstoreValue, BonsaiChangeset, ContentId, DateTime, FileChange,
FileContents, FileType, MPath, MPathElement, MononokeId};
use rocksblob::Rocksblob;
use rocksdb;
use tokio_core::reactor::Core;
@@ -535,44 +534,26 @@ impl BlobRepo {
path: &MPath,
change: Option<&FileChange>,
) -> impl Future<Item = Option<HgBlobEntry>, Error = Error> + Send {
fn prepend_metadata(
content: Bytes,
_copy_from: Option<&(MPath, ChangesetId)>,
) -> Result<Bytes> {
let mut buf = Vec::new();
File::generate_copied_from(
None, //FIXME: we need external {ChangesetId -> HgNodeHash} mapping
&mut buf,
)?;
buf.write(content.as_ref())?;
Ok(buf.into())
}
match change {
None => Either::A(future::ok(None)),
Some(change) => {
let upload_future = self.fetch(change.content_id()).and_then({
let repo = self.clone();
let change = change.clone();
let path = path.clone();
move |file_content| {
let hg_content = try_boxfuture!(prepend_metadata(
file_content.into_bytes(),
change.copy_from()
));
let upload_entry = UploadHgFileEntry {
upload_node_id: UploadHgNodeHash::Generate,
contents: hg_content,
file_type: change.file_type(),
p1,
p2,
path: path,
};
let (_, _, upload_future) = try_boxfuture!(upload_entry.upload(&repo));
upload_future.map(|(entry, _)| Some(entry)).boxify()
}
});
Either::B(upload_future)
let upload_entry = UploadHgFileEntry {
upload_node_id: UploadHgNodeHash::Generate,
contents: UploadHgFileContents::ContentUploaded(ContentBlobMeta {
id: *change.content_id(),
// FIXME: need external {ChangesetID -> HgNodeHash} mapping
copy_from: None,
}),
file_type: change.file_type(),
p1,
p2,
path: path.clone(),
};
let upload_fut = match upload_entry.upload(self) {
Ok((_, upload_fut)) => upload_fut,
Err(err) => return Either::A(future::err(err)),
};
Either::B(upload_fut.map(|(entry, _)| Some(entry)))
}
}
}
@@ -827,10 +808,125 @@ impl UploadHgTreeEntry {
}
}
/// What sort of file contents are available to upload.
pub enum UploadHgFileContents {
/// Content already uploaded (or scheduled to be uploaded). Metadata will be inlined in
/// the envelope.
ContentUploaded(ContentBlobMeta),
/// Raw bytes as would be sent by Mercurial, including any metadata prepended in the standard
/// Mercurial format.
RawBytes(Bytes),
}
impl UploadHgFileContents {
/// Upload the file contents if necessary, and asynchronously return the hash of the file node
/// and metadata.
fn execute(
self,
repo: &BlobRepo,
p1: Option<HgNodeHash>,
p2: Option<HgNodeHash>,
path: MPath,
) -> (
ContentBlobInfo,
// The future that does the upload and the future that computes the node ID/metadata are
// split up to allow greater parallelism.
impl Future<Item = (), Error = Error> + Send,
impl Future<Item = (HgNodeHash, Bytes, u64), Error = Error> + Send,
) {
match self {
UploadHgFileContents::ContentUploaded(cbmeta) => {
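// The content blob is already in the blobstore (or scheduled to be), so
// there is nothing to send here; only the node hash still needs to be
// computed from the fetched contents.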
let upload_fut = future::ok(());
let compute_fut = Self::compute(cbmeta.clone(), repo, p1, p2);
let cbinfo = ContentBlobInfo { path, meta: cbmeta };
(cbinfo, Either::A(upload_fut), Either::A(compute_fut))
}
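// Raw bytes were supplied, so the node hash can be computed synchronously;
// the content blob itself still has to be uploaded.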
UploadHgFileContents::RawBytes(raw_content) => {
let node_id = Self::node_id(raw_content.clone(), p1.as_ref(), p2.as_ref());
let f = File::new(raw_content, p1.as_ref(), p2.as_ref());
let metadata = f.metadata();
let copy_from = match f.copied_from() {
Ok(copy_from) => copy_from,
// XXX error out if copy-from information couldn't be read?
Err(_err) => None,
};
// Upload the contents separately (they'll be used for bonsai changesets as well).
let contents = f.file_contents();
let size = contents.size() as u64;
let contents_blob = contents.into_blob();
let cbinfo = ContentBlobInfo {
path: path.clone(),
meta: ContentBlobMeta {
id: *contents_blob.id(),
copy_from,
},
};
let upload_fut = repo.upload_blob(contents_blob)
.map(|_content_id| ())
.timed({
let logger = repo.logger.clone();
move |stats, result| {
if result.is_ok() {
UploadHgFileEntry::log_stats(
logger,
path,
node_id,
"content_uploaded",
stats,
);
}
Ok(())
}
});
let compute_fut = future::ok((node_id, metadata, size));
(cbinfo, Either::B(upload_fut), Either::B(compute_fut))
}
}
}
fn compute(
cbmeta: ContentBlobMeta,
repo: &BlobRepo,
p1: Option<HgNodeHash>,
p2: Option<HgNodeHash>,
) -> impl Future<Item = (HgNodeHash, Bytes, u64), Error = Error> {
// Computing the file node hash requires fetching the blob and gluing it together with the
// metadata.
repo.fetch(&cbmeta.id).map(move |file_contents| {
let size = file_contents.size() as u64;
let mut metadata = Vec::new();
File::generate_metadata(cbmeta.copy_from.as_ref(), &file_contents, &mut metadata)
.expect("Vec::write_all should never fail");
let file_bytes = file_contents.into_bytes();
// XXX this is just a hash computation, so it shouldn't require a copy
let raw_content = [&metadata[..], &file_bytes[..]].concat();
let node_id = Self::node_id(raw_content, p1.as_ref(), p2.as_ref());
(node_id, Bytes::from(metadata), size)
})
}
#[inline]
fn node_id<B: Into<Bytes>>(
raw_content: B,
p1: Option<&HgNodeHash>,
p2: Option<&HgNodeHash>,
) -> HgNodeHash {
let raw_content = raw_content.into();
HgBlobNode::new(raw_content, p1, p2)
.nodeid()
.expect("contents must have data available")
}
}
/// Context for uploading a Mercurial file entry.
pub struct UploadHgFileEntry {
pub upload_node_id: UploadHgNodeHash,
pub contents: Bytes,
pub contents: UploadHgFileContents,
pub file_type: FileType,
pub p1: Option<HgNodeHash>,
pub p2: Option<HgNodeHash>,
@@ -841,13 +937,7 @@ impl UploadHgFileEntry {
pub fn upload(
self,
repo: &BlobRepo,
) -> Result<
(
HgNodeHash,
ContentBlobInfo,
BoxFuture<(HgBlobEntry, RepoPath), Error>,
),
> {
) -> Result<(ContentBlobInfo, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
STATS::upload_hg_file_entry.add_value(1);
let UploadHgFileEntry {
upload_node_id,
@@ -858,112 +948,75 @@ impl UploadHgFileEntry {
path,
} = self;
let node_id: HgNodeHash = match upload_node_id {
UploadHgNodeHash::Generate => {
HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
.nodeid()
.expect("contents must have data available")
}
UploadHgNodeHash::Supplied(node_id) => node_id,
UploadHgNodeHash::Checked(node_id) => {
let computed_node_id = HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
.nodeid()
.expect("contents must have data available");
if node_id != computed_node_id {
bail_err!(ErrorKind::InconsistentEntryHash(
RepoPath::FilePath(path),
node_id,
computed_node_id
));
}
node_id
}
};
let (cbinfo, content_upload, compute_fut) = contents.execute(repo, p1, p2, path.clone());
let content_id = cbinfo.meta.id;
// Split up the file into metadata and file contents.
let f = File::new(contents, p1.as_ref(), p2.as_ref());
let metadata = f.metadata();
let contents = f.file_contents();
let blobstore = repo.blobstore.clone();
let logger = repo.logger.clone();
// Upload the contents separately (they'll be used for bonsai changesets as well).
let copy_from = match f.copied_from() {
Ok(copy_from) => copy_from,
// XXX error out if copy-from information couldn't be read?
Err(_err) => None,
};
let content_size = contents.size() as u64;
let (cbinfo, content_upload) =
Self::upload_content_blob(contents, node_id, path.clone(), copy_from, repo)?;
let file_envelope = HgFileEnvelopeMut {
node_id,
p1,
p2,
content_id: cbinfo.meta.id,
content_size,
metadata,
};
let envelope_blob = file_envelope.freeze().into_blob();
let blobstore_key = HgFileNodeId::new(node_id).blobstore_key();
let blob_entry = HgBlobEntry::new(
repo.blobstore.clone(),
path.basename().clone(),
node_id,
Type::File(file_type),
);
let envelope_upload = repo.blobstore
.put(blobstore_key, envelope_blob.into())
.timed({
let logger = repo.logger.clone();
let path = path.clone();
move |stats, result| {
if result.is_ok() {
Self::log_stats(logger, path, node_id, "file_envelope_uploaded", stats);
let envelope_upload =
compute_fut.and_then(move |(computed_node_id, metadata, content_size)| {
let node_id = match upload_node_id {
UploadHgNodeHash::Generate => computed_node_id,
UploadHgNodeHash::Supplied(node_id) => node_id,
UploadHgNodeHash::Checked(node_id) => {
if node_id != computed_node_id {
return Either::A(future::err(
ErrorKind::InconsistentEntryHash(
RepoPath::FilePath(path),
node_id,
computed_node_id,
).into(),
));
}
node_id
}
Ok(())
}
};
let file_envelope = HgFileEnvelopeMut {
node_id,
p1,
p2,
content_id,
content_size,
metadata,
};
let envelope_blob = file_envelope.freeze().into_blob();
let blobstore_key = HgFileNodeId::new(node_id).blobstore_key();
let blob_entry = HgBlobEntry::new(
blobstore.clone(),
path.basename().clone(),
node_id,
Type::File(file_type),
);
let envelope_upload = blobstore
.put(blobstore_key, envelope_blob.into())
.timed({
let path = path.clone();
move |stats, result| {
if result.is_ok() {
Self::log_stats(
logger,
path,
node_id,
"file_envelope_uploaded",
stats,
);
}
Ok(())
}
})
.map(move |()| (blob_entry, RepoPath::FilePath(path)));
Either::B(envelope_upload)
});
let fut = envelope_upload
.join(content_upload)
.map(move |(..)| (blob_entry, RepoPath::FilePath(path)));
Ok((node_id, cbinfo, fut.boxify()))
}
fn upload_content_blob(
contents: FileContents,
node_id: HgNodeHash,
path: MPath,
copy_from: Option<(MPath, HgNodeHash)>,
repo: &BlobRepo,
) -> Result<
(
ContentBlobInfo,
impl Future<Item = (), Error = Error> + Send,
),
> {
let contents_blob = contents.into_blob();
let cbinfo = ContentBlobInfo {
path: path.clone(),
meta: ContentBlobMeta {
id: *contents_blob.id(),
copy_from,
},
};
let fut = repo.upload_blob(contents_blob).map(move |_id| ()).timed({
let logger = repo.logger.clone();
move |stats, result| {
if result.is_ok() {
Self::log_stats(logger, path, node_id, "content_uploaded", stats);
}
Ok(())
}
});
Ok((cbinfo, fut))
.map(move |(envelope_res, ())| envelope_res);
Ok((cbinfo, fut.boxify()))
}
fn log_stats(logger: Logger, path: MPath, nodeid: HgNodeHash, phase: &str, stats: Stats) {


@@ -14,10 +14,10 @@ use futures::future::Future;
use futures::stream::futures_unordered;
use futures_ext::{BoxFuture, StreamExt};
use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, UploadHgFileEntry,
UploadHgNodeHash, UploadHgTreeEntry};
use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, UploadHgFileContents,
UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
use blobstore::{EagerMemblob, LazyMemblob};
use mercurial_types::{FileType, HgNodeHash, RepoPath};
use mercurial_types::{FileType, HgBlobNode, HgNodeHash, RepoPath};
use mononoke_types::DateTime;
use std::sync::Arc;
@@ -151,15 +151,22 @@ fn upload_hg_file_entry(
p1: Option<HgNodeHash>,
p2: Option<HgNodeHash>,
) -> (HgNodeHash, BoxFuture<(HgBlobEntry, RepoPath), Error>) {
// Ideally the node id returned from upload.upload would be used, but that isn't immediately
// available -- so compute it ourselves.
let node_id = HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
.nodeid()
.expect("contents must have data available");
let upload = UploadHgFileEntry {
upload_node_id: UploadHgNodeHash::Generate,
contents,
upload_node_id: UploadHgNodeHash::Checked(node_id),
contents: UploadHgFileContents::RawBytes(contents),
file_type,
p1,
p2,
path: path.into_mpath().expect("expected a path to be present"),
};
let (node_id, _, upload_fut) = upload.upload(repo).unwrap();
let (_, upload_fut) = upload.upload(repo).unwrap();
(node_id, upload_fut)
}


@@ -16,7 +16,8 @@ use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use heapsize::HeapSizeOf;
use quickcheck::{Arbitrary, Gen};
use blobrepo::{BlobRepo, ContentBlobInfo, HgBlobEntry, UploadHgFileEntry, UploadHgNodeHash};
use blobrepo::{BlobRepo, ContentBlobInfo, HgBlobEntry, UploadHgFileContents, UploadHgFileEntry,
UploadHgNodeHash};
use mercurial_bundles::changegroup::CgDeltaChunk;
use mercurial_types::{delta, Delta, FileType, HgNodeHash, HgNodeKey, MPath, RepoPath, NULL_HASH};
@@ -58,7 +59,7 @@ impl UploadableHgBlob for Filelog {
};
let upload = UploadHgFileEntry {
upload_node_id: UploadHgNodeHash::Checked(node_key.hash),
contents: self.data,
contents: UploadHgFileContents::RawBytes(self.data),
// XXX should this really be Regular?
file_type: FileType::Regular,
p1: self.p1,
@@ -66,7 +67,7 @@ impl UploadableHgBlob for Filelog {
path,
};
let (_node, cbinfo, fut) = upload.upload(repo)?;
let (cbinfo, fut) = upload.upload(repo)?;
Ok((
node_key,
(cbinfo, fut.map_err(Error::compat).boxify().shared()),


@@ -19,7 +19,7 @@ use futures_cpupool::CpuPool;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use blobrepo::{BlobChangeset, BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry,
UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
use mercurial::{manifest, RevlogChangeset, RevlogEntry, RevlogRepo};
use mercurial_types::{HgBlob, HgChangesetId, HgManifestId, HgNodeHash, MPath, RepoPath, Type,
NULL_HASH};
@@ -187,15 +187,17 @@ fn upload_entry(
Type::File(ft) => {
let upload = UploadHgFileEntry {
upload_node_id,
contents: content
.into_inner()
.expect("contents should always be available"),
contents: UploadHgFileContents::RawBytes(
content
.into_inner()
.expect("contents should always be available"),
),
file_type: ft,
p1: p1.cloned(),
p2: p2.cloned(),
path,
};
let (_, _, upload_fut) = try_boxfuture!(upload.upload(&blobrepo));
let (_, upload_fut) = try_boxfuture!(upload.upload(&blobrepo));
upload_fut
}
}


@@ -121,17 +121,23 @@ impl File {
}
}
pub fn generate_copied_from<T>(
copy_info: Option<(MPath, HgNodeHash)>,
pub fn generate_metadata<T>(
copy_from: Option<&(MPath, HgNodeHash)>,
file_contents: &FileContents,
buf: &mut T,
) -> Result<()>
where
T: Write,
{
buf.write_all(META_MARKER)?;
match copy_info {
None => (),
match copy_from {
None => if file_contents.starts_with(META_MARKER) {
// If the file contents starts with META_MARKER, the metadata must be
// written out to avoid ambiguity.
buf.write_all(META_MARKER)?;
buf.write_all(META_MARKER)?;
},
Some((path, version)) => {
buf.write_all(META_MARKER)?;
buf.write_all(COPY_PATH_KEY)?;
buf.write_all(b": ")?;
path.generate(buf)?;
@@ -140,9 +146,9 @@ impl File {
buf.write_all(COPY_REV_KEY)?;
buf.write_all(b": ")?;
buf.write_all(version.to_hex().as_ref())?;
buf.write_all(META_MARKER)?;
}
};
buf.write_all(META_MARKER)?;
Ok(())
}
@@ -186,8 +192,9 @@ impl File {
#[cfg(test)]
mod test {
use super::{File, META_MARKER, META_SZ};
use mercurial_types::{HgNodeHash, MPath};
use super::*;
use mercurial_types_mocks::nodehash::*;
#[test]
fn extract_meta_sz() {
@@ -275,10 +282,56 @@ mod test {
)
}
#[test]
fn generate_metadata_0() {
const FILE_CONTENTS: &[u8] = b"foobar";
let file_contents = FileContents::Bytes(Bytes::from(FILE_CONTENTS));
let mut out: Vec<u8> = vec![];
File::generate_metadata(None, &file_contents, &mut out)
.expect("Vec::write_all should succeed");
assert_eq!(out.as_slice(), &b""[..]);
let mut out: Vec<u8> = vec![];
File::generate_metadata(
Some(&(MPath::new("foo").unwrap(), ONES_HASH)),
&file_contents,
&mut out,
).expect("Vec::write_all should succeed");
assert_eq!(
out.as_slice(),
&b"\x01\ncopy: foo\ncopyrev: 1111111111111111111111111111111111111111\x01\n"[..]
);
}
#[test]
fn generate_metadata_1() {
// The meta marker in the beginning should cause metadata to unconditionally be emitted.
const FILE_CONTENTS: &[u8] = b"\x01\nfoobar";
let file_contents = FileContents::Bytes(Bytes::from(FILE_CONTENTS));
let mut out: Vec<u8> = vec![];
File::generate_metadata(None, &file_contents, &mut out)
.expect("Vec::write_all should succeed");
assert_eq!(out.as_slice(), &b"\x01\n\x01\n"[..]);
let mut out: Vec<u8> = vec![];
File::generate_metadata(
Some(&(MPath::new("foo").unwrap(), ONES_HASH)),
&file_contents,
&mut out,
).expect("Vec::write_all should succeed");
assert_eq!(
out.as_slice(),
&b"\x01\ncopy: foo\ncopyrev: 1111111111111111111111111111111111111111\x01\n"[..]
);
}
quickcheck! {
fn copy_info_roundtrip(copy_info: Option<(MPath, HgNodeHash)>) -> bool {
fn copy_info_roundtrip(
copy_info: Option<(MPath, HgNodeHash)>,
contents: FileContents
) -> bool {
let mut buf = Vec::new();
let result = File::generate_copied_from(copy_info.clone(), &mut buf)
let result = File::generate_metadata(copy_info.as_ref(), &contents, &mut buf)
.and_then(|_| {
File::get_copied_from(File::parse_meta(&buf))
});


@@ -45,6 +45,14 @@ impl FileContents {
}
}
/// Whether this starts with a particular string.
#[inline]
pub fn starts_with(&self, needle: &[u8]) -> bool {
match self {
FileContents::Bytes(b) => b.starts_with(needle),
}
}
pub fn into_bytes(self) -> Bytes {
match self {
FileContents::Bytes(bytes) => bytes,