mononoke: hgproto: capture unbundle stream as Bytes

Summary:
Parse the unbundle command as an `unbundle` command with its `heads`
parameter, followed by a bundle2 stream which is parsed in order to capture it
as bytes, as a simulation of an actual stream.

Reviewed By: sid0

Differential Revision: D6209818

fbshipit-source-id: 9bc454319350e2047160347964740f3a3d2f592f
This commit is contained in:
Jeremy Fitzhardinge 2017-11-01 18:49:27 -07:00 committed by Facebook Github Bot
parent 6b5a13c1f6
commit 04276f43b4
5 changed files with 130 additions and 38 deletions

BIN
hgproto/fixtures/min.bundle Normal file

Binary file not shown.

View File

@ -94,9 +94,7 @@ pub enum Request {
new: NodeHash,
},
Streamout,
Unbundle {
heads: Vec<String>, /* stream: Stream<Vec<u8>, Error> TBD: Stream */
},
Unbundle { heads: Vec<String>, stream: Bytes },
}
/// The arguments that `getbundle` accepts, in a separate struct for

View File

@ -145,8 +145,8 @@ impl<H: HgCommands> HgService<H> {
.map(|_| Response::Streamout)
.map_err(self::Error::into)
.boxify(),
Request::Unbundle { heads } => hgcmds
.unbundle(heads)
Request::Unbundle { heads, stream } => hgcmds
.unbundle(heads, stream)
.map(|_| Response::Unbundle)
.map_err(self::Error::into)
.boxify(), //_ => unimplemented!()
@ -350,10 +350,7 @@ pub trait HgCommands {
}
// @wireprotocommand('unbundle', 'heads')
fn unbundle(
&self,
_heads: Vec<String>, /* , _stream: BoxStream<Vec<u8>, Error> */
) -> HgCommandRes<()> {
fn unbundle(&self, _heads: Vec<String>, _stream: Bytes) -> HgCommandRes<()> {
unimplemented("unbundle")
}
}

View File

@ -5,12 +5,17 @@
// GNU General Public License version 2 or any later version.
use std::collections::HashMap;
use std::io::{self, Cursor, Read};
use std::iter;
use std::str::{self, FromStr};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures::{Async, Stream};
use nom::{is_alphanumeric, is_digit, ErrorKind, FindSubstring, IResult, Needed, Slice};
use slog;
use tokio_io::AsyncRead;
use mercurial_bundles::bundle2::Bundle2Stream;
use mercurial_types::NodeHash;
use {GetbundleArgs, Request};
@ -278,6 +283,49 @@ where
}
}
// This is pretty awful. We need to consume the input until we have an entire stream,
// but only so we can count the underlying bytes. Once we have that we take the raw bytes
// and throw away all the result of parsing.
fn bundle2stream(inp: &[u8]) -> IResult<&[u8], Bytes> {
// Reaching the end of the buffer just means we need more input, not that there is no
// more input. So remap EOF to WouldBlock.
#[derive(Debug)]
struct EofCursor<T>(Cursor<T>);
impl<T: AsRef<[u8]>> Read for EofCursor<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.0.read(buf) {
Ok(0) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Ok(v) => Ok(v),
Err(err) => Err(err),
}
}
}
impl<T: AsRef<[u8]>> AsyncRead for EofCursor<T> {}
let mut cur = EofCursor(Cursor::new(inp));
{
let logger = slog::Logger::root(slog::Discard, o!());
let mut b2 = Bundle2Stream::new(&mut cur, logger);
loop {
match b2.poll() {
Ok(Async::Ready(Some(_item))) => (),
Ok(Async::Ready(None)) => {
break;
}
Ok(Async::NotReady) => {
return IResult::Incomplete(Needed::Unknown);
}
Err(_err) => {
return IResult::Error(ErrorKind::Custom(0xbad));
}
}
}
};
let (stream, rest) = inp.split_at(cur.0.position() as usize);
IResult::Done(rest, Bytes::from(stream))
}
/// Parse a command, given some input, a command name (used as a tag), a param parser
/// function (which generalizes over batched and non-batched parameter syntaxes),
@ -373,6 +421,35 @@ pub fn parse_batch(buf: &mut BytesMut) -> Result<Option<Request>> {
parse_common(buf, batch_params)
}
/// Parse an unbundle command. This is unusual because it's a normal command talking
/// the "heads" parameter, but then it consumes the rest of the stream as a bundle2
/// stream until that finishes, then resumes normal command processing. Ideally this
/// should be implemented in a streaming way, but it isn't yet. We process the unbundle
/// command almost as usual, then follow it by capturing the bundle2 stream as Bytes and
/// return them as a Request::Unbundle object for later processing.
fn unbundle(
inp: &[u8],
parse_params: fn(&[u8], usize)
-> IResult<&[u8], HashMap<Vec<u8>, Vec<u8>>>,
) -> IResult<&[u8], Request> {
// Use this as a syntactic proxy for Request::Unbundle, which works because Request's
// values are struct-like enums, and this is a struct, so the command macro is happy
// to fill it out.
struct UnbundleCmd {
heads: Vec<String>,
}
do_parse!(inp,
unbundle: command!("unbundle", UnbundleCmd, parse_params, {
heads => stringlist,
}) >>
stream: bundle2stream >>
(Request::Unbundle {
heads: unbundle.heads,
stream: stream
})
)
}
/// Common parser, generalized over how to parse parameters (either unbatched or
/// batched syntax.)
#[cfg_attr(rustfmt, rustfmt_skip)]
@ -441,9 +518,7 @@ fn parse_common(
new => nodehash,
})
| command!("streamout", Streamout, parse_params, {})
| command!("unbundle", Unbundle, parse_params, {
heads => stringlist,
})
| call!(unbundle, parse_params)
);
// Turn "rest" into a "consumed" bytecount, so consume it once the
@ -923,7 +998,7 @@ mod test {
#[cfg(test)]
mod test_parse {
use super::*;
use std::fmt::Display;
use std::fmt::Debug;
fn hash_ones() -> NodeHash {
"1111111111111111111111111111111111111111".parse().unwrap()
@ -945,7 +1020,7 @@ mod test_parse {
/// - check all truncated inputs return "Ok(None)"
/// - complete inputs return the expected result, and leave any remainder in
/// the input buffer.
fn test_parse<I: AsRef<[u8]> + Display>(inp: I, exp: Request) {
fn test_parse<I: AsRef<[u8]> + Debug>(inp: I, exp: Request) {
let inbytes = inp.as_ref();
// check for short inputs
@ -954,16 +1029,18 @@ mod test_parse {
match parse(&mut buf) {
Ok(None) => (),
Ok(Some(val)) => panic!(
"BAD PASS: inp >>{}<< len {} passed unexpectedly val {:?}",
inp,
"BAD PASS: inp >>{:?}<< lpassed unexpectedly val {:?} pass with {}/{} bytes",
Bytes::from(inbytes.to_vec()),
val,
l,
val
inbytes.len()
),
Err(err) => panic!(
"BAD FAIL: inp >>{}<< len {} failed {:?} (not incomplete)",
inp,
"BAD FAIL: inp >>{:?}<< failed {:?} (not incomplete) with {}/{} bytes",
Bytes::from(inbytes.to_vec()),
err,
l,
err
inbytes.len()
),
};
}
@ -973,14 +1050,23 @@ mod test_parse {
for l in 0..extra.len() {
let mut buf = BytesMut::from(inbytes.to_vec());
buf.extend_from_slice(&extra[0..l]);
let buflen = buf.len();
match parse(&mut buf) {
Ok(Some(val)) => assert_eq!(val, exp),
Ok(None) => panic!("BAD INCOMPLETE: inp >>{}<< extra {} incomplete", inp, l),
Err(err) => panic!(
"BAD FAIL: inp >>{}<< extra {} failed {:?} (not incomplete)",
inp,
Ok(Some(val)) => assert_eq!(val, exp, "with {}/{} bytes", buflen, inbytes.len()),
Ok(None) => panic!(
"BAD INCOMPLETE: inp >>{:?}<< extra {} incomplete {}/{} bytes",
Bytes::from(inbytes.to_vec()),
l,
err
buflen,
inbytes.len()
),
Err(err) => panic!(
"BAD FAIL: inp >>{:?}<< extra {} failed {:?} (not incomplete) with {}/{} bytes",
Bytes::from(inbytes.to_vec()),
l,
err,
buflen,
inbytes.len()
),
};
assert_eq!(&*buf, &extra[0..l]);
@ -1232,20 +1318,34 @@ mod test_parse {
test_parse(inp, Request::Streamout {});
}
#[test]
fn test_parse_unbundle() {
let inp = "unbundle\n\
heads 10\n\
666f726365"; // "force" in hex-encoding
fn test_parse_unbundle_with(bundle: &[u8]) {
let mut inp = b"unbundle\n\
heads 10\n\
666f726365" // "force" hex encoded
.to_vec();
inp.extend_from_slice(bundle);
test_parse(
inp,
Request::Unbundle {
heads: vec![String::from("666f726365")],
heads: vec![String::from("666f726365")], // "force" in hex-encoding
stream: Bytes::from(bundle),
},
);
}
#[test]
fn test_parse_unbundle_minimal() {
let bundle: &[u8] = &b"HG20\0\0\0\0\0\0\0\0"[..];
test_parse_unbundle_with(bundle);
}
#[test]
fn test_parse_unbundle_small() {
let bundle: &[u8] = &include_bytes!("../../fixtures/min.bundle")[..];
test_parse_unbundle_with(bundle);
}
#[test]
fn test_batch_parse_heads() {
let mut inp = BytesMut::from(b"heads\n".to_vec());

View File

@ -53,8 +53,8 @@ pub trait OpenableRepoType {
impl OpenableRepoType for RepoType {
fn open(&self) -> Result<Box<Repo<Error = hgproto::Error> + Sync + Send>> {
use metaconfig::repoconfig::RepoType::*;
use hgproto::{Error, ErrorKind};
use metaconfig::repoconfig::RepoType::*;
fn repo_chain<E: error::Error + Send + 'static>(err: E) -> Error {
Error::with_chain(err, ErrorKind::Repo)
@ -333,10 +333,7 @@ impl HgCommands for RepoClient {
}
// @wireprotocommand('unbundle', 'heads')
fn unbundle(
&self,
heads: Vec<String>, /* , _stream: BoxStream<Vec<u8>, Error> */
) -> HgCommandRes<()> {
fn unbundle(&self, heads: Vec<String>, _stream: Bytes) -> HgCommandRes<()> {
info!(self.logger, "unbundle heads {:?}", heads);
future::ok(()).boxify()
}