diff --git a/blobstore/src/lib.rs b/blobstore/src/lib.rs index db3cc5d0fa..0768c262b3 100644 --- a/blobstore/src/lib.rs +++ b/blobstore/src/lib.rs @@ -18,10 +18,8 @@ use std::sync::Arc; use futures::Future; mod boxed; -mod retrying; pub use boxed::{ArcBlobstore, BoxBlobstore}; -pub use retrying::RetryingBlobstore; /// Basic trait for the Blob Store interface /// diff --git a/blobstore/src/retrying.rs b/blobstore/src/retrying.rs deleted file mode 100644 index 2092b217b5..0000000000 --- a/blobstore/src/retrying.rs +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright (c) 2004-present, Facebook, Inc. -// All Rights Reserved. -// -// This software may be used and distributed according to the terms of the -// GNU General Public License version 2 or any later version. - -use std::error; -use std::marker::PhantomData; -use std::sync::Arc; -use std::time::Duration; - -use futures::{Future, IntoFuture}; -use futures::future::{loop_fn, Loop}; -use futures::sync::oneshot; -use futures_ext::{BoxFuture, FutureExt}; -use tokio_core::reactor::{Remote, Timeout}; - -use super::Blobstore; -use boxed::ArcBlobstore; - -struct SyncPhantomData(PhantomData); - -unsafe impl Sync for SyncPhantomData {} - -/// Blobstore that retries failed put/get operations with provided delay -pub struct RetryingBlobstore { - blobstore: ArcBlobstore, - remote: Remote, - get_retry_delay: Arc Option + Send + Sync>, - put_retry_delay: Arc Option + Send + Sync>, - _phantom: SyncPhantomData, -} - -impl RetryingBlobstore { - /// Take a blobstore and add retry functionality to it - pub fn new( - blobstore: ArcBlobstore, - remote: &Remote, - get_retry_delay: Arc Option + Send + Sync>, - put_retry_delay: Arc Option + Send + Sync>, - ) -> Self { - RetryingBlobstore { - blobstore, - remote: remote.clone(), - get_retry_delay, - put_retry_delay, - _phantom: SyncPhantomData(PhantomData), - } - } -} - -fn delay_for(remote: &Remote, delay: Duration) -> BoxFuture<(), E> -where - E: From<::std::io::Error> + From + Send + 'static, -{ - let (tx, rx) = oneshot::channel(); - remote.spawn(move |handle| { - Timeout::new(delay, handle) - .into_future() - .and_then(|x| x) - .map_err(E::from) - .then(move |finished| tx.send(finished)) - .map_err(|_| ()) - }); - - rx.from_err().and_then(|x| x).boxify() -} - -impl Blobstore for RetryingBlobstore -where - K: Clone + Send + 'static, - Vi: Clone + Send + 'static, - Vo: AsRef<[u8]> + Send + 'static, - E: From + From<::std::io::Error> + From + error::Error + Send + 'static, - EInner: error::Error + Send + 'static, -{ - type Key = K; - type ValueIn = Vi; - type ValueOut = Vo; - type Error = E; - - type GetBlob = BoxFuture, Self::Error>; - type PutBlob = BoxFuture<(), Self::Error>; - - fn get(&self, key: &Self::Key) -> Self::GetBlob { - loop_fn((0, None), { - let blobstore = self.blobstore.clone(); - let remote = self.remote.clone(); - let key = key.clone(); - let retry_delay = self.get_retry_delay.clone(); - move |(attempt, _): (usize, Option)| { - blobstore.get(&key).from_err().then({ - let remote = remote.clone(); - let retry_delay = retry_delay.clone(); - move |result| match result { - Ok(resp) => Ok(Loop::Break((attempt, resp))).into_future().boxify(), - Err(err) => match retry_delay(attempt) { - None => Err(err).into_future().boxify(), - Some(dur) => delay_for(&remote, dur) - .map(move |()| Loop::Continue((attempt + 1, None))) - .boxify(), - }, - } - }) - } - }).map(|(_, resp)| resp) - .boxify() - } - - fn put(&self, key: Self::Key, value: Self::ValueIn) -> Self::PutBlob { - loop_fn(0, { - let blobstore = self.blobstore.clone(); - let remote = self.remote.clone(); - let retry_delay = self.put_retry_delay.clone(); - move |attempt| { - blobstore.put(key.clone(), value.clone()).from_err().then({ - let remote = remote.clone(); - let retry_delay = retry_delay.clone(); - move |result| match result { - Ok(()) => Ok(Loop::Break(attempt)).into_future().boxify(), - Err(err) => match retry_delay(attempt) { - None => Err(err).into_future().boxify(), - Some(dur) => delay_for(&remote, dur) - .map(move |()| Loop::Continue(attempt + 1)) - .boxify(), - }, - } - }) - } - }).map(|_| ()) - .boxify() - } -} diff --git a/blobstore/test/main.rs b/blobstore/test/main.rs index 3dc3d33580..d3daf45a49 100644 --- a/blobstore/test/main.rs +++ b/blobstore/test/main.rs @@ -13,8 +13,6 @@ extern crate error_chain; extern crate futures; extern crate futures_ext; -#[macro_use] -extern crate lazy_static; extern crate tempdir; extern crate tokio_core; @@ -23,16 +21,10 @@ extern crate fileblob; extern crate memblob; extern crate rocksblob; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - use futures::Future; -use futures::future::IntoFuture; -use futures_ext::{BoxFuture, FutureExt}; use tempdir::TempDir; -use tokio_core::reactor::{Core, Remote}; -use blobstore::{Blobstore, RetryingBlobstore}; +use blobstore::Blobstore; use fileblob::Fileblob; use memblob::Memblob; use rocksblob::Rocksblob; @@ -149,155 +141,3 @@ blobstore_test_impl! { persistent: true, } } - -mod flaky_errors { - error_chain! { - errors { - Flakiness { - description("flakiness happend") - } - } - foreign_links { - Io(::std::io::Error); - Oneshot(::futures::sync::oneshot::Canceled); - } - } -} - -use flaky_errors::{Error as FlakyError, ErrorKind as FlakyErrorKind}; - -struct FlakyBlobstore { - blobstore: Memblob, - flakiness: Mutex, // number of calls that will fail -} - -impl Blobstore for FlakyBlobstore { - type Key = String; - type ValueIn = Vec; - type ValueOut = Self::ValueIn; - type Error = FlakyError; - type PutBlob = BoxFuture<(), Self::Error>; - type GetBlob = BoxFuture, Self::Error>; - - fn put(&self, k: Self::Key, v: Self::ValueIn) -> Self::PutBlob { - let mut flakiness = self.flakiness.lock().expect("lock poison"); - if *flakiness == 0 { - self.blobstore - .put(k, v) - .map_err(|_| FlakyError::from("never happens")) - .boxify() - } else { - *flakiness = (*flakiness) - 1; - Err(FlakyErrorKind::Flakiness.into()).into_future().boxify() - } - } - - fn get(&self, k: &Self::Key) -> Self::GetBlob { - let mut flakiness = self.flakiness.lock().expect("lock poison"); - if *flakiness == 0 { - self.blobstore - .get(k) - .map_err(|_| FlakyError::from("never happens")) - .boxify() - } else { - *flakiness = *flakiness - 1; - Err(FlakyErrorKind::Flakiness.into()).into_future().boxify() - } - } -} - -#[cfg(test)] -mod retry_tests { - use super::*; - - use std::sync::mpsc::channel; - - lazy_static! { - static ref REMOTE: Remote = { - let (tx, rx) = channel(); - ::std::thread::spawn(move || { - let mut core = Core::new().expect("failed to create tokio Core"); - tx.send(core.remote()).unwrap(); - loop { - core.turn(None); - } - }); - rx.recv().unwrap() - }; - } - - fn flaky_blobstore( - flakiness: usize, - max_attempts: usize, - ) -> RetryingBlobstore, Vec, FlakyError, FlakyError> { - let blobstore = FlakyBlobstore { - blobstore: Memblob::new(), - flakiness: Mutex::new(flakiness), - }; - let retry_delay = Arc::new(move |attempt| if attempt < max_attempts { - Some(Duration::from_secs(0)) - } else { - None - }); - RetryingBlobstore::new( - blobstore.arced(), - &*REMOTE, - retry_delay.clone(), - retry_delay, - ) - } - - fn check_failing(flakiness: usize, max_attempts: usize) { - let blobstore = flaky_blobstore(flakiness, max_attempts); - let foo = "foo".to_owned(); - let bar = b"bar"[..].into(); - match blobstore - .put(foo.clone(), bar) - .wait() - .expect_err("error expected") - { - FlakyError(FlakyErrorKind::Flakiness, _) => (), - err => panic!("unexpected error: {:?}", err), - } - - let blobstore = flaky_blobstore(flakiness, max_attempts); - match blobstore.get(&foo).wait().expect_err("error expected") { - FlakyError(FlakyErrorKind::Flakiness, _) => (), - err => panic!("unexpected error: {:?}", err), - } - } - - fn check_succeeding(flakiness: usize, max_attempts: usize) { - let blobstore = flaky_blobstore(flakiness, max_attempts); - let foo = "foo".to_owned(); - let bar: Vec = b"bar"[..].into(); - blobstore - .put(foo.clone(), bar.clone()) - .wait() - .expect("success expected"); - assert_eq!( - blobstore.get(&foo).wait().expect("success expected"), - Some(bar) - ); - } - - #[test] - fn test_one_attempt() { - check_succeeding(0, 0); - check_failing(1, 0); - } - - #[test] - fn test_multiple_attempts_succeeding() { - check_succeeding(0, 2); - check_succeeding(1, 2); - check_succeeding(2, 2); - } - - #[test] - fn test_multiple_attempts_failing() { - check_failing(3, 2); - check_failing(10, 2); - check_failing(6, 5); - } -} diff --git a/cmds/blobimport/main.rs b/cmds/blobimport/main.rs index 49616312ce..b0560c8207 100644 --- a/cmds/blobimport/main.rs +++ b/cmds/blobimport/main.rs @@ -46,7 +46,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::mpsc::sync_channel; use std::thread; -use std::time::Duration; use bytes::Bytes; use clap::{App, Arg, ArgMatches}; @@ -58,7 +57,7 @@ use stats::Timeseries; use tokio_core::reactor::{Core, Remote}; use blobrepo::BlobChangeset; -use blobstore::{Blobstore, RetryingBlobstore}; +use blobstore::Blobstore; use fileblob::Fileblob; use fileheads::FileHeads; use futures_ext::{BoxFuture, FutureExt}; @@ -245,24 +244,7 @@ fn open_blobstore>( } BlobstoreType::Manifold(bucket) => { let mb: ManifoldBlob = ManifoldBlob::new_may_panic(bucket, remote); - let rmb: RetryingBlobstore< - String, - Bytes, - Vec, - Error, - manifoldblob::Error, - > = RetryingBlobstore::new( - mb.arced(), - remote, - Arc::new(|_| None), - Arc::new(|attempt| if attempt > 3 { - None - } else { - // 100ms 400ms 1.6s 6.4s - Some(Duration::from_millis(100 * 4u64.pow(attempt as u32))) - }), - ); - rmb.arced() + mb.arced() } };