gotham_ext: don't end SignalStream on error

Summary: Previously, `SignalStream` would assume that an error would always end the `Stream`, and would therefore stop and report the amount of transferred data upon encountering any error. This isn't always the desired behavior, as it is possible for `TryStream`s to return mid-stream errors without immediately ending. `SignalStream` should allow for this kind of usage.

Reviewed By: farnz

Differential Revision: D23643337

fbshipit-source-id: 2c7ffd9d02c05bc09c6ec0e282c0b2cca166e079
This commit is contained in:
Arun Kulshreshtha 2020-09-11 09:48:47 -07:00 committed by Facebook GitHub Bot
parent eb7dc081c3
commit 4d76a4c241

View File

@ -11,6 +11,7 @@ use std::pin::Pin;
use bytes::Bytes;
use futures::{
channel::oneshot::Sender,
ready,
stream::Stream,
task::{Context, Poll},
};
@ -70,26 +71,19 @@ where
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let (stream, sender, size_sent) = self.pin_get_parts();
if sender.is_none() {
return Poll::Ready(None);
}
let poll = ready!(stream.poll_next(ctx));
let poll = match stream.poll_next(ctx) {
Poll::Ready(poll) => poll,
Poll::Pending => {
return Poll::Pending;
}
};
if let Some(Ok(ref item)) = poll {
match poll {
// We have an item: increment the amount of data we sent.
*size_sent += item.size();
} else {
Some(Ok(ref item)) => *size_sent += item.size(),
// Got an error: ignore it for size purposes.
Some(Err(_)) => {}
// No items left: signal our receiver.
let _ = sender
.take()
.expect("presence checked above")
.send(*size_sent);
None => {
if let Some(sender) = sender.take() {
let _ = sender.send(*size_sent);
}
}
}
Poll::Ready(poll)