From a692a2a9ea4160a03fbb174ca47addbff6d4a73b Mon Sep 17 00:00:00 2001 From: Lukas Piatkowski Date: Tue, 24 Apr 2018 11:03:16 -0700 Subject: [PATCH] new_blobimport: split main.rs to make this crate more maintable Summary: I am planning to add importing bookmarks, doing it on current main.rs would make it unreadable, so I am splitting this file now Reviewed By: farnz Differential Revision: D7728185 fbshipit-source-id: fdfb4f60eecd9c8af7626bd0e892bb1bfbf7f081 --- cmds/new_blobimport/changeset.rs | 272 ++++++++++++++++++++++++++++ cmds/new_blobimport/main.rs | 300 ++++--------------------------- 2 files changed, 304 insertions(+), 268 deletions(-) create mode 100644 cmds/new_blobimport/changeset.rs diff --git a/cmds/new_blobimport/changeset.rs b/cmds/new_blobimport/changeset.rs new file mode 100644 index 0000000000..8d24290523 --- /dev/null +++ b/cmds/new_blobimport/changeset.rs @@ -0,0 +1,272 @@ +// Copyright (c) 2004-present, Facebook, Inc. +// All Rights Reserved. +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Bytes; +use failure_ext::{err_msg, Error, Fail, FutureFailureErrorExt, FutureFailureExt, ResultExt, + StreamFailureErrorExt}; +use futures::{Future, IntoFuture}; +use futures::future::{self, SharedItem}; +use futures::stream::{self, Stream}; +use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt}; + +use blobrepo::{BlobChangeset, BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, + UploadHgEntry}; +use mercurial::{manifest, HgChangesetId, HgManifestId, HgNodeHash, RevlogChangeset, RevlogEntry, + RevlogRepo, NULL_HASH}; +use mercurial_types::{HgBlob, MPath, RepoPath, Type}; + +struct ParseChangeset { + revlogcs: BoxFuture, Error>, + rootmf: + BoxFuture, Option)>, Error>, + entries: BoxStream<(Option, RevlogEntry), Error>, +} + +// Extracts all the data from revlog repo that commit API may need. +fn parse_changeset(revlog_repo: RevlogRepo, csid: HgChangesetId) -> ParseChangeset { + let revlogcs = revlog_repo + .get_changeset(&csid) + .with_context(move |_| format!("While reading changeset {:?}", csid)) + .map_err(Fail::compat) + .boxify() + .shared(); + + let rootmf = revlogcs + .clone() + .map_err(Error::from) + .and_then({ + let revlog_repo = revlog_repo.clone(); + move |cs| { + if cs.manifestid().into_nodehash() == NULL_HASH { + future::ok(None).boxify() + } else { + revlog_repo + .get_root_manifest(cs.manifestid()) + .map({ + let manifest_id = *cs.manifestid(); + move |rootmf| Some((manifest_id, rootmf)) + }) + .boxify() + } + } + }) + .with_context(move |_| format!("While reading root manifest for {:?}", csid)) + .map_err(Fail::compat) + .boxify() + .shared(); + + let entries = revlogcs + .clone() + .map_err(Error::from) + .and_then({ + let revlog_repo = revlog_repo.clone(); + move |cs| { + let mut parents = cs.parents() + .into_iter() + .map(HgChangesetId::new) + .map(|csid| { + let revlog_repo = revlog_repo.clone(); + revlog_repo + .get_changeset(&csid) + .and_then(move |cs| { + if cs.manifestid().into_nodehash() == NULL_HASH { + future::ok(None).boxify() + } else { + revlog_repo + .get_root_manifest(cs.manifestid()) + .map(Some) + .boxify() + } + }) + .boxify() + }); + + let p1 = parents.next().unwrap_or(Ok(None).into_future().boxify()); + let p2 = parents.next().unwrap_or(Ok(None).into_future().boxify()); + + p1.join(p2) + .with_context(move |_| format!("While reading parents of {:?}", csid)) + .from_err() + } + }) + .join(rootmf.clone().from_err()) + .map(|((p1, p2), rootmf_shared)| match *rootmf_shared { + None => stream::empty().boxify(), + Some((_, ref rootmf)) => { + manifest::new_entry_intersection_stream(&rootmf, p1.as_ref(), p2.as_ref()) + } + }) + .flatten_stream() + .with_context(move |_| format!("While reading entries for {:?}", csid)) + .from_err() + .boxify(); + + let revlogcs = revlogcs.map_err(Error::from).boxify(); + + let rootmf = rootmf + .map_err(Error::from) + .and_then(move |rootmf_shared| match *rootmf_shared { + None => Ok(None), + Some((manifest_id, ref mf)) => { + let mut bytes = Vec::new(); + mf.generate(&mut bytes).with_context(|_| { + format!("While generating root manifest blob for {:?}", csid) + })?; + + let (p1, p2) = mf.parents().get_nodes(); + Ok(Some(( + manifest_id, + HgBlob::from(Bytes::from(bytes)), + p1.cloned(), + p2.cloned(), + ))) + } + }) + .boxify(); + + ParseChangeset { + revlogcs, + rootmf, + entries, + } +} + +fn upload_entry( + blobrepo: &BlobRepo, + entry: RevlogEntry, + path: Option, +) -> BoxFuture<(HgBlobEntry, RepoPath), Error> { + let blobrepo = blobrepo.clone(); + + let ty = entry.get_type(); + + let path = MPath::join_element_opt(path.as_ref(), entry.get_name()); + let path = match path { + // XXX this shouldn't be possible -- encode this in the type system + None => { + return future::err(err_msg( + "internal error: joined root path with root manifest", + )).boxify() + } + Some(path) => path, + }; + let path = match ty { + Type::Tree => RepoPath::DirectoryPath(path), + Type::File(_) => RepoPath::FilePath(path), + }; + + let content = entry.get_raw_content(); + let parents = entry.get_parents(); + + content + .join(parents) + .and_then(move |(content, parents)| { + let (p1, p2) = parents.get_nodes(); + let upload = UploadHgEntry { + nodeid: entry.get_hash().into_nodehash(), + raw_content: content, + content_type: ty, + p1: p1.cloned(), + p2: p2.cloned(), + path, + check_nodeid: true, + }; + upload.upload(&blobrepo) + }) + .and_then(|(_, entry)| entry) + .boxify() +} + +pub fn upload_changesets( + revlogrepo: RevlogRepo, + blobrepo: Arc, +) -> BoxStream, Error>, Error> { + let mut parent_changeset_handles: HashMap = HashMap::new(); + + revlogrepo + .changesets() + .and_then({ + let revlogrepo = revlogrepo.clone(); + move |csid| { + let ParseChangeset { + revlogcs, + rootmf, + entries, + } = parse_changeset(revlogrepo.clone(), HgChangesetId::new(csid)); + revlogcs.map(move |cs| (csid, cs, rootmf, entries)) + } + }) + .map(move |(csid, cs, rootmf, entries)| { + let rootmf = rootmf + .and_then({ + let blobrepo = blobrepo.clone(); + move |rootmf| { + match rootmf { + None => future::ok(None).boxify(), + Some((manifest_id, blob, p1, p2)) => { + let upload = UploadHgEntry { + nodeid: manifest_id.into_nodehash(), + raw_content: blob, + content_type: Type::Tree, + p1, + p2, + path: RepoPath::root(), + // The root tree manifest is expected to have the wrong hash in + // hybrid mode. This will probably never go away for + // compatibility with old repositories. + check_nodeid: false, + }; + let (_, entry) = try_boxfuture!(upload.upload(&blobrepo)); + entry.map(Some).boxify() + } + } + } + }) + .boxify(); + + let entries = entries + .and_then({ + let blobrepo = blobrepo.clone(); + move |(path, entry)| upload_entry(&blobrepo, entry, path) + }) + .boxify(); + + let (p1handle, p2handle) = { + let mut parents = cs.parents().into_iter().map(|p| { + parent_changeset_handles + .get(&p) + .cloned() + .expect(&format!("parent {} not found for {}", p, csid)) + }); + + (parents.next(), parents.next()) + }; + + let create_changeset = CreateChangeset { + p1: p1handle, + p2: p2handle, + root_manifest: rootmf, + sub_entries: entries, + user: String::from_utf8(Vec::from(cs.user())) + .expect(&format!("non-utf8 username for {}", csid)), + time: cs.time().clone(), + extra: cs.extra().clone(), + comments: String::from_utf8(Vec::from(cs.comments())) + .expect(&format!("non-utf8 comments for {}", csid)), + }; + let cshandle = create_changeset.create(&blobrepo); + parent_changeset_handles.insert(csid, cshandle.clone()); + cshandle + .get_completed_changeset() + .with_context(move |_| format!("While uploading changeset: {}", csid)) + .from_err() + .boxify() + }) + .boxify() +} diff --git a/cmds/new_blobimport/main.rs b/cmds/new_blobimport/main.rs index 10d8b31869..e3362b9d0b 100644 --- a/cmds/new_blobimport/main.rs +++ b/cmds/new_blobimport/main.rs @@ -22,194 +22,26 @@ extern crate slog; extern crate slog_glog_fmt; extern crate tokio_core; -use std::collections::HashMap; +mod changeset; + use std::fs; use std::path::Path; +use std::sync::Arc; -use bytes::Bytes; -use clap::{App, Arg}; -use failure_ext::{err_msg, Error, Fail, FutureFailureErrorExt, FutureFailureExt, ResultExt, - StreamFailureErrorExt}; -use futures::{Future, IntoFuture}; -use futures::future::{self, SharedItem}; -use futures::stream::{self, Stream}; -use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt}; -use slog::Drain; +use clap::{App, Arg, ArgMatches}; +use failure_ext::err_msg; +use futures::{Future, Stream}; +use slog::{Drain, Logger}; use slog_glog_fmt::default_drain as glog_drain; -use tokio_core::reactor::Core; +use tokio_core::reactor::{Core, Remote}; -use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry, UploadHgEntry}; +use blobrepo::BlobRepo; use changesets::SqliteChangesets; -use mercurial::{HgChangesetId, HgManifestId, HgNodeHash, RevlogChangeset, RevlogEntry, RevlogRepo, - NULL_HASH}; -use mercurial_types::{HgBlob, MPath, RepoPath, RepositoryId, Type}; +use mercurial::RevlogRepo; +use mercurial_types::RepositoryId; -struct ParseChangeset { - revlogcs: BoxFuture, Error>, - rootmf: - BoxFuture, Option)>, Error>, - entries: BoxStream<(Option, RevlogEntry), Error>, -} - -// Extracts all the data from revlog repo that commit API may need. -fn parse_changeset(revlog_repo: RevlogRepo, csid: HgChangesetId) -> ParseChangeset { - let revlogcs = revlog_repo - .get_changeset(&csid) - .with_context(move |_| format!("While reading changeset {:?}", csid)) - .map_err(Fail::compat) - .boxify() - .shared(); - - let rootmf = revlogcs - .clone() - .map_err(Error::from) - .and_then({ - let revlog_repo = revlog_repo.clone(); - move |cs| { - if cs.manifestid().into_nodehash() == NULL_HASH { - future::ok(None).boxify() - } else { - revlog_repo - .get_root_manifest(cs.manifestid()) - .map({ - let manifest_id = *cs.manifestid(); - move |rootmf| Some((manifest_id, rootmf)) - }) - .boxify() - } - } - }) - .with_context(move |_| format!("While reading root manifest for {:?}", csid)) - .map_err(Fail::compat) - .boxify() - .shared(); - - let entries = revlogcs - .clone() - .map_err(Error::from) - .and_then({ - let revlog_repo = revlog_repo.clone(); - move |cs| { - let mut parents = cs.parents() - .into_iter() - .map(HgChangesetId::new) - .map(|csid| { - let revlog_repo = revlog_repo.clone(); - revlog_repo - .get_changeset(&csid) - .and_then(move |cs| { - if cs.manifestid().into_nodehash() == NULL_HASH { - future::ok(None).boxify() - } else { - revlog_repo - .get_root_manifest(cs.manifestid()) - .map(Some) - .boxify() - } - }) - .boxify() - }); - - let p1 = parents.next().unwrap_or(Ok(None).into_future().boxify()); - let p2 = parents.next().unwrap_or(Ok(None).into_future().boxify()); - - p1.join(p2) - .with_context(move |_| format!("While reading parents of {:?}", csid)) - .from_err() - } - }) - .join(rootmf.clone().from_err()) - .map(|((p1, p2), rootmf_shared)| match *rootmf_shared { - None => stream::empty().boxify(), - Some((_, ref rootmf)) => mercurial::manifest::new_entry_intersection_stream( - &rootmf, - p1.as_ref(), - p2.as_ref(), - ), - }) - .flatten_stream() - .with_context(move |_| format!("While reading entries for {:?}", csid)) - .from_err() - .boxify(); - - let revlogcs = revlogcs.map_err(Error::from).boxify(); - - let rootmf = rootmf - .map_err(Error::from) - .and_then(move |rootmf_shared| match *rootmf_shared { - None => Ok(None), - Some((manifest_id, ref mf)) => { - let mut bytes = Vec::new(); - mf.generate(&mut bytes).with_context(|_| { - format!("While generating root manifest blob for {:?}", csid) - })?; - - let (p1, p2) = mf.parents().get_nodes(); - Ok(Some(( - manifest_id, - HgBlob::from(Bytes::from(bytes)), - p1.cloned(), - p2.cloned(), - ))) - } - }) - .boxify(); - - ParseChangeset { - revlogcs, - rootmf, - entries, - } -} - -fn upload_entry( - blobrepo: &BlobRepo, - entry: RevlogEntry, - path: Option, -) -> BoxFuture<(HgBlobEntry, RepoPath), Error> { - let blobrepo = blobrepo.clone(); - - let ty = entry.get_type(); - - let path = MPath::join_element_opt(path.as_ref(), entry.get_name()); - let path = match path { - // XXX this shouldn't be possible -- encode this in the type system - None => { - return future::err(err_msg( - "internal error: joined root path with root manifest", - )).boxify() - } - Some(path) => path, - }; - let path = match ty { - Type::Tree => RepoPath::DirectoryPath(path), - Type::File(_) => RepoPath::FilePath(path), - }; - - let content = entry.get_raw_content(); - let parents = entry.get_parents(); - - content - .join(parents) - .and_then(move |(content, parents)| { - let (p1, p2) = parents.get_nodes(); - let upload = UploadHgEntry { - nodeid: entry.get_hash().into_nodehash(), - raw_content: content, - content_type: ty, - p1: p1.cloned(), - p2: p2.cloned(), - path, - check_nodeid: true, - }; - upload.upload(&blobrepo) - }) - .and_then(|(_, entry)| entry) - .boxify() -} - -fn main() { - let app = App::new("revlog to blob importer") +fn setup_app<'a, 'b>() -> App<'a, 'b> { + App::new("revlog to blob importer") .version("0.0.0") .about("make blobs") .args_from_usage( @@ -230,19 +62,13 @@ fn main() { .possible_values(&["rocksdb", "manifold"]) .required(true) .help("blobstore type"), - ); - let matches = app.get_matches(); - let input = matches.value_of("INPUT").expect("input is not specified"); - let revlogrepo = RevlogRepo::open(input).expect("cannot open revlogrepo"); - - let mut core = Core::new().expect("cannot create tokio core"); - - let drain = glog_drain().fuse(); - let logger = slog::Logger::root(drain, o![]); + ) +} +fn open_blobrepo<'a>(logger: &Logger, remote: Remote, matches: &ArgMatches<'a>) -> BlobRepo { let repo_id = RepositoryId::new(matches.value_of("repo_id").unwrap().parse().unwrap()); - let blobrepo = match matches.value_of("blobstore").unwrap() { + match matches.value_of("blobstore").unwrap() { "rocksdb" => { let output = matches.value_of("OUTPUT").expect("output is not specified"); let output = Path::new(output) @@ -306,7 +132,7 @@ fn main() { logger.new(o!["BlobRepo:TestManifold" => manifold_bucket.to_owned()]), manifold_bucket, "new_blobimport_test", - &core.remote(), + &remote, repo_id, matches .value_of("db-address") @@ -322,87 +148,25 @@ fn main() { ).expect("failed to create manifold blobrepo") } bad => panic!("unexpected blobstore type: {}", bad), + } +} + +fn main() { + let matches = setup_app().get_matches(); + + let revlogrepo = { + let input = matches.value_of("INPUT").expect("input is not specified"); + RevlogRepo::open(input).expect("cannot open revlogrepo") }; - let mut parent_changeset_handles: HashMap = HashMap::new(); + let mut core = Core::new().expect("cannot create tokio core"); - let csstream = revlogrepo - .changesets() - .and_then({ - let revlogrepo = revlogrepo.clone(); - move |csid| { - let ParseChangeset { - revlogcs, - rootmf, - entries, - } = parse_changeset(revlogrepo.clone(), HgChangesetId::new(csid)); - revlogcs.map(move |cs| (csid, cs, rootmf, entries)) - } - }) - .map(move |(csid, cs, rootmf, entries)| { - let rootmf = rootmf - .and_then({ - let blobrepo = blobrepo.clone(); - move |rootmf| { - match rootmf { - None => future::ok(None).boxify(), - Some((manifest_id, blob, p1, p2)) => { - let upload = UploadHgEntry { - nodeid: manifest_id.into_nodehash(), - raw_content: blob, - content_type: Type::Tree, - p1, - p2, - path: RepoPath::root(), - // The root tree manifest is expected to have the wrong hash in - // hybrid mode. This will probably never go away for - // compatibility with old repositories. - check_nodeid: false, - }; - let (_, entry) = try_boxfuture!(upload.upload(&blobrepo)); - entry.map(Some).boxify() - } - } - } - }) - .boxify(); + let drain = glog_drain().fuse(); + let logger = Logger::root(drain, o![]); - let entries = entries - .and_then({ - let blobrepo = blobrepo.clone(); - move |(path, entry)| upload_entry(&blobrepo, entry, path) - }) - .boxify(); + let blobrepo = Arc::new(open_blobrepo(&logger, core.remote(), &matches)); - let (p1handle, p2handle) = { - let mut parents = cs.parents().into_iter().map(|p| { - parent_changeset_handles - .get(&p) - .cloned() - .expect(&format!("parent {} not found for {}", p, csid)) - }); - - (parents.next(), parents.next()) - }; - - let create_changeset = CreateChangeset { - p1: p1handle, - p2: p2handle, - root_manifest: rootmf, - sub_entries: entries, - user: String::from_utf8(Vec::from(cs.user())) - .expect(&format!("non-utf8 username for {}", csid)), - time: cs.time().clone(), - extra: cs.extra().clone(), - comments: String::from_utf8(Vec::from(cs.comments())) - .expect(&format!("non-utf8 comments for {}", csid)), - }; - let cshandle = create_changeset.create(&blobrepo); - parent_changeset_handles.insert(csid, cshandle.clone()); - cshandle - .get_completed_changeset() - .with_context(move |_| format!("While uploading changeset: {}", csid)) - }); + let csstream = changeset::upload_changesets(revlogrepo, blobrepo); core.run(csstream.for_each(|cs| { cs.map(|cs| {