migrate to use new params and responses types

Summary: This diff switches megarepo_api to use the types created in the previous diffs

Reviewed By: krallin

Differential Revision: D28606338

fbshipit-source-id: c048f274395239528b3a0280172dd21052055111
This commit is contained in:
Mateusz Kwapich 2021-05-27 02:37:58 -07:00 committed by Facebook GitHub Bot
parent fe4014dcc3
commit 86b30a75fa
2 changed files with 68 additions and 48 deletions

View File

@ -9,7 +9,7 @@
use anyhow::{anyhow, Error};
use blobstore::PutBehaviour;
use blobstore::{Blobstore, Loadable, Storable};
use blobstore::{Blobstore, Storable};
use bookmarks::BookmarkName;
use context::CoreContext;
use megarepo_error::MegarepoError;
@ -24,7 +24,10 @@ use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::types::{BlobstoreKeyWrapper, Request, ThriftParams, Token};
use crate::types::{
BlobstoreKeyWrapper, MegarepoAsynchronousRequestParams, MegarepoAsynchronousRequestResult,
Request, ThriftParams, Token,
};
const INITIAL_POLL_DELAY_MS: u64 = 1000;
const MAX_POLL_DURATION: Duration = Duration::from_secs(60);
@ -55,9 +58,8 @@ impl AsyncMethodRequestQueue {
) -> Result<<P::R as Request>::Token, Error> {
let request_type = RequestType(P::R::NAME.to_owned());
let target = thrift_params.target().clone();
let rust_params: <P::R as Request>::Params = thrift_params.try_into()?;
let params_object_id: <P::R as Request>::ParamsId =
rust_params.store(&ctx, &self.blobstore).await?;
let rust_params: MegarepoAsynchronousRequestParams = thrift_params.into();
let params_object_id = rust_params.store(&ctx, &self.blobstore).await?;
let blobstore_key = BlobstoreKey(params_object_id.blobstore_key());
let table_id = self
.table
@ -77,7 +79,7 @@ impl AsyncMethodRequestQueue {
&self,
ctx: &CoreContext,
req_id: &RequestId,
) -> Result<Option<<R as Request>::StoredResult>, MegarepoError> {
) -> Result<Option<<R as Request>::ThriftResult>, MegarepoError> {
let maybe_result_blobstore_key = match self.table.poll(ctx, req_id).await? {
None => return Ok(None),
Some((_, entry)) => entry.result_blobstore_key,
@ -93,11 +95,14 @@ impl AsyncMethodRequestQueue {
}
};
let result_blobstore_id =
<R as Request>::StoredResultId::parse_blobstore_key(&result_blobstore_key.0)?;
let result_obj: <R as Request>::StoredResult =
result_blobstore_id.load(&ctx, &self.blobstore).await?;
Ok(Some(result_obj))
let result: MegarepoAsynchronousRequestResult =
MegarepoAsynchronousRequestResult::load_from_key(
ctx,
&self.blobstore,
&result_blobstore_key.0,
)
.await?;
Ok(Some(result.try_into()?))
}
pub async fn poll<T: Token>(
@ -111,14 +116,14 @@ impl AsyncMethodRequestQueue {
let req_id = RequestId(row_id, RequestType(T::R::NAME.to_owned()));
loop {
let maybe_stored_result: Option<<T::R as Request>::StoredResult> =
let maybe_thrift_result: Option<<T::R as Request>::ThriftResult> =
self.poll_once::<T::R>(&ctx, &req_id).await?;
let next_sleep = Duration::from_millis(rand::random::<u64>() % backoff_ms);
match maybe_stored_result {
Some(stored_result) => {
match maybe_thrift_result {
Some(thrift_result) => {
// Nice, the result is ready!
return <T::R as Request>::stored_result_into_poll_response(stored_result);
return <T::R as Request>::thrift_result_into_poll_response(thrift_result);
}
None if before.elapsed() + next_sleep > MAX_POLL_DURATION => {
// The result is not yet ready, but we're out of time
@ -137,7 +142,7 @@ impl AsyncMethodRequestQueue {
#[cfg(test)]
mod tests {
use super::*;
use blobstore::{Loadable, Storable};
use blobstore::Loadable;
use context::CoreContext;
use fbinit::FacebookInit;
use requests_table::{ClaimedBy, RequestStatus};
@ -150,25 +155,25 @@ mod tests {
};
use crate::types::{
MegarepoAddTargetParamsId, MegarepoChangeTargetConfigParamsId,
MegarepoRemergeSourceParamsId, MegarepoSyncChangesetParamsId,
MegarepoAsynchronousRequestParamsId, MegarepoAsynchronousRequestResult, ThriftResult,
};
use crate::types::{
MegarepoAddTargetResult, MegarepoChangeTargetConfigResult, MegarepoRemergeSourceResult,
MegarepoSyncChangesetResult,
use source_control::{
MegarepoAddTargetResponse, MegarepoChangeTargetConfigResponse,
MegarepoRemergeSourceResponse, MegarepoSyncChangesetResponse,
};
use crate::types::{
MegarepoAddTargetToken, MegarepoChangeTargetConfigToken, MegarepoRemergeSourceToken,
MegarepoSyncChangesetToken,
MegarepoAddSyncTarget, MegarepoChangeTargetConfig, MegarepoRemergeSource,
MegarepoSyncChangeset,
};
macro_rules! test_enqueue_and_poll_once {
{
$fn_name: ident,
$request_struct: ident,
$thrift_params: ident,
$token: ident,
$params_id: ident,
$result: ident,
$response: ident,
$request_type: expr,
} => {
#[fbinit::test]
@ -178,7 +183,7 @@ mod tests {
// Enqueue a request
let params: $thrift_params = Default::default();
let token: $token = q.enqueue(ctx.clone(), params.clone()).await?;
let token = q.enqueue(ctx.clone(), params.clone()).await?;
// Verify that request metadata is in the db and has expected values
let (row_id, _) = token.to_db_id_and_target()?;
@ -197,32 +202,34 @@ mod tests {
);
// Verify that request params are in the blobstore
let id = $params_id::parse_blobstore_key(&entry.args_blobstore_key.0)?;
let args: $thrift_params = id.load(&ctx, &q.blobstore).await?.into();
assert_eq!(args, params);
let id = MegarepoAsynchronousRequestParamsId::parse_blobstore_key(&entry.args_blobstore_key.0)?;
let args = id.load(&ctx, &q.blobstore).await?;
assert_eq!(args, params.into());
let req_id = RequestId(row_id, entry.request_type);
// Verify that poll_once on this request in a "new" state
// returns None
let new_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
let new_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?;
assert!(new_poll.is_none());
// Verify that poll_once on this request in a "in_progress" state
// returns None
q.table.mark_in_progress(&ctx, &req_id, &ClaimedBy("test".to_string())).await?;
let in_progress_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
let in_progress_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?;
assert!(in_progress_poll.is_none());
// Inject a result for this request
// Verify that poll_once on this request in a "in_progress" state
// returns injected result
let fake_result = $result::from_thrift(Default::default());
let fake_response: Result<$response, MegarepoError> = Ok(Default::default());
let fake_result: MegarepoAsynchronousRequestResult = fake_response.clone().into();
let fake_result_id = fake_result.clone().store(&ctx, &q.blobstore).await?;
let fake_result_key = BlobstoreKey(fake_result_id.blobstore_key());
q.table.mark_ready(&ctx, &req_id, fake_result_key).await?;
let ready_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
assert_eq!(ready_poll, Some(fake_result));
let ready_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?;
let ready_poll_response = ready_poll.unwrap().into_result();
assert_eq!(ready_poll_response.unwrap(), fake_response.unwrap());
// After a successful poll, request is marked as polled
let entry = q.table.test_get_request_entry_by_id(&ctx, &req_id.0).await?.unwrap();
@ -235,37 +242,33 @@ mod tests {
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_add_target,
MegarepoAddSyncTarget,
ThriftMegarepoAddTargetParams,
MegarepoAddTargetToken,
MegarepoAddTargetParamsId,
MegarepoAddTargetResult,
MegarepoAddTargetResponse,
"megarepo_add_sync_target",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_sync_changeset,
MegarepoSyncChangeset,
ThriftMegarepoSyncChangesetParams,
MegarepoSyncChangesetToken,
MegarepoSyncChangesetParamsId,
MegarepoSyncChangesetResult,
MegarepoSyncChangesetResponse,
"megarepo_sync_changeset",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_change_config,
MegarepoChangeTargetConfig,
ThriftMegarepoChangeTargetConfigParams,
MegarepoChangeTargetConfigToken,
MegarepoChangeTargetConfigParamsId,
MegarepoChangeTargetConfigResult,
MegarepoChangeTargetConfigResponse,
"megarepo_change_target_config",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_remerge_source,
MegarepoRemergeSource,
ThriftMegarepoRemergeSourceParams,
MegarepoRemergeSourceToken,
MegarepoRemergeSourceParamsId,
MegarepoRemergeSourceResult,
MegarepoRemergeSourceResponse,
"megarepo_remerge_source",
}
}

View File

@ -6,7 +6,8 @@
*/
use anyhow::{anyhow, Error, Result};
use blobstore::{impl_loadable_storable, Loadable, Storable};
use blobstore::{impl_loadable_storable, Blobstore, Loadable, Storable};
use context::CoreContext;
use fbthrift::compact_protocol;
use megarepo_config::Target;
use megarepo_error::MegarepoError;
@ -61,7 +62,9 @@ use source_control::{
MegarepoSyncChangesetToken as ThriftMegarepoSyncChangesetToken,
};
use std::convert::TryFrom;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::Arc;
/// Grouping of types and behaviors for an asynchronous request
pub trait Request: Sized + Send + Sync {
@ -225,6 +228,20 @@ macro_rules! impl_async_svc_stored_type {
Self { id, thrift }
}
pub async fn load_from_key(ctx: &CoreContext, blobstore: &Arc<dyn Blobstore>, key: &str) -> Result<Self, MegarepoError> {
let bytes = blobstore.get(ctx, key).await?;
let prefix = concat!("async.svc.", stringify!($value_type), ".blake2.");
if key.strip_prefix(prefix).is_none() {
return Err(MegarepoError::internal(anyhow!("{} is not a blobstore key for {}", key, stringify!($value_type))));
}
match bytes {
Some(bytes) => Ok(bytes.into_bytes().try_into()?),
None => Err(MegarepoError::internal(anyhow!("Missing blob: {}", key))),
}
}
pub fn handle(&self) -> &$handle_type {
&self.id
}