mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
Safely update ledger end (#8221)
CHANGELOG_BEGIN [Integration Kit] Re-enabled asynchronous commits in JdbcIndexer. CHANGELOG_END
This commit is contained in:
parent
76b6fd86fb
commit
ae28cf40c2
@ -3,6 +3,8 @@
|
||||
|
||||
package com.daml.platform.indexer
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl.{Flow, Keep, Sink}
|
||||
@ -22,6 +24,7 @@ import com.daml.platform.ApiOffset.ApiOffsetConverter
|
||||
import com.daml.platform.common
|
||||
import com.daml.platform.common.MismatchException
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.indexer.OffsetUpdate.OffsetStepUpdatePair
|
||||
import com.daml.platform.store.FlywayMigrations
|
||||
import com.daml.platform.store.dao.events.LfValueTranslation
|
||||
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerDao, PersistenceResponse}
|
||||
@ -63,10 +66,7 @@ object JdbcIndexer {
|
||||
config.eventsPageSize,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
/* There is currently a corner-case which affects durability
|
||||
* guarantees when performing async commits. This feature
|
||||
* will be disabled until the mitigation is merged. */
|
||||
jdbcAsyncCommits = false,
|
||||
jdbcAsyncCommits = true,
|
||||
)
|
||||
_ <- ResourceOwner.forFuture(() => ledgerDao.reset())
|
||||
initialLedgerEnd <- initializeLedger(ledgerDao)
|
||||
@ -80,7 +80,7 @@ object JdbcIndexer {
|
||||
config.eventsPageSize,
|
||||
metrics,
|
||||
lfValueTranslationCache,
|
||||
jdbcAsyncCommits = false,
|
||||
jdbcAsyncCommits = true,
|
||||
)
|
||||
initialLedgerEnd <- initializeLedger(ledgerDao)
|
||||
} yield new JdbcIndexer(initialLedgerEnd, config.participantId, ledgerDao, metrics)
|
||||
@ -237,45 +237,45 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
new SubscriptionResourceOwner(readService)
|
||||
|
||||
private def handleStateUpdate(
|
||||
implicit loggingContext: LoggingContext): Flow[(Offset, Update), Unit, NotUsed] =
|
||||
Flow[(Offset, Update)]
|
||||
.wireTap(Sink.foreach[(Offset, Update)] {
|
||||
case (offset, update) =>
|
||||
implicit loggingContext: LoggingContext): Flow[OffsetUpdate, Unit, NotUsed] =
|
||||
Flow[OffsetUpdate]
|
||||
.wireTap(Sink.foreach[OffsetUpdate] {
|
||||
case OffsetUpdate(offsetStep, update) =>
|
||||
val lastReceivedRecordTime = update.recordTime.toInstant.toEpochMilli
|
||||
|
||||
logger.trace(update.description)
|
||||
|
||||
metrics.daml.indexer.lastReceivedRecordTime.updateValue(lastReceivedRecordTime)
|
||||
metrics.daml.indexer.lastReceivedOffset.updateValue(offset.toApiString)
|
||||
metrics.daml.indexer.lastReceivedOffset.updateValue(offsetStep.offset.toApiString)
|
||||
})
|
||||
.mapAsync(1)((prepareTransactionInsert _).tupled)
|
||||
.mapAsync(1)(prepareTransactionInsert)
|
||||
.mapAsync(1) {
|
||||
case kvUpdate @ OffsetUpdate(offset, update) =>
|
||||
withEnrichedLoggingContext(JdbcIndexer.loggingContextFor(offset, update)) {
|
||||
case offsetUpdate @ OffsetUpdate(offsetStep, update) =>
|
||||
withEnrichedLoggingContext(JdbcIndexer.loggingContextFor(offsetStep.offset, update)) {
|
||||
implicit loggingContext =>
|
||||
Timed.future(
|
||||
metrics.daml.indexer.stateUpdateProcessing,
|
||||
executeUpdate(kvUpdate),
|
||||
executeUpdate(offsetUpdate),
|
||||
)
|
||||
}
|
||||
}
|
||||
.map(_ => ())
|
||||
|
||||
private def prepareTransactionInsert(offset: Offset, update: Update): Future[OffsetUpdate] =
|
||||
update match {
|
||||
case tx: TransactionAccepted =>
|
||||
private def prepareTransactionInsert(offsetUpdate: OffsetUpdate): Future[OffsetUpdate] =
|
||||
offsetUpdate match {
|
||||
case OffsetStepUpdatePair(offsetStep, tx: TransactionAccepted) =>
|
||||
Timed.future(
|
||||
metrics.daml.index.db.storeTransactionDbMetrics.prepareBatches,
|
||||
Future {
|
||||
OffsetUpdate.PreparedTransactionInsert(
|
||||
offset = offset,
|
||||
offsetStep = offsetStep,
|
||||
update = tx,
|
||||
preparedInsert = ledgerDao.prepareTransactionInsert(
|
||||
submitterInfo = tx.optSubmitterInfo,
|
||||
workflowId = tx.transactionMeta.workflowId,
|
||||
transactionId = tx.transactionId,
|
||||
ledgerEffectiveTime = tx.transactionMeta.ledgerEffectiveTime.toInstant,
|
||||
offset = offset,
|
||||
offset = offsetStep.offset,
|
||||
transaction = tx.transaction,
|
||||
divulgedContracts = tx.divulgedContracts,
|
||||
blindingInfo = tx.blindingInfo,
|
||||
@ -283,14 +283,14 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
)
|
||||
}(mat.executionContext)
|
||||
)
|
||||
case update => Future.successful(OffsetUpdate.OffsetUpdatePair(offset, update))
|
||||
case offsetUpdate => Future.successful(offsetUpdate)
|
||||
}
|
||||
|
||||
private def executeUpdate(offsetUpdate: OffsetUpdate)(
|
||||
implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
offsetUpdate match {
|
||||
case OffsetUpdate.PreparedTransactionInsert(
|
||||
offset,
|
||||
offsetStep,
|
||||
TransactionAccepted(
|
||||
optSubmitterInfo,
|
||||
transactionMeta,
|
||||
@ -307,12 +307,12 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
transactionId = transactionId,
|
||||
recordTime = recordTime.toInstant,
|
||||
ledgerEffectiveTime = transactionMeta.ledgerEffectiveTime.toInstant,
|
||||
offset = offset,
|
||||
offsetStep = offsetStep,
|
||||
transaction = transaction,
|
||||
divulged = divulgedContracts,
|
||||
blindingInfo = blindingInfo,
|
||||
)
|
||||
case OffsetUpdate.OffsetUpdatePair(offset, update) =>
|
||||
case OffsetUpdate.OffsetStepUpdatePair(offsetStep, update) =>
|
||||
update match {
|
||||
case PartyAddedToParticipant(
|
||||
party,
|
||||
@ -326,7 +326,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
recordTime.toInstant,
|
||||
domain.PartyDetails(party, Some(displayName), participantId == hostingParticipantId)
|
||||
)
|
||||
ledgerDao.storePartyEntry(offset, entry)
|
||||
ledgerDao.storePartyEntry(offsetStep, entry)
|
||||
|
||||
case PartyAllocationRejected(
|
||||
submissionId,
|
||||
@ -339,7 +339,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
recordTime.toInstant,
|
||||
rejectionReason,
|
||||
)
|
||||
ledgerDao.storePartyEntry(offset, entry)
|
||||
ledgerDao.storePartyEntry(offsetStep, entry)
|
||||
|
||||
case PublicPackageUpload(archives, optSourceDescription, recordTime, optSubmissionId) =>
|
||||
val recordTimeInstant = recordTime.toInstant
|
||||
@ -353,7 +353,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
val optEntry: Option[PackageLedgerEntry] =
|
||||
optSubmissionId.map(submissionId =>
|
||||
PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTimeInstant))
|
||||
ledgerDao.storePackageEntry(offset, packages, optEntry)
|
||||
ledgerDao.storePackageEntry(offsetStep, packages, optEntry)
|
||||
|
||||
case PublicPackageUploadRejected(submissionId, recordTime, rejectionReason) =>
|
||||
val entry = PackageLedgerEntry.PackageUploadRejected(
|
||||
@ -361,11 +361,11 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
recordTime.toInstant,
|
||||
rejectionReason,
|
||||
)
|
||||
ledgerDao.storePackageEntry(offset, List.empty, Some(entry))
|
||||
ledgerDao.storePackageEntry(offsetStep, List.empty, Some(entry))
|
||||
|
||||
case config: ConfigurationChanged =>
|
||||
ledgerDao.storeConfigurationEntry(
|
||||
offset,
|
||||
offsetStep,
|
||||
config.recordTime.toInstant,
|
||||
config.submissionId,
|
||||
config.newConfiguration,
|
||||
@ -374,7 +374,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
|
||||
case configRejection: ConfigurationChangeRejected =>
|
||||
ledgerDao.storeConfigurationEntry(
|
||||
offset,
|
||||
offsetStep,
|
||||
configRejection.recordTime.toInstant,
|
||||
configRejection.submissionId,
|
||||
configRejection.proposedConfiguration,
|
||||
@ -382,7 +382,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
)
|
||||
|
||||
case CommandRejected(recordTime, submitterInfo, reason) =>
|
||||
ledgerDao.storeRejection(Some(submitterInfo), recordTime.toInstant, offset, reason)
|
||||
ledgerDao.storeRejection(Some(submitterInfo), recordTime.toInstant, offsetStep, reason)
|
||||
case update: TransactionAccepted =>
|
||||
import update._
|
||||
logger.warn(
|
||||
@ -394,7 +394,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
workflowId = transactionMeta.workflowId,
|
||||
transactionId = transactionId,
|
||||
ledgerEffectiveTime = transactionMeta.ledgerEffectiveTime.toInstant,
|
||||
offset = offset,
|
||||
offset = offsetStep.offset,
|
||||
transaction = transaction,
|
||||
divulgedContracts = divulgedContracts,
|
||||
blindingInfo = blindingInfo,
|
||||
@ -403,7 +403,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
transactionId = transactionId,
|
||||
recordTime = recordTime.toInstant,
|
||||
ledgerEffectiveTime = transactionMeta.ledgerEffectiveTime.toInstant,
|
||||
offset = offset,
|
||||
offsetStep = offsetStep,
|
||||
transaction = transaction,
|
||||
divulged = divulgedContracts,
|
||||
blindingInfo = blindingInfo,
|
||||
@ -411,6 +411,28 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
}
|
||||
}
|
||||
|
||||
private def zipWithPreviousOffset(
|
||||
initialOffset: Option[Offset]): Flow[(Offset, Update), OffsetStepUpdatePair, NotUsed] =
|
||||
Flow[(Offset, Update)]
|
||||
.statefulMapConcat { () =>
|
||||
val previousOffsetRef = new AtomicReference(initialOffset)
|
||||
|
||||
{ offsetUpdateTuple: (Offset, Update) =>
|
||||
val (nextOffset, update) = offsetUpdateTuple
|
||||
val offsetStep =
|
||||
previousOffsetRef
|
||||
.getAndSet(Some(nextOffset))
|
||||
.map { previousOffset =>
|
||||
IncrementalOffsetStep(previousOffset, nextOffset)
|
||||
}
|
||||
.getOrElse {
|
||||
CurrentOffset(nextOffset)
|
||||
}
|
||||
|
||||
OffsetStepUpdatePair(offsetStep, update) :: Nil
|
||||
}
|
||||
}
|
||||
|
||||
private class SubscriptionResourceOwner(
|
||||
readService: ReadService,
|
||||
)(implicit loggingContext: LoggingContext)
|
||||
@ -420,6 +442,7 @@ private[daml] class JdbcIndexer private[indexer] (
|
||||
val (killSwitch, completionFuture) = readService
|
||||
.stateUpdates(startExclusive)
|
||||
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
|
||||
.via(zipWithPreviousOffset(startExclusive))
|
||||
.via(handleStateUpdate)
|
||||
.toMat(Sink.ignore)(Keep.both)
|
||||
.run()
|
||||
|
@ -7,21 +7,35 @@ import com.daml.ledger.participant.state.v1.{Offset, Update}
|
||||
import com.daml.ledger.participant.state.v1.Update.TransactionAccepted
|
||||
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
|
||||
|
||||
sealed trait OffsetUpdate {
|
||||
def offset: Offset
|
||||
sealed trait OffsetUpdate extends Product with Serializable {
|
||||
def offsetStep: OffsetStep
|
||||
|
||||
def update: Update
|
||||
}
|
||||
|
||||
object OffsetUpdate {
|
||||
def unapply(offsetUpdate: OffsetUpdate): Option[(Offset, Update)] =
|
||||
Some((offsetUpdate.offset, offsetUpdate.update))
|
||||
def unapply(offsetUpdate: OffsetUpdate): Option[(OffsetStep, Update)] =
|
||||
Some((offsetUpdate.offsetStep, offsetUpdate.update))
|
||||
|
||||
final case class PreparedTransactionInsert(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
update: TransactionAccepted,
|
||||
preparedInsert: PreparedInsert)
|
||||
extends OffsetUpdate
|
||||
|
||||
final case class OffsetUpdatePair(offset: Offset, update: Update) extends OffsetUpdate
|
||||
final case class OffsetStepUpdatePair(offsetStep: OffsetStep, update: Update) extends OffsetUpdate
|
||||
}
|
||||
|
||||
sealed trait OffsetStep extends Product with Serializable {
|
||||
def offset: Offset
|
||||
}
|
||||
|
||||
object OffsetStep {
|
||||
def apply(previousOffset: Option[Offset], offset: Offset): OffsetStep = previousOffset match {
|
||||
case Some(prevOffset) => IncrementalOffsetStep(prevOffset, offset)
|
||||
case None => CurrentOffset(offset)
|
||||
}
|
||||
}
|
||||
|
||||
final case class CurrentOffset(offset: Offset) extends OffsetStep
|
||||
final case class IncrementalOffsetStep(previousOffset: Offset, offset: Offset) extends OffsetStep
|
||||
|
@ -34,6 +34,7 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
import com.daml.platform.configuration.ServerRole
|
||||
import com.daml.platform.indexer.{CurrentOffset, OffsetStep}
|
||||
import com.daml.platform.store.Conversions._
|
||||
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
|
||||
import com.daml.platform.store._
|
||||
@ -210,7 +211,7 @@ private class JdbcLedgerDao(
|
||||
|""".stripMargin)
|
||||
|
||||
override def storeConfigurationEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
recordedAt: Instant,
|
||||
submissionId: String,
|
||||
configuration: Configuration,
|
||||
@ -241,7 +242,7 @@ private class JdbcLedgerDao(
|
||||
rejectionReason
|
||||
}
|
||||
|
||||
ParametersTable.updateLedgerEnd(offset)
|
||||
ParametersTable.updateLedgerEnd(offsetStep)
|
||||
val configurationBytes = Configuration.encode(configuration).toByteArray
|
||||
val typ = if (finalRejectionReason.isEmpty) {
|
||||
acceptType
|
||||
@ -252,7 +253,7 @@ private class JdbcLedgerDao(
|
||||
Try({
|
||||
SQL_INSERT_CONFIGURATION_ENTRY
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"recorded_at" -> recordedAt,
|
||||
"submission_id" -> submissionId,
|
||||
"typ" -> typ,
|
||||
@ -288,21 +289,21 @@ private class JdbcLedgerDao(
|
||||
|""".stripMargin)
|
||||
|
||||
override def storePartyEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
partyEntry: PartyLedgerEntry,
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] = {
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) { implicit conn =>
|
||||
if (enableAsyncCommits) {
|
||||
queries.enableAsyncCommit
|
||||
}
|
||||
ParametersTable.updateLedgerEnd(offset)
|
||||
ParametersTable.updateLedgerEnd(offsetStep)
|
||||
|
||||
partyEntry match {
|
||||
case PartyLedgerEntry.AllocationAccepted(submissionIdOpt, recordTime, partyDetails) =>
|
||||
Try({
|
||||
SQL_INSERT_PARTY_ENTRY_ACCEPT
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"recorded_at" -> recordTime,
|
||||
"submission_id" -> submissionIdOpt,
|
||||
"party" -> partyDetails.party,
|
||||
@ -314,7 +315,7 @@ private class JdbcLedgerDao(
|
||||
.on(
|
||||
"party" -> partyDetails.party,
|
||||
"display_name" -> partyDetails.displayName,
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"is_local" -> partyDetails.isLocal
|
||||
)
|
||||
.execute()
|
||||
@ -329,7 +330,7 @@ private class JdbcLedgerDao(
|
||||
case PartyLedgerEntry.AllocationRejected(submissionId, recordTime, reason) =>
|
||||
SQL_INSERT_PARTY_ENTRY_REJECT
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"recorded_at" -> recordTime,
|
||||
"submission_id" -> submissionId,
|
||||
"rejection_reason" -> reason
|
||||
@ -450,7 +451,7 @@ private class JdbcLedgerDao(
|
||||
transactionId: TransactionId,
|
||||
recordTime: Instant,
|
||||
ledgerEffectiveTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
transaction: CommittedTransaction,
|
||||
divulged: Iterable[DivulgedContract],
|
||||
blindingInfo: Option[BlindingInfo],
|
||||
@ -474,15 +475,15 @@ private class JdbcLedgerDao(
|
||||
Timed.value(
|
||||
metrics.daml.index.db.storeTransactionDbMetrics.insertCompletion,
|
||||
submitterInfo
|
||||
.map(prepareCompletionInsert(_, offset, transactionId, recordTime))
|
||||
.map(prepareCompletionInsert(_, offsetStep.offset, transactionId, recordTime))
|
||||
.foreach(_.execute())
|
||||
)
|
||||
} else {
|
||||
submitterInfo.foreach(handleError(offset, _, recordTime, error.get))
|
||||
submitterInfo.foreach(handleError(offsetStep.offset, _, recordTime, error.get))
|
||||
}
|
||||
Timed.value(
|
||||
metrics.daml.index.db.storeTransactionDbMetrics.updateLedgerEnd,
|
||||
ParametersTable.updateLedgerEnd(offset)
|
||||
ParametersTable.updateLedgerEnd(offsetStep)
|
||||
)
|
||||
Ok
|
||||
}
|
||||
@ -490,7 +491,7 @@ private class JdbcLedgerDao(
|
||||
override def storeRejection(
|
||||
submitterInfo: Option[SubmitterInfo],
|
||||
recordTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
reason: RejectionReason,
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDbMetrics) { implicit conn =>
|
||||
@ -498,9 +499,9 @@ private class JdbcLedgerDao(
|
||||
queries.enableAsyncCommit
|
||||
}
|
||||
for (info <- submitterInfo) {
|
||||
handleError(offset, info, recordTime, reason)
|
||||
handleError(offsetStep.offset, info, recordTime, reason)
|
||||
}
|
||||
ParametersTable.updateLedgerEnd(offset)
|
||||
ParametersTable.updateLedgerEnd(offsetStep)
|
||||
Ok
|
||||
}
|
||||
|
||||
@ -540,7 +541,7 @@ private class JdbcLedgerDao(
|
||||
).execute()
|
||||
}
|
||||
}
|
||||
ParametersTable.updateLedgerEnd(newLedgerEnd)
|
||||
ParametersTable.updateLedgerEnd(CurrentOffset(newLedgerEnd))
|
||||
}
|
||||
|
||||
private def toParticipantRejection(reason: domain.RejectionReason): RejectionReason =
|
||||
@ -662,16 +663,16 @@ private class JdbcLedgerDao(
|
||||
|""".stripMargin)
|
||||
|
||||
override def storePackageEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
packages: List[(Archive, PackageDetails)],
|
||||
optEntry: Option[PackageLedgerEntry]
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
optEntry: Option[PackageLedgerEntry])(
|
||||
implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.storePackageEntryDbMetrics) {
|
||||
implicit connection =>
|
||||
if (enableAsyncCommits) {
|
||||
queries.enableAsyncCommit
|
||||
}
|
||||
ParametersTable.updateLedgerEnd(offset)
|
||||
ParametersTable.updateLedgerEnd(offsetStep)
|
||||
|
||||
if (packages.nonEmpty) {
|
||||
val uploadId = optEntry.map(_.submissionId).getOrElse(UUID.randomUUID().toString)
|
||||
@ -682,7 +683,7 @@ private class JdbcLedgerDao(
|
||||
case PackageLedgerEntry.PackageUploadAccepted(submissionId, recordTime) =>
|
||||
SQL_INSERT_PACKAGE_ENTRY_ACCEPT
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"recorded_at" -> recordTime,
|
||||
"submission_id" -> submissionId,
|
||||
)
|
||||
@ -690,7 +691,7 @@ private class JdbcLedgerDao(
|
||||
case PackageLedgerEntry.PackageUploadRejected(submissionId, recordTime, reason) =>
|
||||
SQL_INSERT_PACKAGE_ENTRY_REJECT
|
||||
.on(
|
||||
"ledger_offset" -> offset,
|
||||
"ledger_offset" -> offsetStep.offset,
|
||||
"recorded_at" -> recordTime,
|
||||
"submission_id" -> submissionId,
|
||||
"rejection_reason" -> reason
|
||||
|
@ -27,6 +27,7 @@ import com.daml.lf.transaction.{BlindingInfo, GlobalKey}
|
||||
import com.daml.lf.value.Value
|
||||
import com.daml.lf.value.Value.{ContractId, ContractInst}
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.platform.indexer.OffsetStep
|
||||
import com.daml.platform.store.dao.events.{TransactionsReader, TransactionsWriter}
|
||||
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
|
||||
import com.daml.platform.store.entries.{
|
||||
@ -207,7 +208,7 @@ private[platform] trait LedgerWriteDao extends ReportsHealth {
|
||||
transactionId: TransactionId,
|
||||
recordTime: Instant,
|
||||
ledgerEffectiveTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
transaction: CommittedTransaction,
|
||||
divulged: Iterable[DivulgedContract],
|
||||
blindingInfo: Option[BlindingInfo],
|
||||
@ -216,7 +217,7 @@ private[platform] trait LedgerWriteDao extends ReportsHealth {
|
||||
def storeRejection(
|
||||
submitterInfo: Option[SubmitterInfo],
|
||||
recordTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
reason: RejectionReason,
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse]
|
||||
|
||||
@ -235,11 +236,11 @@ private[platform] trait LedgerWriteDao extends ReportsHealth {
|
||||
/**
|
||||
* Stores a party allocation or rejection thereof.
|
||||
*
|
||||
* @param offset the offset to store the party entry
|
||||
* @param offsetStep Pair of previous offset and the offset to store the party entry at
|
||||
* @param partyEntry the PartyEntry to be stored
|
||||
* @return Ok when the operation was successful otherwise a Duplicate
|
||||
*/
|
||||
def storePartyEntry(offset: Offset, partyEntry: PartyLedgerEntry)(
|
||||
def storePartyEntry(offsetStep: OffsetStep, partyEntry: PartyLedgerEntry)(
|
||||
implicit loggingContext: LoggingContext,
|
||||
): Future[PersistenceResponse]
|
||||
|
||||
@ -247,18 +248,18 @@ private[platform] trait LedgerWriteDao extends ReportsHealth {
|
||||
* Store a configuration change or rejection.
|
||||
*/
|
||||
def storeConfigurationEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
recordedAt: Instant,
|
||||
submissionId: String,
|
||||
configuration: Configuration,
|
||||
rejectionReason: Option[String]
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse]
|
||||
rejectionReason: Option[String])(
|
||||
implicit loggingContext: LoggingContext): Future[PersistenceResponse]
|
||||
|
||||
/**
|
||||
* Store a DAML-LF package upload result.
|
||||
*/
|
||||
def storePackageEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
packages: List[(Archive, PackageDetails)],
|
||||
optEntry: Option[PackageLedgerEntry]
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse]
|
||||
|
@ -20,6 +20,7 @@ import com.daml.lf.value.Value
|
||||
import com.daml.lf.value.Value.{ContractId, ContractInst}
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
import com.daml.platform.indexer.OffsetStep
|
||||
import com.daml.platform.store.dao.events.{TransactionsReader, TransactionsWriter}
|
||||
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
|
||||
import com.daml.platform.store.entries.{
|
||||
@ -162,7 +163,7 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
transactionId: TransactionId,
|
||||
recordTime: Instant,
|
||||
ledgerEffectiveTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
transaction: CommittedTransaction,
|
||||
divulged: Iterable[DivulgedContract],
|
||||
blindingInfo: Option[BlindingInfo],
|
||||
@ -175,7 +176,7 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
transactionId,
|
||||
recordTime,
|
||||
ledgerEffectiveTime,
|
||||
offset,
|
||||
offsetStep,
|
||||
transaction,
|
||||
divulged,
|
||||
blindingInfo,
|
||||
@ -206,12 +207,12 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
override def storeRejection(
|
||||
submitterInfo: Option[SubmitterInfo],
|
||||
recordTime: Instant,
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
reason: RejectionReason,
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
Timed.future(
|
||||
metrics.daml.index.db.storeRejection,
|
||||
ledgerDao.storeRejection(submitterInfo, recordTime, offset, reason),
|
||||
ledgerDao.storeRejection(submitterInfo, recordTime, offsetStep, reason),
|
||||
)
|
||||
|
||||
override def storeInitialState(
|
||||
@ -235,16 +236,16 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
ledgerDao.reset()
|
||||
|
||||
override def storePartyEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
partyEntry: PartyLedgerEntry,
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
Timed.future(
|
||||
metrics.daml.index.db.storePartyEntry,
|
||||
ledgerDao.storePartyEntry(offset, partyEntry),
|
||||
ledgerDao.storePartyEntry(offsetStep, partyEntry),
|
||||
)
|
||||
|
||||
override def storeConfigurationEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
recordTime: Instant,
|
||||
submissionId: String,
|
||||
configuration: Configuration,
|
||||
@ -253,7 +254,7 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
Timed.future(
|
||||
metrics.daml.index.db.storeConfigurationEntry,
|
||||
ledgerDao.storeConfigurationEntry(
|
||||
offset,
|
||||
offsetStep,
|
||||
recordTime,
|
||||
submissionId,
|
||||
configuration,
|
||||
@ -262,12 +263,12 @@ private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics)
|
||||
)
|
||||
|
||||
override def storePackageEntry(
|
||||
offset: Offset,
|
||||
offsetStep: OffsetStep,
|
||||
packages: List[(Archive, PackageDetails)],
|
||||
entry: Option[PackageLedgerEntry],
|
||||
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] =
|
||||
Timed.future(
|
||||
metrics.daml.index.db.storePackageEntry,
|
||||
ledgerDao.storePackageEntry(offset, packages, entry))
|
||||
ledgerDao.storePackageEntry(offsetStep, packages, entry))
|
||||
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import anorm.SqlParser.byteArray
|
||||
import anorm.{Row, RowParser, SimpleSql, SqlStringInterpolation, ~}
|
||||
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
|
||||
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
|
||||
import com.daml.platform.indexer.{IncrementalOffsetStep, CurrentOffset, OffsetStep}
|
||||
import com.daml.platform.store.Conversions.{OffsetToStatement, ledgerString, offset, participantId}
|
||||
import com.daml.scalautil.Statement.discard
|
||||
|
||||
@ -72,11 +73,32 @@ private[dao] object ParametersTable {
|
||||
def getInitialLedgerEnd(connection: Connection): Option[Offset] =
|
||||
SelectLedgerEnd.as(LedgerEndParser.single)(connection)
|
||||
|
||||
def updateLedgerEnd(ledgerEnd: Offset)(implicit connection: Connection): Unit =
|
||||
discard(
|
||||
SQL"update #$TableName set #$LedgerEndColumnName = $ledgerEnd where (#$LedgerEndColumnName is null or #$LedgerEndColumnName < $ledgerEnd)"
|
||||
.execute()
|
||||
)
|
||||
/**
|
||||
* Updates the ledger end.
|
||||
*
|
||||
* When provided with a (previous, current) ledger end tuple ([[IncrementalOffsetStep]],
|
||||
* the update is performed conditioned by the match between the persisted ledger end and the
|
||||
* provided previous ledger end.
|
||||
*
|
||||
* This mechanism is used to protect callers that cannot provide strong durability guarantees
|
||||
* ([[JdbcLedgerDao]] when used with asynchronous commits on PostgreSQL).
|
||||
*
|
||||
* @param offsetStep The offset step.
|
||||
* @param connection The SQL connection.
|
||||
*/
|
||||
def updateLedgerEnd(offsetStep: OffsetStep)(implicit connection: Connection): Unit =
|
||||
offsetStep match {
|
||||
case CurrentOffset(ledgerEnd) =>
|
||||
discard(
|
||||
SQL"update #$TableName set #$LedgerEndColumnName = $ledgerEnd where (#$LedgerEndColumnName is null or #$LedgerEndColumnName < $ledgerEnd)"
|
||||
.execute())
|
||||
case IncrementalOffsetStep(previousOffset, ledgerEnd) =>
|
||||
val sqlStatement =
|
||||
SQL"update #$TableName set #$LedgerEndColumnName = $ledgerEnd where #$LedgerEndColumnName = $previousOffset"
|
||||
if (sqlStatement.executeUpdate() == 0) {
|
||||
throw LedgerEndUpdateError(previousOffset)
|
||||
}
|
||||
}
|
||||
|
||||
def updateConfiguration(configuration: Array[Byte])(
|
||||
implicit connection: Connection,
|
||||
@ -88,4 +110,7 @@ private[dao] object ParametersTable {
|
||||
LedgerEndAndConfigurationParser.single
|
||||
)(connection)
|
||||
|
||||
case class LedgerEndUpdateError(expected: Offset)
|
||||
extends RuntimeException(
|
||||
s"Could not update ledger end. Previous ledger end does not match expected ${expected.toHexString}")
|
||||
}
|
||||
|
@ -7,16 +7,16 @@ import java.time.Instant
|
||||
import java.util.UUID
|
||||
|
||||
import akka.stream.scaladsl.Sink
|
||||
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason, SubmitterInfo}
|
||||
import com.daml.lf.data.Ref.Party
|
||||
import com.daml.ledger.ApplicationId
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
import com.daml.ledger.participant.state.v1.{Offset, RejectionReason, SubmitterInfo}
|
||||
import com.daml.lf.data.Ref.Party
|
||||
import com.daml.platform.ApiOffset
|
||||
import com.daml.platform.store.dao.JdbcLedgerDaoCompletionsSpec._
|
||||
import com.daml.platform.store.CompletionFromTransaction
|
||||
import org.scalatest.{LoneElement, OptionValues}
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import com.daml.platform.store.dao.JdbcLedgerDaoCompletionsSpec._
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
@ -161,7 +161,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl
|
||||
}
|
||||
|
||||
it should "return the expected status for each rejection reason" in {
|
||||
val reasons = Seq[RejectionReason](
|
||||
val reasons = List[RejectionReason](
|
||||
RejectionReason.Disputed(""),
|
||||
RejectionReason.Inconsistent(""),
|
||||
RejectionReason.InvalidLedgerTime(""),
|
||||
@ -173,7 +173,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl
|
||||
|
||||
for {
|
||||
from <- ledgerDao.lookupLedgerEnd()
|
||||
_ <- Future.sequence(reasons.map(reason => storeRejection(reason)))
|
||||
_ <- seq(reasons.map(reason => prepareStoreRejection(reason)))
|
||||
to <- ledgerDao.lookupLedgerEnd()
|
||||
responses <- ledgerDao.completions
|
||||
.getCommandCompletions(from, to, applicationId, parties)
|
||||
@ -189,16 +189,16 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl
|
||||
}
|
||||
}
|
||||
|
||||
private def storeRejection(
|
||||
private def prepareStoreRejection(
|
||||
reason: RejectionReason,
|
||||
commandId: String = UUID.randomUUID().toString,
|
||||
): Future[Offset] = {
|
||||
lazy val offset = nextOffset()
|
||||
): () => Future[Offset] = () => {
|
||||
val offset = nextOffset()
|
||||
ledgerDao
|
||||
.storeRejection(
|
||||
submitterInfo = Some(SubmitterInfo(List(party1), applicationId, commandId, Instant.EPOCH)),
|
||||
recordTime = Instant.now,
|
||||
offset = offset,
|
||||
offsetStep = nextOffsetStep(offset),
|
||||
reason = reason,
|
||||
)
|
||||
.map(_ => offset)
|
||||
@ -214,11 +214,23 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl
|
||||
submitterInfo = Some(
|
||||
SubmitterInfo(List(party1, party2, party3), applicationId, commandId, Instant.EPOCH)),
|
||||
recordTime = Instant.now,
|
||||
offset = offset,
|
||||
offsetStep = nextOffsetStep(offset),
|
||||
reason = reason,
|
||||
)
|
||||
.map(_ => offset)
|
||||
}
|
||||
|
||||
private def storeRejection(
|
||||
reason: RejectionReason,
|
||||
commandId: String = UUID.randomUUID().toString,
|
||||
): Future[Offset] = prepareStoreRejection(reason, commandId)()
|
||||
|
||||
/** Starts and executes futures sequentially */
|
||||
private def seq(s: List[() => Future[Offset]]): Future[Seq[Offset]] =
|
||||
s match {
|
||||
case Nil => Future(Seq.empty)
|
||||
case hd :: tail => hd().flatMap(offset => seq(tail).map(offset +: _))
|
||||
}
|
||||
}
|
||||
|
||||
private[dao] object JdbcLedgerDaoCompletionsSpec {
|
||||
|
@ -6,6 +6,9 @@ package com.daml.platform.store.dao
|
||||
import java.time.Instant
|
||||
|
||||
import akka.stream.scaladsl.Sink
|
||||
import com.daml.ledger.participant.state.v1.{Configuration, Offset}
|
||||
import com.daml.platform.indexer.{CurrentOffset, IncrementalOffsetStep}
|
||||
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
|
||||
import com.daml.platform.store.entries.ConfigurationEntry
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -21,12 +24,10 @@ trait JdbcLedgerDaoConfigurationSpec { this: AsyncFlatSpec with Matchers with Jd
|
||||
startingOffset <- ledgerDao.lookupLedgerEnd()
|
||||
startingConfig <- ledgerDao.lookupLedgerConfiguration()
|
||||
|
||||
response <- ledgerDao.storeConfigurationEntry(
|
||||
response <- storeConfigurationEntry(
|
||||
offset,
|
||||
Instant.EPOCH,
|
||||
s"submission-$offsetString",
|
||||
defaultConfig,
|
||||
None,
|
||||
)
|
||||
optStoredConfig <- ledgerDao.lookupLedgerConfiguration()
|
||||
endingOffset <- ledgerDao.lookupLedgerEnd()
|
||||
@ -45,12 +46,11 @@ trait JdbcLedgerDaoConfigurationSpec { this: AsyncFlatSpec with Matchers with Jd
|
||||
for {
|
||||
startingConfig <- ledgerDao.lookupLedgerConfiguration().map(_.map(_._2))
|
||||
proposedConfig = startingConfig.getOrElse(defaultConfig)
|
||||
response <- ledgerDao.storeConfigurationEntry(
|
||||
response <- storeConfigurationEntry(
|
||||
offset,
|
||||
Instant.EPOCH,
|
||||
s"config-rejection-$offsetString",
|
||||
proposedConfig,
|
||||
Some("bad config"),
|
||||
Some("bad config")
|
||||
)
|
||||
storedConfig <- ledgerDao.lookupLedgerConfiguration().map(_.map(_._2))
|
||||
entries <- ledgerDao
|
||||
@ -76,47 +76,34 @@ trait JdbcLedgerDaoConfigurationSpec { this: AsyncFlatSpec with Matchers with Jd
|
||||
|
||||
// Store a new configuration with a known submission id
|
||||
submissionId = s"refuse-config-$offsetString0"
|
||||
resp0 <- ledgerDao.storeConfigurationEntry(
|
||||
resp0 <- storeConfigurationEntry(
|
||||
offset0,
|
||||
Instant.EPOCH,
|
||||
submissionId,
|
||||
config.copy(generation = config.generation + 1),
|
||||
None,
|
||||
)
|
||||
newConfig <- ledgerDao.lookupLedgerConfiguration().map(_.map(_._2).get)
|
||||
|
||||
// Submission with duplicate submissionId is rejected
|
||||
offset1 = nextOffset()
|
||||
resp1 <- ledgerDao.storeConfigurationEntry(
|
||||
offset1,
|
||||
Instant.EPOCH,
|
||||
resp1 <- storeConfigurationEntry(
|
||||
nextOffset(),
|
||||
submissionId,
|
||||
newConfig.copy(generation = config.generation + 1),
|
||||
None,
|
||||
shouldUpdateLedgerEnd = false,
|
||||
)
|
||||
|
||||
// Submission with mismatching generation is rejected
|
||||
offset2 = nextOffset()
|
||||
offsetString2 = offset2.toLong
|
||||
resp2 <- ledgerDao.storeConfigurationEntry(
|
||||
resp2 <- storeConfigurationEntry(
|
||||
offset2,
|
||||
Instant.EPOCH,
|
||||
s"refuse-config-$offsetString2",
|
||||
config,
|
||||
None,
|
||||
)
|
||||
|
||||
// Submission with unique submissionId and correct generation is accepted.
|
||||
offset3 = nextOffset()
|
||||
offsetString3 = offset3.toLong
|
||||
offsetString3 = offset3
|
||||
lastConfig = newConfig.copy(generation = newConfig.generation + 1)
|
||||
resp3 <- ledgerDao.storeConfigurationEntry(
|
||||
offset3,
|
||||
Instant.EPOCH,
|
||||
s"refuse-config-$offsetString3",
|
||||
lastConfig,
|
||||
None,
|
||||
)
|
||||
resp3 <- storeConfigurationEntry(offset3, s"refuse-config-$offsetString3", lastConfig)
|
||||
lastConfigActual <- ledgerDao.lookupLedgerConfiguration().map(_.map(_._2).get)
|
||||
|
||||
entries <- ledgerDao.getConfigurationEntries(startExclusive, offset3).runWith(Sink.seq)
|
||||
@ -139,4 +126,37 @@ trait JdbcLedgerDaoConfigurationSpec { this: AsyncFlatSpec with Matchers with Jd
|
||||
}
|
||||
}
|
||||
|
||||
it should "fail trying to store configuration with non-incremental offsets" in {
|
||||
recoverToSucceededIf[LedgerEndUpdateError](
|
||||
storeConfigurationEntry(
|
||||
nextOffset(),
|
||||
s"submission-invalid-offsets",
|
||||
defaultConfig,
|
||||
maybePreviousOffset = Some(nextOffset())
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private def storeConfigurationEntry(
|
||||
offset: Offset,
|
||||
submissionId: String,
|
||||
lastConfig: Configuration,
|
||||
rejectionReason: Option[String] = None,
|
||||
shouldUpdateLedgerEnd: Boolean = true,
|
||||
maybePreviousOffset: Option[Offset] = Option.empty) =
|
||||
ledgerDao
|
||||
.storeConfigurationEntry(
|
||||
offsetStep = maybePreviousOffset
|
||||
.orElse(previousOffset.get())
|
||||
.map(IncrementalOffsetStep(_, offset))
|
||||
.getOrElse(CurrentOffset(offset)),
|
||||
Instant.EPOCH,
|
||||
submissionId,
|
||||
lastConfig,
|
||||
rejectionReason,
|
||||
)
|
||||
.map { r =>
|
||||
if (shouldUpdateLedgerEnd) previousOffset.set(Some(offset))
|
||||
r
|
||||
}
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ private[dao] trait JdbcLedgerDaoDivulgenceSpec extends LoneElement with Inside {
|
||||
_ <- store(
|
||||
divulgedContracts = Map((create2, someVersionedContractInstance) -> Set(alice)),
|
||||
blindingInfo = None,
|
||||
nextOffset() -> LedgerEntry.Transaction(
|
||||
offsetAndTx = nextOffset() -> LedgerEntry.Transaction(
|
||||
commandId = Some(UUID.randomUUID.toString),
|
||||
transactionId = UUID.randomUUID.toString,
|
||||
applicationId = Some(appId),
|
||||
|
@ -5,6 +5,11 @@ package com.daml.platform.store.dao
|
||||
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import com.daml.daml_lf_dev.DamlLf
|
||||
import com.daml.ledger.participant.state.index.v2.PackageDetails
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.platform.indexer.IncrementalOffsetStep
|
||||
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
|
||||
|
||||
private[dao] trait JdbcLedgerDaoPackagesSpec {
|
||||
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
|
||||
@ -17,18 +22,14 @@ private[dao] trait JdbcLedgerDaoPackagesSpec {
|
||||
val offset1 = nextOffset()
|
||||
val offset2 = nextOffset()
|
||||
for {
|
||||
firstUploadResult <- ledgerDao
|
||||
.storePackageEntry(
|
||||
offset1,
|
||||
packages
|
||||
.map(a => a._1 -> a._2.copy(sourceDescription = Some(firstDescription)))
|
||||
.take(1),
|
||||
None)
|
||||
secondUploadResult <- ledgerDao
|
||||
.storePackageEntry(
|
||||
offset2,
|
||||
packages.map(a => a._1 -> a._2.copy(sourceDescription = Some(secondDescription))),
|
||||
None)
|
||||
firstUploadResult <- storePackageEntry(
|
||||
offset1,
|
||||
packages
|
||||
.map(a => a._1 -> a._2.copy(sourceDescription = Some(firstDescription)))
|
||||
.take(1))
|
||||
secondUploadResult <- storePackageEntry(
|
||||
offset2,
|
||||
packages.map(a => a._1 -> a._2.copy(sourceDescription = Some(secondDescription))))
|
||||
loadedPackages <- ledgerDao.listLfPackages
|
||||
} yield {
|
||||
firstUploadResult shouldBe PersistenceResponse.Ok
|
||||
@ -39,4 +40,17 @@ private[dao] trait JdbcLedgerDaoPackagesSpec {
|
||||
}
|
||||
}
|
||||
|
||||
it should "fail on storing package entry with non-incremental offsets" in {
|
||||
val offset = nextOffset()
|
||||
recoverToSucceededIf[LedgerEndUpdateError](
|
||||
ledgerDao
|
||||
.storePackageEntry(IncrementalOffsetStep(offset, offset), packages, None)
|
||||
)
|
||||
}
|
||||
|
||||
private def storePackageEntry(
|
||||
offset: Offset,
|
||||
packageList: List[(DamlLf.Archive, PackageDetails)]) =
|
||||
ledgerDao
|
||||
.storePackageEntry(nextOffsetStep(offset), packageList, None)
|
||||
}
|
||||
|
@ -7,7 +7,10 @@ import java.time.Instant
|
||||
import java.util.UUID
|
||||
|
||||
import com.daml.ledger.api.domain.PartyDetails
|
||||
import com.daml.ledger.participant.state.v1.Offset
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.platform.indexer.{IncrementalOffsetStep, OffsetStep}
|
||||
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
|
||||
import com.daml.platform.store.entries.PartyLedgerEntry
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -28,26 +31,10 @@ private[dao] trait JdbcLedgerDaoPartiesSpec {
|
||||
displayName = Some("Bob Bobertson"),
|
||||
isLocal = true,
|
||||
)
|
||||
val offset1 = nextOffset()
|
||||
for {
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset1,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = alice,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(alice, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
offset2 = nextOffset()
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset2,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = bob,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(bob, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
parties <- ledgerDao.listKnownParties()
|
||||
} yield {
|
||||
@ -71,16 +58,8 @@ private[dao] trait JdbcLedgerDaoPartiesSpec {
|
||||
displayName = Some("Carol Carlisle"),
|
||||
isLocal = true,
|
||||
)
|
||||
val offset = nextOffset()
|
||||
for {
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = carol,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(carol, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
carolPartyDetails <- ledgerDao.getParties(Seq(party))
|
||||
noPartyDetails <- ledgerDao.getParties(Seq(nonExistentParty))
|
||||
@ -104,26 +83,10 @@ private[dao] trait JdbcLedgerDaoPartiesSpec {
|
||||
displayName = Some("Dangerous Dan"),
|
||||
isLocal = true,
|
||||
)
|
||||
val offset1 = nextOffset()
|
||||
for {
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset1,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = dan,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(dan, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
offset2 = nextOffset()
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset2,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = eve,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(eve, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
parties <- ledgerDao.getParties(Seq(danParty, eveParty, nonExistentParty))
|
||||
} yield {
|
||||
@ -137,29 +100,46 @@ private[dao] trait JdbcLedgerDaoPartiesSpec {
|
||||
displayName = Some("Fred Flintstone"),
|
||||
isLocal = true,
|
||||
)
|
||||
val offset1 = nextOffset()
|
||||
for {
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset1,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = fred,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(fred, nextOffset())
|
||||
_ = response should be(PersistenceResponse.Ok)
|
||||
offset2 = nextOffset()
|
||||
response <- ledgerDao.storePartyEntry(
|
||||
offset2,
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = fred,
|
||||
),
|
||||
)
|
||||
response <- storePartyEntry(fred, nextOffset(), shouldUpdateLegerEnd = false)
|
||||
} yield {
|
||||
response should be(PersistenceResponse.Duplicate)
|
||||
}
|
||||
}
|
||||
|
||||
it should "fail on storing a party entry with non-incremental offsets" in {
|
||||
val fred = PartyDetails(
|
||||
party = Ref.Party.assertFromString(s"Fred-${UUID.randomUUID()}"),
|
||||
displayName = Some("Fred Flintstone"),
|
||||
isLocal = true,
|
||||
)
|
||||
recoverToSucceededIf[LedgerEndUpdateError](
|
||||
ledgerDao.storePartyEntry(
|
||||
IncrementalOffsetStep(nextOffset(), nextOffset()),
|
||||
allocationAccepted(fred)
|
||||
))
|
||||
}
|
||||
|
||||
private def storePartyEntry(
|
||||
partyDetails: PartyDetails,
|
||||
offset: Offset,
|
||||
shouldUpdateLegerEnd: Boolean = true) =
|
||||
ledgerDao
|
||||
.storePartyEntry(
|
||||
OffsetStep(previousOffset.get(), offset),
|
||||
allocationAccepted(partyDetails)
|
||||
)
|
||||
.map { response =>
|
||||
if (shouldUpdateLegerEnd) previousOffset.set(Some(offset))
|
||||
response
|
||||
}
|
||||
|
||||
private def allocationAccepted(partyDetails: PartyDetails): PartyLedgerEntry.AllocationAccepted =
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
submissionIdOpt = Some(UUID.randomUUID().toString),
|
||||
recordTime = Instant.now,
|
||||
partyDetails = partyDetails,
|
||||
)
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ package com.daml.platform.store.dao
|
||||
import java.io.File
|
||||
import java.time.{Duration, Instant}
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
|
||||
|
||||
import akka.stream.scaladsl.Sink
|
||||
import com.daml.bazeltools.BazelRunfiles.rlocation
|
||||
@ -29,6 +29,7 @@ import com.daml.lf.transaction.{
|
||||
import com.daml.lf.value.Value
|
||||
import com.daml.lf.value.Value.{ContractId, ContractInst, ValueRecord, ValueText, ValueUnit}
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.platform.indexer.OffsetStep
|
||||
import com.daml.platform.store.entries.LedgerEntry
|
||||
import org.scalatest.AsyncTestSuite
|
||||
|
||||
@ -41,11 +42,16 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
|
||||
|
||||
protected implicit final val loggingContext: LoggingContext = LoggingContext.ForTesting
|
||||
|
||||
val previousOffset: AtomicReference[Option[Offset]] =
|
||||
new AtomicReference[Option[Offset]](Option.empty)
|
||||
|
||||
protected final val nextOffset: () => Offset = {
|
||||
val base = BigInt(1) << 32
|
||||
val counter = new AtomicLong(0)
|
||||
() =>
|
||||
Offset.fromByteArray((base + counter.getAndIncrement()).toByteArray)
|
||||
{
|
||||
Offset.fromByteArray((base + counter.getAndIncrement()).toByteArray)
|
||||
}
|
||||
}
|
||||
|
||||
protected final implicit class OffsetToLong(offset: Offset) {
|
||||
@ -508,8 +514,19 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
|
||||
divulgedContracts: Map[(ContractId, v1.ContractInst), Set[Party]],
|
||||
blindingInfo: Option[BlindingInfo],
|
||||
offsetAndTx: (Offset, LedgerEntry.Transaction),
|
||||
): Future[(Offset, LedgerEntry.Transaction)] =
|
||||
storeOffsetStepAndTx(
|
||||
divulgedContracts = divulgedContracts,
|
||||
blindingInfo = blindingInfo,
|
||||
offsetStepAndTx = nextOffsetStep(offsetAndTx._1) -> offsetAndTx._2
|
||||
)
|
||||
|
||||
protected final def storeOffsetStepAndTx(
|
||||
divulgedContracts: Map[(ContractId, v1.ContractInst), Set[Party]],
|
||||
blindingInfo: Option[BlindingInfo],
|
||||
offsetStepAndTx: (OffsetStep, LedgerEntry.Transaction),
|
||||
): Future[(Offset, LedgerEntry.Transaction)] = {
|
||||
val (offset, entry) = offsetAndTx
|
||||
val (offsetStep, entry) = offsetStepAndTx
|
||||
val submitterInfo =
|
||||
for (actAs <- if (entry.actAs.isEmpty) None else Some(entry.actAs); app <- entry.applicationId;
|
||||
cmd <- entry.commandId) yield v1.SubmitterInfo(actAs, app, cmd, Instant.EPOCH)
|
||||
@ -521,7 +538,7 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
|
||||
entry.workflowId,
|
||||
entry.transactionId,
|
||||
ledgerEffectiveTime,
|
||||
offset,
|
||||
offsetStep.offset,
|
||||
committedTransaction,
|
||||
divulged,
|
||||
blindingInfo,
|
||||
@ -534,17 +551,20 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
|
||||
transaction = committedTransaction,
|
||||
recordTime = entry.recordedAt,
|
||||
ledgerEffectiveTime = ledgerEffectiveTime,
|
||||
offset = offset,
|
||||
offsetStep = offsetStep,
|
||||
divulged = divulged,
|
||||
blindingInfo = blindingInfo,
|
||||
)
|
||||
.map(_ => offsetAndTx)
|
||||
.map(_ => offsetStep.offset -> entry)
|
||||
}
|
||||
|
||||
protected final def store(
|
||||
offsetAndTx: (Offset, LedgerEntry.Transaction),
|
||||
): Future[(Offset, LedgerEntry.Transaction)] =
|
||||
store(divulgedContracts = Map.empty, blindingInfo = None, offsetAndTx)
|
||||
storeOffsetStepAndTx(
|
||||
divulgedContracts = Map.empty,
|
||||
blindingInfo = None,
|
||||
offsetStepAndTx = nextOffsetStep(offsetAndTx._1) -> offsetAndTx._2)
|
||||
|
||||
protected final def storeSync(
|
||||
commands: Vector[(Offset, LedgerEntry.Transaction)],
|
||||
@ -711,6 +731,8 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
|
||||
.map(c => c.commandId -> c.status.get.code)
|
||||
.runWith(Sink.seq)
|
||||
|
||||
def nextOffsetStep(offset: Offset): OffsetStep =
|
||||
OffsetStep(previousOffset.getAndSet(Some(offset)), offset)
|
||||
}
|
||||
|
||||
object JdbcLedgerDaoSuite {
|
||||
@ -733,4 +755,5 @@ object JdbcLedgerDaoSuite {
|
||||
.traverse(a => Free suspend (Free liftF f(a)))
|
||||
.foldMap(NaturalTransformation.refl)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ import com.daml.lf.transaction.{BlindingInfo, NodeId}
|
||||
import org.scalatest.LoneElement
|
||||
import org.scalatest.flatspec.AsyncFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import com.daml.platform.indexer.IncrementalOffsetStep
|
||||
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
|
||||
|
||||
private[dao] trait JdbcLedgerDaoTransactionsWriterSpec extends LoneElement {
|
||||
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
|
||||
@ -67,4 +69,14 @@ private[dao] trait JdbcLedgerDaoTransactionsWriterSpec extends LoneElement {
|
||||
}
|
||||
}
|
||||
|
||||
it should "fail trying to store transactions with non-incremental offsets" in {
|
||||
val (offset, tx) = singleCreate
|
||||
recoverToSucceededIf[LedgerEndUpdateError](
|
||||
storeOffsetStepAndTx(
|
||||
offsetStepAndTx = IncrementalOffsetStep(nextOffset(), offset) -> tx,
|
||||
blindingInfo = None,
|
||||
divulgedContracts = Map.empty,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLe
|
||||
import com.daml.platform.store.{BaseLedger, FlywayMigrations}
|
||||
import com.daml.resources.ProgramResource.StartupException
|
||||
import scalaz.Tag
|
||||
import com.daml.platform.indexer.CurrentOffset
|
||||
|
||||
import scala.collection.immutable.Queue
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -211,7 +212,7 @@ private[sandbox] object SqlLedger {
|
||||
})
|
||||
|
||||
ledgerDao
|
||||
.storePackageEntry(newLedgerEnd, packages, None)
|
||||
.storePackageEntry(CurrentOffset(newLedgerEnd), packages, None)
|
||||
.transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))(
|
||||
DEC)
|
||||
} else {
|
||||
@ -363,7 +364,7 @@ private final class SqlLedger(
|
||||
ledgerDao.storeRejection(
|
||||
Some(submitterInfo),
|
||||
recordTime,
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
reason,
|
||||
),
|
||||
_ => {
|
||||
@ -388,7 +389,7 @@ private final class SqlLedger(
|
||||
transactionId,
|
||||
recordTime,
|
||||
transactionMeta.ledgerEffectiveTime.toInstant,
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
transactionCommitter.commitTransaction(transactionId, transaction),
|
||||
divulgedContracts,
|
||||
blindingInfo,
|
||||
@ -426,7 +427,7 @@ private final class SqlLedger(
|
||||
enqueue { offset =>
|
||||
ledgerDao
|
||||
.storePartyEntry(
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
PartyLedgerEntry.AllocationAccepted(
|
||||
Some(submissionId),
|
||||
timeProvider.getCurrentTime,
|
||||
@ -454,7 +455,7 @@ private final class SqlLedger(
|
||||
enqueue { offset =>
|
||||
ledgerDao
|
||||
.storePackageEntry(
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
packages,
|
||||
Some(PackageLedgerEntry.PackageUploadAccepted(submissionId, timeProvider.getCurrentTime)),
|
||||
)
|
||||
@ -481,7 +482,7 @@ private final class SqlLedger(
|
||||
if (recordTime.isAfter(mrt)) {
|
||||
ledgerDao
|
||||
.storeConfigurationEntry(
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
recordTime,
|
||||
submissionId,
|
||||
config,
|
||||
@ -497,7 +498,7 @@ private final class SqlLedger(
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
for {
|
||||
response <- ledgerDao.storeConfigurationEntry(
|
||||
offset,
|
||||
CurrentOffset(offset),
|
||||
recordTime,
|
||||
submissionId,
|
||||
config,
|
||||
|
Loading…
Reference in New Issue
Block a user