Fully asyncify blobstore sync queue

Summary: Move the `BlobstoreSyncQueue` trait from methods that return `'static` `BoxFuture`s to `async_trait` methods that borrow their arguments with explicit lifetimes

Reviewed By: markbt

Differential Revision: D22927171

fbshipit-source-id: 637a983fa6fa91d4cd1e73d822340cb08647c57d
This commit is contained in:
Simon Farnsworth 2020-08-05 15:39:36 -07:00 committed by Facebook GitHub Bot
parent 51bca90726
commit 0c3fe9b20f
10 changed files with 218 additions and 237 deletions

View File

@ -75,7 +75,7 @@ impl MultiplexedBlobstorePutHandler for QueueBlobstorePutHandler {
key: &'out str,
) -> BoxFuture<'out, Result<(), Error>> {
self.queue.add(
ctx.clone(),
ctx,
BlobstoreSyncQueueEntry::new(
key.to_string(),
blobstore_id,
@ -109,7 +109,7 @@ impl Blobstore for MultiplexedBlobstore {
// Check the synchronization queue. If it does not contain entries with this key,
// the result is a true None; otherwise, the only replica containing the key has
// failed and we need to return an error.
let entries = queue.get(ctx, key).await?;
let entries = queue.get(&ctx, &key).await?;
if entries.is_empty() {
Ok(None)
} else {
@ -142,7 +142,7 @@ impl Blobstore for MultiplexedBlobstore {
if let Some(ErrorKind::AllFailed(_)) = error.downcast_ref() {
return Err(error);
}
let entries = queue.get(ctx, key).await?;
let entries = queue.get(&ctx, &key).await?;
if entries.is_empty() {
Ok(false)
} else {

View File

@ -171,7 +171,7 @@ async fn blobstore_get(
ErrorKind::SomeFailedOthersNone(_) => {
// MultiplexedBlobstore returns Ok(None) here if queue is empty for the key
// and Error otherwise. Scrub does likewise.
let entries = queue.get(ctx.clone(), key).await?;
let entries = queue.get(ctx, &key).await?;
if entries.is_empty() {
// No pending write for the key, it really is None
Ok(None)
@ -181,7 +181,7 @@ async fn blobstore_get(
}
}
ErrorKind::SomeMissingItem(missing_reads, Some(value)) => {
let entries = queue.get(ctx.clone(), key.clone()).await?;
let entries = queue.get(ctx, &key).await?;
let mut needs_repair: HashMap<BlobstoreId, &dyn Blobstore> = HashMap::new();
for k in missing_reads.iter() {

View File

@ -242,7 +242,7 @@ async fn scrub_blobstore_fetch_none(fb: FacebookInit) -> Result<(), Error> {
id: None,
operation_key: OperationKey::gen(),
};
queue.add(ctx.clone(), entry).await?;
queue.add(&ctx, entry).await?;
fut.await?;
@ -441,7 +441,7 @@ async fn multiplexed(fb: FacebookInit) {
put_fut.await.expect("case 2 put_fut failed");
match queue
.get(ctx.clone(), k1.clone())
.get(&ctx, &k1)
.await
.expect("case 2 get failed")
.as_slice()
@ -506,7 +506,7 @@ async fn multiplexed_operation_keys(fb: FacebookInit) -> Result<(), Error> {
.expect("test multiplexed_operation_keys, put failed");
match queue
.get(ctx.clone(), k3.clone())
.get(&ctx, &k3)
.await
.expect("test multiplexed_operation_keys, get failed")
.as_slice()
@ -571,7 +571,7 @@ async fn scrubbed(fb: FacebookInit) {
bs1.tick(Some("bs1 failed"));
put_fut.await.unwrap();
match queue.get(ctx.clone(), k1.clone()).await.unwrap().as_slice() {
match queue.get(&ctx, &k1).await.unwrap().as_slice() {
[entry] => assert_eq!(entry.blobstore_id, bid0, "Queue bad"),
_ => panic!("only one entry expected"),
}
@ -642,11 +642,11 @@ async fn scrubbed(fb: FacebookInit) {
let k1 = String::from("k1");
let v1 = make_value("v1");
match queue.get(ctx.clone(), k1.clone()).await.unwrap().as_slice() {
match queue.get(&ctx, &k1).await.unwrap().as_slice() {
[entry] => {
assert_eq!(entry.blobstore_id, bid0, "Queue bad");
queue
.del(ctx.clone(), vec![entry.clone()])
.del(&ctx, &vec![entry.clone()])
.await
.expect("Could not delete scrub queue entry");
}

View File

@ -23,6 +23,7 @@ cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch
sql = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
auto_impl = "0.4"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
tokio = { version = "=0.2.13", features = ["full"] }

View File

@ -8,6 +8,7 @@
#![deny(warnings)]
use anyhow::{format_err, Error};
use async_trait::async_trait;
use auto_impl::auto_impl;
use cloned::cloned;
use context::CoreContext;
@ -112,21 +113,22 @@ impl BlobstoreSyncQueueEntry {
}
}
#[async_trait]
#[auto_impl(Arc, Box)]
pub trait BlobstoreSyncQueue: Send + Sync {
fn add(
&self,
ctx: CoreContext,
async fn add<'a>(
&'a self,
ctx: &'a CoreContext,
entry: BlobstoreSyncQueueEntry,
) -> BoxFuture<'static, Result<(), Error>> {
self.add_many(ctx, Box::new(vec![entry].into_iter()))
) -> Result<(), Error> {
self.add_many(ctx, vec![entry]).await
}
fn add_many(
&self,
ctx: CoreContext,
entries: Box<dyn Iterator<Item = BlobstoreSyncQueueEntry> + Send>,
) -> BoxFuture<'static, Result<(), Error>>;
async fn add_many<'a>(
&'a self,
ctx: &'a CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> Result<(), Error>;
/// Returns list of entries that consist of two groups of entries:
/// 1. Group with at most `limit` entries that are older than `older_than` and
@ -136,26 +138,26 @@ pub trait BlobstoreSyncQueue: Send + Sync {
/// As a result the caller gets a reasonably limited slice of BlobstoreSyncQueue entries that
/// are all related, so that the caller doesn't need to fetch more data from BlobstoreSyncQueue
/// to process the sync queue.
fn iter(
&self,
ctx: CoreContext,
key_like: Option<String>,
async fn iter<'a>(
&'a self,
ctx: &'a CoreContext,
key_like: Option<&'a String>,
multiplex_id: MultiplexId,
older_than: DateTime,
limit: usize,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>>;
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error>;
fn del(
&self,
ctx: CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> BoxFuture<'static, Result<(), Error>>;
async fn del<'a>(
&'a self,
ctx: &'a CoreContext,
entries: &'a [BlobstoreSyncQueueEntry],
) -> Result<(), Error>;
fn get(
&self,
ctx: CoreContext,
key: String,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>>;
async fn get<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a String,
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error>;
}
#[derive(Clone)]
@ -327,144 +329,133 @@ async fn insert_entries(
Ok(())
}
#[async_trait]
impl BlobstoreSyncQueue for SqlBlobstoreSyncQueue {
fn add_many(
&self,
_ctx: CoreContext,
entries: Box<dyn Iterator<Item = BlobstoreSyncQueueEntry> + Send>,
) -> BoxFuture<'static, Result<(), Error>> {
cloned!(self.write_sender, self.ensure_worker_scheduled);
async move {
ensure_worker_scheduled.await;
let (senders_entries, receivers): (Vec<_>, Vec<_>) = entries
.map(|entry| {
let (sender, receiver) = oneshot::channel();
((sender, entry), receiver)
})
.unzip();
async fn add_many<'a>(
&'a self,
_ctx: &'a CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> Result<(), Error> {
self.ensure_worker_scheduled.clone().await;
let (senders_entries, receivers): (Vec<_>, Vec<_>) = entries
.into_iter()
.map(|entry| {
let (sender, receiver) = oneshot::channel();
((sender, entry), receiver)
})
.unzip();
STATS::adds.add_value(senders_entries.len() as i64);
senders_entries
.into_iter()
.map(|(send, entry)| write_sender.unbounded_send((send, entry)))
.collect::<Result<_, _>>()?;
let results = future::try_join_all(receivers)
.map_err(|errs| format_err!("failed to receive result {:?}", errs))
.await?;
let errs: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
if errs.len() > 0 {
Err(format_err!("failed to receive result {:?}", errs))
} else {
Ok(())
}
STATS::adds.add_value(senders_entries.len() as i64);
senders_entries
.into_iter()
.map(|(send, entry)| self.write_sender.unbounded_send((send, entry)))
.collect::<Result<_, _>>()?;
let results = future::try_join_all(receivers)
.map_err(|errs| format_err!("failed to receive result {:?}", errs))
.await?;
let errs: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
if errs.len() > 0 {
Err(format_err!("failed to receive result {:?}", errs))
} else {
Ok(())
}
.boxed()
}
fn iter(
&self,
_ctx: CoreContext,
key_like: Option<String>,
async fn iter<'a>(
&'a self,
_ctx: &'a CoreContext,
key_like: Option<&'a String>,
multiplex_id: MultiplexId,
older_than: DateTime,
limit: usize,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>> {
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error> {
STATS::iters.add_value(1);
let query = match &key_like {
Some(sql_like) => GetRangeOfEntriesLike::query(
&self.read_master_connection,
&sql_like,
&multiplex_id,
&older_than.into(),
&limit,
)
.compat()
.left_future(),
None => GetRangeOfEntries::query(
&self.read_master_connection,
&multiplex_id,
&older_than.into(),
&limit,
)
.compat()
.right_future(),
};
async move {
let rows = query.await?;
Ok(rows
.into_iter()
.map(
|(blobstore_key, blobstore_id, multiplex_id, timestamp, operation_key, id)| {
BlobstoreSyncQueueEntry {
blobstore_key,
blobstore_id,
multiplex_id,
timestamp: timestamp.into(),
operation_key,
id: Some(id),
}
},
let rows = match key_like {
Some(sql_like) => {
GetRangeOfEntriesLike::query(
&self.read_master_connection,
sql_like,
&multiplex_id,
&older_than.into(),
&limit,
)
.collect())
}
.boxed()
}
fn del(
&self,
_ctx: CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> BoxFuture<'static, Result<(), Error>> {
cloned!(self.write_connection);
async move {
let ids: Vec<u64> = entries
.into_iter()
.map(|entry| {
entry.id.ok_or_else(|| {
format_err!(
"BlobstoreSyncQueueEntry must contain `id` to be able to delete it"
)
})
})
.collect::<Result<_, _>>()?;
for chunk in ids.chunks(10_000) {
let deletion_result = DeleteEntries::query(&write_connection, chunk)
.compat()
.await?;
STATS::dels.add_value(deletion_result.affected_rows() as i64);
.compat()
.await
}
Ok(())
}
.boxed()
None => {
GetRangeOfEntries::query(
&self.read_master_connection,
&multiplex_id,
&older_than.into(),
&limit,
)
.compat()
.await
}
}?;
Ok(rows
.into_iter()
.map(
|(blobstore_key, blobstore_id, multiplex_id, timestamp, operation_key, id)| {
BlobstoreSyncQueueEntry {
blobstore_key,
blobstore_id,
multiplex_id,
timestamp: timestamp.into(),
operation_key,
id: Some(id),
}
},
)
.collect())
}
fn get(
&self,
_ctx: CoreContext,
key: String,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>> {
let query = GetByKey::query(&self.read_master_connection, &key).compat();
async move {
let rows = query.await?;
Ok(rows
.into_iter()
.map(
|(blobstore_key, blobstore_id, multiplex_id, timestamp, operation_key, id)| {
BlobstoreSyncQueueEntry {
blobstore_key,
blobstore_id,
multiplex_id,
timestamp: timestamp.into(),
operation_key,
id: Some(id),
}
},
)
.collect())
async fn del<'a>(
&'a self,
_ctx: &'a CoreContext,
entries: &'a [BlobstoreSyncQueueEntry],
) -> Result<(), Error> {
let ids: Vec<u64> = entries
.into_iter()
.map(|entry| {
entry.id.ok_or_else(|| {
format_err!("BlobstoreSyncQueueEntry must contain `id` to be able to delete it")
})
})
.collect::<Result<_, _>>()?;
for chunk in ids.chunks(10_000) {
let deletion_result = DeleteEntries::query(&self.write_connection, chunk)
.compat()
.await?;
STATS::dels.add_value(deletion_result.affected_rows() as i64);
}
.boxed()
Ok(())
}
async fn get<'a>(
&'a self,
_ctx: &'a CoreContext,
key: &'a String,
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error> {
let rows = GetByKey::query(&self.read_master_connection, key)
.compat()
.await?;
Ok(rows
.into_iter()
.map(
|(blobstore_key, blobstore_id, multiplex_id, timestamp, operation_key, id)| {
BlobstoreSyncQueueEntry {
blobstore_key,
blobstore_id,
multiplex_id,
timestamp: timestamp.into(),
operation_key,
id: Some(id),
}
},
)
.collect())
}
}

View File

@ -46,65 +46,59 @@ async fn test_simple(fb: FacebookInit) -> Result<(), Error> {
let entry4 = BlobstoreSyncQueueEntry::new(key0.clone(), bs1, mp, t2, op2);
// add
assert!(queue.add(ctx.clone(), entry0.clone()).await.is_ok());
assert!(queue.add(ctx.clone(), entry1.clone()).await.is_ok());
assert!(queue.add(ctx.clone(), entry2.clone()).await.is_ok());
assert!(queue.add(ctx.clone(), entry3.clone()).await.is_ok());
assert!(queue.add(ctx.clone(), entry4.clone()).await.is_ok());
assert!(queue.add(&ctx, entry0.clone()).await.is_ok());
assert!(queue.add(&ctx, entry1.clone()).await.is_ok());
assert!(queue.add(&ctx, entry2.clone()).await.is_ok());
assert!(queue.add(&ctx, entry3.clone()).await.is_ok());
assert!(queue.add(&ctx, entry4.clone()).await.is_ok());
// get
let entries1 = queue
.get(ctx.clone(), key0.clone())
.await
.expect("Get failed");
let entries1 = queue.get(&ctx, &key0).await.expect("Get failed");
assert_eq!(entries1.len(), 4);
assert_eq!(entries1[0].operation_key, op0);
let entries2 = queue
.get(ctx.clone(), key1.clone())
.await
.expect("Get failed");
let entries2 = queue.get(&ctx, &key1).await.expect("Get failed");
assert_eq!(entries2.len(), 1);
assert_eq!(entries2[0].operation_key, op1);
// iter
let some_entries = queue
.iter(ctx.clone(), None, mp, t1, 1)
.iter(&ctx, None, mp, t1, 1)
.await
.expect("DateTime range iteration failed");
assert_eq!(some_entries.len(), 2);
let some_entries = queue
.iter(ctx.clone(), None, mp, t1, 2)
.iter(&ctx, None, mp, t1, 2)
.await
.expect("DateTime range iteration failed");
assert_eq!(some_entries.len(), 3);
let some_entries = queue
.iter(ctx.clone(), None, mp, t0, 1)
.iter(&ctx, None, mp, t0, 1)
.await
.expect("DateTime range iteration failed");
assert_eq!(some_entries.len(), 2);
let some_entries = queue
.iter(ctx.clone(), None, mp, t0, 100)
.iter(&ctx, None, mp, t0, 100)
.await
.expect("DateTime range iteration failed");
assert_eq!(some_entries.len(), 2);
// delete
queue
.del(ctx.clone(), vec![entry0])
.del(&ctx, &[entry0])
.await
.expect_err("Deleting entry without `id` should have failed");
queue
.del(ctx.clone(), entries1)
.del(&ctx, &entries1)
.await
.expect("Failed to remove entries1");
queue
.del(ctx.clone(), entries2)
.del(&ctx, &entries2)
.await
.expect("Failed to remove entries2");
// iter
let entries = queue
.iter(ctx.clone(), None, mp, t1, 100)
.iter(&ctx, None, mp, t1, 100)
.await
.expect("Iterating over entries failed");
assert_eq!(entries.len(), 0);

View File

@ -9,6 +9,7 @@
//! --dry-run mode to test the healer
use anyhow::Error;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry};
use context::CoreContext;
@ -77,44 +78,46 @@ impl<Q: BlobstoreSyncQueue> DummyBlobstoreSyncQueue<Q> {
}
}
#[async_trait]
impl<Q: BlobstoreSyncQueue> BlobstoreSyncQueue for DummyBlobstoreSyncQueue<Q> {
fn add_many(
&self,
_ctx: CoreContext,
entries: Box<dyn Iterator<Item = BlobstoreSyncQueueEntry> + Send>,
) -> BoxFuture<'static, Result<(), Error>> {
let entries: Vec<_> = entries.map(|e| format!("{:?}", e)).collect();
async fn add_many<'a>(
&'a self,
_ctx: &'a CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> Result<(), Error> {
let entries: Vec<_> = entries.into_iter().map(|e| format!("{:?}", e)).collect();
info!(self.logger, "I would have written {}", entries.join(",\n"));
future::ok(()).boxed()
Ok(())
}
fn iter(
&self,
ctx: CoreContext,
key_like: Option<String>,
async fn iter<'a>(
&'a self,
ctx: &'a CoreContext,
key_like: Option<&'a String>,
multiplex_id: MultiplexId,
older_than: DateTime,
limit: usize,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>> {
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error> {
self.inner
.iter(ctx, key_like, multiplex_id, older_than, limit)
.await
}
fn del(
&self,
_ctx: CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> BoxFuture<'static, Result<(), Error>> {
async fn del<'a>(
&'a self,
_ctx: &'a CoreContext,
entries: &'a [BlobstoreSyncQueueEntry],
) -> Result<(), Error> {
let entries: Vec<_> = entries.iter().map(|e| format!("{:?}", e)).collect();
info!(self.logger, "I would have deleted {}", entries.join(",\n"));
future::ok(()).boxed()
Ok(())
}
fn get(
&self,
ctx: CoreContext,
key: String,
) -> BoxFuture<'static, Result<Vec<BlobstoreSyncQueueEntry>, Error>> {
self.inner.get(ctx, key)
async fn get<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a String,
) -> Result<Vec<BlobstoreSyncQueueEntry>, Error> {
self.inner.get(ctx, key).await
}
}

View File

@ -74,8 +74,8 @@ impl Healer {
let queue_entries = self
.sync_queue
.iter(
ctx.clone(),
self.blobstore_key_like.clone(),
ctx,
self.blobstore_key_like.as_ref(),
self.multiplex_id,
healing_deadline.clone(),
self.blobstore_sync_queue_limit,
@ -453,7 +453,7 @@ async fn cleanup_after_healing(
) -> Result<u64> {
let n = entries.len() as u64;
info!(ctx.logger(), "Deleting {} actioned queue entries", n);
sync_queue.del(ctx.clone(), entries).await?;
sync_queue.del(ctx, &entries).await?;
Ok(n)
}
@ -481,7 +481,5 @@ async fn requeue_partial_heal(
id: None,
})
.collect();
sync_queue
.add_many(ctx.clone(), Box::new(new_entries.into_iter()))
.await
sync_queue.add_many(ctx, new_entries).await
}

View File

@ -91,9 +91,7 @@ impl<Q: BlobstoreSyncQueue> BlobstoreSyncQueueExt for Q {
) -> BoxFuture<'out, Result<usize>> {
let zero_date = DateTime::now();
async move {
let entries = self
.iter(ctx.clone(), None, multiplex_id, zero_date, 100)
.await?;
let entries = self.iter(ctx, None, multiplex_id, zero_date, 100).await?;
if entries.len() >= 100 {
Err(format_err!("too many entries"))
} else {
@ -630,9 +628,7 @@ async fn healer_heal_with_failing_blobstore(fb: FacebookInit) -> Result<(), Erro
t0,
op0,
)];
sync_queue
.add_many(ctx.clone(), Box::new(entries.into_iter()))
.await?;
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(1000, 10, sync_queue.clone(), stores, mp, None, false);
@ -685,9 +681,7 @@ async fn healer_heal_with_default_multiplex_id(fb: FacebookInit) -> Result<(), E
];
let sync_queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory()?);
sync_queue
.add_many(ctx.clone(), Box::new(entries.into_iter()))
.await?;
sync_queue.add_many(&ctx, entries).await?;
// We aren't healing blobs for old_mp, so expect to only have 1 blob in each
// blobstore at the end of the test.
@ -723,9 +717,7 @@ async fn healer_heal_complete_batch(fb: FacebookInit) -> Result<(), Error> {
];
let sync_queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory()?);
sync_queue
.add_many(ctx.clone(), Box::new(entries.into_iter()))
.await?;
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(2, 10, sync_queue, stores, mp, None, false);
let (complete_batch, _) = healer.heal(&ctx, DateTime::now()).await?;
@ -753,9 +745,7 @@ async fn healer_heal_incomplete_batch(fb: FacebookInit) -> Result<(), Error> {
];
let sync_queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory()?);
sync_queue
.add_many(ctx.clone(), Box::new(entries.into_iter()))
.await?;
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(20, 10, sync_queue, stores, mp, None, false);
let (complete_batch, _) = healer.heal(&ctx, DateTime::now()).await?;

View File

@ -344,16 +344,20 @@ async fn populate_healer_queue(
if !config.dry_run {
let src_blobstore_id = config.src_blobstore_id;
let multiplex_id = config.multiplex_id;
let iterator_box = Box::new(entries.keys.into_iter().map(move |entry| {
BlobstoreSyncQueueEntry::new(
entry,
src_blobstore_id,
multiplex_id,
DateTime::now(),
OperationKey::gen(),
)
}));
queue.add_many(config.ctx.clone(), iterator_box).await?;
let entries = entries
.keys
.into_iter()
.map(move |entry| {
BlobstoreSyncQueueEntry::new(
entry,
src_blobstore_id,
multiplex_id,
DateTime::now(),
OperationKey::gen(),
)
})
.collect();
queue.add_many(&config.ctx, entries).await?;
}
state = put_resume_state(blobstore.clone(), &config, state).await?;
match entries.next_token {