update canton to 20240528.13380.v10deea37 (#19293)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-05-29 08:36:37 +02:00 committed by GitHub
parent 5e7a52378c
commit 78ccafad33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 705 additions and 127 deletions

View File

@ -40,7 +40,8 @@ message RootHashMessage {
bytes root_hash = 1;
string domain_id = 2;
ViewType view_type = 3;
bytes payload = 4; // optional
int64 submission_topology_time = 4; // in microseconds of UTC time since Unix epoch
bytes payload = 5; // optional
}
// Messages sent by a participant as part of the transaction protocol

View File

@ -25,6 +25,9 @@ import scala.concurrent.duration.*
* @param activeInitRetryDelay delay between attempts while waiting for initialization of the active replica
* @param sequencerInfo how long are we going to try to get the sequencer connection information. setting this high means that
* connect calls will take quite a while if one of the sequencers is offline.
* @param topologyChangeWarnDelay maximum delay between the timestamp of the topology snapshot used during
* submission and the sequencing timestamp, after which we log inconsistency
* errors as warnings
*/
final case class ProcessingTimeout(
unbounded: NonNegativeDuration = DefaultProcessingTimeouts.unbounded,
@ -42,6 +45,7 @@ final case class ProcessingTimeout(
slowFutureWarn: NonNegativeDuration = DefaultProcessingTimeouts.slowFutureWarn,
activeInitRetryDelay: NonNegativeDuration = DefaultProcessingTimeouts.activeInitRetryDelay,
sequencerInfo: NonNegativeDuration = DefaultProcessingTimeouts.sequencerInfo,
topologyChangeWarnDelay: NonNegativeDuration = DefaultProcessingTimeouts.topologyChangeWarnDelay,
)
/** Reasonable default timeouts */
@ -78,6 +82,8 @@ object DefaultProcessingTimeouts {
val sequencerInfo: NonNegativeDuration = NonNegativeDuration.tryFromDuration(30.seconds)
val topologyChangeWarnDelay: NonNegativeDuration = NonNegativeDuration.tryFromDuration(2.minutes)
@VisibleForTesting
lazy val testing: ProcessingTimeout = ProcessingTimeout()

View File

@ -5,7 +5,7 @@ package com.digitalasset.canton.protocol.messages
import cats.Functor
import com.digitalasset.canton.ProtoDeserializationError.ValueDeserializationError
import com.digitalasset.canton.data.ViewType
import com.digitalasset.canton.data.{CantonTimestamp, ViewType}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.messages.ProtocolMessage.ProtocolMessageContentCast
import com.digitalasset.canton.protocol.messages.RootHashMessage.RootHashMessagePayloadCast
@ -32,6 +32,7 @@ final case class RootHashMessage[+Payload <: RootHashMessagePayload](
rootHash: RootHash,
override val domainId: DomainId,
viewType: ViewType,
submissionTopologyTimestamp: CantonTimestamp,
payload: Payload,
)(override val representativeProtocolVersion: RepresentativeProtocolVersion[RootHashMessage.type])
extends UnsignedProtocolMessage
@ -44,6 +45,7 @@ final case class RootHashMessage[+Payload <: RootHashMessagePayload](
rootHash = rootHash.toProtoPrimitive,
domainId = domainId.toProtoPrimitive,
viewType = viewType.toProtoEnum,
submissionTopologyTime = submissionTopologyTimestamp.toProtoPrimitive,
payload = payload.getCryptographicEvidence,
)
@ -73,11 +75,13 @@ final case class RootHashMessage[+Payload <: RootHashMessagePayload](
rootHash: RootHash = rootHash,
payload: Payload2 = payload,
viewType: ViewType = viewType,
submissionTopologyTime: CantonTimestamp = submissionTopologyTimestamp,
): RootHashMessage[Payload2] =
RootHashMessage(
rootHash,
domainId,
viewType,
submissionTopologyTime,
payload,
)(representativeProtocolVersion)
@ -101,11 +105,13 @@ object RootHashMessage
domainId: DomainId,
protocolVersion: ProtocolVersion,
viewType: ViewType,
submissionTopologyTime: CantonTimestamp,
payload: Payload,
): RootHashMessage[Payload] = RootHashMessage(
rootHash,
domainId,
viewType,
submissionTopologyTime,
payload,
)(protocolVersionRepresentativeFor(protocolVersion))
@ -114,17 +120,20 @@ object RootHashMessage
)(
rootHashMessageP: v30.RootHashMessage
): ParsingResult[RootHashMessage[Payload]] = {
val v30.RootHashMessage(rootHashP, domainIdP, viewTypeP, payloadP) = rootHashMessageP
val v30.RootHashMessage(rootHashP, domainIdP, viewTypeP, submissionTopologyTimeP, payloadP) =
rootHashMessageP
for {
rootHash <- RootHash.fromProtoPrimitive(rootHashP)
domainId <- DomainId.fromProtoPrimitive(domainIdP, "domain_id")
viewType <- ViewType.fromProtoEnum(viewTypeP)
submissionTopologyTime <- CantonTimestamp.fromProtoPrimitive(submissionTopologyTimeP)
payloadO <- payloadDeserializer(payloadP)
rpv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield RootHashMessage(
rootHash,
domainId,
viewType,
submissionTopologyTime,
payloadO,
)(rpv)
}

View File

@ -45,7 +45,7 @@ object EitherTUtil {
case _ => ()
}
/** Lifts an `if (cond) then ... else ()` into the `EitherT` a pplicative */
/** Lifts an `if (cond) then ... else ()` into the `EitherT` applicative */
def ifThenET[F[_], L](cond: Boolean)(`then`: => EitherT[F, L, _])(implicit
F: Applicative[F]
): EitherT[F, L, Unit] =

View File

@ -4,9 +4,10 @@
package com.digitalasset.canton.protocol.messages
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.data.ViewType
import com.digitalasset.canton.data.{CantonTimestamp, ViewType}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.logging.{HasLoggerName, NamedLoggingContext}
import com.digitalasset.canton.protocol.RootHash
import com.digitalasset.canton.sequencing.protocol.{
Batch,
MediatorGroupRecipient,
@ -30,10 +31,15 @@ final case class TransactionConfirmationRequest(
def mediator: MediatorGroupRecipient = informeeMessage.mediator
lazy val rootHashMessage: RootHashMessage[EmptyRootHashMessagePayload.type] = RootHashMessage(
rootHash = informeeMessage.fullInformeeTree.transactionId.toRootHash,
lazy val rootHash: RootHash = informeeMessage.fullInformeeTree.transactionId.toRootHash
private def rootHashMessage(
submissionTopologyTime: CantonTimestamp
): RootHashMessage[EmptyRootHashMessagePayload.type] = RootHashMessage(
rootHash = rootHash,
domainId = informeeMessage.domainId,
viewType = ViewType.TransactionViewType,
submissionTopologyTime = submissionTopologyTime,
payload = EmptyRootHashMessagePayload,
protocolVersion = protocolVersion,
)
@ -72,7 +78,11 @@ final case class TransactionConfirmationRequest(
Recipients.recipientGroups(
recipientsNE.map(NonEmpty.mk(Set, _, mediator))
)
List(OpenEnvelope(rootHashMessage, rootHashMessageRecipients)(protocolVersion))
List(
OpenEnvelope(rootHashMessage(ipsSnapshot.timestamp), rootHashMessageRecipients)(
protocolVersion
)
)
case None =>
loggingContext.warn("Confirmation request without root hash message recipients")
List.empty

View File

@ -0,0 +1,59 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.topology
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.crypto.DomainSyncCryptoClient
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.tracing.TraceContext
import scala.concurrent.ExecutionContext
object SubmissionTopologyHelper {
/** Retrieve the topology snapshot used during submission by the submitter of a confirmation request.
* This can be used to determine the impact of a topology change between submission and sequencing.
* An example usage is during validation of a request: if some validation fails due to such a change,
* the severity of the logs can sometimes be lowered from warning to info.
*
* Return `None` if the timestamp of the topology snapshot used at submission is too far in the past
* compared to sequencing time (as determined by
* [[com.digitalasset.canton.config.ProcessingTimeout.topologyChangeWarnDelay]]).
*
* @param sequencingTimestamp the timestamp at which the request was sequenced
* @param submissionTopologyTimestamp the timestamp of the topology used at submission
*/
def getSubmissionTopologySnapshot(
timeouts: ProcessingTimeout,
sequencingTimestamp: CantonTimestamp,
submissionTopologyTimestamp: CantonTimestamp,
crypto: DomainSyncCryptoClient,
logger: TracedLogger,
)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): FutureUnlessShutdown[Option[TopologySnapshot]] = {
val maxDelay = timeouts.topologyChangeWarnDelay.toInternal
val minTs = sequencingTimestamp - maxDelay
if (submissionTopologyTimestamp >= minTs)
crypto
.awaitSnapshotUSSupervised(s"await crypto snapshot $submissionTopologyTimestamp")(
submissionTopologyTimestamp
)
.map(syncCryptoApi => Some(syncCryptoApi.ipsSnapshot))
else {
logger.info(
s"""Declared submission topology timestamp $submissionTopologyTimestamp is too far in the past (minimum
|accepted: $minTs). Ignoring.
|Note: this delay can be adjusted with the `topology-change-warn-delay` configuration parameter.""".stripMargin
.replaceAll("\n", " ")
)
FutureUnlessShutdown.pure(None)
}
}
}

View File

@ -12,6 +12,7 @@ import com.digitalasset.canton.crypto.{
SymmetricKeyScheme,
}
import com.digitalasset.canton.data.{
CantonTimestamp,
CantonTimestampSecond,
FullInformeeTree,
GeneratorsData,
@ -231,12 +232,14 @@ final class GeneratorsMessages(
rootHash <- Arbitrary.arbitrary[RootHash]
domainId <- Arbitrary.arbitrary[DomainId]
viewType <- viewTypeArb.arbitrary
submissionTopologyTime <- Arbitrary.arbitrary[CantonTimestamp]
payload <- Arbitrary.arbitrary[RootHashMessagePayload]
} yield RootHashMessage.apply(
rootHash,
domainId,
protocolVersion,
viewType,
submissionTopologyTime,
payload,
)
)

View File

@ -9,6 +9,7 @@ import cats.syntax.alternative.*
import cats.syntax.foldable.*
import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.*
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
@ -419,7 +420,7 @@ private[mediator] class ConfirmationResponseProcessor(
requestId: RequestId,
request: MediatorConfirmationRequest,
rootHashMessages: Seq[OpenEnvelope[RootHashMessage[SerializedRootHashMessagePayload]]],
topologySnapshot: TopologySnapshot,
sequencingTopologySnapshot: TopologySnapshot,
)(implicit
loggingContext: ErrorLoggingContext
): EitherT[Future, MediatorVerdict.MediatorReject, Unit] = {
@ -459,62 +460,205 @@ private[mediator] class ConfirmationResponseProcessor(
def wrongViewType(expectedViewType: ViewType): Seq[ViewType] =
rootHashMessages.map(_.protocolMessage.viewType).filterNot(_ == expectedViewType).distinct
def checkWrongMembers(
wrongMembers: RootHashMessageRecipients.WrongMembers
)(implicit traceContext: TraceContext): EitherT[Future, RejectionReason, Unit] =
NonEmpty.from(wrongMemberErrors(wrongMembers)) match {
// The check using the sequencing topology snapshot reported no error
case None => EitherT.pure[Future, RejectionReason](())
case Some(errorsNE) =>
val mayBeDueToTopologyChange = errorsNE.forall(_.mayBeDueToTopologyChange)
val dueToTopologyChangeF = if (mayBeDueToTopologyChange) {
// The check reported only errors that may be due to a topology change.
val dueToTopologyChangeOF = for {
submissionTopologySnapshot <- OptionT(getSubmissionTopologySnapshot)
// Perform the check using the topology at submission time.
wrongMembersAtSubmission <- OptionT.liftF(
RootHashMessageRecipients.wrongMembers(
rootHashMessagesRecipients,
request,
submissionTopologySnapshot,
)(ec, loggingContext)
)
// If there are no errors using the topology at submission time, consider that the errors
// are due to a topology change.
} yield wrongMemberErrors(wrongMembersAtSubmission).isEmpty
dueToTopologyChangeOF.getOrElse {
// We could not obtain a submission topology snapshot.
// Consider that the errors are NOT due to a topology change.
false
}
} else {
// At least some of the reported errors are not due to a topology change
Future.successful(false)
}
// Use the first error for the rejection
val firstError = errorsNE.head1
EitherT.left[Unit](dueToTopologyChangeF.map(firstError.toRejectionReason))
}
def wrongMemberErrors(
wrongMembers: RootHashMessageRecipients.WrongMembers
): Seq[WrongMemberError] = {
val missingInformeeParticipantsO = Option.when(
wrongMembers.missingInformeeParticipants.nonEmpty
)(
WrongMemberError(
show"Missing root hash message for informee participants: ${wrongMembers.missingInformeeParticipants}",
// This may be due to a topology change, e.g. if a party-to-participant mapping is added for an informee
mayBeDueToTopologyChange = true,
)
)
val superfluousMembersO = Option.when(wrongMembers.superfluousMembers.nonEmpty)(
WrongMemberError(
show"Superfluous root hash message for members: ${wrongMembers.superfluousMembers}",
// This may be due to a topology change, e.g. if a party-to-participant mapping is removed for an informee
mayBeDueToTopologyChange = true,
)
)
val superfluousInformeesO = Option.when(wrongMembers.superfluousInformees.nonEmpty)(
WrongMemberError(
show"Superfluous root hash message for group addressed parties: ${wrongMembers.superfluousInformees}",
mayBeDueToTopologyChange = false,
)
)
missingInformeeParticipantsO.toList ++ superfluousMembersO ++ superfluousInformeesO
}
// Retrieve the topology snapshot at submission time. Return `None` in case of error.
def getSubmissionTopologySnapshot(implicit
traceContext: TraceContext
): Future[Option[TopologySnapshot]] = {
val submissionTopologyTimestamps = rootHashMessages
.map(_.protocolMessage.submissionTopologyTimestamp)
.distinct
submissionTopologyTimestamps match {
case Seq(submissionTopologyTimestamp) =>
val sequencingTimestamp = requestId.unwrap
SubmissionTopologyHelper
.getSubmissionTopologySnapshot(
timeouts,
sequencingTimestamp,
submissionTopologyTimestamp,
crypto,
logger,
)
.unwrap
.map(
_.onShutdown {
// TODO(i19352): Propagate `FutureUnlessShutdown` in the request validation
logger.debug(
"Returning `None` for the submission topology snapshot due to shutting down"
)
None
}
)
case Seq() =>
// This can only happen if there are no root hash messages.
// This will be detected during the wrong members check and logged as a warning, so we can log at info level.
logger.info(
s"No declared submission topology timestamp found. Inconsistencies will be logged as warnings."
)
Future.successful(None)
case _ =>
// Log at warning level because this is not detected by another check
logger.warn(
s"Found ${submissionTopologyTimestamps.size} different declared submission topology timestamps. Inconsistencies will be logged as warnings."
)
Future.successful(None)
}
}
val unitOrRejectionReason = for {
_ <- EitherTUtil
.condUnitET[Future](
wrongRecipients.isEmpty,
show"Root hash messages with wrong recipients tree: $wrongRecipients",
RejectionReason(
show"Root hash messages with wrong recipients tree: $wrongRecipients",
dueToTopologyChange = false,
),
)
repeated = repeatedMembers(rootHashMessagesRecipients)
_ <- EitherTUtil.condUnitET[Future](
repeated.isEmpty,
show"Several root hash messages for recipients: $repeated",
RejectionReason(
show"Several root hash messages for recipients: $repeated",
dueToTopologyChange = false,
),
)
_ <- EitherTUtil.condUnitET[Future](
distinctPayloads.sizeCompare(1) <= 0,
show"Different payloads in root hash messages. Sizes: ${distinctPayloads.map(_.bytes.size).mkShow()}.",
RejectionReason(
show"Different payloads in root hash messages. Sizes: ${distinctPayloads.map(_.bytes.size).mkShow()}.",
dueToTopologyChange = false,
),
)
wrongHashes = wrongRootHashes(request.rootHash)
wrongViewTypes = wrongViewType(request.viewType)
wrongMembersF = RootHashMessageRecipients.wrongMembers(
rootHashMessagesRecipients,
request,
topologySnapshot,
sequencingTopologySnapshot,
)(ec, loggingContext)
_ <- for {
_ <- EitherTUtil
.condUnitET[Future](wrongHashes.isEmpty, show"Wrong root hashes: $wrongHashes")
wrongMems <- EitherT.liftF(wrongMembersF)
.condUnitET[Future](
wrongHashes.isEmpty,
RejectionReason(show"Wrong root hashes: $wrongHashes", dueToTopologyChange = false),
)
wrongMembers <- EitherT.liftF(wrongMembersF)
_ <- EitherTUtil.condUnitET[Future](
wrongViewTypes.isEmpty,
show"View types in root hash messages differ from expected view type ${request.viewType}: $wrongViewTypes",
)
_ <-
EitherTUtil.condUnitET[Future](
wrongMems.missingInformeeParticipants.isEmpty,
show"Missing root hash message for informee participants: ${wrongMems.missingInformeeParticipants}",
)
_ <- EitherTUtil.condUnitET[Future](
wrongMems.superfluousMembers.isEmpty,
show"Superfluous root hash message for members: ${wrongMems.superfluousMembers}",
)
_ <- EitherTUtil.condUnitET[Future](
wrongMems.superfluousInformees.isEmpty,
show"Superfluous root hash message for group addressed parties: ${wrongMems.superfluousInformees}",
RejectionReason(
show"View types in root hash messages differ from expected view type ${request.viewType}: $wrongViewTypes",
dueToTopologyChange = false,
),
)
_ <- checkWrongMembers(wrongMembers)(loggingContext)
} yield ()
} yield ()
unitOrRejectionReason.leftMap { rejectionReason =>
val rejection = MediatorError.MalformedMessage
.Reject(
s"Received a mediator confirmation request with id $requestId with invalid root hash messages. Rejecting... Reason: $rejectionReason"
)
.reported()(loggingContext)
unitOrRejectionReason.leftMap { case RejectionReason(reason, dueToTopologyChange) =>
val message =
s"Received a mediator confirmation request with id $requestId with invalid root hash messages. Rejecting... Reason: $reason"
val rejection = MediatorError.MalformedMessage.Reject(message)
// If the errors are due to a topology change, we consider the request as non-malicious.
// Otherwise, we consider it malicious.
if (dueToTopologyChange) logErrorDueToTopologyChange(message)(loggingContext)
else rejection.reported()(loggingContext)
MediatorVerdict.MediatorReject(rejection)
}
}
private case class WrongMemberError(reason: String, mayBeDueToTopologyChange: Boolean) {
def toRejectionReason(dueToTopologyChange: Boolean): RejectionReason =
RejectionReason(reason, dueToTopologyChange)
}
private case class RejectionReason(reason: String, dueToTopologyChange: Boolean)
private def logErrorDueToTopologyChange(
error: String
)(implicit traceContext: TraceContext): Unit = logger.info(
error +
""" This error is due to a change of topology state between the declared topology timestamp used
| for submission and the sequencing time of the request.""".stripMargin
)
private def validateMinimumThreshold(
requestId: RequestId,
request: MediatorConfirmationRequest,

View File

@ -59,6 +59,8 @@ class ConfirmationResponseProcessorTest
protected val passiveMediator3 = MediatorId(UniqueIdentifier.tryCreate("mediator", "three"))
protected val activeMediator4 = MediatorId(UniqueIdentifier.tryCreate("mediator", "four"))
protected val testTopologyTimestamp = CantonTimestamp.Epoch
private def mediatorGroup0(mediators: NonEmpty[Seq[MediatorId]]) =
MediatorGroup(MediatorGroupIndex.zero, mediators, Seq.empty, PositiveInt.one)
@ -306,6 +308,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
TransactionViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
),
Recipients.cc(MemberRecipient(participant), mediatorGroupRecipient),
@ -492,6 +495,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
correctViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
@ -556,6 +560,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
correctViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val wrongRootHashMessage = correctRootHashMessage.copy(rootHash = wrongRootHash)
@ -747,6 +752,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
mediatorRequest.viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
@ -794,6 +800,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
ViewType.TransactionViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val mockTopologySnapshot = mock[TopologySnapshot]
@ -1064,6 +1071,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
ViewType.TransactionViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
@ -1196,6 +1204,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
ViewType.TransactionViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
@ -1252,6 +1261,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
mediatorRequest.viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
@ -1347,6 +1357,7 @@ class ConfirmationResponseProcessorTest
domainId,
testedProtocolVersion,
mediatorRequest.viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)

View File

@ -167,6 +167,7 @@ class DefaultVerdictSenderTest
val domainId: DomainId = DomainId(
UniqueIdentifier.tryFromProtoPrimitive("domain::test")
)
val testTopologyTimestamp = CantonTimestamp.Epoch
val factory =
new ExampleTransactionFactory()(domainId = domainId, mediatorGroup = transactionMediatorGroup)
@ -179,6 +180,7 @@ class DefaultVerdictSenderTest
domainId,
testedProtocolVersion,
ViewType.TransactionViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val participant: ParticipantId = ExampleTransactionFactory.submittingParticipant

View File

@ -35,7 +35,7 @@ import com.digitalasset.canton.platform.store.packagemeta.PackageMetadata
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.ThereafterOps
import com.digitalasset.canton.util.{EitherTUtil, PathUtils, SimpleExecutionQueue}
import com.digitalasset.canton.util.{PathUtils, SimpleExecutionQueue}
import com.digitalasset.canton.{LedgerSubmissionId, LfPackageId}
import com.google.protobuf.ByteString
@ -219,11 +219,15 @@ class PackageUploader(
PackageServiceErrors.Validation.handleLfEnginePackageError(_): DamlError
)
)
_ <- EitherTUtil.ifThenET(enableUpgradeValidation)(
packageUpgradeValidator
.validateUpgrade(mainPackage)(LoggingContextWithTrace(loggerFactory))
.mapK(FutureUnlessShutdown.outcomeK)
)
_ <-
if (enableUpgradeValidation) {
packageUpgradeValidator
.validateUpgrade(mainPackage)(LoggingContextWithTrace(loggerFactory))
.mapK(FutureUnlessShutdown.outcomeK)
} else {
logger.info(s"Skipping upgrade validation for package ${mainPackage._1}.")
EitherT.pure[FutureUnlessShutdown, DamlError](())
}
} yield ()
private def readDarFromPayload(darPayload: ByteString, darNameO: Option[String])(implicit

View File

@ -51,7 +51,7 @@ import com.digitalasset.canton.sequencing.client.*
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.sequencing.{AsyncResult, HandlerResult}
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.{DomainId, ParticipantId}
import com.digitalasset.canton.topology.{DomainId, ParticipantId, SubmissionTopologyHelper}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.EitherTUtil.{condUnitET, ifThenET}
import com.digitalasset.canton.util.EitherUtil.RichEither
@ -723,12 +723,24 @@ abstract class ProtocolProcessor[
viewsWithCorrectRootHash.map { case (view, _) => view.unwrap }
)
submissionTopologyTimestamp = rootHashMessage.submissionTopologyTimestamp
submissionTopologySnapshotO <- EitherT.right(
SubmissionTopologyHelper.getSubmissionTopologySnapshot(
timeouts,
ts,
submissionTopologyTimestamp,
crypto,
logger,
)
)
checkRecipientsResult <- EitherT.right(
FutureUnlessShutdown.outcomeF(
recipientsValidator.retainInputsWithValidRecipients(
requestId,
viewsWithCorrectRootHash,
snapshot.ipsSnapshot,
submissionTopologySnapshotO,
)
)
)
@ -1931,13 +1943,28 @@ object ProtocolProcessor {
)
}
final case class WrongRecipients(viewTree: ViewTree) extends MalformedPayload {
sealed trait WrongRecipientsBase extends MalformedPayload
final case class WrongRecipients(viewTree: ViewTree) extends WrongRecipientsBase {
override def pretty: Pretty[WrongRecipients] =
prettyOfClass(
param("viewHash", _.viewTree.viewHash),
param("viewPosition", _.viewTree.viewPosition),
)
def dueToTopologyChange: WrongRecipientsDueToTopologyChange =
WrongRecipientsDueToTopologyChange(viewTree)
}
final case class WrongRecipientsDueToTopologyChange(viewTree: ViewTree)
extends WrongRecipientsBase {
override def pretty: Pretty[WrongRecipientsDueToTopologyChange] =
prettyOfClass(
param("viewHash", _.viewTree.viewHash),
param("viewPosition", _.viewTree.viewPosition),
)
}
final case class IncompleteLightViewTree(

View File

@ -386,11 +386,9 @@ class TransactionProcessingSteps(
val batchSize = batch.toProtoVersioned.serializedSize
metrics.protocolMessages.confirmationRequestSize.update(batchSize)(MetricsContext.Empty)
val rootHash = request.rootHashMessage.rootHash
new PreparedTransactionBatch(
batch,
rootHash,
request.rootHash,
submitterInfoWithDedupPeriod.toCompletionInfo(),
): PreparedBatch
}
@ -595,7 +593,7 @@ class TransactionProcessingSteps(
case ParticipantsOfParty(party) => party
}
if (parties.nonEmpty) {
crypto.ips.currentSnapshotApproximation
snapshot.ipsSnapshot
.activeParticipantsOfParties(
parties.toSeq.map(_.toLf)
)

View File

@ -216,6 +216,7 @@ private[transfer] class TransferInProcessingSteps(
domainId.unwrap,
targetProtocolVersion.v,
ViewType.TransferInViewType,
recentSnapshot.ipsSnapshot.timestamp,
EmptyRootHashMessagePayload,
)
// Each member gets a message sent to itself and to the mediator

View File

@ -215,6 +215,7 @@ class TransferOutProcessingSteps(
domainId.unwrap,
sourceDomainProtocolVersion.v,
ViewType.TransferOutViewType,
sourceRecentSnapshot.ipsSnapshot.timestamp,
EmptyRootHashMessagePayload,
)
val rootHashRecipients =

View File

@ -3,13 +3,17 @@
package com.digitalasset.canton.participant.protocol.validation
import cats.data.OptionT
import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.data.ViewPosition.MerklePathElement
import com.digitalasset.canton.data.{ViewPosition, ViewTree}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.WrongRecipients
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.{
WrongRecipients,
WrongRecipientsBase,
}
import com.digitalasset.canton.participant.sync.SyncServiceError.SyncServiceAlarm
import com.digitalasset.canton.protocol.RequestId
import com.digitalasset.canton.sequencing.protocol.{
@ -24,6 +28,7 @@ import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.{ErrorUtil, IterableUtil}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
class RecipientsValidator[I](
@ -58,16 +63,108 @@ class RecipientsValidator[I](
* - Every informee participant of v has received every descendant of v.
* - Every informee participant of v can evaluate the above conditions 1-4 for v and will conclude that v should be kept.
*
* @return inputs with valid recipients
* As for all other validations, the recipients validation is performed using the topology effective at the sequencing time
* of the request. The submitter, however, creates the request using the latest topology available locally. Therefore, it is
* possible that a change in topology between submission and sequencing time results in errors detected by the
* recipients validator: for example, a party can be newly or no longer hosted on a participant, thereby affecting
* the recipients.
* These situations are quite common on a busy network with concurrent changes, and logging them at a high severity
* is overkill and possibly confusing. Therefore, if the errors detected can possibly be attributed to a change in the
* topology, we perform the check again using the topology at submission time. If no error occurs then, the original
* errors are logged at a lower severity.
* Note that the *outcome* of the validation is itself not affected: in other words, the request will still be rejected.
* The only change is the severity of the logs.
*
* @return errors for the incorrect recipients and inputs with valid recipients
* @throws java.lang.IllegalArgumentException if the views corresponding to inputs have different root hashes
*/
def retainInputsWithValidRecipients(
requestId: RequestId,
inputs: Seq[I],
sequencingSnapshot: PartyTopologySnapshotClient,
submissionSnapshotO: Option[PartyTopologySnapshotClient],
)(implicit
traceContext: TraceContext
): Future[(Seq[WrongRecipientsBase], Seq[I])] = {
def handleAsMalicious(
errors: RecipientsValidatorErrors,
wrongRecipients: Seq[WrongRecipients],
): Seq[WrongRecipientsBase] = {
errors.alarm()
wrongRecipients
}
def handleAsNonMalicious(
errors: RecipientsValidatorErrors,
wrongRecipients: Seq[WrongRecipients],
): Seq[WrongRecipientsBase] = {
errors.logDueToTopologyChange()
// Tag the malformed payloads as due to a topology change
wrongRecipients.map(_.dueToTopologyChange)
}
for {
resultsWithSequencingSnapshot <- retainInputsWithValidRecipientsInternal(
requestId,
inputs,
sequencingSnapshot,
)
(wrongRecipients, goodInputs, errors) = resultsWithSequencingSnapshot
actualWrongRecupients <- {
if (errors.isEmpty) {
// The recipients check reported no error.
Future.successful(wrongRecipients)
} else if (errors.mayBeDueToTopologyChange) {
// The recipients check reported only errors that may be due to a topology change.
(for {
submissionSnapshot <- OptionT.fromOption[Future](submissionSnapshotO)
// Perform the recipients check using the topology at submission time.
resultWithSubmissionSnapshot <- OptionT.liftF(
retainInputsWithValidRecipientsInternal(
requestId,
inputs,
submissionSnapshot,
)
)
(_, _, errorsWithSubmissionSnapshot) = resultWithSubmissionSnapshot
} yield
if (errorsWithSubmissionSnapshot.isEmpty) {
// The recipients are correct when checked with the topology at submission time.
// Consider the request as non-malicious.
handleAsNonMalicious(errors, wrongRecipients)
} else {
// We still have errors. Consider the request as malicious.
handleAsMalicious(errors, wrongRecipients)
}).getOrElse {
// The submission snapshot is too old. Consider the request as malicious.
handleAsMalicious(errors, wrongRecipients)
}
} else {
// At least some of the reported errors are not due to a topology change.
// Consider the request as malicious.
Future.successful(handleAsMalicious(errors, wrongRecipients))
}
}
} yield {
(actualWrongRecupients, goodInputs)
}
}
def retainInputsWithValidRecipientsInternal(
requestId: RequestId,
inputs: Seq[I],
snapshot: PartyTopologySnapshotClient,
)(implicit
traceContext: TraceContext
): Future[(Seq[WrongRecipients], Seq[I])] = {
): Future[(Seq[WrongRecipients], Seq[I], RecipientsValidatorErrors)] = {
// Used to accumulate all the errors to report later.
// Each error also has an associated flag indicating whether it may be due to a topology change.
val errorBuilder = Seq.newBuilder[Error]
val rootHashes = inputs.map(viewOfInput(_).rootHash).distinct
ErrorUtil.requireArgument(
@ -96,13 +193,23 @@ class RecipientsValidator[I](
// This checks Condition 2 and 3.
val invalidRecipientPositions =
inputs.mapFilter(input =>
checkRecipientsTree(context, viewOfInput(input).viewPosition, recipientsOfInput(input))
checkRecipientsTree(
context,
viewOfInput(input).viewPosition,
recipientsOfInput(input),
errorBuilder,
)
)
val badViewPositions = invalidRecipientPositions ++ inactivePartyPositions
val badViewPositions = (invalidRecipientPositions ++ inactivePartyPositions).map {
case BadViewPosition(badViewPosition, error, mayBeDueToTopologyChange) =>
// Collect all the errors from the bad positions
errorBuilder += Error(error, mayBeDueToTopologyChange)
badViewPosition
}
// Check Condition 4, i.e., remove inputs that have a bad view position as descendant.
inputs.partitionMap { input =>
val (wrongRecipients, goodInputs) = inputs.partitionMap { input =>
val viewTree = viewOfInput(input)
val isGood = badViewPositions.forall(badViewPosition =>
@ -115,6 +222,30 @@ class RecipientsValidator[I](
WrongRecipients(viewTree),
)
}
val errorsToReport = new RecipientsValidatorErrors(errorBuilder.result())
(wrongRecipients, goodInputs, errorsToReport)
}
}
class RecipientsValidatorErrors(private val errors: Seq[Error]) {
lazy val isEmpty: Boolean = errors.isEmpty
lazy val mayBeDueToTopologyChange: Boolean = errors.forall(_.mayBeDueToTopologyChange)
def alarm()(implicit traceContext: TraceContext): Unit = errors.foreach {
case Error(error, _) =>
SyncServiceAlarm.Warn(error).report()
}
def logDueToTopologyChange()(implicit traceContext: TraceContext): Unit = errors.foreach {
case Error(error, _) =>
logger.info(
error +
""" This error is due to a change of topology state between the declared topology timestamp used
| for submission and the sequencing time of the request.""".stripMargin
)
}
}
@ -142,9 +273,7 @@ class RecipientsValidator[I](
/** Yields the positions of those views that have an informee without an active participant.
*/
private def computeInactivePartyPositions(
context: Context
)(implicit traceContext: TraceContext): Seq[ViewPosition] = {
private def computeInactivePartyPositions(context: Context): Seq[BadViewPosition] = {
val Context(requestId, _, informeeParticipantsOfPositionAndParty) = context
informeeParticipantsOfPositionAndParty.toSeq.mapFilter {
@ -157,15 +286,14 @@ class RecipientsValidator[I](
}.toSet
Option.when(inactiveParties.nonEmpty) {
SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $viewPosition has " +
s"informees without an active participant: $inactiveParties. " +
s"Discarding $viewPosition..."
)
.report()
val error =
s"Received a request with id $requestId where the view at $viewPosition has " +
s"informees without an active participant: $inactiveParties. " +
s"Discarding $viewPosition..."
viewPosition
// This may be due to a topology change, e.g. if all party-to-participant mappings for an informee
// are removed, or if a participant is disabled
BadViewPosition(viewPosition, error, mayBeDueToTopologyChange = true)
}
}
}
@ -186,32 +314,31 @@ class RecipientsValidator[I](
context: Context,
mainViewPosition: ViewPosition,
recipients: Recipients,
errorBuilder: mutable.Builder[Error, Seq[Error]],
)(implicit
traceContext: TraceContext
): Option[ViewPosition] = {
): Option[BadViewPosition] = {
val allRecipientPathsViewToRoot = recipients.allPaths.map(_.reverse)
if (allRecipientPathsViewToRoot.sizeCompare(1) > 0) {
SyncServiceAlarm
.Warn(
s"Received a request with id ${context.requestId} where the view at $mainViewPosition has a non-linear recipients tree. " +
s"Processing all paths of the tree.\n$recipients"
)
.report()
errorBuilder += Error(
s"Received a request with id ${context.requestId} where the view at $mainViewPosition has a non-linear recipients tree. " +
s"Processing all paths of the tree.\n$recipients",
mayBeDueToTopologyChange = false,
)
}
val badViewPositions = allRecipientPathsViewToRoot
.map(checkRecipientsPath(context, recipients, mainViewPosition.position, _))
val badViewPositions = allRecipientPathsViewToRoot.map(
checkRecipientsPath(context, recipients, mainViewPosition.position, _, errorBuilder)
)
val res = badViewPositions.minBy1 {
case Some((viewPosition, _)) => viewPosition.position.size
case Some(BadViewPosition(viewPosition, _, _)) => viewPosition.position.size
case None => 0
}
res.map { case (viewPosition, alarm) =>
alarm.report()
viewPosition
}
res
}
/** Yields the closest (i.e. bottom-most) ancestor of a view (if any)
@ -230,7 +357,8 @@ class RecipientsValidator[I](
mainRecipients: Recipients,
mainViewPosition: List[MerklePathElement],
recipientsPathViewToRoot: Seq[Set[Recipient]],
)(implicit traceContext: TraceContext): Option[(ViewPosition, SyncServiceAlarm.Warn)] = {
errorBuilder: mutable.Builder[Error, Seq[Error]],
)(implicit traceContext: TraceContext): Option[BadViewPosition] = {
val Context(requestId, informeesWithGroupAddressing, informeeParticipantsOfPositionAndParty) =
context
@ -246,11 +374,11 @@ class RecipientsValidator[I](
case (Some(_recipientGroup), None) =>
// recipientsPathViewToRoot is too long. This is not a problem for transparency, but it can be a problem for privacy.
SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $mainViewPosition has too many levels of recipients. Continue processing...\n$mainRecipients"
)
.report()
errorBuilder += Error(
s"Received a request with id $requestId where the view at $mainViewPosition has too many levels of recipients. Continue processing...\n$mainRecipients",
mayBeDueToTopologyChange = false,
)
None
case (None, Some(viewPosition)) =>
@ -258,14 +386,14 @@ class RecipientsValidator[I](
// If we receive a view, we also need to receive corresponding recipient groups for the view and all descendants.
// This is not the case here, so we need to discard the view at viewPosition.
val alarm = SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"no recipients group for $viewPosition. " +
s"Discarding $viewPosition with all ancestors..."
)
val error =
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"no recipients group for $viewPosition. " +
s"Discarding $viewPosition with all ancestors..."
Some(ViewPosition(viewPosition) -> alarm)
Some(
BadViewPosition(ViewPosition(viewPosition), error, mayBeDueToTopologyChange = false)
)
} else {
// We have not received the view at viewPosition. So there is no point in discarding it.
@ -279,14 +407,11 @@ class RecipientsValidator[I](
// an ancestor thereof contains this participant as recipient. (Property guaranteed by the sequencer.)
// Hence, we should have received the current view.
// Alarm, as this is not the case.
val error =
s"Received a request with id $requestId where the view at $viewPosition is missing. " +
s"Discarding all ancestors of $viewPosition..."
val alarm = SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $viewPosition is missing. " +
s"Discarding all ancestors of $viewPosition..."
)
Some(ViewPosition(viewPosition) -> alarm)
Some(BadViewPosition(ViewPosition(viewPosition), error, mayBeDueToTopologyChange = false))
case (Some(recipientGroup), Some(viewPosition)) =>
// We have received a view and there is a recipient group.
@ -304,24 +429,29 @@ class RecipientsValidator[I](
}.toSet
val extraRecipients = recipientGroup -- informeeRecipients
if (extraRecipients.nonEmpty)
SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"extra recipients $extraRecipients for the view at $viewPosition. " +
s"Continue processing..."
)
.report()
if (extraRecipients.nonEmpty) {
errorBuilder += Error(
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"extra recipients $extraRecipients for the view at $viewPosition. " +
s"Continue processing...",
// This may be due to a topology change, e.g. if a party-to-participant mapping is removed for an informee
mayBeDueToTopologyChange = true,
)
}
val missingRecipients = informeeRecipients -- recipientGroup
lazy val missingRecipientsAlarm = SyncServiceAlarm
.Warn(
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"missing recipients $missingRecipients for the view at $viewPosition. " +
s"Discarding $viewPosition with all ancestors..."
)
lazy val error =
s"Received a request with id $requestId where the view at $mainViewPosition has " +
s"missing recipients $missingRecipients for the view at $viewPosition. " +
s"Discarding $viewPosition with all ancestors..."
Option.when(missingRecipients.nonEmpty)(
ViewPosition(viewPosition) -> missingRecipientsAlarm
BadViewPosition(
ViewPosition(viewPosition),
error,
// This may be due to a topology change, e.g. if a party-to-participant mapping is added for an informee
mayBeDueToTopologyChange = true,
)
)
}
.collectFirst { case Some(result) => result }
@ -338,4 +468,16 @@ object RecipientsValidator {
Map[LfPartyId, Set[ParticipantId]],
],
)
private final case class BadViewPosition(
viewPosition: ViewPosition,
error: String,
mayBeDueToTopologyChange: Boolean,
)
private final case class Error(
error: String,
mayBeDueToTopologyChange: Boolean,
)
}

View File

@ -4,9 +4,13 @@
package com.digitalasset.canton.participant.protocol.validation
import cats.syntax.parallel.*
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.error.TransactionError
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.MalformedPayload
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.{
MalformedPayload,
WrongRecipientsDueToTopologyChange,
}
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.protocol.messages.*
import com.digitalasset.canton.topology.client.TopologySnapshot
@ -290,7 +294,15 @@ class TransactionConfirmationResponseFactory(
requestId: RequestId,
rootHash: RootHash,
malformedPayloads: Seq[MalformedPayload],
)(implicit traceContext: TraceContext): ConfirmationResponse =
)(implicit traceContext: TraceContext): ConfirmationResponse = {
val rejectError = LocalRejectError.MalformedRejects.Payloads.Reject(malformedPayloads.toString)
val dueToTopologyChange = malformedPayloads.forall {
case WrongRecipientsDueToTopologyChange(_) => true
case _ => false
}
if (!dueToTopologyChange) logged(requestId, rejectError).discard
checked(
ConfirmationResponse
.tryCreate(
@ -300,15 +312,12 @@ class TransactionConfirmationResponseFactory(
// The mediator will interpret this as a rejection
// for all views and on behalf of all declared confirming parties hosted by the participant.
None,
logged(
requestId,
LocalRejectError.MalformedRejects.Payloads
.Reject(malformedPayloads.toString),
).toLocalReject(protocolVersion),
rejectError.toLocalReject(protocolVersion),
rootHash,
Set.empty,
domainId,
protocolVersion,
)
)
}
}

View File

@ -79,6 +79,7 @@ trait MessageDispatcherTest {
import MessageDispatcherTest.*
private val domainId = DomainId.tryFromString("messageDispatcher::domain")
private val testTopologyTimestamp = CantonTimestamp.Epoch
private val participantId =
ParticipantId.tryFromProtoPrimitive("PAR::messageDispatcher::participant")
private val otherParticipant = ParticipantId.tryFromProtoPrimitive("PAR::other::participant")
@ -729,6 +730,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
UnknownTestViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val event = mkDeliver(
@ -815,6 +817,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val event =
@ -842,6 +845,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
// Batch -> expected alarms -> expected reaction
@ -956,6 +960,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val fatalBatches = List(
@ -1012,6 +1017,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val badBatches = List(
@ -1089,6 +1095,7 @@ trait MessageDispatcherTest {
domainId,
testedProtocolVersion,
viewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
val event =

View File

@ -364,6 +364,7 @@ class ProtocolProcessorTest
}
private lazy val rootHash = RootHash(TestHash.digest(1))
private lazy val testTopologyTimestamp = CantonTimestamp.Epoch
private lazy val viewHash = ViewHash(TestHash.digest(2))
private lazy val encryptedView =
EncryptedView(TestViewType)(Encrypted.fromByteString(rootHash.toProtoPrimitive))
@ -382,6 +383,7 @@ class ProtocolProcessorTest
DefaultTestIdentities.domainId,
testedProtocolVersion,
TestViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
private lazy val someRecipients = Recipients.cc(participant)

View File

@ -84,6 +84,8 @@ final class TransferOutProcessingStepsTest
private implicit val ec: ExecutionContext = executorService
private val testTopologyTimestamp = CantonTimestamp.Epoch
private lazy val sourceDomain = SourceDomainId(
DomainId(UniqueIdentifier.tryFromProtoPrimitive("source::domain"))
)
@ -891,6 +893,7 @@ final class TransferOutProcessingStepsTest
sourceDomain.unwrap,
testedProtocolVersion,
TransferOutViewType,
testTopologyTimestamp,
SerializedRootHashMessagePayload.empty,
)
}

View File

@ -7,7 +7,10 @@ import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.crypto.TestHash
import com.digitalasset.canton.data.ViewPosition.MerkleSeqIndex
import com.digitalasset.canton.data.{CantonTimestamp, ViewPosition}
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.WrongRecipients
import com.digitalasset.canton.participant.protocol.ProtocolProcessor.{
WrongRecipients,
WrongRecipientsDueToTopologyChange,
}
import com.digitalasset.canton.participant.protocol.TestProcessingSteps.TestViewTree
import com.digitalasset.canton.participant.sync.SyncServiceError.SyncServiceAlarm
import com.digitalasset.canton.protocol.{RequestId, RootHash, ViewHash}
@ -107,7 +110,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
val input = mkInput1(Seq(party1, party2), NonEmpty(Seq, participant1, participant2))
val (wrongRecipients, goodInputs) = validator
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot)
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot, None)
.futureValue
goodInputs shouldBe Seq(input)
@ -129,6 +132,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue
@ -143,7 +147,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot)
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot, None)
.futureValue,
_.shouldBeCantonError(
SyncServiceAlarm,
@ -154,6 +158,34 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
goodInputs shouldBe empty
wrongRecipients.loneElement shouldBe WrongRecipients(input.viewTree)
}
"not warn if that is due to a topology change" in {
val input = mkInput1(Seq(inactive, party1), NonEmpty(Seq, participant1))
val submissionSnapshot: TopologySnapshot =
TestingTopology(topology =
Map(
inactive -> Map(participant1 -> ParticipantPermission.Submission),
party1 -> Map(participant1 -> ParticipantPermission.Submission),
party2 -> Map(participant2 -> ParticipantPermission.Submission),
)
).build(loggerFactory).topologySnapshot()
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(
requestId,
Seq(input),
snapshot,
Some(submissionSnapshot),
)
.futureValue
// No assertion to explicitly check that no warning is emitted
)
goodInputs shouldBe empty
wrongRecipients.loneElement shouldBe WrongRecipientsDueToTopologyChange(input.viewTree)
}
}
"views have different root hashes" must {
@ -166,6 +198,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(input1, input2),
snapshot,
None,
),
_.getMessage should startWith("Views with different root hashes are not supported:"),
)
@ -187,6 +220,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -201,6 +235,42 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
WrongRecipients(childInput.viewTree),
)
}
"not warn if that is due to a topology change" in {
val parentInput = mkInput1(Seq(party1), NonEmpty(Seq, participant1), viewDepth = 1)
val childInput = mkInput2(
Seq(party1, inactive),
mkRecipients(Seq(NonEmpty(Set, participant1), NonEmpty(Set, participant1))),
viewDepth = 2,
)
val submissionSnapshot: TopologySnapshot =
TestingTopology(topology =
Map(
inactive -> Map(participant1 -> ParticipantPermission.Submission),
party1 -> Map(participant1 -> ParticipantPermission.Submission),
party2 -> Map(participant2 -> ParticipantPermission.Submission),
)
).build(loggerFactory).topologySnapshot()
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(
requestId,
Seq(parentInput, childInput),
snapshot,
Some(submissionSnapshot),
)
.futureValue
// No assertion to explicitly check that no warning is emitted
)
goodInputs shouldBe empty
wrongRecipients shouldBe Seq(
WrongRecipientsDueToTopologyChange(parentInput.viewTree),
WrongRecipientsDueToTopologyChange(childInput.viewTree),
)
}
}
"a view has a missing recipient" must {
@ -209,7 +279,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot)
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot, None)
.futureValue,
_.shouldBeCantonError(
SyncServiceAlarm,
@ -220,6 +290,34 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
goodInputs shouldBe empty
wrongRecipients.loneElement shouldBe WrongRecipients(input.viewTree)
}
"not warn if that is due to a topology change" in {
val input = mkInput1(Seq(party1, party2), NonEmpty(Seq, participant1))
val submissionSnapshot: TopologySnapshot =
TestingTopology(topology =
Map(
inactive -> Map.empty,
party1 -> Map(participant1 -> ParticipantPermission.Submission),
party2 -> Map(participant1 -> ParticipantPermission.Submission),
)
).build(loggerFactory).topologySnapshot()
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(
requestId,
Seq(input),
snapshot,
Some(submissionSnapshot),
)
.futureValue
// No assertion to explicitly check that no warning is emitted
)
goodInputs shouldBe empty
wrongRecipients.loneElement shouldBe WrongRecipientsDueToTopologyChange(input.viewTree)
}
}
"a parent view has a missing recipient" must {
@ -239,6 +337,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -250,6 +349,8 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
goodInputs.loneElement shouldBe childInput
wrongRecipients.loneElement shouldBe WrongRecipients(parentInput.viewTree)
}
// This cannot be due to a topology change, because it would affect all views
}
"a view has an extra recipient" must {
@ -258,7 +359,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot)
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot, None)
.futureValue,
_.shouldBeCantonError(
SyncServiceAlarm,
@ -269,6 +370,37 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
goodInputs.loneElement shouldBe input
wrongRecipients shouldBe empty
}
"not warn if that is due to a topology change" in {
val input = mkInput1(Seq(party1), NonEmpty(Seq, participant1, participant2))
val submissionSnapshot: TopologySnapshot =
TestingTopology(topology =
Map(
inactive -> Map.empty,
party1 -> Map(
participant1 -> ParticipantPermission.Submission,
participant2 -> ParticipantPermission.Submission,
),
party2 -> Map(participant2 -> ParticipantPermission.Submission),
)
).build(loggerFactory).topologySnapshot()
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(
requestId,
Seq(input),
snapshot,
Some(submissionSnapshot),
)
.futureValue
// No assertion to explicitly check that no warning is emitted
)
goodInputs.loneElement shouldBe input
wrongRecipients shouldBe empty
}
}
"a parent view has an extra recipient" must {
@ -286,6 +418,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -297,6 +430,8 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
goodInputs shouldBe Seq(parentInput, childInput)
wrongRecipients shouldBe empty
}
// This cannot be due to a topology change, because it would affect all views
}
"the recipients have an extra group" must {
@ -310,7 +445,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
val (wrongRecipients, goodInputs) = loggerFactory.assertLogs(
validator
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot)
.retainInputsWithValidRecipients(requestId, Seq(input), snapshot, None)
.futureValue,
_.shouldBeCantonError(
SyncServiceAlarm,
@ -335,6 +470,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -369,6 +505,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -412,6 +549,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(
@ -461,6 +599,7 @@ class RecipientsValidatorTest extends BaseTestWordSpec with HasExecutionContext
requestId,
Seq(parentInput, childInput),
snapshot,
None,
)
.futureValue,
_.shouldBeCantonError(

View File

@ -1 +1 @@
20240528.13378.v6da04664
20240528.13380.v10deea37