[LLP] Implement the possibility to have a shared StringInterningView [DPP-1042] (#14062)

* Allow the possibility to have a shared StringInterningView

changelog_begin
changelog_end

* Solve TODO in RecoveringIndexerIntegrationSpec

* Addressed Marton's review comments

* Added thread-safety mentionsin string interning interfaces
This commit is contained in:
tudor-da 2022-06-08 16:13:31 +02:00 committed by GitHub
parent d81b4a7071
commit fc47a8995c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 307 additions and 187 deletions

View File

@ -76,6 +76,7 @@ class IndexerBenchmark() {
readService,
metrics,
LfValueTranslationCache.Cache.none,
None,
)
val resource = for {

View File

@ -3,7 +3,7 @@
package com.daml.platform.store.interning
import org.openjdk.jmh.annotations.{Level, Param, Scope, Setup, State}
import org.openjdk.jmh.annotations._
import scala.concurrent.Future
import scala.concurrent.duration._
@ -50,15 +50,11 @@ object BenchmarkState {
entries
}
def createInterning(entries: Array[(Int, String)]): StringInterningView = {
Console.print(s"Creating an interning view...")
val interning = new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>
// Note: for slice(), the begin is inclusive and the end is exclusive (opposite of the enclosing call)
_ => Future.successful(entries.view.slice(fromExclusive + 1, toInclusive + 1))
)
Console.println(s" done.")
interning
def loadStringInterningEntries(
entries: Array[(Int, String)]
): LoadStringInterningEntries = {
(fromExclusive, toInclusive) =>
// Note: for slice(), the begin is inclusive and the end is exclusive (opposite of the enclosing call)
_ => Future.successful(entries.view.slice(fromExclusive + 1, toInclusive + 1))
}
}

View File

@ -22,7 +22,7 @@ import scala.concurrent.Await
class InitializationTimeBenchmark extends BenchmarkState {
@Setup(Level.Invocation)
def setupIteration(): Unit = {
interning = BenchmarkState.createInterning(entries)
interning = new StringInterningView()
}
@Benchmark
@ -32,6 +32,12 @@ class InitializationTimeBenchmark extends BenchmarkState {
@Warmup(iterations = 5)
@Measurement(iterations = 5)
def run(): Unit = {
Await.result(interning.update(stringCount)(LoggingContext.ForTesting), perfTestTimeout)
Await.result(
interning
.update(stringCount)(BenchmarkState.loadStringInterningEntries(entries))(
LoggingContext.ForTesting
),
perfTestTimeout,
)
}
}

View File

@ -4,16 +4,7 @@
package com.daml.platform.store.interning
import com.daml.logging.LoggingContext
import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Fork,
Level,
Measurement,
Mode,
Setup,
Warmup,
}
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
@ -23,10 +14,15 @@ class UpdateTimeBenchmark extends BenchmarkState {
@Setup(Level.Iteration)
def setupIteration(): Unit = {
interning = BenchmarkState.createInterning(entries)
interning = new StringInterningView()
interningEnd = stringCount
Await.result(interning.update(interningEnd)(LoggingContext.ForTesting), perfTestTimeout)
Await.result(
interning.update(interningEnd)(BenchmarkState.loadStringInterningEntries(entries))(
LoggingContext.ForTesting
),
perfTestTimeout,
)
}
@Benchmark
@ -38,6 +34,11 @@ class UpdateTimeBenchmark extends BenchmarkState {
interningEnd = interningEnd + 1
if (interningEnd > entries.length) throw new RuntimeException("Can't ingest any more strings")
Await.result(interning.update(interningEnd)(LoggingContext.ForTesting), perfTestTimeout)
Await.result(
interning.update(interningEnd)(BenchmarkState.loadStringInterningEntries(entries))(
LoggingContext.ForTesting
),
perfTestTimeout,
)
}
}

View File

@ -14,6 +14,7 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.IndexServiceConfig
import com.daml.platform.index.IndexServiceBuilder
import com.daml.platform.store.interning.StringInterningView
import com.daml.platform.store.{DbSupport, LfValueTranslationCache}
import scala.concurrent.ExecutionContextExecutor
@ -28,6 +29,8 @@ object StandaloneIndexService {
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
lfValueTranslationCache: LfValueTranslationCache.Cache,
// TODO LLP: Always pass shared stringInterningView
sharedStringInterningViewO: Option[StringInterningView] = None,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
@ -42,6 +45,7 @@ object StandaloneIndexService {
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = new ValueEnricher(engine),
sharedStringInterningViewO = sharedStringInterningViewO,
)(materializer, loggingContext, servicesExecutionContext)
.owner()
.map(index => new TimedIndexService(index, metrics))

View File

@ -30,6 +30,7 @@ import com.daml.platform.store.cache.{
}
import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.interning.{
LoadStringInterningEntries,
StringInterning,
StringInterningView,
UpdatingStringInterningView,
@ -50,6 +51,7 @@ private[platform] case class IndexServiceBuilder(
lfValueTranslationCache: LfValueTranslationCache.Cache,
enricher: ValueEnricher,
participantId: Ref.ParticipantId,
sharedStringInterningViewO: Option[StringInterningView],
)(implicit
mat: Materializer,
loggingContext: LoggingContext,
@ -59,15 +61,22 @@ private[platform] case class IndexServiceBuilder(
def owner(): ResourceOwner[IndexService] = {
val ledgerEndCache = MutableLedgerEndCache()
val stringInterningView = createStringInterningView()
val isSharedStringInterningView = sharedStringInterningViewO.nonEmpty
val stringInterningView = sharedStringInterningViewO.getOrElse(new StringInterningView())
val ledgerDao = createLedgerReadDao(ledgerEndCache, stringInterningView)
for {
ledgerId <- ResourceOwner.forFuture(() => verifyLedgerId(ledgerDao))
ledgerEnd <- ResourceOwner.forFuture(() => ledgerDao.lookupLedgerEnd())
_ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId))
_ <- ResourceOwner.forFuture(() =>
stringInterningView.update(ledgerEnd.lastStringInterningId)
)
_ <-
if (isSharedStringInterningView) {
// The participant-wide (shared) StringInterningView is updated by the Indexer
ResourceOwner.unit
} else {
ResourceOwner.forFuture(() =>
stringInterningView.update(ledgerEnd.lastStringInterningId)(loadStringInterningEntries)
)
}
prefetchingDispatcher <- dispatcherOffsetSeqIdOwner(ledgerEnd)
generalDispatcher <- dispatcherOwner(ledgerEnd.lastOffset)
instrumentedSignalNewLedgerHead = buildInstrumentedSignalNewLedgerHead(
@ -88,6 +97,7 @@ private[platform] case class IndexServiceBuilder(
ledgerEndCache,
)
_ <- cachesUpdaterSubscription(
isSharedStringInterningView,
ledgerDao,
stringInterningView,
instrumentedSignalNewLedgerHead,
@ -106,6 +116,7 @@ private[platform] case class IndexServiceBuilder(
}
private def cachesUpdaterSubscription(
sharedStringInterningView: Boolean,
ledgerDao: LedgerReadDao,
updatingStringInterningView: UpdatingStringInterningView,
instrumentedSignalNewLedgerHead: InstrumentedSignalNewLedgerHead,
@ -117,7 +128,14 @@ private[platform] case class IndexServiceBuilder(
ledgerDao,
newLedgerHead =>
for {
_ <- updatingStringInterningView.update(newLedgerHead.lastStringInterningId)
_ <-
if (sharedStringInterningView) {
// The participant-wide (shared) StringInterningView is updated by the Indexer
Future.unit
} else {
updatingStringInterningView
.update(newLedgerHead.lastStringInterningId)(loadStringInterningEntries)
}
} yield {
instrumentedSignalNewLedgerHead.startTimer(newLedgerHead.lastOffset)
prefetchingDispatcher.signalNewHead(
@ -128,6 +146,18 @@ private[platform] case class IndexServiceBuilder(
)(_.release())
.map(_ => ())
private def loadStringInterningEntries: LoadStringInterningEntries = {
(fromExclusive: Int, toInclusive: Int) => implicit loggingContext: LoggingContext =>
dbSupport.dbDispatcher
.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
dbSupport.storageBackendFactory.createStringInterningStorageBackend
.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
}
}
private def buildInstrumentedSignalNewLedgerHead(
ledgerEndCache: MutableLedgerEndCache,
generalDispatcher: Dispatcher[Offset],
@ -157,22 +187,6 @@ private[platform] case class IndexServiceBuilder(
)(servicesExecutionContext, loggingContext),
)(servicesExecutionContext, loggingContext)
private def createStringInterningView() = {
val stringInterningStorageBackend =
dbSupport.storageBackendFactory.createStringInterningStorageBackend
new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>
implicit loggingContext =>
dbSupport.dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
stringInterningStorageBackend.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
}
)
}
private def cacheComponentsAndSubscription(
config: IndexServiceConfig,
contractStore: MutableCacheBackedContractStore,

View File

@ -17,6 +17,7 @@ import com.daml.platform.indexer.parallel.{
import com.daml.platform.store.DbSupport.ParticipantDataSourceConfig
import com.daml.platform.store.dao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.interning.StringInterningView
import com.daml.platform.store.{DbType, LfValueTranslationCache}
import scala.concurrent.Future
@ -29,6 +30,7 @@ object JdbcIndexer {
readService: state.ReadService,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
stringInterningViewO: Option[StringInterningView],
)(implicit materializer: Materializer) {
def initialized()(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = {
@ -53,6 +55,7 @@ object JdbcIndexer {
providedParticipantId = participantId,
parameterStorageBackend = parameterStorageBackend,
ingestionStorageBackend = ingestionStorageBackend,
stringInterningStorageBackend = stringInterningStorageBackend,
metrics = metrics,
),
parallelIndexerSubscription = ParallelIndexerSubscription(
@ -75,7 +78,6 @@ object JdbcIndexer {
submissionBatchSize = config.submissionBatchSize,
metrics = metrics,
),
stringInterningStorageBackend = stringInterningStorageBackend,
meteringAggregator = new MeteringAggregator.Owner(
meteringStore = meteringStoreBackend,
meteringParameterStore = meteringParameterStorageBackend,
@ -84,10 +86,10 @@ object JdbcIndexer {
).apply,
mat = materializer,
readService = readService,
stringInterningViewO = stringInterningViewO,
)
indexer
}
}
}

View File

@ -11,6 +11,7 @@ import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.store.DbSupport.ParticipantDataSourceConfig
import com.daml.platform.store.interning.StringInterningView
import com.daml.platform.store.{FlywayMigrations, LfValueTranslationCache}
import scala.concurrent.Future
@ -23,6 +24,8 @@ final class StandaloneIndexerServer(
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
additionalMigrationPaths: Seq[String] = Seq.empty,
// TODO LLP: Always pass shared stringInterningView
stringInterningViewO: Option[StringInterningView] = None,
)(implicit materializer: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[ReportsHealth] {
@ -41,6 +44,7 @@ final class StandaloneIndexerServer(
readService,
metrics,
lfValueTranslationCache,
stringInterningViewO,
)
val indexer = RecoveringIndexer(
materializer.system.scheduler,

View File

@ -13,7 +13,11 @@ import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.store.dao.DbDispatcher
import com.daml.platform.store.backend.{IngestionStorageBackend, ParameterStorageBackend}
import com.daml.platform.store.backend.{
IngestionStorageBackend,
ParameterStorageBackend,
StringInterningStorageBackend,
}
import com.daml.platform.store.interning.UpdatingStringInterningView
import scala.concurrent.{ExecutionContext, Future}
@ -22,6 +26,7 @@ private[platform] case class InitializeParallelIngestion(
providedParticipantId: Ref.ParticipantId,
ingestionStorageBackend: IngestionStorageBackend[_],
parameterStorageBackend: ParameterStorageBackend,
stringInterningStorageBackend: StringInterningStorageBackend,
metrics: Metrics,
) {
@ -55,7 +60,7 @@ private[platform] case class InitializeParallelIngestion(
_ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
ingestionStorageBackend.deletePartiallyIngestedData(ledgerEnd)
)
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
_ <- updateStringInterningView(dbDispatcher, updatingStringInterningView, ledgerEnd)
} yield InitializeParallelIngestion.Initialized(
initialEventSeqId = ledgerEnd.lastEventSeqId,
initialStringInterningId = ledgerEnd.lastStringInterningId,
@ -63,6 +68,21 @@ private[platform] case class InitializeParallelIngestion(
)
}
private def updateStringInterningView(
dbDispatcher: DbDispatcher,
updatingStringInterningView: UpdatingStringInterningView,
ledgerEnd: ParameterStorageBackend.LedgerEnd,
)(implicit loggingContext: LoggingContext): Future[Unit] =
updatingStringInterningView.update(ledgerEnd.lastStringInterningId)(
(fromExclusive, toInclusive) =>
implicit loggingContext =>
dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
stringInterningStorageBackend.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
}
)
}
object InitializeParallelIngestion {

View File

@ -13,12 +13,8 @@ import com.daml.platform.indexer.Indexer
import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle, NoopHaCoordinator}
import com.daml.platform.indexer.parallel.AsyncSupport._
import com.daml.platform.store.DbSupport.DbConfig
import com.daml.platform.store.backend.{DBLockStorageBackend, DataSourceStorageBackend}
import com.daml.platform.store.dao.DbDispatcher
import com.daml.platform.store.backend.{
DBLockStorageBackend,
DataSourceStorageBackend,
StringInterningStorageBackend,
}
import com.daml.platform.store.interning.StringInterningView
import com.google.common.util.concurrent.ThreadFactoryBuilder
@ -40,10 +36,10 @@ object ParallelIndexerFactory {
dataSourceStorageBackend: DataSourceStorageBackend,
initializeParallelIngestion: InitializeParallelIngestion,
parallelIndexerSubscription: ParallelIndexerSubscription[_],
stringInterningStorageBackend: StringInterningStorageBackend,
meteringAggregator: DbDispatcher => ResourceOwner[Unit],
mat: Materializer,
readService: ReadService,
stringInterningViewO: Option[StringInterningView],
)(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] =
for {
inputMapperExecutor <- asyncPool(
@ -109,16 +105,7 @@ object ParallelIndexerFactory {
_ <- meteringAggregator(dbDispatcher)
} yield dbDispatcher
) { dbDispatcher =>
val stringInterningView = new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>
implicit loggingContext =>
dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
stringInterningStorageBackend.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
}
)
val stringInterningView = stringInterningViewO.getOrElse(new StringInterningView)
initializeParallelIngestion(
dbDispatcher = dbDispatcher,
updatingStringInterningView = stringInterningView,

View File

@ -78,8 +78,7 @@ object IndexMetadata {
enricher = None,
participantId = Ref.ParticipantId.assertFromString("1"),
ledgerEndCache = MutableLedgerEndCache(), // not used
stringInterning =
new StringInterningView((_, _) => _ => Future.successful(Nil)), // not used
stringInterning = new StringInterningView(), // not used
)
)
}

View File

@ -35,4 +35,19 @@ private[interning] object RawStringInterning {
(index + 1 + rawStringInterning.lastId, string)
}
.toVector
def resetTo(
lastPersistedStringInterningId: Int,
rawStringInterning: RawStringInterning,
): RawStringInterning =
if (lastPersistedStringInterningId < rawStringInterning.lastId) {
val idsToBeRemoved = lastPersistedStringInterningId + 1 to rawStringInterning.lastId
val stringsToBeRemoved = idsToBeRemoved.map(rawStringInterning.idMap)
RawStringInterning(
map = rawStringInterning.map.removedAll(stringsToBeRemoved),
idMap = rawStringInterning.idMap.removedAll(idsToBeRemoved),
lastId = lastPersistedStringInterningId,
)
} else rawStringInterning
}

View File

@ -6,6 +6,9 @@ package com.daml.platform.store.interning
import com.daml.platform.{Identifier, Party}
/** The facade for all supported string-interning domains
*
* @note The accessors defined in this interface are thread-safe and can
* be used concurrently with [[StringInterningView.internize]] and [[StringInterningView.update]].
*/
trait StringInterning {
def templateId: StringInterningDomain[Identifier]

View File

@ -19,6 +19,8 @@ trait InternizingStringInterningView {
*
* @param domainStringIterators iterators of the new entires
* @return If some of the entries were not part of the view: they will be added, and these will be returned as a interned-id and raw, prefixed string pairs.
*
* @note This method is thread-safe.
*/
def internize(domainStringIterators: DomainStringIterators): Iterable[(Int, String)]
}
@ -28,9 +30,17 @@ trait UpdatingStringInterningView {
/** Update the StringInterningView from persistence
*
* @param lastStringInterningId this is the "version" of the persistent view, which from the StringInterningView can see if it is behind
* @return a completion Future: if the view is behind it will load the missing entries from persistence, and update the view state
* @return a completion Future:
*
* * if the view is behind, it will load the missing entries from persistence, and update the view state.
*
* * if the view is ahead, it will remove all entries with ids greater than the `lastStringInterningId`
*
* @note This method is NOT thread-safe and should not be called concurrently with itself or [[InternizingStringInterningView.internize]].
*/
def update(lastStringInterningId: Int)(implicit loggingContext: LoggingContext): Future[Unit]
def update(lastStringInterningId: Int)(
loadPrefixedEntries: LoadStringInterningEntries
)(implicit loggingContext: LoggingContext): Future[Unit]
}
/** Encapsulate the dependency to load a range of string-interning-entries from persistence
@ -47,7 +57,7 @@ trait LoadStringInterningEntries {
* - The single, volatile reference enables non-synchronized access from all threads, accessing persistent-immutable datastructure
* - On the writing side it synchronizes (this usage is anyway expected) and maintains the immutable internal datastructure
*/
class StringInterningView(loadPrefixedEntries: LoadStringInterningEntries)
class StringInterningView()
extends StringInterning
with InternizingStringInterningView
with UpdatingStringInterningView {
@ -92,13 +102,14 @@ class StringInterningView(loadPrefixedEntries: LoadStringInterningEntries)
newEntries
}
override def update(
lastStringInterningId: Int
override def update(lastStringInterningId: Int)(
loadStringInterningEntries: LoadStringInterningEntries
)(implicit loggingContext: LoggingContext): Future[Unit] =
if (lastStringInterningId <= raw.lastId) {
raw = RawStringInterning.resetTo(lastStringInterningId, raw)
Future.unit
} else {
loadPrefixedEntries(raw.lastId, lastStringInterningId)(loggingContext)
loadStringInterningEntries(raw.lastId, lastStringInterningId)(loggingContext)
.map(updateView)(ExecutionContext.parasitic)
}

View File

@ -13,6 +13,7 @@ import com.daml.metrics.Metrics
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneIndexerServer}
import com.daml.platform.store.DbSupport.ParticipantDataSourceConfig
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.interning.StringInterningView
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
@ -101,6 +102,7 @@ object IndexerStabilityTestFixture {
config = indexerConfig,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
stringInterningViewO = Some(new StringInterningView),
).acquire()
} yield ReadServiceAndIndexer(readService, indexing)
)

View File

@ -14,16 +14,16 @@ import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.DbSupport.{ConnectionPoolConfig, DbConfig}
import com.daml.platform.store.dao.events.CompressionStrategy
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.dao.JdbcLedgerDaoBackend.{TestLedgerId, TestParticipantId}
import com.daml.platform.store.dao.events.CompressionStrategy
import com.daml.platform.store.interning.StringInterningView
import com.daml.platform.store.{DbSupport, DbType, LfValueTranslationCache}
import org.scalatest.AsyncTestSuite
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
object JdbcLedgerDaoBackend {
@ -114,7 +114,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
// We use the dispatcher here because the default Scalatest execution context is too slow.
implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
ledgerEndCache = MutableLedgerEndCache()
stringInterningView = new StringInterningView((_, _) => _ => Future.successful(Nil))
stringInterningView = new StringInterningView()
resource = newLoggingContext { implicit loggingContext =>
for {
dao <- daoOwner(

View File

@ -90,4 +90,18 @@ class RawStringInterningSpec extends AnyFlatSpec with Matchers {
)
newEntries shouldBe Vector(3 -> "three", 4 -> "four")
}
it should "remove entries after the lastPersistedStringInterningId on `resetTo`" in {
val current = RawStringInterning(Map("one" -> 1, "two" -> 2), Map(1 -> "one", 2 -> "two"), 2)
val purgedStringInterning =
RawStringInterning.resetTo(lastPersistedStringInterningId = 1, current)
purgedStringInterning shouldBe RawStringInterning(Map("one" -> 1), Map(1 -> "one"), 1)
}
it should "not remove entries if lastPersistedStringInterningId is lteq lastId on `resetTo`" in {
val current = RawStringInterning(Map("one" -> 1, "two" -> 2), Map(1 -> "one", 2 -> "two"), 2)
val purgedStringInterning =
RawStringInterning.resetTo(lastPersistedStringInterningId = 2, current)
purgedStringInterning shouldBe current
}
}

View File

@ -17,7 +17,7 @@ class StringInterningViewSpec extends AsyncFlatSpec with Matchers {
behavior of "StringInterningView"
it should "provide working cache by extending" in {
val testee = new StringInterningView((_, _) => _ => Future.successful(Nil))
val testee = new StringInterningView()
partyAbsent(testee, "p1")
partyAbsent(testee, "p2")
partyAbsent(testee, "22:same:name")
@ -48,7 +48,7 @@ class StringInterningViewSpec extends AsyncFlatSpec with Matchers {
}
it should "extend working view correctly" in {
val testee = new StringInterningView((_, _) => _ => Future.successful(Nil))
val testee = new StringInterningView()
partyAbsent(testee, "p1")
partyAbsent(testee, "p2")
partyAbsent(testee, "22:same:name")
@ -93,56 +93,45 @@ class StringInterningViewSpec extends AsyncFlatSpec with Matchers {
templateAbsent(testee, "22:unkno:wn")
}
it should "not update view if last id is behind" in {
val testee = new StringInterningView((from, to) =>
_ => {
from shouldBe 0
to shouldBe 6
Future.successful(
Vector(
1 -> "p|p1",
2 -> "p|p2",
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
)
}
)
it should "correctly load prefixing entries in the view on `update`" in {
val testee = new StringInterningView()
partyAbsent(testee, "p1")
partyAbsent(testee, "p2")
partyAbsent(testee, "22:same:name")
templateAbsent(testee, "22:t:a")
templateAbsent(testee, "22:t:b")
templateAbsent(testee, "22:same:name")
testee.update(6).map { _ =>
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templatePresent(testee, "22:t:b", 5)
templatePresent(testee, "22:same:name", 6)
templateAbsent(testee, "22:unk:nown")
}
testee
.update(6)((from, to) =>
_ => {
from shouldBe 0
to shouldBe 6
Future.successful(
Vector(
1 -> "p|p1",
2 -> "p|p2",
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
)
}
)
.map { _ =>
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templatePresent(testee, "22:t:b", 5)
templatePresent(testee, "22:same:name", 6)
templateAbsent(testee, "22:unk:nown")
}
}
it should "be able to update working view correctly" in {
val testee = new StringInterningView((from, to) =>
_ => {
from shouldBe 2
to shouldBe 6
Future.successful(
Vector(
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
)
}
)
val testee = new StringInterningView()
partyAbsent(testee, "p1")
partyAbsent(testee, "p2")
partyAbsent(testee, "22:same:name")
@ -161,16 +150,82 @@ class StringInterningViewSpec extends AsyncFlatSpec with Matchers {
templateAbsent(testee, "22:t:a")
templateAbsent(testee, "22:t:b")
templateAbsent(testee, "22:same:name")
testee.update(6).map { _ =>
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templatePresent(testee, "22:t:b", 5)
templatePresent(testee, "22:same:name", 6)
templateAbsent(testee, "22:unk:nown")
}
testee
.update(6)((from, to) =>
_ => {
from shouldBe 2
to shouldBe 6
Future.successful(
Vector(
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
)
}
)
.map { _ =>
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templatePresent(testee, "22:t:b", 5)
templatePresent(testee, "22:same:name", 6)
templateAbsent(testee, "22:unk:nown")
}
}
it should "remove entries if lastStringInterningId is greater than lastId" in {
val testee = new StringInterningView()
testee.internize(
new DomainStringIterators(
parties = List("p1", "p2", "22:same:name").iterator,
templateIds = List("22:t:a", "22:t:b", "22:same:name").iterator,
)
) shouldBe Vector(
1 -> "p|p1",
2 -> "p|p2",
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templatePresent(testee, "22:t:b", 5)
templatePresent(testee, "22:same:name", 6)
templateAbsent(testee, "22:unkno:wn")
testee
.update(4)((from, to) =>
_ => {
from shouldBe 2
to shouldBe 6
Future.successful(
Vector(
3 -> "p|22:same:name",
4 -> "t|22:t:a",
5 -> "t|22:t:b",
6 -> "t|22:same:name",
)
)
}
)
.map { _ =>
partyPresent(testee, "p1", 1)
partyPresent(testee, "p2", 2)
partyPresent(testee, "22:same:name", 3)
partyAbsent(testee, "unknown")
templatePresent(testee, "22:t:a", 4)
templateAbsent(testee, "22:t:b")
templateAbsent(testee, "22:same:name")
templateAbsent(testee, "22:unkno:wn")
}
}
private def partyPresent(view: StringInterning, party: String, id: Int) = {

View File

@ -3,11 +3,6 @@
package com.daml.platform.indexer
import java.time.Instant
import java.time.temporal.ChronoUnit.SECONDS
import java.util.UUID
import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicLong
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}
@ -36,9 +31,7 @@ import com.daml.platform.store.DbSupport.{
DbConfig,
ParticipantDataSourceConfig,
}
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.interning.StringInterningView
import com.daml.platform.store.{DbSupport, LfValueTranslationCache}
import com.daml.platform.testing.LogCollector
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
@ -49,6 +42,11 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import java.time.Instant
import java.time.temporal.ChronoUnit.SECONDS
import java.util.UUID
import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
@ -217,26 +215,7 @@ class RecoveringIndexerIntegrationSpec
private def eventuallyPartiesShouldBe(partyNames: String*)(implicit
loggingContext: LoggingContext
): Future[Unit] =
dao.use { case (ledgerDao, ledgerEndCache) =>
eventually { (_, _) =>
for {
ledgerEnd <- ledgerDao.lookupLedgerEnd()
_ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
knownParties <- ledgerDao.listKnownParties()
} yield {
knownParties.map(_.displayName) shouldBe partyNames.map(Some(_))
()
}
}
}
// TODO we probably do not need a full dao for this purpose: refactoring with direct usage of StorageBackend?
private def dao(implicit
loggingContext: LoggingContext
): ResourceOwner[(LedgerReadDao, MutableLedgerEndCache)] = {
val mutableLedgerEndCache = MutableLedgerEndCache()
val stringInterning = new StringInterningView((_, _) => _ => Future.successful(Nil)) // not used
): Future[Unit] = {
val jdbcUrl =
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false"
val metrics = new Metrics(new MetricRegistry)
@ -252,26 +231,26 @@ class RecoveringIndexerIntegrationSpec
),
),
)
.map(dbSupport =>
JdbcLedgerDao.read(
dbSupport = dbSupport,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
acsIdPageSize = 20000,
acsIdPageBufferSize = 1,
acsIdPageWorkingMemoryBytes = 100 * 1024 * 1024,
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"),
ledgerEndCache = mutableLedgerEndCache,
stringInterning = stringInterning,
) -> mutableLedgerEndCache
)
.use { dbSupport =>
val ledgerEndCache = MutableLedgerEndCache()
val storageBackendFactory = dbSupport.storageBackendFactory
val partyStorageBacked = storageBackendFactory.createPartyStorageBackend(ledgerEndCache)
val parameterStorageBackend = storageBackendFactory.createParameterStorageBackend
val dbDispatcher = dbSupport.dbDispatcher
eventually { (_, _) =>
for {
ledgerEnd <- dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerEnd)(parameterStorageBackend.ledgerEnd)
_ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
knownParties <- dbDispatcher
.executeSql(metrics.daml.index.db.loadAllParties)(partyStorageBacked.knownParties)
} yield {
knownParties.map(_.displayName) shouldBe partyNames.map(Some(_))
()
}
}
}
}
}

View File

@ -49,6 +49,7 @@ import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.runner.common.MetricsConfig.MetricRegistryType
import com.daml.platform.store.DbSupport.ParticipantDataSourceConfig
import com.daml.ports.Port
import com.daml.platform.store.interning.StringInterningView
import scala.util.Try
@ -182,15 +183,6 @@ object SandboxOnXRunner {
stateUpdatesSource,
)
indexerHealthChecks <- buildIndexerServer(
metrics,
new TimedReadService(readServiceWithSubscriber, metrics),
translationCache,
participantId,
participantConfig,
participantDataSourceConfig,
)
dbSupport <- DbSupport
.owner(
serverRole = ServerRole.ApiServer,
@ -200,6 +192,18 @@ object SandboxOnXRunner {
),
)
sharedStringInterningView = new StringInterningView
indexerHealthChecks <- buildIndexerServer(
metrics,
new TimedReadService(readServiceWithSubscriber, metrics),
translationCache,
participantId,
participantConfig,
participantDataSourceConfig,
sharedStringInterningView,
)
indexService <- StandaloneIndexService(
ledgerId = config.ledgerId,
config = participantConfig.indexService,
@ -209,6 +213,7 @@ object SandboxOnXRunner {
lfValueTranslationCache = translationCache,
dbSupport = dbSupport,
participantId = participantId,
sharedStringInterningViewO = Some(sharedStringInterningView),
)
timeServiceBackend = configAdaptor.timeServiceBackend(participantConfig.apiServer)
@ -307,6 +312,7 @@ object SandboxOnXRunner {
participantId: Ref.ParticipantId,
participantConfig: ParticipantConfig,
participantDataSourceConfig: ParticipantDataSourceConfig,
stringInterningView: StringInterningView,
)(implicit
loggingContext: LoggingContext,
materializer: Materializer,
@ -319,6 +325,7 @@ object SandboxOnXRunner {
config = participantConfig.indexer,
metrics = metrics,
lfValueTranslationCache = translationCache,
stringInterningViewO = Some(stringInterningView),
)
} yield new HealthChecks(
"read" -> readService,