mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 08:47:12 +03:00
mononoke: update throttledblob to use governor crate
Summary: governor supercedes ratelimit_meter and provides async apis which means we don't need to use our own async_limiter version. This is in preparation for the next diff D26021464 which uses governor's update_n_ready() api for byte rate limiting, rather than adding it to AsyncLimiter in D26021464. Reviewed By: krallin Differential Revision: D26153156 fbshipit-source-id: c0b79baee3b71c770353152c6d7c63f616171c86
This commit is contained in:
parent
45fb30f125
commit
6b074bd668
@ -7,10 +7,9 @@ license = "GPLv2+"
|
||||
include = ["src/*.rs"]
|
||||
|
||||
[dependencies]
|
||||
async_limiter = { path = "../../common/async_limiter", version = "0.1.0" }
|
||||
blobstore = { path = "..", version = "0.1.0" }
|
||||
context = { path = "../../server/context", version = "0.1.0" }
|
||||
mononoke_types = { path = "../../mononoke_types", version = "0.1.0" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.29"
|
||||
ratelimit_meter = "5"
|
||||
governor = "0.3.2"
|
||||
|
@ -9,11 +9,12 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
use async_limiter::AsyncLimiter;
|
||||
use ratelimit_meter::{algorithms::LeakyBucket, DirectRateLimiter};
|
||||
use governor::{
|
||||
clock::DefaultClock,
|
||||
state::{direct::NotKeyed, InMemoryState},
|
||||
Jitter, Quota, RateLimiter,
|
||||
};
|
||||
use std::{fmt, num::NonZeroU32, time::Duration};
|
||||
|
||||
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
|
||||
use context::CoreContext;
|
||||
@ -34,25 +35,26 @@ impl ThrottleOptions {
|
||||
/// A Blobstore that rate limits the number of read and write operations.
|
||||
pub struct ThrottledBlob<T: fmt::Debug> {
|
||||
blobstore: T,
|
||||
read_qps_limiter: Option<AsyncLimiter>,
|
||||
write_qps_limiter: Option<AsyncLimiter>,
|
||||
read_qps_limiter: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
|
||||
write_qps_limiter: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
|
||||
/// The options fields are used for Debug. They are not consulted at runtime.
|
||||
options: ThrottleOptions,
|
||||
}
|
||||
|
||||
async fn limiter(limit: Option<NonZeroU32>) -> Option<AsyncLimiter> {
|
||||
match limit {
|
||||
Some(limit) => {
|
||||
Some(AsyncLimiter::new(DirectRateLimiter::<LeakyBucket>::per_second(limit)).await)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
static JITTER_MAX: Duration = Duration::from_millis(5);
|
||||
|
||||
fn jitter() -> Jitter {
|
||||
Jitter::up_to(JITTER_MAX)
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug + Send + Sync> ThrottledBlob<T> {
|
||||
pub async fn new(blobstore: T, options: ThrottleOptions) -> Self {
|
||||
let read_qps_limiter = limiter(options.read_qps).await;
|
||||
let write_qps_limiter = limiter(options.write_qps).await;
|
||||
let read_qps_limiter = options
|
||||
.read_qps
|
||||
.map(|qps| RateLimiter::direct(Quota::per_second(qps)));
|
||||
let write_qps_limiter = options
|
||||
.write_qps
|
||||
.map(|qps| RateLimiter::direct(Quota::per_second(qps)));
|
||||
Self {
|
||||
blobstore,
|
||||
read_qps_limiter,
|
||||
@ -70,7 +72,7 @@ impl<T: Blobstore> Blobstore for ThrottledBlob<T> {
|
||||
key: &'a str,
|
||||
) -> Result<Option<BlobstoreGetData>> {
|
||||
if let Some(limiter) = self.read_qps_limiter.as_ref() {
|
||||
limiter.access().await?;
|
||||
limiter.until_ready_with_jitter(jitter()).await;
|
||||
}
|
||||
self.blobstore.get(ctx, key).await
|
||||
}
|
||||
@ -82,14 +84,14 @@ impl<T: Blobstore> Blobstore for ThrottledBlob<T> {
|
||||
value: BlobstoreBytes,
|
||||
) -> Result<()> {
|
||||
if let Some(limiter) = self.write_qps_limiter.as_ref() {
|
||||
limiter.access().await?;
|
||||
limiter.until_ready_with_jitter(jitter()).await;
|
||||
}
|
||||
self.blobstore.put(ctx, key, value).await
|
||||
}
|
||||
|
||||
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
|
||||
if let Some(limiter) = self.read_qps_limiter.as_ref() {
|
||||
limiter.access().await?;
|
||||
limiter.until_ready_with_jitter(jitter()).await;
|
||||
}
|
||||
self.blobstore.is_present(ctx, key).await
|
||||
}
|
||||
@ -105,7 +107,7 @@ impl<T: BlobstorePutOps> BlobstorePutOps for ThrottledBlob<T> {
|
||||
put_behaviour: PutBehaviour,
|
||||
) -> Result<OverwriteStatus> {
|
||||
if let Some(limiter) = self.write_qps_limiter.as_ref() {
|
||||
limiter.access().await?;
|
||||
limiter.until_ready_with_jitter(jitter()).await;
|
||||
}
|
||||
self.blobstore
|
||||
.put_explicit(ctx, key, value, put_behaviour)
|
||||
@ -119,7 +121,7 @@ impl<T: BlobstorePutOps> BlobstorePutOps for ThrottledBlob<T> {
|
||||
value: BlobstoreBytes,
|
||||
) -> Result<OverwriteStatus> {
|
||||
if let Some(limiter) = self.write_qps_limiter.as_ref() {
|
||||
limiter.access().await?;
|
||||
limiter.until_ready_with_jitter(jitter()).await;
|
||||
}
|
||||
self.blobstore.put_with_status(ctx, key, value).await
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user