mononoke: replace DelayBlob with DelayedBlobstore

Summary:
We didn't use DelayBlob at all, however we use DelayedBlobstore in benchmark
lib. DelayedBlobstore seem to have more useful options, so let's remove
DelayBlob and use DelayedBlobstore instead.

Reviewed By: farnz

Differential Revision: D20245865

fbshipit-source-id: bd694a0e178367014adc2776185450693f87475d
This commit is contained in:
Stanislau Hlebik 2020-03-04 12:45:39 -08:00 committed by Facebook Github Bot
parent c008ba8513
commit 2fddb7e1e4
2 changed files with 42 additions and 107 deletions

View File

@ -21,6 +21,7 @@ use cacheblob::{dummy::DummyLease, new_cachelib_blobstore};
use changesets::{CachingChangesets, ChangesetEntry, ChangesetInsert, Changesets, SqlChangesets};
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use delayblob::DelayedBlobstore;
use fbinit::FacebookInit;
use filenodes::{FilenodeInfo, Filenodes, PreparedFilenode};
use filestore::FilestoreConfig;
@ -29,8 +30,7 @@ use futures_old::{future, Future};
use memblob::EagerMemblob;
use mercurial_types::{HgChangesetIdPrefix, HgChangesetIdsResolvedFromPrefix, HgFileNodeId};
use mononoke_types::{
BlobstoreBytes, ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepoPath,
RepositoryId,
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepoPath, RepositoryId,
};
use newfilenodes::NewFilenodesBuilder;
use phases::{SqlPhasesFactory, SqlPhasesStore};
@ -172,41 +172,6 @@ where
})
}
#[derive(Debug)]
struct DelayedBlobstore<B> {
inner: B,
get_dist: Normal,
put_dist: Normal,
}
impl<B> DelayedBlobstore<B> {
fn new(inner: B, get_dist: Normal, put_dist: Normal) -> Self {
Self {
inner,
get_dist,
put_dist,
}
}
}
impl<B: Blobstore> Blobstore for DelayedBlobstore<B> {
fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
delay(self.get_dist, self.inner.get(ctx, key)).boxify()
}
fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
delay(self.put_dist, self.inner.put(ctx, key, value)).boxify()
}
fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<bool, Error> {
delay(self.get_dist, self.inner.is_present(ctx, key)).boxify()
}
fn assert_present(&self, ctx: CoreContext, key: String) -> BoxFuture<(), Error> {
delay(self.get_dist, self.inner.assert_present(ctx, key)).boxify()
}
}
struct DelayedFilenodes<F> {
inner: F,
get_dist: Normal,

View File

@ -7,104 +7,74 @@
#![deny(warnings)]
use std::fmt;
use std::iter::{repeat, Map, Repeat};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::time::Duration;
use crate::future::lazy;
use anyhow::Error;
use futures_ext::{BoxFuture, FutureExt};
use tokio::prelude::*;
use tokio::timer::Delay;
use futures::future::FutureExt;
use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
use futures_old::future::Future;
use futures_util::future::TryFutureExt;
use rand::Rng;
use rand_distr::Distribution;
use blobstore::Blobstore;
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
/// A blobstore that imposes a delay on all its operations, where the delay is generated by a
/// passed in function or closure
pub struct DelayBlob<F>
where
F: FnMut(()) -> Duration + 'static + Send + Sync,
{
blobstore: Box<dyn Blobstore>,
delay: Mutex<Map<Repeat<()>, F>>,
get_roundtrips: usize,
put_roundtrips: usize,
is_present_roundtrips: usize,
assert_present_roundtrips: usize,
pub type Normal = rand_distr::Normal<f64>;
#[derive(Debug)]
pub struct DelayedBlobstore<B> {
inner: B,
get_dist: Normal,
put_dist: Normal,
}
impl<F> DelayBlob<F>
where
F: FnMut(()) -> Duration + 'static + Send + Sync,
{
pub fn new(
blobstore: Box<dyn Blobstore>,
delay_gen: F,
get_roundtrips: usize,
put_roundtrips: usize,
is_present_roundtrips: usize,
assert_present_roundtrips: usize,
) -> Self {
impl<B> DelayedBlobstore<B> {
pub fn new(inner: B, get_dist: Normal, put_dist: Normal) -> Self {
Self {
blobstore,
delay: Mutex::new(repeat(()).map(delay_gen)),
get_roundtrips,
put_roundtrips,
is_present_roundtrips,
assert_present_roundtrips,
inner,
get_dist,
put_dist,
}
}
fn sleep(&self, roundtrips: usize) -> impl Future<Item = (), Error = Error> + 'static {
let mut locked_delay = self.delay.lock().expect("lock poisoned");
let delay = locked_delay.by_ref().take(roundtrips).sum();
lazy(move || Delay::new(Instant::now() + delay)).map_err(Error::from)
}
}
impl<F> Blobstore for DelayBlob<F>
where
F: FnMut(()) -> Duration + 'static + Send + Sync,
{
impl<B: Blobstore> Blobstore for DelayedBlobstore<B> {
fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let sleep = self.sleep(self.get_roundtrips);
let get = self.blobstore.get(ctx, key);
sleep.and_then(move |_| get).boxify()
delay(self.get_dist, self.inner.get(ctx, key)).boxify()
}
fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
let sleep = self.sleep(self.put_roundtrips);
let put = self.blobstore.put(ctx, key, value);
sleep.and_then(move |_| put).boxify()
delay(self.put_dist, self.inner.put(ctx, key, value)).boxify()
}
fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<bool, Error> {
let sleep = self.sleep(self.is_present_roundtrips);
let is_present = self.blobstore.is_present(ctx, key);
sleep.and_then(move |_| is_present).boxify()
delay(self.get_dist, self.inner.is_present(ctx, key)).boxify()
}
fn assert_present(&self, ctx: CoreContext, key: String) -> BoxFuture<(), Error> {
let sleep = self.sleep(self.assert_present_roundtrips);
let assert_present = self.blobstore.assert_present(ctx, key);
sleep.and_then(move |_| assert_present).boxify()
delay(self.get_dist, self.inner.assert_present(ctx, key)).boxify()
}
}
impl<F> fmt::Debug for DelayBlob<F>
fn delay<F, D>(distribution: D, target: F) -> impl Future<Item = F::Item, Error = Error>
where
F: FnMut(()) -> Duration + 'static + Send + Sync,
D: Distribution<f64>,
F: Future<Error = Error>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DelayBlob")
.field("blobstore", &self.blobstore)
.field("get_roundtrips", &self.get_roundtrips)
.field("put_roundtrips", &self.put_roundtrips)
.field("is_present_roundtrips", &self.is_present_roundtrips)
.field("assert_present_roundtrips", &self.assert_present_roundtrips)
.finish()
let seconds = rand::thread_rng().sample(distribution).abs();
async move {
tokio::time::delay_for(Duration::new(
seconds.trunc() as u64,
(seconds.fract() * 1e+9) as u32,
))
.await;
let res: Result<_, Error> = Ok(());
res
}
.boxed()
.compat()
.and_then(|()| target)
}