hgproto: introduce stream based handling of input and output of hg protocol

Summary: This is the first step that establishes a stream based API, later diffs will propagate this idea downstream, so that we fully support streaming in f.e. unbundle bundle2

Reviewed By: jsgf

Differential Revision: D6239508

fbshipit-source-id: 43afba4f640586b728cb4cb85d14226a677ee58b
This commit is contained in:
Lukas Piatkowski 2017-11-08 12:21:28 -08:00 committed by Facebook Github Bot
parent ab834dd388
commit e55e4a3a2d
5 changed files with 136 additions and 61 deletions

View File

@ -15,7 +15,6 @@ use slog::Logger;
use bytes::{Bytes, BytesMut};
use futures::future::{self, Future};
use futures::stream::Stream;
use tokio_service::Service;
use futures_ext::{futures_ordered, BoxFuture, BoxStream, FutureExt};
use mercurial_types::NodeHash;
@ -24,36 +23,25 @@ use {BranchRes, GetbundleArgs, Request, Response};
use errors::*;
use sshproto;
pub struct HgService<H> {
pub struct HgCommandHandler<H> {
commands: H,
logger: Option<Logger>,
logger: Logger,
}
impl<H: HgCommands> HgService<H> {
pub fn new(hgcmds: H) -> Self {
HgService {
commands: hgcmds,
logger: None,
}
impl<H: HgCommands> HgCommandHandler<H> {
pub fn new(commands: H, logger: Logger) -> Self {
HgCommandHandler { commands, logger }
}
pub fn new_with_logger(hgcmds: H, logger: &Logger) -> Self {
HgService {
commands: hgcmds,
logger: Some(logger.new(o!())),
}
}
pub fn command(&self, req: Request) -> BoxFuture<Response, Error>
pub fn handle(&self, req: Request) -> BoxFuture<Response, Error>
where
H: HgCommands,
{
if let Some(ref logger) = self.logger {
match &req {
&Request::Batch { .. } => (),
req => debug!(logger, "Got request: {:?}", req),
}
match &req {
&Request::Batch { .. } => (),
req => debug!(self.logger, "Got request: {:?}", req),
}
let hgcmds = &self.commands;
match req {
@ -175,9 +163,7 @@ impl<H: HgCommands> HgService<H> {
}
Ok(Some(cmd)) => cmd,
};
if let Some(ref logger) = self.logger {
info!(logger, "batch command: {:?}", parsed);
}
info!(self.logger, "batch command: {:?}", parsed);
parsed_cmds.push(parsed);
}
@ -186,7 +172,7 @@ impl<H: HgCommands> HgService<H> {
// too long.
let response_futures: Vec<_> = parsed_cmds
.into_iter()
.map(|cmd| self.command(cmd))
.map(|cmd| self.handle(cmd))
.collect();
let encoded_futures = response_futures
@ -231,20 +217,6 @@ fn get_or_none<'a>(map: &'a HashMap<Vec<u8>, Vec<u8>>, key: &'a [u8]) -> &'a [u8
}
}
impl<H> Service for HgService<H>
where
H: HgCommands,
{
type Request = Request;
type Response = Response;
type Error = Error;
type Future = BoxFuture<Response, Self::Error>;
fn call(&self, req: Request) -> Self::Future {
self.command(req)
}
}
#[inline]
fn unimplemented<S, T>(op: S) -> HgCommandRes<T>
where
@ -360,6 +332,8 @@ mod test {
use super::*;
use futures::future;
use slog::Discard;
struct Dummy;
impl HgCommands for Dummy {
fn hello(&self) -> HgCommandRes<HashMap<String, Vec<String>>> {
@ -372,9 +346,10 @@ mod test {
#[test]
fn hello() {
let service = HgService::new(Dummy);
let logger = Logger::root(Discard, o!());
let handler = HgCommandHandler::new(Dummy, logger);
let r = service.call(Request::Hello).wait();
let r = handler.handle(Request::Hello).wait();
println!("hello r = {:?}", r);
let mut res: HashMap<String, Vec<String>> = HashMap::new();
res.insert("capabilities".into(), vec!["something".into()]);
@ -387,9 +362,10 @@ mod test {
#[test]
fn unimpl() {
let service = HgService::new(Dummy);
let logger = Logger::root(Discard, o!());
let handler = HgCommandHandler::new(Dummy, logger);
let r = service.call(Request::Heads).wait();
let r = handler.handle(Request::Heads).wait();
println!("heads r = {:?}", r);
match r {

100
hgproto/src/handler.rs Normal file
View File

@ -0,0 +1,100 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::io;
use std::sync::Arc;
use futures::{Poll, Stream};
use futures_ext::{BoxStream, StreamExt, StreamLayeredExt};
use tokio_io::codec::{Decoder, Encoder};
use bytes::Bytes;
use slog::{self, Logger};
use {HgCommands, Request, Response};
use commands::HgCommandHandler;
use errors::*;
type BytesStream = BoxStream<Bytes, io::Error>;
pub struct HgProtoHandler {
outstream: BoxStream<Bytes, Error>,
}
struct HgProtoHandlerInner<H, Dec, Enc> {
commands_handler: HgCommandHandler<H>,
reqdec: Dec,
respenc: Enc,
_logger: Logger,
}
impl HgProtoHandler {
pub fn new<'a, H, Dec, Enc, L: Into<Option<&'a Logger>>>(
instream: BytesStream,
commands: H,
reqdec: Dec,
respenc: Enc,
logger: L,
) -> Self
where
H: HgCommands + Send + Sync + 'static,
Dec: Decoder<Item = Request> + Clone + Send + Sync + 'static,
Dec::Error: From<io::Error> + Send + 'static,
Enc: Encoder<Item = Response> + Clone + Send + Sync + 'static,
Enc::Error: From<Error> + Send + 'static,
Error: From<Dec::Error> + From<Enc::Error>,
{
let logger = match logger.into() {
None => Logger::root(slog::Discard, o!()),
Some(logger) => logger.new(o!()),
};
let inner = Arc::new(HgProtoHandlerInner {
commands_handler: HgCommandHandler::new(commands, logger.new(o!())),
reqdec,
respenc,
_logger: logger,
});
HgProtoHandler {
outstream: handle(instream, inner),
}
}
}
impl Stream for HgProtoHandler {
type Item = Bytes;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.outstream.poll()
}
}
fn handle<H, Dec, Enc>(
instream: BytesStream,
handler: Arc<HgProtoHandlerInner<H, Dec, Enc>>,
) -> BoxStream<Bytes, Error>
where
H: HgCommands + Send + Sync + 'static,
Dec: Decoder<Item = Request> + Clone + Send + Sync + 'static,
Dec::Error: From<io::Error> + Send + 'static,
Enc: Encoder<Item = Response> + Clone + Send + Sync + 'static,
Enc::Error: From<Error> + Send + 'static,
Error: From<Dec::Error> + From<Enc::Error>,
{
instream
.decode(handler.reqdec.clone())
.from_err()
.and_then({
let handler = handler.clone();
move |req| handler.commands_handler.handle(req)
})
.encode(handler.respenc.clone())
.from_err()
.boxify()
}

View File

@ -17,7 +17,6 @@ extern crate bytes;
extern crate futures;
extern crate tokio_io;
extern crate tokio_proto;
extern crate tokio_service;
#[macro_use]
extern crate slog;
@ -51,7 +50,8 @@ use mercurial_types::NodeHash;
mod batch;
mod errors;
mod service;
mod handler;
mod commands;
pub mod sshproto;
// result from `branches()`
@ -161,5 +161,6 @@ impl Response {
}
}
pub use commands::{HgCommandRes, HgCommands};
pub use errors::{Error, ErrorKind, Result, ResultExt};
pub use service::{HgCommandRes, HgCommands, HgService};
pub use handler::HgProtoHandler;

View File

@ -41,8 +41,9 @@ use errors::*;
pub mod request;
pub mod response;
#[derive(Clone)]
pub struct HgSshCommandEncode;
#[derive(Clone)]
pub struct HgSshCommandDecode;
impl Encoder for HgSshCommandEncode {

View File

@ -57,7 +57,6 @@ use std::thread::{self, JoinHandle};
use futures::{Future, Sink, Stream};
use futures::sink::Wait;
use futures::sync::mpsc;
use futures_ext::{encode, StreamLayeredExt};
use clap::{App, ArgGroup, ArgMatches};
@ -67,8 +66,7 @@ use slog_kvfilter::KVFilter;
use slog_logview::LogViewDrain;
use bytes::Bytes;
use hgproto::HgService;
use hgproto::sshproto::{HgSshCommandDecode, HgSshCommandEncode};
use hgproto::{sshproto, HgProtoHandler};
use metaconfig::RepoConfigs;
use metaconfig::repoconfig::RepoType;
@ -301,18 +299,17 @@ where
let drain = slog::Duplicate::new(drain, listen_log.clone()).fuse();
let conn_log = Logger::root(drain, o![]);
// Construct a repo
let client = repo::RepoClient::new(repo.clone(), &conn_log);
let service = Arc::new(HgService::new_with_logger(client, &conn_log));
// Map stdin into mercurial requests
let reqs = stdin.decode(HgSshCommandDecode);
// process requests
let resps = reqs.and_then(move |req| service.clone().command(req));
// Construct a hg protocol handler
let proto_handler = HgProtoHandler::new(
stdin,
repo::RepoClient::new(repo.clone(), &conn_log),
sshproto::HgSshCommandDecode,
sshproto::HgSshCommandEncode,
&conn_log,
);
// send responses back
let endres = encode::encode(resps, HgSshCommandEncode)
let endres = proto_handler
.map_err(Error::from)
.forward(stdout)
.map(|_| ());