diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/util/ResourceUtil.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/util/ResourceUtil.scala index 3ba54381fea..3d2e96b2ae7 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/util/ResourceUtil.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/util/ResourceUtil.scala @@ -63,7 +63,8 @@ object ResourceUtil { )(implicit M: MonadThrow[M], TM: Thereafter[M]): M[V] = { import Thereafter.syntax.* import cats.syntax.flatMap.* - MonadThrow[M].fromTry(Try(f(r))).flatten.thereafter(_ => r.close()) + val resource: T = r + MonadThrow[M].fromTry(Try(f(resource))).flatten.thereafter(_ => resource.close()) } } diff --git a/sdk/canton/community/common/src/main/daml/CantonExamples/daml.yaml b/sdk/canton/community/common/src/main/daml/CantonExamples/daml.yaml index e3f84151532..b74e1086fe1 100644 --- a/sdk/canton/community/common/src/main/daml/CantonExamples/daml.yaml +++ b/sdk/canton/community/common/src/main/daml/CantonExamples/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: CantonExamples diff --git a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/domain/grpc/SequencerInfoLoaderTest.scala b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/domain/grpc/SequencerInfoLoaderTest.scala index 306fddbc35a..08c5c1ee374 100644 --- a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/domain/grpc/SequencerInfoLoaderTest.scala +++ b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/domain/grpc/SequencerInfoLoaderTest.scala @@ -224,8 +224,7 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext loggerFactory = loggerFactory, ) // Futures in loadSequencerInfoAsync can race such that more than the expected tolerance can be returned. - // Increase to 2 if result size checks are flaky - val toleranceForRaciness = 1 + val toleranceForRaciness = 3 "return complete results when requested" in { val scs = sequencerConnections(10) diff --git a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/util/ResourceUtilTest.scala b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/util/ResourceUtilTest.scala index 3a18046faf1..243a05e0c9b 100644 --- a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/util/ResourceUtilTest.scala +++ b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/util/ResourceUtilTest.scala @@ -7,6 +7,7 @@ import cats.data.EitherT import com.digitalasset.canton.{BaseTest, HasExecutionContext} import org.scalatest.wordspec.AnyWordSpec +import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Future class ResourceUtilTest extends AnyWordSpec with BaseTest with HasExecutionContext { @@ -223,6 +224,25 @@ class ResourceUtilTest extends AnyWordSpec with BaseTest with HasExecutionContex .getSuppressed()(0) shouldBe TestException("Something happened when closing") } + "create a resource only once" in { + val counter = new AtomicInteger(0) + + def newResource() = new AutoCloseable { + + // Increment the counter when the resource is created + counter.incrementAndGet() + + override def close(): Unit = () + } + + ResourceUtil + .withResourceEitherT(newResource())(_ => EitherTUtil.unit[String]) + .value + .futureValue + + counter.get() shouldBe 1 + } + } } } diff --git a/sdk/canton/community/demo/src/main/daml/ai-analysis/daml.yaml b/sdk/canton/community/demo/src/main/daml/ai-analysis/daml.yaml index 259019ad9c8..a2423f37802 100644 --- a/sdk/canton/community/demo/src/main/daml/ai-analysis/daml.yaml +++ b/sdk/canton/community/demo/src/main/daml/ai-analysis/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: ai-analysis diff --git a/sdk/canton/community/demo/src/main/daml/bank/daml.yaml b/sdk/canton/community/demo/src/main/daml/bank/daml.yaml index dd522de36bc..9c69e74b64a 100644 --- a/sdk/canton/community/demo/src/main/daml/bank/daml.yaml +++ b/sdk/canton/community/demo/src/main/daml/bank/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: bank diff --git a/sdk/canton/community/demo/src/main/daml/doctor/daml.yaml b/sdk/canton/community/demo/src/main/daml/doctor/daml.yaml index 9684a374f66..38b688f00dc 100644 --- a/sdk/canton/community/demo/src/main/daml/doctor/daml.yaml +++ b/sdk/canton/community/demo/src/main/daml/doctor/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: doctor diff --git a/sdk/canton/community/demo/src/main/daml/health-insurance/daml.yaml b/sdk/canton/community/demo/src/main/daml/health-insurance/daml.yaml index 813c9ec2412..bbc2283f0bc 100644 --- a/sdk/canton/community/demo/src/main/daml/health-insurance/daml.yaml +++ b/sdk/canton/community/demo/src/main/daml/health-insurance/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: health-insurance diff --git a/sdk/canton/community/demo/src/main/daml/medical-records/daml.yaml b/sdk/canton/community/demo/src/main/daml/medical-records/daml.yaml index 0f7c3d6762f..c6a591757f2 100644 --- a/sdk/canton/community/demo/src/main/daml/medical-records/daml.yaml +++ b/sdk/canton/community/demo/src/main/daml/medical-records/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: medical-records diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/BlockSequencerStateManager.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/BlockSequencerStateManager.scala index fc2b1a35334..2da5b74fcdd 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/BlockSequencerStateManager.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/BlockSequencerStateManager.scala @@ -295,6 +295,8 @@ class BlockSequencerStateManager( s"and ${chunk.inFlightAggregationUpdates.size} in-flight aggregation updates" )(handleChunkUpdate(priorHead, chunk, dbSequencerIntegration)(traceContext)) case complete: CompleteBlockUpdate => + // TODO(#18401): Consider: wait for the DBS watermark to be updated to the blocks last timestamp + // in a supervisory manner, to detect things not functioning properly LoggerUtil.clueF( s"Storing completion of block $currentBlockNumber" )(handleComplete(priorHead, complete.block)(traceContext)) diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/SequencerBlockStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/SequencerBlockStore.scala index aa132ec4379..35ae7cdd8ec 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/SequencerBlockStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/SequencerBlockStore.scala @@ -306,6 +306,7 @@ object SequencerBlockStore { enableAdditionalConsistencyChecks: Boolean, checkedInvariant: Option[Member], loggerFactory: NamedLoggerFactory, + unifiedSequencer: Boolean, )(implicit executionContext: ExecutionContext ): SequencerBlockStore = @@ -320,6 +321,7 @@ object SequencerBlockStore { enableAdditionalConsistencyChecks, checkedInvariant, loggerFactory, + unifiedSequencer = unifiedSequencer, ) } diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/db/DbSequencerBlockStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/db/DbSequencerBlockStore.scala index f67e92ea32f..0d7b67bc50b 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/db/DbSequencerBlockStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/data/db/DbSequencerBlockStore.scala @@ -30,6 +30,7 @@ import com.digitalasset.canton.domain.sequencing.sequencer.{ InternalSequencerPruningStatus, } import com.digitalasset.canton.logging.NamedLoggerFactory +import com.digitalasset.canton.resource.DbStorage.Profile.{H2, Oracle, Postgres} import com.digitalasset.canton.resource.IdempotentInsert.insertVerifyingConflicts import com.digitalasset.canton.resource.{DbStorage, DbStore} import com.digitalasset.canton.sequencing.OrdinarySerializedEvent @@ -54,6 +55,7 @@ class DbSequencerBlockStore( enableAdditionalConsistencyChecks: Boolean, private val checkedInvariant: Option[Member], override protected val loggerFactory: NamedLoggerFactory, + unifiedSequencer: Boolean, )(implicit override protected val executionContext: ExecutionContext) extends SequencerBlockStore with DbStore { @@ -74,7 +76,19 @@ class DbSequencerBlockStore( override def readHead(implicit traceContext: TraceContext): Future[BlockEphemeralState] = storage.query( for { - blockInfoO <- readLatestBlockInfo() + blockInfoO <- { + if (unifiedSequencer) { + for { + watermark <- safeWaterMarkDBIO + blockInfoO <- watermark match { + case Some(watermark) => findBlockContainingTimestamp(watermark) + case None => readLatestBlockInfo() + } + } yield blockInfoO + } else { + readLatestBlockInfo() + } + } state <- blockInfoO match { case None => DBIO.successful(BlockEphemeralState.empty) case Some(blockInfo) => @@ -92,16 +106,32 @@ class DbSequencerBlockStore( functionFullName, ) + private def safeWaterMarkDBIO: DBIOAction[Option[CantonTimestamp], NoStream, Effect.Read] = { + val query = storage.profile match { + case _: H2 | _: Postgres => + // TODO(#18401): Below only works for a single instance database sequencer + sql"select min(watermark_ts) from sequencer_watermarks" + case _: Oracle => + sql"select min(watermark_ts) from sequencer_watermarks" + } + // `min` may return null that is wrapped into None + query.as[Option[CantonTimestamp]].headOption.map(_.flatten) + } + + private def findBlockContainingTimestamp( + timestamp: CantonTimestamp + ): DBIOAction[Option[BlockInfo], NoStream, Effect.Read] = + (sql"""select height, latest_event_ts, latest_sequencer_event_ts from seq_block_height where latest_event_ts >= $timestamp order by height """ ++ topRow) + .as[BlockInfo] + .headOption + override def readStateForBlockContainingTimestamp( timestamp: CantonTimestamp )(implicit traceContext: TraceContext): EitherT[Future, InvalidTimestamp, BlockEphemeralState] = EitherT( storage.query( for { - heightAndTimestamp <- - (sql"""select height, latest_event_ts, latest_sequencer_event_ts from seq_block_height where latest_event_ts >= $timestamp order by height """ ++ topRow) - .as[BlockInfo] - .headOption + heightAndTimestamp <- findBlockContainingTimestamp(timestamp) state <- heightAndTimestamp match { case None => DBIO.successful(Left(InvalidTimestamp(timestamp))) case Some(block) => readAtBlock(block).map(Right.apply) diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencer.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencer.scala index 2bb711285d3..e24c9a7aad3 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencer.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/DatabaseSequencer.scala @@ -14,6 +14,7 @@ import com.digitalasset.canton.crypto.DomainSyncCryptoClient import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.domain.metrics.SequencerMetrics import com.digitalasset.canton.domain.sequencing.sequencer.Sequencer.RegisterError +import com.digitalasset.canton.domain.sequencing.sequencer.SequencerWriter.ResetWatermark import com.digitalasset.canton.domain.sequencing.sequencer.errors.* import com.digitalasset.canton.domain.sequencing.sequencer.store.SequencerStore.SequencerPruningResult import com.digitalasset.canton.domain.sequencing.sequencer.store.* @@ -174,7 +175,7 @@ class DatabaseSequencer( protected val memberValidator: SequencerMemberValidator = store - protected def resetWatermarkTo: Option[CantonTimestamp] = None + protected def resetWatermarkTo: ResetWatermark = SequencerWriter.ResetWatermarkToClockNow // Only start pruning scheduler after `store` variable above has been initialized to avoid racy NPE withNewTraceContext { implicit traceContext => diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerFactory.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerFactory.scala index ee5ee5de45f..6e1e10164ef 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerFactory.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerFactory.scala @@ -85,6 +85,8 @@ abstract class DatabaseSequencerFactory( DefaultMaxSqlInListSize, timeouts, loggerFactory, + unifiedSequencer = + true, // // TODO(#18401): does not affect the usage below, but should be correctly set // At the moment this store instance is only used for the sequencer initialization, // if it is retrying a db operation and the factory is closed, the store will be closed as well; // if request succeeds, the store will no be retrying and doesn't need to be closed diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriter.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriter.scala index 2f352866abc..2daba18f452 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriter.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriter.scala @@ -14,6 +14,7 @@ import com.digitalasset.canton.config import com.digitalasset.canton.config.ProcessingTimeout import com.digitalasset.canton.config.RequireTypes.{PositiveInt, PositiveNumeric} import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.domain.sequencing.sequencer.SequencerWriter.ResetWatermark import com.digitalasset.canton.domain.sequencing.sequencer.WriterStartupError.FailedToInitializeFromSnapshot import com.digitalasset.canton.domain.sequencing.sequencer.store.* import com.digitalasset.canton.health.admin.data.SequencerHealthStatus @@ -148,6 +149,7 @@ class SequencerWriter( maxSqlInListSize, timeouts, loggerFactory, + unifiedSequencer = unifiedSequencer, // Overriding the store's close context with the writers, so that when the writer gets closed, the store // stops retrying forever overrideCloseContext = Some(this.closeContext), @@ -236,7 +238,7 @@ class SequencerWriter( */ def startOrLogError( initialSnapshot: Option[SequencerInitialState], - resetWatermarkTo: => Option[CantonTimestamp], + resetWatermarkTo: => ResetWatermark, )(implicit traceContext: TraceContext): Future[Unit] = start(initialSnapshot, resetWatermarkTo).fold( err => logger.error(s"Failed to startup sequencer writer: $err"), @@ -244,8 +246,9 @@ class SequencerWriter( ) def start( + // TODO(#18401): Move initialization from snapshot into the sequencer factory initialSnapshot: Option[SequencerInitialState] = None, - resetWatermarkTo: => Option[CantonTimestamp], + resetWatermarkTo: => ResetWatermark, )(implicit traceContext: TraceContext): EitherT[Future, WriterStartupError, Unit] = performUnlessClosingEitherT[WriterStartupError, Unit]( functionFullName, @@ -282,19 +285,21 @@ class SequencerWriter( _ <- expectedCommitMode .fold(EitherTUtil.unit[String])(writerStore.validateCommitMode) .leftMap(WriterStartupError.BadCommitMode) - resetWatermarkToO = resetWatermarkTo + resetWatermarkToValue = resetWatermarkTo _ <- { - resetWatermarkToO - .fold(EitherTUtil.unit[String]) { watermark => + (resetWatermarkToValue match { + case SequencerWriter.ResetWatermarkToClockNow | + SequencerWriter.DoNotResetWatermark => + EitherT.pure[Future, String](()) + case SequencerWriter.ResetWatermarkToTimestamp(timestamp) => logger.debug( - s"Resetting the watermark to an externally passed value of $watermark" + s"Resetting the watermark to the externally passed timestamp of $timestamp" ) - writerStore.resetWatermark(watermark).leftMap(_.toString) - } - .leftMap(WriterStartupError.WatermarkResetError) + writerStore.resetWatermark(timestamp).leftMap(_.toString) + }).leftMap(WriterStartupError.WatermarkResetError) } onlineTimestamp <- EitherT.right[WriterStartupError]( - runRecovery(writerStore, resetWatermarkToO) + runRecovery(writerStore, resetWatermarkToValue) ) _ <- EitherT.right[WriterStartupError](waitForOnline(onlineTimestamp)) } yield () @@ -348,11 +353,23 @@ class SequencerWriter( @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf")) private def runRecovery( store: SequencerWriterStore, - resetWatermarkTo: Option[CantonTimestamp], + resetWatermarkTo: ResetWatermark, )(implicit traceContext: TraceContext): Future[CantonTimestamp] = for { - _ <- store.deleteEventsPastWatermark() - onlineTimestamp <- store.goOnline(resetWatermarkTo.getOrElse(clock.now)) + pastWatermarkO <- store.deleteEventsPastWatermark() + goOnlineAt = resetWatermarkTo match { + case SequencerWriter.ResetWatermarkToClockNow => + clock.now + case SequencerWriter.ResetWatermarkToTimestamp(timestamp) => + timestamp + case SequencerWriter.DoNotResetWatermark => + // Used in unified sequencer mode only, do not jump to now + // the default value ensures that we are not ahead of BlockSequencer's timestamps, when rehydrating + pastWatermarkO.getOrElse(CantonTimestamp.MinValue) + } + onlineTimestamp <- store.goOnline( + goOnlineAt + ) // actual online timestamp depends on other instances _ = if (clock.isSimClock && clock.now < onlineTimestamp) { logger.debug(s"The sequencer will not start unless sim clock moves to $onlineTimestamp") logger.debug( @@ -411,7 +428,7 @@ class SequencerWriter( .getAndSet(None) .parTraverse_(_.close()) .recover { case NonFatal(e) => - logger.debug("Failed to close running writer", e) + logger.debug("Running writer will be recovered, due to non-fatal error:", e) } // determine whether we can run recovery or not @@ -429,7 +446,9 @@ class SequencerWriter( FutureUtil.doNotAwait( // Wait for the writer store to be closed before re-starting, otherwise we might end up with // concurrent write stores trying to connect to the DB within the same sequencer node - closed.flatMap(_ => startOrLogError(None, None)(traceContext)), + closed.flatMap(_ => + startOrLogError(None, SequencerWriter.DoNotResetWatermark)(traceContext) + ), "SequencerWriter recovery", ) } else { @@ -516,4 +535,9 @@ object SequencerWriter { ) } + sealed trait ResetWatermark + final case object DoNotResetWatermark extends ResetWatermark + final case object ResetWatermarkToClockNow extends ResetWatermark + final case class ResetWatermarkToTimestamp(timestamp: CantonTimestamp) extends ResetWatermark + } diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencer.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencer.scala index f2c9d36e992..c98c5665540 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencer.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencer.scala @@ -138,9 +138,8 @@ class BlockSequencer( override private[sequencing] def firstSequencerCounterServeableForSequencer: SequencerCounter = stateManager.firstSequencerCounterServableForSequencer - override protected def resetWatermarkTo: Option[CantonTimestamp] = { - Some(stateManager.getHeadState.block.lastTs) - } + override protected def resetWatermarkTo: SequencerWriter.ResetWatermark = + SequencerWriter.ResetWatermarkToTimestamp(stateManager.getHeadState.block.lastTs) private val (killSwitchF, localEventsQueue, done) = { val headState = stateManager.getHeadState diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencerFactory.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencerFactory.scala index 9bd6fd9b725..a63722c70af 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencerFactory.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/block/BlockSequencerFactory.scala @@ -71,6 +71,7 @@ abstract class BlockSequencerFactory( nodeParameters.enableAdditionalConsistencyChecks && !nodeParameters.useUnifiedSequencer )(sequencerId), loggerFactory, + unifiedSequencer = nodeParameters.useUnifiedSequencer, ) private val trafficPurchasedStore = TrafficPurchasedStore( diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStore.scala index ad0ec278c6f..6d904a994a7 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStore.scala @@ -59,6 +59,7 @@ class DbSequencerStore( maxInClauseSize: PositiveNumeric[Int], override protected val timeouts: ProcessingTimeout, override protected val loggerFactory: NamedLoggerFactory, + unifiedSequencer: Boolean, overrideCloseContext: Option[CloseContext] = None, )(protected implicit val executionContext: ExecutionContext) extends SequencerStore @@ -392,6 +393,30 @@ class DbSequencerStore( functionFullName, ) + /** In unified sequencer payload ids are deterministic (these are sequencing times from the block sequencer), + * so we can somewhat safely ignore conflicts arising from sequencer restarts, crash recovery, ha lock loses, + * unlike the complicate `savePayloads` method below + */ + private def savePayloadsUS(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit + traceContext: TraceContext + ): EitherT[Future, SavePayloadsError, Unit] = { + val insertSql = + """insert into sequencer_payloads (id, instance_discriminator, content) values (?, ?, ?) on conflict do nothing""" + + EitherT.right[SavePayloadsError]( + storage + .queryAndUpdate( + DbStorage.bulkOperation(insertSql, payloads, storage.profile) { pp => payload => + pp >> payload.id.unwrap + pp >> instanceDiscriminator + pp >> payload.content + }, + functionFullName, + ) + .map(_ => ()) + ) + } + /** Save the provided payloads to the store. * * For DB implementations we suspect that this will be a hot spot for performance primarily due to size of the payload @@ -424,7 +449,10 @@ class DbSequencerStore( * - Finally we filter to payloads that haven't yet been successfully inserted and go back to the first step attempting * to reinsert just this subset. */ - override def savePayloads(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit + private def savePayloadsResolvingConflicts( + payloads: NonEmpty[Seq[Payload]], + instanceDiscriminator: UUID, + )(implicit traceContext: TraceContext ): EitherT[Future, SavePayloadsError, Unit] = { @@ -544,6 +572,15 @@ class DbSequencerStore( go(payloads) } + override def savePayloads(payloads: NonEmpty[Seq[Payload]], instanceDiscriminator: UUID)(implicit + traceContext: TraceContext + ): EitherT[Future, SavePayloadsError, Unit] = + if (unifiedSequencer) { + savePayloadsUS(payloads, instanceDiscriminator) + } else { + savePayloadsResolvingConflicts(payloads, instanceDiscriminator) + } + override def saveEvents(instanceIndex: Int, events: NonEmpty[Seq[Sequenced[PayloadId]]])(implicit traceContext: TraceContext ): Future[Unit] = { @@ -987,21 +1024,32 @@ class DbSequencerStore( override def deleteEventsPastWatermark( instanceIndex: Int - )(implicit traceContext: TraceContext): Future[Unit] = + )(implicit traceContext: TraceContext): Future[Option[CantonTimestamp]] = for { + watermarkO <- storage.query( + sql"select watermark_ts from sequencer_watermarks where node_index = $instanceIndex" + .as[CantonTimestamp] + .headOption, + functionFullName, + ) + watermark = watermarkO.getOrElse(CantonTimestamp.MinValue) + // TODO(#18401): Also cleanup payloads (beyond the payload to event margin) eventsRemoved <- storage.update( { sqlu""" delete from sequencer_events where node_index = $instanceIndex - and ts > coalesce((select watermark_ts from sequencer_watermarks where node_index = $instanceIndex), -1) + and ts > $watermark """ }, functionFullName, ) - } yield logger.debug( - s"Removed at least $eventsRemoved that were past the last watermark for this sequencer" - ) + } yield { + logger.debug( + s"Removed at least $eventsRemoved that were past the last watermark ($watermarkO) for this sequencer" + ) + watermarkO + } override def saveCounterCheckpoint( memberId: SequencerMemberId, diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/InMemorySequencerStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/InMemorySequencerStore.scala index 08f5256f1d2..abced82e065 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/InMemorySequencerStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/InMemorySequencerStore.scala @@ -43,6 +43,7 @@ class UniqueKeyViolationException(message: String) extends RuntimeException(mess class InMemorySequencerStore( protocolVersion: ProtocolVersion, + unifiedSequencer: Boolean, protected val loggerFactory: NamedLoggerFactory, )(implicit protected val executionContext: ExecutionContext @@ -99,8 +100,13 @@ class InMemorySequencerStore( .flatMap { existingPayload => // if we found an existing payload it must have a matching instance discriminator if (existingPayload.instanceDiscriminator == instanceDiscriminator) None // no error - else - SavePayloadsError.ConflictingPayloadId(id, existingPayload.instanceDiscriminator).some + else { + if (unifiedSequencer) { + None + } else { + SavePayloadsError.ConflictingPayloadId(id, existingPayload.instanceDiscriminator).some + } + } } .toLeft(()) .leftWiden[SavePayloadsError] @@ -212,8 +218,8 @@ class InMemorySequencerStore( /** No implementation as only required for crash recovery */ override def deleteEventsPastWatermark(instanceIndex: Int)(implicit traceContext: TraceContext - ): Future[Unit] = - Future.unit + ): Future[Option[CantonTimestamp]] = + Future.successful(watermark.get().map(_.timestamp)) override def saveCounterCheckpoint( memberId: SequencerMemberId, diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStore.scala index d679212ea6a..1e3a88d9612 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStore.scala @@ -567,10 +567,11 @@ trait SequencerStore extends SequencerMemberValidator with NamedLogging with Aut /** Delete all events that are ahead of the watermark of this sequencer. * These events will not have been read and should be removed before returning the sequencer online. * Should not be called alongside updating the watermark for this sequencer and only while the sequencer is offline. + * Returns the watermark that was used for the deletion. */ def deleteEventsPastWatermark(instanceIndex: Int)(implicit traceContext: TraceContext - ): Future[Unit] + ): Future[Option[CantonTimestamp]] /** Save a checkpoint that as of a certain timestamp the member has this counter value. * Any future subscriptions can then use this as a starting point for serving their event stream rather than starting from 0. @@ -827,10 +828,16 @@ object SequencerStore { maxInClauseSize: PositiveNumeric[Int], timeouts: ProcessingTimeout, loggerFactory: NamedLoggerFactory, + unifiedSequencer: Boolean, overrideCloseContext: Option[CloseContext] = None, )(implicit executionContext: ExecutionContext): SequencerStore = storage match { - case _: MemoryStorage => new InMemorySequencerStore(protocolVersion, loggerFactory) + case _: MemoryStorage => + new InMemorySequencerStore( + protocolVersion, + unifiedSequencer = unifiedSequencer, + loggerFactory, + ) case dbStorage: DbStorage => new DbSequencerStore( dbStorage, @@ -838,6 +845,7 @@ object SequencerStore { maxInClauseSize, timeouts, loggerFactory, + unifiedSequencer = unifiedSequencer, overrideCloseContext, ) } diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerWriterStore.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerWriterStore.scala index 1c74c0a7972..64dd18a1da8 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerWriterStore.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerWriterStore.scala @@ -92,8 +92,11 @@ trait SequencerWriterStore extends AutoCloseable { /** Delete all events that are ahead of the watermark of this sequencer. * These events will not have been read and should be removed before returning the sequencer online. * Should not be called alongside updating the watermark for this sequencer and only while the sequencer is offline. + * Returns the watermark that was used for the deletion. */ - def deleteEventsPastWatermark()(implicit traceContext: TraceContext): Future[Unit] = + def deleteEventsPastWatermark()(implicit + traceContext: TraceContext + ): Future[Option[CantonTimestamp]] = store.deleteEventsPastWatermark(instanceIndex) } diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerReaderTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerReaderTest.scala index 0677c5ed7b5..06faf56a90a 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerReaderTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerReaderTest.scala @@ -116,7 +116,11 @@ class SequencerReaderTest extends FixtureAsyncWordSpec with BaseTest { new AtomicBoolean(true) // should the latest timestamp be added to the signaller when stored val actorSystem: ActorSystem = ActorSystem(classOf[SequencerReaderTest].getSimpleName) implicit val materializer: Materializer = Materializer(actorSystem) - val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory) + val store = new InMemorySequencerStore( + protocolVersion = testedProtocolVersion, + unifiedSequencer = testedUseUnifiedSequencer, + loggerFactory = loggerFactory, + ) val instanceIndex: Int = 0 // create a spy so we can add verifications on how many times methods were called val storeSpy: InMemorySequencerStore = spy[InMemorySequencerStore](store) diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerTest.scala index fc2f0f558bd..013f10a8c5a 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerTest.scala @@ -73,7 +73,11 @@ class SequencerTest extends FixtureAsyncWordSpec with BaseTest with HasExecution Some(parallelExecutionContext), ) private val materializer = implicitly[Materializer] - val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory) + val store = new InMemorySequencerStore( + protocolVersion = testedProtocolVersion, + unifiedSequencer = testedUseUnifiedSequencer, + loggerFactory = loggerFactory, + ) val clock = new WallClock(timeouts, loggerFactory = loggerFactory) val crypto: DomainSyncCryptoClient = valueOrFail( TestingTopology(sequencerGroup = diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterSourceTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterSourceTest.scala index b15b59612a4..534b0d148a2 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterSourceTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterSourceTest.scala @@ -84,7 +84,9 @@ class SequencerWriterSourceTest extends AsyncWordSpec with BaseTest with HasExec class InMemoryStoreWithTimeAdvancement(lFactory: NamedLoggerFactory)(implicit ec: ExecutionContext - ) extends InMemorySequencerStore(testedProtocolVersion, lFactory)(ec) { + ) extends InMemorySequencerStore(testedProtocolVersion, testedUseUnifiedSequencer, lFactory)( + ec + ) { private val timeAdvancement = new AtomicReference[java.time.Duration](java.time.Duration.ZERO) def setClockAdvanceBeforeSavePayloads(duration: java.time.Duration): Unit = diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterTest.scala index 3dbbeb74e0c..bd08ad65b9a 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/SequencerWriterTest.scala @@ -25,7 +25,6 @@ import com.digitalasset.canton.time.SimClock import com.digitalasset.canton.topology.DefaultTestIdentities import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.MonadUtil -import org.mockito.ArgumentMatchers import org.scalatest.FutureOutcome import org.scalatest.wordspec.FixtureAsyncWordSpec @@ -60,8 +59,11 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest { val clock = new SimClock(loggerFactory = loggerFactory) val runningFlows = mutable.Buffer[MockRunningWriterFlow]() val storage = new MemoryStorage(loggerFactory, timeouts) - val store = new InMemorySequencerStore(testedProtocolVersion, loggerFactory) - val storeSpy = spy(store) + val store = new InMemorySequencerStore( + protocolVersion = testedProtocolVersion, + unifiedSequencer = testedUseUnifiedSequencer, + loggerFactory = loggerFactory, + ) val instanceIndex = 0 val storageFactory = new MockWriterStoreFactory() @@ -82,11 +84,6 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest { override val generalStore: SequencerStore = store } - def setupNextGoOnlineTimestamp(ts: CantonTimestamp): Unit = - when( - storeSpy.goOnline(ArgumentMatchers.eq(instanceIndex), any[CantonTimestamp])(anyTraceContext) - ) - .thenReturn(Future.successful(ts)) def numberOfFlowsCreated: Int = runningFlows.size def latestRunningWriterFlowPromise: Promise[Unit] = @@ -127,21 +124,14 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest { "wait for online timestamp to be reached" in { env => import env.* - // set our time to ts2 but the returned goOnline timestamp to ts4 - clock.advanceTo(ts(3)) - setupNextGoOnlineTimestamp(ts(4)) - - val startET = writer.start(None, None) + val startET = writer.start(None, SequencerWriter.ResetWatermarkToClockNow) for { _ <- allowScheduledFuturesToComplete _ = writer.isRunning shouldBe false - _ = clock.advanceTo(ts(3)) // still not reached our online time _ <- allowScheduledFuturesToComplete _ = writer.isRunning shouldBe false _ = startET.value.isCompleted shouldBe false - // finally reach our online timestamp - _ = clock.advanceTo(ts(4)) _ <- valueOrFail(startET)("Starting Sequencer Writer") } yield writer.isRunning shouldBe true } @@ -163,10 +153,11 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest { ) for { - _ <- valueOrFail(writer.start(None, None))("Starting writer") + _ <- valueOrFail(writer.start(None, SequencerWriter.ResetWatermarkToClockNow))( + "Starting writer" + ) _ = writer.isRunning shouldBe true - // set the next goOffline timestamp to way in the future to delay the recovery so we can run assertions - _ = setupNextGoOnlineTimestamp(ts(10)) + // have the writer flow blow up with an exception saying we've been knocked offline _ = latestRunningWriterFlowPromise.failure(new SequencerOfflineException(42)) _ <- allowScheduledFuturesToComplete @@ -175,11 +166,10 @@ class SequencerWriterTest extends FixtureAsyncWordSpec with BaseTest { // load balancers sendError <- leftOrFail(writer.send(mockSubmissionRequest))("send when unavailable") _ = sendError shouldBe SendAsyncError.Unavailable("Unavailable") - // now progress to allow crash recovery to complete - _ = clock.advanceTo(ts(10)) // there may be a number of future hops to work its way through completing the second flow which we currently // can't capture via flushes, so just check it eventually happens _ <- MonadUtil.sequentialTraverse(0 until 10)(_ => allowScheduledFuturesToComplete) + _ = writer.isRunning shouldBe true } yield { numberOfFlowsCreated shouldBe 2 writer.isRunning shouldBe true diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStoreTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStoreTest.scala index 8d0f343d40c..931d6c4e7fb 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStoreTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/DbSequencerStoreTest.scala @@ -27,6 +27,7 @@ trait DbSequencerStoreTest extends SequencerStoreTest with MultiTenantedSequence MaxInClauseSize, timeouts, loggerFactory, + unifiedSequencer = testedUseUnifiedSequencer, ) ) behave like multiTenantedSequencerStore(() => @@ -36,6 +37,7 @@ trait DbSequencerStoreTest extends SequencerStoreTest with MultiTenantedSequence MaxInClauseSize, timeouts, loggerFactory, + unifiedSequencer = testedUseUnifiedSequencer, ) ) } diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTest.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTest.scala index 18d723ec81d..e9bf9cce46f 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTest.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTest.scala @@ -461,7 +461,7 @@ trait SequencerStoreTest } "save payloads" should { - "return an error if there is a conflicting id" in { + "return an error if there is a conflicting id for database sequencer" onlyRunWhen (!testedUseUnifiedSequencer) in { val env = Env() val Seq(p1, p2, p3) = 0.until(3).map(n => Payload(PayloadId(ts(n)), ByteString.copyFromUtf8(n.toString))) @@ -477,6 +477,23 @@ trait SequencerStoreTest )("savePayloads2") } yield error shouldBe SavePayloadsError.ConflictingPayloadId(p2.id, instanceDiscriminator1) } + + "succeed on a conflicting payload id for unified sequencer" onlyRunWhen (testedUseUnifiedSequencer) in { + val env = Env() + val Seq(p1, p2, p3) = + 0.until(3).map(n => Payload(PayloadId(ts(n)), ByteString.copyFromUtf8(n.toString))) + + // we'll first write p1 and p2 that should work + // then write p2 and p3 with a separate instance discriminator which should fail due to a conflicting id + for { + _ <- valueOrFail(env.store.savePayloads(NonEmpty(Seq, p1, p2), instanceDiscriminator1))( + "savePayloads1" + ) + _ <- valueOrFail( + env.store.savePayloads(NonEmpty(Seq, p2, p3), instanceDiscriminator2) + )("savePayloads2") + } yield succeed + } } "counter checkpoints" should { @@ -1144,5 +1161,20 @@ trait SequencerStoreTest } } } + + "deleteEventsPastWatermark" should { + "return the watermark used for the deletion" in { + val testWatermark = CantonTimestamp.assertFromLong(1719841168208718L) + val env = Env() + import env.* + + for { + _ <- saveWatermark(testWatermark).valueOrFail("saveWatermark") + watermark <- store.deleteEventsPastWatermark(0) + } yield { + watermark shouldBe Some(testWatermark) + } + } + } } } diff --git a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTestInMemory.scala b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTestInMemory.scala index 79663a937bb..c0ad568c7b3 100644 --- a/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTestInMemory.scala +++ b/sdk/canton/community/domain/src/test/scala/com/digitalasset/canton/domain/sequencing/sequencer/store/SequencerStoreTestInMemory.scala @@ -9,7 +9,11 @@ import org.scalatest.wordspec.AsyncWordSpec class SequencerStoreTestInMemory extends AsyncWordSpec with BaseTest with SequencerStoreTest { "InMemorySequencerStore" should { behave like sequencerStore(() => - new InMemorySequencerStore(testedProtocolVersion, loggerFactory) + new InMemorySequencerStore( + protocolVersion = testedProtocolVersion, + unifiedSequencer = testedUseUnifiedSequencer, + loggerFactory = loggerFactory, + ) ) } } diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/InMemoryState.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/InMemoryState.scala index 37a8e4a53fa..5c1283c46fd 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/InMemoryState.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/InMemoryState.scala @@ -23,7 +23,6 @@ import io.opentelemetry.api.trace.Tracer import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -import scala.util.chaining.* /** Wrapper and life-cycle manager for the in-memory Ledger API state. */ private[platform] class InMemoryState( @@ -83,6 +82,9 @@ object InMemoryState { executionContext: ExecutionContext, tracer: Tracer, loggerFactory: NamedLoggerFactory, + )( + mutableLedgerEndCache: MutableLedgerEndCache, + stringInterningView: StringInterningView, )(implicit traceContext: TraceContext): ResourceOwner[InMemoryState] = { val initialLedgerEnd = LedgerEnd.beforeBegin @@ -95,10 +97,7 @@ object InMemoryState { loggerFactory, ) } yield new InMemoryState( - ledgerEndCache = MutableLedgerEndCache() - .tap( - _.set((initialLedgerEnd.lastOffset, initialLedgerEnd.lastEventSeqId)) - ), + ledgerEndCache = mutableLedgerEndCache, dispatcherState = dispatcherState, contractStateCaches = ContractStateCaches.build( initialLedgerEnd.lastOffset, @@ -113,7 +112,7 @@ object InMemoryState { maxBufferedChunkSize = bufferedStreamsPageSize, loggerFactory = loggerFactory, ), - stringInterningView = new StringInterningView(loggerFactory), + stringInterningView = stringInterningView, submissionTracker = submissionTracker, commandProgressTracker = commandProgressTracker, loggerFactory = loggerFactory, diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/LedgerApiServer.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/LedgerApiServer.scala index e06e6fe2e47..85ebe629401 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/LedgerApiServer.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/LedgerApiServer.scala @@ -9,6 +9,8 @@ import com.digitalasset.canton.metrics.LedgerApiServerMetrics import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker import com.digitalasset.canton.platform.config.IndexServiceConfig import com.digitalasset.canton.platform.index.InMemoryStateUpdater +import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache +import com.digitalasset.canton.platform.store.interning.StringInterningView import com.digitalasset.canton.tracing.TraceContext import io.opentelemetry.api.trace.Tracer @@ -23,6 +25,9 @@ object LedgerApiServer { executionContext: ExecutionContext, tracer: Tracer, loggerFactory: NamedLoggerFactory, + )( + mutableLedgerEndCache: MutableLedgerEndCache, + stringInterningView: StringInterningView, )(implicit traceContext: TraceContext ): ResourceOwner[(InMemoryState, InMemoryStateUpdater.UpdaterFlow)] = { @@ -40,7 +45,7 @@ object LedgerApiServer { metrics = metrics, tracer = tracer, loggerFactory = loggerFactory, - ) + )(mutableLedgerEndCache, stringInterningView) inMemoryStateUpdater <- InMemoryStateUpdater.owner( inMemoryState = inMemoryState, diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala index e61d69132a8..5074be983e6 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala @@ -32,29 +32,29 @@ class PackageUpgradeValidator( )(implicit executionContext: ExecutionContext) extends NamedLogging { - def validateUpgrade(upgradingPackage: (Ref.PackageId, Ast.Package))(implicit + def validateUpgrade(uploadedPackage: (Ref.PackageId, Ast.Package))(implicit loggingContext: LoggingContextWithTrace ): EitherT[Future, DamlError, Unit] = { val packageMap = getPackageMap(loggingContext) - val (upgradingPackageId, upgradingPackageAst) = upgradingPackage - val optUpgradingDar = Some(upgradingPackage) + val (uploadedPackageId, uploadedPackageAst) = uploadedPackage + val optUpgradingDar = Some(uploadedPackage) logger.info( - s"Uploading DAR file for $upgradingPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." + s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." ) - existingVersionedPackageId(upgradingPackageAst, packageMap) match { - case Some(uploadedPackageId) => - if (uploadedPackageId == upgradingPackageId) { + existingVersionedPackageId(uploadedPackageAst, packageMap) match { + case Some(existingPackageId) => + if (existingPackageId == uploadedPackageId) { logger.info( - s"Ignoring upload of package $upgradingPackageId as it has been previously uploaded" + s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded" ) EitherT.rightT[Future, DamlError](()) } else { EitherT.leftT[Future, Unit]( Validation.UpgradeVersion .Error( - uploadedPackageId, - upgradingPackageId, - upgradingPackageAst.metadata.version, + uploadedPackageId = uploadedPackageId, + existingPackage = existingPackageId, + packageVersion = uploadedPackageAst.metadata.version, ): DamlError ) } @@ -63,7 +63,7 @@ class PackageUpgradeValidator( for { optMaximalDar <- EitherT.right[DamlError]( maximalVersionedDar( - upgradingPackageAst, + uploadedPackageAst, packageMap, ) ) @@ -73,14 +73,14 @@ class PackageUpgradeValidator( optMaximalDar, ) optMinimalDar <- EitherT.right[DamlError]( - minimalVersionedDar(upgradingPackageAst, packageMap) + minimalVersionedDar(uploadedPackageAst, packageMap) ) _ <- typecheckUpgrades( TypecheckUpgrades.MinimalDarCheck, optMinimalDar, optUpgradingDar, ) - _ = logger.info(s"Typechecking upgrades for $upgradingPackageId succeeded.") + _ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.") } yield () } } @@ -165,7 +165,12 @@ class PackageUpgradeValidator( .toEither ) ).leftMap[DamlError] { - case err: UpgradeError => Validation.Upgradeability.Error(newPkgId1, oldPkgId2, err) + case err: UpgradeError => + Validation.Upgradeability.Error( + upgradingPackage = newPkgId1, + upgradedPackage = oldPkgId2, + upgradeError = err, + ) case unhandledErr => InternalError.Unhandled( unhandledErr, diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/IndexComponentTest.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/IndexComponentTest.scala index da0c04a82c7..efa5bae01c2 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/IndexComponentTest.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/IndexComponentTest.scala @@ -32,7 +32,9 @@ import com.digitalasset.canton.platform.store.DbSupport.{ DbConfig, ParticipantDataSourceConfig, } +import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache import com.digitalasset.canton.platform.store.dao.events.{ContractLoader, LfValueTranslation} +import com.digitalasset.canton.platform.store.interning.StringInterningView import com.digitalasset.canton.platform.store.packagemeta.PackageMetadata import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced} import com.digitalasset.daml.lf.data.Ref @@ -93,6 +95,8 @@ trait IndexComponentTest extends PekkoBeforeAndAfterAll with BaseTest { val engine = new Engine( EngineConfig(LanguageVersion.StableVersions(LanguageMajorVersion.V2)) ) + val mutableLedgerEndCache = MutableLedgerEndCache() + val stringInterningView = new StringInterningView(loggerFactory) val indexResourceOwner = for { @@ -104,7 +108,7 @@ trait IndexComponentTest extends PekkoBeforeAndAfterAll with BaseTest { executionContext = ec, tracer = NoReportingTracerProvider.tracer, loggerFactory = loggerFactory, - ) + )(mutableLedgerEndCache, stringInterningView) dbSupport <- DbSupport .owner( serverRole = ServerRole.ApiServer, diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/RecoveringIndexerIntegrationSpec.scala index 0a18b615fc7..7c1f4735d74 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -38,6 +38,7 @@ import com.digitalasset.canton.platform.store.DbSupport.{ ParticipantDataSourceConfig, } import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache +import com.digitalasset.canton.platform.store.interning.StringInterningView import com.digitalasset.canton.tracing.TraceContext.{withNewTraceContext, wrapWithNewTraceContext} import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced} import com.digitalasset.canton.{HasExecutionContext, config} @@ -252,6 +253,8 @@ class RecoveringIndexerIntegrationSpec servicesExecutionContext <- ResourceOwner .forExecutorService(() => Executors.newWorkStealingPool()) .map(ExecutionContext.fromExecutorService) + mutableLedgerEndCache = MutableLedgerEndCache() + stringInterningView = new StringInterningView(loggerFactory) (inMemoryState, inMemoryStateUpdaterFlow) <- LedgerApiServer .createInMemoryStateAndUpdater( @@ -262,7 +265,7 @@ class RecoveringIndexerIntegrationSpec parallelExecutionContext, tracer, loggerFactory, - ) + )(mutableLedgerEndCache, stringInterningView) dbSupport <- DbSupport .owner( serverRole = ServerRole.Testing(getClass), diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/ha/IndexerStabilityTestFixture.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/ha/IndexerStabilityTestFixture.scala index dbdac171f8d..b0d7dd3f82e 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/ha/IndexerStabilityTestFixture.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/indexer/ha/IndexerStabilityTestFixture.scala @@ -18,6 +18,8 @@ import com.digitalasset.canton.platform.indexer.{ IndexerStartupMode, } import com.digitalasset.canton.platform.store.DbSupport.ParticipantDataSourceConfig +import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache +import com.digitalasset.canton.platform.store.interning.StringInterningView import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext} import io.opentelemetry.api.trace.Tracer @@ -91,6 +93,8 @@ final class IndexerStabilityTestFixture(loggerFactory: NamedLoggerFactory) { NoOpMetricsFactory, ) } + mutableLedgerEndCache = MutableLedgerEndCache() + stringInterningView = new StringInterningView(loggerFactory) (inMemoryState, inMemoryStateUpdaterFlow) <- LedgerApiServer .createInMemoryStateAndUpdater( @@ -101,7 +105,7 @@ final class IndexerStabilityTestFixture(loggerFactory: NamedLoggerFactory) { executionContext, tracer, loggerFactory, - ) + )(mutableLedgerEndCache, stringInterningView) .acquire() // Create an indexer and immediately start it diff --git a/sdk/canton/community/ledger/ledger-api-tools/src/main/scala/com/digitalasset/canton/ledger/indexerbenchmark/IndexerBenchmark.scala b/sdk/canton/community/ledger/ledger-api-tools/src/main/scala/com/digitalasset/canton/ledger/indexerbenchmark/IndexerBenchmark.scala index 1ce4f401613..d8e67f262fc 100644 --- a/sdk/canton/community/ledger/ledger-api-tools/src/main/scala/com/digitalasset/canton/ledger/indexerbenchmark/IndexerBenchmark.scala +++ b/sdk/canton/community/ledger/ledger-api-tools/src/main/scala/com/digitalasset/canton/ledger/indexerbenchmark/IndexerBenchmark.scala @@ -1,188 +1,191 @@ // Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -package com.digitalasset.canton.ledger.indexerbenchmark - -import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} -import com.daml.metrics.api.noop.NoOpMetricsFactory -import com.daml.metrics.api.{HistogramInventory, MetricName} -import com.daml.resources -import com.digitalasset.canton.concurrent.DirectExecutionContext -import com.digitalasset.canton.data.Offset -import com.digitalasset.canton.discard.Implicits.DiscardOps -import com.digitalasset.canton.ledger.api.health.{HealthStatus, Healthy} -import com.digitalasset.canton.ledger.participant.state.{ - InternalStateServiceProviderImpl, - ReadService, - Update, -} -import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics} -import com.digitalasset.canton.platform.LedgerApiServer -import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker -import com.digitalasset.canton.platform.indexer.ha.HaConfig -import com.digitalasset.canton.platform.indexer.{Indexer, IndexerServiceOwner, JdbcIndexer} -import com.digitalasset.canton.platform.store.DbSupport.DataSourceProperties -import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext -import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced} -import io.opentelemetry.api.trace.Tracer -import org.apache.pekko.NotUsed -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.Materializer -import org.apache.pekko.stream.scaladsl.Source - -import java.util.concurrent.Executors -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} -import scala.io.StdIn - // TODO(i12337): Use it or remove it -class IndexerBenchmark extends NamedLogging { - - override val loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root - - private val directEc = DirectExecutionContext(noTracingLogger) - - def run( - createUpdates: () => Future[Source[(Offset, Traced[Update]), NotUsed]], - config: Config, - dataSourceProperties: DataSourceProperties, - highAvailability: HaConfig, - ): Future[Unit] = { - withNewTraceContext { implicit traceContext => - val system = ActorSystem("IndexerBenchmark") - implicit val materializer: Materializer = Materializer(system) - implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher) - - val indexerExecutor = Executors.newWorkStealingPool() - val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor) - val tracer: Tracer = NoReportingTracerProvider.tracer - - println("Generating state updates...") - val updates = Await.result(createUpdates(), Duration(10, "minute")) - - println("Creating read service and indexer...") - val readService = createReadService(updates) - val metrics = new LedgerApiServerMetrics( - new LedgerApiServerHistograms(MetricName("noop"))(new HistogramInventory), - NoOpMetricsFactory, - ) - val resource = for { - servicesExecutionContext <- ResourceOwner - .forExecutorService(() => Executors.newWorkStealingPool()) - .map(ExecutionContext.fromExecutorService) - .acquire() - (inMemoryState, inMemoryStateUpdaterFlow) <- - LedgerApiServer - .createInMemoryStateAndUpdater( - CommandProgressTracker.NoOp, - config.indexServiceConfig, - 256, - metrics, - indexerExecutionContext, - tracer, - loggerFactory, - ) - .acquire() - indexerFactory = new JdbcIndexer.Factory( - config.participantId, - config.dataSource, - config.indexerConfig, - Set.empty, - readService, - metrics, - inMemoryState, - inMemoryStateUpdaterFlow, - servicesExecutionContext, - tracer, - loggerFactory, - dataSourceProperties, - highAvailability, - None, - ) - _ = println("Setting up the index database...") - indexer <- indexer(config, indexerExecutionContext, indexerFactory) - _ = println("Starting the indexing...") - startTime = System.nanoTime() - handle <- indexer.acquire() - _ <- Resource.fromFuture(handle) - stopTime = System.nanoTime() - _ = println("Indexing done.") - _ <- Resource.fromFuture(system.terminate()) - _ = indexerExecutor.shutdown() - } yield { - val result = new IndexerBenchmarkResult( - config, - metrics, - startTime, - stopTime, - ) - - println(result.banner) - - // Note: this allows the user to inspect the contents of an ephemeral database - if (config.waitForUserInput) { - println( - s"Index database is still running at ${config.dataSource.jdbcUrl}." - ) - StdIn.readLine("Press to terminate this process.").discard - } - - if (result.failure) throw new RuntimeException("Indexer Benchmark failure.") - () - } - resource.asFuture - } - } - - private def indexer( - config: Config, - indexerExecutionContext: ExecutionContextExecutor, - indexerFactory: JdbcIndexer.Factory, - )(implicit - traceContext: TraceContext, - rc: ResourceContext, - ): resources.Resource[ResourceContext, Indexer] = - Await - .result( - IndexerServiceOwner - .migrateOnly(config.dataSource.jdbcUrl, loggerFactory)(rc.executionContext, traceContext) - .map(_ => indexerFactory.initialized(logger))(indexerExecutionContext), - Duration(5, "minute"), - ) - .acquire() - - private[this] def createReadService( - updates: Source[(Offset, Traced[Update]), NotUsed] - ): ReadService = { - new ReadService with InternalStateServiceProviderImpl { - override def stateUpdates( - beginAfter: Option[Offset] - )(implicit traceContext: TraceContext): Source[(Offset, Traced[Update]), NotUsed] = { - assert(beginAfter.isEmpty, s"beginAfter is $beginAfter") - updates - } - - override def currentHealth(): HealthStatus = Healthy - } - } - - def runAndExit( - config: Config, - dataSourceProperties: DataSourceProperties, - highAvailability: HaConfig, - updates: () => Future[Source[(Offset, Traced[Update]), NotUsed]], - ): Unit = { - val result: Future[Unit] = new IndexerBenchmark() - .run(updates, config, dataSourceProperties, highAvailability) - .recover { case ex => - logger.error("Error running benchmark", ex)(TraceContext.empty) - sys.exit(1) - }(directEc) - - Await.result(result, Duration(100, "hour")) - println("Done.") - // We call sys.exit because some actor system or thread pool is still running, preventing a normal shutdown. - sys.exit(0) - } -} +//// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +//// SPDX-License-Identifier: Apache-2.0 +// +//package com.digitalasset.canton.ledger.indexerbenchmark +// +//import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +//import com.daml.metrics.api.noop.NoOpMetricsFactory +//import com.daml.metrics.api.{HistogramInventory, MetricName} +//import com.daml.resources +//import com.digitalasset.canton.concurrent.DirectExecutionContext +//import com.digitalasset.canton.data.Offset +//import com.digitalasset.canton.discard.Implicits.DiscardOps +//import com.digitalasset.canton.ledger.api.health.{HealthStatus, Healthy} +//import com.digitalasset.canton.ledger.participant.state.{ +// InternalStateServiceProviderImpl, +// ReadService, +// Update, +//} +//import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} +//import com.digitalasset.canton.metrics.{LedgerApiServerHistograms, LedgerApiServerMetrics} +//import com.digitalasset.canton.platform.LedgerApiServer +//import com.digitalasset.canton.platform.apiserver.execution.CommandProgressTracker +//import com.digitalasset.canton.platform.indexer.ha.HaConfig +//import com.digitalasset.canton.platform.indexer.{Indexer, IndexerServiceOwner, JdbcIndexer} +//import com.digitalasset.canton.platform.store.DbSupport.DataSourceProperties +//import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext +//import com.digitalasset.canton.tracing.{NoReportingTracerProvider, TraceContext, Traced} +//import io.opentelemetry.api.trace.Tracer +//import org.apache.pekko.NotUsed +//import org.apache.pekko.actor.ActorSystem +//import org.apache.pekko.stream.Materializer +//import org.apache.pekko.stream.scaladsl.Source +// +//import java.util.concurrent.Executors +//import scala.concurrent.duration.Duration +//import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} +//import scala.io.StdIn +// +//class IndexerBenchmark extends NamedLogging { +// +// override val loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root +// +// private val directEc = DirectExecutionContext(noTracingLogger) +// +// def run( +// createUpdates: () => Future[Source[(Offset, Traced[Update]), NotUsed]], +// config: Config, +// dataSourceProperties: DataSourceProperties, +// highAvailability: HaConfig, +// ): Future[Unit] = { +// withNewTraceContext { implicit traceContext => +// val system = ActorSystem("IndexerBenchmark") +// implicit val materializer: Materializer = Materializer(system) +// implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher) +// +// val indexerExecutor = Executors.newWorkStealingPool() +// val indexerExecutionContext = ExecutionContext.fromExecutor(indexerExecutor) +// val tracer: Tracer = NoReportingTracerProvider.tracer +// +// println("Generating state updates...") +// val updates = Await.result(createUpdates(), Duration(10, "minute")) +// +// println("Creating read service and indexer...") +// val readService = createReadService(updates) +// val metrics = new LedgerApiServerMetrics( +// new LedgerApiServerHistograms(MetricName("noop"))(new HistogramInventory), +// NoOpMetricsFactory, +// ) +// val resource = for { +// servicesExecutionContext <- ResourceOwner +// .forExecutorService(() => Executors.newWorkStealingPool()) +// .map(ExecutionContext.fromExecutorService) +// .acquire() +// (inMemoryState, inMemoryStateUpdaterFlow) <- +// LedgerApiServer +// .createInMemoryStateAndUpdater( +// CommandProgressTracker.NoOp, +// config.indexServiceConfig, +// 256, +// metrics, +// indexerExecutionContext, +// tracer, +// loggerFactory, +// ) +// .acquire() +// indexerFactory = new JdbcIndexer.Factory( +// config.participantId, +// config.dataSource, +// config.indexerConfig, +// Set.empty, +// readService, +// metrics, +// inMemoryState, +// inMemoryStateUpdaterFlow, +// servicesExecutionContext, +// tracer, +// loggerFactory, +// dataSourceProperties, +// highAvailability, +// None, +// ) +// _ = println("Setting up the index database...") +// indexer <- indexer(config, indexerExecutionContext, indexerFactory) +// _ = println("Starting the indexing...") +// startTime = System.nanoTime() +// handle <- indexer.acquire() +// _ <- Resource.fromFuture(handle) +// stopTime = System.nanoTime() +// _ = println("Indexing done.") +// _ <- Resource.fromFuture(system.terminate()) +// _ = indexerExecutor.shutdown() +// } yield { +// val result = new IndexerBenchmarkResult( +// config, +// metrics, +// startTime, +// stopTime, +// ) +// +// println(result.banner) +// +// // Note: this allows the user to inspect the contents of an ephemeral database +// if (config.waitForUserInput) { +// println( +// s"Index database is still running at ${config.dataSource.jdbcUrl}." +// ) +// StdIn.readLine("Press to terminate this process.").discard +// } +// +// if (result.failure) throw new RuntimeException("Indexer Benchmark failure.") +// () +// } +// resource.asFuture +// } +// } +// +// private def indexer( +// config: Config, +// indexerExecutionContext: ExecutionContextExecutor, +// indexerFactory: JdbcIndexer.Factory, +// )(implicit +// traceContext: TraceContext, +// rc: ResourceContext, +// ): resources.Resource[ResourceContext, Indexer] = +// Await +// .result( +// IndexerServiceOwner +// .migrateOnly(config.dataSource.jdbcUrl, loggerFactory)(rc.executionContext, traceContext) +// .map(_ => indexerFactory.initialized(logger))(indexerExecutionContext), +// Duration(5, "minute"), +// ) +// .acquire() +// +// private[this] def createReadService( +// updates: Source[(Offset, Traced[Update]), NotUsed] +// ): ReadService = { +// new ReadService with InternalStateServiceProviderImpl { +// override def stateUpdates( +// beginAfter: Option[Offset] +// )(implicit traceContext: TraceContext): Source[(Offset, Traced[Update]), NotUsed] = { +// assert(beginAfter.isEmpty, s"beginAfter is $beginAfter") +// updates +// } +// +// override def currentHealth(): HealthStatus = Healthy +// } +// } +// +// def runAndExit( +// config: Config, +// dataSourceProperties: DataSourceProperties, +// highAvailability: HaConfig, +// updates: () => Future[Source[(Offset, Traced[Update]), NotUsed]], +// ): Unit = { +// val result: Future[Unit] = new IndexerBenchmark() +// .run(updates, config, dataSourceProperties, highAvailability) +// .recover { case ex => +// logger.error("Error running benchmark", ex)(TraceContext.empty) +// sys.exit(1) +// }(directEc) +// +// Await.result(result, Duration(100, "hour")) +// println("Done.") +// // We call sys.exit because some actor system or thread pool is still running, preventing a normal shutdown. +// sys.exit(0) +// } +//} diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv1/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv1/daml.yaml index e31316d01b5..c7abc0c5f3d 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv1/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv1/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --enable-interfaces=yes name: carbonv1-tests diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv2/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv2/daml.yaml index cf3081dbaa4..2683839ee74 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv2/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/carbonv2/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --enable-interfaces=yes name: carbonv2-tests diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/experimental/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/experimental/daml.yaml index 4efe8164c41..cfd7078c8fe 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/experimental/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/experimental/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 name: experimental-tests source: . version: 3.1.0 diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/model/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/model/daml.yaml index 02292d34bec..057348f49dc 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/model/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/model/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --enable-interfaces=yes name: model-tests diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/package_management/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/package_management/daml.yaml index 9e687e70f04..5f729b59528 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/package_management/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/package_management/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 name: package-management-tests source: . version: 3.1.0 diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/semantic/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/semantic/daml.yaml index dfa429b89d3..6b128c48038 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/semantic/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/semantic/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --enable-interfaces=yes name: semantic-tests diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/1.0.0/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/1.0.0/daml.yaml index 3d42e6c47fc..b4ba6ced8ec 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/1.0.0/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/1.0.0/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 name: upgrade-tests source: . version: 1.0.0 diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/2.0.0/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/2.0.0/daml.yaml index f43bbd248cd..15dbd698874 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/2.0.0/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/2.0.0/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 name: upgrade-tests source: . version: 2.0.0 diff --git a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/3.0.0/daml.yaml b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/3.0.0/daml.yaml index aaff4db5c74..9ede17ce050 100644 --- a/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/3.0.0/daml.yaml +++ b/sdk/canton/community/ledger/ledger-common-dars/src/main/daml/upgrade/3.0.0/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 name: upgrade-tests source: . version: 3.0.0 diff --git a/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_1/daml.yaml b/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_1/daml.yaml index 5170e8ecff7..9aa6b8566c3 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_1/daml.yaml +++ b/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_1/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: JsonEncodingTest diff --git a/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_dev/daml.yaml b/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_dev/daml.yaml index 12a03f1dbfd..bcdac6805eb 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_dev/daml.yaml +++ b/sdk/canton/community/ledger/ledger-json-api/src/test/daml/v2_dev/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.dev name: JsonEncodingTestDev diff --git a/sdk/canton/community/participant/src/main/daml/daml.yaml b/sdk/canton/community/participant/src/main/daml/daml.yaml index 9758a5506ef..4bc298d2001 100644 --- a/sdk/canton/community/participant/src/main/daml/daml.yaml +++ b/sdk/canton/community/participant/src/main/daml/daml.yaml @@ -1,4 +1,4 @@ -sdk-version: 3.1.0-snapshot.20240625.13151.0.v74f04330 +sdk-version: 3.1.0-snapshot.20240627.13156.0.vc8bcbe70 build-options: - --target=2.1 name: AdminWorkflows diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/CantonLedgerApiServerFactory.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/CantonLedgerApiServerFactory.scala index 7d01a6bf958..78ea979b938 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/CantonLedgerApiServerFactory.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/CantonLedgerApiServerFactory.scala @@ -130,6 +130,7 @@ class CantonLedgerApiServerFactory( futureSupervisor = futureSupervisor, parameters = parameters, excludedPackageIds = excludedPackageIds, + ledgerApiStore = participantNodePersistentState.map(_.ledgerApiStore), )(executionContext, actorSystem) .leftMap { err => // The MigrateOnEmptySchema exception is private, thus match on the expected message diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ParticipantNode.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ParticipantNode.scala index 66ab3531bd0..6f647c78765 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ParticipantNode.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ParticipantNode.scala @@ -543,11 +543,14 @@ class ParticipantNodeBootstrap( persistentStateFactory.create( syncDomainPersistentStateManager, storage, + config.storage, clock, config.init.ledgerApi.maxDeduplicationDuration.toInternal.some, parameterConfig.batchingConfig, ReleaseProtocolVersion.latest, arguments.metrics, + participantId.toLf, + config.ledgerApi, indexedStringStore, parameterConfig.processingTimeouts, futureSupervisor, diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/CantonLedgerApiServerWrapper.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/CantonLedgerApiServerWrapper.scala index 1d1ed7633da..09023553b92 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/CantonLedgerApiServerWrapper.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/CantonLedgerApiServerWrapper.scala @@ -3,6 +3,7 @@ package com.digitalasset.canton.participant.ledger.api +import cats.Eval import cats.data.EitherT import cats.syntax.either.* import com.daml.tracing.DefaultOpenTelemetry @@ -24,10 +25,10 @@ import com.digitalasset.canton.platform.apiserver.* import com.digitalasset.canton.platform.apiserver.meteringreport.MeteringReportKey import com.digitalasset.canton.platform.indexer.IndexerConfig import com.digitalasset.canton.platform.indexer.ha.HaConfig -import com.digitalasset.canton.platform.store.DbSupport import com.digitalasset.canton.tracing.{NoTracing, TracerProvider} import com.digitalasset.canton.{LedgerParticipantId, LfPackageId} import com.digitalasset.daml.lf.engine.Engine +import io.opentelemetry.api.trace.Tracer import org.apache.pekko.actor.ActorSystem import scala.util.{Failure, Success} @@ -92,78 +93,52 @@ object CantonLedgerApiServerWrapper extends NoTracing { startLedgerApiServer: Boolean, futureSupervisor: FutureSupervisor, excludedPackageIds: Set[LfPackageId], + ledgerApiStore: Eval[LedgerApiStore], )(implicit ec: ExecutionContextIdlenessExecutorService, actorSystem: ActorSystem, ): EitherT[FutureUnlessShutdown, LedgerApiServerError, LedgerApiServerState] = { + implicit val tracer: Tracer = config.tracerProvider.tracer - val ledgerApiStorageE = LedgerApiStorage.fromStorageConfig( - config.storageConfig, - config.participantId, - ) + val startableStoppableLedgerApiServer = + new StartableStoppableLedgerApiServer( + config = config, + telemetry = new DefaultOpenTelemetry(config.tracerProvider.openTelemetry), + futureSupervisor = futureSupervisor, + parameters = parameters, + commandProgressTracker = config.syncService.commandProgressTracker, + excludedPackageIds = excludedPackageIds, + ledgerApiStore = ledgerApiStore, + ) + val startFUS = for { + _ <- + if (startLedgerApiServer) startableStoppableLedgerApiServer.start() + else FutureUnlessShutdown.unit + } yield () - EitherT - .fromEither[FutureUnlessShutdown](ledgerApiStorageE) - .flatMap { ledgerApiStorage => - val connectionPoolConfig = DbSupport.ConnectionPoolConfig( - connectionPoolSize = config.storageConfig.numConnectionsLedgerApiServer.unwrap, - connectionTimeout = config.serverConfig.databaseConnectionTimeout.underlying, - ) - - val dbConfig = DbSupport.DbConfig( - jdbcUrl = ledgerApiStorage.jdbcUrl, - connectionPool = connectionPoolConfig, - postgres = config.serverConfig.postgresDataSource, - ) - - val participantDataSourceConfig = - DbSupport.ParticipantDataSourceConfig(ledgerApiStorage.jdbcUrl) - - implicit val tracer = config.tracerProvider.tracer - - val startableStoppableLedgerApiServer = - new StartableStoppableLedgerApiServer( - config = config, - participantDataSourceConfig = participantDataSourceConfig, - dbConfig = dbConfig, - telemetry = new DefaultOpenTelemetry(config.tracerProvider.openTelemetry), - futureSupervisor = futureSupervisor, - parameters = parameters, - commandProgressTracker = config.syncService.commandProgressTracker, - excludedPackageIds = excludedPackageIds, - ) - val startFUS = for { - _ <- - if (startLedgerApiServer) startableStoppableLedgerApiServer.start() - else FutureUnlessShutdown.unit - } yield () - - EitherT(startFUS.transformWith { - case Success(_) => - FutureUnlessShutdown.pure( - Either.right( - LedgerApiServerState( - ledgerApiStorage, - startableStoppableLedgerApiServer, - config.logger, - config.cantonParameterConfig.processingTimeouts, - ) - ) + EitherT(startFUS.transformWith { + case Success(_) => + FutureUnlessShutdown.pure( + Either.right( + LedgerApiServerState( + startableStoppableLedgerApiServer, + config.logger, + config.cantonParameterConfig.processingTimeouts, ) - case Failure(e) => FutureUnlessShutdown.pure(Left(FailedToStartLedgerApiServer(e))) - }) - } + ) + ) + case Failure(e) => FutureUnlessShutdown.pure(Left(FailedToStartLedgerApiServer(e))) + }) } final case class LedgerApiServerState( - ledgerApiStorage: LedgerApiStorage, startableStoppableLedgerApi: StartableStoppableLedgerApiServer, override protected val logger: TracedLogger, protected override val timeouts: ProcessingTimeout, ) extends FlagCloseable { override protected def onClosed(): Unit = - Lifecycle.close(startableStoppableLedgerApi, ledgerApiStorage)(logger) + Lifecycle.close(startableStoppableLedgerApi)(logger) override def toString: String = getClass.getSimpleName } diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/LedgerApiStore.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/LedgerApiStore.scala new file mode 100644 index 00000000000..d19f24efd94 --- /dev/null +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/LedgerApiStore.scala @@ -0,0 +1,139 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.canton.participant.ledger.api + +import com.daml.logging.entries.LoggingEntries +import com.daml.metrics.DatabaseMetrics +import com.digitalasset.canton.concurrent.ExecutionContextIdlenessExecutorService +import com.digitalasset.canton.config.{MemoryStorageConfig, ProcessingTimeout, StorageConfig} +import com.digitalasset.canton.logging.{LoggingContextWithTrace, NamedLoggerFactory} +import com.digitalasset.canton.metrics.LedgerApiServerMetrics +import com.digitalasset.canton.platform.ResourceCloseable +import com.digitalasset.canton.platform.config.ServerRole +import com.digitalasset.canton.platform.store.backend.ParameterStorageBackend.LedgerEnd +import com.digitalasset.canton.platform.store.backend.postgresql.PostgresDataSourceConfig +import com.digitalasset.canton.platform.store.cache.MutableLedgerEndCache +import com.digitalasset.canton.platform.store.interning.StringInterningView +import com.digitalasset.canton.platform.store.{DbSupport, FlywayMigrations} +import com.digitalasset.canton.tracing.TraceContext +import com.digitalasset.canton.{LedgerParticipantId, config} + +import java.sql.Connection +import scala.concurrent.{ExecutionContext, Future} + +final class LedgerApiStore( + val ledgerApiDbSupport: DbSupport, + val ledgerApiStorage: LedgerApiStorage, + val ledgerEndCache: MutableLedgerEndCache, + val stringInterningView: StringInterningView, + val metrics: LedgerApiServerMetrics, + val loggerFactory: NamedLoggerFactory, + val timeouts: ProcessingTimeout, +) extends ResourceCloseable { + private val parameterStorageBackend = + ledgerApiDbSupport.storageBackendFactory.createParameterStorageBackend + private val stringInterningStorageBackend = + ledgerApiDbSupport.storageBackendFactory.createStringInterningStorageBackend + + private def executeSql[T](databaseMetrics: DatabaseMetrics)( + sql: Connection => T + )(implicit traceContext: TraceContext): Future[T] = + ledgerApiDbSupport.dbDispatcher.executeSql(databaseMetrics)(sql)( + new LoggingContextWithTrace(LoggingEntries.empty, traceContext) + ) + + def ledgerEnd(implicit traceContext: TraceContext): Future[LedgerEnd] = + executeSql(metrics.index.db.getLedgerEnd)( + parameterStorageBackend.ledgerEnd + ) + + private[api] def initializeInMemoryState(implicit + traceContext: TraceContext, + executionContext: ExecutionContext, + ): Future[Unit] = + for { + currentLedgerEnd <- ledgerEnd + _ <- stringInterningView.update(currentLedgerEnd.lastStringInterningId)( + (fromExclusive, toInclusive) => + executeSql(metrics.index.db.loadStringInterningEntries)( + stringInterningStorageBackend.loadStringInterningEntries( + fromExclusive, + toInclusive, + ) + ) + ) + } yield { + ledgerEndCache.set( + ( + currentLedgerEnd.lastOffset, + currentLedgerEnd.lastEventSeqId, + ) + ) + } +} + +object LedgerApiStore { + def initialize( + storageConfig: StorageConfig, + ledgerParticipantId: LedgerParticipantId, + legderApiDatabaseConnectionTimeout: config.NonNegativeFiniteDuration, + ledgerApiPostgresDataSourceConfig: PostgresDataSourceConfig, + timeouts: ProcessingTimeout, + loggerFactory: NamedLoggerFactory, + metrics: LedgerApiServerMetrics, + )(implicit + traceContext: TraceContext, + executionContext: ExecutionContextIdlenessExecutorService, + ): Future[LedgerApiStore] = { + val initializationLogger = loggerFactory.getTracedLogger(LedgerApiStore.getClass) + val ledgerApiStorage = LedgerApiStorage + .fromStorageConfig(storageConfig, ledgerParticipantId) + .fold( + error => throw new IllegalStateException(s"Constructing LedgerApiStorage failed: $error"), + identity, + ) + val dbConfig = DbSupport.DbConfig( + jdbcUrl = ledgerApiStorage.jdbcUrl, + connectionPool = DbSupport.ConnectionPoolConfig( + connectionPoolSize = storageConfig.numConnectionsLedgerApiServer.unwrap, + connectionTimeout = legderApiDatabaseConnectionTimeout.underlying, + ), + postgres = ledgerApiPostgresDataSourceConfig, + ) + val numLedgerApi = dbConfig.connectionPool.connectionPoolSize + initializationLogger.info(s"Creating ledger API storage num-ledger-api: $numLedgerApi") + + for { + _ <- storageConfig match { + // ledger api server needs an H2 db to run in memory + case _: MemoryStorageConfig => + new FlywayMigrations( + ledgerApiStorage.jdbcUrl, + loggerFactory, + ).migrate() + case _ => Future.unit + } + ledgerApiStore <- DbSupport + .owner( + serverRole = ServerRole.ApiServer, + metrics = metrics, + dbConfig = dbConfig, + loggerFactory = loggerFactory, + ) + .map(dbSupport => + new LedgerApiStore( + ledgerApiDbSupport = dbSupport, + ledgerApiStorage = ledgerApiStorage, + ledgerEndCache = MutableLedgerEndCache(), + stringInterningView = new StringInterningView(loggerFactory), + metrics = metrics, + loggerFactory = loggerFactory, + timeouts = timeouts, + ) + ) + .acquireFlagCloseable("Ledger API DB Support") + _ <- ledgerApiStore.initializeInMemoryState + } yield ledgerApiStore + } +} diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala index ef941c006df..bc56d355fe6 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala @@ -3,6 +3,7 @@ package com.digitalasset.canton.participant.ledger.api +import cats.Eval import com.daml.executors.executors.{NamedExecutor, QueueAwareExecutor} import com.daml.ledger.api.v2.experimental_features.ExperimentalCommandInspectionService import com.daml.ledger.api.v2.state_service.GetActiveContractsResponse @@ -49,7 +50,7 @@ import com.digitalasset.canton.platform.apiserver.ratelimiting.{ ThreadpoolCheck, } import com.digitalasset.canton.platform.apiserver.{ApiServiceOwner, LedgerFeatures} -import com.digitalasset.canton.platform.config.{IdentityProviderManagementConfig, ServerRole} +import com.digitalasset.canton.platform.config.IdentityProviderManagementConfig import com.digitalasset.canton.platform.index.IndexServiceOwner import com.digitalasset.canton.platform.indexer.{ IndexerConfig, @@ -57,7 +58,6 @@ import com.digitalasset.canton.platform.indexer.{ IndexerStartupMode, } import com.digitalasset.canton.platform.store.DbSupport -import com.digitalasset.canton.platform.store.DbSupport.ParticipantDataSourceConfig import com.digitalasset.canton.platform.store.dao.events.{ContractLoader, LfValueTranslation} import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.{FutureUtil, SimpleExecutionQueue} @@ -76,19 +76,16 @@ import scala.concurrent.Future * depending on whether the participant node is a High Availability active or passive replica. * * @param config ledger api server configuration - * @param participantDataSourceConfig configuration for the data source (e.g., jdbc url) - * @param dbConfig the Index DB config * @param executionContext the execution context */ class StartableStoppableLedgerApiServer( config: CantonLedgerApiServerWrapper.Config, - participantDataSourceConfig: ParticipantDataSourceConfig, - dbConfig: DbSupport.DbConfig, telemetry: Telemetry, futureSupervisor: FutureSupervisor, parameters: ParticipantNodeParameters, commandProgressTracker: CommandProgressTracker, excludedPackageIds: Set[LfPackageId], + ledgerApiStore: Eval[LedgerApiStore], )(implicit executionContext: ExecutionContextIdlenessExecutorService, actorSystem: ActorSystem, @@ -178,7 +175,6 @@ class StartableStoppableLedgerApiServer( private def buildLedgerApiServerOwner( )(implicit traceContext: TraceContext) = { - implicit val loggingContextWithTrace: LoggingContextWithTrace = LoggingContextWithTrace(loggerFactory, telemetry) @@ -188,8 +184,7 @@ class StartableStoppableLedgerApiServer( case _ => IndexerStartupMode.JustStart } val numIndexer = config.indexerConfig.ingestionParallelism.unwrap - val numLedgerApi = dbConfig.connectionPool.connectionPoolSize - logger.info(s"Creating storage, num-indexer: $numIndexer, num-ledger-api: $numLedgerApi") + logger.info(s"Creating indexer storage, num-indexer: $numIndexer") val indexServiceConfig = config.serverConfig.indexService @@ -210,6 +205,7 @@ class StartableStoppableLedgerApiServer( override def bindService(): ServerServiceDefinition = ApiInfoServiceGrpc.bindService(this, executionContext) } + val dbSupport = ledgerApiStore.value.ledgerApiDbSupport for { (inMemoryState, inMemoryStateUpdaterFlow) <- @@ -221,19 +217,12 @@ class StartableStoppableLedgerApiServer( executionContext, tracer, loggerFactory, - ) + )(ledgerApiStore.value.ledgerEndCache, ledgerApiStore.value.stringInterningView) timedWriteService = new TimedWriteService(config.syncService, config.metrics) timedReadService = new TimedReadService(config.syncService, config.metrics) - dbSupport <- DbSupport - .owner( - serverRole = ServerRole.ApiServer, - metrics = config.metrics, - dbConfig = dbConfig, - loggerFactory = loggerFactory, - ) indexerHealth <- new IndexerServiceOwner( config.participantId, - participantDataSourceConfig, + DbSupport.ParticipantDataSourceConfig(ledgerApiStore.value.ledgerApiStorage.jdbcUrl), timedReadService, config.indexerConfig, config.metrics, diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessor.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessor.scala index 93cd498e6c3..aeb2a7d35d2 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessor.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessor.scala @@ -1545,7 +1545,7 @@ abstract class ProtocolProcessor[ _ <- ifThenET(!cleanReplay) { for { _unit <- { - logger.debug( + logger.info( show"Finalizing ${steps.requestKind.unquoted} request=${requestId.unwrap} with event $eventO." ) @@ -1573,6 +1573,7 @@ abstract class ProtocolProcessor[ requestCounter, commitSet, ) + _ = logger.info(show"About to wrap up request ${requestId}") requestTimestamp = requestId.unwrap _unit <- EitherT.right[steps.ResultError]( FutureUnlessShutdown.outcomeF( @@ -1586,7 +1587,7 @@ abstract class ProtocolProcessor[ ) } yield pendingSubmissionDataO.foreach(steps.postProcessResult(verdict, _)) } - } yield () + } yield logger.info(show"Finished async result processing of request ${requestId}") } private def checkContradictoryMediatorApprove( diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/store/ParticipantNodePersistentState.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/store/ParticipantNodePersistentState.scala index 42108ebcc4c..2b299bc597f 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/store/ParticipantNodePersistentState.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/store/ParticipantNodePersistentState.scala @@ -6,8 +6,12 @@ package com.digitalasset.canton.participant.store import cats.Eval import cats.syntax.foldable.* import com.daml.nameof.NameOf.functionFullName -import com.digitalasset.canton.concurrent.FutureSupervisor -import com.digitalasset.canton.config.{BatchingConfig, ProcessingTimeout} +import com.digitalasset.canton.LedgerParticipantId +import com.digitalasset.canton.concurrent.{ + ExecutionContextIdlenessExecutorService, + FutureSupervisor, +} +import com.digitalasset.canton.config.{BatchingConfig, ProcessingTimeout, StorageConfig} import com.digitalasset.canton.lifecycle.{ CloseContext, FlagCloseable, @@ -20,6 +24,8 @@ import com.digitalasset.canton.logging.{ NamedLogging, NamedLoggingContext, } +import com.digitalasset.canton.participant.config.LedgerApiServerConfig +import com.digitalasset.canton.participant.ledger.api.LedgerApiStore import com.digitalasset.canton.participant.metrics.ParticipantMetrics import com.digitalasset.canton.participant.sync.SyncDomainPersistentStateLookup import com.digitalasset.canton.resource.Storage @@ -32,7 +38,6 @@ import com.digitalasset.canton.util.{ErrorUtil, retry} import com.digitalasset.canton.version.ReleaseProtocolVersion import org.apache.pekko.stream.Materializer -import scala.concurrent.ExecutionContext import scala.concurrent.duration.* /** Some of the state of a participant that is not tied to a domain and must survive restarts. @@ -41,6 +46,7 @@ import scala.concurrent.duration.* */ class ParticipantNodePersistentState private ( val settingsStore: ParticipantSettingsStore, + val ledgerApiStore: LedgerApiStore, val participantEventLog: ParticipantEventLog, val multiDomainEventLog: MultiDomainEventLog, val inFlightSubmissionStore: InFlightSubmissionStore, @@ -58,6 +64,7 @@ class ParticipantNodePersistentState private ( inFlightSubmissionStore, commandDeduplicationStore, pruningStore, + ledgerApiStore, )(logger) } @@ -65,17 +72,20 @@ trait ParticipantNodePersistentStateFactory { def create( syncDomainPersistentStates: SyncDomainPersistentStateLookup, storage: Storage, + storageConfig: StorageConfig, clock: Clock, maxDeduplicationDurationO: Option[NonNegativeFiniteDuration], batching: BatchingConfig, releaseProtocolVersion: ReleaseProtocolVersion, metrics: ParticipantMetrics, + ledgerParticipantId: LedgerParticipantId, + ledgerApiServerConfig: LedgerApiServerConfig, indexedStringStore: IndexedStringStore, timeouts: ProcessingTimeout, futureSupervisor: FutureSupervisor, loggerFactory: NamedLoggerFactory, )(implicit - ec: ExecutionContext, + ec: ExecutionContextIdlenessExecutorService, mat: Materializer, traceContext: TraceContext, ): FutureUnlessShutdown[Eval[ParticipantNodePersistentState]] @@ -85,28 +95,34 @@ object ParticipantNodePersistentStateFactory extends ParticipantNodePersistentSt override def create( syncDomainPersistentStates: SyncDomainPersistentStateLookup, storage: Storage, + storageConfig: StorageConfig, clock: Clock, maxDeduplicationDurationO: Option[NonNegativeFiniteDuration], batching: BatchingConfig, releaseProtocolVersion: ReleaseProtocolVersion, metrics: ParticipantMetrics, + ledgerParticipantId: LedgerParticipantId, + ledgerApiServerConfig: LedgerApiServerConfig, indexedStringStore: IndexedStringStore, timeouts: ProcessingTimeout, futureSupervisor: FutureSupervisor, loggerFactory: NamedLoggerFactory, )(implicit - ec: ExecutionContext, + ec: ExecutionContextIdlenessExecutorService, mat: Materializer, traceContext: TraceContext, ): FutureUnlessShutdown[Eval[ParticipantNodePersistentState]] = ParticipantNodePersistentState .create( syncDomainPersistentStates, storage, + storageConfig, clock, maxDeduplicationDurationO, batching, releaseProtocolVersion, metrics, + ledgerParticipantId, + ledgerApiServerConfig, indexedStringStore, timeouts, futureSupervisor, @@ -122,17 +138,20 @@ object ParticipantNodePersistentState extends HasLoggerName { def create( syncDomainPersistentStates: SyncDomainPersistentStateLookup, storage: Storage, + storageConfig: StorageConfig, clock: Clock, maxDeduplicationDurationO: Option[NonNegativeFiniteDuration], batching: BatchingConfig, releaseProtocolVersion: ReleaseProtocolVersion, metrics: ParticipantMetrics, + ledgerParticipantId: LedgerParticipantId, + ledgerApiServerConfig: LedgerApiServerConfig, indexedStringStore: IndexedStringStore, timeouts: ProcessingTimeout, futureSupervisor: FutureSupervisor, loggerFactory: NamedLoggerFactory, )(implicit - ec: ExecutionContext, + ec: ExecutionContextIdlenessExecutorService, mat: Materializer, traceContext: TraceContext, ): FutureUnlessShutdown[ParticipantNodePersistentState] = { @@ -235,10 +254,22 @@ object ParticipantNodePersistentState extends HasLoggerName { loggerFactory, ) ) + ledgerApiStore <- FutureUnlessShutdown.outcomeF( + LedgerApiStore.initialize( + storageConfig = storageConfig, + ledgerParticipantId = ledgerParticipantId, + legderApiDatabaseConnectionTimeout = ledgerApiServerConfig.databaseConnectionTimeout, + ledgerApiPostgresDataSourceConfig = ledgerApiServerConfig.postgresDataSource, + timeouts = timeouts, + loggerFactory = loggerFactory, + metrics = metrics.ledgerApiServer, + ) + ) _ = flagCloseable.close() } yield { new ParticipantNodePersistentState( settingsStore, + ledgerApiStore, participantEventLog, multiDomainEventLog, inFlightSubmissionStore, diff --git a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessorTest.scala b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessorTest.scala index bd934258eb9..b8805aaf81a 100644 --- a/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessorTest.scala +++ b/sdk/canton/community/participant/src/test/scala/com/digitalasset/canton/participant/protocol/ProtocolProcessorTest.scala @@ -14,6 +14,7 @@ import com.digitalasset.canton.config.RequireTypes.{NonNegativeInt, PositiveInt} import com.digitalasset.canton.config.{ BatchingConfig, CachingConfigs, + CommunityStorageConfig, DefaultProcessingTimeouts, ProcessingTimeout, TestingConfigInternal, @@ -26,6 +27,7 @@ import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.ledger.participant.state.CompletionInfo import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, UnlessShutdown} import com.digitalasset.canton.logging.pretty.Pretty +import com.digitalasset.canton.participant.config.LedgerApiServerConfig import com.digitalasset.canton.participant.metrics.ParticipantTestMetrics import com.digitalasset.canton.participant.protocol.EngineController.EngineAbortStatus import com.digitalasset.canton.participant.protocol.Phase37Synchronizer.RequestOutcome @@ -258,11 +260,14 @@ class ProtocolProcessorTest .create( syncDomainPersistentStates, new MemoryStorage(loggerFactory, timeouts), + CommunityStorageConfig.Memory(), clock, None, BatchingConfig(), testedReleaseProtocolVersion, ParticipantTestMetrics, + participant.toLf, + LedgerApiServerConfig(), indexedStringStore, timeouts, futureSupervisor, diff --git a/sdk/canton/ref b/sdk/canton/ref index 735a17cac0a..ce430440eda 100644 --- a/sdk/canton/ref +++ b/sdk/canton/ref @@ -1 +1 @@ -20240629.13565.vdf3ebde5 +20240701.13579.vfc58fab6 diff --git a/sdk/daml-lf/validation/src/test/scala/com/digitalasset/daml/lf/validation/upgrade/UpgradesSpecBase.scala b/sdk/daml-lf/validation/src/test/scala/com/digitalasset/daml/lf/validation/upgrade/UpgradesSpecBase.scala index 54a51cc052c..c0f52b0ae21 100644 --- a/sdk/daml-lf/validation/src/test/scala/com/digitalasset/daml/lf/validation/upgrade/UpgradesSpecBase.scala +++ b/sdk/daml-lf/validation/src/test/scala/com/digitalasset/daml/lf/validation/upgrade/UpgradesSpecBase.scala @@ -773,7 +773,7 @@ abstract class UpgradesSpec(val suffix: String) case _ => {} } cantonLogSrc should include regex ( - s"KNOWN_DAR_VERSION\\(.+,.+\\): A DAR with the same version number has previously been uploaded. err-context:\\{existingPackage=$testPackageV2Id, location=.+, packageVersion=$packageVersion, uploadedPackageId=$testPackageV1Id\\}" + s"KNOWN_DAR_VERSION\\(.+,.+\\): A DAR with the same version number has previously been uploaded. err-context:\\{existingPackage=$testPackageV1Id, location=.+, packageVersion=$packageVersion, uploadedPackageId=$testPackageV2Id\\}" ) uploadV2Result match { case None =>