mononoke: move core logic of bookmarks transaction commit into a separate fn

Summary:
See bottom diff for more context on the problem we are trying to solve.

This is a preparation for commit retries.

Reviewed By: krallin

Differential Revision: D15855498

fbshipit-source-id: 6675cc2d698a985bc2ee7d54da00173ba192bc6b
This commit is contained in:
Kostia Balytskyi 2019-06-19 03:51:35 -07:00 committed by Facebook Github Bot
parent d9156afe6c
commit 3e5f7242ed

View File

@ -726,118 +726,26 @@ impl SqlBookmarksTransaction {
},
)
}
}
impl Transaction for SqlBookmarksTransaction {
fn update(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
old_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.sets
.insert(key.clone(), (BookmarkSetData { new_cs, old_cs }, reason));
Ok(())
}
fn create(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.creates.insert(key.clone(), (new_cs, reason));
Ok(())
}
fn force_set(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.force_sets.insert(key.clone(), (new_cs, reason));
Ok(())
}
fn delete(
&mut self,
key: &BookmarkName,
old_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.deletes.insert(key.clone(), (old_cs, reason));
Ok(())
}
fn force_delete(&mut self, key: &BookmarkName, reason: BookmarkUpdateReason) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.force_deletes.insert(key.clone(), reason);
Ok(())
}
fn update_infinitepush(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
old_cs: ChangesetId,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.infinitepush_sets
.insert(key.clone(), BookmarkSetData { new_cs, old_cs });
Ok(())
}
fn create_infinitepush(&mut self, key: &BookmarkName, new_cs: ChangesetId) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.infinitepush_creates.insert(key.clone(), new_cs);
Ok(())
}
fn commit(self: Box<Self>) -> BoxFuture<bool, Error> {
let this = *self;
let Self {
write_connection,
repo_id,
force_sets,
creates,
sets,
force_deletes,
deletes,
infinitepush_sets,
infinitepush_creates,
} = this;
let mut log_rows: HashMap<_, (Option<ChangesetId>, Option<ChangesetId>, _)> =
HashMap::new();
for (bookmark, (to_cs_id, reason)) in force_sets.clone() {
log_rows.insert(bookmark, (None, Some(to_cs_id), reason));
}
for (bookmark, (to_cs_id, reason)) in creates.clone() {
log_rows.insert(bookmark, (None, Some(to_cs_id), reason));
}
for (bookmark, (bookmark_set_data, reason)) in sets.clone() {
let from_cs_id = bookmark_set_data.old_cs;
let to_cs_id = bookmark_set_data.new_cs;
log_rows.insert(bookmark, (Some(from_cs_id), Some(to_cs_id), reason));
}
for (bookmark, reason) in force_deletes.clone() {
log_rows.insert(bookmark, (None, None, reason));
}
for (bookmark, (from_cs_id, reason)) in deletes.clone() {
log_rows.insert(bookmark, (Some(from_cs_id), None, reason));
}
fn attempt_commit(
write_connection: &Connection,
repo_id: RepositoryId,
force_sets: HashMap<BookmarkName, (ChangesetId, BookmarkUpdateReason)>,
creates: HashMap<BookmarkName, (ChangesetId, BookmarkUpdateReason)>,
sets: HashMap<BookmarkName, (BookmarkSetData, BookmarkUpdateReason)>,
force_deletes: HashMap<BookmarkName, BookmarkUpdateReason>,
deletes: HashMap<BookmarkName, (ChangesetId, BookmarkUpdateReason)>,
infinitepush_sets: HashMap<BookmarkName, BookmarkSetData>,
infinitepush_creates: HashMap<BookmarkName, ChangesetId>,
log_rows: HashMap<
BookmarkName,
(
Option<ChangesetId>,
Option<ChangesetId>,
BookmarkUpdateReason,
),
>,
) -> BoxFuture<SqlTransaction, BookmarkTransactionError> {
// NOTE: Infinitepush updates do *not* go into log_rows. This is because the
// BookmarkUpdateLog is currently used for replays to Mercurial, and those updates should
// not be replayed (for Infinitepush, those updates are actually dispatched by the client
@ -1011,18 +919,144 @@ impl Transaction for SqlBookmarksTransaction {
Self::log_bookmark_moves(repo_id, Timestamp::now(), log_rows, transaction)
.map_err(BookmarkTransactionError::RetryableError)
})
.then(|result| match result {
Ok(transaction) => transaction.commit().and_then(|()| Ok(true)).left_future(),
Err(BookmarkTransactionError::LogicError) => Ok(false).into_future().right_future(),
Err(BookmarkTransactionError::Other(err)) => Err(err).into_future().right_future(),
Err(BookmarkTransactionError::RetryableError(err)) => {
Err(err).into_future().right_future()
}
})
.boxify()
}
}
impl Transaction for SqlBookmarksTransaction {
fn update(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
old_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.sets
.insert(key.clone(), (BookmarkSetData { new_cs, old_cs }, reason));
Ok(())
}
fn create(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.creates.insert(key.clone(), (new_cs, reason));
Ok(())
}
fn force_set(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.force_sets.insert(key.clone(), (new_cs, reason));
Ok(())
}
fn delete(
&mut self,
key: &BookmarkName,
old_cs: ChangesetId,
reason: BookmarkUpdateReason,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.deletes.insert(key.clone(), (old_cs, reason));
Ok(())
}
fn force_delete(&mut self, key: &BookmarkName, reason: BookmarkUpdateReason) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.force_deletes.insert(key.clone(), reason);
Ok(())
}
fn update_infinitepush(
&mut self,
key: &BookmarkName,
new_cs: ChangesetId,
old_cs: ChangesetId,
) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.infinitepush_sets
.insert(key.clone(), BookmarkSetData { new_cs, old_cs });
Ok(())
}
fn create_infinitepush(&mut self, key: &BookmarkName, new_cs: ChangesetId) -> Result<()> {
self.check_if_bookmark_already_used(key)?;
self.infinitepush_creates.insert(key.clone(), new_cs);
Ok(())
}
fn commit(self: Box<Self>) -> BoxFuture<bool, Error> {
let this = *self;
let Self {
write_connection,
repo_id,
force_sets,
creates,
sets,
force_deletes,
deletes,
infinitepush_sets,
infinitepush_creates,
} = this;
let mut log_rows: HashMap<_, (Option<ChangesetId>, Option<ChangesetId>, _)> =
HashMap::new();
for (bookmark, (to_cs_id, reason)) in force_sets.clone() {
log_rows.insert(bookmark, (None, Some(to_cs_id), reason));
}
for (bookmark, (to_cs_id, reason)) in creates.clone() {
log_rows.insert(bookmark, (None, Some(to_cs_id), reason));
}
for (bookmark, (bookmark_set_data, reason)) in sets.clone() {
let from_cs_id = bookmark_set_data.old_cs;
let to_cs_id = bookmark_set_data.new_cs;
log_rows.insert(bookmark, (Some(from_cs_id), Some(to_cs_id), reason));
}
for (bookmark, reason) in force_deletes.clone() {
log_rows.insert(bookmark, (None, None, reason));
}
for (bookmark, (from_cs_id, reason)) in deletes.clone() {
log_rows.insert(bookmark, (Some(from_cs_id), None, reason));
}
Self::attempt_commit(
&write_connection,
repo_id,
force_sets,
creates,
sets,
force_deletes,
deletes,
infinitepush_sets,
infinitepush_creates,
log_rows,
)
.then(|result| match result {
Ok(transaction) => transaction.commit().and_then(|()| Ok(true)).left_future(),
Err(BookmarkTransactionError::LogicError) => Ok(false).into_future().right_future(),
Err(BookmarkTransactionError::Other(err)) => Err(err).into_future().right_future(),
Err(BookmarkTransactionError::RetryableError(err)) => {
Err(err).into_future().right_future()
}
})
.boxify()
}
}
#[derive(Clone)]
struct BookmarkSetData {
new_cs: ChangesetId,