update canton to 20231219.12068.0.v5a57880a/2.9.0-snapshot.20231219.11658.0.vf7fbf8cf/3.0.0-snapshot.20231219.12068.0.v5a57880a (#18055)

CHANGELOG_BEGIN
CHANGELOG_END

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
azure-pipelines[bot] 2023-12-21 09:30:31 +00:00 committed by GitHub
parent a3217f67ce
commit 9ab22ec903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
104 changed files with 1389 additions and 668 deletions

View File

@ -454,7 +454,7 @@ class ExternalLedgerApiClient(
object ExternalLedgerApiClient {
def forReference(participant: LocalParticipantReference, token: String)(implicit
def forReference(participant: LocalParticipantReferenceCommon, token: String)(implicit
env: ConsoleEnvironment
): ExternalLedgerApiClient = {
val cc = participant.config.ledgerApi.clientConfig

View File

@ -1151,6 +1151,26 @@ trait ParticipantAdministration extends FeatureFlagFilter {
connectFromConfig(config, None)
}
@Help.Summary("Macro to connect a participant to a domain given by instance")
@Help.Description("""This variant of connect expects an instance with a sequencer connection.
|Otherwise the behaviour is equivalent to the connect command with explicit
|arguments. If the domain is already configured, the domain connection
|will be attempted. If, however, the domain is offline, the command will fail.
|Generally, this macro should only be used to set up a new domain. However, for
|convenience, we support idempotent invocations where subsequent calls just ensure
|that the participant reconnects to the domain.
|""")
def connect(
instance: InstanceReferenceWithSequencerConnection,
domainAlias: DomainAlias,
): Unit =
connect(
DomainConnectionConfig(
domainAlias,
SequencerConnections.single(instance.sequencerConnection),
)
)
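For illustration only, the new instance-based overload could be invoked from the Canton console roughly as follows; the participant, sequencer, and alias names are hypothetical, and in the console an implicit String-to-DomainAlias conversion may also apply:
  // hedged console sketch: connect participant1 to the domain served by sequencer1
  participant1.domains.connect(sequencer1, DomainAlias.tryCreate("mydomain"))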
private def connectFromConfig(
config: DomainConnectionConfig,
synchronize: Option[NonNegativeDuration],
@ -1305,12 +1325,25 @@ trait ParticipantAdministration extends FeatureFlagFilter {
synchronize - A timeout duration indicating how long to wait for all topology changes to have been effected on all local nodes.
""")
def reconnect_local(
ref: InstanceReferenceWithSequencerConnection,
ref: InstanceReferenceWithSequencerConnection
): Boolean = reconnect(ref.name)
@Help.Summary("Reconnect this participant to the given local domain")
@Help.Description("""Idempotent attempts to re-establish a connection to the given local domain.
|Same behaviour as generic reconnect.
The arguments are:
domainAlias - The domain alias to connect to
retry - Whether the reconnect should keep on retrying until it succeeds or abort noisily if the connection attempt fails.
synchronize - A timeout duration indicating how long to wait for all topology changes to have been effected on all local nodes.
""")
def reconnect_local(
domainAlias: DomainAlias,
retry: Boolean = true,
synchronize: Option[NonNegativeDuration] = Some(
consoleEnvironment.commandTimeouts.bounded
),
): Boolean = reconnect(ref.name, retry, synchronize)
): Boolean = reconnect(domainAlias, retry, synchronize)
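A hedged usage sketch for the new alias-based overload (participant and alias names are hypothetical):
  // reconnect by domain alias instead of by instance reference
  participant1.domains.reconnect_local(DomainAlias.tryCreate("mydomain"), retry = true)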
@Help.Summary("Reconnect this participant to all domains which are not marked as manual start")
@Help.Description("""

View File

@ -14,7 +14,7 @@ import com.digitalasset.canton.config.ConfigErrors.{
SubstitutionError,
}
import com.digitalasset.canton.logging.SuppressingLogger.LogEntryOptionality
import com.digitalasset.canton.logging.{ErrorLoggingContext, LogEntry, SuppressionRule}
import com.digitalasset.canton.logging.{LogEntry, SuppressionRule}
import com.digitalasset.canton.version.HandshakeErrors.DeprecatedProtocolVersion
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.scalatest.wordspec.AnyWordSpec
@ -22,7 +22,8 @@ import org.scalatest.wordspec.AnyWordSpec
class CantonCommunityConfigTest extends AnyWordSpec with BaseTest {
import scala.jdk.CollectionConverters.*
val simpleConf = "examples/01-simple-topology/simple-topology.conf"
private val simpleConf = "examples/01-simple-topology/simple-topology.conf"
"the example simple topology configuration" should {
lazy val config =
loadFile(simpleConf).valueOrFail("failed to load simple-topology.conf")
@ -360,8 +361,6 @@ class CantonCommunityConfigTest extends AnyWordSpec with BaseTest {
loadFiles(Seq(resourcePath))
}
val elc: ErrorLoggingContext = ErrorLoggingContext(logger, loggerFactory.properties, traceContext)
private def loadFiles(
resourcePaths: Seq[String]
): Either[CantonConfigError, CantonCommunityConfig] = {
@ -369,6 +368,6 @@ class CantonCommunityConfigTest extends AnyWordSpec with BaseTest {
CantonCommunityConfig.parseAndLoad(files)
}
lazy val baseDir: File = "community" / "app" / "src" / "test" / "resources"
private lazy val baseDir: File = "community" / "app" / "src" / "test" / "resources"
}

View File

@ -4,7 +4,6 @@
package com.digitalasset.canton.integration.tests
import better.files.*
import com.digitalasset.canton.ConsoleScriptRunner
import com.digitalasset.canton.config.CantonRequireTypes.InstanceName
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.environment.Environment
@ -25,6 +24,7 @@ import com.digitalasset.canton.integration.{
import com.digitalasset.canton.logging.NamedLogging
import com.digitalasset.canton.tracing.TracingConfig
import com.digitalasset.canton.util.ShowUtil.*
import com.digitalasset.canton.{ConsoleScriptRunner, DiscardOps}
import monocle.macros.syntax.lens.*
import scala.concurrent.blocking
@ -53,9 +53,8 @@ abstract class ExampleIntegrationTest(configPaths: File*)
trait HasConsoleScriptRunner { this: NamedLogging =>
import org.scalatest.EitherValues.*
def runScript(scriptPath: File)(implicit env: Environment): Unit = {
val () = ConsoleScriptRunner.run(env, scriptPath.toJava, logger = logger).value
}
def runScript(scriptPath: File)(implicit env: Environment): Unit =
ConsoleScriptRunner.run(env, scriptPath.toJava, logger = logger).value.discard
}
object ExampleIntegrationTest {

View File

@ -23,10 +23,10 @@ trait FlagCloseableAsync extends FlagCloseable {
trait AsyncOrSyncCloseable extends AutoCloseable
class AsyncCloseable[D <: RefinedNonNegativeDuration[D]] private (
class AsyncCloseable private (
name: String,
closeFuture: () => Future[?],
timeout: D,
timeout: RefinedNonNegativeDuration[?],
onTimeout: TimeoutException => Unit,
)(implicit
loggingContext: ErrorLoggingContext
@ -38,14 +38,14 @@ class AsyncCloseable[D <: RefinedNonNegativeDuration[D]] private (
}
object AsyncCloseable {
def apply[D <: RefinedNonNegativeDuration[D]](
def apply(
name: String,
closeFuture: => Future[?],
timeout: D,
timeout: RefinedNonNegativeDuration[?],
onTimeout: TimeoutException => Unit = _ => (),
)(implicit
loggingContext: ErrorLoggingContext
): AsyncCloseable[D] =
): AsyncCloseable =
new AsyncCloseable(name, () => closeFuture, timeout, onTimeout)
}
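A hedged call-site sketch of the simplified signature: with the type parameter removed, any RefinedNonNegativeDuration can be passed without extra type annotations. The method name and timeout parameter below are illustrative, and an implicit ErrorLoggingContext is assumed to be in scope:
  // illustrative only; `timeout` can be any RefinedNonNegativeDuration[?] now
  def closeAsync(timeout: RefinedNonNegativeDuration[?])(implicit elc: ErrorLoggingContext): AsyncCloseable =
    AsyncCloseable("flush-queue", Future.unit, timeout)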

View File

@ -3,7 +3,6 @@
package com.digitalasset.canton.networking.grpc
import com.digitalasset.canton.annotations.UnstableTest
import com.digitalasset.canton.config.ApiLoggingConfig
import com.digitalasset.canton.domain.api.v0.HelloServiceGrpc.HelloService
import com.digitalasset.canton.domain.api.v0.{Hello, HelloServiceGrpc}
@ -31,7 +30,6 @@ import scala.util.control.NonFatal
@SuppressWarnings(Array("org.wartremover.warts.Null"))
@nowarn("msg=match may not be exhaustive")
@UnstableTest
class ApiRequestLoggerTest extends AnyWordSpec with BaseTest with HasExecutionContext {
val ChannelName: String = "testSender"

View File

@ -72,11 +72,12 @@ private[integration] trait BaseIntegrationTest[E <: Environment, TCE <: TestCons
): Assertion =
loggerFactory.assertThrowsAndLogs[CommandFailure](
within,
assertions.map(assertion => { (entry: LogEntry) =>
assertion(entry)
entry.commandFailureMessage
succeed
}): _*
assertions
.map(assertion => { (entry: LogEntry) =>
assertion(entry)
entry.commandFailureMessage
succeed
}) *,
)
/** Similar to [[com.digitalasset.canton.console.commands.ParticipantAdministration#ping]]

View File

@ -10,7 +10,9 @@ import com.daml.lf.value.Value
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.protocol.{SourceDomainId, TargetDomainId}
sealed trait Reassignment
sealed trait Reassignment {
def kind: String
}
object Reassignment {
@ -28,7 +30,9 @@ object Reassignment {
templateId: Ref.Identifier,
stakeholders: List[Ref.Party],
assignmentExclusivity: Option[Timestamp],
) extends Reassignment
) extends Reassignment {
override def kind: String = "unassignment"
}
/** Represents the update of assigning a contract to a domain.
*
@ -40,7 +44,9 @@ object Reassignment {
ledgerEffectiveTime: Timestamp,
createNode: Node.Create,
contractMetadata: Bytes,
) extends Reassignment
) extends Reassignment {
override def kind: String = "assignment"
}
}
/** The common information for all reassignments.

View File

@ -321,6 +321,7 @@ object Update {
paramIfDefined("completion", _.optCompletionInfo),
param("source", _.reassignmentInfo.sourceDomain),
param("target", _.reassignmentInfo.targetDomain),
unnamedParam(_.reassignment.kind.unquoted),
indicateOmittedFields,
)

View File

@ -8,6 +8,7 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.digitalasset.canton.ledger.participant.state.v2.ReadService
import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
import com.digitalasset.canton.metrics.Metrics
import com.digitalasset.canton.platform.ResourceOwnerOps
import com.digitalasset.canton.platform.config.ServerRole
import com.digitalasset.canton.platform.indexer.Indexer
import com.digitalasset.canton.platform.indexer.ha.{
@ -61,7 +62,7 @@ object ParallelIndexerFactory {
metrics.executorServiceMetrics,
),
loggerFactory,
)
).afterReleased(logger.debug("Input Mapping Threadpool released"))
batcherExecutor <- asyncPool(
batchingParallelism,
"batching-pool",
@ -70,7 +71,7 @@ object ParallelIndexerFactory {
metrics.executorServiceMetrics,
),
loggerFactory,
)
).afterReleased(logger.debug("Batching Threadpool released"))
haCoordinator <-
if (dbLockStorageBackend.dbLockSupported) {
for {
@ -91,7 +92,11 @@ object ParallelIndexerFactory {
)
)
)
timer <- ResourceOwner.forTimer(() => new Timer)
.afterReleased(logger.debug("HaCoordinator single-threadpool released"))
timer <- ResourceOwner
.forTimer(() => new Timer)
.afterReleased(logger.debug("HaCoordinator Timer released"))
// this DataSource will be used to spawn the main connection where we keep the Indexer Main Lock
// The life-cycle of such connections matches the life-cycle of a protectedExecution
dataSource = dataSourceStorageBackend.createDataSource(
@ -127,8 +132,8 @@ object ParallelIndexerFactory {
ResourceOwner.successful(NoopHaCoordinator)
} yield toIndexer { implicit resourceContext =>
implicit val ec: ExecutionContext = resourceContext.executionContext
haCoordinator.protectedExecution(connectionInitializer =>
initializeHandle(
haCoordinator.protectedExecution { connectionInitializer =>
val indexingHandleF = initializeHandle(
for {
dbDispatcher <- DbDispatcher
.owner(
@ -145,7 +150,9 @@ object ParallelIndexerFactory {
metrics = metrics,
loggerFactory = loggerFactory,
)
.afterReleased(logger.debug("Indexing DbDispatcher released"))
_ <- meteringAggregator(dbDispatcher)
.afterReleased(logger.debug("Metering Aggregator released"))
} yield dbDispatcher
) { dbDispatcher =>
initializeParallelIngestion(
@ -163,7 +170,22 @@ object ParallelIndexerFactory {
)
)
}
)
indexingHandleF.onComplete {
case Success(indexingHandle) =>
logger.info("Indexer initialized, indexing started.")
indexingHandle.completed.onComplete {
case Success(_) =>
logger.info("Indexing finished.")
case Failure(failure) =>
logger.info(s"Indexing finished with failure: ${failure.getMessage}")
}
case Failure(failure) =>
logger.info(s"Indexer initialization failed: ${failure.getMessage}")
}
indexingHandleF
}
}
}
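The lifecycle logging added above follows a plain Future-completion pattern. A self-contained sketch of the same idea, assuming nothing beyond the standard library (the helper name and println calls are illustrative, not part of this change):
  import scala.concurrent.{ExecutionContext, Future}
  import scala.util.{Failure, Success}

  def logCompletion[T](label: String, f: Future[T])(implicit ec: ExecutionContext): Future[T] = {
    // attach side-effecting log statements without altering the future's result
    f.onComplete {
      case Success(_)       => println(s"$label finished.")
      case Failure(failure) => println(s"$label finished with failure: ${failure.getMessage}")
    }
    f
  }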

View File

@ -3,8 +3,11 @@
package com.digitalasset.canton
import com.daml.ledger.resources.ResourceOwner
import com.digitalasset.canton.ledger.offset.Offset
import scala.concurrent.Future
/** Type aliases used throughout the package */
package object platform {
import com.daml.lf.value.{Value as lfval}
@ -61,4 +64,15 @@ package object platform {
private[platform] type Hash = crypto.Hash
private[platform] type PruneBuffers = Offset => Unit
implicit class ResourceOwnerOps[T](val resourceOwner: ResourceOwner[T]) extends AnyVal {
def afterReleased(body: => Unit): ResourceOwner[T] =
afterReleasedF(Future.successful(body))
def afterReleasedF(bodyF: => Future[Unit]): ResourceOwner[T] =
for {
_ <- ResourceOwner.forReleasable(() => ())(_ => bodyF)
t <- resourceOwner
} yield t
}
}
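The for-comprehension above relies on resources being released in reverse acquisition order: the no-op releasable is acquired first and therefore released last, so the release body runs only after the wrapped owner has been released. A hedged usage sketch, reusing only constructs already shown in this change:
  // log to stdout once the wrapped owner's resource has actually been released
  val withHook: ResourceOwner[Int] =
    ResourceOwner.successful(42).afterReleased(println("resource released"))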

View File

@ -79,7 +79,7 @@ private[backend] object AppendOnlySchema {
BooleanOptional(extractor)
def insert[FROM](tableName: String)(fields: (String, Field[FROM, _, _])*): Table[FROM]
def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int)(
def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int, ordering: Ordering[FROM])(
fields: (String, Field[FROM, _, _])*
): Table[FROM]
}
@ -317,6 +317,7 @@ private[backend] object AppendOnlySchema {
fieldStrategy.idempotentInsert(
tableName = "packages",
keyFieldIndex = 0,
ordering = Ordering.by[DbDto.Package, String](_.package_id),
)(
"package_id" -> fieldStrategy.string(_ => _.package_id),
"upload_id" -> fieldStrategy.string(_ => _.upload_id),

View File

@ -12,13 +12,17 @@ private[backend] trait Table[FROM] {
def executeUpdate: Array[Array[_]] => Connection => Unit
}
private[backend] abstract class BaseTable[FROM](fields: Seq[(String, Field[FROM, _, _])])
extends Table[FROM] {
private[backend] abstract class BaseTable[FROM](
fields: Seq[(String, Field[FROM, _, _])],
ordering: Option[Ordering[FROM]] = None,
) extends Table[FROM] {
override def prepareData(
in: Vector[FROM],
stringInterning: StringInterning,
): Array[Array[_]] =
fields.view.map(_._2.toArray(in, stringInterning)).toArray
): Array[Array[_]] = {
val sortedIn = ordering.map(in.sorted(_)).getOrElse(in)
fields.view.map(_._2.toArray(sortedIn, stringInterning)).toArray
}
}
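Sorting the rows by their conflict key before ingestion gives concurrent idempotent inserts a consistent lock-acquisition order, which avoids deadlocks when two transactions insert overlapping keys (see the ingestion test change further below). A small self-contained sketch of the sorting step, using a hypothetical row type:
  final case class PackageRow(packageId: String, uploadId: String)

  val rows = Vector(PackageRow("pkg-b", "u1"), PackageRow("pkg-a", "u2"))
  val byKey: Ordering[PackageRow] = Ordering.by(_.packageId)
  val sorted = rows.sorted(byKey) // "pkg-a" comes first, mirroring prepareData above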
private[backend] object Table {

View File

@ -40,10 +40,14 @@ private[h2] object H2Schema {
): Table[FROM] =
Table.batchedInsert(tableName)(fields: _*)
override def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int)(
override def idempotentInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =
H2Table.idempotentBatchedInsert(tableName, keyFieldIndex)(fields: _*)
H2Table.idempotentBatchedInsert(tableName, keyFieldIndex, ordering)(fields: _*)
}
val schema: Schema[DbDto] = AppendOnlySchema(H2FieldStrategy)

View File

@ -14,8 +14,9 @@ private[h2] object H2Table {
private def idempotentBatchedInsertBase[FROM](
insertStatement: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(fields: Seq[(String, Field[FROM, _, _])]): Table[FROM] =
new BaseTable[FROM](fields) {
new BaseTable[FROM](fields, Some(ordering)) {
override def executeUpdate: Array[Array[_]] => Connection => Unit =
data =>
connection =>
@ -71,11 +72,16 @@ private[h2] object H2Table {
|""".stripMargin
}
def idempotentBatchedInsert[FROM](tableName: String, keyFieldIndex: Int)(
def idempotentBatchedInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =
idempotentBatchedInsertBase(
idempotentBatchedInsertStatement(tableName, fields, keyFieldIndex),
keyFieldIndex,
ordering,
)(fields)
}

View File

@ -35,10 +35,14 @@ private[oracle] object OracleSchema {
): Table[FROM] =
Table.batchedInsert(tableName)(fields: _*)
override def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int)(
override def idempotentInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =
OracleTable.idempotentInsert(tableName, keyFieldIndex)(fields: _*)
OracleTable.idempotentInsert(tableName, keyFieldIndex, ordering)(fields: _*)
}
val schema: Schema[DbDto] = AppendOnlySchema(OracleFieldStrategy)

View File

@ -10,9 +10,10 @@ import java.sql.Connection
private[oracle] object OracleTable {
private def idempotentInsertBase[FROM](
insertStatement: String
insertStatement: String,
ordering: Ordering[FROM],
)(fields: Seq[(String, Field[FROM, _, _])]): Table[FROM] =
new BaseTable[FROM](fields) {
new BaseTable[FROM](fields, Some(ordering)) {
override def executeUpdate: Array[Array[_]] => Connection => Unit =
data =>
connection =>
@ -54,10 +55,15 @@ private[oracle] object OracleTable {
|""".stripMargin
}
def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int)(
def idempotentInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =
idempotentInsertBase(
idempotentInsertStatement(tableName, fields, keyFieldIndex)
idempotentInsertStatement(tableName, fields, keyFieldIndex),
ordering,
)(fields)
}

View File

@ -40,10 +40,14 @@ private[postgresql] object PGSchema {
): Table[FROM] =
PGTable.transposedInsert(tableName)(fields: _*)
override def idempotentInsert[FROM](tableName: String, keyFieldIndex: Int)(
override def idempotentInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] =
PGTable.idempotentTransposedInsert(tableName, keyFieldIndex)(fields: _*)
PGTable.idempotentTransposedInsert(tableName, keyFieldIndex, ordering)(fields: _*)
}
val schema: Schema[DbDto] = AppendOnlySchema(PGFieldStrategy)

View File

@ -10,9 +10,10 @@ import java.sql.Connection
private[postgresql] object PGTable {
private def transposedInsertBase[FROM](
insertStatement: String
insertStatement: String,
ordering: Option[Ordering[FROM]] = None,
)(fields: Seq[(String, Field[FROM, _, _])]): Table[FROM] =
new BaseTable[FROM](fields) {
new BaseTable[FROM](fields, ordering) {
override def executeUpdate: Array[Array[_]] => Connection => Unit =
data =>
connection =>
@ -59,10 +60,17 @@ private[postgresql] object PGTable {
): Table[FROM] =
transposedInsertBase(transposedInsertStatement(tableName, fields))(fields)
def idempotentTransposedInsert[FROM](tableName: String, keyFieldIndex: Int)(
def idempotentTransposedInsert[FROM](
tableName: String,
keyFieldIndex: Int,
ordering: Ordering[FROM],
)(
fields: (String, Field[FROM, _, _])*
): Table[FROM] = {
val insertSuffix = s"on conflict (${fields(keyFieldIndex)._1}) do nothing"
transposedInsertBase(transposedInsertStatement(tableName, fields, insertSuffix))(fields)
transposedInsertBase(
transposedInsertStatement(tableName, fields, insertSuffix),
Some(ordering),
)(fields)
}
}

View File

@ -27,6 +27,7 @@ import com.digitalasset.canton.logging.{
NamedLogging,
}
import com.digitalasset.canton.metrics.Metrics
import com.digitalasset.canton.platform.ResourceOwnerOps
import com.digitalasset.canton.platform.config.ServerRole
import com.digitalasset.canton.tracing.TraceContext
import com.google.common.util.concurrent.ThreadFactoryBuilder
@ -130,7 +131,10 @@ object DbDispatcher {
connectionTimeout: FiniteDuration,
metrics: Metrics,
loggerFactory: NamedLoggerFactory,
): ResourceOwner[DbDispatcher with ReportsHealth] =
): ResourceOwner[DbDispatcher with ReportsHealth] = {
val logger = loggerFactory.getTracedLogger(getClass)
def log(s: String): Unit =
logger.debug(s"[${serverRole.threadPoolSuffix}] $s")(TraceContext.empty)
for {
hikariDataSource <- HikariDataSourceOwner(
dataSource = dataSource,
@ -139,27 +143,39 @@ object DbDispatcher {
maxPoolSize = connectionPoolSize,
connectionTimeout = connectionTimeout,
metrics = Some(metrics.registry),
)
connectionProvider <- DataSourceConnectionProvider.owner(hikariDataSource, loggerFactory)
).afterReleased(log("HikariDataSource released"))
connectionProvider <- DataSourceConnectionProvider
.owner(
hikariDataSource,
serverRole.threadPoolSuffix,
loggerFactory,
)
.afterReleased(log("DataSourceConnectionProvider released"))
threadPoolName = MetricName(
metrics.daml.index.db.threadpool.connection,
serverRole.threadPoolSuffix,
)
executor <- ResourceOwner.forExecutorService(() =>
InstrumentedExecutors.newFixedThreadPoolWithFactory(
threadPoolName,
connectionPoolSize,
new ThreadFactoryBuilder()
.setNameFormat(s"$threadPoolName-%d")
.setUncaughtExceptionHandler((_, e) =>
loggerFactory
.getTracedLogger(getClass)
.error("Uncaught exception in the SQL executor.", e)(TraceContext.empty)
)
.build(),
metrics.executorServiceMetrics,
executor <- ResourceOwner
.forExecutorService(
() =>
InstrumentedExecutors.newFixedThreadPoolWithFactory(
threadPoolName,
connectionPoolSize,
new ThreadFactoryBuilder()
.setNameFormat(s"$threadPoolName-%d")
.setUncaughtExceptionHandler((_, e) =>
loggerFactory
.getTracedLogger(getClass)
.error("Uncaught exception in the SQL executor.", e)(TraceContext.empty)
)
.build(),
metrics.executorServiceMetrics,
),
gracefulAwaitTerminationMillis =
5000, // waiting 5s for ongoing SQL operations to finish and then forcing them with Thread.interrupt...
forcefulAwaitTerminationMillis = 5000, // ...and then waiting 5s more
)
)
.afterReleased(log("ExecutorService released"))
} yield new DbDispatcherImpl(
connectionProvider = connectionProvider,
executor = executor,
@ -167,4 +183,5 @@ object DbDispatcher {
overallExecutionTimer = metrics.daml.index.db.execAll,
loggerFactory = loggerFactory,
)
}
}

View File

@ -6,14 +6,15 @@ package com.digitalasset.canton.platform.store.dao
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.resources.ResourceOwner
import com.daml.metrics.{DatabaseMetrics, Timed}
import com.daml.scalautil.Statement.discard
import com.digitalasset.canton.ledger.api.health.{HealthStatus, Healthy, Unhealthy}
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.platform.config.ServerRole
import com.digitalasset.canton.tracing.TraceContext
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.{Connection, SQLTransientConnectionException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Timer, TimerTask}
import javax.sql.DataSource
import scala.concurrent.duration.{DurationInt, FiniteDuration}
@ -49,38 +50,24 @@ object DataSourceConnectionProvider {
def owner(
dataSource: DataSource,
logMarker: String,
loggerFactory: NamedLoggerFactory,
): ResourceOwner[JdbcConnectionProvider] =
for {
healthPoller <- ResourceOwner.forTimer(() =>
new Timer("DataSourceConnectionProvider#healthPoller")
healthPoller <- ResourceOwner.forTimer(
() => new Timer(s"DataSourceConnectionProvider-$logMarker#healthPoller", true),
waitForRunningTasks = false, // do not block resource release on an ongoing health check
)
transientFailureCount = new AtomicInteger(0)
checkHealth <- ResourceOwner.forCloseable(() =>
new HealthCheckTask(
dataSource = dataSource,
transientFailureCount = transientFailureCount,
logMarker = logMarker,
loggerFactory = loggerFactory,
)
)
} yield {
val transientFailureCount = new AtomicInteger(0)
val logger = loggerFactory.getTracedLogger(getClass)
val checkHealth = new TimerTask {
private def printProblem(problem: String): Unit = {
val count = transientFailureCount.incrementAndGet()
if (count == 1)
logger.info(s"Hikari connection health check failed with $problem problem")(
TraceContext.empty
)
()
}
override def run(): Unit =
try {
dataSource.getConnection().close()
transientFailureCount.set(0)
} catch {
case _: SQLTransientConnectionException =>
printProblem("transient connection")
case NonFatal(_) =>
printProblem("unexpected")
}
}
healthPoller.schedule(checkHealth, 0, HealthPollingSchedule.toMillis)
new JdbcConnectionProvider {
@ -119,3 +106,45 @@ object DataSourceConnectionProvider {
}
}
}
class HealthCheckTask(
dataSource: DataSource,
transientFailureCount: AtomicInteger,
logMarker: String,
val loggerFactory: NamedLoggerFactory,
) extends TimerTask
with AutoCloseable
with NamedLogging {
private val closed = new AtomicBoolean(false)
private implicit val emptyTraceContext: TraceContext = TraceContext.empty
private def printProblem(problem: String): Unit = {
val count = transientFailureCount.incrementAndGet()
if (count == 1) {
if (closed.get()) {
logger.debug(
s"$logMarker Hikari connection health check failed after health checking stopped with: $problem"
)
} else {
logger.info(s"$logMarker Hikari connection health check failed with: $problem")
}
}
}
override def run(): Unit =
try {
dataSource.getConnection.close()
transientFailureCount.set(0)
} catch {
case e: SQLTransientConnectionException =>
printProblem(s"transient connection exception: $e")
case NonFatal(e) =>
printProblem(s"unexpected exception: $e")
}
override def close(): Unit = {
discard(this.cancel()) // this prevents further tasks from executing
closed.set(true) // emit the log at debug level instead of info
}
}
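A minimal sketch of the schedule-and-cancel pattern the owner uses with this task; the timer name and polling period are illustrative, and `dataSource`/`loggerFactory` are assumed to be available from the surrounding code:
  // schedule the health check on a daemon timer, then cancel it eagerly on release,
  // so a getConnection() call that is still blocking cannot delay shutdown
  val healthPoller = new Timer("DataSourceConnectionProvider-example#healthPoller", true)
  val task = new HealthCheckTask(dataSource, new AtomicInteger(0), "example", loggerFactory)
  healthPoller.schedule(task, 0L, 1000L) // poll once per second
  // on release:
  task.close()          // cancels the TimerTask; further runs are prevented
  healthPoller.cancel() // discards any remaining scheduled tasks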

View File

@ -150,6 +150,7 @@ private[backend] trait StorageBackendTestsIngestion
)
val conflictingPackageDtos = 11 to 20 map packageFor
val reversedConflictingPackageDtos = conflictingPackageDtos.reverse
val packages1 = 21 to 30 map packageFor
val packages2 = 31 to 40 map packageFor
@ -168,7 +169,7 @@ private[backend] trait StorageBackendTestsIngestion
}
val ingestF1 = ingestPackagesF(connection1, packages1 ++ conflictingPackageDtos)
val ingestF2 = ingestPackagesF(connection2, packages2 ++ conflictingPackageDtos)
val ingestF2 = ingestPackagesF(connection2, packages2 ++ reversedConflictingPackageDtos)
Await.result(ingestF1, Duration(10, "seconds"))
Await.result(ingestF2, Duration(10, "seconds"))

View File

@ -0,0 +1,159 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.platform.store.dao
import com.daml.ledger.resources.ResourceContext
import com.digitalasset.canton.HasExecutionContext
import com.digitalasset.canton.concurrent.Threading
import com.digitalasset.canton.logging.SuppressionRule.{FullSuppression, LoggerNameContains}
import com.digitalasset.canton.logging.{NamedLogging, SuppressingLogger}
import com.digitalasset.canton.platform.config.ServerRole
import com.digitalasset.canton.platform.indexer.ha.TestConnection
import com.digitalasset.canton.tracing.TraceContext
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Seconds, Span}
import java.io.PrintWriter
import java.sql.Connection
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Logger
import javax.naming.OperationNotSupportedException
import javax.sql.DataSource
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
class HikariJdbcConnectionProviderSpec
extends AsyncFlatSpec
with Matchers
with NamedLogging
with HasExecutionContext
with Eventually {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(5, Seconds)))
val loggerFactory: SuppressingLogger = SuppressingLogger(getClass)
private implicit val tc: TraceContext = TraceContext.empty
behavior of "HikariJdbcConnectionProvider"
// in case of a depleted Hikari pool, getting a connection can take connectionTimeout
it should "not wait for releasing for a long-running health check" in {
val connectionGate = new AtomicBoolean(true)
val dataSourceFixture = new DataSource {
private def notSupported = throw new OperationNotSupportedException
override def getConnection: Connection =
if (connectionGate.get()) new TestConnection {
override def isClosed: Boolean = {
// connection is signalled as closed after we gate connections
if (connectionGate.get()) super.isClosed
else true
}
// connection is signalled as invalid after we gate connections
override def isValid(i: Int): Boolean = connectionGate.get()
// the rest of the overrides here are needed to successfully mock a connection to Hikari
override def isReadOnly: Boolean = false
override def getAutoCommit: Boolean = true
override def setAutoCommit(b: Boolean): Unit = ()
override def getTransactionIsolation: Int = Connection.TRANSACTION_READ_COMMITTED
override def getNetworkTimeout: Int = 5000
override def setNetworkTimeout(executor: Executor, i: Int): Unit = ()
override def commit(): Unit = ()
override def rollback(): Unit = ()
override def clearWarnings(): Unit = ()
}
else throw new Exception("no connection available at the moment")
override def getConnection(username: String, password: String): Connection = notSupported
override def getLogWriter: PrintWriter = notSupported
override def setLogWriter(out: PrintWriter): Unit = notSupported
override def setLoginTimeout(seconds: Int): Unit = ()
override def getLoginTimeout: Int = 0
override def unwrap[T](iface: Class[T]): T = notSupported
override def isWrapperFor(iface: Class[_]): Boolean = notSupported
override def getParentLogger: Logger = notSupported
}
val connectionProviderOwner =
for {
hikariDataSource <- HikariDataSourceOwner(
dataSource = dataSourceFixture,
serverRole = ServerRole.Testing(this.getClass),
minimumIdle = 10,
maxPoolSize = 10,
connectionTimeout = FiniteDuration(10, "seconds"),
metrics = None,
)
_ <- DataSourceConnectionProvider.owner(
dataSource = hikariDataSource,
logMarker = "test",
loggerFactory = loggerFactory,
)
} yield hikariDataSource
implicit val resourceContext = ResourceContext(implicitly)
val suppressionRules = FullSuppression &&
LoggerNameContains("HealthCheckTask")
loggerFactory.suppress(suppressionRules) {
connectionProviderOwner
.use { hikariDataSource =>
logger.info("HikariJdbcConnectionProvider initialized")
def getConnection = Future(hikariDataSource.getConnection.close())
for {
_ <- Future.sequence(1.to(20).map(_ => getConnection))
} yield {
logger.info(
"HikariJdbcConnectionProvider is functional: fetched and returned 20 connections"
)
val start = System.currentTimeMillis()
connectionGate.set(false);
Threading.sleep(
500
) // so that the Hikari isValid bypass window (default 500 millis) passes (otherwise getting a connection from the pool is not necessarily checked)
val failingConnection = getConnection
Threading.sleep(500) // so that health check already hangs (this is polled every second)
failingConnection.isCompleted shouldBe false // failing connection will hang for 10 seconds
(start, failingConnection)
}
}
.flatMap { case (start, failingConnection) =>
val released = System.currentTimeMillis()
released - start should be > 1000L
released - start should be < 1500L // because we are not waiting for the healthcheck to be finished
failingConnection.isCompleted shouldBe false // failing connection will hang for 10 seconds
failingConnection.failed.map(_ => start)
}
.map { start =>
val failedConnectionFinished = System.currentTimeMillis()
failedConnectionFinished - start should be > 10000L
eventually {
val logEntries = loggerFactory.fetchRecordedLogEntries
logEntries.size shouldBe 1
logEntries(0).debugMessage should include(
"Hikari connection health check failed after health checking stopped with"
)
}
}
}
}
}

View File

@ -93,7 +93,6 @@ class ParticipantNodeBootstrap(
_ => Future.successful(SchedulersWithParticipantPruning.noop),
ledgerApiServerFactory: CantonLedgerApiServerFactory,
private[canton] val persistentStateFactory: ParticipantNodePersistentStateFactory,
skipRecipientsCheck: Boolean,
)(implicit
executionContext: ExecutionContextIdlenessExecutorService,
scheduler: ScheduledExecutorService,
@ -334,7 +333,6 @@ class ParticipantNodeBootstrap(
topologyManager,
packageDependencyResolver,
componentFactory,
skipRecipientsCheck,
).map {
case (
partyNotifier,
@ -546,7 +544,6 @@ object ParticipantNodeBootstrap {
createResourceService(arguments),
createReplicationServiceFactory(arguments),
persistentStateFactory = ParticipantNodePersistentStateFactory,
skipRecipientsCheck = false,
ledgerApiServerFactory = ledgerApiServerFactory,
)

View File

@ -251,7 +251,6 @@ trait ParticipantNodeBootstrapCommon {
topologyManager: ParticipantTopologyManagerOps,
packageDependencyResolver: PackageDependencyResolver,
componentFactory: ParticipantComponentBootstrapFactory,
skipRecipientsCheck: Boolean,
overrideKeyUniqueness: Option[Boolean] = None, // TODO(i13235) remove when UCK is gone
)(implicit executionSequencerFactory: ExecutionSequencerFactory): EitherT[
FutureUnlessShutdown,
@ -454,7 +453,6 @@ trait ParticipantNodeBootstrapCommon {
sequencerInfoLoader,
arguments.futureSupervisor,
loggerFactory,
skipRecipientsCheck,
multiDomainLedgerAPIEnabled = ledgerApiServerFactory.multiDomainEnabled,
)

View File

@ -83,7 +83,6 @@ class ParticipantNodeBootstrapX(
_ => Future.successful(SchedulersWithParticipantPruning.noop),
private[canton] val persistentStateFactory: ParticipantNodePersistentStateFactory,
ledgerApiServerFactory: CantonLedgerApiServerFactory,
skipRecipientsCheck: Boolean,
)(implicit
executionContext: ExecutionContextIdlenessExecutorService,
scheduler: ScheduledExecutorService,
@ -294,7 +293,6 @@ class ParticipantNodeBootstrapX(
participantOps,
packageDependencyResolver,
componentFactory,
skipRecipientsCheck,
overrideKeyUniqueness = Some(false),
).map {
case (
@ -390,7 +388,6 @@ object ParticipantNodeBootstrapX {
createReplicationServiceFactory(arguments),
persistentStateFactory = ParticipantNodePersistentStateFactory,
ledgerApiServerFactory = ledgerApiServerFactory,
skipRecipientsCheck = true,
)
}

View File

@ -167,11 +167,11 @@ object AcsChange extends HasLoggerName {
)
}
loggingContext.debug(
show"Called fromCommitSet with inputs commitSet creations ${commitSet.creations}" +
show"transferIns ${commitSet.transferIns} archivals ${commitSet.archivals} transferOuts ${commitSet.transferOuts} and" +
show"Called fromCommitSet with inputs commitSet creations=${commitSet.creations};" +
show"transferIns=${commitSet.transferIns}; archivals=${commitSet.archivals}; transferOuts=${commitSet.transferOuts} and" +
show"archival transfer counters from DB $transferCounterOfArchivalIncomplete" +
show"Completed fromCommitSet with results transient $transient" +
show"activations $activations archivalDeactivations $archivalDeactivations transferOutDeactivations $transferOutDeactivations"
show"Completed fromCommitSet with results transient=$transient;" +
show"activations=$activations; archivalDeactivations=$archivalDeactivations; transferOutDeactivations=$transferOutDeactivations"
)
AcsChange(
activations = activations,

View File

@ -101,7 +101,6 @@ abstract class ProtocolProcessor[
protocolVersion: ProtocolVersion,
override protected val loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
skipRecipientsCheck: Boolean,
)(implicit
ec: ExecutionContext,
resultCast: SignedMessageContentCast[Result],
@ -725,17 +724,14 @@ abstract class ProtocolProcessor[
logger.warn(s"Request $rc: Found malformed payload: $incorrectRootHash")
}
// TODO(i12643): Remove this flag when no longer needed
checkRecipientsResult <- EitherT.right(
if (skipRecipientsCheck) FutureUnlessShutdown.pure((Seq.empty, viewsWithCorrectRootHash))
else
FutureUnlessShutdown.outcomeF(
recipientsValidator.retainInputsWithValidRecipients(
requestId,
viewsWithCorrectRootHash,
snapshot.ipsSnapshot,
)
FutureUnlessShutdown.outcomeF(
recipientsValidator.retainInputsWithValidRecipients(
requestId,
viewsWithCorrectRootHash,
snapshot.ipsSnapshot,
)
)
)
(incorrectRecipients, viewsWithCorrectRootHashAndRecipients) = checkRecipientsResult

View File

@ -68,7 +68,6 @@ class TransactionProcessor(
override protected val timeouts: ProcessingTimeout,
override protected val loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
skipRecipientsCheck: Boolean,
enableContractUpgrading: Boolean,
)(implicit val ec: ExecutionContext)
extends ProtocolProcessor[
@ -120,7 +119,6 @@ class TransactionProcessor(
staticDomainParameters.protocolVersion,
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
) {
def submit(

View File

@ -38,7 +38,6 @@ class TransferInProcessor(
targetProtocolVersion: TargetProtocolVersion,
loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
skipRecipientsCheck: Boolean,
)(implicit ec: ExecutionContext)
extends ProtocolProcessor[
TransferInProcessingSteps.SubmissionParam,
@ -64,5 +63,4 @@ class TransferInProcessor(
targetProtocolVersion.v,
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
)

View File

@ -38,7 +38,6 @@ class TransferOutProcessor(
sourceProtocolVersion: SourceProtocolVersion,
loggerFactory: NamedLoggerFactory,
futureSupervisor: FutureSupervisor,
skipRecipientsCheck: Boolean,
)(implicit ec: ExecutionContext)
extends ProtocolProcessor[
TransferOutProcessingSteps.SubmissionParam,
@ -64,5 +63,4 @@ class TransferOutProcessor(
sourceProtocolVersion.v,
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
)

View File

@ -155,7 +155,6 @@ class CantonSyncService(
val isActive: () => Boolean,
futureSupervisor: FutureSupervisor,
protected val loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
multiDomainLedgerAPIEnabled: Boolean,
)(implicit ec: ExecutionContext, mat: Materializer, val tracer: Tracer)
extends state.v2.WriteService
@ -1203,7 +1202,6 @@ class CantonSyncService(
trafficStateController,
futureSupervisor,
domainLoggerFactory,
skipRecipientsCheck = skipRecipientsCheck,
)
// update list of connected domains
@ -1638,7 +1636,6 @@ object CantonSyncService {
sequencerInfoLoader: SequencerInfoLoader,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
multiDomainLedgerAPIEnabled: Boolean,
)(implicit ec: ExecutionContext, mat: Materializer, tracer: Tracer): T
}
@ -1669,7 +1666,6 @@ object CantonSyncService {
sequencerInfoLoader: SequencerInfoLoader,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
multiDomainLedgerAPIEnabled: Boolean,
)(implicit
ec: ExecutionContext,
@ -1703,7 +1699,6 @@ object CantonSyncService {
() => storage.isActive,
futureSupervisor,
loggerFactory,
skipRecipientsCheck = skipRecipientsCheck,
multiDomainLedgerAPIEnabled: Boolean,
)
}

View File

@ -132,7 +132,6 @@ class SyncDomain(
trafficStateController: TrafficStateController,
futureSupervisor: FutureSupervisor,
override protected val loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
)(implicit ec: ExecutionContext, tracer: Tracer)
extends NamedLogging
with StartAndCloseable[Either[SyncDomainInitializationError, Unit]]
@ -189,7 +188,6 @@ class SyncDomain(
timeouts,
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
enableContractUpgrading = parameters.enableContractUpgrading,
)
@ -207,7 +205,6 @@ class SyncDomain(
SourceProtocolVersion(staticDomainParameters.protocolVersion),
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
)
private val transferInProcessor: TransferInProcessor = new TransferInProcessor(
@ -224,7 +221,6 @@ class SyncDomain(
TargetProtocolVersion(staticDomainParameters.protocolVersion),
loggerFactory,
futureSupervisor,
skipRecipientsCheck = skipRecipientsCheck,
)
private val sortedReconciliationIntervalsProvider = new SortedReconciliationIntervalsProvider(
@ -971,7 +967,6 @@ object SyncDomain {
trafficStateController: TrafficStateController,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
)(implicit ec: ExecutionContext, mat: Materializer, tracer: Tracer): T
}
@ -999,7 +994,6 @@ object SyncDomain {
trafficStateController: TrafficStateController,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
skipRecipientsCheck: Boolean,
)(implicit ec: ExecutionContext, mat: Materializer, tracer: Tracer): SyncDomain =
new SyncDomain(
domainId,
@ -1025,7 +1019,6 @@ object SyncDomain {
trafficStateController,
futureSupervisor,
loggerFactory,
skipRecipientsCheck = skipRecipientsCheck,
)
}
}

View File

@ -329,7 +329,6 @@ class ProtocolProcessorTest
testedProtocolVersion,
loggerFactory,
FutureSupervisor.Noop,
skipRecipientsCheck = false,
)(
directExecutionContext: ExecutionContext,
TransactionResultMessage.transactionResultMessageCast,

View File

@ -167,7 +167,6 @@ class CantonSyncServiceTest extends FixtureAnyWordSpec with BaseTest with HasExe
() => true,
FutureSupervisor.Noop,
SuppressingLogger(getClass),
skipRecipientsCheck = false,
multiDomainLedgerAPIEnabled = false,
)
}

View File

@ -34,6 +34,8 @@ trait ProtocolVersionChecksFixtureAnyWordSpec {
this: TestEssentials & FixtureAnyWordSpecLike =>
implicit class ProtocolCheckString(verb: String) {
def onlyRunWhen(condition: Boolean): OnlyRunWhenWordSpecStringWrapper =
new OnlyRunWhenWordSpecStringWrapper(verb, condition)
def onlyRunWithOrGreaterThan(
minProtocolVersion: ProtocolVersion
): OnlyRunWhenWordSpecStringWrapper =

View File

@ -0,0 +1,21 @@
Reference Configurations
========================
This directory contains a set of reference configurations. The configurations aim to provide a
starting point for your own setup. The following configurations are included:
* `sandbox`: A simple setup for a single participant node connected to a single
domain node, using in-memory stores for testing.
* `participant`: A participant node storing data within a PostgreSQL database.
* `domain`: A reference configuration for an embedded domain node which runs all three domain node types in one process.
For the Enterprise Edition, the following configurations are included:
* `sequencer`: A high-throughput sequencer node.
* `mediator`: A mediator node.
* `manager`: A domain manager node configuration.
If you use TLS, note that you need to have an appropriate set of TLS certificates to run the example configurations.
You can use the `tls/gen-test-certs.sh` script to generate a set of self-signed certificates for testing purposes.
Please check the [installation guide](https://docs.daml.com/canton/usermanual/installation.html) for further details on how to run these configurations.

View File

@ -0,0 +1,44 @@
// Embedded domain configuration example
// Include the shared configuration file (which includes storage and monitoring)
include required("shared.conf")
// TLS configuration
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#tls-configuration
// Comment out the following two lines to disable TLS
include required("tls/tls-public-api.conf")
include required("tls/mtls-admin-api.conf")
canton.domains.mydomain {
init {
// Configure the node identifier
identity.node-identifier = ${?_shared.identifier}
// Domain Parameter Configuration (we recommend to use defaults)
domain-parameters = ${?_shared.domain-dev-params}
}
// Storage configuration (references included storage from shared.conf)
storage = ${_shared.storage}
storage.config.properties.databaseName = "canton_domain"
public-api {
address = localhost
port = 10018
tls = ${?_shared.public-api-tls}
}
admin-api {
address = localhost
port = 10019
tls = ${?_shared.admin-api-mtls}
}
// Configure GRPC Health Server for monitoring
// See https://docs.daml.com/canton/usermanual/monitoring.html#grpc-health-check-service
monitoring.grpc-health-server {
address = localhost
port = 10013
}
}

View File

@ -0,0 +1,33 @@
// Example Domain Manager configuration
// Include the shared configuration file (which includes storage and monitoring)
include required("shared.conf")
// TLS configuration
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#tls-configuration
// Comment out the following two lines to disable TLS
include required("tls/mtls-admin-api.conf")
canton.domain-managers.manager {
init {
// Configure the node identifier
identity.node-identifier = ${?_shared.identifier}
// Domain Parameter Configuration (we recommend to use defaults)
domain-parameters = ${?_shared.domain-dev-params}
}
// Storage configuration (references included storage from shared.conf)
storage = ${_shared.storage}
storage.config.properties.databaseName = "canton_manager"
admin-api {
address = localhost
port = 10052
tls = ${?_shared.admin-api-mtls}
}
}

View File

@ -0,0 +1,23 @@
// Example Mediator configuration
// Include the shared configuration file (which includes storage and monitoring)
include required("shared.conf")
// TLS configuration
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#tls-configuration
// Comment out the following two lines to disable TLS
include required("tls/mtls-admin-api.conf")
canton.mediators.mediator {
// Storage configuration (references included storage from shared.conf)
storage = ${_shared.storage}
storage.config.properties.databaseName = "canton_mediator"
admin-api {
address = localhost
port = 10042
tls = ${?_shared.admin-api-mtls}
}
}

View File

@ -0,0 +1,8 @@
canton.monitoring {
// Enables detailed query monitoring, which you can use to diagnose database performance issues.
log-query-cost.every = 60s
// Logs all messages that enter or exit the server. Has a significant performance impact, but can
// be very useful for debugging.
logging.api.message-payloads = true
}

View File

@ -0,0 +1,18 @@
// The following configuration options turn on future features of the system. These features are not
// stable and not supported for production. You will not be able to upgrade to a stable version of Canton
// anymore.
_shared {
participant-dev-params = {
dev-version-support = true
initial-protocol-version = dev
}
// domain parameters config
domain-dev-params = {
dev-version-support = true
protocol-version = dev
}
}
canton.parameters {
non-standard-config = yes
dev-version-support = yes
}

View File

@ -0,0 +1,9 @@
// The following parameters will enable various dangerous or not yet GA supported commands.
// Please use with care, as they are not supported for production deployments or not given with
// any backwards compatibility guarantees.
canton.features {
enable-testing-commands = yes
enable-repair-commands = yes
enable-preview-commands = yes
}

View File

@ -0,0 +1,19 @@
// Parameter set to reduce the sequencer latency at the expense of a higher
// database load. Please note that this change is targeting the original
// high-throughput parameter set.
// The other parameter set `low-latency` is optimised for testing such that the ledger
// response time is as low as possible at the cost of reducing the throughput.
_shared {
sequencer-writer {
// If you need lower latency, you can use these low latency parameters
payload-write-batch-max-duration = 1ms
event-write-batch-max-duration = 1ms
payload-write-max-concurrency = 10
}
sequencer-reader {
// How often should the reader poll the database for updates
// low value = low latency, higher db load
polling-interval = 1ms
read-batch-size = 1000
}
}

View File

@ -0,0 +1,9 @@
canton.monitoring.metrics {
report-jvm-metrics = yes
reporters = [{
type = prometheus
address = 0.0.0.0
// This will expose the prometheus metrics on port 9000
port = 9000
}]
}

View File

@ -0,0 +1,6 @@
canton.monitoring.tracing.tracer.exporter = {
// zipkin or otlp are alternatives
type = jaeger
address = 169.254.0.0
port = 14250
}

View File

@ -0,0 +1,73 @@
// Example Participant Configuration
// Include the shared configuration file (which includes storage and monitoring)
include required("shared.conf")
// TLS configuration
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#tls-configuration
// Comment out the following two lines to disable TLS
include required("tls/tls-ledger-api.conf")
include required("tls/mtls-admin-api.conf")
// JWT Configuration
// Enable JWT Authorization on the Ledger API
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#jwt-authorization
include required("jwt/unsafe-hmac256.conf")
// include required("jwt/certificate.conf")
// include required("jwt/jwks.conf")
canton.participants.participant {
// Configure the node identifier
init.identity.node-identifier = ${?_shared.identifier}
// Storage configuration (references included storage from shared.conf)
storage = ${_shared.storage}
storage.config.properties.databaseName = "canton_participant"
// The following database parameter set assumes that the participant runs on a host machine with 8-16 cores
// and that the database server has 8 cores available for this node.
// https://docs.daml.com/2.8.0/canton/usermanual/persistence.html#performance
// Ideal allocation depends on your use-case.
// https://docs.daml.com/2.8.0/canton/usermanual/persistence.html#max-connection-settings
// Large: 18 = (6,6,6), Medium: 9 = (3,3,3), Small: 6 = (2,2,2)
storage.parameters {
connection-allocation {
num-ledger-api = 6
num-reads = 6
num-writes = 6
}
max-connections = 18
// Optionally define the ledger-api JDBC URL directly (used for Oracle backends)
ledger-api-jdbc-url = ${?_shared.storage.ledger-api-jdbc-url}
}
// Ledger API Configuration Section
ledger-api {
// by default, canton binds to 127.0.0.1, only enabling localhost connections
// you need to explicitly set the address to enable connections from other hosts
address = localhost
port = 10001
tls = ${?_shared.ledger-api-tls}
// Include JWT Authorization
auth-services = ${?_shared.ledger-api.auth-services}
}
admin-api {
address = localhost
port = 10002
tls = ${?_shared.admin-api-mtls}
}
// Configure GRPC Health Server for monitoring
// See https://docs.daml.com/canton/usermanual/monitoring.html#grpc-health-check-service
monitoring.grpc-health-server {
address = localhost
port = 10003
}
// Optionally include parameters defined in `misc/dev-protocol.conf`
// Please note that you can not use dev features in production.
parameters = ${?_shared.participant-dev-params}
}

View File

@ -0,0 +1,19 @@
// Example remote domain configuration
// Include TLS configuration
include required("../tls/mtls-admin-api.conf")
include required("../tls/tls-public-api.conf")
canton {
remote-domains.mydomain {
public-api = ${?_shared.public-api-client-tls}
public-api {
address = localhost
port = 10018
}
admin-api {
address = localhost
port = 10019
tls = ${?_shared.admin-api-client-mtls}
}
}
}

View File

@ -0,0 +1,14 @@
// Example remote domain manager configuration
// Include TLS configuration
include required("../tls/mtls-admin-api.conf")
canton {
remote-domain-managers.manager {
admin-api {
address = localhost
port = 10052
tls = ${?_shared.admin-api-client-mtls}
}
}
}

View File

@ -0,0 +1,14 @@
// Example remote mediators configuration
// Include TLS configuration
include required("../tls/mtls-admin-api.conf")
canton {
remote-mediators.mediator {
admin-api {
address = localhost
port = 10042
tls = ${?_shared.admin-api-client-mtls}
}
}
}

View File

@ -0,0 +1,19 @@
// Example remote participant configuration
// Include TLS configuration
include required("../tls/mtls-admin-api.conf")
include required("../tls/tls-ledger-api.conf")
canton {
remote-participants.participant {
ledger-api {
address = localhost
port = 10001
tls = ${?_shared.ledger-api-client-tls}
}
admin-api {
address = localhost
port = 10002
tls = ${?_shared.admin-api-client-mtls}
}
}
}

View File

@ -0,0 +1,19 @@
// Example remote sequencer configuration
// Include TLS configuration
include required("../tls/mtls-admin-api.conf")
include required("../tls/tls-public-api.conf")
canton {
remote-sequencers.sequencer {
public-api = ${?_shared.public-api-client-tls}
public-api {
address = localhost
port = 10038
}
admin-api {
address = localhost
port = 10039
tls = ${?_shared.admin-api-client-mtls}
}
}
}

View File

@ -0,0 +1,33 @@
// Sandbox configuration
//
// You can start & auto-connect the sandbox with
// ./bin/canton -c config/sandbox.conf --auto-connect-local
//
include required("misc/debug.conf")
canton {
// Turn on message payload logging to help debugging
monitoring.logging.api.message-payloads = true
participants.sandbox {
// Enable engine stack traces for debugging
parameters.enable-engine-stack-traces = true
ledger-api {
address = localhost
port = 10021
}
admin-api {
address = localhost
port = 10022
}
}
domains.local {
public-api {
address = localhost
port = 10028
}
admin-api {
address = localhost
port = 10029
}
}
}

View File

@ -0,0 +1,45 @@
// Example Sequencer Configuration
// Include the shared configuration file (which includes storage and monitoring)
include required("shared.conf")
// TLS configuration
// Please check with: https://docs.daml.com/2.8.0/canton/usermanual/apis.html#tls-configuration
// Comment out the following two lines to disable TLS
include required("tls/tls-public-api.conf")
include required("tls/mtls-admin-api.conf")
// Optionally include lower latency configuration. This is necessary for pushing
// the transaction latencies from ~ 800ms to ~ 600ms at the expense of higher db
// load due to intensive polling.
// include required("misc/low-latency-sequencer.conf")
canton.sequencers.sequencer {
// Storage configuration (references included storage from shared.conf)
storage = ${_shared.storage}
storage.config.properties.databaseName = "canton_sequencer"
public-api {
address = localhost
port = 10038
tls = ${?_shared.public-api-tls}
}
admin-api {
address = localhost
port = 10039
tls = ${?_shared.admin-api-mtls}
}
sequencer {
type = database
writer = ${?_shared.sequencer-writer}
reader = ${?_shared.sequencer-reader}
// What should the default parameterization be for the writer
// high-throughput or low-latency are two parameter sets
writer.type = high-throughput
high-availability.enabled = true
}
}

View File

@ -0,0 +1,28 @@
// ------------------------------------
// Storage Choice
// ------------------------------------
// Include the Postgres persistence configuration mixin.
// You can define the Postgres connectivity settings either by using the environment
// variables POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD
// (see storage/postgres.conf for details) or setting the values directly in the config file.
// You can also remove them from the postgres.conf and add them below directly.
include required("storage/postgres.conf")
// If you do not need persistence, you can pick
// include required("storage/memory.conf")
// Monitoring Configuration
// Turn on Prometheus metrics
include required("monitoring/prometheus.conf")
// Turn on tracing with Jaeger, Zipkin or OTLP
// include required("monitoring/tracing.conf")
// Upon automatic initialisation, pick the following prefix for the node identifier
// the node will then be <prefix>::<fingerprint of a randomly generated key>
// Random is good for larger networks when you do not want others to know who you
// are. Explicit is better for troubleshooting.
_shared.identifier = {
type = random
// type = explicit
// name = "myNodeIdentifier"
}

View File

@ -3,11 +3,8 @@
# This file defines a shared configuration resource. You can mix it into your configuration by
# referring to the shared storage resource and adding the database name.
#
# Check nodes/participant1.conf as an example
# Please note that using H2 is unstable and not supported other than for testing.
#
# Please note that using H2 is currently not advised and not supported.
#
_shared {
storage {
type = "h2"
@ -17,4 +14,4 @@ _shared {
driver = org.h2.Driver
}
}
}
}

View File

@ -0,0 +1,5 @@
_shared {
storage {
type = "memory"
}
}

View File

@ -0,0 +1,47 @@
# Postgres persistence configuration mixin
#
# This file defines a shared configuration resource. You can mix it into your configuration by
# referring to the shared storage resource and adding the database name.
#
# Example:
# participant1 {
# storage = ${_shared.storage}
# storage.config.properties.databaseName = "participant1"
# }
#
# The user and password are not set. You want to either change this configuration file or pass
# the settings in via environment variables POSTGRES_USER and POSTGRES_PASSWORD.
#
_shared {
storage {
type = postgres
config {
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
properties = {
serverName = "localhost"
# the next line will override above "serverName" in case the environment variable POSTGRES_HOST exists
# which makes it optional
serverName = ${?POSTGRES_HOST}
portNumber = "5432"
portNumber = ${?POSTGRES_PORT}
# user and password are required
user = ${POSTGRES_USER}
password = ${POSTGRES_PASSWORD}
}
}
parameters {
# If defined, will configure the number of database connections per node.
# Please note that the number of connections can be fine tuned for participant nodes (see participant.conf)
max-connections = ${?POSTGRES_NUM_CONNECTIONS}
# If true, then database migrations will be applied on startup automatically
# Otherwise, you will have to run the migration manually using participant.db.migrate()
migrate-and-start = false
# If true (default), then the node will fail to start if it can not connect to the database.
# The setting is useful during initial deployment to get immediate feedback when the
# database is not available.
# In a production setup, you might want to set this to false to allow uncoordinated startups between
# the database and the node.
fail-fast-on-startup = true
}
}
}
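
The example in the header comment can be turned into a runnable sketch; everything below (file names, node name, credentials) is hypothetical and only illustrates the mixin pattern described above:

```bash
# Write a minimal participant config that reuses the shared storage block.
cat > participant1.conf <<'EOF'
canton.participants.participant1 {
  storage = ${_shared.storage}
  storage.config.properties.databaseName = "participant1"
}
EOF

# The mixin requires these two variables (see the 'user' and 'password' keys above).
export POSTGRES_USER="canton"
export POSTGRES_PASSWORD="change-me"

# Combine the mixin and the node config on the command line; paths are assumptions.
./bin/canton -c config/storage/postgres.conf -c participant1.conf
```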

View File

@ -0,0 +1,56 @@
#!/bin/bash
# Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# architecture-handbook-entry-begin: GenTestCertsCmds
DAYS=3650
function create_key {
local name=$1
openssl genrsa -out "${name}.key" 4096
# netty requires the keys in pkcs8 format, therefore convert them appropriately
openssl pkcs8 -topk8 -nocrypt -in "${name}.key" -out "${name}.pem"
}
# create self signed certificate
function create_certificate {
local name=$1
local subj=$2
openssl req -new -x509 -sha256 -key "${name}.key" \
-out "${name}.crt" -days ${DAYS} -subj "$subj"
}
# create certificate signing request with subject and SAN
# we need the SANs as our certificates also need to include localhost or the
# loopback IP for the console access to the admin-api and the ledger-api
function create_csr {
local name=$1
local subj=$2
local san=$3
(
echo "authorityKeyIdentifier=keyid,issuer"
echo "basicConstraints=CA:FALSE"
echo "keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment"
) > ${name}.ext
if [[ -n $san ]]; then
echo "subjectAltName=${san}" >> ${name}.ext
fi
# create the certificate signing request (ensure that localhost is included as a SAN, as otherwise local admin connections won't work)
openssl req -new -sha256 -key "${name}.key" -out "${name}.csr" -subj "$subj"
}
function sign_csr {
local name=$1
local sign=$2
openssl x509 -req -sha256 -in "${name}.csr" -extfile "${name}.ext" -CA "${sign}.crt" -CAkey "${sign}.key" -CAcreateserial \
-out "${name}.crt" -days ${DAYS}
rm "${name}.ext" "${name}.csr"
}
function print_certificate {
local name=$1
openssl x509 -in "${name}.crt" -text -noout
}
# architecture-handbook-entry-end: GenTestCertsCmds

View File

@ -0,0 +1,38 @@
#!/bin/bash
# Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# architecture-handbook-entry-begin: GenTestCerts
# include certs-common.sh from config/tls
. "$(dirname "${BASH_SOURCE[0]}")/certs-common.sh"
# create root certificate such that we can issue self-signed certs
create_key "root-ca"
create_certificate "root-ca" "/O=TESTING/OU=ROOT CA/emailAddress=canton@digitalasset.com"
#print_certificate "root-ca"
# create public api certificate
create_key "public-api"
create_csr "public-api" "/O=TESTING/OU=DOMAIN/CN=localhost/emailAddress=canton@digitalasset.com" "DNS:localhost,IP:127.0.0.1"
sign_csr "public-api" "root-ca"
print_certificate "public-api"
# create participant ledger-api certificate
create_key "ledger-api"
create_csr "ledger-api" "/O=TESTING/OU=PARTICIPANT/CN=localhost/emailAddress=canton@digitalasset.com" "DNS:localhost,IP:127.0.0.1"
sign_csr "ledger-api" "root-ca"
# create participant admin-api certificate
create_key "admin-api"
create_csr "admin-api" "/O=TESTING/OU=PARTICIPANT ADMIN/CN=localhost/emailAddress=canton@digitalasset.com" "DNS:localhost,IP:127.0.0.1"
sign_csr "admin-api" "root-ca"
# create participant client key and certificate
create_key "admin-client"
create_csr "admin-client" "/O=TESTING/OU=PARTICIPANT ADMIN CLIENT/CN=localhost/emailAddress=canton@digitalasset.com"
sign_csr "admin-client" "root-ca"
print_certificate "admin-client"
# architecture-handbook-entry-end: GenTestCerts
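
A hedged end-to-end sketch: run the generation script from the directory that holds it (the directory and script names below are assumptions) and point the TLS configuration mixins at the generated files via the TLS_CERT_LOCATION environment variable they read:

```bash
# Generate root CA, public-api, ledger-api, admin-api and admin-client certificates.
# Directory and script names are assumptions for illustration.
(cd config/tls && ./gen-test-certs.sh)

# Let the TLS mixins pick up the generated files.
export TLS_CERT_LOCATION="config/tls"
```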

View File

@ -0,0 +1,38 @@
include required("tls-cert-location.conf")
_shared {
admin-api-mtls {
// Certificate and Key used by Admin API server
cert-chain-file = ${?_TLS_CERT_LOCATION}"/admin-api.crt"
private-key-file = ${?_TLS_CERT_LOCATION}"/admin-api.pem"
// Certificate used to validate client certificates. The file also needs to be provided
// if we use a self-signed certificate, such that the internal processes can connect to
// the APIs.
trust-collection-file = ${?_TLS_CERT_LOCATION}"/root-ca.crt"
client-auth = {
// none, optional and require are supported
type = require
// If clients are required to authenticate as well, we need to provide a client
// certificate and the key, as Canton has internal processes that need to connect to these
// APIs. If the server certificate is trusted by the trust-collection, then you can
// just use the server certificates (which usually happens if you use self-signed certs as we
// do in this example). Otherwise, you need to create separate ones.
admin-client {
// In this example, we use the same certificate as the server certificate.
// Please see the remote participant config for how to configure a remote client.
cert-chain-file = ${?_TLS_CERT_LOCATION}"/admin-api.crt"
private-key-file = ${?_TLS_CERT_LOCATION}"/admin-api.pem"
}
}
}
admin-api-client-mtls {
// Certificate and Key used by remote client
client-cert {
cert-chain-file = ${?_TLS_CERT_LOCATION}"/admin-api.crt"
private-key-file = ${?_TLS_CERT_LOCATION}"/admin-api.pem"
}
// The trust collection used to verify the server certificate. Used here because of the self-signed certs.
trust-collection-file = ${?_TLS_CERT_LOCATION}"/root-ca.crt"
}
}
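
To sanity-check the mutual-TLS setup, one can drive a handshake with openssl; the port below is an assumption, and the client simply reuses the server certificate, as the configuration above does:

```bash
# Handshake check against the Admin API (port 10012 is an assumption).
# client-auth = require means a client certificate must be presented.
openssl s_client -connect localhost:10012 \
  -cert config/tls/admin-api.crt \
  -key config/tls/admin-api.pem \
  -CAfile config/tls/root-ca.crt </dev/null
```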

View File

@ -0,0 +1,2 @@
_TLS_CERT_LOCATION="config/tls"
_TLS_CERT_LOCATION=${?TLS_CERT_LOCATION}

View File

@ -0,0 +1,19 @@
include required("tls-cert-location.conf")
_shared {
ledger-api-tls {
// Certificate to be used by the server
cert-chain-file = ${?_TLS_CERT_LOCATION}"/ledger-api.crt"
// The private key of the server
private-key-file = ${?_TLS_CERT_LOCATION}"/ledger-api.pem"
// The trust collection. We use it in this example as our certificates are self-
// signed, but we need it such that the internal canton processes can connect to the
// Ledger API. In a production environment, you would use a proper CA and therefore
// not require this.
trust-collection-file = ${?_TLS_CERT_LOCATION}"/root-ca.crt"
}
ledger-api-client-tls {
// The trust collection used to verify the server certificate. Used here because of the self-signed certs.
trust-collection-file = ${?_TLS_CERT_LOCATION}"/root-ca.crt"
}
}

View File

@ -0,0 +1,14 @@
include required("tls-cert-location.conf")
_shared {
public-api-tls {
// certificate to be used by the server
cert-chain-file = ${?_TLS_CERT_LOCATION}"/public-api.crt"
// the private key of the server
private-key-file = ${?_TLS_CERT_LOCATION}"/public-api.pem"
}
public-api-client-tls {
transport-security = true
// The trust collection used to verify the server certificate. Used here because of the self-signed certs.
custom-trust-certificates.pem-file = ${?_TLS_CERT_LOCATION}"/root-ca.crt"
}
}

View File

@ -0,0 +1,5 @@
canton_participant
canton_domain
canton_mediator
canton_sequencer
canton_manager

View File

@ -0,0 +1,10 @@
#!/bin/bash
# user-manual-entry-begin: PostgresDbEnvConfiguration
export POSTGRES_HOST="localhost"
export POSTGRES_USER="test-user"
export POSTGRES_PASSWORD="test-password"
export POSTGRES_DB=postgres
export POSTGRES_PORT=5432
# user-manual-entry-end: PostgresDbEnvConfiguration
export DBPREFIX=""

View File

@ -0,0 +1,128 @@
#!/usr/bin/env bash
# Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
function check_file() {
local -r file="$1"
if [[ ! -e $file ]]; then
echo "Please run this script from the directory containing the $file file."
exit 1
fi
}
function do_usage() {
echo "Usage: $0 <setup|reset|drop|create-user|start|stop>"
echo " setup: create databases"
echo " reset: drop and recreate databases"
echo " drop: drop databases"
echo " create-user: create user"
echo " start [durable]: start docker db. Without durable, it will remove the container after exit"
echo " resume: resume durable docker db"
echo " stop: stop docker db"
}
function do_setup() {
for db in $(cat "databases")
do
echo "creating db ${db}"
echo "create database ${DBPREFIX}${db}; grant all on database ${DBPREFIX}${db} to current_user;" | \
PGPASSWORD=$POSTGRES_PASSWORD psql -h $POSTGRES_HOST -p $POSTGRES_PORT $POSTGRES_DB $POSTGRES_USER
done
}
function do_drop() {
for db in $(cat "databases")
do
echo "dropping db ${db}"
echo "drop database if exists ${DBPREFIX}${db};" | \
PGPASSWORD=$POSTGRES_PASSWORD psql -h $POSTGRES_HOST -p $POSTGRES_PORT $POSTGRES_DB $POSTGRES_USER
done
}
function do_create_user() {
echo "Creating user ${POSTGRES_USER} (assumes your default user can do that on ${POSTGRES_DB})..."
echo "CREATE ROLE \"${POSTGRES_USER}\" LOGIN PASSWORD '${POSTGRES_PASSWORD}';ALTER USER \"${POSTGRES_USER}\" createdb;" | \
psql -h $POSTGRES_HOST -p $POSTGRES_PORT ${POSTGRES_DB}
}
function do_start_docker_db() {
if [ "$1" == "durable" ]; then
removeDockerAfterExit=""
echo "starting durable docker based postgres"
else
echo "starting non-durable docker based postgres"
removeDockerAfterExit="--rm"
fi
docker run -d ${removeDockerAfterExit} --name canton-postgres \
--shm-size 1024mb \
--publish ${POSTGRES_PORT}:5432 \
-e POSTGRES_USER=$POSTGRES_USER \
-e POSTGRES_PASSWORD=$POSTGRES_PASSWORD \
-e POSTGRES_DB=$POSTGRES_DB \
-v "$PWD/postgres.conf":/etc/postgresql/postgresql.conf \
postgres:12 \
-c 'config_file=/etc/postgresql/postgresql.conf'
}
function do_resume_docker_db() {
echo "resuming docker based postgres"
docker start canton-postgres
}
function do_stop_docker_db() {
echo "stopping docker based postgres"
docker stop canton-postgres
}
function check_env {
if [[ -z "$POSTGRES_USER" || -z "$POSTGRES_HOST" || -z "$POSTGRES_DB" || -z "$POSTGRES_PASSWORD" || -z "$POSTGRES_PORT" ]]; then
echo 1
else
echo 0
fi
}
check_file "databases"
if [[ $(check_env) -ne 0 ]]; then
echo "Looking for db.env as environment variables are not set: POSTGRES_USER, POSTGRES_HOST, POSTGRES_DB, POSTGRES_PASSWORD, POSTGRES_PORT."
echo $(env | grep -v POSTGRES_PASSWORD | grep POSTGRES)
check_file "db.env"
source "db.env"
echo $(env | grep -v POSTGRES_PASSWORD | grep POSTGRES)
if [[ $(check_env) -ne 0 ]]; then
echo "POSTGRES_ environment is not properly set in db.env"
exit 1
fi
else
echo "Using host=${POSTGRES_HOST}, port=${POSTGRES_PORT} user=${POSTGRES_USER}, db=${POSTGRES_DB} from environment"
fi
case "$1" in
setup)
do_setup
;;
reset)
do_drop
do_setup
;;
drop)
do_drop
;;
create-user)
do_create_user
;;
start)
do_start_docker_db $2
;;
resume)
do_resume_docker_db
;;
stop)
do_stop_docker_db
;;
*)
do_usage
;;
esac
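
A typical invocation sequence for this helper (the script name db.sh is an assumption; it relies on db.env or the POSTGRES_* variables, as described above):

```bash
./db.sh start durable    # start a dockerised Postgres that survives container restarts
./db.sh create-user      # create the POSTGRES_USER role via your default psql user
./db.sh setup            # create one database per entry in the 'databases' file
# ...work with Canton...
./db.sh stop             # stop the container; './db.sh resume' restarts a durable one
```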

View File

@ -0,0 +1,37 @@
# Note, this config has been created using https://pgtune.leopard.in.ua/
# It targets a standard small docker deployment.
# DB Version: 12
# OS Type: linux
# DB Type: oltp
# Total Memory (RAM): 8 GB
# CPUs num: 4
# Connections num: 250
# Data Storage: ssd
listen_addresses = '*'
log_destination = 'stderr'
logging_collector = on
log_directory = '/var/log/postgresql/'
log_file_mode = 0644
log_filename = 'postgresql-%Y-%m-%d-%H.log'
log_min_messages = info
log_min_duration_statement = 2500
max_connections = 250
shared_buffers = 4GB
effective_cache_size = 6GB
maintenance_work_mem = 512MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4194kB
huge_pages = off
min_wal_size = 2GB
max_wal_size = 8GB
max_worker_processes = 4
max_parallel_workers_per_gather = 2
max_parallel_workers = 4
max_parallel_maintenance_workers = 2

View File

@ -1,33 +0,0 @@
# Connection to Canton.Global
***
WARNING: The global Canton domain is currently not running. This example does not work at the moment.
***
TODO(#7564) Make this example work again once the global domain is up
***
Participants require a domain to communicate with each other. Digital Asset is running a generally available
global Canton domain (Canton.Global). Any participant can decide to connect to the global domain and use it
for bilateral communication.
The global domain connectivity example demonstrates how to connect a participant node
to the global Canton domain. Currently, the global domain is operated as a test-net.
Longer term, the global domain will serve as a global fall-back committer which can be
used if no closer committer is available.
The global domain connectivity example contains two files, a configuration file and a
script which invokes the necessary registration call and subsequently tests the connection
by pinging the digital asset node.
```
../../bin/canton -c global-domain-participant.conf --bootstrap global-domain-participant.canton
```
After invoking above script, you will be prompted the terms of service for using the global
domain. You will have to accept it once in order to be able to use it.
Please note that right now, the global domain is a pure test-net and we are regularly resetting
the domain entirely, wiping all the content, as we are still developing the protocol. Therefore,
just use it for demonstration purposes.

View File

@ -1,13 +0,0 @@
nodes.local.start()
val domainUrl = sys.env.get("DOMAIN_URL").getOrElse("https://canton.global")
val myself = participant1
myself.domains.connect("global", domainUrl)
myself.health.ping(myself)
val da = myself.parties.list(filterParty="digitalasset").head.participants.head.participant
myself.health.ping(da)

View File

@ -1,16 +0,0 @@
canton {
participants {
participant1 {
admin-api {
port= 6012
}
ledger-api {
port = 6011
}
storage {
type = memory
}
parameters.admin-workflow.bong-test-max-level = 12
}
}
}

View File

@ -1,137 +1,3 @@
# Advanced Configuration Example
This example directory contains a collection of configuration files that can be used to setup domains or
participants for various purposes. The directory contains a set of sub-folders:
- storage: contains "storage mixins" such as [memory.conf](storage/memory.conf) or [postgres.conf](storage/postgres.conf)
- nodes: contains a set of node defintions for domains and participants
- api: contains "api mixins" that modify the API behaviour such as binding to a public address or including jwt authorization
- remote: contains a set of remote node definitions for the nodes in the nodes directory.
- parameters: contains "parameter mixins" that modify the node behaviour in various ways.
## Persistence
For every setup, you need to decide which persistence layer you want to use. Supported are [memory.conf](storage/memory.conf),
[postgres.conf](storage/postgres.conf) or Oracle (Enterprise). Please [consult the manual](https://docs.daml.com/canton/usermanual/installation.html#persistence-using-postgres)
for further instructions. The examples here will illustrate the usage using the in-memory configuration.
There is a small helper script in [dbinit.py](storage/dbinit.py) which you can use to create the appropriate SQL commands
to create users and databases for a series of nodes. This is convenient if you are setting up a test-network. You can
run it using:
```
python3 examples/03-advanced-configuration/storage/dbinit.py \
--type=postgres --user=canton --pwd=<choose-wisely> --participants=2 --domains=1 --drop
```
Please run the script with ``--help`` to get an overview of all commands. Generally, you would just pipe the output
to your SQL console.
## Nodes
The nodes directory contains a set of base configuration files that can be used together with the mix-ins.
### Domain
Start a domain with the following command:
```
./bin/canton -c examples/03-advanced-configuration/storage/memory.conf,examples/03-advanced-configuration/nodes/domain1.conf
```
The domain can be started without any bootstrap script, as it self-initialises by default, waiting for incoming connections.
If you pass in multiple configuration files, they will be combined. It doesn't matter if you separate the
configurations using `,` or if you pass them with several `-c` options.
NOTE: If you unpacked the zip directory, then you might have to make the canton startup script executable
(`chmod u+x bin/canton`).
### Participants
The participant(s) can be started the same way, just by pointing to the participant configuration file.
However, before we can use the participant for any Daml processing, we need to connect it to a domain. You can
connect to the domain interactively, or use the [initialisation script](participant-init.canton).
```
./bin/canton -c examples/03-advanced-configuration/storage/memory.conf \
-c examples/03-advanced-configuration/nodes/participant1.conf,examples/03-advanced-configuration/nodes/participant2.conf \
--bootstrap=examples/03-advanced-configuration/participant-init.canton
```
The initialisation script assumes that the domain can be reached via `localhost`, which needs to change if the domain
runs on a different server.
A setup with more participant nodes can be created using the [participant](nodes/participant1.conf) as a template.
The same applies to the domain configuration. The instance names should be changed (`participant1` to something else),
as otherwise, distinguishing the nodes in a trial run will be difficult.
## API
By default, all the APIs only bind to localhost. If you want to expose them on the network, you should secure them using
TLS and JWT. You can use the mixins configuration in the ``api`` subdirectory for your convenience.
## Parameters
The parameters directory contains a set of mix-ins to modify the behaviour of your nodes.
- [nonuck.conf](nodes/nonuck.conf) enable non-UCK mode such that you can use multiple domains per participant node (preview).
## Test Your Setup
Assuming that you have started both participants and a domain, you can verify that the system works by having
participant2 pinging participant1 (the other way around also works). A ping here is just a built-in Daml
contract which gets sent from one participant to another, and the other responds by exercising a choice.
First, just make sure that the `participant2` is connected to the domain by testing whether the following command
returns `true`
```
@ participant2.domains.active("mydomain")
```
In order to ping participant1, participant2 must know participant1's `ParticipantId`. You could obtain this from
participant1's instance of the Canton console using the command `participant1.id` and copy-pasting the resulting
`ParticipantId` to participant2's Canton console. Another option is to lookup participant1's ID directly using
participant2's console:
```
@ val participant1Id = participant2.parties.list(filterParticipant="participant1").head.participants.head.participant
```
Using the console for participant2, you can now get the two participants to ping each other:
```
@ participant2.health.ping(participant1Id)
```
## Running as Background Process
If you start Canton with the commands above, you will always be in interactive mode within the Canton console.
You can start Canton as well as a non-interactive process using
```
./bin/canton daemon -c examples/03-advanced-configuration/storage/memory.conf \
-c examples/03-advanced-configuration/nodes/participant1.conf \
--bootstrap examples/03-advanced-configuration/participant-init.canton
```
## Connect To Remote Nodes
In many cases, the nodes will run in a background process, started as `daemon`, while the user would
still like the convenience of using the console. This can be achieved by defining remote domains and
participants in the configuration file.
A participant or domain configuration can be turned into a remote config using
```
./bin/canton generate remote-config -c examples/03-advanced-configuration/storage/memory.conf,examples/03-advanced-configuration/nodes/participant1.conf
```
Then, if you start Canton using
```
./bin/canton -c remote-participant1.conf
```
you will have a new instance `participant1`, which will expose most but not all commands
that a node exposes. As an example, run:
```
participant1.health.status
```
Please note that depending on your setup, you might have to adjust the target ip address.
Please note that the configuration examples have been replaced by the reference configuration in the config directory.

View File

@ -1,8 +0,0 @@
_shared {
parameters.ledger-api-server-parameters.jwt-timestamp-leeway {
default = 5
expires-at = 10
issued-at = 15
not-before = 20
}
}

View File

@ -1,7 +0,0 @@
_shared {
ledger-api {
index-service {
max-transactions-in-memory-fan-out-buffer-size = 10000 // default 1000
}
}
}

View File

@ -1,8 +0,0 @@
_shared {
ledger-api {
index-service {
max-contract-state-cache-size = 100000 // default 1e4
max-contract-key-state-cache-size = 100000 // default 1e4
}
}
}

View File

@ -1,7 +0,0 @@
_shared {
admin-api {
// by default, canton binds to 127.0.0.1, only enabling localhost connections
// you need to explicitly set the address to enable connections from other hosts
address = 0.0.0.0
}
}

View File

@ -1,11 +0,0 @@
_shared {
public-api {
// by default, canton binds to 127.0.0.1, only enabling localhost connections
// you need to explicitly set the address to enable connections from other hosts
address = 0.0.0.0
}
ledger-api {
// same as for public-api
address = 0.0.0.0
}
}

View File

@ -1,7 +0,0 @@
_shared {
ledger-api {
auth-services = [{
type = wildcard
}]
}
}

View File

@ -1,18 +0,0 @@
canton {
domains {
domain1 {
storage = ${_shared.storage}
storage.config.properties.databaseName = "domain1"
init.domain-parameters.unique-contract-keys = ${?_.shared.unique-contract-keys}
public-api {
port = 10018
// if defined, this include will override the address we bind to. default is 127.0.0.1
address = ${?_shared.public-api.address}
}
admin-api {
port = 10019
address = ${?_shared.admin-api.address}
}
}
}
}

View File

@ -1,19 +0,0 @@
canton {
participants {
participant1 {
storage = ${_shared.storage}
storage.config.properties.databaseName = "participant1"
init.parameters.unique-contract-keys = ${?_.shared.unique-contract-keys}
admin-api {
port = 10012
// if defined, this include will override the address we bind to. default is 127.0.0.1
address = ${?_shared.admin-api.address}
}
ledger-api {
port = 10011
address = ${?_shared.ledger-api.address}
auth-services = ${?_shared.ledger-api.auth-services}
}
}
}
}

View File

@ -1,19 +0,0 @@
canton {
participants {
participant2 {
storage = ${_shared.storage}
storage.config.properties.databaseName = "participant2"
init.parameters.unique-contract-keys = ${?_.shared.unique-contract-keys}
admin-api {
port = 10022
// if defined, this include will override the address we bind to. default is 127.0.0.1
address = ${?_shared.admin-api.address}
}
ledger-api {
port = 10021
address = ${?_shared.ledger-api.address}
auth-services = ${?_shared.ledger-api.auth-services}
}
}
}
}

View File

@ -1,19 +0,0 @@
canton {
participants {
participant3 {
storage = ${_shared.storage}
storage.config.properties.databaseName = "participant3"
init.parameters.unique-contract-keys = ${?_.shared.unique-contract-keys}
admin-api {
port = 10032
// if defined, this include will override the address we bind to. default is 127.0.0.1
address = ${?_shared.admin-api.address}
}
ledger-api {
port = 10031
address = ${?_shared.ledger-api.address}
auth-services = ${?_shared.ledger-api.auth-services}
}
}
}
}

View File

@ -1,19 +0,0 @@
canton {
participants {
participant4 {
storage = ${_shared.storage}
storage.config.properties.databaseName = "participant4"
init.parameters.unique-contract-keys = ${?_.shared.unique-contract-keys}
admin-api {
port = 10042
// if defined, this include will override the address we bind to. default is 127.0.0.1
address = ${?_shared.admin-api.address}
}
ledger-api {
port = 10041
address = ${?_shared.ledger-api.address}
auth-services = ${?_shared.ledger-api.auth-services}
}
}
}
}

View File

@ -1,3 +0,0 @@
_shared {
unique-contract-keys = no
}

View File

@ -1,22 +0,0 @@
val participant = participants.local.head
// only run once
if(participant.domains.list_registered().isEmpty) {
// connect all local participants to the domain passing a user chosen alias and the domain port as the argument
participants.local.foreach(_.domains.connect("mydomain", "http://localhost:10018"))
// above connect operation is asynchronous. it is generally at the discretion of the domain
// to decide if a participant can join and when. therefore, we need to asynchronously wait here
// until the participant observes its activation on the domain
utils.retry_until_true {
participant.domains.active("mydomain")
}
// synchronize vetting to ensure the participant has the package needed for the ping
participant.packages.synchronize_vetting()
// verify that the connection works
participant.health.ping(participant)
}

View File

@ -1,14 +0,0 @@
canton {
remote-domains {
remoteDomain1 {
public-api {
address = 127.0.0.1
port = 10018
}
admin-api {
port = 10019
address = 127.0.0.1 // default value if omitted
}
}
}
}

View File

@ -1,14 +0,0 @@
canton {
remote-participants {
remoteParticipant1 {
admin-api {
port = 10012
address = 127.0.0.1 // is the default value if omitted
}
ledger-api {
port = 10011
address = 127.0.0.1 // is the default value if omitted
}
}
}
}

View File

@ -1,51 +0,0 @@
#!/usr/bin/python3
#
# Trivial helper script to create users / databases for Canton nodes
#
import argparse
import sys
def get_parser():
parser = argparse.ArgumentParser(description = "Helper utility to setup Canton databases for a set of nodes")
parser.add_argument("--type", help="Type of database to be setup", choices=["postgres"], default="postgres")
parser.add_argument("--participants", type=int, help="Number of participant dbs to generate (will create dbs named participantX for 1 to N)", default=0)
parser.add_argument("--domains", type=int, help="Number of domain dbs to generate (will create dbs named domainX for 1 to N)", default=0)
parser.add_argument("--sequencers", type=int, help="Number of sequencer dbs to generate (will create dbs named sequencerX for 1 to N", default=0)
parser.add_argument("--mediators", type=int, help="Number of mediators dbs to generate (will create dbs named mediatorX for 1 to N", default=0)
parser.add_argument("--user", type=str, help="Database user name. If given, the script will also generate a SQL command to create the user", required=True)
parser.add_argument("--pwd", type=str, help="Database password")
parser.add_argument("--drop", help="Drop existing", action="store_true")
return parser.parse_args()
def do_postgres(args):
print("""
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles
WHERE rolname = '%s') THEN
CREATE ROLE \"%s\" LOGIN PASSWORD '%s';
END IF;
END
$do$;
""" % (args.user, args.user, args.pwd))
for num, prefix in [(args.domains, "domain"), (args.participants, "participant"), (args.mediators, "mediator"), (args.sequencers, "sequencer")]:
for ii in range(1, num + 1):
dbname = prefix + str(ii)
if args.drop:
print("DROP DATABASE IF EXISTS %s;" % (dbname))
print("CREATE DATABASE %s;" % dbname)
print("GRANT ALL ON DATABASE %s to \"%s\";" % (dbname, args.user))
if __name__ == "__main__":
args = get_parser()
if args.type == "postgres":
do_postgres(args)
else:
raise Exception("Unknown database type %s" % (args.type))

View File

@ -1,5 +0,0 @@
_shared {
storage {
type = "memory"
}
}

View File

@ -1,37 +0,0 @@
# Postgres persistence configuration mixin
#
# This file defines a shared configuration resources. You can mix it into your configuration by
# refer to the shared storage resource and add the database name.
#
# Example:
# participant1 {
# storage = ${_shared.storage}
# storage.config.properties.databaseName = "participant1"
# }
#
# The user and password credentials are set to "canton" and "supersafe". As this is not "supersafe", you might
# want to either change this configuration file or pass the settings in via environment variables.
#
_shared {
storage {
type = postgres
config {
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
properties = {
serverName = "localhost"
# the next line will override above "serverName" in case the environment variable POSTGRES_HOST exists
serverName = ${?POSTGRES_HOST}
portNumber = "5432"
portNumber = ${?POSTGRES_PORT}
# the next line will fail configuration parsing if the POSTGRES_USER environment variable is not set
user = ${POSTGRES_USER}
password = ${POSTGRES_PASSWORD}
}
}
// If defined, will configure the number of database connections per node.
// Please ensure that your database is setup with sufficient connections.
// If not configured explicitly, every node will create one connection per core on the host machine. This is
// subject to change with future improvements.
parameters.max-connections = ${?POSTGRES_NUM_CONNECTIONS}
}
}

View File

@ -16,7 +16,7 @@ To set up this scenario, run
```
../../bin/canton -Dcanton-examples.dar-path=../../dars/CantonExamples.dar \
-c participant1.conf,participant2.conf,domain-repair-lost.conf,domain-repair-new.conf \
-c ../03-advanced-configuration/storage/h2.conf,enable-preview-commands.conf \
-c ../../config/storage/h2.conf,enable-preview-commands.conf \
--bootstrap domain-repair-init.canton
```
@ -33,6 +33,6 @@ To set up this scenario, run
```
../../bin/canton -Dcanton-examples.dar-path=../../dars/CantonExamples.dar \
-c participant1.conf,participant2.conf,participant3.conf,participant4.conf,domain-export-ledger.conf,domain-import-ledger.conf \
-c ../03-advanced-configuration/storage/h2.conf,enable-preview-commands.conf \
-c ../../config/storage/h2.conf,enable-preview-commands.conf \
--bootstrap import-ledger-init.canton
```

View File

@ -1,4 +1,4 @@
POSTGRES_PASSWORD=supersafe
POSTGRES_USER=canton
TLS_CERT_LOCATION=enterprise/app/src/test/resources/tls
JWT_URL="https://bla.fasel/jwks.key"
JWT_CERTIFICATE_FILE="community/app/src/test/resources/dummy.crt"
JWT_CERTIFICATE_FILE="community/app/src/test/resources/dummy.crt"
POSTGRES_PASSWORD="supersafe"

View File

@ -0,0 +1,3 @@
canton.participants.participant.ledger-api.index-service {
max-transactions-in-memory-fan-out-buffer-size = 10000 // default 1000
}

View File

@ -0,0 +1,4 @@
canton.participants.participant.ledger-api.index-service {
max-contract-state-cache-size = 100000 // default 1e4
max-contract-key-state-cache-size = 100000 // default 1e4
}

View File

@ -0,0 +1,6 @@
canton.participants.participant.parameters.ledger-api-server-parameters.jwt-timestamp-leeway {
default = 5
expires-at = 10
issued-at = 15
not-before = 20
}
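
These tuning snippets are ordinary config mixins that address a node literally named participant; a hedged sketch of applying one of them (all file paths below are assumptions) is to pass it as an additional -c file on top of the node configuration:

```bash
# Apply the fan-out buffer tuning on top of an existing participant configuration.
# Both file paths are assumptions for illustration; rename the node key if your
# participant is not called 'participant'.
./bin/canton -c config/participant.conf \
  -c config/tuning/fan-out-buffer.conf
```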

View File

@ -13,8 +13,8 @@ import com.digitalasset.canton.integration.CommunityTests.{
IsolatedCommunityEnvironments,
}
import com.digitalasset.canton.integration.tests.ExampleIntegrationTest.{
advancedConfiguration,
ensureSystemProperties,
referenceConfiguration,
repairConfiguration,
simpleTopology,
}
@ -62,7 +62,7 @@ object ExampleIntegrationTest {
lazy val examplesPath: File = "community" / "app" / "src" / "pack" / "examples"
lazy val simpleTopology: File = examplesPath / "01-simple-topology"
lazy val createDamlApp: File = examplesPath / "04-create-daml-app"
lazy val advancedConfiguration: File = examplesPath / "03-advanced-configuration"
lazy val referenceConfiguration: File = "community" / "app" / "src" / "pack" / "config"
lazy val composabilityConfiguration: File = examplesPath / "05-composability"
lazy val messagingConfiguration: File = examplesPath / "06-messaging"
lazy val repairConfiguration: File = examplesPath / "07-repair"
@ -103,7 +103,7 @@ class SimplePingExampleIntegrationTest
class RepairExampleIntegrationTest
extends ExampleIntegrationTest(
advancedConfiguration / "storage" / "h2.conf",
referenceConfiguration / "storage" / "h2.conf",
repairConfiguration / "domain-repair-lost.conf",
repairConfiguration / "domain-repair-new.conf",
repairConfiguration / "domain-export-ledger.conf",

View File

@ -16,7 +16,7 @@ import scala.concurrent.ExecutionContext
*/
final case class CacheConfig(
maximumSize: PositiveNumeric[Long],
expireAfterAccess: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMinutes(10),
expireAfterAccess: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMinutes(1),
) {
def buildScaffeine()(implicit ec: ExecutionContext): Scaffeine[Any, Any] =

Some files were not shown because too many files have changed in this diff.