streams: add logic to handle missing items

Summary:
If the server does not respect the input contract and misses some
items without returning errors, the current logic would retry
forever. Change it to detect the issue and raise an error.

Reviewed By: DurhamG

Differential Revision: D24293497

fbshipit-source-id: 09421c7743078a488a9c81ce66fd92c12b39543c
This commit is contained in:
Jun Wu 2020-10-20 15:18:01 -07:00 committed by Facebook GitHub Bot
parent 3942b0cbe6
commit b35886ecdf
2 changed files with 101 additions and 29 deletions

View File

@ -106,7 +106,8 @@ impl StreamCommitText for HybridCommits {
reponame,
};
let buffer_size = 5000;
let stream = HybridStream::new(input, resolver, buffer_size);
let retry_limit = 0;
let stream = HybridStream::new(input, resolver, buffer_size, retry_limit);
let stream = stream.map_ok(|(vertex, raw_text)| ParentlessHgCommit { vertex, raw_text });
Ok(Box::pin(stream))
}
@ -177,6 +178,10 @@ impl HybridResolver<Vertex, Bytes, anyhow::Error> for Resolver {
});
Ok(Box::pin(commits) as BoxStream<'_, _>)
}
/// Builds the error reported when `input` could not be resolved remotely.
///
/// The attempt count is accepted to satisfy the trait signature but is not
/// included in the message.
fn retry_error(&self, _attempt: usize, input: &[Vertex]) -> anyhow::Error {
    anyhow::anyhow!("cannot resolve {:?} remotely", input)
}
}
delegate!(IdConvert | PrefixLookup | DagAlgorithm | ToIdSet | ToSet, HybridCommits => self.commits);

View File

@ -44,6 +44,12 @@ struct HybridStreamState<I, O, E> {
/// Defines how to resolve I to O.
resolver: Box<dyn HybridResolver<I, O, E> + Send + Sync + 'static>,
/// Number of remote resolve attempts made so far (reset when new input is buffered).
attempt: usize,
/// Maximum retry count.
retry_limit: usize,
}
/// Defines how to resolve input to output using local data and remote data.
@ -58,6 +64,9 @@ pub trait HybridResolver<I, O, E> {
&mut self,
input: &[I],
) -> Result<BoxStream<'static, Result<(I, O), E>>, E>;
/// Construct the error to raise when the server has failed to respond to `input` after `attempt` attempts.
fn retry_error(&self, attempt: usize, input: &[I]) -> E;
}
#[derive(Debug)]
@ -77,6 +86,7 @@ where
stream: BoxStream<'static, Result<I, E>>,
resolver: impl HybridResolver<I, O, E> + Send + Sync + 'static,
buffer_size: usize,
retry_limit: usize,
) -> Self {
let state = HybridStreamState {
input: stream,
@ -85,6 +95,8 @@ where
buffer_size: buffer_size.max(1),
request: Default::default(),
resolver: Box::new(resolver),
attempt: 0,
retry_limit,
};
let stream = futures::stream::unfold(state, |mut state| async {
let item = state.next_item().await;
@ -155,6 +167,8 @@ where
None => ResolveState::NotResolved(input),
};
self.buffer.push_back(state);
// Reset attempt counter.
self.attempt = 0;
count += 1;
}
// Reached the end.
@ -176,6 +190,10 @@ where
None => {
let batch: Vec<I> = self.remote_input();
if !batch.is_empty() {
if self.attempt > self.retry_limit {
return Err(self.resolver.retry_error(self.attempt, &batch));
}
self.attempt += 1;
let request = self.resolver.resolve_remote(&batch).await?;
self.request = Some(request);
}
@ -224,46 +242,69 @@ mod tests {
use std::sync::Mutex;
use tokio::time::{delay_for, Duration};
#[tokio::test]
async fn test_hybrid_stream() {
type I = usize;
type O = String;
type E = std::io::Error;
type I = usize;
type O = String;
type E = std::io::Error;
#[derive(Default)]
struct Resolver {
cached: Arc<Mutex<HashMap<usize, String>>>,
#[derive(Default)]
struct Resolver {
cached: Arc<Mutex<HashMap<usize, String>>>,
}
#[async_trait]
impl HybridResolver<I, O, E> for Resolver {
fn resolve_local(&mut self, input: &I) -> Result<Option<O>, E> {
Ok(self.cached.lock().unwrap().get(input).cloned())
}
#[async_trait]
impl HybridResolver<I, O, E> for Resolver {
fn resolve_local(&mut self, input: &I) -> Result<Option<O>, E> {
Ok(self.cached.lock().unwrap().get(input).cloned())
}
async fn resolve_remote(
&mut self,
input: &[I],
) -> Result<BoxStream<'static, Result<(I, O), E>>, E> {
let cached = self.cached.clone();
// Exercise ".await" in this function.
delay_for(Duration::from_millis(1)).await;
let input: Vec<I> = input.iter().cloned().collect();
let output_iter = input.into_iter().map(move |i| {
async fn resolve_remote(
&mut self,
input: &[I],
) -> Result<BoxStream<'static, Result<(I, O), E>>, E> {
let cached = self.cached.clone();
// Exercise ".await" in this function.
delay_for(Duration::from_millis(1)).await;
// Return nothing for 404.
let output_iter = input
.to_vec()
.into_iter()
.filter(|&i| i != 404)
.map(move |i| {
let o = i.to_string();
cached.lock().unwrap().insert(i, o.clone());
Ok((i, o))
// Return an error for 500.
if i == 500 {
Err(error("cannot resolve 500"))
} else {
Ok((i, o))
}
});
Ok(Box::pin(stream::iter(output_iter)))
}
Ok(Box::pin(stream::iter(output_iter)))
}
fn retry_error(&self, attempt: usize, input: &[I]) -> E {
error(format!(
"give up after {} attempts for input {:?}",
attempt, input
))
}
}
/// Wraps any displayable message into an `std::io::Error` of kind `Other`.
fn error(msg: impl ToString) -> std::io::Error {
    let text = msg.to_string();
    let kind = std::io::ErrorKind::Other;
    std::io::Error::new(kind, text)
}
/// Test shorthand: unwraps a stream item that is expected to be present and
/// successful. Panics if the stream ended (`None`) or yielded an error.
fn u(v: Option<Result<(I, O), E>>) -> (I, O) {
    let item = v.unwrap();
    item.unwrap()
}
#[tokio::test]
async fn test_hybrid_stream() {
for &buffer_size in [0, 1, 2, 10].iter() {
let input = stream::iter(vec![0, 1, 3, 5, 10].into_iter().map(Ok));
let resolver = Resolver::default();
resolver.cached.lock().unwrap().insert(1, "one".to_string());
let mut stream = HybridStream::new(Box::pin(input), resolver, buffer_size);
let u = |v: Option<Result<(I, O), E>>| v.unwrap().unwrap();
let mut stream = HybridStream::new(Box::pin(input), resolver, buffer_size, 0);
assert_eq!(u(stream.next().await), (0, "0".to_string()));
assert_eq!(u(stream.next().await), (1, "one".to_string()));
assert_eq!(u(stream.next().await), (3, "3".to_string()));
@ -273,4 +314,30 @@ mod tests {
assert!(stream.next().await.is_none());
}
}
/// Item 404 is never returned by the test resolver, so the stream must give
/// up with the resolver's retry error instead of retrying forever.
#[tokio::test]
async fn test_hybrid_stream_retry() {
    let retry_limits: [usize; 2] = [0, 3];
    for retry_limit in retry_limits.iter().copied() {
        let items = vec![0, 404, 1, 2];
        let input = stream::iter(items.into_iter().map(Ok));
        let mut stream = HybridStream::new(Box::pin(input), Resolver::default(), 3, retry_limit);

        // Item 0 resolves normally.
        let first = stream.next().await;
        assert_eq!(u(first), (0, "0".to_string()));

        // 404 is silently dropped by the resolver; expect the retry error.
        let err = stream.next().await.unwrap().unwrap_err();
        let expected = format!("give up after {} attempts for input [404]", retry_limit + 1);
        assert_eq!(err.to_string(), expected);
    }
}
/// An explicit error yielded by the resolver (item 500) must be surfaced by
/// the stream with its original message.
#[tokio::test]
async fn test_hybrid_stream_error() {
    let items = vec![0, 500, 1, 2];
    let input = stream::iter(items.into_iter().map(Ok));
    let mut stream = HybridStream::new(Box::pin(input), Resolver::default(), 3, 1);

    // Item 0 resolves normally.
    let first = stream.next().await;
    assert_eq!(u(first), (0, "0".to_string()));

    // The resolver's own error for 500 comes through unchanged.
    let err = stream.next().await.unwrap().unwrap_err();
    assert_eq!(err.to_string(), "cannot resolve 500");
}
}