dag: make DagPersistent and DagAddHeads async

Summary: This makes it easier to make DagAlgorithm async.

Reviewed By: sfilipco

Differential Revision: D25345234

fbshipit-source-id: 5ca4bac38f5aac4c6611146a87f423a244f1f5a2
This commit is contained in:
Jun Wu 2020-12-10 12:32:23 -08:00 committed by Facebook GitHub Bot
parent 6d9e8eb249
commit a03e8f4c55
17 changed files with 69 additions and 34 deletions

View File

@ -200,7 +200,7 @@ py_class!(pub class commits |py| {
def migraterevlogtosegments(revlogdir: &PyPath, segmentsdir: &PyPath, commitsdir: &PyPath, master: Names) -> PyResult<PyNone> {
let revlog = RevlogCommits::new(revlogdir.as_path()).map_pyerr(py)?;
let mut segments = HgCommits::new(segmentsdir.as_path(), commitsdir.as_path()).map_pyerr(py)?;
py.allow_threads(|| segments.import_dag(revlog, master.0)).map_pyerr(py)?;
py.allow_threads(|| block_on(segments.import_dag(revlog, master.0))).map_pyerr(py)?;
Ok(PyNone)
}

View File

@ -6,5 +6,6 @@ edition = "2018"
[dependencies]
dag = { path = ".." }
drawdag = { path = "../../drawdag" }
nonblocking = { path = "../../nonblocking" }
tempfile = "3"
vlqencoding = { path = "../../vlqencoding" }

View File

@ -11,6 +11,7 @@ use dag::ops::DagAlgorithm;
use dag::ops::IdConvert;
use dag::OnDiskIdDag;
use dag::{ops::DagPersistent, spanset::SpanSet, Id, NameDag, VertexName};
use nonblocking::non_blocking_result;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Range;
@ -51,7 +52,7 @@ impl TestContext {
}
}
impl<T: AsRef<[usize]>> GeneralTestContext<T> {
impl<T: AsRef<[usize]> + Send + Sync> GeneralTestContext<T> {
pub fn from_parents(parents: Vec<T>) -> Self {
// Prepare NameDag
let parents_by_name = |name: VertexName| -> dag::Result<Vec<VertexName>> {
@ -84,7 +85,7 @@ impl<T: AsRef<[usize]>> GeneralTestContext<T> {
let dir = tempfile::tempdir().unwrap();
let mut dag = NameDag::open(dir.path()).unwrap();
dag.add_heads_and_flush(&parents_by_name, &master_names, &head_names)
non_blocking_result(dag.add_heads_and_flush(&parents_by_name, &master_names, &head_names))
.unwrap();
// Prepare idmap

View File

@ -7,4 +7,6 @@ edition = "2018"
anyhow = "1"
dag = { path = ".." }
git2 = "0.13"
nonblocking = { path = "../../nonblocking" }
parking_lot = "0.11"
tracing = "0.1"

View File

@ -11,6 +11,7 @@ use dag::ops::DagPersistent;
use dag::Dag;
use dag::Set;
use dag::Vertex;
use nonblocking::non_blocking_result;
use std::collections::BTreeMap;
use std::ops::Deref;
use std::path::Path;
@ -107,11 +108,19 @@ fn sync_from_git(
}
}
let parent_func = |v: Vertex| -> dag::Result<Vec<Vertex>> {
struct ForceSend<T>(T);
// See https://github.com/rust-lang/git2-rs/issues/194, libgit2 can be
// accessed by a different thread.
unsafe impl<T> Send for ForceSend<T> {}
let git_repo = ForceSend(git_repo);
let parent_func = move |v: Vertex| -> dag::Result<Vec<Vertex>> {
tracing::trace!("visiting git commit {:?}", &v);
let oid = git2::Oid::from_bytes(v.as_ref())
.with_context(|| format!("converting to git oid for {:?}", &v))?;
let commit = git_repo
.0
.find_commit(oid)
.with_context(|| format!("resolving {:?} to git commit", &v))?;
Ok(commit
@ -119,7 +128,7 @@ fn sync_from_git(
.map(|id| Vertex::copy_from(id.as_bytes()))
.collect())
};
dag.add_heads_and_flush(parent_func, &master_heads, &non_master_heads)?;
non_blocking_result(dag.add_heads_and_flush(parent_func, &master_heads, &non_master_heads))?;
let possible_heads =
Set::from_static_names(master_heads.into_iter().chain(non_master_heads.into_iter()));

View File

@ -150,7 +150,7 @@ pub(crate) fn beautify(
let mut dag = MemNameDag::new();
let get_parents = |v| this.parent_names(v);
dag.add_heads(get_parents, &heads)?;
nonblocking::non_blocking(dag.add_heads(get_parents, &heads))??;
Ok(dag)
}

View File

@ -58,3 +58,5 @@ pub mod tests;
pub use errors::DagError as Error;
pub type Result<T> = std::result::Result<T, Error>;
pub use nonblocking;

View File

@ -80,6 +80,7 @@ where
state: S,
}
#[async_trait::async_trait]
impl<IS, M, P, S> DagPersistent for AbstractNameDag<IdDag<IS>, M, P, S>
where
IS: IdDagStore + Persist,
@ -92,7 +93,7 @@ where
///
/// This is similar to calling `add_heads` followed by `flush`,
/// but is faster.
fn add_heads_and_flush<F>(
async fn add_heads_and_flush<F>(
&mut self,
parent_names_func: F,
master_names: &[VertexName],
@ -100,6 +101,7 @@ where
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send,
{
if !self.pending_heads.is_empty() {
return programming(format!(
@ -146,7 +148,7 @@ where
/// Write in-memory DAG to disk. This will also pick up changes to
/// the DAG by other processes.
fn flush(&mut self, master_heads: &[VertexName]) -> Result<()> {
async fn flush(&mut self, master_heads: &[VertexName]) -> Result<()> {
// Sanity check.
for head in master_heads.iter() {
if !self.map.contains_vertex_name(head)? {
@ -168,12 +170,15 @@ where
let mut new_name_dag = self.path.open()?;
let seg_size = self.dag.get_new_segment_size();
new_name_dag.dag.set_new_segment_size(seg_size);
new_name_dag.add_heads_and_flush(&parents, master_heads, non_master_heads)?;
new_name_dag
.add_heads_and_flush(&parents, master_heads, non_master_heads)
.await?;
*self = new_name_dag;
Ok(())
}
}
#[async_trait::async_trait]
impl<IS, M, P, S> DagAddHeads for AbstractNameDag<IdDag<IS>, M, P, S>
where
IS: IdDagStore,
@ -190,8 +195,9 @@ where
/// The added vertexes are immediately query-able. They will get Ids
/// assigned to the NON_MASTER group internally. The `flush` function
/// can re-assign Ids to the MASTER group.
fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
async fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
where
F: Send,
F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
// Assign to the NON_MASTER group unconditionally so we can avoid the

View File

@ -162,11 +162,13 @@ pub trait DagAlgorithm: Send + Sync {
}
/// Add vertexes recursively to the DAG.
#[async_trait::async_trait]
pub trait DagAddHeads {
/// Add vertexes and their ancestors to the DAG. This does not persist
/// changes to disk.
fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
async fn add_heads<F>(&mut self, parents: F, heads: &[VertexName]) -> Result<()>
where
F: Send,
F: Fn(VertexName) -> Result<Vec<VertexName>>;
}
@ -177,23 +179,29 @@ pub trait DagImportCloneData {
}
/// Persist the DAG on disk.
#[async_trait::async_trait]
pub trait DagPersistent {
/// Write in-memory DAG to disk. This might also pick up changes to
/// the DAG by other processes.
fn flush(&mut self, master_heads: &[VertexName]) -> Result<()>;
async fn flush(&mut self, master_heads: &[VertexName]) -> Result<()>;
/// A faster path for add_heads, followed by flush.
fn add_heads_and_flush<F>(
async fn add_heads_and_flush<F>(
&mut self,
parent_names_func: F,
master_names: &[VertexName],
non_master_names: &[VertexName],
) -> Result<()>
where
F: Fn(VertexName) -> Result<Vec<VertexName>>;
F: Fn(VertexName) -> Result<Vec<VertexName>>,
F: Send;
/// Import from another (potentially large) DAG. Write to disk immediately.
fn import_and_flush(&mut self, dag: &dyn DagAlgorithm, master_heads: NameSet) -> Result<()> {
async fn import_and_flush(
&mut self,
dag: &dyn DagAlgorithm,
master_heads: NameSet,
) -> Result<()> {
let heads = dag.heads(dag.all()?)?;
let non_master_heads = heads - master_heads.clone();
let master_heads: Vec<VertexName> = master_heads.iter()?.collect::<Result<Vec<_>>>()?;
@ -201,6 +209,7 @@ pub trait DagPersistent {
non_master_heads.iter()?.collect::<Result<Vec<_>>>()?;
let parent_func = |v| dag.parent_names(v);
self.add_heads_and_flush(parent_func, &master_heads, &non_master_heads)
.await
}
}
@ -274,7 +283,7 @@ where
.map(|p| VertexName::copy_from(p.as_bytes()))
.collect())
};
self.add_heads(&parents_func, &heads[..])?;
nonblocking::non_blocking_result(self.add_heads(&parents_func, &heads[..]))?;
Ok(())
}
}

View File

@ -17,6 +17,7 @@ use crate::NameDag;
use crate::NameSet;
use crate::Result;
use crate::SpanSet;
use nonblocking::non_blocking_result;
use tempfile::tempdir;
use test_dag::TestDag;
@ -195,7 +196,7 @@ fn test_generic_dag_import(dag: impl DagAlgorithm + DagAddHeads) -> Result<()> {
let dir = tempdir().unwrap();
let mut dag2 = NameDag::open(&dir.path())?;
dag2.import_and_flush(&dag1, nameset("J"))?;
non_blocking_result(dag2.import_and_flush(&dag1, nameset("J")))?;
assert_eq!(
render(&dag2),
r#"
@ -733,13 +734,13 @@ fn test_namedag_reassign_master() -> crate::Result<()> {
assert_eq!(format!("{:?}", dag.parent_names("C".into())?), "[B]");
// First flush, A, B, C are non-master.
dag.flush(&[]).unwrap();
non_blocking_result(dag.flush(&[])).unwrap();
assert_eq!(format!("{:?}", dag.vertex_id("A".into())?), "N0");
assert_eq!(format!("{:?}", dag.vertex_id("C".into())?), "N2");
// Second flush, making B master without adding new vertexes.
dag.flush(&["B".into()]).unwrap();
non_blocking_result(dag.flush(&["B".into()])).unwrap();
assert_eq!(format!("{:?}", dag.vertex_id("A".into())?), "0");
assert_eq!(format!("{:?}", dag.vertex_id("B".into())?), "1");
assert_eq!(format!("{:?}", dag.vertex_id("C".into())?), "N0");

View File

@ -14,6 +14,7 @@ use crate::render::render_namedag;
use crate::NameDag;
use crate::Result;
use crate::Vertex;
use nonblocking::non_blocking_result;
use std::collections::HashSet;
/// Dag structure for testing purpose.
@ -61,13 +62,13 @@ impl TestDag {
None => all_heads,
};
self.dag.dag.set_new_segment_size(self.seg_size);
self.dag.add_heads(&parent_func, &heads).unwrap();
non_blocking_result(self.dag.add_heads(&parent_func, &heads)).unwrap();
self.validate();
let master_heads = master_heads
.iter()
.map(|s| Vertex::copy_from(s.as_bytes()))
.collect::<Vec<_>>();
self.dag.flush(&master_heads).unwrap();
non_blocking_result(self.dag.flush(&master_heads)).unwrap();
self.validate();
}

View File

@ -11,6 +11,7 @@ python3 = ["bindings/python3", "pytracing/python3", "python3-sys", "cpython-ext/
[dependencies]
anyhow = "1.0.20"
async-runtime = { path = "../async-runtime" }
bindings = { path = "../../edenscmnative/bindings", default-features = false }
blackbox = { path = "../blackbox" }
clidispatch = { path = "../clidispatch" }

View File

@ -10,6 +10,7 @@ use super::Result;
use super::IO;
use anyhow::format_err;
use anyhow::Context;
use async_runtime::block_on_exclusive as block_on;
use clidispatch::errors;
use cliparser::define_flags;
use dag::namedag::IndexedLogNameDagPath;
@ -75,8 +76,7 @@ pub fn run(opts: StatusOpts, _io: &mut IO, config: ConfigSet) -> Result<u8> {
.import_clone_data(vertex_clone_data)
.context("error importing segmented changelog")?;
namedag
.flush(&[master.clone()])
block_on(namedag.flush(&[master.clone()]))
.context("error writing segmented changelog to disk")?;
fs::write(

View File

@ -56,8 +56,8 @@ impl HgCommits {
/// Import another DAG. `main` specifies the main branch for commit graph
/// optimization.
pub fn import_dag(&mut self, other: impl DagAlgorithm, main: Set) -> Result<()> {
self.dag.import_and_flush(&other, main)?;
pub async fn import_dag(&mut self, other: impl DagAlgorithm, main: Set) -> Result<()> {
self.dag.import_and_flush(&other, main).await?;
Ok(())
}
@ -92,15 +92,13 @@ impl AppendCommits for HgCommits {
}
// Write commit data to zstore.
let mut zstore = self.commits.write();
for commit in commits {
let text = text_with_header(&commit.raw_text, &commit.parents)?;
let vertex = Vertex::copy_from(zstore.insert(&text, &[])?.as_ref());
let vertex = Vertex::copy_from(self.commits.write().insert(&text, &[])?.as_ref());
if vertex != commit.vertex {
return Err(crate::Error::HashMismatch(vertex, commit.vertex.clone()));
}
}
drop(zstore);
// Write commit graph to DAG.
let commits_map: HashMap<Vertex, HgCommit> = commits
@ -108,7 +106,7 @@ impl AppendCommits for HgCommits {
.cloned()
.map(|c| (c.vertex.clone(), c))
.collect();
let parent_func = |v: Vertex| -> dag::Result<Vec<Vertex>> {
let parent_func = move |v: Vertex| -> dag::Result<Vec<Vertex>> {
match commits_map.get(&v) {
Some(commit) => Ok(commit.parents.clone()),
None => v.not_found(),
@ -128,14 +126,14 @@ impl AppendCommits for HgCommits {
.cloned()
.collect()
};
self.dag.add_heads(parent_func, &heads)?;
self.dag.add_heads(parent_func, &heads).await?;
Ok(())
}
async fn flush(&mut self, master_heads: &[Vertex]) -> Result<()> {
self.flush_commit_data().await?;
self.dag.flush(master_heads)?;
self.dag.flush(master_heads).await?;
Ok(())
}

View File

@ -72,7 +72,7 @@ impl AppendCommits for MemHgCommits {
}
heads.into_iter().collect()
};
self.dag.add_heads(parent_func, &heads)?;
self.dag.add_heads(parent_func, &heads).await?;
Ok(())
}

View File

@ -33,6 +33,7 @@ use anyhow::Result;
use bitflags::bitflags;
use dag::namedag::MemNameDag;
use dag::nameset::meta::MetaSet;
use dag::nonblocking::non_blocking_result;
use dag::ops::DagAddHeads;
use dag::DagAlgorithm;
use dag::Set;
@ -412,7 +413,8 @@ impl MutationStore {
// Construct the graph.
let mut dag = MemNameDag::new();
dag.add_heads(parent_func, &heads)?;
// Inserting to a memory DAG from a fully known parent function is non-blocking.
non_blocking_result(dag.add_heads(parent_func, &heads))?;
Ok(dag)
}
}

View File

@ -1668,10 +1668,12 @@ impl RevlogIndex {
}
}
#[async_trait::async_trait]
impl DagAddHeads for RevlogIndex {
fn add_heads<F>(&mut self, parents_func: F, heads: &[Vertex]) -> dag::Result<()>
async fn add_heads<F>(&mut self, parents_func: F, heads: &[Vertex]) -> dag::Result<()>
where
F: Fn(Vertex) -> dag::Result<Vec<Vertex>>,
F: Send,
{
self.add_heads_for_testing(&parents_func, heads)
}