[ledger-api] Add deduplication_duration to deduplication period [kvl-1047] (#10676)

* Add `deduplication_duration` to `deduplication_period` and deprecate `deduplication_time`

CHANGELOG_BEGIN
ledger-api - add `deduplication_duration` as a future replacement for `deduplication_time` in the command proto definition
CHANGELOG_END

* Add tests for deduplication period validation
This commit is contained in:
nicu-da 2021-08-26 02:33:01 -07:00 committed by GitHub
parent 96ad9b5ab8
commit eabb19d7f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 72 deletions

View File

@ -59,10 +59,9 @@ message Commands {
oneof deduplication_period {
// Specifies the length of the deduplication period.
// It is interpreted relative to the local clock at some point during the submission's processing.
// Same semantics as deduplication_duration
//
// Must be non-negative.
// DEPRECATED
google.protobuf.Duration deduplication_time = 9;
// Specifies the start of the deduplication period by a completion stream offset.
@ -70,6 +69,11 @@ message Commands {
// Must be a valid LedgerString (as described in ``value.proto``).
string deduplication_offset = 15;
// Specifies the length of the deduplication period.
// It is interpreted relative to the local clock at some point during the submission's processing.
//
// Must be non-negative.
google.protobuf.Duration deduplication_duration = 16;
}
// Lower bound for the ledger time assigned to the resulting transaction.

View File

@ -327,6 +327,8 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti
maxDeduplicationDuration
case DeduplicationPeriod.DeduplicationTime(duration) =>
JDuration.ofSeconds(duration.seconds, duration.nanos.toLong)
case DeduplicationPeriod.DeduplicationDuration(duration) =>
JDuration.ofSeconds(duration.seconds, duration.nanos.toLong)
case DeduplicationPeriod.DeduplicationOffset(
_
) => //no way of extracting the duration from here, will be removed soon

View File

@ -118,8 +118,10 @@ da_scala_test_suite(
"//ledger/ledger-api-client",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-offset",
"//ledger/metrics",
"//ledger/metrics:metrics-test-lib",
"//ledger/participant-state/kvutils",
"//libs-scala/concurrent",
"//libs-scala/contextualized-logging",
"//libs-scala/grpc-utils",

View File

@ -14,6 +14,7 @@ import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.value.Value.ContractId
import com.daml.platform.server.api.validation.ErrorFactories._
import com.google.protobuf.duration.{Duration => DurationProto}
import io.grpc.StatusRuntimeException
import scala.util.Try
@ -120,21 +121,28 @@ trait FieldValidations {
Left(invalidField(fieldName, exceedsMaxDurationMessage))
else Right(duration)
}
def protoDurationToDurationPeriod(duration: DurationProto) = {
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
validateDuration(
result,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration",
).map(DeduplicationPeriod.DeduplicationDuration)
}
deduplicationPeriod match {
case DeduplicationPeriodProto.Empty =>
Right(DeduplicationPeriod.DeduplicationDuration(maxDeduplicationDuration))
case DeduplicationPeriodProto.DeduplicationTime(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
validateDuration(
result,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration",
).map(DeduplicationPeriod.DeduplicationDuration)
protoDurationToDurationPeriod(duration)
case DeduplicationPeriodProto.DeduplicationOffset(offset) =>
Right(
DeduplicationPeriod.DeduplicationOffset(
Offset.fromHexString(Ref.HexString.assertFromString(offset))
)
)
case DeduplicationPeriodProto.DeduplicationDuration(duration) =>
protoDurationToDurationPeriod(duration)
}
})
}

View File

@ -3,7 +3,7 @@
package com.daml.ledger.api.validation
import java.time.Instant
import java.time.{Instant, Duration => JDuration}
import java.util.UUID
import com.daml.api.util.{DurationConversion, TimestampConversion}
@ -14,6 +14,7 @@ import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand}
import com.daml.ledger.api.v1.value.Value.Sum
import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _}
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks, SubmissionIdGenerator}
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
import com.daml.lf.command.{Commands => LfCommands, CreateCommand => LfCreateCommand}
import com.daml.lf.data._
import com.daml.lf.value.Value.ValueRecord
@ -24,7 +25,6 @@ import io.grpc.Status.Code.{INVALID_ARGUMENT, UNAVAILABLE}
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
import scalaz.syntax.tag._
class SubmitRequestValidatorTest
extends AnyWordSpec
with ValidatorTestUtils
@ -70,7 +70,7 @@ class SubmitRequestValidatorTest
val ledgerTime = Instant.EPOCH.plusSeconds(10)
val submittedAt = Instant.now
val timeDelta = java.time.Duration.ofSeconds(1)
val maxDeduplicationTime = java.time.Duration.ofDays(1)
val maxDeduplicationDuration = java.time.Duration.ofDays(1)
val deduplicationDuration = java.time.Duration.ofSeconds(
api.deduplicationTime.seconds,
api.deduplicationTime.nanos.toLong,
@ -139,7 +139,7 @@ class SubmitRequestValidatorTest
api.commands.withCommands(Seq.empty),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: commands",
@ -154,7 +154,7 @@ class SubmitRequestValidatorTest
api.commands.withLedgerId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: ledger_id",
@ -168,7 +168,7 @@ class SubmitRequestValidatorTest
api.commands.withWorkflowId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(
internal.emptyCommands.copy(
workflowId = None,
@ -184,7 +184,7 @@ class SubmitRequestValidatorTest
api.commands.withApplicationId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: application_id",
@ -198,7 +198,7 @@ class SubmitRequestValidatorTest
api.commands.withCommandId(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Missing field: command_id",
@ -213,7 +213,7 @@ class SubmitRequestValidatorTest
api.commands.withParty(""),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"""Missing field: party or act_as""",
@ -232,7 +232,7 @@ class SubmitRequestValidatorTest
.addReadAs("bob"),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
)
inside(result) { case Right(cmd) =>
// actAs parties are gathered from "party" and "readAs" fields
@ -250,7 +250,7 @@ class SubmitRequestValidatorTest
api.commands.withParty("").addActAs(api.submitter),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(internal.emptyCommands)
}
@ -262,7 +262,7 @@ class SubmitRequestValidatorTest
api.commands.withParty(api.submitter).addActAs(api.submitter).addReadAs(api.submitter),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(internal.emptyCommands)
}
@ -276,7 +276,7 @@ class SubmitRequestValidatorTest
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}
@ -290,57 +290,108 @@ class SubmitRequestValidatorTest
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}
"not allow negative deduplication time" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod =
DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0))
"transform valid deduplication into correct internal structure" in {
val deduplicationDuration = Duration.of(10, 0)
val offset = OffsetBuilder.fromLong(0)
forAll(
Table[DeduplicationPeriodProto, DeduplicationPeriod](
("input proto deduplication", "valid model deduplication"),
DeduplicationPeriodProto.DeduplicationTime(deduplicationDuration) -> DeduplicationPeriod
.DeduplicationDuration(JDuration.ofSeconds(10)),
DeduplicationPeriodProto.DeduplicationDuration(
deduplicationDuration
) -> DeduplicationPeriod
.DeduplicationDuration(JDuration.ofSeconds(10)),
DeduplicationPeriodProto.DeduplicationOffset(offset.toHexString) -> DeduplicationPeriod
.DeduplicationOffset(offset),
DeduplicationPeriodProto.Empty -> DeduplicationPeriod.DeduplicationDuration(
internal.maxDeduplicationDuration
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
),
INVALID_ARGUMENT,
"Invalid field deduplication_period: Duration must be positive",
)
)
) {
case (
sentDeduplication: DeduplicationPeriodProto,
expectedDeduplication: DeduplicationPeriod,
) =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val result = commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = sentDeduplication),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
)
inside(result) { case Right(valid) =>
valid.deduplicationPeriod shouldBe (expectedDeduplication)
}
}
}
"not allow negative deduplication time" in {
forAll(
Table(
"deduplication period",
DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0)),
DeduplicationPeriodProto.DeduplicationDuration(Duration.of(-1, 0)),
)
) { deduplication =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = deduplication),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
"Invalid field deduplication_period: Duration must be positive",
)
}
}
"not allow deduplication time exceeding maximum deduplication time" in {
val manySeconds = 100000L
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands
.copy(deduplicationPeriod =
DeduplicationPeriodProto.DeduplicationTime(Duration.of(manySeconds, 0))
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
),
INVALID_ARGUMENT,
s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration
.ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}",
)
val durationSecondsExceedingMax =
internal.maxDeduplicationDuration.plusSeconds(1).getSeconds
forAll(
Table(
"deduplication period",
DeduplicationPeriodProto.DeduplicationTime(Duration.of(durationSecondsExceedingMax, 0)),
DeduplicationPeriodProto.DeduplicationDuration(
Duration.of(durationSecondsExceedingMax, 0)
),
)
) { deduplication =>
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands
.copy(deduplicationPeriod = deduplication),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationDuration),
),
INVALID_ARGUMENT,
s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration
.ofSeconds(durationSecondsExceedingMax)} exceeds the maximum deduplication time of ${internal.maxDeduplicationDuration}",
)
}
}
"default to maximum deduplication time if deduplication time is missing" in {
"default to maximum deduplication time if deduplication is missing" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.copy(deduplicationPeriod = DeduplicationPeriodProto.Empty),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
Some(internal.maxDeduplicationDuration),
) shouldEqual Right(
internal.emptyCommands.copy(
deduplicationPeriod =
DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationTime)
DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationDuration)
)
)
}

View File

@ -7,17 +7,15 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions._
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.test.model.DA.Types.Tuple2
import com.daml.ledger.test.model.Test.TextKeyOperations._
import com.daml.ledger.test.model.Test._
import com.daml.timer.Delayed
import io.grpc.Status
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration)
@ -34,27 +32,20 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
"Deduplicate commands within the deduplication time window",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
requestsAreSubmittedAndDeduplicated(
ledger,
party,
DeduplicationPeriod.DeduplicationTime(deduplicationTime.asProtobuf),
)
})
private def requestsAreSubmittedAndDeduplicated(
ledger: ParticipantTestContext,
party: Primitive.Party,
deduplicationPeriod: => DeduplicationPeriod,
)(implicit ec: ExecutionContext) = {
lazy val requestA1 = ledger
.submitRequest(party, DummyWithAnnotation(party, "First submission").create.command)
.update(
_.commands.deduplicationPeriod := deduplicationPeriod
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationTime(
deduplicationTime.asProtobuf
)
)
lazy val requestA2 = ledger
.submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command)
.update(
_.commands.deduplicationPeriod := deduplicationPeriod,
_.commands.deduplicationPeriod := DeduplicationPeriod
.DeduplicationDuration(
deduplicationTime.asProtobuf
), //same semantics as `DeduplicationTime`
_.commands.commandId := requestA1.commands.get.commandId,
)
for {
@ -109,7 +100,7 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
s"There should be 2 active contracts, but received $activeContracts",
)
}
}
})
test(
"CDStopOnSubmissionFailure",