diff --git a/hgproto/fixtures/min.bundle b/hgproto/fixtures/min.bundle new file mode 100644 index 0000000000..d0e3f1b8a1 Binary files /dev/null and b/hgproto/fixtures/min.bundle differ diff --git a/hgproto/src/lib.rs b/hgproto/src/lib.rs index 4164fb41ad..026575361b 100644 --- a/hgproto/src/lib.rs +++ b/hgproto/src/lib.rs @@ -94,9 +94,7 @@ pub enum Request { new: NodeHash, }, Streamout, - Unbundle { - heads: Vec, /* stream: Stream, Error> TBD: Stream */ - }, + Unbundle { heads: Vec, stream: Bytes }, } /// The arguments that `getbundle` accepts, in a separate struct for diff --git a/hgproto/src/service.rs b/hgproto/src/service.rs index 20d94bc7fd..79fed62bc3 100644 --- a/hgproto/src/service.rs +++ b/hgproto/src/service.rs @@ -145,8 +145,8 @@ impl HgService { .map(|_| Response::Streamout) .map_err(self::Error::into) .boxify(), - Request::Unbundle { heads } => hgcmds - .unbundle(heads) + Request::Unbundle { heads, stream } => hgcmds + .unbundle(heads, stream) .map(|_| Response::Unbundle) .map_err(self::Error::into) .boxify(), //_ => unimplemented!() @@ -350,10 +350,7 @@ pub trait HgCommands { } // @wireprotocommand('unbundle', 'heads') - fn unbundle( - &self, - _heads: Vec, /* , _stream: BoxStream, Error> */ - ) -> HgCommandRes<()> { + fn unbundle(&self, _heads: Vec, _stream: Bytes) -> HgCommandRes<()> { unimplemented("unbundle") } } diff --git a/hgproto/src/sshproto/request.rs b/hgproto/src/sshproto/request.rs index 2aa455785b..a7c2a9e329 100644 --- a/hgproto/src/sshproto/request.rs +++ b/hgproto/src/sshproto/request.rs @@ -5,12 +5,17 @@ // GNU General Public License version 2 or any later version. 
use std::collections::HashMap; +use std::io::{self, Cursor, Read}; use std::iter; use std::str::{self, FromStr}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; +use futures::{Async, Stream}; use nom::{is_alphanumeric, is_digit, ErrorKind, FindSubstring, IResult, Needed, Slice}; +use slog; +use tokio_io::AsyncRead; +use mercurial_bundles::bundle2::Bundle2Stream; use mercurial_types::NodeHash; use {GetbundleArgs, Request}; @@ -278,6 +283,49 @@ where } } +// This is pretty awful. We need to consume the input until we have an entire stream, +// but only so we can count the underlying bytes. Once we have that we take the raw bytes +// and throw away all the result of parsing. +fn bundle2stream(inp: &[u8]) -> IResult<&[u8], Bytes> { + // Reaching the end of the buffer just means we need more input, not that there is no + // more input. So remap EOF to WouldBlock. + #[derive(Debug)] + struct EofCursor(Cursor); + impl> Read for EofCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self.0.read(buf) { + Ok(0) => Err(io::Error::from(io::ErrorKind::WouldBlock)), + Ok(v) => Ok(v), + Err(err) => Err(err), + } + } + } + impl> AsyncRead for EofCursor {} + + let mut cur = EofCursor(Cursor::new(inp)); + { + let logger = slog::Logger::root(slog::Discard, o!()); + let mut b2 = Bundle2Stream::new(&mut cur, logger); + + loop { + match b2.poll() { + Ok(Async::Ready(Some(_item))) => (), + Ok(Async::Ready(None)) => { + break; + } + Ok(Async::NotReady) => { + return IResult::Incomplete(Needed::Unknown); + } + Err(_err) => { + return IResult::Error(ErrorKind::Custom(0xbad)); + } + } + } + }; + + let (stream, rest) = inp.split_at(cur.0.position() as usize); + IResult::Done(rest, Bytes::from(stream)) +} /// Parse a command, given some input, a command name (used as a tag), a param parser /// function (which generalizes over batched and non-batched parameter syntaxes), @@ -373,6 +421,35 @@ pub fn parse_batch(buf: &mut BytesMut) -> Result> { parse_common(buf, 
batch_params) } +/// Parse an unbundle command. This is unusual because it's a normal command taking +/// the "heads" parameter, but then it consumes the rest of the stream as a bundle2 +/// stream until that finishes, then resumes normal command processing. Ideally this +/// should be implemented in a streaming way, but it isn't yet. We process the unbundle +/// command almost as usual, then follow it by capturing the bundle2 stream as Bytes and +/// return them as a Request::Unbundle object for later processing. +fn unbundle( + inp: &[u8], + parse_params: fn(&[u8], usize) + -> IResult<&[u8], HashMap, Vec>>, +) -> IResult<&[u8], Request> { + // Use this as a syntactic proxy for Request::Unbundle, which works because Request's + // values are struct-like enums, and this is a struct, so the command macro is happy + // to fill it out. + struct UnbundleCmd { + heads: Vec, + } + do_parse!(inp, + unbundle: command!("unbundle", UnbundleCmd, parse_params, { + heads => stringlist, + }) >> + stream: bundle2stream >> + (Request::Unbundle { + heads: unbundle.heads, + stream: stream + }) + ) +} + /// Common parser, generalized over how to parse parameters (either unbatched or /// batched syntax.) #[cfg_attr(rustfmt, rustfmt_skip)] @@ -441,9 +518,7 @@ fn parse_common( new => nodehash, }) | command!("streamout", Streamout, parse_params, {}) - | command!("unbundle", Unbundle, parse_params, { - heads => stringlist, - }) + | call!(unbundle, parse_params) ); // Turn "rest" into a "consumed" bytecount, so consume it once the @@ -923,7 +998,7 @@ mod test { #[cfg(test)] mod test_parse { use super::*; - use std::fmt::Display; + use std::fmt::Debug; fn hash_ones() -> NodeHash { "1111111111111111111111111111111111111111".parse().unwrap() @@ -945,7 +1020,7 @@ mod test_parse { /// - check all truncated inputs return "Ok(None)" /// - complete inputs return the expected result, and leave any remainder in /// the input buffer. 
- fn test_parse + Display>(inp: I, exp: Request) { + fn test_parse + Debug>(inp: I, exp: Request) { let inbytes = inp.as_ref(); // check for short inputs @@ -954,16 +1029,18 @@ mod test_parse { match parse(&mut buf) { Ok(None) => (), Ok(Some(val)) => panic!( - "BAD PASS: inp >>{}<< len {} passed unexpectedly val {:?}", - inp, + "BAD PASS: inp >>{:?}<< lpassed unexpectedly val {:?} pass with {}/{} bytes", + Bytes::from(inbytes.to_vec()), + val, l, - val + inbytes.len() ), Err(err) => panic!( - "BAD FAIL: inp >>{}<< len {} failed {:?} (not incomplete)", - inp, + "BAD FAIL: inp >>{:?}<< failed {:?} (not incomplete) with {}/{} bytes", + Bytes::from(inbytes.to_vec()), + err, l, - err + inbytes.len() ), }; } @@ -973,14 +1050,23 @@ mod test_parse { for l in 0..extra.len() { let mut buf = BytesMut::from(inbytes.to_vec()); buf.extend_from_slice(&extra[0..l]); + let buflen = buf.len(); match parse(&mut buf) { - Ok(Some(val)) => assert_eq!(val, exp), - Ok(None) => panic!("BAD INCOMPLETE: inp >>{}<< extra {} incomplete", inp, l), - Err(err) => panic!( - "BAD FAIL: inp >>{}<< extra {} failed {:?} (not incomplete)", - inp, + Ok(Some(val)) => assert_eq!(val, exp, "with {}/{} bytes", buflen, inbytes.len()), + Ok(None) => panic!( + "BAD INCOMPLETE: inp >>{:?}<< extra {} incomplete {}/{} bytes", + Bytes::from(inbytes.to_vec()), l, - err + buflen, + inbytes.len() + ), + Err(err) => panic!( + "BAD FAIL: inp >>{:?}<< extra {} failed {:?} (not incomplete) with {}/{} bytes", + Bytes::from(inbytes.to_vec()), + l, + err, + buflen, + inbytes.len() ), }; assert_eq!(&*buf, &extra[0..l]); @@ -1232,20 +1318,34 @@ mod test_parse { test_parse(inp, Request::Streamout {}); } - #[test] - fn test_parse_unbundle() { - let inp = "unbundle\n\ - heads 10\n\ - 666f726365"; // "force" in hex-encoding + fn test_parse_unbundle_with(bundle: &[u8]) { + let mut inp = b"unbundle\n\ + heads 10\n\ + 666f726365" // "force" hex encoded + .to_vec(); + inp.extend_from_slice(bundle); test_parse( inp, Request::Unbundle 
{ - heads: vec![String::from("666f726365")], + heads: vec![String::from("666f726365")], // "force" in hex-encoding + stream: Bytes::from(bundle), }, ); } + #[test] + fn test_parse_unbundle_minimal() { + let bundle: &[u8] = &b"HG20\0\0\0\0\0\0\0\0"[..]; + test_parse_unbundle_with(bundle); + } + + #[test] + fn test_parse_unbundle_small() { + let bundle: &[u8] = &include_bytes!("../../fixtures/min.bundle")[..]; + test_parse_unbundle_with(bundle); + } + #[test] fn test_batch_parse_heads() { let mut inp = BytesMut::from(b"heads\n".to_vec()); diff --git a/server/src/repo.rs b/server/src/repo.rs index 9bcd054a4a..2d8685fb3e 100644 --- a/server/src/repo.rs +++ b/server/src/repo.rs @@ -53,8 +53,8 @@ pub trait OpenableRepoType { impl OpenableRepoType for RepoType { fn open(&self) -> Result + Sync + Send>> { - use metaconfig::repoconfig::RepoType::*; use hgproto::{Error, ErrorKind}; + use metaconfig::repoconfig::RepoType::*; fn repo_chain(err: E) -> Error { Error::with_chain(err, ErrorKind::Repo) @@ -333,10 +333,7 @@ impl HgCommands for RepoClient { } // @wireprotocommand('unbundle', 'heads') - fn unbundle( - &self, - heads: Vec, /* , _stream: BoxStream, Error> */ - ) -> HgCommandRes<()> { + fn unbundle(&self, heads: Vec, _stream: Bytes) -> HgCommandRes<()> { info!(self.logger, "unbundle heads {:?}", heads); future::ok(()).boxify() }