Allow async commits only within JdbcIndexer (#8004)

* Allow async commits only within JdbcIndexer (#8004)

CHANGELOG_BEGIN
CHANGELOG_END

* Test in SqlLedgerSpec asserting no async commits in JdbcLedgerDao
This commit is contained in:
tudor-da 2020-11-23 11:13:23 +01:00 committed by GitHub
parent d5de65a234
commit f7c2ca808a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 196 additions and 143 deletions

View File

@ -63,6 +63,7 @@ object JdbcIndexer {
config.eventsPageSize,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = true,
)
_ <- ResourceOwner.forFuture(() => ledgerDao.reset())
initialLedgerEnd <- initializeLedger(ledgerDao)
@ -76,6 +77,7 @@ object JdbcIndexer {
config.eventsPageSize,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = true,
)
initialLedgerEnd <- initializeLedger(ledgerDao)
} yield new JdbcIndexer(initialLedgerEnd, config.participantId, ledgerDao, metrics)

View File

@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao
import java.sql.{Connection, PreparedStatement}
import java.sql.Connection
import java.time.Instant
import java.util.concurrent.Executors
import java.util.{Date, UUID}
@ -81,6 +81,7 @@ private class JdbcLedgerDao(
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean,
enableAsyncCommits: Boolean = false,
) extends LedgerDao {
import JdbcLedgerDao._
@ -92,6 +93,13 @@ private class JdbcLedgerDao(
private val logger = ContextualizedLogger.get(this.getClass)
LoggingContext.newLoggingContext { implicit loggingContext =>
if (enableAsyncCommits)
logger.info("Starting JdbcLedgerDao with async commit enabled")
else
logger.info("Starting JdbcLedgerDao with async commit disabled")
}
override def currentHealth(): HealthStatus = dbDispatcher.currentHealth()
override def lookupLedgerId()(implicit loggingContext: LoggingContext): Future[Option[LedgerId]] =
@ -209,60 +217,61 @@ private class JdbcLedgerDao(
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher.executeSql(
metrics.daml.index.db.storeConfigurationEntryDbMetrics,
) {
queries.withAsyncCommit { implicit conn =>
val optCurrentConfig = ParametersTable.getLedgerEndAndConfiguration(conn)
val optExpectedGeneration: Option[Long] =
optCurrentConfig.map { case (_, c) => c.generation + 1 }
val finalRejectionReason: Option[String] =
optExpectedGeneration match {
case Some(expGeneration)
if rejectionReason.isEmpty && expGeneration != configuration.generation =>
// If we're not storing a rejection and the new generation is not succ of current configuration, then
// we store a rejection. This code path is only expected to be taken in sandbox. This follows the same
// pattern as with transactions.
Some(
s"Generation mismatch: expected=$expGeneration, actual=${configuration.generation}")
) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
val optCurrentConfig = ParametersTable.getLedgerEndAndConfiguration(conn)
val optExpectedGeneration: Option[Long] =
optCurrentConfig.map { case (_, c) => c.generation + 1 }
val finalRejectionReason: Option[String] =
optExpectedGeneration match {
case Some(expGeneration)
if rejectionReason.isEmpty && expGeneration != configuration.generation =>
// If we're not storing a rejection and the new generation is not succ of current configuration, then
// we store a rejection. This code path is only expected to be taken in sandbox. This follows the same
// pattern as with transactions.
Some(
s"Generation mismatch: expected=$expGeneration, actual=${configuration.generation}")
case _ =>
// Rejection reason was set, or we have no previous configuration generation, in which case we accept any
// generation.
rejectionReason
}
ParametersTable.updateLedgerEnd(offset)
val configurationBytes = Configuration.encode(configuration).toByteArray
val typ = if (finalRejectionReason.isEmpty) {
acceptType
} else {
rejectType
case _ =>
// Rejection reason was set, or we have no previous configuration generation, in which case we accept any
// generation.
rejectionReason
}
Try({
SQL_INSERT_CONFIGURATION_ENTRY
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordedAt,
"submission_id" -> submissionId,
"typ" -> typ,
"rejection_reason" -> finalRejectionReason.orNull,
"configuration" -> configurationBytes
)
.execute()
if (typ == acceptType) {
ParametersTable.updateConfiguration(configurationBytes)
}
PersistenceResponse.Ok
}).recover {
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
logger.warn(s"Ignoring duplicate configuration submission, submissionId=$submissionId")
conn.rollback()
PersistenceResponse.Duplicate
}.get
ParametersTable.updateLedgerEnd(offset)
val configurationBytes = Configuration.encode(configuration).toByteArray
val typ = if (finalRejectionReason.isEmpty) {
acceptType
} else {
rejectType
}
Try({
SQL_INSERT_CONFIGURATION_ENTRY
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordedAt,
"submission_id" -> submissionId,
"typ" -> typ,
"rejection_reason" -> finalRejectionReason.orNull,
"configuration" -> configurationBytes
)
.execute()
if (typ == acceptType) {
ParametersTable.updateConfiguration(configurationBytes)
}
PersistenceResponse.Ok
}).recover {
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
logger.warn(s"Ignoring duplicate configuration submission, submissionId=$submissionId")
conn.rollback()
PersistenceResponse.Duplicate
}.get
}
private val SQL_INSERT_PARTY_ENTRY_ACCEPT =
@ -281,50 +290,51 @@ private class JdbcLedgerDao(
offset: Offset,
partyEntry: PartyLedgerEntry,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] = {
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) {
queries.withAsyncCommit { implicit conn =>
ParametersTable.updateLedgerEnd(offset)
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
ParametersTable.updateLedgerEnd(offset)
partyEntry match {
case PartyLedgerEntry.AllocationAccepted(submissionIdOpt, recordTime, partyDetails) =>
Try({
SQL_INSERT_PARTY_ENTRY_ACCEPT
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordTime,
"submission_id" -> submissionIdOpt,
"party" -> partyDetails.party,
"display_name" -> partyDetails.displayName,
"is_local" -> partyDetails.isLocal,
)
.execute()
SQL_INSERT_PARTY
.on(
"party" -> partyDetails.party,
"display_name" -> partyDetails.displayName,
"ledger_offset" -> offset,
"is_local" -> partyDetails.isLocal
)
.execute()
PersistenceResponse.Ok
}).recover {
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
logger.warn(
s"Ignoring duplicate party submission with ID ${partyDetails.party} for submissionId $submissionIdOpt")
conn.rollback()
PersistenceResponse.Duplicate
}.get
case PartyLedgerEntry.AllocationRejected(submissionId, recordTime, reason) =>
SQL_INSERT_PARTY_ENTRY_REJECT
partyEntry match {
case PartyLedgerEntry.AllocationAccepted(submissionIdOpt, recordTime, partyDetails) =>
Try({
SQL_INSERT_PARTY_ENTRY_ACCEPT
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordTime,
"submission_id" -> submissionId,
"rejection_reason" -> reason
"submission_id" -> submissionIdOpt,
"party" -> partyDetails.party,
"display_name" -> partyDetails.displayName,
"is_local" -> partyDetails.isLocal,
)
.execute()
SQL_INSERT_PARTY
.on(
"party" -> partyDetails.party,
"display_name" -> partyDetails.displayName,
"ledger_offset" -> offset,
"is_local" -> partyDetails.isLocal
)
.execute()
PersistenceResponse.Ok
}
}).recover {
case NonFatal(e) if e.getMessage.contains(queries.DUPLICATE_KEY_ERROR) =>
logger.warn(
s"Ignoring duplicate party submission with ID ${partyDetails.party} for submissionId $submissionIdOpt")
conn.rollback()
PersistenceResponse.Duplicate
}.get
case PartyLedgerEntry.AllocationRejected(submissionId, recordTime, reason) =>
SQL_INSERT_PARTY_ENTRY_REJECT
.on(
"ledger_offset" -> offset,
"recorded_at" -> recordTime,
"submission_id" -> submissionId,
"rejection_reason" -> reason
)
.execute()
PersistenceResponse.Ok
}
}
@ -435,39 +445,38 @@ private class JdbcLedgerDao(
blindingInfo: Option[BlindingInfo],
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics) {
queries.withAsyncCommit { implicit conn =>
val error =
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.commitValidation,
postCommitValidation.validate(
transaction = transaction,
transactionLedgerEffectiveTime = ledgerEffectiveTime,
divulged = divulged.iterator.map(_.contractId).toSet,
)
)
if (error.isEmpty) {
preparedInsert.write(metrics)
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.insertCompletion,
submitterInfo
.map(prepareCompletionInsert(_, offset, transactionId, recordTime))
.foreach(_.execute())
)
} else {
for (info @ SubmitterInfo(_, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(
domain.CommandId(commandId),
info.singleSubmitterOrThrow())
prepareRejectionInsert(info, offset, recordTime, error.get).execute()
}
}
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.updateLedgerEnd,
ParametersTable.updateLedgerEnd(offset)
)
Ok
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
val error =
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.commitValidation,
postCommitValidation.validate(
transaction = transaction,
transactionLedgerEffectiveTime = ledgerEffectiveTime,
divulged = divulged.iterator.map(_.contractId).toSet,
)
)
if (error.isEmpty) {
preparedInsert.write(metrics)
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.insertCompletion,
submitterInfo
.map(prepareCompletionInsert(_, offset, transactionId, recordTime))
.foreach(_.execute())
)
} else {
for (info @ SubmitterInfo(_, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(domain.CommandId(commandId), info.singleSubmitterOrThrow())
prepareRejectionInsert(info, offset, recordTime, error.get).execute()
}
}
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.updateLedgerEnd,
ParametersTable.updateLedgerEnd(offset)
)
Ok
}
override def storeRejection(
@ -476,15 +485,16 @@ private class JdbcLedgerDao(
offset: Offset,
reason: RejectionReason,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDbMetrics) {
queries.withAsyncCommit { implicit conn =>
for (info @ SubmitterInfo(_, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(domain.CommandId(commandId), info.singleSubmitterOrThrow())
prepareRejectionInsert(info, offset, recordTime, reason).execute()
}
ParametersTable.updateLedgerEnd(offset)
Ok
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDbMetrics) { implicit conn =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
for (info @ SubmitterInfo(_, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(domain.CommandId(commandId), info.singleSubmitterOrThrow())
prepareRejectionInsert(info, offset, recordTime, reason).execute()
}
ParametersTable.updateLedgerEnd(offset)
Ok
}
override def storeInitialState(
@ -654,7 +664,10 @@ private class JdbcLedgerDao(
optEntry: Option[PackageLedgerEntry]
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
dbDispatcher.executeSql(metrics.daml.index.db.storePackageEntryDbMetrics) {
queries.withAsyncCommit { implicit connection =>
implicit connection =>
if (enableAsyncCommits) {
queries.enableAsyncCommit
}
ParametersTable.updateLedgerEnd(offset)
if (packages.nonEmpty) {
@ -682,7 +695,6 @@ private class JdbcLedgerDao(
.execute()
}
PersistenceResponse.Ok
}
}
private def uploadLfPackages(uploadId: String, packages: List[(Archive, PackageDetails)])(
@ -881,6 +893,7 @@ private[platform] object JdbcLedgerDao {
validate = false,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = false,
).map(new MeteredLedgerReadDao(_, metrics))
}
@ -890,6 +903,7 @@ private[platform] object JdbcLedgerDao {
eventsPageSize: Int,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
jdbcAsyncCommits: Boolean,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val maxConnections =
@ -902,6 +916,7 @@ private[platform] object JdbcLedgerDao {
validate = false,
metrics,
lfValueTranslationCache,
jdbcAsyncCommits = jdbcAsyncCommits,
).map(new MeteredLedgerDao(_, metrics))
}
@ -924,7 +939,8 @@ private[platform] object JdbcLedgerDao {
validate = true,
metrics,
lfValueTranslationCache,
validatePartyAllocation
validatePartyAllocation,
jdbcAsyncCommits = false,
).map(new MeteredLedgerDao(_, metrics))
}
@ -960,6 +976,7 @@ private[platform] object JdbcLedgerDao {
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
validatePartyAllocation: Boolean = false,
jdbcAsyncCommits: Boolean,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] =
for {
dbDispatcher <- DbDispatcher.owner(serverRole, jdbcUrl, maxConnections, metrics)
@ -975,6 +992,7 @@ private[platform] object JdbcLedgerDao {
metrics,
lfValueTranslationCache,
validatePartyAllocation,
jdbcAsyncCommits
)
sealed trait Queries {
@ -983,8 +1001,7 @@ private[platform] object JdbcLedgerDao {
* Performance optimization for transactions that don't
* require strong durability guarantees.
*/
protected[JdbcLedgerDao] def withAsyncCommit(block: Connection => PersistenceResponse)(
provided: Connection): PersistenceResponse
protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit
protected[JdbcLedgerDao] def SQL_INSERT_PACKAGE: String
@ -1027,15 +1044,13 @@ private[platform] object JdbcLedgerDao {
|truncate table party_entries cascade;
""".stripMargin
override protected[JdbcLedgerDao] def withAsyncCommit(block: Connection => PersistenceResponse)(
provided: Connection): PersistenceResponse = {
var statement: PreparedStatement = null
override protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit = {
val statement = conn.prepareStatement("SET LOCAL synchronous_commit = 'off'")
try {
statement = provided.prepareStatement("SET LOCAL synchronous_commit = 'off'")
statement.execute()
block(provided)
()
} finally {
if (statement != null) statement.close()
statement.close()
}
}
}
@ -1075,8 +1090,6 @@ private[platform] object JdbcLedgerDao {
""".stripMargin
/** Async commit not supported for H2 */
override protected[JdbcLedgerDao] def withAsyncCommit(block: Connection => PersistenceResponse)(
provided: Connection): PersistenceResponse =
block(provided)
override protected[JdbcLedgerDao] def enableAsyncCommit(implicit conn: Connection): Unit = ()
}
}

View File

@ -46,6 +46,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
eventsPageSize = eventsPageSize,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
)
protected final var ledgerDao: LedgerDao = _

View File

@ -216,6 +216,7 @@ class RecoveringIndexerIntegrationSpec
eventsPageSize = 100,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslation.Cache.none,
jdbcAsyncCommits = true,
)
}
}

View File

@ -4,4 +4,16 @@
<configuration>
<include resource="com/daml/platform/sandbox/logback-test.base.xml"/>
<appender name="SqlLedgerSpecAppender" class="com.daml.platform.testing.LogCollector">
<test>com.daml.platform.sandbox.stores.ledger.sql.SqlLedgerSpec</test>
</appender>
<logger name="com.daml.platform.sandbox.stores.ledger.sql.SqlLedger" level="INFO">
<appender-ref ref="SqlLedgerSpecAppender" />
</logger>
<logger name="com.daml.platform.store.dao.JdbcLedgerDao" level="INFO">
<appender-ref ref="SqlLedgerSpecAppender" />
</logger>
</configuration>

View File

@ -6,6 +6,7 @@ package com.daml.platform.sandbox.stores.ledger.sql
import java.nio.file.Paths
import java.time.Instant
import ch.qos.logback.classic.Level
import com.daml.api.util.TimeProvider
import com.daml.bazeltools.BazelRunfiles.rlocation
import com.daml.daml_lf_dev.DamlLf
@ -27,6 +28,7 @@ import com.daml.platform.sandbox.stores.ledger.Ledger
import com.daml.platform.sandbox.stores.ledger.sql.SqlLedgerSpec._
import com.daml.platform.store.IndexMetadata
import com.daml.platform.store.dao.events.LfValueTranslation
import com.daml.platform.testing.LogCollector
import com.daml.testing.postgresql.PostgresAroundEach
import org.scalatest.concurrent.{AsyncTimeLimitedTests, Eventually, ScaledTimeSpans}
import org.scalatest.time.{Minute, Seconds, Span}
@ -62,6 +64,7 @@ final class SqlLedgerSpec
}
override protected def afterEach(): Unit = {
LogCollector.clear[SqlLedgerSpec]
for (ledger <- createdLedgers)
Await.result(ledger.release(), 2.seconds)
super.afterEach()
@ -202,6 +205,23 @@ final class SqlLedgerSpec
ledger.currentHealth() should be(Healthy)
}
}
/**
* Workaround test for asserting that PostgreSQL asynchronous commits are disabled in
* [[com.daml.platform.store.dao.JdbcLedgerDao]] transactions when used from [[SqlLedger]].
*
* NOTE: This is needed for ensuring durability guarantees of DAML-on-SQL.
*/
"does not use async commit when building JdbcLedgerDao" in {
for {
_ <- createSqlLedger(validatePartyAllocation = false)
} yield {
val jdbcLedgerDaoLogs =
LogCollector.read[this.type]("com.daml.platform.store.dao.JdbcLedgerDao")
jdbcLedgerDaoLogs should contain(
Level.INFO -> "Starting JdbcLedgerDao with async commit disabled")
}
}
}
private def createSqlLedger(validatePartyAllocation: Boolean): Future[Ledger] =

View File

@ -21,9 +21,13 @@ object LogCollector {
def read[Test, Logger](
implicit test: ClassTag[Test],
logger: ClassTag[Logger]): IndexedSeq[(Level, String)] =
read[Test](logger.runtimeClass.getName)
def read[Test](loggerClassName: String)(
implicit test: ClassTag[Test]): IndexedSeq[(Level, String)] =
log
.get(test.runtimeClass.getName)
.flatMap(_.get(logger.runtimeClass.getName))
.flatMap(_.get(loggerClassName))
.fold(IndexedSeq.empty[(Level, String)])(_.result())
def clear[Test](implicit test: ClassTag[Test]): Unit = {