mononoke: add regenerate_hg_filenodes tool

Reviewed By: farnz

Differential Revision: D16803182

fbshipit-source-id: ea1f450607ee0d86a09fa13810080968b6bf20b0
This commit is contained in:
Stanislau Hlebik 2019-08-14 12:38:35 -07:00 committed by Facebook Github Bot
parent d71642a013
commit 4a8601a968
5 changed files with 105 additions and 13 deletions

View File

@ -209,6 +209,19 @@ impl<F: Filenodes> Filenodes for DelayedFilenodes<F> {
delay(self.put_dist, self.inner.add_filenodes(ctx, info, repo_id)).boxify()
}
fn add_or_replace_filenodes(
    &self,
    ctx: CoreContext,
    info: BoxStream<FilenodeInfo, Error>,
    repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
    // Forward the replace-write to the wrapped implementation, applying the
    // same artificial latency (put_dist) as the other write path.
    let inner_fut = self.inner.add_or_replace_filenodes(ctx, info, repo_id);
    delay(self.put_dist, inner_fut).boxify()
}
fn get_filenode(
&self,
ctx: CoreContext,

View File

@ -11,7 +11,7 @@ use cloned::cloned;
use failure_ext::Error;
use futures::future::Future;
use futures::stream;
use futures_ext::StreamExt;
use futures_ext::{BoxStream, StreamExt};
use super::repo::BlobRepo;
use context::CoreContext;
@ -70,6 +70,25 @@ impl IncompleteFilenodes {
cs_id: HgChangesetId,
repo: &BlobRepo,
) -> impl Future<Item = HgChangesetId, Error = Error> + Send {
repo.get_filenodes()
.add_filenodes(ctx, self.prepare_filenodes(cs_id), repo.get_repoid())
.map(move |_| cs_id)
}
/// Filenodes shouldn't normally be replaced.
/// This function should only be used if we need to fix up filenodes,
/// overwriting whatever is already stored for this changeset.
pub fn replace_filenodes(
    &self,
    ctx: CoreContext,
    cs_id: HgChangesetId,
    repo: &BlobRepo,
) -> impl Future<Item = HgChangesetId, Error = Error> + Send {
    let filenodes_store = repo.get_filenodes();
    let repo_id = repo.get_repoid();
    // Drain the accumulated filenode infos and write them with replacement
    // semantics; resolve to the changeset id once the write completes.
    filenodes_store
        .add_or_replace_filenodes(ctx, self.prepare_filenodes(cs_id), repo_id)
        .map(move |_| cs_id)
}
fn prepare_filenodes(&self, cs_id: HgChangesetId) -> BoxStream<FilenodeInfo, Error> {
let filenodes = {
let mut filenodes = self.filenodes.lock().expect("lock poisoned");
mem::replace(&mut *filenodes, Vec::new())
@ -79,9 +98,8 @@ impl IncompleteFilenodes {
cloned!(cs_id);
move |node_info| node_info.with_linknode(cs_id)
});
repo.get_filenodes()
.add_filenodes(ctx, stream::iter_ok(filenodes).boxify(), repo.get_repoid())
.map(move |_| cs_id)
stream::iter_ok(filenodes).boxify()
}
}

View File

@ -80,6 +80,29 @@ queries! {
) VALUES {values}"
}
// Same row shape as InsertFilenodes, but issues REPLACE INTO so a row that
// already exists for the same key is overwritten instead of preserved.
// Used by `add_or_replace_filenodes` (replace = true) to fix up filenodes.
write ReplaceFilenodes(values: (
repo_id: RepositoryId,
path_hash: Vec<u8>,
is_tree: i8,
filenode: HgFileNodeId,
linknode: HgChangesetId,
p1: Option<HgFileNodeId>,
p2: Option<HgFileNodeId>,
has_copyinfo: i8,
)) {
none,
"REPLACE INTO filenodes (
repo_id
, path_hash
, is_tree
, filenode
, linknode
, p1
, p2
, has_copyinfo
) VALUES {values}"
}
write InsertFixedcopyinfo(values: (
repo_id: RepositoryId,
topath_hash: Vec<u8>,
@ -262,14 +285,12 @@ impl SqlFilenodes {
read_master_connection: Arc::new(read_master_connection),
})
}
}
impl Filenodes for SqlFilenodes {
fn add_filenodes(
fn do_insert(
&self,
_ctx: CoreContext,
filenodes: BoxStream<FilenodeInfo, Error>,
repo_id: RepositoryId,
replace: bool,
) -> BoxFuture<(), Error> {
cloned!(self.write_connection);
cloned!(self.read_connection);
@ -295,12 +316,32 @@ impl Filenodes for SqlFilenodes {
)
.and_then({
cloned!(write_connection);
move |()| insert_filenodes(&write_connection, repo_id, &filenodes)
move |()| insert_filenodes(&write_connection, repo_id, &filenodes, replace)
})
})
.for_each(|()| Ok(()))
.boxify()
}
}
impl Filenodes for SqlFilenodes {
fn add_filenodes(
    &self,
    _ctx: CoreContext,
    filenodes: BoxStream<FilenodeInfo, Error>,
    repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
    // Insert-only path: do not overwrite rows that already exist.
    let replace = false;
    self.do_insert(filenodes, repo_id, replace)
}
fn add_or_replace_filenodes(
    &self,
    _ctx: CoreContext,
    filenodes: BoxStream<FilenodeInfo, Error>,
    repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
    // Replacement path: existing rows for the same key are overwritten
    // (see the ReplaceFilenodes query). Intended for fixing up filenodes.
    let replace = true;
    self.do_insert(filenodes, repo_id, replace)
}
fn get_filenode(
&self,
@ -437,6 +478,7 @@ fn insert_filenodes(
connections: &Vec<Connection>,
repo_id: RepositoryId,
filenodes: &Vec<(FilenodeInfo, PathWithHash)>,
replace: bool,
) -> impl Future<Item = (), Error = Error> {
let mut filenode_rows: Vec<Vec<_>> = connections.iter().map(|_| Vec::new()).collect();
let mut copydata_rows: Vec<Vec<_>> = connections.iter().map(|_| Vec::new()).collect();
@ -508,10 +550,13 @@ fn insert_filenodes(
.enumerate()
.filter_map(|(shard, connection)| {
if filenode_rows[shard].len() != 0 {
Some(InsertFilenodes::query(
&connection.clone(),
&filenode_rows[shard],
))
Some(if replace {
ReplaceFilenodes::query(&connection.clone(), &filenode_rows[shard])
.left_future()
} else {
InsertFilenodes::query(&connection.clone(), &filenode_rows[shard])
.right_future()
})
} else {
None
}

View File

@ -88,6 +88,15 @@ impl Filenodes for CachingFilenodes {
self.filenodes.add_filenodes(ctx, info, repo_id)
}
fn add_or_replace_filenodes(
    &self,
    ctx: CoreContext,
    info: BoxStream<FilenodeInfo, Error>,
    repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
    // Pass the write straight through to the underlying store.
    // NOTE(review): nothing here touches the cache — presumably previously
    // cached filenodes may be stale after a replace; confirm with callers.
    let inner = &self.filenodes;
    inner.add_or_replace_filenodes(ctx, info, repo_id)
}
fn get_filenode(
&self,
ctx: CoreContext,

View File

@ -158,6 +158,13 @@ pub trait Filenodes: Send + Sync {
repo_id: RepositoryId,
) -> BoxFuture<(), Error>;
/// Like `add_filenodes`, but overwrites any filenodes that already exist.
/// Filenodes shouldn't normally be replaced; this exists for tooling that
/// needs to fix up (regenerate) bad filenodes.
fn add_or_replace_filenodes(
    &self,
    ctx: CoreContext,
    info: BoxStream<FilenodeInfo, Error>,
    repo_id: RepositoryId,
) -> BoxFuture<(), Error>;
fn get_filenode(
&self,
ctx: CoreContext,