diff --git a/phases/src/caching.rs b/phases/src/caching.rs index 5ed5a3b521..654954b84d 100644 --- a/phases/src/caching.rs +++ b/phases/src/caching.rs @@ -4,7 +4,6 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. -use {Phase, Phases, PhasesHint}; use blobrepo::BlobRepo; use context::CoreContext; use errors::*; @@ -12,10 +11,12 @@ use futures::{future, Future}; use futures_ext::{BoxFuture, FutureExt}; use memcache::{KeyGen, MemcacheClient}; use mononoke_types::{ChangesetId, RepositoryId}; +use reachabilityindex::SkiplistIndex; use stats::Timeseries; use std::sync::Arc; use std::time::Duration; use try_from::TryInto; +use {Phase, Phases, PhasesReachabilityHint}; // Memcache constants, should be changed when we want to invalidate memcache // entries @@ -38,17 +39,17 @@ pub fn get_cache_key(repo_id: &RepositoryId, cs_id: &ChangesetId) -> String { pub struct CachingHintPhases { phases_store: Arc, // phases_store is the underlying persistent storage (db) - phases_hint: PhasesHint, // phases_hint for slow path calculation - memcache: MemcacheClient, // Memcache Client for temporary caching + phases_reachability_hint: PhasesReachabilityHint, // phases_reachability_hint for slow path calculation + memcache: MemcacheClient, // Memcache Client for temporary caching keygen: KeyGen, } impl CachingHintPhases { - pub fn new(phases_store: Arc) -> Self { + pub fn new(phases_store: Arc, skip_index: Arc) -> Self { let key_prefix = "scm.mononoke.phases"; Self { phases_store, - phases_hint: PhasesHint::new(), + phases_reachability_hint: PhasesReachabilityHint::new(skip_index), memcache: MemcacheClient::new(), keygen: KeyGen::new(key_prefix, MC_CODEVER, MC_SITEVER), } @@ -106,7 +107,7 @@ impl Phases for CachingHintPhases { self.keygen, self.memcache, self.phases_store, - self.phases_hint + self.phases_reachability_hint ); let repo_id = repo.get_repoid(); // Look up in the memcache. @@ -129,7 +130,7 @@ impl Phases for CachingHintPhases { } // The phase is not found. Try to calculate it. // It will be error if calculation failed. - None => phases_hint + None => phases_reachability_hint .get(ctx.clone(), repo.clone(), cs_id) .and_then(move |phase| { // The phase is calculated. Refresh memcache. diff --git a/phases/src/hint.rs b/phases/src/hint.rs index ecb592a3e7..d00943cd05 100644 --- a/phases/src/hint.rs +++ b/phases/src/hint.rs @@ -9,7 +9,6 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. -use Phase; use blobrepo::BlobRepo; use context::CoreContext; use errors::*; @@ -18,17 +17,17 @@ use futures_ext::{BoxFuture, FutureExt}; use mononoke_types::ChangesetId; use reachabilityindex::ReachabilityIndex; use reachabilityindex::SkiplistIndex; +use std::sync::Arc; +use Phase; #[derive(Clone)] -pub struct PhasesHint { - index: SkiplistIndex, +pub struct PhasesReachabilityHint { + index: Arc, } -impl PhasesHint { - pub fn new() -> Self { - Self { - index: SkiplistIndex::new(), - } +impl PhasesReachabilityHint { + pub fn new(skip_index: Arc) -> Self { + Self { index: skip_index } } /// Retrieve the phase specified by this commit, if the commit exists @@ -46,6 +45,10 @@ impl PhasesHint { .and_then(move |vec| { let mut vecf = Vec::new(); for (_, public_cs) in vec { + if public_cs == cs_id { + return future::ok(vec![true]).left_future(); + } + cloned!(ctx, index); let changeset_fetcher = repo.get_changeset_fetcher(); vecf.push(index.query_reachability(ctx, changeset_fetcher, public_cs, cs_id)); @@ -54,6 +57,7 @@ impl PhasesHint { .skip_while(|&x| future::ok(!x)) .take(1) .collect() + .right_future() }) .map(|vec| { // vec should be size 0 or 1 diff --git a/phases/src/lib.rs b/phases/src/lib.rs index ade89e0f42..7e78e1032e 100644 --- a/phases/src/lib.rs +++ b/phases/src/lib.rs @@ -35,7 +35,7 @@ mod errors; pub use errors::*; mod hint; -pub use hint::PhasesHint; +pub use hint::PhasesReachabilityHint; use ascii::AsciiString; use blobrepo::BlobRepo; @@ -44,12 +44,16 @@ use futures::{future, Future}; use futures_ext::{BoxFuture, FutureExt}; use mercurial_types::HgPhase; use mononoke_types::{ChangesetId, RepositoryId}; -use std::{fmt, str}; +use reachabilityindex::SkiplistIndex; use std::sync::Arc; +use std::{fmt, str}; use try_from::TryFrom; +use sql::mysql_async::{ + prelude::{ConvIr, FromValue}, + FromValueError, Value, +}; use sql::Connection; -use sql::mysql_async::{FromValueError, Value, prelude::{ConvIr, FromValue}}; pub use sql_ext::SqlConstructors; type FromValueResult = ::std::result::Result; @@ -92,7 +96,8 @@ impl TryFrom for Phase { format!( "Conversion error from IOBuf to Phase, received {} bytes", v.len() - ).into(), + ) + .into(), )), } } @@ -155,7 +160,7 @@ pub trait Phases: Send + Sync { pub struct HintPhases { phases_store: Arc, // phases_store is the underlying persistent storage (db) - phases_hint: PhasesHint, // phases_hint for slow path calculation + phases_reachability_hint: PhasesReachabilityHint, // phases_reachability_hint for slow path calculation } #[derive(Clone)] @@ -165,7 +170,7 @@ pub struct SqlPhases { read_master_connection: Connection, } -queries!{ +queries! { write InsertPhase(values: (repo_id: RepositoryId, cs_id: ChangesetId, phase: Phase)) { none, mysql("INSERT INTO phases (repo_id, cs_id, phase) VALUES {values} ON DUPLICATE KEY UPDATE phase = VALUES(phase)") @@ -211,8 +216,9 @@ impl Phases for SqlPhases { InsertPhase::query( &self.write_connection, &[(&repo.get_repoid(), &cs_id, &phase)], - ).map(move |result| result.affected_rows() >= 1) - .boxify() + ) + .map(move |result| result.affected_rows() >= 1) + .boxify() } /// Retrieve the phase specified by this commit from the table, if available. @@ -229,10 +235,10 @@ impl Phases for SqlPhases { } impl HintPhases { - pub fn new(phases_store: Arc) -> Self { + pub fn new(phases_store: Arc, skip_index: Arc) -> Self { Self { phases_store, - phases_hint: PhasesHint::new(), + phases_reachability_hint: PhasesReachabilityHint::new(skip_index), } } } @@ -264,7 +270,7 @@ impl Phases for HintPhases { repo: BlobRepo, cs_id: ChangesetId, ) -> BoxFuture, Error> { - cloned!(self.phases_store, self.phases_hint); + cloned!(self.phases_store, self.phases_reachability_hint); // Try to fetch from the underlying storage. phases_store .get(ctx.clone(), repo.clone(), cs_id) @@ -274,7 +280,7 @@ impl Phases for HintPhases { return future::ok(maybe_phase).left_future(); } // Not found. Calculate using the slow path. - phases_hint + phases_reachability_hint .get(ctx.clone(), repo.clone(), cs_id) .and_then(move |phase| { if phase == Phase::Public { @@ -362,7 +368,8 @@ mod tests { } fn set_bookmark(ctx: CoreContext, repo: BlobRepo, book: &Bookmark, cs_id: &str) { - let head = repo.get_bonsai_from_hg(ctx.clone(), &HgChangesetId::from_str(cs_id).unwrap()) + let head = repo + .get_bonsai_from_hg(ctx.clone(), &HgChangesetId::from_str(cs_id).unwrap()) .wait() .unwrap() .unwrap(); @@ -400,7 +407,8 @@ mod tests { let repo = linear::getrepo(None); // delete all existing bookmarks - for (bookmark, _) in repo.get_bonsai_bookmarks(ctx.clone()) + for (bookmark, _) in repo + .get_bonsai_bookmarks(ctx.clone()) .collect() .wait() .unwrap() @@ -416,36 +424,62 @@ mod tests { "eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b", ); - let public_commit = repo.get_bonsai_from_hg( - ctx.clone(), - &HgChangesetId::from_str("d0a361e9022d226ae52f689667bd7d212a19cfe0").unwrap(), - ).wait() + let public_commit = repo + .get_bonsai_from_hg( + ctx.clone(), + &HgChangesetId::from_str("d0a361e9022d226ae52f689667bd7d212a19cfe0").unwrap(), + ) + .wait() .unwrap() .unwrap(); - let other_public_commit = repo.get_bonsai_from_hg( - ctx.clone(), - &HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap(), - ).wait() + let other_public_commit = repo + .get_bonsai_from_hg( + ctx.clone(), + &HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap(), + ) + .wait() .unwrap() .unwrap(); - let draft_commit = repo.get_bonsai_from_hg( - ctx.clone(), - &HgChangesetId::from_str("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157").unwrap(), - ).wait() + let draft_commit = repo + .get_bonsai_from_hg( + ctx.clone(), + &HgChangesetId::from_str("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157").unwrap(), + ) + .wait() .unwrap() .unwrap(); - let other_draft_commit = repo.get_bonsai_from_hg( - ctx.clone(), - &HgChangesetId::from_str("a5ffa77602a066db7d5cfb9fb5823a0895717c5a").unwrap(), - ).wait() + let other_draft_commit = repo + .get_bonsai_from_hg( + ctx.clone(), + &HgChangesetId::from_str("a5ffa77602a066db7d5cfb9fb5823a0895717c5a").unwrap(), + ) + .wait() + .unwrap() + .unwrap(); + + let public_bookmark_commit = repo + .get_bonsai_from_hg( + ctx.clone(), + &HgChangesetId::from_str("eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b").unwrap(), + ) + .wait() .unwrap() .unwrap(); let phases_store = Arc::new(SqlPhases::with_sqlite_in_memory().unwrap()); - let hint_phases = HintPhases::new(phases_store.clone()); + let hint_phases = HintPhases::new(phases_store.clone(), Arc::new(SkiplistIndex::new())); + + assert_eq!( + hint_phases + .get(ctx.clone(), repo.clone(), public_bookmark_commit) + .wait() + .unwrap(), + Some(Phase::Public), + "slow path: get phase for a Public commit which is also a public bookmark" + ); assert_eq!( hint_phases diff --git a/server/repo_listener/src/repo_handlers.rs b/server/repo_listener/src/repo_handlers.rs index dc8e2f526e..91e175bbc2 100644 --- a/server/repo_listener/src/repo_handlers.rs +++ b/server/repo_listener/src/repo_handlers.rs @@ -124,7 +124,7 @@ pub fn repo_handlers( let hash_validation_percentage = config.hash_validation_percentage.clone(); let wireproto_scribe_category = config.wireproto_scribe_category.clone(); - let lca_hint = match config.skiplist_index_blobstore_key.clone() { + let skip_index = match config.skiplist_index_blobstore_key.clone() { Some(skiplist_index_blobstore_key) => { let blobstore = repo.blobrepo().get_blobstore(); blobstore @@ -144,24 +144,7 @@ pub fn repo_handlers( None => ok(Arc::new(SkiplistIndex::new())).right_future(), }; - let phases_hint: Arc = match config.repotype { - RepoType::BlobFiles(ref data_dir) - | RepoType::BlobRocks(ref data_dir) - | RepoType::TestBlobDelayRocks(ref data_dir, ..) => { - let storage = Arc::new( - SqlPhases::with_sqlite_path(data_dir.join("phases")) - .expect("unable to initialize sqlite db for phases"), - ); - Arc::new(HintPhases::new(storage)) - } - RepoType::BlobRemote { ref db_address, .. } => { - let storage = Arc::new(SqlPhases::with_myrouter( - &db_address, - myrouter_port.expect("myrouter_port not provided for BlobRemote repo"), - )); - Arc::new(CachingHintPhases::new(storage)) - } - }; + let repotype = config.repotype.clone(); // TODO (T32873881): Arc should become BlobRepo let initial_warmup = ensure_myrouter_ready.and_then({ @@ -175,11 +158,35 @@ pub fn repo_handlers( }); ready_handle - .wait_for(initial_warmup.join(lca_hint).map(|((), lca_hint)| lca_hint)) +.wait_for(initial_warmup.and_then(|()| skip_index)) .map({ cloned!(root_log); - move |lca_hint| { + move |skip_index| { info!(root_log, "Repo warmup for {} complete", reponame); + + // initialize phases hint from the skip index + let phases_hint: Arc = match repotype { + RepoType::BlobFiles(ref data_dir) + | RepoType::BlobRocks(ref data_dir) + | RepoType::TestBlobDelayRocks(ref data_dir, ..) => { + let storage = Arc::new( + SqlPhases::with_sqlite_path(data_dir.join("phases")) + .expect("unable to initialize sqlite db for phases"), + ); + Arc::new(HintPhases::new(storage, skip_index.clone())) + } + RepoType::BlobRemote { ref db_address, .. } => { + let storage = Arc::new(SqlPhases::with_myrouter( + &db_address, + myrouter_port.expect("myrouter_port not provided for BlobRemote repo"), + )); + Arc::new(CachingHintPhases::new(storage, skip_index.clone())) + } + }; + + // initialize lca hint from the skip index + let lca_hint: Arc = skip_index; + ( reponame, RepoHandler {