From 9d6f639389a893fd50913ff8769f4c4e81b7109c Mon Sep 17 00:00:00 2001
From: Stanislau Hlebik
Date: Fri, 13 Apr 2018 03:45:51 -0700
Subject: [PATCH] mononoke: always fill changesets table

Summary: Let's fill the changesets table even if we import only part of the
repo. This lets us import new changesets incrementally. That can be
dangerous, since we don't check that parent commits are present. However,
this blobimport is a temporary measure until we get a full-fidelity
blobimport that uses a commit API.

Reviewed By: jsgf

Differential Revision: D7485495

fbshipit-source-id: 63ba91bad4eb1c1662db73293c76a506f48a4753
---
 cmds/blobimport/convert.rs | 69 +++++++++++++++++++++++++++++++-------
 cmds/blobimport/main.rs    | 46 +++++--------------------
 2 files changed, 66 insertions(+), 49 deletions(-)

diff --git a/cmds/blobimport/convert.rs b/cmds/blobimport/convert.rs
index fef99340db..a8ba06ba9c 100644
--- a/cmds/blobimport/convert.rs
+++ b/cmds/blobimport/convert.rs
@@ -16,15 +16,16 @@ use slog::Logger;
 use tokio_core::reactor::Core;
 
 use blobrepo::BlobChangeset;
+use changesets::{ChangesetInsert, Changesets};
 use failure::{Error, Result};
 use filenodes::FilenodeInfo;
 use futures::sync::mpsc::UnboundedSender;
-use futures_ext::{BoxStream, FutureExt, StreamExt};
+use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
 use heads::Heads;
 use mercurial::{self, RevlogManifest, RevlogRepo};
 use mercurial::revlog::RevIdx;
 use mercurial::revlogrepo::RevlogRepoBlobimportExt;
-use mercurial_types::{BlobNode, HgBlob, HgFileNodeId, NodeHash, RepoPath};
+use mercurial_types::{BlobNode, HgBlob, HgFileNodeId, NodeHash, RepoPath, RepositoryId};
 use mercurial_types::nodehash::HgChangesetId;
 use stats::Timeseries;
 
@@ -34,29 +35,29 @@ use manifest;
 
 pub(crate) struct ConvertContext<H> {
     pub repo: RevlogRepo,
-    pub sender: SyncSender<BlobstoreEntry>,
     pub headstore: H,
     pub core: Core,
     pub cpupool: Arc<CpuPool>,
     pub logger: Logger,
     pub skip: Option<u64>,
     pub commits_limit: Option<u64>,
-    pub filenodes_sender: UnboundedSender<FilenodeInfo>,
 }
 
 impl<H> ConvertContext<H>
where
    H: Heads,
 {
-    pub fn convert(self) -> Result<()> {
-        let mut core = self.core;
-        let logger_owned = self.logger;
-        let logger = &logger_owned;
-        let cpupool = self.cpupool;
-        let headstore = self.headstore;
+    pub fn convert(
+        &mut self,
+        sender: SyncSender<BlobstoreEntry>,
+        filenodes_sender: UnboundedSender<FilenodeInfo>,
+    ) -> Result<()> {
+        let core = &mut self.core;
+        let logger = &self.logger.clone();
+        let cpupool = self.cpupool.clone();
+        let headstore = &self.headstore;
         let skip = self.skip;
         let commits_limit = self.commits_limit;
-        let filenodes_sender = self.filenodes_sender;
 
         let changesets: BoxStream<NodeHash, mercurial::Error> =
             if let Some(skip) = skip {
@@ -79,7 +80,7 @@
             .enumerate()
             .map({
                 let repo = self.repo.clone();
-                let sender = self.sender.clone();
+                let sender = sender.clone();
                 move |(seq, csid)| {
                     debug!(logger, "{}: changeset {}", seq, csid);
                     STATS::changesets.add_value(1);
@@ -118,6 +119,50 @@ where
         info!(logger, "parsed everything, waiting for io");
         Ok(())
     }
+
+    pub fn fill_changesets_store(
+        &self,
+        changesets_store: Arc<Changesets>,
+        repo_id: &RepositoryId,
+    ) -> BoxFuture<(), mercurial::Error> {
+        let repo = self.repo.clone();
+        let repo_id = *repo_id;
+        self.get_changesets_stream()
+            .and_then(move |node| {
+                let node = mercurial::NodeHash::new(node.sha1().clone());
+                repo.get_changeset(&mercurial::HgChangesetId::new(node))
+                    .map(move |cs| (cs, node))
+            })
+            .for_each(move |(cs, node)| {
+                let parents = cs.parents()
+                    .into_iter()
+                    .map(|p| HgChangesetId::new(NodeHash::new(p.sha1().clone())))
+                    .collect();
+                let node = NodeHash::new(node.sha1().clone());
+                let insert = ChangesetInsert {
+                    repo_id,
+                    cs_id: HgChangesetId::new(node),
+                    parents,
+                };
+                changesets_store.add(insert)
+            })
+            .boxify()
+    }
+
+    fn get_changesets_stream(&self) -> BoxStream<NodeHash, mercurial::Error> {
+        let changesets: BoxStream<NodeHash, mercurial::Error> =
+            if let Some(skip) = self.skip {
+                self.repo.changesets().skip(skip).boxify()
+            } else {
+                self.repo.changesets().boxify()
+            };
+
+        if let Some(limit) = self.commits_limit {
+            changesets.take(limit).boxify()
+        } else {
+            changesets.boxify()
+        }
+    }
 }
 
 /// Copy a changeset and its manifest into the blobstore
diff --git a/cmds/blobimport/main.rs b/cmds/blobimport/main.rs
index e7d3f8108e..b6a8f2d414 100644
--- a/cmds/blobimport/main.rs
+++ b/cmds/blobimport/main.rs
@@ -52,7 +52,7 @@ use std::sync::mpsc::sync_channel;
 use std::thread;
 
 use bytes::Bytes;
-use changesets::{ChangesetInsert, Changesets, SqliteChangesets};
+use changesets::{Changesets, SqliteChangesets};
 use clap::{App, Arg, ArgMatches};
 use db::{get_connection_params, InstanceRequirement, ProxyRequirement};
 use dieselfilenodes::{MysqlFilenodes, SqliteFilenodes, DEFAULT_INSERT_CHUNK_SIZE};
@@ -72,7 +72,7 @@ use fileblob::Fileblob;
 use futures_ext::{BoxFuture, FutureExt, StreamExt};
 use manifoldblob::ManifoldBlob;
 use mercurial::{RevlogRepo, RevlogRepoOptions};
-use mercurial_types::{HgChangesetId, NodeHash, RepositoryId};
+use mercurial_types::RepositoryId;
 use rocksblob::Rocksblob;
 
 const DEFAULT_MANIFOLD_BUCKET: &str = "mononoke_prod";
@@ -209,55 +209,27 @@ where
     let repo = open_repo(&input, inmemory_logs_capacity)?;
     info!(logger, "Converting: {}", input.display());
 
-    let convert_context = convert::ConvertContext {
+    let mut convert_context = convert::ConvertContext {
         repo: repo.clone(),
-        sender,
         headstore,
         core,
         cpupool: cpupool.clone(),
         logger: logger.clone(),
         skip: skip,
         commits_limit: commits_limit,
-        filenodes_sender: filenodes_sender,
     };
-    let res = convert_context.convert();
+    let res = convert_context.convert(sender, filenodes_sender);
     iothread.join().expect("failed to join io thread")?;
     filenodesthread
         .join()
         .expect("failed to join filenodesthread");
     res?;
 
-    if !skip.is_none() || !commits_limit.is_none() {
-        warn!(
-            logger,
-            "skipping filling up changesets store because --skip or --commits-limit is set"
-        );
-    } else {
-        warn!(logger, "filling up changesets changesets store");
-        let changesets = open_changesets_store(output.into())?;
-        let mut core = Core::new()?;
-        let fut = repo.changesets()
-            .and_then(|node| {
-                let node = mercurial::NodeHash::new(node.sha1().clone());
-                repo.get_changeset(&mercurial::HgChangesetId::new(node))
-                    .map(move |cs| (cs, node))
-            })
-            .for_each(|(cs, node)| {
-                let parents = cs.parents()
-                    .into_iter()
-                    .map(|p| HgChangesetId::new(NodeHash::new(p.sha1().clone())))
-                    .collect();
-                let node = NodeHash::new(node.sha1().clone());
-                let insert = ChangesetInsert {
-                    repo_id,
-                    cs_id: HgChangesetId::new(node),
-                    parents,
-                };
-                changesets.add(insert)
-            });
-        core.run(fut)?;
-    }
-    Ok(())
+    warn!(logger, "filling up changesets store");
+    let changesets_store = open_changesets_store(output.into())?;
+    let mut core = Core::new()?;
+    let fill_cs_store = convert_context.fill_changesets_store(changesets_store, &repo_id);
+    core.run(fill_cs_store)
 }
 
 fn open_changesets_store(mut output: PathBuf) -> Result<Arc<Changesets>> {
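
Note on the risk called out in the summary: fill_changesets_store() inserts
every commit it sees without verifying that the commit's parents are already
in the changesets table, so an incremental import started from the wrong
point can record edges to missing parents. Below is a minimal sketch of the
guard a caller could add, in the futures-0.1 style of the code above. It is
illustrative only: the insert_checked helper, the assumption that the
Changesets trait exposes a get(repo_id, cs_id) lookup resolving to an
optional entry, and the failure::err_msg error are not part of this patch.

    use std::sync::Arc;

    use changesets::{ChangesetInsert, Changesets};
    use failure::{err_msg, Error};
    use futures::future::{self, Future};
    use futures_ext::{BoxFuture, FutureExt};
    use mercurial_types::{HgChangesetId, RepositoryId};

    // Hypothetical guard: only insert a changeset once all of its parents
    // are known to the store, instead of calling changesets_store.add()
    // unconditionally as fill_changesets_store() does.
    fn insert_checked(
        changesets_store: Arc<Changesets>,
        repo_id: RepositoryId,
        cs_id: HgChangesetId,
        parents: Vec<HgChangesetId>,
    ) -> BoxFuture<(), Error> {
        // One lookup per parent; `get` is assumed to resolve to Some(entry)
        // when the parent is already present in the changesets table.
        let lookups: Vec<_> = parents
            .iter()
            .map(|p| changesets_store.get(repo_id, p.clone()))
            .collect();

        future::join_all(lookups)
            .and_then(move |entries| {
                if entries.iter().any(Option::is_none) {
                    // Refuse the insert rather than record a dangling edge.
                    future::err(err_msg("parent changeset missing")).boxify()
                } else {
                    changesets_store
                        .add(ChangesetInsert {
                            repo_id,
                            cs_id,
                            parents,
                        })
                        .boxify()
                }
            })
            .boxify()
    }

The cost is one point lookup per parent; in exchange, an import run with
--skip or --commits-limit could never insert a commit whose parents were
skipped. The commit-API-based blobimport the summary anticipates would make
such a check unnecessary on the client side.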