update canton to 20240521.13331.v449e982c (#19247)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-05-22 12:09:44 +02:00 committed by GitHub
parent c584588a81
commit 609171a257
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 2734 additions and 1932 deletions

View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.admin.api.client.commands
import cats.syntax.either.*
import ch.qos.logback.classic.Level
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.health.admin.data.WaitingForExternalInput
import com.digitalasset.canton.health.admin.v30.{
HealthDumpRequest,
HealthDumpResponse,
@ -43,7 +44,14 @@ object StatusAdminCommands {
override def handleResponse(response: v30.StatusResponse): Either[String, data.NodeStatus[S]] =
((response.response match {
case v30.StatusResponse.Response.NotInitialized(notInitialized) =>
Right(data.NodeStatus.NotInitialized(notInitialized.active))
WaitingForExternalInput
.fromProtoV30(notInitialized.waitingForExternalInput)
.map(waitingFor =>
data.NodeStatus.NotInitialized(
notInitialized.active,
waitingFor,
)
)
case v30.StatusResponse.Response.Success(status) =>
deserialize(status).map(data.NodeStatus.Success(_))
case v30.StatusResponse.Response.Empty =>
@ -89,6 +97,28 @@ object StatusAdminCommands {
case _ => false
})
sealed abstract class IsReadyForExternalInput(
externalInput: v30.StatusResponse.NotInitialized.WaitingForExternalInput
) extends StatusAdminCommands.FromStatus({
case v30.StatusResponse.Response
.NotInitialized(v30.StatusResponse.NotInitialized(active, `externalInput`)) =>
true
case _ => false
})
object IsReadyForId
extends IsReadyForExternalInput(
v30.StatusResponse.NotInitialized.WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_ID
)
object IsReadyForNodeTopology
extends IsReadyForExternalInput(
v30.StatusResponse.NotInitialized.WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_NODE_TOPOLOGY
)
object IsReadyForInitialization
extends IsReadyForExternalInput(
v30.StatusResponse.NotInitialized.WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_INITIALIZATION
)
class FromStatus(predicate: v30.StatusResponse.Response => Boolean)
extends GetStatusBase[Boolean] {
override def handleResponse(response: v30.StatusResponse): Either[String, Boolean] =

View File

@ -765,7 +765,7 @@ trait ConsoleMacros extends NamedLogging with NoTracing {
true,
s"${instance.id.member} has already been initialized for domain ${status.domainId} instead of $domainId",
)
case NodeStatus.NotInitialized(true) =>
case NodeStatus.NotInitialized(true, _) =>
// the node is not yet initialized for this domain
Right(false)
case NodeStatus.Failure(msg) =>

View File

@ -113,10 +113,25 @@ class HealthAdministration[S <: data.NodeStatus.Status](
// in case the node is not reachable, we assume it is not running
falseIfUnreachable(runningCommand)
@Help.Summary("Check if the node is ready for setting the node's id")
def is_ready_for_id(): Boolean = falseIfUnreachable(
adminCommand(StatusAdminCommands.IsReadyForId)
)
@Help.Summary("Check if the node is ready for uploading the node's identity topology")
def is_ready_for_node_topology(): Boolean = falseIfUnreachable(
adminCommand(StatusAdminCommands.IsReadyForNodeTopology)
)
@Help.Summary("Check if the node is ready for initialization")
def is_ready_for_initialization(): Boolean = falseIfUnreachable(
adminCommand(StatusAdminCommands.IsReadyForInitialization)
)
@Help.Summary("Check if the node is running and is the active instance (mediator, participant)")
def active: Boolean = status match {
case NodeStatus.Success(status) => status.active
case NodeStatus.NotInitialized(active) => active
case NodeStatus.NotInitialized(active, _) => active
case _ => false
}
@ -142,6 +157,13 @@ class HealthAdministration[S <: data.NodeStatus.Status](
})
}
@Help.Summary("Wait for the node to be ready for setting the node's id")
def wait_for_ready_for_id(): Unit = waitFor(is_ready_for_id())
@Help.Summary("Wait for the node to be ready for uploading the node's identity topology")
def wait_for_ready_for_node_topology(): Unit = waitFor(is_ready_for_node_topology())
@Help.Summary("Wait for the node to be ready for initialization")
def wait_for_ready_for_initialization(): Unit = waitFor(is_ready_for_initialization())
protected def waitFor(condition: => Boolean): Unit = {
// all calls here are potentially unbounded. we do not know how long it takes
// for a node to start or for a node to become initialised. so we use the unbounded

View File

@ -21,6 +21,7 @@ import com.digitalasset.canton.console.{
FeatureFlagFilter,
Help,
Helpful,
MediatorReference,
}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.NamedLoggerFactory
@ -127,22 +128,26 @@ class MediatorPruningAdministrationGroup(
}
class MediatorSetupGroup(consoleCommandGroup: ConsoleCommandGroup)
extends ConsoleCommandGroup.Impl(consoleCommandGroup) {
class MediatorSetupGroup(node: MediatorReference) extends ConsoleCommandGroup.Impl(node) {
@Help.Summary("Assign a mediator to a domain")
def assign(
domainId: DomainId,
sequencerConnections: SequencerConnections,
sequencerConnectionValidation: SequencerConnectionValidation =
SequencerConnectionValidation.All,
): Unit = consoleEnvironment.run {
runner.adminCommand(
Initialize(
domainId,
sequencerConnections,
sequencerConnectionValidation,
waitForReady: Boolean = true,
): Unit = {
if (waitForReady) node.health.wait_for_ready_for_initialization()
consoleEnvironment.run {
runner.adminCommand(
Initialize(
domainId,
sequencerConnections,
sequencerConnectionValidation,
)
)
)
}
}
}

View File

@ -9,14 +9,14 @@ import com.digitalasset.canton.admin.api.client.commands.EnterpriseSequencerAdmi
InitializeFromOnboardingState,
}
import com.digitalasset.canton.admin.api.client.data.StaticDomainParameters
import com.digitalasset.canton.console.Help
import com.digitalasset.canton.console.{Help, SequencerReference}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.admin.grpc.InitializeSequencerResponse
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerSnapshot
import com.digitalasset.canton.topology.SequencerId
import com.google.protobuf.ByteString
class SequencerSetupGroup(parent: ConsoleCommandGroup) extends ConsoleCommandGroup.Impl(parent) {
class SequencerSetupGroup(node: SequencerReference) extends ConsoleCommandGroup.Impl(node) {
@Help.Summary(
"Download sequencer snapshot at given point in time to bootstrap another sequencer"
@ -79,7 +79,10 @@ class SequencerSetupGroup(parent: ConsoleCommandGroup) extends ConsoleCommandGro
def assign_from_genesis_state(
genesisState: ByteString,
domainParameters: StaticDomainParameters,
): InitializeSequencerResponse =
waitForReady: Boolean = true,
): InitializeSequencerResponse = {
if (waitForReady) node.health.wait_for_ready_for_initialization()
consoleEnvironment.run {
runner.adminCommand(
InitializeFromGenesisState(
@ -88,16 +91,23 @@ class SequencerSetupGroup(parent: ConsoleCommandGroup) extends ConsoleCommandGro
)
)
}
}
@Help.Summary(
"Dynamically initialize a sequencer from a point later than the beginning of the event stream." +
"This is called as part of the sequencer.setup.onboard_new_sequencer command, so you are unlikely to need to call this directly."
)
def assign_from_onboarding_state(onboardingState: ByteString): InitializeSequencerResponse =
def assign_from_onboarding_state(
onboardingState: ByteString,
waitForReady: Boolean = true,
): InitializeSequencerResponse = {
if (waitForReady) node.health.wait_for_ready_for_initialization()
consoleEnvironment.run {
runner.adminCommand(
InitializeFromOnboardingState(onboardingState)
)
}
}
}

View File

@ -84,10 +84,13 @@ class TopologyAdministrationGroup(
|Automatic node initialisation is usually turned off to preserve the identity of a participant or domain
|node (during major version upgrades) or if the topology transactions are managed through
|a different topology manager than the one integrated into this node.""")
def init_id(identifier: UniqueIdentifier): Unit =
def init_id(identifier: UniqueIdentifier, waitForReady: Boolean = true): Unit = {
if (waitForReady) instance.health.wait_for_ready_for_id()
consoleEnvironment.run {
adminCommand(TopologyAdminCommands.Init.InitId(identifier.toProtoPrimitive))
}
}
private def getIdCommand(): ConsoleCommandResult[UniqueIdentifier] =
adminCommand(TopologyAdminCommands.Init.GetId())

View File

@ -6,7 +6,7 @@ package com.digitalasset.canton.integration.tests
import com.digitalasset.canton.DomainAlias
import com.digitalasset.canton.config.CommunityStorageConfig
import com.digitalasset.canton.console.{CommandFailure, InstanceReference}
import com.digitalasset.canton.health.admin.data.NodeStatus
import com.digitalasset.canton.health.admin.data.{NodeStatus, WaitingForInitialization}
import com.digitalasset.canton.integration.CommunityTests.{
CommunityIntegrationTest,
SharedCommunityEnvironment,
@ -36,8 +36,14 @@ sealed trait EnterpriseFeatureInCommunityIntegrationTest
sequencer1.start()
mediator1.start()
sequencer1.health.status shouldBe NodeStatus.NotInitialized(true)
mediator1.health.status shouldBe NodeStatus.NotInitialized(true)
sequencer1.health.status shouldBe NodeStatus.NotInitialized(
active = true,
Some(WaitingForInitialization),
)
mediator1.health.status shouldBe NodeStatus.NotInitialized(
active = true,
Some(WaitingForInitialization),
)
bootstrap.domain(
domainAlias,

View File

@ -5,7 +5,7 @@ package com.digitalasset.canton.integration.tests
import com.digitalasset.canton.config.CommunityStorageConfig
import com.digitalasset.canton.console.InstanceReference
import com.digitalasset.canton.health.admin.data.NodeStatus
import com.digitalasset.canton.health.admin.data.{NodeStatus, WaitingForInitialization}
import com.digitalasset.canton.integration.CommunityTests.{
CommunityIntegrationTest,
SharedCommunityEnvironment,
@ -31,8 +31,14 @@ sealed trait SimplestPingCommunityIntegrationTest
sequencer1.start()
mediator1.start()
sequencer1.health.status shouldBe NodeStatus.NotInitialized(true)
mediator1.health.status shouldBe NodeStatus.NotInitialized(true)
sequencer1.health.status shouldBe NodeStatus.NotInitialized(
active = true,
Some(WaitingForInitialization),
)
mediator1.health.status shouldBe NodeStatus.NotInitialized(
active = true,
Some(WaitingForInitialization),
)
bootstrap.domain(
"da",

View File

@ -57,7 +57,15 @@ message StatusResponse {
}
message NotInitialized {
enum WaitingForExternalInput {
WAITING_FOR_EXTERNAL_INPUT_UNSPECIFIED = 0;
WAITING_FOR_EXTERNAL_INPUT_ID = 1;
WAITING_FOR_EXTERNAL_INPUT_NODE_TOPOLOGY = 2;
WAITING_FOR_EXTERNAL_INPUT_INITIALIZATION = 3;
}
bool active = 1; // Indicate if the node is active, usually true unless it's a replicated node that is passive
WaitingForExternalInput waiting_for_external_input = 2; // Indicates whether the node is waiting for external input
}
oneof response {

View File

@ -3,14 +3,16 @@
package com.digitalasset.canton.sequencing
import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.{MediatorGroup, Member, PartyId}
import com.digitalasset.canton.topology.{MediatorGroup, Member, ParticipantId, PartyId}
import com.digitalasset.canton.tracing.TraceContext
import scala.concurrent.{ExecutionContext, Future}
object GroupAddressResolver {
def resolveGroupsToMembers(
groupRecipients: Set[GroupRecipient],
topologySnapshot: TopologySnapshot,
@ -32,11 +34,7 @@ object GroupAddressResolver {
mapping <-
topologySnapshot
.activeParticipantsOfParties(parties.toSeq)
} yield mapping.map[GroupRecipient, Set[Member]] { case (party, participants) =>
ParticipantsOfParty(
PartyId.tryFromLfParty(party)
) -> participants.toSet[Member]
}
} yield asGroupRecipientsToMembers(mapping)
}
mediatorGroupByMember <- {
val mediatorGroups = groupRecipients.collect { case MediatorGroupRecipient(group) =>
@ -50,12 +48,7 @@ object GroupAddressResolver {
.mediatorGroupsOfAll(mediatorGroups)
.leftMap(_ => Seq.empty[MediatorGroup])
.merge
} yield groups
.map(group =>
MediatorGroupRecipient(group.index) -> (group.active.forgetNE ++ group.passive)
.toSet[Member]
)
.toMap[GroupRecipient, Set[Member]]
} yield asGroupRecipientsToMembers(groups)
}
allRecipients <- {
if (!groupRecipients.contains(AllMembersOfDomain)) {
@ -84,4 +77,24 @@ object GroupAddressResolver {
}
} yield participantsOfParty ++ mediatorGroupByMember ++ sequencersOfDomain ++ allRecipients
}
def asGroupRecipientsToMembers(
groups: Seq[MediatorGroup]
): Map[GroupRecipient, Set[Member]] =
groups
.map(group =>
MediatorGroupRecipient(group.index) -> (group.active.forgetNE ++ group.passive)
.toSet[Member]
)
.toMap[GroupRecipient, Set[Member]]
def asGroupRecipientsToMembers(
mapping: Map[LfPartyId, Set[ParticipantId]]
): Map[GroupRecipient, Set[Member]] = {
mapping.map[GroupRecipient, Set[Member]] { case (party, participants) =>
ParticipantsOfParty(
PartyId.tryFromLfParty(party)
) -> participants.toSet[Member]
}
}
}

View File

@ -58,7 +58,7 @@ final case class SubmissionRequest private (
@transient override protected lazy val companionObj: SubmissionRequest.type = SubmissionRequest
@VisibleForTesting
def isRequest: Boolean = {
def isConfirmationRequest: Boolean = {
val hasParticipantRecipient = batch.allMembers.exists {
case _: ParticipantId => true
case _: Member => false
@ -125,7 +125,7 @@ final case class SubmissionRequest private (
* [[com.digitalasset.canton.sequencing.protocol.ClosedEnvelope.bytes]] are interpreted.
* </li>
* <li>The [[sender]] and the [[messageId]], as they are specific to the sender of a particular submission request</li>
* <li>The [[isRequest]] flag because it is irrelevant for delivery or aggregation</li>
* <li>The [[isConfirmationRequest]] flag because it is irrelevant for delivery or aggregation</li>
* </ul>
*/
def aggregationId(hashOps: HashOps): Option[AggregationId] = aggregationRule.map { rule =>

View File

@ -478,6 +478,26 @@ object TopologyManagerError extends TopologyManagerErrorGroup {
with TopologyManagerError
}
@Explanation(
"This error indicates that the topology transaction references parties that are currently unknown."
)
@Resolution(
"""Wait for the onboarding of the parties to be become active or remove the unknown parties from the topology transaction.
|The metadata details of this error contain the unknown parties in the field ``parties``."""
)
object UnknownParties
extends ErrorCode(
id = "UNKNOWN_PARTIES",
ErrorCategory.InvalidGivenCurrentSystemStateResourceMissing,
) {
final case class Failure(parties: Seq[PartyId])(implicit
override val loggingContext: ErrorLoggingContext
) extends CantonError.Impl(
cause = s"Parties ${parties.sorted.mkString(", ")} are unknown."
)
with TopologyManagerError
}
@Explanation(
"""This error indicates that a participant is trying to rescind their domain trust certificate
|while still being hosting parties."""

View File

@ -57,6 +57,15 @@ object TopologyTransactionRejection {
}
}
final case class UnknownParties(parties: Seq[PartyId]) extends TopologyTransactionRejection {
override def asString: String = s"Parties ${parties.sorted.mkString(", ")} are unknown."
override def pretty: Pretty[UnknownParties.this.type] = prettyOfString(_ => asString)
override def toTopologyManagerError(implicit elc: ErrorLoggingContext): TopologyManagerError =
TopologyManagerError.UnknownParties.Failure(parties)
}
final case class OnboardingRestrictionInPlace(
participant: ParticipantId,
restriction: OnboardingRestriction,

View File

@ -100,6 +100,11 @@ class ValidatingTopologyMappingChecks(
)
)
case (Code.AuthorityOf, None | Some(Code.AuthorityOf)) =>
toValidate
.select[TopologyChangeOp.Replace, AuthorityOf]
.map(checkAuthorityOf(effective, _))
case otherwise => None
}
checkOpt.getOrElse(EitherTUtil.unit)
@ -418,4 +423,42 @@ class ValidatingTopologyMappingChecks(
)
thresholdCheck.flatMap(_ => checkMissingMappings())
}
private def checkAuthorityOf(
effectiveTime: EffectiveTime,
toValidate: SignedTopologyTransaction[TopologyChangeOp.Replace, AuthorityOf],
)(implicit
traceContext: TraceContext
): EitherT[Future, TopologyTransactionRejection, Unit] = {
def checkPartiesAreKnown(): EitherT[Future, TopologyTransactionRejection, Unit] = {
val allPartiesToLoad = toValidate.mapping.partyId +: toValidate.mapping.parties
loadFromStore(
effectiveTime,
Code.PartyToParticipant,
filterUid = Some(allPartiesToLoad.map(_.uid)),
).flatMap { partyMappings =>
val knownParties = partyMappings.result
.flatMap(_.selectMapping[PartyToParticipant])
.map(_.mapping.partyId)
val missingParties = allPartiesToLoad.toSet -- knownParties
EitherTUtil.condUnitET(
missingParties.isEmpty,
TopologyTransactionRejection.UnknownParties(missingParties.toSeq.sorted),
)
}
}
val checkThreshold = {
val actual = toValidate.mapping.threshold.value
val mustBeAtMost = toValidate.mapping.parties.size
EitherTUtil.condUnitET(
actual <= mustBeAtMost,
TopologyTransactionRejection.ThresholdTooHigh(actual, mustBeAtMost),
)
}
checkThreshold.flatMap(_ => checkPartiesAreKnown())
}
}

View File

@ -150,7 +150,9 @@ public class JsonLfDecodersTest {
eq("\"1990-11-09T04:30:23.1Z\"", timestampUTC(1990, Month.NOVEMBER, 9, 4, 30, 23, 100000)),
eq("\"1990-11-09T04:30:23Z\"", timestampUTC(1990, Month.NOVEMBER, 9, 4, 30, 23, 0)),
eq("\"0001-01-01T00:00:00Z\"", timestampUTC(1, Month.JANUARY, 1, 0, 0, 0, 0)),
eq("\"1990-11-09T04:30:23.123-07:00\"", timestampUTC(1990, Month.NOVEMBER, 9, 11, 30, 23, 123000)));
eq(
"\"1990-11-09T04:30:23.123-07:00\"",
timestampUTC(1990, Month.NOVEMBER, 9, 11, 30, 23, 123000)));
}
@Test

View File

@ -7,6 +7,7 @@ import cats.data.EitherT
import cats.syntax.traverse.*
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.health.admin.data.WaitingForExternalInput
import com.digitalasset.canton.lifecycle.UnlessShutdown.AbortedDueToShutdown
import com.digitalasset.canton.lifecycle.{
FlagCloseable,
@ -71,6 +72,9 @@ abstract class BootstrapStage[T <: CantonNode, StageResult <: BootstrapStageOrLe
closeables.updateAndGet(_ :+ item).discard
}
/** indicates the type of external input the stage might be waiting for */
def waitingFor: Option[WaitingForExternalInput] = None
/** main handler to implement where we attempt to init this stage
* if we return None, then the init was okay but stopped at this level (waiting for
* further input)

View File

@ -34,7 +34,12 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.environment.CantonNodeBootstrap.HealthDumpFunction
import com.digitalasset.canton.error.FatalError
import com.digitalasset.canton.health.admin.data.NodeStatus
import com.digitalasset.canton.health.admin.data.{
NodeStatus,
WaitingForExternalInput,
WaitingForId,
WaitingForNodeTopology,
}
import com.digitalasset.canton.health.admin.grpc.GrpcStatusService
import com.digitalasset.canton.health.admin.v30.StatusServiceGrpc
import com.digitalasset.canton.health.{
@ -198,7 +203,23 @@ abstract class CantonNodeBootstrapImpl[
private def status: Future[NodeStatus[NodeStatus.Status]] = {
getNode
.map(_.status.map(NodeStatus.Success(_)))
.getOrElse(Future.successful(NodeStatus.NotInitialized(isActive)))
.getOrElse(
Future.successful(NodeStatus.NotInitialized(isActive, waitingFor))
)
}
private def waitingFor: Option[WaitingForExternalInput] = {
def nextStage(stage: BootstrapStage[?, ?]): Option[BootstrapStage[?, ?]] = {
stage.next match {
case Some(s: BootstrapStage[_, _]) => nextStage(s)
case Some(_: RunningNode[_]) => None
// BootstrapStageOrLeaf is not a sealed class, therefore we need to catch any other
// possible subclass
case Some(_) => None
case None => Some(stage)
}
}
nextStage(startupStage).flatMap(_.waitingFor)
}
protected def registerHealthGauge(): Unit = {
@ -481,6 +502,8 @@ abstract class CantonNodeBootstrapImpl[
)
)
override def waitingFor: Option[WaitingForExternalInput] = Some(WaitingForId)
override protected def stageCompleted(implicit
traceContext: TraceContext
): Future[Option[UniqueIdentifier]] = initializationStore.uid
@ -622,6 +645,8 @@ abstract class CantonNodeBootstrapImpl[
super.start()
}
override def waitingFor: Option[WaitingForExternalInput] = Some(WaitingForNodeTopology)
override protected def stageCompleted(implicit
traceContext: TraceContext
): Future[Option[Unit]] = {

View File

@ -7,11 +7,12 @@ import cats.syntax.either.*
import cats.syntax.functor.*
import cats.syntax.option.*
import cats.syntax.traverse.*
import com.digitalasset.canton.ProtoDeserializationError.InvariantViolation
import com.digitalasset.canton.ProtoDeserializationError.{InvariantViolation, UnrecognizedEnum}
import com.digitalasset.canton.config.RequireTypes.Port
import com.digitalasset.canton.health.ComponentHealthState.UnhealthyState
import com.digitalasset.canton.health.admin.data.NodeStatus.{multiline, portsString}
import com.digitalasset.canton.health.admin.v30
import com.digitalasset.canton.health.admin.v30.StatusResponse.NotInitialized.WaitingForExternalInput as V30WaitingForExternalInput
import com.digitalasset.canton.health.{
ComponentHealthState,
ComponentStatus,
@ -51,8 +52,10 @@ object NodeStatus {
}
/** A node is running but not yet initialized. */
final case class NotInitialized(active: Boolean) extends NodeStatus[Nothing] {
override def pretty: Pretty[NotInitialized] = prettyOfClass(param("active", _.active))
final case class NotInitialized(active: Boolean, waitingFor: Option[WaitingForExternalInput])
extends NodeStatus[Nothing] {
override def pretty: Pretty[NotInitialized] =
prettyOfClass(param("active", _.active), paramIfDefined("waitingFor", _.waitingFor))
override def trySuccess: Nothing = sys.error(s"Node is not yet initialized.")
override def successOption: Option[Nothing] = None
@ -84,6 +87,45 @@ object NodeStatus {
if (elements.isEmpty) "None" else elements.map(el => s"\n\t$el").mkString
}
sealed abstract class WaitingForExternalInput extends PrettyPrinting {
def toProtoV30: V30WaitingForExternalInput
}
case object WaitingForId extends WaitingForExternalInput {
override def pretty: Pretty[WaitingForId.this.type] = prettyOfString(_ => "ID")
override def toProtoV30: V30WaitingForExternalInput =
V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_ID
}
case object WaitingForNodeTopology extends WaitingForExternalInput {
override def pretty: Pretty[WaitingForNodeTopology.this.type] =
prettyOfString(_ => "Node Topology")
override def toProtoV30: V30WaitingForExternalInput =
V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_NODE_TOPOLOGY
}
case object WaitingForInitialization extends WaitingForExternalInput {
override def pretty: Pretty[WaitingForInitialization.this.type] =
prettyOfString(_ => "Initialization")
override def toProtoV30: V30WaitingForExternalInput =
V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_INITIALIZATION
}
object WaitingForExternalInput {
def fromProtoV30(
externalInput: V30WaitingForExternalInput
): ParsingResult[Option[WaitingForExternalInput]] = externalInput match {
case V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_UNSPECIFIED => Right(None)
case V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_ID => Right(Some(WaitingForId))
case V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_NODE_TOPOLOGY =>
Right(Some(WaitingForNodeTopology))
case V30WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_INITIALIZATION =>
Right(Some(WaitingForInitialization))
case V30WaitingForExternalInput.Unrecognized(unrecognizedValue) =>
Left(UnrecognizedEnum("waiting_for_external_input", unrecognizedValue))
}
}
final case class SimpleStatus(
uid: UniqueIdentifier,
uptime: Duration,

View File

@ -36,9 +36,18 @@ class GrpcStatusService(
status.map {
case data.NodeStatus.Success(status) =>
v30.StatusResponse(v30.StatusResponse.Response.Success(status.toProtoV30))
case data.NodeStatus.NotInitialized(active) =>
case data.NodeStatus.NotInitialized(active, waitingFor) =>
v30.StatusResponse(
v30.StatusResponse.Response.NotInitialized(v30.StatusResponse.NotInitialized(active))
v30.StatusResponse.Response.NotInitialized(
v30.StatusResponse.NotInitialized(
active,
waitingFor
.map(_.toProtoV30)
.getOrElse(
v30.StatusResponse.NotInitialized.WaitingForExternalInput.WAITING_FOR_EXTERNAL_INPUT_UNSPECIFIED
),
)
)
)
case data.NodeStatus.Failure(_msg) =>
// The node's status should never return a Failure here.

View File

@ -51,7 +51,15 @@ class ValidatingTopologyMappingChecksTest
}
"TopologyMappingChecks" when {
import DefaultTestIdentities.{domainId, participant1, participant2, participant3, party1}
import DefaultTestIdentities.{
domainId,
participant1,
participant2,
participant3,
party1,
party2,
party3,
}
import factory.TestingTransactions.*
def checkTransaction(
@ -465,6 +473,68 @@ class ValidatingTopologyMappingChecksTest
)
}
}
"validating AuthorityOf" should {
val ptps @ Seq(p1_ptp, p2_ptp, p3_ptp) = Seq(party1, party2, party3).map { party =>
factory.mkAdd(
PartyToParticipant(
party,
None,
PositiveInt.one,
Seq(HostingParticipant(participant1, ParticipantPermission.Confirmation)),
groupAddressing = false,
)
)
}
"report no errors for valid mappings" in {
val (checks, store) = mk()
addToStore(store, ptps*)
val authorityOf =
factory.mkAdd(AuthorityOf(party1, None, PositiveInt.two, Seq(party2, party3)))
checkTransaction(checks, authorityOf) shouldBe Right(())
}
"report UnknownParties for missing PTPs for referenced parties" in {
val (checks, store) = mk()
addToStore(store, p1_ptp)
val missingAuthorizingParty =
factory.mkAdd(AuthorityOf(party2, None, PositiveInt.one, Seq(party1)))
checkTransaction(checks, missingAuthorizingParty) shouldBe Left(
TopologyTransactionRejection.UnknownParties(Seq(party2))
)
val missingAuthorizedParty =
factory.mkAdd(AuthorityOf(party1, None, PositiveInt.one, Seq(party2)))
checkTransaction(checks, missingAuthorizedParty) shouldBe Left(
TopologyTransactionRejection.UnknownParties(Seq(party2))
)
val missingAllParties =
factory.mkAdd(AuthorityOf(party2, None, PositiveInt.one, Seq(party3)))
checkTransaction(checks, missingAllParties) shouldBe Left(
TopologyTransactionRejection.UnknownParties(Seq(party2, party3))
)
val missingMixedParties =
factory.mkAdd(AuthorityOf(party2, None, PositiveInt.one, Seq(party1, party3)))
checkTransaction(checks, missingMixedParties) shouldBe Left(
TopologyTransactionRejection.UnknownParties(Seq(party2, party3))
)
}
"report ThresholdTooHigh if the threshold is higher than the number of authorized parties" in {
val (checks, store) = mk()
addToStore(store, ptps*)
val thresholdTooHigh =
factory.mkAdd(AuthorityOf(party1, None, PositiveInt.three, Seq(party2, party3)))
checkTransaction(checks, thresholdTooHigh) shouldBe Left(
TopologyTransactionRejection.ThresholdTooHigh(3, 2)
)
}
}
}
private def addToStore(

View File

@ -13,13 +13,23 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.domain.block
import com.digitalasset.canton.domain.block.BlockSequencerStateManager.HeadState
import com.digitalasset.canton.domain.block.BlockUpdateGenerator.BlockChunk
import com.digitalasset.canton.domain.block.data.{
BlockEphemeralState,
BlockInfo,
EphemeralState,
SequencerBlockStore,
}
import com.digitalasset.canton.domain.block.update.BlockUpdateGenerator.BlockChunk
import com.digitalasset.canton.domain.block.update.{
BlockUpdate,
BlockUpdateGenerator,
ChunkUpdate,
CompleteBlockUpdate,
LocalBlockUpdate,
OrderedBlockUpdate,
SignedChunkEvents,
UnsignedChunkEvents,
}
import com.digitalasset.canton.domain.sequencing.integrations.state.statemanager.MemberCounters
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencer
import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscriptionError
@ -78,14 +88,14 @@ trait BlockSequencerStateManagerBase extends FlagCloseableAsync {
def isMemberEnabled(member: Member): Boolean
/** Flow to turn [[com.digitalasset.canton.domain.block.BlockEvents]] of one block
* into a series of [[com.digitalasset.canton.domain.block.OrderedBlockUpdate]]s
* into a series of [[update.OrderedBlockUpdate]]s
* that are to be persisted subsequently using [[applyBlockUpdate]].
*/
def processBlock(
bug: BlockUpdateGenerator
): Flow[BlockEvents, Traced[OrderedBlockUpdate[SignedChunkEvents]], NotUsed]
/** Persists the [[com.digitalasset.canton.domain.block.BlockUpdate]]s and completes the waiting RPC calls
/** Persists the [[update.BlockUpdate]]s and completes the waiting RPC calls
* as necessary.
*/
def applyBlockUpdate(

View File

@ -15,7 +15,7 @@ import com.digitalasset.canton.sequencing.protocol.TrafficState
import com.digitalasset.canton.topology.Member
/** Subset of the [[EphemeralState]] that is used by the block processing stage
* of the [[com.digitalasset.canton.domain.block.BlockUpdateGenerator]]
* of the [[com.digitalasset.canton.domain.block.update.BlockUpdateGenerator]]
*/
final case class BlockUpdateEphemeralState(
checkpoints: Map[Member, CounterCheckpoint],

View File

@ -0,0 +1,516 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.block.update
import cats.implicits.catsStdInstancesForFuture
import cats.syntax.alternative.*
import cats.syntax.functor.*
import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import com.daml.metrics.api.MetricsContext
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, HashPurpose, SyncCryptoClient}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.LedgerBlockEvent
import com.digitalasset.canton.domain.block.LedgerBlockEvent.{Acknowledgment, Send}
import com.digitalasset.canton.domain.block.update.BlockUpdateGeneratorImpl.{
RecipientStats,
SequencedSubmission,
State,
}
import com.digitalasset.canton.domain.block.update.SequencedSubmissionsValidator.SequencedSubmissionsValidationResult
import com.digitalasset.canton.domain.metrics.BlockMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.SequencerRateLimitManager
import com.digitalasset.canton.error.BaseAlarm
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.sequencing.GroupAddressResolver
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.*
import com.digitalasset.canton.version.ProtocolVersion
import monocle.Monocle.toAppliedFocusOps
import scala.concurrent.{ExecutionContext, Future}
/** Processes a chunk of events in a block, yielding a [[ChunkUpdate]].
*/
private[update] final class BlockChunkProcessor(
domainId: DomainId,
protocolVersion: ProtocolVersion,
domainSyncCryptoApi: DomainSyncCryptoClient,
sequencerId: SequencerId,
rateLimitManager: SequencerRateLimitManager,
orderingTimeFixMode: OrderingTimeFixMode,
override val loggerFactory: NamedLoggerFactory,
metrics: BlockMetrics,
)(implicit closeContext: CloseContext)
extends NamedLogging {
private val sequencedSubmissionsValidator =
new SequencedSubmissionsValidator(
domainId,
protocolVersion,
domainSyncCryptoApi,
sequencerId,
rateLimitManager,
loggerFactory,
)
def processChunk(
state: BlockUpdateGeneratorImpl.State,
height: Long,
index: Int,
chunkEvents: NonEmpty[Seq[Traced[LedgerBlockEvent]]],
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(BlockUpdateGeneratorImpl.State, ChunkUpdate[UnsignedChunkEvents])] = {
val (lastTsBeforeValidation, fixedTsChanges) = fixTimestamps(height, index, state, chunkEvents)
// TODO(i18438): verify the signature of the sequencer on the SendEvent
val submissionRequests =
fixedTsChanges.collect { case (ts, ev @ Traced(sendEvent: Send)) =>
// Discard the timestamp of the `Send` event as this one is obsolete
(ts, ev.map(_ => sendEvent.signedSubmissionRequest))
}
FutureUtil.doNotAwait(
recordSubmissionMetrics(fixedTsChanges.map(_._2)),
"submission metric updating failed",
)
for {
sequencedSubmissionsWithSnapshots <-
addSnapshots(
state.latestSequencerEventTimestamp,
state.ephemeral.headCounter(sequencerId),
submissionRequests,
)
newMembers <- detectMembersWithoutSequencerCounters(state, sequencedSubmissionsWithSnapshots)
_ = if (newMembers.nonEmpty) {
logger.info(s"Detected new members without sequencer counter: $newMembers")
}
acksValidationResult <- processAcknowledgements(state, fixedTsChanges)
(acksByMember, invalidAcks) = acksValidationResult
// Warn if we use an approximate snapshot but only after we've read at least one
warnIfApproximate = state.ephemeral.headCounterAboveGenesis(sequencerId)
newMembersTraffic <-
computeNewMembersTraffic(
state,
lastTsBeforeValidation,
newMembers,
warnIfApproximate,
)
stateWithNewMembers = addNewMembers(
state,
height,
index,
newMembers,
acksByMember,
newMembersTraffic,
)
validationResult <-
sequencedSubmissionsValidator.validateSequencedSubmissions(
stateWithNewMembers,
height,
sequencedSubmissionsWithSnapshots,
)
SequencedSubmissionsValidationResult(
finalEphemeralState,
reversedSignedEvents,
inFlightAggregationUpdates,
lastSequencerEventTimestamp,
reversedOutcomes,
) = validationResult
finalEphemeralStateWithAggregationExpiry =
finalEphemeralState.evictExpiredInFlightAggregations(lastTsBeforeValidation)
chunkUpdate =
ChunkUpdate(
newMembers,
acksByMember,
invalidAcks,
reversedSignedEvents.reverse,
inFlightAggregationUpdates,
lastSequencerEventTimestamp,
finalEphemeralStateWithAggregationExpiry,
reversedOutcomes.reverse,
)
// We don't want to take into consideration events that have possibly been discarded, otherwise we could
// assign a last ts value to the block based on an event that wasn't included in the block which would cause
// validations to fail down the line. That's why we need to compute it using only validated events, instead of
// using the lastTs computed initially pre-validation.
lastChunkTsOfSuccessfulEvents =
reversedSignedEvents
.map(_.sequencingTimestamp)
.maxOption
.orElse(newMembers.values.maxOption)
.getOrElse(state.lastChunkTs)
newState =
BlockUpdateGeneratorImpl.State(
state.lastBlockTs,
lastChunkTsOfSuccessfulEvents,
lastSequencerEventTimestamp.orElse(state.latestSequencerEventTimestamp),
finalEphemeralStateWithAggregationExpiry,
)
} yield (newState, chunkUpdate)
}
private def fixTimestamps(
height: Long,
index: Int,
state: State,
chunk: NonEmpty[Seq[Traced[LedgerBlockEvent]]],
): (CantonTimestamp, Seq[(CantonTimestamp, Traced[LedgerBlockEvent])]) = {
val (lastTsBeforeValidation, revFixedTsChanges) =
// With this logic, we assign to the initial non-Send events the same timestamp as for the last
// block. This means that we will include these events in the ephemeral state of the previous block
// when we re-read it from the database. But this doesn't matter given that all those events are idempotent.
chunk.forgetNE.foldLeft[
(CantonTimestamp, Seq[(CantonTimestamp, Traced[LedgerBlockEvent])]),
]((state.lastChunkTs, Seq.empty)) { case ((lastTs, events), event) =>
event.value match {
case send: Send =>
val ts = ensureStrictlyIncreasingTimestamp(lastTs, send.timestamp)
logger.info(
show"Observed Send with messageId ${send.signedSubmissionRequest.content.messageId.singleQuoted} in block $height, chunk $index and assigned it timestamp $ts"
)(event.traceContext)
(ts, (ts, event) +: events)
case _ =>
logger.info(
s"Observed ${event.value} in block $height, chunk $index at timestamp $lastTs"
)(
event.traceContext
)
(lastTs, (lastTs, event) +: events)
}
}
val fixedTsChanges: Seq[(CantonTimestamp, Traced[LedgerBlockEvent])] = revFixedTsChanges.reverse
(lastTsBeforeValidation, fixedTsChanges)
}
// only accept the provided timestamp if it's strictly greater than the last timestamp
// otherwise just offset the last valid timestamp by 1
private def ensureStrictlyIncreasingTimestamp(
lastTs: CantonTimestamp,
providedTimestamp: CantonTimestamp,
): CantonTimestamp = {
val invariant = providedTimestamp > lastTs
orderingTimeFixMode match {
case OrderingTimeFixMode.ValidateOnly =>
if (!invariant)
sys.error(
"BUG: sequencing timestamps are not strictly monotonically increasing," +
s" last timestamp $lastTs, provided timestamp: $providedTimestamp"
)
providedTimestamp
case OrderingTimeFixMode.MakeStrictlyIncreasing =>
if (invariant) {
providedTimestamp
} else {
lastTs.immediateSuccessor
}
}
}
private def addSnapshots(
latestSequencerEventTimestamp: Option[CantonTimestamp],
sequencersSequencerCounter: Option[SequencerCounter],
submissionRequests: Seq[(CantonTimestamp, Traced[SignedContent[SubmissionRequest]])],
)(implicit executionContext: ExecutionContext): FutureUnlessShutdown[Seq[SequencedSubmission]] =
submissionRequests.parTraverse { case (sequencingTimestamp, tracedSubmissionRequest) =>
tracedSubmissionRequest.withTraceContext { implicit traceContext => submissionRequest =>
// Warn if we use an approximate snapshot but only after we've read at least one
val warnIfApproximate = sequencersSequencerCounter.exists(_ > SequencerCounter.Genesis)
for {
sequencingSnapshot <- SyncCryptoClient.getSnapshotForTimestampUS(
domainSyncCryptoApi,
sequencingTimestamp,
latestSequencerEventTimestamp,
protocolVersion,
warnIfApproximate,
)
topologySnapshotO <- submissionRequest.content.topologyTimestamp match {
case None => FutureUnlessShutdown.pure(None)
case Some(topologyTimestamp) if topologyTimestamp <= sequencingTimestamp =>
SyncCryptoClient
.getSnapshotForTimestampUS(
domainSyncCryptoApi,
topologyTimestamp,
latestSequencerEventTimestamp,
protocolVersion,
warnIfApproximate,
)
.map(Some(_))
case _ => FutureUnlessShutdown.pure(None)
}
} yield SequencedSubmission(
sequencingTimestamp,
submissionRequest,
sequencingSnapshot,
topologySnapshotO,
)(traceContext)
}
}
private def computeNewMembersTraffic(
state: State,
lastTsBeforeValidation: CantonTimestamp,
newMembers: Map[Member, CantonTimestamp],
warnIfApproximate: Boolean,
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[Map[_ <: Member, TrafficState]] =
if (newMembers.nonEmpty) {
// We are using the snapshot at lastTs for all new members in this chunk rather than their registration times.
// In theory, a parameter change could have become effective in between, but we deliberately ignore this for now.
// Moreover, a member is effectively registered when it appears in the topology state with the relevant certificate,
// but the traffic state here is created only when the member sends or receives the first message.
for {
snapshot <- SyncCryptoClient
.getSnapshotForTimestampUS(
client = domainSyncCryptoApi,
desiredTimestamp = lastTsBeforeValidation,
previousTimestampO = state.latestSequencerEventTimestamp,
protocolVersion = protocolVersion,
warnIfApproximate = warnIfApproximate,
)
parameters <- snapshot.ipsSnapshot.trafficControlParameters(protocolVersion)
updatedStates <- parameters match {
case Some(params) =>
newMembers.toList
.parTraverse { case (member, timestamp) =>
rateLimitManager
.createNewTrafficStateAt(
member,
timestamp.immediatePredecessor,
params,
)
.map(member -> _)
}
.map(_.toMap)
case _ =>
FutureUnlessShutdown.pure(
newMembers.view.mapValues { timestamp =>
TrafficState.empty(timestamp)
}.toMap
)
}
} yield updatedStates
} else FutureUnlessShutdown.pure(Map.empty)
private def addNewMembers(
state: State,
height: Long,
index: Int,
newMembers: Map[Member, CantonTimestamp],
acksByMember: Map[Member, CantonTimestamp],
newMembersTraffic: Map[_ <: Member, TrafficState],
)(implicit traceContext: TraceContext): State = {
val newMemberStatus = newMembers.map { case (member, ts) =>
member -> InternalSequencerMemberStatus(ts, None)
}
val newMembersWithAcknowledgements =
acksByMember.foldLeft(state.ephemeral.membersMap ++ newMemberStatus) {
case (membersMap, (member, timestamp)) =>
membersMap
.get(member)
.fold {
logger.debug(
s"Ack at $timestamp for $member (block $height, chunk $index) being ignored because the member has not yet been registered."
)
membersMap
} { memberStatus =>
membersMap.updated(member, memberStatus.copy(lastAcknowledged = Some(timestamp)))
}
}
state
.focus(_.ephemeral.membersMap)
.replace(newMembersWithAcknowledgements)
.focus(_.ephemeral.trafficState)
.modify(_ ++ newMembersTraffic)
}
private def detectMembersWithoutSequencerCounters(
state: BlockUpdateGeneratorImpl.State,
sequencedSubmissions: Seq[SequencedSubmission],
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[Map[Member, CantonTimestamp]] =
sequencedSubmissions
.parFoldMapA { sequencedSubmission =>
val SequencedSubmission(sequencingTimestamp, event, sequencingSnapshot, topologySnapshotO) =
sequencedSubmission
def recipientIsKnown(member: Member): Future[Option[Member]] = {
if (!member.isAuthenticated) Future.successful(None)
else
sequencingSnapshot.ipsSnapshot
.isMemberKnown(member)
.map(Option.when(_)(member))
}
val topologySnapshot = topologySnapshotO.getOrElse(sequencingSnapshot).ipsSnapshot
import event.content.sender
for {
groupToMembers <- FutureUnlessShutdown.outcomeF(
GroupAddressResolver.resolveGroupsToMembers(
event.content.batch.allRecipients.collect { case groupRecipient: GroupRecipient =>
groupRecipient
},
topologySnapshot,
)
)
memberRecipients = event.content.batch.allRecipients.collect {
case MemberRecipient(member) => member
}
eligibleSenders = event.content.aggregationRule.fold(Seq.empty[Member])(
_.eligibleSenders
)
knownMemberRecipientsOrSender <- FutureUnlessShutdown.outcomeF(
(eligibleSenders ++ memberRecipients.toSeq :+ sender)
.parTraverseFilter(recipientIsKnown)
)
} yield {
val knownGroupMembers = groupToMembers.values.flatten
val allowUnauthenticatedSender = Option.when(!sender.isAuthenticated)(sender).toList
val allMembersInSubmission =
Set.empty ++ knownGroupMembers ++ knownMemberRecipientsOrSender ++ allowUnauthenticatedSender
(allMembersInSubmission -- state.ephemeral.registeredMembers)
.map(_ -> sequencingTimestamp)
.toSeq
}
}
.map(
_.groupBy { case (member, _) => member }
.mapFilter { tssForMember => tssForMember.map { case (_, ts) => ts }.minOption }
)
private def processAcknowledgements(
state: State,
fixedTsChanges: Seq[(CantonTimestamp, Traced[LedgerBlockEvent])],
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[
(Map[Member, CantonTimestamp], Seq[(Member, CantonTimestamp, BaseAlarm)])
] =
for {
snapshot <- SyncCryptoClient.getSnapshotForTimestampUS(
domainSyncCryptoApi,
state.lastBlockTs,
state.latestSequencerEventTimestamp,
protocolVersion,
warnIfApproximate = false,
)
allAcknowledgements = fixedTsChanges.collect { case (_, t @ Traced(Acknowledgment(ack))) =>
t.map(_ => ack)
}
(goodTsAcks, futureAcks) = allAcknowledgements.partition { tracedSignedAck =>
// Intentionally use the previous block's last timestamp
// such that the criterion does not depend on how the block events are chunked up.
tracedSignedAck.value.content.timestamp <= state.lastBlockTs
}
invalidTsAcks = futureAcks.map(_.withTraceContext { implicit traceContext => signedAck =>
val ack = signedAck.content
val member = ack.member
val timestamp = ack.timestamp
val error =
SequencerError.InvalidAcknowledgementTimestamp.Error(member, timestamp, state.lastBlockTs)
(member, timestamp, error)
})
sigChecks <- FutureUnlessShutdown.outcomeF(Future.sequence(goodTsAcks.map(_.withTraceContext {
implicit traceContext => signedAck =>
val ack = signedAck.content
signedAck
.verifySignature(
snapshot,
ack.member,
HashPurpose.AcknowledgementSignature,
)
.leftMap(error =>
(
ack.member,
ack.timestamp,
SequencerError.InvalidAcknowledgementSignature
.Error(signedAck, state.lastBlockTs, error): BaseAlarm,
)
)
.map(_ => (ack.member, ack.timestamp))
}.value)))
(invalidSigAcks, validSigAcks) = sigChecks.separate
acksByMember = validSigAcks
// Look for the highest acked timestamp by each member
.groupBy { case (member, _) => member }
.fmap(NonEmptyUtil.fromUnsafe(_).maxBy1(_._2)._2)
} yield (acksByMember, invalidTsAcks ++ invalidSigAcks)
private def recordSubmissionMetrics(
value: Seq[Traced[LedgerBlockEvent]]
)(implicit executionContext: ExecutionContext): Future[Unit] =
Future {
value.foreach(_.withTraceContext { implicit traceContext =>
{
case LedgerBlockEvent.Send(_, signedSubmissionRequest, payloadSize) =>
signedSubmissionRequest.content.content.batch.allRecipients
.foldLeft(RecipientStats()) {
case (acc, MemberRecipient(ParticipantId(_)) | ParticipantsOfParty(_)) =>
acc.copy(participants = true)
case (acc, MemberRecipient(MediatorId(_)) | MediatorGroupRecipient(_)) =>
acc.copy(mediators = true)
case (acc, MemberRecipient(SequencerId(_)) | SequencersOfDomain) =>
acc.copy(sequencers = true)
case (
acc,
MemberRecipient(UnauthenticatedMemberId(_)),
) =>
acc // not used
case (acc, AllMembersOfDomain) => acc.copy(broadcast = true)
}
.updateMetric(
signedSubmissionRequest.content.content.sender,
payloadSize,
logger,
metrics,
)
case LedgerBlockEvent.Acknowledgment(request) =>
// record the event
metrics.blockEvents
.mark()(
MetricsContext(
"sender" -> request.content.member.toString,
"type" -> "ack",
)
)
// record the timestamp of the acknowledgment
metrics
.updateAcknowledgementGauge(
request.content.member.toString,
request.content.timestamp.underlying.micros,
)
}
})
}
}

View File

@ -1,15 +1,15 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.block
package com.digitalasset.canton.domain.block.update
import cats.syntax.functor.*
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.crypto.SyncCryptoApi
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockUpdateGenerator.EventsForSubmissionRequest
import com.digitalasset.canton.domain.block.data.{BlockInfo, BlockUpdateEphemeralState}
import com.digitalasset.canton.domain.block.update.BlockUpdateGenerator.EventsForSubmissionRequest
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencer.LocalEvent
import com.digitalasset.canton.domain.sequencing.sequencer.{
InFlightAggregationUpdates,

View File

@ -0,0 +1,359 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.block.update
import cats.syntax.functor.*
import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import com.daml.metrics.api.MetricsContext
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.config.CantonRequireTypes.String73
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, HashPurpose, SyncCryptoApi}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.domain.block.LedgerBlockEvent.*
import com.digitalasset.canton.domain.block.data.{
BlockEphemeralState,
BlockInfo,
BlockUpdateEphemeralState,
}
import com.digitalasset.canton.domain.block.{BlockEvents, LedgerBlockEvent, RawLedgerBlock}
import com.digitalasset.canton.domain.metrics.BlockMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequestOps
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.InvalidLedgerEvent
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.SequencerRateLimitManager
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.store.SequencedEventStore.OrdinarySequencedEvent
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.digitalasset.canton.util.*
import com.digitalasset.canton.version.ProtocolVersion
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/** Exposes functions that take the deserialized contents of a block from a blockchain integration
* and compute the new [[BlockUpdate]]s.
*
* These functions correspond to the following steps in the block processing stream pipeline:
* 1. Extracting block events from a raw ledger block ([[extractBlockEvents]]).
* 2. Chunking such block events into either event chunks terminated by a sequencer-addessed event or a block
* completion ([[chunkBlock]]).
* 3. Validating and enriching chunks to yield block updates ([[processBlockChunk]]).
* 4. Signing chunked events with the signing key of the sequencer ([[signChunkEvents]]).
*
* In particular, these functions are responsible for the final timestamp assignment of a given submission request.
* The timestamp assignment works as follows:
* 1. an initial timestamp is assigned to the submission request by the sequencer that writes it to the ledger
* 2. each sequencer that reads the block potentially adapts the previously assigned timestamp
* deterministically via `ensureStrictlyIncreasingTimestamp`
* 3. this timestamp is used to compute the [[BlockUpdate]]s
*
* Reasoning:
* Step 1 is done so that every sequencer sees the same timestamp for a given event.
* Step 2 is needed because different sequencers may assign the same timestamps to different events or may not assign
* strictly increasing timestamps due to clock skews.
*
* Invariants:
* For step 2, we assume that every sequencer observes the same stream of events from the underlying ledger
* (and especially that events are always read in the same order).
*/
trait BlockUpdateGenerator {
import BlockUpdateGenerator.*
type InternalState
def internalStateFor(state: BlockEphemeralState): InternalState
def extractBlockEvents(block: RawLedgerBlock): BlockEvents
def chunkBlock(block: BlockEvents)(implicit
traceContext: TraceContext
): immutable.Iterable[BlockChunk]
def processBlockChunk(state: InternalState, chunk: BlockChunk)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate[UnsignedChunkEvents])]
def signChunkEvents(events: UnsignedChunkEvents)(implicit
ec: ExecutionContext
): FutureUnlessShutdown[SignedChunkEvents]
}
object BlockUpdateGenerator {
type EventsForSubmissionRequest = Map[Member, SequencedEvent[ClosedEnvelope]]
type SignedEvents = Map[Member, OrdinarySerializedEvent]
sealed trait BlockChunk extends Product with Serializable
final case class NextChunk(
blockHeight: Long,
chunkIndex: Int,
events: NonEmpty[Seq[Traced[LedgerBlockEvent]]],
) extends BlockChunk
final case class EndOfBlock(blockHeight: Long) extends BlockChunk
}
class BlockUpdateGeneratorImpl(
domainId: DomainId,
protocolVersion: ProtocolVersion,
domainSyncCryptoApi: DomainSyncCryptoClient,
sequencerId: SequencerId,
maybeLowerTopologyTimestampBound: Option[CantonTimestamp],
rateLimitManager: SequencerRateLimitManager,
orderingTimeFixMode: OrderingTimeFixMode,
metrics: BlockMetrics,
protected val loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit val closeContext: CloseContext)
extends BlockUpdateGenerator
with NamedLogging {
import BlockUpdateGenerator.*
import BlockUpdateGeneratorImpl.*
private val blockChunkProcessor =
new BlockChunkProcessor(
domainId,
protocolVersion,
domainSyncCryptoApi,
sequencerId,
rateLimitManager,
orderingTimeFixMode,
loggerFactory,
metrics,
)
override type InternalState = State
override def internalStateFor(state: BlockEphemeralState): InternalState = State(
lastBlockTs = state.latestBlock.lastTs,
lastChunkTs = state.latestBlock.lastTs,
latestSequencerEventTimestamp = state.latestBlock.latestSequencerEventTimestamp,
ephemeral = state.state.toBlockUpdateEphemeralState,
)
override def extractBlockEvents(block: RawLedgerBlock): BlockEvents = {
val ledgerBlockEvents = block.events.mapFilter { tracedEvent =>
implicit val traceContext: TraceContext = tracedEvent.traceContext
LedgerBlockEvent.fromRawBlockEvent(protocolVersion)(tracedEvent.value) match {
case Left(error) =>
InvalidLedgerEvent.Error(block.blockHeight, error).discard
None
case Right(value) =>
Some(Traced(value))
}
}
BlockEvents(block.blockHeight, ledgerBlockEvents)
}
override def chunkBlock(
block: BlockEvents
)(implicit traceContext: TraceContext): immutable.Iterable[BlockChunk] = {
val blockHeight = block.height
metrics.height.updateValue(blockHeight)
// We must start a new chunk whenever the chunk processing advances lastSequencerEventTimestamp
// Otherwise the logic for retrieving a topology snapshot or traffic state could deadlock
IterableUtil
.splitAfter(block.events)(event => isAddressingSequencers(event.value))
.zipWithIndex
.map { case (events, index) =>
NextChunk(blockHeight, index, events)
} ++ Seq(EndOfBlock(blockHeight))
}
private def isAddressingSequencers(event: LedgerBlockEvent): Boolean =
event match {
case Send(_, signedOrderingRequest, _) =>
val allRecipients =
signedOrderingRequest.signedSubmissionRequest.content.batch.allRecipients
allRecipients.contains(AllMembersOfDomain) ||
allRecipients.contains(SequencersOfDomain)
case _ => false
}
override final def processBlockChunk(state: InternalState, chunk: BlockChunk)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[(InternalState, OrderedBlockUpdate[UnsignedChunkEvents])] =
chunk match {
case EndOfBlock(height) =>
val newState = state.copy(lastBlockTs = state.lastChunkTs)
val update = CompleteBlockUpdate(
BlockInfo(height, state.lastChunkTs, state.latestSequencerEventTimestamp)
)
FutureUnlessShutdown.pure(newState -> update)
case NextChunk(height, index, chunksEvents) =>
blockChunkProcessor.processChunk(state, height, index, chunksEvents)
}
override def signChunkEvents(unsignedEvents: UnsignedChunkEvents)(implicit
ec: ExecutionContext
): FutureUnlessShutdown[SignedChunkEvents] = {
val UnsignedChunkEvents(
sender,
events,
signingSnapshot,
sequencingTimestamp,
sequencingSnapshot,
trafficStates,
submissionRequestTraceContext,
) = unsignedEvents
implicit val traceContext: TraceContext = submissionRequestTraceContext
val signingTimestamp = signingSnapshot.ipsSnapshot.timestamp
val signedEventsF =
maybeLowerTopologyTimestampBound match {
case Some(bound) if bound > signingTimestamp =>
// As the required topology snapshot timestamp is older than the lower topology timestamp bound, the timestamp
// of this sequencer's very first topology snapshot, tombstone the events. This enables subscriptions to signal to
// subscribers that this sequencer is not in a position to serve the events behind these sequencer counters.
// Comparing against the lower signing timestamp bound prevents tombstones in "steady-state" sequencing beyond
// "soon" after initial sequencer onboarding. See #13609
events.toSeq.parTraverse { case (member, event) =>
logger.info(
s"Sequencing tombstone for ${member.identifier} at ${event.timestamp} and ${event.counter}. Sequencer signing key at $signingTimestamp not available before the bound $bound."
)
// sign tombstones using key valid at sequencing timestamp as event timestamp has no signing key and we
// are not sequencing the event anyway, but the tombstone
val err = DeliverError.create(
event.counter,
sequencingTimestamp, // use sequencing timestamp for tombstone
domainId,
MessageId(String73.tryCreate("tombstone")), // dummy message id
SequencerErrors.PersistTombstone(event.timestamp, event.counter),
protocolVersion,
)
for {
signedContent <-
SignedContent
.create(
sequencingSnapshot.pureCrypto,
sequencingSnapshot,
err,
Some(sequencingSnapshot.ipsSnapshot.timestamp),
HashPurpose.SequencedEventSignature,
protocolVersion,
)
.valueOr { syncCryptoError =>
ErrorUtil.internalError(
new RuntimeException(
s"Error signing tombstone deliver error: $syncCryptoError"
)
)
}
} yield {
member -> OrdinarySequencedEvent(signedContent, None)(traceContext)
}
}
case _ =>
events.toSeq
.parTraverse { case (member, event) =>
SignedContent
.create(
signingSnapshot.pureCrypto,
signingSnapshot,
event,
None,
HashPurpose.SequencedEventSignature,
protocolVersion,
)
.valueOr(syncCryptoError =>
ErrorUtil.internalError(
new RuntimeException(s"Error signing events: $syncCryptoError")
)
)
.map { signedContent =>
// only include traffic state for the sender
val trafficStateO = Option.when(!unifiedSequencer || member == sender) {
trafficStates.getOrElse(
member,
ErrorUtil.invalidState(s"Sender $member unknown by rate limiter."),
)
}
member ->
OrdinarySequencedEvent(signedContent, trafficStateO)(traceContext)
}
}
}
signedEventsF.map(signedEvents => SignedChunkEvents(signedEvents.toMap))
}
}
object BlockUpdateGeneratorImpl {
type SignedEvents = NonEmpty[Map[Member, OrdinarySerializedEvent]]
type EventsForSubmissionRequest = Map[Member, SequencedEvent[ClosedEnvelope]]
private[block] final case class State(
lastBlockTs: CantonTimestamp,
lastChunkTs: CantonTimestamp,
latestSequencerEventTimestamp: Option[CantonTimestamp],
ephemeral: BlockUpdateEphemeralState,
)
private[update] final case class SequencedSubmission(
sequencingTimestamp: CantonTimestamp,
submissionRequest: SignedContent[SubmissionRequest],
sequencingSnapshot: SyncCryptoApi,
topologySnapshotO: Option[SyncCryptoApi],
)(val traceContext: TraceContext)
private[update] final case class RecipientStats(
participants: Boolean = false,
mediators: Boolean = false,
sequencers: Boolean = false,
broadcast: Boolean = false,
) {
private[block] def updateMetric(
sender: Member,
payloadSize: Int,
logger: TracedLogger,
metrics: BlockMetrics,
)(implicit traceContext: TraceContext): Unit = {
val messageType = {
// by looking at the recipient lists and the sender, we'll figure out what type of message we've been getting
(sender, participants, mediators, sequencers, broadcast) match {
case (ParticipantId(_), false, true, false, false) =>
"send-confirmation-response"
case (ParticipantId(_), true, true, false, false) =>
"send-confirmation-request"
case (MediatorId(_), true, false, false, false) =>
"send-verdict"
case (ParticipantId(_), true, false, false, false) =>
"send-commitment"
case (SequencerId(_), true, false, true, false) =>
"send-topup"
case (SequencerId(_), false, true, true, false) =>
"send-topup-med"
case (_, false, false, false, true) =>
"send-topology"
case (_, false, false, false, false) =>
"send-time-proof"
case _ =>
def r(boolean: Boolean, s: String) = if (boolean) Seq(s) else Seq.empty
val recipients = r(participants, "participants") ++
r(mediators, "mediators") ++
r(sequencers, "sequencers") ++
r(broadcast, "broadcast")
logger.warn(s"Unexpected message from $sender to " + recipients.mkString(","))
"send-unexpected"
}
}
val mc = MetricsContext(
"sender" -> sender.toString,
"type" -> messageType,
)
metrics.blockEvents.mark()(mc)
metrics.blockEventBytes.mark(payloadSize.longValue)(mc)
}
}
}

View File

@ -0,0 +1,264 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.domain.block.update
import cats.syntax.functor.*
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, SyncCryptoApi}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.data.BlockUpdateEphemeralState
import com.digitalasset.canton.domain.block.update.BlockUpdateGeneratorImpl.{
SequencedSubmission,
State,
}
import com.digitalasset.canton.domain.block.update.SequencedSubmissionsValidator.SequencedSubmissionsValidationResult
import com.digitalasset.canton.domain.block.update.SubmissionRequestValidator.SubmissionRequestValidationResult
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.CounterCheckpoint
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.SequencerRateLimitManager
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{ErrorUtil, MapsUtil, MonadUtil}
import com.digitalasset.canton.version.ProtocolVersion
import scala.concurrent.ExecutionContext
/** Validates a list of [[SequencedSubmission]]s corresponding to a chunk.
*/
private[update] final class SequencedSubmissionsValidator(
domainId: DomainId,
protocolVersion: ProtocolVersion,
domainSyncCryptoApi: DomainSyncCryptoClient,
sequencerId: SequencerId,
rateLimitManager: SequencerRateLimitManager,
override val loggerFactory: NamedLoggerFactory,
)(implicit closeContext: CloseContext)
extends NamedLogging {
private val submissionRequestValidator =
new SubmissionRequestValidator(
domainId,
protocolVersion,
domainSyncCryptoApi,
sequencerId,
rateLimitManager,
loggerFactory,
)
def validateSequencedSubmissions(
state: State,
height: Long,
submissionRequestsWithSnapshots: Seq[SequencedSubmission],
)(implicit ec: ExecutionContext): FutureUnlessShutdown[SequencedSubmissionsValidationResult] =
MonadUtil.foldLeftM(
initialState = SequencedSubmissionsValidationResult(ephemeralState = state.ephemeral),
submissionRequestsWithSnapshots,
)(validateSequencedSubmissionAndAddEvents(state.latestSequencerEventTimestamp, height))
/** @param latestSequencerEventTimestamp
* Since each chunk contains at most one event addressed to the sequencer,
* (and if so it's the last event), we can treat this timestamp static for the whole chunk and
* need not update it in the accumulator.
*/
private def validateSequencedSubmissionAndAddEvents(
latestSequencerEventTimestamp: Option[CantonTimestamp],
height: Long,
)(
partialResult: SequencedSubmissionsValidationResult,
sequencedSubmissionRequest: SequencedSubmission,
)(implicit ec: ExecutionContext): FutureUnlessShutdown[SequencedSubmissionsValidationResult] = {
val SequencedSubmissionsValidationResult(
stateFromPartialResult,
reversedEvents,
inFlightAggregationUpdates,
sequencerEventTimestampSoFar,
reversedOutcomes,
) = partialResult
val SequencedSubmission(
sequencingTimestamp,
signedSubmissionRequest,
sequencingSnapshot,
signingSnapshot,
) = sequencedSubmissionRequest
implicit val traceContext: TraceContext = sequencedSubmissionRequest.traceContext
ErrorUtil.requireState(
sequencerEventTimestampSoFar.isEmpty,
"Only the last event in a chunk could be addressed to the sequencer",
)
for {
newStateAndOutcome <-
submissionRequestValidator.validateAndGenerateSequencedEvents(
stateFromPartialResult,
sequencingTimestamp,
signedSubmissionRequest,
sequencingSnapshot,
signingSnapshot,
latestSequencerEventTimestamp,
)
SubmissionRequestValidationResult(newState, outcome, sequencerEventTimestamp) =
newStateAndOutcome
result <-
processSubmissionOutcome(
newState,
outcome,
resultIfNoDeliverEvents = partialResult,
inFlightAggregationUpdates,
sequencingSnapshot,
sequencingTimestamp,
sequencerEventTimestamp,
latestSequencerEventTimestamp,
signedSubmissionRequest,
remainingReversedEvents = reversedEvents,
remainingReversedOutcomes = reversedOutcomes,
)
_ = logger.debug(
s"At block $height, the submission request ${signedSubmissionRequest.content.messageId} " +
s"at $sequencingTimestamp created the following counters: \n" ++ outcome.eventsByMember
.map { case (member, sequencedEvent) =>
s"\t counter ${sequencedEvent.counter} for $member"
}
.mkString("\n")
)
} yield result
}
private def updateTrafficStates(
ephemeralState: BlockUpdateEphemeralState,
members: Set[Member],
sequencingTimestamp: CantonTimestamp,
snapshot: SyncCryptoApi,
latestTopologyTimestamp: Option[CantonTimestamp],
)(implicit
ec: ExecutionContext,
tc: TraceContext,
): FutureUnlessShutdown[BlockUpdateEphemeralState] =
snapshot.ipsSnapshot
.trafficControlParameters(protocolVersion)
.flatMap {
case Some(parameters) =>
val states = members
.flatMap(member => ephemeralState.trafficState.get(member).map(member -> _))
.toMap
rateLimitManager
.getUpdatedTrafficStatesAtTimestamp(
states,
sequencingTimestamp,
parameters,
latestTopologyTimestamp,
warnIfApproximate = ephemeralState.headCounterAboveGenesis(sequencerId),
)
.map { trafficStateUpdates =>
ephemeralState
.copy(trafficState =
ephemeralState.trafficState ++ trafficStateUpdates.view.mapValues(_.state).toMap
)
}
case _ => FutureUnlessShutdown.pure(ephemeralState)
}
private def processSubmissionOutcome(
state: BlockUpdateEphemeralState,
outcome: SubmissionRequestOutcome,
resultIfNoDeliverEvents: SequencedSubmissionsValidationResult,
inFlightAggregationUpdates: InFlightAggregationUpdates,
sequencingSnapshot: SyncCryptoApi,
sequencingTimestamp: CantonTimestamp,
sequencerEventTimestamp: Option[CantonTimestamp],
latestSequencerEventTimestamp: Option[CantonTimestamp],
signedSubmissionRequest: SignedContent[SubmissionRequest],
remainingReversedEvents: Seq[UnsignedChunkEvents],
remainingReversedOutcomes: Seq[SubmissionRequestOutcome],
)(implicit
ec: ExecutionContext,
traceContext: TraceContext,
): FutureUnlessShutdown[SequencedSubmissionsValidationResult] = {
val SubmissionRequestOutcome(
deliverEvents,
newAggregationO,
signingSnapshotO,
_,
) = outcome
NonEmpty.from(deliverEvents) match {
case None => // No state update if there is nothing to deliver
FutureUnlessShutdown.pure(resultIfNoDeliverEvents)
case Some(deliverEventsNE) =>
val newCheckpoints = state.checkpoints ++ deliverEvents.fmap(d =>
CounterCheckpoint(d.counter, d.timestamp, None)
) // ordering of the two operands matters
val (newInFlightAggregations, newInFlightAggregationUpdates) =
newAggregationO.fold(state.inFlightAggregations -> inFlightAggregationUpdates) {
case (aggregationId, inFlightAggregationUpdate) =>
InFlightAggregations.tryApplyUpdates(
state.inFlightAggregations,
Map(aggregationId -> inFlightAggregationUpdate),
ignoreInFlightAggregationErrors = false,
) ->
MapsUtil.extendedMapWith(
inFlightAggregationUpdates,
Iterable(aggregationId -> inFlightAggregationUpdate),
)(_ tryMerge _)
}
val newState =
state.copy(
inFlightAggregations = newInFlightAggregations,
checkpoints = newCheckpoints,
)
// If we haven't yet computed a snapshot for signing,
// we now get one for the sequencing timestamp
val signingSnapshot = signingSnapshotO.getOrElse(sequencingSnapshot)
for {
// Update the traffic status of the recipients before generating the events below.
// Typically traffic state might change even for recipients if a top up becomes effective at that timestamp
// Doing this here ensures that the traffic state persisted for the event is correct
// It's also important to do this here after group -> Set[member] resolution has been performed so we get
// the actual member recipients
trafficUpdatedState <-
updateTrafficStates(
newState,
deliverEventsNE.keySet,
sequencingTimestamp,
sequencingSnapshot,
latestSequencerEventTimestamp,
)
} yield {
val unsignedEvents = UnsignedChunkEvents(
signedSubmissionRequest.content.sender,
deliverEventsNE,
signingSnapshot,
sequencingTimestamp,
sequencingSnapshot,
trafficUpdatedState.trafficState.view.mapValues(_.toSequencedEventTrafficState),
traceContext,
)
SequencedSubmissionsValidationResult(
trafficUpdatedState,
unsignedEvents +: remainingReversedEvents,
newInFlightAggregationUpdates,
sequencerEventTimestamp,
outcome +: remainingReversedOutcomes,
)
}
}
}
}
private[update] object SequencedSubmissionsValidator {
final case class SequencedSubmissionsValidationResult(
ephemeralState: BlockUpdateEphemeralState,
reversedSignedEvents: Seq[UnsignedChunkEvents] = Seq.empty,
inFlightAggregationUpdates: InFlightAggregationUpdates = Map.empty,
lastSequencerEventTimestamp: Option[CantonTimestamp] = None,
reversedOutcomes: Seq[SubmissionRequestOutcome] = Seq.empty,
)
}

View File

@ -29,13 +29,11 @@ import com.digitalasset.canton.domain.mediator.store.{
import com.digitalasset.canton.domain.metrics.MediatorMetrics
import com.digitalasset.canton.domain.service.GrpcSequencerConnectionService
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.admin.data.MediatorNodeStatus
import com.digitalasset.canton.health.{
ComponentStatus,
DependenciesHealthService,
GrpcHealthReporter,
LivenessHealthService,
MutableHealthComponent,
import com.digitalasset.canton.health.*
import com.digitalasset.canton.health.admin.data.{
MediatorNodeStatus,
WaitingForExternalInput,
WaitingForInitialization,
}
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, HasCloseContext, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
@ -278,6 +276,10 @@ class MediatorNodeBootstrap(
)
}
override def waitingFor: Option[WaitingForExternalInput] = Some(
WaitingForInitialization
)
override protected def autoCompleteStage()
: EitherT[FutureUnlessShutdown, String, Option[DomainId]] =
EitherT.rightT(None) // this stage doesn't have auto-init

View File

@ -30,7 +30,12 @@ import com.digitalasset.canton.domain.sequencing.sequencer.store.{
import com.digitalasset.canton.domain.sequencing.service.GrpcSequencerInitializationService
import com.digitalasset.canton.domain.server.DynamicDomainGrpcServer
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.admin.data.{SequencerHealthStatus, SequencerNodeStatus}
import com.digitalasset.canton.health.admin.data.{
SequencerHealthStatus,
SequencerNodeStatus,
WaitingForExternalInput,
WaitingForInitialization,
}
import com.digitalasset.canton.health.{
ComponentStatus,
DependenciesHealthService,
@ -320,6 +325,9 @@ class SequencerNodeBootstrap(
}
}
override def waitingFor: Option[WaitingForExternalInput] =
Some(WaitingForInitialization)
override protected def autoCompleteStage(): EitherT[FutureUnlessShutdown, String, Option[
(StaticDomainParameters, SequencerFactory, DomainTopologyManager)
]] =

View File

@ -11,12 +11,9 @@ import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, HashPurpose, Signature}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockSequencerStateManagerBase
import com.digitalasset.canton.domain.block.data.SequencerBlockStore
import com.digitalasset.canton.domain.block.{
BlockSequencerStateManagerBase,
BlockUpdateGeneratorImpl,
LocalBlockUpdate,
}
import com.digitalasset.canton.domain.block.update.{BlockUpdateGeneratorImpl, LocalBlockUpdate}
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.PruningError.UnsafePruningPoint
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.{

View File

@ -543,12 +543,7 @@ class GrpcSequencerService(
sender match {
// Rate limiting only if participants send to participants.
case participantId: ParticipantId if request.batch.allRecipients.exists {
case AllMembersOfDomain => true
case MemberRecipient(_: ParticipantId) => true
case ParticipantsOfParty(_) => true
case _: Recipient => false
} =>
case participantId: ParticipantId if request.isConfirmationRequest =>
for {
confirmationRequestsMaxRate <- EitherTUtil
.fromFuture(

View File

@ -16,16 +16,18 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockSequencerStateManager.ChunkState
import com.digitalasset.canton.domain.block.data.memory.InMemorySequencerBlockStore
import com.digitalasset.canton.domain.block.data.{BlockEphemeralState, BlockInfo, EphemeralState}
import com.digitalasset.canton.domain.block.update.{
BlockUpdate,
BlockUpdateGenerator,
OrderedBlockUpdate,
SignedChunkEvents,
}
import com.digitalasset.canton.domain.block.{
BlockEvents,
BlockSequencerStateManager,
BlockSequencerStateManagerBase,
BlockUpdate,
BlockUpdateGenerator,
OrderedBlockUpdate,
RawLedgerBlock,
SequencerDriverHealthStatus,
SignedChunkEvents,
}
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequest

View File

@ -222,6 +222,26 @@ class GrpcSequencerServiceTest
)
}
private lazy val defaultConfirmationRequest: SubmissionRequest = {
val sender: Member = participant
val recipientPar = MemberRecipient(DefaultTestIdentities.participant2)
val recipientMed = MediatorGroupRecipient(NonNegativeInt.zero)
mkSubmissionRequest(
Batch(
List(
ClosedEnvelope.create(
content,
Recipients.cc(recipientPar, recipientMed),
Seq.empty,
testedProtocolVersion,
)
),
testedProtocolVersion,
),
sender,
)
}
"send signed" should {
def signedSubmissionReq(
request: SubmissionRequest
@ -438,14 +458,14 @@ class GrpcSequencerServiceTest
)
}
"reject on rate excess" in { implicit env =>
"reject on confirmation rate excess" in { implicit env =>
def expectSuccess(): Future[Assertion] = {
sendAndCheckSucceed(defaultRequest)
sendAndCheckSucceed(defaultConfirmationRequest)
}
def expectOneSuccessOneOverloaded(): Future[Assertion] = {
val result1F = send(defaultRequest, authenticated = true)
val result2F = send(defaultRequest, authenticated = true)
val result1F = send(defaultConfirmationRequest, authenticated = true)
val result2F = send(defaultConfirmationRequest, authenticated = true)
for {
result1 <- result1F
result2 <- result2F
@ -461,8 +481,9 @@ class GrpcSequencerServiceTest
case (Some(error), None) => assertOverloadedError(error)
case (None, Some(error)) => assertOverloadedError(error)
case (Some(_), Some(_)) =>
fail("at least one successful submition expected, but both failed")
case (None, None) => fail("at least one overloaded submition expected, but none failed")
fail("at least one successful submission expected, but both failed")
case (None, None) =>
fail("at least one overloaded submission expected, but none failed")
}
}
}

View File

@ -1 +1 @@
20240520.13324.vdf52c631
20240521.13331.v449e982c