From bccad1bf4204f55903b060d64eb5b948fa6f2827 Mon Sep 17 00:00:00 2001
From: Lukas Piatkowski
Date: Tue, 24 Apr 2018 11:03:18 -0700
Subject: [PATCH] new_blobimport: import bookmarks from revlog repo

Summary: This change isn't doing much on its own, since rocksdb's BlobRepo is
using in-memory Bookmarks at the moment and they disappear when the import is
finished. Later, bookmarks will be used for Head discovery, and then this will
be properly tested.

Reviewed By: farnz

Differential Revision: D7728716

fbshipit-source-id: ad50f35b18d93aa1e38951408092e46e67fde0c7
---
 cmds/new_blobimport/bookmark.rs  | 54 ++++++++++++++++++++++++++++++++
 cmds/new_blobimport/changeset.rs |  4 +--
 cmds/new_blobimport/main.rs      | 41 ++++++++++++++----------
 3 files changed, 81 insertions(+), 18 deletions(-)
 create mode 100644 cmds/new_blobimport/bookmark.rs

diff --git a/cmds/new_blobimport/bookmark.rs b/cmds/new_blobimport/bookmark.rs
new file mode 100644
index 0000000000..876d2303ea
--- /dev/null
+++ b/cmds/new_blobimport/bookmark.rs
@@ -0,0 +1,54 @@
+// Copyright (c) 2004-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+use std::sync::Arc;
+
+use ascii::AsciiString;
+use failure::prelude::*;
+use futures::prelude::*;
+use futures_ext::{BoxFuture, FutureExt};
+use slog::Logger;
+
+use blobrepo::BlobRepo;
+use mercurial::RevlogRepo;
+use mercurial_types::DChangesetId;
+
+pub fn upload_bookmarks(
+    logger: &Logger,
+    revlogrepo: RevlogRepo,
+    blobrepo: Arc<BlobRepo>,
+) -> BoxFuture<(), Error> {
+    let logger = logger.clone();
+    let bookmarks = Arc::new(try_boxfuture!(revlogrepo.get_bookmarks()));
+
+    (*bookmarks).keys().and_then({
+        let bookmarks = bookmarks.clone();
+        move |key| {
+            (*bookmarks).get(&key).and_then(move |v| {
+                let (cs_id, _) = v.ok_or_else(|| format_err!("Bookmark value missing: {:?}", key))?;
+                Ok((key, cs_id))
+            })
+        }
+    }).chunks(100) // send 100 bookmarks in a single transaction
+    .and_then({
+        let blobrepo = blobrepo.clone();
+        move |vec| {
+            let count = vec.len();
+            let mut transaction = blobrepo.update_bookmark_transaction();
+
+            for (key, value) in vec {
+                let key = try_boxfuture!(AsciiString::from_ascii(key));
+                let value = DChangesetId::new(value.into_nodehash().into_mononoke());
+                try_boxfuture!(transaction.create(&key, &value))
+            }
+
+            transaction.commit().map(move |()| count).boxify()
+        }
+    }).for_each(move |count| {
+        info!(logger, "uploaded chunk of {:?} bookmarks", count);
+        Ok(())
+    }).boxify()
+}
diff --git a/cmds/new_blobimport/changeset.rs b/cmds/new_blobimport/changeset.rs
index 8d24290523..e91c02eff9 100644
--- a/cmds/new_blobimport/changeset.rs
+++ b/cmds/new_blobimport/changeset.rs
@@ -8,8 +8,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use failure_ext::{err_msg, Error, Fail, FutureFailureErrorExt, FutureFailureExt, ResultExt,
-                  StreamFailureErrorExt};
+use failure::err_msg;
+use failure::prelude::*;
 use futures::{Future, IntoFuture};
 use futures::future::{self, SharedItem};
 use futures::stream::{self, Stream};
diff --git a/cmds/new_blobimport/main.rs b/cmds/new_blobimport/main.rs
index e3362b9d0b..a10fa35700 100644
--- a/cmds/new_blobimport/main.rs
+++ b/cmds/new_blobimport/main.rs
@@ -7,11 +7,13 @@
 #![deny(warnings)]
 #![feature(conservative_impl_trait)]
 
+extern crate ascii;
 extern crate blobrepo;
 extern crate bytes;
 extern crate changesets;
 extern crate clap;
-extern crate failure_ext;
+#[macro_use]
+extern crate failure_ext as failure;
 extern crate futures;
 #[macro_use]
 extern crate futures_ext;
@@ -22,6 +24,7 @@ extern crate slog;
 extern crate slog_glog_fmt;
 extern crate tokio_core;
 
+mod bookmark;
 mod changeset;
 
 use std::fs;
@@ -29,7 +32,7 @@ use std::path::Path;
 use std::sync::Arc;
 
 use clap::{App, Arg, ArgMatches};
-use failure_ext::err_msg;
+use failure::err_msg;
 use futures::{Future, Stream};
 use slog::{Drain, Logger};
 use slog_glog_fmt::default_drain as glog_drain;
@@ -166,22 +169,28 @@ fn main() {
 
     let blobrepo = Arc::new(open_blobrepo(&logger, core.remote(), &matches));
 
-    let csstream = changeset::upload_changesets(revlogrepo, blobrepo);
+    let upload_changesets = changeset::upload_changesets(revlogrepo.clone(), blobrepo.clone())
+        .for_each(|cs| {
+            cs.map(|cs| {
+                info!(logger, "inserted: {}", cs.get_changeset_id());
+                ()
+            }).map_err(|err| {
+                error!(logger, "failed to blobimport: {}", err);
 
-    core.run(csstream.for_each(|cs| {
-        cs.map(|cs| {
-            info!(logger, "inserted: {}", cs.get_changeset_id());
-            ()
-        }).map_err(|err| {
-            error!(logger, "failed to blobimport: {}", err);
+                for cause in err.causes() {
+                    info!(logger, "cause: {}", cause);
+                }
+                info!(logger, "root cause: {:?}", err.root_cause());
 
-            for cause in err.causes() {
-                info!(logger, "cause: {}", cause);
-            }
-            info!(logger, "root cause: {:?}", err.root_cause());
+                let msg = format!("failed to blobimport: {}", err);
+                err_msg(msg)
+            })
+        });
 
-            let msg = format!("failed to blobimport: {}", err);
-            err_msg(msg)
-        })
+    let upload_bookmarks = bookmark::upload_bookmarks(&logger, revlogrepo, blobrepo);
+
+    core.run(upload_changesets.and_then(|()| {
+        info!(logger, "finished uploading changesets, now doing bookmarks");
+        upload_bookmarks
     })).expect("main stream failed");
 }
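
Note: upload_bookmarks groups the bookmark stream into batches of 100 and commits
each batch in its own bookmark transaction. The snippet below is a minimal,
self-contained sketch of that chunking pattern using only the public futures 0.1
stream API; it is not part of the patch, and the Mononoke-specific pieces
(RevlogRepo, BlobRepo, the bookmark transaction) are replaced with hypothetical
placeholders such as fake_bookmarks.

    // Sketch of "chunk a stream into batches of 100 and commit each batch",
    // assuming only futures = "0.1". The counter-based "transaction" is a
    // stand-in for BlobRepo::update_bookmark_transaction().
    extern crate futures;

    use futures::{future, stream, Future, Stream};

    fn main() {
        // Stand-in for the (bookmark name, changeset id) pairs read from the revlog.
        let fake_bookmarks = (0..250).map(|i| (format!("bookmark{}", i), i));

        let upload = stream::iter_ok::<_, ()>(fake_bookmarks)
            // Group the stream into batches of at most 100 items, mirroring
            // `.chunks(100)` in upload_bookmarks.
            .chunks(100)
            .and_then(|batch| {
                // In the real code a bookmark transaction is built from the batch
                // and committed against the BlobRepo; here we only report its size.
                future::ok(batch.len())
            })
            .for_each(|count| {
                println!("uploaded chunk of {} bookmarks", count);
                Ok(())
            });

        upload.wait().expect("bookmark upload failed");
    }

Because each batch is handled inside Stream::and_then, one chunk's commit future
is driven to completion before the next chunk is taken, which appears to be the
same one-transaction-at-a-time behaviour the patch relies on.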