From 86b30a75fad23302c67c3b8d9ce89e76900fafd0 Mon Sep 17 00:00:00 2001 From: Mateusz Kwapich Date: Thu, 27 May 2021 02:37:58 -0700 Subject: [PATCH] migrate to use new params and responses types Summary: This diff switches megarepo_api to use the types created in the previous diffs Reviewed By: krallin Differential Revision: D28606338 fbshipit-source-id: c048f274395239528b3a0280172dd21052055111 --- .../megarepo_api/async_requests/src/queue.rs | 97 ++++++++++--------- .../megarepo_api/async_requests/src/types.rs | 19 +++- 2 files changed, 68 insertions(+), 48 deletions(-) diff --git a/eden/mononoke/megarepo_api/async_requests/src/queue.rs b/eden/mononoke/megarepo_api/async_requests/src/queue.rs index 67f0c97f2c..6699e63410 100644 --- a/eden/mononoke/megarepo_api/async_requests/src/queue.rs +++ b/eden/mononoke/megarepo_api/async_requests/src/queue.rs @@ -9,7 +9,7 @@ use anyhow::{anyhow, Error}; use blobstore::PutBehaviour; -use blobstore::{Blobstore, Loadable, Storable}; +use blobstore::{Blobstore, Storable}; use bookmarks::BookmarkName; use context::CoreContext; use megarepo_error::MegarepoError; @@ -24,7 +24,10 @@ use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, Instant}; -use crate::types::{BlobstoreKeyWrapper, Request, ThriftParams, Token}; +use crate::types::{ + BlobstoreKeyWrapper, MegarepoAsynchronousRequestParams, MegarepoAsynchronousRequestResult, + Request, ThriftParams, Token, +}; const INITIAL_POLL_DELAY_MS: u64 = 1000; const MAX_POLL_DURATION: Duration = Duration::from_secs(60); @@ -55,9 +58,8 @@ impl AsyncMethodRequestQueue { ) -> Result<::Token, Error> { let request_type = RequestType(P::R::NAME.to_owned()); let target = thrift_params.target().clone(); - let rust_params: ::Params = thrift_params.try_into()?; - let params_object_id: ::ParamsId = - rust_params.store(&ctx, &self.blobstore).await?; + let rust_params: MegarepoAsynchronousRequestParams = thrift_params.into(); + let params_object_id = rust_params.store(&ctx, &self.blobstore).await?; let blobstore_key = BlobstoreKey(params_object_id.blobstore_key()); let table_id = self .table @@ -77,7 +79,7 @@ impl AsyncMethodRequestQueue { &self, ctx: &CoreContext, req_id: &RequestId, - ) -> Result::StoredResult>, MegarepoError> { + ) -> Result::ThriftResult>, MegarepoError> { let maybe_result_blobstore_key = match self.table.poll(ctx, req_id).await? { None => return Ok(None), Some((_, entry)) => entry.result_blobstore_key, @@ -93,11 +95,14 @@ impl AsyncMethodRequestQueue { } }; - let result_blobstore_id = - ::StoredResultId::parse_blobstore_key(&result_blobstore_key.0)?; - let result_obj: ::StoredResult = - result_blobstore_id.load(&ctx, &self.blobstore).await?; - Ok(Some(result_obj)) + let result: MegarepoAsynchronousRequestResult = + MegarepoAsynchronousRequestResult::load_from_key( + ctx, + &self.blobstore, + &result_blobstore_key.0, + ) + .await?; + Ok(Some(result.try_into()?)) } pub async fn poll( @@ -111,14 +116,14 @@ impl AsyncMethodRequestQueue { let req_id = RequestId(row_id, RequestType(T::R::NAME.to_owned())); loop { - let maybe_stored_result: Option<::StoredResult> = + let maybe_thrift_result: Option<::ThriftResult> = self.poll_once::(&ctx, &req_id).await?; let next_sleep = Duration::from_millis(rand::random::() % backoff_ms); - match maybe_stored_result { - Some(stored_result) => { + match maybe_thrift_result { + Some(thrift_result) => { // Nice, the result is ready! - return ::stored_result_into_poll_response(stored_result); + return ::thrift_result_into_poll_response(thrift_result); } None if before.elapsed() + next_sleep > MAX_POLL_DURATION => { // The result is not yet ready, but we're out of time @@ -137,7 +142,7 @@ impl AsyncMethodRequestQueue { #[cfg(test)] mod tests { use super::*; - use blobstore::{Loadable, Storable}; + use blobstore::Loadable; use context::CoreContext; use fbinit::FacebookInit; use requests_table::{ClaimedBy, RequestStatus}; @@ -150,25 +155,25 @@ mod tests { }; use crate::types::{ - MegarepoAddTargetParamsId, MegarepoChangeTargetConfigParamsId, - MegarepoRemergeSourceParamsId, MegarepoSyncChangesetParamsId, + MegarepoAsynchronousRequestParamsId, MegarepoAsynchronousRequestResult, ThriftResult, }; - use crate::types::{ - MegarepoAddTargetResult, MegarepoChangeTargetConfigResult, MegarepoRemergeSourceResult, - MegarepoSyncChangesetResult, + + use source_control::{ + MegarepoAddTargetResponse, MegarepoChangeTargetConfigResponse, + MegarepoRemergeSourceResponse, MegarepoSyncChangesetResponse, }; + use crate::types::{ - MegarepoAddTargetToken, MegarepoChangeTargetConfigToken, MegarepoRemergeSourceToken, - MegarepoSyncChangesetToken, + MegarepoAddSyncTarget, MegarepoChangeTargetConfig, MegarepoRemergeSource, + MegarepoSyncChangeset, }; macro_rules! test_enqueue_and_poll_once { { $fn_name: ident, + $request_struct: ident, $thrift_params: ident, - $token: ident, - $params_id: ident, - $result: ident, + $response: ident, $request_type: expr, } => { #[fbinit::test] @@ -178,7 +183,7 @@ mod tests { // Enqueue a request let params: $thrift_params = Default::default(); - let token: $token = q.enqueue(ctx.clone(), params.clone()).await?; + let token = q.enqueue(ctx.clone(), params.clone()).await?; // Verify that request metadata is in the db and has expected values let (row_id, _) = token.to_db_id_and_target()?; @@ -197,32 +202,34 @@ mod tests { ); // Verify that request params are in the blobstore - let id = $params_id::parse_blobstore_key(&entry.args_blobstore_key.0)?; - let args: $thrift_params = id.load(&ctx, &q.blobstore).await?.into(); - assert_eq!(args, params); + let id = MegarepoAsynchronousRequestParamsId::parse_blobstore_key(&entry.args_blobstore_key.0)?; + let args = id.load(&ctx, &q.blobstore).await?; + assert_eq!(args, params.into()); let req_id = RequestId(row_id, entry.request_type); // Verify that poll_once on this request in a "new" state // returns None - let new_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?; + let new_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?; assert!(new_poll.is_none()); // Verify that poll_once on this request in a "in_progress" state // returns None q.table.mark_in_progress(&ctx, &req_id, &ClaimedBy("test".to_string())).await?; - let in_progress_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?; + let in_progress_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?; assert!(in_progress_poll.is_none()); // Inject a result for this request // Verify that poll_once on this request in a "in_progress" state // returns injected result - let fake_result = $result::from_thrift(Default::default()); + let fake_response: Result<$response, MegarepoError> = Ok(Default::default()); + let fake_result: MegarepoAsynchronousRequestResult = fake_response.clone().into(); let fake_result_id = fake_result.clone().store(&ctx, &q.blobstore).await?; let fake_result_key = BlobstoreKey(fake_result_id.blobstore_key()); q.table.mark_ready(&ctx, &req_id, fake_result_key).await?; - let ready_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?; - assert_eq!(ready_poll, Some(fake_result)); + let ready_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?; + let ready_poll_response = ready_poll.unwrap().into_result(); + assert_eq!(ready_poll_response.unwrap(), fake_response.unwrap()); // After a successful poll, request is marked as polled let entry = q.table.test_get_request_entry_by_id(&ctx, &req_id.0).await?.unwrap(); @@ -235,37 +242,33 @@ mod tests { test_enqueue_and_poll_once! { test_enqueue_and_poll_once_add_target, + MegarepoAddSyncTarget, ThriftMegarepoAddTargetParams, - MegarepoAddTargetToken, - MegarepoAddTargetParamsId, - MegarepoAddTargetResult, + MegarepoAddTargetResponse, "megarepo_add_sync_target", } test_enqueue_and_poll_once! { test_enqueue_and_poll_once_sync_changeset, + MegarepoSyncChangeset, ThriftMegarepoSyncChangesetParams, - MegarepoSyncChangesetToken, - MegarepoSyncChangesetParamsId, - MegarepoSyncChangesetResult, + MegarepoSyncChangesetResponse, "megarepo_sync_changeset", } test_enqueue_and_poll_once! { test_enqueue_and_poll_once_change_config, + MegarepoChangeTargetConfig, ThriftMegarepoChangeTargetConfigParams, - MegarepoChangeTargetConfigToken, - MegarepoChangeTargetConfigParamsId, - MegarepoChangeTargetConfigResult, + MegarepoChangeTargetConfigResponse, "megarepo_change_target_config", } test_enqueue_and_poll_once! { test_enqueue_and_poll_once_remerge_source, + MegarepoRemergeSource, ThriftMegarepoRemergeSourceParams, - MegarepoRemergeSourceToken, - MegarepoRemergeSourceParamsId, - MegarepoRemergeSourceResult, + MegarepoRemergeSourceResponse, "megarepo_remerge_source", } } diff --git a/eden/mononoke/megarepo_api/async_requests/src/types.rs b/eden/mononoke/megarepo_api/async_requests/src/types.rs index 2222c15297..5f09d4fcee 100644 --- a/eden/mononoke/megarepo_api/async_requests/src/types.rs +++ b/eden/mononoke/megarepo_api/async_requests/src/types.rs @@ -6,7 +6,8 @@ */ use anyhow::{anyhow, Error, Result}; -use blobstore::{impl_loadable_storable, Loadable, Storable}; +use blobstore::{impl_loadable_storable, Blobstore, Loadable, Storable}; +use context::CoreContext; use fbthrift::compact_protocol; use megarepo_config::Target; use megarepo_error::MegarepoError; @@ -61,7 +62,9 @@ use source_control::{ MegarepoSyncChangesetToken as ThriftMegarepoSyncChangesetToken, }; use std::convert::TryFrom; +use std::convert::TryInto; use std::str::FromStr; +use std::sync::Arc; /// Grouping of types and behaviors for an asynchronous request pub trait Request: Sized + Send + Sync { @@ -225,6 +228,20 @@ macro_rules! impl_async_svc_stored_type { Self { id, thrift } } + pub async fn load_from_key(ctx: &CoreContext, blobstore: &Arc, key: &str) -> Result { + let bytes = blobstore.get(ctx, key).await?; + + let prefix = concat!("async.svc.", stringify!($value_type), ".blake2."); + if key.strip_prefix(prefix).is_none() { + return Err(MegarepoError::internal(anyhow!("{} is not a blobstore key for {}", key, stringify!($value_type)))); + } + + match bytes { + Some(bytes) => Ok(bytes.into_bytes().try_into()?), + None => Err(MegarepoError::internal(anyhow!("Missing blob: {}", key))), + } + } + pub fn handle(&self) -> &$handle_type { &self.id }