update canton to 20240703.13595.v030861a1 (#19504)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
azure-pipelines[bot] 2024-07-04 10:19:46 +02:00 committed by GitHub
parent 02abf338ec
commit 178358b6fe
24 changed files with 162 additions and 59 deletions

View File

@@ -12,5 +12,7 @@ service EnterpriseParticipantReplicationService {
 message SetPassive {
   message Request {}
-  message Response {}
+  message Response {
+    bool became_passive = 1;
+  }
 }

View File

@@ -1841,7 +1841,7 @@ object ParticipantAdminCommands {
   object Replication {
     final case class SetPassiveCommand()
-        extends GrpcAdminCommand[SetPassive.Request, SetPassive.Response, Unit] {
+        extends GrpcAdminCommand[SetPassive.Request, SetPassive.Response, Boolean] {
       override type Svc = EnterpriseParticipantReplicationServiceStub
       override def createService(
@@ -1858,9 +1858,9 @@ object ParticipantAdminCommands {
       ): Future[SetPassive.Response] =
         service.setPassive(request)

-      override def handleResponse(response: SetPassive.Response): Either[String, Unit] =
+      override def handleResponse(response: SetPassive.Response): Either[String, Boolean] =
         response match {
-          case SetPassive.Response() => Right(())
+          case SetPassive.Response(becamePassive) => Right(becamePassive)
         }
     }
   }

View File

@@ -1077,9 +1077,9 @@ class ParticipantReplicationAdministrationGroup(
   @Help.Summary("Set the participant replica to passive")
   @Help.Description(
-    "Trigger a graceful fail-over from this active replica to another passive replica."
+    "Trigger a graceful fail-over from this active replica to another passive replica. Returns true if another replica became active."
   )
-  def set_passive(): Unit = {
+  def set_passive(): Boolean = {
     consoleEnvironment.run {
       runner.adminCommand(
         ParticipantAdminCommands.Replication.SetPassiveCommand()
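
Taken together, the three hunks above change the fail-over path end to end: the proto response now carries became_passive, the admin command decodes it into a Boolean, and the console command returns it. A minimal usage sketch from a Canton console (hedged: the `participant` reference name and the `replication` group path are illustrative, not part of this diff):

    // Trigger a graceful fail-over and react to the new Boolean result.
    val becamePassive: Boolean = participant.replication.set_passive()
    if (becamePassive)
      println("Fail-over succeeded: another replica became active.")
    else
      println("Fail-over did not complete: no other replica became active.")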

View File

@@ -51,7 +51,7 @@ import scala.util.control.NonFatal
   */
 trait Environment extends NamedLogging with AutoCloseable with NoTracing {

-  type Config <: CantonConfig
+  type Config <: CantonConfig & ConfigDefaults[DefaultPorts, Config]
   type Console <: ConsoleEnvironment

   val config: Config

View File

@@ -91,4 +91,5 @@ object HashPurpose {
   val OrderingRequestSignature = HashPurpose(42, "OrderingRequestSignature")
   val TopologyMappingUniqueKey = HashPurpose(43, "TopologyMappingUniqueKey")
   val CantonScript = HashPurpose(44, "CantonScriptHash")
+  val BftAvailabilityAck = HashPurpose(45, "BftAvailabilityAck")
 }

View File

@@ -60,14 +60,20 @@ sealed trait LocalRejectError
   def _resourcesType: Option[ErrorResource] = None

   /** The affected resources.
-    * Will be logged as part of the context information.
-    * If this error is converted to an rpc Status, this field is included as com.google.rpc.ResourceInfo.
+    * It is used as follows:
+    *  - It will be logged as part of the context information.
+    *  - It is included into the resulting LocalReject.
+    *  - The LocalReject is sent via the sequencer to the mediator. Therefore: do not include any confidential data!
+    *  - The LocalReject is also output through the ledger API.
     */
   def _resources: Seq[String] = Seq()

   override def resources: Seq[(ErrorResource, String)] =
     _resourcesType.fold(Seq.empty[(ErrorResource, String)])(rt => _resources.map(rs => (rt, rs)))

+  override def context: Map[String, String] =
+    _resourcesType.map(_.asString -> _resources.show).toList.toMap ++ super.context
+
   override def pretty: Pretty[LocalRejectError] =
     prettyOfClass(
       param("code", _.code.id.unquoted),

View File

@@ -233,6 +233,7 @@ class SequencerNodeBootstrap(
         storage,
         sequencerId,
         arguments.parameterConfig,
+        arguments.futureSupervisor,
         loggerFactory,
       )(config.sequencer)
     addCloseable(ret)

View File

@@ -160,6 +160,7 @@ trait MkSequencerFactory {
       storage: Storage,
       sequencerId: SequencerId,
       nodeParameters: CantonNodeParameters,
+      futureSupervisor: FutureSupervisor,
       loggerFactory: NamedLoggerFactory,
   )(
       sequencerConfig: SequencerConfig
@@ -177,6 +178,7 @@ object CommunitySequencerFactory extends MkSequencerFactory {
      storage: Storage,
      sequencerId: SequencerId,
      nodeParameters: CantonNodeParameters,
+     futureSupervisor: FutureSupervisor,
      loggerFactory: NamedLoggerFactory,
  )(sequencerConfig: SequencerConfig)(implicit
      executionContext: ExecutionContext

View File

@@ -206,7 +206,10 @@ abstract class SequencerApiTest
           }) or include("Detected new members without sequencer counter") or
             include regex "Creating .* at block height None" or
             include("Subscribing to block source from") or
-            include("Advancing sim clock"))
+            include("Advancing sim clock") or
+            (include("Creating ForkJoinPool with parallelism") and include(
+              "to avoid starvation"
+            )))
         },
       )
     } yield {

View File

@@ -4,6 +4,7 @@
 package com.digitalasset.canton.integration

 import com.digitalasset.canton.CloseableTest
+import com.digitalasset.canton.config.DefaultPorts
 import com.digitalasset.canton.config.RequireTypes.PositiveInt
 import com.digitalasset.canton.environment.Environment
 import com.digitalasset.canton.logging.{LogEntry, NamedLogging, SuppressingLogger}
@@ -83,7 +84,9 @@ sealed trait EnvironmentSetup[E <: Environment, TCE <: TestConsoleEnvironment[E]
           config
         )
       }
-    val finalConfig = configTransform(pluginConfig)
+
+    // Once all the plugins and config transformations are done, apply the defaults
+    val finalConfig = configTransform(pluginConfig).withDefaults(new DefaultPorts())

     val scopedMetricsFactory = new ScopedInMemoryMetricsFactory
     val environmentFixture =

View File

@@ -987,7 +987,7 @@ class ParticipantNode(
     val id: ParticipantId,
     val metrics: ParticipantMetrics,
     val config: LocalParticipantConfig,
-    storage: Storage,
+    val storage: Storage,
     override protected val clock: Clock,
     val cryptoPureApi: CryptoPureApi,
     identityPusher: ParticipantTopologyDispatcher,

View File

@@ -172,7 +172,7 @@ class AdminWorkflowServices(
         packageService
           .vetPackages(
             AdminWorkflowServices.AdminWorkflowPackages.keys.toSeq,
-            syncVetting = false,
+            synchronizeVetting = PackageVettingSynchronization.NoSync,
           )
       )
     )
@@ -200,7 +200,7 @@ class AdminWorkflowServices(
         fileNameO = Some(AdminWorkflowServices.AdminWorkflowDarResourceName),
         submissionIdO = None,
         vetAllPackages = true,
-        synchronizeVetting = false,
+        synchronizeVetting = PackageVettingSynchronization.NoSync,
       )
       .void
   )

View File

@@ -5,6 +5,7 @@ package com.digitalasset.canton.participant.admin

 import cats.data.EitherT
 import cats.implicits.toBifunctorOps
+import cats.syntax.functor.*
 import cats.syntax.parallel.*
 import com.daml.nameof.NameOf.functionFullName
 import com.digitalasset.canton.LfPackageId
@@ -41,7 +42,7 @@ trait PackageOps extends NamedLogging {
   def vetPackages(
       packages: Seq[PackageId],
-      synchronize: Boolean,
+      synchronizeVetting: PackageVettingSynchronization,
   )(implicit
       traceContext: TraceContext
   ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit]
@@ -106,15 +107,25 @@ class PackageOpsImpl(
     packageIsVettedOn.bimap(PackageMissingDependencies.Reject(packageId, _), _.contains(true))
   }

-  override def vetPackages(packages: Seq[PackageId], synchronize: Boolean)(implicit
+  override def vetPackages(
+      packages: Seq[PackageId],
+      synchronizeVetting: PackageVettingSynchronization,
+  )(implicit
       traceContext: TraceContext
   ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit] = {
-    modifyVettedPackages { existingPackages =>
-      // Keep deterministic order for testing and keep optimal O(n)
-      val existingPackagesSet = existingPackages.toSet
-      val packagesToBeAdded = packages.filterNot(existingPackagesSet)
-      existingPackages ++ packagesToBeAdded
-    }
+    for {
+      newVettedPackagesCreated <- modifyVettedPackages { existingPackages =>
+        // Keep deterministic order for testing and keep optimal O(n)
+        val existingPackagesSet = existingPackages.toSet
+        val packagesToBeAdded = packages.filterNot(existingPackagesSet)
+        existingPackages ++ packagesToBeAdded
+      }
+      // only synchronize with the connected domains if a new VettedPackages transaction was actually issued
+      _ <- EitherTUtil.ifThenET(newVettedPackagesCreated) {
+        synchronizeVetting.sync(packages.toSet)
+      }
+    } yield ()
   }

   override def revokeVettingForPackages(
@@ -124,14 +135,15 @@ class PackageOpsImpl(
   )(implicit tc: TraceContext): EitherT[FutureUnlessShutdown, CantonError, Unit] = {
     val packagesToUnvet = packages.toSet

-    modifyVettedPackages(_.filterNot(packagesToUnvet)).leftWiden
+    modifyVettedPackages(_.filterNot(packagesToUnvet)).leftWiden[CantonError].void
   }

+  /** Returns true if a new VettedPackages transaction was authorized. */
   def modifyVettedPackages(
       action: Seq[LfPackageId] => Seq[LfPackageId]
   )(implicit
       tc: TraceContext
-  ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit] = {
+  ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Boolean] = {
     // TODO(#14069) this vetting extension might fail on concurrent requests
     for {
@@ -179,6 +191,6 @@ class PackageOpsImpl(
             .map(_ => ())
         )
       }
-    } yield ()
+    } yield newVettedPackagesState != currentPackages
   }
 }
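
The reworked vetPackages above encodes a deliberate design choice: modifyVettedPackages now reports whether a new VettedPackages topology transaction was actually issued, and the potentially slow domain synchronization only runs in that case. A self-contained sketch of the same conditional-synchronization pattern, assuming plain cats EitherT over Future with a String error type instead of Canton's FutureUnlessShutdown and EitherTUtil.ifThenET (all names here are illustrative):

    import cats.data.EitherT
    import cats.implicits.*
    import scala.concurrent.{ExecutionContext, Future}

    // Run the synchronization step only when the vetting state actually changed,
    // mirroring EitherTUtil.ifThenET(newVettedPackagesCreated) { ... } above.
    def vetThenMaybeSync(
        newVettingIssued: Boolean,
        sync: () => EitherT[Future, String, Unit],
    )(implicit ec: ExecutionContext): EitherT[Future, String, Unit] =
      if (newVettingIssued) sync() // a new transaction was issued: wait for it
      else EitherT.rightT[Future, String](()) // nothing changed: skip the wait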

View File

@@ -60,7 +60,7 @@ trait DarService {
       fileNameO: Option[String],
       submissionIdO: Option[LedgerSubmissionId],
       vetAllPackages: Boolean,
-      synchronizeVetting: Boolean,
+      synchronizeVetting: PackageVettingSynchronization,
   )(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, DamlError, Hash]

   def validateDar(
@@ -150,12 +150,15 @@ class PackageService(
   ): EitherT[FutureUnlessShutdown, CantonError, Unit] =
     ifDarExists(darHash)(removeDarLf(_, _))(ifNotExistsOperationFailed = "DAR archive removal")

-  def vetDar(darHash: Hash, synchronize: Boolean)(implicit
+  def vetDar(
+      darHash: Hash,
+      synchronizeVetting: PackageVettingSynchronization,
+  )(implicit
       traceContext: TraceContext
   ): EitherT[FutureUnlessShutdown, CantonError, Unit] =
     ifDarExists(darHash) { (_, darLf) =>
       packageOps
-        .vetPackages(darLf.all.map(readPackageId), synchronize)
+        .vetPackages(darLf.all.map(readPackageId), synchronizeVetting)
         .leftWiden[CantonError]
     }(ifNotExistsOperationFailed = "DAR archive vetting")

@@ -305,14 +308,16 @@ class PackageService(
     * @param fileNameO The DAR filename, present if uploaded via the Admin API.
     * @param submissionIdO upstream submissionId for ledger api server to recognize previous package upload requests
     * @param vetAllPackages if true, then the packages will be vetted automatically
-    * @param synchronizeVetting if true, the future will terminate once the participant observed the package vetting on all connected domains
+    * @param synchronizeVetting a value of PackageVettingSynchronization that checks that the packages have been vetted on all connected domains.
+    *                           The Future returned by the check will complete once all domains have observed the vetting for the new packages.
+    *                           The caller may also pass a no-op implementation that immediately returns, depending on the caller's needs for synchronization.
     */
   def upload(
       darBytes: ByteString,
       fileNameO: Option[String],
       submissionIdO: Option[LedgerSubmissionId],
       vetAllPackages: Boolean,
-      synchronizeVetting: Boolean,
+      synchronizeVetting: PackageVettingSynchronization,
   )(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, DamlError, Hash] = {
     val submissionId =
       submissionIdO.getOrElse(LedgerSubmissionId.assertFromString(UUID.randomUUID().toString))
@@ -348,11 +353,14 @@ class PackageService(
         .map(_.toRight(s"No such dar ${darId}").flatMap(PackageService.darToLf))
     )

-  def vetPackages(packages: Seq[PackageId], syncVetting: Boolean)(implicit
+  def vetPackages(
+      packages: Seq[PackageId],
+      synchronizeVetting: PackageVettingSynchronization,
+  )(implicit
       traceContext: TraceContext
   ): EitherT[FutureUnlessShutdown, DamlError, Unit] =
     packageOps
-      .vetPackages(packages, syncVetting)
+      .vetPackages(packages, synchronizeVetting)
       .leftMap[DamlError] { err =>
         implicit val code = err.code
         CantonPackageServiceError.IdentityManagerParentError(err)

View File

@@ -0,0 +1,26 @@
+// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.digitalasset.canton.participant.admin
+
+import cats.data.EitherT
+import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
+import com.digitalasset.canton.participant.topology.ParticipantTopologyManagerError
+import com.digitalasset.canton.tracing.TraceContext
+import com.digitalasset.canton.util.EitherTUtil
+import com.digitalasset.daml.lf.data.Ref.PackageId
+
+// TODO(#15087) remove this synchronization logic once topology events are published on the ledger api
+trait PackageVettingSynchronization {
+  def sync(packages: Set[PackageId])(implicit
+      traceContext: TraceContext
+  ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit]
+}
+
+object PackageVettingSynchronization {
+  object NoSync extends PackageVettingSynchronization {
+    override def sync(packages: Set[PackageId])(implicit
+        traceContext: TraceContext
+    ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit] = EitherTUtil.unitUS
+  }
+}
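
This new strategy interface is what the surrounding hunks thread through the package-service APIs: callers now state explicitly whether DAR vetting should block until the connected domains have observed it. A minimal caller-side sketch (hedged: `packageService` and `darBytes` are assumed to be in scope; the synchronizing implementation mentioned in the comment is the one added to CantonSyncService later in this commit):

    // Upload a DAR and choose the vetting-synchronization behaviour explicitly.
    packageService.upload(
      darBytes = darBytes,
      fileNameO = Some("my-app.dar"), // illustrative file name
      submissionIdO = None,
      vetAllPackages = true,
      // NoSync returns immediately; passing syncService.synchronizeVettingOnConnectedDomains
      // instead blocks until all connected domains have observed the vetting.
      synchronizeVetting = PackageVettingSynchronization.NoSync,
    )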

View File

@@ -11,8 +11,8 @@ import com.digitalasset.canton.admin.participant.v30.{DarDescription as ProtoDar
 import com.digitalasset.canton.crypto.Hash
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
 import com.digitalasset.canton.networking.grpc.CantonGrpcUtil.GrpcErrors
-import com.digitalasset.canton.participant.admin.PackageService
 import com.digitalasset.canton.participant.admin.PackageService.DarDescriptor
+import com.digitalasset.canton.participant.admin.{PackageService, PackageVettingSynchronization}
 import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
 import com.digitalasset.canton.util.{EitherTUtil, OptionUtil}
 import com.digitalasset.canton.{LfPackageId, protocol}
@@ -24,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}
 class GrpcPackageService(
     service: PackageService,
+    synchronizeVetting: PackageVettingSynchronization,
     protected val loggerFactory: NamedLoggerFactory,
 )(implicit ec: ExecutionContext)
     extends PackageServiceGrpc.PackageService
@@ -62,7 +63,9 @@ class GrpcPackageService(
         fileNameO = Some(request.filename),
         submissionIdO = None,
         vetAllPackages = request.vetAllPackages,
-        synchronizeVetting = request.synchronizeVetting,
+        synchronizeVetting =
+          if (request.synchronizeVetting) synchronizeVetting
+          else PackageVettingSynchronization.NoSync,
       )
     } yield UploadDarResponse(
       UploadDarResponse.Value.Success(UploadDarResponse.Success(hash.toHexString))
@@ -109,7 +112,10 @@ class GrpcPackageService(
     val ret = for {
       hash <- EitherT.fromEither[Future](extractHash(request.darHash))
       _unit <- service
-        .vetDar(hash, request.synchronize)
+        .vetDar(
+          hash,
+          if (request.synchronize) synchronizeVetting else PackageVettingSynchronization.NoSync,
+        )
         .leftMap(_.asGrpcError)
         .onShutdown(Left(GrpcErrors.AbortedDueToShutdown.Error().asGrpcError))
     } yield VetDarResponse()

View File

@@ -97,6 +97,7 @@ class StartableStoppableLedgerApiDependentServices(
       .bindService(
         new GrpcPackageService(
           packageService,
+          syncService.synchronizeVettingOnConnectedDomains,
           loggerFactory,
         ),
         ec,

View File

@@ -10,7 +10,6 @@ import cats.syntax.functor.*
 import cats.syntax.option.*
 import cats.syntax.parallel.*
 import cats.syntax.traverse.*
-import com.daml.error.utils.DecodedCantonError
 import com.daml.metrics.api.MetricsContext
 import com.daml.nonempty.NonEmpty
 import com.daml.nonempty.catsinstances.*
@@ -1089,15 +1088,7 @@ class TransactionProcessingSteps(
         submitterMetaO.flatMap(completionInfoFromSubmitterMetadataO(_, freshOwnTimelyTx))

       rejectionReason.logWithContext(Map("requestId" -> pendingTransaction.requestId.toString))
-      val rejectionReasonStatus = rejectionReason.reason()
-      val mappedRejectionReason =
-        DecodedCantonError.fromGrpcStatus(rejectionReasonStatus) match {
-          case Right(error) => rejectionReasonStatus
-          case Left(err) =>
-            logger.warn(s"Failed to parse the rejection reason: $err")
-            rejectionReasonStatus
-        }
-      val rejection = LedgerSyncEvent.CommandRejected.FinalReason(mappedRejectionReason)
+      val rejection = LedgerSyncEvent.CommandRejected.FinalReason(rejectionReason.reason())
       val tseO = completionInfoO.map(info =>
         TimestampedEvent(

View File

@@ -67,11 +67,11 @@ class TransactionConfirmationResponseFactory(
         viewValidationResult.activenessResult

       if (inactive.nonEmpty)
-        logger.debug(
+        logger.info(
           show"View $viewHash of request $requestId rejected due to inactive contract(s) $inactive"
         )
       if (alreadyLocked.nonEmpty)
-        logger.debug(
+        logger.info(
           show"View $viewHash of request $requestId rejected due to contention on contract(s) $alreadyLocked"
         )

View File

@@ -222,6 +222,44 @@ class CantonSyncService(
     loggerFactory,
   )

+  /** Validates that the provided packages are vetted on the currently connected domains. */
+  // TODO(#15087) remove this waiting logic once topology events are published on the ledger api
+  val synchronizeVettingOnConnectedDomains: PackageVettingSynchronization =
+    new PackageVettingSynchronization {
+      override def sync(packages: Set[PackageId])(implicit
+          traceContext: TraceContext
+      ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit] = {
+        // wait for packages to be vetted on the currently connected domains
+        EitherT
+          .right[ParticipantTopologyManagerError](
+            connectedDomainsLookup.snapshot.toSeq.parTraverse { case (domainId, syncDomain) =>
+              syncDomain.topologyClient
+                .await(
+                  _.findUnvettedPackagesOrDependencies(participantId, packages)
+                    .bimap(
+                      _missingPackage => false,
+                      unvettedPackages => unvettedPackages.nonEmpty,
+                    )
+                    .merge
+                    .onShutdown(false),
+                  timeouts.network.duration,
+                )
+                .map(domainId -> _)
+            }
+          )
+          .map { result =>
+            result.foreach { case (domainId, successful) =>
+              if (!successful)
+                logger.warn(
+                  s"Waiting for vetting of packages $packages on domain $domainId timed out."
+                )
+            }
+            result
+          }
+          .void
+      }
+    }
+
   private case class AttemptReconnect(
       alias: DomainAlias,
       last: CantonTimestamp,
@@ -661,7 +699,7 @@ class CantonSyncService(
           fileNameO = None,
           submissionIdO = Some(submissionId),
           vetAllPackages = true,
-          synchronizeVetting = false,
+          synchronizeVetting = synchronizeVettingOnConnectedDomains,
         )
         .map(_ => SubmissionResult.Acknowledged)
         .onShutdown(Left(CommonErrors.ServerIsShuttingDown.Reject()))

View File

@@ -40,7 +40,10 @@ class PackageOpsForTesting(
   )(implicit tc: TraceContext): EitherT[FutureUnlessShutdown, CantonError, Unit] =
     EitherT.rightT(())

-  override def vetPackages(packages: Seq[PackageId], synchronize: Boolean)(implicit
+  override def vetPackages(
+      packages: Seq[PackageId],
+      synchronizeVetting: PackageVettingSynchronization,
+  )(implicit
       traceContext: TraceContext
   ): EitherT[FutureUnlessShutdown, ParticipantTopologyManagerError, Unit] =
     EitherT.rightT(())

View File

@@ -208,7 +208,7 @@ class PackageOpsTest extends PackageOpsTestBase {
       arrangeCurrentlyVetted(List(pkgId1))
       expectNewVettingState(List(pkgId1, pkgId2))
       packageOps
-        .vetPackages(Seq(pkgId1, pkgId2), synchronize = false)
+        .vetPackages(Seq(pkgId1, pkgId2), PackageVettingSynchronization.NoSync)
         .value
         .unwrap
         .map(inside(_) { case UnlessShutdown.Outcome(Right(_)) => succeed })
@@ -222,7 +222,7 @@ class PackageOpsTest extends PackageOpsTestBase {
       // Not ordered to prove that we check set-equality not ordered
       arrangeCurrentlyVetted(List(pkgId2, pkgId1))
       packageOps
-        .vetPackages(Seq(pkgId1, pkgId2), synchronize = false)
+        .vetPackages(Seq(pkgId1, pkgId2), PackageVettingSynchronization.NoSync)
         .value
         .unwrap
         .map(inside(_) { case UnlessShutdown.Outcome(Right(_)) =>

View File

@@ -131,7 +131,7 @@ class PackageServiceTest
           fileNameO = Some("CantonExamples"),
           submissionIdO = None,
           vetAllPackages = false,
-          synchronizeVetting = false,
+          synchronizeVetting = PackageVettingSynchronization.NoSync,
         )
         .value
         .map(_.valueOrFail("append dar"))
@@ -154,7 +154,7 @@ class PackageServiceTest
           fileNameO = Some("some/path/CantonExamples.dar"),
           submissionIdO = None,
           vetAllPackages = false,
-          synchronizeVetting = false,
+          synchronizeVetting = PackageVettingSynchronization.NoSync,
         )
         .value
         .map(_.valueOrFail("should be right"))
@@ -201,7 +201,7 @@ class PackageServiceTest
           fileNameO = Some("some/path/CantonExamples.dar"),
           submissionIdO = None,
           vetAllPackages = false,
-          synchronizeVetting = false,
+          synchronizeVetting = PackageVettingSynchronization.NoSync,
         )
         .valueOrFail("appending dar")
       deps <- packageDependencyResolver.packageDependencies(mainPackageId).value
@@ -253,7 +253,7 @@ class PackageServiceTest
             Some(badDarPath),
             None,
             vetAllPackages = false,
-            synchronizeVetting = false,
+            synchronizeVetting = PackageVettingSynchronization.NoSync,
           )
         )("append illformed.dar").failOnShutdown
     } yield {
@@ -299,7 +299,7 @@ class PackageServiceTest
     "requested by PackageService.vetDar" should {
       "reject the request with an error" in withEnv(
         rejectOnMissingDar(
-          _.vetDar(unknownDarHash, synchronize = true),
+          _.vetDar(unknownDarHash, PackageVettingSynchronization.NoSync),
           unknownDarHash,
           "DAR archive vetting",
         )
@@ -326,9 +326,9 @@ class PackageServiceTest
         sut.upload(
           darBytes = payload,
           fileNameO = Some(darName),
-          None,
+          submissionIdO = None,
           vetAllPackages = false,
-          synchronizeVetting = false,
+          synchronizeVetting = PackageVettingSynchronization.NoSync,
         )
       )
     }

View File

@@ -1 +1 @@
-20240702.13586.v19985bc9
+20240703.13595.v030861a1