diff --git a/hgproto/src/service.rs b/hgproto/src/commands.rs similarity index 89% rename from hgproto/src/service.rs rename to hgproto/src/commands.rs index 79fed62bc3..7207f02bb1 100644 --- a/hgproto/src/service.rs +++ b/hgproto/src/commands.rs @@ -15,7 +15,6 @@ use slog::Logger; use bytes::{Bytes, BytesMut}; use futures::future::{self, Future}; use futures::stream::Stream; -use tokio_service::Service; use futures_ext::{futures_ordered, BoxFuture, BoxStream, FutureExt}; use mercurial_types::NodeHash; @@ -24,36 +23,25 @@ use {BranchRes, GetbundleArgs, Request, Response}; use errors::*; use sshproto; -pub struct HgService { +pub struct HgCommandHandler { commands: H, - logger: Option, + logger: Logger, } -impl HgService { - pub fn new(hgcmds: H) -> Self { - HgService { - commands: hgcmds, - logger: None, - } +impl HgCommandHandler { + pub fn new(commands: H, logger: Logger) -> Self { + HgCommandHandler { commands, logger } } - pub fn new_with_logger(hgcmds: H, logger: &Logger) -> Self { - HgService { - commands: hgcmds, - logger: Some(logger.new(o!())), - } - } - - pub fn command(&self, req: Request) -> BoxFuture + pub fn handle(&self, req: Request) -> BoxFuture where H: HgCommands, { - if let Some(ref logger) = self.logger { - match &req { - &Request::Batch { .. } => (), - req => debug!(logger, "Got request: {:?}", req), - } + match &req { + &Request::Batch { .. } => (), + req => debug!(self.logger, "Got request: {:?}", req), } + let hgcmds = &self.commands; match req { @@ -175,9 +163,7 @@ impl HgService { } Ok(Some(cmd)) => cmd, }; - if let Some(ref logger) = self.logger { - info!(logger, "batch command: {:?}", parsed); - } + info!(self.logger, "batch command: {:?}", parsed); parsed_cmds.push(parsed); } @@ -186,7 +172,7 @@ impl HgService { // too long. let response_futures: Vec<_> = parsed_cmds .into_iter() - .map(|cmd| self.command(cmd)) + .map(|cmd| self.handle(cmd)) .collect(); let encoded_futures = response_futures @@ -231,20 +217,6 @@ fn get_or_none<'a>(map: &'a HashMap, Vec>, key: &'a [u8]) -> &'a [u8 } } -impl Service for HgService -where - H: HgCommands, -{ - type Request = Request; - type Response = Response; - type Error = Error; - type Future = BoxFuture; - - fn call(&self, req: Request) -> Self::Future { - self.command(req) - } -} - #[inline] fn unimplemented(op: S) -> HgCommandRes where @@ -360,6 +332,8 @@ mod test { use super::*; use futures::future; + use slog::Discard; + struct Dummy; impl HgCommands for Dummy { fn hello(&self) -> HgCommandRes>> { @@ -372,9 +346,10 @@ mod test { #[test] fn hello() { - let service = HgService::new(Dummy); + let logger = Logger::root(Discard, o!()); + let handler = HgCommandHandler::new(Dummy, logger); - let r = service.call(Request::Hello).wait(); + let r = handler.handle(Request::Hello).wait(); println!("hello r = {:?}", r); let mut res: HashMap> = HashMap::new(); res.insert("capabilities".into(), vec!["something".into()]); @@ -387,9 +362,10 @@ mod test { #[test] fn unimpl() { - let service = HgService::new(Dummy); + let logger = Logger::root(Discard, o!()); + let handler = HgCommandHandler::new(Dummy, logger); - let r = service.call(Request::Heads).wait(); + let r = handler.handle(Request::Heads).wait(); println!("heads r = {:?}", r); match r { diff --git a/hgproto/src/handler.rs b/hgproto/src/handler.rs new file mode 100644 index 0000000000..654e32a73e --- /dev/null +++ b/hgproto/src/handler.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2004-present, Facebook, Inc. +// All Rights Reserved. +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +use std::io; +use std::sync::Arc; + +use futures::{Poll, Stream}; +use futures_ext::{BoxStream, StreamExt, StreamLayeredExt}; +use tokio_io::codec::{Decoder, Encoder}; + +use bytes::Bytes; +use slog::{self, Logger}; + +use {HgCommands, Request, Response}; +use commands::HgCommandHandler; + +use errors::*; + +type BytesStream = BoxStream; + +pub struct HgProtoHandler { + outstream: BoxStream, +} + +struct HgProtoHandlerInner { + commands_handler: HgCommandHandler, + reqdec: Dec, + respenc: Enc, + _logger: Logger, +} + +impl HgProtoHandler { + pub fn new<'a, H, Dec, Enc, L: Into>>( + instream: BytesStream, + commands: H, + reqdec: Dec, + respenc: Enc, + logger: L, + ) -> Self + where + H: HgCommands + Send + Sync + 'static, + Dec: Decoder + Clone + Send + Sync + 'static, + Dec::Error: From + Send + 'static, + Enc: Encoder + Clone + Send + Sync + 'static, + Enc::Error: From + Send + 'static, + Error: From + From, + { + let logger = match logger.into() { + None => Logger::root(slog::Discard, o!()), + Some(logger) => logger.new(o!()), + }; + + let inner = Arc::new(HgProtoHandlerInner { + commands_handler: HgCommandHandler::new(commands, logger.new(o!())), + reqdec, + respenc, + _logger: logger, + }); + + HgProtoHandler { + outstream: handle(instream, inner), + } + } +} + +impl Stream for HgProtoHandler { + type Item = Bytes; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.outstream.poll() + } +} + +fn handle( + instream: BytesStream, + handler: Arc>, +) -> BoxStream +where + H: HgCommands + Send + Sync + 'static, + Dec: Decoder + Clone + Send + Sync + 'static, + Dec::Error: From + Send + 'static, + Enc: Encoder + Clone + Send + Sync + 'static, + Enc::Error: From + Send + 'static, + Error: From + From, +{ + instream + .decode(handler.reqdec.clone()) + .from_err() + .and_then({ + let handler = handler.clone(); + move |req| handler.commands_handler.handle(req) + }) + .encode(handler.respenc.clone()) + .from_err() + .boxify() +} diff --git a/hgproto/src/lib.rs b/hgproto/src/lib.rs index 026575361b..50a4f94980 100644 --- a/hgproto/src/lib.rs +++ b/hgproto/src/lib.rs @@ -17,7 +17,6 @@ extern crate bytes; extern crate futures; extern crate tokio_io; extern crate tokio_proto; -extern crate tokio_service; #[macro_use] extern crate slog; @@ -51,7 +50,8 @@ use mercurial_types::NodeHash; mod batch; mod errors; -mod service; +mod handler; +mod commands; pub mod sshproto; // result from `branches()` @@ -161,5 +161,6 @@ impl Response { } } +pub use commands::{HgCommandRes, HgCommands}; pub use errors::{Error, ErrorKind, Result, ResultExt}; -pub use service::{HgCommandRes, HgCommands, HgService}; +pub use handler::HgProtoHandler; diff --git a/hgproto/src/sshproto/mod.rs b/hgproto/src/sshproto/mod.rs index 6c22669d54..c3c8274bd7 100644 --- a/hgproto/src/sshproto/mod.rs +++ b/hgproto/src/sshproto/mod.rs @@ -41,8 +41,9 @@ use errors::*; pub mod request; pub mod response; - +#[derive(Clone)] pub struct HgSshCommandEncode; +#[derive(Clone)] pub struct HgSshCommandDecode; impl Encoder for HgSshCommandEncode { diff --git a/server/src/main.rs b/server/src/main.rs index 59598a2062..acaa957f0f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -57,7 +57,6 @@ use std::thread::{self, JoinHandle}; use futures::{Future, Sink, Stream}; use futures::sink::Wait; use futures::sync::mpsc; -use futures_ext::{encode, StreamLayeredExt}; use clap::{App, ArgGroup, ArgMatches}; @@ -67,8 +66,7 @@ use slog_kvfilter::KVFilter; use slog_logview::LogViewDrain; use bytes::Bytes; -use hgproto::HgService; -use hgproto::sshproto::{HgSshCommandDecode, HgSshCommandEncode}; +use hgproto::{sshproto, HgProtoHandler}; use metaconfig::RepoConfigs; use metaconfig::repoconfig::RepoType; @@ -301,18 +299,17 @@ where let drain = slog::Duplicate::new(drain, listen_log.clone()).fuse(); let conn_log = Logger::root(drain, o![]); - // Construct a repo - let client = repo::RepoClient::new(repo.clone(), &conn_log); - let service = Arc::new(HgService::new_with_logger(client, &conn_log)); - - // Map stdin into mercurial requests - let reqs = stdin.decode(HgSshCommandDecode); - - // process requests - let resps = reqs.and_then(move |req| service.clone().command(req)); + // Construct a hg protocol handler + let proto_handler = HgProtoHandler::new( + stdin, + repo::RepoClient::new(repo.clone(), &conn_log), + sshproto::HgSshCommandDecode, + sshproto::HgSshCommandEncode, + &conn_log, + ); // send responses back - let endres = encode::encode(resps, HgSshCommandEncode) + let endres = proto_handler .map_err(Error::from) .forward(stdout) .map(|_| ());