dag: make parent function async

Summary: Make the parent function used by various graph building functions async.

Reviewed By: sfilipco

Differential Revision: D25353612

fbshipit-source-id: 31f173dc82f0cce6022cc2caae78369fdc821c8f
This commit is contained in:
Jun Wu 2020-12-10 12:32:23 -08:00 committed by Facebook GitHub Bot
parent bffffb2415
commit dcf4957619
11 changed files with 106 additions and 100 deletions

View File

@ -52,19 +52,22 @@ impl TestContext {
}
}
impl<T: AsRef<[usize]> + Send + Sync> GeneralTestContext<T> {
impl<T: AsRef<[usize]> + Send + Sync + Clone + 'static> GeneralTestContext<T> {
pub fn from_parents(parents: Vec<T>) -> Self {
// Prepare NameDag
let parents_by_name = |name: VertexName| -> dag::Result<Vec<VertexName>> {
let i = String::from_utf8(name.as_ref().to_vec())
.unwrap()
.parse::<usize>()
.unwrap();
Ok(parents[i]
.as_ref()
.iter()
.map(|p| format!("{}", p).as_bytes().to_vec().into())
.collect())
let parents_by_name = {
let parents = parents.clone();
move |name: VertexName| -> dag::Result<Vec<VertexName>> {
let i = String::from_utf8(name.as_ref().to_vec())
.unwrap()
.parse::<usize>()
.unwrap();
Ok(parents[i]
.as_ref()
.iter()
.map(|p| format!("{}", p).as_bytes().to_vec().into())
.collect())
}
};
// Pick heads from 0..n
let get_heads = |n: usize| -> Vec<VertexName> {
@ -85,7 +88,9 @@ impl<T: AsRef<[usize]> + Send + Sync> GeneralTestContext<T> {
let dir = tempfile::tempdir().unwrap();
let mut dag = NameDag::open(dir.path()).unwrap();
non_blocking_result(dag.add_heads_and_flush(&parents_by_name, &master_names, &head_names))
let parents_map: Box<dyn Fn(VertexName) -> dag::Result<Vec<VertexName>> + Send + Sync> =
Box::new(parents_by_name);
non_blocking_result(dag.add_heads_and_flush(&parents_map, &master_names, &head_names))
.unwrap();
// Prepare idmap

View File

@ -132,7 +132,9 @@ fn sync_from_git(
.map(|id| Vertex::copy_from(id.as_bytes()))
.collect())
};
non_blocking_result(dag.add_heads_and_flush(parent_func, &master_heads, &non_master_heads))?;
let parents: Box<dyn Fn(Vertex) -> dag::Result<Vec<Vertex>> + Send + Sync> =
Box::new(parent_func);
non_blocking_result(dag.add_heads_and_flush(&parents, &master_heads, &non_master_heads))?;
let possible_heads =
Set::from_static_names(master_heads.into_iter().chain(non_master_heads.into_iter()));

View File

@ -16,7 +16,6 @@ use futures::future::BoxFuture;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use nonblocking::non_blocking_result;
use std::collections::HashMap;
use std::collections::HashSet;
use std::future::Future;
@ -174,8 +173,7 @@ pub(crate) async fn beautify(
sort(&get_ancestors, &mut heads[..], main_branch).await?;
let mut dag = MemNameDag::new();
let get_parents = |v| non_blocking_result(this.parent_names(v));
dag.add_heads(get_parents, &heads).await?;
dag.add_heads(&this.dag_snapshot()?, &heads).await?;
Ok(dag)
}

View File

@ -12,6 +12,7 @@
use crate::id::{Group, Id, VertexName};
use crate::locked::Locked;
use crate::ops::IdConvert;
use crate::ops::Parents;
use crate::segment::PreparedFlatSegments;
use crate::Result;
@ -41,16 +42,12 @@ pub trait IdMapAssignHead: IdConvert + IdMapWrite {
/// New `id`s inserted by this function will have the specified `group`.
/// Existing `id`s that are ancestors of `head` will get re-assigned
/// if they have a higher `group`.
async fn assign_head<F>(
async fn assign_head(
&mut self,
head: VertexName,
parents_by_name: F,
parents_by_name: &dyn Parents,
group: Group,
) -> Result<PreparedFlatSegments>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send + Sync,
{
) -> Result<PreparedFlatSegments> {
// There are some interesting cases to optimize the numbers:
//
// C For a merge C, it has choice to assign numbers to A or B
@ -112,7 +109,7 @@ pub trait IdMapAssignHead: IdConvert + IdMapWrite {
// (re-)assign it to this group.
match self.vertex_id_with_max_group(&head, group).await? {
None => {
let parents = parents_by_name(head.clone())?;
let parents = parents_by_name.parent_names(head.clone()).await?;
todo_stack.push(Todo::Assign(head, parents.len()));
// If the parent was not assigned, or was assigned to a higher group,
// (re-)assign the parent to this group.

View File

@ -11,7 +11,6 @@
use crate::clone::CloneData;
use crate::delegate;
use crate::errors::bug;
use crate::errors::programming;
use crate::id::Group;
use crate::id::Id;
@ -31,6 +30,7 @@ use crate::ops::DagPersistent;
use crate::ops::IdConvert;
use crate::ops::IdMapSnapshot;
use crate::ops::Open;
use crate::ops::Parents;
use crate::ops::Persist;
use crate::ops::PrefixLookup;
use crate::ops::ToIdSet;
@ -93,16 +93,12 @@ where
///
/// This is similar to calling `add_heads` followed by `flush`.
/// But is faster.
async fn add_heads_and_flush<F>(
async fn add_heads_and_flush(
&mut self,
parent_names_func: F,
parent_names_func: &dyn Parents,
master_names: &[VertexName],
non_master_names: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send + Sync,
{
) -> Result<()> {
if !self.pending_heads.is_empty() {
return programming(format!(
"ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
@ -152,17 +148,14 @@ where
// PERF: There could be a fast path that does not re-assign numbers.
// But in practice we might always want to re-assign master commits.
let snapshot = self.try_snapshot()?;
let parents = {
let snapshot = snapshot.clone();
move |name| non_blocking_result(snapshot.parent_names(name))
};
let dag_snapshot = self.dag_snapshot()?;
let non_master_heads = &snapshot.pending_heads;
let mut new_name_dag: Self = self.path.open()?;
let seg_size = self.dag.get_new_segment_size();
new_name_dag.dag.set_new_segment_size(seg_size);
new_name_dag
.add_heads_and_flush(&parents, master_heads, non_master_heads)
.add_heads_and_flush(&dag_snapshot, master_heads, non_master_heads)
.await?;
*self = new_name_dag;
Ok(())
@ -186,11 +179,7 @@ where
/// The added vertexes are immediately query-able. They will get Ids
/// assigned to the NON_MASTER group internally. The `flush` function
/// can re-assign Ids to the MASTER group.
async fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
where
F: Send + Sync,
F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
async fn add_heads(&mut self, parents: &dyn Parents, heads: &[VertexName]) -> Result<()> {
// Assign to the NON_MASTER group unconditionally so we can avoid the
// complexity re-assigning non-master ids.
//
@ -212,7 +201,7 @@ where
let mut outcome = PreparedFlatSegments::default();
for head in heads.iter() {
if !self.map.contains_vertex_name(head).await? {
outcome.merge(self.map.assign_head(head.clone(), &parents, group).await?);
outcome.merge(self.map.assign_head(head.clone(), parents, group).await?);
self.pending_heads.push(head.clone());
}
}
@ -611,14 +600,7 @@ where
map.remove_non_master()?;
// Rebuild them.
let parent_func = |name: VertexName| match parents.get(&name) {
Some(names) => Ok(names.iter().cloned().collect()),
None => bug(format!(
"bug: parents of {:?} is missing (in rebuild_non_master)",
name
)),
};
build(map, dag, parent_func, &[], &heads[..]).await?;
build(map, dag, &parents, &[], &heads[..]).await?;
Ok(())
};
@ -626,16 +608,14 @@ where
}
/// Build IdMap and Segments for the given heads.
pub async fn build<F, IS, M>(
pub async fn build<IS, M>(
map: &mut Locked<'_, M>,
dag: &mut Locked<'_, IdDag<IS>>,
parent_names_func: F,
parent_names_func: &dyn Parents,
master_heads: &[VertexName],
non_master_heads: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send + Sync,
IS: IdDagStore + Persist,
M: IdMapAssignHead + Persist,
M: Send,
@ -650,7 +630,7 @@ where
{
for node in nodes.iter() {
outcome.merge(
map.assign_head(node.clone(), &parent_names_func, *group)
map.assign_head(node.clone(), parent_names_func, *group)
.await?,
);
}

View File

@ -20,7 +20,6 @@ use crate::nameset::NameSet;
use crate::nameset::SyncNameSetQuery;
use crate::IdSet;
use crate::Result;
use nonblocking::non_blocking_result;
use std::sync::Arc;
/// DAG related read-only algorithms.
@ -167,15 +166,41 @@ pub trait DagAlgorithm: Send + Sync {
fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>>;
}
#[async_trait::async_trait]
pub trait Parents: Send + Sync {
async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>>;
}
#[async_trait::async_trait]
impl Parents for Arc<dyn DagAlgorithm + Send + Sync> {
async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
DagAlgorithm::parent_names(self, name).await
}
}
#[async_trait::async_trait]
impl<'a> Parents for Box<dyn Fn(VertexName) -> Result<Vec<VertexName>> + Send + Sync + 'a> {
async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
(self)(name)
}
}
#[async_trait::async_trait]
impl Parents for std::collections::HashMap<VertexName, Vec<VertexName>> {
async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
match self.get(&name) {
Some(v) => Ok(v.clone()),
None => name.not_found(),
}
}
}
/// Add vertexes recursively to the DAG.
#[async_trait::async_trait]
pub trait DagAddHeads {
/// Add vertexes and their ancestors to the DAG. This does not persistent
/// changes to disk.
async fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
where
F: Send + Sync,
F: Fn(VertexName) -> Result<Vec<VertexName>>;
async fn add_heads(&mut self, parents: &dyn Parents, heads: &[VertexName]) -> Result<()>;
}
/// Import a generated `CloneData` object into the DAG.
@ -192,15 +217,12 @@ pub trait DagPersistent {
async fn flush(&mut self, master_heads: &[VertexName]) -> Result<()>;
/// A faster path for add_heads, followed by flush.
async fn add_heads_and_flush<F>(
async fn add_heads_and_flush(
&mut self,
parent_names_func: F,
parent_names_func: &dyn Parents,
master_names: &[VertexName],
non_master_names: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send + Sync;
) -> Result<()>;
/// Import from another (potentially large) DAG. Write to disk immediately.
async fn import_and_flush(
@ -213,8 +235,7 @@ pub trait DagPersistent {
let master_heads: Vec<VertexName> = master_heads.iter()?.collect::<Result<Vec<_>>>()?;
let non_master_heads: Vec<VertexName> =
non_master_heads.iter()?.collect::<Result<Vec<_>>>()?;
let parent_func = |v| non_blocking_result(dag.parent_names(v));
self.add_heads_and_flush(parent_func, &master_heads, &non_master_heads)
self.add_heads_and_flush(&dag.dag_snapshot()?, &master_heads, &non_master_heads)
.await
}
}
@ -288,13 +309,12 @@ where
}
};
let parents_func = move |name: VertexName| -> Result<Vec<VertexName>> {
Ok(parents[&String::from_utf8(name.as_ref().to_vec()).unwrap()]
.iter()
.map(|p| VertexName::copy_from(p.as_bytes()))
.collect())
};
nonblocking::non_blocking_result(self.add_heads(&parents_func, &heads[..]))?;
let v = |s: String| VertexName::copy_from(s.as_bytes());
let parents: std::collections::HashMap<VertexName, Vec<VertexName>> = parents
.into_iter()
.map(|(k, vs)| (v(k), vs.into_iter().map(v).collect()))
.collect();
nonblocking::non_blocking_result(self.add_heads(&parents, &heads[..]))?;
Ok(())
}
}

View File

@ -12,9 +12,9 @@ use crate::ops::DagPersistent;
use crate::ops::IdConvert;
use crate::render::render_namedag;
use crate::NameDag;
use crate::Result;
use crate::Vertex;
use nonblocking::non_blocking_result;
use std::collections::HashMap;
use std::collections::HashSet;
/// Dag structure for testing purpose.
@ -107,7 +107,7 @@ impl TestDag {
fn get_heads_and_parents_func_from_ascii(
text: &str,
) -> (Vec<Vertex>, impl Fn(Vertex) -> Result<Vec<Vertex>>) {
) -> (Vec<Vertex>, HashMap<Vertex, Vec<Vertex>>) {
let parents = drawdag::parse(&text);
let mut heads = parents
.keys()
@ -116,11 +116,10 @@ fn get_heads_and_parents_func_from_ascii(
.map(|&v| Vertex::copy_from(v.as_bytes()))
.collect::<Vec<_>>();
heads.sort();
let func = move |name: Vertex| -> Result<Vec<Vertex>> {
Ok(parents[&String::from_utf8(name.as_ref().to_vec()).unwrap()]
.iter()
.map(|p| Vertex::copy_from(p.as_bytes()))
.collect())
};
(heads, func)
let v = |s: String| Vertex::copy_from(s.as_bytes());
let parents = parents
.into_iter()
.map(|(k, vs)| (v(k), vs.into_iter().map(v).collect()))
.collect();
(heads, parents)
}

View File

@ -126,7 +126,9 @@ impl AppendCommits for HgCommits {
.cloned()
.collect()
};
self.dag.add_heads(parent_func, &heads).await?;
let parent_func: Box<dyn Fn(Vertex) -> dag::Result<Vec<Vertex>> + Send + Sync> =
Box::new(parent_func);
self.dag.add_heads(&parent_func, &heads).await?;
Ok(())
}

View File

@ -72,7 +72,9 @@ impl AppendCommits for MemHgCommits {
}
heads.into_iter().collect()
};
self.dag.add_heads(parent_func, &heads).await?;
let parent_func: Box<dyn Fn(Vertex) -> dag::Result<Vec<Vertex>> + Send + Sync> =
Box::new(parent_func);
self.dag.add_heads(&parent_func, &heads).await?;
Ok(())
}

View File

@ -418,7 +418,7 @@ impl MutationStore {
}
// Construct parent_func.
let parent_func = |node: VertexName| -> dag::Result<Vec<VertexName>> {
let parent_func = move |node: VertexName| -> dag::Result<Vec<VertexName>> {
match parent_map.get(&Node::from_slice(node.as_ref()).unwrap()) {
None => Ok(Vec::new()),
Some(parents) => Ok(parents
@ -440,8 +440,11 @@ impl MutationStore {
// Construct the graph.
let mut dag = MemNameDag::new();
let parents: Box<dyn Fn(VertexName) -> dag::Result<Vec<VertexName>> + Send + Sync> =
Box::new(parent_func);
// Inserting to a memory DAG from a fully known parent function is non-blocking.
non_blocking_result(dag.add_heads(parent_func, &heads))?;
non_blocking_result(dag.add_heads(&parents, &heads))?;
Ok(dag)
}
}

View File

@ -22,6 +22,7 @@ use dag::ops::DagAddHeads;
use dag::ops::DagAlgorithm;
use dag::ops::IdConvert;
use dag::ops::IdMapSnapshot;
use dag::ops::Parents;
use dag::ops::PrefixLookup;
use dag::ops::ToIdSet;
use dag::Group;
@ -1636,10 +1637,11 @@ impl IdMapSnapshot for RevlogIndex {
}
impl RevlogIndex {
fn add_heads_for_testing<F>(&mut self, parents_func: &F, heads: &[Vertex]) -> dag::Result<()>
where
F: Fn(Vertex) -> dag::Result<Vec<Vertex>>,
{
fn add_heads_for_testing(
&mut self,
parents_func: &dyn Parents,
heads: &[Vertex],
) -> dag::Result<()> {
if !cfg!(test) {
panic!(
"add_heads should only works for testing \
@ -1652,7 +1654,7 @@ impl RevlogIndex {
// Update IdMap. Keep track of what heads are added.
for head in heads.iter() {
if !non_blocking_result(self.contains_vertex_name(&head))? {
let parents = parents_func(head.clone())?;
let parents = non_blocking_result(parents_func.parent_names(head.clone()))?;
for parent in parents.iter() {
self.add_heads_for_testing(parents_func, &[parent.clone()])?;
}
@ -1680,12 +1682,8 @@ impl RevlogIndex {
#[async_trait::async_trait]
impl DagAddHeads for RevlogIndex {
async fn add_heads<F>(&mut self, parents_func: F, heads: &[Vertex]) -> dag::Result<()>
where
F: Fn(Vertex) -> dag::Result<Vec<Vertex>>,
F: Send,
{
self.add_heads_for_testing(&parents_func, heads)
async fn add_heads(&mut self, parents_func: &dyn Parents, heads: &[Vertex]) -> dag::Result<()> {
self.add_heads_for_testing(parents_func, heads)
}
}