update canton to 20231207.11939.0.vd18f4862/2.8.0-snapshot.20231207.11619.0.v1fe7c380/3.0.0-snapshot.20231207.11939.0.vd18f4862 (#18001)

* update canton to 20231207.11939.0.vd18f4862/2.8.0-snapshot.20231207.11619.0.v1fe7c380/3.0.0-snapshot.20231207.11939.0.vd18f4862

CHANGELOG_BEGIN
CHANGELOG_END

* apply fix?

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Gary Verhaegen <gary.verhaegen@digitalasset.com>
azure-pipelines[bot] 2023-12-08 15:00:25 +01:00 committed by GitHub
parent ff18f64df4
commit db7309c4fa
42 changed files with 3246 additions and 550 deletions

View File

@@ -636,7 +636,7 @@ object TopologyAdminCommandsX {
)
),
mustFullyAuthorize = mustFullyAuthorize,
forceChange = false,
forceChange = forceChange,
signedBy = signedBy.map(_.toProtoPrimitive),
store,
)

View File

@@ -8,16 +8,25 @@ import com.daml.nonempty.NonEmptyUtil
import com.digitalasset.canton.admin.api.client.data.crypto.*
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.config.{NonNegativeFiniteDuration, PositiveDurationSeconds}
import com.digitalasset.canton.protocol.DynamicDomainParameters.InvalidDynamicDomainParameters
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.protocol.DynamicDomainParameters.{
InvalidDynamicDomainParameters,
protocolVersionRepresentativeFor,
}
import com.digitalasset.canton.protocol.{
DynamicDomainParameters as DynamicDomainParametersInternal,
StaticDomainParameters as StaticDomainParametersInternal,
v2 as protocolV2,
}
import com.digitalasset.canton.time.{
Clock,
NonNegativeFiniteDuration as InternalNonNegativeFiniteDuration,
PositiveSeconds,
}
import com.digitalasset.canton.topology.admin.v0.DomainParametersChangeAuthorization
import com.digitalasset.canton.util.BinaryFileUtil
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{crypto as DomainCrypto}
import com.digitalasset.canton.version.{ProtoVersion, ProtocolVersion}
import com.digitalasset.canton.crypto as DomainCrypto
import com.google.common.annotations.VisibleForTesting
import io.scalaland.chimney.dsl.*
@@ -102,6 +111,7 @@ final case class DynamicDomainParameters(
maxRatePerParticipant: NonNegativeInt,
maxRequestSize: NonNegativeInt,
sequencerAggregateSubmissionTimeout: NonNegativeFiniteDuration,
trafficControlParameters: Option[TrafficControlParameters],
) {
if (ledgerTimeRecordTimeTolerance * 2 > mediatorDeduplicationTimeout)
@@ -127,12 +137,25 @@ final case class DynamicDomainParameters(
transferExclusivityTimeout: NonNegativeFiniteDuration = transferExclusivityTimeout,
topologyChangeDelay: NonNegativeFiniteDuration = topologyChangeDelay,
ledgerTimeRecordTimeTolerance: NonNegativeFiniteDuration = ledgerTimeRecordTimeTolerance,
mediatorDeduplicationTimeout: NonNegativeFiniteDuration = mediatorDeduplicationTimeout,
reconciliationInterval: PositiveDurationSeconds = reconciliationInterval,
maxRatePerParticipant: NonNegativeInt = maxRatePerParticipant,
maxRequestSize: NonNegativeInt = maxRequestSize,
sequencerAggregateSubmissionTimeout: NonNegativeFiniteDuration =
sequencerAggregateSubmissionTimeout,
trafficControlParameters: Option[TrafficControlParameters] = trafficControlParameters,
): DynamicDomainParameters = this.copy(
participantResponseTimeout = participantResponseTimeout,
mediatorReactionTimeout = mediatorReactionTimeout,
transferExclusivityTimeout = transferExclusivityTimeout,
topologyChangeDelay = topologyChangeDelay,
ledgerTimeRecordTimeTolerance = ledgerTimeRecordTimeTolerance,
mediatorDeduplicationTimeout = mediatorDeduplicationTimeout,
reconciliationInterval = reconciliationInterval,
maxRatePerParticipant = maxRatePerParticipant,
maxRequestSize = maxRequestSize,
sequencerAggregateSubmissionTimeout = sequencerAggregateSubmissionTimeout,
trafficControlParameters = trafficControlParameters,
)
def toProto: DomainParametersChangeAuthorization.Parameters =
@@ -162,6 +185,27 @@ final case class DynamicDomainParameters(
trafficControlParameters = None,
)
)
private[canton] def toInternal: DynamicDomainParametersInternal =
DynamicDomainParametersInternal.tryCreate(
participantResponseTimeout =
InternalNonNegativeFiniteDuration.fromConfig(participantResponseTimeout),
mediatorReactionTimeout =
InternalNonNegativeFiniteDuration.fromConfig(mediatorReactionTimeout),
transferExclusivityTimeout =
InternalNonNegativeFiniteDuration.fromConfig(transferExclusivityTimeout),
topologyChangeDelay = InternalNonNegativeFiniteDuration.fromConfig(topologyChangeDelay),
ledgerTimeRecordTimeTolerance =
InternalNonNegativeFiniteDuration.fromConfig(ledgerTimeRecordTimeTolerance),
mediatorDeduplicationTimeout =
InternalNonNegativeFiniteDuration.fromConfig(mediatorDeduplicationTimeout),
reconciliationInterval = PositiveSeconds.fromConfig(reconciliationInterval),
maxRatePerParticipant = maxRatePerParticipant,
maxRequestSize = MaxRequestSize(maxRequestSize),
sequencerAggregateSubmissionTimeout =
InternalNonNegativeFiniteDuration.fromConfig(sequencerAggregateSubmissionTimeout),
trafficControlParameters = trafficControlParameters.map(_.toInternal),
)(protocolVersionRepresentativeFor(ProtoVersion(0)))
}
object DynamicDomainParameters {
@@ -173,6 +217,11 @@ object DynamicDomainParameters {
DynamicDomainParametersInternal.defaultValues(protocolVersion)
)
private[canton] def initialValues(clock: Clock, protocolVersion: ProtocolVersion) =
DynamicDomainParameters(
DynamicDomainParametersInternal.initialValues(clock, protocolVersion)
)
def apply(
domain: DynamicDomainParametersInternal
): DynamicDomainParameters =

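Taken together, the new `update` copy method and `toInternal` conversion let console code tweak a single field and hand the result to the topology layer. A minimal sketch, assuming a `current: DynamicDomainParameters` value already obtained via the console (the literal value is illustrative only):

// Sketch only: bump the max request size; `update` defaults every other
// argument to the corresponding existing field.
val tweaked = current.update(maxRequestSize = NonNegativeInt.tryCreate(20000000))
// Convert to the internal representation before building a topology proposal.
val internal = tweaked.toInternal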
View File

@@ -0,0 +1,27 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.admin.api.client.data
import com.digitalasset.canton.config
import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
import com.digitalasset.canton.sequencing.{
TrafficControlParameters as TrafficControlParametersInternal
}
import com.digitalasset.canton.time.{NonNegativeFiniteDuration as InternalNonNegativeFiniteDuration}
// TODO(#15650) Properly expose new BFT parameters and domain limits
final case class TrafficControlParameters(
maxBaseTrafficAmount: NonNegativeLong,
readVsWriteScalingFactor: PositiveInt,
maxBaseTrafficAccumulationDuration: config.NonNegativeFiniteDuration,
) {
private[canton] def toInternal: TrafficControlParametersInternal =
TrafficControlParametersInternal(
maxBaseTrafficAmount = maxBaseTrafficAmount,
readVsWriteScalingFactor = readVsWriteScalingFactor,
maxBaseTrafficAccumulationDuration =
InternalNonNegativeFiniteDuration.fromConfig(maxBaseTrafficAccumulationDuration),
)
}

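A hedged construction sketch for the new console-level case class; the numbers are illustrative, not recommended defaults:

// Sketch: build traffic control parameters and convert them to the internal
// sequencing representation via the toInternal helper shown above.
val tcp = TrafficControlParameters(
  maxBaseTrafficAmount = NonNegativeLong.tryCreate(200000L),
  readVsWriteScalingFactor = PositiveInt.tryCreate(200),
  maxBaseTrafficAccumulationDuration = config.NonNegativeFiniteDuration.ofSeconds(600),
)
val internalTcp = tcp.toInternal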
View File

@@ -3,7 +3,6 @@
package com.digitalasset.canton.console
import com.daml.lf.data.Ref.PackageId
import com.digitalasset.canton.*
import com.digitalasset.canton.admin.api.client.commands.GrpcAdminCommand
import com.digitalasset.canton.config.RequireTypes.Port
@@ -34,7 +33,7 @@ import com.digitalasset.canton.topology.{DomainId, NodeIdentity, ParticipantId}
import com.digitalasset.canton.tracing.NoTracing
import com.digitalasset.canton.util.ErrorUtil
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, TimeoutException}
import scala.util.hashing.MurmurHash3
trait InstanceReferenceCommon
@@ -538,10 +537,68 @@ abstract class ParticipantReference(
@Help.Group("Topology")
@Help.Description("This group contains access to the full set of topology management commands.")
def topology: TopologyAdministrationGroup = topology_
override protected def vettedPackagesOfParticipant(): Set[PackageId] = topology.vetted_packages
.list(filterStore = "Authorized", filterParticipant = id.filterString)
.flatMap(_.item.packageIds)
.toSet
override protected def waitPackagesVetted(
timeout: NonNegativeDuration = consoleEnvironment.commandTimeouts.bounded
): Unit = {
def waitForPackages(
targetTopology: TopologyAdministrationGroup,
observer: String,
domainId: DomainId,
): Unit =
try {
AdminCommandRunner
.retryUntilTrue(timeout) {
// ensure that vetted packages on the domain match the ones in the authorized store
val onTargetTopologyDomainStore = targetTopology.vetted_packages
.list(filterStore = domainId.filterString, filterParticipant = id.filterString)
.flatMap(_.item.packageIds)
.toSet
val onParticipantAuthorizedStore = topology.vetted_packages
.list(filterStore = "Authorized", filterParticipant = id.filterString)
.flatMap(_.item.packageIds)
.toSet
val ret = onParticipantAuthorizedStore == onTargetTopologyDomainStore
if (!ret) {
logger.debug(
show"Still waiting for package vetting updates to be observed by $observer on $domainId: vetted - onDomain is ${onParticipantAuthorizedStore -- onTargetTopologyDomainStore} while onDomain -- vetted is ${onTargetTopologyDomainStore -- onParticipantAuthorizedStore}"
)
}
ret
}
.discard
} catch {
case _: TimeoutException =>
logger.error(
show"$observer has not observed all vetting txs of $id on domain $domainId within the given timeout."
)
}
val connected = domains.list_connected().map(_.domainId).toSet
// for every domain this participant is connected to
consoleEnvironment.domains.all
.filter(d => d.health.running() && d.health.initialized() && connected.contains(d.id))
.foreach { domain =>
waitForPackages(domain.topology, s"Domain ${domain.name}", domain.id)
}
// for every participant
consoleEnvironment.participants.all
.filter(p => p.health.running() && p.health.initialized())
.foreach { participant =>
// for every domain this participant is connected to as well
participant.domains.list_connected().foreach {
case item if connected.contains(item.domainId) =>
waitForPackages(
participant.topology,
s"Participant ${participant.name}",
item.domainId,
)
case _ =>
}
}
}
override protected def participantIsActiveOnDomain(
domainId: DomainId,
@@ -724,10 +781,55 @@ abstract class ParticipantReferenceX(
@Help.Group("Topology")
@Help.Description("This group contains access to the full set of topology management commands.")
override def topology: TopologyAdministrationGroupX = topology_
override protected def vettedPackagesOfParticipant(): Set[PackageId] = topology.vetted_packages
.list(filterStore = "Authorized", filterParticipant = id.filterString)
.flatMap(_.item.packageIds)
.toSet
override protected def waitPackagesVetted(timeout: NonNegativeDuration): Unit = {
val connected = domains.list_connected().map(_.domainId).toSet
// for every participant
consoleEnvironment.participantsX.all
.filter(p => p.health.running() && p.health.initialized())
.foreach { participant =>
// for every domain this participant is connected to as well
participant.domains.list_connected().foreach {
case item if connected.contains(item.domainId) =>
try {
AdminCommandRunner
.retryUntilTrue(timeout) {
// ensure that vetted packages on the domain match the ones in the authorized store
val onDomain = participant.topology.vetted_packages
.list(
filterStore = item.domainId.filterString,
filterParticipant = id.filterString,
)
.flatMap(_.item.packageIds)
.toSet
// Vetted packages from the participant's authorized store
val onParticipantAuthorizedStore = topology.vetted_packages
.list(filterStore = "Authorized", filterParticipant = id.filterString)
.filter(_.item.domainId.forall(_ == item.domainId))
.flatMap(_.item.packageIds)
.toSet
val ret = onParticipantAuthorizedStore == onDomain
if (!ret) {
logger.debug(
show"Still waiting for package vetting updates to be observed by Participant ${participant.name} on ${item.domainId}: vetted - onDomain is ${onParticipantAuthorizedStore -- onDomain} while onDomain -- vetted is ${onDomain -- onParticipantAuthorizedStore}"
)
}
ret
}
.discard
} catch {
case _: TimeoutException =>
logger.error(
show"Participant ${participant.name} has not observed all vetting txs of $id on domain ${item.domainId} within the given timeout."
)
}
case _ =>
}
}
}
override protected def participantIsActiveOnDomain(
domainId: DomainId,
participantId: ParticipantId,

View File

@@ -6,7 +6,6 @@ package com.digitalasset.canton.console.commands
import cats.syntax.option.*
import cats.syntax.traverse.*
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.lf.data.Ref.PackageId
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommands.Pruning.{
GetParticipantScheduleCommand,
@@ -772,7 +771,10 @@ trait ParticipantAdministration extends FeatureFlagFilter {
def id: ParticipantId
protected def vettedPackagesOfParticipant(): Set[PackageId]
protected def waitPackagesVetted(
timeout: NonNegativeDuration = consoleEnvironment.commandTimeouts.bounded
): Unit
protected def participantIsActiveOnDomain(
domainId: DomainId,
participantId: ParticipantId,
@@ -959,7 +961,6 @@ trait ParticipantAdministration extends FeatureFlagFilter {
def synchronize_vetting(
timeout: NonNegativeDuration = consoleEnvironment.commandTimeouts.bounded
): Unit = {
val connected = domains.list_connected().map(_.domainId).toSet
// ensure that the ledger api server has seen all packages
try {
@@ -984,61 +985,8 @@ trait ParticipantAdministration extends FeatureFlagFilter {
)
}
def waitForPackages(
topology: TopologyAdministrationGroup,
observer: String,
domainId: DomainId,
): Unit = {
try {
AdminCommandRunner
.retryUntilTrue(timeout) {
// ensure that vetted packages on the domain match the ones in the authorized store
val onDomain = topology.vetted_packages
.list(filterStore = domainId.filterString, filterParticipant = id.filterString)
.flatMap(_.item.packageIds)
.toSet
val vetted = vettedPackagesOfParticipant()
val ret = vetted == onDomain
if (!ret) {
logger.debug(
show"Still waiting for package vetting updates to be observed by $observer on $domainId: vetted - onDomain is ${vetted -- onDomain} while onDomain -- vetted is ${onDomain -- vetted}"
)
}
ret
}
.discard
} catch {
case _: TimeoutException =>
logger.error(
show"$observer has not observed all vetting txs of $id on domain $domainId within the given timeout."
)
}
}
// for every domain this participant is connected to
consoleEnvironment.domains.all
.filter(d => d.health.running() && d.health.initialized() && connected.contains(d.id))
.foreach { domain =>
waitForPackages(domain.topology, s"Domain ${domain.name}", domain.id)
}
// for every participant
consoleEnvironment.participants.all
.filter(p => p.health.running() && p.health.initialized())
.foreach { participant =>
// for every domain this participant is connected to as well
participant.domains.list_connected().foreach {
case item if connected.contains(item.domainId) =>
waitForPackages(
participant.topology,
s"Participant ${participant.name}",
item.domainId,
)
case _ =>
}
}
waitPackagesVetted(timeout)
}
}
@Help.Summary("Manage domain connections")

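The refactor above moves the per-domain and per-participant polling out of `synchronize_vetting` into the `waitPackagesVetted` overrides, leaving `synchronize_vetting` as a thin wrapper. A hedged console sketch of the unchanged entry point (node name and DAR path are assumptions):

// Sketch: after uploading a DAR, block until every connected domain and
// participant has observed the vetting transactions, or the timeout expires.
participant1.dars.upload("my-package.dar")
participant1.packages.synchronize_vetting()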
View File

@@ -105,7 +105,10 @@ abstract class TopologyAdministrationGroupCommon(
timeout: Option[NonNegativeDuration]
)(grpcCommand: => GrpcAdminCommand[_, _, T]): T = {
val ret = consoleEnvironment.run(runner.adminCommand(grpcCommand))
ConsoleMacros.utils.synchronize_topology(timeout)(consoleEnvironment)
// Only wait for topology synchronization if a timeout is specified.
if (timeout.nonEmpty) {
ConsoleMacros.utils.synchronize_topology(timeout)(consoleEnvironment)
}
ret
}
}

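With the guard above, passing `synchronize = None` now skips the topology-synchronization wait entirely rather than falling back to a default timeout. A hedged sketch against the `propose` signature added later in this commit (handles assumed):

// Sketch: fire-and-forget proposal; returns once the admin command completes,
// without waiting for the state to propagate.
participant1.topology.domain_parameters.propose(domainId, parameters, synchronize = None)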
View File

@@ -12,6 +12,10 @@ import com.digitalasset.canton.admin.api.client.commands.{
TopologyAdminCommandsX,
}
import com.digitalasset.canton.admin.api.client.data.topologyx.*
import com.digitalasset.canton.admin.api.client.data.{
DynamicDomainParameters as ConsoleDynamicDomainParameters,
TrafficControlParameters,
}
import com.digitalasset.canton.config
import com.digitalasset.canton.config.NonNegativeDuration
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt, PositiveLong}
@@ -29,9 +33,10 @@ import com.digitalasset.canton.console.{
}
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.health.admin.data.TopologyQueueStatus
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.protocol.DynamicDomainParameters
import com.digitalasset.canton.time.EnrichedDurations.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.admin.grpc.BaseQueryX
import com.digitalasset.canton.topology.store.TopologyStoreId.AuthorizedStore
@@ -39,9 +44,14 @@ import com.digitalasset.canton.topology.store.{StoredTopologyTransactionsX, Time
import com.digitalasset.canton.topology.transaction.SignedTopologyTransactionX.GenericSignedTopologyTransactionX
import com.digitalasset.canton.topology.transaction.TopologyTransactionX.TxHash
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.version.ProtocolVersion
import com.google.protobuf.ByteString
import java.time.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.math.Ordering.Implicits.infixOrderingOps
import scala.reflect.ClassTag
trait InitNodeIdX extends ConsoleCommandGroup {
@@ -228,7 +238,7 @@ class TopologyAdministrationGroupX(
val domainParameterState =
instance.topology.domain_parameters.propose(
domainId,
DynamicDomainParameters
ConsoleDynamicDomainParameters
.initialValues(
consoleEnvironment.environment.clock,
ProtocolVersion.latest,
@@ -1498,9 +1508,7 @@ class TopologyAdministrationGroupX(
This transaction will be rejected if another fully authorized transaction with the same serial already
exists, or if there is a gap between this serial and the most recently used serial.
If None, the serial will be automatically selected by the node.
signedBy: the fingerprint of the key to be used to sign this proposal
ops: Either Replace or Remove the vetting. Default to Replace.
|""")
signedBy: the fingerprint of the key to be used to sign this proposal""")
def propose(
participant: ParticipantId,
packageIds: Seq[PackageId],
@@ -1796,6 +1804,7 @@ class TopologyAdministrationGroupX(
@Help.Summary("Manage domain parameters state", FeatureFlag.Preview)
@Help.Group("Domain Parameters State")
object domain_parameters extends Helpful {
@Help.Summary("List dynamic domain parameters")
def list(
filterStore: String = "",
proposals: Boolean = false,
@@ -1820,9 +1829,24 @@ class TopologyAdministrationGroupX(
)
}
@Help.Summary("Get the configured dynamic domain parameters")
def get_dynamic_domain_parameters(domainId: DomainId): ConsoleDynamicDomainParameters =
ConsoleDynamicDomainParameters(
expectExactlyOneResult(
list(
filterStore = domainId.filterString,
proposals = false,
timeQuery = TimeQueryX.HeadState,
operation = Some(TopologyChangeOpX.Replace),
filterDomain = domainId.filterString,
)
).item
)
@Help.Summary("Propose changes to dynamic domain parameters")
@Help.Description("""
domain: the target domain
@Help.Description(
"""
domainId: the target domain
parameters: the new dynamic domain parameters to be used on the domain
store: - "Authorized": the topology transaction will be stored in the node's authorized store and automatically
@@ -1838,32 +1862,326 @@ class TopologyAdministrationGroupX(
serial: the expected serial this topology transaction should have. Serials must be contiguous and start at 1.
This transaction will be rejected if another fully authorized transaction with the same serial already
exists, or if there is a gap between this serial and the most recently used serial.
If None, the serial will be automatically selected by the node.""")
If None, the serial will be automatically selected by the node.
synchronize: Synchronize timeout can be used to ensure that the state has been propagated into the node
force: must be set to true when performing a dangerous operation, such as increasing the ledgerTimeRecordTimeTolerance"""
)
def propose(
domain: DomainId,
parameters: DynamicDomainParameters,
domainId: DomainId,
parameters: ConsoleDynamicDomainParameters,
store: Option[String] = None,
mustFullyAuthorize: Boolean = false,
// TODO(#14056) don't use the instance's root namespace key by default.
// let the grpc service figure out the right key to use, once that's implemented
signedBy: Option[Fingerprint] = Some(instance.id.uid.namespace.fingerprint),
serial: Option[PositiveInt] = None,
): SignedTopologyTransactionX[TopologyChangeOpX, DomainParametersStateX] =
consoleEnvironment.run {
adminCommand(
TopologyAdminCommandsX.Write.Propose(
// TODO(#14058) maybe don't just take default values for dynamic parameters
DomainParametersStateX(
domain,
parameters,
),
signedBy.toList,
serial = serial,
mustFullyAuthorize = mustFullyAuthorize,
store = store.getOrElse(domain.filterString),
)
synchronize: Option[config.NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
force: Boolean = false,
): SignedTopologyTransactionX[TopologyChangeOpX, DomainParametersStateX] = { // TODO(#15815): Don't expose internal TopologyMappingX and TopologyChangeOpX classes
synchronisation.runAdminCommand(synchronize)(
TopologyAdminCommandsX.Write.Propose(
DomainParametersStateX(
domainId,
parameters.toInternal,
),
signedBy.toList,
serial = serial,
mustFullyAuthorize = mustFullyAuthorize,
store = store.getOrElse(domainId.filterString),
forceChange = force,
)
)
}
@Help.Summary("Propose an update to dynamic domain parameters")
@Help.Description(
"""
domainId: the target domain
update: the new dynamic domain parameters to be used on the domain
mustFullyAuthorize: when set to true, the proposal's previously received signatures and the signature of this node must be
sufficient to fully authorize the topology transaction. if this is not the case, the request fails.
when set to false, the proposal retains the proposal status until enough signatures are accumulated to
satisfy the mapping's authorization requirements.
signedBy: the fingerprint of the key to be used to sign this proposal
synchronize: Synchronize timeout can be used to ensure that the state has been propagated into the node
force: must be set to true when performing a dangerous operation, such as increasing the ledgerTimeRecordTimeTolerance"""
)
def propose_update(
domainId: DomainId, // TODO(#15803) check whether we can infer domainId
update: ConsoleDynamicDomainParameters => ConsoleDynamicDomainParameters,
mustFullyAuthorize: Boolean = false,
// TODO(#14056) don't use the instance's root namespace key by default.
signedBy: Option[Fingerprint] = Some(instance.id.uid.namespace.fingerprint),
synchronize: Option[config.NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
force: Boolean = false,
): Unit = {
val domainStore = domainId.filterString
val previousParameters = expectExactlyOneResult(
list(
filterDomain = domainId.filterString,
filterStore = domainStore,
operation = Some(TopologyChangeOpX.Replace),
)
)
val newParameters = update(ConsoleDynamicDomainParameters(previousParameters.item))
// Avoid topology manager ALREADY_EXISTS error by not submitting a no-op proposal.
// TODO(#15817): Move such ux-resilience avoiding error to write_service
if (ConsoleDynamicDomainParameters(previousParameters.item) != newParameters) {
propose(
domainId,
newParameters,
Some(domainStore),
mustFullyAuthorize,
signedBy,
Some(previousParameters.context.serial.increment),
synchronize,
force,
).discard
}
}
@Help.Summary("Update the participant response timeout in the dynamic domain parameters")
def set_participant_response_timeout(
domainId: DomainId,
timeout: config.NonNegativeFiniteDuration,
): Unit = propose_update(domainId, _.update(participantResponseTimeout = timeout))
@Help.Summary("Update the mediator reaction timeout in the dynamic domain parameters")
def set_mediator_reaction_timeout(
domainId: DomainId,
timeout: config.NonNegativeFiniteDuration,
): Unit = propose_update(domainId, _.update(mediatorReactionTimeout = timeout))
@Help.Summary("Update the transfer exclusivity timeout in the dynamic domain parameters")
def set_transfer_exclusivity_timeout(
domainId: DomainId,
timeout: config.NonNegativeFiniteDuration,
): Unit = propose_update(domainId, _.update(transferExclusivityTimeout = timeout))
@Help.Summary("Update the topology change delay in the dynamic domain parameters")
def set_topology_change_delay(
domainId: DomainId,
delay: config.NonNegativeFiniteDuration,
): Unit = propose_update(domainId, _.update(topologyChangeDelay = delay))
@Help.Summary("Update the ledger time record time tolerance in the dynamic domain parameters")
@Help.Description(
"""If it would be insecure to perform the change immediately,
|the command will block and wait until it is secure to perform the change.
|The command will block for at most twice of ``newLedgerTimeRecordTimeTolerance``.
|
|The method will fail if ``mediatorDeduplicationTimeout`` is less than twice of ``newLedgerTimeRecordTimeTolerance``.
|
|Do not modify domain parameters concurrently while running this command,
|because the command may override concurrent changes.
|
|force: update ``ledgerTimeRecordTimeTolerance`` immediately without blocking.
|This is safe to do during domain bootstrapping and in test environments, but should not be done in operational production systems."""
)
def set_ledger_time_record_time_tolerance(
domainId: DomainId,
newLedgerTimeRecordTimeTolerance: config.NonNegativeFiniteDuration,
force: Boolean = false,
): Unit = {
TraceContext.withNewTraceContext { implicit tc =>
if (!force) {
securely_set_ledger_time_record_time_tolerance(
domainId,
newLedgerTimeRecordTimeTolerance,
)
} else {
logger.info(
s"Immediately updating ledgerTimeRecordTimeTolerance to $newLedgerTimeRecordTimeTolerance..."
)
propose_update(
domainId,
_.update(ledgerTimeRecordTimeTolerance = newLedgerTimeRecordTimeTolerance),
force = true,
)
}
}
}
private def securely_set_ledger_time_record_time_tolerance(
domainId: DomainId,
newLedgerTimeRecordTimeTolerance: config.NonNegativeFiniteDuration,
)(implicit traceContext: TraceContext): Unit = {
implicit val ec: ExecutionContext = consoleEnvironment.environment.executionContext
// See i9028 for a detailed design.
// https://docs.google.com/document/d/1tpPbzv2s6bjbekVGBn6X5VZuw0oOTHek5c30CBo4UkI/edit#bookmark=id.1dzc6dxxlpca
// We wait until the antecedent of Lemma 2 Item 2 is falsified for all changes that violate the conclusion.
// Compute new parameters
val oldDomainParameters = get_dynamic_domain_parameters(domainId)
val oldLedgerTimeRecordTimeTolerance = oldDomainParameters.ledgerTimeRecordTimeTolerance
val minMediatorDeduplicationTimeout = newLedgerTimeRecordTimeTolerance * 2
if (oldDomainParameters.mediatorDeduplicationTimeout < minMediatorDeduplicationTimeout) {
val err = TopologyManagerError.IncreaseOfLedgerTimeRecordTimeTolerance
.PermanentlyInsecure(
newLedgerTimeRecordTimeTolerance.toInternal,
oldDomainParameters.mediatorDeduplicationTimeout.toInternal,
)
val msg = CantonError.stringFromContext(err)
consoleEnvironment.run(GenericCommandError(msg))
}
logger.info(
s"Securely updating ledgerTimeRecordTimeTolerance to $newLedgerTimeRecordTimeTolerance..."
)
// Poll until it is safe to increase ledgerTimeRecordTimeTolerance
def checkPreconditions(): Future[Unit] = {
val startTs = consoleEnvironment.environment.clock.now
// Update mediatorDeduplicationTimeout for several reasons:
// 1. Make sure it is big enough.
// 2. The resulting topology transaction gives us a meaningful lower bound on the sequencer clock.
logger.info(
s"Do a no-op update of ledgerTimeRecordTimeTolerance to $oldLedgerTimeRecordTimeTolerance..."
)
propose_update(
domainId,
_.copy(ledgerTimeRecordTimeTolerance = oldLedgerTimeRecordTimeTolerance),
)
logger.debug("Check for incompatible past domain parameters...")
val allTransactions = list(
domainId.filterString,
// We can't specify a lower bound in range because that would be compared against validFrom.
// (But we need to compare to validUntil).
timeQuery = TimeQueryX.Range(None, None),
)
// This serves as a lower bound of validFrom for the next topology transaction.
val lastSequencerTs =
allTransactions
.map(_.context.validFrom)
.maxOption
.getOrElse(throw new NoSuchElementException("Missing domain parameters!"))
logger.debug(s"Last sequencer timestamp is $lastSequencerTs.")
// Determine how long we need to wait until all incompatible domainParameters have become
// invalid for at least minMediatorDeduplicationTimeout.
val waitDuration = allTransactions
.filterNot(tx =>
ConsoleDynamicDomainParameters(tx.item).compatibleWithNewLedgerTimeRecordTimeTolerance(
newLedgerTimeRecordTimeTolerance
)
)
.map { tx =>
val elapsedForAtLeast = tx.context.validUntil match {
case Some(validUntil) => Duration.between(validUntil, lastSequencerTs)
case None => Duration.ZERO
}
minMediatorDeduplicationTimeout.asJava minus elapsedForAtLeast
}
.maxOption
.getOrElse(Duration.ZERO)
if (waitDuration > Duration.ZERO) {
logger.info(
show"Found incompatible past domain parameters. Waiting for $waitDuration..."
)
// Use the clock instead of Threading.sleep to support sim clock based tests.
val delayF = consoleEnvironment.environment.clock
.scheduleAt(
_ => (),
startTs.plus(waitDuration),
) // avoid scheduleAfter, because that causes a race condition in integration tests
.onShutdown(
throw new IllegalStateException(
"Update of ledgerTimeRecordTimeTolerance interrupted due to shutdown."
)
)
// Do not submit checkPreconditions() to the clock because it is blocking and would therefore block the clock.
delayF.flatMap(_ => checkPreconditions())
} else {
Future.unit
}
}
consoleEnvironment.commandTimeouts.unbounded.await(
"Wait until ledgerTimeRecordTimeTolerance can be increased."
)(
checkPreconditions()
)
// Now that past values of mediatorDeduplicationTimeout have been large enough,
// we can change ledgerTimeRecordTimeTolerance.
logger.info(
s"Now changing ledgerTimeRecordTimeTolerance to $newLedgerTimeRecordTimeTolerance..."
)
propose_update(
domainId,
_.copy(ledgerTimeRecordTimeTolerance = newLedgerTimeRecordTimeTolerance),
force = true,
)
}
@Help.Summary("Update the mediator deduplication timeout in the dynamic domain parameters")
def set_mediator_deduplication_timeout(
domainId: DomainId,
timeout: config.NonNegativeFiniteDuration,
): Unit = propose_update(domainId, _.update(mediatorDeduplicationTimeout = timeout))
@Help.Summary("Update the reconciliation interval in the dynamic domain parameters")
def set_reconciliation_interval(
domainId: DomainId,
interval: config.PositiveDurationSeconds,
): Unit = propose_update(domainId, _.update(reconciliationInterval = interval))
@Help.Summary("Update the maximum rate per participant in the dynamic domain parameters")
def set_max_rate_per_participant(
domainId: DomainId,
rate: NonNegativeInt,
): Unit = propose_update(domainId, _.update(maxRatePerParticipant = rate))
@Help.Summary("Update the maximum request size in the dynamic domain parameters")
@Help.Description(
"""The update won't have any effect until the sequencers are restarted."""
)
def set_max_request_size(
domainId: DomainId,
size: NonNegativeInt,
): Unit = propose_update(domainId, _.update(maxRequestSize = size))
@Help.Summary(
"Update the sequencer aggregate submission timeout in the dynamic domain parameters"
)
def set_sequencer_aggregate_submission_timeout(
domainId: DomainId,
timeout: config.NonNegativeFiniteDuration,
): Unit =
propose_update(domainId, _.update(sequencerAggregateSubmissionTimeout = timeout))
@Help.Summary(
"Update the `trafficControlParameters` in the dynamic domain parameters"
)
def set_traffic_control_parameters(
domainId: DomainId,
trafficControlParameters: TrafficControlParameters,
): Unit = propose_update(
domainId,
_.update(trafficControlParameters = Some(trafficControlParameters)),
)
@Help.Summary(
"Clear the traffic control parameters in the dynamic domain parameters"
)
def clear_traffic_control_parameters(domainId: DomainId): Unit =
propose_update(domainId, _.update(trafficControlParameters = None))
}
@Help.Summary("Inspect topology stores")
@@ -1886,4 +2204,8 @@ class TopologyAdministrationGroupX(
s"Found ${multipleResults.size} results, but expect at most one."
)
}
private def expectExactlyOneResult[R](seq: Seq[R]): R = expectAtMostOneResult(seq).getOrElse(
throw new IllegalStateException(s"Expected exactly one result, but found none")
)
}

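The new console surface reads best end to end. A hedged session sketch using the commands added above; `participant1` and `mydomainId` are assumed handles:

// Read the currently effective parameters from the domain's topology state.
val current =
  participant1.topology.domain_parameters.get_dynamic_domain_parameters(mydomainId)

// Propose a single-field change; identical parameters are skipped to avoid the
// topology manager's ALREADY_EXISTS error.
participant1.topology.domain_parameters.propose_update(
  mydomainId,
  _.update(reconciliationInterval = config.PositiveDurationSeconds.ofSeconds(60)),
)

// The set_* helpers are one-line wrappers over propose_update.
participant1.topology.domain_parameters.set_max_request_size(
  mydomainId,
  NonNegativeInt.tryCreate(10485760),
)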
View File

@@ -0,0 +1 @@
canton.domains.mydomain.init.domain-parameters.protocol-version = 3

View File

@@ -334,8 +334,14 @@ class CantonCommunityConfigTest extends AnyWordSpec with BaseTest {
"parsing our config example snippets" should {
"succeed on all examples" in {
val inputDir = baseDir / "documentation-snippets"
val exclude = List(
"enforce-protocol-version-domain-2.5.conf" // Does not build anymore but needed in the docs
)
inputDir
.list(_.extension.contains(".conf"))
.filterNot(file => exclude.contains(file.name))
.foreach(file =>
loggerFactory.assertLogsUnorderedOptional(
loadFiles(Seq(simpleConf, "documentation-snippets/" + file.name))

View File

@@ -143,7 +143,7 @@ object FutureSupervisor {
fut.thereafter {
case Failure(exception) =>
log(
s"${description} failed with exception after ${elapsed(itm)}",
s"${description} failed with exception after ${elapsed(itm)}: $exception",
logLevel,
errorLoggingContext,
)

View File

@@ -13,10 +13,7 @@ import com.digitalasset.canton.logging.ErrorLoggingContext
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.{DurationConverter, ParsingResult}
import com.digitalasset.canton.time.{
NonNegativeFiniteDuration as NonNegativeFiniteDurationInternal,
PositiveSeconds as PositiveSecondsInternal,
}
import com.digitalasset.canton.time.{NonNegativeFiniteDuration as NonNegativeFiniteDurationInternal}
import com.digitalasset.canton.util.FutureUtil.defaultStackTraceFilter
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.{FutureUtil, LoggerUtil, StackTraceUtil}
@@ -438,12 +435,6 @@ final case class PositiveDurationSeconds(underlying: FiniteDuration)
}
def asFiniteApproximation: FiniteDuration = underlying
private[canton] def toInternal: PositiveSecondsInternal = checked(
PositiveSecondsInternal.tryCreate(
asJava
)
)
}
object PositiveDurationSeconds

View File

@@ -625,7 +625,7 @@ class SequencedEventValidatorImpl(
event.timestamp,
lastTopologyClientTimestamp(priorEventO),
protocolVersion,
warnIfApproximate = true,
warnIfApproximate = priorEventO.nonEmpty,
optimistic,
)
.leftMap(InvalidTimestampOfSigningKey(event.timestamp, signingTs, _))

View File

@@ -318,7 +318,7 @@ object SequencerClientFactory {
auth,
metrics,
processingTimeout,
loggerFactory,
loggerFactory.append("sequencerConnection", connection.sequencerAlias.unwrap),
domainParameters.protocolVersion,
)
}

View File

@@ -204,6 +204,8 @@ object NonNegativeFiniteDuration extends RefinedDurationCompanion[NonNegativeFin
def apply(duration: PositiveSeconds): NonNegativeFiniteDuration = checked(
NonNegativeFiniteDuration.tryCreate(duration.duration)
)
def fromConfig(config: NonNegativeFiniteDurationConfig) = NonNegativeFiniteDuration(config.asJava)
}
final case class NonNegativeSeconds private (duration: Duration)
@@ -266,6 +268,9 @@ object PositiveSeconds extends RefinedDurationCompanion[PositiveSeconds] {
PositiveSeconds(duration),
s"Duration should be positive and rounded to the second, found: $duration",
)
def fromConfig(config: PositiveDurationSecondsConfig): PositiveSeconds =
PositiveSeconds(config.asJava)
}
object EnrichedDurations {

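These `fromConfig` helpers bridge the config-level duration types into the internal time types used by the `toInternal` conversions earlier in this commit. A minimal sketch, assuming the usual config duration companions:

// Sketch: config-level durations converted to their internal counterparts.
val internalTimeout =
  NonNegativeFiniteDuration.fromConfig(config.NonNegativeFiniteDuration.ofSeconds(30))
val internalInterval =
  PositiveSeconds.fromConfig(config.PositiveDurationSeconds.ofSeconds(60))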
View File

@@ -13,7 +13,9 @@ import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.protocol.DynamicDomainParameters
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.TopologyManagerError.IncreaseOfLedgerTimeRecordTimeTolerance
import com.digitalasset.canton.topology.processing.{EffectiveTime, SequencedTime}
import com.digitalasset.canton.topology.store.TopologyStoreId.{AuthorizedStore, DomainStore}
import com.digitalasset.canton.topology.store.{TopologyStoreId, TopologyStoreX}
@@ -23,7 +25,7 @@ import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.SimpleExecutionQueue
import com.digitalasset.canton.util.{MonadUtil, SimpleExecutionQueue}
import com.digitalasset.canton.version.ProtocolVersion
import com.google.common.annotations.VisibleForTesting
@@ -384,6 +386,7 @@ abstract class TopologyManagerX[+StoreID <: TopologyStoreId](
{
val ts = timestampForValidation()
for {
_ <- MonadUtil.sequentialTraverse_(transactions)(transactionIsNotDangerous(_, force))
// validate incrementally and apply to in-memory state
_ <- processor
.validateAndApplyAuthorization(
@@ -408,6 +411,59 @@ abstract class TopologyManagerX[+StoreID <: TopologyStoreId](
"add-topology-transaction",
)
private def transactionIsNotDangerous(
transaction: SignedTopologyTransactionX[TopologyChangeOpX, TopologyMappingX],
force: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[Future, TopologyManagerError, Unit] = transaction.mapping match {
case DomainParametersStateX(domainId, newDomainParameters) =>
checkLedgerTimeRecordTimeToleranceNotIncreasing(domainId, newDomainParameters, force)
case _ => EitherT.rightT(())
}
private def checkLedgerTimeRecordTimeToleranceNotIncreasing(
domainId: DomainId,
newDomainParameters: DynamicDomainParameters,
force: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, TopologyManagerError, Unit] = {
// See i9028 for a detailed design.
EitherT(for {
headTransactions <- store.findPositiveTransactions(
asOf = CantonTimestamp.MaxValue,
asOfInclusive = false,
isProposal = false,
types = Seq(DomainParametersStateX.code),
filterUid = Some(Seq(domainId.uid)),
filterNamespace = None,
)
} yield {
headTransactions.toTopologyState
.collectFirst { case DomainParametersStateX(_, previousParameters) =>
previousParameters
} match {
case None => Right(())
case Some(domainParameters) =>
val changeIsDangerous =
newDomainParameters.ledgerTimeRecordTimeTolerance > domainParameters.ledgerTimeRecordTimeTolerance
if (changeIsDangerous && force) {
logger.info(
s"Forcing dangerous increase of ledger time record time tolerance from ${domainParameters.ledgerTimeRecordTimeTolerance} to ${newDomainParameters.ledgerTimeRecordTimeTolerance}"
)
}
Either.cond(
!changeIsDangerous || force,
(),
IncreaseOfLedgerTimeRecordTimeTolerance.TemporarilyInsecure(
domainParameters.ledgerTimeRecordTimeTolerance,
newDomainParameters.ledgerTimeRecordTimeTolerance,
),
)
}
})
}
/** notify observers about new transactions about to be stored */
protected def notifyObservers(
timestamp: CantonTimestamp,

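With this check wired into add-topology-transaction, any mapping that raises `ledgerTimeRecordTimeTolerance` is rejected unless `force` is set, and the console command added earlier in this commit takes the safe, blocking path by default. A hedged sketch of the two paths (handles assumed):

// Safe path: blocks until past domain parameters make the increase secure.
participant1.topology.domain_parameters.set_ledger_time_record_time_tolerance(
  mydomainId,
  config.NonNegativeFiniteDuration.ofSeconds(120),
)

// Forced path: applies immediately; intended only for bootstrapping and tests.
participant1.topology.domain_parameters.set_ledger_time_record_time_tolerance(
  mydomainId,
  config.NonNegativeFiniteDuration.ofSeconds(120),
  force = true,
)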
View File

@@ -29,7 +29,7 @@ object TerminateProcessing {
* is finished. Hence, this no-op terminate processing should be used only in domain nodes.
*/
private[processing] object NoOpTerminateTopologyProcessing extends TerminateProcessing {
private[canton] object NoOpTerminateTopologyProcessing extends TerminateProcessing {
override def terminate(
sc: SequencerCounter,
sequencedTime: SequencedTime,

View File

@@ -327,7 +327,12 @@ abstract class TopologyTransactionProcessorCommonImpl[M](
),
)
internalProcessEnvelopes(sc, sequencedTime, transactionsF)
case _: DeliverError => HandlerResult.done
case err: DeliverError =>
internalProcessEnvelopes(
err.counter,
SequencedTime(err.timestamp),
FutureUnlessShutdown.pure(Nil),
)
}
}
}

View File

@@ -141,6 +141,7 @@ class TopologyTransactionProcessorX(
)
)
// TODO(#15089): do not notify the terminate processing for replayed events
_ <- performUnlessClosingF("terminate-processing")(
terminateProcessing.terminate(sc, sequencingTimestamp, effectiveTimestamp)
)

View File

@@ -210,8 +210,8 @@ public final class SubmitCommandsRequest {
listOfCommands);
}
// TODO(i15642) Refactor this to take CommmandsSubmission when deprecated methods using it below are
// removed
// TODO(i15642) Refactor this to take CommmandsSubmission when deprecated methods using it below
// are removed
private static CommandsOuterClass.Commands deprecatedToProto(
@NonNull String ledgerId,
@NonNull String workflowId,

View File

@@ -11,17 +11,18 @@ import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/** Annotation for tagging whole test suites as unstable.
/**
* Annotation for tagging whole test suites as unstable.
*
* Unstable tests will only run as part of stability_test jobs and/or as part of unstable_test.
* Unstable tests are still periodically executed and failures are reported to DataDog.
* But pull requests can still be merged, even if unstable tests fail.
* <p>Unstable tests will only run as part of stability_test jobs and/or as part of unstable_test.
* Unstable tests are still periodically executed and failures are reported to DataDog. But pull
* requests can still be merged, even if unstable tests fail.
*
* The UnstableTest annotation and tag have currently no effect on Fabric/Ethereum/Ccf/Nightly tests.
* <p>The UnstableTest annotation and tag have currently no effect on Fabric/Ethereum/Ccf/Nightly
* tests.
*/
@org.scalatest.TagAnnotation
@Inherited
@Retention(RUNTIME)
@Target({METHOD, TYPE})
public @interface UnstableTest {
}
public @interface UnstableTest {}

View File

@@ -0,0 +1,526 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator
import cats.data.EitherT
import cats.instances.future.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.concurrent.ExecutionContextIdlenessExecutorService
import com.digitalasset.canton.config.*
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.domain.admin.v0.MediatorInitializationServiceGrpc
import com.digitalasset.canton.domain.mediator.admin.gprc.InitializeMediatorRequest
import com.digitalasset.canton.domain.mediator.service.GrpcMediatorInitializationService
import com.digitalasset.canton.domain.mediator.store.{
MediatorDomainConfiguration,
MediatorDomainConfigurationStore,
}
import com.digitalasset.canton.domain.mediator.topology.MediatorTopologyManager
import com.digitalasset.canton.domain.metrics.MediatorNodeMetrics
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.ComponentStatus
import com.digitalasset.canton.health.admin.data.MediatorNodeStatus
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.resource.Storage
import com.digitalasset.canton.sequencing.client.SequencerClientConfig
import com.digitalasset.canton.time.{Clock, HasUptime}
import com.digitalasset.canton.topology
import com.digitalasset.canton.topology.admin.grpc.GrpcTopologyManagerWriteService
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.processing.{
TopologyTransactionProcessor,
TopologyTransactionProcessorCommon,
}
import com.digitalasset.canton.topology.store.TopologyStoreId.DomainStore
import com.digitalasset.canton.topology.store.{
DomainTopologyStore,
StoredTopologyTransaction,
StoredTopologyTransactions,
TopologyStoreId,
}
import com.digitalasset.canton.topology.transaction.{
MediatorDomainState,
RequestSide,
TopologyChangeOp,
TopologyMapping,
}
import com.digitalasset.canton.topology.{DomainId, MediatorId, NodeId}
import com.digitalasset.canton.util.{EitherTUtil, SimpleExecutionQueue}
import com.digitalasset.canton.version.{ProtocolVersion, ProtocolVersionCompatibility}
import monocle.macros.syntax.lens.*
import org.apache.pekko.actor.ActorSystem
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.Future
/** Community mediator Node configuration
*
* @param init all nodes must provide an init config; however, the mediator cannot auto-initialize itself, so `autoInit` defaults to `false`
* @param timeTracker configuration for how time is tracked on the connected domain using the sequencer
*/
final case class CommunityMediatorNodeConfig(
override val adminApi: CommunityAdminServerConfig = CommunityAdminServerConfig(),
override val storage: CommunityStorageConfig = CommunityStorageConfig.Memory(),
override val crypto: CommunityCryptoConfig = CommunityCryptoConfig(),
override val init: InitConfig = InitConfig(identity = None),
override val timeTracker: DomainTimeTrackerConfig = DomainTimeTrackerConfig(),
override val sequencerClient: SequencerClientConfig = SequencerClientConfig(),
override val caching: CachingConfigs = CachingConfigs(),
parameters: MediatorNodeParameterConfig = MediatorNodeParameterConfig(),
override val monitoring: NodeMonitoringConfig = NodeMonitoringConfig(),
override val topologyX: TopologyXConfig = TopologyXConfig.NotUsed,
) extends MediatorNodeConfigCommon(
adminApi,
storage,
crypto,
init,
timeTracker,
sequencerClient,
caching,
monitoring,
)
with ConfigDefaults[DefaultPorts, CommunityMediatorNodeConfig] {
override val nodeTypeName: String = "mediator"
override def replicationEnabled: Boolean = false
override def withDefaults(ports: DefaultPorts): CommunityMediatorNodeConfig = {
this
.focus(_.adminApi.internalPort)
.modify(ports.mediatorAdminApiPort.setDefaultPort)
}
}
abstract class MediatorNodeConfigCommon(
val adminApi: AdminServerConfig,
val storage: StorageConfig,
val crypto: CryptoConfig,
val init: InitConfig,
val timeTracker: DomainTimeTrackerConfig,
val sequencerClient: SequencerClientConfig,
val caching: CachingConfigs,
val monitoring: NodeMonitoringConfig,
) extends LocalNodeConfig {
override def clientAdminApi: ClientConfig = adminApi.clientConfig
def toRemoteConfig: RemoteMediatorConfig = RemoteMediatorConfig(adminApi.clientConfig)
def replicationEnabled: Boolean
}
/** Various parameters for non-standard mediator settings
*
* @param dontWarnOnDeprecatedPV if true, then this mediator will not emit a warning when connecting to a sequencer using a deprecated protocol version.
*/
final case class MediatorNodeParameterConfig(
// TODO(i15561): Revert back to `false` once there is a stable Daml 3 protocol version
override val devVersionSupport: Boolean = true,
override val dontWarnOnDeprecatedPV: Boolean = false,
override val initialProtocolVersion: ProtocolVersion = ProtocolVersion.latest,
batching: BatchingConfig = BatchingConfig(),
) extends ProtocolConfig
with LocalNodeParametersConfig
final case class MediatorNodeParameters(
general: CantonNodeParameters.General,
protocol: CantonNodeParameters.Protocol,
) extends CantonNodeParameters
with HasGeneralCantonNodeParameters
with HasProtocolCantonNodeParameters
final case class RemoteMediatorConfig(
adminApi: ClientConfig
) extends NodeConfig {
override def clientAdminApi: ClientConfig = adminApi
}
class MediatorNodeBootstrap(
arguments: CantonNodeBootstrapCommonArguments[
MediatorNodeConfigCommon,
MediatorNodeParameters,
MediatorNodeMetrics,
],
override protected val replicaManager: MediatorReplicaManagerStub,
override protected val mediatorRuntimeFactory: MediatorRuntimeFactory,
)(
implicit executionContext: ExecutionContextIdlenessExecutorService,
override protected implicit val executionSequencerFactory: ExecutionSequencerFactory,
scheduler: ScheduledExecutorService,
actorSystem: ActorSystem,
) extends CantonNodeBootstrapBase[
MediatorNode,
MediatorNodeConfigCommon,
MediatorNodeParameters,
MediatorNodeMetrics,
](
arguments
)
with MediatorNodeBootstrapCommon[MediatorNode, MediatorNodeConfigCommon] {
protected val topologyManager =
new MediatorTopologyManager(
clock,
authorizedTopologyStore,
crypto.value,
timeouts,
parameters.initialProtocolVersion,
loggerFactory,
futureSupervisor,
)
startTopologyManagementWriteService(topologyManager)
protected val sequencedTopologyStore =
new DomainTopologyStore(storage, timeouts, loggerFactory, futureSupervisor)
override protected def sequencedTopologyStores
: Seq[com.digitalasset.canton.topology.store.TopologyStore[TopologyStoreId]] =
sequencedTopologyStore.get().toList
protected val domainConfigurationStore =
MediatorDomainConfigurationStore(storage, timeouts, loggerFactory)
protected override val supportsReplicaInitialization: Boolean = config.replicationEnabled
/** Simple async action queue to ensure no initialization functionality runs concurrently, avoiding any chance
* of races.
*/
private val initializationActionQueue = new SimpleExecutionQueue(
"mediator-init-queue",
futureSupervisor,
timeouts,
loggerFactory,
)
adminServerRegistry
.addServiceU(
topology.admin.v0.TopologyManagerWriteServiceGrpc
.bindService(
new GrpcTopologyManagerWriteService(
topologyManager,
crypto.value.cryptoPublicStore,
parameters.initialProtocolVersion,
loggerFactory,
),
executionContext,
)
)
adminServerRegistry
.addServiceU(
MediatorInitializationServiceGrpc
.bindService(
new GrpcMediatorInitializationService(handleInitializationRequest, loggerFactory),
executionContext,
),
true,
)
// TODO(#11052) duplicate init methods
override def initializeWithProvidedId(id: NodeId): EitherT[Future, String, Unit] = EitherT.leftT(
"This method is disabled for mediator nodes. Please use normal mediator initialization methods"
)
/** Generate an identity for the node. */
override protected def autoInitializeIdentity(
initConfigBase: InitConfigBase
): EitherT[FutureUnlessShutdown, String, Unit] = {
// Much like sequencer nodes, mediator nodes cannot initialize their own identity as it is
// inherently tied to the identity of the domain on whose behalf it operates.
// The domain node will call the mediator with its identity to initialize it as well as
// providing sufficient identity information and configuration to connect to a sequencer.
logger.warn(
"Mediator cannot auto initialize their identity. init.auto-init should be set to false"
)
EitherT.pure(())
}
/** Handle the initialization request (presumably from the domain node). */
private def handleInitializationRequest(
request: InitializeMediatorRequest
): EitherT[FutureUnlessShutdown, String, SigningPublicKey] = {
logger.debug("Handling initialization request")
// ensure initialization actions aren't run concurrently
initializationActionQueue.executeE(
{
def domainConfiguration(initialKeyFingerprint: Fingerprint): MediatorDomainConfiguration =
MediatorDomainConfiguration(
initialKeyFingerprint,
request.domainId,
request.domainParameters,
request.sequencerConnections,
)
def storeTopologySnapshot(): EitherT[Future, String, Unit] =
request.topologyState match {
case Some(snapshot) =>
val store = sequencedTopologyStore.initOrGet(DomainStore(request.domainId))
for {
cleanSnapshot <- EitherT.fromEither(
MediatorNodeBootstrap.checkIfTopologySnapshotIsValid(
request.mediatorId,
snapshot,
request.domainId.unwrap == request.mediatorId.uid,
)
)
_ = if (cleanSnapshot.result.length != snapshot.result.length)
logger.debug(
s"Storing initial topology state of length=${cleanSnapshot.result.length} (passed one had length=${snapshot.result.length} into domain store)"
)
_ <- EitherT.right(store.bootstrap(cleanSnapshot))
} yield ()
case None => EitherT.pure(())
}
def initializeMediator: EitherT[Future, String, SigningPublicKey] =
for {
_ <- EitherT.fromEither[Future](
ProtocolVersionCompatibility.isSupportedByDomainNode(
parameters,
request.domainParameters.protocolVersion,
)
)
_ <- storeTopologySnapshot()
key <- request.signingKeyFingerprint
.map(CantonNodeBootstrapCommon.getOrCreateSigningKeyByFingerprint(crypto.value))
.getOrElse(
CantonNodeBootstrapCommon.getOrCreateSigningKey(crypto.value)(s"$name-signing")
)
_ <- domainConfigurationStore
.saveConfiguration(domainConfiguration(key.fingerprint))
.leftMap(err => s"Failed to save domain configuration: $err")
// store the new mediator identity. It's only once this completes successfully that a mediator is considered
// initialized. If we crash/fail after storing the domain config but before storing the id the Mediator will
// need to be called by the domain initialization again.
nodeId = NodeId(request.mediatorId.uid)
_ = logger.debug(
s"Initializing mediator node with member id [${nodeId.identity}] "
)
_ <- storeId(nodeId)
_ = EitherTUtil.doNotAwait(
initialize(nodeId).onShutdown(
Left("Initialized successfully but aborting startup due to shutdown")
),
"Failed to initialize mediator",
)
} yield key
def replyWithExistingConfiguration(
existingConfig: MediatorDomainConfiguration
): EitherT[Future, String, SigningPublicKey] =
for {
// first check they're at least trying to initialize us for the same domain
_ <- EitherT.cond(
existingConfig.domainId == request.domainId,
(),
s"Attempt to initialize mediator for domain [${request.domainId}]. Mediator is already initialized for domain [${existingConfig.domainId}]",
)
// Returns the initial signing key. If the mediator node has rolled over its keys, the initial key may not be valid anymore.
// However a mediator node should not be initialized again after it has been running for a while.
existingKey <- crypto.value.cryptoPublicStore
.signingKey(existingConfig.initialKeyFingerprint)
.leftMap(_.toString)
.subflatMap(_.toRight(s"Failed to lookup initial signing key"))
} yield existingKey
for {
// if initialize is called on a passive instance any write operation will throw a passive instance exception,
// however do a check upfront to provide a more helpful error message if we're already passive at this point
_ <- EitherTUtil.condUnitET(storage.isActive, "Mediator replica is not active")
// check to see if we're already initialized
existingId <- EitherT.right(initializationStore.id)
// check to see if we've already initialized the mediator domain configuration
existingConfiguration <- domainConfigurationStore.fetchConfiguration
.leftMap(err => s"Failed to fetch domain configuration from database: $err")
// if we have an id, we should have had persisted the domain configuration. check this is true.
_ <- (existingId, existingConfiguration) match {
case (Some(_id), None) =>
logger.warn(
"Mediator is in a bad state. It has a stored identity but is missing the required domain configuration to successfully start."
)
EitherT.leftT(
"Mediator has an identity but is missing the required domain configuration"
)
case _ => EitherT.pure(())
}
// if we have already been initialized take our original mediator signing key, otherwise start the process of
// initializing the mediator. if there is an existing configuration but no stored id it means we haven't successfully
// initialized before and therefore will attempt to initialize with the new config.
mediatorKey <- existingId
.flatMap(_ =>
existingConfiguration
) // we've validated above that if existingId is set then existingConfiguration is also present
.fold(initializeMediator)(replyWithExistingConfiguration)
} yield mediatorKey
},
s"handle initialization request for ${request.domainId}",
)
}
/** Attempt to start the node with this identity. */
override protected def initialize(nodeId: NodeId): EitherT[FutureUnlessShutdown, String, Unit] = {
val mediatorId = MediatorId(nodeId.identity)
def topologyComponentFactory(domainId: DomainId, protocolVersion: ProtocolVersion): EitherT[
Future,
String,
(TopologyTransactionProcessorCommon, DomainTopologyClientWithInit),
] = {
val topologyStore = sequencedTopologyStore.initOrGet(DomainStore(domainId))
EitherT.right(
TopologyTransactionProcessor.createProcessorAndClientForDomain(
topologyStore,
mediatorId,
domainId,
protocolVersion,
crypto.value,
Map(),
parameters,
clock,
futureSupervisor,
loggerFactory,
)
)
}
startInstanceUnlessClosing(performUnlessClosingEitherU(functionFullName) {
initializeNodePrerequisites(
storage,
crypto.value,
mediatorId,
fetchConfig = () => domainConfigurationStore.fetchConfiguration.leftMap(_.toString),
saveConfig = domainConfigurationStore.saveConfiguration(_).leftMap(_.toString),
indexedStringStore,
topologyComponentFactory,
None,
None,
None,
).map(domainId =>
new MediatorNode(
config,
mediatorId,
domainId,
replicaManager,
storage,
clock,
loggerFactory,
nodeHealthService.dependencies.map(_.toComponentStatus),
)
)
})
}
override protected def onClosed(): Unit = {
Lifecycle.close(
initializationActionQueue,
sequencedTopologyStore,
domainConfigurationStore,
deferredSequencerClientHealth,
)(
logger
)
super.onClosed()
}
}
object MediatorNodeBootstrap {
val LoggerFactoryKeyName: String = "mediator"
/** validates if topology snapshot is suitable to initialise mediator */
def checkIfTopologySnapshotIsValid(
mediatorId: MediatorId,
snapshot: StoredTopologyTransactions[TopologyChangeOp.Positive],
emptyIsValid: Boolean,
): Either[String, StoredTopologyTransactions[TopologyChangeOp.Positive]] = {
def accumulateSide(x: TopologyMapping, sides: (Boolean, Boolean)): (Boolean, Boolean) =
x match {
case x: MediatorDomainState if x.mediator == mediatorId =>
RequestSide.accumulateSide(sides, x.side)
case _ => sides
}
// empty snapshot is valid
// TODO(i8054) rip out default mediator initialisation
if (emptyIsValid && snapshot.result.isEmpty) Right(snapshot)
else {
val (accumulated, (fromSeen, toSeen)) = snapshot.result.foldLeft(
(Seq.empty[StoredTopologyTransaction[TopologyChangeOp.Positive]], (false, false))
) {
// don't add any more transactions after initialisation
case (acc @ (_, (true, true)), _) => acc
case ((acc, sides), elem) =>
val (from, to) = accumulateSide(elem.transaction.transaction.element.mapping, sides)
// append element unless the mediator is initialised in this mapping (as we will receive this mapping
// from the dispatcher)
if (from && to)
(acc, (from, to))
else (acc :+ elem, (from, to))
}
Either.cond(
fromSeen && toSeen,
StoredTopologyTransactions(accumulated),
s"Mediator ${mediatorId} is not activated in the given snapshot",
)
}
}
}
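// Editorial usage sketch (not commit code): the check folds over the snapshot and
// succeeds once both request sides for this mediator have been seen, truncating any
// transactions that follow; with emptyIsValid = true an empty snapshot also passes.
object SnapshotValidationExample {
  def validate(
      mediatorId: MediatorId,
      snapshot: StoredTopologyTransactions[TopologyChangeOp.Positive],
  ): Either[String, StoredTopologyTransactions[TopologyChangeOp.Positive]] =
    MediatorNodeBootstrap.checkIfTopologySnapshotIsValid(mediatorId, snapshot, emptyIsValid = true)
}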
class MediatorNodeCommon(
config: MediatorNodeConfigCommon,
mediatorId: MediatorId,
domainId: DomainId,
replicaManager: MediatorReplicaManagerStub,
storage: Storage,
override protected val clock: Clock,
override protected val loggerFactory: NamedLoggerFactory,
healthData: => Seq[ComponentStatus],
) extends CantonNode
with NamedLogging
with HasUptime {
def isActive: Boolean = replicaManager.isActive
def status: Future[MediatorNodeStatus] = {
val ports = Map("admin" -> config.adminApi.port)
Future.successful(
MediatorNodeStatus(
mediatorId.uid,
domainId,
uptime(),
ports,
replicaManager.isActive,
replicaManager.getTopologyQueueStatus(),
healthData,
)
)
}
override def close(): Unit =
Lifecycle.close(
replicaManager,
storage,
)(logger)
}
class MediatorNode(
config: MediatorNodeConfigCommon,
mediatorId: MediatorId,
domainId: DomainId,
replicaManager: MediatorReplicaManagerStub,
storage: Storage,
clock: Clock,
loggerFactory: NamedLoggerFactory,
components: => Seq[ComponentStatus],
) extends MediatorNodeCommon(
config,
mediatorId,
domainId,
replicaManager,
storage,
clock,
loggerFactory,
components,
)

View File

@ -0,0 +1,306 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator
import cats.Monad
import cats.data.EitherT
import cats.instances.future.*
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.DomainAlias
import com.digitalasset.canton.common.domain.grpc.SequencerInfoLoader
import com.digitalasset.canton.config.*
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.domain.Domain
import com.digitalasset.canton.domain.mediator.store.MediatorDomainConfiguration
import com.digitalasset.canton.domain.metrics.MediatorNodeMetrics
import com.digitalasset.canton.domain.service.GrpcSequencerConnectionService
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.{HealthService, MutableHealthComponent}
import com.digitalasset.canton.resource.Storage
import com.digitalasset.canton.sequencing.SequencerConnections
import com.digitalasset.canton.sequencing.client.{
RequestSigner,
SequencerClient,
SequencerClientFactory,
}
import com.digitalasset.canton.store.db.SequencerClientDiscriminator
import com.digitalasset.canton.store.{
IndexedStringStore,
SendTrackerStore,
SequencedEventStore,
SequencerCounterTrackerStore,
}
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.processing.TopologyTransactionProcessorCommon
import com.digitalasset.canton.util.ResourceUtil
import com.digitalasset.canton.version.{ProtocolVersion, ProtocolVersionCompatibility}
import monocle.Lens
import scala.concurrent.Future
trait MediatorNodeBootstrapCommon[
T <: CantonNode,
NC <: LocalNodeConfig & MediatorNodeConfigCommon,
] {
this: CantonNodeBootstrapCommon[T, NC, MediatorNodeParameters, MediatorNodeMetrics] =>
type TopologyComponentFactory = (DomainId, ProtocolVersion) => EitherT[
Future,
String,
(TopologyTransactionProcessorCommon, DomainTopologyClientWithInit),
]
protected val replicaManager: MediatorReplicaManagerStub
protected def mediatorRuntimeFactory: MediatorRuntimeFactory
protected lazy val deferredSequencerClientHealth =
MutableHealthComponent(loggerFactory, SequencerClient.healthName, timeouts)
protected implicit def executionSequencerFactory: ExecutionSequencerFactory
override protected def mkNodeHealthService(storage: Storage): HealthService =
HealthService(
"mediator",
logger,
timeouts,
Seq(storage),
softDependencies = Seq(deferredSequencerClientHealth),
)
/** Attempt to start the node with this identity. */
protected def initializeNodePrerequisites(
storage: Storage,
crypto: Crypto,
mediatorId: MediatorId,
fetchConfig: () => EitherT[Future, String, Option[MediatorDomainConfiguration]],
saveConfig: MediatorDomainConfiguration => EitherT[Future, String, Unit],
indexedStringStore: IndexedStringStore,
topologyComponentFactory: TopologyComponentFactory,
topologyManagerStatusO: Option[TopologyManagerStatus],
maybeDomainTopologyStateInit: Option[DomainTopologyInitializationCallback],
maybeDomainOutboxFactory: Option[DomainOutboxXFactory],
): EitherT[Future, String, DomainId] =
for {
domainConfig <- fetchConfig()
.leftMap(err => s"Failed to fetch domain configuration: $err")
.flatMap { x =>
EitherT.fromEither(
x.toRight(
s"Mediator domain config has not been set. Must first be initialized by the domain in order to start."
)
)
}
sequencerInfoLoader = new SequencerInfoLoader(
timeouts = timeouts,
traceContextPropagation = parameters.tracing.propagation,
clientProtocolVersions = NonEmpty.mk(Seq, domainConfig.domainParameters.protocolVersion),
minimumProtocolVersion = Some(domainConfig.domainParameters.protocolVersion),
dontWarnOnDeprecatedPV = parameterConfig.dontWarnOnDeprecatedPV,
loggerFactory = loggerFactory.append("domainId", domainConfig.domainId.toString),
)
_ <- EitherT.right[String](
replicaManager.setup(
adminServerRegistry,
() =>
mkMediatorRuntime(
mediatorId,
domainConfig,
indexedStringStore,
fetchConfig,
saveConfig,
storage,
crypto,
topologyComponentFactory,
topologyManagerStatusO,
maybeDomainTopologyStateInit,
maybeDomainOutboxFactory,
sequencerInfoLoader,
),
storage.isActive,
)
)
} yield domainConfig.domainId
private def mkMediatorRuntime(
mediatorId: MediatorId,
domainConfig: MediatorDomainConfiguration,
indexedStringStore: IndexedStringStore,
fetchConfig: () => EitherT[Future, String, Option[MediatorDomainConfiguration]],
saveConfig: MediatorDomainConfiguration => EitherT[Future, String, Unit],
storage: Storage,
crypto: Crypto,
topologyComponentFactory: TopologyComponentFactory,
topologyManagerStatusO: Option[TopologyManagerStatus],
maybeDomainTopologyStateInit: Option[DomainTopologyInitializationCallback],
maybeDomainOutboxFactory: Option[DomainOutboxXFactory],
sequencerInfoLoader: SequencerInfoLoader,
): EitherT[Future, String, MediatorRuntime] = {
val domainId = domainConfig.domainId
val domainLoggerFactory = loggerFactory.append("domainId", domainId.toString)
val domainAlias = DomainAlias(domainConfig.domainId.uid.toLengthLimitedString)
def getSequencerConnectionFromStore = fetchConfig()
.map(_.map(_.sequencerConnections))
for {
_ <- CryptoHandshakeValidator
.validate(domainConfig.domainParameters, config.crypto)
.toEitherT
mediatorClient <- EitherT.right(
SequencerClientDiscriminator.fromDomainMember(mediatorId, indexedStringStore)
)
sequencedEventStore = SequencedEventStore(
storage,
mediatorClient,
domainConfig.domainParameters.protocolVersion,
timeouts,
domainLoggerFactory,
)
sendTrackerStore = SendTrackerStore(storage)
sequencerCounterTrackerStore = SequencerCounterTrackerStore(
storage,
mediatorClient,
timeouts,
domainLoggerFactory,
)
topologyProcessorAndClient <- topologyComponentFactory(
domainId,
domainConfig.domainParameters.protocolVersion,
)
(topologyProcessor, topologyClient) = topologyProcessorAndClient
_ = ips.add(topologyClient)
syncCryptoApi = new DomainSyncCryptoClient(
mediatorId,
domainId,
topologyClient,
crypto,
parameters.cachingConfigs,
timeouts,
futureSupervisor,
domainLoggerFactory,
)
sequencerClientFactory = SequencerClientFactory(
domainId,
syncCryptoApi,
crypto,
None,
parameters.sequencerClient,
parameters.tracing.propagation,
arguments.testingConfig,
domainConfig.domainParameters,
timeouts,
clock,
topologyClient,
futureSupervisor,
member =>
Domain.recordSequencerInteractions
.get()
.lift(member)
.map(Domain.setMemberRecordingPath(member)),
member =>
Domain.replaySequencerConfig.get().lift(member).map(Domain.defaultReplayPath(member)),
arguments.metrics.mediator.sequencerClient,
parameters.loggingConfig,
domainLoggerFactory,
ProtocolVersionCompatibility.trySupportedProtocolsDomain(parameters),
None,
)
sequencerClientRef =
GrpcSequencerConnectionService.setup[MediatorDomainConfiguration](mediatorId)(
adminServerRegistry,
fetchConfig,
saveConfig,
Lens[MediatorDomainConfiguration, SequencerConnections](_.sequencerConnections)(
connection => conf => conf.copy(sequencerConnections = connection)
),
RequestSigner(syncCryptoApi, domainConfig.domainParameters.protocolVersion),
sequencerClientFactory,
sequencerInfoLoader,
domainAlias,
)
// we wait here until the sequencer becomes active; this allows the sequencer
// client address to be reconfigured
connections <- GrpcSequencerConnectionService.waitUntilSequencerConnectionIsValid(
sequencerClientFactory,
this,
futureSupervisor,
getSequencerConnectionFromStore,
)
info <- sequencerInfoLoader
.loadSequencerEndpoints(
domainAlias,
connections,
)
.leftMap(_.cause)
requestSigner = RequestSigner(syncCryptoApi, domainConfig.domainParameters.protocolVersion)
_ <- maybeDomainTopologyStateInit match {
case Some(domainTopologyStateInit) =>
val headSnapshot = topologyClient.headSnapshot
for {
// TODO(i12076): Request topology information from all sequencers and reconcile
isMediatorActive <- EitherT.right[String](headSnapshot.isMediatorActive(mediatorId))
_ <- Monad[EitherT[Future, String, *]].whenA(!isMediatorActive)(
sequencerClientFactory
.makeTransport(
info.sequencerConnections.default,
mediatorId,
requestSigner,
)
.flatMap(
ResourceUtil.withResourceEitherT(_)(
domainTopologyStateInit
.callback(topologyClient, _, domainConfig.domainParameters.protocolVersion)
)
)
)
} yield {}
case None => EitherT.pure[Future, String](())
}
sequencerClient <- sequencerClientFactory.create(
mediatorId,
sequencedEventStore,
sendTrackerStore,
requestSigner,
info.sequencerConnections,
info.expectedSequencers,
)
_ = sequencerClientRef.set(sequencerClient)
_ = deferredSequencerClientHealth.set(sequencerClient.healthComponent)
// can just new up the enterprise mediator factory here as the mediator node is only available in enterprise setups
mediatorRuntime <- mediatorRuntimeFactory.create(
mediatorId,
domainId,
storage,
sequencerCounterTrackerStore,
sequencedEventStore,
sequencerClient,
syncCryptoApi,
topologyClient,
topologyProcessor,
topologyManagerStatusO,
maybeDomainOutboxFactory,
config.timeTracker,
parameters,
domainConfig.domainParameters.protocolVersion,
clock,
arguments.metrics.mediator,
futureSupervisor,
domainLoggerFactory,
)
_ <- mediatorRuntime.start()
} yield mediatorRuntime
}
}
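// Editorial sketch (not commit code): the Lens passed to GrpcSequencerConnectionService.setup
// above focuses the sequencer connections inside the stored MediatorDomainConfiguration;
// written out on its own it is just a copy-based Monocle lens.
object SequencerConnectionsLensExample {
  val connectionsLens: Lens[MediatorDomainConfiguration, SequencerConnections] =
    Lens[MediatorDomainConfiguration, SequencerConnections](_.sequencerConnections)(
      connections => config => config.copy(sequencerConnections = connections)
    )
}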

View File

@ -0,0 +1,386 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator
import cats.data.EitherT
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.canton.concurrent.ExecutionContextIdlenessExecutorService
import com.digitalasset.canton.config.*
import com.digitalasset.canton.crypto.{Crypto, Fingerprint}
import com.digitalasset.canton.domain.admin.v2.MediatorInitializationServiceGrpc
import com.digitalasset.canton.domain.mediator.admin.gprc.{
InitializeMediatorRequestX,
InitializeMediatorResponseX,
}
import com.digitalasset.canton.domain.mediator.service.GrpcMediatorInitializationServiceX
import com.digitalasset.canton.domain.mediator.store.{
MediatorDomainConfiguration,
MediatorDomainConfigurationStore,
}
import com.digitalasset.canton.domain.metrics.MediatorNodeMetrics
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.{ComponentStatus, GrpcHealthReporter, HealthService}
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, HasCloseContext}
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.resource.Storage
import com.digitalasset.canton.sequencing.client.SequencerClientConfig
import com.digitalasset.canton.store.IndexedStringStore
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.processing.{
TopologyTransactionProcessorCommon,
TopologyTransactionProcessorX,
}
import com.digitalasset.canton.topology.store.TopologyStoreId.DomainStore
import com.digitalasset.canton.topology.store.TopologyStoreX
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.SingleUseCell
import com.digitalasset.canton.version.ProtocolVersion
import monocle.macros.syntax.lens.*
import org.apache.pekko.actor.ActorSystem
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.Future
/** Community Mediator Node X configuration that defaults to auto-init
*/
final case class CommunityMediatorNodeXConfig(
override val adminApi: CommunityAdminServerConfig = CommunityAdminServerConfig(),
override val storage: CommunityStorageConfig = CommunityStorageConfig.Memory(),
override val crypto: CommunityCryptoConfig = CommunityCryptoConfig(),
override val init: InitConfig = InitConfig(identity = Some(InitConfigBase.Identity())),
override val timeTracker: DomainTimeTrackerConfig = DomainTimeTrackerConfig(),
override val sequencerClient: SequencerClientConfig = SequencerClientConfig(),
override val caching: CachingConfigs = CachingConfigs(),
parameters: MediatorNodeParameterConfig = MediatorNodeParameterConfig(),
override val monitoring: NodeMonitoringConfig = NodeMonitoringConfig(),
override val topologyX: TopologyXConfig = TopologyXConfig(),
) extends MediatorNodeConfigCommon(
adminApi,
storage,
crypto,
init,
timeTracker,
sequencerClient,
caching,
monitoring,
)
with ConfigDefaults[DefaultPorts, CommunityMediatorNodeXConfig] {
override val nodeTypeName: String = "mediatorx"
override def replicationEnabled: Boolean = false
override def withDefaults(ports: DefaultPorts): CommunityMediatorNodeXConfig = {
this
.focus(_.adminApi.internalPort)
.modify(ports.mediatorXAdminApiPort.setDefaultPort)
}
/** the following case class match will help us detect any additional configuration options added
* for "classic" non-X nodes that may apply to X-nodes as well.
*/
private def _completenessCheck(
classicConfig: CommunityMediatorNodeConfig
): CommunityMediatorNodeXConfig =
classicConfig match {
case CommunityMediatorNodeConfig(
adminApi,
storage,
crypto,
init,
timeTracker,
sequencerClient,
caching,
parameters,
monitoring,
topologyX,
) =>
CommunityMediatorNodeXConfig(
adminApi,
storage,
crypto,
init,
timeTracker,
sequencerClient,
caching,
parameters,
monitoring,
)
}
}
class MediatorNodeBootstrapX(
arguments: CantonNodeBootstrapCommonArguments[
MediatorNodeConfigCommon,
MediatorNodeParameters,
MediatorNodeMetrics,
],
protected val replicaManager: MediatorReplicaManagerStub,
override protected val mediatorRuntimeFactory: MediatorRuntimeFactory,
)(
implicit executionContext: ExecutionContextIdlenessExecutorService,
override protected implicit val executionSequencerFactory: ExecutionSequencerFactory,
scheduler: ScheduledExecutorService,
actorSystem: ActorSystem,
) extends CantonNodeBootstrapX[
MediatorNodeX,
MediatorNodeConfigCommon,
MediatorNodeParameters,
MediatorNodeMetrics,
](arguments)
with MediatorNodeBootstrapCommon[MediatorNodeX, MediatorNodeConfigCommon] {
override protected def member(uid: UniqueIdentifier): Member = MediatorId(uid)
private val domainTopologyManager = new SingleUseCell[DomainTopologyManagerX]()
override protected def sequencedTopologyStores: Seq[TopologyStoreX[DomainStore]] =
domainTopologyManager.get.map(_.store).toList
override protected def sequencedTopologyManagers: Seq[DomainTopologyManagerX] =
domainTopologyManager.get.toList
private class WaitForMediatorToDomainInit(
storage: Storage,
crypto: Crypto,
mediatorId: MediatorId,
authorizedTopologyManager: AuthorizedTopologyManagerX,
healthService: HealthService,
) extends BootstrapStageWithStorage[MediatorNodeX, StartupNode, DomainId](
"wait-for-mediator-to-domain-init",
bootstrapStageCallback,
storage,
config.init.autoInit,
)
with GrpcMediatorInitializationServiceX.Callback {
adminServerRegistry
.addServiceU(
MediatorInitializationServiceGrpc
.bindService(
new GrpcMediatorInitializationServiceX(this, loggerFactory),
executionContext,
),
true,
)
protected val domainConfigurationStore =
MediatorDomainConfigurationStore(storage, timeouts, loggerFactory)
addCloseable(domainConfigurationStore)
addCloseable(deferredSequencerClientHealth)
override protected def stageCompleted(implicit
traceContext: TraceContext
): Future[Option[DomainId]] = domainConfigurationStore.fetchConfiguration.toOption.mapFilter {
case Some(res) => Some(res.domainId)
case None => None
}.value
override protected def buildNextStage(domainId: DomainId): StartupNode = {
val domainTopologyStore =
TopologyStoreX(DomainStore(domainId), storage, timeouts, loggerFactory)
addCloseable(domainTopologyStore)
val outboxQueue = new DomainOutboxQueue(loggerFactory)
val topologyManager = new DomainTopologyManagerX(
clock = clock,
crypto = crypto,
store = domainTopologyStore,
outboxQueue = outboxQueue,
enableTopologyTransactionValidation = config.topologyX.enableTopologyTransactionValidation,
timeouts = timeouts,
futureSupervisor = futureSupervisor,
loggerFactory = loggerFactory,
)
if (domainTopologyManager.putIfAbsent(topologyManager).nonEmpty) {
// TODO(#14048) how to handle this error properly?
throw new IllegalStateException("domainTopologyManager shouldn't have been set before")
}
new StartupNode(
storage,
crypto,
mediatorId,
authorizedTopologyManager,
topologyManager,
domainId,
domainConfigurationStore,
domainTopologyStore,
healthService,
)
}
override protected def autoCompleteStage()
: EitherT[FutureUnlessShutdown, String, Option[DomainId]] =
EitherT.rightT(None) // this stage doesn't have auto-init
override def initialize(request: InitializeMediatorRequestX)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, InitializeMediatorResponseX] =
completeWithExternal {
logger.info(
s"Assigning mediator to ${request.domainId} via sequencers ${request.sequencerConnections}"
)
domainConfigurationStore
.saveConfiguration(
MediatorDomainConfiguration(
Fingerprint.tryCreate("unused"), // x-nodes do not need to return the initial key
request.domainId,
request.domainParameters,
request.sequencerConnections,
)
)
.leftMap(_.toString)
.map(_ => request.domainId)
}.map(_ => InitializeMediatorResponseX())
}
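// Editorial sketch (not commit code): the configuration persisted by `initialize`
// above. X nodes store a placeholder fingerprint because they no longer return the
// initial key; the remaining fields are taken verbatim from the request.
private def exampleConfiguration(
    request: InitializeMediatorRequestX
): MediatorDomainConfiguration =
  MediatorDomainConfiguration(
    Fingerprint.tryCreate("unused"),
    request.domainId,
    request.domainParameters,
    request.sequencerConnections,
  )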
private class StartupNode(
storage: Storage,
crypto: Crypto,
mediatorId: MediatorId,
authorizedTopologyManager: AuthorizedTopologyManagerX,
domainTopologyManager: DomainTopologyManagerX,
domainId: DomainId,
domainConfigurationStore: MediatorDomainConfigurationStore,
domainTopologyStore: TopologyStoreX[DomainStore],
healthService: HealthService,
) extends BootstrapStage[MediatorNodeX, RunningNode[MediatorNodeX]](
description = "Startup mediator node",
bootstrapStageCallback,
)
with HasCloseContext {
private val domainLoggerFactory = loggerFactory.append("domainId", domainId.toString)
override def attempt()(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, Option[RunningNode[MediatorNodeX]]] = {
def topologyComponentFactory(domainId: DomainId, protocolVersion: ProtocolVersion): EitherT[
Future,
String,
(TopologyTransactionProcessorCommon, DomainTopologyClientWithInit),
] =
EitherT.right(
TopologyTransactionProcessorX.createProcessorAndClientForDomain(
domainTopologyStore,
domainId,
protocolVersion,
crypto,
arguments.parameterConfig,
config.topologyX.enableTopologyTransactionValidation,
arguments.clock,
arguments.futureSupervisor,
domainLoggerFactory,
)
)
val domainOutboxFactory = new DomainOutboxXFactory(
domainId = domainId,
memberId = mediatorId,
authorizedTopologyManager = authorizedTopologyManager,
domainTopologyManager = domainTopologyManager,
crypto = crypto,
topologyXConfig = config.topologyX,
timeouts = timeouts,
loggerFactory = domainLoggerFactory,
)
performUnlessClosingEitherU("starting up mediator node") {
val indexedStringStore = IndexedStringStore.create(
storage,
parameterConfig.cachingConfigs.indexedStrings,
timeouts,
domainLoggerFactory,
)
addCloseable(indexedStringStore)
for {
domainId <- initializeNodePrerequisites(
storage,
crypto,
mediatorId,
() => domainConfigurationStore.fetchConfiguration.leftMap(_.toString),
domainConfigurationStore.saveConfiguration(_).leftMap(_.toString),
indexedStringStore,
topologyComponentFactory,
Some(TopologyManagerStatus.combined(authorizedTopologyManager, domainTopologyManager)),
maybeDomainTopologyStateInit = Some(
new StoreBasedDomainTopologyInitializationCallback(mediatorId, domainTopologyStore)
),
maybeDomainOutboxFactory = Some(domainOutboxFactory),
)
} yield {
val node = new MediatorNodeX(
arguments.config,
mediatorId,
domainId,
replicaManager,
storage,
clock,
domainLoggerFactory,
components = healthService.dependencies.map(_.toComponentStatus),
)
addCloseable(node)
Some(new RunningNode(bootstrapStageCallback, node))
}
}
}
}
override protected def customNodeStages(
storage: Storage,
crypto: Crypto,
nodeId: UniqueIdentifier,
authorizedTopologyManager: AuthorizedTopologyManagerX,
healthServer: GrpcHealthReporter,
healthService: HealthService,
): BootstrapStageOrLeaf[MediatorNodeX] = {
new WaitForMediatorToDomainInit(
storage,
crypto,
MediatorId(nodeId),
authorizedTopologyManager,
healthService,
)
}
override protected def onClosed(): Unit = {
super.onClosed()
}
}
object MediatorNodeBootstrapX {
val LoggerFactoryKeyName: String = "mediatorx"
}
class MediatorNodeX(
config: MediatorNodeConfigCommon,
mediatorId: MediatorId,
domainId: DomainId,
protected[canton] val replicaManager: MediatorReplicaManagerStub,
storage: Storage,
clock: Clock,
loggerFactory: NamedLoggerFactory,
components: => Seq[ComponentStatus],
) extends MediatorNodeCommon(
config,
mediatorId,
domainId,
replicaManager,
storage,
clock,
loggerFactory,
components,
) {
override def close(): Unit = {
super.close()
}
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator
import cats.data.EitherT
import com.digitalasset.canton.health.admin.data.TopologyQueueStatus
import com.digitalasset.canton.networking.grpc.CantonMutableHandlerRegistry
import com.digitalasset.canton.tracing.TraceContext
import scala.concurrent.Future
trait MediatorReplicaManagerStub extends AutoCloseable {
def setup(
adminServiceRegistry: CantonMutableHandlerRegistry,
factory: () => EitherT[Future, String, MediatorRuntime],
isActive: Boolean,
)(implicit traceContext: TraceContext): Future[Unit]
def isActive: Boolean
def getTopologyQueueStatus(): TopologyQueueStatus
protected[canton] def mediatorRuntime: Option[MediatorRuntime]
}

View File

@ -0,0 +1,67 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.service
import cats.data.EitherT
import cats.syntax.either.*
import com.digitalasset.canton.crypto.SigningPublicKey
import com.digitalasset.canton.domain.admin.v0
import com.digitalasset.canton.domain.mediator.admin.gprc.{
InitializeMediatorRequest,
InitializeMediatorResponse,
}
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.grpc.CantonGrpcUtil.*
import com.digitalasset.canton.tracing.TraceContextGrpc
import com.digitalasset.canton.util.EitherTUtil
import io.grpc.Status
import scala.concurrent.{ExecutionContext, Future}
/** Hosts the initialization service for the mediator.
* Upon receiving an initialize request, it will call the provided `initialize` function.
* Will ensure that `initialize` is not called concurrently.
*/
class GrpcMediatorInitializationService(
initialization: InitializeMediatorRequest => EitherT[
FutureUnlessShutdown,
String,
SigningPublicKey,
],
protected val loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext)
extends v0.MediatorInitializationServiceGrpc.MediatorInitializationService
with NamedLogging {
/** Initialize a Mediator service.
* If the Mediator is uninitialized, it should initialize itself with the provided configuration.
* If the Mediator is already initialized, verify that the request is for the domain we're running against;
* if so, return the current key, otherwise fail.
*/
override def initialize(
requestP: v0.InitializeMediatorRequest
): Future[v0.InitializeMediatorResponse] = {
TraceContextGrpc.withGrpcTraceContext { implicit traceContext =>
for {
request <- EitherTUtil.toFuture(
InitializeMediatorRequest
.fromProtoV0(requestP)
.leftMap(err => s"Failed to deserialize request: $err")
.leftMap(Status.INVALID_ARGUMENT.withDescription)
.leftMap(_.asRuntimeException)
.toEitherT[Future]
)
response <-
initialization(request)
.fold[InitializeMediatorResponse](
InitializeMediatorResponse.Failure,
InitializeMediatorResponse.Success,
)
.map(_.toProtoV0)
.asGrpcResponse
} yield response
}
}
}
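// Editorial wiring sketch (not commit code): the service is constructed with a
// callback performing the actual initialisation. `doInitialize` and `nodeLoggerFactory`
// are hypothetical names for values provided by the hosting node.
object GrpcMediatorInitializationServiceExample {
  def create(
      doInitialize: InitializeMediatorRequest => EitherT[FutureUnlessShutdown, String, SigningPublicKey],
      nodeLoggerFactory: NamedLoggerFactory,
  )(implicit ec: ExecutionContext): GrpcMediatorInitializationService =
    new GrpcMediatorInitializationService(doInitialize, nodeLoggerFactory)
}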

View File

@ -0,0 +1,68 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.service
import cats.data.EitherT
import cats.syntax.either.*
import com.digitalasset.canton.ProtoDeserializationError.ProtoDeserializationFailure
import com.digitalasset.canton.domain.Domain.FailedToInitialiseDomainNode
import com.digitalasset.canton.domain.admin.v2
import com.digitalasset.canton.domain.admin.v2.{
InitializeMediatorRequest,
InitializeMediatorResponse,
}
import com.digitalasset.canton.domain.mediator.admin.gprc.{
InitializeMediatorRequestX,
InitializeMediatorResponseX,
}
import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.grpc.CantonGrpcUtil.*
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import scala.concurrent.{ExecutionContext, Future}
/** Hosts the initialization service for the mediator.
* Upon receiving an initialize request, it will call the provided `initialize` function.
*/
class GrpcMediatorInitializationServiceX(
handler: GrpcMediatorInitializationServiceX.Callback,
val loggerFactory: NamedLoggerFactory,
)(implicit
executionContext: ExecutionContext
) extends v2.MediatorInitializationServiceGrpc.MediatorInitializationService
with NamedLogging {
override def initialize(
requestP: InitializeMediatorRequest
): Future[InitializeMediatorResponse] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
val res: EitherT[Future, CantonError, InitializeMediatorResponse] = for {
request <- EitherT.fromEither[Future](
InitializeMediatorRequestX
.fromProtoV2(requestP)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
result <- handler
.initialize(request)
.leftMap(FailedToInitialiseDomainNode.Failure(_))
.onShutdown(Left(FailedToInitialiseDomainNode.Shutdown())): EitherT[
Future,
CantonError,
InitializeMediatorResponseX,
]
} yield result.toProtoV2
mapErrNew(res)
}
}
object GrpcMediatorInitializationServiceX {
trait Callback {
def initialize(request: InitializeMediatorRequestX)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, InitializeMediatorResponseX]
}
}
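// Editorial sketch (not commit code): a trivial Callback, e.g. for tests; the real
// implementation is WaitForMediatorToDomainInit in MediatorNodeBootstrapX. Assumes
// Canton's Applicative instance for FutureUnlessShutdown is in implicit scope.
object NoopMediatorInitializationCallback extends GrpcMediatorInitializationServiceX.Callback {
  override def initialize(request: InitializeMediatorRequestX)(implicit
      traceContext: TraceContext
  ): EitherT[FutureUnlessShutdown, String, InitializeMediatorResponseX] =
    EitherT.rightT[FutureUnlessShutdown, String](InitializeMediatorResponseX())
}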

View File

@ -0,0 +1,137 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.store
import cats.data.EitherT
import cats.syntax.traverse.*
import com.digitalasset.canton.config.CantonRequireTypes.{String1, String255, String68}
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.crypto.Fingerprint
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.resource.{DbStorage, DbStore}
import com.digitalasset.canton.sequencing.SequencerConnections
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.tracing.TraceContext
import com.google.protobuf.ByteString
import scala.concurrent.{ExecutionContext, Future}
class DbMediatorDomainConfigurationStore(
override protected val storage: DbStorage,
override protected val timeouts: ProcessingTimeout,
override protected val loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext)
extends MediatorDomainConfigurationStore
with DbStore {
private type SerializedRow = (String68, String255, ByteString, ByteString)
import DbStorage.Implicits.*
import storage.api.*
// sentinel value used to ensure the table can only have a single row
// see create table sql for more details
protected val singleRowLockValue: String1 = String1.fromChar('X')
override def fetchConfiguration(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Option[MediatorDomainConfiguration]] =
for {
rowO <- EitherT.right(
storage
.query(
sql"""select initial_key_context, domain_id, static_domain_parameters, sequencer_connection
from mediator_domain_configuration #${storage.limit(1)}""".as[SerializedRow].headOption,
"fetch-configuration",
)
)
config <- EitherT
.fromEither[Future](rowO.traverse(deserialize))
.leftMap[MediatorDomainConfigurationStoreError](
MediatorDomainConfigurationStoreError.DeserializationError
)
} yield config
override def saveConfiguration(configuration: MediatorDomainConfiguration)(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Unit] = {
val (initialKeyContext, domainId, domainParameters, sequencerConnection) = serialize(
configuration
)
EitherT.right(
storage
.update_(
storage.profile match {
case _: DbStorage.Profile.H2 =>
sqlu"""merge into mediator_domain_configuration
(lock, initial_key_context, domain_id, static_domain_parameters, sequencer_connection)
values
($singleRowLockValue, $initialKeyContext, $domainId, $domainParameters, $sequencerConnection)"""
case _: DbStorage.Profile.Postgres =>
sqlu"""insert into mediator_domain_configuration (initial_key_context, domain_id, static_domain_parameters, sequencer_connection)
values ($initialKeyContext, $domainId, $domainParameters, $sequencerConnection)
on conflict (lock) do update set initial_key_context = excluded.initial_key_context,
domain_id = excluded.domain_id,
static_domain_parameters = excluded.static_domain_parameters,
sequencer_connection = excluded.sequencer_connection"""
case _: DbStorage.Profile.Oracle =>
sqlu"""merge into mediator_domain_configuration mdc
using (
select
$initialKeyContext initial_key_context,
$domainId domain_id,
$domainParameters static_domain_parameters,
$sequencerConnection sequencer_connection
from dual
) excluded
on (mdc."LOCK" = 'X')
when matched then
update set mdc.initial_key_context = excluded.initial_key_context,
mdc.domain_id = excluded.domain_id,
mdc.static_domain_parameters = excluded.static_domain_parameters,
mdc.sequencer_connection = excluded.sequencer_connection
when not matched then
insert (initial_key_context, domain_id, static_domain_parameters, sequencer_connection)
values (excluded.initial_key_context, excluded.domain_id, excluded.static_domain_parameters, excluded.sequencer_connection)
"""
},
"save-configuration",
)
)
}
private def serialize(config: MediatorDomainConfiguration): SerializedRow = {
val MediatorDomainConfiguration(
initialKeyFingerprint,
domainId,
domainParameters,
sequencerConnections,
) = config
(
initialKeyFingerprint.toLengthLimitedString,
domainId.toLengthLimitedString,
domainParameters.toByteString,
sequencerConnections.toByteString(domainParameters.protocolVersion),
)
}
private def deserialize(
row: SerializedRow
): ParsingResult[MediatorDomainConfiguration] = {
for {
initialKeyFingerprint <- Fingerprint.fromProtoPrimitive(row._1.unwrap)
domainId <- DomainId.fromProtoPrimitive(row._2.unwrap, "domainId")
domainParameters <- StaticDomainParameters.fromByteString(row._3)
sequencerConnections <- SequencerConnections.fromByteString(row._4)
} yield MediatorDomainConfiguration(
initialKeyFingerprint,
domainId,
domainParameters,
sequencerConnections,
)
}
}

View File

@ -0,0 +1,29 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.store
import cats.data.EitherT
import com.digitalasset.canton.tracing.TraceContext
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
class InMemoryMediatorDomainConfigurationStore(implicit executionContext: ExecutionContext)
extends MediatorDomainConfigurationStore {
private val currentConfiguration = new AtomicReference[Option[MediatorDomainConfiguration]](None)
override def fetchConfiguration(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Option[MediatorDomainConfiguration]] =
EitherT.pure(currentConfiguration.get())
override def saveConfiguration(configuration: MediatorDomainConfiguration)(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Unit] = {
currentConfiguration.set(Some(configuration))
EitherT.pure(())
}
override def close(): Unit = ()
}
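// Editorial usage sketch (not commit code): save a configuration and read it back from
// the in-memory store. Requires an implicit ExecutionContext and TraceContext.
object InMemoryConfigurationRoundTripExample {
  def roundTrip(config: MediatorDomainConfiguration)(implicit
      ec: ExecutionContext,
      traceContext: TraceContext,
  ): EitherT[Future, MediatorDomainConfigurationStoreError, Option[MediatorDomainConfiguration]] = {
    val store = new InMemoryMediatorDomainConfigurationStore()
    store.saveConfiguration(config).flatMap(_ => store.fetchConfiguration)
  }
}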

View File

@ -0,0 +1,53 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.store
import cats.data.EitherT
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.crypto.Fingerprint
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.resource.{DbStorage, MemoryStorage, Storage}
import com.digitalasset.canton.sequencing.SequencerConnections
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.tracing.TraceContext
import scala.concurrent.{ExecutionContext, Future}
final case class MediatorDomainConfiguration(
@Deprecated(since = "3.0.0, x-nodes do not need to return the initial key")
initialKeyFingerprint: Fingerprint,
domainId: DomainId,
domainParameters: StaticDomainParameters,
sequencerConnections: SequencerConnections,
)
sealed trait MediatorDomainConfigurationStoreError
object MediatorDomainConfigurationStoreError {
final case class DbError(exception: Throwable) extends MediatorDomainConfigurationStoreError
final case class DeserializationError(deserializationError: ProtoDeserializationError)
extends MediatorDomainConfigurationStoreError
}
trait MediatorDomainConfigurationStore extends AutoCloseable {
def fetchConfiguration(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Option[MediatorDomainConfiguration]]
def saveConfiguration(configuration: MediatorDomainConfiguration)(implicit
traceContext: TraceContext
): EitherT[Future, MediatorDomainConfigurationStoreError, Unit]
}
object MediatorDomainConfigurationStore {
def apply(storage: Storage, timeouts: ProcessingTimeout, loggerFactory: NamedLoggerFactory)(
implicit executionContext: ExecutionContext
): MediatorDomainConfigurationStore =
storage match {
case _: MemoryStorage => new InMemoryMediatorDomainConfigurationStore
case storage: DbStorage =>
new DbMediatorDomainConfigurationStore(storage, timeouts, loggerFactory)
}
}
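// Editorial sketch (not commit code): callers in this commit flatten the store's error
// ADT with `.leftMap(_.toString)`; rendering the two cases explicitly looks like this.
object MediatorDomainConfigurationStoreErrorExample {
  def render(error: MediatorDomainConfigurationStoreError): String = error match {
    case MediatorDomainConfigurationStoreError.DbError(exception) =>
      s"database failure: ${exception.getMessage}"
    case MediatorDomainConfigurationStoreError.DeserializationError(deserializationError) =>
      s"corrupt configuration row: $deserializationError"
  }
}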

View File

@ -0,0 +1,55 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.topology
import cats.data.EitherT
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.crypto.Crypto
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.store.{TopologyStore, TopologyStoreId}
import com.digitalasset.canton.topology.transaction.{SignedTopologyTransaction, TopologyChangeOp}
import com.digitalasset.canton.topology.{TopologyManager, TopologyManagerError}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import scala.concurrent.{ExecutionContext, Future}
class MediatorTopologyManager(
clock: Clock,
override val store: TopologyStore[TopologyStoreId.AuthorizedStore],
crypto: Crypto,
override protected val timeouts: ProcessingTimeout,
protocolVersion: ProtocolVersion,
loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
)(implicit ec: ExecutionContext)
extends TopologyManager[TopologyManagerError](
clock,
crypto,
store,
timeouts,
protocolVersion,
loggerFactory,
futureSupervisor,
)(
ec
) {
override protected def checkNewTransaction(
transaction: SignedTopologyTransaction[TopologyChangeOp],
force: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, TopologyManagerError, Unit] =
EitherT.rightT[Future, TopologyManagerError](())
override protected def notifyObservers(
timestamp: CantonTimestamp,
transactions: Seq[SignedTopologyTransaction[TopologyChangeOp]],
)(implicit traceContext: TraceContext): Future[Unit] = Future.unit
override protected def wrapError(error: TopologyManagerError)(implicit
traceContext: TraceContext
): TopologyManagerError = error
}

View File

@ -68,9 +68,11 @@ trait AbstractSequencerPruningStatus {
private[canton] final case class InternalSequencerPruningStatus(
override val lowerBound: CantonTimestamp,
membersMap: Map[Member, SequencerMemberStatus],
) extends AbstractSequencerPruningStatus
with PrettyPrinting {
override val members: Seq[SequencerMemberStatus] = membersMap.values.toSeq
def toSequencerPruningStatus(now: CantonTimestamp): SequencerPruningStatus =
SequencerPruningStatus(lowerBound, now, members)
@ -84,8 +86,14 @@ private[canton] object InternalSequencerPruningStatus {
/** Sentinel value to use for Sequencers that don't yet support the status endpoint */
val Unimplemented =
InternalSequencerPruningStatus(CantonTimestamp.MinValue, membersMap = Map.empty)
def apply(
lowerBound: CantonTimestamp,
members: Seq[SequencerMemberStatus],
): InternalSequencerPruningStatus = {
InternalSequencerPruningStatus(lowerBound, members.map(m => m.member -> m).toMap)
}
}
/** Pruning status of a Sequencer.
@ -100,7 +108,7 @@ final case class SequencerPruningStatus(
with PrettyPrinting {
def toInternal: InternalSequencerPruningStatus =
InternalSequencerPruningStatus(lowerBound, members.map(m => m.member -> m).toMap)
/** Using the member details, calculate based on their acknowledgements when is the latest point we can
* safely prune without losing any data that may still be read.

View File

@ -0,0 +1,259 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.service
import cats.data.EitherT
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.canton.DomainAlias
import com.digitalasset.canton.common.domain.grpc.SequencerInfoLoader
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.domain.admin.v0
import com.digitalasset.canton.domain.admin.v0.EnterpriseSequencerConnectionServiceGrpc.EnterpriseSequencerConnectionService
import com.digitalasset.canton.lifecycle.{
CloseContext,
FlagCloseable,
FutureUnlessShutdown,
PromiseUnlessShutdown,
}
import com.digitalasset.canton.logging.ErrorLoggingContext
import com.digitalasset.canton.networking.grpc.CantonMutableHandlerRegistry
import com.digitalasset.canton.sequencing.client.SequencerClient.SequencerTransports
import com.digitalasset.canton.sequencing.client.{
RequestSigner,
SequencerClient,
SequencerClientTransportFactory,
}
import com.digitalasset.canton.sequencing.{
GrpcSequencerConnection,
SequencerConnection,
SequencerConnections,
}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.retry.RetryUtil.NoExnRetryable
import com.digitalasset.canton.util.{EitherTUtil, retry}
import com.google.protobuf.empty.Empty
import io.grpc.{Status, StatusException}
import monocle.Lens
import org.apache.pekko.stream.Materializer
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
class GrpcSequencerConnectionService(
fetchConnection: () => EitherT[Future, String, Option[
SequencerConnections
]],
setConnection: SequencerConnections => EitherT[
Future,
String,
Unit,
],
)(implicit ec: ExecutionContext)
extends v0.EnterpriseSequencerConnectionServiceGrpc.EnterpriseSequencerConnectionService {
override def getConnection(request: v0.GetConnectionRequest): Future[v0.GetConnectionResponse] =
EitherTUtil.toFuture(
fetchConnection()
.leftMap(error => Status.FAILED_PRECONDITION.withDescription(error).asException())
.map { optSequencerConnections =>
v0.GetConnectionResponse(
optSequencerConnections.toList.flatMap(_.toProtoV0),
optSequencerConnections.map(_.sequencerTrustThreshold.unwrap).getOrElse(0),
)
}
)
override def setConnection(request: v0.SetConnectionRequest): Future[Empty] =
EitherTUtil.toFuture(for {
existing <- getConnection
requestedReplacement <- parseConnection(request)
_ <- validateReplacement(existing, requestedReplacement)
_ <- setConnection(requestedReplacement)
.leftMap(error => Status.FAILED_PRECONDITION.withDescription(error).asException())
} yield Empty())
private def getConnection: EitherT[Future, StatusException, SequencerConnections] =
fetchConnection()
.leftMap(error => Status.INTERNAL.withDescription(error).asException())
.flatMap[StatusException, SequencerConnections] {
case None =>
EitherT.leftT(
Status.FAILED_PRECONDITION
.withDescription(
"Initialize node before attempting to change sequencer connection"
)
.asException()
)
case Some(conn) =>
EitherT.rightT(conn)
}
private def parseConnection(
request: v0.SetConnectionRequest
): EitherT[Future, StatusException, SequencerConnections] =
SequencerConnections
.fromProtoV0(
request.sequencerConnections,
request.sequencerTrustThreshold,
)
.leftMap(err => Status.INVALID_ARGUMENT.withDescription(err.message).asException())
.toEitherT[Future]
private def validateReplacement(
existing: SequencerConnections,
requestedReplacement: SequencerConnections,
): EitherT[Future, StatusException, Unit] =
(existing.default, requestedReplacement.default) match {
// TODO(i12076): How should we be checking connections here? What should be validated?
case (_: GrpcSequencerConnection, _: GrpcSequencerConnection) =>
EitherT.rightT[Future, StatusException](())
case _ =>
EitherT.leftT[Future, Unit](
Status.INVALID_ARGUMENT
.withDescription(
"requested replacement connection info is not of the same type as the existing"
)
.asException()
)
}
}
object GrpcSequencerConnectionService {
trait UpdateSequencerClient {
def set(client: SequencerClient): Unit
}
def setup[C](member: Member)(
registry: CantonMutableHandlerRegistry,
fetchConfig: () => EitherT[Future, String, Option[C]],
saveConfig: C => EitherT[Future, String, Unit],
sequencerConnectionLens: Lens[C, SequencerConnections],
requestSigner: RequestSigner,
transportFactory: SequencerClientTransportFactory,
sequencerInfoLoader: SequencerInfoLoader,
domainAlias: DomainAlias,
)(implicit
executionContext: ExecutionContextExecutor,
executionServiceFactory: ExecutionSequencerFactory,
materializer: Materializer,
traceContext: TraceContext,
errorLoggingContext: ErrorLoggingContext,
closeContext: CloseContext,
) = {
val clientO = new AtomicReference[Option[SequencerClient]](None)
registry.addServiceU(
EnterpriseSequencerConnectionService.bindService(
new GrpcSequencerConnectionService(
fetchConnection = () => fetchConfig().map(_.map(sequencerConnectionLens.get)),
setConnection = newSequencerConnection =>
for {
currentConfig <- fetchConfig()
newConfig <- currentConfig.fold(
EitherT.leftT[Future, C](
"Can't update config when none has yet been set. Please initialize node."
)
)(config =>
EitherT.rightT(sequencerConnectionLens.replace(newSequencerConnection)(config))
)
// validate connection before making transport (as making transport will hang if the connection
// is invalid)
_ <- transportFactory
.validateTransport(
newSequencerConnection,
logWarning = false,
)
.onShutdown(Left("Aborting due to shutdown"))
newEndpointsInfo <- sequencerInfoLoader
.loadSequencerEndpoints(domainAlias, newSequencerConnection)
.leftMap(_.cause)
sequencerTransportsMap <- transportFactory
.makeTransport(
newEndpointsInfo.sequencerConnections,
member,
requestSigner,
)
sequencerTransports <- EitherT.fromEither[Future](
SequencerTransports.from(
sequencerTransportsMap,
newEndpointsInfo.expectedSequencers,
newEndpointsInfo.sequencerConnections.sequencerTrustThreshold,
)
)
// important to only save the config and change the transport after the `makeTransport` has run and done the handshake
_ <- saveConfig(newConfig)
_ <- EitherT.right(
clientO
.get()
.fold {
// no client to hand the transports to yet, so close them here
sequencerTransportsMap.values.foreach(_.close())
Future.unit
}(_.changeTransport(sequencerTransports))
)
} yield (),
),
executionContext,
)
)
new UpdateSequencerClient {
override def set(client: SequencerClient): Unit = clientO.set(Some(client))
}
}
def waitUntilSequencerConnectionIsValid(
factory: SequencerClientTransportFactory,
flagCloseable: FlagCloseable,
futureSupervisor: FutureSupervisor,
loadConfig: => EitherT[Future, String, Option[
SequencerConnections
]],
)(implicit
errorLoggingContext: ErrorLoggingContext,
traceContext: TraceContext,
executionContext: ExecutionContextExecutor,
): EitherT[Future, String, SequencerConnections] = {
val promise =
new PromiseUnlessShutdown[Either[String, SequencerConnection]](
"wait-for-valid-connection",
futureSupervisor,
)
flagCloseable.runOnShutdown_(promise)
implicit val closeContext = CloseContext(flagCloseable)
def tryNewConfig: EitherT[FutureUnlessShutdown, String, SequencerConnections] = {
flagCloseable
.performUnlessClosingEitherU("load config")(loadConfig)
.flatMap {
case Some(settings) =>
factory
.validateTransport(settings, logWarning = true)
.map(_ => settings)
case None => EitherT.leftT("No sequencer connection config")
}
}
import scala.concurrent.duration.*
EitherT(
retry
.Pause(
errorLoggingContext.logger,
flagCloseable,
maxRetries = Int.MaxValue,
delay = 50.millis,
operationName = "wait-for-valid-sequencer-connection",
)
.unlessShutdown(
tryNewConfig.value,
NoExnRetryable,
)
.onShutdown(Left("Aborting due to shutdown"))
)
}
}
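// Editorial usage sketch (not commit code), mirroring the call in mkMediatorRuntime:
// block node startup until a valid sequencer connection can be loaded, giving the
// operator a chance to fix the stored connection via the admin API in the meantime.
// The parameter names are hypothetical stand-ins for values the hosting node provides.
object WaitForConnectionExample {
  def waitForConnections(
      transportFactory: SequencerClientTransportFactory,
      node: FlagCloseable,
      futureSupervisor: FutureSupervisor,
      fetchConnections: => EitherT[Future, String, Option[SequencerConnections]],
  )(implicit
      errorLoggingContext: ErrorLoggingContext,
      traceContext: TraceContext,
      executionContext: ExecutionContextExecutor,
  ): EitherT[Future, String, SequencerConnections] =
    GrpcSequencerConnectionService.waitUntilSequencerConnectionIsValid(
      transportFactory,
      node,
      futureSupervisor,
      fetchConnections,
    )
}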

View File

@ -0,0 +1,132 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.mediator.store
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.config.RequireTypes.Port
import com.digitalasset.canton.crypto.Fingerprint
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.sequencing.{GrpcSequencerConnection, SequencerConnections}
import com.digitalasset.canton.store.db.{DbTest, H2Test, PostgresTest}
import com.digitalasset.canton.topology.DefaultTestIdentities
import com.digitalasset.canton.{BaseTest, SequencerAlias}
import monocle.macros.syntax.lens.*
import org.scalatest.wordspec.{AsyncWordSpec, AsyncWordSpecLike}
import scala.concurrent.Future
trait MediatorDomainConfigurationStoreTest {
this: AsyncWordSpecLike with BaseTest =>
def mediatorDomainConfigurationStore(mkStore: => MediatorDomainConfigurationStore): Unit = {
"returns nothing for an empty store" in {
val store = mkStore
for {
config <- valueOrFail(store.fetchConfiguration)("fetchConfiguration")
} yield config shouldBe None
}
"when set returns set value" in {
val store = mkStore
val connection = GrpcSequencerConnection(
NonEmpty(Seq, Endpoint("sequencer", Port.tryCreate(100))),
true,
None,
SequencerAlias.Default,
)
val originalConfig = MediatorDomainConfiguration(
Fingerprint.tryCreate("fingerprint"),
DefaultTestIdentities.domainId,
defaultStaticDomainParameters,
SequencerConnections.single(connection),
)
for {
_ <- valueOrFail(store.saveConfiguration(originalConfig))("saveConfiguration")
persistedConfig <- valueOrFail(store.fetchConfiguration)("fetchConfiguration").map(_.value)
} yield persistedConfig shouldBe originalConfig
}
"supports updating the config" in {
val store = mkStore
val defaultParams = defaultStaticDomainParameters
val connection = GrpcSequencerConnection(
NonEmpty(
Seq,
Endpoint("sequencer", Port.tryCreate(200)),
Endpoint("sequencer", Port.tryCreate(300)),
),
true,
None,
SequencerAlias.Default,
)
val originalConfig = MediatorDomainConfiguration(
Fingerprint.tryCreate("fingerprint"),
DefaultTestIdentities.domainId,
defaultParams,
SequencerConnections.single(connection),
)
val updatedConfig = originalConfig
.focus(_.domainParameters)
.replace(
BaseTest.defaultStaticDomainParametersWith(
uniqueContractKeys = true
)
)
for {
_ <- valueOrFail(store.saveConfiguration(originalConfig))("save original config")
persistedConfig1 <- valueOrFail(store.fetchConfiguration)("fetch original config")
.map(_.value)
_ = persistedConfig1 shouldBe originalConfig
_ <- valueOrFail(store.saveConfiguration(updatedConfig))("save updated config")
persistedConfig2 <- valueOrFail(store.fetchConfiguration)("fetch updated config")
.map(_.value)
} yield persistedConfig2 shouldBe updatedConfig
}
}
}
class MediatorDomainConfigurationStoreTestInMemory
extends AsyncWordSpec
with BaseTest
with MediatorDomainConfigurationStoreTest {
behave like mediatorDomainConfigurationStore(new InMemoryMediatorDomainConfigurationStore())
}
trait DbMediatorDomainConfigurationStoreTest
extends AsyncWordSpec
with BaseTest
with MediatorDomainConfigurationStoreTest {
this: DbTest =>
override def cleanDb(storage: DbStorage): Future[Unit] = {
import storage.api.*
storage.update(
DBIO.seq(
sqlu"truncate table mediator_domain_configuration"
),
functionFullName,
)
}
behave like mediatorDomainConfigurationStore(
new DbMediatorDomainConfigurationStore(storage, timeouts, loggerFactory)
)
}
class MediatorDomainConfigurationStoreTestPostgres
extends DbMediatorDomainConfigurationStoreTest
with PostgresTest
class MediatorDomainConfigurationStoreTestH2
extends DbMediatorDomainConfigurationStoreTest
with H2Test

View File

@ -34,7 +34,16 @@ trait ConsoleEnvironmentTestHelpers[+CE <: ConsoleEnvironment] { this: CE =>
.find(_.name == name)
.getOrElse(sys.error(s"neither local nor remote participant [$name] is configured"))
def lpx(name: String): LocalParticipantReferenceX = participantsX.local
.find(_.name == name)
.getOrElse(sys.error(s"participant x [$name] not configured"))
def rpx(name: String): RemoteParticipantReferenceX =
participantsX.remote
.find(_.name == name)
.getOrElse(sys.error(s"remote participant [$name] not configured"))
def px(name: String): ParticipantReferenceX = participantsX.all
.find(_.name == name)
.getOrElse(sys.error(s"neither local nor remote participant x [$name] is configured"))
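// Editorial usage sketch (not commit code): after this change `lpx`/`rpx` resolve
// strictly local/remote X participants, while the reworked `px` accepts either kind.
def exampleUsage(): Unit = {
  val local: LocalParticipantReferenceX = lpx("participant1")
  val remote: RemoteParticipantReferenceX = rpx("participant1")
  val any: ParticipantReferenceX = px("participant1")
}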

View File

@ -9,6 +9,7 @@ import com.digitalasset.canton.console.{
LocalParticipantReference,
LocalParticipantReferenceX,
ParticipantReference,
ParticipantReferenceX,
}
/** Aliases used by our typical single domain and multi domain tests.
@ -22,11 +23,12 @@ trait CommonTestAliases[+CE <: ConsoleEnvironment] {
lazy val participant2: LocalParticipantReference = lp("participant2")
lazy val participant3: LocalParticipantReference = lp("participant3")
lazy val participant4: LocalParticipantReference = lp("participant4")
lazy val participant1x: LocalParticipantReferenceX = lpx("participant1")
lazy val participant1x_ : ParticipantReferenceX = px("participant1")
lazy val participant2x: LocalParticipantReferenceX = lpx("participant2")
lazy val participant3x: LocalParticipantReferenceX = lpx("participant3")
lazy val participant4x: LocalParticipantReferenceX = lpx("participant4")
lazy val participant5x: LocalParticipantReferenceX = lpx("participant5")
lazy val da: CE#DomainLocalRef = d("da")
lazy val acme: CE#DomainLocalRef = d("acme")
lazy val repairDomain: CE#DomainLocalRef = d("repair")

View File

@ -19,6 +19,22 @@ import com.digitalasset.canton.{SequencerCounter, topology}
import scala.concurrent.{ExecutionContext, Future}
class ParticipantTopologyTerminateProcessingTickerX(
recordOrderPublisher: RecordOrderPublisher,
override protected val loggerFactory: NamedLoggerFactory,
) extends topology.processing.TerminateProcessing
with NamedLogging {
override def terminate(
sc: SequencerCounter,
sequencedTime: SequencedTime,
effectiveTime: EffectiveTime,
)(implicit traceContext: TraceContext): Future[Unit] = {
recordOrderPublisher.tick(sc, sequencedTime.value)
Future.unit
}
}
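// Editorial sketch (not commit code): the ticker-only variant is constructed with just
// the RecordOrderPublisher and a logger factory, as TopologyComponentFactoryX now does;
// on each terminate it only ticks the publisher, without touching the topology store.
object TickerConstructionExample {
  def create(
      recordOrderPublisher: RecordOrderPublisher,
      loggerFactory: NamedLoggerFactory,
  ): ParticipantTopologyTerminateProcessingTickerX =
    new ParticipantTopologyTerminateProcessingTickerX(recordOrderPublisher, loggerFactory)
}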
class ParticipantTopologyTerminateProcessingX(
recordOrderPublisher: RecordOrderPublisher,
store: TopologyStoreX[TopologyStoreId.DomainStore],

View File

@ -20,8 +20,6 @@ import com.digitalasset.canton.util.Thereafter.syntax.*
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.{ExecutionContext, Future}
private[participant] class PruneObserver(
requestJournalStore: RequestJournalStore,

View File

@ -56,14 +56,7 @@ import com.digitalasset.canton.participant.pruning.{
SortedReconciliationIntervalsProvider,
}
import com.digitalasset.canton.participant.store.ActiveContractSnapshot.ActiveContractIdsChange
import com.digitalasset.canton.participant.store.*
import com.digitalasset.canton.participant.sync.SyncServiceError.SyncServiceAlarm
import com.digitalasset.canton.participant.topology.ParticipantTopologyDispatcherCommon
import com.digitalasset.canton.participant.topology.client.MissingKeysAlerter
@ -78,7 +71,6 @@ import com.digitalasset.canton.sequencing.handlers.CleanSequencerCounterTracker
import com.digitalasset.canton.sequencing.protocol.{ClosedEnvelope, Envelope, EventWithErrors}
import com.digitalasset.canton.store.SequencedEventStore
import com.digitalasset.canton.store.SequencedEventStore.PossiblyIgnoredSequencedEvent
import com.digitalasset.canton.time.{Clock, DomainTimeTracker}
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
import com.digitalasset.canton.topology.client.PartyTopologySnapshotClient.AuthorityOfResponse

View File

@ -16,7 +16,7 @@ import com.digitalasset.canton.crypto.{Crypto, DomainSyncCryptoClient}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.participant.event.RecordOrderPublisher
import com.digitalasset.canton.participant.protocol.ParticipantTopologyTerminateProcessingX
import com.digitalasset.canton.participant.protocol.ParticipantTopologyTerminateProcessingTickerX
import com.digitalasset.canton.participant.topology.client.MissingKeysAlerter
import com.digitalasset.canton.participant.traffic.{
TrafficStateController,
@ -231,9 +231,8 @@ class TopologyComponentFactoryX(
acsCommitmentScheduleEffectiveTime: Traced[EffectiveTime] => Unit
)(implicit executionContext: ExecutionContext): TopologyTransactionProcessorCommon = {
val terminateTopologyProcessing = new ParticipantTopologyTerminateProcessingX(
val terminateTopologyProcessing = new ParticipantTopologyTerminateProcessingTickerX(
recordOrderPublisher,
topologyStore,
loggerFactory,
)

View File

@ -13,76 +13,182 @@ import java.util.Queue;
import static org.slf4j.event.Level.*;
/**
* Logger that formats log messages and saves them in a buffer.
* Trace messages are currently ignored.
* Logger that formats log messages and saves them in a buffer. Trace messages are currently ignored.
* Written in Java because of the awkwardness of overriding and using Java varargs methods from Scala.
*/
class BufferingLogger extends MarkerIgnoringBase {
private final Queue<LogEntry> messages;
private final Function1<LogEntry, Boolean> skip;
private final Queue<LogEntry> messages;
private final Function1<LogEntry, Boolean> skip;
public BufferingLogger(Queue<LogEntry> messages, String name, Function1<LogEntry, Boolean> skip) {
this.messages = messages;
this.name = name;
this.skip = skip;
}
@Override public boolean isTraceEnabled() { return false; }
@Override public void trace(String msg) {}
@Override public void trace(String format, Object arg) {}
@Override public void trace(String format, Object arg1, Object arg2) {}
@Override public void trace(String format, Object... arguments) {}
@Override public void trace(String msg, Throwable t) {}
@Override public boolean isDebugEnabled() { return true; }
@Override public void debug(String msg) { skipOrAdd(toLogEntry(DEBUG, msg)); }
@Override public void debug(String format, Object arg) { skipOrAdd(toLogEntry(DEBUG, format, arg)); }
@Override public void debug(String format, Object arg1, Object arg2) { skipOrAdd(toLogEntry(DEBUG, format, arg1, arg2)); }
@Override public void debug(String format, Object... arguments) { skipOrAdd(toLogEntry(DEBUG, format, arguments)); }
@Override public void debug(String msg, Throwable t) { skipOrAdd(toLogEntry(DEBUG, msg, t)); }
@Override public boolean isInfoEnabled() { return true; }
@Override public void info(String msg) { skipOrAdd(toLogEntry(INFO, msg)); }
@Override public void info(String format, Object arg) { skipOrAdd(toLogEntry(INFO, format, arg)); }
@Override public void info(String format, Object arg1, Object arg2) { skipOrAdd(toLogEntry(INFO, format, arg1, arg2)); }
@Override public void info(String format, Object... arguments) { skipOrAdd(toLogEntry(INFO, format, arguments)); }
@Override public void info(String msg, Throwable t) { skipOrAdd(toLogEntry(INFO, msg, t)); }
@Override public boolean isWarnEnabled() { return true; }
@Override public void warn(String msg) { skipOrAdd(toLogEntry(WARN, msg)); }
@Override public void warn(String msg, Throwable t) { skipOrAdd(toLogEntry(WARN, msg, t)); }
@Override public void warn(String format, Object arg) { skipOrAdd(toLogEntry(WARN, format, arg)); }
@Override public void warn(String format, Object arg1, Object arg2) { skipOrAdd(toLogEntry(WARN, format, arg1, arg2)); }
@Override public void warn(String format, Object... arguments) { skipOrAdd(toLogEntry(WARN, format, arguments)); }
@Override public boolean isErrorEnabled() { return true; }
@Override public void error(String msg) { skipOrAdd(toLogEntry(ERROR, msg)); }
@Override public void error(String msg, Throwable t) { skipOrAdd(toLogEntry(ERROR, msg, t)); }
@Override public void error(String format, Object arg) { skipOrAdd(toLogEntry(ERROR, format, arg)); }
@Override public void error(String format, Object arg1, Object arg2) { skipOrAdd(toLogEntry(ERROR, format, arg1, arg2)); }
@Override public void error(String format, Object... arguments) { skipOrAdd(toLogEntry(ERROR, format, arguments)); }
private LogEntry toLogEntry(Level level, String msg) {
return LogEntry$.MODULE$.apply(level, name, msg, (Throwable) null);
}
private LogEntry toLogEntry(Level level, String msg, Throwable t) {
return LogEntry$.MODULE$.apply(level, name, msg, t);
}
private LogEntry toLogEntry(Level level, String format, Object arg) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.format(format, arg));
}
private LogEntry toLogEntry(Level level, String format, Object arg1, Object arg2) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.format(format, arg1, arg2));
}
private LogEntry toLogEntry(Level level, String format, Object[] args) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.arrayFormat(format, args));
}
private void skipOrAdd(LogEntry entry) {
if (!skip.apply(entry)) { messages.add(entry); }
public BufferingLogger(Queue<LogEntry> messages, String name, Function1<LogEntry, Boolean> skip) {
this.messages = messages;
this.name = name;
this.skip = skip;
}
@Override
public boolean isTraceEnabled() {
return false;
}
@Override
public void trace(String msg) {}
@Override
public void trace(String format, Object arg) {}
@Override
public void trace(String format, Object arg1, Object arg2) {}
@Override
public void trace(String format, Object... arguments) {}
@Override
public void trace(String msg, Throwable t) {}
@Override
public boolean isDebugEnabled() {
return true;
}
@Override
public void debug(String msg) {
skipOrAdd(toLogEntry(DEBUG, msg));
}
@Override
public void debug(String format, Object arg) {
skipOrAdd(toLogEntry(DEBUG, format, arg));
}
@Override
public void debug(String format, Object arg1, Object arg2) {
skipOrAdd(toLogEntry(DEBUG, format, arg1, arg2));
}
@Override
public void debug(String format, Object... arguments) {
skipOrAdd(toLogEntry(DEBUG, format, arguments));
}
@Override
public void debug(String msg, Throwable t) {
skipOrAdd(toLogEntry(DEBUG, msg, t));
}
@Override
public boolean isInfoEnabled() {
return true;
}
@Override
public void info(String msg) {
skipOrAdd(toLogEntry(INFO, msg));
}
@Override
public void info(String format, Object arg) {
skipOrAdd(toLogEntry(INFO, format, arg));
}
@Override
public void info(String format, Object arg1, Object arg2) {
skipOrAdd(toLogEntry(INFO, format, arg1, arg2));
}
@Override
public void info(String format, Object... arguments) {
skipOrAdd(toLogEntry(INFO, format, arguments));
}
@Override
public void info(String msg, Throwable t) {
skipOrAdd(toLogEntry(INFO, msg, t));
}
@Override
public boolean isWarnEnabled() {
return true;
}
@Override
public void warn(String msg) {
skipOrAdd(toLogEntry(WARN, msg));
}
@Override
public void warn(String msg, Throwable t) {
skipOrAdd(toLogEntry(WARN, msg, t));
}
@Override
public void warn(String format, Object arg) {
skipOrAdd(toLogEntry(WARN, format, arg));
}
@Override
public void warn(String format, Object arg1, Object arg2) {
skipOrAdd(toLogEntry(WARN, format, arg1, arg2));
}
@Override
public void warn(String format, Object... arguments) {
skipOrAdd(toLogEntry(WARN, format, arguments));
}
@Override
public boolean isErrorEnabled() {
return true;
}
@Override
public void error(String msg) {
skipOrAdd(toLogEntry(ERROR, msg));
}
@Override
public void error(String msg, Throwable t) {
skipOrAdd(toLogEntry(ERROR, msg, t));
}
@Override
public void error(String format, Object arg) {
skipOrAdd(toLogEntry(ERROR, format, arg));
}
@Override
public void error(String format, Object arg1, Object arg2) {
skipOrAdd(toLogEntry(ERROR, format, arg1, arg2));
}
@Override
public void error(String format, Object... arguments) {
skipOrAdd(toLogEntry(ERROR, format, arguments));
}
private LogEntry toLogEntry(Level level, String msg) {
return LogEntry$.MODULE$.apply(level, name, msg, (Throwable) null);
}
private LogEntry toLogEntry(Level level, String msg, Throwable t) {
return LogEntry$.MODULE$.apply(level, name, msg, t);
}
private LogEntry toLogEntry(Level level, String format, Object arg) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.format(format, arg));
}
private LogEntry toLogEntry(Level level, String format, Object arg1, Object arg2) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.format(format, arg1, arg2));
}
private LogEntry toLogEntry(Level level, String format, Object[] args) {
return LogEntry$.MODULE$.apply(level, name, MessageFormatter.arrayFormat(format, args));
}
private void skipOrAdd(LogEntry entry) {
if (!skip.apply(entry)) {
messages.add(entry);
}
}
}
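A hypothetical usage sketch from a Scala test, showing why the Java class is convenient to call from Scala (BufferingLogger and LogEntry as defined above; the final assertion is left as a comment because LogEntry's accessor names are not shown in this hunk):
import java.util.concurrent.ConcurrentLinkedQueue
// Collect formatted entries; the skip predicate is a plain Scala lambda,
// which satisfies the scala.Function1 parameter directly.
val buffer = new ConcurrentLinkedQueue[LogEntry]()
val logger = new BufferingLogger(buffer, "test", (_: LogEntry) => false)
logger.warn("connection to {} lost", "sequencer1")
// buffer now holds one WARN entry with the formatted message
// "connection to sequencer1 lost", ready for test assertions.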

View File

@ -14,423 +14,428 @@ import static org.slf4j.event.Level.*;
class SuppressingLoggerDispatcher extends SubstituteLogger {
private Logger suppressedMessageLogger;
private AtomicReference<SuppressionRule> activeSuppressionRule;
private String suppressionPrefix;
private Logger suppressedMessageLogger;
private AtomicReference<SuppressionRule> activeSuppressionRule;
private String suppressionPrefix;
SuppressingLoggerDispatcher(String name, Logger suppressedMessageLogger, AtomicReference<SuppressionRule> activeSuppressionRule, String suppressionPrefix) {
// Because we know we won't log anything before setting the delegate, there's no need to set an event queue; hence the null.
super(name, null, true);
this.suppressedMessageLogger = suppressedMessageLogger;
this.activeSuppressionRule = activeSuppressionRule;
this.suppressionPrefix = suppressionPrefix;
}
SuppressingLoggerDispatcher(
String name,
Logger suppressedMessageLogger,
AtomicReference<SuppressionRule> activeSuppressionRule,
String suppressionPrefix) {
// Because we know we won't log anything before setting the delegate, there's no need to set an
// event queue; hence the null.
super(name, null, true);
this.suppressedMessageLogger = suppressedMessageLogger;
this.activeSuppressionRule = activeSuppressionRule;
this.suppressionPrefix = suppressionPrefix;
}
@Override
public void error(String msg) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, msg));
suppressedMessageLogger.error(msg);
} else {
super.error(msg);
}
@Override
public void error(String msg) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, msg));
suppressedMessageLogger.error(msg);
} else {
super.error(msg);
}
}
@Override
public void error(String format, Object arg) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arg);
suppressedMessageLogger.error(format, arg);
} else {
super.error(format, arg);
}
@Override
public void error(String format, Object arg) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arg);
suppressedMessageLogger.error(format, arg);
} else {
super.error(format, arg);
}
}
@Override
public void error(String format, Object arg1, Object arg2) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arg1, arg2);
suppressedMessageLogger.error(format, arg1, arg2);
} else {
super.error(format, arg1, arg2);
}
@Override
public void error(String format, Object arg1, Object arg2) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arg1, arg2);
suppressedMessageLogger.error(format, arg1, arg2);
} else {
super.error(format, arg1, arg2);
}
}
@Override
public void error(String format, Object... arguments) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arguments);
suppressedMessageLogger.error(format, arguments);
} else {
super.error(format, arguments);
}
@Override
public void error(String format, Object... arguments) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, format), arguments);
suppressedMessageLogger.error(format, arguments);
} else {
super.error(format, arguments);
}
}
@Override
public void error(String msg, Throwable t) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, msg), t);
suppressedMessageLogger.error(msg, t);
} else {
super.error(msg, t);
}
@Override
public void error(String msg, Throwable t) {
if (isSuppressed(ERROR)) {
super.info(withSuppressionHint(ERROR, msg), t);
suppressedMessageLogger.error(msg, t);
} else {
super.error(msg, t);
}
}
@Override
public void error(Marker marker, String msg) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, msg));
suppressedMessageLogger.error(marker, msg);
} else {
super.error(marker, msg);
}
@Override
public void error(Marker marker, String msg) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, msg));
suppressedMessageLogger.error(marker, msg);
} else {
super.error(marker, msg);
}
}
@Override
public void error(Marker marker, String format, Object arg) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arg);
suppressedMessageLogger.error(marker, format, arg);
} else {
super.error(marker, format, arg);
}
@Override
public void error(Marker marker, String format, Object arg) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arg);
suppressedMessageLogger.error(marker, format, arg);
} else {
super.error(marker, format, arg);
}
}
@Override
public void error(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arg1, arg2);
suppressedMessageLogger.error(marker, format, arg1, arg2);
} else {
super.error(marker, format, arg1, arg2);
}
@Override
public void error(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arg1, arg2);
suppressedMessageLogger.error(marker, format, arg1, arg2);
} else {
super.error(marker, format, arg1, arg2);
}
}
@Override
public void error(Marker marker, String format, Object... arguments) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arguments);
suppressedMessageLogger.error(marker, format, arguments);
} else {
super.error(marker, format, arguments);
}
@Override
public void error(Marker marker, String format, Object... arguments) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, format), arguments);
suppressedMessageLogger.error(marker, format, arguments);
} else {
super.error(marker, format, arguments);
}
}
@Override
public void error(Marker marker, String msg, Throwable t) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, msg), t);
suppressedMessageLogger.error(marker, msg, t);
} else {
super.error(marker, msg, t);
}
@Override
public void error(Marker marker, String msg, Throwable t) {
if (isSuppressed(ERROR)) {
super.info(marker, withSuppressionHint(ERROR, msg), t);
suppressedMessageLogger.error(marker, msg, t);
} else {
super.error(marker, msg, t);
}
}
@Override
public void warn(String msg) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, msg));
suppressedMessageLogger.warn(msg);
} else {
super.warn(msg);
}
@Override
public void warn(String msg) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, msg));
suppressedMessageLogger.warn(msg);
} else {
super.warn(msg);
}
}
@Override
public void warn(String format, Object arg) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arg);
suppressedMessageLogger.warn(format, arg);
} else {
super.warn(format, arg);
}
@Override
public void warn(String format, Object arg) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arg);
suppressedMessageLogger.warn(format, arg);
} else {
super.warn(format, arg);
}
}
@Override
public void warn(String format, Object arg1, Object arg2) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arg1, arg2);
suppressedMessageLogger.warn(format, arg1, arg2);
} else {
super.warn(format, arg1, arg2);
}
@Override
public void warn(String format, Object arg1, Object arg2) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arg1, arg2);
suppressedMessageLogger.warn(format, arg1, arg2);
} else {
super.warn(format, arg1, arg2);
}
}
@Override
public void warn(String format, Object... arguments) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arguments);
suppressedMessageLogger.warn(format, arguments);
} else {
super.warn(format, arguments);
}
@Override
public void warn(String format, Object... arguments) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, format), arguments);
suppressedMessageLogger.warn(format, arguments);
} else {
super.warn(format, arguments);
}
}
@Override
public void warn(String msg, Throwable t) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, msg), t);
suppressedMessageLogger.warn(msg, t);
} else {
super.warn(msg, t);
}
@Override
public void warn(String msg, Throwable t) {
if (isSuppressed(WARN)) {
super.info(withSuppressionHint(WARN, msg), t);
suppressedMessageLogger.warn(msg, t);
} else {
super.warn(msg, t);
}
}
@Override
public void warn(Marker marker, String msg) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, msg));
suppressedMessageLogger.warn(marker, msg);
} else {
super.warn(marker, msg);
}
@Override
public void warn(Marker marker, String msg) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, msg));
suppressedMessageLogger.warn(marker, msg);
} else {
super.warn(marker, msg);
}
}
@Override
public void warn(Marker marker, String format, Object arg) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arg);
suppressedMessageLogger.warn(marker, format, arg);
} else {
super.warn(marker, format, arg);
}
@Override
public void warn(Marker marker, String format, Object arg) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arg);
suppressedMessageLogger.warn(marker, format, arg);
} else {
super.warn(marker, format, arg);
}
}
@Override
public void warn(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arg1, arg2);
suppressedMessageLogger.warn(marker, format, arg1, arg2);
} else {
super.warn(marker, format, arg1, arg2);
}
@Override
public void warn(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arg1, arg2);
suppressedMessageLogger.warn(marker, format, arg1, arg2);
} else {
super.warn(marker, format, arg1, arg2);
}
}
@Override
public void warn(Marker marker, String format, Object... arguments) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arguments);
suppressedMessageLogger.warn(marker, format, arguments);
} else {
super.warn(marker, format, arguments);
}
@Override
public void warn(Marker marker, String format, Object... arguments) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, format), arguments);
suppressedMessageLogger.warn(marker, format, arguments);
} else {
super.warn(marker, format, arguments);
}
}
@Override
public void warn(Marker marker, String msg, Throwable t) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, msg), t);
suppressedMessageLogger.warn(marker, msg, t);
} else {
super.warn(marker, msg, t);
}
@Override
public void warn(Marker marker, String msg, Throwable t) {
if (isSuppressed(WARN)) {
super.info(marker, withSuppressionHint(WARN, msg), t);
suppressedMessageLogger.warn(marker, msg, t);
} else {
super.warn(marker, msg, t);
}
}
@Override
public void info(String msg) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, msg));
suppressedMessageLogger.info(msg);
} else {
super.info(msg);
}
@Override
public void info(String msg) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, msg));
suppressedMessageLogger.info(msg);
} else {
super.info(msg);
}
}
@Override
public void info(String format, Object arg) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arg);
suppressedMessageLogger.info(format, arg);
} else {
super.info(format, arg);
}
@Override
public void info(String format, Object arg) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arg);
suppressedMessageLogger.info(format, arg);
} else {
super.info(format, arg);
}
}
@Override
public void info(String format, Object arg1, Object arg2) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arg1, arg2);
suppressedMessageLogger.info(format, arg1, arg2);
} else {
super.info(format, arg1, arg2);
}
@Override
public void info(String format, Object arg1, Object arg2) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arg1, arg2);
suppressedMessageLogger.info(format, arg1, arg2);
} else {
super.info(format, arg1, arg2);
}
}
@Override
public void info(String format, Object... arguments) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arguments);
suppressedMessageLogger.info(format, arguments);
} else {
super.info(format, arguments);
}
@Override
public void info(String format, Object... arguments) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, format), arguments);
suppressedMessageLogger.info(format, arguments);
} else {
super.info(format, arguments);
}
}
@Override
public void info(String msg, Throwable t) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, msg), t);
suppressedMessageLogger.info(msg, t);
} else {
super.info(msg, t);
}
@Override
public void info(String msg, Throwable t) {
if (isSuppressed(INFO)) {
super.info(withSuppressionHint(INFO, msg), t);
suppressedMessageLogger.info(msg, t);
} else {
super.info(msg, t);
}
}
@Override
public void info(Marker marker, String msg) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, msg));
suppressedMessageLogger.info(marker, msg);
} else {
super.info(marker, msg);
}
@Override
public void info(Marker marker, String msg) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, msg));
suppressedMessageLogger.info(marker, msg);
} else {
super.info(marker, msg);
}
}
@Override
public void info(Marker marker, String format, Object arg) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arg);
suppressedMessageLogger.info(marker, format, arg);
} else {
super.info(marker, format, arg);
}
@Override
public void info(Marker marker, String format, Object arg) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arg);
suppressedMessageLogger.info(marker, format, arg);
} else {
super.info(marker, format, arg);
}
}
@Override
public void info(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arg1, arg2);
suppressedMessageLogger.info(marker, format, arg1, arg2);
} else {
super.info(marker, format, arg1, arg2);
}
@Override
public void info(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arg1, arg2);
suppressedMessageLogger.info(marker, format, arg1, arg2);
} else {
super.info(marker, format, arg1, arg2);
}
}
@Override
public void info(Marker marker, String format, Object... arguments) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arguments);
suppressedMessageLogger.info(marker, format, arguments);
} else {
super.info(marker, format, arguments);
}
@Override
public void info(Marker marker, String format, Object... arguments) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, format), arguments);
suppressedMessageLogger.info(marker, format, arguments);
} else {
super.info(marker, format, arguments);
}
}
@Override
public void info(Marker marker, String msg, Throwable t) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, msg), t);
suppressedMessageLogger.info(marker, msg, t);
} else {
super.info(marker, msg, t);
}
@Override
public void info(Marker marker, String msg, Throwable t) {
if (isSuppressed(INFO)) {
super.info(marker, withSuppressionHint(INFO, msg), t);
suppressedMessageLogger.info(marker, msg, t);
} else {
super.info(marker, msg, t);
}
}
@Override
public void debug(String msg) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, msg));
suppressedMessageLogger.debug(msg);
} else {
super.debug(msg);
}
@Override
public void debug(String msg) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, msg));
suppressedMessageLogger.debug(msg);
} else {
super.debug(msg);
}
}
@Override
public void debug(String format, Object arg) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arg);
suppressedMessageLogger.debug(format, arg);
} else{
super.debug(format, arg);
}
@Override
public void debug(String format, Object arg) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arg);
suppressedMessageLogger.debug(format, arg);
} else {
super.debug(format, arg);
}
}
@Override
public void debug(String format, Object arg1, Object arg2) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arg1, arg2);
suppressedMessageLogger.debug(format, arg1, arg2);
} else {
super.debug(format, arg1, arg2);
}
@Override
public void debug(String format, Object arg1, Object arg2) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arg1, arg2);
suppressedMessageLogger.debug(format, arg1, arg2);
} else {
super.debug(format, arg1, arg2);
}
}
@Override
public void debug(String format, Object... arguments) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arguments);
suppressedMessageLogger.debug(format, arguments);
} else {
super.debug(format, arguments);
}
@Override
public void debug(String format, Object... arguments) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, format), arguments);
suppressedMessageLogger.debug(format, arguments);
} else {
super.debug(format, arguments);
}
}
@Override
public void debug(String msg, Throwable t) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, msg), t);
suppressedMessageLogger.debug(msg, t);
} else {
super.debug(msg, t);
}
@Override
public void debug(String msg, Throwable t) {
if (isSuppressed(DEBUG)) {
super.debug(withSuppressionHint(DEBUG, msg), t);
suppressedMessageLogger.debug(msg, t);
} else {
super.debug(msg, t);
}
}
@Override
public void debug(Marker marker, String msg) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, msg));
suppressedMessageLogger.debug(marker, msg);
} else {
super.debug(marker, msg);
}
@Override
public void debug(Marker marker, String msg) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, msg));
suppressedMessageLogger.debug(marker, msg);
} else {
super.debug(marker, msg);
}
}
@Override
public void debug(Marker marker, String format, Object arg) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arg);
suppressedMessageLogger.debug(marker, format, arg);
} else {
super.debug(marker, format, arg);
}
@Override
public void debug(Marker marker, String format, Object arg) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arg);
suppressedMessageLogger.debug(marker, format, arg);
} else {
super.debug(marker, format, arg);
}
}
@Override
public void debug(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arg1, arg2);
suppressedMessageLogger.debug(marker, format, arg1, arg2);
} else {
super.debug(marker, format, arg1, arg2);
}
@Override
public void debug(Marker marker, String format, Object arg1, Object arg2) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arg1, arg2);
suppressedMessageLogger.debug(marker, format, arg1, arg2);
} else {
super.debug(marker, format, arg1, arg2);
}
}
@Override
public void debug(Marker marker, String format, Object... arguments) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arguments);
suppressedMessageLogger.debug(marker, format, arguments);
} else {
super.debug(marker, format, arguments);
}
@Override
public void debug(Marker marker, String format, Object... arguments) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, format), arguments);
suppressedMessageLogger.debug(marker, format, arguments);
} else {
super.debug(marker, format, arguments);
}
}
@Override
public void debug(Marker marker, String msg, Throwable t) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, msg), t);
suppressedMessageLogger.debug(marker, msg, t);
} else {
super.debug(marker, msg, t);
}
@Override
public void debug(Marker marker, String msg, Throwable t) {
if (isSuppressed(DEBUG)) {
super.debug(marker, withSuppressionHint(DEBUG, msg), t);
suppressedMessageLogger.debug(marker, msg, t);
} else {
super.debug(marker, msg, t);
}
}
private boolean isSuppressed(Level level) {
return activeSuppressionRule.get().isSuppressed(getName(), level);
}
private boolean isSuppressed(Level level) {
return activeSuppressionRule.get().isSuppressed(getName(), level);
}
public String withSuppressionHint(Level level, String msg) {
return String.format(suppressionPrefix, level, msg);
}
public String withSuppressionHint(Level level, String msg) {
return String.format(suppressionPrefix, level, msg);
}
}
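Every override above is an instance of one pattern: consult the active suppression rule for this logger and level; if suppressed, emit the hint-prefixed message at a lower level on the delegate and record the original in the suppressed-message logger; otherwise log normally. A condensed sketch of that pattern with assumed signatures (not the actual class):
import java.util.concurrent.atomic.AtomicReference
import org.slf4j.event.Level
// Minimal stand-in for the rule type referenced above.
trait SuppressionRule { def isSuppressed(loggerName: String, level: Level): Boolean }
def dispatch(
    loggerName: String,
    level: Level,
    msg: String,
    rule: AtomicReference[SuppressionRule],
    suppressionPrefix: String,        // e.g. "SUPPRESSED %s: %s"
    logNormally: String => Unit,      // e.g. delegate.error(_)
    logHint: String => Unit,          // delegate.info(_); delegate.debug(_) for DEBUG
    recordSuppressed: String => Unit  // suppressedMessageLogger at `level`
): Unit =
  if (rule.get().isSuppressed(loggerName, level)) {
    // Suppressed: downgrade on the main logger, keep the original for assertions.
    logHint(String.format(suppressionPrefix, level, msg))
    recordSuppressed(msg)
  } else logNormally(msg)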

View File

@ -23,9 +23,9 @@ if [ "{local}" = "true" ]; then
exit 0
fi
CANTON_ENTERPRISE_VERSION=2.8.0-snapshot.20231206.11618.0.vd015b3cb
CANTON_ENTERPRISE_SHA=26d264b9341850f2bafc283b6f0ee11d1e1a3190c05703c1d3a5d0d2c3e0b9e5
CANTON_ENTERPRISE_URL=https://digitalasset.jfrog.io/artifactory/assembly/daml/canton-backup/2.8.0-snapshot.20231206.11618.0.vd015b3cb/26d264b9341850f2bafc283b6f0ee11d1e1a3190c05703c1d3a5d0d2c3e0b9e5/canton-enterprise-2.8.0-snapshot.20231206.11618.0.vd015b3cb.tar.gz
CANTON_ENTERPRISE_VERSION=2.8.0-snapshot.20231207.11619.0.v1fe7c380
CANTON_ENTERPRISE_SHA=c284ea116a812c1a279cc4f0349bedb2974b214b810a1d8c7f86e35a9503a266
CANTON_ENTERPRISE_URL=https://digitalasset.jfrog.io/artifactory/assembly/daml/canton-backup/2.8.0-snapshot.20231207.11619.0.v1fe7c380/c284ea116a812c1a279cc4f0349bedb2974b214b810a1d8c7f86e35a9503a266/canton-enterprise-2.8.0-snapshot.20231207.11619.0.v1fe7c380.tar.gz
url=$$CANTON_ENTERPRISE_URL
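The pin above changes the version, the 64-hex-character digest (a SHA-256), and the download URL together. A hedged sketch of the kind of check such a digest enables (illustrative only; the file name is taken from the URL above, the helper name is hypothetical, and this is not how the repository's tooling is implemented):
import java.nio.file.{Files, Paths}
import java.security.MessageDigest
// Compute the SHA-256 of a downloaded artifact as lowercase hex.
def sha256Hex(path: String): String =
  MessageDigest
    .getInstance("SHA-256")
    .digest(Files.readAllBytes(Paths.get(path)))
    .map("%02x".format(_))
    .mkString
val pinned = "c284ea116a812c1a279cc4f0349bedb2974b214b810a1d8c7f86e35a9503a266"
require(
  sha256Hex("canton-enterprise-2.8.0-snapshot.20231207.11619.0.v1fe7c380.tar.gz") == pinned,
  "SHA-256 mismatch: refusing to use the downloaded artifact"
)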