Add multiple ways of specifying deduplication [KVL-1047] (#10601)

CHANGELOG_BEGIN
ledger-api - Command deduplication period can now be specified by setting `deduplication_offset` instead of `deduplication_time` (only valid for v2 WriteService). This change is backwards compatible.
CHANGELOG_END

* Propagate the enriched deduplicationPeriod instead of deduplication duration

* Update the Haskell bindings for the new deduplication period

* Calculate the deduplicateUntil using the new deduplication period for backward compat

* Use consistent naming for deduplication_period

* Cleanup command timeout extraction from deduplication period

* Add the required deduplication_offset to deduplication instead of deduplication_start

* Update haskell bindings to support deduplication_offset

* Add support for deduplication_offset in the ledger-api

* Remove the timestamp-based deduplication from our models to simplify upgrade for users

* Add optional conformance test for offset based deduplication

* Remove buf rule for FIELD_SAME_ONEOF as our change is backwards compatible

* Disable FIELD_SAME_ONEOF buf check for commands file

* Apply suggestions from code review

Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com>
Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>

* Update comment for deduplication period

Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com>
Co-authored-by: Samir Talwar <samir.talwar@digitalasset.com>
This commit is contained in:
nicu-da 2021-08-25 05:58:03 -07:00 committed by GitHub
parent 53be19f86c
commit 7cc698948c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 366 additions and 114 deletions

View File

@ -4,7 +4,7 @@
version: v1beta1
deps:
- buf.build/googleapis/googleapis
- buf.build/googleapis/googleapis
build:
roots:
@ -30,3 +30,6 @@ lint:
breaking:
use:
- WIRE_JSON
ignore_only:
FIELD_SAME_ONEOF: # ignore field_same_oneof to not trigger an error from moving an existing field into a new oneof
- com/daml/ledger/api/v1/commands.proto

View File

@ -487,7 +487,7 @@ reset args = do
, cid = L.CommandId $ TL.fromStrict $ UUID.toText cmdId
, actAs = parties
, readAs = []
, dedupTime = Nothing
, dedupPeriod = Nothing
, minLeTimeAbs = Nothing
, minLeTimeRel = Nothing
, sid = Nothing

View File

@ -65,7 +65,7 @@ getTrans party Handle{log,lid} = do
submitCommand :: Handle -> Party -> Command -> IO (Either String ())
submitCommand Handle{lid} party com = do
cid <- randomCid
run 5 $ Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid})
run 5 $ Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid})
where
wid = Nothing
myAid = ApplicationId "chat-console"

View File

@ -65,7 +65,7 @@ getTrans player Handle{log,lid} = do
submitCommand :: Handle -> Party -> Command -> IO (Either String ())
submitCommand Handle{lid} party com = do
cid <- randomCid
run 5 (Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid}))
run 5 (Ledger.submit (Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid}))
where
wid = Nothing
myAid = ApplicationId "nim"

View File

@ -74,11 +74,18 @@ lowerCommands = \case
commandsActAs = Vector.fromList $ map unParty actAs,
commandsReadAs = Vector.fromList $ map unParty readAs,
commandsSubmissionId = unSubmissionId (fromMaybe (SubmissionId "") sid),
commandsDeduplicationTime = dedupTime,
commandsDeduplicationPeriod = fmap lowerDeduplicationPeriod dedupPeriod,
commandsCommands = Vector.fromList $ map lowerCommand coms,
commandsMinLedgerTimeAbs = fmap lowerTimestamp minLeTimeAbs,
commandsMinLedgerTimeRel = minLeTimeRel }
lowerDeduplicationPeriod :: DeduplicationPeriod -> LL.CommandsDeduplicationPeriod
lowerDeduplicationPeriod = \case
DeduplicationTime t ->
LL.CommandsDeduplicationPeriodDeduplicationTime t
DeduplicationOffset o ->
LL.CommandsDeduplicationPeriodDeduplicationOffset (unAbsOffset o)
lowerCommand :: Command -> LL.Command
lowerCommand = \case
CreateCommand{..} ->

View File

@ -55,7 +55,7 @@ module DA.Ledger.Types( -- High Level types for communication over Ledger API
SubmissionId(..),
LL.Duration(..),
LL.Status(..),
DeduplicationPeriod(..)
) where
import qualified Data.Aeson as A
@ -77,7 +77,7 @@ data Commands = Commands
, cid :: CommandId
, actAs :: [Party]
, readAs :: [Party]
, dedupTime :: Maybe LL.Duration
, dedupPeriod :: Maybe DeduplicationPeriod
, coms :: [Command]
, minLeTimeAbs :: Maybe Timestamp
, minLeTimeRel :: Maybe LL.Duration
@ -103,6 +103,11 @@ data Command
}
deriving (Eq,Ord,Show)
data DeduplicationPeriod
= DeduplicationTime LL.Duration
| DeduplicationOffset AbsOffset
deriving (Eq, Ord, Show)
-- ledger_offset.proto
data LedgerOffset = LedgerBegin | LedgerEnd | LedgerAbsOffset AbsOffset

View File

@ -656,7 +656,7 @@ makeCommands :: LedgerId -> Party -> Command -> IO (CommandId,Commands)
makeCommands lid party com = do
cid <- liftIO randomCid
let wid = Nothing
return $ (cid,) $ Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupTime=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid=Nothing}
return $ (cid,) $ Commands {lid,wid,aid=myAid,cid,actAs=[party],readAs=[],dedupPeriod=Nothing,coms=[com],minLeTimeAbs=Nothing,minLeTimeRel=Nothing,sid=Nothing}
myAid :: ApplicationId

View File

@ -19,7 +19,6 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
}
import com.daml.ledger.client.testing.AkkaTest
import com.daml.util.Ctx
import com.google.protobuf.duration.{Duration => protoDuration}
import com.google.rpc.Code
import com.google.rpc.status.Status
import org.scalatest.Inside
@ -37,8 +36,13 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
val mockCommandSubmission: SubmissionFlowType[RetryInfo[Status]] =
Flow[In[RetryInfo[Status]]]
.map {
case Ctx(context @ RetryInfo(_, _, _, status), SubmitRequest(Some(commands)), _) =>
if (commands.deduplicationTime.get.nanos == 0) {
case Ctx(
context @ RetryInfo(_, nrOfRetries, _, status),
SubmitRequest(Some(commands)),
_,
) =>
// Return a completion based on the input status code only on the first submission.
if (nrOfRetries == 0) {
Ctx(context, CompletionResponse(Completion(commands.commandId, Some(status))))
} else {
Ctx(
@ -58,10 +62,7 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
retryInfo: RetryInfo[Status],
response: Either[CompletionFailure, CompletionSuccess],
) = {
val commands = retryInfo.request.commands.get
val dedupTime = commands.deduplicationTime.get
val newDedupTime = dedupTime.copy(nanos = dedupTime.nanos + 1)
SubmitRequest(Some(commands.copy(deduplicationTime = Some(newDedupTime))))
SubmitRequest(retryInfo.request.commands)
}
val retryFlow: SubmissionFlowType[RetryInfo[Status]] =
@ -78,7 +79,6 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
"commandId",
"party",
Seq.empty,
Some(protoDuration.of(120, 0)),
)
)
)
@ -92,7 +92,7 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
.runWith(Sink.seq)
}
CommandRetryFlow.getClass.getSimpleName should {
"command retry flow" should {
"propagate OK status" in {
submitRequest(Code.OK_VALUE, Instant.ofEpochSecond(45)) map { result =>

View File

@ -57,11 +57,20 @@ message Commands {
// Required
repeated Command commands = 8;
// The length of the time window during which all commands with the same ``act_as`` parties and
// the same command ID will be deduplicated.
// Duplicate commands submitted before the end of this window return an ALREADY_EXISTS error.
// Optional
google.protobuf.Duration deduplication_time = 9;
oneof deduplication_period {
// Specifies the length of the deduplication period.
// It is interpreted relative to the local clock at some point during the submission's processing.
//
// Must be non-negative.
google.protobuf.Duration deduplication_time = 9;
// Specifies the start of the deduplication period by a completion stream offset.
//
// Must be a valid LedgerString (as described in ``value.proto``).
string deduplication_offset = 15;
}
// Lower bound for the ledger time assigned to the resulting transaction.
// Note: The ledger time of a transaction is assigned as part of command interpretation.

View File

@ -17,6 +17,7 @@ import com.daml.ledger.api.v1.command_completion_service.{
}
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionServiceStub
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.validation.CommandsValidator
import com.daml.ledger.client.LedgerClient
@ -180,16 +181,18 @@ final class CommandClient(
throw new IllegalArgumentException(
s"Failing fast on submission request of command ${commands.commandId} with invalid application ID ${commands.applicationId} (client expected $applicationId)"
)
val updateDedupTime = commands.deduplicationTime.orElse(
Some(
Duration
.of(
config.defaultDeduplicationTime.getSeconds,
config.defaultDeduplicationTime.getNano,
)
)
)
r.copy(commands = Some(commands.copy(deduplicationTime = updateDedupTime)))
val updatedDeduplicationPeriod = commands.deduplicationPeriod match {
case DeduplicationPeriod.Empty =>
DeduplicationPeriod.DeduplicationTime(
Duration
.of(
config.defaultDeduplicationTime.getSeconds,
config.defaultDeduplicationTime.getNano,
)
)
case existing => existing
}
r.copy(commands = Some(commands.copy(deduplicationPeriod = updatedDeduplicationPeriod)))
})
def submissionFlow[Context](

View File

@ -9,6 +9,7 @@ import akka.stream.stage._
import akka.stream.{Attributes, Inlet, Outlet}
import com.daml.grpc.{GrpcException, GrpcStatus}
import com.daml.ledger.api.v1.command_submission_service._
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
@ -233,12 +234,11 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti
},
)
case Some(maxDedup) =>
val deduplicationDuration = commands.deduplicationTime
.map(d => JDuration.ofSeconds(d.seconds, d.nanos.toLong))
.getOrElse(maxDedup)
val commandTimeout =
maxTimeoutFromDeduplicationPeriod(commands.deduplicationPeriod, maxDedup)
val trackingData = TrackingData(
commandId = commandId,
commandTimeout = Instant.now().plus(deduplicationDuration),
commandTimeout = commandTimeout,
context = submitRequest.context,
)
pendingCommands += commandId -> trackingData
@ -318,6 +318,25 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Opti
logic -> promise.future
}
private def maxTimeoutFromDeduplicationPeriod(
deduplicationPeriod: DeduplicationPeriod,
maxDeduplicationDuration: JDuration,
) = {
val timeoutDuration = deduplicationPeriod match {
case DeduplicationPeriod.Empty =>
maxDeduplicationDuration
case DeduplicationPeriod.DeduplicationTime(duration) =>
JDuration.ofSeconds(duration.seconds, duration.nanos.toLong)
case DeduplicationPeriod.DeduplicationOffset(
_
) => //no way of extracting the duration from here, will be removed soon
maxDeduplicationDuration
}
Instant
.now()
.plus(timeoutDuration)
}
override def shape: CommandTrackerShape[Context] =
CommandTrackerShape(submitRequestIn, submitRequestOut, commandResultIn, resultOut, offsetOut)

View File

@ -29,6 +29,7 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
NotOkResponse,
}
import com.daml.util.Ctx
import com.google.protobuf.duration.{Duration => DurationProto}
import com.google.protobuf.empty.Empty
import com.google.protobuf.timestamp.Timestamp
import com.google.rpc.code._
@ -80,8 +81,9 @@ class CommandTrackerFlowTest
Some(
Commands(
commandId = commandId,
deduplicationTime =
dedupTime.map(t => com.google.protobuf.duration.Duration(t.getSeconds)),
deduplicationPeriod = dedupTime
.map(t => Commands.DeduplicationPeriod.DeduplicationTime(DurationProto(t.getSeconds)))
.getOrElse(Commands.DeduplicationPeriod.Empty),
)
)
),

View File

@ -36,6 +36,7 @@ da_scala_library(
"//ledger/ledger-api-akka",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-offset",
"//ledger/ledger-resources",
"//ledger/metrics",
"//libs-scala/concurrent",

View File

@ -59,10 +59,10 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss
s"Can not represent command ledger time $ledgerEffectiveTime as a Daml timestamp"
)
)
deduplicationDuration <- validateDeduplicationDuration(
commands.deduplicationTime,
deduplicationPeriod <- validateDeduplicationPeriod(
commands.deduplicationPeriod,
maxDeduplicationTime,
"deduplication_time",
"deduplication_period",
)
} yield domain.Commands(
ledgerId = ledgerId,
@ -73,7 +73,7 @@ final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: Submiss
actAs = submitters.actAs,
readAs = submitters.readAs,
submittedAt = currentUtcTime,
deduplicationDuration = deduplicationDuration,
deduplicationPeriod = deduplicationPeriod,
commands = Commands(
commands = ImmArray(validatedCommands),
ledgerEffectiveTime = ledgerEffectiveTimestamp,

View File

@ -60,8 +60,12 @@ class GrpcCommandSubmissionService(
Timed
.value(
metrics.daml.commands.validation,
validator
.validate(request, currentLedgerTime(), currentUtcTime(), maxDeduplicationTime()),
validator.validate(
request,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),

View File

@ -5,11 +5,14 @@ package com.daml.platform.server.api.validation
import java.time.Duration
import com.daml.lf.data.Ref
import com.daml.lf.value.Value.ContractId
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto}
import com.daml.ledger.api.v1.value.Identifier
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.value.Value.ContractId
import com.daml.platform.server.api.validation.ErrorFactories._
import io.grpc.StatusRuntimeException
@ -99,32 +102,42 @@ trait FieldValidations {
def requirePresence[T](option: Option[T], fieldName: String): Either[StatusRuntimeException, T] =
option.fold[Either[StatusRuntimeException, T]](Left(missingField(fieldName)))(Right(_))
def validateDeduplicationDuration(
durationO: Option[com.google.protobuf.duration.Duration],
/** We validate only using current time because we set the currentTime as submitTime so no need to check both
*/
def validateDeduplicationPeriod(
deduplicationPeriod: DeduplicationPeriodProto,
maxDeduplicationTimeO: Option[Duration],
fieldName: String,
): Either[StatusRuntimeException, Duration] =
maxDeduplicationTimeO.fold[Either[StatusRuntimeException, Duration]](
): Either[StatusRuntimeException, DeduplicationPeriod] = {
maxDeduplicationTimeO.fold[Either[StatusRuntimeException, DeduplicationPeriod]](
Left(missingLedgerConfig())
)(maxDeduplicationTime =>
durationO match {
case None =>
Right(maxDeduplicationTime)
case Some(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
if (result.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (result.compareTo(maxDeduplicationTime) > 0)
Left(
invalidField(
fieldName,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime",
)
)
else
Right(result)
)(maxDeduplicationDuration => {
def validateDuration(duration: Duration, exceedsMaxDurationMessage: String) = {
if (duration.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (duration.compareTo(maxDeduplicationDuration) > 0)
Left(invalidField(fieldName, exceedsMaxDurationMessage))
else Right(duration)
}
)
deduplicationPeriod match {
case DeduplicationPeriodProto.Empty =>
Right(DeduplicationPeriod.DeduplicationDuration(maxDeduplicationDuration))
case DeduplicationPeriodProto.DeduplicationTime(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
validateDuration(
result,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationDuration",
).map(DeduplicationPeriod.DeduplicationDuration)
case DeduplicationPeriodProto.DeduplicationOffset(offset) =>
Right(
DeduplicationPeriod.DeduplicationOffset(
Offset.fromHexString(Ref.HexString.assertFromString(offset))
)
)
}
})
}
def validateIdentifier(identifier: Identifier): Either[StatusRuntimeException, Ref.Identifier] =
for {

View File

@ -7,12 +7,13 @@ import java.time.Instant
import java.util.UUID
import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.ledger.api.{DomainMocks, SubmissionIdGenerator}
import com.daml.ledger.api.DomainMocks.{applicationId, commandId, submissionId, workflowId}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto}
import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand}
import com.daml.ledger.api.v1.value.Value.Sum
import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _}
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks, SubmissionIdGenerator}
import com.daml.lf.command.{Commands => LfCommands, CreateCommand => LfCreateCommand}
import com.daml.lf.data._
import com.daml.lf.value.Value.ValueRecord
@ -59,7 +60,7 @@ class SubmitRequestValidatorTest
commandId = commandId.unwrap,
party = submitter,
commands = Seq(command),
deduplicationTime = Some(deduplicationTime),
deduplicationPeriod = DeduplicationPeriodProto.DeduplicationTime(deduplicationTime),
minLedgerTimeAbs = None,
minLedgerTimeRel = None,
)
@ -84,7 +85,7 @@ class SubmitRequestValidatorTest
actAs = Set(DomainMocks.party),
readAs = Set.empty,
submittedAt = submittedAt,
deduplicationDuration = deduplicationDuration,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(deduplicationDuration),
commands = LfCommands(
ImmArray(
LfCreateCommand(
@ -297,13 +298,15 @@ class SubmitRequestValidatorTest
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = Some(Duration.of(-1, 0))),
api.commands.copy(deduplicationPeriod =
DeduplicationPeriodProto.DeduplicationTime(Duration.of(-1, 0))
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
),
INVALID_ARGUMENT,
"Invalid field deduplication_time: Duration must be positive",
"Invalid field deduplication_period: Duration must be positive",
)
}
@ -312,13 +315,16 @@ class SubmitRequestValidatorTest
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = Some(Duration.of(manySeconds, 0))),
api.commands
.copy(deduplicationPeriod =
DeduplicationPeriodProto.DeduplicationTime(Duration.of(manySeconds, 0))
),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
),
INVALID_ARGUMENT,
s"Invalid field deduplication_time: The given deduplication time of ${java.time.Duration
s"Invalid field deduplication_period: The given deduplication time of ${java.time.Duration
.ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}",
)
}
@ -327,13 +333,14 @@ class SubmitRequestValidatorTest
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = None),
api.commands.copy(deduplicationPeriod = DeduplicationPeriodProto.Empty),
internal.ledgerTime,
internal.submittedAt,
Some(internal.maxDeduplicationTime),
) shouldEqual Right(
internal.emptyCommands.copy(
deduplicationDuration = internal.maxDeduplicationTime
deduplicationPeriod =
DeduplicationPeriod.DeduplicationDuration(internal.maxDeduplicationTime)
)
)
}

View File

@ -8,15 +8,7 @@ import java.time.{Duration, Instant}
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.MockMessages.{
applicationId,
commandId,
commands,
ledgerId,
party,
submitRequest,
workflowId,
}
import com.daml.ledger.api.testing.utils.MockMessages._
import com.daml.ledger.api.v1.commands.{Command, CreateCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.lf.data.Ref

View File

@ -4,6 +4,7 @@
load(
"//bazel_tools:scala.bzl",
"da_scala_library",
"da_scala_test_suite",
)
da_scala_library(
@ -45,3 +46,20 @@ da_scala_library(
"//daml-lf/data",
],
)
da_scala_test_suite(
name = "ledger-api-domain-tests",
size = "small",
srcs = glob(["src/test/suite/scala/**/*.scala"]),
scala_deps = [
"@maven//:org_scalatest_scalatest",
"@maven//:org_scalaz_scalaz_core",
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scala_lang_modules_scala_java8_compat",
],
deps = [
":ledger-api-domain",
":ledger-api-domain-tests-lib",
"//daml-lf/data",
],
)

View File

@ -3,7 +3,7 @@
package com.daml.ledger.api
import java.time.Duration
import java.time.{Duration, Instant}
import com.daml.ledger.offset.Offset
import com.daml.logging.entries.{LoggingValue, ToLoggingValue}
@ -18,6 +18,40 @@ sealed trait DeduplicationPeriod extends Product with Serializable
object DeduplicationPeriod {
/** Transforms the [[period]] into an [[Instant]] to be used for deduplication into the future(deduplicateUntil).
* Only used for backwards compatibility
* @param time The time to use for calculating the [[Instant]]. It can either be submission time or current time, based on usage
* @param period The deduplication period
*/
def deduplicateUntil(
time: Instant,
period: DeduplicationPeriod,
): Instant = period match {
case DeduplicationDuration(duration) =>
time.plus(duration)
case DeduplicationOffset(_) =>
throw new NotImplementedError("Offset deduplication is not supported")
}
/** Computes deduplication duration as the duration `time + minSkew - deduplicationStart`.
* We measure `deduplicationStart` on the ledgers clock, and thus
* we need to add the minSkew to compensate for the maximal skew that the participant might be behind the ledgers clock.
* @param time submission time or current time
* @param deduplicationStart the [[Instant]] from where we should start deduplication, must be < than time
* @param minSkew the minimum skew as specified by the current ledger time model
*/
def deduplicationDurationFromTime(
time: Instant,
deduplicationStart: Instant,
minSkew: Duration,
): Duration = {
assert(deduplicationStart.isBefore(time), "Deduplication must start in the past")
Duration.between(
deduplicationStart,
time.plus(minSkew),
)
}
/** The length of the deduplication window, which ends when the [[WriteService]] or underlying Daml ledger processes
* the command submission.
*

View File

@ -3,7 +3,7 @@
package com.daml.ledger.api
import java.time.{Duration, Instant}
import java.time.Instant
import com.daml.ledger.api.domain.Event.{CreateOrArchiveEvent, CreateOrExerciseEvent}
import com.daml.ledger.configuration.Configuration
@ -283,11 +283,9 @@ object domain {
actAs: Set[Ref.Party],
readAs: Set[Ref.Party],
submittedAt: Instant,
deduplicationDuration: Duration,
deduplicationPeriod: DeduplicationPeriod,
commands: LfCommands,
) {
lazy val deduplicateUntil: Instant = submittedAt.plus(deduplicationDuration)
}
)
object Commands {
@ -302,7 +300,7 @@ object domain {
"actAs" -> commands.actAs,
"readAs" -> commands.readAs,
"submittedAt" -> commands.submittedAt,
"deduplicationDuration" -> commands.deduplicationDuration,
"deduplicationPeriod" -> commands.deduplicationPeriod,
)
}

View File

@ -0,0 +1,24 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api
import java.time.{Duration, Instant}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class DeduplicationPeriodSpec extends AnyWordSpec with Matchers {
"calculating deduplication until" should {
val time = Instant.ofEpochSecond(100)
"return expected result when sending duration" in {
val deduplicateUntil = DeduplicationPeriod.deduplicateUntil(
time,
DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(3)),
)
deduplicateUntil shouldEqual time.plusSeconds(3)
}
}
}

View File

@ -7,14 +7,17 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions._
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.test.model.DA.Types.Tuple2
import com.daml.ledger.test.model.Test.TextKeyOperations._
import com.daml.ledger.test.model.Test._
import com.daml.timer.Delayed
import io.grpc.Status
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration)
@ -31,18 +34,29 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
"Deduplicate commands within the deduplication time window",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val requestA1 = ledger
requestsAreSubmittedAndDeduplicated(
ledger,
party,
DeduplicationPeriod.DeduplicationTime(deduplicationTime.asProtobuf),
)
})
private def requestsAreSubmittedAndDeduplicated(
ledger: ParticipantTestContext,
party: Primitive.Party,
deduplicationPeriod: => DeduplicationPeriod,
)(implicit ec: ExecutionContext) = {
lazy val requestA1 = ledger
.submitRequest(party, DummyWithAnnotation(party, "First submission").create.command)
.update(
_.commands.deduplicationTime := deduplicationTime.asProtobuf
_.commands.deduplicationPeriod := deduplicationPeriod
)
val requestA2 = ledger
lazy val requestA2 = ledger
.submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command)
.update(
_.commands.deduplicationTime := deduplicationTime.asProtobuf,
_.commands.deduplicationPeriod := deduplicationPeriod,
_.commands.commandId := requestA1.commands.get.commandId,
)
for {
// Submit command A (first deduplication window)
// Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server,
@ -53,7 +67,6 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
.submit(requestA1)
.mustFail("submitting the first request for the second time")
completions1 <- ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd1)(party))
// Wait until the end of first deduplication window
_ <- Delayed.by(deduplicationWindowWait)(())
@ -96,7 +109,7 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva
s"There should be 2 active contracts, but received $activeContracts",
)
}
})
}
test(
"CDStopOnSubmissionFailure",

View File

@ -0,0 +1,96 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.testtool.suites
import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions._
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.test.model.Test._
import com.google.rpc.code.Code
final class CommandDeduplicationOffsetIT extends LedgerTestSuite {
test(
"CDOffsetDeduplication",
"Deduplicate commands based on offset",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
for {
ledgerEnd1 <- ledger.currentEnd()
requestA1 = ledger
.submitRequest(party, DummyWithAnnotation(party, "First submission").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset(
ledgerEnd1.getAbsolute
)
)
_ <- ledger.submit(requestA1)
completionsA1First <- ledger.firstCompletions(
ledger.completionStreamRequest(ledgerEnd1)(party)
)
ledgerEndAfterFirstSubmit <- ledger.currentEnd()
_ <- ledger.submit(requestA1)
completionsA1Second <- ledger.firstCompletions(
ledger.completionStreamRequest(ledgerEndAfterFirstSubmit)(party)
) // This will be deduplicated as the deduplication offset includes the first submitted command.
ledgerEnd2 <- ledger.currentEnd()
requestA2 = ledger
.submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod
.DeduplicationOffset(ledgerEnd2.getAbsolute),
_.commands.commandId := requestA1.commands.get.commandId,
)
_ <- ledger.submit(
requestA2
) // the deduplication offset is moved to the last completion so this will be successful
completionsA2First <- ledger.firstCompletions(
ledger.completionStreamRequest(ledgerEnd2)(party)
)
activeContracts <- ledger.activeContracts(party)
} yield {
assert(ledgerEnd1 != ledgerEnd2)
val completionCommandId1 =
assertSingleton("Expected only one first completion", completionsA1First)
assert(
completionCommandId1.commandId == requestA1.commands.get.commandId,
"The command ID of the first completion does not match the command ID of the first submission",
)
assert(
completionCommandId1.status.get.code == Code.OK.value,
s"First command did not complete OK. Had status ${completionCommandId1.status}",
)
val failureCommandId1 =
assertSingleton("Expected only one first failure", completionsA1Second)
assert(
failureCommandId1.commandId == requestA1.commands.get.commandId,
"The command ID of the second completion does not match the command ID of the first submission",
)
assert(
failureCommandId1.status.get.code == Code.ALREADY_EXISTS.value,
s"Second completion for the first submission was not deduplicated ${failureCommandId1.status}",
)
val completionCommandId2 =
assertSingleton(
"Expected only one second successful completion",
completionsA2First,
)
assert(
completionCommandId2.commandId == requestA2.commands.get.commandId,
"The command ID of the second completion does not match the command ID of the second submission",
)
assert(
completionCommandId2.status.get.code == Code.OK.value,
s"Second command did not complete OK. Had status ${completionCommandId1.status}",
)
assert(
activeContracts.size == 2,
s"There should be 2 active contracts, but received ${activeContracts.size} contracts, with events: $activeContracts",
)
}
})
}

View File

@ -62,6 +62,7 @@ object Tests {
Vector(
new ParticipantPruningIT,
new MultiPartySubmissionIT,
new CommandDeduplicationOffsetIT,
)
val retired: Vector[LedgerTestSuite] =

View File

@ -6,7 +6,6 @@ package com.daml.platform.apiserver.execution
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{Commands => ApiCommands}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService}
@ -80,7 +79,7 @@ private[apiserver] final class StoreBackedCommandExecutor(
commands.actAs.toList,
commands.applicationId.unwrap,
commands.commandId.unwrap,
DeduplicationPeriod.DeduplicationDuration(commands.deduplicationDuration),
commands.deduplicationPeriod,
commands.submissionId.unwrap,
ledgerConfiguration,
),

View File

@ -7,7 +7,7 @@ import java.time.{Duration, Instant}
import java.util.UUID
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
@ -140,7 +140,10 @@ private[apiserver] final class ApiSubmissionService private[services] (
commands.commandId,
commands.actAs.toList,
commands.submittedAt,
commands.deduplicateUntil,
DeduplicationPeriod.deduplicateUntil(
commands.submittedAt,
commands.deduplicationPeriod,
),
)
.flatMap {
case CommandDeduplicationNew =>

View File

@ -6,6 +6,7 @@ package com.daml.platform.apiserver.execution
import java.time.{Duration, Instant}
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId, SubmissionId}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService}
@ -63,7 +64,7 @@ class StoreBackedCommandExecutorSpec
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.EPOCH,
deduplicationDuration = Duration.ZERO,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
commands = LfCommands(
commands = ImmArray.empty,
ledgerEffectiveTime = Time.Timestamp.Epoch,

View File

@ -9,7 +9,7 @@ import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.DomainMocks
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
@ -418,7 +418,7 @@ object ApiSubmissionServiceSpec {
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.MIN,
deduplicationDuration = Duration.ZERO,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""),
)
)

View File

@ -89,11 +89,10 @@ private[v2] object AdaptedV1WriteService {
private val NoErrorDetails = Seq.empty[com.google.protobuf.any.Any]
def adaptSubmitterInfo(submitterInfo: SubmitterInfo): v1.SubmitterInfo = {
val deduplicateUntil = submitterInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(duration) => Instant.now().plus(duration)
case DeduplicationPeriod.DeduplicationOffset(_) =>
throw new NotImplementedError("Deduplication offset not supported as deduplication period")
}
val deduplicateUntil = DeduplicationPeriod.deduplicateUntil(
Instant.now(),
submitterInfo.deduplicationPeriod,
)
v1.SubmitterInfo(
actAs = submitterInfo.actAs,
applicationId = submitterInfo.applicationId,

View File

@ -8,6 +8,7 @@ import java.security.cert.X509Certificate
import java.util.UUID
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod.DeduplicationTime
import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.google.protobuf.duration.Duration
@ -66,7 +67,7 @@ package object testing {
)
)
),
deduplicationTime = Some(Duration(seconds = 1.day.toSeconds)),
deduplicationPeriod = DeduplicationTime(Duration(seconds = 1.day.toSeconds)),
minLedgerTimeRel = Some(Duration(seconds = 1.minute.toSeconds)),
)
)