Restart the submission interpretation in case of a race [DPP-737] (#11579)

Command interpretation happens in two stages:
1. Daml interpretation
2. Determine a suitable ledger effective time

The race can happen in the following situation:
In 1) a contract key K is resolved to contract C1.
Between 1) and 2), a transaction is stored on the participant that
archives C1, but creates C2 with the same contract key K.
In 2) the ledger api server tries to lookup the ledger effective time
for all input contracts to the transaction. If it doesn't find one of
the input contracts at all, it can conclude that the transaction
wouldn't be accepted by the ledger anyway.

The behavior before this patch was to simply abort command
interpretation and return a rather cryptic error to the user.
With this patch, the ledger api server restarts the command
interpretation.
If the "missing contract" was an explicit input to the
command (e.g. as the contract for the exercise or as an argument to an
exercise), then this command will be rejected because the contract is
now archived.
If the contract ID was determined via a contract key lookup, then
restarting the interpretation will result in either a negative lookup or
a different contract ID for the new contract under this contract key.

CHANGELOG_BEGIN
[Ledger API] Retry the interpretation of a command in case of a race
with other transactions. This fix drastically reduces the likelihood of the error
"Could not find a suitable ledger time after 0 retries".
CHANGELOG_END
This commit is contained in:
Gerolf Seitz 2021-11-09 10:03:57 +01:00 committed by GitHub
parent 8d2b1b9ffe
commit dd1b0347ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 333 additions and 107 deletions

View File

@ -85,18 +85,32 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
Future.successful(Left(ErrorCause.LedgerTime(maxRetries))) Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
} }
} }
.recover { .recoverWith {
// An error while looking up the maximum ledger time for the used contracts
// most likely means that one of the contracts is already not active anymore, case MissingContracts(contracts) =>
// which can happen under contention. if (retriesLeft > 0) {
// A retry would only be successful in case the archived contracts were referenced by key. metrics.daml.execution.retry.mark()
// Direct references to archived contracts will result in the same error. logger.info(
s"Some input contracts could not be found. Restarting the computation. Missing contracts: ${contracts
.mkString("[", ", ", "]")}"
)
loop(commands, submissionSeed, ledgerConfiguration, retriesLeft - 1)
} else {
logger.info(
s"Lookup of maximum ledger time failed after ${maxRetries - retriesLeft}. Used contracts: ${usedContractIds
.mkString("[", ", ", "]")}."
)
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
}
// An error while looking up the maximum ledger time for the used contracts. The nature of this error is not known.
// Not retrying automatically. All other automatically retryable cases are covered by the logic above.
case error => case error =>
logger.info( logger.info(
s"Lookup of maximum ledger time failed. This can happen if there is contention on contracts used by the transaction. Used contracts: ${usedContractIds s"Lookup of maximum ledger time failed after ${maxRetries - retriesLeft}. Used contracts: ${usedContractIds
.mkString(", ")}. Details: $error" .mkString("[", ", ", "]")}. Details: $error"
) )
Left(ErrorCause.LedgerTime(maxRetries - retriesLeft)) Future.successful(Left(ErrorCause.LedgerTime(maxRetries - retriesLeft)))
} }
} }
@ -113,3 +127,5 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
private[this] def advanceInputTime(cmd: Commands, newTime: Option[Time.Timestamp]): Commands = private[this] def advanceInputTime(cmd: Commands, newTime: Option[Time.Timestamp]): Commands =
newTime.fold(cmd)(t => cmd.copy(commands = cmd.commands.copy(ledgerEffectiveTime = t))) newTime.fold(cmd)(t => cmd.copy(commands = cmd.commands.copy(ledgerEffectiveTime = t)))
} }
case class MissingContracts(contracts: Set[ContractId]) extends RuntimeException

View File

@ -9,6 +9,7 @@ import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~} import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
import com.daml.lf.data.Ref import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp import com.daml.lf.data.Time.Timestamp
import com.daml.platform.apiserver.execution.MissingContracts
import com.daml.platform.store.Conversions.{ import com.daml.platform.store.Conversions.{
contractId, contractId,
flatEventWitnessesColumn, flatEventWitnessesColumn,
@ -49,11 +50,6 @@ class ContractStorageBackendTemplate(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers" "Cannot lookup the maximum ledger time for an empty set of contract identifiers"
) )
private def notFound(missingContractIds: Set[ContractId]): Throwable =
new IllegalArgumentException(
s"The following contracts have not been found: ${missingContractIds.map(_.coid).mkString(", ")}"
)
protected def maximumLedgerTimeSqlLiteral( protected def maximumLedgerTimeSqlLiteral(
id: ContractId, id: ContractId,
lastEventSequentialId: Long, lastEventSequentialId: Long,
@ -124,7 +120,7 @@ class ContractStorageBackendTemplate(
val missingIds = queriedIds.collect { case (missingId, None) => val missingIds = queriedIds.collect { case (missingId, None) =>
missingId missingId
} }
Failure(notFound(missingIds.toSet)) Failure(MissingContracts(missingIds.toSet))
} else Success(foundLedgerEffectiveTimes.max) } else Success(foundLedgerEffectiveTimes.max)
} }
} }

View File

@ -25,12 +25,14 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
ContractState, ContractState,
KeyState, KeyState,
} }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import com.daml.platform.apiserver.execution.MissingContracts
import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try} import scala.util.{Success, Try}
private[platform] class MutableCacheBackedContractStore( private[platform] class MutableCacheBackedContractStore(
metrics: Metrics, metrics: Metrics,
@ -73,9 +75,6 @@ private[platform] class MutableCacheBackedContractStore(
override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext loggingContext: LoggingContext
): Future[Option[Timestamp]] = ): Future[Option[Timestamp]] =
if (ids.isEmpty)
Future.failed(EmptyContractIds())
else {
Future Future
.fromTry(partitionCached(ids)) .fromTry(partitionCached(ids))
.flatMap { .flatMap {
@ -86,28 +85,36 @@ private[platform] class MutableCacheBackedContractStore(
.lookupMaximumLedgerTime(toBeFetched) .lookupMaximumLedgerTime(toBeFetched)
.map(_.map(m => (cached + m).max)) .map(_.map(m => (cached + m).max))
} }
}
private def partitionCached( private def partitionCached(
ids: Set[ContractId] ids: Set[ContractId]
)(implicit loggingContext: LoggingContext) = { )(implicit
loggingContext: LoggingContext
): Try[(Set[Timestamp], Set[ContractId])] = {
val cacheQueried = ids.map(id => id -> contractsCache.get(id)) val cacheQueried = ids.map(id => id -> contractsCache.get(id))
val cached = cacheQueried.view val cached = cacheQueried.view
.map(_._2) .foldLeft[Either[Set[ContractId], Set[Timestamp]]](Right(Set.empty[Timestamp])) {
.foldLeft(Try(Set.empty[Timestamp])) { // successful lookups
case (Success(timestamps), Some(active: Active)) => case (Right(timestamps), (_, Some(active: Active))) =>
Success(timestamps + active.createLedgerEffectiveTime) Right(timestamps + active.createLedgerEffectiveTime)
case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids)) case (Right(timestamps), (_, None)) => Right(timestamps)
case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids))
case (Success(timestamps), None) => Success(timestamps) // failure cases
case (failure, _) => failure case (acc, (cid, Some(Archived(_) | NotFound))) =>
val missingContracts = acc.left.getOrElse(Set.empty) + cid
Left(missingContracts)
case (acc @ Left(_), _) => acc
} }
cached.map { cached => cached
.map { cached =>
val missing = cacheQueried.collect { case (id, None) => id } val missing = cacheQueried.collect { case (id, None) => id }
(cached, missing) (cached, missing)
} }
.left
.map(MissingContracts)
.toTry
} }
private def readThroughContractsCache(contractId: ContractId)(implicit private def readThroughContractsCache(contractId: ContractId)(implicit
@ -379,16 +386,6 @@ private[platform] object MutableCacheBackedContractStore {
}.map(_ => contractStore) }.map(_ => contractStore)
} }
final case class ContractNotFound(contractIds: Set[ContractId])
extends IllegalArgumentException(
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
)
final case class EmptyContractIds()
extends IllegalArgumentException(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)
final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace { final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
override def getMessage: String = override def getMessage: String =
s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival." s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival."

View File

@ -4,11 +4,12 @@
package com.daml.platform.store.dao package com.daml.platform.store.dao
import com.daml.lf.data.Time.Timestamp import com.daml.lf.data.Time.Timestamp
import java.util.UUID import java.util.UUID
import com.daml.lf.transaction.GlobalKey import com.daml.lf.transaction.GlobalKey
import com.daml.lf.transaction.Node.KeyWithMaintainers import com.daml.lf.transaction.Node.KeyWithMaintainers
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance, ValueText} import com.daml.lf.value.Value.{ContractId, ValueText, VersionedContractInstance}
import com.daml.platform.apiserver.execution.MissingContracts
import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import org.scalatest.{Inside, LoneElement, OptionValues} import org.scalatest.{Inside, LoneElement, OptionValues}
@ -167,8 +168,7 @@ private[dao] trait JdbcLedgerDaoContractsSpec extends LoneElement with Inside wi
for { for {
failure <- contractsReader.lookupMaximumLedgerTime(Set(randomContractId)).failed failure <- contractsReader.lookupMaximumLedgerTime(Set(randomContractId)).failed
} yield { } yield {
failure shouldBe an[IllegalArgumentException] failure shouldBe an[MissingContracts]
assertIsLedgerTimeLookupError(failure.getMessage)
} }
} }
@ -248,17 +248,11 @@ private[dao] trait JdbcLedgerDaoContractsSpec extends LoneElement with Inside wi
_ <- store(singleExercise(divulgedContractId)) _ <- store(singleExercise(divulgedContractId))
failure <- contractsReader.lookupMaximumLedgerTime(Set(divulgedContractId)).failed failure <- contractsReader.lookupMaximumLedgerTime(Set(divulgedContractId)).failed
} yield { } yield {
failure shouldBe an[IllegalArgumentException] failure shouldBe an[MissingContracts]
assertIsLedgerTimeLookupError(failure.getMessage)
} }
} }
it should "store contracts with a transient contract in the global divulgence" in { it should "store contracts with a transient contract in the global divulgence" in {
store(fullyTransientWithChildren).flatMap(_ => succeed) store(fullyTransientWithChildren).flatMap(_ => succeed)
} }
private[this] def assertIsLedgerTimeLookupError(actualErrorString: String) = {
val errorString = "The following contracts have not been found"
actualErrorString should startWith(errorString)
}
} }

View File

@ -0,0 +1,244 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.execution
import java.time.Duration
import com.codahale.metrics.MetricRegistry
import com.daml.error.ErrorCause
import com.daml.error.ErrorCause.LedgerTime
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.DeduplicationPeriod.DeduplicationDuration
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.ContractStore
import com.daml.ledger.participant.state.v2.{SubmitterInfo, TransactionMeta}
import com.daml.lf.command.{Commands => LfCommands}
import com.daml.lf.crypto.Hash
import com.daml.lf.data.{ImmArray, Ref, Time}
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value
import com.daml.lf.value.Value.ContractId
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
class LedgerTimeAwareCommandExecutorSpec
extends AsyncWordSpec
with Matchers
with MockitoSugar
with ArgumentMatchersSugar {
val submissionSeed = Hash.hashPrivateKey("a key")
val configuration = Configuration(
generation = 1,
timeModel = LedgerTimeModel(
avgTransactionLatency = Duration.ZERO,
minSkew = Duration.ZERO,
maxSkew = Duration.ZERO,
).get,
maxDeduplicationTime = Duration.ZERO,
)
val cid = TransactionBuilder.newCid
val transaction = TransactionBuilder.justSubmitted(
TransactionBuilder().fetch(
TransactionBuilder().create(
cid,
Ref.Identifier(
Ref.PackageId.assertFromString("abc"),
Ref.QualifiedName.assertFromString("Main:Template"),
),
Value.ValueUnit,
Set.empty,
Set.empty,
)
)
)
private def runExecutionTest(
dependsOnLedgerTime: Boolean,
contractStoreResults: List[Try[Option[Time.Timestamp]]],
finalExecutionResult: Either[ErrorCause, Time.Timestamp],
) = {
def commandExecutionResult(let: Time.Timestamp) = CommandExecutionResult(
SubmitterInfo(
Nil,
Ref.ApplicationId.assertFromString("foobar"),
Ref.CommandId.assertFromString("foobar"),
DeduplicationDuration(Duration.ofMinutes(1)),
None,
configuration,
),
TransactionMeta(let, None, Time.Timestamp.Epoch, submissionSeed, None, None, None),
transaction,
dependsOnLedgerTime,
5L,
)
val mockExecutor = mock[CommandExecutor]
when(
mockExecutor.execute(any[Commands], any[Hash], any[Configuration])(
any[ExecutionContext],
any[LoggingContext],
)
)
.thenAnswer((c: Commands) =>
Future.successful(Right(commandExecutionResult(c.commands.ledgerEffectiveTime)))
)
val mockContractStore = mock[ContractStore]
contractStoreResults.tail.foldLeft(
when(mockContractStore.lookupMaximumLedgerTime(any[Set[ContractId]])(any[LoggingContext]))
.thenReturn(Future.fromTry(contractStoreResults.head))
) { case (mock, result) =>
mock.andThen(Future.fromTry(result))
}
val commands = Commands(
ledgerId = LedgerId("ledgerId"),
workflowId = None,
applicationId = ApplicationId(Ref.ApplicationId.assertFromString("applicationId")),
commandId = CommandId(Ref.CommandId.assertFromString("commandId")),
submissionId = None,
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Time.Timestamp.Epoch,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
commands = LfCommands(
commands = ImmArray.Empty,
ledgerEffectiveTime = Time.Timestamp.Epoch,
commandsReference = "",
),
)
val instance = new LedgerTimeAwareCommandExecutor(
mockExecutor,
mockContractStore,
3,
new Metrics(new MetricRegistry),
)
LoggingContext.newLoggingContext { implicit context =>
instance.execute(commands, submissionSeed, configuration).map { actual =>
val expectedResult = finalExecutionResult.map(let =>
CommandExecutionResult(
SubmitterInfo(
Nil,
Ref.ApplicationId.assertFromString("foobar"),
Ref.CommandId.assertFromString("foobar"),
DeduplicationDuration(Duration.ofMinutes(1)),
None,
configuration,
),
TransactionMeta(let, None, Time.Timestamp.Epoch, submissionSeed, None, None, None),
transaction,
dependsOnLedgerTime,
5L,
)
)
verify(mockExecutor, times(contractStoreResults.size)).execute(
any[Commands],
any[Hash],
any[Configuration],
)(any[ExecutionContext], any[LoggingContext])
verify(mockContractStore, times(contractStoreResults.size))
.lookupMaximumLedgerTime(Set(cid))
actual shouldEqual expectedResult
}
}
}
val missingCid = Failure(MissingContracts(Set(cid)))
val foundEpoch = Success(Some(Time.Timestamp.Epoch))
val epochPlus5: Time.Timestamp = Time.Timestamp.Epoch.add(Duration.ofSeconds(5))
val foundEpochPlus5 = Success(Some(epochPlus5))
val noLetFound = Success(None)
"LedgerTimeAwareCommandExecutor" when {
"the model doesn't use getTime" should {
"not retry if ledger effective time is found in the contract store" in {
runExecutionTest(
dependsOnLedgerTime = false,
contractStoreResults = List(foundEpoch),
finalExecutionResult = Right(Time.Timestamp.Epoch),
)
}
"not retry if the contract does not have ledger effective time" in {
runExecutionTest(
dependsOnLedgerTime = false,
contractStoreResults = List(noLetFound),
finalExecutionResult = Right(Time.Timestamp.Epoch),
)
}
"retry if the contract cannot be found in the contract store and fail at max retries" in {
runExecutionTest(
dependsOnLedgerTime = false,
contractStoreResults = List(missingCid, missingCid, missingCid, missingCid),
finalExecutionResult = Left(LedgerTime(3)),
)
}
"succeed if the contract can be found on a retry" in {
runExecutionTest(
dependsOnLedgerTime = false,
contractStoreResults = List(missingCid, missingCid, missingCid, foundEpoch),
finalExecutionResult = Right(Time.Timestamp.Epoch),
)
}
"advance the output time if the contract's LET is in the future" in {
runExecutionTest(
dependsOnLedgerTime = false,
contractStoreResults = List(foundEpochPlus5),
finalExecutionResult = Right(epochPlus5),
)
}
}
"the model uses getTime" should {
"retry if the contract's LET is in the future" in {
runExecutionTest(
dependsOnLedgerTime = true,
contractStoreResults = List(
// the first lookup of +5s will cause the interpretation to be restarted,
// in case the usage of getTime with a different LET would result in a different transaction
foundEpochPlus5,
// The second lookup finds the same ledger time again
foundEpochPlus5,
),
finalExecutionResult = Right(epochPlus5),
)
}
"retry if the contract's LET is in the future and then retry if the contract is missing" in {
runExecutionTest(
dependsOnLedgerTime = true,
contractStoreResults = List(
// the first lookup of +5s will cause the interpretation to be restarted,
// in case the usage of getTime with a different LET would result in a different transaction
foundEpochPlus5,
// during the second interpretation the contract was actually archived
// and could not be found during the maximum ledger time lookup.
// this causes yet another restart of the interpretation.
missingCid,
// The third lookup finds the same ledger time again
foundEpochPlus5,
),
finalExecutionResult = Right(epochPlus5),
)
}
}
}
}

View File

@ -19,6 +19,8 @@ import java.sql.Connection
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import com.daml.platform.apiserver.execution.MissingContracts
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
final class PostCommitValidationSpec extends AnyWordSpec with Matchers { final class PostCommitValidationSpec extends AnyWordSpec with Matchers {
@ -584,9 +586,7 @@ object PostCommitValidationSpec {
l.fold(r)(left => r.fold(l)(right => if (left > right) l else r)) l.fold(r)(left => r.fold(l)(right => if (left > right) l else r))
private def notFound(contractIds: Set[ContractId]): Throwable = private def notFound(contractIds: Set[ContractId]): Throwable =
new IllegalArgumentException( MissingContracts(contractIds)
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
)
private def noCommittedContract(parties: List[PartyDetails]): ContractStoreFixture = private def noCommittedContract(parties: List[PartyDetails]): ContractStoreFixture =
ContractStoreFixture( ContractStoreFixture(

View File

@ -20,13 +20,12 @@ import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value.{ContractInstance, ValueRecord, ValueText} import com.daml.lf.value.Value.{ContractInstance, ValueRecord, ValueText}
import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics import com.daml.metrics.Metrics
import com.daml.platform.apiserver.execution.MissingContracts
import com.daml.platform.store.EventSequentialId import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.appendonlydao.events.ContractStateEvent import com.daml.platform.store.appendonlydao.events.ContractStateEvent
import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned} import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned}
import com.daml.platform.store.cache.ContractStateValue.{Active, Archived} import com.daml.platform.store.cache.ContractStateValue.{Active, Archived}
import com.daml.platform.store.cache.MutableCacheBackedContractStore.{ import com.daml.platform.store.cache.MutableCacheBackedContractStore.{
ContractNotFound,
EmptyContractIds,
EventSequentialId, EventSequentialId,
SignalNewLedgerHead, SignalNewLedgerHead,
SubscribeToContractStateEvents, SubscribeToContractStateEvents,
@ -350,7 +349,7 @@ class MutableCacheBackedContractStoreSpec
_ = store.cacheIndex.set(unusedOffset, 2L) _ = store.cacheIndex.set(unusedOffset, 2L)
// populate the cache // populate the cache
_ <- store.lookupActiveContract(Set(bob), cId_5) _ <- store.lookupActiveContract(Set(bob), cId_5)
assertion <- recoverToSucceededIf[ContractNotFound]( assertion <- recoverToSucceededIf[MissingContracts](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5)) store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
) )
} yield assertion } yield assertion
@ -360,18 +359,11 @@ class MutableCacheBackedContractStoreSpec
for { for {
store <- contractStore(cachesSize = 0L).asFuture store <- contractStore(cachesSize = 0L).asFuture
_ = store.cacheIndex.set(unusedOffset, 2L) _ = store.cacheIndex.set(unusedOffset, 2L)
assertion <- recoverToSucceededIf[IllegalArgumentException]( assertion <- recoverToSucceededIf[MissingContracts](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5)) store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
) )
} yield assertion } yield assertion
} }
"fail if the requested contract id set is empty" in {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ <- recoverToSucceededIf[EmptyContractIds](store.lookupMaximumLedgerTime(Set.empty))
} yield succeed
}
} }
private def createdEvent( private def createdEvent(
@ -496,14 +488,8 @@ object MutableCacheBackedContractStoreSpec {
): Future[Option[Timestamp]] = ids match { ): Future[Option[Timestamp]] = ids match {
case setIds if setIds == Set(cId_4) => case setIds if setIds == Set(cId_4) =>
Future.successful(Some(t4)) Future.successful(Some(t4))
case set if set.isEmpty =>
Future.failed(EmptyContractIds())
case _ => case _ =>
Future.failed( Future.failed(MissingContracts(ids))
new IllegalArgumentException(
s"The following contracts have not been found: ${ids.map(_.coid).mkString(", ")}"
)
)
} }
override def lookupActiveContractAndLoadArgument( override def lookupActiveContractAndLoadArgument(

View File

@ -53,6 +53,7 @@ import com.daml.lf.transaction.{
} }
import com.daml.lf.value.Value.{ContractId, VersionedContractInstance} import com.daml.lf.value.Value.{ContractId, VersionedContractInstance}
import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.apiserver.execution.MissingContracts
import com.daml.platform.index.TransactionConversion import com.daml.platform.index.TransactionConversion
import com.daml.platform.packages.InMemoryPackageStore import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.participant.util.LfEngineToApi import com.daml.platform.participant.util.LfEngineToApi
@ -339,27 +340,19 @@ private[sandbox] final class InMemoryLedger(
override def lookupMaximumLedgerTime(contractIds: Set[ContractId])(implicit override def lookupMaximumLedgerTime(contractIds: Set[ContractId])(implicit
loggingContext: LoggingContext loggingContext: LoggingContext
): Future[Option[Timestamp]] = ): Future[Option[Timestamp]] =
if (contractIds.isEmpty) {
Future.failed(
new IllegalArgumentException(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)
)
} else {
Future.fromTry(Try(this.synchronized { Future.fromTry(Try(this.synchronized {
contractIds contractIds
.foldLeft[Option[Instant]](Some(Instant.MIN))((acc, id) => { .foldLeft[Option[Instant]](None)((acc, id) => {
val let = acs.activeContracts val let = acs.activeContracts
.getOrElse( .getOrElse(
id, id,
sys.error(s"Contract $id not found while looking for maximum ledger time"), throw MissingContracts(Set(id)),
) )
.let .let
acc.map(acc => if (let.isAfter(acc)) let else acc) acc.map(acc => if (let.isAfter(acc)) let else acc)
}) })
.map(Timestamp.assertFromInstant) .map(Timestamp.assertFromInstant)
})) }))
}
override def publishTransaction( override def publishTransaction(
submitterInfo: state.SubmitterInfo, submitterInfo: state.SubmitterInfo,