[Sandbox-on-X] Ledger-side in-memory command deduplication [DPP-872] (#12596)

* [Sandbox-on-X] Ledger-side in-memory command deduplication

changelog_begin
changelog_end

* Enirched SequenceSpec test

* Participant-side deduplication compatibility tests exclusions

* Fix LedgerConfigurationServiceIT

* Disable command deduplication tests for SoX without conflict checking

* Remove redundant max-dedup-seconds bridge config

* Default max deduplication time 5 minutes

* Deduplication queue length metrics

* Add back removed ledger config test in SequenceSpec
This commit is contained in:
tudor-da 2022-01-28 12:42:25 +01:00 committed by GitHub
parent 7567cf50aa
commit cfa8d30491
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 485 additions and 59 deletions

View File

@ -715,6 +715,19 @@ excluded_test_tool_tests = [
},
],
},
{
# Sandbox-on-X doesn't use participant-side command deduplication starting with next release,
# hence older tests will fail to assert it.
"end": "2.0.0-snapshot.20220126.9029.1",
"platform_ranges": [
{
"start": "2.0.0-snapshot.20220126.9029.1",
"exclusions": [
"CommandDeduplicationIT:ParticipantCommandDeduplication",
],
},
],
},
]
def in_range(version, range):

View File

@ -5,6 +5,7 @@ package com.daml.error.definitions
import com.daml.error._
import com.daml.error.definitions.ErrorGroups.ParticipantErrorGroup.LedgerApiErrorGroup
import com.daml.ledger.participant.state.v2.ChangeId
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.engine.Error.Validation.ReplayMismatch
@ -760,6 +761,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
case class Reject(
_definiteAnswer: Boolean = false,
_existingCommandSubmissionId: Option[String],
_changeId: Option[ChangeId] = None,
)(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
@ -767,9 +769,12 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
definiteAnswer = _definiteAnswer,
) {
override def context: Map[String, String] =
super.context ++ _existingCommandSubmissionId.map("existing_submission_id" -> _).toList
super.context ++ _existingCommandSubmissionId
.map("existing_submission_id" -> _)
.toList ++ _changeId
.map(changeId => Seq("changeId" -> changeId.toString))
.getOrElse(Seq.empty)
}
}
@Explanation("An input contract has been archived by a concurrent transaction submission.")

View File

@ -190,8 +190,10 @@ test_deps = [
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils/app",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//ledger/sandbox-on-x:sandbox-on-x",
"//ledger/test-common",
"//ledger/test-common:dar-files-default-lib",
"//libs-scala/contextualized-logging",

View File

@ -60,7 +60,6 @@ object ConfigConverter {
// TODO SoX-to-sandbox-classic: Dedicated submissionBufferSize CLI param for sanbox-classic
submissionBufferSize = sandboxConfig.maxParallelSubmissions,
// TODO SoX-to-sandbox-classic: Dedicated submissionBufferSize CLI param for sanbox-classic
maxDedupSeconds = BridgeConfigProvider.defaultExtraConfig.maxDedupSeconds,
profileDir = sandboxConfig.profileDir,
stackTraces = sandboxConfig.stackTraces,
)

View File

@ -9,6 +9,7 @@ import com.daml.ledger.api.v1.ledger_configuration_service.{
LedgerConfiguration,
LedgerConfigurationServiceGrpc,
}
import com.daml.ledger.sandbox.BridgeConfigProvider
import com.daml.platform.sandbox.SandboxBackend
import com.daml.platform.sandbox.services.SandboxFixture
import com.google.protobuf.duration.Duration
@ -30,7 +31,7 @@ sealed trait LedgerConfigurationServiceITBase extends AnyWordSpec with Matchers
.getLedgerConfiguration
maxDeduplicationTime shouldEqual toProto(
config.initialLedgerConfiguration.configuration.maxDeduplicationTime
BridgeConfigProvider.DefaultMaximumDeduplicationTime
)
}
}

View File

@ -155,6 +155,8 @@ conformance_test(
"--exclude=ExceptionsIT:ExRollbackDuplicateKeyCreated",
"--exclude=ExceptionsIT:ExRollbackDuplicateKeyArchived",
"--exclude=ConfigManagementServiceIT:CMConcurrentSetConflicting",
"--exclude=CommandDeduplication",
"--exclude=CommandServiceIT:CSduplicate",
],
)

View File

@ -7,18 +7,18 @@ import com.daml.jwt.JwtVerifierConfigurationCli
import com.daml.ledger.api.auth.{AuthService, AuthServiceJWT, AuthServiceWildcard}
import com.daml.ledger.participant.state.kvutils.app.{Config, ConfigProvider}
import com.daml.platform.apiserver.TimeServiceBackend
import com.daml.platform.configuration.InitialLedgerConfiguration
import com.daml.platform.services.time.TimeProviderType
import scopt.OptionParser
import java.io.File
import java.nio.file.Path
import java.time.Instant
import java.time.{Duration, Instant}
// TODO SoX: Keep only ledger-bridge-related configurations in this class
// and extract the participant-specific configs in the main config file.
case class BridgeConfig(
conflictCheckingEnabled: Boolean,
maxDedupSeconds: Int,
submissionBufferSize: Int,
implicitPartyAllocation: Boolean,
timeProviderType: TimeProviderType,
@ -29,11 +29,6 @@ case class BridgeConfig(
object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
override def extraConfigParser(parser: OptionParser[Config[BridgeConfig]]): Unit = {
parser
.opt[Int]("bridge-max-dedup-seconds")
.text("Maximum deduplication time in seconds. Defaults to 30.")
.action((p, c) => c.copy(extra = c.extra.copy(maxDedupSeconds = p)))
parser
.opt[Int]("bridge-submission-buffer-size")
.text("Submission buffer size. Defaults to 500.")
@ -80,6 +75,14 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
JwtVerifierConfigurationCli.parse(parser)((v, c) =>
c.copy(extra = c.extra.copy(authService = AuthServiceJWT(v)))
)
parser.checkConfig(c =>
Either.cond(
c.maxDeduplicationDuration.forall(_.compareTo(Duration.ofHours(1L)) <= 0),
(),
"Maximum supported deduplication duration is one hour",
)
)
()
}
@ -89,10 +92,16 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
case TimeProviderType.WallClock => None
}
override def initialLedgerConfig(config: Config[BridgeConfig]): InitialLedgerConfiguration = {
val superConfig = super.initialLedgerConfig(config)
superConfig.copy(configuration =
superConfig.configuration.copy(maxDeduplicationTime = DefaultMaximumDeduplicationTime)
)
}
override val defaultExtraConfig: BridgeConfig = BridgeConfig(
// TODO SoX: Enabled by default
conflictCheckingEnabled = false,
maxDedupSeconds = 30,
submissionBufferSize = 500,
implicitPartyAllocation = false,
timeProviderType = TimeProviderType.WallClock,
@ -100,4 +109,6 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
profileDir = None,
stackTraces = false,
)
val DefaultMaximumDeduplicationTime: Duration = Duration.ofMinutes(5L)
}

View File

@ -17,9 +17,11 @@ import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import java.time.Duration
class BridgeReadService(
ledgerId: LedgerId,
maxDedupSeconds: Int,
maximumDeduplicationDuration: Duration,
stateUpdatesSource: Source[(Offset, Update), NotUsed],
)(implicit
loggingContext: LoggingContext
@ -36,7 +38,7 @@ class BridgeReadService(
config = Configuration(
generation = 1L,
timeModel = LedgerTimeModel.reasonableDefault,
maxDeduplicationTime = java.time.Duration.ofSeconds(maxDedupSeconds.toLong),
maxDeduplicationTime = maximumDeduplicationDuration,
),
initialRecordTime = Timestamp.now(),
)

View File

@ -9,18 +9,20 @@ import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.definitions.LedgerApiErrors
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2._
import com.daml.ledger.sandbox.bridge.{BridgeMetrics, LedgerBridge}
import com.daml.ledger.sandbox.domain.Submission
import com.daml.ledger.sandbox.domain.{Rejection, Submission}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.InstrumentedSource
import com.daml.telemetry.TelemetryContext
import java.time.Duration
import java.util.concurrent.{CompletableFuture, CompletionStage}
class BridgeWriteService(
@ -35,7 +37,12 @@ class BridgeWriteService(
private[this] val logger = ContextualizedLogger.get(getClass)
override def isApiDeduplicationEnabled: Boolean = true
override def isApiDeduplicationEnabled: Boolean = false
override def close(): Unit = {
logger.info("Shutting down BridgeWriteService.")
queue.complete()
}
override def submitTransaction(
submitterInfo: SubmitterInfo,
@ -45,15 +52,33 @@ class BridgeWriteService(
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
): CompletionStage[SubmissionResult] = {
implicit val errorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, submitterInfo.submissionId)
submitterInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(deduplicationDuration) =>
validateDeduplicationDurationAndSubmit(
submitterInfo,
transactionMeta,
transaction,
estimatedInterpretationCost,
deduplicationDuration,
)
case DeduplicationPeriod.DeduplicationOffset(_) =>
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.LedgerBridgeInternalError(
new RuntimeException(
"Deduplication offset periods are not supported in Sandbox-on-X ledger bridge"
),
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
}
}
override def submitConfiguration(
maxRecordTime: Time.Timestamp,
@ -136,9 +161,35 @@ class BridgeWriteService(
private def submit(submission: Submission): CompletionStage[SubmissionResult] =
toSubmissionResult(submission.submissionId, queue.offer(submission))
override def close(): Unit = {
logger.info("Shutting down BridgeLedgerFactory.")
queue.complete()
private def validateDeduplicationDurationAndSubmit(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
deduplicationDuration: Duration,
)(implicit errorLogger: ContextualizedErrorLogger): CompletionStage[SubmissionResult] = {
val maxDeduplicationDuration = submitterInfo.ledgerConfiguration.maxDeduplicationTime
if (deduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.MaxDeduplicationDurationExceeded(
deduplicationDuration,
maxDeduplicationDuration,
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
else
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
)
)
}
}

View File

@ -152,7 +152,9 @@ object SandboxOnXRunner {
readServiceWithSubscriber = new BridgeReadService(
ledgerId = config.ledgerId,
maxDedupSeconds = config.extra.maxDedupSeconds,
maximumDeduplicationDuration = config.maxDeduplicationDuration.getOrElse(
BridgeConfigProvider.DefaultMaximumDeduplicationTime
),
stateUpdatesSource,
)
@ -255,8 +257,8 @@ object SandboxOnXRunner {
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
deduplicationType = CommandDeduplicationType.SYNC_ONLY,
maxDeduplicationDurationEnforced = false,
deduplicationType = CommandDeduplicationType.ASYNC_ONLY,
maxDeduplicationDurationEnforced = true,
),
contractIdFeatures = ExperimentalContractIds.of(
v1 = ExperimentalContractIds.ContractIdV1Support.NON_SUFFIXED

View File

@ -30,6 +30,7 @@ class BridgeMetrics(metrics: Metrics) {
val keyStateSize: Histogram = registry.histogram(Prefix :+ "keys")
val consumedContractsStateSize: Histogram = registry.histogram(Prefix :+ "consumed_contracts")
val sequencerQueueLength: Histogram = registry.histogram(Prefix :+ "queue")
val deduplicationQueueLength: Histogram = registry.histogram(Prefix :+ "deduplication_queue")
}
object InputQueue {

View File

@ -12,7 +12,7 @@ import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.kvutils.app.{Config, ParticipantConfig}
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.resources.ResourceOwner
import com.daml.ledger.sandbox.BridgeConfig
import com.daml.ledger.sandbox.{BridgeConfig, BridgeConfigProvider}
import com.daml.ledger.sandbox.bridge.validate.ConflictCheckingLedgerBridge
import com.daml.ledger.sandbox.domain.Submission
import com.daml.lf.data.Ref.ParticipantId
@ -90,6 +90,11 @@ object LedgerBridge {
),
validatePartyAllocation = !config.extra.implicitPartyAllocation,
servicesThreadPoolSize = servicesThreadPoolSize,
maxDeduplicationDuration = initialLedgerConfiguration
.map(_.maxDeduplicationTime)
.getOrElse(
BridgeConfigProvider.initialLedgerConfig(config).configuration.maxDeduplicationTime
),
)
private[bridge] def packageUploadSuccess(

View File

@ -20,6 +20,7 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.events._
import java.time.Duration
import scala.concurrent.{ExecutionContext, Future}
private[validate] class ConflictCheckingLedgerBridge(
@ -63,6 +64,7 @@ private[bridge] object ConflictCheckingLedgerBridge {
errorFactories: ErrorFactories,
validatePartyAllocation: Boolean,
servicesThreadPoolSize: Int,
maxDeduplicationDuration: Duration,
)(implicit
servicesExecutionContext: ExecutionContext
): ConflictCheckingLedgerBridge =
@ -80,6 +82,7 @@ private[bridge] object ConflictCheckingLedgerBridge {
validatePartyAllocation = validatePartyAllocation,
bridgeMetrics = bridgeMetrics,
errorFactories = errorFactories,
maxDeduplicationDuration = maxDeduplicationDuration,
),
servicesThreadPoolSize = servicesThreadPoolSize,
)

View File

@ -0,0 +1,68 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.sandbox.bridge.validate
import com.daml.ledger.participant.state.v2.ChangeId
import com.daml.ledger.sandbox.bridge.BridgeMetrics
import com.daml.ledger.sandbox.bridge.validate.DeduplicationState.DeduplicationQueue
import com.daml.lf.data.Time
import java.time.Duration
import scala.collection.immutable.VectorMap
case class DeduplicationState private (
private[validate] val deduplicationQueue: DeduplicationQueue,
private val maxDeduplicationDuration: Duration,
private val currentTime: () => Time.Timestamp,
private val bridgeMetrics: BridgeMetrics,
) {
def deduplicate(
changeId: ChangeId,
commandDeduplicationDuration: Duration,
): (DeduplicationState, Boolean) = {
bridgeMetrics.SequencerState.deduplicationQueueLength.update(deduplicationQueue.size)
if (commandDeduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
throw new RuntimeException(
s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
)
else {
val now = currentTime()
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, now)
val queueAfterEvictions = deduplicationQueue.dropWhile(_._2 <= expiredTimestamp)
val isDuplicateChangeId = queueAfterEvictions
.get(changeId)
.exists(_ > expiredThreshold(commandDeduplicationDuration, now))
if (isDuplicateChangeId)
copy(deduplicationQueue = queueAfterEvictions) -> true
else
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, now)) -> false
}
}
private def expiredThreshold(
deduplicationDuration: Duration,
now: Time.Timestamp,
): Time.Timestamp =
now.subtract(deduplicationDuration)
}
object DeduplicationState {
private[sandbox] type DeduplicationQueue = VectorMap[ChangeId, Time.Timestamp]
private[validate] def empty(
deduplicationDuration: Duration,
currentTime: () => Time.Timestamp,
bridgeMetrics: BridgeMetrics,
): DeduplicationState =
DeduplicationState(
deduplicationQueue = VectorMap.empty,
maxDeduplicationDuration = deduplicationDuration,
currentTime = currentTime,
bridgeMetrics = bridgeMetrics,
)
}

View File

@ -5,22 +5,22 @@ package com.daml.ledger.sandbox.bridge.validate
import com.daml.api.util.TimeProvider
import com.daml.error.ContextualizedErrorLogger
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{CompletionInfo, Update}
import com.daml.ledger.sandbox.bridge.LedgerBridge._
import com.daml.ledger.sandbox.bridge.LedgerBridge.{fromOffset, toOffset}
import SequencerState.LastUpdatedAt
import com.daml.ledger.participant.state.v2.{ChangeId, CompletionInfo, Update}
import com.daml.ledger.sandbox.bridge.LedgerBridge.{fromOffset, toOffset, _}
import com.daml.ledger.sandbox.bridge._
import com.daml.ledger.sandbox.bridge.validate.ConflictCheckingLedgerBridge.{
Sequence,
Validation,
_,
}
import com.daml.ledger.sandbox.bridge.validate.SequencerState.LastUpdatedAt
import com.daml.ledger.sandbox.domain.Rejection._
import com.daml.ledger.sandbox.domain.Submission.{AllocateParty, Config, Transaction}
import com.daml.ledger.sandbox.domain._
import com.daml.lf.data.Ref
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.data.Ref.SubmissionId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.{Transaction => LfTransaction}
@ -29,6 +29,7 @@ import com.daml.metrics.Timed
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.events._
import java.time.Duration
import scala.util.chaining._
/** Conflict checking with the in-flight commands,
@ -43,6 +44,8 @@ private[validate] class SequenceImpl(
validatePartyAllocation: Boolean,
bridgeMetrics: BridgeMetrics,
errorFactories: ErrorFactories,
maxDeduplicationDuration: Duration,
wallClockTime: () => Time.Timestamp = () => Timestamp.now(),
) extends Sequence {
private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
@ -50,6 +53,8 @@ private[validate] class SequenceImpl(
@volatile private[validate] var sequencerState = SequencerState.empty(bridgeMetrics)
@volatile private[validate] var allocatedParties = initialAllocatedParties
@volatile private[validate] var ledgerConfiguration = initialLedgerConfiguration
@volatile private[validate] var deduplicationState =
DeduplicationState.empty(maxDeduplicationDuration, wallClockTime, bridgeMetrics)
override def apply(): Validation[(Offset, PreparedSubmission)] => Iterable[(Offset, Update)] =
in => {
@ -86,7 +91,6 @@ private[validate] class SequenceImpl(
case s: Submission.UploadPackages =>
Some(packageUploadSuccess(s, timeProvider.getCurrentTimestamp))
case _: Submission.Transaction =>
// TODO SoX: Handle gracefully
throw new RuntimeException("Unexpected Submission.Transaction")
}
@ -147,15 +151,19 @@ private[validate] class SequenceImpl(
txSubmission: PreparedTransactionSubmission,
) = {
val submitterInfo = txSubmission.submission.submitterInfo
val completionInfo = submitterInfo.toCompletionInfo()
withErrorLogger(submitterInfo.submissionId) { implicit errorLogger =>
for {
_ <- checkTimeModel(txSubmission.submission, recordTime, ledgerConfiguration)
completionInfo = submitterInfo.toCompletionInfo()
_ <- checkTimeModel(
transaction = txSubmission.submission,
recordTime = recordTime,
ledgerConfiguration = ledgerConfiguration,
)
_ <- validateParties(
allocatedParties,
txSubmission.transactionInformees,
completionInfo,
allocatedParties = allocatedParties,
transactionInformees = txSubmission.transactionInformees,
completionInfo = completionInfo,
)
_ <- conflictCheckWithInFlight(
keysState = sequencerState.keyState,
@ -164,6 +172,15 @@ private[validate] class SequenceImpl(
inputContracts = txSubmission.inputContracts,
completionInfo = completionInfo,
)
_ <- deduplicateAndUpdateState(
changeId = ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
),
deduplicationPeriod = txSubmission.submission.submitterInfo.deduplicationPeriod,
completionInfo = completionInfo,
)
} yield ()
}(txSubmission.submission.loggingContext, logger)
.fold(
@ -172,7 +189,11 @@ private[validate] class SequenceImpl(
// Update the sequencer state
sequencerState = sequencerState
.dequeue(noConflictUpTo)
.enqueue(newOffset, txSubmission.updatedKeys, txSubmission.consumedContracts)
.enqueue(
newOffset,
txSubmission.updatedKeys,
txSubmission.consumedContracts,
)
transactionAccepted(
txSubmission.submission,
@ -219,6 +240,36 @@ private[validate] class SequenceImpl(
}
}
private def deduplicateAndUpdateState(
changeId: ChangeId,
deduplicationPeriod: DeduplicationPeriod,
completionInfo: CompletionInfo,
)(implicit
errorLogger: ContextualizedErrorLogger
): Validation[Unit] =
deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(commandDeduplicationDuration) =>
val (newDeduplicationState, isDuplicate) =
deduplicationState.deduplicate(changeId, commandDeduplicationDuration)
deduplicationState = newDeduplicationState
Either.cond(
!isDuplicate,
(),
DuplicateCommand(changeId, completionInfo),
)
case _: DeduplicationPeriod.DeduplicationOffset =>
Left(
Rejection
.LedgerBridgeInternalError(
new RuntimeException(
"Deduplication offset periods are not supported in Sandbox-on-X ledger bridge"
),
completionInfo,
)
)
}
private def validateParties(
allocatedParties: Set[Ref.Party],
transactionInformees: Set[Ref.Party],

View File

@ -4,7 +4,7 @@
package com.daml
package ledger.sandbox.domain
import com.daml.ledger.participant.state.v2.{CompletionInfo, Update}
import com.daml.ledger.participant.state.v2.{ChangeId, CompletionInfo, Update}
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import error.ContextualizedErrorLogger
import error.definitions.LedgerApiErrors
@ -18,6 +18,8 @@ import com.google.protobuf.any.Any
import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import java.time.Duration
private[sandbox] sealed trait Rejection extends Product with Serializable {
def toStatus: Status
def completionInfo: CompletionInfo
@ -59,7 +61,7 @@ private[sandbox] object Rejection {
) extends Rejection {
override def toStatus: Status = LedgerApiErrors.InternalError
.UnexpectedOrUnknownException(_err)
.rpcStatus(None)
.rpcStatus(completionInfo.submissionId)
}
final case class TransactionInternallyInconsistentKey(
@ -118,6 +120,34 @@ private[sandbox] object Rejection {
errorFactories.CommandRejections.partiesNotKnownToLedger(unallocatedParties)
}
final case class DuplicateCommand(
changeId: ChangeId,
completionInfo: CompletionInfo,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
) extends Rejection {
override def toStatus: Status =
LedgerApiErrors.ConsistencyErrors.DuplicateCommand
.Reject(_definiteAnswer = false, None, Some(changeId))
.rpcStatus(completionInfo.submissionId)
}
final case class MaxDeduplicationDurationExceeded(
duration: Duration,
maxDeduplicationDuration: Duration,
completionInfo: CompletionInfo,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
) extends Rejection {
override def toStatus: Status =
LedgerApiErrors.RequestValidation.InvalidDeduplicationPeriodField
.Reject(
s"The given deduplication duration of $duration exceeds the maximum deduplication time of $maxDeduplicationDuration",
Some(maxDeduplicationDuration),
)
.rpcStatus(None)
}
final case class NoLedgerConfiguration(
completionInfo: CompletionInfo,
errorFactories: ErrorFactories,

View File

@ -0,0 +1,119 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.sandbox.bridge.validate
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v2.ChangeId
import com.daml.ledger.sandbox.bridge.BridgeMetrics
import com.daml.lf.data.{Ref, Time}
import com.daml.metrics.Metrics
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.time.Duration
import scala.collection.immutable.VectorMap
import scala.util.{Failure, Success, Try}
import scala.util.chaining._
class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
behavior of classOf[DeduplicationState].getSimpleName
private val initialTime = Time.Timestamp.now()
private val bridgeMetrics = new BridgeMetrics(new Metrics(new MetricRegistry))
it should "deduplicate commands within the requested deduplication window" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(3L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
isDuplicate shouldBe true
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime.add(Duration.ofMinutes(2))
)
isDuplicate shouldBe false
}
}
it should "evicts old entries (older than max deduplication time)" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(2L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(1L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(2), Duration.ofMinutes(1L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime,
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(3), Duration.ofMinutes(1L))
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
changeId(3) -> initialTime.add(Duration.ofMinutes(2)),
)
isDuplicate shouldBe false
}
}
it should "throw an exception on too big requested deduplication duration" in {
val maxDeduplicationDuration = Duration.ofMinutes(2L)
val commandDeduplicationDuration = maxDeduplicationDuration.plus(Duration.ofSeconds(1L))
Try(
DeduplicationState
.empty(
deduplicationDuration = maxDeduplicationDuration,
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
.deduplicate(changeId(1337), commandDeduplicationDuration)
) match {
case Failure(ex) =>
ex.getMessage shouldBe s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
case Success(_) => fail("It should throw an exception on invalid deduplication durations")
}
}
// Current time provider mock builder.
// On each call, the mock advances the time by 1 minute
private def currentTimeMock: () => Time.Timestamp = {
var currentTime = initialTime
() => currentTime.tap(_ => currentTime = currentTime.add(Duration.ofMinutes(1L)))
}
private def changeId(idx: Int): ChangeId = ChangeId(
applicationId = Ref.ApplicationId.assertFromString("some-app"),
commandId = Ref.CommandId.assertFromString(s"some-command-$idx"),
actAs = Set.empty,
)
}

View File

@ -112,7 +112,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
sequenceImpl.ledgerConfiguration shouldBe Some(config)
}
it should "reject a transaction on time model violation" in new TestContext {
it should "fail on time model check" in new TestContext {
val Seq((offset, update)) = sequence(input(lateSubmission))
offset shouldBe toOffset(1L)
@ -123,7 +123,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
)
}
it should "reject a transaction on unallocated transaction informees" in new TestContext {
it should "fail on unallocated transaction informees" in new TestContext {
val Seq((offset, update)) = sequence(input(txWithUnallocatedParty))
offset shouldBe toOffset(1L)
assertCommandRejected(update, "Parties not known on ledger: [new-guy]")
@ -153,7 +153,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
update3 shouldBe transactionAccepted(3)
}
it should "not validate party allocation if disabled" in new TestContext {
it should "validate party allocation if disabled" in new TestContext {
val Seq((offset, update)) =
sequenceWithoutPartyAllocationValidation()(input(txWithUnallocatedParty))
offset shouldBe toOffset(1L)
@ -229,6 +229,46 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
sequence(Left(rejectionMock)) shouldBe Iterable(toOffset(1L) -> commandRejectedUpdateMock)
}
it should "deduplicate commands" in new TestContext {
// Command with non-zero deduplication period
private val initialSubmission = create(cId(1))
private val deduplicationPeriod: DeduplicationPeriod.DeduplicationDuration =
DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(1L))
private val submissionWithDedupPeriod = create(
cId(1),
transactionSubmission =
tx.copy(submitterInfo = tx.submitterInfo.copy(deduplicationPeriod = deduplicationPeriod)),
)
val Seq((offset1, update1)) = sequence(initialSubmission)
offset1 shouldBe toOffset(1L)
update1 shouldBe transactionAccepted(1)
// Assert duplicate command rejected
val Seq((offset2, update2)) = sequence(submissionWithDedupPeriod)
offset2 shouldBe toOffset(2L)
assertCommandRejected(
update2,
"A command with the given command id has already been successfully processed",
deduplicationPeriod,
)
// Advance record time past the deduplication period
private val newRecordTime: Timestamp = currentRecordTime.add(deduplicationPeriod.duration)
when(timeProviderMock.getCurrentTimestamp).thenReturn(newRecordTime)
// Assert command is accepted
val Seq((offset3, update3)) = sequence(submissionWithDedupPeriod)
offset3 shouldBe toOffset(3L)
update3 shouldBe transactionAccepted(
txId = 3,
completionInfo = completionInfo.copy(optDeduplicationPeriod = Some(deduplicationPeriod)),
recordTime = newRecordTime,
)
}
private trait TestContext extends FixtureContext {
private val bridgeMetrics = new BridgeMetrics(new Metrics(new MetricRegistry))
val timeProviderMock: TimeProvider = mock[TimeProvider]
@ -238,11 +278,16 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
val allocatedInformees: Set[IdString.Party] =
(1 to 3).map(idx => s"party-$idx").map(Ref.Party.assertFromString).toSet
val initialConfig: Configuration = Configuration(
private val initialLedgerConfiguration: Some[Configuration] = Some(
Configuration(
generation = 0L,
timeModel = LedgerTimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofSeconds(60L),
)
)
val maxDeduplicationDuration: Duration = Duration.ofDays(1L)
val sequenceImpl: SequenceImpl = buildSequence()
val sequenceWithoutPartyAllocationValidation: SequenceImpl =
buildSequence(validatePartyAllocation = false)
@ -252,13 +297,16 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
val currentRecordTime: Time.Timestamp = Time.Timestamp.assertFromLong(1000L)
when(timeProviderMock.getCurrentTimestamp).thenReturn(currentRecordTime)
private val zeroDeduplicationPeriod: DeduplicationPeriod.DeduplicationDuration =
DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(0L))
// Transaction submission mocks
val submitterInfo: SubmitterInfo = SubmitterInfo(
actAs = List.empty,
readAs = List.empty,
applicationId = Ref.ApplicationId.assertFromString("applicationId"),
commandId = Ref.CommandId.assertFromString("commandId"),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(0L)),
deduplicationPeriod = zeroDeduplicationPeriod,
submissionId = Some(submissionId),
ledgerConfiguration =
Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(0L)),
@ -384,7 +432,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
def buildSequence(
validatePartyAllocation: Boolean = true,
initialLedgerConfiguration: Option[Configuration] = Some(initialConfig),
initialLedgerConfiguration: Option[Configuration] = initialLedgerConfiguration,
) = new SequenceImpl(
participantId = Ref.ParticipantId.assertFromString(participantName),
bridgeMetrics = bridgeMetrics,
@ -394,6 +442,8 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
initialLedgerEnd = Offset.beforeBegin,
initialAllocatedParties = allocatedInformees,
initialLedgerConfiguration = initialLedgerConfiguration,
maxDeduplicationDuration = maxDeduplicationDuration,
wallClockTime = () => timeProviderMock.getCurrentTimestamp,
)
def exerciseNonConsuming(
@ -417,22 +467,33 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
input(preparedTransactionSubmission)
}
def transactionAccepted(txId: Int): Update.TransactionAccepted =
def transactionAccepted(
txId: Int,
completionInfo: CompletionInfo = completionInfo,
recordTime: Time.Timestamp = currentRecordTime,
): Update.TransactionAccepted =
Update.TransactionAccepted(
optCompletionInfo = Some(completionInfo),
transactionMeta = transactionMeta,
transaction = CommittedTransaction(txMock),
transactionId = Ref.TransactionId.assertFromString(txId.toString),
recordTime = currentRecordTime,
recordTime = recordTime,
divulgedContracts = List.empty,
blindingInfo = None,
)
def assertCommandRejected(update: Update, reason: String): Assertion = update match {
def assertCommandRejected(
update: Update,
reason: String,
deduplicationPeriod: DeduplicationPeriod = zeroDeduplicationPeriod,
): Assertion = update match {
case rejection: Update.CommandRejected =>
rejection.recordTime shouldBe currentRecordTime
// Transaction statistics are not populated for rejections
rejection.completionInfo shouldBe completionInfo.copy(statistics = None)
rejection.completionInfo shouldBe completionInfo.copy(
statistics = None,
optDeduplicationPeriod = Some(deduplicationPeriod),
)
// TODO SoX: Assert error codes
rejection.reasonTemplate.message should include(reason)
case noMatch => fail(s"Expectation mismatch on expected CommandRejected: $noMatch")