mononoke: add blobstore link trait

Summary: Add blobstore link trait so we can use hardlink style links in fileblob and memblob for testing and later sqlblob et al for prod.

Reviewed By: StanislavGlebik

Differential Revision: D21935647

fbshipit-source-id: f76eaca26b6b226c77d6e39e9c64e02b4145b614
This commit is contained in:
Alex Hornby 2020-06-17 02:31:22 -07:00 committed by Facebook GitHub Bot
parent 623bd3aa17
commit b21dca37b0
6 changed files with 198 additions and 50 deletions

View File

@ -8,12 +8,16 @@
#![deny(warnings)]
use std::convert::TryFrom;
use std::fs::{create_dir_all, File};
use std::fs::{create_dir_all, hard_link, File};
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use anyhow::{bail, Error, Result};
use futures::{
future::{self, BoxFuture},
FutureExt, TryFutureExt,
};
use futures_ext::{BoxFuture as BoxFuture01, FutureExt as FutureExt01};
use futures_old::{
future::{poll_fn as poll_fn01, Future as Future01},
@ -21,7 +25,7 @@ use futures_old::{
};
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreMetadata};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreMetadata, BlobstoreWithLink};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use tempfile::NamedTempFile;
@ -120,3 +124,21 @@ impl Blobstore for Fileblob {
.boxify()
}
}
impl BlobstoreWithLink for Fileblob {
// This uses hardlink semantics as the production blobstores also have hardlink like semantics
// (i.e. you can't discover a canonical link source when loading by the target)
fn link(
&self,
_ctx: CoreContext,
existing_key: String,
link_key: String,
) -> BoxFuture<'static, Result<(), Error>> {
// from std::fs::hard_link: The dst path will be a link pointing to the src path
let src_path = self.path(&existing_key);
let dst_path = self.path(&link_key);
future::ready(hard_link(src_path, dst_path))
.map_err(Error::from)
.boxed()
}
}

View File

@ -12,4 +12,5 @@ context = { path = "../../server/context" }
mononoke_types = { path = "../../mononoke_types" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
futures = { version = "0.3", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }

View File

@ -9,64 +9,111 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use anyhow::Error;
use anyhow::{format_err, Error};
use futures::{
future::{self, lazy, BoxFuture},
FutureExt,
};
use futures_ext::{BoxFuture as BoxFuture01, FutureExt as FutureExt01};
use futures_old::future::{lazy as lazy01, IntoFuture as IntoFuture01};
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreWithLink};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
// Implements hardlink-style links
#[derive(Default, Debug)]
struct MemState {
next_id: usize,
data: HashMap<usize, BlobstoreBytes>,
links: HashMap<String, usize>,
}
impl MemState {
fn put(&mut self, key: String, value: BlobstoreBytes) {
let id = self.next_id;
self.data.insert(id, value);
self.links.insert(key, id);
self.next_id += 1;
}
fn link(&mut self, existing_key: String, link_key: String) -> Result<(), Error> {
if let Some(existing_id) = self.links.get(&existing_key) {
let existing_id = *existing_id;
self.links.insert(link_key, existing_id);
return Ok(());
}
Err(format_err!("Unknown existing_key {}", existing_key))
}
fn get(&self, key: &str) -> Option<&BlobstoreBytes> {
if let Some(id) = self.links.get(key) {
self.data.get(id)
} else {
None
}
}
fn unlink(&mut self, key: &str) -> Option<()> {
self.links.remove(key).map(|_| ())
}
}
/// In-memory "blob store"
///
/// Pure in-memory implementation for testing.
#[derive(Clone)]
pub struct EagerMemblob {
hash: Arc<Mutex<HashMap<String, BlobstoreBytes>>>,
state: Arc<Mutex<MemState>>,
}
/// As EagerMemblob, but methods are lazy - they wait until polled to do anything.
#[derive(Clone)]
pub struct LazyMemblob {
hash: Arc<Mutex<HashMap<String, BlobstoreBytes>>>,
state: Arc<Mutex<MemState>>,
}
impl EagerMemblob {
pub fn new() -> Self {
Self {
hash: Arc::new(Mutex::new(HashMap::new())),
state: Arc::new(Mutex::new(MemState::default())),
}
}
pub fn remove(&self, key: &String) -> Option<BlobstoreBytes> {
let mut inner = self.hash.lock().expect("lock poison");
inner.remove(key)
pub fn unlink(&self, key: String) -> BoxFuture<'static, Result<Option<()>, Error>> {
let mut inner = self.state.lock().expect("lock poison");
future::ok(inner.unlink(&key)).boxed()
}
}
impl LazyMemblob {
pub fn new() -> Self {
Self {
hash: Arc::new(Mutex::new(HashMap::new())),
state: Arc::new(Mutex::new(MemState::default())),
}
}
pub fn remove(&self, key: &String) -> Option<BlobstoreBytes> {
let mut inner = self.hash.lock().expect("lock poison");
inner.remove(key)
pub fn unlink(&self, key: String) -> BoxFuture<'static, Result<Option<()>, Error>> {
let state = self.state.clone();
lazy(move |_| {
let mut inner = state.lock().expect("lock poison");
Ok(inner.unlink(&key))
})
.boxed()
}
}
impl Blobstore for EagerMemblob {
fn put(&self, _ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture01<(), Error> {
let mut inner = self.hash.lock().expect("lock poison");
let mut inner = self.state.lock().expect("lock poison");
inner.insert(key, value);
inner.put(key, value);
Ok(()).into_future().boxify()
}
fn get(&self, _ctx: CoreContext, key: String) -> BoxFuture01<Option<BlobstoreGetData>, Error> {
let inner = self.hash.lock().expect("lock poison");
let inner = self.state.lock().expect("lock poison");
Ok(inner.get(&key).map(|blob_ref| blob_ref.clone().into()))
.into_future()
@ -74,40 +121,63 @@ impl Blobstore for EagerMemblob {
}
}
impl BlobstoreWithLink for EagerMemblob {
fn link(
&self,
_ctx: CoreContext,
existing_key: String,
link_key: String,
) -> BoxFuture<'static, Result<(), Error>> {
let mut inner = self.state.lock().expect("lock poison");
future::ready(inner.link(existing_key, link_key)).boxed()
}
}
impl Blobstore for LazyMemblob {
fn put(&self, _ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture01<(), Error> {
let hash = self.hash.clone();
let state = self.state.clone();
lazy01(move || {
let mut inner = hash.lock().expect("lock poison");
let mut inner = state.lock().expect("lock poison");
inner.insert(key, value);
inner.put(key, value);
Ok(()).into_future()
})
.boxify()
}
fn get(&self, _ctx: CoreContext, key: String) -> BoxFuture01<Option<BlobstoreGetData>, Error> {
let hash = self.hash.clone();
let state = self.state.clone();
lazy01(move || {
let inner = hash.lock().expect("lock poison");
let inner = state.lock().expect("lock poison");
Ok(inner.get(&key).map(|bytes| bytes.clone().into())).into_future()
})
.boxify()
}
}
impl BlobstoreWithLink for LazyMemblob {
fn link(
&self,
_ctx: CoreContext,
existing_key: String,
link_key: String,
) -> BoxFuture<'static, Result<(), Error>> {
let state = self.state.clone();
lazy(move |_| {
let mut inner = state.lock().expect("lock poison");
inner.link(existing_key, link_key)
})
.boxed()
}
}
impl fmt::Debug for EagerMemblob {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EagerMemblob")
.field(
"hash",
&format!(
"({} entries)",
self.hash.lock().expect("lock poisoned").len()
),
)
.field("state", &self.state.lock().expect("lock poisoned"))
.finish()
}
}
@ -115,13 +185,7 @@ impl fmt::Debug for EagerMemblob {
impl fmt::Debug for LazyMemblob {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LazyMemblob")
.field(
"hash",
&format!(
"({} entries)",
self.hash.lock().expect("lock poisoned").len()
),
)
.field("state", &self.state.lock().expect("lock poisoned"))
.finish()
}
}

View File

@ -13,6 +13,7 @@ use std::fmt;
use abomonation_derive::Abomonation;
use anyhow::Error;
use futures::future::BoxFuture;
use futures_ext::{BoxFuture as BoxFuture01, FutureExt as FutureExt01};
use futures_old::future::{self as future01, Future as Future01};
use thiserror::Error;
@ -277,6 +278,17 @@ pub trait Blobstore: fmt::Debug + Send + Sync + 'static {
}
}
/// Mixin trait for blobstores that support the `link()` operation
#[auto_impl(Arc, Box)]
pub trait BlobstoreWithLink: Blobstore {
fn link(
&self,
ctx: CoreContext,
existing_key: String,
link_key: String,
) -> BoxFuture<'static, Result<(), Error>>;
}
#[derive(Debug, Error)]
pub enum LoadableError {
#[error("Blobstore error")]

View File

@ -18,31 +18,64 @@ use fbinit::FacebookInit;
use futures::compat::Future01CompatExt;
use tempdir::TempDir;
use blobstore::Blobstore;
use blobstore::{Blobstore, BlobstoreWithLink};
use context::CoreContext;
use fileblob::Fileblob;
use memblob::{EagerMemblob, LazyMemblob};
use mononoke_types::BlobstoreBytes;
async fn round_trip<B: Blobstore>(
async fn roundtrip_and_link<B: BlobstoreWithLink>(
fb: FacebookInit,
blobstore: B,
has_ctime: bool,
) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let key = "foo".to_string();
let value = BlobstoreBytes::from_bytes(&b"bar"[..]);
let key = "randomkey".to_string();
let value = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(b"appleveldata"));
// Roundtrip
blobstore
.put(ctx.clone(), key.clone(), value)
.put(ctx.clone(), key.clone(), value.clone())
.compat()
.await?;
let out = blobstore.get(ctx, key).compat().await?.unwrap();
let roundtrip = blobstore
.get(ctx.clone(), key.clone())
.compat()
.await?
.unwrap();
let orig_ctime = roundtrip.as_meta().as_ctime().clone();
assert_eq!(orig_ctime.is_some(), has_ctime);
assert_eq!(value, roundtrip.into_bytes());
let newkey = "newkey".to_string();
// And now the link
blobstore
.link(ctx.clone(), key.clone(), newkey.clone())
.await?;
let newvalue = blobstore
.get(ctx.clone(), newkey.clone())
.compat()
.await?
.unwrap();
let new_ctime = newvalue.as_meta().as_ctime().clone();
assert_eq!(new_ctime.is_some(), has_ctime);
assert_eq!(orig_ctime, new_ctime);
assert_eq!(value, newvalue.into_bytes());
let newkey_is_present = blobstore
.is_present(ctx.clone(), newkey.clone())
.compat()
.await?;
assert!(newkey_is_present);
assert_eq!(out.clone().into_raw_bytes(), Bytes::from_static(b"bar"));
assert_eq!(out.as_meta().as_ctime().is_some(), has_ctime);
Ok(())
}
@ -67,11 +100,11 @@ macro_rules! blobstore_test_impl {
use super::*;
#[fbinit::compat_test]
async fn test_round_trip(fb: FacebookInit) -> Result<(), Error> {
async fn test_roundtrip_and_link(fb: FacebookInit) -> Result<(), Error> {
let state = $state;
let has_ctime = $has_ctime;
let factory = $new_cb;
round_trip(fb, factory(state.clone())?, has_ctime).await
roundtrip_and_link(fb, factory(state.clone())?, has_ctime).await
}
#[fbinit::compat_test]

View File

@ -350,7 +350,11 @@ async fn filestore_chunk_not_found(fb: FacebookInit) -> Result<()> {
.compat()
.await?;
assert!(blob.remove(&part_id.blobstore_key()).is_some());
assert!(blob
.unlink(part_id.blobstore_key())
.await
.unwrap()
.is_some());
// This should fail
let res = filestore::fetch_concat_opt(&blob, ctx, &FetchKey::Canonical(content_id))
@ -696,7 +700,11 @@ async fn filestore_rebuild_metadata(fb: FacebookInit) -> Result<()> {
.await?;
// Remove the metadata
assert!(blob.remove(&metadata.blobstore_key()).is_some());
assert!(blob
.unlink(metadata.blobstore_key())
.await
.unwrap()
.is_some());
// Getting the metadata should cause it to get recomputed
let fut: OldBoxFuture<_, _> =
@ -706,7 +714,11 @@ async fn filestore_rebuild_metadata(fb: FacebookInit) -> Result<()> {
assert_eq!(res?, expected);
// Now, delete the content (this shouldn't normally happen, but we're injecting failure here).
assert!(blob.remove(&content_id.blobstore_key()).is_some());
assert!(blob
.unlink(content_id.blobstore_key())
.await
.unwrap()
.is_some());
// Query the metadata again. It should succeed because it's saved.
let fut: OldBoxFuture<_, _> =
@ -716,7 +728,11 @@ async fn filestore_rebuild_metadata(fb: FacebookInit) -> Result<()> {
assert_eq!(res?, expected);
// Delete the metadata now.
assert!(blob.remove(&metadata.blobstore_key()).is_some());
assert!(blob
.unlink(metadata.blobstore_key())
.await
.unwrap()
.is_some());
// And then, query it again. This should now return None, because the metadata isn't there,
// and we can't recreate it.