mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 00:45:18 +03:00
mononoke: change leases logic
Summary: At the moment our memcache lease implementation has surprising side effect - releasing the lease with `put_success = true` sets a memcache key, which means that until this key is evicted from memcache it's not possible to take this lease again (`try_add_put_lease()` fails with because it uses memcache.add_with_ttl() method, which returns false if a key already exists in memcache). That caused a deadlock in our derived data generation: 1) `derive_may_panic()` function does a few things - it actually derives the data, then releases the lease but it DOES NOT update derived data mapping (mapping is updated later in a batch for performance reasons). 2) So the following situation is possible - data was successfully derived, memcache lease key was updated (hence taking the lease one more time is not possible) but mapping update failed. This is an invalid state - any attempt to get the lease fails, but mapping doesn't exist either, and workers just loop forever. There seems to be a few ways to fix the deadlock, but changing release_lease() logic seems to cause less surprises in future - we expect memcache leases to be a "soft lock", and when a lock was unlocked it should be possible to lock it again. So currently release_lease() checks the value that's set in the memcache key. If this value is LockState::uploaded(...) (which can be set only by MemcacheLease::put()`), then nothing is done to this key, otherwise the key is deleted. this ensure that 1) No changes to the current behaviour of the CachingBlobstore - lease is invalidated on inserting a new value in the cache 2) Deadlock issue is fixed, because release_lease() now deletes the key. Reviewed By: krallin Differential Revision: D19468234 fbshipit-source-id: 85525bc11802a0fd4d94a692bf7c6dd4569a6f46
This commit is contained in:
parent
8e3ecba192
commit
c77969df2c
@ -1483,7 +1483,7 @@ impl BlobRepo {
|
||||
maybe_hg_cs
|
||||
.and_then(move |maybe_hg_cs| match maybe_hg_cs {
|
||||
Some(hg_cs) => repo
|
||||
.release_hg_generation_lease(bcs_id, true)
|
||||
.release_hg_generation_lease(bcs_id)
|
||||
.then(move |_| Ok(Loop::Break(Some(hg_cs))))
|
||||
.left_future(),
|
||||
None => future::ok(Loop::Break(None)).right_future(),
|
||||
@ -1510,10 +1510,9 @@ impl BlobRepo {
|
||||
fn release_hg_generation_lease(
|
||||
&self,
|
||||
bcs_id: ChangesetId,
|
||||
put_success: bool,
|
||||
) -> impl Future<Item = (), Error = ()> + Send {
|
||||
let key = self.generate_lease_key(&bcs_id);
|
||||
self.derived_data_lease.release_lease(&key, put_success)
|
||||
self.derived_data_lease.release_lease(&key)
|
||||
}
|
||||
|
||||
fn generate_hg_changeset(
|
||||
@ -1745,7 +1744,7 @@ impl BlobRepo {
|
||||
}
|
||||
})
|
||||
.then(move |res| {
|
||||
repo.release_hg_generation_lease(bcs_id, res.is_ok())
|
||||
repo.release_hg_generation_lease(bcs_id)
|
||||
.then(move |_| res.map(|hg_cs_id| (hg_cs_id, true)))
|
||||
})
|
||||
.right_future()
|
||||
|
@ -26,7 +26,7 @@ impl LeaseOps for DummyLease {
|
||||
Ok(()).into_future().boxify()
|
||||
}
|
||||
|
||||
fn release_lease(&self, _key: &str, _put_success: bool) -> BoxFuture<(), ()> {
|
||||
fn release_lease(&self, _key: &str) -> BoxFuture<(), ()> {
|
||||
Ok(()).into_future().boxify()
|
||||
}
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ impl LeaseOps for InProcessLease {
|
||||
}
|
||||
}
|
||||
|
||||
fn release_lease(&self, key: &str, _put_success: bool) -> BoxFuture<(), ()> {
|
||||
fn release_lease(&self, key: &str) -> BoxFuture<(), ()> {
|
||||
let mut in_flight_leases = self.leases.lock().expect("lock poisoned");
|
||||
|
||||
if let Some((sender, _)) = in_flight_leases.remove(key) {
|
||||
|
@ -93,10 +93,8 @@ pub trait LeaseOps: fmt::Debug + Send + Sync + 'static {
|
||||
/// It is acceptable to return from this future without checking the state of the cache entry.
|
||||
fn wait_for_other_leases(&self, key: &str) -> BoxFuture<(), ()>;
|
||||
|
||||
/// Releases any leases held on `key`. `put_success` is a hint; if it is `true`, the entry
|
||||
/// can transition from Leased to either Present or Empty, while if it is `false`, the entry
|
||||
/// must transition from Leased to Empty.
|
||||
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()>;
|
||||
/// Releases any leases held on `key`. The entry must transition from Leased to Empty.
|
||||
fn release_lease(&self, key: &str) -> BoxFuture<(), ()>;
|
||||
}
|
||||
|
||||
impl<C> CacheOps for Arc<C>
|
||||
@ -128,8 +126,8 @@ where
|
||||
self.as_ref().wait_for_other_leases(key)
|
||||
}
|
||||
|
||||
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()> {
|
||||
self.as_ref().release_lease(key, put_success)
|
||||
fn release_lease(&self, key: &str) -> BoxFuture<(), ()> {
|
||||
self.as_ref().release_lease(key)
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,31 +269,23 @@ where
|
||||
|
||||
fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
|
||||
let can_put = self.take_put_lease(&key);
|
||||
let cache_put = CacheOpsUtil::put(&self.cache, &key, value.clone())
|
||||
.join(future::lazy({
|
||||
let lease = self.lease.clone();
|
||||
let key = key.clone();
|
||||
move || lease.release_lease(&key, true).or_else(|_| Ok(()))
|
||||
}))
|
||||
.then(|_| Ok(()));
|
||||
let cache_put = CacheOpsUtil::put(&self.cache, &key, value.clone());
|
||||
|
||||
let blobstore_put = future::lazy({
|
||||
let blobstore = self.blobstore.clone();
|
||||
let lease = self.lease.clone();
|
||||
let key = key.clone();
|
||||
move || {
|
||||
blobstore
|
||||
.put(ctx, key.clone(), value)
|
||||
.or_else(move |r| lease.release_lease(&key, false).then(|_| Err(r)))
|
||||
}
|
||||
cloned!(self.blobstore, key);
|
||||
move || blobstore.put(ctx, key, value)
|
||||
});
|
||||
|
||||
cloned!(self.lease);
|
||||
can_put
|
||||
.and_then(move |can_put| {
|
||||
if can_put {
|
||||
Either::A(blobstore_put.and_then(move |_| cache_put))
|
||||
blobstore_put
|
||||
.and_then(|_| cache_put)
|
||||
.then(move |res| lease.release_lease(&key).then(move |_| res))
|
||||
.left_future()
|
||||
} else {
|
||||
Either::B(Ok(()).into_future())
|
||||
Ok(()).into_future().right_future()
|
||||
}
|
||||
})
|
||||
.boxify()
|
||||
|
@ -9,9 +9,13 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Error;
|
||||
use cloned::cloned;
|
||||
use fbinit::FacebookInit;
|
||||
use fbthrift::compact_protocol;
|
||||
use futures::{future::Either, Future, IntoFuture};
|
||||
use futures::{
|
||||
future::{self, Either},
|
||||
Future, IntoFuture,
|
||||
};
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use memcache::{KeyGen, MemcacheClient};
|
||||
|
||||
@ -55,7 +59,10 @@ mod LEASE_STATS {
|
||||
wait_ms: dynamic_timeseries("{}.wait_ms", (lease_type: &'static str); Rate, Sum),
|
||||
release: dynamic_timeseries("{}.release", (lease_type: &'static str); Rate, Sum),
|
||||
release_good: dynamic_timeseries("{}.release_good", (lease_type: &'static str); Rate, Sum),
|
||||
release_bad: dynamic_timeseries("{}.release_bad", (lease_type: &'static str); Rate, Sum),
|
||||
release_held_by_other: dynamic_timeseries("{}.release_held_by_other", (lease_type: &'static str); Rate, Sum),
|
||||
release_bad_key: dynamic_timeseries("{}.release_bad_key", (lease_type: &'static str); Rate, Sum),
|
||||
release_key_set: dynamic_timeseries("{}.release_key_set", (lease_type: &'static str); Rate, Sum),
|
||||
release_no_lease: dynamic_timeseries("{}.release_no_lease", (lease_type: &'static str); Rate, Sum),
|
||||
}
|
||||
pub use self::STATS::*;
|
||||
}
|
||||
@ -84,20 +91,25 @@ fn mc_raw_put(
|
||||
let uploaded = compact_protocol::serialize(&LockState::uploaded_key(orig_key));
|
||||
|
||||
STATS::presence_put.add_value(1);
|
||||
memcache.set(presence_key, uploaded).then(move |res| {
|
||||
if let Err(_) = res {
|
||||
STATS::presence_put_err.add_value(1);
|
||||
}
|
||||
if value.len() < MEMCACHE_MAX_SIZE {
|
||||
STATS::blob_put.add_value(1);
|
||||
Either::A(memcache.set(key, value.into_bytes()).or_else(|_| {
|
||||
STATS::blob_put_err.add_value(1);
|
||||
Ok(()).into_future()
|
||||
}))
|
||||
} else {
|
||||
Either::B(Ok(()).into_future())
|
||||
}
|
||||
})
|
||||
// This cache key is read by leases, and if it's set then lease can't be reacquired.
|
||||
// To be on the safe side let's add a ttl on this memcache key.
|
||||
let lock_ttl = Duration::from_secs(50);
|
||||
memcache
|
||||
.set_with_ttl(presence_key, uploaded, lock_ttl)
|
||||
.then(move |res| {
|
||||
if let Err(_) = res {
|
||||
STATS::presence_put_err.add_value(1);
|
||||
}
|
||||
if value.len() < MEMCACHE_MAX_SIZE {
|
||||
STATS::blob_put.add_value(1);
|
||||
Either::A(memcache.set(key, value.into_bytes()).or_else(|_| {
|
||||
STATS::blob_put_err.add_value(1);
|
||||
Ok(()).into_future()
|
||||
}))
|
||||
} else {
|
||||
Either::B(Ok(()).into_future())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
impl MemcacheOps {
|
||||
@ -285,17 +297,72 @@ impl LeaseOps for MemcacheOps {
|
||||
tokio_timer::sleep(retry_delay).map_err(|_| ()).boxify()
|
||||
}
|
||||
|
||||
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()> {
|
||||
fn release_lease(&self, key: &str) -> BoxFuture<(), ()> {
|
||||
let mc_key = self.presence_keygen.key(key);
|
||||
LEASE_STATS::release.add_value(1, (self.lease_type,));
|
||||
if put_success {
|
||||
let uploaded = compact_protocol::serialize(&LockState::uploaded_key(key.to_string()));
|
||||
LEASE_STATS::release_good.add_value(1, (self.lease_type,));
|
||||
cloned!(self.memcache, self.hostname, self.lease_type);
|
||||
|
||||
self.memcache.set(mc_key, uploaded).boxify()
|
||||
} else {
|
||||
LEASE_STATS::release_bad.add_value(1, (self.lease_type,));
|
||||
self.memcache.del(mc_key).boxify()
|
||||
}
|
||||
// This future checks the state of the lease, and releases it only
|
||||
// if it's locked by us right now.
|
||||
let f = future::lazy(move || {
|
||||
memcache
|
||||
.get(mc_key.clone())
|
||||
.and_then(move |maybe_data| match maybe_data {
|
||||
Some(bytes) => {
|
||||
let state: Result<LockState, Error> =
|
||||
compact_protocol::deserialize(Vec::from(bytes));
|
||||
match state {
|
||||
Ok(state) => match state {
|
||||
LockState::locked_by(locked_by) => {
|
||||
if locked_by == hostname {
|
||||
LEASE_STATS::release_good.add_value(1, (lease_type,));
|
||||
// The lease is held by us - just remove it
|
||||
memcache.del(mc_key).left_future()
|
||||
} else {
|
||||
LEASE_STATS::release_held_by_other
|
||||
.add_value(1, (lease_type,));
|
||||
// Someone else grabbed a lease, leave it alone
|
||||
future::ok(()).right_future()
|
||||
}
|
||||
}
|
||||
LockState::uploaded_key(up_key) => {
|
||||
if up_key != mc_key {
|
||||
LEASE_STATS::release_bad_key.add_value(1, (lease_type,));
|
||||
// Invalid key - fix it up. Normally that shouldn't
|
||||
// ever occur
|
||||
memcache.del(mc_key).left_future()
|
||||
} else {
|
||||
LEASE_STATS::release_key_set.add_value(1, (lease_type,));
|
||||
// Key is valid, and is most likely set by
|
||||
// cache.put(...). Lease is already release,
|
||||
// no need to do anything here
|
||||
future::ok(()).right_future()
|
||||
}
|
||||
}
|
||||
LockState::UnknownField(_) => {
|
||||
LEASE_STATS::release_bad_key.add_value(1, (lease_type,));
|
||||
// Possibly a newer version of the server enabled it?
|
||||
// Don't touch it just in case
|
||||
future::ok(()).right_future()
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
LEASE_STATS::release_bad_key.add_value(1, (lease_type,));
|
||||
// Fix up invalid value
|
||||
memcache.del(mc_key).left_future()
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
LEASE_STATS::release_no_lease.add_value(1, (lease_type,));
|
||||
future::ok(()).right_future()
|
||||
}
|
||||
})
|
||||
});
|
||||
// We don't have to wait for the releasing to finish, it can be done in background
|
||||
// because leases have a timeout. So even if they haven't been released explicitly they
|
||||
// will be released after a timeout.
|
||||
tokio::spawn(f);
|
||||
future::ok(()).boxify()
|
||||
}
|
||||
}
|
||||
|
@ -338,7 +338,7 @@ where
|
||||
.then(move |result| {
|
||||
if leased {
|
||||
lease
|
||||
.release_lease(&lease_key, result.is_ok())
|
||||
.release_lease(&lease_key)
|
||||
.then(|_| result)
|
||||
.right_future()
|
||||
} else {
|
||||
@ -746,7 +746,7 @@ mod test {
|
||||
|
||||
// release lease
|
||||
runtime
|
||||
.block_on(lease.release_lease(&lease_key, false))
|
||||
.block_on(lease.release_lease(&lease_key))
|
||||
.map_err(|_| Error::msg("failed to release a lease"))?;
|
||||
|
||||
runtime.block_on(tokio_timer::sleep(Duration::from_millis(300)))?;
|
||||
@ -775,7 +775,7 @@ mod test {
|
||||
result
|
||||
);
|
||||
runtime
|
||||
.block_on(lease.release_lease(&lease_key, false))
|
||||
.block_on(lease.release_lease(&lease_key))
|
||||
.map_err(|_| Error::msg("failed to release a lease"))?;
|
||||
|
||||
Ok(())
|
||||
@ -793,7 +793,7 @@ mod test {
|
||||
future::err(()).boxify()
|
||||
}
|
||||
|
||||
fn release_lease(&self, _key: &str, _put_success: bool) -> BoxFuture<(), ()> {
|
||||
fn release_lease(&self, _key: &str) -> BoxFuture<(), ()> {
|
||||
future::err(()).boxify()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user