http_client: add async batch interface

Summary:
Add the methods `send_async` and `send_async_with_progress` to `HttpClient`. These methods provide a `Futures`-based async interface that will make it easy to use the client from async Rust code.

Just like in D22231555, this is built on top of the previously introduced streaming API. When a batch request is sent, the client will start a Multi session on a task in a blocking worker thread using `tokio::task::spawn_blocking`. This means that right now, the implementation is not /truly/ async, but it should be possible to change the implementation in the future to avoid using any blocking I/O without needing to change the public interface.

Since all of the requests are part of the same Multi session, they will all proceed concurrently and, if possible, be multiplexed over the same connection in the case of multiple HTTP/2 requests to the same server (which is going to be our main use-case).

Unfortunately, since libcurl does not have any internal synchronization, ownership of the Multi session needs to be passed to the worker thread, meaning that the Multi handle will be dropped once the requests are complete. This means that connections will not be re-used when these methods are called several times serially. The API should make it obvious to the user that the internal state is not preserved since these methods both consume the `HttpClient` itself.

Reviewed By: quark-zju

Differential Revision: D22251488

fbshipit-source-id: 37caf64024cb12b95df5124379209550899d093d
This commit is contained in:
Arun Kulshreshtha 2020-06-30 21:03:21 -07:00 committed by Facebook GitHub Bot
parent 0f1794f897
commit 0b5ae6e2bf
2 changed files with 125 additions and 2 deletions

View File

@ -6,20 +6,27 @@
*/
use std::convert::{TryFrom, TryInto};
use std::pin::Pin;
use curl::{easy::Easy2, multi::Multi};
use futures::prelude::*;
use crate::{
driver::MultiDriver,
errors::{Abort, HttpClientError},
handler::{Buffered, Streaming},
progress::Progress,
receiver::Receiver,
receiver::{ChannelReceiver, Receiver},
request::{Request, StreamRequest},
response::Response,
response::{AsyncResponse, Response},
stats::Stats,
};
pub type ResponseStream =
Pin<Box<dyn Stream<Item = Result<AsyncResponse, HttpClientError>> + Send + 'static>>;
pub type StatsFuture =
Pin<Box<dyn Future<Output = Result<Stats, HttpClientError>> + Send + 'static>>;
/// A simple callback-oriented HTTP client.
///
/// Essentially a more ergonomic API for working with
@ -86,6 +93,60 @@ impl HttpClient {
})
}
/// Async version of `send` which runs all of the given request concurrently
/// in another thread. Returns a stream of responses (returned in the order
/// in which they arrive) as well as a future that will return aggregated
/// transfer statistics once all of the requests have completed.
///
/// Note that the response stream will yield a `Response` whenever all of
/// the headers for that responses have been received. The response body is
/// available as a `Stream` inside of each returned `Response` struct
pub fn send_async<I: IntoIterator<Item = Request>>(
self,
requests: I,
) -> Result<(ResponseStream, StatsFuture), HttpClientError> {
self.send_async_with_progress(requests, |_| ())
}
/// Same as `send_async()`, but takes an additional closure for
/// monitoring the collective progress of all of the transfers.
/// The closure will be called whenever any of the underlying
/// transfers make progress.
pub fn send_async_with_progress<I, P>(
self,
requests: I,
progress_cb: P,
) -> Result<(ResponseStream, StatsFuture), HttpClientError>
where
I: IntoIterator<Item = Request>,
P: FnMut(Progress) + Send + 'static,
{
let client = self;
let mut stream_requests = Vec::new();
let response_stream = stream::FuturesUnordered::new();
for req in requests {
let (receiver, streams) = ChannelReceiver::new();
let req = req.into_streaming(receiver);
stream_requests.push(req);
let res = AsyncResponse::new(streams);
response_stream.push(res);
}
let task = tokio::task::spawn_blocking(move || {
client.stream_with_progress(stream_requests, progress_cb)
});
let stats = task
.err_into::<HttpClientError>()
.map(|res| Ok(res??))
.boxed();
Ok((response_stream.boxed(), stats))
}
/// Perform the given requests, but stream the responses to the
/// `Receiver` attached to each respective request rather than
/// buffering the content of each response.
@ -299,4 +360,64 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_async() -> Result<()> {
let body1 = b"body1";
let body2 = b"body2";
let body3 = b"body3";
let mock1 = mock("GET", "/test1")
.with_status(201)
.with_body(&body1)
.create();
let mock2 = mock("GET", "/test2")
.with_status(201)
.with_body(&body2)
.create();
let mock3 = mock("GET", "/test3")
.with_status(201)
.with_body(&body3)
.create();
let server_url = Url::parse(&mockito::server_url())?;
let url1 = server_url.join("test1")?;
let req1 = Request::get(url1);
let url2 = server_url.join("test2")?;
let req2 = Request::get(url2);
let url3 = server_url.join("test3")?;
let req3 = Request::get(url3);
let client = HttpClient::new();
let (stream, stats) = client.send_async(vec![req1, req2, req3])?;
let responses = stream.try_collect::<Vec<_>>().await?;
mock1.assert();
mock2.assert();
mock3.assert();
let mut not_received = HashSet::new();
not_received.insert(body1.to_vec());
not_received.insert(body2.to_vec());
not_received.insert(body3.to_vec());
for res in responses {
assert_eq!(res.status, StatusCode::CREATED);
let body = res.body.try_concat().await?;
assert!(not_received.remove(&*body));
}
assert!(not_received.is_empty());
let stats = stats.await?;
assert_eq!(stats.requests, 3);
Ok(())
}
}

View File

@ -22,6 +22,8 @@ pub enum HttpClientError {
BadResponse,
#[error("The request was dropped before it could complete")]
RequestDropped(#[from] oneshot::Canceled),
#[error("The I/O task terminated unexpectedly: {}", .0)]
IoTaskFailed(#[from] tokio::task::JoinError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}