mononoke: deduplicate known* methods code

Summary:
the code is almost the same, so it would be good to deduplicate it. The
duplication let to the annoying differences in logging - i.e. we logged how
many nodes were sent to us in `known` call but not in `knownnodes` call.

Reviewed By: farnz

Differential Revision: D27650583

fbshipit-source-id: 5e2e3be3b9fd66631364d23f34d241c27e370340
This commit is contained in:
Stanislau Hlebik 2021-04-08 09:36:49 -07:00 committed by Facebook GitHub Bot
parent 7358178f4a
commit 9b02233ed2

View File

@ -991,6 +991,57 @@ impl RepoClient {
Ok(None)
}
}
fn known_impl<Func, Fut>(
&self,
nodes: Vec<HgChangesetId>,
command: &'static str,
filter: Func,
) -> HgCommandRes<Vec<bool>>
where
Func: FnOnce(CoreContext, Vec<HgChangesetId>, Vec<(HgChangesetId, ChangesetId)>) -> Fut
+ Send
+ 'static,
Fut: future::Future<Output = Result<Vec<bool>, Error>> + Send + 'static,
{
self.command_future(command, UNSAMPLED, |ctx, mut command_logger| {
let blobrepo = self.repo.blobrepo().clone();
let nodes_len = nodes.len();
let args = json!({
"nodes_count": nodes_len,
});
command_logger.add_trimmed_scuba_extra("command_args", &args);
{
cloned!(ctx);
async move {
let hg_bcs_mapping = blobrepo
.get_hg_bonsai_mapping(ctx.clone(), nodes.clone())
.await?;
filter(ctx, nodes, hg_bcs_mapping).await
}
}
.timeout(default_timeout())
.flatten_err()
.boxed()
.compat()
.timed(move |stats, known_nodes| {
if let Ok(known) = known_nodes {
ctx.perf_counters()
.add_to_counter(PerfCounterType::NumKnown, known.len() as i64);
ctx.perf_counters().add_to_counter(
PerfCounterType::NumUnknown,
(nodes_len - known.len()) as i64,
);
}
command_logger.without_wireproto().finalize_command(&stats);
Ok(())
})
})
}
}
fn throttle_stream<F, S, V>(
@ -1385,109 +1436,53 @@ impl HgCommands for RepoClient {
// @wireprotocommand('known', 'nodes *'), but the '*' is ignored
fn known(&self, nodes: Vec<HgChangesetId>) -> HgCommandRes<Vec<bool>> {
self.command_future(ops::KNOWN, UNSAMPLED, |ctx, mut command_logger| {
let blobrepo = self.repo.blobrepo().clone();
let phases_hint = self.repo.blobrepo().get_phases().clone();
self.known_impl(
nodes,
ops::KNOWN,
move |ctx, nodes, hg_bcs_mapping| async move {
let mut bcs_ids = vec![];
let mut bcs_hg_mapping = hashmap! {};
let nodes_len = nodes.len();
let args = json!({
"nodes_count": nodes_len,
});
command_logger.add_trimmed_scuba_extra("command_args", &args);
let phases_hint = blobrepo.get_phases().clone();
{
cloned!(ctx);
async move {
let hg_bcs_mapping = blobrepo
.get_hg_bonsai_mapping(ctx.clone(), nodes.clone())
.await?;
let mut bcs_ids = vec![];
let mut bcs_hg_mapping = hashmap! {};
for (hg, bcs) in hg_bcs_mapping {
bcs_ids.push(bcs);
bcs_hg_mapping.insert(bcs, hg);
}
let found_hg_changesets = phases_hint
.get_public(ctx, bcs_ids, false)
.map_ok(move |public_csids| {
public_csids
.into_iter()
.filter_map(|csid| bcs_hg_mapping.get(&csid).cloned())
.collect::<HashSet<_>>()
})
.await?;
let res = nodes
.into_iter()
.map(move |node| found_hg_changesets.contains(&node))
.collect::<Vec<_>>();
Result::<_, Error>::Ok(res)
for (hg, bcs) in hg_bcs_mapping {
bcs_ids.push(bcs);
bcs_hg_mapping.insert(bcs, hg);
}
}
.timeout(default_timeout())
.flatten_err()
.boxed()
.compat()
.timed(move |stats, known_nodes| {
if let Ok(known) = known_nodes {
ctx.perf_counters()
.add_to_counter(PerfCounterType::NumKnown, known.len() as i64);
ctx.perf_counters().add_to_counter(
PerfCounterType::NumUnknown,
(nodes_len - known.len()) as i64,
);
}
command_logger.without_wireproto().finalize_command(&stats);
Ok(())
})
})
let found_hg_changesets = phases_hint
.get_public(ctx, bcs_ids, false)
.map_ok(move |public_csids| {
public_csids
.into_iter()
.filter_map(|csid| bcs_hg_mapping.get(&csid).cloned())
.collect::<HashSet<_>>()
})
.await?;
let res = nodes
.into_iter()
.map(move |node| found_hg_changesets.contains(&node))
.collect::<Vec<_>>();
Ok(res)
},
)
}
fn knownnodes(&self, nodes: Vec<HgChangesetId>) -> HgCommandRes<Vec<bool>> {
self.command_future(ops::KNOWNNODES, UNSAMPLED, |ctx, command_logger| {
let blobrepo = self.repo.blobrepo().clone();
self.known_impl(
nodes,
ops::KNOWNNODES,
move |_ctx, nodes, hg_bcs_mapping| async move {
let hg_bcs_mapping = hg_bcs_mapping.into_iter().collect::<HashMap<_, _>>();
let res = nodes
.into_iter()
.map(move |node| hg_bcs_mapping.contains_key(&node))
.collect::<Vec<_>>();
let nodes_len = nodes.len();
{
cloned!(ctx);
async move {
let hg_bcs_mapping = blobrepo
.get_hg_bonsai_mapping(ctx, nodes.clone())
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let res = nodes
.into_iter()
.map(move |node| hg_bcs_mapping.contains_key(&node))
.collect::<Vec<_>>();
Result::<_, Error>::Ok(res)
}
}
.timeout(default_timeout())
.flatten_err()
.boxed()
.compat()
.timed(move |stats, known_nodes| {
if let Ok(known) = known_nodes {
ctx.perf_counters()
.add_to_counter(PerfCounterType::NumKnown, known.len() as i64);
ctx.perf_counters().add_to_counter(
PerfCounterType::NumUnknown,
(nodes_len - known.len()) as i64,
);
}
command_logger.without_wireproto().finalize_command(&stats);
Ok(())
})
})
Ok(res)
},
)
}
// @wireprotocommand('getbundle', '*')