Handle out-of-time-bounds log entry (#6568)

This commit is contained in:
Miklos 2020-07-03 15:27:27 +02:00 committed by GitHub
parent 3e55fda66b
commit 52fc0635f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 445 additions and 6 deletions

View File

@ -113,6 +113,9 @@ message DamlLogEntry {
// A rejection of party allocation request.
DamlPartyAllocationRejectionEntry party_allocation_rejection_entry = 9;
// A rejection of a pre-executed submission because of out-of-time-bounds.
DamlOutOfTimeBoundsEntry out_of_time_bounds_entry = 10;
}
}
@ -316,6 +319,17 @@ message DamlPartyAllocationRejectionEntry {
}
}
// Indicates that a submission has been rejected after pre-execution.
// [[KeyValueConsumption.logEntryToUpdate]] will pick the right rejection reason based on current
// record time.
message DamlOutOfTimeBoundsEntry {
// We don't expect entry.recordTime to be present.
DamlLogEntry entry = 1;
google.protobuf.Timestamp duplicate_until = 2;
google.protobuf.Timestamp too_early_until = 3;
google.protobuf.Timestamp too_late_from = 4;
}
// DAML state key. [[KeyValueCommitting]] produces effects that are committed
// to the ledger from the `DamlSubmission`: a log entry to be created, and
// the set of DAML state updates.

View File

@ -11,6 +11,7 @@ import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.{Transaction => Tx}
import com.google.common.io.BaseEncoding
import com.google.protobuf.ByteString
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
@ -18,6 +19,8 @@ import scala.collection.JavaConverters._
* key-value based ledger.
*/
object KeyValueConsumption {
private val logger = LoggerFactory.getLogger(this.getClass)
def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString
def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes)
@ -32,8 +35,14 @@ object KeyValueConsumption {
*/
// TODO(BH): add participantId to ensure participant id matches in DamlLogEntry
@throws(classOf[Err])
def logEntryToUpdate(entryId: DamlLogEntryId, entry: DamlLogEntry): List[Update] = {
val recordTime = parseTimestamp(entry.getRecordTime)
def logEntryToUpdate(
entryId: DamlLogEntryId,
entry: DamlLogEntry,
recordTimeForUpdate: Option[Timestamp] = None): List[Update] = {
val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) {
case true => parseTimestamp(entry.getRecordTime)
}
val recordTime = resolveRecordTimeOrThrow(recordTimeForUpdate, recordTimeFromLogEntry)
entry.getPayloadCase match {
case DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY =>
@ -112,7 +121,7 @@ object KeyValueConsumption {
}
case DamlLogEntry.PayloadCase.TRANSACTION_ENTRY =>
List(txEntryToUpdate(entryId, entry.getTransactionEntry, recordTime))
List(transactionEntryToUpdate(entryId, entry.getTransactionEntry, recordTime))
case DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY =>
val configEntry = entry.getConfigurationEntry
@ -173,11 +182,24 @@ object KeyValueConsumption {
case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY =>
transactionRejectionEntryToUpdate(recordTime, entry.getTransactionRejectionEntry)
case DamlLogEntry.PayloadCase.OUT_OF_TIME_BOUNDS_ENTRY =>
outOfTimeBoundsEntryToUpdate(recordTime, entry.getOutOfTimeBoundsEntry).toList
case DamlLogEntry.PayloadCase.PAYLOAD_NOT_SET =>
throw Err.InternalError("logEntryToUpdate: PAYLOAD_NOT_SET!")
}
}
private def resolveRecordTimeOrThrow(
recordTimeForUpdate: Option[Timestamp],
recordTimeFromLogEntry: Option[Timestamp]): Timestamp =
(recordTimeForUpdate, recordTimeFromLogEntry) match {
case (_, Some(recordTime)) => recordTime
case (Some(recordTime), _) => recordTime
case (None, None) =>
throw Err.InternalError("Record time must be provided in order to generate an update")
}
private def transactionRejectionEntryToUpdate(
recordTime: Timestamp,
rejEntry: DamlTransactionRejectionEntry): List[Update] = {
@ -213,7 +235,7 @@ object KeyValueConsumption {
}
/** Transform the transaction entry into the [[Update.TransactionAccepted]] event. */
private def txEntryToUpdate(
private def transactionEntryToUpdate(
entryId: DamlLogEntryId,
txEntry: DamlTransactionEntry,
recordTime: Timestamp,
@ -243,6 +265,102 @@ object KeyValueConsumption {
)
}
private[kvutils] case class TimeBounds(
tooEarlyUntil: Option[Timestamp] = None,
tooLateFrom: Option[Timestamp] = None,
deduplicateUntil: Option[Timestamp] = None)
private[kvutils] def outOfTimeBoundsEntryToUpdate(
recordTime: Timestamp,
outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry): Option[Update] = {
val timeBounds = parseTimeBounds(outOfTimeBoundsEntry)
val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _)
val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _)
val tooLate = timeBounds.tooLateFrom.exists(recordTime > _)
val invalidRecordTime = tooEarly || tooLate
val wrappedLogEntry = outOfTimeBoundsEntry.getEntry
wrappedLogEntry.getPayloadCase match {
case _ if deduplicated =>
// We don't emit updates for deduplicated submissions.
None
case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if invalidRecordTime =>
val transactionRejectionEntry = wrappedLogEntry.getTransactionRejectionEntry
val reason = (timeBounds.tooEarlyUntil, timeBounds.tooLateFrom) match {
case (Some(lowerBound), Some(upperBound)) =>
s"Record time $recordTime outside of range [$lowerBound, $upperBound]"
case (Some(lowerBound), None) =>
s"Record time $recordTime outside of valid range ($recordTime < $lowerBound)"
case (None, Some(upperBound)) =>
s"Record time $recordTime outside of valid range ($recordTime > $upperBound)"
case _ =>
"Record time outside of valid range"
}
val rejectionReason = RejectionReason.InvalidLedgerTime(reason)
Some(
Update.CommandRejected(
recordTime = recordTime,
submitterInfo = parseSubmitterInfo(transactionRejectionEntry.getSubmitterInfo),
reason = rejectionReason
)
)
case DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY if invalidRecordTime =>
val configurationRejectionEntry = wrappedLogEntry.getConfigurationRejectionEntry
val reason = timeBounds.tooLateFrom
.map { maximumRecordTime =>
s"Configuration change timed out: $maximumRecordTime < $recordTime"
}
.getOrElse("Configuration change timed out")
Some(
Update.ConfigurationChangeRejected(
recordTime,
SubmissionId.assertFromString(configurationRejectionEntry.getSubmissionId),
ParticipantId.assertFromString(configurationRejectionEntry.getParticipantId),
Configuration.decode(configurationRejectionEntry.getConfiguration).right.get,
reason
)
)
case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY |
DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_REJECTION_ENTRY |
DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY |
DamlLogEntry.PayloadCase.PARTY_ALLOCATION_REJECTION_ENTRY =>
logger.error(
s"Dropped out-of-time-bounds log entry of type=${wrappedLogEntry.getPayloadCase}")
None
case DamlLogEntry.PayloadCase.TRANSACTION_ENTRY |
DamlLogEntry.PayloadCase.PACKAGE_UPLOAD_ENTRY |
DamlLogEntry.PayloadCase.CONFIGURATION_ENTRY |
DamlLogEntry.PayloadCase.PARTY_ALLOCATION_ENTRY |
DamlLogEntry.PayloadCase.OUT_OF_TIME_BOUNDS_ENTRY =>
throw Err.InternalError(
s"Out-of-time-bounds log entry does not contain a rejection entry: ${wrappedLogEntry.getPayloadCase}")
}
}
private def parseTimeBounds(outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry): TimeBounds = {
val duplicateUntilMaybe = parseOptionalTimestamp(
outOfTimeBoundsEntry.hasDuplicateUntil,
outOfTimeBoundsEntry.getDuplicateUntil)
val tooEarlyUntilMaybe = parseOptionalTimestamp(
outOfTimeBoundsEntry.hasTooEarlyUntil,
outOfTimeBoundsEntry.getTooEarlyUntil)
val tooLateFromMaybe = parseOptionalTimestamp(
outOfTimeBoundsEntry.hasTooLateFrom,
outOfTimeBoundsEntry.getTooLateFrom)
TimeBounds(tooEarlyUntilMaybe, tooLateFromMaybe, duplicateUntilMaybe)
}
private def parseOptionalTimestamp(
hasTimestamp: Boolean,
getTimestamp: => com.google.protobuf.Timestamp): Option[Timestamp] =
PartialFunction.condOpt(hasTimestamp) {
case true => parseTimestamp(getTimestamp)
}
@throws(classOf[Err])
private def parseLedgerString(what: String)(s: String): Ref.LedgerString =
Ref.LedgerString

View File

@ -469,7 +469,7 @@ private[kvutils] class TransactionCommitter(
// result in a LookupByKey than the original transaction. This means that the contract state data for the
// contractId pointed to by that contractKey might not have been preloaded into the input state map.
// This is not a problem because after the transaction reinterpretation, we compare the original
// transaction with the reintrepreted one, and the LookupByKey node will not match.
// transaction with the reinterpreted one, and the LookupByKey node will not match.
// Additionally, all contract keys are checked to uphold causal monotonicity.
contractState <- inputState.get(stateKey).flatMap(_.map(_.getContractState))
if contractIsActiveAndVisibleToSubmitter(transactionEntry, contractState)

View File

@ -81,7 +81,8 @@ class ConflictDetection(val damlMetrics: Metrics) {
case PARTY_ALLOCATION_ENTRY | PACKAGE_UPLOAD_ENTRY | CONFIGURATION_ENTRY |
TRANSACTION_REJECTION_ENTRY | CONFIGURATION_REJECTION_ENTRY |
PACKAGE_UPLOAD_REJECTION_ENTRY | PARTY_ALLOCATION_REJECTION_ENTRY =>
PACKAGE_UPLOAD_REJECTION_ENTRY | PARTY_ALLOCATION_REJECTION_ENTRY |
OUT_OF_TIME_BOUNDS_ENTRY =>
logger.trace(s"Dropping conflicting submission (${logEntry.getPayloadCase})")
metrics.dropped.inc()
None

View File

@ -0,0 +1,305 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
import java.time.Instant
import com.daml.ledger.participant.state.kvutils.Conversions.buildTimestamp
import com.daml.ledger.participant.state.kvutils.DamlKvutils.DamlLogEntry.PayloadCase._
import com.daml.ledger.participant.state.kvutils.DamlKvutils._
import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{
TimeBounds,
logEntryToUpdate,
outOfTimeBoundsEntryToUpdate
}
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
import com.daml.ledger.participant.state.v1.{Configuration, RejectionReason, Update}
import com.daml.lf.data.Time.Timestamp
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.prop.TableFor4
import org.scalatest.prop.Tables.Table
import org.scalatest.{Matchers, WordSpec}
class KeyValueConsumptionSpec extends WordSpec with Matchers {
private val aLogEntryId = DamlLogEntryId.getDefaultInstance
private val aLogEntryWithoutRecordTime = DamlLogEntry.newBuilder
.setPackageUploadEntry(DamlPackageUploadEntry.getDefaultInstance)
.build
private val aRecordTime = Timestamp(123456789)
private val aRecordTimeInstant = aRecordTime.toInstant
private val aRecordTimeFromLogEntry = Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
private val aLogEntryWithRecordTime = DamlLogEntry.newBuilder
.setRecordTime(Conversions.buildTimestamp(aRecordTimeFromLogEntry))
.setPackageUploadEntry(DamlPackageUploadEntry.getDefaultInstance)
.build
"logEntryToUpdate" should {
"throw in case no record time is available from the log entry or input argument" in {
assertThrows[Err](
logEntryToUpdate(aLogEntryId, aLogEntryWithoutRecordTime, recordTimeForUpdate = None))
}
"use log entry's record time instead of one provided as input" in {
val actual :: Nil = logEntryToUpdate(
aLogEntryId,
aLogEntryWithRecordTime,
recordTimeForUpdate = Some(aRecordTime))
actual.recordTime shouldBe aRecordTimeFromLogEntry
}
"use record time from log entry if not provided as input" in {
val actual :: Nil =
logEntryToUpdate(aLogEntryId, aLogEntryWithRecordTime, recordTimeForUpdate = None)
actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
}
}
private def verifyNoUpdateIsGenerated(actual: Option[Update]): Unit = {
actual should be(None)
()
}
case class Assertions(
verify: Option[Update] => Unit = verifyNoUpdateIsGenerated,
throwsInternalError: Boolean = false
)
"outOfTimeBoundsEntryToUpdate" should {
"not generate an update for deduplicated entries" in {
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(
TimeBounds(deduplicateUntil = Some(aRecordTime)),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions()),
(
TimeBounds(deduplicateUntil = Some(aRecordTime)),
aRecordTime,
PACKAGE_UPLOAD_REJECTION_ENTRY,
Assertions()),
(
TimeBounds(deduplicateUntil = Some(aRecordTime)),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions()),
(
TimeBounds(deduplicateUntil = Some(aRecordTime)),
aRecordTime,
PARTY_ALLOCATION_REJECTION_ENTRY,
Assertions())
)
runAll(testCases)
}
"generate a rejection entry for a transaction if record time is out of time bounds" in {
def verifyCommandRejection(actual: Option[Update]): Unit = actual match {
case Some(Update.CommandRejected(recordTime, submitterInfo, reason)) =>
recordTime shouldBe aRecordTime
submitterInfo shouldBe Conversions.parseSubmitterInfo(someSubmitterInfo)
reason shouldBe a[RejectionReason.InvalidLedgerTime]
()
case _ => fail
}
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(
TimeBounds(
tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1)))),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions(verify = verifyCommandRejection)),
(
TimeBounds(
tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1)))),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions(verify = verifyCommandRejection)),
(
TimeBounds(tooLateFrom = Some(aRecordTime)),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions()),
(
TimeBounds(tooEarlyUntil = Some(aRecordTime)),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions()),
// Record time within time bounds.
(
TimeBounds(
tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))),
tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1)))
),
aRecordTime,
TRANSACTION_REJECTION_ENTRY,
Assertions())
)
runAll(testCases)
}
"generate a rejection entry for a configuration if record time is out of time bounds" in {
def verifyConfigurationRejection(actual: Option[Update]): Unit = actual match {
case Some(
Update.ConfigurationChangeRejected(
recordTime,
submissionId,
participantId,
proposedConfiguration,
rejectionReason)) =>
recordTime shouldBe aRecordTime
submissionId shouldBe aConfigurationRejectionEntry.getSubmissionId
participantId shouldBe aConfigurationRejectionEntry.getParticipantId
proposedConfiguration shouldBe Configuration
.decode(aConfigurationRejectionEntry.getConfiguration)
.getOrElse(fail)
rejectionReason should include("Configuration change timed out")
()
case _ => fail
}
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(
TimeBounds(
tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1)))),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions(verify = verifyConfigurationRejection)),
(
TimeBounds(
tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1)))),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions(verify = verifyConfigurationRejection)),
(
TimeBounds(tooLateFrom = Some(aRecordTime)),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions()),
(
TimeBounds(tooEarlyUntil = Some(aRecordTime)),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions()),
// Record time within time bounds.
(
TimeBounds(
tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))),
tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1)))
),
aRecordTime,
CONFIGURATION_REJECTION_ENTRY,
Assertions())
)
runAll(testCases)
}
"not generate an update for rejected entries" in {
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(TimeBounds(), aRecordTime, TRANSACTION_REJECTION_ENTRY, Assertions()),
(TimeBounds(), aRecordTime, PACKAGE_UPLOAD_REJECTION_ENTRY, Assertions()),
(TimeBounds(), aRecordTime, CONFIGURATION_REJECTION_ENTRY, Assertions()),
(TimeBounds(), aRecordTime, PARTY_ALLOCATION_REJECTION_ENTRY, Assertions())
)
runAll(testCases)
}
"throw in case a normal log entry is seen" in {
val testCases = Table(
("Time Bounds", "Record Time", "Log Entry Type", "Assertions"),
(TimeBounds(), aRecordTime, TRANSACTION_ENTRY, Assertions(throwsInternalError = true)),
(TimeBounds(), aRecordTime, PACKAGE_UPLOAD_ENTRY, Assertions(throwsInternalError = true)),
(TimeBounds(), aRecordTime, CONFIGURATION_ENTRY, Assertions(throwsInternalError = true)),
(TimeBounds(), aRecordTime, PARTY_ALLOCATION_ENTRY, Assertions(throwsInternalError = true)),
(
TimeBounds(),
aRecordTime,
OUT_OF_TIME_BOUNDS_ENTRY,
Assertions(throwsInternalError = true))
)
runAll(testCases)
}
}
private def runAll(
table: TableFor4[TimeBounds, Timestamp, DamlLogEntry.PayloadCase, Assertions]): Unit = {
forAll(table) {
(
timeBounds: TimeBounds,
recordTime: Timestamp,
logEntryType: DamlLogEntry.PayloadCase,
assertions: Assertions) =>
val inputEntry = buildOutOfTimeBoundsEntry(timeBounds, logEntryType)
if (assertions.throwsInternalError) {
assertThrows[Err.InternalError](outOfTimeBoundsEntryToUpdate(recordTime, inputEntry))
} else {
val actual = outOfTimeBoundsEntryToUpdate(recordTime, inputEntry)
assertions.verify(actual)
()
}
}
}
private def buildOutOfTimeBoundsEntry(
timeBounds: TimeBounds,
logEntryType: DamlLogEntry.PayloadCase): DamlOutOfTimeBoundsEntry = {
val builder = DamlOutOfTimeBoundsEntry.newBuilder
timeBounds.tooEarlyUntil.foreach(value => builder.setTooEarlyUntil(buildTimestamp(value)))
timeBounds.tooLateFrom.foreach(value => builder.setTooLateFrom(buildTimestamp(value)))
timeBounds.deduplicateUntil.foreach(value => builder.setDuplicateUntil(buildTimestamp(value)))
builder.setEntry(buildLogEntry(logEntryType))
builder.build
}
private def someSubmitterInfo: DamlSubmitterInfo =
DamlSubmitterInfo.newBuilder
.setSubmitter("a submitter")
.setApplicationId("test")
.setCommandId("a command ID")
.setDeduplicateUntil(com.google.protobuf.Timestamp.getDefaultInstance)
.build
private def aTransactionRejectionEntry: DamlTransactionRejectionEntry =
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(someSubmitterInfo)
.build
private def aConfigurationRejectionEntry: DamlConfigurationRejectionEntry =
DamlConfigurationRejectionEntry.newBuilder
.setConfiguration(Configuration.encode(LedgerReader.DefaultConfiguration))
.setSubmissionId("a submission")
.setParticipantId("a participant")
.build
private def buildLogEntry(payloadCase: DamlLogEntry.PayloadCase): DamlLogEntry = {
val builder = DamlLogEntry.newBuilder
payloadCase match {
case TRANSACTION_ENTRY =>
builder.setTransactionEntry(DamlTransactionEntry.getDefaultInstance)
case TRANSACTION_REJECTION_ENTRY =>
builder.setTransactionRejectionEntry(aTransactionRejectionEntry)
case PACKAGE_UPLOAD_ENTRY =>
builder.setPackageUploadEntry(DamlPackageUploadEntry.getDefaultInstance)
case PACKAGE_UPLOAD_REJECTION_ENTRY =>
builder.setPackageUploadRejectionEntry(DamlPackageUploadRejectionEntry.getDefaultInstance)
case CONFIGURATION_ENTRY =>
builder.setConfigurationEntry(DamlConfigurationEntry.getDefaultInstance)
case CONFIGURATION_REJECTION_ENTRY =>
builder.setConfigurationRejectionEntry(aConfigurationRejectionEntry)
case PARTY_ALLOCATION_ENTRY =>
builder.setPartyAllocationEntry(DamlPartyAllocationEntry.getDefaultInstance)
case PARTY_ALLOCATION_REJECTION_ENTRY =>
builder.setPartyAllocationRejectionEntry(
DamlPartyAllocationRejectionEntry.getDefaultInstance)
case OUT_OF_TIME_BOUNDS_ENTRY =>
builder.setOutOfTimeBoundsEntry(DamlOutOfTimeBoundsEntry.getDefaultInstance)
case PAYLOAD_NOT_SET =>
()
}
builder.build
}
}

View File

@ -172,6 +172,7 @@ object KeyValueParticipantStateReaderSpec {
.newBuilder()
.setPartyAllocationEntry(
DamlPartyAllocationEntry.newBuilder().setParty("aParty").setParticipantId("aParticipant"))
.setRecordTime(com.google.protobuf.Timestamp.newBuilder.setSeconds(1234))
.build()
private val aWrappedLogEntry = Envelope.enclose(aLogEntry)