edenapi: send hash-to-location errors to client

Summary:
We want to distinguish between no location and failure to compute location.

It is useful to know on the client side if we failed to compute the location of
some hash or we are not sending it because we don't have a location for it.

We have different options for hash-to-location in particular but the problem is
a general one. Does it make sense for us to skip sending error at the EdenApi
handler layer? I don't think so, I think that's an old hask. We should be
evolving the EdenApi handler layer to handle application errors.

This is still at the exploration phase. I haven't fixed the client side to
handle the errors.

Reviewed By: quark-zju, kulshrax

Differential Revision: D27549926

fbshipit-source-id: 7094160fe997af0e69c8d006b9731ea7dfb3e3f8
This commit is contained in:
Stefan Filip 2021-04-20 20:19:20 -07:00 committed by Facebook GitHub Bot
parent 82b689ad9d
commit 9c9e18568c
11 changed files with 186 additions and 43 deletions

View File

@ -24,7 +24,9 @@ use types::HgId;
use crate::context::ServerContext;
use crate::errors::ErrorKind;
use crate::middleware::RequestContext;
use crate::utils::{cbor_stream, get_repo, parse_cbor_request, parse_wire_request};
use crate::utils::{
cbor_stream, get_repo, parse_cbor_request, parse_wire_request, simple_cbor_stream,
};
use super::{EdenApiMethod, HandlerInfo};
@ -76,18 +78,21 @@ pub async fn hash_to_location(state: &mut State) -> Result<impl TryIntoResponse,
hg_repo_ctx: HgRepoContext,
master_heads: Vec<HgChangesetId>,
hg_cs_ids: Vec<HgChangesetId>,
) -> Result<impl Stream<Item = Result<CommitHashToLocationResponse, Error>>, Error> {
) -> impl Stream<Item = CommitHashToLocationResponse> {
let hgcsid_to_location = hg_repo_ctx
.many_changeset_ids_to_locations(master_heads, hg_cs_ids.clone())
.await?;
let responses = hgcsid_to_location.into_iter().map(|(hgcsid, location)| {
let response = CommitHashToLocationResponse {
.await;
let responses = hg_cs_ids.into_iter().map(move |hgcsid| {
let result = hgcsid_to_location
.as_ref()
.map(|hsh| hsh.get(&hgcsid).map(|l| l.map_descendant(|x| x.into())))
.map_err(|e| (&*e).into());
CommitHashToLocationResponse {
hgid: hgcsid.into(),
location: location.map_descendant(|x| x.into()),
};
Ok(response)
result,
}
});
Ok(stream::iter(responses))
stream::iter(responses)
}
let params = HashToLocationParams::take_from(state);
@ -103,6 +108,7 @@ pub async fn hash_to_location(state: &mut State) -> Result<impl TryIntoResponse,
let hg_repo_ctx = get_repo(&sctx, &rctx, &params.repo, None).await?;
let batch = parse_wire_request::<WireCommitHashToLocationRequestBatch>(state).await?;
let unfiltered = batch.unfiltered;
let master_heads = batch
.master_heads
.into_iter()
@ -117,9 +123,22 @@ pub async fn hash_to_location(state: &mut State) -> Result<impl TryIntoResponse,
move |chunk| hash_to_location_chunk(ctx.clone(), master_heads.clone(), chunk)
})
.buffer_unordered(3)
.try_flatten()
.map_ok(|response| response.to_wire());
Ok(cbor_stream(rctx, response))
.flatten()
.filter(move |v| {
// The old behavior is to filter out error and None results. We want to preserve that
// behavior for old clients since they will not be able to deserialize other results.
let to_keep = if unfiltered == Some(true) {
true
} else {
match v.result {
Ok(Some(_)) => true,
_ => false,
}
};
futures::future::ready(to_keep)
})
.map(|response| response.to_wire());
Ok(simple_cbor_stream(rctx, response))
}
pub async fn revlog_data(state: &mut State) -> Result<impl TryIntoResponse, HttpError> {

View File

@ -55,6 +55,17 @@ where
StreamBody::new(content_stream, cbor_mime())
}
pub fn simple_cbor_stream<S, T>(rctx: RequestContext, stream: S) -> impl TryIntoResponse
where
S: Stream<Item = T> + Send + 'static,
T: Serialize + Send + 'static,
{
let byte_stream = stream.then(|item| async { to_cbor_bytes(item) });
let content_stream = ContentStream::new(byte_stream).forward_err(rctx.error_tx);
StreamBody::new(content_stream, cbor_mime())
}
pub async fn parse_cbor_request<R: DeserializeOwned>(state: &mut State) -> Result<R, HttpError> {
let body = get_request_body(state).await?;
serde_cbor::from_slice(&body)

View File

@ -22,7 +22,10 @@ use crate::middleware::RequestContext;
pub mod cbor;
pub mod convert;
pub use cbor::{cbor_mime, cbor_stream, parse_cbor_request, parse_wire_request, to_cbor_bytes};
pub use cbor::{
cbor_mime, cbor_stream, parse_cbor_request, parse_wire_request, simple_cbor_stream,
to_cbor_bytes,
};
pub use convert::{to_hg_path, to_mononoke_path, to_mpath};
pub async fn get_repo(

View File

@ -51,6 +51,7 @@ Populate test repo
o 426bada5c67598ca65036d57d9e4b64b0c1ce7a0 A
Blobimport test repo.
$ cd ..
$ blobimport repo-hg/.hg repo
@ -61,6 +62,70 @@ Start up EdenAPI server.
$ wait_for_mononoke
Create and send file data request.
$ edenapi_make_req commit-hash-to-location > req.cbor <<EOF
> {
> "master_heads": [
> "$G"
> ],
> "hgids": [
> "$F",
> "$D",
> "$C",
> "$B",
> "$A",
> "$F",
> "$H",
> "000000000000000000000000000000123456789a"
> ],
> "unfiltered": true
> }
> EOF
Reading from stdin
Generated request: WireCommitHashToLocationRequestBatch {
client_head: Some(
WireHgId("1b794c59b583e47686701d0142848e90a3a94a7d"),
),
hgids: [
WireHgId("bb56d4161ee371c720dbc8b504810c62a22fe314"),
WireHgId("f585351a92f85104bff7c284233c338b10eb1df7"),
WireHgId("26805aba1e600a82e93661149f2313866a221a7b"),
WireHgId("112478962961147124edd43549aedd1a335e44bf"),
WireHgId("426bada5c67598ca65036d57d9e4b64b0c1ce7a0"),
WireHgId("bb56d4161ee371c720dbc8b504810c62a22fe314"),
WireHgId("06383dd46c9bcbca9300252b4b6cddad88f8af21"),
WireHgId("000000000000000000000000000000123456789a"),
],
master_heads: [
WireHgId("1b794c59b583e47686701d0142848e90a3a94a7d"),
],
unfiltered: Some(
true,
),
}
$ sslcurl -s "https://localhost:$MONONOKE_SOCKET/edenapi/repo/commit/hash_to_location" --data-binary @req.cbor > res.cbor
Check files in response.
$ edenapi_read_res commit-hash-to-location res.cbor
Reading from file: "res.cbor"
000000000000000000000000000000123456789a =>
Ok(None)
06383dd46c9bcbca9300252b4b6cddad88f8af21 =>
Ok(None)
112478962961147124edd43549aedd1a335e44bf =>
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=1)))
26805aba1e600a82e93661149f2313866a221a7b =>
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=1)))
426bada5c67598ca65036d57d9e4b64b0c1ce7a0 =>
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=2)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
f585351a92f85104bff7c284233c338b10eb1df7 =>
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=0)))
And now again but this time with the default value for unfiltered
$ edenapi_make_req commit-hash-to-location > req.cbor <<EOF
> {
> "master_heads": [
@ -96,6 +161,7 @@ Create and send file data request.
master_heads: [
WireHgId("1b794c59b583e47686701d0142848e90a3a94a7d"),
],
unfiltered: None,
}
$ sslcurl -s "https://localhost:$MONONOKE_SOCKET/edenapi/repo/commit/hash_to_location" --data-binary @req.cbor > res.cbor
@ -104,12 +170,14 @@ Check files in response.
$ edenapi_read_res commit-hash-to-location res.cbor
Reading from file: "res.cbor"
112478962961147124edd43549aedd1a335e44bf =>
Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=1)
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=1)))
26805aba1e600a82e93661149f2313866a221a7b =>
Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=1)
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=1)))
426bada5c67598ca65036d57d9e4b64b0c1ce7a0 =>
Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=2)
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=2)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
f585351a92f85104bff7c284233c338b10eb1df7 =>
Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=0)
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=0)))

View File

@ -104,6 +104,7 @@ Create and send file data request.
master_heads: [
WireHgId("1b794c59b583e47686701d0142848e90a3a94a7d"),
],
unfiltered: None,
}
$ sslcurl -s "https://localhost:$MONONOKE_SOCKET/edenapi/repo/commit/hash_to_location" --data-binary @req.cbor > res.cbor
@ -112,12 +113,14 @@ Check files in response.
$ edenapi_read_res commit-hash-to-location res.cbor
Reading from file: "res.cbor"
112478962961147124edd43549aedd1a335e44bf =>
Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=1)
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=1)))
26805aba1e600a82e93661149f2313866a221a7b =>
Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=1)
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=1)))
426bada5c67598ca65036d57d9e4b64b0c1ce7a0 =>
Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=2)
Ok(Some(Location(descendant=49cb92066bfd0763fff729c354345650b7428554, dist=2)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
bb56d4161ee371c720dbc8b504810c62a22fe314 =>
Ok(Some(Location(descendant=1b794c59b583e47686701d0142848e90a3a94a7d, dist=1)))
f585351a92f85104bff7c284233c338b10eb1df7 =>
Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=0)
Ok(Some(Location(descendant=f585351a92f85104bff7c284233c338b10eb1df7, dist=0)))

View File

@ -570,6 +570,7 @@ impl EdenApi for Client {
let batch = CommitHashToLocationRequestBatch {
master_heads: master_heads.clone(),
hgids,
unfiltered: Some(true),
};
self.log_request(&batch, "commit_hash_to_location");
batch.to_wire()

View File

@ -442,14 +442,24 @@ fn cmd_commit_hash_to_location(args: CommitHashToLocationArgs) -> Result<()> {
let mut response_list: Vec<WireCommitHashToLocationResponse> = read_input(args.input, None)?;
response_list.sort_by_key(|r| r.hgid);
let iter = response_list
.iter()
.into_iter()
.skip(args.start.unwrap_or(0))
.take(args.limit.unwrap_or(usize::MAX));
for response in iter {
println!(
"{} =>\n Location(descendant={}, dist={})",
response.hgid, response.location.descendant, response.location.distance
);
if let Some(result) = response.result {
let s = match result {
Ok(Some(l)) => format!(
"Ok(Some(Location(descendant={}, dist={})))",
l.descendant, l.distance
),
Ok(None) => String::from("Ok(None)"),
Err(e) => format!(
"Err({})",
e.message.unwrap_or_else(|| String::from("error"))
),
};
println!("{} =>\n {}", response.hgid, s);
}
}
Ok(())
}

View File

@ -5,6 +5,7 @@
* GNU General Public License version 2.
*/
use anyhow::Result;
use bytes::Bytes;
#[cfg(any(test, feature = "for-tests"))]
use quickcheck::Arbitrary;
@ -13,6 +14,8 @@ use serde_derive::{Deserialize, Serialize};
use dag_types::Location;
use types::hgid::HgId;
use crate::ServerError;
/// Given a graph location, return `count` hashes following first parent links.
///
/// Example:
@ -83,13 +86,14 @@ impl Arbitrary for CommitLocationToHashRequestBatch {
pub struct CommitHashToLocationRequestBatch {
pub master_heads: Vec<HgId>,
pub hgids: Vec<HgId>,
pub unfiltered: Option<bool>,
}
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Serialize)] // used to convert to Python
pub struct CommitHashToLocationResponse {
pub hgid: HgId,
pub location: Location<HgId>,
pub result: Result<Option<Location<HgId>>, ServerError>,
}
#[cfg(any(test, feature = "for-tests"))]
@ -98,6 +102,7 @@ impl Arbitrary for CommitHashToLocationRequestBatch {
CommitHashToLocationRequestBatch {
master_heads: Arbitrary::arbitrary(g),
hgids: Arbitrary::arbitrary(g),
unfiltered: Arbitrary::arbitrary(g),
}
}
}
@ -107,7 +112,7 @@ impl Arbitrary for CommitHashToLocationResponse {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
CommitHashToLocationResponse {
hgid: Arbitrary::arbitrary(g),
location: Arbitrary::arbitrary(g),
result: Arbitrary::arbitrary(g),
}
}
}

View File

@ -154,9 +154,11 @@ pub fn parse_commit_hash_to_location_req(json: &Value) -> Result<CommitHashToLoc
.context("could not be parsed as HgId")?;
hgids.push(hgid);
}
let unfiltered = json.get("unfiltered").map(|v| v.as_bool()).flatten();
Ok(CommitHashToLocationRequestBatch {
master_heads,
hgids,
unfiltered,
})
}
/// Parse a `FileRequest` from JSON.
@ -653,6 +655,7 @@ impl ToJson for CommitHashToLocationRequestBatch {
json!({
"master_heads": self.master_heads.to_json(),
"hgids": self.hgids.to_json(),
"unfiltered": self.unfiltered,
})
}
}

View File

@ -16,7 +16,7 @@ use crate::commit::{
CommitHashToLocationRequestBatch, CommitHashToLocationResponse, CommitLocationToHashRequest,
CommitLocationToHashRequestBatch, CommitLocationToHashResponse,
};
use crate::wire::{ToApi, ToWire, WireHgId, WireToApiConversionError};
use crate::wire::{ToApi, ToWire, WireHgId, WireResult, WireToApiConversionError};
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct WireCommitLocation {
@ -182,6 +182,8 @@ pub struct WireCommitHashToLocationRequestBatch {
pub hgids: Vec<WireHgId>,
#[serde(rename = "3", default)]
pub master_heads: Vec<WireHgId>,
#[serde(rename = "4")]
pub unfiltered: Option<bool>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
@ -189,7 +191,9 @@ pub struct WireCommitHashToLocationResponse {
#[serde(rename = "1")]
pub hgid: WireHgId,
#[serde(rename = "2")]
pub location: WireCommitLocation,
pub location: Option<WireCommitLocation>,
#[serde(rename = "3")]
pub result: Option<WireResult<Option<WireCommitLocation>>>,
}
impl ToWire for CommitHashToLocationRequestBatch {
@ -201,6 +205,7 @@ impl ToWire for CommitHashToLocationRequestBatch {
client_head,
hgids: self.hgids.to_wire(),
master_heads: self.master_heads.to_wire(),
unfiltered: self.unfiltered,
}
}
}
@ -220,6 +225,7 @@ impl ToApi for WireCommitHashToLocationRequestBatch {
let api = Self::Api {
master_heads,
hgids: self.hgids.to_api()?,
unfiltered: self.unfiltered,
};
Ok(api)
}
@ -236,9 +242,14 @@ impl ToWire for CommitHashToLocationResponse {
type Wire = WireCommitHashToLocationResponse;
fn to_wire(self) -> Self::Wire {
let location = match self.result {
Ok(Some(x)) => Some(x.to_wire()),
_ => None,
};
Self::Wire {
hgid: self.hgid.to_wire(),
location: self.location.to_wire(),
location,
result: Some(self.result.to_wire()),
}
}
}
@ -248,9 +259,16 @@ impl ToApi for WireCommitHashToLocationResponse {
type Error = WireToApiConversionError;
fn to_api(self) -> Result<Self::Api, Self::Error> {
let result = match self.result {
Some(x) => x.to_api()?,
None => match self.location {
None => Ok(None),
Some(l) => Ok(Some(l.to_api()?)),
},
};
let api = Self::Api {
hgid: self.hgid.to_api()?,
location: self.location.to_api()?,
result,
};
Ok(api)
}

View File

@ -93,13 +93,15 @@ impl RemoteIdConvertProtocol for EdenApiProtocol {
};
while let Some(response) = response.entries.next().await {
let response = response.map_err(to_dag_error)?;
let path = AncestorPath {
x: Vertex::copy_from(response.location.descendant.as_ref()),
n: response.location.distance,
batch_size: 1,
};
let name = Vertex::copy_from(response.hgid.as_ref());
pairs.push((path, vec![name]));
if let Some(location) = response.result.map_err(to_dag_error)? {
let path = AncestorPath {
x: Vertex::copy_from(location.descendant.as_ref()),
n: location.distance,
batch_size: 1,
};
let name = Vertex::copy_from(response.hgid.as_ref());
pairs.push((path, vec![name]));
}
}
Ok(pairs)
}