wal: Remove operation key

Summary:
**Context**: We want to roll out the new WAL multiplexed blobstore and need final tweaks before doing that. See https://fb.quip.com/vyTWAgGZSA2Z

The operation key was initially added in D20557702 (3d314761c3) because we had the problem of running `select * where blobstore_key=X` and getting back entries from multiple writes to the same key.

This is no longer a problem with the WAL, because we never run this type of select: each row carries enough information on its own, so there is no need for follow-up queries.

After we fully remove the old multiplex, we can delete all the code for the operation key.

Note: Since I was already touching the table schema, I also fixed some columns. In particular, `blob_size` is now NOT NULL, since we never insert a NULL value; its nullability was a remnant of the old sync queue, where the column was added after the table already existed.

Differential Revision: D40718122

fbshipit-source-id: 661c6f28d1c13f94fe757b7c13f5832187146208
This commit is contained in:
Yan Soares Couto 2022-11-03 15:12:55 -07:00 committed by Facebook GitHub Bot
parent bf32800670
commit a066ea558e
8 changed files with 62 additions and 143 deletions

View File

@ -25,7 +25,6 @@ use blobstore::PutBehaviour;
use blobstore_stats::OperationType;
use blobstore_sync_queue::BlobstoreWal;
use blobstore_sync_queue::BlobstoreWalEntry;
use blobstore_sync_queue::OperationKey;
use cloned::cloned;
use context::CoreContext;
use context::PerfCounterType;
@ -192,19 +191,11 @@ impl WalMultiplexedBlobstore {
ctx.perf_counters()
.increment_counter(PerfCounterType::BlobPuts);
// Unique id associated with the put operation for this multiplexed blobstore.
let operation_key = OperationKey::gen();
let blob_size = value.len() as u64;
// Log the blobstore key and wait till it succeeds
let ts = Timestamp::now();
let log_entry = BlobstoreWalEntry::new(
key.clone(),
self.multiplex_id,
ts,
operation_key,
Some(blob_size),
);
let log_entry = BlobstoreWalEntry::new(key.clone(), self.multiplex_id, ts, blob_size);
self.wal_queue.log(ctx, log_entry).await.with_context(|| {
format!(
"WAL Multiplexed Blobstore: Failed writing to the WAL: key {}",

View File

@ -19,7 +19,6 @@ use blobstore::BlobstoreGetData;
use blobstore::BlobstoreIsPresent;
use blobstore::BlobstorePutOps;
use blobstore_sync_queue::BlobstoreWal;
use blobstore_sync_queue::OperationKey;
use blobstore_sync_queue::SqlBlobstoreWal;
use blobstore_test_utils::Tickable;
use bytes::Bytes;
@ -940,7 +939,7 @@ fn setup_multiplex(
quorum: usize,
timeout: Option<MultiplexTimeout>,
) -> Result<(
Arc<Tickable<OperationKey>>,
Arc<Tickable<()>>,
Vec<(BlobstoreId, Arc<Tickable<(BlobstoreBytes, u64)>>)>,
WalMultiplexedBlobstore,
)> {
@ -979,7 +978,7 @@ fn setup_blobstores(
(tickable_blobstores, blobstores)
}
fn setup_queue() -> (Arc<Tickable<OperationKey>>, Arc<dyn BlobstoreWal>) {
fn setup_queue() -> (Arc<Tickable<()>>, Arc<dyn BlobstoreWal>) {
let tickable_queue = Arc::new(Tickable::new());
let wal_queue = tickable_queue.clone() as Arc<dyn BlobstoreWal>;
(tickable_queue, wal_queue)

View File

@ -24,7 +24,6 @@ use blobstore::OverwriteStatus;
use blobstore::PutBehaviour;
use blobstore_sync_queue::BlobstoreWal;
use blobstore_sync_queue::BlobstoreWalEntry;
use blobstore_sync_queue::OperationKey;
use context::CoreContext;
use futures::channel::oneshot;
use lock_ext::LockExt;
@ -169,11 +168,11 @@ impl BlobstorePutOps for Tickable<(BlobstoreBytes, u64)> {
}
#[async_trait]
impl BlobstoreWal for Tickable<OperationKey> {
impl BlobstoreWal for Tickable<()> {
async fn log<'a>(&'a self, _ctx: &'a CoreContext, entry: BlobstoreWalEntry) -> Result<()> {
self.on_tick().await?;
self.storage.with(|s| {
s.insert(entry.blobstore_key, entry.operation_key);
s.insert(entry.blobstore_key, ());
});
Ok(())
}

View File

@ -10,8 +10,6 @@ CREATE TABLE IF NOT EXISTS `blobstore_write_ahead_log` (
`blobstore_key` varchar NOT NULL,
`timestamp` BIGINT NOT NULL, /* time the blob was added to the queue */
`multiplex_id` INTEGER NOT NULL,
/* different per different put operations, see D20557702 */
`operation_key` BINARY(16) NOT NULL DEFAULT X'00000000000000000000000000000000',
`blob_size` BIGINT,
`retry_count` INTEGER NOT NULL DEFAULT 0
`blob_size` BIGINT NOT NULL,
`retry_count` INTEGER NOT NULL
);

View File

@ -39,8 +39,6 @@ use sql_construct::SqlShardedConstruct;
use sql_ext::SqlShardedConnections;
use vec1::Vec1;
use crate::OperationKey;
const SQL_WAL_WRITE_BUFFER_SIZE: usize = 1000;
/// Row id of the entry, and which SQL shard it belongs to.
@ -57,8 +55,7 @@ pub struct BlobstoreWalEntry {
pub timestamp: Timestamp,
/// Present if this entry was obtained from reading from SQL
pub read_info: Option<ReadInfo>,
pub operation_key: OperationKey,
pub blob_size: Option<u64>,
pub blob_size: u64,
pub retry_count: u32,
}
@ -67,14 +64,12 @@ impl BlobstoreWalEntry {
blobstore_key: String,
multiplex_id: MultiplexId,
timestamp: Timestamp,
operation_key: OperationKey,
blob_size: Option<u64>,
blob_size: u64,
) -> Self {
Self {
blobstore_key,
multiplex_id,
timestamp,
operation_key,
blob_size,
read_info: None,
retry_count: 0,
@ -85,21 +80,11 @@ impl BlobstoreWalEntry {
self.retry_count += 1;
}
fn into_sql_tuple(
self,
) -> (
String,
MultiplexId,
Timestamp,
OperationKey,
Option<u64>,
u32,
) {
fn into_sql_tuple(self) -> (String, MultiplexId, Timestamp, u64, u32) {
let Self {
blobstore_key,
multiplex_id,
timestamp,
operation_key,
blob_size,
retry_count,
..
@ -108,31 +93,17 @@ impl BlobstoreWalEntry {
blobstore_key,
multiplex_id,
timestamp,
operation_key,
blob_size,
retry_count,
)
}
fn from_row(
shard_id: usize,
row: (
String,
MultiplexId,
Timestamp,
OperationKey,
u64,
Option<u64>,
u32,
),
) -> Self {
let (blobstore_key, multiplex_id, timestamp, operation_key, id, blob_size, retry_count) =
row;
fn from_row(shard_id: usize, row: (String, MultiplexId, Timestamp, u64, u64, u32)) -> Self {
let (blobstore_key, multiplex_id, timestamp, id, blob_size, retry_count) = row;
Self {
blobstore_key,
multiplex_id,
timestamp,
operation_key,
read_info: Some(ReadInfo { id, shard_id }),
blob_size,
retry_count,
@ -403,7 +374,7 @@ async fn insert_entries(
.collect();
let entries_ref: Vec<_> = entries
.iter()
.map(|(a, b, c, d, e, f)| (a, b, c, d, e, f)) // &(a, b, ...) into (&a, &b, ...)
.map(|(a, b, c, d, e)| (a, b, c, d, e)) // &(a, b, ...) into (&a, &b, ...)
.collect();
WalInsertEntry::query(write_connection, &entries_ref).await
@ -419,12 +390,11 @@ queries! {
blobstore_key: String,
multiplex_id: MultiplexId,
timestamp: Timestamp,
operation_key: OperationKey,
blob_size: Option<u64>,
blob_size: u64,
retry_count: u32,
)) {
none,
"INSERT INTO blobstore_write_ahead_log (blobstore_key, multiplex_id, timestamp, operation_key, blob_size, retry_count)
"INSERT INTO blobstore_write_ahead_log (blobstore_key, multiplex_id, timestamp, blob_size, retry_count)
VALUES {values}"
}
@ -439,15 +409,13 @@ queries! {
String,
MultiplexId,
Timestamp,
OperationKey,
u64,
Option<u64>,
u64,
u32,
) {
"SELECT blobstore_key, multiplex_id, timestamp, operation_key, id, blob_size, retry_count
"SELECT blobstore_key, multiplex_id, timestamp, id, blob_size, retry_count
FROM blobstore_write_ahead_log
WHERE multiplex_id = {multiplex_id} AND timestamp <= {older_than}
LIMIT {limit}
"
LIMIT {limit}"
}
}

View File

@ -119,37 +119,26 @@ async fn test_write_ahead_log(fb: FacebookInit) -> Result<(), Error> {
let t1 = DateTime::from_rfc3339("2018-11-29T12:01:00.00Z")?.into();
let t2 = DateTime::from_rfc3339("2018-11-29T12:02:00.00Z")?.into();
let node_id = [1, 2, 2, 4, 5, 6, 7, 8];
// All operation keys are different because using WAL instead of a sync-queue
// allows to write a key to the WAL only once.
// If the key has multiple appearances in the WAL, it means it was written
// in different sessions with different operation keys.
let op0 = OperationKey(Uuid::from_fields(0, 0, 1, &node_id)?); // for key0
let op1 = OperationKey(Uuid::from_fields(0, 0, 2, &node_id)?); // for second put of key0
let op2 = OperationKey(Uuid::from_fields(0, 0, 3, &node_id)?); // for key1
let op3 = OperationKey(Uuid::from_fields(0, 0, 4, &node_id)?); // for second put of key1
let entry0 = BlobstoreWalEntry::new(key0.clone(), mp, t0, op0.clone(), None);
let entry1 = BlobstoreWalEntry::new(key0, mp, t1, op1, None);
let entry2 = BlobstoreWalEntry::new(key1.clone(), mp, t1, op2, None);
let entry3 = BlobstoreWalEntry::new(key1, mp, t2, op3, None);
let entry0 = BlobstoreWalEntry::new(key0.clone(), mp, t0, 12);
let entry1 = BlobstoreWalEntry::new(key0, mp, t1, 13);
let entry2 = BlobstoreWalEntry::new(key1.clone(), mp, t1, 14);
let entry3 = BlobstoreWalEntry::new(key1, mp, t2, 15);
// add
assert!(wal.log(&ctx, entry0.clone()).await.is_ok());
assert!(
wal.log_many(&ctx, vec![entry1, entry2.clone()])
.await
.is_ok()
);
assert!(wal.log(&ctx, entry3.clone()).await.is_ok());
wal.log(&ctx, entry0.clone()).await.unwrap();
wal.log_many(&ctx, vec![entry1, entry2.clone()])
.await
.unwrap();
wal.log(&ctx, entry3.clone()).await.unwrap();
// read different ranges of entries
let validate = |entry: &BlobstoreWalEntry, expected: &BlobstoreWalEntry| {
assert_eq!(entry.blobstore_key, expected.blobstore_key);
assert_eq!(entry.multiplex_id, expected.multiplex_id);
assert_eq!(entry.timestamp, expected.timestamp);
assert_eq!(entry.operation_key, expected.operation_key);
assert_eq!(entry.blob_size, expected.blob_size);
assert_eq!(entry.retry_count, expected.retry_count);
assert!(entry.read_info.is_some());
// read_info is not compared
};
let some_entries = wal

View File

@ -186,9 +186,9 @@ impl WalHealer {
.map(|(key, entries)| {
let entries: Vec<_> = entries.into_iter().collect();
let healing_weight = entries
.iter()
.find_map(|entry| entry.blob_size)
.unwrap_or(DEFAULT_BLOB_SIZE_BYTES);
.first()
// The "or" never happens. Can be fixed with vec1.
.map_or(DEFAULT_BLOB_SIZE_BYTES, |entry| entry.blob_size);
let fut =
heal_blob(ctx, self.blobstores.clone(), key).map(|outcome| (outcome, entries));
@ -434,18 +434,11 @@ async fn enqueue_entries(
let BlobstoreWalEntry {
blobstore_key,
multiplex_id,
operation_key,
blob_size,
..
} = entry;
BlobstoreWalEntry::new(
blobstore_key,
multiplex_id,
Timestamp::now(),
operation_key,
blob_size,
)
BlobstoreWalEntry::new(blobstore_key, multiplex_id, Timestamp::now(), blob_size)
})
.collect();

View File

@ -14,7 +14,6 @@ use async_trait::async_trait;
use blobstore::Blobstore;
use blobstore::BlobstoreBytes;
use blobstore::BlobstoreGetData;
use blobstore_sync_queue::OperationKey;
use blobstore_sync_queue::SqlBlobstoreWal;
use bytes::Bytes;
use context::CoreContext;
@ -118,11 +117,10 @@ async fn test_all_blobstores_failing(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 12);
wal.log_many(&ctx, vec![entry]).await?;
let expected = vec![(key.clone(), op.clone())];
let expected = vec![key.clone()];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -143,7 +141,7 @@ async fn test_all_blobstores_failing(fb: FacebookInit) -> Result<()> {
healer.heal(&ctx, age).await?;
// check that the queue has the entry, because the blob couldn't be healed
let expected = vec![(key.clone(), op)];
let expected = vec![key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
Ok(())
@ -181,13 +179,11 @@ async fn test_healthy_blob(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
// it can even have multiple entries, if the blob was written twice
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op1 = OperationKey::gen();
let entry1 = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op1.clone(), None);
let op2 = OperationKey::gen();
let entry2 = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op2.clone(), None);
let entry1 = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 13);
let entry2 = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 14);
wal.log_many(&ctx, vec![entry1, entry2]).await?;
let expected = vec![(key.clone(), op1), (key.clone(), op2)];
let expected = vec![key.clone(), key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -242,11 +238,10 @@ async fn test_missing_blob_healed(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 15);
wal.log_many(&ctx, vec![entry]).await?;
let expected = vec![(key.clone(), op)];
let expected = vec![key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -301,11 +296,10 @@ async fn test_missing_blob_not_healed(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 16);
wal.log_many(&ctx, vec![entry]).await?;
let expected = vec![(key.clone(), op.clone())];
let expected = vec![key.clone()];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -327,7 +321,7 @@ async fn test_missing_blob_not_healed(fb: FacebookInit) -> Result<()> {
// check that the queue still has the entry, because the blob couldn't be healed
// in the failing blobstore
let expected = vec![(key.clone(), op)];
let expected = vec![key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
Ok(())
@ -360,11 +354,10 @@ async fn test_blob_cannot_be_fetched(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 17);
wal.log_many(&ctx, vec![entry]).await?;
let expected = vec![(key.clone(), op.clone())];
let expected = vec![key.clone()];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -385,7 +378,7 @@ async fn test_blob_cannot_be_fetched(fb: FacebookInit) -> Result<()> {
healer.heal(&ctx, age).await?;
// check that the queue still has the entry, because the blob couldn't be healed
let expected = vec![(key.clone(), op)];
let expected = vec![key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
Ok(())
@ -431,18 +424,15 @@ async fn test_different_blobs_wal_entries(fb: FacebookInit) -> Result<()> {
// entries from different multiplex configuration
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op1 = OperationKey::gen();
let entry1 = BlobstoreWalEntry::new(key1.clone(), mid1, ts, op1.clone(), None);
let op2 = OperationKey::gen();
let entry2 = BlobstoreWalEntry::new(key2.clone(), mid1, ts, op2.clone(), None);
let op3 = OperationKey::gen();
let entry3 = BlobstoreWalEntry::new(key3.clone(), mid2, ts, op3.clone(), None);
let entry1 = BlobstoreWalEntry::new(key1.clone(), mid1, ts, 18);
let entry2 = BlobstoreWalEntry::new(key2.clone(), mid1, ts, 19);
let entry3 = BlobstoreWalEntry::new(key3.clone(), mid2, ts, 20);
wal.log_many(&ctx, vec![entry1, entry2, entry3]).await?;
let expected = vec![(key1.clone(), op1), (key2.clone(), op2)];
let expected = vec![key1, key2];
validate_queue(&ctx, wal.clone(), mid1, Timestamp::now(), expected).await?;
let expected = vec![(key3.clone(), op3.clone())];
let expected = vec![key3.clone()];
validate_queue(&ctx, wal.clone(), mid2, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -456,7 +446,7 @@ async fn test_different_blobs_wal_entries(fb: FacebookInit) -> Result<()> {
// check that the queue has only entries from different multiplex configuration
validate_queue(&ctx, wal.clone(), mid1, Timestamp::now(), vec![]).await?;
let expected = vec![(key3.clone(), op3)];
let expected = vec![key3.clone()];
validate_queue(&ctx, wal.clone(), mid2, Timestamp::now(), expected).await?;
// also check that the third blob wasn't healed
@ -492,11 +482,10 @@ async fn test_blob_missing_completely(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
// it can even have multiple entries, if the blob was written twice
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, ts, 21);
wal.log_many(&ctx, vec![entry]).await?;
let expected = vec![(key.clone(), op.clone())];
let expected = vec![key.clone()];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
let buf_params = BufferedParams {
@ -519,7 +508,7 @@ async fn test_blob_missing_completely(fb: FacebookInit) -> Result<()> {
// check that the queue has the entry, because the blob is completely
// missing (all blobstore reads succeeded, but couldn't find the blob) and we are
// unable to heal it now, but maybe it wasn't yet written to the storages
let expected = vec![(key.clone(), op)];
let expected = vec![key];
validate_queue(&ctx, wal.clone(), multiplex_id, Timestamp::now(), expected).await?;
Ok(())
@ -551,8 +540,7 @@ async fn test_entry_timestamp_updated(fb: FacebookInit) -> Result<()> {
// the queue will have an entry for the previous write
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
let op = OperationKey::gen();
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, original_ts, op.clone(), None);
let entry = BlobstoreWalEntry::new(key.clone(), multiplex_id, original_ts, 22);
wal.log_many(&ctx, vec![entry]).await?;
let original_entries = wal
@ -595,25 +583,19 @@ async fn validate_queue<'a>(
wal: Arc<dyn BlobstoreWal>,
multiplex_id: MultiplexId,
older_than: Timestamp,
mut expected: Vec<(String, OperationKey)>,
mut expected: Vec<String>,
) -> Result<()> {
let mut entries: Vec<_> = wal
.read(ctx, &multiplex_id, &older_than, 100)
.await?
.into_iter()
.map(
|BlobstoreWalEntry {
blobstore_key,
operation_key,
..
}| (blobstore_key, operation_key),
)
.map(|e| e.blobstore_key)
.collect();
assert_eq!(entries.len(), expected.len());
entries.sort_by(|a, b| a.0.cmp(&b.0));
expected.sort_by(|a, b| a.0.cmp(&b.0));
entries.sort();
expected.sort();
for (a, b) in entries.iter().zip(expected.iter()) {
assert_eq!(a, b);