diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/crypto/Encryption.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/crypto/Encryption.scala index 9953a3735d..37ddbdfa6e 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/crypto/Encryption.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/crypto/Encryption.scala @@ -207,10 +207,6 @@ final case class AsymmetricEncrypted[+M]( def encrypted: Encrypted[M] = new Encrypted(ciphertext) } -object AsymmetricEncrypted { - val noEncryptionFingerprint = Fingerprint.tryCreate("no-encryption") -} - /** Key schemes for asymmetric/hybrid encryption. */ sealed trait EncryptionKeyScheme extends Product with Serializable with PrettyPrinting { def name: String diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/protocol/messages/EncryptedViewMessage.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/protocol/messages/EncryptedViewMessage.scala index 6e2bfadbca..5db8fb1fe4 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/protocol/messages/EncryptedViewMessage.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/protocol/messages/EncryptedViewMessage.scala @@ -20,7 +20,6 @@ import com.digitalasset.canton.protocol.messages.EncryptedView.checkEncryptionKe import com.digitalasset.canton.protocol.messages.EncryptedViewMessageError.{ SessionKeyCreationError, SyncCryptoDecryptError, - WrongRandomnessLength, } import com.digitalasset.canton.protocol.messages.ProtocolMessage.ProtocolMessageContentCast import com.digitalasset.canton.protocol.{v30, *} @@ -386,91 +385,73 @@ object EncryptedViewMessage extends HasProtocolVersionedCompanion[EncryptedViewM .toEitherT[FutureUnlessShutdown] } yield randomness - encrypted.sessionKey - .collectFirst { - case AsymmetricEncrypted(ciphertext, encryptedFor) - // if we're using no encryption, it means we're using group addressing - // which currently does not support encryption of the randomness - if encryptedFor == AsymmetricEncrypted.noEncryptionFingerprint => - SecureRandomness - .fromByteString(randomnessLength)(encrypted.randomness.ciphertext) - .leftMap[EncryptedViewMessageError](_ => - WrongRandomnessLength(ciphertext.size(), randomnessLength) - ) - .toEitherT[FutureUnlessShutdown] - } - .getOrElse { - for { - /* We first need to check whether the target private encryption key exists and is active in the store; otherwise, - * we cannot decrypt and should abort. This situation can occur - * if an encryption key has been added to this participant's topology by another entity with the - * correct rights to do so, but this participant does not have the corresponding private key in the store. - */ - encryptionKeys <- EitherT - .right( - FutureUnlessShutdown.outcomeF(snapshot.ipsSnapshot.encryptionKeys(participantId)) - ) - .map(_.map(_.id).toSet) - encryptedSessionKeyForParticipant <- encrypted.sessionKey - .find(e => encryptionKeys.contains(e.encryptedFor)) - .toRight( - EncryptedViewMessageError.MissingParticipantKey(participantId) - ) - .toEitherT[FutureUnlessShutdown] - _ <- snapshot.crypto.cryptoPrivateStore - .existsDecryptionKey(encryptedSessionKeyForParticipant.encryptedFor) - .leftMap(err => EncryptedViewMessageError.PrivateKeyStoreVerificationError(err)) - .subflatMap { - Either.cond( - _, - (), - EncryptedViewMessageError.PrivateKeyStoreVerificationError( - FailedToReadKey( - encryptedSessionKeyForParticipant.encryptedFor, - "matching private key does not exist", - ) - ), + for { + /* We first need to check whether the target private encryption key exists and is active in the store; otherwise, + * we cannot decrypt and should abort. This situation can occur + * if an encryption key has been added to this participant's topology by another entity with the + * correct rights to do so, but this participant does not have the corresponding private key in the store. + */ + encryptionKeys <- EitherT + .right( + FutureUnlessShutdown.outcomeF(snapshot.ipsSnapshot.encryptionKeys(participantId)) + ) + .map(_.map(_.id).toSet) + encryptedSessionKeyForParticipant <- encrypted.sessionKey + .find(e => encryptionKeys.contains(e.encryptedFor)) + .toRight( + EncryptedViewMessageError.MissingParticipantKey(participantId) + ) + .toEitherT[FutureUnlessShutdown] + _ <- snapshot.crypto.cryptoPrivateStore + .existsDecryptionKey(encryptedSessionKeyForParticipant.encryptedFor) + .leftMap(err => EncryptedViewMessageError.PrivateKeyStoreVerificationError(err)) + .subflatMap { + Either.cond( + _, + (), + EncryptedViewMessageError.PrivateKeyStoreVerificationError( + FailedToReadKey( + encryptedSessionKeyForParticipant.encryptedFor, + "matching private key does not exist", ) - } - _ <- checkEncryptionKeyScheme( - snapshot.crypto.cryptoPublicStore, - encryptedSessionKeyForParticipant.encryptedFor, - allowedEncryptionKeySchemes, + ), ) - .leftMap(err => - EncryptedViewMessageError - .SyncCryptoDecryptError( - SyncCryptoDecryptionError(err) - ) + } + _ <- checkEncryptionKeyScheme( + snapshot.crypto.cryptoPublicStore, + encryptedSessionKeyForParticipant.encryptedFor, + allowedEncryptionKeySchemes, + ) + .leftMap(err => + EncryptedViewMessageError + .SyncCryptoDecryptError( + SyncCryptoDecryptionError(err) ) + ) - // we get the randomness for the session key from the message or by searching the cache, - // which means that a previous view with the same recipients has been received before. - skRandom <- - // we try to search for the cached session key randomness. If it does not exist - // (or is disabled) we decrypt and store it - // the result in the cache. There is no need to sync on this read-write operation because - // there is not problem if the value gets re-written. - sessionKeyStore - .getSessionKeyRandomness( - snapshot.crypto.privateCrypto, - encrypted.viewEncryptionScheme.keySizeInBytes, - encryptedSessionKeyForParticipant, - ) - .leftMap[EncryptedViewMessageError](err => - SyncCryptoDecryptError( - SyncCryptoDecryptionError(err) - ) - ) - viewRandomness <- decryptViewRandomness(skRandom) - } yield viewRandomness - } + // we get the randomness for the session key from the message or by searching the cache, + // which means that a previous view with the same recipients has been received before. + skRandom <- + // we try to search for the cached session key randomness. If it does not exist + // (or is disabled) we decrypt and store it + // the result in the cache. There is no need to sync on this read-write operation because + // there is not problem if the value gets re-written. + sessionKeyStore + .getSessionKeyRandomness( + snapshot.crypto.privateCrypto, + encrypted.viewEncryptionScheme.keySizeInBytes, + encryptedSessionKeyForParticipant, + ) + .leftMap[EncryptedViewMessageError](err => + SyncCryptoDecryptError( + SyncCryptoDecryptionError(err) + ) + ) + viewRandomness <- decryptViewRandomness(skRandom) + } yield viewRandomness } - final case class RecipientsInfo( - informeeParticipants: Set[ParticipantId], - doNotEncrypt: Boolean, - ) + final case class RecipientsInfo(informeeParticipants: Set[ParticipantId]) private def eitherT[VT <: ViewType, B](value: Either[EncryptedViewMessageError, B])(implicit ec: ExecutionContext diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/protocol/SequencerDeliverError.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/protocol/SequencerDeliverError.scala index 5d6d79b253..b965f44e58 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/protocol/SequencerDeliverError.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/protocol/SequencerDeliverError.scala @@ -7,7 +7,13 @@ import com.daml.error.* import com.digitalasset.canton.SequencerCounter import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.error.CantonErrorGroups.SequencerErrorGroup -import com.digitalasset.canton.error.{BaseCantonError, TransactionError, TransactionErrorImpl} +import com.digitalasset.canton.error.{ + Alarm, + AlarmErrorCode, + BaseCantonError, + TransactionError, + TransactionErrorImpl, +} import com.digitalasset.canton.topology.Member import com.google.rpc.status.Status @@ -36,15 +42,24 @@ sealed abstract class SequencerDeliverErrorCode(id: String, category: ErrorCateg @Explanation("""Delivery errors wrapped into sequenced events""") object SequencerErrors extends SequencerErrorGroup { - @Explanation("""This error occurs when the sequencer cannot parse the submission request.""") - @Resolution( - """This usually indicates a misconfiguration of the system components or an application bug and requires operator intervention.""" - ) + @Explanation(""" + |This error occurs when the sequencer receives an invalid submission request, e.g. it has an + |aggregation rule with an unreachable threshold. + |Malformed requests will not emit any deliver event. + |""".stripMargin) + @Resolution(""" + |Check if the sender is running an attack. + |If you can rule out an attack, please reach out to Canton support. + |""".stripMargin) case object SubmissionRequestMalformed - extends SequencerDeliverErrorCode( - id = "SEQUENCER_SUBMISSION_REQUEST_MALFORMED", - ErrorCategory.InvalidIndependentOfSystemState, - ) + extends AlarmErrorCode(id = "SEQUENCER_SUBMISSION_REQUEST_MALFORMED") { + final case class Error( + submissionRequest: SubmissionRequest, + error: String, + ) extends Alarm({ + s"Send request [${submissionRequest.messageId}] is malformed. Discarding request. $error" + }) + } @Explanation( """This error occurs when the sequencer cannot accept submission request due to the current state of the system.""" diff --git a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/traffic/TrafficControlProcessorTest.scala b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/traffic/TrafficControlProcessorTest.scala index 5ea5b77f6a..6a2e2d1d98 100644 --- a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/traffic/TrafficControlProcessorTest.scala +++ b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/traffic/TrafficControlProcessorTest.scala @@ -152,7 +152,7 @@ class TrafficControlProcessorTest extends AnyWordSpec with BaseTest with HasExec ts, domainId, MessageId.fromUuid(new UUID(0, 1)), - SequencerErrors.SubmissionRequestMalformed("Some error"), + SequencerErrors.SubmissionRequestRefused("Some error"), testedProtocolVersion, Option.empty[TrafficReceipt], ) diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/SubmissionRequestValidator.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/SubmissionRequestValidator.scala index bdbc353ecc..bdd5e92b35 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/SubmissionRequestValidator.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/SubmissionRequestValidator.scala @@ -781,24 +781,34 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[Future, SubmissionRequestOutcome, InFlightAggregationUpdate] = + ): EitherT[Future, SubmissionRequestOutcome, InFlightAggregationUpdate] = { + val rule = submissionRequest.aggregationRule.getOrElse( + ErrorUtil.internalError( + new IllegalStateException( + "A submission request with an aggregation id must have an aggregation rule" + ) + ) + ) + for { - inFlightAggregationAndUpdate <- { - EitherT - .fromOption(inFlightAggregationO, ()) - .map(_ -> InFlightAggregationUpdate.empty) - .leftFlatMap { _ => - val rule = submissionRequest.aggregationRule.getOrElse( - ErrorUtil.internalError( - new IllegalStateException( - "A submission request with an aggregation id must have an aggregation rule" - ) - ) + inFlightAggregationAndUpdate <- inFlightAggregationO match { + case None => + // New aggregation + validateAggregationRule(state, submissionRequest, sequencingTimestamp, rule).map { _ => + val fresh = FreshInFlightAggregation(submissionRequest.maxSequencingTime, rule) + InFlightAggregation.initial(fresh) -> InFlightAggregationUpdate( + Some(fresh), + Chain.empty, ) - validateAggregationRule(state, submissionRequest, sequencingTimestamp, rule) } + + case Some(inFlightAggregation) => + // Existing aggregation + wellFormedAggregationRule(submissionRequest, rule) + .map(_ => inFlightAggregation -> InFlightAggregationUpdate.empty) } (inFlightAggregation, inFlightAggregationUpdate) = inFlightAggregationAndUpdate + aggregatedSender = AggregatedSender( submissionRequest.sender, AggregationBySender( @@ -856,6 +866,7 @@ private[update] final class SubmissionRequestValidator( }, ) } yield fullInFlightAggregationUpdate + } private def validateAggregationRule( state: BlockUpdateEphemeralState, @@ -865,20 +876,10 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[Future, SubmissionRequestOutcome, (InFlightAggregation, InFlightAggregationUpdate)] = + ): EitherT[Future, SubmissionRequestOutcome, Unit] = for { - _ <- EitherT.fromEither( - SequencerValidations - .wellformedAggregationRule(submissionRequest.sender, rule) - .leftMap(message => - invalidSubmissionRequest( - state, - submissionRequest, - sequencingTimestamp, - SequencerErrors.SubmissionRequestMalformed(message), - ) - ) - ) + _ <- wellFormedAggregationRule(submissionRequest, rule) + unregisteredEligibleMembers <- { if (unifiedSequencer) { EitherT.right( @@ -909,10 +910,25 @@ private[update] final class SubmissionRequestValidator( ), ), ) - fresh = FreshInFlightAggregation(submissionRequest.maxSequencingTime, rule) - } yield InFlightAggregation.initial(fresh) -> InFlightAggregationUpdate( - Some(fresh), - Chain.empty, + } yield () + + private def wellFormedAggregationRule( + submissionRequest: SubmissionRequest, + rule: AggregationRule, + )(implicit + executionContext: ExecutionContext, + traceContext: TraceContext, + ): EitherT[Future, SubmissionRequestOutcome, Unit] = + EitherT.fromEither( + SequencerValidations + .wellformedAggregationRule(submissionRequest.sender, rule) + .leftMap { message => + val alarm = SequencerErrors.SubmissionRequestMalformed + .Error(submissionRequest, message) + alarm.report() + + SubmissionRequestOutcome.discardSubmissionRequest + } ) private def deliverReceipt( diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencerIntegration.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencerIntegration.scala index 82cab5d740..dccd81e0e7 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencerIntegration.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencerIntegration.scala @@ -6,11 +6,14 @@ package com.digitalasset.canton.domain.sequencing.sequencer import cats.data.EitherT import cats.syntax.parallel.* import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.sequencing.protocol.SendAsyncError import com.digitalasset.canton.topology.Member import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.FutureInstances.parallelFuture -import com.digitalasset.canton.util.MonadUtil +import com.digitalasset.canton.util.retry.RetryUtil.NoExnRetryable +import com.digitalasset.canton.util.{MonadUtil, retry} +import scala.concurrent.duration.* import scala.concurrent.{ExecutionContext, Future} /** This trait defines the interface for BlockSequencer's BlockUpdateGenerator to use on DatabaseSequencer @@ -49,6 +52,26 @@ object SequencerIntegration { trait DatabaseSequencerIntegration extends SequencerIntegration { this: DatabaseSequencer => + + private implicit val retryConditionIfOverloaded: retry.Success[Either[SendAsyncError, Unit]] = + new retry.Success({ + // We only retry overloaded as other possible error here: + // * Unavailable - indicates a programming bug and should not happen during normal operation + // * ShuttingDown - should not be retried as the sequencer is shutting down + case Left(SendAsyncError.Overloaded(_)) => false + case _ => true + }) + private val retryWithBackoff = retry.Backoff( + logger = logger, + flagCloseable = this, + maxRetries = retry.Forever, + // TODO(#18407): Consider making the values below configurable + initialDelay = 10.milliseconds, + maxDelay = 250.milliseconds, + operationName = "block-sequencer-write-internal", + longDescription = "Write the processed submissions into database sequencer store", + ) + override def blockSequencerAcknowledge(acknowledgements: Map[Member, CantonTimestamp])(implicit executionContext: ExecutionContext, traceContext: TraceContext, @@ -70,8 +93,13 @@ trait DatabaseSequencerIntegration extends SequencerIntegration { case _: SubmissionOutcome.Discard.type => EitherT.pure[Future, String](()) case outcome: DeliverableSubmissionOutcome => - this - .blockSequencerWriteInternal(outcome)(outcome.submissionTraceContext) - .leftMap(_.toString) + EitherT( + retryWithBackoff( + this + .blockSequencerWriteInternal(outcome)(outcome.submissionTraceContext) + .value, + NoExnRetryable, + ) + ).leftMap(_.toString) } } diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/Sequencer.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/Sequencer.scala index 0bba2cc68b..7beb7606c5 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/Sequencer.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/Sequencer.scala @@ -258,7 +258,7 @@ object Sequencer extends HasLoggerName { */ type SenderSigned[A <: HasCryptographicEvidence] = SignedContent[A] - /** Type alias for content that has been signed but the sequencer. The purpose of this is to identify which sequencer has processed a submission request, + /** Type alias for content that has been signed by the sequencer. The purpose of this is to identify which sequencer has processed a submission request, * such that after the request is ordered and processed by all sequencers, each sequencer knows which sequencer received the submission request. * The signature here will always be one of a sequencer. */ diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerConfig.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerConfig.scala index 9326b4e6c6..9ca1b50266 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerConfig.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerConfig.scala @@ -42,7 +42,7 @@ object DatabaseSequencerConfig { // TODO(#18407): Allow configuration of database sequencer as a part of unified sequencer // instead of hardcoding the values below private[sequencer] final case class ForBlockSequencer( - writer: SequencerWriterConfig = SequencerWriterConfig.LowLatency(), + writer: SequencerWriterConfig = SequencerWriterConfig.HighThroughput(), reader: SequencerReaderConfig = new SequencerReaderConfig { override val readBatchSize: Int = SequencerReaderConfig.defaultReadBatchSize override val checkpointInterval: NonNegativeFiniteDuration = diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerValidations.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerValidations.scala index 8f02e30ba0..063bd259e2 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerValidations.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerValidations.scala @@ -54,7 +54,7 @@ object SequencerValidations { _ <- Either.cond( eligibleSenders.contains(sender), (), - "Sender is not eligible according to the aggregation rule", + s"Sender [$sender] is not eligible according to the aggregation rule", ) } yield () } diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerApiTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerApiTest.scala index 99613c979b..106378e3f1 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerApiTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerApiTest.scala @@ -15,8 +15,8 @@ import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscrip import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.ExceededMaxSequencingTime import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer as CantonSequencer import com.digitalasset.canton.lifecycle.Lifecycle -import com.digitalasset.canton.logging.SuppressionRule import com.digitalasset.canton.logging.pretty.Pretty +import com.digitalasset.canton.logging.{LogEntry, SuppressionRule} import com.digitalasset.canton.sequencing.OrdinarySerializedEvent import com.digitalasset.canton.sequencing.protocol.SendAsyncError.RequestInvalid import com.digitalasset.canton.sequencing.protocol.* @@ -363,7 +363,7 @@ abstract class SequencerApiTest } } - "bounce on write path aggregate submissions with maxSequencingTime exceeding bound" onlyRunWhen (testAggregation) in { + "bounce on write path aggregate submissions with maxSequencingTime exceeding bound" onlyRunWhen testAggregation in { env => import env.* @@ -742,8 +742,9 @@ abstract class SequencerApiTest import env.* // TODO(i10412): See above + val faultyThreshold = PositiveInt.tryCreate(2) val aggregationRule = - AggregationRule(NonEmpty(Seq, p17, p17), PositiveInt.tryCreate(2), testedProtocolVersion) + AggregationRule(NonEmpty(Seq, p17, p17), faultyThreshold, testedProtocolVersion) val messageId = MessageId.tryCreate("unreachable-threshold") val request = SubmissionRequest.tryCreate( @@ -758,13 +759,27 @@ abstract class SequencerApiTest ) for { - _ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async") - reads <- readForMembers(Seq(p17), sequencer) + reads <- loggerFactory.assertLoggedWarningsAndErrorsSeq( + for { + _ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async") + reads <- readForMembers(Seq(p17), sequencer, timeout = 5.seconds) + } yield reads, + LogEntry.assertLogSeq( + Seq( + ( + _.shouldBeCantonError( + SequencerErrors.SubmissionRequestMalformed, + _ shouldBe s"Send request [$messageId] is malformed. " + + s"Discarding request. Threshold $faultyThreshold cannot be reached", + ), + "p17's submission generates an alarm", + ) + ) + ), + ) } yield { - checkRejection(reads, p17, messageId, defaultExpectedTrafficReceipt) { - case SequencerErrors.SubmissionRequestMalformed(reason) => - reason should include("Threshold 2 cannot be reached") - } + // p17 gets nothing + checkMessages(Seq(), reads) } } @@ -775,7 +790,7 @@ abstract class SequencerApiTest val aggregationRule = AggregationRule(NonEmpty(Seq, p17), PositiveInt.tryCreate(1), testedProtocolVersion) - val messageId = MessageId.tryCreate("unreachable-threshold") + val messageId = MessageId.tryCreate("first-sender-not-eligible") val request = SubmissionRequest.tryCreate( p18, messageId, @@ -788,13 +803,130 @@ abstract class SequencerApiTest ) for { - _ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async") - reads <- readForMembers(Seq(p18), sequencer) + reads <- loggerFactory.assertLoggedWarningsAndErrorsSeq( + for { + _ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async") + reads <- readForMembers(Seq(p18), sequencer, timeout = 5.seconds) + } yield reads, + LogEntry.assertLogSeq( + Seq( + ( + _.shouldBeCantonError( + SequencerErrors.SubmissionRequestMalformed, + _ shouldBe s"Send request [$messageId] is malformed. " + + s"Discarding request. Sender [$p18] is not eligible according to the aggregation rule", + ), + "p18's submission generates an alarm", + ) + ) + ), + ) } yield { - checkRejection(reads, p18, messageId, defaultExpectedTrafficReceipt) { - case SequencerErrors.SubmissionRequestMalformed(reason) => - reason should include("Sender is not eligible according to the aggregation rule") - } + // p18 gets nothing + checkMessages(Seq(), reads) + } + } + + "prevent non-eligible senders from contributing" onlyRunWhen testAggregation in { env => + import env.* + + val messageContent = "aggregatable-message" + val aggregationRule = + AggregationRule(NonEmpty(Seq, p1, p2), PositiveInt.tryCreate(2), testedProtocolVersion) + + val requestFromP1 = createSendRequest( + sender = p1, + messageContent, + Recipients.cc(p3), + maxSequencingTime = CantonTimestamp.Epoch.add(Duration.ofSeconds(60)), + aggregationRule = Some(aggregationRule), + topologyTimestamp = Some(CantonTimestamp.Epoch), + ) + + // Request with non-eligible sender + val messageId = MessageId.tryCreate("further-sender-not-eligible") + val requestFromP4 = requestFromP1.copy(sender = p4, messageId = messageId) + + val requestFromP2 = + requestFromP1.copy(sender = p2, messageId = MessageId.fromUuid(new UUID(1, 2))) + + for { + _ <- sequencer + .sendAsyncSigned(sign(requestFromP1)) + .valueOrFailShutdown("Sent async for participant1") + + readsForP1 <- readForMembers(Seq(p1), sequencer) + + readsForP4 <- loggerFactory.assertLoggedWarningsAndErrorsSeq( + for { + _ <- sequencer + .sendAsyncSigned(sign(requestFromP4)) + .valueOrFailShutdown("Sent async for non-eligible participant4") + reads <- readForMembers(Seq(p4), sequencer, timeout = 5.seconds) + } yield reads, + LogEntry.assertLogSeq( + Seq( + ( + _.shouldBeCantonError( + SequencerErrors.SubmissionRequestMalformed, + _ shouldBe s"Send request [$messageId] is malformed. " + + s"Discarding request. Sender [$p4] is not eligible according to the aggregation rule", + ), + "p4's submission generates an alarm", + ) + ) + ), + ) + + _ <- sequencer + .sendAsyncSigned(sign(requestFromP2)) + .valueOrFailShutdown("Sent async for participant2") + + readsForP2 <- readForMembers(Seq(p2), sequencer) + readsForP3 <- readForMembers(Seq(p3), sequencer) + } yield { + // p1 gets the receipt immediately + checkMessages( + Seq( + EventDetails( + SequencerCounter.Genesis, + p1, + Some(requestFromP1.messageId), + defaultExpectedTrafficReceipt, + ) + ), + readsForP1, + ) + + // p2 gets the receipt only + checkMessages( + Seq( + EventDetails( + SequencerCounter.Genesis, + p2, + Some(requestFromP2.messageId), + defaultExpectedTrafficReceipt, + ) + ), + readsForP2, + ) + + // p3 gets the message + checkMessages( + Seq( + EventDetails( + SequencerCounter.Genesis, + p3, + None, + None, + EnvelopeDetails(messageContent, Recipients.cc(p3)), + ) + ), + readsForP3, + ) + + // p4 gets nothing + checkMessages(Seq(), readsForP4) } } diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/service/GrpcSequencerServiceTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/service/GrpcSequencerServiceTest.scala index c7bf385532..236b66a98b 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/service/GrpcSequencerServiceTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/service/GrpcSequencerServiceTest.scala @@ -560,10 +560,12 @@ class GrpcSequencerServiceTest ) loggerFactory.assertLogs( sendAndCheckError(request) { case SendAsyncError.RequestInvalid(message) => - message should include("Sender is not eligible according to the aggregation rule") + message should include( + s"Sender [$participant] is not eligible according to the aggregation rule" + ) }, _.warningMessage should include( - "Sender is not eligible according to the aggregation rule" + s"Sender [$participant] is not eligible according to the aggregation rule" ), ) } diff --git a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/admin/command_inspection_service.proto b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/admin/command_inspection_service.proto index 08e5e56de6..2b7a5cdae7 100644 --- a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/admin/command_inspection_service.proto +++ b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/admin/command_inspection_service.proto @@ -43,31 +43,35 @@ message GetCommandStatusRequest { } message GetCommandStatusResponse { - message CommandStatus { - google.protobuf.Timestamp started = 1; - google.protobuf.Timestamp completed = 2; - Completion completion = 3; - CommandState state = 4; - repeated Command commands = 5; - message RequestStatistics { - uint32 envelopes = 1; - uint32 request_size = 2; - uint32 recipients = 3; - } - RequestStatistics request_statistics = 6; - message CommandUpdates { - message Contract { - Identifier template_id = 1; - string contract_id = 2; - Value contract_key = 3; - } - repeated Contract created = 1; - repeated Contract archived = 2; - uint32 exercised = 3; - uint32 fetched = 4; - uint32 looked_up_by_key = 5; - } - CommandUpdates updates = 7; - } repeated CommandStatus command_status = 1; } + +message CommandStatus { + google.protobuf.Timestamp started = 1; + google.protobuf.Timestamp completed = 2; + Completion completion = 3; + CommandState state = 4; + repeated Command commands = 5; + RequestStatistics request_statistics = 6; + CommandUpdates updates = 7; +} + +message RequestStatistics { + uint32 envelopes = 1; + uint32 request_size = 2; + uint32 recipients = 3; +} + +message CommandUpdates { + repeated Contract created = 1; + repeated Contract archived = 2; + uint32 exercised = 3; + uint32 fetched = 4; + uint32 looked_up_by_key = 5; +} + +message Contract { + Identifier template_id = 1; + string contract_id = 2; + Value contract_key = 3; +} diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/execution/CommandProgressTracker.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/execution/CommandProgressTracker.scala index 5163e83acd..e8212edbd5 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/execution/CommandProgressTracker.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/execution/CommandProgressTracker.scala @@ -4,13 +4,11 @@ package com.digitalasset.canton.platform.apiserver.execution import com.daml.error.utils.DecodedCantonError -import com.daml.ledger.api.v2.admin.command_inspection_service.GetCommandStatusResponse.CommandStatus.{ - CommandUpdates, - RequestStatistics, -} import com.daml.ledger.api.v2.admin.command_inspection_service.{ CommandState, - GetCommandStatusResponse, + CommandStatus as ApiCommandStatus, + CommandUpdates, + RequestStatistics, } import com.daml.ledger.api.v2.commands.Command import com.daml.ledger.api.v2.completion.Completion @@ -35,8 +33,8 @@ final case class CommandStatus( requestStatistics: RequestStatistics, updates: CommandUpdates, ) extends PrettyPrinting { - def toProto: GetCommandStatusResponse.CommandStatus = { - GetCommandStatusResponse.CommandStatus( + def toProto: ApiCommandStatus = { + ApiCommandStatus( started = Some(started.toProtoTimestamp), completed = completed.map(_.toProtoTimestamp), completion = Some(completion), @@ -92,9 +90,9 @@ final case class CommandStatus( object CommandStatus { def fromProto( - proto: GetCommandStatusResponse.CommandStatus + proto: ApiCommandStatus ): Either[ProtoDeserializationError, CommandStatus] = { - val GetCommandStatusResponse.CommandStatus( + val ApiCommandStatus( startedP, completedP, completionP, diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/submission/EncryptedViewMessageFactory.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/submission/EncryptedViewMessageFactory.scala index 58d5663185..3be56673bd 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/submission/EncryptedViewMessageFactory.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/submission/EncryptedViewMessageFactory.scala @@ -20,7 +20,6 @@ import com.digitalasset.canton.store.SessionKeyStore.RecipientGroup import com.digitalasset.canton.topology.{DomainId, ParticipantId} import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.version.{HasVersionedToByteString, ProtocolVersion} -import com.google.protobuf.ByteString import scala.concurrent.{ExecutionContext, Future} @@ -65,10 +64,7 @@ object EncryptedViewMessageFactory { partiesWithGroupAddressing <- EitherT.right( cryptoSnapshot.ipsSnapshot.partiesWithGroupAddressing(informeeParties) ) - } yield RecipientsInfo( - informeeParticipants = informeeParticipants, - doNotEncrypt = partiesWithGroupAddressing.nonEmpty, - ) + } yield RecipientsInfo(informeeParticipants = informeeParticipants) } def generateAndEncryptSessionKeyRandomness( @@ -172,45 +168,25 @@ object EncryptedViewMessageFactory { signature: Option[Signature], encryptedView: EncryptedView[VT], ): EitherT[Future, EncryptedViewMessageCreationError, EncryptedViewMessage[VT]] = - (if (!recipientsInfo.doNotEncrypt) { - for { - sessionKeyAndRandomnessMap <- getSessionKey(recipientsInfo) - (sessionKey, sessionKeyRandomnessMap) = sessionKeyAndRandomnessMap - sessionKeyRandomnessMapNE <- EitherT.fromEither[Future]( - NonEmpty - .from(sessionKeyRandomnessMap) - .toRight( - UnableToDetermineSessionKeyRandomness( - "The session key randomness map is empty" - ) - ) - ) - encryptedSessionKeyInfo <- encryptRandomnessWithSessionKey(sessionKey).map( - encryptedRandomness => (encryptedRandomness, sessionKeyRandomnessMapNE) - ) - } yield encryptedSessionKeyInfo - } else { - val encryptedRandomness = Encrypted.fromByteString[SecureRandomness](randomness.unwrap) - eitherT( - Right( - ( - encryptedRandomness, - NonEmpty( - Seq, - AsymmetricEncrypted[SecureRandomness]( - ByteString.EMPTY, - AsymmetricEncrypted.noEncryptionFingerprint, - ), - ), - ) - ) - ) - }).map { case (randomnessV2, sessionKeyMap) => + for { + sessionKeyAndRandomnessMap <- getSessionKey(recipientsInfo) + (sessionKey, sessionKeyRandomnessMap) = sessionKeyAndRandomnessMap + sessionKeyRandomnessMapNE <- EitherT.fromEither[Future]( + NonEmpty + .from(sessionKeyRandomnessMap) + .toRight( + UnableToDetermineSessionKeyRandomness( + "The session key randomness map is empty" + ) + ) + ) + encryptedRandomness <- encryptRandomnessWithSessionKey(sessionKey) + } yield { EncryptedViewMessage[VT]( signature, viewTree.viewHash, - randomnessV2, - sessionKeyMap, + encryptedRandomness, + sessionKeyRandomnessMapNE, encryptedView, viewTree.domainId, viewEncryptionScheme, @@ -277,7 +253,6 @@ object EncryptedViewMessageFactory { cryptoSnapshot.domainId, ): EncryptedViewMessageCreationError } - .map(_.toMap) sealed trait EncryptedViewMessageCreationError extends Product diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/sync/CommandProgressTrackerImpl.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/sync/CommandProgressTrackerImpl.scala index 05af00c9bd..e69015596a 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/sync/CommandProgressTrackerImpl.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/sync/CommandProgressTrackerImpl.scala @@ -3,9 +3,10 @@ package com.digitalasset.canton.participant.sync -import com.daml.ledger.api.v2.admin.command_inspection_service.CommandState -import com.daml.ledger.api.v2.admin.command_inspection_service.GetCommandStatusResponse.CommandStatus.{ +import com.daml.ledger.api.v2.admin.command_inspection_service.{ + CommandState, CommandUpdates, + Contract, RequestStatistics, } import com.daml.ledger.api.v2.commands.Command @@ -129,8 +130,8 @@ class CommandProgressTrackerImpl( def recordTransactionImpact( transaction: LfSubmittedTransaction ): Unit = { - val creates = mutable.ListBuffer.empty[CommandUpdates.Contract] - val archives = mutable.ListBuffer.empty[CommandUpdates.Contract] + val creates = mutable.ListBuffer.empty[Contract] + val archives = mutable.ListBuffer.empty[Contract] final case class Stats( exercised: Int = 0, fetched: Int = 0, @@ -140,8 +141,8 @@ class CommandProgressTrackerImpl( templateId: TypeConName, coid: String, keyOpt: Option[GlobalKeyWithMaintainers], - ): CommandUpdates.Contract = { - CommandUpdates.Contract( + ): Contract = { + Contract( templateId = Some( Identifier( templateId.packageId, diff --git a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/MessageDispatcherTest.scala b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/MessageDispatcherTest.scala index 334d1a4a3f..bce6930633 100644 --- a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/MessageDispatcherTest.scala +++ b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/MessageDispatcherTest.scala @@ -1284,7 +1284,7 @@ trait MessageDispatcherTest { CantonTimestamp.ofEpochSecond(3), domainId, messageId3, - SequencerErrors.SubmissionRequestMalformed("invalid batch"), + SequencerErrors.SubmissionRequestRefused("invalid batch"), testedProtocolVersion, Option.empty[TrafficReceipt], ) diff --git a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/submission/TransactionConfirmationRequestFactoryTest.scala b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/submission/TransactionConfirmationRequestFactoryTest.scala index e81eaf0919..e23b9acc4b 100644 --- a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/submission/TransactionConfirmationRequestFactoryTest.scala +++ b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/submission/TransactionConfirmationRequestFactoryTest.scala @@ -28,7 +28,7 @@ import com.digitalasset.canton.protocol.ExampleTransactionFactory.* import com.digitalasset.canton.protocol.WellFormedTransaction.{WithSuffixes, WithoutSuffixes} import com.digitalasset.canton.protocol.* import com.digitalasset.canton.protocol.messages.* -import com.digitalasset.canton.sequencing.protocol.{MediatorGroupRecipient, OpenEnvelope, Recipient} +import com.digitalasset.canton.sequencing.protocol.{MediatorGroupRecipient, OpenEnvelope} import com.digitalasset.canton.store.SessionKeyStore.RecipientGroup import com.digitalasset.canton.store.SessionKeyStoreWithInMemoryCache import com.digitalasset.canton.topology.MediatorGroup.MediatorGroupIndex @@ -246,7 +246,7 @@ class TransactionConfirmationRequestFactoryTest val cryptoPureApi = cryptoSnapshot.pureCrypto val viewEncryptionScheme = cryptoPureApi.defaultSymmetricKeyScheme - val privateKeysetCache: TrieMap[NonEmpty[Set[Recipient]], SecureRandomness] = + val privateKeysetCache: TrieMap[NonEmpty[Set[ParticipantId]], SecureRandomness] = TrieMap.empty val expectedTransactionViewMessages = example.transactionViewTreesWithWitnesses.map { @@ -307,7 +307,9 @@ class TransactionConfirmationRequestFactoryTest { // simulates session key cache val keySeedSession = privateKeysetCache.getOrElseUpdate( - recipients.leafRecipients, + NonEmpty + .from(participants) + .getOrElse(fail("View without active participants of informees")), cryptoPureApi .computeHkdf( cryptoPureApi.generateSecureRandomness(keySeed.unwrap.size()).unwrap, diff --git a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/store/InFlightSubmissionStoreTest.scala b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/store/InFlightSubmissionStoreTest.scala index 75a5dad1eb..d75f362d2e 100644 --- a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/store/InFlightSubmissionStoreTest.scala +++ b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/store/InFlightSubmissionStoreTest.scala @@ -52,7 +52,7 @@ trait InFlightSubmissionStoreTest extends AsyncWordSpec with BaseTest { completionInfo, TransactionSubmissionTrackingData.CauseWithTemplate( SequencerErrors - .SubmissionRequestMalformed("Some invalid batch") + .SubmissionRequestRefused("Some invalid batch") .rpcStatusWithoutLoggingContext() ), DomainId.tryFromString("da::default"), diff --git a/sdk/canton/ref b/sdk/canton/ref index 57b7f21202..bd77e54e7a 100644 --- a/sdk/canton/ref +++ b/sdk/canton/ref @@ -1 +1 @@ -20240626.13546.vc5b68d04 +20240626.13552.v5a246d80