mononoke: remove retries

Summary:
Putting retries on this layer is not very good, because it requires every
client to add RetryingBlobstore.

Reviewed By: kulshrax

Differential Revision: D6298254

fbshipit-source-id: dbdce7fe141f9e1511322e74a1258d3819a68eb5
This commit is contained in:
Stanislau Hlebik 2017-11-13 03:10:33 -08:00 committed by Facebook Github Bot
parent 2ac5581e5d
commit 83f2eb90a5
4 changed files with 3 additions and 316 deletions

View File

@ -18,10 +18,8 @@ use std::sync::Arc;
use futures::Future; use futures::Future;
mod boxed; mod boxed;
mod retrying;
pub use boxed::{ArcBlobstore, BoxBlobstore}; pub use boxed::{ArcBlobstore, BoxBlobstore};
pub use retrying::RetryingBlobstore;
/// Basic trait for the Blob Store interface /// Basic trait for the Blob Store interface
/// ///

View File

@ -1,133 +0,0 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::error;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use futures::{Future, IntoFuture};
use futures::future::{loop_fn, Loop};
use futures::sync::oneshot;
use futures_ext::{BoxFuture, FutureExt};
use tokio_core::reactor::{Remote, Timeout};
use super::Blobstore;
use boxed::ArcBlobstore;
struct SyncPhantomData<E>(PhantomData<E>);
unsafe impl<E> Sync for SyncPhantomData<E> {}
/// Blobstore that retries failed put/get operations with provided delay
pub struct RetryingBlobstore<K, Vi, Vo, E, EInner> {
blobstore: ArcBlobstore<K, Vi, Vo, EInner>,
remote: Remote,
get_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
put_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
_phantom: SyncPhantomData<E>,
}
impl<K, Vi, Vo, E, EInner> RetryingBlobstore<K, Vi, Vo, E, EInner> {
/// Take a blobstore and add retry functionality to it
pub fn new(
blobstore: ArcBlobstore<K, Vi, Vo, EInner>,
remote: &Remote,
get_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
put_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
) -> Self {
RetryingBlobstore {
blobstore,
remote: remote.clone(),
get_retry_delay,
put_retry_delay,
_phantom: SyncPhantomData(PhantomData),
}
}
}
fn delay_for<E>(remote: &Remote, delay: Duration) -> BoxFuture<(), E>
where
E: From<::std::io::Error> + From<oneshot::Canceled> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
remote.spawn(move |handle| {
Timeout::new(delay, handle)
.into_future()
.and_then(|x| x)
.map_err(E::from)
.then(move |finished| tx.send(finished))
.map_err(|_| ())
});
rx.from_err().and_then(|x| x).boxify()
}
impl<K, Vi, Vo, E, EInner> Blobstore for RetryingBlobstore<K, Vi, Vo, E, EInner>
where
K: Clone + Send + 'static,
Vi: Clone + Send + 'static,
Vo: AsRef<[u8]> + Send + 'static,
E: From<EInner> + From<::std::io::Error> + From<oneshot::Canceled> + error::Error + Send + 'static,
EInner: error::Error + Send + 'static,
{
type Key = K;
type ValueIn = Vi;
type ValueOut = Vo;
type Error = E;
type GetBlob = BoxFuture<Option<Self::ValueOut>, Self::Error>;
type PutBlob = BoxFuture<(), Self::Error>;
fn get(&self, key: &Self::Key) -> Self::GetBlob {
loop_fn((0, None), {
let blobstore = self.blobstore.clone();
let remote = self.remote.clone();
let key = key.clone();
let retry_delay = self.get_retry_delay.clone();
move |(attempt, _): (usize, Option<Vo>)| {
blobstore.get(&key).from_err().then({
let remote = remote.clone();
let retry_delay = retry_delay.clone();
move |result| match result {
Ok(resp) => Ok(Loop::Break((attempt, resp))).into_future().boxify(),
Err(err) => match retry_delay(attempt) {
None => Err(err).into_future().boxify(),
Some(dur) => delay_for(&remote, dur)
.map(move |()| Loop::Continue((attempt + 1, None)))
.boxify(),
},
}
})
}
}).map(|(_, resp)| resp)
.boxify()
}
fn put(&self, key: Self::Key, value: Self::ValueIn) -> Self::PutBlob {
loop_fn(0, {
let blobstore = self.blobstore.clone();
let remote = self.remote.clone();
let retry_delay = self.put_retry_delay.clone();
move |attempt| {
blobstore.put(key.clone(), value.clone()).from_err().then({
let remote = remote.clone();
let retry_delay = retry_delay.clone();
move |result| match result {
Ok(()) => Ok(Loop::Break(attempt)).into_future().boxify(),
Err(err) => match retry_delay(attempt) {
None => Err(err).into_future().boxify(),
Some(dur) => delay_for(&remote, dur)
.map(move |()| Loop::Continue(attempt + 1))
.boxify(),
},
}
})
}
}).map(|_| ())
.boxify()
}
}

View File

@ -13,8 +13,6 @@
extern crate error_chain; extern crate error_chain;
extern crate futures; extern crate futures;
extern crate futures_ext; extern crate futures_ext;
#[macro_use]
extern crate lazy_static;
extern crate tempdir; extern crate tempdir;
extern crate tokio_core; extern crate tokio_core;
@ -23,16 +21,10 @@ extern crate fileblob;
extern crate memblob; extern crate memblob;
extern crate rocksblob; extern crate rocksblob;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::Future; use futures::Future;
use futures::future::IntoFuture;
use futures_ext::{BoxFuture, FutureExt};
use tempdir::TempDir; use tempdir::TempDir;
use tokio_core::reactor::{Core, Remote};
use blobstore::{Blobstore, RetryingBlobstore}; use blobstore::Blobstore;
use fileblob::Fileblob; use fileblob::Fileblob;
use memblob::Memblob; use memblob::Memblob;
use rocksblob::Rocksblob; use rocksblob::Rocksblob;
@ -149,155 +141,3 @@ blobstore_test_impl! {
persistent: true, persistent: true,
} }
} }
mod flaky_errors {
error_chain! {
errors {
Flakiness {
description("flakiness happend")
}
}
foreign_links {
Io(::std::io::Error);
Oneshot(::futures::sync::oneshot::Canceled);
}
}
}
use flaky_errors::{Error as FlakyError, ErrorKind as FlakyErrorKind};
struct FlakyBlobstore {
blobstore: Memblob,
flakiness: Mutex<usize>, // number of calls that will fail
}
impl Blobstore for FlakyBlobstore {
type Key = String;
type ValueIn = Vec<u8>;
type ValueOut = Self::ValueIn;
type Error = FlakyError;
type PutBlob = BoxFuture<(), Self::Error>;
type GetBlob = BoxFuture<Option<Self::ValueOut>, Self::Error>;
fn put(&self, k: Self::Key, v: Self::ValueIn) -> Self::PutBlob {
let mut flakiness = self.flakiness.lock().expect("lock poison");
if *flakiness == 0 {
self.blobstore
.put(k, v)
.map_err(|_| FlakyError::from("never happens"))
.boxify()
} else {
*flakiness = (*flakiness) - 1;
Err(FlakyErrorKind::Flakiness.into()).into_future().boxify()
}
}
fn get(&self, k: &Self::Key) -> Self::GetBlob {
let mut flakiness = self.flakiness.lock().expect("lock poison");
if *flakiness == 0 {
self.blobstore
.get(k)
.map_err(|_| FlakyError::from("never happens"))
.boxify()
} else {
*flakiness = *flakiness - 1;
Err(FlakyErrorKind::Flakiness.into()).into_future().boxify()
}
}
}
#[cfg(test)]
mod retry_tests {
use super::*;
use std::sync::mpsc::channel;
lazy_static! {
static ref REMOTE: Remote = {
let (tx, rx) = channel();
::std::thread::spawn(move || {
let mut core = Core::new().expect("failed to create tokio Core");
tx.send(core.remote()).unwrap();
loop {
core.turn(None);
}
});
rx.recv().unwrap()
};
}
fn flaky_blobstore(
flakiness: usize,
max_attempts: usize,
) -> RetryingBlobstore<String, Vec<u8>, Vec<u8>, FlakyError, FlakyError> {
let blobstore = FlakyBlobstore {
blobstore: Memblob::new(),
flakiness: Mutex::new(flakiness),
};
let retry_delay = Arc::new(move |attempt| if attempt < max_attempts {
Some(Duration::from_secs(0))
} else {
None
});
RetryingBlobstore::new(
blobstore.arced(),
&*REMOTE,
retry_delay.clone(),
retry_delay,
)
}
fn check_failing(flakiness: usize, max_attempts: usize) {
let blobstore = flaky_blobstore(flakiness, max_attempts);
let foo = "foo".to_owned();
let bar = b"bar"[..].into();
match blobstore
.put(foo.clone(), bar)
.wait()
.expect_err("error expected")
{
FlakyError(FlakyErrorKind::Flakiness, _) => (),
err => panic!("unexpected error: {:?}", err),
}
let blobstore = flaky_blobstore(flakiness, max_attempts);
match blobstore.get(&foo).wait().expect_err("error expected") {
FlakyError(FlakyErrorKind::Flakiness, _) => (),
err => panic!("unexpected error: {:?}", err),
}
}
fn check_succeeding(flakiness: usize, max_attempts: usize) {
let blobstore = flaky_blobstore(flakiness, max_attempts);
let foo = "foo".to_owned();
let bar: Vec<u8> = b"bar"[..].into();
blobstore
.put(foo.clone(), bar.clone())
.wait()
.expect("success expected");
assert_eq!(
blobstore.get(&foo).wait().expect("success expected"),
Some(bar)
);
}
#[test]
fn test_one_attempt() {
check_succeeding(0, 0);
check_failing(1, 0);
}
#[test]
fn test_multiple_attempts_succeeding() {
check_succeeding(0, 2);
check_succeeding(1, 2);
check_succeeding(2, 2);
}
#[test]
fn test_multiple_attempts_failing() {
check_failing(3, 2);
check_failing(10, 2);
check_failing(6, 5);
}
}

View File

@ -46,7 +46,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc::sync_channel; use std::sync::mpsc::sync_channel;
use std::thread; use std::thread;
use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use clap::{App, Arg, ArgMatches}; use clap::{App, Arg, ArgMatches};
@ -58,7 +57,7 @@ use stats::Timeseries;
use tokio_core::reactor::{Core, Remote}; use tokio_core::reactor::{Core, Remote};
use blobrepo::BlobChangeset; use blobrepo::BlobChangeset;
use blobstore::{Blobstore, RetryingBlobstore}; use blobstore::Blobstore;
use fileblob::Fileblob; use fileblob::Fileblob;
use fileheads::FileHeads; use fileheads::FileHeads;
use futures_ext::{BoxFuture, FutureExt}; use futures_ext::{BoxFuture, FutureExt};
@ -245,24 +244,7 @@ fn open_blobstore<P: Into<PathBuf>>(
} }
BlobstoreType::Manifold(bucket) => { BlobstoreType::Manifold(bucket) => {
let mb: ManifoldBlob<String, Bytes> = ManifoldBlob::new_may_panic(bucket, remote); let mb: ManifoldBlob<String, Bytes> = ManifoldBlob::new_may_panic(bucket, remote);
let rmb: RetryingBlobstore< mb.arced()
String,
Bytes,
Vec<u8>,
Error,
manifoldblob::Error,
> = RetryingBlobstore::new(
mb.arced(),
remote,
Arc::new(|_| None),
Arc::new(|attempt| if attempt > 3 {
None
} else {
// 100ms 400ms 1.6s 6.4s
Some(Duration::from_millis(100 * 4u64.pow(attempt as u32)))
}),
);
rmb.arced()
} }
}; };