Metering Aggregation [DPP-818] (#12723)

* Add support for aggregated transaction metering

changelog_begin
Support added for aggregated transaction metering
changelog_end

* Update with review comments
Simon Maxen 2022-02-14 11:36:23 +00:00 committed by GitHub
parent c2a6397751
commit cc4c06c640
26 changed files with 1005 additions and 76 deletions
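For orientation: the heart of this change is an hourly roll-up of transaction_metering rows into one participant_metering row per application. A minimal sketch with stand-in types (illustrative only; the production logic lives in MeteringAggregator below):

// Illustrative sketch (stand-in types, not the production code): roll up an
// hour's worth of transaction metering into one row per application.
final case class Tx(applicationId: String, actionCount: Int, ledgerOffset: String)
final case class Agg(applicationId: String, actionCount: Int, ledgerOffset: String)

def rollUp(window: Vector[Tx]): Vector[Agg] =
  window
    .groupBy(_.applicationId)
    .map { case (appId, rows) =>
      Agg(
        applicationId = appId,
        actionCount = rows.map(_.actionCount).sum, // total actions in the window
        ledgerOffset = rows.map(_.ledgerOffset).max, // highest offset in the window
      )
    }
    .toVector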

View File

@ -515,6 +515,11 @@ final class Metrics(val registry: MetricRegistry) {
"loadStringInterningEntries"
)
val meteringAggregator: DatabaseMetrics = createDbMetrics("metering_aggregator")
val initializeMeteringAggregator: DatabaseMetrics = createDbMetrics(
"initialize_metering_aggregator"
)
object translation {
private val Prefix: MetricName = db.Prefix :+ "translation"
val cache = new CacheMetrics(registry, Prefix :+ "cache")

View File

@ -1 +1 @@
6b18e00f6dc6d79123a115a080b4092fc94350cf582f9373e122837f7ef741e7
40d6f8974169a2d8702b31a9edbe2e22527aa837f0f4057922814631f40510ef

View File

@ -538,3 +538,18 @@ CREATE TABLE transaction_metering (
);
CREATE INDEX transaction_metering_ledger_offset ON transaction_metering(ledger_offset);
CREATE TABLE metering_parameters (
ledger_metering_end VARCHAR,
ledger_metering_timestamp BIGINT NOT NULL
);
CREATE TABLE participant_metering (
application_id VARCHAR NOT NULL,
from_timestamp BIGINT NOT NULL,
to_timestamp BIGINT NOT NULL,
action_count INTEGER NOT NULL,
ledger_offset VARCHAR NOT NULL
);
CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);

View File

@ -0,0 +1 @@
45e3500305c86bf7928f78ba6b97572cf8f900e18805e181b44d7837fe0c3c09

View File

@ -0,0 +1,24 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- Transaction metering alterations
ALTER TABLE transaction_metering MODIFY application_id VARCHAR2(4000);
-- Create metering parameters
CREATE TABLE metering_parameters (
ledger_metering_end VARCHAR2(4000),
ledger_metering_timestamp NUMBER NOT NULL
);
-- Create participant metering
CREATE TABLE participant_metering (
application_id VARCHAR2(4000) NOT NULL,
from_timestamp NUMBER NOT NULL,
to_timestamp NUMBER NOT NULL,
action_count NUMBER NOT NULL,
ledger_offset VARCHAR2(4000) NOT NULL
);
CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);

View File

@ -0,0 +1 @@
08ea4e020bc4ed309d0345d1d0b9e4cbe4f25edc79d3415a05538cacdedb1470

View File

@ -0,0 +1,35 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- Transaction metering alterations
SELECT 'Harmonise the transaction_metering columns with other examples in the db';
ALTER TABLE transaction_metering ALTER application_id TYPE TEXT;
ALTER TABLE transaction_metering ALTER ledger_offset TYPE TEXT;
-- Parameter alterations
SELECT 'Add Metering Parameters: Creating table...';
-- Create metering parameters
CREATE TABLE metering_parameters (
ledger_metering_end TEXT,
ledger_metering_timestamp BIGINT NOT NULL
);
-- Create participant metering
SELECT 'Add Participant Metering: Creating table...';
CREATE TABLE participant_metering (
application_id TEXT NOT NULL,
from_timestamp BIGINT NOT NULL,
to_timestamp BIGINT NOT NULL,
action_count INTEGER NOT NULL,
ledger_offset TEXT NOT NULL
);
SELECT 'Add Participant Metering: Creating indexes...';
CREATE UNIQUE INDEX participant_metering_from_to_application ON participant_metering(from_timestamp, to_timestamp, application_id);
SELECT 'Add Participant Metering: Done.';

View File

@ -38,7 +38,9 @@ object JdbcIndexer {
val factory = StorageBackendFactory.of(DbType.jdbcType(config.jdbcUrl))
val dataSourceStorageBackend = factory.createDataSourceStorageBackend
val ingestionStorageBackend = factory.createIngestionStorageBackend
val meteringStoreBackend = factory.createMeteringStorageWriteBackend
val parameterStorageBackend = factory.createParameterStorageBackend
val meteringParameterStorageBackend = factory.createMeteringParameterStorageBackend
val DBLockStorageBackend = factory.createDBLockStorageBackend
val stringInterningStorageBackend = factory.createStringInterningStorageBackend
val indexer = ParallelIndexerFactory(
@ -92,9 +94,16 @@ object JdbcIndexer {
metrics = metrics,
),
stringInterningStorageBackend = stringInterningStorageBackend,
meteringAggregator = new MeteringAggregator.Owner(
meteringStore = meteringStoreBackend,
meteringParameterStore = meteringParameterStorageBackend,
parameterStore = parameterStorageBackend,
metrics = metrics,
).apply,
mat = materializer,
readService = readService,
)
indexer
}

View File

@ -0,0 +1,204 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore
import com.daml.ledger.participant.state.index.v2.MeteringStore.ParticipantMetering
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.indexer.MeteringAggregator.{toOffsetDateTime, toTimestamp}
import com.daml.platform.store.appendonlydao.SqlExecutor
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.{
MeteringParameterStorageBackend,
MeteringStorageWriteBackend,
ParameterStorageBackend,
}
import java.sql.Connection
import java.time.temporal.ChronoUnit
import java.time.{OffsetDateTime, ZoneOffset}
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object MeteringAggregator {
private val logger = ContextualizedLogger.get(getClass)
class Owner(
meteringStore: MeteringStorageWriteBackend,
parameterStore: ParameterStorageBackend,
meteringParameterStore: MeteringParameterStorageBackend,
metrics: Metrics,
period: FiniteDuration = 6.minutes,
maxTaskDuration: FiniteDuration = 6.hours,
) {
private[platform] def apply(
sqlExecutor: SqlExecutor
)(implicit loggingContext: LoggingContext): ResourceOwner[Unit] = {
val aggregator = new MeteringAggregator(
meteringStore,
parameterStore,
meteringParameterStore,
metrics,
sqlExecutor,
)
for {
_ <- ResourceOwner.forFuture(() => aggregator.initialize())
_ <- ResourceOwner.forTimer(() => new Timer()).map { timer =>
timer.scheduleAtFixedRate(
new TimerTask {
override def run(): Unit = {
Try {
Await.ready(aggregator.run(), maxTaskDuration)
} match {
case Success(_) => ()
case Failure(e) =>
logger.error(s"Metering not aggregated after ${maxTaskDuration}", e)
}
}
},
period.toMillis,
period.toMillis,
)
}
} yield ()
}
}
private def toTimestamp(dateTime: OffsetDateTime): Timestamp =
Timestamp.assertFromInstant(dateTime.toInstant)
private def toOffsetDateTime(timestamp: Timestamp): OffsetDateTime =
OffsetDateTime.ofInstant(timestamp.toInstant, ZoneOffset.UTC)
}
class MeteringAggregator(
meteringStore: MeteringStorageWriteBackend,
parameterStore: ParameterStorageBackend,
meteringParameterStore: MeteringParameterStorageBackend,
metrics: Metrics,
sqlExecutor: SqlExecutor,
clock: () => Timestamp = () => Timestamp.now(),
)(implicit loggingContext: LoggingContext) {
private val parasitic: ExecutionContext = ExecutionContext.parasitic
private val logger = ContextualizedLogger.get(getClass)
private[platform] def initialize(): Future[Unit] = {
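// Initialize the watermark to the start of the previous full hour; the first run then aggregates that already-completed hour.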
val initTimestamp = toOffsetDateTime(clock()).truncatedTo(ChronoUnit.HOURS).minusHours(1)
val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, toTimestamp(initTimestamp))
sqlExecutor.executeSql(metrics.daml.index.db.initializeMeteringAggregator) {
meteringParameterStore.initializeLedgerMeteringEnd(initLedgerMeteringEnd)
}
}
private[platform] def run(): Future[Unit] = {
val future = sqlExecutor.executeSql(metrics.daml.index.db.meteringAggregator) { conn =>
val nowUtcTime = toOffsetDateTime(clock())
val lastLedgerMeteringEnd = getLedgerMeteringEnd(conn)
val startUtcTime: OffsetDateTime = toOffsetDateTime(lastLedgerMeteringEnd.timestamp)
val endUtcTime = startUtcTime.plusHours(1)
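// Only aggregate once the full hour-long window has elapsed.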
if (nowUtcTime.isAfter(endUtcTime)) {
val toEndTime = toTimestamp(endUtcTime)
val ingestedLedgerEnd = parameterStore.ledgerEnd(conn).lastOffset
val maybeMaxOffset =
meteringStore.transactionMeteringMaxOffset(lastLedgerMeteringEnd.offset, toEndTime)(conn)
val (
periodIngested, // This is true if the closed time period has been fully ingested
hasMetering, // This is true if there are transaction_metering records to aggregate
toOffsetEnd, // This is the 'to' offset for the period being aggregated
) = maybeMaxOffset match {
case Some(offset) => (offset <= ingestedLedgerEnd, true, offset)
case None => (true, false, lastLedgerMeteringEnd.offset)
}
if (periodIngested) {
Some(
aggregate(
conn = conn,
lastLedgerMeteringEnd = lastLedgerMeteringEnd,
thisLedgerMeteringEnd = LedgerMeteringEnd(toOffsetEnd, toEndTime),
hasMetering = hasMetering,
)
)
} else {
logger.info("Not all transaction metering for aggregation time period is yet ingested")
None
}
} else {
None
}
}
future.onComplete({
case Success(None) => logger.info("No transaction metering aggregation required")
case Success(Some(lme)) =>
logger.info(s"Aggregating transaction metering completed up to $lme")
case Failure(e) => logger.error("Failed to aggregate transaction metering", e)
})(parasitic)
future.map(_ => ())(parasitic)
}
private def aggregate(
conn: Connection,
lastLedgerMeteringEnd: LedgerMeteringEnd,
thisLedgerMeteringEnd: LedgerMeteringEnd,
hasMetering: Boolean,
): LedgerMeteringEnd = {
logger.info(s"Aggregating transaction metering for $thisLedgerMeteringEnd")
if (hasMetering) {
populateParticipantMetering(conn, lastLedgerMeteringEnd, thisLedgerMeteringEnd)
}
meteringParameterStore.updateLedgerMeteringEnd(thisLedgerMeteringEnd)(conn)
thisLedgerMeteringEnd
}
private def getLedgerMeteringEnd(conn: Connection): LedgerMeteringEnd =
meteringParameterStore.ledgerMeteringEnd(conn).getOrElse {
throw new IllegalStateException("Ledger metering is not initialized")
}
private def populateParticipantMetering(
conn: Connection,
lastLedgerMeteringEnd: LedgerMeteringEnd,
thisLedgerMeteringEnd: LedgerMeteringEnd,
): Unit = {
val transactionMetering: Seq[MeteringStore.TransactionMetering] =
meteringStore.transactionMetering(lastLedgerMeteringEnd.offset, thisLedgerMeteringEnd.offset)(
conn
)
val participantMetering = transactionMetering
.groupBy(_.applicationId)
.map { case (applicationId, metering) =>
ParticipantMetering(
applicationId = applicationId,
from = lastLedgerMeteringEnd.timestamp,
to = thisLedgerMeteringEnd.timestamp,
actionCount = metering.map(_.actionCount).sum,
ledgerOffset = metering.map(_.ledgerOffset).max,
)
}
.toVector
meteringStore.insertParticipantMetering(participantMetering)(conn)
}
}

View File

@ -3,18 +3,15 @@
package com.daml.platform.indexer.parallel
import java.util.Timer
import java.util.concurrent.Executors
import akka.stream.{KillSwitch, Materializer}
import com.daml.ledger.participant.state.v2.ReadService
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.Indexer
import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle, NoopHaCoordinator}
import com.daml.platform.indexer.parallel.AsyncSupport._
import com.daml.platform.indexer.Indexer
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.{
@ -25,10 +22,12 @@ import com.daml.platform.store.backend.{
import com.daml.platform.store.interning.StringInterningView
import com.google.common.util.concurrent.ThreadFactoryBuilder
import java.util.Timer
import java.util.concurrent.Executors
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
object ParallelIndexerFactory {
@ -45,6 +44,7 @@ object ParallelIndexerFactory {
initializeParallelIngestion: InitializeParallelIngestion,
parallelIndexerSubscription: ParallelIndexerSubscription[_],
stringInterningStorageBackend: StringInterningStorageBackend,
meteringAggregator: DbDispatcher => ResourceOwner[Unit],
mat: Materializer,
readService: ReadService,
)(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] =
@ -95,24 +95,27 @@ object ParallelIndexerFactory {
implicit val ec: ExecutionContext = resourceContext.executionContext
haCoordinator.protectedExecution(connectionInitializer =>
initializeHandle(
DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = dataSourceStorageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize =
ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
for {
dbDispatcher <- DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = dataSourceStorageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize =
ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
_ <- meteringAggregator(dbDispatcher)
} yield dbDispatcher
) { dbDispatcher =>
val stringInterningView = new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>

View File

@ -8,6 +8,7 @@ import anorm._
import com.daml.ledger.offset.Offset
import com.daml.lf.crypto.Hash
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.ledger.EventId
import com.daml.lf.value.Value
import spray.json.DefaultJsonProtocol._
@ -354,6 +355,11 @@ private[platform] object Conversions {
def offset(name: String): RowParser[Offset] =
SqlParser.get[String](name).map(v => Offset.fromHexString(Ref.HexString.assertFromString(v)))
def offset(position: Int): RowParser[Offset] =
SqlParser
.get[String](position)
.map(v => Offset.fromHexString(Ref.HexString.assertFromString(v)))
implicit val columnToOffset: Column[Offset] =
Column.nonNull((value: Any, meta) =>
Column
@ -363,9 +369,17 @@ private[platform] object Conversions {
// Timestamp
implicit object TimestampToStatement extends ToStatement[Timestamp] {
override def set(s: PreparedStatement, index: Int, v: Timestamp): Unit =
s.setLong(index, v.micros)
}
def timestampFromMicros(name: String): RowParser[com.daml.lf.data.Time.Timestamp] =
SqlParser.get[Long](name).map(com.daml.lf.data.Time.Timestamp.assertFromLong)
def timestampFromMicros(position: Int): RowParser[com.daml.lf.data.Time.Timestamp] =
SqlParser.get[Long](position).map(com.daml.lf.data.Time.Timestamp.assertFromLong)
// Hash
implicit object HashToStatement extends ToStatement[Hash] {

View File

@ -26,7 +26,8 @@ private[platform] final class DbDispatcher private (
overallWaitTimer: Timer,
overallExecutionTimer: Timer,
)(implicit loggingContext: LoggingContext)
extends ReportsHealth {
extends SqlExecutor
with ReportsHealth {
private val logger = ContextualizedLogger.get(this.getClass)
private val executionContext = ExecutionContext.fromExecutor(

View File

@ -0,0 +1,16 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.store.appendonlydao
import com.daml.logging.LoggingContext
import com.daml.metrics.DatabaseMetrics
import java.sql.Connection
import scala.concurrent.Future
private[platform] trait SqlExecutor {
def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
loggingContext: LoggingContext
): Future[T]
}
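A hedged usage sketch of SqlExecutor (the helper and its query are invented for illustration; DbDispatcher above provides the production implementation):

// Sketch: pass a DatabaseMetrics label and a function over the borrowed
// Connection; the work runs off the caller's thread and yields a Future.
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import scala.concurrent.Future

def countParticipantMetering(executor: SqlExecutor, metrics: Metrics)(implicit
    loggingContext: LoggingContext
): Future[Int] =
  executor.executeSql(metrics.daml.index.db.meteringAggregator) { conn =>
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery("select count(*) from participant_metering")
      rs.next() // single-row result
      rs.getInt(1)
    } finally stmt.close()
  }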

View File

@ -7,7 +7,10 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails, User,
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.MeteringStore.{
ParticipantMetering,
TransactionMetering,
}
import com.daml.ledger.participant.state.index.v2.PackageDetails
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{ApplicationId, UserId}
@ -18,6 +21,7 @@ import com.daml.platform
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw}
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
@ -93,10 +97,13 @@ trait ParameterStorageBackend {
/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit
def prunedUpToInclusive(connection: Connection): Option[Offset]
def updatePrunedAllDivulgedContractsUpToInclusive(
prunedUpToInclusive: Offset
)(connection: Connection): Unit
def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset]
@ -106,9 +113,9 @@ trait ParameterStorageBackend {
* - If no identity parameters are stored, then they are set to the given value.
* - If identity parameters are stored, then they are compared to the given ones.
* - Ledger identity parameters are written at most once, and are never overwritten.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* This method is NOT safe to call concurrently.
*/
def initializeParameters(params: ParameterStorageBackend.IdentityParams)(connection: Connection)(
implicit loggingContext: LoggingContext
@ -118,6 +125,24 @@ trait ParameterStorageBackend {
def ledgerIdentity(connection: Connection): Option[ParameterStorageBackend.IdentityParams]
}
object MeteringParameterStorageBackend {
case class LedgerMeteringEnd(offset: Offset, timestamp: Timestamp)
}
trait MeteringParameterStorageBackend {
/** Initialize the ledger metering end parameters if unset */
def initializeLedgerMeteringEnd(init: LedgerMeteringEnd)(connection: Connection)(implicit
loggingContext: LoggingContext
): Unit
/** The timestamp and offset for which billable metering is available */
def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd]
/** Update the timestamp and offset for which billable metering is available */
def updateLedgerMeteringEnd(ledgerMeteringEnd: LedgerMeteringEnd)(connection: Connection): Unit
}
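A minimal usage sketch of this watermark API, assuming the caller supplies a pooled connection (the helper name is invented):

// Sketch: read the current watermark, then advance it monotonically.
import com.daml.platform.store.backend.MeteringParameterStorageBackend
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import java.sql.Connection

def advanceWatermark(
    backend: MeteringParameterStorageBackend,
    next: LedgerMeteringEnd,
)(conn: Connection): Unit = {
  val current = backend
    .ledgerMeteringEnd(conn)
    .getOrElse(throw new IllegalStateException("Ledger metering is not initialized"))
  // Billable metering must never move backwards
  if (next.timestamp.micros > current.timestamp.micros)
    backend.updateLedgerMeteringEnd(next)(conn)
}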
object ParameterStorageBackend {
case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int) {
def lastOffsetOption: Option[Offset] =
@ -128,6 +153,7 @@ object ParameterStorageBackend {
ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0)
}
case class IdentityParams(ledgerId: LedgerId, participantId: ParticipantId)
}
trait ConfigurationStorageBackend {
@ -424,7 +450,7 @@ object UserManagementStorageBackend {
case class DbUserRight(domainRight: UserRight, grantedAt: Long)
}
trait MeteringStorageBackend {
trait MeteringStorageReadBackend {
def transactionMetering(
from: Timestamp,
@ -432,3 +458,30 @@ trait MeteringStorageBackend {
applicationId: Option[ApplicationId],
)(connection: Connection): Vector[TransactionMetering]
}
trait MeteringStorageWriteBackend {
/** Returns the maximum offset of the transaction_metering record which has an offset
* greater than the from offset and a timestamp prior to the to timestamp, if any.
*
* Note that the offset returned may not have been fully ingested. This allows the aggregator
* to wait if there are records within the time window that have not yet been fully ingested.
*/
def transactionMeteringMaxOffset(from: Offset, to: Timestamp)(
connection: Connection
): Option[Offset]
/** Returns all transaction metering records between the from offset (exclusive)
* and the to offset (inclusive).
*/
def transactionMetering(from: Offset, to: Offset)(
connection: Connection
): Vector[TransactionMetering]
def insertParticipantMetering(metering: Vector[ParticipantMetering])(connection: Connection): Unit
/** Test Only - will be removed once reporting can be based on participant metering */
def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering]
}
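To make the transactionMeteringMaxOffset contract concrete, a hedged in-memory model with offsets as hex strings and timestamps as microseconds:

// In-memory model of the transactionMeteringMaxOffset contract (illustrative
// only): the greatest offset strictly after `from` whose timestamp is before `to`.
def maxOffsetModel(
    records: Vector[(String, Long)], // (ledgerOffset as hex, meteringTimestamp in micros)
    from: String,
    to: Long,
): Option[String] =
  records.collect { case (o, t) if o > from && t < to => o }.maxOption

// e.g. maxOffsetModel(Vector(("02", 10L), ("03", 20L)), from = "01", to = 20L) == Some("02")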

View File

@ -13,6 +13,7 @@ import com.daml.platform.store.interning.StringInterning
trait StorageBackendFactory {
def createIngestionStorageBackend: IngestionStorageBackend[_]
def createParameterStorageBackend: ParameterStorageBackend
def createMeteringParameterStorageBackend: MeteringParameterStorageBackend
def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
@ -31,7 +32,8 @@ trait StorageBackendFactory {
def createResetStorageBackend: ResetStorageBackend
def createStringInterningStorageBackend: StringInterningStorageBackend
def createUserManagementStorageBackend: UserManagementStorageBackend
def createMeteringStorageBackend(ledgerEndCache: LedgerEndCache): MeteringStorageBackend
def createMeteringStorageReadBackend(ledgerEndCache: LedgerEndCache): MeteringStorageReadBackend
def createMeteringStorageWriteBackend: MeteringStorageWriteBackend
final def readStorageBackend(
ledgerEndCache: LedgerEndCache,
@ -44,7 +46,7 @@ trait StorageBackendFactory {
completionStorageBackend = createCompletionStorageBackend(stringInterning),
contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning),
eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning),
meteringStorageBackend = createMeteringStorageBackend(ledgerEndCache),
meteringStorageBackend = createMeteringStorageReadBackend(ledgerEndCache),
)
}
@ -64,5 +66,5 @@ case class ReadStorageBackend(
completionStorageBackend: CompletionStorageBackend,
contractStorageBackend: ContractStorageBackend,
eventStorageBackend: EventStorageBackend,
meteringStorageBackend: MeteringStorageBackend,
meteringStorageBackend: MeteringStorageReadBackend,
)

View File

@ -14,6 +14,9 @@ trait CommonStorageBackendFactory extends StorageBackendFactory {
override val createParameterStorageBackend: ParameterStorageBackend =
ParameterStorageBackendTemplate
override val createMeteringParameterStorageBackend: MeteringParameterStorageBackend =
MeteringParameterStorageBackendTemplate
override def createConfigurationStorageBackend(
ledgerEndCache: LedgerEndCache
): ConfigurationStorageBackend =
@ -28,8 +31,13 @@ trait CommonStorageBackendFactory extends StorageBackendFactory {
override val createUserManagementStorageBackend: UserManagementStorageBackend =
UserManagementStorageBackendTemplate
override def createMeteringStorageBackend(
override def createMeteringStorageReadBackend(
ledgerEndCache: LedgerEndCache
): MeteringStorageBackend =
new MeteringStorageBackendTemplate(ledgerEndCache)
): MeteringStorageReadBackend =
new MeteringStorageBackendReadTemplate(ledgerEndCache)
def createMeteringStorageWriteBackend: MeteringStorageWriteBackend = {
MeteringStorageBackendWriteTemplate
}
}

View File

@ -0,0 +1,76 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com
package daml.platform.store.backend.common
import daml.ledger.offset.Offset
import daml.logging.{ContextualizedLogger, LoggingContext}
import daml.platform.store.Conversions.{offset, timestampFromMicros}
import daml.platform.store.backend.MeteringParameterStorageBackend
import daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import daml.scalautil.Statement.discard
import anorm.{RowParser, ~}
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import java.sql.Connection
private[backend] object MeteringParameterStorageBackendTemplate
extends MeteringParameterStorageBackend {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
def initializeLedgerMeteringEnd(
init: LedgerMeteringEnd
)(connection: Connection)(implicit loggingContext: LoggingContext): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
import com.daml.platform.store.Conversions.TimestampToStatement
ledgerMeteringEnd(connection) match {
case None =>
logger.info(s"Initializing ledger metering end to $init")
discard(
SQL"""insert into metering_parameters(
ledger_metering_end,
ledger_metering_timestamp
) values (
${init.offset},
${init.timestamp}
)"""
.execute()(connection)
)
case Some(existing) =>
logger.info(s"Found existing ledger metering end $existing")
}
}
def ledgerMeteringEnd(connection: Connection): Option[LedgerMeteringEnd] = {
val LedgerMeteringEndParser: RowParser[LedgerMeteringEnd] = (
offset("ledger_metering_end").?.map(_.getOrElse(Offset.beforeBegin)) ~
timestampFromMicros("ledger_metering_timestamp")
) map { case ledgerMeteringEnd ~ ledgerMeteringTimestamp =>
LedgerMeteringEnd(ledgerMeteringEnd, ledgerMeteringTimestamp)
}
SQL"""SELECT ledger_metering_end, ledger_metering_timestamp FROM metering_parameters"""
.as(LedgerMeteringEndParser.singleOpt)(connection)
}
def updateLedgerMeteringEnd(
ledgerMeteringEnd: LedgerMeteringEnd
)(connection: Connection): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
import com.daml.platform.store.Conversions.TimestampToStatement
SQL"""
UPDATE
metering_parameters
SET
ledger_metering_end = ${ledgerMeteringEnd.offset},
ledger_metering_timestamp = ${ledgerMeteringEnd.timestamp}
"""
.execute()(connection)
()
}
}

View File

@ -5,21 +5,26 @@ package com.daml.platform.store.backend.common
import anorm.SqlParser.int
import anorm.{RowParser, ~}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.{
ParticipantMetering,
TransactionMetering,
}
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.platform.store.Conversions.{applicationId, offset, timestampFromMicros}
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.backend.MeteringStorageBackend
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.common.MeteringStorageBackendTemplate.transactionMeteringParser
import com.daml.platform.store.backend.{MeteringStorageReadBackend, MeteringStorageWriteBackend}
import com.daml.platform.store.cache.LedgerEndCache
import java.sql.Connection
private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndCache)
extends MeteringStorageBackend {
private[backend] object MeteringStorageBackendTemplate {
private val transactionMeteringParser: RowParser[TransactionMetering] = {
val transactionMeteringParser: RowParser[TransactionMetering] = {
(
applicationId("application_id") ~
int("action_count") ~
@ -31,15 +36,20 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC
meteringTimestamp ~
ledgerOffset =>
TransactionMetering(
applicationId,
actionCount,
meteringTimestamp,
ledgerOffset,
applicationId = applicationId,
actionCount = actionCount,
meteringTimestamp = meteringTimestamp,
ledgerOffset = ledgerOffset,
)
}
}
}
private[backend] class MeteringStorageBackendReadTemplate(ledgerEndCache: LedgerEndCache)
extends MeteringStorageReadBackend {
override def transactionMetering(
from: Time.Timestamp,
to: Option[Time.Timestamp],
@ -66,3 +76,95 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC
private def isSet(o: Option[_]): Int = o.fold(0)(_ => 1)
}
private[backend] object MeteringStorageBackendWriteTemplate extends MeteringStorageWriteBackend {
val participantMeteringParser: RowParser[ParticipantMetering] = {
(
applicationId("application_id") ~
timestampFromMicros("from_timestamp") ~
timestampFromMicros("to_timestamp") ~
int("action_count") ~
offset("ledger_offset")
).map {
case applicationId ~
from ~
to ~
actionCount ~
ledgerOffset =>
ParticipantMetering(
applicationId,
from,
to,
actionCount,
ledgerOffset,
)
}
}
def transactionMeteringMaxOffset(from: Offset, to: Timestamp)(
connection: Connection
): Option[Offset] = {
import com.daml.platform.store.Conversions.OffsetToStatement
import com.daml.platform.store.Conversions.TimestampToStatement
SQL"""
select max(ledger_offset)
from transaction_metering
where ledger_offset > $from
and metering_timestamp < $to
"""
.as(offset(1).?.single)(connection)
}
def transactionMetering(from: Offset, to: Offset)(
connection: Connection
): Vector[TransactionMetering] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL"""
select
application_id,
action_count,
metering_timestamp,
ledger_offset
from transaction_metering
where ledger_offset > $from
and ledger_offset <= $to
"""
.asVectorOf(transactionMeteringParser)(connection)
}
def insertParticipantMetering(metering: Vector[ParticipantMetering])(
connection: Connection
): Unit = {
import com.daml.platform.store.Conversions.OffsetToStatement
import com.daml.platform.store.Conversions.TimestampToStatement
metering.foreach { participantMetering =>
import participantMetering._
SQL"""
insert into participant_metering(application_id, from_timestamp, to_timestamp, action_count, ledger_offset)
values (${participantMetering.applicationId.toString}, $from, $to, $actionCount, $ledgerOffset)
""".execute()(connection)
}
}
def allParticipantMetering()(connection: Connection): Vector[ParticipantMetering] = {
SQL"""
select
application_id,
from_timestamp,
to_timestamp,
action_count,
ledger_offset
from participant_metering
"""
.asVectorOf(participantMeteringParser)(connection)
}
}

View File

@ -3,21 +3,21 @@
package com.daml.platform.store.backend.common
import java.sql.Connection
import anorm.SqlParser.{int, long}
import anorm.{RowParser, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.offset.Offset
import com.daml.platform.store.Conversions.{ledgerString, offset}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.common.MismatchException
import com.daml.platform.store.Conversions
import com.daml.platform.store.Conversions.{ledgerString, offset}
import com.daml.platform.store.backend.ParameterStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.scalautil.Statement.discard
import scalaz.syntax.tag._
import java.sql.Connection
private[backend] object ParameterStorageBackendTemplate extends ParameterStorageBackend {
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
@ -193,4 +193,5 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage
connection
)
}
}

View File

@ -3,24 +3,8 @@
package com.daml.platform.store.backend.postgresql
import com.daml.platform.store.backend.common.{
CommonStorageBackendFactory,
CompletionStorageBackendTemplate,
ContractStorageBackendTemplate,
IngestionStorageBackendTemplate,
PartyStorageBackendTemplate,
}
import com.daml.platform.store.backend.{
CompletionStorageBackend,
ContractStorageBackend,
DBLockStorageBackend,
DataSourceStorageBackend,
EventStorageBackend,
IngestionStorageBackend,
PartyStorageBackend,
ResetStorageBackend,
StorageBackendFactory,
}
import com.daml.platform.store.backend.common._
import com.daml.platform.store.backend._
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.interning.StringInterning
@ -59,4 +43,5 @@ object PostgresStorageBackendFactory
override val createResetStorageBackend: ResetStorageBackend =
PostgresResetStorageBackend
}

View File

@ -3,8 +3,6 @@
package com.daml.platform.store.backend
import java.sql.Connection
import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.h2.H2StorageBackendFactory
@ -16,6 +14,8 @@ import com.daml.testing.oracle.OracleAroundAll
import com.daml.testing.postgresql.PostgresAroundAll
import org.scalatest.Suite
import java.sql.Connection
/** Creates a database and a [[TestBackend]].
* Used by [[StorageBackendSpec]] to run all StorageBackend tests on different databases.
*/
@ -76,6 +76,7 @@ private[backend] trait StorageBackendProviderOracle
case class TestBackend(
ingestion: IngestionStorageBackend[_],
parameter: ParameterStorageBackend,
meteringParameter: MeteringParameterStorageBackend,
configuration: ConfigurationStorageBackend,
party: PartyStorageBackend,
packageBackend: PackageStorageBackend,
@ -90,16 +91,30 @@ case class TestBackend(
ledgerEndCache: MutableLedgerEndCache,
stringInterningSupport: MockStringInterning,
userManagement: UserManagementStorageBackend,
metering: MeteringStorageBackend,
metering: TestMeteringBackend,
)
case class TestMeteringBackend(
read: MeteringStorageReadBackend,
write: MeteringStorageWriteBackend,
)
object TestBackend {
def apply(storageBackendFactory: StorageBackendFactory): TestBackend = {
val ledgerEndCache = MutableLedgerEndCache()
val stringInterning = new MockStringInterning
def createTestMeteringBackend: TestMeteringBackend = {
TestMeteringBackend(
read = storageBackendFactory.createMeteringStorageReadBackend(ledgerEndCache),
write = storageBackendFactory.createMeteringStorageWriteBackend,
)
}
TestBackend(
ingestion = storageBackendFactory.createIngestionStorageBackend,
parameter = storageBackendFactory.createParameterStorageBackend,
meteringParameter = storageBackendFactory.createMeteringParameterStorageBackend,
configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache),
party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache),
packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache),
@ -115,7 +130,8 @@ object TestBackend {
ledgerEndCache = ledgerEndCache,
stringInterningSupport = stringInterning,
userManagement = storageBackendFactory.createUserManagementStorageBackend,
metering = storageBackendFactory.createMeteringStorageBackend(ledgerEndCache),
metering = createTestMeteringBackend,
)
}
}

View File

@ -119,7 +119,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
)
)
val metering1 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
val metering1 =
executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
// Restart the indexer - should delete data from the partial insert above
val end2 = executeSql(backend.parameter.ledgerEnd)
@ -154,7 +155,8 @@ private[backend] trait StorageBackendTestsInitializeIngestion
)
)
val metering2 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
val metering2 =
executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
parties1 should have length 1
packages1 should have size 1

View File

@ -3,10 +3,15 @@
package com.daml.platform.store.backend
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.{
ParticipantMetering,
TransactionMetering,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}
@ -20,7 +25,7 @@ private[backend] trait StorageBackendTestsMetering
import StorageBackendTestValues._
{
behavior of "StorageBackend (metering)"
behavior of "StorageBackend (read metering)"
it should "persist transaction metering" in {
@ -38,7 +43,7 @@ private[backend] trait StorageBackendTestsMetering
executeSql(ingest(Vector(dtoTransactionMetering(metering)), _))
executeSql(updateLedgerEnd(toOffset, 5L))
val Vector(actual) =
executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None))
executeSql(backend.metering.read.transactionMetering(Timestamp.Epoch, None, None))
actual shouldBe expected
}
@ -90,7 +95,9 @@ private[backend] trait StorageBackendTestsMetering
populate()
val from = someTime.addMicros(fromIdx)
val to = toIdx.map(someTime.addMicros)
val actual = executeSql(backend.metering.transactionMetering(from, to, applicationId)).toSet
val actual = executeSql(
backend.metering.read.transactionMetering(from, to, applicationId)
).toSet
val expected = execute(from, to, applicationId)
actual.map(_.ledgerOffset) shouldBe expected.map(_.ledgerOffset)
actual shouldBe expected
@ -106,4 +113,109 @@ private[backend] trait StorageBackendTestsMetering
check(2, None, Some(appIdA))
}
}
{
behavior of "StorageBackend (metering parameters)"
val initLedgerMeteringEnd = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch)
it should "fetch un-initialized ledger metering end" in {
executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe None
}
it should "initialized ledger metering end" in {
val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.Epoch)
executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(expected))
executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
}
it should "update ledger metering end with `before begin` offset" in {
executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd))
val expected = LedgerMeteringEnd(Offset.beforeBegin, Timestamp.now())
executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected))
executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
}
it should "update ledger metering end with valid offset" in {
executeSql(backend.meteringParameter.initializeLedgerMeteringEnd(initLedgerMeteringEnd))
val expected = LedgerMeteringEnd(
Offset.fromHexString(Ref.HexString.assertFromString("07")),
Timestamp.now(),
)
executeSql(backend.meteringParameter.updateLedgerMeteringEnd(expected))
executeSql(backend.meteringParameter.ledgerMeteringEnd) shouldBe Some(expected)
}
}
{
behavior of "StorageBackend (write metering)"
val metering = Vector(7L, 8L, 9L, 10L).map { i =>
TransactionMetering(
someApplicationId,
actionCount = 1,
meteringTimestamp = someTime.addMicros(i),
ledgerOffset = offset(i),
)
}
val meteringOffsets = metering.map(_.ledgerOffset)
val firstOffset = meteringOffsets.min
val lastOffset = meteringOffsets.max
val lastTime = metering.map(_.meteringTimestamp).max
it should "return the maximum transaction metering offset" in {
def check(from: Offset, to: Timestamp): Assertion = {
val expected = metering
.filter(_.ledgerOffset > from)
.filter(_.meteringTimestamp < to)
.map(_.ledgerOffset)
.maxOption
val actual = executeSql(backend.metering.write.transactionMeteringMaxOffset(from, to))
actual shouldBe expected
}
executeSql(ingest(metering.map(dtoTransactionMetering), _))
check(firstOffset, lastTime) // Expect offset 9 (offset 10's timestamp is not before lastTime)
check(firstOffset, lastTime.addMicros(1)) // Expect offset 10
check(lastOffset, lastTime.addMicros(1)) // Expect None (no offsets after lastOffset)
}
it should "select transaction metering for aggregation" in {
executeSql(ingest(metering.map(dtoTransactionMetering), _))
val nextLastOffset: Offset = meteringOffsets.filter(_ < lastOffset).max
val expected = meteringOffsets.filter(_ > firstOffset).filter(_ <= nextLastOffset).toSet
val actual = executeSql(
backend.metering.write.transactionMetering(firstOffset, nextLastOffset)
).map(_.ledgerOffset).toSet
actual shouldBe expected
}
it should "insert new participant metering records" in {
val expected = Vector(7L, 8L, 9L).map { i =>
ParticipantMetering(
someApplicationId,
someTime.addMicros(i),
someTime.addMicros(i + 1),
actionCount = 1,
ledgerOffset = offset(i),
)
}
executeSql(backend.metering.write.insertParticipantMetering(expected))
val actual =
executeSql(backend.metering.write.allParticipantMetering()).sortBy(_.ledgerOffset)
actual shouldBe expected
}
}
}

View File

@ -0,0 +1,235 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.indexer
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.{
ParticipantMetering,
TransactionMetering,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.{DatabaseMetrics, Metrics}
import com.daml.platform.store.appendonlydao.SqlExecutor
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.{
MeteringParameterStorageBackend,
MeteringStorageWriteBackend,
ParameterStorageBackend,
}
import org.mockito.ArgumentMatchersSugar.any
import org.mockito.MockitoSugar
import org.mockito.captor.ArgCaptor
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import java.sql.Connection
import java.time.temporal.ChronoUnit
import java.time.{LocalDate, LocalTime, OffsetDateTime, ZoneOffset}
import scala.concurrent.Future
//noinspection TypeAnnotation
final class MeteringAggregatorSpec extends AnyWordSpecLike with MockitoSugar with Matchers {
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private val metrics = new Metrics(new MetricRegistry)
private def toTS(t: OffsetDateTime): Timestamp = Timestamp.assertFromInstant(t.toInstant)
"MeteringAggregator" should {
val applicationA = Ref.ApplicationId.assertFromString("appA")
val applicationB = Ref.ApplicationId.assertFromString("appB")
class TestSetup {
val lastAggEndTime: OffsetDateTime =
OffsetDateTime.of(LocalDate.now(), LocalTime.of(15, 0), ZoneOffset.UTC)
val nextAggEndTime: OffsetDateTime = lastAggEndTime.plusHours(1)
val timeNow: OffsetDateTime = lastAggEndTime.plusHours(1).plusMinutes(+5)
val lastAggOffset: Offset = Offset.fromHexString(Ref.HexString.assertFromString("01"))
val conn: Connection = mock[Connection]
val dispatcher: SqlExecutor = new SqlExecutor {
override def executeSql[T](databaseMetrics: DatabaseMetrics)(sql: Connection => T)(implicit
loggingContext: LoggingContext
): Future[T] = Future.successful {
sql(conn)
}
}
val parameterStore: ParameterStorageBackend = mock[ParameterStorageBackend]
val meteringParameterStore: MeteringParameterStorageBackend =
mock[MeteringParameterStorageBackend]
val meteringStore: MeteringStorageWriteBackend = mock[MeteringStorageWriteBackend]
def runUnderTest(
transactionMetering: Vector[TransactionMetering],
maybeLedgerEnd: Option[Offset] = None,
): Future[Unit] = {
val ledgerEndOffset = (maybeLedgerEnd, transactionMetering.lastOption) match {
case (Some(le), _) => le
case (None, Some(t)) => t.ledgerOffset
case (None, None) => lastAggOffset
}
when(meteringParameterStore.ledgerMeteringEnd(conn))
.thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime))))
when(meteringStore.transactionMeteringMaxOffset(lastAggOffset, toTS(nextAggEndTime))(conn))
.thenReturn(transactionMetering.lastOption.map(_.ledgerOffset))
when(parameterStore.ledgerEnd(conn))
.thenReturn(LedgerEnd(ledgerEndOffset, 0L, 0))
transactionMetering.lastOption.foreach { last =>
when(meteringStore.transactionMetering(lastAggOffset, last.ledgerOffset)(conn))
.thenReturn(transactionMetering)
}
new MeteringAggregator(
meteringStore,
parameterStore,
meteringParameterStore,
metrics,
dispatcher,
() => toTS(timeNow),
)
.run()
}
}
"aggregate transaction metering records" in new TestSetup {
val transactionMetering = Vector(10, 15, 20).map { i =>
TransactionMetering(
applicationId = applicationA,
actionCount = i,
meteringTimestamp = toTS(lastAggEndTime.plusMinutes(i.toLong)),
ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString(i.toString)),
)
}
val expected: ParticipantMetering = ParticipantMetering(
applicationA,
from = toTS(lastAggEndTime),
to = toTS(nextAggEndTime),
actionCount = transactionMetering.map(_.actionCount).sum,
transactionMetering.last.ledgerOffset,
)
runUnderTest(transactionMetering)
verify(meteringStore).insertParticipantMetering(Vector(expected))(conn)
verify(meteringParameterStore).updateLedgerMeteringEnd(
LedgerMeteringEnd(expected.ledgerOffset, expected.to)
)(conn)
}
"not aggregate if there is not a full period" in new TestSetup {
override val timeNow = lastAggEndTime.plusHours(1).plusMinutes(-5)
when(meteringParameterStore.ledgerMeteringEnd(conn))
.thenReturn(Some(LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime))))
runUnderTest(Vector.empty)
verifyNoMoreInteractions(meteringStore)
}
"aggregate over multiple applications" in new TestSetup {
val expected = Set(applicationA, applicationB)
val transactionMetering = expected.toVector.map { a =>
TransactionMetering(
applicationId = a,
actionCount = 1,
meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)),
ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("10")),
)
}
runUnderTest(transactionMetering)
val participantMeteringCaptor = ArgCaptor[Vector[ParticipantMetering]]
verify(meteringStore).insertParticipantMetering(participantMeteringCaptor)(any[Connection])
participantMeteringCaptor.value.map(_.applicationId).toSet shouldBe expected
}
"increase ledger metering end even if there are not transaction metering records" in new TestSetup {
runUnderTest(Vector.empty[TransactionMetering])
verify(meteringParameterStore).updateLedgerMeteringEnd(
LedgerMeteringEnd(lastAggOffset, toTS(lastAggEndTime.plusHours(1)))
)(conn)
}
"skip aggregation if the last transaction metering offset within the time range has not been fully ingested" in new TestSetup {
val transactionMetering = Vector(
TransactionMetering(
applicationId = applicationA,
actionCount = 1,
meteringTimestamp = toTS(lastAggEndTime.plusMinutes(1)),
ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString("03")),
)
)
runUnderTest(
transactionMetering,
maybeLedgerEnd = Some(Offset.fromHexString(Ref.HexString.assertFromString("02"))),
)
verify(meteringParameterStore, never).updateLedgerMeteringEnd(any[LedgerMeteringEnd])(
any[Connection]
)
}
"fail if an attempt is made to run un-initialized" in new TestSetup {
// Note: this only works because the test SqlExecutor completes the Future synchronously
intercept[IllegalStateException] {
when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None)
val underTest =
new MeteringAggregator(
meteringStore,
parameterStore,
meteringParameterStore,
metrics,
dispatcher,
() => toTS(timeNow),
)
underTest.run()
}
}
"initialize the metering ledger end to the hour before the current hour" in new TestSetup {
when(meteringParameterStore.ledgerMeteringEnd(conn)).thenReturn(None)
val underTest =
new MeteringAggregator(
meteringStore,
parameterStore,
meteringParameterStore,
metrics,
dispatcher,
() => toTS(timeNow),
)
underTest.initialize()
val expected = LedgerMeteringEnd(
Offset.beforeBegin,
toTS(timeNow.truncatedTo(ChronoUnit.HOURS).minusHours(1)),
)
verify(meteringParameterStore).initializeLedgerMeteringEnd(expected)(conn)
}
}
}

View File

@ -4,7 +4,6 @@
package com.daml.platform.store.appendonlydao
import java.sql.Connection
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.data.Ref

View File

@ -21,10 +21,20 @@ trait MeteringStore {
}
object MeteringStore {
case class TransactionMetering(
applicationId: Ref.ApplicationId,
actionCount: Int,
meteringTimestamp: Timestamp,
ledgerOffset: Offset,
)
case class ParticipantMetering(
applicationId: Ref.ApplicationId,
from: Timestamp,
to: Timestamp,
actionCount: Int,
ledgerOffset: Offset,
)
}
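A hedged, worked example of how the two record types relate (all values invented): three transaction metering records for one application collapse into a single participant metering row for the hour.

// Hypothetical values, for illustration only.
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.{
  ParticipantMetering,
  TransactionMetering,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp

val appA = Ref.ApplicationId.assertFromString("appA")
val txs = Vector(10, 15, 20).map { i =>
  TransactionMetering(
    applicationId = appA,
    actionCount = i,
    meteringTimestamp = Timestamp.Epoch.addMicros(i.toLong),
    ledgerOffset = Offset.fromHexString(Ref.HexString.assertFromString(f"$i%02x")),
  )
}
// The 'from'/'to' bounds come from the ledger metering end watermark.
val agg = ParticipantMetering(
  applicationId = appA,
  from = Timestamp.Epoch,
  to = Timestamp.Epoch.addMicros(3600000000L), // one hour in microseconds
  actionCount = txs.map(_.actionCount).sum, // 45
  ledgerOffset = txs.map(_.ledgerOffset).max, // offset of the last aggregated record
)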