mononoke/gotham_ext: deduplicate a bit of code

Summary: Like it says in the title.

Reviewed By: mitrandir77

Differential Revision: D27967992

fbshipit-source-id: 0deb4d90538a6889bee6b41de4c5d1533b29519b
This commit is contained in:
Thomas Orozco 2021-04-27 08:08:53 -07:00 committed by Facebook GitHub Bot
parent c3b1479215
commit 1268f3221d

View File

@ -33,7 +33,7 @@ impl Sizeable for Bytes {
/// A stream that will fire to the sender associated upon completing or being dropped. The Sender
/// will receive the amount of data that passed through the stream.
#[pin_project(PinnedDrop)]
#[pin_project(PinnedDrop, project = SignalStreamProjection)]
pub struct SignalStream<S> {
#[pin]
stream: S,
@ -59,21 +59,15 @@ where
type Item = T;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut this = self.project();
let poll = ready!(this.stream.poll_next(ctx));
let poll = ready!(this.stream.as_mut().poll_next(ctx));
match poll {
// We have an item: increment the amount of data we sent.
Some(ref item) => *this.size_sent += item.size(),
// No items left: signal our receiver.
None => {
if let Some(sender) = this.sender.take() {
let _ = sender.send(BodyMeta {
bytes_sent: *this.size_sent,
});
}
}
None => send_body_meta(this),
}
Poll::Ready(poll)
@ -84,11 +78,14 @@ where
impl<S> PinnedDrop for SignalStream<S> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(sender) = this.sender.take() {
let _ = sender.send(BodyMeta {
bytes_sent: *this.size_sent,
});
}
send_body_meta(this);
}
}
fn send_body_meta<S>(this: SignalStreamProjection<S>) {
if let Some(sender) = this.sender.take() {
let _ = sender.send(BodyMeta {
bytes_sent: *this.size_sent,
});
}
}