diff --git a/blobstore/src/lib.rs b/blobstore/src/lib.rs index b549f6e490..db3cc5d0fa 100644 --- a/blobstore/src/lib.rs +++ b/blobstore/src/lib.rs @@ -10,14 +10,18 @@ extern crate futures; extern crate futures_ext; +extern crate tokio_core; + use std::error; use std::sync::Arc; use futures::Future; mod boxed; +mod retrying; pub use boxed::{ArcBlobstore, BoxBlobstore}; +pub use retrying::RetryingBlobstore; /// Basic trait for the Blob Store interface /// diff --git a/blobstore/src/retrying.rs b/blobstore/src/retrying.rs new file mode 100644 index 0000000000..2092b217b5 --- /dev/null +++ b/blobstore/src/retrying.rs @@ -0,0 +1,133 @@ +// Copyright (c) 2004-present, Facebook, Inc. +// All Rights Reserved. +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +use std::error; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; + +use futures::{Future, IntoFuture}; +use futures::future::{loop_fn, Loop}; +use futures::sync::oneshot; +use futures_ext::{BoxFuture, FutureExt}; +use tokio_core::reactor::{Remote, Timeout}; + +use super::Blobstore; +use boxed::ArcBlobstore; + +struct SyncPhantomData(PhantomData); + +unsafe impl Sync for SyncPhantomData {} + +/// Blobstore that retries failed put/get operations with provided delay +pub struct RetryingBlobstore { + blobstore: ArcBlobstore, + remote: Remote, + get_retry_delay: Arc Option + Send + Sync>, + put_retry_delay: Arc Option + Send + Sync>, + _phantom: SyncPhantomData, +} + +impl RetryingBlobstore { + /// Take a blobstore and add retry functionality to it + pub fn new( + blobstore: ArcBlobstore, + remote: &Remote, + get_retry_delay: Arc Option + Send + Sync>, + put_retry_delay: Arc Option + Send + Sync>, + ) -> Self { + RetryingBlobstore { + blobstore, + remote: remote.clone(), + get_retry_delay, + put_retry_delay, + _phantom: SyncPhantomData(PhantomData), + } + } +} + +fn delay_for(remote: &Remote, delay: Duration) -> BoxFuture<(), E> +where + E: From<::std::io::Error> + From + Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + remote.spawn(move |handle| { + Timeout::new(delay, handle) + .into_future() + .and_then(|x| x) + .map_err(E::from) + .then(move |finished| tx.send(finished)) + .map_err(|_| ()) + }); + + rx.from_err().and_then(|x| x).boxify() +} + +impl Blobstore for RetryingBlobstore +where + K: Clone + Send + 'static, + Vi: Clone + Send + 'static, + Vo: AsRef<[u8]> + Send + 'static, + E: From + From<::std::io::Error> + From + error::Error + Send + 'static, + EInner: error::Error + Send + 'static, +{ + type Key = K; + type ValueIn = Vi; + type ValueOut = Vo; + type Error = E; + + type GetBlob = BoxFuture, Self::Error>; + type PutBlob = BoxFuture<(), Self::Error>; + + fn get(&self, key: &Self::Key) -> Self::GetBlob { + loop_fn((0, None), { + let blobstore = self.blobstore.clone(); + let remote = self.remote.clone(); + let key = key.clone(); + let retry_delay = self.get_retry_delay.clone(); + move |(attempt, _): (usize, Option)| { + blobstore.get(&key).from_err().then({ + let remote = remote.clone(); + let retry_delay = retry_delay.clone(); + move |result| match result { + Ok(resp) => Ok(Loop::Break((attempt, resp))).into_future().boxify(), + Err(err) => match retry_delay(attempt) { + None => Err(err).into_future().boxify(), + Some(dur) => delay_for(&remote, dur) + .map(move |()| Loop::Continue((attempt + 1, None))) + .boxify(), + }, + } + }) + } + }).map(|(_, resp)| resp) + .boxify() + } + + fn put(&self, key: Self::Key, value: Self::ValueIn) -> Self::PutBlob { + loop_fn(0, { + let blobstore = self.blobstore.clone(); + let remote = self.remote.clone(); + let retry_delay = self.put_retry_delay.clone(); + move |attempt| { + blobstore.put(key.clone(), value.clone()).from_err().then({ + let remote = remote.clone(); + let retry_delay = retry_delay.clone(); + move |result| match result { + Ok(()) => Ok(Loop::Break(attempt)).into_future().boxify(), + Err(err) => match retry_delay(attempt) { + None => Err(err).into_future().boxify(), + Some(dur) => delay_for(&remote, dur) + .map(move |()| Loop::Continue(attempt + 1)) + .boxify(), + }, + } + }) + } + }).map(|_| ()) + .boxify() + } +} diff --git a/blobstore/test/main.rs b/blobstore/test/main.rs index 50c35b1ae3..3dc3d33580 100644 --- a/blobstore/test/main.rs +++ b/blobstore/test/main.rs @@ -12,17 +12,27 @@ #[macro_use] extern crate error_chain; extern crate futures; +extern crate futures_ext; +#[macro_use] +extern crate lazy_static; extern crate tempdir; +extern crate tokio_core; extern crate blobstore; extern crate fileblob; extern crate memblob; extern crate rocksblob; -use futures::Future; -use tempdir::TempDir; +use std::sync::{Arc, Mutex}; +use std::time::Duration; -use blobstore::Blobstore; +use futures::Future; +use futures::future::IntoFuture; +use futures_ext::{BoxFuture, FutureExt}; +use tempdir::TempDir; +use tokio_core::reactor::{Core, Remote}; + +use blobstore::{Blobstore, RetryingBlobstore}; use fileblob::Fileblob; use memblob::Memblob; use rocksblob::Rocksblob; @@ -139,3 +149,155 @@ blobstore_test_impl! { persistent: true, } } + +mod flaky_errors { + error_chain! { + errors { + Flakiness { + description("flakiness happend") + } + } + foreign_links { + Io(::std::io::Error); + Oneshot(::futures::sync::oneshot::Canceled); + } + } +} + +use flaky_errors::{Error as FlakyError, ErrorKind as FlakyErrorKind}; + +struct FlakyBlobstore { + blobstore: Memblob, + flakiness: Mutex, // number of calls that will fail +} + +impl Blobstore for FlakyBlobstore { + type Key = String; + type ValueIn = Vec; + type ValueOut = Self::ValueIn; + type Error = FlakyError; + type PutBlob = BoxFuture<(), Self::Error>; + type GetBlob = BoxFuture, Self::Error>; + + fn put(&self, k: Self::Key, v: Self::ValueIn) -> Self::PutBlob { + let mut flakiness = self.flakiness.lock().expect("lock poison"); + if *flakiness == 0 { + self.blobstore + .put(k, v) + .map_err(|_| FlakyError::from("never happens")) + .boxify() + } else { + *flakiness = (*flakiness) - 1; + Err(FlakyErrorKind::Flakiness.into()).into_future().boxify() + } + } + + fn get(&self, k: &Self::Key) -> Self::GetBlob { + let mut flakiness = self.flakiness.lock().expect("lock poison"); + if *flakiness == 0 { + self.blobstore + .get(k) + .map_err(|_| FlakyError::from("never happens")) + .boxify() + } else { + *flakiness = *flakiness - 1; + Err(FlakyErrorKind::Flakiness.into()).into_future().boxify() + } + } +} + +#[cfg(test)] +mod retry_tests { + use super::*; + + use std::sync::mpsc::channel; + + lazy_static! { + static ref REMOTE: Remote = { + let (tx, rx) = channel(); + ::std::thread::spawn(move || { + let mut core = Core::new().expect("failed to create tokio Core"); + tx.send(core.remote()).unwrap(); + loop { + core.turn(None); + } + }); + rx.recv().unwrap() + }; + } + + fn flaky_blobstore( + flakiness: usize, + max_attempts: usize, + ) -> RetryingBlobstore, Vec, FlakyError, FlakyError> { + let blobstore = FlakyBlobstore { + blobstore: Memblob::new(), + flakiness: Mutex::new(flakiness), + }; + let retry_delay = Arc::new(move |attempt| if attempt < max_attempts { + Some(Duration::from_secs(0)) + } else { + None + }); + RetryingBlobstore::new( + blobstore.arced(), + &*REMOTE, + retry_delay.clone(), + retry_delay, + ) + } + + fn check_failing(flakiness: usize, max_attempts: usize) { + let blobstore = flaky_blobstore(flakiness, max_attempts); + let foo = "foo".to_owned(); + let bar = b"bar"[..].into(); + match blobstore + .put(foo.clone(), bar) + .wait() + .expect_err("error expected") + { + FlakyError(FlakyErrorKind::Flakiness, _) => (), + err => panic!("unexpected error: {:?}", err), + } + + let blobstore = flaky_blobstore(flakiness, max_attempts); + match blobstore.get(&foo).wait().expect_err("error expected") { + FlakyError(FlakyErrorKind::Flakiness, _) => (), + err => panic!("unexpected error: {:?}", err), + } + } + + fn check_succeeding(flakiness: usize, max_attempts: usize) { + let blobstore = flaky_blobstore(flakiness, max_attempts); + let foo = "foo".to_owned(); + let bar: Vec = b"bar"[..].into(); + blobstore + .put(foo.clone(), bar.clone()) + .wait() + .expect("success expected"); + assert_eq!( + blobstore.get(&foo).wait().expect("success expected"), + Some(bar) + ); + } + + #[test] + fn test_one_attempt() { + check_succeeding(0, 0); + check_failing(1, 0); + } + + #[test] + fn test_multiple_attempts_succeeding() { + check_succeeding(0, 2); + check_succeeding(1, 2); + check_succeeding(2, 2); + } + + #[test] + fn test_multiple_attempts_failing() { + check_failing(3, 2); + check_failing(10, 2); + check_failing(6, 5); + } +} diff --git a/cmds/blobimport/errors.rs b/cmds/blobimport/errors.rs index 2feac98f55..cf5ad07faf 100644 --- a/cmds/blobimport/errors.rs +++ b/cmds/blobimport/errors.rs @@ -15,6 +15,7 @@ error_chain! { } foreign_links { Io(::std::io::Error); + Oneshot(::futures::sync::oneshot::Canceled); } } diff --git a/cmds/blobimport/main.rs b/cmds/blobimport/main.rs index 3aba8794e8..eb774d043a 100644 --- a/cmds/blobimport/main.rs +++ b/cmds/blobimport/main.rs @@ -47,6 +47,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::mpsc::sync_channel; use std::thread; +use std::time::Duration; use bytes::Bytes; use clap::{App, Arg, ArgMatches}; @@ -57,7 +58,7 @@ use slog_glog_fmt::default_drain as glog_drain; use tokio_core::reactor::{Core, Remote}; use blobrepo::BlobChangeset; -use blobstore::Blobstore; +use blobstore::{Blobstore, RetryingBlobstore}; use fileblob::Fileblob; use fileheads::FileHeads; use futures_ext::{BoxFuture, FutureExt}; @@ -220,7 +221,24 @@ fn open_blobstore( } BlobstoreType::Manifold(bucket) => { let mb: ManifoldBlob = ManifoldBlob::new_may_panic(bucket, remote); - mb.arced() + let rmb: RetryingBlobstore< + String, + Bytes, + Vec, + Error, + manifoldblob::Error, + > = RetryingBlobstore::new( + mb.arced(), + remote, + Arc::new(|_| None), + Arc::new(|attempt| if attempt > 3 { + None + } else { + // 100ms 400ms 1.6s 6.4s + Some(Duration::from_millis(100 * 4u64.pow(attempt as u32))) + }), + ); + rmb.arced() } };