mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 16:57:40 +03:00
[kvutils] Record all key lookups by transaction committer [KVL-750] (#8082)
This commit is contained in:
parent
8bceeb13de
commit
0de8f1b1b7
@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v1.ParticipantId
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
import scala.collection.mutable
|
||||
|
||||
/** Commit context provides access to state inputs, commit parameters (e.g. record time) and
|
||||
@ -23,7 +24,7 @@ import scala.collection.mutable
|
||||
private[kvutils] trait CommitContext {
|
||||
private[this] val logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def inputs: DamlStateMap
|
||||
protected def inputs: DamlStateMap
|
||||
|
||||
// NOTE(JM): The outputs must be iterable in deterministic order, hence we
|
||||
// keep track of insertion order.
|
||||
@ -42,6 +43,7 @@ private[kvutils] trait CommitContext {
|
||||
var outOfTimeBoundsLogEntry: Option[DamlLogEntry] = None
|
||||
|
||||
def getRecordTime: Option[Timestamp]
|
||||
|
||||
def getParticipantId: ParticipantId
|
||||
|
||||
def preExecute: Boolean = getRecordTime.isEmpty
|
||||
@ -54,6 +56,22 @@ private[kvutils] trait CommitContext {
|
||||
value
|
||||
}
|
||||
|
||||
/** Reads key from input state. Records the key as being accessed even if it's not available. */
|
||||
def read(key: DamlStateKey): Option[DamlStateValue] = {
|
||||
accessedInputKeys += key
|
||||
inputs.get(key).flatten
|
||||
}
|
||||
|
||||
/** Generates a collection from the inputs as determined by a partial function.
|
||||
* Records all keys in the input as being accessed. */
|
||||
def collectInputs[B, That](
|
||||
partialFunction: PartialFunction[(DamlStateKey, Option[DamlStateValue]), B])(
|
||||
implicit bf: CanBuildFrom[Map[DamlStateKey, Option[DamlStateValue]], B, That]): That = {
|
||||
val result = inputs.collect(partialFunction)
|
||||
inputs.keys.foreach(accessedInputKeys.add)
|
||||
result
|
||||
}
|
||||
|
||||
/** Set a value in the output state. */
|
||||
def set(key: DamlStateKey, value: DamlStateValue): Unit = {
|
||||
if (!outputs.contains(key)) {
|
||||
|
@ -10,7 +10,7 @@ import com.daml.ledger.participant.state.kvutils.Conversions._
|
||||
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.Committer._
|
||||
import com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter._
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, DamlStateMap, Err, InputsAndEffects}
|
||||
import com.daml.ledger.participant.state.kvutils.{Conversions, Err, InputsAndEffects}
|
||||
import com.daml.ledger.participant.state.v1.{Configuration, RejectionReason, TimeModel}
|
||||
import com.daml.lf.archive.Decode
|
||||
import com.daml.lf.archive.Reader.ParseError
|
||||
@ -215,10 +215,9 @@ private[kvutils] class TransactionCommitter(
|
||||
// which are not evidenced in the transaction itself and hence the contract key state is
|
||||
// not included in the inputs.
|
||||
lazy val knownKeys: Map[DamlContractKey, Value.ContractId] =
|
||||
commitContext.inputs.collect {
|
||||
commitContext.collectInputs {
|
||||
case (key, Some(value))
|
||||
if value.hasContractState
|
||||
&& value.getContractState.hasContractKey
|
||||
if value.getContractState.hasContractKey
|
||||
&& contractIsActiveAndVisibleToSubmitter(
|
||||
transactionEntry,
|
||||
value.getContractState) =>
|
||||
@ -235,9 +234,9 @@ private[kvutils] class TransactionCommitter(
|
||||
transactionEntry.submissionSeed,
|
||||
)
|
||||
.consume(
|
||||
lookupContract(transactionEntry, commitContext.inputs),
|
||||
lookupPackage(transactionEntry, commitContext.inputs),
|
||||
lookupKey(commitContext.inputs, knownKeys),
|
||||
lookupContract(transactionEntry, commitContext),
|
||||
lookupPackage(transactionEntry, commitContext),
|
||||
lookupKey(commitContext, knownKeys),
|
||||
)
|
||||
.fold(
|
||||
err =>
|
||||
@ -358,8 +357,9 @@ private[kvutils] class TransactionCommitter(
|
||||
(commitContext, transactionEntry) => StepStop(buildLogEntry(transactionEntry, commitContext))
|
||||
|
||||
private def validateContractKeys: Step = (commitContext, transactionEntry) => {
|
||||
val damlState = commitContext.inputs
|
||||
.collect { case (k, Some(v)) => k -> v } ++ commitContext.getOutputs
|
||||
val damlState = commitContext.collectInputs {
|
||||
case (key, Some(value)) if key.hasContractKey => key -> value
|
||||
} ++ commitContext.getOutputs
|
||||
val startingKeys = damlState.collect {
|
||||
case (k, v) if k.hasContractKey && v.getContractKeyState.getContractId.nonEmpty => k
|
||||
}.toSet
|
||||
@ -595,7 +595,7 @@ private[kvutils] class TransactionCommitter(
|
||||
// an input to a transaction, we do not need to verify the inputs separately.
|
||||
private def lookupContract(
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
inputState: DamlStateMap)(
|
||||
commitContext: CommitContext)(
|
||||
coid: Value.ContractId,
|
||||
): Option[Value.ContractInst[Value.VersionedValue[Value.ContractId]]] = {
|
||||
val stateKey = contractIdToStateKey(coid)
|
||||
@ -607,7 +607,7 @@ private[kvutils] class TransactionCommitter(
|
||||
// This is not a problem because after the transaction reinterpretation, we compare the original
|
||||
// transaction with the reinterpreted one, and the LookupByKey node will not match.
|
||||
// Additionally, all contract keys are checked to uphold causal monotonicity.
|
||||
contractState <- inputState.get(stateKey).flatMap(_.map(_.getContractState))
|
||||
contractState <- commitContext.read(stateKey).map(_.getContractState)
|
||||
if contractIsActiveAndVisibleToSubmitter(transactionEntry, contractState)
|
||||
contract = Conversions.decodeContractInstance(contractState.getContractInstance)
|
||||
} yield contract
|
||||
@ -618,13 +618,12 @@ private[kvutils] class TransactionCommitter(
|
||||
// the DAML state entry at `DamlStateKey(packageId = pkgId)`.
|
||||
private def lookupPackage(
|
||||
transactionEntry: DamlTransactionEntrySummary,
|
||||
inputState: DamlStateMap,
|
||||
commitContext: CommitContext,
|
||||
)(pkgId: PackageId): Option[Ast.Package] = {
|
||||
val stateKey = packageStateKey(pkgId)
|
||||
for {
|
||||
value <- inputState
|
||||
.get(stateKey)
|
||||
.flatten
|
||||
value <- commitContext
|
||||
.read(stateKey)
|
||||
.orElse {
|
||||
logger.warn(
|
||||
s"Lookup package failed, package not found, packageId=$pkgId correlationId=${transactionEntry.commandId}")
|
||||
@ -654,7 +653,7 @@ private[kvutils] class TransactionCommitter(
|
||||
}
|
||||
|
||||
private def lookupKey(
|
||||
inputState: DamlStateMap,
|
||||
commitContext: CommitContext,
|
||||
knownKeys: Map[DamlContractKey, Value.ContractId],
|
||||
)(key: GlobalKeyWithMaintainers): Option[Value.ContractId] = {
|
||||
// we don't check whether the contract is active or not, because in we might not have loaded it earlier.
|
||||
@ -670,7 +669,7 @@ private[kvutils] class TransactionCommitter(
|
||||
// contract keys respect causal monotonicity.
|
||||
val stateKey = Conversions.globalKeyToStateKey(key.globalKey)
|
||||
val contractId = for {
|
||||
stateValue <- inputState.get(stateKey).flatten
|
||||
stateValue <- commitContext.read(stateKey)
|
||||
if stateValue.getContractKeyState.getContractId.nonEmpty
|
||||
} yield decodeContractId(stateValue.getContractKeyState.getContractId)
|
||||
|
||||
|
@ -185,10 +185,16 @@ object KVTest {
|
||||
deduplicationTime: Duration = Duration.ofDays(1)): KVTest[(DamlLogEntryId, DamlLogEntry)] =
|
||||
for {
|
||||
testState <- get[KVTestState]
|
||||
submInfo = createSubmitterInfo(submitter, commandId, deduplicationTime, testState)
|
||||
submissionInfo = createSubmitterInfo(submitter, commandId, deduplicationTime, testState)
|
||||
(tx, txMetaData) = transaction
|
||||
subm = transactionToSubmission(submissionSeed, letDelta, testState, submInfo, tx, txMetaData)
|
||||
result <- submit(subm)
|
||||
submission = transactionToSubmission(
|
||||
submissionSeed,
|
||||
letDelta,
|
||||
testState,
|
||||
submissionInfo,
|
||||
tx,
|
||||
txMetaData)
|
||||
result <- submit(submission)
|
||||
} yield result
|
||||
|
||||
def preExecuteTransaction(
|
||||
@ -201,10 +207,16 @@ object KVTest {
|
||||
: KVTest[(DamlLogEntryId, PreExecutionResult)] =
|
||||
for {
|
||||
testState <- get[KVTestState]
|
||||
submInfo = createSubmitterInfo(submitter, commandId, deduplicationTime, testState)
|
||||
submitterInfo = createSubmitterInfo(submitter, commandId, deduplicationTime, testState)
|
||||
(tx, txMetaData) = transaction
|
||||
subm = transactionToSubmission(submissionSeed, letDelta, testState, submInfo, tx, txMetaData)
|
||||
result <- preExecute(subm)
|
||||
submission = transactionToSubmission(
|
||||
submissionSeed,
|
||||
letDelta,
|
||||
testState,
|
||||
submitterInfo,
|
||||
tx,
|
||||
txMetaData)
|
||||
result <- preExecute(submission)
|
||||
} yield result
|
||||
|
||||
def submitConfig(
|
||||
|
@ -50,6 +50,49 @@ class CommitContextSpec extends WordSpec with Matchers {
|
||||
}
|
||||
}
|
||||
|
||||
"read" should {
|
||||
"return input key and record its access" in {
|
||||
val context =
|
||||
newInstance(inputs = newDamlStateMap(aKey -> aValue, anotherKey -> anotherValue))
|
||||
|
||||
context.read(aKey) shouldBe Some(aValue)
|
||||
context.getAccessedInputKeys shouldBe Set(aKey)
|
||||
}
|
||||
|
||||
"record key as accessed even if it is not available in the input" in {
|
||||
val context = newInstance()
|
||||
|
||||
context.read(aKey) shouldBe None
|
||||
context.getAccessedInputKeys shouldBe Set(aKey)
|
||||
}
|
||||
}
|
||||
|
||||
"collectInputs" should {
|
||||
"return keys matching the predicate and mark all inputs as accessed" in {
|
||||
val expectedKey1 = aKeyWithContractId("a1")
|
||||
val expectedKey2 = aKeyWithContractId("a2")
|
||||
val expected = Map(
|
||||
expectedKey1 -> Some(aValue),
|
||||
expectedKey2 -> None
|
||||
)
|
||||
val inputs = expected ++ Map(aKeyWithContractId("b") -> Some(aValue))
|
||||
val context = newInstance(inputs = inputs)
|
||||
|
||||
context.collectInputs {
|
||||
case (key, _) if key.getContractId.startsWith("a") => key
|
||||
}.toSet shouldBe expected.keys
|
||||
context.getAccessedInputKeys shouldBe inputs.keys
|
||||
}
|
||||
|
||||
"return no keys and mark all inputs as accessed for a predicate producing empty output" in {
|
||||
val context =
|
||||
newInstance(inputs = newDamlStateMap(aKey -> aValue, anotherKey -> anotherValue))
|
||||
|
||||
context.collectInputs { case _ if false => () } shouldBe empty
|
||||
context.getAccessedInputKeys shouldBe Set(aKey, anotherKey)
|
||||
}
|
||||
}
|
||||
|
||||
"set" should {
|
||||
"maintain order of keys based on when they were seen first" in {
|
||||
val context = newInstance()
|
||||
@ -104,9 +147,11 @@ class CommitContextSpec extends WordSpec with Matchers {
|
||||
}
|
||||
}
|
||||
|
||||
private val aKey: DamlStateKey = DamlStateKey.newBuilder.setContractId("contract ID 1").build
|
||||
private val anotherKey: DamlStateKey =
|
||||
DamlStateKey.newBuilder.setContractId("contract ID 2").build
|
||||
private def aKeyWithContractId(id: String): DamlStateKey =
|
||||
DamlStateKey.newBuilder.setContractId(id).build
|
||||
|
||||
private val aKey: DamlStateKey = aKeyWithContractId("contract ID 1")
|
||||
private val anotherKey: DamlStateKey = aKeyWithContractId("contract ID 2")
|
||||
private val aValue: DamlStateValue = DamlStateValue.newBuilder
|
||||
.setParty(DamlPartyAllocation.newBuilder.setDisplayName("a party name"))
|
||||
.build
|
||||
|
Loading…
Reference in New Issue
Block a user