mononoke: add wirepack converter

Summary:
Wirepack packer code has an interesting logic that can be generalized.
For example, it can be used on the push side to convert b2xtreegroup wirepack
stream into stream on entries that can be used by Commit Api. This logic will
be implemented in the next diffs.

In this diff we separate wirepack logic into
two parts: converter stream, that converts wirepack stream into another stream,
and a trait that is used by converter.

Reviewed By: lukaspiatkowski

Differential Revision: D6965830

fbshipit-source-id: 726a84ba44c7f85b5aac48e702755dc904c9c6ab
This commit is contained in:
Stanislau Hlebik 2018-02-14 08:07:34 -08:00 committed by Facebook Github Bot
parent c0b25700db
commit e40bd8bd3d
3 changed files with 295 additions and 188 deletions

View File

@ -0,0 +1,251 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! Given a stream of wirepack entries, convert it to to other stream, using WirePackPartProcessor
use std::mem;
use futures::{Async, Poll, Stream};
use mercurial_types::RepoPath;
use super::{DataEntry, HistoryEntry, Part};
use errors::*;
pub trait WirePackPartProcessor {
type Data;
fn history_meta(&mut self, path: &RepoPath, entry_count: u32) -> Result<Option<Self::Data>>;
fn history(&mut self, entry: &HistoryEntry) -> Result<Option<Self::Data>>;
fn data_meta(&mut self, path: &RepoPath, entry_count: u32) -> Result<Option<Self::Data>>;
fn data(&mut self, data_entry: &DataEntry) -> Result<Option<Self::Data>>;
fn end(&mut self) -> Result<Option<Self::Data>>;
}
pub struct WirePackConverter<S, P> {
part_stream: S,
state: State,
processor: P,
}
impl<S, P> WirePackConverter<S, P> {
pub fn new(part_stream: S, processor: P) -> Self {
Self {
part_stream,
state: State::HistoryMeta,
processor,
}
}
}
impl<S, P> Stream for WirePackConverter<S, P>
where
S: Stream<Item = Part, Error = Error>,
P: WirePackPartProcessor,
{
type Item = <P as WirePackPartProcessor>::Data;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Error> {
use self::Part::*;
if self.state == State::End {
// The stream is over.
return Ok(Async::Ready(None));
}
loop {
match try_ready!(self.part_stream.poll()) {
None => {
self.state.seen_none()?;
return Ok(Async::Ready(None));
}
Some(HistoryMeta { path, entry_count }) => {
if let Some(history_meta) = self.processor.history_meta(&path, entry_count)? {
self.state.seen_history_meta(path, entry_count)?;
return Ok(Async::Ready(Some(history_meta)));
}
// seen_history_meta comes afterwards because we have to transfer
// ownership of path
self.state.seen_history_meta(path, entry_count)?;
}
Some(History(history_entry)) => {
self.state.seen_history(&history_entry)?;
if let Some(encoded_history) = self.processor.history(&history_entry)? {
return Ok(Async::Ready(Some(encoded_history)));
}
}
Some(DataMeta { path, entry_count }) => {
if let Some(data_meta) = self.processor.data_meta(&path, entry_count)? {
self.state.seen_data_meta(path, entry_count)?;
return Ok(Async::Ready(Some(data_meta)));
}
self.state.seen_data_meta(path, entry_count)?;
}
Some(Data(data_entry)) => {
self.state.seen_data(&data_entry)?;
if let Some(data_entry) = self.processor.data(&data_entry)? {
return Ok(Async::Ready(Some(data_entry)));
}
}
Some(End) => {
self.state.seen_end()?;
if let Some(end) = self.processor.end()? {
return Ok(Async::Ready(Some(end)));
}
}
}
}
}
}
#[derive(Debug, PartialEq)]
enum State {
HistoryMeta,
History { path: RepoPath, entry_count: u32 },
DataMeta { path: RepoPath },
Data { path: RepoPath, entry_count: u32 },
End,
Invalid,
}
impl State {
fn next_history_state(path: RepoPath, entry_count: u32) -> Self {
if entry_count == 0 {
State::DataMeta { path }
} else {
State::History { path, entry_count }
}
}
fn next_data_state(path: RepoPath, entry_count: u32) -> Self {
if entry_count == 0 {
State::HistoryMeta
} else {
State::Data { path, entry_count }
}
}
fn seen_history_meta(&mut self, path: RepoPath, entry_count: u32) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::HistoryMeta => Self::next_history_state(path, entry_count),
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected history meta entry (state: {:?})",
other
)));
}
};
Ok(())
}
fn seen_history(&mut self, entry: &HistoryEntry) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::History { path, entry_count } => {
ensure_err!(
entry_count > 0,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw history entry for {} after count dropped to 0",
entry.node
))
);
Self::next_history_state(path, entry_count - 1)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected history entry for {} (state: {:?})",
entry.node, other
)));
}
};
Ok(())
}
fn seen_data_meta(&mut self, path: RepoPath, entry_count: u32) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::DataMeta {
path: expected_path,
} => {
ensure_err!(
path == expected_path,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw data meta for path '{}', expected path '{}'\
(entry_count: {})",
path, expected_path, entry_count
))
);
Self::next_data_state(path, entry_count)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw unexpected data meta for {} (entry count: {}, \
state: {:?}",
path, entry_count, other
),));
}
};
Ok(())
}
fn seen_data(&mut self, entry: &DataEntry) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::Data { path, entry_count } => {
ensure_err!(
entry_count > 0,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw history entry for {} after count dropped to 0",
entry.node
))
);
Self::next_data_state(path, entry_count - 1)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected data entry for {} (state: {:?})",
entry.node, other
)));
}
};
Ok(())
}
fn seen_end(&mut self) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::HistoryMeta => State::End,
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected end (state: {:?})",
other
)));
}
};
Ok(())
}
fn seen_none(&mut self) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::End => State::End,
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected None (state: {:?})",
other
)));
}
};
Ok(())
}
}

View File

@ -17,6 +17,7 @@ use delta;
use errors::*;
use utils::BytesExt;
mod converter;
pub mod packer;
#[cfg(test)]
mod quickcheck_types;

View File

@ -8,31 +8,29 @@
//! The format is documented at
//! https://bitbucket.org/facebook/hg-experimental/src/@/remotefilelog/wirepack.py.
use std::mem;
use byteorder::BigEndian;
use bytes::BufMut;
use futures::{Async, Poll, Stream};
use mercurial_types::{MPath, RepoPath};
use futures::{Poll, Stream};
use chunk::Chunk;
use errors::*;
use mercurial_types::{MPath, RepoPath};
use super::{DataEntry, HistoryEntry, Kind, Part, WIREPACK_END};
use super::converter::{WirePackConverter, WirePackPartProcessor};
use errors::*;
pub struct WirePackPacker<S> {
part_stream: S,
kind: Kind,
state: State,
stream: WirePackConverter<S, PackerProcessor>,
}
impl<S> WirePackPacker<S> {
impl<S> WirePackPacker<S>
where
S: Stream<Item = Part, Error = Error>,
{
pub fn new(part_stream: S, kind: Kind) -> Self {
Self {
part_stream,
kind,
state: State::HistoryMeta,
stream: WirePackConverter::new(part_stream, PackerProcessor { kind }),
}
}
}
@ -45,192 +43,49 @@ where
type Error = Error;
fn poll(&mut self) -> Poll<Option<Chunk>, Error> {
use self::Part::*;
self.stream.poll()
}
}
if self.state == State::End {
// The stream is over.
return Ok(Async::Ready(None));
}
struct PackerProcessor {
kind: Kind,
}
unsafe impl Send for PackerProcessor {}
unsafe impl Sync for PackerProcessor {}
impl WirePackPartProcessor for PackerProcessor {
type Data = Chunk;
fn history_meta(&mut self, path: &RepoPath, entry_count: u32) -> Result<Option<Self::Data>> {
let mut builder = ChunkBuilder::new(self.kind);
match try_ready!(self.part_stream.poll()) {
None => {
self.state.seen_none()?;
return Ok(Async::Ready(None));
}
Some(HistoryMeta { path, entry_count }) => {
builder.encode_filename(&path)?;
builder.encode_entry_count(entry_count);
// seen_history_meta comes afterwards because we have to transfer ownership of path
self.state.seen_history_meta(path, entry_count)?;
}
Some(History(history_entry)) => {
self.state.seen_history(&history_entry)?;
builder.encode_history(&history_entry)?;
}
Some(DataMeta { path, entry_count }) => {
self.state.seen_data_meta(path, entry_count)?;
builder.encode_entry_count(entry_count);
}
Some(Data(data_entry)) => {
self.state.seen_data(&data_entry)?;
builder.encode_data(&data_entry)?;
}
Some(End) => {
self.state.seen_end()?;
builder.encode_end();
}
}
Ok(Async::Ready(Some(builder.build()?)))
}
}
#[derive(Debug, PartialEq)]
enum State {
HistoryMeta,
History { path: RepoPath, entry_count: u32 },
DataMeta { path: RepoPath },
Data { path: RepoPath, entry_count: u32 },
End,
Invalid,
}
impl State {
fn next_history_state(path: RepoPath, entry_count: u32) -> Self {
if entry_count == 0 {
State::DataMeta { path }
} else {
State::History { path, entry_count }
}
builder.encode_filename(&path)?;
builder.encode_entry_count(entry_count);
Ok(Some(builder.build()?))
}
fn next_data_state(path: RepoPath, entry_count: u32) -> Self {
if entry_count == 0 {
State::HistoryMeta
} else {
State::Data { path, entry_count }
}
fn history(&mut self, entry: &HistoryEntry) -> Result<Option<Self::Data>> {
let mut builder = ChunkBuilder::new(self.kind);
builder.encode_history(entry)?;
Ok(Some(builder.build()?))
}
fn seen_history_meta(&mut self, path: RepoPath, entry_count: u32) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::HistoryMeta => Self::next_history_state(path, entry_count),
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected history meta entry (state: {:?})",
other
)));
}
};
Ok(())
fn data_meta(&mut self, _path: &RepoPath, entry_count: u32) -> Result<Option<Self::Data>> {
let mut builder = ChunkBuilder::new(self.kind);
builder.encode_entry_count(entry_count);
Ok(Some(builder.build()?))
}
fn seen_history(&mut self, entry: &HistoryEntry) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::History { path, entry_count } => {
ensure_err!(
entry_count > 0,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw history entry for {} after count dropped to 0",
entry.node
))
);
Self::next_history_state(path, entry_count - 1)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected history entry for {} (state: {:?})",
entry.node, other
)));
}
};
Ok(())
fn data(&mut self, data_entry: &DataEntry) -> Result<Option<Self::Data>> {
let mut builder = ChunkBuilder::new(self.kind);
builder.encode_data(&data_entry)?;
Ok(Some(builder.build()?))
}
fn seen_data_meta(&mut self, path: RepoPath, entry_count: u32) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::DataMeta {
path: expected_path,
} => {
ensure_err!(
path == expected_path,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw data meta for path '{}', expected path '{}'\
(entry_count: {})",
path, expected_path, entry_count
))
);
Self::next_data_state(path, entry_count)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw unexpected data meta for {} (entry count: {}, \
state: {:?}",
path, entry_count, other
),));
}
};
Ok(())
}
fn seen_data(&mut self, entry: &DataEntry) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::Data { path, entry_count } => {
ensure_err!(
entry_count > 0,
ErrorKind::WirePackEncode(format!(
"invalid encode stream: saw history entry for {} after count dropped to 0",
entry.node
))
);
Self::next_data_state(path, entry_count - 1)
}
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected data entry for {} (state: {:?})",
entry.node, other
)));
}
};
Ok(())
}
fn seen_end(&mut self) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::HistoryMeta => State::End,
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected end (state: {:?})",
other
)));
}
};
Ok(())
}
fn seen_none(&mut self) -> Result<()> {
let state = mem::replace(self, State::Invalid);
*self = match state {
State::End => State::End,
other => {
bail_err!(ErrorKind::WirePackEncode(format!(
"invalid encode stream: unexpected None (state: {:?})",
other
)));
}
};
Ok(())
fn end(&mut self) -> Result<Option<Self::Data>> {
let mut builder = ChunkBuilder::new(self.kind);
builder.encode_end();
Ok(Some(builder.build()?))
}
}