[kvutils] - Extract command deduplication steps from the TransactionCommitter [KVL-1174] (#11547)

CHANGELOG_BEGIN

CHANGELOG_END
This commit is contained in:
nicu-da 2021-11-04 09:29:04 -07:00 committed by GitHub
parent 0e95ccb354
commit 7bc0f82733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 368 additions and 289 deletions

View File

@ -0,0 +1,117 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committer.transaction
import java.time.Instant
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.kvutils.Conversions.{
buildDuration,
commandDedupKey,
parseDuration,
parseTimestamp,
}
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlTransactionRejectionEntry,
Duplicate,
}
import com.daml.logging.LoggingContext
private[transaction] object CommandDeduplication {
def overwriteDeduplicationPeriodWithMaxDurationStep(
defaultConfig: Configuration
): Step = new Step {
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
val submission = input.submission.toBuilder
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
buildDuration(currentConfig.maxDeduplicationTime)
)
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
}
}
/** Reject duplicate commands
*/
def deduplicateCommandStep(rejections: Rejections): Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime
.map { recordTime =>
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val dedupEntry = commitContext.get(dedupKey)
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
StepContinue(transactionEntry)
} else {
rejections.reject(
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
.setDefiniteAnswer(false)
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
"the command is a duplicate",
commitContext.recordTime,
)
}
}
.getOrElse(StepContinue(transactionEntry))
}
}
def setDeduplicationEntryStep(defaultConfig: Configuration): Step =
new Step {
def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)(
implicit loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
// Deduplication duration must be explicitly overwritten in a previous step
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
throw Err.InvalidSubmission("Deduplication duration is not set.")
}
val deduplicateUntil = Conversions.buildTimestamp(
transactionEntry.submissionTime
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
.add(config.timeModel.minSkew)
)
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
.setDeduplicatedUntil(deduplicateUntil)
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
// Set a deduplication entry.
commitContext.set(
commandDedupKey(transactionEntry.submitterInfo),
DamlStateValue.newBuilder
.setCommandDedup(
commandDedupBuilder.build
)
.build,
)
StepContinue(transactionEntry)
}
}
// Checks that the submission time of the command is after the
// deduplicationTime represented by stateValue
private def isAfterDeduplicationTime(
submissionTime: Instant,
stateValue: DamlStateValue,
): Boolean = {
val cmdDedup = stateValue.getCommandDedup
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
dedupTime.isBefore(submissionTime)
} else {
false
}
}
}

View File

@ -6,8 +6,6 @@
package com.daml.ledger.participant.state.kvutils.committer.transaction
import java.time.Instant
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
@ -17,12 +15,8 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.validatio
ModelConformanceValidator,
TransactionConsistencyValidator,
}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlTransactionRejectionEntry,
Duplicate,
}
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{
DamlCommandDedupValue,
DamlContractKeyState,
DamlContractState,
DamlLogEntry,
@ -75,74 +69,19 @@ private[kvutils] class TransactionCommitter(
override protected val steps: Steps[DamlTransactionEntrySummary] = Iterable(
"authorize_submitter" -> authorizeSubmitters,
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
"overwrite_deduplication_period" -> overwriteDeduplicationPeriodWithMaxDuration,
"deduplicate" -> deduplicateCommand,
"overwrite_deduplication_period" -> CommandDeduplication
.overwriteDeduplicationPeriodWithMaxDurationStep(defaultConfig),
"deduplicate" -> CommandDeduplication.deduplicateCommandStep(rejections),
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
"set_deduplication_entry" -> CommandDeduplication.setDeduplicationEntryStep(defaultConfig),
"blind" -> blind,
"trim_unnecessary_nodes" -> trimUnnecessaryNodes,
"build_final_log_entry" -> buildFinalLogEntry,
)
private[transaction] def overwriteDeduplicationPeriodWithMaxDuration: Step = new Step {
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
val submission = input.submission.toBuilder
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
buildDuration(currentConfig.maxDeduplicationTime)
)
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
}
}
/** Reject duplicate commands
*/
private[transaction] def deduplicateCommand: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime
.map { recordTime =>
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val dedupEntry = commitContext.get(dedupKey)
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
StepContinue(transactionEntry)
} else {
rejections.reject(
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
.setDefiniteAnswer(false)
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
"the command is a duplicate",
commitContext.recordTime,
)
}
}
.getOrElse(StepContinue(transactionEntry))
}
}
// Checks that the submission time of the command is after the
// deduplicationTime represented by stateValue
private def isAfterDeduplicationTime(
submissionTime: Instant,
stateValue: DamlStateValue,
): Boolean = {
val cmdDedup = stateValue.getCommandDedup
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
dedupTime.isBefore(submissionTime)
} else {
false
}
}
/** Authorize the submission by looking up the party allocation and verifying
* that all of the submitting parties are indeed hosted by the submitting participant.
*/
@ -197,8 +136,6 @@ private[kvutils] class TransactionCommitter(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
setDedupEntry(commitContext, transactionEntry)
val blindingInfo = Blinding.blind(transactionEntry.transaction)
val divulgedContracts =
@ -319,35 +256,6 @@ private[kvutils] class TransactionCommitter(
}
}
private[transaction] def setDedupEntry(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): Unit = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
// Deduplication duration must be explicitly overwritten in a previous step
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
throw Err.InvalidSubmission("Deduplication duration is not set.")
}
val deduplicateUntil = Conversions.buildTimestamp(
transactionEntry.submissionTime
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
.add(config.timeModel.minSkew)
)
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
.setDeduplicatedUntil(deduplicateUntil)
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
// Set a deduplication entry.
commitContext.set(
commandDedupKey(transactionEntry.submitterInfo),
DamlStateValue.newBuilder
.setCommandDedup(
commandDedupBuilder.build
)
.build,
)
}
private def updateContractStateAndFetchDivulgedContracts(
transactionEntry: DamlTransactionEntrySummary,
blindingInfo: BlindingInfo,

View File

@ -0,0 +1,243 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.committer.transaction
import java.time
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
DamlSubmitterInfo,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.google.protobuf
import org.mockito.MockitoSugar
import org.scalatest.Inside.inside
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.wordspec.AnyWordSpec
import scala.annotation.nowarn
import scala.util.Random
@nowarn("msg=deprecated")
class CommandDeduplicationSpec
extends AnyWordSpec
with Matchers
with MockitoSugar
with OptionValues {
import CommandDeduplicationSpec._
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private val metrics = new Metrics(new MetricRegistry)
private val rejections = new Rejections(metrics)
private val deduplicateCommandStep = CommandDeduplication.deduplicateCommandStep(rejections)
private val setDeduplicationEntryStep =
CommandDeduplication.setDeduplicationEntryStep(theDefaultConfig)
"deduplicateCommand" should {
"continue if record time is not available" in {
val context = createCommitContext(recordTime = None)
val actual =
deduplicateCommandStep(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"continue if record time is available but no deduplication entry could be found" in {
val inputs = Map(aDedupKey -> None)
val context =
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"continue if record time is after deduplication time in case a deduplication entry is found" in {
val dedupValue = newDedupValue(aRecordTime)
val inputs = Map(aDedupKey -> Some(dedupValue))
val context =
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"produce rejection log entry in case record time is on or before deduplication time" in {
for (
(recordTime, deduplicationTime) <- Iterable(
(aRecordTime, aRecordTime),
(aRecordTime, aRecordTime.addMicros(1)),
)
) {
val dedupValue = newDedupValue(deduplicationTime)
val inputs = Map(aDedupKey -> Some(dedupValue))
val context =
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
val actual = deduplicateCommandStep(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasTransactionRejectionEntry shouldBe true
}
}
}
"setting dedup context" should {
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
val deduplicationDuration = time.Duration.ofSeconds(3)
"calculate deduplicate until based on deduplication duration" in {
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
)
setDeduplicationEntryStep(context, transactionEntrySummary)
contextDeduplicateUntil(
context,
transactionEntrySummary,
).value shouldBe protobuf.Timestamp
.newBuilder()
.setSeconds(
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
)
.build()
}
"set the submission time in the committer context" in {
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
)
setDeduplicationEntryStep(context, transactionEntrySummary)
context
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
.map(
_.getCommandDedup.getSubmissionTime
)
.value shouldBe submissionTime
}
"throw an error for unsupported deduplication periods" in {
forAll(
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
"deduplication setter",
_.clearDeduplicationPeriod(),
_.setDeduplicationOffset("offset"),
_.setDeduplicateUntil(deduplicateUntil),
)
) { deduplicationSetter =>
{
val (context, transactionEntrySummary) =
buildContextAndTransaction(submissionTime, deduplicationSetter)
a[Err.InvalidSubmission] shouldBe thrownBy(
setDeduplicationEntryStep(context, transactionEntrySummary)
)
}
}
}
}
"overwriteDeduplicationPeriodWithMaxDuration" should {
"set max deduplication duration as deduplication period" in {
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
val commitContext = createCommitContext(
None,
Map(
Conversions.configurationStateKey -> None
),
)
val result =
CommandDeduplication.overwriteDeduplicationPeriodWithMaxDurationStep(config)(
commitContext,
aTransactionEntrySummary,
)
inside(result) { case StepContinue(entry) =>
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
maxDeduplicationDuration
)
}
}
}
}
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
)
.build
}
object CommandDeduplicationSpec {
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
private val aRecordTime = Timestamp(100)
private val aDedupKey = Conversions
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
private val aDamlConfigurationStateValue = DamlStateValue.newBuilder
.setConfigurationEntry(
DamlConfigurationEntry.newBuilder
.setConfiguration(Configuration.encode(theDefaultConfig))
)
.build
private def buildContextAndTransaction(
submissionTime: protobuf.Timestamp,
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
) = {
val context = createCommitContext(None)
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
val transactionEntrySummary = DamlTransactionEntrySummary(
aDamlTransactionEntry.toBuilder
.setSubmitterInfo(
submitterInfoAugmenter(
DamlSubmitterInfo
.newBuilder()
)
)
.setSubmissionTime(submissionTime)
.build()
)
context -> transactionEntrySummary
}
private def contextDeduplicateUntil(
context: CommitContext,
transactionEntrySummary: DamlTransactionEntrySummary,
) = context
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
.map(
_.getCommandDedup.getDeduplicatedUntil
)
}

View File

@ -3,48 +3,39 @@
package com.daml.ledger.participant.state.kvutils.committer.transaction
import java.time
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions.{buildDuration, buildTimestamp}
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepStop}
import com.daml.ledger.participant.state.kvutils.committer.{StepContinue, StepStop}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
DamlSubmitterInfo,
DamlTransactionRejectionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{
DamlCommandDedupValue,
DamlPartyAllocation,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, committer}
import com.daml.ledger.participant.state.kvutils.{Conversions, committer}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.data.{ImmArray, Ref}
import com.daml.lf.engine.Engine
import com.daml.lf.transaction._
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value.{ContractId, ValueRecord, ValueText}
import com.daml.lf.value.{Value, ValueOuterClass}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.google.protobuf
import com.google.protobuf.Duration
import org.mockito.MockitoSugar
import org.scalatest.Inside.inside
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.wordspec.AnyWordSpec
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.util.Random
@nowarn("msg=deprecated")
class TransactionCommitterSpec
@ -173,149 +164,6 @@ class TransactionCommitterSpec
}
}
"deduplicateCommand" should {
"continue if record time is not available" in {
val context = createCommitContext(recordTime = None)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"continue if record time is available but no deduplication entry could be found" in {
val inputs = Map(aDedupKey -> None)
val context =
createCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"continue if record time is after deduplication time in case a deduplication entry is found" in {
val dedupValue = newDedupValue(aRecordTime)
val inputs = Map(aDedupKey -> Some(dedupValue))
val context =
createCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => succeed
case StepStop(_) => fail()
}
}
"produce rejection log entry in case record time is on or before deduplication time" in {
for (
(recordTime, deduplicationTime) <- Iterable(
(aRecordTime, aRecordTime),
(aRecordTime, aRecordTime.addMicros(1)),
)
) {
val dedupValue = newDedupValue(deduplicationTime)
val inputs = Map(aDedupKey -> Some(dedupValue))
val context =
createCommitContext(recordTime = Some(recordTime), inputs = inputs)
val actual = transactionCommitter.deduplicateCommand(context, aTransactionEntrySummary)
actual match {
case StepContinue(_) => fail()
case StepStop(actualLogEntry) =>
actualLogEntry.hasTransactionRejectionEntry shouldBe true
}
}
}
"setting dedup context" should {
val deduplicateUntil = protobuf.Timestamp.newBuilder().setSeconds(30).build()
val submissionTime = protobuf.Timestamp.newBuilder().setSeconds(60).build()
val deduplicationDuration = time.Duration.ofSeconds(3)
"calculate deduplicate until based on deduplication duration" in {
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
)
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
contextDeduplicateUntil(
context,
transactionEntrySummary,
).value shouldBe protobuf.Timestamp
.newBuilder()
.setSeconds(
submissionTime.getSeconds + deduplicationDuration.getSeconds + theDefaultConfig.timeModel.minSkew.getSeconds
)
.build()
}
"set the submission time in the committer context" in {
val (context, transactionEntrySummary) =
buildContextAndTransaction(
submissionTime,
_.setDeduplicationDuration(Conversions.buildDuration(deduplicationDuration)),
)
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
context
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
.map(
_.getCommandDedup.getSubmissionTime
)
.value shouldBe submissionTime
}
"throw an error for unsupported deduplication periods" in {
forAll(
Table[DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder](
"deduplication setter",
_.clearDeduplicationPeriod(),
_.setDeduplicationOffset("offset"),
_.setDeduplicateUntil(deduplicateUntil),
)
) { deduplicationSetter =>
{
val (context, transactionEntrySummary) =
buildContextAndTransaction(submissionTime, deduplicationSetter)
a[Err.InvalidSubmission] shouldBe thrownBy(
transactionCommitter.setDedupEntry(context, transactionEntrySummary)
)
}
}
}
}
"overwriteDeduplicationPeriodWithMaxDuration" should {
"set max deduplication duration as deduplication period" in {
val maxDeduplicationDuration = time.Duration.ofSeconds(Random.nextLong())
val config = theDefaultConfig.copy(maxDeduplicationTime = maxDeduplicationDuration)
val commitContext = createCommitContext(
None,
Map(
Conversions.configurationStateKey -> None
),
)
val committer = createTransactionCommitter(config)
val result = committer.overwriteDeduplicationPeriodWithMaxDuration(
commitContext,
aTransactionEntrySummary,
)
inside(result) { case StepContinue(entry) =>
entry.submitterInfo.getDeduplicationDuration shouldBe buildDuration(
maxDeduplicationDuration
)
}
}
}
}
"buildLogEntry" should {
"set record time in log entry when it is available" in {
val context = createCommitContext(recordTime = Some(theRecordTime))
@ -426,13 +274,6 @@ class TransactionCommitterSpec
metrics,
)
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime))
)
.build
private def create(
contractId: ContractId,
signatories: Set[Ref.Party] = Set(aKeyMaintainer),
@ -474,8 +315,6 @@ object TransactionCommitterSpec {
private val aDamlTransactionEntry = createEmptyTransactionEntry(List("aSubmitter"))
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
private val aRecordTime = Timestamp(100)
private val aDedupKey = Conversions
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
private val aDummyValue = TransactionBuilder.record("field" -> "value")
private val aKey = "key"
private val aKeyMaintainer = "maintainer"
@ -622,32 +461,4 @@ object TransactionCommitterSpec {
private def lookupByKeyNodeBuilder =
TransactionOuterClass.NodeLookupByKey.newBuilder()
private def buildContextAndTransaction(
submissionTime: protobuf.Timestamp,
submitterInfoAugmenter: DamlSubmitterInfo.Builder => DamlSubmitterInfo.Builder,
) = {
val context = createCommitContext(None)
context.set(Conversions.configurationStateKey, aDamlConfigurationStateValue)
val transactionEntrySummary = DamlTransactionEntrySummary(
aDamlTransactionEntry.toBuilder
.setSubmitterInfo(
submitterInfoAugmenter(
DamlSubmitterInfo
.newBuilder()
)
)
.setSubmissionTime(submissionTime)
.build()
)
context -> transactionEntrySummary
}
private def contextDeduplicateUntil(
context: CommitContext,
transactionEntrySummary: DamlTransactionEntrySummary,
) = context
.get(Conversions.commandDedupKey(transactionEntrySummary.submitterInfo))
.map(
_.getCommandDedup.getDeduplicatedUntil
)
}