mononoke: context_concurrency_blobstore

Summary:
This adds a blobstore that can reach into a CoreContext in order to identify
the allowed level of concurrency for blobstore requests initiated by this
CoreContext. This will let us replay infinitepush bundles with limits on a
per-request basis.

Reviewed By: farnz

Differential Revision: D20038575

fbshipit-source-id: 07299701879b7ae65ad9b7ff6e991ceddf062b24
This commit is contained in:
Thomas Orozco 2020-03-11 08:50:37 -07:00 committed by Facebook GitHub Bot
parent 52380d76a5
commit c5917acc3f
6 changed files with 235 additions and 3 deletions

View File

@ -0,0 +1,215 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use blobstore::Blobstore;
use cloned::cloned;
use context::CoreContext;
use futures::{compat::Future01CompatExt, FutureExt as _, TryFutureExt};
use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::BlobstoreBytes;
use scopeguard::defer;
/// A layer over an existing blobstore that respects a CoreContext's blobstore concurrency
#[derive(Clone, Debug)]
pub struct ContextConcurrencyBlobstore<T: Blobstore + Clone> {
    // Wrapped blobstore; every request is forwarded to it after a concurrency
    // permit (if the session configured one) has been acquired.
    blobstore: T,
}
impl<T: Blobstore + Clone> ContextConcurrencyBlobstore<T> {
pub fn new(blobstore: T) -> Self {
Self { blobstore }
}
}
impl<T: Blobstore + Clone> Blobstore for ContextConcurrencyBlobstore<T> {
    /// Fetch `key`, first acquiring a concurrency permit from the context's
    /// session if one is configured.
    fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
        cloned!(self.blobstore);
        async move {
            // NOTE: We need to clone() here because the context cannot be borrowed when we pass it
            // down. We should eventually be able to get rid of this.
            let session = ctx.session().clone();
            // Holding the permit in a binding keeps it alive across the inner
            // call; it is released when the binding drops at the end of this
            // async block, so no explicit scopeguard is needed.
            let _permit = match session.blobstore_semaphore() {
                Some(sem) => Some(sem.acquire().await),
                None => None,
            };
            blobstore.get(ctx, key).compat().await
        }
        .boxed()
        .compat()
        .boxify()
    }

    /// Store `value` under `key`, subject to the same per-session
    /// concurrency limit as `get`.
    fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
        cloned!(self.blobstore);
        async move {
            let session = ctx.session().clone();
            // Permit is released when `_permit` drops at scope end, i.e. only
            // after the inner put has completed.
            let _permit = match session.blobstore_semaphore() {
                Some(sem) => Some(sem.acquire().await),
                None => None,
            };
            blobstore.put(ctx, key, value).compat().await
        }
        .boxed()
        .compat()
        .boxify()
    }

    /// Check whether `key` is present, subject to the same per-session
    /// concurrency limit as `get`.
    fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<bool, Error> {
        cloned!(self.blobstore);
        async move {
            let session = ctx.session().clone();
            // Permit is released when `_permit` drops at scope end, i.e. only
            // after the inner is_present has completed.
            let _permit = match session.blobstore_semaphore() {
                Some(sem) => Some(sem.acquire().await),
                None => None,
            };
            blobstore.is_present(ctx, key).compat().await
        }
        .boxed()
        .compat()
        .boxify()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use context::SessionContainer;
    use fbinit::FacebookInit;
    use scuba_ext::ScubaSampleBuilder;
    use slog::{o, Drain, Level, Logger};
    use slog_glog_fmt::default_drain;
    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };
    use std::time::Duration;
    use tokio::time;

    /// Test blobstore that panics if more than one request is in flight at a
    /// time, letting the test below prove the semaphore serializes access.
    #[derive(Clone, Debug)]
    struct NonConcurrentBlobstore(Arc<AtomicU64>);

    impl NonConcurrentBlobstore {
        fn new() -> Self {
            Self(Arc::new(AtomicU64::new(0)))
        }
    }

    impl Blobstore for NonConcurrentBlobstore {
        fn get(&self, _ctx: CoreContext, _key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
            let ctr = self.0.clone();
            // A second caller entering while a request is in flight sees a
            // non-zero counter and panics.
            if ctr.fetch_add(1, Ordering::Relaxed) > 0 {
                panic!("No!");
            }
            async move {
                // Stay "in flight" long enough for a concurrent caller to
                // overlap, then mark the request complete.
                time::delay_for(Duration::from_millis(10)).await;
                ctr.fetch_sub(1, Ordering::Relaxed);
                Ok(None)
            }
            .boxed()
            .compat()
            .boxify()
        }

        fn put(
            &self,
            _ctx: CoreContext,
            _key: String,
            _value: BlobstoreBytes,
        ) -> BoxFuture<(), Error> {
            let ctr = self.0.clone();
            if ctr.fetch_add(1, Ordering::Relaxed) > 0 {
                panic!("No!");
            }
            async move {
                time::delay_for(Duration::from_millis(10)).await;
                ctr.fetch_sub(1, Ordering::Relaxed);
                Ok(())
            }
            .boxed()
            .compat()
            .boxify()
        }

        fn is_present(&self, _ctx: CoreContext, _key: String) -> BoxFuture<bool, Error> {
            let ctr = self.0.clone();
            if ctr.fetch_add(1, Ordering::Relaxed) > 0 {
                panic!("No!");
            }
            async move {
                // Delay *before* decrementing, like get/put: decrementing
                // first would end the "in flight" window immediately and let
                // overlapping is_present calls go undetected.
                time::delay_for(Duration::from_millis(10)).await;
                ctr.fetch_sub(1, Ordering::Relaxed);
                Ok(false)
            }
            .boxed()
            .compat()
            .boxify()
        }
    }

    fn logger() -> Logger {
        let drain = default_drain().filter_level(Level::Debug).ignore_res();
        Logger::root(drain, o![])
    }

    #[fbinit::test]
    async fn test_semaphore(fb: FacebookInit) -> Result<(), Error> {
        // Concurrency of 1: the wrapped blobstore must never see two
        // simultaneous requests, even though we join pairs of calls below.
        let session = SessionContainer::builder(fb)
            .blobstore_concurrency(1)
            .build();
        let ctx = session.new_context(logger(), ScubaSampleBuilder::with_discard());

        let blob = ContextConcurrencyBlobstore::new(NonConcurrentBlobstore::new());

        let res = futures::future::try_join(
            blob.get(ctx.clone(), "foo".to_string()).compat(),
            blob.get(ctx.clone(), "foo".to_string()).compat(),
        )
        .await?;
        assert_eq!(res, (None, None));

        let bytes = BlobstoreBytes::from_bytes("test foobar");
        let res = futures::future::try_join(
            blob.put(ctx.clone(), "foo".to_string(), bytes.clone())
                .compat(),
            blob.put(ctx.clone(), "foo".to_string(), bytes.clone())
                .compat(),
        )
        .await?;
        assert_eq!(res, ((), ()));

        let res = futures::future::try_join(
            blob.is_present(ctx.clone(), "foo".to_string()).compat(),
            blob.is_present(ctx.clone(), "foo".to_string()).compat(),
        )
        .await?;
        assert_eq!(res, (false, false));

        Ok(())
    }
}

View File

@ -22,6 +22,7 @@ use blobstore::ErrorKind;
use blobstore::{Blobstore, DisabledBlob};
use blobstore_sync_queue::SqlBlobstoreSyncQueue;
use chaosblob::ChaosBlobstore;
use context_concurrency_blobstore::ContextConcurrencyBlobstore;
use fileblob::Fileblob;
use itertools::Either;
use manifoldblob::ThriftManifoldBlob;
@ -476,6 +477,8 @@ pub fn make_blobstore(
};
store
.map(|inner| Arc::new(ContextConcurrencyBlobstore::new(inner)) as Arc<dyn Blobstore>)
.boxify()
}
pub fn make_blobstore_multiplexed(

View File

@ -27,3 +27,4 @@ futures = "0.1"
rand = { version = "0.7", features = ["small_rng"] }
serde_json = "1.0"
slog = { version="2.5", features=["max_level_debug"] }
tokio = { version = "0.2", features = ["full"] }

View File

@ -10,6 +10,7 @@ use rand::{self, distributions::Alphanumeric, thread_rng, Rng};
use session_id::SessionId;
use sshrelay::SshEnvVars;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::TraceContext;
use super::{SessionContainer, SessionContainerInner};
@ -43,6 +44,7 @@ impl SessionContainerBuilder {
user_unix_name: None,
source_hostname: None,
ssh_env_vars: SshEnvVars::default(),
blobstore_semaphore: None,
#[cfg(fbcode_build)]
facebook_data: SessionFacebookData::default(),
},
@ -74,6 +76,11 @@ impl SessionContainerBuilder {
self
}
pub fn blobstore_concurrency(mut self, concurrency: usize) -> Self {
self.inner.blobstore_semaphore = Some(Semaphore::new(concurrency));
self
}
#[cfg(fbcode_build)]
pub(crate) fn facebook_data(&mut self) -> &mut SessionFacebookData {
&mut self.inner.facebook_data

View File

@ -11,6 +11,7 @@ use session_id::SessionId;
use slog::Logger;
use sshrelay::SshEnvVars;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::TraceContext;
pub use self::builder::{generate_session_id, SessionContainerBuilder};
@ -34,6 +35,7 @@ struct SessionContainerInner {
user_unix_name: Option<String>,
source_hostname: Option<String>,
ssh_env_vars: SshEnvVars,
blobstore_semaphore: Option<Semaphore>,
#[cfg(fbcode_build)]
facebook_data: SessionFacebookData,
}
@ -77,6 +79,10 @@ impl SessionContainer {
&self.inner.ssh_env_vars
}
pub fn blobstore_semaphore(&self) -> Option<&Semaphore> {
self.inner.blobstore_semaphore.as_ref()
}
#[cfg(fbcode_build)]
pub(crate) fn facebook_data(&self) -> &SessionFacebookData {
&self.inner.facebook_data

View File

@ -42,7 +42,7 @@ Check bookmarks
Check blobstore-fetch, normal
$ mononoke_admin blobstore-fetch changeset.blake2.c3384961b16276f2db77df9d7c874bbe981cf0525bd6f84a502f919044f2dabd 2>&1 | strip_glog
using blobstore: MultiplexedBlobstore* (glob)
using blobstore: *MultiplexedBlobstore* (glob)
Some(BlobstoreBytes(* (glob)
Check blobstore-fetch, with scrub actions
@ -53,14 +53,14 @@ Check blobstore-fetch, with scrub actions
29
$ mononoke_admin blobstore-fetch --scrub-blobstore-action=ReportOnly changeset.blake2.c3384961b16276f2db77df9d7c874bbe981cf0525bd6f84a502f919044f2dabd 2>&1 | strip_glog
using blobstore: ScrubBlobstore* (glob)
using blobstore: *ScrubBlobstore* (glob)
scrub: blobstore_id BlobstoreId(1) not repaired for repo0000.changeset.blake2.c3384961b16276f2db77df9d7c874bbe981cf0525bd6f84a502f919044f2dabd
Some(BlobstoreBytes(* (glob)
$ ls blobstore/1/blobs | wc -l
29
$ mononoke_admin blobstore-fetch --scrub-blobstore-action=Repair changeset.blake2.c3384961b16276f2db77df9d7c874bbe981cf0525bd6f84a502f919044f2dabd 2>&1 | strip_glog
using blobstore: ScrubBlobstore* (glob)
using blobstore: *ScrubBlobstore* (glob)
scrub: blobstore_id BlobstoreId(1) repaired for repo0000.changeset.blake2.c3384961b16276f2db77df9d7c874bbe981cf0525bd6f84a502f919044f2dabd
Some(BlobstoreBytes(* (glob)
$ ls blobstore/1/blobs | wc -l