mononoke/repo_client: remove 0.1 tokio timeout from streaming clone

Summary:
Like it says in the title. This one was pretty easy to just convert to 0.3
futures so I did so.
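
For context, the conversion pattern looks roughly like this. This is a minimal, self-contained sketch, not the Mononoke code; the crate versions, the `fetch_blob` stand-in, and the `main` harness are illustrative assumptions. Blob fetches become plain async blocks boxed into the 0.3 `BoxFuture<'static, Result<Bytes, Error>>` alias, callers drive them with 0.3 stream combinators, and the overall deadline comes from a plain tokio timeout rather than the 0.1 tokio timeout wrapper this diff removes.

// Minimal sketch of the converted shape (not the Mononoke code).
// Assumed crate versions: futures = "0.3", tokio = "1" (full features), bytes = "1", anyhow = "1".
use anyhow::Error;
use bytes::Bytes;
use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{self, StreamExt, TryStreamExt};
use std::time::Duration;

// Stand-in for the streaming-clone blob fetch: an async block boxed into the
// futures 0.3 BoxFuture alias, replacing the old 0.1-style BoxFuture<Bytes, Error>.
fn fetch_blob(key: String) -> BoxFuture<'static, Result<Bytes, Error>> {
    async move {
        // A real implementation would read the blob from the blobstore here.
        Ok(Bytes::from(key.into_bytes()))
    }
    .boxed()
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let blobs: Vec<_> = (0..10).map(|i| fetch_blob(format!("chunk-{}", i))).collect();

    // Same pattern as the converted warmup code: run the fetches with bounded
    // concurrency and fold their sizes, all as 0.3 futures/streams.
    let total_size = stream::iter(blobs.into_iter().map(|f| f.map_ok(|b| b.len())))
        .buffer_unordered(100)
        .try_fold(0usize, |acc, size| future::ok(acc + size));

    // The overall deadline is a plain tokio timeout on a std future, instead of
    // a tokio 0.1 timeout.
    let total_size = tokio::time::timeout(Duration::from_secs(30), total_size).await??;
    println!("warmed up {} bytes", total_size);
    Ok(())
}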

Reviewed By: StanislavGlebik

Differential Revision: D26485577

fbshipit-source-id: 76c751c1004288dda1d7b62866979c9228e0ef34
Thomas Orozco 2021-02-19 06:57:14 -08:00 committed by Facebook GitHub Bot
parent c8854bf5c3
commit ae83446c36
3 changed files with 175 additions and 182 deletions

View File

@@ -198,7 +198,6 @@ impl StreamingCloneWarmup {
         let chunks = self
             .fetcher
             .fetch_changelog(ctx.clone(), self.repoid, self.blobstore.clone())
-            .compat()
             .await?;
         info!(
             ctx.logger(),
@@ -239,21 +238,13 @@ async fn chunks_warmup(ctx: CoreContext, chunks: RevlogStreamingChunks) -> Resul
         data_size: data_size_expected,
     } = chunks;
-    let index = stream::iter(
-        index_blobs
-            .into_iter()
-            .map(|f| f.compat().map_ok(|b| b.len())),
-    )
-    .buffer_unordered(100)
-    .try_fold(0usize, |acc, size| future::ok(acc + size));
+    let index = stream::iter(index_blobs.into_iter().map(|f| f.map_ok(|b| b.len())))
+        .buffer_unordered(100)
+        .try_fold(0usize, |acc, size| future::ok(acc + size));
-    let data = stream::iter(
-        data_blobs
-            .into_iter()
-            .map(|f| f.compat().map_ok(|b| b.len())),
-    )
-    .buffer_unordered(100)
-    .try_fold(0usize, |acc, size| future::ok(acc + size));
+    let data = stream::iter(data_blobs.into_iter().map(|f| f.map_ok(|b| b.len())))
+        .buffer_unordered(100)
+        .try_fold(0usize, |acc, size| future::ok(acc + size));
     let (index_size, data_size) = try_join(index, data).await?;
     if index_size_expected != index_size {

View File

@@ -39,7 +39,7 @@ use futures_old::future::ok;
 use futures_old::{
     future as future_old, stream as stream_old, try_ready, Async, Future, IntoFuture, Poll, Stream,
 };
-use futures_stats::{Timed, TimedStreamTrait};
+use futures_stats::{Timed, TimedFutureExt, TimedStreamTrait};
 use getbundle_response::{
     create_getbundle_response, DraftsInBundlesPolicy, PhasesPart, SessionLfsParams,
 };
@@ -1931,124 +1931,129 @@ impl HgCommands for RepoClient {
     // @wireprotocommand('stream_out_shallow')
     fn stream_out_shallow(&self) -> BoxStream<BytesOld, Error> {
         self.command_stream(ops::STREAMOUTSHALLOW, UNSAMPLED, |ctx, command_logger| {
-            let changelog = match self.repo.streaming_clone() {
-                None => Ok(RevlogStreamingChunks::new()).into_future().left_future(),
-                Some(SqlStreamingCloneConfig {
-                    blobstore,
-                    fetcher,
-                    repoid,
-                }) => fetcher
-                    .fetch_changelog(ctx.clone(), *repoid, blobstore.clone())
-                    .right_future(),
-            };
+            let streaming_clone = self.repo.streaming_clone().clone();
-            changelog
-                .map({
-                    cloned!(ctx);
-                    move |chunk| {
-                        let data_blobs = chunk
-                            .data_blobs
-                            .into_iter()
-                            .map(|fut| {
-                                fut.timed({
-                                    let ctx = ctx.clone();
-                                    move |stats, blob| {
-                                        ctx.perf_counters().add_to_counter(
-                                            PerfCounterType::SumManifoldPollTime,
-                                            stats.poll_time.as_nanos_unchecked() as i64,
-                                        );
-                                        if let Ok(bytes) = blob {
-                                            ctx.perf_counters().add_to_counter(
-                                                PerfCounterType::BytesSent,
-                                                bytes.len() as i64,
-                                            )
-                                        }
-                                        Ok(())
-                                    }
-                                })
-                            })
-                            .collect();
-                        let index_blobs = chunk
-                            .index_blobs
-                            .into_iter()
-                            .map(|fut| {
-                                fut.timed({
-                                    let ctx = ctx.clone();
-                                    move |stats, blob| {
-                                        ctx.perf_counters().add_to_counter(
-                                            PerfCounterType::SumManifoldPollTime,
-                                            stats.poll_time.as_nanos_unchecked() as i64,
-                                        );
-                                        if let Ok(bytes) = blob {
-                                            ctx.perf_counters().add_to_counter(
-                                                PerfCounterType::BytesSent,
-                                                bytes.len() as i64,
-                                            )
-                                        }
-                                        Ok(())
-                                    }
-                                })
-                            })
-                            .collect();
-                        RevlogStreamingChunks {
-                            data_size: chunk.data_size,
-                            index_size: chunk.index_size,
-                            data_blobs,
-                            index_blobs,
+            let stream = {
+                cloned!(ctx);
+                async move {
+                    let changelog = match streaming_clone {
+                        None => RevlogStreamingChunks::new(),
+                        Some(SqlStreamingCloneConfig {
+                            blobstore,
+                            fetcher,
+                            repoid,
+                        }) => {
+                            fetcher
+                                .fetch_changelog(ctx.clone(), repoid, blobstore.clone())
+                                .await?
+                        }
+                    };
+                    let data_blobs = changelog
+                        .data_blobs
+                        .into_iter()
+                        .map(|fut| {
+                            cloned!(ctx);
+                            async move {
+                                let (stats, res) = fut.timed().await;
+                                ctx.perf_counters().add_to_counter(
+                                    PerfCounterType::SumManifoldPollTime,
+                                    stats.poll_time.as_nanos_unchecked() as i64,
+                                );
+                                if let Ok(bytes) = res.as_ref() {
+                                    ctx.perf_counters().add_to_counter(
+                                        PerfCounterType::BytesSent,
+                                        bytes.len() as i64,
+                                    )
+                                }
+                                res
+                            }
+                            .boxed()
+                        })
+                        .collect();
+                    let index_blobs = changelog
+                        .index_blobs
+                        .into_iter()
+                        .map(|fut| {
+                            cloned!(ctx);
+                            async move {
+                                let (stats, res) = fut.timed().await;
+                                ctx.perf_counters().add_to_counter(
+                                    PerfCounterType::SumManifoldPollTime,
+                                    stats.poll_time.as_nanos_unchecked() as i64,
+                                );
+                                if let Ok(bytes) = res.as_ref() {
+                                    ctx.perf_counters().add_to_counter(
+                                        PerfCounterType::BytesSent,
+                                        bytes.len() as i64,
+                                    )
+                                }
+                                res
+                            }
+                            .boxed()
+                        })
+                        .collect();
+                    let changelog = RevlogStreamingChunks {
+                        data_size: changelog.data_size,
+                        index_size: changelog.index_size,
+                        data_blobs,
+                        index_blobs,
+                    };
+                    debug!(
+                        ctx.logger(),
+                        "streaming changelog {} index bytes, {} data bytes",
+                        changelog.index_size,
+                        changelog.data_size
+                    );
+                    let mut response_header = Vec::new();
+                    // Send OK response.
+                    response_header.push(Bytes::from_static(b"0\n"));
+                    // send header.
+                    let total_size = changelog.index_size + changelog.data_size;
+                    let file_count = 2;
+                    let header = format!("{} {}\n", file_count, total_size);
+                    response_header.push(header.into_bytes().into());
+                    let response = stream::iter(response_header.into_iter().map(Ok));
+                    fn build_file_stream(
+                        name: &str,
+                        size: usize,
+                        data: Vec<futures::future::BoxFuture<'static, Result<Bytes, Error>>>,
+                    ) -> impl futures::stream::Stream<Item = Result<Bytes, Error>> + Send
+                    {
+                        let header = format!("{}\0{}\n", name, size);
+                        stream::once(future::ready(Ok(header.into_bytes().into())))
+                            .chain(stream::iter(data.into_iter()).buffered(100))
+                    }
-                })
-                .map({
-                    cloned!(ctx);
-                    move |changelog_chunks| {
-                        debug!(
-                            ctx.logger(),
-                            "streaming changelog {} index bytes, {} data bytes",
-                            changelog_chunks.index_size,
-                            changelog_chunks.data_size
-                        );
-                        let mut response_header = Vec::new();
-                        // TODO(t34058163): actually send a real streaming response, not an empty one
-                        // Send OK response.
-                        response_header.push(Bytes::from_static(b"0\n"));
-                        // send header.
-                        let total_size = changelog_chunks.index_size + changelog_chunks.data_size;
-                        let file_count = 2;
-                        let header = format!("{} {}\n", file_count, total_size);
-                        response_header.push(header.into_bytes().into());
-                        let response = stream_old::iter_ok(response_header);
-                        fn build_file_stream(
-                            name: &str,
-                            size: usize,
-                            data: Vec<BoxFuture<Bytes, Error>>,
-                        ) -> impl Stream<Item = Bytes, Error = Error> + Send
-                        {
-                            let header = format!("{}\0{}\n", name, size);
+                    let res = response
+                        .chain(build_file_stream(
+                            "00changelog.i",
+                            changelog.index_size,
+                            changelog.index_blobs,
+                        ))
+                        .chain(build_file_stream(
+                            "00changelog.d",
+                            changelog.data_size,
+                            changelog.data_blobs,
+                        ));
-                            stream_old::once(Ok(header.into_bytes().into()))
-                                .chain(stream_old::iter_ok(data.into_iter()).buffered(100))
-                        }
+                    Ok(res)
+                }
+            }
+            .try_flatten_stream();
-                        response
-                            .chain(build_file_stream(
-                                "00changelog.i",
-                                changelog_chunks.index_size,
-                                changelog_chunks.index_blobs,
-                            ))
-                            .chain(build_file_stream(
-                                "00changelog.d",
-                                changelog_chunks.data_size,
-                                changelog_chunks.data_blobs,
-                            ))
-                    }
-                })
-                .flatten_stream()
+            stream
                 .whole_stream_timeout(clone_timeout())
-                .map(bytes_ext::copy_from_new)
                 .map_err(process_stream_timeout_error)
+                .flatten_err()
+                .map_ok(bytes_ext::copy_from_new)
+                .boxed()
+                .compat()
                 .timed({
                     move |stats, _| {
                         command_logger.finalize_command(ctx, &stats, None);

View File

@@ -11,10 +11,10 @@ use std::vec::Vec;
 use anyhow::Error;
 use bytes::Bytes;
 use cloned::cloned;
-use futures::future::{FutureExt, TryFutureExt};
-use futures_01_ext::{BoxFuture, FutureExt as OldFutureExt};
-use futures_old::{future::lazy, Future};
+use futures::{
+    compat::Future01CompatExt,
+    future::{BoxFuture, FutureExt},
+};
 use sql::{queries, Connection};
 use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
 use sql_ext::SqlConnections;
@@ -35,8 +35,8 @@ pub enum ErrorKind {
 pub struct RevlogStreamingChunks {
     pub index_size: usize,
     pub data_size: usize,
-    pub index_blobs: Vec<BoxFuture<Bytes, Error>>,
-    pub data_blobs: Vec<BoxFuture<Bytes, Error>>,
+    pub index_blobs: Vec<BoxFuture<'static, Result<Bytes, Error>>>,
+    pub data_blobs: Vec<BoxFuture<'static, Result<Bytes, Error>>>,
 }
 
 impl RevlogStreamingChunks {
@@ -83,63 +83,60 @@ fn fetch_blob(
     blobstore: impl Blobstore + 'static,
     key: &[u8],
     expected_size: usize,
-) -> BoxFuture<Bytes, Error> {
+) -> BoxFuture<'static, Result<Bytes, Error>> {
     let key = String::from_utf8_lossy(key).into_owned();
-    lazy(move || {
-        {
-            cloned!(key);
-            async move { blobstore.get(&ctx, &key).await }
-        }
-        .boxed()
-        .compat()
-        .and_then(move |data| {
-            match data {
-                None => Err(ErrorKind::MissingStreamingBlob(key).into()),
-                Some(data) if data.as_bytes().len() == expected_size => Ok(data.into_raw_bytes()),
-                Some(data) => {
-                    Err(
-                        ErrorKind::CorruptStreamingBlob(key, data.as_bytes().len(), expected_size)
-                            .into(),
-                    )
-                }
+    async move {
+        let data = blobstore.get(&ctx, &key).await?;
+        match data {
+            None => Err(ErrorKind::MissingStreamingBlob(key).into()),
+            Some(data) if data.as_bytes().len() == expected_size => Ok(data.into_raw_bytes()),
+            Some(data) => {
+                Err(
+                    ErrorKind::CorruptStreamingBlob(key, data.as_bytes().len(), expected_size)
+                        .into(),
+                )
+            }
-        })
-    })
-    .boxify()
+        }
+    }
+    .boxed()
 }
 
 impl SqlStreamingChunksFetcher {
-    pub fn fetch_changelog(
+    pub async fn fetch_changelog(
         &self,
         ctx: CoreContext,
         repo_id: RepositoryId,
         blobstore: impl Blobstore + Clone + 'static,
-    ) -> BoxFuture<RevlogStreamingChunks, Error> {
-        SelectChunks::query(&self.read_connection, &repo_id)
-            .map(move |rows| {
-                rows.into_iter().fold(
-                    RevlogStreamingChunks::new(),
-                    move |mut res, (idx_blob_name, idx_size, data_blob_name, data_size)| {
-                        let data_size = data_size as usize;
-                        let idx_size = idx_size as usize;
-                        res.data_size += data_size;
-                        res.index_size += idx_size;
-                        res.data_blobs.push(fetch_blob(
-                            ctx.clone(),
-                            blobstore.clone(),
-                            &data_blob_name,
-                            data_size,
-                        ));
-                        res.index_blobs.push(fetch_blob(
-                            ctx.clone(),
-                            blobstore.clone(),
-                            &idx_blob_name,
-                            idx_size,
-                        ));
-                        res
-                    },
-                )
-            })
-            .boxify()
+    ) -> Result<RevlogStreamingChunks, Error> {
+        let rows = SelectChunks::query(&self.read_connection, &repo_id)
+            .compat()
+            .await?;
+        let res = rows.into_iter().fold(
+            RevlogStreamingChunks::new(),
+            move |mut res, (idx_blob_name, idx_size, data_blob_name, data_size)| {
+                let data_size = data_size as usize;
+                let idx_size = idx_size as usize;
+                res.data_size += data_size;
+                res.index_size += idx_size;
+                res.data_blobs.push(fetch_blob(
+                    ctx.clone(),
+                    blobstore.clone(),
+                    &data_blob_name,
+                    data_size,
+                ));
+                res.index_blobs.push(fetch_blob(
+                    ctx.clone(),
+                    blobstore.clone(),
+                    &idx_blob_name,
+                    idx_size,
+                ));
+                res
+            },
+        );
+        Ok(res)
     }
 }