convert changesets to new type futures

Summary: Convert the `Changesets` trait and all its uses to new-type futures (futures 0.3)

Reviewed By: krallin

Differential Revision: D25638875

fbshipit-source-id: 947423e2ee47a463861678b146641bcc6b899a4a
Simon Farnsworth 2021-01-06 07:09:39 -08:00 committed by Facebook GitHub Bot
parent 669d08df0e
commit b4a234bbe5
21 changed files with 485 additions and 698 deletions
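
The pattern repeated across every hunk below is the same: trait methods that returned an old-style (futures 0.1) `BoxFuture<T, Error>` and ended in `.boxify()` become `#[async_trait]` methods returning a plain `Result<T, Error>`, and callers drop their `.compat()` adapters. As a rough, self-contained sketch of the before/after shape (the `Things`/`get_thing` names are hypothetical, not taken from this diff):

use anyhow::Error;
use async_trait::async_trait;

// Before (futures 0.1): methods returned a boxed, type-erased future, e.g.
//     fn get_thing(&self, id: u64) -> BoxFuture<Option<String>, Error>;
// and implementations ended in .boxify().
//
// After (futures 0.3): #[async_trait] lets trait methods be `async fn`s
// returning Result, so implementations simply .await their inner calls.
#[async_trait]
trait Things: Send + Sync {
    async fn get_thing(&self, id: u64) -> Result<Option<String>, Error>;
}

struct InMemoryThings;

#[async_trait]
impl Things for InMemoryThings {
    async fn get_thing(&self, id: u64) -> Result<Option<String>, Error> {
        // No .boxed()/.compat()/.boxify() plumbing is needed any more.
        Ok(if id == 1 { Some("one".to_owned()) } else { None })
    }
}

Removing the per-call-site `.compat()` adapters and the old combinator chains accounts for the bulk of the 698 deleted lines.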


@@ -291,42 +291,44 @@ impl<C> DelayedChangesets<C> {
}
}
#[async_trait]
impl<C: Changesets> Changesets for DelayedChangesets<C> {
fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error> {
delay(self.put_dist, self.inner.add(ctx, cs)).boxify()
async fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> Result<bool, Error> {
delay_v2(self.put_dist).await;
self.inner.add(ctx, cs).await
}
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
delay(self.get_dist, self.inner.get(ctx, repo_id, cs_id)).boxify()
) -> Result<Option<ChangesetEntry>, Error> {
delay_v2(self.get_dist).await;
self.inner.get(ctx, repo_id, cs_id).await
}
fn get_many(
async fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
delay(self.get_dist, self.inner.get_many(ctx, repo_id, cs_ids)).boxify()
) -> Result<Vec<ChangesetEntry>, Error> {
delay_v2(self.get_dist).await;
self.inner.get_many(ctx, repo_id, cs_ids).await
}
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
delay(
self.get_dist,
self.inner
.get_many_by_prefix(ctx, repo_id, cs_prefix, limit),
)
.boxify()
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
delay_v2(self.get_dist).await;
self.inner
.get_many_by_prefix(ctx, repo_id, cs_prefix, limit)
.await
}
fn prime_cache(&self, ctx: &CoreContext, changesets: &[ChangesetEntry]) {


@@ -17,7 +17,6 @@ use cloned::cloned;
use context::CoreContext;
use futures::{
channel::oneshot,
compat::Future01CompatExt,
future::{self, BoxFuture, FutureExt, TryFutureExt},
stream::BoxStream,
};
@@ -288,7 +287,6 @@ impl CreateChangeset {
};
complete_changesets
.add(ctx.clone(), completion_record)
.compat()
.await
.context("While inserting into changeset table")?;


@@ -223,7 +223,6 @@ impl BlobRepoHg for BlobRepo {
let existing: HashSet<_> = self
.get_changesets_object()
.get_many(ctx.clone(), self.get_repoid(), notfound.clone())
.compat()
.await?
.into_iter()
.map(|entry| entry.cs_id)
@@ -281,7 +280,6 @@ impl BlobRepoHg for BlobRepo {
let res = self
.get_changesets_object()
.get(ctx, self.get_repoid(), bonsai)
.compat()
.await?;
Ok(res.is_some())
}
@@ -304,7 +302,6 @@ impl BlobRepoHg for BlobRepo {
let parents = self
.get_changesets_object()
.get(ctx.clone(), self.get_repoid(), csid)
.compat()
.await?
.ok_or(ErrorKind::BonsaiNotFound(csid))?
.parents


@@ -13,4 +13,3 @@ mononoke_types = { path = "../../mononoke_types" }
anyhow = "1.0"
async-trait = "0.1.29"
auto_impl = "0.4"
futures = { version = "0.3.5", features = ["async-await", "compat"] }


@@ -10,7 +10,6 @@ use async_trait::async_trait;
use auto_impl::auto_impl;
use changesets::Changesets;
use context::CoreContext;
use futures::compat::Future01CompatExt;
use mononoke_types::{ChangesetId, Generation, RepositoryId};
use std::{any::Any, collections::HashMap, sync::Arc};
@@ -61,7 +60,6 @@ impl ChangesetFetcher for SimpleChangesetFetcher {
let maybe_cs = self
.changesets
.get(ctx, self.repo_id.clone(), cs_id.clone())
.compat()
.await?;
let cs = maybe_cs.ok_or_else(|| format_err!("{} not found", cs_id))?;
Ok(Generation::new(cs.gen))
@@ -75,7 +73,6 @@ impl ChangesetFetcher for SimpleChangesetFetcher {
let maybe_cs = self
.changesets
.get(ctx, self.repo_id.clone(), cs_id.clone())
.compat()
.await?;
let cs = maybe_cs.ok_or_else(|| format_err!("{} not found", cs_id))?;
Ok(cs.parents)


@@ -21,7 +21,6 @@ use cloned::cloned;
use context::CoreContext;
use filestore::FilestoreConfig;
use futures::{
compat::Future01CompatExt,
future::{try_join, BoxFuture},
stream::FuturesUnordered,
FutureExt, Stream, TryStreamExt,
@@ -199,7 +198,6 @@ impl BlobRepo {
.inner
.changesets
.get(ctx, self.get_repoid(), changesetid)
.compat()
.await?;
Ok(changeset.is_some())
}
@@ -214,7 +212,6 @@ impl BlobRepo {
.inner
.changesets
.get(ctx, self.get_repoid(), changesetid)
.compat()
.await?;
let parents = changeset
.ok_or_else(|| format_err!("Commit {} does not exist in the repo", changesetid))?
@@ -334,7 +331,6 @@ impl BlobRepo {
.inner
.changesets
.get(ctx, self.get_repoid(), cs)
.compat()
.await?;
Ok(result.map(|res| Generation::new(res.gen)))
}
@@ -475,14 +471,7 @@ pub async fn save_bonsai_changesets(
cs_id: bcs_id,
parents: bcs.parents().into_iter().collect(),
};
cloned!(ctx, complete_changesets);
bonsai_complete_futs.push(async move {
complete_changesets
.add(ctx, completion_record)
.compat()
.await
});
bonsai_complete_futs.push(complete_changesets.add(ctx.clone(), completion_record));
}
}


@@ -15,7 +15,6 @@ use std::sync::Arc;
use anyhow::{Error, Result};
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{try_join, TryFutureExt},
stream::{self, BoxStream, StreamExt, TryStreamExt},
Stream,
@@ -69,7 +68,6 @@ pub fn fetch_all_public_changesets<'a>(
async move {
let (start, stop) = changesets
.get_changesets_ids_bounds(repo_id.clone())
.compat()
.await?;
let start = start.ok_or_else(|| Error::msg("changesets table is empty"))?;
@@ -81,13 +79,10 @@
.and_then(move |(lower_bound, upper_bound)| async move {
let ids: Vec<_> = changesets
.get_list_bs_cs_id_in_range_exclusive(repo_id, lower_bound, upper_bound)
.compat()
.try_collect()
.await?;
let (entries, public) = try_join(
changesets
.get_many(ctx.clone(), repo_id, ids.clone())
.compat(),
changesets.get_many(ctx.clone(), repo_id, ids.clone()),
phases.get_public_raw(ctx, &ids),
)
.await?;


@@ -17,7 +17,6 @@ cachelib = { git = "https://github.com/facebookexperimental/rust-shed.git", bran
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
memcache = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
sql = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
@@ -25,9 +24,9 @@ abomonation = "0.7"
abomonation_derive = "0.5"
anyhow = "1.0"
async-trait = "0.1.29"
auto_impl = "0.4"
bytes = { version = "0.5", features = ["serde"] }
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
maplit = "1.0"
thiserror = "1.0"


@@ -17,20 +17,15 @@ use changeset_entry_thrift as thrift;
use context::CoreContext;
use fbinit::FacebookInit;
use fbthrift::compact_protocol;
use futures::{
compat::Future01CompatExt,
future::{FutureExt, TryFutureExt},
};
use futures_ext::{BoxFuture, FutureExt as _};
use futures_old::Future;
use maplit::hashset;
use memcache::{KeyGen, MemcacheClient};
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
#[cfg(test)]
use caching_ext::MockStoreStats;
@@ -113,74 +108,56 @@ impl CachingChangesets {
}
}
#[async_trait]
impl Changesets for CachingChangesets {
fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error> {
self.changesets.add(ctx, cs)
async fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> Result<bool, Error> {
self.changesets.add(ctx, cs).await
}
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
let this = (*self).clone();
async move {
let ctx = (&ctx, repo_id, &this);
let mut map = get_or_fill(ctx, hashset![cs_id]).await?;
Ok(map.remove(&cs_id))
}
.boxed()
.compat()
.boxify()
) -> Result<Option<ChangesetEntry>, Error> {
let ctx = (&ctx, repo_id, self);
let mut map = get_or_fill(ctx, hashset![cs_id]).await?;
Ok(map.remove(&cs_id))
}
fn get_many(
async fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
let this = (*self).clone();
async move {
let ctx = (&ctx, repo_id, &this);
let res = get_or_fill(ctx, cs_ids.into_iter().collect())
.await?
.into_iter()
.map(|(_, val)| val)
.collect();
Ok(res)
}
.boxed()
.compat()
.boxify()
) -> Result<Vec<ChangesetEntry>, Error> {
let ctx = (&ctx, repo_id, self);
let res = get_or_fill(ctx, cs_ids.into_iter().collect())
.await?
.into_iter()
.map(|(_, val)| val)
.collect();
Ok(res)
}
/// Use caching for the full changeset ids and a slower path otherwise.
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
if let Some(id) = cs_prefix.into_changeset_id() {
return self
.get(ctx, repo_id, id)
.map(move |res| {
match res {
Some(_) if limit > 0 => ChangesetIdsResolvedFromPrefix::Single(id),
_ => ChangesetIdsResolvedFromPrefix::NoMatch,
}
})
.boxify();
let res = self.get(ctx, repo_id, id).await?;
return match res {
Some(_) if limit > 0 => Ok(ChangesetIdsResolvedFromPrefix::Single(id)),
_ => Ok(ChangesetIdsResolvedFromPrefix::NoMatch),
};
}
self.changesets
.get_many_by_prefix(ctx, repo_id, cs_prefix, limit)
.boxify()
.await
}
fn prime_cache(&self, _ctx: &CoreContext, changesets: &[ChangesetEntry]) {
@@ -258,7 +235,6 @@ impl KeyedEntityStore<ChangesetId, ChangesetEntry> for CacheRequest<'_> {
let res = mapping
.changesets
.get_many((*ctx).clone(), *repo_id, keys.into_iter().collect())
.compat()
.await?;
Result::<_, Error>::Ok(res.into_iter().map(|e| (e.cs_id, e)).collect())


@@ -9,12 +9,17 @@
use abomonation_derive::Abomonation;
use anyhow::{Error, Result};
use async_trait::async_trait;
use auto_impl::auto_impl;
use bytes::Bytes;
use cloned::cloned;
use context::{CoreContext, PerfCounterType};
use fbthrift::compact_protocol;
use futures_ext::{try_boxfuture, BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_old::{future::ok, stream, Future, IntoFuture};
use futures::{
compat::Future01CompatExt,
stream::{self, BoxStream, StreamExt},
TryFutureExt,
};
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
@@ -29,7 +34,6 @@ mod caching;
mod errors;
#[cfg(test)]
mod test;
mod wrappers;
pub use caching::{get_cache_key, CachingChangesets};
pub use errors::ErrorKind;
@@ -98,13 +102,11 @@ pub fn deserialize_cs_entries(blob: &Bytes) -> Result<Vec<ChangesetEntry>> {
compact_protocol::deserialize(blob)?;
let mut entries = vec![];
for thrift_entry in thrift_entries {
let parents: Result<Vec<_>> = thrift_entry
let parents = thrift_entry
.parents
.into_iter()
.map(ChangesetId::from_thrift)
.collect();
let parents = parents?;
.collect::<Result<Vec<_>>>()?;
let entry = ChangesetEntry {
repo_id: RepositoryId::new(thrift_entry.repo_id.0),
cs_id: ChangesetId::from_thrift(thrift_entry.cs_id)?,
@@ -113,7 +115,6 @@ pub fn deserialize_cs_entries(blob: &Bytes) -> Result<Vec<ChangesetEntry>> {
};
entries.push(entry);
}
Ok(entries)
}
@@ -125,35 +126,37 @@ pub struct ChangesetInsert {
}
/// Interface to storage of changesets that have been completely stored in Mononoke.
#[async_trait]
#[auto_impl(&, Arc)]
pub trait Changesets: Send + Sync {
/// Add a new entry to the changesets table. Returns true if a new changeset was inserted,
/// returns false if the same changeset already existed.
fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error>;
async fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> Result<bool, Error>;
/// Retrieve the row specified by this commit, if available.
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error>;
) -> Result<Option<ChangesetEntry>, Error>;
/// Retrieve the rows for all the commits if available
fn get_many(
async fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error>;
) -> Result<Vec<ChangesetEntry>, Error>;
/// Retrieve the rows for all the commits with the given prefix up to the given limit
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error>;
) -> Result<ChangesetIdsResolvedFromPrefix, Error>;
fn prime_cache(&self, ctx: &CoreContext, changesets: &[ChangesetEntry]);
@@ -263,154 +266,125 @@ impl SqlConstruct for SqlChangesets {
impl SqlConstructFromMetadataDatabaseConfig for SqlChangesets {}
#[async_trait]
impl Changesets for SqlChangesets {
fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error> {
async fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> Result<bool, Error> {
STATS::adds.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlWrites);
cloned!(self.write_connection);
let parent_rows = {
if cs.parents.is_empty() {
Ok(Vec::new()).into_future().boxify()
Vec::new()
} else {
SelectChangesets::query(&write_connection, &cs.repo_id, &cs.parents[..]).boxify()
SelectChangesets::query(&self.write_connection, &cs.repo_id, &cs.parents[..])
.compat()
.await?
}
};
check_missing_rows(&cs.parents, &parent_rows)?;
let gen = parent_rows.iter().map(|row| row.2).max().unwrap_or(0) + 1;
let transaction = self.write_connection.start_transaction().compat().await?;
let (transaction, result) =
InsertChangeset::query_with_transaction(transaction, &[(&cs.repo_id, &cs.cs_id, &gen)])
.compat()
.await?;
parent_rows
.and_then(move |parent_rows| {
try_boxfuture!(check_missing_rows(&cs.parents, &parent_rows));
let gen = parent_rows.iter().map(|row| row.2).max().unwrap_or(0) + 1;
write_connection
.start_transaction()
.and_then({
cloned!(cs);
move |transaction| {
InsertChangeset::query_with_transaction(
transaction,
&[(&cs.repo_id, &cs.cs_id, &gen)],
)
}
})
.and_then(move |(transaction, result)| {
if result.affected_rows() == 1 && result.last_insert_id().is_some() {
insert_parents(
transaction,
result.last_insert_id().unwrap(),
cs,
parent_rows,
)
.map(|()| true)
.left_future()
} else {
transaction
.rollback()
.and_then(move |()| check_changeset_matches(&write_connection, cs))
.map(|()| false)
.right_future()
}
})
.boxify()
})
.boxify()
if result.affected_rows() == 1 && result.last_insert_id().is_some() {
insert_parents(
transaction,
result.last_insert_id().unwrap(),
cs,
parent_rows,
)
.await?;
Ok(true)
} else {
transaction.rollback().compat().await?;
check_changeset_matches(&self.write_connection, cs).await?;
Ok(false)
}
}
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
) -> Result<Option<ChangesetEntry>, Error> {
STATS::gets.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
cloned!(self.read_master_connection);
select_changeset(&self.read_connection, repo_id, cs_id)
.and_then(move |maybe_mapping| {
match maybe_mapping {
Some(mapping) => Ok(Some(mapping)).into_future().boxify(),
None => {
STATS::gets_master.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
select_changeset(&read_master_connection, repo_id, cs_id)
}
}
})
.boxify()
let maybe_mapping = select_changeset(&self.read_connection, repo_id, cs_id).await?;
match maybe_mapping {
Some(mapping) => Ok(Some(mapping)),
None => {
STATS::gets_master.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
select_changeset(&self.read_master_connection, repo_id, cs_id).await
}
}
}
fn get_many(
async fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
cloned!(self.read_master_connection);
) -> Result<Vec<ChangesetEntry>, Error> {
if cs_ids.is_empty() {
ok(vec![]).boxify()
return Ok(vec![]);
}
STATS::get_many.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
let fetched_cs = select_many_changesets(&self.read_connection, repo_id, &cs_ids).await?;
let fetched_set: HashSet<_> = fetched_cs
.clone()
.into_iter()
.map(|cs_entry| cs_entry.cs_id)
.collect();
let notfetched_cs_ids: Vec<_> = cs_ids
.into_iter()
.filter(|cs_id| !fetched_set.contains(cs_id))
.collect();
if notfetched_cs_ids.is_empty() {
Ok(fetched_cs)
} else {
STATS::get_many.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
select_many_changesets(&self.read_connection, repo_id, &cs_ids)
.and_then(move |fetched_cs| {
let fetched_set: HashSet<_> = fetched_cs
.clone()
.into_iter()
.map(|cs_entry| cs_entry.cs_id)
.collect();
let notfetched_cs_ids: Vec<_> = cs_ids
.into_iter()
.filter(|cs_id| !fetched_set.contains(cs_id))
.collect();
if notfetched_cs_ids.is_empty() {
ok(fetched_cs).left_future()
} else {
STATS::get_many.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
select_many_changesets(&read_master_connection, repo_id, &notfetched_cs_ids)
.map(move |mut master_fetched_cs| {
master_fetched_cs.extend(fetched_cs);
master_fetched_cs
})
.right_future()
}
})
.boxify()
.increment_counter(PerfCounterType::SqlReadsMaster);
let mut master_fetched_cs =
select_many_changesets(&self.read_master_connection, repo_id, &notfetched_cs_ids)
.await?;
master_fetched_cs.extend(fetched_cs);
Ok(master_fetched_cs)
}
}
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
STATS::get_many_by_prefix.add_value(1);
cloned!(self.read_master_connection);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
fetch_many_by_prefix(&self.read_connection, repo_id, &cs_prefix, limit)
.and_then(move |resolved_cs| {
match resolved_cs {
ChangesetIdsResolvedFromPrefix::NoMatch => {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
fetch_many_by_prefix(&read_master_connection, repo_id, &cs_prefix, limit)
}
_ => ok(resolved_cs).boxify(),
}
})
.boxify()
let resolved_cs =
fetch_many_by_prefix(&self.read_connection, repo_id, &cs_prefix, limit).await?;
match resolved_cs {
ChangesetIdsResolvedFromPrefix::NoMatch => {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
fetch_many_by_prefix(&self.read_master_connection, repo_id, &cs_prefix, limit).await
}
_ => Ok(resolved_cs),
}
}
fn prime_cache(&self, _ctx: &CoreContext, _changesets: &[ChangesetEntry]) {
@@ -422,32 +396,32 @@ impl Changesets for SqlChangesets {
}
}
fn fetch_many_by_prefix(
async fn fetch_many_by_prefix(
connection: &Connection,
repo_id: RepositoryId,
cs_prefix: &ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
SelectChangesetsRange::query(
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
let rows = SelectChangesetsRange::query(
&connection,
&repo_id,
&cs_prefix.min_as_ref(),
&cs_prefix.max_as_ref(),
&(limit + 1),
)
.map(move |rows| {
let mut fetched_cs: Vec<ChangesetId> = rows.into_iter().map(|row| row.0).collect();
match fetched_cs.len() {
0 => ChangesetIdsResolvedFromPrefix::NoMatch,
1 => ChangesetIdsResolvedFromPrefix::Single(fetched_cs[0].clone()),
l if l <= limit => ChangesetIdsResolvedFromPrefix::Multiple(fetched_cs),
_ => ChangesetIdsResolvedFromPrefix::TooMany({
fetched_cs.pop();
fetched_cs
}),
}
})
.boxify()
.compat()
.await?;
let mut fetched_cs: Vec<ChangesetId> = rows.into_iter().map(|row| row.0).collect();
let result = match fetched_cs.len() {
0 => ChangesetIdsResolvedFromPrefix::NoMatch,
1 => ChangesetIdsResolvedFromPrefix::Single(fetched_cs[0].clone()),
l if l <= limit => ChangesetIdsResolvedFromPrefix::Multiple(fetched_cs),
_ => ChangesetIdsResolvedFromPrefix::TooMany({
fetched_cs.pop();
fetched_cs
}),
};
Ok(result)
}
impl SqlChangesets {
@@ -456,37 +430,42 @@ impl SqlChangesets {
repo_id: RepositoryId,
min_id: u64,
max_id: u64,
) -> BoxStream<ChangesetId, Error> {
) -> BoxStream<'_, Result<ChangesetId, Error>> {
// [min_id, max_id)
cloned!(self.read_master_connection);
// As the SQL request is BETWEEN, both bounds are inclusive
let max_id = max_id - 1;
SelectAllChangesetsIdsInRange::query(&read_master_connection, &repo_id, &min_id, &max_id)
.map(move |rows| {
let changesets_ids = rows.into_iter().map(|row| row.0);
stream::iter_ok(changesets_ids).boxify()
})
.from_err()
.flatten_stream()
.boxify()
cloned!(self.read_master_connection);
async move {
SelectAllChangesetsIdsInRange::query(
&read_master_connection,
&repo_id,
&min_id,
&max_id,
)
.compat()
.await
}
.map_ok(move |rows| {
let changesets_ids = rows.into_iter().map(|row| Ok(row.0));
stream::iter(changesets_ids)
})
.try_flatten_stream()
.boxed()
}
pub fn get_changesets_ids_bounds(
pub async fn get_changesets_ids_bounds(
&self,
repo_id: RepositoryId,
) -> BoxFuture<(Option<u64>, Option<u64>), Error> {
cloned!(self.read_master_connection);
SelectChangesetsIdsBounds::query(&read_master_connection, &repo_id)
.map(move |rows| {
if rows.is_empty() {
(None, None)
} else {
(Some(rows[0].0), Some(rows[0].1))
}
})
.boxify()
) -> Result<(Option<u64>, Option<u64>), Error> {
let rows = SelectChangesetsIdsBounds::query(&self.read_master_connection, &repo_id)
.compat()
.await?;
if rows.is_empty() {
Ok((None, None))
} else {
Ok((Some(rows[0].0), Some(rows[0].1)))
}
}
}
@@ -508,12 +487,12 @@ fn check_missing_rows(
}
}
fn insert_parents(
async fn insert_parents(
transaction: Transaction,
new_cs_id: u64,
cs: ChangesetInsert,
parent_rows: Vec<(u64, ChangesetId, u64)>,
) -> impl Future<Item = (), Error = Error> {
) -> Result<(), Error> {
// parent_rows might not be in the same order as cs.parents.
let parent_map: HashMap<_, _> = parent_rows.into_iter().map(|row| (row.1, row.0)).collect();
@@ -537,76 +516,77 @@ fn insert_parents(
.map(|row| (&row.0, &row.1, &row.2))
.collect();
InsertParents::query_with_transaction(transaction, &ref_parent_inserts[..])
.and_then(|(transaction, _)| transaction.commit())
let (transaction, _) =
InsertParents::query_with_transaction(transaction, &ref_parent_inserts[..])
.compat()
.await?;
transaction.commit().compat().await?;
Ok(())
}
fn check_changeset_matches(
async fn check_changeset_matches(
connection: &Connection,
cs: ChangesetInsert,
) -> impl Future<Item = (), Error = Error> {
select_changeset(&connection, cs.repo_id, cs.cs_id).and_then(move |stored_cs| {
let stored_parents = stored_cs.map(|cs| cs.parents);
if Some(&cs.parents) == stored_parents.as_ref() {
Ok(())
} else {
Err(ErrorKind::DuplicateInsertionInconsistency(
cs.cs_id,
stored_parents.unwrap_or(Vec::new()),
cs.parents,
)
.into())
}
})
) -> Result<(), Error> {
let stored_parents = select_changeset(&connection, cs.repo_id, cs.cs_id)
.await?
.map(|cs| cs.parents);
if Some(&cs.parents) == stored_parents.as_ref() {
Ok(())
} else {
Err(ErrorKind::DuplicateInsertionInconsistency(
cs.cs_id,
stored_parents.unwrap_or_default(),
cs.parents,
)
.into())
}
}
fn select_changeset(
async fn select_changeset(
connection: &Connection,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
cloned!(repo_id, cs_id);
SelectChangeset::query(&connection, &repo_id, &cs_id)
.map(move |rows| {
if rows.is_empty() {
None
} else {
let gen = rows[0].0;
Some(ChangesetEntry {
repo_id,
cs_id,
parents: rows.into_iter().filter_map(|row| row.1).collect(),
gen,
})
}
) -> Result<Option<ChangesetEntry>, Error> {
let rows = SelectChangeset::query(&connection, &repo_id, &cs_id)
.compat()
.await?;
let result = if rows.is_empty() {
None
} else {
let gen = rows[0].0;
Some(ChangesetEntry {
repo_id,
cs_id,
parents: rows.into_iter().filter_map(|row| row.1).collect(),
gen,
})
.boxify()
};
Ok(result)
}
fn select_many_changesets(
async fn select_many_changesets(
connection: &Connection,
repo_id: RepositoryId,
cs_ids: &Vec<ChangesetId>,
) -> impl Future<Item = Vec<ChangesetEntry>, Error = Error> {
SelectManyChangesets::query(&connection, &repo_id, &cs_ids[..]).map(move |fetched_changesets| {
let mut cs_id_to_cs_entry = HashMap::new();
for (cs_id, gen, maybe_parent, _) in fetched_changesets {
cs_id_to_cs_entry
.entry(cs_id)
.or_insert(ChangesetEntry {
repo_id,
cs_id,
parents: vec![],
gen,
})
.parents
.extend(maybe_parent.into_iter());
}
cs_id_to_cs_entry.values().cloned().collect()
})
) -> Result<Vec<ChangesetEntry>, Error> {
let fetched_changesets = SelectManyChangesets::query(&connection, &repo_id, &cs_ids[..])
.compat()
.await?;
let mut cs_id_to_cs_entry = HashMap::new();
for (cs_id, gen, maybe_parent, _) in fetched_changesets {
cs_id_to_cs_entry
.entry(cs_id)
.or_insert(ChangesetEntry {
repo_id,
cs_id,
parents: vec![],
gen,
})
.parents
.extend(maybe_parent.into_iter());
}
Ok(cs_id_to_cs_entry.values().cloned().collect())
}
#[cfg(test)]
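
One wrinkle in the file above: the `sql` crate's query macros still return futures 0.1 futures, so the now-async methods bridge them with `.compat()` from `futures::compat::Future01CompatExt` (hence the `compat` feature on futures 0.3 in the Cargo.toml hunks). A minimal sketch of that bridge, assuming futures 0.1 is renamed to `futures_old` as in these manifests:

use anyhow::Error;
use futures::compat::Future01CompatExt;
use futures_old::future as future01; // futures 0.1

async fn demo() -> Result<u32, Error> {
    // A futures 0.1 future, standing in for what SelectChangesets::query
    // and the other query macros return.
    let fut01 = future01::ok::<u32, Error>(42);
    // .compat() adapts it to a std::future::Future with
    // Output = Result<u32, Error>, so it can be awaited directly.
    let value = fut01.compat().await?;
    Ok(value)
}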


@@ -9,33 +9,43 @@
use super::{
CachingChangesets, ChangesetEntry, ChangesetInsert, Changesets, ErrorKind, SqlChangesets,
};
use anyhow::Error;
use assert_matches::assert_matches;
use caching_ext::MockStoreStats;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::Future;
use maplit::hashset;
use mononoke_types::{ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix};
use mononoke_types_mocks::changesetid::*;
use mononoke_types_mocks::repo::*;
use sql_construct::SqlConstruct;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::str::FromStr;
use std::sync::Arc;
use tokio_compat::runtime::Runtime;
use std::{collections::HashSet, iter::FromIterator, str::FromStr, sync::Arc};
fn run_test(fb: FacebookInit, test_fn: fn(FacebookInit, SqlChangesets)) {
test_fn(fb, SqlChangesets::with_sqlite_in_memory().unwrap());
async fn run_test<F, FO>(fb: FacebookInit, test_fn: F) -> Result<(), Error>
where
F: FnOnce(FacebookInit, SqlChangesets) -> FO,
FO: Future<Output = Result<(), Error>>,
{
test_fn(fb, SqlChangesets::with_sqlite_in_memory().unwrap()).await?;
Ok(())
}
fn run_caching_test(fb: FacebookInit, test_fn: fn(FacebookInit, CachingChangesets)) {
async fn run_caching_test<F, FO>(fb: FacebookInit, test_fn: F) -> Result<(), Error>
where
F: FnOnce(FacebookInit, CachingChangesets) -> FO,
FO: Future<Output = Result<(), Error>>,
{
let real_changesets = Arc::new(SqlChangesets::with_sqlite_in_memory().unwrap());
let changesets = CachingChangesets::mocked(real_changesets);
test_fn(fb, changesets);
test_fn(fb, changesets).await?;
Ok(())
}
fn add_and_get<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn add_and_get<C: Changesets + 'static>(
fb: FacebookInit,
changesets: C,
) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row = ChangesetInsert {
@@ -44,13 +54,8 @@ fn add_and_get<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row))
.expect("Adding new entry failed");
let result = rt
.block_on(changesets.get(ctx, REPO_ZERO, ONES_CSID))
.expect("Get failed");
changesets.add(ctx.clone(), row).await?;
let result = changesets.get(ctx, REPO_ZERO, ONES_CSID).await?;
assert_eq!(
result,
Some(ChangesetEntry {
@@ -60,10 +65,10 @@ fn add_and_get<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
gen: 1,
}),
);
Ok(())
}
fn add_missing_parents<C: Changesets>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn add_missing_parents<C: Changesets>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row = ChangesetInsert {
@@ -72,31 +77,29 @@ fn add_missing_parents<C: Changesets>(fb: FacebookInit, changesets: C) {
parents: vec![TWOS_CSID],
};
let result = rt
.block_on(changesets.add(ctx, row))
let result = changesets
.add(ctx, row)
.await
.expect_err("Adding entry with missing parents succeeded (should have failed)");
assert_matches!(
result.downcast::<ErrorKind>(),
Ok(ErrorKind::MissingParents(ref x)) if x == &vec![TWOS_CSID]
);
Ok(())
}
fn missing<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn missing<C: Changesets + 'static>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let result = rt
.block_on(changesets.get(ctx, REPO_ZERO, ONES_CSID))
let result = changesets
.get(ctx, REPO_ZERO, ONES_CSID)
.await
.expect("Failed to fetch missing changeset (should succeed with None instead)");
assert_eq!(result, None);
Ok(())
}
fn duplicate<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn duplicate<C: Changesets + 'static>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: ONES_CSID,
@@ -104,32 +107,31 @@ };
};
assert_eq!(
rt.block_on(changesets.add(ctx.clone(), row.clone()))
.expect("Adding new entry failed"),
changesets.add(ctx.clone(), row.clone()).await?,
true,
"inserting unique changeset must return true"
);
assert_eq!(
rt.block_on(changesets.add(ctx.clone(), row.clone()))
.expect("error while adding changeset"),
changesets.add(ctx.clone(), row.clone()).await?,
false,
"inserting the same changeset must return false"
);
Ok(())
}
fn broken_duplicate<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn broken_duplicate<C: Changesets + 'static>(
fb: FacebookInit,
changesets: C,
) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: ONES_CSID,
parents: vec![],
};
assert_eq!(
rt.block_on(changesets.add(ctx.clone(), row))
.expect("Adding new entry failed"),
changesets.add(ctx.clone(), row).await?,
true,
"inserting unique changeset must return true"
);
@@ -140,8 +142,7 @@ fn broken_duplicate<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
parents: vec![],
};
assert_eq!(
rt.block_on(changesets.add(ctx.clone(), row))
.expect("Adding new entry failed"),
changesets.add(ctx.clone(), row).await?,
true,
"inserting unique changeset must return true"
);
@@ -151,17 +152,19 @@ fn broken_duplicate<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![TWOS_CSID],
};
let result = rt
.block_on(changesets.add(ctx.clone(), row))
let result = changesets
.add(ctx.clone(), row)
.await
.expect_err("Adding changeset with the same hash but different parents should fail");
match result.downcast::<ErrorKind>() {
Ok(ErrorKind::DuplicateInsertionInconsistency(..)) => {}
err => panic!("unexpected error: {:?}", err),
};
Ok(())
}
fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn complex<C: Changesets>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -169,44 +172,38 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
changesets.add(ctx.clone(), row1).await?;
let row2 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
changesets.add(ctx.clone(), row2).await?;
let row3 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: THREES_CSID,
parents: vec![TWOS_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row3))
.expect("Adding row 3 failed");
changesets.add(ctx.clone(), row3).await?;
let row4 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: FOURS_CSID,
parents: vec![ONES_CSID, THREES_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row4))
.expect("Adding row 4 failed");
changesets.add(ctx.clone(), row4).await?;
let row5 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: FIVES_CSID,
parents: vec![ONES_CSID, TWOS_CSID, FOURS_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row5))
.expect("Adding row 5 failed");
changesets.add(ctx.clone(), row5).await?;
assert_eq!(
rt.block_on(changesets.get(ctx.clone(), REPO_ZERO, ONES_CSID))
.expect("Get row 1 failed"),
changesets.get(ctx.clone(), REPO_ZERO, ONES_CSID).await?,
Some(ChangesetEntry {
repo_id: REPO_ZERO,
cs_id: ONES_CSID,
@@ -216,8 +213,7 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
);
assert_eq!(
rt.block_on(changesets.get(ctx.clone(), REPO_ZERO, TWOS_CSID))
.expect("Get row 2 failed"),
changesets.get(ctx.clone(), REPO_ZERO, TWOS_CSID).await?,
Some(ChangesetEntry {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
@@ -227,8 +223,7 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
);
assert_eq!(
rt.block_on(changesets.get(ctx.clone(), REPO_ZERO, THREES_CSID))
.expect("Get row 3 failed"),
changesets.get(ctx.clone(), REPO_ZERO, THREES_CSID).await?,
Some(ChangesetEntry {
repo_id: REPO_ZERO,
cs_id: THREES_CSID,
@@ -238,8 +233,7 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
);
assert_eq!(
rt.block_on(changesets.get(ctx.clone(), REPO_ZERO, FOURS_CSID))
.expect("Get row 4 failed"),
changesets.get(ctx.clone(), REPO_ZERO, FOURS_CSID).await?,
Some(ChangesetEntry {
repo_id: REPO_ZERO,
cs_id: FOURS_CSID,
@@ -249,8 +243,7 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
);
assert_eq!(
rt.block_on(changesets.get(ctx.clone(), REPO_ZERO, FIVES_CSID))
.expect("Get row 5 failed"),
changesets.get(ctx.clone(), REPO_ZERO, FIVES_CSID).await?,
Some(ChangesetEntry {
repo_id: REPO_ZERO,
cs_id: FIVES_CSID,
@@ -258,10 +251,11 @@ fn complex<C: Changesets>(fb: FacebookInit, changesets: C) {
gen: 4,
}),
);
Ok(())
}
fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -269,44 +263,39 @@ fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
changesets.add(ctx.clone(), row1).await?;
let row2 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
changesets.add(ctx.clone(), row2).await?;
let row3 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: THREES_CSID,
parents: vec![TWOS_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row3))
.expect("Adding row 3 failed");
changesets.add(ctx.clone(), row3).await?;
let row4 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: FOURS_CSID,
parents: vec![ONES_CSID, THREES_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row4))
.expect("Adding row 4 failed");
changesets.add(ctx.clone(), row4).await?;
let row5 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: FIVES_CSID,
parents: vec![THREES_CSID, ONES_CSID, TWOS_CSID, FOURS_CSID],
};
rt.block_on(changesets.add(ctx.clone(), row5))
.expect("Adding row 5 failed");
changesets.add(ctx.clone(), row5).await?;
let actual = rt
.block_on(changesets.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID]))
.expect("Get row 1 failed");
let actual = changesets
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID])
.await?;
assert_eq!(
HashSet::from_iter(actual),
@@ -326,14 +315,13 @@ fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
]
);
let actual = rt
.block_on(changesets.get_many(
let actual = changesets
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, TWOS_CSID, THREES_CSID],
))
.expect("Get row 2 failed");
)
.await?;
assert_eq!(
HashSet::from_iter(actual),
hashset![
@@ -358,16 +346,12 @@ fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
]
);
let actual = rt
.block_on(changesets.get_many(ctx.clone(), REPO_ZERO, vec![]))
.expect("Get row 3 failed");
let actual = changesets.get_many(ctx.clone(), REPO_ZERO, vec![]).await?;
assert_eq!(HashSet::from_iter(actual), hashset![]);
let actual = rt
.block_on(changesets.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, FOURS_CSID]))
.expect("Get row 2 failed");
let actual = changesets
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, FOURS_CSID])
.await?;
assert_eq!(
HashSet::from_iter(actual),
hashset![
@@ -386,14 +370,13 @@ fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
]
);
let actual = rt
.block_on(changesets.get_many(
let actual = changesets
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, FOURS_CSID, FIVES_CSID],
))
.expect("Get row 2 failed");
)
.await?;
assert_eq!(
HashSet::from_iter(actual),
hashset![
@@ -417,10 +400,11 @@ fn get_many<C: Changesets>(fb: FacebookInit, changesets: C) {
},
]
);
Ok(())
}
fn get_many_missing<C: Changesets>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn get_many_missing<C: Changesets>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -428,25 +412,22 @@ fn get_many_missing<C: Changesets>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
changesets.add(ctx.clone(), row1).await?;
let row2 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
changesets.add(ctx.clone(), row2).await?;
let actual = rt
.block_on(changesets.get_many(
let actual = changesets
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, TWOS_CSID, THREES_CSID],
))
.expect("get_many failed");
)
.await?;
assert_eq!(
HashSet::from_iter(actual),
hashset![
@@ -464,10 +445,11 @@ fn get_many_missing<C: Changesets>(fb: FacebookInit, changesets: C) {
},
]
);
Ok(())
}
fn get_many_by_prefix<C: Changesets>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn get_many_by_prefix<C: Changesets>(fb: FacebookInit, changesets: C) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -491,103 +473,95 @@
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
rt.block_on(changesets.add(ctx.clone(), row3))
.expect("Adding row 3 failed");
rt.block_on(changesets.add(ctx.clone(), row4))
.expect("Adding row 4 failed");
changesets.add(ctx.clone(), row1).await?;
changesets.add(ctx.clone(), row2).await?;
changesets.add(ctx.clone(), row3).await?;
changesets.add(ctx.clone(), row4).await?;
// found a single changeset
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_bytes(&ONES_CSID.as_ref()[0..8]).unwrap(),
10,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(actual, ChangesetIdsResolvedFromPrefix::Single(ONES_CSID));
// found a single changeset
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_bytes(&TWOS_CSID.as_ref()[0..12]).unwrap(),
1,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(actual, ChangesetIdsResolvedFromPrefix::Single(TWOS_CSID));
// found several changesets within the limit
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_bytes(&FS_CSID.as_ref()[0..8]).unwrap(),
10,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(
actual,
ChangesetIdsResolvedFromPrefix::Multiple(vec![FS_ES_CSID, FS_CSID]),
);
// found several changesets within the limit by hex string prefix
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_str(&"fff").unwrap(),
10,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(
actual,
ChangesetIdsResolvedFromPrefix::Multiple(vec![FS_ES_CSID, FS_CSID]),
);
// found several changesets exceeding the limit
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_bytes(&FS_CSID.as_ref()[0..8]).unwrap(),
1,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(
actual,
ChangesetIdsResolvedFromPrefix::TooMany(vec![FS_ES_CSID]),
);
// not found
let actual = rt
.block_on(changesets.get_many_by_prefix(
let actual = changesets
.get_many_by_prefix(
ctx.clone(),
REPO_ZERO,
ChangesetIdPrefix::from_bytes(&THREES_CSID.as_ref()[0..16]).unwrap(),
10,
))
.expect("get_many_by_prefix failed");
)
.await?;
assert_eq!(actual, ChangesetIdsResolvedFromPrefix::NoMatch);
Ok(())
}
fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn caching_fill<C: Changesets + 'static>(
fb: FacebookInit,
changesets: C,
) -> Result<(), Error> {
let changesets = Arc::new(changesets);
let mut cc = CachingChangesets::mocked(changesets.clone());
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -595,33 +569,25 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![],
};
let row2 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
parents: vec![],
};
let row3 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: THREES_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
rt.block_on(changesets.add(ctx.clone(), row3))
.expect("Adding row 3 failed");
changesets.add(ctx.clone(), row1).await?;
changesets.add(ctx.clone(), row2).await?;
changesets.add(ctx.clone(), row3).await?;
// First read should miss everywhere.
let _ = rt
.block_on(cc.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID]))
.expect("read 1 failed");
let _ = cc
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID])
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -644,10 +610,9 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// Another read with the same pool should hit in local cache.
let _ = rt
.block_on(cc.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID]))
.expect("read 2 failed");
let _ = cc
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID])
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -672,10 +637,9 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
cc = cc.fork_cachelib();
// Read with a separate pool should hit in Memcache.
let _ = rt
.block_on(cc.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID]))
.expect("read 3 failed");
let _ = cc
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID])
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -698,10 +662,9 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// Reading again from the separate pool should now hit in local cache.
let _ = rt
.block_on(cc.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID]))
.expect("read 4 failed");
let _ = cc
.get_many(ctx.clone(), REPO_ZERO, vec![ONES_CSID, TWOS_CSID])
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -724,14 +687,13 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// Partial read should partially hit
let _ = rt
.block_on(cc.get_many(
let _ = cc
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, TWOS_CSID, THREES_CSID],
))
.expect("read 5 failed");
)
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -754,14 +716,13 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// Partial read should have filled local cache.
let _ = rt
.block_on(cc.get_many(
let _ = cc
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, TWOS_CSID, THREES_CSID],
))
.expect("read 6 failed");
)
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -782,14 +743,16 @@ fn caching_fill<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
},
"read 6, memcache"
);
Ok(())
}
fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
let mut rt = Runtime::new().unwrap();
async fn caching_shared<C: Changesets + 'static>(
fb: FacebookInit,
changesets: C,
) -> Result<(), Error> {
let changesets = Arc::new(changesets);
let cc = CachingChangesets::mocked(changesets.clone());
let ctx = CoreContext::test_mock(fb);
let row1 = ChangesetInsert {
@@ -797,33 +760,23 @@ fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
cs_id: ONES_CSID,
parents: vec![],
};
let row2 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: TWOS_CSID,
parents: vec![],
};
let row3 = ChangesetInsert {
repo_id: REPO_ZERO,
cs_id: THREES_CSID,
parents: vec![],
};
rt.block_on(changesets.add(ctx.clone(), row1))
.expect("Adding row 1 failed");
rt.block_on(changesets.add(ctx.clone(), row2))
.expect("Adding row 2 failed");
rt.block_on(changesets.add(ctx.clone(), row3))
.expect("Adding row 3 failed");
changesets.add(ctx.clone(), row1).await?;
changesets.add(ctx.clone(), row2).await?;
changesets.add(ctx.clone(), row3).await?;
// get should miss
let _ = rt
.block_on(cc.get(ctx.clone(), REPO_ZERO, ONES_CSID))
.expect("read 1 failed");
let _ = cc.get(ctx.clone(), REPO_ZERO, ONES_CSID).await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -846,14 +799,13 @@ fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// get_many should hit for what was filled by get
let _ = rt
.block_on(cc.get_many(
let _ = cc
.get_many(
ctx.clone(),
REPO_ZERO,
vec![ONES_CSID, TWOS_CSID, THREES_CSID],
))
.expect("read 2 failed");
)
.await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -876,10 +828,7 @@ fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
);
// get should hit for what was filled by get_many
let _ = rt
.block_on(cc.get(ctx.clone(), REPO_ZERO, THREES_CSID))
.expect("read 3 failed");
let _ = cc.get(ctx.clone(), REPO_ZERO, THREES_CSID).await?;
assert_eq!(
cc.cachelib_stats(),
MockStoreStats {
@@ -900,6 +849,8 @@ fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
},
"read 3, memcache"
);
Ok(())
}
// NOTE: Use this wrapper macro to make sure tests are executed both with Changesets and
@@ -907,14 +858,14 @@ fn caching_shared<C: Changesets + 'static>(fb: FacebookInit, changesets: C) {
// CachingChangesets.
macro_rules! testify {
($plain_name: ident, $caching_name: ident, $input: ident) => {
#[fbinit::test]
fn $plain_name(fb: FacebookInit) {
run_test(fb, $input);
#[fbinit::compat_test]
async fn $plain_name(fb: FacebookInit) -> Result<(), Error> {
run_test(fb, $input).await
}
#[fbinit::test]
fn $caching_name(fb: FacebookInit) {
run_caching_test(fb, $input);
#[fbinit::compat_test]
async fn $caching_name(fb: FacebookInit) -> Result<(), Error> {
run_caching_test(fb, $input).await
}
};
}
@@ -945,12 +896,12 @@ testify!(
get_many_missing
);
#[fbinit::test]
fn test_caching_fill(fb: FacebookInit) {
run_test(fb, caching_fill);
#[fbinit::compat_test]
async fn test_caching_fill(fb: FacebookInit) -> Result<(), Error> {
run_test(fb, caching_fill).await
}
#[fbinit::test]
fn test_caching_shared(fb: FacebookInit) {
run_test(fb, caching_shared);
#[fbinit::compat_test]
async fn test_caching_shared(fb: FacebookInit) -> Result<(), Error> {
run_test(fb, caching_shared).await
}


@@ -1,61 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
//! Implementations for wrappers that enable dynamic dispatch. Add more as necessary.
use std::sync::Arc;
use anyhow::Error;
use context::CoreContext;
use futures_ext::BoxFuture;
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
use crate::{ChangesetEntry, ChangesetInsert, Changesets, SqlChangesets};
impl Changesets for Arc<dyn Changesets> {
fn add(&self, ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error> {
(**self).add(ctx, cs)
}
fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
(**self).get(ctx, repo_id, cs_id)
}
fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
(**self).get_many(ctx, repo_id, cs_ids)
}
fn get_many_by_prefix(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_prefix: ChangesetIdPrefix,
limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
(**self).get_many_by_prefix(ctx, repo_id, cs_prefix, limit)
}
fn prime_cache(&self, ctx: &CoreContext, changesets: &[ChangesetEntry]) {
(**self).prime_cache(ctx, changesets)
}
fn get_sql_changesets(&self) -> &SqlChangesets {
(**self).get_sql_changesets()
}
}
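
This deleted file was the hand-written dynamic-dispatch wrapper (`impl Changesets for Arc<dyn Changesets>`). It becomes redundant once the trait carries `#[auto_impl(&, Arc)]`, added in the lib.rs hunk above, which derives the delegating impls automatically. A small sketch of the effect, using a hypothetical `Greeter` trait rather than the real `Changesets`:

use async_trait::async_trait;
use auto_impl::auto_impl;
use std::sync::Arc;

// Same attribute order as the new Changesets definition: async_trait
// desugars the async methods first, then auto_impl generates
// `impl Greeter for &T` and `impl Greeter for Arc<T>`.
#[async_trait]
#[auto_impl(&, Arc)]
trait Greeter: Send + Sync {
    async fn greet(&self) -> String;
}

struct English;

#[async_trait]
impl Greeter for English {
    async fn greet(&self) -> String {
        "hello".to_owned()
    }
}

async fn demo() {
    // Arc<dyn Greeter> implements Greeter with no manual wrapper needed.
    let greeter: Arc<dyn Greeter> = Arc::new(English);
    assert_eq!(greeter.greet().await, "hello");
}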


@@ -17,7 +17,6 @@ use anyhow::{format_err, Error, Result};
use bytes::Bytes;
use clap::Arg;
use fbinit::FacebookInit;
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::future::TryFutureExt;
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::try_join;
@@ -222,10 +221,9 @@ impl AliasVerification {
"Process Changesets with ids: [{:?}, {:?})", min_id, max_id
);
let bcs_ids = self
.sqlchangesets
.get_list_bs_cs_id_in_range_exclusive(self.repoid, min_id, max_id)
.compat();
let bcs_ids =
self.sqlchangesets
.get_list_bs_cs_id_in_range_exclusive(self.repoid, min_id, max_id);
bcs_ids
.and_then(move |bcs_id| async move {
@@ -256,7 +254,6 @@ impl AliasVerification {
let (min_id, max_id) = self
.sqlchangesets
.get_changesets_ids_bounds(self.repoid)
.compat()
.await?;
let mut bounds = vec![];


@@ -11,13 +11,11 @@ blobstore = { path = "../blobstore" }
context = { path = "../server/context" }
mononoke_types = { path = "../mononoke_types" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
bytes = { version = "0.5", features = ["serde"] }
digest = "0.8"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
itertools = "0.8"
pin-project = "0.4"
sha-1 = "0.8"


@@ -6,20 +6,18 @@
*/
use anyhow::Error;
use async_trait::async_trait;
use changesets::{ChangesetEntry, ChangesetInsert, Changesets, SqlChangesets};
use context::CoreContext;
use futures::{
compat::Future01CompatExt,
future::{self, FutureExt as _, TryFutureExt},
};
use futures_ext::{BoxFuture, FutureExt};
use futures_old::{future as future_old, Future};
use futures::future;
use lock_ext::LockExt;
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
#[derive(Clone)]
pub struct MemWritesChangesets<T: Changesets + Clone + 'static> {
@@ -36,67 +34,59 @@ impl<T: Changesets + Clone + 'static> MemWritesChangesets<T> {
}
}
#[async_trait]
impl<T: Changesets + Clone + 'static> Changesets for MemWritesChangesets<T> {
fn add(&self, ctx: CoreContext, ci: ChangesetInsert) -> BoxFuture<bool, Error> {
let this = self.clone();
async fn add(&self, ctx: CoreContext, ci: ChangesetInsert) -> Result<bool, Error> {
let ChangesetInsert {
repo_id,
cs_id,
parents,
} = ci;
async move {
let cs = this.get(ctx.clone(), repo_id, cs_id).compat();
let parent_css = this
.get_many(ctx.clone(), repo_id, parents.clone())
.compat();
let (cs, parent_css) = future::try_join(cs, parent_css).await?;
let cs = self.get(ctx.clone(), repo_id, cs_id);
let parent_css = self.get_many(ctx.clone(), repo_id, parents.clone());
let (cs, parent_css) = future::try_join(cs, parent_css).await?;
if cs.is_some() {
Ok(false)
} else {
let gen = parent_css.into_iter().map(|p| p.gen).max().unwrap_or(0);
if cs.is_some() {
Ok(false)
} else {
let gen = parent_css.into_iter().map(|p| p.gen).max().unwrap_or(0);
let entry = ChangesetEntry {
repo_id,
cs_id,
parents,
gen,
};
let entry = ChangesetEntry {
repo_id,
cs_id,
parents,
gen,
};
this.cache
.with(|cache| cache.insert((repo_id, cs_id), entry));
self.cache
.with(|cache| cache.insert((repo_id, cs_id), entry));
Ok(true)
}
Ok(true)
}
.boxed()
.compat()
.boxify()
}
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
) -> Result<Option<ChangesetEntry>, Error> {
match self
.cache
.with(|cache| cache.get(&(repo_id, cs_id)).cloned())
{
Some(entry) => future_old::ok(Some(entry)).boxify(),
None => self.inner.get(ctx, repo_id, cs_id).boxify(),
Some(entry) => Ok(Some(entry)),
None => self.inner.get(ctx, repo_id, cs_id).await,
}
}
fn get_many(
async fn get_many(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
) -> Result<Vec<ChangesetEntry>, Error> {
let mut from_cache = vec![];
let mut from_inner = vec![];
@@ -110,22 +100,18 @@ impl<T: Changesets + Clone + 'static> Changesets for MemWritesChangesets<T> {
};
}
self.inner
.get_many(ctx, repo_id, from_inner)
.map(move |from_inner| {
from_cache.extend(from_inner);
from_cache
})
.boxify()
let from_inner = self.inner.get_many(ctx, repo_id, from_inner).await?;
from_cache.extend(from_inner);
Ok(from_cache)
}
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
_ctx: CoreContext,
_repo_id: RepositoryId,
_cs_prefix: ChangesetIdPrefix,
_limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
unimplemented!("This is not currently implemented in Gitimport")
}


@@ -24,7 +24,7 @@ use cloned::cloned;
use context::CoreContext;
use derived_data::BonsaiDerived;
use filestore::{self, Alias, FetchKey, FilestoreConfig, StoreRequest};
use futures::{compat::Future01CompatExt, future, stream, Stream, StreamExt, TryStreamExt};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use git2::{Oid, Repository, Sort};
use git_types::TreeHandle;
use linked_hash_map::LinkedHashMap;
@@ -260,7 +260,6 @@ pub async fn gitimport(
parents: bcs.parents().collect(),
},
)
.compat()
.await?;
if bonsai_git_mapping {


@@ -32,6 +32,7 @@ cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
clap = "2.33"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
slog = { version = "2.5", features = ["max_level_debug"] }


@@ -6,16 +6,11 @@
*/
use anyhow::Error;
use async_trait::async_trait;
use changesets::{ChangesetEntry, ChangesetInsert, Changesets, SqlChangesets};
use cloned::cloned;
use context::CoreContext;
use futures::{
channel::mpsc::Sender,
compat::Future01CompatExt,
future::{FutureExt as _, TryFutureExt},
sink::SinkExt,
};
use futures_ext::{BoxFuture, FutureExt};
use futures::{channel::mpsc::Sender, sink::SinkExt};
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
@@ -42,45 +37,40 @@ impl MicrowaveChangesets {
}
}
#[async_trait]
impl Changesets for MicrowaveChangesets {
fn add(&self, _ctx: CoreContext, cs: ChangesetInsert) -> BoxFuture<bool, Error> {
async fn add(&self, _ctx: CoreContext, cs: ChangesetInsert) -> Result<bool, Error> {
// See rationale in filenodes.rs for why we error out on unexpected calls under
// MicrowaveFilenodes.
unimplemented!("MicrowaveChangesets: unexpected add in repo {}", cs.repo_id)
}
fn get(
async fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> BoxFuture<Option<ChangesetEntry>, Error> {
) -> Result<Option<ChangesetEntry>, Error> {
cloned!(self.inner, mut self.recorder);
// NOTE: See MicrowaveFilenodes for context on this.
assert_eq!(repo_id, self.repo_id);
let entry = inner.get(ctx, repo_id, cs_id).await?;
async move {
let entry = inner.get(ctx, repo_id, cs_id).compat().await?;
if let Some(ref entry) = entry {
assert_eq!(entry.repo_id, repo_id); // Same as above
recorder.send(entry.clone()).await?;
}
Ok(entry)
if let Some(ref entry) = entry {
assert_eq!(entry.repo_id, repo_id); // Same as above
recorder.send(entry.clone()).await?;
}
.boxed()
.compat()
.boxify()
Ok(entry)
}
fn get_many(
async fn get_many(
&self,
_ctx: CoreContext,
repo_id: RepositoryId,
_cs_ids: Vec<ChangesetId>,
) -> BoxFuture<Vec<ChangesetEntry>, Error> {
) -> Result<Vec<ChangesetEntry>, Error> {
// Same as above
unimplemented!(
"MicrowaveChangesets: unexpected get_many in repo {}",
@@ -88,13 +78,13 @@ impl Changesets for MicrowaveChangesets {
)
}
fn get_many_by_prefix(
async fn get_many_by_prefix(
&self,
_ctx: CoreContext,
repo_id: RepositoryId,
_cs_prefix: ChangesetIdPrefix,
_limit: usize,
) -> BoxFuture<ChangesetIdsResolvedFromPrefix, Error> {
) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
// Same as above
unimplemented!(
"MicrowaveChangesets: unexpected get_many_by_prefix in repo {}",


@@ -816,7 +816,6 @@ impl RepoContext {
prefix,
MAX_LIMIT_AMBIGUOUS_IDS,
)
.compat()
.await?,
),
ChangesetPrefixSpecifier::Globalrev(prefix) => {
@@ -1012,7 +1011,6 @@ impl RepoContext {
self.blob_repo().get_repoid(),
queue.clone(),
)
.compat()
.await?
.into_iter()
.map(|cs_entry| cs_entry.parents)


@@ -23,12 +23,10 @@ mononoke_types = { path = "../mononoke_types" }
revset = { path = "../revset" }
tunables = { path = "../tunables" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
sql = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
maplit = "1.0"
slog = { version = "2.5", features = ["max_level_debug"] }
thiserror = "1.0"


@@ -42,7 +42,6 @@ async_compression = { git = "https://github.com/facebookexperimental/rust-shed.g
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
hash_memo = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
scuba = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
@@ -58,7 +57,6 @@ dashmap = "3.11.10"
derive_more = "0.99.3"
filetime = "0.2.9"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
hex = "0.4"
internment = {version = "0.4.1", features = ["serde"]}
itertools = "0.8"