mononoke: use spawn_future in CreateChangeset

Summary:
We've recently hit a stack overflow while trying to push or blobimport a large
number of commits.

Part of the stack trace:

{P64639044}

I matched the stack trace to the CreateChangeset::create() method. When we try
to import a lot of commits, we build up a future that represents a long stack
of commits, and polling this future results in deep recursion.
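
To make the shape of the problem concrete, here is a hypothetical,
self-contained sketch (futures 0.1; not code from this diff): each imported
commit layers one more combinator on top of its parent's future, so a single
poll() of the outermost future recurses one stack frame per commit.

  use futures::future::{self, Future};

  fn chain_commits(n: usize) -> Box<dyn Future<Item = u64, Error = ()> + Send> {
      let mut fut: Box<dyn Future<Item = u64, Error = ()> + Send> = Box::new(future::ok(0));
      for _ in 0..n {
          // "Import" one more commit on top of its parent's future.
          fut = Box::new(fut.and_then(|depth| future::ok(depth + 1)));
      }
      // For a big import (n in the tens of thousands), polling this future
      // walks the entire nesting and can overflow the thread's stack.
      fut
  }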

It's not clear why we didn't hit these stack overflows before. Regardless,
using `spawn_future()` prevents the stack overflow because it creates a new
task for each commit and uses `oneshot::channel` to connect them, which avoids
the recursion.
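
For reference, here is a minimal sketch of that pattern, assuming futures 0.1
and tokio 0.1 (a hand-written approximation of the idea, not the actual
futures_ext implementation):

  use futures::sync::oneshot;
  use futures::Future;

  fn spawn_future_sketch<F>(fut: F) -> impl Future<Item = F::Item, Error = F::Error>
  where
      F: Future + Send + 'static,
      F::Item: Send + 'static,
      F::Error: From<oneshot::Canceled> + Send + 'static,
  {
      let (tx, rx) = oneshot::channel();
      // The spawned task owns the deep combinator chain, so its poll() runs
      // on that task's stack, not the caller's (needs a running tokio runtime).
      tokio::spawn(fut.then(move |res| {
          let _ = tx.send(res); // receiver may be gone; ignore the send error
          Ok(())
      }));
      // Polling the returned future only polls the channel receiver, so
      // chaining many of these never builds up a deep poll() recursion.
      rx.from_err().and_then(|res| res)
  }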

Reviewed By: aslpavel

Differential Revision: D15367915

fbshipit-source-id: cf4e7981ffd66a8a0c4531516e022eb215265bc7
Stanislau Hlebik, 2019-05-20 05:36:52 -07:00 (committed by Facebook GitHub Bot)
parent d76685495a
commit 94ec821cc4

@@ -33,7 +33,7 @@ use futures::future::{self, loop_fn, ok, Either, Future, Loop};
 use futures::stream::{FuturesUnordered, Stream};
 use futures::sync::oneshot;
 use futures::IntoFuture;
-use futures_ext::{try_boxfuture, BoxFuture, BoxStream, FutureExt, StreamExt};
+use futures_ext::{spawn_future, try_boxfuture, BoxFuture, BoxStream, FutureExt, StreamExt};
 use futures_stats::{FutureStats, Timed};
 use mercurial::file::File;
 use mercurial_types::manifest::Content;
@@ -2326,66 +2326,69 @@ impl CreateChangeset {
         let complete_changesets = repo.changesets.clone();
         cloned!(repo, repo.repoid);
-        ChangesetHandle::new_pending(
-            can_be_parent.shared(),
-            changeset
-                .join(parents_complete)
-                .and_then({
-                    cloned!(ctx, repo.bonsai_hg_mapping);
-                    move |((hg_cs, bonsai_cs), _)| {
-                        let bcs_id = bonsai_cs.get_changeset_id();
-                        let bonsai_hg_entry = BonsaiHgMappingEntry {
-                            repo_id: repoid.clone(),
-                            hg_cs_id: hg_cs.get_changeset_id(),
-                            bcs_id,
-                        };
-                        bonsai_hg_mapping
-                            .add(ctx.clone(), bonsai_hg_entry)
-                            .map(move |_| (hg_cs, bonsai_cs))
-                            .context("While inserting mapping")
-                            .traced_with_id(
-                                &ctx.trace(),
-                                "uploading bonsai hg mapping",
-                                trace_args!(),
-                                event_id,
-                            )
-                    }
-                })
-                .and_then(move |(hg_cs, bonsai_cs)| {
-                    let completion_record = ChangesetInsert {
-                        repo_id: repo.repoid,
-                        cs_id: bonsai_cs.get_changeset_id(),
-                        parents: bonsai_cs.parents().into_iter().collect(),
-                    };
-                    complete_changesets
-                        .add(ctx.clone(), completion_record)
-                        .map(|_| (bonsai_cs, hg_cs))
-                        .context("While inserting into changeset table")
-                        .traced_with_id(
-                            &ctx.trace(),
-                            "uploading final changeset",
-                            trace_args!(),
-                            event_id,
-                        )
-                })
-                .with_context(move |_| {
-                    format!(
-                        "While creating Changeset {:?}, uuid: {}",
-                        expected_nodeid, uuid
-                    )
-                })
-                .map_err(|e| Error::from(e).compat())
-                .timed({
-                    move |stats, result| {
-                        if result.is_ok() {
-                            scuba_logger
-                                .add_future_stats(&stats)
-                                .log_with_msg("CreateChangeset Finished", None);
-                        }
-                        Ok(())
-                    }
-                })
-                .boxify()
-                .shared(),
-        )
+        let changeset_complete_fut = changeset
+            .join(parents_complete)
+            .and_then({
+                cloned!(ctx, repo.bonsai_hg_mapping);
+                move |((hg_cs, bonsai_cs), _)| {
+                    let bcs_id = bonsai_cs.get_changeset_id();
+                    let bonsai_hg_entry = BonsaiHgMappingEntry {
+                        repo_id: repoid.clone(),
+                        hg_cs_id: hg_cs.get_changeset_id(),
+                        bcs_id,
+                    };
+                    bonsai_hg_mapping
+                        .add(ctx.clone(), bonsai_hg_entry)
+                        .map(move |_| (hg_cs, bonsai_cs))
+                        .context("While inserting mapping")
+                        .traced_with_id(
+                            &ctx.trace(),
+                            "uploading bonsai hg mapping",
+                            trace_args!(),
+                            event_id,
+                        )
+                }
+            })
+            .and_then(move |(hg_cs, bonsai_cs)| {
+                let completion_record = ChangesetInsert {
+                    repo_id: repo.repoid,
+                    cs_id: bonsai_cs.get_changeset_id(),
+                    parents: bonsai_cs.parents().into_iter().collect(),
+                };
+                complete_changesets
+                    .add(ctx.clone(), completion_record)
+                    .map(|_| (bonsai_cs, hg_cs))
+                    .context("While inserting into changeset table")
+                    .traced_with_id(
+                        &ctx.trace(),
+                        "uploading final changeset",
+                        trace_args!(),
+                        event_id,
+                    )
+            })
+            .with_context(move |_| {
+                format!(
+                    "While creating Changeset {:?}, uuid: {}",
+                    expected_nodeid, uuid
+                )
+            })
+            .map_err(Error::from)
+            .timed({
+                move |stats, result| {
+                    if result.is_ok() {
+                        scuba_logger
+                            .add_future_stats(&stats)
+                            .log_with_msg("CreateChangeset Finished", None);
+                    }
+                    Ok(())
+                }
+            });
+        ChangesetHandle::new_pending(
+            can_be_parent.shared(),
+            spawn_future(changeset_complete_fut)
+                .map_err(|e| Error::from(e).compat())
+                .boxify()
+                .shared(),
+        )