mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 00:45:18 +03:00
mononoke: allow fetch_ids() to use read replica
Summary: Allow use of read replica when fetching bulk ids. Bulkops clients not needing most up to date bounds can use this mode providing they are not checkpointing the repo min/max ids. Existing default behaviour is unchanged. Differential Revision: D25804028 fbshipit-source-id: ca14e929ea94c351e27eed2aa012fe914c8c691e
This commit is contained in:
parent
b98566c3f9
commit
32ed0fbffe
@ -47,6 +47,7 @@ pub struct PublicChangesetBulkFetch {
|
||||
repo_id: RepositoryId,
|
||||
changesets: Arc<dyn Changesets>,
|
||||
phases: Arc<dyn Phases>,
|
||||
read_from_master: bool,
|
||||
step: u64,
|
||||
}
|
||||
|
||||
@ -60,6 +61,7 @@ impl PublicChangesetBulkFetch {
|
||||
repo_id,
|
||||
changesets,
|
||||
phases,
|
||||
read_from_master: true,
|
||||
step: MAX_FETCH_STEP,
|
||||
}
|
||||
}
|
||||
@ -73,6 +75,13 @@ impl PublicChangesetBulkFetch {
|
||||
Ok(Self { step, ..self })
|
||||
}
|
||||
|
||||
pub fn with_read_from_master(self, read_from_master: bool) -> Self {
|
||||
Self {
|
||||
read_from_master,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the ChangesetEntry, which involves actually loading the Changesets
|
||||
pub fn fetch<'a>(
|
||||
&'a self,
|
||||
@ -126,6 +135,7 @@ impl PublicChangesetBulkFetch {
|
||||
self.get_repo_bounds().right_future()
|
||||
};
|
||||
let step = self.step;
|
||||
let read_from_master = self.read_from_master;
|
||||
|
||||
async move {
|
||||
let s = bounded_traversal_stream(
|
||||
@ -146,6 +156,7 @@ impl PublicChangesetBulkFetch {
|
||||
upper,
|
||||
step,
|
||||
d.sort_order(),
|
||||
read_from_master,
|
||||
)
|
||||
.try_collect()
|
||||
.await?;
|
||||
@ -205,7 +216,9 @@ impl PublicChangesetBulkFetch {
|
||||
/// Get the repo bounds as max/min observed suitable for rust ranges (hence the + 1)
|
||||
pub async fn get_repo_bounds(&self) -> Result<(u64, u64), Error> {
|
||||
let changesets = self.changesets.get_sql_changesets();
|
||||
let (start, stop) = changesets.get_changesets_ids_bounds(self.repo_id).await?;
|
||||
let (start, stop) = changesets
|
||||
.get_changesets_ids_bounds(self.repo_id, self.read_from_master)
|
||||
.await?;
|
||||
let start = start.ok_or_else(|| Error::msg("changesets table is empty"))?;
|
||||
let stop = stop.ok_or_else(|| Error::msg("changesets table is empty"))? + 1;
|
||||
Ok((start, stop))
|
||||
|
@ -14,7 +14,6 @@ mononoke_types = { path = "../mononoke_types" }
|
||||
sql_construct = { path = "../common/sql_construct" }
|
||||
sql_ext = { path = "../common/rust/sql_ext" }
|
||||
cachelib = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
|
||||
memcache = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
|
@ -12,7 +12,6 @@ use anyhow::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use auto_impl::auto_impl;
|
||||
use bytes::Bytes;
|
||||
use cloned::cloned;
|
||||
use context::{CoreContext, PerfCounterType};
|
||||
use fbthrift::compact_protocol;
|
||||
use futures::{
|
||||
@ -454,21 +453,18 @@ impl SqlChangesets {
|
||||
repo_id: RepositoryId,
|
||||
min_id: u64,
|
||||
max_id: u64,
|
||||
read_from_master: bool,
|
||||
) -> BoxStream<'_, Result<ChangesetId, Error>> {
|
||||
// [min_id, max_id)
|
||||
// As SQL request is BETWEEN, both bounds including
|
||||
let max_id = max_id - 1;
|
||||
|
||||
cloned!(self.read_master_connection);
|
||||
let conn = self.read_conn(read_from_master);
|
||||
|
||||
async move {
|
||||
SelectAllChangesetsIdsInRange::query(
|
||||
&read_master_connection,
|
||||
&repo_id,
|
||||
&min_id,
|
||||
&max_id,
|
||||
)
|
||||
.compat()
|
||||
.await
|
||||
SelectAllChangesetsIdsInRange::query(&conn, &repo_id, &min_id, &max_id)
|
||||
.compat()
|
||||
.await
|
||||
}
|
||||
.map_ok(move |rows| {
|
||||
let changesets_ids = rows.into_iter().map(|row| Ok(row.0));
|
||||
@ -485,12 +481,13 @@ impl SqlChangesets {
|
||||
max_id: u64,
|
||||
limit: u64,
|
||||
sort_order: SortOrder,
|
||||
read_from_master: bool,
|
||||
) -> BoxStream<'_, Result<(ChangesetId, u64), Error>> {
|
||||
// [min_id, max_id)
|
||||
// As SQL request is BETWEEN, both bounds including
|
||||
let max_id = max_id - 1;
|
||||
|
||||
let conn = &self.read_master_connection;
|
||||
let conn = self.read_conn(read_from_master);
|
||||
|
||||
async move {
|
||||
if sort_order == SortOrder::Ascending {
|
||||
@ -515,12 +512,13 @@ impl SqlChangesets {
|
||||
.boxed()
|
||||
}
|
||||
|
||||
|
||||
pub async fn get_changesets_ids_bounds(
|
||||
&self,
|
||||
repo_id: RepositoryId,
|
||||
read_from_master: bool,
|
||||
) -> Result<(Option<u64>, Option<u64>), Error> {
|
||||
let rows = SelectChangesetsIdsBounds::query(&self.read_master_connection, &repo_id)
|
||||
let conn = self.read_conn(read_from_master);
|
||||
let rows = SelectChangesetsIdsBounds::query(conn, &repo_id)
|
||||
.compat()
|
||||
.await?;
|
||||
if rows.is_empty() {
|
||||
@ -529,6 +527,14 @@ impl SqlChangesets {
|
||||
Ok((Some(rows[0].0), Some(rows[0].1)))
|
||||
}
|
||||
}
|
||||
|
||||
fn read_conn(&self, read_from_master: bool) -> &Connection {
|
||||
if read_from_master {
|
||||
&self.read_master_connection
|
||||
} else {
|
||||
&self.read_connection
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn check_missing_rows(
|
||||
|
@ -221,9 +221,12 @@ impl AliasVerification {
|
||||
"Process Changesets with ids: [{:?}, {:?})", min_id, max_id
|
||||
);
|
||||
|
||||
let bcs_ids =
|
||||
self.sqlchangesets
|
||||
.get_list_bs_cs_id_in_range_exclusive(self.repoid, min_id, max_id);
|
||||
let bcs_ids = self.sqlchangesets.get_list_bs_cs_id_in_range_exclusive(
|
||||
self.repoid,
|
||||
min_id,
|
||||
max_id,
|
||||
true,
|
||||
);
|
||||
|
||||
bcs_ids
|
||||
.and_then(move |bcs_id| async move {
|
||||
@ -253,7 +256,7 @@ impl AliasVerification {
|
||||
) -> Result<(), Error> {
|
||||
let (min_id, max_id) = self
|
||||
.sqlchangesets
|
||||
.get_changesets_ids_bounds(self.repoid)
|
||||
.get_changesets_ids_bounds(self.repoid, true)
|
||||
.await?;
|
||||
|
||||
let mut bounds = vec![];
|
||||
|
Loading…
Reference in New Issue
Block a user