From 40fb39741de501d2655876524159b5de260b5a0d Mon Sep 17 00:00:00 2001
From: fabianlindfors
Date: Tue, 25 Jan 2022 16:27:33 +0100
Subject: [PATCH] Add automatic retry of queries

Database queries will now be retried if possible using exponential
backoff with jitter. This should help protect against connection
problems and also timeouts caused by us now setting lock_timeout to
avoid blocking other queries. If such a timeout occurs, we should fail
to let waiting queries execute and then try again until the blocking
queries have completed.
---
 Cargo.toml |  3 ++-
 src/db.rs  | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 292a237..3f40e09 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,4 +19,5 @@ anyhow = { version = "1.0.44", features = ["backtrace"] }
 clap = { version = "3.0.0", features = ["derive"] }
 toml = "0.5"
 version = "3.0.0"
-colored = "2"
\ No newline at end of file
+colored = "2"
+rand = "0.8"
\ No newline at end of file
diff --git a/src/db.rs b/src/db.rs
index 5197124..e0f01eb 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,5 +1,8 @@
+use std::{cmp::min, time::Duration};
+
 use anyhow::{anyhow, Context};
 use postgres::{types::ToSql, NoTls, Row};
+use rand::prelude::*;
 
 // DbLocker wraps a regular DbConn, only allowing access using the
 // `lock` method. This method will acquire the advisory lock before
@@ -103,12 +106,12 @@ impl DbConn {
 
 impl Conn for DbConn {
     fn run(&mut self, query: &str) -> anyhow::Result<()> {
-        self.client.batch_execute(query)?;
+        retry_automatically(|| self.client.batch_execute(query))?;
         Ok(())
     }
 
     fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>> {
-        let rows = self.client.query(query, &[])?;
+        let rows = retry_automatically(|| self.client.query(query, &[]))?;
         Ok(rows)
     }
 
@@ -117,7 +120,7 @@
         query: &str,
         params: &[&(dyn ToSql + Sync)],
     ) -> anyhow::Result<Vec<Row>> {
-        let rows = self.client.query(query, params)?;
+        let rows = retry_automatically(|| self.client.query(query, params))?;
        Ok(rows)
     }
 
@@ -168,3 +171,57 @@ impl Conn for Transaction<'_> {
         Ok(Transaction { transaction })
     }
 }
+
+// Retry a database operation with exponential backoff and jitter
+fn retry_automatically<T>(
+    mut f: impl FnMut() -> Result<T, postgres::Error>,
+) -> Result<T, postgres::Error> {
+    const STARTING_WAIT_TIME: u64 = 100;
+    const MAX_WAIT_TIME: u64 = 3_200;
+    const MAX_ATTEMPTS: u32 = 10;
+
+    let mut rng = rand::thread_rng();
+    let mut attempts = 0;
+    loop {
+        let result = f();
+
+        let error = match result {
+            Ok(_) => return result,
+            Err(err) => err,
+        };
+
+        // If we got a database error, we check if it's retryable.
+        // If we didn't get a database error, then it's most likely some kind of connection
+        // error which should also be retried.
+        if let Some(db_error) = error.as_db_error() {
+            if !error_retryable(db_error) {
+                return Err(error);
+            }
+        }
+
+        attempts += 1;
+        if attempts >= MAX_ATTEMPTS {
+            return Err(error);
+        }
+
+        // The wait time increases exponentially, starting at 100ms and doubling up to a max of 3.2s
+        let wait_time = min(
+            MAX_WAIT_TIME,
+            STARTING_WAIT_TIME * u64::pow(2, attempts - 1),
+        );
+
+        // The jitter is up to half the wait time
+        let jitter: u64 = rng.gen_range(0..wait_time / 2);
+
+        std::thread::sleep(Duration::from_millis(wait_time + jitter));
+    }
+}
+
+// Check if a database error can be retried
+fn error_retryable(error: &postgres::error::DbError) -> bool {
+    match error.code() {
+        // Caused by lock_timeout being exceeded
+        &postgres::error::SqlState::LOCK_NOT_AVAILABLE => true,
+        _ => false,
+    }
+}
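For reference, a minimal standalone sketch of the backoff schedule these constants produce, using the same constants as retry_automatically above. The main wrapper and printed output format are illustrative assumptions only, not taken from src/db.rs:

use std::cmp::min;

fn main() {
    // Same constants as retry_automatically in the patch above.
    const STARTING_WAIT_TIME: u64 = 100;
    const MAX_WAIT_TIME: u64 = 3_200;
    const MAX_ATTEMPTS: u32 = 10;

    // Sleeps happen after failed attempts 1 through 9; the 10th failure is returned to the caller.
    for attempts in 1..MAX_ATTEMPTS {
        // Base wait: 100ms, 200ms, 400ms, ... reaching the 3.2s cap by the 6th attempt.
        let wait_time = min(
            MAX_WAIT_TIME,
            STARTING_WAIT_TIME * u64::pow(2, attempts - 1),
        );
        // retry_automatically additionally adds random jitter of up to wait_time / 2 ms.
        println!(
            "attempt {}: base wait {} ms (+ up to {} ms jitter)",
            attempts,
            wait_time,
            wait_time / 2
        );
    }
}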