From 2f4a56df43edbcbbf43c84a7648f4ebf7a0f9987 Mon Sep 17 00:00:00 2001
From: Stanislau Hlebik
Date: Mon, 29 Apr 2019 01:13:11 -0700
Subject: [PATCH] mononoke: more efficient ensure_paths_exists implementation

Summary: Before this diff we always wrote to the master even if the paths
already existed. This diff changes that: first check on a replica whether
the paths are already there, and only then write the missing ones to the
master.

Reviewed By: farnz

Differential Revision: D15063307

fbshipit-source-id: 802839f340c9953c7f2812e77d81bc66917c5e77
---
 filenodes/sqlfilenodes/src/lib.rs | 72 +++++++++++++++++++++++++++----
 1 file changed, 63 insertions(+), 9 deletions(-)

diff --git a/filenodes/sqlfilenodes/src/lib.rs b/filenodes/sqlfilenodes/src/lib.rs
index 5901b7128e..3f3e17a8ae 100644
--- a/filenodes/sqlfilenodes/src/lib.rs
+++ b/filenodes/sqlfilenodes/src/lib.rs
@@ -42,6 +42,7 @@ use sql_ext::{create_myrouter_connections, PoolSizeConfig, SqlConnections};
 
 use errors::ErrorKind;
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 const DEFAULT_INSERT_CHUNK_SIZE: usize = 100;
@@ -160,6 +161,13 @@ queries! {
            AND paths.path_hash = {path_hash}
          LIMIT 1"
     }
+
+    read SelectAllPaths(repo_id: RepositoryId, >list path_hashes: Vec<u8>) -> (Vec<u8>) {
+        "SELECT path
+         FROM paths
+         WHERE paths.repo_id = {repo_id}
+           AND paths.path_hash in {path_hashes}"
+    }
 }
 
 impl SqlConstructors for SqlFilenodes {
@@ -244,6 +252,7 @@ impl Filenodes for SqlFilenodes {
         repo_id: RepositoryId,
     ) -> BoxFuture<(), Error> {
         cloned!(self.write_connection);
+        cloned!(self.read_connection);
 
         filenodes
             .chunks(DEFAULT_INSERT_CHUNK_SIZE)
@@ -258,7 +267,13 @@
                 })
                 .collect();
 
-            ensure_paths_exists(&write_connection, repo_id, &filenodes).and_then({
+            ensure_paths_exists(
+                &read_connection,
+                write_connection.clone(),
+                repo_id,
+                filenodes.clone(),
+            )
+            .and_then({
                 cloned!(write_connection);
                 move |()| insert_filenodes(&write_connection, repo_id, &filenodes)
             })
@@ -336,27 +351,66 @@
 }
 
 fn ensure_paths_exists(
-    connections: &Vec<Connection>,
+    read_connections: &Vec<Connection>,
+    write_connections: Arc<Vec<Connection>>,
     repo_id: RepositoryId,
-    filenodes: &Vec<(FilenodeInfo, PathWithHash)>,
+    filenodes: Vec<(FilenodeInfo, PathWithHash)>,
 ) -> impl Future<Item = (), Error = Error> {
-    let mut path_rows: Vec<Vec<_>> = connections.iter().map(|_| Vec::new()).collect();
-    for &(_, ref pwh) in filenodes {
-        path_rows[pwh.shard_number(connections.len())].push((&repo_id, &pwh.path_bytes, &pwh.hash));
+    let mut path_rows: Vec<Vec<_>> = read_connections.iter().map(|_| Vec::new()).collect();
+    for &(_, ref pwh) in filenodes.iter() {
+        path_rows[pwh.shard_number(read_connections.len())].push(pwh.hash.clone());
     }
 
-    let futures: Vec<_> = connections
+    let read_futures: Vec<_> = read_connections
         .iter()
         .enumerate()
         .filter_map(|(shard, connection)| {
             if path_rows[shard].len() != 0 {
-                Some(InsertPaths::query(&connection.clone(), &path_rows[shard]))
+                let path_rows_ref: Vec<_> = path_rows[shard].iter().collect();
+                Some(SelectAllPaths::query(
+                    &connection.clone(),
+                    &repo_id,
+                    path_rows_ref.as_ref(),
+                ))
             } else {
                 None
             }
         })
         .collect();
-    join_all(futures).map(|_| ())
+
+    join_all(read_futures)
+        .map(|fetched_paths| {
+            let mut v: HashSet<Vec<u8>> = HashSet::new();
+            for paths in fetched_paths {
+                v.extend(paths.into_iter().map(|p| p.0));
+            }
+            v
+        })
+        .and_then(move |mut existing_paths| {
+            let mut path_rows: Vec<Vec<_>> = write_connections.iter().map(|_| Vec::new()).collect();
+            for &(_, ref pwh) in filenodes.iter() {
+                if existing_paths.insert(pwh.path_bytes.clone()) {
+                    path_rows[pwh.shard_number(write_connections.len())].push((
+                        &repo_id,
+                        &pwh.path_bytes,
+                        &pwh.hash,
+                    ));
+                }
+            }
+
+            let futures: Vec<_> = write_connections
+                .iter()
+                .enumerate()
+                .filter_map(|(shard, connection)| {
+                    if path_rows[shard].len() != 0 {
+                        Some(InsertPaths::query(&connection.clone(), &path_rows[shard]))
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+            join_all(futures).map(|_| ())
+        })
 }
 
 fn insert_filenodes(
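The heart of the new write path is the `.and_then` closure above: `existing_paths` starts out holding the paths fetched from the replica, and `HashSet::insert` returns true only when the value was not already present, so a single pass both skips paths the replica already has and collapses duplicates within the batch. Below is a minimal standalone sketch of that dedup step (not part of the patch); `paths_to_insert` is a hypothetical helper that uses plain `Vec<u8>` paths in place of the sharded SQL machinery:

    use std::collections::HashSet;

    // Hypothetical helper mirroring the patch's dedup step: `seen` starts as
    // the set of paths the replica already knows about. `HashSet::insert`
    // returns true only for values not yet in the set, so paths already on
    // the replica are skipped and repeats within the batch are collapsed.
    fn paths_to_insert(mut seen: HashSet<Vec<u8>>, batch: &[Vec<u8>]) -> Vec<Vec<u8>> {
        batch
            .iter()
            .filter(|path| seen.insert(path.to_vec()))
            .cloned()
            .collect()
    }

    fn main() {
        let replica: HashSet<Vec<u8>> = vec![b"a/b".to_vec()].into_iter().collect();
        let batch = vec![b"a/b".to_vec(), b"c/d".to_vec(), b"c/d".to_vec()];
        // "a/b" already exists on the replica; "c/d" is written exactly once.
        assert_eq!(paths_to_insert(replica, &batch), vec![b"c/d".to_vec()]);
    }

A lagging replica is harmless here: a path the replica has not yet seen is simply written to the master again, which is exactly the pre-diff behavior for every path.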