mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Restart the submission interpretation in case of a race [DPP-737] (#11579)
Command interpretation happens in two stages: 1. Daml interpretation 2. Determine a suitable ledger effective time The race can happen in the following situation: In 1) a contract key K is resolved to contract C1. Between 1) and 2), a transaction is stored on the participant that archives C1, but creates C2 with the same contract key K. In 2) the ledger api server tries to lookup the ledger effective time for all input contracts to the transaction. If it doesn't find one of the input contracts at all, it can conclude that the transaction wouldn't be accepted by the ledger anyway. The behavior before this patch was to simply abort command interpretation and return a rather cryptic error to the user. With this patch, the ledger api server restarts the command interpretation. If the "missing contract" was an explicit input to the command (e.g. as the contract for the exercise or as an argument to an exercise), then this command will be rejected because the contract is now archived. If the contract ID was determined via a contract key lookup, then restarting the interpretation will result in either a negative lookup or a different contract ID for the new contract under this contract key. CHANGELOG_BEGIN [Ledger API] Retry the interpretation of a command in case of a race with other transactions. This fix drastically reduces the likelihood of the error "Could not find a suitable ledger time after 0 retries". CHANGELOG_END
This commit is contained in:
parent
8d2b1b9ffe
commit
dd1b0347ec
@ -85,18 +85,32 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
|
|||||||
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
|
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.recover {
|
.recoverWith {
|
||||||
// An error while looking up the maximum ledger time for the used contracts
|
|
||||||
// most likely means that one of the contracts is already not active anymore,
|
case MissingContracts(contracts) =>
|
||||||
// which can happen under contention.
|
if (retriesLeft > 0) {
|
||||||
// A retry would only be successful in case the archived contracts were referenced by key.
|
metrics.daml.execution.retry.mark()
|
||||||
// Direct references to archived contracts will result in the same error.
|
logger.info(
|
||||||
|
s"Some input contracts could not be found. Restarting the computation. Missing contracts: ${contracts
|
||||||
|
.mkString("[", ", ", "]")}"
|
||||||
|
)
|
||||||
|
loop(commands, submissionSeed, ledgerConfiguration, retriesLeft - 1)
|
||||||
|
} else {
|
||||||
|
logger.info(
|
||||||
|
s"Lookup of maximum ledger time failed after ${maxRetries - retriesLeft}. Used contracts: ${usedContractIds
|
||||||
|
.mkString("[", ", ", "]")}."
|
||||||
|
)
|
||||||
|
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// An error while looking up the maximum ledger time for the used contracts. The nature of this error is not known.
|
||||||
|
// Not retrying automatically. All other automatically retryable cases are covered by the logic above.
|
||||||
case error =>
|
case error =>
|
||||||
logger.info(
|
logger.info(
|
||||||
s"Lookup of maximum ledger time failed. This can happen if there is contention on contracts used by the transaction. Used contracts: ${usedContractIds
|
s"Lookup of maximum ledger time failed after ${maxRetries - retriesLeft}. Used contracts: ${usedContractIds
|
||||||
.mkString(", ")}. Details: $error"
|
.mkString("[", ", ", "]")}. Details: $error"
|
||||||
)
|
)
|
||||||
Left(ErrorCause.LedgerTime(maxRetries - retriesLeft))
|
Future.successful(Left(ErrorCause.LedgerTime(maxRetries - retriesLeft)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,3 +127,5 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
|
|||||||
private[this] def advanceInputTime(cmd: Commands, newTime: Option[Time.Timestamp]): Commands =
|
private[this] def advanceInputTime(cmd: Commands, newTime: Option[Time.Timestamp]): Commands =
|
||||||
newTime.fold(cmd)(t => cmd.copy(commands = cmd.commands.copy(ledgerEffectiveTime = t)))
|
newTime.fold(cmd)(t => cmd.copy(commands = cmd.commands.copy(ledgerEffectiveTime = t)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case class MissingContracts(contracts: Set[ContractId]) extends RuntimeException
|
||||||
|
@ -9,6 +9,7 @@ import anorm.SqlParser.{byteArray, int, long, str}
|
|||||||
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
|
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
|
||||||
import com.daml.lf.data.Ref
|
import com.daml.lf.data.Ref
|
||||||
import com.daml.lf.data.Time.Timestamp
|
import com.daml.lf.data.Time.Timestamp
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
import com.daml.platform.store.Conversions.{
|
import com.daml.platform.store.Conversions.{
|
||||||
contractId,
|
contractId,
|
||||||
flatEventWitnessesColumn,
|
flatEventWitnessesColumn,
|
||||||
@ -49,11 +50,6 @@ class ContractStorageBackendTemplate(
|
|||||||
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
|
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
|
||||||
)
|
)
|
||||||
|
|
||||||
private def notFound(missingContractIds: Set[ContractId]): Throwable =
|
|
||||||
new IllegalArgumentException(
|
|
||||||
s"The following contracts have not been found: ${missingContractIds.map(_.coid).mkString(", ")}"
|
|
||||||
)
|
|
||||||
|
|
||||||
protected def maximumLedgerTimeSqlLiteral(
|
protected def maximumLedgerTimeSqlLiteral(
|
||||||
id: ContractId,
|
id: ContractId,
|
||||||
lastEventSequentialId: Long,
|
lastEventSequentialId: Long,
|
||||||
@ -124,7 +120,7 @@ class ContractStorageBackendTemplate(
|
|||||||
val missingIds = queriedIds.collect { case (missingId, None) =>
|
val missingIds = queriedIds.collect { case (missingId, None) =>
|
||||||
missingId
|
missingId
|
||||||
}
|
}
|
||||||
Failure(notFound(missingIds.toSet))
|
Failure(MissingContracts(missingIds.toSet))
|
||||||
} else Success(foundLedgerEffectiveTimes.max)
|
} else Success(foundLedgerEffectiveTimes.max)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,12 +25,14 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
|
|||||||
ContractState,
|
ContractState,
|
||||||
KeyState,
|
KeyState,
|
||||||
}
|
}
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
|
|
||||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||||
import scala.concurrent.{ExecutionContext, Future}
|
import scala.concurrent.{ExecutionContext, Future}
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Success, Try}
|
||||||
|
|
||||||
private[platform] class MutableCacheBackedContractStore(
|
private[platform] class MutableCacheBackedContractStore(
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
@ -73,41 +75,46 @@ private[platform] class MutableCacheBackedContractStore(
|
|||||||
override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
|
override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
|
||||||
loggingContext: LoggingContext
|
loggingContext: LoggingContext
|
||||||
): Future[Option[Timestamp]] =
|
): Future[Option[Timestamp]] =
|
||||||
if (ids.isEmpty)
|
Future
|
||||||
Future.failed(EmptyContractIds())
|
.fromTry(partitionCached(ids))
|
||||||
else {
|
.flatMap {
|
||||||
Future
|
case (cached, toBeFetched) if toBeFetched.isEmpty =>
|
||||||
.fromTry(partitionCached(ids))
|
Future.successful(Some(cached.max))
|
||||||
.flatMap {
|
case (cached, toBeFetched) =>
|
||||||
case (cached, toBeFetched) if toBeFetched.isEmpty =>
|
contractsReader
|
||||||
Future.successful(Some(cached.max))
|
.lookupMaximumLedgerTime(toBeFetched)
|
||||||
case (cached, toBeFetched) =>
|
.map(_.map(m => (cached + m).max))
|
||||||
contractsReader
|
}
|
||||||
.lookupMaximumLedgerTime(toBeFetched)
|
|
||||||
.map(_.map(m => (cached + m).max))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def partitionCached(
|
private def partitionCached(
|
||||||
ids: Set[ContractId]
|
ids: Set[ContractId]
|
||||||
)(implicit loggingContext: LoggingContext) = {
|
)(implicit
|
||||||
|
loggingContext: LoggingContext
|
||||||
|
): Try[(Set[Timestamp], Set[ContractId])] = {
|
||||||
val cacheQueried = ids.map(id => id -> contractsCache.get(id))
|
val cacheQueried = ids.map(id => id -> contractsCache.get(id))
|
||||||
|
|
||||||
val cached = cacheQueried.view
|
val cached = cacheQueried.view
|
||||||
.map(_._2)
|
.foldLeft[Either[Set[ContractId], Set[Timestamp]]](Right(Set.empty[Timestamp])) {
|
||||||
.foldLeft(Try(Set.empty[Timestamp])) {
|
// successful lookups
|
||||||
case (Success(timestamps), Some(active: Active)) =>
|
case (Right(timestamps), (_, Some(active: Active))) =>
|
||||||
Success(timestamps + active.createLedgerEffectiveTime)
|
Right(timestamps + active.createLedgerEffectiveTime)
|
||||||
case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids))
|
case (Right(timestamps), (_, None)) => Right(timestamps)
|
||||||
case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids))
|
|
||||||
case (Success(timestamps), None) => Success(timestamps)
|
// failure cases
|
||||||
case (failure, _) => failure
|
case (acc, (cid, Some(Archived(_) | NotFound))) =>
|
||||||
|
val missingContracts = acc.left.getOrElse(Set.empty) + cid
|
||||||
|
Left(missingContracts)
|
||||||
|
case (acc @ Left(_), _) => acc
|
||||||
}
|
}
|
||||||
|
|
||||||
cached.map { cached =>
|
cached
|
||||||
val missing = cacheQueried.collect { case (id, None) => id }
|
.map { cached =>
|
||||||
(cached, missing)
|
val missing = cacheQueried.collect { case (id, None) => id }
|
||||||
}
|
(cached, missing)
|
||||||
|
}
|
||||||
|
.left
|
||||||
|
.map(MissingContracts)
|
||||||
|
.toTry
|
||||||
}
|
}
|
||||||
|
|
||||||
private def readThroughContractsCache(contractId: ContractId)(implicit
|
private def readThroughContractsCache(contractId: ContractId)(implicit
|
||||||
@ -379,16 +386,6 @@ private[platform] object MutableCacheBackedContractStore {
|
|||||||
}.map(_ => contractStore)
|
}.map(_ => contractStore)
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class ContractNotFound(contractIds: Set[ContractId])
|
|
||||||
extends IllegalArgumentException(
|
|
||||||
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
|
|
||||||
)
|
|
||||||
|
|
||||||
final case class EmptyContractIds()
|
|
||||||
extends IllegalArgumentException(
|
|
||||||
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
|
|
||||||
)
|
|
||||||
|
|
||||||
final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
|
final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
|
||||||
override def getMessage: String =
|
override def getMessage: String =
|
||||||
s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival."
|
s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival."
|
||||||
|
@ -4,11 +4,12 @@
|
|||||||
package com.daml.platform.store.dao
|
package com.daml.platform.store.dao
|
||||||
|
|
||||||
import com.daml.lf.data.Time.Timestamp
|
import com.daml.lf.data.Time.Timestamp
|
||||||
|
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
import com.daml.lf.transaction.GlobalKey
|
import com.daml.lf.transaction.GlobalKey
|
||||||
import com.daml.lf.transaction.Node.KeyWithMaintainers
|
import com.daml.lf.transaction.Node.KeyWithMaintainers
|
||||||
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance, ValueText}
|
import com.daml.lf.value.Value.{ContractId, ValueText, VersionedContractInstance}
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
import org.scalatest.flatspec.AsyncFlatSpec
|
import org.scalatest.flatspec.AsyncFlatSpec
|
||||||
import org.scalatest.matchers.should.Matchers
|
import org.scalatest.matchers.should.Matchers
|
||||||
import org.scalatest.{Inside, LoneElement, OptionValues}
|
import org.scalatest.{Inside, LoneElement, OptionValues}
|
||||||
@ -167,8 +168,7 @@ private[dao] trait JdbcLedgerDaoContractsSpec extends LoneElement with Inside wi
|
|||||||
for {
|
for {
|
||||||
failure <- contractsReader.lookupMaximumLedgerTime(Set(randomContractId)).failed
|
failure <- contractsReader.lookupMaximumLedgerTime(Set(randomContractId)).failed
|
||||||
} yield {
|
} yield {
|
||||||
failure shouldBe an[IllegalArgumentException]
|
failure shouldBe an[MissingContracts]
|
||||||
assertIsLedgerTimeLookupError(failure.getMessage)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,17 +248,11 @@ private[dao] trait JdbcLedgerDaoContractsSpec extends LoneElement with Inside wi
|
|||||||
_ <- store(singleExercise(divulgedContractId))
|
_ <- store(singleExercise(divulgedContractId))
|
||||||
failure <- contractsReader.lookupMaximumLedgerTime(Set(divulgedContractId)).failed
|
failure <- contractsReader.lookupMaximumLedgerTime(Set(divulgedContractId)).failed
|
||||||
} yield {
|
} yield {
|
||||||
failure shouldBe an[IllegalArgumentException]
|
failure shouldBe an[MissingContracts]
|
||||||
assertIsLedgerTimeLookupError(failure.getMessage)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
it should "store contracts with a transient contract in the global divulgence" in {
|
it should "store contracts with a transient contract in the global divulgence" in {
|
||||||
store(fullyTransientWithChildren).flatMap(_ => succeed)
|
store(fullyTransientWithChildren).flatMap(_ => succeed)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def assertIsLedgerTimeLookupError(actualErrorString: String) = {
|
|
||||||
val errorString = "The following contracts have not been found"
|
|
||||||
actualErrorString should startWith(errorString)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,244 @@
|
|||||||
|
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package com.daml.platform.apiserver.execution
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
|
|
||||||
|
import com.codahale.metrics.MetricRegistry
|
||||||
|
import com.daml.error.ErrorCause
|
||||||
|
import com.daml.error.ErrorCause.LedgerTime
|
||||||
|
import com.daml.ledger.api.DeduplicationPeriod
|
||||||
|
import com.daml.ledger.api.DeduplicationPeriod.DeduplicationDuration
|
||||||
|
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId}
|
||||||
|
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
|
||||||
|
import com.daml.ledger.participant.state.index.v2.ContractStore
|
||||||
|
import com.daml.ledger.participant.state.v2.{SubmitterInfo, TransactionMeta}
|
||||||
|
import com.daml.lf.command.{Commands => LfCommands}
|
||||||
|
import com.daml.lf.crypto.Hash
|
||||||
|
import com.daml.lf.data.{ImmArray, Ref, Time}
|
||||||
|
import com.daml.lf.transaction.test.TransactionBuilder
|
||||||
|
import com.daml.lf.value.Value
|
||||||
|
import com.daml.lf.value.Value.ContractId
|
||||||
|
import com.daml.logging.LoggingContext
|
||||||
|
import com.daml.metrics.Metrics
|
||||||
|
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
|
||||||
|
import org.scalatest.matchers.should.Matchers
|
||||||
|
import org.scalatest.wordspec.AsyncWordSpec
|
||||||
|
|
||||||
|
import scala.concurrent.{ExecutionContext, Future}
|
||||||
|
import scala.util.{Failure, Success, Try}
|
||||||
|
|
||||||
|
class LedgerTimeAwareCommandExecutorSpec
|
||||||
|
extends AsyncWordSpec
|
||||||
|
with Matchers
|
||||||
|
with MockitoSugar
|
||||||
|
with ArgumentMatchersSugar {
|
||||||
|
|
||||||
|
val submissionSeed = Hash.hashPrivateKey("a key")
|
||||||
|
val configuration = Configuration(
|
||||||
|
generation = 1,
|
||||||
|
timeModel = LedgerTimeModel(
|
||||||
|
avgTransactionLatency = Duration.ZERO,
|
||||||
|
minSkew = Duration.ZERO,
|
||||||
|
maxSkew = Duration.ZERO,
|
||||||
|
).get,
|
||||||
|
maxDeduplicationTime = Duration.ZERO,
|
||||||
|
)
|
||||||
|
|
||||||
|
val cid = TransactionBuilder.newCid
|
||||||
|
val transaction = TransactionBuilder.justSubmitted(
|
||||||
|
TransactionBuilder().fetch(
|
||||||
|
TransactionBuilder().create(
|
||||||
|
cid,
|
||||||
|
Ref.Identifier(
|
||||||
|
Ref.PackageId.assertFromString("abc"),
|
||||||
|
Ref.QualifiedName.assertFromString("Main:Template"),
|
||||||
|
),
|
||||||
|
Value.ValueUnit,
|
||||||
|
Set.empty,
|
||||||
|
Set.empty,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
private def runExecutionTest(
|
||||||
|
dependsOnLedgerTime: Boolean,
|
||||||
|
contractStoreResults: List[Try[Option[Time.Timestamp]]],
|
||||||
|
finalExecutionResult: Either[ErrorCause, Time.Timestamp],
|
||||||
|
) = {
|
||||||
|
|
||||||
|
def commandExecutionResult(let: Time.Timestamp) = CommandExecutionResult(
|
||||||
|
SubmitterInfo(
|
||||||
|
Nil,
|
||||||
|
Ref.ApplicationId.assertFromString("foobar"),
|
||||||
|
Ref.CommandId.assertFromString("foobar"),
|
||||||
|
DeduplicationDuration(Duration.ofMinutes(1)),
|
||||||
|
None,
|
||||||
|
configuration,
|
||||||
|
),
|
||||||
|
TransactionMeta(let, None, Time.Timestamp.Epoch, submissionSeed, None, None, None),
|
||||||
|
transaction,
|
||||||
|
dependsOnLedgerTime,
|
||||||
|
5L,
|
||||||
|
)
|
||||||
|
|
||||||
|
val mockExecutor = mock[CommandExecutor]
|
||||||
|
when(
|
||||||
|
mockExecutor.execute(any[Commands], any[Hash], any[Configuration])(
|
||||||
|
any[ExecutionContext],
|
||||||
|
any[LoggingContext],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.thenAnswer((c: Commands) =>
|
||||||
|
Future.successful(Right(commandExecutionResult(c.commands.ledgerEffectiveTime)))
|
||||||
|
)
|
||||||
|
|
||||||
|
val mockContractStore = mock[ContractStore]
|
||||||
|
contractStoreResults.tail.foldLeft(
|
||||||
|
when(mockContractStore.lookupMaximumLedgerTime(any[Set[ContractId]])(any[LoggingContext]))
|
||||||
|
.thenReturn(Future.fromTry(contractStoreResults.head))
|
||||||
|
) { case (mock, result) =>
|
||||||
|
mock.andThen(Future.fromTry(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
val commands = Commands(
|
||||||
|
ledgerId = LedgerId("ledgerId"),
|
||||||
|
workflowId = None,
|
||||||
|
applicationId = ApplicationId(Ref.ApplicationId.assertFromString("applicationId")),
|
||||||
|
commandId = CommandId(Ref.CommandId.assertFromString("commandId")),
|
||||||
|
submissionId = None,
|
||||||
|
actAs = Set.empty,
|
||||||
|
readAs = Set.empty,
|
||||||
|
submittedAt = Time.Timestamp.Epoch,
|
||||||
|
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
|
||||||
|
commands = LfCommands(
|
||||||
|
commands = ImmArray.Empty,
|
||||||
|
ledgerEffectiveTime = Time.Timestamp.Epoch,
|
||||||
|
commandsReference = "",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
val instance = new LedgerTimeAwareCommandExecutor(
|
||||||
|
mockExecutor,
|
||||||
|
mockContractStore,
|
||||||
|
3,
|
||||||
|
new Metrics(new MetricRegistry),
|
||||||
|
)
|
||||||
|
LoggingContext.newLoggingContext { implicit context =>
|
||||||
|
instance.execute(commands, submissionSeed, configuration).map { actual =>
|
||||||
|
val expectedResult = finalExecutionResult.map(let =>
|
||||||
|
CommandExecutionResult(
|
||||||
|
SubmitterInfo(
|
||||||
|
Nil,
|
||||||
|
Ref.ApplicationId.assertFromString("foobar"),
|
||||||
|
Ref.CommandId.assertFromString("foobar"),
|
||||||
|
DeduplicationDuration(Duration.ofMinutes(1)),
|
||||||
|
None,
|
||||||
|
configuration,
|
||||||
|
),
|
||||||
|
TransactionMeta(let, None, Time.Timestamp.Epoch, submissionSeed, None, None, None),
|
||||||
|
transaction,
|
||||||
|
dependsOnLedgerTime,
|
||||||
|
5L,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
verify(mockExecutor, times(contractStoreResults.size)).execute(
|
||||||
|
any[Commands],
|
||||||
|
any[Hash],
|
||||||
|
any[Configuration],
|
||||||
|
)(any[ExecutionContext], any[LoggingContext])
|
||||||
|
verify(mockContractStore, times(contractStoreResults.size))
|
||||||
|
.lookupMaximumLedgerTime(Set(cid))
|
||||||
|
|
||||||
|
actual shouldEqual expectedResult
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val missingCid = Failure(MissingContracts(Set(cid)))
|
||||||
|
val foundEpoch = Success(Some(Time.Timestamp.Epoch))
|
||||||
|
val epochPlus5: Time.Timestamp = Time.Timestamp.Epoch.add(Duration.ofSeconds(5))
|
||||||
|
val foundEpochPlus5 = Success(Some(epochPlus5))
|
||||||
|
val noLetFound = Success(None)
|
||||||
|
|
||||||
|
"LedgerTimeAwareCommandExecutor" when {
|
||||||
|
"the model doesn't use getTime" should {
|
||||||
|
"not retry if ledger effective time is found in the contract store" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = false,
|
||||||
|
contractStoreResults = List(foundEpoch),
|
||||||
|
finalExecutionResult = Right(Time.Timestamp.Epoch),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
"not retry if the contract does not have ledger effective time" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = false,
|
||||||
|
contractStoreResults = List(noLetFound),
|
||||||
|
finalExecutionResult = Right(Time.Timestamp.Epoch),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
"retry if the contract cannot be found in the contract store and fail at max retries" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = false,
|
||||||
|
contractStoreResults = List(missingCid, missingCid, missingCid, missingCid),
|
||||||
|
finalExecutionResult = Left(LedgerTime(3)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
"succeed if the contract can be found on a retry" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = false,
|
||||||
|
contractStoreResults = List(missingCid, missingCid, missingCid, foundEpoch),
|
||||||
|
finalExecutionResult = Right(Time.Timestamp.Epoch),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
"advance the output time if the contract's LET is in the future" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = false,
|
||||||
|
contractStoreResults = List(foundEpochPlus5),
|
||||||
|
finalExecutionResult = Right(epochPlus5),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"the model uses getTime" should {
|
||||||
|
"retry if the contract's LET is in the future" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = true,
|
||||||
|
contractStoreResults = List(
|
||||||
|
// the first lookup of +5s will cause the interpretation to be restarted,
|
||||||
|
// in case the usage of getTime with a different LET would result in a different transaction
|
||||||
|
foundEpochPlus5,
|
||||||
|
// The second lookup finds the same ledger time again
|
||||||
|
foundEpochPlus5,
|
||||||
|
),
|
||||||
|
finalExecutionResult = Right(epochPlus5),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
"retry if the contract's LET is in the future and then retry if the contract is missing" in {
|
||||||
|
runExecutionTest(
|
||||||
|
dependsOnLedgerTime = true,
|
||||||
|
contractStoreResults = List(
|
||||||
|
// the first lookup of +5s will cause the interpretation to be restarted,
|
||||||
|
// in case the usage of getTime with a different LET would result in a different transaction
|
||||||
|
foundEpochPlus5,
|
||||||
|
// during the second interpretation the contract was actually archived
|
||||||
|
// and could not be found during the maximum ledger time lookup.
|
||||||
|
// this causes yet another restart of the interpretation.
|
||||||
|
missingCid,
|
||||||
|
// The third lookup finds the same ledger time again
|
||||||
|
foundEpochPlus5,
|
||||||
|
),
|
||||||
|
finalExecutionResult = Right(epochPlus5),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,8 @@ import java.sql.Connection
|
|||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
|
|
||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
|
|
||||||
final class PostCommitValidationSpec extends AnyWordSpec with Matchers {
|
final class PostCommitValidationSpec extends AnyWordSpec with Matchers {
|
||||||
@ -584,9 +586,7 @@ object PostCommitValidationSpec {
|
|||||||
l.fold(r)(left => r.fold(l)(right => if (left > right) l else r))
|
l.fold(r)(left => r.fold(l)(right => if (left > right) l else r))
|
||||||
|
|
||||||
private def notFound(contractIds: Set[ContractId]): Throwable =
|
private def notFound(contractIds: Set[ContractId]): Throwable =
|
||||||
new IllegalArgumentException(
|
MissingContracts(contractIds)
|
||||||
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
|
|
||||||
)
|
|
||||||
|
|
||||||
private def noCommittedContract(parties: List[PartyDetails]): ContractStoreFixture =
|
private def noCommittedContract(parties: List[PartyDetails]): ContractStoreFixture =
|
||||||
ContractStoreFixture(
|
ContractStoreFixture(
|
||||||
|
@ -20,13 +20,12 @@ import com.daml.lf.transaction.test.TransactionBuilder
|
|||||||
import com.daml.lf.value.Value.{ContractInstance, ValueRecord, ValueText}
|
import com.daml.lf.value.Value.{ContractInstance, ValueRecord, ValueText}
|
||||||
import com.daml.logging.LoggingContext
|
import com.daml.logging.LoggingContext
|
||||||
import com.daml.metrics.Metrics
|
import com.daml.metrics.Metrics
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
import com.daml.platform.store.EventSequentialId
|
import com.daml.platform.store.EventSequentialId
|
||||||
import com.daml.platform.store.appendonlydao.events.ContractStateEvent
|
import com.daml.platform.store.appendonlydao.events.ContractStateEvent
|
||||||
import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned}
|
import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned}
|
||||||
import com.daml.platform.store.cache.ContractStateValue.{Active, Archived}
|
import com.daml.platform.store.cache.ContractStateValue.{Active, Archived}
|
||||||
import com.daml.platform.store.cache.MutableCacheBackedContractStore.{
|
import com.daml.platform.store.cache.MutableCacheBackedContractStore.{
|
||||||
ContractNotFound,
|
|
||||||
EmptyContractIds,
|
|
||||||
EventSequentialId,
|
EventSequentialId,
|
||||||
SignalNewLedgerHead,
|
SignalNewLedgerHead,
|
||||||
SubscribeToContractStateEvents,
|
SubscribeToContractStateEvents,
|
||||||
@ -350,7 +349,7 @@ class MutableCacheBackedContractStoreSpec
|
|||||||
_ = store.cacheIndex.set(unusedOffset, 2L)
|
_ = store.cacheIndex.set(unusedOffset, 2L)
|
||||||
// populate the cache
|
// populate the cache
|
||||||
_ <- store.lookupActiveContract(Set(bob), cId_5)
|
_ <- store.lookupActiveContract(Set(bob), cId_5)
|
||||||
assertion <- recoverToSucceededIf[ContractNotFound](
|
assertion <- recoverToSucceededIf[MissingContracts](
|
||||||
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
|
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
|
||||||
)
|
)
|
||||||
} yield assertion
|
} yield assertion
|
||||||
@ -360,18 +359,11 @@ class MutableCacheBackedContractStoreSpec
|
|||||||
for {
|
for {
|
||||||
store <- contractStore(cachesSize = 0L).asFuture
|
store <- contractStore(cachesSize = 0L).asFuture
|
||||||
_ = store.cacheIndex.set(unusedOffset, 2L)
|
_ = store.cacheIndex.set(unusedOffset, 2L)
|
||||||
assertion <- recoverToSucceededIf[IllegalArgumentException](
|
assertion <- recoverToSucceededIf[MissingContracts](
|
||||||
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
|
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
|
||||||
)
|
)
|
||||||
} yield assertion
|
} yield assertion
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail if the requested contract id set is empty" in {
|
|
||||||
for {
|
|
||||||
store <- contractStore(cachesSize = 0L).asFuture
|
|
||||||
_ <- recoverToSucceededIf[EmptyContractIds](store.lookupMaximumLedgerTime(Set.empty))
|
|
||||||
} yield succeed
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def createdEvent(
|
private def createdEvent(
|
||||||
@ -496,14 +488,8 @@ object MutableCacheBackedContractStoreSpec {
|
|||||||
): Future[Option[Timestamp]] = ids match {
|
): Future[Option[Timestamp]] = ids match {
|
||||||
case setIds if setIds == Set(cId_4) =>
|
case setIds if setIds == Set(cId_4) =>
|
||||||
Future.successful(Some(t4))
|
Future.successful(Some(t4))
|
||||||
case set if set.isEmpty =>
|
|
||||||
Future.failed(EmptyContractIds())
|
|
||||||
case _ =>
|
case _ =>
|
||||||
Future.failed(
|
Future.failed(MissingContracts(ids))
|
||||||
new IllegalArgumentException(
|
|
||||||
s"The following contracts have not been found: ${ids.map(_.coid).mkString(", ")}"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def lookupActiveContractAndLoadArgument(
|
override def lookupActiveContractAndLoadArgument(
|
||||||
|
@ -53,6 +53,7 @@ import com.daml.lf.transaction.{
|
|||||||
}
|
}
|
||||||
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
|
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
|
||||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||||
|
import com.daml.platform.apiserver.execution.MissingContracts
|
||||||
import com.daml.platform.index.TransactionConversion
|
import com.daml.platform.index.TransactionConversion
|
||||||
import com.daml.platform.packages.InMemoryPackageStore
|
import com.daml.platform.packages.InMemoryPackageStore
|
||||||
import com.daml.platform.participant.util.LfEngineToApi
|
import com.daml.platform.participant.util.LfEngineToApi
|
||||||
@ -339,27 +340,19 @@ private[sandbox] final class InMemoryLedger(
|
|||||||
override def lookupMaximumLedgerTime(contractIds: Set[ContractId])(implicit
|
override def lookupMaximumLedgerTime(contractIds: Set[ContractId])(implicit
|
||||||
loggingContext: LoggingContext
|
loggingContext: LoggingContext
|
||||||
): Future[Option[Timestamp]] =
|
): Future[Option[Timestamp]] =
|
||||||
if (contractIds.isEmpty) {
|
Future.fromTry(Try(this.synchronized {
|
||||||
Future.failed(
|
contractIds
|
||||||
new IllegalArgumentException(
|
.foldLeft[Option[Instant]](None)((acc, id) => {
|
||||||
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
|
val let = acs.activeContracts
|
||||||
)
|
.getOrElse(
|
||||||
)
|
id,
|
||||||
} else {
|
throw MissingContracts(Set(id)),
|
||||||
Future.fromTry(Try(this.synchronized {
|
)
|
||||||
contractIds
|
.let
|
||||||
.foldLeft[Option[Instant]](Some(Instant.MIN))((acc, id) => {
|
acc.map(acc => if (let.isAfter(acc)) let else acc)
|
||||||
val let = acs.activeContracts
|
})
|
||||||
.getOrElse(
|
.map(Timestamp.assertFromInstant)
|
||||||
id,
|
}))
|
||||||
sys.error(s"Contract $id not found while looking for maximum ledger time"),
|
|
||||||
)
|
|
||||||
.let
|
|
||||||
acc.map(acc => if (let.isAfter(acc)) let else acc)
|
|
||||||
})
|
|
||||||
.map(Timestamp.assertFromInstant)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
override def publishTransaction(
|
override def publishTransaction(
|
||||||
submitterInfo: state.SubmitterInfo,
|
submitterInfo: state.SubmitterInfo,
|
||||||
|
Loading…
Reference in New Issue
Block a user