mononoke: always fill changesets table

Summary:
Let's fill the changesets table even if we import only part of the repo. This lets
us import new changesets incrementally.

That can be dangerous since we don't check if parent commits are present.
However this blobimport is a temporary measure until we get a full-fidelity
blobimport that uses a commit API.

Reviewed By: jsgf

Differential Revision: D7485495

fbshipit-source-id: 63ba91bad4eb1c1662db73293c76a506f48a4753
This commit is contained in:
Stanislau Hlebik 2018-04-13 03:45:51 -07:00 committed by Facebook Github Bot
parent d2558d0d6c
commit 9d6f639389
2 changed files with 66 additions and 49 deletions

View File

@ -16,15 +16,16 @@ use slog::Logger;
use tokio_core::reactor::Core;
use blobrepo::BlobChangeset;
use changesets::{ChangesetInsert, Changesets};
use failure::{Error, Result};
use filenodes::FilenodeInfo;
use futures::sync::mpsc::UnboundedSender;
use futures_ext::{BoxStream, FutureExt, StreamExt};
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use heads::Heads;
use mercurial::{self, RevlogManifest, RevlogRepo};
use mercurial::revlog::RevIdx;
use mercurial::revlogrepo::RevlogRepoBlobimportExt;
use mercurial_types::{BlobNode, HgBlob, HgFileNodeId, NodeHash, RepoPath};
use mercurial_types::{BlobNode, HgBlob, HgFileNodeId, NodeHash, RepoPath, RepositoryId};
use mercurial_types::nodehash::HgChangesetId;
use stats::Timeseries;
@ -34,29 +35,29 @@ use manifest;
/// Shared state for the blobimport conversion pipeline: the source revlog
/// repo, the runtime pieces (reactor core, CPU pool), output channels, and
/// the optional --skip / --commits-limit window over the changeset stream.
///
/// NOTE(review): this is a diff view — `sender` and `filenodes_sender` are
/// the fields this commit removes (they become arguments to `convert()`);
/// verify which fields survive against the post-commit file.
pub(crate) struct ConvertContext<H> {
// Revlog repository being imported.
pub repo: RevlogRepo,
pub sender: SyncSender<BlobstoreEntry>,
// Store recording repository heads; generic so tests/CLI can swap impls.
pub headstore: H,
// tokio-core reactor used to drive the import futures.
pub core: Core,
pub cpupool: Arc<CpuPool>,
pub logger: Logger,
// Number of leading changesets to skip, if set (--skip).
pub skip: Option<u64>,
// Maximum number of changesets to import, if set (--commits-limit).
pub commits_limit: Option<u64>,
pub filenodes_sender: UnboundedSender<FilenodeInfo>,
}
impl<H> ConvertContext<H>
where
H: Heads,
{
pub fn convert(self) -> Result<()> {
let mut core = self.core;
let logger_owned = self.logger;
let logger = &logger_owned;
let cpupool = self.cpupool;
let headstore = self.headstore;
pub fn convert(
&mut self,
sender: SyncSender<BlobstoreEntry>,
filenodes_sender: UnboundedSender<FilenodeInfo>,
) -> Result<()> {
let core = &mut self.core;
let logger = &self.logger.clone();
let cpupool = self.cpupool.clone();
let headstore = &self.headstore;
let skip = self.skip;
let commits_limit = self.commits_limit;
let filenodes_sender = self.filenodes_sender;
let changesets: BoxStream<mercurial::NodeHash, mercurial::Error> = if let Some(skip) = skip
{
@ -79,7 +80,7 @@ where
.enumerate()
.map({
let repo = self.repo.clone();
let sender = self.sender.clone();
let sender = sender.clone();
move |(seq, csid)| {
debug!(logger, "{}: changeset {}", seq, csid);
STATS::changesets.add_value(1);
@ -118,6 +119,50 @@ where
info!(logger, "parsed everything, waiting for io");
Ok(())
}
/// Walk every changeset in the revlog (honouring --skip / --commits-limit)
/// and insert it, together with its parent changeset ids, into the
/// changesets store for `repo_id`.
///
/// Returns a future that completes once every insert has been issued.
/// NOTE(review): parents are not checked for presence in the store first —
/// the commit message calls this out as a known, accepted risk.
pub fn fill_changesets_store(
    &self,
    changesets_store: Arc<Changesets>,
    repo_id: &RepositoryId,
) -> BoxFuture<(), mercurial::Error> {
    let revlog = self.repo.clone();
    // Copy so the move closures below don't borrow from `self`.
    let repo_id = *repo_id;
    self.get_changesets_stream()
        .and_then(move |hash| {
            // Re-wrap the hash in the mercurial crate's NodeHash type to
            // look the changeset up in the revlog.
            let hg_hash = mercurial::NodeHash::new(hash.sha1().clone());
            revlog
                .get_changeset(&mercurial::HgChangesetId::new(hg_hash))
                .map(move |cs| (cs, hg_hash))
        })
        .for_each(move |(cs, hg_hash)| {
            // Convert parent hashes from mercurial's types to the
            // mercurial_types ids the changesets store expects.
            let parent_ids = cs.parents()
                .into_iter()
                .map(|p| HgChangesetId::new(NodeHash::new(p.sha1().clone())))
                .collect();
            let row = ChangesetInsert {
                repo_id,
                cs_id: HgChangesetId::new(NodeHash::new(hg_hash.sha1().clone())),
                parents: parent_ids,
            };
            changesets_store.add(row)
        })
        .boxify()
}
fn get_changesets_stream(&self) -> BoxStream<mercurial::NodeHash, mercurial::Error> {
let changesets: BoxStream<mercurial::NodeHash, mercurial::Error> =
if let Some(skip) = self.skip {
self.repo.changesets().skip(skip).boxify()
} else {
self.repo.changesets().boxify()
};
if let Some(limit) = self.commits_limit {
changesets.take(limit).boxify()
} else {
changesets.boxify()
}
}
}
/// Copy a changeset and its manifest into the blobstore

View File

@ -52,7 +52,7 @@ use std::sync::mpsc::sync_channel;
use std::thread;
use bytes::Bytes;
use changesets::{ChangesetInsert, Changesets, SqliteChangesets};
use changesets::{Changesets, SqliteChangesets};
use clap::{App, Arg, ArgMatches};
use db::{get_connection_params, InstanceRequirement, ProxyRequirement};
use dieselfilenodes::{MysqlFilenodes, SqliteFilenodes, DEFAULT_INSERT_CHUNK_SIZE};
@ -72,7 +72,7 @@ use fileblob::Fileblob;
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use manifoldblob::ManifoldBlob;
use mercurial::{RevlogRepo, RevlogRepoOptions};
use mercurial_types::{HgChangesetId, NodeHash, RepositoryId};
use mercurial_types::RepositoryId;
use rocksblob::Rocksblob;
const DEFAULT_MANIFOLD_BUCKET: &str = "mononoke_prod";
@ -209,55 +209,27 @@ where
let repo = open_repo(&input, inmemory_logs_capacity)?;
info!(logger, "Converting: {}", input.display());
let convert_context = convert::ConvertContext {
let mut convert_context = convert::ConvertContext {
repo: repo.clone(),
sender,
headstore,
core,
cpupool: cpupool.clone(),
logger: logger.clone(),
skip: skip,
commits_limit: commits_limit,
filenodes_sender: filenodes_sender,
};
let res = convert_context.convert();
let res = convert_context.convert(sender, filenodes_sender);
iothread.join().expect("failed to join io thread")?;
filenodesthread
.join()
.expect("failed to join filenodesthread");
res?;
if !skip.is_none() || !commits_limit.is_none() {
warn!(
logger,
"skipping filling up changesets store because --skip or --commits-limit is set"
);
} else {
warn!(logger, "filling up changesets changesets store");
let changesets = open_changesets_store(output.into())?;
let mut core = Core::new()?;
let fut = repo.changesets()
.and_then(|node| {
let node = mercurial::NodeHash::new(node.sha1().clone());
repo.get_changeset(&mercurial::HgChangesetId::new(node))
.map(move |cs| (cs, node))
})
.for_each(|(cs, node)| {
let parents = cs.parents()
.into_iter()
.map(|p| HgChangesetId::new(NodeHash::new(p.sha1().clone())))
.collect();
let node = NodeHash::new(node.sha1().clone());
let insert = ChangesetInsert {
repo_id,
cs_id: HgChangesetId::new(node),
parents,
};
changesets.add(insert)
});
core.run(fut)?;
}
Ok(())
warn!(logger, "filling up changesets changesets store");
let changesets_store = open_changesets_store(output.into())?;
let mut core = Core::new()?;
let fill_cs_store = convert_context.fill_changesets_store(changesets_store, &repo_id);
core.run(fill_cs_store)
}
fn open_changesets_store(mut output: PathBuf) -> Result<Arc<Changesets>> {