diff --git a/blobrepo/factory/src/lib.rs b/blobrepo/factory/src/lib.rs index ce9eb53438..b993410d10 100644 --- a/blobrepo/factory/src/lib.rs +++ b/blobrepo/factory/src/lib.rs @@ -4,31 +4,26 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. -use std::{sync::Arc, time::Duration}; - -use cloned::cloned; -use failure_ext::prelude::*; -use failure_ext::{Error, Result}; -use futures::{future::IntoFuture, Future}; -use futures_ext::{try_boxfuture, BoxFuture, FutureExt}; -use std::collections::HashMap; - -use blobstore_factory::{make_blobstore, SqlFactory, SqliteFactory, XdbFactory}; - use blobrepo::BlobRepo; use blobrepo_errors::*; use blobstore::Blobstore; +use blobstore_factory::{make_blobstore, SqlFactory, SqliteFactory, XdbFactory}; use bonsai_hg_mapping::{CachingBonsaiHgMapping, SqlBonsaiHgMapping}; use bookmarks::{Bookmarks, CachedBookmarks}; use cacheblob::{ - dummy::DummyLease, new_cachelib_blobstore_no_lease, new_memcache_blobstore, MemcacheOps, + new_cachelib_blobstore_no_lease, new_memcache_blobstore, InProcessLease, MemcacheOps, }; use censoredblob::SqlCensoredContentStore; use changeset_fetcher::{ChangesetFetcher, SimpleChangesetFetcher}; use changesets::{CachingChangesets, SqlChangesets}; +use cloned::cloned; use dbbookmarks::SqlBookmarks; +use failure_ext::prelude::*; +use failure_ext::{Error, Result}; use filenodes::CachingFilenodes; use filestore::FilestoreConfig; +use futures::{future::IntoFuture, Future}; +use futures_ext::{try_boxfuture, BoxFuture, FutureExt}; use memblob::EagerMemblob; use metaconfig_types::{ self, BlobConfig, Censoring, FilestoreParams, MetadataDBConfig, StorageConfig, @@ -38,7 +33,7 @@ use repo_blobstore::RepoBlobstoreArgs; use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt}; use sql_ext::myrouter_ready; use sqlfilenodes::{SqlConstructors, SqlFilenodes}; -use std::iter::FromIterator; +use std::{collections::HashMap, iter::FromIterator, sync::Arc, time::Duration}; #[derive(Copy, Clone, PartialEq)] pub enum Caching { @@ -184,7 +179,7 @@ pub fn new_memblob_empty(blobstore: Option>) -> Result( filenodes, changesets, bonsai_hg_mapping, - Arc::new(DummyLease {}), + Arc::new(InProcessLease::new()), filestore_config, ) } @@ -274,7 +269,7 @@ fn new_production( let changesets_cache_pool = try_boxfuture!(get_volatile_pool("changesets")); let bonsai_hg_mapping_cache_pool = try_boxfuture!(get_volatile_pool("bonsai_hg_mapping")); - let hg_generation_lease = try_boxfuture!(MemcacheOps::new("bonsai-hg-generation", "")); + let derive_data_lease = try_boxfuture!(MemcacheOps::new("derived-data-lease", "")); let filenodes_tier_and_filenodes = sql_factory.open_filenodes(); let bookmarks = sql_factory.open::(); @@ -325,7 +320,7 @@ fn new_production( changesets, Arc::new(bonsai_hg_mapping), Arc::new(changeset_fetcher_factory), - Arc::new(hg_generation_lease), + Arc::new(derive_data_lease), filestore_config, ) }, diff --git a/blobrepo/src/lib.rs b/blobrepo/src/lib.rs index db9a04b11d..1d28c45623 100644 --- a/blobrepo/src/lib.rs +++ b/blobrepo/src/lib.rs @@ -35,6 +35,7 @@ pub use blob_changeset::{ChangesetMetadata, HgBlobChangeset, HgChangesetContent} pub use changeset_fetcher::ChangesetFetcher; // TODO: This is exported for testing - is this the right place for it? 
pub use crate::repo_commit::compute_changed_files; +pub use utils::UnittestOverride; pub mod internal { pub use crate::utils::{IncompleteFilenodeInfo, IncompleteFilenodes}; diff --git a/blobrepo/src/repo.rs b/blobrepo/src/repo.rs index a6b10ccc5e..640ff89bc3 100644 --- a/blobrepo/src/repo.rs +++ b/blobrepo/src/repo.rs @@ -4,7 +4,7 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. -use super::utils::{IncompleteFilenodeInfo, IncompleteFilenodes}; +use super::utils::{IncompleteFilenodeInfo, IncompleteFilenodes, UnittestOverride}; use crate::bonsai_generation::{create_bonsai_changeset_object, save_bonsai_changeset_object}; use crate::derive_hg_manifest::derive_hg_manifest; use crate::errors::*; @@ -126,7 +126,7 @@ pub struct BlobRepo { // (for example, revsets). changeset_fetcher_factory: Arc Arc + Send + Sync>, - hg_generation_lease: Arc, + derived_data_lease: Arc, filestore_config: FilestoreConfig, } @@ -137,7 +137,7 @@ impl BlobRepo { filenodes: Arc, changesets: Arc, bonsai_hg_mapping: Arc, - hg_generation_lease: Arc, + derived_data_lease: Arc, filestore_config: FilestoreConfig, ) -> Self { let (blobstore, repoid) = blobstore_args.into_blobrepo_parts(); @@ -160,7 +160,7 @@ impl BlobRepo { bonsai_hg_mapping, repoid, changeset_fetcher_factory: Arc::new(changeset_fetcher_factory), - hg_generation_lease, + derived_data_lease, filestore_config, } } @@ -174,7 +174,7 @@ impl BlobRepo { changeset_fetcher_factory: Arc< dyn Fn() -> Arc + Send + Sync, >, - hg_generation_lease: Arc, + derived_data_lease: Arc, filestore_config: FilestoreConfig, ) -> Self { let (blobstore, repoid) = blobstore_args.into_blobrepo_parts(); @@ -186,7 +186,7 @@ impl BlobRepo { bonsai_hg_mapping, repoid, changeset_fetcher_factory, - hg_generation_lease, + derived_data_lease, filestore_config, } } @@ -208,7 +208,7 @@ impl BlobRepo { changesets, bonsai_hg_mapping, repoid, - hg_generation_lease, + derived_data_lease, filestore_config, .. 
} = self; @@ -224,7 +224,7 @@ impl BlobRepo { filenodes, changesets, bonsai_hg_mapping, - hg_generation_lease, + derived_data_lease, filestore_config, ) } @@ -1433,9 +1433,13 @@ impl BlobRepo { }) } + pub fn get_derived_data_lease_ops(&self) -> Arc { + self.derived_data_lease.clone() + } + fn generate_lease_key(&self, bcs_id: &ChangesetId) -> String { let repoid = self.get_repoid(); - format!("repoid.{}.bonsai.{}", repoid.id(), bcs_id) + format!("repoid.{}.hg-changeset.{}", repoid.id(), bcs_id) } fn take_hg_generation_lease( @@ -1446,16 +1450,16 @@ impl BlobRepo { let key = self.generate_lease_key(&bcs_id); let repoid = self.get_repoid(); - cloned!(self.bonsai_hg_mapping, self.hg_generation_lease); + cloned!(self.bonsai_hg_mapping, self.derived_data_lease); let repo = self.clone(); loop_fn((), move |()| { cloned!(ctx, key); - hg_generation_lease + derived_data_lease .try_add_put_lease(&key) .or_else(|_| Ok(false)) .and_then({ - cloned!(bcs_id, bonsai_hg_mapping, hg_generation_lease, repo); + cloned!(bcs_id, bonsai_hg_mapping, derived_data_lease, repo); move |leased| { let maybe_hg_cs = bonsai_hg_mapping.get_hg_from_bonsai(ctx.clone(), repoid, bcs_id); @@ -1475,7 +1479,7 @@ impl BlobRepo { Some(hg_cs_id) => { future::ok(Loop::Break(Some(hg_cs_id))).left_future() } - None => hg_generation_lease + None => derived_data_lease .wait_for_other_leases(&key) .then(|_| Ok(Loop::Continue(()))) .right_future(), @@ -1493,7 +1497,7 @@ impl BlobRepo { put_success: bool, ) -> impl Future + Send { let key = self.generate_lease_key(&bcs_id); - self.hg_generation_lease.release_lease(&key, put_success) + self.derived_data_lease.release_lease(&key, put_success) } fn generate_hg_changeset( @@ -2610,7 +2614,7 @@ impl Clone for BlobRepo { bonsai_hg_mapping: self.bonsai_hg_mapping.clone(), repoid: self.repoid.clone(), changeset_fetcher_factory: self.changeset_fetcher_factory.clone(), - hg_generation_lease: self.hg_generation_lease.clone(), + derived_data_lease: self.derived_data_lease.clone(), filestore_config: self.filestore_config.clone(), } } @@ -2636,3 +2640,16 @@ where }) .buffer_unordered(100) } + +impl UnittestOverride> for BlobRepo { + fn unittest_override(&self, modify: F) -> Self + where + F: FnOnce(Arc) -> Arc, + { + let derived_data_lease = modify(self.derived_data_lease.clone()); + BlobRepo { + derived_data_lease, + ..self.clone() + } + } +} diff --git a/blobrepo/src/utils.rs b/blobrepo/src/utils.rs index a6070d832f..e213ed0d1a 100644 --- a/blobrepo/src/utils.rs +++ b/blobrepo/src/utils.rs @@ -84,3 +84,13 @@ impl IncompleteFilenodes { .map(move |_| cs_id) } } + +/// Create a new instance of the implementing object with an overridden field of the specified type. +/// +/// This trait is only supposed to be used from unit tests, when it is necessary to replace +/// some of the fields to better test specific behaviour.
+pub trait UnittestOverride { + fn unittest_override(&self, modify: F) -> Self + where + F: FnOnce(T) -> T; +} diff --git a/blobstore/cacheblob/src/lib.rs b/blobstore/cacheblob/src/lib.rs index 556ea69176..8d99376904 100644 --- a/blobstore/cacheblob/src/lib.rs +++ b/blobstore/cacheblob/src/lib.rs @@ -12,6 +12,7 @@ pub use crate::cachelib_cache::{new_cachelib_blobstore, new_cachelib_blobstore_n pub mod dummy; mod in_process_lease; +pub use in_process_lease::InProcessLease; mod locking_cache; pub use crate::locking_cache::{ diff --git a/derived_data/src/derive_impl.rs b/derived_data/src/derive_impl.rs index 5c9e9fba00..fa430068dd 100644 --- a/derived_data/src/derive_impl.rs +++ b/derived_data/src/derive_impl.rs @@ -10,11 +10,14 @@ use blobrepo::BlobRepo; use cloned::cloned; use context::CoreContext; use failure::Error; -use futures::{future, stream, Future, Stream}; -use futures_ext::{bounded_traversal, FutureExt}; +use futures::{ + future::{self, Loop}, + stream, Future, IntoFuture, Stream, +}; +use futures_ext::{bounded_traversal, FutureExt, StreamExt}; use mononoke_types::ChangesetId; use std::{ - collections::{HashMap, HashSet}, + collections::HashSet, sync::{Arc, Mutex}, }; use topo_sort::sort_topological; @@ -24,8 +27,6 @@ use topo_sort::sort_topological; /// nothing will be generated. Otherwise this function will generate data for this commit and for /// all it's ancestors that didn't have this derived data. /// -/// TODO(T47650154) - add memcache leases to prevent deriving the data for the same commit at -/// the same time /// TODO(T47650184) - log to scuba and ods how long it took to generate derived data pub(crate) fn derive_impl< Derived: BonsaiDerived, @@ -73,10 +74,10 @@ pub(crate) fn derive_impl< } }) .filter_map(|x| x) // Remove all None - .collect() + .collect_to() .map(|v| { stream::iter_ok( - sort_topological(&v.into_iter().collect::>()) + sort_topological(&v) .expect("commit graph has cycles!") .into_iter() .rev(), @@ -103,6 +104,14 @@ where { let bcs_fut = repo.get_bonsai_changeset(ctx.clone(), bcs_id.clone()); + let lease = repo.get_derived_data_lease_ops(); + let lease_key = Arc::new(format!( + "repo{}.{}.{}", + repo.get_repoid().id(), + Derived::NAME, + bcs_id + )); + let changeset_fetcher = repo.get_changeset_fetcher(); let derived_parents = changeset_fetcher @@ -116,13 +125,70 @@ where } }); - bcs_fut - .join(derived_parents) - .and_then({ - cloned!(ctx); - move |(bcs, parents)| Derived::derive_from_parents(ctx, repo, bcs, parents) - }) - .and_then(move |derived| derived_mapping.put(ctx, bcs_id, derived)) + bcs_fut.join(derived_parents).and_then({ + cloned!(ctx); + move |(bcs, parents)| { + future::loop_fn((), move |()| { + lease + .try_add_put_lease(&lease_key) + .then(|result| { + // In case of lease unavailability we do not want to stall + // generation of all derived data; since the lease is a soft lock, + // it is safe to assume that we successfully acquired it + match result { + Ok(leased) => Ok((leased, false)), + Err(_) => Ok((false, true)), + } + }) + .and_then({ + cloned!(ctx, repo, derived_mapping, lease, lease_key, bcs, parents); + move |(leased, ignored)| { + derived_mapping + .get(ctx.clone(), vec![bcs_id]) + .map(move |mut vs| vs.remove(&bcs_id)) + .and_then({ + cloned!(derived_mapping, lease, lease_key); + move |derived| match derived { + Some(_) => future::ok(Loop::Break(())).left_future(), + None => { + if leased || ignored { + Derived::derive_from_parents( + ctx.clone(), + repo, + bcs, + parents, + ) + .and_then(move |derived| { +
derived_mapping.put(ctx, bcs_id, derived) + }) + .map(|_| Loop::Break(())) + .left_future() + .right_future() + } else { + lease + .wait_for_other_leases(&lease_key) + .then(|_| Ok(Loop::Continue(()))) + .right_future() + .right_future() + } + } + } + }) + .then(move |result| { + if leased { + lease + .release_lease(&lease_key, result.is_ok()) + .then(|_| result) + .right_future() + } else { + result.into_future().left_future() + } + }) + } + }) + }) + } + }) } fn fetch_derived_may_panic( @@ -176,23 +242,34 @@ impl DeriveNode { mod test { use super::*; + use blobrepo::UnittestOverride; use bookmarks::BookmarkName; + use cacheblob::LeaseOps; + use failure::err_msg; use fixtures::{ branch_even, branch_uneven, branch_wide, linear, many_diamonds, many_files_dirs, merge_even, merge_uneven, unshared_merge_even, unshared_merge_uneven, }; use futures_ext::BoxFuture; + use lock_ext::LockExt; use maplit::hashmap; + use mercurial_types::HgChangesetId; use mononoke_types::BonsaiChangeset; use revset::AncestorsNodeStream; - use std::collections::HashMap; - use std::sync::{Arc, Mutex}; + use std::{ + collections::HashMap, + str::FromStr, + sync::{Arc, Mutex}, + time::Duration, + }; use tokio::runtime::Runtime; - #[derive(Clone, Hash, Eq, Ord, PartialEq, PartialOrd)] + #[derive(Clone, Hash, Eq, Ord, PartialEq, PartialOrd, Debug)] struct TestGenNum(u64, ChangesetId, Vec); impl BonsaiDerived for TestGenNum { + const NAME: &'static str = "test_gen_num"; + fn derive_from_parents( _ctx: CoreContext, _repo: BlobRepo, @@ -210,6 +287,7 @@ mod test { } } + #[derive(Debug)] struct TestMapping { mapping: Arc>>, } @@ -304,9 +382,9 @@ mod test { } #[test] - fn test_derive_linear() { + fn test_derive_linear() -> Result<(), Error> { let ctx = CoreContext::test_mock(); - let mut runtime = Runtime::new().unwrap(); + let mut runtime = Runtime::new()?; let repo = branch_even::getrepo(); derive_for_master(&mut runtime, ctx.clone(), repo.clone()); @@ -337,5 +415,150 @@ mod test { let repo = many_diamonds::getrepo(&mut runtime); derive_for_master(&mut runtime, ctx.clone(), repo.clone()); + + Ok(()) + } + + #[test] + fn test_leases() -> Result<(), Error> { + let ctx = CoreContext::test_mock(); + let mut runtime = Runtime::new()?; + let mapping = Arc::new(TestMapping::new()); + let repo = linear::getrepo(); + + let hg_csid = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?; + let csid = runtime + .block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_csid))? 
+ .ok_or(err_msg("known hg does not have bonsai csid"))?; + + let lease = repo.get_derived_data_lease_ops(); + let lease_key = Arc::new(format!( + "repo{}.{}.{}", + repo.get_repoid().id(), + TestGenNum::NAME, + csid + )); + + // take lease + assert_eq!( + runtime.block_on(lease.try_add_put_lease(&lease_key)), + Ok(true) + ); + assert_eq!( + runtime.block_on(lease.try_add_put_lease(&lease_key)), + Ok(false) + ); + + let output = Arc::new(Mutex::new(Vec::new())); + runtime.spawn( + TestGenNum::derive(ctx.clone(), repo.clone(), mapping.clone(), csid).then({ + cloned!(output); + move |result| { + output.with(move |output| output.push(result)); + Ok::<_, ()>(()) + } + }), + ); + + // schedule derivation + runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?; + assert_eq!( + runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + HashMap::new() + ); + + // release lease + runtime + .block_on(lease.release_lease(&lease_key, false)) + .map_err(|_| err_msg("failed to release a lease"))?; + + runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?; + let result = match output.with(|output| output.pop()) { + Some(result) => result?, + None => panic!("scheduled derivation should have been completed"), + }; + assert_eq!( + runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + hashmap! { csid => result.clone() } + ); + + // take lease + assert_eq!( + runtime.block_on(lease.try_add_put_lease(&lease_key)), + Ok(true), + ); + // should succeed as the lease should not be requested + assert_eq!( + runtime.block_on(TestGenNum::derive( + ctx.clone(), + repo.clone(), + mapping.clone(), + csid + ))?, + result + ); + runtime + .block_on(lease.release_lease(&lease_key, false)) + .map_err(|_| err_msg("failed to release a lease"))?; + + Ok(()) + } + + #[derive(Debug)] + struct FailingLease; + + impl LeaseOps for FailingLease { + fn try_add_put_lease(&self, _key: &str) -> BoxFuture { + future::err(()).boxify() + } + + fn wait_for_other_leases(&self, _key: &str) -> BoxFuture<(), ()> { + future::err(()).boxify() + } + + fn release_lease(&self, _key: &str, _put_success: bool) -> BoxFuture<(), ()> { + future::err(()).boxify() + } + } + + #[test] + fn test_always_failing_lease() -> Result<(), Error> { + let ctx = CoreContext::test_mock(); + let mut runtime = Runtime::new()?; + let mapping = Arc::new(TestMapping::new()); + let repo = linear::getrepo().unittest_override(|_| Arc::new(FailingLease)); + + let hg_csid = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?; + let csid = runtime + .block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_csid))? + .ok_or(err_msg("known hg does not have bonsai csid"))?; + + let lease = repo.get_derived_data_lease_ops(); + let lease_key = Arc::new(format!( + "repo{}.{}.{}", + repo.get_repoid().id(), + TestGenNum::NAME, + csid + )); + + // taking the lease should fail + assert_eq!( + runtime.block_on(lease.try_add_put_lease(&lease_key)), + Err(()) + ); + + // should succeed even though lease always fails + let result = runtime.block_on(TestGenNum::derive( + ctx.clone(), + repo.clone(), + mapping.clone(), + csid, + ))?; + assert_eq!( + runtime.block_on(mapping.get(ctx.clone(), vec![csid]))?, + hashmap! { csid => result }, + ); + + Ok(()) + } } diff --git a/derived_data/src/lib.rs b/derived_data/src/lib.rs index d9d29412c4..4f459f26b5 100644 --- a/derived_data/src/lib.rs +++ b/derived_data/src/lib.rs @@ -4,6 +4,8 @@ // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version.
+#![deny(warnings)] + use blobrepo::BlobRepo; use context::CoreContext; use failure::Error; @@ -16,6 +18,12 @@ mod derive_impl; /// Trait for the data that can be derived from bonsai changeset. /// Examples of that are hg changeset id, unodes root manifest id, git changeset ids etc pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone { + /// Name of the derived data + /// + /// Should be a unique string (among derived data types), which is used to identify or + /// name data (for example lease keys) associated with a particular derived data type. + const NAME: &'static str; + /// Defines how to derive new representation for bonsai having derivations /// for parents and having a current bonsai object. /// diff --git a/manifest/derive_unode_manifest/derived_data_unodes.rs b/manifest/derive_unode_manifest/derived_data_unodes.rs index e8171a9d2d..d3981c787b 100644 --- a/manifest/derive_unode_manifest/derived_data_unodes.rs +++ b/manifest/derive_unode_manifest/derived_data_unodes.rs @@ -50,6 +50,8 @@ impl From for BlobstoreBytes { } impl BonsaiDerived for RootUnodeManifestId { + const NAME: &'static str = "unodes"; + fn derive_from_parents( ctx: CoreContext, repo: BlobRepo,
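
The core of this patch is the lease-guarded derivation loop added to `derive_impl.rs`: build a key of the form `repo{id}.{NAME}.{cs_id}`, try to take the lease, re-check the mapping (someone else may have finished), derive and store only if we hold the lease, and otherwise wait for other leases and retry; a failure of the lease backend is treated as if the lease were acquired, since it is only a soft lock. A minimal, synchronous sketch of that control flow follows. It is illustrative only: std-only toy types, an invented `derive_with_lease` helper, and no futures, unlike the actual Mononoke code above.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

/// Simplified in-process lease: a set of keys currently held.
struct InProcessLease {
    held: Mutex<HashSet<String>>,
}

impl InProcessLease {
    fn new() -> Self {
        InProcessLease { held: Mutex::new(HashSet::new()) }
    }

    /// Ok(true) if we took the lease, Ok(false) if someone else holds it;
    /// Err(()) stands in for an unavailable lease backend.
    fn try_add_put_lease(&self, key: &str) -> Result<bool, ()> {
        Ok(self.held.lock().unwrap().insert(key.to_string()))
    }

    fn release_lease(&self, key: &str) {
        self.held.lock().unwrap().remove(key);
    }
}

/// Derive data for `cs_id`, guarded by the lease so concurrent callers do not
/// duplicate work. The lease is a soft lock: if the backend errors we proceed
/// as though we had acquired it.
fn derive_with_lease(
    lease: &InProcessLease,
    mapping: &Mutex<HashMap<String, u64>>,
    repo_id: u32,
    name: &str,
    cs_id: &str,
    derive: impl Fn() -> u64,
) -> u64 {
    let lease_key = format!("repo{}.{}.{}", repo_id, name, cs_id);
    loop {
        let (leased, ignored) = match lease.try_add_put_lease(&lease_key) {
            Ok(leased) => (leased, false),
            Err(()) => (false, true),
        };
        // Re-check the mapping: another worker may have finished while we waited.
        if let Some(v) = mapping.lock().unwrap().get(cs_id).copied() {
            if leased {
                lease.release_lease(&lease_key);
            }
            return v;
        }
        if leased || ignored {
            let derived = derive();
            mapping.lock().unwrap().insert(cs_id.to_string(), derived);
            if leased {
                lease.release_lease(&lease_key);
            }
            return derived;
        }
        // Another worker holds the lease; back off and try again.
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let lease = InProcessLease::new();
    let mapping = Mutex::new(HashMap::new());
    let v = derive_with_lease(&lease, &mapping, 0, "test_gen_num", "abc123", || 42);
    assert_eq!(v, 42);
    // A second call finds the value in the mapping and does not re-derive.
    let v = derive_with_lease(&lease, &mapping, 0, "test_gen_num", "abc123", || 7);
    assert_eq!(v, 42);
}
```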
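The `UnittestOverride` trait added to `blobrepo/src/utils.rs` is what lets `test_always_failing_lease` clone a repo with `FailingLease` swapped in. A standalone sketch of the same pattern is below; it assumes the stripped generic parameters are `UnittestOverride<T>` with a generic `F` on the method, and uses a toy `Repo` type in place of `BlobRepo`.

```rust
use std::sync::Arc;

/// Create a new instance of the implementing object with a field of the
/// specified type overridden; intended only as a unit-test helper.
pub trait UnittestOverride<T> {
    fn unittest_override<F>(&self, modify: F) -> Self
    where
        F: FnOnce(T) -> T;
}

/// Toy stand-in for BlobRepo: a couple of fields, one of them swappable.
#[derive(Clone)]
struct Repo {
    name: String,
    lease: Arc<String>,
}

impl UnittestOverride<Arc<String>> for Repo {
    fn unittest_override<F>(&self, modify: F) -> Self
    where
        F: FnOnce(Arc<String>) -> Arc<String>,
    {
        // Replace only the lease; every other field is cloned from self.
        let lease = modify(self.lease.clone());
        Repo {
            lease,
            ..self.clone()
        }
    }
}

fn main() {
    let repo = Repo {
        name: "linear".to_string(),
        lease: Arc::new("memcache-lease".to_string()),
    };
    // Tests swap in a failing or in-process lease the same way.
    let patched = repo.unittest_override(|_| Arc::new("failing-lease".to_string()));
    assert_eq!(patched.name, "linear");
    assert_eq!(*patched.lease, "failing-lease");
}
```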