mononoke/lfs_server: ignore redaction errors in batch

Summary:
If a blob is redacted, we shouldn't crash in batch. Instead, we should return
that the blob exists, and let the download path return to the client the
information that the blob is redacted. This diff does that.

Reviewed By: HarveyHunt

Differential Revision: D20897247

fbshipit-source-id: 3f305dfd9de4ac6a749a9eaedce101f594284d16
This commit is contained in:
Thomas Orozco 2020-04-08 11:54:46 -07:00 committed by Facebook GitHub Bot
parent 77149d7ee8
commit 0a21ab46c4
5 changed files with 185 additions and 26 deletions

View File

@ -25,6 +25,10 @@ impl<T: Blobstore + Clone> ContextConcurrencyBlobstore<T> {
Self { blobstore }
}
/// Returns a shared reference to the wrapped blobstore.
pub fn as_inner(&self) -> &T {
&self.blobstore
}
/// Consumes this wrapper and returns the wrapped blobstore by value.
pub fn into_inner(self) -> T {
self.blobstore
}

View File

@ -230,6 +230,13 @@ where
}
}
/// Returns true iff the root cause of this error is a redaction error
/// (`ErrorKind::Censored`), as opposed to any other kind of failure.
pub fn has_redaction_root_cause(e: &Error) -> bool {
    matches!(
        e.root_cause().downcast_ref::<ErrorKind>(),
        Some(ErrorKind::Censored(_, _))
    )
}
#[cfg(test)]
mod test {

View File

@ -6,7 +6,7 @@
*/
use anyhow::Error;
use futures::compat::Future01CompatExt;
use futures::{compat::Future01CompatExt, future::TryFutureExt};
use futures_util::{future::try_join_all, pin_mut, select, try_join, FutureExt};
use gotham::state::{FromState, State};
use gotham_derive::{StateData, StaticResponseExtender};
@ -14,6 +14,7 @@ use gotham_ext::body_ext::BodyExt;
use http::header::HeaderMap;
use hyper::{Body, StatusCode};
use maplit::hashmap;
use redactedblobstore::has_redaction_root_cause;
use scuba::ScubaValue;
use serde::Deserialize;
use slog::debug;
@ -160,8 +161,24 @@ async fn resolve_internal_object(
let exists = blobstore
.get(ctx.ctx.clone(), content_id.blobstore_key())
.compat()
.await?
.is_some();
.map_ok(|b| b.is_some())
.or_else(|e| async move {
// If a load error was caused by redaction, then check for existence instead, which
// isn't subject to redaction (only the content is). The reason we normally check for
// the content itself is because doing an exists() check does not fill our Cachelib
// cache on hit, whereas get() does (and those blobs are all very small because they're
// just lists of the blobs that make up the actual large file). We therefore only
// fallback to this slow path when redaction gets in the way (uncommon).
if has_redaction_root_cause(&e) {
Ok(blobstore
.is_present(ctx.ctx.clone(), content_id.blobstore_key())
.compat()
.await?)
} else {
Err(e)
}
})
.await?;
if exists {
Ok(Some(content_id))
@ -534,7 +551,15 @@ pub async fn batch(state: &mut State) -> Result<impl TryIntoResponse, HttpError>
mod test {
use super::*;
use blobrepo::DangerousOverride;
use blobrepo_factory::TestRepoBuilder;
use bytes::Bytes;
use context::CoreContext;
use fbinit::FacebookInit;
use filestore::{self, StoreRequest};
use futures_old::stream as stream_old;
use hyper::Uri;
use mononoke_types_mocks::hash::ONES_SHA256;
use std::sync::Arc;
use lfs_protocol::Sha256 as LfsSha256;
@ -748,4 +773,74 @@ mod test {
Ok(())
}
#[fbinit::compat_test]
async fn test_resolve_missing(fb: FacebookInit) -> Result<(), Error> {
// Resolving a Sha256 that was never stored should yield Ok(None), not an error.
let ctx = RepositoryRequestContext::test_builder(fb)?.build()?;
assert_eq!(resolve_internal_object(&ctx, ONES_SHA256).await?, None);
Ok(())
}
#[fbinit::compat_test]
async fn test_resolve_present(fb: FacebookInit) -> Result<(), Error> {
let ctx = RepositoryRequestContext::test_builder(fb)?.build()?;
// Store a 6-byte blob through the filestore, then check that its Sha256
// resolves back to the content id the filestore reported.
let meta = filestore::store(
ctx.repo.blobstore().clone(),
ctx.repo.filestore_config(),
ctx.ctx.clone(),
&StoreRequest::new(6),
stream_old::once(Ok(Bytes::from("foobar"))),
)
.compat()
.await?;
assert_eq!(
resolve_internal_object(&ctx, meta.sha256).await?,
Some(meta.content_id)
);
Ok(())
}
#[fbinit::compat_test]
async fn test_resolve_redacted(fb: FacebookInit) -> Result<(), Error> {
// First, have the filestore tell us what the hash for this blob would be, so we can create
// a new repo and redact it.
let stub = TestRepoBuilder::new().build()?;
let meta = filestore::store(
stub.blobstore().clone(),
stub.filestore_config(),
CoreContext::test_mock(fb),
&StoreRequest::new(6),
stream_old::once(Ok(Bytes::from("foobar"))),
)
.compat()
.await?;
// NOTE: It's not ideal that we have to do as_inner 3 times here, but that's how
// the structure of RepoBlobstore works right now: it's just a type name. So, we
// get to the inner blobstore, and then clone that. It's where our data is!
let stub_blobstore = stub.blobstore().as_inner().as_inner().as_inner().clone();
// Now, create a new blob repo with redaction, then swap the blobstore from the stub repo
// into it, which has the data (but now it is redacted)!
let repo = TestRepoBuilder::new()
.redacted(Some(
hashmap! { meta.content_id.blobstore_key() => "test".to_string() },
))
.build()?
.dangerous_override(|_: Arc<dyn Blobstore>| stub_blobstore);
let ctx = RepositoryRequestContext::test_builder(fb)?
.repo(repo)
.build()?;
// Despite redaction, resolution should still succeed (via the is_present
// fallback) and report the content id rather than erroring out.
assert_eq!(
resolve_internal_object(&ctx, meta.sha256).await?,
Some(meta.content_id)
);
Ok(())
}
}

View File

@ -113,7 +113,7 @@ impl LfsServerContext {
repository,
server: inner.server.clone(),
},
client: inner.client.clone(),
client: HttpClient::Enabled(inner.client.clone()),
config,
always_wait_for_upstream: inner.always_wait_for_upstream,
max_upload_size: inner.max_upload_size,
@ -159,6 +159,13 @@ fn acl_check(
}
}
/// HTTP client used to talk to upstream. Test-built contexts have no live
/// client; `Disabled` turns an accidental network call into a loud panic
/// instead of hitting the wire.
#[derive(Clone)]
enum HttpClient {
Enabled(Arc<HttpsHyperClient>),
// Only constructible in test builds.
#[cfg(test)]
Disabled,
}
#[derive(Clone)]
pub struct RepositoryRequestContext {
pub ctx: CoreContext,
@ -167,7 +174,7 @@ pub struct RepositoryRequestContext {
pub config: Arc<ServerConfig>,
always_wait_for_upstream: bool,
max_upload_size: Option<u64>,
client: Arc<HttpsHyperClient>,
client: HttpClient,
}
impl RepositoryRequestContext {
@ -204,6 +211,13 @@ impl RepositoryRequestContext {
}
pub fn dispatch(&self, request: Request<Body>) -> impl Future<Output = Result<Bytes, Error>> {
#[allow(clippy::infallible_destructuring_match)]
let client = match self.client {
HttpClient::Enabled(ref client) => client,
#[cfg(test)]
HttpClient::Disabled => panic!("HttpClient is disabled in test"),
};
let (sender, receiver) = oneshot::channel();
// NOTE: We spawn the request on an executor because we'd like to read the response even if
@ -211,7 +225,7 @@ impl RepositoryRequestContext {
// response, Hyper will not reuse the connection for its pool (which makes sense for the
// general case: if your server is sending you 5GB of data and you drop the future, you
// don't want to read all that later just to reuse a connection).
let fut = self.client.request(request).then(move |r| {
let fut = client.request(request).then(move |r| {
let _ = sender.send(r);
future::ready(())
});
@ -392,6 +406,8 @@ impl BaseUri {
#[cfg(test)]
mod test {
use super::*;
use blobrepo_factory::TestRepoBuilder;
use fbinit::FacebookInit;
use lfs_protocol::Sha256 as LfsSha256;
use mononoke_types::{hash::Sha256, ContentId};
use std::str::FromStr;
@ -400,6 +416,57 @@ mod test {
const TWOS_HASH: &str = "2222222222222222222222222222222222222222222222222222222222222222";
const SIZE: u64 = 123;
/// Builds a `UriBuilder` for tests, pointing at the given self/upstream URIs
/// and a fixed "repo123" repository name.
pub fn uri_builder(self_uri: &str, upstream_uri: &str) -> Result<UriBuilder, Error> {
    let repository = "repo123".to_string();
    let server = Arc::new(ServerUris::new(self_uri, Some(upstream_uri))?);
    Ok(UriBuilder { repository, server })
}
/// Builder for a `RepositoryRequestContext` suitable for unit tests: mock
/// core context, default server config, and no HTTP client.
pub struct TestContextBuilder {
fb: FacebookInit,
repo: BlobRepo,
uri_builder: UriBuilder,
}
impl TestContextBuilder {
    /// Replaces the repo this context will be built against.
    pub fn repo(mut self, repo: BlobRepo) -> Self {
        self.repo = repo;
        self
    }

    /// Finalizes the builder into a `RepositoryRequestContext`. The resulting
    /// context carries a disabled HTTP client, so any attempt to dispatch an
    /// upstream request from a test will panic.
    pub fn build(self) -> Result<RepositoryRequestContext, Error> {
        Ok(RepositoryRequestContext {
            ctx: CoreContext::test_mock(self.fb),
            repo: self.repo,
            config: Arc::new(ServerConfig::default()),
            uri_builder: self.uri_builder,
            always_wait_for_upstream: false,
            max_upload_size: None,
            client: HttpClient::Disabled,
        })
    }
}
impl RepositoryRequestContext {
    /// Creates a `TestContextBuilder` preloaded with fixed test URIs and a
    /// freshly-built test repo.
    // NOTE: uri_builder is evaluated before the repo build, matching the
    // original error precedence if both were to fail.
    pub fn test_builder(fb: FacebookInit) -> Result<TestContextBuilder, Error> {
        let uri_builder = uri_builder("http://foo.com/", "http://bar.com")?;
        let repo = TestRepoBuilder::new().build()?;
        Ok(TestContextBuilder {
            fb,
            repo,
            uri_builder,
        })
    }
}
fn obj() -> Result<RequestObject, Error> {
Ok(RequestObject {
oid: LfsSha256::from_str(ONES_HASH)?,
@ -415,14 +482,6 @@ mod test {
Sha256::from_str(TWOS_HASH)
}
/// Builds a `UriBuilder` pointing at the given self/upstream URIs with a
/// fixed "repo123" repository name.
fn uri_builder(self_uri: &str, upstream_uri: &str) -> Result<UriBuilder, Error> {
let server = ServerUris::new(self_uri, Some(upstream_uri))?;
Ok(UriBuilder {
repository: "repo123".to_string(),
server: Arc::new(server),
})
}
#[test]
fn test_basic_upload_uri() -> Result<(), Error> {
let b = uri_builder("http://foo.com", "http://bar.com")?;

View File

@ -11,7 +11,7 @@ use bytes::Bytes;
use futures_old::{Async, Future, Poll};
use mercurial_types::FileBytes;
use redactedblobstore::ErrorKind;
use redactedblobstore::has_redaction_root_cause;
/// Tombstone string to replace the content of redacted files with
const REDACTED_CONTENT: &str =
@ -43,17 +43,11 @@ where
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(r)) => Ok(Async::Ready(r)),
Err(e) => {
let root_cause = e.root_cause();
let maybe_redacted = root_cause.downcast_ref::<ErrorKind>();
// If the error was caused by redaction, then return a tombstone instead of the
// content.
match maybe_redacted {
Some(ErrorKind::Censored(_, _)) => {
if has_redaction_root_cause(&e) {
let ret = (Bytes::new(), FileBytes(REDACTED_CONTENT.as_bytes().into()));
Ok(Async::Ready(ret))
}
None => Err(e),
} else {
Err(e)
}
}
}