mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
[kvutils] - Extract command deduplication steps from the TransactionCommitter [KVL-1174] (#11547)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
0e95ccb354
commit
7bc0f82733
@ -0,0 +1,117 @@
|
|||||||
|
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
|
import com.daml.ledger.configuration.Configuration
|
||||||
|
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.Conversions.{
|
||||||
|
buildDuration,
|
||||||
|
commandDedupKey,
|
||||||
|
parseDuration,
|
||||||
|
parseTimestamp,
|
||||||
|
}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
|
||||||
|
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||||
|
DamlTransactionRejectionEntry,
|
||||||
|
Duplicate,
|
||||||
|
}
|
||||||
|
import com.daml.logging.LoggingContext
|
||||||
|
|
||||||
|
private[transaction] object CommandDeduplication {
|
||||||
|
|
||||||
|
def overwriteDeduplicationPeriodWithMaxDurationStep(
|
||||||
|
defaultConfig: Configuration
|
||||||
|
): Step = new Step {
|
||||||
|
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
|
||||||
|
loggingContext: LoggingContext
|
||||||
|
): StepResult[DamlTransactionEntrySummary] = {
|
||||||
|
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
|
||||||
|
val submission = input.submission.toBuilder
|
||||||
|
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
|
||||||
|
buildDuration(currentConfig.maxDeduplicationTime)
|
||||||
|
)
|
||||||
|
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Reject duplicate commands
|
||||||
|
*/
|
||||||
|
def deduplicateCommandStep(rejections: Rejections): Step = new Step {
|
||||||
|
def apply(
|
||||||
|
commitContext: CommitContext,
|
||||||
|
transactionEntry: DamlTransactionEntrySummary,
|
||||||
|
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
||||||
|
commitContext.recordTime
|
||||||
|
.map { recordTime =>
|
||||||
|
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
|
||||||
|
val dedupEntry = commitContext.get(dedupKey)
|
||||||
|
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
|
||||||
|
StepContinue(transactionEntry)
|
||||||
|
} else {
|
||||||
|
rejections.reject(
|
||||||
|
DamlTransactionRejectionEntry.newBuilder
|
||||||
|
.setSubmitterInfo(transactionEntry.submitterInfo)
|
||||||
|
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
|
||||||
|
.setDefiniteAnswer(false)
|
||||||
|
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
|
||||||
|
"the command is a duplicate",
|
||||||
|
commitContext.recordTime,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.getOrElse(StepContinue(transactionEntry))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def setDeduplicationEntryStep(defaultConfig: Configuration): Step =
|
||||||
|
new Step {
|
||||||
|
def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)(
|
||||||
|
implicit loggingContext: LoggingContext
|
||||||
|
): StepResult[DamlTransactionEntrySummary] = {
|
||||||
|
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
|
||||||
|
// Deduplication duration must be explicitly overwritten in a previous step
|
||||||
|
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
|
||||||
|
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
|
||||||
|
throw Err.InvalidSubmission("Deduplication duration is not set.")
|
||||||
|
}
|
||||||
|
val deduplicateUntil = Conversions.buildTimestamp(
|
||||||
|
transactionEntry.submissionTime
|
||||||
|
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
|
||||||
|
.add(config.timeModel.minSkew)
|
||||||
|
)
|
||||||
|
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
|
||||||
|
.setDeduplicatedUntil(deduplicateUntil)
|
||||||
|
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
|
||||||
|
// Set a deduplication entry.
|
||||||
|
commitContext.set(
|
||||||
|
commandDedupKey(transactionEntry.submitterInfo),
|
||||||
|
DamlStateValue.newBuilder
|
||||||
|
.setCommandDedup(
|
||||||
|
commandDedupBuilder.build
|
||||||
|
)
|
||||||
|
.build,
|
||||||
|
)
|
||||||
|
StepContinue(transactionEntry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checks that the submission time of the command is after the
|
||||||
|
// deduplicationTime represented by stateValue
|
||||||
|
private def isAfterDeduplicationTime(
|
||||||
|
submissionTime: Instant,
|
||||||
|
stateValue: DamlStateValue,
|
||||||
|
): Boolean = {
|
||||||
|
val cmdDedup = stateValue.getCommandDedup
|
||||||
|
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
|
||||||
|
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
|
||||||
|
dedupTime.isBefore(submissionTime)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -6,8 +6,6 @@
|
|||||||
|
|
||||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||||
|
|
||||||
import java.time.Instant
|
|
||||||
|
|
||||||
import com.daml.ledger.configuration.Configuration
|
import com.daml.ledger.configuration.Configuration
|
||||||
import com.daml.ledger.participant.state.kvutils.Conversions._
|
import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||||
import com.daml.ledger.participant.state.kvutils.committer.Committer._
|
import com.daml.ledger.participant.state.kvutils.committer.Committer._
|
||||||
@ -17,12 +15,8 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.validatio
|
|||||||
ModelConformanceValidator,
|
ModelConformanceValidator,
|
||||||
TransactionConsistencyValidator,
|
TransactionConsistencyValidator,
|
||||||
}
|
}
|
||||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
|
||||||
DamlTransactionRejectionEntry,
|
|
||||||
Duplicate,
|
|
||||||
}
|
|
||||||
import com.daml.ledger.participant.state.kvutils.store.{
|
import com.daml.ledger.participant.state.kvutils.store.{
|
||||||
DamlCommandDedupValue,
|
|
||||||
DamlContractKeyState,
|
DamlContractKeyState,
|
||||||
DamlContractState,
|
DamlContractState,
|
||||||
DamlLogEntry,
|
DamlLogEntry,
|
||||||
@ -75,74 +69,19 @@ private[kvutils] class TransactionCommitter(
|
|||||||
override protected val steps: Steps[DamlTransactionEntrySummary] = Iterable(
|
override protected val steps: Steps[DamlTransactionEntrySummary] = Iterable(
|
||||||
"authorize_submitter" -> authorizeSubmitters,
|
"authorize_submitter" -> authorizeSubmitters,
|
||||||
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
|
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
|
||||||
"overwrite_deduplication_period" -> overwriteDeduplicationPeriodWithMaxDuration,
|
"overwrite_deduplication_period" -> CommandDeduplication
|
||||||
"deduplicate" -> deduplicateCommand,
|
.overwriteDeduplicationPeriodWithMaxDurationStep(defaultConfig),
|
||||||
|
"deduplicate" -> CommandDeduplication.deduplicateCommandStep(rejections),
|
||||||
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
|
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
|
||||||
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
|
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
|
||||||
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
|
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
|
||||||
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
|
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
|
||||||
|
"set_deduplication_entry" -> CommandDeduplication.setDeduplicationEntryStep(defaultConfig),
|
||||||
"blind" -> blind,
|
"blind" -> blind,
|
||||||
"trim_unnecessary_nodes" -> trimUnnecessaryNodes,
|
"trim_unnecessary_nodes" -> trimUnnecessaryNodes,
|
||||||
"build_final_log_entry" -> buildFinalLogEntry,
|
"build_final_log_entry" -> buildFinalLogEntry,
|
||||||
)
|
)
|
||||||
|
|
||||||
private[transaction] def overwriteDeduplicationPeriodWithMaxDuration: Step = new Step {
|
|
||||||
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
|
|
||||||
loggingContext: LoggingContext
|
|
||||||
): StepResult[DamlTransactionEntrySummary] = {
|
|
||||||
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
|
|
||||||
val submission = input.submission.toBuilder
|
|
||||||
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
|
|
||||||
buildDuration(currentConfig.maxDeduplicationTime)
|
|
||||||
)
|
|
||||||
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Reject duplicate commands
|
|
||||||
*/
|
|
||||||
private[transaction] def deduplicateCommand: Step = new Step {
|
|
||||||
def apply(
|
|
||||||
commitContext: CommitContext,
|
|
||||||
transactionEntry: DamlTransactionEntrySummary,
|
|
||||||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
|
||||||
commitContext.recordTime
|
|
||||||
.map { recordTime =>
|
|
||||||
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
|
|
||||||
val dedupEntry = commitContext.get(dedupKey)
|
|
||||||
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
|
|
||||||
StepContinue(transactionEntry)
|
|
||||||
} else {
|
|
||||||
rejections.reject(
|
|
||||||
DamlTransactionRejectionEntry.newBuilder
|
|
||||||
.setSubmitterInfo(transactionEntry.submitterInfo)
|
|
||||||
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
|
|
||||||
.setDefiniteAnswer(false)
|
|
||||||
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
|
|
||||||
"the command is a duplicate",
|
|
||||||
commitContext.recordTime,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.getOrElse(StepContinue(transactionEntry))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checks that the submission time of the command is after the
|
|
||||||
// deduplicationTime represented by stateValue
|
|
||||||
private def isAfterDeduplicationTime(
|
|
||||||
submissionTime: Instant,
|
|
||||||
stateValue: DamlStateValue,
|
|
||||||
): Boolean = {
|
|
||||||
val cmdDedup = stateValue.getCommandDedup
|
|
||||||
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
|
|
||||||
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
|
|
||||||
dedupTime.isBefore(submissionTime)
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Authorize the submission by looking up the party allocation and verifying
|
/** Authorize the submission by looking up the party allocation and verifying
|
||||||
* that all of the submitting parties are indeed hosted by the submitting participant.
|
* that all of the submitting parties are indeed hosted by the submitting participant.
|
||||||
*/
|
*/
|
||||||
@ -197,8 +136,6 @@ private[kvutils] class TransactionCommitter(
|
|||||||
commitContext: CommitContext,
|
commitContext: CommitContext,
|
||||||
transactionEntry: DamlTransactionEntrySummary,
|
transactionEntry: DamlTransactionEntrySummary,
|
||||||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
||||||
setDedupEntry(commitContext, transactionEntry)
|
|
||||||
|
|
||||||
val blindingInfo = Blinding.blind(transactionEntry.transaction)
|
val blindingInfo = Blinding.blind(transactionEntry.transaction)
|
||||||
|
|
||||||
val divulgedContracts =
|
val divulgedContracts =
|
||||||
@ -319,35 +256,6 @@ private[kvutils] class TransactionCommitter(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[transaction] def setDedupEntry(
|
|
||||||
commitContext: CommitContext,
|
|
||||||
transactionEntry: DamlTransactionEntrySummary,
|
|
||||||
)(implicit loggingContext: LoggingContext): Unit = {
|
|
||||||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
|
|
||||||
// Deduplication duration must be explicitly overwritten in a previous step
|
|
||||||
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
|
|
||||||
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
|
|
||||||
throw Err.InvalidSubmission("Deduplication duration is not set.")
|
|
||||||
}
|
|
||||||
val deduplicateUntil = Conversions.buildTimestamp(
|
|
||||||
transactionEntry.submissionTime
|
|
||||||
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
|
|
||||||
.add(config.timeModel.minSkew)
|
|
||||||
)
|
|
||||||
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
|
|
||||||
.setDeduplicatedUntil(deduplicateUntil)
|
|
||||||
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
|
|
||||||
// Set a deduplication entry.
|
|
||||||
commitContext.set(
|
|
||||||
commandDedupKey(transactionEntry.submitterInfo),
|
|
||||||
DamlStateValue.newBuilder
|
|
||||||
.setCommandDedup(
|
|
||||||
commandDedupBuilder.build
|
|
||||||
)
|
|
||||||
.build,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def updateContractStateAndFetchDivulgedContracts(
|
private def updateContractStateAndFetchDivulgedContracts(
|
||||||
transactionEntry: DamlTransactionEntrySummary,
|
transactionEntry: DamlTransactionEntrySummary,
|
||||||
blindingInfo: BlindingInfo,
|
blindingInfo: BlindingInfo,
|
||||||
|
@ -0,0 +1,243 @@
|
|||||||
|
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||||
|
|
||||||
|
import java.time
|
||||||
|
|
||||||
|
import com.codahale.metrics.MetricRegistry
|
||||||
|
import com.daml.ledger.configuration.Configuration
|
||||||
|
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.TestHelpers._
|
||||||
|
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||||
|
DamlConfigurationEntry,
|
||||||
|
DamlSubmitterInfo,
|
||||||
|
}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
|
||||||
|
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
|
||||||
|
import com.daml.lf.data.Time.Timestamp
|
||||||
|
import com.daml.logging.LoggingContext
|
||||||
|
import com.daml.metrics.Metrics
|
||||||
|
import com.google.protobuf
|
||||||
|
import org.mockito.MockitoSugar
|
||||||
|
import org.scalatest.Inside.inside
|
||||||
|
import org.scalatest.OptionValues
|
||||||
|
import org.scalatest.matchers.should.Matchers
|
||||||
|
import org.scalatest.prop.TableDrivenPropertyChecks._
|
||||||
|
import org.scalatest.wordspec.AnyWordSpec
|
||||||
|
|
||||||
|
import scala.annotation.nowarn
|
||||||
|
import scala.util.Random
|
||||||
|
|
||||||
|
@nowarn("msg=deprecated")
|
||||||
|
class CommandDeduplicationSpec
|
||||||
|
extends AnyWordSpec
|
||||||
|
with Matchers
|
||||||
|
with MockitoSugar
|
||||||
|
with OptionValues {
|
||||||
|
import CommandDeduplicationSpec._
|
||||||
|
|
||||||
|
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
|
||||||
|
|
||||||
|
private val metrics = new Metrics(new MetricRegistry)
|
||||||
|
private val rejections = new Rejections(metrics)
|
||||||
|
private val deduplicateCommandStep = CommandDeduplication.deduplicateCommandStep(rejections)
|
||||||
|
private val setDeduplicationEntryStep =
|
||||||
|
CommandDeduplication.setDeduplicationEntryStep(theDefaultConfig)
|
||||||
|
|
||||||
|
"deduplicateCommand" should {
|
||||||
|
"continue if record time is not available" in {
|
||||||
|
val context = createCommitContext(recordTime = None)
|
||||||
|
|
||||||
|
val actual =
|
||||||
|
deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||||
|
|
||||||
|
actual match {
|
||||||
|
case StepContinue(_) => succeed
|
||||||
|
case StepStop(_) => fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"continue if record time is available but no deduplication entry could be found" in {
|
||||||
|
val inputs = Map(aDedupKey -> None)
|
||||||
|
val context =
|
||||||
|
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
|
||||||
|
|
||||||
|
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||||
|
|
||||||
|
actual match {
|
||||||
|
case StepContinue(_) => succeed
|
||||||
|
case StepStop(_) => fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"continue if record time is after deduplication time in case a deduplication entry is found" in {
|
||||||
|
val dedupValue = newDedupValue(aRecordTime)
|
||||||
|
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||||
|
val context =
|
||||||
|
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
|
||||||
|
|
||||||
|
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||||
|
|
||||||
|
actual match {
|
||||||
|
case StepContinue(_) => succeed
|
||||||
|
case StepStop(_) => fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"produce rejection log entry in case record time is on or before deduplication time" in {
|
||||||
|
for (
|
||||||
|
(recordTime, deduplicationTime) <- Iterable(
|
||||||
|
(aRecordTime, aRecordTime),
|
||||||
|
(aRecordTime, aRecordTime.addMicros(1)),
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
val dedupValue = newDedupValue(deduplicationTime)
|
||||||
|
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||||
|
val context =
|
||||||
|
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
|
||||||
|
|
||||||
|
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||||
|
|
||||||
|
actual match {
|
||||||
|
case StepContinue(_) => fail()
|
||||||
|
case StepStop(actualLogEntry) =>
|
||||||
|
actualLogEntry.hasTransactionRejectionEntry shouldBe true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"setting dedup context" should {
|
||||||
|
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
|
||||||
|
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
|
||||||
|
val deduplicationDuration = time.Duration.ofSeconds(3)
|
||||||
|
|
||||||
|
"calculate deduplicate until based on deduplication duration" in {
|
||||||
|
val (context, transactionEntrySummary) =
|
||||||
|
buildContextAndTransaction(
|
||||||
|
submissionTime,
|
||||||
|
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||||
|
)
|
||||||
|
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||||
|
contextDeduplicateUntil(
|
||||||
|
context,
|
||||||
|
transactionEntrySummary,
|
||||||
|
).value shouldBe protobuf.Timestamp
|
||||||
|
.newBuilder()
|
||||||
|
.setSeconds(
|
||||||
|
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
"set the submission time in the committer context" in {
|
||||||
|
val (context, transactionEntrySummary) =
|
||||||
|
buildContextAndTransaction(
|
||||||
|
submissionTime,
|
||||||
|
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||||
|
)
|
||||||
|
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||||
|
context
|
||||||
|
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||||
|
.map(
|
||||||
|
_.getCommandDedup.getSubmissionTime
|
||||||
|
)
|
||||||
|
.value shouldBe submissionTime
|
||||||
|
}
|
||||||
|
|
||||||
|
"throw an error for unsupported deduplication periods" in {
|
||||||
|
forAll(
|
||||||
|
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
|
||||||
|
"deduplication setter",
|
||||||
|
_.clearDeduplicationPeriod(),
|
||||||
|
_.setDeduplicationOffset("offset"),
|
||||||
|
_.setDeduplicateUntil(deduplicateUntil),
|
||||||
|
)
|
||||||
|
) { deduplicationSetter =>
|
||||||
|
{
|
||||||
|
val (context, transactionEntrySummary) =
|
||||||
|
buildContextAndTransaction(submissionTime, deduplicationSetter)
|
||||||
|
a[Err.InvalidSubmission] shouldBe thrownBy(
|
||||||
|
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"overwriteDeduplicationPeriodWithMaxDuration" should {
|
||||||
|
"set max deduplication duration as deduplication period" in {
|
||||||
|
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
|
||||||
|
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
|
||||||
|
val commitContext = createCommitContext(
|
||||||
|
None,
|
||||||
|
Map(
|
||||||
|
Conversions.configurationStateKey -> None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
val result =
|
||||||
|
CommandDeduplication.overwriteDeduplicationPeriodWithMaxDurationStep(config)(
|
||||||
|
commitContext,
|
||||||
|
aTransactionEntrySummary,
|
||||||
|
)
|
||||||
|
inside(result) { case StepContinue(entry) =>
|
||||||
|
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
|
||||||
|
maxDeduplicationDuration
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
|
||||||
|
DamlStateValue.newBuilder
|
||||||
|
.setCommandDedup(
|
||||||
|
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
|
||||||
|
)
|
||||||
|
.build
|
||||||
|
}
|
||||||
|
|
||||||
|
object CommandDeduplicationSpec {
|
||||||
|
|
||||||
|
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
|
||||||
|
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
||||||
|
private val aRecordTime = Timestamp(100)
|
||||||
|
private val aDedupKey = Conversions
|
||||||
|
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
|
||||||
|
private val aDamlConfigurationStateValue = DamlStateValue.newBuilder
|
||||||
|
.setConfigurationEntry(
|
||||||
|
DamlConfigurationEntry.newBuilder
|
||||||
|
.setConfiguration(Configuration.encode(theDefaultConfig))
|
||||||
|
)
|
||||||
|
.build
|
||||||
|
|
||||||
|
private def buildContextAndTransaction(
|
||||||
|
submissionTime: protobuf.Timestamp,
|
||||||
|
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
|
||||||
|
) = {
|
||||||
|
val context = createCommitContext(None)
|
||||||
|
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
|
||||||
|
val transactionEntrySummary = DamlTransactionEntrySummary(
|
||||||
|
aDamlTransactionEntry.toBuilder
|
||||||
|
.setSubmitterInfo(
|
||||||
|
submitterInfoAugmenter(
|
||||||
|
DamlSubmitterInfo
|
||||||
|
.newBuilder()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setSubmissionTime(submissionTime)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
context -> transactionEntrySummary
|
||||||
|
}
|
||||||
|
|
||||||
|
private def contextDeduplicateUntil(
|
||||||
|
context: CommitContext,
|
||||||
|
transactionEntrySummary: DamlTransactionEntrySummary,
|
||||||
|
) = context
|
||||||
|
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||||
|
.map(
|
||||||
|
_.getCommandDedup.getDeduplicatedUntil
|
||||||
|
)
|
||||||
|
}
|
@ -3,48 +3,39 @@
|
|||||||
|
|
||||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||||
|
|
||||||
import java.time
|
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry
|
import com.codahale.metrics.MetricRegistry
|
||||||
import com.daml.ledger.configuration.Configuration
|
import com.daml.ledger.configuration.Configuration
|
||||||
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
|
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
|
||||||
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
|
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
|
||||||
import com.daml.ledger.participant.state.kvutils.TestHelpers._
|
import com.daml.ledger.participant.state.kvutils.TestHelpers._
|
||||||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
|
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
|
||||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||||
DamlConfigurationEntry,
|
DamlConfigurationEntry,
|
||||||
DamlSubmitterInfo,
|
|
||||||
DamlTransactionRejectionEntry,
|
DamlTransactionRejectionEntry,
|
||||||
}
|
}
|
||||||
import com.daml.ledger.participant.state.kvutils.store.{
|
import com.daml.ledger.participant.state.kvutils.store.{
|
||||||
DamlCommandDedupValue,
|
|
||||||
DamlPartyAllocation,
|
DamlPartyAllocation,
|
||||||
DamlStateKey,
|
DamlStateKey,
|
||||||
DamlStateValue,
|
DamlStateValue,
|
||||||
}
|
}
|
||||||
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, committer}
|
import com.daml.ledger.participant.state.kvutils.{Conversions, committer}
|
||||||
import com.daml.lf.data.Time.Timestamp
|
import com.daml.lf.data.Time.Timestamp
|
||||||
import com.daml.lf.data.{ImmArray, Ref}
|
import com.daml.lf.data.{ImmArray, Ref}
|
||||||
import com.daml.lf.engine.Engine
|
import com.daml.lf.engine.Engine
|
||||||
import com.daml.lf.transaction._
|
import com.daml.lf.transaction._
|
||||||
import com.daml.lf.transaction.test.TransactionBuilder
|
import com.daml.lf.transaction.test.TransactionBuilder
|
||||||
import com.daml.lf.transaction.Node
|
|
||||||
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
|
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
|
||||||
import com.daml.lf.value.{Value, ValueOuterClass}
|
import com.daml.lf.value.{Value, ValueOuterClass}
|
||||||
import com.daml.logging.LoggingContext
|
import com.daml.logging.LoggingContext
|
||||||
import com.daml.metrics.Metrics
|
import com.daml.metrics.Metrics
|
||||||
import com.google.protobuf
|
|
||||||
import com.google.protobuf.Duration
|
import com.google.protobuf.Duration
|
||||||
import org.mockito.MockitoSugar
|
import org.mockito.MockitoSugar
|
||||||
import org.scalatest.Inside.inside
|
|
||||||
import org.scalatest.OptionValues
|
import org.scalatest.OptionValues
|
||||||
import org.scalatest.matchers.should.Matchers
|
import org.scalatest.matchers.should.Matchers
|
||||||
import org.scalatest.prop.TableDrivenPropertyChecks._
|
|
||||||
import org.scalatest.wordspec.AnyWordSpec
|
import org.scalatest.wordspec.AnyWordSpec
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
import scala.annotation.nowarn
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
import scala.util.Random
|
|
||||||
|
|
||||||
@nowarn("msg=deprecated")
|
@nowarn("msg=deprecated")
|
||||||
class TransactionCommitterSpec
|
class TransactionCommitterSpec
|
||||||
@ -173,149 +164,6 @@ class TransactionCommitterSpec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"deduplicateCommand" should {
|
|
||||||
"continue if record time is not available" in {
|
|
||||||
val context = createCommitContext(recordTime = None)
|
|
||||||
|
|
||||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
|
||||||
|
|
||||||
actual match {
|
|
||||||
case StepContinue(_) => succeed
|
|
||||||
case StepStop(_) => fail()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"continue if record time is available but no deduplication entry could be found" in {
|
|
||||||
val inputs = Map(aDedupKey -> None)
|
|
||||||
val context =
|
|
||||||
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
|
|
||||||
|
|
||||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
|
||||||
|
|
||||||
actual match {
|
|
||||||
case StepContinue(_) => succeed
|
|
||||||
case StepStop(_) => fail()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"continue if record time is after deduplication time in case a deduplication entry is found" in {
|
|
||||||
val dedupValue = newDedupValue(aRecordTime)
|
|
||||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
|
||||||
val context =
|
|
||||||
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
|
|
||||||
|
|
||||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
|
||||||
|
|
||||||
actual match {
|
|
||||||
case StepContinue(_) => succeed
|
|
||||||
case StepStop(_) => fail()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"produce rejection log entry in case record time is on or before deduplication time" in {
|
|
||||||
for (
|
|
||||||
(recordTime, deduplicationTime) <- Iterable(
|
|
||||||
(aRecordTime, aRecordTime),
|
|
||||||
(aRecordTime, aRecordTime.addMicros(1)),
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
val dedupValue = newDedupValue(deduplicationTime)
|
|
||||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
|
||||||
val context =
|
|
||||||
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
|
|
||||||
|
|
||||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
|
||||||
|
|
||||||
actual match {
|
|
||||||
case StepContinue(_) => fail()
|
|
||||||
case StepStop(actualLogEntry) =>
|
|
||||||
actualLogEntry.hasTransactionRejectionEntry shouldBe true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"setting dedup context" should {
|
|
||||||
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
|
|
||||||
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
|
|
||||||
val deduplicationDuration = time.Duration.ofSeconds(3)
|
|
||||||
|
|
||||||
"calculate deduplicate until based on deduplication duration" in {
|
|
||||||
val (context, transactionEntrySummary) =
|
|
||||||
buildContextAndTransaction(
|
|
||||||
submissionTime,
|
|
||||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
|
||||||
)
|
|
||||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
|
||||||
contextDeduplicateUntil(
|
|
||||||
context,
|
|
||||||
transactionEntrySummary,
|
|
||||||
).value shouldBe protobuf.Timestamp
|
|
||||||
.newBuilder()
|
|
||||||
.setSeconds(
|
|
||||||
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
|
|
||||||
)
|
|
||||||
.build()
|
|
||||||
}
|
|
||||||
|
|
||||||
"set the submission time in the committer context" in {
|
|
||||||
val (context, transactionEntrySummary) =
|
|
||||||
buildContextAndTransaction(
|
|
||||||
submissionTime,
|
|
||||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
|
||||||
)
|
|
||||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
|
||||||
context
|
|
||||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
|
||||||
.map(
|
|
||||||
_.getCommandDedup.getSubmissionTime
|
|
||||||
)
|
|
||||||
.value shouldBe submissionTime
|
|
||||||
}
|
|
||||||
|
|
||||||
"throw an error for unsupported deduplication periods" in {
|
|
||||||
forAll(
|
|
||||||
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
|
|
||||||
"deduplication setter",
|
|
||||||
_.clearDeduplicationPeriod(),
|
|
||||||
_.setDeduplicationOffset("offset"),
|
|
||||||
_.setDeduplicateUntil(deduplicateUntil),
|
|
||||||
)
|
|
||||||
) { deduplicationSetter =>
|
|
||||||
{
|
|
||||||
val (context, transactionEntrySummary) =
|
|
||||||
buildContextAndTransaction(submissionTime, deduplicationSetter)
|
|
||||||
a[Err.InvalidSubmission] shouldBe thrownBy(
|
|
||||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"overwriteDeduplicationPeriodWithMaxDuration" should {
|
|
||||||
"set max deduplication duration as deduplication period" in {
|
|
||||||
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
|
|
||||||
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
|
|
||||||
val commitContext = createCommitContext(
|
|
||||||
None,
|
|
||||||
Map(
|
|
||||||
Conversions.configurationStateKey -> None
|
|
||||||
),
|
|
||||||
)
|
|
||||||
val committer = createTransactionCommitter(config)
|
|
||||||
val result = committer.overwriteDeduplicationPeriodWithMaxDuration(
|
|
||||||
commitContext,
|
|
||||||
aTransactionEntrySummary,
|
|
||||||
)
|
|
||||||
inside(result) { case StepContinue(entry) =>
|
|
||||||
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
|
|
||||||
maxDeduplicationDuration
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"buildLogEntry" should {
|
"buildLogEntry" should {
|
||||||
"set record time in log entry when it is available" in {
|
"set record time in log entry when it is available" in {
|
||||||
val context = createCommitContext(recordTime = Some(theRecordTime))
|
val context = createCommitContext(recordTime = Some(theRecordTime))
|
||||||
@ -426,13 +274,6 @@ class TransactionCommitterSpec
|
|||||||
metrics,
|
metrics,
|
||||||
)
|
)
|
||||||
|
|
||||||
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
|
|
||||||
DamlStateValue.newBuilder
|
|
||||||
.setCommandDedup(
|
|
||||||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
|
|
||||||
)
|
|
||||||
.build
|
|
||||||
|
|
||||||
private def create(
|
private def create(
|
||||||
contractId: ContractId,
|
contractId: ContractId,
|
||||||
signatories: Set[Ref.Party] = Set(aKeyMaintainer),
|
signatories: Set[Ref.Party] = Set(aKeyMaintainer),
|
||||||
@ -474,8 +315,6 @@ object TransactionCommitterSpec {
|
|||||||
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
|
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
|
||||||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
||||||
private val aRecordTime = Timestamp(100)
|
private val aRecordTime = Timestamp(100)
|
||||||
private val aDedupKey = Conversions
|
|
||||||
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
|
|
||||||
private val aDummyValue = TransactionBuilder.record("field" -> "value")
|
private val aDummyValue = TransactionBuilder.record("field" -> "value")
|
||||||
private val aKey = "key"
|
private val aKey = "key"
|
||||||
private val aKeyMaintainer = "maintainer"
|
private val aKeyMaintainer = "maintainer"
|
||||||
@ -622,32 +461,4 @@ object TransactionCommitterSpec {
|
|||||||
private def lookupByKeyNodeBuilder =
|
private def lookupByKeyNodeBuilder =
|
||||||
TransactionOuterClass.NodeLookupByKey.newBuilder()
|
TransactionOuterClass.NodeLookupByKey.newBuilder()
|
||||||
|
|
||||||
private def buildContextAndTransaction(
|
|
||||||
submissionTime: protobuf.Timestamp,
|
|
||||||
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
|
|
||||||
) = {
|
|
||||||
val context = createCommitContext(None)
|
|
||||||
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
|
|
||||||
val transactionEntrySummary = DamlTransactionEntrySummary(
|
|
||||||
aDamlTransactionEntry.toBuilder
|
|
||||||
.setSubmitterInfo(
|
|
||||||
submitterInfoAugmenter(
|
|
||||||
DamlSubmitterInfo
|
|
||||||
.newBuilder()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.setSubmissionTime(submissionTime)
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
context -> transactionEntrySummary
|
|
||||||
}
|
|
||||||
|
|
||||||
private def contextDeduplicateUntil(
|
|
||||||
context: CommitContext,
|
|
||||||
transactionEntrySummary: DamlTransactionEntrySummary,
|
|
||||||
) = context
|
|
||||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
|
||||||
.map(
|
|
||||||
_.getCommandDedup.getDeduplicatedUntil
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user