blobimport: write out linknodes

Summary:
With the `Linknodes` trait in place, writing out linknodes is straightforward: thread a store through blobimport's convert path and record a linknode for the root manifest and for every file entry as each changeset is imported.

Also regenerate linknodes for our test fixtures -- the next commit will bring
them in.

Reviewed By: jsgf

Differential Revision: D6214033

fbshipit-source-id: 3b930fe9eda45a1b7bc6f0b3f81dd8af102061fc
Siddharth Agarwal 2017-11-13 21:45:41 -08:00 committed by Facebook Github Bot
parent 66a5fa4362
commit a6c5093cc8
184 changed files with 141 additions and 51 deletions
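
For orientation before the per-file diffs: a linknode records, for a (path, node hash) pair, the changeset that introduced it. The `Linknodes` trait this commit threads through blobimport is only partially visible below, so here is a minimal sketch of its shape as reconstructed from the diff -- the bounds on the associated future types are an assumption, and the real definition in the `linknodes` crate may differ:

use futures::Future;
use mercurial_types::{NodeHash, RepoPath};

pub trait Linknodes: Send + Sync + 'static {
    // Future types chosen by each backend. The bounds here are assumed;
    // `Error` is the linknodes crate's error type.
    type Get: Future<Item = NodeHash, Error = Error> + Send + 'static;
    type Effect: Future<Item = (), Error = Error> + Send + 'static;

    /// Record that `node` at `path` was introduced by changeset `linknode`.
    fn add(&self, path: RepoPath, node: &NodeHash, linknode: &NodeHash) -> Self::Effect;

    /// Look up the changeset that introduced `node` at `path`.
    fn get(&self, path: RepoPath, node: &NodeHash) -> Self::Get;
}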

View File

@@ -15,9 +15,10 @@ use tokio_core::reactor::Core;
use blobrepo::BlobChangeset;
use futures_ext::{BoxStream, FutureExt, StreamExt};
use heads::Heads;
use linknodes::Linknodes;
use mercurial::{self, RevlogManifest, RevlogRepo};
use mercurial::revlog::RevIdx;
use mercurial_types::{Changeset, Manifest, NodeHash};
use mercurial_types::{Changeset, Manifest, NodeHash, RepoPath};
use stats::Timeseries;
use BlobstoreEntry;
@@ -40,7 +41,7 @@ where
H: Heads<Key = String>,
H::Error: Into<Error>,
{
pub fn convert(self) -> Result<()> {
pub fn convert<L: Linknodes>(self, linknodes_store: L) -> Result<()> {
let mut core = self.core;
let logger_owned = self.logger;
let logger = &logger_owned;
@@ -53,6 +54,7 @@ where
} else {
self.repo.changesets().boxify()
};
let linknodes_store = Arc::new(linknodes_store);
// Generate stream of changesets. For each changeset, save the cs blob, the manifest
// blob, and the files.
@@ -65,7 +67,7 @@ where
move |(seq, csid)| {
debug!(logger, "{}: changeset {}", seq, csid);
STATS::changesets.add_value(1);
copy_changeset(repo.clone(), sender.clone(), csid)
copy_changeset(repo.clone(), sender.clone(), linknodes_store.clone(), csid)
}
}) // Stream<Future<()>>
.map(|copy| cpupool.spawn(copy))
@@ -104,13 +106,15 @@ where
/// The files are more complex. For each manifest, we generate a stream of entries, then flatten
/// the entry streams from all changesets into a single stream. Then each entry is filtered
/// against a set of entries that have already been copied, and any remaining are actually copied.
fn copy_changeset(
fn copy_changeset<L>(
revlog_repo: RevlogRepo,
sender: SyncSender<BlobstoreEntry>,
linknodes_store: L,
csid: NodeHash,
) -> impl Future<Item = (), Error = Error> + Send + 'static
where
Error: Send + 'static,
L: Linknodes,
{
let put = {
let sender = sender.clone();
@@ -134,8 +138,7 @@ where
.and_then(move |(cs, entry)| {
let mfid = *cs.manifestid();
let linkrev = entry.linkrev;
put_blobs(revlog_repo, sender, mfid, linkrev)
put_blobs(revlog_repo, sender, linknodes_store, mfid, linkrev)
})
.map_err(move |err| {
Error::with_chain(err, format!("Can't copy manifest for cs {}", csid))
@@ -150,16 +153,23 @@ where
/// Copy manifest and filelog entries into the blob store.
///
/// See the help for copy_changeset for a full description.
fn put_blobs(
fn put_blobs<L>(
revlog_repo: RevlogRepo,
sender: SyncSender<BlobstoreEntry>,
linknodes_store: L,
mfid: NodeHash,
linkrev: RevIdx,
) -> impl Future<Item = (), Error = Error> + Send + 'static {
) -> impl Future<Item = (), Error = Error> + Send + 'static
where
L: Linknodes,
{
let cs_entry_fut = revlog_repo.get_changelog().get_entry(linkrev).into_future();
revlog_repo
.get_manifest_blob_by_nodeid(&mfid)
.join(cs_entry_fut)
.from_err()
.and_then(move |blob| {
.and_then(move |(blob, cs_entry)| {
let putmf = manifest::put_entry(
sender.clone(),
mfid,
@@ -167,6 +177,11 @@ fn put_blobs(
blob.parents().clone(),
);
let linknode = cs_entry.nodeid;
let put_root_linknode = linknodes_store
.add(RepoPath::root(), &mfid, &linknode)
.from_err();
// Get the listing of entries and fetch each of those
let files = RevlogManifest::new(revlog_repo.clone(), blob)
.map_err(|err| {
@@ -186,7 +201,14 @@ fn put_blobs(
}
})
.flatten()
.for_each(move |entry| manifest::copy_entry(entry, sender.clone()))
.for_each(move |entry| {
// All entries in this manifest share the same linknode: the changeset at this linkrev.
let linknode_future = linknodes_store
.add(entry.get_path().clone(), entry.get_hash(), &linknode)
.from_err();
let copy_future = manifest::copy_entry(entry, sender.clone());
copy_future.join(linknode_future).map(|_| ())
})
})
.into_future()
.flatten();
@@ -195,7 +217,7 @@ fn put_blobs(
// Huh? No idea why this is needed to avoid an error below.
let files = files.boxify();
putmf.join(files).map(|_| ())
putmf.join3(put_root_linknode, files).map(|_| ())
})
}
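
The payoff for the writes above comes at read time: a linknode lookup answers "which changeset introduced this node at this path" without walking the changelog. A minimal read-side sketch against the trait, using only calls shown in this diff (the helper name is illustrative):

/// Changeset that introduced the root manifest `mfid`; blobimport recorded
/// it above under `RepoPath::root()`.
fn changeset_for_root_manifest<L: Linknodes>(store: &L, mfid: &NodeHash) -> L::Get {
    store.get(RepoPath::root(), mfid)
}

The per-file entries recorded in the `for_each` above are queried the same way, passing the file's `RepoPath` and filenode hash instead.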

View File

@@ -9,8 +9,10 @@ error_chain! {
Blobrepo(::blobrepo::Error, ::blobrepo::ErrorKind);
Mercurial(::mercurial::Error, ::mercurial::ErrorKind);
Rocksblob(::rocksblob::Error, ::rocksblob::ErrorKind);
FileKV(::filekv::Error, ::filekv::ErrorKind);
FileHeads(::fileheads::Error, ::fileheads::ErrorKind);
Fileblob(::fileblob::Error, ::fileblob::ErrorKind);
Linknodes(::linknodes::Error, ::linknodes::ErrorKind);
Manifold(::manifoldblob::Error, ::manifoldblob::ErrorKind);
}
foreign_links {

View File

@@ -27,8 +27,11 @@ extern crate blobrepo;
extern crate blobstore;
extern crate fileblob;
extern crate fileheads;
extern crate filekv;
extern crate filelinknodes;
extern crate futures_ext;
extern crate heads;
extern crate linknodes;
extern crate manifoldblob;
extern crate mercurial;
extern crate mercurial_types;
@@ -60,7 +63,9 @@ use blobrepo::BlobChangeset;
use blobstore::Blobstore;
use fileblob::Fileblob;
use fileheads::FileHeads;
use filelinknodes::FileLinknodes;
use futures_ext::{BoxFuture, FutureExt};
use linknodes::NoopLinknodes;
use manifoldblob::ManifoldBlob;
use mercurial::RevlogRepo;
use rocksblob::Rocksblob;
@@ -111,6 +116,7 @@ fn run_blobimport<In, Out>(
input: In,
output: Out,
blobtype: BlobstoreType,
write_linknodes: bool,
logger: &Logger,
postpone_compaction: bool,
channel_size: usize,
@@ -140,43 +146,46 @@ where
// data to this thread.
let iothread = thread::Builder::new()
.name("iothread".to_owned())
.spawn(move || {
let receiverstream = stream::iter_ok::<_, ()>(recv);
let mut core = Core::new().expect("cannot create core in iothread");
let blobstore = open_blobstore(
output,
blobtype,
&core.remote(),
postpone_compaction,
max_blob_size,
)?;
// Only manifest entries need deduplication; changeset entries should already be unique
let mut inserted_manifest_entries = std::collections::HashSet::new();
let stream = receiverstream
.map(move |sender_helper| match sender_helper {
BlobstoreEntry::Changeset(bcs) => {
bcs.save(blobstore.clone()).from_err().boxify()
}
BlobstoreEntry::ManifestEntry((key, value)) => {
if inserted_manifest_entries.insert(key.clone()) {
blobstore.put(key.clone(), value).boxify()
} else {
STATS::duplicates.add_value(1);
Ok(()).into_future().boxify()
.spawn({
let output = output.clone();
move || {
let receiverstream = stream::iter_ok::<_, ()>(recv);
let mut core = Core::new().expect("cannot create core in iothread");
let blobstore = open_blobstore(
output,
blobtype,
&core.remote(),
postpone_compaction,
max_blob_size,
)?;
// Only manifest entries need deduplication; changeset entries should already be unique
let mut inserted_manifest_entries = std::collections::HashSet::new();
let stream = receiverstream
.map(move |sender_helper| match sender_helper {
BlobstoreEntry::Changeset(bcs) => {
bcs.save(blobstore.clone()).from_err().boxify()
}
}
})
.map_err(|_| Error::from("error happened"))
.buffer_unordered(channel_size)
.then(move |res| {
if res.is_err() {
STATS::failures.add_value(1);
} else {
STATS::successes.add_value(1);
}
res
});
core.run(stream.for_each(|_| Ok(())))
BlobstoreEntry::ManifestEntry((key, value)) => {
if inserted_manifest_entries.insert(key.clone()) {
blobstore.put(key.clone(), value).boxify()
} else {
STATS::duplicates.add_value(1);
Ok(()).into_future().boxify()
}
}
})
.map_err(|_| Error::from("error happened"))
.buffer_unordered(channel_size)
.then(move |res| {
if res.is_err() {
STATS::failures.add_value(1);
} else {
STATS::successes.add_value(1);
}
res
});
core.run(stream.for_each(|_| Ok(())))
}
})
.expect("cannot start iothread");
@@ -188,11 +197,18 @@ where
sender,
headstore,
core,
cpupool,
cpupool: cpupool.clone(),
logger: logger.clone(),
commits_limit: commits_limit,
};
let res = convert_context.convert();
let res = if write_linknodes {
info!(logger, "Opening linknodes store: {:?}", output);
let linknodes_store = open_linknodes_store(&output, &cpupool)?;
convert_context.convert(linknodes_store)
} else {
info!(logger, "--linknodes not specified, not writing linknodes");
convert_context.convert(NoopLinknodes::new())
};
iothread.join().expect("failed to join io thread")?;
res
}
@@ -218,6 +234,13 @@ fn open_headstore<P: Into<PathBuf>>(heads: P, pool: &Arc<CpuPool>) -> Result<FileHeads>
Ok(headstore)
}
fn open_linknodes_store<P: Into<PathBuf>>(path: P, pool: &Arc<CpuPool>) -> Result<FileLinknodes> {
let mut linknodes_path = path.into();
linknodes_path.push("linknodes");
let linknodes_store = FileLinknodes::create_with_pool(linknodes_path, pool.clone())?;
Ok(linknodes_store)
}
fn open_blobstore<P: Into<PathBuf>>(
output: P,
ty: BlobstoreType,
@@ -306,6 +329,7 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
--postpone-compaction '(rocksdb only) postpone auto compaction while importing'
-d, --debug 'print debug level output'
--linknodes 'also generate linknodes'
--channel-size [SIZE] 'channel size between worker and io threads. Default: 1000'
--commits-limit [LIMIT] 'import only LIMIT first commits from revlog repo'
--max-blob-size [LIMIT] 'max size of the blob to be inserted'
@@ -403,10 +427,13 @@ fn main() {
})
.unwrap_or(1000);
let write_linknodes = matches.is_present("linknodes");
run_blobimport(
input,
output,
blobtype,
write_linknodes,
&root_log,
postpone_compaction,
channel_size,
@@ -420,6 +447,7 @@ fn main() {
}),
)?;
if matches.value_of("blobstore").unwrap() == "rocksdb" && postpone_compaction {
let options = rocksdb::Options::new().create_if_missing(false);
let rocksdb = rocksdb::Db::open(Path::new(output).join("blobs"), options)

View File

@@ -18,7 +18,8 @@ extern crate mercurial_types;
use std::fmt;
use std::sync::Arc;
use futures::Future;
use futures::{Future, IntoFuture};
use futures::future::FutureResult;
use mercurial_types::{NodeHash, RepoPath};
@@ -83,6 +84,31 @@ pub trait Linknodes: Send + Sync + 'static {
fn get(&self, path: RepoPath, node: &NodeHash) -> Self::Get;
}
/// A linknodes implementation that never stores anything.
pub struct NoopLinknodes;
impl NoopLinknodes {
#[inline]
pub fn new() -> Self {
NoopLinknodes
}
}
impl Linknodes for NoopLinknodes {
type Get = FutureResult<NodeHash, Error>;
type Effect = FutureResult<(), Error>;
#[inline]
fn get(&self, path: RepoPath, node: &NodeHash) -> Self::Get {
Err(ErrorKind::NotFound(path, *node).into()).into_future()
}
#[inline]
fn add(&self, _path: RepoPath, _node: &NodeHash, _linknode: &NodeHash) -> Self::Effect {
Ok(()).into_future()
}
}
impl<L> Linknodes for Arc<L>
where
L: Linknodes,

View File

@@ -169,6 +169,11 @@ impl RevlogRepo {
}
}
#[inline]
pub fn get_changelog(&self) -> &Revlog {
&self.changelog
}
pub fn changeset_exists(&self, nodeid: &NodeHash) -> FutureResult<bool> {
Ok(self.changelog.get_idx_by_nodeid(nodeid).is_ok()).into_future()
}

Some files were not shown because too many files have changed in this diff.