From eabb19d7f74f0016396f6fa821b59570345f07b4 Mon Sep 17 00:00:00 2001 From: nicu-da Date: Thu, 26 Aug 2021 02:33:01 -0700 Subject: [PATCH] [ledger-api] Add deduplication_duration to deduplication period [kvl-1047] (#10676) * Add `deduplication_duration` to `deduplication_period` and deprecate `deduplication_time` CHANGELOG_BEGIN ledger-api - add `deduplication_duration` as a future replacement for `deduplication_time` in the command proto definition CHANGELOG_END * Add tests for deduplication period validation --- .../com/daml/ledger/api/v1/commands.proto | 10 +- .../commands/tracker/CommandTracker.scala | 2 + ledger/ledger-api-common/BUILD.bazel | 2 + .../api/validation/FieldValidations.scala | 18 ++- .../SubmitRequestValidatorTest.scala | 143 ++++++++++++------ .../suites/CommandDeduplicationIT.scala | 27 ++-- 6 files changed, 130 insertions(+), 72 deletions(-) diff --git a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto index f626a7bf99..ab61cb0b95 100644 --- a/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto +++ b/ledger-api/grpc-definitions/com/daml/ledger/api/v1/commands.proto @@ -59,10 +59,9 @@ message Commands { oneof deduplication_period { - // Specifies the length of the deduplication period. - // It is interpreted relative to the local clock at some point during the submission's processing. + // Same semantics as deduplication_duration // - // Must be non-negative. + // DEPRECATED google.protobuf.Duration deduplication_time = 9; // Specifies the start of the deduplication period by a completion stream offset. @@ -70,6 +69,11 @@ message Commands { // Must be a valid LedgerString (as described in ``value.proto``). string deduplication_offset = 15; + // Specifies the length of the deduplication period. + // It is interpreted relative to the local clock at some point during the submission's processing. + // + // Must be non-negative. + google.protobuf.Duration deduplication_duration = 16; } // Lower bound for the ledger time assigned to the resulting transaction. diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala index 664a0f92a0..bc15ccaa8a 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala @@ -327,6 +327,8 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti maxDeduplicationDuration case DeduplicationPeriod.DeduplicationTime(duration) => JDuration.ofSeconds(duration.seconds, duration.nanos.toLong) + case DeduplicationPeriod.DeduplicationDuration(duration) => + JDuration.ofSeconds(duration.seconds, duration.nanos.toLong) case DeduplicationPeriod.DeduplicationOffset( _ ) => //no way of extracting the duration from here, will be removed soon diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index b3dd912b1a..813775c79a 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -118,8 +118,10 @@ da_scala_test_suite( "//ledger/ledger-api-client", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", + "//ledger/ledger-offset", "//ledger/metrics", "//ledger/metrics:metrics-test-lib", + "//ledger/participant-state/kvutils", "//libs-scala/concurrent", "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala index 95a1586d9a..95b4f1e310 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/FieldValidations.scala @@ -14,6 +14,7 @@ import com.daml.lf.data.Ref import com.daml.lf.data.Ref.Party import com.daml.lf.value.Value.ContractId import com.daml.platform.server.api.validation.ErrorFactories._ +import com.google.protobuf.duration.{Duration => DurationProto} import io.grpc.StatusRuntimeException import scala.util.Try @@ -120,21 +121,28 @@ trait FieldValidations { Left(invalidField(fieldName, exceedsMaxDurationMessage)) else Right(duration) } + + def protoDurationToDurationPeriod(duration: DurationProto) = { + val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong) + validateDuration( + result, + s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration", + ).map(DeduplicationPeriod.DeduplicationDuration) + } + deduplicationPeriod match { case DeduplicationPeriodProto.Empty => Right(DeduplicationPeriod.DeduplicationDuration(maxDeduplicationDuration)) case DeduplicationPeriodProto.DeduplicationTime(duration) => - val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong) - validateDuration( - result, - s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration", - ).map(DeduplicationPeriod.DeduplicationDuration) + protoDurationToDurationPeriod(duration) case DeduplicationPeriodProto.DeduplicationOffset(offset) => Right( DeduplicationPeriod.DeduplicationOffset( Offset.fromHexString(Ref.HexString.assertFromString(offset)) ) ) + case DeduplicationPeriodProto.DeduplicationDuration(duration) => + protoDurationToDurationPeriod(duration) } }) } diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala index bd6ae20935..7b5d227861 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/ledger/api/validation/SubmitRequestValidatorTest.scala @@ -3,7 +3,7 @@ package com.daml.ledger.api.validation -import java.time.Instant +import java.time.{Instant, Duration => JDuration} import java.util.UUID import com.daml.api.util.{DurationConversion, TimestampConversion} @@ -14,6 +14,7 @@ import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand} import com.daml.ledger.api.v1.value.Value.Sum import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _} import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks, SubmissionIdGenerator} +import com.daml.ledger.participant.state.kvutils.OffsetBuilder import com.daml.lf.command.{Commands => LfCommands, CreateCommand => LfCreateCommand} import com.daml.lf.data._ import com.daml.lf.value.Value.ValueRecord @@ -24,7 +25,6 @@ import io.grpc.Status.Code.{INVALID_ARGUMENT, UNAVAILABLE} import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.wordspec.AnyWordSpec import scalaz.syntax.tag._ - class SubmitRequestValidatorTest extends AnyWordSpec with ValidatorTestUtils @@ -70,7 +70,7 @@ class SubmitRequestValidatorTest val ledgerTime = Instant.EPOCH.plusSeconds(10) val submittedAt = Instant.now val timeDelta = java.time.Duration.ofSeconds(1) - val maxDeduplicationTime = java.time.Duration.ofDays(1) + val maxDeduplicationDuration = java.time.Duration.ofDays(1) val deduplicationDuration = java.time.Duration.ofSeconds( api.deduplicationTime.seconds, api.deduplicationTime.nanos.toLong, @@ -139,7 +139,7 @@ class SubmitRequestValidatorTest api.commands.withCommands(Seq.empty), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ), INVALID_ARGUMENT, "Missing field: commands", @@ -154,7 +154,7 @@ class SubmitRequestValidatorTest api.commands.withLedgerId(""), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ), INVALID_ARGUMENT, "Missing field: ledger_id", @@ -168,7 +168,7 @@ class SubmitRequestValidatorTest api.commands.withWorkflowId(""), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right( internal.emptyCommands.copy( workflowId = None, @@ -184,7 +184,7 @@ class SubmitRequestValidatorTest api.commands.withApplicationId(""), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ), INVALID_ARGUMENT, "Missing field: application_id", @@ -198,7 +198,7 @@ class SubmitRequestValidatorTest api.commands.withCommandId(""), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ), INVALID_ARGUMENT, "Missing field: command_id", @@ -213,7 +213,7 @@ class SubmitRequestValidatorTest api.commands.withParty(""), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ), INVALID_ARGUMENT, """Missing field: party or act_as""", @@ -232,7 +232,7 @@ class SubmitRequestValidatorTest .addReadAs("bob"), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) inside(result) { case Right(cmd) => // actAs parties are gathered from "party" and "readAs" fields @@ -250,7 +250,7 @@ class SubmitRequestValidatorTest api.commands.withParty("").addActAs(api.submitter), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right(internal.emptyCommands) } @@ -262,7 +262,7 @@ class SubmitRequestValidatorTest api.commands.withParty(api.submitter).addActAs(api.submitter).addReadAs(api.submitter), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right(internal.emptyCommands) } @@ -276,7 +276,7 @@ class SubmitRequestValidatorTest ), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs)) } @@ -290,57 +290,108 @@ class SubmitRequestValidatorTest ), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs)) } - "not allow negative deduplication time" in { - val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) - requestMustFailWith( - commandsValidator.validateCommands( - api.commands.copy(deduplicationPeriod = - DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0)) + "transform valid deduplication into correct internal structure" in { + val deduplicationDuration = Duration.of(10, 0) + val offset = OffsetBuilder.fromLong(0) + forAll( + Table[DeduplicationPeriodProto, DeduplicationPeriod]( + ("input proto deduplication", "valid model deduplication"), + DeduplicationPeriodProto.DeduplicationTime(deduplicationDuration) -> DeduplicationPeriod + .DeduplicationDuration(JDuration.ofSeconds(10)), + DeduplicationPeriodProto.DeduplicationDuration( + deduplicationDuration + ) -> DeduplicationPeriod + .DeduplicationDuration(JDuration.ofSeconds(10)), + DeduplicationPeriodProto.DeduplicationOffset(offset.toHexString) -> DeduplicationPeriod + .DeduplicationOffset(offset), + DeduplicationPeriodProto.Empty -> DeduplicationPeriod.DeduplicationDuration( + internal.maxDeduplicationDuration ), - internal.ledgerTime, - internal.submittedAt, - Some(internal.maxDeduplicationTime), - ), - INVALID_ARGUMENT, - "Invalid field deduplication_period: Duration must be positive", - ) + ) + ) { + case ( + sentDeduplication: DeduplicationPeriodProto, + expectedDeduplication: DeduplicationPeriod, + ) => + val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) + val result = commandsValidator.validateCommands( + api.commands.copy(deduplicationPeriod = sentDeduplication), + internal.ledgerTime, + internal.submittedAt, + Some(internal.maxDeduplicationDuration), + ) + inside(result) { case Right(valid) => + valid.deduplicationPeriod shouldBe (expectedDeduplication) + } + } + } + + "not allow negative deduplication time" in { + forAll( + Table( + "deduplication period", + DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0)), + DeduplicationPeriodProto.DeduplicationDuration(Duration.of(-1, 0)), + ) + ) { deduplication => + val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) + requestMustFailWith( + commandsValidator.validateCommands( + api.commands.copy(deduplicationPeriod = deduplication), + internal.ledgerTime, + internal.submittedAt, + Some(internal.maxDeduplicationDuration), + ), + INVALID_ARGUMENT, + "Invalid field deduplication_period: Duration must be positive", + ) + } } "not allow deduplication time exceeding maximum deduplication time" in { - val manySeconds = 100000L - val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) - requestMustFailWith( - commandsValidator.validateCommands( - api.commands - .copy(deduplicationPeriod = - DeduplicationPeriodProto.DeduplicationTime(Duration.of(manySeconds, 0)) - ), - internal.ledgerTime, - internal.submittedAt, - Some(internal.maxDeduplicationTime), - ), - INVALID_ARGUMENT, - s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration - .ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}", - ) + val durationSecondsExceedingMax = + internal.maxDeduplicationDuration.plusSeconds(1).getSeconds + forAll( + Table( + "deduplication period", + DeduplicationPeriodProto.DeduplicationTime(Duration.of(durationSecondsExceedingMax, 0)), + DeduplicationPeriodProto.DeduplicationDuration( + Duration.of(durationSecondsExceedingMax, 0) + ), + ) + ) { deduplication => + val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId) + requestMustFailWith( + commandsValidator.validateCommands( + api.commands + .copy(deduplicationPeriod = deduplication), + internal.ledgerTime, + internal.submittedAt, + Some(internal.maxDeduplicationDuration), + ), + INVALID_ARGUMENT, + s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration + .ofSeconds(durationSecondsExceedingMax)} exceeds the maximum deduplication time of ${internal.maxDeduplicationDuration}", + ) + } } - "default to maximum deduplication time if deduplication time is missing" in { + "default to maximum deduplication time if deduplication is missing" in { val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId) commandsValidator.validateCommands( api.commands.copy(deduplicationPeriod = DeduplicationPeriodProto.Empty), internal.ledgerTime, internal.submittedAt, - Some(internal.maxDeduplicationTime), + Some(internal.maxDeduplicationDuration), ) shouldEqual Right( internal.emptyCommands.copy( deduplicationPeriod = - DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationTime) + DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationDuration) ) ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index 3eef3e5def..b5457595f9 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -7,17 +7,15 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions._ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ -import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod -import com.daml.ledger.client.binding.Primitive import com.daml.ledger.test.model.DA.Types.Tuple2 import com.daml.ledger.test.model.Test.TextKeyOperations._ import com.daml.ledger.test.model.Test._ import com.daml.timer.Delayed import io.grpc.Status +import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration) @@ -34,27 +32,20 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva "Deduplicate commands within the deduplication time window", allocate(SingleParty), )(implicit ec => { case Participants(Participant(ledger, party)) => - requestsAreSubmittedAndDeduplicated( - ledger, - party, - DeduplicationPeriod.DeduplicationTime(deduplicationTime.asProtobuf), - ) - }) - - private def requestsAreSubmittedAndDeduplicated( - ledger: ParticipantTestContext, - party: Primitive.Party, - deduplicationPeriod: => DeduplicationPeriod, - )(implicit ec: ExecutionContext) = { lazy val requestA1 = ledger .submitRequest(party, DummyWithAnnotation(party, "First submission").create.command) .update( - _.commands.deduplicationPeriod := deduplicationPeriod + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationTime( + deduplicationTime.asProtobuf + ) ) lazy val requestA2 = ledger .submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command) .update( - _.commands.deduplicationPeriod := deduplicationPeriod, + _.commands.deduplicationPeriod := DeduplicationPeriod + .DeduplicationDuration( + deduplicationTime.asProtobuf + ), //same semantics as `DeduplicationTime` _.commands.commandId := requestA1.commands.get.commandId, ) for { @@ -109,7 +100,7 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva s"There should be 2 active contracts, but received $activeContracts", ) } - } + }) test( "CDStopOnSubmissionFailure",