mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 16:57:49 +03:00
blobstore: add RetryingBlobstore that retries failed put/get operations with delay
Reviewed By: StanislavGlebik Differential Revision: D6203017 fbshipit-source-id: 277fa267e86d2cb5eede241bf80dd8d1c90a3b96
This commit is contained in:
parent
c8d6e7f954
commit
9712530c0e
@ -10,14 +10,18 @@ extern crate futures;
|
||||
|
||||
extern crate futures_ext;
|
||||
|
||||
extern crate tokio_core;
|
||||
|
||||
use std::error;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::Future;
|
||||
|
||||
mod boxed;
|
||||
mod retrying;
|
||||
|
||||
pub use boxed::{ArcBlobstore, BoxBlobstore};
|
||||
pub use retrying::RetryingBlobstore;
|
||||
|
||||
/// Basic trait for the Blob Store interface
|
||||
///
|
||||
|
133
blobstore/src/retrying.rs
Normal file
133
blobstore/src/retrying.rs
Normal file
@ -0,0 +1,133 @@
|
||||
// Copyright (c) 2004-present, Facebook, Inc.
|
||||
// All Rights Reserved.
|
||||
//
|
||||
// This software may be used and distributed according to the terms of the
|
||||
// GNU General Public License version 2 or any later version.
|
||||
|
||||
use std::error;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::{Future, IntoFuture};
|
||||
use futures::future::{loop_fn, Loop};
|
||||
use futures::sync::oneshot;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use tokio_core::reactor::{Remote, Timeout};
|
||||
|
||||
use super::Blobstore;
|
||||
use boxed::ArcBlobstore;
|
||||
|
||||
struct SyncPhantomData<E>(PhantomData<E>);
|
||||
|
||||
unsafe impl<E> Sync for SyncPhantomData<E> {}
|
||||
|
||||
/// Blobstore that retries failed put/get operations with provided delay
|
||||
pub struct RetryingBlobstore<K, Vi, Vo, E, EInner> {
|
||||
blobstore: ArcBlobstore<K, Vi, Vo, EInner>,
|
||||
remote: Remote,
|
||||
get_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
|
||||
put_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
|
||||
_phantom: SyncPhantomData<E>,
|
||||
}
|
||||
|
||||
impl<K, Vi, Vo, E, EInner> RetryingBlobstore<K, Vi, Vo, E, EInner> {
|
||||
/// Take a blobstore and add retry functionality to it
|
||||
pub fn new(
|
||||
blobstore: ArcBlobstore<K, Vi, Vo, EInner>,
|
||||
remote: &Remote,
|
||||
get_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
|
||||
put_retry_delay: Arc<Fn(usize) -> Option<Duration> + Send + Sync>,
|
||||
) -> Self {
|
||||
RetryingBlobstore {
|
||||
blobstore,
|
||||
remote: remote.clone(),
|
||||
get_retry_delay,
|
||||
put_retry_delay,
|
||||
_phantom: SyncPhantomData(PhantomData),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn delay_for<E>(remote: &Remote, delay: Duration) -> BoxFuture<(), E>
|
||||
where
|
||||
E: From<::std::io::Error> + From<oneshot::Canceled> + Send + 'static,
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
remote.spawn(move |handle| {
|
||||
Timeout::new(delay, handle)
|
||||
.into_future()
|
||||
.and_then(|x| x)
|
||||
.map_err(E::from)
|
||||
.then(move |finished| tx.send(finished))
|
||||
.map_err(|_| ())
|
||||
});
|
||||
|
||||
rx.from_err().and_then(|x| x).boxify()
|
||||
}
|
||||
|
||||
impl<K, Vi, Vo, E, EInner> Blobstore for RetryingBlobstore<K, Vi, Vo, E, EInner>
|
||||
where
|
||||
K: Clone + Send + 'static,
|
||||
Vi: Clone + Send + 'static,
|
||||
Vo: AsRef<[u8]> + Send + 'static,
|
||||
E: From<EInner> + From<::std::io::Error> + From<oneshot::Canceled> + error::Error + Send + 'static,
|
||||
EInner: error::Error + Send + 'static,
|
||||
{
|
||||
type Key = K;
|
||||
type ValueIn = Vi;
|
||||
type ValueOut = Vo;
|
||||
type Error = E;
|
||||
|
||||
type GetBlob = BoxFuture<Option<Self::ValueOut>, Self::Error>;
|
||||
type PutBlob = BoxFuture<(), Self::Error>;
|
||||
|
||||
fn get(&self, key: &Self::Key) -> Self::GetBlob {
|
||||
loop_fn((0, None), {
|
||||
let blobstore = self.blobstore.clone();
|
||||
let remote = self.remote.clone();
|
||||
let key = key.clone();
|
||||
let retry_delay = self.get_retry_delay.clone();
|
||||
move |(attempt, _): (usize, Option<Vo>)| {
|
||||
blobstore.get(&key).from_err().then({
|
||||
let remote = remote.clone();
|
||||
let retry_delay = retry_delay.clone();
|
||||
move |result| match result {
|
||||
Ok(resp) => Ok(Loop::Break((attempt, resp))).into_future().boxify(),
|
||||
Err(err) => match retry_delay(attempt) {
|
||||
None => Err(err).into_future().boxify(),
|
||||
Some(dur) => delay_for(&remote, dur)
|
||||
.map(move |()| Loop::Continue((attempt + 1, None)))
|
||||
.boxify(),
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
}).map(|(_, resp)| resp)
|
||||
.boxify()
|
||||
}
|
||||
|
||||
fn put(&self, key: Self::Key, value: Self::ValueIn) -> Self::PutBlob {
|
||||
loop_fn(0, {
|
||||
let blobstore = self.blobstore.clone();
|
||||
let remote = self.remote.clone();
|
||||
let retry_delay = self.put_retry_delay.clone();
|
||||
move |attempt| {
|
||||
blobstore.put(key.clone(), value.clone()).from_err().then({
|
||||
let remote = remote.clone();
|
||||
let retry_delay = retry_delay.clone();
|
||||
move |result| match result {
|
||||
Ok(()) => Ok(Loop::Break(attempt)).into_future().boxify(),
|
||||
Err(err) => match retry_delay(attempt) {
|
||||
None => Err(err).into_future().boxify(),
|
||||
Some(dur) => delay_for(&remote, dur)
|
||||
.map(move |()| Loop::Continue(attempt + 1))
|
||||
.boxify(),
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
}).map(|_| ())
|
||||
.boxify()
|
||||
}
|
||||
}
|
@ -12,17 +12,27 @@
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
extern crate futures;
|
||||
extern crate futures_ext;
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
extern crate tempdir;
|
||||
extern crate tokio_core;
|
||||
|
||||
extern crate blobstore;
|
||||
extern crate fileblob;
|
||||
extern crate memblob;
|
||||
extern crate rocksblob;
|
||||
|
||||
use futures::Future;
|
||||
use tempdir::TempDir;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use blobstore::Blobstore;
|
||||
use futures::Future;
|
||||
use futures::future::IntoFuture;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
use tempdir::TempDir;
|
||||
use tokio_core::reactor::{Core, Remote};
|
||||
|
||||
use blobstore::{Blobstore, RetryingBlobstore};
|
||||
use fileblob::Fileblob;
|
||||
use memblob::Memblob;
|
||||
use rocksblob::Rocksblob;
|
||||
@ -139,3 +149,155 @@ blobstore_test_impl! {
|
||||
persistent: true,
|
||||
}
|
||||
}
|
||||
|
||||
mod flaky_errors {
|
||||
error_chain! {
|
||||
errors {
|
||||
Flakiness {
|
||||
description("flakiness happend")
|
||||
}
|
||||
}
|
||||
foreign_links {
|
||||
Io(::std::io::Error);
|
||||
Oneshot(::futures::sync::oneshot::Canceled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use flaky_errors::{Error as FlakyError, ErrorKind as FlakyErrorKind};
|
||||
|
||||
struct FlakyBlobstore {
|
||||
blobstore: Memblob,
|
||||
flakiness: Mutex<usize>, // number of calls that will fail
|
||||
}
|
||||
|
||||
impl Blobstore for FlakyBlobstore {
|
||||
type Key = String;
|
||||
type ValueIn = Vec<u8>;
|
||||
type ValueOut = Self::ValueIn;
|
||||
type Error = FlakyError;
|
||||
type PutBlob = BoxFuture<(), Self::Error>;
|
||||
type GetBlob = BoxFuture<Option<Self::ValueOut>, Self::Error>;
|
||||
|
||||
fn put(&self, k: Self::Key, v: Self::ValueIn) -> Self::PutBlob {
|
||||
let mut flakiness = self.flakiness.lock().expect("lock poison");
|
||||
if *flakiness == 0 {
|
||||
self.blobstore
|
||||
.put(k, v)
|
||||
.map_err(|_| FlakyError::from("never happens"))
|
||||
.boxify()
|
||||
} else {
|
||||
*flakiness = (*flakiness) - 1;
|
||||
Err(FlakyErrorKind::Flakiness.into()).into_future().boxify()
|
||||
}
|
||||
}
|
||||
|
||||
fn get(&self, k: &Self::Key) -> Self::GetBlob {
|
||||
let mut flakiness = self.flakiness.lock().expect("lock poison");
|
||||
if *flakiness == 0 {
|
||||
self.blobstore
|
||||
.get(k)
|
||||
.map_err(|_| FlakyError::from("never happens"))
|
||||
.boxify()
|
||||
} else {
|
||||
*flakiness = *flakiness - 1;
|
||||
Err(FlakyErrorKind::Flakiness.into()).into_future().boxify()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod retry_tests {
|
||||
use super::*;
|
||||
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
lazy_static! {
|
||||
static ref REMOTE: Remote = {
|
||||
let (tx, rx) = channel();
|
||||
::std::thread::spawn(move || {
|
||||
let mut core = Core::new().expect("failed to create tokio Core");
|
||||
tx.send(core.remote()).unwrap();
|
||||
loop {
|
||||
core.turn(None);
|
||||
}
|
||||
});
|
||||
rx.recv().unwrap()
|
||||
};
|
||||
}
|
||||
|
||||
fn flaky_blobstore(
|
||||
flakiness: usize,
|
||||
max_attempts: usize,
|
||||
) -> RetryingBlobstore<String, Vec<u8>, Vec<u8>, FlakyError, FlakyError> {
|
||||
let blobstore = FlakyBlobstore {
|
||||
blobstore: Memblob::new(),
|
||||
flakiness: Mutex::new(flakiness),
|
||||
};
|
||||
let retry_delay = Arc::new(move |attempt| if attempt < max_attempts {
|
||||
Some(Duration::from_secs(0))
|
||||
} else {
|
||||
None
|
||||
});
|
||||
RetryingBlobstore::new(
|
||||
blobstore.arced(),
|
||||
&*REMOTE,
|
||||
retry_delay.clone(),
|
||||
retry_delay,
|
||||
)
|
||||
}
|
||||
|
||||
fn check_failing(flakiness: usize, max_attempts: usize) {
|
||||
let blobstore = flaky_blobstore(flakiness, max_attempts);
|
||||
let foo = "foo".to_owned();
|
||||
let bar = b"bar"[..].into();
|
||||
match blobstore
|
||||
.put(foo.clone(), bar)
|
||||
.wait()
|
||||
.expect_err("error expected")
|
||||
{
|
||||
FlakyError(FlakyErrorKind::Flakiness, _) => (),
|
||||
err => panic!("unexpected error: {:?}", err),
|
||||
}
|
||||
|
||||
let blobstore = flaky_blobstore(flakiness, max_attempts);
|
||||
match blobstore.get(&foo).wait().expect_err("error expected") {
|
||||
FlakyError(FlakyErrorKind::Flakiness, _) => (),
|
||||
err => panic!("unexpected error: {:?}", err),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_succeeding(flakiness: usize, max_attempts: usize) {
|
||||
let blobstore = flaky_blobstore(flakiness, max_attempts);
|
||||
let foo = "foo".to_owned();
|
||||
let bar: Vec<u8> = b"bar"[..].into();
|
||||
blobstore
|
||||
.put(foo.clone(), bar.clone())
|
||||
.wait()
|
||||
.expect("success expected");
|
||||
assert_eq!(
|
||||
blobstore.get(&foo).wait().expect("success expected"),
|
||||
Some(bar)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_one_attempt() {
|
||||
check_succeeding(0, 0);
|
||||
check_failing(1, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_attempts_succeeding() {
|
||||
check_succeeding(0, 2);
|
||||
check_succeeding(1, 2);
|
||||
check_succeeding(2, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_attempts_failing() {
|
||||
check_failing(3, 2);
|
||||
check_failing(10, 2);
|
||||
check_failing(6, 5);
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ error_chain! {
|
||||
}
|
||||
foreign_links {
|
||||
Io(::std::io::Error);
|
||||
Oneshot(::futures::sync::oneshot::Canceled);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,7 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::{App, Arg, ArgMatches};
|
||||
@ -57,7 +58,7 @@ use slog_glog_fmt::default_drain as glog_drain;
|
||||
use tokio_core::reactor::{Core, Remote};
|
||||
|
||||
use blobrepo::BlobChangeset;
|
||||
use blobstore::Blobstore;
|
||||
use blobstore::{Blobstore, RetryingBlobstore};
|
||||
use fileblob::Fileblob;
|
||||
use fileheads::FileHeads;
|
||||
use futures_ext::{BoxFuture, FutureExt};
|
||||
@ -220,7 +221,24 @@ fn open_blobstore(
|
||||
}
|
||||
BlobstoreType::Manifold(bucket) => {
|
||||
let mb: ManifoldBlob<String, Bytes> = ManifoldBlob::new_may_panic(bucket, remote);
|
||||
mb.arced()
|
||||
let rmb: RetryingBlobstore<
|
||||
String,
|
||||
Bytes,
|
||||
Vec<u8>,
|
||||
Error,
|
||||
manifoldblob::Error,
|
||||
> = RetryingBlobstore::new(
|
||||
mb.arced(),
|
||||
remote,
|
||||
Arc::new(|_| None),
|
||||
Arc::new(|attempt| if attempt > 3 {
|
||||
None
|
||||
} else {
|
||||
// 100ms 400ms 1.6s 6.4s
|
||||
Some(Duration::from_millis(100 * 4u64.pow(attempt as u32)))
|
||||
}),
|
||||
);
|
||||
rmb.arced()
|
||||
}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user