update canton to 20240530.13394.v9f8b7db6 (#19309)

* update canton to 20240530.13394.v9f8b7db6

tell-slack: canton

* sync canton to a6f07355d1

* sync to canton 5646e99c55

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Paul Brauner <paul.brauner@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-05-31 18:10:09 +00:00 committed by GitHub
parent 4ba35794d2
commit 60488be037
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
100 changed files with 1447 additions and 512 deletions

View File

@ -25,6 +25,17 @@ service InspectionService {
rpc LookupOffsetByTime(LookupOffsetByTime.Request) returns (LookupOffsetByTime.Response);
// Look up the ledger offset by an index, e.g. 1 returns the first offset, 2 the second, etc.
rpc LookupOffsetByIndex(LookupOffsetByIndex.Request) returns (LookupOffsetByIndex.Response);
// Configure metrics for slow counter-participants (i.e., that are behind in sending commitments) and
// configure thresholds for when a counter-participant is deemed slow.
// TODO(#10436) R7
rpc SetConfigForSlowCounterParticipants(SetConfigForSlowCounterParticipants.Request) returns (SetConfigForSlowCounterParticipants.Response);
// Get the current configuration for metrics for slow counter-participants.
// TODO(#10436) R7
rpc GetConfigForSlowCounterParticipants(GetConfigForSlowCounterParticipants.Request) returns (GetConfigForSlowCounterParticipants.Response);
// Get the number of intervals that counter-participants are behind in sending commitments.
// Can be used to decide whether to ignore slow counter-participants w.r.t. pruning.
// TODO(#10436) R7
rpc GetIntervalsBehindForCounterParticipants(GetIntervalsBehindForCounterParticipants.Request) returns (GetIntervalsBehindForCounterParticipants.Response);
}
message LookupContractDomain {
@ -78,3 +89,71 @@ message LookupOffsetByIndex {
string offset = 1;
}
}
/*
The configuration concerns the following metrics, and each of the metrics is issued per domain:
- The maximum number of intervals that a distiguished participant falls behind
- The maximum number of intervals that a participant in the "default" group falls behind
- The number of participants in the distiguished group that are behind by at least `thresholdDistiguished`
reconciliation intervals.
- The number of participants in the "default" group that are behind by at least `thresholdDefault`
reconciliation intervals.
- Selected participants for which we publish independent metrics counting how many intervals they are behind
*/
message SlowCounterParticipantDomainConfig {
// the domains for which we apply the settings below
repeated string domain_ids = 1;
// all non-distinguished counter-participants are implicitly part of the group "default"
repeated string distinguished_participant_uids = 2;
// the number of reconciliation intervals for a distiguished counter-participant to be considered slow
uint64 threshold_distinguished = 3;
// the number of reconciliation intervals for an "other" counter-participant to be considered slow
uint64 threshold_default = 4;
// participants for which we publish independent metrics counting how many intervals they are behind
repeated string participant_uids_metrics = 5;
}
message SetConfigForSlowCounterParticipants {
message Request {
// we can specify multiple configurations, each being applied to one or more domains
// if the domain IDs in the different configs overlap, the latest config in the order of the "configs" takes precedence
repeated SlowCounterParticipantDomainConfig configs = 1;
}
message Response {}
}
message GetConfigForSlowCounterParticipants {
message Request {
// filters by domains
repeated string domain_ids = 2;
}
message Response {
repeated SlowCounterParticipantDomainConfig configs = 1;
}
}
message GetIntervalsBehindForCounterParticipants {
message CounterParticipantInfo {
string counter_participant_uid = 1;
string domain_id = 2;
// MaxInt means that the counter-participant never sent a commitment
uint64 intervals_behind = 3;
// the "as of" sequencing timestamp at which this information was obtained
google.protobuf.Timestamp as_of_sequencing_timestamp = 4;
}
message Request {
// if empty, all counter-participants are considered
repeated string counter_participant_uids = 1;
// if empty, all domains are considered
repeated string domain_ids = 2;
// if set, only counter-participants that are behind by at least this number of intervals are returned
optional uint64 threshold = 3;
}
message Response {
repeated CounterParticipantInfo intervals_behind = 1;
}
}

View File

@ -51,6 +51,22 @@ service PruningService {
// Retrieve the automatic, participant-specific pruning configuration.
rpc GetParticipantSchedule(com.digitalasset.canton.admin.pruning.v30.GetParticipantSchedule.Request) returns (com.digitalasset.canton.admin.pruning.v30.GetParticipantSchedule.Response);
// TODO(#18453) R6
// Disable waiting for commitments from the given counter-participants
// Disabling waiting for commitments disregards these counter-participants w.r.t. pruning, which gives up
// non-repudiation for those counter-participants, but increases pruning resilience to failures
// and slowdowns of those counter-participants and/or the network
rpc SetNoWaitCommitmentsFrom(com.digitalasset.canton.admin.pruning.v30.SetNoWaitCommitmentsFrom.Request) returns (com.digitalasset.canton.admin.pruning.v30.SetNoWaitCommitmentsFrom.Response);
// TODO(#18453) R6
// Enable waiting for commitments from the given counter-participants
// Waiting for commitments is the default behavior; explicitly enabling it is useful if it was explicitly disabled
rpc ResetNoWaitCommitmentsFrom(com.digitalasset.canton.admin.pruning.v30.ResetNoWaitCommitmentsFrom.Request) returns (com.digitalasset.canton.admin.pruning.v30.ResetNoWaitCommitmentsFrom.Response);
// TODO(#18453) R6
// Retrieve the configuration of waiting for commitments from counter-participants
rpc GetNoWaitCommitmentsFrom(com.digitalasset.canton.admin.pruning.v30.GetNoWaitCommitmentsFrom.Request) returns (com.digitalasset.canton.admin.pruning.v30.GetNoWaitCommitmentsFrom.Response);
}
message PruneRequest {

View File

@ -84,3 +84,93 @@ message LocatePruningTimestamp {
google.protobuf.Timestamp timestamp = 1;
}
}
// overwrites previous no-wait-configuration on the given domains for the given counter participants
message SetNoWaitCommitmentsFrom {
message Request {
// ignore the given participants; an empty list has no effect
repeated string counter_participant_uids = 1;
// pruning calls with an offset larger or equal to (the given pruning_offset/ to the offset equivalent to the given
// sequencing time) will disregard the given counter participants
oneof timestamp_or_offset {
google.protobuf.Timestamp sequencing_timestamp = 3;
string pruning_offset = 4;
}
// ignore the given participants on these domains
// an empty list has no effect
repeated string domain_ids = 5;
}
message Response {
// counter participants for which we updated the no-wait-config and on which domains
map <string, Domains> participant_domains_mapping = 1;
}
}
// deletes previous no-wait-configuration on the given domains for the given counter participants
message ResetNoWaitCommitmentsFrom {
message Request {
// an empty list has no effect
repeated string counter_participant_uids = 1;
// an empty list has no effect
repeated string domain_ids = 2;
}
message Response {
// counter participants for which we updated the no-wait-config and on which domains
map <string, Domains> participant_domains_mapping = 1;
}
}
message Domains {
repeated string domain_ids = 1;
}
// Returns the state of no-wait-config at the time the request executes, with optional filtering on domains and
// counter participants
// The response includes the status of the specified counter participants on the specified domains, regardless of
// whether these counter participants have shared contracts with the participant at the time the call executes.
// If no counter participants / domains are specified, the response includes all counter participants that are
// known to the local participant at the time when the call executes, regardless of whether they have shared contracts
// with the participant at the time the call executes.
// Even if some participants may not be connected to some domains at the time the query executes, the response still
// includes them if they are known to the participant or specified in the arguments.
message GetNoWaitCommitmentsFrom {
message Request {
repeated string domain_ids = 1;
repeated string participant_uids = 2;
}
message Response {
// participants that are ignored
repeated NoWaitCommitmentsSetup ignored_participants = 1;
// participants that are not ignored
repeated WaitCommitmentsSetup not_ignored_participants = 2;
}
}
message NoWaitCommitmentsSetup {
string counter_participant_uid = 1;
// since when the counter participant is ignored
oneof timestamp_or_offset_active {
google.protobuf.Timestamp sequencing_timestamp = 2;
string pruning_offset = 3;
}
Domains domain_ids = 4;
SharedContractsState counter_participant_state = 5;
}
message WaitCommitmentsSetup {
string counter_participant_uid = 1;
Domains domain_ids = 2;
SharedContractsState counter_participant_state = 3;
}
enum SharedContractsState {
STATE_UNSPECIFIED = 0;
// the participant has shared contracts with the counter participant
SHARED_CONTRACTS = 1;
// the participant knows the counter participant but has no shared contracts
NO_SHARED_CONTRACTS = 2;
}

View File

@ -29,7 +29,8 @@ import com.digitalasset.canton.admin.participant.v30.ResourceManagementServiceGr
import com.digitalasset.canton.admin.participant.v30.TransferServiceGrpc.TransferServiceStub
import com.digitalasset.canton.admin.participant.v30.{ResourceLimits as _, *}
import com.digitalasset.canton.admin.pruning
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.admin.pruning.v30.{NoWaitCommitmentsSetup, WaitCommitmentsSetup}
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.participant.admin.ResourceLimits
@ -38,11 +39,13 @@ import com.digitalasset.canton.participant.admin.grpc.{
TransferSearchResult,
}
import com.digitalasset.canton.participant.domain.DomainConnectionConfig as CDomainConnectionConfig
import com.digitalasset.canton.participant.pruning.AcsCommitmentProcessor.SharedContractsState
import com.digitalasset.canton.participant.sync.UpstreamOffsetConvert
import com.digitalasset.canton.protocol.LfContractId
import com.digitalasset.canton.sequencing.SequencerConnectionValidation
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.InstantConverter
import com.digitalasset.canton.topology.{DomainId, PartyId}
import com.digitalasset.canton.topology.{DomainId, ParticipantId, PartyId}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.traffic.MemberTrafficStatus
import com.digitalasset.canton.util.BinaryFileUtil
@ -1007,6 +1010,160 @@ object ParticipantAdminCommands {
Right(response.offset)
}
// TODO(#10436) R7: The code below should be sufficient.
final case class SetConfigForSlowCounterParticipants(
configs: Seq[SlowCounterParticipantDomainConfig]
) extends Base[
v30.SetConfigForSlowCounterParticipants.Request,
v30.SetConfigForSlowCounterParticipants.Response,
Unit,
] {
override def createRequest() = Right(
v30.SetConfigForSlowCounterParticipants
.Request(
configs.map(_.toProtoV30)
)
)
override def submitRequest(
service: InspectionServiceStub,
request: v30.SetConfigForSlowCounterParticipants.Request,
): Future[v30.SetConfigForSlowCounterParticipants.Response] =
service.setConfigForSlowCounterParticipants(request)
override def handleResponse(
response: v30.SetConfigForSlowCounterParticipants.Response
): Right[
String,
Unit,
] = Right(())
}
final case class SlowCounterParticipantDomainConfig(
domainIds: Seq[DomainId],
distinguishedParticipants: Seq[ParticipantId],
thresholdDistinguished: NonNegativeInt,
thresholdDefault: NonNegativeInt,
participantsMetrics: Seq[ParticipantId],
) {
def toProtoV30: v30.SlowCounterParticipantDomainConfig = {
v30.SlowCounterParticipantDomainConfig(
domainIds.map(_.toProtoPrimitive),
distinguishedParticipants.map(_.toProtoPrimitive),
thresholdDistinguished.value.toLong,
thresholdDefault.value.toLong,
participantsMetrics.map(_.toProtoPrimitive),
)
}
}
object SlowCounterParticipantDomainConfig {
def fromProtoV30(
config: v30.SlowCounterParticipantDomainConfig
): Either[String, SlowCounterParticipantDomainConfig] = {
val thresholdDistinguished = NonNegativeInt.tryCreate(config.thresholdDistinguished.toInt)
val thresholdDefault = NonNegativeInt.tryCreate(config.thresholdDefault.toInt)
val distinguishedParticipants =
config.distinguishedParticipantUids.map(ParticipantId.tryFromProtoPrimitive)
val participantsMetrics =
config.participantUidsMetrics.map(ParticipantId.tryFromProtoPrimitive)
for {
domainIds <- config.domainIds.map(DomainId.fromString).sequence
} yield SlowCounterParticipantDomainConfig(
domainIds,
distinguishedParticipants,
thresholdDistinguished,
thresholdDefault,
participantsMetrics,
)
}
}
// TODO(#10436) R7: The code below should be sufficient.
final case class GetConfigForSlowCounterParticipants(
domainIds: Seq[DomainId]
) extends Base[
v30.GetConfigForSlowCounterParticipants.Request,
v30.GetConfigForSlowCounterParticipants.Response,
Seq[SlowCounterParticipantDomainConfig],
] {
override def createRequest() = Right(
v30.GetConfigForSlowCounterParticipants
.Request(
domainIds.map(_.toProtoPrimitive)
)
)
override def submitRequest(
service: InspectionServiceStub,
request: v30.GetConfigForSlowCounterParticipants.Request,
): Future[v30.GetConfigForSlowCounterParticipants.Response] =
service.getConfigForSlowCounterParticipants(request)
override def handleResponse(
response: v30.GetConfigForSlowCounterParticipants.Response
): Either[String, Seq[SlowCounterParticipantDomainConfig]] =
response.configs.map(SlowCounterParticipantDomainConfig.fromProtoV30).sequence
}
final case class CounterParticipantInfo(
participantId: ParticipantId,
domainId: DomainId,
intervalsBehind: PositiveInt,
asOfSequencingTimestamp: Instant,
)
// TODO(#10436) R7: The code below should be sufficient.
final case class GetIntervalsBehindForCounterParticipants(
counterParticipants: Seq[ParticipantId],
domainIds: Seq[DomainId],
threshold: NonNegativeInt,
) extends Base[
v30.GetIntervalsBehindForCounterParticipants.Request,
v30.GetIntervalsBehindForCounterParticipants.Response,
Seq[CounterParticipantInfo],
] {
override def createRequest() = Right(
v30.GetIntervalsBehindForCounterParticipants
.Request(
counterParticipants.map(_.toProtoPrimitive),
domainIds.map(_.toProtoPrimitive),
Some(threshold.value.toLong),
)
)
override def submitRequest(
service: InspectionServiceStub,
request: v30.GetIntervalsBehindForCounterParticipants.Request,
): Future[v30.GetIntervalsBehindForCounterParticipants.Response] =
service.getIntervalsBehindForCounterParticipants(request)
override def handleResponse(
response: v30.GetIntervalsBehindForCounterParticipants.Response
): Either[String, Seq[CounterParticipantInfo]] = {
response.intervalsBehind.map { info =>
for {
domainId <- DomainId.fromString(info.domainId)
asOf <- ProtoConverter
.parseRequired(
CantonTimestamp.fromProtoTimestamp,
"as_of",
info.asOfSequencingTimestamp,
)
.leftMap(_.toString)
} yield CounterParticipantInfo(
ParticipantId.tryFromProtoPrimitive(info.counterParticipantUid),
domainId,
PositiveInt.tryCreate(info.intervalsBehind.toInt),
asOf.toInstant,
)
}.sequence
}
}
}
object Pruning {
@ -1106,6 +1263,245 @@ object ParticipantAdminCommands {
}
}
// TODO(#18453) R6: The code below should be sufficient.
final case class SetNoWaitCommitmentsFrom(
counterParticipants: Seq[ParticipantId],
domainIds: Seq[DomainId],
startingAt: Either[Instant, ParticipantOffset],
) extends Base[
pruning.v30.SetNoWaitCommitmentsFrom.Request,
pruning.v30.SetNoWaitCommitmentsFrom.Response,
Map[ParticipantId, Seq[DomainId]],
] {
override def createRequest(): Either[String, pruning.v30.SetNoWaitCommitmentsFrom.Request] = {
for {
tsOrOffset <- startingAt match {
case Right(offset) =>
offset.value match {
case Value.Absolute(value) =>
Right(Right(value).withLeft[CantonTimestamp]).withLeft[String]
case other => Left(s"Unable to convert ledger_end `$other` to absolute value")
}
case Left(ts) =>
CantonTimestamp.fromInstant(ts) match {
case Left(value) => Left(value)
case Right(value) => Right(Left(value).withRight[String]).withLeft[String]
}
}
} yield pruning.v30.SetNoWaitCommitmentsFrom.Request(
counterParticipants.map(_.toProtoPrimitive),
tsOrOffset match {
case Left(ts) =>
pruning.v30.SetNoWaitCommitmentsFrom.Request.TimestampOrOffset
.SequencingTimestamp(ts.toProtoTimestamp)
case Right(offset) =>
pruning.v30.SetNoWaitCommitmentsFrom.Request.TimestampOrOffset.PruningOffset(offset)
},
domainIds.map(_.toProtoPrimitive),
)
}
override def submitRequest(
service: Svc,
request: pruning.v30.SetNoWaitCommitmentsFrom.Request,
): Future[pruning.v30.SetNoWaitCommitmentsFrom.Response] =
service.setNoWaitCommitmentsFrom(request)
override def handleResponse(
response: pruning.v30.SetNoWaitCommitmentsFrom.Response
): Either[String, Map[ParticipantId, Seq[DomainId]]] = {
val m = response.participantDomainsMapping
.map { case (participant, domains) =>
ParticipantId.tryFromProtoPrimitive(participant) ->
domains.domainIds.map(DomainId.fromString).sequence
}
if (m.forall(_._2.isRight)) Right(m.map { case (id, either) =>
id -> either.getOrElse(Seq.empty)
})
else
Left("Error parsing response of setNoWaitCommitmentsFrom")
}
}
// TODO(#18453) R6: The code below should be sufficient.
final case class NoWaitCommitments(
counterParticipant: ParticipantId,
startingAt: Either[Instant, ParticipantOffset],
domains: Seq[DomainId],
state: SharedContractsState,
)
object NoWaitCommitments {
def fromSetup(setup: Seq[NoWaitCommitmentsSetup]): Either[String, Seq[NoWaitCommitments]] = {
val s = setup.map(setup =>
for {
ts <- setup.timestampOrOffsetActive match {
case NoWaitCommitmentsSetup.TimestampOrOffsetActive.SequencingTimestamp(ts) =>
CantonTimestamp.fromProtoTimestamp(ts).leftMap(_.toString)
case _ => Left("Conversion error for timestamp in ignoredParticipants")
}
offset <- setup.timestampOrOffsetActive match {
case NoWaitCommitmentsSetup.TimestampOrOffsetActive.PruningOffset(offset) =>
Right(UpstreamOffsetConvert.toParticipantOffset(offset))
case _ => Left("Conversion error for Offset in ignoredParticipants")
}
domains <- setup.domainIds.traverse(_.domainIds.traverse(DomainId.fromString))
state <- SharedContractsState
.fromProtoV30(setup.counterParticipantState)
.leftMap(_.toString)
} yield NoWaitCommitments(
ParticipantId.tryFromProtoPrimitive(setup.counterParticipantUid),
setup.timestampOrOffsetActive match {
case NoWaitCommitmentsSetup.TimestampOrOffsetActive.SequencingTimestamp(_) =>
ts.toInstant.asLeft[ParticipantOffset]
case _ => offset.asRight[Instant]
},
domains.getOrElse(Seq.empty),
state,
)
)
if (s.forall(_.isRight)) {
Right(
s.map(
_.getOrElse(
NoWaitCommitments(
ParticipantId.tryFromProtoPrimitive("error"),
Left(Instant.EPOCH),
Seq.empty,
SharedContractsState.NoSharedContracts,
)
)
)
)
} else
Left("Error parsing response of getNoWaitCommitmentsFrom")
}
}
// TODO(#18453) R6: The code below should be sufficient.
final case class SetWaitCommitmentsFrom(
counterParticipants: Seq[ParticipantId],
domainIds: Seq[DomainId],
) extends Base[
pruning.v30.ResetNoWaitCommitmentsFrom.Request,
pruning.v30.ResetNoWaitCommitmentsFrom.Response,
Map[ParticipantId, Seq[DomainId]],
] {
override def createRequest(): Right[String, pruning.v30.ResetNoWaitCommitmentsFrom.Request] =
Right(
pruning.v30.ResetNoWaitCommitmentsFrom.Request(
counterParticipants.map(_.toProtoPrimitive),
domainIds.map(_.toProtoPrimitive),
)
)
override def submitRequest(
service: Svc,
request: pruning.v30.ResetNoWaitCommitmentsFrom.Request,
): Future[pruning.v30.ResetNoWaitCommitmentsFrom.Response] =
service.resetNoWaitCommitmentsFrom(request)
override def handleResponse(
response: pruning.v30.ResetNoWaitCommitmentsFrom.Response
): Either[String, Map[ParticipantId, Seq[DomainId]]] = {
val m = response.participantDomainsMapping
.map { case (participant, domains) =>
ParticipantId.tryFromProtoPrimitive(participant) ->
domains.domainIds.map(DomainId.fromString).sequence
}
if (m.forall(_._2.isRight)) Right(m.map { case (id, either) =>
id -> either.getOrElse(Seq.empty)
})
else
Left("Error parsing response of resetNoWaitCommitmentsFrom")
}
}
// TODO(#18453) R6: The code below should be sufficient.
final case class WaitCommitments(
counterParticipant: ParticipantId,
domains: Seq[DomainId],
state: SharedContractsState,
)
object WaitCommitments {
def fromSetup(setup: Seq[WaitCommitmentsSetup]): Either[String, Seq[WaitCommitments]] = {
val s = setup.map(setup =>
for {
domains <- setup.domainIds.traverse(_.domainIds.traverse(DomainId.fromString))
state <- SharedContractsState
.fromProtoV30(setup.counterParticipantState)
.leftMap(_.toString)
} yield WaitCommitments(
ParticipantId.tryFromProtoPrimitive(setup.counterParticipantUid),
domains.getOrElse(Seq.empty),
state,
)
)
if (s.forall(_.isRight)) {
Right(
s.map(
_.getOrElse(
WaitCommitments(
ParticipantId.tryFromProtoPrimitive("error"),
Seq.empty,
SharedContractsState.NoSharedContracts,
)
)
)
)
} else
Left("Error parsing response of getNoWaitCommitmentsFrom")
}
}
// TODO(#18453) R6: The code below should be sufficient.
final case class GetNoWaitCommitmentsFrom(
domains: Seq[DomainId],
counterParticipants: Seq[ParticipantId],
) extends Base[
pruning.v30.GetNoWaitCommitmentsFrom.Request,
pruning.v30.GetNoWaitCommitmentsFrom.Response,
(Seq[NoWaitCommitments], Seq[WaitCommitments]),
] {
override def createRequest(): Right[String, pruning.v30.GetNoWaitCommitmentsFrom.Request] =
Right(
pruning.v30.GetNoWaitCommitmentsFrom.Request(
domains.map(_.toProtoPrimitive),
counterParticipants.map(_.toProtoPrimitive),
)
)
override def submitRequest(
service: Svc,
request: pruning.v30.GetNoWaitCommitmentsFrom.Request,
): Future[pruning.v30.GetNoWaitCommitmentsFrom.Response] =
service.getNoWaitCommitmentsFrom(request)
override def handleResponse(
response: pruning.v30.GetNoWaitCommitmentsFrom.Response
): Either[String, (Seq[NoWaitCommitments], Seq[WaitCommitments])] = {
val ignoredCounterParticipants = NoWaitCommitments.fromSetup(response.ignoredParticipants)
val nonIgnoredCounterParticipants =
WaitCommitments.fromSetup(response.notIgnoredParticipants)
if (ignoredCounterParticipants.isLeft || nonIgnoredCounterParticipants.isLeft) {
Left("Error parsing response of getNoWaitCommitmentsFrom")
} else {
Right(
(
ignoredCounterParticipants.getOrElse(Seq.empty),
nonIgnoredCounterParticipants.getOrElse(Seq.empty),
)
)
}
}
}
final case class GetParticipantScheduleCommand()
extends Base[
pruning.v30.GetParticipantSchedule.Request,

View File

@ -13,8 +13,8 @@ import cats.data.Validated
import cats.syntax.either.*
import cats.syntax.functor.*
import com.daml.jwt.JwtTimestampLeeway
import com.daml.metrics.MetricsFilterConfig
import com.daml.metrics.api.MetricQualification
import com.daml.metrics.{HistogramDefinition, MetricsFilterConfig}
import com.daml.nonempty.NonEmpty
import com.daml.nonempty.catsinstances.*
import com.digitalasset.canton.config.CantonRequireTypes.LengthLimitedString.{
@ -82,7 +82,6 @@ import com.digitalasset.canton.pureconfigutils.HttpServerConfig
import com.digitalasset.canton.pureconfigutils.SharedConfigReaders.catchConvertError
import com.digitalasset.canton.sequencing.authentication.AuthenticationTokenManagerConfig
import com.digitalasset.canton.sequencing.client.SequencerClientConfig
import com.daml.metrics.HistogramDefinition
import com.digitalasset.canton.time.EnrichedDurations.RichNonNegativeFiniteDurationConfig
import com.digitalasset.canton.tracing.TracingConfig
import com.typesafe.config.ConfigException.UnresolvedSubstitution

View File

@ -8,9 +8,21 @@ import cats.syntax.option.*
import cats.syntax.traverse.*
import com.daml.ledger.api.v2.participant_offset.ParticipantOffset
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommands.Inspection.{
CounterParticipantInfo,
GetConfigForSlowCounterParticipants,
GetIntervalsBehindForCounterParticipants,
SetConfigForSlowCounterParticipants,
SlowCounterParticipantDomainConfig,
}
import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommands.Pruning.{
GetNoWaitCommitmentsFrom,
GetParticipantScheduleCommand,
NoWaitCommitments,
SetNoWaitCommitmentsFrom,
SetParticipantScheduleCommand,
SetWaitCommitmentsFrom,
WaitCommitments,
}
import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommands.Resources.{
GetResourceLimits,
@ -25,7 +37,7 @@ import com.digitalasset.canton.admin.api.client.data.{
import com.digitalasset.canton.admin.participant.v30
import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc
import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc.PruningServiceStub
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.config.{DomainTimeTrackerConfig, NonNegativeDuration}
import com.digitalasset.canton.console.{
AdminCommandRunner,
@ -755,6 +767,207 @@ class LocalCommitmentsAdministrationGroup(
)
}
// TODO(#18453) R6: The code below should be sufficient.
@Help.Summary(
"Disable waiting for commitments from the given counter-participants."
)
@Help.Description(
"""Disabling waiting for commitments disregards these counter-participants w.r.t. pruning,
|which gives up non-repudiation for those counter-participants, but increases pruning resilience
|to failures and slowdowns of those counter-participants and/or the network.
|The command returns a map of counter-participants and the domains for which the setting was changed.
|Returns an error if `startingAt` does not translate to an existing offset.
|If the participant set is empty, the command does nothing."""
)
def setNoWaitCommitmentsFrom(
counterParticipants: Seq[ParticipantId],
domainIds: Seq[DomainId],
startingAt: Either[Instant, ParticipantOffset],
): Map[ParticipantId, Seq[DomainId]] = {
consoleEnvironment.run(
runner.adminCommand(
SetNoWaitCommitmentsFrom(
counterParticipants,
domainIds,
startingAt,
)
)
)
}
// TODO(#18453) R6: The code below should be sufficient.
@Help.Summary(
"Enable waiting for commitments from the given counter-participants. This is the default behavior; enabling waiting" +
"for commitments is only necessary if it was previously disabled."
)
@Help.Description(
"""Enables waiting for commitments, which blocks pruning at offsets where commitments from these counter-participants
|are missing.
|The command returns a map of counter-participants and the domains for which the setting was changed.
|If the participant set is empty or the domain set is empty, the command does nothing."""
)
def setWaitCommitmentsFrom(
counterParticipants: Seq[ParticipantId],
domainIds: Seq[DomainId],
): Map[ParticipantId, Seq[DomainId]] = {
consoleEnvironment.run(
runner.adminCommand(
SetWaitCommitmentsFrom(
counterParticipants,
domainIds,
)
)
)
}
// TODO(#18453) R6: The code below should be sufficient.
@Help.Summary(
"Retrieves the latest (i.e., w.r.t. the query execution time) configuration of waiting for commitments from counter-participants."
)
@Help.Description(
"""The configuration for waiting for commitments from counter-participants is returned as two sets:
|a set of ignored counter-participants, the domains and the timestamp, and a set of not-ignored
|counter-participants and the domains.
|Filters by the specified counter-participants and domains. If the counter-participant and / or
|domains are empty, it considers all domains and participants known to the participant, regardless of
|whether they share contracts with the participant.
|Even if some participants may not be connected to some domains at the time the query executes, the response still
|includes them if they are known to the participant or specified in the arguments."""
)
def getNoWaitCommitmentsFrom(
domains: Seq[DomainId],
counterParticipants: Seq[ParticipantId],
): (Seq[NoWaitCommitments], Seq[WaitCommitments]) =
consoleEnvironment.run(
runner.adminCommand(
GetNoWaitCommitmentsFrom(
domains,
counterParticipants,
)
)
)
// TODO(#10436) R7: The code below should be sufficient.
@Help.Summary(
"Configure metrics for slow counter-participants (i.e., that are behind in sending commitments) and" +
"configure thresholds for when a counter-participant is deemed slow."
)
@Help.Description("""The configurations are per domain or set of domains and concern the following metrics
|issued per domain:
| - The maximum number of intervals that a distinguished participant falls
| behind. All participants that are not in the distinguished group are automatically part of the default group
| - The maximum number of intervals that a participant in the default groups falls behind
| - The number of participants in the distinguished group that are behind by at least `thresholdDistinguished`
| reconciliation intervals.
| - The number of participants not in the distinguished group that are behind by at least `thresholdDefault`
| reconciliation intervals.
| - Separate metric for each participant in `individualMetrics` argument tracking how many intervals that
|participant is behind""")
def setConfigForSlowCounterParticipants(
configs: Seq[SlowCounterParticipantDomainConfig]
): Unit = {
consoleEnvironment.run(
runner.adminCommand(
SetConfigForSlowCounterParticipants(
configs
)
)
)
}
// TODO(#10436) R7
def addConfigForSlowCounterParticipants(
counterParticipantsDistinguished: Seq[ParticipantId],
domains: Seq[DomainId],
) = ???
// TODO(#10436) R7
def removeConfigForSlowCounterParticipants(
counterParticipantsDistinguished: Seq[ParticipantId],
domains: Seq[DomainId],
) = ???
// TODO(#10436) R7
def addParticipanttoIndividualMetrics(
individualMetrics: Seq[ParticipantId],
domains: Seq[DomainId],
) = ???
// TODO(#10436) R7
def removeParticipantFromIndividualMetrics(
individualMetrics: Seq[ParticipantId],
domains: Seq[DomainId],
) = ???
// TODO(#10436) R7: The code below should be sufficient.
@Help.Summary(
"Lists for the given domains the configuration of metrics for slow counter-participants (i.e., that" +
"are behind in sending commitments)"
)
@Help.Description("""Lists the following config per domain. If `domains` is empty, the command lists config for all
domains:
"| - The participants in the distinguished group, which have two metrics:
the maximum number of intervals that a participant is behind, and the number of participants that are behind
by at least `thresholdDistinguished` reconciliation intervals
| - The participants not in the distinguished group, which have two metrics: the maximum number of intervals that a participant
| is behind, and the number of participants that are behind by at least `thresholdDefault` reconciliation intervals
| - Parameters `thresholdDistinguished` and `thresholdDefault`
| - The participants in `individualMetrics`, which have individual metrics per participant showing how many
reconciliation intervals that participant is behind""")
def getConfigForSlowCounterParticipants(
domains: Seq[DomainId]
): Seq[SlowCounterParticipantDomainConfig] = {
consoleEnvironment.run(
runner.adminCommand(
GetConfigForSlowCounterParticipants(
domains
)
)
)
}
case class SlowCounterParticipantInfo(
domains: Seq[DomainId],
distinguished: Seq[ParticipantId],
default: Seq[ParticipantId],
individualMetrics: Seq[ParticipantId],
thresholdDistinguished: NonNegativeInt,
thresholdDefault: NonNegativeInt,
)
// TODO(#10436) R7: Return the slow counter participant config for the given domains and counterParticipants
// Filter the gRPC response of `getConfigForSlowCounterParticipants` with `counterParticipants`
def getConfigForSlowCounterParticipant(
domains: Seq[DomainId],
counterParticipants: Seq[ParticipantId],
): Seq[SlowCounterParticipantInfo] = Seq.empty
// TODO(#10436) R7: The code below should be sufficient.
@Help.Summary(
"Lists for every participant and domain the number of intervals that the participant is behind in sending commitments" +
"if that participant is behind by at least threshold intervals."
)
@Help.Description("""If `counterParticipants` is empty, the command considers all counter-participants.
|If `domains` is empty, the command considers all domains.
|If `threshold` is not set, the command considers 0.
|Counter-participants that never sent a commitment appear in the output only if they're explicitly given in
|`counterParticipants`. For such counter-participant that never sent a commitment, the output shows they are
|behind by MaxInt""")
def getIntervalsBehindForCounterParticipants(
counterParticipants: Seq[ParticipantId],
domains: Seq[DomainId],
threshold: Option[NonNegativeInt],
): Seq[CounterParticipantInfo] = {
consoleEnvironment.run(
runner.adminCommand(
GetIntervalsBehindForCounterParticipants(
counterParticipants,
domains,
threshold.getOrElse(NonNegativeInt.zero),
)
)
)
}
}
class ParticipantReplicationAdministrationGroup(

View File

@ -6,10 +6,16 @@ package com.digitalasset.canton.environment
import cats.data.EitherT
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.metrics.api.{MetricsInfoFilter, HistogramInventory}
import com.daml.metrics.api.{HistogramInventory, MetricsInfoFilter}
import com.digitalasset.canton.concurrent.*
import com.digitalasset.canton.config.*
import com.digitalasset.canton.console.{ConsoleEnvironment, ConsoleOutput, GrpcAdminCommandRunner, HealthDumpGenerator, StandardConsoleOutput}
import com.digitalasset.canton.console.{
ConsoleEnvironment,
ConsoleOutput,
GrpcAdminCommandRunner,
HealthDumpGenerator,
StandardConsoleOutput,
}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.domain.mediator.{MediatorNodeBootstrap, MediatorNodeParameters}

View File

@ -7,9 +7,6 @@ import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.digitalasset.canton.domain.metrics.{MediatorHistograms, SequencerHistograms}
import com.digitalasset.canton.participant.metrics.ParticipantHistograms
/** Pre-register histogram metrics
*
* Open telemetry requires us to define the histogram buckets before defining the actual metric.
@ -23,5 +20,4 @@ class CantonHistograms()(implicit val inventory: HistogramInventory) {
private[metrics] val mediator: MediatorHistograms = new MediatorHistograms(prefix)
private[metrics] val sequencer: SequencerHistograms = new SequencerHistograms(prefix)
}

View File

@ -4,8 +4,11 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.MetricHandle.LabeledMetricsFactory
import com.daml.metrics.api.opentelemetry.{
OpenTelemetryMetricsFactory,
QualificationFilteringMetricsFactory,
}
import com.daml.metrics.api.{MetricQualification, MetricsContext, MetricsInfoFilter}
import com.daml.metrics.api.opentelemetry.{OpenTelemetryMetricsFactory, QualificationFilteringMetricsFactory}
import com.daml.metrics.grpc.DamlGrpcServerMetrics
import com.daml.metrics.{HealthMetrics, HistogramDefinition, MetricsFilterConfig}
import com.digitalasset.canton.config.NonNegativeFiniteDuration
@ -210,7 +213,7 @@ final case class MetricsRegistry(
if (testingSupportAdhocMetrics) Some(logger.underlying) else None,
globalMetricsContext = extraContext,
),
baseFilter
baseFilter,
)
}
}

View File

@ -3,10 +3,9 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.MetricQualification
import com.daml.metrics.api.testing.InMemoryMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricQualification, MetricsInfoFilter}
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.telemetry.MetricsInfoFilter
import io.opentelemetry.api.OpenTelemetry
import org.scalatest.wordspec.AnyWordSpec

View File

@ -6,12 +6,14 @@ syntax = "proto3";
package com.digitalasset.canton.domain.api.v30;
import "com/digitalasset/canton/protocol/v30/sequencing.proto";
import "com/digitalasset/canton/protocol/v30/topology.proto";
service SequencerConnectService {
rpc Handshake(SequencerConnect.HandshakeRequest) returns (SequencerConnect.HandshakeResponse);
rpc GetDomainId(SequencerConnect.GetDomainIdRequest) returns (SequencerConnect.GetDomainIdResponse);
rpc GetDomainParameters(SequencerConnect.GetDomainParametersRequest) returns (SequencerConnect.GetDomainParametersResponse);
rpc VerifyActive(SequencerConnect.VerifyActiveRequest) returns (SequencerConnect.VerifyActiveResponse);
rpc RegisterOnboardingTopologyTransactions(SequencerConnect.RegisterOnboardingTopologyTransactionsRequest) returns (SequencerConnect.RegisterOnboardingTopologyTransactionsResponse);
}
message SequencerConnect {
@ -55,4 +57,10 @@ message SequencerConnect {
Failure failure = 2;
}
}
message RegisterOnboardingTopologyTransactionsRequest {
repeated com.digitalasset.canton.protocol.v30.SignedTopologyTransaction topology_transactions = 1;
}
message RegisterOnboardingTopologyTransactionsResponse {}
}

View File

@ -3,9 +3,15 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.MetricHandle.{Counter, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Counter, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class DbStorageHistograms(val parent: MetricName)(implicit
inventory: HistogramInventory

View File

@ -4,10 +4,16 @@
package com.digitalasset.canton.metrics
import cats.Eval
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.digitalasset.canton.SequencerAlias
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
import com.digitalasset.canton.SequencerAlias
import scala.collection.concurrent.TrieMap

View File

@ -7,8 +7,8 @@ import com.daml.error.utils.DecodedCantonError
import com.digitalasset.canton.error.ErrorCodeUtils.errorCategoryFromString
import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.sequencing.authentication.MemberAuthentication.{
MemberAccessDisabled,
MissingToken,
ParticipantAccessDisabled,
}
import com.digitalasset.canton.sequencing.authentication.grpc.Constant
import com.digitalasset.canton.tracing.TraceContext
@ -201,7 +201,7 @@ object GrpcError {
case INVALID_ARGUMENT | UNAUTHENTICATED
if !checkAuthenticationError(
optTrailers,
Seq(MissingToken.toString, ParticipantAccessDisabled.toString),
Seq(MissingToken.toString, MemberAccessDisabled.toString),
) =>
GrpcClientError(request, serverName, status, optTrailers, decodedError)

View File

@ -72,18 +72,11 @@ object MemberAuthentication extends MemberAuthentication {
show"Domain id $domainId provided by member $member does not match the domain id of the domain the ${member.description} is trying to connect to",
"NonMatchingDomainId",
)
final case class ParticipantAccessDisabled(participantId: ParticipantId)
final case class MemberAccessDisabled(member: Member)
extends AuthenticationError(
s"Participant $participantId access is disabled",
"ParticipantAccessDisabled",
s"Member $member access is disabled",
"MemberAccessDisabled",
)
final case class MediatorAccessDisabled(mediator: MediatorId)
extends AuthenticationError(
s"Mediator $mediator access is disabled",
"MediatorAccessDisabled",
)
final case class TokenVerificationException(member: String)
extends AuthenticationError(
s"Due to an internal error, the server side token lookup for member $member failed",

View File

@ -45,9 +45,16 @@ object SendResult {
case UnlessShutdown.Outcome(SendResult.Success(deliver)) =>
logger.trace(s"$sendDescription was sequenced at ${deliver.timestamp}")
case UnlessShutdown.Outcome(SendResult.Error(error)) =>
logger.warn(
s"$sendDescription was rejected by the sequencer at ${error.timestamp} because [${error.reason}]"
)
error match {
case DeliverError(_, _, _, _, SequencerErrors.AggregateSubmissionAlreadySent(_)) =>
logger.info(
s"$sendDescription was rejected by the sequencer at ${error.timestamp} because [${error.reason}]"
)
case _ =>
logger.warn(
s"$sendDescription was rejected by the sequencer at ${error.timestamp} because [${error.reason}]"
)
}
case UnlessShutdown.Outcome(SendResult.Timeout(sequencerTime)) =>
logger.warn(s"$sendDescription timed out at $sequencerTime")
case UnlessShutdown.AbortedDueToShutdown =>

View File

@ -28,7 +28,7 @@ object ForceFlag {
* other choice, eg. when importing a topology snapshot.</li>
* </ul>
*/
private[topology] val all: Map[v30.ForceFlag, ForceFlag] =
val all: Map[v30.ForceFlag, ForceFlag] =
Seq[ForceFlag](AlienMember, LedgerTimeRecordTimeToleranceIncrease)
.map(ff => ff.toProtoV30 -> ff)
.toMap
@ -55,7 +55,7 @@ object ForceFlags {
val none: ForceFlags = ForceFlags()
/** @see [[ForceFlag.all]] */
private[topology] val all: ForceFlags = ForceFlags(ForceFlag.all.values.toSet)
val all: ForceFlags = ForceFlags(ForceFlag.all.values.toSet)
def fromProtoV30(flags: Seq[v30.ForceFlag]): ParsingResult[ForceFlags] =
flags.traverse(ForceFlag.fromProtoV30).map(flags => ForceFlags(flags.toSet))

View File

@ -35,4 +35,8 @@ trait TopologyTransactionProcessingSubscriber {
transactions: Seq[GenericSignedTopologyTransaction],
)(implicit traceContext: TraceContext): FutureUnlessShutdown[Unit]
/** The order in which the subscriber should be executed among all the subscriptions.
* Lower values are executed first.
*/
def executionOrder: Int = 10
}

View File

@ -7,11 +7,13 @@ import cats.syntax.functorFilter.*
import cats.syntax.parallel.*
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
import com.daml.nonempty.NonEmptyReturningOps.*
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.concurrent.{DirectExecutionContext, FutureSupervisor}
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.crypto.CryptoPureApi
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.environment.CantonNodeParameters
import com.digitalasset.canton.lifecycle.{FlagCloseable, FutureUnlessShutdown, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
@ -42,8 +44,7 @@ import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.digitalasset.canton.util.{ErrorUtil, FutureUtil, MonadUtil, SimpleExecutionQueue}
import com.digitalasset.canton.version.ProtocolVersion
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.{ExecutionContext, Future}
import scala.math.Ordering.Implicits.*
@ -72,7 +73,14 @@ class TopologyTransactionProcessor(
private val initialised = new AtomicBoolean(false)
private val listeners = ListBuffer[TopologyTransactionProcessingSubscriber]()
// Outer list designate listener groups, different groups are to be executed sequentially,
// priority is determined by the `TopologyTransactionProcessingSubscriber.executionOrder`.
// Inner list (subscribers with the same priority) can be executed in parallel.
// Code calling the listeners is assuming that the structure is kept as described above,
// with no further manipulations needed.
private val listeners = new AtomicReference(
List[NonEmpty[List[TopologyTransactionProcessingSubscriber]]]()
)
private val timeAdjuster =
new TopologyTimestampPlusEpsilonTracker(timeouts, loggerFactory, futureSupervisor)
@ -86,7 +94,16 @@ class TopologyTransactionProcessor(
/** assumption: subscribers don't do heavy lifting */
final def subscribe(listener: TopologyTransactionProcessingSubscriber): Unit = {
listeners += listener
listeners
.getAndUpdate(oldListeners =>
// we add the new listener to the pile, and then re-sort the list into groups by execution order
(oldListeners.flatten :+ listener).distinct // .distinct guards against double subscription
.groupBy1(_.executionOrder)
.toList
.sortBy { case (order, _) => order }
.map { case (_, groupListeners) => groupListeners }
)
.discard
}
private def initialise(
@ -178,7 +195,7 @@ class TopologyTransactionProcessor(
logger.debug(
s"Updating listener heads to ${effective} and ${approximate}. Potential changes: ${potentialChanges}"
)
listeners.toList.foreach(_.updateHead(effective, approximate, potentialChanges))
listeners.get().flatten.foreach(_.updateHead(effective, approximate, potentialChanges))
}
/** Inform the topology manager where the subscription starts when using [[processEnvelopes]] rather than [[createHandler]] */
@ -398,19 +415,24 @@ class TopologyTransactionProcessor(
effectiveTimestamp,
validated,
)
_ = logger.debug(
s"Notifying listeners of ${sequencingTimestamp}, ${effectiveTimestamp} and SC ${sc}"
)
validTransactions = validated.collect {
case tx if tx.rejectionReason.isEmpty => tx.transaction
}
_ <- performUnlessClosingUSF("notify-topology-transaction-observers")(
listeners.toList.parTraverse_(
_.observed(
sequencingTimestamp,
effectiveTimestamp,
sc,
validated.collect { case tx if tx.rejectionReason.isEmpty => tx.transaction },
MonadUtil.sequentialTraverse_(listeners.get())(listenerGroup => {
logger.debug(
s"Notifying listener group (${listenerGroup.head1.executionOrder}) of ${sequencingTimestamp}, ${effectiveTimestamp} and SC ${sc}"
)
)
listenerGroup.forgetNE.parTraverse_(
_.observed(
sequencingTimestamp,
effectiveTimestamp,
sc,
validTransactions,
)
)
})
)
// TODO(#15089): do not notify the terminate processing for replayed events.

View File

@ -450,8 +450,6 @@ object TopologyStore {
TopologyMapping.Code.OwnerToKeyMapping,
// TODO(#14060) - potentially revisit this once we implement TopologyStore.filterInitialParticipantDispatchingTransactions
TopologyMapping.Code.NamespaceDelegation,
TopologyMapping.Code.IdentifierDelegation,
TopologyMapping.Code.DecentralizedNamespaceDefinition,
)
def filterInitialParticipantDispatchingTransactions(

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: CantonExamples

View File

@ -16,7 +16,8 @@ import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.sequencing.protocol.{HandshakeRequest, HandshakeResponse}
import com.digitalasset.canton.sequencing.{GrpcSequencerConnection, SequencerConnection}
import com.digitalasset.canton.topology.{DomainId, ParticipantId, SequencerId}
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.topology.{DomainId, Member, ParticipantId, SequencerId}
import com.digitalasset.canton.tracing.{TraceContext, TracingConfig}
import scala.concurrent.{ExecutionContextExecutor, Future}
@ -61,6 +62,12 @@ trait SequencerConnectClient extends NamedLogging with AutoCloseable {
case v30.SequencerConnect.VerifyActiveResponse.Value.Empty =>
Left(Error.InvalidResponse("Missing response from VerifyActive"))
}
def registerOnboardingTopologyTransactions(
domainAlias: DomainAlias,
member: Member,
topologyTransactions: Seq[GenericSignedTopologyTransaction],
)(implicit traceContext: TraceContext): EitherT[Future, Error, Unit]
}
object SequencerConnectClient {

View File

@ -22,7 +22,8 @@ import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.sequencing.GrpcSequencerConnection
import com.digitalasset.canton.sequencing.protocol.{HandshakeRequest, HandshakeResponse}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.{DomainId, ParticipantId, SequencerId}
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.topology.{DomainId, Member, ParticipantId, SequencerId}
import com.digitalasset.canton.tracing.{TraceContext, TracingConfig}
import com.digitalasset.canton.util.retry.RetryUtil.AllExnRetryable
import com.digitalasset.canton.util.retry.Success
@ -203,6 +204,34 @@ class GrpcSequencerConnectClient(
.apply(verifyActive(), AllExnRetryable)
).thereafter(_ => closeableChannel.close())
}
override def registerOnboardingTopologyTransactions(
domainAlias: DomainAlias,
member: Member,
topologyTransactions: Seq[GenericSignedTopologyTransaction],
)(implicit traceContext: TraceContext): EitherT[Future, Error, Unit] = {
val interceptor = new SequencerConnectClientInterceptor(member, loggerFactory)
CantonGrpcUtil
.sendSingleGrpcRequest(
serverName = domainAlias.unwrap,
requestDescription = "register-onboarding-topology-transactions",
channel = builder.build(),
stubFactory = channel =>
v30.SequencerConnectServiceGrpc.stub(ClientInterceptors.intercept(channel, interceptor)),
timeout = timeouts.network.unwrap,
logger = logger,
logPolicy = CantonGrpcUtil.silentLogPolicy,
retryPolicy = CantonGrpcUtil.RetryPolicy.noRetry,
)(
_.registerOnboardingTopologyTransactions(
SequencerConnect.RegisterOnboardingTopologyTransactionsRequest(
topologyTransactions.map(_.toProtoV30)
)
)
)
.bimap(err => Error.Transport(err.toString), _ => ())
}
}
object GrpcSequencerConnectClient {

View File

@ -5,13 +5,13 @@ package com.digitalasset.canton.common.domain.grpc
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.sequencing.authentication.grpc.Constant
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.topology.Member
import io.grpc.ClientCall.Listener
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall
import io.grpc.*
class SequencerConnectClientInterceptor(
participantId: ParticipantId,
member: Member,
val loggerFactory: NamedLoggerFactory,
) extends ClientInterceptor
with NamedLogging {
@ -22,7 +22,7 @@ class SequencerConnectClientInterceptor(
): ClientCall[ReqT, RespT] =
new SimpleForwardingClientCall[ReqT, RespT](next.newCall(method, callOptions)) {
override def start(responseListener: Listener[RespT], headers: Metadata): Unit = {
headers.put(Constant.MEMBER_ID_METADATA_KEY, participantId.toProtoPrimitive)
headers.put(Constant.MEMBER_ID_METADATA_KEY, member.toProtoPrimitive)
super.start(responseListener, headers)
}

View File

@ -367,7 +367,7 @@ object TaskSchedulerTest {
class MockTaskSchedulerMetrics extends TaskSchedulerMetrics {
val prefix: MetricName = MetricName("test")
override val sequencerCounterQueue: metrics.api.MetricHandle.Counter = NoOpCounter(
prefix :+ "counter"
MetricInfo(prefix :+ "counter", "", MetricQualification.Debug)
)
override def taskQueue(size: () => Int): Gauge.CloseableGauge =

View File

@ -524,7 +524,6 @@ trait TopologyStoreTest extends AsyncWordSpec with TopologyStoreTestBase {
)
onboardingTransactionUnlessShutdown.onShutdown(fail()) shouldBe Seq(
tx4_DND,
tx5_DTC,
tx6_DTC_Update,
)

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: medical-records

View File

@ -172,7 +172,7 @@ private[mediator] class ConfirmationResponseProcessor(
val timeout = responseAggregation.timeout(version = timestamp)
for {
snapshot <- crypto.ips.awaitSnapshotUS(timestamp)(traceContext)
snapshot <- crypto.ips.awaitSnapshotUS(responseAggregation.requestId.unwrap)(traceContext)
domainParameters <- FutureUnlessShutdown.outcomeF(
snapshot
@ -298,7 +298,7 @@ private[mediator] class ConfirmationResponseProcessor(
): Future[Either[Option[MediatorVerdict.MediatorReject], Unit]] = {
val topologySnapshot = snapshot.ipsSnapshot
(for {
// Bail out, if this mediator or group is passive, except is the mediator itself is passive in an active group.
// Bail out, if this mediator or group is passive, except if the mediator itself is passive in an active group.
isActive <- EitherT.right[Option[MediatorVerdict.MediatorReject]](
topologySnapshot.isMediatorActive(mediatorId)
)

View File

@ -55,7 +55,7 @@ private[mediator] class MediatorEventsProcessor(
lastEvent = events.last1
determinedStages <- FutureUnlessShutdown.pure(
uniqueEnvelopesByEvent.flatMap { case (event, envelope) => determine(event, envelope) }
uniqueEnvelopesByEvent.flatMap { case (event, envelopes) => determine(event, envelopes) }
)
// we need to advance time on the confirmation response even if there is no relevant mediator events
@ -103,7 +103,7 @@ private[mediator] class MediatorEventsProcessor(
private def determine(
tracedProtocolEvent: TracedProtocolEvent,
envelope: Seq[DefaultOpenEnvelope],
envelopes: Seq[DefaultOpenEnvelope],
): Seq[Traced[MediatorEvent]] = {
implicit val traceContext: TraceContext = tracedProtocolEvent.traceContext
val event = tracedProtocolEvent.value
@ -111,7 +111,8 @@ private[mediator] class MediatorEventsProcessor(
case deliver: Deliver[?] => deliver.topologyTimestampO
case _ => None
}
val stages = extractMediatorEvents(event.counter, event.timestamp, topologyTimestampO, envelope)
val stages =
extractMediatorEvents(event.counter, event.timestamp, topologyTimestampO, envelopes)
stages.map(Traced(_))
}

View File

@ -40,7 +40,7 @@ trait ResponseAggregator extends HasLoggerName with Product with Serializable {
def isFinalized: Boolean
/** Validate the additional confirmation response received and record if unless already finalized.
/** Validate the additional confirmation response received and record unless already finalized.
*/
def validateAndProgress(
responseTimestamp: CantonTimestamp,

View File

@ -56,7 +56,12 @@ class BlockMetrics(
)
)(MetricsContext.Empty)
private val ackGaugeInfo = MetricInfo(prefix :+ "acknowledgments_micros", "Acknowledgments by senders in Micros", MetricQualification.Latency, labelsWithDescription = Map("sender" -> "The sender of the acknowledgment"))
private val ackGaugeInfo = MetricInfo(
prefix :+ "acknowledgments_micros",
"Acknowledgments by senders in Micros",
MetricQualification.Latency,
labelsWithDescription = Map("sender" -> "The sender of the acknowledgment"),
)
def updateAcknowledgementGauge(sender: String, value: Long): Unit =
acknowledgments

View File

@ -5,7 +5,13 @@ package com.digitalasset.canton.domain.metrics
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{MetricInfo, MetricName, MetricQualification, MetricsContext, HistogramInventory}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
import com.daml.metrics.grpc.{DamlGrpcServerMetrics, GrpcServerMetrics}
import com.daml.metrics.{CacheMetrics, HealthMetrics}
import com.digitalasset.canton.environment.BaseMetrics

View File

@ -656,6 +656,7 @@ class SequencerNodeBootstrap(
arguments.metrics,
indexedDomain,
syncCrypto,
domainTopologyManager,
domainTopologyStore,
topologyClient,
topologyProcessor,

View File

@ -127,6 +127,7 @@ class SequencerRuntime(
val metrics: SequencerMetrics,
indexedDomain: IndexedDomain,
syncCrypto: DomainSyncCryptoClient,
domainTopologyManager: DomainTopologyManager,
topologyStore: TopologyStore[DomainStore],
topologyClient: DomainTopologyClientWithInit,
topologyProcessor: TopologyTransactionProcessor,
@ -208,6 +209,7 @@ class SequencerRuntime(
sequencerDomainParamsLookup,
localNodeParameters,
staticDomainParameters.protocolVersion,
domainTopologyManager,
topologyStateForInitializationService,
loggerFactory,
)
@ -322,6 +324,7 @@ class SequencerRuntime(
domainId,
sequencerId,
staticDomainParameters,
domainTopologyManager,
syncCrypto,
loggerFactory,
)(
@ -369,9 +372,9 @@ class SequencerRuntime(
if (localNodeParameters.useUnifiedSequencer) {
logger.info("Subscribing to topology transactions for auto-registering members")
// TODO(#18399): This listener runs after the snapshot became available, thus can be late with registering a member,
// if a concurrent code is already using that member. Need to find a solution to that.
topologyProcessor.subscribe(new TopologyTransactionProcessingSubscriber {
override val executionOrder: Int = 5
override def observed(
sequencedTimestamp: SequencedTime,
effectiveTimestamp: EffectiveTime,
@ -391,7 +394,7 @@ class SequencerRuntime(
val f = possibleNewMembers
.parTraverse_ { member =>
logger.info(s"Topology change has triggered sequencer registration of member $member")
sequencer.registerMember(member)
sequencer.registerMemberInternal(member, effectiveTimestamp.value)
}
.valueOr(e =>
ErrorUtil.internalError(new RuntimeException(s"Failed to register member: $e"))

View File

@ -198,11 +198,11 @@ class MemberAuthenticationService(
member match {
case participant: ParticipantId =>
EitherT(isParticipantActive(participant).map {
if (_) Right(()) else Left(ParticipantAccessDisabled(participant))
Either.cond(_, (), MemberAccessDisabled(participant))
})
case mediator: MediatorId =>
EitherT(isMediatorActive(mediator).map {
if (_) Right(()) else Left(MediatorAccessDisabled(mediator))
Either.cond(_, (), MemberAccessDisabled(mediator))
})
case _ =>
// TODO(#4933) check if sequencer is active

View File

@ -18,13 +18,7 @@ import com.digitalasset.canton.sequencing.authentication.MemberAuthentication.{
TokenVerificationException,
}
import com.digitalasset.canton.sequencing.authentication.grpc.Constant
import com.digitalasset.canton.topology.{
AuthenticatedMember,
DomainId,
Member,
UnauthenticatedMemberId,
UniqueIdentifier,
}
import com.digitalasset.canton.topology.{DomainId, Member, UniqueIdentifier}
import com.digitalasset.canton.tracing.TraceContext
import io.grpc.*
@ -69,21 +63,15 @@ class SequencerAuthenticationServerInterceptor(
.map(DomainId(_))
.leftMap(err => VerifyTokenError.GeneralError(err.message))
.toEitherT[Future]
storedTokenO <- member match {
case _: UnauthenticatedMemberId =>
EitherT.pure[Future, VerifyTokenError](None: Option[StoredAuthenticationToken])
case authenticatedMember: AuthenticatedMember =>
for {
token <- tokenO
.toRight(
VerifyTokenError.GeneralError("Authentication headers are missing for token")
)
.toEitherT[Future]
storedToken <- verifyToken(authenticatedMember, intendedDomainId, token).map(
_.some
storedTokenO <-
for {
token <- tokenO
.toRight[VerifyTokenError](
VerifyTokenError.GeneralError("Authentication headers are missing for token")
)
} yield storedToken
}
.toEitherT[Future]
storedToken <- verifyToken(member, intendedDomainId, token)
} yield Some(storedToken)
} yield {
val contextWithAuthorizedMember = originalContext
.withValue(IdentityContextHelper.storedAuthenticationTokenContextKey, storedTokenO)
@ -132,7 +120,7 @@ class SequencerAuthenticationServerInterceptor(
/** Checks the supplied authentication token for the member. */
private def verifyToken(
member: AuthenticatedMember,
member: Member,
intendedDomain: DomainId,
token: AuthenticationToken,
)(implicit

View File

@ -277,8 +277,8 @@ class DatabaseSequencer(
/** Package private to use access method in tests, see `TestDatabaseSequencerWrapper`.
*/
final private[sequencer] def registerMemberInternal(member: Member, timestamp: CantonTimestamp)(
implicit traceContext: TraceContext
override final def registerMemberInternal(member: Member, timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, RegisterError, Unit] = {
EitherT
.right[RegisterError](store.registerMember(member, timestamp))

View File

@ -88,10 +88,17 @@ trait Sequencer
* Unauthenticated members are registered at the current time using sequencer's `clock`.
* Idempotent, can be called multiple times for the same member.
*/
// TODO(#18399): This method is pretty much useless,
// will be fully useless with unauth. members gone,
// to be removed in favor of `registerMemberInternal`.
def registerMember(member: Member)(implicit
traceContext: TraceContext
): EitherT[Future, RegisterError, Unit]
def registerMemberInternal(member: Member, timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, RegisterError, Unit]
def sendAsyncSigned(signedSubmission: SignedContent[SubmissionRequest])(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncError, Unit]

View File

@ -162,8 +162,7 @@ class GrpcSequencerAuthenticationService(
def maliciousOrFaulty(): Status =
Status.INTERNAL.withDescription(err.reason)
err match {
case MemberAuthentication.ParticipantAccessDisabled(_) |
MemberAuthentication.MediatorAccessDisabled(_) =>
case MemberAuthentication.MemberAccessDisabled(_) =>
Status.PERMISSION_DENIED.withDescription(err.reason)
case MemberAuthentication.NonMatchingDomainId(_, _) =>
Status.FAILED_PRECONDITION.withDescription(err.reason)

View File

@ -5,6 +5,8 @@ package com.digitalasset.canton.domain.sequencing.service
import cats.data.EitherT
import cats.syntax.either.*
import cats.syntax.traverse.*
import com.digitalasset.canton.ProtoDeserializationError.ProtoDeserializationFailure
import com.digitalasset.canton.crypto.DomainSyncCryptoClient
import com.digitalasset.canton.domain.api.v30.SequencerConnect
import com.digitalasset.canton.domain.api.v30.SequencerConnect.GetDomainParametersResponse.Parameters
@ -13,14 +15,21 @@ import com.digitalasset.canton.domain.api.v30.SequencerConnect.{
GetDomainIdResponse,
GetDomainParametersRequest,
GetDomainParametersResponse,
RegisterOnboardingTopologyTransactionsResponse,
}
import com.digitalasset.canton.domain.api.v30 as proto
import com.digitalasset.canton.domain.sequencing.authentication.grpc.IdentityContextHelper
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.grpc.CantonGrpcUtil
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.topology.{DomainId, ParticipantId, SequencerId}
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.store.TopologyStore
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.topology.transaction.{SignedTopologyTransaction, TopologyMapping}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.util.EitherTUtil
import com.digitalasset.canton.version.{ProtocolVersion, ProtocolVersionValidation}
import io.grpc.{Status, StatusRuntimeException}
import scala.concurrent.{ExecutionContext, Future}
@ -30,6 +39,7 @@ class GrpcSequencerConnectService(
domainId: DomainId,
sequencerId: SequencerId,
staticDomainParameters: StaticDomainParameters,
domainTopologyManager: DomainTopologyManager,
cryptoApi: DomainSyncCryptoClient,
protected val loggerFactory: NamedLoggerFactory,
)(implicit ec: ExecutionContext)
@ -100,6 +110,90 @@ class GrpcSequencerConnectService(
)
}
override def registerOnboardingTopologyTransactions(
request: SequencerConnect.RegisterOnboardingTopologyTransactionsRequest
): Future[SequencerConnect.RegisterOnboardingTopologyTransactionsResponse] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
val resultET = for {
member <- EitherT.fromEither[Future](
IdentityContextHelper.getCurrentStoredMember.toRight(
invalidRequest("Unable to find member id in gRPC context")
)
)
isKnown <- EitherT.right(
cryptoApi.ips.headSnapshot.isMemberKnown(member)
)
_ <- EitherTUtil.condUnitET[Future](
!isKnown,
failedPrecondition(s"Member $member is already known on the domain"),
)
transactions <- CantonGrpcUtil.mapErrNew(
request.topologyTransactions
.traverse(
SignedTopologyTransaction
.fromProtoV30(ProtocolVersionValidation(serverProtocolVersion), _)
)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
_ <- checkForOnlyOnboardingTransactions(member, transactions)
_ <- CantonGrpcUtil.mapErrNewETUS(
domainTopologyManager
.add(transactions, ForceFlags.all, expectFullAuthorization = false)
)
} yield RegisterOnboardingTopologyTransactionsResponse.defaultInstance
EitherTUtil.toFuture(resultET)
}
private def checkForOnlyOnboardingTransactions(
member: Member,
transactions: Seq[GenericSignedTopologyTransaction],
): EitherT[Future, StatusRuntimeException, Unit] = {
val unexpectedUids = transactions.filter(_.mapping.maybeUid.exists(_ != member.uid))
val unexpectedNamespaces = transactions.filter(_.mapping.namespace != member.namespace)
val expectedMappings =
if (member.code == ParticipantId.Code) TopologyStore.initialParticipantDispatchingSet
else Set(TopologyMapping.Code.NamespaceDelegation, TopologyMapping.Code.OwnerToKeyMapping)
val submittedMappings = transactions.map(_.mapping.code).toSet
val unexpectedMappings = submittedMappings -- expectedMappings
val missingMappings = expectedMappings -- submittedMappings
val unexpectedProposals = transactions.filter(_.isProposal)
val resultET = for {
_ <- EitherTUtil.condUnitET[Future](
unexpectedMappings.isEmpty,
s"Unexpected topology mappings for onboarding $member: $unexpectedMappings",
)
_ <- EitherTUtil.condUnitET[Future](
unexpectedUids.isEmpty,
s"Mappings for unexpected UIDs for onboarding $member: $unexpectedUids",
)
_ <- EitherTUtil.condUnitET[Future](
unexpectedNamespaces.isEmpty,
s"Mappings for unexpected namespaces for onboarding $member: $unexpectedNamespaces",
)
_ <- EitherTUtil.condUnitET[Future](
missingMappings.isEmpty,
s"Missing mappings for onboarding $member: $missingMappings",
)
_ <- EitherTUtil.condUnitET[Future](
unexpectedProposals.isEmpty,
s"Unexpected proposals for onboarding $member: $missingMappings",
)
} yield ()
resultET.leftMap(invalidRequest)
}
private def invalidRequest(message: String): StatusRuntimeException =
Status.INVALID_ARGUMENT.withDescription(message).asRuntimeException()
private def failedPrecondition(message: String): StatusRuntimeException =
Status.FAILED_PRECONDITION.withDescription(message).asRuntimeException()
/*
Note: we only get the participantId from the context; we have no idea
whether the member is authenticated or not.

View File

@ -113,6 +113,7 @@ object GrpcSequencerService {
domainParamsLookup: DynamicDomainParametersLookup[SequencerDomainParameters],
parameters: SequencerParameters,
protocolVersion: ProtocolVersion,
domainTopologyManager: DomainTopologyManager,
topologyStateForInitializationService: TopologyStateForInitializationService,
loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext, materializer: Materializer): GrpcSequencerService =
@ -134,6 +135,7 @@ object GrpcSequencerService {
),
domainParamsLookup,
parameters,
domainTopologyManager,
topologyStateForInitializationService,
protocolVersion,
)
@ -244,6 +246,7 @@ class GrpcSequencerService(
directSequencerSubscriptionFactory: DirectSequencerSubscriptionFactory,
domainParamsLookup: DynamicDomainParametersLookup[SequencerDomainParameters],
parameters: SequencerParameters,
domainTopologyManager: DomainTopologyManager,
topologyStateForInitializationService: TopologyStateForInitializationService,
protocolVersion: ProtocolVersion,
maxItemsInTopologyResponse: PositiveInt = PositiveInt.tryCreate(100),

View File

@ -4,10 +4,9 @@
package com.digitalasset.canton.domain.metrics
import com.daml.metrics.HealthMetrics
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.daml.metrics.grpc.DamlGrpcServerMetrics
import com.daml.metrics.api.HistogramInventory
object SequencerTestMetrics
extends SequencerMetrics(

View File

@ -8,9 +8,9 @@ import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.config.DefaultProcessingTimeouts
import com.digitalasset.canton.crypto.{Nonce, Signature}
import com.digitalasset.canton.sequencing.authentication.MemberAuthentication.{
MemberAccessDisabled,
MissingToken,
NonMatchingDomainId,
ParticipantAccessDisabled,
}
import com.digitalasset.canton.sequencing.authentication.grpc.AuthenticationTokenWithExpiry
import com.digitalasset.canton.sequencing.authentication.{AuthenticationToken, MemberAuthentication}
@ -131,8 +131,8 @@ class MemberAuthenticationServiceTest extends AsyncWordSpec with BaseTest {
"token validation should fail"
)
} yield {
generateNonceError shouldBe ParticipantAccessDisabled(p1)
validateSignatureError shouldBe ParticipantAccessDisabled(p1)
generateNonceError shouldBe MemberAccessDisabled(p1)
validateSignatureError shouldBe MemberAccessDisabled(p1)
validateTokenError shouldBe MissingToken(p1)
}
}

View File

@ -16,7 +16,6 @@ import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.sequencing.authentication.grpc.{
AuthenticationTokenManagerTest,
AuthenticationTokenWithExpiry,
SequencerClientNoAuthentication,
SequencerClientTokenAuthentication,
}
import com.digitalasset.canton.sequencing.authentication.{
@ -24,12 +23,7 @@ import com.digitalasset.canton.sequencing.authentication.{
AuthenticationTokenManagerConfig,
}
import com.digitalasset.canton.time.SimClock
import com.digitalasset.canton.topology.{
DomainId,
ParticipantId,
UnauthenticatedMemberId,
UniqueIdentifier,
}
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.{BaseTest, HasExecutionContext}
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
@ -64,6 +58,7 @@ class SequencerAuthenticationServerInterceptorTest
lazy val store: MemberAuthenticationStore = new InMemoryMemberAuthenticationStore()
lazy val domainId = DomainId(UniqueIdentifier.tryFromProtoPrimitive("popo::pipi"))
lazy val authService = new MemberAuthenticationService(
domainId,
null,
@ -171,20 +166,6 @@ class SequencerAuthenticationServerInterceptorTest
client.hello(Hello.Request("hi")).futureValue.msg shouldBe "hello back"
}
"succeed request if client does not need authentication" in new GrpcContext {
store
.saveToken(StoredAuthenticationToken(participantId, neverExpire, token.token))
.futureValue
val clientAuthentication =
new SequencerClientNoAuthentication(domainId, unauthenticatedMemberId)
channel = InProcessChannelBuilder
.forName(channelName)
.build()
val client = clientAuthentication(HelloServiceGrpc.stub(channel))
client.hello(Hello.Request("hi")).futureValue.msg shouldBe "hello back"
}
"fail request if participant use interceptor with incorrect token information" in new GrpcContext {
store
.saveToken(StoredAuthenticationToken(participantId, neverExpire, token.token))

View File

@ -114,6 +114,14 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
newlyRegisteredMembers.add(member)
EitherT.pure(())
}
override def registerMemberInternal(member: Member, timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, RegisterError, Unit] = {
newlyRegisteredMembers.add(member)
EitherT.pure(())
}
override def readInternal(member: Member, offset: SequencerCounter)(implicit
traceContext: TraceContext
): EitherT[Future, CreateSubscriptionError, Sequencer.EventSource] =

View File

@ -100,6 +100,7 @@ final case class Env(loggerFactory: NamedLoggerFactory)(implicit
def timeouts = DefaultProcessingTimeouts.testing
private val futureSupervisor = FutureSupervisor.Noop
private val topologyClient = mock[DomainTopologyClient]
private val mockDomainTopologyManager = mock[DomainTopologyManager]
private val mockTopologySnapshot = mock[TopologySnapshot]
private val confirmationRequestsMaxRate =
DynamicDomainParameters.defaultConfirmationRequestsMaxRate
@ -183,6 +184,7 @@ final case class Env(loggerFactory: NamedLoggerFactory)(implicit
sequencerSubscriptionFactory,
domainParamsLookup,
params,
mockDomainTopologyManager,
topologyStateForInitializationService,
BaseTest.testedProtocolVersion,
)
@ -191,6 +193,7 @@ final case class Env(loggerFactory: NamedLoggerFactory)(implicit
sequencerId = sequencerId,
staticDomainParameters = BaseTest.defaultStaticDomainParameters,
cryptoApi = cryptoApi,
domainTopologyManager = mockDomainTopologyManager,
loggerFactory = loggerFactory,
)

View File

@ -101,6 +101,7 @@ class GrpcSequencerServiceTest
val sequencerSubscriptionFactory = mock[DirectSequencerSubscriptionFactory]
private val topologyClient = mock[DomainTopologyClient]
private val mockTopologySnapshot = mock[TopologySnapshot]
private val mockDomainTopologyManager = mock[DomainTopologyManager]
when(topologyClient.currentSnapshotApproximation(any[TraceContext]))
.thenReturn(mockTopologySnapshot)
when(
@ -169,6 +170,7 @@ class GrpcSequencerServiceTest
sequencerSubscriptionFactory,
domainParamLookup,
params,
mockDomainTopologyManager,
topologyInitService,
BaseTest.testedProtocolVersion,
maxItemsInTopologyResponse = PositiveInt.tryCreate(maxItemsInTopologyBatch),

View File

@ -5,7 +5,15 @@ package com.digitalasset.canton.domain.sequencing.sequencer.reference
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName, MetricsContext}
import com.digitalasset.canton.config.{BatchAggregatorConfig, BatchingConfig, ConnectionAllocation, DbParametersConfig, ProcessingTimeout, QueryCostMonitoringConfig, StorageConfig}
import com.digitalasset.canton.config.{
BatchAggregatorConfig,
BatchingConfig,
ConnectionAllocation,
DbParametersConfig,
ProcessingTimeout,
QueryCostMonitoringConfig,
StorageConfig,
}
import com.digitalasset.canton.domain.block.BlockFormat.DefaultFirstBlockHeight
import com.digitalasset.canton.domain.block.{SequencerDriver, SequencerDriverFactory}
import com.digitalasset.canton.domain.sequencing.sequencer.reference.BaseReferenceSequencerDriverFactory.createClock

View File

@ -7,11 +7,11 @@ import com.daml.lf.data.Ref.{ApplicationId, ParticipantId}
import com.daml.lf.data.Time.Timestamp
import spray.json.{DefaultJsonProtocol, RootJsonFormat, *}
import DefaultJsonProtocol.*
import java.time.Instant
import scala.util.Try
import DefaultJsonProtocol.*
object MeteringReport {
type Scheme = String
@ -38,10 +38,11 @@ object MeteringReport {
final case class ApplicationReport(application: ApplicationId, events: Long)
implicit val TimestampFormat: RootJsonFormat[Timestamp] =
stringJsonFormat(v => for {
instant <- Try(Instant.parse(v)).toEither.left.map(_.getMessage)
timestamp <- Timestamp.fromInstant(instant)
} yield timestamp
stringJsonFormat(v =>
for {
instant <- Try(Instant.parse(v)).toEither.left.map(_.getMessage)
timestamp <- Timestamp.fromInstant(instant)
} yield timestamp
)(_.toString)
implicit val ApplicationIdFormat: RootJsonFormat[ApplicationId] =

View File

@ -5,8 +5,8 @@ package com.digitalasset.canton.platform.apiserver
import com.daml.error.{DamlError, ErrorGenerator}
import com.daml.ledger.resources.ResourceOwner
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.testing.{InMemoryMetricsFactory, MetricValues}
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.digitalasset.canton.config.RequireTypes.Port
import com.digitalasset.canton.domain.api.v0
import com.digitalasset.canton.domain.api.v0.Hello
@ -23,11 +23,7 @@ import com.digitalasset.canton.logging.{
NamedLoggerFactory,
SuppressingLogger,
}
import com.digitalasset.canton.metrics.{
HistogramInventory,
LedgerApiServerHistograms,
LedgerApiServerMetrics,
}
import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
import com.digitalasset.canton.platform.apiserver.GrpcServerSpec.*
import com.digitalasset.canton.platform.apiserver.configuration.RateLimitingConfig
import com.digitalasset.canton.platform.apiserver.ratelimiting.{

View File

@ -19,6 +19,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import spray.json.{JsArray, JsBoolean, JsNull, JsNumber, JsObject, JsString, JsValue, enrichAny}
import java.time.Instant
class JcsSpec extends AnyWordSpec with Matchers {
private val uno: JsValue = JsNumber(1)
@ -72,8 +74,8 @@ class JcsSpec extends AnyWordSpec with Matchers {
}
"serialize report" in {
val application = Ref.ApplicationId.assertFromString("a0")
val from = Timestamp.assertFromString("2022-01-01T00:00:00Z")
val to = Timestamp.assertFromString("2022-01-01T00:00:00Z")
val from = Timestamp.assertFromInstant(Instant.parse("2022-01-01T00:00:00Z"))
val to = Timestamp.assertFromInstant(Instant.parse("2022-01-01T00:00:00Z"))
val report = ParticipantReport(
participant = Ref.ParticipantId.assertFromString("p0"),
request = Request(from, Some(to), Some(application)),

View File

@ -6,7 +6,7 @@ package com.digitalasset.canton.platform.apiserver.ratelimiting
import com.daml.executors.executors.{NamedExecutor, QueueAwareExecutor}
import com.daml.ledger.api.testing.utils.PekkoBeforeAndAfterAll
import com.daml.ledger.resources.ResourceOwner
import com.daml.metrics.api.MetricsContext
import com.daml.metrics.api.{MetricInfo, MetricQualification, MetricsContext}
import com.daml.ports.Port
import com.daml.scalautil.Statement.discard
import com.daml.tracing.NoOpTelemetry
@ -99,7 +99,13 @@ final class RateLimitingInterceptorSpec
it should "allow metadata requests even when over limit" in {
metrics.openTelemetryMetricsFactory
.meter(metrics.lapi.threadpool.apiServices :+ "submitted")(MetricsContext.Empty)
.meter(
MetricInfo(
metrics.lapi.threadpool.apiServices :+ "submitted",
"",
MetricQualification.Debug,
)
)(MetricsContext.Empty)
.mark(config.maxApiServicesQueueSize.toLong + 1)(MetricsContext.Empty) // Over limit
val protoService = ProtoReflectionService.newInstance()
@ -131,7 +137,13 @@ final class RateLimitingInterceptorSpec
it should "allow health checks event when over limit" in {
metrics.openTelemetryMetricsFactory
.meter(metrics.lapi.threadpool.apiServices :+ "submitted")(MetricsContext.Empty)
.meter(
MetricInfo(
metrics.lapi.threadpool.apiServices :+ "submitted",
"",
MetricQualification.Debug,
)
)(MetricsContext.Empty)
.mark(config.maxApiServicesQueueSize.toLong + 1)(MetricsContext.Empty) // Over limit
val healthService =

View File

@ -4,15 +4,11 @@
package com.digitalasset.canton.platform.indexer.ha
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.digitalasset.canton.ledger.api.health.ReportsHealth
import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
import com.digitalasset.canton.metrics.{
HistogramInventory,
LedgerApiServerHistograms,
LedgerApiServerMetrics,
}
import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
import com.digitalasset.canton.platform.LedgerApiServer
import com.digitalasset.canton.platform.config.{CommandServiceConfig, IndexServiceConfig}
import com.digitalasset.canton.platform.indexer.{

View File

@ -239,7 +239,8 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers with Nam
val someHash = Hash.hashPrivateKey("p0")
val someRecordTime = Time.Timestamp.assertFromString("2000-01-01T00:00:00.000000Z")
val someRecordTime =
Time.Timestamp.assertFromInstant(Instant.parse("2000-01-01T00:00:00.000000Z"))
val someCompletionInfo = state.CompletionInfo(
actAs = Nil,

View File

@ -6,8 +6,8 @@ package com.digitalasset.canton.platform.packages
import com.daml.daml_lf_dev.DamlLf
import com.daml.lf.archive.DarParser
import com.daml.lf.data.Ref.PackageId
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{MetricInfo, MetricName, MetricQualification}
import com.digitalasset.canton.concurrent.Threading
import com.digitalasset.canton.ledger.resources.TestResourceContext
import com.digitalasset.canton.testing.utils.TestModels
@ -31,7 +31,8 @@ class DeduplicatingPackageLoaderSpec
private[this] var actorSystem: ActorSystem = _
private[this] val loadCount = new AtomicLong()
private[this] val metric = NoOpMetricsFactory.timer(MetricName("test-metric"))
private[this] val metric =
NoOpMetricsFactory.timer(MetricInfo(MetricName("test-metric"), "", MetricQualification.Debug))
private[this] val dar =
TestModels.com_daml_ledger_test_ModelTestDar_path

View File

@ -40,7 +40,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks.*
import org.scalatest.wordspec.AnyWordSpec
import java.time.Duration
import java.time.{Duration, Instant}
import scala.concurrent.{ExecutionContext, Future}
// Note: this suite contains hand-crafted updates that are impossible to produce on some ledgers
@ -1630,7 +1630,8 @@ object UpdateToDbDtoSpec {
private val otherParticipantId =
Ref.ParticipantId.assertFromString("UpdateToDbDtoSpecRemoteParticipant")
private val someOffset = Offset.fromHexString(Ref.HexString.assertFromString("abcdef"))
private val someRecordTime = Time.Timestamp.assertFromString("2000-01-01T00:00:00.000000Z")
private val someRecordTime =
Time.Timestamp.assertFromInstant(Instant.parse(("2000-01-01T00:00:00.000000Z")))
private val someApplicationId =
Ref.ApplicationId.assertFromString("UpdateToDbDtoSpecApplicationId")
private val someCommandId = Ref.CommandId.assertFromString("UpdateToDbDtoSpecCommandId")

View File

@ -22,6 +22,8 @@ import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import org.scalatest.wordspec.AnyWordSpec
import java.time.Instant
class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
import DbDtoEq.*
@ -44,7 +46,8 @@ class UpdateToMeteringDbDtoSpec extends AnyWordSpec with MetricValues {
val someHash = Hash.hashPrivateKey("p0")
val someRecordTime = Time.Timestamp.assertFromString("2000-01-01T00:00:00.000000Z")
val someRecordTime =
Time.Timestamp.assertFromInstant(Instant.parse("2000-01-01T00:00:00.000000Z"))
val someCompletionInfo = state.CompletionInfo(
actAs = Nil,

View File

@ -9,18 +9,14 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Ref
import com.daml.lf.engine.{Engine, EngineConfig}
import com.daml.lf.language.{LanguageMajorVersion, LanguageVersion}
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.daml.resources.PureResource
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.ledger.api.domain.ParticipantId
import com.digitalasset.canton.logging.LoggingContextWithTrace.withNewLoggingContext
import com.digitalasset.canton.logging.SuppressingLogger
import com.digitalasset.canton.metrics.{
HistogramInventory,
LedgerApiServerHistograms,
LedgerApiServerMetrics,
}
import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
import com.digitalasset.canton.platform.config.{
ActiveContractsServiceStreamsConfig,
ServerRole,

View File

@ -4,8 +4,8 @@
package com.digitalasset.canton.ledger.indexerbenchmark
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.daml.resources
import com.digitalasset.canton.concurrent.DirectExecutionContext
import com.digitalasset.canton.data.Offset
@ -17,11 +17,7 @@ import com.digitalasset.canton.ledger.participant.state.{
Update,
}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.metrics.{
HistogramInventory,
LedgerApiServerHistograms,
LedgerApiServerMetrics,
}
import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
import com.digitalasset.canton.platform.LedgerApiServer
import com.digitalasset.canton.platform.indexer.ha.HaConfig
import com.digitalasset.canton.platform.indexer.{Indexer, IndexerServiceOwner, JdbcIndexer}

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --enable-interfaces=yes
name: carbonv1-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --enable-interfaces=yes
name: carbonv2-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
name: experimental-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --enable-interfaces=yes
name: model-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
name: package-management-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --enable-interfaces=yes
name: semantic-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
name: upgrade-tests
source: .
version: 1.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
name: upgrade-tests
source: .
version: 2.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
name: upgrade-tests
source: .
version: 3.0.0

View File

@ -3,9 +3,9 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Counter, LabeledMetricsFactory, Meter, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification}
import com.daml.metrics.api.HistogramInventory.Item
class CommandHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -4,9 +4,9 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.CacheMetrics
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification}
import com.daml.metrics.api.HistogramInventory.Item
class ExecutionHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -4,9 +4,15 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.DatabaseMetrics
import com.daml.metrics.api.MetricHandle.{Counter, Histogram, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Counter, Histogram, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
trait TransactionStreamsDbHistograms {

View File

@ -3,9 +3,15 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class IndexHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -102,8 +102,6 @@ class LedgerApiServerHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory
) {
private[metrics] val services = new ServicesHistograms(prefix :+ "services")
private[metrics] val commands = new CommandHistograms(prefix :+ "commands")
private[metrics] val execution = new ExecutionHistograms(prefix :+ "execution")
@ -115,5 +113,4 @@ class LedgerApiServerHistograms(val prefix: MetricName)(implicit
// is anyway hardcoded
private val _db = new DatabaseMetricsHistograms()
}

View File

@ -4,9 +4,15 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.DatabaseMetrics
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class ParallelIndexerHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -3,9 +3,15 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.*
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class ServicesHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: JsonEncodingTest

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.dev
name: JsonEncodingTestDev

View File

@ -22,6 +22,7 @@ import shapeless.record.Record as HRecord
import spray.json.*
import scalaz.syntax.show.*
import java.time.Instant
import scala.annotation.nowarn
import scala.util.{Success, Try}
import scala.util.Random.shuffle
@ -126,7 +127,7 @@ abstract class ApiCodecCompressedSpec
fListOfText = Vector("foo", "bar"),
fListOfUnit = Vector((), ()),
fDate = Time.Date assertFromString "2019-01-28",
fTimestamp = Time.Timestamp assertFromString "2019-01-28T12:44:33.22Z",
fTimestamp = Time.Timestamp.assertFromInstant(Instant.parse("2019-01-28T12:44:33.22Z")),
fOptionalText = None,
fOptionalUnit = Some(()),
fOptOptText = Some(Some("foo")),
@ -258,15 +259,15 @@ class ApiCodecCompressedSpecStable extends ApiCodecCompressedSpec {
}
def cn(canonical: String, numerically: String, typ: VA)(
expected: typ.Inj,
alternates: String*
expected: typ.Inj,
alternates: String*
)(implicit pos: source.Position) =
(pos.lineNumber, canonical, numerically, typ, expected, alternates)
def c(canonical: String, typ: VA)(expected: typ.Inj, alternates: String*)(implicit
pos: source.Position
pos: source.Position
) =
cn(canonical, canonical, typ)(expected, alternates: _*)(pos)
cn(canonical, canonical, typ)(expected, alternates*)(pos)
object VAs {
val ooi = VA.optional(VA.optional(VA.int64))
@ -324,11 +325,15 @@ class ApiCodecCompressedSpecStable extends ApiCodecCompressedSpec {
"\"0.12345123445001\"",
),
c("\"1990-11-09T04:30:23.123456Z\"", VA.timestamp)(
Time.Timestamp assertFromString "1990-11-09T04:30:23.123456Z",
Time.Timestamp.assertFromInstant(Instant.parse("1990-11-09T04:30:23.123456Z")),
"\"1990-11-09T04:30:23.1234569Z\"",
),
c("\"1970-01-01T00:00:00Z\"", VA.timestamp)(Time.Timestamp assertFromLong 0),
c("\"1969-12-31T23:00:00Z\"", VA.timestamp)(Time.Timestamp.assertFromLong(-3600000000L), "\"1970-01-01T00:00:00+01:00\""),
// Ensure ISO 8601 timestamps with offsets are successfully parsed by comparing to (epoch - 1 hour)
c("\"1969-12-31T23:00:00Z\"", VA.timestamp)(
Time.Timestamp.assertFromLong(-3600000000L),
"\"1970-01-01T00:00:00+01:00\"",
),
cn("\"42\"", "42", VA.int64)(42, "\"+42\""),
cn("\"0\"", "0", VA.int64)(0, "-0", "\"+0\"", "\"-0\""),
c("\"Alice\"", VA.party)(Ref.Party assertFromString "Alice"),
@ -392,7 +397,7 @@ class ApiCodecCompressedSpecStable extends ApiCodecCompressedSpec {
typ.prj(parsed) should ===(Some(expected))
apiValueToJsValue(parsed) should ===(json)
numCodec.apiValueToJsValue(parsed) should ===(numJson)
val tAlternates = Table("alternate", alternates: _*)
val tAlternates = Table("alternate", alternates*)
forEvery(tAlternates) { alternate =>
val aJson = alternate.parseJson
typ.prj(jsValueToApiValue(aJson, typ.t, typeLookup)) should ===(Some(expected))

View File

@ -4,7 +4,11 @@
package com.digitalasset.canton.http
import com.daml.ledger.api.v2 as lav2
import lav2.command_service.{SubmitAndWaitForTransactionResponse, SubmitAndWaitForTransactionTreeResponse, SubmitAndWaitRequest}
import lav2.command_service.{
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse,
SubmitAndWaitRequest,
}
import lav2.transaction.{Transaction, TransactionTree}
import com.digitalasset.canton.http.util.Logging as HLogging
import com.daml.logging.LoggingContextOf
@ -12,8 +16,13 @@ import LoggingContextOf.{label, newLoggingContext}
import com.daml.jwt.JwtSigner
import com.daml.jwt.domain.{DecodedJwt, Jwt}
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.ledger.api.auth.{AuthServiceJWTCodec, AuthServiceJWTPayload, StandardJWTPayload, StandardJWTTokenFormat}
import com.digitalasset.canton.tracing.{NoTracing, TraceContext}
import com.digitalasset.canton.ledger.api.auth.{
AuthServiceJWTCodec,
AuthServiceJWTPayload,
StandardJWTPayload,
StandardJWTTokenFormat,
}
import com.digitalasset.canton.tracing.NoTracing
import org.scalatest.Inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
@ -73,7 +82,7 @@ class CommandServiceTest extends AsyncWordSpec with Matchers with Inside with No
}
}
object CommandServiceTest extends BaseTest {
object CommandServiceTest extends BaseTest {
private val multiPartyJwp = domain.JwtWritePayload(
domain.ApplicationId("myapp"),
submitter = domain.Party subst NonEmptyList("foo", "bar"),
@ -90,7 +99,6 @@ object CommandServiceTest extends BaseTest {
: LoggingContextOf[HLogging.InstanceUUID with HLogging.RequestID] =
newLoggingContext(label[HLogging.InstanceUUID with HLogging.RequestID])(identity)
// TODO(#13303): Deduplicate with original
def jwtForParties(
actAs: List[domain.Party],
@ -131,21 +139,21 @@ object CommandServiceTest extends BaseTest {
new CommandService(
submitAndWaitForTransaction = (_, req) =>
_ =>
_ =>
Future {
txns.add(req)
import lav2.event.{CreatedEvent, Event}, Event.Event.Created
import com.digitalasset.canton.fetchcontracts.util.IdentifierConverters.apiIdentifier
val creation = Event(
Created(
CreatedEvent(
templateId = Some(apiIdentifier(tplId)),
createArguments = Some(lav2.value.Record()),
_ =>
Future {
txns.add(req)
import lav2.event.{CreatedEvent, Event}, Event.Event.Created
import com.digitalasset.canton.fetchcontracts.util.IdentifierConverters.apiIdentifier
val creation = Event(
Created(
CreatedEvent(
templateId = Some(apiIdentifier(tplId)),
createArguments = Some(lav2.value.Record()),
)
)
)
)
\/-(SubmitAndWaitForTransactionResponse(Some(Transaction(events = Seq(creation)))))
},
\/-(SubmitAndWaitForTransactionResponse(Some(Transaction(events = Seq(creation)))))
},
submitAndWaitForTransactionTree = (_, req) =>
_ =>
Future {

View File

@ -13,11 +13,11 @@ import org.scalatest.matchers.should.Matchers
import scalaz.\/-
import spray.json.enrichAny
import java.time.LocalDate
import java.time.{Instant, LocalDate}
class MeteringReportEndpointTest extends AnyFreeSpec with Matchers {
import MeteringReportEndpoint._
import MeteringReportEndpoint.*
"MeteringReportEndpoint" - {
@ -35,13 +35,13 @@ class MeteringReportEndpointTest extends AnyFreeSpec with Matchers {
}
"should convert to timestamp to protobuf timestamp" in {
val expected = Timestamp.assertFromString("2022-02-03T00:00:00Z")
val expected = Timestamp.assertFromInstant(Instant.parse("2022-02-03T00:00:00Z"))
val actual = toTimestamp(LocalDate.of(2022, 2, 3))
actual shouldBe expected
}
"should convert to protobuf request" in {
import request._
import request.*
val expected = metering_report_service.GetMeteringReportRequest(
Some(toPbTimestamp(toTimestamp(from))),
to.map(toTimestamp).map(toPbTimestamp),

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240527.13089.0.vb44823df
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
build-options:
- --target=2.1
name: AdminWorkflows

View File

@ -8,10 +8,13 @@ import cats.syntax.parallel.*
import com.digitalasset.canton.LedgerTransactionId
import com.digitalasset.canton.admin.participant.v30.InspectionServiceGrpc.InspectionService
import com.digitalasset.canton.admin.participant.v30.{
GetConfigForSlowCounterParticipants,
GetIntervalsBehindForCounterParticipants,
LookupContractDomain,
LookupOffsetByIndex,
LookupOffsetByTime,
LookupTransactionDomain,
SetConfigForSlowCounterParticipants,
}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.participant.admin.inspection.SyncStateInspection
@ -111,4 +114,27 @@ class GrpcInspectionService(syncStateInspection: SyncStateInspection)(implicit
)
}
}
/** Configure metrics for slow counter-participants (i.e., that are behind in sending commitments) and
* configure thresholds for when a counter-participant is deemed slow.
* TODO(#10436) R7
*/
override def setConfigForSlowCounterParticipants(
request: SetConfigForSlowCounterParticipants.Request
): Future[SetConfigForSlowCounterParticipants.Response] = ???
/** Get the current configuration for metrics for slow counter-participants.
* TODO(#10436) R7
*/
override def getConfigForSlowCounterParticipants(
request: GetConfigForSlowCounterParticipants.Request
): Future[GetConfigForSlowCounterParticipants.Response] = ???
/** Get the number of intervals that counter-participants are behind in sending commitments.
* Can be used to decide whether to ignore slow counter-participants w.r.t. pruning.
* TODO(#10436) R7
*/
override def getIntervalsBehindForCounterParticipants(
request: GetIntervalsBehindForCounterParticipants.Request
): Future[GetIntervalsBehindForCounterParticipants.Response] = ???
}

View File

@ -10,6 +10,11 @@ import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.admin.grpc.{GrpcPruningScheduler, HasPruningScheduler}
import com.digitalasset.canton.admin.participant.v30.*
import com.digitalasset.canton.admin.pruning.v30
import com.digitalasset.canton.admin.pruning.v30.{
GetNoWaitCommitmentsFrom,
ResetNoWaitCommitmentsFrom,
SetNoWaitCommitmentsFrom,
}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.error.CantonErrorGroups.ParticipantErrorGroup.PruningServiceErrorGroup
@ -174,6 +179,31 @@ class GrpcPruningService(
)
case Some(scheduler) => Future.successful(scheduler)
}
/** TODO(#18453) R6
* Enable or disable waiting for commitments from the given counter-participants
* Disabling waiting for commitments disregards these counter-participants w.r.t. pruning, which gives up
* non-repudiation for those counter-participants, but increases pruning resilience to failures
* and slowdowns of those counter-participants and/or the network
*/
override def setNoWaitCommitmentsFrom(
request: SetNoWaitCommitmentsFrom.Request
): Future[SetNoWaitCommitmentsFrom.Response] = ???
/** TODO(#18453) R6
* Retrieve the configuration of waiting for commitments from counter-participants
*/
override def getNoWaitCommitmentsFrom(
request: GetNoWaitCommitmentsFrom.Request
): Future[GetNoWaitCommitmentsFrom.Response] = ???
/** TODO(#18453) R6
* Enable waiting for commitments from the given counter-participants
* Waiting for commitments is the default behavior; explicitly enabling it is useful if it was explicitly disabled
*/
override def resetNoWaitCommitmentsFrom(
request: ResetNoWaitCommitmentsFrom.Request
): Future[ResetNoWaitCommitmentsFrom.Response] = ???
}
sealed trait PruningServiceError extends CantonError

View File

@ -208,14 +208,12 @@ trait DomainRegistryHelpers extends FlagCloseable with NamedLogging { this: HasF
s"Participant is not yet active on domain ${domainId}. Initialising topology"
)
for {
sequencerConnectClient <- sequencerConnectClient(sequencerAggregatedInfo)
success <- topologyDispatcher.onboardToDomain(
domainId,
config.domain,
config.timeTracker,
sequencerAggregatedInfo.sequencerConnections,
sequencerClientFactory,
sequencerConnectClient,
sequencerAggregatedInfo.staticDomainParameters.protocolVersion,
sequencerAggregatedInfo.expectedSequencers,
)
_ <- EitherT.cond[FutureUnlessShutdown](
success,

View File

@ -1,188 +0,0 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.participant.domain
import cats.data.EitherT
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.common.domain.{
RegisterTopologyTransactionHandle,
SequencerBasedRegisterTopologyTransactionHandle,
}
import com.digitalasset.canton.config.{DomainTimeTrackerConfig, ProcessingTimeout, TopologyConfig}
import com.digitalasset.canton.crypto.Crypto
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory}
import com.digitalasset.canton.participant.topology.DomainOnboardingOutbox
import com.digitalasset.canton.sequencing.*
import com.digitalasset.canton.sequencing.client.RequestSigner.UnauthenticatedRequestSigner
import com.digitalasset.canton.sequencing.client.{SequencerClient, SequencerClientFactory}
import com.digitalasset.canton.sequencing.handlers.DiscardIgnoredEvents
import com.digitalasset.canton.store.memory.{InMemorySendTrackerStore, InMemorySequencedEventStore}
import com.digitalasset.canton.time.{Clock, DomainTimeTracker}
import com.digitalasset.canton.topology.store.TopologyStore
import com.digitalasset.canton.topology.store.TopologyStoreId.{AuthorizedStore, DomainStore}
import com.digitalasset.canton.topology.{
DomainId,
ParticipantId,
SequencerId,
UnauthenticatedMemberId,
}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerAlias}
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
/** Takes care of requesting approval of the participant's initial topology transactions to the IDM via the sequencer.
* Before these transactions have been approved, the participant cannot connect to the sequencer because it can't
* authenticate without the IDM having approved the transactions. Because of that, this initial request is sent by
* a dynamically created unauthenticated member whose sole purpose is to send this request and wait for the response.
*/
final class ParticipantInitializeTopology(
domainId: DomainId,
alias: DomainAlias,
participantId: ParticipantId,
authorizedStore: TopologyStore[AuthorizedStore],
targetStore: TopologyStore[DomainStore],
clock: Clock,
timeTracker: DomainTimeTrackerConfig,
processingTimeout: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
sequencerClientFactory: SequencerClientFactory,
connections: SequencerConnections,
crypto: Crypto,
config: TopologyConfig,
protocolVersion: ProtocolVersion,
expectedSequencers: NonEmpty[Map[SequencerAlias, SequencerId]],
) {
def run()(implicit
executionContext: ExecutionContextExecutor,
executionSequencerFactory: ExecutionSequencerFactory,
materializer: Materializer,
tracer: Tracer,
traceContext: TraceContext,
loggingContext: ErrorLoggingContext,
): EitherT[FutureUnlessShutdown, DomainRegistryError, Boolean] = {
val unauthenticatedMember =
UnauthenticatedMemberId.tryCreate(participantId.namespace)(crypto.pureCrypto)
loggingContext.logger.debug(
s"Unauthenticated member $unauthenticatedMember will register initial topology transactions on behalf of participant $participantId"
)
def pushTopologyAndVerify(
client: SequencerClient,
domainTimeTracker: DomainTimeTracker,
): EitherT[FutureUnlessShutdown, DomainRegistryError, Boolean] = {
val handle = createHandler(client)
for {
_ <- EitherT.right[DomainRegistryError](
FutureUnlessShutdown.outcomeF(
// using send tracking requires a subscription, otherwise the send tracker
// doesn't get updated
client.subscribeAfterUnauthenticated(
CantonTimestamp.MinValue,
// There is no point in ignoring events in an unauthenticated subscription
DiscardIgnoredEvents(loggerFactory)(
ApplicationHandler.success(s"participant-initialize-topology-$alias")
),
domainTimeTracker,
)
)
)
// push the initial set of topology transactions to the domain and stop using the unauthenticated member
success <- initiateOnboarding(handle)
} yield success
}
for {
unauthenticatedSequencerClient <- sequencerClientFactory
.create(
unauthenticatedMember,
new InMemorySequencedEventStore(loggerFactory),
new InMemorySendTrackerStore(),
UnauthenticatedRequestSigner,
connections,
expectedSequencers, // TODO(i12906): Iterate over sequencers until the honest answer is received
)
.leftMap[DomainRegistryError](
DomainRegistryError.ConnectionErrors.FailedToConnectToSequencer.Error(_)
)
.mapK(FutureUnlessShutdown.outcomeK)
domainTimeTracker = DomainTimeTracker(
timeTracker,
clock,
unauthenticatedSequencerClient,
protocolVersion,
processingTimeout,
loggerFactory,
)
success <- {
def closeEverything(): Future[Unit] = {
unauthenticatedSequencerClient.close()
domainTimeTracker.close()
Future.unit
}
EitherT {
FutureUnlessShutdown {
pushTopologyAndVerify(unauthenticatedSequencerClient, domainTimeTracker).value.unwrap
.transformWith {
case Failure(exception) =>
// Close everything and then return the original failure
closeEverything().flatMap(_ => Future.failed(exception))
case Success(value) =>
// Close everything and then return the result
closeEverything().map(_ => value)
}
}
}
}
} yield {
success
}
}
private def createHandler(
client: SequencerClient
)(implicit ec: ExecutionContext): RegisterTopologyTransactionHandle =
new SequencerBasedRegisterTopologyTransactionHandle(
client,
domainId,
participantId,
clock,
config,
protocolVersion,
processingTimeout,
loggerFactory,
)
private def initiateOnboarding(
handle: RegisterTopologyTransactionHandle
)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): EitherT[FutureUnlessShutdown, DomainRegistryError, Boolean] =
DomainOnboardingOutbox
.initiateOnboarding(
alias,
domainId,
protocolVersion,
participantId,
handle,
authorizedStore,
targetStore,
processingTimeout,
loggerFactory,
crypto,
)
}

View File

@ -3,10 +3,15 @@
package com.digitalasset.canton.participant.metrics
import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory, Meter, Timer}
import com.daml.metrics.api.{MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory, Meter, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class CommitmentHistograms(parent: MetricName)(implicit inventory: HistogramInventory) {
private[metrics] val prefix = parent :+ "commitments"

View File

@ -5,16 +5,22 @@ package com.digitalasset.canton.participant.metrics
import cats.Eval
import com.daml.metrics.HealthMetrics
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.Gauge.CloseableGauge
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, Histogram, LabeledMetricsFactory, Meter}
import com.daml.metrics.api.noop.NoOpGauge
import com.daml.metrics.api.{HistogramInventory, MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
import com.daml.metrics.grpc.GrpcServerMetrics
import com.digitalasset.canton.DomainAlias
import com.digitalasset.canton.data.TaskSchedulerMetrics
import com.digitalasset.canton.environment.BaseMetrics
import com.digitalasset.canton.http.metrics.{HttpApiHistograms, HttpApiMetrics}
import com.daml.metrics.api.HistogramInventory.Item
import com.digitalasset.canton.metrics.*
import com.digitalasset.canton.participant.metrics.PruningMetrics as ParticipantPruningMetrics

View File

@ -3,10 +3,15 @@
package com.digitalasset.canton.participant.metrics
import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{MetricInfo, MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
MetricName,
MetricQualification,
MetricsContext,
}
class PruningHistograms(parent: MetricName)(implicit inventory: HistogramInventory) {
private[metrics] val prefix = parent :+ "pruning"

View File

@ -3,10 +3,9 @@
package com.digitalasset.canton.participant.metrics
import com.daml.metrics.api.MetricHandle.{Histogram, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{MetricName, MetricQualification, MetricsContext}
import com.daml.metrics.api.HistogramInventory
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Histogram, LabeledMetricsFactory, Timer}
import com.daml.metrics.api.{HistogramInventory, MetricName, MetricQualification, MetricsContext}
class TransactionProcessingHistograms(val prefix: MetricName)(implicit
inventory: HistogramInventory

View File

@ -340,21 +340,30 @@ trait MessageDispatcher { this: NamedLogging =>
}
}
val checkedRootHashMessagesC =
checkRootHashMessageAndViews(ts, rootHashMessages, encryptedViews)
val checkedRootHashMessagesC = checkRootHashMessageAndViews(rootHashMessages, encryptedViews)
checkedRootHashMessagesC.nonaborts.iterator.foreach(alarm(sc, ts, _))
for {
result <- checkedRootHashMessagesC.toEither match {
case Right(goodRequest) =>
if (!containsTopologyTransactions)
processRequest(goodRequest)
else {
if (containsTopologyTransactions) {
/* A batch should not contain a request and a topology transaction.
* Handling of such a batch is done consistently with the case [[ExpectMalformedMediatorConfirmationRequestResult]] below.
*
* In order to safely drop the confirmation request, we must make sure that every other participant will be able to make
* the same decision, otherwise we will break transparency.
* Here, the decision is based on the fact that the batch contained a valid topology transaction. These transactions are
* addressed to `AllMembersOfDomain`, which by definition means that everyone will receive them.
* Therefore, anyone who received this confirmation request has also received the topology transaction, and will
* be able to make the same decision and drop the confirmation request.
*
* Note that we could instead decide to process both the confirmation request and the topology transactions.
* This would not have a conceptual problem, because the topology transactions always become effective *after*
* their sequencing time, but it would likely make the code more complicated than relying on the above argument.
*/
alarm(sc, ts, "Invalid batch containing both a request and topology transaction")
tickRecordOrderPublisher(sc, ts)
}
} else
processRequest(goodRequest)
case Left(DoNotExpectMediatorResult) =>
if (containsTopologyTransactions) {
@ -387,7 +396,6 @@ trait MessageDispatcher { this: NamedLogging =>
* @return [[com.digitalasset.canton.util.Checked.Abort]] indicates a really malformed request and the appropriate reaction
*/
private def checkRootHashMessageAndViews(
ts: CantonTimestamp,
rootHashMessages: List[OpenEnvelope[RootHashMessage[SerializedRootHashMessagePayload]]],
encryptedViews: List[OpenEnvelope[EncryptedViewMessage[ViewType]]],
)(implicit

View File

@ -12,6 +12,7 @@ import cats.syntax.traverse.*
import cats.syntax.validated.*
import com.daml.error.*
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.admin.pruning
import com.digitalasset.canton.concurrent.{FutureSupervisor, Threading}
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt, PositiveNumeric}
import com.digitalasset.canton.config.{ProcessingTimeout, TestingConfigInternal}
@ -55,6 +56,7 @@ import com.digitalasset.canton.protocol.{
import com.digitalasset.canton.sequencing.client.SendAsyncClientError.RequestRefused
import com.digitalasset.canton.sequencing.client.SequencerClientSend
import com.digitalasset.canton.sequencing.protocol.{Batch, OpenEnvelope, Recipients, SendAsyncError}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.store.SequencerCounterTrackerStore
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.processing.EffectiveTime
@ -66,7 +68,7 @@ import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.*
import com.digitalasset.canton.util.retry.Policy
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{LfPartyId, TransferCounter}
import com.digitalasset.canton.{LfPartyId, ProtoDeserializationError, TransferCounter}
import com.google.common.annotations.VisibleForTesting
import java.util.concurrent.atomic.AtomicReference
@ -2289,4 +2291,36 @@ object AcsCommitmentProcessor extends HasLoggerName {
}
}
}
sealed trait SharedContractsState {
def toProtoV30: pruning.v30.SharedContractsState
}
object SharedContractsState {
object SharedContracts extends SharedContractsState {
override val toProtoV30: pruning.v30.SharedContractsState =
pruning.v30.SharedContractsState.SHARED_CONTRACTS
}
object NoSharedContracts extends SharedContractsState {
override val toProtoV30: pruning.v30.SharedContractsState =
pruning.v30.SharedContractsState.NO_SHARED_CONTRACTS
}
def fromProtoV30(
proto: pruning.v30.SharedContractsState
): ParsingResult[SharedContractsState] =
proto match {
case pruning.v30.SharedContractsState.NO_SHARED_CONTRACTS => Right(NoSharedContracts)
case pruning.v30.SharedContractsState.SHARED_CONTRACTS => Right(SharedContracts)
case _ =>
Left(
ProtoDeserializationError.ValueConversionError(
"no wait commitments from",
s"Unknown value: $proto",
)
)
}
}
}

View File

@ -6,29 +6,25 @@ package com.digitalasset.canton.participant.topology
import cats.data.EitherT
import cats.syntax.functor.*
import cats.syntax.parallel.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.DomainAlias
import com.digitalasset.canton.common.domain.{
RegisterTopologyTransactionHandle,
SequencerBasedRegisterTopologyTransactionHandle,
SequencerConnectClient,
}
import com.digitalasset.canton.concurrent.{FutureSupervisor, HasFutureSupervision}
import com.digitalasset.canton.config.{DomainTimeTrackerConfig, LocalNodeConfig, ProcessingTimeout}
import com.digitalasset.canton.config.{LocalNodeConfig, ProcessingTimeout}
import com.digitalasset.canton.crypto.Crypto
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.health.admin.data.TopologyQueueStatus
import com.digitalasset.canton.lifecycle.*
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.domain.{
DomainRegistryError,
ParticipantInitializeTopology,
}
import com.digitalasset.canton.participant.domain.DomainRegistryError
import com.digitalasset.canton.participant.store.SyncDomainPersistentState
import com.digitalasset.canton.participant.sync.SyncDomainPersistentStateManager
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.sequencing.SequencerConnections
import com.digitalasset.canton.sequencing.client.{SequencerClient, SequencerClientFactory}
import com.digitalasset.canton.sequencing.client.SequencerClient
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.DomainTopologyClientWithInit
@ -39,13 +35,10 @@ import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.*
import com.digitalasset.canton.util.*
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerAlias}
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.*
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
trait ParticipantTopologyDispatcherHandle {
@ -219,36 +212,25 @@ class ParticipantTopologyDispatcher(
def onboardToDomain(
domainId: DomainId,
alias: DomainAlias,
timeTrackerConfig: DomainTimeTrackerConfig,
sequencerConnections: SequencerConnections,
sequencerClientFactory: SequencerClientFactory,
sequencerConnectClient: SequencerConnectClient,
protocolVersion: ProtocolVersion,
expectedSequencers: NonEmpty[Map[SequencerAlias, SequencerId]],
)(implicit
executionContext: ExecutionContextExecutor,
executionSequencerFactory: ExecutionSequencerFactory,
materializer: Materializer,
tracer: Tracer,
traceContext: TraceContext,
): EitherT[FutureUnlessShutdown, DomainRegistryError, Boolean] = {
getState(domainId).flatMap { state =>
(new ParticipantInitializeTopology(
domainId,
alias,
participantId,
manager.store,
state.topologyStore,
clock,
timeTrackerConfig,
timeouts,
loggerFactory.append("domainId", domainId.toString),
sequencerClientFactory,
sequencerConnections,
crypto,
config.topology,
protocolVersion,
expectedSequencers,
)).run()
DomainOnboardingOutbox
.initiateOnboarding(
alias,
domainId,
protocolVersion,
participantId,
sequencerConnectClient,
manager.store,
timeouts,
loggerFactory.append("domainId", domainId.toString),
crypto,
)
}
}
@ -353,14 +335,13 @@ private class DomainOnboardingOutbox(
val domainId: DomainId,
val protocolVersion: ProtocolVersion,
participantId: ParticipantId,
val handle: RegisterTopologyTransactionHandle,
sequencerConnectClient: SequencerConnectClient,
val authorizedStore: TopologyStore[TopologyStoreId.AuthorizedStore],
val targetStore: TopologyStore[TopologyStoreId.DomainStore],
val timeouts: ProcessingTimeout,
val loggerFactory: NamedLoggerFactory,
override protected val crypto: Crypto,
) extends DomainOutboxDispatch
with StoreBasedDomainOutboxDispatchHelper {
) extends StoreBasedDomainOutboxDispatchHelper
with FlagCloseable {
override protected val memberId: Member = participantId
@ -372,12 +353,12 @@ private class DomainOnboardingOutbox(
_ = logger.debug(
s"Sending ${initialTransactions.size} onboarding transactions to ${domain}"
)
result <- dispatch(domain, initialTransactions).leftMap[DomainRegistryError](
DomainRegistryError.InitialOnboardingError.Error(_)
)
} yield {
result.forall(res => isExpectedState(res))
}).thereafter { _ =>
_result <- dispatch(initialTransactions)
.mapK(FutureUnlessShutdown.outcomeK)
.leftMap(err =>
DomainRegistryError.InitialOnboardingError.Error(err.toString): DomainRegistryError
)
} yield true).thereafter { _ =>
close()
}
@ -392,13 +373,10 @@ private class DomainOnboardingOutbox(
.findParticipantOnboardingTransactions(participantId, domainId)
)
)
applicablePossiblyPresent <- EitherT.right(
applicable <- EitherT.right(
performUnlessClosingF(functionFullName)(onlyApplicable(candidates))
)
_ <- EitherT.fromEither[FutureUnlessShutdown](initializedWith(applicablePossiblyPresent))
applicable <- EitherT.right(
performUnlessClosingF(functionFullName)(notAlreadyPresent(applicablePossiblyPresent))
)
_ <- EitherT.fromEither[FutureUnlessShutdown](initializedWith(applicable))
// Try to convert if necessary the topology transactions for the required protocol version of the domain
convertedTxs <- performUnlessClosingEitherUSF(functionFullName) {
convertTransactions(applicable).leftMap[DomainRegistryError](
@ -407,6 +385,16 @@ private class DomainOnboardingOutbox(
}
} yield convertedTxs
private def dispatch(
transactions: Seq[GenericSignedTopologyTransaction]
)(implicit traceContext: TraceContext): EitherT[Future, SequencerConnectClient.Error, Unit] = {
sequencerConnectClient.registerOnboardingTopologyTransactions(
domain,
participantId,
transactions,
)
}
private def initializedWith(
initial: Seq[GenericSignedTopologyTransaction]
)(implicit traceContext: TraceContext): Either[DomainRegistryError, Unit] = {
@ -442,9 +430,8 @@ object DomainOnboardingOutbox {
domainId: DomainId,
protocolVersion: ProtocolVersion,
participantId: ParticipantId,
handle: RegisterTopologyTransactionHandle,
sequencerConnectClient: SequencerConnectClient,
authorizedStore: TopologyStore[TopologyStoreId.AuthorizedStore],
targetStore: TopologyStore[TopologyStoreId.DomainStore],
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
crypto: Crypto,
@ -457,9 +444,8 @@ object DomainOnboardingOutbox {
domainId,
protocolVersion,
participantId,
handle,
sequencerConnectClient,
authorizedStore,
targetStore,
timeouts,
loggerFactory,
crypto,

View File

@ -3,10 +3,9 @@
package com.digitalasset.canton.participant.metrics
import com.daml.metrics.api.MetricName
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.digitalasset.canton.DomainAlias
import com.daml.metrics.api.HistogramInventory
object ParticipantTestMetrics
extends ParticipantMetrics(

View File

@ -4,7 +4,7 @@
package com.digitalasset.canton.metrics
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{MetricName, MetricsContext}
import com.daml.metrics.api.{HistogramInventory, MetricName, MetricsContext}
object CommonMockMetrics {

View File

@ -16,7 +16,11 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.metrics.{SdkMeterProvider, SdkMeterProviderBuilder}
import io.opentelemetry.sdk.trace.`export`.{BatchSpanProcessor, BatchSpanProcessorBuilder, SpanExporter}
import io.opentelemetry.sdk.trace.`export`.{
BatchSpanProcessor,
BatchSpanProcessorBuilder,
SpanExporter,
}
import io.opentelemetry.sdk.trace.samplers.Sampler
import io.opentelemetry.sdk.trace.{SdkTracerProvider, SdkTracerProviderBuilder}
@ -24,7 +28,6 @@ import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters.ScalaDurationOps
import scala.util.chaining.scalaUtilChainingOps
object OpenTelemetryFactory {
def initializeOpenTelemetry(
@ -75,13 +78,15 @@ object OpenTelemetryFactory {
else builder
val meterProviderBuilder =
OpenTelemetryUtil.addViewsToProvider(
SdkMeterProvider.builder,
testingSupportAdhocMetrics,
histogramInventory,
histogramFilter,
histogramConfigs,
).pipe(setMetricsReader)
OpenTelemetryUtil
.addViewsToProvider(
SdkMeterProvider.builder,
testingSupportAdhocMetrics,
histogramInventory,
histogramFilter,
histogramConfigs,
)
.pipe(setMetricsReader)
val configuredSdk = OpenTelemetrySdk.builder
.setTracerProvider(tracerProviderBuilder.build)
@ -127,5 +132,4 @@ object OpenTelemetryFactory {
if (config.parentBased) Sampler.parentBased(sampler) else sampler
}
}

View File

@ -1 +1 @@
20240529.13387.v44c5d50b
20240531.13401.v5646e99c