copytrace: make read_renamed_metadata API async

Summary:
Our current practice is to use `block_on` only in the Python bindings, so this diff changes `read_rename_metadata` to an async function and removes the `block_on` calls from its implementations, making it consistent with the other APIs.

In the long term, we'd like to remove those async APIs, but we will use a library to help us migrate them automatically; consistent APIs will make the migration process easier.

Reviewed By: quark-zju

Differential Revision: D44199939

fbshipit-source-id: a75d1356e373f3d883da4ab972520a665587eb23
This commit is contained in:
Zhaolong Zhu 2023-03-28 13:28:54 -07:00 committed by Facebook GitHub Bot
parent a6daf1870c
commit dd514e6692
10 changed files with 86 additions and 68 deletions

View File

@ -102,7 +102,7 @@ py_class!(pub class dagcopytrace |py| {
) -> PyResult<HashMap<String, String>> {
let old_tree = old_tree.get_underlying(py);
let new_tree = new_tree.get_underlying(py);
let map = self.inner(py).find_renames(&old_tree.read(), &new_tree.read()).map_pyerr(py)?;
let map = block_on(self.inner(py).find_renames(&old_tree.read(), &new_tree.read())).map_pyerr(py)?;
let map = map
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))

View File

@ -63,10 +63,10 @@ impl ReadFileContents for PythonFileScmStore {
futures::stream::iter(contents.into_iter()).boxed()
}
fn read_rename_metadata(
async fn read_rename_metadata(
&self,
_keys: Vec<Key>,
) -> Result<Vec<(Key, Option<Key>)>, Self::Error> {
Ok(vec![])
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
futures::stream::empty().boxed()
}
}

View File

@ -1050,11 +1050,11 @@ mod test {
.boxed()
}
fn read_rename_metadata(
async fn read_rename_metadata(
&self,
_keys: Vec<Key>,
) -> Result<Vec<(Key, Option<Key>)>, Self::Error> {
Ok(vec![])
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
stream::empty().boxed()
}
}

View File

@ -49,7 +49,7 @@ pub trait CopyTrace {
/// TODO: move this method into a separate trait. Practically the graph log and
/// the find_renames can use different impls independently and form different
/// combinations.
fn find_renames(
async fn find_renames(
&self,
old_tree: &TreeManifest,
new_tree: &TreeManifest,

View File

@ -17,6 +17,7 @@ use manifest_tree::TreeManifest;
use manifest_tree::TreeStore;
use pathhistory::RenameTracer;
use pathmatcher::AlwaysMatcher;
use storemodel::futures::StreamExt;
use storemodel::ReadFileContents;
use storemodel::ReadRootTreeIds;
use types::HgId;
@ -58,14 +59,21 @@ impl DagCopyTrace {
Ok(dag_copy_trace)
}
fn read_renamed_metadata(&self, keys: Vec<Key>) -> Result<HashMap<RepoPathBuf, RepoPathBuf>> {
async fn read_renamed_metadata(
&self,
keys: Vec<Key>,
) -> Result<HashMap<RepoPathBuf, RepoPathBuf>> {
// TODO: add metrics for the size of the result
let renames = self.file_reader.read_rename_metadata(keys)?;
let map: HashMap<_, _> = renames
.into_iter()
.filter(|(_, v)| v.is_some())
.map(|(key, rename_from_key)| (key.path, rename_from_key.unwrap().path))
.collect();
let mut renames = self.file_reader.read_rename_metadata(keys).await;
let mut map: HashMap<RepoPathBuf, RepoPathBuf> = HashMap::new();
while let Some(rename) = renames.next().await {
let (key, rename_from_key) = rename?;
if let Some(rename_from_key) = rename_from_key {
map.insert(key.path, rename_from_key.path);
}
}
Ok(map)
}
@ -130,7 +138,7 @@ impl DagCopyTrace {
// For simplicity, we only check p1.
let p1 = &parents[0];
let (old_manifest, new_manifest) = self.vertex_to_tree_manifest(p1, &commit).await?;
let renames = self.find_renames(&old_manifest, &new_manifest)?;
let renames = self.find_renames(&old_manifest, &new_manifest).await?;
let (renames, next_commit) = match direction {
SearchDirection::Backward => (renames, p1.clone()),
SearchDirection::Forward => {
@ -253,7 +261,7 @@ impl CopyTrace for DagCopyTrace {
}
}
fn find_renames(
async fn find_renames(
&self,
old_tree: &TreeManifest,
new_tree: &TreeManifest,
@ -262,23 +270,29 @@ impl CopyTrace for DagCopyTrace {
// * [x] parse file header and get mv info
// * support content similarity for sl repo
// * support content similarity for git repo
let matcher = AlwaysMatcher::new();
let diff = Diff::new(old_tree, new_tree, &matcher)?;
let mut new_files = Vec::new();
for entry in diff {
let entry = entry?;
if let DiffType::RightOnly(file_metadata) = entry.diff_type {
let path = entry.path;
let key = Key {
path,
hgid: file_metadata.hgid,
};
new_files.push(key);
{
// This block exists so that `matcher` and `diff` are dropped at the end of it;
// otherwise the compiler complains that the variables might be used across an 'await'.
let matcher = AlwaysMatcher::new();
let diff = Diff::new(old_tree, new_tree, &matcher)?;
for entry in diff {
let entry = entry?;
if let DiffType::RightOnly(file_metadata) = entry.diff_type {
let path = entry.path;
let key = Key {
path,
hgid: file_metadata.hgid,
};
new_files.push(key);
}
}
}
self.read_renamed_metadata(new_files)
self.read_renamed_metadata(new_files).await
}
}

View File

@ -45,17 +45,19 @@ impl ReadFileContents for EagerRepoStore {
futures::stream::iter(iter).boxed()
}
fn read_rename_metadata(&self, keys: Vec<Key>) -> Result<Vec<(Key, Option<Key>)>, Self::Error> {
keys.into_iter()
.map(|k| {
let id = k.hgid;
let copy_from = match self.get_content(id)? {
Some(data) => strip_metadata(&data)?.1,
None => anyhow::bail!("no such file: {:?}", &k),
};
Ok((k, copy_from))
})
.collect()
async fn read_rename_metadata(
&self,
keys: Vec<Key>,
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
let iter = keys.into_iter().map(|k| {
let id = k.hgid;
let copy_from = match self.get_content(id)? {
Some(data) => strip_metadata(&data)?.1,
None => anyhow::bail!("no such file: {:?}", &k),
};
Ok((k, copy_from))
});
futures::stream::iter(iter).boxed()
}
}

View File

@ -37,11 +37,11 @@ impl ReadFileContents for GitStore {
futures::stream::iter(iter).boxed()
}
fn read_rename_metadata(
async fn read_rename_metadata(
&self,
_keys: Vec<Key>,
) -> Result<Vec<(Key, Option<Key>)>, Self::Error> {
Ok(vec![])
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
futures::stream::empty().boxed()
}
}

View File

@ -12,7 +12,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use anyhow::format_err;
use anyhow::Result;
use async_runtime::block_on;
use async_trait::async_trait;
use futures::stream;
use futures::stream::BoxStream;
@ -54,16 +53,16 @@ where
.boxed()
}
fn read_rename_metadata(&self, keys: Vec<Key>) -> Result<Vec<(Key, Option<Key>)>> {
let items = block_on(
stream_data_from_remote_data_store(self.0.clone(), keys)
.map(|result| match result {
Ok((_data, key, copy_from)) => Ok((key, copy_from)),
Err(err) => Err(err),
})
.collect::<Vec<_>>(),
);
items.into_iter().collect()
async fn read_rename_metadata(
&self,
keys: Vec<Key>,
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
stream_data_from_remote_data_store(self.0.clone(), keys)
.map(|result| match result {
Ok((_data, key, copy_from)) => Ok((key, copy_from)),
Err(err) => Err(err),
})
.boxed()
}
}
@ -80,16 +79,16 @@ impl ReadFileContents for ArcFileStore {
.boxed()
}
fn read_rename_metadata(&self, keys: Vec<Key>) -> Result<Vec<(Key, Option<Key>)>> {
let items = block_on(
stream_data_from_scmstore(self.0.clone(), keys)
.map(|result| match result {
Ok((_data, key, copy_from)) => Ok((key, copy_from)),
Err(err) => Err(err),
})
.collect::<Vec<_>>(),
);
items.into_iter().collect()
async fn read_rename_metadata(
&self,
keys: Vec<Key>,
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
stream_data_from_scmstore(self.0.clone(), keys)
.map(|result| match result {
Ok((_data, key, copy_from)) => Ok((key, copy_from)),
Err(err) => Err(err),
})
.boxed()
}
}

View File

@ -50,7 +50,10 @@ pub trait ReadFileContents {
/// Read rename metadata of specified files.
///
/// The result is a stream of (key, Option<rename_from_key>) pairs; a failure is reported as an `Err` item.
fn read_rename_metadata(&self, keys: Vec<Key>) -> Result<Vec<(Key, Option<Key>)>, Self::Error>;
async fn read_rename_metadata(
&self,
keys: Vec<Key>,
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>>;
}
pub trait RefreshableReadFileContents: ReadFileContents {

View File

@ -537,11 +537,11 @@ inc
.boxed()
}
fn read_rename_metadata(
async fn read_rename_metadata(
&self,
_keys: Vec<Key>,
) -> Result<Vec<(Key, Option<Key>)>, Self::Error> {
Ok(vec![])
) -> BoxStream<Result<(Key, Option<Key>), Self::Error>> {
stream::empty().boxed()
}
}
}