diff --git a/eden/mononoke/bulkops/src/lib.rs b/eden/mononoke/bulkops/src/lib.rs index 415f545743..cc0b5a3ab0 100644 --- a/eden/mononoke/bulkops/src/lib.rs +++ b/eden/mononoke/bulkops/src/lib.rs @@ -47,6 +47,7 @@ pub struct PublicChangesetBulkFetch { repo_id: RepositoryId, changesets: Arc, phases: Arc, + read_from_master: bool, step: u64, } @@ -60,6 +61,7 @@ impl PublicChangesetBulkFetch { repo_id, changesets, phases, + read_from_master: true, step: MAX_FETCH_STEP, } } @@ -73,6 +75,13 @@ impl PublicChangesetBulkFetch { Ok(Self { step, ..self }) } + pub fn with_read_from_master(self, read_from_master: bool) -> Self { + Self { + read_from_master, + ..self + } + } + /// Fetch the ChangesetEntry, which involves actually loading the Changesets pub fn fetch<'a>( &'a self, @@ -126,6 +135,7 @@ impl PublicChangesetBulkFetch { self.get_repo_bounds().right_future() }; let step = self.step; + let read_from_master = self.read_from_master; async move { let s = bounded_traversal_stream( @@ -146,6 +156,7 @@ impl PublicChangesetBulkFetch { upper, step, d.sort_order(), + read_from_master, ) .try_collect() .await?; @@ -205,7 +216,9 @@ impl PublicChangesetBulkFetch { /// Get the repo bounds as max/min observed suitable for rust ranges (hence the + 1) pub async fn get_repo_bounds(&self) -> Result<(u64, u64), Error> { let changesets = self.changesets.get_sql_changesets(); - let (start, stop) = changesets.get_changesets_ids_bounds(self.repo_id).await?; + let (start, stop) = changesets + .get_changesets_ids_bounds(self.repo_id, self.read_from_master) + .await?; let start = start.ok_or_else(|| Error::msg("changesets table is empty"))?; let stop = stop.ok_or_else(|| Error::msg("changesets table is empty"))? + 1; Ok((start, stop)) diff --git a/eden/mononoke/changesets/Cargo.toml b/eden/mononoke/changesets/Cargo.toml index 02a9d44872..8ddbc8a45f 100644 --- a/eden/mononoke/changesets/Cargo.toml +++ b/eden/mononoke/changesets/Cargo.toml @@ -14,7 +14,6 @@ mononoke_types = { path = "../mononoke_types" } sql_construct = { path = "../common/sql_construct" } sql_ext = { path = "../common/rust/sql_ext" } cachelib = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } -cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" } memcache = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } diff --git a/eden/mononoke/changesets/src/lib.rs b/eden/mononoke/changesets/src/lib.rs index 6e6fd72394..6477d24dfa 100644 --- a/eden/mononoke/changesets/src/lib.rs +++ b/eden/mononoke/changesets/src/lib.rs @@ -12,7 +12,6 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use auto_impl::auto_impl; use bytes::Bytes; -use cloned::cloned; use context::{CoreContext, PerfCounterType}; use fbthrift::compact_protocol; use futures::{ @@ -454,21 +453,18 @@ impl SqlChangesets { repo_id: RepositoryId, min_id: u64, max_id: u64, + read_from_master: bool, ) -> BoxStream<'_, Result> { // [min_id, max_id) // As SQL request is BETWEEN, both bounds including let max_id = max_id - 1; - cloned!(self.read_master_connection); + let conn = self.read_conn(read_from_master); + async move { - SelectAllChangesetsIdsInRange::query( - &read_master_connection, - &repo_id, - &min_id, - &max_id, - ) - .compat() - .await + SelectAllChangesetsIdsInRange::query(&conn, &repo_id, &min_id, &max_id) + .compat() + .await } .map_ok(move |rows| { let changesets_ids = rows.into_iter().map(|row| Ok(row.0)); @@ -485,12 +481,13 @@ impl SqlChangesets { max_id: u64, limit: u64, sort_order: SortOrder, + read_from_master: bool, ) -> BoxStream<'_, Result<(ChangesetId, u64), Error>> { // [min_id, max_id) // As SQL request is BETWEEN, both bounds including let max_id = max_id - 1; - let conn = &self.read_master_connection; + let conn = self.read_conn(read_from_master); async move { if sort_order == SortOrder::Ascending { @@ -515,12 +512,13 @@ impl SqlChangesets { .boxed() } - pub async fn get_changesets_ids_bounds( &self, repo_id: RepositoryId, + read_from_master: bool, ) -> Result<(Option, Option), Error> { - let rows = SelectChangesetsIdsBounds::query(&self.read_master_connection, &repo_id) + let conn = self.read_conn(read_from_master); + let rows = SelectChangesetsIdsBounds::query(conn, &repo_id) .compat() .await?; if rows.is_empty() { @@ -529,6 +527,14 @@ impl SqlChangesets { Ok((Some(rows[0].0), Some(rows[0].1))) } } + + fn read_conn(&self, read_from_master: bool) -> &Connection { + if read_from_master { + &self.read_master_connection + } else { + &self.read_connection + } + } } fn check_missing_rows( diff --git a/eden/mononoke/cmds/aliasverify.rs b/eden/mononoke/cmds/aliasverify.rs index 006f216c26..a5a3d85b94 100644 --- a/eden/mononoke/cmds/aliasverify.rs +++ b/eden/mononoke/cmds/aliasverify.rs @@ -221,9 +221,12 @@ impl AliasVerification { "Process Changesets with ids: [{:?}, {:?})", min_id, max_id ); - let bcs_ids = - self.sqlchangesets - .get_list_bs_cs_id_in_range_exclusive(self.repoid, min_id, max_id); + let bcs_ids = self.sqlchangesets.get_list_bs_cs_id_in_range_exclusive( + self.repoid, + min_id, + max_id, + true, + ); bcs_ids .and_then(move |bcs_id| async move { @@ -253,7 +256,7 @@ impl AliasVerification { ) -> Result<(), Error> { let (min_id, max_id) = self .sqlchangesets - .get_changesets_ids_bounds(self.repoid) + .get_changesets_ids_bounds(self.repoid, true) .await?; let mut bounds = vec![];