bookmarks: remove repo_id from Bookmarks methods

Summary:
Remove the `repo_id` parameter from the `Bookmarks` trait methods.

The `repo_id` parameter was intended to allow a single `Bookmarks` implementation
to serve multiple repos.  In practice, however, each repo has its own config, which
results in a separate `Bookmarks` instance for each repo.  The `repo_id` parameter
complicates the API and provides no benefit.

To make this work, we switch to the `Builder` pattern for `SqlBookmarks`, which
allows us to inject the `repo_id` at construction time.  In fact nothing here
prevents us from adding back-end sharing later on, as these `SqlBookmarks` objects
are free to share data in their implementation.

Reviewed By: StanislavGlebik

Differential Revision: D22437089

fbshipit-source-id: d20e08ce6313108b74912683c620d25d6bf7ca01
This commit is contained in:
Mark Thomas 2020-07-10 04:44:58 -07:00 committed by Facebook GitHub Bot
parent aed95ea96d
commit fb5fdb9c15
22 changed files with 459 additions and 497 deletions

View File

@ -302,7 +302,7 @@ where
let to_cs_id = get_remapped_cs_id(to_sync_outcome)?;
if from_cs_id != to_cs_id {
let mut bookmark_txn = bookmarks.create_transaction(ctx.clone(), target_repo_id);
let mut bookmark_txn = bookmarks.create_transaction(ctx.clone());
debug!(
ctx.logger(),
"syncing bookmark {} to {:?}", bookmark, to_cs_id

View File

@ -17,7 +17,7 @@ use cloned::cloned;
use context::CoreContext;
use cross_repo_sync::CommitSyncRepos;
use cross_repo_sync::{rewrite_commit, upload_commits, CommitSyncOutcome, CommitSyncer};
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use fbinit::FacebookInit;
use fixtures::linear;
use futures::{
@ -851,12 +851,12 @@ async fn init_repos(
bookmark_renamer_type: BookmarkRenamerType,
) -> Result<(CommitSyncer<SqlSyncedCommitMapping>, TargetRepoDbs), Error> {
let ctx = CoreContext::test_mock(fb);
let target_repo_dbs = init_dbs()?;
let source_repo_id = RepositoryId::new(1);
let source_repo = new_memblob_empty_with_id(None, source_repo_id)?;
linear::initrepo(fb, &source_repo).await;
let target_repo_id = RepositoryId::new(2);
let target_repo_dbs = init_dbs(target_repo_id)?;
let target_repo = new_memblob_empty_with_id(None, target_repo_id)?;
let bookmarks = target_repo_dbs.bookmarks.clone();
let bookmark_update_log = target_repo_dbs.bookmark_update_log.clone();
@ -1132,7 +1132,7 @@ async fn init_merged_repos(
for idx in 0..num_repos {
let repoid = RepositoryId::new(idx as i32);
let small_repo = new_memblob_empty_with_id(None, repoid)?;
let small_repo_dbs = init_dbs()?;
let small_repo_dbs = init_dbs(repoid)?;
let bookmarks = small_repo_dbs.bookmarks.clone();
let bookmark_update_log = small_repo_dbs.bookmark_update_log.clone();
@ -1482,13 +1482,15 @@ async fn move_bookmark(
Ok(())
}
fn init_dbs() -> Result<TargetRepoDbs, Error> {
fn init_dbs(repo_id: RepositoryId) -> Result<TargetRepoDbs, Error> {
let con = SqliteConnection::open_in_memory()?;
con.execute_batch(SqlMutableCounters::CREATION_QUERY)?;
con.execute_batch(SqlBookmarks::CREATION_QUERY)?;
con.execute_batch(SqlBookmarksBuilder::CREATION_QUERY)?;
let connections = SqlConnections::new_single(Connection::with_sqlite(con));
let bookmarks = Arc::new(SqlBookmarks::from_sql_connections(connections.clone()));
let bookmarks = Arc::new(
SqlBookmarksBuilder::from_sql_connections(connections.clone()).with_repo_id(repo_id),
);
let counters = SqlMutableCounters::from_sql_connections(connections.clone());
Ok(TargetRepoDbs {

View File

@ -20,7 +20,7 @@ use bonsai_hg_mapping::{
use cacheblob::{dummy::DummyLease, new_cachelib_blobstore};
use changesets::{CachingChangesets, ChangesetEntry, ChangesetInsert, Changesets, SqlChangesets};
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use delayblob::DelayedBlobstore;
use fbinit::FacebookInit;
use filenodes::{FilenodeInfo, FilenodeResult, Filenodes, PreparedFilenode};
@ -125,13 +125,13 @@ pub fn new_benchmark_repo(fb: FacebookInit, settings: DelaySettings) -> Result<B
))
};
// Disable redaction check when executing benchmark reports
let repoid = RepositoryId::new(rand::random());
// TODO:
// - add caching
// - add delay
let bookmarks = Arc::new(SqlBookmarks::with_sqlite_in_memory()?);
// Disable redaction check when executing benchmark reports
let repoid = RepositoryId::new(rand::random());
let bookmarks = Arc::new(SqlBookmarksBuilder::with_sqlite_in_memory()?.with_repo_id(repoid));
let bonsai_git_mapping =
Arc::new(SqlBonsaiGitMappingConnection::with_sqlite_in_memory()?.with_repo_id(repoid));

View File

@ -329,7 +329,7 @@ impl BlobRepoHg for BlobRepo {
) -> BoxFuture<Option<HgChangesetId>, Error> {
STATS::get_bookmark.add_value(1);
self.bookmarks()
.get(ctx.clone(), name, self.get_repoid())
.get(ctx.clone(), name)
.compat()
.and_then({
let repo = self.clone();
@ -355,7 +355,6 @@ impl BlobRepoHg for BlobRepo {
.bookmarks()
.list(
ctx.clone(),
self.get_repoid(),
Freshness::MaybeStale,
&BookmarkPrefix::empty(),
&[BookmarkKind::PullDefaultPublishing][..],
@ -378,7 +377,6 @@ impl BlobRepoHg for BlobRepo {
.bookmarks()
.list(
ctx.clone(),
self.get_repoid(),
Freshness::MaybeStale,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,
@ -402,7 +400,6 @@ impl BlobRepoHg for BlobRepo {
.bookmarks()
.list(
ctx.clone(),
self.get_repoid(),
Freshness::MaybeStale,
prefix,
BookmarkKind::ALL,

View File

@ -20,7 +20,7 @@ use cacheblob::{
};
use changeset_info::ChangesetInfo;
use changesets::{CachingChangesets, Changesets, SqlChangesets};
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use deleted_files_manifest::RootDeletedManifestId;
use derived_data::BonsaiDerived;
use derived_data_filenodes::FilenodesOnlyPublic;
@ -323,7 +323,8 @@ impl TestRepoBuilder {
let phases_factory = SqlPhasesFactory::with_sqlite_in_memory()?;
let bookmarks = Arc::new(SqlBookmarks::with_sqlite_in_memory()?);
let bookmarks =
Arc::new(SqlBookmarksBuilder::with_sqlite_in_memory()?.with_repo_id(repo_id));
Ok(blobrepo_new(
bookmarks.clone(),
@ -404,7 +405,7 @@ pub fn new_memblob_with_sqlite_connection_with_id(
con: SqliteConnection,
repo_id: RepositoryId,
) -> Result<(BlobRepo, Connection)> {
con.execute_batch(SqlBookmarks::CREATION_QUERY)?;
con.execute_batch(SqlBookmarksBuilder::CREATION_QUERY)?;
con.execute_batch(SqlChangesets::CREATION_QUERY)?;
con.execute_batch(SqlBonsaiGitMappingConnection::CREATION_QUERY)?;
con.execute_batch(SqlBonsaiGlobalrevMapping::CREATION_QUERY)?;
@ -431,7 +432,9 @@ pub fn new_memblob_with_connection_with_id(
let phases_factory = SqlPhasesFactory::from_sql_connections(sql_connections.clone());
let bookmarks = Arc::new(SqlBookmarks::from_sql_connections(sql_connections.clone()));
let bookmarks = Arc::new(
SqlBookmarksBuilder::from_sql_connections(sql_connections.clone()).with_repo_id(repo_id),
);
Ok((
blobrepo_new(
@ -484,14 +487,15 @@ async fn new_development(
let bookmarks = async {
let sql_bookmarks = Arc::new(
sql_factory
.open::<SqlBookmarks>()
.open::<SqlBookmarksBuilder>()
.compat()
.await
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?,
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?
.with_repo_id(repoid),
);
let bookmarks: Arc<dyn Bookmarks> = if let Some(ttl) = bookmarks_cache_ttl {
Arc::new(CachedBookmarks::new(sql_bookmarks.clone(), ttl))
Arc::new(CachedBookmarks::new(sql_bookmarks.clone(), ttl, repoid))
} else {
sql_bookmarks.clone()
};
@ -627,7 +631,11 @@ async fn new_production(
let filenodes_tier = sql_factory.tier_name_shardable::<NewFilenodesBuilder>()?;
let filenodes_builder = sql_factory.open_shardable::<NewFilenodesBuilder>().compat();
let bookmarks = sql_factory.open::<SqlBookmarks>().compat();
let bookmarks = async {
let builder = sql_factory.open::<SqlBookmarksBuilder>().compat().await?;
Ok(builder.with_repo_id(repoid))
};
let changesets = sql_factory.open::<SqlChangesets>().compat();
let bonsai_git_mapping = async {
let conn = sql_factory
@ -688,7 +696,7 @@ async fn new_production(
let (bookmarks, bookmark_update_log): (Arc<dyn Bookmarks>, Arc<dyn BookmarkUpdateLog>) =
if let Some(ttl) = bookmarks_cache_ttl {
(
Arc::new(CachedBookmarks::new(bookmarks.clone(), ttl)),
Arc::new(CachedBookmarks::new(bookmarks.clone(), ttl, repoid)),
bookmarks,
)
} else {

View File

@ -161,7 +161,6 @@ impl BlobRepo {
self.attribute_expected::<dyn Bookmarks>()
.list(
ctx,
self.get_repoid(),
Freshness::MaybeStale,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,
@ -181,7 +180,6 @@ impl BlobRepo {
self.attribute_expected::<dyn Bookmarks>()
.list(
ctx,
self.get_repoid(),
Freshness::MaybeStale,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,
@ -202,7 +200,6 @@ impl BlobRepo {
self.attribute_expected::<dyn Bookmarks>()
.list(
ctx,
self.get_repoid(),
Freshness::MaybeStale,
prefix,
BookmarkKind::ALL,
@ -248,7 +245,7 @@ impl BlobRepo {
) -> BoxFuture<Option<ChangesetId>, Error> {
STATS::get_bookmark.add_value(1);
self.attribute_expected::<dyn Bookmarks>()
.get(ctx, name, self.get_repoid())
.get(ctx, name)
.compat()
.boxify()
}
@ -305,14 +302,7 @@ impl BlobRepo {
) -> impl Stream<Item = (Option<ChangesetId>, BookmarkUpdateReason, Timestamp), Error = Error>
{
self.attribute_expected::<dyn BookmarkUpdateLog>()
.list_bookmark_log_entries(
ctx.clone(),
name,
self.get_repoid(),
max_rec,
offset,
freshness,
)
.list_bookmark_log_entries(ctx.clone(), name, max_rec, offset, freshness)
.compat()
}
@ -324,7 +314,7 @@ impl BlobRepo {
freshness: Freshness,
) -> impl Stream<Item = BookmarkUpdateLogEntry, Error = Error> {
self.attribute_expected::<dyn BookmarkUpdateLog>()
.read_next_bookmark_log_entries(ctx, id, self.get_repoid(), limit, freshness)
.read_next_bookmark_log_entries(ctx, id, limit, freshness)
.compat()
}
@ -335,14 +325,14 @@ impl BlobRepo {
exclude_reason: Option<BookmarkUpdateReason>,
) -> impl Future<Item = u64, Error = Error> {
self.attribute_expected::<dyn BookmarkUpdateLog>()
.count_further_bookmark_log_entries(ctx, id, self.get_repoid(), exclude_reason)
.count_further_bookmark_log_entries(ctx, id, exclude_reason)
.compat()
}
pub fn update_bookmark_transaction(&self, ctx: CoreContext) -> Box<dyn BookmarkTransaction> {
STATS::update_bookmark_transaction.add_value(1);
self.attribute_expected::<dyn Bookmarks>()
.create_transaction(ctx, self.get_repoid())
.create_transaction(ctx)
}
// Returns the generation number of a changeset

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use mononoke_types::RepositoryId;
use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
use sql_ext::SqlConnections;
use crate::store::SqlBookmarks;
/// Builder for `SqlBookmarks`.
///
/// Holds only the SQL connections obtained through `SqlConstruct`; the
/// repository id is bound later via [`SqlBookmarksBuilder::with_repo_id`],
/// yielding a per-repo `SqlBookmarks` store (see the commit summary above:
/// each repo gets its own instance, so `repo_id` is injected at
/// construction time instead of being passed to every trait method).
#[derive(Clone)]
pub struct SqlBookmarksBuilder {
    // Shared with the `SqlBookmarks` produced by `with_repo_id`.
    pub(crate) connections: SqlConnections,
}
impl SqlConstruct for SqlBookmarksBuilder {
    const LABEL: &'static str = "bookmarks";
    // Schema used when constructing against an in-memory SQLite database.
    const CREATION_QUERY: &'static str = include_str!("../schemas/sqlite-bookmarks.sql");
    fn from_sql_connections(connections: SqlConnections) -> Self {
        Self { connections }
    }
}
impl SqlConstructFromMetadataDatabaseConfig for SqlBookmarksBuilder {}
impl SqlBookmarksBuilder {
    /// Consume the builder, binding `repo_id` to produce a usable
    /// `SqlBookmarks` instance for that repository.
    pub fn with_repo_id(self, repo_id: RepositoryId) -> SqlBookmarks {
        SqlBookmarks::new(repo_id, self.connections)
    }
}

View File

@ -7,10 +7,11 @@
#![deny(warnings)]
pub mod builder;
pub mod store;
pub mod transaction;
pub use crate::store::SqlBookmarks;
pub use crate::builder::SqlBookmarksBuilder;
#[cfg(test)]
mod test {
@ -37,12 +38,14 @@ mod test {
#[fbinit::compat_test]
async fn test_update_kind_compatibility(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let store = SqlBookmarks::with_sqlite_in_memory().unwrap();
let store = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let scratch_name = BookmarkName::new("book1").unwrap();
let publishing_name = BookmarkName::new("book2").unwrap();
let pull_default_name = BookmarkName::new("book3").unwrap();
let conn = store.write_connection.clone();
let conn = store.connections.write_connection.clone();
let rows = vec![
(
@ -68,12 +71,12 @@ mod test {
crate::transaction::insert_bookmarks(&conn, &rows[..]).await?;
// Using 'create_scratch' to replace a non-scratch bookmark should fail.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.create_scratch(&publishing_name, ONES_CSID)?;
assert!(!txn.commit().await?);
// Using 'create' to replace a scratch bookmark should fail.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.create(
&scratch_name,
ONES_CSID,
@ -83,17 +86,17 @@ mod test {
assert!(!txn.commit().await?);
// Using 'update_scratch' to update a publishing bookmark should fail.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update_scratch(&publishing_name, TWOS_CSID, ONES_CSID)?;
assert!(!txn.commit().await?);
// Using 'update_scratch' to update a pull-default bookmark should fail.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update_scratch(&pull_default_name, TWOS_CSID, ONES_CSID)?;
assert!(!txn.commit().await?);
// Using 'update' to update a publishing bookmark should succeed.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update(
&publishing_name,
TWOS_CSID,
@ -104,7 +107,7 @@ mod test {
assert!(txn.commit().await?);
// Using 'update' to update a pull-default bookmark should succeed.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update(
&pull_default_name,
TWOS_CSID,
@ -115,7 +118,7 @@ mod test {
assert!(txn.commit().await?);
// Using 'update' to update a scratch bookmark should fail.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update(
&scratch_name,
TWOS_CSID,
@ -126,7 +129,7 @@ mod test {
assert!(!txn.commit().await?);
// Using 'update_scratch' to update a scratch bookmark should succeed.
let mut txn = store.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = store.create_transaction(ctx.clone());
txn.update_scratch(&scratch_name, TWOS_CSID, ONES_CSID)?;
assert!(txn.commit().await?);
@ -172,8 +175,10 @@ mod test {
let ctx = CoreContext::test_mock(fb);
let repo_id = RepositoryId::new(123);
let store = SqlBookmarks::with_sqlite_in_memory().unwrap();
let conn = store.write_connection.clone();
let store = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(repo_id);
let conn = store.connections.write_connection.clone();
let rows: Vec<_> = bookmarks
.iter()
@ -186,7 +191,6 @@ mod test {
let response = store
.list(
ctx,
repo_id,
query_freshness,
query_prefix,
query_kinds,

View File

@ -19,8 +19,7 @@ use futures::future::{self, BoxFuture, Future, FutureExt, TryFutureExt};
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
use mononoke_types::Timestamp;
use mononoke_types::{ChangesetId, RepositoryId};
use sql::{queries, Connection};
use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
use sql::queries;
use sql_ext::SqlConnections;
use stats::prelude::*;
@ -33,13 +32,6 @@ define_stats! {
get_bookmark: timeseries(Rate, Sum),
}
#[derive(Clone)]
pub struct SqlBookmarks {
pub(crate) write_connection: Connection,
pub(crate) read_connection: Connection,
pub(crate) read_master_connection: Connection,
}
queries! {
read SelectBookmark(repo_id: RepositoryId, name: BookmarkName) -> (ChangesetId) {
"SELECT changeset_id
@ -194,22 +186,21 @@ queries! {
}
impl SqlConstruct for SqlBookmarks {
const LABEL: &'static str = "bookmarks";
#[derive(Clone)]
pub struct SqlBookmarks {
repo_id: RepositoryId,
pub(crate) connections: SqlConnections,
}
const CREATION_QUERY: &'static str = include_str!("../schemas/sqlite-bookmarks.sql");
fn from_sql_connections(connections: SqlConnections) -> Self {
impl SqlBookmarks {
pub(crate) fn new(repo_id: RepositoryId, connections: SqlConnections) -> Self {
Self {
write_connection: connections.write_connection,
read_connection: connections.read_connection,
read_master_connection: connections.read_master_connection,
repo_id,
connections,
}
}
}
impl SqlConstructFromMetadataDatabaseConfig for SqlBookmarks {}
fn query_to_stream<F>(query: F) -> BoxStream<'static, Result<(Bookmark, ChangesetId)>>
where
F: Future<Output = Result<Vec<(BookmarkName, BookmarkKind, ChangesetId)>>> + Send + 'static,
@ -228,7 +219,6 @@ impl Bookmarks for SqlBookmarks {
fn list(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
freshness: Freshness,
prefix: &BookmarkPrefix,
kinds: &[BookmarkKind],
@ -240,23 +230,23 @@ impl Bookmarks for SqlBookmarks {
STATS::list_maybe_stale.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
&self.read_connection
&self.connections.read_connection
}
Freshness::MostRecent => {
STATS::list.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
&self.read_master_connection
&self.connections.read_master_connection
}
};
if prefix.is_empty() {
match pagination {
BookmarkPagination::FromStart => {
query_to_stream(SelectAll::query(&conn, &repo_id, &limit, kinds).compat())
query_to_stream(SelectAll::query(&conn, &self.repo_id, &limit, kinds).compat())
}
BookmarkPagination::After(ref after) => query_to_stream(
SelectAllAfter::query(&conn, &repo_id, after, &limit, kinds).compat(),
SelectAllAfter::query(&conn, &self.repo_id, after, &limit, kinds).compat(),
),
}
} else {
@ -265,7 +255,7 @@ impl Bookmarks for SqlBookmarks {
BookmarkPagination::FromStart => query_to_stream(
SelectByPrefix::query(
&conn,
&repo_id,
&self.repo_id,
&prefix_like_pattern,
&"\\",
&limit,
@ -276,7 +266,7 @@ impl Bookmarks for SqlBookmarks {
BookmarkPagination::After(ref after) => query_to_stream(
SelectByPrefixAfter::query(
&conn,
&repo_id,
&self.repo_id,
&prefix_like_pattern,
&"\\",
after,
@ -293,26 +283,25 @@ impl Bookmarks for SqlBookmarks {
&self,
ctx: CoreContext,
name: &BookmarkName,
repo_id: RepositoryId,
) -> BoxFuture<'static, Result<Option<ChangesetId>>> {
STATS::get_bookmark.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
SelectBookmark::query(&self.read_master_connection, &repo_id, &name)
.compat()
.map_ok(|rows| rows.into_iter().next().map(|row| row.0))
.boxed()
SelectBookmark::query(
&self.connections.read_master_connection,
&self.repo_id,
&name,
)
.compat()
.map_ok(|rows| rows.into_iter().next().map(|row| row.0))
.boxed()
}
fn create_transaction(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
) -> Box<dyn BookmarkTransaction> {
fn create_transaction(&self, ctx: CoreContext) -> Box<dyn BookmarkTransaction> {
Box::new(SqlBookmarksTransaction::new(
ctx,
self.write_connection.clone(),
repo_id.clone(),
self.connections.write_connection.clone(),
self.repo_id.clone(),
))
}
}
@ -322,7 +311,6 @@ impl BookmarkUpdateLog for SqlBookmarks {
&self,
ctx: CoreContext,
name: BookmarkName,
repo_id: RepositoryId,
max_rec: u32,
offset: Option<u32>,
freshness: Freshness,
@ -330,22 +318,26 @@ impl BookmarkUpdateLog for SqlBookmarks {
let connection = if freshness == Freshness::MostRecent {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
&self.read_master_connection
&self.connections.read_master_connection
} else {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
&self.read_connection
&self.connections.read_connection
};
match offset {
Some(offset) => {
SelectBookmarkLogsWithOffset::query(&connection, &repo_id, &name, &max_rec, &offset)
.compat()
.map_ok(|rows| stream::iter(rows.into_iter().map(Ok)))
.try_flatten_stream()
.boxed()
}
None => SelectBookmarkLogs::query(&connection, &repo_id, &name, &max_rec)
Some(offset) => SelectBookmarkLogsWithOffset::query(
&connection,
&self.repo_id,
&name,
&max_rec,
&offset,
)
.compat()
.map_ok(|rows| stream::iter(rows.into_iter().map(Ok)))
.try_flatten_stream()
.boxed(),
None => SelectBookmarkLogs::query(&connection, &self.repo_id, &name, &max_rec)
.compat()
.map_ok(|rows| stream::iter(rows.into_iter().map(Ok)))
.try_flatten_stream()
@ -357,23 +349,26 @@ impl BookmarkUpdateLog for SqlBookmarks {
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
maybe_exclude_reason: Option<BookmarkUpdateReason>,
) -> BoxFuture<'static, Result<u64>> {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
let query = match maybe_exclude_reason {
Some(ref r) => CountFurtherBookmarkLogEntriesWithoutReason::query(
&self.read_connection,
&self.connections.read_connection,
&id,
&repoid,
&self.repo_id,
&r,
)
.compat()
.boxed(),
None => CountFurtherBookmarkLogEntries::query(&self.read_connection, &id, &repoid)
.compat()
.boxed(),
None => CountFurtherBookmarkLogEntries::query(
&self.connections.read_connection,
&id,
&self.repo_id,
)
.compat()
.boxed(),
};
query
@ -398,110 +393,120 @@ impl BookmarkUpdateLog for SqlBookmarks {
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
) -> BoxFuture<'static, Result<Vec<(BookmarkUpdateReason, u64)>>> {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
CountFurtherBookmarkLogEntriesByReason::query(&self.read_connection, &id, &repoid)
.compat()
.map_ok(|entries| entries.into_iter().collect())
.boxed()
CountFurtherBookmarkLogEntriesByReason::query(
&self.connections.read_connection,
&id,
&self.repo_id,
)
.compat()
.map_ok(|entries| entries.into_iter().collect())
.boxed()
}
fn skip_over_bookmark_log_entries_with_reason(
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
reason: BookmarkUpdateReason,
) -> BoxFuture<'static, Result<Option<u64>>> {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
SkipOverBookmarkLogEntriesWithReason::query(&self.read_connection, &id, &repoid, &reason)
.compat()
.map_ok(|entries| entries.first().map(|entry| entry.0))
.boxed()
SkipOverBookmarkLogEntriesWithReason::query(
&self.connections.read_connection,
&id,
&self.repo_id,
&reason,
)
.compat()
.map_ok(|entries| entries.first().map(|entry| entry.0))
.boxed()
}
fn read_next_bookmark_log_entries_same_bookmark_and_reason(
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
limit: u64,
) -> BoxStream<'static, Result<BookmarkUpdateLogEntry>> {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
ReadNextBookmarkLogEntries::query(&self.read_connection, &id, &repoid, &limit)
.compat()
.map_ok(|entries| {
let homogenous_entries: Vec<_> = match entries.iter().nth(0).cloned() {
Some(first_entry) => {
// Note: types are explicit here to protect us from query behavior change
// when tuple items 2 or 5 become something else, and we still succeed
// compiling everything because of the type inference
let first_name: &BookmarkName = &first_entry.2;
let first_reason: &BookmarkUpdateReason = &first_entry.5;
entries
.into_iter()
.take_while(|entry| {
let name: &BookmarkName = &entry.2;
let reason: &BookmarkUpdateReason = &entry.5;
name == first_name && reason == first_reason
})
.collect()
}
None => entries.into_iter().collect(),
};
stream::iter(homogenous_entries.into_iter().map(Ok)).and_then(|entry| async move {
let (
id,
repo_id,
name,
to_cs_id,
from_cs_id,
reason,
timestamp,
bundle_handle,
commit_timestamps_json,
) = entry;
let bundle_replay_data =
RawBundleReplayData::maybe_new(bundle_handle, commit_timestamps_json)?;
Ok(BookmarkUpdateLogEntry {
id,
repo_id,
bookmark_name: name,
to_changeset_id: to_cs_id,
from_changeset_id: from_cs_id,
reason,
timestamp,
bundle_replay_data,
})
ReadNextBookmarkLogEntries::query(
&self.connections.read_connection,
&id,
&self.repo_id,
&limit,
)
.compat()
.map_ok(|entries| {
let homogenous_entries: Vec<_> = match entries.iter().nth(0).cloned() {
Some(first_entry) => {
// Note: types are explicit here to protect us from query behavior change
// when tuple items 2 or 5 become something else, and we still succeed
// compiling everything because of the type inference
let first_name: &BookmarkName = &first_entry.2;
let first_reason: &BookmarkUpdateReason = &first_entry.5;
entries
.into_iter()
.take_while(|entry| {
let name: &BookmarkName = &entry.2;
let reason: &BookmarkUpdateReason = &entry.5;
name == first_name && reason == first_reason
})
.collect()
}
None => entries.into_iter().collect(),
};
stream::iter(homogenous_entries.into_iter().map(Ok)).and_then(|entry| async move {
let (
id,
repo_id,
name,
to_cs_id,
from_cs_id,
reason,
timestamp,
bundle_handle,
commit_timestamps_json,
) = entry;
let bundle_replay_data =
RawBundleReplayData::maybe_new(bundle_handle, commit_timestamps_json)?;
Ok(BookmarkUpdateLogEntry {
id,
repo_id,
bookmark_name: name,
to_changeset_id: to_cs_id,
from_changeset_id: from_cs_id,
reason,
timestamp,
bundle_replay_data,
})
})
.try_flatten_stream()
.boxed()
})
.try_flatten_stream()
.boxed()
}
fn read_next_bookmark_log_entries(
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
limit: u64,
freshness: Freshness,
) -> BoxStream<'static, Result<BookmarkUpdateLogEntry>> {
let connection = if freshness == Freshness::MostRecent {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
&self.read_master_connection
&self.connections.read_master_connection
} else {
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
&self.read_connection
&self.connections.read_connection
};
ReadNextBookmarkLogEntries::query(&connection, &id, &repoid, &limit)
ReadNextBookmarkLogEntries::query(&connection, &id, &self.repo_id, &limit)
.compat()
.map_ok(|entries| {
stream::iter(entries.into_iter().map(Ok)).and_then(|entry| async move {

View File

@ -15,7 +15,7 @@ use bookmarks::{
BookmarkUpdateLogEntry, BookmarkUpdateReason, Bookmarks, Freshness, RawBundleReplayData,
};
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use fbinit::FacebookInit;
use futures::stream::TryStreamExt;
use maplit::hashmap;
@ -57,11 +57,13 @@ fn compare_log_entries(
fn test_simple_unconditional_set_get(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_correct = create_bookmark_name("book");
let name_incorrect = create_bookmark_name("book2");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(
&name_correct,
ONES_CSID,
@ -72,23 +74,17 @@ fn test_simple_unconditional_set_get(fb: FacebookInit) {
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_correct, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_correct).await.unwrap(),
Some(ONES_CSID)
);
assert_eq!(
bookmarks
.get(ctx.clone(), &name_incorrect, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_incorrect).await.unwrap(),
None
);
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 0, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -110,11 +106,13 @@ fn test_simple_unconditional_set_get(fb: FacebookInit) {
fn test_multi_unconditional_set_get(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let name_2 = create_bookmark_name("book2");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
txn.force_set(&name_2, TWOS_CSID, BookmarkUpdateReason::TestMove, None)
@ -122,18 +120,12 @@ fn test_multi_unconditional_set_get(fb: FacebookInit) {
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
assert_eq!(
bookmarks
.get(ctx.clone(), &name_2, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_2).await.unwrap(),
Some(TWOS_CSID)
);
})
@ -143,24 +135,23 @@ fn test_multi_unconditional_set_get(fb: FacebookInit) {
fn test_unconditional_set_same_bookmark(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
})
@ -170,25 +161,24 @@ fn test_unconditional_set_same_bookmark(fb: FacebookInit) {
fn test_simple_create(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 0, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -210,15 +200,17 @@ fn test_simple_create(fb: FacebookInit) {
fn test_create_already_existing(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(!txn.commit().await.unwrap());
@ -229,24 +221,26 @@ fn test_create_already_existing(fb: FacebookInit) {
fn test_create_change_same_bookmark(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn
.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn
.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn
@ -259,7 +253,7 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -272,7 +266,7 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -285,7 +279,7 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
.force_delete(&name_1, BookmarkUpdateReason::TestMove, None)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_delete(&name_1, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn
@ -298,7 +292,7 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.delete(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn
@ -311,7 +305,7 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
)
.is_err());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -330,15 +324,17 @@ fn test_create_change_same_bookmark(fb: FacebookInit) {
fn test_simple_update_bookmark(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -350,16 +346,13 @@ fn test_simple_update_bookmark(fb: FacebookInit) {
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(TWOS_CSID)
);
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 1, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 1, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -381,15 +374,17 @@ fn test_simple_update_bookmark(fb: FacebookInit) {
fn test_noop_update(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
ONES_CSID,
@ -401,10 +396,7 @@ fn test_noop_update(fb: FacebookInit) {
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
})
@ -414,28 +406,27 @@ fn test_noop_update(fb: FacebookInit) {
fn test_scratch_update_bookmark(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create_scratch(&name_1, ONES_CSID).unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update_scratch(&name_1, TWOS_CSID, ONES_CSID).unwrap();
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks.get(ctx.clone(), &name_1).await.unwrap(),
Some(TWOS_CSID)
);
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 1, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 1, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -448,10 +439,12 @@ fn test_scratch_update_bookmark(fb: FacebookInit) {
fn test_update_non_existent_bookmark(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -468,15 +461,17 @@ fn test_update_non_existent_bookmark(fb: FacebookInit) {
fn test_update_existing_bookmark_with_incorrect_commit(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
ONES_CSID,
@ -493,48 +488,34 @@ fn test_update_existing_bookmark_with_incorrect_commit(fb: FacebookInit) {
fn test_force_delete(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_delete(&name_1, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
None
);
assert_eq!(bookmarks.get(ctx.clone(), &name_1).await.unwrap(), None);
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert!(bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap()
.is_some());
assert!(bookmarks.get(ctx.clone(), &name_1).await.unwrap().is_some());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_delete(&name_1, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
None
);
assert_eq!(bookmarks.get(ctx.clone(), &name_1).await.unwrap(), None);
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 2, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 2, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -556,32 +537,30 @@ fn test_force_delete(fb: FacebookInit) {
fn test_delete(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.delete(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert_eq!(txn.commit().await.unwrap(), false);
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert!(bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap()
.is_some());
assert!(bookmarks.get(ctx.clone(), &name_1).await.unwrap().is_some());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.delete(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
compare_log_entries(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 1, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 1, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap(),
@ -603,20 +582,18 @@ fn test_delete(fb: FacebookInit) {
fn test_delete_incorrect_hash(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.unwrap());
assert!(bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap()
.is_some());
assert!(bookmarks.get(ctx.clone(), &name_1).await.unwrap().is_some());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.delete(&name_1, TWOS_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert_eq!(txn.commit().await.unwrap(), false);
@ -627,11 +604,13 @@ fn test_delete_incorrect_hash(fb: FacebookInit) {
fn test_list_by_prefix(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book1");
let name_2 = create_bookmark_name("book2");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.create(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
txn.create(&name_2, TWOS_CSID, BookmarkUpdateReason::TestMove, None)
@ -646,7 +625,6 @@ fn test_list_by_prefix(fb: FacebookInit) {
bookmarks
.list(
ctx.clone(),
REPO_ZERO,
Freshness::MostRecent,
&prefix,
BookmarkKind::ALL,
@ -666,7 +644,6 @@ fn test_list_by_prefix(fb: FacebookInit) {
bookmarks
.list(
ctx.clone(),
REPO_ZERO,
Freshness::MostRecent,
&name_1_prefix,
BookmarkKind::ALL,
@ -686,7 +663,6 @@ fn test_list_by_prefix(fb: FacebookInit) {
bookmarks
.list(
ctx.clone(),
REPO_ZERO,
Freshness::MostRecent,
&name_2_prefix,
BookmarkKind::ALL,
@ -708,16 +684,18 @@ fn test_list_by_prefix(fb: FacebookInit) {
fn test_create_different_repos(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let builder = SqlBookmarksBuilder::with_sqlite_in_memory().unwrap();
let bookmarks_0 = builder.clone().with_repo_id(REPO_ZERO);
let bookmarks_1 = builder.with_repo_id(REPO_ONE);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks_0.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
// Updating value from another repo, should fail
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ONE);
let mut txn = bookmarks_1.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -729,38 +707,32 @@ fn test_create_different_repos(fb: FacebookInit) {
assert_eq!(txn.commit().await.unwrap(), false);
// Creating value should succeed
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ONE);
let mut txn = bookmarks_1.create_transaction(ctx.clone());
txn.create(&name_1, TWOS_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks_0.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
assert_eq!(
bookmarks.get(ctx.clone(), &name_1, REPO_ONE).await.unwrap(),
bookmarks_1.get(ctx.clone(), &name_1).await.unwrap(),
Some(TWOS_CSID)
);
// Force deleting should delete only from one repo
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ONE);
let mut txn = bookmarks_1.create_transaction(ctx.clone());
txn.force_delete(&name_1, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
assert_eq!(
bookmarks
.get(ctx.clone(), &name_1, REPO_ZERO)
.await
.unwrap(),
bookmarks_0.get(ctx.clone(), &name_1).await.unwrap(),
Some(ONES_CSID)
);
// delete should fail for another repo
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ONE);
let mut txn = bookmarks_1.create_transaction(ctx.clone());
txn.delete(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert_eq!(txn.commit().await.unwrap(), false);
@ -769,12 +741,12 @@ fn test_create_different_repos(fb: FacebookInit) {
async fn fetch_single(
fb: FacebookInit,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
id: u64,
) -> BookmarkUpdateLogEntry {
let ctx = CoreContext::test_mock(fb);
bookmarks
.read_next_bookmark_log_entries(ctx, id, REPO_ZERO, 1, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx, id, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -787,16 +759,18 @@ async fn fetch_single(
fn test_log_correct_order(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let name_2 = create_bookmark_name("book2");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -807,7 +781,7 @@ fn test_log_correct_order(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
THREES_CSID,
@ -818,7 +792,7 @@ fn test_log_correct_order(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
FOURS_CSID,
@ -829,12 +803,12 @@ fn test_log_correct_order(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_2, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
FIVES_CSID,
@ -845,7 +819,7 @@ fn test_log_correct_order(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
SIXES_CSID,
@ -873,7 +847,7 @@ fn test_log_correct_order(fb: FacebookInit) {
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 4, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 0, 4, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -883,7 +857,7 @@ fn test_log_correct_order(fb: FacebookInit) {
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 8, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 0, 8, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -892,7 +866,7 @@ fn test_log_correct_order(fb: FacebookInit) {
);
let entries = bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 6, Freshness::MostRecent)
.read_next_bookmark_log_entries(ctx.clone(), 0, 6, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap();
@ -914,7 +888,7 @@ fn test_log_correct_order(fb: FacebookInit) {
);
let entries = bookmarks
.read_next_bookmark_log_entries_same_bookmark_and_reason(ctx.clone(), 0, REPO_ZERO, 6)
.read_next_bookmark_log_entries_same_bookmark_and_reason(ctx.clone(), 0, 6)
.try_collect::<Vec<_>>()
.await
.unwrap();
@ -927,7 +901,7 @@ fn test_log_correct_order(fb: FacebookInit) {
assert_eq!(cs_ids, vec![ONES_CSID, TWOS_CSID, THREES_CSID, FOURS_CSID]);
let entries = bookmarks
.read_next_bookmark_log_entries_same_bookmark_and_reason(ctx.clone(), 5, REPO_ZERO, 6)
.read_next_bookmark_log_entries_same_bookmark_and_reason(ctx.clone(), 5, 6)
.try_collect::<Vec<_>>()
.await
.unwrap();
@ -945,14 +919,16 @@ fn test_log_correct_order(fb: FacebookInit) {
fn test_log_bundle_replay_data(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let expected = RawBundleReplayData {
bundle_handle: "handle".to_string(),
commit_timestamps_json: "json_data".to_string(),
};
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(
&name_1,
ONES_CSID,
@ -972,22 +948,25 @@ fn test_log_bundle_replay_data(fb: FacebookInit) {
fn test_read_log_entry_many_repos(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let builder = SqlBookmarksBuilder::with_sqlite_in_memory().unwrap();
let bookmarks_0 = builder.clone().with_repo_id(REPO_ZERO);
let bookmarks_1 = builder.clone().with_repo_id(REPO_ONE);
let bookmarks_2 = builder.with_repo_id(REPO_TWO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks_0.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ONE);
let mut txn = bookmarks_1.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ZERO, 1, Freshness::MostRecent)
bookmarks_0
.read_next_bookmark_log_entries(ctx.clone(), 0, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -996,8 +975,8 @@ fn test_read_log_entry_many_repos(fb: FacebookInit) {
);
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_ONE, 1, Freshness::MostRecent)
bookmarks_1
.read_next_bookmark_log_entries(ctx.clone(), 0, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -1006,8 +985,8 @@ fn test_read_log_entry_many_repos(fb: FacebookInit) {
);
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 1, REPO_ZERO, 1, Freshness::MostRecent)
bookmarks_0
.read_next_bookmark_log_entries(ctx.clone(), 1, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -1016,8 +995,8 @@ fn test_read_log_entry_many_repos(fb: FacebookInit) {
);
assert_eq!(
bookmarks
.read_next_bookmark_log_entries(ctx.clone(), 0, REPO_TWO, 1, Freshness::MostRecent)
bookmarks_2
.read_next_bookmark_log_entries(ctx.clone(), 0, 1, Freshness::MostRecent)
.try_collect::<Vec<_>>()
.await
.unwrap()
@ -1060,15 +1039,17 @@ fn test_update_reason_conversion() -> Result<(), Error> {
fn test_list_bookmark_log_entries(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let ctx = CoreContext::test_mock(fb);
let bookmarks = SqlBookmarks::with_sqlite_in_memory().unwrap();
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()
.unwrap()
.with_repo_id(REPO_ZERO);
let name_1 = create_bookmark_name("book");
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
.unwrap();
assert!(txn.commit().await.is_ok());
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
TWOS_CSID,
@ -1079,7 +1060,7 @@ fn test_list_bookmark_log_entries(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
THREES_CSID,
@ -1090,7 +1071,7 @@ fn test_list_bookmark_log_entries(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
FOURS_CSID,
@ -1101,7 +1082,7 @@ fn test_list_bookmark_log_entries(fb: FacebookInit) {
.unwrap();
txn.commit().await.unwrap();
let mut txn = bookmarks.create_transaction(ctx.clone(), REPO_ZERO);
let mut txn = bookmarks.create_transaction(ctx.clone());
txn.update(
&name_1,
FIVES_CSID,
@ -1117,7 +1098,6 @@ fn test_list_bookmark_log_entries(fb: FacebookInit) {
.list_bookmark_log_entries(
ctx.clone(),
name_1.clone(),
REPO_ZERO,
3,
None,
Freshness::MostRecent
@ -1135,14 +1115,7 @@ fn test_list_bookmark_log_entries(fb: FacebookInit) {
assert_eq!(
bookmarks
.list_bookmark_log_entries(
ctx.clone(),
name_1,
REPO_ZERO,
3,
Some(1),
Freshness::MostRecent
)
.list_bookmark_log_entries(ctx.clone(), name_1, 3, Some(1), Freshness::MostRecent)
.map_ok(|(cs, rs, _ts)| (cs, rs)) // dropping timestamps
.try_collect::<Vec<_>>()
.await

View File

@ -5,7 +5,7 @@
* GNU General Public License version 2.
*/
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@ -42,7 +42,6 @@ impl Cache {
// NOTE: this function should be fast, as it is executed under a lock
fn new(
ctx: CoreContext,
repoid: RepositoryId,
bookmarks: Arc<dyn Bookmarks>,
expires: Instant,
freshness: Freshness,
@ -52,7 +51,6 @@ impl Cache {
bookmarks
.list(
ctx,
repoid,
freshness,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,
@ -91,42 +89,42 @@ impl Cache {
#[derive(Clone)]
pub struct CachedBookmarks {
repo_id: RepositoryId,
ttl: Duration,
caches: Arc<Mutex<HashMap<RepositoryId, Cache>>>,
cache: Arc<Mutex<Option<Cache>>>,
bookmarks: Arc<dyn Bookmarks>,
}
impl CachedBookmarks {
pub fn new(bookmarks: Arc<dyn Bookmarks>, ttl: Duration) -> Self {
pub fn new(bookmarks: Arc<dyn Bookmarks>, ttl: Duration, repo_id: RepositoryId) -> Self {
Self {
repo_id,
ttl,
bookmarks,
caches: Arc::new(Mutex::new(HashMap::new())),
cache: Arc::new(Mutex::new(None)),
}
}
/// Gets or creates a cache for specified respository
fn get_cache(&self, ctx: CoreContext, repoid: RepositoryId) -> Cache {
let mut caches = self.caches.lock().expect("lock poisoned");
/// Gets or creates the cache
fn cache(&self, ctx: CoreContext) -> Cache {
let mut cache = self.cache.lock().expect("lock poisoned");
let now = Instant::now();
caches
.entry(repoid)
// create new cache if the old one has either expired or failed
.and_modify(|cache| {
match *cache {
Some(ref mut cache) => {
// create new cache if the old one has either expired or failed
let cache_failed = cache.is_failed();
let mut cache_hit = true;
if cache.expires <= now || cache_failed {
cache_hit = false;
*cache = Cache::new(
ctx.clone(),
repoid,
self.bookmarks.clone(),
now + self.ttl,
// NOTE: We want freshness to behave as follows:
// - if we are asking for maybe-stale bookmarks we
// want to keep asking for this type of bookmarks
// - if we had a write from the current machine,
// `purge_cache` will request bookmarks from the
// `purge` will request bookmarks from the
// master region, but it might fail:
// - if it fails we want to keep asking for fresh bookmarks
// - if it succeeds the next request should go through a
@ -139,52 +137,51 @@ impl CachedBookmarks {
}
if cache_hit {
STATS::cached_bookmarks_hits.add_value(1, (repoid.id().to_string(),))
STATS::cached_bookmarks_hits.add_value(1, (self.repo_id.id().to_string(),))
} else {
STATS::cached_bookmarks_misses.add_value(1, (repoid.id().to_string(),))
STATS::cached_bookmarks_misses.add_value(1, (self.repo_id.id().to_string(),))
}
})
// create new cache if there is no cache entry
.or_insert_with(|| {
Cache::new(
cache.clone()
}
None => {
// create new cache if there isn't one
let new_cache = Cache::new(
ctx,
repoid,
self.bookmarks.clone(),
now + self.ttl,
Freshness::MaybeStale,
)
})
.clone()
);
*cache = Some(new_cache.clone());
new_cache
}
}
}
/// Removes old cache entry and replaces whith new one which will go through master region
fn purge_cache(&self, ctx: CoreContext, repoid: RepositoryId) -> Cache {
let cache = Cache::new(
/// Removes old cache and replaces with a new one which will go through master region
fn purge(&self, ctx: CoreContext) -> Cache {
let new_cache = Cache::new(
ctx,
repoid,
self.bookmarks.clone(),
Instant::now() + self.ttl,
Freshness::MostRecent,
);
{
let mut caches = self.caches.lock().expect("lock poisoned");
caches.insert(repoid, cache.clone());
}
cache
let mut cache = self.cache.lock().expect("lock poisoned");
*cache = Some(new_cache.clone());
new_cache
}
/// Answers a bookmark query from cache.
fn list_from_publishing_cache(
&self,
ctx: CoreContext,
repoid: RepositoryId,
prefix: &BookmarkPrefix,
kinds: &[BookmarkKind],
pagination: &BookmarkPagination,
limit: u64,
) -> BoxStream<'static, Result<(Bookmark, ChangesetId)>> {
let range = prefix.to_range().with_pagination(pagination.clone());
let cache = self.get_cache(ctx, repoid);
let cache = self.cache(ctx);
let filter_kinds = if BookmarkKind::ALL_PUBLISHING
.iter()
.all(|kind| kinds.iter().any(|k| k == kind))
@ -231,8 +228,7 @@ impl CachedBookmarks {
struct CachedBookmarksTransaction {
ctx: CoreContext,
repoid: RepositoryId,
caches: CachedBookmarks,
cache: CachedBookmarks,
transaction: Box<dyn BookmarkTransaction>,
dirty: bool,
}
@ -240,15 +236,13 @@ struct CachedBookmarksTransaction {
impl CachedBookmarksTransaction {
fn new(
ctx: CoreContext,
repoid: RepositoryId,
caches: CachedBookmarks,
cache: CachedBookmarks,
transaction: Box<dyn BookmarkTransaction>,
) -> Self {
Self {
ctx,
repoid,
cache,
transaction,
caches,
dirty: false,
}
}
@ -258,7 +252,6 @@ impl Bookmarks for CachedBookmarks {
fn list(
&self,
ctx: CoreContext,
repoid: RepositoryId,
freshness: Freshness,
prefix: &BookmarkPrefix,
kinds: &[BookmarkKind],
@ -271,25 +264,19 @@ impl Bookmarks for CachedBookmarks {
.all(|kind| BookmarkKind::ALL_PUBLISHING.iter().any(|k| k == kind))
{
// All requested kinds are supported by the cache.
return self
.list_from_publishing_cache(ctx, repoid, prefix, kinds, pagination, limit);
return self.list_from_publishing_cache(ctx, prefix, kinds, pagination, limit);
}
}
// Bypass the cache as it cannot serve this request.
self.bookmarks
.list(ctx, repoid, freshness, prefix, kinds, pagination, limit)
.list(ctx, freshness, prefix, kinds, pagination, limit)
}
fn create_transaction(
&self,
ctx: CoreContext,
repoid: RepositoryId,
) -> Box<dyn BookmarkTransaction> {
fn create_transaction(&self, ctx: CoreContext) -> Box<dyn BookmarkTransaction> {
Box::new(CachedBookmarksTransaction::new(
ctx.clone(),
repoid,
self.clone(),
self.bookmarks.create_transaction(ctx, repoid),
self.bookmarks.create_transaction(ctx),
))
}
@ -297,12 +284,11 @@ impl Bookmarks for CachedBookmarks {
&self,
ctx: CoreContext,
bookmark: &BookmarkName,
repoid: RepositoryId,
) -> BoxFuture<'static, Result<Option<ChangesetId>>> {
// NOTE: If you to implement a Freshness notion here and try to fetch from cache, be
// mindful that not all bookmarks are cached, so a cache miss here does not necessarily
// mean that the Bookmark does not exist.
self.bookmarks.get(ctx, bookmark, repoid)
self.bookmarks.get(ctx, bookmark)
}
}
@ -385,8 +371,7 @@ impl BookmarkTransaction for CachedBookmarksTransaction {
fn commit(self: Box<Self>) -> BoxFuture<'static, Result<bool>> {
let CachedBookmarksTransaction {
transaction,
caches,
repoid,
cache,
ctx,
dirty,
} = *self;
@ -395,7 +380,7 @@ impl BookmarkTransaction for CachedBookmarksTransaction {
.commit()
.map_ok(move |success| {
if success && dirty {
caches.purge_cache(ctx, repoid);
cache.purge(ctx);
}
success
})
@ -408,8 +393,7 @@ impl BookmarkTransaction for CachedBookmarksTransaction {
) -> BoxFuture<'static, Result<bool>> {
let CachedBookmarksTransaction {
transaction,
caches,
repoid,
cache,
ctx,
dirty,
} = *self;
@ -418,7 +402,7 @@ impl BookmarkTransaction for CachedBookmarksTransaction {
.commit_with_hook(txn_hook)
.map_ok(move |success| {
if success && dirty {
caches.purge_cache(ctx, repoid);
cache.purge(ctx);
}
success
})
@ -473,9 +457,8 @@ mod tests {
fn create_dirty_transaction(
bookmarks: &impl Bookmarks,
ctx: CoreContext,
repoid: RepositoryId,
) -> Box<dyn BookmarkTransaction> {
let mut transaction = bookmarks.create_transaction(ctx.clone(), repoid);
let mut transaction = bookmarks.create_transaction(ctx.clone());
// Dirty the transaction.
transaction
@ -494,7 +477,6 @@ mod tests {
&self,
_ctx: CoreContext,
_name: &BookmarkName,
_repoid: RepositoryId,
) -> BoxFuture<'static, Result<Option<ChangesetId>>> {
unimplemented!()
}
@ -502,7 +484,6 @@ mod tests {
fn list(
&self,
_ctx: CoreContext,
_repo_id: RepositoryId,
freshness: Freshness,
prefix: &BookmarkPrefix,
kinds: &[BookmarkKind],
@ -529,11 +510,7 @@ mod tests {
.boxed()
}
fn create_transaction(
&self,
_ctx: CoreContext,
_repoid: RepositoryId,
) -> Box<dyn BookmarkTransaction> {
fn create_transaction(&self, _ctx: CoreContext) -> Box<dyn BookmarkTransaction> {
Box::new(MockTransaction)
}
}
@ -543,7 +520,7 @@ mod tests {
impl BookmarkTransaction for MockTransaction {
fn update(
&mut self,
_key: &BookmarkName,
_bookmark: &BookmarkName,
_new_cs: ChangesetId,
_old_cs: ChangesetId,
_reason: BookmarkUpdateReason,
@ -554,7 +531,7 @@ mod tests {
fn create(
&mut self,
_key: &BookmarkName,
_bookmark: &BookmarkName,
_new_cs: ChangesetId,
_reason: BookmarkUpdateReason,
_bundle_replay: Option<&dyn BundleReplay>,
@ -564,7 +541,7 @@ mod tests {
fn force_set(
&mut self,
_key: &BookmarkName,
_bookmark: &BookmarkName,
_new_cs: ChangesetId,
_reason: BookmarkUpdateReason,
_bundle_replay: Option<&dyn BundleReplay>,
@ -574,7 +551,7 @@ mod tests {
fn delete(
&mut self,
_key: &BookmarkName,
_bookmark: &BookmarkName,
_old_cs: ChangesetId,
_reason: BookmarkUpdateReason,
_bundle_replay: Option<&dyn BundleReplay>,
@ -584,7 +561,7 @@ mod tests {
fn force_delete(
&mut self,
_key: &BookmarkName,
_bookmark: &BookmarkName,
_reason: BookmarkUpdateReason,
_bundle_replay: Option<&dyn BundleReplay>,
) -> Result<()> {
@ -668,12 +645,12 @@ mod tests {
fn test_cached_bookmarks(fb: FacebookInit) {
let mut rt = Runtime::new().unwrap();
let ctx = CoreContext::test_mock(fb);
let repoid = RepositoryId::new(0);
let repo_id = RepositoryId::new(0);
let (mock, requests) = MockBookmarks::create();
let requests = requests.into_future();
let bookmarks = CachedBookmarks::new(Arc::new(mock), Duration::from_secs(3));
let bookmarks = CachedBookmarks::new(Arc::new(mock), Duration::from_secs(3), repo_id);
let spawn_query = |prefix: &'static str, rt: &mut Runtime| {
let (sender, receiver) = oneshot::channel();
@ -681,7 +658,6 @@ mod tests {
let fut = bookmarks
.list(
ctx.clone(),
repoid,
Freshness::MaybeStale,
&BookmarkPrefix::new(prefix).unwrap(),
BookmarkKind::ALL_PUBLISHING,
@ -731,14 +707,14 @@ mod tests {
let requests = assert_no_pending_requests(requests, &mut rt, 100);
// Create a non dirty transaction and make sure that no requests go to master.
let transaction = bookmarks.create_transaction(ctx.clone(), repoid);
let transaction = bookmarks.create_transaction(ctx.clone());
rt.block_on(transaction.commit().compat()).unwrap();
let _ = spawn_query("c", &mut rt);
let requests = assert_no_pending_requests(requests, &mut rt, 100);
// successfull transaction should redirect further requests to master
let transaction = create_dirty_transaction(&bookmarks, ctx.clone(), repoid);
let transaction = create_dirty_transaction(&bookmarks, ctx.clone());
rt.block_on(transaction.commit().compat()).unwrap();
let res = spawn_query("a", &mut rt);
@ -850,7 +826,7 @@ mod tests {
let (mock, requests) = MockBookmarks::create();
let requests = requests.into_future();
let store = CachedBookmarks::new(Arc::new(mock), Duration::from_secs(100));
let store = CachedBookmarks::new(Arc::new(mock), Duration::from_secs(100), repo_id);
let (sender, receiver) = oneshot::channel();
@ -858,7 +834,6 @@ mod tests {
let fut = store
.list(
ctx,
repo_id,
query_freshness,
query_prefix,
query_kinds,

View File

@ -12,7 +12,7 @@ use anyhow::Result;
use context::CoreContext;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use mononoke_types::{ChangesetId, RepositoryId};
use mononoke_types::ChangesetId;
mod cache;
mod log;
@ -37,7 +37,6 @@ pub trait Bookmarks: Send + Sync + 'static {
&self,
ctx: CoreContext,
name: &BookmarkName,
repoid: RepositoryId,
) -> BoxFuture<'static, Result<Option<ChangesetId>>>;
/// List bookmarks that match certain parameters.
@ -58,7 +57,6 @@ pub trait Bookmarks: Send + Sync + 'static {
fn list(
&self,
ctx: CoreContext,
repoid: RepositoryId,
freshness: Freshness,
prefix: &BookmarkPrefix,
kinds: &[BookmarkKind],
@ -67,9 +65,5 @@ pub trait Bookmarks: Send + Sync + 'static {
) -> BoxStream<'static, Result<(Bookmark, ChangesetId)>>;
/// Create a transaction to modify bookmarks.
fn create_transaction(
&self,
ctx: CoreContext,
repoid: RepositoryId,
) -> Box<dyn BookmarkTransaction>;
fn create_transaction(&self, ctx: CoreContext) -> Box<dyn BookmarkTransaction>;
}

View File

@ -47,7 +47,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
limit: u64,
freshness: Freshness,
) -> BoxStream<'static, Result<BookmarkUpdateLogEntry>>;
@ -58,7 +57,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
limit: u64,
) -> BoxStream<'static, Result<BookmarkUpdateLogEntry>>;
@ -67,7 +65,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
_ctx: CoreContext,
name: BookmarkName,
repo_id: RepositoryId,
max_rec: u32,
offset: Option<u32>,
freshness: Freshness,
@ -79,7 +76,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
_ctx: CoreContext,
id: u64,
repoid: RepositoryId,
exclude_reason: Option<BookmarkUpdateReason>,
) -> BoxFuture<'static, Result<u64>>;
@ -88,7 +84,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
_ctx: CoreContext,
id: u64,
repoid: RepositoryId,
) -> BoxFuture<'static, Result<Vec<(BookmarkUpdateReason, u64)>>>;
/// Find the last contiguous BookmarkUpdateLog entry matching the reason provided.
@ -96,7 +91,6 @@ pub trait BookmarkUpdateLog: Send + Sync + 'static {
&self,
ctx: CoreContext,
id: u64,
repoid: RepositoryId,
reason: BookmarkUpdateReason,
) -> BoxFuture<'static, Result<Option<u64>>>;
}

View File

@ -13,7 +13,7 @@ use bookmarks::{BookmarkUpdateLog, BookmarkUpdateLogEntry, BookmarkUpdateReason,
use clap::{App, Arg, ArgMatches, SubCommand};
use cmdlib::args;
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use fbinit::FacebookInit;
use futures::stream::StreamExt;
use futures::{compat::Future01CompatExt, future};
@ -136,7 +136,7 @@ async fn last_processed(
ctx: &CoreContext,
repo_id: RepositoryId,
mutable_counters: &SqlMutableCounters,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
match (
sub_m.value_of(ARG_SET),
@ -207,7 +207,6 @@ async fn last_processed(
.skip_over_bookmark_log_entries_with_reason(
ctx.clone(),
counter.try_into()?,
repo_id,
BookmarkUpdateReason::Blobimport,
)
.await?;
@ -256,7 +255,7 @@ async fn remains(
ctx: &CoreContext,
repo_id: RepositoryId,
mutable_counters: &SqlMutableCounters,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
let quiet = sub_m.is_present(ARG_QUIET);
let without_blobimport = sub_m.is_present(ARG_WITHOUT_BLOBIMPORT);
@ -278,7 +277,7 @@ async fn remains(
};
let remaining = bookmarks
.count_further_bookmark_log_entries(ctx.clone(), counter, repo_id, exclude_reason)
.count_further_bookmark_log_entries(ctx.clone(), counter, exclude_reason)
.await
.with_context(|| {
format!(
@ -309,7 +308,7 @@ async fn show(
ctx: &CoreContext,
repo: &BlobRepo,
mutable_counters: &SqlMutableCounters,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
let limit = args::get_u64(sub_m, "limit", 10);
@ -326,7 +325,6 @@ async fn show(
let mut entries = bookmarks.read_next_bookmark_log_entries(
ctx.clone(),
counter.try_into()?,
repo.get_repoid(),
limit,
Freshness::MostRecent,
);
@ -364,7 +362,7 @@ async fn fetch_bundle(
sub_m: &ArgMatches<'_>,
ctx: &CoreContext,
repo: &BlobRepo,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
let id = args::get_i64_opt(&sub_m, ARG_ID)
.ok_or_else(|| format_err!("--{} is not specified", ARG_ID))?;
@ -374,7 +372,7 @@ async fn fetch_bundle(
.ok_or_else(|| format_err!("--{} is not specified", ARG_OUTPUT_FILE))?
.into();
let log_entry = get_entry_by_id(ctx, repo.get_repoid(), bookmarks, id).await?;
let log_entry = get_entry_by_id(ctx, bookmarks, id).await?;
let bundle_replay_data: BundleReplayData = log_entry
.bundle_replay_data
@ -397,7 +395,7 @@ async fn verify(
ctx: &CoreContext,
repo_id: RepositoryId,
mutable_counters: &SqlMutableCounters,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
let counter = mutable_counters
.get_counter(ctx.clone(), repo_id, LATEST_REPLAYED_REQUEST_KEY)
@ -407,7 +405,7 @@ async fn verify(
.try_into()?;
let counts = bookmarks
.count_further_bookmark_log_entries_by_reason(ctx.clone(), counter, repo_id)
.count_further_bookmark_log_entries_by_reason(ctx.clone(), counter)
.await?;
let (blobimports, others): (
@ -458,7 +456,7 @@ async fn inspect(
sub_m: &ArgMatches<'_>,
ctx: &CoreContext,
repo: &BlobRepo,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
) -> Result<(), Error> {
async fn load_opt(
ctx: &CoreContext,
@ -479,7 +477,7 @@ async fn inspect(
let id = args::get_i64_opt(&sub_m, ARG_ID)
.ok_or_else(|| format_err!("--{} is not specified", ARG_ID))?;
let log_entry = get_entry_by_id(ctx, repo.get_repoid(), bookmarks, id).await?;
let log_entry = get_entry_by_id(ctx, bookmarks, id).await?;
println!("Bookmark: {}", log_entry.bookmark_name);
@ -514,18 +512,11 @@ async fn inspect(
async fn get_entry_by_id(
ctx: &CoreContext,
repo_id: RepositoryId,
bookmarks: &SqlBookmarks,
bookmarks: &dyn BookmarkUpdateLog,
id: i64,
) -> Result<BookmarkUpdateLogEntry, Error> {
let log_entry = bookmarks
.read_next_bookmark_log_entries(
ctx.clone(),
(id - 1).try_into()?,
repo_id,
1,
Freshness::MostRecent,
)
.read_next_bookmark_log_entries(ctx.clone(), (id - 1).try_into()?, 1, Freshness::MostRecent)
.next()
.await
.ok_or_else(|| Error::msg("no log entries found"))??;
@ -552,10 +543,11 @@ pub async fn subcommand_process_hg_sync<'a>(
.await
.context("While opening SqlMutableCounters")?;
let bookmarks = args::open_sql::<SqlBookmarks>(fb, &matches)
let bookmarks = args::open_sql::<SqlBookmarksBuilder>(fb, &matches)
.compat()
.await
.context("While opening SqlBookmarks")?;
.context("While opening SqlBookmarks")?
.with_repo_id(repo_id);
let res = match sub_m.subcommand() {
(HG_SYNC_LAST_PROCESSED, Some(sub_m)) => {

View File

@ -493,7 +493,6 @@ async fn tail_one_iteration(
.bookmarks()
.list(
ctx.clone(),
repo.get_repoid(),
Freshness::MostRecent,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,

View File

@ -837,7 +837,6 @@ impl RepoContext {
.attribute_expected::<dyn Bookmarks>()
.list(
self.ctx.clone(),
blob_repo.get_repoid(),
Freshness::MaybeStale,
&prefix,
kinds,

View File

@ -59,7 +59,7 @@ impl RepoWriteContext {
let old_target = self
.blob_repo()
.bookmarks()
.get(self.ctx().clone(), &bookmark, self.blob_repo().get_repoid())
.get(self.ctx().clone(), &bookmark)
.await?;
let action = if is_scratch_bookmark {

View File

@ -13,7 +13,7 @@ use bookmarks::{BookmarkName, BookmarkUpdateLog, BookmarkUpdateReason, Freshness
use context::CoreContext;
use fbinit::FacebookInit;
use futures::stream::TryStreamExt;
use mononoke_types::{ChangesetId, RepositoryId};
use mononoke_types::ChangesetId;
use tests_utils::drawdag::create_from_dag;
use crate::repo::{Repo, RepoContext};
@ -77,7 +77,6 @@ async fn move_bookmark(fb: FacebookInit) -> Result<()> {
.list_bookmark_log_entries(
ctx.clone(),
BookmarkName::new("trunk")?,
RepositoryId::new(0),
3,
None,
Freshness::MostRecent,

View File

@ -1302,7 +1302,7 @@ mod tests {
use blobrepo_hg::BlobRepoHg;
use blobrepo_override::DangerousOverride;
use bookmarks::{BookmarkTransactionError, BookmarkUpdateLog, Bookmarks};
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use fbinit::FacebookInit;
use fixtures::{linear, many_files_dirs, merge_even};
use futures::{
@ -1491,7 +1491,9 @@ mod tests {
// Initializes bookmarks and mutable_counters on the "same db" i.e. on the same
// sqlite connection
async fn init_bookmarks_mutable_counters() -> Result<
async fn init_bookmarks_mutable_counters(
repo_id: RepositoryId,
) -> Result<
(
Arc<dyn Bookmarks>,
Arc<dyn BookmarkUpdateLog>,
@ -1501,12 +1503,13 @@ mod tests {
> {
let con = SqliteConnection::open_in_memory()?;
con.execute_batch(SqlMutableCounters::CREATION_QUERY)?;
con.execute_batch(SqlBookmarks::CREATION_QUERY)?;
con.execute_batch(SqlBookmarksBuilder::CREATION_QUERY)?;
let con = Connection::with_sqlite(con);
let bookmarks = Arc::new(SqlBookmarks::from_sql_connections(
SqlConnections::new_single(con.clone()),
));
let bookmarks = Arc::new(
SqlBookmarksBuilder::from_sql_connections(SqlConnections::new_single(con.clone()))
.with_repo_id(repo_id),
);
let mutable_counters = Arc::new(SqlMutableCounters::from_sql_connections(
SqlConnections::new_single(con),
));
@ -1587,15 +1590,15 @@ mod tests {
.commit()
.await?;
let repoid = repo.get_repoid();
let (bookmarks, bookmark_update_log, mutable_counters) =
init_bookmarks_mutable_counters().await?;
init_bookmarks_mutable_counters(repoid).await?;
let repo = repo
.dangerous_override(|_| bookmarks)
.dangerous_override(|_| bookmark_update_log);
let bcs = bcs_id.load(ctx.clone(), repo.blobstore()).await?;
let repoid = repo.get_repoid();
let mut book = master_bookmark();
bookmark(&ctx, &repo, book.bookmark.clone())

View File

@ -12,7 +12,7 @@ use bookmarks::{BookmarkName, BookmarkUpdateReason, Bookmarks};
use clap::{App, Arg, SubCommand};
use cmdlib::args;
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use dbbookmarks::SqlBookmarksBuilder;
use fbinit::FacebookInit;
use futures::future::TryFutureExt;
use futures_old::future::Future;
@ -62,7 +62,8 @@ fn main(fb: FacebookInit) -> Result<()> {
let matches = setup_app().get_matches();
let repo_id = args::get_repo_id(fb, &matches).unwrap();
let fut = args::open_sql::<SqlBookmarks>(fb, &matches).and_then(move |bookmarks| {
let fut = args::open_sql::<SqlBookmarksBuilder>(fb, &matches).and_then(move |builder| {
let bookmarks = builder.with_repo_id(repo_id);
let name = matches.value_of(BOOKMARK).unwrap().to_string();
let reason = match matches.is_present(BLOBIMPORT) {
true => BookmarkUpdateReason::Blobimport,
@ -71,7 +72,7 @@ fn main(fb: FacebookInit) -> Result<()> {
let bookmark = BookmarkName::new(name).unwrap();
let mut txn = bookmarks.create_transaction(ctx, repo_id);
let mut txn = bookmarks.create_transaction(ctx);
match matches.subcommand() {
(CREATE, Some(sub_m)) => {

View File

@ -194,13 +194,7 @@ async fn get_replay_stream<'a>(
let entry = repo
.attribute_expected::<dyn BookmarkUpdateLog>()
.read_next_bookmark_log_entries(
ctx.clone(),
id - 1,
repo.get_repoid(),
1,
Freshness::MostRecent,
)
.read_next_bookmark_log_entries(ctx.clone(), id - 1, 1, Freshness::MostRecent)
.next()
.await
.ok_or_else(|| format_err!("Entry with id {} does not exist", id))??;

View File

@ -668,12 +668,10 @@ where
Route: 'static + Send + Clone,
{
// Build lookups
let repoid = *(&repo.get_repoid());
let published_bookmarks = repo
.bookmarks()
.list(
ctx.clone(),
repoid,
Freshness::MostRecent,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,