diff --git a/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala b/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala index 0c7307d852e..e6f99a5d6a6 100644 --- a/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala +++ b/extractor/src/main/scala/com/digitalasset/extractor/Extractor.scala @@ -259,7 +259,11 @@ class Extractor[T](config: ExtractorConfig, target: T)( LedgerClientConfiguration( applicationId = config.appId, ledgerIdRequirement = LedgerIdRequirement.none, - commandClient = CommandClientConfiguration(1, 1, java.time.Duration.ofSeconds(20L)), + commandClient = CommandClientConfiguration( + maxCommandsInFlight = 1, + maxParallelSubmissions = 1, + defaultDeduplicationTime = java.time.Duration.ofSeconds(20L), + ), sslContext = config.tlsConfig.client(), token = tokenHolder.flatMap(_.token), maxInboundMessageSize = config.ledgerInboundMessageSizeMax, diff --git a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/completion.proto b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/completion.proto index 9ad162e293d..d3004e16aea 100644 --- a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/completion.proto +++ b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/completion.proto @@ -45,7 +45,7 @@ message Completion { // The submission ID this completion refers to, as described in ``commands.proto``. // Must be a valid LedgerString (as described in ``value.proto``). - // Optional for historic completions where this data is not available. + // Optional string submission_id = 6; reserved "submission_rank"; // For future use. diff --git a/ledger/ledger-api-client/BUILD.bazel b/ledger/ledger-api-client/BUILD.bazel index c796543f022..12eff867bc1 100644 --- a/ledger/ledger-api-client/BUILD.bazel +++ b/ledger/ledger-api-client/BUILD.bazel @@ -85,6 +85,8 @@ da_scala_test_suite( "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", "//ledger/caching", + "//ledger/ledger-api-common", + "//ledger/ledger-api-domain", "//ledger/ledger-grpc", "//ledger/metrics", "//libs-scala/concurrent", diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/configuration/CommandClientConfiguration.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/configuration/CommandClientConfiguration.scala index 561c191aced..a3fd4131e21 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/configuration/CommandClientConfiguration.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/configuration/CommandClientConfiguration.scala @@ -20,5 +20,9 @@ final case class CommandClientConfiguration( ) object CommandClientConfiguration { - def default = CommandClientConfiguration(1, 1, Duration.ofSeconds(30L)) + def default: CommandClientConfiguration = CommandClientConfiguration( + maxCommandsInFlight = 1, + maxParallelSubmissions = 1, + defaultDeduplicationTime = Duration.ofSeconds(30L), + ) } diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlow.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlow.scala new file mode 100644 index 00000000000..2aacc47b521 --- /dev/null +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlow.scala @@ -0,0 +1,59 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.client.services.commands + +import akka.NotUsed +import akka.stream.scaladsl.Flow +import com.daml.ledger.api.SubmissionIdGenerator +import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.client.configuration.CommandClientConfiguration +import com.daml.util.Ctx +import com.google.protobuf.duration.Duration + +import scala.annotation.nowarn + +object CommandUpdaterFlow { + + @nowarn("msg=deprecated") + def apply[Context]( + config: CommandClientConfiguration, + submissionIdGenerator: SubmissionIdGenerator, + applicationId: String, + ledgerIdToUse: LedgerId, + ): Flow[Ctx[Context, CommandSubmission], Ctx[Context, CommandSubmission], NotUsed] = + Flow[Ctx[Context, CommandSubmission]] + .map(_.map { case submission @ CommandSubmission(commands, _) => + if (LedgerId(commands.ledgerId) != ledgerIdToUse) + throw new IllegalArgumentException( + s"Failing fast on submission request of command ${commands.commandId} with invalid ledger ID ${commands.ledgerId} (client expected $ledgerIdToUse)" + ) + if (commands.applicationId != applicationId) + throw new IllegalArgumentException( + s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)" + ) + val nonEmptySubmissionId = if (commands.submissionId.isEmpty) { + submissionIdGenerator.generate() + } else { + commands.submissionId + } + val updatedDeduplicationPeriod = commands.deduplicationPeriod match { + case DeduplicationPeriod.Empty => + DeduplicationPeriod.DeduplicationTime( + Duration + .of( + config.defaultDeduplicationTime.getSeconds, + config.defaultDeduplicationTime.getNano, + ) + ) + case existing => existing + } + submission.copy(commands = + commands.copy( + submissionId = nonEmptySubmissionId, + deduplicationPeriod = updatedDeduplicationPeriod, + ) + ) + }) +} diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala index 3e529709ca2..2f666fb2bf5 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala @@ -13,7 +13,6 @@ import com.daml.ledger.client.services.commands.tracker.CommandTracker._ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ CompletionFailure, CompletionSuccess, - NotOkResponse, } import com.daml.ledger.client.services.commands.{ CommandSubmission, @@ -62,8 +61,6 @@ private[commands] class CommandTracker[Context]( private val logger = LoggerFactory.getLogger(this.getClass.getName) - type ContextualizedCompletionResponse = Ctx[Context, Either[CompletionFailure, CompletionSuccess]] - val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] = Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn") val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] = @@ -73,8 +70,8 @@ private[commands] class CommandTracker[Context]( Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]]( "commandResultIn" ) - val resultOut: Outlet[ContextualizedCompletionResponse] = - Outlet[ContextualizedCompletionResponse]("resultOut") + val resultOut: Outlet[ContextualizedCompletionResponse[Context]] = + Outlet[ContextualizedCompletionResponse[Context]]("resultOut") val offsetOut: Outlet[LedgerOffset] = Outlet[LedgerOffset]("offsetOut") @@ -166,7 +163,7 @@ private[commands] class CommandTracker[Context]( pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse)) case Right(CompletionStreamElement.CompletionElement(completion)) => - pushResultOrPullCommandResultIn(getResponsesForCompletion(completion)) + pushResultOrPullCommandResultIn(getResponseForCompletion(completion)) case Right(CompletionStreamElement.CheckpointElement(checkpoint)) => if (!hasBeenPulled(commandResultIn)) pull(commandResultIn) @@ -187,7 +184,7 @@ private[commands] class CommandTracker[Context]( ) private def pushResultOrPullCommandResultIn( - completionResponses: Seq[ContextualizedCompletionResponse] + completionResponse: Option[ContextualizedCompletionResponse[Context]] ): Unit = { // The command tracker detects timeouts outside the regular pull/push // mechanism of the input/output ports. Basically the timeout @@ -196,10 +193,13 @@ private[commands] class CommandTracker[Context]( // even though it hasn't been pulled again in the meantime. Using `emit` // instead of `push` when a completion arrives makes akka take care of // handling the signaling properly. - if (completionResponses.isEmpty && !hasBeenPulled(commandResultIn)) { - pull(commandResultIn) + completionResponse match { + case Some(response) => emit(resultOut, response) + case None => + if (!hasBeenPulled(commandResultIn)) { + pull(commandResultIn) + } } - emitMultiple(resultOut, completionResponses.to(immutable.Iterable)) } private def completeStageIfTerminal(): Unit = { @@ -212,7 +212,7 @@ private[commands] class CommandTracker[Context]( private def handleSubmitResponse( submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]] - ): Seq[ContextualizedCompletionResponse] = { + ): Option[ContextualizedCompletionResponse[Context]] = { val Ctx((_, commandKey), value, _) = submitResponse value match { case Failure(GrpcException(status @ GrpcStatus(code, _), metadata)) @@ -220,18 +220,18 @@ private[commands] class CommandTracker[Context]( getResponseForTerminalStatusCode( commandKey, GrpcStatus.toProto(status, metadata), - ).toList + ) case Failure(throwable) => logger.warn( s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...", throwable, ) - Seq.empty + None case Success(_) => logger.trace( s"Received confirmation that command ${commandKey.commandId} from submission ${commandKey.submissionId} was accepted." ) - Seq.empty + None } } @@ -241,7 +241,7 @@ private[commands] class CommandTracker[Context]( val submissionId = commands.submissionId val commandId = commands.commandId logger.trace(s"Begin tracking of command $commandId for submission $submissionId.") - if (commands.submissionId.isEmpty) { + if (submissionId.isEmpty) { throw new IllegalArgumentException( s"The submission ID for the command ID $commandId is empty. This should not happen." ) @@ -281,7 +281,7 @@ private[commands] class CommandTracker[Context]( private def getResponsesForTimeouts( instant: Instant - ): Seq[ContextualizedCompletionResponse] = { + ): Seq[ContextualizedCompletionResponse[Context]] = { logger.trace("Checking timeouts at {}", instant) pendingCommands.view.flatMap { case (commandKey, trackingData) => if (trackingData.commandTimeout.isBefore(instant)) { @@ -305,14 +305,11 @@ private[commands] class CommandTracker[Context]( }.toSeq } - private def getResponsesForCompletion( + private def getResponseForCompletion( completion: Completion - ): Seq[ContextualizedCompletionResponse] = { - + ): Option[ContextualizedCompletionResponse[Context]] = { val commandId = completion.commandId - val maybeSubmissionId = Option(completion.submissionId).collect { - case id if id.nonEmpty => id - } + val maybeSubmissionId = Option(completion.submissionId).filter(_.nonEmpty) logger.trace { val completionDescription = completion.status match { case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value => @@ -322,49 +319,26 @@ private[commands] class CommandTracker[Context]( s"Handling $completionDescription $commandId from submission $maybeSubmissionId." } - val trackedCommandKeys = pendingCommandKeys(maybeSubmissionId, commandId) - val trackingDataForSubmissionId = - trackedCommandKeys.flatMap(pendingCommands.remove(_).toList) - - if (trackingDataForSubmissionId.size > 1) { - trackingDataForSubmissionId.map { trackingData => - Ctx( - trackingData.context, - Left( - NotOkResponse( - Completion( - commandId, - Some( - StatusProto.of( - Status.Code.INTERNAL.value(), - s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.", - Seq.empty, - ) - ), - ) - ) - ), + maybeSubmissionId + .map { submissionId => + val key = TrackedCommandKey(submissionId, completion.commandId) + val trackedCommandForCompletion = pendingCommands.remove(key) + trackedCommandForCompletion.map(trackingData => + Ctx(trackingData.context, tracker.CompletionResponse(completion)) ) } - } else { - trackingDataForSubmissionId.map(trackingData => - Ctx(trackingData.context, tracker.CompletionResponse(completion)) - ) - } + .getOrElse { + logger.trace( + "Ignoring a completion with an empty submission ID for a submission from the CommandSubmissionService." + ) + None + } } - private def pendingCommandKeys( - submissionId: Option[String], - commandId: String, - ): Seq[TrackedCommandKey] = - submissionId.map(id => Seq(TrackedCommandKey(id, commandId))).getOrElse { - pendingCommands.keys.filter(_.commandId == commandId).toList - } - private def getResponseForTerminalStatusCode( commandKey: TrackedCommandKey, status: StatusProto, - ): Option[ContextualizedCompletionResponse] = { + ): Option[ContextualizedCompletionResponse[Context]] = { logger.trace( s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}." ) @@ -407,6 +381,9 @@ private[commands] class CommandTracker[Context]( } object CommandTracker { + type ContextualizedCompletionResponse[Context] = + Ctx[Context, Either[CompletionFailure, CompletionSuccess]] + private val durationOrdering = implicitly[Ordering[Duration]] private val nonTerminalCodes = diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/withoutledgerid/CommandClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/withoutledgerid/CommandClient.scala index 415cc2b9219..f99dfcd49a6 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/withoutledgerid/CommandClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/withoutledgerid/CommandClient.scala @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.daml.ledger.client.services.commands.withoutledgerid -import com.daml.ledger.client.services.commands._ + import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.{Flow, Keep, Sink, Source} @@ -18,25 +18,23 @@ import com.daml.ledger.api.v1.command_completion_service.{ } import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionServiceStub import com.daml.ledger.api.v1.command_submission_service.SubmitRequest -import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.api.validation.CommandsValidator import com.daml.ledger.client.LedgerClient import com.daml.ledger.client.configuration.CommandClientConfiguration import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized -import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey +import com.daml.ledger.client.services.commands._ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ CompletionFailure, CompletionSuccess, } +import com.daml.ledger.client.services.commands.tracker.TrackedCommandKey import com.daml.util.Ctx import com.daml.util.akkastreams.MaxInFlight -import com.google.protobuf.duration.Duration import com.google.protobuf.empty.Empty import org.slf4j.{Logger, LoggerFactory} import scalaz.syntax.tag._ -import scala.annotation.nowarn import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.util.Try @@ -150,7 +148,9 @@ private[daml] final class CommandClient( ledgerEnd <- getCompletionEnd(ledgerIdToUse, token) } yield { partyFilter(parties.toSet) - .via(commandUpdaterFlow[Context](ledgerIdToUse)) + .via( + CommandUpdaterFlow[Context](config, submissionIdGenerator, applicationId, ledgerIdToUse) + ) .viaMat( CommandTrackerFlow[Context, NotUsed]( commandSubmissionFlow = CommandSubmissionFlow[(Context, TrackedCommandKey)]( @@ -193,48 +193,12 @@ private[daml] final class CommandClient( ) } - @nowarn("msg=deprecated") - private def commandUpdaterFlow[Context](ledgerIdToUse: LedgerId) = - Flow[Ctx[Context, CommandSubmission]] - .map(_.map { case submission @ CommandSubmission(commands, _) => - if (LedgerId(commands.ledgerId) != ledgerIdToUse) - throw new IllegalArgumentException( - s"Failing fast on submission request of command ${commands.commandId} with invalid ledger ID ${commands.ledgerId} (client expected $ledgerIdToUse)" - ) - if (commands.applicationId != applicationId) - throw new IllegalArgumentException( - s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)" - ) - val nonEmptySubmissionId = if (commands.submissionId.isEmpty) { - submissionIdGenerator.generate() - } else { - commands.submissionId - } - val updatedDeduplicationPeriod = commands.deduplicationPeriod match { - case DeduplicationPeriod.Empty => - DeduplicationPeriod.DeduplicationTime( - Duration - .of( - config.defaultDeduplicationTime.getSeconds, - config.defaultDeduplicationTime.getNano, - ) - ) - case existing => existing - } - submission.copy(commands = - commands.copy( - submissionId = nonEmptySubmissionId, - deduplicationPeriod = updatedDeduplicationPeriod, - ) - ) - }) - def submissionFlow[Context]( ledgerIdToUse: LedgerId, token: Option[String] = None, ): Flow[Ctx[Context, CommandSubmission], Ctx[Context, Try[Empty]], NotUsed] = { Flow[Ctx[Context, CommandSubmission]] - .via(commandUpdaterFlow(ledgerIdToUse)) + .via(CommandUpdaterFlow[Context](config, submissionIdGenerator, applicationId, ledgerIdToUse)) .via(CommandSubmissionFlow[Context](submit(token), config.maxParallelSubmissions)) } diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala index 714b66292e4..82b3d6646d6 100644 --- a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala @@ -493,64 +493,6 @@ class CommandTrackerFlowTest } } - "a completion without the submission id arrives" should { - - "output a failure if there are multiple pending commands with the same command id" in { - val Handle(submissions, results, _, completionStreamMock) = - runCommandTrackingFlow(allSubmissionsSuccessful) - - submissions.sendNext(newSubmission("submissionId", commandId)) - submissions.sendNext(newSubmission("anotherSubmissionId", commandId)) - - val completionWithoutSubmissionId = - Completion( - commandId, - Some(successStatus), - submissionId = "", - ) - completionStreamMock.send( - CompletionStreamElement.CompletionElement(completionWithoutSubmissionId) - ) - - results.expectNext( - Ctx( - context, - Left( - failureCompletion( - code = Code.INTERNAL, - message = - s"There are multiple pending commands with ID: $commandId for submission ID: None. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.", - submissionId = "", - ) - ), - ) - ) - succeed - } - - "output a successful completion" in { - val Handle(submissions, results, _, completionStreamMock) = - runCommandTrackingFlow(allSubmissionsSuccessful) - - submissions.sendNext(submission) - - val completionWithoutSubmissionId = - Completion( - commandId, - Some(successStatus), - submissionId = "", - ) - completionStreamMock.send( - CompletionStreamElement.CompletionElement(completionWithoutSubmissionId) - ) - - results.expectNext( - Ctx(context, Right(successCompletion(submissionId = ""))) - ) - succeed - } - } - "a multitude of successful completions arrive for submitted commands" should { "output all expected values" in { @@ -612,6 +554,26 @@ class CommandTrackerFlowTest } } + "completions with empty and nonempty submission IDs arrive" should { + + "ignore a completion with an empty submission ID and output a successful response" in { + val Handle(submissions, results, _, completionStreamMock) = + runCommandTrackingFlow(allSubmissionsSuccessful) + + results.request(2) + + submissions.sendNext(submission) + + completionStreamMock.send(successfulStreamCompletion("", commandId)) + completionStreamMock.send(successfulStreamCompletion(submissionId, commandId)) + + results.expectNext( + Ctx(context, Right(successCompletion(submissionId = submissionId))) + ) + succeed + } + } + "completion stream disconnects" should { "keep run and recover the completion subscription from a recent offset" in { diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlowTest.scala new file mode 100644 index 00000000000..c9da777299e --- /dev/null +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandUpdaterFlowTest.scala @@ -0,0 +1,94 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.client.services.commands + +import akka.stream.scaladsl.{Sink, Source} +import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll +import com.daml.ledger.api.v1.commands.Commands +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.api.{SubmissionIdGenerator, domain} +import com.daml.ledger.client.configuration.CommandClientConfiguration +import com.daml.lf.data.Ref +import com.daml.telemetry.NoOpTelemetryContext +import com.daml.util.Ctx +import com.google.protobuf.duration.Duration +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import scala.annotation.nowarn +import scala.util.Failure + +@nowarn("msg=deprecated") +class CommandUpdaterFlowTest extends AsyncWordSpec with Matchers with AkkaBeforeAndAfterAll { + + import CommandUpdaterFlowTest._ + + "apply" should { + "fail fast on an invalid ledger ID" in { + val aCommandSubmission = + CommandSubmission(defaultCommands.copy(ledgerId = "anotherLedgerId")) + + runCommandUpdaterFlow(aCommandSubmission) + .transformWith { + case Failure(exception) => exception shouldBe an[IllegalArgumentException] + case _ => fail + } + } + + "fail fast on an invalid application ID" in { + val aCommandSubmission = + CommandSubmission(defaultCommands.copy(applicationId = "anotherApplicationId")) + + runCommandUpdaterFlow(aCommandSubmission) + .transformWith { + case Failure(exception) => exception shouldBe an[IllegalArgumentException] + case _ => fail + } + } + + "generate a submission ID if it's empty" in { + val aCommandSubmission = CommandSubmission(defaultCommands.copy(submissionId = "")) + + runCommandUpdaterFlow(aCommandSubmission) + .map(_.value.commands.submissionId shouldBe aSubmissionId) + } + + "set the default deduplication period if it's empty" in { + val aCommandSubmission = + CommandSubmission( + defaultCommands.copy(deduplicationPeriod = DeduplicationPeriod.Empty) + ) + val defaultDeduplicationTime = CommandClientConfiguration.default.defaultDeduplicationTime + + runCommandUpdaterFlow(aCommandSubmission) + .map( + _.value.commands.getDeduplicationTime shouldBe Duration + .of(defaultDeduplicationTime.getSeconds, defaultDeduplicationTime.getNano) + ) + } + } + + private def runCommandUpdaterFlow(aCommandSubmission: CommandSubmission) = { + Source + .single(Ctx((), aCommandSubmission, NoOpTelemetryContext)) + .via( + CommandUpdaterFlow( + CommandClientConfiguration.default, + aSubmissionIdGenerator, + anApplicationId, + aLedgerId, + ) + ) + .runWith(Sink.head) + } +} + +object CommandUpdaterFlowTest { + private val anApplicationId = "anApplicationId" + private val aLedgerId = domain.LedgerId("aLedgerId") + private val aSubmissionId = Ref.SubmissionId.assertFromString("aSubmissionId") + private val aSubmissionIdGenerator: SubmissionIdGenerator = () => aSubmissionId + private val defaultCommands = + Commands.defaultInstance.copy(applicationId = anApplicationId, ledgerId = aLedgerId.toString) +} diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala index 5caa2740c77..463b7aa8677 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala @@ -49,7 +49,7 @@ final class CommandsValidator(ledgerId: LedgerId) { appId <- requireLedgerString(commands.applicationId, "application_id") .map(domain.ApplicationId(_)) commandId <- requireLedgerString(commands.commandId, "command_id").map(domain.CommandId(_)) - submissionId <- requireSubmissionId(commands.submissionId) + submissionId <- validateSubmissionId(commands.submissionId) submitters <- validateSubmitters(commands) commandz <- requireNonEmpty(commands.commands, "commands") validatedCommands <- validateInnerCommands(commandz) diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala index 72baaa66ecb..823d80e11f6 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala @@ -4,7 +4,6 @@ package com.daml.platform.server.api.services.grpc import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger} -import com.daml.ledger.api.SubmissionIdGenerator import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{ CommandSubmissionService => ApiCommandSubmissionService @@ -32,7 +31,6 @@ class GrpcCommandSubmissionService( currentLedgerTime: () => Instant, currentUtcTime: () => Instant, maxDeduplicationTime: () => Option[Duration], - submissionIdGenerator: SubmissionIdGenerator, metrics: Metrics, )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext) extends ApiCommandSubmissionService @@ -57,7 +55,6 @@ class GrpcCommandSubmissionService( telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party) telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId) } - val requestWithSubmissionId = generateSubmissionIdIfEmpty(request) Timed.timedAndTrackedFuture( metrics.daml.commands.submissions, metrics.daml.commands.submissionsRunning, @@ -65,14 +62,14 @@ class GrpcCommandSubmissionService( .value( metrics.daml.commands.validation, validator.validate( - requestWithSubmissionId, + request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), ), ) .fold( - t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), + t => Future.failed(ValidationLogger.logFailure(request, t)), service.submit(_).map(_ => Empty.defaultInstance), ), ) @@ -80,14 +77,4 @@ class GrpcCommandSubmissionService( override def bindService(): ServerServiceDefinition = CommandSubmissionServiceGrpc.bindService(this, executionContext) - - private def generateSubmissionIdIfEmpty(request: ApiSubmitRequest): ApiSubmitRequest = { - if (request.commands.exists(_.submissionId.isEmpty)) { - val commandsWithSubmissionId = - request.commands.map(_.copy(submissionId = submissionIdGenerator.generate())) - request.copy(commands = commandsWithSubmissionId) - } else { - request - } - } } diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala index cacef76ef83..1a562190488 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala @@ -93,20 +93,18 @@ class FieldValidations private (errorFactories: ErrorFactories) { ): Either[StatusRuntimeException, Ref.LedgerString] = Ref.LedgerString.fromString(s).left.map(invalidArgument(definiteAnswer = Some(false))) - def requireSubmissionId(s: String)(implicit + def validateSubmissionId(s: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger - ): Either[StatusRuntimeException, domain.SubmissionId] = { - val fieldName = "submission_id" + ): Either[StatusRuntimeException, Option[domain.SubmissionId]] = if (s.isEmpty) { - Left(missingField(fieldName, definiteAnswer = Some(false))) + Right(None) } else { Ref.SubmissionId .fromString(s) - .map(domain.SubmissionId(_)) + .map(submissionId => Some(domain.SubmissionId(submissionId))) .left - .map(invalidField(fieldName, _, definiteAnswer = Some(false))) + .map(invalidField("submission_id", _, definiteAnswer = Some(false))) } - } def requireContractId( s: String, diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala index 1bfe7333548..bed14e3eaa7 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala @@ -84,7 +84,7 @@ class SubmitRequestValidatorTest workflowId = Some(workflowId), applicationId = applicationId, commandId = commandId, - submissionId = submissionId, + submissionId = Some(submissionId), actAs = Set(DomainMocks.party), readAs = Set.empty, submittedAt = Time.Timestamp.assertFromInstant(submittedAt), @@ -132,18 +132,14 @@ class SubmitRequestValidatorTest "CommandSubmissionRequestValidator" when { "validating command submission requests" should { - "reject requests with empty submissionId" in { + "tolerate a missing submissionId" in { val commandsValidator = new CommandsValidator(ledgerId) - requestMustFailWith( - commandsValidator.validateCommands( - api.commands.withSubmissionId(""), - internal.ledgerTime, - internal.submittedAt, - Some(internal.maxDeduplicationDuration), - ), - INVALID_ARGUMENT, - "Missing field: submission_id", - ) + commandsValidator.validateCommands( + api.commands.withSubmissionId(""), + internal.ledgerTime, + internal.submittedAt, + Some(internal.maxDeduplicationDuration), + ) shouldEqual Right(internal.emptyCommands.copy(submissionId = None)) } "reject requests with empty commands" in { diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandServiceSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandServiceSpec.scala new file mode 100644 index 00000000000..b4f554a5d8a --- /dev/null +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandServiceSpec.scala @@ -0,0 +1,111 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.server.api.services.grpc + +import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.api.testing.utils.MockMessages._ +import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService +import com.daml.ledger.api.v1.command_service.{ + SubmitAndWaitForTransactionIdResponse, + SubmitAndWaitForTransactionResponse, + SubmitAndWaitForTransactionTreeResponse, + SubmitAndWaitRequest, +} +import com.daml.ledger.api.v1.commands.{Command, CreateCommand} +import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} +import com.daml.lf.data.Ref +import com.daml.logging.LoggingContext +import com.google.protobuf.empty.Empty +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import java.time.{Duration, Instant} +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future + +class GrpcCommandServiceSpec + extends AsyncWordSpec + with MockitoSugar + with Matchers + with ArgumentMatchersSugar { + + import GrpcCommandServiceSpec._ + + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + + "GrpcCommandService" should { + "generate a submission ID if it's empty" in { + val submissionCounter = new AtomicInteger + + val mockCommandService = mock[CommandService with AutoCloseable] + when(mockCommandService.submitAndWait(any[SubmitAndWaitRequest])) + .thenReturn(Future.successful(Empty.defaultInstance)) + when(mockCommandService.submitAndWaitForTransaction(any[SubmitAndWaitRequest])) + .thenReturn(Future.successful(SubmitAndWaitForTransactionResponse.defaultInstance)) + when(mockCommandService.submitAndWaitForTransactionId(any[SubmitAndWaitRequest])) + .thenReturn(Future.successful(SubmitAndWaitForTransactionIdResponse.defaultInstance)) + when(mockCommandService.submitAndWaitForTransactionTree(any[SubmitAndWaitRequest])) + .thenReturn(Future.successful(SubmitAndWaitForTransactionTreeResponse.defaultInstance)) + + val grpcCommandService = new GrpcCommandService( + mockCommandService, + ledgerId = LedgerId(ledgerId), + currentLedgerTime = () => Instant.EPOCH, + currentUtcTime = () => Instant.EPOCH, + maxDeduplicationTime = () => Some(Duration.ZERO), + generateSubmissionId = () => + Ref.SubmissionId.assertFromString( + s"$submissionIdPrefix${submissionCounter.incrementAndGet()}" + ), + ) + + for { + _ <- grpcCommandService.submitAndWait(aSubmitAndWaitRequestWithNoSubmissionId) + _ <- grpcCommandService.submitAndWaitForTransaction(aSubmitAndWaitRequestWithNoSubmissionId) + _ <- grpcCommandService.submitAndWaitForTransactionId( + aSubmitAndWaitRequestWithNoSubmissionId + ) + _ <- grpcCommandService.submitAndWaitForTransactionTree( + aSubmitAndWaitRequestWithNoSubmissionId + ) + } yield { + def expectedSubmitAndWaitRequest(submissionIdSuffix: String) = + aSubmitAndWaitRequestWithNoSubmissionId.copy(commands = + aSubmitAndWaitRequestWithNoSubmissionId.commands + .map(_.copy(submissionId = s"$submissionIdPrefix$submissionIdSuffix")) + ) + verify(mockCommandService).submitAndWait(expectedSubmitAndWaitRequest("1")) + verify(mockCommandService).submitAndWaitForTransaction(expectedSubmitAndWaitRequest("2")) + verify(mockCommandService).submitAndWaitForTransactionId(expectedSubmitAndWaitRequest("3")) + verify(mockCommandService).submitAndWaitForTransactionTree( + expectedSubmitAndWaitRequest("4") + ) + succeed + } + } + } +} + +object GrpcCommandServiceSpec { + private val aCommand = Command.of( + Command.Command.Create( + CreateCommand( + Some(Identifier("package", moduleName = "module", entityName = "entity")), + Some( + Record( + Some(Identifier("package", moduleName = "module", entityName = "entity")), + Seq(RecordField("something", Some(Value(Value.Sum.Bool(true))))), + ) + ), + ) + ) + ) + + private val aSubmitAndWaitRequestWithNoSubmissionId = submitAndWaitRequest.copy( + commands = Some(commands.copy(commands = Seq(aCommand), submissionId = "")) + ) + + private val submissionIdPrefix = "submissionId-" +} diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala index 1a7e18afef6..29979802f67 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala @@ -10,7 +10,6 @@ import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.testing.utils.MockMessages._ import com.daml.ledger.api.v1.commands.{Command, CreateCommand} import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} -import com.daml.lf.data.Ref import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.platform.server.api.services.domain.CommandSubmissionService @@ -42,7 +41,6 @@ class GrpcCommandSubmissionServiceSpec currentLedgerTime = () => Instant.EPOCH, currentUtcTime = () => Instant.EPOCH, maxDeduplicationTime = () => Some(Duration.ZERO), - submissionIdGenerator = () => Ref.SubmissionId.assertFromString("submissionId"), metrics = new Metrics(new MetricRegistry), ) diff --git a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala index edebfe46f8b..bba44b70862 100644 --- a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala +++ b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala @@ -278,7 +278,7 @@ object domain { workflowId: Option[WorkflowId], applicationId: ApplicationId, commandId: CommandId, - submissionId: SubmissionId, + submissionId: Option[SubmissionId], actAs: Set[Ref.Party], readAs: Set[Ref.Party], submittedAt: Timestamp, diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCompletionDeduplicationInfoIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCompletionDeduplicationInfoIT.scala index 2ad979f1986..1fa926e5d46 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCompletionDeduplicationInfoIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCompletionDeduplicationInfoIT.scala @@ -49,7 +49,7 @@ final class AppendOnlyCompletionDeduplicationInfoIT[ServiceRequest]( .submitRequest(ledger, party, requestWithSubmissionId) } yield { assertApplicationIdIsPreserved(ledger.applicationId, optNoDeduplicationSubmittedCompletion) - assertSubmissionIdIsGenerated(optNoDeduplicationSubmittedCompletion) + service.assertCompletion(optNoDeduplicationSubmittedCompletion) assertDeduplicationPeriodIsReported(optNoDeduplicationSubmittedCompletion) assertSubmissionIdIsPreserved(optSubmissionIdSubmittedCompletion, RandomSubmissionId) } @@ -70,6 +70,8 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT { party: Primitive.Party, request: ProtoRequestType, )(implicit ec: ExecutionContext): Future[Option[Completion]] + + def assertCompletion(optCompletion: Option[Completion]): Unit } case object CommandService extends Service[SubmitAndWaitRequest] { @@ -96,6 +98,15 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT { _ <- ledger.submitAndWait(request) completion <- singleCompletionAfterOffset(ledger, party, offset) } yield completion + + override def assertCompletion(optCompletion: Option[Completion]): Unit = { + val completion = assertDefined(optCompletion) + assert(completion.status.forall(_.code == Status.Code.OK.value())) + assert( + Ref.SubmissionId.fromString(completion.submissionId).isRight, + "Missing or invalid submission ID in completion", + ) + } } case object CommandSubmissionService extends Service[SubmitRequest] { @@ -122,6 +133,11 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT { _ <- ledger.submit(request) completion <- singleCompletionAfterOffset(ledger, party, offset) } yield completion + + override def assertCompletion(optCompletion: Option[Completion]): Unit = { + val completion = assertDefined(optCompletion) + assert(completion.status.forall(_.code == Status.Code.OK.value())) + } } private def singleCompletionAfterOffset( @@ -155,15 +171,6 @@ private[testtool] object AppendOnlyCompletionDeduplicationInfoIT { assert(completion.deduplicationPeriod.isDefined, "The deduplication period was not reported") } - private def assertSubmissionIdIsGenerated(optCompletion: Option[Completion]): Unit = { - val completion = assertDefined(optCompletion) - assert(completion.status.forall(_.code == Status.Code.OK.value())) - assert( - Ref.SubmissionId.fromString(completion.submissionId).isRight, - "Missing or invalid submission ID in completion", - ) - } - private def assertApplicationIdIsPreserved( requestedApplicationId: String, optCompletion: Option[Completion], diff --git a/ledger/participant-integration-api/src/main/scala/db/migration/postgres/V2_1__Rebuild_Acs.scala b/ledger/participant-integration-api/src/main/scala/db/migration/postgres/V2_1__Rebuild_Acs.scala index 4e639e780c8..ea0d230ba01 100644 --- a/ledger/participant-integration-api/src/main/scala/db/migration/postgres/V2_1__Rebuild_Acs.scala +++ b/ledger/participant-integration-api/src/main/scala/db/migration/postgres/V2_1__Rebuild_Acs.scala @@ -7,7 +7,7 @@ package com.daml.platform.db.migration.postgres import java.io.InputStream import java.sql.Connection -import java.util.{Date, UUID} +import java.util.Date import akka.NotUsed import akka.stream.scaladsl.Source @@ -484,14 +484,12 @@ private[migration] class V2_1__Rebuild_Acs extends BaseJavaMigration { Some(rejectionDescription), offset, ) => - // We don't have a submission ID, so we need to generate one. - val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString) val rejectionReason = readRejectionReason(rejectionType, rejectionDescription) offset -> LedgerEntry.Rejection( recordTime = Timestamp.assertFromInstant(recordedAt.toInstant), commandId = commandId, applicationId = applicationId, - submissionId = submissionId, + submissionId = None, actAs = List(submitter), rejectionReason = rejectionReason, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala index b2da9545bdd..737cfb437a5 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala @@ -80,7 +80,7 @@ private[apiserver] final class StoreBackedCommandExecutor( commands.applicationId.unwrap, commands.commandId.unwrap, commands.deduplicationPeriod, - commands.submissionId.unwrap, + commands.submissionId.map(_.unwrap), ledgerConfiguration, ), transactionMeta = state.TransactionMeta( diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 2892eb28f63..87f0f80cf4d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -11,9 +11,9 @@ import com.daml.error.{ ErrorCodesVersionSwitcher, } import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators} +import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands} import com.daml.ledger.api.messages.command.submission.SubmitRequest -import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator} import com.daml.ledger.configuration.Configuration import com.daml.ledger.participant.state.index.v2._ import com.daml.ledger.participant.state.{v2 => state} @@ -82,7 +82,6 @@ private[apiserver] object ApiSubmissionService { currentUtcTime = () => Instant.now, maxDeduplicationTime = () => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime), - submissionIdGenerator = SubmissionIdGenerator.Random, metrics = metrics, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index 5f300cc14ce..9ef3b2f8356 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -366,7 +366,7 @@ private class JdbcLedgerDao( state.Update.CommandRejected( recordTime = recordTime, completionInfo = state - .CompletionInfo(actAs, applicationId, commandId, None, Some(submissionId)), + .CompletionInfo(actAs, applicationId, commandId, None, submissionId), reasonTemplate = reason.toParticipantStateRejectionReason, ) ), diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/entries/LedgerEntry.scala b/ledger/participant-integration-api/src/main/scala/platform/store/entries/LedgerEntry.scala index 41115afb24c..0c3061c004e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/entries/LedgerEntry.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/entries/LedgerEntry.scala @@ -17,7 +17,7 @@ private[platform] object LedgerEntry { recordTime: Timestamp, commandId: Ref.CommandId, applicationId: Ref.ApplicationId, - submissionId: Ref.SubmissionId, + submissionId: Option[Ref.SubmissionId], actAs: List[Ref.Party], rejectionReason: RejectionReason, ) extends LedgerEntry diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala index 2e9356cc69f..7863e496630 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala @@ -6,7 +6,7 @@ package com.daml.platform.apiserver.execution import java.time.Duration import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.DeduplicationPeriod -import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId, SubmissionId} +import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService} import com.daml.lf.crypto.Hash @@ -59,7 +59,7 @@ class StoreBackedCommandExecutorSpec workflowId = None, applicationId = ApplicationId(Ref.ApplicationId.assertFromString("applicationId")), commandId = CommandId(Ref.CommandId.assertFromString("commandId")), - submissionId = SubmissionId(Ref.SubmissionId.assertFromString("submissionId")), + submissionId = None, actAs = Set.empty, readAs = Set.empty, submittedAt = Time.Timestamp.Epoch, diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index 4f21958d69b..9b3bafc6e6d 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -5,7 +5,7 @@ package com.daml.platform.apiserver.services import com.codahale.metrics.MetricRegistry import com.daml.error.{ErrorCause, ErrorCodesVersionSwitcher} -import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId} +import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} @@ -42,7 +42,6 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.{Assertion, Inside} import java.time.Duration -import java.util.UUID import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.{ExecutionContext, Future} @@ -355,7 +354,6 @@ class ApiSubmissionServiceSpec writeService, partyManagementService, implicitPartyAllocation = true, - deduplicationEnabled = true, mockIndexSubmissionService = indexSubmissionService, commandExecutor = mockCommandExecutor, ) @@ -433,7 +431,7 @@ object ApiSubmissionServiceSpec { commandId = CommandId( Ref.CommandId.assertFromString(s"commandId-${commandId.incrementAndGet()}") ), - submissionId = SubmissionId(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)), + submissionId = None, actAs = Set.empty, readAs = Set.empty, submittedAt = Timestamp.Epoch, diff --git a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/transaction.proto b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/transaction.proto index da75f9556f2..4e3efef1dc9 100644 --- a/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/transaction.proto +++ b/ledger/participant-state/kvutils/src/main/protobuf/com/daml/ledger/participant/state/kvutils/store/events/transaction.proto @@ -32,7 +32,7 @@ message DamlSubmitterInfo { google.protobuf.Duration deduplication_duration = 6; string deduplication_offset = 7; } - string submission_id = 9; + optional string submission_id = 9; } // Daml transaction entry, used in both `DamlSubmission` and `DamlLogEntry`. diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala index d0a969f3ecb..723fab7a412 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala @@ -166,7 +166,7 @@ private[state] object Conversions { .addAllSubmitters((subInfo.actAs: List[String]).asJava) .setApplicationId(subInfo.applicationId) .setCommandId(subInfo.commandId) - .setSubmissionId(subInfo.submissionId) + .setSubmissionId(subInfo.submissionId.getOrElse("")) subInfo.deduplicationPeriod match { case DeduplicationPeriod.DeduplicationDuration(duration) => submitterInfoBuilder.setDeduplicationDuration(buildDuration(duration)) diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala index b6c384206fe..79ef37758f3 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala @@ -275,7 +275,6 @@ object KVTest { submitter, commandId, deduplicationDuration, - randomLedgerString, ) testState.keyValueSubmission.transactionToSubmission( submitterInfo = submitterInfo, @@ -444,14 +443,13 @@ object KVTest { submitter: Ref.Party, commandId: Ref.CommandId, deduplicationDuration: Duration, - submissionId: Ref.SubmissionId, ): SubmitterInfo = { SubmitterInfo( actAs = List(submitter), applicationId = Ref.LedgerString.assertFromString("test"), commandId = commandId, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(deduplicationDuration), - submissionId = submissionId, + submissionId = None, ledgerConfiguration = Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)), ) diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index 0741fdfc5ab..8532bfae83f 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -716,7 +716,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i applicationId = Ref.LedgerString.assertFromString("tests"), commandId = Ref.LedgerString.assertFromString(commandId), deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(10)), - submissionId = Ref.LedgerString.assertFromString("submissionId"), + submissionId = None, ledgerConfiguration = Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)), ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueCommittingSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueCommittingSpec.scala index 12838d68e82..0b1f4611c3a 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueCommittingSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueCommittingSpec.scala @@ -47,7 +47,7 @@ class KeyValueCommittingSpec extends AnyWordSpec with Matchers { applicationId = applicationId, commandId = commandId, deduplicationPeriod = ApiDeduplicationPeriod.DeduplicationDuration(Duration.ZERO), - submissionId = Ref.LedgerString.assertFromString("submission"), + submissionId = None, ledgerConfiguration = Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)), ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriterSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriterSpec.scala index 80b5eaa9dce..b68e16adefd 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriterSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateWriterSpec.scala @@ -171,7 +171,7 @@ object KeyValueParticipantStateWriterSpec { applicationId = Ref.LedgerString.assertFromString("tests"), commandId = Ref.LedgerString.assertFromString(commandId), deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofDays(1)), - submissionId = Ref.LedgerString.assertFromString("submission"), + submissionId = None, ledgerConfiguration = Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)), ) diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/SubmitterInfo.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/SubmitterInfo.scala index 4c6bb5e0dd8..bc4cb900827 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/SubmitterInfo.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/SubmitterInfo.scala @@ -32,7 +32,7 @@ final case class SubmitterInfo( applicationId: Ref.ApplicationId, commandId: Ref.CommandId, deduplicationPeriod: DeduplicationPeriod, - submissionId: Ref.SubmissionId, + submissionId: Option[Ref.SubmissionId], ledgerConfiguration: Configuration, ) { @@ -44,7 +44,7 @@ final case class SubmitterInfo( applicationId, commandId, Some(deduplicationPeriod), - Some(submissionId), + submissionId, ) } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index 77cd8ad16b1..44d97cb18d8 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -206,7 +206,7 @@ private[sandbox] final class InMemoryLedger( Some(commandId), transactionId, Some(`appId`), - _, + submissionId, actAs, _, _, @@ -222,12 +222,13 @@ private[sandbox] final class InMemoryLedger( commandId, transactionId, appId, + submissionId, ) case ( offset, InMemoryLedgerEntry( - LedgerEntry.Rejection(recordTime, commandId, `appId`, _, actAs, reason) + LedgerEntry.Rejection(recordTime, commandId, `appId`, submissionId, actAs, reason) ), ) if actAs.exists(parties) => val status = reason.toParticipantStateRejectionReason.status @@ -237,6 +238,7 @@ private[sandbox] final class InMemoryLedger( commandId, status, appId, + submissionId, ) } } @@ -423,7 +425,7 @@ private[sandbox] final class InMemoryLedger( Some(submitterInfo.commandId), transactionId, Some(submitterInfo.applicationId), - Some(submitterInfo.submissionId), + submitterInfo.submissionId, submitterInfo.actAs, transactionMeta.workflowId, transactionMeta.ledgerEffectiveTime, diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala index a10fccb199b..89b192ba56e 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala @@ -88,7 +88,6 @@ class TransactionTimeModelComplianceIT ledger: Ledger, ledgerTime: Instant, commandId: String, - submissionId: String = UUID.randomUUID().toString, configuration: Configuration, ) = { val dummyTransaction = TransactionBuilder.EmptySubmitted @@ -98,7 +97,7 @@ class TransactionTimeModelComplianceIT applicationId = Ref.ApplicationId.assertFromString("appId"), commandId = Ref.CommandId.assertFromString(commandId + UUID.randomUUID().toString), deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(JDuration.ZERO), - submissionId = Ref.SubmissionId.assertFromString(submissionId), + submissionId = None, ledgerConfiguration = configuration, ) val transactionMeta = state.TransactionMeta( diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala index fdc13f83225..b3b1dfd7f7f 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala @@ -264,7 +264,7 @@ final class SqlLedgerSpec applicationId = applicationId, commandId = commandId1, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)), - submissionId = submissionId1, + submissionId = None, ledgerConfiguration = Configuration.reasonableInitialConfiguration, ), transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now), @@ -299,7 +299,7 @@ final class SqlLedgerSpec applicationId = applicationId, commandId = commandId1, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)), - submissionId = submissionId1, + submissionId = None, ledgerConfiguration = Configuration.reasonableInitialConfiguration, ), transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now), @@ -368,7 +368,7 @@ final class SqlLedgerSpec applicationId = applicationId, commandId = commandId1, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)), - submissionId = submissionId1, + submissionId = None, ledgerConfiguration = configuration, ), transactionMeta =