new_blobimport: speed up changeset parsing

Summary: Parsing and reading revlogs is CPU-intensive, so run that work on a CpuPool instead of the main thread (see the sketch below).

Reviewed By: StanislavGlebik

Differential Revision: D7926174

fbshipit-source-id: 7f023088941e1ad118a683da972f87607e0bfec4
Lukas Piatkowski 2018-05-11 12:42:33 -07:00 committed by Facebook Github Bot
parent 1f58bd7060
commit 7dff6240c1
2 changed files with 77 additions and 64 deletions
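
A minimal sketch of the pattern this change adopts, assuming futures 0.1 and futures_cpupool; expensive_parse and the constants are placeholders, not code from this commit:

extern crate futures;
extern crate futures_cpupool;

use futures::{future, stream, Future, Stream};
use futures_cpupool::CpuPool;

// Hypothetical stand-in for the CPU-heavy revlog parsing.
fn expensive_parse(rev: u64) -> u64 {
    rev.wrapping_mul(2654435761)
}

fn main() {
    let cpupool = CpuPool::new(8); // like --parsing-cpupool-size
    let total = stream::iter_ok::<_, ()>(0..1000u64)
        // Turn each item into a lazy, CPU-bound future...
        .map(|rev| future::lazy(move || Ok::<_, ()>(expensive_parse(rev))))
        // ...spawn it on a pool worker thread instead of the IO thread...
        .map(move |fut| cpupool.spawn(fut))
        // ...and keep at most 100 parses in flight, as upload_changesets does.
        .buffered(100)
        .fold(0u64, |acc, v| Ok::<_, ()>(acc + v))
        .wait()
        .unwrap();
    println!("parsed sum: {}", total);
}

spawn returns a CpuFuture that completes when the pool worker finishes, so the stream's own thread is never blocked on parsing.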


@@ -13,6 +13,7 @@ use failure::prelude::*;
use futures::{Future, IntoFuture};
use futures::future::{self, SharedItem};
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use blobrepo::{BlobChangeset, BlobRepo, ChangesetHandle, CreateChangeset, HgBlobEntry,
@@ -186,25 +187,23 @@ fn upload_entry(
pub fn upload_changesets(
revlogrepo: RevlogRepo,
blobrepo: Arc<BlobRepo>,
cpupool_size: usize,
) -> BoxStream<BoxFuture<SharedItem<BlobChangeset>, Error>, Error> {
let mut parent_changeset_handles: HashMap<HgNodeHash, ChangesetHandle> = HashMap::new();
revlogrepo
.changesets()
.and_then({
.map({
let revlogrepo = revlogrepo.clone();
let blobrepo = blobrepo.clone();
move |csid| {
let ParseChangeset {
revlogcs,
rootmf,
entries,
} = parse_changeset(revlogrepo.clone(), HgChangesetId::new(csid));
revlogcs.map(move |cs| (csid, cs, rootmf, entries))
}
})
.map(move |(csid, cs, rootmf, entries)| {
let rootmf = rootmf
.and_then({
let rootmf = rootmf.map({
let blobrepo = blobrepo.clone();
move |rootmf| {
match rootmf {
@@ -222,20 +221,34 @@ pub fn upload_changesets(
// compatibility with old repositories.
check_nodeid: false,
};
let (_, entry) = try_boxfuture!(upload.upload(&blobrepo));
entry.map(Some).boxify()
upload
.upload(&blobrepo)
.into_future()
.and_then(|(_, entry)| entry)
.map(Some)
.boxify()
}
}
}
})
.boxify();
});
let entries = entries
.and_then({
let entries = entries.map({
let blobrepo = blobrepo.clone();
move |(path, entry)| upload_entry(&blobrepo, entry, path)
})
.boxify();
});
revlogcs
.join3(rootmf, entries.collect())
.map(move |(cs, rootmf, entries)| (csid, cs, rootmf, entries))
}
})
.map({
let cpupool = CpuPool::new(cpupool_size);
move |fut| cpupool.spawn(fut)
})
.buffered(100)
.map(move |(csid, cs, rootmf, entries)| {
let entries = stream::futures_unordered(entries).boxify();
let (p1handle, p2handle) = {
let mut parents = cs.parents().into_iter().map(|p| {
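
The core of the restructuring above: instead of resolving each changeset's pieces inline, every changeset now yields one combined future (revlogcs, the root manifest, and the collected entries joined with join3), which is exactly the unit that gets spawned on the CpuPool. A stand-in sketch with hypothetical placeholder types:

extern crate futures;

use futures::{future, stream, Future, Stream};

// Placeholders for RevlogChangeset, the root-manifest entry, and HgBlobEntry.
type Changeset = String;
type RootManifest = Option<String>;
type Entry = String;

fn parse_one(
    csid: u32,
) -> Box<Future<Item = (u32, Changeset, RootManifest, Vec<Entry>), Error = ()> + Send> {
    let cs = future::ok::<_, ()>(format!("cs{}", csid));
    let rootmf = future::ok(Some(format!("mf{}", csid)));
    // collect() turns the entries stream into a future of Vec, so all three
    // parts can be joined into a single spawnable future.
    let entries = stream::iter_ok(vec![format!("e{}", csid)]).collect();
    Box::new(
        cs.join3(rootmf, entries)
            .map(move |(cs, rootmf, entries)| (csid, cs, rootmf, entries)),
    )
}

fn main() {
    let (csid, _cs, _rootmf, entries) = parse_one(1).wait().unwrap();
    println!("changeset {} with {} entries", csid, entries.len());
}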


@@ -15,6 +15,7 @@ extern crate clap;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;
extern crate futures_cpupool;
#[macro_use]
extern crate futures_ext;
extern crate mercurial;
@@ -50,17 +51,18 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
.about("make blobs")
.args_from_usage(
r#"
<INPUT> 'input revlog repo'
--debug 'print debug logs'
--repo_id <repo_id> 'ID of the newly imported repo'
--manifold-bucket [BUCKET] 'manifold bucket'
--db-address [address] 'address of a db. Used only for manifold blobstore'
--blobstore-cache-size [SIZE] 'size of the blobstore cache'
--changesets-cache-size [SIZE] 'size of the changesets cache'
--filenodes-cache-size [SIZE] 'size of the filenodes cache'
--io-thread-num [NUM] 'num of the io threads to use'
<INPUT> 'input revlog repo'
--debug 'print debug logs'
--repo_id <repo_id> 'ID of the newly imported repo'
--manifold-bucket [BUCKET] 'manifold bucket'
--db-address [address] 'address of a db. Used only for manifold blobstore'
--blobstore-cache-size [SIZE] 'size of the blobstore cache'
--changesets-cache-size [SIZE] 'size of the changesets cache'
--filenodes-cache-size [SIZE] 'size of the filenodes cache'
--io-thread-num [NUM] 'num of the io threads to use'
--max-concurrent-request-per-io-thread [NUM] 'max requests per io thread'
[OUTPUT] 'Blobstore output'
--parsing-cpupool-size [NUM] 'size of cpupool for parsing revlogs'
[OUTPUT] 'Blobstore output'
"#,
)
.arg(
@@ -73,6 +75,16 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
)
}
fn get_usize<'a>(matches: &ArgMatches<'a>, key: &str, default: usize) -> usize {
matches
.value_of(key)
.map(|val| {
val.parse::<usize>()
.expect(&format!("{} must be integer", key))
})
.unwrap_or(default)
}
fn open_blobrepo<'a>(logger: &Logger, matches: &ArgMatches<'a>) -> BlobRepo {
let repo_id = RepositoryId::new(matches.value_of("repo_id").unwrap().parse().unwrap());
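
The new get_usize helper replaces the repeated value_of/parse/unwrap_or chains for every numeric flag, including the new --parsing-cpupool-size. A self-contained sketch of its behavior under clap 2.x (the demo App is hypothetical):

extern crate clap;

use clap::{App, Arg, ArgMatches};

fn get_usize<'a>(matches: &ArgMatches<'a>, key: &str, default: usize) -> usize {
    matches
        .value_of(key)
        .map(|val| {
            val.parse::<usize>()
                .expect(&format!("{} must be integer", key))
        })
        .unwrap_or(default)
}

fn main() {
    let matches = App::new("demo")
        .arg(Arg::from_usage(
            "--parsing-cpupool-size [NUM] 'size of cpupool for parsing revlogs'",
        ))
        .get_matches();
    // A missing flag falls back to the default, mirroring main() in this diff.
    println!("cpupool size: {}", get_usize(&matches, "parsing-cpupool-size", 8));
}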
@@ -144,26 +156,11 @@ fn open_blobrepo<'a>(logger: &Logger, matches: &ArgMatches<'a>) -> BlobRepo {
matches
.value_of("db-address")
.expect("--db-address is not specified"),
matches
.value_of("blobstore-cache-size")
.map(|val| val.parse::<usize>().expect("cache size must be integer"))
.unwrap_or(100_000_000),
matches
.value_of("changesets-cache-size")
.map(|val| val.parse::<usize>().expect("cache size must be integer"))
.unwrap_or(100_000_000),
matches
.value_of("filenodes-cache-size")
.map(|val| val.parse::<usize>().expect("cache size must be integer"))
.unwrap_or(100_000_000),
matches
.value_of("io-thread-num")
.map(|val| val.parse::<usize>().expect("cache size must be integer"))
.unwrap_or(5),
matches
.value_of("max-concurrent-requests-per-io-thread")
.map(|val| val.parse::<usize>().expect("cache size must be integer"))
.unwrap_or(5),
get_usize(&matches, "blobstore-cache-size", 100_000_000),
get_usize(&matches, "changesets-cache-size", 100_000_000),
get_usize(&matches, "filenodes-cache-size", 100_000_000),
get_usize(&matches, "io-thread-num", 5),
get_usize(&matches, "max-concurrent-requests-per-io-thread", 5),
).expect("failed to create manifold blobrepo")
}
bad => panic!("unexpected blobstore type: {}", bad),
@@ -194,27 +191,30 @@ fn main() {
let blobrepo = Arc::new(open_blobrepo(&logger, &matches));
let cs_count = Arc::new(AtomicUsize::new(1));
let upload_changesets = changeset::upload_changesets(revlogrepo.clone(), blobrepo.clone())
.for_each(|cs| {
cs.map(|cs| {
debug!(logger, "inserted: {}", cs.get_changeset_id());
let cnt = cs_count.fetch_add(1, Ordering::SeqCst);
if cnt % 5000 == 0 {
info!(logger, "inserted commits # {}", cnt);
}
()
}).map_err(|err| {
error!(logger, "failed to blobimport: {}", err);
let upload_changesets = changeset::upload_changesets(
revlogrepo.clone(),
blobrepo.clone(),
get_usize(&matches, "parsing-cpupool-size", 8),
).for_each(|cs| {
cs.map(|cs| {
debug!(logger, "inserted: {}", cs.get_changeset_id());
let cnt = cs_count.fetch_add(1, Ordering::SeqCst);
if cnt % 5000 == 0 {
info!(logger, "inserted commits # {}", cnt);
}
()
}).map_err(|err| {
error!(logger, "failed to blobimport: {}", err);
for cause in err.causes() {
info!(logger, "cause: {}", cause);
}
info!(logger, "root cause: {:?}", err.root_cause());
for cause in err.causes() {
info!(logger, "cause: {}", cause);
}
info!(logger, "root cause: {:?}", err.root_cause());
let msg = format!("failed to blobimport: {}", err);
err_msg(msg)
})
});
let msg = format!("failed to blobimport: {}", err);
err_msg(msg)
})
});
let upload_bookmarks = bookmark::upload_bookmarks(&logger, revlogrepo, blobrepo);
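
For reference, the error-reporting shape used in the for_each above, as a stand-alone sketch assuming the failure-crate accessors this diff already uses (err_msg, causes, root_cause); the message string is made up:

extern crate failure;

use failure::{err_msg, Error};

// Print the error, walk its cause chain, then show the root cause.
fn report(err: &Error) {
    eprintln!("failed to blobimport: {}", err);
    for cause in err.causes() {
        eprintln!("cause: {}", cause);
    }
    eprintln!("root cause: {:?}", err.root_cause());
}

fn main() {
    let err: Error = err_msg("revlog entry is corrupt");
    report(&err);
}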