Add ephemeral handle

Summary: Ephemeral handle is a blobstore that's built from a bubble and a "main blobstore", which first attempts to read from the ephemeral blobstore, but falls back to the main one. Will be used to read/write stuff in snapshots.

Reviewed By: liubov-dmitrieva

Differential Revision: D29733408

fbshipit-source-id: f15ae9d3009632cd71fafa88eac09986e0b958e7
This commit is contained in:
Yan Soares Couto 2021-07-22 13:46:01 -07:00 committed by Facebook GitHub Bot
parent 13bcc58555
commit 16fbc46f2c
5 changed files with 127 additions and 24 deletions

View File

@ -7,9 +7,11 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.45"
blobstore = { version = "0.1.0", path = ".." }
chrono = { version = "0.4", features = ["clock", "serde", "std"], default-features = false }
context = { version = "0.1.0", path = "../../server/context" }
derivative = "2.1"
facet = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
prefixblob = { version = "0.1.0", path = "../prefixblob" }

View File

@ -11,7 +11,7 @@ use std::fmt;
use std::sync::Arc;
use anyhow::Result;
use blobstore::{Blobstore, BlobstoreBytes, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreBytes, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::repo::{EPH_ID_PREFIX, EPH_ID_SUFFIX};
use mononoke_types::{DateTime, RepositoryId};
@ -20,6 +20,7 @@ use sql::mysql_async::prelude::{ConvIr, FromValue};
use sql::mysql_async::{FromValueError, Value};
use crate::error::EphemeralBlobstoreError;
use crate::handle::EphemeralHandle;
use crate::store::EphemeralBlobstore;
/// Ephemeral Blobstore Bubble ID.
@ -78,6 +79,7 @@ impl BubbleId {
/// An opened ephemeral blobstore bubble. This is a miniature blobstore
/// that stores blobs just for this ephemeral bubble in a particular repo.
#[derive(Debug, Clone)]
pub struct Bubble {
/// ID of the repository this bubble applies to.
repo_id: RepositoryId,
@ -97,6 +99,12 @@ pub struct Bubble {
ephemeral_blobstore: EphemeralBlobstore,
}
impl fmt::Display for Bubble {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Bubble({})<{}>", self.bubble_id, self.blobstore)
}
}
impl Bubble {
pub(crate) fn new(
repo_id: RepositoryId,
@ -137,26 +145,39 @@ impl Bubble {
self.bubble_id
}
pub fn get_handle(&self, main_blobstore: Arc<dyn Blobstore>) -> EphemeralHandle {
EphemeralHandle::new(self.clone(), main_blobstore)
}
pub async fn extend_lifespan(&self) -> Result<()> {
unimplemented!()
}
pub async fn get(&self, ctx: &CoreContext, key: &str) -> Result<Option<BlobstoreGetData>> {
pub(crate) async fn get(
&self,
ctx: &CoreContext,
key: &str,
) -> Result<Option<BlobstoreGetData>> {
self.check_unexpired()?;
self.blobstore.get(ctx, key).await
}
pub async fn put(&self, ctx: &CoreContext, key: &str, value: BlobstoreBytes) -> Result<()> {
pub(crate) async fn put(
&self,
ctx: &CoreContext,
key: String,
value: BlobstoreBytes,
) -> Result<()> {
self.check_unexpired()?;
self.blobstore.put(ctx, key.to_string(), value).await
self.blobstore.put(ctx, key, value).await
}
pub async fn is_present(
pub(crate) async fn is_present(
&self,
_ctx: &CoreContext,
_bubble_id: BubbleId,
_key: &str,
) -> Result<bool> {
unimplemented!()
ctx: &CoreContext,
key: &str,
) -> Result<BlobstoreIsPresent> {
self.check_unexpired()?;
self.blobstore.is_present(ctx, key).await
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::fmt;
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreBytes, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use crate::bubble::Bubble;
#[derive(Debug)]
pub struct EphemeralHandle {
bubble: Bubble,
main_blobstore: Arc<dyn Blobstore>,
}
impl EphemeralHandle {
pub(crate) fn new(bubble: Bubble, main_blobstore: Arc<dyn Blobstore>) -> Self {
Self {
bubble,
main_blobstore,
}
}
}
impl fmt::Display for EphemeralHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ReadOnlyBlobstore<{}, {}>",
self.bubble, self.main_blobstore
)
}
}
#[async_trait]
impl Blobstore for EphemeralHandle {
async fn get<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<Option<BlobstoreGetData>> {
Ok(match self.bubble.get(ctx, key).await? {
Some(content) => Some(content),
None => self.main_blobstore.get(ctx, key).await?,
})
}
async fn put<'a>(
&'a self,
ctx: &'a CoreContext,
key: String,
value: BlobstoreBytes,
) -> Result<()> {
self.bubble.put(ctx, key, value).await
}
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
Ok(match self.bubble.is_present(ctx, key).await? {
BlobstoreIsPresent::Absent | BlobstoreIsPresent::ProbablyNotPresent(_) => {
self.main_blobstore.is_present(ctx, key).await?
}
BlobstoreIsPresent::Present => BlobstoreIsPresent::Present,
})
}
}

View File

@ -15,11 +15,13 @@
mod bubble;
mod builder;
mod error;
mod handle;
mod repo;
mod store;
pub use crate::bubble::{Bubble, BubbleId};
pub use crate::builder::EphemeralBlobstoreBuilder;
pub use crate::error::EphemeralBlobstoreError;
pub use crate::handle::EphemeralHandle;
pub use crate::repo::{ArcRepoEphemeralBlobstore, RepoEphemeralBlobstore};
pub use crate::store::EphemeralBlobstore;

View File

@ -12,6 +12,7 @@ use std::sync::Arc;
use anyhow::Result;
use blobstore::Blobstore;
use chrono::Duration as ChronoDuration;
use derivative::Derivative;
use mononoke_types::{DateTime, RepositoryId, Timestamp};
use sql::queries;
use sql_ext::SqlConnections;
@ -21,10 +22,13 @@ use crate::error::EphemeralBlobstoreError;
use crate::repo::RepoEphemeralBlobstore;
/// Ephemeral Blobstore.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct EphemeralBlobstoreInner {
/// The backing blobstore where blobs are stored.
pub(crate) blobstore: Arc<dyn Blobstore>,
#[derivative(Debug = "ignore")]
/// Database used to manage the ephemeral blobstore.
pub(crate) connections: SqlConnections,
@ -40,7 +44,7 @@ pub struct EphemeralBlobstoreInner {
}
/// Ephemeral Blobstore.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct EphemeralBlobstore {
pub(crate) inner: Arc<EphemeralBlobstoreInner>,
}
@ -167,23 +171,20 @@ mod test {
ChronoDuration::days(30),
ChronoDuration::hours(6),
);
let key = "test_key".to_string();
// Create a bubble and put data in it.
let bubble1 = eph.create_bubble(REPO_ZERO).await?;
bubble1
.put(&ctx, "test_key", BlobstoreBytes::from_bytes("test data"))
.put(&ctx, key.clone(), BlobstoreBytes::from_bytes("test data"))
.await?;
let data = bubble1.get(&ctx, "test_key").await?.unwrap().into_bytes();
let data = bubble1.get(&ctx, &key).await?.unwrap().into_bytes();
assert_eq!(data.as_bytes().as_ref(), b"test data");
// Re-open the bubble and confirm we can read the data.
let bubble1_id = bubble1.bubble_id();
let bubble1_read = eph.open_bubble(REPO_ZERO, bubble1_id).await?;
let data = bubble1_read
.get(&ctx, "test_key")
.await?
.unwrap()
.into_bytes();
let data = bubble1_read.get(&ctx, &key).await?.unwrap().into_bytes();
assert_eq!(data.as_bytes().as_ref(), b"test data");
// Enumerate the blobstore and check the key got its prefix.
@ -192,7 +193,7 @@ mod test {
.await?;
assert_eq!(
enumerated.keys,
hashset! { format!("eph{}.repo0000.test_key", bubble1_id) }
hashset! { format!("eph{}.repo0000.{}", bubble1_id, key) }
);
// Create a new bubble and put data in it.
@ -200,14 +201,14 @@ mod test {
bubble2
.put(
&ctx,
"test_key",
key.clone(),
BlobstoreBytes::from_bytes("other test data"),
)
.await?;
let data = bubble2.get(&ctx, "test_key").await?.unwrap().into_bytes();
let data = bubble2.get(&ctx, &key).await?.unwrap().into_bytes();
assert_eq!(data.as_bytes().as_ref(), b"other test data");
let data = bubble1.get(&ctx, "test_key").await?.unwrap().into_bytes();
let data = bubble1.get(&ctx, &key).await?.unwrap().into_bytes();
assert_eq!(data.as_bytes().as_ref(), b"test data");
// There should now be two separate keys.
@ -218,8 +219,8 @@ mod test {
assert_eq!(
enumerated.keys,
hashset! {
format!("eph{}.repo0000.test_key", bubble1_id),
format!("eph{}.repo0000.test_key", bubble2_id),
format!("eph{}.repo0000.{}", bubble1_id, key),
format!("eph{}.repo0000.{}", bubble2_id, key),
}
);
Ok(())