use memcache leases in derived data implementation

Summary: Use memcache leases to prevent simultaneous generation of derived data for the same changeset id.

Reviewed By: StanislavGlebik

Differential Revision: D16666659

fbshipit-source-id: 47e57449ab854e595a9dc860414c49afa966be01
This commit is contained in:
Pavel Aslanov 2019-08-14 03:58:11 -07:00 committed by Facebook Github Bot
parent dbac37cec5
commit 4046f95e03
8 changed files with 308 additions and 51 deletions

View File

@ -4,31 +4,26 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::{sync::Arc, time::Duration};
use cloned::cloned;
use failure_ext::prelude::*;
use failure_ext::{Error, Result};
use futures::{future::IntoFuture, Future};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use std::collections::HashMap;
use blobstore_factory::{make_blobstore, SqlFactory, SqliteFactory, XdbFactory};
use blobrepo::BlobRepo;
use blobrepo_errors::*;
use blobstore::Blobstore;
use blobstore_factory::{make_blobstore, SqlFactory, SqliteFactory, XdbFactory};
use bonsai_hg_mapping::{CachingBonsaiHgMapping, SqlBonsaiHgMapping};
use bookmarks::{Bookmarks, CachedBookmarks};
use cacheblob::{
dummy::DummyLease, new_cachelib_blobstore_no_lease, new_memcache_blobstore, MemcacheOps,
new_cachelib_blobstore_no_lease, new_memcache_blobstore, InProcessLease, MemcacheOps,
};
use censoredblob::SqlCensoredContentStore;
use changeset_fetcher::{ChangesetFetcher, SimpleChangesetFetcher};
use changesets::{CachingChangesets, SqlChangesets};
use cloned::cloned;
use dbbookmarks::SqlBookmarks;
use failure_ext::prelude::*;
use failure_ext::{Error, Result};
use filenodes::CachingFilenodes;
use filestore::FilestoreConfig;
use futures::{future::IntoFuture, Future};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use memblob::EagerMemblob;
use metaconfig_types::{
self, BlobConfig, Censoring, FilestoreParams, MetadataDBConfig, StorageConfig,
@ -38,7 +33,7 @@ use repo_blobstore::RepoBlobstoreArgs;
use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
use sql_ext::myrouter_ready;
use sqlfilenodes::{SqlConstructors, SqlFilenodes};
use std::iter::FromIterator;
use std::{collections::HashMap, iter::FromIterator, sync::Arc, time::Duration};
#[derive(Copy, Clone, PartialEq)]
pub enum Caching {
@ -184,7 +179,7 @@ pub fn new_memblob_empty(blobstore: Option<Arc<dyn Blobstore>>) -> Result<BlobRe
SqlBonsaiHgMapping::with_sqlite_in_memory()
.chain_err(ErrorKind::StateOpen(StateOpenError::BonsaiHgMapping))?,
),
Arc::new(DummyLease {}),
Arc::new(InProcessLease::new()),
FilestoreConfig::default(),
))
}
@ -231,7 +226,7 @@ fn new_development<T: SqlFactory>(
filenodes,
changesets,
bonsai_hg_mapping,
Arc::new(DummyLease {}),
Arc::new(InProcessLease::new()),
filestore_config,
)
}
@ -274,7 +269,7 @@ fn new_production<T: SqlFactory>(
let changesets_cache_pool = try_boxfuture!(get_volatile_pool("changesets"));
let bonsai_hg_mapping_cache_pool = try_boxfuture!(get_volatile_pool("bonsai_hg_mapping"));
let hg_generation_lease = try_boxfuture!(MemcacheOps::new("bonsai-hg-generation", ""));
let derive_data_lease = try_boxfuture!(MemcacheOps::new("derived-data-lease", ""));
let filenodes_tier_and_filenodes = sql_factory.open_filenodes();
let bookmarks = sql_factory.open::<SqlBookmarks>();
@ -325,7 +320,7 @@ fn new_production<T: SqlFactory>(
changesets,
Arc::new(bonsai_hg_mapping),
Arc::new(changeset_fetcher_factory),
Arc::new(hg_generation_lease),
Arc::new(derive_data_lease),
filestore_config,
)
},

View File

@ -35,6 +35,7 @@ pub use blob_changeset::{ChangesetMetadata, HgBlobChangeset, HgChangesetContent}
pub use changeset_fetcher::ChangesetFetcher;
// TODO: This is exported for testing - is this the right place for it?
pub use crate::repo_commit::compute_changed_files;
pub use utils::UnittestOverride;
pub mod internal {
pub use crate::utils::{IncompleteFilenodeInfo, IncompleteFilenodes};

View File

@ -4,7 +4,7 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use super::utils::{IncompleteFilenodeInfo, IncompleteFilenodes};
use super::utils::{IncompleteFilenodeInfo, IncompleteFilenodes, UnittestOverride};
use crate::bonsai_generation::{create_bonsai_changeset_object, save_bonsai_changeset_object};
use crate::derive_hg_manifest::derive_hg_manifest;
use crate::errors::*;
@ -126,7 +126,7 @@ pub struct BlobRepo {
// (for example, revsets).
changeset_fetcher_factory:
Arc<dyn Fn() -> Arc<dyn ChangesetFetcher + Send + Sync> + Send + Sync>,
hg_generation_lease: Arc<dyn LeaseOps>,
derived_data_lease: Arc<dyn LeaseOps>,
filestore_config: FilestoreConfig,
}
@ -137,7 +137,7 @@ impl BlobRepo {
filenodes: Arc<dyn Filenodes>,
changesets: Arc<dyn Changesets>,
bonsai_hg_mapping: Arc<dyn BonsaiHgMapping>,
hg_generation_lease: Arc<dyn LeaseOps>,
derived_data_lease: Arc<dyn LeaseOps>,
filestore_config: FilestoreConfig,
) -> Self {
let (blobstore, repoid) = blobstore_args.into_blobrepo_parts();
@ -160,7 +160,7 @@ impl BlobRepo {
bonsai_hg_mapping,
repoid,
changeset_fetcher_factory: Arc::new(changeset_fetcher_factory),
hg_generation_lease,
derived_data_lease,
filestore_config,
}
}
@ -174,7 +174,7 @@ impl BlobRepo {
changeset_fetcher_factory: Arc<
dyn Fn() -> Arc<dyn ChangesetFetcher + Send + Sync> + Send + Sync,
>,
hg_generation_lease: Arc<dyn LeaseOps>,
derived_data_lease: Arc<dyn LeaseOps>,
filestore_config: FilestoreConfig,
) -> Self {
let (blobstore, repoid) = blobstore_args.into_blobrepo_parts();
@ -186,7 +186,7 @@ impl BlobRepo {
bonsai_hg_mapping,
repoid,
changeset_fetcher_factory,
hg_generation_lease,
derived_data_lease,
filestore_config,
}
}
@ -208,7 +208,7 @@ impl BlobRepo {
changesets,
bonsai_hg_mapping,
repoid,
hg_generation_lease,
derived_data_lease,
filestore_config,
..
} = self;
@ -224,7 +224,7 @@ impl BlobRepo {
filenodes,
changesets,
bonsai_hg_mapping,
hg_generation_lease,
derived_data_lease,
filestore_config,
)
}
@ -1433,9 +1433,13 @@ impl BlobRepo {
})
}
pub fn get_derived_data_lease_ops(&self) -> Arc<dyn LeaseOps> {
self.derived_data_lease.clone()
}
fn generate_lease_key(&self, bcs_id: &ChangesetId) -> String {
let repoid = self.get_repoid();
format!("repoid.{}.bonsai.{}", repoid.id(), bcs_id)
format!("repoid.{}.hg-changeset.{}", repoid.id(), bcs_id)
}
fn take_hg_generation_lease(
@ -1446,16 +1450,16 @@ impl BlobRepo {
let key = self.generate_lease_key(&bcs_id);
let repoid = self.get_repoid();
cloned!(self.bonsai_hg_mapping, self.hg_generation_lease);
cloned!(self.bonsai_hg_mapping, self.derived_data_lease);
let repo = self.clone();
loop_fn((), move |()| {
cloned!(ctx, key);
hg_generation_lease
derived_data_lease
.try_add_put_lease(&key)
.or_else(|_| Ok(false))
.and_then({
cloned!(bcs_id, bonsai_hg_mapping, hg_generation_lease, repo);
cloned!(bcs_id, bonsai_hg_mapping, derived_data_lease, repo);
move |leased| {
let maybe_hg_cs =
bonsai_hg_mapping.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id);
@ -1475,7 +1479,7 @@ impl BlobRepo {
Some(hg_cs_id) => {
future::ok(Loop::Break(Some(hg_cs_id))).left_future()
}
None => hg_generation_lease
None => derived_data_lease
.wait_for_other_leases(&key)
.then(|_| Ok(Loop::Continue(())))
.right_future(),
@ -1493,7 +1497,7 @@ impl BlobRepo {
put_success: bool,
) -> impl Future<Item = (), Error = ()> + Send {
let key = self.generate_lease_key(&bcs_id);
self.hg_generation_lease.release_lease(&key, put_success)
self.derived_data_lease.release_lease(&key, put_success)
}
fn generate_hg_changeset(
@ -2610,7 +2614,7 @@ impl Clone for BlobRepo {
bonsai_hg_mapping: self.bonsai_hg_mapping.clone(),
repoid: self.repoid.clone(),
changeset_fetcher_factory: self.changeset_fetcher_factory.clone(),
hg_generation_lease: self.hg_generation_lease.clone(),
derived_data_lease: self.derived_data_lease.clone(),
filestore_config: self.filestore_config.clone(),
}
}
@ -2636,3 +2640,16 @@ where
})
.buffer_unordered(100)
}
impl UnittestOverride<Arc<dyn LeaseOps>> for BlobRepo {
fn unittest_override<F>(&self, modify: F) -> Self
where
F: FnOnce(Arc<dyn LeaseOps>) -> Arc<dyn LeaseOps>,
{
let derived_data_lease = modify(self.derived_data_lease.clone());
BlobRepo {
derived_data_lease,
..self.clone()
}
}
}

View File

@ -84,3 +84,13 @@ impl IncompleteFilenodes {
.map(move |_| cs_id)
}
}
/// Create new instance of implementing object with overridden field of spcecified type.
///
/// This trait only supposed to be used from unittests, when it is necessary to replace
/// some of the fields to better tests specific behaviour.
pub trait UnittestOverride<T> {
fn unittest_override<F>(&self, modify: F) -> Self
where
F: FnOnce(T) -> T;
}

View File

@ -12,6 +12,7 @@ pub use crate::cachelib_cache::{new_cachelib_blobstore, new_cachelib_blobstore_n
pub mod dummy;
mod in_process_lease;
pub use in_process_lease::InProcessLease;
mod locking_cache;
pub use crate::locking_cache::{

View File

@ -10,11 +10,14 @@ use blobrepo::BlobRepo;
use cloned::cloned;
use context::CoreContext;
use failure::Error;
use futures::{future, stream, Future, Stream};
use futures_ext::{bounded_traversal, FutureExt};
use futures::{
future::{self, Loop},
stream, Future, IntoFuture, Stream,
};
use futures_ext::{bounded_traversal, FutureExt, StreamExt};
use mononoke_types::ChangesetId;
use std::{
collections::{HashMap, HashSet},
collections::HashSet,
sync::{Arc, Mutex},
};
use topo_sort::sort_topological;
@ -24,8 +27,6 @@ use topo_sort::sort_topological;
/// nothing will be generated. Otherwise this function will generate data for this commit and for
/// all it's ancestors that didn't have this derived data.
///
/// TODO(T47650154) - add memcache leases to prevent deriving the data for the same commit at
/// the same time
/// TODO(T47650184) - log to scuba and ods how long it took to generate derived data
pub(crate) fn derive_impl<
Derived: BonsaiDerived,
@ -73,10 +74,10 @@ pub(crate) fn derive_impl<
}
})
.filter_map(|x| x) // Remove all None
.collect()
.collect_to()
.map(|v| {
stream::iter_ok(
sort_topological(&v.into_iter().collect::<HashMap<_, _>>())
sort_topological(&v)
.expect("commit graph has cycles!")
.into_iter()
.rev(),
@ -103,6 +104,14 @@ where
{
let bcs_fut = repo.get_bonsai_changeset(ctx.clone(), bcs_id.clone());
let lease = repo.get_derived_data_lease_ops();
let lease_key = Arc::new(format!(
"repo{}.{}.{}",
repo.get_repoid().id(),
Derived::NAME,
bcs_id
));
let changeset_fetcher = repo.get_changeset_fetcher();
let derived_parents =
changeset_fetcher
@ -116,13 +125,70 @@ where
}
});
bcs_fut
.join(derived_parents)
.and_then({
cloned!(ctx);
move |(bcs, parents)| Derived::derive_from_parents(ctx, repo, bcs, parents)
})
.and_then(move |derived| derived_mapping.put(ctx, bcs_id, derived))
bcs_fut.join(derived_parents).and_then({
cloned!(ctx);
move |(bcs, parents)| {
future::loop_fn((), move |()| {
lease
.try_add_put_lease(&lease_key)
.then(|result| {
// In case of lease unavailability we do not want to stall
// generation of all derived data, since lease is a soft lock
// it is safe to assume that we successfuly acquired it
match result {
Ok(leased) => Ok((leased, false)),
Err(_) => Ok((false, true)),
}
})
.and_then({
cloned!(ctx, repo, derived_mapping, lease, lease_key, bcs, parents);
move |(leased, ignored)| {
derived_mapping
.get(ctx.clone(), vec![bcs_id])
.map(move |mut vs| vs.remove(&bcs_id))
.and_then({
cloned!(derived_mapping, lease, lease_key);
move |derived| match derived {
Some(_) => future::ok(Loop::Break(())).left_future(),
None => {
if leased || ignored {
Derived::derive_from_parents(
ctx.clone(),
repo,
bcs,
parents,
)
.and_then(move |derived| {
derived_mapping.put(ctx, bcs_id, derived)
})
.map(|_| Loop::Break(()))
.left_future()
.right_future()
} else {
lease
.wait_for_other_leases(&lease_key)
.then(|_| Ok(Loop::Continue(())))
.right_future()
.right_future()
}
}
}
})
.then(move |result| {
if leased {
lease
.release_lease(&lease_key, result.is_ok())
.then(|_| result)
.right_future()
} else {
result.into_future().left_future()
}
})
}
})
})
}
})
}
fn fetch_derived_may_panic<Derived, Mapping>(
@ -176,23 +242,34 @@ impl<Derived: BonsaiDerived> DeriveNode<Derived> {
mod test {
use super::*;
use blobrepo::UnittestOverride;
use bookmarks::BookmarkName;
use cacheblob::LeaseOps;
use failure::err_msg;
use fixtures::{
branch_even, branch_uneven, branch_wide, linear, many_diamonds, many_files_dirs,
merge_even, merge_uneven, unshared_merge_even, unshared_merge_uneven,
};
use futures_ext::BoxFuture;
use lock_ext::LockExt;
use maplit::hashmap;
use mercurial_types::HgChangesetId;
use mononoke_types::BonsaiChangeset;
use revset::AncestorsNodeStream;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::runtime::Runtime;
#[derive(Clone, Hash, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone, Hash, Eq, Ord, PartialEq, PartialOrd, Debug)]
struct TestGenNum(u64, ChangesetId, Vec<ChangesetId>);
impl BonsaiDerived for TestGenNum {
const NAME: &'static str = "test_gen_num";
fn derive_from_parents(
_ctx: CoreContext,
_repo: BlobRepo,
@ -210,6 +287,7 @@ mod test {
}
}
#[derive(Debug)]
struct TestMapping {
mapping: Arc<Mutex<HashMap<ChangesetId, TestGenNum>>>,
}
@ -304,9 +382,9 @@ mod test {
}
#[test]
fn test_derive_linear() {
fn test_derive_linear() -> Result<(), Error> {
let ctx = CoreContext::test_mock();
let mut runtime = Runtime::new().unwrap();
let mut runtime = Runtime::new()?;
let repo = branch_even::getrepo();
derive_for_master(&mut runtime, ctx.clone(), repo.clone());
@ -337,5 +415,150 @@ mod test {
let repo = many_diamonds::getrepo(&mut runtime);
derive_for_master(&mut runtime, ctx.clone(), repo.clone());
Ok(())
}
#[test]
fn test_leases() -> Result<(), Error> {
let ctx = CoreContext::test_mock();
let mut runtime = Runtime::new()?;
let mapping = Arc::new(TestMapping::new());
let repo = linear::getrepo();
let hg_csid = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?;
let csid = runtime
.block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_csid))?
.ok_or(err_msg("known hg does not have bonsai csid"))?;
let lease = repo.get_derived_data_lease_ops();
let lease_key = Arc::new(format!(
"repo{}.{}.{}",
repo.get_repoid().id(),
TestGenNum::NAME,
csid
));
// take lease
assert_eq!(
runtime.block_on(lease.try_add_put_lease(&lease_key)),
Ok(true)
);
assert_eq!(
runtime.block_on(lease.try_add_put_lease(&lease_key)),
Ok(false)
);
let output = Arc::new(Mutex::new(Vec::new()));
runtime.spawn(
TestGenNum::derive(ctx.clone(), repo.clone(), mapping.clone(), csid).then({
cloned!(output);
move |result| {
output.with(move |output| output.push(result));
Ok::<_, ()>(())
}
}),
);
// schedule derivation
runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?;
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
HashMap::new()
);
// release lease
runtime
.block_on(lease.release_lease(&lease_key, false))
.map_err(|_| err_msg("failed to release a lease"))?;
runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?;
let result = match output.with(|output| output.pop()) {
Some(result) => result?,
None => panic!("scheduled derivation should have been completed"),
};
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
hashmap! { csid => result.clone() }
);
// take lease
assert_eq!(
runtime.block_on(lease.try_add_put_lease(&lease_key)),
Ok(true),
);
// should succed as lease should not be request
assert_eq!(
runtime.block_on(TestGenNum::derive(
ctx.clone(),
repo.clone(),
mapping.clone(),
csid
))?,
result
);
runtime
.block_on(lease.release_lease(&lease_key, false))
.map_err(|_| err_msg("failed to release a lease"))?;
Ok(())
}
#[derive(Debug)]
struct FailingLease;
impl LeaseOps for FailingLease {
fn try_add_put_lease(&self, _key: &str) -> BoxFuture<bool, ()> {
future::err(()).boxify()
}
fn wait_for_other_leases(&self, _key: &str) -> BoxFuture<(), ()> {
future::err(()).boxify()
}
fn release_lease(&self, _key: &str, _put_success: bool) -> BoxFuture<(), ()> {
future::err(()).boxify()
}
}
#[test]
fn test_always_failing_lease() -> Result<(), Error> {
let ctx = CoreContext::test_mock();
let mut runtime = Runtime::new()?;
let mapping = Arc::new(TestMapping::new());
let repo = linear::getrepo().unittest_override(|_| Arc::new(FailingLease));
let hg_csid = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?;
let csid = runtime
.block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_csid))?
.ok_or(err_msg("known hg does not have bonsai csid"))?;
let lease = repo.get_derived_data_lease_ops();
let lease_key = Arc::new(format!(
"repo{}.{}.{}",
repo.get_repoid().id(),
TestGenNum::NAME,
csid
));
// takig lease should fail
assert_eq!(
runtime.block_on(lease.try_add_put_lease(&lease_key)),
Err(())
);
// should succeed even though lease always fails
let result = runtime.block_on(TestGenNum::derive(
ctx.clone(),
repo.clone(),
mapping.clone(),
csid,
))?;
assert_eq!(
runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?,
hashmap! { csid => result },
);
Ok(())
}
}

View File

@ -4,6 +4,8 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
use blobrepo::BlobRepo;
use context::CoreContext;
use failure::Error;
@ -16,6 +18,12 @@ mod derive_impl;
/// Trait for the data that can be derived from bonsai changeset.
/// Examples of that are hg changeset id, unodes root manifest id, git changeset ids etc
pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
/// Name of derived data
///
/// Should be unique string (among derived data types), which is used to identify or
/// name data (for example lease keys) assoicated with particular derived data type.
const NAME: &'static str;
/// Defines how to derive new representation for bonsai having derivations
/// for parents and having a current bonsai object.
///

View File

@ -50,6 +50,8 @@ impl From<RootUnodeManifestId> for BlobstoreBytes {
}
impl BonsaiDerived for RootUnodeManifestId {
const NAME: &'static str = "unodes";
fn derive_from_parents(
ctx: CoreContext,
repo: BlobRepo,