new_blobimport: import bookmarks from revlog repo

Summary:
This change isn't doing much on its own, since the rocksdb-backed BlobRepo currently uses in-memory Bookmarks, which disappear once the import is finished.
Later, bookmarks will be used for head discovery, and at that point this will be properly tested.
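
For context, this is roughly how the new entry point is wired into blobimport's main() (a condensed sketch of the main.rs hunk further down; logging of causes and other error reporting is trimmed):

    // Condensed sketch of the wiring in main.rs below: changesets are uploaded
    // first, then bookmarks, on the same tokio-core reactor.
    let upload_changesets = changeset::upload_changesets(revlogrepo.clone(), blobrepo.clone())
        .for_each(|cs| {
            cs.map(|cs| info!(logger, "inserted: {}", cs.get_changeset_id()))
                .map_err(|err| err_msg(format!("failed to blobimport: {}", err)))
        });

    let upload_bookmarks = bookmark::upload_bookmarks(&logger, revlogrepo, blobrepo);

    core.run(upload_changesets.and_then(|()| {
        info!(logger, "finished uploading changesets, now doing bookmarks");
        upload_bookmarks
    })).expect("main stream failed");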

Reviewed By: farnz

Differential Revision: D7728716

fbshipit-source-id: ad50f35b18d93aa1e38951408092e46e67fde0c7
Lukas Piatkowski 2018-04-24 11:03:18 -07:00 committed by Facebook Github Bot
parent a692a2a9ea
commit bccad1bf42
3 changed files with 81 additions and 18 deletions


@@ -0,0 +1,54 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

use std::sync::Arc;

use ascii::AsciiString;
use failure::prelude::*;
use futures::prelude::*;
use futures_ext::{BoxFuture, FutureExt};
use slog::Logger;

use blobrepo::BlobRepo;
use mercurial::RevlogRepo;
use mercurial_types::DChangesetId;

/// Read every bookmark from the revlog repo and recreate it in the blobrepo,
/// committing the bookmarks in chunks of 100 per transaction.
pub fn upload_bookmarks(
    logger: &Logger,
    revlogrepo: RevlogRepo,
    blobrepo: Arc<BlobRepo>,
) -> BoxFuture<(), Error> {
    let logger = logger.clone();
    let bookmarks = Arc::new(try_boxfuture!(revlogrepo.get_bookmarks()));

    // Stream of (bookmark name, changeset id) pairs; a bookmark without a
    // value is treated as an error.
    (*bookmarks).keys().and_then({
        let bookmarks = bookmarks.clone();
        move |key| {
            (*bookmarks).get(&key).and_then(move |v| {
                let (cs_id, _) = v.ok_or_else(|| format_err!("Bookmark value missing: {:?}", key))?;
                Ok((key, cs_id))
            })
        }
    }).chunks(100) // send 100 bookmarks in a single transaction
    .and_then({
        let blobrepo = blobrepo.clone();
        move |vec| {
            let count = vec.len();
            let mut transaction = blobrepo.update_bookmark_transaction();

            // Stage every bookmark of the chunk in one transaction, converting
            // the revlog types to the blobrepo ones along the way.
            for (key, value) in vec {
                let key = try_boxfuture!(AsciiString::from_ascii(key));
                let value = DChangesetId::new(value.into_nodehash().into_mononoke());
                try_boxfuture!(transaction.create(&key, &value));
            }

            transaction.commit().map(move |()| count).boxify()
        }
    }).for_each(move |count| {
        info!(logger, "uploaded chunk of {:?} bookmarks", count);
        Ok(())
    }).boxify()
}


@@ -8,8 +8,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use failure_ext::{err_msg, Error, Fail, FutureFailureErrorExt, FutureFailureExt, ResultExt,
-                  StreamFailureErrorExt};
+use failure::err_msg;
+use failure::prelude::*;
 use futures::{Future, IntoFuture};
 use futures::future::{self, SharedItem};
 use futures::stream::{self, Stream};


@@ -7,11 +7,13 @@
 #![deny(warnings)]
 #![feature(conservative_impl_trait)]
 
+extern crate ascii;
 extern crate blobrepo;
 extern crate bytes;
 extern crate changesets;
 extern crate clap;
-extern crate failure_ext;
+#[macro_use]
+extern crate failure_ext as failure;
 extern crate futures;
 #[macro_use]
 extern crate futures_ext;
@@ -22,6 +24,7 @@ extern crate slog;
 extern crate slog_glog_fmt;
 extern crate tokio_core;
 
+mod bookmark;
 mod changeset;
 
 use std::fs;
@@ -29,7 +32,7 @@ use std::path::Path;
 use std::sync::Arc;
 
 use clap::{App, Arg, ArgMatches};
-use failure_ext::err_msg;
+use failure::err_msg;
 use futures::{Future, Stream};
 use slog::{Drain, Logger};
 use slog_glog_fmt::default_drain as glog_drain;
@@ -166,22 +169,28 @@ fn main() {
     let blobrepo = Arc::new(open_blobrepo(&logger, core.remote(), &matches));
 
-    let csstream = changeset::upload_changesets(revlogrepo, blobrepo);
-
-    core.run(csstream.for_each(|cs| {
-        cs.map(|cs| {
-            info!(logger, "inserted: {}", cs.get_changeset_id());
-            ()
-        }).map_err(|err| {
-            error!(logger, "failed to blobimport: {}", err);
-            for cause in err.causes() {
-                info!(logger, "cause: {}", cause);
-            }
-            info!(logger, "root cause: {:?}", err.root_cause());
-            let msg = format!("failed to blobimport: {}", err);
-            err_msg(msg)
-        })
+    let upload_changesets = changeset::upload_changesets(revlogrepo.clone(), blobrepo.clone())
+        .for_each(|cs| {
+            cs.map(|cs| {
+                info!(logger, "inserted: {}", cs.get_changeset_id());
+                ()
+            }).map_err(|err| {
+                error!(logger, "failed to blobimport: {}", err);
+                for cause in err.causes() {
+                    info!(logger, "cause: {}", cause);
+                }
+                info!(logger, "root cause: {:?}", err.root_cause());
+                let msg = format!("failed to blobimport: {}", err);
+                err_msg(msg)
+            })
+        });
+
+    let upload_bookmarks = bookmark::upload_bookmarks(&logger, revlogrepo, blobrepo);
+
+    core.run(upload_changesets.and_then(|()| {
+        info!(logger, "finished uploading changesets, now doing bookmarks");
+        upload_bookmarks
     })).expect("main stream failed");
 }