Mirror of https://github.com/facebook/sapling.git (synced 2024-10-10 08:47:12 +03:00)
mononoke: Migrate to new config structures, leaving config files unchanged
Summary: This migrates the internal structures representing the repo and storage config, while retaining the existing config file format. The `RepoType` type has been replaced by `BlobConfig`, an enum containing the config information for all supported blobstores. In addition there is the `StorageConfig` type, which includes `BlobConfig` as well as `MetadataDBConfig` for the local or remote SQL database used for metadata.

Reviewed By: StanislavGlebik

Differential Revision: D15065421

fbshipit-source-id: 47636074fceb6a7e35524f667376a5bb05bd8612
This commit is contained in:
parent 6123587f83
commit 046abb21ad
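For orientation, here is a condensed sketch of how the new types introduced by this change fit together when describing a purely local test repo. The helper function is hypothetical; the full type definitions appear in the metaconfig_types hunk further down:

    use std::path::PathBuf;

    use metaconfig_types::{BlobConfig, MetadataDBConfig, StorageConfig};

    // A minimal sketch: a StorageConfig pairs a blobstore config with a
    // metadata-DB config. This builds the purely local (test-only) flavour.
    fn local_test_storage(root: PathBuf) -> StorageConfig {
        StorageConfig {
            // Blobs stored as individual on-disk files.
            blobstore: BlobConfig::Files {
                path: root.join("blobs"),
            },
            // Metadata kept in SQLite databases under the same root.
            dbconfig: MetadataDBConfig::LocalDB { path: root },
        }
    }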
@@ -83,7 +83,7 @@ impl MononokeRepo {
         let sha1_cache = cachelib::get_pool("content-sha1");
         open_blobrepo(
             logger.clone(),
-            config.repotype,
+            config.storage_config.clone(),
             repoid,
             myrouter_port,
             config.bookmarks_cache_ttl,
@@ -4,124 +4,201 @@
 // This software may be used and distributed according to the terms of the
 // GNU General Public License version 2 or any later version.
 
-use std::{path::Path, sync::Arc, time::Duration};
-
-use cloned::cloned;
-use failure_ext::prelude::*;
-use failure_ext::{err_msg, Error, Result};
-use futures::{
-    future::{self, IntoFuture},
-    Future,
-};
-use futures_ext::{BoxFuture, FutureExt};
-use slog::{self, o, Discard, Drain, Logger};
-
 use blobrepo::BlobRepo;
 use blobrepo_errors::*;
-use blobstore::Blobstore;
-use blobstore_sync_queue::{BlobstoreSyncQueue, SqlBlobstoreSyncQueue};
+use blobstore::{Blobstore, DisabledBlob};
+use blobstore_sync_queue::SqlBlobstoreSyncQueue;
 use bonsai_hg_mapping::{CachingBonsaiHgMapping, SqlBonsaiHgMapping};
 use bookmarks::{Bookmarks, CachedBookmarks};
 use cacheblob::{dummy::DummyLease, new_cachelib_blobstore, new_memcache_blobstore, MemcacheOps};
 use changeset_fetcher::{ChangesetFetcher, SimpleChangesetFetcher};
 use changesets::{CachingChangests, SqlChangesets};
+use cloned::cloned;
 use dbbookmarks::SqlBookmarks;
+use failure_ext::prelude::*;
+use failure_ext::{Error, Result};
 use fileblob::Fileblob;
 use filenodes::CachingFilenodes;
+use futures::{
+    future::{self, IntoFuture},
+    Future,
+};
+use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
 use glusterblob::Glusterblob;
 use manifoldblob::ThriftManifoldBlob;
 use memblob::EagerMemblob;
-use metaconfig_types::{self, RemoteBlobstoreArgs, RepoType, ShardedFilenodesParams};
+use metaconfig_types::{self, BlobConfig, MetadataDBConfig, ShardedFilenodesParams, StorageConfig};
 use mononoke_types::RepositoryId;
 use multiplexedblob::MultiplexedBlobstore;
 use prefixblob::PrefixBlobstore;
 use rocksblob::Rocksblob;
 use rocksdb;
 use scuba::ScubaClient;
+use slog::{self, o, Discard, Drain, Logger};
 use sqlblob::Sqlblob;
 use sqlfilenodes::{SqlConstructors, SqlFilenodes};
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Duration;
 
-/// Create a new BlobRepo with purely local state.
-fn new_local(
-    logger: Logger,
-    path: &Path,
-    blobstore: Arc<Blobstore>,
-    repoid: RepositoryId,
-) -> Result<BlobRepo> {
-    let bookmarks = SqlBookmarks::with_sqlite_path(path.join("books"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Bookmarks))?;
-    let filenodes = SqlFilenodes::with_sqlite_path(path.join("filenodes"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Filenodes))?;
-    let changesets = SqlChangesets::with_sqlite_path(path.join("changesets"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Changesets))?;
-    let bonsai_hg_mapping = SqlBonsaiHgMapping::with_sqlite_path(path.join("bonsai_hg_mapping"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::BonsaiHgMapping))?;
-
-    Ok(BlobRepo::new(
-        logger,
-        Arc::new(bookmarks),
-        blobstore,
-        Arc::new(filenodes),
-        Arc::new(changesets),
-        Arc::new(bonsai_hg_mapping),
-        repoid,
-        Arc::new(DummyLease {}),
-    ))
-}
-
-/// Most local use cases should use new_rocksdb instead. This is only meant for test
-/// fixtures.
-fn new_files(logger: Logger, path: &Path, repoid: RepositoryId) -> Result<BlobRepo> {
-    let blobstore = Fileblob::create(path.join("blobs"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
-    new_local(logger, path, Arc::new(blobstore), repoid)
-}
-
-fn new_rocksdb(logger: Logger, path: &Path, repoid: RepositoryId) -> Result<BlobRepo> {
-    let options = rocksdb::Options::new().create_if_missing(true);
-    let blobstore = Rocksblob::open_with_options(path.join("blobs"), options)
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
-    new_local(logger, path, Arc::new(blobstore), repoid)
-}
-
-fn new_sqlite(logger: Logger, path: &Path, repoid: RepositoryId) -> Result<BlobRepo> {
-    let blobstore = Sqlblob::with_sqlite_path(repoid, path.join("blobs"))
-        .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
-    new_local(logger, path, Arc::new(blobstore), repoid)
-}
-
+/// Construct a new BlobRepo with the given storage configuration. If the metadata DB is
+/// remote (ie, MySQL), then it configures a full set of caches. Otherwise with local storage
+/// it's assumed to be a test configuration.
+///
+/// The blobstore config is actually orthogonal to this, but it wouldn't make much sense to
+/// configure a local blobstore with a remote db, or vice versa. There's no error checking
+/// at this level (aside from disallowing a multiplexed blobstore with a local db).
 pub fn open_blobrepo(
     logger: slog::Logger,
-    repotype: RepoType,
+    storage_config: StorageConfig,
     repoid: RepositoryId,
     myrouter_port: Option<u16>,
     bookmarks_cache_ttl: Option<Duration>,
-) -> impl Future<Item = BlobRepo, Error = Error> {
-    use metaconfig_types::RepoType::*;
+) -> BoxFuture<BlobRepo, Error> {
+    let blobstore = make_blobstore(
+        repoid,
+        &storage_config.blobstore,
+        &storage_config.dbconfig,
+        myrouter_port,
+    );
 
-    match repotype {
-        BlobFiles(ref path) => new_files(logger, &path, repoid).into_future().left_future(),
-        BlobRocks(ref path) => new_rocksdb(logger, &path, repoid)
+    blobstore
+        .and_then(move |blobstore| match storage_config.dbconfig {
+            MetadataDBConfig::LocalDB { path } => new_local(logger, &path, blobstore, repoid),
+            MetadataDBConfig::Mysql {
+                db_address,
+                sharded_filenodes,
+            } => new_remote(
+                logger,
+                db_address,
+                sharded_filenodes,
+                blobstore,
+                repoid,
+                myrouter_port,
+                bookmarks_cache_ttl,
+            ),
+        })
+        .boxify()
 }
 
+/// Construct a blobstore according to the specification. The multiplexed blobstore
+/// needs an SQL DB for its queue, as does the MySQL blobstore.
+fn make_blobstore(
+    repoid: RepositoryId,
+    blobconfig: &BlobConfig,
+    dbconfig: &MetadataDBConfig,
+    myrouter_port: Option<u16>,
+) -> BoxFuture<Arc<Blobstore>, Error> {
+    use BlobConfig::*;
+
+    match blobconfig {
+        Disabled => {
+            Ok(Arc::new(DisabledBlob::new("Disabled by configuration")) as Arc<dyn Blobstore>)
+                .into_future()
+                .boxify()
+        }
+
+        Files { path } => Fileblob::create(path.join("blobs"))
+            .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))
+            .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+            .map_err(Error::from)
+            .into_future()
-            .left_future(),
-        BlobSqlite(ref path) => new_sqlite(logger, &path, repoid)
+            .boxify(),
+
+        Rocks { path } => {
+            let options = rocksdb::Options::new().create_if_missing(true);
+            Rocksblob::open_with_options(path.join("blobs"), options)
+                .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))
+                .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+                .map_err(Error::from)
+                .into_future()
+                .boxify()
+        }
+
+        Sqlite { path } => Sqlblob::with_sqlite_path(repoid, path.join("blobs"))
+            .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))
+            .map_err(Error::from)
+            .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+            .into_future()
-            .left_future(),
-        BlobRemote {
-            ref blobstores_args,
-            ref db_address,
-            write_lock_db_address: _,
-            ref sharded_filenodes,
-        } => new_remote(
-            logger,
-            blobstores_args,
-            db_address.clone(),
-            sharded_filenodes.clone(),
-            repoid,
-            myrouter_port,
-            bookmarks_cache_ttl,
-        )
-        .right_future(),
+            .boxify(),
+
+        Manifold { bucket, prefix } => ThriftManifoldBlob::new(bucket.clone())
+            .map({
+                cloned!(prefix);
+                move |manifold| PrefixBlobstore::new(manifold, format!("flat/{}", prefix))
+            })
+            .chain_err(ErrorKind::StateOpen(StateOpenError::Blobstore))
+            .map_err(Error::from)
+            .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+            .into_future()
+            .boxify(),
+
+        Gluster {
+            tier,
+            export,
+            basepath,
+        } => Glusterblob::with_smc(tier.clone(), export.clone(), basepath.clone())
+            .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+            .boxify(),
+
+        Mysql {
+            shard_map,
+            shard_num,
+        } => if let Some(myrouter_port) = myrouter_port {
+            Sqlblob::with_myrouter(repoid, shard_map, myrouter_port, *shard_num)
+        } else {
+            Sqlblob::with_raw_xdb_shardmap(repoid, shard_map, *shard_num)
+        }
+        .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+        .into_future()
+        .boxify(),
+
+        Multiplexed {
+            scuba_table,
+            blobstores,
+        } => {
+            let queue = dbconfig
+                .get_db_address()
+                .ok_or_else(|| err_msg("Multiplexed blobstore requires remote DB for queue"))
+                .and_then(move |dbaddr| {
+                    myrouter_port
+                        .ok_or_else(|| err_msg("Need myrouter port for remote DB"))
+                        .map(|port| (dbaddr, port))
+                })
+                .map(|(addr, port)| Arc::new(SqlBlobstoreSyncQueue::with_myrouter(addr, port)))
+                .into_future();
+
+            let components: Vec<_> = blobstores
+                .iter()
+                .map({
+                    cloned!(dbconfig);
+                    move |(blobstoreid, config)| {
+                        cloned!(blobstoreid);
+                        make_blobstore(repoid, config, &dbconfig, myrouter_port)
+                            .map({ move |store| (blobstoreid, store) })
+                    }
+                })
+                .collect();
+
+            queue
+                .and_then({
+                    cloned!(scuba_table);
+                    move |queue| {
+                        future::join_all(components).map({
+                            move |components| {
+                                MultiplexedBlobstore::new(
+                                    repoid,
+                                    components,
+                                    queue,
+                                    scuba_table.map(|table| Arc::new(ScubaClient::new(table))),
+                                )
+                            }
+                        })
+                    }
+                })
+                .map(|store| Arc::new(store) as Arc<dyn Blobstore>)
+                .boxify()
+        }
+    }
+}
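make_blobstore above recurses into each component of a Multiplexed config, and insists on a remote metadata DB to host the blobstore sync queue. A sketch of the kind of value it would receive; the ids, bucket, and shard names here are invented for illustration:

    use std::num::NonZeroUsize;

    use metaconfig_types::{BlobConfig, BlobstoreId};

    // Hypothetical multiplexed config: two component stores behind one multiplexer.
    fn example_multiplexed() -> BlobConfig {
        BlobConfig::Multiplexed {
            scuba_table: None,
            blobstores: vec![
                (
                    BlobstoreId::new(0),
                    BlobConfig::Manifold {
                        bucket: "example-bucket".to_string(),
                        prefix: String::new(),
                    },
                ),
                (
                    BlobstoreId::new(1),
                    BlobConfig::Mysql {
                        shard_map: "example_shard_map".to_string(),
                        shard_num: NonZeroUsize::new(4).unwrap(),
                    },
                ),
            ],
        }
    }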
@@ -199,163 +276,110 @@ fn new_filenodes(
     Ok(filenodes)
 }
 
-pub fn new_remote(
+/// Create a new BlobRepo with purely local state. (Well, it could be a remote blobstore, but
+/// that would be weird to use with a local metadata db.)
+fn new_local(
+    logger: Logger,
+    dbpath: &Path,
+    blobstore: Arc<Blobstore>,
+    repoid: RepositoryId,
+) -> Result<BlobRepo> {
+    let bookmarks = SqlBookmarks::with_sqlite_path(dbpath.join("books"))
+        .chain_err(ErrorKind::StateOpen(StateOpenError::Bookmarks))?;
+    let filenodes = SqlFilenodes::with_sqlite_path(dbpath.join("filenodes"))
+        .chain_err(ErrorKind::StateOpen(StateOpenError::Filenodes))?;
+    let changesets = SqlChangesets::with_sqlite_path(dbpath.join("changesets"))
+        .chain_err(ErrorKind::StateOpen(StateOpenError::Changesets))?;
+    let bonsai_hg_mapping = SqlBonsaiHgMapping::with_sqlite_path(dbpath.join("bonsai_hg_mapping"))
+        .chain_err(ErrorKind::StateOpen(StateOpenError::BonsaiHgMapping))?;
+
+    Ok(BlobRepo::new(
+        logger,
+        Arc::new(bookmarks),
+        blobstore,
+        Arc::new(filenodes),
+        Arc::new(changesets),
+        Arc::new(bonsai_hg_mapping),
+        repoid,
+        Arc::new(DummyLease {}),
+    ))
+}
+
+fn open_xdb<T: SqlConstructors>(addr: &str, myrouter_port: Option<u16>) -> Result<Arc<T>> {
+    let ret = if let Some(myrouter_port) = myrouter_port {
+        T::with_myrouter(addr, myrouter_port)
+    } else {
+        T::with_raw_xdb_tier(addr)?
+    };
+    Ok(Arc::new(ret))
+}
+
+/// If the DB is remote then set up for a full production configuration.
+/// In theory this could be with a local blobstore, but that would just be weird.
+fn new_remote(
     logger: Logger,
-    args: &RemoteBlobstoreArgs,
     db_address: String,
     sharded_filenodes: Option<ShardedFilenodesParams>,
+    blobstore: Arc<Blobstore>,
     repoid: RepositoryId,
     myrouter_port: Option<u16>,
     bookmarks_cache_ttl: Option<Duration>,
-) -> impl Future<Item = BlobRepo, Error = Error> {
-    // recursively construct blobstore from arguments
-    fn eval_remote_args(
-        args: RemoteBlobstoreArgs,
-        repoid: RepositoryId,
-        myrouter_port: Option<u16>,
-        queue: Arc<BlobstoreSyncQueue>,
-    ) -> BoxFuture<Arc<Blobstore>, Error> {
-        match args {
-            RemoteBlobstoreArgs::Manifold(manifold_args) => {
-                let blobstore: Arc<Blobstore> = Arc::new(PrefixBlobstore::new(
-                    try_boxfuture!(ThriftManifoldBlob::new(manifold_args.bucket.clone())),
-                    format!("flat/{}", manifold_args.prefix),
-                ));
-                future::ok(blobstore).boxify()
-            }
-            RemoteBlobstoreArgs::Gluster(args) => {
-                Glusterblob::with_smc(args.tier, args.export, args.basepath)
-                    .map(|blobstore| -> Arc<Blobstore> { Arc::new(blobstore) })
-                    .boxify()
-            }
-            RemoteBlobstoreArgs::Mysql(args) => {
-                let blobstore: Arc<Blobstore> = match myrouter_port {
-                    Some(myrouter_port) => Arc::new(try_boxfuture!(Sqlblob::with_myrouter(
-                        repoid,
-                        args.shardmap,
-                        myrouter_port,
-                        args.shard_num,
-                    ))),
-                    None => Arc::new(try_boxfuture!(Sqlblob::with_raw_xdb_shardmap(
-                        repoid,
-                        args.shardmap,
-                        args.shard_num,
-                    ))),
-                };
-                future::ok(blobstore).boxify()
-            }
-            RemoteBlobstoreArgs::Multiplexed {
-                scuba_table,
-                blobstores,
-            } => {
-                let blobstores: Vec<_> = blobstores
-                    .into_iter()
-                    .map(|(blobstore_id, arg)| {
-                        eval_remote_args(arg, repoid, myrouter_port, queue.clone())
-                            .map(move |blobstore| (blobstore_id, blobstore))
-                    })
-                    .collect();
-                future::join_all(blobstores)
-                    .map(move |blobstores| {
-                        if blobstores.len() == 1 {
-                            let (_, blobstore) = blobstores.into_iter().next().unwrap();
-                            blobstore
-                        } else {
-                            Arc::new(MultiplexedBlobstore::new(
-                                repoid,
-                                blobstores,
-                                queue.clone(),
-                                scuba_table.map(|table| Arc::new(ScubaClient::new(table))),
-                            ))
-                        }
-                    })
-                    .boxify()
-            }
-        }
-    }
-
+) -> Result<BlobRepo> {
+    let blobstore = new_memcache_blobstore(blobstore, "multiplexed", "")?;
+    let blob_pool = Arc::new(cachelib::get_pool("blobstore-blobs").ok_or(Error::from(
+        ErrorKind::MissingCachePool("blobstore-blobs".to_string()),
+    ))?);
+    let presence_pool = Arc::new(cachelib::get_pool("blobstore-presence").ok_or(Error::from(
+        ErrorKind::MissingCachePool("blobstore-presence".to_string()),
+    ))?);
+    let blobstore = Arc::new(new_cachelib_blobstore(blobstore, blob_pool, presence_pool));
+
+    let filenodes = new_filenodes(&db_address, sharded_filenodes, myrouter_port)?;
+
+    let bookmarks: Arc<dyn Bookmarks> = {
+        let bookmarks = open_xdb::<SqlBookmarks>(&db_address, myrouter_port)?;
+        if let Some(ttl) = bookmarks_cache_ttl {
+            Arc::new(CachedBookmarks::new(bookmarks, ttl))
+        } else {
+            bookmarks
+        }
+    };
+
-    let blobstore_sync_queue: Arc<BlobstoreSyncQueue> = match myrouter_port {
-        Some(myrouter_port) => Arc::new(SqlBlobstoreSyncQueue::with_myrouter(
-            &db_address,
-            myrouter_port,
-        )),
-        None => Arc::new(try_boxfuture!(SqlBlobstoreSyncQueue::with_raw_xdb_tier(
-            &db_address
-        ))),
-    };
-    eval_remote_args(args.clone(), repoid, myrouter_port, blobstore_sync_queue)
-        .and_then(move |blobstore| {
-            let blobstore = new_memcache_blobstore(blobstore, "multiplexed", "")?;
-            let blob_pool = Arc::new(cachelib::get_pool("blobstore-blobs").ok_or(Error::from(
-                ErrorKind::MissingCachePool("blobstore-blobs".to_string()),
-            ))?);
-            let presence_pool =
-                Arc::new(cachelib::get_pool("blobstore-presence").ok_or(Error::from(
-                    ErrorKind::MissingCachePool("blobstore-presence".to_string()),
-                ))?);
-            let blobstore = Arc::new(new_cachelib_blobstore(blobstore, blob_pool, presence_pool));
-
-            let filenodes = new_filenodes(&db_address, sharded_filenodes, myrouter_port)?;
+    let changesets = open_xdb::<SqlChangesets>(&db_address, myrouter_port)?;
+    let changesets_cache_pool = cachelib::get_pool("changesets").ok_or(Error::from(
+        ErrorKind::MissingCachePool("changesets".to_string()),
+    ))?;
+    let changesets = CachingChangests::new(changesets, changesets_cache_pool.clone());
+    let changesets = Arc::new(changesets);
 
-            let bookmarks: Arc<dyn Bookmarks> = {
-                let bookmarks = match myrouter_port {
-                    Some(myrouter_port) => {
-                        Arc::new(SqlBookmarks::with_myrouter(&db_address, myrouter_port))
-                    }
-                    None => Arc::new(SqlBookmarks::with_raw_xdb_tier(&db_address)?),
-                };
-                if let Some(ttl) = bookmarks_cache_ttl {
-                    Arc::new(CachedBookmarks::new(bookmarks, ttl))
-                } else {
-                    bookmarks
-                }
-            };
+    let bonsai_hg_mapping = open_xdb::<SqlBonsaiHgMapping>(&db_address, myrouter_port)?;
+    let bonsai_hg_mapping = CachingBonsaiHgMapping::new(
+        bonsai_hg_mapping,
+        cachelib::get_pool("bonsai_hg_mapping").ok_or(Error::from(ErrorKind::MissingCachePool(
+            "bonsai_hg_mapping".to_string(),
+        )))?,
+    );
 
-            let changesets = match myrouter_port {
-                Some(myrouter_port) => SqlChangesets::with_myrouter(&db_address, myrouter_port),
-                None => SqlChangesets::with_raw_xdb_tier(&db_address)?,
-            };
-            let changesets_cache_pool = cachelib::get_pool("changesets").ok_or(Error::from(
-                ErrorKind::MissingCachePool("changesets".to_string()),
-            ))?;
-            let changesets =
-                CachingChangests::new(Arc::new(changesets), changesets_cache_pool.clone());
-            let changesets = Arc::new(changesets);
+    let changeset_fetcher_factory = {
+        cloned!(changesets, repoid);
+        move || {
+            let res: Arc<ChangesetFetcher + Send + Sync> = Arc::new(SimpleChangesetFetcher::new(
+                changesets.clone(),
+                repoid.clone(),
+            ));
+            res
+        }
+    };
 
-            let bonsai_hg_mapping = match myrouter_port {
-                Some(myrouter_port) => {
-                    SqlBonsaiHgMapping::with_myrouter(&db_address, myrouter_port)
-                }
-                None => SqlBonsaiHgMapping::with_raw_xdb_tier(&db_address)?,
-            };
-            let bonsai_hg_mapping = CachingBonsaiHgMapping::new(
-                Arc::new(bonsai_hg_mapping),
-                cachelib::get_pool("bonsai_hg_mapping").ok_or(Error::from(
-                    ErrorKind::MissingCachePool("bonsai_hg_mapping".to_string()),
-                ))?,
-            );
-
-            let changeset_fetcher_factory = {
-                cloned!(changesets, repoid);
-                move || {
-                    let res: Arc<ChangesetFetcher + Send + Sync> = Arc::new(
-                        SimpleChangesetFetcher::new(changesets.clone(), repoid.clone()),
-                    );
-                    res
-                }
-            };
-
-            Ok(BlobRepo::new_with_changeset_fetcher_factory(
-                logger,
-                bookmarks,
-                blobstore,
-                Arc::new(filenodes),
-                changesets,
-                Arc::new(bonsai_hg_mapping),
-                repoid,
-                Arc::new(changeset_fetcher_factory),
-                Arc::new(MemcacheOps::new("bonsai-hg-generation", "")?),
-            ))
-        })
-        .boxify()
+    Ok(BlobRepo::new_with_changeset_fetcher_factory(
+        logger,
+        bookmarks,
+        blobstore,
+        Arc::new(filenodes),
+        changesets,
+        Arc::new(bonsai_hg_mapping),
+        repoid,
+        Arc::new(changeset_fetcher_factory),
+        Arc::new(MemcacheOps::new("bonsai-hg-generation", "")?),
+    ))
 }
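With the factory rewritten, callers hand over the whole StorageConfig rather than a RepoType. A minimal sketch of the new call shape under that signature; the paths and the repo id are placeholders, and the returned BoxFuture still has to be driven by a runtime:

    use blobrepo_factory::open_blobrepo;
    use futures_ext::BoxFuture;
    use metaconfig_types::{BlobConfig, MetadataDBConfig, StorageConfig};
    use mononoke_types::RepositoryId;

    fn open_local_repo(logger: slog::Logger) -> BoxFuture<blobrepo::BlobRepo, failure_ext::Error> {
        let storage_config = StorageConfig {
            blobstore: BlobConfig::Rocks {
                path: "/tmp/testrepo".into(),
            },
            dbconfig: MetadataDBConfig::LocalDB {
                path: "/tmp/testrepo".into(),
            },
        };
        open_blobrepo(
            logger,
            storage_config,
            RepositoryId::new(0),
            None, // myrouter_port: only needed for remote (MySQL) configs
            None, // bookmarks_cache_ttl
        )
    }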
@@ -37,9 +37,7 @@ use blobrepo_factory::open_blobrepo;
 use changesets::{SqlChangesets, SqlConstructors};
 use context::CoreContext;
 use metaconfig_parser::RepoConfigs;
-use metaconfig_types::{
-    ManifoldArgs, MysqlBlobstoreArgs, RemoteBlobstoreArgs, RepoConfig, RepoType,
-};
+use metaconfig_types::{BlobConfig, MetadataDBConfig, RepoConfig};
 use mononoke_types::RepositoryId;
 
 const CACHE_ARGS: &[(&str, &str)] = &[
@@ -301,14 +299,15 @@ where
     T: SqlConstructors,
 {
     let (_, config) = get_config(matches)?;
-    match config.repotype {
-        RepoType::BlobFiles(ref data_dir)
-        | RepoType::BlobRocks(ref data_dir)
-        | RepoType::BlobSqlite(ref data_dir) => T::with_sqlite_path(data_dir.join(name)),
-        RepoType::BlobRemote { ref db_address, .. } => match parse_myrouter_port(matches) {
-            Some(myrouter_port) => Ok(T::with_myrouter(&db_address, myrouter_port)),
-            None => T::with_raw_xdb_tier(&db_address),
-        },
+    match &config.storage_config.dbconfig {
+        MetadataDBConfig::LocalDB { path } => T::with_sqlite_path(path.join(name)),
+        MetadataDBConfig::Mysql { db_address, .. } => {
+            assert!(name != "filenodes");
+            match parse_myrouter_port(matches) {
+                Some(myrouter_port) => Ok(T::with_myrouter(&db_address, myrouter_port)),
+                None => T::with_raw_xdb_tier(&db_address),
+            }
+        }
     }
 }
@@ -541,14 +540,20 @@ pub fn get_config<'a>(matches: &ArgMatches<'a>) -> Result<(String, RepoConfig)>
     let repo_id = get_repo_id(matches)?;
 
     let configs = read_configs(matches)?;
-    let repo_config = configs
+    let mut repo_config: Vec<_> = configs
         .repos
         .into_iter()
         .filter(|(_, config)| RepositoryId::new(config.repoid) == repo_id)
-        .last();
-    match repo_config {
-        Some((name, config)) => Ok((name, config)),
-        None => Err(err_msg(format!("unknown repoid {:?}", repo_id))),
+        .collect();
+    if repo_config.is_empty() {
+        Err(err_msg(format!("unknown repoid {:?}", repo_id)))
+    } else if repo_config.len() > 1 {
+        Err(err_msg(format!(
+            "repoid {:?} defined multiple times",
+            repo_id
+        )))
+    } else {
+        Ok(repo_config.pop().unwrap())
     }
 }
@@ -566,23 +571,37 @@ fn open_repo_internal<'a>(
         reponame,
         repo_id.as_ref().unwrap()
     );
-    let logger = match config.repotype {
-        RepoType::BlobFiles(ref data_dir) => {
-            setup_repo_dir(&data_dir, create).expect("Setting up file blobrepo failed");
-            logger.new(o!["BlobRepo:Files" => data_dir.to_string_lossy().into_owned()])
+    let logger = match &config.storage_config.blobstore {
+        BlobConfig::Disabled => {
+            logger.new(o!["BlobConfig:Disabled" => "Disabled in config".to_string()])
         }
-        RepoType::BlobRocks(ref data_dir) => {
-            setup_repo_dir(&data_dir, create).expect("Setting up rocksdb blobrepo failed");
-            logger.new(o!["BlobRepo:Rocksdb" => data_dir.to_string_lossy().into_owned()])
+        BlobConfig::Files { path } => {
+            setup_repo_dir(path, create).expect("Setting up file blobrepo failed");
+            logger.new(o!["BlobConfig:Files" => path.to_string_lossy().into_owned()])
         }
-        RepoType::BlobSqlite(ref data_dir) => {
-            setup_repo_dir(&data_dir, create).expect("Setting up sqlite blobrepo failed");
-            logger.new(o!["BlobRepo:Sqlite" => data_dir.to_string_lossy().into_owned()])
+        BlobConfig::Rocks { path } => {
+            setup_repo_dir(path, create).expect("Setting up rocksdb blobrepo failed");
+            logger.new(o!["BlobConfig:Rocksdb" => path.to_string_lossy().into_owned()])
         }
+        BlobConfig::Sqlite { path } => {
+            setup_repo_dir(path, create).expect("Setting up sqlite blobrepo failed");
+            logger.new(o!["BlobConfig:Sqlite" => path.to_string_lossy().into_owned()])
+        }
+        BlobConfig::Manifold { bucket, prefix } => {
+            logger.new(o!["BlobConfig:Manifold" => format!("{} {}", bucket, prefix)])
+        }
+        BlobConfig::Gluster {
+            tier,
+            export,
+            basepath,
+        } => logger.new(o!["BlobConfig:Gluster" => format!("{} {} {}", tier, export, basepath)]),
+        BlobConfig::Mysql {
+            shard_map,
+            shard_num,
+        } => logger.new(o!["BlobConfig:Mysql" => format!("{} {}", shard_map, shard_num)]),
+        BlobConfig::Multiplexed { blobstores, .. } => {
+            logger.new(o!["BlobConfig:Multiplexed" => format!("{:?}", blobstores)])
+        }
-        RepoType::BlobRemote {
-            ref blobstores_args,
-            ..
-        } => logger.new(o!["BlobRepo:Remote" => format!("{:?}", blobstores_args)]),
     };
 
     let myrouter_port = parse_myrouter_port(matches);
@@ -591,7 +610,7 @@ fn open_repo_internal<'a>(
         .and_then(move |repo_id| {
             open_blobrepo(
                 logger.clone(),
-                config.repotype.clone(),
+                config.storage_config,
                 repo_id,
                 myrouter_port,
                 config.bookmarks_cache_ttl,
@@ -600,12 +619,12 @@ fn open_repo_internal<'a>(
     .boxify()
 }
 
-pub fn parse_blobstore_args<'a>(matches: &ArgMatches<'a>) -> RemoteBlobstoreArgs {
+pub fn parse_blobstore_args<'a>(matches: &ArgMatches<'a>) -> BlobConfig {
     // The unwraps here are safe because default values have already been provided in mononoke_app
     // above.
     match matches.value_of("mysql-blobstore-shardmap") {
-        Some(shardmap) => RemoteBlobstoreArgs::Mysql(MysqlBlobstoreArgs {
-            shardmap: shardmap.to_string(),
+        Some(shardmap) => BlobConfig::Mysql {
+            shard_map: shardmap.to_string(),
             shard_num: matches
                 .value_of("mysql-blobstore-shard-num")
                 .unwrap()
@@ -613,11 +632,11 @@ pub fn parse_blobstore_args<'a>(matches: &ArgMatches<'a>) -> RemoteBlobstoreArgs
                 .ok()
                 .and_then(NonZeroUsize::new)
                 .expect("Provided mysql-blobstore-shard-num must be int larger than 0"),
-        }),
-        None => RemoteBlobstoreArgs::Manifold(ManifoldArgs {
+        },
+        None => BlobConfig::Manifold {
             bucket: matches.value_of("manifold-bucket").unwrap().to_string(),
             prefix: matches.value_of("manifold-prefix").unwrap().to_string(),
-        }),
+        },
     }
 }
@@ -44,7 +44,7 @@ use mercurial_types::{
     Changeset, HgChangesetEnvelope, HgChangesetId, HgFileEnvelope, HgManifestEnvelope,
     HgManifestId, MPath, MPathElement, Manifest,
 };
-use metaconfig_types::RemoteBlobstoreArgs;
+use metaconfig_types::BlobConfig;
 use mononoke_hg_sync_job_helper_lib::save_bundle_to_file;
 use mononoke_types::{
     BlobstoreBytes, BlobstoreValue, BonsaiChangeset, ChangesetId, DateTime, FileChange,
@@ -1168,8 +1168,8 @@ fn subcommand_blobstore_fetch(
     let blobstore_args = args::parse_blobstore_args(&matches);
     let repo_id = try_boxfuture!(args::get_repo_id(&matches));
 
-    let manifold_args = match blobstore_args {
-        RemoteBlobstoreArgs::Manifold(args) => args,
+    let (bucket, prefix) = match blobstore_args {
+        BlobConfig::Manifold { bucket, prefix } => (bucket, prefix),
         bad => panic!("Unsupported blobstore: {:#?}", bad),
     };
 
@@ -1179,7 +1179,7 @@ fn subcommand_blobstore_fetch(
     let use_memcache = sub_m.value_of("use-memcache").map(|val| val.to_string());
     let no_prefix = sub_m.is_present("no-prefix");
 
-    let blobstore = ManifoldBlob::new_with_prefix(&manifold_args.bucket, &manifold_args.prefix);
+    let blobstore = ManifoldBlob::new_with_prefix(&bucket, &prefix);
 
     match (use_memcache, no_prefix) {
         (None, false) => {
@@ -1188,14 +1188,12 @@ fn subcommand_blobstore_fetch(
         }
         (None, true) => blobstore.get(ctx, key.clone()).boxify(),
         (Some(mode), false) => {
-            let blobstore =
-                new_memcache_blobstore(blobstore, "manifold", manifold_args.bucket).unwrap();
+            let blobstore = new_memcache_blobstore(blobstore, "manifold", bucket).unwrap();
             let blobstore = PrefixBlobstore::new(blobstore, repo_id.prefix());
             get_cache(ctx.clone(), &blobstore, key.clone(), mode)
         }
         (Some(mode), true) => {
-            let blobstore =
-                new_memcache_blobstore(blobstore, "manifold", manifold_args.bucket).unwrap();
+            let blobstore = new_memcache_blobstore(blobstore, "manifold", bucket).unwrap();
             get_cache(ctx.clone(), &blobstore, key.clone(), mode)
         }
     }
@@ -18,7 +18,7 @@ use cloned::cloned;
 use cmdlib::args;
 use context::CoreContext;
 use dummy::{DummyBlobstore, DummyBlobstoreSyncQueue};
-use failure_ext::{bail_msg, ensure_msg, err_msg, prelude::*};
+use failure_ext::{bail_msg, err_msg, prelude::*};
 use futures::{
     future::{join_all, loop_fn, ok, Loop},
     prelude::*,
@@ -27,7 +27,7 @@ use futures_ext::{spawn_future, BoxFuture, FutureExt};
 use glusterblob::Glusterblob;
 use healer::RepoHealer;
 use manifoldblob::ThriftManifoldBlob;
-use metaconfig_types::{RemoteBlobstoreArgs, RepoConfig, RepoType};
+use metaconfig_types::{BlobConfig, MetadataDBConfig, StorageConfig};
 use mononoke_types::RepositoryId;
 use prefixblob::PrefixBlobstore;
 use rate_limiter::RateLimiter;
@@ -41,22 +41,20 @@ use tokio_timer::Delay;
 
 const MAX_ALLOWED_REPLICATION_LAG_SECS: usize = 5;
 
-fn maybe_schedule_healer_for_repo(
+fn maybe_schedule_healer_for_storage(
     dry_run: bool,
     blobstore_sync_queue_limit: usize,
     logger: Logger,
     rate_limiter: RateLimiter,
-    config: RepoConfig,
+    repo_id: RepositoryId,
+    storage_config: StorageConfig,
     myrouter_port: u16,
     replication_lag_db_regions: Vec<String>,
 ) -> Result<BoxFuture<(), Error>> {
-    ensure_msg!(config.enabled, "Repo is disabled");
-
-    let (db_address, blobstores_args) = match config.repotype {
-        RepoType::BlobRemote {
-            ref db_address,
-            blobstores_args: RemoteBlobstoreArgs::Multiplexed { ref blobstores, .. },
-            ..
+    let (db_address, blobstores_args) = match &storage_config {
+        StorageConfig {
+            dbconfig: MetadataDBConfig::Mysql { db_address, .. },
+            blobstore: BlobConfig::Multiplexed { blobstores, .. },
         } => (db_address.clone(), blobstores.clone()),
         _ => bail_msg!("Repo doesn't use Multiplexed blobstore"),
     };
@@ -65,32 +63,36 @@ fn maybe_schedule_healer_for_repo(
     let mut blobstores = HashMap::new();
     for (id, args) in blobstores_args.into_iter() {
         match args {
-            RemoteBlobstoreArgs::Manifold(args) => {
-                let blobstore = ThriftManifoldBlob::new(args.bucket)
+            BlobConfig::Manifold { bucket, prefix } => {
+                let blobstore = ThriftManifoldBlob::new(bucket)
                     .chain_err("While opening ThriftManifoldBlob")?;
-                let blobstore =
-                    PrefixBlobstore::new(blobstore, format!("flat/{}", args.prefix));
+                let blobstore = PrefixBlobstore::new(blobstore, format!("flat/{}", prefix));
                 let blobstore: Arc<dyn Blobstore> = Arc::new(blobstore);
                 blobstores.insert(id, ok(blobstore).boxify());
             }
-            RemoteBlobstoreArgs::Gluster(args) => {
-                let blobstore = Glusterblob::with_smc(args.tier, args.export, args.basepath)
+            BlobConfig::Gluster {
+                tier,
+                export,
+                basepath,
+            } => {
+                let blobstore = Glusterblob::with_smc(tier, export, basepath)
                     .map(|blobstore| -> Arc<dyn Blobstore> { Arc::new(blobstore) })
                     .boxify();
                 blobstores.insert(id, blobstore);
             }
-            RemoteBlobstoreArgs::Mysql(args) => {
+            BlobConfig::Mysql {
+                shard_map,
+                shard_num,
+            } => {
                 let blobstore: Arc<dyn Blobstore> = Arc::new(Sqlblob::with_myrouter(
-                    RepositoryId::new(config.repoid),
-                    args.shardmap,
+                    repo_id,
+                    shard_map,
                     myrouter_port,
-                    args.shard_num,
+                    shard_num,
                 )?);
                 blobstores.insert(id, ok(blobstore).boxify());
             }
-            RemoteBlobstoreArgs::Multiplexed { .. } => {
-                bail_msg!("Unsupported nested Multiplexed blobstore")
-            }
+            unsupported => bail_msg!("Unsupported blobstore type {:?}", unsupported),
         }
     }
 
@@ -146,7 +148,7 @@ fn maybe_schedule_healer_for_repo(
     let repo_healer = RepoHealer::new(
         logger.clone(),
         blobstore_sync_queue_limit,
-        RepositoryId::new(config.repoid),
+        repo_id,
         rate_limiter,
         sync_queue,
         Arc::new(blobstores),
@@ -281,12 +283,13 @@ fn main() -> Result<()> {
             "repo" => format!("{} ({})", name, config.repoid),
         ));
 
-        let scheduled = maybe_schedule_healer_for_repo(
+        let scheduled = maybe_schedule_healer_for_storage(
             dry_run,
             blobstore_sync_queue_limit,
             logger.clone(),
             rate_limiter.clone(),
-            config,
+            RepositoryId::new(config.repoid),
+            config.storage_config,
             myrouter_port,
             matches
                 .value_of("db-regions")
@@ -4,8 +4,7 @@
 // This software may be used and distributed according to the terms of the
 // GNU General Public License version 2 or any later version.
 
-use std::sync::Arc;
-use std::time::Instant;
+use std::{sync::Arc, time::Instant};
 
 use clap::Arg;
 use cloned::cloned;
@@ -21,13 +20,19 @@ use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry, SqlBlobs
 use cmdlib::args;
 use context::CoreContext;
 use manifoldblob::{ManifoldRange, ThriftManifoldBlob};
-use metaconfig_types::{BlobstoreId, ManifoldArgs, RemoteBlobstoreArgs, RepoType};
+use metaconfig_types::{BlobConfig, BlobstoreId, MetadataDBConfig, StorageConfig};
 use mononoke_types::{BlobstoreBytes, DateTime, RepositoryId};
 use sql_ext::SqlConstructors;
 
 /// Save manifold continuation token each once per `PRESERVE_STATE_RATIO` entries
 const PRESERVE_STATE_RATIO: usize = 10_000;
 
+#[derive(Debug)]
+struct ManifoldArgs {
+    bucket: String,
+    prefix: String,
+}
+
 /// Configuration options
 #[derive(Debug)]
 struct Config {
@@ -182,32 +187,29 @@ fn parse_args() -> Result<Config, Error> {
         ));
     }
 
-    let (blobstores_args, db_address) = match &repo_config.repotype {
-        RepoType::BlobRemote {
-            blobstores_args,
-            db_address,
-            ..
-        } => (blobstores_args, db_address),
-        repo_type => return Err(format_err!("unsupported repotype: {:?}", repo_type)),
+    let (blobstores, db_address) = match &repo_config.storage_config {
+        StorageConfig {
+            dbconfig: MetadataDBConfig::Mysql { db_address, .. },
+            blobstore: BlobConfig::Multiplexed { blobstores, .. },
+        } => (blobstores, db_address),
+        storage => return Err(format_err!("unsupported storage: {:?}", storage)),
     };
-    let manifold_args = match blobstores_args {
-        RemoteBlobstoreArgs::Multiplexed { blobstores, .. } => blobstores
-            .iter()
-            .filter(|(id, _)| src_blobstore_id == **id)
-            .map(|(_, args)| args)
-            .next()
-            .ok_or(format_err!(
-                "failed to find source blobstore id: {:?}",
-                src_blobstore_id,
-            ))
-            .and_then(|args| match args {
-                RemoteBlobstoreArgs::Manifold(args) => Ok(args.clone()),
-                _ => Err(err_msg("source blobstore must be a manifold")),
-            }),
-        _ => Err(err_msg(
-            "toplevel blobstore expected to be a multiplexed blobstore",
-        )),
-    }?;
+    let manifold_args = blobstores
+        .iter()
+        .filter(|(id, _)| src_blobstore_id == *id)
+        .map(|(_, args)| args)
+        .next()
+        .ok_or(format_err!(
+            "failed to find source blobstore id: {:?}",
+            src_blobstore_id,
+        ))
+        .and_then(|args| match args {
+            BlobConfig::Manifold { bucket, prefix } => Ok(ManifoldArgs {
+                bucket: bucket.clone(),
+                prefix: prefix.clone(),
+            }),
+            _ => Err(err_msg("source blobstore must be a manifold")),
+        })?;
 
     let myrouter_port =
         args::parse_myrouter_port(&matches).ok_or(err_msg("myrouter-port must be specified"))?;
@@ -89,7 +89,7 @@ fn main() -> Result<()> {
 
     let blobrepo = open_blobrepo(
         logger.clone(),
-        config.repotype.clone(),
+        config.storage_config.clone(),
         RepositoryId::new(config.repoid),
         myrouter_port,
         config.bookmarks_cache_ttl,
@@ -322,14 +322,14 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
                 .long("exclude")
                 .short("e")
                 .help("a comma separated list of changesets to exclude")
-                .takes_value(true)
+                .takes_value(true),
         )
         .arg(
             Arg::with_name("exclude_file")
                 .long("exclude_file")
                 .short("f")
                 .help("a file containing changesets to exclude that is separated by new lines")
-                .takes_value(true)
+                .takes_value(true),
         )
         .arg(
             Arg::with_name("limit")
@@ -23,8 +23,8 @@ use hooks_content_stores::{BlobRepoChangesetStore, BlobRepoFileContentStore};
 use maplit::{hashmap, hashset};
 use mercurial_types::{HgChangesetId, MPath};
 use metaconfig_types::{
-    BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams, HookConfig, HookParams, HookType,
-    RepoConfig, RepoReadOnly, RepoType,
+    BlobConfig, BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams, HookConfig, HookParams,
+    HookType, MetadataDBConfig, RepoConfig, RepoReadOnly, StorageConfig,
 };
 use mononoke_types::FileType;
 use regex::Regex;
@@ -1188,7 +1188,13 @@ fn to_mpath(string: &str) -> MPath {
 
 fn default_repo_config() -> RepoConfig {
     RepoConfig {
-        repotype: RepoType::BlobFiles("whatev".into()),
+        storage_config: StorageConfig {
+            blobstore: BlobConfig::Disabled,
+            dbconfig: MetadataDBConfig::LocalDB {
+                path: "/some/place".into(),
+            },
+        },
+        write_lock_db_address: None,
         enabled: true,
         generation_cache_size: 1,
         repoid: 1,
@@ -9,7 +9,7 @@
 
 use serde_derive::Deserialize;
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     fs,
     num::NonZeroUsize,
     path::{Path, PathBuf},
@@ -21,10 +21,10 @@ use crate::errors::*;
 use bookmarks::Bookmark;
 use failure_ext::ResultExt;
 use metaconfig_types::{
-    BlobstoreId, BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams, CacheWarmupParams,
-    CommonConfig, GlusterArgs, HookBypass, HookConfig, HookManagerParams, HookParams, HookType,
-    LfsParams, ManifoldArgs, MysqlBlobstoreArgs, PushrebaseParams, RemoteBlobstoreArgs, RepoConfig,
-    RepoReadOnly, RepoType, ShardedFilenodesParams, WhitelistEntry,
+    BlobConfig, BlobstoreId, BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams,
+    CacheWarmupParams, CommonConfig, HookBypass, HookConfig, HookManagerParams, HookParams,
+    HookType, LfsParams, MetadataDBConfig, PushrebaseParams, RepoConfig, RepoReadOnly,
+    ShardedFilenodesParams, StorageConfig, WhitelistEntry,
 };
 use regex::Regex;
 use toml;
@@ -46,12 +46,16 @@ pub struct RepoConfigs {
 
 impl RepoConfigs {
     /// Read repo configs
-    pub fn read_configs<P: AsRef<Path>>(config_path: P) -> Result<Self> {
-        let repos_dir = config_path.as_ref().join("repos");
+    pub fn read_configs(config_path: impl AsRef<Path>) -> Result<Self> {
+        let config_path = config_path.as_ref();
+
+        let repos_dir = config_path.join("repos");
         if !repos_dir.is_dir() {
-            return Err(
-                ErrorKind::InvalidFileStructure("expected 'repos' directory".into()).into(),
-            );
+            return Err(ErrorKind::InvalidFileStructure(format!(
+                "expected 'repos' directory under {}",
+                config_path.display()
+            ))
+            .into());
         }
         let mut repo_configs = HashMap::new();
         for entry in repos_dir.read_dir()? {
@@ -59,13 +63,13 @@ impl RepoConfigs {
             let dir_path = entry.path();
             if dir_path.is_dir() {
                 let (name, config) =
-                    RepoConfigs::read_single_repo_config(&dir_path, config_path.as_ref())
+                    RepoConfigs::read_single_repo_config(&dir_path, config_path)
                         .context(format!("while opening config for {:?} repo", dir_path))?;
                 repo_configs.insert(name, config);
             }
         }
 
-        let common_dir = config_path.as_ref().join("common");
+        let common_dir = config_path.join("common");
         let maybe_common_config = if common_dir.is_dir() {
             Self::read_common_config(&common_dir)?
         } else {
@@ -279,10 +283,32 @@ impl RepoConfigs {
             })
         }
 
-        let repotype = match this.repotype {
-            RawRepoType::Files => RepoType::BlobFiles(get_path(&this)?),
-            RawRepoType::BlobRocks => RepoType::BlobRocks(get_path(&this)?),
-            RawRepoType::BlobSqlite => RepoType::BlobSqlite(get_path(&this)?),
+        let storage_config = match this.repotype {
+            RawRepoType::Files => StorageConfig {
+                blobstore: BlobConfig::Files {
+                    path: get_path(&this)?,
+                },
+                dbconfig: MetadataDBConfig::LocalDB {
+                    path: get_path(&this)?,
+                },
+            },
+            RawRepoType::BlobRocks => StorageConfig {
+                blobstore: BlobConfig::Rocks {
+                    path: get_path(&this)?,
+                },
+                dbconfig: MetadataDBConfig::LocalDB {
+                    path: get_path(&this)?,
+                },
+            },
+            RawRepoType::BlobSqlite => StorageConfig {
+                blobstore: BlobConfig::Sqlite {
+                    path: get_path(&this)?,
+                },
+                dbconfig: MetadataDBConfig::LocalDB {
+                    path: get_path(&this)?,
+                },
+            },
 
             RawRepoType::BlobRemote => {
                 let remote_blobstores = this.remote_blobstore;
                 if remote_blobstores.is_empty() {
@@ -295,9 +321,7 @@ impl RepoConfigs {
                     "xdb tier was not specified".into(),
                 ))?;
 
-                let write_lock_db_address = this.write_lock_db_address;
-
-                let mut blobstores = HashMap::new();
+                let mut blobstores = BTreeMap::new();
                 for blobstore in remote_blobstores {
                     let args = match blobstore.blobstore_type {
                         RawBlobstoreType::Manifold => {
@@ -305,11 +329,10 @@ impl RepoConfigs {
                             blobstore.manifold_bucket.ok_or(ErrorKind::InvalidConfig(
                                 "manifold bucket must be specified".into(),
                             ))?;
-                            let manifold_args = ManifoldArgs {
+                            BlobConfig::Manifold {
                                 bucket: manifold_bucket,
                                 prefix: blobstore.manifold_prefix.unwrap_or("".into()),
-                            };
-                            RemoteBlobstoreArgs::Manifold(manifold_args)
+                            }
                         }
                         RawBlobstoreType::Gluster => {
                             let tier = blobstore.gluster_tier.ok_or(ErrorKind::InvalidConfig(
@@ -322,11 +345,11 @@ impl RepoConfigs {
                             blobstore.gluster_basepath.ok_or(ErrorKind::InvalidConfig(
                                 "gluster basepath must be specified".into(),
                             ))?;
-                            RemoteBlobstoreArgs::Gluster(GlusterArgs {
+                            BlobConfig::Gluster {
                                 tier,
                                 export,
                                 basepath,
-                            })
+                            }
                         }
                         RawBlobstoreType::Mysql => {
                             let shardmap = blobstore.mysql_shardmap.ok_or(
@@ -346,10 +369,10 @@ impl RepoConfigs {
                                     than 0"
                                         .into(),
                                 ))?;
-                            RemoteBlobstoreArgs::Mysql(MysqlBlobstoreArgs {
-                                shardmap,
+                            BlobConfig::Mysql {
+                                shard_map: shardmap,
                                 shard_num,
-                            })
+                            }
                         }
                     };
                     if blobstores.insert(blobstore.blobstore_id, args).is_some() {
@@ -364,9 +387,9 @@ impl RepoConfigs {
                     let (_, args) = blobstores.into_iter().next().unwrap();
                     args
                 } else {
-                    RemoteBlobstoreArgs::Multiplexed {
+                    BlobConfig::Multiplexed {
                         scuba_table: this.blobstore_scuba_table,
-                        blobstores,
+                        blobstores: blobstores.into_iter().collect(),
                     }
                 };
@@ -388,11 +411,12 @@ impl RepoConfigs {
                     })
                     .transpose();
 
-                RepoType::BlobRemote {
-                    blobstores_args,
-                    db_address,
-                    sharded_filenodes: sharded_filenodes?,
-                    write_lock_db_address,
+                StorageConfig {
+                    blobstore: blobstores_args,
+                    dbconfig: MetadataDBConfig::Mysql {
+                        db_address,
+                        sharded_filenodes: sharded_filenodes?,
+                    },
                 }
             }
         };
@@ -486,7 +510,8 @@ impl RepoConfigs {
         let skiplist_index_blobstore_key = this.skiplist_index_blobstore_key;
         Ok(RepoConfig {
             enabled,
-            repotype,
+            storage_config,
+            write_lock_db_address: this.write_lock_db_address.clone(),
             generation_cache_size,
             repoid,
             scuba_table,
@@ -811,43 +836,41 @@ mod test {
 
         let repoconfig = RepoConfigs::read_configs(tmp_dir.path()).expect("failed to read configs");
 
-        let first_manifold_args = ManifoldArgs {
-            bucket: "bucket".into(),
-            prefix: "".into(),
-        };
-        let second_gluster_args = GlusterArgs {
-            tier: "mononoke.gluster.tier".into(),
-            export: "groot".into(),
-            basepath: "mononoke/glusterblob-test".into(),
-        };
-        let mut blobstores = HashMap::new();
-        blobstores.insert(
-            BlobstoreId::new(0),
-            RemoteBlobstoreArgs::Manifold(first_manifold_args),
-        );
-        blobstores.insert(
-            BlobstoreId::new(1),
-            RemoteBlobstoreArgs::Gluster(second_gluster_args),
-        );
-        let blobstores_args = RemoteBlobstoreArgs::Multiplexed {
-            scuba_table: Some("blobstore_scuba_table".to_string()),
-            blobstores,
-        };
-
         let mut repos = HashMap::new();
         repos.insert(
             "fbsource".to_string(),
             RepoConfig {
                 enabled: true,
-                repotype: RepoType::BlobRemote {
-                    db_address: "db_address".into(),
-                    blobstores_args,
-                    sharded_filenodes: Some(ShardedFilenodesParams {
-                        shard_map: "db_address_shards".into(),
-                        shard_num: NonZeroUsize::new(123).unwrap(),
-                    }),
-                    write_lock_db_address: Some("write_lock_db_address".into()),
+                storage_config: StorageConfig {
+                    dbconfig: MetadataDBConfig::Mysql {
+                        db_address: "db_address".into(),
+                        sharded_filenodes: Some(ShardedFilenodesParams {
+                            shard_map: "db_address_shards".into(),
+                            shard_num: NonZeroUsize::new(123).unwrap(),
+                        }),
+                    },
+                    blobstore: BlobConfig::Multiplexed {
+                        scuba_table: Some("blobstore_scuba_table".to_string()),
+                        blobstores: vec![
+                            (
+                                BlobstoreId::new(0),
+                                BlobConfig::Manifold {
+                                    bucket: "bucket".into(),
+                                    prefix: String::new(),
+                                },
+                            ),
+                            (
+                                BlobstoreId::new(1),
+                                BlobConfig::Gluster {
+                                    tier: "mononoke.gluster.tier".into(),
+                                    export: "groot".into(),
+                                    basepath: "mononoke/glusterblob-test".into(),
+                                },
+                            ),
+                        ],
+                    },
+                },
+                write_lock_db_address: Some("write_lock_db_address".into()),
                 generation_cache_size: 1024 * 1024,
                 repoid: 0,
                 scuba_table: Some("scuba_table".to_string()),
@@ -944,7 +967,15 @@ mod test {
             "www".to_string(),
             RepoConfig {
                 enabled: true,
-                repotype: RepoType::BlobFiles("/tmp/www".into()),
+                storage_config: StorageConfig {
+                    dbconfig: MetadataDBConfig::LocalDB {
+                        path: "/tmp/www".into(),
+                    },
+                    blobstore: BlobConfig::Files {
+                        path: "/tmp/www".into(),
+                    },
+                },
+                write_lock_db_address: None,
                 generation_cache_size: 10 * 1024 * 1024,
                 repoid: 1,
                 scuba_table: Some("scuba_table".to_string()),
|
@ -10,6 +10,8 @@
|
||||
#![deny(missing_docs)]
|
||||
#![deny(warnings)]
|
||||
|
||||
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, str, sync::Arc, time::Duration};
|
||||
|
||||
use bookmarks::Bookmark;
|
||||
use regex::Regex;
|
||||
use scuba::ScubaValue;
|
||||
@@ -19,41 +21,6 @@ use sql::mysql_async::{
     prelude::{ConvIr, FromValue},
     FromValueError, Value,
 };
-use std::collections::HashMap;
-use std::num::NonZeroUsize;
-use std::path::PathBuf;
-use std::str;
-use std::sync::Arc;
-use std::time::Duration;
-
-/// Arguments for setting up a Manifold blobstore.
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct ManifoldArgs {
-    /// Bucket of the backing Manifold blobstore to connect to
-    pub bucket: String,
-    /// Prefix to be prepended to all the keys. In prod it should be ""
-    pub prefix: String,
-}
-
-/// Arguments for settings up a Gluster blobstore
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct GlusterArgs {
-    /// Gluster tier
-    pub tier: String,
-    /// Nfs export name
-    pub export: String,
-    /// Content prefix path
-    pub basepath: String,
-}
-
-/// Arguments for setting up a Mysql blobstore.
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct MysqlBlobstoreArgs {
-    /// Name of the Mysql shardmap to use
-    pub shardmap: String,
-    /// Number of shards in the Mysql shardmap
-    pub shard_num: NonZeroUsize,
-}
 
 /// Single entry that
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -77,15 +44,18 @@ pub struct CommonConfig {
 }
 
 /// Configuration of a single repository
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Default, Clone, Eq, PartialEq)]
 pub struct RepoConfig {
     /// If false, this repo config is completely ignored.
     pub enabled: bool,
-    /// Defines the type of repository
-    pub repotype: RepoType,
+    /// Persistent storage for this repo
+    pub storage_config: StorageConfig,
+    /// Address of the SQL database used to lock writes to a repo.
+    pub write_lock_db_address: Option<String>,
     /// How large a cache to use (in bytes) for RepoGenCache derived information
     pub generation_cache_size: usize,
     /// Numerical repo id of the repo.
     // XXX Use RepositoryId?
     pub repoid: i32,
     /// Scuba table for logging performance of operations
     pub scuba_table: Option<String>,
@@ -119,10 +89,7 @@ pub struct RepoConfig {
 impl RepoConfig {
     /// Returns a db address that is referenced in this config or None if there is none
     pub fn get_db_address(&self) -> Option<&str> {
-        match self.repotype {
-            RepoType::BlobRemote { ref db_address, .. } => Some(&db_address),
-            _ => None,
-        }
+        self.storage_config.dbconfig.get_db_address()
     }
 }
@@ -135,6 +102,12 @@ pub enum RepoReadOnly {
     ReadWrite,
 }
 
+impl Default for RepoReadOnly {
+    fn default() -> Self {
+        RepoReadOnly::ReadWrite
+    }
+}
+
 /// Configuration of warming up the Mononoke cache. This warmup happens on startup
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub struct CacheWarmupParams {
@@ -378,36 +351,6 @@ impl Default for LfsParams {
     }
 }
 
-/// Remote blobstore arguments
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub enum RemoteBlobstoreArgs {
-    /// Manifold arguments
-    Manifold(ManifoldArgs),
-    /// Gluster blobstore arguemnts
-    Gluster(GlusterArgs),
-    /// Mysql blobstore arguments
-    Mysql(MysqlBlobstoreArgs),
-    /// Multiplexed
-    Multiplexed {
-        /// Scuba table for tracking performance of blobstore operations
-        scuba_table: Option<String>,
-        /// Multiplexed blobstores
-        blobstores: HashMap<BlobstoreId, RemoteBlobstoreArgs>,
-    },
-}
-
-impl From<ManifoldArgs> for RemoteBlobstoreArgs {
-    fn from(manifold_args: ManifoldArgs) -> Self {
-        RemoteBlobstoreArgs::Manifold(manifold_args)
-    }
-}
-
-impl From<GlusterArgs> for RemoteBlobstoreArgs {
-    fn from(gluster_args: GlusterArgs) -> Self {
-        RemoteBlobstoreArgs::Gluster(gluster_args)
-    }
-}
-
 /// Id used to discriminate diffirent underlying blobstore instances
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Deserialize)]
 pub struct BlobstoreId(u64);
@@ -447,31 +390,142 @@ impl From<BlobstoreId> for ScubaValue {
     }
 }
 
-/// Types of repositories supported
+/// Define storage needed for repo.
+/// Storage consists of a blobstore and some kind of SQL DB for metadata. The configurations
+/// can be broadly classified as "local" and "remote". "Local" is primarily for testing, and is
+/// only suitable for single hosts. "Remote" is durable storage which can be shared by multiple
+/// BlobRepo instances on different hosts.
+#[derive(Debug, Default, Clone, Eq, PartialEq)]
+pub struct StorageConfig {
+    /// Blobstores. If the blobstore has a BlobstoreId then it can be used as a component of
+    /// a Multiplexed blobstore.
+    pub blobstore: BlobConfig,
+    /// Metadata DB
+    pub dbconfig: MetadataDBConfig,
+}
+
+/// Configuration for a blobstore
 #[derive(Debug, Clone, Eq, PartialEq)]
-pub enum RepoType {
-    /// Blob repository with path pointing to on-disk files with data. The files are stored in a
-    ///
-    ///
+pub enum BlobConfig {
+    /// Administratively disabled blobstore
+    Disabled,
+    /// Blob repository with path pointing to on-disk files with data. Blobs are stored in
+    /// separate files.
    /// NOTE: this is read-only and for development/testing only. Production uses will break things.
-    BlobFiles(PathBuf),
+    Files {
+        /// Path to directory containing files
+        path: PathBuf,
+    },
     /// Blob repository with path pointing to on-disk files with data. The files are stored in a
     /// RocksDb database
-    BlobRocks(PathBuf),
+    Rocks {
+        /// Path to RocksDB directory
+        path: PathBuf,
+    },
     /// Blob repository with path pointing to on-disk files with data. The files are stored in a
     /// Sqlite database
-    BlobSqlite(PathBuf),
-    /// Blob repository with path pointing to the directory where a server socket is going to be.
-    BlobRemote {
-        /// Remote blobstores arguments
-        blobstores_args: RemoteBlobstoreArgs,
+    Sqlite {
+        /// Path to SQLite DB
+        path: PathBuf,
+    },
+    /// Store in a manifold bucket
+    Manifold {
+        /// Bucket of the backing Manifold blobstore to connect to
+        bucket: String,
+        /// Prefix to be prepended to all the keys. In prod it should be ""
+        prefix: String,
+    },
+    /// Store in a gluster mount
+    Gluster {
+        /// Gluster tier
+        tier: String,
+        /// Nfs export name
+        export: String,
+        /// Content prefix path
+        basepath: String,
+    },
+    /// Store in a sharded Mysql
+    Mysql {
+        /// Name of the Mysql shardmap to use
+        shard_map: String,
+        /// Number of shards in the Mysql shardmap
+        shard_num: NonZeroUsize,
+    },
+    /// Multiplex across multiple blobstores for redundancy
+    Multiplexed {
+        /// A scuba table I guess
+        scuba_table: Option<String>,
+        /// Set of blobstores being multiplexed over
+        blobstores: Vec<(BlobstoreId, BlobConfig)>,
+    },
 }
 
+impl BlobConfig {
+    /// Return true if the blobstore is strictly local. Multiplexed blobstores are local iff
+    /// all their components are.
+    pub fn is_local(&self) -> bool {
+        use BlobConfig::*;
+
+        match self {
+            Disabled | Files { .. } | Rocks { .. } | Sqlite { .. } => true,
+            Manifold { .. } | Gluster { .. } | Mysql { .. } => false,
+            Multiplexed { blobstores, .. } => blobstores
+                .iter()
+                .map(|(_, config)| config)
+                .all(BlobConfig::is_local),
+        }
+    }
+}
+
+impl Default for BlobConfig {
+    fn default() -> Self {
+        BlobConfig::Disabled
+    }
+}
+
+/// Configuration for the Metadata DB
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum MetadataDBConfig {
+    /// Remove MySQL DB
+    Mysql {
         /// Identifies the SQL database to connect to.
         db_address: String,
         /// If present, sharding configuration for filenodes.
         sharded_filenodes: Option<ShardedFilenodesParams>,
-        /// Address of the SQL database used to lock writes to a repo.
-        write_lock_db_address: Option<String>,
     },
+    /// Local SQLite dbs
+    LocalDB {
+        /// Path to directory of sqlite dbs
+        path: PathBuf,
+    },
+}
+
+impl Default for MetadataDBConfig {
+    fn default() -> Self {
+        MetadataDBConfig::LocalDB {
+            path: PathBuf::default(),
+        }
+    }
+}
+
+impl MetadataDBConfig {
+    /// Return true if this is a local on-disk DB.
+    pub fn is_local(&self) -> bool {
+        match self {
+            MetadataDBConfig::LocalDB { .. } => true,
+            MetadataDBConfig::Mysql { .. } => false,
+        }
+    }
+
+    /// Return address we should connect to for a remote DB
+    /// (Assumed to be Mysql)
+    pub fn get_db_address(&self) -> Option<&str> {
+        if let MetadataDBConfig::Mysql { db_address, .. } = self {
+            Some(db_address.as_str())
+        } else {
+            None
+        }
+    }
+}
 
 /// Params fro the bunle2 replay
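The two is_local helpers make the local/remote split queryable. A short sketch of how they compose, using only items defined in this hunk; the consistency-check function is hypothetical, mirroring the doc comment's advice that mixing local and remote storage would be weird:

    use metaconfig_types::{MetadataDBConfig, StorageConfig};

    // Hypothetical check: a sane config is either fully local (tests)
    // or fully remote (production).
    fn storage_is_consistent(storage: &StorageConfig) -> bool {
        storage.blobstore.is_local() == storage.dbconfig.is_local()
    }

    fn main() {
        // Default is Disabled blobstore + LocalDB, so both sides are local.
        let storage = StorageConfig::default();
        assert!(storage_is_consistent(&storage));
        assert_eq!(storage.dbconfig.get_db_address(), None);
    }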
@@ -22,7 +22,7 @@ use cache_warmup::cache_warmup;
 use context::CoreContext;
 use hooks::{hook_loader::load_hooks, HookManager};
 use hooks_content_stores::{BlobRepoChangesetStore, BlobRepoFileContentStore};
-use metaconfig_types::{RepoConfig, RepoType};
+use metaconfig_types::{MetadataDBConfig, RepoConfig, StorageConfig};
 use mononoke_types::RepositoryId;
 use phases::{CachingHintPhases, HintPhases, Phases, SqlConstructors, SqlPhases};
 use reachabilityindex::LeastCommonAncestorsHint;
@@ -61,11 +61,11 @@ pub fn repo_handlers(
         .map(|(reponame, config)| {
             info!(
                 root_log,
-                "Start warming for repo {}, type {:?}", reponame, config.repotype
+                "Start warming for repo {}, type {:?}", reponame, config.storage_config.blobstore
             );
             // TODO(T37478150, luk): this is not a test use case, need to address this later
             let ctx = CoreContext::test_mock();
-            let ensure_myrouter_ready = match config.get_db_address() {
+            let ensure_myrouter_ready = match config.storage_config.dbconfig.get_db_address() {
                 None => future::ok(()).left_future(),
                 Some(db_address) => {
                     let myrouter_port = try_boxfuture!(myrouter_port.ok_or_else(|| format_err!(
@@ -84,7 +84,7 @@ pub fn repo_handlers(
             let repoid = RepositoryId::new(config.repoid);
             open_blobrepo(
                 logger.clone(),
-                config.repotype.clone(),
+                config.storage_config.clone(),
                 repoid,
                 myrouter_port,
                 config.bookmarks_cache_ttl,
@@ -106,29 +106,28 @@ pub fn repo_handlers(
             info!(root_log, "Loading hooks");
             try_boxfuture!(load_hooks(&mut hook_manager, config.clone()));
 
-            let streaming_clone = match config.repotype {
-                RepoType::BlobRemote { ref db_address, .. } => {
+            let streaming_clone =
+                if let Some(db_address) = config.storage_config.dbconfig.get_db_address() {
                     Some(try_boxfuture!(streaming_clone(
                         blobrepo.clone(),
                         &db_address,
                         myrouter_port.expect("myrouter_port not provided for BlobRemote repo"),
                         repoid
                     )))
-                }
-                _ => None,
-            };
+                } else {
+                    None
+                };
 
-            let read_write_fetcher = match &config.repotype {
-                RepoType::BlobRemote {
-                    write_lock_db_address: Some(addr),
-                    ..
-                } => RepoReadWriteFetcher::with_myrouter(
+            // XXX Fixme - put write_lock_db_address into storage_config.dbconfig?
+            let read_write_fetcher = if let Some(addr) = config.write_lock_db_address {
+                RepoReadWriteFetcher::with_myrouter(
                     config.readonly.clone(),
                     reponame.clone(),
                     addr.clone(),
                     myrouter_port.expect("myrouter_port not provided for BlobRemote repo"),
-                ),
-                _ => RepoReadWriteFetcher::new(config.readonly.clone(), reponame.clone()),
+                )
+            } else {
+                RepoReadWriteFetcher::new(config.readonly.clone(), reponame.clone())
             };
 
             let repo = MononokeRepo::new(
@@ -172,7 +171,7 @@ pub fn repo_handlers(
             };
 
             let RepoConfig {
-                repotype,
+                storage_config: StorageConfig { dbconfig, .. },
                 cache_warmup: cache_warmup_params,
                 ..
             } = config;
@@ -196,17 +195,15 @@ pub fn repo_handlers(
             info!(root_log, "Repo warmup for {} complete", reponame);
 
             // initialize phases hint from the skip index
-            let phases_hint: Arc<Phases> = match repotype {
-                RepoType::BlobFiles(ref data_dir)
-                | RepoType::BlobRocks(ref data_dir)
-                | RepoType::BlobSqlite(ref data_dir) => {
+            let phases_hint: Arc<Phases> = match dbconfig {
+                MetadataDBConfig::LocalDB { path } => {
                     let storage = Arc::new(
-                        SqlPhases::with_sqlite_path(data_dir.join("phases"))
+                        SqlPhases::with_sqlite_path(path.join("phases"))
                             .expect("unable to initialize sqlite db for phases"),
                     );
                     Arc::new(HintPhases::new(storage))
                 }
-                RepoType::BlobRemote { ref db_address, .. } => {
+                MetadataDBConfig::Mysql { db_address, .. } => {
                     let storage = Arc::new(SqlPhases::with_myrouter(
                         &db_address,
                         myrouter_port.expect(