mononoke: limit based queries for bulkops fetch_id

Summary:
When querying for changesets in a repo, there are often very few changesets inside a 65536-wide range of ids, which means multiple round trips to the database that each return little or no data.

This change adds a LIMIT-based MySQL query that returns up to the specified number of rows whenever the repo has them, and then uses bounded_traversal_stream to unfold from the highest id loaded by one query to the next chunk to be loaded.
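As a rough illustration of that unfold pattern, here is a minimal, self-contained sketch. fetch_chunk is a hypothetical stand-in for the LIMIT query, and the (scheduled_max, init, unfold) shape of bounded_traversal_stream matches how fetch_ids calls it in the diff below; returning None as the next state ends the stream.

use anyhow::Error;
use bounded_traversal::bounded_traversal_stream;
use futures::stream::{self, TryStreamExt};

// Hypothetical stand-in for the LIMIT-based query: returns up to `step`
// ids from [lower, upper), lowest first.
async fn fetch_chunk(lower: u64, upper: u64, step: u64) -> Result<Vec<u64>, Error> {
    Ok((lower..upper.min(lower + step)).collect())
}

async fn all_ids(repo_bounds: (u64, u64), step: u64) -> Result<Vec<u64>, Error> {
    bounded_traversal_stream(1, Some(repo_bounds), move |(lower, upper)| async move {
        let ids = fetch_chunk(lower, upper, step).await?;
        // Keep unfolding from just past the highest id seen while chunks
        // come back full; a short chunk means the range is exhausted.
        let next = match ids.last() {
            Some(&max_id) if ids.len() as u64 >= step && max_id + 1 < upper => {
                Some((max_id + 1, upper))
            }
            _ => None,
        };
        Ok::<_, Error>((ids, next))
    })
    .map_ok(|ids| stream::iter(ids.into_iter().map(Ok)))
    .try_flatten()
    .try_collect()
    .await
}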

Reviewed By: StanislavGlebik

Differential Revision: D25804023

fbshipit-source-id: 46df2ea48d01bc4143d96642e45066f520faa4d6
Alex Hornby 2021-01-14 09:49:46 -08:00 committed by Facebook GitHub Bot
parent aa984b0835
commit 7d04c24a91
3 changed files with 228 additions and 33 deletions


@@ -7,6 +7,7 @@ license = "GPLv2+"
include = ["src/**/*.rs"]
[dependencies]
bounded_traversal = { path = "../common/bounded_traversal" }
changesets = { path = "../changesets" }
context = { path = "../server/context" }
mononoke_types = { path = "../mononoke_types" }
@@ -16,6 +17,7 @@ futures = { version = "0.3.5", features = ["async-await", "compat"] }
itertools = "0.8"
[dev-dependencies]
blobrepo = { path = "../blobrepo" }
bookmarks = { path = "../bookmarks" }
fixtures = { path = "../tests/fixtures" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }


@@ -10,7 +10,7 @@
///! bulkops
///!
///! Utilities for handling data in bulk.
use std::cmp::min;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::iter::Iterator;
use std::sync::Arc;
@@ -23,7 +23,8 @@ use futures::{
};
use itertools::Either;
use changesets::{ChangesetEntry, Changesets, SqlChangesets};
use bounded_traversal::bounded_traversal_stream;
use changesets::{ChangesetEntry, Changesets, SortOrder, SqlChangesets};
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use phases::{Phases, SqlPhases};
@@ -34,6 +35,15 @@ pub enum Direction {
OldestFirst,
}
impl Direction {
fn sort_order(&self) -> SortOrder {
match self {
Direction::NewestFirst => SortOrder::Descending,
Direction::OldestFirst => SortOrder::Ascending,
}
}
}
pub struct PublicChangesetBulkFetch {
repo_id: RepositoryId,
changesets: Arc<dyn Changesets>,
@@ -100,7 +110,7 @@ impl PublicChangesetBulkFetch {
}
/// Fetch just the ids without attempting to load the Changesets.
/// Each id comes with the chunk bounds it was loaded from
/// Each id comes with the chunk bounds it was loaded from, using Rust's upper-exclusive bounds convention.
/// One can optionally specify the repo bounds, or pass None to have them resolved for you (specifying them is useful when checkpointing)
pub fn fetch_ids<'a>(
&'a self,
@@ -116,17 +126,66 @@ impl PublicChangesetBulkFetch {
} else {
self.get_repo_bounds().right_future()
};
let step = self.step;
async move {
let repo_bounds = repo_bounds.await?;
let s = stream::iter(windows(repo_bounds, self.step, d))
.then(move |chunk_bounds| async move {
let ids =
public_ids_for_chunk(ctx, repo_id, changesets, phases, d, chunk_bounds);
let res = ids.await?.into_iter().map(move |id| Ok((id, chunk_bounds)));
Ok::<_, Error>(stream::iter(res))
})
.try_flatten();
let s = bounded_traversal_stream(1, Some(repo_bounds.await?), {
// Returns ids plus next bounds to query, if any
move |(lower, upper): (u64, u64)| {
async move {
let results: Vec<_> = changesets
.get_list_bs_cs_id_in_range_exclusive_limit(
repo_id,
lower,
upper,
step,
d.sort_order(),
)
.try_collect()
.await?;
let count = results.len() as u64;
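// Seed the trackers with the chunk bounds, then fold in the ids actually
// returned; if the result set is empty, the `count < step` check below
// marks the whole chunk as completed.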
let mut max_id = lower;
let mut min_id = upper - 1;
let cs_ids: Vec<ChangesetId> = results
.into_iter()
.map(|(cs_id, id)| {
max_id = max(max_id, id);
min_id = min(min_id, id);
cs_id
})
.collect();
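// `completed` is the id range this query has fully covered; the
// remainder becomes the candidate bounds for the next chunk.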
let (completed, new_bounds) = if d == Direction::OldestFirst {
((lower, max_id + 1), (max_id + 1, upper))
} else {
((min_id, upper), (lower, min_id))
};
let (completed, new_bounds) =
if count < step || new_bounds.0 == new_bounds.1 {
((lower, upper), None)
} else if new_bounds.0 >= new_bounds.1 {
bail!("Logic error, bad bounds {:?}", new_bounds)
} else {
// We have more to load
(completed, Some(new_bounds))
};
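// bounded_traversal_stream keeps unfolding until the next bounds are None.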
Ok::<_, Error>(((cs_ids, completed), new_bounds))
}
}
})
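// Filter each chunk's ids down to public changesets with one batched
// phases lookup per chunk.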
.and_then(move |(mut ids, completed_bounds)| async move {
if !ids.is_empty() {
let public = phases.get_public_raw(ctx, &ids).await?;
ids.retain(|id| public.contains(&id));
}
Ok::<_, Error>(stream::iter(
ids.into_iter().map(move |id| Ok((id, completed_bounds))),
))
})
.try_flatten();
Ok(s)
}
.try_flatten_stream()
@@ -195,6 +254,7 @@ mod tests {
use fbinit::FacebookInit;
use blobrepo::BlobRepo;
use bookmarks::BookmarkName;
use fixtures::branch_wide;
use mononoke_types::ChangesetId;
@@ -210,23 +270,39 @@ mod tests {
Ok(())
}
#[fbinit::compat_test]
async fn test_fetch_all_public_changesets(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
async fn get_test_repo(ctx: &CoreContext, fb: FacebookInit) -> Result<BlobRepo, Error> {
let blobrepo = branch_wide::getrepo(fb).await;
// our function avoids derivation, so we need to do the derivation explicitly
// for phases to have any data
{
let phases = blobrepo.get_phases();
let sql_phases = phases.get_sql_phases();
let master = BookmarkName::new("master")?;
let master = blobrepo
.get_bonsai_bookmark(ctx.clone(), &master)
.await?
.unwrap();
mark_reachable_as_public(&ctx, sql_phases, &[master], false).await?;
}
let phases = blobrepo.get_phases();
let sql_phases = phases.get_sql_phases();
let master = BookmarkName::new("master")?;
let master = blobrepo
.get_bonsai_bookmark(ctx.clone(), &master)
.await?
.unwrap();
mark_reachable_as_public(&ctx, sql_phases, &[master], false).await?;
Ok(blobrepo)
}
fn build_fetcher(
step_size: u64,
blobrepo: &BlobRepo,
) -> Result<PublicChangesetBulkFetch, Error> {
PublicChangesetBulkFetch::new(
blobrepo.get_repoid(),
blobrepo.get_changesets_object(),
blobrepo.get_phases(),
)
.with_step(step_size)
}
#[fbinit::compat_test]
async fn test_fetch_all_public_changesets(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let blobrepo = get_test_repo(&ctx, fb).await?;
let expected = [
"56c0203d7a9a83f14a47a17d3a10e55b1d08feb106fd72f28275e603c6e59625",
@@ -243,14 +319,9 @@ mod tests {
if d == &Direction::NewestFirst {
expected.reverse();
}
// Check a range of step sizes in lieu of varying the repo bounds
for step_size in 1..5 {
let fetcher = PublicChangesetBulkFetch::new(
blobrepo.get_repoid(),
blobrepo.get_changesets_object(),
blobrepo.get_phases(),
)
.with_step(step_size)?;
// Repo bounds are 1..8. Check a range of step sizes in lieu of varying the repo bounds
for step_size in 1..9 {
let fetcher = build_fetcher(step_size, &blobrepo)?;
let entries: Vec<ChangesetEntry> = fetcher.fetch(&ctx, *d).try_collect().await?;
let public_ids: Vec<ChangesetId> = entries.into_iter().map(|e| e.cs_id).collect();
let public_ids2: Vec<ChangesetId> = fetcher
@@ -268,4 +339,57 @@ mod tests {
}
Ok(())
}
#[fbinit::compat_test]
async fn test_fetch_ids_completed_bounds(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let blobrepo = get_test_repo(&ctx, fb).await?;
// Check what bounds we expect each of the returned changesets to be loaded from
let expectations: &[(u64, Direction, &[(u64, u64)])] = &[
// Three changesets in 8 steps, so observed bounds do not abut
(1, Direction::OldestFirst, &[(1, 2), (3, 4), (7, 8)]),
(2, Direction::OldestFirst, &[(1, 3), (3, 5), (7, 8)]),
// Still not abutting as first two in first step, then last in last step
(3, Direction::OldestFirst, &[(1, 4), (1, 4), (7, 8)]),
// Step one less than repo bounds, now abuts
(6, Direction::OldestFirst, &[(1, 7), (1, 7), (7, 8)]),
// Exactly cover repo bounds in one step
(7, Direction::OldestFirst, &[(1, 8), (1, 8), (1, 8)]),
// Slightly bigger step than repo bounds
(8, Direction::OldestFirst, &[(1, 8), (1, 8), (1, 8)]),
// Three changesets in 8 steps, so observed bounds do not abut
(1, Direction::NewestFirst, &[(7, 8), (3, 4), (1, 2)]),
(2, Direction::NewestFirst, &[(6, 8), (2, 4), (1, 2)]),
// In this direction starts to abut
(3, Direction::NewestFirst, &[(5, 8), (2, 5), (1, 2)]),
// Step one less than repo bounds, now abuts
(6, Direction::NewestFirst, &[(2, 8), (2, 8), (1, 2)]),
// Exactly cover repo bounds in one step
(7, Direction::NewestFirst, &[(1, 8), (1, 8), (1, 8)]),
// Slightly bigger step than repo bounds
(8, Direction::NewestFirst, &[(1, 8), (1, 8), (1, 8)]),
];
for (step, dir, expected_completed) in expectations.iter() {
let fetcher = build_fetcher(*step, &blobrepo)?;
let repo_bounds: (u64, u64) = fetcher.get_repo_bounds().await?;
assert_eq!((1, 8), repo_bounds);
let completed: Vec<(u64, u64)> = fetcher
.fetch_ids(&ctx, *dir, Some(repo_bounds))
.map_ok(|(_cs_id, completed)| completed)
.try_collect()
.await?;
assert_eq!(
completed.as_slice(),
*expected_completed,
"step {} dir {:?}",
step,
dir
);
}
Ok(())
}
}


@@ -242,6 +242,24 @@ queries! {
ORDER BY id"
}
read SelectAllChangesetsIdsInRangeLimitAsc(repo_id: RepositoryId, min_id: u64, max_id: u64, limit: u64) -> (ChangesetId, u64) {
"SELECT cs_id, id
FROM changesets
WHERE repo_id = {repo_id}
AND id BETWEEN {min_id} AND {max_id}
ORDER BY id
LIMIT {limit}"
}
read SelectAllChangesetsIdsInRangeLimitDesc(repo_id: RepositoryId, min_id: u64, max_id: u64, limit: u64) -> (ChangesetId, u64) {
"SELECT cs_id, id
FROM changesets
WHERE repo_id = {repo_id}
AND id BETWEEN {min_id} AND {max_id}
ORDER BY id DESC
LIMIT {limit}"
}
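// Ascending and descending variants are separate queries as the sort
// direction cannot be supplied as a bound parameter; it has to appear
// in the SQL text itself.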
read SelectChangesetsIdsBounds(repo_id: RepositoryId) -> (u64, u64) {
"SELECT min(id), max(id)
FROM changesets
@@ -424,6 +442,12 @@ async fn fetch_many_by_prefix(
Ok(result)
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortOrder {
Ascending,
Descending,
}
impl SqlChangesets {
pub fn get_list_bs_cs_id_in_range_exclusive(
&self,
@@ -454,6 +478,51 @@ impl SqlChangesets {
.boxed()
}
pub fn get_list_bs_cs_id_in_range_exclusive_limit(
&self,
repo_id: RepositoryId,
min_id: u64,
max_id: u64,
limit: u64,
sort_order: SortOrder,
) -> BoxStream<'_, Result<(ChangesetId, u64), Error>> {
// [min_id, max_id)
cloned!(self.read_master_connection);
// As the SQL query uses BETWEEN, both bounds are inclusive
let max_id = max_id - 1;
async move {
if sort_order == SortOrder::Ascending {
SelectAllChangesetsIdsInRangeLimitAsc::query(
&read_master_connection,
&repo_id,
&min_id,
&max_id,
&limit,
)
.compat()
.await
} else {
SelectAllChangesetsIdsInRangeLimitDesc::query(
&read_master_connection,
&repo_id,
&min_id,
&max_id,
&limit,
)
.compat()
.await
}
}
.map_ok(|rows| {
let changesets_ids = rows.into_iter().map(|row| Ok((row.0, row.1)));
stream::iter(changesets_ids)
})
.try_flatten_stream()
.boxed()
}
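// Hypothetical usage sketch: collect up to 10 (cs_id, id) rows from the
// id range [0, 100), newest first.
//
// let rows: Vec<(ChangesetId, u64)> = sql_changesets
//     .get_list_bs_cs_id_in_range_exclusive_limit(repo_id, 0, 100, 10, SortOrder::Descending)
//     .try_collect()
//     .await?;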
pub async fn get_changesets_ids_bounds(
&self,
repo_id: RepositoryId,