mononoke/skiplists: spawn skiplist index fetching

Summary:
On setup SCS initializes the repos concurrently, warming up derived data for each repo, warming bookmark cache and fetching skiplists.

Fetching skiplists is an expensive operation that consists of two steps: an async fetch of a large blob from the Blobstore, followed by synchronous deserialization of that blob.
While running on the same task as warming the bookmark cache, it takes all the CPU, so the other futures have to wait and can't process results returned by MySQL queries or connect to the DB. Thus SCS eventually fails to acquire a new connection or to perform a query in a reasonable time and terminates.

Spawning skiplist fetching in a separate task helps to unblock the thread where the warm-up is running.

This was first noticed in TW tasks because after the MySQL rollout some of the SCS tasks started to take an hour to start.
To debug this and localize the issue, we added debug output to see what exactly blocks the repo initialization, and it turned out that once skiplist fetching started, everything else was blocked.

Reviewed By: StanislavGlebik

Differential Revision: D26128171

fbshipit-source-id: fe9e1882af898950cf16d8e939dc6bc6be56510e
This commit is contained in:
Aida Getoeva 2021-01-29 10:38:53 -08:00 committed by Facebook GitHub Bot
parent b6b68257be
commit db3dbff5d3
2 changed files with 27 additions and 14 deletions

View File

@ -15,6 +15,7 @@ mononoke_types = { path = "../../mononoke_types", version = "0.1.0" }
reachabilityindex = { path = "..", version = "0.1.0" }
skiplist-thrift = { path = "../if", version = "0.1.0" }
tunables = { path = "../../tunables", version = "0.1.0" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master", version = "0.0.1+unstable" }
anyhow = "1.0"
async-trait = "0.1.29"
@ -35,7 +36,6 @@ revset = { path = "../../revset", version = "0.1.0" }
test-helpers = { path = "../test-helpers", version = "0.1.0" }
tests_utils = { path = "../../tests/utils", version = "0.1.0" }
async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
futures-old = { package = "futures", version = "0.1.30" }

View File

@ -15,12 +15,14 @@ use async_trait::async_trait;
use blobstore::Blobstore;
use bytes::Bytes;
use chashmap::CHashMap;
use cloned::cloned;
use context::{CoreContext, PerfCounterType};
use futures::future::try_join_all;
use futures::stream::{futures_unordered::FuturesUnordered, TryStreamExt};
use futures_util::try_join;
use maplit::{hashmap, hashset};
use slog::{info, Logger};
use tokio::task;
use changeset_fetcher::ChangesetFetcher;
use mononoke_types::{ChangesetId, Generation, FIRST_GENERATION};
@ -135,20 +137,31 @@ pub async fn fetch_skiplist_index(
match maybe_skiplist_blobstore_key {
Some(skiplist_index_blobstore_key) => {
info!(ctx.logger(), "Fetching and initializing skiplist");
let maybebytes = blobstore.get(ctx, &skiplist_index_blobstore_key).await?;
let slg = match maybebytes {
Some(bytes) => {
let bytes = bytes.into_raw_bytes();
let skiplist = deserialize_skiplist_index(ctx.logger().clone(), bytes)?;
info!(ctx.logger(), "Built skiplist");
skiplist
let skiplist_index = task::spawn({
cloned!(ctx, blobstore, skiplist_index_blobstore_key);
async move {
let maybebytes = blobstore.get(&ctx, &skiplist_index_blobstore_key).await?;
match maybebytes {
Some(bytes) => {
let bytes = bytes.into_raw_bytes();
let logger = ctx.logger().clone();
let skiplist = task::spawn_blocking(move || {
deserialize_skiplist_index(logger, bytes)
})
.await??;
info!(ctx.logger(), "Built skiplist");
Ok(skiplist)
}
None => {
info!(ctx.logger(), "Skiplist is empty!");
Result::<_, Error>::Ok(SkiplistIndex::new())
}
}
}
None => {
info!(ctx.logger(), "Skiplist is empty!");
SkiplistIndex::new()
}
};
Ok(Arc::new(slg))
});
let skiplist_index = skiplist_index.await??;
Ok(Arc::new(skiplist_index))
}
None => Ok(Arc::new(SkiplistIndex::new())),
}