mononoke: revert D12945071

Summary: It causes test failures. REvert for now until they fixed

Reviewed By: farnz

Differential Revision: D13040073

fbshipit-source-id: fc05373c882baf42f7bd2a3a1c1173e8ba26a952
This commit is contained in:
Stanislau Hlebik 2018-11-13 04:59:36 -08:00 committed by Facebook Github Bot
parent 31a8a44ffb
commit aae4562523
2 changed files with 110 additions and 116 deletions

View File

@ -26,7 +26,7 @@ use std::collections::{HashMap, HashSet};
use bookmarks::{Bookmark, BookmarkPrefix, Bookmarks, Transaction};
use failure::{Error, Result};
use futures::{stream, Future, IntoFuture, future::{loop_fn, Loop}};
use futures_ext::{spawn_future, BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use sql::Connection;
pub use sql_ext::SqlConstructors;
@ -251,100 +251,97 @@ impl Transaction for SqlBookmarksTransaction {
deletes,
} = this;
spawn_future(
write_connection
.start_transaction()
.map_err(Some)
.and_then(move |transaction| {
let force_set: Vec<_> = force_sets.into_iter().collect();
let mut ref_rows = Vec::new();
for idx in 0..force_set.len() {
ref_rows.push((&repo_id, &force_set[idx].0, &force_set[idx].1))
}
ReplaceBookmarks::query_with_transaction(transaction, &ref_rows[..])
.map_err(Some)
})
.and_then(move |(transaction, _)| {
let creates: Vec<_> = creates.into_iter().collect();
let mut ref_rows = Vec::new();
for idx in 0..creates.len() {
ref_rows.push((&repo_id, &creates[idx].0, &creates[idx].1))
}
InsertBookmarks::query_with_transaction(transaction, &ref_rows[..])
.map_err(Some)
})
.and_then(move |(transaction, _)| {
loop_fn(
(transaction, sets.into_iter()),
move |(transaction, mut updates)| match updates.next() {
Some((name, BookmarkSetData { new_cs, old_cs })) => {
UpdateBookmark::query_with_transaction(
transaction,
&repo_id,
&name,
&old_cs,
&new_cs,
).then(move |res| match res {
Err(err) => Err(Some(err)),
Ok((transaction, result)) => if result.affected_rows() == 1 {
Ok((transaction, updates))
} else {
Err(None)
},
})
.map(Loop::Continue)
.left_future()
}
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.and_then(move |transaction| {
loop_fn(
(transaction, force_deletes.into_iter()),
move |(transaction, mut deletes)| match deletes.next() {
Some(name) => {
DeleteBookmark::query_with_transaction(transaction, &repo_id, &name)
.then(move |res| match res {
Err(err) => Err(Some(err)),
Ok((transaction, _)) => Ok((transaction, deletes)),
})
.map(Loop::Continue)
.left_future()
}
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.and_then(move |transaction| {
loop_fn(
(transaction, deletes.into_iter()),
move |(transaction, mut deletes)| match deletes.next() {
Some((name, old_cs)) => DeleteBookmarkIf::query_with_transaction(
write_connection
.start_transaction()
.map_err(Some)
.and_then(move |transaction| {
let force_set: Vec<_> = force_sets.into_iter().collect();
let mut ref_rows = Vec::new();
for idx in 0..force_set.len() {
ref_rows.push((&repo_id, &force_set[idx].0, &force_set[idx].1))
}
ReplaceBookmarks::query_with_transaction(transaction, &ref_rows[..]).map_err(Some)
})
.and_then(move |(transaction, _)| {
let creates: Vec<_> = creates.into_iter().collect();
let mut ref_rows = Vec::new();
for idx in 0..creates.len() {
ref_rows.push((&repo_id, &creates[idx].0, &creates[idx].1))
}
InsertBookmarks::query_with_transaction(transaction, &ref_rows[..]).map_err(Some)
})
.and_then(move |(transaction, _)| {
loop_fn(
(transaction, sets.into_iter()),
move |(transaction, mut updates)| match updates.next() {
Some((name, BookmarkSetData { new_cs, old_cs })) => {
UpdateBookmark::query_with_transaction(
transaction,
&repo_id,
&name,
&old_cs,
&new_cs,
).then(move |res| match res {
Err(err) => Err(Some(err)),
Ok((transaction, result)) => if result.affected_rows() == 1 {
Ok((transaction, deletes))
Ok((transaction, updates))
} else {
Err(None)
},
})
.map(Loop::Continue)
.left_future(),
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.then(|result| match result {
Ok(transaction) => transaction.commit().and_then(|()| Ok(true)).left_future(),
Err(None) => Ok(false).into_future().right_future(),
Err(Some(err)) => Err(err).into_future().right_future(),
}),
).boxify()
.left_future()
}
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.and_then(move |transaction| {
loop_fn(
(transaction, force_deletes.into_iter()),
move |(transaction, mut deletes)| match deletes.next() {
Some(name) => {
DeleteBookmark::query_with_transaction(transaction, &repo_id, &name)
.then(move |res| match res {
Err(err) => Err(Some(err)),
Ok((transaction, _)) => Ok((transaction, deletes)),
})
.map(Loop::Continue)
.left_future()
}
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.and_then(move |transaction| {
loop_fn(
(transaction, deletes.into_iter()),
move |(transaction, mut deletes)| match deletes.next() {
Some((name, old_cs)) => DeleteBookmarkIf::query_with_transaction(
transaction,
&repo_id,
&name,
&old_cs,
).then(move |res| match res {
Err(err) => Err(Some(err)),
Ok((transaction, result)) => if result.affected_rows() == 1 {
Ok((transaction, deletes))
} else {
Err(None)
},
})
.map(Loop::Continue)
.left_future(),
None => Ok(Loop::Break(transaction)).into_future().right_future(),
},
)
})
.then(|result| match result {
Ok(transaction) => transaction.commit().and_then(|()| Ok(true)).left_future(),
Err(None) => Ok(false).into_future().right_future(),
Err(Some(err)) => Err(err).into_future().right_future(),
})
.boxify()
}
}

View File

@ -48,7 +48,7 @@ use sql::{Connection, Transaction};
pub use sql_ext::SqlConstructors;
use futures::{stream, Future, IntoFuture};
use futures_ext::{spawn_future, BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use mercurial_types::RepositoryId;
use mononoke_types::ChangesetId;
use rust_thrift::compact_protocol;
@ -249,38 +249,35 @@ impl Changesets for SqlChangesets {
.and_then(move |parent_rows| {
try_boxfuture!(check_missing_rows(&cs.parents, &parent_rows));
let gen = parent_rows.iter().map(|row| row.2).max().unwrap_or(0) + 1;
spawn_future(
write_connection
.start_transaction()
.and_then({
cloned!(cs);
move |transaction| {
InsertChangeset::query_with_transaction(
transaction,
&[(&cs.repo_id, &cs.cs_id, &gen)],
)
}
})
.and_then(move |(transaction, result)| {
if result.affected_rows() == 1 && result.last_insert_id().is_some() {
insert_parents(
transaction,
result.last_insert_id().unwrap(),
cs,
parent_rows,
).map(|()| true)
.left_future()
} else {
transaction
.rollback()
.and_then(move |()| {
check_changeset_matches(&write_connection, cs)
})
.map(|()| false)
.right_future()
}
}),
).boxify()
write_connection
.start_transaction()
.and_then({
cloned!(cs);
move |transaction| {
InsertChangeset::query_with_transaction(
transaction,
&[(&cs.repo_id, &cs.cs_id, &gen)],
)
}
})
.and_then(move |(transaction, result)| {
if result.affected_rows() == 1 && result.last_insert_id().is_some() {
insert_parents(
transaction,
result.last_insert_id().unwrap(),
cs,
parent_rows,
).map(|()| true)
.left_future()
} else {
transaction
.rollback()
.and_then(move |()| check_changeset_matches(&write_connection, cs))
.map(|()| false)
.right_future()
}
})
.boxify()
})
.boxify()
}