newstore: Add FetchError type for strongly typed fetch errors

Summary: Introduce `FetchError` type, with separates "not found" errors (which require a key), from other errors (which may or may not have an associated key). This allows us to easily fall back for only "not found" errors propagate other fetch errors to the caller. The current version of `FetchError` still does not eliminate redundantly storing the key for cases like EdenApi, where the key may be part of the error type itself. This optimization may not be worthwhile after we refactor these store implementations to use `HgId` instead of `Key`.

Reviewed By: kulshrax

Differential Revision: D26410215

fbshipit-source-id: e6198d54de64b41ff696cabd64affc8dbaa41cf9
This commit is contained in:
Meyer Jacobs 2021-02-19 10:39:21 -08:00 committed by Facebook GitHub Bot
parent 62c0be0bfd
commit c74570a4b6
5 changed files with 84 additions and 57 deletions

View File

@ -11,7 +11,6 @@ use std::{
sync::Arc, sync::Arc,
}; };
use anyhow::anyhow;
use anyhow::{bail, ensure, Result}; use anyhow::{bail, ensure, Result};
use async_trait::async_trait; use async_trait::async_trait;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
@ -30,7 +29,9 @@ use crate::{
datastore::{Delta, HgIdDataStore, HgIdMutableDeltaStore, Metadata, StoreResult}, datastore::{Delta, HgIdDataStore, HgIdMutableDeltaStore, Metadata, StoreResult},
indexedlogutil::{Store, StoreOpenOptions}, indexedlogutil::{Store, StoreOpenOptions},
localstore::{ExtStoredPolicy, LocalStore}, localstore::{ExtStoredPolicy, LocalStore},
newstore::{FetchStream, KeyStream, ReadStore, WriteResults, WriteStore, WriteStream}, newstore::{
FetchError, FetchStream, KeyStream, ReadStore, WriteResults, WriteStore, WriteStream,
},
repack::ToKeys, repack::ToKeys,
sliceext::SliceExt, sliceext::SliceExt,
types::StoreKey, types::StoreKey,
@ -279,17 +280,16 @@ impl ReadStore<Key, Entry> for IndexedLogHgIdDataStore {
spawn_blocking(move || { spawn_blocking(move || {
let inner = self_.inner.read(); let inner = self_.inner.read();
match Entry::from_log(&key, &inner.log) { match Entry::from_log(&key, &inner.log) {
// TODO(meyer): NotFound error should be strongly typed. Ok(None) => Err(FetchError::not_found(key.clone())),
Ok(None) => Err((Some(key.clone()), anyhow!("key not found in indexedlog"))),
Ok(Some(entry)) => Ok(entry), Ok(Some(entry)) => Ok(entry),
Err(e) => Err((Some(key.clone()), e)), Err(e) => Err(FetchError::with_key(key.clone(), e)),
} }
}) })
.map(move |spawn_res| { .map(move |spawn_res| {
match spawn_res { match spawn_res {
Ok(Ok(entry)) => Ok(entry), Ok(Ok(entry)) => Ok(entry),
Ok(Err(e)) => Err(e), Ok(Err(e)) => Err(e),
Err(e) => Err((Some(key_), e.into())), Err(e) => Err(FetchError::with_key(key_, e)),
} }
}) })
})) }))

View File

@ -8,7 +8,6 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::Error;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use futures_batch::ChunksTimeoutStreamExt; use futures_batch::ChunksTimeoutStreamExt;
@ -17,7 +16,7 @@ use edenapi::EdenApi;
use edenapi_types::{FileEntry, TreeAttributes, TreeEntry}; use edenapi_types::{FileEntry, TreeAttributes, TreeEntry};
use types::Key; use types::Key;
use crate::newstore::{fetch_error, FetchStream, KeyStream, ReadStore}; use crate::newstore::{fetch_error, FetchError, FetchStream, KeyStream, ReadStore};
// TODO(meyer): These should be configurable // TODO(meyer): These should be configurable
// EdenApi's API is batch-based and async, and it will split a large batch into multiple requests to send in parallel // EdenApi's API is batch-based and async, and it will split a large batch into multiple requests to send in parallel
@ -55,10 +54,11 @@ where
.map_or_else(fetch_error, |s| { .map_or_else(fetch_error, |s| {
Box::pin(s.entries.map(|v| match v { Box::pin(s.entries.map(|v| match v {
Ok(Ok(v)) => Ok(v), Ok(Ok(v)) => Ok(v),
// TODO: We could eliminate this redundant key clone with a FetchError trait. // TODO: Separate out NotFound errors from EdenApi
Ok(Err(e)) => Err((e.key.clone(), Error::new(e))), // TODO: We could eliminate this redundant key clone with a trait, I think.
Ok(Err(e)) => Err(FetchError::maybe_with_key(e.key.clone(), e)),
// TODO: What should happen when an entire batch fails? // TODO: What should happen when an entire batch fails?
Err(e) => Err((None, Error::new(e))), Err(e) => Err(FetchError::from(e)),
})) as FetchStream<Key, TreeEntry> })) as FetchStream<Key, TreeEntry>
}) })
} }
@ -84,7 +84,8 @@ where
.files(self_.repo.clone(), keys, None) .files(self_.repo.clone(), keys, None)
.await .await
.map_or_else(fetch_error, |s| { .map_or_else(fetch_error, |s| {
Box::pin(s.entries.map(|v| v.map_err(|e| (None, Error::new(e))))) // TODO: Add per-item errors to EdenApi `files`
Box::pin(s.entries.map(|v| v.map_err(FetchError::from)))
as FetchStream<Key, FileEntry> as FetchStream<Key, FileEntry>
}) })
} }

View File

@ -6,16 +6,18 @@
*/ */
use std::convert::From; use std::convert::From;
use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Error;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{channel::mpsc::channel, SinkExt, StreamExt, TryStreamExt}; use futures::{channel::mpsc::channel, SinkExt, StreamExt, TryStreamExt};
use tracing::error; use tracing::error;
use streams::select_drop; use streams::select_drop;
use crate::newstore::{BoxedReadStore, BoxedWriteStore, FetchStream, KeyStream, ReadStore}; use crate::newstore::{
BoxedReadStore, BoxedWriteStore, FetchError, FetchStream, KeyStream, ReadStore,
};
/// A combinator which queries a preferred store, then falls back to a fallback store /// A combinator which queries a preferred store, then falls back to a fallback store
/// if a key is not found in the preferred store. /// if a key is not found in the preferred store.
@ -41,7 +43,7 @@ const CHANNEL_BUFFER: usize = 200;
#[async_trait] #[async_trait]
impl<K, VP, VF> ReadStore<K, VP> for FallbackStore<K, VP, VF> impl<K, VP, VF> ReadStore<K, VP> for FallbackStore<K, VP, VF>
where where
K: Send + Sync + Clone + Unpin + 'static, K: fmt::Display + fmt::Debug + Send + Sync + Clone + Unpin + 'static,
VF: Send + Sync + 'static, VF: Send + Sync + 'static,
VP: Send + Sync + Clone + From<VF> + 'static, VP: Send + Sync + Clone + From<VF> + 'static,
{ {
@ -57,19 +59,17 @@ where
.filter_map(move |res| { .filter_map(move |res| {
let mut sender = sender.clone(); let mut sender = sender.clone();
async move { async move {
use FetchError::*;
match res { match res {
Ok(v) => Some(Ok(v)), Ok(v) => Some(Ok(v)),
Err((None, e)) => Some(Err((None, e))), // TODO(meyer): Looks like we aren't up to date with futures crate, missing "feed" method, which is probably better here.
Err((Some(k), _e)) => { // I think this might serialize the fallback stream as-written.
// TODO(meyer): We should really only fallback for "not found" errors, and pass through others. Err(NotFound(k)) => match sender.send(k.clone()).await {
// Otherwise swallowing this error here could lose us valuable information. Ok(()) => None,
// TODO(meyer): Looks like we aren't up to date with futures crate, missing "feed" method, which is probably better here. Err(e) => Some(Err(FetchError::with_key(k, e))),
// I think this might serialize the fallback stream as-written. },
match sender.send(k.clone()).await { // TODO(meyer): Should we also fall back on KeyedError, but also log an error?
Ok(()) => None, Err(e) => Some(Err(e)),
Err(e) => Some(Err((Some(k), e.into()))),
}
}
} }
} }
}); });
@ -104,9 +104,7 @@ where
.write_stream(Box::pin(write_receiver)) .write_stream(Box::pin(write_receiver))
.await .await
// TODO(meyer): Don't swallow all write errors here. // TODO(meyer): Don't swallow all write errors here.
.filter_map(|_res| { .filter_map(|_res| futures::future::ready(None));
futures::future::ready(Option::<Result<VP, (Option<K>, Error)>>::None)
});
// TODO(meyer): Implement `select_all_drop` if we continue with this approach // TODO(meyer): Implement `select_all_drop` if we continue with this approach
Box::pin(select_drop( Box::pin(select_drop(

View File

@ -12,7 +12,6 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::{anyhow, Error};
use async_trait::async_trait; use async_trait::async_trait;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
@ -22,7 +21,9 @@ use types::Key;
use crate::{ use crate::{
datastore::{Delta, HgIdDataStore, HgIdMutableDeltaStore, StoreResult}, datastore::{Delta, HgIdDataStore, HgIdMutableDeltaStore, StoreResult},
indexedlogdatastore::Entry, indexedlogdatastore::Entry,
newstore::{FetchStream, KeyStream, ReadStore, WriteResults, WriteStore, WriteStream}, newstore::{
FetchError, FetchStream, KeyStream, ReadStore, WriteResults, WriteStore, WriteStream,
},
types::StoreKey, types::StoreKey,
}; };
@ -37,30 +38,20 @@ where
Box::pin(keys.then(move |key| { Box::pin(keys.then(move |key| {
let self_ = self.clone(); let self_ = self.clone();
let key_ = key.clone(); let key_ = key.clone();
spawn_blocking(move || -> Result<Entry, (Option<Key>, Error)> { spawn_blocking(move || {
use StoreResult::*; use StoreResult::*;
let key = key_; let key = key_;
let store_key = StoreKey::HgId(key.clone()); let store_key = StoreKey::HgId(key.clone());
let blob = match self_.0.get(store_key.clone()) { let blob = match self_.0.get(store_key.clone()) {
Ok(Found(v)) => v.into(), Ok(Found(v)) => Ok(v.into()),
// TODO: Add type-safe "not found" to new storage traits. We now have two sets Ok(NotFound(_k)) => Err(FetchError::not_found(key.clone())),
// of adapters that support this being downgraded to non-type-safe "not found". Err(e) => Err(FetchError::with_key(key.clone(), e)),
Ok(NotFound(_k)) => { }?;
return Err((Some(key.clone()), anyhow!("key not found")));
}
Err(e) => {
return Err((Some(key.clone()), e));
}
};
let meta = match self_.0.get_meta(store_key) { let meta = match self_.0.get_meta(store_key) {
Ok(Found(v)) => v, Ok(Found(v)) => Ok(v),
Ok(NotFound(_k)) => { Ok(NotFound(_k)) => Err(FetchError::not_found(key.clone())),
return Err((Some(key.clone()), anyhow!("key not found"))); Err(e) => Err(FetchError::with_key(key.clone(), e)),
} }?;
Err(e) => {
return Err((Some(key.clone()), e));
}
};
Ok(Entry::new(key, blob, meta)) Ok(Entry::new(key, blob, meta))
}) })
@ -68,7 +59,7 @@ where
match spawn_res { match spawn_res {
Ok(Ok(entry)) => Ok(entry), Ok(Ok(entry)) => Ok(entry),
Ok(Err(e)) => Err(e), Ok(Err(e)) => Err(e),
Err(e) => Err((Some(key), e.into())), Err(e) => Err(FetchError::with_key(key, e)),
} }
}) })
})) }))

View File

@ -5,7 +5,7 @@
* GNU General Public License version 2. * GNU General Public License version 2.
*/ */
use std::sync::Arc; use std::{fmt, sync::Arc};
use anyhow::Error; use anyhow::Error;
use async_trait::async_trait; use async_trait::async_trait;
@ -13,6 +13,7 @@ use futures::{
future, future,
stream::{self, BoxStream}, stream::{self, BoxStream},
}; };
use thiserror::Error;
pub mod edenapi; pub mod edenapi;
pub mod fallback; pub mod fallback;
@ -22,7 +23,7 @@ pub mod legacy;
pub type KeyStream<K> = BoxStream<'static, K>; pub type KeyStream<K> = BoxStream<'static, K>;
/// A pinned, boxed stream of (fallible) fetched values /// A pinned, boxed stream of (fallible) fetched values
pub type FetchStream<K, V> = BoxStream<'static, Result<V, (Option<K>, Error)>>; pub type FetchStream<K, V> = BoxStream<'static, Result<V, FetchError<K>>>;
/// A boxed, object-safe ReadStore trait object for a given key and value type. /// A boxed, object-safe ReadStore trait object for a given key and value type.
pub type BoxedReadStore<K, V> = Arc<dyn ReadStore<K, V>>; pub type BoxedReadStore<K, V> = Arc<dyn ReadStore<K, V>>;
@ -33,20 +34,56 @@ pub type WriteResults<K> = BoxStream<'static, Result<K, (Option<K>, Error)>>;
pub type BoxedWriteStore<K, V> = Arc<dyn WriteStore<K, V>>; pub type BoxedWriteStore<K, V> = Arc<dyn WriteStore<K, V>>;
#[derive(Debug, Error)]
pub enum FetchError<K: fmt::Debug + fmt::Display> {
#[error("failed to fetch key '{0}': key not found")]
NotFound(K),
#[error("failed to fetch key '{0}': {1}")]
KeyedError(K, Error),
#[error(transparent)]
Other(#[from] Error),
}
impl<K> FetchError<K>
where
K: fmt::Debug + fmt::Display,
{
pub fn not_found(key: K) -> Self {
FetchError::NotFound(key)
}
pub fn with_key(key: K, err: impl Into<Error>) -> Self {
FetchError::KeyedError(key, err.into())
}
pub fn maybe_with_key(maybe_key: Option<K>, err: impl Into<Error>) -> Self {
match maybe_key {
Some(key) => FetchError::KeyedError(key, err.into()),
None => FetchError::Other(err.into()),
}
}
pub fn from(err: impl Into<Error>) -> Self {
FetchError::Other(err.into())
}
}
/// Transform an error into a single-item FetchStream /// Transform an error into a single-item FetchStream
pub fn fetch_error<K, V, E>(e: E) -> FetchStream<K, V> pub fn fetch_error<K, V, E>(e: E) -> FetchStream<K, V>
where where
E: std::error::Error + Send + Sync + 'static, E: Into<Error> + Send + Sync + 'static,
K: Send + Sync + 'static, K: fmt::Display + fmt::Debug + Send + Sync + 'static,
V: Send + Sync + 'static, V: Send + Sync + 'static,
{ {
Box::pin(stream::once(future::ready(Err((None, Error::new(e)))))) Box::pin(stream::once(future::ready(Err(FetchError::from(e)))))
} }
// TODO: Add attributes support // TODO: Add attributes support
/// A typed, async key-value storage API /// A typed, async key-value storage API
#[async_trait] #[async_trait]
pub trait ReadStore<K: Send + Sync + 'static, V: Send + Sync + 'static>: pub trait ReadStore<K: fmt::Display + fmt::Debug + Send + Sync + 'static, V: Send + Sync + 'static>:
Send + Sync + 'static Send + Sync + 'static
{ {
/// Map a stream of keys to a stream of values by fetching from the underlying store /// Map a stream of keys to a stream of values by fetching from the underlying store