From 7f0144d87207e42f98ff3bc599fd69b8f367b39d Mon Sep 17 00:00:00 2001 From: Mateusz Kwapich Date: Tue, 25 May 2021 09:10:28 -0700 Subject: [PATCH] add claimed_by to requests table Summary: For debugging and better error messages it would be nice to know who worked on a given request. I'll do AOSC schema change once accepted. Reviewed By: krallin Differential Revision: D28441673 fbshipit-source-id: ba146d7f43dde26d9433f76af7fe982da14b5b82 --- .../megarepo_api/async_requests/src/queue.rs | 4 +-- .../sqlite-long_running_requests_queue.sql | 3 +- .../megarepo_api/requests_table/src/lib.rs | 9 ++++-- .../megarepo_api/requests_table/src/store.rs | 31 ++++++++++++++----- .../megarepo_api/requests_table/src/types.rs | 2 ++ 5 files changed, 37 insertions(+), 12 deletions(-) diff --git a/eden/mononoke/megarepo_api/async_requests/src/queue.rs b/eden/mononoke/megarepo_api/async_requests/src/queue.rs index 79a93f1ea3..f9178c4860 100644 --- a/eden/mononoke/megarepo_api/async_requests/src/queue.rs +++ b/eden/mononoke/megarepo_api/async_requests/src/queue.rs @@ -140,7 +140,7 @@ mod tests { use blobstore::{Loadable, Storable}; use context::CoreContext; use fbinit::FacebookInit; - use requests_table::RequestStatus; + use requests_table::{ClaimedBy, RequestStatus}; use source_control::{ MegarepoAddTargetParams as ThriftMegarepoAddTargetParams, @@ -210,7 +210,7 @@ mod tests { // Verify that poll_once on this request in a "in_progress" state // returns None - q.table.mark_in_progress(&ctx, &req_id).await?; + q.table.mark_in_progress(&ctx, &req_id, &ClaimedBy("test".to_string())).await?; let in_progress_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?; assert!(in_progress_poll.is_none()); diff --git a/eden/mononoke/megarepo_api/requests_table/schemas/sqlite-long_running_requests_queue.sql b/eden/mononoke/megarepo_api/requests_table/schemas/sqlite-long_running_requests_queue.sql index 37b7c8b83d..17a121dc01 100644 --- a/eden/mononoke/megarepo_api/requests_table/schemas/sqlite-long_running_requests_queue.sql +++ b/eden/mononoke/megarepo_api/requests_table/schemas/sqlite-long_running_requests_queue.sql @@ -16,5 +16,6 @@ `started_processing_at` bigint(20) DEFAULT NULL, `ready_at` bigint(20) DEFAULT NULL, `polled_at` bigint(20) DEFAULT NULL, - `status` VARCHAR(32) NOT NULL -- enum('new','in_progress','ready','polled') NOT NULL DEFAULT 'new', + `status` VARCHAR(32) NOT NULL, -- enum('new','in_progress','ready','polled') NOT NULL DEFAULT 'new', + `claimed_by` VARCHAR(255) NULL ); diff --git a/eden/mononoke/megarepo_api/requests_table/src/lib.rs b/eden/mononoke/megarepo_api/requests_table/src/lib.rs index dde0d36e68..036f567d56 100644 --- a/eden/mononoke/megarepo_api/requests_table/src/lib.rs +++ b/eden/mononoke/megarepo_api/requests_table/src/lib.rs @@ -18,7 +18,7 @@ mod types; pub use crate::store::SqlLongRunningRequestsQueue; pub use crate::types::{ - BlobstoreKey, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId, + BlobstoreKey, ClaimedBy, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId, }; /// A queue of long-running requests @@ -54,7 +54,12 @@ pub trait LongRunningRequestsQueue: Send + Sync { ) -> Result>; /// Mark request as in-progress - async fn mark_in_progress(&self, ctx: &CoreContext, req_id: &RequestId) -> Result; + async fn mark_in_progress( + &self, + ctx: &CoreContext, + req_id: &RequestId, + claimed_by: &ClaimedBy, + ) -> Result; /// Mark request as ready async fn mark_ready( diff --git a/eden/mononoke/megarepo_api/requests_table/src/store.rs b/eden/mononoke/megarepo_api/requests_table/src/store.rs index e830eb3d7b..670888f42d 100644 --- a/eden/mononoke/megarepo_api/requests_table/src/store.rs +++ b/eden/mononoke/megarepo_api/requests_table/src/store.rs @@ -15,7 +15,9 @@ use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig}; use sql_ext::SqlConnections; use crate::LongRunningRequestsQueue; -use crate::{BlobstoreKey, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId}; +use crate::{ + BlobstoreKey, ClaimedBy, LongRunningRequestEntry, RequestId, RequestStatus, RequestType, RowId, +}; queries! { read TestGetRequest(id: RowId) -> ( @@ -29,6 +31,7 @@ queries! { Option, Option, RequestStatus, + Option, ) { "SELECT request_type, repo_id, @@ -39,7 +42,8 @@ queries! { started_processing_at, ready_at, polled_at, - status + status, + claimed_by FROM long_running_request_queue WHERE id = {id}" } @@ -54,6 +58,7 @@ queries! { Option, Option, RequestStatus, + Option, ) { "SELECT repo_id, bookmark, @@ -63,7 +68,8 @@ queries! { started_processing_at, ready_at, polled_at, - status + status, + claimed_by FROM long_running_request_queue WHERE id = {id} AND request_type = {request_type}" } @@ -79,15 +85,15 @@ queries! { write MarkRequestAsNewAgain(id: RowId, request_type: RequestType) { none, "UPDATE long_running_request_queue - SET status = 'new' + SET status = 'new', claimed_by = NULL WHERE id = {id} AND request_type = {request_type} AND status = 'inprogress' " } - write MarkRequestInProgress(id: RowId, request_type: RequestType, started_processing_at: Timestamp) { + write MarkRequestInProgress(id: RowId, request_type: RequestType, started_processing_at: Timestamp, claimed_by: ClaimedBy) { none, "UPDATE long_running_request_queue - SET started_processing_at = {started_processing_at}, status = 'inprogress' + SET started_processing_at = {started_processing_at}, status = 'inprogress', claimed_by = {claimed_by} WHERE id = {id} AND request_type = {request_type} AND status = 'new' " } @@ -168,6 +174,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue { ready_at, polled_at, status, + claimed_by, ) = row; Ok(Some(LongRunningRequestEntry { id: *id, @@ -181,17 +188,24 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue { ready_at, polled_at, status, + claimed_by, })) } } } - async fn mark_in_progress(&self, _ctx: &CoreContext, req_id: &RequestId) -> Result { + async fn mark_in_progress( + &self, + _ctx: &CoreContext, + req_id: &RequestId, + claimed_by: &ClaimedBy, + ) -> Result { let res = MarkRequestInProgress::query( &self.connections.write_connection, &req_id.0, &req_id.1, &Timestamp::now(), + &claimed_by, ) .await?; Ok(res.affected_rows() > 0) @@ -249,6 +263,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue { ready_at, polled_at, status, + claimed_by, ) = row; match status { @@ -281,6 +296,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue { ready_at, polled_at, status: RequestStatus::Polled, + claimed_by, }, )) } @@ -298,6 +314,7 @@ impl LongRunningRequestsQueue for SqlLongRunningRequestsQueue { ready_at, polled_at, status: RequestStatus::Polled, + claimed_by: None, }, )), _ => None, diff --git a/eden/mononoke/megarepo_api/requests_table/src/types.rs b/eden/mononoke/megarepo_api/requests_table/src/types.rs index 9f37dfa08e..48e9d6bdf6 100644 --- a/eden/mononoke/megarepo_api/requests_table/src/types.rs +++ b/eden/mononoke/megarepo_api/requests_table/src/types.rs @@ -91,6 +91,7 @@ macro_rules! mysql_string_newtype { mysql_string_newtype!(BlobstoreKey); mysql_string_newtype!(RequestType); +mysql_string_newtype!(ClaimedBy); #[derive(Clone, Copy, Debug, Eq, PartialEq, mysql::OptTryFromRowField)] pub enum RequestStatus { @@ -113,6 +114,7 @@ pub struct LongRunningRequestEntry { pub ready_at: Option, pub polled_at: Option, pub status: RequestStatus, + pub claimed_by: Option, } impl std::fmt::Display for RequestStatus {