mononoke_api: implement pagination for all bookmarks

Summary:
Bookmark requests that are truncated because the requested limit is reached now return a `continue_after` value, containing the last bookmark that was processed.

Callers can make a subsequent request with the same parameters, but with `after` set to the value received in `continue_after`, to continue their request where it left off.

Reviewed By: krallin

Differential Revision: D22338301

fbshipit-source-id: 81e398bee444e0960e65dc3b4cdbbe877aff926d
This commit is contained in:
Mark Thomas 2020-07-02 07:50:12 -07:00 committed by Facebook GitHub Bot
parent 344c8edda4
commit 6d5bce25c6
3 changed files with 78 additions and 55 deletions

View File

@ -42,7 +42,6 @@ warm_bookmarks_cache = { path = "../bookmarks/warm_bookmarks_cache" }
xdiff = { path = "../../scm/lib/xdiff" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"

View File

@ -18,7 +18,9 @@ use blobrepo_factory::{BlobrepoBuilder, BlobstoreOptions, Caching, ReadOnlyStora
use blobrepo_hg::BlobRepoHg;
use blobstore::Loadable;
use blobstore_factory::make_metadata_sql_factory;
use bookmarks::{BookmarkName, BookmarkPrefix};
use bookmarks::{
BookmarkKind, BookmarkName, BookmarkPagination, BookmarkPrefix, Bookmarks, Freshness,
};
use changeset_info::ChangesetInfo;
use context::CoreContext;
use cross_repo_sync::{CommitSyncRepos, CommitSyncer};
@ -27,10 +29,9 @@ use fbinit::FacebookInit;
use filestore::{Alias, FetchKey};
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::future::try_join_all;
use futures::stream::StreamExt as NewStreamExt;
use futures::stream::{StreamExt, TryStreamExt};
use futures::try_join;
use futures_ext::StreamExt;
use futures_old::stream::{self, Stream};
use futures_old::stream::Stream;
use itertools::Itertools;
use mercurial_types::Globalrev;
#[cfg(test)]
@ -786,56 +787,68 @@ impl RepoContext {
pub fn list_bookmarks(
&self,
include_scratch: bool,
prefix: Option<String>,
prefix: Option<&str>,
after: Option<&str>,
limit: Option<u64>,
) -> impl Stream<Item = (String, ChangesetId), Error = MononokeError> {
if include_scratch {
let prefix = match prefix.map(BookmarkPrefix::new) {
Some(Ok(prefix)) => prefix,
Some(Err(e)) => {
return stream::once(Err(MononokeError::InvalidRequest(format!(
"invalid bookmark prefix: {}",
e
))))
.boxify()
}
None => {
return stream::once(Err(MononokeError::InvalidRequest(
"prefix required to list scratch bookmarks".to_string(),
)))
.boxify()
}
};
let limit = match limit {
Some(limit) => limit,
None => {
return stream::once(Err(MononokeError::InvalidRequest(
"limit required to list scratch bookmarks".to_string(),
)))
.boxify()
}
};
self.blob_repo()
.get_bonsai_bookmarks_by_prefix_maybe_stale(self.ctx.clone(), &prefix, limit)
.map(|(bookmark, cs_id)| (bookmark.into_name().into_string(), cs_id))
.map_err(MononokeError::from)
.boxify()
) -> Result<impl Stream<Item = (String, ChangesetId), Error = MononokeError>, MononokeError>
{
let kinds = if include_scratch {
if prefix.is_none() {
return Err(MononokeError::InvalidRequest(
"prefix required to list scratch bookmarks".to_string(),
));
}
if limit.is_none() {
return Err(MononokeError::InvalidRequest(
"limit required to list scratch bookmarks".to_string(),
));
}
BookmarkKind::ALL
} else {
// TODO(mbthomas): honour `limit` for publishing bookmarks
let prefix = prefix.unwrap_or_else(|| "".to_string());
self.blob_repo()
.get_bonsai_publishing_bookmarks_maybe_stale(self.ctx.clone())
.filter_map(move |(bookmark, cs_id)| {
let name = bookmark.into_name().into_string();
if name.starts_with(&prefix) {
Some((name, cs_id))
} else {
None
}
})
.map_err(MononokeError::from)
.boxify()
}
BookmarkKind::ALL_PUBLISHING
};
let prefix = match prefix {
Some(prefix) => BookmarkPrefix::new(prefix).map_err(|e| {
MononokeError::InvalidRequest(format!(
"invalid bookmark prefix '{}': {}",
prefix, e
))
})?,
None => BookmarkPrefix::empty(),
};
let pagination = match after {
Some(after) => {
let name = BookmarkName::new(after).map_err(|e| {
MononokeError::InvalidRequest(format!(
"invalid bookmark name '{}': {}",
after, e
))
})?;
BookmarkPagination::After(name)
}
None => BookmarkPagination::FromStart,
};
let blob_repo = self.blob_repo();
let bookmarks = blob_repo
.attribute_expected::<dyn Bookmarks>()
.list(
self.ctx.clone(),
blob_repo.get_repoid(),
Freshness::MaybeStale,
&prefix,
kinds,
&pagination,
limit.unwrap_or(std::u64::MAX),
)
.map_ok(|(bookmark, cs_id)| (bookmark.into_name().into_string(), cs_id))
.map_err(MononokeError::from)
.boxed()
.compat();
Ok(bookmarks)
}
/// Get a stack for the list of heads (up to the first public commit).

View File

@ -140,10 +140,21 @@ impl SourceControlServiceImpl {
};
let repo = self.repo(ctx, &repo).await?;
let bookmarks = repo
.list_bookmarks(params.include_scratch, prefix, limit)
.list_bookmarks(
params.include_scratch,
prefix.as_deref(),
params.after.as_deref(),
limit,
)?
.collect()
.compat()
.await?;
let continue_after = match limit {
Some(limit) if bookmarks.len() as u64 >= limit => {
bookmarks.last().map(|bookmark| bookmark.0.clone())
}
_ => None,
};
let ids = bookmarks.iter().map(|(_name, cs_id)| *cs_id).collect();
let id_mapping = map_commit_identities(&repo, ids, &params.identity_schemes).await?;
let bookmarks = bookmarks
@ -155,7 +166,7 @@ impl SourceControlServiceImpl {
.collect();
Ok(thrift::RepoListBookmarksResponse {
bookmarks,
continue_after: None,
continue_after,
})
}