Do not use the default skip index for phases calculations

Summary:
Use the repo's pre-built skip index for phase calculations instead of constructing a fresh default `SkiplistIndex`.

Apologies for the unrelated rustfmt formatting changes included in this diff.

Reviewed By: StanislavGlebik

Differential Revision: D13636059

fbshipit-source-id: 2815d82b63b86bda053f5a3a9a1b8a3b72abbf82
This commit is contained in:
Liubov Dmitrieva 2019-01-14 06:24:23 -08:00 committed by Facebook Github Bot
parent 6fc5c58810
commit eeb303f932
4 changed files with 113 additions and 67 deletions

View File

@ -4,7 +4,6 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use {Phase, Phases, PhasesHint};
use blobrepo::BlobRepo;
use context::CoreContext;
use errors::*;
@ -12,10 +11,12 @@ use futures::{future, Future};
use futures_ext::{BoxFuture, FutureExt};
use memcache::{KeyGen, MemcacheClient};
use mononoke_types::{ChangesetId, RepositoryId};
use reachabilityindex::SkiplistIndex;
use stats::Timeseries;
use std::sync::Arc;
use std::time::Duration;
use try_from::TryInto;
use {Phase, Phases, PhasesReachabilityHint};
// Memcache constants, should be changed when we want to invalidate memcache
// entries
@ -38,17 +39,17 @@ pub fn get_cache_key(repo_id: &RepositoryId, cs_id: &ChangesetId) -> String {
pub struct CachingHintPhases {
phases_store: Arc<Phases>, // phases_store is the underlying persistent storage (db)
phases_hint: PhasesHint, // phases_hint for slow path calculation
memcache: MemcacheClient, // Memcache Client for temporary caching
phases_reachability_hint: PhasesReachabilityHint, // phases_reachability_hint for slow path calculation
memcache: MemcacheClient, // Memcache Client for temporary caching
keygen: KeyGen,
}
impl CachingHintPhases {
pub fn new(phases_store: Arc<Phases>) -> Self {
pub fn new(phases_store: Arc<Phases>, skip_index: Arc<SkiplistIndex>) -> Self {
let key_prefix = "scm.mononoke.phases";
Self {
phases_store,
phases_hint: PhasesHint::new(),
phases_reachability_hint: PhasesReachabilityHint::new(skip_index),
memcache: MemcacheClient::new(),
keygen: KeyGen::new(key_prefix, MC_CODEVER, MC_SITEVER),
}
@ -106,7 +107,7 @@ impl Phases for CachingHintPhases {
self.keygen,
self.memcache,
self.phases_store,
self.phases_hint
self.phases_reachability_hint
);
let repo_id = repo.get_repoid();
// Look up in the memcache.
@ -129,7 +130,7 @@ impl Phases for CachingHintPhases {
}
// The phase is not found. Try to calculate it.
// It will be error if calculation failed.
None => phases_hint
None => phases_reachability_hint
.get(ctx.clone(), repo.clone(), cs_id)
.and_then(move |phase| {
// The phase is calculated. Refresh memcache.

View File

@ -9,7 +9,6 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use Phase;
use blobrepo::BlobRepo;
use context::CoreContext;
use errors::*;
@ -18,17 +17,17 @@ use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::ChangesetId;
use reachabilityindex::ReachabilityIndex;
use reachabilityindex::SkiplistIndex;
use std::sync::Arc;
use Phase;
#[derive(Clone)]
pub struct PhasesHint {
index: SkiplistIndex,
pub struct PhasesReachabilityHint {
index: Arc<SkiplistIndex>,
}
impl PhasesHint {
pub fn new() -> Self {
Self {
index: SkiplistIndex::new(),
}
impl PhasesReachabilityHint {
pub fn new(skip_index: Arc<SkiplistIndex>) -> Self {
Self { index: skip_index }
}
/// Retrieve the phase specified by this commit, if the commit exists
@ -46,6 +45,10 @@ impl PhasesHint {
.and_then(move |vec| {
let mut vecf = Vec::new();
for (_, public_cs) in vec {
if public_cs == cs_id {
return future::ok(vec![true]).left_future();
}
cloned!(ctx, index);
let changeset_fetcher = repo.get_changeset_fetcher();
vecf.push(index.query_reachability(ctx, changeset_fetcher, public_cs, cs_id));
@ -54,6 +57,7 @@ impl PhasesHint {
.skip_while(|&x| future::ok(!x))
.take(1)
.collect()
.right_future()
})
.map(|vec| {
// vec should be size 0 or 1

View File

@ -35,7 +35,7 @@ mod errors;
pub use errors::*;
mod hint;
pub use hint::PhasesHint;
pub use hint::PhasesReachabilityHint;
use ascii::AsciiString;
use blobrepo::BlobRepo;
@ -44,12 +44,16 @@ use futures::{future, Future};
use futures_ext::{BoxFuture, FutureExt};
use mercurial_types::HgPhase;
use mononoke_types::{ChangesetId, RepositoryId};
use std::{fmt, str};
use reachabilityindex::SkiplistIndex;
use std::sync::Arc;
use std::{fmt, str};
use try_from::TryFrom;
use sql::mysql_async::{
prelude::{ConvIr, FromValue},
FromValueError, Value,
};
use sql::Connection;
use sql::mysql_async::{FromValueError, Value, prelude::{ConvIr, FromValue}};
pub use sql_ext::SqlConstructors;
type FromValueResult<T> = ::std::result::Result<T, FromValueError>;
@ -92,7 +96,8 @@ impl TryFrom<iobuf::IOBuf> for Phase {
format!(
"Conversion error from IOBuf to Phase, received {} bytes",
v.len()
).into(),
)
.into(),
)),
}
}
@ -155,7 +160,7 @@ pub trait Phases: Send + Sync {
pub struct HintPhases {
phases_store: Arc<Phases>, // phases_store is the underlying persistent storage (db)
phases_hint: PhasesHint, // phases_hint for slow path calculation
phases_reachability_hint: PhasesReachabilityHint, // phases_reachability_hint for slow path calculation
}
#[derive(Clone)]
@ -165,7 +170,7 @@ pub struct SqlPhases {
read_master_connection: Connection,
}
queries!{
queries! {
write InsertPhase(values: (repo_id: RepositoryId, cs_id: ChangesetId, phase: Phase)) {
none,
mysql("INSERT INTO phases (repo_id, cs_id, phase) VALUES {values} ON DUPLICATE KEY UPDATE phase = VALUES(phase)")
@ -211,8 +216,9 @@ impl Phases for SqlPhases {
InsertPhase::query(
&self.write_connection,
&[(&repo.get_repoid(), &cs_id, &phase)],
).map(move |result| result.affected_rows() >= 1)
.boxify()
)
.map(move |result| result.affected_rows() >= 1)
.boxify()
}
/// Retrieve the phase specified by this commit from the table, if available.
@ -229,10 +235,10 @@ impl Phases for SqlPhases {
}
impl HintPhases {
pub fn new(phases_store: Arc<Phases>) -> Self {
pub fn new(phases_store: Arc<Phases>, skip_index: Arc<SkiplistIndex>) -> Self {
Self {
phases_store,
phases_hint: PhasesHint::new(),
phases_reachability_hint: PhasesReachabilityHint::new(skip_index),
}
}
}
@ -264,7 +270,7 @@ impl Phases for HintPhases {
repo: BlobRepo,
cs_id: ChangesetId,
) -> BoxFuture<Option<Phase>, Error> {
cloned!(self.phases_store, self.phases_hint);
cloned!(self.phases_store, self.phases_reachability_hint);
// Try to fetch from the underlying storage.
phases_store
.get(ctx.clone(), repo.clone(), cs_id)
@ -274,7 +280,7 @@ impl Phases for HintPhases {
return future::ok(maybe_phase).left_future();
}
// Not found. Calculate using the slow path.
phases_hint
phases_reachability_hint
.get(ctx.clone(), repo.clone(), cs_id)
.and_then(move |phase| {
if phase == Phase::Public {
@ -362,7 +368,8 @@ mod tests {
}
fn set_bookmark(ctx: CoreContext, repo: BlobRepo, book: &Bookmark, cs_id: &str) {
let head = repo.get_bonsai_from_hg(ctx.clone(), &HgChangesetId::from_str(cs_id).unwrap())
let head = repo
.get_bonsai_from_hg(ctx.clone(), &HgChangesetId::from_str(cs_id).unwrap())
.wait()
.unwrap()
.unwrap();
@ -400,7 +407,8 @@ mod tests {
let repo = linear::getrepo(None);
// delete all existing bookmarks
for (bookmark, _) in repo.get_bonsai_bookmarks(ctx.clone())
for (bookmark, _) in repo
.get_bonsai_bookmarks(ctx.clone())
.collect()
.wait()
.unwrap()
@ -416,36 +424,62 @@ mod tests {
"eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b",
);
let public_commit = repo.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("d0a361e9022d226ae52f689667bd7d212a19cfe0").unwrap(),
).wait()
let public_commit = repo
.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("d0a361e9022d226ae52f689667bd7d212a19cfe0").unwrap(),
)
.wait()
.unwrap()
.unwrap();
let other_public_commit = repo.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap(),
).wait()
let other_public_commit = repo
.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap(),
)
.wait()
.unwrap()
.unwrap();
let draft_commit = repo.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157").unwrap(),
).wait()
let draft_commit = repo
.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157").unwrap(),
)
.wait()
.unwrap()
.unwrap();
let other_draft_commit = repo.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("a5ffa77602a066db7d5cfb9fb5823a0895717c5a").unwrap(),
).wait()
let other_draft_commit = repo
.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("a5ffa77602a066db7d5cfb9fb5823a0895717c5a").unwrap(),
)
.wait()
.unwrap()
.unwrap();
let public_bookmark_commit = repo
.get_bonsai_from_hg(
ctx.clone(),
&HgChangesetId::from_str("eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b").unwrap(),
)
.wait()
.unwrap()
.unwrap();
let phases_store = Arc::new(SqlPhases::with_sqlite_in_memory().unwrap());
let hint_phases = HintPhases::new(phases_store.clone());
let hint_phases = HintPhases::new(phases_store.clone(), Arc::new(SkiplistIndex::new()));
assert_eq!(
hint_phases
.get(ctx.clone(), repo.clone(), public_bookmark_commit)
.wait()
.unwrap(),
Some(Phase::Public),
"slow path: get phase for a Public commit which is also a public bookmark"
);
assert_eq!(
hint_phases

View File

@ -124,7 +124,7 @@ pub fn repo_handlers(
let hash_validation_percentage = config.hash_validation_percentage.clone();
let wireproto_scribe_category = config.wireproto_scribe_category.clone();
let lca_hint = match config.skiplist_index_blobstore_key.clone() {
let skip_index = match config.skiplist_index_blobstore_key.clone() {
Some(skiplist_index_blobstore_key) => {
let blobstore = repo.blobrepo().get_blobstore();
blobstore
@ -144,24 +144,7 @@ pub fn repo_handlers(
None => ok(Arc::new(SkiplistIndex::new())).right_future(),
};
let phases_hint: Arc<Phases> = match config.repotype {
RepoType::BlobFiles(ref data_dir)
| RepoType::BlobRocks(ref data_dir)
| RepoType::TestBlobDelayRocks(ref data_dir, ..) => {
let storage = Arc::new(
SqlPhases::with_sqlite_path(data_dir.join("phases"))
.expect("unable to initialize sqlite db for phases"),
);
Arc::new(HintPhases::new(storage))
}
RepoType::BlobRemote { ref db_address, .. } => {
let storage = Arc::new(SqlPhases::with_myrouter(
&db_address,
myrouter_port.expect("myrouter_port not provided for BlobRemote repo"),
));
Arc::new(CachingHintPhases::new(storage))
}
};
let repotype = config.repotype.clone();
// TODO (T32873881): Arc<BlobRepo> should become BlobRepo
let initial_warmup = ensure_myrouter_ready.and_then({
@ -175,11 +158,35 @@ pub fn repo_handlers(
});
ready_handle
.wait_for(initial_warmup.join(lca_hint).map(|((), lca_hint)| lca_hint))
.wait_for(initial_warmup.and_then(|()| skip_index))
.map({
cloned!(root_log);
move |lca_hint| {
move |skip_index| {
info!(root_log, "Repo warmup for {} complete", reponame);
// initialize phases hint from the skip index
let phases_hint: Arc<Phases> = match repotype {
RepoType::BlobFiles(ref data_dir)
| RepoType::BlobRocks(ref data_dir)
| RepoType::TestBlobDelayRocks(ref data_dir, ..) => {
let storage = Arc::new(
SqlPhases::with_sqlite_path(data_dir.join("phases"))
.expect("unable to initialize sqlite db for phases"),
);
Arc::new(HintPhases::new(storage, skip_index.clone()))
}
RepoType::BlobRemote { ref db_address, .. } => {
let storage = Arc::new(SqlPhases::with_myrouter(
&db_address,
myrouter_port.expect("myrouter_port not provided for BlobRemote repo"),
));
Arc::new(CachingHintPhases::new(storage, skip_index.clone()))
}
};
// initialize lca hint from the skip index
let lca_hint: Arc<LeastCommonAncestorsHint + Send + Sync> = skip_index;
(
reponame,
RepoHandler {