mononoke: more efficient ensure_paths_exists implementation

Summary:
Before this diff we always wrote to master, even if the paths already existed. This
diff changes that: first check the replica to see which paths are already present,
then write to master only the ones that are missing.
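
As a minimal sketch of the new flow (simplified stand-in types, not Mononoke's actual
Connection/FilenodeInfo types):

    use std::collections::HashSet;

    // Hypothetical stand-ins for a read replica and the master DB.
    struct Replica { paths: HashSet<Vec<u8>> }
    struct Master { paths: Vec<Vec<u8>> }

    fn ensure_paths_exist(replica: &Replica, master: &mut Master, batch: Vec<Vec<u8>>) {
        // 1) One cheap read against the replica: which paths already exist?
        let existing: HashSet<Vec<u8>> = batch
            .iter()
            .filter(|p| replica.paths.contains(*p))
            .cloned()
            .collect();

        // 2) Write to master only the paths the replica does not have.
        for path in batch {
            if !existing.contains(&path) {
                master.paths.push(path);
            }
        }
    }

Note that a replica can lag master, so a path absent from the replica may already be
in master; the write side therefore still has to tolerate duplicate inserts (the
existing InsertPaths write path is assumed to handle that).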

Reviewed By: farnz

Differential Revision: D15063307

fbshipit-source-id: 802839f340c9953c7f2812e77d81bc66917c5e77
Stanislau Hlebik 2019-04-29 01:13:11 -07:00 committed by Facebook Github Bot
parent 1d9039751a
commit 2f4a56df43


@@ -42,6 +42,7 @@ use sql_ext::{create_myrouter_connections, PoolSizeConfig, SqlConnections};
 use errors::ErrorKind;
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 const DEFAULT_INSERT_CHUNK_SIZE: usize = 100;
@@ -160,6 +161,13 @@ queries! {
         AND paths.path_hash = {path_hash}
         LIMIT 1"
     }
+
+    read SelectAllPaths(repo_id: RepositoryId, >list path_hashes: Vec<u8>) -> (Vec<u8>) {
+        "SELECT path
+        FROM paths
+        WHERE paths.repo_id = {repo_id}
+        AND paths.path_hash in {path_hashes}"
+    }
 }
 
 impl SqlConstructors for SqlFilenodes {
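
Here `>list` marks path_hashes as a list-valued parameter feeding the `IN {path_hashes}`
clause. A rough sketch of the equivalent placeholder expansion in plain Rust (a
hypothetical helper, not the queries! macro's actual codegen):

    // Build "col IN (?, ?, ?)" with one placeholder per list element.
    // NB: callers must guard against n == 0 (the diff checks len() != 0
    // before issuing the query), since "IN ()" is not valid SQL.
    fn in_clause(column: &str, n: usize) -> String {
        let placeholders = vec!["?"; n].join(", ");
        format!("{} IN ({})", column, placeholders)
    }

    fn main() {
        assert_eq!(in_clause("paths.path_hash", 3), "paths.path_hash IN (?, ?, ?)");
    }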
@@ -244,6 +252,7 @@ impl Filenodes for SqlFilenodes {
         repo_id: RepositoryId,
     ) -> BoxFuture<(), Error> {
         cloned!(self.write_connection);
+        cloned!(self.read_connection);
 
         filenodes
             .chunks(DEFAULT_INSERT_CHUNK_SIZE)
@@ -258,7 +267,13 @@ impl Filenodes for SqlFilenodes {
                 })
                 .collect();
 
-            ensure_paths_exists(&write_connection, repo_id, &filenodes).and_then({
+            ensure_paths_exists(
+                &read_connection,
+                write_connection.clone(),
+                repo_id,
+                filenodes.clone(),
+            )
+            .and_then({
                 cloned!(write_connection);
                 move |()| insert_filenodes(&write_connection, repo_id, &filenodes)
             })
@@ -336,27 +351,66 @@ impl Filenodes for SqlFilenodes {
 }
 
 fn ensure_paths_exists(
-    connections: &Vec<Connection>,
+    read_connections: &Vec<Connection>,
+    write_connections: Arc<Vec<Connection>>,
     repo_id: RepositoryId,
-    filenodes: &Vec<(FilenodeInfo, PathWithHash)>,
+    filenodes: Vec<(FilenodeInfo, PathWithHash)>,
 ) -> impl Future<Item = (), Error = Error> {
-    let mut path_rows: Vec<Vec<_>> = connections.iter().map(|_| Vec::new()).collect();
-    for &(_, ref pwh) in filenodes {
-        path_rows[pwh.shard_number(connections.len())].push((&repo_id, &pwh.path_bytes, &pwh.hash));
+    let mut path_rows: Vec<Vec<_>> = read_connections.iter().map(|_| Vec::new()).collect();
+    for &(_, ref pwh) in filenodes.iter() {
+        path_rows[pwh.shard_number(read_connections.len())].push(pwh.hash.clone());
     }
 
-    let futures: Vec<_> = connections
+    let read_futures: Vec<_> = read_connections
         .iter()
         .enumerate()
         .filter_map(|(shard, connection)| {
             if path_rows[shard].len() != 0 {
-                Some(InsertPaths::query(&connection.clone(), &path_rows[shard]))
+                let path_rows_ref: Vec<_> = path_rows[shard].iter().collect();
+                Some(SelectAllPaths::query(
+                    &connection.clone(),
+                    &repo_id,
+                    path_rows_ref.as_ref(),
+                ))
             } else {
                 None
             }
         })
         .collect();
 
-    join_all(futures).map(|_| ())
+    join_all(read_futures)
+        .map(|fetched_paths| {
+            let mut v: HashSet<Vec<_>> = HashSet::new();
+            for paths in fetched_paths {
+                v.extend(paths.into_iter().map(|p| p.0));
+            }
+            v
+        })
+        .and_then(move |mut existing_paths| {
+            let mut path_rows: Vec<Vec<_>> = write_connections.iter().map(|_| Vec::new()).collect();
+            for &(_, ref pwh) in filenodes.iter() {
+                if existing_paths.insert(pwh.path_bytes.clone()) {
+                    path_rows[pwh.shard_number(write_connections.len())].push((
+                        &repo_id,
+                        &pwh.path_bytes,
+                        &pwh.hash,
+                    ));
+                }
+            }
+
+            let futures: Vec<_> = write_connections
+                .iter()
+                .enumerate()
+                .filter_map(|(shard, connection)| {
+                    if path_rows[shard].len() != 0 {
+                        Some(InsertPaths::query(&connection.clone(), &path_rows[shard]))
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+
+            join_all(futures).map(|_| ())
+        })
 }
 
 fn insert_filenodes(
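
One detail worth noting in the new ensure_paths_exists: existing_paths.insert(...)
does double duty. HashSet::insert returns false for values already present, so seeding
the set with the replica's SELECT results filters out both paths already stored and
duplicates within the incoming batch. A self-contained illustration of the trick:

    use std::collections::HashSet;

    fn main() {
        // Seed the set with what the replica already returned.
        let mut seen: HashSet<&str> = ["a/b", "c/d"].into_iter().collect();
        let batch = ["a/b", "e/f", "e/f", "g/h"];

        // insert() returns true only for values not yet in the set, so one
        // pass drops both already-stored paths and in-batch duplicates.
        let to_insert: Vec<&str> = batch
            .iter()
            .copied()
            .filter(|&path| seen.insert(path))
            .collect();

        assert_eq!(to_insert, vec!["e/f", "g/h"]);
    }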