From 7cc698948c368f1c8be17374e1d8848350d20b8e Mon Sep 17 00:00:00 2001 From: nicu-da Date: Wed, 25 Aug 2021 05:58:03 -0700 Subject: [PATCH] Add multiple ways of specifying deduplication [KVL-1047] (#10601) CHANGELOG_BEGIN ledger-api - Command deduplication period can now be specified by setting `deduplication_offset` instead of `deduplication_time` (only valid for v2 WriteService). This change is backwards compatible. CHANGELOG_END * Propagate the enriched deduplicationPeriod instead of deduplication duration * Update the Haskell bindings for the new deduplication period * Calculate the deduplicateUntil using the new deduplication period for backward compat * Use consistent naming for deduplication_period * Cleanup command timeout extraction from deduplication period * Add the required deduplication_offset to deduplication instead of deduplication_start * Update haskell bindings to support deduplication_offset * Add support for deduplication_offset in the ledger-api * Remove the timestamp-based deduplication from our models to simplify upgrade for users * Add optional conformance test for offset based deduplication * Remove buf rule for FIELD_SAME_ONEOF as our change is backwards compatible * Disable FIELD_SAME_ONEOF buf check for commands file * Apply suggestions from code review Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com> Co-authored-by: Samir Talwar * Update comment for deduplication period Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com> Co-authored-by: Samir Talwar --- buf.yaml | 5 +- .../daml-helper/src/DA/Daml/Helper/Ledger.hs | 2 +- .../chat/src/DA/Ledger/App/Chat/ChatLedger.hs | 2 +- .../nim/src/DA/Ledger/App/Nim/NimLedger.hs | 2 +- .../hs/bindings/src/DA/Ledger/Convert.hs | 9 +- .../hs/bindings/src/DA/Ledger/Types.hs | 9 +- .../hs/bindings/test/DA/Ledger/Tests.hs | 2 +- .../binding/retrying/CommandRetryFlowUT.scala | 18 ++-- .../com/daml/ledger/api/v1/commands.proto | 19 +++- .../services/commands/CommandClient.scala | 23 +++-- .../commands/tracker/CommandTracker.scala | 27 +++++- .../commands/CommandTrackerFlowTest.scala | 6 +- ledger/ledger-api-common/BUILD.bazel | 1 + .../api/validation/CommandsValidator.scala | 8 +- .../grpc/GrpcCommandSubmissionService.scala | 8 +- .../api/validation/FieldValidations.scala | 61 +++++++----- .../SubmitRequestValidatorTest.scala | 25 +++-- .../GrpcCommandSubmissionServiceSpec.scala | 10 +- ledger/ledger-api-domain/BUILD.bazel | 18 ++++ .../ledger/api/DeduplicationPeriod.scala | 36 ++++++- .../com/digitalasset/ledger/api/domain.scala | 10 +- .../ledger/api/DeduplicationPeriodSpec.scala | 24 +++++ .../suites/CommandDeduplicationIT.scala | 29 ++++-- .../suites/CommandDeduplicationOffsetIT.scala | 96 +++++++++++++++++++ .../ledger/api/testtool/tests/Tests.scala | 1 + .../StoreBackedCommandExecutor.scala | 3 +- .../services/ApiSubmissionService.scala | 7 +- .../StoreBackedCommandExecutorSpec.scala | 3 +- .../services/ApiSubmissionServiceSpec.scala | 4 +- .../state/v2/AdaptedV1WriteService.scala | 9 +- .../daml/nonrepudiation/testing/package.scala | 3 +- 31 files changed, 366 insertions(+), 114 deletions(-) create mode 100644 ledger/ledger-api-domain/src/test/suite/scala/com/daml/ledger/api/DeduplicationPeriodSpec.scala create mode 100644 ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationOffsetIT.scala diff --git a/buf.yaml b/buf.yaml index 48f9ea3b50..8e08e7cebc 100644 --- a/buf.yaml +++ b/buf.yaml @@ -4,7 +4,7 @@ version: v1beta1 deps: - - buf.build/googleapis/googleapis + - buf.build/googleapis/googleapis build: roots: @@ -30,3 +30,6 @@ lint: breaking: use: - WIRE_JSON + ignore_only: + FIELD_SAME_ONEOF: # ignore field_same_oneof to not trigger an error from moving an existing field into a new oneof + - com/daml/ledger/api/v1/commands.proto diff --git a/daml-assistant/daml-helper/src/DA/Daml/Helper/Ledger.hs b/daml-assistant/daml-helper/src/DA/Daml/Helper/Ledger.hs index 4a634fd312..385d565b11 100644 --- a/daml-assistant/daml-helper/src/DA/Daml/Helper/Ledger.hs +++ b/daml-assistant/daml-helper/src/DA/Daml/Helper/Ledger.hs @@ -487,7 +487,7 @@ reset args = do , cid = L.CommandId $ TL.fromStrict $ UUID.toText cmdId , actAs = parties , readAs = [] - , dedupTime = Nothing + , dedupPeriod = Nothing , minLeTimeAbs = Nothing , minLeTimeRel = Nothing , sid = Nothing diff --git a/language-support/hs/bindings/examples/chat/src/DA/Ledger/App/Chat/ChatLedger.hs b/language-support/hs/bindings/examples/chat/src/DA/Ledger/App/Chat/ChatLedger.hs index 3a1e555259..b74709c6cf 100644 --- a/language-support/hs/bindings/examples/chat/src/DA/Ledger/App/Chat/ChatLedger.hs +++ b/language-support/hs/bindings/examples/chat/src/DA/Ledger/App/Chat/ChatLedger.hs @@ -65,7 +65,7 @@ getTrans party Handle{log,lid} = do submitCommand :: Handle -> Party -> Command -> IO (Either String ()) submitCommand Handle{lid} party com = do cid <- randomCid - run 5 $ Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid}) + run 5 $ Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid}) where wid = Nothing myAid = ApplicationId "chat-console" diff --git a/language-support/hs/bindings/examples/nim/src/DA/Ledger/App/Nim/NimLedger.hs b/language-support/hs/bindings/examples/nim/src/DA/Ledger/App/Nim/NimLedger.hs index 848a69751a..e81c9bd140 100644 --- a/language-support/hs/bindings/examples/nim/src/DA/Ledger/App/Nim/NimLedger.hs +++ b/language-support/hs/bindings/examples/nim/src/DA/Ledger/App/Nim/NimLedger.hs @@ -65,7 +65,7 @@ getTrans player Handle{log,lid} = do submitCommand :: Handle -> Party -> Command -> IO (Either String ()) submitCommand Handle{lid} party com = do cid <- randomCid - run 5 (Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid})) + run 5 (Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid})) where wid = Nothing myAid = ApplicationId "nim" diff --git a/language-support/hs/bindings/src/DA/Ledger/Convert.hs b/language-support/hs/bindings/src/DA/Ledger/Convert.hs index e001d30373..abc2672a47 100644 --- a/language-support/hs/bindings/src/DA/Ledger/Convert.hs +++ b/language-support/hs/bindings/src/DA/Ledger/Convert.hs @@ -74,11 +74,18 @@ lowerCommands = \case commandsActAs = Vector.fromList $ map unParty actAs, commandsReadAs = Vector.fromList $ map unParty readAs, commandsSubmissionId = unSubmissionId (fromMaybe (SubmissionId "") sid), - commandsDeduplicationTime = dedupTime, + commandsDeduplicationPeriod = fmap lowerDeduplicationPeriod dedupPeriod, commandsCommands = Vector.fromList $ map lowerCommand coms, commandsMinLedgerTimeAbs = fmap lowerTimestamp minLeTimeAbs, commandsMinLedgerTimeRel = minLeTimeRel } +lowerDeduplicationPeriod :: DeduplicationPeriod -> LL.CommandsDeduplicationPeriod +lowerDeduplicationPeriod = \case + DeduplicationTime t -> + LL.CommandsDeduplicationPeriodDeduplicationTime t + DeduplicationOffset o -> + LL.CommandsDeduplicationPeriodDeduplicationOffset (unAbsOffset o) + lowerCommand :: Command -> LL.Command lowerCommand = \case CreateCommand{..} -> diff --git a/language-support/hs/bindings/src/DA/Ledger/Types.hs b/language-support/hs/bindings/src/DA/Ledger/Types.hs index 66588ec5db..5426b0df68 100644 --- a/language-support/hs/bindings/src/DA/Ledger/Types.hs +++ b/language-support/hs/bindings/src/DA/Ledger/Types.hs @@ -55,7 +55,7 @@ module DA.Ledger.Types( -- High Level types for communication over Ledger API SubmissionId(..), LL.Duration(..), LL.Status(..), - + DeduplicationPeriod(..) ) where import qualified Data.Aeson as A @@ -77,7 +77,7 @@ data Commands = Commands , cid :: CommandId , actAs :: [Party] , readAs :: [Party] - , dedupTime :: Maybe LL.Duration + , dedupPeriod :: Maybe DeduplicationPeriod , coms :: [Command] , minLeTimeAbs :: Maybe Timestamp , minLeTimeRel :: Maybe LL.Duration @@ -103,6 +103,11 @@ data Command } deriving (Eq,Ord,Show) +data DeduplicationPeriod + = DeduplicationTime LL.Duration + | DeduplicationOffset AbsOffset + deriving (Eq, Ord, Show) + -- ledger_offset.proto data LedgerOffset = LedgerBegin | LedgerEnd | LedgerAbsOffset AbsOffset diff --git a/language-support/hs/bindings/test/DA/Ledger/Tests.hs b/language-support/hs/bindings/test/DA/Ledger/Tests.hs index 5bb7484678..42c33259a8 100644 --- a/language-support/hs/bindings/test/DA/Ledger/Tests.hs +++ b/language-support/hs/bindings/test/DA/Ledger/Tests.hs @@ -656,7 +656,7 @@ makeCommands :: LedgerId -> Party -> Command -> IO (CommandId,Commands) makeCommands lid party com = do cid <- liftIO randomCid let wid = Nothing - return $ (cid,) $ Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid=Nothing} + return $ (cid,) $ Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid=Nothing} myAid :: ApplicationId diff --git a/language-support/scala/bindings-akka/src/test/scala/com/digitalasset/ledger/client/binding/retrying/CommandRetryFlowUT.scala b/language-support/scala/bindings-akka/src/test/scala/com/digitalasset/ledger/client/binding/retrying/CommandRetryFlowUT.scala index b81508d82b..e8ab0fc720 100644 --- a/language-support/scala/bindings-akka/src/test/scala/com/digitalasset/ledger/client/binding/retrying/CommandRetryFlowUT.scala +++ b/language-support/scala/bindings-akka/src/test/scala/com/digitalasset/ledger/client/binding/retrying/CommandRetryFlowUT.scala @@ -19,7 +19,6 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ } import com.daml.ledger.client.testing.AkkaTest import com.daml.util.Ctx -import com.google.protobuf.duration.{Duration => protoDuration} import com.google.rpc.Code import com.google.rpc.status.Status import org.scalatest.Inside @@ -37,8 +36,13 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with val mockCommandSubmission: SubmissionFlowType[RetryInfo[Status]] = Flow[In[RetryInfo[Status]]] .map { - case Ctx(context @ RetryInfo(_, _, _, status), SubmitRequest(Some(commands)), _) => - if (commands.deduplicationTime.get.nanos == 0) { + case Ctx( + context @ RetryInfo(_, nrOfRetries, _, status), + SubmitRequest(Some(commands)), + _, + ) => + // Return a completion based on the input status code only on the first submission. + if (nrOfRetries == 0) { Ctx(context, CompletionResponse(Completion(commands.commandId, Some(status)))) } else { Ctx( @@ -58,10 +62,7 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with retryInfo: RetryInfo[Status], response: Either[CompletionFailure, CompletionSuccess], ) = { - val commands = retryInfo.request.commands.get - val dedupTime = commands.deduplicationTime.get - val newDedupTime = dedupTime.copy(nanos = dedupTime.nanos + 1) - SubmitRequest(Some(commands.copy(deduplicationTime = Some(newDedupTime)))) + SubmitRequest(retryInfo.request.commands) } val retryFlow: SubmissionFlowType[RetryInfo[Status]] = @@ -78,7 +79,6 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with "commandId", "party", Seq.empty, - Some(protoDuration.of(120, 0)), ) ) ) @@ -92,7 +92,7 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with .runWith(Sink.seq) } - CommandRetryFlow.getClass.getSimpleName should { + "command retry flow" should { "propagate OK status" in { submitRequest(Code.OK_VALUE, Instant.ofEpochSecond(45)) map { result => diff --git a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto index 897b0d2979..f626a7bf99 100644 --- a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto +++ b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto @@ -57,11 +57,20 @@ message Commands { // Required repeated Command commands = 8; - // The length of the time window during which all commands with the same ``act_as`` parties and - // the same command ID will be deduplicated. - // Duplicate commands submitted before the end of this window return an ALREADY_EXISTS error. - // Optional - google.protobuf.Duration deduplication_time = 9; + oneof deduplication_period { + + // Specifies the length of the deduplication period. + // It is interpreted relative to the local clock at some point during the submission's processing. + // + // Must be non-negative. + google.protobuf.Duration deduplication_time = 9; + + // Specifies the start of the deduplication period by a completion stream offset. + // + // Must be a valid LedgerString (as described in ``value.proto``). + string deduplication_offset = 15; + + } // Lower bound for the ledger time assigned to the resulting transaction. // Note: The ledger time of a transaction is assigned as part of command interpretation. diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala index 9d16d15119..8d7504d1ec 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala @@ -17,6 +17,7 @@ import com.daml.ledger.api.v1.command_completion_service.{ } import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionServiceStub import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.api.validation.CommandsValidator import com.daml.ledger.client.LedgerClient @@ -180,16 +181,18 @@ final class CommandClient( throw new IllegalArgumentException( s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)" ) - val updateDedupTime = commands.deduplicationTime.orElse( - Some( - Duration - .of( - config.defaultDeduplicationTime.getSeconds, - config.defaultDeduplicationTime.getNano, - ) - ) - ) - r.copy(commands = Some(commands.copy(deduplicationTime = updateDedupTime))) + val updatedDeduplicationPeriod = commands.deduplicationPeriod match { + case DeduplicationPeriod.Empty => + DeduplicationPeriod.DeduplicationTime( + Duration + .of( + config.defaultDeduplicationTime.getSeconds, + config.defaultDeduplicationTime.getNano, + ) + ) + case existing => existing + } + r.copy(commands = Some(commands.copy(deduplicationPeriod = updatedDeduplicationPeriod))) }) def submissionFlow[Context]( diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala index 26cafbcacc..664a0f92a0 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala @@ -9,6 +9,7 @@ import akka.stream.stage._ import akka.stream.{Attributes, Inlet, Outlet} import com.daml.grpc.{GrpcException, GrpcStatus} import com.daml.ledger.api.v1.command_submission_service._ +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ @@ -233,12 +234,11 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti }, ) case Some(maxDedup) => - val deduplicationDuration = commands.deduplicationTime - .map(d => JDuration.ofSeconds(d.seconds, d.nanos.toLong)) - .getOrElse(maxDedup) + val commandTimeout = + maxTimeoutFromDeduplicationPeriod(commands.deduplicationPeriod, maxDedup) val trackingData = TrackingData( commandId = commandId, - commandTimeout = Instant.now().plus(deduplicationDuration), + commandTimeout = commandTimeout, context = submitRequest.context, ) pendingCommands += commandId -> trackingData @@ -318,6 +318,25 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti logic -> promise.future } + private def maxTimeoutFromDeduplicationPeriod( + deduplicationPeriod: DeduplicationPeriod, + maxDeduplicationDuration: JDuration, + ) = { + val timeoutDuration = deduplicationPeriod match { + case DeduplicationPeriod.Empty => + maxDeduplicationDuration + case DeduplicationPeriod.DeduplicationTime(duration) => + JDuration.ofSeconds(duration.seconds, duration.nanos.toLong) + case DeduplicationPeriod.DeduplicationOffset( + _ + ) => //no way of extracting the duration from here, will be removed soon + maxDeduplicationDuration + } + Instant + .now() + .plus(timeoutDuration) + } + override def shape: CommandTrackerShape[Context] = CommandTrackerShape(submitRequestIn, submitRequestOut, commandResultIn, resultOut, offsetOut) diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala index 6c39dd1306..e6aeb35578 100644 --- a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala @@ -29,6 +29,7 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ NotOkResponse, } import com.daml.util.Ctx +import com.google.protobuf.duration.{Duration => DurationProto} import com.google.protobuf.empty.Empty import com.google.protobuf.timestamp.Timestamp import com.google.rpc.code._ @@ -80,8 +81,9 @@ class CommandTrackerFlowTest Some( Commands( commandId = commandId, - deduplicationTime = - dedupTime.map(t => com.google.protobuf.duration.Duration(t.getSeconds)), + deduplicationPeriod = dedupTime + .map(t => Commands.DeduplicationPeriod.DeduplicationTime(DurationProto(t.getSeconds))) + .getOrElse(Commands.DeduplicationPeriod.Empty), ) ) ), diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index b4be2e59ce..b3dd912b1a 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -36,6 +36,7 @@ da_scala_library( "//ledger/ledger-api-akka", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", + "//ledger/ledger-offset", "//ledger/ledger-resources", "//ledger/metrics", "//libs-scala/concurrent", diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala index 9e15baf1d6..dedefa3c12 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala @@ -59,10 +59,10 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss s"Can not represent command ledger time $ledgerEffectiveTime as a Daml timestamp" ) ) - deduplicationDuration <- validateDeduplicationDuration( - commands.deduplicationTime, + deduplicationPeriod <- validateDeduplicationPeriod( + commands.deduplicationPeriod, maxDeduplicationTime, - "deduplication_time", + "deduplication_period", ) } yield domain.Commands( ledgerId = ledgerId, @@ -73,7 +73,7 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss actAs = submitters.actAs, readAs = submitters.readAs, submittedAt = currentUtcTime, - deduplicationDuration = deduplicationDuration, + deduplicationPeriod = deduplicationPeriod, commands = Commands( commands = ImmArray(validatedCommands), ledgerEffectiveTime = ledgerEffectiveTimestamp, diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala index c6d7de0844..fd06ea70ae 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala @@ -60,8 +60,12 @@ class GrpcCommandSubmissionService( Timed .value( metrics.daml.commands.validation, - validator - .validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime()), + validator.validate( + request, + currentLedgerTime(), + currentUtcTime(), + maxDeduplicationTime(), + ), ) .fold( t => Future.failed(ValidationLogger.logFailure(request, t)), diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala index 5f06cfd48e..95a1586d9a 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala @@ -5,11 +5,14 @@ package com.daml.platform.server.api.validation import java.time.Duration -import com.daml.lf.data.Ref -import com.daml.lf.value.Value.ContractId +import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto} import com.daml.ledger.api.v1.value.Identifier +import com.daml.ledger.offset.Offset +import com.daml.lf.data.Ref import com.daml.lf.data.Ref.Party +import com.daml.lf.value.Value.ContractId import com.daml.platform.server.api.validation.ErrorFactories._ import io.grpc.StatusRuntimeException @@ -99,32 +102,42 @@ trait FieldValidations { def requirePresence[T](option: Option[T], fieldName: String): Either[StatusRuntimeException, T] = option.fold[Either[StatusRuntimeException, T]](Left(missingField(fieldName)))(Right(_)) - def validateDeduplicationDuration( - durationO: Option[com.google.protobuf.duration.Duration], + /** We validate only using current time because we set the currentTime as submitTime so no need to check both + */ + def validateDeduplicationPeriod( + deduplicationPeriod: DeduplicationPeriodProto, maxDeduplicationTimeO: Option[Duration], fieldName: String, - ): Either[StatusRuntimeException, Duration] = - maxDeduplicationTimeO.fold[Either[StatusRuntimeException, Duration]]( + ): Either[StatusRuntimeException, DeduplicationPeriod] = { + + maxDeduplicationTimeO.fold[Either[StatusRuntimeException, DeduplicationPeriod]]( Left(missingLedgerConfig()) - )(maxDeduplicationTime => - durationO match { - case None => - Right(maxDeduplicationTime) - case Some(duration) => - val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong) - if (result.isNegative) - Left(invalidField(fieldName, "Duration must be positive")) - else if (result.compareTo(maxDeduplicationTime) > 0) - Left( - invalidField( - fieldName, - s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime", - ) - ) - else - Right(result) + )(maxDeduplicationDuration => { + def validateDuration(duration: Duration, exceedsMaxDurationMessage: String) = { + if (duration.isNegative) + Left(invalidField(fieldName, "Duration must be positive")) + else if (duration.compareTo(maxDeduplicationDuration) > 0) + Left(invalidField(fieldName, exceedsMaxDurationMessage)) + else Right(duration) } - ) + deduplicationPeriod match { + case DeduplicationPeriodProto.Empty => + Right(DeduplicationPeriod.DeduplicationDuration(maxDeduplicationDuration)) + case DeduplicationPeriodProto.DeduplicationTime(duration) => + val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong) + validateDuration( + result, + s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration", + ).map(DeduplicationPeriod.DeduplicationDuration) + case DeduplicationPeriodProto.DeduplicationOffset(offset) => + Right( + DeduplicationPeriod.DeduplicationOffset( + Offset.fromHexString(Ref.HexString.assertFromString(offset)) + ) + ) + } + }) + } def validateIdentifier(identifier: Identifier): Either[StatusRuntimeException, Ref.Identifier] = for { diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala index 8453680ccf..bd6ae20935 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala @@ -7,12 +7,13 @@ import java.time.Instant import java.util.UUID import com.daml.api.util.{DurationConversion, TimestampConversion} -import com.daml.ledger.api.{DomainMocks, SubmissionIdGenerator} import com.daml.ledger.api.DomainMocks.{applicationId, commandId, submissionId, workflowId} import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands} +import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto} import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand} import com.daml.ledger.api.v1.value.Value.Sum import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _} +import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks, SubmissionIdGenerator} import com.daml.lf.command.{Commands => LfCommands, CreateCommand => LfCreateCommand} import com.daml.lf.data._ import com.daml.lf.value.Value.ValueRecord @@ -59,7 +60,7 @@ class SubmitRequestValidatorTest commandId = commandId.unwrap, party = submitter, commands = Seq(command), - deduplicationTime = Some(deduplicationTime), + deduplicationPeriod = DeduplicationPeriodProto.DeduplicationTime(deduplicationTime), minLedgerTimeAbs = None, minLedgerTimeRel = None, ) @@ -84,7 +85,7 @@ class SubmitRequestValidatorTest actAs = Set(DomainMocks.party), readAs = Set.empty, submittedAt = submittedAt, - deduplicationDuration = deduplicationDuration, + deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(deduplicationDuration), commands = LfCommands( ImmArray( LfCreateCommand( @@ -297,13 +298,15 @@ class SubmitRequestValidatorTest val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) requestMustFailWith( commandsValidator.validateCommands( - api.commands.copy(deduplicationTime = Some(Duration.of(-1, 0))), + api.commands.copy(deduplicationPeriod = + DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0)) + ), internal.ledgerTime, internal.submittedAt, Some(internal.maxDeduplicationTime), ), INVALID_ARGUMENT, - "Invalid field deduplication_time: Duration must be positive", + "Invalid field deduplication_period: Duration must be positive", ) } @@ -312,13 +315,16 @@ class SubmitRequestValidatorTest val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) requestMustFailWith( commandsValidator.validateCommands( - api.commands.copy(deduplicationTime = Some(Duration.of(manySeconds, 0))), + api.commands + .copy(deduplicationPeriod = + DeduplicationPeriodProto.DeduplicationTime(Duration.of(manySeconds, 0)) + ), internal.ledgerTime, internal.submittedAt, Some(internal.maxDeduplicationTime), ), INVALID_ARGUMENT, - s"Invalid field deduplication_time: The given deduplication time of ${java.time.Duration + s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration .ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}", ) } @@ -327,13 +333,14 @@ class SubmitRequestValidatorTest val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId) commandsValidator.validateCommands( - api.commands.copy(deduplicationTime = None), + api.commands.copy(deduplicationPeriod = DeduplicationPeriodProto.Empty), internal.ledgerTime, internal.submittedAt, Some(internal.maxDeduplicationTime), ) shouldEqual Right( internal.emptyCommands.copy( - deduplicationDuration = internal.maxDeduplicationTime + deduplicationPeriod = + DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationTime) ) ) } diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala index f679f8bb88..6aa14c4691 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala @@ -8,15 +8,7 @@ import java.time.{Duration, Instant} import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.messages.command.submission.SubmitRequest -import com.daml.ledger.api.testing.utils.MockMessages.{ - applicationId, - commandId, - commands, - ledgerId, - party, - submitRequest, - workflowId, -} +import com.daml.ledger.api.testing.utils.MockMessages._ import com.daml.ledger.api.v1.commands.{Command, CreateCommand} import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} import com.daml.lf.data.Ref diff --git a/ledger/ledger-api-domain/BUILD.bazel b/ledger/ledger-api-domain/BUILD.bazel index b61f0f19c4..f3cfa93888 100644 --- a/ledger/ledger-api-domain/BUILD.bazel +++ b/ledger/ledger-api-domain/BUILD.bazel @@ -4,6 +4,7 @@ load( "//bazel_tools:scala.bzl", "da_scala_library", + "da_scala_test_suite", ) da_scala_library( @@ -45,3 +46,20 @@ da_scala_library( "//daml-lf/data", ], ) + +da_scala_test_suite( + name = "ledger-api-domain-tests", + size = "small", + srcs = glob(["src/test/suite/scala/**/*.scala"]), + scala_deps = [ + "@maven//:org_scalatest_scalatest", + "@maven//:org_scalaz_scalaz_core", + "@maven//:org_scala_lang_modules_scala_collection_compat", + "@maven//:org_scala_lang_modules_scala_java8_compat", + ], + deps = [ + ":ledger-api-domain", + ":ledger-api-domain-tests-lib", + "//daml-lf/data", + ], +) diff --git a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/DeduplicationPeriod.scala b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/DeduplicationPeriod.scala index 8b85c9ab67..36b1b6ee4c 100644 --- a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/DeduplicationPeriod.scala +++ b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/DeduplicationPeriod.scala @@ -3,7 +3,7 @@ package com.daml.ledger.api -import java.time.Duration +import java.time.{Duration, Instant} import com.daml.ledger.offset.Offset import com.daml.logging.entries.{LoggingValue, ToLoggingValue} @@ -18,6 +18,40 @@ sealed trait DeduplicationPeriod extends Product with Serializable object DeduplicationPeriod { + /** Transforms the [[period]] into an [[Instant]] to be used for deduplication into the future(deduplicateUntil). + * Only used for backwards compatibility + * @param time The time to use for calculating the [[Instant]]. It can either be submission time or current time, based on usage + * @param period The deduplication period + */ + def deduplicateUntil( + time: Instant, + period: DeduplicationPeriod, + ): Instant = period match { + case DeduplicationDuration(duration) => + time.plus(duration) + case DeduplicationOffset(_) => + throw new NotImplementedError("Offset deduplication is not supported") + } + + /** Computes deduplication duration as the duration `time + minSkew - deduplicationStart`. + * We measure `deduplicationStart` on the ledger’s clock, and thus + * we need to add the minSkew to compensate for the maximal skew that the participant might be behind the ledger’s clock. + * @param time submission time or current time + * @param deduplicationStart the [[Instant]] from where we should start deduplication, must be < than time + * @param minSkew the minimum skew as specified by the current ledger time model + */ + def deduplicationDurationFromTime( + time: Instant, + deduplicationStart: Instant, + minSkew: Duration, + ): Duration = { + assert(deduplicationStart.isBefore(time), "Deduplication must start in the past") + Duration.between( + deduplicationStart, + time.plus(minSkew), + ) + } + /** The length of the deduplication window, which ends when the [[WriteService]] or underlying Daml ledger processes * the command submission. * diff --git a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala index 422fbc5d6f..8038211220 100644 --- a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala +++ b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala @@ -3,7 +3,7 @@ package com.daml.ledger.api -import java.time.{Duration, Instant} +import java.time.Instant import com.daml.ledger.api.domain.Event.{CreateOrArchiveEvent, CreateOrExerciseEvent} import com.daml.ledger.configuration.Configuration @@ -283,11 +283,9 @@ object domain { actAs: Set[Ref.Party], readAs: Set[Ref.Party], submittedAt: Instant, - deduplicationDuration: Duration, + deduplicationPeriod: DeduplicationPeriod, commands: LfCommands, - ) { - lazy val deduplicateUntil: Instant = submittedAt.plus(deduplicationDuration) - } + ) object Commands { @@ -302,7 +300,7 @@ object domain { "actAs" -> commands.actAs, "readAs" -> commands.readAs, "submittedAt" -> commands.submittedAt, - "deduplicationDuration" -> commands.deduplicationDuration, + "deduplicationPeriod" -> commands.deduplicationPeriod, ) } diff --git a/ledger/ledger-api-domain/src/test/suite/scala/com/daml/ledger/api/DeduplicationPeriodSpec.scala b/ledger/ledger-api-domain/src/test/suite/scala/com/daml/ledger/api/DeduplicationPeriodSpec.scala new file mode 100644 index 0000000000..e11eef5909 --- /dev/null +++ b/ledger/ledger-api-domain/src/test/suite/scala/com/daml/ledger/api/DeduplicationPeriodSpec.scala @@ -0,0 +1,24 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api + +import java.time.{Duration, Instant} + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class DeduplicationPeriodSpec extends AnyWordSpec with Matchers { + "calculating deduplication until" should { + val time = Instant.ofEpochSecond(100) + + "return expected result when sending duration" in { + val deduplicateUntil = DeduplicationPeriod.deduplicateUntil( + time, + DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(3)), + ) + deduplicateUntil shouldEqual time.plusSeconds(3) + } + + } +} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index 46801ca33c..3eef3e5def 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -7,14 +7,17 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions._ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.client.binding.Primitive import com.daml.ledger.test.model.DA.Types.Tuple2 import com.daml.ledger.test.model.Test.TextKeyOperations._ import com.daml.ledger.test.model.Test._ import com.daml.timer.Delayed import io.grpc.Status -import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration) @@ -31,18 +34,29 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva "Deduplicate commands within the deduplication time window", allocate(SingleParty), )(implicit ec => { case Participants(Participant(ledger, party)) => - val requestA1 = ledger + requestsAreSubmittedAndDeduplicated( + ledger, + party, + DeduplicationPeriod.DeduplicationTime(deduplicationTime.asProtobuf), + ) + }) + + private def requestsAreSubmittedAndDeduplicated( + ledger: ParticipantTestContext, + party: Primitive.Party, + deduplicationPeriod: => DeduplicationPeriod, + )(implicit ec: ExecutionContext) = { + lazy val requestA1 = ledger .submitRequest(party, DummyWithAnnotation(party, "First submission").create.command) .update( - _.commands.deduplicationTime := deduplicationTime.asProtobuf + _.commands.deduplicationPeriod := deduplicationPeriod ) - val requestA2 = ledger + lazy val requestA2 = ledger .submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command) .update( - _.commands.deduplicationTime := deduplicationTime.asProtobuf, + _.commands.deduplicationPeriod := deduplicationPeriod, _.commands.commandId := requestA1.commands.get.commandId, ) - for { // Submit command A (first deduplication window) // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, @@ -53,7 +67,6 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva .submit(requestA1) .mustFail("submitting the first request for the second time") completions1 <- ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd1)(party)) - // Wait until the end of first deduplication window _ <- Delayed.by(deduplicationWindowWait)(()) @@ -96,7 +109,7 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva s"There should be 2 active contracts, but received $activeContracts", ) } - }) + } test( "CDStopOnSubmissionFailure", diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationOffsetIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationOffsetIT.scala new file mode 100644 index 0000000000..12547282c3 --- /dev/null +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationOffsetIT.scala @@ -0,0 +1,96 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.testtool.suites + +import com.daml.ledger.api.testtool.infrastructure.Allocation._ +import com.daml.ledger.api.testtool.infrastructure.Assertions._ +import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.test.model.Test._ +import com.google.rpc.code.Code + +final class CommandDeduplicationOffsetIT extends LedgerTestSuite { + + test( + "CDOffsetDeduplication", + "Deduplicate commands based on offset", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + for { + ledgerEnd1 <- ledger.currentEnd() + requestA1 = ledger + .submitRequest(party, DummyWithAnnotation(party, "First submission").create.command) + .update( + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset( + ledgerEnd1.getAbsolute + ) + ) + _ <- ledger.submit(requestA1) + completionsA1First <- ledger.firstCompletions( + ledger.completionStreamRequest(ledgerEnd1)(party) + ) + ledgerEndAfterFirstSubmit <- ledger.currentEnd() + _ <- ledger.submit(requestA1) + completionsA1Second <- ledger.firstCompletions( + ledger.completionStreamRequest(ledgerEndAfterFirstSubmit)(party) + ) // This will be deduplicated as the deduplication offset includes the first submitted command. + ledgerEnd2 <- ledger.currentEnd() + requestA2 = ledger + .submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command) + .update( + _.commands.deduplicationPeriod := DeduplicationPeriod + .DeduplicationOffset(ledgerEnd2.getAbsolute), + _.commands.commandId := requestA1.commands.get.commandId, + ) + _ <- ledger.submit( + requestA2 + ) // the deduplication offset is moved to the last completion so this will be successful + completionsA2First <- ledger.firstCompletions( + ledger.completionStreamRequest(ledgerEnd2)(party) + ) + activeContracts <- ledger.activeContracts(party) + } yield { + assert(ledgerEnd1 != ledgerEnd2) + val completionCommandId1 = + assertSingleton("Expected only one first completion", completionsA1First) + assert( + completionCommandId1.commandId == requestA1.commands.get.commandId, + "The command ID of the first completion does not match the command ID of the first submission", + ) + assert( + completionCommandId1.status.get.code == Code.OK.value, + s"First command did not complete OK. Had status ${completionCommandId1.status}", + ) + val failureCommandId1 = + assertSingleton("Expected only one first failure", completionsA1Second) + assert( + failureCommandId1.commandId == requestA1.commands.get.commandId, + "The command ID of the second completion does not match the command ID of the first submission", + ) + assert( + failureCommandId1.status.get.code == Code.ALREADY_EXISTS.value, + s"Second completion for the first submission was not deduplicated ${failureCommandId1.status}", + ) + + val completionCommandId2 = + assertSingleton( + "Expected only one second successful completion", + completionsA2First, + ) + + assert( + completionCommandId2.commandId == requestA2.commands.get.commandId, + "The command ID of the second completion does not match the command ID of the second submission", + ) + assert( + completionCommandId2.status.get.code == Code.OK.value, + s"Second command did not complete OK. Had status ${completionCommandId1.status}", + ) + assert( + activeContracts.size == 2, + s"There should be 2 active contracts, but received ${activeContracts.size} contracts, with events: $activeContracts", + ) + } + }) +} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala index c5566352cb..14ade2642e 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala @@ -62,6 +62,7 @@ object Tests { Vector( new ParticipantPruningIT, new MultiPartySubmissionIT, + new CommandDeduplicationOffsetIT, ) val retired: Vector[LedgerTestSuite] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala index e33490607e..e90f286fca 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/StoreBackedCommandExecutor.scala @@ -6,7 +6,6 @@ package com.daml.platform.apiserver.execution import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong -import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.api.domain.{Commands => ApiCommands} import com.daml.ledger.configuration.Configuration import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService} @@ -80,7 +79,7 @@ private[apiserver] final class StoreBackedCommandExecutor( commands.actAs.toList, commands.applicationId.unwrap, commands.commandId.unwrap, - DeduplicationPeriod.DeduplicationDuration(commands.deduplicationDuration), + commands.deduplicationPeriod, commands.submissionId.unwrap, ledgerConfiguration, ), diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 6816c9798d..5496f60e52 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -7,7 +7,7 @@ import java.time.{Duration, Instant} import java.util.UUID import com.daml.api.util.TimeProvider -import com.daml.ledger.api.SubmissionIdGenerator +import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator} import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.configuration.Configuration @@ -140,7 +140,10 @@ private[apiserver] final class ApiSubmissionService private[services] ( commands.commandId, commands.actAs.toList, commands.submittedAt, - commands.deduplicateUntil, + DeduplicationPeriod.deduplicateUntil( + commands.submittedAt, + commands.deduplicationPeriod, + ), ) .flatMap { case CommandDeduplicationNew => diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala index 079d5dacbe..463279036e 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/execution/StoreBackedCommandExecutorSpec.scala @@ -6,6 +6,7 @@ package com.daml.platform.apiserver.execution import java.time.{Duration, Instant} import com.codahale.metrics.MetricRegistry +import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId, SubmissionId} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService} @@ -63,7 +64,7 @@ class StoreBackedCommandExecutorSpec actAs = Set.empty, readAs = Set.empty, submittedAt = Instant.EPOCH, - deduplicationDuration = Duration.ZERO, + deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO), commands = LfCommands( commands = ImmArray.empty, ledgerEffectiveTime = Time.Timestamp.Epoch, diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index e00f0912b4..0447ce0186 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -9,7 +9,7 @@ import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.atomic.AtomicInteger import com.codahale.metrics.MetricRegistry -import com.daml.ledger.api.DomainMocks +import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks} import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -418,7 +418,7 @@ object ApiSubmissionServiceSpec { actAs = Set.empty, readAs = Set.empty, submittedAt = Instant.MIN, - deduplicationDuration = Duration.ZERO, + deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO), commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""), ) ) diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1WriteService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1WriteService.scala index 9eaa272bf1..18dc7ef30d 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1WriteService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1WriteService.scala @@ -89,11 +89,10 @@ private[v2] object AdaptedV1WriteService { private val NoErrorDetails = Seq.empty[com.google.protobuf.any.Any] def adaptSubmitterInfo(submitterInfo: SubmitterInfo): v1.SubmitterInfo = { - val deduplicateUntil = submitterInfo.deduplicationPeriod match { - case DeduplicationPeriod.DeduplicationDuration(duration) => Instant.now().plus(duration) - case DeduplicationPeriod.DeduplicationOffset(_) => - throw new NotImplementedError("Deduplication offset not supported as deduplication period") - } + val deduplicateUntil = DeduplicationPeriod.deduplicateUntil( + Instant.now(), + submitterInfo.deduplicationPeriod, + ) v1.SubmitterInfo( actAs = submitterInfo.actAs, applicationId = submitterInfo.applicationId, diff --git a/runtime-components/non-repudiation-testing/src/main/scala/com/daml/nonrepudiation/testing/package.scala b/runtime-components/non-repudiation-testing/src/main/scala/com/daml/nonrepudiation/testing/package.scala index a82173233a..de4f6ca436 100644 --- a/runtime-components/non-repudiation-testing/src/main/scala/com/daml/nonrepudiation/testing/package.scala +++ b/runtime-components/non-repudiation-testing/src/main/scala/com/daml/nonrepudiation/testing/package.scala @@ -8,6 +8,7 @@ import java.security.cert.X509Certificate import java.util.UUID import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod.DeduplicationTime import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand} import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} import com.google.protobuf.duration.Duration @@ -66,7 +67,7 @@ package object testing { ) ) ), - deduplicationTime = Some(Duration(seconds = 1.day.toSeconds)), + deduplicationPeriod = DeduplicationTime(Duration(seconds = 1.day.toSeconds)), minLedgerTimeRel = Some(Duration(seconds = 1.minute.toSeconds)), ) )