rust: futures-ext: only consume as much input as needed for the decoder to make progress

Summary:
Avoid over-consuming the input, as excess input isn't easily pushed
back into the input stream. This is necessary to make sure that when parsing a
bundle2 stream we don't consume input from beyond the end of the stream, which
can upset the hg ssh command processor.

Reviewed By: sid0

Differential Revision: D6214174

fbshipit-source-id: 3a2227fab5ddfbb247437508206d40e85024c48e
This commit is contained in:
Jeremy Fitzhardinge 2017-11-01 18:49:24 -07:00 committed by Facebook Github Bot
parent 6ba05b01a8
commit 6b5a13c1f6
3 changed files with 26 additions and 11 deletions

View File

@ -9,7 +9,7 @@
use std::cmp;
use std::io::{self, Read};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use futures::{Async, Poll, Stream};
use tokio_io::AsyncRead;
use tokio_io::codec::Decoder;
@ -113,7 +113,25 @@ where
// Otherwise, try to read more data and try again. Make sure we've
// got room to make progress.
self.buf.reserve(BUFSIZE);
if 0 == try_ready!(self.inner.read_buf(&mut self.buf)) {
let got = unsafe {
// Read 1 byte at a time to avoid over-reading, since we don't know
// how much we'll need. (Ideally the decoder could tell us how much
// more input it needs to make progress.)
// TODO: (jsgf) T23239742 Either fix decode to return amount needed or
// completely rewrite as a streaming command parser.
let mut buf = &mut self.buf;
let n = {
let b = &mut buf.bytes_mut()[..1];
self.inner.prepare_uninitialized_buffer(b);
let n = try_nb!(self.inner.read(b));
assert!(n <= 1);
n
};
buf.advance_mut(n);
n
};
if got == 0 {
self.eof = true;
}
@ -212,8 +230,8 @@ mod test {
assert_eq!(res.unwrap().as_ref(), b"hello, world!");
// Make sure the entire input has been read.
assert_eq!(stream.get_ref().position(), 25);
// Make sure only the required input (and no more) has been read.
assert_eq!(stream.get_ref().position(), 17);
// Add some more data to the end so that we can test that both the
// remaining bits in the buffer and the additional data we wrote can be
@ -223,7 +241,7 @@ mod test {
let mut read2 = stream.into_inner_leading();
let mut out = vec![];
assert_matches!(read2.read_to_end(&mut out), Ok(16));
assert_eq!(out, b"foo-bar-baz-quux");
assert_matches!(read2.read_to_end(&mut out), Ok(0));
assert!(out.is_empty());
}
}

View File

@ -16,7 +16,7 @@ extern crate futures;
#[cfg(test)]
#[macro_use]
extern crate quickcheck;
#[cfg(test)]
#[macro_use]
extern crate tokio_core;
extern crate tokio_io;

View File

@ -35,10 +35,7 @@ use utils::get_compression_param;
const BZIP2_BUNDLE2: &[u8] = include_bytes!("fixtures/bzip2.bin");
const UNCOMP_BUNDLE2: &[u8] = include_bytes!("fixtures/uncompressed.bin");
const UNKNOWN_COMPRESSION_BUNDLE2: &[u8] = include_bytes!(
"fixtures/unknown-compression.\
bin"
);
const UNKNOWN_COMPRESSION_BUNDLE2: &[u8] = include_bytes!("fixtures/unknown-compression.bin");
const CHANGESET1_HASH_STR: &str = "b2040b24fd5cdfaf36e3164ddc357e834167b14a";
const CHANGESET2_HASH_STR: &str = "415ab71954c98ea93dab4b8f61f04ca57bc5c33c";