Add automatic retry of queries

Database queries will now be retried if possible using exponential
backoff with jitter. This should help protect against connection
problems and also timeouts caused by us now setting lock_timeout to
avoid blocking other queries. If such a timeout occurs, we fail fast
so that the waiting queries can execute, and then retry until the
blocking queries have completed.
This commit is contained in:
fabianlindfors 2022-01-25 16:27:33 +01:00
parent c3369a4c12
commit 40fb39741d
2 changed files with 62 additions and 4 deletions

View File

@ -19,4 +19,5 @@ anyhow = { version = "1.0.44", features = ["backtrace"] }
clap = { version = "3.0.0", features = ["derive"] }
toml = "0.5"
version = "3.0.0"
colored = "2"
colored = "2"
rand = "0.8"

View File

@ -1,5 +1,8 @@
use std::{cmp::min, time::Duration};
use anyhow::{anyhow, Context};
use postgres::{types::ToSql, NoTls, Row};
use rand::prelude::*;
// DbLocker wraps a regular DbConn, only allowing access using the
// `lock` method. This method will acquire the advisory lock before
@ -103,12 +106,12 @@ impl DbConn {
impl Conn for DbConn {
fn run(&mut self, query: &str) -> anyhow::Result<()> {
self.client.batch_execute(query)?;
retry_automatically(|| self.client.batch_execute(query))?;
Ok(())
}
fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>> {
let rows = self.client.query(query, &[])?;
let rows = retry_automatically(|| self.client.query(query, &[]))?;
Ok(rows)
}
@ -117,7 +120,7 @@ impl Conn for DbConn {
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Vec<Row>> {
let rows = self.client.query(query, params)?;
let rows = retry_automatically(|| self.client.query(query, params))?;
Ok(rows)
}
@ -168,3 +171,57 @@ impl Conn for Transaction<'_> {
Ok(Transaction { transaction })
}
}
// Retry a database operation with exponential backoff and jitter
fn retry_automatically<T>(
mut f: impl FnMut() -> Result<T, postgres::Error>,
) -> Result<T, postgres::Error> {
const STARTING_WAIT_TIME: u64 = 100;
const MAX_WAIT_TIME: u64 = 3_200;
const MAX_ATTEMPTS: u32 = 10;
let mut rng = rand::thread_rng();
let mut attempts = 0;
loop {
let result = f();
let error = match result {
Ok(_) => return result,
Err(err) => err,
};
// If we got a database error, we check if it's retryable.
// If we didn't get a database error, then it's most likely some kind of connection
// error which should also be retried.
if let Some(db_error) = error.as_db_error() {
if !error_retryable(db_error) {
return Err(error);
}
}
attempts += 1;
if attempts >= MAX_ATTEMPTS {
return Err(error);
}
// The wait time increases exponentially, starting at 100ms and doubling up to a max of 3.2s.
let wait_time = min(
MAX_WAIT_TIME,
STARTING_WAIT_TIME * u64::pow(2, attempts - 1),
);
// The jitter is up to half the wait time
let jitter: u64 = rng.gen_range(0..wait_time / 2);
std::thread::sleep(Duration::from_millis(wait_time + jitter));
}
}
// Check if a database error can be retried
fn error_retryable(error: &postgres::error::DbError) -> bool {
match error.code() {
// Caused by lock_timeout being exceeded
&postgres::error::SqlState::LOCK_NOT_AVAILABLE => true,
_ => false,
}
}