add a cache service in Mononoke API Server

Summary: This diff adds `CacheManager` to Mononoke API Server that is responsible to manage caches (memcache currently, and cachelib in the future).

Reviewed By: StanislavGlebik

Differential Revision: D15678253

fbshipit-source-id: 3716c11db1be00d8526f43c4f6b4a2434e6c87c3
This commit is contained in:
Zeyi (Rice) Fan 2019-06-19 18:01:00 -07:00 committed by Facebook Github Bot
parent 305884a4b4
commit ebc387df34
3 changed files with 161 additions and 1 deletions

View File

@ -18,6 +18,7 @@ use slog::{debug, info, Logger};
use metaconfig_parser::RepoConfigs;
use crate::cache::CacheManager;
use crate::errors::ErrorKind;
mod lfs;
@ -33,6 +34,8 @@ pub use self::response::MononokeRepoResponse;
pub struct Mononoke {
repos: HashMap<String, MononokeRepo>,
#[allow(dead_code)]
cache: Option<CacheManager>,
}
impl Mononoke {
@ -41,6 +44,7 @@ impl Mononoke {
config: RepoConfigs,
myrouter_port: Option<u16>,
with_skiplist: bool,
cache: Option<CacheManager>,
) -> impl Future<Item = Self, Error = Error> {
join_all(
config
@ -63,6 +67,7 @@ impl Mononoke {
)
.map(move |repos| Self {
repos: repos.into_iter().collect(),
cache,
})
}

143
apiserver/src/cache.rs Normal file
View File

@ -0,0 +1,143 @@
// Copyright (c) 2019-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use bytes::Bytes;
use futures::{Future, IntoFuture};
use caching_ext::MemcacheHandler;
use cloned::cloned;
use futures_ext::{BoxFuture, FutureExt};
use iobuf::IOBuf;
use memcache::{KeyGen, MemcacheClient};
use crate::errors::ErrorKind;
#[derive(Clone)]
pub struct CacheManager {
memcache: MemcacheHandler,
keygen: KeyGen,
}
impl CacheManager {
const KEY_PREFIX: &'static str = "scm.mononoke.apiserver";
const MC_CODEVER: u32 = 1;
const MC_SITEVER: u32 = 1;
pub fn new() -> Self {
CacheManager {
memcache: MemcacheClient::new().into(),
keygen: KeyGen::new(Self::KEY_PREFIX, Self::MC_CODEVER, Self::MC_SITEVER),
}
}
#[cfg(test)]
pub fn new_with_memcache(memcache: MemcacheHandler) -> Self {
CacheManager {
memcache,
keygen: KeyGen::new(Self::KEY_PREFIX, Self::MC_CODEVER, Self::MC_SITEVER),
}
}
fn get(&self, key: String) -> impl Future<Item = Option<IOBuf>, Error = ()> {
self.memcache.get(self.keygen.key(key))
}
fn set(&self, key: String, value: Bytes) -> impl Future<Item = (), Error = ()> {
self.memcache.set(self.keygen.key(key), value)
}
#[allow(dead_code)]
pub fn get_or_fill<
RES: Future<Item = ITEM, Error = ErrorKind> + Send + 'static,
ITEM: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
>(
&self,
key: String,
fill: RES,
) -> BoxFuture<ITEM, ErrorKind> {
let fill_future = fill.and_then({
let this = self.clone();
cloned!(key);
move |resp| {
bincode::serialize(&resp)
.map_err(|_| ())
.into_future()
.and_then(move |serialized| this.set(key, serialized.into()))
.then(|_| Ok(resp))
}
});
self.get(key)
.and_then(|result| match result {
Some(cached) => Ok(Bytes::from(cached)),
None => Err(()),
})
.and_then(|cached| bincode::deserialize(&cached[..]).map_err(|_| ()))
.or_else(|_| fill_future)
.boxify()
}
}
#[cfg(test)]
mod test {
use super::*;
use futures::future::{lazy, FutureResult};
use serde_derive::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Debug)]
struct CacheableStruct(pub u32);
#[test]
fn test_cache_missed() {
let mock_memcache = MemcacheHandler::create_mock();
let manager = CacheManager::new_with_memcache(mock_memcache);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let key = "key-test-missed".to_string();
let num = 1032;
let value = CacheableStruct(num);
let result = runtime.block_on(manager.get(key.clone())).unwrap();
// not in cache yet
assert_eq!(result, None);
let result = runtime
.block_on(manager.get_or_fill(key.clone(), Ok(value).into_future()))
.unwrap();
assert_eq!(result.0, num);
let result = runtime.block_on(manager.get(key.clone())).unwrap().unwrap();
let result = Bytes::from(result);
let result: CacheableStruct = bincode::deserialize(&result[..]).unwrap();
assert_eq!(result.0, num);
}
#[test]
fn test_cache_hit() {
let mock_memcache = MemcacheHandler::create_mock();
let manager = CacheManager::new_with_memcache(mock_memcache);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let key = "key-test-hit".to_string();
let num = 1032;
let value = CacheableStruct(num);
let result = runtime
.block_on(manager.get_or_fill(key.clone(), Ok(value).into_future()))
.unwrap();
assert_eq!(result.0, num);
let result = runtime
.block_on(manager.get_or_fill(
key.clone(),
lazy(|| -> FutureResult<CacheableStruct, ErrorKind> { unreachable!() }),
))
.unwrap();
assert_eq!(result.0, num);
}
}

View File

@ -33,6 +33,7 @@ use sshrelay::SshEnvVars;
use tracing::TraceContext;
mod actor;
mod cache;
mod errors;
mod from_string;
mod middleware;
@ -41,6 +42,7 @@ mod thrift;
use crate::actor::{
BatchRequest, Mononoke, MononokeQuery, MononokeRepoQuery, MononokeRepoResponse, Revision,
};
use crate::cache::CacheManager;
use crate::errors::ErrorKind;
use crate::middleware::ScubaMiddleware;
@ -556,7 +558,8 @@ fn main() -> Fallible<()> {
.long("ssl-ticket-seeds")
.value_name("PATH")
.help("path to the ssl ticket seeds"),
);
)
.arg(Arg::with_name("with-cache").long("with-cache"));
let app = cmdlib::args::add_myrouter_args(app);
let matches =
@ -573,6 +576,7 @@ fn main() -> Fallible<()> {
.expect("must set config path");
let with_scuba = matches.is_present("with-scuba");
let with_skiplist = !matches.is_present("without-skiplist");
let with_cache = matches.is_present("with-cache");
let myrouter_port = cmdlib::args::parse_myrouter_port(&matches);
let address = format!("{}:{}", host, port);
@ -638,11 +642,19 @@ fn main() -> Fallible<()> {
let use_ssl = ssl_acceptor.is_some();
let sys = actix::System::new("mononoke-apiserver");
let cache = if with_cache {
Some(CacheManager::new())
} else {
None
};
let mononoke = runtime.block_on(Mononoke::new(
mononoke_logger.clone(),
repo_configs,
myrouter_port,
with_skiplist,
cache,
))?;
let mononoke = Arc::new(mononoke);