mirror of
https://github.com/facebook/sapling.git
synced 2024-10-04 22:07:44 +03:00
Mononoke LFS: allow sharding content across > 1 host
Summary: To ensure that a given piece of content is sharded across multiple tasks (controlled by the tasks_per_content config parameter), each piece of content can have multiple routing keys. These are the SHA256 (as before) or the SHA256 suffixed with "-1", "-2", etc. When the routing key is generated for an item, one of them is picked at random. Added a config constraint to ensure that the tasks_per_content parameter is at least 1. Reviewed By: krallin Differential Revision: D25886402 fbshipit-source-id: fb8911dad07d2f0b6bbf57b4ede084428fe6c49d
This commit is contained in:
parent
bd4c512f2c
commit
591b6782f2
@ -1,4 +1,4 @@
|
||||
// @generated SignedSource<<98da7e38c82c4ef264e73736e23327a8>>
|
||||
// @generated SignedSource<<d80c7fdee9114fc90a27c2fe9644691f>>
|
||||
// DO NOT EDIT THIS FILE MANUALLY!
|
||||
// This file is a mechanical copy of the version in the configerator repo. To
|
||||
// modify it, edit the copy in the configerator repo instead and copy it over by
|
||||
@ -68,4 +68,7 @@ struct LfsServerConfig {
|
||||
10: i64 object_popularity_threshold;
|
||||
|
||||
11: optional ObjectPopularity object_popularity;
|
||||
|
||||
// The number of tasks to receive given content.
|
||||
12: i16 tasks_per_content;
|
||||
}
|
||||
|
@ -20,11 +20,13 @@ use gotham_ext::{
|
||||
use http::header::HeaderMap;
|
||||
use hyper::{Body, StatusCode};
|
||||
use maplit::hashmap;
|
||||
use rand::Rng;
|
||||
use redactedblobstore::has_redaction_root_cause;
|
||||
use serde::Deserialize;
|
||||
use slog::debug;
|
||||
use stats::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroU16;
|
||||
use std::time::Instant;
|
||||
use time_ext::DurationExt;
|
||||
use time_window_counter::GlobalTimeWindowCounterBuilder;
|
||||
@ -222,6 +224,18 @@ async fn resolve_internal_object(
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Builds the routing key for `oid`, choosing one of `tasks_per_content`
/// candidate keys at random.
///
/// The candidates are the bare SHA256 (shard 0) and the SHA256 followed by
/// `-N` for every shard `N` in `1..tasks_per_content`.
fn generate_routing_key(tasks_per_content: NonZeroU16, oid: Sha256) -> String {
    // Sample a shard index in `0..tasks_per_content`; the upper bound is
    // exclusive and, being non-zero, always leaves at least shard 0.
    match rand::thread_rng().gen_range(0, tasks_per_content.get()) {
        // The base shard keeps the plain hash as its routing key.
        0 => oid.to_string(),
        // Every other shard has its index appended as a trailing "-N".
        shard => format!("{}-{}", oid, shard),
    }
}
|
||||
|
||||
async fn internal_objects(
|
||||
ctx: &RepositoryRequestContext,
|
||||
objects: &[RequestObject],
|
||||
@ -252,8 +266,10 @@ async fn internal_objects(
|
||||
Some(stored) => {
|
||||
let uri = if allow_consistent_routing && ctx.config.enable_consistent_routing()
|
||||
{
|
||||
let routing_key =
|
||||
generate_routing_key(ctx.config.tasks_per_content(), oid.0.into());
|
||||
ctx.uri_builder
|
||||
.consistent_download_uri(&stored.id, oid.0.into())
|
||||
.consistent_download_uri(&stored.id, routing_key)
|
||||
} else {
|
||||
ctx.uri_builder.download_uri(&stored.id)
|
||||
};
|
||||
@ -700,6 +716,25 @@ mod test {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Checks that `generate_routing_key` only ever yields the bare hash or the
/// hash with a `-1` suffix when configured with one or two tasks.
#[test]
fn test_routing_keys() -> Result<(), Error> {
    // The complete set of keys that may be produced for up to two tasks.
    let base_key = ONES_SHA256.to_string();
    let suffixed_key = format!("{}-1", base_key);

    // Single task: the only possible shard is 0, so the key is the bare hash.
    let one_task = NonZeroU16::new(1).unwrap();
    assert_eq!(generate_routing_key(one_task, ONES_SHA256), base_key);

    // Two tasks: the shard is random, so sample several times and require
    // every result to be one of the two allowed keys.
    let two_tasks = NonZeroU16::new(2).unwrap();
    let allowed = [base_key, suffixed_key];
    for _ in 0..5 {
        let key = generate_routing_key(two_tasks, ONES_SHA256);
        assert!(allowed.contains(&key));
    }

    Ok(())
}
|
||||
|
||||
#[test]
|
||||
fn test_download_upstream_failed_and_its_ok() -> Result<(), Error> {
|
||||
let o1 = obj(ONES_HASH, 111)?;
|
||||
|
@ -13,6 +13,7 @@ use serde::ser::Serializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::default::Default;
|
||||
use std::num::NonZeroU16;
|
||||
use std::str::FromStr;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@ -77,6 +78,7 @@ pub struct ServerConfig {
|
||||
pub raw_server_config: lfs_server_config::LfsServerConfig,
|
||||
throttle_limits: Vec<Limit>,
|
||||
object_popularity: Option<ObjectPopularity>,
|
||||
tasks_per_content: NonZeroU16,
|
||||
}
|
||||
|
||||
impl TryFrom<lfs_server_config::LfsServerConfig> for ServerConfig {
|
||||
@ -98,10 +100,19 @@ impl TryFrom<lfs_server_config::LfsServerConfig> for ServerConfig {
|
||||
.transpose()
|
||||
.with_context(|| "Invalid object popularity")?;
|
||||
|
||||
let tasks_per_content = value
|
||||
.tasks_per_content
|
||||
.try_into()
|
||||
.with_context(|| "tasks_per_content is < 0")?;
|
||||
|
||||
let tasks_per_content =
|
||||
NonZeroU16::new(tasks_per_content).with_context(|| "tasks_per_content is 0")?;
|
||||
|
||||
Ok(Self {
|
||||
raw_server_config: value,
|
||||
throttle_limits,
|
||||
object_popularity,
|
||||
tasks_per_content,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -138,12 +149,14 @@ impl Default for ServerConfig {
|
||||
// TODO: Remove those once they're gone from Thrift configs.
|
||||
object_popularity_category: Default::default(),
|
||||
object_popularity_threshold: Default::default(),
|
||||
tasks_per_content: 1,
|
||||
};
|
||||
|
||||
Self {
|
||||
raw_server_config,
|
||||
throttle_limits: vec![],
|
||||
object_popularity: None,
|
||||
tasks_per_content: NonZeroU16::new(1).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -171,6 +184,9 @@ impl ServerConfig {
|
||||
pub fn object_popularity_mut(&mut self) -> &mut Option<ObjectPopularity> {
|
||||
&mut self.object_popularity
|
||||
}
|
||||
pub fn tasks_per_content(&self) -> NonZeroU16 {
|
||||
self.tasks_per_content
|
||||
}
|
||||
}
|
||||
|
||||
impl Limit {
|
||||
|
@ -38,7 +38,6 @@ use hyper::{client::HttpConnector, Client};
|
||||
use hyper_openssl::HttpsConnector;
|
||||
use lfs_protocol::{RequestBatch, RequestObject, ResponseBatch};
|
||||
use metaconfig_types::RepoConfig;
|
||||
use mononoke_types::hash::Sha256;
|
||||
use mononoke_types::ContentId;
|
||||
|
||||
use crate::config::ServerConfig;
|
||||
@ -378,13 +377,13 @@ impl UriBuilder {
|
||||
pub fn consistent_download_uri(
|
||||
&self,
|
||||
content_id: &ContentId,
|
||||
oid: Sha256,
|
||||
routing_key: String,
|
||||
) -> Result<Uri, Error> {
|
||||
self.server
|
||||
.self_uri
|
||||
.build(format_args!(
|
||||
"{}/download/{}?routing={}",
|
||||
&self.repository, content_id, oid
|
||||
&self.repository, content_id, routing_key
|
||||
))
|
||||
.context(ErrorKind::UriBuilderFailed("consistent_download_uri"))
|
||||
.map_err(Error::from)
|
||||
@ -646,7 +645,7 @@ mod test {
|
||||
fn test_basic_consistent_download_uri() -> Result<(), Error> {
|
||||
let b = uri_builder("http://foo.com", Some("http://bar.com"))?;
|
||||
assert_eq!(
|
||||
b.consistent_download_uri(&content_id()?, oid()?)?
|
||||
b.consistent_download_uri(&content_id()?, format!("{}", oid()?))?
|
||||
.to_string(),
|
||||
format!(
|
||||
"http://foo.com/repo123/download/{}?routing={}",
|
||||
|
@ -21,7 +21,8 @@
|
||||
> "enable_consistent_routing": false,
|
||||
> "disable_hostname_logging": false,
|
||||
> "throttle_limits": [],
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
|
@ -17,7 +17,8 @@
|
||||
> "disable_hostname_logging": false,
|
||||
> "throttle_limits": [],
|
||||
> "acl_check": false,
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
|
@ -16,7 +16,8 @@
|
||||
> "enable_consistent_routing": false,
|
||||
> "disable_hostname_logging": true,
|
||||
> "throttle_limits": [],
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
@ -34,6 +35,7 @@
|
||||
"object_popularity": null,
|
||||
"object_popularity_category": "",
|
||||
"object_popularity_threshold": 0,
|
||||
"tasks_per_content": 1,
|
||||
"throttle_limits": [],
|
||||
"track_bytes_sent": true
|
||||
}
|
||||
@ -56,7 +58,8 @@
|
||||
> "enable_consistent_routing": false,
|
||||
> "disable_hostname_logging": false,
|
||||
> "throttle_limits": [],
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
|
@ -16,7 +16,8 @@
|
||||
> "enable_consistent_routing": false,
|
||||
> "disable_hostname_logging": false,
|
||||
> "throttle_limits": [],
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
@ -33,6 +34,7 @@
|
||||
"object_popularity": null,
|
||||
"object_popularity_category": "",
|
||||
"object_popularity_threshold": 0,
|
||||
"tasks_per_content": 1,
|
||||
"throttle_limits": [],
|
||||
"track_bytes_sent": true
|
||||
}
|
||||
@ -44,7 +46,8 @@
|
||||
> "enable_consistent_routing": false,
|
||||
> "disable_hostname_logging": false,
|
||||
> "throttle_limits": [],
|
||||
> "enforce_acl_check": false
|
||||
> "enforce_acl_check": false,
|
||||
> "tasks_per_content": 1
|
||||
> }
|
||||
> EOF
|
||||
|
||||
@ -62,6 +65,7 @@
|
||||
"object_popularity": null,
|
||||
"object_popularity_category": "",
|
||||
"object_popularity_threshold": 0,
|
||||
"tasks_per_content": 1,
|
||||
"throttle_limits": [],
|
||||
"track_bytes_sent": false
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user