mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-14 18:24:27 +03:00
update canton to 20240626.13552.v5a246d80 (#19462)
* update canton to 20240626.13552.v5a246d80 * Remove the value.proto hopefully one last time --------- Co-authored-by: Remy Haemmerle <Remy.Haemmerle@daml.com> Co-authored-by: Marcin Ziolek <marcin.ziolek@digitalasset.com>
This commit is contained in:
parent
64128328d9
commit
bfd1235ef2
@ -207,10 +207,6 @@ final case class AsymmetricEncrypted[+M](
|
||||
def encrypted: Encrypted[M] = new Encrypted(ciphertext)
|
||||
}
|
||||
|
||||
object AsymmetricEncrypted {
|
||||
val noEncryptionFingerprint = Fingerprint.tryCreate("no-encryption")
|
||||
}
|
||||
|
||||
/** Key schemes for asymmetric/hybrid encryption. */
|
||||
sealed trait EncryptionKeyScheme extends Product with Serializable with PrettyPrinting {
|
||||
def name: String
|
||||
|
@ -20,7 +20,6 @@ import com.digitalasset.canton.protocol.messages.EncryptedView.checkEncryptionKe
|
||||
import com.digitalasset.canton.protocol.messages.EncryptedViewMessageError.{
|
||||
SessionKeyCreationError,
|
||||
SyncCryptoDecryptError,
|
||||
WrongRandomnessLength,
|
||||
}
|
||||
import com.digitalasset.canton.protocol.messages.ProtocolMessage.ProtocolMessageContentCast
|
||||
import com.digitalasset.canton.protocol.{v30, *}
|
||||
@ -386,91 +385,73 @@ object EncryptedViewMessage extends HasProtocolVersionedCompanion[EncryptedViewM
|
||||
.toEitherT[FutureUnlessShutdown]
|
||||
} yield randomness
|
||||
|
||||
encrypted.sessionKey
|
||||
.collectFirst {
|
||||
case AsymmetricEncrypted(ciphertext, encryptedFor)
|
||||
// if we're using no encryption, it means we're using group addressing
|
||||
// which currently does not support encryption of the randomness
|
||||
if encryptedFor == AsymmetricEncrypted.noEncryptionFingerprint =>
|
||||
SecureRandomness
|
||||
.fromByteString(randomnessLength)(encrypted.randomness.ciphertext)
|
||||
.leftMap[EncryptedViewMessageError](_ =>
|
||||
WrongRandomnessLength(ciphertext.size(), randomnessLength)
|
||||
)
|
||||
.toEitherT[FutureUnlessShutdown]
|
||||
}
|
||||
.getOrElse {
|
||||
for {
|
||||
/* We first need to check whether the target private encryption key exists and is active in the store; otherwise,
|
||||
* we cannot decrypt and should abort. This situation can occur
|
||||
* if an encryption key has been added to this participant's topology by another entity with the
|
||||
* correct rights to do so, but this participant does not have the corresponding private key in the store.
|
||||
*/
|
||||
encryptionKeys <- EitherT
|
||||
.right(
|
||||
FutureUnlessShutdown.outcomeF(snapshot.ipsSnapshot.encryptionKeys(participantId))
|
||||
)
|
||||
.map(_.map(_.id).toSet)
|
||||
encryptedSessionKeyForParticipant <- encrypted.sessionKey
|
||||
.find(e => encryptionKeys.contains(e.encryptedFor))
|
||||
.toRight(
|
||||
EncryptedViewMessageError.MissingParticipantKey(participantId)
|
||||
)
|
||||
.toEitherT[FutureUnlessShutdown]
|
||||
_ <- snapshot.crypto.cryptoPrivateStore
|
||||
.existsDecryptionKey(encryptedSessionKeyForParticipant.encryptedFor)
|
||||
.leftMap(err => EncryptedViewMessageError.PrivateKeyStoreVerificationError(err))
|
||||
.subflatMap {
|
||||
Either.cond(
|
||||
_,
|
||||
(),
|
||||
EncryptedViewMessageError.PrivateKeyStoreVerificationError(
|
||||
FailedToReadKey(
|
||||
encryptedSessionKeyForParticipant.encryptedFor,
|
||||
"matching private key does not exist",
|
||||
)
|
||||
),
|
||||
for {
|
||||
/* We first need to check whether the target private encryption key exists and is active in the store; otherwise,
|
||||
* we cannot decrypt and should abort. This situation can occur
|
||||
* if an encryption key has been added to this participant's topology by another entity with the
|
||||
* correct rights to do so, but this participant does not have the corresponding private key in the store.
|
||||
*/
|
||||
encryptionKeys <- EitherT
|
||||
.right(
|
||||
FutureUnlessShutdown.outcomeF(snapshot.ipsSnapshot.encryptionKeys(participantId))
|
||||
)
|
||||
.map(_.map(_.id).toSet)
|
||||
encryptedSessionKeyForParticipant <- encrypted.sessionKey
|
||||
.find(e => encryptionKeys.contains(e.encryptedFor))
|
||||
.toRight(
|
||||
EncryptedViewMessageError.MissingParticipantKey(participantId)
|
||||
)
|
||||
.toEitherT[FutureUnlessShutdown]
|
||||
_ <- snapshot.crypto.cryptoPrivateStore
|
||||
.existsDecryptionKey(encryptedSessionKeyForParticipant.encryptedFor)
|
||||
.leftMap(err => EncryptedViewMessageError.PrivateKeyStoreVerificationError(err))
|
||||
.subflatMap {
|
||||
Either.cond(
|
||||
_,
|
||||
(),
|
||||
EncryptedViewMessageError.PrivateKeyStoreVerificationError(
|
||||
FailedToReadKey(
|
||||
encryptedSessionKeyForParticipant.encryptedFor,
|
||||
"matching private key does not exist",
|
||||
)
|
||||
}
|
||||
_ <- checkEncryptionKeyScheme(
|
||||
snapshot.crypto.cryptoPublicStore,
|
||||
encryptedSessionKeyForParticipant.encryptedFor,
|
||||
allowedEncryptionKeySchemes,
|
||||
),
|
||||
)
|
||||
.leftMap(err =>
|
||||
EncryptedViewMessageError
|
||||
.SyncCryptoDecryptError(
|
||||
SyncCryptoDecryptionError(err)
|
||||
)
|
||||
}
|
||||
_ <- checkEncryptionKeyScheme(
|
||||
snapshot.crypto.cryptoPublicStore,
|
||||
encryptedSessionKeyForParticipant.encryptedFor,
|
||||
allowedEncryptionKeySchemes,
|
||||
)
|
||||
.leftMap(err =>
|
||||
EncryptedViewMessageError
|
||||
.SyncCryptoDecryptError(
|
||||
SyncCryptoDecryptionError(err)
|
||||
)
|
||||
)
|
||||
|
||||
// we get the randomness for the session key from the message or by searching the cache,
|
||||
// which means that a previous view with the same recipients has been received before.
|
||||
skRandom <-
|
||||
// we try to search for the cached session key randomness. If it does not exist
|
||||
// (or is disabled) we decrypt and store it
|
||||
// the result in the cache. There is no need to sync on this read-write operation because
|
||||
// there is not problem if the value gets re-written.
|
||||
sessionKeyStore
|
||||
.getSessionKeyRandomness(
|
||||
snapshot.crypto.privateCrypto,
|
||||
encrypted.viewEncryptionScheme.keySizeInBytes,
|
||||
encryptedSessionKeyForParticipant,
|
||||
)
|
||||
.leftMap[EncryptedViewMessageError](err =>
|
||||
SyncCryptoDecryptError(
|
||||
SyncCryptoDecryptionError(err)
|
||||
)
|
||||
)
|
||||
viewRandomness <- decryptViewRandomness(skRandom)
|
||||
} yield viewRandomness
|
||||
}
|
||||
// we get the randomness for the session key from the message or by searching the cache,
|
||||
// which means that a previous view with the same recipients has been received before.
|
||||
skRandom <-
|
||||
// we try to search for the cached session key randomness. If it does not exist
|
||||
// (or is disabled) we decrypt and store it
|
||||
// the result in the cache. There is no need to sync on this read-write operation because
|
||||
// there is not problem if the value gets re-written.
|
||||
sessionKeyStore
|
||||
.getSessionKeyRandomness(
|
||||
snapshot.crypto.privateCrypto,
|
||||
encrypted.viewEncryptionScheme.keySizeInBytes,
|
||||
encryptedSessionKeyForParticipant,
|
||||
)
|
||||
.leftMap[EncryptedViewMessageError](err =>
|
||||
SyncCryptoDecryptError(
|
||||
SyncCryptoDecryptionError(err)
|
||||
)
|
||||
)
|
||||
viewRandomness <- decryptViewRandomness(skRandom)
|
||||
} yield viewRandomness
|
||||
}
|
||||
|
||||
final case class RecipientsInfo(
|
||||
informeeParticipants: Set[ParticipantId],
|
||||
doNotEncrypt: Boolean,
|
||||
)
|
||||
final case class RecipientsInfo(informeeParticipants: Set[ParticipantId])
|
||||
|
||||
private def eitherT[VT <: ViewType, B](value: Either[EncryptedViewMessageError, B])(implicit
|
||||
ec: ExecutionContext
|
||||
|
@ -7,7 +7,13 @@ import com.daml.error.*
|
||||
import com.digitalasset.canton.SequencerCounter
|
||||
import com.digitalasset.canton.data.CantonTimestamp
|
||||
import com.digitalasset.canton.error.CantonErrorGroups.SequencerErrorGroup
|
||||
import com.digitalasset.canton.error.{BaseCantonError, TransactionError, TransactionErrorImpl}
|
||||
import com.digitalasset.canton.error.{
|
||||
Alarm,
|
||||
AlarmErrorCode,
|
||||
BaseCantonError,
|
||||
TransactionError,
|
||||
TransactionErrorImpl,
|
||||
}
|
||||
import com.digitalasset.canton.topology.Member
|
||||
import com.google.rpc.status.Status
|
||||
|
||||
@ -36,15 +42,24 @@ sealed abstract class SequencerDeliverErrorCode(id: String, category: ErrorCateg
|
||||
|
||||
@Explanation("""Delivery errors wrapped into sequenced events""")
|
||||
object SequencerErrors extends SequencerErrorGroup {
|
||||
@Explanation("""This error occurs when the sequencer cannot parse the submission request.""")
|
||||
@Resolution(
|
||||
"""This usually indicates a misconfiguration of the system components or an application bug and requires operator intervention."""
|
||||
)
|
||||
@Explanation("""
|
||||
|This error occurs when the sequencer receives an invalid submission request, e.g. it has an
|
||||
|aggregation rule with an unreachable threshold.
|
||||
|Malformed requests will not emit any deliver event.
|
||||
|""".stripMargin)
|
||||
@Resolution("""
|
||||
|Check if the sender is running an attack.
|
||||
|If you can rule out an attack, please reach out to Canton support.
|
||||
|""".stripMargin)
|
||||
case object SubmissionRequestMalformed
|
||||
extends SequencerDeliverErrorCode(
|
||||
id = "SEQUENCER_SUBMISSION_REQUEST_MALFORMED",
|
||||
ErrorCategory.InvalidIndependentOfSystemState,
|
||||
)
|
||||
extends AlarmErrorCode(id = "SEQUENCER_SUBMISSION_REQUEST_MALFORMED") {
|
||||
final case class Error(
|
||||
submissionRequest: SubmissionRequest,
|
||||
error: String,
|
||||
) extends Alarm({
|
||||
s"Send request [${submissionRequest.messageId}] is malformed. Discarding request. $error"
|
||||
})
|
||||
}
|
||||
|
||||
@Explanation(
|
||||
"""This error occurs when the sequencer cannot accept submission request due to the current state of the system."""
|
||||
|
@ -152,7 +152,7 @@ class TrafficControlProcessorTest extends AnyWordSpec with BaseTest with HasExec
|
||||
ts,
|
||||
domainId,
|
||||
MessageId.fromUuid(new UUID(0, 1)),
|
||||
SequencerErrors.SubmissionRequestMalformed("Some error"),
|
||||
SequencerErrors.SubmissionRequestRefused("Some error"),
|
||||
testedProtocolVersion,
|
||||
Option.empty[TrafficReceipt],
|
||||
)
|
||||
|
@ -781,24 +781,34 @@ private[update] final class SubmissionRequestValidator(
|
||||
)(implicit
|
||||
executionContext: ExecutionContext,
|
||||
traceContext: TraceContext,
|
||||
): EitherT[Future, SubmissionRequestOutcome, InFlightAggregationUpdate] =
|
||||
): EitherT[Future, SubmissionRequestOutcome, InFlightAggregationUpdate] = {
|
||||
val rule = submissionRequest.aggregationRule.getOrElse(
|
||||
ErrorUtil.internalError(
|
||||
new IllegalStateException(
|
||||
"A submission request with an aggregation id must have an aggregation rule"
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
for {
|
||||
inFlightAggregationAndUpdate <- {
|
||||
EitherT
|
||||
.fromOption(inFlightAggregationO, ())
|
||||
.map(_ -> InFlightAggregationUpdate.empty)
|
||||
.leftFlatMap { _ =>
|
||||
val rule = submissionRequest.aggregationRule.getOrElse(
|
||||
ErrorUtil.internalError(
|
||||
new IllegalStateException(
|
||||
"A submission request with an aggregation id must have an aggregation rule"
|
||||
)
|
||||
)
|
||||
inFlightAggregationAndUpdate <- inFlightAggregationO match {
|
||||
case None =>
|
||||
// New aggregation
|
||||
validateAggregationRule(state, submissionRequest, sequencingTimestamp, rule).map { _ =>
|
||||
val fresh = FreshInFlightAggregation(submissionRequest.maxSequencingTime, rule)
|
||||
InFlightAggregation.initial(fresh) -> InFlightAggregationUpdate(
|
||||
Some(fresh),
|
||||
Chain.empty,
|
||||
)
|
||||
validateAggregationRule(state, submissionRequest, sequencingTimestamp, rule)
|
||||
}
|
||||
|
||||
case Some(inFlightAggregation) =>
|
||||
// Existing aggregation
|
||||
wellFormedAggregationRule(submissionRequest, rule)
|
||||
.map(_ => inFlightAggregation -> InFlightAggregationUpdate.empty)
|
||||
}
|
||||
(inFlightAggregation, inFlightAggregationUpdate) = inFlightAggregationAndUpdate
|
||||
|
||||
aggregatedSender = AggregatedSender(
|
||||
submissionRequest.sender,
|
||||
AggregationBySender(
|
||||
@ -856,6 +866,7 @@ private[update] final class SubmissionRequestValidator(
|
||||
},
|
||||
)
|
||||
} yield fullInFlightAggregationUpdate
|
||||
}
|
||||
|
||||
private def validateAggregationRule(
|
||||
state: BlockUpdateEphemeralState,
|
||||
@ -865,20 +876,10 @@ private[update] final class SubmissionRequestValidator(
|
||||
)(implicit
|
||||
executionContext: ExecutionContext,
|
||||
traceContext: TraceContext,
|
||||
): EitherT[Future, SubmissionRequestOutcome, (InFlightAggregation, InFlightAggregationUpdate)] =
|
||||
): EitherT[Future, SubmissionRequestOutcome, Unit] =
|
||||
for {
|
||||
_ <- EitherT.fromEither(
|
||||
SequencerValidations
|
||||
.wellformedAggregationRule(submissionRequest.sender, rule)
|
||||
.leftMap(message =>
|
||||
invalidSubmissionRequest(
|
||||
state,
|
||||
submissionRequest,
|
||||
sequencingTimestamp,
|
||||
SequencerErrors.SubmissionRequestMalformed(message),
|
||||
)
|
||||
)
|
||||
)
|
||||
_ <- wellFormedAggregationRule(submissionRequest, rule)
|
||||
|
||||
unregisteredEligibleMembers <- {
|
||||
if (unifiedSequencer) {
|
||||
EitherT.right(
|
||||
@ -909,10 +910,25 @@ private[update] final class SubmissionRequestValidator(
|
||||
),
|
||||
),
|
||||
)
|
||||
fresh = FreshInFlightAggregation(submissionRequest.maxSequencingTime, rule)
|
||||
} yield InFlightAggregation.initial(fresh) -> InFlightAggregationUpdate(
|
||||
Some(fresh),
|
||||
Chain.empty,
|
||||
} yield ()
|
||||
|
||||
private def wellFormedAggregationRule(
|
||||
submissionRequest: SubmissionRequest,
|
||||
rule: AggregationRule,
|
||||
)(implicit
|
||||
executionContext: ExecutionContext,
|
||||
traceContext: TraceContext,
|
||||
): EitherT[Future, SubmissionRequestOutcome, Unit] =
|
||||
EitherT.fromEither(
|
||||
SequencerValidations
|
||||
.wellformedAggregationRule(submissionRequest.sender, rule)
|
||||
.leftMap { message =>
|
||||
val alarm = SequencerErrors.SubmissionRequestMalformed
|
||||
.Error(submissionRequest, message)
|
||||
alarm.report()
|
||||
|
||||
SubmissionRequestOutcome.discardSubmissionRequest
|
||||
}
|
||||
)
|
||||
|
||||
private def deliverReceipt(
|
||||
|
@ -6,11 +6,14 @@ package com.digitalasset.canton.domain.sequencing.sequencer
|
||||
import cats.data.EitherT
|
||||
import cats.syntax.parallel.*
|
||||
import com.digitalasset.canton.data.CantonTimestamp
|
||||
import com.digitalasset.canton.sequencing.protocol.SendAsyncError
|
||||
import com.digitalasset.canton.topology.Member
|
||||
import com.digitalasset.canton.tracing.TraceContext
|
||||
import com.digitalasset.canton.util.FutureInstances.parallelFuture
|
||||
import com.digitalasset.canton.util.MonadUtil
|
||||
import com.digitalasset.canton.util.retry.RetryUtil.NoExnRetryable
|
||||
import com.digitalasset.canton.util.{MonadUtil, retry}
|
||||
|
||||
import scala.concurrent.duration.*
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
/** This trait defines the interface for BlockSequencer's BlockUpdateGenerator to use on DatabaseSequencer
|
||||
@ -49,6 +52,26 @@ object SequencerIntegration {
|
||||
|
||||
trait DatabaseSequencerIntegration extends SequencerIntegration {
|
||||
this: DatabaseSequencer =>
|
||||
|
||||
private implicit val retryConditionIfOverloaded: retry.Success[Either[SendAsyncError, Unit]] =
|
||||
new retry.Success({
|
||||
// We only retry overloaded as other possible error here:
|
||||
// * Unavailable - indicates a programming bug and should not happen during normal operation
|
||||
// * ShuttingDown - should not be retried as the sequencer is shutting down
|
||||
case Left(SendAsyncError.Overloaded(_)) => false
|
||||
case _ => true
|
||||
})
|
||||
private val retryWithBackoff = retry.Backoff(
|
||||
logger = logger,
|
||||
flagCloseable = this,
|
||||
maxRetries = retry.Forever,
|
||||
// TODO(#18407): Consider making the values below configurable
|
||||
initialDelay = 10.milliseconds,
|
||||
maxDelay = 250.milliseconds,
|
||||
operationName = "block-sequencer-write-internal",
|
||||
longDescription = "Write the processed submissions into database sequencer store",
|
||||
)
|
||||
|
||||
override def blockSequencerAcknowledge(acknowledgements: Map[Member, CantonTimestamp])(implicit
|
||||
executionContext: ExecutionContext,
|
||||
traceContext: TraceContext,
|
||||
@ -70,8 +93,13 @@ trait DatabaseSequencerIntegration extends SequencerIntegration {
|
||||
case _: SubmissionOutcome.Discard.type =>
|
||||
EitherT.pure[Future, String](())
|
||||
case outcome: DeliverableSubmissionOutcome =>
|
||||
this
|
||||
.blockSequencerWriteInternal(outcome)(outcome.submissionTraceContext)
|
||||
.leftMap(_.toString)
|
||||
EitherT(
|
||||
retryWithBackoff(
|
||||
this
|
||||
.blockSequencerWriteInternal(outcome)(outcome.submissionTraceContext)
|
||||
.value,
|
||||
NoExnRetryable,
|
||||
)
|
||||
).leftMap(_.toString)
|
||||
}
|
||||
}
|
||||
|
@ -258,7 +258,7 @@ object Sequencer extends HasLoggerName {
|
||||
*/
|
||||
type SenderSigned[A <: HasCryptographicEvidence] = SignedContent[A]
|
||||
|
||||
/** Type alias for content that has been signed but the sequencer. The purpose of this is to identify which sequencer has processed a submission request,
|
||||
/** Type alias for content that has been signed by the sequencer. The purpose of this is to identify which sequencer has processed a submission request,
|
||||
* such that after the request is ordered and processed by all sequencers, each sequencer knows which sequencer received the submission request.
|
||||
* The signature here will always be one of a sequencer.
|
||||
*/
|
||||
|
@ -42,7 +42,7 @@ object DatabaseSequencerConfig {
|
||||
// TODO(#18407): Allow configuration of database sequencer as a part of unified sequencer
|
||||
// instead of hardcoding the values below
|
||||
private[sequencer] final case class ForBlockSequencer(
|
||||
writer: SequencerWriterConfig = SequencerWriterConfig.LowLatency(),
|
||||
writer: SequencerWriterConfig = SequencerWriterConfig.HighThroughput(),
|
||||
reader: SequencerReaderConfig = new SequencerReaderConfig {
|
||||
override val readBatchSize: Int = SequencerReaderConfig.defaultReadBatchSize
|
||||
override val checkpointInterval: NonNegativeFiniteDuration =
|
||||
|
@ -54,7 +54,7 @@ object SequencerValidations {
|
||||
_ <- Either.cond(
|
||||
eligibleSenders.contains(sender),
|
||||
(),
|
||||
"Sender is not eligible according to the aggregation rule",
|
||||
s"Sender [$sender] is not eligible according to the aggregation rule",
|
||||
)
|
||||
} yield ()
|
||||
}
|
||||
|
@ -15,8 +15,8 @@ import com.digitalasset.canton.domain.sequencing.sequencer.errors.CreateSubscrip
|
||||
import com.digitalasset.canton.domain.sequencing.sequencer.errors.SequencerError.ExceededMaxSequencingTime
|
||||
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer as CantonSequencer
|
||||
import com.digitalasset.canton.lifecycle.Lifecycle
|
||||
import com.digitalasset.canton.logging.SuppressionRule
|
||||
import com.digitalasset.canton.logging.pretty.Pretty
|
||||
import com.digitalasset.canton.logging.{LogEntry, SuppressionRule}
|
||||
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
|
||||
import com.digitalasset.canton.sequencing.protocol.SendAsyncError.RequestInvalid
|
||||
import com.digitalasset.canton.sequencing.protocol.*
|
||||
@ -363,7 +363,7 @@ abstract class SequencerApiTest
|
||||
}
|
||||
}
|
||||
|
||||
"bounce on write path aggregate submissions with maxSequencingTime exceeding bound" onlyRunWhen (testAggregation) in {
|
||||
"bounce on write path aggregate submissions with maxSequencingTime exceeding bound" onlyRunWhen testAggregation in {
|
||||
env =>
|
||||
import env.*
|
||||
|
||||
@ -742,8 +742,9 @@ abstract class SequencerApiTest
|
||||
import env.*
|
||||
|
||||
// TODO(i10412): See above
|
||||
val faultyThreshold = PositiveInt.tryCreate(2)
|
||||
val aggregationRule =
|
||||
AggregationRule(NonEmpty(Seq, p17, p17), PositiveInt.tryCreate(2), testedProtocolVersion)
|
||||
AggregationRule(NonEmpty(Seq, p17, p17), faultyThreshold, testedProtocolVersion)
|
||||
|
||||
val messageId = MessageId.tryCreate("unreachable-threshold")
|
||||
val request = SubmissionRequest.tryCreate(
|
||||
@ -758,13 +759,27 @@ abstract class SequencerApiTest
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async")
|
||||
reads <- readForMembers(Seq(p17), sequencer)
|
||||
reads <- loggerFactory.assertLoggedWarningsAndErrorsSeq(
|
||||
for {
|
||||
_ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async")
|
||||
reads <- readForMembers(Seq(p17), sequencer, timeout = 5.seconds)
|
||||
} yield reads,
|
||||
LogEntry.assertLogSeq(
|
||||
Seq(
|
||||
(
|
||||
_.shouldBeCantonError(
|
||||
SequencerErrors.SubmissionRequestMalformed,
|
||||
_ shouldBe s"Send request [$messageId] is malformed. " +
|
||||
s"Discarding request. Threshold $faultyThreshold cannot be reached",
|
||||
),
|
||||
"p17's submission generates an alarm",
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
} yield {
|
||||
checkRejection(reads, p17, messageId, defaultExpectedTrafficReceipt) {
|
||||
case SequencerErrors.SubmissionRequestMalformed(reason) =>
|
||||
reason should include("Threshold 2 cannot be reached")
|
||||
}
|
||||
// p17 gets nothing
|
||||
checkMessages(Seq(), reads)
|
||||
}
|
||||
}
|
||||
|
||||
@ -775,7 +790,7 @@ abstract class SequencerApiTest
|
||||
val aggregationRule =
|
||||
AggregationRule(NonEmpty(Seq, p17), PositiveInt.tryCreate(1), testedProtocolVersion)
|
||||
|
||||
val messageId = MessageId.tryCreate("unreachable-threshold")
|
||||
val messageId = MessageId.tryCreate("first-sender-not-eligible")
|
||||
val request = SubmissionRequest.tryCreate(
|
||||
p18,
|
||||
messageId,
|
||||
@ -788,13 +803,130 @@ abstract class SequencerApiTest
|
||||
)
|
||||
|
||||
for {
|
||||
_ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async")
|
||||
reads <- readForMembers(Seq(p18), sequencer)
|
||||
reads <- loggerFactory.assertLoggedWarningsAndErrorsSeq(
|
||||
for {
|
||||
_ <- sequencer.sendAsyncSigned(sign(request)).valueOrFailShutdown("Sent async")
|
||||
reads <- readForMembers(Seq(p18), sequencer, timeout = 5.seconds)
|
||||
} yield reads,
|
||||
LogEntry.assertLogSeq(
|
||||
Seq(
|
||||
(
|
||||
_.shouldBeCantonError(
|
||||
SequencerErrors.SubmissionRequestMalformed,
|
||||
_ shouldBe s"Send request [$messageId] is malformed. " +
|
||||
s"Discarding request. Sender [$p18] is not eligible according to the aggregation rule",
|
||||
),
|
||||
"p18's submission generates an alarm",
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
} yield {
|
||||
checkRejection(reads, p18, messageId, defaultExpectedTrafficReceipt) {
|
||||
case SequencerErrors.SubmissionRequestMalformed(reason) =>
|
||||
reason should include("Sender is not eligible according to the aggregation rule")
|
||||
}
|
||||
// p18 gets nothing
|
||||
checkMessages(Seq(), reads)
|
||||
}
|
||||
}
|
||||
|
||||
"prevent non-eligible senders from contributing" onlyRunWhen testAggregation in { env =>
|
||||
import env.*
|
||||
|
||||
val messageContent = "aggregatable-message"
|
||||
val aggregationRule =
|
||||
AggregationRule(NonEmpty(Seq, p1, p2), PositiveInt.tryCreate(2), testedProtocolVersion)
|
||||
|
||||
val requestFromP1 = createSendRequest(
|
||||
sender = p1,
|
||||
messageContent,
|
||||
Recipients.cc(p3),
|
||||
maxSequencingTime = CantonTimestamp.Epoch.add(Duration.ofSeconds(60)),
|
||||
aggregationRule = Some(aggregationRule),
|
||||
topologyTimestamp = Some(CantonTimestamp.Epoch),
|
||||
)
|
||||
|
||||
// Request with non-eligible sender
|
||||
val messageId = MessageId.tryCreate("further-sender-not-eligible")
|
||||
val requestFromP4 = requestFromP1.copy(sender = p4, messageId = messageId)
|
||||
|
||||
val requestFromP2 =
|
||||
requestFromP1.copy(sender = p2, messageId = MessageId.fromUuid(new UUID(1, 2)))
|
||||
|
||||
for {
|
||||
_ <- sequencer
|
||||
.sendAsyncSigned(sign(requestFromP1))
|
||||
.valueOrFailShutdown("Sent async for participant1")
|
||||
|
||||
readsForP1 <- readForMembers(Seq(p1), sequencer)
|
||||
|
||||
readsForP4 <- loggerFactory.assertLoggedWarningsAndErrorsSeq(
|
||||
for {
|
||||
_ <- sequencer
|
||||
.sendAsyncSigned(sign(requestFromP4))
|
||||
.valueOrFailShutdown("Sent async for non-eligible participant4")
|
||||
reads <- readForMembers(Seq(p4), sequencer, timeout = 5.seconds)
|
||||
} yield reads,
|
||||
LogEntry.assertLogSeq(
|
||||
Seq(
|
||||
(
|
||||
_.shouldBeCantonError(
|
||||
SequencerErrors.SubmissionRequestMalformed,
|
||||
_ shouldBe s"Send request [$messageId] is malformed. " +
|
||||
s"Discarding request. Sender [$p4] is not eligible according to the aggregation rule",
|
||||
),
|
||||
"p4's submission generates an alarm",
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
_ <- sequencer
|
||||
.sendAsyncSigned(sign(requestFromP2))
|
||||
.valueOrFailShutdown("Sent async for participant2")
|
||||
|
||||
readsForP2 <- readForMembers(Seq(p2), sequencer)
|
||||
readsForP3 <- readForMembers(Seq(p3), sequencer)
|
||||
} yield {
|
||||
// p1 gets the receipt immediately
|
||||
checkMessages(
|
||||
Seq(
|
||||
EventDetails(
|
||||
SequencerCounter.Genesis,
|
||||
p1,
|
||||
Some(requestFromP1.messageId),
|
||||
defaultExpectedTrafficReceipt,
|
||||
)
|
||||
),
|
||||
readsForP1,
|
||||
)
|
||||
|
||||
// p2 gets the receipt only
|
||||
checkMessages(
|
||||
Seq(
|
||||
EventDetails(
|
||||
SequencerCounter.Genesis,
|
||||
p2,
|
||||
Some(requestFromP2.messageId),
|
||||
defaultExpectedTrafficReceipt,
|
||||
)
|
||||
),
|
||||
readsForP2,
|
||||
)
|
||||
|
||||
// p3 gets the message
|
||||
checkMessages(
|
||||
Seq(
|
||||
EventDetails(
|
||||
SequencerCounter.Genesis,
|
||||
p3,
|
||||
None,
|
||||
None,
|
||||
EnvelopeDetails(messageContent, Recipients.cc(p3)),
|
||||
)
|
||||
),
|
||||
readsForP3,
|
||||
)
|
||||
|
||||
// p4 gets nothing
|
||||
checkMessages(Seq(), readsForP4)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -560,10 +560,12 @@ class GrpcSequencerServiceTest
|
||||
)
|
||||
loggerFactory.assertLogs(
|
||||
sendAndCheckError(request) { case SendAsyncError.RequestInvalid(message) =>
|
||||
message should include("Sender is not eligible according to the aggregation rule")
|
||||
message should include(
|
||||
s"Sender [$participant] is not eligible according to the aggregation rule"
|
||||
)
|
||||
},
|
||||
_.warningMessage should include(
|
||||
"Sender is not eligible according to the aggregation rule"
|
||||
s"Sender [$participant] is not eligible according to the aggregation rule"
|
||||
),
|
||||
)
|
||||
}
|
||||
|
@ -43,31 +43,35 @@ message GetCommandStatusRequest {
|
||||
}
|
||||
|
||||
message GetCommandStatusResponse {
|
||||
message CommandStatus {
|
||||
google.protobuf.Timestamp started = 1;
|
||||
google.protobuf.Timestamp completed = 2;
|
||||
Completion completion = 3;
|
||||
CommandState state = 4;
|
||||
repeated Command commands = 5;
|
||||
message RequestStatistics {
|
||||
uint32 envelopes = 1;
|
||||
uint32 request_size = 2;
|
||||
uint32 recipients = 3;
|
||||
}
|
||||
RequestStatistics request_statistics = 6;
|
||||
message CommandUpdates {
|
||||
message Contract {
|
||||
Identifier template_id = 1;
|
||||
string contract_id = 2;
|
||||
Value contract_key = 3;
|
||||
}
|
||||
repeated Contract created = 1;
|
||||
repeated Contract archived = 2;
|
||||
uint32 exercised = 3;
|
||||
uint32 fetched = 4;
|
||||
uint32 looked_up_by_key = 5;
|
||||
}
|
||||
CommandUpdates updates = 7;
|
||||
}
|
||||
repeated CommandStatus command_status = 1;
|
||||
}
|
||||
|
||||
message CommandStatus {
|
||||
google.protobuf.Timestamp started = 1;
|
||||
google.protobuf.Timestamp completed = 2;
|
||||
Completion completion = 3;
|
||||
CommandState state = 4;
|
||||
repeated Command commands = 5;
|
||||
RequestStatistics request_statistics = 6;
|
||||
CommandUpdates updates = 7;
|
||||
}
|
||||
|
||||
message RequestStatistics {
|
||||
uint32 envelopes = 1;
|
||||
uint32 request_size = 2;
|
||||
uint32 recipients = 3;
|
||||
}
|
||||
|
||||
message CommandUpdates {
|
||||
repeated Contract created = 1;
|
||||
repeated Contract archived = 2;
|
||||
uint32 exercised = 3;
|
||||
uint32 fetched = 4;
|
||||
uint32 looked_up_by_key = 5;
|
||||
}
|
||||
|
||||
message Contract {
|
||||
Identifier template_id = 1;
|
||||
string contract_id = 2;
|
||||
Value contract_key = 3;
|
||||
}
|
||||
|
@ -4,13 +4,11 @@
|
||||
package com.digitalasset.canton.platform.apiserver.execution
|
||||
|
||||
import com.daml.error.utils.DecodedCantonError
|
||||
import com.daml.ledger.api.v2.admin.command_inspection_service.GetCommandStatusResponse.CommandStatus.{
|
||||
CommandUpdates,
|
||||
RequestStatistics,
|
||||
}
|
||||
import com.daml.ledger.api.v2.admin.command_inspection_service.{
|
||||
CommandState,
|
||||
GetCommandStatusResponse,
|
||||
CommandStatus as ApiCommandStatus,
|
||||
CommandUpdates,
|
||||
RequestStatistics,
|
||||
}
|
||||
import com.daml.ledger.api.v2.commands.Command
|
||||
import com.daml.ledger.api.v2.completion.Completion
|
||||
@ -35,8 +33,8 @@ final case class CommandStatus(
|
||||
requestStatistics: RequestStatistics,
|
||||
updates: CommandUpdates,
|
||||
) extends PrettyPrinting {
|
||||
def toProto: GetCommandStatusResponse.CommandStatus = {
|
||||
GetCommandStatusResponse.CommandStatus(
|
||||
def toProto: ApiCommandStatus = {
|
||||
ApiCommandStatus(
|
||||
started = Some(started.toProtoTimestamp),
|
||||
completed = completed.map(_.toProtoTimestamp),
|
||||
completion = Some(completion),
|
||||
@ -92,9 +90,9 @@ final case class CommandStatus(
|
||||
|
||||
object CommandStatus {
|
||||
def fromProto(
|
||||
proto: GetCommandStatusResponse.CommandStatus
|
||||
proto: ApiCommandStatus
|
||||
): Either[ProtoDeserializationError, CommandStatus] = {
|
||||
val GetCommandStatusResponse.CommandStatus(
|
||||
val ApiCommandStatus(
|
||||
startedP,
|
||||
completedP,
|
||||
completionP,
|
||||
|
@ -20,7 +20,6 @@ import com.digitalasset.canton.store.SessionKeyStore.RecipientGroup
|
||||
import com.digitalasset.canton.topology.{DomainId, ParticipantId}
|
||||
import com.digitalasset.canton.tracing.TraceContext
|
||||
import com.digitalasset.canton.version.{HasVersionedToByteString, ProtocolVersion}
|
||||
import com.google.protobuf.ByteString
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
@ -65,10 +64,7 @@ object EncryptedViewMessageFactory {
|
||||
partiesWithGroupAddressing <- EitherT.right(
|
||||
cryptoSnapshot.ipsSnapshot.partiesWithGroupAddressing(informeeParties)
|
||||
)
|
||||
} yield RecipientsInfo(
|
||||
informeeParticipants = informeeParticipants,
|
||||
doNotEncrypt = partiesWithGroupAddressing.nonEmpty,
|
||||
)
|
||||
} yield RecipientsInfo(informeeParticipants = informeeParticipants)
|
||||
}
|
||||
|
||||
def generateAndEncryptSessionKeyRandomness(
|
||||
@ -172,45 +168,25 @@ object EncryptedViewMessageFactory {
|
||||
signature: Option[Signature],
|
||||
encryptedView: EncryptedView[VT],
|
||||
): EitherT[Future, EncryptedViewMessageCreationError, EncryptedViewMessage[VT]] =
|
||||
(if (!recipientsInfo.doNotEncrypt) {
|
||||
for {
|
||||
sessionKeyAndRandomnessMap <- getSessionKey(recipientsInfo)
|
||||
(sessionKey, sessionKeyRandomnessMap) = sessionKeyAndRandomnessMap
|
||||
sessionKeyRandomnessMapNE <- EitherT.fromEither[Future](
|
||||
NonEmpty
|
||||
.from(sessionKeyRandomnessMap)
|
||||
.toRight(
|
||||
UnableToDetermineSessionKeyRandomness(
|
||||
"The session key randomness map is empty"
|
||||
)
|
||||
)
|
||||
)
|
||||
encryptedSessionKeyInfo <- encryptRandomnessWithSessionKey(sessionKey).map(
|
||||
encryptedRandomness => (encryptedRandomness, sessionKeyRandomnessMapNE)
|
||||
)
|
||||
} yield encryptedSessionKeyInfo
|
||||
} else {
|
||||
val encryptedRandomness = Encrypted.fromByteString[SecureRandomness](randomness.unwrap)
|
||||
eitherT(
|
||||
Right(
|
||||
(
|
||||
encryptedRandomness,
|
||||
NonEmpty(
|
||||
Seq,
|
||||
AsymmetricEncrypted[SecureRandomness](
|
||||
ByteString.EMPTY,
|
||||
AsymmetricEncrypted.noEncryptionFingerprint,
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
)
|
||||
}).map { case (randomnessV2, sessionKeyMap) =>
|
||||
for {
|
||||
sessionKeyAndRandomnessMap <- getSessionKey(recipientsInfo)
|
||||
(sessionKey, sessionKeyRandomnessMap) = sessionKeyAndRandomnessMap
|
||||
sessionKeyRandomnessMapNE <- EitherT.fromEither[Future](
|
||||
NonEmpty
|
||||
.from(sessionKeyRandomnessMap)
|
||||
.toRight(
|
||||
UnableToDetermineSessionKeyRandomness(
|
||||
"The session key randomness map is empty"
|
||||
)
|
||||
)
|
||||
)
|
||||
encryptedRandomness <- encryptRandomnessWithSessionKey(sessionKey)
|
||||
} yield {
|
||||
EncryptedViewMessage[VT](
|
||||
signature,
|
||||
viewTree.viewHash,
|
||||
randomnessV2,
|
||||
sessionKeyMap,
|
||||
encryptedRandomness,
|
||||
sessionKeyRandomnessMapNE,
|
||||
encryptedView,
|
||||
viewTree.domainId,
|
||||
viewEncryptionScheme,
|
||||
@ -277,7 +253,6 @@ object EncryptedViewMessageFactory {
|
||||
cryptoSnapshot.domainId,
|
||||
): EncryptedViewMessageCreationError
|
||||
}
|
||||
.map(_.toMap)
|
||||
|
||||
sealed trait EncryptedViewMessageCreationError
|
||||
extends Product
|
||||
|
@ -3,9 +3,10 @@
|
||||
|
||||
package com.digitalasset.canton.participant.sync
|
||||
|
||||
import com.daml.ledger.api.v2.admin.command_inspection_service.CommandState
|
||||
import com.daml.ledger.api.v2.admin.command_inspection_service.GetCommandStatusResponse.CommandStatus.{
|
||||
import com.daml.ledger.api.v2.admin.command_inspection_service.{
|
||||
CommandState,
|
||||
CommandUpdates,
|
||||
Contract,
|
||||
RequestStatistics,
|
||||
}
|
||||
import com.daml.ledger.api.v2.commands.Command
|
||||
@ -129,8 +130,8 @@ class CommandProgressTrackerImpl(
|
||||
def recordTransactionImpact(
|
||||
transaction: LfSubmittedTransaction
|
||||
): Unit = {
|
||||
val creates = mutable.ListBuffer.empty[CommandUpdates.Contract]
|
||||
val archives = mutable.ListBuffer.empty[CommandUpdates.Contract]
|
||||
val creates = mutable.ListBuffer.empty[Contract]
|
||||
val archives = mutable.ListBuffer.empty[Contract]
|
||||
final case class Stats(
|
||||
exercised: Int = 0,
|
||||
fetched: Int = 0,
|
||||
@ -140,8 +141,8 @@ class CommandProgressTrackerImpl(
|
||||
templateId: TypeConName,
|
||||
coid: String,
|
||||
keyOpt: Option[GlobalKeyWithMaintainers],
|
||||
): CommandUpdates.Contract = {
|
||||
CommandUpdates.Contract(
|
||||
): Contract = {
|
||||
Contract(
|
||||
templateId = Some(
|
||||
Identifier(
|
||||
templateId.packageId,
|
||||
|
@ -1284,7 +1284,7 @@ trait MessageDispatcherTest {
|
||||
CantonTimestamp.ofEpochSecond(3),
|
||||
domainId,
|
||||
messageId3,
|
||||
SequencerErrors.SubmissionRequestMalformed("invalid batch"),
|
||||
SequencerErrors.SubmissionRequestRefused("invalid batch"),
|
||||
testedProtocolVersion,
|
||||
Option.empty[TrafficReceipt],
|
||||
)
|
||||
|
@ -28,7 +28,7 @@ import com.digitalasset.canton.protocol.ExampleTransactionFactory.*
|
||||
import com.digitalasset.canton.protocol.WellFormedTransaction.{WithSuffixes, WithoutSuffixes}
|
||||
import com.digitalasset.canton.protocol.*
|
||||
import com.digitalasset.canton.protocol.messages.*
|
||||
import com.digitalasset.canton.sequencing.protocol.{MediatorGroupRecipient, OpenEnvelope, Recipient}
|
||||
import com.digitalasset.canton.sequencing.protocol.{MediatorGroupRecipient, OpenEnvelope}
|
||||
import com.digitalasset.canton.store.SessionKeyStore.RecipientGroup
|
||||
import com.digitalasset.canton.store.SessionKeyStoreWithInMemoryCache
|
||||
import com.digitalasset.canton.topology.MediatorGroup.MediatorGroupIndex
|
||||
@ -246,7 +246,7 @@ class TransactionConfirmationRequestFactoryTest
|
||||
val cryptoPureApi = cryptoSnapshot.pureCrypto
|
||||
val viewEncryptionScheme = cryptoPureApi.defaultSymmetricKeyScheme
|
||||
|
||||
val privateKeysetCache: TrieMap[NonEmpty[Set[Recipient]], SecureRandomness] =
|
||||
val privateKeysetCache: TrieMap[NonEmpty[Set[ParticipantId]], SecureRandomness] =
|
||||
TrieMap.empty
|
||||
|
||||
val expectedTransactionViewMessages = example.transactionViewTreesWithWitnesses.map {
|
||||
@ -307,7 +307,9 @@ class TransactionConfirmationRequestFactoryTest
|
||||
{
|
||||
// simulates session key cache
|
||||
val keySeedSession = privateKeysetCache.getOrElseUpdate(
|
||||
recipients.leafRecipients,
|
||||
NonEmpty
|
||||
.from(participants)
|
||||
.getOrElse(fail("View without active participants of informees")),
|
||||
cryptoPureApi
|
||||
.computeHkdf(
|
||||
cryptoPureApi.generateSecureRandomness(keySeed.unwrap.size()).unwrap,
|
||||
|
@ -52,7 +52,7 @@ trait InFlightSubmissionStoreTest extends AsyncWordSpec with BaseTest {
|
||||
completionInfo,
|
||||
TransactionSubmissionTrackingData.CauseWithTemplate(
|
||||
SequencerErrors
|
||||
.SubmissionRequestMalformed("Some invalid batch")
|
||||
.SubmissionRequestRefused("Some invalid batch")
|
||||
.rpcStatusWithoutLoggingContext()
|
||||
),
|
||||
DomainId.tryFromString("da::default"),
|
||||
|
@ -1 +1 @@
|
||||
20240626.13546.vc5b68d04
|
||||
20240626.13552.v5a246d80
|
||||
|
Loading…
Reference in New Issue
Block a user