mononoke: support many blobstores in metaconfig

Summary: Restructure the configs so that we can specify more than one blobstore

Reviewed By: lukaspiatkowski

Differential Revision: D13234286

fbshipit-source-id: a98ede17921ed6148add570288ac23636b086398
This commit is contained in:
Stanislau Hlebik 2018-12-05 05:56:00 -08:00 committed by Facebook Github Bot
parent a51cb1dacf
commit 706e98799b
8 changed files with 187 additions and 73 deletions

View File

@ -23,8 +23,9 @@ use blobrepo::{BlobRepo, get_sha256_alias, get_sha256_alias_key};
use context::CoreContext;
use mercurial_types::{HgManifestId, RepositoryId};
use mercurial_types::manifest::Content;
use metaconfig::RemoteBlobstoreArgs;
use metaconfig::repoconfig::RepoConfig;
use metaconfig::repoconfig::RepoType::{BlobFiles, BlobManifold, BlobRocks};
use metaconfig::repoconfig::RepoType::{BlobFiles, BlobRemote, BlobRocks};
use mononoke_types::FileContents;
use reachabilityindex::{GenerationNumberBFS, ReachabilityIndex};
@ -56,24 +57,44 @@ impl MononokeRepo {
BlobRocks(path) => BlobRepo::new_rocksdb(logger.clone(), &path, repoid)
.into_future()
.left_future(),
BlobManifold(args) => match myrouter_port {
None => Err(err_msg(
"Missing myrouter port, unable to open BlobManifold repo",
)).into_future()
.left_future(),
Some(myrouter_port) => myrouter::wait_for_myrouter(myrouter_port, &args.db_address)
.and_then({
cloned!(logger);
move |()| {
BlobRepo::new_manifold_no_postcommit(
logger,
&args,
repoid,
myrouter_port,
)
BlobRemote{
ref blobstores_args,
ref db_address,
ref filenode_shards,
} => {
if blobstores_args.len() != 1 {
Err(err_msg("only single manifold blobstore is supported"))
.into_future()
.left_future()
} else {
let manifold_args = match blobstores_args.get(0).unwrap() {
RemoteBlobstoreArgs::Manifold(manifold_args) => {
manifold_args
}
})
.right_future(),
};
match myrouter_port {
None => Err(err_msg(
"Missing myrouter port, unable to open BlobRemote repo",
)).into_future()
.left_future(),
Some(myrouter_port) => myrouter::wait_for_myrouter(myrouter_port, &db_address)
.and_then({
cloned!(db_address, filenode_shards, logger, manifold_args);
move |()| {
BlobRepo::new_manifold_no_postcommit(
logger,
&manifold_args,
db_address.clone(),
filenode_shards.clone(),
repoid,
myrouter_port,
)
}
})
.right_future(),
}
}
},
_ => Err(err_msg("Unsupported repo type."))
.into_future()

View File

@ -282,6 +282,8 @@ impl BlobRepo {
pub fn new_manifold_scribe_commits<C>(
logger: Logger,
args: &ManifoldArgs,
db_address: String,
filenode_shards: Option<usize>,
repoid: RepositoryId,
myrouter_port: u16,
scribe: C,
@ -289,7 +291,14 @@ impl BlobRepo {
where
C: ScribeClient + Sync + Send + 'static,
{
let mut repo = Self::new_manifold_no_postcommit(logger, args, repoid, myrouter_port)?;
let mut repo = Self::new_manifold_no_postcommit(
logger,
args,
db_address,
filenode_shards,
repoid,
myrouter_port,
)?;
let category = format!("mononoke_commits");
repo.postcommit_queue = Arc::new(post_commit::LogToScribe::new(scribe, category));
Ok(repo)
@ -298,10 +307,12 @@ impl BlobRepo {
pub fn new_manifold_no_postcommit(
logger: Logger,
args: &ManifoldArgs,
db_address: String,
filenode_shards: Option<usize>,
repoid: RepositoryId,
myrouter_port: u16,
) -> Result<Self> {
let bookmarks = SqlBookmarks::with_myrouter(&args.db_address, myrouter_port);
let bookmarks = SqlBookmarks::with_myrouter(&db_address, myrouter_port);
let blobstore = ThriftManifoldBlob::new(args.bucket.clone())?;
let blobstore = PrefixBlobstore::new(blobstore, format!("flat/{}", args.prefix));
@ -315,11 +326,9 @@ impl BlobRepo {
))?);
let blobstore = Arc::new(new_cachelib_blobstore(blobstore, blob_pool, presence_pool));
let filenodes = match args.filenode_shards {
Some(shards) => {
SqlFilenodes::with_sharded_myrouter(&args.db_address, myrouter_port, shards)
}
None => SqlFilenodes::with_myrouter(&args.db_address, myrouter_port),
let filenodes = match filenode_shards {
Some(shards) => SqlFilenodes::with_sharded_myrouter(&db_address, myrouter_port, shards),
None => SqlFilenodes::with_myrouter(&db_address, myrouter_port),
};
let filenodes = CachingFilenodes::new(
Arc::new(filenodes),
@ -327,17 +336,17 @@ impl BlobRepo {
"filenodes".to_string(),
)))?,
"sqlfilenodes",
&args.db_address,
&db_address,
);
let changesets = SqlChangesets::with_myrouter(&args.db_address, myrouter_port);
let changesets = SqlChangesets::with_myrouter(&db_address, myrouter_port);
let changesets_cache_pool = cachelib::get_pool("changesets").ok_or(Error::from(
ErrorKind::MissingCachePool("changesets".to_string()),
))?;
let changesets = CachingChangests::new(Arc::new(changesets), changesets_cache_pool.clone());
let changesets = Arc::new(changesets);
let bonsai_hg_mapping = SqlBonsaiHgMapping::with_myrouter(&args.db_address, myrouter_port);
let bonsai_hg_mapping = SqlBonsaiHgMapping::with_myrouter(&db_address, myrouter_port);
let bonsai_hg_mapping = CachingBonsaiHgMapping::new(
Arc::new(bonsai_hg_mapping),
cachelib::get_pool("bonsai_hg_mapping").ok_or(Error::from(

View File

@ -26,7 +26,7 @@ use changesets::{SqlChangesets, SqlConstructors};
use context::CoreContext;
use hooks::HookManager;
use mercurial_types::RepositoryId;
use metaconfig::{ManifoldArgs, RepoConfigs, RepoReadOnly, RepoType};
use metaconfig::{ManifoldArgs, RemoteBlobstoreArgs, RepoConfigs, RepoReadOnly, RepoType};
use repo_client::{open_blobrepo, MononokeRepo};
const CACHE_ARGS: &[(&str, &str)] = &[
@ -231,13 +231,10 @@ pub fn open_sql_changesets(matches: &ArgMatches) -> Result<SqlChangesets> {
RepoType::BlobRocks(ref data_dir) => {
SqlChangesets::with_sqlite_path(data_dir.join("changesets"))
}
RepoType::BlobManifold(ref manifold_args) => {
RepoType::BlobRemote { ref db_address, .. } => {
let myrouter_port =
parse_myrouter_port(matches).expect("myrouter port provided is not provided");
Ok(SqlChangesets::with_myrouter(
&manifold_args.db_address,
myrouter_port,
))
Ok(SqlChangesets::with_myrouter(&db_address, myrouter_port))
}
RepoType::TestBlobDelayRocks(ref data_dir, ..) => {
SqlChangesets::with_sqlite_path(data_dir.join("changesets"))
@ -478,7 +475,15 @@ fn open_repo_internal<'a>(
setup_repo_dir(&data_dir, create).expect("Setting up rocksdb blobrepo failed");
logger.new(o!["BlobRepo:Rocksdb" => data_dir.to_string_lossy().into_owned()])
}
RepoType::BlobManifold(ref manifold_args) => {
RepoType::BlobRemote{ref blobstores_args, ..} => {
if blobstores_args.len() != 1 {
return Err(err_msg("only single manifold blobstore is supported"));
}
let manifold_args = match blobstores_args.get(0).unwrap() {
RemoteBlobstoreArgs::Manifold(manifold_args) => {
manifold_args
}
};
logger.new(o!["BlobRepo:Manifold" => manifold_args.bucket.clone()])
}
RepoType::TestBlobDelayRocks(ref data_dir, ..) => {
@ -512,12 +517,6 @@ pub fn parse_manifold_args<'a>(matches: &ArgMatches<'a>) -> ManifoldArgs {
ManifoldArgs {
bucket: matches.value_of("manifold-bucket").unwrap().to_string(),
prefix: matches.value_of("manifold-prefix").unwrap().to_string(),
db_address: matches.value_of("db-address").unwrap().to_string(),
filenode_shards: matches.value_of("filenode-shards").map(|shard_count| {
shard_count
.parse()
.expect("shard count must be a positive integer")
}),
}
}

View File

@ -154,9 +154,18 @@ fn create_blobrepo(logger: &Logger, matches: &ArgMatches) -> BlobRepo {
.expect("missing myrouter port")
.parse::<u16>()
.expect("myrouter port is not a valid u16");
let db_address = matches.value_of("db-address").unwrap().to_string();
let filenode_shards = matches.value_of("filenode-shards").map(|shard_count| {
shard_count
.parse()
.expect("shard count must be a positive integer")
});
BlobRepo::new_manifold_no_postcommit(
logger.clone(),
&manifold_args,
db_address,
filenode_shards,
RepositoryId::new(0),
myrouter_port,
).expect("failed to create blobrepo instance")

View File

@ -34,7 +34,7 @@ extern crate mononoke_types;
pub mod errors;
pub mod repoconfig;
pub use repoconfig::{CacheWarmupParams, LfsParams, ManifoldArgs, PushrebaseParams, RepoConfigs,
RepoReadOnly, RepoType};
pub use repoconfig::{CacheWarmupParams, LfsParams, ManifoldArgs, PushrebaseParams,
RemoteBlobstoreArgs, RepoConfigs, RepoReadOnly, RepoType};
pub use errors::{Error, ErrorKind};

View File

@ -24,10 +24,6 @@ pub struct ManifoldArgs {
pub bucket: String,
/// Prefix to be prepended to all the keys. In prod it should be ""
pub prefix: String,
/// Identifies the SQL database to connect to.
pub db_address: String,
/// If present, the number of shards to spread filenodes across
pub filenode_shards: Option<usize>,
}
/// Configuration of a single repository
@ -70,7 +66,7 @@ impl RepoConfig {
/// Returns a db address that is referenced in this config or None if there is none
pub fn get_db_address(&self) -> Option<&str> {
match self.repotype {
RepoType::BlobManifold(ref args) => Some(&args.db_address),
RepoType::BlobRemote { ref db_address, .. } => Some(&db_address),
_ => None,
}
}
@ -190,6 +186,13 @@ impl Default for LfsParams {
}
}
/// Remote blobstore arguments
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RemoteBlobstoreArgs {
/// Manifold arguments
Manifold(ManifoldArgs),
}
/// Types of repositories supported
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RepoType {
@ -202,7 +205,14 @@ pub enum RepoType {
/// RocksDb database
BlobRocks(PathBuf),
/// Blob repository with path pointing to the directory where a server socket is going to be.
BlobManifold(ManifoldArgs),
BlobRemote {
/// Arguments for the remote blobstores backing this repo
blobstores_args: Vec<RemoteBlobstoreArgs>,
/// Identifies the SQL database to connect to.
db_address: String,
/// If present, the number of shards to spread filenodes across
filenode_shards: Option<usize>,
},
/// Blob repository with path pointing to on-disk files with data. The files are stored in a
/// RocksDb database, and a log-normal delay is applied to access to simulate a remote store
/// like Manifold. Params are path, mean microseconds, stddev microseconds.
@ -380,16 +390,33 @@ impl RepoConfigs {
RawRepoType::Files => RepoType::BlobFiles(get_path(&this)?),
RawRepoType::BlobRocks => RepoType::BlobRocks(get_path(&this)?),
RawRepoType::BlobRemote => {
let manifold_bucket = this.manifold_bucket.ok_or(ErrorKind::InvalidConfig(
"manifold bucket must be specified".into(),
let remote_blobstores = this.remote_blobstore.ok_or(ErrorKind::InvalidConfig(
"remote blobstores must be specified".into(),
))?;
let mut blobstores_args = vec![];
let db_address = this.db_address.expect("xdb tier was not specified");
RepoType::BlobManifold(ManifoldArgs {
bucket: manifold_bucket,
prefix: this.manifold_prefix.unwrap_or("".into()),
for blobstore in remote_blobstores {
match blobstore.blobstore_type {
RawBlobstoreType::Manifold => {
let manifold_bucket =
blobstore.manifold_bucket.ok_or(ErrorKind::InvalidConfig(
"manifold bucket must be specified".into(),
))?;
let manifold_args = ManifoldArgs {
bucket: manifold_bucket,
prefix: blobstore.manifold_prefix.unwrap_or("".into()),
};
blobstores_args.push(RemoteBlobstoreArgs::Manifold(manifold_args));
}
}
}
RepoType::BlobRemote {
blobstores_args,
db_address,
filenode_shards: this.filenode_shards,
})
}
}
RawRepoType::TestBlobDelayRocks => RepoType::TestBlobDelayRocks(
get_path(&this)?,
@ -488,8 +515,6 @@ struct RawRepoConfig {
repotype: RawRepoType,
enabled: Option<bool>,
generation_cache_size: Option<usize>,
manifold_bucket: Option<String>,
manifold_prefix: Option<String>,
repoid: i32,
db_address: Option<String>,
filenode_shards: Option<usize>,
@ -507,6 +532,7 @@ struct RawRepoConfig {
readonly: Option<bool>,
hook_manager_params: Option<HookManagerParams>,
skiplist_index_blobstore_key: Option<String>,
remote_blobstore: Option<Vec<RawRemoteBlobstoreConfig>>,
}
#[derive(Debug, Deserialize, Clone)]
@ -541,6 +567,13 @@ struct RawHookConfig {
bypass_pushvar: Option<String>,
}
#[derive(Debug, Deserialize, Clone)]
struct RawRemoteBlobstoreConfig {
blobstore_type: RawBlobstoreType,
manifold_bucket: Option<String>,
manifold_prefix: Option<String>,
}
/// Types of repositories supported
#[derive(Clone, Debug, Deserialize)]
enum RawRepoType {
@ -550,6 +583,12 @@ enum RawRepoType {
#[serde(rename = "blob:testdelay")] TestBlobDelayRocks,
}
/// Types of blobstores supported
#[derive(Clone, Debug, Deserialize)]
enum RawBlobstoreType {
#[serde(rename = "manifold")] Manifold,
}
#[derive(Clone, Debug, Deserialize)]
struct RawPushrebaseParams {
rewritedates: Option<bool>,
@ -574,8 +613,8 @@ mod test {
let hook1_content = "this is hook1";
let hook2_content = "this is hook2";
let fbsource_content = r#"
path="/tmp/fbsource"
repotype="blob:rocks"
db_address="db_address"
repotype="blob:remote"
generation_cache_size=1048576
repoid=0
scuba_table="scuba_table"
@ -586,6 +625,13 @@ mod test {
[hook_manager_params]
entrylimit=1234
weightlimit=4321
[[remote_blobstore]]
blobstore_type="manifold"
manifold_bucket="bucket"
[[remote_blobstore]]
blobstore_type="manifold"
manifold_bucket="anotherbucket"
manifold_prefix="someprefix"
[[bookmarks]]
name="master"
[[bookmarks.hooks]]
@ -640,12 +686,27 @@ mod test {
let repoconfig = RepoConfigs::read_configs(tmp_dir.path()).expect("failed to read configs");
let first_manifold_args = ManifoldArgs {
bucket: "bucket".into(),
prefix: "".into(),
};
let second_manifold_args = ManifoldArgs {
bucket: "anotherbucket".into(),
prefix: "someprefix".into(),
};
let mut repos = HashMap::new();
repos.insert(
"fbsource".to_string(),
RepoConfig {
enabled: true,
repotype: RepoType::BlobRocks("/tmp/fbsource".into()),
repotype: RepoType::BlobRemote{
db_address: "db_address".into(),
blobstores_args: vec![
RemoteBlobstoreArgs::Manifold(first_manifold_args),
RemoteBlobstoreArgs::Manifold(second_manifold_args),
],
filenode_shards: None,
},
generation_cache_size: 1024 * 1024,
repoid: 0,
scuba_table: Some("scuba_table".to_string()),

View File

@ -20,7 +20,7 @@ use blobrepo::BlobRepo;
use blobstore::{Blobstore, PrefixBlobstore};
use hooks::HookManager;
use mercurial_types::RepositoryId;
use metaconfig::{LfsParams, PushrebaseParams};
use metaconfig::{LfsParams, PushrebaseParams, RemoteBlobstoreArgs};
use metaconfig::repoconfig::{RepoReadOnly, RepoType};
use errors::*;
@ -113,15 +113,30 @@ pub fn open_blobrepo(
let blobrepo = match repotype {
BlobFiles(ref path) => BlobRepo::new_files(logger, &path, repoid)?,
BlobRocks(ref path) => BlobRepo::new_rocksdb(logger, &path, repoid)?,
BlobManifold(ref args) => BlobRepo::new_manifold_scribe_commits(
logger,
args,
repoid,
myrouter_port.ok_or(err_msg(
"Missing myrouter port, unable to open BlobManifold repo",
))?,
ScribeCxxClient::new(),
)?,
BlobRemote {
ref blobstores_args,
ref db_address,
ref filenode_shards,
} => {
if blobstores_args.len() != 1 {
return Err(err_msg("only single manifold blobstore is supported"));
}
let manifold_args = match blobstores_args.get(0).unwrap() {
RemoteBlobstoreArgs::Manifold(manifold_args) => manifold_args,
};
BlobRepo::new_manifold_scribe_commits(
logger,
manifold_args,
db_address.clone(),
filenode_shards.clone(),
repoid,
myrouter_port.ok_or(err_msg(
"Missing myrouter port, unable to open BlobRemote repo",
))?,
ScribeCxxClient::new(),
)?
}
TestBlobDelayRocks(ref path, mean, stddev) => {
// We take in an arithmetic mean and stddev, and deduce a log normal
let mean = mean as f64 / 1_000_000.0;

View File

@ -95,10 +95,10 @@ pub fn repo_handlers(
try_boxfuture!(load_hooks(&mut hook_manager, config.clone()));
let streaming_clone = match config.repotype {
RepoType::BlobManifold(ref args) => Some(try_boxfuture!(streaming_clone(
RepoType::BlobRemote{ ref db_address, ..} => Some(try_boxfuture!(streaming_clone(
blobrepo.clone(),
&args.db_address,
myrouter_port.expect("myrouter_port not provided for BlobManifold repo"),
&db_address,
myrouter_port.expect("myrouter_port not provided for BlobRemote repo"),
repoid
))),
_ => None,