mirror of
https://github.com/facebook/sapling.git
synced 2024-10-12 01:39:21 +03:00
mononoke: make sure caches are purged when backsyncer updates transaction
Reviewed By: ikostia, farnz Differential Revision: D18478452 fbshipit-source-id: 8a0a96508f82224d2e1b312390616974084a5e03
This commit is contained in:
parent
b0bc02a308
commit
a5e451eeb8
@ -178,6 +178,7 @@ pub fn open_blobrepo_given_datasources(
|
||||
scuba_censored_table,
|
||||
repoid,
|
||||
filestore_config,
|
||||
bookmarks_cache_ttl,
|
||||
)
|
||||
}
|
||||
Caching::Enabled => new_production(
|
||||
@ -244,11 +245,21 @@ fn new_development(
|
||||
scuba_censored_table: Option<String>,
|
||||
repoid: RepositoryId,
|
||||
filestore_config: FilestoreConfig,
|
||||
bookmarks_cache_ttl: Option<Duration>,
|
||||
) -> BoxFuture<BlobRepo, Error> {
|
||||
let bookmarks = sql_factory
|
||||
.open::<SqlBookmarks>()
|
||||
.chain_err(ErrorKind::StateOpen(StateOpenError::Bookmarks))
|
||||
.from_err();
|
||||
.from_err()
|
||||
.map(move |bookmarks| {
|
||||
let bookmarks: Arc<dyn Bookmarks> = if let Some(ttl) = bookmarks_cache_ttl {
|
||||
Arc::new(CachedBookmarks::new(bookmarks, ttl))
|
||||
} else {
|
||||
bookmarks
|
||||
};
|
||||
|
||||
bookmarks
|
||||
});
|
||||
|
||||
let filenodes = sql_factory
|
||||
.open_filenodes()
|
||||
|
@ -947,6 +947,10 @@ impl BlobRepo {
|
||||
self.bonsai_hg_mapping.clone()
|
||||
}
|
||||
|
||||
pub fn get_bookmarks_object(&self) -> Arc<dyn Bookmarks> {
|
||||
self.bookmarks.clone()
|
||||
}
|
||||
|
||||
fn store_file_change(
|
||||
&self,
|
||||
ctx: CoreContext,
|
||||
|
@ -26,6 +26,7 @@ use sql::{queries, Connection, Transaction as SqlTransaction};
|
||||
pub use sql_ext::{SqlConstructors, TransactionResult};
|
||||
use stats::{define_stats, Timeseries};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
const DEFAULT_MAX: u64 = std::u64::MAX;
|
||||
const MAX_BOOKMARK_TRANSACTION_ATTEMPT_COUNT: usize = 5;
|
||||
@ -1136,10 +1137,34 @@ impl Transaction for SqlBookmarksTransaction {
|
||||
|
||||
fn commit(self: Box<Self>) -> BoxFuture<bool, Error> {
|
||||
let Self {
|
||||
write_connection,
|
||||
ref write_connection,
|
||||
..
|
||||
} = *self;
|
||||
let sql_txn_factory = Arc::new({
|
||||
cloned!(write_connection);
|
||||
move || {
|
||||
write_connection
|
||||
.start_transaction()
|
||||
.map(TransactionResult::Succeeded)
|
||||
.boxify()
|
||||
}
|
||||
});
|
||||
|
||||
self.commit_into_txn(sql_txn_factory).boxify()
|
||||
}
|
||||
|
||||
// commit_into_txn() can be used to have the same transaction to update two different
|
||||
// database tables. `sql_txn_factory()` provides this sql transaction, which is later
|
||||
// used to commit all bookmark updates into it.
|
||||
fn commit_into_txn(
|
||||
self: Box<Self>,
|
||||
sql_txn_factory: Arc<dyn Fn() -> BoxFuture<TransactionResult, Error> + Sync + Send>,
|
||||
) -> BoxFuture<bool, Error> {
|
||||
let Self {
|
||||
repo_id,
|
||||
ctx,
|
||||
payload,
|
||||
..
|
||||
} = *self;
|
||||
|
||||
ctx.perf_counters()
|
||||
@ -1147,9 +1172,12 @@ impl Transaction for SqlBookmarksTransaction {
|
||||
|
||||
let commit_fut = conditional_retry_without_delay(
|
||||
move |_attempt| {
|
||||
write_connection
|
||||
.start_transaction()
|
||||
sql_txn_factory()
|
||||
.map_err(BookmarkTransactionError::Other)
|
||||
.and_then(|txn_result| match txn_result {
|
||||
TransactionResult::Succeeded(txn) => Ok(txn),
|
||||
TransactionResult::Failed => Err(BookmarkTransactionError::LogicError),
|
||||
})
|
||||
.and_then({
|
||||
cloned!(payload);
|
||||
move |txn| Self::attempt_write(txn, repo_id, payload)
|
||||
|
@ -16,6 +16,7 @@ use failure_ext::Result;
|
||||
use futures::{future, stream, Future, Stream};
|
||||
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
|
||||
use mononoke_types::{ChangesetId, RepositoryId, Timestamp};
|
||||
use sql_ext::TransactionResult;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::{Arc, Mutex},
|
||||
@ -434,6 +435,29 @@ impl Transaction for CachedBookmarksTransaction {
|
||||
})
|
||||
.boxify()
|
||||
}
|
||||
|
||||
fn commit_into_txn(
|
||||
self: Box<Self>,
|
||||
txn_factory: Arc<dyn Fn() -> BoxFuture<TransactionResult, Error> + Sync + Send>,
|
||||
) -> BoxFuture<bool, Error> {
|
||||
let CachedBookmarksTransaction {
|
||||
transaction,
|
||||
caches,
|
||||
repoid,
|
||||
ctx,
|
||||
dirty,
|
||||
} = *self;
|
||||
|
||||
transaction
|
||||
.commit_into_txn(txn_factory)
|
||||
.map(move |success| {
|
||||
if success && dirty {
|
||||
caches.purge_cache(ctx, repoid);
|
||||
}
|
||||
success
|
||||
})
|
||||
.boxify()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -700,6 +724,13 @@ mod tests {
|
||||
fn commit(self: Box<Self>) -> BoxFuture<bool, Error> {
|
||||
future::ok(true).boxify()
|
||||
}
|
||||
|
||||
fn commit_into_txn(
|
||||
self: Box<Self>,
|
||||
_txn_factory: Arc<dyn Fn() -> BoxFuture<TransactionResult, Error> + Sync + Send>,
|
||||
) -> BoxFuture<bool, Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// next_request provides a way to advance through the stream of requests dispatched by
|
||||
|
@ -21,9 +21,11 @@ use sql::mysql_async::{
|
||||
prelude::{ConvIr, FromValue},
|
||||
FromValueError, Value,
|
||||
};
|
||||
use sql_ext::TransactionResult;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::ops::{Bound, Range, RangeBounds, RangeFrom, RangeFull};
|
||||
use std::sync::Arc;
|
||||
|
||||
mod cache;
|
||||
pub use cache::CachedBookmarks;
|
||||
@ -613,6 +615,16 @@ pub trait Transaction: Send + Sync + 'static {
|
||||
/// successful, or errors if transaction has failed. Logical failure is indicated by
|
||||
/// returning a successful `false` value; infrastructure failure is reported via an Error.
|
||||
fn commit(self: Box<Self>) -> BoxFuture<bool, Error>;
|
||||
|
||||
/// Commits the transaction using provided transaction. If bookmarks implementation
|
||||
/// is not support committing into transactions, then it should return an error.
|
||||
/// Future succeeds if transaction has been
|
||||
/// successful, or errors if transaction has failed. Logical failure is indicated by
|
||||
/// returning a successful `false` value; infrastructure failure is reported via an Error.
|
||||
fn commit_into_txn(
|
||||
self: Box<Self>,
|
||||
txn_factory: Arc<dyn Fn() -> BoxFuture<TransactionResult, Error> + Sync + Send>,
|
||||
) -> BoxFuture<bool, Error>;
|
||||
}
|
||||
|
||||
impl From<BookmarkName> for Value {
|
||||
|
@ -149,7 +149,7 @@ fn create_repo_sync_target(
|
||||
let large_repo = target_incomplete_repo_handler.repo.blobrepo().clone();
|
||||
let mapping: Arc<dyn SyncedCommitMapping> = Arc::new(synced_commit_mapping);
|
||||
let syncers = try_boxfuture!(create_commit_syncers(
|
||||
small_repo,
|
||||
small_repo.clone(),
|
||||
large_repo,
|
||||
&commit_sync_config,
|
||||
mapping.clone()
|
||||
@ -162,6 +162,7 @@ fn create_repo_sync_target(
|
||||
|
||||
open_backsyncer_dbs_compat(
|
||||
ctx.clone(),
|
||||
small_repo,
|
||||
db_config,
|
||||
maybe_myrouter_port,
|
||||
readonly_storage,
|
||||
|
Loading…
Reference in New Issue
Block a user