bundle2-resolver: use BlobRepo::create_changeset to upload changesets

Reviewed By: farnz

Differential Revision: D7035844

fbshipit-source-id: a0bc89f717bf878ada981bc04ae6fea0ac0b5660
This commit is contained in:
Lukas Piatkowski 2018-02-28 11:09:19 -08:00 committed by Facebook Github Bot
parent 995e15697e
commit 22a7f7e523

View File

@ -11,13 +11,15 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::{Future, Stream};
use futures::future::{err, ok};
use futures::stream;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use slog::Logger;
use blobrepo::BlobRepo;
use blobrepo::{BlobEntry, BlobRepo, ChangesetHandle};
use mercurial::changeset::RevlogChangeset;
use mercurial::manifest::revlog::ManifestContent;
use mercurial_bundles::{parts, Bundle2EncodeBuilder, Bundle2Item};
use mercurial_types::{NodeHash, RepoPath};
use mercurial_types::{Changeset, ChangesetId, MPath, ManifestId, NodeHash, RepoPath};
use changegroup::{convert_to_revlog_changesets, convert_to_revlog_filelog, split_changegroup,
Filelog};
@ -29,6 +31,7 @@ type PartId = u32;
type Changesets = Vec<(NodeHash, RevlogChangeset)>;
type Filelogs = HashMap<(NodeHash, RepoPath), <Filelog as UploadableBlob>::Value>;
type Manifests = HashMap<(NodeHash, RepoPath), <TreemanifestEntry as UploadableBlob>::Value>;
type UploadedChangesets = HashMap<NodeHash, ChangesetHandle>;
/// The resolve function takes a bundle2, interprets it's content as Changesets, Filelogs and
/// Manifests and uploades all of them to the provided BlobRepo in the correct order.
@ -166,17 +169,92 @@ impl Bundle2Resolver {
.boxify()
}
/// Takes parsed Changesets and scheduled for upload Filelogs and Manifests. The content of
/// Manifests is used to figure out DAG of dependencies between a given Changeset and the
/// Manifests and Filelogs it adds.
/// The Changesets are scheduled for uploading and a Future is returned, whose completion means
/// that the changesets were uploaded
fn upload_changesets(
&self,
changesets: Changesets,
filelogs: Filelogs,
manifests: Manifests,
) -> BoxFuture<(), Error> {
fn upload_changeset(
repo: Arc<BlobRepo>,
node: NodeHash,
revlog_cs: RevlogChangeset,
mut uploaded_changesets: UploadedChangesets,
filelogs: &Filelogs,
manifests: &Manifests,
) -> BoxFuture<UploadedChangesets, Error> {
let (p1, p2) = {
let (p1, p2) = revlog_cs.parents().get_nodes();
(
get_parent(&repo, &uploaded_changesets, p1.cloned()),
get_parent(&repo, &uploaded_changesets, p2.cloned()),
)
};
let (root_manifest, entries) = try_boxfuture!(walk_manifests(
*revlog_cs.manifestid(),
&manifests,
&filelogs
));
p1.join(p2)
.and_then(move |(p1, p2)| {
let scheduled_uploading = repo.create_changeset(
p1,
p2,
root_manifest,
entries,
String::from_utf8(revlog_cs.user().into())?,
revlog_cs.time().clone(),
revlog_cs.extra().clone(),
String::from_utf8(revlog_cs.comments().into())?,
);
uploaded_changesets.insert(node, scheduled_uploading);
Ok(uploaded_changesets)
})
.boxify()
}
let repo = self.repo.clone();
debug!(self.logger, "changesets: {:?}", changesets);
debug!(self.logger, "filelogs: {:?}", filelogs.keys());
debug!(self.logger, "manifests: {:?}", manifests.keys());
ok(()).boxify()
stream::iter_ok(changesets)
.fold(
HashMap::new(),
move |uploaded_changesets, (node, revlog_cs)| {
upload_changeset(
repo.clone(),
node.clone(),
revlog_cs,
uploaded_changesets,
&filelogs,
&manifests,
).map_err(move |err| {
err.context(format!(
"While trying to upload Changeset with id {:?}",
node
))
})
},
)
.and_then(|uploaded_changesets| {
stream::futures_unordered(
uploaded_changesets
.into_iter()
.map(|(_, cs)| cs.get_completed_changeset()),
).map_err(|err| format_err!("{:?}", err))
.for_each(|_| Ok(()))
})
.map_err(|err| err.context("While uploading Changesets to BlobRepo").into())
.boxify()
}
/// Ensures that the next item in stream is None
@ -212,3 +290,109 @@ impl Bundle2Resolver {
.boxify()
}
}
/// Retrieves the parent from uploaded changesets, if it is missing then fetches it from BlobRepo
fn get_parent(
repo: &BlobRepo,
map: &UploadedChangesets,
p: Option<NodeHash>,
) -> BoxFuture<Option<ChangesetHandle>, Error> {
match p {
None => ok(None).boxify(),
Some(p) => match map.get(&p) {
None => repo.get_changeset_by_changesetid(&ChangesetId::new(p))
.map(|cs| Some(cs.into()))
.boxify(),
Some(cs) => ok(Some(cs.clone())).boxify(),
},
}
}
type BlobFuture = BoxFuture<(BlobEntry, RepoPath), Error>;
type BlobStream = BoxStream<(BlobEntry, RepoPath), Error>;
/// In order to generate the DAG of dependencies between Root Manifest and other Manifests and
/// Filelogs we need to walk that DAG.
/// This function starts with the Root Manifest Id and returns Future for Root Manifest and Stream
/// of all dependent Manifests and Filelogs that were provided in this push.
fn walk_manifests(
manifest_root_id: ManifestId,
manifests: &Manifests,
filelogs: &Filelogs,
) -> Result<(BlobFuture, BlobStream)> {
fn walk_helper(
path_taken: &MPath,
manifest_content: &ManifestContent,
manifests: &Manifests,
filelogs: &Filelogs,
) -> Result<Vec<BlobFuture>> {
if path_taken.len() > 4096 {
bail_msg!(
"Exceeded max manifest path during walking with path: {:?}",
path_taken
);
}
let mut entries: Vec<BlobFuture> = Vec::new();
for (name, details) in manifest_content.files.iter() {
let nodehash = details.entryid().clone().into_nodehash();
let next_path = path_taken.join(name);
if details.is_tree() {
let key = (nodehash, RepoPath::dir(next_path)?);
if let Some(&(ref manifest_content, ref blobfuture)) = manifests.get(&key) {
entries.push(
blobfuture
.clone()
.map(|it| (*it).clone())
.map_err(|err| format_err!("{:?}", err))
.boxify(),
);
entries.append(&mut walk_helper(
key.1.mpath().expect("RepoPath::dir must have MPath"),
manifest_content,
manifests,
filelogs,
)?);
}
} else {
if let Some(blobfuture) = filelogs.get(&(nodehash, RepoPath::file(next_path)?)) {
entries.push(
blobfuture
.clone()
.map(|it| (*it).clone())
.map_err(|err| format_err!("{:?}", err))
.boxify(),
);
}
}
}
Ok(entries)
}
let &(ref manifest_content, ref manifest_root) = manifests
.get(&(manifest_root_id.clone().into_nodehash(), RepoPath::root()))
.ok_or_else(|| format_err!("Missing root tree manifest"))?;
Ok((
manifest_root
.clone()
.map(|it| (*it).clone())
.map_err(|err| format_err!("{:?}", err))
.boxify(),
stream::futures_unordered(walk_helper(
&MPath::empty(),
&manifest_content,
manifests,
filelogs,
)?).map_err(move |err| {
err.context(format!(
"While walking dependencies of Root Manifest with id {:?}",
manifest_root_id
)).into()
})
.boxify(),
))
}