Create changesets API in BlobRepo

Summary:
Provide an API to ask BlobRepo to create changesets for you from
pieces that you either have to hand, or have created via upload_entry().

Parallelism is maintained in as far as possible - if you commit N changesets,
they should all upload blobs in parallel, but the final completion future
depends on the parents, so that completion order can be maintained.

The ultimate goal of this API is to ensure that only valid commits are added to the `BlobRepo` - this means that, once the future returned by `create_changeset` resolves, you have a repo with commits and blobs in place. Until then, all the pieces can be uploaded, but are not guaranteed to be accessible to clients.

Still TODO is teaching this to use the complete changesets infra so that we
simply know which changesets are fully uploaded.

Reviewed By: StanislavGlebik

Differential Revision: D6743004

fbshipit-source-id: 813329058d85c022d75388890181b48b78d2acf3
This commit is contained in:
Simon Farnsworth 2018-02-27 02:12:26 -08:00 committed by Facebook Github Bot
parent 3526e7137b
commit 5e074bdd43
9 changed files with 1087 additions and 111 deletions

View File

@ -43,13 +43,29 @@ fn cskey(changesetid: &ChangesetId) -> String {
}
impl BlobChangeset {
pub fn new(changesetid: &ChangesetId, revlogcs: RevlogChangeset) -> Self {
pub fn new(revlogcs: RevlogChangeset) -> Result<Self> {
let node = revlogcs.get_node()?;
let nodeid = node.nodeid()
.ok_or(Error::from(ErrorKind::NodeGenerationFailed))?;
let changesetid = ChangesetId::new(nodeid);
Ok(Self {
changesetid,
revlogcs,
})
}
pub fn new_with_id(changesetid: &ChangesetId, revlogcs: RevlogChangeset) -> Self {
Self {
changesetid: *changesetid,
revlogcs,
}
}
pub fn get_changeset_id(&self) -> ChangesetId {
self.changesetid
}
pub fn load(
blobstore: &Arc<Blobstore>,
changesetid: &ChangesetId,

View File

@ -11,8 +11,7 @@ use bytes::Bytes;
pub use failure::Error;
use mercurial_types::{Blob, BlobHash, NodeHash};
use mercurial_types::nodehash::ChangesetId;
use mercurial_types::{Blob, BlobHash, ChangesetId, NodeHash, Parents, RepoPath, Type};
#[derive(Debug)]
pub enum StateOpenError {
@ -48,6 +47,18 @@ pub enum ErrorKind {
#[fail(display = "Content missing nodeid {} (blob hash {:?})", _0, _1)]
ContentMissing(NodeHash, BlobHash),
#[fail(display = "Uploaded blob is incomplete {:?}", _0)] BadUploadBlob(Blob<Bytes>),
#[fail(display = "Parents are not in blob store {:?}", _0)] ParentsUnknown(Parents),
#[fail(display = "Serialization of node failed {} ({})", _0, _1)]
SerializationFailed(NodeHash, bincode::Error),
#[fail(display = "Root manifest is not a manifest (type {})", _0)] BadRootManifest(Type),
#[fail(display = "Manifest type {} does not match uploaded type {}", _0, _1)]
ManifestTypeMismatch(Type, Type),
#[fail(display = "Node generation failed for unknown reason")] NodeGenerationFailed,
#[fail(display = "Path {} appears multiple times in manifests", _0)] DuplicateEntry(RepoPath),
#[fail(display = "Duplicate manifest hash {}", _0)] DuplicateManifest(NodeHash),
#[fail(display = "Missing entries in new changeset {}", _0)] MissingEntries(NodeHash),
#[fail(display = "Some manifests do not exist")] MissingManifests,
#[fail(display = "Parents failed to complete")] ParentsFailed,
#[fail(display = "Expected {} to be a manifest, found a {} instead", _0, _1)]
NotAManifest(NodeHash, Type),
}

View File

@ -30,6 +30,7 @@ extern crate fileblob;
extern crate filebookmarks;
extern crate fileheads;
extern crate filelinknodes;
#[macro_use]
extern crate futures_ext;
extern crate heads;
extern crate linknodes;
@ -49,6 +50,7 @@ mod manifest;
mod file;
mod errors;
mod utils;
mod repo_commit;
pub use errors::*;
@ -56,6 +58,9 @@ pub use changeset::BlobChangeset;
pub use file::BlobEntry;
pub use manifest::BlobManifest;
pub use repo::BlobRepo;
pub use repo_commit::ChangesetHandle;
// TODO: This is exported for testing - is this the right place for it?
pub use repo_commit::compute_changed_files;
//
// TODO: (jsgf) T21597565 This is exposed here for blobimport -- don't use it for anything else.

View File

@ -4,17 +4,18 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::mem;
use std::path::Path;
use std::sync::Arc;
use bincode;
use bytes::Bytes;
use failure::ResultExt;
use failure::{Fail, ResultExt};
use futures::{Async, Poll};
use futures::future::Future;
use futures::stream::{self, Stream};
use futures::sync::oneshot;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use blobstore::Blobstore;
@ -27,12 +28,12 @@ use filelinknodes::FileLinknodes;
use heads::Heads;
use linknodes::Linknodes;
use manifoldblob::ManifoldBlob;
use memblob::EagerMemblob;
use memblob::{EagerMemblob, LazyMemblob};
use membookmarks::MemBookmarks;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;
use mercurial_types::{Blob, BlobNode, Changeset, ChangesetId, Entry, MPath, Manifest, NodeHash,
Parents, RepoPath, RepositoryId};
Parents, RepoPath, RepositoryId, Time};
use mercurial_types::manifest;
use mercurial_types::nodehash::ManifestId;
use rocksblob::Rocksblob;
@ -43,6 +44,7 @@ use BlobChangeset;
use BlobManifest;
use errors::*;
use file::{fetch_file_content_and_renames_from_blobstore, BlobEntry};
use repo_commit::*;
use utils::{get_node, get_node_key, RawNodeBlob};
pub struct BlobRepo {
@ -135,6 +137,24 @@ impl BlobRepo {
)
}
pub fn new_lazymemblob(
heads: MemHeads,
bookmarks: MemBookmarks,
blobstore: LazyMemblob,
linknodes: MemLinknodes,
changesets: SqliteChangesets,
repoid: RepositoryId,
) -> Self {
Self::new(
Arc::new(heads),
Arc::new(bookmarks),
Arc::new(blobstore),
Arc::new(linknodes),
Arc::new(changesets),
repoid,
)
}
pub fn new_test_manifold<T: ToString>(
bucket: T,
remote: &Remote,
@ -310,6 +330,84 @@ impl BlobRepo {
.boxify(),
))
}
/// Create a changeset in this repo. This will upload all the blobs to the underlying Blobstore
/// and ensure that the changeset is marked as "complete".
/// No attempt is made to clean up the Blobstore if the changeset creation fails
pub fn create_changeset(
&self,
p1: Option<ChangesetHandle>,
p2: Option<ChangesetHandle>,
root_manifest: BoxFuture<(BlobEntry, RepoPath), Error>,
new_child_entries: BoxStream<(BlobEntry, RepoPath), Error>,
user: String,
time: Time,
extra: BTreeMap<Vec<u8>, Vec<u8>>,
comments: String,
) -> ChangesetHandle {
let entry_processor = UploadEntries::new(self.blobstore.clone());
let (signal_parent_ready, can_be_parent) = oneshot::channel();
let upload_entries = process_entries(
self.clone(),
&entry_processor,
root_manifest,
new_child_entries,
);
let parents_complete = extract_parents_complete(&p1, &p2);
let parents_data = handle_parents(self.clone(), p1, p2);
let changeset = {
upload_entries.join(parents_data).and_then({
let linknodes = self.linknodes.clone();
let blobstore = self.blobstore.clone();
move |((root_manifest, root_hash), (parents, p1_manifest, p2_manifest))| {
compute_changed_files(
&root_manifest,
p1_manifest.as_ref(),
p2_manifest.as_ref(),
).and_then({
move |files| {
let blobcs = try_boxfuture!(make_new_changeset(
parents,
root_hash,
user,
time,
extra,
files,
comments,
));
let cs_id = blobcs.get_changeset_id().into_nodehash();
let manifest_id = *blobcs.manifestid();
blobcs
.save(blobstore)
.join(entry_processor.finalize(linknodes, cs_id))
.map(move |_| {
// We deliberately eat this error - this is only so that
// another changeset can start uploading to the blob store
// while we complete this one
let _ = signal_parent_ready.send((cs_id, manifest_id));
})
.map(move |_| blobcs)
.boxify()
}
})
}
})
};
let parents_complete =
parents_complete.map_err(|e| ErrorKind::ParentsFailed.context(e).into());
// TODO This should call the completion API before returning changeset
ChangesetHandle::new_pending(
can_be_parent.shared(),
parents_complete.and_then(|_| changeset).boxify().shared(),
)
}
}
impl Clone for BlobRepo {

414
blobrepo/src/repo_commit.rs Normal file
View File

@ -0,0 +1,414 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem;
use std::sync::{Arc, Mutex};
use futures::future::{self, Future, Shared, SharedError, SharedItem};
use futures::stream::Stream;
use futures::sync::oneshot;
use futures_ext::{BoxFuture, BoxStream, FutureExt};
use blobstore::Blobstore;
use linknodes::{ErrorKind as LinknodeErrorKind, Linknodes};
use mercurial::changeset::RevlogChangeset;
use mercurial_types::{Changeset, Entry, EntryId, MPath, Manifest, NodeHash, Parents, RepoPath,
Time};
use mercurial_types::manifest::{self, Content};
use mercurial_types::manifest_utils::{changed_entry_stream, EntryStatus};
use mercurial_types::nodehash::ManifestId;
use BlobChangeset;
use BlobRepo;
use errors::*;
use file::BlobEntry;
use utils::get_node_key;
/// A handle to a possibly incomplete BlobChangeset. This is used instead of
/// Future<Item = BlobChangeset> where we don't want to fully serialize waiting for completion.
/// For example, `create_changeset` takes these as p1/p2 so that it can handle the blobstore side
/// of creating a new changeset before its parent changesets are complete.
/// See `get_completed_changeset()` for the public API you can use to extract the final changeset
#[derive(Clone)]
pub struct ChangesetHandle {
can_be_parent: Shared<oneshot::Receiver<(NodeHash, ManifestId)>>,
completion_future: Shared<BoxFuture<BlobChangeset, Error>>,
}
impl ChangesetHandle {
pub fn new_pending(
can_be_parent: Shared<oneshot::Receiver<(NodeHash, ManifestId)>>,
completion_future: Shared<BoxFuture<BlobChangeset, Error>>,
) -> Self {
Self {
can_be_parent,
completion_future,
}
}
pub fn get_completed_changeset(self) -> Shared<BoxFuture<BlobChangeset, Error>> {
self.completion_future
}
}
impl From<BlobChangeset> for ChangesetHandle {
fn from(bcs: BlobChangeset) -> Self {
let (trigger, can_be_parent) = oneshot::channel();
// The send cannot fail at this point, barring an optimizer noticing that `can_be_parent`
// is unused and dropping early. Eat the error, as in this case, nothing is blocked waiting
// for the send
let _ = trigger.send((bcs.get_changeset_id().into_nodehash(), *bcs.manifestid()));
Self {
can_be_parent: can_be_parent.shared(),
completion_future: future::ok(bcs).boxify().shared(),
}
}
}
/// State used while tracking uploaded entries, to ensure that a changeset ends up with the right
/// set of blobs uploaded, and all linknodes present.
struct UploadEntriesState {
/// Listing of blobs that we need, based on parsing the root manifest and all the newly
/// uploaded child manifests
required_entries: HashMap<RepoPath, EntryId>,
/// All the blobs that have been uploaded in this changeset
uploaded_entries: HashMap<RepoPath, EntryId>,
/// Parent hashes (if any) of the blobs that have been uploaded in this changeset. Used for
/// validation of this upload - all parents must either have been uploaded in this changeset,
/// or be present in the blobstore before the changeset can complete.
parents: HashSet<NodeHash>,
blobstore: Arc<Blobstore>,
}
#[derive(Clone)]
pub struct UploadEntries {
inner: Arc<Mutex<UploadEntriesState>>,
}
impl UploadEntries {
pub fn new(blobstore: Arc<Blobstore>) -> Self {
Self {
inner: Arc::new(Mutex::new(UploadEntriesState {
required_entries: HashMap::new(),
uploaded_entries: HashMap::new(),
parents: HashSet::new(),
blobstore,
})),
}
}
/// Parse a manifest and record the referenced blobs so that we know whether or not we have
/// a complete changeset with all blobs, or whether there is missing data.
fn process_manifest(&self, entry: &BlobEntry, path: RepoPath) -> BoxFuture<(), Error> {
let inner_mutex = self.inner.clone();
let parents_found = self.find_parents(entry);
let entry_hash = entry.get_hash().into_nodehash();
let entry_type = entry.get_type();
entry
.get_content()
.and_then(move |content| match content {
Content::Tree(manifest) => manifest
.list()
.for_each(move |entry| {
let mpath = path.mpath()
.unwrap_or(&MPath::empty())
.join_element(entry.get_name());
let path = try_boxfuture!(match entry.get_type() {
manifest::Type::File
| manifest::Type::Symlink
| manifest::Type::Executable => RepoPath::file(mpath),
manifest::Type::Tree => RepoPath::dir(mpath),
});
let mut inner = inner_mutex.lock().expect("Lock poisoned");
inner.required_entries.insert(path, *entry.get_hash());
future::ok(()).boxify()
})
.boxify(),
_ => {
return future::err(ErrorKind::NotAManifest(entry_hash, entry_type).into())
.boxify()
}
})
.join(parents_found)
.map(|_| ())
.boxify()
}
fn find_parents(&self, entry: &BlobEntry) -> BoxFuture<(), Error> {
let inner_mutex = self.inner.clone();
entry
.get_parents()
.and_then(move |parents| {
let mut inner = inner_mutex.lock().expect("Lock poisoned");
inner.parents.extend(parents.into_iter());
future::ok(())
})
.map(|_| ())
.boxify()
}
/// The root manifest needs special processing - unlike all other entries, it is required even
/// if no other manifest references it. Otherwise, this function is the same as
/// `process_one_entry` and can be called after it.
/// It is safe to call this multiple times, but not recommended - every manifest passed to
/// this function is assumed required for this commit, even if it is not the root.
pub fn process_root_manifest(&self, entry: &BlobEntry) -> BoxFuture<(), Error> {
if entry.get_type() != manifest::Type::Tree {
return future::err(
ErrorKind::NotAManifest(entry.get_hash().into_nodehash(), entry.get_type()).into(),
).boxify();
}
{
let mut inner = self.inner.lock().expect("Lock poisoned");
inner
.required_entries
.insert(RepoPath::root(), *entry.get_hash());
}
self.process_one_entry(entry, RepoPath::root())
}
pub fn process_one_entry(&self, entry: &BlobEntry, path: RepoPath) -> BoxFuture<(), Error> {
{
let mut inner = self.inner.lock().expect("Lock poisoned");
inner
.uploaded_entries
.insert(path.clone(), *entry.get_hash());
}
if entry.get_type() == manifest::Type::Tree {
self.process_manifest(entry, path)
} else {
self.find_parents(&entry)
}
}
pub fn finalize(self, linknodes: Arc<Linknodes>, cs_id: NodeHash) -> BoxFuture<(), Error> {
let required_checks = {
let inner = self.inner.lock().expect("Lock poisoned");
let checks: Vec<_> = inner
.required_entries
.iter()
.filter_map(|(path, entryid)| {
if inner.uploaded_entries.contains_key(path) {
None
} else {
let key = get_node_key(entryid.into_nodehash());
let blobstore = inner.blobstore.clone();
Some(blobstore.assert_present(key))
}
})
.collect();
future::join_all(checks).boxify()
};
let parent_checks = {
let inner = self.inner.lock().expect("Lock poisoned");
let checks: Vec<_> = inner
.parents
.iter()
.map(|nodeid| {
let key = get_node_key(*nodeid);
let blobstore = inner.blobstore.clone();
blobstore.assert_present(key)
})
.collect();
future::join_all(checks).boxify()
};
let linknodes = {
let mut inner = self.inner.lock().expect("Lock poisoned");
let uploaded_entries = mem::replace(&mut inner.uploaded_entries, HashMap::new());
let futures = uploaded_entries.into_iter().map(move |(path, entryid)| {
linknodes
.add(path, &entryid.into_nodehash(), &cs_id)
.or_else(|err| match err.downcast_ref::<LinknodeErrorKind>() {
Some(&LinknodeErrorKind::AlreadyExists { .. }) => future::ok(()),
_ => future::err(err),
})
});
future::join_all(futures).boxify()
};
parent_checks
.join3(required_checks, linknodes)
.map(|_| ())
.boxify()
}
}
fn compute_changed_files_pair(
to: &Box<Manifest + Sync>,
from: &Box<Manifest + Sync>,
) -> BoxFuture<HashSet<MPath>, Error> {
changed_entry_stream(to, from, MPath::empty())
.filter_map(|change| match change.status {
EntryStatus::Deleted(entry)
| EntryStatus::Added(entry)
| EntryStatus::Modified(entry, _) => {
if entry.get_type() == manifest::Type::Tree {
None
} else {
Some(change.path.join_element(entry.get_name()))
}
}
})
.fold(HashSet::new(), |mut set, path| {
set.insert(path);
future::ok::<_, Error>(set)
})
.boxify()
}
pub fn compute_changed_files(
root: &Box<Manifest + Sync>,
p1: Option<&Box<Manifest + Sync>>,
p2: Option<&Box<Manifest + Sync>>,
) -> BoxFuture<Vec<MPath>, Error> {
let empty = manifest::EmptyManifest {}.boxed();
match (p1, p2) {
(None, None) => compute_changed_files_pair(&root, &empty),
(Some(manifest), None) | (None, Some(manifest)) => {
compute_changed_files_pair(&root, &manifest)
}
(Some(p1), Some(p2)) => compute_changed_files_pair(&root, &p1)
.join(compute_changed_files_pair(&root, &p2))
.map(|(left, right)| {
left.symmetric_difference(&right)
.cloned()
.collect::<HashSet<MPath>>()
})
.boxify(),
}.map(|files| {
let mut files: Vec<MPath> = files.into_iter().collect();
files.sort_unstable();
files
})
.boxify()
}
pub fn process_entries(
repo: BlobRepo,
entry_processor: &UploadEntries,
root_manifest: BoxFuture<(BlobEntry, RepoPath), Error>,
new_child_entries: BoxStream<(BlobEntry, RepoPath), Error>,
) -> BoxFuture<(Box<Manifest + Sync>, ManifestId), Error> {
root_manifest
.and_then({
let entry_processor = entry_processor.clone();
move |(entry, path)| {
let hash = entry.get_hash().into_nodehash();
if entry.get_type() == manifest::Type::Tree && path == RepoPath::RootPath {
entry_processor
.process_root_manifest(&entry)
.map(move |_| hash)
.boxify()
} else {
future::err(Error::from(ErrorKind::BadRootManifest(entry.get_type()))).boxify()
}
}
})
.and_then({
let entry_processor = entry_processor.clone();
|hash| {
new_child_entries
.for_each(move |(entry, path)| entry_processor.process_one_entry(&entry, path))
.map(move |_| hash)
}
})
.and_then(move |root_hash| {
repo.get_manifest_by_nodeid(&root_hash)
.map(move |m| (m, ManifestId::new(root_hash)))
})
.boxify()
}
pub fn extract_parents_complete(
p1: &Option<ChangesetHandle>,
p2: &Option<ChangesetHandle>,
) -> BoxFuture<SharedItem<()>, SharedError<Error>> {
match (p1.as_ref(), p2.as_ref()) {
(None, None) => future::ok(()).shared().boxify(),
(Some(p), None) | (None, Some(p)) => p.completion_future
.clone()
.and_then(|_| future::ok(()).shared())
.boxify(),
(Some(p1), Some(p2)) => p1.completion_future
.clone()
.join(p2.completion_future.clone())
.and_then(|_| future::ok(()).shared())
.boxify(),
}.boxify()
}
pub fn handle_parents(
repo: BlobRepo,
p1: Option<ChangesetHandle>,
p2: Option<ChangesetHandle>,
) -> BoxFuture<
(
Parents,
(Option<Box<Manifest + Sync>>),
(Option<Box<Manifest + Sync>>),
),
Error,
> {
let p1 = p1.map(|cs| cs.can_be_parent);
let p2 = p2.map(|cs| cs.can_be_parent);
p1.join(p2)
.and_then(|(p1, p2)| {
let p1 = match p1 {
Some(item) => {
let (hash, manifest) = *item;
(Some(hash), Some(manifest))
}
None => (None, None),
};
let p2 = match p2 {
Some(item) => {
let (hash, manifest) = *item;
(Some(hash), Some(manifest))
}
None => (None, None),
};
future::ok((p1, p2))
})
.map_err(|e| Error::from(e))
.and_then(move |((p1_hash, p1_manifest), (p2_hash, p2_manifest))| {
let parents = Parents::new(p1_hash.as_ref(), p2_hash.as_ref());
let p1_manifest = p1_manifest.map(|m| repo.get_manifest_by_nodeid(&m.into_nodehash()));
let p2_manifest = p2_manifest.map(|m| repo.get_manifest_by_nodeid(&m.into_nodehash()));
p1_manifest
.join(p2_manifest)
.map(move |(p1_manifest, p2_manifest)| (parents, p1_manifest, p2_manifest))
})
.boxify()
}
pub fn make_new_changeset(
parents: Parents,
root_hash: ManifestId,
user: String,
time: Time,
extra: BTreeMap<Vec<u8>, Vec<u8>>,
files: Vec<MPath>,
comments: String,
) -> Result<BlobChangeset> {
let changeset = RevlogChangeset::new_from_parts(
parents,
root_hash,
user.into_bytes(),
time,
extra,
files,
comments.into_bytes(),
);
BlobChangeset::new(changeset)
}

View File

@ -8,31 +8,41 @@
extern crate ascii;
extern crate bytes;
extern crate failure_ext as failure;
extern crate futures;
extern crate futures_ext;
extern crate blobrepo;
extern crate changesets;
extern crate many_files_dirs;
extern crate memblob;
extern crate membookmarks;
extern crate memheads;
extern crate memlinknodes;
extern crate mercurial_types;
use std::collections::BTreeMap;
use ascii::AsAsciiStr;
use bytes::Bytes;
use failure::Error;
use futures::executor::spawn;
use futures::future::Future;
use futures::stream::futures_unordered;
use futures_ext::{BoxFuture, StreamExt};
use blobrepo::BlobRepo;
use blobrepo::{compute_changed_files, BlobEntry, BlobRepo, ChangesetHandle};
use changesets::SqliteChangesets;
use memblob::EagerMemblob;
use memblob::{EagerMemblob, LazyMemblob};
use membookmarks::MemBookmarks;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;
use mercurial_types::{manifest, Blob, Entry, EntryId, MPathElement, NodeHash, RepoPath,
RepositoryId};
use mercurial_types::{manifest, Blob, Changeset, ChangesetId, Entry, EntryId, MPath, MPathElement,
ManifestId, NodeHash, RepoPath, RepositoryId, Time};
fn get_empty_repo() -> BlobRepo {
// We start with utilities - search down for "TESTS BEGIN" to find the actual test cases
fn get_empty_eager_repo() -> BlobRepo {
let bookmarks: MemBookmarks = MemBookmarks::new();
let heads: MemHeads = MemHeads::new();
let blobs = EagerMemblob::new();
@ -43,126 +53,516 @@ fn get_empty_repo() -> BlobRepo {
BlobRepo::new_memblob(heads, bookmarks, blobs, linknodes, changesets, repoid)
}
#[test]
fn upload_blob_no_parents() {
let repo = get_empty_repo();
fn get_empty_lazy_repo() -> BlobRepo {
let bookmarks: MemBookmarks = MemBookmarks::new();
let heads: MemHeads = MemHeads::new();
let blobs = LazyMemblob::new();
let linknodes = MemLinknodes::new();
let changesets = SqliteChangesets::in_memory().expect("cannot create in memory changesets");
let repoid = RepositoryId::new(0);
let blob: Blob<Bytes> = Bytes::from(&b"blob"[..]).into();
let expected_hash = NodeHash::from_ascii_str(
"c3127cdbf2eae0f09653f9237d85c8436425b246"
.as_ascii_str()
.unwrap(),
).unwrap();
BlobRepo::new_lazymemblob(heads, bookmarks, blobs, linknodes, changesets, repoid)
}
macro_rules! test_both_repotypes {
($impl_name:ident, $lazy_test:ident, $eager_test:ident) => {
#[test]
fn $lazy_test() {
$impl_name(get_empty_lazy_repo())
}
#[test]
fn $eager_test() {
$impl_name(get_empty_eager_repo())
}
};
(should_panic, $impl_name:ident, $lazy_test:ident, $eager_test:ident) => {
#[test]
#[should_panic]
fn $lazy_test() {
$impl_name(get_empty_lazy_repo())
}
#[test]
#[should_panic]
fn $eager_test() {
$impl_name(get_empty_eager_repo())
}
}
}
fn upload_file_no_parents<S>(
repo: &BlobRepo,
data: S,
path: &RepoPath,
) -> (NodeHash, BoxFuture<(BlobEntry, RepoPath), Error>)
where
S: Into<String>,
{
let blob: Blob<Bytes> = Bytes::from(data.into().as_bytes()).into();
repo.upload_entry(blob, manifest::Type::File, None, None, path.clone())
.unwrap()
}
fn upload_file_one_parent<S>(
repo: &BlobRepo,
data: S,
path: &RepoPath,
p1: NodeHash,
) -> (NodeHash, BoxFuture<(BlobEntry, RepoPath), Error>)
where
S: Into<String>,
{
let blob: Blob<Bytes> = Bytes::from(data.into().as_bytes()).into();
repo.upload_entry(blob, manifest::Type::File, Some(p1), None, path.clone())
.unwrap()
}
fn upload_manifest_no_parents<S>(
repo: &BlobRepo,
data: S,
path: &RepoPath,
) -> (NodeHash, BoxFuture<(BlobEntry, RepoPath), Error>)
where
S: Into<String>,
{
let blob: Blob<Bytes> = Bytes::from(data.into().as_bytes()).into();
repo.upload_entry(blob, manifest::Type::Tree, None, None, path.clone())
.unwrap()
}
fn upload_manifest_one_parent<S>(
repo: &BlobRepo,
data: S,
path: &RepoPath,
p1: NodeHash,
) -> (NodeHash, BoxFuture<(BlobEntry, RepoPath), Error>)
where
S: Into<String>,
{
let blob: Blob<Bytes> = Bytes::from(data.into().as_bytes()).into();
repo.upload_entry(blob, manifest::Type::Tree, Some(p1), None, path.clone())
.unwrap()
}
fn create_changeset_no_parents(
repo: &BlobRepo,
root_manifest: BoxFuture<(BlobEntry, RepoPath), Error>,
other_nodes: Vec<BoxFuture<(BlobEntry, RepoPath), Error>>,
) -> ChangesetHandle {
repo.create_changeset(
None,
None,
root_manifest,
futures_unordered(other_nodes).boxify(),
"author <author@fb.com>".into(),
Time { time: 0, tz: 0 },
BTreeMap::new(),
"Test commit".into(),
)
}
fn create_changeset_one_parent(
repo: &BlobRepo,
root_manifest: BoxFuture<(BlobEntry, RepoPath), Error>,
other_nodes: Vec<BoxFuture<(BlobEntry, RepoPath), Error>>,
p1: ChangesetHandle,
) -> ChangesetHandle {
repo.create_changeset(
Some(p1),
None,
root_manifest,
futures_unordered(other_nodes).boxify(),
"\u{041F}\u{0451}\u{0442}\u{0440} <peter@fb.com>".into(),
Time { time: 1234, tz: 0 },
BTreeMap::new(),
"Child commit".into(),
)
}
fn string_to_nodehash(hash: &str) -> NodeHash {
NodeHash::from_ascii_str(hash.as_ascii_str().unwrap()).unwrap()
}
fn run_future<F>(future: F) -> Result<F::Item, F::Error>
where
F: Future,
{
spawn(future).wait_future()
}
// TESTS BEGIN
fn upload_blob_no_parents(repo: BlobRepo) {
let expected_hash = string_to_nodehash("c3127cdbf2eae0f09653f9237d85c8436425b246");
let fake_path = RepoPath::file("fake/file").expect("Can't generate fake RepoPath");
// The blob does not exist...
assert!(
spawn(repo.get_file_content(&expected_hash))
.wait_future()
.is_err()
);
assert!(run_future(repo.get_file_content(&expected_hash)).is_err());
// We upload it...
let (hash, future) =
repo.upload_entry(blob, manifest::Type::File, None, None, fake_path.clone())
.unwrap();
let (hash, future) = upload_file_no_parents(&repo, "blob", &fake_path);
assert!(hash == expected_hash);
// The entry we're given is correct...
assert!(
spawn(
future
.and_then(|(entry, path)| {
assert!(path == fake_path);
assert!(entry.get_hash() == &EntryId::new(expected_hash));
assert!(entry.get_type() == manifest::Type::File);
assert!(entry.get_name() == &Some(MPathElement::new("file".into())));
entry.get_content()
})
.map(|content| match content {
manifest::Content::File(f) => assert!(f == b"blob"[..].into()),
_ => panic!(),
}),
).wait_future()
.is_ok()
);
let (entry, path) = run_future(future).unwrap();
assert!(path == fake_path);
assert!(entry.get_hash() == &EntryId::new(expected_hash));
assert!(entry.get_type() == manifest::Type::File);
assert!(entry.get_name() == &Some(MPathElement::new("file".into())));
let content = run_future(entry.get_content()).unwrap();
match content {
manifest::Content::File(f) => assert!(f == b"blob"[..].into()),
_ => panic!(),
};
// And the blob now exists
let bytes = run_future(repo.get_file_content(&expected_hash)).unwrap();
assert!(bytes == b"blob");
}
test_both_repotypes!(
upload_blob_no_parents,
upload_blob_no_parents_lazy,
upload_blob_no_parents_eager
);
fn upload_blob_one_parent(repo: BlobRepo) {
let expected_hash = string_to_nodehash("c2d60b35a8e7e034042a9467783bbdac88a0d219");
let fake_path = RepoPath::file("fake/file").expect("Can't generate fake RepoPath");
let (p1, future) = upload_file_no_parents(&repo, "blob", &fake_path);
// The blob does not exist...
run_future(repo.get_file_content(&expected_hash)).is_err();
// We upload it...
let (hash, future2) = upload_file_one_parent(&repo, "blob", &fake_path, p1);
assert!(hash == expected_hash);
// The entry we're given is correct...
let (entry, path) = run_future(future2.join(future).map(|(item, _)| item)).unwrap();
assert!(path == fake_path);
assert!(entry.get_hash() == &EntryId::new(expected_hash));
assert!(entry.get_type() == manifest::Type::File);
assert!(entry.get_name() == &Some(MPathElement::new("file".into())));
let content = run_future(entry.get_content()).unwrap();
match content {
manifest::Content::File(f) => assert!(f == b"blob"[..].into()),
_ => panic!(),
};
// And the blob now exists
let bytes = run_future(repo.get_file_content(&expected_hash)).unwrap();
assert!(bytes == b"blob");
}
test_both_repotypes!(
upload_blob_one_parent,
upload_blob_one_parent_lazy,
upload_blob_one_parent_eager
);
fn create_one_changeset(repo: BlobRepo) {
let fake_file_path = RepoPath::file("file").expect("Can't generate fake RepoPath");
let fake_dir_path = RepoPath::dir("dir").expect("Can't generate fake RepoPath");
let expected_files = vec![
RepoPath::file("dir/file")
.expect("Can't generate fake RepoPath")
.mpath()
.unwrap()
.clone(),
];
let author: String = "author <author@fb.com>".into();
let (filehash, file_future) = upload_file_no_parents(&repo, "blob", &fake_file_path);
let (dirhash, manifest_dir_future) =
upload_manifest_no_parents(&repo, format!("file\0{}\n", filehash), &fake_dir_path);
let (roothash, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
let commit = create_changeset_no_parents(
&repo,
root_manifest_future,
vec![file_future, manifest_dir_future],
);
let cs = run_future(commit.get_completed_changeset()).unwrap();
assert!(cs.manifestid() == &ManifestId::new(roothash));
assert!(cs.user() == author.as_bytes());
assert!(cs.parents().get_nodes() == (None, None));
let files: Vec<_> = cs.files().into();
assert!(
spawn(
repo.get_file_content(&expected_hash)
.map(|bytes| assert!(bytes == b"blob"))
).wait_future()
.is_ok()
files == expected_files,
format!("Got {:?}, expected {:?}", files, expected_files)
);
// And check the file blob is present
let bytes = run_future(repo.get_file_content(&filehash)).unwrap();
assert!(bytes == b"blob");
}
test_both_repotypes!(
create_one_changeset,
create_one_changeset_lazy,
create_one_changeset_eager
);
fn create_two_changesets(repo: BlobRepo) {
let fake_file_path = RepoPath::file("file").expect("Can't generate fake RepoPath");
let fake_dir_path = RepoPath::file("dir").expect("Can't generate fake RepoPath");
let utf_author: String = "\u{041F}\u{0451}\u{0442}\u{0440} <peter@fb.com>".into();
let (filehash, file_future) = upload_file_no_parents(&repo, "blob", &fake_file_path);
let (dirhash, manifest_dir_future) =
upload_manifest_no_parents(&repo, format!("file\0{}\n", filehash), &fake_dir_path);
let (roothash, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
let commit1 = create_changeset_no_parents(
&repo,
root_manifest_future,
vec![file_future, manifest_dir_future],
);
let (roothash, root_manifest_future) = upload_manifest_one_parent(
&repo,
format!("file\0{}\n", filehash),
&RepoPath::root(),
roothash,
);
let commit2 = create_changeset_one_parent(&repo, root_manifest_future, vec![], commit1.clone());
let (commit1, commit2) = run_future(
commit1
.get_completed_changeset()
.join(commit2.get_completed_changeset()),
).unwrap();
assert!(commit2.manifestid() == &ManifestId::new(roothash));
assert!(commit2.user() == utf_author.as_bytes());
let files: Vec<_> = commit2.files().into();
let expected_files = vec![MPath::new("dir/file").unwrap(), MPath::new("file").unwrap()];
assert!(
files == expected_files,
format!("Got {:?}, expected {:?}", files, expected_files)
);
assert!(commit1.parents().get_nodes() == (None, None));
let commit1_id = Some(commit1.get_changeset_id().into_nodehash());
let expected_parents = (commit1_id.as_ref(), None);
assert!(commit2.parents().get_nodes() == expected_parents);
let linknode = run_future(repo.get_linknode(fake_file_path, &filehash)).unwrap();
assert!(
linknode == commit1.get_changeset_id().into_nodehash(),
"Bad linknode {} - should be {}",
linknode,
commit1.get_changeset_id().into_nodehash()
);
}
#[test]
fn upload_blob_one_parent() {
let repo = get_empty_repo();
test_both_repotypes!(
create_two_changesets,
create_two_changesets_lazy,
create_two_changesets_eager
);
let blob: Blob<Bytes> = Bytes::from(&b"blob"[..]).into();
let expected_hash = NodeHash::from_ascii_str(
"c2d60b35a8e7e034042a9467783bbdac88a0d219"
.as_ascii_str()
.unwrap(),
).unwrap();
let fake_path = RepoPath::file("fake/file").expect("Can't generate fake RepoPath");
fn create_bad_changeset(repo: BlobRepo) {
let dirhash = string_to_nodehash("c2d60b35a8e7e034042a9467783bbdac88a0d219");
let (p1, future) = repo.upload_entry(
blob.clone(),
manifest::Type::File,
None,
None,
fake_path.clone(),
).unwrap();
let (_, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
// The blob does not exist...
assert!(
spawn(repo.get_file_content(&expected_hash))
.wait_future()
.is_err()
);
let commit = create_changeset_no_parents(&repo, root_manifest_future, vec![]);
// We upload it...
let future = future.map(|_| {
repo.upload_entry(
blob,
manifest::Type::File,
Some(p1),
None,
fake_path.clone(),
run_future(commit.get_completed_changeset()).unwrap();
}
test_both_repotypes!(
should_panic,
create_bad_changeset,
create_bad_changeset_lazy,
create_bad_changeset_eager
);
fn create_double_linknode(repo: BlobRepo) {
let fake_file_path = RepoPath::file("file").expect("Can't generate fake RepoPath");
let fake_dir_path = RepoPath::dir("dir").expect("Can't generate fake RepoPath");
let (filehash, parent_commit) = {
let (filehash, file_future) = upload_file_no_parents(&repo, "blob", &fake_file_path);
let (dirhash, manifest_dir_future) =
upload_manifest_no_parents(&repo, format!("file\0{}\n", filehash), &fake_dir_path);
let (_, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
(
filehash,
create_changeset_no_parents(
&repo,
root_manifest_future,
vec![manifest_dir_future, file_future],
),
)
});
let (hash, future) = spawn(future).wait_future().unwrap().unwrap();
};
assert!(hash == expected_hash);
let child_commit = {
let (filehash, file_future) = upload_file_no_parents(&repo, "blob", &fake_file_path);
// The entry we're given is correct...
let (dirhash, manifest_dir_future) =
upload_manifest_no_parents(&repo, format!("file\0{}\n", filehash), &fake_dir_path);
let (_, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
create_changeset_one_parent(
&repo,
root_manifest_future,
vec![manifest_dir_future, file_future],
parent_commit.clone(),
)
};
let child = run_future(child_commit.get_completed_changeset()).unwrap();
let parent = run_future(parent_commit.get_completed_changeset()).unwrap();
let linknode = run_future(repo.get_linknode(fake_file_path, &filehash)).unwrap();
assert!(
spawn(
future
.and_then(|(entry, path)| {
assert!(path == fake_path);
assert!(entry.get_hash() == &EntryId::new(expected_hash));
assert!(entry.get_type() == manifest::Type::File);
assert!(entry.get_name() == &Some(MPathElement::new("file".into())));
entry.get_content()
})
.map(|content| match content {
manifest::Content::File(f) => assert!(f == b"blob"[..].into()),
_ => panic!(),
}),
).wait_future()
.is_ok()
linknode != child.get_changeset_id().into_nodehash(),
"Linknode on child commit = should be on parent"
);
// And the blob now exists
assert!(
spawn(
repo.get_file_content(&expected_hash)
.map(|bytes| assert!(bytes == b"blob"))
).wait_future()
.is_ok()
linknode == parent.get_changeset_id().into_nodehash(),
"Linknode not on parent commit - ended up on {} instead",
linknode
);
}
test_both_repotypes!(
create_double_linknode,
create_double_linknode_lazy,
create_double_linknode_eager
);
fn check_linknode_creation(repo: BlobRepo) {
let fake_dir_path = RepoPath::dir("dir").expect("Can't generate fake RepoPath");
let author: String = "author <author@fb.com>".into();
let files: Vec<_> = (1..100)
.into_iter()
.map(|id| {
let path = RepoPath::file(
MPath::new(format!("file{}", id)).expect("String to MPath failed"),
).expect("Can't generate fake RepoPath");
let (hash, future) = upload_file_no_parents(&repo, format!("blob id {}", id), &path);
((hash, path), future)
})
.collect();
let (metadata, mut uploads): (Vec<_>, Vec<_>) = files.into_iter().unzip();
let manifest = metadata
.iter()
.fold(String::new(), |mut acc, &(hash, ref path)| {
acc.push_str(format!("{}\0{}\n", path, hash).as_str());
acc
});
let (dirhash, manifest_dir_future) =
upload_manifest_no_parents(&repo, manifest, &fake_dir_path);
let (roothash, root_manifest_future) =
upload_manifest_no_parents(&repo, format!("dir\0{}t\n", dirhash), &RepoPath::root());
uploads.push(manifest_dir_future);
let commit = create_changeset_no_parents(&repo, root_manifest_future, uploads);
let cs = run_future(commit.get_completed_changeset()).unwrap();
assert!(cs.manifestid() == &ManifestId::new(roothash));
assert!(cs.user() == author.as_bytes());
assert!(cs.parents().get_nodes() == (None, None));
let cs_id = cs.get_changeset_id().into_nodehash();
// And check all the linknodes got created
metadata.into_iter().for_each(|(hash, path)| {
let linknode = run_future(repo.get_linknode(path, &hash)).unwrap();
assert!(
linknode == cs_id,
"Linknode is {}, should be {}",
linknode,
cs_id
);
})
}
test_both_repotypes!(
check_linknode_creation,
check_linknode_creation_lazy,
check_linknode_creation_eager
);
#[test]
fn test_compute_changed_files_no_parents() {
let repo = many_files_dirs::getrepo();
let nodehash = string_to_nodehash("a6cb7dddec32acaf9a28db46cdb3061682155531");
let expected = vec![
MPath::new(b"1").unwrap(),
MPath::new(b"2").unwrap(),
MPath::new(b"dir1").unwrap(),
MPath::new(b"dir2/file_1_in_dir2").unwrap(),
];
let cs = run_future(repo.get_changeset_by_changesetid(&ChangesetId::new(nodehash))).unwrap();
let mf = run_future(repo.get_manifest_by_nodeid(&cs.manifestid().into_nodehash())).unwrap();
let diff = run_future(compute_changed_files(&mf, None, None)).unwrap();
assert!(
diff == expected,
"Got {:?}, expected {:?}\n",
diff,
expected,
);
}
#[test]
fn test_compute_changed_files_one_parent() {
// Note that this is a commit and its parent commit, so you can use:
// hg log -T"{node}\n{files % ' MPath::new(b\"{file}\").unwrap(),\\n'}\\n" -r $HASH
// to see how Mercurial would compute the files list and confirm that it's the same
let repo = many_files_dirs::getrepo();
let nodehash = string_to_nodehash("a6cb7dddec32acaf9a28db46cdb3061682155531");
let parenthash = string_to_nodehash("473b2e715e0df6b2316010908879a3c78e275dd9");
let expected = vec![
MPath::new(b"dir1").unwrap(),
MPath::new(b"dir1/file_1_in_dir1").unwrap(),
MPath::new(b"dir1/file_2_in_dir1").unwrap(),
MPath::new(b"dir1/subdir1/file_1").unwrap(),
MPath::new(b"dir1/subdir1/subsubdir1/file_1").unwrap(),
MPath::new(b"dir1/subdir1/subsubdir2/file_1").unwrap(),
MPath::new(b"dir1/subdir1/subsubdir2/file_2").unwrap(),
];
let cs = run_future(repo.get_changeset_by_changesetid(&ChangesetId::new(nodehash))).unwrap();
let mf = run_future(repo.get_manifest_by_nodeid(&cs.manifestid().into_nodehash())).unwrap();
let parent_cs =
run_future(repo.get_changeset_by_changesetid(&ChangesetId::new(parenthash))).unwrap();
let parent_mf =
run_future(repo.get_manifest_by_nodeid(&parent_cs.manifestid().into_nodehash())).unwrap();
let diff = run_future(compute_changed_files(&mf, Some(&parent_mf), None)).unwrap();
assert!(
diff == expected,
"Got {:?}, expected {:?}\n",
diff,
expected,
);
}

View File

@ -132,7 +132,7 @@ where
.get_changeset_by_changesetid(&csid)
.from_err()
.and_then(move |cs| {
let bcs = BlobChangeset::new(&csid, cs);
let bcs = BlobChangeset::new_with_id(&csid, cs);
sender
.send(BlobstoreEntry::Changeset(bcs))
.map_err(Error::from)

View File

@ -7,8 +7,8 @@
use std::fmt::{self, Display};
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use futures::future::{self, Future};
use futures::stream::{self, Stream};
use blob::Blob;
use blobnode::Parents;
@ -49,6 +49,18 @@ pub trait Manifest: Send + 'static {
}
}
pub struct EmptyManifest;
impl Manifest for EmptyManifest {
fn lookup(&self, _path: &MPath) -> BoxFuture<Option<Box<Entry + Sync>>, Error> {
future::ok(None).boxify()
}
fn list(&self) -> BoxStream<Box<Entry + Sync>, Error> {
stream::empty().boxify()
}
}
pub struct BoxManifest<M>
where
M: Manifest,

View File

@ -153,6 +153,26 @@ fn parsetimeextra<S: AsRef<[u8]>>(s: S) -> Result<(Time, Extra)> {
}
impl RevlogChangeset {
pub fn new_from_parts(
parents: Parents,
manifestid: ManifestId,
user: Vec<u8>,
time: Time,
extra: BTreeMap<Vec<u8>, Vec<u8>>,
files: Vec<MPath>,
comments: Vec<u8>,
) -> Self {
Self {
parents,
manifestid,
user,
time,
extra: Extra(extra),
files,
comments,
}
}
pub fn new<T: AsRef<[u8]>>(node: BlobNode<T>) -> Result<Self> {
Self::parse(node)
}