update canton to 20240529.13387.v44c5d50b (#19300)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-05-30 09:46:31 +02:00 committed by GitHub
parent 78ccafad33
commit dd76e2da2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 610 additions and 354 deletions

View File

@ -19,7 +19,11 @@ import com.digitalasset.canton.time.{Clock, DomainTimeTracker}
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.PartyTopologySnapshotClient.PartyInfo
import com.digitalasset.canton.topology.processing.*
import com.digitalasset.canton.topology.store.{TopologyStore, TopologyStoreId}
import com.digitalasset.canton.topology.store.{
PackageDependencyResolverUS,
TopologyStore,
TopologyStoreId,
}
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.{TraceContext, TracedScaffeine}
@ -184,7 +188,7 @@ object CachingDomainTopologyClient {
domainId: DomainId,
protocolVersion: ProtocolVersion,
store: TopologyStore[TopologyStoreId.DomainStore],
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependenciesResolver: PackageDependencyResolverUS,
cachingConfigs: CachingConfigs,
batchingConfig: BatchingConfig,
timeouts: ProcessingTimeout,
@ -200,7 +204,7 @@ object CachingDomainTopologyClient {
domainId,
protocolVersion,
store,
packageDependencies,
packageDependenciesResolver,
timeouts,
futureSupervisor,
loggerFactory,
@ -264,13 +268,13 @@ private class ForwardingTopologySnapshotClient(
override def findUnvettedPackagesOrDependencies(
participantId: ParticipantId,
packages: Set[PackageId],
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] =
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
parent.findUnvettedPackagesOrDependencies(participantId, packages)
override private[client] def loadUnvettedPackagesOrDependencies(
participant: ParticipantId,
packageId: PackageId,
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] =
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
parent.loadUnvettedPackagesOrDependencies(participant, packageId)
/** returns the list of currently known mediators */
@ -388,7 +392,7 @@ class CachingTopologySnapshot(
private val packageVettingCache =
TracedScaffeine
.buildTracedAsyncFuture[(ParticipantId, PackageId), Either[PackageId, Set[PackageId]]](
.buildTracedAsyncFutureUS[(ParticipantId, PackageId), Either[PackageId, Set[PackageId]]](
cache = cachingConfigs.packageVettingCache.buildScaffeine(),
traceContext => x => loadUnvettedPackagesOrDependencies(x._1, x._2)(traceContext).value,
)(logger)
@ -457,20 +461,21 @@ class CachingTopologySnapshot(
participants: Seq[ParticipantId]
)(implicit traceContext: TraceContext): Future[Map[ParticipantId, ParticipantAttributes]] =
participantCache.getAll(participants).map(_.collect { case (k, Some(v)) => (k, v) })
override def findUnvettedPackagesOrDependencies(
participantId: ParticipantId,
packages: Set[PackageId],
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] =
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
findUnvettedPackagesOrDependenciesUsingLoader(
participantId,
packages,
(x, y) => EitherT(packageVettingCache.get((x, y))),
(x, y) => EitherT(packageVettingCache.getUS((x, y))),
)
private[client] def loadUnvettedPackagesOrDependencies(
participant: ParticipantId,
packageId: PackageId,
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] =
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
parent.loadUnvettedPackagesOrDependencies(participant, packageId)
override def inspectKeys(

View File

@ -33,7 +33,6 @@ import com.digitalasset.canton.topology.client.PartyTopologySnapshotClient.{
import com.digitalasset.canton.topology.processing.TopologyTransactionProcessingSubscriber
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.SingleUseCell
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{LfPartyId, checked}
@ -475,8 +474,7 @@ trait VettedPackagesSnapshotClient {
def findUnvettedPackagesOrDependencies(
participantId: ParticipantId,
packages: Set[PackageId],
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]]
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]]
}
trait DomainGovernanceSnapshotClient {
@ -886,21 +884,21 @@ trait VettedPackagesSnapshotLoader extends VettedPackagesSnapshotClient {
private[client] def loadUnvettedPackagesOrDependencies(
participant: ParticipantId,
packageId: PackageId,
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]]
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]]
protected def findUnvettedPackagesOrDependenciesUsingLoader(
participantId: ParticipantId,
packages: Set[PackageId],
loader: (ParticipantId, PackageId) => EitherT[Future, PackageId, Set[PackageId]],
): EitherT[Future, PackageId, Set[PackageId]] =
loader: (ParticipantId, PackageId) => EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]],
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
packages.toList
.parFlatTraverse(packageId => loader(participantId, packageId).map(_.toList))
.map(_.toSet)
.parTraverse(packageId => loader(participantId, packageId))
.map(_.flatten.toSet)
override def findUnvettedPackagesOrDependencies(
participantId: ParticipantId,
packages: Set[PackageId],
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] =
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
findUnvettedPackagesOrDependenciesUsingLoader(
participantId,
packages,

View File

@ -4,7 +4,6 @@
package com.digitalasset.canton.topology.client
import cats.data.EitherT
import cats.syntax.either.*
import cats.syntax.functor.*
import com.daml.lf.data.Ref.PackageId
import com.daml.nameof.NameOf.functionFullName
@ -18,7 +17,11 @@ import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.time.{Clock, TimeAwaiter}
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.topology.processing.{ApproximateTime, EffectiveTime, SequencedTime}
import com.digitalasset.canton.topology.store.{TopologyStore, TopologyStoreId}
import com.digitalasset.canton.topology.store.{
PackageDependencyResolverUS,
TopologyStore,
TopologyStoreId,
}
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ErrorUtil
@ -116,7 +119,7 @@ class StoreBasedDomainTopologyClient(
val domainId: DomainId,
protocolVersion: ProtocolVersion,
store: TopologyStore[TopologyStoreId],
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependenciesResolver: PackageDependencyResolverUS,
override val timeouts: ProcessingTimeout,
override protected val futureSupervisor: FutureSupervisor,
val loggerFactory: NamedLoggerFactory,
@ -246,7 +249,7 @@ class StoreBasedDomainTopologyClient(
new StoreBasedTopologySnapshot(
timestamp,
store,
packageDependencies,
packageDependenciesResolver,
loggerFactory,
)
}
@ -315,7 +318,12 @@ class StoreBasedDomainTopologyClient(
object StoreBasedDomainTopologyClient {
def NoPackageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]] = { _ =>
EitherT(Future.successful(Either.right(Set.empty[PackageId])))
object NoPackageDependencies extends PackageDependencyResolverUS {
override def packageDependencies(packagesId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] =
EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]](
FutureUnlessShutdown.pure(Right(Set.empty[PackageId]))
)
}
}

View File

@ -34,12 +34,12 @@ import scala.reflect.ClassTag
*
* @param timestamp the asOf timestamp to use
* @param store the db store to use
* @param packageDependencies lookup function to determine the direct and indirect package dependencies
* @param packageDependencyResolver provides a way determine the direct and indirect package dependencies
*/
class StoreBasedTopologySnapshot(
val timestamp: CantonTimestamp,
store: TopologyStore[TopologyStoreId],
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
val loggerFactory: NamedLoggerFactory,
)(implicit val executionContext: ExecutionContext)
extends TopologySnapshotLoader
@ -66,9 +66,11 @@ class StoreBasedTopologySnapshot(
override private[client] def loadUnvettedPackagesOrDependencies(
participant: ParticipantId,
packageId: PackageId,
)(implicit traceContext: TraceContext): EitherT[Future, PackageId, Set[PackageId]] = {
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] = {
val vettedET = EitherT.right[PackageId](
val vettedF = FutureUnlessShutdown.outcomeF(
findTransactions(
asOfInclusive = false,
types = Seq(TopologyMapping.Code.VettedPackages),
@ -82,9 +84,9 @@ class StoreBasedTopologySnapshot(
}
)
val requiredPackagesET = store.storeId match {
val requiredPackagesF = store.storeId match {
case _: TopologyStoreId.DomainStore =>
EitherT.right[PackageId](
FutureUnlessShutdown.outcomeF(
findTransactions(
asOfInclusive = false,
types = Seq(TopologyMapping.Code.DomainParametersState),
@ -96,31 +98,30 @@ class StoreBasedTopologySnapshot(
transactions.collectOfMapping[DomainParametersState].result,
).getOrElse(throw new IllegalStateException("Unable to locate domain parameters state"))
.discard
// TODO(#14054) Once the non-proto DynamicDomainParameters is available, use it
// _.parameters.requiredPackages
Seq.empty[PackageId]
Set.empty[PackageId]
}
)
case TopologyStoreId.AuthorizedStore =>
EitherT.pure[Future, PackageId](Seq.empty)
FutureUnlessShutdown.pure(Set.empty)
}
lazy val dependenciesET = packageDependencies(packageId)
lazy val dependenciesET = packageDependencyResolver.packageDependencies(packageId).value
for {
vetted <- vettedET
requiredPackages <- requiredPackagesET
EitherT(for {
vetted <- vettedF
requiredPackages <- requiredPackagesF
// check that the main package is vetted
res <-
if (!vetted.contains(packageId))
EitherT.rightT[Future, PackageId](Set(packageId)) // main package is not vetted
FutureUnlessShutdown.pure(Right(Set(packageId))) // main package is not vetted
else {
// check which of the dependencies aren't vetted
dependenciesET.map(deps => (deps ++ requiredPackages) -- vetted)
dependenciesET.map(_.map(_ ++ requiredPackages -- vetted))
}
} yield res
} yield res)
}

View File

@ -3,7 +3,9 @@
package com.digitalasset.canton.topology.store
import cats.data.EitherT
import cats.syntax.traverse.*
import com.daml.lf.data.Ref.PackageId
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.config.CantonRequireTypes.{LengthLimitedString, String255}
@ -549,3 +551,11 @@ object TimeQuery {
} yield Range(fromO, toO)
}
}
trait PackageDependencyResolverUS {
def packageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]]
}

View File

@ -3,6 +3,7 @@
package com.digitalasset.canton.tracing
import com.daml.scalautil.Statement.discard
import com.digitalasset.canton.concurrent.DirectExecutionContext
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.lifecycle.FutureUnlessShutdownImpl.AbortedDueToShutdownException
@ -79,6 +80,11 @@ class TracedAsyncLoadingCache[K, V](
def get(key: K)(implicit traceContext: TraceContext): Future[V] =
underlying.get(TracedKey(key)(traceContext))
// Remove those mappings for which the predicate p returns true
def clear(filter: (K, V) => Boolean): Unit = {
discard(underlying.synchronous().asMap().filterInPlace((t, v) => !filter(t.key, v)))
}
def getUS(key: K)(implicit traceContext: TraceContext): FutureUnlessShutdown[V] = {
try
FutureUnlessShutdown.outcomeF(underlying.get(TracedKey(key)(traceContext)))

View File

@ -8,6 +8,7 @@ import com.digitalasset.canton.config.CachingConfigs
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, UnlessShutdown}
import org.scalatest.wordspec.AsyncWordSpec
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
class TracedScaffeineTest extends AsyncWordSpec with BaseTest {
@ -45,6 +46,35 @@ class TracedScaffeineTest extends AsyncWordSpec with BaseTest {
succeed
}
}.failOnShutdown
"Allow entries to be cleared" in {
val loads = new AtomicInteger(0)
def getValueCount(input: Int): FutureUnlessShutdown[Int] = {
loads.incrementAndGet()
FutureUnlessShutdown.pure(input)
}
val keysCache =
TracedScaffeine.buildTracedAsyncFutureUS[Int, Int](
cache = CachingConfigs.testing.mySigningKeyCache.buildScaffeine(),
loader = traceContext => input => getValueCount(input),
)(logger)
for {
_ <- keysCache.getUS(2)
_ <- keysCache.getUS(3)
_ <- keysCache.getAllUS(Seq(2, 3)).map { m =>
keysCache.clear((i, _) => i == 2)
m
}
_ <- keysCache.getUS(2)
_ <- keysCache.getUS(3)
} yield {
loads.get() shouldBe 3 // Initial 2 + 1 reload
}
}.failOnShutdown
}
}

View File

@ -70,7 +70,8 @@ final class ApiUpdateService(
validation.fold(
t => Source.failed(ValidationLogger.logFailureWithTrace(logger, request, t)),
req =>
if (req.filter.filtersByParty.isEmpty) Source.empty
if (req.filter.filtersByParty.isEmpty && req.filter.filtersForAnyParty.isEmpty)
Source.empty
else {
LoggingContextWithTrace.withEnrichedLoggingContext(
logging.startExclusive(req.startExclusive),
@ -118,7 +119,8 @@ final class ApiUpdateService(
validation.fold(
t => Source.failed(ValidationLogger.logFailureWithTrace(logger, request, t)),
req =>
if (req.filter.filtersByParty.isEmpty) Source.empty
if (req.filter.filtersByParty.isEmpty && req.filter.filtersForAnyParty.isEmpty)
Source.empty
else {
LoggingContextWithTrace.withEnrichedLoggingContext(
logging.startExclusive(req.startExclusive),

View File

@ -170,7 +170,10 @@ private[index] class IndexServiceImpl(
transactionFilter,
getPackageMetadataSnapshot(contextualizedErrorLogger),
) {
val parties = transactionFilter.filtersByParty.keySet
val parties =
if (transactionFilter.filtersForAnyParty.isEmpty)
Some(transactionFilter.filtersByParty.keySet)
else None // party-wildcard
between(startExclusive, endInclusive) { (from, to) =>
from.foreach(offset =>
Spans.setCurrentSpanAttribute(SpanAttribute.OffsetFrom, offset.toHexString)
@ -196,7 +199,9 @@ private[index] class IndexServiceImpl(
.getTransactionTrees(
startExclusive,
endInclusive,
parties, // on the query filter side we treat every party as template-wildcard party // TODO(#18362) [transaction trees] take into account party-wildcard
// on the query filter side we treat every party as template-wildcard party,
// if the party-wildcard is given then the transactions for all the templates and all the parties are fetched
parties,
eventProjectionProperties,
)
}
@ -611,8 +616,7 @@ object IndexServiceImpl {
metadata: PackageMetadata,
alwaysPopulateArguments: Boolean,
): Option[(TemplatePartiesFilter, EventProjectionProperties)] = {
val templateFilter
: Map[Identifier, Option[Set[Party]]] = // TODO(#18362) use type SetOrWildcard[Party] (see at the bottom of the file its definition)
val templateFilter: Map[Identifier, Option[Set[Party]]] =
IndexServiceImpl.templateFilter(metadata, transactionFilter)
val templateWildcardFilter: Option[Set[Party]] =
@ -742,17 +746,4 @@ object IndexServiceImpl {
}
}
// TODO(#18362)
// sealed trait SetOrWildcard[A] {
// final def isWildcard: Boolean = this eq Wildcard
// def isEmpty: Boolean
// }
// final object Wildcard extends SetOrWildcard[Nothing] {
// def isEmpty: Boolean = false
//
// }
// final case class ExplicitSet[A](set: Set[A]) extends SetOrWildcard[A] {
// def isEmpty: Boolean = set.isEmpty
// }
}

View File

@ -266,7 +266,7 @@ object EventStorageBackendTemplate {
)
private def createdTreeEventParser(
allQueryingParties: Set[Int],
allQueryingPartiesO: Option[Set[Int]],
stringInterning: StringInterning,
): RowParser[EventStorageBackend.Entry[Raw.TreeEvent.Created]] =
createdEventRow map {
@ -284,7 +284,7 @@ object EventStorageBackendTemplate {
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId
.filter(commandId =>
commandId != "" && submitters.getOrElse(Array.empty).exists(allQueryingParties)
commandId != "" && submittersInQueryingParties(submitters, allQueryingPartiesO)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -309,10 +309,12 @@ object EventStorageBackendTemplate {
.map(_.map(stringInterning.party.unsafe.externalize))
.getOrElse(Array.empty),
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.view
.filter(allQueryingParties)
allQueryingPartiesO
.fold(eventWitnesses)(allQueryingParties =>
eventWitnesses
.filter(allQueryingParties)
)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
driverMetadata = driverMetadata,
),
@ -323,7 +325,7 @@ object EventStorageBackendTemplate {
}
private def exercisedTreeEventParser(
allQueryingParties: Set[Int],
allQueryingPartiesO: Option[Set[Int]],
stringInterning: StringInterning,
): RowParser[EventStorageBackend.Entry[Raw.TreeEvent.Exercised]] =
exercisedEventRow map {
@ -340,7 +342,7 @@ object EventStorageBackendTemplate {
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId
.filter(commandId =>
commandId.nonEmpty && submitters.getOrElse(Array.empty).exists(allQueryingParties)
commandId.nonEmpty && submittersInQueryingParties(submitters, allQueryingPartiesO)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
@ -361,10 +363,12 @@ object EventStorageBackendTemplate {
),
exerciseChildEventIds = ArraySeq.unsafeWrapArray(exerciseChildEventIds),
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.view
.filter(allQueryingParties)
allQueryingPartiesO
.fold(eventWitnesses)(allQueryingParties =>
eventWitnesses
.filter(allQueryingParties)
)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
),
domainId = stringInterning.domainId.unsafe.externalize(internedDomainId),
@ -374,7 +378,7 @@ object EventStorageBackendTemplate {
}
def rawTreeEventParser(
allQueryingParties: Set[Int],
allQueryingParties: Option[Set[Int]],
stringInterning: StringInterning,
): RowParser[EventStorageBackend.Entry[Raw.TreeEvent]] =
createdTreeEventParser(allQueryingParties, stringInterning) | exercisedTreeEventParser(

View File

@ -99,7 +99,7 @@ class TransactionPointwiseQueries(
),
),
requestingParties = requestingParties,
filteringRowParser = rawTreeEventParser(_, stringInterning),
filteringRowParser = ps => rawTreeEventParser(Some(ps), stringInterning),
)(connection)
}

View File

@ -140,7 +140,7 @@ class TransactionStreamingQueries(
def fetchEventPayloadsTree(target: EventPayloadSourceForTreeTx)(
eventSequentialIds: Iterable[Long],
allFilterParties: Set[Ref.Party],
allFilterParties: Option[Set[Ref.Party]],
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]] = {
target match {
case EventPayloadSourceForTreeTx.Consuming =>
@ -239,12 +239,15 @@ class TransactionStreamingQueries(
tableName: String,
selectColumns: String,
eventSequentialIds: Iterable[Long],
allFilterParties: Set[Ref.Party],
allFilterParties: Option[Set[Ref.Party]],
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]] = {
val internedAllParties: Set[Int] = allFilterParties.iterator
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
val internedAllParties: Option[Set[Int]] = allFilterParties
.map(
_.iterator
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
)
SQL"""
SELECT
#$selectColumns,

View File

@ -47,7 +47,7 @@ private[platform] trait LedgerDaoTransactionsReader {
def getTransactionTrees(
startExclusive: Offset,
endInclusive: Offset,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
)(implicit
loggingContext: LoggingContextWithTrace

View File

@ -462,7 +462,7 @@ class ACSReader(
case Some(Some(filterParties)) => filterParties.exists(witnesses)
case Some(None) => true // party wildcard
case None =>
false // templateId is not in the filter // TODO(#18362) handle template wildcards
false // templateId is not in the filter
}
)

View File

@ -42,7 +42,7 @@ private[events] class BufferedTransactionsReader(
GetUpdatesResponse,
],
bufferedTransactionTreesReader: BufferedStreamsReader[
(Set[Party], EventProjectionProperties),
(Option[Set[Party]], EventProjectionProperties),
GetUpdateTreesResponse,
],
bufferedFlatTransactionByIdReader: BufferedTransactionByIdReader[
@ -89,7 +89,7 @@ private[events] class BufferedTransactionsReader(
override def getTransactionTrees(
startExclusive: Offset,
endInclusive: Offset,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
)(implicit
loggingContext: LoggingContextWithTrace
@ -178,18 +178,18 @@ private[platform] object BufferedTransactionsReader {
val transactionTreesStreamReader =
new BufferedStreamsReader[
(Set[Party], EventProjectionProperties),
(Option[Set[Party]], EventProjectionProperties),
GetUpdateTreesResponse,
](
inMemoryFanoutBuffer = transactionsBuffer,
fetchFromPersistence = new FetchFromPersistence[
(Set[Party], EventProjectionProperties),
(Option[Set[Party]], EventProjectionProperties),
GetUpdateTreesResponse,
] {
override def apply(
startExclusive: Offset,
endInclusive: Offset,
filter: (Set[Party], EventProjectionProperties),
filter: (Option[Set[Party]], EventProjectionProperties),
)(implicit
loggingContext: LoggingContextWithTrace
): Source[(Offset, GetUpdateTreesResponse), NotUsed] = {

View File

@ -112,7 +112,7 @@ private[events] object TransactionLogUpdatesConversions {
case traced @ Traced(reassignmentAccepted: TransactionLogUpdate.ReassignmentAccepted) =>
toReassignment(
reassignmentAccepted,
filter.allFilterParties.getOrElse(Set()), // TODO(#18362) [flat transactions]
filter.allFilterParties,
eventProjectionProperties,
lfValueTranslation,
traced.traceContext,
@ -135,7 +135,7 @@ private[events] object TransactionLogUpdatesConversions {
): Future[Option[GetTransactionResponse]] =
filter(Some(requestingParties), Map.empty, Some(requestingParties))(
transactionLogUpdate
) // TODO(#18362) [pointwise queries]
)
.collect {
case traced @ Traced(transactionAccepted: TransactionLogUpdate.TransactionAccepted) =>
toFlatTransaction(
@ -170,7 +170,7 @@ private[events] object TransactionLogUpdatesConversions {
.traverse(transactionAccepted.events)(event =>
toFlatEvent(
event,
filter.allFilterParties.getOrElse(Set.empty),
filter.allFilterParties,
eventProjectionProperties,
lfValueTranslation,
)
@ -217,15 +217,14 @@ private[events] object TransactionLogUpdatesConversions {
.get(event.templateId) match {
case Some(Some(filterParties)) => filterParties.exists(event.flatEventWitnesses)
case Some(None) => true // party wildcard
case None =>
false // templateId is not in the filter // TODO(#18362) handle template wildcards
case None => false // templateId is not in the filter
})
}
private def toFlatEvent(
event: TransactionLogUpdate.Event,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
)(implicit
@ -249,7 +248,7 @@ private[events] object TransactionLogUpdatesConversions {
}
private def exercisedToFlatEvent(
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
exercisedEvent: ExercisedEvent,
): apiEvent.Event =
apiEvent.Event(
@ -259,8 +258,11 @@ private[events] object TransactionLogUpdatesConversions {
contractId = exercisedEvent.contractId.coid,
templateId = Some(LfEngineToApi.toApiIdentifier(exercisedEvent.templateId)),
packageName = exercisedEvent.packageName,
witnessParties =
requestingParties.iterator.filter(exercisedEvent.flatEventWitnesses).toSeq,
witnessParties = requestingParties match {
case Some(parties) => parties.iterator.filter(exercisedEvent.flatEventWitnesses).toSeq
// party-wildcard
case None => exercisedEvent.flatEventWitnesses.toSeq
},
)
)
)
@ -268,7 +270,7 @@ private[events] object TransactionLogUpdatesConversions {
object ToTransactionTree {
def filter(
requestingParties: Set[Party]
requestingParties: Option[Set[Party]]
): Traced[TransactionLogUpdate] => Option[Traced[TransactionLogUpdate]] = traced =>
traced.traverse {
case transaction: TransactionLogUpdate.TransactionAccepted =>
@ -281,7 +283,7 @@ private[events] object TransactionLogUpdatesConversions {
case _: TransactionLogUpdate.TransactionRejected => None
case u: TransactionLogUpdate.ReassignmentAccepted =>
Option.when(
u.reassignmentInfo.hostedStakeholders.exists(requestingParties)
requestingParties.fold(true)(u.reassignmentInfo.hostedStakeholders.exists(_))
)(u)
}
@ -293,11 +295,11 @@ private[events] object TransactionLogUpdatesConversions {
loggingContext: LoggingContextWithTrace,
executionContext: ExecutionContext,
): Future[Option[GetTransactionTreeResponse]] =
filter(requestingParties)(transactionLogUpdate)
filter(Some(requestingParties))(transactionLogUpdate)
.collect { case traced @ Traced(tx: TransactionLogUpdate.TransactionAccepted) =>
toTransactionTree(
transactionAccepted = tx,
requestingParties,
Some(requestingParties),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(requestingParties.map(_.toString)),
@ -310,7 +312,7 @@ private[events] object TransactionLogUpdatesConversions {
.getOrElse(Future.successful(None))
def toGetTransactionTreesResponse(
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
)(implicit
@ -347,7 +349,7 @@ private[events] object TransactionLogUpdatesConversions {
private def toTransactionTree(
transactionAccepted: TransactionLogUpdate.TransactionAccepted,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
traceContext: TraceContext,
@ -380,7 +382,7 @@ private[events] object TransactionLogUpdatesConversions {
TransactionTree(
updateId = transactionAccepted.transactionId,
commandId = getCommandId(transactionAccepted.events, Some(requestingParties)),
commandId = getCommandId(transactionAccepted.events, requestingParties),
workflowId = transactionAccepted.workflowId,
effectiveAt = Some(TimestampConversion.fromLf(transactionAccepted.effectiveAt)),
offset = ApiOffset.toApiString(transactionAccepted.offset),
@ -394,7 +396,7 @@ private[events] object TransactionLogUpdatesConversions {
}
private def toTransactionTreeEvent(
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
)(event: TransactionLogUpdate.Event)(implicit
@ -421,7 +423,7 @@ private[events] object TransactionLogUpdatesConversions {
}
private def exercisedToTransactionTreeEvent(
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
verbose: Boolean,
lfValueTranslation: LfValueTranslation,
exercisedEvent: ExercisedEvent,
@ -481,7 +483,11 @@ private[events] object TransactionLogUpdatesConversions {
choiceArgument = Some(choiceArgument),
actingParties = exercisedEvent.actingParties.toSeq,
consuming = exercisedEvent.consuming,
witnessParties = requestingParties.view.filter(exercisedEvent.treeEventWitnesses).toSeq,
witnessParties = requestingParties
.fold(exercisedEvent.treeEventWitnesses)(
_.filter(exercisedEvent.treeEventWitnesses)
)
.toSeq,
childEventIds = exercisedEvent.children,
exerciseResult = maybeExerciseResult,
)
@ -490,16 +496,18 @@ private[events] object TransactionLogUpdatesConversions {
}
private def transactionTreePredicate(
requestingParties: Set[Party]
requestingPartiesO: Option[Set[Party]]
): TransactionLogUpdate.Event => Boolean = {
case createdEvent: CreatedEvent => requestingParties.exists(createdEvent.treeEventWitnesses)
case exercised: ExercisedEvent => requestingParties.exists(exercised.treeEventWitnesses)
case createdEvent: CreatedEvent =>
requestingPartiesO.fold(true)(_.exists(createdEvent.treeEventWitnesses))
case exercised: ExercisedEvent =>
requestingPartiesO.fold(true)(_.exists(exercised.treeEventWitnesses))
case _ => false
}
}
private def createdToApiCreatedEvent(
requestingParties: Set[Party],
requestingPartiesO: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
createdEvent: CreatedEvent,
@ -533,12 +541,16 @@ private[events] object TransactionLogUpdatesConversions {
)
)
val createdEventWitnesses = createdWitnesses(createdEvent)
val witnesses = requestingPartiesO
.fold(createdEventWitnesses)(_.view.filter(createdEventWitnesses).toSet)
.map(_.toString)
lfValueTranslation
.toApiContractData(
value = createdEvent.createArgument,
key = createdEvent.contractKey,
templateId = createdEvent.templateId,
witnesses = requestingParties.view.filter(createdWitnesses(createdEvent)).toSet,
witnesses = witnesses,
eventProjectionProperties = eventProjectionProperties,
fatContractInstance = getFatContractInstance,
)
@ -552,7 +564,7 @@ private[events] object TransactionLogUpdatesConversions {
createArguments = apiContractData.createArguments,
createdEventBlob = apiContractData.createdEventBlob.getOrElse(ByteString.EMPTY),
interfaceViews = apiContractData.interfaceViews,
witnessParties = requestingParties.view.filter(createdWitnesses(createdEvent)).toSeq,
witnessParties = witnesses.toSeq,
signatories = createdEvent.createSignatories.toSeq,
observers = createdEvent.createObservers.toSeq,
createdAt = Some(TimestampConversion.fromLf(createdEvent.ledgerEffectiveTime)),
@ -573,7 +585,7 @@ private[events] object TransactionLogUpdatesConversions {
private def toReassignment(
reassignmentAccepted: TransactionLogUpdate.ReassignmentAccepted,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
lfValueTranslation: LfValueTranslation,
traceContext: TraceContext,
@ -581,12 +593,12 @@ private[events] object TransactionLogUpdatesConversions {
loggingContext: LoggingContextWithTrace,
executionContext: ExecutionContext,
): Future[ApiReassignment] = {
val stringRequestingParties = requestingParties.map(_.toString)
val stringRequestingParties = requestingParties.map(_.map(_.toString))
val info = reassignmentAccepted.reassignmentInfo
(reassignmentAccepted.reassignment match {
case TransactionLogUpdate.ReassignmentAccepted.Assigned(createdEvent) =>
createdToApiCreatedEvent(
requestingParties = requestingParties,
requestingPartiesO = requestingParties,
eventProjectionProperties = eventProjectionProperties,
lfValueTranslation = lfValueTranslation,
createdEvent = createdEvent,
@ -605,6 +617,7 @@ private[events] object TransactionLogUpdatesConversions {
)
case TransactionLogUpdate.ReassignmentAccepted.Unassigned(unassign) =>
val stakeholders = reassignmentAccepted.reassignmentInfo.hostedStakeholders
Future.successful(
ApiReassignment.Event.UnassignedEvent(
ApiUnassignedEvent(
@ -618,8 +631,7 @@ private[events] object TransactionLogUpdatesConversions {
packageName = unassign.packageName,
assignmentExclusivity =
unassign.assignmentExclusivity.map(TimestampConversion.fromLf),
witnessParties = reassignmentAccepted.reassignmentInfo.hostedStakeholders
.filter(requestingParties),
witnessParties = requestingParties.fold(stakeholders)(stakeholders.filter),
)
)
)
@ -627,7 +639,7 @@ private[events] object TransactionLogUpdatesConversions {
ApiReassignment(
updateId = reassignmentAccepted.updateId,
commandId = reassignmentAccepted.completionDetails
.filter(_.submitters.exists(stringRequestingParties))
.filter(details => stringRequestingParties.fold(true)(details.submitters.exists))
.flatMap(_.completionStreamResponse.completion)
.map(_.commandId)
.getOrElse(""),

View File

@ -120,7 +120,7 @@ private[dao] final class TransactionsReader(
override def getTransactionTrees(
startExclusive: Offset,
endInclusive: Offset,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
)(implicit
loggingContext: LoggingContextWithTrace

View File

@ -69,7 +69,7 @@ class TransactionsTreeStreamReader(
def streamTreeTransaction(
queryRange: EventsRange,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
)(implicit
loggingContext: LoggingContextWithTrace
@ -112,7 +112,7 @@ class TransactionsTreeStreamReader(
private def doStreamTreeTransaction(
queryRange: EventsRange,
requestingParties: Set[Party],
requestingParties: Option[Set[Party]],
eventProjectionProperties: EventProjectionProperties,
)(implicit
loggingContext: LoggingContextWithTrace
@ -127,7 +127,8 @@ class TransactionsTreeStreamReader(
new QueueBasedConcurrencyLimiter(maxParallelPayloadQueries, executionContext)
val deserializationQueriesLimiter =
new QueueBasedConcurrencyLimiter(transactionsProcessingParallelism, executionContext)
val filterParties = requestingParties.toVector
val filterParties: Vector[Option[Party]] =
requestingParties.fold(Vector(None: Option[Party]))(_.map(Some(_)).toVector)
val idPageSizing = IdPageSizing.calculateFrom(
maxIdPageSize = maxIdsPerIdPage,
// The ids for tree transactions are retrieved from seven separate id tables:
@ -146,7 +147,7 @@ class TransactionsTreeStreamReader(
)
def fetchIds(
filterParty: Party,
filterParty: Option[Party],
target: EventIdSourceForInformees,
maxParallelIdQueriesLimiter: QueueBasedConcurrencyLimiter,
metric: DatabaseMetrics,
@ -163,7 +164,7 @@ class TransactionsTreeStreamReader(
eventStorageBackend.transactionStreamingQueries.fetchEventIdsForInformee(
target = target
)(
informeeO = Some(filterParty), // TODO(#18362) add filters for transaction trees
informeeO = filterParty,
startExclusive = state.fromIdExclusive,
endInclusive = queryRange.endInclusiveEventSeqId,
limit = state.pageSize,
@ -307,15 +308,13 @@ class TransactionsTreeStreamReader(
queryRange = queryRange,
filteringConstraints = TemplatePartiesFilter(
relation = Map.empty,
templateWildcardParties = Some(requestingParties),
templateWildcardParties = requestingParties,
),
eventProjectionProperties = eventProjectionProperties,
payloadQueriesLimiter = payloadQueriesLimiter,
deserializationQueriesLimiter = deserializationQueriesLimiter,
idPageSizing = idPageSizing,
decomposedFilters = requestingParties // TODO(#18362) add filters for transaction trees
.map(p => DecomposedFilter(Some(p), None))
.toVector,
decomposedFilters = filterParties.map(DecomposedFilter(_, None)),
maxParallelIdAssignQueries = maxParallelIdAssignQueries,
maxParallelIdUnassignQueries = maxParallelIdUnassignQueries,
maxPagesPerIdPagesBuffer = maxPagesPerIdPagesBuffer,

View File

@ -70,12 +70,12 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac
executeSql(
backend.event.transactionStreamingQueries.fetchEventPayloadsTree(
EventPayloadSourceForTreeTx.Create
)(List(1L), Set.empty)
)(List(1L), Some(Set.empty))
) ++
executeSql(
backend.event.transactionStreamingQueries.fetchEventPayloadsTree(
EventPayloadSourceForTreeTx.Consuming
)(List(2L), Set.empty)
)(List(2L), Some(Set.empty))
)
}

View File

@ -16,7 +16,6 @@ import org.scalatest.{Assertion, OptionValues}
import scala.reflect.ClassTag
// TODO(i12294): Complete test suite for asserting flat/tree/ACS streams contents
private[backend] trait StorageBackendTestsTransactionStreamsEvents
extends Matchers
with OptionValues
@ -68,24 +67,43 @@ private[backend] trait StorageBackendTestsTransactionStreamsEvents
val someParty = Ref.Party.assertFromString(signatory)
val (
// TODO(#18362) extend this test when transaction streams w/ party-wildcard are implemented
_flatTransactionEvents,
_transactionTreeEvents,
flatTransactionEvents,
transactionTreeEvents,
_flatTransaction,
_transactionTree,
acs,
) = fetch(Some(Set(someParty)))
flatTransactionEvents.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
flatTransactionEvents.map(_.event).collect { case created: FlatEvent.Created =>
created.partial.contractId
} shouldBe Vector(contractId1, contractId2, contractId3, contractId4).map(_.coid)
transactionTreeEvents.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
transactionTreeEvents.map(_.event).collect { case created: TreeEvent.Created =>
created.partial.contractId
} shouldBe Vector(contractId1, contractId2, contractId3, contractId4).map(_.coid)
acs.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
val (
_,
_,
flatTransactionEventsSuperReader,
transactionTreeEventsSuperReader,
_,
_,
acsSuperReader,
) = fetch(None)
flatTransactionEventsSuperReader.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
flatTransactionEventsSuperReader.map(_.event).collect { case created: FlatEvent.Created =>
created.partial.contractId
} shouldBe Vector(contractId1, contractId2, contractId3, contractId4).map(_.coid)
transactionTreeEventsSuperReader.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
transactionTreeEventsSuperReader.map(_.event).collect { case created: TreeEvent.Created =>
created.partial.contractId
} shouldBe Vector(contractId1, contractId2, contractId3, contractId4).map(_.coid)
acsSuperReader.map(_.eventSequentialId) shouldBe Vector(1L, 2L, 3L, 4L)
}
@ -101,12 +119,12 @@ private[backend] trait StorageBackendTestsTransactionStreamsEvents
val flatTransactionEvents = executeSql(
backend.event.transactionStreamingQueries.fetchEventPayloadsFlat(
EventPayloadSourceForFlatTx.Create
)(eventSequentialIds = Seq(1L), filterParties)
)(eventSequentialIds = Seq(1L, 2L, 3L, 4L), filterParties)
)
val transactionTreeEvents = executeSql(
backend.event.transactionStreamingQueries.fetchEventPayloadsTree(
EventPayloadSourceForTreeTx.Create
)(eventSequentialIds = Seq(1L), filterParties.getOrElse(Set.empty))
)(eventSequentialIds = Seq(1L, 2L, 3L, 4L), filterParties)
)
val flatTransaction = executeSql(
backend.event.transactionPointwiseQueries

View File

@ -524,21 +524,22 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec
.getActiveContracts(
activeAt = ledgerEnd.lastOffset,
filter = TemplatePartiesFilter(
Map(
relation = Map(
someTemplateId -> Some(Set(party1)),
unknownTemplate -> None,
),
Some(Set(party2)),
templateWildcardParties = Some(Set(party2)),
),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = None,
templateWildcardWitnesses = Some(Set(party2)),
witnessTemplateProjections = Map(
// TODO(#18362) wildcard for unknownTemplate
Some(party1) -> Map(
someTemplateId -> Projection(contractArguments = true),
unknownTemplate -> Projection(contractArguments = true),
)
someTemplateId -> Projection(contractArguments = true)
),
None -> Map(
unknownTemplate -> Projection(contractArguments = true)
),
),
),
)

View File

@ -218,7 +218,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
.getTransactionTrees(
startExclusive = from,
endInclusive = to,
requestingParties = Set(alice, bob, charlie),
requestingParties = Some(Set(alice, bob, charlie)),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(Set(alice, bob, charlie)),
@ -230,7 +230,39 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
}
}
it should "filter correctly by party" in { // TODO(#18362) add test case for wildcard
it should "work correctly for party-wildcard" in {
for {
(from, to, _) <- storeTestFixture()
result <- transactionsOf(
ledgerDao.transactionsReader
.getTransactionTrees(
startExclusive = from,
endInclusive = to,
requestingParties = Some(Set(alice, bob, charlie)),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(Set(alice, bob, charlie)),
),
)
)
resultPartyWildcard <- transactionsOf(
ledgerDao.transactionsReader
.getTransactionTrees(
startExclusive = from,
endInclusive = to,
requestingParties = None,
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = None,
),
)
)
} yield {
comparable(result) should contain theSameElementsInOrderAs comparable(resultPartyWildcard)
}
}
it should "filter correctly by party" in {
for {
from <- ledgerDao.lookupLedgerEnd()
(_, tx) <- store(
@ -251,7 +283,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
.getTransactionTrees(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
requestingParties = Set(alice),
requestingParties = Some(Set(alice)),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(Set(alice)),
@ -263,7 +295,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
.getTransactionTrees(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
requestingParties = Set(bob),
requestingParties = Some(Set(bob)),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(Set(bob)),
@ -275,7 +307,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
.getTransactionTrees(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
requestingParties = Set(charlie),
requestingParties = Some(Set(charlie)),
eventProjectionProperties = EventProjectionProperties(
verbose = true,
templateWildcardWitnesses = Some(Set(charlie)),

View File

@ -171,7 +171,6 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
}
behavior of "JdbcLedgerDao (getFlatTransactions)"
// TODO(#18362) extend for party-wildcard
it should "match the results of lookupFlatTransactionById" in {
for {
@ -305,14 +304,27 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
filter = TemplatePartiesFilter(
Map(
relation = Map(
otherTemplateId -> Some(Set(alice, bob))
),
Some(Set.empty),
templateWildcardParties = Some(Set.empty),
),
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
)
resultPartyWildcard <- transactionsOf(
ledgerDao.transactionsReader
.getFlatTransactions(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
filter = TemplatePartiesFilter(
relation = Map(otherTemplateId -> None),
templateWildcardParties = Some(Set.empty),
),
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
)
} yield {
val events = result.loneElement.events.toArray
events should have length 2
@ -324,6 +336,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
create.witnessParties.loneElement shouldBe alice
create.templateId.value shouldBe LfEngineToApi.toApiIdentifier(otherTemplateId)
}
// clear out commandId since submitter is not in the querying parties for flat transactions in the non-wildcard query
resultPartyWildcard.loneElement.copy(commandId = "") shouldBe result.loneElement
}
}
@ -347,11 +361,26 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
filter = TemplatePartiesFilter(
Map(
relation = Map(
otherTemplateId -> Some(Set(bob)),
someTemplateId -> Some(Set(alice)),
),
Some(Set.empty),
templateWildcardParties = Some(Set.empty),
),
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
)
resultPartyWildcard <- transactionsOf(
ledgerDao.transactionsReader
.getFlatTransactions(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
filter = TemplatePartiesFilter(
relation = Map(
otherTemplateId -> None,
someTemplateId -> None,
),
templateWildcardParties = Some(Set.empty),
),
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
@ -367,6 +396,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
create.witnessParties.loneElement shouldBe bob
create.templateId.value shouldBe LfEngineToApi.toApiIdentifier(otherTemplateId)
}
val eventsPartyWildcard = resultPartyWildcard.loneElement.events.toArray
eventsPartyWildcard should have length 3
}
}
@ -398,6 +429,20 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
)
resultPartyWildcard <- transactionsOf(
ledgerDao.transactionsReader
.getFlatTransactions(
startExclusive = from.lastOffset,
endInclusive = to.lastOffset,
filter = TemplatePartiesFilter(
Map(
otherTemplateId -> None
),
Some(Set(bob)),
),
eventProjectionProperties = EventProjectionProperties(verbose = true, Some(Set.empty)),
)
)
} yield {
val events = result.loneElement.events.toArray
events should have length 2
@ -409,6 +454,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
create.witnessParties.loneElement shouldBe alice
create.templateId.value shouldBe LfEngineToApi.toApiIdentifier(otherTemplateId)
}
// clear out commandId since submitter is not in the querying parties for flat transactions in the non-wildcard query
resultPartyWildcard.loneElement.copy(commandId = "") shouldBe result.loneElement
}
}

View File

@ -594,7 +594,7 @@ class ParticipantNodeBootstrap(
arguments.testingConfig,
recordSequencerInteractions,
replaySequencerConfig,
packageId => packageDependencyResolver.packageDependencies(List(packageId)),
packageDependencyResolver,
arguments.metrics.domainMetrics,
sequencerInfoLoader,
partyNotifier,

View File

@ -9,13 +9,12 @@ import cats.syntax.parallel.*
import com.daml.lf.data.Ref.PackageId
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.store.DamlPackageStore
import com.digitalasset.canton.protocol.PackageDescription
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.topology.store.PackageDependencyResolverUS
import com.digitalasset.canton.tracing.{TraceContext, TracedAsyncLoadingCache, TracedScaffeine}
import com.github.blemale.scaffeine.Scaffeine
import scala.concurrent.duration.*
@ -28,38 +27,28 @@ class PackageDependencyResolver(
)(implicit
ec: ExecutionContext
) extends NamedLogging
with FlagCloseable {
with FlagCloseable
with PackageDependencyResolverUS {
def packageDependencies(packages: List[PackageId]): EitherT[Future, PackageId, Set[PackageId]] =
packages
.parTraverse(pkgId => OptionT(dependencyCache.get(pkgId)).toRight(pkgId))
.map(_.flatten.toSet -- packages)
private val dependencyCache
: TracedAsyncLoadingCache[PackageId, Either[PackageId, Set[PackageId]]] =
TracedScaffeine.buildTracedAsyncFutureUS[PackageId, Either[PackageId, Set[PackageId]]](
cache = Scaffeine().maximumSize(10000).expireAfterAccess(15.minutes),
loader = t => p => loadPackageDependencies(p)(t).value,
allLoader = None,
)(logger)
def packageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] = {
EitherT(dependencyCache.getUS(packageId).map(_.map(_ - packageId)))
}
def getPackageDescription(packageId: PackageId)(implicit
traceContext: TraceContext
): Future[Option[PackageDescription]] = damlPackageStore.getPackageDescription(packageId)
def clearPackagesNotPreviouslyFound(): Unit = {
dependencyCache
.synchronous()
.asMap()
// .filterInPlace modifies the cache removing all packageId's from the cache that didn't exist when
// queried/cached previously.
.filterInPlace { case (_, deps) => deps.isDefined }
.discard
}
private val dependencyCache = Scaffeine()
.maximumSize(10000)
.expireAfterAccess(15.minutes)
.buildAsyncFuture[PackageId, Option[Set[PackageId]]] { packageId =>
loadPackageDependencies(packageId)(TraceContext.empty).value
.map(
// Turn a "package does not exist"-error to a None in the cache value
_.toOption
)
.onShutdown(None)
}
def clearPackagesNotPreviouslyFound(): Unit = dependencyCache.clear((_, e) => e.isLeft)
private def loadPackageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext

View File

@ -33,7 +33,7 @@ import scala.concurrent.{ExecutionContext, Future}
trait PackageOps extends NamedLogging {
def isPackageVetted(packageId: PackageId)(implicit
tc: TraceContext
): EitherT[Future, CantonError, Boolean]
): EitherT[FutureUnlessShutdown, CantonError, Boolean]
def checkPackageUnused(packageId: PackageId)(implicit
tc: TraceContext
@ -85,9 +85,10 @@ class PackageOpsImpl(
)
}
/** @return true if the authorized snapshot, or any domain snapshot has the package vetted */
override def isPackageVetted(
packageId: PackageId
)(implicit tc: TraceContext): EitherT[Future, CantonError, Boolean] = {
)(implicit tc: TraceContext): EitherT[FutureUnlessShutdown, CantonError, Boolean] = {
// Use the aliasManager to query all domains, even those that are currently disconnected
val snapshotsForDomains: List[TopologySnapshot] =
stateManager.getAll.view.keys
@ -99,10 +100,7 @@ class PackageOpsImpl(
.parTraverse { snapshot =>
snapshot
.findUnvettedPackagesOrDependencies(participantId, Set(packageId))
.map { pkgId =>
val isVetted = pkgId.isEmpty
isVetted
}
.map(_.isEmpty)
}
packageIsVettedOn.bimap(PackageMissingDependencies.Reject(packageId, _), _.contains(true))

View File

@ -137,7 +137,6 @@ class PackageService(
case true => EitherT.leftT(new PackageVetted(packageId))
case false => EitherT.rightT(())
}
.mapK(FutureUnlessShutdown.outcomeK)
for {
_ <- neededForAdminWorkflow(packageId)
@ -283,7 +282,6 @@ class PackageService(
): EitherT[FutureUnlessShutdown, CantonError, Unit] =
packageOps
.isPackageVetted(mainPkg)
.mapK(FutureUnlessShutdown.outcomeK)
.flatMap { isVetted =>
if (!isVetted)
EitherT.pure[FutureUnlessShutdown, CantonError](

View File

@ -291,7 +291,7 @@ final class RepairService(
topologySnapshot = topologyFactory.createTopologySnapshot(
startingPoints.processing.prenextTimestamp,
packageId => packageDependencyResolver.packageDependencies(List(packageId)),
packageDependencyResolver,
preferCaching = true,
)
domainParameters <- OptionT(persistentState.parameterStore.lastParameters)

View File

@ -9,7 +9,6 @@ import cats.instances.future.*
import cats.syntax.bifunctor.*
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.lf.data.Ref.PackageId
import com.digitalasset.canton.*
import com.digitalasset.canton.common.domain.SequencerConnectClient
import com.digitalasset.canton.common.domain.grpc.SequencerInfoLoader.SequencerAggregatedInfo
@ -38,6 +37,7 @@ import com.digitalasset.canton.sequencing.client.*
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.store.PackageDependencyResolverUS
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.*
import com.digitalasset.canton.util.{EitherTUtil, ResourceUtil}
@ -72,7 +72,7 @@ trait DomainRegistryHelpers extends FlagCloseable with NamedLogging { this: HasF
recordSequencerInteractions: AtomicReference[Option[RecordingConfig]],
replaySequencerConfig: AtomicReference[Option[ReplayConfig]],
topologyDispatcher: ParticipantTopologyDispatcher,
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
partyNotifier: LedgerServerPartyNotifier,
metrics: DomainAlias => SyncDomainMetrics,
participantSettings: Eval[ParticipantSettingsLookup],
@ -129,7 +129,7 @@ trait DomainRegistryHelpers extends FlagCloseable with NamedLogging { this: HasF
performUnlessClosingF("create caching client")(
topologyFactory.createCachingTopologyClient(
sequencerAggregatedInfo.staticDomainParameters.protocolVersion,
packageDependencies,
packageDependencyResolver,
)
)
)

View File

@ -4,11 +4,9 @@
package com.digitalasset.canton.participant.domain.grpc
import cats.Eval
import cats.data.EitherT
import cats.instances.future.*
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.lf.data.Ref.PackageId
import com.digitalasset.canton.*
import com.digitalasset.canton.common.domain.grpc.SequencerInfoLoader
import com.digitalasset.canton.concurrent.{FutureSupervisor, HasFutureSupervision}
@ -39,12 +37,13 @@ import com.digitalasset.canton.sequencing.{SequencerConnectionValidation, Sequen
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.store.PackageDependencyResolverUS
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.ExecutionContextExecutor
/** Domain registry used to connect to domains over GRPC
*
@ -66,7 +65,7 @@ class GrpcDomainRegistry(
testingConfig: TestingConfigInternal,
recordSequencerInteractions: AtomicReference[Option[RecordingConfig]],
replaySequencerConfig: AtomicReference[Option[ReplayConfig]],
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
metrics: DomainAlias => SyncDomainMetrics,
sequencerInfoLoader: SequencerInfoLoader,
partyNotifier: LedgerServerPartyNotifier,
@ -162,7 +161,7 @@ class GrpcDomainRegistry(
recordSequencerInteractions,
replaySequencerConfig,
topologyDispatcher,
packageDependencies,
packageDependencyResolver,
partyNotifier,
metrics,
participantSettings,

View File

@ -293,8 +293,7 @@ class StartableStoppableLedgerApiServer(
indexService.getActiveContracts(
filter = TransactionFilter(
filtersByParty = partyIds.view.map(_ -> Filters.noFilter).toMap,
filtersForAnyParty =
None, // TODO(#18362) use Some(Filters.noFilter) and remove the filtersByParty?
filtersForAnyParty = None,
alwaysPopulateCreatedEventBlob = true,
),
verbose = false,

View File

@ -972,14 +972,13 @@ class TransactionProcessingSteps(
activenessResult,
)
// The responses depend on the result of the model conformance check, and are therefore also delayed.
val responsesF = FutureUnlessShutdown.outcomeF(
val responsesF =
confirmationResponseFactory.createConfirmationResponses(
requestId,
malformedPayloads,
transactionValidationResult,
ipsSnapshot,
)
)
val pendingTransaction =
createPendingTransaction(
@ -1159,7 +1158,7 @@ class TransactionProcessingSteps(
mediator,
locallyRejectedF,
engineController.abort,
FutureUnlessShutdown.outcomeF(engineAbortStatusF),
engineAbortStatusF,
)
}
@ -1337,12 +1336,17 @@ class TransactionProcessingSteps(
def getCommitSetAndContractsToBeStoredAndEvent(
topologySnapshot: TopologySnapshot
): EitherT[Future, TransactionProcessorError, CommitAndStoreContractsAndPublishEvent] = {
): EitherT[
FutureUnlessShutdown,
TransactionProcessorError,
CommitAndStoreContractsAndPublishEvent,
] = {
val resultFE = for {
modelConformanceResultE <-
pendingRequestData.transactionValidationResult.modelConformanceResultET.value
resultET = (verdict, modelConformanceResultE) match {
// Positive verdict: we commit
case (_: Verdict.Approve, _) => handleApprovedVerdict(topologySnapshot)
@ -1351,17 +1355,22 @@ class TransactionProcessingSteps(
// a negative verdict; we then reject with the verdict, as it is the best information we have
// - otherwise, we reject with the actual error
case (reasons: Verdict.ParticipantReject, Left(error)) =>
if (error.engineAbortStatus.isAborted) rejected(reasons.keyEvent)
if (error.engineAbortStatus.isAborted)
rejected(reasons.keyEvent)
else rejectedWithModelConformanceError(error)
case (reject: Verdict.MediatorReject, Left(error)) =>
if (error.engineAbortStatus.isAborted) rejected(reject)
if (error.engineAbortStatus.isAborted)
rejected(reject)
else rejectedWithModelConformanceError(error)
// No model conformance check error: we reject with the verdict
case (reasons: Verdict.ParticipantReject, _) =>
rejected(reasons.keyEvent)
case (reject: Verdict.MediatorReject, _) =>
rejected(reject)
}
result <- resultET.value
} yield result
@ -1371,7 +1380,11 @@ class TransactionProcessingSteps(
def handleApprovedVerdict(topologySnapshot: TopologySnapshot)(implicit
traceContext: TraceContext
): EitherT[Future, TransactionProcessorError, CommitAndStoreContractsAndPublishEvent] =
): EitherT[
FutureUnlessShutdown,
TransactionProcessorError,
CommitAndStoreContractsAndPublishEvent,
] =
pendingRequestData.transactionValidationResult.modelConformanceResultET.biflatMap(
{
case ErrorWithSubTransaction(
@ -1385,7 +1398,7 @@ class TransactionProcessingSteps(
validSubTransaction,
validSubViewsNE,
topologySnapshot,
)
).mapK(FutureUnlessShutdown.outcomeK)
case error =>
// There is no valid subview
@ -1397,7 +1410,7 @@ class TransactionProcessingSteps(
pendingRequestData,
completionInfoO,
modelConformanceResult,
)
).mapK(FutureUnlessShutdown.outcomeK)
},
)
@ -1408,34 +1421,51 @@ class TransactionProcessingSteps(
.toLocalReject(protocolVersion)
)
def rejected(rejection: TransactionRejection) = {
for {
def rejected(
rejection: TransactionRejection
): EitherT[
FutureUnlessShutdown,
TransactionProcessorError,
CommitAndStoreContractsAndPublishEvent,
] = {
(for {
event <- EitherT.fromEither[Future](
createRejectionEvent(RejectionArgs(pendingRequestData, rejection))
)
} yield CommitAndStoreContractsAndPublishEvent(None, Seq(), event)
} yield CommitAndStoreContractsAndPublishEvent(None, Seq(), event))
.mapK(FutureUnlessShutdown.outcomeK)
}
for {
topologySnapshot <- EitherT.right[TransactionProcessorError](
crypto.ips.awaitSnapshot(pendingRequestData.requestTime)
)
topologySnapshot <- EitherT
.right[TransactionProcessorError](
crypto.ips.awaitSnapshot(pendingRequestData.requestTime)
)
.mapK(FutureUnlessShutdown.outcomeK)
maxDecisionTime <- ProcessingSteps
.getDecisionTime(topologySnapshot, pendingRequestData.requestTime)
.leftMap(DomainParametersError(domainId, _))
_ <-
if (ts <= maxDecisionTime) EitherT.pure[Future, TransactionProcessorError](())
else
EitherT.right[TransactionProcessorError](
Future.failed(new IllegalArgumentException("Timeout message after decision time"))
)
.mapK(FutureUnlessShutdown.outcomeK)
_ <-
(if (ts <= maxDecisionTime) EitherT.pure[Future, TransactionProcessorError](())
else
EitherT.right[TransactionProcessorError](
Future.failed(new IllegalArgumentException("Timeout message after decision time"))
)).mapK(FutureUnlessShutdown.outcomeK)
resultTopologySnapshot <- EitherT
.right[TransactionProcessorError](
crypto.ips.awaitSnapshotUS(ts)
)
mediatorActiveAtResultTs <- EitherT
.right[TransactionProcessorError](
resultTopologySnapshot.isMediatorActive(pendingRequestData.mediator)
)
.mapK(FutureUnlessShutdown.outcomeK)
resultTopologySnapshot <- EitherT.right[TransactionProcessorError](
crypto.ips.awaitSnapshot(ts)
)
mediatorActiveAtResultTs <- EitherT.right[TransactionProcessorError](
resultTopologySnapshot.isMediatorActive(pendingRequestData.mediator)
)
res <-
if (mediatorActiveAtResultTs) getCommitSetAndContractsToBeStoredAndEvent(topologySnapshot)
else {
@ -1451,7 +1481,7 @@ class TransactionProcessingSteps(
)
}
} yield res
}.mapK(FutureUnlessShutdown.outcomeK)
}
override def postProcessResult(verdict: Verdict, pendingSubmission: Nothing)(implicit
traceContext: TraceContext
@ -1518,7 +1548,7 @@ object TransactionProcessingSteps {
consistencyResultE: Either[List[ReferenceToFutureContractError], Unit],
authorizationResult: Map[ViewPosition, String],
conformanceResultET: EitherT[
Future,
FutureUnlessShutdown,
ModelConformanceChecker.ErrorWithSubTransaction,
ModelConformanceChecker.Result,
],

View File

@ -114,7 +114,6 @@ class TransactionConfirmationRequestFactory(
validatePackageVettings = true,
)
.leftMap(TransactionTreeFactoryError)
.mapK(FutureUnlessShutdown.outcomeK)
rootViews = transactionTree.rootViews.unblindedElements.toList
inputContracts = ExtractUsedContractsFromRootViews(rootViews)

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.data.{
ViewPosition,
}
import com.digitalasset.canton.ledger.participant.state.SubmitterInfo
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.{
SerializableContractOfId,
@ -52,7 +53,7 @@ trait TransactionTreeFactory {
validatePackageVettings: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[Future, TransactionTreeConversionError, GenTransactionTree]
): EitherT[FutureUnlessShutdown, TransactionTreeConversionError, GenTransactionTree]
/** Reconstructs a transaction view from a reinterpreted action description,
* using the supplied salts.

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.crypto.{HashOps, HmacOps, Salt, SaltSeed}
import com.digitalasset.canton.data.ViewConfirmationParameters.InvalidViewConfirmationParameters
import com.digitalasset.canton.data.*
import com.digitalasset.canton.ledger.participant.state.SubmitterInfo
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.*
import com.digitalasset.canton.protocol.ContractIdSyntax.*
@ -94,7 +95,7 @@ abstract class TransactionTreeFactoryImpl(
validatePackageVettings: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[Future, TransactionTreeConversionError, GenTransactionTree] = {
): EitherT[FutureUnlessShutdown, TransactionTreeConversionError, GenTransactionTree] = {
val metadata = transaction.metadata
val state = stateForSubmission(
transactionSeed,
@ -138,22 +139,24 @@ abstract class TransactionTreeFactoryImpl(
)
for {
submitterMetadata <- EitherT.fromEither[Future](
SubmitterMetadata
.fromSubmitterInfo(cryptoOps)(
submitterActAs = submitterInfo.actAs,
submitterApplicationId = submitterInfo.applicationId,
submitterCommandId = submitterInfo.commandId,
submitterSubmissionId = submitterInfo.submissionId,
submitterDeduplicationPeriod = submitterInfo.deduplicationPeriod,
submittingParticipant = participantId,
salt = submitterMetadataSalt,
maxSequencingTime,
protocolVersion = protocolVersion,
)
.leftMap(SubmitterMetadataError)
)
rootViewDecompositions <- EitherT.liftF(rootViewDecompositionsF)
submitterMetadata <- SubmitterMetadata
.fromSubmitterInfo(cryptoOps)(
submitterActAs = submitterInfo.actAs,
submitterApplicationId = submitterInfo.applicationId,
submitterCommandId = submitterInfo.commandId,
submitterSubmissionId = submitterInfo.submissionId,
submitterDeduplicationPeriod = submitterInfo.deduplicationPeriod,
submittingParticipant = participantId,
salt = submitterMetadataSalt,
maxSequencingTime,
protocolVersion = protocolVersion,
)
.leftMap(SubmitterMetadataError)
.toEitherT[FutureUnlessShutdown]
rootViewDecompositions <- EitherT
.liftF(rootViewDecompositionsF)
.mapK(FutureUnlessShutdown.outcomeK)
_ = if (logger.underlying.isDebugEnabled) {
val numRootViews = rootViewDecompositions.length
@ -172,7 +175,7 @@ abstract class TransactionTreeFactoryImpl(
requiredPackagesByParty = requiredPackagesByParty(rootViewDecompositions),
)
.leftMap(_.transformInto[UnknownPackageError])
else EitherT.rightT[Future, TransactionTreeConversionError](())
else EitherT.rightT[FutureUnlessShutdown, TransactionTreeConversionError](())
rootViews <- createRootViews(rootViewDecompositions, state, contractOfId)
.map(rootViews =>
@ -183,6 +186,7 @@ abstract class TransactionTreeFactoryImpl(
MerkleSeq.fromSeq(cryptoOps, protocolVersion)(rootViews),
)
)
.mapK(FutureUnlessShutdown.outcomeK)
} yield rootViews
}

View File

@ -8,12 +8,13 @@ import cats.syntax.bifunctor.*
import cats.syntax.parallel.*
import com.daml.lf.transaction.TransactionVersion
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.lifecycle.FutureUnlessShutdownImpl.AbortedDueToShutdownException
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.PackageUnknownTo
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.{DomainId, ParticipantId}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.EitherTUtil
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.version.{DamlLfVersionToProtocolVersions, ProtocolVersion}
import com.digitalasset.canton.{LfPackageId, LfPartyId}
@ -34,6 +35,7 @@ object UsableDomain {
val packageVetted: EitherT[Future, UnknownPackage, Unit] =
resolveParticipantsAndCheckPackagesVetted(domainId, snapshot, requiredPackagesByParty)
.failOnShutdownTo(AbortedDueToShutdownException("Usable domain checking"))
val partiesConnected: EitherT[Future, MissingActiveParticipant, Unit] =
checkConnectedParties(domainId, snapshot, requiredPackagesByParty.keySet)
val compatibleProtocolVersion: EitherT[Future, UnsupportedMinimumProtocolVersion, Unit] =
@ -63,7 +65,10 @@ object UsableDomain {
private def unknownPackages(snapshot: TopologySnapshot)(
participantIdAndRequiredPackages: (ParticipantId, Set[LfPackageId])
)(implicit ec: ExecutionContext, tc: TraceContext): Future[List[PackageUnknownTo]] = {
)(implicit
ec: ExecutionContext,
tc: TraceContext,
): FutureUnlessShutdown[List[PackageUnknownTo]] = {
val (participantId, required) = participantIdAndRequiredPackages
snapshot.findUnvettedPackagesOrDependencies(participantId, required).value.map {
case Right(notVetted) =>
@ -121,19 +126,21 @@ object UsableDomain {
)(implicit
ec: ExecutionContext,
tc: TraceContext,
): EitherT[Future, UnknownPackage, Unit] =
resolveParticipants(snapshot, requiredPackagesByParty).flatMap(
checkPackagesVetted(domainId, snapshot, _)
)
): EitherT[FutureUnlessShutdown, UnknownPackage, Unit] =
resolveParticipants(snapshot, requiredPackagesByParty)
.mapK(FutureUnlessShutdown.outcomeK)
.flatMap(
checkPackagesVetted(domainId, snapshot, _)
)
def checkPackagesVetted(
private def checkPackagesVetted(
domainId: DomainId,
snapshot: TopologySnapshot,
requiredPackages: Map[ParticipantId, Set[LfPackageId]],
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): EitherT[Future, UnknownPackage, Unit] =
): EitherT[FutureUnlessShutdown, UnknownPackage, Unit] =
EitherT(
requiredPackages.toList
.parFlatTraverse(unknownPackages(snapshot))

View File

@ -37,7 +37,6 @@ private[transfer] object TransferKnownAndVetted {
targetTopology,
stakeholders.view.map(_ -> Set(packageId)).toMap,
)
.mapK(FutureUnlessShutdown.outcomeK)
.leftMap(unknownPackage =>
TransferOutProcessorError.PackageIdUnknownOrUnvetted(contractId, unknownPackage.unknownTo)
)

View File

@ -4,6 +4,7 @@
package com.digitalasset.canton.participant.protocol.validation
import cats.data.EitherT
import cats.implicits.toTraverseOps
import cats.syntax.alternative.*
import cats.syntax.bifunctor.*
import cats.syntax.functor.*
@ -17,6 +18,7 @@ import com.digitalasset.canton.data.{
TransactionView,
ViewPosition,
}
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.protocol.EngineController.{
@ -116,7 +118,9 @@ class ModelConformanceChecker(
topologySnapshot: TopologySnapshot,
commonData: CommonData,
getEngineAbortStatus: GetEngineAbortStatus,
)(implicit traceContext: TraceContext): EitherT[Future, ErrorWithSubTransaction, Result] = {
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, ErrorWithSubTransaction, Result] = {
val CommonData(transactionId, ledgerTime, submissionTime, confirmationPolicy) = commonData
// Previous checks in Phase 3 ensure that all the root views are sent to the same
@ -127,7 +131,7 @@ class ModelConformanceChecker(
def findValidSubtransactions(
views: Seq[(TransactionView, ViewPosition, Option[ParticipantId])]
): Future[
): FutureUnlessShutdown[
(
Seq[Error],
Seq[(TransactionView, WithRollbackScope[WellFormedTransaction[WithSuffixes]])],
@ -151,11 +155,11 @@ class ModelConformanceChecker(
).value
errorsViewsTxs <- wfTxE match {
case Right(wfTx) => Future.successful((Seq.empty, Seq((view, wfTx))))
case Right(wfTx) => FutureUnlessShutdown.pure((Seq.empty, Seq((view, wfTx))))
// There is no point in checking subviews if we have aborted
case Left(error @ DAMLeError(DAMLe.EngineAborted(_), _)) =>
Future.successful((Seq(error), Seq.empty))
FutureUnlessShutdown.pure((Seq(error), Seq.empty))
case Left(error) =>
val subviewsWithInfo = view.subviews.unblindedElementsWithIndex.map {
@ -285,7 +289,9 @@ class ModelConformanceChecker(
getEngineAbortStatus: GetEngineAbortStatus,
)(implicit
traceContext: TraceContext
): EitherT[Future, Error, WithRollbackScope[WellFormedTransaction[WithSuffixes]]] = {
): EitherT[FutureUnlessShutdown, Error, WithRollbackScope[
WellFormedTransaction[WithSuffixes]
]] = {
val viewParticipantData = view.viewParticipantData.tryUnwrap
val RootAction(cmd, authorizers, failed, packageIdPreference) =
@ -294,8 +300,13 @@ class ModelConformanceChecker(
val rbContext = viewParticipantData.rollbackContext
val seed = viewParticipantData.actionDescription.seedOption
for {
viewInputContracts <- validateInputContracts(view, requestCounter, getEngineAbortStatus)
viewInputContracts <- validateInputContracts(view, requestCounter, getEngineAbortStatus).mapK(
FutureUnlessShutdown.outcomeK
)
_ <- validatePackageVettings(view, topologySnapshot)
contractLookupAndVerification =
new ExtendedContractLookup(
// all contracts and keys specified explicitly
@ -305,7 +316,9 @@ class ModelConformanceChecker(
serializableContractAuthenticator,
)
packagePreference <- buildPackageNameMap(packageIdPreference)
packagePreference <- buildPackageNameMap(packageIdPreference).mapK(
FutureUnlessShutdown.outcomeK
)
lfTxAndMetadata <- reinterpret(
contractLookupAndVerification,
@ -319,21 +332,24 @@ class ModelConformanceChecker(
traceContext,
packagePreference,
getEngineAbortStatus,
)
.leftWiden[Error]
).leftWiden[Error].mapK(FutureUnlessShutdown.outcomeK)
(lfTx, metadata, resolverFromReinterpretation) = lfTxAndMetadata
// For transaction views of protocol version 3 or higher,
// the `resolverFromReinterpretation` is the same as the `resolverFromView`.
// The `TransactionTreeFactoryImplV3` rebuilds the `resolverFromReinterpretation`
// again by re-running the `ContractStateMachine` and checks consistency
// with the reconstructed view's global key inputs,
// which by the view equality check is the same as the `resolverFromView`.
wfTx <- EitherT
.fromEither[Future](
WellFormedTransaction.normalizeAndCheck(lfTx, metadata, WithoutSuffixes)
)
wfTxE = WellFormedTransaction
.normalizeAndCheck(lfTx, metadata, WithoutSuffixes)
.leftMap[Error](err => TransactionNotWellFormed(err, view.viewHash))
wfTx <- EitherT(FutureUnlessShutdown.pure(wfTxE))
salts = transactionTreeFactory.saltsFromView(view)
reconstructedViewAndTx <- checked(
transactionTreeFactory.tryReconstruct(
subaction = wfTx,
@ -349,9 +365,11 @@ class ModelConformanceChecker(
TransactionTreeFactory.contractInstanceLookup(contractLookupAndVerification),
keyResolver = resolverFromReinterpretation,
)
).leftMap(err => TransactionTreeError(err, view.viewHash))
).leftMap(err => TransactionTreeError(err, view.viewHash)).mapK(FutureUnlessShutdown.outcomeK)
(reconstructedView, suffixedTx) = reconstructedViewAndTx
_ <- EitherT.cond[Future](
_ <- EitherT.cond[FutureUnlessShutdown](
view == reconstructedView,
(),
ViewReconstructionError(view, reconstructedView): Error,
@ -362,7 +380,7 @@ class ModelConformanceChecker(
private def validatePackageVettings(view: TransactionView, snapshot: TopologySnapshot)(implicit
traceContext: TraceContext
): EitherT[Future, Error, Unit] = {
): EitherT[FutureUnlessShutdown, Error, Unit] = {
val referencedContracts =
(view.inputContracts.fmap(_.contract) ++ view.createdContracts.fmap(_.contract)).values.toSet
val packageIdsOfContracts =
@ -375,11 +393,15 @@ class ModelConformanceChecker(
val informees = view.viewCommonData.tryUnwrap.viewConfirmationParameters.informees
EitherT(for {
informeeParticipantsByParty <- snapshot.activeParticipantsOfParties(informees.toSeq)
informeeParticipantsByParty <- FutureUnlessShutdown.outcomeF(
snapshot.activeParticipantsOfParties(informees.toSeq)
)
informeeParticipants = informeeParticipantsByParty.values.flatten.toSet
unvettedResult <- informeeParticipants.toSeq
.parTraverse(p => snapshot.findUnvettedPackagesOrDependencies(p, packageIds).map(p -> _))
.value
.parTraverse(p =>
snapshot.findUnvettedPackagesOrDependencies(p, packageIds).map(p -> _).value
)
.map(_.sequence)
unvettedPackages = unvettedResult match {
case Left(packageId) =>
// The package is not in the store and thus the package is not vetted.

View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.participant.protocol.validation
import cats.syntax.parallel.*
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.error.TransactionError
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.{
MalformedPayload,
@ -41,7 +42,7 @@ class TransactionConfirmationResponseFactory(
)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): Future[Seq[ConfirmationResponse]] = {
): FutureUnlessShutdown[Seq[ConfirmationResponse]] = {
def hostedConfirmingPartiesOfView(
viewValidationResult: ViewValidationResult
@ -133,11 +134,13 @@ class TransactionConfirmationResponseFactory(
def responsesForWellformedPayloads(
transactionValidationResult: TransactionValidationResult
): Future[Seq[ConfirmationResponse]] =
): FutureUnlessShutdown[Seq[ConfirmationResponse]] =
transactionValidationResult.viewValidationResults.toSeq.parTraverseFilter {
case (viewPosition, viewValidationResult) =>
for {
hostedConfirmingParties <- hostedConfirmingPartiesOfView(viewValidationResult)
hostedConfirmingParties <- FutureUnlessShutdown.outcomeF(
hostedConfirmingPartiesOfView(viewValidationResult)
)
modelConformanceResultE <- transactionValidationResult.modelConformanceResultET.value
} yield {
@ -269,7 +272,7 @@ class TransactionConfirmationResponseFactory(
}
if (malformedPayloads.nonEmpty) {
Future.successful(
FutureUnlessShutdown.pure(
Seq(
createConfirmationResponsesForMalformedPayloads(
requestId,

View File

@ -5,6 +5,7 @@ package com.digitalasset.canton.participant.protocol.validation
import cats.data.EitherT
import com.digitalasset.canton.data.{SubmitterMetadata, ViewPosition}
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.ErrorLoggingContext
import com.digitalasset.canton.participant.protocol.conflictdetection.{ActivenessResult, CommitSet}
import com.digitalasset.canton.participant.protocol.validation.ContractConsistencyChecker.ReferenceToFutureContractError
@ -13,8 +14,6 @@ import com.digitalasset.canton.participant.protocol.validation.TimeValidator.Tim
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.{LfPartyId, WorkflowId}
import scala.concurrent.Future
final case class TransactionValidationResult(
transactionId: TransactionId,
confirmationPolicy: ConfirmationPolicy,
@ -24,7 +23,7 @@ final case class TransactionValidationResult(
authenticationResult: Map[ViewPosition, String],
authorizationResult: Map[ViewPosition, String],
modelConformanceResultET: EitherT[
Future,
FutureUnlessShutdown,
ModelConformanceChecker.ErrorWithSubTransaction,
ModelConformanceChecker.Result,
],

View File

@ -3,8 +3,6 @@
package com.digitalasset.canton.participant.topology
import cats.data.EitherT
import com.daml.lf.data.Ref.PackageId
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.config.{
BatchingConfig,
@ -23,8 +21,8 @@ import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.topology.client.*
import com.digitalasset.canton.topology.processing.{EffectiveTime, TopologyTransactionProcessor}
import com.digitalasset.canton.topology.store.TopologyStore
import com.digitalasset.canton.topology.store.TopologyStoreId.DomainStore
import com.digitalasset.canton.topology.store.{PackageDependencyResolverUS, TopologyStore}
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.digitalasset.canton.version.ProtocolVersion
@ -79,14 +77,14 @@ class TopologyComponentFactory(
def createTopologyClient(
protocolVersion: ProtocolVersion,
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
)(implicit executionContext: ExecutionContext): DomainTopologyClientWithInit =
new StoreBasedDomainTopologyClient(
clock,
domainId,
protocolVersion,
topologyStore,
packageDependencies,
packageDependencyResolver,
timeouts,
futureSupervisor,
loggerFactory,
@ -94,7 +92,7 @@ class TopologyComponentFactory(
def createCachingTopologyClient(
protocolVersion: ProtocolVersion,
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
)(implicit
executionContext: ExecutionContext,
traceContext: TraceContext,
@ -103,7 +101,7 @@ class TopologyComponentFactory(
domainId,
protocolVersion,
topologyStore,
packageDependencies,
packageDependencyResolver,
caching,
batching,
timeouts,
@ -113,13 +111,13 @@ class TopologyComponentFactory(
def createTopologySnapshot(
asOf: CantonTimestamp,
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependencyResolver: PackageDependencyResolverUS,
preferCaching: Boolean,
)(implicit executionContext: ExecutionContext): TopologySnapshot = {
val snapshot = new StoreBasedTopologySnapshot(
asOf,
topologyStore,
packageDependencies,
packageDependencyResolver,
loggerFactory,
)
if (preferCaching) {

View File

@ -25,7 +25,7 @@ class PackageOpsForTesting(
override def isPackageVetted(packageId: PackageId)(implicit
tc: TraceContext
): EitherT[Future, CantonError, Boolean] =
): EitherT[FutureUnlessShutdown, CantonError, Boolean] =
EitherT.rightT(false)
override def checkPackageUnused(packageId: PackageId)(implicit

View File

@ -57,19 +57,19 @@ trait PackageOpsTestBase extends AsyncWordSpec with BaseTest with ArgumentMatche
"head authorized store has the package vetted" in withTestSetup { env =>
import env.*
unvettedPackagesForSnapshots(Set.empty, Set(pkgId1))
packageOps.isPackageVetted(pkgId1).map(_ shouldBe true)
packageOps.isPackageVetted(pkgId1).failOnShutdown.map(_ shouldBe true)
}
"one domain topology snapshot has the package vetted" in withTestSetup { env =>
import env.*
unvettedPackagesForSnapshots(Set(pkgId1), Set.empty)
packageOps.isPackageVetted(pkgId1).map(_ shouldBe true)
packageOps.isPackageVetted(pkgId1).failOnShutdown.map(_ shouldBe true)
}
"all topology snapshots have the package vetted" in withTestSetup { env =>
import env.*
unvettedPackagesForSnapshots(Set.empty, Set.empty)
packageOps.isPackageVetted(pkgId1).map(_ shouldBe true)
packageOps.isPackageVetted(pkgId1).failOnShutdown.map(_ shouldBe true)
}
}
@ -77,7 +77,7 @@ trait PackageOpsTestBase extends AsyncWordSpec with BaseTest with ArgumentMatche
"all topology snapshots have the package unvetted" in withTestSetup { env =>
import env.*
unvettedPackagesForSnapshots(Set(pkgId1), Set(pkgId1))
packageOps.isPackageVetted(pkgId1).map(_ shouldBe false)
packageOps.isPackageVetted(pkgId1).failOnShutdown.map(_ shouldBe false)
}
}
@ -99,6 +99,7 @@ trait PackageOpsTestBase extends AsyncWordSpec with BaseTest with ArgumentMatche
packageOps
.isPackageVetted(pkgId1)
.failOnShutdown
.leftOrFail("missing package id")
.map(_ shouldBe PackageMissingDependencies.Reject(pkgId1, missingPkgId))
}

View File

@ -194,7 +194,7 @@ class PackageServiceTest
val dar = PackageServiceTest.loadExampleDar()
val mainPackageId = DamlPackageStore.readPackageId(dar.main)
val dependencyIds = com.daml.lf.archive.Decode.assertDecodeArchive(dar.main)._2.directDeps
for {
(for {
_ <- sut
.upload(
darBytes = ByteString.copyFrom(bytes),
@ -204,8 +204,7 @@ class PackageServiceTest
synchronizeVetting = false,
)
.valueOrFail("appending dar")
.failOnShutdown
deps <- packageDependencyResolver.packageDependencies(List(mainPackageId)).value
deps <- packageDependencyResolver.packageDependencies(mainPackageId).value
} yield {
// test for explict dependencies
deps match {
@ -214,7 +213,7 @@ class PackageServiceTest
// all direct dependencies should be part of this
(dependencyIds -- loaded) shouldBe empty
}
}
}).unwrap.map(_.failOnShutdown)
}
"validateDar validates the package" in withEnv { env =>

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.crypto.provider.symbolic.{SymbolicCrypto, Symboli
import com.digitalasset.canton.data.ViewType.TransactionViewType
import com.digitalasset.canton.data.*
import com.digitalasset.canton.ledger.participant.state.SubmitterInfo
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.LogEntry
import com.digitalasset.canton.participant.DefaultParticipantStateValues
import com.digitalasset.canton.participant.protocol.submission.EncryptedViewMessageFactory.UnableToDetermineParticipant
@ -133,7 +134,7 @@ class TransactionConfirmationRequestFactoryTest
validatePackageVettings: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[Future, TransactionTreeConversionError, GenTransactionTree] = {
): EitherT[FutureUnlessShutdown, TransactionTreeConversionError, GenTransactionTree] = {
val actAs = submitterInfo.actAs.toSet
if (actAs != Set(ExampleTransactionFactory.submitter))
fail(

View File

@ -4,9 +4,10 @@
package com.digitalasset.canton.participant.protocol.submission
import cats.data.EitherT
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.data.Ref.{IdString, PackageId}
import com.digitalasset.canton.*
import com.digitalasset.canton.data.GenTransactionTree
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.participant.DefaultParticipantStateValues
import com.digitalasset.canton.participant.protocol.submission.TransactionTreeFactory.*
import com.digitalasset.canton.protocol.ExampleTransactionFactory.{
@ -16,6 +17,8 @@ import com.digitalasset.canton.protocol.ExampleTransactionFactory.{
import com.digitalasset.canton.protocol.WellFormedTransaction.WithoutSuffixes
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.store.PackageDependencyResolverUS
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import org.scalatest.wordspec.AsyncWordSpec
@ -58,20 +61,22 @@ final class TransactionTreeFactoryImplTest extends AsyncWordSpec with BaseTest {
snapshot: TopologySnapshot = factory.topologySnapshot,
): EitherT[Future, TransactionTreeConversionError, GenTransactionTree] = {
val submitterInfo = DefaultParticipantStateValues.submitterInfo(actAs)
treeFactory.createTransactionTree(
transaction,
submitterInfo,
factory.confirmationPolicy,
Some(WorkflowId.assertFromString("testWorkflowId")),
factory.mediatorGroup,
factory.transactionSeed,
factory.transactionUuid,
snapshot,
contractInstanceOfId,
keyResolver,
factory.ledgerTime.plusSeconds(100),
validatePackageVettings = true,
)
treeFactory
.createTransactionTree(
transaction,
submitterInfo,
factory.confirmationPolicy,
Some(WorkflowId.assertFromString("testWorkflowId")),
factory.mediatorGroup,
factory.transactionSeed,
factory.transactionUuid,
snapshot,
contractInstanceOfId,
keyResolver,
factory.ledgerTime.plusSeconds(100),
validatePackageVettings = true,
)
.failOnShutdown
}
"TransactionTreeFactoryImpl@testedVersion" should {
@ -139,7 +144,6 @@ final class TransactionTreeFactoryImplTest extends AsyncWordSpec with BaseTest {
"checking package vettings" must {
lazy val treeFactory = createTransactionTreeFactory(testedProtocolVersion)
lazy val banana = PackageId.assertFromString("banana")
"fail if the main package is not vetted" in {
val example = factory.standardHappyCases(2)
createTransactionTree(
@ -160,17 +164,12 @@ final class TransactionTreeFactoryImplTest extends AsyncWordSpec with BaseTest {
successfulLookup(example),
example.keyResolver,
snapshot = defaultTestingIdentityFactory.topologySnapshot(
packageDependencies = x =>
EitherT.rightT(
if (x == ExampleTransactionFactory.packageId)
Set(banana)
else Set.empty[PackageId]
)
packageDependencyResolver = TestPackageDependencyResolver
),
).value
} yield inside(err) { case Left(UnknownPackageError(unknownTo)) =>
forEvery(unknownTo) {
_.packageId shouldBe banana
_.packageId shouldBe TestPackageDependencyResolver.exampleDependency
}
unknownTo should not be empty
}
@ -185,17 +184,44 @@ final class TransactionTreeFactoryImplTest extends AsyncWordSpec with BaseTest {
successfulLookup(example),
example.keyResolver,
snapshot = defaultTestingIdentityFactory.topologySnapshot(
packageDependencies = x =>
if (x == ExampleTransactionFactory.packageId)
EitherT.leftT(banana)
else EitherT.rightT(Set.empty[PackageId])
packageDependencyResolver = MisconfiguredPackageDependencyResolver
),
).value
} yield inside(err) { case Left(UnknownPackageError(unknownTo)) =>
unknownTo should not be empty
} yield {
inside(err) { case Left(UnknownPackageError(unknownTo)) =>
forEvery(unknownTo) {
_.packageId shouldBe ExampleTransactionFactory.packageId
}
unknownTo should not be empty
}
}
}
}
}
}
object TestPackageDependencyResolver extends PackageDependencyResolverUS {
import cats.syntax.either.*
val exampleDependency: IdString.PackageId = PackageId.assertFromString("example-dependency")
override def packageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] = {
packageId match {
case ExampleTransactionFactory.packageId =>
Right(Set(exampleDependency)).toEitherT[FutureUnlessShutdown]
case _ => Right(Set.empty[PackageId]).toEitherT[FutureUnlessShutdown]
}
}
}
object MisconfiguredPackageDependencyResolver extends PackageDependencyResolverUS {
import cats.syntax.either.*
val exampleDependency: IdString.PackageId = PackageId.assertFromString("example-dependency")
override def packageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] = {
Left(packageId).toEitherT[FutureUnlessShutdown]
}
}
}

View File

@ -17,6 +17,7 @@ import com.digitalasset.canton.data.{
FullTransactionViewTree,
TransactionView,
}
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.participant.protocol.EngineController.{
EngineAbortStatus,
@ -33,6 +34,7 @@ import com.digitalasset.canton.participant.util.DAMLe.{EngineError, PackageResol
import com.digitalasset.canton.protocol.ExampleTransactionFactory.{lfHash, submittingParticipant}
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.store.PackageDependencyResolverUS
import com.digitalasset.canton.topology.transaction.VettedPackages
import com.digitalasset.canton.topology.{TestingIdentityFactory, TestingTopology}
import com.digitalasset.canton.tracing.TraceContext
@ -157,14 +159,16 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
val rootViewTrees = views.map(_._1)
val commonData = TransactionProcessingSteps.tryCommonData(rootViewTrees)
val keyResolvers = views.forgetNE.flatMap { case (_vt, resolvers) => resolvers }.toMap
mcc.check(
rootViewTrees,
keyResolvers,
RequestCounter(0),
ips,
commonData,
getEngineAbortStatus = () => EngineAbortStatus.notAborted,
)
mcc
.check(
rootViewTrees,
keyResolvers,
RequestCounter(0),
ips,
commonData,
getEngineAbortStatus = () => EngineAbortStatus.notAborted,
)
.failOnShutdown
}
val packageName: LfPackageName = PackageName.assertFromString("package-name")
@ -382,7 +386,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
NonEmpty.from(factory.SingleCreate(lfHash(0)).rootTransactionViewTrees).value,
// The package is not vetted for signatoryParticipant
vettings = Seq(VettedPackages(submittingParticipant, None, Seq(packageId))),
packageDependenciesLookup = _ => EitherT.rightT(Set()),
packageDependenciesLookup = new TestPackageResolver(Right(Set.empty)),
expectedError = UnvettedPackages(Map(signatoryParticipant -> Set(packageId))),
)
}
@ -410,7 +414,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
NonEmpty(Seq, viewTree),
// The package is not vetted for submittingParticipant
vettings = Seq.empty,
packageDependenciesLookup = _ => EitherT.rightT(Set()),
packageDependenciesLookup = new TestPackageResolver(Right(Set.empty)),
expectedError = UnvettedPackages(Map(submittingParticipant -> Set(key.packageId.value))),
)
}
@ -419,7 +423,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
def testVettingError(
rootViewTrees: NonEmpty[Seq[FullTransactionViewTree]],
vettings: Seq[VettedPackages],
packageDependenciesLookup: PackageId => EitherT[Future, PackageId, Set[PackageId]],
packageDependenciesLookup: PackageDependencyResolverUS,
expectedError: UnvettedPackages,
): Future[Assertion] = {
import ExampleTransactionFactory.*
@ -440,7 +444,7 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
.withPackages(vettings.map(vetting => vetting.participantId -> vetting.packageIds).toMap),
loggerFactory,
TestDomainParameters.defaultDynamic,
).topologySnapshot(packageDependencies = packageDependenciesLookup)
).topologySnapshot(packageDependencyResolver = packageDependenciesLookup)
for {
error <- check(sut, viewsWithNoInputKeys(rootViewTrees), snapshot).value
@ -465,10 +469,21 @@ class ModelConformanceCheckerTest extends AsyncWordSpec with BaseTest {
// Submitter participant is unable to lookup dependencies.
// Therefore, the validation concludes that the package is not in the store
// and thus that the package is not vetted.
packageDependenciesLookup = EitherT.leftT(_),
packageDependenciesLookup = new TestPackageResolver(Left(packageId)),
expectedError = UnvettedPackages(Map(submittingParticipant -> Set(packageId))),
)
}
}
}
class TestPackageResolver(result: Either[PackageId, Set[PackageId]])
extends PackageDependencyResolverUS {
import cats.syntax.either.*
override def packageDependencies(packageId: PackageId)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, PackageId, Set[PackageId]] = {
result.toEitherT
}
}
}

View File

@ -111,7 +111,7 @@ class QueueBasedDomainOutboxTest
domainId,
protocolVersion = testedProtocolVersion,
store = target,
packageDependencies = StoreBasedDomainTopologyClient.NoPackageDependencies,
packageDependenciesResolver = StoreBasedDomainTopologyClient.NoPackageDependencies,
timeouts = timeouts,
futureSupervisor = futureSupervisor,
loggerFactory = loggerFactory,

View File

@ -97,7 +97,7 @@ class StoreBasedDomainOutboxTest
domainId,
protocolVersion = testedProtocolVersion,
store = target,
packageDependencies = StoreBasedDomainTopologyClient.NoPackageDependencies,
packageDependenciesResolver = StoreBasedDomainTopologyClient.NoPackageDependencies,
timeouts = timeouts,
futureSupervisor = futureSupervisor,
loggerFactory = loggerFactory,

View File

@ -3,10 +3,8 @@
package com.digitalasset.canton.topology
import cats.data.EitherT
import cats.syntax.either.*
import cats.syntax.functor.*
import com.daml.lf.data.Ref.PackageId
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.BaseTest.{
defaultStaticDomainParameters,
@ -34,7 +32,11 @@ import com.digitalasset.canton.topology.DefaultTestIdentities.*
import com.digitalasset.canton.topology.client.*
import com.digitalasset.canton.topology.processing.{EffectiveTime, SequencedTime}
import com.digitalasset.canton.topology.store.memory.InMemoryTopologyStore
import com.digitalasset.canton.topology.store.{TopologyStoreId, ValidatedTopologyTransaction}
import com.digitalasset.canton.topology.store.{
PackageDependencyResolverUS,
TopologyStoreId,
ValidatedTopologyTransaction,
}
import com.digitalasset.canton.topology.transaction.TopologyChangeOp.Remove
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.{NoTracing, TraceContext}
@ -352,7 +354,7 @@ class TestingIdentityFactory(
def topologySnapshot(
domainId: DomainId = DefaultTestIdentities.domainId,
packageDependencies: PackageId => EitherT[Future, PackageId, Set[PackageId]] =
packageDependencyResolver: PackageDependencyResolverUS =
StoreBasedDomainTopologyClient.NoPackageDependencies,
timestampForDomainParameters: CantonTimestamp = CantonTimestamp.Epoch,
): TopologySnapshot = {
@ -432,7 +434,7 @@ class TestingIdentityFactory(
new StoreBasedTopologySnapshot(
CantonTimestamp.Epoch,
store,
packageDependencies,
packageDependencyResolver,
loggerFactory,
)
}

View File

@ -1 +1 @@
20240528.13380.v10deea37
20240529.13387.v44c5d50b