diff --git a/benchmark/lib/repository.rs b/benchmark/lib/repository.rs index fea9b5fe37..ef3243c8b1 100644 --- a/benchmark/lib/repository.rs +++ b/benchmark/lib/repository.rs @@ -209,6 +209,19 @@ impl Filenodes for DelayedFilenodes { delay(self.put_dist, self.inner.add_filenodes(ctx, info, repo_id)).boxify() } + fn add_or_replace_filenodes( + &self, + ctx: CoreContext, + info: BoxStream, + repo_id: RepositoryId, + ) -> BoxFuture<(), Error> { + delay( + self.put_dist, + self.inner.add_or_replace_filenodes(ctx, info, repo_id), + ) + .boxify() + } + fn get_filenode( &self, ctx: CoreContext, diff --git a/blobrepo/src/utils.rs b/blobrepo/src/utils.rs index e213ed0d1a..2881ebc13a 100644 --- a/blobrepo/src/utils.rs +++ b/blobrepo/src/utils.rs @@ -11,7 +11,7 @@ use cloned::cloned; use failure_ext::Error; use futures::future::Future; use futures::stream; -use futures_ext::StreamExt; +use futures_ext::{BoxStream, StreamExt}; use super::repo::BlobRepo; use context::CoreContext; @@ -70,6 +70,25 @@ impl IncompleteFilenodes { cs_id: HgChangesetId, repo: &BlobRepo, ) -> impl Future + Send { + repo.get_filenodes() + .add_filenodes(ctx, self.prepare_filenodes(cs_id), repo.get_repoid()) + .map(move |_| cs_id) + } + + /// Filenodes shouldn't normally be replaced + /// This function should only be used if we need to fix up filenodes + pub fn replace_filenodes( + &self, + ctx: CoreContext, + cs_id: HgChangesetId, + repo: &BlobRepo, + ) -> impl Future + Send { + repo.get_filenodes() + .add_or_replace_filenodes(ctx, self.prepare_filenodes(cs_id), repo.get_repoid()) + .map(move |_| cs_id) + } + + fn prepare_filenodes(&self, cs_id: HgChangesetId) -> BoxStream { let filenodes = { let mut filenodes = self.filenodes.lock().expect("lock poisoned"); mem::replace(&mut *filenodes, Vec::new()) @@ -79,9 +98,8 @@ impl IncompleteFilenodes { cloned!(cs_id); move |node_info| node_info.with_linknode(cs_id) }); - repo.get_filenodes() - .add_filenodes(ctx, stream::iter_ok(filenodes).boxify(), repo.get_repoid()) - .map(move |_| cs_id) + + stream::iter_ok(filenodes).boxify() } } diff --git a/filenodes/sqlfilenodes/src/lib.rs b/filenodes/sqlfilenodes/src/lib.rs index be4a68ffd1..91edcb97d2 100644 --- a/filenodes/sqlfilenodes/src/lib.rs +++ b/filenodes/sqlfilenodes/src/lib.rs @@ -80,6 +80,29 @@ queries! { ) VALUES {values}" } + write ReplaceFilenodes(values: ( + repo_id: RepositoryId, + path_hash: Vec, + is_tree: i8, + filenode: HgFileNodeId, + linknode: HgChangesetId, + p1: Option, + p2: Option, + has_copyinfo: i8, + )) { + none, + "REPLACE INTO filenodes ( + repo_id + , path_hash + , is_tree + , filenode + , linknode + , p1 + , p2 + , has_copyinfo + ) VALUES {values}" + } + write InsertFixedcopyinfo(values: ( repo_id: RepositoryId, topath_hash: Vec, @@ -262,14 +285,12 @@ impl SqlFilenodes { read_master_connection: Arc::new(read_master_connection), }) } -} -impl Filenodes for SqlFilenodes { - fn add_filenodes( + fn do_insert( &self, - _ctx: CoreContext, filenodes: BoxStream, repo_id: RepositoryId, + replace: bool, ) -> BoxFuture<(), Error> { cloned!(self.write_connection); cloned!(self.read_connection); @@ -295,12 +316,32 @@ impl Filenodes for SqlFilenodes { ) .and_then({ cloned!(write_connection); - move |()| insert_filenodes(&write_connection, repo_id, &filenodes) + move |()| insert_filenodes(&write_connection, repo_id, &filenodes, replace) }) }) .for_each(|()| Ok(())) .boxify() } +} + +impl Filenodes for SqlFilenodes { + fn add_filenodes( + &self, + _ctx: CoreContext, + filenodes: BoxStream, + repo_id: RepositoryId, + ) -> BoxFuture<(), Error> { + self.do_insert(filenodes, repo_id, false) + } + + fn add_or_replace_filenodes( + &self, + _ctx: CoreContext, + filenodes: BoxStream, + repo_id: RepositoryId, + ) -> BoxFuture<(), Error> { + self.do_insert(filenodes, repo_id, true) + } fn get_filenode( &self, @@ -437,6 +478,7 @@ fn insert_filenodes( connections: &Vec, repo_id: RepositoryId, filenodes: &Vec<(FilenodeInfo, PathWithHash)>, + replace: bool, ) -> impl Future { let mut filenode_rows: Vec> = connections.iter().map(|_| Vec::new()).collect(); let mut copydata_rows: Vec> = connections.iter().map(|_| Vec::new()).collect(); @@ -508,10 +550,13 @@ fn insert_filenodes( .enumerate() .filter_map(|(shard, connection)| { if filenode_rows[shard].len() != 0 { - Some(InsertFilenodes::query( - &connection.clone(), - &filenode_rows[shard], - )) + Some(if replace { + ReplaceFilenodes::query(&connection.clone(), &filenode_rows[shard]) + .left_future() + } else { + InsertFilenodes::query(&connection.clone(), &filenode_rows[shard]) + .right_future() + }) } else { None } diff --git a/filenodes/src/caching.rs b/filenodes/src/caching.rs index 1f597bc135..dc67c4c235 100644 --- a/filenodes/src/caching.rs +++ b/filenodes/src/caching.rs @@ -88,6 +88,15 @@ impl Filenodes for CachingFilenodes { self.filenodes.add_filenodes(ctx, info, repo_id) } + fn add_or_replace_filenodes( + &self, + ctx: CoreContext, + info: BoxStream, + repo_id: RepositoryId, + ) -> BoxFuture<(), Error> { + self.filenodes.add_or_replace_filenodes(ctx, info, repo_id) + } + fn get_filenode( &self, ctx: CoreContext, diff --git a/filenodes/src/lib.rs b/filenodes/src/lib.rs index d5266fc24a..0545cd9592 100644 --- a/filenodes/src/lib.rs +++ b/filenodes/src/lib.rs @@ -158,6 +158,13 @@ pub trait Filenodes: Send + Sync { repo_id: RepositoryId, ) -> BoxFuture<(), Error>; + fn add_or_replace_filenodes( + &self, + ctx: CoreContext, + info: BoxStream, + repo_id: RepositoryId, + ) -> BoxFuture<(), Error>; + fn get_filenode( &self, ctx: CoreContext,