update canton to 20240701.13579.vfc58fab6 (#19486)

* update canton to 20240701.13579.vfc58fab6

tell-slack: canton

* Fixed UpgradesSpecBase.scala.

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Andreas Triantafyllos <andreas.triantafyllos@digitalasset.com>
azure-pipelines[bot] 2024-07-02 16:50:26 +02:00 committed by GitHub
parent 6e9c240ff6
commit 7154279046
57 changed files with 729 additions and 378 deletions

View File

@ -63,7 +63,8 @@ object ResourceUtil {
)(implicit M: MonadThrow[M], TM: Thereafter[M]): M[V] = {
import Thereafter.syntax.*
import cats.syntax.flatMap.*
MonadThrow[M].fromTry(Try(f(r))).flatten.thereafter(_ => r.close())
val resource: T = r
MonadThrow[M].fromTry(Try(f(resource))).flatten.thereafter(_ => resource.close())
}
}
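
Note: the change above binds the resource to a val before use. A minimal sketch of the pitfall this guards against, assuming the resource parameter is call-by-name (as the "create a resource only once" test added later in this commit suggests); Handle, withResourceNaive and withResourceOnce are hypothetical names, not the Canton API:

import scala.util.Try

final class Handle extends AutoCloseable {
  println("opened")
  override def close(): Unit = println("closed")
}

// Call-by-name parameter: every reference to `r` re-evaluates the argument.
def withResourceNaive[T <: AutoCloseable, V](r: => T)(f: T => V): Try[V] = {
  val result = Try(f(r)) // first evaluation: opens one handle
  r.close()              // second evaluation: opens and then closes a *different* handle
  result
}

// Binding once restores the expected behaviour: a single instance is used and closed.
def withResourceOnce[T <: AutoCloseable, V](r: => T)(f: T => V): Try[V] = {
  val resource = r
  val result = Try(f(resource))
  resource.close()
  result
}

// withResourceNaive(new Handle)(_ => 42) prints "opened", "opened", "closed";
// withResourceOnce(new Handle)(_ => 42) prints "opened", "closed".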

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: CantonExamples

View File

@ -224,8 +224,7 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext
loggerFactory = loggerFactory,
)
// Futures in loadSequencerInfoAsync can race such that more than the expected tolerance can be returned.
// Increase to 2 if result size checks are flaky
val toleranceForRaciness = 1
val toleranceForRaciness = 3
"return complete results when requested" in {
val scs = sequencerConnections(10)

View File

@ -7,6 +7,7 @@ import cats.data.EitherT
import com.digitalasset.canton.{BaseTest, HasExecutionContext}
import org.scalatest.wordspec.AnyWordSpec
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
class ResourceUtilTest extends AnyWordSpec with BaseTest with HasExecutionContext {
@ -223,6 +224,25 @@ class ResourceUtilTest extends AnyWordSpec with BaseTest with HasExecutionContex
.getSuppressed()(0) shouldBe TestException("Something happened when closing")
}
"create a resource only once" in {
val counter = new AtomicInteger(0)
def newResource() = new AutoCloseable {
// Increment the counter when the resource is created
counter.incrementAndGet()
override def close(): Unit = ()
}
ResourceUtil
.withResourceEitherT(newResource())(_ => EitherTUtil.unit[String])
.value
.futureValue
counter.get() shouldBe 1
}
}
}
}

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: ai-analysis

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: bank

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: doctor

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: health-insurance

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: medical-records

View File

@ -295,6 +295,8 @@ class BlockSequencerStateManager(
s"and ${chunk.inFlightAggregationUpdates.size} in-flight aggregation updates"
)(handleChunkUpdate(priorHead, chunk, dbSequencerIntegration)(traceContext))
case complete: CompleteBlockUpdate =>
// TODO(#18401): Consider: wait for the DBS watermark to be updated to the blocks last timestamp
// in a supervisory manner, to detect things not functioning properly
LoggerUtil.clueF(
s"Storing completion of block $currentBlockNumber"
)(handleComplete(priorHead, complete.block)(traceContext))

View File

@ -306,6 +306,7 @@ object SequencerBlockStore {
enableAdditionalConsistencyChecks: Boolean,
checkedInvariant: Option[Member],
loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit
executionContext: ExecutionContext
): SequencerBlockStore =
@ -320,6 +321,7 @@ object SequencerBlockStore {
enableAdditionalConsistencyChecks,
checkedInvariant,
loggerFactory,
unifiedSequencer = unifiedSequencer,
)
}

View File

@ -30,6 +30,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.{
InternalSequencerPruningStatus,
}
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.resource.DbStorage.Profile.{H2, Oracle, Postgres}
import com.digitalasset.canton.resource.IdempotentInsert.insertVerifyingConflicts
import com.digitalasset.canton.resource.{DbStorage, DbStore}
import com.digitalasset.canton.sequencing.OrdinarySerializedEvent
@ -54,6 +55,7 @@ class DbSequencerBlockStore(
enableAdditionalConsistencyChecks: Boolean,
private val checkedInvariant: Option[Member],
override protected val loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
)(implicit override protected val executionContext: ExecutionContext)
extends SequencerBlockStore
with DbStore {
@ -74,7 +76,19 @@ class DbSequencerBlockStore(
override def readHead(implicit traceContext: TraceContext): Future[BlockEphemeralState] =
storage.query(
for {
blockInfoO <- readLatestBlockInfo()
blockInfoO <- {
if (unifiedSequencer) {
for {
watermark <- safeWaterMarkDBIO
blockInfoO <- watermark match {
case Some(watermark) => findBlockContainingTimestamp(watermark)
case None => readLatestBlockInfo()
}
} yield blockInfoO
} else {
readLatestBlockInfo()
}
}
state <- blockInfoO match {
case None => DBIO.successful(BlockEphemeralState.empty)
case Some(blockInfo) =>
@ -92,16 +106,32 @@ class DbSequencerBlockStore(
functionFullName,
)
private def safeWaterMarkDBIO: DBIOAction[Option[CantonTimestamp], NoStream, Effect.Read] = {
val query = storage.profile match {
case _: H2 | _: Postgres =>
// TODO(#18401): Below only works for a single instance database sequencer
sql"select min(watermark_ts) from sequencer_watermarks"
case _: Oracle =>
sql"select min(watermark_ts) from sequencer_watermarks"
}
// `min` may return null that is wrapped into None
query.as[Option[CantonTimestamp]].headOption.map(_.flatten)
}
private def findBlockContainingTimestamp(
timestamp: CantonTimestamp
): DBIOAction[Option[BlockInfo], NoStream, Effect.Read] =
(sql"""select height, latest_event_ts, latest_sequencer_event_ts from seq_block_height where latest_event_ts >= $timestamp order by height """ ++ topRow)
.as[BlockInfo]
.headOption
override def readStateForBlockContainingTimestamp(
timestamp: CantonTimestamp
)(implicit traceContext: TraceContext): EitherT[Future, InvalidTimestamp, BlockEphemeralState] =
EitherT(
storage.query(
for {
heightAndTimestamp <-
(sql"""select height, latest_event_ts, latest_sequencer_event_ts from seq_block_height where latest_event_ts >= $timestamp order by height """ ++ topRow)
.as[BlockInfo]
.headOption
heightAndTimestamp <- findBlockContainingTimestamp(timestamp)
state <- heightAndTimestamp match {
case None => DBIO.successful(Left(InvalidTimestamp(timestamp)))
case Some(block) => readAtBlock(block).map(Right.apply)
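
Note: a simplified sketch of the head-block selection introduced above for the unified sequencer (placeholder type parameters and function arguments, not the real store API): the head is bounded by the minimum watermark across all sequencer instances, falling back to the latest block when no watermark has been written yet.

def selectHeadBlock[Ts, Block](
    minWatermark: Option[Ts],                      // min(watermark_ts) over sequencer_watermarks, None if the table is empty
    blockContainingTimestamp: Ts => Option[Block], // first block with latest_event_ts >= ts
    latestBlock: () => Option[Block],
): Option[Block] =
  minWatermark match {
    case Some(ts) => blockContainingTimestamp(ts)
    case None => latestBlock()
  }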

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.crypto.DomainSyncCryptoClient
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.metrics.SequencerMetrics
import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.RegisterError
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerWriter.ResetWatermark
import com.digitalasset.canton.domain.sequencing.sequencer.errors.*
import com.digitalasset.canton.domain.sequencing.sequencer.store.SequencerStore.SequencerPruningResult
import com.digitalasset.canton.domain.sequencing.sequencer.store.*
@ -174,7 +175,7 @@ class DatabaseSequencer(
protected val memberValidator: SequencerMemberValidator = store
protected def resetWatermarkTo: Option[CantonTimestamp] = None
protected def resetWatermarkTo: ResetWatermark = SequencerWriter.ResetWatermarkToClockNow
// Only start pruning scheduler after `store` variable above has been initialized to avoid racy NPE
withNewTraceContext { implicit traceContext =>

View File

@ -85,6 +85,8 @@ abstract class DatabaseSequencerFactory(
DefaultMaxSqlInListSize,
timeouts,
loggerFactory,
unifiedSequencer =
true, // TODO(#18401): does not affect the usage below, but should be correctly set
// At the moment this store instance is only used for the sequencer initialization,
// if it is retrying a db operation and the factory is closed, the store will be closed as well;
// if the request succeeds, the store will not be retrying and doesn't need to be closed

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.config
import com.digitalasset.canton.config.ProcessingTimeout
import com.digitalasset.canton.config.RequireTypes.{PositiveInt, PositiveNumeric}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.domain.sequencing.sequencer.SequencerWriter.ResetWatermark
import com.digitalasset.canton.domain.sequencing.sequencer.WriterStartupError.FailedToInitializeFromSnapshot
import com.digitalasset.canton.domain.sequencing.sequencer.store.*
import com.digitalasset.canton.health.admin.data.SequencerHealthStatus
@ -148,6 +149,7 @@ class SequencerWriter(
maxSqlInListSize,
timeouts,
loggerFactory,
unifiedSequencer = unifiedSequencer,
// Overriding the store's close context with the writers, so that when the writer gets closed, the store
// stops retrying forever
overrideCloseContext = Some(this.closeContext),
@ -236,7 +238,7 @@ class SequencerWriter(
*/
def startOrLogError(
initialSnapshot: Option[SequencerInitialState],
resetWatermarkTo: => Option[CantonTimestamp],
resetWatermarkTo: => ResetWatermark,
)(implicit traceContext: TraceContext): Future[Unit] =
start(initialSnapshot, resetWatermarkTo).fold(
err => logger.error(s"Failed to startup sequencer writer: $err"),
@ -244,8 +246,9 @@ class SequencerWriter(
)
def start(
// TODO(#18401): Move initialization from snapshot into the sequencer factory
initialSnapshot: Option[SequencerInitialState] = None,
resetWatermarkTo: => Option[CantonTimestamp],
resetWatermarkTo: => ResetWatermark,
)(implicit traceContext: TraceContext): EitherT[Future, WriterStartupError, Unit] =
performUnlessClosingEitherT[WriterStartupError, Unit](
functionFullName,
@ -282,19 +285,21 @@ class SequencerWriter(
_ <- expectedCommitMode
.fold(EitherTUtil.unit[String])(writerStore.validateCommitMode)
.leftMap(WriterStartupError.BadCommitMode)
resetWatermarkToO = resetWatermarkTo
resetWatermarkToValue = resetWatermarkTo
_ <- {
resetWatermarkToO
.fold(EitherTUtil.unit[String]) { watermark =>
(resetWatermarkToValue match {
case SequencerWriter.ResetWatermarkToClockNow |
SequencerWriter.DoNotResetWatermark =>
EitherT.pure[Future, String](())
case SequencerWriter.ResetWatermarkToTimestamp(timestamp) =>
logger.debug(
s"Resetting the watermark to an externally passed value of $watermark"
s"Resetting the watermark to the externally passed timestamp of $timestamp"
)
writerStore.resetWatermark(watermark).leftMap(_.toString)
}
.leftMap(WriterStartupError.WatermarkResetError)
writerStore.resetWatermark(timestamp).leftMap(_.toString)
}).leftMap(WriterStartupError.WatermarkResetError)
}
onlineTimestamp <- EitherT.right[WriterStartupError](
runRecovery(writerStore, resetWatermarkToO)
runRecovery(writerStore, resetWatermarkToValue)
)
_ <- EitherT.right[WriterStartupError](waitForOnline(onlineTimestamp))
} yield ()
@ -348,11 +353,23 @@ class SequencerWriter(
@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
private def runRecovery(
store: SequencerWriterStore,
resetWatermarkTo: Option[CantonTimestamp],
resetWatermarkTo: ResetWatermark,
)(implicit traceContext: TraceContext): Future[CantonTimestamp] =
for {
_ <- store.deleteEventsPastWatermark()
onlineTimestamp <- store.goOnline(resetWatermarkTo.getOrElse(clock.now))
pastWatermarkO <- store.deleteEventsPastWatermark()
goOnlineAt = resetWatermarkTo match {
case SequencerWriter.ResetWatermarkToClockNow =>
clock.now
case SequencerWriter.ResetWatermarkToTimestamp(timestamp) =>
timestamp
case SequencerWriter.DoNotResetWatermark =>
// Used in unified sequencer mode only; do not jump to `clock.now`:
// the default value ensures that we are not ahead of the BlockSequencer's timestamps when rehydrating
pastWatermarkO.getOrElse(CantonTimestamp.MinValue)
}
onlineTimestamp <- store.goOnline(
goOnlineAt
) // actual online timestamp depends on other instances
_ = if (clock.isSimClock && clock.now < onlineTimestamp) {
logger.debug(s"The sequencer will not start unless sim clock moves to $onlineTimestamp")
logger.debug(
@ -411,7 +428,7 @@ class SequencerWriter(
.getAndSet(None)
.parTraverse_(_.close())
.recover { case NonFatal(e) =>
logger.debug("Failed to close running writer", e)
logger.debug("Running writer will be recovered, due to non-fatal error:", e)
}
// determine whether we can run recovery or not
@ -429,7 +446,9 @@ class SequencerWriter(
FutureUtil.doNotAwait(
// Wait for the writer store to be closed before re-starting, otherwise we might end up with
// concurrent write stores trying to connect to the DB within the same sequencer node
closed.flatMap(_ => startOrLogError(None, None)(traceContext)),
closed.flatMap(_ =>
startOrLogError(None, SequencerWriter.DoNotResetWatermark)(traceContext)
),
"SequencerWriter recovery",
)
} else {
@ -516,4 +535,9 @@ object SequencerWriter {
)
}
sealed trait ResetWatermark
final case object DoNotResetWatermark extends ResetWatermark
final case object ResetWatermarkToClockNow extends ResetWatermark
final case class ResetWatermarkToTimestamp(timestamp: CantonTimestamp) extends ResetWatermark
}
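
Note: a condensed sketch of how the three ResetWatermark cases resolve to a go-online timestamp, mirroring runRecovery above. The mapping of sequencer flavour to case is taken from the other hunks in this commit; the standalone function is illustrative only, not part of the API.

// DatabaseSequencer.resetWatermarkTo        => ResetWatermarkToClockNow
// BlockSequencer.resetWatermarkTo           => ResetWatermarkToTimestamp(stateManager.getHeadState.block.lastTs)
// crash-recovery restart (startOrLogError)  => DoNotResetWatermark
def goOnlineTimestamp(
    reset: SequencerWriter.ResetWatermark,
    now: CantonTimestamp,
    pastWatermark: Option[CantonTimestamp], // as returned by deleteEventsPastWatermark
): CantonTimestamp =
  reset match {
    case SequencerWriter.ResetWatermarkToClockNow => now
    case SequencerWriter.ResetWatermarkToTimestamp(timestamp) => timestamp
    case SequencerWriter.DoNotResetWatermark =>
      pastWatermark.getOrElse(CantonTimestamp.MinValue)
  }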

View File

@ -138,9 +138,8 @@ class BlockSequencer(
override private[sequencing] def firstSequencerCounterServeableForSequencer: SequencerCounter =
stateManager.firstSequencerCounterServableForSequencer
override protected def resetWatermarkTo: Option[CantonTimestamp] = {
Some(stateManager.getHeadState.block.lastTs)
}
override protected def resetWatermarkTo: SequencerWriter.ResetWatermark =
SequencerWriter.ResetWatermarkToTimestamp(stateManager.getHeadState.block.lastTs)
private val (killSwitchF, localEventsQueue, done) = {
val headState = stateManager.getHeadState

View File

@ -71,6 +71,7 @@ abstract class BlockSequencerFactory(
nodeParameters.enableAdditionalConsistencyChecks && !nodeParameters.useUnifiedSequencer
)(sequencerId),
loggerFactory,
unifiedSequencer = nodeParameters.useUnifiedSequencer,
)
private val trafficPurchasedStore = TrafficPurchasedStore(

View File

@ -59,6 +59,7 @@ class DbSequencerStore(
maxInClauseSize: PositiveNumeric[Int],
override protected val timeouts: ProcessingTimeout,
override protected val loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
overrideCloseContext: Option[CloseContext] = None,
)(protected implicit val executionContext: ExecutionContext)
extends SequencerStore
@ -392,6 +393,30 @@ class DbSequencerStore(
functionFullName,
)
/** In the unified sequencer, payload ids are deterministic (they are sequencing times from the block sequencer),
* so we can somewhat safely ignore conflicts arising from sequencer restarts, crash recovery, and HA lock losses,
* unlike the more complicated `savePayloads` method below.
*/
private def savePayloadsUS(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit
traceContext: TraceContext
): EitherT[Future, SavePayloadsError, Unit] = {
val insertSql =
"""insert into sequencer_payloads (id, instance_discriminator, content) values (?, ?, ?) on conflict do nothing"""
EitherT.right[SavePayloadsError](
storage
.queryAndUpdate(
DbStorage.bulkOperation(insertSql, payloads, storage.profile) { pp => payload =>
pp >> payload.id.unwrap
pp >> instanceDiscriminator
pp >> payload.content
},
functionFullName,
)
.map(_ => ())
)
}
/** Save the provided payloads to the store.
*
* For DB implementations we suspect that this will be a hot spot for performance primarily due to size of the payload
@ -424,7 +449,10 @@ class DbSequencerStore(
* - Finally we filter to payloads that haven't yet been successfully inserted and go back to the first step attempting
* to reinsert just this subset.
*/
override def savePayloads(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit
private def savePayloadsResolvingConflicts(
payloads: NonEmpty[Seq[Payload]],
instanceDiscriminator: UUID,
)(implicit
traceContext: TraceContext
): EitherT[Future, SavePayloadsError, Unit] = {
@ -544,6 +572,15 @@ class DbSequencerStore(
go(payloads)
}
override def savePayloads(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit
traceContext: TraceContext
): EitherT[Future, SavePayloadsError, Unit] =
if (unifiedSequencer) {
savePayloadsUS(payloads, instanceDiscriminator)
} else {
savePayloadsResolvingConflicts(payloads, instanceDiscriminator)
}
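
Note: a toy model (plain Scala, not the Canton store API) of why "insert ... on conflict do nothing" is safe when payload ids are deterministic: two writers racing on the same id carry identical content by construction, so dropping the duplicate insert cannot lose data.

final case class PayloadRow(id: Long, content: String)

// Models the unified-sequencer insert: keep the existing row on an id conflict.
def insertIgnoringConflicts(
    table: Map[Long, PayloadRow],
    incoming: Seq[PayloadRow],
): Map[Long, PayloadRow] =
  incoming.foldLeft(table) { (acc, row) =>
    if (acc.contains(row.id)) acc else acc + (row.id -> row)
  }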
override def saveEvents(instanceIndex: Int, events: NonEmpty[Seq[Sequenced[PayloadId]]])(implicit
traceContext: TraceContext
): Future[Unit] = {
@ -987,21 +1024,32 @@ class DbSequencerStore(
override def deleteEventsPastWatermark(
instanceIndex: Int
)(implicit traceContext: TraceContext): Future[Unit] =
)(implicit traceContext: TraceContext): Future[Option[CantonTimestamp]] =
for {
watermarkO <- storage.query(
sql"select watermark_ts from sequencer_watermarks where node_index = $instanceIndex"
.as[CantonTimestamp]
.headOption,
functionFullName,
)
watermark = watermarkO.getOrElse(CantonTimestamp.MinValue)
// TODO(#18401): Also cleanup payloads (beyond the payload to event margin)
eventsRemoved <- storage.update(
{
sqlu"""
delete from sequencer_events
where node_index = $instanceIndex
and ts > coalesce((select watermark_ts from sequencer_watermarks where node_index = $instanceIndex), -1)
and ts > $watermark
"""
},
functionFullName,
)
} yield logger.debug(
s"Removed at least $eventsRemoved that were past the last watermark for this sequencer"
} yield {
logger.debug(
s"Removed at least $eventsRemoved that were past the last watermark ($watermarkO) for this sequencer"
)
watermarkO
}
override def saveCounterCheckpoint(
memberId: SequencerMemberId,

View File

@ -43,6 +43,7 @@ class UniqueKeyViolationException(message: String) extends RuntimeException(mess
class InMemorySequencerStore(
protocolVersion: ProtocolVersion,
unifiedSequencer: Boolean,
protected val loggerFactory: NamedLoggerFactory,
)(implicit
protected val executionContext: ExecutionContext
@ -99,9 +100,14 @@ class InMemorySequencerStore(
.flatMap { existingPayload =>
// if we found an existing payload it must have a matching instance discriminator
if (existingPayload.instanceDiscriminator == instanceDiscriminator) None // no error
else
else {
if (unifiedSequencer) {
None
} else {
SavePayloadsError.ConflictingPayloadId(id, existingPayload.instanceDiscriminator).some
}
}
}
.toLeft(())
.leftWiden[SavePayloadsError]
.toEitherT[Future]
@ -212,8 +218,8 @@ class InMemorySequencerStore(
/** No implementation as only required for crash recovery */
override def deleteEventsPastWatermark(instanceIndex: Int)(implicit
traceContext: TraceContext
): Future[Unit] =
Future.unit
): Future[Option[CantonTimestamp]] =
Future.successful(watermark.get().map(_.timestamp))
override def saveCounterCheckpoint(
memberId: SequencerMemberId,

View File

@ -567,10 +567,11 @@ trait SequencerStore extends SequencerMemberValidator with NamedLogging with Aut
/** Delete all events that are ahead of the watermark of this sequencer.
* These events will not have been read and should be removed before returning the sequencer online.
* Should not be called alongside updating the watermark for this sequencer and only while the sequencer is offline.
* Returns the watermark that was used for the deletion.
*/
def deleteEventsPastWatermark(instanceIndex: Int)(implicit
traceContext: TraceContext
): Future[Unit]
): Future[Option[CantonTimestamp]]
/** Save a checkpoint that as of a certain timestamp the member has this counter value.
* Any future subscriptions can then use this as a starting point for serving their event stream rather than starting from 0.
@ -827,10 +828,16 @@ object SequencerStore {
maxInClauseSize: PositiveNumeric[Int],
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
unifiedSequencer: Boolean,
overrideCloseContext: Option[CloseContext] = None,
)(implicit executionContext: ExecutionContext): SequencerStore =
storage match {
case _: MemoryStorage => new InMemorySequencerStore(protocolVersion, loggerFactory)
case _: MemoryStorage =>
new InMemorySequencerStore(
protocolVersion,
unifiedSequencer = unifiedSequencer,
loggerFactory,
)
case dbStorage: DbStorage =>
new DbSequencerStore(
dbStorage,
@ -838,6 +845,7 @@ object SequencerStore {
maxInClauseSize,
timeouts,
loggerFactory,
unifiedSequencer = unifiedSequencer,
overrideCloseContext,
)
}

View File

@ -92,8 +92,11 @@ trait SequencerWriterStore extends AutoCloseable {
/** Delete all events that are ahead of the watermark of this sequencer.
* These events will not have been read and should be removed before returning the sequencer online.
* Should not be called alongside updating the watermark for this sequencer and only while the sequencer is offline.
* Returns the watermark that was used for the deletion.
*/
def deleteEventsPastWatermark()(implicit traceContext: TraceContext): Future[Unit] =
def deleteEventsPastWatermark()(implicit
traceContext: TraceContext
): Future[Option[CantonTimestamp]] =
store.deleteEventsPastWatermark(instanceIndex)
}

View File

@ -116,7 +116,11 @@ class SequencerReaderTest extends FixtureAsyncWordSpec with BaseTest {
new AtomicBoolean(true) // should the latest timestamp be added to the signaller when stored
val actorSystem: ActorSystem = ActorSystem(classOf[SequencerReaderTest].getSimpleName)
implicit val materializer: Materializer = Materializer(actorSystem)
val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory)
val store = new InMemorySequencerStore(
protocolVersion = testedProtocolVersion,
unifiedSequencer = testedUseUnifiedSequencer,
loggerFactory = loggerFactory,
)
val instanceIndex: Int = 0
// create a spy so we can add verifications on how many times methods were called
val storeSpy: InMemorySequencerStore = spy[InMemorySequencerStore](store)

View File

@ -73,7 +73,11 @@ class SequencerTest extends FixtureAsyncWordSpec with BaseTest with HasExecution
Some(parallelExecutionContext),
)
private val materializer = implicitly[Materializer]
val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory)
val store = new InMemorySequencerStore(
protocolVersion = testedProtocolVersion,
unifiedSequencer = testedUseUnifiedSequencer,
loggerFactory = loggerFactory,
)
val clock = new WallClock(timeouts, loggerFactory = loggerFactory)
val crypto: DomainSyncCryptoClient = valueOrFail(
TestingTopology(sequencerGroup =

View File

@ -84,7 +84,9 @@ class SequencerWriterSourceTest extends AsyncWordSpec with BaseTest with HasExec
class InMemoryStoreWithTimeAdvancement(lFactory: NamedLoggerFactory)(implicit
ec: ExecutionContext
) extends InMemorySequencerStore(testedProtocolVersion, lFactory)(ec) {
) extends InMemorySequencerStore(testedProtocolVersion, testedUseUnifiedSequencer, lFactory)(
ec
) {
private val timeAdvancement = new AtomicReference[java.time.Duration](java.time.Duration.ZERO)
def setClockAdvanceBeforeSavePayloads(duration: java.time.Duration): Unit =

View File

@ -25,7 +25,6 @@ import com.digitalasset.canton.time.SimClock
import com.digitalasset.canton.topology.DefaultTestIdentities
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.MonadUtil
import org.mockito.ArgumentMatchers
import org.scalatest.FutureOutcome
import org.scalatest.wordspec.FixtureAsyncWordSpec
@ -60,8 +59,11 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest {
val clock = new SimClock(loggerFactory = loggerFactory)
val runningFlows = mutable.Buffer[MockRunningWriterFlow]()
val storage = new MemoryStorage(loggerFactory, timeouts)
val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory)
val storeSpy = spy(store)
val store = new InMemorySequencerStore(
protocolVersion = testedProtocolVersion,
unifiedSequencer = testedUseUnifiedSequencer,
loggerFactory = loggerFactory,
)
val instanceIndex = 0
val storageFactory = new MockWriterStoreFactory()
@ -82,11 +84,6 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest {
override val generalStore: SequencerStore = store
}
def setupNextGoOnlineTimestamp(ts: CantonTimestamp): Unit =
when(
storeSpy.goOnline(ArgumentMatchers.eq(instanceIndex), any[CantonTimestamp])(anyTraceContext)
)
.thenReturn(Future.successful(ts))
def numberOfFlowsCreated: Int = runningFlows.size
def latestRunningWriterFlowPromise: Promise[Unit] =
@ -127,21 +124,14 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest {
"wait for online timestamp to be reached" in { env =>
import env.*
// set our time to ts2 but the returned goOnline timestamp to ts4
clock.advanceTo(ts(3))
setupNextGoOnlineTimestamp(ts(4))
val startET = writer.start(None, None)
val startET = writer.start(None, SequencerWriter.ResetWatermarkToClockNow)
for {
_ <- allowScheduledFuturesToComplete
_ = writer.isRunning shouldBe false
_ = clock.advanceTo(ts(3)) // still not reached our online time
_ <- allowScheduledFuturesToComplete
_ = writer.isRunning shouldBe false
_ = startET.value.isCompleted shouldBe false
// finally reach our online timestamp
_ = clock.advanceTo(ts(4))
_ <- valueOrFail(startET)("Starting Sequencer Writer")
} yield writer.isRunning shouldBe true
}
@ -163,10 +153,11 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest {
)
for {
_ <- valueOrFail(writer.start(None, None))("Starting writer")
_ <- valueOrFail(writer.start(None, SequencerWriter.ResetWatermarkToClockNow))(
"Starting writer"
)
_ = writer.isRunning shouldBe true
// set the next goOffline timestamp to way in the future to delay the recovery so we can run assertions
_ = setupNextGoOnlineTimestamp(ts(10))
// have the writer flow blow up with an exception saying we've been knocked offline
_ = latestRunningWriterFlowPromise.failure(new SequencerOfflineException(42))
_ <- allowScheduledFuturesToComplete
@ -175,11 +166,10 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest {
// load balancers
sendError <- leftOrFail(writer.send(mockSubmissionRequest))("send when unavailable")
_ = sendError shouldBe SendAsyncError.Unavailable("Unavailable")
// now progress to allow crash recovery to complete
_ = clock.advanceTo(ts(10))
// there may be a number of future hops to work its way through completing the second flow which we currently
// can't capture via flushes, so just check it eventually happens
_ <- MonadUtil.sequentialTraverse(0 until 10)(_ => allowScheduledFuturesToComplete)
_ = writer.isRunning shouldBe true
} yield {
numberOfFlowsCreated shouldBe 2
writer.isRunning shouldBe true

View File

@ -27,6 +27,7 @@ trait DbSequencerStoreTest extends SequencerStoreTest with MultiTenantedSequence
MaxInClauseSize,
timeouts,
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)
)
behave like multiTenantedSequencerStore(() =>
@ -36,6 +37,7 @@ trait DbSequencerStoreTest extends SequencerStoreTest with MultiTenantedSequence
MaxInClauseSize,
timeouts,
loggerFactory,
unifiedSequencer = testedUseUnifiedSequencer,
)
)
}

View File

@ -461,7 +461,7 @@ trait SequencerStoreTest
}
"save payloads" should {
"return an error if there is a conflicting id" in {
"return an error if there is a conflicting id for database sequencer" onlyRunWhen (!testedUseUnifiedSequencer) in {
val env = Env()
val Seq(p1, p2, p3) =
0.until(3).map(n => Payload(PayloadId(ts(n)), ByteString.copyFromUtf8(n.toString)))
@ -477,6 +477,23 @@ trait SequencerStoreTest
)("savePayloads2")
} yield error shouldBe SavePayloadsError.ConflictingPayloadId(p2.id, instanceDiscriminator1)
}
"succeed on a conflicting payload id for unified sequencer" onlyRunWhen (testedUseUnifiedSequencer) in {
val env = Env()
val Seq(p1, p2, p3) =
0.until(3).map(n => Payload(PayloadId(ts(n)), ByteString.copyFromUtf8(n.toString)))
// we'll first write p1 and p2, which should work
// then write p2 and p3 with a separate instance discriminator; p2's id conflicts, but in unified sequencer mode this succeeds
for {
_ <- valueOrFail(env.store.savePayloads(NonEmpty(Seq, p1, p2), instanceDiscriminator1))(
"savePayloads1"
)
_ <- valueOrFail(
env.store.savePayloads(NonEmpty(Seq, p2, p3), instanceDiscriminator2)
)("savePayloads2")
} yield succeed
}
}
"counter checkpoints" should {
@ -1144,5 +1161,20 @@ trait SequencerStoreTest
}
}
}
"deleteEventsPastWatermark" should {
"return the watermark used for the deletion" in {
val testWatermark = CantonTimestamp.assertFromLong(1719841168208718L)
val env = Env()
import env.*
for {
_ <- saveWatermark(testWatermark).valueOrFail("saveWatermark")
watermark <- store.deleteEventsPastWatermark(0)
} yield {
watermark shouldBe Some(testWatermark)
}
}
}
}
}

View File

@ -9,7 +9,11 @@ import org.scalatest.wordspec.AsyncWordSpec
class SequencerStoreTestInMemory extends AsyncWordSpec with BaseTest with SequencerStoreTest {
"InMemorySequencerStore" should {
behave like sequencerStore(() =>
new InMemorySequencerStore(testedProtocolVersion, loggerFactory)
new InMemorySequencerStore(
protocolVersion = testedProtocolVersion,
unifiedSequencer = testedUseUnifiedSequencer,
loggerFactory = loggerFactory,
)
)
}
}

View File

@ -23,7 +23,6 @@ import io.opentelemetry.api.trace.Tracer
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.*
/** Wrapper and life-cycle manager for the in-memory Ledger API state. */
private[platform] class InMemoryState(
@ -83,6 +82,9 @@ object InMemoryState {
executionContext: ExecutionContext,
tracer: Tracer,
loggerFactory: NamedLoggerFactory,
)(
mutableLedgerEndCache: MutableLedgerEndCache,
stringInterningView: StringInterningView,
)(implicit traceContext: TraceContext): ResourceOwner[InMemoryState] = {
val initialLedgerEnd = LedgerEnd.beforeBegin
@ -95,10 +97,7 @@ object InMemoryState {
loggerFactory,
)
} yield new InMemoryState(
ledgerEndCache = MutableLedgerEndCache()
.tap(
_.set((initialLedgerEnd.lastOffset, initialLedgerEnd.lastEventSeqId))
),
ledgerEndCache = mutableLedgerEndCache,
dispatcherState = dispatcherState,
contractStateCaches = ContractStateCaches.build(
initialLedgerEnd.lastOffset,
@ -113,7 +112,7 @@ object InMemoryState {
maxBufferedChunkSize = bufferedStreamsPageSize,
loggerFactory = loggerFactory,
),
stringInterningView = new StringInterningView(loggerFactory),
stringInterningView = stringInterningView,
submissionTracker = submissionTracker,
commandProgressTracker = commandProgressTracker,
loggerFactory = loggerFactory,

View File

@ -9,6 +9,8 @@ import com.digitalasset.canton.metrics.LedgerApiServerMetrics
import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker
import com.digitalasset.canton.platform.config.IndexServiceConfig
import com.digitalasset.canton.platform.index.InMemoryStateUpdater
import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache
import com.digitalasset.canton.platform.store.interning.StringInterningView
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
@ -23,6 +25,9 @@ object LedgerApiServer {
executionContext: ExecutionContext,
tracer: Tracer,
loggerFactory: NamedLoggerFactory,
)(
mutableLedgerEndCache: MutableLedgerEndCache,
stringInterningView: StringInterningView,
)(implicit
traceContext: TraceContext
): ResourceOwner[(InMemoryState, InMemoryStateUpdater.UpdaterFlow)] = {
@ -40,7 +45,7 @@ object LedgerApiServer {
metrics = metrics,
tracer = tracer,
loggerFactory = loggerFactory,
)
)(mutableLedgerEndCache, stringInterningView)
inMemoryStateUpdater <- InMemoryStateUpdater.owner(
inMemoryState = inMemoryState,

View File

@ -32,29 +32,29 @@ class PackageUpgradeValidator(
)(implicit executionContext: ExecutionContext)
extends NamedLogging {
def validateUpgrade(upgradingPackage: (Ref.PackageId, Ast.Package))(implicit
def validateUpgrade(uploadedPackage: (Ref.PackageId, Ast.Package))(implicit
loggingContext: LoggingContextWithTrace
): EitherT[Future, DamlError, Unit] = {
val packageMap = getPackageMap(loggingContext)
val (upgradingPackageId, upgradingPackageAst) = upgradingPackage
val optUpgradingDar = Some(upgradingPackage)
val (uploadedPackageId, uploadedPackageAst) = uploadedPackage
val optUpgradingDar = Some(uploadedPackage)
logger.info(
s"Uploading DAR file for $upgradingPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}."
s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}."
)
existingVersionedPackageId(upgradingPackageAst, packageMap) match {
case Some(uploadedPackageId) =>
if (uploadedPackageId == upgradingPackageId) {
existingVersionedPackageId(uploadedPackageAst, packageMap) match {
case Some(existingPackageId) =>
if (existingPackageId == uploadedPackageId) {
logger.info(
s"Ignoring upload of package $upgradingPackageId as it has been previously uploaded"
s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded"
)
EitherT.rightT[Future, DamlError](())
} else {
EitherT.leftT[Future, Unit](
Validation.UpgradeVersion
.Error(
uploadedPackageId,
upgradingPackageId,
upgradingPackageAst.metadata.version,
uploadedPackageId = uploadedPackageId,
existingPackage = existingPackageId,
packageVersion = uploadedPackageAst.metadata.version,
): DamlError
)
}
@ -63,7 +63,7 @@ class PackageUpgradeValidator(
for {
optMaximalDar <- EitherT.right[DamlError](
maximalVersionedDar(
upgradingPackageAst,
uploadedPackageAst,
packageMap,
)
)
@ -73,14 +73,14 @@ class PackageUpgradeValidator(
optMaximalDar,
)
optMinimalDar <- EitherT.right[DamlError](
minimalVersionedDar(upgradingPackageAst, packageMap)
minimalVersionedDar(uploadedPackageAst, packageMap)
)
_ <- typecheckUpgrades(
TypecheckUpgrades.MinimalDarCheck,
optMinimalDar,
optUpgradingDar,
)
_ = logger.info(s"Typechecking upgrades for $upgradingPackageId succeeded.")
_ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.")
} yield ()
}
}
@ -165,7 +165,12 @@ class PackageUpgradeValidator(
.toEither
)
).leftMap[DamlError] {
case err: UpgradeError => Validation.Upgradeability.Error(newPkgId1, oldPkgId2, err)
case err: UpgradeError =>
Validation.Upgradeability.Error(
upgradingPackage = newPkgId1,
upgradedPackage = oldPkgId2,
upgradeError = err,
)
case unhandledErr =>
InternalError.Unhandled(
unhandledErr,

View File

@ -32,7 +32,9 @@ import com.digitalasset.canton.platform.store.DbSupport.{
DbConfig,
ParticipantDataSourceConfig,
}
import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache
import com.digitalasset.canton.platform.store.dao.events.{ContractLoader, LfValueTranslation}
import com.digitalasset.canton.platform.store.interning.StringInterningView
import com.digitalasset.canton.platform.store.packagemeta.PackageMetadata
import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced}
import com.digitalasset.daml.lf.data.Ref
@ -93,6 +95,8 @@ trait IndexComponentTest extends PekkoBeforeAndAfterAll with BaseTest {
val engine = new Engine(
EngineConfig(LanguageVersion.StableVersions(LanguageMajorVersion.V2))
)
val mutableLedgerEndCache = MutableLedgerEndCache()
val stringInterningView = new StringInterningView(loggerFactory)
val indexResourceOwner =
for {
@ -104,7 +108,7 @@ trait IndexComponentTest extends PekkoBeforeAndAfterAll with BaseTest {
executionContext = ec,
tracer = NoReportingTracerProvider.tracer,
loggerFactory = loggerFactory,
)
)(mutableLedgerEndCache, stringInterningView)
dbSupport <- DbSupport
.owner(
serverRole = ServerRole.ApiServer,

View File

@ -38,6 +38,7 @@ import com.digitalasset.canton.platform.store.DbSupport.{
ParticipantDataSourceConfig,
}
import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache
import com.digitalasset.canton.platform.store.interning.StringInterningView
import com.digitalasset.canton.tracing.TraceContext.{withNewTraceContext, wrapWithNewTraceContext}
import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced}
import com.digitalasset.canton.{HasExecutionContext, config}
@ -252,6 +253,8 @@ class RecoveringIndexerIntegrationSpec
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
mutableLedgerEndCache = MutableLedgerEndCache()
stringInterningView = new StringInterningView(loggerFactory)
(inMemoryState, inMemoryStateUpdaterFlow) <-
LedgerApiServer
.createInMemoryStateAndUpdater(
@ -262,7 +265,7 @@ class RecoveringIndexerIntegrationSpec
parallelExecutionContext,
tracer,
loggerFactory,
)
)(mutableLedgerEndCache, stringInterningView)
dbSupport <- DbSupport
.owner(
serverRole = ServerRole.Testing(getClass),

View File

@ -18,6 +18,8 @@ import com.digitalasset.canton.platform.indexer.{
IndexerStartupMode,
}
import com.digitalasset.canton.platform.store.DbSupport.ParticipantDataSourceConfig
import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache
import com.digitalasset.canton.platform.store.interning.StringInterningView
import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext
import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext}
import io.opentelemetry.api.trace.Tracer
@ -91,6 +93,8 @@ final class IndexerStabilityTestFixture(loggerFactory: NamedLoggerFactory) {
NoOpMetricsFactory,
)
}
mutableLedgerEndCache = MutableLedgerEndCache()
stringInterningView = new StringInterningView(loggerFactory)
(inMemoryState, inMemoryStateUpdaterFlow) <-
LedgerApiServer
.createInMemoryStateAndUpdater(
@ -101,7 +105,7 @@ final class IndexerStabilityTestFixture(loggerFactory: NamedLoggerFactory) {
executionContext,
tracer,
loggerFactory,
)
)(mutableLedgerEndCache, stringInterningView)
.acquire()
// Create an indexer and immediately start it

View File

@ -1,188 +1,191 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.ledger.indexerbenchmark
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.daml.metrics.api.{HistogramInventory, MetricName}
import com.daml.resources
import com.digitalasset.canton.concurrent.DirectExecutionContext
import com.digitalasset.canton.data.Offset
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.canton.ledger.participant.state.{
InternalStateServiceProviderImpl,
ReadService,
Update,
}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
import com.digitalasset.canton.platform.LedgerApiServer
import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker
import com.digitalasset.canton.platform.indexer.ha.HaConfig
import com.digitalasset.canton.platform.indexer.{Indexer, IndexerServiceOwner, JdbcIndexer}
import com.digitalasset.canton.platform.store.DbSupport.DataSourceProperties
import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext
import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced}
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Source
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.io.StdIn
// TODO(i12337): Use it or remove it
class IndexerBenchmark extends NamedLogging {
override val loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root
private val directEc = DirectExecutionContext(noTracingLogger)
def run(
createUpdates: () => Future[Source[(Offset, Traced[Update]), NotUsed]],
config: Config,
dataSourceProperties: DataSourceProperties,
highAvailability: HaConfig,
): Future[Unit] = {
withNewTraceContext { implicit traceContext =>
val system = ActorSystem("IndexerBenchmark")
implicit val materializer: Materializer = Materializer(system)
implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
val indexerExecutor = Executors.newWorkStealingPool()
val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor)
val tracer: Tracer = NoReportingTracerProvider.tracer
println("Generating state updates...")
val updates = Await.result(createUpdates(), Duration(10, "minute"))
println("Creating read service and indexer...")
val readService = createReadService(updates)
val metrics = new LedgerApiServerMetrics(
new LedgerApiServerHistograms(MetricName("noop"))(new HistogramInventory),
NoOpMetricsFactory,
)
val resource = for {
servicesExecutionContext <- ResourceOwner
.forExecutorService(() => Executors.newWorkStealingPool())
.map(ExecutionContext.fromExecutorService)
.acquire()
(inMemoryState, inMemoryStateUpdaterFlow) <-
LedgerApiServer
.createInMemoryStateAndUpdater(
CommandProgressTracker.NoOp,
config.indexServiceConfig,
256,
metrics,
indexerExecutionContext,
tracer,
loggerFactory,
)
.acquire()
indexerFactory = new JdbcIndexer.Factory(
config.participantId,
config.dataSource,
config.indexerConfig,
Set.empty,
readService,
metrics,
inMemoryState,
inMemoryStateUpdaterFlow,
servicesExecutionContext,
tracer,
loggerFactory,
dataSourceProperties,
highAvailability,
None,
)
_ = println("Setting up the index database...")
indexer <- indexer(config, indexerExecutionContext, indexerFactory)
_ = println("Starting the indexing...")
startTime = System.nanoTime()
handle <- indexer.acquire()
_ <- Resource.fromFuture(handle)
stopTime = System.nanoTime()
_ = println("Indexing done.")
_ <- Resource.fromFuture(system.terminate())
_ = indexerExecutor.shutdown()
} yield {
val result = new IndexerBenchmarkResult(
config,
metrics,
startTime,
stopTime,
)
println(result.banner)
// Note: this allows the user to inspect the contents of an ephemeral database
if (config.waitForUserInput) {
println(
s"Index database is still running at ${config.dataSource.jdbcUrl}."
)
StdIn.readLine("Press <enter> to terminate this process.").discard
}
if (result.failure) throw new RuntimeException("Indexer Benchmark failure.")
()
}
resource.asFuture
}
}
private def indexer(
config: Config,
indexerExecutionContext: ExecutionContextExecutor,
indexerFactory: JdbcIndexer.Factory,
)(implicit
traceContext: TraceContext,
rc: ResourceContext,
): resources.Resource[ResourceContext, Indexer] =
Await
.result(
IndexerServiceOwner
.migrateOnly(config.dataSource.jdbcUrl, loggerFactory)(rc.executionContext, traceContext)
.map(_ => indexerFactory.initialized(logger))(indexerExecutionContext),
Duration(5, "minute"),
)
.acquire()
private[this] def createReadService(
updates: Source[(Offset, Traced[Update]), NotUsed]
): ReadService = {
new ReadService with InternalStateServiceProviderImpl {
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit traceContext: TraceContext): Source[(Offset, Traced[Update]), NotUsed] = {
assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
updates
}
override def currentHealth(): HealthStatus = Healthy
}
}
def runAndExit(
config: Config,
dataSourceProperties: DataSourceProperties,
highAvailability: HaConfig,
updates: () => Future[Source[(Offset, Traced[Update]), NotUsed]],
): Unit = {
val result: Future[Unit] = new IndexerBenchmark()
.run(updates, config, dataSourceProperties, highAvailability)
.recover { case ex =>
logger.error("Error running benchmark", ex)(TraceContext.empty)
sys.exit(1)
}(directEc)
Await.result(result, Duration(100, "hour"))
println("Done.")
// We call sys.exit because some actor system or thread pool is still running, preventing a normal shutdown.
sys.exit(0)
}
}
//// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
//// SPDX-License-Identifier: Apache-2.0
//
//package com.digitalasset.canton.ledger.indexerbenchmark
//
//import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
//import com.daml.metrics.api.noop.NoOpMetricsFactory
//import com.daml.metrics.api.{HistogramInventory, MetricName}
//import com.daml.resources
//import com.digitalasset.canton.concurrent.DirectExecutionContext
//import com.digitalasset.canton.data.Offset
//import com.digitalasset.canton.discard.Implicits.DiscardOps
//import com.digitalasset.canton.ledger.api.health.{HealthStatus, Healthy}
//import com.digitalasset.canton.ledger.participant.state.{
// InternalStateServiceProviderImpl,
// ReadService,
// Update,
//}
//import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
//import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics}
//import com.digitalasset.canton.platform.LedgerApiServer
//import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker
//import com.digitalasset.canton.platform.indexer.ha.HaConfig
//import com.digitalasset.canton.platform.indexer.{Indexer, IndexerServiceOwner, JdbcIndexer}
//import com.digitalasset.canton.platform.store.DbSupport.DataSourceProperties
//import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext
//import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced}
//import io.opentelemetry.api.trace.Tracer
//import org.apache.pekko.NotUsed
//import org.apache.pekko.actor.ActorSystem
//import org.apache.pekko.stream.Materializer
//import org.apache.pekko.stream.scaladsl.Source
//
//import java.util.concurrent.Executors
//import scala.concurrent.duration.Duration
//import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
//import scala.io.StdIn
//
//class IndexerBenchmark extends NamedLogging {
//
// override val loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root
//
// private val directEc = DirectExecutionContext(noTracingLogger)
//
// def run(
// createUpdates: () => Future[Source[(Offset, Traced[Update]), NotUsed]],
// config: Config,
// dataSourceProperties: DataSourceProperties,
// highAvailability: HaConfig,
// ): Future[Unit] = {
// withNewTraceContext { implicit traceContext =>
// val system = ActorSystem("IndexerBenchmark")
// implicit val materializer: Materializer = Materializer(system)
// implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
//
// val indexerExecutor = Executors.newWorkStealingPool()
// val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor)
// val tracer: Tracer = NoReportingTracerProvider.tracer
//
// println("Generating state updates...")
// val updates = Await.result(createUpdates(), Duration(10, "minute"))
//
// println("Creating read service and indexer...")
// val readService = createReadService(updates)
// val metrics = new LedgerApiServerMetrics(
// new LedgerApiServerHistograms(MetricName("noop"))(new HistogramInventory),
// NoOpMetricsFactory,
// )
// val resource = for {
// servicesExecutionContext <- ResourceOwner
// .forExecutorService(() => Executors.newWorkStealingPool())
// .map(ExecutionContext.fromExecutorService)
// .acquire()
// (inMemoryState, inMemoryStateUpdaterFlow) <-
// LedgerApiServer
// .createInMemoryStateAndUpdater(
// CommandProgressTracker.NoOp,
// config.indexServiceConfig,
// 256,
// metrics,
// indexerExecutionContext,
// tracer,
// loggerFactory,
// )
// .acquire()
// indexerFactory = new JdbcIndexer.Factory(
// config.participantId,
// config.dataSource,
// config.indexerConfig,
// Set.empty,
// readService,
// metrics,
// inMemoryState,
// inMemoryStateUpdaterFlow,
// servicesExecutionContext,
// tracer,
// loggerFactory,
// dataSourceProperties,
// highAvailability,
// None,
// )
// _ = println("Setting up the index database...")
// indexer <- indexer(config, indexerExecutionContext, indexerFactory)
// _ = println("Starting the indexing...")
// startTime = System.nanoTime()
// handle <- indexer.acquire()
// _ <- Resource.fromFuture(handle)
// stopTime = System.nanoTime()
// _ = println("Indexing done.")
// _ <- Resource.fromFuture(system.terminate())
// _ = indexerExecutor.shutdown()
// } yield {
// val result = new IndexerBenchmarkResult(
// config,
// metrics,
// startTime,
// stopTime,
// )
//
// println(result.banner)
//
// // Note: this allows the user to inspect the contents of an ephemeral database
// if (config.waitForUserInput) {
// println(
// s"Index database is still running at ${config.dataSource.jdbcUrl}."
// )
// StdIn.readLine("Press <enter> to terminate this process.").discard
// }
//
// if (result.failure) throw new RuntimeException("Indexer Benchmark failure.")
// ()
// }
// resource.asFuture
// }
// }
//
// private def indexer(
// config: Config,
// indexerExecutionContext: ExecutionContextExecutor,
// indexerFactory: JdbcIndexer.Factory,
// )(implicit
// traceContext: TraceContext,
// rc: ResourceContext,
// ): resources.Resource[ResourceContext, Indexer] =
// Await
// .result(
// IndexerServiceOwner
// .migrateOnly(config.dataSource.jdbcUrl, loggerFactory)(rc.executionContext, traceContext)
// .map(_ => indexerFactory.initialized(logger))(indexerExecutionContext),
// Duration(5, "minute"),
// )
// .acquire()
//
// private[this] def createReadService(
// updates: Source[(Offset, Traced[Update]), NotUsed]
// ): ReadService = {
// new ReadService with InternalStateServiceProviderImpl {
// override def stateUpdates(
// beginAfter: Option[Offset]
// )(implicit traceContext: TraceContext): Source[(Offset, Traced[Update]), NotUsed] = {
// assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
// updates
// }
//
// override def currentHealth(): HealthStatus = Healthy
// }
// }
//
// def runAndExit(
// config: Config,
// dataSourceProperties: DataSourceProperties,
// highAvailability: HaConfig,
// updates: () => Future[Source[(Offset, Traced[Update]), NotUsed]],
// ): Unit = {
// val result: Future[Unit] = new IndexerBenchmark()
// .run(updates, config, dataSourceProperties, highAvailability)
// .recover { case ex =>
// logger.error("Error running benchmark", ex)(TraceContext.empty)
// sys.exit(1)
// }(directEc)
//
// Await.result(result, Duration(100, "hour"))
// println("Done.")
// // We call sys.exit because some actor system or thread pool is still running, preventing a normal shutdown.
// sys.exit(0)
// }
//}

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --enable-interfaces=yes
name: carbonv1-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --enable-interfaces=yes
name: carbonv2-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
name: experimental-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --enable-interfaces=yes
name: model-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
name: package-management-tests
source: .
version: 3.1.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --enable-interfaces=yes
name: semantic-tests

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
name: upgrade-tests
source: .
version: 1.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
name: upgrade-tests
source: .
version: 2.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
name: upgrade-tests
source: .
version: 3.0.0

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: JsonEncodingTest

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.dev
name: JsonEncodingTestDev

View File

@ -1,4 +1,4 @@
sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330
sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70
build-options:
- --target=2.1
name: AdminWorkflows

View File

@ -130,6 +130,7 @@ class CantonLedgerApiServerFactory(
futureSupervisor = futureSupervisor,
parameters = parameters,
excludedPackageIds = excludedPackageIds,
ledgerApiStore = participantNodePersistentState.map(_.ledgerApiStore),
)(executionContext, actorSystem)
.leftMap { err =>
// The MigrateOnEmptySchema exception is private, thus match on the expected message

View File

@ -543,11 +543,14 @@ class ParticipantNodeBootstrap(
persistentStateFactory.create(
syncDomainPersistentStateManager,
storage,
config.storage,
clock,
config.init.ledgerApi.maxDeduplicationDuration.toInternal.some,
parameterConfig.batchingConfig,
ReleaseProtocolVersion.latest,
arguments.metrics,
participantId.toLf,
config.ledgerApi,
indexedStringStore,
parameterConfig.processingTimeouts,
futureSupervisor,

View File

@ -3,6 +3,7 @@
package com.digitalasset.canton.participant.ledger.api
import cats.Eval
import cats.data.EitherT
import cats.syntax.either.*
import com.daml.tracing.DefaultOpenTelemetry
@ -24,10 +25,10 @@ import com.digitalasset.canton.platform.apiserver.*
import com.digitalasset.canton.platform.apiserver.meteringreport.MeteringReportKey
import com.digitalasset.canton.platform.indexer.IndexerConfig
import com.digitalasset.canton.platform.indexer.ha.HaConfig
import com.digitalasset.canton.platform.store.DbSupport
import com.digitalasset.canton.tracing.{NoTracing, TracerProvider}
import com.digitalasset.canton.{LedgerParticipantId, LfPackageId}
import com.digitalasset.daml.lf.engine.Engine
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.actor.ActorSystem
import scala.util.{Failure, Success}
@ -92,45 +93,22 @@ object CantonLedgerApiServerWrapper extends NoTracing {
startLedgerApiServer: Boolean,
futureSupervisor: FutureSupervisor,
excludedPackageIds: Set[LfPackageId],
ledgerApiStore: Eval[LedgerApiStore],
)(implicit
ec: ExecutionContextIdlenessExecutorService,
actorSystem: ActorSystem,
): EitherT[FutureUnlessShutdown, LedgerApiServerError, LedgerApiServerState] = {
val ledgerApiStorageE = LedgerApiStorage.fromStorageConfig(
config.storageConfig,
config.participantId,
)
EitherT
.fromEither[FutureUnlessShutdown](ledgerApiStorageE)
.flatMap { ledgerApiStorage =>
val connectionPoolConfig = DbSupport.ConnectionPoolConfig(
connectionPoolSize = config.storageConfig.numConnectionsLedgerApiServer.unwrap,
connectionTimeout = config.serverConfig.databaseConnectionTimeout.underlying,
)
val dbConfig = DbSupport.DbConfig(
jdbcUrl = ledgerApiStorage.jdbcUrl,
connectionPool = connectionPoolConfig,
postgres = config.serverConfig.postgresDataSource,
)
val participantDataSourceConfig =
DbSupport.ParticipantDataSourceConfig(ledgerApiStorage.jdbcUrl)
implicit val tracer = config.tracerProvider.tracer
implicit val tracer: Tracer = config.tracerProvider.tracer
val startableStoppableLedgerApiServer =
new StartableStoppableLedgerApiServer(
config = config,
participantDataSourceConfig = participantDataSourceConfig,
dbConfig = dbConfig,
telemetry = new DefaultOpenTelemetry(config.tracerProvider.openTelemetry),
futureSupervisor = futureSupervisor,
parameters = parameters,
commandProgressTracker = config.syncService.commandProgressTracker,
excludedPackageIds = excludedPackageIds,
ledgerApiStore = ledgerApiStore,
)
val startFUS = for {
_ <-
@ -143,7 +121,6 @@ object CantonLedgerApiServerWrapper extends NoTracing {
FutureUnlessShutdown.pure(
Either.right(
LedgerApiServerState(
ledgerApiStorage,
startableStoppableLedgerApiServer,
config.logger,
config.cantonParameterConfig.processingTimeouts,
@ -153,17 +130,15 @@ object CantonLedgerApiServerWrapper extends NoTracing {
case Failure(e) => FutureUnlessShutdown.pure(Left(FailedToStartLedgerApiServer(e)))
})
}
}
final case class LedgerApiServerState(
ledgerApiStorage: LedgerApiStorage,
startableStoppableLedgerApi: StartableStoppableLedgerApiServer,
override protected val logger: TracedLogger,
protected override val timeouts: ProcessingTimeout,
) extends FlagCloseable {
override protected def onClosed(): Unit =
Lifecycle.close(startableStoppableLedgerApi, ledgerApiStorage)(logger)
Lifecycle.close(startableStoppableLedgerApi)(logger)
override def toString: String = getClass.getSimpleName
}
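
The wrapper now receives the shared store as a cats Eval[LedgerApiStore] instead of building its own LedgerApiStorage and DbSupport. A minimal sketch of the Eval.later behaviour this wiring relies on; the ExpensiveStore type and values are illustrative assumptions, not Canton code:

import cats.Eval

object EvalSketch extends App {
  final case class ExpensiveStore(name: String)

  // Eval.later defers construction until the first `.value` call and memoizes
  // the result, so every later access reuses the same instance.
  val store: Eval[ExpensiveStore] = Eval.later {
    println("constructing store") // printed exactly once
    ExpensiveStore("ledger-api")
  }

  val first = store.value
  val second = store.value
  assert(first eq second) // same memoized instance
}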

View File

@ -0,0 +1,139 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.canton.participant.ledger.api
import com.daml.logging.entries.LoggingEntries
import com.daml.metrics.DatabaseMetrics
import com.digitalasset.canton.concurrent.ExecutionContextIdlenessExecutorService
import com.digitalasset.canton.config.{MemoryStorageConfig, ProcessingTimeout, StorageConfig}
import com.digitalasset.canton.logging.{LoggingContextWithTrace, NamedLoggerFactory}
import com.digitalasset.canton.metrics.LedgerApiServerMetrics
import com.digitalasset.canton.platform.ResourceCloseable
import com.digitalasset.canton.platform.config.ServerRole
import com.digitalasset.canton.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.digitalasset.canton.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache
import com.digitalasset.canton.platform.store.interning.StringInterningView
import com.digitalasset.canton.platform.store.{DbSupport, FlywayMigrations}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.{LedgerParticipantId, config}
import java.sql.Connection
import scala.concurrent.{ExecutionContext, Future}
final class LedgerApiStore(
val ledgerApiDbSupport: DbSupport,
val ledgerApiStorage: LedgerApiStorage,
val ledgerEndCache: MutableLedgerEndCache,
val stringInterningView: StringInterningView,
val metrics: LedgerApiServerMetrics,
val loggerFactory: NamedLoggerFactory,
val timeouts: ProcessingTimeout,
) extends ResourceCloseable {
private val parameterStorageBackend =
ledgerApiDbSupport.storageBackendFactory.createParameterStorageBackend
private val stringInterningStorageBackend =
ledgerApiDbSupport.storageBackendFactory.createStringInterningStorageBackend
private def executeSql[T](databaseMetrics: DatabaseMetrics)(
sql: Connection => T
)(implicit traceContext: TraceContext): Future[T] =
ledgerApiDbSupport.dbDispatcher.executeSql(databaseMetrics)(sql)(
new LoggingContextWithTrace(LoggingEntries.empty, traceContext)
)
def ledgerEnd(implicit traceContext: TraceContext): Future[LedgerEnd] =
executeSql(metrics.index.db.getLedgerEnd)(
parameterStorageBackend.ledgerEnd
)
private[api] def initializeInMemoryState(implicit
traceContext: TraceContext,
executionContext: ExecutionContext,
): Future[Unit] =
for {
currentLedgerEnd <- ledgerEnd
_ <- stringInterningView.update(currentLedgerEnd.lastStringInterningId)(
(fromExclusive, toInclusive) =>
executeSql(metrics.index.db.loadStringInterningEntries)(
stringInterningStorageBackend.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
)
)
} yield {
ledgerEndCache.set(
(
currentLedgerEnd.lastOffset,
currentLedgerEnd.lastEventSeqId,
)
)
}
}
object LedgerApiStore {
def initialize(
storageConfig: StorageConfig,
ledgerParticipantId: LedgerParticipantId,
ledgerApiDatabaseConnectionTimeout: config.NonNegativeFiniteDuration,
ledgerApiPostgresDataSourceConfig: PostgresDataSourceConfig,
timeouts: ProcessingTimeout,
loggerFactory: NamedLoggerFactory,
metrics: LedgerApiServerMetrics,
)(implicit
traceContext: TraceContext,
executionContext: ExecutionContextIdlenessExecutorService,
): Future[LedgerApiStore] = {
val initializationLogger = loggerFactory.getTracedLogger(LedgerApiStore.getClass)
val ledgerApiStorage = LedgerApiStorage
.fromStorageConfig(storageConfig, ledgerParticipantId)
.fold(
error => throw new IllegalStateException(s"Constructing LedgerApiStorage failed: $error"),
identity,
)
val dbConfig = DbSupport.DbConfig(
jdbcUrl = ledgerApiStorage.jdbcUrl,
connectionPool = DbSupport.ConnectionPoolConfig(
connectionPoolSize = storageConfig.numConnectionsLedgerApiServer.unwrap,
connectionTimeout = ledgerApiDatabaseConnectionTimeout.underlying,
),
postgres = ledgerApiPostgresDataSourceConfig,
)
val numLedgerApi = dbConfig.connectionPool.connectionPoolSize
initializationLogger.info(s"Creating ledger API storage, num-ledger-api: $numLedgerApi")
for {
_ <- storageConfig match {
// When Canton storage is in-memory, the ledger API server still runs on an H2 database, so its Flyway migrations must be applied here
case _: MemoryStorageConfig =>
new FlywayMigrations(
ledgerApiStorage.jdbcUrl,
loggerFactory,
).migrate()
case _ => Future.unit
}
ledgerApiStore <- DbSupport
.owner(
serverRole = ServerRole.ApiServer,
metrics = metrics,
dbConfig = dbConfig,
loggerFactory = loggerFactory,
)
.map(dbSupport =>
new LedgerApiStore(
ledgerApiDbSupport = dbSupport,
ledgerApiStorage = ledgerApiStorage,
ledgerEndCache = MutableLedgerEndCache(),
stringInterningView = new StringInterningView(loggerFactory),
metrics = metrics,
loggerFactory = loggerFactory,
timeouts = timeouts,
)
)
.acquireFlagCloseable("Ledger API DB Support")
_ <- ledgerApiStore.initializeInMemoryState
} yield ledgerApiStore
}
}
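
A minimal usage sketch for the new store, assuming a participant's storageConfig, participantId, ledgerApiConfig (a LedgerApiServerConfig), timeouts, loggerFactory and metrics are already in scope, together with an implicit TraceContext and ExecutionContextIdlenessExecutorService:

// Hedged sketch only; all referenced values are assumptions of the example.
val storeF: Future[LedgerApiStore] =
  LedgerApiStore.initialize(
    storageConfig = storageConfig,
    ledgerParticipantId = participantId,
    ledgerApiDatabaseConnectionTimeout = ledgerApiConfig.databaseConnectionTimeout,
    ledgerApiPostgresDataSourceConfig = ledgerApiConfig.postgresDataSource,
    timeouts = timeouts,
    loggerFactory = loggerFactory,
    metrics = metrics,
  )

// Once initialized, the store exposes the current ledger end, e.g.:
// storeF.flatMap(_.ledgerEnd).foreach(end => println(end.lastOffset))

Note that initialize also pre-populates the ledger end cache and the string interning view from the database (via initializeInMemoryState), so the ledger API server can later start against a consistent in-memory state.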

View File

@ -3,6 +3,7 @@
package com.digitalasset.canton.participant.ledger.api
import cats.Eval
import com.daml.executors.executors.{NamedExecutor, QueueAwareExecutor}
import com.daml.ledger.api.v2.experimental_features.ExperimentalCommandInspectionService
import com.daml.ledger.api.v2.state_service.GetActiveContractsResponse
@ -49,7 +50,7 @@ import com.digitalasset.canton.platform.apiserver.ratelimiting.{
ThreadpoolCheck,
}
import com.digitalasset.canton.platform.apiserver.{ApiServiceOwner, LedgerFeatures}
import com.digitalasset.canton.platform.config.{IdentityProviderManagementConfig, ServerRole}
import com.digitalasset.canton.platform.config.IdentityProviderManagementConfig
import com.digitalasset.canton.platform.index.IndexServiceOwner
import com.digitalasset.canton.platform.indexer.{
IndexerConfig,
@ -57,7 +58,6 @@ import com.digitalasset.canton.platform.indexer.{
IndexerStartupMode,
}
import com.digitalasset.canton.platform.store.DbSupport
import com.digitalasset.canton.platform.store.DbSupport.ParticipantDataSourceConfig
import com.digitalasset.canton.platform.store.dao.events.{ContractLoader, LfValueTranslation}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{FutureUtil, SimpleExecutionQueue}
@ -76,19 +76,16 @@ import scala.concurrent.Future
* depending on whether the participant node is a High Availability active or passive replica.
*
* @param config ledger api server configuration
* @param participantDataSourceConfig configuration for the data source (e.g., jdbc url)
* @param dbConfig the Index DB config
* @param executionContext the execution context
*/
class StartableStoppableLedgerApiServer(
config: CantonLedgerApiServerWrapper.Config,
participantDataSourceConfig: ParticipantDataSourceConfig,
dbConfig: DbSupport.DbConfig,
telemetry: Telemetry,
futureSupervisor: FutureSupervisor,
parameters: ParticipantNodeParameters,
commandProgressTracker: CommandProgressTracker,
excludedPackageIds: Set[LfPackageId],
ledgerApiStore: Eval[LedgerApiStore],
)(implicit
executionContext: ExecutionContextIdlenessExecutorService,
actorSystem: ActorSystem,
@ -178,7 +175,6 @@ class StartableStoppableLedgerApiServer(
private def buildLedgerApiServerOwner(
)(implicit traceContext: TraceContext) = {
implicit val loggingContextWithTrace: LoggingContextWithTrace =
LoggingContextWithTrace(loggerFactory, telemetry)
@ -188,8 +184,7 @@ class StartableStoppableLedgerApiServer(
case _ => IndexerStartupMode.JustStart
}
val numIndexer = config.indexerConfig.ingestionParallelism.unwrap
val numLedgerApi = dbConfig.connectionPool.connectionPoolSize
logger.info(s"Creating storage, num-indexer: $numIndexer, num-ledger-api: $numLedgerApi")
logger.info(s"Creating indexer storage, num-indexer: $numIndexer")
val indexServiceConfig = config.serverConfig.indexService
@ -210,6 +205,7 @@ class StartableStoppableLedgerApiServer(
override def bindService(): ServerServiceDefinition =
ApiInfoServiceGrpc.bindService(this, executionContext)
}
val dbSupport = ledgerApiStore.value.ledgerApiDbSupport
for {
(inMemoryState, inMemoryStateUpdaterFlow) <-
@ -221,19 +217,12 @@ class StartableStoppableLedgerApiServer(
executionContext,
tracer,
loggerFactory,
)
)(ledgerApiStore.value.ledgerEndCache, ledgerApiStore.value.stringInterningView)
timedWriteService = new TimedWriteService(config.syncService, config.metrics)
timedReadService = new TimedReadService(config.syncService, config.metrics)
dbSupport <- DbSupport
.owner(
serverRole = ServerRole.ApiServer,
metrics = config.metrics,
dbConfig = dbConfig,
loggerFactory = loggerFactory,
)
indexerHealth <- new IndexerServiceOwner(
config.participantId,
participantDataSourceConfig,
DbSupport.ParticipantDataSourceConfig(ledgerApiStore.value.ledgerApiStorage.jdbcUrl),
timedReadService,
config.indexerConfig,
config.metrics,
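
With this change the server no longer owns a DbSupport or a data source config; both are derived from the shared LedgerApiStore passed in as an Eval. A short sketch of that derivation, assuming ledgerApiStore is the constructor parameter shown above:

// Single source of truth for the ledger API database: the shared store
// supplies both the index DbSupport and the JDBC URL used by the indexer.
val store = ledgerApiStore.value
val dbSupport: DbSupport = store.ledgerApiDbSupport
val participantDataSourceConfig =
  DbSupport.ParticipantDataSourceConfig(store.ledgerApiStorage.jdbcUrl)

This way the connection pool can be created once with the participant's persistent state rather than being rebuilt on every ledger API server (re)start.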

View File

@ -1545,7 +1545,7 @@ abstract class ProtocolProcessor[
_ <- ifThenET(!cleanReplay) {
for {
_unit <- {
logger.debug(
logger.info(
show"Finalizing ${steps.requestKind.unquoted} request=${requestId.unwrap} with event $eventO."
)
@ -1573,6 +1573,7 @@ abstract class ProtocolProcessor[
requestCounter,
commitSet,
)
_ = logger.info(show"About to wrap up request ${requestId}")
requestTimestamp = requestId.unwrap
_unit <- EitherT.right[steps.ResultError](
FutureUnlessShutdown.outcomeF(
@ -1586,7 +1587,7 @@ abstract class ProtocolProcessor[
)
} yield pendingSubmissionDataO.foreach(steps.postProcessResult(verdict, _))
}
} yield ()
} yield logger.info(show"Finished async result processing of request ${requestId}")
}
private def checkContradictoryMediatorApprove(

View File

@ -6,8 +6,12 @@ package com.digitalasset.canton.participant.store
import cats.Eval
import cats.syntax.foldable.*
import com.daml.nameof.NameOf.functionFullName
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.config.{BatchingConfig, ProcessingTimeout}
import com.digitalasset.canton.LedgerParticipantId
import com.digitalasset.canton.concurrent.{
ExecutionContextIdlenessExecutorService,
FutureSupervisor,
}
import com.digitalasset.canton.config.{BatchingConfig, ProcessingTimeout, StorageConfig}
import com.digitalasset.canton.lifecycle.{
CloseContext,
FlagCloseable,
@ -20,6 +24,8 @@ import com.digitalasset.canton.logging.{
NamedLogging,
NamedLoggingContext,
}
import com.digitalasset.canton.participant.config.LedgerApiServerConfig
import com.digitalasset.canton.participant.ledger.api.LedgerApiStore
import com.digitalasset.canton.participant.metrics.ParticipantMetrics
import com.digitalasset.canton.participant.sync.SyncDomainPersistentStateLookup
import com.digitalasset.canton.resource.Storage
@ -32,7 +38,6 @@ import com.digitalasset.canton.util.{ErrorUtil, retry}
import com.digitalasset.canton.version.ReleaseProtocolVersion
import org.apache.pekko.stream.Materializer
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.*
/** Some of the state of a participant that is not tied to a domain and must survive restarts.
@ -41,6 +46,7 @@ import scala.concurrent.duration.*
*/
class ParticipantNodePersistentState private (
val settingsStore: ParticipantSettingsStore,
val ledgerApiStore: LedgerApiStore,
val participantEventLog: ParticipantEventLog,
val multiDomainEventLog: MultiDomainEventLog,
val inFlightSubmissionStore: InFlightSubmissionStore,
@ -58,6 +64,7 @@ class ParticipantNodePersistentState private (
inFlightSubmissionStore,
commandDeduplicationStore,
pruningStore,
ledgerApiStore,
)(logger)
}
@ -65,17 +72,20 @@ trait ParticipantNodePersistentStateFactory {
def create(
syncDomainPersistentStates: SyncDomainPersistentStateLookup,
storage: Storage,
storageConfig: StorageConfig,
clock: Clock,
maxDeduplicationDurationO: Option[NonNegativeFiniteDuration],
batching: BatchingConfig,
releaseProtocolVersion: ReleaseProtocolVersion,
metrics: ParticipantMetrics,
ledgerParticipantId: LedgerParticipantId,
ledgerApiServerConfig: LedgerApiServerConfig,
indexedStringStore: IndexedStringStore,
timeouts: ProcessingTimeout,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
)(implicit
ec: ExecutionContext,
ec: ExecutionContextIdlenessExecutorService,
mat: Materializer,
traceContext: TraceContext,
): FutureUnlessShutdown[Eval[ParticipantNodePersistentState]]
@ -85,28 +95,34 @@ object ParticipantNodePersistentStateFactory extends ParticipantNodePersistentSt
override def create(
syncDomainPersistentStates: SyncDomainPersistentStateLookup,
storage: Storage,
storageConfig: StorageConfig,
clock: Clock,
maxDeduplicationDurationO: Option[NonNegativeFiniteDuration],
batching: BatchingConfig,
releaseProtocolVersion: ReleaseProtocolVersion,
metrics: ParticipantMetrics,
ledgerParticipantId: LedgerParticipantId,
ledgerApiServerConfig: LedgerApiServerConfig,
indexedStringStore: IndexedStringStore,
timeouts: ProcessingTimeout,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
)(implicit
ec: ExecutionContext,
ec: ExecutionContextIdlenessExecutorService,
mat: Materializer,
traceContext: TraceContext,
): FutureUnlessShutdown[Eval[ParticipantNodePersistentState]] = ParticipantNodePersistentState
.create(
syncDomainPersistentStates,
storage,
storageConfig,
clock,
maxDeduplicationDurationO,
batching,
releaseProtocolVersion,
metrics,
ledgerParticipantId,
ledgerApiServerConfig,
indexedStringStore,
timeouts,
futureSupervisor,
@ -122,17 +138,20 @@ object ParticipantNodePersistentState extends HasLoggerName {
def create(
syncDomainPersistentStates: SyncDomainPersistentStateLookup,
storage: Storage,
storageConfig: StorageConfig,
clock: Clock,
maxDeduplicationDurationO: Option[NonNegativeFiniteDuration],
batching: BatchingConfig,
releaseProtocolVersion: ReleaseProtocolVersion,
metrics: ParticipantMetrics,
ledgerParticipantId: LedgerParticipantId,
ledgerApiServerConfig: LedgerApiServerConfig,
indexedStringStore: IndexedStringStore,
timeouts: ProcessingTimeout,
futureSupervisor: FutureSupervisor,
loggerFactory: NamedLoggerFactory,
)(implicit
ec: ExecutionContext,
ec: ExecutionContextIdlenessExecutorService,
mat: Materializer,
traceContext: TraceContext,
): FutureUnlessShutdown[ParticipantNodePersistentState] = {
@ -235,10 +254,22 @@ object ParticipantNodePersistentState extends HasLoggerName {
loggerFactory,
)
)
ledgerApiStore <- FutureUnlessShutdown.outcomeF(
LedgerApiStore.initialize(
storageConfig = storageConfig,
ledgerParticipantId = ledgerParticipantId,
ledgerApiDatabaseConnectionTimeout = ledgerApiServerConfig.databaseConnectionTimeout,
ledgerApiPostgresDataSourceConfig = ledgerApiServerConfig.postgresDataSource,
timeouts = timeouts,
loggerFactory = loggerFactory,
metrics = metrics.ledgerApiServer,
)
)
_ = flagCloseable.close()
} yield {
new ParticipantNodePersistentState(
settingsStore,
ledgerApiStore,
participantEventLog,
multiDomainEventLog,
inFlightSubmissionStore,
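
The ledger API store is now created together with the other participant-level stores and closed with them. A hedged sketch of the resulting read path; the helper and value names are assumptions, and ledgerEnd requires an implicit TraceContext:

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: the persistent state owns the LedgerApiStore, so the
// ledger end can be read without constructing a separate DbSupport.
def logLedgerEnd(
    state: ParticipantNodePersistentState
)(implicit tc: TraceContext, ec: ExecutionContext): Future[Unit] =
  state.ledgerApiStore.ledgerEnd.map { end =>
    println(s"ledger end offset: ${end.lastOffset}, event sequential id: ${end.lastEventSeqId}")
  }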

View File

@ -14,6 +14,7 @@ import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt}
import com.digitalasset.canton.config.{
BatchingConfig,
CachingConfigs,
CommunityStorageConfig,
DefaultProcessingTimeouts,
ProcessingTimeout,
TestingConfigInternal,
@ -26,6 +27,7 @@ import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.ledger.participant.state.CompletionInfo
import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, UnlessShutdown}
import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.participant.config.LedgerApiServerConfig
import com.digitalasset.canton.participant.metrics.ParticipantTestMetrics
import com.digitalasset.canton.participant.protocol.EngineController.EngineAbortStatus
import com.digitalasset.canton.participant.protocol.Phase37Synchronizer.RequestOutcome
@ -258,11 +260,14 @@ class ProtocolProcessorTest
.create(
syncDomainPersistentStates,
new MemoryStorage(loggerFactory, timeouts),
CommunityStorageConfig.Memory(),
clock,
None,
BatchingConfig(),
testedReleaseProtocolVersion,
ParticipantTestMetrics,
participant.toLf,
LedgerApiServerConfig(),
indexedStringStore,
timeouts,
futureSupervisor,

View File

@ -1 +1 @@
20240629.13565.vdf3ebde5
20240701.13579.vfc58fab6

View File

@ -773,7 +773,7 @@ abstract class UpgradesSpec(val suffix: String)
case _ => {}
}
cantonLogSrc should include regex (
s"KNOWN_DAR_VERSION\\(.+,.+\\): A DAR with the same version number has previously been uploaded. err-context:\\{existingPackage=$testPackageV2Id, location=.+, packageVersion=$packageVersion, uploadedPackageId=$testPackageV1Id\\}"
s"KNOWN_DAR_VERSION\\(.+,.+\\): A DAR with the same version number has previously been uploaded. err-context:\\{existingPackage=$testPackageV1Id, location=.+, packageVersion=$packageVersion, uploadedPackageId=$testPackageV2Id\\}"
)
uploadV2Result match {
case None =>