megarepo: basic version of async-requests crate

Summary:
This crate is the foundation for async request support in the megarepo service.

The idea is to store serialized request parameters in the blobstore upon
request arrival, and to query request results from the blobstore while
polling.

This diff deals with the following classes of types:
- param types for async methods: self-explanatory
- response types: these contain only the resulting value of a completed, successful execution
- stored result types: these contain the result of a completed execution, whether successful or failed. They exist to preserve the execution result in the blobstore.
- poll-response types: these contain an optional response. If the optional value is empty, the request is not yet ready
- polling tokens: these are used by the client to ask about the processing status of a submitted request

Apart from that, some of these types have both Rust and Thrift counterparts, mainly so that we can implement traits on the Rust types.

Relationships between these types are encoded in various traits and their associated types.
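For orientation, here is a condensed sketch of those core traits as they appear in `types.rs` later in this diff (simplified: trait bounds, methods and blobstore-key plumbing are trimmed):

```rust
/// Everything that belongs to one async method, condensed from types.rs below.
pub trait Request: Sized + Send + Sync {
    /// Request type name, recorded in the SQL queue table
    const NAME: &'static str;
    /// Rust params type (storable in the blobstore) and its blobstore handle
    type Params;
    type ParamsId;
    /// Thrift (wire) representation of the params
    type ThriftParams: ThriftParams<R = Self>;
    /// Polling token handed back to the client
    type Token: Token;
    /// Success-or-failure result preserved in the blobstore, and its handle
    type StoredResult;
    type StoredResultId;
    /// "Maybe ready" response returned to the poller
    type PollResponse;
}

/// Thrift params know which Request they belong to.
pub trait ThriftParams: Sized + Send + Sync {
    type R: Request<ThriftParams = Self>;
}

/// A polling token also knows its Request; the full trait additionally
/// converts to and from a requests-table row id.
pub trait Token: Sized + Send + Sync {
    type R: Request<Token = Self>;
}
```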

The lifecycle of an async request is therefore as follows (a usage sketch follows the list):
1. the request is submitted by the client, and enqueued
   1. params are serialized and saved into a blobstore
   1. an entry is created in the SQL table
   1. the key from that table is used to create a polling token
1. some external system processes a request [completely absent from this diff]
   1. it notices a new entry in the queue
   1. it reads the request's params from the blobstore
   1. it processes the request
   1. it preserves either the success or the failure of the request in the blobstore
   1. it updates the SQL table to mark the request as ready to be polled
1. the client polls the request
   1. the queue struct receives a polling token
   1. from that token it constructs DB keys
   1. it looks up the request row and checks whether it is in the ready state
   1. if that is the case, it reads the result_blobstore_key value and fetches the serialized result object
   1. now it has to turn this serialized result into a poll response:
       1. if the result is absent, the poll response is a success with an empty payload
       1. if the result is present and successful, the poll response is a success with the result's successful variant as its payload
       1. if the result is present and is a failure, the polling call throws a thrift exception with that failure
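A minimal client-side sketch of that lifecycle, based on the `AsyncMethodRequestQueue` API added in this diff; the function name is hypothetical, and it uses the test-only in-memory constructor and default-initialized params purely for illustration:

```rust
use async_requests::tokens::MegarepoSyncChangesetToken;
use async_requests::AsyncMethodRequestQueue;
use context::CoreContext;
use source_control::MegarepoSyncChangesetParams;

async fn submit_and_poll(ctx: CoreContext) -> Result<(), anyhow::Error> {
    // Test-only in-memory queue; the production constructor is still
    // unimplemented in this diff.
    let queue = AsyncMethodRequestQueue::new_test_in_memory()?;

    // Step 1: enqueue. Params are serialized into the blobstore, a row is
    // added to the SQL table, and the row id becomes the polling token.
    let params = MegarepoSyncChangesetParams::default();
    let token: MegarepoSyncChangesetToken = queue.enqueue(ctx.clone(), params).await?;

    // Step 3: poll (step 2, the worker, is not part of this diff). The token
    // is turned back into DB keys; if a result has been stored, it is loaded
    // from the blobstore and converted into a poll response (a stored failure
    // becomes a thrift exception). Otherwise, after ~60s of backoff, an empty
    // "not ready yet" response is returned.
    let _poll_response = queue.poll(ctx, token).await?;
    Ok(())
}
```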

Note: Why is there yet another .thrift file introduced in this diff? I felt like these types aren't a part of the scs interface, so they don't belong in `source_control.thrift`. On the other hand, they wrap things defined in `source_control.thrift`, so I needed to include it.

Reviewed By: StanislavGlebik

Differential Revision: D27964822

fbshipit-source-id: fc1a33a799d01c908bbe18a5394eba034b780174
Kostia Balytskyi 2021-05-10 06:50:29 -07:00 committed by Facebook GitHub Bot
parent cce23856fc
commit f10ef62cba
12 changed files with 1093 additions and 1 deletion

View File

@@ -273,7 +273,9 @@ members = [
"lfs_server",
"load_limiter",
"manifest",
"megarepo_api/async_requests",
"megarepo_api/configo_client",
"megarepo_api/if",
"megarepo_api/megarepo_config",
"megarepo_api/megarepo_error",
"megarepo_api/requests_table",

View File

@@ -7,7 +7,7 @@
#[macro_export]
macro_rules! impl_blobstore_conversions {
($ty:ident, $thrift_ty:ident) => {
($ty:ident, $thrift_ty:ty) => {
impl $crate::private::TryFrom<$crate::private::BlobstoreBytes> for $ty {
type Error = $crate::private::Error;

View File

@@ -0,0 +1,49 @@
[package]
name = "async_requests"
version = "0.1.0"
authors = ["Facebook"]
edition = "2018"
license = "GPLv2+"
[dependencies]
anyhow = "1.0"
blobstore = { version = "0.1.0", path = "../../blobstore" }
context = { version = "0.1.0", path = "../../server/context" }
fbthrift = { version = "0.0.1+unstable", git = "https://github.com/facebook/fbthrift.git", branch = "master" }
megarepo_error = { version = "0.1.0", path = "../megarepo_error" }
megarepo_types_thrift = { version = "0.1.0", path = "../if" }
memblob = { version = "0.1.0", path = "../../blobstore/memblob" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
mononoke_types_thrift = { version = "0.1.0", path = "../../mononoke_types/if" }
rand = { version = "0.7", features = ["small_rng"] }
requests_table = { version = "0.1.0", path = "../requests_table" }
source_control = { version = "0.1.0", path = "../../scs/if" }
sql_construct = { version = "0.1.0", path = "../../common/sql_construct" }
tokio = { version = "1.4", features = ["full", "test-util"] }
[dev-dependencies]
fbinit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit-tokio = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
[patch.crates-io]
addr2line = { git = "https://github.com/gimli-rs/addr2line.git", rev = "0b6b6018b5b252a18e628fba03885f7d21844b3c" }
bytecount = { git = "https://github.com/llogiq/bytecount", rev = "469eaf8395c99397cd64d059737a9054aa014088" }
chashmap = { git = "https://gitlab.redox-os.org/ahornby/chashmap", rev = "901ace2ca3cdbc2095adb1af111d211e254e2aae" }
const-random = { git = "https://github.com/fbsource/const-random", rev = "374c5b46427fe2ffbf6acbd9c1687e0f1a809f95" }
curl = { git = "https://github.com/kulshrax/curl-rust", rev = "2a15bbd8dbbd54734313fa703a64db7ce6ddaff0" }
curl-sys = { git = "https://github.com/kulshrax/curl-rust", rev = "2a15bbd8dbbd54734313fa703a64db7ce6ddaff0" }
enumset = { git = "https://github.com/danobi/enumset", rev = "4c01c583c27a725948fededbfb3461c572a669a4" }
lru-disk-cache = { git = "https://github.com/mozilla/sccache", rev = "033ebaae69beeb0ac04e8c35d6ff1103487bd9a3" }
mysql_common = { git = "https://github.com/iammxt/rust_mysql_common", rev = "0e4c86952f1e799960e736c0b2bb9d2a6d935bf1" }
openssl = { git = "https://github.com/sfackler/rust-openssl", rev = "68fc8ba890d77986b06ca5ce29d1089285fbbcf9" }
openssl-sys = { git = "https://github.com/sfackler/rust-openssl", rev = "68fc8ba890d77986b06ca5ce29d1089285fbbcf9" }
petgraph = { git = "https://github.com/jkeljo/petgraph", rev = "e3e9dd8632d23973fdc0b42c1117d5e5fc5fa384" }
prost = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-build = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-derive = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-types = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
r2d2_sqlite = { git = "https://github.com/jsgf/r2d2-sqlite.git", rev = "6d77a828ca0a3c507a3f58561532a1b6c66c7918" }
rustfilt = { git = "https://github.com/jsgf/rustfilt.git", rev = "8141fa7f1caee562ee8daffb2ddeca3d1f0d36e5" }
tokio-core = { git = "https://github.com/bolinfest/tokio-core", rev = "5f37aa3c627d56ee49154bc851d6930f5ab4398f" }
toml = { git = "https://github.com/jsgf/toml-rs", branch = "dotted-table-0.5.7" }

View File

@@ -0,0 +1,269 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
#![deny(warnings)]
use anyhow::{anyhow, Error};
use blobstore::PutBehaviour;
use blobstore::{Blobstore, Loadable, Storable};
use context::CoreContext;
use megarepo_error::MegarepoError;
use memblob::Memblob;
use requests_table::{
BlobstoreKey, LongRunningRequestsQueue, RequestId, RequestType, SqlLongRunningRequestsQueue,
};
use sql_construct::SqlConstruct;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, Instant};
pub mod types;
pub mod tokens {
pub use crate::types::{
MegarepoAddTargetToken, MegarepoChangeTargetConfigToken, MegarepoRemergeSourceToken,
MegarepoSyncChangesetToken,
};
}
use types::{BlobstoreKeyWrapper, Request, ThriftParams, Token};
const INITIAL_POLL_DELAY_MS: u64 = 1000;
const MAX_POLL_DURATION: Duration = Duration::from_secs(60);
#[derive(Clone)]
pub struct AsyncMethodRequestQueue {
blobstore: Arc<dyn Blobstore>,
table: Arc<dyn LongRunningRequestsQueue>,
}
impl AsyncMethodRequestQueue {
pub fn new() -> Result<Self, Error> {
unimplemented!("prod queue instantiation");
}
pub fn new_test_in_memory() -> Result<Self, Error> {
let blobstore: Arc<dyn Blobstore> = Arc::new(Memblob::new(PutBehaviour::IfAbsent));
let table: Arc<dyn LongRunningRequestsQueue> =
Arc::new(SqlLongRunningRequestsQueue::with_sqlite_in_memory()?);
Ok(Self { blobstore, table })
}
pub async fn enqueue<P: ThriftParams>(
&self,
ctx: CoreContext,
thrift_params: P,
) -> Result<<P::R as Request>::Token, Error> {
let request_type = RequestType(P::R::NAME.to_owned());
let rust_params: <P::R as Request>::Params = thrift_params.try_into()?;
let params_object_id: <P::R as Request>::ParamsId =
rust_params.store(&ctx, &self.blobstore).await?;
let blobstore_key = BlobstoreKey(params_object_id.blobstore_key());
let table_id = self
.table
.add_request(&ctx, &request_type, &blobstore_key)
.await?;
let token = <P::R as Request>::Token::from_db_id(table_id);
Ok(token)
}
async fn poll_once<R: Request>(
&self,
ctx: &CoreContext,
req_id: &RequestId,
) -> Result<Option<<R as Request>::StoredResult>, MegarepoError> {
let maybe_result_blobstore_key = match self.table.poll(ctx, req_id).await? {
None => return Ok(None),
Some((_, entry)) => entry.result_blobstore_key,
};
let result_blobstore_key = match maybe_result_blobstore_key {
Some(rbk) => rbk,
None => {
return Err(MegarepoError::internal(anyhow!(
"Programming error: successful poll with empty result_blobstore_key for {:?}",
req_id
)));
}
};
let result_blobstore_id =
<R as Request>::StoredResultId::parse_blobstore_key(&result_blobstore_key.0)?;
let result_obj: <R as Request>::StoredResult =
result_blobstore_id.load(&ctx, &self.blobstore).await?;
Ok(Some(result_obj))
}
pub async fn poll<T: Token>(
&self,
ctx: CoreContext,
token: T,
) -> Result<<T::R as Request>::PollResponse, MegarepoError> {
let mut backoff_ms = INITIAL_POLL_DELAY_MS;
let before = Instant::now();
let req_id = RequestId(token.to_db_id()?, RequestType(T::R::NAME.to_owned()));
loop {
let maybe_stored_result: Option<<T::R as Request>::StoredResult> =
self.poll_once::<T::R>(&ctx, &req_id).await?;
let next_sleep = Duration::from_millis(rand::random::<u64>() % backoff_ms);
match maybe_stored_result {
Some(stored_result) => {
// Nice, the result is ready!
return <T::R as Request>::stored_result_into_poll_response(stored_result);
}
None if before.elapsed() + next_sleep > MAX_POLL_DURATION => {
// The result is not yet ready, but we're out of time
return Ok(T::R::empty_poll_response());
}
None => {
// The result is not yet ready and we can wait a little longer
tokio::time::sleep(next_sleep).await;
backoff_ms *= 2;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use blobstore::{Loadable, Storable};
use context::CoreContext;
use fbinit::FacebookInit;
use requests_table::RequestStatus;
use source_control::{
MegarepoAddTargetParams as ThriftMegarepoAddTargetParams,
MegarepoChangeTargetConfigParams as ThriftMegarepoChangeTargetConfigParams,
MegarepoRemergeSourceParams as ThriftMegarepoRemergeSourceParams,
MegarepoSyncChangesetParams as ThriftMegarepoSyncChangesetParams,
};
use crate::types::{
MegarepoAddTargetParamsId, MegarepoChangeTargetConfigParamsId,
MegarepoRemergeSourceParamsId, MegarepoSyncChangesetParamsId,
};
use crate::types::{
MegarepoAddTargetResult, MegarepoChangeTargetConfigResult, MegarepoRemergeSourceResult,
MegarepoSyncChangesetResult,
};
use crate::types::{
MegarepoAddTargetToken, MegarepoChangeTargetConfigToken, MegarepoRemergeSourceToken,
MegarepoSyncChangesetToken,
};
macro_rules! test_enqueue_and_poll_once {
{
$fn_name: ident,
$thrift_params: ident,
$token: ident,
$params_id: ident,
$result: ident,
$request_type: expr,
} => {
#[fbinit::test]
async fn $fn_name(fb: FacebookInit) -> Result<(), Error> {
let q = AsyncMethodRequestQueue::new_test_in_memory().unwrap();
let ctx = CoreContext::test_mock(fb);
// Enqueue a request
let params: $thrift_params = Default::default();
let token: $token = q.enqueue(ctx.clone(), params.clone()).await?;
// Verify that request metadata is in the db and has expected values
let row_id = token.to_db_id()?;
let entry = q
.table
.test_get_request_entry_by_id(&ctx, &row_id)
.await?
.expect("Request is missing in the DB");
assert_eq!(entry.status, RequestStatus::New);
assert_eq!(entry.started_processing_at, None);
assert_eq!(entry.ready_at, None);
assert_eq!(entry.polled_at, None);
assert_eq!(
entry.request_type,
RequestType($request_type.to_string())
);
// Verify that request params are in the blobstore
let id = $params_id::parse_blobstore_key(&entry.args_blobstore_key.0)?;
let args: $thrift_params = id.load(&ctx, &q.blobstore).await?.into();
assert_eq!(args, params);
let req_id = RequestId(row_id, entry.request_type);
// Verify that poll_once on this request in a "new" state
// returns None
let new_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
assert!(new_poll.is_none());
// Verify that poll_once on this request in an "in_progress" state
// returns None
q.table.mark_in_progress(&ctx, &req_id).await?;
let in_progress_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
assert!(in_progress_poll.is_none());
// Inject a result for this request
// Verify that poll_once on this request in an "in_progress" state
// returns the injected result
let fake_result = $result::from_thrift(Default::default());
let fake_result_id = fake_result.clone().store(&ctx, &q.blobstore).await?;
let fake_result_key = BlobstoreKey(fake_result_id.blobstore_key());
q.table.mark_ready(&ctx, &req_id, fake_result_key).await?;
let ready_poll = q.poll_once::<<$token as Token>::R>(&ctx, &req_id).await?;
assert_eq!(ready_poll, Some(fake_result));
// After a successful poll, the request is marked as polled
let entry = q.table.test_get_request_entry_by_id(&ctx, &req_id.0).await?.unwrap();
assert_eq!(entry.status, RequestStatus::Polled);
Ok(())
}
}
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_add_target,
ThriftMegarepoAddTargetParams,
MegarepoAddTargetToken,
MegarepoAddTargetParamsId,
MegarepoAddTargetResult,
"megarepo_add_sync_target",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_sync_changeset,
ThriftMegarepoSyncChangesetParams,
MegarepoSyncChangesetToken,
MegarepoSyncChangesetParamsId,
MegarepoSyncChangesetResult,
"megarepo_sync_changeset",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_change_config,
ThriftMegarepoChangeTargetConfigParams,
MegarepoChangeTargetConfigToken,
MegarepoChangeTargetConfigParamsId,
MegarepoChangeTargetConfigResult,
"megarepo_change_target_config",
}
test_enqueue_and_poll_once! {
test_enqueue_and_poll_once_remerge_source,
ThriftMegarepoRemergeSourceParams,
MegarepoRemergeSourceToken,
MegarepoRemergeSourceParamsId,
MegarepoRemergeSourceResult,
"megarepo_remerge_source",
}
}

View File

@@ -0,0 +1,579 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::{anyhow, Error, Result};
use blobstore::{impl_loadable_storable, Loadable, Storable};
use fbthrift::compact_protocol;
use megarepo_error::MegarepoError;
use megarepo_types_thrift::{
MegarepoAddTargetParamsId as ThriftMegarepoAddTargetParamsId,
MegarepoChangeTargetConfigParamsId as ThriftMegarepoChangeTargetConfigParamsId,
MegarepoRemergeSourceParamsId as ThriftMegarepoRemergeSourceParamsId,
MegarepoSyncChangesetParamsId as ThriftMegarepoSyncChangesetParamsId,
};
use megarepo_types_thrift::{
MegarepoAddTargetResult as ThriftMegarepoAddTargetResult,
MegarepoChangeTargetConfigResult as ThriftMegarepoChangeTargetConfigResult,
MegarepoRemergeSourceResult as ThriftMegarepoRemergeSourceResult,
MegarepoSyncChangesetResult as ThriftMegarepoSyncChangesetResult,
};
use megarepo_types_thrift::{
MegarepoAddTargetResultId as ThriftMegarepoAddTargetResultId,
MegarepoChangeTargetConfigResultId as ThriftMegarepoChangeTargetConfigResultId,
MegarepoRemergeSourceResultId as ThriftMegarepoRemergeSourceResultId,
MegarepoSyncChangesetResultId as ThriftMegarepoSyncChangesetResultId,
};
use mononoke_types::{hash::Blake2, impl_typed_context, impl_typed_hash_no_context};
use requests_table::RowId;
use source_control::{
MegarepoAddTargetParams as ThriftMegarepoAddTargetParams,
MegarepoChangeTargetConfigParams as ThriftMegarepoChangeTargetConfigParams,
MegarepoRemergeSourceParams as ThriftMegarepoRemergeSourceParams,
MegarepoSyncChangesetParams as ThriftMegarepoSyncChangesetParams,
};
use source_control::{
MegarepoAddTargetPollResponse as ThriftMegarepoAddTargetPollResponse,
MegarepoChangeTargetConfigPollResponse as ThriftMegarepoChangeTargetConfigPollResponse,
MegarepoRemergeSourcePollResponse as ThriftMegarepoRemergeSourcePollResponse,
MegarepoSyncChangesetPollResponse as ThriftMegarepoSyncChangesetPollResponse,
};
use source_control::{
MegarepoAddTargetResponse as ThriftMegarepoAddTargetResponse,
MegarepoChangeTargetConfigResponse as ThriftMegarepoChangeTargetConfigResponse,
MegarepoRemergeSourceResponse as ThriftMegarepoRemergeSourceResponse,
MegarepoSyncChangesetResponse as ThriftMegarepoSyncChangesetResponse,
};
use source_control::{
MegarepoAddTargetToken as ThriftMegarepoAddTargetToken,
MegarepoChangeConfigToken as ThriftMegarepoChangeConfigToken,
MegarepoRemergeSourceToken as ThriftMegarepoRemergeSourceToken,
MegarepoSyncChangesetToken as ThriftMegarepoSyncChangesetToken,
};
use std::convert::TryFrom;
use std::str::FromStr;
/// Grouping of types and behaviors for an asynchronous request
pub trait Request: Sized + Send + Sync {
/// Name of the request
const NAME: &'static str;
/// Id type for request stored result
type StoredResultId: FromStr<Err = Error>
+ Loadable<Value = Self::StoredResult>
+ BlobstoreKeyWrapper;
/// Id type for request params
type ParamsId: FromStr<Err = Error> + Loadable<Value = Self::Params> + BlobstoreKeyWrapper;
/// Rust newtype for a polling token
type Token: Token;
/// Rust type for request params
type Params: Storable<Key = Self::ParamsId> + TryFrom<Self::ThriftParams, Error = Error>;
/// Underlying thrift type for request params
type ThriftParams: ThriftParams<R = Self>;
/// Rust type for request result (response or error),
/// stored in a blobstore
type StoredResult: Storable<Key = Self::StoredResultId>;
/// A type representing potentially present response
type PollResponse;
/// Convert stored result into a result of a poll response
/// Stored result is a serialization of either a successful
/// response, or an error. Poll response cannot convey an error,
/// so we use result of a poll response to do so.
/// Note that this function should return either a
/// non-empty poll-response, or an error
fn stored_result_into_poll_response(
sr: Self::StoredResult,
) -> Result<Self::PollResponse, MegarepoError>;
/// Return an empty poll response. This indicates
/// that the request hasn't been processed yet
fn empty_poll_response() -> Self::PollResponse;
}
/// A type, which can be parsed from a blobstore key,
/// and from which a blobstore key can be produced
/// (this is implemented by various handle types, where
/// blobstore key consists of two things: a hash
/// and a string, describing what the key refers to)
pub trait BlobstoreKeyWrapper: FromStr<Err = Error> {
fn blobstore_key(&self) -> String;
fn parse_blobstore_key(key: &str) -> Result<Self, Error>;
}
/// Thrift type representing async service method parameters
pub trait ThriftParams: Sized + Send + Sync {
type R: Request<ThriftParams = Self>;
}
/// Polling token for an async service method
pub trait Token: Sized + Send + Sync {
type R: Request<Token = Self>;
type ThriftToken;
fn into_thrift(self) -> Self::ThriftToken;
fn from_db_id(id: RowId) -> Self;
fn to_db_id(&self) -> Result<RowId, MegarepoError>;
}
/// This macro implements an async service method type,
/// which can be stored/retrieved from the blobstore.
/// Such types are usually represented as value/handle pairs.
/// Since we need to implement (potentially foreign) traits
/// on these types, we also define corresponding Rust types.
/// Some of the defined types (like context or thrift_type_newtype)
/// are not used from outside of the macro, but we still need
/// to pass identifiers for them from the outside, because
/// Rust's macro hygiene does not allow identifier generation ¯\_(ツ)_/¯
macro_rules! impl_async_svc_stored_type {
{
/// Rust type for the Loadable handle
handle_type => $handle_type: ident,
/// Underlying thrift type for the handle
handle_thrift_type => $handle_thrift_type: ident,
/// A name for a Newtype-style trait, required by `impl_typed_hash_no_context`
/// Rust type for the Storable value
value_type => $value_type: ident,
/// Underlying thrift type for the value
value_thrift_type => $value_thrift_type: ident,
/// A helper struct for hash computations
context_type => $context_type: ident,
} => {
/// Rust handle type, wrapper around a Blake2 instance
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
pub struct $handle_type(Blake2);
impl_typed_hash_no_context! {
hash_type => $handle_type,
thrift_type => $handle_thrift_type,
}
impl BlobstoreKeyWrapper for $handle_type {
fn blobstore_key(&self) -> String {
format!("async.svc.{}.blake2.{}", stringify!($value_type), self.0.to_hex())
}
fn parse_blobstore_key(key: &str) -> Result<Self, Error> {
// concat! instead of format! to not allocate every time
let prefix = concat!("async.svc.", stringify!($value_type), ".blake2.");
match key.strip_prefix(prefix) {
None => return Err(anyhow!("{} is not a blobstore key for {}", key, stringify!($value_type))),
Some(suffix) => Self::from_str(suffix)
}
}
}
// Typed context type is needed for hash computation
impl_typed_context! {
hash_type => $handle_type,
context_type => $context_type,
context_key => stringify!($value_type),
}
/// Main value type
#[derive(Debug, Clone, PartialEq)]
pub struct $value_type {
id: $handle_type,
thrift: $value_thrift_type,
}
impl $value_type {
pub fn from_thrift(thrift: $value_thrift_type) -> Self {
let data = compact_protocol::serialize(&thrift);
let mut context = $context_type::new();
context.update(&data);
let id = context.finish();
Self { id, thrift }
}
pub fn handle(&self) -> &$handle_type {
&self.id
}
}
// Conversions between thrift types and their Rust counterparts
impl TryFrom<$handle_thrift_type> for $handle_type {
type Error = Error;
fn try_from(t: $handle_thrift_type) -> Result<Self, Self::Error> {
Self::from_thrift(t)
}
}
impl From<$handle_type> for $handle_thrift_type {
fn from(other: $handle_type) -> Self {
Self(mononoke_types_thrift::IdType::Blake2(other.0.into_thrift()))
}
}
impl TryFrom<$value_thrift_type> for $value_type {
type Error = Error;
fn try_from(t: $value_thrift_type) -> Result<Self, Self::Error> {
Ok(Self::from_thrift(t))
}
}
impl From<$value_type> for $value_thrift_type {
fn from(other: $value_type) -> Self {
other.thrift
}
}
impl_loadable_storable! {
handle_type => $handle_type,
handle_thrift_type => $handle_thrift_type,
value_type => $value_type,
value_thrift_type => $value_thrift_type,
}
}
}
/// A macro to call impl_async_svc_stored_type for params/result
/// types, as well as define a bunch of relationships between
/// these types, and their Request-related friends.
/// An underlying idea is to define as much behavior and relationships
/// as possible in the type system, so that we
/// (a) minimize the chance of using an incorrect pair of types somewhere
/// (b) can write generic enqueueing/polling functions
macro_rules! impl_async_svc_method_types {
{
method_name => $method_name: expr,
request_struct => $request_struct: ident,
params_handle_type => $params_handle_type: ident,
params_handle_thrift_type => $params_handle_thrift_type: ident,
params_value_type => $params_value_type: ident,
params_value_thrift_type => $params_value_thrift_type: ident,
params_context_type => $params_context_type: ident,
result_handle_type => $result_handle_type: ident,
result_handle_thrift_type => $result_handle_thrift_type: ident,
result_value_type => $result_value_type: ident,
result_value_thrift_type => $result_value_thrift_type: ident,
result_context_type => $result_context_type: ident,
response_type => $response_type: ident,
poll_response_type => $poll_response_type: ident,
token_type => $token_type: ident,
token_thrift_type => $token_thrift_type: ident,
} => {
impl_async_svc_stored_type! {
handle_type => $params_handle_type,
handle_thrift_type => $params_handle_thrift_type,
value_type => $params_value_type,
value_thrift_type => $params_value_thrift_type,
context_type => $params_context_type,
}
impl_async_svc_stored_type! {
handle_type => $result_handle_type,
handle_thrift_type => $result_handle_thrift_type,
value_type => $result_value_type,
value_thrift_type => $result_value_thrift_type,
context_type => $result_context_type,
}
pub struct $token_type(pub $token_thrift_type);
impl ThriftParams for $params_value_thrift_type {
type R = $request_struct;
}
impl Token for $token_type {
type ThriftToken = $token_thrift_type;
type R = $request_struct;
fn from_db_id(id: RowId) -> Self {
// Thrift token is a string alias
// but let's guard ourselves here against
// it changing unexpectedly.
let thrift_token: $token_thrift_type = format!("{}", id.0);
Self(thrift_token)
}
fn to_db_id(&self) -> Result<RowId, MegarepoError> {
self.0
.parse::<u64>()
.map_err(MegarepoError::request)
.map(RowId)
}
fn into_thrift(self) -> $token_thrift_type {
self.0
}
}
impl From<Result<$response_type, MegarepoError>> for $result_value_type {
fn from(r: Result<$response_type, MegarepoError>) -> $result_value_type {
let thrift = match r {
Ok(payload) => $result_value_thrift_type::success(payload),
Err(e) => $result_value_thrift_type::error(e.into())
};
$result_value_type::from_thrift(thrift)
}
}
impl From<$result_value_type> for Result<$response_type, MegarepoError> {
fn from(r: $result_value_type) -> Result<$response_type, MegarepoError> {
match r.thrift {
$result_value_thrift_type::success(payload) => Ok(payload),
$result_value_thrift_type::error(e) => Err(e.into()),
$result_value_thrift_type::UnknownField(x) => {
// TODO: maybe use structured error?
Err(MegarepoError::internal(
anyhow!(
"failed to parse {} thrift. UnknownField: {}",
stringify!($result_value_thrift_type),
x,
)
))
}
}
}
}
pub struct $request_struct;
impl Request for $request_struct {
const NAME: &'static str = $method_name;
type StoredResultId = $result_handle_type;
type ParamsId = $params_handle_type;
type Token = $token_type;
type ThriftParams = $params_value_thrift_type;
type Params = $params_value_type;
type StoredResult = $result_value_type;
type PollResponse = $poll_response_type;
fn stored_result_into_poll_response(
stored_result: Self::StoredResult,
) -> Result<Self::PollResponse, MegarepoError> {
let r: Result<$response_type, MegarepoError> = stored_result.into();
r.map(|r| $poll_response_type { response: Some(r) })
}
fn empty_poll_response() -> Self::PollResponse {
$poll_response_type { response: None }
}
}
}
}
// Params and result types for megarepo_add_sync_target
impl_async_svc_method_types! {
method_name => "megarepo_add_sync_target",
request_struct => MegarepoAddSyncTarget,
params_handle_type => MegarepoAddTargetParamsId,
params_handle_thrift_type => ThriftMegarepoAddTargetParamsId,
params_value_type => MegarepoAddTargetParams,
params_value_thrift_type => ThriftMegarepoAddTargetParams,
params_context_type => MegarepoAddTargetParamsIdContext,
result_handle_type => MegarepoAddTargetResultId,
result_handle_thrift_type => ThriftMegarepoAddTargetResultId,
result_value_type => MegarepoAddTargetResult,
result_value_thrift_type => ThriftMegarepoAddTargetResult,
result_context_type => MegarepoAddTargetResultIdContext,
response_type => ThriftMegarepoAddTargetResponse,
poll_response_type => ThriftMegarepoAddTargetPollResponse,
token_type => MegarepoAddTargetToken,
token_thrift_type => ThriftMegarepoAddTargetToken,
}
// Params and result types for megarepo_change_target_config
impl_async_svc_method_types! {
method_name => "megarepo_change_target_config",
request_struct => MegarepoChangeTargetConfig,
params_handle_type => MegarepoChangeTargetConfigParamsId,
params_handle_thrift_type => ThriftMegarepoChangeTargetConfigParamsId,
params_value_type => MegarepoChangeTargetConfigParams,
params_value_thrift_type => ThriftMegarepoChangeTargetConfigParams,
params_context_type => MegarepoChangeTargetConfigParamsIdContext,
result_handle_type => MegarepoChangeTargetConfigResultId,
result_handle_thrift_type => ThriftMegarepoChangeTargetConfigResultId,
result_value_type => MegarepoChangeTargetConfigResult,
result_value_thrift_type => ThriftMegarepoChangeTargetConfigResult,
result_context_type => MegarepoChangeTargetConfigResultIdContext,
response_type => ThriftMegarepoChangeTargetConfigResponse,
poll_response_type => ThriftMegarepoChangeTargetConfigPollResponse,
token_type => MegarepoChangeTargetConfigToken,
token_thrift_type => ThriftMegarepoChangeConfigToken,
}
// Params and result types for megarepo_sync_changeset
impl_async_svc_method_types! {
method_name => "megarepo_sync_changeset",
request_struct => MegarepoSyncChangeset,
params_handle_type => MegarepoSyncChangesetParamsId,
params_handle_thrift_type => ThriftMegarepoSyncChangesetParamsId,
params_value_type => MegarepoSyncChangesetParams,
params_value_thrift_type => ThriftMegarepoSyncChangesetParams,
params_context_type => MegarepoSyncChangesetParamsIdContext,
result_handle_type => MegarepoSyncChangesetResultId,
result_handle_thrift_type => ThriftMegarepoSyncChangesetResultId,
result_value_type => MegarepoSyncChangesetResult,
result_value_thrift_type => ThriftMegarepoSyncChangesetResult,
result_context_type => MegarepoSyncChangesetResultIdContext,
response_type => ThriftMegarepoSyncChangesetResponse,
poll_response_type => ThriftMegarepoSyncChangesetPollResponse,
token_type => MegarepoSyncChangesetToken,
token_thrift_type => ThriftMegarepoSyncChangesetToken,
}
// Params and result types for megarepo_remerge_source
impl_async_svc_method_types! {
method_name => "megarepo_remerge_source",
request_struct => MegarepoRemergeSource,
params_handle_type => MegarepoRemergeSourceParamsId,
params_handle_thrift_type => ThriftMegarepoRemergeSourceParamsId,
params_value_type => MegarepoRemergeSourceParams,
params_value_thrift_type => ThriftMegarepoRemergeSourceParams,
params_context_type => MegarepoRemergeSourceParamsIdContext,
result_handle_type => MegarepoRemergeSourceResultId,
result_handle_thrift_type => ThriftMegarepoRemergeSourceResultId,
result_value_type => MegarepoRemergeSourceResult,
result_value_thrift_type => ThriftMegarepoRemergeSourceResult,
result_context_type => MegarepoRemergeSourceResultIdContext,
response_type => ThriftMegarepoRemergeSourceResponse,
poll_response_type => ThriftMegarepoRemergeSourcePollResponse,
token_type => MegarepoRemergeSourceToken,
token_thrift_type => ThriftMegarepoRemergeSourceToken,
}
#[cfg(test)]
mod test {
use super::*;
use blobstore::{Loadable, PutBehaviour, Storable};
use context::CoreContext;
use fbinit::FacebookInit;
use memblob::Memblob;
macro_rules! test_blobstore_key {
{
$type: ident,
$prefix: expr
} => {
let id = $type::from_byte_array([1; 32]);
assert_eq!(id.blobstore_key(), format!(concat!($prefix, ".blake2.{}"), id));
}
}
macro_rules! serialize_deserialize {
{
$type: ident
} => {
let id = $type::from_byte_array([1; 32]);
let serialized = serde_json::to_string(&id).unwrap();
let deserialized = serde_json::from_str(&serialized).unwrap();
assert_eq!(id, deserialized);
}
}
#[test]
fn blobstore_key() {
// These IDs are persistent, and this test is really to make sure that they don't change
// accidentally. Same as in typed_hash.rs
test_blobstore_key!(
MegarepoAddTargetParamsId,
"async.svc.MegarepoAddTargetParams"
);
test_blobstore_key!(
MegarepoAddTargetResultId,
"async.svc.MegarepoAddTargetResult"
);
test_blobstore_key!(
MegarepoChangeTargetConfigParamsId,
"async.svc.MegarepoChangeTargetConfigParams"
);
test_blobstore_key!(
MegarepoChangeTargetConfigResultId,
"async.svc.MegarepoChangeTargetConfigResult"
);
test_blobstore_key!(
MegarepoRemergeSourceParamsId,
"async.svc.MegarepoRemergeSourceParams"
);
test_blobstore_key!(
MegarepoRemergeSourceResultId,
"async.svc.MegarepoRemergeSourceResult"
);
test_blobstore_key!(
MegarepoSyncChangesetParamsId,
"async.svc.MegarepoSyncChangesetParams"
);
test_blobstore_key!(
MegarepoSyncChangesetResultId,
"async.svc.MegarepoSyncChangesetResult"
);
}
#[test]
fn test_serialize_deserialize() {
serialize_deserialize!(MegarepoAddTargetParamsId);
serialize_deserialize!(MegarepoAddTargetResultId);
serialize_deserialize!(MegarepoChangeTargetConfigParamsId);
serialize_deserialize!(MegarepoChangeTargetConfigResultId);
serialize_deserialize!(MegarepoRemergeSourceParamsId);
serialize_deserialize!(MegarepoRemergeSourceResultId);
serialize_deserialize!(MegarepoSyncChangesetParamsId);
serialize_deserialize!(MegarepoSyncChangesetResultId);
}
macro_rules! test_store_load {
{ $type: ident, $ctx: ident, $blobstore: ident } => {
let obj = $type::from_thrift(Default::default());
let id = obj
.clone()
.store(&$ctx, &$blobstore)
.await
.expect(&format!("Failed to store {}", stringify!($type)));
let obj2 = id
.load(&$ctx, &$blobstore)
.await
.expect(&format!("Failed to load {}", stringify!($type)));
assert_eq!(obj, obj2);
}
}
#[fbinit::test]
async fn test_megarepo_add_target_params_type(fb: FacebookInit) {
let blobstore = Memblob::new(PutBehaviour::IfAbsent);
let ctx = CoreContext::test_mock(fb);
test_store_load!(MegarepoAddTargetParams, ctx, blobstore);
test_store_load!(MegarepoAddTargetResult, ctx, blobstore);
test_store_load!(MegarepoChangeTargetConfigParams, ctx, blobstore);
test_store_load!(MegarepoChangeTargetConfigResult, ctx, blobstore);
test_store_load!(MegarepoRemergeSourceParams, ctx, blobstore);
test_store_load!(MegarepoRemergeSourceResult, ctx, blobstore);
test_store_load!(MegarepoSyncChangesetParams, ctx, blobstore);
test_store_load!(MegarepoSyncChangesetResult, ctx, blobstore);
}
}

View File

@@ -0,0 +1,52 @@
[package]
name = "megarepo_types_thrift"
version = "0.1.0"
authors = ["Facebook"]
edition = "2018"
license = "GPLv2+"
build = "thrift_build.rs"
[lib]
path = "thrift_lib.rs"
test = false
doctest = false
[dependencies]
anyhow = "1.0"
async-trait = "0.1.45"
codegen_includer_proc_macro = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
const-cstr = "0.3.0"
fbthrift = { version = "0.0.1+unstable", git = "https://github.com/facebook/fbthrift.git", branch = "master" }
futures = { version = "0.3.13", features = ["async-await", "compat"] }
mononoke_types_thrift = { version = "0.1.0", path = "../../mononoke_types/if" }
once_cell = "1.4"
ref-cast = "1.0.2"
serde = { version = "=1.0.118", features = ["derive", "rc"] }
serde_derive = "1.0"
source_control = { version = "0.1.0", path = "../../scs/if" }
thiserror = "1.0"
[build-dependencies]
thrift_compiler = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
[patch.crates-io]
addr2line = { git = "https://github.com/gimli-rs/addr2line.git", rev = "0b6b6018b5b252a18e628fba03885f7d21844b3c" }
bytecount = { git = "https://github.com/llogiq/bytecount", rev = "469eaf8395c99397cd64d059737a9054aa014088" }
chashmap = { git = "https://gitlab.redox-os.org/ahornby/chashmap", rev = "901ace2ca3cdbc2095adb1af111d211e254e2aae" }
const-random = { git = "https://github.com/fbsource/const-random", rev = "374c5b46427fe2ffbf6acbd9c1687e0f1a809f95" }
curl = { git = "https://github.com/kulshrax/curl-rust", rev = "2a15bbd8dbbd54734313fa703a64db7ce6ddaff0" }
curl-sys = { git = "https://github.com/kulshrax/curl-rust", rev = "2a15bbd8dbbd54734313fa703a64db7ce6ddaff0" }
enumset = { git = "https://github.com/danobi/enumset", rev = "4c01c583c27a725948fededbfb3461c572a669a4" }
lru-disk-cache = { git = "https://github.com/mozilla/sccache", rev = "033ebaae69beeb0ac04e8c35d6ff1103487bd9a3" }
mysql_common = { git = "https://github.com/iammxt/rust_mysql_common", rev = "0e4c86952f1e799960e736c0b2bb9d2a6d935bf1" }
openssl = { git = "https://github.com/sfackler/rust-openssl", rev = "68fc8ba890d77986b06ca5ce29d1089285fbbcf9" }
openssl-sys = { git = "https://github.com/sfackler/rust-openssl", rev = "68fc8ba890d77986b06ca5ce29d1089285fbbcf9" }
petgraph = { git = "https://github.com/jkeljo/petgraph", rev = "e3e9dd8632d23973fdc0b42c1117d5e5fc5fa384" }
prost = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-build = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-derive = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
prost-types = { git = "https://github.com/gabrielrussoc/prost", branch = "protoc-runtime" }
r2d2_sqlite = { git = "https://github.com/jsgf/r2d2-sqlite.git", rev = "6d77a828ca0a3c507a3f58561532a1b6c66c7918" }
rustfilt = { git = "https://github.com/jsgf/rustfilt.git", rev = "8141fa7f1caee562ee8daffb2ddeca3d1f0d36e5" }
tokio-core = { git = "https://github.com/bolinfest/tokio-core", rev = "5f37aa3c627d56ee49154bc851d6930f5ab4398f" }
toml = { git = "https://github.com/jsgf/toml-rs", branch = "dotted-table-0.5.7" }

View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
//! Thrift types for megarepo async service methods
//! These aren't part of `source_control.thrift`, because they aren't part of
//! the service interface. They aren't part of `megarepo_configs.thrift`,
//! because they aren't configs. Rather, they are a service implementation detail.
include "eden/mononoke/mononoke_types/if/mononoke_types_thrift.thrift"
include "eden/mononoke/scs/if/source_control.thrift"
// Id types for async service method params and responses.
// Param and response types themselves are defined in source_control.thrift
typedef mononoke_types_thrift.IdType MegarepoAddTargetParamsId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoChangeTargetConfigParamsId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoRemergeSourceParamsId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoSyncChangesetParamsId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoSyncChangesetResultId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoAddTargetResultId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoRemergeSourceResultId (rust.newtype)
typedef mononoke_types_thrift.IdType MegarepoChangeTargetConfigResultId (rust.newtype)
// Stored error for asynchronous service methods
// TODO: better error structuring please
union StoredError {
1: string request_error,
2: string internal_error,
}
// Stored result for `source_control.megarepo_add_sync_target` calls
// These stored results are used to preserve the result of an asynchronous
// computation, so that they can be polled afterwards
union MegarepoAddTargetResult {
1: source_control.MegarepoAddTargetResponse success,
2: StoredError error,
}
// Stored result for `source_control.megarepo_change_target_config` calls
// These stored results are used to preserve the result of an asynchronous
// computation, so that they can be polled afterwards
union MegarepoChangeTargetConfigResult {
1: source_control.MegarepoChangeTargetConfigResponse success,
2: StoredError error,
}
// Stored result for `source_control.megarepo_remerge_source` calls
// These stored results are used to preserve the result of an asynchronous
// computation, so that they can be polled afterwards
union MegarepoRemergeSourceResult {
1: source_control.MegarepoRemergeSourceResponse success,
2: StoredError error,
}
// Stored result for `source_control.megarepo_sync_changeset` calls
// These stored results are used to preserve the result of an asynchronous
// computation, so that they can be polled afterwards
union MegarepoSyncChangesetResult {
1: source_control.MegarepoSyncChangesetResponse success,
2: StoredError error,
}

View File

@@ -0,0 +1,45 @@
// @generated by autocargo
use std::env;
use std::fs;
use std::path::Path;
use thrift_compiler::Config;
#[rustfmt::skip]
fn main() {
let out_dir = env::var_os("OUT_DIR").expect("OUT_DIR env not provided");
let out_dir: &Path = out_dir.as_ref();
fs::write(
out_dir.join("cratemap"),
"megarepo_types_thrift crate
mononoke_types_thrift mononoke_types_thrift
source_control source_control",
).expect("Failed to write cratemap");
let conf = {
let mut conf = Config::from_env().expect("Failed to instantiate thrift_compiler::Config");
let path_from_manifest_to_base: &Path = "../../../..".as_ref();
let cargo_manifest_dir =
env::var_os("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not provided");
let cargo_manifest_dir: &Path = cargo_manifest_dir.as_ref();
let base_path = cargo_manifest_dir
.join(path_from_manifest_to_base)
.canonicalize()
.expect("Failed to canonicalize base_path");
conf.base_path(base_path);
let options = "";
if !options.is_empty() {
conf.options(options);
}
conf
};
conf
.run(&[
"megarepo_types_thrift.thrift"
])
.expect("Failed while running thrift compilation");
}

View File

@@ -0,0 +1,2 @@
// @generated by autocargo
::codegen_includer_proc_macro::include!();

View File

@@ -18,6 +18,7 @@ megarepo_configs = { version = "0.1.0", path = "../../../../configerator/structs
megarepo_error = { version = "0.1.0", path = "../megarepo_error" }
sha-1 = "0.8"
slog = { version = "2.5", features = ["max_level_trace"] }
tokio = { version = "1.4", features = ["full", "test-util"] }
version_cconf_index = { version = "0.1.0", path = "../../../../configerator/structs/scm/mononoke/megarepo/version_cconf_index" }
[patch.crates-io]

View File

@@ -8,6 +8,7 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0"
blobstore = { version = "0.1.0", path = "../../blobstore" }
megarepo_types_thrift = { version = "0.1.0", path = "../if" }
thiserror = "1.0"
[patch.crates-io]

View File

@@ -10,6 +10,7 @@
pub use anyhow::anyhow;
use blobstore::LoadableError;
use megarepo_types_thrift::StoredError;
use std::backtrace::Backtrace;
use std::convert::Infallible;
use std::error::Error as StdError;
@@ -113,3 +114,27 @@ macro_rules! bail_internal {
return Err($crate::MegarepoError::InternalError($crate::InternalError($crate::Arc::new($crate::anyhow!($fmt, $($arg)*)))))
};
}
impl From<MegarepoError> for StoredError {
fn from(other: MegarepoError) -> StoredError {
// TODO: preserve error structure
match other {
MegarepoError::RequestError(e) => StoredError::request_error(format!("{}", e)),
MegarepoError::InternalError(e) => StoredError::internal_error(format!("{}", e)),
}
}
}
impl From<StoredError> for MegarepoError {
fn from(other: StoredError) -> MegarepoError {
// TODO: do something better with error structure
match other {
StoredError::request_error(e) => MegarepoError::request(anyhow!("{}", e)),
StoredError::internal_error(e) => MegarepoError::internal(anyhow!("{}", e)),
StoredError::UnknownField(x) => MegarepoError::internal(anyhow!(
"Failed to deserialize StoredError. UnknownField {}",
x
)),
}
}
}