update canton to 3.0.0-snapshot.100000000.20240219.12601.0.v2b06a70b (#18521)

* update canton to 3.0.0-snapshot.100000000.20240219.12601.0.v2b06a70b

tell-slack: canton

* fix build

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Paul Brauner <paul.brauner@digitalasset.com>
azure-pipelines[bot] 2024-02-20 11:46:02 +00:00 committed by GitHub
parent 532ae367ca
commit 03d3297ebe
106 changed files with 1036 additions and 916 deletions

View File

@ -787,6 +787,7 @@ scala_library(
":community_ledger_ledger-common",
":community_lib_slick_slick-fork",
":community_lib_wartremover",
":community_reference-driver",
":community_sequencer-driver",
":community_util-external",
":community_util-logging",

View File

@ -31,6 +31,12 @@ message SequencerConnections {
// This field determines the minimum level of agreement, or consensus, required among the sequencers before a message
// is considered reliable and accepted by the system.
// The value set here should not be zero.
// This value must be positive.
uint32 sequencerTrustThreshold = 2;
// This field determines how many sequencers a client should try to send a submission request to, provided the request is eligible for deduplication.
// A higher value increases the chance of a submission request being accepted by the system, but also increases the load on the sequencers
// and thus the costs incurred by the client.
// This value must be positive.
uint32 submissionRequestAmplification = 3;
}
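As an illustration of how the two new fields are meant to be used together, here is a minimal console-style Scala sketch (not part of this commit). It assumes conn1, conn2 and conn3 are existing SequencerConnection values with distinct aliases, and it uses the SequencerConnections.tryMany signature introduced further down in this diff.

import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.sequencing.SequencerConnections

val connections = SequencerConnections.tryMany(
  Seq(conn1, conn2, conn3),
  // at least two sequencers must agree before a message is considered reliable
  sequencerTrustThreshold = PositiveInt.tryCreate(2),
  // send each eligible submission request to two sequencers
  submissionRequestAmplification = PositiveInt.tryCreate(2),
)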

View File

@ -9,85 +9,132 @@ import "com/digitalasset/canton/admin/participant/v30/domain_connectivity.proto"
import "google/protobuf/timestamp.proto";
import "scalapb/scalapb.proto";
// Moving ACS from one participant to another
// Perform participant repair operations, including exporting and importing ACS, purging
// contracts, and migrating domains. These operations directly act on the internal state of the participant
// and should be used with caution.
service ParticipantRepairService {
// get contracts for a party
// Export the ACS for the given parties from the participant
rpc ExportAcs(ExportAcsRequest) returns (stream ExportAcsResponse);
// upload contracts for a party
// Import an existing export into the participant
rpc ImportAcs(stream ImportAcsRequest) returns (ImportAcsResponse);
// purge contracts
// Remove the given contracts from the participant
rpc PurgeContracts(PurgeContractsRequest) returns (PurgeContractsResponse);
// Change the assignation of contracts to the given domain
rpc MigrateDomain(MigrateDomainRequest) returns (MigrateDomainResponse);
}
message PurgeContractsRequest {
// Alias of the domain on which the contracts are currently assigned
// Required
string domain = 1;
// Contracts to be purged
// Required
repeated string contract_ids = 2;
// If true, already purged contracts are ignored; if false, an error is returned if a contract has already been purged
// Useful for re-running the same request idempotently after a failure
// Optional, `true` by default
bool ignore_already_purged = 3;
}
message PurgeContractsResponse {}
message MigrateDomainRequest {
// the alias of the source domain
// Alias of the domain on which the contracts are currently assigned
// Required
string source_alias = 1;
// the target domain connection configuration
// Configuration to connect to the domain on which the contracts will be assigned as a result of the migration
// Required
DomainConnectionConfig target_domain_connection_config = 2;
}
message MigrateDomainResponse {}
message ExportAcsRequest {
message TargetDomain {
// The ID of the domain where the contract is supposed to be assigned when the export is being imported
string domain_id = 1;
// Protocol version of the target domain
// The protocol version associated to the domain where the contract is supposed to be assigned when the contracts snapshot is being imported
int32 protocol_version = 2;
}
// The parties for which the ACS should be exported
// Required
repeated string parties = 1;
string filter_domain_id = 2; // optional; exact match if set
google.protobuf.Timestamp timestamp = 3; // optional; take acs state for this timestamp
// Optional mapping from one domain id to another to rename the associated domain of contracts in the export
// NOTE: This is not a proper domain migration of contracts.
// The ID of the domain to filter the contracts by
// Optional, if set the contracts will be filtered by an exact match on the ID of the domain to which they are assigned
string filter_domain_id = 2; // optional; exact match if set
// The timestamp at which the ACS should be valid. If provided, it will be checked that the ACS is
// available at the given timestamp (i.e. the timestamp is considered clean and the ACS has not been
// pruned).
// Optional, if missing the latest clean timestamp will be used
google.protobuf.Timestamp timestamp = 3;
// Mapping from one domain ID to a target domain ID and protocol version. If provided, the contracts currently assigned to the domain
// ID in the key will be assigned to the domain ID and protocol version in the value. This is not a proper domain
// migration of contracts and it's supposed to be used only in exceptional cases.
// Optional, if not provided the contracts will be exported with the domain ID to which they are currently assigned
map<string, TargetDomain> contract_domain_renames = 4;
// Do not check whether the provided timestamp is clean
// Should not be used in production.
// Note: for this option to yield a consistent snapshot, you need to wait at least
// If true, do not check whether the provided timestamp is clean (see `timestamp` field).
// NOT FOR PRODUCTION USE.
// For this option to yield a consistent snapshot, you need to wait at least
// participantResponseTimeout + mediatorReactionTimeout after the last submitted request.
// Optional, `false` by default
bool force = 5;
// true if the parties will be offboarded after the replication (party migration)
// Optional, `false` by default
bool parties_offboarding = 6;
}
message ExportAcsResponse {
bytes chunk = 1; // a chunk of the acs snapshot download
// The raw binary of the ACS snapshot to be imported
// This is meant to be streamed and every message should be a chunk of the snapshot
// Required
bytes chunk = 1;
}
// Schema definition for the exported ACS snapshot
message ActiveContract {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.UnstableProtoVersion";
// The protocol version of the domain the contract is currently assigned to
// Required
int32 protocol_version = 1;
// The ID of the domain where the contract was assigned at the time of the export
// Required
string domain_id = 2;
// The raw binary containing the contract and its associated metadata
// Required
bytes contract = 3;
// The number of times the contract has been reassigned at the time of the export
// Required
int64 reassignment_counter = 4;
}
message ImportAcsRequest {
// The raw binary of the ACS snapshot to be imported
// Required
bytes acs_snapshot = 1;
// The workflow ID prefix to be used for the imported contracts
// Optional, if not provided the service will generate a prefix
string workflow_id_prefix = 2;
// If false, the service will fail if any contract ID suffix doesn't match the scheme
// associated to the domain where the contract is being assigned as a result of the import.
// If true, any contract ID suffix will be recomputed to match the scheme associated to the domain.
// Optional, `false` by default
bool allow_contract_id_suffix_recomputation = 3;
}
message ImportAcsResponse {
// Mapping from the old contract id to the new contract id
map<string, string> contract_id_mapping = 1;
}

View File

@ -384,6 +384,7 @@ object ParticipantAdminCommands {
final case class ExportAcs(
parties: Set[PartyId],
partiesOffboarding: Boolean,
filterDomainId: Option[DomainId],
timestamp: Option[Instant],
observer: StreamObserver[ExportAcsResponse],
@ -415,6 +416,7 @@ object ParticipantAdminCommands {
(source.toProtoPrimitive, targetDomain)
},
force = force,
partiesOffboarding = partiesOffboarding,
)
)
}

View File

@ -155,9 +155,9 @@ class CommunityCantonHealthAdministration(override val consoleEnv: ConsoleEnviro
@Help.Summary("Aggregate status info of all participants and domains")
def status(): CommunityCantonStatus = {
CommunityCantonStatus.getStatus(
statusMap[SequencerNodeReference](consoleEnv.sequencersX),
statusMap[MediatorReference](consoleEnv.mediatorsX),
statusMap[ParticipantReference](consoleEnv.participantsX),
statusMap[SequencerNodeReference](consoleEnv.sequencers),
statusMap[MediatorReference](consoleEnv.mediators),
statusMap[ParticipantReference](consoleEnv.participants),
)
}
}

View File

@ -107,12 +107,12 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
lazy val participantHelperItems = {
// due to the use of reflection to grab the help-items, i need to write the following, repetitive stuff explicitly
val subItems =
if (participantsX.local.nonEmpty)
participantsX.local.headOption.toList.flatMap(p =>
if (participants.local.nonEmpty)
participants.local.headOption.toList.flatMap(p =>
Help.getItems(p, baseTopic = Seq("$participant"), scope = scope)
)
else if (participantsX.remote.nonEmpty)
participantsX.remote.headOption.toList.flatMap(p =>
else if (participants.remote.nonEmpty)
participants.remote.headOption.toList.flatMap(p =>
Help.getItems(p, baseTopic = Seq("$participant"), scope = scope)
)
else Seq()
@ -292,7 +292,7 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
Help.Topic(Help.defaultTopLevelTopic),
))
lazy val participantsX: NodeReferences[
lazy val participants: NodeReferences[
ParticipantReference,
RemoteParticipantReference,
LocalParticipantReference,
@ -304,7 +304,7 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
.toSeq,
)
lazy val sequencersX: NodeReferences[
lazy val sequencers: NodeReferences[
SequencerNodeReference,
RemoteSequencerNodeReference,
LocalSequencerNodeReference,
@ -314,7 +314,7 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
environment.config.remoteSequencersByString.keys.map(createRemoteSequencerReference).toSeq,
)
lazy val mediatorsX
lazy val mediators
: NodeReferences[MediatorReference, RemoteMediatorReference, LocalMediatorReference] =
NodeReferences(
environment.config.mediatorsByString.keys.map(createMediatorReference).toSeq,
@ -340,14 +340,14 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
] = {
NodeReferences(
mergeLocalInstances(
participantsX.local,
sequencersX.local,
mediatorsX.local,
participants.local,
sequencers.local,
mediators.local,
),
mergeRemoteInstances(
participantsX.remote,
sequencersX.remote,
mediatorsX.remote,
participants.remote,
sequencers.remote,
mediators.remote,
),
)
}
@ -364,27 +364,27 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
protected def topLevelValues: Seq[TopLevelValue[_]] = {
val nodeTopic = Seq(topicNodeReferences)
val localParticipantXBinds: Seq[TopLevelValue[_]] =
participantsX.local.map(p =>
participants.local.map(p =>
TopLevelValue(p.name, helpText("participant x", p.name), p, nodeTopic)
)
val remoteParticipantXBinds: Seq[TopLevelValue[_]] =
participantsX.remote.map(p =>
participants.remote.map(p =>
TopLevelValue(p.name, helpText("remote participant x", p.name), p, nodeTopic)
)
val localMediatorXBinds: Seq[TopLevelValue[_]] =
mediatorsX.local.map(d =>
mediators.local.map(d =>
TopLevelValue(d.name, helpText("local mediator-x", d.name), d, nodeTopic)
)
val remoteMediatorXBinds: Seq[TopLevelValue[_]] =
mediatorsX.remote.map(d =>
mediators.remote.map(d =>
TopLevelValue(d.name, helpText("remote mediator-x", d.name), d, nodeTopic)
)
val localSequencerXBinds: Seq[TopLevelValue[_]] =
sequencersX.local.map(d =>
sequencers.local.map(d =>
TopLevelValue(d.name, helpText("local sequencer-x", d.name), d, nodeTopic)
)
val remoteSequencerXBinds: Seq[TopLevelValue[_]] =
sequencersX.remote.map(d =>
sequencers.remote.map(d =>
TopLevelValue(d.name, helpText("remote sequencer-x", d.name), d, nodeTopic)
)
val clockBinds: Option[TopLevelValue[_]] =
@ -397,19 +397,19 @@ trait ConsoleEnvironment extends NamedLogging with FlagCloseable with NoTracing
TopLevelValue(
"participantsX",
"All participant x nodes" + genericNodeReferencesDoc,
participantsX,
participants,
referencesTopic,
) :+
TopLevelValue(
"mediatorsX",
"All mediator-x nodes" + genericNodeReferencesDoc,
mediatorsX,
mediators,
referencesTopic,
) :+
TopLevelValue(
"sequencersX",
"All sequencer-x nodes" + genericNodeReferencesDoc,
sequencersX,
sequencers,
referencesTopic,
) :+
TopLevelValue("nodes", "All nodes" + genericNodeReferencesDoc, nodes, referencesTopic)

View File

@ -175,7 +175,7 @@ trait ConsoleMacros extends NamedLogging with NoTracing {
}
val partyAndParticipants =
env.participantsX.all.flatMap(_.parties.list().flatMap(partyIdToParticipant(_).toList))
env.participants.all.flatMap(_.parties.list().flatMap(partyIdToParticipant(_).toList))
val allPartiesSingleParticipant =
partyAndParticipants.groupBy { case (partyId, _) => partyId }.forall {
case (_, participants) => participants.sizeCompare(1) <= 0
@ -205,7 +205,7 @@ trait ConsoleMacros extends NamedLogging with NoTracing {
def participantValue(p: ParticipantReference): String =
if (useParticipantAlias) p.name else p.uid.toProtoPrimitive
val allParticipants = env.participantsX.all
val allParticipants = env.participants.all
val participantsData =
allParticipants.map(p => (participantValue(p), toLedgerApi(p.config))).toMap
val uidToAlias = allParticipants.map(p => (p.id, p.name)).toMap
@ -967,16 +967,16 @@ trait ConsoleMacros extends NamedLogging with NoTracing {
mediators
.filter(!_.health.initialized())
.foreach(
_.setup
.assign(
domainId,
staticDomainParameters,
SequencerConnections.tryMany(
sequencers
.map(s => s.sequencerConnection.withAlias(SequencerAlias.tryCreate(s.name))),
PositiveInt.tryCreate(1),
),
)
_.setup.assign(
domainId,
staticDomainParameters,
SequencerConnections.tryMany(
sequencers
.map(s => s.sequencerConnection.withAlias(SequencerAlias.tryCreate(s.name))),
PositiveInt.one,
PositiveInt.one,
),
)
)
domainId

View File

@ -543,7 +543,7 @@ abstract class ParticipantReference(
val connected = domains.list_connected().map(_.domainId).toSet
// for every participant
consoleEnvironment.participantsX.all
consoleEnvironment.participants.all
.filter(p => p.health.running() && p.health.initialized())
.foreach { participant =>
// for every domain this participant is connected to as well
@ -649,17 +649,17 @@ class LocalParticipantReference(
with LocalInstanceReference
with BaseInspection[ParticipantNodeX] {
override private[console] val nodes = consoleEnvironment.environment.participantsX
override private[console] val nodes = consoleEnvironment.environment.participants
@Help.Summary("Return participant config")
def config: LocalParticipantConfig =
consoleEnvironment.environment.config.participantsByString(name)
override def runningNode: Option[ParticipantNodeBootstrapX] =
consoleEnvironment.environment.participantsX.getRunning(name)
consoleEnvironment.environment.participants.getRunning(name)
override def startingNode: Option[ParticipantNodeBootstrapX] =
consoleEnvironment.environment.participantsX.getStarting(name)
consoleEnvironment.environment.participants.getStarting(name)
/** secret, not publicly documented way to get the admin token */
def adminToken: Option[String] = underlying.map(_.adminToken.secret)
@ -1167,7 +1167,7 @@ class LocalSequencerNodeReference(
.fold(err => sys.error(s"Sequencer $name has invalid connection config: $err"), identity)
private[console] val nodes: SequencerNodesX[?] =
consoleEnvironment.environment.sequencersX
consoleEnvironment.environment.sequencers
override protected[console] def runningNode: Option[SequencerNodeBootstrapX] =
nodes.getRunning(name)
@ -1285,7 +1285,7 @@ class LocalMediatorReference(consoleEnvironment: ConsoleEnvironment, val name: S
override def config: MediatorNodeConfigCommon =
consoleEnvironment.environment.config.mediatorsByString(name)
private[console] val nodes: MediatorNodesX[?] = consoleEnvironment.environment.mediatorsX
private[console] val nodes: MediatorNodesX[?] = consoleEnvironment.environment.mediators
override protected[console] def runningNode: Option[MediatorNodeBootstrapX] =
nodes.getRunning(name)

View File

@ -2297,7 +2297,7 @@ trait LedgerApiAdministration extends BaseLedgerApiAdministration {
// changes during the command's execution. We'll have to live with it for the moment, as there's no convenient
// way to get the record time of the transaction to pass to the parties.list call.
val domainPartiesAndParticipants = {
consoleEnvironment.participantsX.all.iterator
consoleEnvironment.participants.all.iterator
.filter(x => x.health.running() && x.health.initialized() && x.name == name)
.flatMap(_.parties.list(filterDomain = txDomain.filterString))
.toSet
@ -2338,7 +2338,7 @@ trait LedgerApiAdministration extends BaseLedgerApiAdministration {
val involvedConsoleParticipants = cand.participants.mapFilter { pd =>
for {
participantReference <-
consoleEnvironment.participantsX.all
consoleEnvironment.participants.all
.filter(x => x.health.running() && x.health.initialized())
.find(identityIs(_, pd.participant))
_ <- pd.domains.find(_.domain == txDomain)

View File

@ -108,7 +108,8 @@ private[console] object ParticipantCommands {
manualConnect: Boolean = false,
maxRetryDelay: Option[NonNegativeFiniteDuration] = None,
priority: Int = 0,
sequencerTrustThreshold: PositiveInt = PositiveInt.tryCreate(1),
sequencerTrustThreshold: PositiveInt = PositiveInt.one,
submissionRequestAmplification: PositiveInt = PositiveInt.one,
): DomainConnectionConfig = {
DomainConnectionConfig(
domainAlias,
@ -117,6 +118,7 @@ private[console] object ParticipantCommands {
domain.sequencerConnection.withAlias(alias)
},
sequencerTrustThreshold,
submissionRequestAmplification,
),
manualConnect = manualConnect,
None,
@ -1067,7 +1069,8 @@ trait ParticipantAdministration extends FeatureFlagFilter {
synchronize: Option[NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
sequencerTrustThreshold: PositiveInt = PositiveInt.tryCreate(1),
sequencerTrustThreshold: PositiveInt = PositiveInt.one,
submissionRequestAmplification: PositiveInt = PositiveInt.one,
): Unit = {
val config = ParticipantCommands.domains.reference_to_config(
domain,
@ -1076,6 +1079,7 @@ trait ParticipantAdministration extends FeatureFlagFilter {
maxRetryDelayMillis.map(NonNegativeFiniteDuration.tryOfMillis),
priority,
sequencerTrustThreshold,
submissionRequestAmplification,
)
connectFromConfig(config, synchronize)
}

View File

@ -109,13 +109,23 @@ class ParticipantRepairAdministration(
|Note that the 'export_acs' command execution may take a long time to complete and may require significant
|resources.
|
|If `force` is set to true, then the check that the timestamp is clean will not be done.
|For this option to yield a consistent snapshot, you need to wait at least
|participantResponseTimeout + mediatorReactionTimeout after the last submitted request.
|
|The arguments are:
|- parties: identifying contracts having at least one stakeholder from the given set
|- partiesOffboarding: true if the parties will be offboarded (party migration)
|- outputFile: the output file name where to store the data. Use .gz as a suffix to get a compressed file (recommended)
|- filterDomainId: restrict the export to a given domain
|- timestamp: optionally a timestamp for which we should take the state (useful to reconcile states of a domain)
|- contractDomainRenames: as part of the export, allows renaming the domain ID associated with contracts from one domain to another, based on the given mapping.
|- force: if set to true, the check that the timestamp is clean will not be performed.
| For this option to yield a consistent snapshot, you need to wait at least
| participantResponseTimeout + mediatorReactionTimeout after the last submitted request.
"""
)
def export_acs(
parties: Set[PartyId],
partiesOffboarding: Boolean,
outputFile: String = ParticipantRepairAdministration.ExportAcsDefaultFile,
filterDomainId: Option[DomainId] = None,
timestamp: Option[Instant] = None,
@ -127,6 +137,7 @@ class ParticipantRepairAdministration(
val command = ParticipantAdminCommands.ParticipantRepairManagement
.ExportAcs(
parties,
partiesOffboarding = partiesOffboarding,
filterDomainId,
timestamp,
collector.observer,
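A hedged usage sketch of the extended command (not from this commit): it assumes this administration group is exposed as `repair` on a participant reference and that `alice: PartyId` and `daId: DomainId` already exist; the parameter names are the ones listed in the help text above.

participant1.repair.export_acs(
  parties = Set(alice),              // contracts with at least one stakeholder in this set
  partiesOffboarding = true,         // the parties will be offboarded afterwards (party migration)
  outputFile = "alice-acs.gz",       // .gz suffix produces a compressed export
  filterDomainId = Some(daId),       // restrict the export to a single domain
)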

View File

@ -413,7 +413,7 @@ object TopologySynchronisationX {
val partiesWithId = partyAssignment.map { case (party, participantRef) =>
(party, participantRef.id)
}
env.sequencersX.all.forall { sequencer =>
env.sequencers.all.forall { sequencer =>
val domainId = sequencer.domain_id
!participant.domains.is_connected(domainId) || {
val timestamp = participant.testing.fetch_domain_time(domainId)

View File

@ -2059,7 +2059,7 @@ class TopologyAdministrationGroup(
synchronize: Option[config.NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
waitForParticipants: Seq[ParticipantReference] = consoleEnvironment.participantsX.all,
waitForParticipants: Seq[ParticipantReference] = consoleEnvironment.participants.all,
force: Boolean = false,
): SignedTopologyTransactionX[TopologyChangeOpX, DomainParametersStateX] = { // TODO(#15815): Don't expose internal TopologyMappingX and TopologyChangeOpX classes
@ -2136,7 +2136,7 @@ class TopologyAdministrationGroup(
synchronize: Option[config.NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
waitForParticipants: Seq[ParticipantReference] = consoleEnvironment.participantsX.all,
waitForParticipants: Seq[ParticipantReference] = consoleEnvironment.participants.all,
force: Boolean = false,
): Unit = {
val domainStore = domainId.filterString

View File

@ -241,7 +241,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
private val testingTimeService = new TestingTimeService(clock, () => simClocks)
lazy val participantsX =
lazy val participants =
new ParticipantNodesX[Config#ParticipantConfigType](
createParticipantX,
migrationsFactory,
@ -251,7 +251,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
loggerFactory,
)
val sequencersX = new SequencerNodesX(
val sequencers = new SequencerNodesX(
createSequencerX,
migrationsFactory,
timeouts,
@ -260,7 +260,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
loggerFactory,
)
val mediatorsX =
val mediators =
new MediatorNodesX(
createMediatorX,
migrationsFactory,
@ -273,7 +273,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
// convenient grouping of all node collections for performing operations
// intentionally defined in the order we'd like to start them
protected def allNodes: List[Nodes[CantonNode, CantonNodeBootstrap[CantonNode]]] =
List(sequencersX, mediatorsX, participantsX)
List(sequencers, mediators, participants)
private def runningNodes: Seq[CantonNodeBootstrap[CantonNode]] = allNodes.flatMap(_.running)
private def autoConnectLocalNodes(): Either[StartupError, Unit] = {
@ -314,7 +314,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
): Unit = {
final case class ParticipantApis(ledgerApi: Int, adminApi: Int)
config.parameters.portsFile.foreach { portsFile =>
val items = participantsX.running.map { node =>
val items = participants.running.map { node =>
(
node.name.unwrap,
ParticipantApis(node.config.ledgerApi.port.unwrap, node.config.adminApi.port.unwrap),
@ -361,7 +361,7 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
config.parameters.timeouts.processing.unbounded.await("reconnect-particiapnts")(
MonadUtil
.parTraverseWithLimit_(config.parameters.getStartupParallelism(numThreads))(
participantsX.running
participants.running
)(reconnect)
.value
)
@ -493,9 +493,9 @@ trait Environment extends NamedLogging with AutoCloseable with NoTracing {
)
private def simClocks: Seq[SimClock] = {
val clocks = clock +: (participantsX.running.map(_.clock) ++ sequencersX.running.map(
val clocks = clock +: (participants.running.map(_.clock) ++ sequencers.running.map(
_.clock
) ++ mediatorsX.running.map(_.clock))
) ++ mediators.running.map(_.clock))
val simclocks = clocks.collect { case sc: SimClock => sc }
if (simclocks.sizeCompare(clocks) < 0)
logger.warn(s"Found non-sim clocks, testing time service will be broken.")

View File

@ -14,7 +14,7 @@ bootstrap.domain(
// The macro is convenient for local testing, but obviously doesn't work in a distributed setup.
participant1.domains.connect_local(sequencer1, alias = "da")
val daPort = Option(System.getProperty("canton-examples.da-port")).getOrElse("5018")
val daPort = Option(System.getProperty("canton-examples.da-port")).getOrElse("5001")
// Connect participant2 to da using just the target URL and a local name we use to refer to this particular
// connection. This is actually everything Canton requires and this second type of connect call can be used

View File

@ -91,9 +91,9 @@ class ConsoleTest extends AnyWordSpec with BaseTest {
when(environment.testingConfig).thenReturn(
TestingConfigInternal(initializeGlobalOpenTelemetry = false)
)
when(environment.participantsX).thenReturn(participants)
when(environment.sequencersX).thenReturn(sequencersX)
when(environment.mediatorsX).thenReturn(mediatorsX)
when(environment.participants).thenReturn(participants)
when(environment.sequencers).thenReturn(sequencersX)
when(environment.mediators).thenReturn(mediatorsX)
when(environment.simClock).thenReturn(None)
when(environment.loggerFactory).thenReturn(loggerFactory)
when(environment.configuredOpenTelemetry).thenReturn(

View File

@ -268,9 +268,9 @@ class CommunityEnvironmentTest extends AnyWordSpec with BaseTest with HasExecuti
// will terminate eagerly. so we actually have to wait until the processes finished
// in the background
eventually() {
environment.sequencersX.running.toSet shouldBe Set(s1, s2)
environment.mediatorsX.running.toSet shouldBe Set(m1, m2)
environment.participantsX.running should contain.only(p1)
environment.sequencers.running.toSet shouldBe Set(s1, s2)
environment.mediators.running.toSet shouldBe Set(m1, m2)
environment.participants.running should contain.only(p1)
}
}
}

View File

@ -44,9 +44,9 @@ sealed trait SimplestPingXCommunityIntegrationTest
sequencer1.health.status shouldBe a[NodeStatus.Success[?]]
mediator1.health.status shouldBe a[NodeStatus.Success[?]]
participantsX.local.start()
participants.local.start()
participantsX.local.domains.connect_local(sequencer1, "da")
participants.local.domains.connect_local(sequencer1, "da")
mediator1.testing
.fetch_domain_time() // Test if the DomainTimeService works for community mediators as well.
participant1.health.ping(participant2)

View File

@ -45,39 +45,40 @@ object HashPurpose {
/* HashPurposes are listed as `val` rather than `case object`s such that they are initialized eagerly.
* This ensures that HashPurpose id clashes are detected eagerly. Otherwise, it may be there are two hash purposes
* with the same id, but they are never used in the same Java process and therefore the clash is not detected.
* NOTE: We're keeping around the old hash purposes (no longer used) to prevent accidental reuse.
*/
val SequencedEventSignature = HashPurpose(1, "SequencedEventSignature")
val Hmac = HashPurpose(2, "Hmac")
val _Hmac = HashPurpose(2, "Hmac")
val MerkleTreeInnerNode = HashPurpose(3, "MerkleTreeInnerNode")
val Discriminator = HashPurpose(4, "Discriminator")
val _Discriminator = HashPurpose(4, "Discriminator")
val SubmitterMetadata = HashPurpose(5, "SubmitterMetadata")
val CommonMetadata = HashPurpose(6, "CommonMetadata")
val ParticipantMetadata = HashPurpose(7, "ParticipantMetadata")
val ViewCommonData = HashPurpose(8, "ViewCommonData")
val ViewParticipantData = HashPurpose(9, "ViewParticipantData")
val MalformedMediatorRequestResult = HashPurpose(10, "MalformedMediatorRequestResult")
val _MalformedMediatorRequestResult = HashPurpose(10, "MalformedMediatorRequestResult")
val TopologyTransactionSignature = HashPurpose(11, "TopologyTransactionSignature")
val PublicKeyFingerprint = HashPurpose(12, "PublicKeyFingerprint")
val DarIdentifier = HashPurpose(13, "DarIdentifier")
val AuthenticationToken = HashPurpose(14, "AuthenticationToken")
val AgreementId = HashPurpose(15, "AgreementId")
val MediatorResponseSignature = HashPurpose(16, "MediatorResponseSignature")
val TransactionResultSignature = HashPurpose(17, "TransactionResultSignature")
val TransferResultSignature = HashPurpose(19, "TransferResultSignature")
val ParticipantStateSignature = HashPurpose(20, "ParticipantStateSignature")
val _AgreementId = HashPurpose(15, "AgreementId")
val _MediatorResponseSignature = HashPurpose(16, "MediatorResponseSignature")
val _TransactionResultSignature = HashPurpose(17, "TransactionResultSignature")
val _TransferResultSignature = HashPurpose(19, "TransferResultSignature")
val _ParticipantStateSignature = HashPurpose(20, "ParticipantStateSignature")
val DomainTopologyTransactionMessageSignature =
HashPurpose(21, "DomainTopologyTransactionMessageSignature")
val AcsCommitment = HashPurpose(22, "AcsCommitment")
val _AcsCommitment = HashPurpose(22, "AcsCommitment")
val Stakeholders = HashPurpose(23, "Stakeholders")
val TransferOutCommonData = HashPurpose(24, "TransferOutCommonData")
val TransferOutView = HashPurpose(25, "TransferOutView")
val TransferInCommonData = HashPurpose(26, "TransferInCommonData")
val TransferInView = HashPurpose(27, "TransferInView")
val TransferViewTreeMessageSeed = HashPurpose(28, "TransferViewTreeMessageSeed")
val _TransferViewTreeMessageSeed = HashPurpose(28, "TransferViewTreeMessageSeed")
val Unicum = HashPurpose(29, "Unicum")
val RepairTransactionId = HashPurpose(30, "RepairTransactionId")
val MediatorLeadershipEvent = HashPurpose(31, "MediatorLeadershipEvent")
val LegalIdentityClaim = HashPurpose(32, "LegalIdentityClaim")
val _MediatorLeadershipEvent = HashPurpose(31, "MediatorLeadershipEvent")
val _LegalIdentityClaim = HashPurpose(32, "LegalIdentityClaim")
val DbLockId = HashPurpose(33, "DbLockId")
val AcsCommitmentDb = HashPurpose(34, "AcsCommitmentDb")
val SubmissionRequestSignature = HashPurpose(35, "SubmissionRequestSignature")
@ -86,5 +87,5 @@ object HashPurpose {
val SignedProtocolMessageSignature = HashPurpose(38, "SignedProtocolMessageSignature")
val AggregationId = HashPurpose(39, "AggregationId")
val BftOrderingPbftBlock = HashPurpose(40, "BftOrderingPbftBlock")
val SetTrafficBalance = HashPurpose(41, "SetTrafficBalance")
val _SetTrafficBalance = HashPurpose(41, "SetTrafficBalance")
}
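The comment at the top of this object explains why these are eagerly initialized vals rather than case objects: id clashes should be detected as soon as the object is loaded. A self-contained sketch of that pattern (illustration only, not Canton's implementation):

object HashPurposeSketch {
  private val known = scala.collection.mutable.Map.empty[Int, String]

  final case class Purpose(id: Int, description: String)

  private def purpose(id: Int, description: String): Purpose = {
    // fail at initialization time if an id is reused
    require(!known.contains(id), s"id $id already used for ${known(id)}")
    known.put(id, description)
    Purpose(id, description)
  }

  val SequencedEventSignature = purpose(1, "SequencedEventSignature")
  // retired purposes keep their val (underscore-prefixed) so their ids cannot be reused accidentally
  val _Hmac = purpose(2, "Hmac")
}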

View File

@ -232,14 +232,13 @@ object TransferInCommonData
}
}
// TODO(#15159) For transfer counter, remove the note that it is defined iff...
/** Aggregates the data of a transfer-in request that is only sent to the involved participants
*
* @param salt The salt to blind the Merkle hash
* @param submitter The submitter of the transfer-in request
* @param creatingTransactionId The id of the transaction that created the contract
* @param contract The contract to be transferred including the instance
* @param creatingTransactionId The id of the transaction that created the contract
* @param transferOutResultEvent The signed deliver event of the transfer-out result message
* @param sourceProtocolVersion Protocol version of the source domain.
* @param transferCounter The [[com.digitalasset.canton.TransferCounter]] of the contract.
*/
final case class TransferInView private (

View File

@ -5,7 +5,6 @@ package com.digitalasset.canton.protocol.messages
import cats.syntax.either.*
import com.digitalasset.canton.ProtoDeserializationError
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.{CantonTimestamp, CantonTimestampSecond}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.messages.SignedProtocolMessageContent.SignedMessageContentCast
@ -146,8 +145,6 @@ abstract sealed case class AcsCommitment private (
getCryptographicEvidence
)
override def hashPurpose: HashPurpose = HashPurpose.AcsCommitment
override lazy val pretty: Pretty[AcsCommitment] = {
prettyOfClass(
param("domainId", _.domainId),

View File

@ -4,7 +4,6 @@
package com.digitalasset.canton.protocol.messages
import cats.syntax.either.*
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.ViewType
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.messages.SignedProtocolMessageContent.SignedMessageContentCast
@ -29,8 +28,7 @@ import com.google.protobuf.ByteString
* @param domainId The domain ID of the mediator
* @param verdict The rejection reason as a verdict
*/
@SuppressWarnings(Array("org.wartremover.warts.FinalCaseClass")) // This class is mocked in tests
case class MalformedMediatorRequestResult private (
final case class MalformedMediatorRequestResult private (
override val requestId: RequestId,
override val domainId: DomainId,
override val viewType: ViewType,
@ -45,8 +43,6 @@ case class MalformedMediatorRequestResult private (
with HasProtocolVersionedWrapper[MalformedMediatorRequestResult]
with PrettyPrinting {
override def hashPurpose: HashPurpose = HashPurpose.MalformedMediatorRequestResult
override protected[messages] def toProtoTypedSomeSignedProtocolMessage
: v30.TypedSignedProtocolMessageContent.SomeSignedProtocolMessage =
v30.TypedSignedProtocolMessageContent.SomeSignedProtocolMessage.MalformedMediatorRequestResult(

View File

@ -7,7 +7,6 @@ import cats.syntax.either.*
import cats.syntax.traverse.*
import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.ProtoDeserializationError.InvariantViolation
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.{CantonTimestamp, ViewPosition}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.messages.MediatorResponse.InvalidMediatorResponse
@ -131,8 +130,6 @@ case class MediatorResponse private (
getCryptographicEvidence
)
override def hashPurpose: HashPurpose = HashPurpose.MediatorResponseSignature
override def pretty: Pretty[this.type] =
prettyOfClass(
param("sender", _.sender),

View File

@ -4,7 +4,6 @@
package com.digitalasset.canton.protocol.messages
import com.digitalasset.canton.config.RequireTypes.NonNegativeLong
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.pretty.PrettyPrinting
import com.digitalasset.canton.protocol.v30
@ -53,7 +52,6 @@ final case class SetTrafficBalanceMessage private (
v30.TypedSignedProtocolMessageContent.SomeSignedProtocolMessage.SetTrafficBalance(
getCryptographicEvidence
)
override def hashPurpose: HashPurpose = HashPurpose.SetTrafficBalance
}
object SetTrafficBalanceMessage

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.protocol.messages
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.v30
@ -22,9 +21,6 @@ trait SignedProtocolMessageContent
protected[messages] def toProtoTypedSomeSignedProtocolMessage
: v30.TypedSignedProtocolMessageContent.SomeSignedProtocolMessage
/** Hash purpose that uniquely identifies the type of message content to be signed. */
def hashPurpose: HashPurpose
/** The timestamp of the [[com.digitalasset.canton.crypto.SyncCryptoApi]] used for signing this message.
* If no timestamp is provided, the head snapshot will be used. This is only used for security tests.
*/

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.protocol.messages
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.ViewType.TransactionViewType
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.messages.SignedProtocolMessageContent.SignedMessageContentCast
@ -73,8 +72,6 @@ case class TransactionResultMessage private (
getCryptographicEvidence
)
override def hashPurpose: HashPurpose = HashPurpose.TransactionResultSignature
override def pretty: Pretty[TransactionResultMessage] =
prettyOfClass(
param("requestId", _.requestId.unwrap),

View File

@ -9,7 +9,6 @@ import cats.syntax.functorFilter.*
import cats.syntax.traverse.*
import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.ProtoDeserializationError.FieldNotSet
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.data.ViewType
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.protocol.TransferDomainId.TransferDomainIdCast
@ -35,8 +34,7 @@ import com.google.protobuf.ByteString
*
* @param requestId timestamp of the corresponding [[TransferOutRequest]] on the source domain
*/
@SuppressWarnings(Array("org.wartremover.warts.FinalCaseClass")) // This class is mocked in tests
case class TransferResult[+Domain <: TransferDomainId] private (
final case class TransferResult[+Domain <: TransferDomainId] private (
override val requestId: RequestId,
informees: Set[LfPartyId],
domain: Domain, // For transfer-out, this is the source domain. For transfer-in, this is the target domain.
@ -78,8 +76,6 @@ case class TransferResult[+Domain <: TransferDomainId] private (
override protected[this] def toByteStringUnmemoized: ByteString =
super[HasProtocolVersionedWrapper].toByteString
override def hashPurpose: HashPurpose = HashPurpose.TransferResultSignature
@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
private[TransferResult] def traverse[F[_], Domain2 <: TransferDomainId](
f: Domain => F[Domain2]

View File

@ -62,7 +62,7 @@ import slick.lifted.Aliases
import slick.util.{AsyncExecutor, AsyncExecutorWithMetrics, ClassLoaderUtil}
import java.io.ByteArrayInputStream
import java.sql.{Blob, SQLException, Statement}
import java.sql.{Blob, SQLException, SQLTransientException, Statement}
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
@ -423,6 +423,9 @@ object DbStorage {
final case class PassiveInstanceException(reason: String)
extends RuntimeException(s"DbStorage instance is not active: $reason")
final case class NoConnectionAvailable()
extends SQLTransientException("No free connection available")
sealed trait Profile extends Product with Serializable with PrettyPrinting {
def jdbc: JdbcProfile
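Because NoConnectionAvailable now extends SQLTransientException, generic retry logic that classifies SQL exceptions as transient will retry it. A minimal sketch of such a classifier (illustration only, using nothing beyond JDK types):

def isTransient(t: Throwable): Boolean = t match {
  case _: java.sql.SQLTransientException => true // includes DbStorage.NoConnectionAvailable
  case _ => false
}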

View File

@ -3,15 +3,15 @@
package com.digitalasset.canton.sequencing
import cats.Monad
import cats.syntax.either.*
import cats.syntax.foldable.*
import cats.{Id, Monad}
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import com.digitalasset.canton.admin.domain.v30
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.{ParsingResult, parseRequiredNonEmpty}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.version.{
HasVersionedMessageCompanion,
HasVersionedMessageCompanionCommon,
@ -28,6 +28,7 @@ import java.net.URI
final case class SequencerConnections private (
aliasToConnection: NonEmpty[Map[SequencerAlias, SequencerConnection]],
sequencerTrustThreshold: PositiveInt,
submissionRequestAmplification: PositiveInt,
) extends HasVersionedWrapper[SequencerConnections]
with PrettyPrinting {
require(
@ -44,31 +45,12 @@ final case class SequencerConnections private (
def default: SequencerConnection = aliasToConnection.head1._2
/** In case of BFT domain - multiple sequencers are required for proper functioning.
* Some functionalities are only available in a non-BFT domain.
* When nonBftSetup is false, more than one sequencer connection has been provided; this does not by itself imply a BFT domain.
*/
def nonBftSetup: Boolean = aliasToConnection.sizeIs == 1
def connections: NonEmpty[Seq[SequencerConnection]] = aliasToConnection.map(_._2).toSeq
def modify(
sequencerAlias: SequencerAlias,
m: SequencerConnection => SequencerConnection,
): SequencerConnections = {
aliasToConnection
.get(sequencerAlias)
.map { connection =>
SequencerConnections(
aliasToConnection.updated(
sequencerAlias,
m(connection),
),
sequencerTrustThreshold,
)
}
.getOrElse(this)
}
): SequencerConnections = modifyM[Id](sequencerAlias, m)
private def modifyM[M[_]](
sequencerAlias: SequencerAlias,
@ -77,13 +59,12 @@ final case class SequencerConnections private (
aliasToConnection
.get(sequencerAlias)
.map { connection =>
M.map(m(connection)) { x =>
SequencerConnections(
M.map(m(connection)) { newSequencerConnection =>
this.copy(
aliasToConnection.updated(
sequencerAlias,
x,
),
sequencerTrustThreshold,
newSequencerConnection,
)
)
}
}
@ -117,9 +98,14 @@ final case class SequencerConnections private (
prettyOfParam(_.aliasToConnection.forgetNE)
def toProtoV30: v30.SequencerConnections =
new v30.SequencerConnections(connections.map(_.toProtoV30), sequencerTrustThreshold.unwrap)
new v30.SequencerConnections(
connections.map(_.toProtoV30),
sequencerTrustThreshold.unwrap,
submissionRequestAmplification.unwrap,
)
override protected def companionObj: HasVersionedMessageCompanionCommon[SequencerConnections] =
@transient override protected lazy val companionObj
: HasVersionedMessageCompanionCommon[SequencerConnections] =
SequencerConnections
}
@ -129,69 +115,84 @@ object SequencerConnections
def single(connection: SequencerConnection): SequencerConnections =
new SequencerConnections(
NonEmpty.mk(Seq, (connection.sequencerAlias, connection)).toMap,
PositiveInt.tryCreate(1),
aliasToConnection = NonEmpty.mk(Seq, (connection.sequencerAlias, connection)).toMap,
sequencerTrustThreshold = PositiveInt.one,
submissionRequestAmplification = PositiveInt.one,
)
def many(
connections: NonEmpty[Seq[SequencerConnection]],
sequencerTrustThreshold: PositiveInt,
submissionRequestAmplification: PositiveInt,
): Either[String, SequencerConnections] =
if (connections.sizeIs == 1) {
Right(SequencerConnections.single(connections.head1))
} else if (connections.map(_.sequencerAlias).toSet.sizeCompare(connections) < 0) {
val duplicatesAliases = connections.map(_.sequencerAlias).groupBy(identity).collect {
case (alias, aliases) if aliases.lengthCompare(1) > 0 => alias
} else {
val repeatedAliases = connections.groupBy(_.sequencerAlias).filter { case (_, connections) =>
connections.lengthCompare(1) > 0
}
Left(s"Non-unique sequencer aliases detected: $duplicatesAliases")
} else
Either
.catchOnly[IllegalArgumentException](
new SequencerConnections(
connections.map(conn => (conn.sequencerAlias, conn)).toMap,
sequencerTrustThreshold,
)
for {
_ <- Either.cond(
repeatedAliases.isEmpty,
(),
s"Repeated sequencer aliases found: $repeatedAliases",
)
.leftMap(_.getMessage)
sequencerConnections <- Either
.catchOnly[IllegalArgumentException](
new SequencerConnections(
connections.map(conn => (conn.sequencerAlias, conn)).toMap,
sequencerTrustThreshold,
submissionRequestAmplification,
)
)
.leftMap(_.getMessage)
} yield sequencerConnections
}
def tryMany(
connections: Seq[SequencerConnection],
sequencerTrustThreshold: PositiveInt,
): SequencerConnections = {
many(NonEmptyUtil.fromUnsafe(connections), sequencerTrustThreshold).valueOr(err =>
throw new IllegalArgumentException(err)
)
}
private def fromProtoV30(
fieldName: String,
connections: Seq[v30.SequencerConnection],
sequencerTrustThreshold: PositiveInt,
): ParsingResult[SequencerConnections] = for {
sequencerConnectionsNes <- parseRequiredNonEmpty(
SequencerConnection.fromProtoV30,
fieldName,
connections,
)
_ <- Either.cond(
sequencerConnectionsNes.map(_.sequencerAlias).toSet.size == sequencerConnectionsNes.size,
(),
ProtoDeserializationError.ValueConversionError(
fieldName,
"Every sequencer connection must have a unique sequencer alias",
),
)
sequencerConnections <- many(sequencerConnectionsNes, sequencerTrustThreshold).leftMap(
ProtoDeserializationError.InvariantViolation(_)
)
} yield sequencerConnections
submissionRequestAmplification: PositiveInt,
): SequencerConnections =
many(
NonEmptyUtil.fromUnsafe(connections),
sequencerTrustThreshold,
submissionRequestAmplification,
).valueOr(err => throw new IllegalArgumentException(err))
def fromProtoV30(
sequencerConnections: v30.SequencerConnections
): ParsingResult[SequencerConnections] =
ProtoConverter
.parsePositiveInt(sequencerConnections.sequencerTrustThreshold)
.flatMap(fromProtoV30("sequencer_connections", sequencerConnections.sequencerConnections, _))
sequencerConnectionsProto: v30.SequencerConnections
): ParsingResult[SequencerConnections] = {
val v30.SequencerConnections(
sequencerConnectionsP,
sequencerTrustThresholdP,
submissionRequestAmplificationP,
) = sequencerConnectionsProto
for {
sequencerTrustThreshold <- ProtoConverter.parsePositiveInt(sequencerTrustThresholdP)
submissionRequestAmplification <- ProtoConverter.parsePositiveInt(
submissionRequestAmplificationP
)
sequencerConnectionsNes <- ProtoConverter.parseRequiredNonEmpty(
SequencerConnection.fromProtoV30,
"sequencer_connections",
sequencerConnectionsP,
)
_ <- Either.cond(
sequencerConnectionsNes.map(_.sequencerAlias).toSet.size == sequencerConnectionsNes.size,
(),
ProtoDeserializationError.ValueConversionError(
"sequencer_connections",
"Every sequencer connection must have a unique sequencer alias",
),
)
sequencerConnections <- many(
sequencerConnectionsNes,
sequencerTrustThreshold,
submissionRequestAmplification,
).leftMap(ProtoDeserializationError.InvariantViolation(_))
} yield sequencerConnections
}
override def name: String = "sequencer connections"
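A hedged round-trip sketch over the two functions changed above (console/test style), assuming `connections` is the SequencerConnections value from the earlier sketch:

val proto = connections.toProtoV30
val parsed = SequencerConnections.fromProtoV30(proto)
// `parsed` is expected to be a Right whose trust threshold and amplification match `connections`
assert(parsed.map(_.sequencerTrustThreshold) == Right(connections.sequencerTrustThreshold))
assert(parsed.map(_.submissionRequestAmplification) == Right(connections.submissionRequestAmplification))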

View File

@ -55,7 +55,7 @@ import com.digitalasset.canton.sequencing.handlers.{
StoreSequencedEvent,
ThrottlingApplicationEventHandler,
}
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.sequencing.protocol.{AggregationRule, *}
import com.digitalasset.canton.store.CursorPrehead.SequencerCounterCursorPrehead
import com.digitalasset.canton.store.SequencedEventStore.PossiblyIgnoredSequencedEvent
import com.digitalasset.canton.store.*
@ -100,6 +100,7 @@ trait SequencerClient extends SequencerClientSend with FlagCloseable {
messageId: MessageId = generateMessageId,
aggregationRule: Option[AggregationRule] = None,
callback: SendCallback = SendCallback.empty,
amplify: Boolean = false,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit]
/** Does the same as [[sendAsync]], except that this method is supposed to be used
@ -244,12 +245,13 @@ abstract class SequencerClientImpl(
override def sendAsyncUnauthenticatedOrNot(
batch: Batch[DefaultOpenEnvelope],
sendType: SendType = SendType.Other,
topologyTimestamp: Option[CantonTimestamp] = None,
maxSequencingTime: CantonTimestamp = generateMaxSequencingTime,
messageId: MessageId = generateMessageId,
aggregationRule: Option[AggregationRule] = None,
callback: SendCallback = SendCallback.empty,
sendType: SendType,
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
amplify: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] = {
member match {
case _: AuthenticatedMember =>
@ -275,12 +277,13 @@ abstract class SequencerClientImpl(
override def sendAsync(
batch: Batch[DefaultOpenEnvelope],
sendType: SendType = SendType.Other,
topologyTimestamp: Option[CantonTimestamp] = None,
maxSequencingTime: CantonTimestamp = generateMaxSequencingTime,
messageId: MessageId = generateMessageId,
aggregationRule: Option[AggregationRule] = None,
callback: SendCallback = SendCallback.empty,
sendType: SendType,
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
amplify: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] =
for {
_ <- EitherT.cond[Future](
@ -312,6 +315,7 @@ abstract class SequencerClientImpl(
messageId,
aggregationRule,
callback,
amplify,
)
} yield result
@ -337,12 +341,13 @@ abstract class SequencerClientImpl(
batch,
requiresAuthentication = false,
sendType,
// Requests involving unauthenticated members must not specify a signing key
None,
maxSequencingTime,
messageId,
None,
callback,
// Requests involving unauthenticated members must not specify a topology timestamp
topologyTimestamp = None,
maxSequencingTime = maxSequencingTime,
messageId = messageId,
aggregationRule = None,
callback = callback,
amplify = false,
)
private def checkRequestSize(
@ -371,6 +376,7 @@ abstract class SequencerClientImpl(
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
amplify: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] =
withSpan("SequencerClient.sendAsync") { implicit traceContext => span =>
val requestE = SubmissionRequest
@ -409,14 +415,13 @@ abstract class SequencerClientImpl(
case _: ParticipantId => true
case _ => false
}
val domainParamsF =
EitherTUtil.fromFuture(
domainParametersLookup.getApproximateOrDefaultValue(warnOnUsingDefaults),
throwable =>
SendAsyncClientError.RequestFailed(
s"failed to retrieve maxRequestSize because ${throwable.getMessage}"
),
)
val domainParamsF = EitherTUtil.fromFuture(
domainParametersLookup.getApproximateOrDefaultValue(warnOnUsingDefaults),
throwable =>
SendAsyncClientError.RequestFailed(
s"failed to retrieve maxRequestSize because ${throwable.getMessage}"
),
)
def trackSend: EitherT[Future, SendAsyncClientError, Unit] =
sendTracker
@ -454,7 +459,7 @@ abstract class SequencerClientImpl(
_ <- EitherT.fromEither[Future](checkRequestSize(request, domainParams.maxRequestSize))
_ <- trackSend
_ = recorderO.foreach(_.recordSubmission(request))
_ <- performSend(messageId, request, requiresAuthentication)
_ <- performSend(messageId, request, requiresAuthentication, amplify)
} yield ()
}
}
@ -465,22 +470,49 @@ abstract class SequencerClientImpl(
messageId: MessageId,
request: SubmissionRequest,
requiresAuthentication: Boolean,
amplify: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] = {
EitherTUtil
.timed(metrics.submissions.sends) {
val timeout = timeouts.network.duration
if (requiresAuthentication) {
val (transports, amplifiableRequest) = if (amplify) {
val transports = sequencersTransportState.amplifiedTransports
val amplifiableRequest = if (transports.sizeIs > 1 && request.aggregationRule.isEmpty) {
val aggregationRule =
AggregationRule(NonEmpty(Seq, member), PositiveInt.one, protocolVersion)
logger.debug(
s"Adding aggregation rule $aggregationRule to submission request with message ID $messageId"
)
request.copy(aggregationRule = aggregationRule.some)
} else request
(transports, amplifiableRequest)
} else { (NonEmpty(Seq, sequencersTransportState.transport), request) }
for {
signedContent <- requestSigner
.signRequest(request, HashPurpose.SubmissionRequestSignature)
.signRequest(amplifiableRequest, HashPurpose.SubmissionRequestSignature)
.leftMap { err =>
val message = s"Error signing submission request $err"
logger.error(message)
SendAsyncClientError.RequestRefused(SendAsyncError.RequestRefused(message))
}
_ <- sequencersTransportState.transport.sendAsyncSigned(signedContent, timeout)
sendResults <- EitherT.right(
transports.toNEF.parTraverse(_.sendAsyncSigned(signedContent, timeout).value)
)
(errors, successes) = sendResults.forgetNE.separate
_ <- EitherT.cond[Future](
successes.nonEmpty,
(),
errors match {
case Seq(single) => single
case multiple =>
SendAsyncClientError.RequestFailed(
s"Failed to send submission request to any sequencer: ${multiple.mkString(", ")}"
)
},
)
} yield ()
} else
sequencersTransportState.transport.sendAsyncUnauthenticatedVersioned(request, timeout)
}
@ -1695,6 +1727,7 @@ object SequencerClient {
final case class SequencerTransports[E](
sequencerToTransportMap: NonEmpty[Map[SequencerAlias, SequencerTransportContainer[E]]],
sequencerTrustThreshold: PositiveInt,
submissionRequestAmplification: PositiveInt,
) {
def expectedSequencers: NonEmpty[Set[SequencerId]] =
sequencerToTransportMap.map(_._2.sequencerId).toSet
@ -1716,6 +1749,7 @@ object SequencerClient {
],
expectedSequencers: NonEmpty[Map[SequencerAlias, SequencerId]],
sequencerSignatureThreshold: PositiveInt,
submissionRequestAmplification: PositiveInt,
): Either[String, SequencerTransports[E]] =
if (sequencerTransportsMap.keySet != expectedSequencers.keySet) {
Left("Inconsistent map of sequencer transports and their ids.")
@ -1728,6 +1762,7 @@ object SequencerClient {
sequencerAlias -> SequencerTransportContainer(sequencerId, transport)
}.toMap,
sequencerTrustThreshold = sequencerSignatureThreshold,
submissionRequestAmplification = submissionRequestAmplification,
)
)
@ -1743,7 +1778,8 @@ object SequencerClient {
sequencerAlias -> SequencerTransportContainer(sequencerId, transport),
)
.toMap,
PositiveInt.tryCreate(1),
PositiveInt.one,
PositiveInt.one,
)
def default[E](

View File

@ -127,6 +127,7 @@ object SequencerClientFactory {
sequencerTransportsMap,
expectedSequencers,
sequencerConnections.sequencerTrustThreshold,
sequencerConnections.submissionRequestAmplification,
)
)

View File

@ -48,6 +48,7 @@ trait SequencerClientSend {
messageId: MessageId = generateMessageId,
aggregationRule: Option[AggregationRule] = None,
callback: SendCallback = SendCallback.empty,
amplify: Boolean = false,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit]
/** Provides a value for max-sequencing-time to use for `sendAsync` if no better application provided timeout is available.

View File

@ -5,6 +5,7 @@ package com.digitalasset.canton.sequencing.client
import cats.implicits.catsSyntaxFlatten
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.DiscardOps
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.PositiveInt
@ -25,10 +26,9 @@ import com.digitalasset.canton.sequencing.client.transports.{
import com.digitalasset.canton.topology.SequencerId
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.*
import com.digitalasset.canton.util.{ErrorUtil, MonadUtil}
import com.digitalasset.canton.util.{ErrorUtil, MonadUtil, SeqUtil}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.compat.immutable.ArraySeq
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
import scala.util.{Failure, Random, Success, Try}
@ -43,6 +43,14 @@ trait SequencerTransportLookup {
*/
def transport(implicit traceContext: TraceContext): SequencerClientTransportCommon
/** Similar to `transport` except that it returns as many [[com.digitalasset.canton.sequencing.client.transports.SequencerClientTransportCommon]]
* as currently configured in [[com.digitalasset.canton.sequencing.client.SequencerClient.SequencerTransports.submissionRequestAmplification]],
* capped by the number of healthy subscriptions.
*/
def amplifiedTransports(implicit
traceContext: TraceContext
): NonEmpty[Seq[SequencerClientTransportCommon]]
/** Returns the transport for the given [[com.digitalasset.canton.topology.SequencerId]].
*
* @throws java.lang.IllegalArgumentException if the [[com.digitalasset.canton.topology.SequencerId]] currently has not transport
@ -75,7 +83,12 @@ class SequencersTransportState(
private val sequencerTrustThreshold =
new AtomicReference[PositiveInt](initialSequencerTransports.sequencerTrustThreshold)
def getSequencerTrustThreshold = sequencerTrustThreshold.get()
def getSequencerTrustThreshold: PositiveInt = sequencerTrustThreshold.get()
private val submissionRequestAmplification =
new AtomicReference[PositiveInt](initialSequencerTransports.submissionRequestAmplification)
def getSubmissionRequestAmplification: PositiveInt = submissionRequestAmplification.get()
blocking(lock.synchronized {
val sequencerIdToTransportStateMap = initialSequencerTransports.sequencerIdToTransportMap.map {
@ -114,28 +127,55 @@ class SequencersTransportState(
override def transport(implicit traceContext: TraceContext): SequencerClientTransportCommon =
blocking(lock.synchronized {
// Pick a random healthy sequencer to send to.
// We can use a plain Random instance across all threads calling this method,
// because this method anyway uses locking on its own.
// (In general, ThreadLocalRandom would void contention on the random number generation, but
// the plain Random has the advantage that we can hard-code the seed so that the chosen sequencers
// are easier to reproduce for debugging and tests.)
val healthySequencers = state.view
.collect { case (_sequencerId, state) if state.isSubscriptionHealthy => state }
.to(ArraySeq)
val chosenSequencer =
if (healthySequencers.isEmpty)
// TODO(i12377): Can we fallback to first sequencer transport here or should we
// introduce EitherT and propagate error handling?
state.values.headOption
.getOrElse(
// TODO(i12377): Error handling
ErrorUtil.invalidState("No sequencer subscription at the moment. Try again later.")
)
else healthySequencers(random.nextInt(healthySequencers.size))
chosenSequencer.transport.clientTransport
transportInternal(PositiveInt.one).head1
})
override def amplifiedTransports(implicit
traceContext: TraceContext
): NonEmpty[Seq[SequencerClientTransportCommon]] =
blocking(lock.synchronized {
transportInternal(submissionRequestAmplification.get())
})
/** Pick `amount` many random healthy sequencer connections.
* If there are no healthy sequencers, returns a single unhealthy sequencer connection.
* Must only be called inside a `lock.synchronized` block.
*/
private[this] def transportInternal(
amount: PositiveInt
)(implicit traceContext: TraceContext): NonEmpty[Seq[SequencerClientTransportCommon]] = {
// We can use a plain Random instance across all threads calling this method,
// because this method anyway uses locking on its own.
// (In general, ThreadLocalRandom would avoid contention on the random number generation, but
// the plain Random has the advantage that we can hard-code the seed so that the chosen sequencers
// are easier to reproduce for debugging and tests.)
val healthySequencers = state.view
.collect { case (_sequencerId, state) if state.isSubscriptionHealthy => state }
// Use a `Vector` so that we get fast updates when picking the random subset
.to(Vector)
val chosen = SeqUtil
.randomSubsetShuffle(healthySequencers, amount.unwrap, random)
.map(_.transport.clientTransport)
NonEmpty.from(chosen) match {
case Some(ne) => ne
case None => NonEmpty(Seq, pickUnhealthySequencer)
}
}
private[this] def pickUnhealthySequencer(implicit
traceContext: TraceContext
): SequencerClientTransportCommon = {
// TODO(i12377): Can we fallback to first sequencer transport here or should we
// introduce EitherT and propagate error handling?
state.values.headOption
.getOrElse(
// TODO(i12377): Error handling
ErrorUtil.invalidState("No sequencer subscription at the moment. Try again later.")
)
.transport
.clientTransport
}
override def transport(sequencerId: SequencerId)(implicit
traceContext: TraceContext
): UnlessShutdown[SequencerClientTransport] =
@ -181,6 +221,7 @@ class SequencersTransportState(
sequencerTransports: SequencerTransports[?]
)(implicit traceContext: TraceContext): Future[Unit] = blocking(lock.synchronized {
sequencerTrustThreshold.set(sequencerTransports.sequencerTrustThreshold)
submissionRequestAmplification.set(sequencerTransports.submissionRequestAmplification)
val oldSequencerIds = state.keySet.toSet
val newSequencerIds = sequencerTransports.sequencerIdToTransportMap.keySet

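The selection logic above can be pictured with this self-contained sketch (the `Connection` type and `chooseConnections` helper are illustrative, not Canton classes): pick a random subset of the healthy connections, capped by the amplification factor, and fall back to a single, possibly unhealthy connection when none is healthy.

import scala.util.Random

final case class Connection(id: String, healthy: Boolean)

def chooseConnections(all: Seq[Connection], amplification: Int, random: Random): Seq[Connection] = {
  val healthy = all.filter(_.healthy).toVector
  if (healthy.isEmpty) all.headOption.toSeq // single unhealthy fallback, as in pickUnhealthySequencer
  else random.shuffle(healthy).take(amplification min healthy.size)
}

// A fixed seed keeps the chosen connections reproducible for debugging and tests,
// which is why a plain Random is preferred over ThreadLocalRandom above.
val picked = chooseConnections(
  Seq(Connection("seq1", healthy = true), Connection("seq2", healthy = false), Connection("seq3", healthy = true)),
  amplification = 2,
  random = new Random(42),
)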
View File

@ -139,6 +139,8 @@ object TimeProof {
// so instead we just use the maximum value allowed.
maxSequencingTime = CantonTimestamp.MaxValue,
messageId = mkTimeProofRequestMessageId,
// Do not amplify because the max sequencing time is set to MaxValue and would therefore exceed the aggregation time bound
amplify = false,
)
/** Use a constant prefix for a message which would permit the sequencer to track how many

View File

@ -352,7 +352,7 @@ object DomainMember {
*/
final case class MediatorGroup(
index: MediatorGroupIndex,
active: Seq[MediatorId],
active: NonEmpty[Seq[MediatorId]],
passive: Seq[MediatorId],
threshold: PositiveInt,
) {

View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.topology
import cats.data.EitherT
import cats.instances.seq.*
import cats.syntax.foldable.*
import cats.syntax.functor.*
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.crypto.CryptoPureApi
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
@ -32,7 +33,7 @@ import com.digitalasset.canton.topology.transaction.{
TopologyMappingXChecks,
}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ErrorUtil
import com.digitalasset.canton.util.{EitherTUtil, ErrorUtil}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.concurrent.TrieMap
@ -378,28 +379,30 @@ class TopologyStateProcessorX(
val (isMerge, tx_deduplicatedAndMerged) =
mergeSignatures(tx_inStore, tx_mergedProposalSignatures)
val ret = for {
// Run mapping specific semantic checks
_ <- topologyMappingXChecks.checkTransaction(effective, tx_deduplicatedAndMerged, tx_inStore)
_ <-
// we potentially merge the transaction with the currently active if this is just a signature update
// now, check if the serial is monotonically increasing
if (isMerge) {
EitherTUtil.unit[TopologyTransactionRejection]
} else {
EitherT.fromEither[Future](
serialIsMonotonicallyIncreasing(tx_inStore, tx_deduplicatedAndMerged).void
)
}
// we check if the transaction is properly authorized given the current topology state
// if it is a proposal, then we demand that all signatures are appropriate (but
// not necessarily sufficient)
tx_authorized <- transactionIsAuthorized(
// !THIS CHECK NEEDS TO BE THE LAST CHECK! because the transaction authorization validator
// will update its internal cache. If a transaction then gets rejected afterwards, the cache
// is corrupted.
fullyValidated <- transactionIsAuthorized(
effective,
tx_inStore,
tx_deduplicatedAndMerged,
expectFullAuthorization,
)
// Run mapping specific semantic checks
_ <- topologyMappingXChecks.checkTransaction(effective, tx_authorized, tx_inStore)
// we potentially merge the transaction with the currently active if this is just a signature update
// now, check if the serial is monotonically increasing
fullyValidated <-
if (isMerge)
EitherT.rightT[Future, TopologyTransactionRejection](tx_authorized)
else {
EitherT.fromEither[Future](
serialIsMonotonicallyIncreasing(tx_inStore, tx_authorized).map(_ => tx_authorized)
)
}
} yield fullyValidated
ret.fold(
// TODO(#12390) emit appropriate log message and use correct rejection reason

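A small, self-contained cats sketch of the ordering constraint called out above (all names hypothetical): the pure checks run first, and the authorization step, which updates a cache, runs last so that a later rejection can never leave the cache ahead of the store.

import cats.data.EitherT
import cats.implicits.*

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object CheckOrderingSketch {
  private val authorizedCache = mutable.Set.empty[String]

  private def semanticCheck(tx: String)(implicit ec: ExecutionContext): EitherT[Future, String, Unit] =
    EitherT.cond[Future](tx.nonEmpty, (), "empty transaction")

  private def serialCheck(tx: String)(implicit ec: ExecutionContext): EitherT[Future, String, Unit] =
    EitherT.cond[Future](!tx.startsWith("stale"), (), "non-monotonic serial")

  // Side-effecting check: it records the transaction in a cache, so it must be the last check.
  private def authorize(tx: String)(implicit ec: ExecutionContext): EitherT[Future, String, String] =
    EitherT.rightT[Future, String] { authorizedCache += tx; tx }

  def validate(tx: String)(implicit ec: ExecutionContext): EitherT[Future, String, String] =
    for {
      _ <- semanticCheck(tx)
      _ <- serialCheck(tx)
      authorized <- authorize(tx) // last, so the cache only ever records accepted transactions
    } yield authorized
}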
View File

@ -239,7 +239,7 @@ trait PartyTopologySnapshotClient {
/** Returns Right if all parties have at least an active participant passing the check. Otherwise, all parties not passing are passed as Left */
def allHaveActiveParticipants(
parties: Set[LfPartyId],
check: (ParticipantPermission => Boolean) = _.isActive,
check: (ParticipantPermission => Boolean) = _ => true,
)(implicit traceContext: TraceContext): EitherT[Future, Set[LfPartyId], Unit]
/** Returns the consortium thresholds (how many votes from different participants that host the consortium party
@ -272,7 +272,7 @@ trait PartyTopologySnapshotClient {
def allHostedOn(
partyIds: Set[LfPartyId],
participantId: ParticipantId,
permissionCheck: ParticipantAttributes => Boolean = _.permission.isActive,
permissionCheck: ParticipantAttributes => Boolean = _ => true,
)(implicit traceContext: TraceContext): Future[Boolean]
/** Returns whether a participant can confirm on behalf of a party. */
@ -692,14 +692,14 @@ private[client] trait ParticipantTopologySnapshotLoader extends ParticipantTopol
override def isParticipantActive(participantId: ParticipantId)(implicit
traceContext: TraceContext
): Future[Boolean] =
findParticipantState(participantId).map(_.exists(_.permission.isActive))
findParticipantState(participantId).map(_.isDefined)
override def isParticipantActiveAndCanLoginAt(
participantId: ParticipantId,
timestamp: CantonTimestamp,
)(implicit traceContext: TraceContext): Future[Boolean] =
findParticipantState(participantId).map { attributesO =>
attributesO.exists(attr => attr.permission.isActive && attr.loginAfter.forall(_ <= timestamp))
attributesO.exists(_.loginAfter.forall(_ <= timestamp))
}
final def findParticipantState(participantId: ParticipantId)(implicit
@ -720,14 +720,14 @@ private[client] trait PartyTopologySnapshotBaseClient {
override def allHaveActiveParticipants(
parties: Set[LfPartyId],
check: (ParticipantPermission => Boolean) = _.isActive,
check: (ParticipantPermission => Boolean) = _ => true,
)(implicit traceContext: TraceContext): EitherT[Future, Set[LfPartyId], Unit] = {
val fetchedF = activeParticipantsOfPartiesWithAttributes(parties.toSeq)
EitherT(
fetchedF
.map { fetched =>
fetched.foldLeft(Set.empty[LfPartyId]) { case (acc, (party, relationships)) =>
if (relationships.exists(x => check(x._2.permission)))
if (relationships.exists { case (_, attributes) => check(attributes.permission) })
acc
else acc + party
}
@ -763,7 +763,7 @@ private[client] trait PartyTopologySnapshotBaseClient {
override def allHostedOn(
partyIds: Set[LfPartyId],
participantId: ParticipantId,
permissionCheck: ParticipantAttributes => Boolean = _.permission.isActive,
permissionCheck: ParticipantAttributes => Boolean = _ => true,
)(implicit traceContext: TraceContext): Future[Boolean] =
hostedOn(partyIds, participantId).map(partiesWithAttributes =>
partiesWithAttributes.view

View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.topology.processing
import cats.Monoid
import cats.data.EitherT
import cats.syntax.parallel.*
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.crypto.CryptoPureApi
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
@ -330,12 +331,18 @@ class IncomingTopologyTransactionAuthorizationValidatorX(
private def processDecentralizedNamespaceDefinition(
op: TopologyChangeOpX,
tx: AuthorizedDecentralizedNamespaceDefinitionX,
): (Boolean, () => Unit) = {
)(implicit traceContext: TraceContext): (Boolean, () => Unit) = {
val decentralizedNamespace = tx.mapping.namespace
val dnsGraph = decentralizedNamespaceCache
.get(decentralizedNamespace)
.map { case (_, dnsGraph) => dnsGraph }
.getOrElse {
val serialToValidate = tx.signedTransaction.transaction.serial
if (serialToValidate > PositiveInt.one) {
logger.warn(
s"decentralizedNamespaceCache did not contain namespace $decentralizedNamespace even though the serial to validate is $serialToValidate"
)
}
val directDecentralizedNamespaceGraph = namespaceCache.getOrElseUpdate(
decentralizedNamespace,
new AuthorizationGraphX(

View File

@ -159,7 +159,7 @@ class DbTopologyStoreX[StoreId <: TopologyStoreId](
) ++ removeTxs.map(txHash => sql"tx_hash=${txHash.hash.toLengthLimitedHexString}")
lazy val updateRemovals =
(sql"UPDATE topology_transactions_x SET valid_until = ${Some(effectiveTs)} WHERE store_id=$transactionStoreIdName AND (" ++
(sql"UPDATE topology_transactions SET valid_until = ${Some(effectiveTs)} WHERE store_id=$transactionStoreIdName AND (" ++
transactionRemovals
.intercalate(
sql" OR "
@ -206,7 +206,7 @@ class DbTopologyStoreX[StoreId <: TopologyStoreId](
)
val query =
sql"SELECT instance, sequenced, valid_from, valid_until, rejection_reason FROM topology_transactions_x WHERE store_id = $transactionStoreIdName ORDER BY id"
sql"SELECT instance, sequenced, valid_from, valid_until, rejection_reason FROM topology_transactions WHERE store_id = $transactionStoreIdName ORDER BY id"
val entriesF =
storage
@ -585,15 +585,18 @@ class DbTopologyStoreX[StoreId <: TopologyStoreId](
}
}
// TODO(#14061): Decide whether we want additional indices by mapping_key_hash and tx_hash (e.g. for update/removal and lookups)
// TODO(#14061): Come up with columns/indexing for efficient ParticipantId => Seq[PartyId] lookup
// TODO(#12390) should mapping_key_hash rather be tx_hash?
storage.profile match {
case _: DbStorage.Profile.Postgres | _: DbStorage.Profile.H2 =>
(sql"""INSERT INTO topology_transactions_x (store_id, sequenced, valid_from, valid_until, transaction_type, namespace,
(sql"""INSERT INTO topology_transactions (store_id, sequenced, valid_from, valid_until, transaction_type, namespace,
identifier, mapping_key_hash, serial_counter, operation, instance, tx_hash, is_proposal, rejection_reason, representative_protocol_version, hash_of_signatures) VALUES""" ++
transactions
.map(sqlTransactionParameters)
.toList
.intercalate(sql", ")
++ sql" ON CONFLICT DO NOTHING" // idempotency-"conflict" based on topology_transactions_x unique constraint
++ sql" ON CONFLICT DO NOTHING" // idempotency-"conflict" based on topology_transactions unique constraint
).asUpdate
case _: DbStorage.Profile.Oracle =>
throw new IllegalStateException("Oracle not supported by daml 3.0/X yet")
@ -700,7 +703,7 @@ class DbTopologyStoreX[StoreId <: TopologyStoreId](
): Future[GenericStoredTopologyTransactionsX] = {
val mapping = transaction.mapping
queryForTransactions(
// Query for leading fields of `topology_transactions_x_idx` to enable use of this index
// Query for leading fields of `topology_transactions_idx` to enable use of this index
sql" AND transaction_type = ${mapping.code} AND namespace = ${mapping.namespace} AND identifier = ${mapping.maybeUid
.fold(String185.empty)(_.id.toLengthLimitedString)}"
++ sql" AND valid_from < $asOfExclusive"
@ -724,7 +727,7 @@ class DbTopologyStoreX[StoreId <: TopologyStoreId](
traceContext: TraceContext
): Future[GenericStoredTopologyTransactionsX] = {
val query =
sql"SELECT instance, sequenced, valid_from, valid_until FROM topology_transactions_x WHERE store_id = $transactionStoreIdName" ++
sql"SELECT instance, sequenced, valid_from, valid_until FROM topology_transactions WHERE store_id = $transactionStoreIdName" ++
subQuery ++ (if (!includeRejected) sql" AND rejection_reason IS NULL"
else sql"") ++ sql" #${orderBy} #${limit}"
storage

View File

@ -28,7 +28,6 @@ final case class ParticipantAttributes(
*/
sealed trait ParticipantPermission extends Product with Serializable {
def canConfirm: Boolean = false // can confirm transactions
def isActive: Boolean = true // can receive messages
val level: Byte // used for serialization and ordering.
def toProtoEnum: v30.Enums.ParticipantPermission

View File

@ -18,7 +18,7 @@ import com.digitalasset.canton.protocol.messages.{
SignedProtocolMessage,
}
import com.digitalasset.canton.sequencing.TrafficControlParameters
import com.digitalasset.canton.sequencing.client.{SendCallback, SendResult, SequencerClient}
import com.digitalasset.canton.sequencing.client.{SendCallback, SendResult, SequencerClientSend}
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.{DomainId, Member}
@ -53,7 +53,7 @@ class TrafficBalanceSubmissionHandler(
protocolVersion: ProtocolVersion,
serial: NonNegativeLong,
totalTrafficBalance: NonNegativeLong,
sequencerClient: SequencerClient,
sequencerClient: SequencerClientSend,
cryptoApi: DomainSyncCryptoClient,
)(implicit
ec: ExecutionContext,
@ -134,7 +134,7 @@ class TrafficBalanceSubmissionHandler(
}
private def sendRequest(
sequencerClient: SequencerClient,
sequencerClient: SequencerClientSend,
batch: Batch[DefaultOpenEnvelope],
aggregationRule: AggregationRule,
maxSequencingTime: CantonTimestamp,

View File

@ -126,6 +126,13 @@ object MonadUtil {
*/
def sequentialTraverseMonoid[M[_], A, B](
xs: immutable.Iterable[A]
)(step: A => M[B])(implicit monad: Monad[M], monoid: Monoid[B]): M[B] =
sequentialTraverseMonoid(xs.iterator)(step)
/** Conceptually equivalent to `sequentialTraverse(xs)(step).map(monoid.combineAll)`.
*/
def sequentialTraverseMonoid[M[_], A, B](
xs: Iterator[A]
)(step: A => M[B])(implicit monad: Monad[M], monoid: Monoid[B]): M[B] =
foldLeftM[M, B, A](monoid.empty, xs) { (acc, x) =>
monad.map(step(x))(monoid.combine(acc, _))

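A hedged, self-contained sketch of the equivalence stated in the scaladoc (hypothetical name, not the actual helper): step through the elements one at a time and fold each result into the monoid, without materializing an intermediate collection.

import cats.{Monad, Monoid}
import cats.implicits.*

def sequentialTraverseMonoidSketch[M[_]: Monad, A, B: Monoid](xs: Iterator[A])(step: A => M[B]): M[B] =
  xs.foldLeft(Monoid[B].empty.pure[M]) { (accM, a) =>
    accM.flatMap(acc => step(a).map(acc |+| _))
  }

// For example, with M = Option and B = Int (additive monoid):
// sequentialTraverseMonoidSketch(Iterator(1, 2, 3))(i => Option(i * 2)) == Some(12)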
View File

@ -6,6 +6,7 @@ package com.digitalasset.canton.util
import com.daml.nonempty.{NonEmpty, NonEmptyUtil}
import scala.annotation.tailrec
import scala.util.Random
object SeqUtil {
@ -40,4 +41,32 @@ object SeqUtil {
go(xs)
grouped.result()
}
/** Picks a random subset of indices of size `size` from `xs` and returns a random permutation of the elements
* at these indices.
* Implements the Fisher-Yates shuffle (https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle)
* with a separate output array.
*
* Note: The inside-out version of Fisher-Yates would not have to modify the input sequence,
* but doesn't work for picking a proper subset because it would be equivalent to shuffling only the prefix of length `size`.
*/
def randomSubsetShuffle[A](xs: IndexedSeq[A], size: Int, random: Random): Seq[A] = {
val outputSize = xs.size min size max 0
val output = Seq.newBuilder[A]
output.sizeHint(outputSize)
@tailrec def go(lowerBound: Int, remainingElems: IndexedSeq[A]): Unit = {
if (lowerBound >= outputSize) ()
else {
val index = random.nextInt(xs.size - lowerBound)
val chosen = remainingElems(lowerBound + index)
output.addOne(chosen)
val newRemainingElems =
remainingElems.updated(lowerBound + index, remainingElems(lowerBound))
go(lowerBound + 1, newRemainingElems)
}
}
go(0, xs)
output.result()
}
}

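For illustration, a possible use of the new helper (assuming the `SeqUtil` object above is in scope); with a seeded `Random` the pick is reproducible across runs.

import scala.util.Random

// Draws 3 distinct elements from 1..10 in random order. A size larger than xs.size is
// capped at a full shuffle, and a non-positive size yields an empty result.
val pick = SeqUtil.randomSubsetShuffle((1 to 10).toVector, size = 3, random = new Random(7))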
View File

@ -234,9 +234,10 @@ abstract class RetryWithDelay(
)
} else {
val level = retryLogLevel.getOrElse {
if (totalRetries < complainAfterRetries || totalMaxRetries != Int.MaxValue)
Level.INFO
else Level.WARN
if (totalRetries < complainAfterRetries || totalMaxRetries != Int.MaxValue) {
// Check whether a different log level has been configured by default for the outcome; otherwise log at INFO
retryable.retryLogLevel(outcome).getOrElse(Level.INFO)
} else Level.WARN
}
val change = if (errorKind == lastErrorKind) {
""

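The level selection above can be condensed into the following sketch (the function and its parameters are illustrative, not the Canton code): an explicitly configured retry level wins, then a per-outcome override, then INFO, escalating to WARN once the retry loop has run long enough to be suspicious.

import org.slf4j.event.Level

def levelFor(
    configured: Option[Level],
    perOutcome: Option[Level],
    totalRetries: Int,
    complainAfterRetries: Int,
    totalMaxRetries: Int,
): Level =
  configured.getOrElse {
    if (totalRetries < complainAfterRetries || totalMaxRetries != Int.MaxValue)
      perOutcome.getOrElse(Level.INFO)
    else Level.WARN
  }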
View File

@ -6,9 +6,12 @@ package com.digitalasset.canton.util.retry
import com.digitalasset.canton.DiscardOps
import com.digitalasset.canton.logging.{ErrorLoggingContext, TracedLogger}
import com.digitalasset.canton.resource.DatabaseStorageError.DatabaseStorageDegradation.DatabaseTaskRejected
import com.digitalasset.canton.resource.DbStorage.NoConnectionAvailable
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.LoggerUtil
import com.digitalasset.canton.util.TryUtil.ForFailedOps
import org.postgresql.util.PSQLException
import org.slf4j.event.Level
import java.sql.*
import scala.annotation.tailrec
@ -32,15 +35,32 @@ object RetryUtil {
protected def logThrowable(e: Throwable, logger: TracedLogger)(implicit
traceContext: TraceContext
): Unit = e match {
case sqlE: SQLException =>
// Unfortunately, the sql state and error code won't get logged automatically.
logger.info(
s"Detected an SQLException. SQL state: ${sqlE.getSQLState}, error code: ${sqlE.getErrorCode}",
e,
)
case _: Throwable =>
logger.info(s"Detected an error.", e)
): Unit = {
val level = retryLogLevel(e).getOrElse(Level.INFO)
implicit val errorLoggingContext: ErrorLoggingContext =
ErrorLoggingContext.fromTracedLogger(logger)
e match {
case sqlE: SQLException =>
// Unfortunately, the sql state and error code won't get logged automatically.
LoggerUtil.logThrowableAtLevel(
level,
s"Detected an SQLException. SQL state: ${sqlE.getSQLState}, error code: ${sqlE.getErrorCode}",
e,
)
case _: Throwable =>
LoggerUtil.logThrowableAtLevel(level, s"Detected an error.", e)
}
}
/** Return an optional log level to log an exception with.
*
* This allows overriding the log level for particular exceptions on retry globally.
*/
def retryLogLevel(e: Throwable): Option[Level] = None
def retryLogLevel(outcome: Try[Any]): Option[Level] = outcome match {
case Failure(exception) => retryLogLevel(exception)
case util.Success(_value) => None
}
}
@ -246,6 +266,13 @@ object RetryUtil {
case _ => FatalErrorKind
}
override def retryLogLevel(e: Throwable): Option[Level] = e match {
case _: NoConnectionAvailable =>
// Avoid log noise if no connection is available either due to contention or a temporary network problem
Some(Level.DEBUG)
case _ => None
}
}
/** Retry on any exception.

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: CantonExamples

View File

@ -1 +1 @@
18535d6b7bd08a5d7366432c7e0cd34579a8eca55616f6f5f0b0551101c6784b
c25a15982b0ff960030606813a98b7527589ec03e3f897ee98f602c7373fc1cc

View File

@ -69,7 +69,6 @@ create table contracts (
contract_salt binary large object,
-- Metadata: signatories, stakeholders, keys
-- Stored as a Protobuf blob as H2 will only support typed arrays in 1.4.201
-- TODO(#3256): change when H2 is upgraded
metadata binary large object not null,
-- The ledger time when the contract was created.
ledger_create_time varchar(300) not null,
@ -776,7 +775,7 @@ create table sequencer_domain_configuration (
);
CREATE TABLE pruning_schedules(
-- node_type is one of "PAR", "MED", or "SEQ"
-- node_type is one of "MED", or "SEQ"
-- since mediator and sequencer sometimes share the same db
node_type varchar(3) not null primary key,
cron varchar(300) not null,
@ -812,8 +811,6 @@ CREATE TABLE participant_pruning_schedules (
prune_internally_only boolean NOT NULL DEFAULT false -- whether to prune only canton-internal stores not visible to ledger api
);
-- TODO(#15155) Move this to stable when releasing BFT: BEGIN
CREATE TABLE in_flight_aggregation(
aggregation_id varchar(300) not null primary key,
-- UTC timestamp in microseconds relative to EPOCH
@ -835,7 +832,7 @@ CREATE TABLE in_flight_aggregated_sender(
);
-- stores the topology-x state transactions
CREATE TABLE topology_transactions_x (
CREATE TABLE topology_transactions (
-- serial identifier used to preserve insertion order
id bigserial not null primary key,
-- the id of the store
@ -883,12 +880,10 @@ CREATE TABLE topology_transactions_x (
-- tx_hash but different signatures
hash_of_signatures varchar(300) not null,
-- index used for idempotency during crash recovery
-- TODO(#12390) should mapping_key_hash rather be tx_hash?
unique (store_id, mapping_key_hash, serial_counter, valid_from, operation, representative_protocol_version, hash_of_signatures)
);
CREATE INDEX topology_transactions_x_idx ON topology_transactions_x (store_id, transaction_type, namespace, identifier, valid_until, valid_from);
-- TODO(#14061): Decide whether we want additional indices by mapping_key_hash and tx_hash (e.g. for update/removal and lookups)
-- TODO(#14061): Come up with columns/indexing for efficient ParticipantId => Seq[PartyId] lookup
CREATE INDEX topology_transactions_idx ON topology_transactions (store_id, transaction_type, namespace, identifier, valid_until, valid_from);
-- update the sequencer_state_manager_events to store traffic information per event
-- this will be needed to re-hydrate the sequencer from a specific point in time deterministically

View File

@ -1 +1 @@
6d177be973e97d3314c6f3d58bc96443011b5bf91f772e5d0f89bb92fbd640a9
22d205ad669ad143af6e779d2ee51d4a1c4a468d2530e86e2d27c01920fabd2f

View File

@ -73,7 +73,6 @@ create table contracts (
instance bytea not null,
-- Metadata: signatories, stakeholders, keys
-- Stored as a Protobuf blob as H2 will only support typed arrays in 1.4.201
-- TODO(#3256): change when H2 is upgraded
metadata bytea not null,
-- The ledger time when the contract was created.
ledger_create_time varchar(300) collate "C" not null,
@ -828,7 +827,7 @@ create table mediator_deduplication_store (
create index idx_mediator_deduplication_store_expire_after on mediator_deduplication_store(expire_after, mediator_id);
CREATE TABLE pruning_schedules(
-- node_type is one of "PAR", "MED", or "SEQ"
-- node_type is one of "MED", or "SEQ"
-- since mediator and sequencer sometimes share the same db
node_type varchar(3) collate "C" not null primary key,
cron varchar(300) collate "C" not null,
@ -836,8 +835,6 @@ CREATE TABLE pruning_schedules(
retention bigint not null -- positive number of seconds
);
-- TODO(#15155) Move this to stable when releasing BFT: BEGIN
CREATE TABLE in_flight_aggregation(
aggregation_id varchar(300) collate "C" not null primary key,
-- UTC timestamp in microseconds relative to EPOCH
@ -859,7 +856,7 @@ CREATE TABLE in_flight_aggregated_sender(
);
-- stores the topology-x state transactions
CREATE TABLE topology_transactions_x (
CREATE TABLE topology_transactions (
-- serial identifier used to preserve insertion order
id bigserial not null primary key,
-- the id of the store
@ -907,17 +904,10 @@ CREATE TABLE topology_transactions_x (
-- tx_hash but different signatures
hash_of_signatures varchar(300) collate "C" not null,
-- index used for idempotency during crash recovery
-- TODO(#12390) should mapping_key_hash rather be tx_hash?
unique (store_id, mapping_key_hash, serial_counter, valid_from, operation, representative_protocol_version, hash_of_signatures)
);
CREATE INDEX topology_transactions_x_idx ON topology_transactions_x (store_id, transaction_type, namespace, identifier, valid_until, valid_from);
-- TODO(#14061): Decide whether we want additional indices by mapping_key_hash and tx_hash (e.g. for update/removal and lookups)
-- TODO(#14061): Come up with columns/indexing for efficient ParticipantId => Seq[PartyId] lookup
CREATE INDEX topology_transactions_idx ON topology_transactions (store_id, transaction_type, namespace, identifier, valid_until, valid_from);
-- TODO(#15155) Move this to stable when releasing BFT: END
-- TODO(#13104) Move traffic control to stable release: BEGIN
-- update the sequencer_state_manager_events to store traffic information per event
-- this will be needed to re-hydrate the sequencer from a specific point in time deterministically
-- adds extra traffic remainder at the time of the event
@ -970,7 +960,6 @@ create table top_up_events (
);
create index top_up_events_idx ON top_up_events (member);
-- TODO(#13104) Move traffic control to stable release: END
-- BFT Ordering Tables

View File

@ -142,6 +142,8 @@ class DomainTopologyServiceX(
maxSequencingTime =
clock.now.add(topologyXConfig.topologyTransactionRegistrationTimeout.toInternal.duration),
callback = sendCallback,
// Do not amplify because we are running our own retry loop here anyway
amplify = false,
),
s"Failed sending topology transaction broadcast: ${request}",
)

View File

@ -119,6 +119,7 @@ class SequencerInfoLoader(
.many(
nonEmptyResult.map(_.connection),
sequencerConnections.sequencerTrustThreshold,
sequencerConnections.submissionRequestAmplification,
)
.leftMap(SequencerInfoLoaderError.FailedToConnectToSequencers)
.map(connections =>

View File

@ -5,7 +5,7 @@ package com.digitalasset.canton.data
import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.config.RequireTypes.NonNegativeLong
import com.digitalasset.canton.crypto.{GeneratorsCrypto, HashPurpose, Salt, TestHash}
import com.digitalasset.canton.crypto.{GeneratorsCrypto, Salt, TestHash}
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.protocol.messages.{
DeliveredTransferOutResult,
@ -151,7 +151,10 @@ final class GeneratorsTransferData(
SignedProtocolMessage.from(
result,
protocolVersion,
GeneratorsCrypto.sign("TransferOutResult-mediator", HashPurpose.TransferResultSignature),
GeneratorsCrypto.sign(
"TransferOutResult-mediator",
TestHash.testHashPurpose,
),
)
recipients <- recipientsArb.arbitrary
@ -163,7 +166,7 @@ final class GeneratorsTransferData(
} yield DeliveredTransferOutResult {
SignedContent(
deliver,
sign("TransferOutResult-sequencer", HashPurpose.TransferResultSignature),
sign("TransferOutResult-sequencer", TestHash.testHashPurpose),
Some(transferOutTimestamp),
protocolVersion,
)

View File

@ -1,39 +0,0 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.protocol.messages
import com.digitalasset.canton.BaseTest
import com.digitalasset.canton.crypto.HashPurpose
import com.digitalasset.canton.protocol.TransferDomainId
import org.scalatest.wordspec.AnyWordSpec
import scala.collection.mutable
class SignedProtocolMessageContentTest extends AnyWordSpec with BaseTest {
"hashPurpose" must {
"be different for each subclass" in {
val subclasses = Seq[SignedProtocolMessageContent](
mock[MediatorResponse],
mock[TransactionResultMessage],
mock[TransferResult[TransferDomainId]],
mock[AcsCommitment],
mock[MalformedMediatorRequestResult],
)
subclasses.foreach(subclass => when(subclass.hashPurpose).thenCallRealMethod())
val hashPurposes = mutable.Set.empty[HashPurpose]
subclasses.foreach { subclass =>
val hashPurpose = subclass.hashPurpose
assert(
hashPurposes.add(hashPurpose),
s"Clash on hash purpose ID $hashPurpose for ${HashPurpose.description(hashPurpose)}",
)
}
}
}
}

View File

@ -37,9 +37,12 @@ object GeneratorsSequencing {
.nonEmptySetGen[SequencerConnection]
.map(_.toSeq)
.map(_.distinctBy(_.sequencerAlias))
sequencerTrustThreshold <- Gen
.choose(1, connections.size)
.map(PositiveInt.tryCreate)
} yield SequencerConnections.tryMany(connections, sequencerTrustThreshold)
sequencerTrustThreshold <- Gen.choose(1, connections.size).map(PositiveInt.tryCreate)
submissionRequestAmplification <- Gen.choose(1, connections.size).map(PositiveInt.tryCreate)
} yield SequencerConnections.tryMany(
connections,
sequencerTrustThreshold,
submissionRequestAmplification,
)
)
}

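The bounds used by the generator above can be sketched on their own with ScalaCheck (illustrative helper, not the Canton generator): both the trust threshold and the amplification factor are drawn between 1 and the number of connections, so they are always positive and never exceed the available sequencers.

import org.scalacheck.Gen

def thresholdsFor(connectionCount: Int): Gen[(Int, Int)] =
  for {
    trustThreshold <- Gen.choose(1, connectionCount)
    amplification <- Gen.choose(1, connectionCount)
  } yield (trustThreshold, amplification)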
View File

@ -30,6 +30,7 @@ class TestSequencerClientSend extends SequencerClientSend {
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
amplify: Boolean,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] = {
requestsQueue.add(
Request(batch, sendType, topologyTimestamp, maxSequencingTime, messageId, aggregationRule)

View File

@ -29,9 +29,9 @@ trait DbTopologyStoreXHelper {
import storage.api.*
storage.update(
DBIO.seq(
sqlu"delete from topology_transactions_x where store_id=${storeId.dbString}"
sqlu"delete from topology_transactions where store_id=${storeId.dbString}"
),
operationName = s"${this.getClass}: Delete topology_transactions_x for ${storeId.dbString}",
operationName = s"${this.getClass}: Delete topology_transactions for ${storeId.dbString}",
)
}

View File

@ -20,7 +20,7 @@ import com.digitalasset.canton.sequencing.client.{
SendCallback,
SendResult,
SendType,
SequencerClient,
SequencerClientSend,
}
import com.digitalasset.canton.sequencing.protocol.{SequencersOfDomain, *}
import com.digitalasset.canton.time.SimClock
@ -49,7 +49,7 @@ class TrafficBalanceSubmissionHandlerTest
with ProtocolVersionChecksAnyWordSpec {
private val recipient1 = DefaultTestIdentities.participant1.member
private val sequencerClient = mock[SequencerClient]
private val sequencerClient = mock[SequencerClientSend]
private val domainId = DomainId.tryFromString("da::default")
private val clock = new SimClock(loggerFactory = loggerFactory)
private val trafficParams = TrafficControlParameters()
@ -91,6 +91,7 @@ class TrafficBalanceSubmissionHandlerTest
any[MessageId],
aggregationRuleCapture.capture(),
callbackCapture.capture(),
any[Boolean],
)(any[TraceContext])
).thenReturn(EitherT.pure(()))
@ -172,6 +173,7 @@ class TrafficBalanceSubmissionHandlerTest
any[MessageId],
any[Option[AggregationRule]],
callbackCapture.capture(),
any[Boolean],
)(any[TraceContext])
).thenReturn(EitherT.pure(()))
@ -224,6 +226,7 @@ class TrafficBalanceSubmissionHandlerTest
any[MessageId],
any[Option[AggregationRule]],
any[SendCallback],
any[Boolean],
)(any[TraceContext])
)
.thenReturn(EitherT.leftT(SendAsyncClientError.RequestFailed("failed")))
@ -259,6 +262,7 @@ class TrafficBalanceSubmissionHandlerTest
any[MessageId],
any[Option[AggregationRule]],
callbackCapture.capture(),
any[Boolean],
)(any[TraceContext])
)
.thenReturn(EitherT.pure(()))
@ -310,6 +314,7 @@ class TrafficBalanceSubmissionHandlerTest
any[MessageId],
any[Option[AggregationRule]],
callbackCapture.capture(),
any[Boolean],
)(any[TraceContext])
)
.thenReturn(EitherT.pure(()))

View File

@ -8,6 +8,7 @@ import com.digitalasset.canton.BaseTest
import org.scalatest.wordspec.AnyWordSpec
import scala.annotation.tailrec
import scala.util.Random
class SeqUtilTest extends AnyWordSpec with BaseTest {
@ -48,4 +49,42 @@ class SeqUtilTest extends AnyWordSpec with BaseTest {
) shouldBe Seq(NonEmpty(Seq, 1, 2, 3, 4, 5), NonEmpty(Seq, 6, 7, 8, 9, 10))
}
}
"randomSubsetShuffle" should {
"pick a random subset of the given size" in {
val iterations = 1000
for { i <- 1 to iterations } {
val subset = SeqUtil.randomSubsetShuffle(1 to 100, 30, new Random(i))
subset should have size 30
subset.distinct shouldBe subset
(1 to 100) should contain allElementsOf subset
}
}
"deal with tiny subsets" in {
val random = new Random(123)
val subsetSize1 = SeqUtil.randomSubsetShuffle(1 to 1000, 1, random)
subsetSize1 should have size 1
val subsetSize0 = SeqUtil.randomSubsetShuffle(1 to 1000, 0, random)
subsetSize0 should have size 0
}
"deal with large subsets" in {
val random = new Random(345)
val subsetSize999 = SeqUtil.randomSubsetShuffle(1 to 1000, 999, random)
subsetSize999 should have size 999
subsetSize999.distinct shouldBe subsetSize999
(1 to 1000) should contain allElementsOf subsetSize999
val fullShuffle = SeqUtil.randomSubsetShuffle(1 to 1000, 1000, random)
fullShuffle.sorted shouldBe (1 to 1000)
}
"cap the size" in {
val random = new Random(678)
val more = SeqUtil.randomSubsetShuffle(1 to 10, 20, random)
more.sorted shouldBe (1 to 10)
}
}
}

View File

@ -14,11 +14,14 @@ import com.digitalasset.canton.lifecycle.{
PerformUnlessClosing,
UnlessShutdown,
}
import com.digitalasset.canton.logging.{SuppressionRule, TracedLogger}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.Thereafter.syntax.*
import com.digitalasset.canton.util.retry.Jitter.RandomSource
import com.digitalasset.canton.util.retry.RetryUtil.{
AllExnRetryable,
DbExceptionRetryable,
ExceptionRetryable,
NoExnRetryable,
}
import com.digitalasset.canton.util.{DelayUtil, FutureUtil}
@ -30,6 +33,7 @@ import java.util.Random
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success as TrySuccess, Try}
class PolicyTest extends AsyncFunSpec with BaseTest with HasExecutorService {
@ -147,6 +151,8 @@ class PolicyTest extends AsyncFunSpec with BaseTest with HasExecutorService {
testSuspend(maxRetries =>
suspend => Directly(logger, flagCloseable, maxRetries, "op", suspendRetries = suspend)
)
testExceptionLogging(Directly(logger, flagCloseable, 3, "op"))
}
describe("retry.Pause") {
@ -211,6 +217,8 @@ class PolicyTest extends AsyncFunSpec with BaseTest with HasExecutorService {
testSuspend(maxRetries =>
suspend => Pause(logger, flagCloseable, maxRetries, 5.millis, "op", suspendRetries = suspend)
)
testExceptionLogging(Pause(logger, flagCloseable, 3, 1.millis, "op"))
}
describe("retry.Backoff") {
@ -285,6 +293,8 @@ class PolicyTest extends AsyncFunSpec with BaseTest with HasExecutorService {
suspendRetries = suspend,
)
)
testExceptionLogging(Backoff(logger, flagCloseable, 3, 1.millis, 1.millis, "op"))
}
def testJitterBackoff(name: String, algoCreator: FiniteDuration => Jitter): Unit = {
@ -807,4 +817,45 @@ class PolicyTest extends AsyncFunSpec with BaseTest with HasExecutorService {
}
}
}
def testExceptionLogging(policy: => Policy): Unit = {
it("should log an exception with the configured retry log level") {
// We don't care about the success criteria as we always throw an exception
implicit val success: Success[Any] = Success.always
case class TestException() extends RuntimeException("test exception")
val retryable = new ExceptionRetryable() {
override def retryOK(
outcome: Try[_],
logger: TracedLogger,
lastErrorKind: Option[RetryUtil.ErrorKind],
)(implicit tc: TraceContext): RetryUtil.ErrorKind = RetryUtil.TransientErrorKind
override def retryLogLevel(e: Throwable): Option[Level] = e match {
case TestException() => Some(Level.WARN)
case _ => None
}
}
loggerFactory
.assertLogsSeq(SuppressionRule.Level(Level.WARN))(
policy(Future.failed(TestException()), retryable),
{ entries =>
forEvery(entries) { e =>
e.warningMessage should (include(
"The operation 'op' has failed with an exception"
) or include("Now retrying operation 'op'"))
}
},
)
.transform {
case Failure(TestException()) =>
logger.debug("Retry terminated with expected exception")
TrySuccess(succeed)
case result => result
}
}
}
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: medical-records

View File

@ -603,20 +603,20 @@ object ReferenceDemoScript {
): Unit = {
def getSequencer(str: String): SequencerNodeReference =
consoleEnvironment.sequencersX.all
consoleEnvironment.sequencers.all
.find(_.name == str)
.getOrElse(sys.error(s"can not find domain named ${str}"))
val bankingSequencers = consoleEnvironment.sequencersX.all.filter(_.name == SequencerBanking)
val bankingMediators = consoleEnvironment.mediatorsX.all.filter(_.name == "mediatorBanking")
val bankingSequencers = consoleEnvironment.sequencers.all.filter(_.name == SequencerBanking)
val bankingMediators = consoleEnvironment.mediators.all.filter(_.name == "mediatorBanking")
val bankingDomainId = ConsoleMacros.bootstrap.domain(
domainName = SequencerBanking,
sequencers = bankingSequencers,
mediators = bankingMediators,
domainOwners = bankingSequencers ++ bankingMediators,
)
val medicalSequencers = consoleEnvironment.sequencersX.all.filter(_.name == SequencerMedical)
val medicalMediators = consoleEnvironment.mediatorsX.all.filter(_.name == "mediatorMedical")
val medicalSequencers = consoleEnvironment.sequencers.all.filter(_.name == SequencerMedical)
val medicalMediators = consoleEnvironment.mediators.all.filter(_.name == "mediatorMedical")
val medicalDomainId = ConsoleMacros.bootstrap.domain(
domainName = SequencerMedical,
sequencers = medicalSequencers,
@ -650,7 +650,7 @@ object ReferenceDemoScript {
)
val script = new ReferenceDemoScript(
consoleEnvironment.participantsX.all,
consoleEnvironment.participants.all,
bankingConnection,
medicalConnection,
location,

View File

@ -1177,9 +1177,7 @@ class BlockUpdateGenerator(
case InFlightAggregation.AlreadyDelivered(deliveredAt) =>
val message =
s"The aggregatable request with aggregation ID $aggregationId was previously delivered at $deliveredAt"
refuse(
SequencerErrors.AggregateSubmissionAlreadySent(message)
)
refuse(SequencerErrors.AggregateSubmissionAlreadySent(message))
case InFlightAggregation.AggregationStuffing(_, at) =>
val message =
s"The sender ${submissionRequest.sender} previously contributed to the aggregatable submission with ID $aggregationId at $at"
@ -1248,7 +1246,7 @@ class BlockUpdateGenerator(
.merge
} yield groups
.map(group =>
MediatorsOfDomain(group.index) -> (group.active ++ group.passive)
MediatorsOfDomain(group.index) -> (group.active.forgetNE ++ group.passive)
.toSet[Member]
)
.toMap[GroupRecipient, Set[Member]]
@ -1369,7 +1367,7 @@ class BlockUpdateGenerator(
}
} yield groups
.map(group =>
MediatorsOfDomain(group.index) -> (group.active ++ group.passive)
MediatorsOfDomain(group.index) -> (group.active.forgetNE ++ group.passive)
.toSet[Member]
)
.toMap[GroupRecipient, Set[Member]]

View File

@ -171,6 +171,7 @@ private[mediator] class DefaultVerdictSender(
callback = callback,
maxSequencingTime = decisionTime,
aggregationRule = aggregationRule,
amplify = true,
)
} else {
logger.info(

View File

@ -35,7 +35,7 @@ import com.digitalasset.canton.domain.sequencing.traffic.{
import com.digitalasset.canton.domain.server.DynamicDomainGrpcServer
import com.digitalasset.canton.environment.*
import com.digitalasset.canton.health.{ComponentStatus, GrpcHealthReporter, HealthService}
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, HasCloseContext}
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, HasCloseContext, Lifecycle}
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.protocol.DomainParameters.MaxRequestSize
import com.digitalasset.canton.protocol.StaticDomainParameters
@ -95,6 +95,8 @@ class SequencerNodeBootstrapX(
Sequencer,
NamedLoggerFactory,
) => Option[ServerServiceDefinition],
// Allows passing in additional resources that need to be closed when the node bootstrap closes
closeables: Seq[AutoCloseable] = Seq.empty,
)(implicit
executionContext: ExecutionContextIdlenessExecutorService,
scheduler: ScheduledExecutorService,
@ -606,6 +608,7 @@ class SequencerNodeBootstrapX(
(healthService.dependencies ++ sequencerPublicApiHealthService.dependencies).map(
_.toComponentStatus
),
closeables,
)
addCloseable(node)
Some(new RunningNode(bootstrapStageCallback, node))
@ -625,6 +628,7 @@ class SequencerNodeX(
loggerFactory: NamedLoggerFactory,
sequencerNodeServer: DynamicDomainGrpcServer,
components: => Seq[ComponentStatus],
closeables: Seq[AutoCloseable],
)(implicit executionContext: ExecutionContextExecutorService)
extends SequencerNodeCommon(
config,
@ -639,5 +643,7 @@ class SequencerNodeX(
override def close(): Unit = {
super.close()
// TODO(#17222): Close the additional resources (e.g. KMS) last to avoid crypto usage after shutdown
Lifecycle.close(closeables*)(logger)
}
}

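The closing behaviour added above can be sketched generically (hypothetical class, not the Canton node types): the node keeps the externally supplied resources and releases them at the end of its own close(), after the regular shutdown work.

final class NodeWithExtraResources(extra: Seq[AutoCloseable]) extends AutoCloseable {
  override def close(): Unit = {
    // ... regular node shutdown work would happen first ...
    extra.foreach(_.close()) // release the passed-in resources last
  }
}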
View File

@ -20,7 +20,7 @@ final case class CommunitySequencerNodeXConfig(
override val adminApi: CommunityAdminServerConfig = CommunityAdminServerConfig(),
override val storage: CommunityStorageConfig = CommunityStorageConfig.Memory(),
override val crypto: CommunityCryptoConfig = CommunityCryptoConfig(),
override val sequencer: CommunitySequencerConfig = CommunitySequencerConfig.Database(),
override val sequencer: CommunitySequencerConfig = CommunitySequencerConfig.default,
override val auditLogging: Boolean = false,
override val timeTracker: DomainTimeTrackerConfig = DomainTimeTrackerConfig(),
override val sequencerClient: SequencerClientConfig = SequencerClientConfig(),

View File

@ -3,8 +3,12 @@
package com.digitalasset.canton.domain.sequencing.sequencer
import com.digitalasset.canton.config.NonNegativeFiniteDuration
import com.digitalasset.canton.config.{CommunityStorageConfig, NonNegativeFiniteDuration}
import com.digitalasset.canton.domain.sequencing.sequencer.DatabaseSequencerConfig.TestingInterceptor
import com.digitalasset.canton.domain.sequencing.sequencer.reference.{
CommunityReferenceSequencerDriverFactory,
ReferenceBlockOrderer,
}
import com.digitalasset.canton.time.Clock
import pureconfig.ConfigCursor
@ -62,6 +66,20 @@ object CommunitySequencerConfig {
) extends CommunitySequencerConfig {
override def supportsReplicas: Boolean = false
}
def default: CommunitySequencerConfig = {
val driverFactory = new CommunityReferenceSequencerDriverFactory
External(
driverFactory.name,
ConfigCursor(
driverFactory
.configWriter(confidential = false)
.to(ReferenceBlockOrderer.Config(storage = CommunityStorageConfig.Memory())),
List(),
),
None,
)
}
}
/** Health check related sequencer config

View File

@ -184,6 +184,7 @@ object GrpcSequencerConnectionService {
sequencerTransportsMap,
newEndpointsInfo.expectedSequencers,
newEndpointsInfo.sequencerConnections.sequencerTrustThreshold,
newEndpointsInfo.sequencerConnections.submissionRequestAmplification,
)
)

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.domain.mediator
import cats.data.EitherT
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.*
import com.digitalasset.canton.config.CachingConfigs
@ -23,19 +22,13 @@ import com.digitalasset.canton.logging.LogEntry
import com.digitalasset.canton.protocol.*
import com.digitalasset.canton.protocol.messages.Verdict.{Approve, MediatorReject}
import com.digitalasset.canton.protocol.messages.*
import com.digitalasset.canton.sequencing.client.{
SendAsyncClientError,
SendCallback,
SendType,
SequencerClientSend,
}
import com.digitalasset.canton.sequencing.client.TestSequencerClientSend
import com.digitalasset.canton.sequencing.protocol.*
import com.digitalasset.canton.time.{Clock, DomainTimeTracker, NonNegativeFiniteDuration}
import com.digitalasset.canton.topology.MediatorGroup.MediatorGroupIndex
import com.digitalasset.canton.topology.*
import com.digitalasset.canton.topology.client.TopologySnapshot
import com.digitalasset.canton.topology.transaction.*
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.MonadUtil.{sequentialTraverse, sequentialTraverse_}
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.version.HasTestCloseContext
@ -64,21 +57,18 @@ class ConfirmationResponseProcessorTestV5
protected val passiveMediator3 = MediatorId(UniqueIdentifier.tryCreate("mediator", "three"))
protected val activeMediator4 = MediatorId(UniqueIdentifier.tryCreate("mediator", "four"))
private def mediatorGroup0(mediators: MediatorId*) =
private def mediatorGroup0(mediators: NonEmpty[Seq[MediatorId]]) =
MediatorGroup(MediatorGroupIndex.zero, mediators, Seq.empty, PositiveInt.one)
protected val mediatorGroup: MediatorGroup = MediatorGroup(
index = MediatorGroupIndex.zero,
active = Seq(
activeMediator1,
activeMediator2,
),
active = NonEmpty.mk(Seq, activeMediator1, activeMediator2),
passive = Seq(passiveMediator3),
threshold = PositiveInt.tryCreate(2),
)
protected val mediatorGroup2: MediatorGroup = MediatorGroup(
index = MediatorGroupIndex.one,
active = Seq(activeMediator4),
active = NonEmpty.mk(Seq, activeMediator4),
passive = Seq.empty,
threshold = PositiveInt.one,
)
@ -166,7 +156,8 @@ class ConfirmationResponseProcessorTestV5
private lazy val identityFactory3: TestingIdentityFactoryBase = {
val otherMediatorId = MediatorId(UniqueIdentifier.tryCreate("mediator", "other"))
val topology3 = topology.copy(mediatorGroups = Set(mediatorGroup0(otherMediatorId)))
val topology3 =
topology.copy(mediatorGroups = Set(mediatorGroup0(NonEmpty.mk(Seq, otherMediatorId))))
TestingIdentityFactoryX(
topology3,
loggerFactory,
@ -181,7 +172,7 @@ class ConfirmationResponseProcessorTestV5
Map(
submitter -> Map(participant1 -> ParticipantPermission.Confirmation)
),
Set(mediatorGroup0(mediatorId)),
Set(mediatorGroup0(NonEmpty.mk(Seq, mediatorId))),
sequencerGroup,
),
loggerFactory,
@ -197,30 +188,10 @@ class ConfirmationResponseProcessorTestV5
protected lazy val decisionTime = requestIdTs.plusSeconds(120)
class Fixture(syncCryptoApi: DomainSyncCryptoClient = domainSyncCryptoApi) {
val interceptedBatchesQueue: java.util.concurrent.BlockingQueue[
(Batch[DefaultOpenEnvelope], Option[AggregationRule])
] =
new java.util.concurrent.LinkedBlockingQueue()
private val sequencerSend: SequencerClientSend = new SequencerClientSend {
override def sendAsync(
batch: Batch[DefaultOpenEnvelope],
sendType: SendType,
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] = {
interceptedBatchesQueue.add((batch, aggregationRule))
EitherT.pure(())
}
override def generateMaxSequencingTime: CantonTimestamp = ???
}
private val sequencerSend: TestSequencerClientSend = new TestSequencerClientSend
def interceptedBatches: Iterable[Batch[DefaultOpenEnvelope]] =
interceptedBatchesQueue.asScala.map(_._1)
sequencerSend.requestsQueue.asScala.map(_.batch)
val verdictSender: TestVerdictSender =
new TestVerdictSender(

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.domain.mediator
import cats.data.EitherT
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.crypto.{DomainSyncCryptoClient, Signature}
@ -17,18 +16,12 @@ import com.digitalasset.canton.protocol.messages.{
Verdict,
}
import com.digitalasset.canton.protocol.{ExampleTransactionFactory, RequestId, TestDomainParameters}
import com.digitalasset.canton.sequencing.client.{
SendAsyncClientError,
SendCallback,
SendType,
SequencerClientSend,
}
import com.digitalasset.canton.sequencing.client.TestSequencerClientSend
import com.digitalasset.canton.sequencing.protocol.{
AggregationRule,
Batch,
MediatorsOfDomain,
MemberRecipient,
MessageId,
OpenEnvelope,
Recipients,
}
@ -43,12 +36,12 @@ import com.digitalasset.canton.topology.{
TestingTopologyX,
UniqueIdentifier,
}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{BaseTest, ProtocolVersionChecksAsyncWordSpec}
import org.scalatest.wordspec.AsyncWordSpec
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
class DefaultVerdictSenderTest
extends AsyncWordSpec
@ -62,10 +55,7 @@ class DefaultVerdictSenderTest
private val mediatorGroupRecipient = MediatorsOfDomain(MediatorGroupIndex.zero)
private val mediatorGroup: MediatorGroup = MediatorGroup(
index = mediatorGroupRecipient.group,
active = Seq(
activeMediator1,
activeMediator2,
),
active = NonEmpty.mk(Seq, activeMediator1, activeMediator2),
passive = Seq(
passiveMediator3
),
@ -238,7 +228,14 @@ class DefaultVerdictSenderTest
observer ->
Map(participant -> ParticipantPermission.Observation),
),
Set(MediatorGroup(MediatorGroupIndex.zero, Seq(mediatorId), Seq.empty, PositiveInt.one)),
Set(
MediatorGroup(
MediatorGroupIndex.zero,
NonEmpty.mk(Seq, mediatorId),
Seq.empty,
PositiveInt.one,
)
),
)
val identityFactory = TestingIdentityFactoryX(
@ -250,28 +247,15 @@ class DefaultVerdictSenderTest
identityFactory.forOwnerAndDomain(mediatorId, domainId)
}
val interceptedMessages: java.util.concurrent.BlockingQueue[
(Batch[DefaultOpenEnvelope], Option[AggregationRule])
] =
new java.util.concurrent.LinkedBlockingQueue()
private val sequencerClientSend: TestSequencerClientSend = new TestSequencerClientSend
def interceptedMessages: Seq[(Batch[DefaultOpenEnvelope], Option[AggregationRule])] =
sequencerClientSend.requestsQueue.asScala.map { request =>
(request.batch, request.aggregationRule)
}.toSeq
val verdictSender = new DefaultVerdictSender(
new SequencerClientSend {
override def sendAsync(
batch: Batch[DefaultOpenEnvelope],
sendType: SendType,
topologyTimestamp: Option[CantonTimestamp],
maxSequencingTime: CantonTimestamp,
messageId: MessageId,
aggregationRule: Option[AggregationRule],
callback: SendCallback,
)(implicit traceContext: TraceContext): EitherT[Future, SendAsyncClientError, Unit] = {
interceptedMessages.add((batch, aggregationRule))
EitherT.pure(())
}
override def generateMaxSequencingTime: CantonTimestamp = ???
},
sequencerClientSend,
domainSyncCryptoApi,
mediatorId,
testedProtocolVersion,

View File

@ -18,46 +18,46 @@ trait ConsoleEnvironmentTestHelpers[+CE <: ConsoleEnvironment] { this: CE =>
// helpers for creating participant, sequencer, and mediator references by name
// unknown names will throw
def lpx(name: String): LocalParticipantReference = participantsX.local
def lpx(name: String): LocalParticipantReference = participants.local
.find(_.name == name)
.getOrElse(sys.error(s"participant [$name] not configured"))
def rpx(name: String): RemoteParticipantReference =
participantsX.remote
participants.remote
.find(_.name == name)
.getOrElse(sys.error(s"remote participant [$name] not configured"))
def px(name: String): ParticipantReference = participantsX.all
def px(name: String): ParticipantReference = participants.all
.find(_.name == name)
.getOrElse(sys.error(s"neither local nor remote participant [$name] is configured"))
def sx(name: String): SequencerNodeReference =
sequencersX.all
sequencers.all
.find(_.name == name)
.getOrElse(sys.error(s"sequencer [$name] not configured"))
def lsx(name: String): LocalSequencerNodeReference =
sequencersX.local
sequencers.local
.find(_.name == name)
.getOrElse(sys.error(s"local sequencer [$name] not configured"))
def rsx(name: String): RemoteSequencerNodeReference =
sequencersX.remote
sequencers.remote
.find(_.name == name)
.getOrElse(sys.error(s"remote sequencer [$name] not configured"))
def mx(name: String): LocalMediatorReference =
mediatorsX.local
mediators.local
.find(_.name == name)
.getOrElse(sys.error(s"mediator [$name] not configured"))
def lmx(name: String): LocalMediatorReference =
mediatorsX.local
mediators.local
.find(_.name == name)
.getOrElse(sys.error(s"local mediator [$name] not configured"))
def rmx(name: String): RemoteMediatorReference =
mediatorsX.remote
mediators.remote
.find(_.name == name)
.getOrElse(sys.error(s"remote mediator [$name] not configured"))
}

View File

@ -1,38 +0,0 @@
-- Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
module Bench where
inefficientFibonacci : Int -> Int
inefficientFibonacci n =
case n of
0 -> 0
1 -> 1
m -> inefficientFibonacci (m - 2) + inefficientFibonacci (m - 1)
template InefficientFibonacciResult
with
owner: Party
result: Int
where
signatory owner
template InefficientFibonacci
with
owner: Party
where
signatory owner
choice InefficientFibonacci_Compute : ContractId InefficientFibonacciResult
with
value: Int
controller owner
do create (InefficientFibonacciResult owner (inefficientFibonacci value))
nonconsuming choice InefficientFibonacci_NcCompute : Int
with
value: Int
controller owner
do
pure (inefficientFibonacci value)

View File

@ -1,211 +0,0 @@
-- Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
module Foo where
import DA.Functor (void)
template Divulger
with
divulgees: [Party] -- Parties to whom something is divulged
divulger: Party -- Party who divulges something
keyId: Text
where
signatory [divulger] <> divulgees
key (divulger, keyId): (Party, Text)
maintainer key._1
nonconsuming choice DivulgeContractImmediate: ()
with
fooObservers : [Party]
fooPayload : Text
fooKeyId: Text
fooTemplateName: Text
controller divulger
do
-- Parties from 'divulgees' see the creation of Foo even though
-- they are not contract stakeholders, i.e. immediate divulgence occurs.
if fooTemplateName == "Foo1" then
void $ create Foo1 with
signatory = divulger
observers = fooObservers
payload = fooPayload
keyId = fooKeyId
else if fooTemplateName == "Foo2" then
void $ create Foo2 with
signatory = divulger
observers = fooObservers
payload = fooPayload
keyId = fooKeyId
else if fooTemplateName == "Foo3" then
void $ create Foo3 with
signatory = divulger
observers = fooObservers
payload = fooPayload
keyId = fooKeyId
else
return ()
nonconsuming choice DivulgeConsumingExercise: ()
with
fooTemplateName: Text
fooKey: (Party, Text)
fooConsumingPayload : Text
controller divulger
do
-- Parties from 'divulgees' see the consuming exercise on Foo even though they are not
-- contract stakeholders or choice controllers/observers, i.e. divulgence occurs.
if fooTemplateName == "Foo1" then
void $ exerciseByKey @Foo1 fooKey (Foo1_ConsumingChoice fooConsumingPayload)
else if fooTemplateName == "Foo2" then
void $ exerciseByKey @Foo2 fooKey (Foo2_ConsumingChoice fooConsumingPayload)
else if fooTemplateName == "Foo3" then
void $ exerciseByKey @Foo3 fooKey (Foo3_ConsumingChoice fooConsumingPayload)
else
return ()
template Foo1
with
signatory : Party
observers : [Party]
payload : Text
keyId: Text
where
signatory signatory
observer observers
key (signatory, keyId): (Party, Text)
maintainer key._1
nonconsuming choice Foo1_NonconsumingChoice: ()
with
exercisePayload: Text
observer observers
controller signatory
do
return ()
choice Foo1_ConsumingChoice: ()
with
exercisePayload: Text
controller signatory
do
return ()
interface instance FooI1 for Foo1 where
view = FooData with templateName = "Foo1", ..
template Foo2
with
signatory : Party
observers : [Party]
payload : Text
keyId: Text
where
signatory signatory
observer observers
key (signatory, keyId): (Party, Text)
maintainer key._1
nonconsuming choice Foo2_NonconsumingChoice: ()
with
exercisePayload: Text
observer observers
controller signatory
do
return ()
choice Foo2_ConsumingChoice: ()
with
exercisePayload: Text
controller signatory
do
return ()
interface instance FooI2 for Foo2 where
view = foo2ToFooData $ foo2Roundtrip 10 this
template Foo3
with
signatory : Party
observers : [Party]
payload : Text
keyId: Text
where
signatory signatory
observer observers
key (signatory, keyId): (Party, Text)
maintainer key._1
nonconsuming choice Foo3_NonconsumingChoice: ()
with
exercisePayload: Text
observer observers
controller signatory
do
return ()
choice Foo3_ConsumingChoice: ()
with
exercisePayload: Text
controller signatory
do
return ()
interface instance FooI3 for Foo3 where
view = foo3ToFooData $ foo3Roundtrip 100 this
template Dummy
with
signatory: Party
where
signatory signatory
data FooData = FooData
with
signatory : Party
observers : [Party]
payload : Text
keyId: Text
templateName: Text
deriving (Eq, Show)
-- FooI1 exposes the simplest kind of interface view: it just copies the data from the template
interface FooI1 where
viewtype FooData
foo2ToFooData : Foo2 -> FooData
foo2ToFooData Foo2{..} = FooData with templateName = "Foo2", ..
fooDataToFoo2 : FooData -> Foo2
fooDataToFoo2 FooData{..}
| templateName == "Foo2" = Foo2 {..}
| otherwise = error "fooDataToFoo2 called non non-foo2"
foo2Roundtrip : Int -> Foo2 -> Foo2
foo2Roundtrip n x
| n <= 0 = x
| otherwise = foo2Roundtrip (n - 1) (fooDataToFoo2 $ foo2ToFooData x)
-- FooI2 exposes a FooData view computed through 10 recursive conversion round-trips
interface FooI2 where
viewtype FooData
foo3ToFooData : Foo3 -> FooData
foo3ToFooData Foo3{..} = FooData with templateName = "Foo3", ..
fooDataToFoo3 : FooData -> Foo3
fooDataToFoo3 FooData{..}
| templateName == "Foo3" = Foo3 {..}
| otherwise = error "fooDataToFoo3 called non non-foo3"
foo3Roundtrip : Int -> Foo3 -> Foo3
foo3Roundtrip n x
| n <= 0 = x
| otherwise = foo3Roundtrip (n - 1) (fooDataToFoo3 $ foo3ToFooData x)
-- FooI3 exposes a FooData view computed through 100 recursive conversion round-trips
interface FooI3 where
viewtype FooData


@ -1,7 +0,0 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
name: benchtool-tests
source: .
version: 3.0.0
dependencies:
- daml-prim
- daml-stdlib


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: carbonv1-tests
source: .
version: 3.0.0


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: carbonv2-tests
data-dependencies:
- ../../../../scala-2.13/resource_managed/main/carbonv1-tests.dar


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: experimental-tests
source: .
version: 3.0.0


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: model-tests
source: .
version: 3.0.0


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: package-management-tests
source: .
version: 3.0.0


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
name: semantic-tests
source: .
version: 3.0.0


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: JsonEncodingTest


@ -1,4 +1,4 @@
sdk-version: 3.0.0-snapshot.20240215.12772.0.v26fd657b
sdk-version: 3.0.0-snapshot.20240216.12780.0.vd6ed98e0
build-options:
- --target=2.1
name: AdminWorkflows


@ -98,6 +98,7 @@ object GrpcParticipantRepairService {
timestamp,
contractDomainRenames.toMap,
force = request.force,
partiesOffboarding = request.partiesOffboarding,
)
}
@ -117,6 +118,7 @@ object GrpcParticipantRepairService {
timestamp: Option[CantonTimestamp],
contractDomainRenames: Map[DomainId, (DomainId, ProtocolVersion)],
force: Boolean, // if true, does not check whether `timestamp` is clean
partiesOffboarding: Boolean,
)
}
@ -162,6 +164,8 @@ final class GrpcParticipantRepairService(
s"Contract $contractId for domain $domainId cannot be serialized due to an invariant violation: $errorMessage"
)
RepairServiceError.SerializationError.Error(domainId, contractId)
case inspection.Error.OffboardingParty(domainId, error) =>
RepairServiceError.InvalidArgument.Error(s"Parties offboarding on domain $domainId: $error")
}
/** purge contracts
@ -222,6 +226,7 @@ final class GrpcParticipantRepairService(
validRequest.timestamp,
validRequest.contractDomainRenames,
skipCleanTimestampCheck = validRequest.force,
partiesOffboarding = validRequest.partiesOffboarding,
)
)
.leftMap(toRepairServiceError)
@ -276,16 +281,50 @@ final class GrpcParticipantRepairService(
): StreamObserver[ImportAcsRequest] = {
// TODO(i12481): This buffer will contain the whole ACS snapshot.
val outputStream = new ByteArrayOutputStream()
val request = new AtomicReference[ImportAcsRequest]
// (workflowIdPrefix, allowContractIdSuffixRecomputation)
val args = new AtomicReference[Option[(String, Boolean)]](None)
def tryArgs: (String, Boolean) =
args
.get()
.getOrElse(throw new IllegalStateException("The import ACS request fields are not set"))
new StreamObserver[ImportAcsRequest] {
override def onNext(value: ImportAcsRequest): Unit = {
Try(outputStream.write(value.acsSnapshot.toByteArray)) match {
def setOrCheck(
workflowIdPrefix: String,
allowContractIdSuffixRecomputation: Boolean,
): Try[Unit] =
Try {
val newOrMatchingValue = Some((workflowIdPrefix, allowContractIdSuffixRecomputation))
if (!args.compareAndSet(None, newOrMatchingValue)) {
val (oldWorkflowIdPrefix, oldAllowContractIdSuffixRecomputation) = tryArgs
if (workflowIdPrefix != oldWorkflowIdPrefix) {
throw new IllegalArgumentException(
s"Workflow ID prefix cannot be changed from $oldWorkflowIdPrefix to $workflowIdPrefix"
)
} else if (
oldAllowContractIdSuffixRecomputation != allowContractIdSuffixRecomputation
) {
throw new IllegalArgumentException(
s"Contract ID suffix recomputation cannot be changed from $oldAllowContractIdSuffixRecomputation to $allowContractIdSuffixRecomputation"
)
}
}
}
override def onNext(request: ImportAcsRequest): Unit = {
val processRequest =
for {
_ <- setOrCheck(request.workflowIdPrefix, request.allowContractIdSuffixRecomputation)
_ <- Try(outputStream.write(request.acsSnapshot.toByteArray))
} yield ()
processRequest match {
case Failure(exception) =>
outputStream.close()
responseObserver.onError(exception)
case Success(_) =>
request.set(value)
() // Nothing to do, just move on to the next request
}
}
@ -301,11 +340,11 @@ final class GrpcParticipantRepairService(
activeContracts <- EitherT.fromEither[Future](
loadFromByteString(ByteString.copyFrom(outputStream.toByteArray))
)
req = request.get()
(workflowIdPrefix, allowContractIdSuffixRecomputation) = tryArgs
activeContractsWithRemapping <- EnsureValidContractIds(
loggerFactory,
sync.protocolVersionGetter,
Option.when(req.allowContractIdSuffixRecomputation)(sync.pureCryptoApi),
Option.when(allowContractIdSuffixRecomputation)(sync.pureCryptoApi),
)(activeContracts)
(activeContractsWithValidContractIds, contractIdRemapping) =
activeContractsWithRemapping
@ -337,7 +376,7 @@ final class GrpcParticipantRepairService(
),
ignoreAlreadyAdded = true,
ignoreStakeholderCheck = true,
workflowIdPrefix = Option(req.workflowIdPrefix),
workflowIdPrefix = Option.when(workflowIdPrefix != "")(workflowIdPrefix),
)
)
} yield ()
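The reworked ImportAcs stream above latches workflowIdPrefix and allowContractIdSuffixRecomputation from the first chunk and rejects any later chunk that disagrees, instead of keeping only the last request as before. A minimal sketch of that set-once-then-verify pattern, assuming an illustrative StreamArgs helper rather than Canton's actual classes:

import java.util.concurrent.atomic.AtomicReference
import scala.util.Try

// Illustrative latch for request fields that must be identical across all
// chunks of a streamed request (hypothetical helper, not Canton code).
final class StreamArgs[A] {
  private val ref = new AtomicReference[Option[A]](None)

  // The first call stores the value; later calls succeed only if the value matches.
  def setOrCheck(value: A): Try[Unit] = Try {
    if (!ref.compareAndSet(None, Some(value))) {
      val stored = ref.get().getOrElse(throw new IllegalStateException("unset"))
      if (stored != value)
        throw new IllegalArgumentException(s"Request field changed from $stored to $value")
    }
  }
}

// Usage with the two fields carried by every ImportAcsRequest chunk:
//   val args = new StreamArgs[(String, Boolean)]
//   args.setOrCheck(("prefix", true))   // first chunk: stored
//   args.setOrCheck(("prefix", true))   // identical value: Success(())
//   args.setOrCheck(("other", true))    // mismatch: Failure(IllegalArgumentException)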


@ -5,23 +5,31 @@ package com.digitalasset.canton.participant.admin.inspection
import cats.data.EitherT
import cats.syntax.foldable.*
import cats.syntax.traverse.*
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.participant.store.{StoredContract, SyncDomainPersistentState}
import com.digitalasset.canton.participant.store.SyncDomainPersistentState
import com.digitalasset.canton.protocol.ContractIdSyntax.orderingLfContractId
import com.digitalasset.canton.protocol.{LfContractId, SerializableContract}
import com.digitalasset.canton.topology.{DomainId, PartyId}
import com.digitalasset.canton.topology.client.{DomainTopologyClient, TopologySnapshot}
import com.digitalasset.canton.topology.{DomainId, ParticipantId, PartyId}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.MonadUtil
import com.digitalasset.canton.{LfPartyId, TransferCounterO}
import scala.collection.immutable.SortedMap
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
private[inspection] object AcsInspection {
private val BatchSize = PositiveInt.tryCreate(1000)
final case class AcsSnapshot[S](
snapshot: S,
ts: CantonTimestamp, // timestamp of the snapshot
)
def findContracts(
state: SyncDomainPersistentState,
filterId: Option[String],
@ -32,37 +40,46 @@ private[inspection] object AcsInspection {
traceContext: TraceContext,
ec: ExecutionContext,
): Future[List[(Boolean, SerializableContract)]] =
for {
acs <- getCurrentSnapshot(state)
contracts <- state.contractStore
.find(filterId, filterPackage, filterTemplate, limit)
.map(_.map(sc => (acs.contains(sc.contractId), sc)))
} yield contracts
getCurrentSnapshot(state)
.flatMap(_.traverse { acs =>
state.contractStore
.find(filterId, filterPackage, filterTemplate, limit)
.map(_.map(sc => (acs.snapshot.contains(sc.contractId), sc)))
})
.map(_.getOrElse(Nil))
def hasActiveContracts(state: SyncDomainPersistentState, partyId: PartyId)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): Future[Boolean] =
for {
acs <- getCurrentSnapshot(state)
res <- state.contractStore.hasActiveContracts(partyId, acs.keysIterator)
acsSnapshotO <- getCurrentSnapshot(state)
res <- acsSnapshotO.fold(Future.successful(false))(acsSnapshot =>
state.contractStore.hasActiveContracts(partyId, acsSnapshot.snapshot.keysIterator)
)
} yield res
/** Get the current snapshot
* @return A snapshot (with its timestamp) or None if no clean timestamp is known
*/
def getCurrentSnapshot(state: SyncDomainPersistentState)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): Future[SortedMap[LfContractId, (CantonTimestamp, TransferCounterO)]] =
): Future[Option[AcsSnapshot[SortedMap[LfContractId, (CantonTimestamp, TransferCounterO)]]]] =
for {
cursorHeadO <- state.requestJournalStore.preheadClean
snapshot <- cursorHeadO.fold(
Future.successful(SortedMap.empty[LfContractId, (CantonTimestamp, TransferCounterO)])
)(cursorHead =>
state.activeContractStore
.snapshot(cursorHead.timestamp)
.map(_.map { case (id, (timestamp, transferCounter)) =>
id -> (timestamp, transferCounter)
})
)
snapshot <- cursorHeadO
.traverse { cursorHead =>
val ts = cursorHead.timestamp
val snapshotF = state.activeContractStore
.snapshot(ts)
.map(_.map { case (id, (timestamp, transferCounter)) =>
id -> (timestamp, transferCounter)
})
snapshotF.map(snapshot => Some(AcsSnapshot(snapshot, ts)))
}
.map(_.flatten)
} yield snapshot
// fetch acs, checking that the requested timestamp is clean
@ -72,7 +89,9 @@ private[inspection] object AcsInspection {
)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): EitherT[Future, Error, SortedMap[LfContractId, (CantonTimestamp, TransferCounterO)]] =
): EitherT[Future, Error, AcsSnapshot[
SortedMap[LfContractId, (CantonTimestamp, TransferCounterO)]
]] =
for {
_ <-
if (!skipCleanTimestampCheck)
@ -89,9 +108,7 @@ private[inspection] object AcsInspection {
state.activeContractStore.pruningStatus,
timestamp,
)
} yield snapshot.map { case (id, (timestamp, transferCounter)) =>
id -> (timestamp, transferCounter)
}
} yield AcsSnapshot(snapshot, timestamp)
// sort acs for easier comparison
private def getAcsSnapshot(
@ -102,36 +119,106 @@ private[inspection] object AcsInspection {
)(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): EitherT[Future, Error, Iterator[Seq[(LfContractId, TransferCounterO)]]] =
timestamp
.map(getSnapshotAt(domainId, state)(_, skipCleanTimestampCheck = skipCleanTimestampCheck))
.getOrElse(EitherT.right(getCurrentSnapshot(state)))
.map(
_.iterator
): EitherT[Future, Error, Option[
AcsSnapshot[Iterator[Seq[(LfContractId, TransferCounterO)]]]
]] = {
type MaybeSnapshot =
Option[AcsSnapshot[SortedMap[LfContractId, (CantonTimestamp, TransferCounterO)]]]
val maybeSnapshotET: EitherT[Future, Error, MaybeSnapshot] = timestamp match {
case Some(timestamp) =>
getSnapshotAt(domainId, state)(timestamp, skipCleanTimestampCheck = skipCleanTimestampCheck)
.map(Some(_))
case None =>
EitherT.liftF[Future, Error, MaybeSnapshot](getCurrentSnapshot(state))
}
maybeSnapshotET.map(
_.map { case AcsSnapshot(snapshot, ts) =>
val groupedSnapshot = snapshot.iterator
.map { case (cid, (_, transferCounter)) =>
cid -> transferCounter
}
.toSeq
.grouped(
AcsInspection.BatchSize.value
) // TODO(#14818): Batching should be done by the caller not here
)
) // TODO(#14818): Batching should be done by the caller not here))
AcsSnapshot(groupedSnapshot, ts)
}
)
}
def forEachVisibleActiveContract(
domainId: DomainId,
state: SyncDomainPersistentState,
parties: Set[LfPartyId],
timestamp: Option[CantonTimestamp],
force: Boolean = false, // if true, does not check whether `timestamp` is clean
skipCleanTimestampCheck: Boolean = false,
)(f: (SerializableContract, TransferCounterO) => Either[Error, Unit])(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): EitherT[Future, Error, Unit] =
): EitherT[Future, Error, Option[(Set[LfPartyId], CantonTimestamp)]] = {
for {
acs <- getAcsSnapshot(domainId, state, timestamp, skipCleanTimestampCheck = force)
unit <- MonadUtil.sequentialTraverse_(acs)(forEachBatch(domainId, state, parties, f))
} yield unit
acsSnapshotO <- getAcsSnapshot(
domainId,
state,
timestamp,
skipCleanTimestampCheck = skipCleanTimestampCheck,
)
allStakeholdersAndTs <- acsSnapshotO.traverse { acsSnapshot =>
MonadUtil
.sequentialTraverseMonoid(acsSnapshot.snapshot)(
forEachBatch(domainId, state, parties, f)
)
.map((_, acsSnapshot.ts))
}
} yield allStakeholdersAndTs
}
/** Check that the ACS snapshot does not contain contracts that are still needed on the participant.
* In the context of party offboarding, we want to avoid purging contracts
* that are needed for other parties hosted on the participant.
*/
def checkOffboardingSnapshot(
participantId: ParticipantId,
offboardedParties: Set[LfPartyId],
allStakeholders: Set[LfPartyId],
snapshotTs: CantonTimestamp,
topologyClient: DomainTopologyClient,
)(implicit
executionContext: ExecutionContext,
traceContext: TraceContext,
): EitherT[Future, String, Unit] = {
for {
topologySnapshot <- EitherT.liftF[Future, String, TopologySnapshot](
topologyClient.awaitSnapshot(snapshotTs)
)
hostedStakeholders <-
EitherT.liftF[Future, String, Seq[LfPartyId]](
topologySnapshot
.hostedOn(allStakeholders, participantId)
.map(_.keysIterator.toSeq)
)
remainingHostedStakeholders = hostedStakeholders.diff(offboardedParties.toSeq)
_ <- EitherT.cond[Future](
remainingHostedStakeholders.isEmpty,
(),
s"Cannot take snapshot to offboard parties ${offboardedParties.toSeq} at $snapshotTs, because the following parties have contracts: ${remainingHostedStakeholders
.mkString(", ")}",
)
} yield ()
}
/** Applies function f to all the contracts in the batch whose set of stakeholders has
* non-empty intersection with `parties`
* @return The union of all stakeholders of all contracts on which `f` was applied
*/
private def forEachBatch(
domainId: DomainId,
state: SyncDomainPersistentState,
@ -140,29 +227,29 @@ private[inspection] object AcsInspection {
)(batch: Seq[(LfContractId, TransferCounterO)])(implicit
traceContext: TraceContext,
ec: ExecutionContext,
): EitherT[Future, Error, Unit] = {
): EitherT[Future, Error, Set[LfPartyId]] = {
val (cids, transferCounters) = batch.unzip
val allStakeholders: mutable.Set[LfPartyId] = mutable.Set()
for {
batch <- state.contractStore
.lookupManyUncached(cids)
.leftMap(missingContract => Error.InconsistentSnapshot(domainId, missingContract))
chop = batch.zip(transferCounters)
contractsWithTransferCounter = batch.zip(transferCounters)
_ <- EitherT.fromEither[Future](applyToBatch(parties, f)(chop))
} yield ()
stakeholdersE = contractsWithTransferCounter
.traverse_ { case (storedContract, transferCounter) =>
if (parties.exists(storedContract.contract.metadata.stakeholders)) {
allStakeholders ++= storedContract.contract.metadata.stakeholders
f(storedContract.contract, transferCounter)
} else
Right(())
}
.map(_ => allStakeholders.toSet)
allStakeholders <- EitherT.fromEither[Future](stakeholdersE)
} yield allStakeholders
}
private def applyToBatch(
parties: Set[LfPartyId],
f: (SerializableContract, TransferCounterO) => Either[Error, Unit],
)(batch: List[(StoredContract, TransferCounterO)]): Either[Error, Unit] =
batch.traverse_ { case (storedContract, transferCounter) =>
if (parties.exists(storedContract.contract.metadata.stakeholders))
f(storedContract.contract, transferCounter)
else
Right(())
}
}
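checkOffboardingSnapshot reduces to a set computation: gather every stakeholder seen while exporting the ACS, keep those the topology snapshot (taken at the ACS snapshot timestamp) reports as hosted on this participant, subtract the parties being offboarded, and fail if anything remains. A condensed sketch of that decision, with plain sets standing in for the topology lookup:

// Simplified stand-in for the party-offboarding safety check; the real code
// derives hostedStakeholders from a topology snapshot via hostedOn.
def checkOffboarding(
    offboardedParties: Set[String],
    hostedStakeholders: Set[String],
): Either[String, Unit] = {
  val remaining = hostedStakeholders.diff(offboardedParties)
  Either.cond(
    remaining.isEmpty,
    (),
    s"Cannot offboard ${offboardedParties.mkString(", ")}: " +
      s"hosted parties with remaining contracts: ${remaining.mkString(", ")}",
  )
}

// checkOffboarding(Set("alice"), Set("alice"))        // Right(())
// checkOffboarding(Set("alice"), Set("alice", "bob")) // Left(".. bob ..")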


@ -41,4 +41,5 @@ private[admin] object Error {
errorMessage: String,
) extends Error
final case class OffboardingParty(domainId: DomainId, error: String) extends Error
}


@ -5,7 +5,10 @@ package com.digitalasset.canton.participant.admin.inspection
import cats.Eval
import cats.data.{EitherT, OptionT}
import cats.implicits.*
import cats.syntax.foldable.*
import cats.syntax.functor.*
import cats.syntax.parallel.*
import cats.syntax.traverse.*
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.config.ProcessingTimeout
@ -20,6 +23,7 @@ import com.digitalasset.canton.participant.protocol.RequestJournal
import com.digitalasset.canton.participant.store.ActiveContractStore.AcsError
import com.digitalasset.canton.participant.store.*
import com.digitalasset.canton.participant.sync.{
ConnectedDomainsLookup,
LedgerSyncEvent,
SyncDomainPersistentStateManager,
TimestampedEvent,
@ -84,6 +88,8 @@ final class SyncStateInspection(
participantNodePersistentState: Eval[ParticipantNodePersistentState],
timeouts: ProcessingTimeout,
journalCleaningControl: JournalGarbageCollectorControl,
connectedDomainsLookup: ConnectedDomainsLookup,
participantId: ParticipantId,
override protected val loggerFactory: NamedLoggerFactory,
)(implicit ec: ExecutionContext)
extends NamedLogging {
@ -128,14 +134,20 @@ final class SyncStateInspection(
domainAlias: DomainAlias
)(implicit
traceContext: TraceContext
): EitherT[Future, AcsError, Map[LfContractId, (CantonTimestamp, TransferCounterO)]] =
OptionT(
syncDomainPersistentStateManager
.getByAlias(domainAlias)
.map(AcsInspection.getCurrentSnapshot)
.sequence
).widen[Map[LfContractId, (CantonTimestamp, TransferCounterO)]]
.toRight(SyncStateInspection.NoSuchDomain(domainAlias))
): EitherT[Future, AcsError, Map[LfContractId, (CantonTimestamp, TransferCounterO)]] = {
for {
state <- EitherT.fromEither[Future](
syncDomainPersistentStateManager
.getByAlias(domainAlias)
.toRight(SyncStateInspection.NoSuchDomain(domainAlias))
)
snapshotO <- EitherT.liftF(AcsInspection.getCurrentSnapshot(state).map(_.map(_.snapshot)))
} yield snapshotO.fold(Map.empty[LfContractId, (CantonTimestamp, TransferCounterO)])(
_.toMap
)
}
/** searches the pcs and returns the contract and activeness flag */
def findContracts(
@ -149,8 +161,8 @@ final class SyncStateInspection(
timeouts.inspection.await("findContracts") {
syncDomainPersistentStateManager
.getByAlias(domain)
.map(AcsInspection.findContracts(_, filterId, filterPackage, filterTemplate, limit))
.sequence
.traverse(AcsInspection.findContracts(_, filterId, filterPackage, filterTemplate, limit))
},
domain,
)
@ -180,10 +192,12 @@ final class SyncStateInspection(
timestamp: Option[CantonTimestamp],
contractDomainRenames: Map[DomainId, (DomainId, ProtocolVersion)],
skipCleanTimestampCheck: Boolean,
partiesOffboarding: Boolean,
)(implicit
traceContext: TraceContext
): EitherT[Future, Error, Unit] = {
val allDomains = syncDomainPersistentStateManager.getAll
// disable journal cleaning for the duration of the dump
disableJournalCleaningForFilter(allDomains, filterDomain).flatMap { _ =>
MonadUtil.sequentialTraverse_(allDomains) {
@ -192,13 +206,13 @@ final class SyncStateInspection(
contractDomainRenames.getOrElse(domainId, (domainId, state.protocolVersion))
val ret = for {
_ <- AcsInspection
result <- AcsInspection
.forEachVisibleActiveContract(
domainId,
state,
parties,
timestamp,
force = skipCleanTimestampCheck,
skipCleanTimestampCheck = skipCleanTimestampCheck,
) { case (contract, transferCounter) =>
val activeContractE =
ActiveContract.create(domainIdForExport, contract, transferCounter)(
@ -217,8 +231,34 @@ final class SyncStateInspection(
Right(())
}
}
}
_ <- result match {
case Some((allStakeholders, snapshotTs)) if partiesOffboarding =>
for {
syncDomain <- EitherT.fromOption[Future](
connectedDomainsLookup.get(domainId),
Error.OffboardingParty(
domainId,
s"Unable to get topology client for domain $domainId; check domain connectivity.",
),
)
_ <- AcsInspection
.checkOffboardingSnapshot(
participantId,
offboardedParties = parties,
allStakeholders = allStakeholders,
snapshotTs = snapshotTs,
topologyClient = syncDomain.topologyClient,
)
.leftMap[Error](err => Error.OffboardingParty(domainId, err))
} yield ()
// Snapshot is empty or partiesOffboarding is false
case _ => EitherTUtil.unit[Error]
}
} yield ()
// re-enable journal cleaning after the dump
ret.thereafter { _ =>
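currentAcsSnapshot now has to handle AcsInspection.getCurrentSnapshot returning an Option: when no clean request timestamp is known yet there is no snapshot to report, and the caller folds that case into an empty map instead of failing. A small sketch of the same fold, with placeholder types instead of Canton's:

import scala.concurrent.{ExecutionContext, Future}

// Placeholder for AcsInspection.AcsSnapshot: a snapshot value plus its timestamp.
final case class AcsSnapshot[S](snapshot: S, ts: Long)

// No clean timestamp yet means no snapshot; fold the None case into an empty map.
def currentAcs(
    fetch: () => Future[Option[AcsSnapshot[Map[String, Long]]]]
)(implicit ec: ExecutionContext): Future[Map[String, Long]] =
  fetch().map(_.fold(Map.empty[String, Long])(_.snapshot))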


@ -37,8 +37,5 @@ package object repair {
executionContext: ExecutionContext,
traceContext: TraceContext,
): Future[Set[LfPartyId]] =
snapshot
.hostedOn(parties, participantId)
.map(_.collect { case (party, attributes) if attributes.permission.isActive => party }.toSet)
snapshot.hostedOn(parties, participantId).map(_.keySet)
}
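This simplification (and the matching one in ExtractUsedAndCreated further down) appears to rely on hostedOn now returning only parties that are actively hosted on the given participant, so the extra filter on attributes.permission.isActive becomes redundant and the map's keys are enough. A before/after sketch with a plain map standing in for the topology result:

// Hypothetical shape of a hostedOn result: party -> hosting attributes.
final case class Attributes(isActive: Boolean)

val hosted: Map[String, Attributes] =
  Map("alice" -> Attributes(isActive = true))

// Before: filter by permission, then collect the parties.
val activeBefore: Set[String] =
  hosted.collect { case (party, attrs) if attrs.isActive => party }.toSet

// After: the lookup already excludes inactive hosting, so the keys suffice.
val activeAfter: Set[String] = hosted.keySet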


@ -22,7 +22,7 @@ import com.digitalasset.canton.protocol.messages.{
import com.digitalasset.canton.sequencing.client.{
SendAsyncClientError,
SendCallback,
SequencerClient,
SequencerClientSend,
}
import com.digitalasset.canton.sequencing.protocol.{Batch, MessageId, Recipients}
import com.digitalasset.canton.tracing.TraceContext
@ -37,7 +37,7 @@ import scala.concurrent.{ExecutionContext, Future}
abstract class AbstractMessageProcessor(
ephemeral: SyncDomainEphemeralState,
crypto: DomainSyncCryptoClient,
sequencerClient: SequencerClient,
sequencerClient: SequencerClientSend,
protocolVersion: ProtocolVersion,
)(implicit ec: ExecutionContext)
extends NamedLogging
@ -105,6 +105,7 @@ abstract class AbstractMessageProcessor(
maxSequencingTime = maxSequencingTime,
messageId = messageId.getOrElse(MessageId.randomMessageId()),
callback = SendCallback.log(s"Response message for request [$requestId]", logger),
amplify = true,
)
} yield ()
}


@ -96,7 +96,7 @@ abstract class ProtocolProcessor[
inFlightSubmissionTracker: InFlightSubmissionTracker,
ephemeral: SyncDomainEphemeralState,
crypto: DomainSyncCryptoClient,
sequencerClient: SequencerClient,
sequencerClient: SequencerClientSend,
domainId: DomainId,
protocolVersion: ProtocolVersion,
override protected val loggerFactory: NamedLoggerFactory,
@ -460,6 +460,7 @@ abstract class ProtocolProcessor[
callback = res => sendResultP.trySuccess(res).discard,
maxSequencingTime = maxSequencingTime,
messageId = messageId,
amplify = true,
)
.mapK(FutureUnlessShutdown.outcomeK)
.leftMap { err =>


@ -11,6 +11,7 @@ import cats.syntax.foldable.*
import cats.syntax.functorFilter.*
import cats.syntax.option.*
import cats.syntax.parallel.*
import com.daml.error.utils.DecodedCantonError
import com.daml.nameof.NameOf.functionFullName
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.config.ProcessingTimeout
@ -27,6 +28,7 @@ import com.digitalasset.canton.participant.store.*
import com.digitalasset.canton.participant.sync.TimestampedEvent.TimelyRejectionEventId
import com.digitalasset.canton.participant.sync.{LedgerSyncEvent, ParticipantEventPublisher}
import com.digitalasset.canton.protocol.RootHash
import com.digitalasset.canton.sequencing.protocol.SequencerErrors.AggregateSubmissionAlreadySent
import com.digitalasset.canton.sequencing.protocol.{DeliverError, MessageId}
import com.digitalasset.canton.time.DomainTimeTracker
import com.digitalasset.canton.topology.DomainId
@ -187,13 +189,23 @@ class InFlightSubmissionTracker(
val domainId = deliverError.domainId
val messageId = deliverError.messageId
for {
inFlightO <- store.value.lookupSomeMessageId(domainId, messageId)
toUpdateO = updatedTrackingData(inFlightO)
_ <- toUpdateO.traverse_ { case (changeIdHash, newTrackingData) =>
store.value.updateUnsequenced(changeIdHash, domainId, messageId, newTrackingData)
}
} yield ()
// Ignore already sequenced errors here to deal with submission request amplification
val isAlreadySequencedError = DecodedCantonError
.fromGrpcStatus(deliverError.reason)
.exists(_.code.id == AggregateSubmissionAlreadySent.id)
if (isAlreadySequencedError) {
logger.debug(
s"Ignoring deliver error $deliverError for $domainId and $messageId because the message was already sequenced."
)
Future.successful(())
} else
for {
inFlightO <- store.value.lookupSomeMessageId(domainId, messageId)
toUpdateO = updatedTrackingData(inFlightO)
_ <- toUpdateO.traverse_ { case (changeIdHash, newTrackingData) =>
store.value.updateUnsequenced(changeIdHash, domainId, messageId, newTrackingData)
}
} yield ()
}
/** Marks the timestamp as having been observed on the domain. */
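With submission request amplification, the same submission may reach several sequencers, so one of them can answer with an AggregateSubmissionAlreadySent deliver error that is expected rather than fatal; the tracker now recognizes exactly that error code and skips the rejection bookkeeping. A schematic version of the filtering decision, using an illustrative DecodedError type in place of DecodedCantonError:

// Illustrative stand-in for a decoded gRPC status carrying a stable error code id.
final case class DecodedError(codeId: String)

// Assumed id, used only for this example; the real check compares against
// AggregateSubmissionAlreadySent.id.
val AlreadySentCodeId = "SEQUENCER_SUBMISSION_REQUEST_ALREADY_SENT"

// True when the deliver error only means another amplified copy of the same
// submission was already sequenced, in which case no tracker update is needed.
def isAlreadySequenced(decoded: Option[DecodedError]): Boolean =
  decoded.exists(_.codeId == AlreadySentCodeId)

// isAlreadySequenced(Some(DecodedError(AlreadySentCodeId))) // true  -> ignore
// isAlreadySequenced(Some(DecodedError("OTHER_ERROR")))     // false -> record
// isAlreadySequenced(None)                                  // false -> record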


@ -58,7 +58,7 @@ object UsableDomain {
tc: TraceContext,
): EitherT[Future, MissingActiveParticipant, Unit] =
snapshot
.allHaveActiveParticipants(parties, _.isActive)
.allHaveActiveParticipants(parties)
.leftMap(MissingActiveParticipant(domainId, _))
private def unknownPackages(snapshot: TopologySnapshot)(


@ -195,7 +195,7 @@ class DomainRouter(
)
Future.successful(false)
},
_.allHaveActiveParticipants(informees, _.isActive).value.map(_.isRight),
_.allHaveActiveParticipants(informees).value.map(_.isRight),
)
}.merge


@ -87,7 +87,7 @@ object ExtractUsedAndCreated {
)(implicit ec: ExecutionContext, tc: TraceContext): Future[Map[LfPartyId, Boolean]] = {
topologySnapshot.hostedOn(parties, participantId).map { partyWithAttributes =>
parties
.map(partyId => partyId -> partyWithAttributes.get(partyId).exists(_.permission.isActive))
.map(partyId => partyId -> partyWithAttributes.contains(partyId))
.toMap
}
}


@ -45,7 +45,7 @@ import com.digitalasset.canton.protocol.{
WithContractHash,
}
import com.digitalasset.canton.sequencing.client.SendAsyncClientError.RequestRefused
import com.digitalasset.canton.sequencing.client.{SendType, SequencerClient}
import com.digitalasset.canton.sequencing.client.{SendType, SequencerClientSend}
import com.digitalasset.canton.sequencing.protocol.{Batch, OpenEnvelope, Recipients, SendAsyncError}
import com.digitalasset.canton.store.SequencerCounterTrackerStore
import com.digitalasset.canton.time.PositiveSeconds
@ -154,7 +154,7 @@ import scala.math.Ordering.Implicits.*
class AcsCommitmentProcessor(
domainId: DomainId,
participantId: ParticipantId,
val sequencerClient: SequencerClient,
sequencerClient: SequencerClientSend,
domainCrypto: SyncCryptoClient[SyncCryptoApi],
sortedReconciliationIntervalsProvider: SortedReconciliationIntervalsProvider,
store: AcsCommitmentStore,
@ -1157,7 +1157,13 @@ class AcsCommitmentProcessor(
EitherT(
FutureUtil.logOnFailure(
sequencerClient
.sendAsync(batch, SendType.Other, None)
.sendAsync(
batch,
SendType.Other,
None,
// ACS commitments are best effort, so no need to amplify them
amplify = false,
)
.leftMap {
case RequestRefused(SendAsyncError.ShuttingDown(msg)) =>
logger.info(
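The new amplify argument on sendAsync lets each caller decide whether a message should use submission request amplification: confirmation responses and transaction submissions opt in above, while periodic ACS commitments stay best effort. A toy illustration of threading that per-message choice (the sendAsync signature here is invented for the example, not Canton's SequencerClientSend API):

import scala.concurrent.Future

// Invented message categories and send function, only to illustrate the choice.
sealed trait Outgoing
case object ConfirmationResponse extends Outgoing
case object AcsCommitment extends Outgoing

def sendAsync(payload: String, amplify: Boolean): Future[Unit] =
  Future.successful(()) // placeholder transport

// Amplify only messages whose loss would stall a transaction.
def send(msg: Outgoing, payload: String): Future[Unit] =
  msg match {
    case ConfirmationResponse => sendAsync(payload, amplify = true)
    case AcsCommitment        => sendAsync(payload, amplify = false) // best effort
  }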


@ -437,6 +437,8 @@ class CantonSyncService(
.foreach(_.removeJournalGarageCollectionLock())
}
},
connectedDomainsLookup,
participantId,
loggerFactory,
)


@ -13,7 +13,6 @@ import com.digitalasset.canton.crypto.{
AsymmetricEncrypted,
Encrypted,
Fingerprint,
HashPurpose,
SecureRandomness,
SymmetricKeyScheme,
TestHash,
@ -1324,7 +1323,6 @@ private[protocol] object MessageDispatcherTest {
s"${this.getClass.getSimpleName} cannot be serialized"
)
override def hashPurpose: HashPurpose = TestHash.testHashPurpose
override def deserializedFrom: Option[ByteString] = None
override protected[this] def toByteStringUnmemoized: ByteString = ByteString.EMPTY
