dequeue_stream

Summary:
This abstracts the constant polling for new items that megarepo workers will
be doing.
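
For illustration only (not part of this commit): a minimal, self-contained sketch of the
pattern, where a fallible "poll once" dequeue call is wrapped into an endless Stream with
async-stream, sleeping between empty polls. MockQueue, its dequeue method, and the u64 item
type are hypothetical stand-ins for the real megarepo queue types.

use std::time::Duration;

use anyhow::Error;
use async_stream::try_stream;
use futures::stream::{Stream, StreamExt};

#[derive(Clone)]
struct MockQueue;

impl MockQueue {
    // Stand-in for a single poll: Some(item) when work is available, None otherwise.
    async fn dequeue(&self) -> Result<Option<u64>, Error> {
        Ok(Some(42))
    }

    // Hides the poll-and-sleep loop behind a Stream that workers can simply consume.
    fn dequeue_stream(&self) -> impl Stream<Item = Result<u64, Error>> {
        let queue = self.clone();
        try_stream! {
            let sleep_time = Duration::from_millis(1000);
            loop {
                match queue.dequeue().await? {
                    Some(item) => {
                        yield item;
                    }
                    None => {
                        // Nothing new; wait a bit before polling again.
                        tokio::time::sleep(sleep_time).await;
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let queue = MockQueue;
    // A worker pulls items from the stream instead of writing its own poll loop.
    let mut items = Box::pin(queue.dequeue_stream());
    if let Some(item) = items.next().await.transpose()? {
        println!("dequeued item {}", item);
    }
    Ok(())
}

The actual dequeue_stream added below follows the same shape, with CoreContext, ClaimedBy and
the list of supported repos threaded through to the underlying dequeue call.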

Reviewed By: krallin

Differential Revision: D28606344

fbshipit-source-id: 159c78c207956bcf893ffb68f7f7e93fc2426c8b
Mateusz Kwapich 2021-05-27 02:37:58 -07:00 committed by Facebook GitHub Bot
parent e6219d521c
commit 54cf93cc70
2 changed files with 39 additions and 3 deletions


@@ -7,10 +7,13 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0"
async-stream = "0.3"
blobstore = { version = "0.1.0", path = "../../blobstore" }
bookmarks = { version = "0.1.0", path = "../../bookmarks" }
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
context = { version = "0.1.0", path = "../../server/context" }
fbthrift = { version = "0.0.1+unstable", git = "https://github.com/facebook/fbthrift.git", branch = "master" }
futures = { version = "0.3.13", features = ["async-await", "compat"] }
megarepo_config = { version = "0.1.0", path = "../megarepo_config" }
megarepo_error = { version = "0.1.0", path = "../megarepo_error" }
megarepo_types_thrift = { version = "0.1.0", path = "../if" }


@@ -8,17 +8,20 @@
#![deny(warnings)]
use anyhow::{anyhow, Error};
use async_stream::try_stream;
use blobstore::PutBehaviour;
use blobstore::{Blobstore, Storable};
use bookmarks::BookmarkName;
use cloned::cloned;
use context::CoreContext;
use futures::stream::Stream;
use megarepo_error::MegarepoError;
use memblob::Memblob;
use mononoke_types::RepositoryId;
use requests_table::{
    BlobstoreKey, ClaimedBy, LongRunningRequestsQueue, RequestId, RequestType,
    SqlLongRunningRequestsQueue,
    BlobstoreKey, LongRunningRequestsQueue, RequestType, SqlLongRunningRequestsQueue,
};
pub use requests_table::{ClaimedBy, RequestId};
use sql_construct::SqlConstruct;
use std::convert::TryFrom;
use std::convert::TryInto;
@@ -33,6 +36,8 @@ use crate::types::{
const INITIAL_POLL_DELAY_MS: u64 = 1000;
const MAX_POLL_DURATION: Duration = Duration::from_secs(60);
const DEQUEUE_STREAM_MAX_LATENCY_MS: u64 = 1000;
#[derive(Clone)]
pub struct AsyncMethodRequestQueue {
    blobstore: Arc<dyn Blobstore>,
@@ -102,6 +107,32 @@ impl AsyncMethodRequestQueue {
        }
    }
    pub fn dequeue_stream(
        &self,
        ctx: &CoreContext,
        claimed_by: &ClaimedBy,
        supported_repos: &[RepositoryId],
    ) -> impl Stream<Item = Result<(RequestId, MegarepoAsynchronousRequestParams), MegarepoError>>
    {
        let queue = self.clone();
        let supported_repos: Vec<_> = supported_repos.into();
        cloned!(ctx, claimed_by);
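        // try_stream! turns the poll-and-sleep loop below into an unending Stream:
        // every successful dequeue is yielded to the consumer, and an error from
        // dequeue terminates the stream with that error.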
        try_stream! {
            let sleep_time = Duration::from_millis(DEQUEUE_STREAM_MAX_LATENCY_MS);
            loop {
                match queue.dequeue(&ctx, &claimed_by, &supported_repos).await? {
                    Some((request_id, params)) => {
                        yield (request_id, params);
                    },
                    None => {
                        // Nothing to do, let's sleep before trying again.
                        tokio::time::sleep(sleep_time).await;
                    }
                }
            }
        }
    }
    async fn poll_once<R: Request>(
        &self,
        ctx: &CoreContext,
@@ -171,6 +202,7 @@ mod tests {
    use super::*;
    use context::CoreContext;
    use fbinit::FacebookInit;
    use futures::stream::{StreamExt, TryStreamExt};
    use requests_table::{ClaimedBy, RequestStatus};
    use source_control::{
@@ -232,9 +264,10 @@
                let new_poll = q.poll_once::<$request_struct>(&ctx, &req_id).await?;
                assert!(new_poll.is_none());
                let mut request_stream = q.dequeue_stream(&ctx, &ClaimedBy("tests".to_string()), &[entry.repo_id]).boxed();
                // Simulate the tailer and grab the element from the queue, this should return the params
                // back and flip its state back to "in_progress"
                let (req_id, params_from_store) = q.dequeue(&ctx, &ClaimedBy("tests".to_string()), &[entry.repo_id]).await?.unwrap();
                let (req_id, params_from_store) = request_stream.try_next().await?.unwrap();
                // Verify that request params from blobstore match what we put there
                assert_eq!(params_from_store, params.into());