mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
String interning read integration [DPP-706] (#11638)
* Wire to read pipelines (initialization and update) * Patch StringInterning through the appropriate StorageBackend-s * Creating a StringInterning for tests (which interns everything it sees) changelog_begin changelog_end
This commit is contained in:
parent
c5a1f0bb61
commit
fd61d0bcbc
@ -31,6 +31,11 @@ import com.daml.platform.store.appendonlydao.{
|
||||
}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
|
||||
import com.daml.platform.store.interning.{
|
||||
StringInterning,
|
||||
StringInterningView,
|
||||
UpdatingStringInterningView,
|
||||
}
|
||||
import com.daml.platform.store.{BaseLedger, DbType, LfValueTranslationCache}
|
||||
import com.daml.resources.ProgramResource.StartupException
|
||||
import com.daml.timer.RetryStrategy
|
||||
@ -79,15 +84,27 @@ private[platform] object ReadOnlySqlLedger {
|
||||
metrics = metrics,
|
||||
)
|
||||
.acquire()
|
||||
ledgerDao = ledgerDaoOwner(
|
||||
stringInterningStorageBackend = storageBackendFactory.createStringInterningStorageBackend
|
||||
stringInterningView = new StringInterningView(
|
||||
loadPrefixedEntries = (fromExclusive, toInclusive) =>
|
||||
implicit loggingContext =>
|
||||
dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
|
||||
stringInterningStorageBackend.loadStringInterningEntries(
|
||||
fromExclusive,
|
||||
toInclusive,
|
||||
)
|
||||
}
|
||||
)
|
||||
ledgerDao = createLedgerDao(
|
||||
servicesExecutionContext,
|
||||
errorFactories,
|
||||
ledgerEndCache,
|
||||
stringInterningView,
|
||||
dbDispatcher,
|
||||
storageBackendFactory,
|
||||
)
|
||||
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
|
||||
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
|
||||
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache, stringInterningView).acquire()
|
||||
} yield ledger
|
||||
}
|
||||
|
||||
@ -95,11 +112,13 @@ private[platform] object ReadOnlySqlLedger {
|
||||
ledgerDao: LedgerReadDao,
|
||||
ledgerId: LedgerId,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
updatingStringInterningView: UpdatingStringInterningView,
|
||||
) =
|
||||
if (enableMutableContractStateCache) {
|
||||
new ReadOnlySqlLedgerWithMutableCache.Owner(
|
||||
ledgerDao,
|
||||
ledgerEndCache,
|
||||
updatingStringInterningView,
|
||||
enricher,
|
||||
ledgerId,
|
||||
metrics,
|
||||
@ -113,6 +132,7 @@ private[platform] object ReadOnlySqlLedger {
|
||||
new ReadOnlySqlLedgerWithTranslationCache.Owner(
|
||||
ledgerDao,
|
||||
ledgerEndCache,
|
||||
updatingStringInterningView,
|
||||
ledgerId,
|
||||
lfValueTranslationCache,
|
||||
)
|
||||
@ -158,10 +178,11 @@ private[platform] object ReadOnlySqlLedger {
|
||||
}
|
||||
}
|
||||
|
||||
private def ledgerDaoOwner(
|
||||
private def createLedgerDao(
|
||||
servicesExecutionContext: ExecutionContext,
|
||||
errorFactories: ErrorFactories,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
dbDispatcher: DbDispatcher,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
): LedgerReadDao =
|
||||
@ -176,6 +197,7 @@ private[platform] object ReadOnlySqlLedger {
|
||||
participantId = participantId,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterning,
|
||||
errorFactories = errorFactories,
|
||||
)
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import com.daml.platform.store.cache.{
|
||||
MutableLedgerEndCache,
|
||||
}
|
||||
import com.daml.platform.store.interfaces.TransactionLogUpdate
|
||||
import com.daml.platform.store.interning.UpdatingStringInterningView
|
||||
import com.daml.scalautil.Statement.discard
|
||||
|
||||
import scala.collection.mutable
|
||||
@ -39,6 +40,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
|
||||
final class Owner(
|
||||
ledgerDao: LedgerReadDao,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
updatingStringInterningView: UpdatingStringInterningView,
|
||||
enricher: ValueEnricher,
|
||||
ledgerId: LedgerId,
|
||||
metrics: Metrics,
|
||||
@ -59,6 +61,9 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
|
||||
ledgerDao.lookupLedgerEnd()
|
||||
)
|
||||
_ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId))
|
||||
_ <- Resource.fromFuture(
|
||||
updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
|
||||
)
|
||||
prefetchingDispatcher <- dispatcherOffsetSeqIdOwner(
|
||||
ledgerEnd.lastOffset,
|
||||
ledgerEnd.lastEventSeqId,
|
||||
@ -154,6 +159,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
|
||||
cacheUpdatesDispatcher,
|
||||
generalDispatcher,
|
||||
dispatcherLagMeter,
|
||||
updatingStringInterningView,
|
||||
)
|
||||
)
|
||||
} yield ledger
|
||||
@ -226,6 +232,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
|
||||
contractStateEventsDispatcher = cacheUpdatesDispatcher,
|
||||
dispatcher = generalDispatcher,
|
||||
dispatcherLagger = dispatcherLagMeter,
|
||||
updatingStringInterningView = updatingStringInterningView,
|
||||
)
|
||||
)
|
||||
} yield ledger
|
||||
@ -277,6 +284,7 @@ private final class ReadOnlySqlLedgerWithMutableCache(
|
||||
contractStateEventsDispatcher: Dispatcher[(Offset, Long)],
|
||||
dispatcher: Dispatcher[Offset],
|
||||
dispatcherLagger: DispatcherLagMeter,
|
||||
updatingStringInterningView: UpdatingStringInterningView,
|
||||
)(implicit mat: Materializer, loggingContext: LoggingContext)
|
||||
extends ReadOnlySqlLedger(
|
||||
ledgerId,
|
||||
@ -294,7 +302,14 @@ private final class ReadOnlySqlLedgerWithMutableCache(
|
||||
)(() =>
|
||||
Source
|
||||
.tick(0.millis, 100.millis, ())
|
||||
.mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
|
||||
.mapAsync(1) {
|
||||
implicit val ec: ExecutionContext = mat.executionContext
|
||||
_ =>
|
||||
for {
|
||||
ledgerEnd <- ledgerDao.lookupLedgerEnd()
|
||||
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
|
||||
} yield ledgerEnd
|
||||
}
|
||||
)
|
||||
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
|
||||
.toMat(Sink.foreach { case newLedgerHead =>
|
||||
|
@ -16,15 +16,17 @@ import com.daml.platform.akkastreams.dispatcher.Dispatcher
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
import com.daml.platform.store.appendonlydao.LedgerReadDao
|
||||
import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore}
|
||||
import com.daml.platform.store.interning.UpdatingStringInterningView
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, Future}
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
|
||||
private[index] object ReadOnlySqlLedgerWithTranslationCache {
|
||||
|
||||
final class Owner(
|
||||
ledgerDao: LedgerReadDao,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
updatingStringInterningView: UpdatingStringInterningView,
|
||||
ledgerId: LedgerId,
|
||||
lfValueTranslationCache: LfValueTranslationCache.Cache,
|
||||
)(implicit mat: Materializer, loggingContext: LoggingContext)
|
||||
@ -50,6 +52,7 @@ private[index] object ReadOnlySqlLedgerWithTranslationCache {
|
||||
ledgerEndCache,
|
||||
contractsStore,
|
||||
dispatcher,
|
||||
updatingStringInterningView,
|
||||
)
|
||||
)
|
||||
|
||||
@ -72,6 +75,7 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
contractStore: ContractStore,
|
||||
dispatcher: Dispatcher[Offset],
|
||||
updatingStringInterningView: UpdatingStringInterningView,
|
||||
)(implicit mat: Materializer, loggingContext: LoggingContext)
|
||||
extends ReadOnlySqlLedger(
|
||||
ledgerId,
|
||||
@ -89,7 +93,14 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
|
||||
)(() =>
|
||||
Source
|
||||
.tick(0.millis, 100.millis, ())
|
||||
.mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
|
||||
.mapAsync(1) {
|
||||
implicit val ec: ExecutionContext = mat.executionContext
|
||||
_ =>
|
||||
for {
|
||||
ledgerEnd <- ledgerDao.lookupLedgerEnd()
|
||||
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
|
||||
} yield ledgerEnd
|
||||
}
|
||||
)
|
||||
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
|
||||
.toMat(Sink.foreach { ledgerEnd =>
|
||||
|
@ -17,6 +17,7 @@ import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterningView
|
||||
import scalaz.Tag
|
||||
|
||||
import scala.concurrent.duration._
|
||||
@ -67,9 +68,11 @@ object IndexMetadata {
|
||||
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
|
||||
enricher = None,
|
||||
participantId = Ref.ParticipantId.assertFromString("1"),
|
||||
errorFactories = errorFactories,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = MutableLedgerEndCache(), // not used
|
||||
errorFactories = errorFactories,
|
||||
stringInterning =
|
||||
new StringInterningView((_, _) => _ => Future.successful(Nil)), // not used
|
||||
)
|
||||
)
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ import com.daml.platform.store.entries.{
|
||||
PackageLedgerEntry,
|
||||
PartyLedgerEntry,
|
||||
}
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
@ -759,6 +760,7 @@ private[platform] object JdbcLedgerDao {
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): LedgerReadDao =
|
||||
new MeteredLedgerReadDao(
|
||||
new JdbcLedgerDao(
|
||||
@ -773,7 +775,7 @@ private[platform] object JdbcLedgerDao {
|
||||
enricher,
|
||||
SequentialWriteDao.noop,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
@ -795,6 +797,7 @@ private[platform] object JdbcLedgerDao {
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): LedgerDao =
|
||||
new MeteredLedgerDao(
|
||||
new JdbcLedgerDao(
|
||||
@ -809,7 +812,7 @@ private[platform] object JdbcLedgerDao {
|
||||
enricher,
|
||||
sequentialWriteDao,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
@ -832,6 +835,7 @@ private[platform] object JdbcLedgerDao {
|
||||
errorFactories: ErrorFactories,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): LedgerDao =
|
||||
new MeteredLedgerDao(
|
||||
new JdbcLedgerDao(
|
||||
@ -846,7 +850,7 @@ private[platform] object JdbcLedgerDao {
|
||||
enricher,
|
||||
sequentialWriteDao,
|
||||
participantId,
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache),
|
||||
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
|
||||
storageBackendFactory.createParameterStorageBackend,
|
||||
storageBackendFactory.createDeduplicationStorageBackend,
|
||||
storageBackendFactory.createResetStorageBackend,
|
||||
|
@ -8,6 +8,7 @@ import com.daml.platform.store.backend.h2.H2StorageBackendFactory
|
||||
import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory
|
||||
import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
trait StorageBackendFactory {
|
||||
def createIngestionStorageBackend: IngestionStorageBackend[_]
|
||||
@ -16,23 +17,32 @@ trait StorageBackendFactory {
|
||||
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
|
||||
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
|
||||
def createDeduplicationStorageBackend: DeduplicationStorageBackend
|
||||
def createCompletionStorageBackend: CompletionStorageBackend
|
||||
def createContractStorageBackend(ledgerEndCache: LedgerEndCache): ContractStorageBackend
|
||||
def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend
|
||||
def createCompletionStorageBackend(stringInterning: StringInterning): CompletionStorageBackend
|
||||
def createContractStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): ContractStorageBackend
|
||||
def createEventStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): EventStorageBackend
|
||||
def createDataSourceStorageBackend: DataSourceStorageBackend
|
||||
def createDBLockStorageBackend: DBLockStorageBackend
|
||||
def createIntegrityStorageBackend: IntegrityStorageBackend
|
||||
def createResetStorageBackend: ResetStorageBackend
|
||||
def createStringInterningStorageBackend: StringInterningStorageBackend
|
||||
|
||||
final def readStorageBackend(ledgerEndCache: LedgerEndCache): ReadStorageBackend =
|
||||
final def readStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): ReadStorageBackend =
|
||||
ReadStorageBackend(
|
||||
configurationStorageBackend = createConfigurationStorageBackend(ledgerEndCache),
|
||||
partyStorageBackend = createPartyStorageBackend(ledgerEndCache),
|
||||
packageStorageBackend = createPackageStorageBackend(ledgerEndCache),
|
||||
completionStorageBackend = createCompletionStorageBackend,
|
||||
contractStorageBackend = createContractStorageBackend(ledgerEndCache),
|
||||
eventStorageBackend = createEventStorageBackend(ledgerEndCache),
|
||||
completionStorageBackend = createCompletionStorageBackend(stringInterning),
|
||||
contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning),
|
||||
eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
package com.daml.platform.store.backend.common
|
||||
|
||||
import java.sql.Connection
|
||||
|
||||
import anorm.SqlParser.{byteArray, int, long, str}
|
||||
import anorm.{Row, RowParser, SimpleSql, ~}
|
||||
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
|
||||
@ -17,12 +18,15 @@ import com.daml.platform.store.CompletionFromTransaction
|
||||
import com.daml.platform.store.Conversions.{offset, timestampFromMicros}
|
||||
import com.daml.platform.store.backend.CompletionStorageBackend
|
||||
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
import com.google.protobuf.any
|
||||
import com.google.rpc.status.{Status => StatusProto}
|
||||
|
||||
class CompletionStorageBackendTemplate(queryStrategy: QueryStrategy)
|
||||
extends CompletionStorageBackend {
|
||||
|
||||
class CompletionStorageBackendTemplate(
|
||||
queryStrategy: QueryStrategy,
|
||||
stringInterning: StringInterning,
|
||||
) extends CompletionStorageBackend {
|
||||
assert(stringInterning != null) // TODO remove
|
||||
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
override def commandCompletions(
|
||||
|
@ -27,14 +27,16 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
|
||||
KeyState,
|
||||
KeyUnassigned,
|
||||
}
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
class ContractStorageBackendTemplate(
|
||||
queryStrategy: QueryStrategy,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
) extends ContractStorageBackend {
|
||||
|
||||
assert(stringInterning != null) // TODO remove
|
||||
override def contractKeyGlobally(key: Key)(connection: Connection): Option[ContractId] =
|
||||
contractKey(
|
||||
resultColumns = List("contract_id"),
|
||||
|
@ -25,6 +25,7 @@ import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeP
|
||||
import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent
|
||||
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
import scala.collection.compat.immutable.ArraySeq
|
||||
|
||||
@ -32,12 +33,13 @@ abstract class EventStorageBackendTemplate(
|
||||
eventStrategy: EventStrategy,
|
||||
queryStrategy: QueryStrategy,
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
// TODO Refactoring: This method is needed in pruneEvents, but belongs to [[ParameterStorageBackend]].
|
||||
// Remove with the break-out of pruneEvents.
|
||||
participantAllDivulgedContractsPrunedUpToInclusive: Connection => Option[Offset],
|
||||
) extends EventStorageBackend {
|
||||
import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray
|
||||
|
||||
assert(stringInterning != null) // TODO remove
|
||||
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
|
||||
|
||||
private val selectColumnsForFlatTransactions =
|
||||
|
@ -8,9 +8,10 @@ import com.daml.platform.store.appendonlydao.events.ContractId
|
||||
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
|
||||
import com.daml.platform.store.backend.common.ContractStorageBackendTemplate
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
class H2ContractStorageBackend(ledgerEndCache: LedgerEndCache)
|
||||
extends ContractStorageBackendTemplate(H2QueryStrategy, ledgerEndCache) {
|
||||
class H2ContractStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
|
||||
extends ContractStorageBackendTemplate(H2QueryStrategy, ledgerEndCache, stringInterning) {
|
||||
override def maximumLedgerTimeSqlLiteral(
|
||||
id: ContractId,
|
||||
lastEventSequentialId: Long,
|
||||
|
@ -13,12 +13,14 @@ import com.daml.platform.store.backend.common.{
|
||||
ParameterStorageBackendTemplate,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
class H2EventStorageBackend(ledgerEndCache: LedgerEndCache)
|
||||
class H2EventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
|
||||
extends EventStorageBackendTemplate(
|
||||
queryStrategy = H2QueryStrategy,
|
||||
eventStrategy = H2EventStrategy,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterning,
|
||||
participantAllDivulgedContractsPrunedUpToInclusive =
|
||||
ParameterStorageBackendTemplate.participantAllDivulgedContractsPrunedUpToInclusive,
|
||||
) {
|
||||
|
@ -22,6 +22,7 @@ import com.daml.platform.store.backend.{
|
||||
StorageBackendFactory,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
object H2StorageBackendFactory extends StorageBackendFactory with CommonStorageBackendFactory {
|
||||
|
||||
@ -34,16 +35,22 @@ object H2StorageBackendFactory extends StorageBackendFactory with CommonStorageB
|
||||
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
|
||||
H2DeduplicationStorageBackend
|
||||
|
||||
override val createCompletionStorageBackend: CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(H2QueryStrategy)
|
||||
override def createCompletionStorageBackend(
|
||||
stringInterning: StringInterning
|
||||
): CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(H2QueryStrategy, stringInterning)
|
||||
|
||||
override def createContractStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): ContractStorageBackend =
|
||||
new H2ContractStorageBackend(ledgerEndCache)
|
||||
new H2ContractStorageBackend(ledgerEndCache, stringInterning)
|
||||
|
||||
override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend =
|
||||
new H2EventStorageBackend(ledgerEndCache)
|
||||
override def createEventStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): EventStorageBackend =
|
||||
new H2EventStorageBackend(ledgerEndCache, stringInterning)
|
||||
|
||||
override val createDataSourceStorageBackend: DataSourceStorageBackend =
|
||||
H2DataSourceStorageBackend
|
||||
|
@ -13,12 +13,14 @@ import com.daml.platform.store.backend.common.{
|
||||
ParameterStorageBackendTemplate,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
class OracleEventStorageBackend(ledgerEndCache: LedgerEndCache)
|
||||
class OracleEventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
|
||||
extends EventStorageBackendTemplate(
|
||||
eventStrategy = OracleEventStrategy,
|
||||
queryStrategy = OracleQueryStrategy,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterning,
|
||||
participantAllDivulgedContractsPrunedUpToInclusive =
|
||||
ParameterStorageBackendTemplate.participantAllDivulgedContractsPrunedUpToInclusive,
|
||||
) {
|
||||
|
@ -23,6 +23,7 @@ import com.daml.platform.store.backend.{
|
||||
StorageBackendFactory,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
object OracleStorageBackendFactory extends StorageBackendFactory with CommonStorageBackendFactory {
|
||||
|
||||
@ -35,16 +36,22 @@ object OracleStorageBackendFactory extends StorageBackendFactory with CommonStor
|
||||
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
|
||||
OracleDeduplicationStorageBackend
|
||||
|
||||
override def createCompletionStorageBackend: CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(OracleQueryStrategy)
|
||||
override def createCompletionStorageBackend(
|
||||
stringInterning: StringInterning
|
||||
): CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(OracleQueryStrategy, stringInterning)
|
||||
|
||||
override def createContractStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): ContractStorageBackend =
|
||||
new ContractStorageBackendTemplate(OracleQueryStrategy, ledgerEndCache)
|
||||
new ContractStorageBackendTemplate(OracleQueryStrategy, ledgerEndCache, stringInterning)
|
||||
|
||||
override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend =
|
||||
new OracleEventStorageBackend(ledgerEndCache)
|
||||
override def createEventStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): EventStorageBackend =
|
||||
new OracleEventStorageBackend(ledgerEndCache, stringInterning)
|
||||
|
||||
override val createDataSourceStorageBackend: DataSourceStorageBackend =
|
||||
OracleDataSourceStorageBackend
|
||||
|
@ -13,12 +13,14 @@ import com.daml.platform.store.backend.common.{
|
||||
ParameterStorageBackendTemplate,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
class PostgresEventStorageBackend(ledgerEndCache: LedgerEndCache)
|
||||
class PostgresEventStorageBackend(ledgerEndCache: LedgerEndCache, stringInterning: StringInterning)
|
||||
extends EventStorageBackendTemplate(
|
||||
eventStrategy = PostgresEventStrategy,
|
||||
queryStrategy = PostgresQueryStrategy,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterning,
|
||||
participantAllDivulgedContractsPrunedUpToInclusive =
|
||||
ParameterStorageBackendTemplate.participantAllDivulgedContractsPrunedUpToInclusive,
|
||||
) {
|
||||
|
@ -23,6 +23,7 @@ import com.daml.platform.store.backend.{
|
||||
StorageBackendFactory,
|
||||
}
|
||||
import com.daml.platform.store.cache.LedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterning
|
||||
|
||||
object PostgresStorageBackendFactory
|
||||
extends StorageBackendFactory
|
||||
@ -37,16 +38,22 @@ object PostgresStorageBackendFactory
|
||||
override val createDeduplicationStorageBackend: DeduplicationStorageBackend =
|
||||
PostgresDeduplicationStorageBackend
|
||||
|
||||
override val createCompletionStorageBackend: CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(PostgresQueryStrategy)
|
||||
override def createCompletionStorageBackend(
|
||||
stringInterning: StringInterning
|
||||
): CompletionStorageBackend =
|
||||
new CompletionStorageBackendTemplate(PostgresQueryStrategy, stringInterning)
|
||||
|
||||
override def createContractStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): ContractStorageBackend =
|
||||
new ContractStorageBackendTemplate(PostgresQueryStrategy, ledgerEndCache)
|
||||
new ContractStorageBackendTemplate(PostgresQueryStrategy, ledgerEndCache, stringInterning)
|
||||
|
||||
override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend =
|
||||
new PostgresEventStorageBackend(ledgerEndCache)
|
||||
override def createEventStorageBackend(
|
||||
ledgerEndCache: LedgerEndCache,
|
||||
stringInterning: StringInterning,
|
||||
): EventStorageBackend =
|
||||
new PostgresEventStorageBackend(ledgerEndCache, stringInterning)
|
||||
|
||||
override val createDataSourceStorageBackend: DataSourceStorageBackend =
|
||||
PostgresDataSourceStorageBackend
|
||||
|
@ -11,6 +11,7 @@ import com.daml.platform.store.backend.h2.H2StorageBackendFactory
|
||||
import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory
|
||||
import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import com.daml.platform.store.interning.InterningStringInterning
|
||||
import com.daml.testing.oracle.OracleAroundAll
|
||||
import com.daml.testing.postgresql.PostgresAroundAll
|
||||
import org.scalatest.Suite
|
||||
@ -90,6 +91,7 @@ case class TestBackend(
|
||||
object TestBackend {
|
||||
def apply(storageBackendFactory: StorageBackendFactory): TestBackend = {
|
||||
val ledgerEndCache = MutableLedgerEndCache()
|
||||
val stringInterning = new InterningStringInterning
|
||||
TestBackend(
|
||||
ingestion = storageBackendFactory.createIngestionStorageBackend,
|
||||
parameter = storageBackendFactory.createParameterStorageBackend,
|
||||
@ -97,9 +99,10 @@ object TestBackend {
|
||||
party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache),
|
||||
packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache),
|
||||
deduplication = storageBackendFactory.createDeduplicationStorageBackend,
|
||||
completion = storageBackendFactory.createCompletionStorageBackend,
|
||||
contract = storageBackendFactory.createContractStorageBackend(ledgerEndCache),
|
||||
event = storageBackendFactory.createEventStorageBackend(ledgerEndCache),
|
||||
completion = storageBackendFactory.createCompletionStorageBackend(stringInterning),
|
||||
contract =
|
||||
storageBackendFactory.createContractStorageBackend(ledgerEndCache, stringInterning),
|
||||
event = storageBackendFactory.createEventStorageBackend(ledgerEndCache, stringInterning),
|
||||
dataSource = storageBackendFactory.createDataSourceStorageBackend,
|
||||
dbLock = storageBackendFactory.createDBLockStorageBackend,
|
||||
integrity = storageBackendFactory.createIntegrityStorageBackend,
|
||||
|
@ -100,8 +100,9 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
|
||||
enricher = Some(new ValueEnricher(new Engine())),
|
||||
participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterningView,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -82,8 +82,9 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
|
||||
enricher = None,
|
||||
participantId = participantId,
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
stringInterning = stringInterningView,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,64 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.store.interning
|
||||
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Ref.Party
|
||||
|
||||
class InterningStringInterning extends StringInterning {
|
||||
private var idToString: Map[Int, String] = Map.empty
|
||||
private var stringToId: Map[String, Int] = Map.empty
|
||||
private var lastId: Int = 0
|
||||
|
||||
private val rawStringInterning: StringInterningAccessor[String] =
|
||||
new StringInterningAccessor[String] {
|
||||
override def internalize(t: String): Int = tryInternalize(t).get
|
||||
|
||||
override def tryInternalize(t: String): Option[Int] = synchronized {
|
||||
stringToId.get(t) match {
|
||||
case Some(id) => Some(id)
|
||||
case None =>
|
||||
lastId += 1
|
||||
idToString = idToString + (lastId -> t)
|
||||
stringToId = stringToId + (t -> lastId)
|
||||
Some(lastId)
|
||||
}
|
||||
}
|
||||
|
||||
override def externalize(id: Int): String = tryExternalize(id).get
|
||||
|
||||
override def tryExternalize(id: Int): Option[String] = idToString.get(id)
|
||||
|
||||
}
|
||||
|
||||
override val templateId: StringInterningDomain[Ref.Identifier] =
|
||||
new StringInterningDomain[Ref.Identifier] {
|
||||
override val unsafe: StringInterningAccessor[String] = rawStringInterning
|
||||
|
||||
override def internalize(t: Ref.Identifier): Int = tryInternalize(t).get
|
||||
|
||||
override def tryInternalize(t: Ref.Identifier): Option[Int] =
|
||||
rawStringInterning.tryInternalize(t.toString)
|
||||
|
||||
override def externalize(id: Int): Ref.Identifier = tryExternalize(id).get
|
||||
|
||||
override def tryExternalize(id: Int): Option[Ref.Identifier] =
|
||||
rawStringInterning.tryExternalize(id).map(Ref.Identifier.assertFromString)
|
||||
}
|
||||
|
||||
override def party: StringInterningDomain[Party] =
|
||||
new StringInterningDomain[Party] {
|
||||
override val unsafe: StringInterningAccessor[String] = rawStringInterning
|
||||
|
||||
override def internalize(t: Party): Int = tryInternalize(t).get
|
||||
|
||||
override def tryInternalize(t: Party): Option[Int] =
|
||||
rawStringInterning.tryInternalize(t.toString)
|
||||
|
||||
override def externalize(id: Int): Party = tryExternalize(id).get
|
||||
|
||||
override def tryExternalize(id: Int): Option[Party] =
|
||||
rawStringInterning.tryExternalize(id).map(Party.assertFromString)
|
||||
}
|
||||
}
|
@ -32,6 +32,7 @@ import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao, Ledge
|
||||
import com.daml.platform.store.{DbType, LfValueTranslationCache}
|
||||
import com.daml.platform.store.backend.StorageBackendFactory
|
||||
import com.daml.platform.store.cache.MutableLedgerEndCache
|
||||
import com.daml.platform.store.interning.StringInterningView
|
||||
import com.daml.platform.testing.LogCollector
|
||||
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
|
||||
import com.daml.timer.RetryStrategy
|
||||
@ -226,10 +227,12 @@ class RecoveringIndexerIntegrationSpec
|
||||
}
|
||||
}
|
||||
|
||||
// TODO we probably do not need a full dao for this purpose: refactoring with direct usage of StorageBackend?
|
||||
private def dao(implicit
|
||||
loggingContext: LoggingContext
|
||||
): ResourceOwner[(LedgerReadDao, MutableLedgerEndCache)] = {
|
||||
val mutableLedgerEndCache = MutableLedgerEndCache()
|
||||
val stringInterning = new StringInterningView((_, _) => _ => Future.successful(Nil)) // not used
|
||||
val jdbcUrl =
|
||||
s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false"
|
||||
val errorFactories: ErrorFactories = mock[ErrorFactories]
|
||||
@ -256,6 +259,7 @@ class RecoveringIndexerIntegrationSpec
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = mutableLedgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
stringInterning = stringInterning,
|
||||
) -> mutableLedgerEndCache
|
||||
)
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ private[sandbox] object SqlLedger {
|
||||
stringInterningStorageBackend.loadStringInterningEntries(fromExclusive, toInclusive)
|
||||
}
|
||||
)
|
||||
dao = ledgerDaoOwner(
|
||||
dao = ledgerDao(
|
||||
dbDispatcher,
|
||||
storageBackendFactory,
|
||||
ledgerEndCache,
|
||||
@ -291,7 +291,7 @@ private[sandbox] object SqlLedger {
|
||||
}
|
||||
}
|
||||
|
||||
private def ledgerDaoOwner(
|
||||
private def ledgerDao(
|
||||
dbDispatcher: DbDispatcher,
|
||||
storageBackendFactory: StorageBackendFactory,
|
||||
ledgerEndCache: MutableLedgerEndCache,
|
||||
@ -326,6 +326,7 @@ private[sandbox] object SqlLedger {
|
||||
storageBackendFactory = storageBackendFactory,
|
||||
ledgerEndCache = ledgerEndCache,
|
||||
errorFactories = errorFactories,
|
||||
stringInterning = stringInterningView,
|
||||
)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user