update canton to 20240708.13628.v22bd0def (#19536)

* update canton to 20240708.13628.v22bd0def

tell-slack: canton

* merge fix

* participant got renamed to participant_uid in proto

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Bas van Gijzel <bas.vangijzel@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-07-09 15:39:12 +01:00 committed by GitHub
parent 42417f34a0
commit 6996fa07c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
112 changed files with 1195 additions and 753 deletions

View File

@ -1,35 +0,0 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package com.digitalasset.canton.admin.traffic.v30;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
// Full traffic status for a member at a point in time
message MemberTrafficStatus {
// Represents a top up event valid from a certain timestamp
message TopUpEvent {
// Timestamp at which the top up becomes valid (inclusive)
google.protobuf.Timestamp effective_at = 1;
// Topology transaction serial id used to discriminate between top ups with the same effective_at, which can occur
uint32 serial = 2;
// Traffic limit of the top up
uint64 extra_traffic_limit = 3;
}
// Member the status is about
string member = 1;
// Total extra traffic bought. Optional.
google.protobuf.UInt64Value total_extra_traffic_limit = 2;
// Total extra traffic consumed
uint64 total_extra_traffic_consumed = 3;
// Current and future top up events that have been registered but are not necessarily active yet
repeated TopUpEvent top_up_events = 4; // TODO(i17477): Was never used, remove when we're done with the rework
// Timestamp at which the status is valid
google.protobuf.Timestamp ts = 5;
// Serial number of the balance (total_extra_traffic_limit) of this status
google.protobuf.UInt32Value balance_serial = 6;
}

View File

@ -16,8 +16,11 @@ import com.digitalasset.canton.domain.sequencing.admin.grpc.InitializeSequencerR
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerSnapshot
import com.digitalasset.canton.sequencer.admin.v30
import com.digitalasset.canton.topology.{Member, SequencerId}
import com.digitalasset.canton.util.GrpcStreamingUtils
import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import io.grpc.Context.CancellableContext
import io.grpc.stub.StreamObserver
import io.grpc.{Context, ManagedChannel}
import scala.concurrent.Future
@ -60,7 +63,12 @@ object EnterpriseSequencerAdminCommands {
service: v30.SequencerInitializationServiceGrpc.SequencerInitializationServiceStub,
request: v30.InitializeSequencerFromOnboardingStateRequest,
): Future[v30.InitializeSequencerFromOnboardingStateResponse] =
service.initializeSequencerFromOnboardingState(request)
GrpcStreamingUtils.streamToServer(
service.initializeSequencerFromOnboardingState,
(onboardingState: Array[Byte]) =>
v30.InitializeSequencerFromOnboardingStateRequest(ByteString.copyFrom(onboardingState)),
request.onboardingState,
)
override def createRequest()
: Either[String, v30.InitializeSequencerFromOnboardingStateRequest] =
@ -92,7 +100,15 @@ object EnterpriseSequencerAdminCommands {
service: v30.SequencerInitializationServiceGrpc.SequencerInitializationServiceStub,
request: v30.InitializeSequencerFromGenesisStateRequest,
): Future[v30.InitializeSequencerFromGenesisStateResponse] =
service.initializeSequencerFromGenesisState(request)
GrpcStreamingUtils.streamToServer(
service.initializeSequencerFromGenesisState,
(topologySnapshot: Array[Byte]) =>
v30.InitializeSequencerFromGenesisStateRequest(
topologySnapshot = ByteString.copyFrom(topologySnapshot),
Some(domainParameters.toProtoV30),
),
request.topologySnapshot,
)
override def createRequest(): Either[String, v30.InitializeSequencerFromGenesisStateRequest] =
Right(
@ -143,17 +159,20 @@ object EnterpriseSequencerAdminCommands {
override def timeoutType: TimeoutType = DefaultUnboundedTimeout
}
final case class OnboardingState(memberOrTimestamp: Either[SequencerId, CantonTimestamp])
extends BaseSequencerAdministrationCommand[
final case class OnboardingState(
observer: StreamObserver[v30.OnboardingStateResponse],
sequencerOrTimestamp: Either[SequencerId, CantonTimestamp],
) extends BaseSequencerAdministrationCommand[
v30.OnboardingStateRequest,
v30.OnboardingStateResponse,
ByteString,
CancellableContext,
CancellableContext,
] {
override def createRequest(): Either[String, v30.OnboardingStateRequest] = {
Right(
v30.OnboardingStateRequest(request =
memberOrTimestamp.fold[v30.OnboardingStateRequest.Request](
member => v30.OnboardingStateRequest.Request.SequencerId(member.toProtoPrimitive),
sequencerOrTimestamp.fold[v30.OnboardingStateRequest.Request](
sequencer =>
v30.OnboardingStateRequest.Request.SequencerUid(sequencer.uid.toProtoPrimitive),
timestamp => v30.OnboardingStateRequest.Request.Timestamp(timestamp.toProtoTimestamp),
)
)
@ -163,22 +182,14 @@ object EnterpriseSequencerAdminCommands {
override def submitRequest(
service: v30.SequencerAdministrationServiceGrpc.SequencerAdministrationServiceStub,
request: v30.OnboardingStateRequest,
): Future[v30.OnboardingStateResponse] = service.onboardingState(request)
): Future[CancellableContext] = {
val context = Context.current().withCancellation()
context.run(() => service.onboardingState(request, observer))
Future.successful(context)
}
override def handleResponse(
response: v30.OnboardingStateResponse
): Either[String, ByteString] =
response.value match {
case v30.OnboardingStateResponse.Value
.Failure(v30.OnboardingStateResponse.Failure(reason)) =>
Left(reason)
case v30.OnboardingStateResponse.Value
.Success(
v30.OnboardingStateResponse.Success(onboardingState)
) =>
Right(onboardingState)
case _ => Left("response is empty")
}
override def handleResponse(response: CancellableContext): Either[String, CancellableContext] =
Right(response)
// command will potentially take a long time
override def timeoutType: TimeoutType = DefaultUnboundedTimeout

View File

@ -34,10 +34,7 @@ import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.data.{CantonTimestamp, CantonTimestampSecond}
import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.participant.admin.ResourceLimits
import com.digitalasset.canton.participant.admin.grpc.{
GrpcParticipantRepairService,
TransferSearchResult,
}
import com.digitalasset.canton.participant.admin.grpc.TransferSearchResult
import com.digitalasset.canton.participant.admin.traffic.TrafficStateAdmin
import com.digitalasset.canton.participant.domain.DomainConnectionConfig as CDomainConnectionConfig
import com.digitalasset.canton.participant.pruning.AcsCommitmentProcessor.{
@ -55,7 +52,7 @@ import com.digitalasset.canton.serialization.ProtoConverter.InstantConverter
import com.digitalasset.canton.time.PositiveSeconds
import com.digitalasset.canton.topology.{DomainId, ParticipantId, PartyId}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.BinaryFileUtil
import com.digitalasset.canton.util.{BinaryFileUtil, GrpcStreamingUtils}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerCounter, config}
import com.google.protobuf.ByteString
@ -68,9 +65,8 @@ import io.grpc.{Context, ManagedChannel}
import java.io.IOException
import java.nio.file.{Files, Path, Paths}
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, MILLISECONDS}
import scala.concurrent.{Future, Promise, blocking}
object ParticipantAdminCommands {
@ -386,49 +382,6 @@ object ParticipantAdminCommands {
object ParticipantRepairManagement {
sealed trait StreamingMachinery[Req, Resp] {
def stream(
load: StreamObserver[Resp] => StreamObserver[Req],
requestBuilder: Array[Byte] => Req,
snapshot: ByteString,
): Future[Resp] = {
val requestComplete = Promise[Resp]()
val ref = new AtomicReference[Option[Resp]](None)
val responseObserver = new StreamObserver[Resp] {
override def onNext(value: Resp): Unit = {
ref.set(Some(value))
}
override def onError(t: Throwable): Unit = requestComplete.failure(t)
override def onCompleted(): Unit = {
ref.get() match {
case Some(response) => requestComplete.success(response)
case None =>
requestComplete.failure(
io.grpc.Status.CANCELLED
.withDescription("Server completed the request before providing a response")
.asRuntimeException()
)
}
}
}
val requestObserver = load(responseObserver)
snapshot.toByteArray
.grouped(GrpcParticipantRepairService.DefaultChunkSize.value)
.foreach { bytes =>
blocking {
requestObserver.onNext(requestBuilder(bytes))
}
}
requestObserver.onCompleted()
requestComplete.future
}
}
final case class ExportAcs(
parties: Set[PartyId],
partiesOffboarding: Boolean,
@ -489,8 +442,11 @@ object ParticipantAdminCommands {
acsChunk: ByteString,
workflowIdPrefix: String,
allowContractIdSuffixRecomputation: Boolean,
) extends GrpcAdminCommand[ImportAcsRequest, ImportAcsResponse, Map[LfContractId, LfContractId]]
with StreamingMachinery[ImportAcsRequest, ImportAcsResponse] {
) extends GrpcAdminCommand[
ImportAcsRequest,
ImportAcsResponse,
Map[LfContractId, LfContractId],
] {
override type Svc = ParticipantRepairServiceStub
@ -511,7 +467,7 @@ object ParticipantAdminCommands {
service: ParticipantRepairServiceStub,
request: ImportAcsRequest,
): Future[ImportAcsResponse] = {
stream(
GrpcStreamingUtils.streamToServer(
service.importAcs,
(bytes: Array[Byte]) =>
ImportAcsRequest(

View File

@ -27,11 +27,23 @@ object ListPartiesResult {
private def fromProtoV30(
value: v30.ListPartiesResponse.Result.ParticipantDomains
): ParsingResult[ParticipantDomains] =
): ParsingResult[ParticipantDomains] = {
val participantIdNew = UniqueIdentifier
.fromProtoPrimitive(value.participantUid, "participant_uid")
.map(ParticipantId(_))
// TODO(#16458) Remove this fallback which is used to allow 3.1 console
// to talk to 3.0 nodes
val participantIdOld = participantIdNew.orElse(
ParticipantId.fromProtoPrimitive(value.participantUid, "participant_uid")
)
for {
participantId <- ParticipantId.fromProtoPrimitive(value.participant, "participant")
participantId <- participantIdNew.orElse(participantIdOld)
domains <- value.domains.traverse(fromProtoV30)
} yield ParticipantDomains(participantId, domains)
}
def fromProtoV30(
value: v30.ListPartiesResponse.Result

View File

@ -58,7 +58,7 @@ trait HealthDumpGenerator[Status <: CantonStatus] {
def generateHealthDump(
outputFile: File,
extraFilesToZip: Seq[File] = Seq.empty,
): File = {
): Unit = {
import io.circe.generic.auto.*
import CantonHealthAdministrationEncoders.*
@ -120,7 +120,5 @@ trait HealthDumpGenerator[Status <: CantonStatus] {
val files = Iterator(logFile, logLastErrorsFile, tmpFile).filter(_.nonEmpty)
outputFile.zipIn(files ++ extraFilesToZip.iterator ++ rollingLogs)
}
outputFile
}
}

View File

@ -777,10 +777,10 @@ abstract class SequencerReference(
)
override def maybeId: Option[SequencerId] = topology.maybeIdHelper(SequencerId(_))
private lazy val setup_ = new SequencerSetupGroup(this)
private lazy val setup_ = new SequencerAdministration(this)
@Help.Summary("Methods used for node initialization")
def setup: SequencerSetupGroup = setup_
def setup: SequencerAdministration = setup_
@Help.Summary("Health and diagnostic related commands")
@Help.Group("Health")

View File

@ -1,38 +0,0 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.console.commands
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}
import scala.language.reflectiveCalls
private[commands] class ByteStringStreamObserver[
T <: ByteStringStreamObserver.ByteStringChunk
] extends StreamObserver[T] {
private val byteBuffer = new AtomicReference(Vector.empty[Byte])
private val requestComplete: Promise[ByteString] = Promise[ByteString]()
def result: Future[ByteString] =
requestComplete.future
override def onNext(value: T): Unit =
byteBuffer.getAndUpdate(_ ++ value.chunk.toByteArray).discard
override def onError(t: Throwable): Unit = {
requestComplete.tryFailure(t).discard
}
override def onCompleted(): Unit = {
val finalByteString = ByteString.copyFrom(byteBuffer.get().toArray)
requestComplete.trySuccess(finalByteString).discard
}
}
private[commands] object ByteStringStreamObserver {
type ByteStringChunk = { val chunk: ByteString }
}

View File

@ -10,27 +10,24 @@ import com.digitalasset.canton.admin.api.client.commands.{
TopologyAdminCommands,
}
import com.digitalasset.canton.config.{ConsoleCommandTimeout, NonNegativeDuration}
import com.digitalasset.canton.console.CommandErrors.{CommandError, GenericCommandError}
import com.digitalasset.canton.console.CommandErrors.CommandError
import com.digitalasset.canton.console.ConsoleMacros.utils
import com.digitalasset.canton.console.{
AdminCommandRunner,
CantonHealthAdministration,
CommandErrors,
CommandSuccessful,
ConsoleCommandResult,
ConsoleEnvironment,
Help,
Helpful,
}
import com.digitalasset.canton.grpc.FileStreamObserver
import com.digitalasset.canton.health.admin.data.NodeStatus
import com.digitalasset.canton.health.admin.{data, v30}
import com.digitalasset.canton.networking.grpc.GrpcError
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.util.ResourceUtil
import io.grpc.StatusRuntimeException
import io.grpc.Context
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, TimeoutException}
class HealthAdministration[S <: data.NodeStatus.Status](
runner: AdminCommandRunner,
@ -73,23 +70,16 @@ class HealthAdministration[S <: data.NodeStatus.Status](
val responseObserver =
new FileStreamObserver[v30.HealthDumpResponse](outputFile, _.chunk)
def call = consoleEnvironment.run {
def call: ConsoleCommandResult[Context.CancellableContext] =
adminCommand(new StatusAdminCommands.GetHealthDump(responseObserver, chunkSize))
}
try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await.result(responseObserver.result, timeout.duration)
).map(_ => outputFile.pathAsString)
}
} catch {
case sre: StatusRuntimeException =>
GenericCommandError(GrpcError("Generating health dump file", "dump", sre).toString)
case _: TimeoutException =>
outputFile.delete(swallowIOExceptions = true)
CommandErrors.ConsoleTimeout.Error(timeout.asJavaApproximation)
}
processResult(
call,
responseObserver.result,
timeout,
"Generating health dump",
cleanupOnError = () => outputFile.delete(),
).map(_ => outputFile.pathAsString)
}
private def runningCommand =

View File

@ -10,11 +10,8 @@ import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommand
import com.digitalasset.canton.admin.participant.v30.ExportAcsResponse
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.config.{ConsoleCommandTimeout, NonNegativeDuration}
import com.digitalasset.canton.console.CommandErrors.GenericCommandError
import com.digitalasset.canton.console.{
AdminCommandRunner,
CommandErrors,
CommandSuccessful,
ConsoleCommandResult,
ConsoleEnvironment,
FeatureFlag,
@ -23,8 +20,8 @@ import com.digitalasset.canton.console.{
Helpful,
}
import com.digitalasset.canton.data.RepairContract
import com.digitalasset.canton.grpc.FileStreamObserver
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.networking.grpc.GrpcError
import com.digitalasset.canton.participant.ParticipantNode
import com.digitalasset.canton.participant.admin.data.ActiveContract
import com.digitalasset.canton.participant.domain.DomainConnectionConfig
@ -35,11 +32,10 @@ import com.digitalasset.canton.util.ResourceUtil
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerCounter}
import com.google.protobuf.ByteString
import io.grpc.StatusRuntimeException
import io.grpc.Context
import java.time.Instant
import java.util.UUID
import scala.concurrent.{Await, TimeoutException}
class ParticipantRepairAdministration(
val consoleEnvironment: ConsoleEnvironment,
@ -149,7 +145,7 @@ class ParticipantRepairAdministration(
val file = File(outputFile)
val responseObserver = new FileStreamObserver[ExportAcsResponse](file, _.chunk)
def call = consoleEnvironment.run {
def call: ConsoleCommandResult[Context.CancellableContext] =
runner.adminCommand(
ParticipantAdminCommands.ParticipantRepairManagement
.ExportAcs(
@ -162,23 +158,14 @@ class ParticipantRepairAdministration(
force = force,
)
)
}
try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await.result(responseObserver.result, timeout.duration)
)
}
} catch {
case sre: StatusRuntimeException =>
GenericCommandError(
GrpcError("Generating acs snapshot file", "download_acs_snapshot", sre).toString
)
case _: TimeoutException =>
file.delete(swallowIOExceptions = true)
CommandErrors.ConsoleTimeout.Error(timeout.asJavaApproximation)
}
processResult(
call,
responseObserver.result,
timeout,
request = "exporting Acs",
cleanupOnError = () => file.delete(),
)
}
}
}

View File

@ -9,15 +9,21 @@ import com.digitalasset.canton.admin.api.client.commands.EnterpriseSequencerAdmi
InitializeFromOnboardingState,
}
import com.digitalasset.canton.admin.api.client.data.StaticDomainParameters
import com.digitalasset.canton.config.{ConsoleCommandTimeout, NonNegativeDuration}
import com.digitalasset.canton.console.{Help, SequencerReference}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.admin.grpc.InitializeSequencerResponse
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerSnapshot
import com.digitalasset.canton.grpc.ByteStringStreamObserver
import com.digitalasset.canton.sequencer.admin.v30.OnboardingStateResponse
import com.digitalasset.canton.topology.SequencerId
import com.google.protobuf.ByteString
class SequencerSetupGroup(node: SequencerReference) extends ConsoleCommandGroup.Impl(node) {
import scala.concurrent.ExecutionContext
class SequencerAdministration(node: SequencerReference) extends ConsoleCommandGroup.Impl(node) {
private def timeouts: ConsoleCommandTimeout = consoleEnvironment.commandTimeouts
private implicit val ec: ExecutionContext = consoleEnvironment.environment.executionContext
@Help.Summary(
"Download sequencer snapshot at given point in time to bootstrap another sequencer"
)
@ -33,10 +39,22 @@ class SequencerSetupGroup(node: SequencerReference) extends ConsoleCommandGroup.
"Download the onboarding state at a given point in time to bootstrap another sequencer"
)
def onboarding_state_at_timestamp(
timestamp: CantonTimestamp
timestamp: CantonTimestamp,
timeout: NonNegativeDuration = timeouts.unbounded,
): ByteString = {
consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerAdminCommands.OnboardingState(Right(timestamp)))
val responseObserver =
new ByteStringStreamObserver[OnboardingStateResponse](_.onboardingStateForSequencer)
def call =
runner.adminCommand(
EnterpriseSequencerAdminCommands.OnboardingState(
observer = responseObserver,
sequencerOrTimestamp = Right(timestamp),
)
)
processResult(call, responseObserver.resultBytes, timeout, "Downloading onboarding state")
}
}
@ -44,10 +62,21 @@ class SequencerSetupGroup(node: SequencerReference) extends ConsoleCommandGroup.
"Download the onboarding state for a given sequencer"
)
def onboarding_state_for_sequencer(
sequencerId: SequencerId
sequencerId: SequencerId,
timeout: NonNegativeDuration = timeouts.unbounded,
): ByteString = {
consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerAdminCommands.OnboardingState(Left(sequencerId)))
val responseObserver =
new ByteStringStreamObserver[OnboardingStateResponse](_.onboardingStateForSequencer)
def call =
runner.adminCommand(
EnterpriseSequencerAdminCommands.OnboardingState(
observer = responseObserver,
sequencerOrTimestamp = Left(sequencerId),
)
)
processResult(call, responseObserver.resultBytes, timeout, "Downloading onboarding state")
}
}

View File

@ -31,9 +31,9 @@ import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.grpc.ByteStringStreamObserver
import com.digitalasset.canton.health.admin.data.TopologyQueueStatus
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.networking.grpc.GrpcError
import com.digitalasset.canton.time.EnrichedDurations.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.admin.grpc.TopologyStore.Authorized
@ -52,15 +52,15 @@ import com.digitalasset.canton.topology.transaction.TopologyTransaction.TxHash
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.util.{BinaryFileUtil, OptionUtil, ResourceUtil}
import com.digitalasset.canton.util.{BinaryFileUtil, OptionUtil}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.daml.lf.data.Ref.PackageId
import com.google.protobuf.ByteString
import io.grpc.{Context, StatusRuntimeException}
import io.grpc.Context
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.{ExecutionContext, Future}
import scala.math.Ordering.Implicits.infixOrderingOps
import scala.reflect.ClassTag
@ -398,7 +398,7 @@ class TopologyAdministrationGroup(
timeout: NonNegativeDuration = timeouts.unbounded,
): ByteString = {
consoleEnvironment.run {
val responseObserver = new ByteStringStreamObserver[GenesisStateResponse]
val responseObserver = new ByteStringStreamObserver[GenesisStateResponse](_.chunk)
def call: ConsoleCommandResult[Context.CancellableContext] =
adminCommand(
@ -410,20 +410,7 @@ class TopologyAdministrationGroup(
)
)
call.flatMap { call =>
try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await.result(responseObserver.result, timeout.duration)
)
}
} catch {
case sre: StatusRuntimeException =>
GenericCommandError(GrpcError("Generating genesis state", "", sre).toString)
case _: TimeoutException =>
CommandErrors.ConsoleTimeout.Error(timeout.asJavaApproximation)
}
}
processResult(call, responseObserver.resultBytes, timeout, "Downloading the genesis state")
}
}

View File

@ -5,15 +5,20 @@ package com.digitalasset.canton.console
import cats.syntax.either.*
import cats.syntax.functorFilter.*
import com.digitalasset.canton.config.NonNegativeDuration
import com.digitalasset.canton.console.CommandErrors.GenericCommandError
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.logging.ErrorLoggingContext
import com.digitalasset.canton.util.BinaryFileUtil
import com.digitalasset.canton.networking.grpc.GrpcError
import com.digitalasset.canton.util.{BinaryFileUtil, ResourceUtil}
import com.google.protobuf.ByteString
import io.grpc.{Context, StatusRuntimeException}
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission.{OWNER_READ, OWNER_WRITE}
import scala.concurrent.{Await, Future, TimeoutException}
import scala.jdk.CollectionConverters.*
package object commands {
@ -61,4 +66,30 @@ package object commands {
}
BinaryFileUtil.writeByteStringToFile(outputFile, bytes)
}
private[commands] def processResult[T](
call: ConsoleCommandResult[Context.CancellableContext],
result: Future[T],
timeout: NonNegativeDuration,
request: String,
serverName: String = "",
cleanupOnError: () => Unit = () => (),
): ConsoleCommandResult[T] = {
call.flatMap { call =>
try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await.result(result, timeout.duration)
)
}
} catch {
case sre: StatusRuntimeException =>
cleanupOnError()
GenericCommandError(GrpcError(request, serverName, sre).toString)
case _: TimeoutException =>
cleanupOnError()
CommandErrors.ConsoleTimeout.Error(timeout.asJavaApproximation)
}
}
}
}

View File

@ -3,6 +3,7 @@
package com.digitalasset.canton.environment
import better.files.File
import cats.data.EitherT
import cats.syntax.either.*
import com.daml.grpc.adapter.ExecutionSequencerFactory
@ -132,7 +133,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
private val healthDumpGenerator = new SingleUseCell[HealthDumpGenerator[_]]
// Function passed down to the node bootstrap used to generate a health dump file
val writeHealthDumpToFile: HealthDumpFunction = () =>
val writeHealthDumpToFile: HealthDumpFunction = (file: File) =>
Future {
healthDumpGenerator
.getOrElse {
@ -158,11 +159,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
newGenerator
}
}
.generateHealthDump(
better.files.File.newTemporaryFile(
prefix = "canton-remote-health-dump"
)
)
.generateHealthDump(file)
}
installJavaUtilLoggingBridge()

View File

@ -3,6 +3,7 @@
package com.digitalasset.canton.environment
import better.files.File
import cats.Applicative
import cats.data.EitherT
import com.daml.metrics.HealthMetrics
@ -124,7 +125,8 @@ class NodesTest extends FixtureAnyWordSpec with BaseTest with HasExecutionContex
testingConfig = TestingConfigInternal(),
futureSupervisor = FutureSupervisor.Noop,
loggerFactory = loggerFactory,
writeHealthDumpToFile = () => Future.failed(new RuntimeException("Not implemented")),
writeHealthDumpToFile =
(file: File) => Future.failed(new RuntimeException("Not implemented")),
configuredOpenTelemetry = ConfiguredOpenTelemetry(
OpenTelemetrySdk.builder().build(),
SdkTracerProvider.builder(),

View File

@ -29,9 +29,7 @@ message SequencerConnect {
message GetDomainIdResponse {
string domain_id = 1;
// If `sequencer_id` is an empty string, consumers of this API can assume
// that `domain_id` serves as the `sequencer_id`.
string sequencer_id = 2;
string sequencer_uid = 2;
}
message GetDomainParametersRequest {}

View File

@ -97,7 +97,7 @@ message ParticipantStatusInfo {
}
message SequencerNodeStatus {
repeated string connected_participants = 1;
repeated string connected_participant_uids = 1;
// required - status of the sequencer component it is running
SequencerHealthStatus sequencer = 2;
string domain_id = 3;

View File

@ -16,8 +16,8 @@ import "scalapb/scalapb.proto";
message AcsCommitment {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.StableProtoVersion";
string domain_id = 1;
string sending_participant = 2;
string counter_participant = 3;
string sending_participant_uid = 2;
string counter_participant_uid = 3;
int64 from_exclusive = 4; // in microseconds of UTC time since Unix epoch
int64 to_inclusive = 5; // in microseconds of UTC time since Unix epoch
bytes commitment = 6;

View File

@ -11,6 +11,6 @@ import "scalapb/scalapb.proto";
message OrderingRequest {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.UnstableProtoVersion";
string sequencer_id = 1; // Id of the sequencer requesting ordering of the request
string sequencer_uid = 1; // UID of the sequencer requesting ordering of the request
google.protobuf.BytesValue content = 2; // Content of the request to be ordered
}

View File

@ -125,7 +125,7 @@ message CommonMetadata {
reserved 2;
string domain_id = 3;
string uuid = 4;
string mediator = 5;
int32 mediator_group = 5;
}
message SubmitterMetadata {
@ -135,7 +135,7 @@ message SubmitterMetadata {
repeated string act_as = 2;
string application_id = 3;
string command_id = 4;
string submitting_participant = 5;
string submitting_participant_uid = 5;
string submission_id = 6; // optional; absent if not specified by submitter
v30.DeduplicationPeriod dedup_period = 7;
int64 max_sequencing_time = 8; // in microseconds of UTC time since Unix epoch

View File

@ -28,7 +28,7 @@ message TransferOutCommonData {
repeated string stakeholders = 3;
repeated string admin_parties = 4;
string uuid = 5;
string source_mediator = 6;
int32 source_mediator_group = 6;
TransferSubmitterMetadata submitter_metadata = 7;
}
@ -60,7 +60,7 @@ message TransferInCommonData {
string target_domain = 2;
repeated string stakeholders = 3;
string uuid = 4;
string target_mediator = 6;
int32 target_mediator_group = 6;
TransferSubmitterMetadata submitter_metadata = 7;
}
@ -68,7 +68,7 @@ message TransferSubmitterMetadata {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.UnstableProtoVersion";
string submitter = 1;
string submitting_participant = 2;
string submitting_participant_uid = 2;
string command_id = 3;
string submission_id = 4; // optional
string application_id = 5;

View File

@ -108,7 +108,7 @@ message OwnerToKeyMapping {
// UNIQUE(participant,domain)
message DomainTrustCertificate {
// the uid of the participant
string participant = 1;
string participant_uid = 1;
// the uid of the domain that the participant trusts
string domain = 2;
@ -127,7 +127,7 @@ message DomainTrustCertificate {
// UNIQUE(domain,participant)
message ParticipantDomainPermission {
string domain = 1;
string participant = 2;
string participant_uid = 2;
// the permission level of the participant on this domain (usually submission)
Enums.ParticipantPermission permission = 3;
@ -160,7 +160,7 @@ message PartyHostingLimits {
// UNIQUE(participant, domain)
message VettedPackages {
// the participant vetting the packages
string participant = 1;
string participant_uid = 1;
// the hash of the vetted packages
repeated string package_ids = 2;
@ -181,7 +181,7 @@ message VettedPackages {
message PartyToParticipant {
message HostingParticipant {
// the target participant that the party should be mapped to
string participant = 1;
string participant_uid = 1;
// permission of the participant for this particular party (the actual
// will be min of ParticipantDomainPermission.ParticipantPermission and this setting)

View File

@ -70,7 +70,7 @@ message ListPartiesResponse {
com.digitalasset.canton.protocol.v30.Enums.ParticipantPermission permission = 2;
}
string participant = 1;
string participant_uid = 1;
/**
* permissions of this participant for this party on a per domain basis

View File

@ -298,17 +298,6 @@
<maxLevel>INFO</maxLevel>
</rewrite>
<!-- gRPC "Half-closed without a request" shutdown flake - might be when client hangs up before server response? https://github.com/grpc/grpc-java/issues/4890
reported by daml-repo ErrorInterceptor/ErrorListener added with daml 2.0.0-snapshot.20211220
Can probably be removed once https://github.com/digital-asset/daml/issues/13413 is fixed. -->
<rewrite class="com.digitalasset.canton.logging.Rewrite">
<logger>com.digitalasset.canton.platform.apiserver.error.ErrorInterceptor</logger>
<contains>LEDGER_API_INTERNAL_ERROR</contains>
<exceptionMessage>Half-closed without a request</exceptionMessage>
<maxLevel>INFO</maxLevel>
<testing>true</testing>
</rewrite>
<!-- shutdown issues in ledger-api server TODO(#11534) -->
<rewrite class="com.digitalasset.canton.logging.Rewrite">
<logger>com.digitalasset.canton.platform.apiserver.error.ErrorInterceptor</logger>

View File

@ -56,9 +56,13 @@ object ProtoDeserializationError extends ProtoDeserializationErrorGroup {
extends ProtoDeserializationError {
override val message = s"Unable to convert numeric field `$field`: $error"
}
final case class InvariantViolation(error: String) extends ProtoDeserializationError {
override def message = error
final case class InvariantViolation(field: Option[String], error: String)
extends ProtoDeserializationError {
override def message =
field.fold(error)(field => s"Invariant violation in field `$field`: $error")
}
final case class MaxBytesToDecompressExceeded(error: String) extends ProtoDeserializationError {
override def message = error
}
@ -105,9 +109,12 @@ object ProtoDeserializationError extends ProtoDeserializationErrorGroup {
}
object InvariantViolation {
def toProtoDeserializationError(e: PureInvariantViolation): InvariantViolation =
InvariantViolation(e.message)
def apply(e: PureInvariantViolation): InvariantViolation = InvariantViolation(e.message)
def toProtoDeserializationError(field: String, e: PureInvariantViolation): InvariantViolation =
InvariantViolation(field = Some(field), error = e.message)
def apply(field: String, e: PureInvariantViolation): InvariantViolation =
InvariantViolation(field = Some(field), error = e.message)
def apply(field: String, error: String): InvariantViolation =
InvariantViolation(field = Some(field), error = error)
}
}

View File

@ -179,13 +179,6 @@ object CantonRequireTypes {
errorMsg(str, maxLength, name),
)
}
def fromProtoPrimitive(
str: String,
name: Option[String] = None,
): ParsingResult[LengthLimitedString] =
LengthLimitedString
.create(str, defaultMaxLength, name)
.leftMap(e => ProtoInvariantViolation(e))
// Should be used rarely - most of the time SetParameter[String255] etc.
// (defined through LengthLimitedStringCompanion) should be used
@ -425,7 +418,7 @@ object CantonRequireTypes {
factoryMethod(str)(name)
def fromProtoPrimitive(str: String, name: String): ParsingResult[A] =
create(str, Some(name)).leftMap(e => ProtoInvariantViolation(e))
create(str, Some(name)).leftMap(e => ProtoInvariantViolation(field = Some(name), error = e))
implicit val lengthLimitedStringOrder: Order[A] =
Order.by[A, String](_.str)

View File

@ -78,22 +78,25 @@ object CantonTimestampSecond {
def MinValue = CantonTimestampSecond(LfTimestamp.MinValue)
def fromProtoTimestamp(ts: ProtoTimestamp): ParsingResult[CantonTimestampSecond] = {
def fromProtoTimestamp(
ts: ProtoTimestamp,
field: String,
): ParsingResult[CantonTimestampSecond] = {
for {
instant <- ProtoConverter.InstantConverter.fromProtoPrimitive(ts)
ts <- CantonTimestampSecond
.fromInstant(instant)
.left
.map(ProtoDeserializationError.InvariantViolation(_))
.map(ProtoDeserializationError.InvariantViolation(field, _))
} yield ts
}
def fromProtoPrimitive(ts: Long): ParsingResult[CantonTimestampSecond] = {
def fromProtoPrimitive(field: String, ts: Long): ParsingResult[CantonTimestampSecond] = {
for {
timestamp <- CantonTimestamp.fromProtoPrimitive(ts)
seconds <- CantonTimestampSecond
.fromCantonTimestamp(timestamp)
.leftMap(ProtoDeserializationError.InvariantViolation(_))
.leftMap(ProtoDeserializationError.InvariantViolation(field, _))
} yield seconds
}

View File

@ -51,7 +51,7 @@ final case class CommonMetadata private (
domainId = domainId.toProtoPrimitive,
salt = Some(salt.toProtoV30),
uuid = ProtoConverter.UuidConverter.toProtoPrimitive(uuid),
mediator = mediator.toProtoPrimitive,
mediatorGroup = mediator.group.value,
)
}
}
@ -106,14 +106,14 @@ object CommonMetadata
domainUid <- UniqueIdentifier
.fromProtoPrimitive_(domainIdP)
.leftMap(e => ProtoDeserializationError.ValueDeserializationError("domainId", e.message))
mediator <- MediatorGroupRecipient
.fromProtoPrimitive(mediatorP, "CommonMetadata.mediator")
mediatorGroup <- ProtoConverter.parseNonNegativeInt("mediator", mediatorP)
mediatorGroupRecipient = MediatorGroupRecipient.apply(mediatorGroup)
salt <- ProtoConverter
.parseRequired(Salt.fromProtoV30, "salt", saltP)
.leftMap(_.inField("salt"))
uuid <- ProtoConverter.UuidConverter.fromProtoPrimitive(uuidP).leftMap(_.inField("uuid"))
pv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield CommonMetadata(DomainId(domainUid), mediator, salt, uuid)(
} yield CommonMetadata(DomainId(domainUid), mediatorGroupRecipient, salt, uuid)(
hashOps,
pv,
Some(bytes),

View File

@ -304,7 +304,7 @@ object GenTransactionTree {
CommonMetadata.fromByteString(expectedProtocolVersion)(hashOps),
)
commonMetadataUnblinded <- commonMetadata.unwrap.leftMap(_ =>
InvariantViolation("GenTransactionTree.commonMetadata is blinded")
InvariantViolation(field = "GenTransactionTree.commonMetadata", error = "is blinded")
)
participantMetadata <- MerkleTree
.fromProtoOptionV30(

View File

@ -133,7 +133,8 @@ object LightTransactionViewTree
result <- LightTransactionViewTree
.create(tree, subviewHashes, rpv)
.leftMap(e =>
ProtoDeserializationError.InvariantViolation(s"Unable to create transaction tree: $e")
ProtoDeserializationError
.InvariantViolation("tree", s"Unable to create transaction tree: $e")
)
} yield result

View File

@ -66,7 +66,7 @@ object Quorum {
.traverse { partyIndexAndWeight =>
val v30.PartyIndexAndWeight(indexP, weightP) = partyIndexAndWeight
for {
weight <- parsePositiveInt(weightP)
weight <- parsePositiveInt("weight", weightP)
confirmingParty <-
Either.cond(
0 <= indexP && indexP < informees.size, {
@ -79,7 +79,7 @@ object Quorum {
)
} yield confirmingParty
}
threshold <- parseNonNegativeInt(thresholdP)
threshold <- parseNonNegativeInt("threshold", thresholdP)
} yield new Quorum(confirmers.toMap, threshold)
}

View File

@ -61,7 +61,7 @@ final case class SubmitterMetadata private (
actAs = actAs.toSeq,
applicationId = applicationId.toProtoPrimitive,
commandId = commandId.toProtoPrimitive,
submittingParticipant = submittingParticipant.toProtoPrimitive,
submittingParticipantUid = submittingParticipant.uid.toProtoPrimitive,
salt = Some(salt.toProtoV30),
submissionId = submissionId.getOrElse(""),
dedupPeriod = Some(SerializableDeduplicationPeriod(dedupPeriod).toProtoV30),
@ -141,15 +141,19 @@ object SubmitterMetadata
actAsP,
applicationIdP,
commandIdP,
submittingParticipantP,
submittingParticipantUidP,
submissionIdP,
dedupPeriodOP,
maxSequencingTimeOP,
) = metaDataP
for {
submittingParticipant <- ParticipantId
.fromProtoPrimitive(submittingParticipantP, "SubmitterMetadata.submitter_participant")
submittingParticipant <- UniqueIdentifier
.fromProtoPrimitive(
submittingParticipantUidP,
"SubmitterMetadata.submitter_participant_uid",
)
.map(ParticipantId(_))
actAs <- actAsP.traverse(
ProtoConverter
.parseLfPartyId(_)

View File

@ -136,7 +136,7 @@ object TransferInViewTree
*
* @param salt Salt for blinding the Merkle hash
* @param targetDomain The domain on which the contract is transferred in
* @param targetMediator The mediator that coordinates the transfer-in request on the target domain
* @param targetMediatorGroup The mediator that coordinates the transfer-in request on the target domain
* @param stakeholders The stakeholders of the transferred contract
* @param uuid The uuid of the transfer-in request
* @param submitterMetadata information about the submission
@ -144,7 +144,7 @@ object TransferInViewTree
final case class TransferInCommonData private (
override val salt: Salt,
targetDomain: TargetDomainId,
targetMediator: MediatorGroupRecipient,
targetMediatorGroup: MediatorGroupRecipient,
stakeholders: Set[LfPartyId],
uuid: UUID,
submitterMetadata: TransferSubmitterMetadata,
@ -167,7 +167,7 @@ final case class TransferInCommonData private (
v30.TransferInCommonData(
salt = Some(salt.toProtoV30),
targetDomain = targetDomain.toProtoPrimitive,
targetMediator = targetMediator.toProtoPrimitive,
targetMediatorGroup = targetMediatorGroup.group.value,
stakeholders = stakeholders.toSeq,
uuid = ProtoConverter.UuidConverter.toProtoPrimitive(uuid),
submitterMetadata = Some(submitterMetadata.toProtoV30),
@ -184,7 +184,7 @@ final case class TransferInCommonData private (
override def pretty: Pretty[TransferInCommonData] = prettyOfClass(
param("submitter metadata", _.submitterMetadata),
param("target domain", _.targetDomain),
param("target mediator", _.targetMediator),
param("target mediator group", _.targetMediatorGroup),
param("stakeholders", _.stakeholders),
param("uuid", _.uuid),
param("salt", _.salt),
@ -234,16 +234,16 @@ object TransferInCommonData
targetDomainP,
stakeholdersP,
uuidP,
targetMediatorP,
targetMediatorGroupP,
submitterMetadataPO,
) = transferInCommonDataP
for {
salt <- ProtoConverter.parseRequired(Salt.fromProtoV30, "salt", saltP)
targetDomain <- TargetDomainId.fromProtoPrimitive(targetDomainP, "target_domain")
targetMediator <- MediatorGroupRecipient.fromProtoPrimitive(
targetMediatorP,
"target_mediator",
targetMediatorGroup <- ProtoConverter.parseNonNegativeInt(
"target_mediator_group",
targetMediatorGroupP,
)
stakeholders <- stakeholdersP.traverse(ProtoConverter.parseLfPartyId)
uuid <- ProtoConverter.UuidConverter.fromProtoPrimitive(uuidP)
@ -254,7 +254,7 @@ object TransferInCommonData
} yield TransferInCommonData(
salt,
targetDomain,
targetMediator,
MediatorGroupRecipient(targetMediatorGroup),
stakeholders.toSet,
uuid,
submitterMetadata,
@ -460,7 +460,7 @@ final case class FullTransferInTree(tree: TransferInViewTree)
override def domainId: DomainId = commonData.targetDomain.unwrap
override def mediator: MediatorGroupRecipient = commonData.targetMediator
override def mediator: MediatorGroupRecipient = commonData.targetMediatorGroup
override def informees: Set[LfPartyId] = commonData.confirmingParties.keySet

View File

@ -129,7 +129,7 @@ object TransferOutViewTree
*
* @param salt Salt for blinding the Merkle hash
* @param sourceDomain The domain to which the transfer-out request is sent
* @param sourceMediator The mediator that coordinates the transfer-out request on the source domain
* @param sourceMediatorGroup The mediator that coordinates the transfer-out request on the source domain
* @param stakeholders The stakeholders of the contract to be transferred
* @param adminParties The admin parties of transferring transfer-out participants
* @param uuid The request UUID of the transfer-out
@ -138,7 +138,7 @@ object TransferOutViewTree
final case class TransferOutCommonData private (
override val salt: Salt,
sourceDomain: SourceDomainId,
sourceMediator: MediatorGroupRecipient,
sourceMediatorGroup: MediatorGroupRecipient,
stakeholders: Set[LfPartyId],
adminParties: Set[LfPartyId],
uuid: UUID,
@ -162,7 +162,7 @@ final case class TransferOutCommonData private (
v30.TransferOutCommonData(
salt = Some(salt.toProtoV30),
sourceDomain = sourceDomain.toProtoPrimitive,
sourceMediator = sourceMediator.toProtoPrimitive,
sourceMediatorGroup = sourceMediatorGroup.group.value,
stakeholders = stakeholders.toSeq,
adminParties = adminParties.toSeq,
uuid = ProtoConverter.UuidConverter.toProtoPrimitive(uuid),
@ -180,7 +180,7 @@ final case class TransferOutCommonData private (
override def pretty: Pretty[TransferOutCommonData] = prettyOfClass(
param("submitter metadata", _.submitterMetadata),
param("source domain", _.sourceDomain),
param("source mediator", _.sourceMediator),
param("source mediator group", _.sourceMediatorGroup),
param("stakeholders", _.stakeholders),
param("admin parties", _.adminParties),
param("uuid", _.uuid),
@ -234,16 +234,16 @@ object TransferOutCommonData
stakeholdersP,
adminPartiesP,
uuidP,
sourceMediatorP,
sourceMediatorGroupP,
submitterMetadataPO,
) = transferOutCommonDataP
for {
salt <- ProtoConverter.parseRequired(Salt.fromProtoV30, "salt", saltP)
sourceDomain <- SourceDomainId.fromProtoPrimitive(sourceDomainP, "source_domain")
sourceMediator <- MediatorGroupRecipient.fromProtoPrimitive(
sourceMediatorP,
"source_mediator",
sourceMediatorGroup <- ProtoConverter.parseNonNegativeInt(
"source_mediator_group",
sourceMediatorGroupP,
)
stakeholders <- stakeholdersP.traverse(ProtoConverter.parseLfPartyId)
adminParties <- adminPartiesP.traverse(ProtoConverter.parseLfPartyId)
@ -255,7 +255,7 @@ object TransferOutCommonData
} yield TransferOutCommonData(
salt,
sourceDomain,
sourceMediator,
MediatorGroupRecipient(sourceMediatorGroup),
stakeholders.toSet,
adminParties.toSet,
uuid,
@ -440,7 +440,7 @@ final case class FullTransferOutTree(tree: TransferOutViewTree)
override def domainId: DomainId = sourceDomain.unwrap
override def mediator: MediatorGroupRecipient = commonData.sourceMediator
override def mediator: MediatorGroupRecipient = commonData.sourceMediatorGroup
override def informees: Set[LfPartyId] = commonData.confirmingParties.keySet

View File

@ -8,7 +8,7 @@ import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.v30
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.topology.{ParticipantId, UniqueIdentifier}
/** Information about the submitters of the transaction in the case of a Transfer.
* This data structure is quite similar to [[com.digitalasset.canton.data.SubmitterMetadata]]
@ -26,7 +26,7 @@ final case class TransferSubmitterMetadata(
def toProtoV30: v30.TransferSubmitterMetadata =
v30.TransferSubmitterMetadata(
submitter = submitter,
submittingParticipant = submittingParticipant.toProtoPrimitive,
submittingParticipantUid = submittingParticipant.uid.toProtoPrimitive,
commandId = commandId,
submissionId = submissionId.getOrElse(""),
applicationId = applicationId,
@ -59,7 +59,9 @@ object TransferSubmitterMetadata {
for {
submitter <- ProtoConverter.parseLfPartyId(submitterP)
submittingParticipant <-
ParticipantId.fromProtoPrimitive(submittingParticipantP, "submittingParticipant")
UniqueIdentifier
.fromProtoPrimitive(submittingParticipantP, "submitting_participant_uid")
.map(ParticipantId(_))
commandId <- ProtoConverter.parseCommandId(commandIdP)
submissionId <- ProtoConverter.parseLFSubmissionIdO(submissionIdP)
applicationId <- ProtoConverter.parseLFApplicationId(applicationIdP)

View File

@ -232,7 +232,10 @@ object ViewConfirmationParameters {
Either.cond(
notAnInformee.isEmpty,
ViewConfirmationParameters(informees, quorums),
InvariantViolation(s"confirming parties $notAnInformee are not in the list of informees"),
InvariantViolation(
field = None,
error = s"confirming parties $notAnInformee are not in the list of informees",
),
)
}

View File

@ -0,0 +1,71 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.grpc
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
/** Stream observer that concatenates all chunks of a byte stream into a single
  * [[com.google.protobuf.ByteString]], discarding any per-message context.
  */
class ByteStringStreamObserver[T](converter: T => ByteString)
    extends ByteStringStreamObserverWithContext[T, Unit](converter, _ => ()) {

  /** Completes with the concatenated bytes once the stream has finished. */
  def resultBytes(implicit ec: ExecutionContext): Future[ByteString] =
    result.map { case (bytes, _) => bytes }
}
// This observer allows extracting a bytestring, as well as other fields that are part of the request.
// It expects these fields to remain unchanged during the processing of the stream.
class ByteStringStreamObserverWithContext[T, Context](
    converter: T => ByteString,
    extractContext: T => Context,
) extends StreamObserver[T] {
  // Accumulates the byte chunks received so far.
  private val byteBuffer = new AtomicReference(ByteString.EMPTY)
  // Completed exactly once, with either the full payload + context or the first error.
  private val requestComplete: Promise[(ByteString, Context)] = Promise[(ByteString, Context)]()

  // Context extracted from the first message; every later message must carry an equal context.
  val context = new AtomicReference[Option[Context]](None)

  /** Completes with the concatenated bytes and the (constant) context once the stream finishes. */
  def result: Future[(ByteString, Context)] =
    requestComplete.future

  // Records the context on the first message, and verifies subsequent messages do not change it.
  private def setOrCheck(current: Context): Try[Unit] =
    if (!context.compareAndSet(None, Some(current))) {
      val previous = context.get()
      if (previous.contains(current)) {
        Success(())
      } else {
        Failure(new IllegalStateException(s"Context cannot be changed from: $previous to $current"))
      }
    } else {
      Success(())
    }

  override def onNext(value: T): Unit = {
    val processRequest =
      for {
        _ <- setOrCheck(extractContext(value))
        _ <- Try(byteBuffer.getAndUpdate(b1 => b1.concat(converter(value))).discard)
      } yield ()
    processRequest match {
      // Use tryFailure (not failure): the peer may keep sending after a first failing
      // message, and completing an already-completed Promise with `failure` would
      // throw an IllegalStateException. This also matches onError/onCompleted below.
      case Failure(exception) => requestComplete.tryFailure(exception).discard
      case Success(_) => () // Nothing to do, just move on to the next request
    }
  }

  override def onError(t: Throwable): Unit = {
    requestComplete.tryFailure(t).discard
  }

  override def onCompleted(): Unit = {
    val finalByteString = byteBuffer.get()
    // A stream with zero messages never sets the context, so completion without it is an error.
    val finalResult =
      context
        .get()
        .map(Success(_))
        .getOrElse(Failure(new IllegalStateException("Context not set")))
        .map((finalByteString, _))
    requestComplete.tryComplete(finalResult).discard
  }
}

View File

@ -1,7 +1,7 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.console.commands
package com.digitalasset.canton.grpc
import better.files.File
import com.digitalasset.canton.discard.Implicits.DiscardOps
@ -12,7 +12,7 @@ import io.grpc.stub.StreamObserver
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
private[commands] class FileStreamObserver[T](
class FileStreamObserver[T](
inputFile: File,
converter: T => ByteString,
) extends StreamObserver[T] {

View File

@ -197,10 +197,12 @@ object PrettyUtil extends PrettyUtil {
import scala.language.implicitConversions
trait PrettyBareCase extends Product with PrettyPrinting {
/** A trait for case classes that should be pretty-printed with their name only.
*/
trait PrettyNameOnlyCase extends Product with PrettyPrinting {
@SuppressWarnings(Array("org.wartremover.warts.Product"))
override protected[pretty] def pretty: Pretty[this.type] = prettyOfObject
}
object PrettyBareCase {
implicit def toString(pt: PrettyBareCase): String = pt.toString
object PrettyNameOnlyCase {
implicit def toString(pt: PrettyNameOnlyCase): String = pt.toString
}

View File

@ -196,15 +196,7 @@ class ApiRequestLoggerBase(
if (enhancedStatus.getCode == UNKNOWN || enhancedStatus.getCode == DATA_LOSS) {
logger.error(message, enhancedStatus.getCause)
} else if (enhancedStatus.getCode == INTERNAL) {
if (enhancedStatus.getDescription == "Half-closed without a request") {
// If a call is cancelled, GRPC may half-close the call before the first message has been delivered.
// The result is this status.
// Logging with INFO to not confuse the user.
// The status is still delivered to the client, to facilitate troubleshooting if there is a deeper problem.
logger.info(message, enhancedStatus.getCause)
} else {
logger.error(message, enhancedStatus.getCause)
}
logger.error(message, enhancedStatus.getCause)
} else if (enhancedStatus.getCode == UNAUTHENTICATED) {
logger.debug(message, enhancedStatus.getCause)
} else {

View File

@ -746,12 +746,14 @@ object DynamicDomainParameters extends HasProtocolVersionedCompanion[DynamicDoma
confirmationRequestsMaxRate <- NonNegativeInt
.create(confirmationRequestsMaxRateP)
.leftMap(InvariantViolation.toProtoDeserializationError)
.leftMap(
InvariantViolation.toProtoDeserializationError("confirmation_requests_max_rate", _)
)
maxRequestSize <- NonNegativeInt
.create(maxRequestSizeP)
.map(MaxRequestSize)
.leftMap(InvariantViolation.toProtoDeserializationError)
.leftMap(InvariantViolation.toProtoDeserializationError("max_request_size", _))
sequencerAggregateSubmissionTimeout <- NonNegativeFiniteDuration.fromProtoPrimitiveO(
"sequencerAggregateSubmissionTimeout"
@ -789,7 +791,7 @@ object DynamicDomainParameters extends HasProtocolVersionedCompanion[DynamicDoma
class InvalidDynamicDomainParameters(message: String) extends RuntimeException(message) {
lazy val toProtoDeserializationError: ProtoDeserializationError.InvariantViolation =
ProtoDeserializationError.InvariantViolation(message)
ProtoDeserializationError.InvariantViolation(field = None, error = message)
}
}
@ -943,9 +945,13 @@ object AcsCommitmentsCatchUpConfig {
): ParsingResult[AcsCommitmentsCatchUpConfig] = {
val v30.AcsCommitmentsCatchUpConfig(catchUpIntervalSkipP, nrIntervalsToTriggerCatchUpP) = value
for {
catchUpIntervalSkip <- ProtoConverter.parsePositiveInt(catchUpIntervalSkipP)
catchUpIntervalSkip <- ProtoConverter.parsePositiveInt(
"catchup_interval_skip",
catchUpIntervalSkipP,
)
nrIntervalsToTriggerCatchUp <- ProtoConverter.parsePositiveInt(
nrIntervalsToTriggerCatchUpP
"nr_intervals_to_trigger_catch_up",
nrIntervalsToTriggerCatchUpP,
)
} yield AcsCommitmentsCatchUpConfig(catchUpIntervalSkip, nrIntervalsToTriggerCatchUp)
}

View File

@ -127,8 +127,8 @@ abstract sealed case class AcsCommitment private (
protected def toProtoV30: v30.AcsCommitment = {
v30.AcsCommitment(
domainId = domainId.toProtoPrimitive,
sendingParticipant = sender.toProtoPrimitive,
counterParticipant = counterParticipant.toProtoPrimitive,
sendingParticipantUid = sender.uid.toProtoPrimitive,
counterParticipantUid = counterParticipant.uid.toProtoPrimitive,
fromExclusive = period.fromExclusive.toProtoPrimitive,
toInclusive = period.toInclusive.toProtoPrimitive,
commitment = AcsCommitment.commitmentTypeToProto(commitment),
@ -192,22 +192,30 @@ object AcsCommitment extends HasMemoizedProtocolVersionedWrapperCompanion[AcsCom
): ParsingResult[AcsCommitment] = {
for {
domainId <- DomainId.fromProtoPrimitive(protoMsg.domainId, "AcsCommitment.domainId")
sender <- ParticipantId.fromProtoPrimitive(
protoMsg.sendingParticipant,
"AcsCommitment.sender",
sender <- UniqueIdentifier
.fromProtoPrimitive(
protoMsg.sendingParticipantUid,
"AcsCommitment.sending_participant_uid",
)
.map(ParticipantId(_))
counterParticipant <- UniqueIdentifier
.fromProtoPrimitive(
protoMsg.counterParticipantUid,
"AcsCommitment.counter_participant_uid",
)
.map(ParticipantId(_))
fromExclusive <- CantonTimestampSecond.fromProtoPrimitive(
"from_exclusive",
protoMsg.fromExclusive,
)
counterParticipant <- ParticipantId.fromProtoPrimitive(
protoMsg.counterParticipant,
"AcsCommitment.counterParticipant",
)
fromExclusive <- CantonTimestampSecond.fromProtoPrimitive(protoMsg.fromExclusive)
toInclusive <- CantonTimestampSecond.fromProtoPrimitive(protoMsg.toInclusive)
toInclusive <- CantonTimestampSecond.fromProtoPrimitive("to_inclusive", protoMsg.toInclusive)
periodLength <- PositiveSeconds
.between(fromExclusive, toInclusive)
.leftMap { _ =>
ProtoDeserializationError.InvariantViolation(
s"Illegal commitment period length: $fromExclusive, $toInclusive"
field = None,
error = s"Illegal commitment period length: $fromExclusive, $toInclusive",
)
}

View File

@ -278,7 +278,7 @@ object ConfirmationResponse
domainId,
)(rpv, Some(bytes))
)
.leftMap(err => InvariantViolation(err.toString))
.leftMap(err => InvariantViolation(field = None, error = err.toString))
} yield response
}

View File

@ -94,8 +94,11 @@ object SetTrafficPurchasedMessage
)(bytes: ByteString): ParsingResult[SetTrafficPurchasedMessage] = {
for {
member <- Member.fromProtoPrimitive(proto.member, "member")
serial <- ProtoConverter.parsePositiveInt(proto.serial)
totalTrafficPurchased <- ProtoConverter.parseNonNegativeLong(proto.totalTrafficPurchased)
serial <- ProtoConverter.parsePositiveInt("serial", proto.serial)
totalTrafficPurchased <- ProtoConverter.parseNonNegativeLong(
"total_traffic_purchased",
proto.totalTrafficPurchased,
)
domainId <- DomainId.fromProtoPrimitive(proto.domainId, "domain_id")
rpv <- protocolVersionRepresentativeFor(ProtoVersion(1))
} yield SetTrafficPurchasedMessage(

View File

@ -55,7 +55,7 @@ final case class TransferInMediatorMessage(
override def domainId: DomainId = commonData.targetDomain.unwrap
override def mediator: MediatorGroupRecipient = commonData.targetMediator
override def mediator: MediatorGroupRecipient = commonData.targetMediatorGroup
override def requestUuid: UUID = commonData.uuid

View File

@ -54,7 +54,7 @@ final case class TransferOutMediatorMessage(
override def domainId: DomainId = commonData.sourceDomain.unwrap
override def mediator: MediatorGroupRecipient = commonData.sourceMediator
override def mediator: MediatorGroupRecipient = commonData.sourceMediatorGroup
override def requestUuid: UUID = commonData.uuid

View File

@ -186,7 +186,7 @@ object Verdict
reasons <- reasonsP.traverse(fromProtoReasonV30)
reasonsNE <- NonEmpty
.from(reasons.toList)
.toRight(InvariantViolation("Field reasons must not be empty!"))
.toRight(InvariantViolation("reasons", "must not be empty!"))
} yield ParticipantReject(reasonsNE)(pv)
def fromProtoV30(
@ -226,7 +226,7 @@ object Verdict
localReject <- localVerdict match {
case localReject: LocalReject => Right(localReject)
case LocalApprove() =>
Left(InvariantViolation("RejectionReason.reject must not be approve."))
Left(InvariantViolation("reject", "must not be approve"))
}
} yield (parties, localReject)
}

View File

@ -175,7 +175,10 @@ object SequencerConnections
submissionRequestAmplificationP,
) = sequencerConnectionsProto
for {
sequencerTrustThreshold <- ProtoConverter.parsePositiveInt(sequencerTrustThresholdP)
sequencerTrustThreshold <- ProtoConverter.parsePositiveInt(
"sequencer_trust_threshold",
sequencerTrustThresholdP,
)
submissionRequestAmplification <- ProtoConverter.parseRequired(
SubmissionRequestAmplification.fromProtoV30,
"submission_request_amplification",
@ -198,7 +201,7 @@ object SequencerConnections
sequencerConnectionsNes,
sequencerTrustThreshold,
submissionRequestAmplification,
).leftMap(ProtoDeserializationError.InvariantViolation(_))
).leftMap(ProtoDeserializationError.InvariantViolation(field = None, _))
} yield sequencerConnections
}

View File

@ -45,7 +45,7 @@ object SubmissionRequestAmplification {
): ParsingResult[SubmissionRequestAmplification] = {
val v30.SubmissionRequestAmplification(factorP, patienceP) = proto
for {
factor <- ProtoConverter.parsePositiveInt(factorP)
factor <- ProtoConverter.parsePositiveInt("factor", factorP)
patience <- ProtoConverter.parseRequired(
config.NonNegativeFiniteDuration.fromProtoPrimitive("patience"),
"patience",

View File

@ -73,7 +73,10 @@ object TrafficControlParameters {
proto: protoV30.TrafficControlParameters
): ParsingResult[TrafficControlParameters] = {
for {
maxBaseTrafficAmount <- ProtoConverter.parseNonNegativeLong(proto.maxBaseTrafficAmount)
maxBaseTrafficAmount <- ProtoConverter.parseNonNegativeLong(
"max_base_traffic_amount",
proto.maxBaseTrafficAmount,
)
maxBaseTrafficAccumulationDuration <- ProtoConverter.parseRequired(
time.NonNegativeFiniteDuration.fromProtoPrimitive("max_base_traffic_accumulation_duration"),
"max_base_traffic_accumulation_duration",
@ -86,7 +89,10 @@ object TrafficControlParameters {
"set_balance_request_submission_window_size",
proto.setBalanceRequestSubmissionWindowSize,
)
scalingFactor <- ProtoConverter.parsePositiveInt(proto.readVsWriteScalingFactor)
scalingFactor <- ProtoConverter.parsePositiveInt(
"read_vs_write_scaling_factor",
proto.readVsWriteScalingFactor,
)
} yield TrafficControlParameters(
maxBaseTrafficAmount,
scalingFactor,

View File

@ -86,7 +86,7 @@ object AggregationRule
"eligible_members",
eligibleMembersP,
)
threshold <- ProtoConverter.parsePositiveInt(thresholdP)
threshold <- ProtoConverter.parsePositiveInt("threshold", thresholdP)
rpv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield AggregationRule(eligibleMembers, threshold)(rpv)
}

View File

@ -5,13 +5,12 @@ package com.digitalasset.canton.sequencing.protocol
import cats.syntax.either.*
import com.digitalasset.canton.ProtoDeserializationError.{
InvariantViolation,
StringConversionError,
ValueConversionError,
}
import com.digitalasset.canton.config.CantonRequireTypes.{String3, String300}
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.MediatorGroup.MediatorGroupIndex
import com.digitalasset.canton.topology.client.TopologySnapshot
@ -85,9 +84,7 @@ object Recipient {
s"Cannot parse group number $rest, error ${e.getMessage}"
)
)
group <- NonNegativeInt
.create(groupInt)
.leftMap(e => InvariantViolation(e.message))
group <- ProtoConverter.parseNonNegativeInt(s"group in $recipient", groupInt)
} yield MediatorGroupRecipient(group)
case AllMembersOfDomain.Code =>
Right(AllMembersOfDomain)

View File

@ -62,7 +62,7 @@ object SequencingSubmissionCost
): ParsingResult[SequencingSubmissionCost] = {
val v30.SequencingSubmissionCost(costP) = proto
for {
cost <- ProtoConverter.parseNonNegativeLong(costP)
cost <- ProtoConverter.parseNonNegativeLong("cost", costP)
rpv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield SequencingSubmissionCost(cost, rpv.representative)
}

View File

@ -123,12 +123,24 @@ object TrafficState {
def fromProtoV30(
trafficStateP: v30.TrafficState
): Either[ProtoDeserializationError, TrafficState] = for {
extraTrafficLimit <- ProtoConverter.parseNonNegativeLong(trafficStateP.extraTrafficPurchased)
extraTrafficConsumed <- ProtoConverter.parseNonNegativeLong(trafficStateP.extraTrafficConsumed)
baseTrafficRemainder <- ProtoConverter.parseNonNegativeLong(trafficStateP.baseTrafficRemainder)
lastConsumedCost <- ProtoConverter.parseNonNegativeLong(trafficStateP.lastConsumedCost)
extraTrafficLimit <- ProtoConverter.parseNonNegativeLong(
"extra_traffic_purchased",
trafficStateP.extraTrafficPurchased,
)
extraTrafficConsumed <- ProtoConverter.parseNonNegativeLong(
"extra_traffic_consumed",
trafficStateP.extraTrafficConsumed,
)
baseTrafficRemainder <- ProtoConverter.parseNonNegativeLong(
"base_traffic_remainder",
trafficStateP.baseTrafficRemainder,
)
lastConsumedCost <- ProtoConverter.parseNonNegativeLong(
"last_consumed_cost",
trafficStateP.lastConsumedCost,
)
timestamp <- CantonTimestamp.fromProtoPrimitive(trafficStateP.timestamp)
serial <- trafficStateP.serial.traverse(ProtoConverter.parsePositiveInt)
serial <- trafficStateP.serial.traverse(ProtoConverter.parsePositiveInt("serial", _))
} yield TrafficState(
extraTrafficLimit,
extraTrafficConsumed,

View File

@ -216,16 +216,19 @@ object TrafficConsumed {
for {
member <- Member.fromProtoPrimitive(trafficConsumedP.member, "member")
extraTrafficConsumed <- ProtoConverter.parseNonNegativeLong(
trafficConsumedP.extraTrafficConsumed
"extra_traffic_consumed",
trafficConsumedP.extraTrafficConsumed,
)
baseTrafficRemainder <- ProtoConverter.parseNonNegativeLong(
trafficConsumedP.baseTrafficRemainder
"base_traffic_remainder",
trafficConsumedP.baseTrafficRemainder,
)
sequencingTimestamp <- CantonTimestamp.fromProtoPrimitive(
trafficConsumedP.sequencingTimestamp
)
lastConsumedCost <- ProtoConverter.parseNonNegativeLong(
trafficConsumedP.lastConsumedCost
"last_consumed_cost",
trafficConsumedP.lastConsumedCost,
)
} yield TrafficConsumed(
member = member,

View File

@ -66,8 +66,11 @@ object TrafficPurchased {
def fromProtoV30(trafficPurchasedP: TrafficPurchasedP): ParsingResult[TrafficPurchased] =
for {
member <- Member.fromProtoPrimitive(trafficPurchasedP.member, "member")
serial <- ProtoConverter.parsePositiveInt(trafficPurchasedP.serial)
balance <- ProtoConverter.parseNonNegativeLong(trafficPurchasedP.extraTrafficPurchased)
serial <- ProtoConverter.parsePositiveInt("serial", trafficPurchasedP.serial)
balance <- ProtoConverter.parseNonNegativeLong(
"extra_traffic_purchased",
trafficPurchasedP.extraTrafficPurchased,
)
sequencingTimestamp <- CantonTimestamp.fromProtoPrimitive(
trafficPurchasedP.sequencingTimestamp
)

View File

@ -41,13 +41,16 @@ object TrafficReceipt {
def fromProtoV30(trafficReceiptP: TrafficReceiptP): ParsingResult[TrafficReceipt] =
for {
consumedCost <- ProtoConverter.parseNonNegativeLong(
trafficReceiptP.consumedCost
"consumed_cost",
trafficReceiptP.consumedCost,
)
totalExtraTrafficConsumed <- ProtoConverter.parseNonNegativeLong(
trafficReceiptP.extraTrafficConsumed
"extra_traffic_consumed",
trafficReceiptP.extraTrafficConsumed,
)
baseTrafficRemainder <- ProtoConverter.parseNonNegativeLong(
trafficReceiptP.baseTrafficRemainder
"base_traffic_remainder",
trafficReceiptP.baseTrafficRemainder,
)
} yield TrafficReceipt(
consumedCost = consumedCost,

View File

@ -116,17 +116,23 @@ object ProtoConverter {
parsed <- contentNE.toNEF.traverse(fromProto)
} yield parsed
def parsePositiveInt(i: Int): ParsingResult[PositiveInt] =
PositiveInt.create(i).leftMap(ProtoDeserializationError.InvariantViolation(_))
def parsePositiveInt(field: String, i: Int): ParsingResult[PositiveInt] =
PositiveInt.create(i).leftMap(ProtoDeserializationError.InvariantViolation(field, _))
def parsePositiveLong(l: Long): ParsingResult[PositiveLong] =
PositiveLong.create(l).leftMap(ProtoDeserializationError.InvariantViolation(_))
def parsePositiveLong(field: String, l: Long): ParsingResult[PositiveLong] =
PositiveLong
.create(l)
.leftMap(ProtoDeserializationError.InvariantViolation(field, _))
def parseNonNegativeInt(i: Int): ParsingResult[NonNegativeInt] =
NonNegativeInt.create(i).leftMap(ProtoDeserializationError.InvariantViolation(_))
def parseNonNegativeInt(field: String, i: Int): ParsingResult[NonNegativeInt] =
NonNegativeInt
.create(i)
.leftMap(ProtoDeserializationError.InvariantViolation(field, _))
def parseNonNegativeLong(l: Long): ParsingResult[NonNegativeLong] =
NonNegativeLong.create(l).leftMap(ProtoDeserializationError.InvariantViolation(_))
def parseNonNegativeLong(field: String, l: Long): ParsingResult[NonNegativeLong] =
NonNegativeLong
.create(l)
.leftMap(ProtoDeserializationError.InvariantViolation(field, _))
def parseLfPartyId(party: String): ParsingResult[LfPartyId] =
parseString(party)(LfPartyId.fromString)

View File

@ -79,6 +79,24 @@ sealed trait TopologyMapping extends Product with Serializable with PrettyPrinti
}
object TopologyMapping {
private[transaction] def participantIdFromProtoPrimitive(
proto: String,
fieldName: String,
): ParsingResult[ParticipantId] = {
/*
We changed some of the topology protobufs from `string participant_id` (member id, with PAR prefix)
to string participant_uid` (uid of the participant, without PAR prefix).
This allows backward compatible parsing.
The fallback can be removed if/when all the topology transactions in the global synchronizer are
recreated from scratch.
*/
val participantIdOld = ParticipantId.fromProtoPrimitive(proto = proto, fieldName = fieldName)
participantIdOld.orElse(
UniqueIdentifier.fromProtoPrimitive(uid = proto, fieldName = fieldName).map(ParticipantId(_))
)
}
private[transaction] def buildUniqueKey(code: TopologyMapping.Code)(
addUniqueKeyToBuilder: HashBuilder => HashBuilder
@ -527,13 +545,14 @@ object DecentralizedNamespaceDefinition {
decentralizedNamespace <- Fingerprint
.fromProtoPrimitive(decentralizedNamespaceP)
.map(Namespace(_))
threshold <- ProtoConverter.parsePositiveInt(thresholdP)
threshold <- ProtoConverter.parsePositiveInt("threshold", thresholdP)
owners <- ownersP.traverse(Fingerprint.fromProtoPrimitive)
ownersNE <- NonEmpty
.from(owners.toSet)
.toRight(
ProtoDeserializationError.InvariantViolation(
"owners cannot be empty"
field = "owners",
error = "cannot be empty",
)
)
item <- create(decentralizedNamespace, threshold, ownersNE.map(Namespace(_)))
@ -698,7 +717,7 @@ final case class DomainTrustCertificate(
def toProto: v30.DomainTrustCertificate =
v30.DomainTrustCertificate(
participant = participantId.toProtoPrimitive,
participantUid = participantId.uid.toProtoPrimitive,
domain = domainId.toProtoPrimitive,
transferOnlyToGivenTargetDomains = transferOnlyToGivenTargetDomains,
targetDomains = targetDomains.map(_.toProtoPrimitive),
@ -739,7 +758,10 @@ object DomainTrustCertificate {
value: v30.DomainTrustCertificate
): ParsingResult[DomainTrustCertificate] =
for {
participantId <- ParticipantId.fromProtoPrimitive(value.participant, "participant")
participantId <- TopologyMapping.participantIdFromProtoPrimitive(
value.participantUid,
"participant_uid",
)
domainId <- DomainId.fromProtoPrimitive(value.domain, "domain")
transferOnlyToGivenTargetDomains = value.transferOnlyToGivenTargetDomains
targetDomains <- value.targetDomains.traverse(
@ -839,7 +861,7 @@ final case class ParticipantDomainPermission(
def toProto: v30.ParticipantDomainPermission =
v30.ParticipantDomainPermission(
domain = domainId.toProtoPrimitive,
participant = participantId.toProtoPrimitive,
participantUid = participantId.uid.toProtoPrimitive,
permission = permission.toProtoV30,
limits = limits.map(_.toProto),
loginAfter = loginAfter.map(_.toProtoPrimitive),
@ -908,7 +930,10 @@ object ParticipantDomainPermission {
): ParsingResult[ParticipantDomainPermission] =
for {
domainId <- DomainId.fromProtoPrimitive(value.domain, "domain")
participantId <- ParticipantId.fromProtoPrimitive(value.participant, "participant")
participantId <- TopologyMapping.participantIdFromProtoPrimitive(
value.participantUid,
"participant_uid",
)
permission <- ParticipantPermission.fromProtoV30(value.permission)
limits = value.limits.map(ParticipantDomainLimits.fromProtoV30)
loginAfter <- value.loginAfter.traverse(CantonTimestamp.fromProtoPrimitive)
@ -985,7 +1010,7 @@ final case class VettedPackages(
def toProto: v30.VettedPackages =
v30.VettedPackages(
participant = participantId.toProtoPrimitive,
participantUid = participantId.uid.toProtoPrimitive,
packageIds = packageIds,
domain = domainId.fold("")(_.toProtoPrimitive),
)
@ -1025,7 +1050,10 @@ object VettedPackages {
value: v30.VettedPackages
): ParsingResult[VettedPackages] =
for {
participantId <- ParticipantId.fromProtoPrimitive(value.participant, "participant")
participantId <- TopologyMapping.participantIdFromProtoPrimitive(
value.participantUid,
"participant_uid",
)
packageIds <- value.packageIds
.traverse(LfPackageId.fromString)
.leftMap(ProtoDeserializationError.ValueConversionError("package_ids", _))
@ -1043,7 +1071,7 @@ final case class HostingParticipant(
) {
def toProto: v30.PartyToParticipant.HostingParticipant =
v30.PartyToParticipant.HostingParticipant(
participant = participantId.toProtoPrimitive,
participantUid = participantId.uid.toProtoPrimitive,
permission = permission.toProtoV30,
)
}
@ -1052,7 +1080,10 @@ object HostingParticipant {
def fromProtoV30(
value: v30.PartyToParticipant.HostingParticipant
): ParsingResult[HostingParticipant] = for {
participantId <- ParticipantId.fromProtoPrimitive(value.participant, "participant")
participantId <- TopologyMapping.participantIdFromProtoPrimitive(
value.participantUid,
"participant_uid",
)
permission <- ParticipantPermission.fromProtoV30(value.permission)
} yield HostingParticipant(participantId, permission)
}
@ -1192,7 +1223,7 @@ object PartyToParticipant {
): ParsingResult[PartyToParticipant] =
for {
partyId <- PartyId.fromProtoPrimitive(value.party, "party")
threshold <- ProtoConverter.parsePositiveInt(value.threshold)
threshold <- ProtoConverter.parsePositiveInt("threshold", value.threshold)
participants <- value.participants.traverse(HostingParticipant.fromProtoV30)
groupAddressing = value.groupAddressing
domainId <-
@ -1271,7 +1302,7 @@ object AuthorityOf {
): ParsingResult[AuthorityOf] =
for {
partyId <- PartyId.fromProtoPrimitive(value.party, "party")
threshold <- ProtoConverter.parsePositiveInt(value.threshold)
threshold <- ProtoConverter.parsePositiveInt("threshold", value.threshold)
parties <- value.parties.traverse(PartyId.fromProtoPrimitive(_, "parties"))
domainId <-
if (value.domain.nonEmpty)
@ -1473,8 +1504,8 @@ object MediatorDomainState {
domainId <- DomainId.fromProtoPrimitive(domainIdP, "domain")
group <- NonNegativeInt
.create(groupP)
.leftMap(ProtoDeserializationError.InvariantViolation(_))
threshold <- ProtoConverter.parsePositiveInt(thresholdP)
.leftMap(ProtoDeserializationError.InvariantViolation("group", _))
threshold <- ProtoConverter.parsePositiveInt("threshold", thresholdP)
active <- activeP.traverse(
UniqueIdentifier.fromProtoPrimitive(_, "active").map(MediatorId(_))
)
@ -1564,7 +1595,7 @@ object SequencerDomainState {
val v30.SequencerDomainState(domainIdP, thresholdP, activeP, observersP) = value
for {
domainId <- DomainId.fromProtoPrimitive(domainIdP, "domain")
threshold <- ProtoConverter.parsePositiveInt(thresholdP)
threshold <- ProtoConverter.parsePositiveInt("threshold", thresholdP)
active <- activeP.traverse(
UniqueIdentifier.fromProtoPrimitive(_, "active").map(SequencerId(_))
)

View File

@ -83,6 +83,7 @@ class ValidatingTopologyMappingChecks(
.select[TopologyChangeOp.Replace, PartyToParticipant]
.map(
checkPartyToParticipant(
effective,
_,
inStore.flatMap(_.select[TopologyChangeOp.Replace, PartyToParticipant]),
)
@ -139,6 +140,7 @@ class ValidatingTopologyMappingChecks(
.select[TopologyChangeOp.Replace, DecentralizedNamespaceDefinition]
.map(
checkDecentralizedNamespaceDefinitionReplace(
effective,
_,
inStore.flatMap(_.select[TopologyChangeOp, DecentralizedNamespaceDefinition]),
)
@ -150,9 +152,9 @@ class ValidatingTopologyMappingChecks(
) =>
toValidate
.select[TopologyChangeOp.Replace, NamespaceDelegation]
.map(checkNamespaceDelegationReplace)
.map(checkNamespaceDelegationReplace(effective, _))
case otherwise => None
case _otherwise => None
}
checkFirstIsNotRemove
@ -234,7 +236,7 @@ class ValidatingTopologyMappingChecks(
case param :: Nil => Right(param)
case param :: rest =>
logger.error(
s"Multiple domain parameters at ${effective} ${rest.size + 1}. Using first one: $param."
s"Multiple domain parameters at $effective ${rest.size + 1}. Using first one: $param."
)
Right(param)
}
@ -296,7 +298,7 @@ class ValidatingTopologyMappingChecks(
case Some(Some(loginAfter)) if loginAfter > effective.value =>
// this should not happen except under race conditions, as sequencers should not let participants login
logger.warn(
s"Rejecting onboarding of ${toValidate.mapping.participantId} as the participant still has a login ban until ${loginAfter}"
s"Rejecting onboarding of ${toValidate.mapping.participantId} as the participant still has a login ban until $loginAfter"
)
Left(
TopologyTransactionRejection
@ -339,6 +341,7 @@ class ValidatingTopologyMappingChecks(
* - new participants have an OTK with at least 1 signing key and 1 encryption key
*/
private def checkPartyToParticipant(
effective: EffectiveTime,
toValidate: SignedTopologyTransaction[TopologyChangeOp.Replace, PartyToParticipant],
inStore: Option[SignedTopologyTransaction[TopologyChangeOp.Replace, PartyToParticipant]],
)(implicit
@ -409,9 +412,9 @@ class ValidatingTopologyMappingChecks(
case Nil => // No hosting limits found. This is expected if no restrictions are in place
None
case quota :: Nil => Some(quota)
case multiple @ (quota :: _) =>
case multiple @ quota :: _ =>
logger.error(
s"Multiple PartyHostingLimits at ${effective} ${multiple.size}. Using first one with quota $quota."
s"Multiple PartyHostingLimits at $effective ${multiple.size}. Using first one with quota $quota."
)
Some(quota)
}
@ -433,7 +436,7 @@ class ValidatingTopologyMappingChecks(
for {
_ <- checkParticipants()
_ <- checkHostingLimits(EffectiveTime.MaxValue)
_ <- checkHostingLimits(effective)
} yield ()
}
@ -461,7 +464,7 @@ class ValidatingTopologyMappingChecks(
EitherTUtil.condUnitET[Future][TopologyTransactionRejection](
// all nodes require signing keys
// non-participants don't need encryption keys
(!isParticipant || encryptionKeys.nonEmpty),
!isParticipant || encryptionKeys.nonEmpty,
TopologyTransactionRejection.InvalidTopologyMapping(
"OwnerToKeyMapping for participants must contain at least 1 encryption key."
),
@ -576,6 +579,7 @@ class ValidatingTopologyMappingChecks(
}
private def checkDecentralizedNamespaceDefinitionReplace(
effective: EffectiveTime,
toValidate: SignedTopologyTransaction[
TopologyChangeOp.Replace,
DecentralizedNamespaceDefinition,
@ -601,11 +605,11 @@ class ValidatingTopologyMappingChecks(
EitherTUtil.unit
}
def checkNoClashWithRootCertificates()(implicit
def checkNoClashWithRootCertificates(effective: EffectiveTime)(implicit
traceContext: TraceContext
): EitherT[Future, TopologyTransactionRejection, Unit] = {
loadFromStore(
EffectiveTime.MaxValue,
effective,
Code.NamespaceDelegation,
filterUid = None,
filterNamespace = Some(Seq(toValidate.mapping.namespace)),
@ -622,22 +626,23 @@ class ValidatingTopologyMappingChecks(
for {
_ <- checkDecentralizedNamespaceDerivedFromOwners()
_ <- checkNoClashWithRootCertificates()
_ <- checkNoClashWithRootCertificates(effective)
} yield ()
}
private def checkNamespaceDelegationReplace(
effective: EffectiveTime,
toValidate: SignedTopologyTransaction[
TopologyChangeOp.Replace,
NamespaceDelegation,
]
],
)(implicit traceContext: TraceContext): EitherT[Future, TopologyTransactionRejection, Unit] = {
def checkNoClashWithDecentralizedNamespaces()(implicit
traceContext: TraceContext
): EitherT[Future, TopologyTransactionRejection, Unit] = {
EitherTUtil.ifThenET(NamespaceDelegation.isRootCertificate(toValidate)) {
loadFromStore(
EffectiveTime.MaxValue,
effective,
Code.DecentralizedNamespaceDefinition,
filterUid = None,
filterNamespace = Some(Seq(toValidate.mapping.namespace)),

View File

@ -211,7 +211,7 @@ object TopologyTransaction
val v30.TopologyTransaction(opP, serialP, mappingP) = transactionP
for {
mapping <- ProtoConverter.parseRequired(TopologyMapping.fromProtoV30, "mapping", mappingP)
serial <- ProtoConverter.parsePositiveInt(serialP)
serial <- ProtoConverter.parsePositiveInt("serial", serialP)
op <- ProtoConverter.parseEnum(TopologyChangeOp.fromProtoV30, "operation", opP)
rpv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield TopologyTransaction(op, serial, mapping)(

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: CantonExamples

View File

@ -23,7 +23,13 @@ import com.digitalasset.canton.sequencing.GrpcSequencerConnection
import com.digitalasset.canton.sequencing.protocol.{HandshakeRequest, HandshakeResponse}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.transaction.SignedTopologyTransaction.GenericSignedTopologyTransaction
import com.digitalasset.canton.topology.{DomainId, Member, ParticipantId, SequencerId}
import com.digitalasset.canton.topology.{
DomainId,
Member,
ParticipantId,
SequencerId,
UniqueIdentifier,
}
import com.digitalasset.canton.tracing.{TraceContext, TracingConfig}
import com.digitalasset.canton.util.retry.RetryUtil.AllExnRetryable
import com.digitalasset.canton.util.retry.Success
@ -81,12 +87,10 @@ class GrpcSequencerConnectClient(
domainId <- EitherT.fromEither[Future](domainId)
sequencerId =
if (response.sequencerId.isEmpty) Right(SequencerId(domainId.unwrap))
else
SequencerId
.fromProtoPrimitive(response.sequencerId, "sequencerId")
.leftMap[Error](err => Error.DeserializationFailure(err.toString))
sequencerId = UniqueIdentifier
.fromProtoPrimitive(response.sequencerUid, "sequencerUid")
.leftMap[Error](err => Error.DeserializationFailure(err.toString))
.map(SequencerId(_))
sequencerId <- EitherT.fromEither[Future](sequencerId)
} yield DomainClientBootstrapInfo(domainId, sequencerId)

View File

@ -576,9 +576,9 @@ object SequencerInfoLoader {
.flatMap { case (_, v) => v.headOption }
.toSeq
if (validSequencerConnections.sizeIs >= sequencerTrustThreshold.unwrap) {
val nonEmptyResult = NonEmptyUtil.fromUnsafe(validSequencerConnections)
val validSequencerConnectionsNE = NonEmptyUtil.fromUnsafe(validSequencerConnections)
val expectedSequencers = NonEmptyUtil.fromUnsafe(
nonEmptyResult
validSequencerConnectionsNE
.groupBy(_.connection.sequencerAlias)
.view
.mapValues(_.map(_.domainClientBootstrapInfo.sequencerId).head1)
@ -586,15 +586,15 @@ object SequencerInfoLoader {
)
SequencerConnections
.many(
nonEmptyResult.map(_.connection),
validSequencerConnectionsNE.map(_.connection),
sequencerTrustThreshold,
submissionRequestAmplification,
)
.leftMap(SequencerInfoLoaderError.FailedToConnectToSequencers)
.map(connections =>
SequencerAggregatedInfo(
domainId = nonEmptyResult.head1.domainClientBootstrapInfo.domainId,
staticDomainParameters = nonEmptyResult.head1.staticDomainParameters,
domainId = validSequencerConnectionsNE.head1.domainClientBootstrapInfo.domainId,
staticDomainParameters = validSequencerConnectionsNE.head1.staticDomainParameters,
expectedSequencers = expectedSequencers,
sequencerConnections = connections,
)

View File

@ -115,7 +115,7 @@ trait CantonNodeBootstrap[+T <: CantonNode] extends FlagCloseable with NamedLogg
}
object CantonNodeBootstrap {
type HealthDumpFunction = () => Future[File]
type HealthDumpFunction = File => Future[Unit]
}
trait BaseMetrics {

View File

@ -166,7 +166,10 @@ object SimpleStatus {
.flatMap(DurationConverter.fromProtoPrimitive)
ports <- proto.ports.toList
.traverse { case (s, i) =>
Port.create(i).leftMap(InvariantViolation.toProtoDeserializationError).map(p => (s, p))
Port
.create(i)
.leftMap(InvariantViolation.toProtoDeserializationError("ports", _))
.map(p => (s, p))
}
.map(_.toMap)
topology <- ProtoConverter.parseRequired(
@ -364,7 +367,7 @@ final case class SequencerNodeStatus(
) extends NodeStatus.Status {
override def active: Boolean = sequencer.isActive
def toProtoV30: v30.StatusResponse.Status = {
val participants = connectedParticipants.map(_.toProtoPrimitive)
val participants = connectedParticipants.map(_.uid.toProtoPrimitive)
SimpleStatus(uid, uptime, ports, active, topologyQueue, components).toProtoV30.copy(
extra = v30
.SequencerNodeStatus(
@ -403,8 +406,10 @@ object SequencerNodeStatus {
v30.SequencerNodeStatus.parseFrom,
sequencerNodeStatusP =>
for {
participants <- sequencerNodeStatusP.connectedParticipants.traverse(pId =>
ParticipantId.fromProtoPrimitive(pId, s"SequencerNodeStatus.connectedParticipants")
participants <- sequencerNodeStatusP.connectedParticipantUids.traverse(pUid =>
UniqueIdentifier
.fromProtoPrimitive(pUid, s"SequencerNodeStatus.connected_participants")
.map(ParticipantId(_))
)
sequencer <- ProtoConverter.parseRequired(
SequencerHealthStatus.fromProto,
@ -413,7 +418,7 @@ object SequencerNodeStatus {
)
domainId <- DomainId.fromProtoPrimitive(
sequencerNodeStatusP.domainId,
s"SequencerNodeStatus.domainId",
s"SequencerNodeStatus.domain_id",
)
admin <- ProtoConverter.parseRequired(
SequencerAdminStatus.fromProto,

View File

@ -5,17 +5,15 @@ package com.digitalasset.canton.health.admin.grpc
import better.files.*
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.health.admin.grpc.GrpcStatusService.DefaultHealthDumpChunkSize
import com.digitalasset.canton.health.admin.v30.{HealthDumpRequest, HealthDumpResponse}
import com.digitalasset.canton.health.admin.{data, v30}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, NodeLoggingUtil}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.google.protobuf.ByteString
import com.digitalasset.canton.util.GrpcStreamingUtils
import io.grpc.Status
import io.grpc.stub.StreamObserver
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.concurrent.{ExecutionContext, Future}
object GrpcStatusService {
val DefaultHealthDumpChunkSize: Int =
@ -24,7 +22,7 @@ object GrpcStatusService {
class GrpcStatusService(
status: => Future[data.NodeStatus[_]],
healthDump: () => Future[File],
healthDump: File => Future[Unit],
processingTimeout: ProcessingTimeout,
val loggerFactory: NamedLoggerFactory,
)(implicit
@ -58,39 +56,12 @@ class GrpcStatusService(
request: HealthDumpRequest,
responseObserver: StreamObserver[HealthDumpResponse],
): Unit = {
// Create a context that will be automatically cancelled after the processing timeout deadline
val context = io.grpc.Context
.current()
.withCancellation()
context.run { () =>
val processingResult = healthDump().map { dumpFile =>
val chunkSize = request.chunkSize.getOrElse(DefaultHealthDumpChunkSize)
dumpFile.newInputStream
.buffered(chunkSize)
.autoClosed { s =>
Iterator
.continually(s.readNBytes(chunkSize))
// Before pushing new chunks to the stream, keep checking that the context has not been cancelled
// This avoids the server reading the entire dump file for nothing if the client has already cancelled
.takeWhile(_.nonEmpty && !context.isCancelled)
.foreach { chunk =>
responseObserver.onNext(HealthDumpResponse(ByteString.copyFrom(chunk)))
}
}
}
Try(Await.result(processingResult, processingTimeout.unbounded.duration)) match {
case Failure(exception) =>
responseObserver.onError(exception)
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
case Success(_) =>
if (!context.isCancelled) responseObserver.onCompleted()
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
}
}
GrpcStreamingUtils.streamToClientFromFile(
(file: File) => healthDump(file),
responseObserver,
byteString => HealthDumpResponse(byteString),
processingTimeout.unbounded.duration,
)
}
override def setLogLevel(request: v30.SetLogLevelRequest): Future[v30.SetLogLevelResponse] = {

View File

@ -131,7 +131,7 @@ class GrpcTopologyAggregationService(
party = partyId.toProtoPrimitive,
participants = participants.map { case (participantId, domains) =>
v30.ListPartiesResponse.Result.ParticipantDomains(
participant = participantId.toProtoPrimitive,
participantUid = participantId.uid.toProtoPrimitive,
domains = domains.map { case (domainId, permission) =>
v30.ListPartiesResponse.Result.ParticipantDomains.DomainPermissions(
domain = domainId.toProtoPrimitive,

View File

@ -71,7 +71,7 @@ import com.digitalasset.canton.topology.{
}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.{EitherTUtil, GrpcUtils, OptionUtil}
import com.digitalasset.canton.util.{EitherTUtil, GrpcStreamingUtils, OptionUtil}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{ProtoDeserializationError, topology}
import com.google.protobuf.ByteString
@ -815,12 +815,11 @@ class GrpcTopologyManagerReadService(
request: GenesisStateRequest,
responseObserver: StreamObserver[GenesisStateResponse],
): Unit = {
GrpcUtils.streamResponse(
GrpcStreamingUtils.streamToClient(
(out: OutputStream) => getGenesisState(request.filterDomainStore, request.timestamp, out),
responseObserver,
byteString => GenesisStateResponse(byteString),
processingTimeout.unbounded.duration,
None,
)
}

View File

@ -81,7 +81,9 @@ class GrpcTopologyManagerWriteService(
case Type.Proposal(Proposal(op, serial, mapping)) =>
val validatedMappingE = for {
// we treat serial=0 as "serial was not set". negative values should be rejected by parsePositiveInt
serial <- Option.when(serial != 0)(serial).traverse(ProtoConverter.parsePositiveInt)
serial <- Option
.when(serial != 0)(serial)
.traverse(ProtoConverter.parsePositiveInt("serial", _))
op <- ProtoConverter.parseEnum(TopologyChangeOp.fromProtoV30, "operation", op)
mapping <- ProtoConverter.required("AuthorizeRequest.mapping", mapping)
signingKeys <-

View File

@ -0,0 +1,194 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.util
import better.files.File.newTemporaryFile
import better.files.{DisposeableExtensions, File, *}
import com.digitalasset.canton.config.DefaultProcessingTimeouts
import com.digitalasset.canton.grpc.ByteStringStreamObserverWithContext
import com.google.protobuf.ByteString
import io.grpc.Context
import io.grpc.stub.StreamObserver
import java.io.{
BufferedInputStream,
ByteArrayInputStream,
ByteArrayOutputStream,
InputStream,
OutputStream,
}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.util.{Failure, Success, Try}
object GrpcStreamingUtils {
private final val defaultChunkSize: Int =
1024 * 1024 * 2 // 2MB - This is half of the default max message size of gRPC
def streamFromClient[Req, Resp, C](
extractChunkBytes: Req => ByteString,
extractContext: Req => C,
processFullRequest: (ByteString, C) => Future[Resp],
responseObserver: StreamObserver[Resp],
processingTimeout: Duration = DefaultProcessingTimeouts.unbounded.duration,
)(implicit ec: ExecutionContext): StreamObserver[Req] = {
val observer =
new ByteStringStreamObserverWithContext[Req, C](extractChunkBytes, extractContext) {
override def onCompleted(): Unit = {
super.onCompleted()
val responseF = this.result.flatMap { case (byteString, context) =>
processFullRequest(byteString, context)
}
Try(Await.result(responseF, processingTimeout)) match {
case Failure(exception) => responseObserver.onError(exception)
case Success(response) =>
responseObserver.onNext(response)
responseObserver.onCompleted()
}
}
}
observer
}
def streamToServer[Req, Resp](
load: StreamObserver[Resp] => StreamObserver[Req],
requestBuilder: Array[Byte] => Req,
byteString: ByteString,
): Future[Resp] = {
val requestComplete = Promise[Resp]()
val ref = new AtomicReference[Option[Resp]](None)
val responseObserver = new StreamObserver[Resp] {
override def onNext(value: Resp): Unit = {
ref.set(Some(value))
}
override def onError(t: Throwable): Unit = requestComplete.failure(t)
override def onCompleted(): Unit = {
ref.get() match {
case Some(response) => requestComplete.success(response)
case None =>
requestComplete.failure(
io.grpc.Status.CANCELLED
.withDescription("Server completed the request before providing a response")
.asRuntimeException()
)
}
}
}
val requestObserver = load(responseObserver)
byteString.toByteArray
.grouped(defaultChunkSize)
.foreach { bytes =>
blocking {
requestObserver.onNext(requestBuilder(bytes))
}
}
requestObserver.onCompleted()
requestComplete.future
}
def streamToClient[T](
responseF: OutputStream => Future[Unit],
responseObserver: StreamObserver[T],
fromByteString: FromByteString[T],
processingTimeout: Duration = DefaultProcessingTimeouts.unbounded.duration,
chunkSizeO: Option[Int] = None,
)(implicit ec: ExecutionContext): Unit = {
val context = io.grpc.Context
.current()
.withCancellation()
val outputStream = new ByteArrayOutputStream()
context.run { () =>
val processingResult = responseF(outputStream).map { _ =>
val chunkSize = chunkSizeO.getOrElse(defaultChunkSize)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
streamResponseChunks(context, responseObserver)(
new BufferedInputStream(inputStream),
chunkSize,
fromByteString,
)
}
finishStream(context, responseObserver)(processingResult, processingTimeout)
}
}
def streamToClientFromFile[T](
responseF: File => Future[Unit],
responseObserver: StreamObserver[T],
fromByteString: FromByteString[T],
processingTimeout: Duration = DefaultProcessingTimeouts.unbounded.duration,
chunkSizeO: Option[Int] = None,
)(implicit ec: ExecutionContext): Unit = {
val file = newTemporaryFile()
val context = io.grpc.Context
.current()
.withCancellation()
context.run { () =>
val processingResult = responseF(file).map { _ =>
val chunkSize = chunkSizeO.getOrElse(defaultChunkSize)
streamResponseChunks(context, responseObserver)(
file.newInputStream.buffered(chunkSize),
chunkSize,
fromByteString,
)
}
finishStream(context, responseObserver)(processingResult, processingTimeout)
}
}
private def streamResponseChunks[T](
context: Context.CancellableContext,
responseObserver: StreamObserver[T],
)(
inputStream: InputStream,
chunkSize: Int,
fromByteString: FromByteString[T],
) = {
inputStream.autoClosed { s =>
Iterator
.continually(s.readNBytes(chunkSize))
// Before pushing new chunks to the stream, keep checking that the context has not been cancelled
// This avoids the server reading the entire dump file for nothing if the client has already cancelled
.takeWhile(_.nonEmpty && !context.isCancelled)
.foreach { byteArray =>
val chunk: ByteString = ByteString.copyFrom(byteArray)
responseObserver.onNext(fromByteString.toT(chunk))
}
}
}
private def finishStream[T](
context: Context.CancellableContext,
responseObserver: StreamObserver[T],
)(f: Future[Unit], timeout: Duration): Unit = {
Try(Await.result(f, timeout)) match {
case Failure(exception) =>
responseObserver.onError(exception)
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
case Success(_) =>
if (!context.isCancelled) responseObserver.onCompleted()
else {
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
}
}
}
}
// Define a type class for converting ByteString to the generic type T
trait FromByteString[T] {
def toT(chunk: ByteString): T
}

View File

@ -1,68 +0,0 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.util
import better.files.DisposeableExtensions
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import java.io.{BufferedInputStream, ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object GrpcUtils {
private final val defaultChunkSize: Int =
1024 * 1024 * 2 // 2MB - This is half of the default max message size of gRPC
def streamResponse[T](
responseF: OutputStream => Future[Unit],
responseObserver: StreamObserver[T],
fromByteString: FromByteString[T],
processingTimeout: Duration,
chunkSizeO: Option[Int] = None,
)(implicit ec: ExecutionContext): Unit = {
val context = io.grpc.Context
.current()
.withCancellation()
val outputStream = new ByteArrayOutputStream()
context.run { () =>
val processingResult = responseF(outputStream).map { _ =>
val chunkSize = chunkSizeO.getOrElse(defaultChunkSize)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
new BufferedInputStream(inputStream)
.autoClosed { s =>
Iterator
.continually(s.readNBytes(chunkSize))
// Before pushing new chunks to the stream, keep checking that the context has not been cancelled
// This avoids the server reading the entire dump file for nothing if the client has already cancelled
.takeWhile(_.nonEmpty && !context.isCancelled)
.foreach { byteArray =>
val chunk: ByteString = ByteString.copyFrom(byteArray)
responseObserver.onNext(fromByteString.toT(chunk))
}
}
}
Try(Await.result(processingResult, processingTimeout)) match {
case Failure(exception) =>
responseObserver.onError(exception)
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
case Success(_) =>
if (!context.isCancelled) responseObserver.onCompleted()
else {
context.cancel(new io.grpc.StatusRuntimeException(io.grpc.Status.CANCELLED))
()
}
}
}
}
}
// Define a type class for converting ByteString to the generic type T
trait FromByteString[T] {
def toT(chunk: ByteString): T
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --target=2.1
name: medical-records

View File

@ -32,7 +32,7 @@ service SequencerAdministrationService {
// Fetch the onboarding state for a given sequencer.
// the returned bytestring can be used directly to initialize the given sequencer later on
rpc OnboardingState(OnboardingStateRequest) returns (OnboardingStateResponse);
rpc OnboardingState(OnboardingStateRequest) returns (stream OnboardingStateResponse);
// Disable members at the sequencer. Will prevent existing and new instances from connecting, and permit removing their data.
rpc DisableMember(DisableMemberRequest) returns (DisableMemberResponse);
@ -110,24 +110,15 @@ message SnapshotResponse {
message OnboardingStateRequest {
oneof request {
// The sequencer for which to fetch the onboarding state
string sequencer_id = 1;
string sequencer_uid = 1;
// The effective time the should be "contained" in the sequencer snapshot
google.protobuf.Timestamp timestamp = 2;
}
}
message OnboardingStateResponse {
message Success {
// versioned OnboardingStateForSequencer
bytes onboarding_state_for_sequencer = 1;
}
message Failure {
string reason = 1;
}
oneof value {
Success success = 1;
Failure failure = 2;
}
// versioned OnboardingStateForSequencer
bytes onboarding_state_for_sequencer = 1;
}
message OnboardingStateForSequencer {

View File

@ -16,8 +16,8 @@ service SequencerInitializationService {
// and will immediately attempt to use it.
// If the request is received after the sequencer has been successfully initialized it should return successfully
// if the domain_id matches the domain that the sequencer has been initialized for, otherwise it should fail.
rpc InitializeSequencerFromGenesisState(InitializeSequencerFromGenesisStateRequest) returns (InitializeSequencerFromGenesisStateResponse);
rpc InitializeSequencerFromOnboardingState(InitializeSequencerFromOnboardingStateRequest) returns (InitializeSequencerFromOnboardingStateResponse);
rpc InitializeSequencerFromGenesisState(stream InitializeSequencerFromGenesisStateRequest) returns (InitializeSequencerFromGenesisStateResponse);
rpc InitializeSequencerFromOnboardingState(stream InitializeSequencerFromOnboardingStateRequest) returns (InitializeSequencerFromOnboardingStateResponse);
}
// Includes sufficient detail for:

View File

@ -11,7 +11,6 @@ import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.data.SequencerBlockStore.InvalidTimestamp
import com.digitalasset.canton.domain.block.data.db.DbSequencerBlockStore
import com.digitalasset.canton.domain.block.data.memory.InMemorySequencerBlockStore
import com.digitalasset.canton.domain.sequencing.integrations.state.statemanager.{
@ -19,6 +18,7 @@ import com.digitalasset.canton.domain.sequencing.integrations.state.statemanager
MemberSignedEvents,
MemberTimestamps,
}
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError
import com.digitalasset.canton.domain.sequencing.sequencer.{
InFlightAggregationUpdates,
InFlightAggregations,
@ -86,7 +86,7 @@ trait SequencerBlockStore extends AutoCloseable {
*/
def readStateForBlockContainingTimestamp(timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, InvalidTimestamp, BlockEphemeralState]
): EitherT[Future, SequencerError, BlockEphemeralState]
def pruningStatus()(implicit traceContext: TraceContext): Future[InternalSequencerPruningStatus]
@ -324,6 +324,4 @@ object SequencerBlockStore {
unifiedSequencer = unifiedSequencer,
)
}
final case class InvalidTimestamp(timestamp: CantonTimestamp)
}

View File

@ -11,7 +11,6 @@ import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.data.EphemeralState.counterToCheckpoint
import com.digitalasset.canton.domain.block.data.SequencerBlockStore.InvalidTimestamp
import com.digitalasset.canton.domain.block.data.{
BlockEphemeralState,
BlockInfo,
@ -24,6 +23,8 @@ import com.digitalasset.canton.domain.sequencing.integrations.state.statemanager
MemberSignedEvents,
MemberTimestamps,
}
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.BlockNotFound
import com.digitalasset.canton.domain.sequencing.sequencer.store.CounterCheckpoint
import com.digitalasset.canton.domain.sequencing.sequencer.{
InFlightAggregationUpdates,
@ -127,13 +128,13 @@ class DbSequencerBlockStore(
override def readStateForBlockContainingTimestamp(
timestamp: CantonTimestamp
)(implicit traceContext: TraceContext): EitherT[Future, InvalidTimestamp, BlockEphemeralState] =
)(implicit traceContext: TraceContext): EitherT[Future, SequencerError, BlockEphemeralState] =
EitherT(
storage.query(
for {
heightAndTimestamp <- findBlockContainingTimestamp(timestamp)
state <- heightAndTimestamp match {
case None => DBIO.successful(Left(InvalidTimestamp(timestamp)))
case None => DBIO.successful(Left(BlockNotFound.InvalidTimestamp(timestamp)))
case Some(block) => readAtBlock(block).map(Right.apply)
}
} yield state,

View File

@ -12,7 +12,6 @@ import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.domain.block.data.EphemeralState.counterToCheckpoint
import com.digitalasset.canton.domain.block.data.SequencerBlockStore.InvalidTimestamp
import com.digitalasset.canton.domain.block.data.{
BlockEphemeralState,
BlockInfo,
@ -24,6 +23,8 @@ import com.digitalasset.canton.domain.sequencing.integrations.state.statemanager
MemberSignedEvents,
MemberTimestamps,
}
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.BlockNotFound
import com.digitalasset.canton.domain.sequencing.sequencer.{
InFlightAggregationUpdates,
InternalSequencerPruningStatus,
@ -157,12 +158,12 @@ class InMemorySequencerBlockStore(
override def readStateForBlockContainingTimestamp(
timestamp: CantonTimestamp
)(implicit traceContext: TraceContext): EitherT[Future, InvalidTimestamp, BlockEphemeralState] =
)(implicit traceContext: TraceContext): EitherT[Future, SequencerError, BlockEphemeralState] =
blockToTimestampMap.toList
.sortBy(_._2._1)
.find(_._2._1 >= timestamp)
.fold[EitherT[Future, InvalidTimestamp, BlockEphemeralState]](
EitherT.leftT(InvalidTimestamp(timestamp))
.fold[EitherT[Future, SequencerError, BlockEphemeralState]](
EitherT.leftT(BlockNotFound.InvalidTimestamp(timestamp))
) { case (blockHeight, (blockTimestamp, latestSequencerEventTs)) =>
val block = BlockInfo(blockHeight, blockTimestamp, latestSequencerEventTs)
EitherT.right(

View File

@ -5,7 +5,7 @@ package com.digitalasset.canton.domain.metrics
import com.daml.metrics.HealthMetrics
import com.daml.metrics.api.HistogramInventory.Item
import com.daml.metrics.api.MetricHandle.{Histogram, LabeledMetricsFactory, Meter, Timer}
import com.daml.metrics.api.MetricHandle.{Gauge, Histogram, LabeledMetricsFactory, Meter, Timer}
import com.daml.metrics.api.{
HistogramInventory,
MetricInfo,
@ -14,33 +14,76 @@ import com.daml.metrics.api.{
MetricsContext,
}
import com.daml.metrics.grpc.GrpcServerMetrics
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.environment.BaseMetrics
import com.digitalasset.canton.logging.pretty.PrettyBareCase
import com.digitalasset.canton.logging.pretty.PrettyNameOnlyCase
import com.digitalasset.canton.metrics.{DbStorageHistograms, DbStorageMetrics}
import com.digitalasset.canton.topology.SequencerId
class BftOrderingHistograms(val parent: MetricName)(implicit
inventory: HistogramInventory
) {
private[metrics] val prefix = parent :+ BftOrderingMetrics.Prefix
private[metrics] val dbStorage = new DbStorageHistograms(parent)
private[metrics] val requestsOrderingTime: Item = Item(
prefix :+ "requests-ordering-time",
summary = "Requests ordering time",
description =
"""Records the rate and time it takes to order requests. This metric is always meaningful
|when queried on and restricted to the receiving sequencer; in other cases, it is meaningful only
|when the receiving and reporting sequencers' clocks are kept synchronized.""",
qualification = MetricQualification.Latency,
)
object global {
private[metrics] val prefix = BftOrderingHistograms.this.prefix :+ "global"
private[metrics] val requestsSize: Item = Item(
prefix :+ "requests-size",
summary = "Requests ordering time",
description = """Records the size of requests to the BFT ordering service""",
qualification = MetricQualification.Debug,
)
private[metrics] val requestsOrderingLatency: Item = Item(
prefix :+ "requests-ordering-latency",
summary = "Requests ordering latency",
description =
"""Records the rate and latency it takes to order requests. This metric is always meaningful
|when queried on and restricted to the receiving sequencer; in other cases, it is meaningful only
|when the receiving and reporting sequencers' clocks are kept synchronized.""",
qualification = MetricQualification.Latency,
)
}
object ingress {
private[metrics] val prefix = BftOrderingHistograms.this.prefix :+ "ingress"
private[metrics] val requestsSize: Item = Item(
prefix :+ "requests-size",
summary = "Requests size",
description = "Records the size of requests to the BFT ordering service.",
qualification = MetricQualification.Traffic,
)
}
object consensus {
private[metrics] val prefix = BftOrderingHistograms.this.prefix :+ "consensus"
private[metrics] val consensusCommitLatency: Item = Item(
prefix :+ "commit-latency",
summary = "Consensus commit latency",
description =
"Records the rate and latency it takes to commit a block at the consensus level.",
qualification = MetricQualification.Latency,
)
}
object topology {
private[metrics] val prefix = BftOrderingHistograms.this.prefix :+ "topology"
private[metrics] val queryLatency: Item = Item(
prefix :+ "query-latency",
summary = "Topology query latency",
description = "Records the rate and latency it takes to query the topology client.",
qualification = MetricQualification.Latency,
)
}
// Force the registration of all histograms, else it would happen too late
// because Scala `object`s are lazily initialized.
{
global.requestsOrderingLatency.discard
ingress.requestsSize.discard
consensus.consensusCommitLatency.discard
topology.queryLatency.discard
}
}
class BftOrderingMetrics(
@ -50,21 +93,21 @@ class BftOrderingMetrics(
override val healthMetrics: HealthMetrics,
) extends BaseMetrics {
override val prefix: MetricName = histograms.prefix
object dbStorage extends DbStorageMetrics(histograms.dbStorage, openTelemetryMetricsFactory)
private implicit val mc: MetricsContext = MetricsContext.Empty
override def storageMetrics: DbStorageMetrics = dbStorage
override val prefix: MetricName = histograms.prefix
object dbStorage extends DbStorageMetrics(histograms.dbStorage, openTelemetryMetricsFactory)
override def storageMetrics: DbStorageMetrics = dbStorage
object global {
private val prefix = BftOrderingMetrics.this.prefix :+ "global"
private val prefix = histograms.global.prefix
val bytesOrdered: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ s"ordered-${Histogram.Bytes}",
prefix :+ "ordered-bytes",
summary = "Bytes ordered",
description = "Measures the total bytes ordered.",
qualification = MetricQualification.Traffic,
@ -73,7 +116,7 @@ class BftOrderingMetrics(
val requestsOrdered: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ s"ordered-requests",
prefix :+ "ordered-requests",
summary = "Requests ordered",
description = "Measures the total requests ordered.",
qualification = MetricQualification.Traffic,
@ -98,39 +141,18 @@ class BftOrderingMetrics(
)
)
object requestsOrderingTime {
val timer: Timer =
openTelemetryMetricsFactory.timer(histograms.requestsOrderingTime.info)
object requestsOrderingLatency {
object labels {
val ReceivingSequencer: String = "receivingSequencer"
}
val timer: Timer =
openTelemetryMetricsFactory.timer(histograms.global.requestsOrderingLatency.info)
}
}
object ingress {
private val prefix = BftOrderingMetrics.this.prefix :+ "ingress"
val requestsReceived: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ s"received-requests",
summary = "Requests received",
description = "Measures the total requests received.",
qualification = MetricQualification.Traffic,
)
)
val bytesReceived: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ s"received-bytes",
summary = "Bytes received",
description = "Measures the total bytes received.",
qualification = MetricQualification.Traffic,
)
)
val requestsSize: Histogram =
openTelemetryMetricsFactory.histogram(histograms.requestsSize.info)
private val prefix = histograms.ingress.prefix
object labels {
val Tag: String = "tag"
@ -141,13 +163,183 @@ class BftOrderingMetrics(
val Key: String = "outcome"
object values {
sealed trait OutcomeValue extends Product with PrettyBareCase
sealed trait OutcomeValue extends PrettyNameOnlyCase
case object Success extends OutcomeValue
case object QueueFull extends OutcomeValue
case object RequestTooBig extends OutcomeValue
}
}
}
val requestsReceived: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "received-requests",
summary = "Requests received",
description = "Measures the total requests received.",
qualification = MetricQualification.Traffic,
)
)
val bytesReceived: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "received-bytes",
summary = "Bytes received",
description = "Measures the total bytes received.",
qualification = MetricQualification.Traffic,
)
)
val requestsSize: Histogram =
openTelemetryMetricsFactory.histogram(histograms.ingress.requestsSize.info)
}
object consensus {
private val prefix = histograms.consensus.prefix
val commitLatency: Timer =
openTelemetryMetricsFactory.timer(histograms.consensus.consensusCommitLatency.info)
val prepareVotesPercent: Gauge[Double] = openTelemetryMetricsFactory.gauge(
MetricInfo(
prefix :+ "prepare-votes-percent",
summary = "Block vote % during prepare",
description =
"Percentage of BFT sequencers that voted for a block in the PBFT prepare stage.",
qualification = MetricQualification.Debug,
),
0.0d,
)
val commitVotesPercent: Gauge[Double] = openTelemetryMetricsFactory.gauge(
MetricInfo(
prefix :+ "commit-votes-percent",
summary = "Block vote % during commit",
description =
"Percentage of BFT sequencers that voted for a block in the PBFT commit stage.",
qualification = MetricQualification.Debug,
),
0.0d,
)
}
object topology {
private val prefix = histograms.topology.prefix
val validators: Gauge[Int] = openTelemetryMetricsFactory.gauge(
MetricInfo(
prefix :+ "validators",
summary = "Active validators",
description = "Number of BFT sequencers actively involved in consensus.",
qualification = MetricQualification.Debug,
),
0,
)
val queryLatency: Timer =
openTelemetryMetricsFactory.timer(histograms.topology.queryLatency.info)
}
object p2p {
private val prefix = BftOrderingMetrics.this.prefix :+ "p2p"
object connections {
private val prefix = p2p.prefix :+ "connections"
val connected: Gauge[Int] = openTelemetryMetricsFactory.gauge(
MetricInfo(
prefix :+ "connected",
summary = "Connected peers",
description = "Number of connected P2P endpoints.",
qualification = MetricQualification.Debug,
),
0,
)
val authenticated: Gauge[Int] = openTelemetryMetricsFactory.gauge(
MetricInfo(
prefix :+ "authenticated",
summary = "Authenticated peers",
description = "Number of connected P2P endpoints that are also authenticated.",
qualification = MetricQualification.Debug,
),
0,
)
}
object send {
private val prefix = p2p.prefix :+ "send"
object labels {
val TargetSequencer: String = "targetSequencer"
val DroppedAsUnauthenticated: String = "droppedAsUnauthenticated"
object targetModule {
val Key: String = "targetModule"
object values {
sealed trait TargetModuleValue extends PrettyNameOnlyCase
case object Availability extends TargetModuleValue
case object Consensus extends TargetModuleValue
}
}
}
val sentBytes: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "sent-bytes",
summary = "Bytes sent",
description = "Total P2P bytes sent.",
qualification = MetricQualification.Traffic,
)
)
val sentMessages: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "sent-messages",
summary = "Messages sent",
description = "Total P2P messages sent.",
qualification = MetricQualification.Traffic,
)
)
}
object receive {
private val prefix = p2p.prefix :+ "receive"
object labels {
val SourceSequencer: String = "sourceSequencer"
object source {
val Key: String = "targetModule"
object values {
sealed trait SourceValue extends PrettyNameOnlyCase
case object SourceParsingFailed extends SourceValue
case class Empty(from: SequencerId) extends SourceValue
case class Availability(from: SequencerId) extends SourceValue
case class Consensus(from: SequencerId) extends SourceValue
}
}
}
val sentBytes: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "sent-bytes",
summary = "Bytes sent",
description = "Total P2P bytes sent.",
qualification = MetricQualification.Traffic,
)
)
val sentMessages: Meter = openTelemetryMetricsFactory.meter(
MetricInfo(
prefix :+ "sent-messages",
summary = "Messages sent",
description = "Total P2P messages sent.",
qualification = MetricQualification.Traffic,
)
)
}
}
}

View File

@ -15,6 +15,7 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.RegisterError
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerWriter.ResetWatermark
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.SnapshotNotFound
import com.digitalasset.canton.domain.sequencing.sequencer.errors.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.SequencerStore.SequencerPruningResult
import com.digitalasset.canton.domain.sequencing.sequencer.store.*
@ -408,7 +409,7 @@ class DatabaseSequencer(
override def snapshot(timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, String, SequencerSnapshot] = {
): EitherT[Future, SequencerError, SequencerSnapshot] = {
for {
safeWatermarkO <- EitherT.right(store.safeWatermark)
// we check if watermark is after the requested timestamp to avoid snapshotting the sequencer
@ -418,13 +419,13 @@ class DatabaseSequencer(
case Some(safeWatermark) =>
EitherTUtil.condUnitET[Future](
timestamp <= safeWatermark,
s"Requested snapshot at $timestamp is after the safe watermark $safeWatermark",
SnapshotNotFound.Error(timestamp, safeWatermark),
)
case None =>
EitherT.leftT[Future, Unit](s"No safe watermark found for the sequencer")
EitherT.leftT[Future, Unit](SnapshotNotFound.MissingSafeWatermark(topologyClientMember))
}
}
snapshot <- EitherT.right[String](store.readStateAtTimestamp(timestamp))
snapshot <- EitherT.right[SequencerError](store.readStateAtTimestamp(timestamp))
} yield snapshot
}

View File

@ -12,7 +12,7 @@ import com.digitalasset.canton.serialization.{
ProtoConverter,
ProtocolVersionedMemoizedEvidence,
}
import com.digitalasset.canton.topology.SequencerId
import com.digitalasset.canton.topology.{SequencerId, UniqueIdentifier}
import com.digitalasset.canton.version.*
import com.google.protobuf.ByteString
@ -33,7 +33,7 @@ final case class OrderingRequest[+A <: HasCryptographicEvidence] private (
private def toProtoV30: v30.OrderingRequest =
v30.OrderingRequest(
sequencerId.toProtoPrimitive,
sequencerId.uid.toProtoPrimitive,
Some(content.getCryptographicEvidence),
)
@ -96,7 +96,9 @@ object OrderingRequest
val v30.OrderingRequest(sequencerIdP, content) = orderingRequestP
for {
contentB <- ProtoConverter.required("content", content)
sequencerId <- SequencerId.fromProtoPrimitive(sequencerIdP, "sequencer_id")
sequencerId <- UniqueIdentifier
.fromProtoPrimitive(sequencerIdP, "sequencer_uid")
.map(SequencerId(_))
rpv <- protocolVersionRepresentativeFor(ProtoVersion(30))
} yield OrderingRequest(sequencerId, BytestringWithCryptographicEvidence(contentB))(
rpv,

View File

@ -12,6 +12,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.errors.{
CreateSubscriptionError,
RegisterMemberError,
SequencerAdministrationError,
SequencerError,
SequencerWriteError,
}
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.TimestampSelector.TimestampSelector
@ -114,7 +115,7 @@ trait Sequencer
*/
def snapshot(timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, String, SequencerSnapshot]
): EitherT[Future, SequencerError, SequencerSnapshot]
/** Disable the provided member. Should prevent them from reading or writing in the future (although they can still be addressed).
* Their unread data can also be pruned.

View File

@ -189,7 +189,7 @@ object SequencerSnapshot extends HasProtocolVersionedCompanion[SequencerSnapshot
maxSequencingTime,
aggregationRule,
)
.leftMap(err => ProtoDeserializationError.InvariantViolation(err))
.leftMap(err => ProtoDeserializationError.InvariantViolation(field = None, err))
} yield aggregationId -> inFlightAggregation
}

View File

@ -23,7 +23,10 @@ import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.{
}
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscriptionError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.{
CreateSubscriptionError,
SequencerError,
}
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.TimestampSelector.{
ExactTimestamp,
TimestampSelector,
@ -453,40 +456,34 @@ class BlockSequencer(
override def snapshot(
timestamp: CantonTimestamp
)(implicit traceContext: TraceContext): EitherT[Future, String, SequencerSnapshot] = {
)(implicit traceContext: TraceContext): EitherT[Future, SequencerError, SequencerSnapshot] = {
// TODO(#12676) Make sure that we don't request a snapshot for a state that was already pruned
for {
bsSnapshot <- store
.readStateForBlockContainingTimestamp(timestamp)
.biflatMap(
_ =>
EitherT.leftT[Future, SequencerSnapshot](
s"Provided timestamp $timestamp is not linked to a block"
),
blockEphemeralState => {
for {
// Look up traffic info at the latest timestamp from the block,
// because that's where the onboarded sequencer will start reading
trafficPurchased <- EitherT.liftF[Future, String, Seq[TrafficPurchased]](
trafficPurchasedStore
.lookupLatestBeforeInclusive(blockEphemeralState.latestBlock.lastTs)
)
trafficConsumed <- EitherT.liftF[Future, String, Seq[TrafficConsumed]](
blockRateLimitManager.trafficConsumedStore
.lookupLatestBeforeInclusive(blockEphemeralState.latestBlock.lastTs)
)
} yield blockEphemeralState
.toSequencerSnapshot(protocolVersion, trafficPurchased, trafficConsumed)
.tap(snapshot =>
if (logger.underlying.isDebugEnabled()) {
logger.trace(
s"Snapshot for timestamp $timestamp generated from ephemeral state:\n$blockEphemeralState"
)
}
)
},
)
.flatMap(blockEphemeralState => {
for {
// Look up traffic info at the latest timestamp from the block,
// because that's where the onboarded sequencer will start reading
trafficPurchased <- EitherT.liftF[Future, SequencerError, Seq[TrafficPurchased]](
trafficPurchasedStore
.lookupLatestBeforeInclusive(blockEphemeralState.latestBlock.lastTs)
)
trafficConsumed <- EitherT.liftF[Future, SequencerError, Seq[TrafficConsumed]](
blockRateLimitManager.trafficConsumedStore
.lookupLatestBeforeInclusive(blockEphemeralState.latestBlock.lastTs)
)
} yield blockEphemeralState
.toSequencerSnapshot(protocolVersion, trafficPurchased, trafficConsumed)
.tap(snapshot =>
if (logger.underlying.isDebugEnabled()) {
logger.trace(
s"Snapshot for timestamp $timestamp generated from ephemeral state:\n$blockEphemeralState"
)
}
)
})
finalSnapshot <- {
if (unifiedSequencer) {
super.snapshot(bsSnapshot.lastTs).map { dbsSnapshot =>
@ -499,7 +496,7 @@ class BlockSequencer(
)(dbsSnapshot.representativeProtocolVersion)
}
} else {
EitherT.pure[Future, String](bsSnapshot)
EitherT.pure[Future, SequencerError](bsSnapshot)
}
}
} yield {

View File

@ -21,6 +21,7 @@ import com.digitalasset.canton.util.LoggerUtil
import scala.concurrent.duration.Duration
sealed trait SequencerError extends BaseCantonError
object SequencerError extends SequencerErrorGroup {
@Explanation("""
@ -218,5 +219,44 @@ object SequencerError extends SequencerErrorGroup {
s"The payload to event time bound [$bound] has been been exceeded by payload time [$payloadTs] and sequenced event time [$sequencedTs]: $messageId"
)
}
@Explanation(
"""This error indicates that no sequencer snapshot can be found for the given timestamp."""
)
@Resolution(
"""Verify that the timestamp is correct and that the sequencer is healthy."""
)
object SnapshotNotFound
extends ErrorCode(
"SNAPSHOT_NOT_FOUND",
ErrorCategory.InvalidGivenCurrentSystemStateOther,
) {
final case class Error(requestTimestamp: CantonTimestamp, safeWatermark: CantonTimestamp)
extends BaseCantonError.Impl(
cause =
s"Requested snapshot at $requestTimestamp is after the safe watermark $safeWatermark"
)
with SequencerError
final case class MissingSafeWatermark(id: Member)
extends BaseCantonError.Impl(
cause = s"No safe watermark found for the sequencer $id"
)
with SequencerError
}
@Explanation("""This error indicates that no block can be found for the given timestamp.""")
@Resolution(
"""Verify that the timestamp is correct and that the sequencer is healthy."""
)
object BlockNotFound
extends ErrorCode(
"BLOCK_NOT_FOUND",
ErrorCategory.InvalidGivenCurrentSystemStateOther,
) {
final case class InvalidTimestamp(timestamp: CantonTimestamp)
extends BaseCantonError.Impl(
cause = s"Invalid timestamp $timestamp"
)
with SequencerError
}
}

View File

@ -12,7 +12,7 @@ import com.digitalasset.canton.ProtoDeserializationError.FieldNotSet
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.TimestampSelector
import com.digitalasset.canton.domain.sequencing.sequencer.{OnboardingStateForSequencer, Sequencer}
import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.error.{BaseCantonError, CantonError}
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.grpc.CantonGrpcUtil
@ -21,6 +21,7 @@ import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.sequencer.admin.v30
import com.digitalasset.canton.sequencer.admin.v30.OnboardingStateRequest.Request
import com.digitalasset.canton.sequencer.admin.v30.{
OnboardingStateResponse,
SetTrafficPurchasedRequest,
SetTrafficPurchasedResponse,
}
@ -31,11 +32,18 @@ import com.digitalasset.canton.topology.client.DomainTopologyClient
import com.digitalasset.canton.topology.processing.{EffectiveTime, SequencedTime}
import com.digitalasset.canton.topology.store.TopologyStore
import com.digitalasset.canton.topology.store.TopologyStoreId.DomainStore
import com.digitalasset.canton.topology.{Member, SequencerId}
import com.digitalasset.canton.topology.{
Member,
SequencerId,
TopologyManagerError,
UniqueIdentifier,
}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.digitalasset.canton.util.EitherTUtil
import com.digitalasset.canton.util.{EitherTUtil, GrpcStreamingUtils}
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusRuntimeException}
import java.io.OutputStream
import scala.concurrent.{ExecutionContext, Future}
class GrpcSequencerAdministrationService(
@ -112,18 +120,16 @@ class GrpcSequencerAdministrationService(
override def snapshot(request: v30.SnapshotRequest): Future[v30.SnapshotResponse] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
(for {
timestamp <- EitherT
.fromEither[Future](
ProtoConverter
.parseRequired(CantonTimestamp.fromProtoTimestamp, "timestamp", request.timestamp)
)
.leftMap(_.toString)
result <- sequencer.snapshot(timestamp)
timestamp <- wrapErr(
ProtoConverter
.parseRequired(CantonTimestamp.fromProtoTimestamp, "timestamp", request.timestamp)
)
result <- sequencer.snapshot(timestamp).leftWiden[BaseCantonError]
} yield result)
.fold[v30.SnapshotResponse](
error =>
v30.SnapshotResponse(
v30.SnapshotResponse.Value.Failure(v30.SnapshotResponse.Failure(error))
v30.SnapshotResponse.Value.Failure(v30.SnapshotResponse.Failure(error.cause))
),
result =>
v30.SnapshotResponse(
@ -135,21 +141,33 @@ class GrpcSequencerAdministrationService(
}
override def onboardingState(
request: v30.OnboardingStateRequest
): Future[v30.OnboardingStateResponse] = {
request: v30.OnboardingStateRequest,
responseObserver: StreamObserver[OnboardingStateResponse],
): Unit =
GrpcStreamingUtils.streamToClient(
(out: OutputStream) => onboardingState(request, out),
responseObserver,
byteString => OnboardingStateResponse(byteString),
)
private def onboardingState(
request: v30.OnboardingStateRequest,
out: OutputStream,
): Future[Unit] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
val parseMemberOrTimestamp = request.request match {
case Request.Empty => Left(FieldNotSet("sequencer_id"): ProtoDeserializationError)
case Request.SequencerId(sequencerId) =>
SequencerId
.fromProtoPrimitive(sequencerId, "sequencer_id")
case Request.SequencerUid(sequencerUid) =>
UniqueIdentifier
.fromProtoPrimitive(sequencerUid, "sequencer_id")
.map(SequencerId(_))
.map(Left(_))
case Request.Timestamp(referenceEffectiveTime) =>
CantonTimestamp.fromProtoTimestamp(referenceEffectiveTime).map(Right(_))
}
(for {
memberOrTimestamp <- EitherT.fromEither[Future](parseMemberOrTimestamp).leftMap(_.toString)
val res = for {
memberOrTimestamp <- wrapErr(parseMemberOrTimestamp)
referenceEffective <- memberOrTimestamp match {
case Left(sequencerId) =>
EitherT(
@ -158,17 +176,20 @@ class GrpcSequencerAdministrationService(
.map(txOpt =>
txOpt
.map(stored => stored.validFrom)
.toRight(s"Did not find onboarding topology transaction for $sequencerId")
.toRight(
TopologyManagerError.InternalError
.Other(s"Did not find onboarding topology transaction for $sequencerId")
)
)
)
case Right(timestamp) =>
EitherT.rightT[Future, String](EffectiveTime(timestamp))
EitherT.rightT[Future, BaseCantonError](EffectiveTime(timestamp))
}
_ <- domainTimeTracker
.awaitTick(referenceEffective.value)
.map(EitherT.right[String](_).void)
.getOrElse(EitherTUtil.unit[String])
.map(EitherT.right[CantonError](_).void)
.getOrElse(EitherTUtil.unit[BaseCantonError])
/* find the sequencer snapshot that contains a sequenced timestamp that is >= to the reference/onboarding effective time
if we take the sequencing time here, we might miss out topology transactions between sequencerSnapshot.lastTs and effectiveTime
@ -189,32 +210,17 @@ class GrpcSequencerAdministrationService(
sequencerSnapshot <- sequencer.snapshot(referenceEffective.value)
topologySnapshot <- EitherT.right[String](
topologySnapshot <- EitherT.right[BaseCantonError](
topologyStore.findEssentialStateAtSequencedTime(SequencedTime(sequencerSnapshot.lastTs))
)
} yield (topologySnapshot, sequencerSnapshot))
.fold[v30.OnboardingStateResponse](
error =>
v30.OnboardingStateResponse(
v30.OnboardingStateResponse.Value.Failure(
v30.OnboardingStateResponse.Failure(error)
)
),
{ case (topologySnapshot, sequencerSnapshot) =>
v30.OnboardingStateResponse(
v30.OnboardingStateResponse.Value.Success(
v30.OnboardingStateResponse.Success(
OnboardingStateForSequencer(
topologySnapshot,
staticDomainParameters,
sequencerSnapshot,
staticDomainParameters.protocolVersion,
).toByteString
)
)
)
},
)
} yield OnboardingStateForSequencer(
topologySnapshot,
staticDomainParameters,
sequencerSnapshot,
staticDomainParameters.protocolVersion,
).toByteString.writeTo(out)
mapErrNew(res)
}
override def disableMember(
@ -248,9 +254,12 @@ class GrpcSequencerAdministrationService(
val result = {
for {
member <- wrapErrUS(Member.fromProtoPrimitive(requestP.member, "member"))
serial <- wrapErrUS(ProtoConverter.parsePositiveInt(requestP.serial))
serial <- wrapErrUS(ProtoConverter.parsePositiveInt("serial", requestP.serial))
totalTrafficPurchased <- wrapErrUS(
ProtoConverter.parseNonNegativeLong(requestP.totalTrafficPurchased)
ProtoConverter.parseNonNegativeLong(
"total_traffic_purchased",
requestP.totalTrafficPurchased,
)
)
highestMaxSequencingTimestamp <- sequencer
.setTrafficPurchased(member, serial, totalTrafficPurchased, sequencerClient)

View File

@ -53,7 +53,7 @@ class GrpcSequencerConnectService(
Future.successful(
GetDomainIdResponse(
domainId = domainId.toProtoPrimitive,
sequencerId = sequencerId.toProtoPrimitive,
sequencerUid = sequencerId.uid.toProtoPrimitive,
)
)

View File

@ -16,7 +16,7 @@ import com.digitalasset.canton.error.CantonError
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.grpc.CantonGrpcUtil.*
import com.digitalasset.canton.protocol.StaticDomainParameters
import com.digitalasset.canton.protocol.{StaticDomainParameters, v30}
import com.digitalasset.canton.sequencer.admin.v30.SequencerInitializationServiceGrpc.SequencerInitializationService
import com.digitalasset.canton.sequencer.admin.v30.{
InitializeSequencerFromGenesisStateRequest,
@ -36,6 +36,9 @@ import com.digitalasset.canton.topology.transaction.{
TopologyMapping,
}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.digitalasset.canton.util.GrpcStreamingUtils
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import scala.concurrent.{ExecutionContext, Future}
@ -48,14 +51,27 @@ class GrpcSequencerInitializationService(
with NamedLogging {
override def initializeSequencerFromGenesisState(
request: InitializeSequencerFromGenesisStateRequest
responseObserver: StreamObserver[InitializeSequencerFromGenesisStateResponse]
): StreamObserver[InitializeSequencerFromGenesisStateRequest] = {
GrpcStreamingUtils.streamFromClient(
_.topologySnapshot,
_.domainParameters,
(topologySnapshot: ByteString, domainParams: Option[v30.StaticDomainParameters]) =>
initializeSequencerFromGenesisState(topologySnapshot, domainParams),
responseObserver,
)
}
private def initializeSequencerFromGenesisState(
topologySnapshot: ByteString,
domainParameters: Option[v30.StaticDomainParameters],
): Future[InitializeSequencerFromGenesisStateResponse] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
val res: EitherT[Future, CantonError, InitializeSequencerFromGenesisStateResponse] = for {
topologyState <- EitherT.fromEither[Future](
StoredTopologyTransactions
.fromTrustedByteString(request.topologySnapshot)
.fromTrustedByteString(topologySnapshot)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
@ -64,7 +80,7 @@ class GrpcSequencerInitializationService(
.parseRequired(
StaticDomainParameters.fromProtoV30,
"domain_parameters",
request.domainParameters,
domainParameters,
)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
@ -98,8 +114,18 @@ class GrpcSequencerInitializationService(
}
override def initializeSequencerFromOnboardingState(
request: InitializeSequencerFromOnboardingStateRequest
): Future[InitializeSequencerFromOnboardingStateResponse] = {
responseObserver: StreamObserver[InitializeSequencerFromOnboardingStateResponse]
): StreamObserver[InitializeSequencerFromOnboardingStateRequest] = {
GrpcStreamingUtils.streamFromClient(
_.onboardingState,
_ => (),
(onboardingState: ByteString, _: Unit) =>
initializeSequencerFromOnboardingState(onboardingState),
responseObserver,
)
}
private def initializeSequencerFromOnboardingState(onboardingState: ByteString) = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
val res: EitherT[Future, CantonError, InitializeSequencerFromOnboardingStateResponse] = for {
onboardingState <- EitherT.fromEither[Future](
@ -108,7 +134,7 @@ class GrpcSequencerInitializationService(
// the caller of this endpoint could get the onboarding state from various sequencers
// and compare them for byte-for-byte equality, to increase the confidence that this
// is safe to deserialize
.fromTrustedByteString(request.onboardingState)
.fromTrustedByteString(onboardingState)
.leftMap(ProtoDeserializationFailure.Wrap(_))
)
initializeRequest = InitializeSequencerRequest(
@ -127,7 +153,6 @@ class GrpcSequencerInitializationService(
} yield InitializeSequencerFromOnboardingStateResponse(result.replicated)
mapErrNew(res)
}
}
object GrpcSequencerInitializationService {

View File

@ -9,7 +9,10 @@ import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt
import com.digitalasset.canton.crypto.{HashPurpose, Signature}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.RegisterError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscriptionError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.{
CreateSubscriptionError,
SequencerError,
}
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.TimestampSelector.TimestampSelector
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.{
SequencerRateLimitError,
@ -139,7 +142,7 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
override def pruningScheduler: Option[PruningScheduler] = ???
override def snapshot(timestamp: CantonTimestamp)(implicit
traceContext: TraceContext
): EitherT[Future, String, SequencerSnapshot] =
): EitherT[Future, SequencerError, SequencerSnapshot] =
???
override protected val localSequencerMember: Member = sequencerId
override protected def disableMemberInternal(member: Member)(implicit

View File

@ -105,7 +105,7 @@ class DatabaseSequencerSnapshottingTest extends SequencerApiTest {
error <- sequencer
.snapshot(CantonTimestamp.MaxValue)
.leftOrFail("snapshotting after the watermark is expected to fail")
_ <- error should include(" is after the safe watermark")
_ <- error.cause should include(" is after the safe watermark")
// Note: below we use the timestamp that is currently the safe watermark in the sequencer
snapshot <- valueOrFail(sequencer.snapshot(CantonTimestamp.Epoch.immediateSuccessor))(

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --enable-interfaces=yes
name: carbonv1-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --enable-interfaces=yes
name: carbonv2-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
name: experimental-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --enable-interfaces=yes
name: model-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
name: package-management-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
build-options:
- --enable-interfaces=yes
name: semantic-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240701.13159.0.v6e9c240f
sdk-version: 3.1.0-snapshot.20240705.13166.0.v801ce9b3
name: upgrade-tests
source: .
version: 1.0.0

Some files were not shown because too many files have changed in this diff Show More