mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
[kvutils] - Extract command deduplication steps from the TransactionCommitter [KVL-1174] (#11547)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
0e95ccb354
commit
7bc0f82733
@ -0,0 +1,117 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import com.daml.ledger.configuration.Configuration
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{
|
||||
buildDuration,
|
||||
commandDedupKey,
|
||||
parseDuration,
|
||||
parseTimestamp,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
|
||||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
|
||||
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||
DamlTransactionRejectionEntry,
|
||||
Duplicate,
|
||||
}
|
||||
import com.daml.logging.LoggingContext
|
||||
|
||||
private[transaction] object CommandDeduplication {
|
||||
|
||||
def overwriteDeduplicationPeriodWithMaxDurationStep(
|
||||
defaultConfig: Configuration
|
||||
): Step = new Step {
|
||||
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): StepResult[DamlTransactionEntrySummary] = {
|
||||
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
|
||||
val submission = input.submission.toBuilder
|
||||
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
|
||||
buildDuration(currentConfig.maxDeduplicationTime)
|
||||
)
|
||||
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
|
||||
}
|
||||
}
|
||||
|
||||
/** Reject duplicate commands
|
||||
*/
|
||||
def deduplicateCommandStep(rejections: Rejections): Step = new Step {
|
||||
def apply(
|
||||
commitContext: CommitContext,
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
||||
commitContext.recordTime
|
||||
.map { recordTime =>
|
||||
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
|
||||
val dedupEntry = commitContext.get(dedupKey)
|
||||
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
|
||||
StepContinue(transactionEntry)
|
||||
} else {
|
||||
rejections.reject(
|
||||
DamlTransactionRejectionEntry.newBuilder
|
||||
.setSubmitterInfo(transactionEntry.submitterInfo)
|
||||
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
|
||||
.setDefiniteAnswer(false)
|
||||
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
|
||||
"the command is a duplicate",
|
||||
commitContext.recordTime,
|
||||
)
|
||||
}
|
||||
}
|
||||
.getOrElse(StepContinue(transactionEntry))
|
||||
}
|
||||
}
|
||||
|
||||
def setDeduplicationEntryStep(defaultConfig: Configuration): Step =
|
||||
new Step {
|
||||
def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)(
|
||||
implicit loggingContext: LoggingContext
|
||||
): StepResult[DamlTransactionEntrySummary] = {
|
||||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
|
||||
// Deduplication duration must be explicitly overwritten in a previous step
|
||||
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
|
||||
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
|
||||
throw Err.InvalidSubmission("Deduplication duration is not set.")
|
||||
}
|
||||
val deduplicateUntil = Conversions.buildTimestamp(
|
||||
transactionEntry.submissionTime
|
||||
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
|
||||
.add(config.timeModel.minSkew)
|
||||
)
|
||||
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
|
||||
.setDeduplicatedUntil(deduplicateUntil)
|
||||
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
|
||||
// Set a deduplication entry.
|
||||
commitContext.set(
|
||||
commandDedupKey(transactionEntry.submitterInfo),
|
||||
DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
commandDedupBuilder.build
|
||||
)
|
||||
.build,
|
||||
)
|
||||
StepContinue(transactionEntry)
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that the submission time of the command is after the
|
||||
// deduplicationTime represented by stateValue
|
||||
private def isAfterDeduplicationTime(
|
||||
submissionTime: Instant,
|
||||
stateValue: DamlStateValue,
|
||||
): Boolean = {
|
||||
val cmdDedup = stateValue.getCommandDedup
|
||||
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
|
||||
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
|
||||
dedupTime.isBefore(submissionTime)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
@ -6,8 +6,6 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import com.daml.ledger.configuration.Configuration
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.Committer._
|
||||
@ -17,12 +15,8 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.validatio
|
||||
ModelConformanceValidator,
|
||||
TransactionConsistencyValidator,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||
DamlTransactionRejectionEntry,
|
||||
Duplicate,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
|
||||
import com.daml.ledger.participant.state.kvutils.store.{
|
||||
DamlCommandDedupValue,
|
||||
DamlContractKeyState,
|
||||
DamlContractState,
|
||||
DamlLogEntry,
|
||||
@ -75,74 +69,19 @@ private[kvutils] class TransactionCommitter(
|
||||
override protected val steps: Steps[DamlTransactionEntrySummary] = Iterable(
|
||||
"authorize_submitter" -> authorizeSubmitters,
|
||||
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
|
||||
"overwrite_deduplication_period" -> overwriteDeduplicationPeriodWithMaxDuration,
|
||||
"deduplicate" -> deduplicateCommand,
|
||||
"overwrite_deduplication_period" -> CommandDeduplication
|
||||
.overwriteDeduplicationPeriodWithMaxDurationStep(defaultConfig),
|
||||
"deduplicate" -> CommandDeduplication.deduplicateCommandStep(rejections),
|
||||
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
|
||||
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
|
||||
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
|
||||
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
|
||||
"set_deduplication_entry" -> CommandDeduplication.setDeduplicationEntryStep(defaultConfig),
|
||||
"blind" -> blind,
|
||||
"trim_unnecessary_nodes" -> trimUnnecessaryNodes,
|
||||
"build_final_log_entry" -> buildFinalLogEntry,
|
||||
)
|
||||
|
||||
private[transaction] def overwriteDeduplicationPeriodWithMaxDuration: Step = new Step {
|
||||
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
|
||||
loggingContext: LoggingContext
|
||||
): StepResult[DamlTransactionEntrySummary] = {
|
||||
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
|
||||
val submission = input.submission.toBuilder
|
||||
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
|
||||
buildDuration(currentConfig.maxDeduplicationTime)
|
||||
)
|
||||
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
|
||||
}
|
||||
}
|
||||
|
||||
/** Reject duplicate commands
|
||||
*/
|
||||
private[transaction] def deduplicateCommand: Step = new Step {
|
||||
def apply(
|
||||
commitContext: CommitContext,
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
||||
commitContext.recordTime
|
||||
.map { recordTime =>
|
||||
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
|
||||
val dedupEntry = commitContext.get(dedupKey)
|
||||
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
|
||||
StepContinue(transactionEntry)
|
||||
} else {
|
||||
rejections.reject(
|
||||
DamlTransactionRejectionEntry.newBuilder
|
||||
.setSubmitterInfo(transactionEntry.submitterInfo)
|
||||
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
|
||||
.setDefiniteAnswer(false)
|
||||
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
|
||||
"the command is a duplicate",
|
||||
commitContext.recordTime,
|
||||
)
|
||||
}
|
||||
}
|
||||
.getOrElse(StepContinue(transactionEntry))
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that the submission time of the command is after the
|
||||
// deduplicationTime represented by stateValue
|
||||
private def isAfterDeduplicationTime(
|
||||
submissionTime: Instant,
|
||||
stateValue: DamlStateValue,
|
||||
): Boolean = {
|
||||
val cmdDedup = stateValue.getCommandDedup
|
||||
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
|
||||
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
|
||||
dedupTime.isBefore(submissionTime)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/** Authorize the submission by looking up the party allocation and verifying
|
||||
* that all of the submitting parties are indeed hosted by the submitting participant.
|
||||
*/
|
||||
@ -197,8 +136,6 @@ private[kvutils] class TransactionCommitter(
|
||||
commitContext: CommitContext,
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
|
||||
setDedupEntry(commitContext, transactionEntry)
|
||||
|
||||
val blindingInfo = Blinding.blind(transactionEntry.transaction)
|
||||
|
||||
val divulgedContracts =
|
||||
@ -319,35 +256,6 @@ private[kvutils] class TransactionCommitter(
|
||||
}
|
||||
}
|
||||
|
||||
private[transaction] def setDedupEntry(
|
||||
commitContext: CommitContext,
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
)(implicit loggingContext: LoggingContext): Unit = {
|
||||
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
|
||||
// Deduplication duration must be explicitly overwritten in a previous step
|
||||
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
|
||||
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
|
||||
throw Err.InvalidSubmission("Deduplication duration is not set.")
|
||||
}
|
||||
val deduplicateUntil = Conversions.buildTimestamp(
|
||||
transactionEntry.submissionTime
|
||||
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
|
||||
.add(config.timeModel.minSkew)
|
||||
)
|
||||
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
|
||||
.setDeduplicatedUntil(deduplicateUntil)
|
||||
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
|
||||
// Set a deduplication entry.
|
||||
commitContext.set(
|
||||
commandDedupKey(transactionEntry.submitterInfo),
|
||||
DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
commandDedupBuilder.build
|
||||
)
|
||||
.build,
|
||||
)
|
||||
}
|
||||
|
||||
private def updateContractStateAndFetchDivulgedContracts(
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
blindingInfo: BlindingInfo,
|
||||
|
@ -0,0 +1,243 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||
|
||||
import java.time
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.configuration.Configuration
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
|
||||
import com.daml.ledger.participant.state.kvutils.TestHelpers._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||
DamlConfigurationEntry,
|
||||
DamlSubmitterInfo,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.google.protobuf
|
||||
import org.mockito.MockitoSugar
|
||||
import org.scalatest.Inside.inside
|
||||
import org.scalatest.OptionValues
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.prop.TableDrivenPropertyChecks._
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import scala.util.Random
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
class CommandDeduplicationSpec
|
||||
extends AnyWordSpec
|
||||
with Matchers
|
||||
with MockitoSugar
|
||||
with OptionValues {
|
||||
import CommandDeduplicationSpec._
|
||||
|
||||
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
|
||||
|
||||
private val metrics = new Metrics(new MetricRegistry)
|
||||
private val rejections = new Rejections(metrics)
|
||||
private val deduplicateCommandStep = CommandDeduplication.deduplicateCommandStep(rejections)
|
||||
private val setDeduplicationEntryStep =
|
||||
CommandDeduplication.setDeduplicationEntryStep(theDefaultConfig)
|
||||
|
||||
"deduplicateCommand" should {
|
||||
"continue if record time is not available" in {
|
||||
val context = createCommitContext(recordTime = None)
|
||||
|
||||
val actual =
|
||||
deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"continue if record time is available but no deduplication entry could be found" in {
|
||||
val inputs = Map(aDedupKey -> None)
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
|
||||
|
||||
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"continue if record time is after deduplication time in case a deduplication entry is found" in {
|
||||
val dedupValue = newDedupValue(aRecordTime)
|
||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
|
||||
|
||||
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"produce rejection log entry in case record time is on or before deduplication time" in {
|
||||
for (
|
||||
(recordTime, deduplicationTime) <- Iterable(
|
||||
(aRecordTime, aRecordTime),
|
||||
(aRecordTime, aRecordTime.addMicros(1)),
|
||||
)
|
||||
) {
|
||||
val dedupValue = newDedupValue(deduplicationTime)
|
||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
|
||||
|
||||
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => fail()
|
||||
case StepStop(actualLogEntry) =>
|
||||
actualLogEntry.hasTransactionRejectionEntry shouldBe true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"setting dedup context" should {
|
||||
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
|
||||
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
|
||||
val deduplicationDuration = time.Duration.ofSeconds(3)
|
||||
|
||||
"calculate deduplicate until based on deduplication duration" in {
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
submissionTime,
|
||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||
)
|
||||
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||
contextDeduplicateUntil(
|
||||
context,
|
||||
transactionEntrySummary,
|
||||
).value shouldBe protobuf.Timestamp
|
||||
.newBuilder()
|
||||
.setSeconds(
|
||||
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
|
||||
)
|
||||
.build()
|
||||
}
|
||||
|
||||
"set the submission time in the committer context" in {
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
submissionTime,
|
||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||
)
|
||||
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||
context
|
||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||
.map(
|
||||
_.getCommandDedup.getSubmissionTime
|
||||
)
|
||||
.value shouldBe submissionTime
|
||||
}
|
||||
|
||||
"throw an error for unsupported deduplication periods" in {
|
||||
forAll(
|
||||
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
|
||||
"deduplication setter",
|
||||
_.clearDeduplicationPeriod(),
|
||||
_.setDeduplicationOffset("offset"),
|
||||
_.setDeduplicateUntil(deduplicateUntil),
|
||||
)
|
||||
) { deduplicationSetter =>
|
||||
{
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(submissionTime, deduplicationSetter)
|
||||
a[Err.InvalidSubmission] shouldBe thrownBy(
|
||||
setDeduplicationEntryStep(context, transactionEntrySummary)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"overwriteDeduplicationPeriodWithMaxDuration" should {
|
||||
"set max deduplication duration as deduplication period" in {
|
||||
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
|
||||
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
|
||||
val commitContext = createCommitContext(
|
||||
None,
|
||||
Map(
|
||||
Conversions.configurationStateKey -> None
|
||||
),
|
||||
)
|
||||
val result =
|
||||
CommandDeduplication.overwriteDeduplicationPeriodWithMaxDurationStep(config)(
|
||||
commitContext,
|
||||
aTransactionEntrySummary,
|
||||
)
|
||||
inside(result) { case StepContinue(entry) =>
|
||||
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
|
||||
maxDeduplicationDuration
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
|
||||
DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
|
||||
)
|
||||
.build
|
||||
}
|
||||
|
||||
object CommandDeduplicationSpec {
|
||||
|
||||
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
|
||||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
||||
private val aRecordTime = Timestamp(100)
|
||||
private val aDedupKey = Conversions
|
||||
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
|
||||
private val aDamlConfigurationStateValue = DamlStateValue.newBuilder
|
||||
.setConfigurationEntry(
|
||||
DamlConfigurationEntry.newBuilder
|
||||
.setConfiguration(Configuration.encode(theDefaultConfig))
|
||||
)
|
||||
.build
|
||||
|
||||
private def buildContextAndTransaction(
|
||||
submissionTime: protobuf.Timestamp,
|
||||
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
|
||||
) = {
|
||||
val context = createCommitContext(None)
|
||||
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
|
||||
val transactionEntrySummary = DamlTransactionEntrySummary(
|
||||
aDamlTransactionEntry.toBuilder
|
||||
.setSubmitterInfo(
|
||||
submitterInfoAugmenter(
|
||||
DamlSubmitterInfo
|
||||
.newBuilder()
|
||||
)
|
||||
)
|
||||
.setSubmissionTime(submissionTime)
|
||||
.build()
|
||||
)
|
||||
context -> transactionEntrySummary
|
||||
}
|
||||
|
||||
private def contextDeduplicateUntil(
|
||||
context: CommitContext,
|
||||
transactionEntrySummary: DamlTransactionEntrySummary,
|
||||
) = context
|
||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||
.map(
|
||||
_.getCommandDedup.getDeduplicatedUntil
|
||||
)
|
||||
}
|
@ -3,48 +3,39 @@
|
||||
|
||||
package com.daml.ledger.participant.state.kvutils.committer.transaction
|
||||
|
||||
import java.time
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.configuration.Configuration
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
|
||||
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
|
||||
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
|
||||
import com.daml.ledger.participant.state.kvutils.TestHelpers._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
|
||||
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
|
||||
import com.daml.ledger.participant.state.kvutils.store.events.{
|
||||
DamlConfigurationEntry,
|
||||
DamlSubmitterInfo,
|
||||
DamlTransactionRejectionEntry,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.store.{
|
||||
DamlCommandDedupValue,
|
||||
DamlPartyAllocation,
|
||||
DamlStateKey,
|
||||
DamlStateValue,
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, committer}
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, committer}
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.lf.data.{ImmArray, Ref}
|
||||
import com.daml.lf.engine.Engine
|
||||
import com.daml.lf.transaction._
|
||||
import com.daml.lf.transaction.test.TransactionBuilder
|
||||
import com.daml.lf.transaction.Node
|
||||
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
|
||||
import com.daml.lf.value.{Value, ValueOuterClass}
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.google.protobuf
|
||||
import com.google.protobuf.Duration
|
||||
import org.mockito.MockitoSugar
|
||||
import org.scalatest.Inside.inside
|
||||
import org.scalatest.OptionValues
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.prop.TableDrivenPropertyChecks._
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.Random
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
class TransactionCommitterSpec
|
||||
@ -173,149 +164,6 @@ class TransactionCommitterSpec
|
||||
}
|
||||
}
|
||||
|
||||
"deduplicateCommand" should {
|
||||
"continue if record time is not available" in {
|
||||
val context = createCommitContext(recordTime = None)
|
||||
|
||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"continue if record time is available but no deduplication entry could be found" in {
|
||||
val inputs = Map(aDedupKey -> None)
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
|
||||
|
||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"continue if record time is after deduplication time in case a deduplication entry is found" in {
|
||||
val dedupValue = newDedupValue(aRecordTime)
|
||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
|
||||
|
||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => succeed
|
||||
case StepStop(_) => fail()
|
||||
}
|
||||
}
|
||||
|
||||
"produce rejection log entry in case record time is on or before deduplication time" in {
|
||||
for (
|
||||
(recordTime, deduplicationTime) <- Iterable(
|
||||
(aRecordTime, aRecordTime),
|
||||
(aRecordTime, aRecordTime.addMicros(1)),
|
||||
)
|
||||
) {
|
||||
val dedupValue = newDedupValue(deduplicationTime)
|
||||
val inputs = Map(aDedupKey -> Some(dedupValue))
|
||||
val context =
|
||||
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
|
||||
|
||||
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
actual match {
|
||||
case StepContinue(_) => fail()
|
||||
case StepStop(actualLogEntry) =>
|
||||
actualLogEntry.hasTransactionRejectionEntry shouldBe true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"setting dedup context" should {
|
||||
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
|
||||
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
|
||||
val deduplicationDuration = time.Duration.ofSeconds(3)
|
||||
|
||||
"calculate deduplicate until based on deduplication duration" in {
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
submissionTime,
|
||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||
)
|
||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
||||
contextDeduplicateUntil(
|
||||
context,
|
||||
transactionEntrySummary,
|
||||
).value shouldBe protobuf.Timestamp
|
||||
.newBuilder()
|
||||
.setSeconds(
|
||||
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
|
||||
)
|
||||
.build()
|
||||
}
|
||||
|
||||
"set the submission time in the committer context" in {
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(
|
||||
submissionTime,
|
||||
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
|
||||
)
|
||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
||||
context
|
||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||
.map(
|
||||
_.getCommandDedup.getSubmissionTime
|
||||
)
|
||||
.value shouldBe submissionTime
|
||||
}
|
||||
|
||||
"throw an error for unsupported deduplication periods" in {
|
||||
forAll(
|
||||
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
|
||||
"deduplication setter",
|
||||
_.clearDeduplicationPeriod(),
|
||||
_.setDeduplicationOffset("offset"),
|
||||
_.setDeduplicateUntil(deduplicateUntil),
|
||||
)
|
||||
) { deduplicationSetter =>
|
||||
{
|
||||
val (context, transactionEntrySummary) =
|
||||
buildContextAndTransaction(submissionTime, deduplicationSetter)
|
||||
a[Err.InvalidSubmission] shouldBe thrownBy(
|
||||
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"overwriteDeduplicationPeriodWithMaxDuration" should {
|
||||
"set max deduplication duration as deduplication period" in {
|
||||
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
|
||||
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
|
||||
val commitContext = createCommitContext(
|
||||
None,
|
||||
Map(
|
||||
Conversions.configurationStateKey -> None
|
||||
),
|
||||
)
|
||||
val committer = createTransactionCommitter(config)
|
||||
val result = committer.overwriteDeduplicationPeriodWithMaxDuration(
|
||||
commitContext,
|
||||
aTransactionEntrySummary,
|
||||
)
|
||||
inside(result) { case StepContinue(entry) =>
|
||||
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
|
||||
maxDeduplicationDuration
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"buildLogEntry" should {
|
||||
"set record time in log entry when it is available" in {
|
||||
val context = createCommitContext(recordTime = Some(theRecordTime))
|
||||
@ -426,13 +274,6 @@ class TransactionCommitterSpec
|
||||
metrics,
|
||||
)
|
||||
|
||||
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
|
||||
DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
|
||||
)
|
||||
.build
|
||||
|
||||
private def create(
|
||||
contractId: ContractId,
|
||||
signatories: Set[Ref.Party] = Set(aKeyMaintainer),
|
||||
@ -474,8 +315,6 @@ object TransactionCommitterSpec {
|
||||
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
|
||||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
||||
private val aRecordTime = Timestamp(100)
|
||||
private val aDedupKey = Conversions
|
||||
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
|
||||
private val aDummyValue = TransactionBuilder.record("field" -> "value")
|
||||
private val aKey = "key"
|
||||
private val aKeyMaintainer = "maintainer"
|
||||
@ -622,32 +461,4 @@ object TransactionCommitterSpec {
|
||||
private def lookupByKeyNodeBuilder =
|
||||
TransactionOuterClass.NodeLookupByKey.newBuilder()
|
||||
|
||||
private def buildContextAndTransaction(
|
||||
submissionTime: protobuf.Timestamp,
|
||||
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
|
||||
) = {
|
||||
val context = createCommitContext(None)
|
||||
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
|
||||
val transactionEntrySummary = DamlTransactionEntrySummary(
|
||||
aDamlTransactionEntry.toBuilder
|
||||
.setSubmitterInfo(
|
||||
submitterInfoAugmenter(
|
||||
DamlSubmitterInfo
|
||||
.newBuilder()
|
||||
)
|
||||
)
|
||||
.setSubmissionTime(submissionTime)
|
||||
.build()
|
||||
)
|
||||
context -> transactionEntrySummary
|
||||
}
|
||||
|
||||
private def contextDeduplicateUntil(
|
||||
context: CommitContext,
|
||||
transactionEntrySummary: DamlTransactionEntrySummary,
|
||||
) = context
|
||||
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
|
||||
.map(
|
||||
_.getCommandDedup.getDeduplicatedUntil
|
||||
)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user