Implement timed command deduplication in kvutils (#5242)

* Implement timed command deduplication in kvutils

This adds a field deduplication_time to DamlCommandDedupValue for
deduplication timeout checking.

* Bump kvutils version to 4

* Fix CommandTracker pulling commandResultIn multiple times
Now that the timeouts are generated out of band, we have 2
"unsynchronized" places that pull on commandResultIn.
Whenever we pull, we need to check that commandResultIn hasn't been
pulled before.

* Add inStaticTimeMode flag to enable command dedup in sandbox-next with static-time

Fixes #4624.

CHANGELOG_BEGIN
[kvutils] KVUtils now respects the command deduplciation time instead of
deduplicating commands forever.
CHANGELOG_END
This commit is contained in:
Gerolf Seitz 2020-04-03 10:44:35 +02:00 committed by GitHub
parent 2a10109ed2
commit 5ddbd5c511
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 85 additions and 25 deletions

View File

@ -126,7 +126,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur
)
setHandler(resultOut, new OutHandler {
override def onPull(): Unit = pull(commandResultIn)
override def onPull(): Unit = if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
})
setHandler(
@ -147,7 +147,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur
pushResultOrPullCommandResultIn(getOutputForCompletion(completion))
case Right(CompletionStreamElement.CheckpointElement(checkpoint)) =>
pull(commandResultIn)
if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
checkpoint.offset.foreach(emit(offsetOut, _))
}

View File

@ -154,6 +154,5 @@ conformance_test(
"--crt $(rlocation $TEST_WORKSPACE/$(rootpath //ledger/test-common/test-certificates:client.crt))",
"--cacrt $(rlocation $TEST_WORKSPACE/$(rootpath //ledger/test-common/test-certificates:ca.crt))",
"--pem $(rlocation $TEST_WORKSPACE/$(rootpath //ledger/test-common/test-certificates:client.pem))",
"--exclude=CommandDeduplicationIT",
],
)

View File

@ -115,7 +115,6 @@ conformance_test(
"--verbose",
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=CommandDeduplicationIT",
],
)
@ -135,7 +134,6 @@ conformance_test(
"--verbose",
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=CommandDeduplicationIT",
],
)

View File

@ -40,10 +40,11 @@ final class InMemoryLedgerReaderWriter(
private val committer = new ValidatingCommitter(
() => timeProvider.getCurrentTime,
SubmissionValidator
.create(
.createForTimeMode(
new InMemoryLedgerStateAccess(state),
allocateNextLogEntryId = () => sequentialLogEntryId.next(),
metricRegistry = metricRegistry,
inStaticTimeMode = timeProvider != TimeProvider.UTC
),
dispatcher.signalNewHead,
)

View File

@ -234,7 +234,6 @@ da_scala_test_suite(
"--verbose",
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=CommandDeduplicationIT",
],
),
conformance_test(

View File

@ -56,10 +56,11 @@ final class SqlLedgerReaderWriter(
private val committer = new ValidatingCommitter[Index](
() => timeProvider.getCurrentTime,
SubmissionValidator
.create(
.createForTimeMode(
SqlLedgerStateAccess,
allocateNextLogEntryId = () => allocateSeededLogEntryId(),
metricRegistry = metricRegistry,
inStaticTimeMode = timeProvider != TimeProvider.UTC
),
latestSequenceNo => dispatcher.signalNewHead(latestSequenceNo),
)

View File

@ -157,7 +157,6 @@ client_server_build(
testonly = True, # only test targets can depend on this.
client = "//ledger/ledger-api-test-tool",
client_args = [
"--exclude=LotsOfPartiesIT,TransactionScaleIT,CommandDeduplicationIT",
"--timeout-scale-factor=20",
"localhost:6865",
],

View File

@ -348,13 +348,16 @@ message DamlStateValue {
message DamlCommandDedupKey {
string submitter = 1;
string application_id = 2;
reserved 2; // was application_id
string command_id = 3;
}
message DamlCommandDedupValue {
// record time will be used in the future for pruning
google.protobuf.Timestamp record_time = 1;
// the time until when future commands with the same
// deduplication key will be rejected due to a duplicate submission
google.protobuf.Timestamp deduplicated_until = 2;
}
message DamlSubmissionDedupKey {

View File

@ -82,7 +82,6 @@ private[state] object Conversions {
DamlStateKey.newBuilder
.setCommandDedup(
DamlCommandDedupKey.newBuilder
.setApplicationId(subInfo.getApplicationId)
.setCommandId(subInfo.getCommandId)
.setSubmitter(subInfo.getSubmitter)
.build
@ -131,6 +130,8 @@ private[state] object Conversions {
.setSubmitter(subInfo.submitter)
.setApplicationId(subInfo.applicationId)
.setCommandId(subInfo.commandId)
.setDeduplicateUntil(
buildTimestamp(Time.Timestamp.assertFromInstant(subInfo.deduplicateUntil)))
.build
def parseSubmitterInfo(subInfo: DamlSubmitterInfo): SubmitterInfo =

View File

@ -26,9 +26,17 @@ import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
class KeyValueCommitting(metricRegistry: MetricRegistry) {
// Added inStaticTimeMode to indicate whether the ledger uses static time mode or not.
// This has an impact on command deduplication and needs to be threaded through ProcessTransactionSubmission.
// See that class for more comments.
//
// The primary constructor is private to the daml package, because we don't expect any ledger other
// than sandbox to actually support static time.
class KeyValueCommitting private[daml] (metricRegistry: MetricRegistry, inStaticTimeMode: Boolean) {
private val logger = LoggerFactory.getLogger(this.getClass)
def this(metricRegistry: MetricRegistry) = this(metricRegistry, false)
def packDamlStateKey(key: DamlStateKey): ByteString = key.toByteString
def unpackDamlStateKey(bytes: ByteString): DamlStateKey =
@ -169,13 +177,14 @@ class KeyValueCommitting(metricRegistry: MetricRegistry) {
)
case DamlSubmission.PayloadCase.TRANSACTION_ENTRY =>
new ProcessTransactionSubmission(defaultConfig, engine, metricRegistry).run(
entryId,
recordTime,
participantId,
submission.getTransactionEntry,
inputState,
)
new ProcessTransactionSubmission(defaultConfig, engine, metricRegistry, inStaticTimeMode)
.run(
entryId,
recordTime,
participantId,
submission.getTransactionEntry,
inputState,
)
case DamlSubmission.PayloadCase.PAYLOAD_NOT_SET =>
throw Err.InvalidSubmission("DamlSubmission payload not set")

View File

@ -10,6 +10,10 @@ package com.daml.ledger.participant.state.kvutils
* [after 100.13.55]: *BACKWARDS INCOMPATIBLE*
* - Remove use of relative contract ids. Introduces kvutils version 2.
* - Introduce DamlSubmissionBatch.
* - Respect the deduplication time provided by submissions.
* - Remove DamlCommandDedupKey#application_id.
* - Add DamlCommanDedupValue#deduplicatedUntil.
* - Introduces kvutils version 3.
*
* [after 100.13.52]: *BACKWARDS INCOMPATIBLE*
* - Use hash for serializing contract keys instead of serializing the value, as
@ -74,6 +78,8 @@ object Version {
* * Add submissionTime in DamlTransactionEntry and use it instead of ledgerTime to derive
* contract ids.
* * Add DamlSubmissionBatch message.
* 4: * Remove application_id from DamlCommandDedupKey. Only submitter and commandId are used for deduplication.
* * Add deduplicatedUntil field to DamlCommandDedupValue to restrict the deduplication window.
*/
val version: Long = 3
val version: Long = 4
}

View File

@ -3,6 +3,8 @@
package com.daml.ledger.participant.state.kvutils.committing
import java.time.Instant
import com.codahale.metrics.{Counter, MetricRegistry, Timer}
import com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.Conversions._
@ -26,10 +28,22 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
// The parameter inStaticTimeMode indicates that the ledger is running in static time mode.
//
// Command deduplication is always based on wall clock time and not ledger time. In static time mode,
// record time cannot be used for command deduplication. This flag indicates that the system clock should
// be used as submission time for commands instead of record time.
//
// Other possible solutions that we discarded:
// * Pass in an additional time provider, but this hides the intent
// * Adding and additional submission field commandDedupSubmissionTime field. While having participants
// provide this field *could* lead to possible exploits, they are not exploits that could do any harm.
// The bigger concern is adding a public API for the specific use case of Sandbox with static time.
private[kvutils] class ProcessTransactionSubmission(
defaultConfig: Configuration,
engine: Engine,
metricRegistry: MetricRegistry,
inStaticTimeMode: Boolean
) {
private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
@ -87,13 +101,15 @@ private[kvutils] class ProcessTransactionSubmission(
): Commit[Unit] = {
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
get(dedupKey).flatMap { dedupEntry =>
if (dedupEntry.isEmpty) {
val submissionTime = if (inStaticTimeMode) Instant.now() else recordTime.toInstant
if (dedupEntry.forall(isAfterDeduplicationTime(submissionTime, _))) {
Commit.set(
dedupKey ->
DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder
.setRecordTime(buildTimestamp(recordTime))
.setDeduplicatedUntil(transactionEntry.submitterInfo.getDeduplicateUntil)
.build)
.build)
} else {
@ -109,6 +125,20 @@ private[kvutils] class ProcessTransactionSubmission(
}
}
// Checks that the submission time of the command is after the
// deduplicationTime represented by stateValue
private def isAfterDeduplicationTime(
submissionTime: Instant,
stateValue: DamlStateValue): Boolean = {
val cmdDedup = stateValue.getCommandDedup
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
dedupTime.isBefore(submissionTime)
} else {
false
}
}
/** Authorize the submission by looking up the party allocation and verifying
* that the submitting party is indeed hosted by the submitting participant.
*

View File

@ -308,10 +308,27 @@ object SubmissionValidator {
allocateNextLogEntryId: () => DamlLogEntryId = () => allocateRandomLogEntryId(),
checkForMissingInputs: Boolean = false,
metricRegistry: MetricRegistry,
)(implicit executionContext: ExecutionContext): SubmissionValidator[LogResult] = {
createForTimeMode(
ledgerStateAccess,
allocateNextLogEntryId,
checkForMissingInputs,
metricRegistry,
inStaticTimeMode = false,
)
}
// Internal method to enable proper command dedup in sandbox with static time mode
private[daml] def createForTimeMode[LogResult](
ledgerStateAccess: LedgerStateAccess[LogResult],
allocateNextLogEntryId: () => DamlLogEntryId = () => allocateRandomLogEntryId(),
checkForMissingInputs: Boolean = false,
metricRegistry: MetricRegistry,
inStaticTimeMode: Boolean,
)(implicit executionContext: ExecutionContext): SubmissionValidator[LogResult] = {
new SubmissionValidator(
ledgerStateAccess,
processSubmission(new KeyValueCommitting(metricRegistry)),
processSubmission(new KeyValueCommitting(metricRegistry, inStaticTimeMode)),
allocateNextLogEntryId,
checkForMissingInputs,
metricRegistry,

View File

@ -432,7 +432,6 @@ server_conformance_test(
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=ClosedWorldIT",
"--exclude=CommandDeduplicationIT",
],
)
@ -466,7 +465,6 @@ server_conformance_test(
"--open-world",
"--all-tests",
"--exclude=ClosedWorldIT",
"--exclude=CommandDeduplicationIT",
"--exclude=ConfigManagementServiceIT",
],
)
@ -482,7 +480,6 @@ server_conformance_test(
"--open-world",
"--all-tests",
"--exclude=ClosedWorldIT",
"--exclude=CommandDeduplicationIT",
"--exclude=ConfigManagementServiceIT",
],
)