Makes sandbox-classic compression configurable (#9770)

* Adds configuration parameter and wiring
* Only append only schema is supported (for mutating schema it has no effect)

changelog_begin
changelog_end
This commit is contained in:
Marton Nagy 2021-05-21 14:12:20 +02:00 committed by GitHub
parent b8afa02e12
commit 6f20d78056
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 45 additions and 15 deletions

View File

@ -1020,6 +1020,7 @@ private[platform] object JdbcLedgerDao {
lfValueTranslationCache,
enricher = enricher,
participantId = participantId,
compressionStrategy = CompressionStrategy.none(metrics), // not needed
).map(new MeteredLedgerReadDao(_, metrics))
}
@ -1051,6 +1052,7 @@ private[platform] object JdbcLedgerDao {
if (dbType.supportsAsynchronousCommits) jdbcAsyncCommitMode else DbType.SynchronousCommit,
enricher = enricher,
participantId = participantId,
compressionStrategy = CompressionStrategy.none(metrics), // not needed
).map(new MeteredLedgerDao(_, metrics))
}
@ -1066,6 +1068,7 @@ private[platform] object JdbcLedgerDao {
validatePartyAllocation: Boolean = false,
enricher: Option[ValueEnricher],
participantId: v1.ParticipantId,
compressionStrategy: CompressionStrategy,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
owner(
@ -1081,6 +1084,7 @@ private[platform] object JdbcLedgerDao {
validatePartyAllocation,
enricher = enricher,
participantId = participantId,
compressionStrategy = compressionStrategy,
).map(new MeteredLedgerDao(_, metrics))
}
@ -1089,6 +1093,7 @@ private[platform] object JdbcLedgerDao {
participantId: v1.ParticipantId,
lfValueTranslationCache: LfValueTranslationCache.Cache,
metrics: Metrics,
compressionStrategy: CompressionStrategy,
): SequentialWriteDao =
SequentialWriteDaoImpl(
storageBackend = StorageBackend.of(dbType),
@ -1100,9 +1105,7 @@ private[platform] object JdbcLedgerDao {
enricherO = None,
loadPackage = (_, _) => Future.successful(None),
),
compressionStrategy = CompressionStrategy.none(
metrics
), // TODO append-only: is it ok, to turn it off completely for sandbox-classic?
compressionStrategy = compressionStrategy,
),
)
@ -1144,6 +1147,7 @@ private[platform] object JdbcLedgerDao {
jdbcAsyncCommitMode: DbType.AsyncCommitMode = DbType.SynchronousCommit,
enricher: Option[ValueEnricher],
participantId: v1.ParticipantId,
compressionStrategy: CompressionStrategy,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] =
for {
dbDispatcher <- DbDispatcher.owner(
@ -1165,7 +1169,13 @@ private[platform] object JdbcLedgerDao {
lfValueTranslationCache,
validatePartyAllocation,
enricher,
sequentialWriteDao(dbType, participantId, lfValueTranslationCache, metrics),
sequentialWriteDao(
dbType,
participantId,
lfValueTranslationCache,
metrics,
compressionStrategy,
),
participantId,
)

View File

@ -335,6 +335,7 @@ final class SandboxServer(
engine = engine,
validatePartyAllocation = !config.implicitPartyAllocation,
enableAppendOnlySchema = config.enableAppendOnlySchema,
enableCompression = config.enableCompression,
)
case None =>
SandboxIndexAndWriteService.inMemory(

View File

@ -62,6 +62,7 @@ private[sandbox] object SandboxIndexAndWriteService {
lfValueTranslationCache: LfValueTranslationCache.Cache,
engine: Engine,
enableAppendOnlySchema: Boolean,
enableCompression: Boolean,
validatePartyAllocation: Boolean = false,
)(implicit
mat: Materializer,
@ -88,6 +89,7 @@ private[sandbox] object SandboxIndexAndWriteService {
engine = engine,
validatePartyAllocation = validatePartyAllocation,
enableAppendOnlySchema = enableAppendOnlySchema,
enableCompression = enableCompression,
).flatMap(ledger => owner(MeteredLedger(ledger, metrics), participantId, timeProvider))
def inMemory(

View File

@ -35,6 +35,7 @@ import com.daml.platform.sandbox.config.LedgerName
import com.daml.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump
import com.daml.platform.sandbox.stores.ledger.sql.SqlLedger._
import com.daml.platform.sandbox.stores.ledger.{Ledger, SandboxOffset}
import com.daml.platform.store.appendonlydao.events.CompressionStrategy
import com.daml.platform.store.cache.TranslationCacheBackedContractStore
import com.daml.platform.store.dao.{LedgerDao, LedgerWriteDao}
import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry}
@ -84,6 +85,7 @@ private[sandbox] object SqlLedger {
engine: Engine,
validatePartyAllocation: Boolean = false,
enableAppendOnlySchema: Boolean = false,
enableCompression: Boolean = false,
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[Ledger] {
@ -239,18 +241,21 @@ private[sandbox] object SqlLedger {
): ResourceOwner[LedgerDao] =
if (enableAppendOnlySchema)
com.daml.platform.store.appendonlydao.JdbcLedgerDao.validatingWriteOwner(
serverRole,
jdbcUrl,
databaseConnectionPoolSize,
databaseConnectionTimeout,
eventsPageSize,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
validatePartyAllocation,
Some(new ValueEnricher(engine)),
com.daml.ledger.participant.state.v1.ParticipantId
serverRole = serverRole,
jdbcUrl = jdbcUrl,
connectionPoolSize = databaseConnectionPoolSize,
connectionTimeout = databaseConnectionTimeout,
eventsPageSize = eventsPageSize,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
validatePartyAllocation = validatePartyAllocation,
enricher = Some(new ValueEnricher(engine)),
participantId = com.daml.ledger.participant.state.v1.ParticipantId
.assertFromString(participantId.toString),
compressionStrategy =
if (enableCompression) CompressionStrategy.allGZIP(metrics)
else CompressionStrategy.none(metrics),
)
else
com.daml.platform.store.dao.JdbcLedgerDao.validatingWriteOwner(

View File

@ -291,6 +291,7 @@ class CommonCliBase(name: LedgerName) {
s"The timeout used for requests by management services of the Ledger API. The default is set to ${SandboxConfig.DefaultManagementServiceTimeout.getSeconds} seconds."
)
// TODO append-only: cleanup
opt[Unit]("enable-append-only-schema")
.hidden()
.optional()
@ -299,6 +300,15 @@ class CommonCliBase(name: LedgerName) {
s"Turns on append-only schema support."
)
// TODO append-only: cleanup
opt[Unit]("enable-compression")
.hidden()
.optional()
.action((_, config) => config.copy(enableCompression = true))
.text(
s"By default compression is off, his switch enables it. This has only effect for append-only ingestion." // TODO append-only: fix description
)
help("help").text("Print the usage text")
checkConfig(c => {

View File

@ -53,6 +53,7 @@ final case class SandboxConfig(
managementServiceTimeout: Duration,
sqlStartMode: Option[PostgresStartupMode],
enableAppendOnlySchema: Boolean,
enableCompression: Boolean,
)
object SandboxConfig {
@ -110,6 +111,7 @@ object SandboxConfig {
managementServiceTimeout = DefaultManagementServiceTimeout,
sqlStartMode = Some(DefaultSqlStartupMode),
enableAppendOnlySchema = false,
enableCompression = false,
)
sealed abstract class EngineMode extends Product with Serializable