mononoke/microwave: create repository snapshots for faster cache warmup

Summary:
This introduces a new binary and library (microwave: it makes warmup
faster..!) that can be used to accelerate cache warmup. The idea is that the
microwave binary runs cache warmup, captures everything that gets loaded in
the process, and commits those entries to a file.

We can then use that file when starting up a host to get a head start on
warmup: we inject all those entries into our local cache before cache warmup
actually starts.

Currently, this only supports filenodes, but that's already a pretty good
improvement. Changesets should be easy to add as well. Blobs might require a
bit more work.
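
To illustrate the flow, here is a minimal, hypothetical sketch (setup and
error handling are elided; all the names used come from the code below):

// Recording side (microwave binary): wrap the repo's Filenodes so that
// warmup traffic is captured, then persist whatever was recorded.
let (sender, receiver) = futures::channel::mpsc::channel(1000);
let warmup_repo = repo.dangerous_override(|inner| -> Arc<dyn Filenodes> {
    Arc::new(MicrowaveFilenodes::new(repo_id, sender, inner))
});
// ... run cache warmup against warmup_repo ...
let snapshot = Snapshot::build(receiver).await;
snapshot.commit(&ctx, &repo, SnapshotLocation::Blobstore).await?;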

Reviewed By: StanislavGlebik

Differential Revision: D20219905

fbshipit-source-id: 82bb13ca487f82ca53b4a68a90ac5893895a96e9
Thomas Orozco 2020-03-04 03:59:31 -08:00 committed by Facebook Github Bot
parent 5bf9a8dda9
commit f4f96c1100
8 changed files with 549 additions and 1 deletions

View File

@ -272,6 +272,15 @@ impl<F: Filenodes> Filenodes for DelayedFilenodes<F> {
)
.boxify()
}
fn prime_cache(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
filenodes: &[PreparedFilenode],
) {
self.inner.prime_cache(ctx, repo_id, filenodes)
}
}
struct DelayedChangesets<C> {

View File

@ -117,6 +117,8 @@ pub trait Filenodes: Send + Sync {
path: &RepoPath,
repo_id: RepositoryId,
) -> BoxFuture<Vec<FilenodeInfo>, Error>;
fn prime_cache(&self, ctx: &CoreContext, repo_id: RepositoryId, filenodes: &[PreparedFilenode]);
}
#[cfg(test)]
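
Implementations that keep no local cache can satisfy the new method with a
no-op. A minimal sketch, assuming a hypothetical NoCacheFilenodes backend:

impl Filenodes for NoCacheFilenodes {
    // ... the other trait methods are elided ...
    fn prime_cache(
        &self,
        _ctx: &CoreContext,
        _repo_id: RepositoryId,
        _filenodes: &[PreparedFilenode],
    ) {
        // Nothing to prime: this backend keeps no local cache.
    }
}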

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use cloned::cloned;
use context::CoreContext;
use filenodes::{FilenodeInfo, Filenodes, PreparedFilenode};
use futures::{
channel::mpsc::Sender,
compat::Future01CompatExt,
future::{FutureExt as _, TryFutureExt},
sink::SinkExt,
};
use futures_ext::{BoxFuture, FutureExt};
use mercurial_types::HgFileNodeId;
use mononoke_types::{RepoPath, RepositoryId};
use std::sync::Arc;
#[derive(Clone)]
pub struct MicrowaveFilenodes {
repo_id: RepositoryId,
recorder: Sender<PreparedFilenode>,
inner: Arc<dyn Filenodes>,
}
impl MicrowaveFilenodes {
pub fn new(
repo_id: RepositoryId,
recorder: Sender<PreparedFilenode>,
inner: Arc<dyn Filenodes>,
) -> Self {
Self {
repo_id,
recorder,
inner,
}
}
}
impl Filenodes for MicrowaveFilenodes {
fn add_filenodes(
&self,
ctx: CoreContext,
info: Vec<PreparedFilenode>,
repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
self.inner.add_filenodes(ctx, info, repo_id)
}
fn add_or_replace_filenodes(
&self,
ctx: CoreContext,
info: Vec<PreparedFilenode>,
repo_id: RepositoryId,
) -> BoxFuture<(), Error> {
self.inner.add_or_replace_filenodes(ctx, info, repo_id)
}
fn get_filenode(
&self,
ctx: CoreContext,
path: &RepoPath,
filenode_id: HgFileNodeId,
repo_id: RepositoryId,
) -> BoxFuture<Option<FilenodeInfo>, Error> {
cloned!(self.inner, mut self.recorder, path);
// NOTE: Receiving any other repo_id here would be a programming error, so we block it.
// This wouldn't be on the path of any live traffic, so panicking if this assertion is
// violated is reasonable.
assert_eq!(repo_id, self.repo_id);
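// Fetch from the wrapped Filenodes; on a hit, record the filenode so it
// ends up in the snapshot.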
async move {
let info = inner
.get_filenode(ctx, &path, filenode_id, repo_id)
.compat()
.await?;
if let Some(ref info) = info {
recorder
.send(PreparedFilenode {
path,
info: info.clone(),
})
.await?;
}
Ok(info)
}
.boxed()
.compat()
.boxify()
}
fn get_all_filenodes_maybe_stale(
&self,
ctx: CoreContext,
path: &RepoPath,
repo_id: RepositoryId,
) -> BoxFuture<Vec<FilenodeInfo>, Error> {
self.inner.get_all_filenodes_maybe_stale(ctx, path, repo_id)
}
fn prime_cache(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
filenodes: &[PreparedFilenode],
) {
self.inner.prime_cache(ctx, repo_id, filenodes)
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
mod filenodes;
use ::filenodes::Filenodes;
use anyhow::{format_err, Error};
use blobrepo::DangerousOverride;
use blobrepo_factory::open_blobrepo;
use clap::{Arg, ArgMatches, SubCommand};
use cloned::cloned;
use cmdlib::{args, monitoring::AliveService};
use context::SessionContainer;
use fbinit::FacebookInit;
use futures::{channel::mpsc, compat::Future01CompatExt, future};
use metaconfig_parser::RepoConfigs;
use metaconfig_types::RepoConfig;
use microwave::{Snapshot, SnapshotLocation};
use slog::{info, o, Logger};
use std::path::Path;
use std::sync::Arc;
use crate::filenodes::MicrowaveFilenodes;
const SUBCOMMAND_LOCAL_PATH: &str = "local-path";
const ARG_LOCAL_PATH: &str = "local-path";
const SUBCOMMAND_BLOBSTORE: &str = "blobstore";
async fn do_main<'a>(
fb: FacebookInit,
matches: &ArgMatches<'a>,
logger: &Logger,
) -> Result<(), Error> {
let mut scuba = args::get_scuba_sample_builder(fb, &matches)?;
scuba.add_common_server_data();
let mysql_options = cmdlib::args::parse_mysql_options(&matches);
let readonly_storage = cmdlib::args::parse_readonly_storage(&matches);
let blobstore_options = cmdlib::args::parse_blobstore_options(&matches);
let caching = cmdlib::args::init_cachelib(fb, &matches, None);
let RepoConfigs { repos, common } = args::read_configs(fb, &matches)?;
let scuba_censored_table = common.scuba_censored_table;
let location = match matches.subcommand() {
(SUBCOMMAND_LOCAL_PATH, Some(sub)) => {
let path = Path::new(sub.value_of_os(ARG_LOCAL_PATH).unwrap());
info!(logger, "Writing to path {}", path.display());
SnapshotLocation::SharedLocalPath(path)
}
(SUBCOMMAND_BLOBSTORE, Some(_)) => SnapshotLocation::Blobstore,
(name, _) => return Err(format_err!("Invalid subcommand: {:?}", name)),
};
let futs = repos
.into_iter()
.map(|(name, config)| {
cloned!(blobstore_options, scuba_censored_table, mut scuba);
async move {
let logger = logger.new(o!("repo" => name.clone()));
let ctx = {
scuba.add("reponame", name);
let session = SessionContainer::new_with_defaults(fb);
session.new_context(logger.clone(), scuba)
};
let (filenodes_sender, filenodes_receiver) = mpsc::channel(1000);
let warmup_ctx = ctx.clone();
let RepoConfig {
storage_config,
repoid,
bookmarks_cache_ttl,
redaction,
filestore,
derived_data_config,
cache_warmup,
..
} = config;
let warmup = async move {
let repo = open_blobrepo(
fb,
storage_config,
repoid,
mysql_options,
caching,
bookmarks_cache_ttl,
redaction,
scuba_censored_table,
filestore,
readonly_storage,
blobstore_options,
logger,
derived_data_config,
)
.compat()
.await?;
let warmup_repo = repo.dangerous_override(|inner| -> Arc<dyn Filenodes> {
Arc::new(MicrowaveFilenodes::new(repoid, filenodes_sender, inner))
});
cache_warmup::cache_warmup(warmup_ctx, warmup_repo, cache_warmup)
.compat()
.await?;
Result::<_, Error>::Ok(repo)
};
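// Spawn warmup on its own task so that Snapshot::build below can drain the
// channel concurrently; the bounded channel would otherwise fill up and
// stall warmup.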
let handle = tokio::task::spawn(warmup);
let snapshot = Snapshot::build(filenodes_receiver).await;
// Make sure cache warmup has succeeded before committing this snapshot, and
// get the repo back.
let repo = handle.await??;
snapshot.commit(&ctx, &repo, location).await?;
Result::<_, Error>::Ok(())
}
})
.collect::<Vec<_>>();
future::try_join_all(futs).await?;
Ok(())
}
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> {
let app = args::MononokeApp::new("Mononoke Microwave")
.with_advanced_args_hidden()
.with_fb303_args()
.with_all_repos()
.with_scuba_logging_args()
.build()
.subcommand(
SubCommand::with_name(SUBCOMMAND_LOCAL_PATH)
.about("Write cache priming data to path")
.arg(
Arg::with_name(ARG_LOCAL_PATH)
.takes_value(true)
.required(true),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_BLOBSTORE)
.about("Write cache priming data to the repository blobstore"),
);
let matches = app.get_matches();
let logger = args::init_logging(fb, &matches);
let main = do_main(fb, &matches, &logger);
cmdlib::helpers::block_execute(main, fb, "microwave", &logger, &matches, AliveService)?;
Ok(())
}

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
include "eden/mononoke/mononoke_types/if/mononoke_types_thrift.thrift"
include "eden/mononoke/mercurial/types/if/mercurial_thrift.thrift"
// Code version constant -- update to invalidate saved state.
const i32 CODEVER = 0
struct FilenodeSnapshot {
// Note: required fields are enforced at runtime here (to prevent Thrift from
// giving us garbage values and calling those acceptable).
1: optional mononoke_types_thrift.RepoPath path,
2: optional mercurial_thrift.HgNodeHash filenode,
3: optional mercurial_thrift.HgNodeHash p1,
4: optional mercurial_thrift.HgNodeHash p2,
5: optional CopyInfoSnapshot copyfrom,
6: optional mercurial_thrift.HgNodeHash linknode,
}
struct CopyInfoSnapshot {
1: optional mononoke_types_thrift.RepoPath path,
2: optional mercurial_thrift.HgNodeHash filenode,
}
struct RepoSnapshot {
1: optional list<FilenodeSnapshot> filenodes,
}

View File

@ -0,0 +1,199 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use blobrepo::BlobRepo;
use blobstore::Blobstore;
use context::CoreContext;
use fbthrift::compact_protocol;
use filenodes::{FilenodeInfo, PreparedFilenode};
use futures::{
compat::Future01CompatExt,
future,
stream::{Stream, StreamExt},
};
use mercurial_types::{HgChangesetId, HgFileNodeId, HgNodeHash};
use mononoke_types::{BlobstoreBytes, RepoPath, RepositoryId};
use std::path::{Path, PathBuf};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
mod thrift {
pub use microwave_if::*;
}
#[derive(Debug, Copy, Clone)]
pub enum SnapshotLocation<'a> {
SharedLocalPath(&'a Path),
Blobstore,
}
pub struct Snapshot {
snapshot: thrift::RepoSnapshot,
}
impl Snapshot {
pub async fn build<FilenodesStream>(filenodes: FilenodesStream) -> Self
where
FilenodesStream: Stream<Item = PreparedFilenode>,
{
let filenodes = filenodes
.fold(Vec::new(), |mut v, c| {
let PreparedFilenode { path, info } = c;
let t = thrift::FilenodeSnapshot {
path: Some(path.into_thrift()),
filenode: Some(info.filenode.into_nodehash().into_thrift()),
p1: info.p1.map(|p| p.into_nodehash().into_thrift()),
p2: info.p2.map(|p| p.into_nodehash().into_thrift()),
copyfrom: info.copyfrom.map(|copyfrom| thrift::CopyInfoSnapshot {
path: Some(copyfrom.0.into_thrift()),
filenode: Some(copyfrom.1.into_nodehash().into_thrift()),
}),
linknode: Some(info.linknode.into_nodehash().into_thrift()),
};
v.push(t);
future::ready(v)
})
.await;
Snapshot {
snapshot: thrift::RepoSnapshot {
filenodes: Some(filenodes),
},
}
}
pub async fn commit(
&self,
ctx: &CoreContext,
repo: &BlobRepo,
location: SnapshotLocation<'_>,
) -> Result<(), Error> {
let serialized = compact_protocol::serialize(&self.snapshot);
match location {
SnapshotLocation::SharedLocalPath(ref path) => {
let mut file = File::create(snapshot_path(path, repo.get_repoid())).await?;
file.write_all(&serialized).await?;
}
SnapshotLocation::Blobstore => {
repo.blobstore()
.put(
ctx.clone(),
snapshot_name(),
BlobstoreBytes::from_bytes(serialized),
)
.compat()
.await?;
}
};
Ok(())
}
}
fn snapshot_name() -> String {
format!("microwave_snapshot_v{}", thrift::CODEVER)
}
fn snapshot_path(shared_local_path: &Path, repo_id: RepositoryId) -> PathBuf {
let name = format!("{}{}", repo_id.prefix(), snapshot_name());
shared_local_path.join(&name)
}
async fn load_snapshot(
ctx: &CoreContext,
repo: &BlobRepo,
location: SnapshotLocation<'_>,
) -> Result<thrift::RepoSnapshot, Error> {
match location {
SnapshotLocation::SharedLocalPath(ref path) => {
let mut contents = vec![];
let mut snapshot = File::open(snapshot_path(path, repo.get_repoid())).await?;
snapshot.read_to_end(&mut contents).await?;
Ok(compact_protocol::deserialize(&contents)?)
}
SnapshotLocation::Blobstore => {
let bytes = repo
.get_blobstore()
.get(ctx.clone(), snapshot_name())
.compat()
.await?
.ok_or(Error::msg("Snapshot is missing"))?
.into_bytes();
Ok(compact_protocol::deserialize(&bytes)?)
}
}
}
pub async fn prime_cache(
ctx: &CoreContext,
repo: &BlobRepo,
location: SnapshotLocation<'_>,
) -> Result<(), Error> {
let snapshot = load_snapshot(ctx, repo, location).await?;
let filenodes = snapshot.filenodes.ok_or(Error::msg("filenodes missing"))?;
let filenodes = reheat_filenodes(filenodes)?;
repo.get_filenodes()
.prime_cache(ctx, repo.get_repoid(), filenodes.as_ref());
Ok(())
}
fn reheat_filenodes(
filenodes: Vec<thrift::FilenodeSnapshot>,
) -> Result<Vec<PreparedFilenode>, Error> {
filenodes
.into_iter()
.map(|t| {
let thrift::FilenodeSnapshot {
path,
filenode,
p1,
p2,
copyfrom,
linknode,
} = t;
let path = path.ok_or(Error::msg("path missing"))?;
let filenode = filenode.ok_or(Error::msg("filenode missing"))?;
let linknode = linknode.ok_or(Error::msg("linknode missing"))?;
let copyfrom = copyfrom
.map(|t| {
let thrift::CopyInfoSnapshot { path, filenode } = t;
let path = path.ok_or(Error::msg("copy info path missing"))?;
let filenode = filenode.ok_or(Error::msg("copy info filenode missing"))?;
Result::<_, Error>::Ok((
RepoPath::from_thrift(path)?,
HgFileNodeId::new(HgNodeHash::from_thrift(filenode)?),
))
})
.transpose()?;
let filenode = HgFileNodeId::new(HgNodeHash::from_thrift(filenode)?);
Ok(PreparedFilenode {
path: RepoPath::from_thrift(path)?,
info: FilenodeInfo {
filenode,
p1: HgNodeHash::from_thrift_opt(p1)?.map(HgFileNodeId::new),
p2: HgNodeHash::from_thrift_opt(p2)?.map(HgFileNodeId::new),
copyfrom,
linknode: HgChangesetId::new(HgNodeHash::from_thrift(linknode)?),
},
})
})
.collect()
}
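
A consumer would call prime_cache at startup, before running the real warmup.
A hypothetical usage sketch (the surrounding server setup, the logger, and
the cache_warmup_config name are assumptions, not part of this diff):

// At server startup, before cache warmup:
if let Err(e) = microwave::prime_cache(&ctx, &repo, SnapshotLocation::Blobstore).await {
    // A missing or stale snapshot is not fatal; warmup simply starts cold.
    slog::warn!(logger, "could not prime cache from snapshot: {}", e);
}
cache_warmup::cache_warmup(ctx.clone(), repo.clone(), cache_warmup_config)
    .compat()
    .await?;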

View File

@ -133,4 +133,13 @@ impl Filenodes for NewFilenodes {
.compat()
.boxify()
}
fn prime_cache(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
filenodes: &[PreparedFilenode],
) {
self.reader.prime_cache(ctx, repo_id, filenodes);
}
}

View File

@ -25,7 +25,7 @@ use std::time::Duration;
use thiserror::Error as DeriveError;
use tokio_preview::time::timeout;
-use filenodes::FilenodeInfo;
+use filenodes::{FilenodeInfo, PreparedFilenode};
use crate::connections::{AcquireReason, Connections};
use crate::local_cache::{CacheKey, LocalCache};
@ -275,6 +275,19 @@ impl FilenodesReader {
Ok(res.try_into()?)
}
pub fn prime_cache(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
filenodes: &[PreparedFilenode],
) {
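// Insert each recorded filenode into the local cache, under the same key
// the read path computes.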
for c in filenodes {
let pwh = PathWithHash::from_repo_path(&c.path);
let key = filenode_cache_key(repo_id, &pwh, &c.info.filenode);
self.local_cache.fill(&key, &(&c.info).into())
}
}
}
#[derive(Copy, Clone)]