add claimed_by to requests table

Summary:
For debugging and better error messages it would be nice to know who worked on
a given request.

I'll do AOSC schema change once accepted.

Reviewed By: krallin

Differential Revision: D28441673

fbshipit-source-id: ba146d7f43dde26d9433f76af7fe982da14b5b82
This commit is contained in:
Mateusz Kwapich 2021-05-25 09:10:28 -07:00 committed by Facebook GitHub Bot
parent fb901786da
commit 7f0144d872
5 changed files with 37 additions and 12 deletions

View File

@ -140,7 +140,7 @@ mod tests {
use blobstore::{Loadable, Storable};
use context::CoreContext;
use fbinit::FacebookInit;
use requests_table::RequestStatus;
use requests_table::{ClaimedBy, RequestStatus};
use source_control::{
MegarepoAddTargetParams as ThriftMegarepoAddTargetParams,
@ -210,7 +210,7 @@ mod tests {
// Verify that poll_once on this request in a "in_progress" state
// returns None
q.table.mark_in_progress(&ctx, &req_id).await?;
q.table.mark_in_progress(&ctx, &req_id, &ClaimedBy("test".to_string())).await?;
let in_progress_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
assert!(in_progress_poll.is_none());

View File

@ -16,5 +16,6 @@
`started_processing_at` bigint(20) DEFAULT NULL,
`ready_at` bigint(20) DEFAULT NULL,
`polled_at` bigint(20) DEFAULT NULL,
`status` VARCHAR(32) NOT NULL -- enum('new','in_progress','ready','polled') NOT NULL DEFAULT 'new',
`status` VARCHAR(32) NOT NULL, -- enum('new','in_progress','ready','polled') NOT NULL DEFAULT 'new',
`claimed_by` VARCHAR(255) NULL
);

View File

@ -18,7 +18,7 @@ mod types;
pub use crate::store::SqlLongRunningRequestsQueue;
pub use crate::types::{
BlobstoreKey, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId,
BlobstoreKey, ClaimedBy, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId,
};
/// A queue of long-running requests
@ -54,7 +54,12 @@ pub trait LongRunningRequestsQueue: Send + Sync {
) -> Result<Option<LongRunningRequestEntry>>;
/// Mark request as in-progress
async fn mark_in_progress(&self, ctx: &CoreContext, req_id: &RequestId) -> Result<bool>;
async fn mark_in_progress(
&self,
ctx: &CoreContext,
req_id: &RequestId,
claimed_by: &ClaimedBy,
) -> Result<bool>;
/// Mark request as ready
async fn mark_ready(

View File

@ -15,7 +15,9 @@ use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
use sql_ext::SqlConnections;
use crate::LongRunningRequestsQueue;
use crate::{BlobstoreKey, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId};
use crate::{
BlobstoreKey, ClaimedBy, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId,
};
queries! {
read TestGetRequest(id: RowId) -> (
@ -29,6 +31,7 @@ queries! {
Option<Timestamp>,
Option<Timestamp>,
RequestStatus,
Option<ClaimedBy>,
) {
"SELECT request_type,
repo_id,
@ -39,7 +42,8 @@ queries! {
started_processing_at,
ready_at,
polled_at,
status
status,
claimed_by
FROM long_running_request_queue
WHERE id = {id}"
}
@ -54,6 +58,7 @@ queries! {
Option<Timestamp>,
Option<Timestamp>,
RequestStatus,
Option<ClaimedBy>,
) {
"SELECT repo_id,
bookmark,
@ -63,7 +68,8 @@ queries! {
started_processing_at,
ready_at,
polled_at,
status
status,
claimed_by
FROM long_running_request_queue
WHERE id = {id} AND request_type = {request_type}"
}
@ -79,15 +85,15 @@ queries! {
write MarkRequestAsNewAgain(id: RowId, request_type: RequestType) {
none,
"UPDATE long_running_request_queue
SET status = 'new'
SET status = 'new', claimed_by = NULL
WHERE id = {id} AND request_type = {request_type} AND status = 'inprogress'
"
}
write MarkRequestInProgress(id: RowId, request_type: RequestType, started_processing_at: Timestamp) {
write MarkRequestInProgress(id: RowId, request_type: RequestType, started_processing_at: Timestamp, claimed_by: ClaimedBy) {
none,
"UPDATE long_running_request_queue
SET started_processing_at = {started_processing_at}, status = 'inprogress'
SET started_processing_at = {started_processing_at}, status = 'inprogress', claimed_by = {claimed_by}
WHERE id = {id} AND request_type = {request_type} AND status = 'new'
"
}
@ -168,6 +174,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue {
ready_at,
polled_at,
status,
claimed_by,
) = row;
Ok(Some(LongRunningRequestEntry {
id: *id,
@ -181,17 +188,24 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue {
ready_at,
polled_at,
status,
claimed_by,
}))
}
}
}
async fn mark_in_progress(&self, _ctx: &CoreContext, req_id: &RequestId) -> Result<bool> {
async fn mark_in_progress(
&self,
_ctx: &CoreContext,
req_id: &RequestId,
claimed_by: &ClaimedBy,
) -> Result<bool> {
let res = MarkRequestInProgress::query(
&self.connections.write_connection,
&req_id.0,
&req_id.1,
&Timestamp::now(),
&claimed_by,
)
.await?;
Ok(res.affected_rows() > 0)
@ -249,6 +263,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue {
ready_at,
polled_at,
status,
claimed_by,
) = row;
match status {
@ -281,6 +296,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue {
ready_at,
polled_at,
status: RequestStatus::Polled,
claimed_by,
},
))
}
@ -298,6 +314,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue {
ready_at,
polled_at,
status: RequestStatus::Polled,
claimed_by: None,
},
)),
_ => None,

View File

@ -91,6 +91,7 @@ macro_rules! mysql_string_newtype {
mysql_string_newtype!(BlobstoreKey);
mysql_string_newtype!(RequestType);
mysql_string_newtype!(ClaimedBy);
#[derive(Clone, Copy, Debug, Eq, PartialEq, mysql::OptTryFromRowField)]
pub enum RequestStatus {
@ -113,6 +114,7 @@ pub struct LongRunningRequestEntry {
pub ready_at: Option<Timestamp>,
pub polled_at: Option<Timestamp>,
pub status: RequestStatus,
pub claimed_by: Option<ClaimedBy>,
}
impl std::fmt::Display for RequestStatus {