mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Add support for fingerprints in CommitContext
(#6567)
This commit is contained in:
parent
873e0163cd
commit
43f99f45fc
@ -95,7 +95,7 @@ class KeyValueCommitting private[daml] (
|
||||
defaultConfig: Configuration,
|
||||
submission: DamlSubmission,
|
||||
participantId: ParticipantId,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
inputState: DamlStateMap,
|
||||
): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) = {
|
||||
metrics.daml.kvutils.committer.processing.inc()
|
||||
metrics.daml.kvutils.committer.last.lastRecordTimeGauge.updateValue(recordTime.toString)
|
||||
@ -136,7 +136,7 @@ class KeyValueCommitting private[daml] (
|
||||
defaultConfig: Configuration,
|
||||
submission: DamlSubmission,
|
||||
participantId: ParticipantId,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
inputState: DamlStateMap,
|
||||
): (DamlLogEntry, Map[DamlStateKey, DamlStateValue]) =
|
||||
submission.getPayloadCase match {
|
||||
case DamlSubmission.PayloadCase.PACKAGE_UPLOAD_ENTRY =>
|
||||
|
@ -8,7 +8,12 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlStateKey,
|
||||
DamlStateValue
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlStateMap, Err}
|
||||
import com.daml.ledger.participant.state.kvutils.{
|
||||
DamlStateMap,
|
||||
DamlStateMapWithFingerprints,
|
||||
Err,
|
||||
Fingerprint
|
||||
}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -21,15 +26,19 @@ import scala.collection.mutable
|
||||
private[kvutils] trait CommitContext {
|
||||
private[this] val logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def inputs: DamlStateMap
|
||||
def inputsWithFingerprints: DamlStateMapWithFingerprints
|
||||
final def inputs: DamlStateMap = inputsWithFingerprints.map {
|
||||
case (key, (value, _)) => (key, value)
|
||||
}
|
||||
|
||||
// NOTE(JM): The outputs must be iterable in deterministic order, hence we
|
||||
// keep track of insertion order.
|
||||
private val outputOrder: mutable.ArrayBuffer[DamlStateKey] =
|
||||
mutable.ArrayBuffer()
|
||||
private val outputs: mutable.Map[DamlStateKey, DamlStateValue] =
|
||||
mutable.HashMap.empty[DamlStateKey, DamlStateValue]
|
||||
private val accessedInputKeys: mutable.Set[DamlStateKey] =
|
||||
mutable.Set.empty[DamlStateKey]
|
||||
private val accessedInputKeysAndFingerprints: mutable.Set[(DamlStateKey, Fingerprint)] =
|
||||
mutable.Set.empty[(DamlStateKey, Fingerprint)]
|
||||
|
||||
def getEntryId: DamlLogEntryId
|
||||
def getRecordTime: Option[Timestamp]
|
||||
@ -38,9 +47,9 @@ private[kvutils] trait CommitContext {
|
||||
/** Retrieve value from output state, or if not found, from input state. */
|
||||
def get(key: DamlStateKey): Option[DamlStateValue] =
|
||||
outputs.get(key).orElse {
|
||||
val value = inputs.getOrElse(key, throw Err.MissingInputState(key))
|
||||
accessedInputKeys += key
|
||||
value
|
||||
val value = inputsWithFingerprints.getOrElse(key, throw Err.MissingInputState(key))
|
||||
accessedInputKeysAndFingerprints += key -> value._2
|
||||
value._1
|
||||
}
|
||||
|
||||
/** Set a value in the output state. */
|
||||
@ -69,8 +78,8 @@ private[kvutils] trait CommitContext {
|
||||
}
|
||||
|
||||
/** Get the accessed input key set. */
|
||||
def getAccessedInputKeys: collection.Set[DamlStateKey] =
|
||||
accessedInputKeys
|
||||
def getAccessedInputKeysWithFingerprints: collection.Set[(DamlStateKey, Fingerprint)] =
|
||||
accessedInputKeysAndFingerprints
|
||||
|
||||
private def inputAlreadyContains(key: DamlStateKey, value: DamlStateValue): Boolean =
|
||||
inputs.get(key).exists(_.contains(value))
|
||||
|
@ -12,7 +12,13 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlStateKey,
|
||||
DamlStateValue
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, DamlStateMap, Err}
|
||||
import com.daml.ledger.participant.state.kvutils.{
|
||||
Conversions,
|
||||
DamlStateMap,
|
||||
DamlStateMapWithFingerprints,
|
||||
FingerprintPlaceholder,
|
||||
Err
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.committer.Committer._
|
||||
import com.daml.ledger.participant.state.v1.{Configuration, ParticipantId}
|
||||
import com.daml.lf.data.Time
|
||||
@ -76,7 +82,10 @@ private[committer] trait Committer[Submission, PartialResult] {
|
||||
override def getEntryId: DamlLogEntryId = entryId
|
||||
override def getRecordTime: Option[Time.Timestamp] = recordTime
|
||||
override def getParticipantId: ParticipantId = participantId
|
||||
override def inputs: DamlStateMap = inputState
|
||||
override def inputsWithFingerprints: DamlStateMapWithFingerprints =
|
||||
inputState.map {
|
||||
case (key, value) => (key, (value, FingerprintPlaceholder))
|
||||
}
|
||||
}
|
||||
var cstate = init(ctx, submission)
|
||||
for ((info, step) <- steps) {
|
||||
@ -97,7 +106,7 @@ object Committer {
|
||||
|
||||
def getCurrentConfiguration(
|
||||
defaultConfig: Configuration,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
inputState: DamlStateMap,
|
||||
logger: Logger): (Option[DamlConfigurationEntry], Configuration) =
|
||||
inputState
|
||||
.getOrElse(
|
||||
|
@ -30,9 +30,12 @@ import com.google.protobuf.ByteString
|
||||
package object kvutils {
|
||||
|
||||
type Bytes = ByteString
|
||||
|
||||
type DamlStateMap = Map[DamlStateKey, Option[DamlStateValue]]
|
||||
|
||||
type Fingerprint = Bytes
|
||||
type DamlStateMapWithFingerprints = Map[DamlStateKey, (Option[DamlStateValue], Fingerprint)]
|
||||
val FingerprintPlaceholder: Fingerprint = ByteString.EMPTY
|
||||
|
||||
val MetricPrefix: MetricName = MetricName.DAML :+ "kvutils"
|
||||
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import com.codahale.metrics.Timer
|
||||
import com.daml.caching.Cache
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, DamlStateMap, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.SubmissionValidator._
|
||||
import com.daml.ledger.validator.ValidationFailed.{MissingInputState, ValidationError}
|
||||
@ -45,7 +45,7 @@ class SubmissionValidator[LogResult] private[validator] (
|
||||
Timestamp,
|
||||
DamlSubmission,
|
||||
ParticipantId,
|
||||
Map[DamlStateKey, Option[DamlStateValue]],
|
||||
DamlStateMap,
|
||||
) => LogEntryAndState,
|
||||
allocateLogEntryId: () => DamlLogEntryId,
|
||||
checkForMissingInputs: Boolean,
|
||||
@ -235,7 +235,7 @@ class SubmissionValidator[LogResult] private[validator] (
|
||||
|
||||
private def verifyAllInputsArePresent[T](
|
||||
declaredInputs: Seq[DamlStateKey],
|
||||
readInputs: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
readInputs: DamlStateMap,
|
||||
): Future[Unit] = {
|
||||
if (checkForMissingInputs) {
|
||||
val missingInputs = declaredInputs.toSet -- readInputs.filter(_._2.isDefined).keySet
|
||||
@ -250,7 +250,7 @@ class SubmissionValidator[LogResult] private[validator] (
|
||||
}
|
||||
|
||||
private def flattenInputStates(
|
||||
inputs: Map[DamlStateKey, Option[DamlStateValue]]
|
||||
inputs: DamlStateMap
|
||||
): Map[DamlStateKey, DamlStateValue] =
|
||||
inputs.collect {
|
||||
case (key, Some(value)) => key -> value
|
||||
@ -347,7 +347,7 @@ object SubmissionValidator {
|
||||
recordTime: Timestamp,
|
||||
damlSubmission: DamlSubmission,
|
||||
participantId: ParticipantId,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]],
|
||||
inputState: DamlStateMap,
|
||||
): LogEntryAndState =
|
||||
keyValueCommitting.processSubmission(
|
||||
damlLogEntryId,
|
||||
|
@ -10,7 +10,11 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
|
||||
DamlStateValue
|
||||
}
|
||||
import com.daml.ledger.participant.state.kvutils.Err.MissingInputState
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils, DamlStateMap, TestHelpers}
|
||||
import com.daml.ledger.participant.state.kvutils.{
|
||||
DamlKvutils,
|
||||
DamlStateMapWithFingerprints,
|
||||
TestHelpers
|
||||
}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.lf.data.Time
|
||||
import org.scalatest.{Matchers, WordSpec}
|
||||
@ -18,29 +22,32 @@ import org.scalatest.{Matchers, WordSpec}
|
||||
class CommitContextSpec extends WordSpec with Matchers {
|
||||
"get" should {
|
||||
"check output first" in {
|
||||
val context = newInstance(Map(aKey -> Some(anotherValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> anotherValue))
|
||||
context.set(aKey, aValue)
|
||||
context.get(aKey) shouldBe Some(aValue)
|
||||
}
|
||||
|
||||
"return input if key has not been output" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> aValue))
|
||||
context.get(aKey) shouldBe Some(aValue)
|
||||
}
|
||||
|
||||
"record all accessed input keys" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue), anotherKey -> Some(anotherValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> aValue, anotherKey -> anotherValue))
|
||||
context.get(aKey)
|
||||
context.get(anotherKey)
|
||||
|
||||
context.getAccessedInputKeys shouldBe Set(aKey, anotherKey)
|
||||
context.getAccessedInputKeysWithFingerprints shouldBe Set(
|
||||
aKey -> aValue.toByteString,
|
||||
anotherKey -> anotherValue.toByteString)
|
||||
}
|
||||
|
||||
"not record input keys that are not accessed" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue), anotherKey -> Some(anotherValue)))
|
||||
val context =
|
||||
newInstance(newDamlStateMap(aKey -> aValue, anotherKey -> anotherValue))
|
||||
context.get(aKey)
|
||||
|
||||
context.getAccessedInputKeys shouldBe Set(aKey)
|
||||
context.getAccessedInputKeysWithFingerprints shouldBe Set(aKey -> aValue.toByteString)
|
||||
}
|
||||
|
||||
"throw in case key cannot be found" in {
|
||||
@ -70,19 +77,19 @@ class CommitContextSpec extends WordSpec with Matchers {
|
||||
}
|
||||
|
||||
"not output a key whose value is identical to its input value" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> aValue))
|
||||
context.set(aKey, aValue)
|
||||
context.getOutputs should have size 0
|
||||
}
|
||||
|
||||
"output a key whose value has changed from its input value" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> aValue))
|
||||
context.set(aKey, anotherValue)
|
||||
context.getOutputs.toSeq shouldBe Seq((aKey, anotherValue))
|
||||
}
|
||||
|
||||
"output last set value for a key that was also input" in {
|
||||
val context = newInstance(Map(aKey -> Some(aValue)))
|
||||
val context = newInstance(newDamlStateMap(aKey -> aValue))
|
||||
|
||||
context.set(aKey, anotherValue)
|
||||
context.set(aKey, aValue)
|
||||
@ -101,7 +108,8 @@ class CommitContextSpec extends WordSpec with Matchers {
|
||||
.setParty(DamlPartyAllocation.newBuilder.setDisplayName("another party name"))
|
||||
.build
|
||||
|
||||
private class TestCommitContext(override val inputs: DamlStateMap) extends CommitContext {
|
||||
private class TestCommitContext(override val inputsWithFingerprints: DamlStateMapWithFingerprints)
|
||||
extends CommitContext {
|
||||
override def getEntryId: DamlKvutils.DamlLogEntryId = DamlLogEntryId.getDefaultInstance
|
||||
|
||||
override def getRecordTime: Option[Time.Timestamp] = Some(Time.Timestamp.now())
|
||||
@ -109,5 +117,11 @@ class CommitContextSpec extends WordSpec with Matchers {
|
||||
override def getParticipantId: ParticipantId = TestHelpers.mkParticipantId(1)
|
||||
}
|
||||
|
||||
private def newInstance(inputs: DamlStateMap = Map.empty) = new TestCommitContext(inputs)
|
||||
private def newInstance(inputsWithFingerprints: DamlStateMapWithFingerprints = Map.empty) =
|
||||
new TestCommitContext(inputsWithFingerprints)
|
||||
|
||||
private def newDamlStateMap(
|
||||
keyAndValues: (DamlStateKey, DamlStateValue)*): DamlStateMapWithFingerprints =
|
||||
(for ((key, value) <- keyAndValues)
|
||||
yield (key, (Some(value), value.toByteString))).toMap
|
||||
}
|
||||
|
@ -4,13 +4,13 @@
|
||||
package com.daml.ledger.participant.state.kvutils.committer
|
||||
|
||||
import com.daml.ledger.participant.state.kvutils.TestHelpers.{mkEntryId, mkParticipantId}
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils, DamlStateMap}
|
||||
import com.daml.ledger.participant.state.kvutils.{DamlKvutils, DamlStateMapWithFingerprints}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
|
||||
class FakeCommitContext(
|
||||
recordTime: Option[Timestamp],
|
||||
override val inputs: DamlStateMap = Map.empty,
|
||||
override val inputsWithFingerprints: DamlStateMapWithFingerprints = Map.empty,
|
||||
participantId: Int = 0,
|
||||
entryId: Int = 0)
|
||||
extends CommitContext {
|
||||
|
@ -28,6 +28,8 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
.build
|
||||
private val aTransactionEntrySummary = DamlTransactionEntrySummary(aDamlTransactionEntry)
|
||||
private val aRecordTime = Timestamp(100)
|
||||
private val dedupKey = Conversions
|
||||
.commandDedupKey(aTransactionEntrySummary.submitterInfo)
|
||||
|
||||
"deduplicateCommand" should {
|
||||
"continue if record time is not available" in {
|
||||
@ -44,8 +46,9 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
|
||||
"continue if record time is available but no deduplication entry could be found" in {
|
||||
val instance = createTransactionCommitter()
|
||||
val inputs = Map(Conversions.commandDedupKey(aTransactionEntrySummary.submitterInfo) -> None)
|
||||
val context = new FakeCommitContext(recordTime = Some(aRecordTime), inputs = inputs)
|
||||
val inputs = Map(dedupKey -> (None -> ByteString.EMPTY))
|
||||
val context =
|
||||
new FakeCommitContext(recordTime = Some(aRecordTime), inputsWithFingerprints = inputs)
|
||||
|
||||
val actual = instance.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
@ -57,14 +60,13 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
|
||||
"continue if record time is after deduplication time in case a deduplication entry is found" in {
|
||||
val instance = createTransactionCommitter()
|
||||
val dedupValue = DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(aRecordTime)))
|
||||
.build
|
||||
val dedupValue = newDedupValue(aRecordTime)
|
||||
val inputs =
|
||||
Map(Conversions.commandDedupKey(aTransactionEntrySummary.submitterInfo) -> Some(dedupValue))
|
||||
Map(dedupKey -> (Some(dedupValue) -> dedupValue.toByteString))
|
||||
val context =
|
||||
new FakeCommitContext(recordTime = Some(aRecordTime.addMicros(1)), inputs = inputs)
|
||||
new FakeCommitContext(
|
||||
recordTime = Some(aRecordTime.addMicros(1)),
|
||||
inputsWithFingerprints = inputs)
|
||||
|
||||
val actual = instance.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
@ -79,15 +81,11 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
for ((recordTime, deduplicationTime) <- Iterable(
|
||||
(aRecordTime, aRecordTime),
|
||||
(aRecordTime, aRecordTime.addMicros(1)))) {
|
||||
val dedupValue = DamlStateValue.newBuilder
|
||||
.setCommandDedup(DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(
|
||||
buildTimestamp(deduplicationTime)))
|
||||
.build
|
||||
val dedupValue = newDedupValue(deduplicationTime)
|
||||
val inputs =
|
||||
Map(
|
||||
Conversions.commandDedupKey(aTransactionEntrySummary.submitterInfo) -> Some(dedupValue))
|
||||
Map(dedupKey -> (Some(dedupValue) -> dedupValue.toByteString))
|
||||
val context =
|
||||
new FakeCommitContext(recordTime = Some(recordTime), inputs = inputs)
|
||||
new FakeCommitContext(recordTime = Some(recordTime), inputsWithFingerprints = inputs)
|
||||
|
||||
val actual = instance.deduplicateCommand(context, aTransactionEntrySummary)
|
||||
|
||||
@ -127,11 +125,14 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
)
|
||||
.build
|
||||
val inputWithDeclaredConfig =
|
||||
Map(Conversions.configurationStateKey -> Some(configurationStateValue))
|
||||
Map(
|
||||
Conversions.configurationStateKey -> (Some(configurationStateValue) -> configurationStateValue.toByteString))
|
||||
|
||||
for (ledgerEffectiveTime <- Iterable(lowerBound, upperBound)) {
|
||||
val context =
|
||||
new FakeCommitContext(recordTime = Some(recordTime), inputs = inputWithDeclaredConfig)
|
||||
new FakeCommitContext(
|
||||
recordTime = Some(recordTime),
|
||||
inputsWithFingerprints = inputWithDeclaredConfig)
|
||||
val transactionEntrySummary = DamlTransactionEntrySummary(
|
||||
aDamlTransactionEntry.toBuilder
|
||||
.setLedgerEffectiveTime(
|
||||
@ -152,4 +153,10 @@ class TransactionCommitterSpec extends WordSpec with Matchers with MockitoSugar
|
||||
|
||||
private def createTransactionCommitter(): TransactionCommitter =
|
||||
new TransactionCommitter(theDefaultConfig, mock[Engine], metrics, inStaticTimeMode = false)
|
||||
|
||||
private def newDedupValue(deduplicationTime: Timestamp): DamlStateValue =
|
||||
DamlStateValue.newBuilder
|
||||
.setCommandDedup(
|
||||
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(buildTimestamp(deduplicationTime)))
|
||||
.build
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.caching.Cache
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.MockitoHelpers.captor
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.kvutils.{Bytes, DamlStateMap, Envelope, KeyValueCommitting}
|
||||
import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.ledger.validator.SubmissionValidator.{LogEntryAndState, RawKeyValuePairs}
|
||||
import com.daml.ledger.validator.SubmissionValidatorSpec._
|
||||
@ -92,7 +92,7 @@ class SubmissionValidatorSpec extends AsyncWordSpec with Matchers with Inside {
|
||||
recordTime: Timestamp,
|
||||
damlSubmission: DamlSubmission,
|
||||
participantId: ParticipantId,
|
||||
inputState: Map[DamlStateKey, Option[DamlStateValue]]
|
||||
inputState: DamlStateMap
|
||||
): LogEntryAndState =
|
||||
throw new IllegalArgumentException("Validation failed")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user