edenapi: change client file fetching to work with FileResponse

Summary:
Currently fetching files from edenapi has no mechanism to report errors
to the client, so if the server can't find the key or hits an error, the client
just doesn't hear anything back.

As part of improving our network reliability and debugability, let's enable
passing errors back. The commit APIs use a pattern involving a response object
that includes the request input (in files case a Key) and a Result, so let's
follow that same pattern. This will require a new files2 endpoint, which is
introduced in a subsequent diff.

In this diff, we just introduce the FileResponse type and convert the current
FileEntry response from files v1 into the new type so we can make the endpoint
swappable later.

D27549923 (82b689ad9d) has discussion on why to choose this pattern.

Reviewed By: quark-zju

Differential Revision: D33283031

fbshipit-source-id: ec2e34760ee47ead95964e2d33e0be4173bb4e77
This commit is contained in:
Durham Goode 2022-01-19 17:21:28 -08:00 committed by Facebook GitHub Bot
parent 8d5ae8b80f
commit d589c916d5
14 changed files with 156 additions and 47 deletions

View File

@ -44,12 +44,13 @@ Check files in response.
$ hgedenapi debugapi -e filesattrs -f req
[{"key": {"node": bin("17b8d4e3bafd4ec4812ad7c930aace9bf07ab033"),
"path": "copy.txt"},
"content": {"metadata": {"size": None,
"flags": None},
"hg_file_blob": b"\x01\ncopy: test.txt\ncopyrev: 186cafa3319c24956783383dc44c5cbc68c5a0ca\n\x01\ntest content\n"},
"parents": None,
"aux_data": {"sha1": bin("4fe2b8dd12cd9cd6a413ea960cd8c09c25f19527"),
"sha256": bin("a1fff0ffefb9eace7230c24e50731f0a91c62f9cefdfe77121c2f607125dffae"),
"content_id": bin("888dcf533a354c23e4bf67e1ada984d96bb1089b0c3c03f4c2cb773709e7aa42"),
"total_size": 13}}]
"result": {"Ok": {"key": {"node": bin("17b8d4e3bafd4ec4812ad7c930aace9bf07ab033"),
"path": "copy.txt"},
"content": {"metadata": {"size": None,
"flags": None},
"hg_file_blob": b"\x01\ncopy: test.txt\ncopyrev: 186cafa3319c24956783383dc44c5cbc68c5a0ca\n\x01\ntest content\n"},
"parents": None,
"aux_data": {"sha1": bin("4fe2b8dd12cd9cd6a413ea960cd8c09c25f19527"),
"sha256": bin("a1fff0ffefb9eace7230c24e50731f0a91c62f9cefdfe77121c2f607125dffae"),
"content_id": bin("888dcf533a354c23e4bf67e1ada984d96bb1089b0c3c03f4c2cb773709e7aa42"),
"total_size": 13}}}}]

View File

@ -34,7 +34,7 @@ use edenapi_types::CommitRevlogData;
use edenapi_types::EphemeralPrepareResponse;
use edenapi_types::FetchSnapshotRequest;
use edenapi_types::FetchSnapshotResponse;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use edenapi_types::FileType;
use edenapi_types::HgChangesetContent;
@ -103,14 +103,14 @@ py_class!(pub class client |py| {
def files(
&self,
keys: Vec<(PyPathBuf, Serde<HgId>)>
) -> PyResult<TStream<anyhow::Result<Serde<FileEntry>>>> {
) -> PyResult<TStream<anyhow::Result<Serde<FileResponse>>>> {
self.inner(py).as_ref().files_py(py, keys)
}
def filesattrs(
&self,
spec: Serde<Vec<FileSpec>>,
) -> PyResult<TStream<anyhow::Result<Serde<FileEntry>>>> {
) -> PyResult<TStream<anyhow::Result<Serde<FileResponse>>>> {
let api = self.inner(py).as_ref();
let entries = py
.allow_threads(|| block_unless_interrupted(api.files_attrs(spec.0)))

View File

@ -40,7 +40,7 @@ use edenapi_types::CommitRevlogData;
use edenapi_types::EdenApiServerError;
use edenapi_types::FetchSnapshotRequest;
use edenapi_types::FetchSnapshotResponse;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::HgChangesetContent;
use edenapi_types::HgFilenodeData;
use edenapi_types::HgMutationEntryContent;
@ -88,7 +88,7 @@ pub trait EdenApiPyExt: EdenApi {
&self,
py: Python,
keys: Vec<(PyPathBuf, Serde<HgId>)>,
) -> PyResult<TStream<anyhow::Result<Serde<FileEntry>>>> {
) -> PyResult<TStream<anyhow::Result<Serde<FileResponse>>>> {
let keys = to_keys(py, &keys)?;
let entries = py
.allow_threads(|| block_unless_interrupted(self.files(keys)))

View File

@ -33,6 +33,7 @@ use edenapi::types::CommitMutationsResponse;
use edenapi::types::CommitRevlogData;
use edenapi::types::FileContent;
use edenapi::types::FileEntry;
use edenapi::types::FileResponse;
use edenapi::types::FileSpec;
use edenapi::types::HgId;
use edenapi::types::HistoryEntry;
@ -68,7 +69,7 @@ impl EdenApi for EagerRepo {
Ok(vec!["segmented-changelog".to_string()])
}
async fn files(&self, keys: Vec<Key>) -> edenapi::Result<Response<FileEntry>> {
async fn files(&self, keys: Vec<Key>) -> edenapi::Result<Response<FileResponse>> {
debug!("files {}", debug_key_list(&keys));
let mut values = Vec::with_capacity(keys.len());
for key in keys {
@ -77,7 +78,7 @@ impl EdenApi for EagerRepo {
let (p1, p2) = extract_p1_p2(&data);
let parents = Parents::new(p1, p2);
let entry = FileEntry {
key,
key: key.clone(),
parents,
// PERF: to_vec().into() converts minibytes::Bytes to bytes::Bytes.
content: Some(FileContent {
@ -86,12 +87,16 @@ impl EdenApi for EagerRepo {
}),
aux_data: None,
};
values.push(Ok(entry));
let response = FileResponse {
key,
result: Ok(entry),
};
values.push(Ok(response));
}
Ok(convert_to_response(values))
}
async fn files_attrs(&self, reqs: Vec<FileSpec>) -> edenapi::Result<Response<FileEntry>> {
async fn files_attrs(&self, reqs: Vec<FileSpec>) -> edenapi::Result<Response<FileResponse>> {
debug!("files {}", debug_spec_list(&reqs));
let mut values = Vec::with_capacity(reqs.len());
for spec in reqs {
@ -102,7 +107,7 @@ impl EdenApi for EagerRepo {
let parents = Parents::new(p1, p2);
// TODO(meyer): Actually implement aux data here.
let entry = FileEntry {
key,
key: key.clone(),
parents,
// PERF: to_vec().into() converts minibytes::Bytes to bytes::Bytes.
content: Some(FileContent {
@ -111,7 +116,11 @@ impl EdenApi for EagerRepo {
}),
aux_data: None,
};
values.push(Ok(entry));
let response = FileResponse {
key,
result: Ok(entry),
};
values.push(Ok(response));
}
Ok(convert_to_response(values))
}

View File

@ -47,6 +47,7 @@ use edenapi_types::FetchSnapshotRequest;
use edenapi_types::FetchSnapshotResponse;
use edenapi_types::FileEntry;
use edenapi_types::FileRequest;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use edenapi_types::HgFilenodeData;
use edenapi_types::HgMutationEntryContent;
@ -412,7 +413,7 @@ impl Client {
pub(crate) async fn fetch_files(
&self,
keys: Vec<Key>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
tracing::info!("Requesting content for {} file(s)", keys.len());
if keys.is_empty() {
@ -428,7 +429,19 @@ impl Client {
req
})?;
Ok(self.fetch_guard::<FileEntry>(requests, guards)?)
// Convert files v1's FileEntry response into a files v2 FileResponse.
let response = self.fetch_guard::<FileEntry>(requests, guards)?;
let entries = response
.entries
.map_ok(|file_entry| FileResponse {
key: file_entry.key.clone(),
result: Ok(file_entry),
})
.boxed();
Ok(Response {
entries,
stats: response.stats,
})
}
pub(crate) async fn fetch_trees(
@ -458,7 +471,7 @@ impl Client {
pub(crate) async fn fetch_files_attrs(
&self,
reqs: Vec<FileSpec>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
tracing::info!("Requesting attributes for {} file(s)", reqs.len());
if reqs.is_empty() {
@ -474,7 +487,19 @@ impl Client {
req
})?;
Ok(self.fetch_guard::<FileEntry>(requests, guards)?)
// Convert files v1's FileEntry response into a files v2 FileResponse.
let response = self.fetch_guard::<FileEntry>(requests, guards)?;
let entries = response
.entries
.map_ok(|file_entry| FileResponse {
key: file_entry.key.clone(),
result: Ok(file_entry),
})
.boxed();
Ok(Response {
entries,
stats: response.stats,
})
}
/// Upload a single file
@ -587,7 +612,7 @@ impl EdenApi for Client {
Ok(caps)
}
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileResponse>, EdenApiError> {
tracing::info!("Requesting content for {} file(s)", keys.len());
let prog = self.inner.file_progress.create_or_extend(keys.len() as u64);
@ -603,7 +628,10 @@ impl EdenApi for Client {
.await
}
async fn files_attrs(&self, reqs: Vec<FileSpec>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files_attrs(
&self,
reqs: Vec<FileSpec>,
) -> Result<Response<FileResponse>, EdenApiError> {
tracing::info!("Requesting attributes for {} file(s)", reqs.len());
let prog = self.inner.file_progress.create_or_extend(reqs.len() as u64);

View File

@ -15,7 +15,7 @@ use super::RetryableStreamRequest;
use crate::client::Client;
use crate::errors::EdenApiError;
use crate::response::Response;
use crate::types::FileEntry;
use crate::types::FileResponse;
use crate::types::FileSpec;
pub(crate) struct RetryableFiles {
@ -31,7 +31,7 @@ impl RetryableFiles {
#[async_trait]
impl RetryableStreamRequest for RetryableFiles {
type Item = FileEntry;
type Item = FileResponse;
async fn perform(&self, client: Client) -> Result<Response<Self::Item>, EdenApiError> {
let keys = self.keys.iter().cloned().collect();
@ -39,7 +39,7 @@ impl RetryableStreamRequest for RetryableFiles {
}
fn received_item(&mut self, item: &Self::Item) {
self.keys.remove(item.key());
self.keys.remove(&item.key);
}
}
@ -56,7 +56,7 @@ impl RetryableFileAttrs {
#[async_trait]
impl RetryableStreamRequest for RetryableFileAttrs {
type Item = FileEntry;
type Item = FileResponse;
async fn perform(&self, client: Client) -> Result<Response<Self::Item>, EdenApiError> {
let reqs = self.reqs.values().cloned().collect();
@ -64,6 +64,6 @@ impl RetryableStreamRequest for RetryableFileAttrs {
}
fn received_item(&mut self, item: &Self::Item) {
self.reqs.remove(item.key());
self.reqs.remove(&item.key);
}
}

View File

@ -27,7 +27,7 @@ use edenapi_types::EdenApiServerError;
use edenapi_types::EphemeralPrepareResponse;
use edenapi_types::FetchSnapshotRequest;
use edenapi_types::FetchSnapshotResponse;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use edenapi_types::HgFilenodeData;
use edenapi_types::HgMutationEntryContent;
@ -59,12 +59,15 @@ pub trait EdenApi: Send + Sync + 'static {
Err(EdenApiError::NotSupported)
}
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileResponse>, EdenApiError> {
let _ = keys;
Err(EdenApiError::NotSupported)
}
async fn files_attrs(&self, reqs: Vec<FileSpec>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files_attrs(
&self,
reqs: Vec<FileSpec>,
) -> Result<Response<FileResponse>, EdenApiError> {
let _ = reqs;
Err(EdenApiError::NotSupported)
}

View File

@ -21,6 +21,7 @@ use types::parents::Parents;
use crate::ContentId;
use crate::InvalidHgId;
use crate::ServerError;
use crate::Sha1;
use crate::Sha256;
use crate::UploadToken;
@ -144,6 +145,12 @@ impl FileContent {
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct FileResponse {
pub key: Key,
pub result: Result<FileEntry, ServerError>,
}
/// Structure representing source control file content on the wire.
/// Includes the information required to add the data to a mutable store,
/// along with the parents for hash validation.

View File

@ -100,6 +100,7 @@ pub use crate::file::FileContent;
pub use crate::file::FileEntry;
pub use crate::file::FileError;
pub use crate::file::FileRequest;
pub use crate::file::FileResponse;
pub use crate::file::FileSpec;
pub use crate::file::HgFilenodeData;
pub use crate::file::UploadHgFilenodeRequest;

View File

@ -13,6 +13,7 @@ use serde_derive::Serialize;
use crate::file::FileContent;
use crate::file::FileEntry;
use crate::file::FileResponse;
pub use crate::file::WireFileAttributes;
pub use crate::file::WireFileAuxData;
pub use crate::file::WireFileRequest;
@ -25,6 +26,7 @@ use crate::wire::ToApi;
use crate::wire::ToWire;
use crate::wire::WireKey;
use crate::wire::WireParents;
use crate::wire::WireResult;
use crate::wire::WireRevisionstoreMetadata;
use crate::wire::WireToApiConversionError;
@ -92,6 +94,47 @@ impl ToApi for WireFileEntry {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct WireFileResponse {
#[serde(rename = "0")]
pub key: Option<WireKey>,
#[serde(rename = "1")]
pub result: Option<WireResult<WireFileEntry>>,
}
impl ToWire for FileResponse {
type Wire = WireFileResponse;
fn to_wire(self) -> Self::Wire {
Self::Wire {
key: Some(self.key.to_wire()),
result: Some(self.result.to_wire()),
}
}
}
impl ToApi for WireFileResponse {
type Api = FileResponse;
type Error = WireToApiConversionError;
fn to_api(self) -> Result<Self::Api, Self::Error> {
Ok(Self::Api {
key: match self.key {
Some(key) => key.to_api()?,
None => return Err(WireToApiConversionError::CannotPopulateRequiredField("key")),
},
result: match self.result {
Some(result) => result.to_api()?,
None => {
return Err(WireToApiConversionError::CannotPopulateRequiredField(
"result",
));
}
},
})
}
}
#[cfg(any(test, feature = "for-tests"))]
impl Arbitrary for WireFileEntry {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {

View File

@ -68,7 +68,15 @@ impl RemoteDataStore for EdenApiDataStore<File> {
.entries
.map(|entry| {
let store = self.store.clone();
spawn_blocking(move || entry.map(|e| store.add_file(&e)))
spawn_blocking(move || {
entry.map(|e| {
if let Ok(entry) = e.result {
store.add_file(&entry)
} else {
Ok(())
}
})
})
})
.buffer_unordered(4);

View File

@ -14,7 +14,7 @@ use edenapi::EdenApi;
use edenapi::EdenApiError;
use edenapi::Response;
use edenapi_types::EdenApiServerError;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use edenapi_types::TreeAttributes;
use edenapi_types::TreeEntry;
@ -116,21 +116,21 @@ impl EdenApiFileStore {
pub fn files_blocking(
&self,
keys: Vec<Key>,
) -> Result<BlockingResponse<FileEntry>, EdenApiError> {
) -> Result<BlockingResponse<FileResponse>, EdenApiError> {
BlockingResponse::from_async(self.client.files(keys))
}
pub fn files_attrs_blocking(
&self,
reqs: Vec<FileSpec>,
) -> Result<BlockingResponse<FileEntry>, EdenApiError> {
) -> Result<BlockingResponse<FileResponse>, EdenApiError> {
BlockingResponse::from_async(self.client.files_attrs(reqs))
}
pub async fn files_attrs(
&self,
reqs: Vec<FileSpec>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
self.client.files_attrs(reqs).await
}
}
@ -152,7 +152,7 @@ pub trait EdenApiStoreKind: Send + Sync + 'static {
async fn prefetch_files(
_client: Arc<dyn EdenApi>,
_keys: Vec<Key>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
unimplemented!("fetching files not supported for this store")
}
@ -170,7 +170,7 @@ impl EdenApiStoreKind for File {
async fn prefetch_files(
client: Arc<dyn EdenApi>,
keys: Vec<Key>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
client.files(keys).await
}
}

View File

@ -15,7 +15,7 @@ use async_runtime::block_on;
use async_runtime::spawn_blocking;
use async_runtime::stream_to_iter;
use crossbeam::channel::Sender;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use futures::StreamExt;
use parking_lot::RwLock;
@ -394,12 +394,14 @@ impl FetchState {
skip(entry, indexedlog_cache, lfs_cache, aux_cache, memcache)
)]
fn found_edenapi(
entry: FileEntry,
entry: FileResponse,
indexedlog_cache: Option<Arc<IndexedLogHgIdDataStore>>,
lfs_cache: Option<Arc<LfsStore>>,
aux_cache: Option<Arc<AuxStore>>,
memcache: Option<Arc<MemcacheStore>>,
) -> Result<(StoreFile, Option<LfsPointersEntry>)> {
let entry = entry.result?;
let key = entry.key.clone();
let mut file = StoreFile::default();
let mut lfsptr = None;

View File

@ -22,6 +22,7 @@ use edenapi_types::EdenApiServerError;
use edenapi_types::FileAttributes;
use edenapi_types::FileContent;
use edenapi_types::FileEntry;
use edenapi_types::FileResponse;
use edenapi_types::FileSpec;
use edenapi_types::HistoryEntry;
use edenapi_types::TreeAttributes;
@ -249,7 +250,7 @@ impl FakeEdenApi {
fn get_files(
map: &HashMap<Key, (Bytes, Option<u64>)>,
reqs: impl Iterator<Item = FileSpec>,
) -> Result<Response<FileEntry>, EdenApiError> {
) -> Result<Response<FileResponse>, EdenApiError> {
let entries = reqs
.filter_map(|spec| {
let parents = Parents::default();
@ -277,7 +278,10 @@ impl FakeEdenApi {
entry = entry.with_content(content);
}
Some(Ok(entry))
Some(Ok(FileResponse {
key: spec.key,
result: Ok(entry),
}))
})
.collect::<Vec<_>>();
@ -314,7 +318,7 @@ impl EdenApi for FakeEdenApi {
Ok(ResponseMeta::default())
}
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files(&self, keys: Vec<Key>) -> Result<Response<FileResponse>, EdenApiError> {
Self::get_files(
&self.files,
keys.into_iter().map(|key| FileSpec {
@ -327,7 +331,10 @@ impl EdenApi for FakeEdenApi {
)
}
async fn files_attrs(&self, reqs: Vec<FileSpec>) -> Result<Response<FileEntry>, EdenApiError> {
async fn files_attrs(
&self,
reqs: Vec<FileSpec>,
) -> Result<Response<FileResponse>, EdenApiError> {
Self::get_files(&self.files, reqs.into_iter())
}