From 972822e2188b6350e16f223d8ca9c8c196144ce6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rain=20=E2=81=A3?=
Date: Wed, 20 Jun 2018 13:18:25 -0700
Subject: [PATCH] blobrepo: simplify code for converting bonsai FileChange to hg

Summary: Fetching the blob is still required to compute the node hash, but we
don't have to reupload it.
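
The new `UploadHgFileContents` enum introduced here makes the two upload modes
explicit. As a rough sketch (illustrative only, not part of the diff itself),
a caller whose content blob is already in the blobstore is expected to look
roughly like the following; the bindings `repo`, `p1`, `p2`, `path`, and
`cbmeta` are assumed to be in scope, and the trailing `?` assumes a
`Result`-returning function:

    // Hypothetical call site -- `cbmeta` is a ContentBlobMeta describing a
    // content blob that is already uploaded (or scheduled to be).
    let upload_entry = UploadHgFileEntry {
        // Compute the filenode hash from (metadata + contents, p1, p2).
        upload_node_id: UploadHgNodeHash::Generate,
        // Content is already uploaded: upload() only fetches the blob to
        // compute the node hash and metadata; it does not reupload it.
        contents: UploadHgFileContents::ContentUploaded(cbmeta),
        file_type: FileType::Regular,
        p1,
        p2,
        path,
    };
    // Returns the content blob info plus a future that uploads the envelope.
    let (cbinfo, upload_fut) = upload_entry.upload(&repo)?;
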
Reviewed By: farnz

Differential Revision: D8508462

fbshipit-source-id: 341a1a2f82d8f8b939ebf3990b3467ed7ad9244c
---
 blobrepo/src/lib.rs                         |   4 +-
 blobrepo/src/repo.rs                        | 343 +++++++++++---------
 blobrepo/test/utils.rs                      |  19 +-
 bundle2-resolver/src/changegroup/filelog.rs |   7 +-
 cmds/blobimport/changeset.rs                |  12 +-
 mercurial/src/file.rs                       |  73 ++++-
 mononoke-types/src/file_contents.rs         |   8 +
 7 files changed, 295 insertions(+), 171 deletions(-)

diff --git a/blobrepo/src/lib.rs b/blobrepo/src/lib.rs
index 502cf1bb34..f842738a6a 100644
--- a/blobrepo/src/lib.rs
+++ b/blobrepo/src/lib.rs
@@ -70,8 +70,8 @@ pub use errors::*;
 pub use changeset::BlobChangeset;
 pub use file::HgBlobEntry;
 pub use manifest::BlobManifest;
-pub use repo::{BlobRepo, ContentBlobInfo, ContentBlobMeta, CreateChangeset, UploadHgFileEntry,
-               UploadHgNodeHash, UploadHgTreeEntry};
+pub use repo::{BlobRepo, ContentBlobInfo, ContentBlobMeta, CreateChangeset, UploadHgFileContents,
+               UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
 pub use repo_commit::ChangesetHandle;
 // TODO: This is exported for testing - is this the right place for it?
 pub use repo_commit::compute_changed_files;
diff --git a/blobrepo/src/repo.rs b/blobrepo/src/repo.rs
index 5fb45e032a..9c311a1dd9 100644
--- a/blobrepo/src/repo.rs
+++ b/blobrepo/src/repo.rs
@@ -5,7 +5,6 @@
 // GNU General Public License version 2 or any later version.

 use std::collections::{BTreeMap, HashSet};
-use std::io::Write;
 use std::mem;
 use std::path::Path;
 use std::sync::{mpsc, Arc};
@@ -40,8 +39,8 @@ use mercurial_types::{Changeset, Entry, HgBlob, HgBlobNode, HgChangesetId, HgFil
                       HgFileNodeId, HgManifestEnvelopeMut, HgManifestId, HgNodeHash, HgParents,
                       Manifest, RepoPath, RepositoryId, Type};
 use mercurial_types::manifest::Content;
-use mononoke_types::{Blob, BlobstoreValue, BonsaiChangeset, ChangesetId, ContentId, DateTime,
-                     FileChange, FileContents, FileType, MPath, MPathElement, MononokeId};
+use mononoke_types::{Blob, BlobstoreValue, BonsaiChangeset, ContentId, DateTime, FileChange,
+                     FileContents, FileType, MPath, MPathElement, MononokeId};
 use rocksblob::Rocksblob;
 use rocksdb;
 use tokio_core::reactor::Core;
@@ -535,44 +534,26 @@ impl BlobRepo {
         path: &MPath,
         change: Option<&FileChange>,
     ) -> impl Future<Item = Option<HgBlobEntry>, Error = Error> + Send {
-        fn prepend_metadata(
-            content: Bytes,
-            _copy_from: Option<&(MPath, ChangesetId)>,
-        ) -> Result<Bytes> {
-            let mut buf = Vec::new();
-            File::generate_copied_from(
-                None, //FIXME: we need external {ChangesetId -> HgNodeHash} mapping
-                &mut buf,
-            )?;
-            buf.write(content.as_ref())?;
-            Ok(buf.into())
-        }
-
         match change {
             None => Either::A(future::ok(None)),
             Some(change) => {
-                let upload_future = self.fetch(change.content_id()).and_then({
-                    let repo = self.clone();
-                    let change = change.clone();
-                    let path = path.clone();
-                    move |file_content| {
-                        let hg_content = try_boxfuture!(prepend_metadata(
-                            file_content.into_bytes(),
-                            change.copy_from()
-                        ));
-                        let upload_entry = UploadHgFileEntry {
-                            upload_node_id: UploadHgNodeHash::Generate,
-                            contents: hg_content,
-                            file_type: change.file_type(),
-                            p1,
-                            p2,
-                            path: path,
-                        };
-                        let (_, _, upload_future) = try_boxfuture!(upload_entry.upload(&repo));
-                        upload_future.map(|(entry, _)| Some(entry)).boxify()
-                    }
-                });
-                Either::B(upload_future)
+                let upload_entry = UploadHgFileEntry {
+                    upload_node_id: UploadHgNodeHash::Generate,
+                    contents: UploadHgFileContents::ContentUploaded(ContentBlobMeta {
+                        id: *change.content_id(),
+                        // FIXME: need external {ChangesetId -> HgNodeHash} mapping
+                        copy_from: None,
+                    }),
+                    file_type: change.file_type(),
+                    p1,
+                    p2,
+                    path: path.clone(),
+                };
+                let upload_fut = match upload_entry.upload(self) {
+                    Ok((_, upload_fut)) => upload_fut,
+                    Err(err) => return Either::A(future::err(err)),
+                };
+                Either::B(upload_fut.map(|(entry, _)| Some(entry)))
             }
         }
     }
@@ -827,10 +808,125 @@ impl UploadHgTreeEntry {
     }
 }

+/// What sort of file contents are available to upload.
+pub enum UploadHgFileContents {
+    /// Content already uploaded (or scheduled to be uploaded). Metadata will be inlined in
+    /// the envelope.
+    ContentUploaded(ContentBlobMeta),
+    /// Raw bytes as would be sent by Mercurial, including any metadata prepended in the standard
+    /// Mercurial format.
+    RawBytes(Bytes),
+}
+
+impl UploadHgFileContents {
+    /// Upload the file contents if necessary, and asynchronously return the hash of the file node
+    /// and metadata.
+    fn execute(
+        self,
+        repo: &BlobRepo,
+        p1: Option<HgNodeHash>,
+        p2: Option<HgNodeHash>,
+        path: MPath,
+    ) -> (
+        ContentBlobInfo,
+        // The future that does the upload and the future that computes the node ID/metadata are
+        // split up to allow greater parallelism.
+        impl Future<Item = (), Error = Error> + Send,
+        impl Future<Item = (HgNodeHash, Bytes, u64), Error = Error> + Send,
+    ) {
+        match self {
+            UploadHgFileContents::ContentUploaded(cbmeta) => {
+                let upload_fut = future::ok(());
+                let compute_fut = Self::compute(cbmeta.clone(), repo, p1, p2);
+                let cbinfo = ContentBlobInfo { path, meta: cbmeta };
+                (cbinfo, Either::A(upload_fut), Either::A(compute_fut))
+            }
+            UploadHgFileContents::RawBytes(raw_content) => {
+                let node_id = Self::node_id(raw_content.clone(), p1.as_ref(), p2.as_ref());
+                let f = File::new(raw_content, p1.as_ref(), p2.as_ref());
+                let metadata = f.metadata();
+
+                let copy_from = match f.copied_from() {
+                    Ok(copy_from) => copy_from,
+                    // XXX error out if copy-from information couldn't be read?
+                    Err(_err) => None,
+                };
+
+                // Upload the contents separately (they'll be used for bonsai changesets as well).
+                let contents = f.file_contents();
+                let size = contents.size() as u64;
+                let contents_blob = contents.into_blob();
+                let cbinfo = ContentBlobInfo {
+                    path: path.clone(),
+                    meta: ContentBlobMeta {
+                        id: *contents_blob.id(),
+                        copy_from,
+                    },
+                };
+
+                let upload_fut = repo.upload_blob(contents_blob)
+                    .map(|_content_id| ())
+                    .timed({
+                        let logger = repo.logger.clone();
+                        move |stats, result| {
+                            if result.is_ok() {
+                                UploadHgFileEntry::log_stats(
+                                    logger,
+                                    path,
+                                    node_id,
+                                    "content_uploaded",
+                                    stats,
+                                );
+                            }
+                            Ok(())
+                        }
+                    });
+                let compute_fut = future::ok((node_id, metadata, size));
+
+                (cbinfo, Either::B(upload_fut), Either::B(compute_fut))
+            }
+        }
+    }
+
+    fn compute(
+        cbmeta: ContentBlobMeta,
+        repo: &BlobRepo,
+        p1: Option<HgNodeHash>,
+        p2: Option<HgNodeHash>,
+    ) -> impl Future<Item = (HgNodeHash, Bytes, u64), Error = Error> {
+        // Computing the file node hash requires fetching the blob and gluing it together with the metadata.
+        repo.fetch(&cbmeta.id).map(move |file_contents| {
+            let size = file_contents.size() as u64;
+            let mut metadata = Vec::new();
+            File::generate_metadata(cbmeta.copy_from.as_ref(), &file_contents, &mut metadata)
+                .expect("Vec::write_all should never fail");
+
+            let file_bytes = file_contents.into_bytes();
+
+            // XXX this is just a hash computation, so it shouldn't require a copy
+            let raw_content = [&metadata[..], &file_bytes[..]].concat();
+            let node_id = Self::node_id(raw_content, p1.as_ref(), p2.as_ref());
+            (node_id, Bytes::from(metadata), size)
+        })
+    }
+
+    #[inline]
+    fn node_id<B: Into<Bytes>>(
+        raw_content: B,
+        p1: Option<&HgNodeHash>,
+        p2: Option<&HgNodeHash>,
+    ) -> HgNodeHash {
+        let raw_content = raw_content.into();
+        HgBlobNode::new(raw_content, p1, p2)
+            .nodeid()
+            .expect("contents must have data available")
+    }
+}
+
 /// Context for uploading a Mercurial file entry.
 pub struct UploadHgFileEntry {
     pub upload_node_id: UploadHgNodeHash,
-    pub contents: Bytes,
+    pub contents: UploadHgFileContents,
     pub file_type: FileType,
     pub p1: Option<HgNodeHash>,
     pub p2: Option<HgNodeHash>,
@@ -841,13 +937,7 @@ impl UploadHgFileEntry {
     pub fn upload(
         self,
         repo: &BlobRepo,
-    ) -> Result<
-        (
-            HgNodeHash,
-            ContentBlobInfo,
-            BoxFuture<(HgBlobEntry, RepoPath), Error>,
-        ),
-    > {
+    ) -> Result<(ContentBlobInfo, BoxFuture<(HgBlobEntry, RepoPath), Error>)> {
         STATS::upload_hg_file_entry.add_value(1);
         let UploadHgFileEntry {
             upload_node_id,
@@ -858,112 +948,75 @@ impl UploadHgFileEntry {
             contents,
             file_type,
             p1,
             p2,
             path,
         } = self;

-        let node_id: HgNodeHash = match upload_node_id {
-            UploadHgNodeHash::Generate => {
-                HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
-                    .nodeid()
-                    .expect("contents must have data available")
-            }
-            UploadHgNodeHash::Supplied(node_id) => node_id,
-            UploadHgNodeHash::Checked(node_id) => {
-                let computed_node_id = HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
-                    .nodeid()
-                    .expect("contents must have data available");
-                if node_id != computed_node_id {
-                    bail_err!(ErrorKind::InconsistentEntryHash(
-                        RepoPath::FilePath(path),
-                        node_id,
-                        computed_node_id
-                    ));
-                }
-                node_id
-            }
-        };
+        let (cbinfo, content_upload, compute_fut) = contents.execute(repo, p1, p2, path.clone());
+        let content_id = cbinfo.meta.id;

-        // Split up the file into metadata and file contents.
-        let f = File::new(contents, p1.as_ref(), p2.as_ref());
-        let metadata = f.metadata();
-        let contents = f.file_contents();
+        let blobstore = repo.blobstore.clone();
+        let logger = repo.logger.clone();

-        // Upload the contents separately (they'll be used for bonsai changesets as well).
-        let copy_from = match f.copied_from() {
-            Ok(copy_from) => copy_from,
-            // XXX error out if copy-from information couldn't be read?
-            Err(_err) => None,
-        };
-        let content_size = contents.size() as u64;
-        let (cbinfo, content_upload) =
-            Self::upload_content_blob(contents, node_id, path.clone(), copy_from, repo)?;
-
-        let file_envelope = HgFileEnvelopeMut {
-            node_id,
-            p1,
-            p2,
-            content_id: cbinfo.meta.id,
-            content_size,
-            metadata,
-        };
-        let envelope_blob = file_envelope.freeze().into_blob();
-
-        let blobstore_key = HgFileNodeId::new(node_id).blobstore_key();
-
-        let blob_entry = HgBlobEntry::new(
-            repo.blobstore.clone(),
-            path.basename().clone(),
-            node_id,
-            Type::File(file_type),
-        );
-
-        let envelope_upload = repo.blobstore
-            .put(blobstore_key, envelope_blob.into())
-            .timed({
-                let logger = repo.logger.clone();
-                let path = path.clone();
-                move |stats, result| {
-                    if result.is_ok() {
-                        Self::log_stats(logger, path, node_id, "file_envelope_uploaded", stats);
-                    }
-                    Ok(())
-                }
+        let envelope_upload =
+            compute_fut.and_then(move |(computed_node_id, metadata, content_size)| {
+                let node_id = match upload_node_id {
+                    UploadHgNodeHash::Generate => computed_node_id,
+                    UploadHgNodeHash::Supplied(node_id) => node_id,
+                    UploadHgNodeHash::Checked(node_id) => {
+                        if node_id != computed_node_id {
+                            return Either::A(future::err(
+                                ErrorKind::InconsistentEntryHash(
+                                    RepoPath::FilePath(path),
+                                    node_id,
+                                    computed_node_id,
+                                ).into(),
+                            ));
+                        }
+                        node_id
+                    }
+                };
+
+                let file_envelope = HgFileEnvelopeMut {
+                    node_id,
+                    p1,
+                    p2,
+                    content_id,
+                    content_size,
+                    metadata,
+                };
+                let envelope_blob = file_envelope.freeze().into_blob();
+
+                let blobstore_key = HgFileNodeId::new(node_id).blobstore_key();
+
+                let blob_entry = HgBlobEntry::new(
+                    blobstore.clone(),
+                    path.basename().clone(),
+                    node_id,
+                    Type::File(file_type),
+                );
+
+                let envelope_upload = blobstore
+                    .put(blobstore_key, envelope_blob.into())
+                    .timed({
+                        let path = path.clone();
+                        move |stats, result| {
+                            if result.is_ok() {
+                                Self::log_stats(
+                                    logger,
+                                    path,
+                                    node_id,
+                                    "file_envelope_uploaded",
+                                    stats,
+                                );
+                            }
+                            Ok(())
+                        }
+                    })
+                    .map(move |()| (blob_entry, RepoPath::FilePath(path)));
+                Either::B(envelope_upload)
             });

         let fut = envelope_upload
             .join(content_upload)
-            .map(move |(..)| (blob_entry, RepoPath::FilePath(path)));
-        Ok((node_id, cbinfo, fut.boxify()))
+            .map(move |(envelope_res, ())| envelope_res);
+        Ok((cbinfo, fut.boxify()))
     }

-    fn upload_content_blob(
-        contents: FileContents,
-        node_id: HgNodeHash,
-        path: MPath,
-        copy_from: Option<(MPath, HgNodeHash)>,
-        repo: &BlobRepo,
-    ) -> Result<
-        (
-            ContentBlobInfo,
-            impl Future<Item = (), Error = Error> + Send,
-        ),
-    > {
-        let contents_blob = contents.into_blob();
-        let cbinfo = ContentBlobInfo {
-            path: path.clone(),
-            meta: ContentBlobMeta {
-                id: *contents_blob.id(),
-                copy_from,
-            },
-        };
-
-        let fut = repo.upload_blob(contents_blob).map(move |_id| ()).timed({
-            let logger = repo.logger.clone();
-            move |stats, result| {
-                if result.is_ok() {
-                    Self::log_stats(logger, path, node_id, "content_uploaded", stats);
-                }
-                Ok(())
-            }
-        });
-        Ok((cbinfo, fut))
-    }
-
     fn log_stats(logger: Logger, path: MPath, nodeid: HgNodeHash, phase: &str, stats: Stats) {
diff --git a/blobrepo/test/utils.rs b/blobrepo/test/utils.rs
index b827c1b65a..199f3c3806 100644
--- a/blobrepo/test/utils.rs
+++ b/blobrepo/test/utils.rs
@@ -14,10 +14,10 @@ use futures::future::Future;
 use futures::stream::futures_unordered;
 use futures_ext::{BoxFuture, StreamExt};

-use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, UploadHgFileEntry,
-               UploadHgNodeHash, UploadHgTreeEntry};
+use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, UploadHgFileContents,
+               UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
 use blobstore::{EagerMemblob, LazyMemblob};
-use mercurial_types::{FileType, HgNodeHash, RepoPath};
+use mercurial_types::{FileType, HgBlobNode, HgNodeHash, RepoPath};
 use mononoke_types::DateTime;

 use std::sync::Arc;
@@ -151,15 +151,22 @@ fn upload_hg_file_entry(
     p1: Option<HgNodeHash>,
     p2: Option<HgNodeHash>,
 ) -> (HgNodeHash, BoxFuture<(HgBlobEntry, RepoPath), Error>) {
+    // Ideally the node id returned from upload.upload would be used, but that isn't immediately
+    // available -- so compute it ourselves.
+    let node_id = HgBlobNode::new(contents.clone(), p1.as_ref(), p2.as_ref())
+        .nodeid()
+        .expect("contents must have data available");
+
     let upload = UploadHgFileEntry {
-        upload_node_id: UploadHgNodeHash::Generate,
-        contents,
+        upload_node_id: UploadHgNodeHash::Checked(node_id),
+        contents: UploadHgFileContents::RawBytes(contents),
         file_type,
         p1,
         p2,
         path: path.into_mpath().expect("expected a path to be present"),
     };
-    let (node_id, _, upload_fut) = upload.upload(repo).unwrap();
+
+    let (_, upload_fut) = upload.upload(repo).unwrap();
     (node_id, upload_fut)
 }

diff --git a/bundle2-resolver/src/changegroup/filelog.rs b/bundle2-resolver/src/changegroup/filelog.rs
index 4e7a2ee653..4826d4118c 100644
--- a/bundle2-resolver/src/changegroup/filelog.rs
+++ b/bundle2-resolver/src/changegroup/filelog.rs
@@ -16,7 +16,8 @@ use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
 use heapsize::HeapSizeOf;
 use quickcheck::{Arbitrary, Gen};

-use blobrepo::{BlobRepo, ContentBlobInfo, HgBlobEntry, UploadHgFileEntry, UploadHgNodeHash};
+use blobrepo::{BlobRepo, ContentBlobInfo, HgBlobEntry, UploadHgFileContents, UploadHgFileEntry,
+               UploadHgNodeHash};
 use mercurial_bundles::changegroup::CgDeltaChunk;
 use mercurial_types::{delta, Delta, FileType, HgNodeHash, HgNodeKey, MPath, RepoPath, NULL_HASH};

@@ -58,7 +59,7 @@ impl UploadableHgBlob for Filelog {
         };
         let upload = UploadHgFileEntry {
             upload_node_id: UploadHgNodeHash::Checked(node_key.hash),
-            contents: self.data,
+            contents: UploadHgFileContents::RawBytes(self.data),
             // XXX should this really be Regular?
             file_type: FileType::Regular,
             p1: self.p1,
@@ -66,7 +67,7 @@ impl UploadableHgBlob for Filelog {
             path,
         };

-        let (_node, cbinfo, fut) = upload.upload(repo)?;
+        let (cbinfo, fut) = upload.upload(repo)?;
         Ok((
             node_key,
             (cbinfo, fut.map_err(Error::compat).boxify().shared()),
diff --git a/cmds/blobimport/changeset.rs b/cmds/blobimport/changeset.rs
index bece79aecc..fe3d0d8b52 100644
--- a/cmds/blobimport/changeset.rs
+++ b/cmds/blobimport/changeset.rs
@@ -19,7 +19,7 @@ use futures_cpupool::CpuPool;
 use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};

 use blobrepo::{BlobChangeset, BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry,
-               UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
+               UploadHgFileContents, UploadHgFileEntry, UploadHgNodeHash, UploadHgTreeEntry};
 use mercurial::{manifest, RevlogChangeset, RevlogEntry, RevlogRepo};
 use mercurial_types::{HgBlob, HgChangesetId, HgManifestId, HgNodeHash, MPath, RepoPath, Type,
                       NULL_HASH};

@@ -187,15 +187,17 @@ fn upload_entry(
         Type::File(ft) => {
             let upload = UploadHgFileEntry {
                 upload_node_id,
-                contents: content
-                    .into_inner()
-                    .expect("contents should always be available"),
+                contents: UploadHgFileContents::RawBytes(
+                    content
+                        .into_inner()
+                        .expect("contents should always be available"),
+                ),
                 file_type: ft,
                 p1: p1.cloned(),
                 p2: p2.cloned(),
                 path,
             };
-            let (_, _, upload_fut) = try_boxfuture!(upload.upload(&blobrepo));
+            let (_, upload_fut) = try_boxfuture!(upload.upload(&blobrepo));
             upload_fut
         }
     }
diff --git a/mercurial/src/file.rs b/mercurial/src/file.rs
index eec402842e..d6bf960aca 100644
--- a/mercurial/src/file.rs
+++ b/mercurial/src/file.rs
@@ -121,17 +121,23 @@ impl File {
         }
     }

-    pub fn generate_copied_from<T>(
-        copy_info: Option<(MPath, HgNodeHash)>,
+    pub fn generate_metadata<T>(
+        copy_from: Option<&(MPath, HgNodeHash)>,
+        file_contents: &FileContents,
         buf: &mut T,
     ) -> Result<()>
     where
         T: Write,
     {
-        buf.write_all(META_MARKER)?;
-        match copy_info {
-            None => (),
+        match copy_from {
+            None => if file_contents.starts_with(META_MARKER) {
+                // If the file contents starts with META_MARKER, the metadata must be
+                // written out to avoid ambiguity.
+                buf.write_all(META_MARKER)?;
+                buf.write_all(META_MARKER)?;
+            },
             Some((path, version)) => {
+                buf.write_all(META_MARKER)?;
                 buf.write_all(COPY_PATH_KEY)?;
                 buf.write_all(b": ")?;
                 path.generate(buf)?;
@@ -140,9 +146,9 @@ impl File {
                 buf.write_all(COPY_REV_KEY)?;
                 buf.write_all(b": ")?;
                 buf.write_all(version.to_hex().as_ref())?;
+                buf.write_all(META_MARKER)?;
             }
         };
-        buf.write_all(META_MARKER)?;

         Ok(())
     }
@@ -186,8 +192,9 @@ impl File {

 #[cfg(test)]
 mod test {
-    use super::{File, META_MARKER, META_SZ};
-    use mercurial_types::{HgNodeHash, MPath};
+    use super::*;
+
+    use mercurial_types_mocks::nodehash::*;

     #[test]
     fn extract_meta_sz() {
@@ -275,10 +282,56 @@ mod test {
         )
     }

+    #[test]
+    fn generate_metadata_0() {
+        const FILE_CONTENTS: &[u8] = b"foobar";
+        let file_contents = FileContents::Bytes(Bytes::from(FILE_CONTENTS));
+        let mut out: Vec<u8> = vec![];
+        File::generate_metadata(None, &file_contents, &mut out)
+            .expect("Vec::write_all should succeed");
+        assert_eq!(out.as_slice(), &b""[..]);
+
+        let mut out: Vec<u8> = vec![];
+        File::generate_metadata(
+            Some(&(MPath::new("foo").unwrap(), ONES_HASH)),
+            &file_contents,
+            &mut out,
+        ).expect("Vec::write_all should succeed");
+        assert_eq!(
+            out.as_slice(),
+            &b"\x01\ncopy: foo\ncopyrev: 1111111111111111111111111111111111111111\x01\n"[..]
+ ); + } + + #[test] + fn generate_metadata_1() { + // The meta marker in the beginning should cause metadata to unconditionally be emitted. + const FILE_CONTENTS: &[u8] = b"\x01\nfoobar"; + let file_contents = FileContents::Bytes(Bytes::from(FILE_CONTENTS)); + let mut out: Vec = vec![]; + File::generate_metadata(None, &file_contents, &mut out) + .expect("Vec::write_all should succeed"); + assert_eq!(out.as_slice(), &b"\x01\n\x01\n"[..]); + + let mut out: Vec = vec![]; + File::generate_metadata( + Some(&(MPath::new("foo").unwrap(), ONES_HASH)), + &file_contents, + &mut out, + ).expect("Vec::write_all should succeed"); + assert_eq!( + out.as_slice(), + &b"\x01\ncopy: foo\ncopyrev: 1111111111111111111111111111111111111111\x01\n"[..] + ); + } + quickcheck! { - fn copy_info_roundtrip(copy_info: Option<(MPath, HgNodeHash)>) -> bool { + fn copy_info_roundtrip( + copy_info: Option<(MPath, HgNodeHash)>, + contents: FileContents + ) -> bool { let mut buf = Vec::new(); - let result = File::generate_copied_from(copy_info.clone(), &mut buf) + let result = File::generate_metadata(copy_info.as_ref(), &contents, &mut buf) .and_then(|_| { File::get_copied_from(File::parse_meta(&buf)) }); diff --git a/mononoke-types/src/file_contents.rs b/mononoke-types/src/file_contents.rs index b066f3f38c..efc6d57eb4 100644 --- a/mononoke-types/src/file_contents.rs +++ b/mononoke-types/src/file_contents.rs @@ -45,6 +45,14 @@ impl FileContents { } } + /// Whether this starts with a particular string. + #[inline] + pub fn starts_with(&self, needle: &[u8]) -> bool { + match self { + FileContents::Bytes(b) => b.starts_with(needle), + } + } + pub fn into_bytes(self) -> Bytes { match self { FileContents::Bytes(bytes) => bytes,