Remove participant-side command deduplication [DPP-848] (#12677)

* Remove participant-side command deduplication

changelog_begin
changelog_end

* Addressed review comments
tudor-da 2022-02-01 21:50:25 +01:00 committed by GitHub
parent 85f269004c
commit 5390505627
59 changed files with 47 additions and 1204 deletions
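
Editor's note, for context: after this change, command deduplication is expressed entirely by the deduplication period the client attaches to its commands and is enforced by the ledger/committer, not by a participant-side cache. The following is a minimal, illustrative sketch of that model; the DeduplicationPeriod type and its two case names are taken from imports visible elsewhere in this diff, while the duration value and surrounding code are assumptions and not part of this commit.

// Illustrative sketch only, not part of this commit.
// Assumes the duration-based case wraps a java.time.Duration.
import java.time.Duration
import com.daml.ledger.api.DeduplicationPeriod

// Reject resubmissions of the same change ID within a five-minute window,
// enforced by the ledger rather than by a participant-side cache.
val dedupPeriod: DeduplicationPeriod =
  DeduplicationPeriod.DeduplicationDuration(Duration.ofMinutes(5))

// The other supported case is DeduplicationPeriod.DeduplicationOffset, which
// deduplicates against completions at or after a given ledger offset;
// constructing an Offset is omitted in this sketch.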

View File

@ -31,8 +31,6 @@ final class Metrics(val registry: MetricRegistry) {
val failedCommandInterpretations: Meter =
registry.meter(Prefix :+ "failed_command_interpretations")
val deduplicatedCommands: Meter =
registry.meter(Prefix :+ "deduplicated_commands")
val delayedSubmissions: Meter =
registry.meter(Prefix :+ "delayed_submissions")
val validSubmissions: Meter =
@ -367,11 +365,6 @@ final class Metrics(val registry: MetricRegistry) {
val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages")
val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive")
val getLfPackage: Timer = registry.timer(Prefix :+ "get_lf_package")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val removeExpiredDeduplicationData: Timer =
registry.timer(Prefix :+ "remove_expired_deduplication_data")
val stopDeduplicatingCommand: Timer =
registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")
@ -429,11 +422,6 @@ final class Metrics(val registry: MetricRegistry) {
val listKnownParties: Timer = registry.timer(Prefix :+ "list_known_parties")
val listLfPackages: Timer = registry.timer(Prefix :+ "list_lf_packages")
val getLfArchive: Timer = registry.timer(Prefix :+ "get_lf_archive")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val removeExpiredDeduplicationData: Timer =
registry.timer(Prefix :+ "remove_expired_deduplication_data")
val stopDeduplicatingCommand: Timer =
registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
private val createDbMetrics: String => DatabaseMetrics =
@ -497,15 +485,6 @@ final class Metrics(val registry: MetricRegistry) {
"store_package_entry"
) // FIXME Base name conflicts with storePackageEntry
val loadPackageEntries: DatabaseMetrics = createDbMetrics("load_package_entries")
val deduplicateCommandDbMetrics: DatabaseMetrics = createDbMetrics(
"deduplicate_command"
) // FIXME Base name conflicts with deduplicateCommand
val removeExpiredDeduplicationDataDbMetrics: DatabaseMetrics = createDbMetrics(
"remove_expired_deduplication_data"
) // FIXME Base name conflicts with removeExpiredDeduplicationData
val stopDeduplicatingCommandDbMetrics: DatabaseMetrics = createDbMetrics(
"stop_deduplicating_command"
) // FIXME Base name conflicts with stopDeduplicatingCommand
val pruneDbMetrics: DatabaseMetrics = createDbMetrics(
"prune"
) // FIXME Base name conflicts with prune
@ -679,8 +658,6 @@ final class Metrics(val registry: MetricRegistry) {
val partyEntries: Timer = registry.timer(Prefix :+ "party_entries")
val lookupConfiguration: Timer = registry.timer(Prefix :+ "lookup_configuration")
val configurationEntries: Timer = registry.timer(Prefix :+ "configuration_entries")
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

View File

@ -1 +1 @@
5cfddb9932a7ef4ea3ba7439cf9861110181fdfd97709df0fd21af8a89e9ce76
005e1b4b85ad740cbf402f7d95eb88a15baefbe8d0c3d885639ee9a907399193

View File

@ -124,14 +124,6 @@ CREATE INDEX idx_party_entries ON party_entries (submission_id);
CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset);
CREATE INDEX idx_party_entries_party_id_and_ledger_offset ON party_entries(party_id, ledger_offset);
---------------------------------------------------------------------------------------------------
-- Submissions table
---------------------------------------------------------------------------------------------------
CREATE TABLE participant_command_submissions (
deduplication_key VARCHAR PRIMARY KEY NOT NULL,
deduplicate_until BIGINT NOT NULL
);
---------------------------------------------------------------------------------------------------
-- Completions table
---------------------------------------------------------------------------------------------------

View File

@ -0,0 +1 @@
f0518b9fdf84752d0b47a57aa9725a2b4ef820bd44ba79f2a82482d089e1c5fc

View File

@ -0,0 +1,5 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- Participant-side deduplication not supported anymore
DROP TABLE participant_command_submissions PURGE;

View File

@ -0,0 +1 @@
e61171ce37fbf7e81ef7c24bd4773bd5ca0536109f5048e9140f69f80dc893f8

View File

@ -0,0 +1,5 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- Participant-side deduplication not supported anymore
DROP TABLE participant_command_submissions;

View File

@ -35,7 +35,6 @@ import com.daml.platform.configuration.{
CommandConfiguration,
InitialLedgerConfiguration,
PartyConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.server.api.services.domain.CommandCompletionService
import com.daml.platform.server.api.services.grpc.{GrpcHealthService, GrpcTransactionService}
@ -79,7 +78,6 @@ private[daml] object ApiServices {
initialLedgerConfiguration: Option[InitialLedgerConfiguration],
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optTimeServiceBackend: Option[TimeServiceBackend],
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
@ -104,7 +102,6 @@ private[daml] object ApiServices {
private val completionsService: IndexCompletionsService = indexService
private val partyManagementService: IndexPartyManagementService = indexService
private val configManagementService: IndexConfigManagementService = indexService
private val submissionService: IndexSubmissionService = indexService
private val meteringStore: MeteringStore = indexService
private val configurationInitializer = new LedgerConfigurationInitializer(
@ -260,7 +257,6 @@ private[daml] object ApiServices {
val apiSubmissionService = ApiSubmissionService.create(
ledgerId,
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
@ -269,8 +265,7 @@ private[daml] object ApiServices {
commandExecutor,
checkOverloaded,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
submissionConfig.enableDeduplication,
partyConfig.implicitPartyAllocation
),
metrics,
errorsVersionsSwitcher,

View File

@ -6,13 +6,7 @@ package com.daml.platform.apiserver
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain.{
CommandId,
ConfigurationEntry,
LedgerId,
LedgerOffset,
TransactionId,
}
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -181,20 +175,6 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In
): Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed] =
delegate.configurationEntries(startExclusive)
override def deduplicateCommand(
commandId: CommandId,
submitter: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] =
delegate.deduplicateCommand(commandId, submitter, submittedAt, deduplicateUntil)
override def stopDeduplicatingCommand(
commandId: CommandId,
submitter: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
delegate.stopDeduplicatingCommand(commandId, submitter)
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =

View File

@ -21,11 +21,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.engine.Engine
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.{
CommandConfiguration,
PartyConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.configuration.{CommandConfiguration, PartyConfiguration}
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.usermanagement.UserManagementConfig
import com.daml.ports.{Port, PortFiles}
@ -47,7 +43,6 @@ object StandaloneApiServer {
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optWriteService: Option[state.WriteService],
authService: AuthService,
healthChecks: HealthChecks,
@ -112,7 +107,6 @@ object StandaloneApiServer {
initialLedgerConfiguration = config.initialLedgerConfiguration,
commandConfig = commandConfig,
partyConfig = partyConfig,
submissionConfig = submissionConfig,
optTimeServiceBackend = timeServiceBackend,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,

View File

@ -7,13 +7,7 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{
CommandId,
ConfigurationEntry,
LedgerId,
LedgerOffset,
TransactionId,
}
import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerId, LedgerOffset, TransactionId}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -202,26 +196,6 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met
delegate.configurationEntries(startExclusive),
)
override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[v2.CommandDeduplicationResult] =
Timed.future(
metrics.daml.services.index.deduplicateCommand,
delegate.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil),
)
override def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
Timed.future(
metrics.daml.services.index.stopDeduplicateCommand,
delegate.stopDeduplicatingCommand(commandId, submitters),
)
override def prune(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,

View File

@ -17,7 +17,7 @@ import com.daml.error.{
}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
@ -45,7 +45,6 @@ import io.grpc.{Status, StatusRuntimeException}
import scala.annotation.nowarn
import scala.jdk.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
private[apiserver] object ApiSubmissionService {
@ -53,7 +52,6 @@ private[apiserver] object ApiSubmissionService {
def create(
ledgerId: LedgerId,
writeService: state.WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
@ -71,7 +69,6 @@ private[apiserver] object ApiSubmissionService {
new GrpcCommandSubmissionService(
service = new ApiSubmissionService(
writeService,
submissionService,
partyManagementService,
timeProvider,
timeProviderType,
@ -94,15 +91,13 @@ private[apiserver] object ApiSubmissionService {
)
final case class Configuration(
implicitPartyAllocation: Boolean,
enableDeduplication: Boolean,
implicitPartyAllocation: Boolean
)
}
private[apiserver] final class ApiSubmissionService private[services] (
writeService: state.WriteService,
submissionService: IndexSubmissionService,
partyManagementService: IndexPartyManagementService,
timeProvider: TimeProvider,
timeProviderType: TimeProviderType,
@ -137,16 +132,8 @@ private[apiserver] final class ApiSubmissionService private[services] (
val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
if (writeService.isApiDeduplicationEnabled && configuration.enableDeduplication) {
deduplicateAndRecordOnLedger(
seedService.nextSeed(),
request.commands,
ledgerConfiguration,
)
} else {
evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration)
.transform(handleSubmissionResult)
}
evaluateAndSubmit(seedService.nextSeed(), request.commands, ledgerConfiguration)
.transform(handleSubmissionResult)
case None =>
Future.failed(
errorFactories.missingLedgerConfig(Status.Code.UNAVAILABLE)(definiteAnswer =
@ -157,47 +144,6 @@ private[apiserver] final class ApiSubmissionService private[services] (
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
}
private def deduplicateAndRecordOnLedger(
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
Future
.fromTry(
DeduplicationPeriod.deduplicateUntil(
commands.submittedAt,
commands.deduplicationPeriod,
)
)
.flatMap(deduplicateUntil =>
submissionService
.deduplicateCommand(
commands.commandId,
commands.actAs.toList,
commands.submittedAt,
deduplicateUntil,
)
.flatMap {
case CommandDeduplicationNew =>
evaluateAndSubmit(seed, commands, ledgerConfig)
.transform(handleSubmissionResult)
.recoverWith { case NonFatal(originalCause) =>
submissionService
.stopDeduplicatingCommand(commands.commandId, commands.actAs.toList)
.transform(_ => Failure(originalCause))
}
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(None)
)
}
)
private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit
loggingContext: LoggingContext
): Try[Unit] = {
@ -370,5 +316,4 @@ private[apiserver] final class ApiSubmissionService private[services] (
)
override def close(): Unit = ()
}

View File

@ -1,23 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.configuration
/** Configuration of the Ledger API Command Submission Service
* @param enableDeduplication
* Specifies whether the participant-side command deduplication should be turned on
* when available. By default, the command deduplication is turned on. However, on
* ledgers where the deduplication is implemented by the committer this parameter
* has no impact.
*/
case class SubmissionConfiguration(
enableDeduplication: Boolean
)
object SubmissionConfiguration {
lazy val default: SubmissionConfiguration =
SubmissionConfiguration(
enableDeduplication = true
)
}

View File

@ -10,7 +10,6 @@ import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.ConfigurationEntry.Accepted
import com.daml.ledger.api.domain.{
CommandId,
LedgerId,
LedgerOffset,
PackageEntry,
@ -321,21 +320,6 @@ private[platform] final class LedgerBackedIndexService(
toAbsolute(offset) -> config.toDomain
})
/** Deduplicate commands */
override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] =
ledger.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil)
override def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
ledger.stopDeduplicatingCommand(commandId, submitters)
/** Participant pruning command */
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext

View File

@ -6,7 +6,7 @@ package com.daml.platform.index
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.daml.ledger.api.domain.{LedgerId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -18,11 +18,8 @@ import com.daml.ledger.api.v1.transaction_service.{
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationResult,
MeteringStore,
PackageDetails,
}
import com.daml.ledger.participant.state.index.v2.MeteringStore
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
@ -167,34 +164,6 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M
)(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed] =
ledger.configurationEntries(startExclusive)
override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] =
Timed.future(
metrics.daml.index.deduplicateCommand,
ledger.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil),
)
override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit
loggingContext: LoggingContext
): Future[Unit] =
Timed.future(
metrics.daml.index.removeExpiredDeduplicationData,
ledger.removeExpiredDeduplicationData(currentTime),
)
override def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] =
Timed.future(
metrics.daml.index.stopDeduplicatingCommand,
ledger.stopDeduplicatingCommand(commandId, submitters),
)
override def prune(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,

View File

@ -3,10 +3,7 @@
package com.daml.platform.index
import akka.Done
import akka.actor.Cancellable
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.daml.error.definitions.IndexErrors.IndexDbException
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthStatus
@ -14,7 +11,6 @@ import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.ContractStore
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.ValueEnricher
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
@ -38,7 +34,7 @@ import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future}
private[platform] object ReadOnlySqlLedger {
@ -192,8 +188,7 @@ private[index] abstract class ReadOnlySqlLedger(
contractStore: ContractStore,
pruneBuffers: PruneBuffers,
dispatcher: Dispatcher[Offset],
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends BaseLedger(
) extends BaseLedger(
ledgerId,
ledgerDao,
ledgerDaoTransactionsReader,
@ -201,31 +196,5 @@ private[index] abstract class ReadOnlySqlLedger(
pruneBuffers,
dispatcher,
) {
// Periodically remove all expired deduplication cache entries.
// The current approach is not ideal for multiple ReadOnlySqlLedgers sharing
// the same database (as is the case for a horizontally scaled ledger API server).
// In that case, an external process periodically clearing expired entries would be better.
//
// Deduplication entries are added by the submission service, which might use
// a different clock than the current clock (e.g., horizontally scaled ledger API server).
// This is not an issue, because applications are not expected to submit towards the end
// of the deduplication time window.
private val (deduplicationCleanupKillSwitch, deduplicationCleanupDone) =
Source
.tick[Unit](0.millis, 10.minutes, ())
.mapAsync[Unit](1)(_ => ledgerDao.removeExpiredDeduplicationData(Timestamp.now()))
.viaMat(KillSwitches.single)(Keep.right[Cancellable, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.both[UniqueKillSwitch, Future[Done]])
.run()
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
override def close(): Unit = {
deduplicationCleanupKillSwitch.shutdown()
Await.result(deduplicationCleanupDone, 10.seconds)
super.close()
}
}
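
Editor's note on the hunk above: the deleted block wired a periodic Akka Streams job that purged expired deduplication entries every ten minutes and shut it down in close(). Below is a self-contained sketch of that tick-plus-kill-switch pattern, with the dedup-specific call replaced by a hypothetical placeholder task; it is illustrative only and not part of this commit.

// Standalone sketch of the periodic-cleanup pattern used by the removed code.
// The task body is a hypothetical placeholder; the stream wiring mirrors the
// deleted lines above.
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object PeriodicCleanupSketch extends App {
  implicit val system: ActorSystem = ActorSystem("cleanup-sketch")
  import system.dispatcher

  // Placeholder for ledgerDao.removeExpiredDeduplicationData(Timestamp.now())
  def removeExpiredEntries(): Future[Unit] = Future(println("cleanup tick"))

  // Tick immediately and then every 10 minutes; each tick runs the cleanup task.
  val (killSwitch, done) =
    Source
      .tick(0.millis, 10.minutes, ())
      .mapAsync(1)(_ => removeExpiredEntries())
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // On shutdown (the removed close() did this): stop the tick, then terminate.
  killSwitch.shutdown()
  done.onComplete(_ => system.terminate())
}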

View File

@ -7,7 +7,7 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{CommandId, LedgerId}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -20,7 +20,7 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, ContractStore}
import com.daml.ledger.participant.state.index.v2.ContractStore
import com.daml.lf.archive.Decode
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
@ -180,24 +180,6 @@ private[platform] abstract class BaseLedger(
): Source[(Offset, ConfigurationEntry), NotUsed] =
dispatcher.startingAt(startExclusive, RangeSource(ledgerDao.getConfigurationEntries))
override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] =
ledgerDao.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil)
override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit
loggingContext: LoggingContext
): Future[Unit] =
ledgerDao.removeExpiredDeduplicationData(currentTime)
override def stopDeduplicatingCommand(commandId: CommandId, submitters: List[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[Unit] =
ledgerDao.stopDeduplicatingCommand(commandId, submitters)
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] = {

View File

@ -6,7 +6,7 @@ package com.daml.platform.store
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, PartyDetails}
import com.daml.ledger.api.domain.{LedgerId, PartyDetails}
import com.daml.ledger.api.health.ReportsHealth
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.language.Ast
@ -125,46 +125,6 @@ private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable
startExclusive: Offset
)(implicit loggingContext: LoggingContext): Source[(Offset, ConfigurationEntry), NotUsed]
/** Deduplicates commands.
* Returns CommandDeduplicationNew if this is the first time the command is submitted
* Returns CommandDeduplicationDuplicate if the command was submitted before
*
* Note: The deduplication cache is used by the submission service,
* it does not modify any on-ledger data.
*/
def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult]
/** Stops deduplicating the given command.
*
* Note: The deduplication cache is used by the submission service,
* it does not modify any on-ledger data.
*/
def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit]
/** Remove all expired deduplication entries. This method has to be called
* periodically to ensure that the deduplication cache does not grow unboundedly.
*
* @param currentTime The current time. This should use the same source of time as
* the `deduplicateUntil` argument of [[deduplicateCommand]].
* @return when DAO has finished removing expired entries. Clients do not
* need to wait for the operation to finish, it is safe to concurrently
* call deduplicateCommand().
*
* Note: The deduplication cache is used by the submission service,
* it does not modify any on-ledger data.
*/
def removeExpiredDeduplicationData(
currentTime: Timestamp
)(implicit loggingContext: LoggingContext): Future[Unit]
/** Performs participant ledger pruning up to and including the specified offset.
*/
def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit

View File

@ -1,23 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.appendonlydao
import com.daml.ledger.api.domain
import com.daml.lf.data.Ref
import java.security.MessageDigest
import scalaz.syntax.tag._
// TODO append-only: move to store
object DeduplicationKeyMaker {
def make(commandId: domain.CommandId, submitters: List[Ref.Party]): String =
commandId.unwrap + "%" + hashSubmitters(submitters.sorted(Ordering.String).distinct)
private def hashSubmitters(submitters: List[Ref.Party]): String = {
MessageDigest
.getInstance("SHA-256")
.digest(submitters.mkString.getBytes)
.mkString
}
}

View File

@ -7,18 +7,12 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationDuplicate,
CommandDeduplicationNew,
CommandDeduplicationResult,
PackageDetails,
}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.archive.ArchiveParser
import com.daml.lf.data.Ref
@ -34,11 +28,7 @@ import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store._
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.{
DeduplicationStorageBackend,
ParameterStorageBackend,
ReadStorageBackend,
}
import com.daml.platform.store.backend.{ParameterStorageBackend, ReadStorageBackend}
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interning.StringInterning
@ -64,7 +54,6 @@ private class JdbcLedgerDao(
participantId: Ref.ParticipantId,
readStorageBackend: ReadStorageBackend,
parameterStorageBackend: ParameterStorageBackend,
deduplicationStorageBackend: DeduplicationStorageBackend,
errorFactories: ErrorFactories,
materializer: Materializer,
) extends LedgerDao {
@ -366,47 +355,6 @@ private class JdbcLedgerDao(
}
}
override def deduplicateCommand(
commandId: domain.CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] =
dbDispatcher.executeSql(metrics.daml.index.db.deduplicateCommandDbMetrics) { conn =>
val key = DeduplicationKeyMaker.make(commandId, submitters)
// Insert a new deduplication entry, or update an expired entry
val updated = deduplicationStorageBackend.upsertDeduplicationEntry(
key = key,
submittedAt = submittedAt,
deduplicateUntil = deduplicateUntil,
)(conn)
if (updated == 1) {
// New row inserted, this is the first time the command is submitted
CommandDeduplicationNew
} else {
// Deduplication row already exists
CommandDeduplicationDuplicate(deduplicationStorageBackend.deduplicatedUntil(key)(conn))
}
}
override def removeExpiredDeduplicationData(
currentTime: Timestamp
)(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.removeExpiredDeduplicationDataDbMetrics)(
deduplicationStorageBackend.removeExpiredDeduplicationData(currentTime)
)
override def stopDeduplicatingCommand(
commandId: domain.CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit] = {
val key = DeduplicationKeyMaker.make(commandId, submitters)
dbDispatcher.executeSql(metrics.daml.index.db.stopDeduplicatingCommandDbMetrics)(
deduplicationStorageBackend.stopDeduplicatingCommand(key)
)
}
/** Prunes the events and command completions tables.
*
* @param pruneUpToInclusive Offset up to which to prune archived history inclusively.
@ -652,7 +600,6 @@ private[platform] object JdbcLedgerDao {
participantId,
dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
dbSupport.storageBackendFactory.createParameterStorageBackend,
dbSupport.storageBackendFactory.createDeduplicationStorageBackend,
errorFactories,
materializer = materializer,
),
@ -697,7 +644,6 @@ private[platform] object JdbcLedgerDao {
participantId,
dbSupport.storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
dbSupport.storageBackendFactory.createParameterStorageBackend,
dbSupport.storageBackendFactory.createDeduplicationStorageBackend,
errorFactories,
materializer = materializer,
),

View File

@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.health.ReportsHealth
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
@ -163,50 +163,6 @@ private[platform] trait LedgerReadDao extends ReportsHealth {
endInclusive: Offset,
)(implicit loggingContext: LoggingContext): Source[(Offset, PackageLedgerEntry), NotUsed]
/** Deduplicates commands.
*
* @param commandId The command Id
* @param submitters The submitting parties
* @param submittedAt The time when the command was submitted
* @param deduplicateUntil The time until which the command should be deduplicated
* @return whether the command is a duplicate or not
*/
def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult]
/** Remove all expired deduplication entries. This method has to be called
* periodically to ensure that the deduplication cache does not grow unboundedly.
*
* @param currentTime The current time. This should use the same source of time as
* the `deduplicateUntil` argument of [[deduplicateCommand]].
*
* @return when DAO has finished removing expired entries. Clients do not
* need to wait for the operation to finish, it is safe to concurrently
* call deduplicateCommand().
*/
def removeExpiredDeduplicationData(
currentTime: Timestamp
)(implicit loggingContext: LoggingContext): Future[Unit]
/** Stops deduplicating the given command. This method should be called after
* a command is rejected by the submission service, or after a transaction is
* rejected by the ledger. Without removing deduplication entries for failed
* commands, applications would have to wait for the end of the (long) deduplication
* window before they could send a retry.
*
* @param commandId The command Id
* @param submitters The submitting parties
* @return
*/
def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit]
/** Prunes participant events and completions in archived history and remembers largest
* pruning offset processed thus far.
*
@ -223,7 +179,6 @@ private[platform] trait LedgerReadDao extends ReportsHealth {
to: Option[Timestamp],
applicationId: Option[Ref.ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]]
}
// TODO sandbox-classic clean-up: This interface and its implementation is only used in the JdbcLedgerDao suite

View File

@ -6,15 +6,12 @@ package com.daml.platform.store.appendonlydao
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.domain.{CommandId, LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationResult,
MeteringStore,
PackageDetails,
}
import com.daml.ledger.participant.state.index.v2.MeteringStore
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
@ -98,33 +95,6 @@ private[platform] class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics:
override val completions: LedgerDaoCommandCompletionsReader = ledgerDao.completions
override def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult] =
Timed.future(
metrics.daml.index.db.deduplicateCommand,
ledgerDao.deduplicateCommand(commandId, submitters, submittedAt, deduplicateUntil),
)
override def removeExpiredDeduplicationData(currentTime: Timestamp)(implicit
loggingContext: LoggingContext
): Future[Unit] =
Timed.future(
metrics.daml.index.db.removeExpiredDeduplicationData,
ledgerDao.removeExpiredDeduplicationData(currentTime),
)
override def stopDeduplicatingCommand(commandId: CommandId, submitters: List[Ref.Party])(implicit
loggingContext: LoggingContext
): Future[Unit] =
Timed.future(
metrics.daml.index.db.stopDeduplicatingCommand,
ledgerDao.stopDeduplicatingCommand(commandId, submitters),
)
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =

View File

@ -133,8 +133,6 @@ object DbDto {
deduplication_start: Option[Long],
) extends DbDto
final case class CommandDeduplication(deduplication_key: String) extends DbDto
final case class StringInterningDto(
internalId: Int,
externalString: String,

View File

@ -165,17 +165,6 @@ trait PackageStorageBackend {
)(connection: Connection): Vector[(Offset, PackageLedgerEntry)]
}
trait DeduplicationStorageBackend {
def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Timestamp
def upsertDeduplicationEntry(
key: String,
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(connection: Connection)(implicit loggingContext: LoggingContext): Int
def removeExpiredDeduplicationData(currentTime: Timestamp)(connection: Connection): Unit
def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit
}
trait CompletionStorageBackend {
def commandCompletions(
startExclusive: Offset,

View File

@ -16,7 +16,6 @@ trait StorageBackendFactory {
def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
def createDeduplicationStorageBackend: DeduplicationStorageBackend
def createCompletionStorageBackend(stringInterning: StringInterning): CompletionStorageBackend
def createContractStorageBackend(
ledgerEndCache: LedgerEndCache,

View File

@ -5,7 +5,6 @@ package com.daml.platform.store.backend
import java.util.UUID
import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset}
import com.daml.ledger.api.domain
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.CompletionInfo
@ -15,7 +14,7 @@ import com.daml.lf.engine.Blinding
import com.daml.lf.ledger.EventId
import com.daml.lf.transaction.Transaction.ChildrenRecursion
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.{DeduplicationKeyMaker, JdbcLedgerDao}
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.daml.platform.store.appendonlydao.events._
object UpdateToDbDto {
@ -34,13 +33,7 @@ object UpdateToDbDto {
rejection_status_message = Some(u.reasonTemplate.message),
rejection_status_details =
Some(StatusDetails.of(u.reasonTemplate.status.details).toByteArray),
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
domain.CommandId(u.completionInfo.commandId),
u.completionInfo.actAs,
)
),
)
)
case u: ConfigurationChanged =>

View File

@ -1,49 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.RowParser
import com.daml.lf.data.Time.Timestamp
import com.daml.platform.store.Conversions.timestampFromMicros
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.DeduplicationStorageBackend
private[backend] trait DeduplicationStorageBackendTemplate extends DeduplicationStorageBackend {
private case class ParsedCommandData(deduplicateUntil: Timestamp)
private val CommandDataParser: RowParser[ParsedCommandData] =
timestampFromMicros("deduplicate_until")
.map(ParsedCommandData)
override def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Timestamp =
SQL"""
select deduplicate_until
from participant_command_submissions
where deduplication_key = $deduplicationKey
"""
.as(CommandDataParser.single)(connection)
.deduplicateUntil
override def removeExpiredDeduplicationData(
currentTime: Timestamp
)(connection: Connection): Unit = {
SQL"""
delete from participant_command_submissions
where deduplicate_until < ${currentTime.micros}
"""
.execute()(connection)
()
}
override def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit = {
SQL"""
delete from participant_command_submissions
where deduplication_key = $deduplicationKey
"""
.execute()(connection)
()
}
}

View File

@ -283,11 +283,6 @@ private[backend] object AppendOnlySchema {
"deduplication_start" -> fieldStrategy.bigintOptional(_ => _.deduplication_start),
)
val commandSubmissionDeletes: Table[DbDto.CommandDeduplication] =
fieldStrategy.delete("participant_command_submissions")(
"deduplication_key" -> fieldStrategy.string(_ => _.deduplication_key)
)
val stringInterningTable: Table[DbDto.StringInterningDto] =
fieldStrategy.insert("string_interning")(
"internal_id" -> fieldStrategy.int(_ => _.internalId),
@ -323,7 +318,6 @@ private[backend] object AppendOnlySchema {
packages.executeUpdate,
partyEntries.executeUpdate,
commandCompletions.executeUpdate,
commandSubmissionDeletes.executeUpdate,
stringInterningTable.executeUpdate,
createFilter.executeUpdate,
transactionMetering.executeUpdate,
@ -350,7 +344,6 @@ private[backend] object AppendOnlySchema {
packages.prepareData(collect[Package], stringInterning),
partyEntries.prepareData(collect[PartyEntry], stringInterning),
commandCompletions.prepareData(collect[CommandCompletion], stringInterning),
commandSubmissionDeletes.prepareData(collect[CommandDeduplication], stringInterning),
stringInterningTable.prepareData(collect[StringInterningDto], stringInterning),
createFilter.prepareData(collect[CreateFilter], stringInterning),
transactionMetering.prepareData(collect[TransactionMetering], stringInterning),

View File

@ -1,49 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.h2
import java.sql.Connection
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate
import scala.util.control.NonFatal
object H2DeduplicationStorageBackend extends DeduplicationStorageBackendTemplate {
private val logger = ContextualizedLogger.get(this.getClass)
override def upsertDeduplicationEntry(
key: String,
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(connection: Connection)(implicit loggingContext: LoggingContext): Int = {
// Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication
// upsert is performed simultaneously from multiple threads, the query fails with
// JdbcSQLIntegrityConstraintViolationException: Unique index or primary key violation
// Simple retry helps
def retry[T](op: => T): T =
try {
op
} catch {
case NonFatal(e) =>
logger.debug(s"Caught exception while upserting a deduplication entry: $e")
op
}
retry(
SQL"""
merge into participant_command_submissions pcs
using dual on deduplication_key = $key
when not matched then
insert (deduplication_key, deduplicate_until)
values ($key, ${deduplicateUntil.micros})
when matched and pcs.deduplicate_until < ${submittedAt.micros} then
update set deduplicate_until=${deduplicateUntil.micros}
"""
.executeUpdate()(connection)
)
}
}

View File

@ -18,7 +18,6 @@ object H2ResetStorageBackend extends ResetStorageBackend {
truncate table package_entries;
truncate table parameters;
truncate table participant_command_completions;
truncate table participant_command_submissions;
truncate table participant_events_divulgence;
truncate table participant_events_create;
truncate table participant_events_consuming_exercise;

View File

@ -14,7 +14,6 @@ import com.daml.platform.store.backend.{
ContractStorageBackend,
DBLockStorageBackend,
DataSourceStorageBackend,
DeduplicationStorageBackend,
EventStorageBackend,
IngestionStorageBackend,
PartyStorageBackend,
@ -32,9 +31,6 @@ object H2StorageBackendFactory extends StorageBackendFactory with CommonStorageB
override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend =
new PartyStorageBackendTemplate(H2QueryStrategy, ledgerEndCache)
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
H2DeduplicationStorageBackend
override def createCompletionStorageBackend(
stringInterning: StringInterning
): CompletionStorageBackend =

View File

@ -1,52 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.oracle
import java.sql.Connection
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate
import scala.util.control.NonFatal
object OracleDeduplicationStorageBackend extends DeduplicationStorageBackendTemplate {
private val logger = ContextualizedLogger.get(this.getClass)
override def upsertDeduplicationEntry(
key: String,
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(connection: Connection)(implicit loggingContext: LoggingContext): Int = {
// Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication
// upsert is performed simultaneously from multiple threads, the query fails with
// SQLIntegrityConstraintViolationException: ORA-00001: unique constraint (INDEXDB.SYS_C007590) violated
// Simple retry helps
def retry[T](op: => T): T =
try {
op
} catch {
case NonFatal(e) =>
logger.debug(s"Caught exception while upserting a deduplication entry: $e")
op
}
retry(
SQL"""
merge into participant_command_submissions pcs
using dual
on (pcs.deduplication_key = $key)
when matched then
update set pcs.deduplicate_until=${deduplicateUntil.micros}
where pcs.deduplicate_until < ${submittedAt.micros}
when not matched then
insert (pcs.deduplication_key, pcs.deduplicate_until)
values ($key, ${deduplicateUntil.micros})
"""
.executeUpdate()(connection)
)
}
}

View File

@ -17,7 +17,6 @@ object OracleResetStorageBackend extends ResetStorageBackend {
"package_entries",
"parameters",
"participant_command_completions",
"participant_command_submissions",
"participant_events_divulgence",
"participant_events_create",
"participant_events_consuming_exercise",

View File

@ -15,7 +15,6 @@ import com.daml.platform.store.backend.{
ContractStorageBackend,
DBLockStorageBackend,
DataSourceStorageBackend,
DeduplicationStorageBackend,
EventStorageBackend,
IngestionStorageBackend,
PartyStorageBackend,
@ -33,9 +32,6 @@ object OracleStorageBackendFactory extends StorageBackendFactory with CommonStor
override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend =
new PartyStorageBackendTemplate(OracleQueryStrategy, ledgerEndCache)
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
OracleDeduplicationStorageBackend
override def createCompletionStorageBackend(
stringInterning: StringInterning
): CompletionStorageBackend =

View File

@ -1,29 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend.postgresql
import java.sql.Connection
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.common.DeduplicationStorageBackendTemplate
object PostgresDeduplicationStorageBackend extends DeduplicationStorageBackendTemplate {
override def upsertDeduplicationEntry(
key: String,
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(connection: Connection)(implicit loggingContext: LoggingContext): Int =
SQL"""
insert into participant_command_submissions as pcs (deduplication_key, deduplicate_until)
values ($key, ${deduplicateUntil.micros})
on conflict (deduplication_key)
do update
set deduplicate_until=${deduplicateUntil.micros}
where pcs.deduplicate_until < ${submittedAt.micros}
"""
.executeUpdate()(connection)
}

View File

@ -17,7 +17,6 @@ object PostgresResetStorageBackend extends ResetStorageBackend {
truncate table package_entries cascade;
truncate table parameters cascade;
truncate table participant_command_completions cascade;
truncate table participant_command_submissions cascade;
truncate table participant_events_divulgence cascade;
truncate table participant_events_create cascade;
truncate table participant_events_consuming_exercise cascade;

View File

@ -15,7 +15,6 @@ import com.daml.platform.store.backend.{
ContractStorageBackend,
DBLockStorageBackend,
DataSourceStorageBackend,
DeduplicationStorageBackend,
EventStorageBackend,
IngestionStorageBackend,
PartyStorageBackend,
@ -35,9 +34,6 @@ object PostgresStorageBackendFactory
override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend =
new PartyStorageBackendTemplate(PostgresQueryStrategy, ledgerEndCache)
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
PostgresDeduplicationStorageBackend
override def createCompletionStorageBackend(
stringInterning: StringInterning
): CompletionStorageBackend =

View File

@ -79,7 +79,6 @@ case class TestBackend(
configuration: ConfigurationStorageBackend,
party: PartyStorageBackend,
packageBackend: PackageStorageBackend,
deduplication: DeduplicationStorageBackend,
completion: CompletionStorageBackend,
contract: ContractStorageBackend,
event: EventStorageBackend,
@ -104,7 +103,6 @@ object TestBackend {
configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache),
party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache),
packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache),
deduplication = storageBackendFactory.createDeduplicationStorageBackend,
completion = storageBackendFactory.createCompletionStorageBackend(stringInterning),
contract =
storageBackendFactory.createContractStorageBackend(ledgerEndCache, stringInterning),

View File

@ -16,7 +16,6 @@ trait StorageBackendSuite
with StorageBackendTestsPruning
with StorageBackendTestsDBLockForSuite
with StorageBackendTestsIntegrity
with StorageBackendTestsDeduplication
with StorageBackendTestsTimestamps
with StorageBackendTestsStringInterning
with StorageBackendTestsUserManagement

View File

@ -1,97 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.backend
import com.daml.lf.data.Time.Timestamp
import org.scalatest.Inside
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
private[backend] trait StorageBackendTestsDeduplication
extends Matchers
with Inside
with StorageBackendSpec {
this: AnyFlatSpec =>
behavior of "DeduplicationStorageBackend"
import StorageBackendTestValues._
it should "only allow one upsertDeduplicationEntry to insert a new entry" in {
val key = "deduplication key"
val submittedAt = Timestamp.assertFromLong(0L)
val deduplicateUntil = submittedAt.addMicros(1000L)
val n = 8
executeSql(backend.parameter.initializeParameters(someIdentityParams))
val insertedRows = executeParallelSql(
Vector.fill(n)(c =>
backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil)(c)
)
)
val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key))
insertedRows.count(_ == 1) shouldBe 1 // One of the calls inserts a new row
insertedRows.count(_ == 0) shouldBe (n - 1) // All other calls don't write anything
foundDeduplicateUntil shouldBe deduplicateUntil
}
it should "only allow one upsertDeduplicationEntry to update an existing expired entry" in {
val key = "deduplication key"
val submittedAt = Timestamp.assertFromLong(0L)
val deduplicateUntil = submittedAt.addMicros(1000L)
// Second submission is after the deduplication window of the first one
val submittedAt2 = Timestamp.assertFromLong(2000L)
val deduplicateUntil2 = submittedAt2.addMicros(1000L)
val n = 8
executeSql(backend.parameter.initializeParameters(someIdentityParams))
val insertedRows =
executeSql(backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil))
val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key))
val updatedRows = executeParallelSql(
Vector.fill(n)(c =>
backend.deduplication.upsertDeduplicationEntry(
key,
submittedAt2,
deduplicateUntil2,
)(c)
)
)
val foundDeduplicateUntil2 = executeSql(backend.deduplication.deduplicatedUntil(key))
insertedRows shouldBe 1 // First call inserts a new row
updatedRows.count(
_ == 1
) shouldBe 1 // One of the subsequent calls updates the now expired row
updatedRows.count(_ == 0) shouldBe (n - 1) // All other calls don't write anything
foundDeduplicateUntil shouldBe deduplicateUntil
foundDeduplicateUntil2 shouldBe deduplicateUntil2
}
it should "not update or insert anything if there is an existing active entry" in {
val key = "deduplication key"
val submittedAt = Timestamp.assertFromLong(0L)
val deduplicateUntil = submittedAt.addMicros(5000L)
// Second submission is within the deduplication window of the first one
val submittedAt2 = Timestamp.assertFromLong(1000L)
val deduplicateUntil2 = submittedAt2.addMicros(5000L)
executeSql(backend.parameter.initializeParameters(someIdentityParams))
val insertedRows = executeSql(
backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil)
)
val foundDeduplicateUntil = executeSql(backend.deduplication.deduplicatedUntil(key))
val updatedRows = executeSql(
backend.deduplication.upsertDeduplicationEntry(key, submittedAt2, deduplicateUntil2)
)
val foundDeduplicateUntil2 = executeSql(backend.deduplication.deduplicatedUntil(key))
insertedRows shouldBe 1 // First call inserts a new row
updatedRows shouldBe 0 // Second call doesn't write anything
foundDeduplicateUntil shouldBe deduplicateUntil
foundDeduplicateUntil2 shouldBe deduplicateUntil
}
}

View File

@ -1,104 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao
import java.time.Instant
import java.util.UUID
import com.daml.ledger.api.domain.CommandId
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationDuplicate,
CommandDeduplicationNew,
}
import com.daml.lf.data.Time.Timestamp
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
private[dao] trait JdbcLedgerDaoCommandDeduplicationSpec {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>
behavior of "JdbcLedgerDao (command deduplication)"
it should "correctly deduplicate a command" in {
val commandId: CommandId = CommandId(UUID.randomUUID.toString)
for {
original <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(5000))
duplicate <- ledgerDao.deduplicateCommand(commandId, List(alice), t(500), t(5500))
} yield {
original shouldBe CommandDeduplicationNew
duplicate shouldBe CommandDeduplicationDuplicate(t(5000))
}
}
it should "correctly deduplicate commands with multiple submitters" in {
val commandId: CommandId = CommandId(UUID.randomUUID.toString)
for {
original <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(0), t(5000))
duplicate1 <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(500), t(5500))
duplicate2 <- ledgerDao.deduplicateCommand(commandId, List(bob, alice), t(500), t(5500))
duplicate3 <- ledgerDao.deduplicateCommand(
commandId,
List(alice, bob, alice),
t(500),
t(5500),
)
} yield {
original shouldBe CommandDeduplicationNew
duplicate1 shouldBe CommandDeduplicationDuplicate(t(5000))
duplicate2 shouldBe CommandDeduplicationDuplicate(t(5000))
duplicate3 shouldBe CommandDeduplicationDuplicate(t(5000))
}
}
it should "not deduplicate a command after it expired" in {
val commandId: CommandId = CommandId(UUID.randomUUID.toString)
for {
original1 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(100))
original2 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(101), t(200))
} yield {
original1 shouldBe CommandDeduplicationNew
original2 shouldBe CommandDeduplicationNew
}
}
it should "not deduplicate a command after its deduplication was stopped" in {
val commandId: CommandId = CommandId(UUID.randomUUID.toString)
for {
original1 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(0), t(10000))
_ <- ledgerDao.stopDeduplicatingCommand(commandId, List(alice))
original2 <- ledgerDao.deduplicateCommand(commandId, List(alice), t(1), t(10001))
} yield {
original1 shouldBe CommandDeduplicationNew
original2 shouldBe CommandDeduplicationNew
}
}
it should "not deduplicate commands with different command ids" in {
val commandId1: CommandId = CommandId(UUID.randomUUID.toString)
val commandId2: CommandId = CommandId(UUID.randomUUID.toString)
for {
original1 <- ledgerDao.deduplicateCommand(commandId1, List(alice, bob), t(0), t(1000))
original2 <- ledgerDao.deduplicateCommand(commandId2, List(alice, bob), t(0), t(1000))
} yield {
original1 shouldBe CommandDeduplicationNew
original2 shouldBe CommandDeduplicationNew
}
}
it should "not deduplicate commands with different submitters" in {
val commandId: CommandId = CommandId(UUID.randomUUID.toString)
for {
original1 <- ledgerDao.deduplicateCommand(commandId, List(alice, bob), t(0), t(1000))
original2 <- ledgerDao.deduplicateCommand(commandId, List(alice, charlie), t(0), t(1000))
} yield {
original1 shouldBe CommandDeduplicationNew
original2 shouldBe CommandDeduplicationNew
}
}
private[this] val t0 = Instant.now()
private[this] def t(ms: Long): Timestamp = {
Timestamp.assertFromInstant(t0.plusMillis(ms))
}
}

View File

@ -9,11 +9,7 @@ import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationNew,
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.index.v2.IndexPartyManagementService
import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteService}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf
@ -331,91 +327,6 @@ class ApiSubmissionServiceSpec
}
}
behavior of "command deduplication"
it should "use deduplication if enabled" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
val indexSubmissionService = mock[IndexSubmissionService]
val mockCommandExecutor = mock[CommandExecutor]
when(
mockCommandExecutor.execute(
any[Commands],
any[Hash],
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
).thenReturn(
Future.failed(
new RuntimeException
) // we don't care about the result; deduplication already happened
)
val service =
newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
mockIndexSubmissionService = indexSubmissionService,
commandExecutor = mockCommandExecutor,
)
val submitRequest = newSubmitRequest()
service
.submit(submitRequest)
.transform(_ => {
verify(indexSubmissionService).deduplicateCommand(
any[CommandId],
any[List[Ref.Party]],
any[Timestamp],
any[Timestamp],
)(any[LoggingContext])
Success(succeed)
})
}
it should "not use deduplication when disabled" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[state.WriteService]
val indexSubmissionService = mock[IndexSubmissionService]
val mockCommandExecutor = mock[CommandExecutor]
when(
mockCommandExecutor.execute(
any[Commands],
any[Hash],
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
).thenReturn(
Future.failed(
new RuntimeException
) // we don't care about the result; deduplication already happened
)
val service =
newSubmissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
deduplicationEnabled = false,
mockIndexSubmissionService = indexSubmissionService,
commandExecutor = mockCommandExecutor,
)
val submitRequest = newSubmitRequest()
service
.submit(submitRequest)
.transform(_ => {
verify(indexSubmissionService, never).deduplicateCommand(
any[CommandId],
any[List[Ref.Party]],
any[Timestamp],
any[Timestamp],
)(any[LoggingContext])
Success(succeed)
})
}
it should "rate-limit when configured to do so" in {
val grpcError = RpcStatus.of(Status.Code.ABORTED.value(), s"Quota Exceeded", Seq.empty)
@ -424,8 +335,6 @@ class ApiSubmissionServiceSpec
mock[state.WriteService],
mock[IndexPartyManagementService],
implicitPartyAllocation = true,
deduplicationEnabled = false,
mockIndexSubmissionService = mock[IndexSubmissionService],
commandExecutor = mock[CommandExecutor],
checkOverloaded = _ => Some(SubmissionResult.SynchronousError(grpcError)),
)
@ -444,10 +353,6 @@ class ApiSubmissionServiceSpec
}
object ApiSubmissionServiceSpec {
import ArgumentMatchersSugar._
import MockitoSugar._
val commandId = new AtomicInteger()
private def newSubmitRequest() = {
@ -474,8 +379,6 @@ object ApiSubmissionServiceSpec {
partyManagementService: IndexPartyManagementService,
implicitPartyAllocation: Boolean,
commandExecutor: CommandExecutor = null,
deduplicationEnabled: Boolean = true,
mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService],
useSelfServiceErrorCodes: Boolean = true,
checkOverloaded: TelemetryContext => Option[SubmissionResult] = _ => None,
)(implicit
@ -487,33 +390,15 @@ object ApiSubmissionServiceSpec {
Some(Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO))
}
when(writeService.isApiDeduplicationEnabled).thenReturn(deduplicationEnabled)
when(
mockIndexSubmissionService.deduplicateCommand(
any[CommandId],
anyList[Ref.Party],
any[Timestamp],
any[Timestamp],
)(any[LoggingContext])
).thenReturn(Future.successful(CommandDeduplicationNew))
when(
mockIndexSubmissionService.stopDeduplicatingCommand(
any[CommandId],
anyList[Ref.Party],
)(any[LoggingContext])
).thenReturn(Future.unit)
new ApiSubmissionService(
writeService = writeService,
submissionService = mockIndexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigurationSubscription = ledgerConfigurationSubscription,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService
.Configuration(implicitPartyAllocation, enableDeduplication = true),
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(new MetricRegistry),
errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
enableSelfServiceErrorCodes = useSelfServiceErrorCodes

View File

@ -150,9 +150,6 @@ class DbDtoToStringsForInterningSpec extends AnyFlatSpec with Matchers {
deduplication_duration_nanos = Some(1),
deduplication_start = Some(1),
),
DbDto.CommandDeduplication(
deduplication_key = "74"
),
DbDto.ConfigurationEntry(
ledger_offset = "75",
recorded_at = 1,

View File

@ -5,7 +5,6 @@ package com.daml.platform.store.backend
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.DeduplicationPeriod.{DeduplicationDuration, DeduplicationOffset}
import com.daml.ledger.api.domain
import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.offset.Offset
@ -20,7 +19,7 @@ import com.daml.logging.LoggingContext
import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.appendonlydao.events.Raw.TreeEvent
import com.daml.platform.store.appendonlydao.events._
import com.daml.platform.store.appendonlydao.{DeduplicationKeyMaker, JdbcLedgerDao}
import com.daml.platform.store.appendonlydao.JdbcLedgerDao
import com.google.protobuf.ByteString
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
@ -257,13 +256,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = None,
deduplication_duration_nanos = None,
deduplication_start = None,
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
domain.CommandId(completionInfo.commandId),
completionInfo.actAs,
)
),
)
)
}
@ -1285,13 +1278,7 @@ class UpdateToDbDtoSpec extends AnyWordSpec with Matchers {
deduplication_duration_seconds = expectedDeduplicationDurationSeconds,
deduplication_duration_nanos = expectedDeduplicationDurationNanos,
deduplication_start = None,
),
DbDto.CommandDeduplication(
DeduplicationKeyMaker.make(
domain.CommandId(completionInfo.commandId),
completionInfo.actAs,
)
),
)
)
}
}

View File

@ -1,80 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.dao
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import com.daml.ledger.api.domain.CommandId
import com.daml.lf.data.Ref
import com.daml.platform.store.appendonlydao.DeduplicationKeyMaker
import org.scalatest.prop.TableDrivenPropertyChecks
import java.util.UUID
import scala.util.Random
import scalaz.syntax.tag._
class DeduplicationKeyMakerSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChecks {
val commandId: CommandId = CommandId(Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
"DeduplicationKeyMaker" should {
"make a deduplication key starting with a command ID in plain-text" in {
DeduplicationKeyMaker.make(commandId, List(aParty())) should startWith(commandId.unwrap)
}
"make different keys for different sets of submitters" in {
val aCommonParty = aParty()
val cases = Table(
("Submitters for key1", "Submitters for key2"),
(List(aParty()), List(aParty())),
(List(aCommonParty, aParty()), List(aCommonParty, aParty())),
(List(aParty(), aParty()), List(aParty(), aParty())),
)
forAll(cases) { case (key1Submitters, key2Submitters) =>
val key1 = DeduplicationKeyMaker.make(commandId, key1Submitters)
val key2 = DeduplicationKeyMaker.make(commandId, key2Submitters)
key1 shouldNot equal(key2)
}
}
"make a deduplication key with a limited length for a large number of submitters" in {
val submitters = (1 to 50).map(_ => aParty()).toList
/** The motivation for the MaxKeyLength is to avoid problems with putting the deduplication key in a database
* index (e.g. for Postgres the limit on the index row size is 2712 bytes).
* The value 200 is set arbitrarily to leave some space for other data.
*/
val MaxKeyLength = 200
DeduplicationKeyMaker.make(commandId, submitters).length should be < MaxKeyLength
}
"make the same deduplication key for submitters of different order" in {
val submitter1 = aParty()
val submitter2 = aParty()
val submitter3 = aParty()
val key1 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter2, submitter3))
val key2 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter3, submitter2))
key1 shouldBe key2
}
"make the same deduplication key for duplicated submitters" in {
val submitter1 = aParty()
val submitter2 = aParty()
val key1 = DeduplicationKeyMaker.make(commandId, List(submitter1, submitter2))
val key2 = DeduplicationKeyMaker.make(
commandId,
List(submitter1, submitter1, submitter2, submitter2, submitter2),
)
key1 shouldBe key2
}
def aParty(): Ref.Party = Ref.Party.assertFromString(Random.alphanumeric.take(100).mkString)
}
}
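One way to satisfy every property asserted above (plain-text command ID prefix, insensitivity to submitter order and duplicates, bounded key length) is to append a fixed-length digest of the sorted, de-duplicated submitter list to the command ID. The sketch below works under that assumption; it is not necessarily the removed implementation, and the separator and hash choice are illustrative.

object DeduplicationKeyMakerSketch {
  import java.nio.charset.StandardCharsets
  import java.security.MessageDigest

  import com.daml.ledger.api.domain.CommandId
  import com.daml.lf.data.Ref
  import scalaz.syntax.tag._

  def make(commandId: CommandId, submitters: List[Ref.Party]): String = {
    // Sorting and de-duplicating the submitters makes the key independent of their order and
    // of repeated entries; hashing keeps the key length bounded regardless of how many there are.
    val canonicalSubmitters = submitters.distinct.sorted.mkString(",")
    val submittersDigest = MessageDigest
      .getInstance("SHA-256")
      .digest(canonicalSubmitters.getBytes(StandardCharsets.UTF_8))
    commandId.unwrap + "%" + submittersDigest.map("%02x".format(_)).mkString
  }
}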

View File

@ -16,7 +16,6 @@ trait IndexService
with IndexPartyManagementService
with IndexConfigManagementService
with IndexParticipantPruningService
with IndexSubmissionService
with MeteringStore
// with IndexTimeService //TODO: this needs some further discussion as the TimeService is actually optional
with ReportsHealth

View File

@ -1,28 +0,0 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.index.v2
import com.daml.ledger.api.domain.CommandId
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import scala.concurrent.Future
/** Serves as a backend to implement
* [[com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc]]
*/
trait IndexSubmissionService {
def deduplicateCommand(
commandId: CommandId,
submitters: List[Ref.Party],
submittedAt: Timestamp,
deduplicateUntil: Timestamp,
)(implicit loggingContext: LoggingContext): Future[CommandDeduplicationResult]
def stopDeduplicatingCommand(
commandId: CommandId,
submitters: List[Ref.Party],
)(implicit loggingContext: LoggingContext): Future[Unit]
}

View File

@ -107,13 +107,4 @@ package v2 {
knownSince: Timestamp,
sourceDescription: Option[String],
)
sealed abstract class CommandDeduplicationResult extends Product with Serializable
/** This is the first time the command was submitted. */
case object CommandDeduplicationNew extends CommandDeduplicationResult
/** This command was submitted before. */
final case class CommandDeduplicationDuplicate(deduplicateUntil: Timestamp)
extends CommandDeduplicationResult
}
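For context, callers of the removed IndexSubmissionService.deduplicateCommand branched on this result before executing a command. The following is a hedged sketch of such a caller; submitUnlessDuplicate, the deduplicationWindowMicros parameter, and the error construction are invented for illustration and are not the removed code.

object DeduplicationCallSketch {
  import scala.concurrent.{ExecutionContext, Future}

  import com.daml.ledger.api.domain.CommandId
  import com.daml.ledger.participant.state.index.v2.{
    CommandDeduplicationDuplicate,
    CommandDeduplicationNew,
    IndexSubmissionService,
  }
  import com.daml.lf.data.Ref
  import com.daml.lf.data.Time.Timestamp
  import com.daml.logging.LoggingContext

  // Runs `submit` only if the command has not been seen within its deduplication window.
  def submitUnlessDuplicate(
      submissionService: IndexSubmissionService,
      commandId: CommandId,
      actAs: List[Ref.Party],
      submittedAt: Timestamp,
      deduplicationWindowMicros: Long,
  )(submit: () => Future[Unit])(implicit
      ec: ExecutionContext,
      loggingContext: LoggingContext,
  ): Future[Unit] =
    submissionService
      .deduplicateCommand(
        commandId,
        actAs,
        submittedAt,
        submittedAt.addMicros(deduplicationWindowMicros),
      )
      .flatMap {
        case CommandDeduplicationNew =>
          submit()
        case CommandDeduplicationDuplicate(deduplicateUntil) =>
          Future.failed(
            new IllegalStateException(s"Duplicate command; deduplicated until $deduplicateUntil")
          )
      }
}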

View File

@ -95,6 +95,4 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
override def currentHealth(): HealthStatus =
delegate.currentHealth()
override def isApiDeduplicationEnabled: Boolean = delegate.isApiDeduplicationEnabled
}

View File

@ -18,11 +18,7 @@ import com.daml.lf.language.LanguageVersion
import com.daml.metrics.MetricsReporter
import com.daml.platform.apiserver.SeedService.Seeding
import com.daml.platform.configuration.Readers._
import com.daml.platform.configuration.{
CommandConfiguration,
IndexConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.configuration.{CommandConfiguration, IndexConfiguration}
import com.daml.platform.usermanagement.UserManagementConfig
import com.daml.ports.Port
import io.netty.handler.ssl.ClientAuth
@ -34,7 +30,6 @@ final case class Config[Extra](
mode: Mode,
ledgerId: String,
commandConfig: CommandConfiguration,
submissionConfig: SubmissionConfiguration,
tlsConfig: Option[TlsConfiguration],
participants: Seq[ParticipantConfig],
maxInboundMessageSize: Int,
@ -78,7 +73,6 @@ object Config {
mode = Mode.Run,
ledgerId = UUID.randomUUID().toString,
commandConfig = CommandConfiguration.default,
submissionConfig = SubmissionConfiguration.default,
tlsConfig = None,
participants = Vector.empty,
maxInboundMessageSize = DefaultMaxInboundMessageSize,
@ -417,17 +411,6 @@ object Config {
s" Default is ${CommandConfiguration.DefaultTrackerRetentionPeriod}."
)
opt[Unit]("disable-deduplication-unsafe")
.optional()
.hidden()
.action((_, config) =>
config
.copy(submissionConfig = config.submissionConfig.copy(enableDeduplication = false))
)
.text(
"Disable participant-side command deduplication."
)
opt[Duration]("max-deduplication-duration")
.optional()
.hidden()

View File

@ -230,7 +230,6 @@ final class Runner[T <: ReadWriteService, Extra](
ledgerId = config.ledgerId,
config = apiServerConfig,
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
partyConfig = configProvider.partyConfig(config),
optWriteService = Some(writeService),
authService = configProvider.authService(config),

View File

@ -31,7 +31,6 @@ class KeyValueParticipantStateWriter(
) extends WriteService {
private val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
override def isApiDeduplicationEnabled: Boolean = false
private val keyValueSubmission = new KeyValueSubmission(metrics)

View File

@ -109,9 +109,6 @@ class WriteServiceWithDeduplicationSupport(
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
delegate.submitConfiguration(maxRecordTime, submissionId, config)
override def isApiDeduplicationEnabled: Boolean = delegate.isApiDeduplicationEnabled
}
object WriteServiceWithDeduplicationSupport {

View File

@ -105,9 +105,4 @@ trait WriteService
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult]
/** Indicates whether command deduplication should be enabled when using this [[WriteService]].
* This is temporary until we fully transition from [[com.daml.ledger.participant.state.v1.WriteService]] to [[WriteService]].
*/
def isApiDeduplicationEnabled: Boolean = false
}
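Before this change, the API submission path consulted this flag to decide whether to run participant-side deduplication at all, which is what the removed ApiSubmissionServiceSpec tests above exercised. A minimal sketch of that kind of gate follows, assuming the WriteService trait above is in scope; handleSubmission, deduplicate, and execute are invented names.

object ApiDeduplicationGateSketch {
  import scala.concurrent.{ExecutionContext, Future}

  // `deduplicate` stands in for the participant-side deduplication step and `execute` for
  // command interpretation and submission; both are invented for illustration.
  def handleSubmission(
      writeService: WriteService,
      deduplicate: () => Future[Unit],
      execute: () => Future[Unit],
  )(implicit ec: ExecutionContext): Future[Unit] =
    if (writeService.isApiDeduplicationEnabled) deduplicate().flatMap(_ => execute())
    else execute()
}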

View File

@ -12,11 +12,7 @@ import com.daml.lf.data.Ref
import com.daml.metrics.MetricsReporter
import com.daml.platform.apiserver.SeedService.Seeding
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.configuration.{
CommandConfiguration,
InitialLedgerConfiguration,
SubmissionConfiguration,
}
import com.daml.platform.configuration.{CommandConfiguration, InitialLedgerConfiguration}
import com.daml.platform.services.time.TimeProviderType
import com.daml.ports.Port
import java.io.File
@ -42,7 +38,6 @@ final case class SandboxConfig(
delayBeforeSubmittingLedgerConfiguration: Duration,
timeModel: LedgerTimeModel,
commandConfig: CommandConfiguration,
submissionConfig: SubmissionConfiguration,
tlsConfig: Option[TlsConfiguration],
// TODO sandbox: Remove CLI option
scenario: Option[String],
@ -146,7 +141,6 @@ object SandboxConfig {
maxSkew = Duration.ofSeconds(120L),
).get,
commandConfig = CommandConfiguration.default,
submissionConfig = SubmissionConfiguration.default,
tlsConfig = None,
scenario = None,
implicitPartyAllocation = true,

View File

@ -69,7 +69,6 @@ object ConfigConverter {
maybeLedgerId.getOrElse(LedgerIdGenerator.generateRandomId(ledgerName).unwrap)
},
commandConfig = sandboxConfig.commandConfig,
submissionConfig = sandboxConfig.submissionConfig,
tlsConfig = sandboxConfig.tlsConfig,
participants = Seq(
singleCombinedParticipant

View File

@ -37,8 +37,6 @@ class BridgeWriteService(
private[this] val logger = ContextualizedLogger.get(getClass)
override def isApiDeduplicationEnabled: Boolean = false
override def close(): Unit = {
logger.info("Shutting down BridgeWriteService.")
queue.complete()

View File

@ -231,7 +231,6 @@ object SandboxOnXRunner {
ledgerId = config.ledgerId,
config = apiServerConfig,
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
partyConfig = PartyConfiguration(config.extra.implicitPartyAllocation),
optWriteService = Some(writeService),
authService = config.extra.authService,

View File

@ -43,7 +43,7 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.MetricsReporting
import com.daml.platform.apiserver._
import com.daml.platform.common.LedgerIdMode
import com.daml.platform.configuration.{PartyConfiguration, ServerRole, SubmissionConfiguration}
import com.daml.platform.configuration.{PartyConfiguration, ServerRole}
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer}
import com.daml.platform.sandbox.banner.Banner
import com.daml.platform.sandbox.config.SandboxConfig
@ -266,7 +266,6 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
partyConfig = PartyConfiguration.default.copy(
implicitPartyAllocation = config.implicitPartyAllocation
),
submissionConfig = SubmissionConfiguration.default,
optWriteService = Some(writeServiceWithDeduplicationSupport),
authService = authService,
healthChecks = healthChecks,