Add maxDeduplicationTime to the participant state API (#4722)

* Add max deduplication time to ledger configuration

* Add TTL to participant state API

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Robert Autenrieth 2020-03-11 09:49:37 +01:00 committed by GitHub
parent 8fe497ea5d
commit 9fd484469a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 112 additions and 23 deletions

View File

@ -122,6 +122,7 @@ message DamlSubmitterInfo {
string command_id = 2;
string application_id = 3;
google.protobuf.Timestamp maximum_record_time = 4;
google.protobuf.Timestamp deduplicate_until = 5;
}
// DAML transaction entry, used in both `DamlSubmission` and `DamlLogEntry`.

View File

@ -138,7 +138,8 @@ private[state] object Conversions {
submitter = Party.assertFromString(subInfo.getSubmitter),
applicationId = LedgerString.assertFromString(subInfo.getApplicationId),
commandId = LedgerString.assertFromString(subInfo.getCommandId),
maxRecordTime = parseTimestamp(subInfo.getMaximumRecordTime)
maxRecordTime = parseTimestamp(subInfo.getMaximumRecordTime),
deduplicateUntil = parseTimestamp(subInfo.getDeduplicateUntil).toInstant,
)
def buildTimestamp(ts: Time.Timestamp): com.google.protobuf.Timestamp = {

View File

@ -67,6 +67,9 @@ object Version {
* 2: Deprecate use of relative contract identifiers. The transaction is submitted with absolute contract
* identifiers. Backwards incompatible to remove unnecessary traversal of the transaction when consuming
* it and to make it possible to remove DamlLogEntryId.
*
* 3: Add an explicit deduplication time window to each submission. Backwards incompatible because
* it is unclear how to set a sensible default value while the submission time us unknown.
*/
val version: Long = 2
val version: Long = 3
}

View File

@ -3,6 +3,8 @@
package com.daml.ledger.participant.state.kvutils.api
import java.time.Duration
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.v1.{Configuration, LedgerId, Offset, TimeModel}
@ -22,6 +24,7 @@ trait LedgerReader extends ReportsHealth {
object LedgerReader {
val DefaultConfiguration = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1),
)
}

View File

@ -721,6 +721,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
maxRecordTime = inTheFuture(10.seconds),
deduplicateUntil = inTheFuture(10.seconds).toInstant,
)
private def theOffset(first: Long, rest: Long*): Offset =
@ -782,7 +783,7 @@ object ParticipantStateIntegrationSpecBase {
private def matchTransaction(update: Update, expectedCommandId: String): Assertion =
inside(update) {
case TransactionAccepted(Some(SubmitterInfo(_, _, actualCommandId, _)), _, _, _, _, _) =>
case TransactionAccepted(Some(SubmitterInfo(_, _, actualCommandId, _, _)), _, _, _, _, _) =>
actualCommandId should be(expectedCommandId)
}
}

View File

@ -207,14 +207,16 @@ object KVTest {
mrtDelta: Duration = minMRTDelta,
letDelta: Duration = Duration.ZERO,
commandId: CommandId = randomLedgerString,
): KVTest[(DamlLogEntryId, DamlLogEntry)] =
deduplicationTime: Duration = Duration.ofDays(1)): KVTest[(DamlLogEntryId, DamlLogEntry)] =
for {
testState <- get[KVTestState]
submInfo = SubmitterInfo(
submitter = submitter,
applicationId = Ref.LedgerString.assertFromString("test"),
commandId = commandId,
maxRecordTime = testState.recordTime.addMicros(mrtDelta.toNanos / 1000)
maxRecordTime = testState.recordTime.addMicros(mrtDelta.toNanos / 1000),
deduplicateUntil =
testState.recordTime.addMicros(deduplicationTime.toNanos / 1000).toInstant,
)
(tx, txMetaData) = transaction
subm = KeyValueSubmission.transactionToSubmission(

View File

@ -3,6 +3,7 @@
package com.daml.ledger.participant.state.kvutils
import java.time.Duration
import java.util.UUID
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntryId
@ -108,7 +109,8 @@ object TestHelpers {
val theRecordTime: Timestamp = Timestamp.Epoch
val theDefaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1),
)
def mkEntryId(n: Int): DamlLogEntryId =

View File

@ -87,7 +87,11 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with MockitoSugar {
private val aSubmissionId: SubmissionId =
Ref.LedgerString.assertFromString(UUID.randomUUID().toString)
private val aConfiguration: Configuration = Configuration(1, TimeModel.reasonableDefault)
private val aConfiguration: Configuration = Configuration(
generation = 1,
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1),
)
private def createWriter(captor: Option[ArgumentCaptor[Array[Byte]]] = None): LedgerWriter = {
val writer = mock[LedgerWriter]
@ -101,7 +105,8 @@ class KeyValueParticipantStateWriterSpec extends WordSpec with MockitoSugar {
submitter = party,
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString("X"),
maxRecordTime = rt.addMicros(Duration.ofSeconds(10).toNanos / 1000)
maxRecordTime = rt.addMicros(Duration.ofSeconds(10).toNanos / 1000),
deduplicateUntil = rt.addMicros(Duration.ofDays(1).toNanos / 1000).toInstant,
)
private def transactionMeta(let: Timestamp) = TransactionMeta(

View File

@ -4,6 +4,7 @@
package com.daml.ledger.participant.state.kvutils.tools
import java.io.{DataInputStream, FileInputStream}
import java.time.Duration
import java.util.concurrent.TimeUnit
import com.codahale.metrics
@ -50,7 +51,8 @@ object IntegrityCheck extends App {
val engine = Engine()
val defaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault
timeModel = TimeModel.reasonableDefault,
maxDeduplicationTime = Duration.ofDays(1),
)
var state = Map.empty[Proto.DamlStateKey, Proto.DamlStateValue]

View File

@ -31,6 +31,11 @@ message LedgerConfiguration {
// The ledger time model, specifying the bounds for
// ledger effective time and maximum record time of transactions.
LedgerTimeModel time_model = 3;
// The maximum value for the ``deduplication_time`` parameter of command submissions
// (as described in ``commands.proto``). This defines the maximum time window during which
// commands can be deduplicated.
google.protobuf.Duration max_deduplication_time = 4;
}
message LedgerTimeModel {

View File

@ -48,10 +48,13 @@ stale data.
``time_model`` is required.
``authorized_participant_ids`` is optional. If non-empty, then configuration
change originating from a participant that does not match any of the authorized
participants field will be rejected.
If unset, then change from any participant is accepted and that participant can set this field.
*since version 2*
``max_deduplication_time`` is required and must be set to a positive duration.
This defines the maximum value for the corresponding ``deduplication_time``
parameter of command submissions, i.e., the maximum time during which a command
can be deduplicated.
message LedgerTimeModel
^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -15,12 +15,19 @@ final case class Configuration(
generation: Long,
/** The time model of the ledger. Specifying the time-to-live bounds for Ledger API commands. */
timeModel: TimeModel,
/** The maximum time window during which commands can be deduplicated. */
maxDeduplicationTime: Duration,
)
object Configuration {
import com.daml.ledger.participant.state.protobuf
val protobufVersion: Long = 1L
/**
* Version history:
* V1: initial version
* V2: added maxDeduplicationTime
*/
val protobufVersion: Long = 2L
def decode(bytes: Array[Byte]): Either[String, Configuration] =
Try(protobuf.LedgerConfiguration.parseFrom(bytes)).toEither.left
@ -31,6 +38,7 @@ object Configuration {
def decode(config: protobuf.LedgerConfiguration): Either[String, Configuration] =
config.getVersion match {
case 1 => DecodeV1.decode(config)
case 2 => DecodeV2.decode(config)
case v => Left(s"Unknown version: $v")
}
@ -46,6 +54,38 @@ object Configuration {
Configuration(
generation = config.getGeneration,
timeModel = tm,
maxDeduplicationTime = Duration.ofDays(1),
)
}
def decodeTimeModel(tm: protobuf.LedgerTimeModel): Either[String, TimeModel] =
TimeModel(
maxClockSkew = parseDuration(tm.getMaxClockSkew),
minTransactionLatency = parseDuration(tm.getMinTransactionLatency),
maxTtl = parseDuration(tm.getMaxTtl),
avgTransactionLatency = parseDuration(tm.getAvgTransactionLatency),
minSkew = parseDuration(tm.getMinSkew),
maxSkew = parseDuration(tm.getMaxSkew),
).toEither.left.map(e => s"decodeTimeModel: ${e.getMessage}")
}
private object DecodeV2 {
def decode(config: protobuf.LedgerConfiguration): Either[String, Configuration] =
for {
tm <- if (config.hasTimeModel)
decodeTimeModel(config.getTimeModel)
else
Left("Missing time model")
maxDeduplicationTime <- if (config.hasMaxDeduplicationTime)
Right(parseDuration(config.getMaxDeduplicationTime))
else
Left("Missing maximum command time to live")
} yield {
Configuration(
generation = config.getGeneration,
timeModel = tm,
maxDeduplicationTime = maxDeduplicationTime,
)
}
@ -74,6 +114,7 @@ object Configuration {
.setMinSkew(buildDuration(tm.minSkew))
.setMaxSkew(buildDuration(tm.maxSkew))
)
.setMaxDeduplicationTime(buildDuration(config.maxDeduplicationTime))
.build
}

View File

@ -3,6 +3,8 @@
package com.daml.ledger.participant.state.v1
import java.time.Instant
import com.digitalasset.daml.lf.data.Time.Timestamp
/** Information provided by the submitter of changes submitted to the ledger.
@ -25,10 +27,19 @@ import com.digitalasset.daml.lf.data.Time.Timestamp
* by DAML applications to deduce from the record time reported by the
* ledger whether a change that they submitted has been lost in transit.
*
* @param deduplicateUntil: the time until which the command should be deduplicated.
* Command deduplication is already performed by the participant.
* The ledger may choose to perform additional (cross-participant)
* command deduplication. If it chooses to do so, it must follow the
* same rules as the participant:
* - Deduplication is based on the (submitter, commandId) tuple.
* - Commands must not be deduplicated after the `deduplicateUntil` time has passed.
* - Commands should not be deduplicated after the command was rejected.
*/
final case class SubmitterInfo(
submitter: Party,
applicationId: ApplicationId,
commandId: CommandId,
maxRecordTime: Timestamp //TODO: this should be a regular Instant
maxRecordTime: Timestamp, //TODO: this should be a regular Instant
deduplicateUntil: Instant,
)

View File

@ -67,7 +67,8 @@ class CommandExecutorImpl(
submitted.submitter,
submitted.applicationId.unwrap,
submitted.commandId.unwrap,
Time.Timestamp.assertFromInstant(submitted.maximumRecordTime)
Time.Timestamp.assertFromInstant(submitted.maximumRecordTime),
submitted.deduplicateUntil,
),
TransactionMeta(
Time.Timestamp.assertFromInstant(submitted.ledgerEffectiveTime),

View File

@ -5,7 +5,7 @@ package com.digitalasset.platform.sandbox
import java.io.File
import java.nio.file.Files
import java.time.Instant
import java.time.{Duration, Instant}
import akka.actor.ActorSystem
import akka.stream.Materializer
@ -211,7 +211,11 @@ final class SandboxServer(
implicit val actorSystem: ActorSystem = materializer.system
implicit val executionContext: ExecutionContext = materializer.executionContext
val defaultConfiguration = ParticipantState.Configuration(0, config.timeModel)
val defaultConfiguration = ParticipantState.Configuration(
generation = 0,
timeModel = config.timeModel,
maxDeduplicationTime = Duration.ofDays(1),
)
val (acs, ledgerEntries, mbLedgerTime) = createInitialState(config, packageStore)

View File

@ -114,7 +114,8 @@ class ImplicitPartyAdditionIT
Ref.Party.assertFromString(submitter),
Ref.LedgerString.assertFromString("appId"),
Ref.LedgerString.assertFromString(commandId),
Time.Timestamp.assertFromInstant(MRT)
Time.Timestamp.assertFromInstant(MRT),
DeduplicateUntil,
)
val transactionMeta = TransactionMeta(
@ -129,6 +130,7 @@ class ImplicitPartyAdditionIT
val LET = Instant.EPOCH.plusSeconds(10)
val MRT = Instant.EPOCH.plusSeconds(10)
val DeduplicateUntil = Instant.now.plusSeconds(3600)
"A Ledger" should {
"implicitly add parties mentioned in a transaction" in allFixtures { ledger =>

View File

@ -94,7 +94,8 @@ class TransactionMRTComplianceIT
submitter = Ref.Party.assertFromString("submitter"),
applicationId = Ref.LedgerString.assertFromString("appId"),
commandId = Ref.LedgerString.assertFromString("cmdId"),
maxRecordTime = Time.Timestamp.assertFromInstant(MRT)
maxRecordTime = Time.Timestamp.assertFromInstant(MRT),
deduplicateUntil = Instant.EPOCH
)
val transactionMeta = TransactionMeta(
ledgerEffectiveTime = Time.Timestamp.assertFromInstant(LET),

View File

@ -4,7 +4,7 @@
package com.digitalasset.platform.store.dao
import java.io.File
import java.time.Instant
import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
@ -267,7 +267,8 @@ class JdbcLedgerDaoSpec
val defaultConfig = Configuration(
generation = 0,
timeModel = TimeModel.reasonableDefault
timeModel = TimeModel.reasonableDefault,
Duration.ofDays(1),
)
"be able to persist and load configuration" in {