mononoke/repo_client: remove direct usage of Tokio 0.1

Summary:
Like it says in the title. This removes places where we use Tokio 0.1 directly
in repo client. We use it for timeouts, so this updates us to Tokio 0.2
timeouts.
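
For reference, this is roughly what the Tokio 0.2 timeout API looks like (a
minimal, self-contained sketch using only public APIs; `do_work` and
`bounded` are stand-ins, not code from this diff):

```rust
use std::time::Duration;

use anyhow::Error;
use tokio::time::timeout; // Tokio 0.2's timeout combinator

// Stand-in for any async operation we want to bound.
async fn do_work() -> Result<u32, Error> {
    Ok(42)
}

async fn bounded() -> Result<u32, Error> {
    // `timeout` yields Err(Elapsed) if the deadline fires first. The outer
    // `?` converts the timeout error, the inner `?` the operation's error.
    let out = timeout(Duration::from_secs(5), do_work()).await??;
    Ok(out)
}
```

The `.timeout()` / `.flatten_err()` combinators the diff below imports from
`futures_ext` express the same nested-`Result` handling in combinator form.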

Where possible, I've made a few improvements to the code as well:

- I removed timeouts on `future::ok()` because a future that is immediately
  ready isn't going to time out.
- I updated some code to async / await where it made sense to do so, to avoid
  round-tripping through futures 0.1 and 0.3 several times (see the sketch
  after this list).
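
Here's a sketch of that second change (stand-in types and functions, not the
actual repo code; the real version awaits `blobrepo.get_hg_bonsai_mapping`):

```rust
use std::collections::HashSet;

use anyhow::Error;

// Stand-in for an async fetch like `get_hg_bonsai_mapping`.
async fn fetch_mapping(nodes: &[u32]) -> Result<HashSet<u32>, Error> {
    Ok(nodes.iter().copied().collect())
}

// One async block replaces a futures 0.1 combinator chain that had to
// `.compat()` between the two futures worlds at every stage; the real code
// now converts back to a 0.1 future only once, at the edge, via
// `.boxed().compat()`.
async fn known(nodes: Vec<u32>) -> Result<Vec<bool>, Error> {
    let mapping = fetch_mapping(&nodes).await?;
    Ok(nodes.into_iter().map(|n| mapping.contains(&n)).collect())
}
```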

One thing that changes here is that on timeouts we'll now show Tokio's own
error (which says that the timeout has elapsed) instead of ours, which used to
say just "timeout". I don't think this makes a big difference.
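
If you want to see the new message, you can poke Tokio 0.2 directly (again a
self-contained snippet, not repo code):

```rust
use std::time::Duration;
use tokio::time::{delay_for, timeout};

#[tokio::main]
async fn main() {
    // The inner delay can't finish within the deadline, so `timeout` yields
    // Err(Elapsed); that error's Display text is what clients now see in
    // place of our old hand-rolled "timeout" message.
    let long = delay_for(Duration::from_secs(60));
    let res = timeout(Duration::from_millis(10), long).await;
    if let Err(elapsed) = res {
        println!("{}", elapsed);
    }
}
```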

Reviewed By: StanislavGlebik

Differential Revision: D26485575

fbshipit-source-id: 8158f709bcc52d123a95df541aaeb1ec0fc9c281
Thomas Orozco 2021-02-19 06:57:14 -08:00 committed by Facebook GitHub Bot
parent fc48f40f4a
commit 6583143fec
4 changed files with 109 additions and 116 deletions

(file 1 of 4)

@@ -58,7 +58,6 @@ serde_json = { version = "1.0", features = ["float_roundtrip"] }
 slog = { version = "2.5", features = ["max_level_debug"] }
 thiserror = "1.0"
 tokio = { version = "0.2.25", features = ["full", "test-util"] }
-tokio-old = { package = "tokio", version = "0.1" }
 
 [dev-dependencies]
 blobrepo_factory = { path = "../blobrepo/factory", version = "0.1.0" }

(file 2 of 4)

@@ -11,7 +11,7 @@ use chrono::Utc;
 use cloned::cloned;
 use context::{CoreContext, PerfCounters, SessionId};
 use fbinit::FacebookInit;
-use futures::{FutureExt, TryFutureExt};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
 use futures_01_ext::FutureExt as _;
 use futures_old::{future, Future};
 use futures_stats::{FutureStats, StreamStats};
@@ -305,9 +305,9 @@ fn do_wireproto_logging<'a>(
             builder.log();
         })
-        .or_else(|_| Ok(()))
+        .or_else(|_| Result::<_, Error>::Ok(()))
     });
-    tokio_old::spawn(f);
+    tokio::spawn(f.compat());
 }
 
 fn generate_random_string(len: usize) -> String {

(file 3 of 4)

@@ -27,12 +27,14 @@ use futures::{
     channel::oneshot::{self, Sender},
     compat::{Future01CompatExt, Stream01CompatExt},
     future::{self, select, Either, FutureExt, TryFutureExt},
-    pin_mut, StreamExt, TryStreamExt,
+    pin_mut,
+    stream::{FuturesUnordered, StreamExt, TryStreamExt},
 };
 use futures_01_ext::{
-    spawn_future, try_boxfuture, try_boxstream, BoxFuture, BoxStream, BufferedParams,
-    FutureExt as OldFutureExt, StreamExt as OldStreamExt, StreamTimeoutError,
+    spawn_future, try_boxstream, BoxFuture, BoxStream, BufferedParams, FutureExt as OldFutureExt,
+    StreamExt as OldStreamExt, StreamTimeoutError,
 };
+use futures_ext::{FbFutureExt, FbTryFutureExt};
 use futures_old::future::ok;
 use futures_old::{
     future as future_old, stream, try_ready, Async, Future, IntoFuture, Poll, Stream,
@@ -84,8 +86,6 @@ use std::time::{Duration, Instant};
 use streaming_clone::RevlogStreamingChunks;
 use time_ext::DurationExt;
 use tokio::time::delay_for;
-use tokio_old::timer::timeout::Error as TimeoutError;
-use tokio_old::util::FutureExt as TokioFutureExt;
 use tracing::{trace_args, Traced};
 use tunables::tunables;
@@ -254,13 +254,6 @@ fn getpack_timeout() -> Duration {
     }
 }
 
-pub(crate) fn process_timeout_error(err: TimeoutError<Error>) -> Error {
-    match err.into_inner() {
-        Some(err) => err,
-        None => Error::msg("timeout"),
-    }
-}
-
 fn process_stream_timeout_error(err: StreamTimeoutError) -> Error {
     match err {
         StreamTimeoutError::Error(err) => err,
@@ -1116,8 +1109,11 @@ impl HgCommands for RepoClient {
                 }
             })
             .collect()
+            .compat()
             .timeout(default_timeout())
-            .map_err(process_timeout_error)
+            .flatten_err()
+            .boxed()
+            .compat()
             .traced(self.session.trace(), ops::BETWEEN, trace_args!())
             .timed(move |stats, _| {
                 command_logger.without_wireproto().finalize_command(&stats);
@@ -1158,8 +1154,6 @@ impl HgCommands for RepoClient {
             }
 
             future_old::ok(hostname)
-                .timeout(default_timeout())
-                .map_err(process_timeout_error)
                 .traced(self.session.trace(), ops::CLIENTTELEMETRY, trace_args!())
                 .timed(move |stats, _| {
                     command_logger.without_wireproto().finalize_command(&stats);
@@ -1178,8 +1172,11 @@ impl HgCommands for RepoClient {
             // that points to it.
             self.get_publishing_bookmarks_maybe_stale(ctx)
                 .map(|map| map.into_iter().map(|(_, hg_cs_id)| hg_cs_id).collect())
+                .compat()
                 .timeout(default_timeout())
-                .map_err(process_timeout_error)
+                .flatten_err()
+                .boxed()
+                .compat()
                 .traced(self.session.trace(), ops::HEADS, trace_args!())
                 .timed(move |stats, _| {
                     command_logger.without_wireproto().finalize_command(&stats);
@@ -1354,10 +1351,10 @@ impl HgCommands for RepoClient {
             }
             lookup_fut.compat().await
         }
+        .timeout(default_timeout())
+        .flatten_err()
         .boxed()
         .compat()
-        .timeout(default_timeout())
-        .map_err(process_timeout_error)
         .traced(self.session.trace(), ops::LOOKUP, trace_args!())
         .timed(move |stats, _| {
             command_logger.without_wireproto().finalize_command(&stats);
@@ -1375,25 +1372,21 @@ impl HgCommands for RepoClient {
         let phases_hint = blobrepo.get_phases().clone();
         {
             cloned!(ctx, nodes);
-            async move { blobrepo.get_hg_bonsai_mapping(ctx, nodes).await }
-        }
-        .boxed()
-        .compat()
-        .map(|hg_bcs_mapping| {
-            let mut bcs_ids = vec![];
-            let mut bcs_hg_mapping = hashmap! {};
-
-            for (hg, bcs) in hg_bcs_mapping {
-                bcs_ids.push(bcs);
-                bcs_hg_mapping.insert(bcs, hg);
-            }
-
-            (bcs_ids, bcs_hg_mapping)
-        })
-        .and_then({
-            cloned!(ctx);
-            move |(bcs_ids, bcs_hg_mapping)| {
-                phases_hint
+            async move {
+                let hg_bcs_mapping = blobrepo
+                    .get_hg_bonsai_mapping(ctx.clone(), nodes.clone())
+                    .await?;
+
+                let mut bcs_ids = vec![];
+                let mut bcs_hg_mapping = hashmap! {};
+
+                for (hg, bcs) in hg_bcs_mapping {
+                    bcs_ids.push(bcs);
+                    bcs_hg_mapping.insert(bcs, hg);
+                }
+
+                let found_hg_changesets = phases_hint
                     .get_public(ctx, bcs_ids, false)
                     .map_ok(move |public_csids| {
                         public_csids
@@ -1401,17 +1394,20 @@ impl HgCommands for RepoClient {
                             .filter_map(|csid| bcs_hg_mapping.get(&csid).cloned())
                             .collect::<HashSet<_>>()
                     })
-                    .compat()
+                    .await?;
+
+                let res = nodes
+                    .into_iter()
+                    .map(move |node| found_hg_changesets.contains(&node))
+                    .collect::<Vec<_>>();
+
+                Result::<_, Error>::Ok(res)
             }
-        })
-        .map(move |found_hg_changesets| {
-            nodes
-                .into_iter()
-                .map(move |node| found_hg_changesets.contains(&node))
-                .collect::<Vec<_>>()
-        })
+        }
         .timeout(default_timeout())
-        .map_err(process_timeout_error)
+        .flatten_err()
+        .boxed()
+        .compat()
         .traced(self.session.trace(), ops::KNOWN, trace_args!())
         .timed(move |stats, known_nodes| {
             if let Ok(known) = known_nodes {
@@ -1435,20 +1431,26 @@ impl HgCommands for RepoClient {
         let nodes_len = nodes.len();
         {
-            cloned!(ctx, nodes);
-            async move { blobrepo.get_hg_bonsai_mapping(ctx, nodes).await }
+            cloned!(ctx);
+            async move {
+                let hg_bcs_mapping = blobrepo
+                    .get_hg_bonsai_mapping(ctx, nodes.clone())
+                    .await?
+                    .into_iter()
+                    .collect::<HashMap<_, _>>();
+
+                let res = nodes
+                    .into_iter()
+                    .map(move |node| hg_bcs_mapping.contains_key(&node))
+                    .collect::<Vec<_>>();
+
+                Result::<_, Error>::Ok(res)
+            }
         }
+        .timeout(default_timeout())
+        .flatten_err()
         .boxed()
         .compat()
-        .map(|hg_bcs_mapping| {
-            let hg_bcs_mapping: HashMap<_, _> = hg_bcs_mapping.into_iter().collect();
-            nodes
-                .into_iter()
-                .map(move |node| hg_bcs_mapping.contains_key(&node))
-                .collect::<Vec<_>>()
-        })
-        .timeout(default_timeout())
-        .map_err(process_timeout_error)
         .traced(self.session.trace(), ops::KNOWNNODES, trace_args!())
         .timed(move |stats, known_nodes| {
             if let Ok(known) = known_nodes {
@@ -1507,8 +1509,6 @@ impl HgCommands for RepoClient {
             res.insert("capabilities".to_string(), caps);
 
             future_old::ok(res)
-                .timeout(default_timeout())
-                .map_err(process_timeout_error)
                 .traced(self.session.trace(), ops::HELLO, trace_args!())
                 .timed(move |stats, _| {
                     command_logger.without_wireproto().finalize_command(&stats);
@@ -1556,58 +1556,54 @@ impl HgCommands for RepoClient {
         }
 
         self.command_future(ops::LISTKEYSPATTERNS, UNSAMPLED, |ctx, command_logger| {
-            let queries = patterns.into_iter().map({
-                cloned!(ctx);
-                let max = self.repo.list_keys_patterns_max();
-                let session_bookmarks_cache = self.session_bookmarks_cache.clone();
-                move |pattern| {
+            let max = self.repo.list_keys_patterns_max();
+            let session_bookmarks_cache = self.session_bookmarks_cache.clone();
+            let queries = patterns.into_iter().map(move |pattern| {
+                cloned!(ctx, session_bookmarks_cache);
+                async move {
                     if pattern.ends_with("*") {
                         // prefix match
-                        let prefix =
-                            try_boxfuture!(BookmarkPrefix::new(&pattern[..pattern.len() - 1]));
-                        session_bookmarks_cache
+                        let prefix = BookmarkPrefix::new(&pattern[..pattern.len() - 1])?;
+                        let bookmarks = session_bookmarks_cache
                             .get_bookmarks_by_prefix(&ctx, &prefix, max)
-                            .compat()
-                            .map(|(bookmark, cs_id): (BookmarkName, HgChangesetId)| {
-                                (bookmark.to_string(), cs_id)
-                            })
-                            .collect()
-                            .and_then(move |bookmarks| {
-                                if bookmarks.len() < max as usize {
-                                    Ok(bookmarks)
-                                } else {
-                                    Err(format_err!(
+                            .map_ok(|(bookmark, cs_id)| {
+                                (bookmark.to_string(), cs_id)
+                            })
+                            .try_collect::<Vec<_>>().await?;
+                        if bookmarks.len() < max as usize {
+                            Ok(bookmarks)
+                        } else {
+                            Err(format_err!(
                                 "Bookmark query was truncated after {} results, use a more specific prefix search.",
                                 max,
-                                    ))
-                                }
-                            })
-                            .boxify()
+                            ))
+                        }
                     } else {
                         // literal match
-                        let bookmark = try_boxfuture!(BookmarkName::new(&pattern));
-                        {
-                            cloned!(ctx, session_bookmarks_cache);
-                            async move {
-                                let cs_id = session_bookmarks_cache.get_bookmark(ctx, bookmark).await?;
-                                match cs_id {
-                                    Some(cs_id) => Ok(vec![(pattern, cs_id)]),
-                                    None => Ok(Vec::new()),
-                                }
-                            }
-                        }
-                        .boxed()
-                        .compat()
-                        .boxify()
+                        let bookmark = BookmarkName::new(&pattern)?;
+                        let cs_id = session_bookmarks_cache.get_bookmark(ctx, bookmark).await?;
+                        match cs_id {
+                            Some(cs_id) => Ok(vec![(pattern, cs_id)]),
+                            None => Ok(Vec::new()),
+                        }
                     }
                 }
             });
 
-            stream::futures_unordered(queries)
-                .concat2()
-                .map(|bookmarks| bookmarks.into_iter().collect())
+            queries
+                .collect::<FuturesUnordered<_>>()
+                .try_fold(BTreeMap::new(), |mut ret, books| {
+                    ret.extend(books);
+                    future::ready(Ok(ret))
+                })
                 .timeout(default_timeout())
-                .map_err(process_timeout_error)
+                .flatten_err()
+                .boxed()
+                .compat()
                 .traced(self.session.trace(), ops::LISTKEYS, trace_args!())
                 .timed(move |stats, _| {
                     command_logger.without_wireproto().finalize_command(&stats);
@@ -1786,8 +1782,6 @@ impl HgCommands for RepoClient {
                     }
                 }
             }
-            .boxed()
-            .compat()
             .inspect_err({
                 cloned!(reponame);
                 move |err| {
@@ -1816,11 +1810,13 @@ impl HgCommands for RepoClient {
                     };
                 }
             })
-            .map(bytes_ext::copy_from_new)
-            .from_err()
+            .inspect_ok(move |_| STATS::push_success.add_value(1, (reponame,)))
+            .map_ok(bytes_ext::copy_from_new)
+            .map_err(Error::from)
             .timeout(default_timeout())
-            .map_err(process_timeout_error)
-            .inspect(move |_| STATS::push_success.add_value(1, (reponame,)))
+            .flatten_err()
+            .boxed()
+            .compat()
             .traced(&trace, ops::UNBUNDLE, trace_args!())
             .timed(move |stats, _| {
                 command_logger.without_wireproto().finalize_command(&stats);

(file 4 of 4)

@@ -5,7 +5,6 @@
  * GNU General Public License version 2.
  */
 
-use super::process_timeout_error;
 use anyhow::Error;
 use blobrepo::BlobRepo;
 use blobrepo_hg::{to_hg_bookmark_stream, BlobRepoHg};
@@ -13,16 +12,16 @@ use bookmarks::{
     Bookmark, BookmarkKind, BookmarkName, BookmarkPagination, BookmarkPrefix, Freshness,
 };
 use context::CoreContext;
-use futures::{compat::Stream01CompatExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{compat::Stream01CompatExt, future, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use futures_01_ext::{FutureExt, StreamExt as OldStreamExt};
-use futures_old::{future as future_old, Future, Stream as OldStream};
+use futures_ext::{FbFutureExt, FbTryFutureExt};
+use futures_old::{future as future_old, Future};
 use mercurial_types::HgChangesetId;
 use mononoke_repo::MononokeRepo;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
-use tokio_old::util::FutureExt as TokioFutureExt;
 use tunables::tunables;
 use warm_bookmarks_cache::WarmBookmarksCache;
@@ -249,14 +248,13 @@
         self.repo
             .blobrepo()
             .get_publishing_bookmarks_maybe_stale(ctx)
-            .fold(HashMap::new(), |mut map, item| {
+            .compat()
+            .try_fold(HashMap::new(), |mut map, item| {
                 map.insert(item.0, item.1);
-                let ret: Result<_, Error> = Ok(map);
-                ret
+                future::ready(Ok(map))
             })
             .timeout(bookmarks_timeout())
-            .map_err(process_timeout_error)
+            .flatten_err()
+            .compat()
     }
}