megarepo_api: turn tokens into target-containing structs

Summary:
I should've made them structs from the beginning, but of course I thought that
I know better and these tokens can definitely not be richer than just strings.

Well, it turns out we need them to be richer. Specific reason is that in theory
a single Mononoke (or scs_server) instance can run with multiple storage
configs. For us this means that one target's requests may be stored in one
db, while another target's requests - in another one. For blobstores this is
even used in practice, while for xdb it's just a theoretical thing, but we need
to support it nevertheless.

To do so, let's add the ability to query the target (and, correspondingly, the
Mononoke repo) from any king of params our async methods receive: ThriftParams
or Token implementors.

In addition, this diff really implements `AddScubaParams` and `AddScubaResponse` for more things than before, so there's that.

Finally, apart from making tokens structured, this diff also changes an interface in two more ways:
- it adds optional `message` fields to several params structs
- it adds `changesets_to_merge` to `MegarepoChangeTargetConfigParams`

Reviewed By: StanislavGlebik

Differential Revision: D28333999

fbshipit-source-id: 99bd19b040b59ee788ef661dda3171cc56254d33
This commit is contained in:
Kostia Balytskyi 2021-05-12 02:00:05 -07:00 committed by Facebook GitHub Bot
parent dee8e4f613
commit 7d06a54ff8
8 changed files with 258 additions and 46 deletions

View File

@ -60,6 +60,7 @@ impl AsyncMethodRequestQueue {
thrift_params: P,
) -> Result<<P::R as Request>::Token, Error> {
let request_type = RequestType(P::R::NAME.to_owned());
let target = thrift_params.target().clone();
let rust_params: <P::R as Request>::Params = thrift_params.try_into()?;
let params_object_id: <P::R as Request>::ParamsId =
rust_params.store(&ctx, &self.blobstore).await?;
@ -68,7 +69,7 @@ impl AsyncMethodRequestQueue {
.table
.add_request(&ctx, &request_type, &blobstoke_key)
.await?;
let token = <P::R as Request>::Token::from_db_id(table_id);
let token = <P::R as Request>::Token::from_db_id_and_target(table_id, target);
Ok(token)
}
@ -106,7 +107,8 @@ impl AsyncMethodRequestQueue {
) -> Result<<T::R as Request>::PollResponse, MegarepoError> {
let mut backoff_ms = INITIAL_POLL_DELAY_MS;
let before = Instant::now();
let req_id = RequestId(token.to_db_id()?, RequestType(T::R::NAME.to_owned()));
let (row_id, _target) = token.to_db_id_and_target()?;
let req_id = RequestId(row_id, RequestType(T::R::NAME.to_owned()));
loop {
let maybe_stored_result: Option<<T::R as Request>::StoredResult> =
@ -179,7 +181,7 @@ mod tests {
let token: $token = q.enqueue(ctx.clone(), params.clone()).await?;
// Verify that request metadata is in the db and has expected values
let row_id = token.to_db_id()?;
let (row_id, _) = token.to_db_id_and_target()?;
let entry = q
.table
.test_get_request_entry_by_id(&ctx, &row_id)

View File

@ -117,13 +117,18 @@ pub trait ThriftParams: Sized + Send + Sync {
}
/// Polling token for an async service method
pub trait Token: Sized + Send + Sync {
pub trait Token: Clone + Sized + Send + Sync {
type R: Request<Token = Self>;
type ThriftToken;
fn into_thrift(self) -> Self::ThriftToken;
fn from_db_id(id: RowId) -> Self;
fn to_db_id(&self) -> Result<RowId, MegarepoError>;
fn from_db_id_and_target(id: RowId, target: Target) -> Self;
fn to_db_id_and_target(&self) -> Result<(RowId, Target), MegarepoError>;
/// Every Token referes to some Target
/// This method is needed to extract it from the
/// implementor of this trait
fn target(&self) -> &Target;
}
/// This macro implements an async service method type,
@ -288,6 +293,7 @@ macro_rules! impl_async_svc_method_types {
context_type => $result_context_type,
}
#[derive(Clone)]
pub struct $token_type(pub $token_thrift_type);
impl ThriftParams for $params_value_thrift_type {
@ -302,24 +308,32 @@ macro_rules! impl_async_svc_method_types {
type ThriftToken = $token_thrift_type;
type R = $request_struct;
fn from_db_id(id: RowId) -> Self {
fn from_db_id_and_target(id: RowId, target: Target) -> Self {
// Thrift token is a string alias
// but's guard ourselves here against
// it changing unexpectedly.
let thrift_token: $token_thrift_type = format!("{}", id.0);
let thrift_token = $token_thrift_type {
target,
id: id.0 as i64
};
Self(thrift_token)
}
fn to_db_id(&self) -> Result<RowId, MegarepoError> {
self.0
.parse::<u64>()
.map_err(MegarepoError::request)
.map(RowId)
fn to_db_id_and_target(&self) -> Result<(RowId, Target), MegarepoError> {
let row_id = self.0.id as u64;
let row_id = RowId(row_id);
let target = self.0.target.clone();
Ok((row_id, target))
}
fn into_thrift(self) -> $token_thrift_type {
self.0
}
fn target(&self) -> &Target {
&self.0.target
}
}
impl From<Result<$response_type, MegarepoError>> for $result_value_type {

View File

@ -919,10 +919,31 @@ struct RepoListHgManifestParams {
}
// Polling tokens for async megarepo methods
typedef string MegarepoChangeConfigToken
typedef string MegarepoSyncChangesetToken
typedef string MegarepoRemergeSourceToken
typedef string MegarepoAddTargetToken
struct MegarepoChangeConfigToken {
// A target this token relates to
1: megarepo_configs.Target target,
// An actual token payload
2: i64 id,
}
struct MegarepoSyncChangesetToken {
// A target this token relates to
1: megarepo_configs.Target target,
// An actual token payload
2: i64 id,
}
struct MegarepoRemergeSourceToken{
// A target this token relates to
1: megarepo_configs.Target target,
// An actual token payload
2: i64 id,
}
struct MegarepoAddTargetToken {
// A target this token relates to
1: megarepo_configs.Target target,
// An actual token payload
2: i64 id,
}
// Params for the megarepo_add_sync_target_config method
struct MegarepoAddConfigParams {
@ -953,6 +974,9 @@ struct MegarepoAddTargetParams {
// to the source revision if it is a changeset itself
// Each source name MUST be present in this map.
2: map<string, megarepo_configs.ChangesetId> changesets_to_merge
// A message to be used in the commit description
// If not provided, service will generate commit description
3: optional string message,
}
// Params for megarepo_change_target_config method
@ -968,6 +992,14 @@ struct MegarepoChangeTargetConfigParams {
// when this operation tries to advance it
// This argument exists to prevent race conditions
3: megarepo_configs.ChangesetId target_location,
// Initial changesets to merge for each of the
// sources in the `target`. Similar to `changesets_to_merge`
// in the `MegarepoAddTargetParams` struct, see docstring
// there
4: map<string, megarepo_configs.ChangesetId> changesets_to_merge
// A message to be used in the commit description
// If not provided, service will generate commit description
5: optional string message,
}
// Params for megarepo_sync_changeset method
@ -1000,6 +1032,9 @@ struct MegarepoRemergeSourceParams {
// when this operation tries to advance it
// This argument exists to prevent race conditions
4: megarepo_configs.ChangesetId target_location,
// A message to be used in the commit description
// If not provided, service will generate commit description
5: optional string message,
}
// Method response structures
@ -1200,6 +1235,9 @@ struct MegarepoAddConfigResponse {
}
struct MegarepoAddTargetResponse {
// A new position of the target bookmark
// after the "sync changeset" operaton finished
1: megarepo_configs.ChangesetId cs_id
}
struct MegarepoAddTargetPollResponse {

View File

@ -45,6 +45,7 @@ mod history;
mod into_response;
mod methods;
mod monitoring;
mod scuba_common;
mod scuba_params;
mod scuba_response;
mod source_control_impl;

View File

@ -23,7 +23,7 @@ use crate::source_control_impl::SourceControlServiceImpl;
/// become much more expensive and will utilize the async
/// request approach. Still, we want to expose the incomplete
/// version of this call, for our clients to test.
const FAKE_ADD_TARGET_TOKEN: &str = "FAKE_ADD_TARGET_TOKEN";
const FAKE_ADD_TARGET_TOKEN: i64 = -36;
impl SourceControlServiceImpl {
fn verify_repos_by_config(
@ -71,13 +71,19 @@ impl SourceControlServiceImpl {
) -> Result<thrift::MegarepoAddTargetToken, errors::ServiceError> {
let config = params.config_with_new_target;
self.verify_repos_by_config(&config)?;
// TODO (ikostia): stop using the fake taken
// TODO (ikostia, stash, mitrandir): stop using the fake token
let megarepo_configs = self.mononoke.megarepo_configs();
let target = config.target.clone();
megarepo_configs
.add_target_with_config_version(ctx, config)
.await?;
Ok(FAKE_ADD_TARGET_TOKEN.to_owned())
let token = thrift::MegarepoAddTargetToken {
id: FAKE_ADD_TARGET_TOKEN,
target,
};
Ok(token)
}
pub(crate) async fn megarepo_add_sync_target_poll(
@ -85,10 +91,14 @@ impl SourceControlServiceImpl {
_ctx: CoreContext,
token: thrift::MegarepoAddTargetToken,
) -> Result<thrift::MegarepoAddTargetPollResponse, errors::ServiceError> {
// TODO (ikostia): stop using the fake taken
if token == FAKE_ADD_TARGET_TOKEN {
// TODO (ikostia, stash, mitrandir): stop using the fake token
if token.id == FAKE_ADD_TARGET_TOKEN {
Ok(thrift::MegarepoAddTargetPollResponse {
response: Some(thrift::MegarepoAddTargetResponse {}),
response: Some(thrift::MegarepoAddTargetResponse {
// This is obviously incorrect and should be removed together
// with the fake token
cs_id: Default::default(),
}),
})
} else {
Err(errors::ServiceError::from(errors::not_implemented(

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use megarepo_config::Target;
use scuba_ext::MononokeScubaSampleBuilder;
pub(crate) fn hex(v: &[u8]) -> String {
faster_hex::hex_string(v).expect("hex_string should never fail")
}
pub(crate) enum Reported {
Param,
Response,
}
pub(crate) fn report_megarepo_target(
target: &Target,
scuba: &mut MononokeScubaSampleBuilder,
reported: Reported,
) {
match reported {
Reported::Param => {
scuba.add("param_megarepo_target_bookmark", target.bookmark.clone());
scuba.add("param_megarepo_target_repo_id", target.repo_id);
}
Reported::Response => {
scuba.add("response_megarepo_target_bookmark", target.bookmark.clone());
scuba.add("response_megarepo_target_repo_id", target.repo_id);
}
}
}

View File

@ -11,6 +11,7 @@ use scuba_ext::{MononokeScubaSampleBuilder, ScubaValue};
use source_control as thrift;
use crate::commit_id::CommitIdExt;
use crate::scuba_common::{hex, report_megarepo_target, Reported};
/// A trait for logging a thrift `Params` struct to scuba.
///
@ -279,10 +280,7 @@ impl AddScubaParams for thrift::FileInfoParams {}
impl AddScubaParams for thrift::FileDiffParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add(
"other_file",
faster_hex::hex_string(&self.other_file_id).expect("hex_string should never fail"),
);
scuba.add("other_file", hex(&self.other_file_id));
scuba.add("param_format", self.format.to_string());
scuba.add("param_context", self.context);
}
@ -297,22 +295,79 @@ impl AddScubaParams for thrift::TreeListParams {
impl AddScubaParams for thrift::RepoListHgManifestParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add(
"hg_manifest_id",
faster_hex::hex_string(&self.hg_manifest_id).expect("hex_string should never fail"),
);
scuba.add("hg_manifest_id", hex(&self.hg_manifest_id));
}
}
// Various Megarepo polling tokens are just aliases for String
impl AddScubaParams for String {}
impl AddScubaParams for thrift::MegarepoAddTargetToken {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoSyncChangesetParams {}
impl AddScubaParams for thrift::MegarepoChangeConfigToken {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoRemergeSourceParams {}
impl AddScubaParams for thrift::MegarepoRemergeSourceToken {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoChangeTargetConfigParams {}
impl AddScubaParams for thrift::MegarepoSyncChangesetToken {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoAddTargetParams {}
impl AddScubaParams for thrift::MegarepoSyncChangesetParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_source_name", self.source_name.clone());
scuba.add("param_megarepo_cs_id", hex(&self.cs_id));
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoAddConfigParams {}
impl AddScubaParams for thrift::MegarepoRemergeSourceParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_source_name", self.source_name.clone());
scuba.add("param_megarepo_cs_id", hex(&self.cs_id));
scuba.add("param_megarepo_target_location", hex(&self.target_location));
scuba.add("param_megarepo_message", self.message.clone());
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoChangeTargetConfigParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_version", self.new_version.clone());
scuba.add("param_megarepo_message", self.message.clone());
scuba.add("param_megarepo_target_location", hex(&self.target_location));
report_megarepo_target(&self.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoAddTargetParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add(
"param_megarepo_version",
self.config_with_new_target.version.clone(),
);
scuba.add("param_megarepo_message", self.message.clone());
report_megarepo_target(&self.config_with_new_target.target, scuba, Reported::Param);
}
}
impl AddScubaParams for thrift::MegarepoAddConfigParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_megarepo_version", self.new_config.version.clone());
report_megarepo_target(&self.new_config.target, scuba, Reported::Param);
}
}

View File

@ -9,6 +9,7 @@ use scuba_ext::MononokeScubaSampleBuilder;
use source_control as thrift;
use crate::commit_id::CommitIdExt;
use crate::scuba_common::{report_megarepo_target, Reported};
/// A trait for logging a thrift `Response` struct to scuba.
pub(crate) trait AddScubaResponse: Send + Sync {
@ -75,10 +76,6 @@ impl AddScubaResponse for thrift::TreeListResponse {}
impl AddScubaResponse for thrift::RepoListHgManifestResponse {}
// MegarepoRemergeSourceToken, MegarepoChangeConfigToken
// MegarepoSyncChangesetToken are all just aliases for String
impl AddScubaResponse for String {}
// TODO: report cs_ids where possible
impl AddScubaResponse for thrift::MegarepoRemergeSourceResponse {}
@ -90,10 +87,70 @@ impl AddScubaResponse for thrift::MegarepoAddTargetResponse {}
impl AddScubaResponse for thrift::MegarepoAddConfigResponse {}
impl AddScubaResponse for thrift::MegarepoRemergeSourcePollResponse {}
// Helper fn to report PollResponse types
fn report_maybe_response<R: AddScubaResponse>(
maybe_response: &Option<R>,
scuba: &mut MononokeScubaSampleBuilder,
) {
match maybe_response {
None => {
scuba.add("megarepo_ready", false);
}
Some(resp) => {
scuba.add("megarepo_ready", true);
<R as AddScubaResponse>::add_scuba_response(&resp, scuba);
}
}
}
impl AddScubaResponse for thrift::MegarepoSyncChangesetPollResponse {}
impl AddScubaResponse for thrift::MegarepoRemergeSourcePollResponse {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
report_maybe_response(&self.response, scuba);
}
}
impl AddScubaResponse for thrift::MegarepoChangeTargetConfigPollResponse {}
impl AddScubaResponse for thrift::MegarepoSyncChangesetPollResponse {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
report_maybe_response(&self.response, scuba);
}
}
impl AddScubaResponse for thrift::MegarepoAddTargetPollResponse {}
impl AddScubaResponse for thrift::MegarepoChangeTargetConfigPollResponse {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
report_maybe_response(&self.response, scuba);
}
}
impl AddScubaResponse for thrift::MegarepoAddTargetPollResponse {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
report_maybe_response(&self.response, scuba);
}
}
impl AddScubaResponse for thrift::MegarepoAddTargetToken {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Response);
}
}
impl AddScubaResponse for thrift::MegarepoChangeConfigToken {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Response);
}
}
impl AddScubaResponse for thrift::MegarepoRemergeSourceToken {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Response);
}
}
impl AddScubaResponse for thrift::MegarepoSyncChangesetToken {
fn add_scuba_response(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("megarepo_token", self.id);
report_megarepo_target(&self.target, scuba, Reported::Response);
}
}