[Sandbox-on-X] Fix record time assignment (#12706)

changelog_begin
changelog_end
This commit is contained in:
tudor-da 2022-02-02 15:32:07 +01:00 committed by GitHub
parent 496bc4e45b
commit 784faf6179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 145 additions and 90 deletions

View File

@ -14,13 +14,13 @@ import scala.collection.immutable.VectorMap
case class DeduplicationState private (
private[validate] val deduplicationQueue: DeduplicationQueue,
private val maxDeduplicationDuration: Duration,
private val currentTime: () => Time.Timestamp,
private val bridgeMetrics: BridgeMetrics,
) {
def deduplicate(
changeId: ChangeId,
commandDeduplicationDuration: Duration,
recordTime: Time.Timestamp,
): (DeduplicationState, Boolean) = {
bridgeMetrics.SequencerState.deduplicationQueueLength.update(deduplicationQueue.size)
if (commandDeduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
@ -28,19 +28,18 @@ case class DeduplicationState private (
s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
)
else {
val now = currentTime()
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, now)
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, recordTime)
val queueAfterEvictions = deduplicationQueue.dropWhile(_._2 <= expiredTimestamp)
val isDuplicateChangeId = queueAfterEvictions
.get(changeId)
.exists(_ > expiredThreshold(commandDeduplicationDuration, now))
.exists(_ >= expiredThreshold(commandDeduplicationDuration, recordTime))
if (isDuplicateChangeId)
copy(deduplicationQueue = queueAfterEvictions) -> true
else
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, now)) -> false
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, recordTime)) -> false
}
}
@ -56,13 +55,11 @@ object DeduplicationState {
private[validate] def empty(
deduplicationDuration: Duration,
currentTime: () => Time.Timestamp,
bridgeMetrics: BridgeMetrics,
): DeduplicationState =
DeduplicationState(
deduplicationQueue = VectorMap.empty,
maxDeduplicationDuration = deduplicationDuration,
currentTime = currentTime,
bridgeMetrics = bridgeMetrics,
)
}

View File

@ -45,7 +45,6 @@ private[validate] class SequenceImpl(
bridgeMetrics: BridgeMetrics,
errorFactories: ErrorFactories,
maxDeduplicationDuration: Duration,
wallClockTime: () => Time.Timestamp = () => Timestamp.now(),
) extends Sequence {
private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
@ -54,7 +53,7 @@ private[validate] class SequenceImpl(
@volatile private[validate] var allocatedParties = initialAllocatedParties
@volatile private[validate] var ledgerConfiguration = initialLedgerConfiguration
@volatile private[validate] var deduplicationState =
DeduplicationState.empty(maxDeduplicationDuration, wallClockTime, bridgeMetrics)
DeduplicationState.empty(maxDeduplicationDuration, bridgeMetrics)
override def apply(): Validation[(Offset, PreparedSubmission)] => Iterable[(Offset, Update)] =
in => {
@ -149,11 +148,11 @@ private[validate] class SequenceImpl(
newOffset: LastUpdatedAt,
recordTime: Timestamp,
txSubmission: PreparedTransactionSubmission,
) = {
val submitterInfo = txSubmission.submission.submitterInfo
val completionInfo = submitterInfo.toCompletionInfo()
): Update =
withErrorLogger(txSubmission.submission.submitterInfo.submissionId) { implicit errorLogger =>
val submitterInfo = txSubmission.submission.submitterInfo
val completionInfo = submitterInfo.toCompletionInfo()
withErrorLogger(submitterInfo.submissionId) { implicit errorLogger =>
for {
_ <- checkTimeModel(
transaction = txSubmission.submission,
@ -172,37 +171,30 @@ private[validate] class SequenceImpl(
inputContracts = txSubmission.inputContracts,
completionInfo = completionInfo,
)
_ <- deduplicateAndUpdateState(
recordTime = timeProvider.getCurrentTimestamp
updatedDeduplicationState <- deduplicate(
changeId = ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
),
deduplicationPeriod = txSubmission.submission.submitterInfo.deduplicationPeriod,
deduplicationPeriod = submitterInfo.deduplicationPeriod,
completionInfo = completionInfo,
recordTime = recordTime,
)
} yield ()
}(txSubmission.submission.loggingContext, logger)
.fold(
_.toCommandRejectedUpdate(recordTime),
{ _ =>
// Update the sequencer state
sequencerState = sequencerState
.dequeue(noConflictUpTo)
.enqueue(
newOffset,
txSubmission.updatedKeys,
txSubmission.consumedContracts,
)
transactionAccepted(
txSubmission.submission,
offsetIdx,
timeProvider.getCurrentTimestamp,
)
},
_ = updateStatesOnSuccessfulValidation(
noConflictUpTo,
newOffset,
txSubmission,
updatedDeduplicationState,
)
} yield transactionAccepted(
txSubmission.submission,
offsetIdx,
recordTime,
)
}
}(txSubmission.submission.loggingContext, logger)
.fold(_.toCommandRejectedUpdate(recordTime), identity)
private def conflictCheckWithInFlight(
keysState: Map[Key, (Option[ContractId], LastUpdatedAt)],
@ -240,22 +232,22 @@ private[validate] class SequenceImpl(
}
}
private def deduplicateAndUpdateState(
private def deduplicate(
changeId: ChangeId,
deduplicationPeriod: DeduplicationPeriod,
completionInfo: CompletionInfo,
recordTime: Time.Timestamp,
)(implicit
errorLogger: ContextualizedErrorLogger
): Validation[Unit] =
): Validation[DeduplicationState] =
deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(commandDeduplicationDuration) =>
val (newDeduplicationState, isDuplicate) =
deduplicationState.deduplicate(changeId, commandDeduplicationDuration)
deduplicationState.deduplicate(changeId, commandDeduplicationDuration, recordTime)
deduplicationState = newDeduplicationState
Either.cond(
!isDuplicate,
(),
newDeduplicationState,
DuplicateCommand(changeId, completionInfo),
)
case _: DeduplicationPeriod.DeduplicationOffset =>
@ -296,4 +288,20 @@ private[validate] class SequenceImpl(
.map(Rejection.InvalidLedgerTime(completionInfo, _)(errorFactories))
)
}
private def updateStatesOnSuccessfulValidation(
noConflictUpTo: LastUpdatedAt,
newOffset: LastUpdatedAt,
txSubmission: PreparedTransactionSubmission,
updatedDeduplicationState: DeduplicationState,
): Unit = {
sequencerState = sequencerState
.dequeue(noConflictUpTo)
.enqueue(
newOffset,
txSubmission.updatedKeys,
txSubmission.consumedContracts,
)
deduplicationState = updatedDeduplicationState
}
}

View File

@ -19,33 +19,48 @@ import scala.util.chaining._
class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
behavior of classOf[DeduplicationState].getSimpleName
private val initialTime = Time.Timestamp.now()
private val t0 = Time.Timestamp.now()
private val t1 = t0.add(Duration.ofMinutes(1L))
private val t2 = t0.add(Duration.ofMinutes(2L))
private val t3 = t0.add(Duration.ofMinutes(3L))
private val bridgeMetrics = new BridgeMetrics(new Metrics(new MetricRegistry))
it should "deduplicate commands within the requested deduplication window" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(3L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t0,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> t0)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t1,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> initialTime)
newDeduplicationState.deduplicationQueue shouldBe VectorMap(changeId(1) -> t0)
isDuplicate shouldBe true
}
._1
.deduplicate(changeId(1), Duration.ofMinutes(2L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(2L),
recordTime = t3,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime.add(Duration.ofMinutes(2))
changeId(1) -> t3
)
isDuplicate shouldBe false
}
@ -54,33 +69,44 @@ class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
it should "evicts old entries (older than max deduplication time)" in {
val deduplicationState = DeduplicationState.empty(
deduplicationDuration = Duration.ofMinutes(2L),
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
deduplicationState
.deduplicate(changeId(1), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(1),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t0,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime
changeId(1) -> t0
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(2), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(2),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t1,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(1) -> initialTime,
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
changeId(1) -> t0,
changeId(2) -> t1,
)
isDuplicate shouldBe false
}
._1
.deduplicate(changeId(3), Duration.ofMinutes(1L))
.deduplicate(
changeId = changeId(3),
commandDeduplicationDuration = Duration.ofMinutes(1L),
recordTime = t2,
)
.tap { case (newDeduplicationState, isDuplicate) =>
newDeduplicationState.deduplicationQueue shouldBe VectorMap(
changeId(2) -> initialTime.add(Duration.ofMinutes(1)),
changeId(3) -> initialTime.add(Duration.ofMinutes(2)),
changeId(2) -> t1,
changeId(3) -> t2,
)
isDuplicate shouldBe false
}
@ -93,10 +119,9 @@ class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
DeduplicationState
.empty(
deduplicationDuration = maxDeduplicationDuration,
currentTime = currentTimeMock,
bridgeMetrics = bridgeMetrics,
)
.deduplicate(changeId(1337), commandDeduplicationDuration)
.deduplicate(changeId(1337), commandDeduplicationDuration, t0)
) match {
case Failure(ex) =>
ex.getMessage shouldBe s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
@ -104,13 +129,6 @@ class DeduplicateStateSpec extends AnyFlatSpec with Matchers {
}
}
// Current time provider mock builder.
// On each call, the mock advances the time by 1 minute
private def currentTimeMock: () => Time.Timestamp = {
var currentTime = initialTime
() => currentTime.tap(_ => currentTime = currentTime.add(Duration.ofMinutes(1L)))
}
private def changeId(idx: Int): ChangeId = ChangeId(
applicationId = Ref.ApplicationId.assertFromString("some-app"),
commandId = Ref.CommandId.assertFromString(s"some-command-$idx"),

View File

@ -162,51 +162,68 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
it should "assert internal consistency validation on transaction submission conflicts" in new TestContext {
// Conflict validation passes on empty Sequencer State
val Seq((offset1, update1)) = sequence(create(cId(1), Some(contractKey(1L))))
val Seq((offset1, update1)) =
sequence(create(cId(1), Some(contractKey(1L)), cmdId = commandId(2)))
offset1 shouldBe toOffset(1L)
update1 shouldBe transactionAccepted(1)
update1 shouldBe transactionAccepted(1, cmdId = commandId(2))
// Attempt assigning an active contract key
val Seq((offset2, update2)) = sequence(create(cId(2), Some(contractKey(1L))))
val Seq((offset2, update2)) =
sequence(create(cId(2), Some(contractKey(1L)), cmdId = commandId(2)))
offset2 shouldBe toOffset(2L)
assertCommandRejected(update2, "Inconsistent: DuplicateKey: contract key is not unique")
assertCommandRejected(
update2,
"Inconsistent: DuplicateKey: contract key is not unique",
cmdId = commandId(2),
)
// Archiving a contract for the first time should succeed
val Seq((offset3, update3)) = sequence(consume(cId(3)))
val Seq((offset3, update3)) = sequence(consume(cId(3), cmdId = commandId(3)))
offset3 shouldBe toOffset(3L)
update3 shouldBe transactionAccepted(3)
update3 shouldBe transactionAccepted(3, cmdId = commandId(3))
// Reject when trying to archive a contract again
val Seq((offset4, update4)) = sequence(consume(cId(3)))
val Seq((offset4, update4)) = sequence(consume(cId(3), cmdId = commandId(3)))
offset4 shouldBe toOffset(4L)
assertCommandRejected(update4, s"Inconsistent: Could not lookup contracts: [${cId(3).coid}]")
assertCommandRejected(
update4,
s"Inconsistent: Could not lookup contracts: [${cId(3).coid}]",
cmdId = commandId(3),
)
// Archiving a contract with an assigned key for the first time succeeds
val Seq((offset5, update5)) = sequence(consume(cId(4), Some(contractKey(2L))))
val Seq((offset5, update5)) =
sequence(consume(cId(4), Some(contractKey(2L)), cmdId = commandId(4)))
offset5 shouldBe toOffset(5L)
update5 shouldBe transactionAccepted(5)
update5 shouldBe transactionAccepted(5, cmdId = commandId(4))
// Reject on unknown key
val Seq((offset6, update6)) = sequence(exerciseNonConsuming(cId(5), contractKey(2L)))
val Seq((offset6, update6)) =
sequence(exerciseNonConsuming(cId(5), contractKey(2L), cmdId = commandId(5)))
offset6 shouldBe toOffset(6L)
assertCommandRejected(
update6,
s"Inconsistent: Contract key lookup with different results: expected [None], actual [Some(${cId(5)})]",
cmdId = commandId(5),
)
// Reject on inconsistent key usage
val Seq((offset7, update7)) = sequence(exerciseNonConsuming(cId(5), contractKey(1L)))
val Seq((offset7, update7)) =
sequence(exerciseNonConsuming(cId(5), contractKey(1L), cmdId = commandId(5)))
offset7 shouldBe toOffset(7L)
assertCommandRejected(
update7,
s"Inconsistent: Contract key lookup with different results: expected [Some(${cId(1)})], actual [Some(${cId(5)})]",
cmdId = commandId(5),
)
}
it should "forward the noConflictUpTo offsets to the sequencer state queue and allow its pruning" in new TestContext {
// Ingest two transactions which are archiving contracts
val Seq((offset1, _)) = sequence(consume(contractId = cId(1), noConflictUpTo = toOffset(0L)))
val Seq((offset2, _)) = sequence(consume(contractId = cId(2), noConflictUpTo = toOffset(0L)))
val Seq((offset1, _)) =
sequence(consume(contractId = cId(1), noConflictUpTo = toOffset(0L), cmdId = commandId(1)))
val Seq((offset2, _)) =
sequence(consume(contractId = cId(2), noConflictUpTo = toOffset(0L), cmdId = commandId(2)))
// Check that the sequencer queue includes the updates
sequenceImpl.sequencerState.sequencerQueue should contain theSameElementsAs Vector(
@ -216,7 +233,8 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
sequenceImpl.sequencerState.consumedContractsState shouldBe Set(cId(1), cId(2))
// Ingest another transaction with the noConflictUpTo equal to the offset of the previous transaction
val Seq((offset3, _)) = sequence(consume(contractId = cId(3), noConflictUpTo = offset2))
val Seq((offset3, _)) =
sequence(consume(contractId = cId(3), noConflictUpTo = offset2, cmdId = commandId(3)))
// Assert that the queue has pruned the previous entries
sequenceImpl.sequencerState.sequencerQueue should contain theSameElementsAs Vector(
@ -240,11 +258,12 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
cId(1),
transactionSubmission =
tx.copy(submitterInfo = tx.submitterInfo.copy(deduplicationPeriod = deduplicationPeriod)),
cmdId = commandId(1),
)
val Seq((offset1, update1)) = sequence(initialSubmission)
offset1 shouldBe toOffset(1L)
update1 shouldBe transactionAccepted(1)
update1 shouldBe transactionAccepted(1, cmdId = commandId(1))
// Assert duplicate command rejected
val Seq((offset2, update2)) = sequence(submissionWithDedupPeriod)
@ -253,10 +272,12 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
update2,
"A command with the given command id has already been successfully processed",
deduplicationPeriod,
cmdId = commandId(1),
)
// Advance record time past the deduplication period
private val newRecordTime: Timestamp = currentRecordTime.add(deduplicationPeriod.duration)
private val newRecordTime: Timestamp =
currentRecordTime.add(deduplicationPeriod.duration.plusSeconds(1L))
when(timeProviderMock.getCurrentTimestamp).thenReturn(newRecordTime)
// Assert command is accepted
@ -305,7 +326,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
actAs = List.empty,
readAs = List.empty,
applicationId = Ref.ApplicationId.assertFromString("applicationId"),
commandId = Ref.CommandId.assertFromString("commandId"),
commandId = commandId(1),
deduplicationPeriod = zeroDeduplicationPeriod,
submissionId = Some(submissionId),
ledgerConfiguration =
@ -391,6 +412,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
keyO: Option[GlobalKey] = None,
informees: Set[Ref.Party] = txInformees,
transactionSubmission: Submission.Transaction = tx,
cmdId: Ref.CommandId = commandId(1),
): Right[Nothing, (Offset, PreparedTransactionSubmission)] = {
val keyInputs = keyO.map(k => Map(k -> KeyCreate)).getOrElse(Map.empty)
val updatedKeys = keyO.map(k => Map(k -> Some(contractId))).getOrElse(Map.empty)
@ -400,7 +422,9 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
keyInputs = keyInputs,
updatedKeys = updatedKeys,
transactionInformees = informees,
submission = transactionSubmission,
submission = transactionSubmission.copy(submitterInfo =
transactionSubmission.submitterInfo.copy(commandId = cmdId)
),
)
input(preparedTransactionSubmission)
@ -412,6 +436,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
noConflictUpTo: Offset = Offset.beforeBegin,
informees: Set[Ref.Party] = txInformees,
transactionSubmission: Submission.Transaction = tx,
cmdId: Ref.CommandId = commandId(1),
): Right[Nothing, (Offset, PreparedTransactionSubmission)] = {
val keyInputs = keyO.map(k => Map(k -> KeyActive(contractId))).getOrElse(Map.empty)
val updatedKeys = keyO.map(k => Map(k -> None)).getOrElse(Map.empty)
@ -424,7 +449,8 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
updatedKeys = updatedKeys,
consumedContracts = inputContracts,
transactionInformees = informees,
submission = transactionSubmission,
submission =
transactionSubmission.copy(submitterInfo = submitterInfo.copy(commandId = cmdId)),
)
Right(noConflictUpTo -> preparedTransactionSubmission)
@ -443,7 +469,6 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
initialAllocatedParties = allocatedInformees,
initialLedgerConfiguration = initialLedgerConfiguration,
maxDeduplicationDuration = maxDeduplicationDuration,
wallClockTime = () => timeProviderMock.getCurrentTimestamp,
)
def exerciseNonConsuming(
@ -451,6 +476,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
key: GlobalKey,
informees: Set[Ref.Party] = txInformees,
transactionSubmission: Submission.Transaction = tx,
cmdId: Ref.CommandId = commandId(1),
): Right[Nothing, (Offset, PreparedTransactionSubmission)] = {
val inputContracts = Set(contractId)
@ -461,7 +487,8 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
updatedKeys = Map.empty,
consumedContracts = Set.empty,
transactionInformees = informees,
submission = transactionSubmission,
submission =
transactionSubmission.copy(submitterInfo = submitterInfo.copy(commandId = cmdId)),
)
input(preparedTransactionSubmission)
@ -471,9 +498,10 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
txId: Int,
completionInfo: CompletionInfo = completionInfo,
recordTime: Time.Timestamp = currentRecordTime,
cmdId: Ref.CommandId = commandId(1),
): Update.TransactionAccepted =
Update.TransactionAccepted(
optCompletionInfo = Some(completionInfo),
optCompletionInfo = Some(completionInfo.copy(commandId = cmdId)),
transactionMeta = transactionMeta,
transaction = CommittedTransaction(txMock),
transactionId = Ref.TransactionId.assertFromString(txId.toString),
@ -486,6 +514,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
update: Update,
reason: String,
deduplicationPeriod: DeduplicationPeriod = zeroDeduplicationPeriod,
cmdId: Ref.CommandId = commandId(1),
): Assertion = update match {
case rejection: Update.CommandRejected =>
rejection.recordTime shouldBe currentRecordTime
@ -493,6 +522,7 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
rejection.completionInfo shouldBe completionInfo.copy(
statistics = None,
optDeduplicationPeriod = Some(deduplicationPeriod),
commandId = cmdId,
)
// TODO SoX: Assert error codes
rejection.reasonTemplate.message should include(reason)
@ -512,4 +542,6 @@ class SequenceSpec extends AnyFlatSpec with MockitoSugar with Matchers with Argu
}
private def cId(i: Int) = ContractId.V1(Hash.hashPrivateKey(i.toString))
private def commandId(i: Int): Ref.CommandId = Ref.CommandId.assertFromString(s"cmd-$i")
}