update canton to 20240604.13418.v5318c201 (#19327)

tell-slack: canton

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
This commit is contained in:
azure-pipelines[bot] 2024-06-04 14:39:55 +02:00 committed by GitHub
parent 18e4e155fb
commit 79929ac266
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
80 changed files with 687 additions and 1630 deletions

View File

@ -4,14 +4,7 @@
package com.digitalasset.canton.console
import com.digitalasset.canton.admin.api.client.commands.EnterpriseSequencerAdminCommands.LocatePruningTimestampCommand
import com.digitalasset.canton.admin.api.client.commands.{
EnterpriseSequencerAdminCommands,
EnterpriseSequencerBftAdminCommands,
GrpcAdminCommand,
PruningSchedulerCommands,
SequencerAdminCommands,
SequencerPublicCommands,
}
import com.digitalasset.canton.admin.api.client.commands.*
import com.digitalasset.canton.admin.api.client.data.StaticDomainParameters as ConsoleStaticDomainParameters
import com.digitalasset.canton.config.RequireTypes.{ExistingFile, NonNegativeInt, Port, PositiveInt}
import com.digitalasset.canton.config.*
@ -51,7 +44,6 @@ import com.digitalasset.canton.participant.{ParticipantNode, ParticipantNodeBoot
import com.digitalasset.canton.sequencer.admin.v30.SequencerPruningAdministrationServiceGrpc
import com.digitalasset.canton.sequencer.admin.v30.SequencerPruningAdministrationServiceGrpc.SequencerPruningAdministrationServiceStub
import com.digitalasset.canton.sequencing.{GrpcSequencerConnection, SequencerConnections}
import com.digitalasset.canton.time.EnrichedDurations.*
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.store.TimeQuery
import com.digitalasset.canton.tracing.NoTracing
@ -1053,16 +1045,9 @@ abstract class SequencerReference(
|The command will fail if a client has not yet read and acknowledged some data up to the specified time."""
)
def prune_at(timestamp: CantonTimestamp): String = {
val status = this.status()
val unauthenticatedMembers =
status.unauthenticatedMembersToDisable(
this.consoleEnvironment.environment.config.parameters.retentionPeriodDefaults.unauthenticatedMembers.toInternal
)
unauthenticatedMembers.foreach(disable_member)
val msg = this.consoleEnvironment.run {
this.consoleEnvironment.run {
runner.adminCommand(EnterpriseSequencerAdminCommands.Prune(timestamp))
}
s"$msg. Automatically disabled ${unauthenticatedMembers.size} unauthenticated member clients."
}
@Help.Summary(
@ -1078,10 +1063,7 @@ abstract class SequencerReference(
if (dryRun) {
formatDisableDryRun(timestamp, clientsToDisable)
} else {
val authenticatedClientsToDisable = clientsToDisable.members.toSeq.filter(_.isAuthenticated)
// There's no need to explicitly disable unauthenticated members
// prune will take care of that implicitly
authenticatedClientsToDisable.foreach(disable_member)
clientsToDisable.members.toSeq.foreach(disable_member)
// check we can now prune for the provided timestamp
val statusAfterDisabling = status()
@ -1094,7 +1076,7 @@ abstract class SequencerReference(
val pruneMsg = prune_at(timestamp)
if (clientsToDisable.members.nonEmpty) {
s"$pruneMsg\nDisabled the following authenticated members:${authenticatedClientsToDisable.map(_.toString).sorted.mkString("\n - ", "\n - ", "\n")}"
s"$pruneMsg\nDisabled the following members:${clientsToDisable.members.toSeq.map(_.toString).sorted.mkString("\n - ", "\n - ", "\n")}"
} else {
pruneMsg
}

View File

@ -69,13 +69,6 @@ service SequencerService {
// still accept the request internally and therefore emit events later on.
rpc SendAsyncVersioned(SendAsyncVersionedRequest) returns (SendAsyncVersionedResponse);
// Submit an unauthenticated request to the sequencer.
// The behavior is as for SendAsyncVersioned, except that the sender is not authenticated.
// (Further details, e.g. about allowed recipients, can be found in the implementation.)
//
// This method will be discontinued soon.
rpc SendAsyncUnauthenticatedVersioned(SendAsyncUnauthenticatedVersionedRequest) returns (SendAsyncUnauthenticatedVersionedResponse);
// Establishes a stream with the server to receive sequenced events from the domain after the given
// counter. The delivered events will have a sequential counter and monotonically increasing timestamp.
//
@ -84,11 +77,6 @@ service SequencerService {
// event.topology_timestamp refers to a time before the sequencer has been onboarded.
rpc SubscribeVersioned(SubscriptionRequest) returns (stream VersionedSubscriptionResponse);
// Same as SubscribeVersioned except that it lacks authentication for the subscriber.
//
// This method will be discontinued soon.
rpc SubscribeUnauthenticatedVersioned(SubscriptionRequest) returns (stream VersionedSubscriptionResponse);
// Allows a member to acknowledge that they have read all events up to and including the provided timestamp,
// and that they will never re-read these events again. This information is currently only used for informational
// purposes and to provide a watermark for which it is safe to prune earlier events from the sequencer data stores.
@ -108,41 +96,6 @@ message SendAsyncVersionedRequest {
bytes signed_submission_request = 1;
}
message SendAsyncUnauthenticatedVersionedRequest {
// Contains a versioned SubmissionRequest
bytes submission_request = 1;
}
message SendAsyncUnauthenticatedVersionedResponse {
Error error = 1; // Defined iff the response is an error.
message Error {
oneof reason {
// The sequencer couldn't read the request (typically indicates a serialization and/or versioning bug).
string request_invalid = 1;
// The sequencer could read the request but refused to handle it (the request may violate a max size constraint).
string request_refused = 2;
// The sequencer is overloaded and does not have capacity to handle this request.
string overloaded = 3;
// The specified sender is not registered so the sequencer cannot guarantee publishing a Deliver event if the request can be sequenced.
string sender_unknown = 4;
// The sequencer is shutting down so is declining to process new requests
string shutting_down = 5;
// The sequencer is unavailable and can't currently process requests
string unavailable = 6;
// There are one or more recipients that are not registered so the sequencer cannot guarantee publishing a Deliver event if the request can be sequenced.
// This message was added in protocol version 1.1, therefore it must not be used by a sequencer operating on Canton 1.0 protocol version.
string unknown_recipients = 7;
}
}
}
// Changes compared to SendAsyncResponse: added `Internal` and `Generic`. Note: `Generic` is not used yet, it is introduced for upgradability purposes.
message SendAsyncVersionedResponse {
Error error = 1; // Defined iff the response is an error.

View File

@ -23,7 +23,7 @@ object FatalError {
case None => logger.error(message)
}
sys.exit(1)
sys.exit(117)
}
def exitOnFatalError(error: CantonError, logger: TracedLogger)(implicit

View File

@ -0,0 +1,147 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.sequencing
import cats.data.EitherT
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.lifecycle.UnlessShutdown.{AbortedDueToShutdown, Outcome}
import com.digitalasset.canton.lifecycle.{
FutureUnlessShutdown,
PromiseUnlessShutdown,
UnlessShutdown,
}
import com.digitalasset.canton.logging.{ErrorLoggingContext, TracedLogger}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.ThereafterOps
import com.digitalasset.canton.util.{ErrorUtil, FutureUtil}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, blocking}
import scala.util.{Failure, Success, Try}
/** Utility class to make BFT-style operations.
*/
object BftSender {
/** Returned when the request fails to reach the required threshold
* @param successes The operators that successfully performed the request, grouped by hash of the result
* @param failures The operators that failed to perform the request
* @tparam K type of the value hash
* @tparam I type of the identity of operators
* @tparam E error type of the operation performed
*/
final case class FailedToReachThreshold[K, I, E](
successes: Map[K, Set[I]],
failures: Map[I, Either[Throwable, E]],
)
/** Make a request to multiple operators and aggregate the responses such as the final result will be successful
* only if "threshold" responses were identical.
* As soon as the threshold is reached, this method returns. It will also return with an error as soon as it is guaranteed that
* it cannot possibly gather sufficiently identical requests to meet the threshold.
* @param description description of the request
* @param threshold minimum value of identical results that need to be received for the request to be successful (inclusive)
* @param operators operators to use for the request. The request will be performed via every operator.
* @param performRequest request to be performed.
* @param resultHashKey function to provide a hash from a result. This is what determine whether 2 responses are identical.
* @tparam I key of the operator, typically and ID
* @tparam E Error type of performRequest
* @tparam O operator type: object with which the performRequest function will be called
* @tparam A type of the result
* @tparam K type of the result hash
* @return The result of performRequest if sufficiently many responses were identical from the operators.
*/
def makeRequest[I, E, O, A, K](
description: String,
futureSupervisor: FutureSupervisor,
logger: TracedLogger,
operators: Map[I, O],
threshold: PositiveInt,
performRequest: O => EitherT[FutureUnlessShutdown, E, A],
resultHashKey: A => K,
)(implicit
traceContext: TraceContext,
executionContext: ExecutionContext,
): EitherT[FutureUnlessShutdown, FailedToReachThreshold[K, I, E], A] = {
implicit val elc: ErrorLoggingContext = ErrorLoggingContext.fromTracedLogger(logger)
// Keeps track of successful responses in a hashmap so we can easily count how many identical ones we get
val successfulResults = TrieMap.empty[K, Set[I]]
// Separately keep track of the total number of responses received for fast comparison
val responsesCount = new AtomicInteger(0)
// We don't technically need the failures, but keep them around so we can log and return them if we never reach the threshold
val failedResults = TrieMap.empty[I, Either[Throwable, E]]
// Promise that provide the result for this method
val promise = new PromiseUnlessShutdown[Either[FailedToReachThreshold[K, I, E], A]](
description,
futureSupervisor,
)
// Provides an object on which to synchronize to avoid concurrency issues when checking results
val lock = new Object
def addResult(operatorId: I, result: Try[UnlessShutdown[Either[E, A]]]): Unit = blocking {
lock.synchronized {
if (promise.isCompleted) {
logger.debug(
s"Ignoring response $result from $operatorId for $description since the threshold has already been reached or the request has failed."
)
} else {
val responsesReceived = responsesCount.incrementAndGet()
// If there's not enough missing responses left to get to the threshold, we can fail immediately
def checkIfStillPossibleToReachThreshold(): Unit = {
val missingResponses = operators.size - responsesReceived
val bestChanceOfReachingThreshold =
successfulResults.values.map(_.size).foldLeft(0)(_.max(_))
if (bestChanceOfReachingThreshold + missingResponses < threshold.value) {
logger.info(
s"Cannot reach threshold for $description. Threshold = ${threshold.value}, failed results: $failedResults, successful results: $successfulResults"
)
promise.outcome(
Left(FailedToReachThreshold[K, I, E](successfulResults.toMap, failedResults.toMap))
)
}
}
// Checks that the operator has not provided a result yet (successful or not)
ErrorUtil.requireState(
!(successfulResults.values.toList.flatten ++ failedResults.keySet).contains(operatorId),
s"Operator $operatorId has already provided a result. Please report this as a bug",
)
result match {
case Success(Outcome(Right(value))) =>
val updated = successfulResults.updateWith(resultHashKey(value)) {
case Some(operators) => Some(operators ++ Set(operatorId))
case None => Some(Set(operatorId))
}
// If we've reached the threshold we can stop
if (updated.map(_.size).getOrElse(0) >= threshold.value) promise.outcome(Right(value))
case Success(Outcome(Left(error))) =>
failedResults.put(operatorId, Right(error)).discard
case Failure(ex) =>
failedResults.put(operatorId, Left(ex)).discard
case Success(AbortedDueToShutdown) =>
promise.shutdown()
}
if (!promise.isCompleted) checkIfStillPossibleToReachThreshold()
}
}
}
operators.foreach { case (operatorId, operator) =>
FutureUtil.doNotAwaitUnlessShutdown(
performRequest(operator).value
.thereafter(addResult(operatorId, _))
.map(_ => ()),
s"$description failed for $operatorId",
)
}
EitherT(promise.futureUS)
}
}

View File

@ -44,7 +44,6 @@ object MemberAuthentication extends MemberAuthentication {
def apply(member: Member): Either[AuthenticationError, MemberAuthentication] = member match {
case _: ParticipantId | _: MediatorId => Right(this)
case _: SequencerId => Left(AuthenticationNotSupportedForMember(member))
case _: UnauthenticatedMemberId => Left(AuthenticationNotSupportedForMember(member))
}
sealed abstract class AuthenticationError(val reason: String, val code: String)

View File

@ -15,7 +15,7 @@ import com.digitalasset.canton.sequencing.authentication.{
AuthenticationTokenManagerConfig,
}
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.{AuthenticatedMember, DomainId, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.{DomainId, Member}
import com.digitalasset.canton.tracing.TraceContext
import com.google.common.annotations.VisibleForTesting
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall
@ -33,7 +33,7 @@ import scala.util.control.NonFatal
*/
private[grpc] class SequencerClientTokenAuthentication(
domainId: DomainId,
member: AuthenticatedMember,
member: Member,
tokenManagerPerEndpoint: NonEmpty[Map[Endpoint, AuthenticationTokenManager]],
protected val loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext)
@ -160,7 +160,7 @@ private[grpc] class SequencerClientTokenAuthentication(
object SequencerClientTokenAuthentication {
def apply(
domainId: DomainId,
authenticatedMember: AuthenticatedMember,
member: Member,
obtainTokenPerEndpoint: NonEmpty[
Map[
Endpoint,
@ -186,7 +186,7 @@ object SequencerClientTokenAuthentication {
}
new SequencerClientTokenAuthentication(
domainId,
authenticatedMember,
member,
tokenManagerPerEndpoint,
loggerFactory,
)
@ -194,32 +194,6 @@ object SequencerClientTokenAuthentication {
}
class SequencerClientNoAuthentication(domainId: DomainId, member: UnauthenticatedMemberId)
extends SequencerClientAuthentication {
private val metadata: Metadata = {
val metadata = new Metadata()
metadata.put(Constant.MEMBER_ID_METADATA_KEY, member.toProtoPrimitive)
metadata.put(Constant.DOMAIN_ID_METADATA_KEY, domainId.toProtoPrimitive)
metadata
}
override def apply[S <: AbstractStub[S]](client: S): S =
client.withCallCredentials(callCredentials)
@VisibleForTesting
private[grpc] val callCredentials: CallCredentials = new CallCredentials {
override def applyRequestMetadata(
requestInfo: CallCredentials.RequestInfo,
appExecutor: Executor,
applier: CallCredentials.MetadataApplier,
): Unit = applier.apply(metadata)
override def thisUsesUnstableApi(): Unit = {
// yes, we know - cheers grpc
}
}
}
trait SequencerClientAuthentication {
/** Apply the sequencer authentication components to a grpc client stub */

View File

@ -291,7 +291,6 @@ object SequencerSubscriptionFactoryPekko {
def fromTransport[E](
sequencerID: SequencerId,
transport: SequencerClientTransportPekko.Aux[E],
requiresAuthentication: Boolean,
member: Member,
protocolVersion: ProtocolVersion,
): SequencerSubscriptionFactoryPekko[E] =
@ -302,8 +301,7 @@ object SequencerSubscriptionFactoryPekko {
traceContext: TraceContext
): SequencerSubscriptionPekko[E] = {
val request = SubscriptionRequest(member, startingCounter, protocolVersion)
if (requiresAuthentication) transport.subscribe(request)
else transport.subscribeUnauthenticated(request)
transport.subscribe(request)
}
override val retryPolicy: SubscriptionErrorRetryPolicyPekko[E] =

View File

@ -315,14 +315,13 @@ object ResilientSequencerSubscription extends SequencerSubscriptionErrorGroup {
warnDelay: FiniteDuration,
maxRetryDelay: FiniteDuration,
timeouts: ProcessingTimeout,
requiresAuthentication: Boolean,
loggerFactory: NamedLoggerFactory,
)(implicit executionContext: ExecutionContext): ResilientSequencerSubscription[E] = {
new ResilientSequencerSubscription[E](
sequencerId,
startingFrom,
handler,
createSubscription(member, getTransport, requiresAuthentication, protocolVersion),
createSubscription(member, getTransport, protocolVersion),
SubscriptionRetryDelayRule(
initialDelay,
warnDelay,
@ -337,7 +336,6 @@ object ResilientSequencerSubscription extends SequencerSubscriptionErrorGroup {
private def createSubscription[E](
member: Member,
getTransport: => UnlessShutdown[SequencerClientTransport],
requiresAuthentication: Boolean,
protocolVersion: ProtocolVersion,
): SequencerSubscriptionFactory[E] =
new SequencerSubscriptionFactory[E] {
@ -348,8 +346,7 @@ object ResilientSequencerSubscription extends SequencerSubscriptionErrorGroup {
getTransport
.map { transport =>
val subscription =
if (requiresAuthentication) transport.subscribe(request, handler)(traceContext)
else transport.subscribeUnauthenticated(request, handler)(traceContext)
transport.subscribe(request, handler)(traceContext)
(subscription, transport.subscriptionRetryPolicy)
}
}

View File

@ -50,6 +50,7 @@ import com.digitalasset.canton.util.ErrorUtil
import com.digitalasset.canton.util.PekkoUtil.WithKillSwitch
import com.digitalasset.canton.util.PekkoUtil.syntax.*
import com.digitalasset.canton.version.ProtocolVersion
import com.google.common.annotations.VisibleForTesting
import scala.concurrent.{ExecutionContext, Future}
@ -409,9 +410,7 @@ trait SequencedEventValidatorFactory {
* The [[com.digitalasset.canton.sequencing.client.SequencerSubscription]] requests this event again.
* @param unauthenticated Whether the subscription is unauthenticated
*/
def create(
unauthenticated: Boolean
)(implicit loggingContext: NamedLoggingContext): SequencedEventValidator
def create()(implicit loggingContext: NamedLoggingContext): SequencedEventValidator
}
object SequencedEventValidatorFactory {
@ -426,19 +425,13 @@ object SequencedEventValidatorFactory {
domainId: DomainId,
warn: Boolean = true,
): SequencedEventValidatorFactory = new SequencedEventValidatorFactory {
override def create(
unauthenticated: Boolean
)(implicit loggingContext: NamedLoggingContext): SequencedEventValidator =
override def create()(implicit loggingContext: NamedLoggingContext): SequencedEventValidator =
SequencedEventValidator.noValidation(domainId, warn)
}
}
/** Validate whether a received event is valid for processing.
*
* @param unauthenticated if true, then the connection is unauthenticated. in such cases, we have to skip some validations.
*/
/** Validate whether a received event is valid for processing. */
class SequencedEventValidatorImpl(
unauthenticated: Boolean,
domainId: DomainId,
protocolVersion: ProtocolVersion,
syncCryptoApi: SyncCryptoClient[SyncCryptoApi],
@ -576,21 +569,15 @@ class SequencedEventValidatorImpl(
Either.cond(receivedDomainId == domainId, (), BadDomainId(domainId, receivedDomainId))
}
private def verifySignature(
@VisibleForTesting
protected def verifySignature(
priorEventO: Option[PossiblyIgnoredSerializedEvent],
event: OrdinarySerializedEvent,
sequencerId: SequencerId,
protocolVersion: ProtocolVersion,
): EitherT[FutureUnlessShutdown, SequencedEventValidationError[Nothing], Unit] = {
implicit val traceContext: TraceContext = event.traceContext
if (unauthenticated) {
// TODO(i4933) once we have topology data on the sequencer api, we might fetch the domain keys
// and use the domain keys to validate anything here if we are unauthenticated
logger.debug(
s"Skipping sequenced event validation for counter ${event.counter} and timestamp ${event.timestamp} in unauthenticated subscription from $sequencerId"
)
EitherT.fromEither[FutureUnlessShutdown](checkNoTimestampOfSigningKey(event))
} else if (event.counter == SequencerCounter.Genesis) {
if (event.counter == SequencerCounter.Genesis) {
// TODO(#4933) This is a fresh subscription. Either fetch the domain keys via a future sequencer API and validate the signature
// or wait until the topology processor has processed the topology information in the first message and then validate the signature.
logger.info(

View File

@ -8,7 +8,6 @@ import cats.data.EitherT
import cats.implicits.catsSyntaxOptionId
import cats.syntax.alternative.*
import cats.syntax.either.*
import cats.syntax.foldable.*
import cats.syntax.functor.*
import cats.syntax.parallel.*
import com.daml.metrics.Timed
@ -101,31 +100,6 @@ trait SequencerClient extends SequencerClientSend with FlagCloseable {
*/
def trafficStateController: TrafficStateController
/** Sends a request to sequence a deliver event to the sequencer.
* This method merely dispatches to one of the other methods (`sendAsync` or `sendAsyncUnauthenticated`)
* depending if member is Authenticated or Unauthenticated.
*/
def sendAsyncUnauthenticatedOrNot(
batch: Batch[DefaultOpenEnvelope],
topologyTimestamp: Option[CantonTimestamp] = None,
maxSequencingTime: CantonTimestamp = generateMaxSequencingTime,
messageId: MessageId = generateMessageId,
aggregationRule: Option[AggregationRule] = None,
callback: SendCallback = SendCallback.empty,
amplify: Boolean = false,
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit]
/** Does the same as [[sendAsync]], except that this method is supposed to be used
* only by unauthenticated members for very specific operations that do not require authentication
* such as requesting that a participant's topology data gets accepted by the topology manager
*/
def sendAsyncUnauthenticated(
batch: Batch[DefaultOpenEnvelope],
maxSequencingTime: CantonTimestamp = generateMaxSequencingTime,
messageId: MessageId = generateMessageId,
callback: SendCallback = SendCallback.empty,
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit]
/** Create a subscription for sequenced events for this member,
* starting after the prehead in the `sequencerCounterTrackerStore`.
*
@ -170,15 +144,6 @@ trait SequencerClient extends SequencerClientSend with FlagCloseable {
fetchCleanTimestamp: PeriodicAcknowledgements.FetchCleanTimestamp,
)(implicit traceContext: TraceContext): Future[Unit]
/** Does the same as [[subscribeAfter]], except that this method is supposed to be used
* only by unauthenticated members
*/
def subscribeAfterUnauthenticated(
priorTimestamp: CantonTimestamp,
eventHandler: PossiblyIgnoredApplicationHandler[ClosedEnvelope],
timeTracker: DomainTimeTracker,
)(implicit traceContext: TraceContext): Future[Unit]
/** Acknowledge that we have successfully processed all events up to and including the given timestamp.
* The client should then never subscribe for events from before this point.
*/
@ -250,37 +215,6 @@ abstract class SequencerClientImpl(
private lazy val printer =
new CantonPrettyPrinter(loggingConfig.api.maxStringLength, loggingConfig.api.maxMessageLines)
override def sendAsyncUnauthenticatedOrNot(
batch: Batch[DefaultOpenEnvelope],
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
amplify: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] = {
member match {
case _: AuthenticatedMember =>
sendAsync(
batch = batch,
topologyTimestamp = topologyTimestamp,
maxSequencingTime = maxSequencingTime,
messageId = messageId,
aggregationRule = aggregationRule,
callback = callback,
)
case _: UnauthenticatedMemberId =>
sendAsyncUnauthenticated(
batch = batch,
maxSequencingTime = maxSequencingTime,
messageId = messageId,
callback = callback,
)
}
}
override def sendAsync(
batch: Batch[DefaultOpenEnvelope],
topologyTimestamp: Option[CantonTimestamp],
@ -293,29 +227,9 @@ abstract class SequencerClientImpl(
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] =
for {
_ <- EitherT.cond[FutureUnlessShutdown](
member.isAuthenticated,
(),
SendAsyncClientError.RequestInvalid(
"Only authenticated members can use the authenticated send operation"
): SendAsyncClientError,
)
// TODO(#12950): Validate that group addresses map to at least one member
_ <- EitherT.cond[FutureUnlessShutdown](
topologyTimestamp.isEmpty || batch.envelopes.forall(
_.recipients.allRecipients.forall {
case MemberRecipient(m) => m.isAuthenticated
case _ => true
}
),
(),
SendAsyncClientError.RequestInvalid(
"Requests addressed to unauthenticated members must not specify a topology timestamp"
): SendAsyncClientError,
)
result <- sendAsyncInternal(
batch,
requiresAuthentication = true,
topologyTimestamp,
maxSequencingTime,
messageId,
@ -325,37 +239,6 @@ abstract class SequencerClientImpl(
)
} yield result
/** Does the same as [[sendAsync]], except that this method is supposed to be used
* only by unauthenticated members for very specific operations that do not require authentication
* such as requesting that a participant's topology data gets accepted by the topology manager
*/
override def sendAsyncUnauthenticated(
batch: Batch[DefaultOpenEnvelope],
maxSequencingTime: CantonTimestamp = generateMaxSequencingTime,
messageId: MessageId = generateMessageId,
callback: SendCallback = SendCallback.empty,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] =
if (member.isAuthenticated)
EitherT.leftT(
SendAsyncClientError.RequestInvalid(
"Only unauthenticated members can use the unauthenticated send operation"
)
)
else
sendAsyncInternal(
batch,
requiresAuthentication = false,
// Requests involving unauthenticated members must not specify a topology timestamp
topologyTimestamp = None,
maxSequencingTime = maxSequencingTime,
messageId = messageId,
aggregationRule = None,
callback = callback,
amplify = false,
)
private def checkRequestSize(
request: SubmissionRequest,
maxRequestSize: MaxRequestSize,
@ -375,7 +258,6 @@ abstract class SequencerClientImpl(
private def sendAsyncInternal(
batch: Batch[DefaultOpenEnvelope],
requiresAuthentication: Boolean,
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
@ -523,7 +405,6 @@ abstract class SequencerClientImpl(
_ <- performSend(
messageId,
request,
requiresAuthentication,
amplify,
() => peekAtSendResult(),
syncCryptoApi,
@ -537,7 +418,6 @@ abstract class SequencerClientImpl(
private def performSend(
messageId: MessageId,
request: SubmissionRequest,
requiresAuthentication: Boolean,
amplify: Boolean,
peekAtSendResult: () => Option[UnlessShutdown[SendResult]],
topologySnapshot: SyncCryptoApi,
@ -546,45 +426,41 @@ abstract class SequencerClientImpl(
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] = {
EitherTUtil
.timed(metrics.submissions.sends) {
val timeout = timeouts.network.duration
if (requiresAuthentication) {
val (sequencerId, transport, patienceO) =
sequencersTransportState.nextAmplifiedTransport(Seq.empty)
// Do not add an aggregation rule for amplifiable requests if amplification has not been configured
val amplifiableRequest =
if (amplify && request.aggregationRule.isEmpty && patienceO.isDefined) {
val aggregationRule =
AggregationRule(NonEmpty(Seq, member), PositiveInt.one, protocolVersion)
logger.debug(
s"Adding aggregation rule $aggregationRule to submission request with message ID $messageId"
)
request.copy(aggregationRule = aggregationRule.some)
} else request
for {
signedContent <- requestSigner
.signRequest(
amplifiableRequest,
HashPurpose.SubmissionRequestSignature,
Some(topologySnapshot),
)
.leftMap { err =>
val message = s"Error signing submission request $err"
logger.error(message)
SendAsyncClientError.RequestRefused(SendAsyncError.RequestRefused(message))
}
_ <- amplifiedSend(
signedContent,
sequencerId,
transport,
if (amplify) patienceO else None,
peekAtSendResult,
val (sequencerId, transport, patienceO) =
sequencersTransportState.nextAmplifiedTransport(Seq.empty)
// Do not add an aggregation rule for amplifiable requests if amplification has not been configured
val amplifiableRequest =
if (amplify && request.aggregationRule.isEmpty && patienceO.isDefined) {
val aggregationRule =
AggregationRule(NonEmpty(Seq, member), PositiveInt.one, protocolVersion)
logger.debug(
s"Adding aggregation rule $aggregationRule to submission request with message ID $messageId"
)
} yield ()
} else
sequencersTransportState.transport
.sendAsyncUnauthenticatedVersioned(request, timeout)
request.copy(aggregationRule = aggregationRule.some)
} else request
for {
signedContent <- requestSigner
.signRequest(
amplifiableRequest,
HashPurpose.SubmissionRequestSignature,
Some(topologySnapshot),
)
.leftMap { err =>
val message = s"Error signing submission request $err"
logger.error(message)
SendAsyncClientError.RequestRefused(SendAsyncError.RequestRefused(message))
}
_ <- amplifiedSend(
signedContent,
sequencerId,
transport,
if (amplify) patienceO else None,
peekAtSendResult,
)
} yield ()
}
.leftSemiflatMap { err =>
// increment appropriate error metrics
@ -796,27 +672,6 @@ abstract class SequencerClientImpl(
eventHandler,
timeTracker,
fetchCleanTimestamp,
requiresAuthentication = true,
)
/** Does the same as [[subscribeAfter]], except that this method is supposed to be used
* only by unauthenticated members
*
* The method does not verify the signature of the server.
*/
def subscribeAfterUnauthenticated(
priorTimestamp: CantonTimestamp,
eventHandler: PossiblyIgnoredApplicationHandler[ClosedEnvelope],
timeTracker: DomainTimeTracker,
)(implicit traceContext: TraceContext): Future[Unit] =
subscribeAfterInternal(
priorTimestamp,
// We do not track cleanliness for unauthenticated subscriptions
cleanPreheadTsO = None,
eventHandler,
timeTracker,
PeriodicAcknowledgements.noAcknowledgements,
requiresAuthentication = false,
)
protected def subscribeAfterInternal(
@ -825,7 +680,6 @@ abstract class SequencerClientImpl(
nonThrottledEventHandler: PossiblyIgnoredApplicationHandler[ClosedEnvelope],
timeTracker: DomainTimeTracker,
fetchCleanTimestamp: PeriodicAcknowledgements.FetchCleanTimestamp,
requiresAuthentication: Boolean,
)(implicit traceContext: TraceContext): Future[Unit]
/** Acknowledge that we have successfully processed all events up to and including the given timestamp.
@ -967,7 +821,6 @@ class RichSequencerClientImpl(
nonThrottledEventHandler: PossiblyIgnoredApplicationHandler[ClosedEnvelope],
timeTracker: DomainTimeTracker,
fetchCleanTimestamp: PeriodicAcknowledgements.FetchCleanTimestamp,
requiresAuthentication: Boolean,
)(implicit traceContext: TraceContext): Future[Unit] = {
val throttledEventHandler = ThrottlingApplicationEventHandler.throttle(
config.maximumInFlightEventBatches,
@ -1054,7 +907,6 @@ class RichSequencerClientImpl(
sequencerAlias,
sequencerTransport.sequencerId,
preSubscriptionEvent,
requiresAuthentication,
eventHandler,
).discard
}
@ -1062,21 +914,19 @@ class RichSequencerClientImpl(
// periodically acknowledge that we've successfully processed up to the clean counter
// We only need to it setup once; the sequencer client will direct the acknowledgements to the
// right transport.
if (requiresAuthentication) { // unauthenticated members don't need to ack
periodicAcknowledgementsRef.set(
PeriodicAcknowledgements
.create(
config.acknowledgementInterval.underlying,
deferredSubscriptionHealth.getState.isOk,
RichSequencerClientImpl.this,
fetchCleanTimestamp,
clock,
timeouts,
loggerFactory,
)
.some
)
}
periodicAcknowledgementsRef.set(
PeriodicAcknowledgements
.create(
config.acknowledgementInterval.underlying,
deferredSubscriptionHealth.getState.isOk,
RichSequencerClientImpl.this,
fetchCleanTimestamp,
clock,
timeouts,
loggerFactory,
)
.some
)
}
}
@ -1095,14 +945,13 @@ class RichSequencerClientImpl(
sequencerAlias: SequencerAlias,
sequencerId: SequencerId,
preSubscriptionEvent: Option[PossiblyIgnoredSerializedEvent],
requiresAuthentication: Boolean,
eventHandler: OrdinaryApplicationHandler[ClosedEnvelope],
)(implicit
traceContext: TraceContext
): ResilientSequencerSubscription[SequencerClientSubscriptionError] = {
// previously seen counter takes precedence over the lower bound
val nextCounter = preSubscriptionEvent.fold(initialCounterLowerBound)(_.counter)
val eventValidator = eventValidatorFactory.create(unauthenticated = !requiresAuthentication)
val eventValidator = eventValidatorFactory.create()
logger.info(
s"Starting subscription for alias=$sequencerAlias, id=$sequencerId at timestamp ${preSubscriptionEvent
.map(_.timestamp)}; next counter $nextCounter"
@ -1145,7 +994,6 @@ class RichSequencerClientImpl(
config.warnDisconnectDelay.underlying,
config.maxConnectionRetryDelay.underlying,
timeouts,
requiresAuthentication,
loggerFactory,
)
@ -1598,7 +1446,6 @@ class SequencerClientImplPekko[E: Pretty](
nonThrottledEventHandler: PossiblyIgnoredApplicationHandler[ClosedEnvelope],
timeTracker: DomainTimeTracker,
fetchCleanTimestamp: FetchCleanTimestamp,
requiresAuthentication: Boolean,
)(implicit traceContext: TraceContext): Future[Unit] = {
val throttledEventHandler = ThrottlingApplicationEventHandler.throttle(
config.maximumInFlightEventBatches,
@ -1671,7 +1518,7 @@ class SequencerClientImplPekko[E: Pretty](
}
logger.debug(subscriptionStartLogMessage)
val eventValidator = eventValidatorFactory.create(unauthenticated = !requiresAuthentication)
val eventValidator = eventValidatorFactory.create()
val aggregator = new SequencerAggregatorPekko(
domainId,
eventValidator,
@ -1707,7 +1554,6 @@ class SequencerClientImplPekko[E: Pretty](
SequencerSubscriptionFactoryPekko.fromTransport(
transportContainer.sequencerId,
transportContainer.clientTransport,
requiresAuthentication,
member,
protocolVersion,
)
@ -1827,21 +1673,19 @@ class SequencerClientImplPekko[E: Pretty](
// periodically acknowledge that we've successfully processed up to the clean counter
// We only need to it setup once; the sequencer client will direct the acknowledgements to the
// right transport.
if (requiresAuthentication) { // unauthenticated members don't need to ack
periodicAcknowledgementsRef.set(
PeriodicAcknowledgements
.create(
config.acknowledgementInterval.underlying,
health.getState.isOk,
this,
fetchCleanTimestamp,
clock,
timeouts,
loggerFactory,
)
.some
)
}
periodicAcknowledgementsRef.set(
PeriodicAcknowledgements
.create(
config.acknowledgementInterval.underlying,
health.getState.isOk,
this,
fetchCleanTimestamp,
clock,
timeouts,
loggerFactory,
)
.some
)
replayCompleted.futureUS
}

View File

@ -141,14 +141,13 @@ object SequencerClientFactory {
)
// pluggable send approach to support transitioning to the new async sends
validatorFactory = new SequencedEventValidatorFactory {
override def create(
unauthenticated: Boolean
)(implicit loggingContext: NamedLoggingContext): SequencedEventValidator =
override def create()(implicit
loggingContext: NamedLoggingContext
): SequencedEventValidator =
if (config.skipSequencedEventValidation) {
SequencedEventValidator.noValidation(domainId)
} else {
new SequencedEventValidatorImpl(
unauthenticated,
domainId,
domainParameters.protocolVersion,
syncCryptoApi,

View File

@ -11,21 +11,13 @@ import com.digitalasset.canton.lifecycle.Lifecycle.CloseableChannel
import com.digitalasset.canton.lifecycle.{FlagCloseable, Lifecycle}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.networking.Endpoint
import com.digitalasset.canton.sequencing.authentication.grpc.{
SequencerClientNoAuthentication,
SequencerClientTokenAuthentication,
}
import com.digitalasset.canton.sequencing.authentication.grpc.SequencerClientTokenAuthentication
import com.digitalasset.canton.sequencing.authentication.{
AuthenticationTokenManagerConfig,
AuthenticationTokenProvider,
}
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.{
AuthenticatedMember,
DomainId,
Member,
UnauthenticatedMemberId,
}
import com.digitalasset.canton.topology.{DomainId, Member}
import com.digitalasset.canton.tracing.{TraceContext, TraceContextGrpc}
import com.digitalasset.canton.version.ProtocolVersion
import io.grpc.ManagedChannel
@ -68,20 +60,15 @@ class GrpcSequencerClientAuth(
tokenProvider.generateToken(authenticationClient)
}
}
val clientAuthentication = member match {
case unauthenticatedMember: UnauthenticatedMemberId =>
new SequencerClientNoAuthentication(domainId, unauthenticatedMember)
case authenticatedMember: AuthenticatedMember =>
SequencerClientTokenAuthentication(
domainId,
authenticatedMember,
obtainTokenPerEndpoint,
tokenProvider.isClosing,
tokenManagerConfig,
clock,
loggerFactory,
)
}
val clientAuthentication = SequencerClientTokenAuthentication(
domainId,
member,
obtainTokenPerEndpoint,
tokenProvider.isClosing,
tokenManagerConfig,
clock,
loggerFactory,
)
clientAuthentication(client)
}

View File

@ -124,29 +124,16 @@ private[transports] abstract class GrpcSequencerClientTransportCommon(
"send-async-versioned",
request.content.messageId,
timeout,
SendAsyncUnauthenticatedVersionedResponse.fromSendAsyncVersionedResponseProto,
SendAsyncVersionedResponse.fromProtoV30,
)
}
override def sendAsyncUnauthenticatedVersioned(request: SubmissionRequest, timeout: Duration)(
implicit traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] = sendInternal(
stub =>
stub.sendAsyncUnauthenticatedVersioned(
v30.SendAsyncUnauthenticatedVersionedRequest(submissionRequest = request.toByteString)
),
"send-async-unauthenticated-versioned",
request.messageId,
timeout,
SendAsyncUnauthenticatedVersionedResponse.fromSendAsyncUnauthenticatedVersionedResponseProto,
)
private def sendInternal[Resp](
send: SequencerServiceStub => Future[Resp],
endpoint: String,
messageId: MessageId,
timeout: Duration,
fromResponseProto: Resp => ParsingResult[SendAsyncUnauthenticatedVersionedResponse],
fromResponseProto: Resp => ParsingResult[SendAsyncVersionedResponse],
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] = {
@ -171,7 +158,7 @@ private[transports] abstract class GrpcSequencerClientTransportCommon(
private def fromResponse[Proto](
p: Proto,
deserializer: Proto => ParsingResult[SendAsyncUnauthenticatedVersionedResponse],
deserializer: Proto => ParsingResult[SendAsyncVersionedResponse],
): Either[SendAsyncClientResponseError, Unit] = {
for {
response <- deserializer(p)
@ -333,13 +320,6 @@ class GrpcSequencerClientTransport(
override def subscribe[E](
subscriptionRequest: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] =
subscribeInternal(subscriptionRequest, handler, requiresAuthentication = true)
private def subscribeInternal[E](
subscriptionRequest: SubscriptionRequest,
handler: SerializedEventHandler[E],
requiresAuthentication: Boolean,
)(implicit traceContext: TraceContext): SequencerSubscription[E] = {
// we intentionally don't use `Context.current()` as we don't want to inherit the
// cancellation scope from upstream requests
@ -355,29 +335,16 @@ class GrpcSequencerClientTransport(
context.run(() =>
TraceContextGrpc.withGrpcContext(traceContext) {
if (requiresAuthentication) {
sequencerServiceClient.subscribeVersioned(
subscriptionRequest.toProtoV30,
subscription.observer,
)
} else {
sequencerServiceClient.subscribeUnauthenticatedVersioned(
subscriptionRequest.toProtoV30,
subscription.observer,
)
}
sequencerServiceClient.subscribeVersioned(
subscriptionRequest.toProtoV30,
subscription.observer,
)
}
)
subscription
}
override def subscribeUnauthenticated[E](
request: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] =
subscribeInternal(request, handler, requiresAuthentication = false)
override def subscriptionRetryPolicy: SubscriptionErrorRetryPolicy =
new GrpcSubscriptionErrorRetryPolicy(loggerFactory)
}

View File

@ -62,20 +62,9 @@ class GrpcSequencerClientTransportPekko(
override type SubscriptionError = GrpcSequencerSubscriptionError
override def subscribe(request: SubscriptionRequest)(implicit
override def subscribe(subscriptionRequest: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] =
subscribeInternal(request, requiresAuthentication = true)
override def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] =
subscribeInternal(request, requiresAuthentication = false)
private def subscribeInternal(
subscriptionRequest: SubscriptionRequest,
requiresAuthentication: Boolean,
)(implicit traceContext: TraceContext): SequencerSubscriptionPekko[SubscriptionError] = {
): SequencerSubscriptionPekko[SubscriptionError] = {
val subscriptionRequestP = subscriptionRequest.toProtoV30
@ -133,9 +122,7 @@ class GrpcSequencerClientTransportPekko(
)
}
val subscriber =
if (requiresAuthentication) sequencerServiceClient.subscribeVersioned _
else sequencerServiceClient.subscribeUnauthenticatedVersioned _
val subscriber = sequencerServiceClient.subscribeVersioned _
mkSubscription(subscriber)(SubscriptionResponse.fromVersionedProtoV30(protocolVersion)(_)(_))
}

View File

@ -32,13 +32,6 @@ trait SequencerClientTransportCommon extends FlagCloseable with SupportsHandshak
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit]
def sendAsyncUnauthenticatedVersioned(
request: SubmissionRequest,
timeout: Duration,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit]
/** Acknowledge that we have successfully processed all events up to and including the given timestamp.
* The client should then never subscribe for events from before this point.
*
@ -67,10 +60,6 @@ trait SequencerClientTransport extends SequencerClientTransportCommon {
traceContext: TraceContext
): SequencerSubscription[E]
def subscribeUnauthenticated[E](request: SubscriptionRequest, handler: SerializedEventHandler[E])(
implicit traceContext: TraceContext
): SequencerSubscription[E]
/** The transport can decide which errors will cause the sequencer client to not try to reestablish a subscription */
def subscriptionRetryPolicy: SubscriptionErrorRetryPolicy
}

View File

@ -22,10 +22,6 @@ trait SequencerClientTransportPekko extends SequencerClientTransportCommon {
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError]
def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError]
/** The transport can decide which errors will cause the sequencer client to not try to reestablish a subscription */
def subscriptionRetryPolicyPekko: SubscriptionErrorRetryPolicyPekko[SubscriptionError]
}

View File

@ -63,11 +63,6 @@ class ReplayingEventsSequencerClientTransport(
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] =
EitherT.rightT(())
/** Does nothing */
override def sendAsyncUnauthenticatedVersioned(request: SubmissionRequest, timeout: Duration)(
implicit traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] = EitherT.rightT(())
/** Does nothing */
override def acknowledgeSigned(request: SignedContent[AcknowledgeRequest])(implicit
traceContext: TraceContext
@ -113,11 +108,6 @@ class ReplayingEventsSequencerClientTransport(
new ReplayingSequencerSubscription(timeouts, loggerFactory)
}
override def subscribeUnauthenticated[E](
request: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] = subscribe(request, handler)
/** Will never request a retry. */
override def subscriptionRetryPolicy: SubscriptionErrorRetryPolicy =
SubscriptionErrorRetryPolicy.never
@ -147,10 +137,6 @@ class ReplayingEventsSequencerClientTransport(
new UnsupportedOperationException("subscribe(SubmissionRequest) is not yet implemented")
)
override def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[Nothing] = subscribe(request)
override def subscriptionRetryPolicyPekko: SubscriptionErrorRetryPolicyPekko[Nothing] =
SubscriptionErrorRetryPolicyPekko.never
}

View File

@ -384,14 +384,6 @@ abstract class ReplayingSendsSequencerClientTransportCommon(
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] =
EitherT.rightT(())
/** We're replaying sends so shouldn't allow the app to send any new ones */
override def sendAsyncUnauthenticatedVersioned(
request: SubmissionRequest,
timeout: Duration,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] = EitherT.rightT(())
override def acknowledgeSigned(request: SignedContent[AcknowledgeRequest])(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, Boolean] =
@ -451,11 +443,6 @@ class ReplayingSendsSequencerClientTransportImpl(
): Unit = closeReasonPromise.trySuccess(reason).discard[Boolean]
}
override def subscribeUnauthenticated[E](
request: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] = subscribe(request, handler)
override def subscriptionRetryPolicy: SubscriptionErrorRetryPolicy =
SubscriptionErrorRetryPolicy.never
@ -471,11 +458,6 @@ class ReplayingSendsSequencerClientTransportImpl(
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] = underlyingTransport.subscribe(request)
override def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] =
underlyingTransport.subscribeUnauthenticated(request)
override def subscriptionRetryPolicyPekko: SubscriptionErrorRetryPolicyPekko[SubscriptionError] =
SubscriptionErrorRetryPolicyPekko.never
}

View File

@ -17,15 +17,10 @@ sealed trait SendAsyncError extends PrettyPrinting {
val message: String
protected def toResponseReasonProto: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason
private[protocol] def toSendAsyncUnauthenticatedVersionedResponseProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error =
v30.SendAsyncUnauthenticatedVersionedResponse.Error(toResponseReasonProto)
private[protocol] def toSendAsyncVersionedResponseProto: v30.SendAsyncVersionedResponse.Error =
v30.SendAsyncVersionedResponse.Error(toSignedResponseReasonProto)
private[protocol] def toResponseProtoV30: v30.SendAsyncVersionedResponse.Error =
v30.SendAsyncVersionedResponse.Error(toProtoV30)
override def pretty: Pretty[SendAsyncError] = prettyOfClass(unnamedParam(_.message.unquoted))
@ -37,59 +32,41 @@ object SendAsyncError {
/** The request could not be deserialized to be processed */
final case class RequestInvalid(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.RequestInvalid(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.RequestInvalid(message)
override def category: ErrorCategory = ErrorCategory.InvalidIndependentOfSystemState
}
/** The request server could read the request but refused to accept it */
final case class RequestRefused(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.RequestRefused(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.RequestRefused(message)
override def category: ErrorCategory = ErrorCategory.InvalidGivenCurrentSystemStateOther
}
/** The Sequencer is overloaded and declined to handle the request */
final case class Overloaded(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.Overloaded(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.Overloaded(message)
override def category: ErrorCategory = ErrorCategory.ContentionOnSharedResources
}
/** The sequencer is unable to process requests (if the service is running it could mean the sequencer is going through a crash recovery process) */
final case class Unavailable(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.Unavailable(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.Unavailable(message)
override def category: ErrorCategory = ErrorCategory.TransientServerFailure
}
/** The Sequencer was unable to handle the send as the sender was unknown so could not asynchronously deliver them a deliver event or error */
final case class SenderUnknown(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.SenderUnknown(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.SenderUnknown(message)
override def category: ErrorCategory = ErrorCategory.InvalidGivenCurrentSystemStateOther
}
final case class UnknownRecipients(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.UnknownRecipients(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.UnknownRecipients(message)
override def category: ErrorCategory = ErrorCategory.InvalidGivenCurrentSystemStateOther
}
@ -103,70 +80,33 @@ object SendAsyncError {
/** The sequencer declined to process new requests as it is shutting down */
final case class ShuttingDown(message: String = "Sequencer shutting down")
extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.ShuttingDown(message)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.ShuttingDown(message)
override def category: ErrorCategory = ErrorCategory.TransientServerFailure
}
final case class Internal(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
throw new IllegalStateException(
"Message `Internal` introduced with protocol version 4 should not be included in `v30.SendAsyncUnauthenticatedVersionedResponse`"
)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.Internal(message)
override def category: ErrorCategory = ErrorCategory.TransientServerFailure
}
final case class Generic(message: String) extends SendAsyncError {
protected def toResponseReasonProto
: v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason =
throw new IllegalStateException(
"Message `Generic` introduced with protocol version 4 should not be included in `v30.SendAsyncUnauthenticatedVersionedResponse`"
)
protected def toSignedResponseReasonProto: v30.SendAsyncVersionedResponse.Error.Reason =
protected def toProtoV30: v30.SendAsyncVersionedResponse.Error.Reason =
v30.SendAsyncVersionedResponse.Error.Reason.Generic(message)
override def category: ErrorCategory = ErrorCategory.TransientServerFailure
}
private[protocol] def fromErrorProto(
error: v30.SendAsyncUnauthenticatedVersionedResponse.Error
): ParsingResult[SendAsyncError] =
error.reason match {
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.Empty =>
ProtoDeserializationError
.FieldNotSet("SendAsyncUnauthenticatedVersionedResponse.error.reason")
.asLeft
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.RequestInvalid(message) =>
RequestInvalid(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.RequestRefused(message) =>
RequestRefused(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.Overloaded(message) =>
Overloaded(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.Unavailable(message) =>
Unavailable(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.SenderUnknown(message) =>
SenderUnknown(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.UnknownRecipients(message) =>
UnknownRecipients(message).asRight
case v30.SendAsyncUnauthenticatedVersionedResponse.Error.Reason.ShuttingDown(message) =>
ShuttingDown(message).asRight
}
private[protocol] def fromSignedErrorProto(
error: v30.SendAsyncVersionedResponse.Error
): ParsingResult[SendAsyncError] =
error.reason match {
case v30.SendAsyncVersionedResponse.Error.Reason.Empty =>
ProtoDeserializationError
.FieldNotSet("SendAsyncUnauthenticatedVersionedResponse.error.reason")
.FieldNotSet("SendAsyncVersionedResponse.error.reason")
.asLeft
case v30.SendAsyncVersionedResponse.Error.Reason.RequestInvalid(message) =>
RequestInvalid(message).asRight
@ -188,29 +128,17 @@ object SendAsyncError {
}
}
final case class SendAsyncUnauthenticatedVersionedResponse(error: Option[SendAsyncError]) {
def toSendAsyncUnauthenticatedVersionedResponseProto
: v30.SendAsyncUnauthenticatedVersionedResponse =
v30.SendAsyncUnauthenticatedVersionedResponse(
error.map(_.toSendAsyncUnauthenticatedVersionedResponseProto)
)
final case class SendAsyncVersionedResponse(error: Option[SendAsyncError]) {
def toSendAsyncVersionedResponseProto: v30.SendAsyncVersionedResponse =
v30.SendAsyncVersionedResponse(error.map(_.toSendAsyncVersionedResponseProto))
def toProtoV30: v30.SendAsyncVersionedResponse =
v30.SendAsyncVersionedResponse(error.map(_.toResponseProtoV30))
}
object SendAsyncUnauthenticatedVersionedResponse {
def fromSendAsyncUnauthenticatedVersionedResponseProto(
responseP: v30.SendAsyncUnauthenticatedVersionedResponse
): ParsingResult[SendAsyncUnauthenticatedVersionedResponse] =
for {
error <- responseP.error.traverse(SendAsyncError.fromErrorProto)
} yield SendAsyncUnauthenticatedVersionedResponse(error)
def fromSendAsyncVersionedResponseProto(
object SendAsyncVersionedResponse {
def fromProtoV30(
responseP: v30.SendAsyncVersionedResponse
): ParsingResult[SendAsyncUnauthenticatedVersionedResponse] =
): ParsingResult[SendAsyncVersionedResponse] =
for {
error <- responseP.error.traverse(SendAsyncError.fromSignedErrorProto)
} yield SendAsyncUnauthenticatedVersionedResponse(error)
} yield SendAsyncVersionedResponse(error)
}

View File

@ -130,7 +130,7 @@ object TimeProof {
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] =
client.sendAsyncUnauthenticatedOrNot(
client.sendAsync(
// we intentionally ask for an empty event to be sequenced to observe the time.
// this means we can safely share this event without mentioning other recipients.
batch = Batch.empty(protocolVersion),

View File

@ -9,12 +9,10 @@ import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.ProtoDeserializationError.ValueConversionError
import com.digitalasset.canton.config.CantonRequireTypes.{String255, String3, String300}
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.crypto.RandomOps
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.store.db.DbDeserializationException
import com.digitalasset.canton.topology.MediatorGroup.MediatorGroupIndex
import com.digitalasset.canton.util.HexString
import com.digitalasset.canton.{LedgerParticipantId, LfPartyId, ProtoDeserializationError}
import com.google.common.annotations.VisibleForTesting
import io.circe.Encoder
@ -53,7 +51,6 @@ object MemberCode {
case MediatorId.Code.threeLetterId => Right(MediatorId.Code)
case ParticipantId.Code.threeLetterId => Right(ParticipantId.Code)
case SequencerId.Code.threeLetterId => Right(SequencerId.Code)
case UnauthenticatedMemberId.Code.threeLetterId => Right(UnauthenticatedMemberId.Code)
case _ => Left(s"Unknown three letter type $code")
}
@ -76,8 +73,6 @@ sealed trait Member extends Identity with Product with Serializable {
def description: String
def isAuthenticated: Boolean
override def toProtoPrimitive: String = toLengthLimitedString.unwrap
def toLengthLimitedString: String300 =
@ -102,7 +97,6 @@ object Member {
case MediatorId.Code => Right(MediatorId(uid))
case ParticipantId.Code => Right(ParticipantId(uid))
case SequencerId.Code => Right(SequencerId(uid))
case UnauthenticatedMemberId.Code => Right(UnauthenticatedMemberId(uid))
}
}
@ -151,35 +145,6 @@ object Member {
}
sealed trait AuthenticatedMember extends Member {
override def code: AuthenticatedMemberCode
override def isAuthenticated: Boolean = true
}
sealed trait AuthenticatedMemberCode extends MemberCode
final case class UnauthenticatedMemberId(uid: UniqueIdentifier) extends Member {
override def code: MemberCode = UnauthenticatedMemberId.Code
override val description: String = "unauthenticated member"
override def isAuthenticated: Boolean = false
}
object UnauthenticatedMemberId {
object Code extends MemberCode {
val threeLetterId: String3 = String3.tryCreate("UNM")
}
private val RandomIdentifierNumberOfBytes = 20
def tryCreate(namespace: Namespace)(randomOps: RandomOps): UnauthenticatedMemberId =
UnauthenticatedMemberId(
UniqueIdentifier.tryCreate(
HexString.toHexString(randomOps.generateRandomByteString(RandomIdentifierNumberOfBytes)),
namespace.fingerprint.unwrap,
)
)
}
final case class DomainId(uid: UniqueIdentifier) extends Identity {
def unwrap: UniqueIdentifier = uid
def toLengthLimitedString: String255 = uid.toLengthLimitedString
@ -218,11 +183,9 @@ object DomainId {
}
/** A participant identifier */
final case class ParticipantId(uid: UniqueIdentifier)
extends AuthenticatedMember
with NodeIdentity {
final case class ParticipantId(uid: UniqueIdentifier) extends Member with NodeIdentity {
override def code: AuthenticatedMemberCode = ParticipantId.Code
override def code: MemberCode = ParticipantId.Code
override val description: String = "participant"
@ -233,7 +196,7 @@ final case class ParticipantId(uid: UniqueIdentifier)
}
object ParticipantId {
object Code extends AuthenticatedMemberCode {
object Code extends MemberCode {
val threeLetterId: String3 = String3.tryCreate("PAR")
}
def apply(identifier: String, namespace: Namespace): ParticipantId =
@ -314,8 +277,6 @@ object PartyId {
}
sealed trait DomainMember extends AuthenticatedMember
/** @param index uniquely identifies the group, just like [[MediatorId]] for single mediators.
* @param active the active mediators belonging to the group
* @param passive the passive mediators belonging to the group
@ -337,14 +298,14 @@ object MediatorGroup {
val MediatorGroupIndex = NonNegativeInt
}
final case class MediatorId(uid: UniqueIdentifier) extends DomainMember with NodeIdentity {
override def code: AuthenticatedMemberCode = MediatorId.Code
final case class MediatorId(uid: UniqueIdentifier) extends Member with NodeIdentity {
override def code: MemberCode = MediatorId.Code
override val description: String = "mediator"
override def member: Member = this
}
object MediatorId {
object Code extends AuthenticatedMemberCode {
object Code extends MemberCode {
val threeLetterId = String3.tryCreate("MED")
}
@ -371,15 +332,15 @@ final case class SequencerGroup(
threshold: PositiveInt,
)
final case class SequencerId(uid: UniqueIdentifier) extends DomainMember with NodeIdentity {
override def code: AuthenticatedMemberCode = SequencerId.Code
final case class SequencerId(uid: UniqueIdentifier) extends Member with NodeIdentity {
override def code: MemberCode = SequencerId.Code
override val description: String = "sequencer"
override def member: Member = this
}
object SequencerId {
object Code extends AuthenticatedMemberCode {
object Code extends MemberCode {
val threeLetterId = String3.tryCreate("SEQ")
}

View File

@ -410,7 +410,7 @@ class ValidatingTopologyMappingChecks(
toValidate.mapping.member match {
case participantId: ParticipantId =>
ensureParticipantDoesNotHostParties(effective, participantId)
case _: UnauthenticatedMemberId | _: AuthenticatedMember => EitherTUtil.unit
case _ => EitherTUtil.unit
}
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: CantonExamples

View File

@ -133,7 +133,7 @@ class DomainTopologyService(
): EitherT[FutureUnlessShutdown, SendAsyncClientError, Unit] = {
logger.debug(s"Broadcasting topology transaction: ${request.broadcasts}")
EitherTUtil.logOnErrorU(
sequencerClient.sendAsyncUnauthenticatedOrNot(
sequencerClient.sendAsync(
Batch.of(protocolVersion, (request, Recipients.cc(TopologyBroadcastAddress.recipient))),
maxSequencingTime =
clock.now.add(topologyConfig.topologyTransactionRegistrationTimeout.toInternal.duration),

View File

@ -3,12 +3,13 @@
package com.digitalasset.canton.sequencing
import cats.data.EitherT
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.crypto.provider.symbolic.SymbolicCrypto
import com.digitalasset.canton.crypto.{Fingerprint, Signature}
import com.digitalasset.canton.health.{AtomicHealthComponent, ComponentHealthState}
import com.digitalasset.canton.lifecycle.OnShutdownRunner
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, OnShutdownRunner}
import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.sequencing.SequencerAggregatorPekko.HasSequencerSubscriptionFactoryPekko
import com.digitalasset.canton.sequencing.SequencerAggregatorPekkoTest.Config
@ -18,15 +19,7 @@ import com.digitalasset.canton.sequencing.client.TestSequencerSubscriptionFactor
Failure,
}
import com.digitalasset.canton.sequencing.client.TestSubscriptionError.UnretryableError
import com.digitalasset.canton.sequencing.client.{
ResilientSequencerSubscription,
SequencedEventTestFixture,
SequencedEventValidator,
SequencedEventValidatorImpl,
SequencerSubscriptionFactoryPekko,
TestSequencerSubscriptionFactoryPekko,
TestSubscriptionError,
}
import com.digitalasset.canton.sequencing.client.*
import com.digitalasset.canton.topology.{DefaultTestIdentities, SequencerId}
import com.digitalasset.canton.util.OrderedBucketMergeHub.{
ActiveSourceTerminated,
@ -34,7 +27,8 @@ import com.digitalasset.canton.util.OrderedBucketMergeHub.{
DeadlockTrigger,
NewConfiguration,
}
import com.digitalasset.canton.util.{OrderedBucketMergeConfig, ResourceUtil}
import com.digitalasset.canton.util.{EitherTUtil, OrderedBucketMergeConfig, ResourceUtil}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{
BaseTest,
HasExecutionContext,
@ -324,14 +318,21 @@ class SequencerAggregatorPekkoTest
import fixture.*
val validator = new SequencedEventValidatorImpl(
// Disable signature checking
unauthenticated = true,
defaultDomainId,
testedProtocolVersion,
subscriberCryptoApi,
loggerFactory,
timeouts,
)
) {
override protected def verifySignature(
priorEventO: Option[PossiblyIgnoredSerializedEvent],
event: OrdinarySerializedEvent,
sequencerId: SequencerId,
protocolVersion: ProtocolVersion,
): EitherT[FutureUnlessShutdown, SequencedEventValidationError[Nothing], Unit] =
EitherTUtil.unitUS
}
val initialCounter = SequencerCounter(10)
val aggregator = mkAggregatorPekko(validator)
val ((source, (doneF, health_)), sink) = Source
@ -371,11 +372,16 @@ class SequencerAggregatorPekkoTest
val events = mkEvents(initialCounter, 4)
val events1 = events.take(2)
// alice reports events 10,11,12,13
factoryAlice.add(events.map(_.copy(signatures = NonEmpty(Set, signatureAlice)))*)
// bob reports events 10,11
factoryBob.add(events1.map(_.copy(signatures = NonEmpty(Set, signatureBob)))*)
// events
val events2 = events.drop(1)
// bob reports events 12,13
factoryBob.add(events2.drop(1).map(_.copy(signatures = NonEmpty(Set, signatureBob)))*)
// carlos reports events 11,12
factoryCarlos.add(
events2.take(2).map(_.copy(signatures = NonEmpty(Set, signatureCarlos)))*
)

View File

@ -0,0 +1,176 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.sequencing.client
import cats.data.EitherT
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.lifecycle.UnlessShutdown.AbortedDueToShutdown
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, PromiseUnlessShutdown}
import com.digitalasset.canton.logging.SuppressionRule
import com.digitalasset.canton.sequencing.BftSender
import com.digitalasset.canton.sequencing.BftSender.FailedToReachThreshold
import com.digitalasset.canton.{BaseTest, HasExecutionContext}
import org.scalatest.Outcome
import org.scalatest.wordspec.FixtureAnyWordSpec
import org.slf4j.event.Level
import scala.concurrent.duration.*
class BftSenderTest extends FixtureAnyWordSpec with BaseTest with HasExecutionContext {
class Env {
val promise1 = new PromiseUnlessShutdown[Either[String, Int]]("p1", futureSupervisor)
val promise2 = new PromiseUnlessShutdown[Either[String, Int]]("p2", futureSupervisor)
val promise3 = new PromiseUnlessShutdown[Either[String, Int]]("p3", futureSupervisor)
val transports: Map[String, MockTransport] = Map(
"sequencer1" -> new MockTransport(EitherT(promise1.futureUS)),
"sequencer2" -> new MockTransport(EitherT(promise2.futureUS)),
"sequencer3" -> new MockTransport(EitherT(promise3.futureUS)),
)
}
override type FixtureParam = Env
override def withFixture(test: OneArgTest): Outcome =
withFixture(test.toNoArgTest(new Env()))
final class MockTransport(result: EitherT[FutureUnlessShutdown, String, Int]) {
def performRequest: EitherT[FutureUnlessShutdown, String, Int] = result
}
private def checkNotCompleted[E, A](result: EitherT[FutureUnlessShutdown, E, A]) = {
always(1.second) {
result.value.isCompleted shouldBe false
}
}
private def mkRequest(threshold: PositiveInt)(implicit env: Env) = {
import env.*
BftSender.makeRequest[String, String, MockTransport, Int, Int](
"test",
futureSupervisor,
logger,
transports,
threshold,
_.performRequest,
identity,
)
}
"BftSender" should {
"gather requests from threshold-many transports" in { implicit env =>
import env.*
val threshold = PositiveInt.tryCreate(2)
val result = mkRequest(threshold)
checkNotCompleted(result)
promise1.outcome(Right(1))
checkNotCompleted(result)
promise2.outcome(Right(2))
checkNotCompleted(result)
promise3.outcome(Right(1))
result.valueOrFailShutdown("result").futureValue shouldBe 1
}
"return as soon as it has enough identical responses" in { implicit env =>
import env.*
val threshold = PositiveInt.tryCreate(2)
val result = mkRequest(threshold)
checkNotCompleted(result)
promise1.outcome(Right(1))
checkNotCompleted(result)
promise2.outcome(Right(1))
result.valueOrFailShutdown("result").futureValue shouldBe 1
}
"fail early if it can't get enough responses" in { env =>
import env.*
val threshold = PositiveInt.tryCreate(3)
val promise4 = new PromiseUnlessShutdown[Either[String, Int]]("p4", futureSupervisor)
val promise5 = new PromiseUnlessShutdown[Either[String, Int]]("p5", futureSupervisor)
val transports: Map[String, MockTransport] = Map(
"sequencer1" -> new MockTransport(EitherT(promise1.futureUS)),
"sequencer2" -> new MockTransport(EitherT(promise2.futureUS)),
"sequencer3" -> new MockTransport(EitherT(promise3.futureUS)),
"sequencer4" -> new MockTransport(EitherT(promise4.futureUS)),
"sequencer5" -> new MockTransport(EitherT(promise5.futureUS)),
)
loggerFactory.assertEventuallyLogsSeq(SuppressionRule.Level(Level.ERROR))(
{
val result = BftSender.makeRequest[String, String, MockTransport, Int, Int](
"test",
futureSupervisor,
logger,
transports,
threshold,
_.performRequest,
identity,
)
val exception = new RuntimeException("BOOM")
checkNotCompleted(result)
promise1.outcome(Right(1))
checkNotCompleted(result)
promise2.outcome(Right(2))
checkNotCompleted(result)
promise3.outcome(Left("failed"))
checkNotCompleted(result)
promise4.failure(exception)
result.value.failOnShutdown.futureValue shouldBe Left(
FailedToReachThreshold(
Map(1 -> Set("sequencer1"), 2 -> Set("sequencer2")),
Map[String, Either[Throwable, String]](
"sequencer3" -> Right("failed"),
"sequencer4" -> Left(exception),
),
)
)
},
logs => {
forExactly(1, logs) { m =>
m.toString should include(s"test failed for sequencer4")
}
},
)
}
"fail with shutdown if any response is a shutdown" in { implicit env =>
import env.*
val threshold = PositiveInt.tryCreate(3)
val result = mkRequest(threshold)
checkNotCompleted(result)
promise1.outcome(Right(1))
checkNotCompleted(result)
promise2.shutdown()
result.value.unwrap.futureValue shouldBe AbortedDueToShutdown
}
"subsequent results should not trigger errors" in { implicit env =>
import env.*
val threshold = PositiveInt.one
val result = mkRequest(threshold)
checkNotCompleted(result)
promise1.outcome(Right(1))
result.valueOrFailShutdown("result").futureValue shouldBe 1
promise2.outcome(Right(1))
promise3.outcome(Left("failed"))
}
}
}

View File

@ -135,7 +135,6 @@ class SequencedEventTestFixture(
syncCryptoApi: DomainSyncCryptoClient = subscriberCryptoApi
)(implicit executionContext: ExecutionContext): SequencedEventValidatorImpl = {
new SequencedEventValidatorImpl(
unauthenticated = false,
defaultDomainId,
testedProtocolVersion,
syncCryptoApi,

View File

@ -1103,13 +1103,6 @@ class SequencerClientTest
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] =
sendAsync(request.content).mapK(FutureUnlessShutdown.outcomeK)
override def sendAsyncUnauthenticatedVersioned(
request: SubmissionRequest,
timeout: Duration,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] = ???
override def subscribe[E](request: SubscriptionRequest, handler: SerializedEventHandler[E])(
implicit traceContext: TraceContext
): SequencerSubscription[E] = {
@ -1134,11 +1127,6 @@ class SequencerClientTest
override protected def loggerFactory: NamedLoggerFactory =
SequencerClientTest.this.loggerFactory
override def subscribeUnauthenticated[E](
request: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] = ???
override def downloadTopologyStateForInit(request: TopologyStateForInitRequest)(implicit
traceContext: TraceContext
): EitherT[Future, String, TopologyStateForInitResponse] = ???
@ -1166,10 +1154,6 @@ class SequencerClientTest
)
}
override def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] = subscribe(request)
override def subscriptionRetryPolicyPekko
: SubscriptionErrorRetryPolicyPekko[SubscriptionError] =
SubscriptionErrorRetryPolicyPekko.never
@ -1268,9 +1252,7 @@ class SequencerClientTest
private class ConstantSequencedEventValidatorFactory(eventValidator: SequencedEventValidator)
extends SequencedEventValidatorFactory {
override def create(
unauthenticated: Boolean
)(implicit loggingContext: NamedLoggingContext): SequencedEventValidator =
override def create()(implicit loggingContext: NamedLoggingContext): SequencedEventValidator =
eventValidator
}

View File

@ -10,8 +10,6 @@ import org.scalacheck.Arbitrary
object GeneratorsTopology {
import com.digitalasset.canton.config.GeneratorsConfig.*
implicit val domainMemberArb: Arbitrary[DomainMember] = genArbitrary
implicit val authenticatedMemberArb: Arbitrary[AuthenticatedMember] = genArbitrary
implicit val fingerprintArb: Arbitrary[Fingerprint] = Arbitrary(
string68Arb.arbitrary.map(Fingerprint.tryCreate)
)

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: medical-records

View File

@ -6,11 +6,7 @@ package com.digitalasset.canton.domain.block
import cats.syntax.either.*
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.RawLedgerBlock.RawBlockEvent
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.{
SenderSigned,
SignedOrderingRequest,
SignedOrderingRequestOps,
}
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SenderSigned
import com.digitalasset.canton.logging.HasLoggerName
import com.digitalasset.canton.sequencing.protocol.{
AcknowledgeRequest,
@ -18,11 +14,8 @@ import com.digitalasset.canton.sequencing.protocol.{
SignedContent,
SubmissionRequest,
}
import com.digitalasset.canton.serialization.HasCryptographicEvidence
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.serialization.{
BytestringWithCryptographicEvidence,
HasCryptographicEvidence,
}
import com.digitalasset.canton.tracing.Traced
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{LfTimestamp, ProtoDeserializationError}
@ -38,13 +31,12 @@ object LedgerBlockEvent extends HasLoggerName {
final case class Send(
timestamp: CantonTimestamp,
signedOrderingRequest: SignedOrderingRequest,
signedSubmissionRequest: SenderSigned[SubmissionRequest],
originalPayloadSize: Int =
0, // default is 0 for testing as this value is only used for metrics
) extends LedgerBlockEvent {
lazy val signedSubmissionRequest = signedOrderingRequest.signedSubmissionRequest
}
final case class Acknowledgment(request: SignedContent[AcknowledgeRequest])
) extends LedgerBlockEvent
final case class Acknowledgment(request: SenderSigned[AcknowledgeRequest])
extends LedgerBlockEvent
def fromRawBlockEvent(
@ -53,7 +45,7 @@ object LedgerBlockEvent extends HasLoggerName {
blockEvent match {
case RawBlockEvent.Send(request, microsecondsSinceEpoch) =>
for {
deserializedRequest <- deserializeSignedOrderingRequest(protocolVersion)(request)
deserializedRequest <- deserializeSignedRequest(protocolVersion)(request)
timestamp <- LfTimestamp
.fromLong(microsecondsSinceEpoch)
.leftMap(e => ProtoDeserializationError.TimestampConversionError(e))
@ -64,32 +56,21 @@ object LedgerBlockEvent extends HasLoggerName {
)
}
def deserializeSignedOrderingRequest(
def deserializeSignedRequest(
protocolVersion: ProtocolVersion
)(submissionRequestBytes: ByteString): ParsingResult[SignedOrderingRequest] = {
)(
submissionRequestBytes: ByteString
): ParsingResult[SenderSigned[SubmissionRequest]] = {
// TODO(i10428) Prevent zip bombing when decompressing the request
for {
sequencerSignedContent <- SignedContent
.fromByteString(protocolVersion)(submissionRequestBytes)
signedOrderingRequest <- sequencerSignedContent
.deserializeContent(
deserializeOrderingRequestToValueClass(protocolVersion)
.andThen(deserializeOrderingRequestSignedContent(protocolVersion))
)
signedOrderingRequest <- deserializeSenderSignedSubmissionRequest(protocolVersion)(
sequencerSignedContent
)
} yield signedOrderingRequest
}
private def deserializeOrderingRequestToValueClass(
protocolVersion: ProtocolVersion
): ByteString => ParsingResult[SignedContent[BytestringWithCryptographicEvidence]] =
SignedContent.fromByteString(protocolVersion)
private def deserializeOrderingRequestSignedContent(protocolVersion: ProtocolVersion)(
signedContentParsingResult: ParsingResult[SignedContent[BytestringWithCryptographicEvidence]]
): ParsingResult[SenderSigned[SubmissionRequest]] = for {
signedContent <- signedContentParsingResult
senderSignedRequest <- deserializeSenderSignedSubmissionRequest(protocolVersion)(signedContent)
} yield senderSignedRequest
private def deserializeSenderSignedSubmissionRequest[A <: HasCryptographicEvidence](
protocolVersion: ProtocolVersion

View File

@ -34,7 +34,7 @@ import com.digitalasset.canton.resource.IdempotentInsert.insertVerifyingConflict
import com.digitalasset.canton.resource.{DbStorage, DbStore}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.TrafficState
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{SequencerCounter, resource}
@ -152,20 +152,14 @@ class DbSequencerBlockStore(
val addMember = sequencerStore.addMemberDBIO(_, _)
val addAcks = sequencerStore.acknowledgeDBIO _
val addEvents = sequencerStore.addEventsDBIO(trafficState)(_)
val (unauthenticated, disabledMembers) = membersDisabled.partitionMap {
case unauthenticated: UnauthenticatedMemberId => Left(unauthenticated)
case other => Right(other)
}
val disableMember = sequencerStore.disableMemberDBIO _
val unregisterUnauthenticatedMember = sequencerStore.unregisterUnauthenticatedMember _
val dbio = DBIO
.seq(
newMembers.toSeq.map(addMember.tupled) ++
acknowledgments.toSeq.map(addAcks.tupled) ++
unauthenticated.map(unregisterUnauthenticatedMember) ++
Seq(sequencerStore.addInFlightAggregationUpdatesDBIO(inFlightAggregationUpdates)) ++
disabledMembers.map(disableMember): _*
membersDisabled.map(disableMember): _*
)
.transactionally

View File

@ -31,7 +31,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.{
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.TrafficState
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import monocle.macros.syntax.lens.*
import org.apache.pekko.NotUsed
@ -112,11 +112,6 @@ class InMemorySequencerBlockStore(
val addEvents = sequencerStore.addEvents(_, trafficState)
val addAcks = sequencerStore.acknowledge(_, _)
val disableMember = sequencerStore.disableMember(_)
val unregisterUnauthenticatedMember = sequencerStore.unregisterUnauthenticatedMember(_)
val (unauthenticated, disabledMembers) = membersDisabled.partitionMap {
case unauthenticated: UnauthenticatedMemberId => Left(unauthenticated)
case other => Right(other)
}
// Since these updates are being run sequentially from the state manager, there is no problem with this
// implementation not being atomic.
// Also because this is an in-mem implementation, there is no concern about crashing mid update since all state
@ -125,8 +120,7 @@ class InMemorySequencerBlockStore(
_ <- Future.traverse(newMembers.toSeq)(addMember.tupled)
_ <- Future.traverse(events)(addEvents)
_ <- Future.traverse(acknowledgments.toSeq)(addAcks.tupled)
_ <- Future.traverse(disabledMembers)(disableMember)
_ <- Future.traverse(unauthenticated)(unregisterUnauthenticatedMember)
_ <- Future.traverse(membersDisabled)(disableMember)
_ <- sequencerStore.addInFlightAggregationUpdates(inFlightAggregationUpdates)
} yield ()
}

View File

@ -364,11 +364,9 @@ private[update] final class BlockChunkProcessor(
sequencedSubmission
def recipientIsKnown(member: Member): Future[Option[Member]] = {
if (!member.isAuthenticated) Future.successful(None)
else
sequencingSnapshot.ipsSnapshot
.isMemberKnown(member)
.map(Option.when(_)(member))
sequencingSnapshot.ipsSnapshot
.isMemberKnown(member)
.map(Option.when(_)(member))
}
val topologySnapshot = topologySnapshotO.getOrElse(sequencingSnapshot).ipsSnapshot
@ -394,10 +392,9 @@ private[update] final class BlockChunkProcessor(
)
} yield {
val knownGroupMembers = groupToMembers.values.flatten
val allowUnauthenticatedSender = Option.when(!sender.isAuthenticated)(sender).toList
val allMembersInSubmission =
Set.empty ++ knownGroupMembers ++ knownMemberRecipientsOrSender ++ allowUnauthenticatedSender
Set.empty ++ knownGroupMembers ++ knownMemberRecipientsOrSender
(allMembersInSubmission -- state.ephemeral.registeredMembers)
.map(_ -> sequencingTimestamp)
.toSeq
@ -474,7 +471,7 @@ private[update] final class BlockChunkProcessor(
value.foreach(_.withTraceContext { implicit traceContext =>
{
case LedgerBlockEvent.Send(_, signedSubmissionRequest, payloadSize) =>
signedSubmissionRequest.content.content.batch.allRecipients
signedSubmissionRequest.content.batch.allRecipients
.foldLeft(RecipientStats()) {
case (acc, MemberRecipient(ParticipantId(_)) | ParticipantsOfParty(_)) =>
acc.copy(participants = true)
@ -482,15 +479,10 @@ private[update] final class BlockChunkProcessor(
acc.copy(mediators = true)
case (acc, MemberRecipient(SequencerId(_)) | SequencersOfDomain) =>
acc.copy(sequencers = true)
case (
acc,
MemberRecipient(UnauthenticatedMemberId(_)),
) =>
acc // not used
case (acc, AllMembersOfDomain) => acc.copy(broadcast = true)
}
.updateMetric(
signedSubmissionRequest.content.content.sender,
signedSubmissionRequest.content.sender,
payloadSize,
logger,
metrics,

View File

@ -20,7 +20,6 @@ import com.digitalasset.canton.domain.block.data.{
}
import com.digitalasset.canton.domain.block.{BlockEvents, LedgerBlockEvent, RawLedgerBlock}
import com.digitalasset.canton.domain.metrics.BlockMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequestOps
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.InvalidLedgerEvent
import com.digitalasset.canton.domain.sequencing.sequencer.traffic.SequencerRateLimitManager
@ -171,9 +170,9 @@ class BlockUpdateGeneratorImpl(
private def isAddressingSequencers(event: LedgerBlockEvent): Boolean =
event match {
case Send(_, signedOrderingRequest, _) =>
case Send(_, signedSubmissionRequest, _) =>
val allRecipients =
signedOrderingRequest.signedSubmissionRequest.content.batch.allRecipients
signedSubmissionRequest.content.batch.allRecipients
allRecipients.contains(AllMembersOfDomain) ||
allRecipients.contains(SequencersOfDomain)
case _ => false

View File

@ -712,9 +712,7 @@ private[update] final class SubmissionRequestValidator(
// If we haven't seen any topology transactions yet, then we cannot verify signatures, so we skip it.
// In practice this should only happen for the first ever transaction, which contains the initial topology data.
val skipCheck =
latestSequencerEventTimestamp.isEmpty || !submissionRequest.sender.isAuthenticated
if (skipCheck) {
if (latestSequencerEventTimestamp.isEmpty) {
EitherT.pure[FutureUnlessShutdown, SubmissionRequestOutcome](())
} else {
val alarm = for {

View File

@ -30,7 +30,7 @@ import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.store.SequencedEventStore.OrdinarySequencedEvent
import com.digitalasset.canton.store.db.DbDeserializationException
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.{SerializableTraceContext, TraceContext}
import com.digitalasset.canton.util.{ErrorUtil, RangeUtil}
import com.digitalasset.canton.version.*
@ -450,15 +450,6 @@ class DbSequencerStateManagerStore(
_ <- sqlu"update seq_state_manager_members set enabled = ${false} where member = $member"
} yield ()
def unregisterUnauthenticatedMember(
member: UnauthenticatedMemberId
): DbAction.WriteOnly[Unit] = for {
_ <-
sqlu"delete from seq_state_manager_events where member = $member"
_ <-
sqlu"delete from seq_state_manager_members where member = $member"
} yield ()
override def isEnabled(member: Member)(implicit traceContext: TraceContext): Future[Boolean] =
storage
.query(

View File

@ -24,7 +24,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.{
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.TrafficState
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ErrorUtil
import org.apache.pekko.NotUsed
@ -208,9 +208,6 @@ class InMemorySequencerStateManagerStore(
def disableMember(member: Member): State =
copy(indices = indices + (member -> indices(member).disable()))
def unregisterUnauthenticatedMember(member: UnauthenticatedMemberId): State =
copy(indices = indices - member)
def addEvents(
events: Map[Member, OrdinarySerializedEvent],
trafficSate: Map[Member, TrafficState],
@ -310,15 +307,6 @@ class InMemorySequencerStateManagerStore(
Future.unit
}
def unregisterUnauthenticatedMember(
member: UnauthenticatedMemberId
): Future[Unit] = {
state.getAndUpdate {
_.unregisterUnauthenticatedMember(member)
}
Future.unit
}
override def isEnabled(member: Member)(implicit traceContext: TraceContext): Future[Boolean] =
Future.successful(state.get().indices.get(member).fold(false)(_.isEnabled))

View File

@ -19,10 +19,9 @@ import com.digitalasset.canton.sequencing.protocol.{
}
import com.digitalasset.canton.time.EnrichedDurations.*
import com.digitalasset.canton.time.{Clock, PeriodicAction}
import com.digitalasset.canton.topology.{DomainMember, Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.Spanning.SpanWrapper
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
import com.digitalasset.canton.util.ErrorUtil
import com.digitalasset.canton.util.ShowUtil.*
import io.opentelemetry.api.trace.Tracer
@ -69,11 +68,6 @@ abstract class BaseSequencer(
)
.leftMap(e => SendAsyncError.RequestRefused(e))
.mapK(FutureUnlessShutdown.outcomeK)
_ <- EitherT
.right(
autoRegisterUnauthenticatedSender(submission)
)
.mapK(FutureUnlessShutdown.outcomeK) // TODO(#18399): Propagate FUS
_ <- sendAsyncSignedInternal(signedSubmissionWithFixedTs)
} yield ()
}
@ -102,11 +96,6 @@ abstract class BaseSequencer(
withSpan("Sequencer.sendAsync") { implicit traceContext => span =>
setSpanAttributes(span, submission)
for {
_ <- EitherT
.right(
autoRegisterUnauthenticatedSender(submission)
)
.mapK(FutureUnlessShutdown.outcomeK) // TODO(#18399): Propagate FUS
_ <- sendAsyncInternal(submission)
} yield ()
}
@ -116,19 +105,7 @@ abstract class BaseSequencer(
span.setAttribute("message_id", submission.messageId.unwrap)
}
private def autoRegisterUnauthenticatedSender(
submission: SubmissionRequest
)(implicit traceContext: TraceContext): Future[Unit] =
submission.sender match {
case member: UnauthenticatedMemberId =>
registerMember(member).valueOr(error =>
// this error should not happen, as currently registration errors are only for authenticated users
ErrorUtil.invalidState(s"Unexpected error: $error")
)
case _ => Future.unit
}
protected def localSequencerMember: DomainMember
protected def localSequencerMember: Member
protected def disableMemberInternal(member: Member)(implicit
traceContext: TraceContext
): Future[Unit]

View File

@ -25,25 +25,11 @@ import com.digitalasset.canton.metrics.MetricsHelper
import com.digitalasset.canton.resource.Storage
import com.digitalasset.canton.scheduler.PruningScheduler
import com.digitalasset.canton.sequencing.client.SequencerClient
import com.digitalasset.canton.sequencing.protocol.{
AcknowledgeRequest,
MemberRecipient,
SendAsyncError,
SignedContent,
SubmissionRequest,
TrafficState,
}
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.sequencing.traffic.TrafficControlErrors
import com.digitalasset.canton.time.EnrichedDurations.*
import com.digitalasset.canton.time.{Clock, NonNegativeFiniteDuration}
import com.digitalasset.canton.topology.{
AuthenticatedMember,
DomainId,
DomainMember,
Member,
SequencerId,
UnauthenticatedMemberId,
}
import com.digitalasset.canton.topology.{DomainId, Member, SequencerId}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext
import com.digitalasset.canton.util.FutureUtil.doNotAwait
@ -242,37 +228,26 @@ class DatabaseSequencer(
override def registerMember(member: Member)(implicit
traceContext: TraceContext
): EitherT[Future, RegisterError, Unit] = {
if (!member.isAuthenticated) {
for {
isRegistered <- EitherT.right[RegisterError](isRegistered(member))
_ <- EitherTUtil.ifThenET[Future, RegisterError](
!isRegistered
) {
registerMemberInternal(member, clock.now)
}
} yield ()
} else {
for {
firstKnownAtO <- EitherT.right[RegisterError](
cryptoApi.headSnapshot.ipsSnapshot.memberFirstKnownAt(member)
)
_ <- firstKnownAtO match {
case Some(firstKnownAt) =>
logger.debug(s"Registering member $member with timestamp $firstKnownAt")
registerMemberInternal(member, firstKnownAt)
for {
firstKnownAtO <- EitherT.right[RegisterError](
cryptoApi.headSnapshot.ipsSnapshot.memberFirstKnownAt(member)
)
_ <- firstKnownAtO match {
case Some(firstKnownAt) =>
logger.debug(s"Registering member $member with timestamp $firstKnownAt")
registerMemberInternal(member, firstKnownAt)
case None =>
val error: RegisterError =
OperationError[RegisterMemberError](
RegisterMemberError.UnexpectedError(
member,
s"Member $member is not known in the topology",
)
case None =>
val error: RegisterError =
OperationError[RegisterMemberError](
RegisterMemberError.UnexpectedError(
member,
s"Member $member is not known in the topology",
)
EitherT.leftT[Future, Unit](error)
}
} yield ()
}
)
EitherT.leftT[Future, Unit](error)
}
} yield ()
}
/** Package private to use access method in tests, see `TestDatabaseSequencerWrapper`.
@ -329,27 +304,20 @@ class DatabaseSequencer(
if (!unifiedSequencer) {
reader.read(member, offset)
} else {
if (!member.isAuthenticated) {
// allowing unauthenticated members to read events is the same as automatically registering an unauthenticated member
// and then proceeding with the subscription.
// optimization: if the member is unauthenticated, we don't need to fetch all members from the snapshot
reader.read(member, offset)
} else {
for {
isKnown <- EitherT.right[CreateSubscriptionError](
cryptoApi.currentSnapshotApproximation.ipsSnapshot.isMemberKnown(member)
)
_ <- EitherTUtil.condUnitET[Future](
isKnown,
CreateSubscriptionError.UnknownMember(member): CreateSubscriptionError,
)
isRegistered <- EitherT.right(isRegistered(member))
_ <- EitherTUtil.ifThenET[Future, CreateSubscriptionError](!isRegistered) {
registerMember(member).leftMap(CreateSubscriptionError.MemberRegisterError)
}
eventSource <- reader.read(member, offset)
} yield eventSource
}
for {
isKnown <- EitherT.right[CreateSubscriptionError](
cryptoApi.currentSnapshotApproximation.ipsSnapshot.isMemberKnown(member)
)
_ <- EitherTUtil.condUnitET[Future](
isKnown,
CreateSubscriptionError.UnknownMember(member): CreateSubscriptionError,
)
isRegistered <- EitherT.right(isRegistered(member))
_ <- EitherTUtil.ifThenET[Future, CreateSubscriptionError](!isRegistered) {
registerMember(member).leftMap(CreateSubscriptionError.MemberRegisterError)
}
eventSource <- reader.read(member, offset)
} yield eventSource
}
}
@ -375,19 +343,13 @@ class DatabaseSequencer(
member: Member
)(implicit traceContext: TraceContext): Future[Unit] = {
withExpectedRegisteredMember(member, "Disable member") { memberId =>
member match {
// Unauthenticated members being disabled get automatically unregistered
case unauthenticated: UnauthenticatedMemberId =>
store.unregisterUnauthenticatedMember(unauthenticated)
case _: AuthenticatedMember =>
store.disableMember(memberId)
}
store.disableMember(memberId)
}
}
// For the database sequencer, the SequencerId serves as the local sequencer identity/member
// until the database and block sequencers are unified.
override protected def localSequencerMember: DomainMember = SequencerId(domainId.uid)
override protected def localSequencerMember: Member = SequencerId(domainId.uid)
/** helper for performing operations that are expected to be called with a registered member so will just throw if we
* find the member is unregistered.

View File

@ -73,16 +73,6 @@ class DirectSequencerClientTransport(
.sendAsyncSigned(request)
.leftMap(SendAsyncClientError.RequestRefused)
override def sendAsyncUnauthenticatedVersioned(
request: SubmissionRequest,
timeout: Duration,
)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncClientResponseError, Unit] =
ErrorUtil.internalError(
new UnsupportedOperationException("Direct client does not support unauthenticated sends")
)
override def acknowledgeSigned(request: SignedContent[AcknowledgeRequest])(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, String, Boolean] =
@ -152,19 +142,6 @@ class DirectSequencerClientTransport(
}
}
override def subscribeUnauthenticated[E](
request: SubscriptionRequest,
handler: SerializedEventHandler[E],
)(implicit traceContext: TraceContext): SequencerSubscription[E] =
unsupportedUnauthenticatedSubscription
private def unsupportedUnauthenticatedSubscription(implicit traceContext: TraceContext): Nothing =
ErrorUtil.internalError(
new UnsupportedOperationException(
"Direct client does not support unauthenticated subscriptions"
)
)
override def subscriptionRetryPolicy: SubscriptionErrorRetryPolicy =
// unlikely there will be any errors with this direct transport implementation
SubscriptionErrorRetryPolicy.never
@ -207,11 +184,6 @@ class DirectSequencerClientTransport(
SequencerSubscriptionPekko(source, health)
}
override def subscribeUnauthenticated(request: SubscriptionRequest)(implicit
traceContext: TraceContext
): SequencerSubscriptionPekko[SubscriptionError] =
unsupportedUnauthenticatedSubscription
override def subscriptionRetryPolicyPekko: SubscriptionErrorRetryPolicyPekko[SubscriptionError] =
// unlikely there will be any errors with this direct transport implementation
SubscriptionErrorRetryPolicyPekko.never

View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.domain.sequencing.sequencer
import cats.data.EitherT
import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
import com.digitalasset.canton.crypto.{DomainSnapshotSyncCryptoApi, HashPurpose}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.RegisterError
import com.digitalasset.canton.domain.sequencing.sequencer.errors.{
@ -31,12 +32,13 @@ import com.digitalasset.canton.sequencing.traffic.TrafficControlErrors.TrafficCo
import com.digitalasset.canton.serialization.HasCryptographicEvidence
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import io.grpc.ServerServiceDefinition
import org.apache.pekko.Done
import org.apache.pekko.stream.KillSwitch
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
/** Errors from pruning */
sealed trait PruningError {
@ -283,9 +285,11 @@ object Sequencer extends HasLoggerName {
*/
type SequencerSigned[A <: HasCryptographicEvidence] = SignedContent[SenderSigned[A]]
/** Ordering request signed by the sequencer.
* Outer signature is the signature of the sequencer that received the submission request.
* Inner signature is the signature of the member from which the submission request originated.
/** A signed ordering request. This is a double-signed message where the outer signature is the sequencer's,
* and the inner signature is the sender's.
* The sequencer signature may be introduced and used by the implementation to ensure that the submission
* originates from the expected sequencer node. This may be necessary if the implementation is split across
* multiple processes.
*
* ┌─────────────────┐ ┌────────────┐
* SenderSigned Sequencer
@ -307,6 +311,31 @@ object Sequencer extends HasLoggerName {
*/
type SignedOrderingRequest = SequencerSigned[SubmissionRequest]
/** Sign a submission request with the sequencer's private key.
* This utility may be used by a [[com.digitalasset.canton.domain.sequencing.sequencer.block.BlockOrderer]]
* implementation as described in [[SignedOrderingRequest]].
*/
def signOrderingRequest[A <: HasCryptographicEvidence](
content: SignedContent[SubmissionRequest],
cryptoApi: DomainSnapshotSyncCryptoApi,
protocolVersion: ProtocolVersion,
)(implicit
ec: ExecutionContext,
tc: TraceContext,
): EitherT[FutureUnlessShutdown, SendAsyncError.Internal, SignedOrderingRequest] =
for {
signed <- SignedContent
.create(
cryptoApi.pureCrypto,
cryptoApi,
content,
Some(cryptoApi.ipsSnapshot.timestamp),
HashPurpose.OrderingRequestSignature,
protocolVersion,
)
.leftMap(error => SendAsyncError.Internal(s"Could not sign ordering request: $error"))
} yield signed
implicit class SignedOrderingRequestOps(val value: SignedOrderingRequest) extends AnyVal {
def signedSubmissionRequest: SignedContent[SubmissionRequest] =
value.content

View File

@ -8,8 +8,7 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.sequencer.admin.v30
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.time.NonNegativeFiniteDuration
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
trait AbstractSequencerMemberStatus extends Product with Serializable {
def registeredAt: CantonTimestamp
@ -166,17 +165,6 @@ final case class SequencerPruningStatus(
*/
lazy val safePruningTimestamp: CantonTimestamp = safePruningTimestampFor(now)
def unauthenticatedMembersToDisable(retentionPeriod: NonNegativeFiniteDuration): Set[Member] =
members.foldLeft(Set.empty[Member]) { (toDisable, memberStatus) =>
memberStatus.member match {
case _: UnauthenticatedMemberId if memberStatus.enabled =>
if (now.minus(retentionPeriod.unwrap) > memberStatus.safePruningTimestamp) {
toDisable + memberStatus.member
} else toDisable
case _ => toDisable
}
}
/** List clients that would need to be disabled to allow pruning at the given timestamp.
*/
def clientsPreventingPruning(timestamp: CantonTimestamp): SequencerClients =

View File

@ -246,8 +246,7 @@ class SequencerReader(
signingSnapshot <- OptionT
.fromOption[FutureUnlessShutdown](topologySnapshotO)
.getOrElseF {
val warnIfApproximate =
(event.counter > SequencerCounter.Genesis) && member.isAuthenticated
val warnIfApproximate = event.counter > SequencerCounter.Genesis
SyncCryptoClient.getSnapshotForTimestampUS(
syncCryptoApi,
event.timestamp,

View File

@ -56,12 +56,6 @@ object SequencerValidations {
(),
"Sender is not eligible according to the aggregation rule",
)
unauthenticatedEligibleSenders = eligibleSenders.filterNot(_.isAuthenticated)
_ <- Either.cond(
unauthenticatedEligibleSenders.isEmpty,
(),
s"Eligible senders in aggregation rule must be authenticated, but found unauthenticated members $unauthenticatedEligibleSenders",
)
} yield ()
}

View File

@ -5,12 +5,12 @@ package com.digitalasset.canton.domain.sequencing.sequencer.block
import cats.data.EitherT
import com.digitalasset.canton.domain.block.{RawLedgerBlock, SequencerDriverHealthStatus}
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequest
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SenderSigned
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.sequencing.protocol.{
AcknowledgeRequest,
SendAsyncError,
SignedContent,
SubmissionRequest,
}
import com.digitalasset.canton.tracing.TraceContext
import io.grpc.ServerServiceDefinition
@ -63,12 +63,8 @@ trait BlockOrderer extends AutoCloseable {
/** Orders a submission.
* If the sequencer node is honest, this normally results in a [[com.digitalasset.canton.domain.block.RawLedgerBlock.RawBlockEvent.Send]].
* In exceptional cases (crashes, high load, ...), a sequencer may drop submissions.
* There's a double [[com.digitalasset.canton.sequencing.protocol.SignedContent]] wrapping because
* the outer signature is the sequencer's, and the inner signature is the sender's.
* The sequencer signature may be used by the implementation to ensure that the submission originates from the
* expected sequencer node. This may be necessary if the implementation is split across multiple processes.
*/
def send(signedSubmission: SignedOrderingRequest)(implicit
def send(signedSubmissionRequest: SenderSigned[SubmissionRequest])(implicit
traceContext: TraceContext
): EitherT[Future, SendAsyncError, Unit]
@ -76,7 +72,7 @@ trait BlockOrderer extends AutoCloseable {
* If the sequencer node is honest, this normally results in a [[com.digitalasset.canton.domain.block.RawLedgerBlock.RawBlockEvent.Acknowledgment]].
* In exceptional cases (crashes, high load, ...), a sequencer may drop acknowledgements.
*/
def acknowledge(signedAcknowledgeRequest: SignedContent[AcknowledgeRequest])(implicit
def acknowledge(signedAcknowledgeRequest: SenderSigned[AcknowledgeRequest])(implicit
traceContext: TraceContext
): Future[Unit]

View File

@ -9,17 +9,14 @@ import com.digitalasset.canton.SequencerCounter
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, HashPurpose, Signature}
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, Signature}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.block.BlockSequencerStateManagerBase
import com.digitalasset.canton.domain.block.data.SequencerBlockStore
import com.digitalasset.canton.domain.block.update.{BlockUpdateGeneratorImpl, LocalBlockUpdate}
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.PruningError.UnsafePruningPoint
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.{
EventSource,
SignedOrderingRequest,
}
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.EventSource
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscriptionError
@ -42,7 +39,6 @@ import com.digitalasset.canton.sequencing.traffic.{
TrafficControlErrors,
TrafficPurchasedSubmissionHandler,
}
import com.digitalasset.canton.serialization.HasCryptographicEvidence
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.tracing.{TraceContext, Traced}
@ -230,26 +226,6 @@ class BlockSequencer(
override def adminServices: Seq[ServerServiceDefinition] = blockOrderer.adminServices
private def signOrderingRequest[A <: HasCryptographicEvidence](
content: SignedContent[SubmissionRequest]
)(implicit
tc: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncError.Internal, SignedOrderingRequest] = {
val privateCrypto = cryptoApi.currentSnapshotApproximation
for {
signed <- SignedContent
.create(
cryptoApi.pureCrypto,
privateCrypto,
content,
Some(privateCrypto.ipsSnapshot.timestamp),
HashPurpose.OrderingRequestSignature,
protocolVersion,
)
.leftMap(error => SendAsyncError.Internal(s"Could not sign ordering request: $error"))
} yield signed
}
override protected def sendAsyncSignedInternal(
signedSubmission: SignedContent[SubmissionRequest]
)(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] = {
@ -276,9 +252,7 @@ class BlockSequencer(
// TODO(#18399): currentApproximation vs headSnapshot?
cryptoApi.currentSnapshotApproximation.ipsSnapshot
.allMembers()
.map(allMembers =>
(member: Member) => allMembers.contains(member) || !member.isAuthenticated
)
.map(allMembers => (member: Member) => allMembers.contains(member))
)
.mapK(FutureUnlessShutdown.outcomeK)
// TODO(#18399): Why we don't check group recipients here?
@ -293,14 +267,13 @@ class BlockSequencer(
s"Invoking send operation on the ledger with the following protobuf message serialized to bytes ${prettyPrinter
.printAdHoc(submission.toProtoVersioned)}"
)
signedOrderingRequest <- signOrderingRequest(signedSubmission)
_ <-
EitherT(
futureSupervisor
.supervised(
s"Sending submission request with id ${submission.messageId} from $sender to ${batch.allRecipients}"
)(
blockOrderer.send(signedOrderingRequest).value
blockOrderer.send(signedSubmission).value
)
).mapK(FutureUnlessShutdown.outcomeK)
} yield ()
@ -313,22 +286,15 @@ class BlockSequencer(
if (unifiedSequencer) {
super.readInternal(member, offset)
} else {
if (!member.isAuthenticated) {
// allowing unauthenticated members to read events is the same as automatically registering an unauthenticated member
// and then proceeding with the subscription.
// optimization: if the member is unauthenticated, we don't need to fetch all members from the snapshot
EitherT.fromEither[Future](stateManager.readEventsForMember(member, offset))
} else {
EitherT
.right(cryptoApi.currentSnapshotApproximation.ipsSnapshot.isMemberKnown(member))
.flatMap { isKnown =>
if (isKnown) {
EitherT.fromEither[Future](stateManager.readEventsForMember(member, offset))
} else {
EitherT.leftT(CreateSubscriptionError.UnknownMember(member))
}
EitherT
.right(cryptoApi.currentSnapshotApproximation.ipsSnapshot.isMemberKnown(member))
.flatMap { isKnown =>
if (isKnown) {
EitherT.fromEither[Future](stateManager.readEventsForMember(member, offset))
} else {
EitherT.leftT(CreateSubscriptionError.UnknownMember(member))
}
}
}
}
}
@ -338,8 +304,7 @@ class BlockSequencer(
if (unifiedSequencer) {
super.isRegistered(member)
} else {
if (!member.isAuthenticated) Future.successful(true)
else cryptoApi.headSnapshot.ipsSnapshot.isMemberKnown(member)
cryptoApi.headSnapshot.ipsSnapshot.isMemberKnown(member)
}
}
@ -374,7 +339,7 @@ class BlockSequencer(
}
}
override protected def localSequencerMember: DomainMember = sequencerId
override protected def localSequencerMember: Member = sequencerId
override def acknowledge(member: Member, timestamp: CantonTimestamp)(implicit
traceContext: TraceContext

View File

@ -9,7 +9,7 @@ import com.digitalasset.canton.domain.block.{
SequencerDriver,
SequencerDriverHealthStatus,
}
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequest
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SenderSigned
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.tracing.TraceContext
@ -33,14 +33,14 @@ class DriverBlockOrderer(
driver.subscribe()
override def send(
signedSubmission: SignedOrderingRequest
signedSubmissionRequest: SenderSigned[SubmissionRequest]
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncError, Unit] =
// The driver API doesn't provide error reporting, so we don't attempt to translate the exception
EitherT.right(
driver.send(signedSubmission.toByteString)
driver.send(signedSubmissionRequest.toByteString)
)
override def acknowledge(signedAcknowledgeRequest: SignedContent[AcknowledgeRequest])(implicit
override def acknowledge(signedAcknowledgeRequest: SenderSigned[AcknowledgeRequest])(implicit
traceContext: TraceContext
): Future[Unit] =
driver.acknowledge(signedAcknowledgeRequest.toByteString)

View File

@ -6,7 +6,7 @@ package com.digitalasset.canton.domain.sequencing.sequencer.errors
import com.daml.error.{ErrorCategory, ErrorCode, Explanation, Resolution}
import com.digitalasset.canton.error.BaseCantonError
import com.digitalasset.canton.error.CantonErrorGroups.SequencerErrorGroup
import com.digitalasset.canton.topology.DomainMember
import com.digitalasset.canton.topology.Member
sealed trait SequencerAdministrationError extends BaseCantonError
@ -26,7 +26,7 @@ object SequencerAdministrationError extends SequencerErrorGroup {
"CANNOT_DISABLE_LOCAL_SEQUENCER_MEMBER",
ErrorCategory.InvalidIndependentOfSystemState,
) {
final case class Error(sequencerMember: DomainMember)
final case class Error(sequencerMember: Member)
extends BaseCantonError.Impl(
cause = s"Sequencer ${sequencerMember} cannot disable its local sequencer subscription"
)

View File

@ -30,7 +30,7 @@ import com.digitalasset.canton.resource.DbStorage.Profile.{H2, Oracle, Postgres}
import com.digitalasset.canton.resource.DbStorage.*
import com.digitalasset.canton.sequencing.protocol.MessageId
import com.digitalasset.canton.store.db.DbDeserializationException
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.{SerializableTraceContext, TraceContext}
import com.digitalasset.canton.util.EitherTUtil.condUnitET
import com.digitalasset.canton.util.{EitherTUtil, ErrorUtil, retry}
@ -382,23 +382,6 @@ class DbSequencerStore(
"registerMember",
)
def unregisterUnauthenticatedMember(member: UnauthenticatedMemberId)(implicit
traceContext: TraceContext
): Future[Unit] =
for {
memberRemoved <- storage.update(
{
sqlu"""
delete from sequencer_members where member = $member
"""
},
functionFullName,
)
_ = evictFromCache(member)
} yield logger.debug(
s"Removed at least $memberRemoved unauthenticated members"
)
protected override def lookupMemberInternal(member: Member)(implicit
traceContext: TraceContext
): Future[Option[RegisteredMember]] =

View File

@ -17,7 +17,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.InMemorySequencerStore.CheckpointDataAtCounter
import com.digitalasset.canton.lifecycle.CloseContext
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.util.{EitherTUtil, ErrorUtil, retry}
@ -79,18 +79,6 @@ class InMemorySequencerStore(
.memberId
}
override def unregisterUnauthenticatedMember(member: UnauthenticatedMemberId)(implicit
traceContext: TraceContext
): Future[Unit] = {
disabledClientsRef
.getAndUpdate(disabledClients =>
disabledClients.copy(members = disabledClients.members - member)
)
evictFromCache(member)
Future.successful(members.remove(member)).void
}
protected override def lookupMemberInternal(member: Member)(implicit
traceContext: TraceContext
): Future[Option[RegisteredMember]] =

View File

@ -3,7 +3,7 @@
package com.digitalasset.canton.domain.sequencing.sequencer.store
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.{TraceContext, Traced}
import com.github.blemale.scaffeine.{Cache, Scaffeine}
@ -36,11 +36,4 @@ class SequencerMemberCache(populate: Traced[Member] => Future[Option[RegisteredM
cache.getIfPresent(member).fold(lookupFromStore)(result => Future.successful(Option(result)))
}
/** Evicts an unauthenticated member from the cache. Used when unregistering unauthenticated members.
* @param member member to evict from the cache
*/
def evict(member: UnauthenticatedMemberId): Unit = {
cache.invalidate(member)
}
}

View File

@ -14,14 +14,8 @@ import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveNumeric}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.PruningError.UnsafePruningPoint
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.SequencerStore.SequencerPruningResult
import com.digitalasset.canton.domain.sequencing.sequencer.{
CommitMode,
PruningError,
SequencerPruningStatus,
SequencerSnapshot,
WriteNotification,
}
import com.digitalasset.canton.lifecycle.CloseContext
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
@ -29,7 +23,7 @@ import com.digitalasset.canton.resource.{DbStorage, MemoryStorage, Storage}
import com.digitalasset.canton.sequencing.protocol.{MessageId, SequencedEvent}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.time.NonNegativeFiniteDuration
import com.digitalasset.canton.topology.{Member, UnauthenticatedMemberId}
import com.digitalasset.canton.topology.Member
import com.digitalasset.canton.tracing.{HasTraceContext, TraceContext, Traced}
import com.digitalasset.canton.util.EitherTUtil.condUnitET
import com.digitalasset.canton.util.FutureInstances.*
@ -462,19 +456,6 @@ trait SequencerStore extends NamedLogging with AutoCloseable {
traceContext: TraceContext
): Future[SequencerMemberId]
/** Unregister a disabled unauthenticated member.
* This should delete the member from the store.
*/
def unregisterUnauthenticatedMember(member: UnauthenticatedMemberId)(implicit
traceContext: TraceContext
): Future[Unit]
/** Evict unauthenticated member from the cache.
*/
final protected def evictFromCache(member: UnauthenticatedMemberId): Unit = {
memberCache.evict(member)
}
/** Lookup an existing member id for the given member.
* Will return a cached value if available.
* Return [[scala.None]] if no id exists.

View File

@ -7,6 +7,7 @@ import cats.data.EitherT
import cats.instances.future.*
import cats.syntax.either.*
import cats.syntax.foldable.*
import cats.syntax.functor.*
import com.daml.metrics.api.MetricsContext
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.ProtoDeserializationError.ProtoDeserializationFailure
@ -27,7 +28,6 @@ import com.digitalasset.canton.protocol.DomainParametersLookup.SequencerDomainPa
import com.digitalasset.canton.protocol.DynamicDomainParametersLookup
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.store.{
@ -140,85 +140,6 @@ object GrpcSequencerService {
protocolVersion,
)
/** Abstracts the steps that are different in processing the submission requests coming from the various sendAsync endpoints
* @tparam ProtoClass The scalapb generated class of the RPC request message
*/
private sealed trait SubmissionRequestProcessing[ProtoClass <: scalapb.GeneratedMessage] {
/** The Scala class to which the `ProtoClass` should deserialize to */
type ValueClass
/** Tries to parse the proto class to the value class, erroring if the request exceeds the given limit. */
def parse(
requestP: ProtoClass,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[ValueClass]
/** Extract the [[SubmissionRequest]] from the value class */
def unwrap(request: ValueClass): SubmissionRequest
/** Call the appropriate send method on the [[Sequencer]] */
def send(request: ValueClass, sequencer: Sequencer)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncError, Unit]
}
private object VersionedSignedSubmissionRequestProcessing
extends SubmissionRequestProcessing[v30.SendAsyncVersionedRequest] {
override type ValueClass = SignedContent[SubmissionRequest]
override def parse(
requestP: v30.SendAsyncVersionedRequest,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[SignedContent[SubmissionRequest]] = {
for {
signedContent <- SignedContent.fromByteString(protocolVersion)(
requestP.signedSubmissionRequest
)
signedSubmissionRequest <- signedContent.deserializeContent(
SubmissionRequest
.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)
)
} yield signedSubmissionRequest
}
override def unwrap(request: SignedContent[SubmissionRequest]): SubmissionRequest =
request.content
override def send(request: SignedContent[SubmissionRequest], sequencer: Sequencer)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] = sequencer.sendAsyncSigned(request)
}
private object VersionedUnsignedSubmissionRequestProcessing
extends SubmissionRequestProcessing[v30.SendAsyncUnauthenticatedVersionedRequest] {
override type ValueClass = SubmissionRequest
override def parse(
requestP: v30.SendAsyncUnauthenticatedVersionedRequest,
maxRequestSize: MaxRequestSize,
protocolVersion: ProtocolVersion,
): ParsingResult[SubmissionRequest] =
SubmissionRequest.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)(
requestP.submissionRequest
)
override def unwrap(request: SubmissionRequest): SubmissionRequest = request
override def send(request: SubmissionRequest, sequencer: Sequencer)(implicit
traceContext: TraceContext
): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] =
sequencer.sendAsync(request)
}
private sealed trait WrappedAcknowledgeRequest extends Product with Serializable {
def unwrap: AcknowledgeRequest
}
@ -269,26 +190,11 @@ class GrpcSequencerService(
override def sendAsyncVersioned(
requestP: v30.SendAsyncVersionedRequest
): Future[v30.SendAsyncVersionedResponse] =
validateAndSend(
requestP,
VersionedSignedSubmissionRequestProcessing,
isUsingAuthenticatedEndpoint = true,
).map(_.toSendAsyncVersionedResponseProto)
validateAndSend(requestP).map(_.toProtoV30)
override def sendAsyncUnauthenticatedVersioned(
requestP: v30.SendAsyncUnauthenticatedVersionedRequest
): Future[v30.SendAsyncUnauthenticatedVersionedResponse] =
validateAndSend(
requestP,
VersionedUnsignedSubmissionRequestProcessing,
isUsingAuthenticatedEndpoint = false,
).map(_.toSendAsyncUnauthenticatedVersionedResponseProto)
private def validateAndSend[ProtoClass <: scalapb.GeneratedMessage](
proto: ProtoClass,
processing: SubmissionRequestProcessing[ProtoClass],
isUsingAuthenticatedEndpoint: Boolean,
): Future[SendAsyncUnauthenticatedVersionedResponse] = {
private def validateAndSend(
proto: v30.SendAsyncVersionedRequest
): Future[SendAsyncVersionedResponse] = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
// This has to run at the beginning, because it reads from a thread-local.
@ -296,17 +202,24 @@ class GrpcSequencerService(
def parseAndValidate(
maxRequestSize: MaxRequestSize
): Either[SendAsyncError, processing.ValueClass] = for {
request <- processing
.parse(proto, maxRequestSize, protocolVersion)
): Either[SendAsyncError, SignedContent[SubmissionRequest]] = for {
signedContent <- SignedContent
.fromByteString(protocolVersion)(proto.signedSubmissionRequest)
.leftMap(requestDeserializationError(_, maxRequestSize))
signedSubmissionRequest <- signedContent
.deserializeContent(
SubmissionRequest
.fromByteString(protocolVersion)(
MaxRequestSizeToDeserialize.Limit(maxRequestSize.value)
)
)
.leftMap(requestDeserializationError(_, maxRequestSize))
_ <- validateSubmissionRequest(
proto.serializedSize,
processing.unwrap(request),
signedSubmissionRequest.content,
senderFromMetadata,
)
_ <- checkSenderPermission(processing.unwrap(request), isUsingAuthenticatedEndpoint)
} yield request
} yield signedSubmissionRequest
lazy val sendET = for {
domainParameters <- EitherT
@ -317,25 +230,25 @@ class GrpcSequencerService(
request <- EitherT.fromEither[FutureUnlessShutdown](
parseAndValidate(domainParameters.maxRequestSize)
)
_ <- checkRate(processing.unwrap(request)).mapK(FutureUnlessShutdown.outcomeK)
_ <- processing.send(request, sequencer)
_ <- checkRate(request.content).mapK(FutureUnlessShutdown.outcomeK)
_ <- sequencer.sendAsyncSigned(request)
} yield ()
performUnlessClosingUSF(functionFullName)(sendET.value.map { res =>
res.left.foreach { err =>
logger.info(s"Rejecting submission request by $senderFromMetadata with $err")
}
toSendAsyncUnauthenticatedVersionedResponse(res)
toSendAsyncVersionedResponse(res)
})
.onShutdown(
SendAsyncUnauthenticatedVersionedResponse(error = Some(SendAsyncError.ShuttingDown()))
SendAsyncVersionedResponse(error = Some(SendAsyncError.ShuttingDown()))
)
}
private def toSendAsyncUnauthenticatedVersionedResponse(
private def toSendAsyncVersionedResponse(
result: Either[SendAsyncError, Unit]
): SendAsyncUnauthenticatedVersionedResponse =
SendAsyncUnauthenticatedVersionedResponse(result.swap.toOption)
): SendAsyncVersionedResponse =
SendAsyncVersionedResponse(result.swap.toOption)
private def requestDeserializationError(
error: ProtoDeserializationError,
@ -354,29 +267,6 @@ class GrpcSequencerService(
SendAsyncError.RequestInvalid(message)
}
private def checkSenderPermission(
submissionRequest: SubmissionRequest,
isUsingAuthenticatedEndpoint: Boolean,
)(implicit traceContext: TraceContext): Either[SendAsyncError, Unit] = {
val sender = submissionRequest.sender
for {
_ <- Either.cond(
sender.isAuthenticated == isUsingAuthenticatedEndpoint,
(),
refuse(submissionRequest.messageId.toProtoPrimitive, sender)(
s"Sender $sender needs to use ${if (isUsingAuthenticatedEndpoint) "unauthenticated"
else "authenticated"} send operation"
),
)
_ <- sender match {
case authMember: AuthenticatedMember =>
checkAuthenticatedSendPermission(submissionRequest, authMember)
case unauthMember: UnauthenticatedMemberId =>
checkUnauthenticatedSendPermission(submissionRequest, unauthMember)
}
} yield ()
}
private def validateSubmissionRequest(
requestSize: Int,
request: SubmissionRequest,
@ -424,14 +314,6 @@ class GrpcSequencerService(
SequencerValidations.checkToAtMostOneMediator(request),
"Batch contains multiple mediators as recipients.",
)
_ <- refuseUnless(sender)(
noTopologyTimestampIfUnauthenticated(
sender,
request.topologyTimestamp,
request.batch.envelopes,
),
"Requests sent from or to unauthenticated members must not specify the topology timestamp",
)
_ <- request.aggregationRule.traverse_(validateAggregationRule(sender, messageId, _))
} yield {
metrics.publicApi.bytesProcessed.mark(requestSize.toLong)(MetricsContext.Empty)
@ -442,22 +324,6 @@ class GrpcSequencerService(
}
}
/** Reject requests that involve unauthenticated members and specify the topology timestamp.
* This is because the unauthenticated member typically does not know the domain topology state
* and therefore cannot validate that the requested timestamp is within the topology timestamp tolerance.
*/
private def noTopologyTimestampIfUnauthenticated(
sender: Member,
topologyTimestampO: Option[CantonTimestamp],
envelopes: Seq[ClosedEnvelope],
): Boolean =
topologyTimestampO.isEmpty || (sender.isAuthenticated && envelopes.forall(
_.recipients.allRecipients.forall {
case MemberRecipient(m) => m.isAuthenticated
case _ => true
}
))
private def validateAggregationRule(
sender: Member,
messageId: MessageId,
@ -481,46 +347,6 @@ class GrpcSequencerService(
SendAsyncError.RequestRefused(message)
}
private def checkAuthenticatedSendPermission(
request: SubmissionRequest,
sender: AuthenticatedMember,
)(implicit traceContext: TraceContext): Either[SendAsyncError, Unit] = sender match {
case _ =>
val unauthRecipients = request.batch.envelopes
.toSet[ClosedEnvelope]
.flatMap(_.recipients.allRecipients)
.collect { case MemberRecipient(unauthMember: UnauthenticatedMemberId) =>
unauthMember
}
Either.cond(
unauthRecipients.isEmpty,
(),
refuse(request.messageId.toProtoPrimitive, sender)(
s"Member is trying to send message to unauthenticated ${unauthRecipients.mkString(" ,")}. Only domain manager can do that."
),
)
}
private def checkUnauthenticatedSendPermission(
request: SubmissionRequest,
unauthenticatedMember: UnauthenticatedMemberId,
)(implicit traceContext: TraceContext): Either[SendAsyncError, Unit] = {
// unauthenticated member can only send messages to IDM
val nonIdmRecipients = request.batch.envelopes
.flatMap(_.recipients.allRecipients)
.filter {
case TopologyBroadcastAddress.recipient => false
case _ => true
}
Either.cond(
nonIdmRecipients.isEmpty,
(),
refuse(request.messageId.toProtoPrimitive, unauthenticatedMember)(
s"Unauthenticated member is trying to send message to members other than the topology broadcast address ${TopologyBroadcastAddress.recipient}"
),
)
}
private def checkRate(
request: SubmissionRequest
)(implicit
@ -602,25 +428,12 @@ class GrpcSequencerService(
subscribeInternal[v30.VersionedSubscriptionResponse](
request,
responseObserver,
requiresAuthentication = true,
toVersionSubscriptionResponseV0,
)
override def subscribeUnauthenticatedVersioned(
request: v30.SubscriptionRequest,
responseObserver: StreamObserver[v30.VersionedSubscriptionResponse],
): Unit =
subscribeInternal[v30.VersionedSubscriptionResponse](
request,
responseObserver,
requiresAuthentication = false,
toVersionSubscriptionResponseV0,
)
private def subscribeInternal[T](
request: v30.SubscriptionRequest,
responseObserver: StreamObserver[T],
requiresAuthentication: Boolean,
toSubscriptionResponse: OrdinarySerializedEvent => T,
): Unit = {
implicit val traceContext: TraceContext = TraceContextGrpc.fromGrpcContext
@ -637,7 +450,7 @@ class GrpcSequencerService(
(),
Status.UNAVAILABLE.withDescription("Domain is being shutdown."),
)
_ <- checkSubscriptionMemberPermission(member, requiresAuthentication)
_ <- checkSubscriptionMemberPermission(member)
authenticationTokenO = IdentityContextHelper.getCurrentStoredAuthenticationToken
_ <- subscriptionPool
.create(
@ -659,27 +472,10 @@ class GrpcSequencerService(
}
}
private def checkSubscriptionMemberPermission(member: Member, requiresAuthentication: Boolean)(
implicit traceContext: TraceContext
private def checkSubscriptionMemberPermission(member: Member)(implicit
traceContext: TraceContext
): Either[Status, Unit] =
(member, requiresAuthentication) match {
case (authMember: AuthenticatedMember, true) =>
checkAuthenticatedMemberPermission(authMember)
case (authMember: AuthenticatedMember, false) =>
Left(
Status.PERMISSION_DENIED.withDescription(
s"Member $authMember needs to use authenticated subscribe operation"
)
)
case (_: UnauthenticatedMemberId, false) =>
Right(())
case (unauthMember: UnauthenticatedMemberId, true) =>
Left(
Status.PERMISSION_DENIED.withDescription(
s"Member $unauthMember cannot use authenticated subscribe operation"
)
)
}
checkAuthenticatedMemberPermission(member)
override def acknowledgeSigned(
request: v30.AcknowledgeSignedRequest

View File

@ -23,7 +23,7 @@ import com.digitalasset.canton.sequencing.authentication.{
AuthenticationTokenManagerConfig,
}
import com.digitalasset.canton.time.SimClock
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.{DomainId, ParticipantId, UniqueIdentifier}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.{BaseTest, HasExecutionContext}
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
@ -93,11 +93,6 @@ class SequencerAuthenticationServerInterceptorTest
val participantId =
UniqueIdentifier.fromProtoPrimitive_("p1::default").map(new ParticipantId(_)).value
val unauthenticatedMemberId =
UniqueIdentifier
.fromProtoPrimitive_("unm1::default")
.map(new UnauthenticatedMemberId(_))
.value
val neverExpire = CantonTimestamp.MaxValue
val crypto = new SymbolicPureCrypto
val token = AuthenticationTokenWithExpiry(AuthenticationToken.generate(crypto), neverExpire)

View File

@ -26,13 +26,7 @@ import com.digitalasset.canton.topology.DefaultTestIdentities.{
participant2,
sequencerId,
}
import com.digitalasset.canton.topology.{
DomainMember,
Member,
SequencerId,
UnauthenticatedMemberId,
UniqueIdentifier,
}
import com.digitalasset.canton.topology.{Member, SequencerId, UniqueIdentifier}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.{BaseTest, SequencerCounter}
import com.google.protobuf.ByteString
@ -70,9 +64,6 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
private implicit val materializer: Materializer = mock[Materializer] // not used
private val unauthenticatedMemberId =
UniqueIdentifier.fromProtoPrimitive_("unm1::default").map(new UnauthenticatedMemberId(_)).value
class StubSequencer(existingMembers: Set[Member])
extends BaseSequencer(
loggerFactory,
@ -156,7 +147,7 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
traceContext: TraceContext
): EitherT[Future, String, SequencerSnapshot] =
???
override protected val localSequencerMember: DomainMember = sequencerId
override protected val localSequencerMember: Member = sequencerId
override protected def disableMemberInternal(member: Member)(implicit
traceContext: TraceContext
): Future[Unit] = Future.unit
@ -201,16 +192,7 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
name should {
"sends from an unauthenticated member should auto register this member" in {
val sequencer = new StubSequencer(existingMembers = Set(participant1))
val request =
submission(from = unauthenticatedMemberId, to = Set(participant1, participant2))
for {
_ <- send(sequencer)(request).value.failOnShutdown
} yield sequencer.newlyRegisteredMembers should contain only unauthenticatedMemberId
}
"sends from anyone else should not auto register" in {
"sends should not auto register" in {
val sequencer = new StubSequencer(existingMembers = Set(participant1))
val request = submission(from = participant1, to = Set(participant1, participant2))
@ -221,17 +203,6 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest {
}
}
"read" should {
"read from an unauthenticated member should auto register this member" in {
val sequencer = new StubSequencer(existingMembers = Set(participant1))
for {
_ <- sequencer
.read(unauthenticatedMemberId, SequencerCounter(0))
.value
} yield sequencer.newlyRegisteredMembers should contain only unauthenticatedMemberId
}
}
"health" should {
"onHealthChange should register listener and immediately call it with current status" in {
val sequencer = new StubSequencer(Set())

View File

@ -689,43 +689,6 @@ abstract class SequencerApiTest
}
}
}
"require all eligible senders be authenticated" onlyRunWhen testAggregation in { env =>
import env.*
val unauthenticatedMember =
UnauthenticatedMemberId(UniqueIdentifier.tryCreate("unauthenticated", "member"))
// TODO(i10412): See above
val aggregationRule = AggregationRule(
NonEmpty(Seq, p19, unauthenticatedMember),
PositiveInt.tryCreate(1),
testedProtocolVersion,
)
val messageId = MessageId.tryCreate("unreachable-threshold")
val request = SubmissionRequest.tryCreate(
p19,
messageId,
Batch.empty(testedProtocolVersion),
maxSequencingTime = CantonTimestamp.Epoch.add(Duration.ofSeconds(60)),
topologyTimestamp = None,
aggregationRule = Some(aggregationRule),
Option.empty[SequencingSubmissionCost],
testedProtocolVersion,
)
for {
_ <- sequencer.sendAsync(request).valueOrFailShutdown("Sent async")
reads <- readForMembers(Seq(p19), sequencer)
} yield {
checkRejection(reads, p19, messageId) {
case SequencerErrors.SubmissionRequestMalformed(reason) =>
reason should include(
"Eligible senders in aggregation rule must be authenticated, but found unauthenticated members"
)
}
}
}
}
}
}

View File

@ -30,7 +30,7 @@ import com.digitalasset.canton.domain.block.{
SequencerDriverHealthStatus,
}
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SignedOrderingRequest
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.SenderSigned
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerIntegration
import com.digitalasset.canton.domain.sequencing.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode
import com.digitalasset.canton.domain.sequencing.traffic.RateLimitManagerTesting
@ -43,6 +43,7 @@ import com.digitalasset.canton.sequencing.protocol.{
AcknowledgeRequest,
SendAsyncError,
SignedContent,
SubmissionRequest,
}
import com.digitalasset.canton.time.{Clock, SimClock}
import com.digitalasset.canton.topology.Member
@ -213,7 +214,7 @@ class BlockSequencerTest
override def close(): Unit = ()
// No need to implement these methods for the test
override def send(signedSubmission: SignedOrderingRequest)(implicit
override def send(signedSubmissionRequest: SenderSigned[SubmissionRequest])(implicit
traceContext: TraceContext
): EitherT[Future, SendAsyncError, Unit] = ???
override def health(implicit traceContext: TraceContext): Future[SequencerDriverHealthStatus] =

View File

@ -10,24 +10,13 @@ import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.data.{CantonTimestamp, Counter}
import com.digitalasset.canton.domain.sequencing.sequencer.DomainSequencingTestUtils.mockDeliverStoreEvent
import com.digitalasset.canton.domain.sequencing.sequencer.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.SaveLowerBoundError.BoundLowerThanExisting
import com.digitalasset.canton.domain.sequencing.sequencer.{
CommitMode,
DomainSequencingTestUtils,
SequencerMemberStatus,
SequencerPruningStatus,
SequencerSnapshot,
}
import com.digitalasset.canton.lifecycle.{FlagCloseable, HasCloseContext}
import com.digitalasset.canton.sequencing.protocol.{MessageId, SequencerErrors}
import com.digitalasset.canton.store.db.DbTest
import com.digitalasset.canton.time.NonNegativeFiniteDuration
import com.digitalasset.canton.topology.{
Member,
ParticipantId,
UnauthenticatedMemberId,
UniqueIdentifier,
}
import com.digitalasset.canton.topology.{Member, ParticipantId}
import com.digitalasset.canton.util.FutureInstances.*
import com.digitalasset.canton.{BaseTest, ProtocolVersionChecksAsyncWordSpec, SequencerCounter}
import com.google.protobuf.ByteString
@ -943,25 +932,6 @@ trait SequencerStoreTest
} yield succeed
}
"unregister unauthenticated members" in {
val env = Env()
import env.*
val unauthenticatedAlice: UnauthenticatedMemberId =
UnauthenticatedMemberId(UniqueIdentifier.tryCreate("alice_unauthenticated", "fingerprint"))
for {
id <- store.registerMember(unauthenticatedAlice, ts1)
aliceLookup1 <- store.lookupMember(unauthenticatedAlice)
_ = aliceLookup1 shouldBe Some(RegisteredMember(id, ts1))
_ <- store.unregisterUnauthenticatedMember(unauthenticatedAlice)
aliceLookup2 <- store.lookupMember(unauthenticatedAlice)
_ = aliceLookup2 shouldBe empty
// should also be idempotent
_ <- store.unregisterUnauthenticatedMember(unauthenticatedAlice)
} yield succeed
}
"validating commit mode" should {
"be successful during tests" in {
val store = mk()

View File

@ -78,8 +78,6 @@ class GrpcSequencerServiceTest
private lazy val participant = DefaultTestIdentities.participant1
private lazy val crypto = new SymbolicPureCrypto
private lazy val unauthenticatedMember =
UnauthenticatedMemberId.tryCreate(participant.namespace)(crypto)
class Environment(member: Member) extends Matchers {
val sequencer: Sequencer = mock[Sequencer]
@ -251,67 +249,45 @@ class GrpcSequencerServiceTest
signedContent(request.toByteString)
def sendProto(
versionedRequest: ByteString,
versionedSignedRequest: ByteString,
authenticated: Boolean,
versionedSignedRequest: ByteString
)(implicit
env: Environment
): Future[ParsingResult[SendAsyncUnauthenticatedVersionedResponse]] = {
): Future[ParsingResult[SendAsyncVersionedResponse]] = {
import env.*
if (!authenticated) {
val requestP = v30.SendAsyncUnauthenticatedVersionedRequest(versionedRequest)
val response = service.sendAsyncUnauthenticatedVersioned(requestP)
response.map(
SendAsyncUnauthenticatedVersionedResponse.fromSendAsyncUnauthenticatedVersionedResponseProto
)
} else {
val requestP = v30.SendAsyncVersionedRequest(versionedSignedRequest)
val response = service.sendAsyncVersioned(requestP)
val requestP = v30.SendAsyncVersionedRequest(versionedSignedRequest)
val response = service.sendAsyncVersioned(requestP)
response.map(SendAsyncUnauthenticatedVersionedResponse.fromSendAsyncVersionedResponseProto)
}
response.map(SendAsyncVersionedResponse.fromProtoV30)
}
def send(request: SubmissionRequest, authenticated: Boolean)(implicit
def send(request: SubmissionRequest)(implicit
env: Environment
): Future[ParsingResult[SendAsyncUnauthenticatedVersionedResponse]] = {
val signedRequest = signedSubmissionReq(request)
sendProto(
request.toByteString,
signedRequest.toByteString,
authenticated,
)
): Future[ParsingResult[SendAsyncVersionedResponse]] = {
sendProto(signedSubmissionReq(request).toByteString)
}
def sendAndCheckSucceed(request: SubmissionRequest)(implicit
env: Environment
): Future[Assertion] =
send(request, authenticated = true).map { responseP =>
send(request).map { responseP =>
responseP.value.error shouldBe None
}
def sendAndCheckError(
request: SubmissionRequest,
authenticated: Boolean = true,
request: SubmissionRequest
)(assertion: PartialFunction[SendAsyncError, Assertion])(implicit
env: Environment
): Future[Assertion] =
send(request, authenticated).map { responseP =>
send(request).map { responseP =>
assertion(responseP.value.error.value)
}
def sendProtoAndCheckError(
versionedRequest: ByteString,
versionedSignedRequest: ByteString,
assertion: PartialFunction[SendAsyncError, Assertion],
authenticated: Boolean = true,
)(implicit env: Environment): Future[Assertion] =
sendProto(
versionedRequest,
versionedSignedRequest,
authenticated,
).map { responseP =>
sendProto(versionedSignedRequest).map { responseP =>
assertion(responseP.value.error.value)
}
@ -332,7 +308,6 @@ class GrpcSequencerServiceTest
loggerFactory.assertLogs(
sendProtoAndCheckError(
VersionedMessage(requestV1.toByteString, 0).toByteString,
signedRequestV0.toByteString,
{ case SendAsyncError.RequestInvalid(message) =>
message should startWith("ValueConversionError(sender,Invalid member ``")
@ -367,7 +342,6 @@ class GrpcSequencerServiceTest
)
loggerFactory.assertLogs(
sendProtoAndCheckError(
VersionedMessage(requestV1.toByteString, 0).toByteString,
signedRequestV0.toByteString,
{ case SendAsyncError.RequestInvalid(message) =>
message should startWith(
@ -418,56 +392,14 @@ class GrpcSequencerServiceTest
)
}
"reject unauthenticated member that uses authenticated send" in { _ =>
val request = defaultRequest
.focus(_.sender)
.replace(unauthenticatedMember)
loggerFactory.assertLogs(
sendAndCheckError(request, authenticated = true) {
case SendAsyncError.RequestRefused(message) =>
message should include("needs to use unauthenticated send operation")
}(new Environment(unauthenticatedMember)),
_.warningMessage should include("needs to use unauthenticated send operation"),
)
}
"reject non domain manager authenticated member sending message to unauthenticated member" in {
implicit env =>
val request = defaultRequest
.focus(_.batch)
.replace(
Batch(
List(
ClosedEnvelope.create(
content,
Recipients.cc(unauthenticatedMember),
Seq.empty,
testedProtocolVersion,
)
),
testedProtocolVersion,
)
)
loggerFactory.assertLogs(
sendAndCheckError(request, authenticated = true) {
case SendAsyncError.RequestRefused(message) =>
message should include("Member is trying to send message to unauthenticated")
},
_.warningMessage should include(
"Member is trying to send message to unauthenticated"
),
)
}
"reject on confirmation rate excess" in { implicit env =>
def expectSuccess(): Future[Assertion] = {
sendAndCheckSucceed(defaultConfirmationRequest)
}
def expectOneSuccessOneOverloaded(): Future[Assertion] = {
val result1F = send(defaultConfirmationRequest, authenticated = true)
val result2F = send(defaultConfirmationRequest, authenticated = true)
val result1F = send(defaultConfirmationRequest)
val result2F = send(defaultConfirmationRequest)
for {
result1 <- result1F
result2 <- result2F
@ -592,63 +524,6 @@ class GrpcSequencerServiceTest
),
)
"reject requests to unauthenticated members with a signing key timestamps" in { implicit env =>
val request = defaultRequest
.focus(_.topologyTimestamp)
.replace(Some(CantonTimestamp.ofEpochSecond(1)))
.focus(_.batch)
.replace(
Batch(
List(
ClosedEnvelope.create(
content,
Recipients.cc(unauthenticatedMember),
Seq.empty,
testedProtocolVersion,
)
),
testedProtocolVersion,
)
)
loggerFactory.assertLogs(
sendAndCheckError(request) { case SendAsyncError.RequestRefused(message) =>
message should include(
"Requests sent from or to unauthenticated members must not specify the topology timestamp"
)
},
_.warningMessage should include(
"Requests sent from or to unauthenticated members must not specify the topology timestamp"
),
)
}
"reject unauthenticated eligible members in aggregation rule" in { implicit env =>
val request = defaultRequest
.focus(_.topologyTimestamp)
.replace(Some(CantonTimestamp.ofEpochSecond(1)))
.focus(_.aggregationRule)
.replace(
Some(
AggregationRule(
eligibleMembers = NonEmpty(Seq, participant, unauthenticatedMember),
threshold = PositiveInt.tryCreate(1),
testedProtocolVersion,
)
)
)
loggerFactory.assertLogs(
sendAndCheckError(request) { case SendAsyncError.RequestInvalid(message) =>
message should include(
"Eligible senders in aggregation rule must be authenticated, but found unauthenticated members"
)
},
_.warningMessage should include(
"Eligible senders in aggregation rule must be authenticated, but found unauthenticated members"
),
)
}
"reject unachievable threshold in aggregation rule" in { implicit env =>
val request = defaultRequest
.focus(_.topologyTimestamp)
@ -694,71 +569,6 @@ class GrpcSequencerServiceTest
),
)
}
"reject unauthenticated member sending message to anything other than broadcast" in { _ =>
val request = defaultRequest
.focus(_.sender)
.replace(unauthenticatedMember)
val errorMsg =
"Unauthenticated member is trying to send message to members other than the topology broadcast address All"
loggerFactory.assertLogs(
sendAndCheckError(request, authenticated = false) {
case SendAsyncError.RequestRefused(message) =>
message should include(errorMsg)
}(new Environment(unauthenticatedMember)),
_.warningMessage should include(errorMsg),
)
}
"reject authenticated member that uses unauthenticated send" in { implicit env =>
val request = defaultRequest
.focus(_.sender)
.replace(DefaultTestIdentities.participant1)
loggerFactory.assertLogs(
sendAndCheckError(request, authenticated = false) {
case SendAsyncError.RequestRefused(message) =>
message should include("needs to use authenticated send operation")
},
_.warningMessage should include("needs to use authenticated send operation"),
)
}
"reject requests from unauthenticated senders with a signing key timestamp" in { _ =>
val request = defaultRequest
.focus(_.sender)
.replace(unauthenticatedMember)
.focus(_.topologyTimestamp)
.replace(Some(CantonTimestamp.Epoch))
.focus(_.batch)
.replace(
Batch(
List(
ClosedEnvelope.create(
content,
Recipients.cc(DefaultTestIdentities.sequencerId),
Seq.empty,
testedProtocolVersion,
)
),
testedProtocolVersion,
)
)
loggerFactory.assertLogs(
sendAndCheckError(request, authenticated = false) {
case SendAsyncError.RequestRefused(message) =>
message should include(
"Requests sent from or to unauthenticated members must not specify the topology timestamp"
)
}(new Environment(unauthenticatedMember)),
_.warningMessage should include(
"Requests sent from or to unauthenticated members must not specify the topology timestamp"
),
)
}
}
"versionedSubscribe" should {
@ -828,42 +638,6 @@ class GrpcSequencerServiceTest
case Seq(StreamError(err: StatusException)) if err.getStatus.getCode == PERMISSION_DENIED =>
}
}
"return error if authenticated member sending request unauthenticated endpoint" in { env =>
val observer = new MockServerStreamObserver[v30.VersionedSubscriptionResponse]()
val requestP =
SubscriptionRequest(
participant,
SequencerCounter.Genesis,
testedProtocolVersion,
).toProtoV30
loggerFactory.suppressWarningsAndErrors {
env.service.subscribeUnauthenticatedVersioned(requestP, observer)
}
observer.items.toSeq should matchPattern {
case Seq(StreamError(err: StatusException)) if err.getStatus.getCode == PERMISSION_DENIED =>
}
}
"return error if unauthenticated member sending request authenticated endpoint" in { env =>
val observer = new MockServerStreamObserver[v30.VersionedSubscriptionResponse]()
val requestP =
SubscriptionRequest(
unauthenticatedMember,
SequencerCounter.Genesis,
testedProtocolVersion,
).toProtoV30
loggerFactory.suppressWarningsAndErrors {
env.service.subscribeVersioned(requestP, observer)
}
observer.items.toSeq should matchPattern {
case Seq(StreamError(err: StatusException)) if err.getStatus.getCode == PERMISSION_DENIED =>
}
}
}
def performAcknowledgeRequest(env: Environment)(request: AcknowledgeRequest) =

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --enable-interfaces=yes
name: carbonv1-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --enable-interfaces=yes
name: carbonv2-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
name: experimental-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --enable-interfaces=yes
name: model-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
name: package-management-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --enable-interfaces=yes
name: semantic-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
name: upgrade-tests
source: .
version: 1.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
name: upgrade-tests
source: .
version: 2.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
name: upgrade-tests
source: .
version: 3.0.0

View File

@ -14,7 +14,7 @@ import io.netty.handler.ssl.SslContext
final case class LedgerClientChannelConfiguration(
sslContext: Option[SslContext],
maxInboundMetadataSize: Int = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
maxInboundMessageSize: Int = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE,
maxInboundMessageSize: Int = LedgerClientChannelConfiguration.DefaultMaxInboundMessageSize,
) {
def builderFor(host: String, port: Int): NettyChannelBuilder = {
@ -29,6 +29,7 @@ final case class LedgerClientChannelConfiguration(
object LedgerClientChannelConfiguration {
val DefaultMaxInboundMessageSize: Int = 10 * 1024 * 1024
val InsecureDefaults: LedgerClientChannelConfiguration =
LedgerClientChannelConfiguration(sslContext = None)

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: JsonEncodingTest

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.dev
name: JsonEncodingTestDev

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240530.13105.0.v076ddc13
sdk-version: 3.1.0-snapshot.20240531.13108.0.v60488be0
build-options:
- --target=2.1
name: AdminWorkflows

View File

@ -966,9 +966,7 @@ final class RepairService(
choiceAuthorizers = None, // default (signatories + actingParties)
children = ImmArray.empty[LfNodeId],
exerciseResult = Some(LfValue.ValueNone),
// Not setting the contract key as the indexer deletes contract keys along with contracts.
// If the contract keys were needed, we'd have to reinterpret the contract to look up the key.
keyOpt = None,
keyOpt = c.metadata.maybeKeyWithMaintainers,
byKey = false,
version = c.rawContractInstance.contractInstance.version,
)

View File

@ -166,7 +166,7 @@ trait DomainRegistryHelpers extends FlagCloseable with NamedLogging { this: HasF
def ifParticipant[C](configO: Option[C]): Member => Option[C] = {
case _: ParticipantId => configO
case _ => None // unauthenticated members don't need it
case _ => None
}
SequencerClientFactory(
domainId,

View File

@ -1 +1 @@
20240531.13403.v0762c427
20240604.13418.v5318c201