refactor actors to simple struct

Summary: While working on `actix-srserver`, I realized that the actor-based design of the API server is unnecessarily complex. `MononokeActor` and `MononokeRepoActor` only return futures and do almost no CPU-bound work, so they don't need to run on separate threads.
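
In outline, each actor and its Handler impl becomes a plain struct whose send_query method returns a BoxFuture directly. A minimal sketch of the resulting shape, with hypothetical simplified types (futures 0.1 style, matching the BoxFuture alias this codebase uses):

use std::collections::HashMap;
use futures::future::{self, Future};

// Stand-ins for the real query/response/error types; only the shape matters.
type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
pub struct Response;
pub enum ErrorKind {
    NotFound(String),
}

pub struct MononokeRepo; // was MononokeRepoActor

impl MononokeRepo {
    // Only builds and returns a future; there is no CPU-heavy work here,
    // so nothing justifies a dedicated actor thread.
    pub fn send_query(&self, _kind: ()) -> BoxFuture<Response, ErrorKind> {
        Box::new(future::ok(Response))
    }
}

pub struct Mononoke {
    // was MononokeActor, which held Addr<MononokeRepoActor> handles
    repos: HashMap<String, MononokeRepo>,
}

impl Mononoke {
    pub fn send_query(&self, repo: String, kind: ()) -> BoxFuture<Response, ErrorKind> {
        match self.repos.get(&repo) {
            Some(r) => r.send_query(kind),
            None => Box::new(future::err(ErrorKind::NotFound(repo))),
        }
    }
}

Since send_query takes &self and never blocks, callers can invoke it inline instead of round-tripping through an actor mailbox.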

Reviewed By: jsgf

Differential Revision: D9472848

fbshipit-source-id: 618ec39c42d90717fa6985fee7d6308420962d3f
Arun Kulshreshtha 2018-08-31 13:53:54 -07:00 committed by Facebook Github Bot
parent ed34b17e1a
commit 2dc93d6a5f
8 changed files with 96 additions and 142 deletions


@@ -11,10 +11,8 @@ mod response;
use std::collections::HashMap;
use actix::{Actor, Addr, Context, Handler};
use actix::dev::Request;
use failure::Error;
use futures::{Future, IntoFuture};
use futures::IntoFuture;
use futures_ext::{BoxFuture, FutureExt};
use slog::Logger;
use tokio::runtime::TaskExecutor;
@@ -23,64 +21,38 @@ use metaconfig::repoconfig::RepoConfigs;
use errors::ErrorKind;
pub use self::query::{MononokeQuery, MononokeRepoQuery};
pub use self::repo::MononokeRepoActor;
pub use self::repo::MononokeRepo;
pub use self::response::MononokeRepoResponse;
pub struct MononokeActor {
repos: HashMap<String, Addr<MononokeRepoActor>>,
pub struct Mononoke {
repos: HashMap<String, MononokeRepo>,
}
impl MononokeActor {
impl Mononoke {
pub fn new(logger: Logger, config: RepoConfigs, executor: TaskExecutor) -> Self {
let logger = logger.clone();
let repos = config
.repos
.into_iter()
.filter(move |&(_, ref config)| config.enabled)
.map(move |(reponame, config)| {
.map(move |(name, config)| {
cloned!(logger, executor);
(
reponame,
MononokeRepoActor::create(move |_| {
MononokeRepoActor::new(logger, config, executor)
.expect("Unable to initialize repo")
}),
)
let repo =
MononokeRepo::new(logger, config, executor).expect("Unable to initialize repo");
(name, repo)
})
.collect();
Self { repos }
}
}
impl Actor for MononokeActor {
type Context = Context<Self>;
}
impl Handler<MononokeQuery> for MononokeActor {
type Result = Result<Request<MononokeRepoActor, MononokeRepoQuery>, Error>;
fn handle(
&mut self,
pub fn send_query(
&self,
MononokeQuery { repo, kind, .. }: MononokeQuery,
_ctx: &mut Context<Self>,
) -> Self::Result {
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
match self.repos.get(&repo) {
Some(repo) => Ok(repo.send(kind)),
None => Err(ErrorKind::NotFound(repo, None).into()),
Some(repo) => repo.send_query(kind),
None => Err(ErrorKind::NotFound(repo, None)).into_future().boxify(),
}
}
}
pub fn unwrap_request(
request: Request<MononokeActor, MononokeQuery>,
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
request
.into_future()
.from_err()
.and_then(|result| result) // using flatten here would trip up the compiler's inference.
.and_then(|result| result.map_err(From::from))
.flatten()
.flatten()
.from_err()
}
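
For context on what the deleted unwrap_request was undoing: with actors, addr.send(query) yields a future of the handler's Result, whose Ok value is itself another request future, so the response sat under several layers. A self-contained sketch of that flattening (futures 0.1; toy types, not the real signatures):

use futures::future::{self, Future};

fn main() {
    // mailbox future -> repo request future -> query future, standing in
    // for the layers unwrap_request had to peel away.
    let nested = future::ok::<_, ()>(future::ok::<_, ()>(future::ok::<u32, ()>(42)));
    assert_eq!(nested.flatten().flatten().wait(), Ok(42));
}

With send_query returning BoxFuture<MononokeRepoResponse, ErrorKind> directly, none of this machinery is needed.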


@@ -6,15 +6,10 @@
use std::convert::TryFrom;
use actix::Message;
use actix::dev::Request;
use failure::Error;
use futures_ext::BoxFuture;
use apiserver_thrift::types::MononokeGetRawParams;
use super::{MononokeRepoActor, MononokeRepoResponse};
#[derive(Debug)]
pub enum MononokeRepoQuery {
GetRawFile {
@@ -40,19 +35,11 @@ pub enum MononokeRepoQuery {
},
}
impl Message for MononokeRepoQuery {
type Result = Result<BoxFuture<MononokeRepoResponse, Error>, Error>;
}
pub struct MononokeQuery {
pub kind: MononokeRepoQuery,
pub repo: String,
}
impl Message for MononokeQuery {
type Result = Result<Request<MononokeRepoActor, MononokeRepoQuery>, Error>;
}
impl TryFrom<MononokeGetRawParams> for MononokeQuery {
type Error = Error;


@@ -7,8 +7,7 @@
use std::convert::TryInto;
use std::sync::Arc;
use actix::{Actor, Context, Handler};
use failure::{err_msg, Error, Result};
use failure::{err_msg, Error};
use futures::{Future, IntoFuture};
use futures::sync::oneshot;
use futures_ext::BoxFuture;
@@ -31,14 +30,14 @@ use from_string as FS;
use super::{MononokeRepoQuery, MononokeRepoResponse};
use super::model::Entry;
pub struct MononokeRepoActor {
pub struct MononokeRepo {
repo: Arc<BlobRepo>,
logger: Logger,
executor: TaskExecutor,
}
impl MononokeRepoActor {
pub fn new(logger: Logger, config: RepoConfig, executor: TaskExecutor) -> Result<Self> {
impl MononokeRepo {
pub fn new(logger: Logger, config: RepoConfig, executor: TaskExecutor) -> Result<Self, Error> {
let repoid = RepositoryId::new(config.repoid);
let repo = match config.repotype {
BlobRocks(ref path) => BlobRepo::new_rocksdb(logger.clone(), &path, repoid),
@@ -57,17 +56,17 @@ impl MononokeRepoActor {
&self,
changeset: String,
path: String,
) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
debug!(
self.logger,
"Retrieving file content of {} at changeset {}.", path, changeset
);
let mpath = FS::get_mpath(path.clone())?;
let changesetid = FS::get_changeset_id(changeset)?;
let mpath = try_boxfuture!(FS::get_mpath(path.clone()));
let changesetid = try_boxfuture!(FS::get_changeset_id(changeset));
let repo = self.repo.clone();
Ok(api::get_content_by_path(repo, changesetid, Some(mpath))
api::get_content_by_path(repo, changesetid, Some(mpath))
.and_then(move |content| match content {
Content::File(content)
| Content::Executable(content)
@@ -77,14 +76,14 @@ impl MononokeRepoActor {
_ => Err(ErrorKind::InvalidInput(path.to_string(), None).into()),
})
.from_err()
.boxify())
.boxify()
}
fn is_ancestor(
&self,
proposed_ancestor: String,
proposed_descendent: String,
) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
let genbfs = GenerationNumberBFS::new();
let src_hash_maybe = FS::get_nodehash(&proposed_descendent);
let dst_hash_maybe = FS::get_nodehash(&proposed_ancestor);
@@ -103,28 +102,27 @@ impl MononokeRepoActor {
}
});
let (tx, rx) = oneshot::channel::<Result<bool>>();
let (tx, rx) = oneshot::channel::<Result<bool, ErrorKind>>();
self.executor.spawn(
src_hash_future
.and_then(|src| dst_hash_future.map(move |dst| (src, dst)))
.and_then({
cloned!(self.repo);
move |(src, dst)| genbfs.query_reachability(repo, src, dst)
move |(src, dst)| genbfs.query_reachability(repo, src, dst).from_err()
})
.then(|r| tx.send(r).map_err(|_| ())),
);
Ok(rx.flatten()
rx.flatten()
.map(|answer| MononokeRepoResponse::IsAncestor { answer })
.from_err()
.boxify())
.boxify()
}
fn get_blob_content(&self, hash: String) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
let blobhash = FS::get_nodehash(&hash)?;
fn get_blob_content(&self, hash: String) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
let blobhash = try_boxfuture!(FS::get_nodehash(&hash));
Ok(self.repo
self.repo
.get_file_content(&blobhash)
.and_then(move |content| match content {
FileContents::Bytes(content) => {
@@ -132,23 +130,23 @@ impl MononokeRepoActor {
}
})
.from_err()
.boxify())
.boxify()
}
fn list_directory(
&self,
changeset: String,
path: String,
) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
let mpath = if path.is_empty() {
None
} else {
Some(FS::get_mpath(path.clone())?)
Some(try_boxfuture!(FS::get_mpath(path.clone())))
};
let changesetid = FS::get_changeset_id(changeset)?;
let changesetid = try_boxfuture!(FS::get_changeset_id(changeset));
let repo = self.repo.clone();
Ok(api::get_content_by_path(repo, changesetid, mpath)
api::get_content_by_path(repo, changesetid, mpath)
.and_then(move |content| match content {
Content::Tree(tree) => Ok(tree),
_ => Err(ErrorKind::InvalidInput(path.to_string(), None).into()),
@@ -161,13 +159,13 @@ impl MononokeRepoActor {
files: Box::new(files),
})
.from_err()
.boxify())
.boxify()
}
fn get_tree(&self, hash: String) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
let treehash = FS::get_nodehash(&hash)?;
fn get_tree(&self, hash: String) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
let treehash = try_boxfuture!(FS::get_nodehash(&hash));
Ok(self.repo
self.repo
.get_manifest_by_nodeid(&treehash)
.map(|tree| {
tree.list()
@@ -177,29 +175,21 @@ impl MononokeRepoActor {
files: Box::new(files),
})
.from_err()
.boxify())
.boxify()
}
fn get_changeset(&self, hash: String) -> Result<BoxFuture<MononokeRepoResponse, Error>> {
let changesetid = FS::get_changeset_id(hash)?;
fn get_changeset(&self, hash: String) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
let changesetid = try_boxfuture!(FS::get_changeset_id(hash));
Ok(self.repo
self.repo
.get_changeset_by_changesetid(&changesetid)
.and_then(|changeset| changeset.try_into().map_err(From::from))
.map(|changeset| MononokeRepoResponse::GetChangeset { changeset })
.from_err()
.boxify())
.boxify()
}
}
impl Actor for MononokeRepoActor {
type Context = Context<Self>;
}
impl Handler<MononokeRepoQuery> for MononokeRepoActor {
type Result = Result<BoxFuture<MononokeRepoResponse, Error>>;
fn handle(&mut self, msg: MononokeRepoQuery, _ctx: &mut Context<Self>) -> Self::Result {
pub fn send_query(&self, msg: MononokeRepoQuery) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
use MononokeRepoQuery::*;
match msg {
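
The recurring change in this file: methods that used ? and returned Result<BoxFuture<_, Error>> now return BoxFuture<_, ErrorKind>, with try_boxfuture! turning validation failures into an already-failed future. A sketch of how such a macro plausibly expands (an assumption; the real macro lives in futures_ext and may differ):

use futures::future::{self, Future};

type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;

macro_rules! try_boxfuture {
    ($e:expr) => {
        match $e {
            Ok(v) => v,
            // Early-return a failed future rather than an Err wrapping a future.
            Err(e) => return Box::new(future::err(e.into())),
        }
    };
}

#[derive(Debug, PartialEq)]
struct InvalidInput;

fn parse_id(s: &str) -> Result<u32, InvalidInput> {
    s.parse().map_err(|_| InvalidInput)
}

// The signature stays a plain BoxFuture instead of Result<BoxFuture, _>,
// which is what lets send_query compose queries without extra unwrapping.
fn get_thing(s: &str) -> BoxFuture<u32, InvalidInput> {
    let id = try_boxfuture!(parse_id(s));
    Box::new(future::ok(id))
}

fn main() {
    assert_eq!(get_thing("7").wait(), Ok(7));
    assert_eq!(get_thing("x").wait(), Err(InvalidInput));
}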


@@ -11,10 +11,10 @@ use actix_web::HttpResponse;
use actix_web::error::ResponseError;
use actix_web::http::StatusCode;
use failure::{Context, Error, Fail};
use apiserver_thrift::types::{MononokeAPIException, MononokeAPIExceptionKind};
use futures::Canceled;
use api::errors::ErrorKind as ApiError;
use apiserver_thrift::types::{MononokeAPIException, MononokeAPIExceptionKind};
use blobrepo::ErrorKind as BlobRepoError;
use reachabilityindex::errors::ErrorKind as ReachabilityIndexError;
@@ -105,6 +105,13 @@ impl From<Error> for ErrorKind {
}
}
impl From<Canceled> for ErrorKind {
fn from(e: Canceled) -> ErrorKind {
let error = Error::from_boxed_compat(Box::new(e));
ErrorKind::InternalError(error)
}
}
impl From<MailboxError> for ErrorKind {
fn from(e: MailboxError) -> ErrorKind {
ErrorKind::InternalError(e.into())
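
The new From<Canceled> impl is needed because is_ancestor now spawns work on the executor and reads the answer back over a oneshot channel: if the sender is dropped without sending, the receiver fails with futures::Canceled, which rx.flatten() must convert into ErrorKind. A tiny demonstration of that failure mode (toy code, futures 0.1):

use futures::sync::oneshot;
use futures::Future;

fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    drop(tx); // the spawned task died or never sent a value
    // The receiver resolves to Err(Canceled); the impl above maps that
    // into ErrorKind::InternalError.
    assert!(rx.wait().is_err());
}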


@@ -10,7 +10,6 @@ use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Arc;
use failure::{Error, Result};
use futures::{Future, IntoFuture};
use futures_ext::{BoxFuture, FutureExt};
@@ -22,23 +21,21 @@ use mononoke_types::MPath;
use errors::ErrorKind;
pub fn get_mpath(path: String) -> Result<MPath> {
MPath::try_from(&*path).map_err(|e| ErrorKind::InvalidInput(path, Some(e)).into())
pub fn get_mpath(path: String) -> Result<MPath, ErrorKind> {
MPath::try_from(&*path).map_err(|e| ErrorKind::InvalidInput(path, Some(e)))
}
pub fn get_changeset_id(changesetid: String) -> Result<HgChangesetId> {
HgChangesetId::from_str(&changesetid)
.map_err(|e| ErrorKind::InvalidInput(changesetid, Some(e)).into())
pub fn get_changeset_id(changesetid: String) -> Result<HgChangesetId, ErrorKind> {
HgChangesetId::from_str(&changesetid).map_err(|e| ErrorKind::InvalidInput(changesetid, Some(e)))
}
pub fn get_bookmark(bookmark: String) -> Result<Bookmark> {
pub fn get_bookmark(bookmark: String) -> Result<Bookmark, ErrorKind> {
Bookmark::new(bookmark.clone())
.map_err(|e| ErrorKind::InvalidInput(bookmark.to_string(), Some(e)).into())
.map_err(|e| ErrorKind::InvalidInput(bookmark.to_string(), Some(e)))
}
pub fn get_nodehash(hash: &str) -> Result<HgNodeHash> {
HgNodeHash::from_str(hash)
.map_err(|e| ErrorKind::InvalidInput(hash.to_string(), Some(e)).into())
pub fn get_nodehash(hash: &str) -> Result<HgNodeHash, ErrorKind> {
HgNodeHash::from_str(hash).map_err(|e| ErrorKind::InvalidInput(hash.to_string(), Some(e)))
}
// interpret a string as a bookmark and find the corresponding changeset id.
@@ -47,10 +44,10 @@ pub fn get_nodehash(hash: &str) -> Result<HgNodeHash> {
pub fn string_to_bookmark_changeset_id(
node_string: String,
repo: Arc<BlobRepo>,
) -> BoxFuture<HgChangesetId, Error> {
) -> BoxFuture<HgChangesetId, ErrorKind> {
get_bookmark(node_string.clone())
.into_future()
.and_then({ move |bookmark| api::get_changeset_by_bookmark(repo, bookmark).from_err() })
.map_err(move |e| ErrorKind::InvalidInput(node_string.to_string(), Some(e)).into())
.map_err(move |e| ErrorKind::InvalidInput(node_string.to_string(), Some(e.into())))
.boxify()
}


@@ -23,6 +23,7 @@ extern crate cmdlib;
extern crate failure_ext as failure;
extern crate fb303;
extern crate futures;
#[macro_use]
extern crate futures_ext;
extern crate mercurial_types;
extern crate metaconfig;
@@ -59,8 +60,8 @@ mod thrift;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use actix::{Actor, Addr};
use actix_web::{http, server, App, HttpRequest, HttpResponse, State};
use blobrepo::BlobRepo;
use bookmarks::Bookmark;
@@ -79,7 +80,7 @@ use mercurial_types::nodehash::HgChangesetId;
use metaconfig::RepoConfigs;
use scuba_ext::ScubaSampleBuilder;
use actor::{unwrap_request, MononokeActor, MononokeQuery, MononokeRepoQuery, MononokeRepoResponse};
use actor::{Mononoke, MononokeQuery, MononokeRepoQuery, MononokeRepoResponse};
use errors::ErrorKind;
mod config {
@@ -114,13 +115,13 @@ struct HashQueryInfo {
fn get_raw_file(
(state, info): (State<HttpServerState>, actix_web::Path<QueryInfo>),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::GetRawFile {
changeset: info.changeset.clone(),
path: info.path.clone(),
},
}))
})
}
fn is_ancestor(
@@ -132,58 +133,58 @@ fn is_ancestor(
let proposed_descendent_parsed = percent_decode(info.proposed_descendent.as_bytes())
.decode_utf8_lossy()
.to_string();
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::IsAncestor {
proposed_ancestor: proposed_ancestor_parsed,
proposed_descendent: proposed_descendent_parsed,
},
}))
})
}
fn list_directory(
(state, info): (State<HttpServerState>, actix_web::Path<QueryInfo>),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::ListDirectory {
changeset: info.changeset.clone(),
path: info.path.clone(),
},
}))
})
}
fn get_blob_content(
(state, info): (State<HttpServerState>, actix_web::Path<HashQueryInfo>),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::GetBlobContent {
hash: info.hash.clone(),
},
}))
})
}
fn get_tree(
(state, info): (State<HttpServerState>, actix_web::Path<HashQueryInfo>),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::GetTree {
hash: info.hash.clone(),
},
}))
})
}
fn get_changeset(
(state, info): (State<HttpServerState>, actix_web::Path<HashQueryInfo>),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
unwrap_request(state.mononoke.send(MononokeQuery {
state.mononoke.send_query(MononokeQuery {
repo: info.repo.clone(),
kind: MononokeRepoQuery::GetChangeset {
hash: info.hash.clone(),
},
}))
})
}
fn setup_logger(debug: bool) -> Logger {
@@ -238,7 +239,7 @@ fn create_config<P: AsRef<Path>>(
#[derive(Clone)]
struct HttpServerState {
mononoke: Addr<MononokeActor>,
mononoke: Arc<Mononoke>,
logger: Logger,
}
@@ -382,17 +383,15 @@ fn main() -> Result<()> {
let use_ssl = ssl_acceptor.is_some();
let sys = actix::System::new("mononoke-apiserver");
let executor = runtime.executor();
let addr = MononokeActor::create(move |_| {
MononokeActor::new(mononoke_logger.clone(), repo_configs, executor)
});
let mononoke = Mononoke::new(mononoke_logger.clone(), repo_configs, executor);
let mononoke = Arc::new(mononoke);
if let Ok(port) = thrift_port {
thrift::make_thrift(thrift_logger, host.to_string(), port, addr.clone());
thrift::make_thrift(thrift_logger, host.to_string(), port, mononoke.clone());
}
let state = HttpServerState {
mononoke: addr,
mononoke,
logger: actix_logger.clone(),
};
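
With the mailbox gone, sharing happens by reference counting: an Arc<Mononoke> replaces the cloneable Addr handle, and both the HTTP state and the thrift server point at the same instance. A minimal illustration of that ownership shape (hypothetical types, std only):

use std::sync::Arc;
use std::thread;

struct Mononoke;

impl Mononoke {
    // Takes &self, so any number of Arc holders may call it concurrently.
    fn send_query(&self, q: &str) -> String {
        format!("handled {}", q)
    }
}

fn main() {
    let mononoke = Arc::new(Mononoke);
    let for_thrift = Arc::clone(&mononoke); // cheap pointer copy, like Addr::clone
    let handle = thread::spawn(move || for_thrift.send_query("thrift"));
    println!("{}", mononoke.send_query("http"));
    handle.join().unwrap();
}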


@@ -8,7 +8,9 @@ mod dispatcher;
mod fb303;
mod mononoke;
use actix::{Addr, Arbiter};
use std::sync::Arc;
use actix::Arbiter;
use slog::Logger;
use apiserver_thrift::server::make_MononokeAPIService_server;
@@ -18,9 +20,9 @@ use srserver::ThriftServerBuilder;
use self::dispatcher::ThriftDispatcher;
use self::fb303::FacebookServiceImpl;
use self::mononoke::MononokeAPIServiceImpl;
use super::actor::MononokeActor;
use super::actor::Mononoke;
pub fn make_thrift(logger: Logger, host: String, port: i32, addr: Addr<MononokeActor>) {
pub fn make_thrift(logger: Logger, host: String, port: i32, addr: Arc<Mononoke>) {
let dispatcher = ThriftDispatcher(Arbiter::new("thrift-worker"));
dispatcher.start({


@@ -5,8 +5,8 @@
// GNU General Public License version 2 or any later version.
use std::convert::TryInto;
use std::sync::Arc;
use actix::Addr;
use futures::{Future, IntoFuture};
use futures_ext::{BoxFuture, FutureExt};
use slog::Logger;
@@ -15,16 +15,16 @@ use apiserver_thrift::server::MononokeApiservice;
use apiserver_thrift::services::mononoke_apiservice::GetRawExn;
use apiserver_thrift::types::MononokeGetRawParams;
use super::super::actor::{unwrap_request, MononokeActor, MononokeRepoResponse};
use super::super::actor::{Mononoke, MononokeRepoResponse};
#[derive(Clone)]
pub struct MononokeAPIServiceImpl {
addr: Addr<MononokeActor>,
addr: Arc<Mononoke>,
logger: Logger,
}
impl MononokeAPIServiceImpl {
pub fn new(addr: Addr<MononokeActor>, logger: Logger) -> Self {
pub fn new(addr: Arc<Mononoke>, logger: Logger) -> Self {
Self { addr, logger }
}
}
@@ -37,7 +37,7 @@ impl MononokeApiservice for MononokeAPIServiceImpl {
.from_err()
.and_then({
cloned!(self.addr);
move |param| unwrap_request(addr.send(param))
move |param| addr.send_query(param)
})
.and_then(|resp: MononokeRepoResponse| match resp {
MononokeRepoResponse::GetRawFile { content } => Ok(content.to_vec()),