mirror of
https://github.com/digital-asset/daml.git
synced 2024-11-04 00:36:58 +03:00
adding Metrics around Ledger operations (#646)
* adding metrics around ledger components * adding missing copyright headers * adding requested comment * fixing merge conflicts
This commit is contained in:
parent
fef8a956ee
commit
40f196f7f6
2
3rdparty/dependencies.digest
vendored
2
3rdparty/dependencies.digest
vendored
@ -1 +1 @@
|
||||
066ab1d8914325c016da01ddaef4b881e3b7c83e dependencies.yaml
|
||||
57cefaabfeddb8747b1fc5f7554c412c247ea337 dependencies.yaml
|
||||
|
14
3rdparty/jvm/io/dropwizard/metrics/BUILD
vendored
14
3rdparty/jvm/io/dropwizard/metrics/BUILD
vendored
@ -14,3 +14,17 @@ java_library(
|
||||
)
|
||||
|
||||
|
||||
|
||||
java_library(
|
||||
name = "metrics_jmx",
|
||||
exports = [
|
||||
"//3rdparty/jvm/org/slf4j:slf4j_api",
|
||||
"//external:jar/io/dropwizard/metrics/metrics_jmx",
|
||||
":metrics_core"
|
||||
],
|
||||
visibility = [
|
||||
"//visibility:public"
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
|
6
3rdparty/workspace.bzl
vendored
6
3rdparty/workspace.bzl
vendored
@ -280,7 +280,8 @@ def list_dependencies():
|
||||
{"artifact": "io.circe:circe-jawn_2.12:0.10.0", "lang": "scala", "sha1": "979ce1dc1d26ae0bf33601e0bb4d3a49dba549d9", "sha256": "8ebc3204f91e25b28fa8fb0ae17c58c2826f3fb4c3cb0a6f10d3c95760ecb600", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-jawn_2.12/0.10.0/circe-jawn_2.12-0.10.0.jar", "source": {"sha1": "31fdbc94498b34ee76520960e9ee62e936a30df5", "sha256": "dd626a8a597f82539e453a05c64faddce7431128e1227b3be9c034e45e197797", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-jawn_2.12/0.10.0/circe-jawn_2.12-0.10.0-sources.jar"} , "name": "io_circe_circe_jawn_2_12", "actual": "@io_circe_circe_jawn_2_12//jar:file", "bind": "jar/io/circe/circe_jawn_2_12"},
|
||||
{"artifact": "io.circe:circe-numbers_2.12:0.10.0", "lang": "scala", "sha1": "8c87ded057189cbeee0d7c2e9f55cc4dac487ee1", "sha256": "d7845d065c7320892f82bbc4d360e99de1f91892c32202115b09d96541fcb868", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-numbers_2.12/0.10.0/circe-numbers_2.12-0.10.0.jar", "source": {"sha1": "3806d839f4467d717ffbb4b315044e3c8e0bce98", "sha256": "10542f334c2c3350660983d2db01e1c91597a289eb91d4cc63863dbe7d60d59e", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-numbers_2.12/0.10.0/circe-numbers_2.12-0.10.0-sources.jar"} , "name": "io_circe_circe_numbers_2_12", "actual": "@io_circe_circe_numbers_2_12//jar:file", "bind": "jar/io/circe/circe_numbers_2_12"},
|
||||
{"artifact": "io.circe:circe-parser_2.12:0.10.0", "lang": "scala", "sha1": "a5508300b924dffebbb09f3350014089dd40d7ff", "sha256": "27c58a6bee47df9eeda409373870283c42a9321a5c7859f6ec49dadc802aa520", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-parser_2.12/0.10.0/circe-parser_2.12-0.10.0.jar", "source": {"sha1": "cee287f5e0e016d0313bc74e4dff577684350352", "sha256": "c8b642f70687dff6bedd6b0c596328f55e1c02212a0df9bb1b0b3dbd5883b045", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/circe/circe-parser_2.12/0.10.0/circe-parser_2.12-0.10.0-sources.jar"} , "name": "io_circe_circe_parser_2_12", "actual": "@io_circe_circe_parser_2_12//jar:file", "bind": "jar/io/circe/circe_parser_2_12"},
|
||||
{"artifact": "io.dropwizard.metrics:metrics-core:3.1.2", "lang": "java", "sha1": "224f03afd2521c6c94632f566beb1bb5ee32cf07", "sha256": "245ba2a66a9bc710ce4db14711126e77bcb4e6d96ef7e622659280f3c90cbb5c", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/3.1.2/metrics-core-3.1.2.jar", "source": {"sha1": "25177c6da71f42d7934040a2f5608e3a5f81464c", "sha256": "7b68c2a432bd52a9addeb32d1656c49bb4bcb4eb16c143eb0b7d7af913986924", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/3.1.2/metrics-core-3.1.2-sources.jar"} , "name": "io_dropwizard_metrics_metrics_core", "actual": "@io_dropwizard_metrics_metrics_core//jar", "bind": "jar/io/dropwizard/metrics/metrics_core"},
|
||||
{"artifact": "io.dropwizard.metrics:metrics-core:4.0.0", "lang": "java", "sha1": "551f95873512d1a8564a99ae4b57f64d15eda503", "sha256": "dc1019ece4b18cd9201ee7697eb4f557583c071378b2ac63347c4a7221c1820e", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/4.0.0/metrics-core-4.0.0.jar", "source": {"sha1": "32e938706395d0c704848e4690474191022ecae7", "sha256": "d039d6d676a0533bc17e7eec878a6038a58e7de0849aeb55011be0f111d5fe22", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/4.0.0/metrics-core-4.0.0-sources.jar"} , "name": "io_dropwizard_metrics_metrics_core", "actual": "@io_dropwizard_metrics_metrics_core//jar", "bind": "jar/io/dropwizard/metrics/metrics_core"},
|
||||
{"artifact": "io.dropwizard.metrics:metrics-jmx:4.0.0", "lang": "java", "sha1": "04fb95d49a5bee896cc6963ffeb1cce090acc381", "sha256": "14ebc20796899a0fd28524fb5789d897bc569367449ad64a9685ce878056792c", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-jmx/4.0.0/metrics-jmx-4.0.0.jar", "source": {"sha1": "79ac1ee1f1696307eafc834c03d726815572b21d", "sha256": "605b881f7b989492a59e6f85dd0648e0f7ac26c9974c7a33d86ea6fbe72845ae", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-jmx/4.0.0/metrics-jmx-4.0.0-sources.jar"} , "name": "io_dropwizard_metrics_metrics_jmx", "actual": "@io_dropwizard_metrics_metrics_jmx//jar", "bind": "jar/io/dropwizard/metrics/metrics_jmx"},
|
||||
{"artifact": "io.grpc:grpc-context:1.18.0", "lang": "java", "sha1": "c63e8b86af0fb16b5696480dc14f48e6eaa7193b", "sha256": "12bc83b9fa3aa7550d75c4515b8ae74f124ba14d3692a5ef4737a2e855cbca2f", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/grpc/grpc-context/1.18.0/grpc-context-1.18.0.jar", "source": {"sha1": "b9de26a0c63e994297ca8b36e21139aabc84fc01", "sha256": "006faf6a8355041f5f03a7602bf9dfeeb3d1aa838c17b022862c9aaf438ca640", "repository": "http://central.maven.org/maven2/", "url": "http://central.maven.org/maven2/io/grpc/grpc-context/1.18.0/grpc-context-1.18.0-sources.jar"} , "name": "io_grpc_grpc_context", "actual": "@io_grpc_grpc_context//jar", "bind": "jar/io/grpc/grpc_context"},
|
||||
# duplicates in io.grpc:grpc-core fixed to 1.18.0
|
||||
# - io.grpc:grpc-netty:1.18.0 wanted version [1.18.0]
|
||||
@ -598,7 +599,8 @@ def list_dependencies():
|
||||
# - com.typesafe.scala-logging:scala-logging_2.12:3.5.0 wanted version 1.7.21
|
||||
# - com.typesafe.slick:slick_2.12:3.3.0 wanted version 1.7.25
|
||||
# - com.zaxxer:HikariCP:3.2.0 wanted version 1.7.25
|
||||
# - io.dropwizard.metrics:metrics-core:3.1.2 wanted version 1.7.7
|
||||
# - io.dropwizard.metrics:metrics-core:4.0.0 wanted version 1.7.25
|
||||
# - io.dropwizard.metrics:metrics-jmx:4.0.0 wanted version 1.7.25
|
||||
# - org.apache.logging.log4j:log4j-slf4j-impl:2.8.1 wanted version 1.7.24
|
||||
# - org.mongodb:casbah-commons_2.12.0-RC1:3.1.1 wanted version 1.6.0
|
||||
# - org.rnorth:tcp-unix-socket-proxy:1.0.1 wanted version 1.7.21
|
||||
|
@ -331,7 +331,10 @@ dependencies:
|
||||
io.dropwizard.metrics:
|
||||
metrics-core:
|
||||
lang: java
|
||||
version: "3.1.2"
|
||||
version: "4.0.0"
|
||||
metrics-jmx:
|
||||
lang: java
|
||||
version: "4.0.0"
|
||||
|
||||
com.squareup:
|
||||
javapoet:
|
||||
|
@ -52,6 +52,8 @@ compileDependencies = [
|
||||
"//3rdparty/jvm/org/flywaydb:flyway_core",
|
||||
"//3rdparty/jvm/com/typesafe/play:anorm",
|
||||
"//3rdparty/jvm/com/typesafe/play:anorm_akka",
|
||||
"//3rdparty/jvm/io/dropwizard/metrics:metrics_core",
|
||||
"//3rdparty/jvm/io/dropwizard/metrics:metrics_jmx",
|
||||
]
|
||||
|
||||
da_scala_library(
|
||||
|
@ -14,6 +14,7 @@ import com.digitalasset.ledger.client.configuration.TlsConfiguration
|
||||
import com.digitalasset.ledger.server.LedgerApiServer.LedgerApiServer
|
||||
import com.digitalasset.platform.sandbox.banner.Banner
|
||||
import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext}
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.services.SandboxResetService
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
|
||||
import com.digitalasset.platform.sandbox.stores.ledger._
|
||||
@ -40,11 +41,13 @@ object SandboxApplication {
|
||||
maybeBundle: Option[SslContext] = None)
|
||||
extends AutoCloseable {
|
||||
|
||||
//TODO: get rid of these vars! Stateful resources should be created as vals when the owner object is created.
|
||||
@volatile private var system: ActorSystem = _
|
||||
@volatile private var materializer: ActorMaterializer = _
|
||||
@volatile private var server: LedgerApiServer = _
|
||||
@volatile private var ledgerId: String = _
|
||||
@volatile private var stopHeartbeats: () => Unit = () => ()
|
||||
@volatile private var metricsManager: MetricsManager = _
|
||||
|
||||
@volatile var port: Int = serverPort
|
||||
|
||||
@ -74,6 +77,7 @@ object SandboxApplication {
|
||||
startMode: SqlStartMode = SqlStartMode.ContinueIfExists): Unit = {
|
||||
implicit val mat = materializer
|
||||
implicit val ec: ExecutionContext = mat.system.dispatcher
|
||||
implicit val mm: MetricsManager = metricsManager
|
||||
|
||||
ledgerId = config.ledgerIdMode.ledgerId()
|
||||
|
||||
@ -93,7 +97,8 @@ object SandboxApplication {
|
||||
}
|
||||
|
||||
val (ledgerType, ledger) = config.jdbcUrl match {
|
||||
case None => ("in-memory", Ledger.inMemory(ledgerId, timeProvider, acs, records))
|
||||
case None =>
|
||||
("in-memory", Ledger.metered(Ledger.inMemory(ledgerId, timeProvider, acs, records)))
|
||||
case Some(jdbcUrl) =>
|
||||
val ledgerF = Ledger.postgres(jdbcUrl, ledgerId, timeProvider, records, startMode)
|
||||
|
||||
@ -103,7 +108,7 @@ object SandboxApplication {
|
||||
sys.error(msg)
|
||||
}, identity)
|
||||
|
||||
(s"sql", ledger)
|
||||
(s"sql", Ledger.metered(ledger))
|
||||
}
|
||||
|
||||
val ledgerBackend = new SandboxLedgerBackend(ledger)
|
||||
@ -143,6 +148,7 @@ object SandboxApplication {
|
||||
def start(): Unit = {
|
||||
system = ActorSystem(actorSystemName)
|
||||
materializer = ActorMaterializer()(system)
|
||||
metricsManager = MetricsManager()
|
||||
buildAndStartServer()
|
||||
}
|
||||
|
||||
@ -151,6 +157,7 @@ object SandboxApplication {
|
||||
Option(server).foreach(_.close())
|
||||
Option(materializer).foreach(_.shutdown())
|
||||
Option(system).foreach(s => Await.result(s.terminate(), asyncTolerance))
|
||||
Option(metricsManager).foreach(_.close())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,54 @@
|
||||
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.platform.sandbox.metrics
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.codahale.metrics.Slf4jReporter.LoggingLevel
|
||||
import com.codahale.metrics.jmx.JmxReporter
|
||||
import com.codahale.metrics.{MetricRegistry, Slf4jReporter}
|
||||
import com.digitalasset.platform.common.util.DirectExecutionContext
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
/** Manages metrics and reporters. Creates and starts a JmxReporter and creates an Slf4jReporter as well, which dumps
|
||||
* its metrics when this object is closed
|
||||
* <br/><br/>
|
||||
* Note that metrics are in general light-weight and add negligible overhead. They are not visible to everyday
|
||||
* users so they can be safely enabled all the time. */
|
||||
final class MetricsManager extends AutoCloseable {
|
||||
|
||||
private val metrics = new MetricRegistry()
|
||||
|
||||
private val jmxReporter = JmxReporter
|
||||
.forRegistry(metrics)
|
||||
.inDomain("com.digitalasset.platform.sandbox")
|
||||
.build
|
||||
|
||||
private val slf4jReporter = Slf4jReporter
|
||||
.forRegistry(metrics)
|
||||
.convertRatesTo(TimeUnit.SECONDS)
|
||||
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
||||
.withLoggingLevel(LoggingLevel.DEBUG)
|
||||
.build()
|
||||
|
||||
jmxReporter.start()
|
||||
|
||||
def timedFuture[T](timerName: String, f: => Future[T]) = {
|
||||
val timer = metrics.timer(timerName)
|
||||
val ctx = timer.time()
|
||||
val res = f
|
||||
res.onComplete(_ => ctx.stop())(DirectExecutionContext)
|
||||
res
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
slf4jReporter.report()
|
||||
jmxReporter.close()
|
||||
}
|
||||
}
|
||||
|
||||
object MetricsManager {
|
||||
def apply(): MetricsManager = new MetricsManager()
|
||||
}
|
@ -13,14 +13,16 @@ import com.digitalasset.daml.lf.transaction.Node.GlobalKey
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
|
||||
import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubmission}
|
||||
import com.digitalasset.platform.sandbox.stores.{ActiveContracts, ActiveContractsInMemory}
|
||||
import ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContractsInMemory
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.inmemory.InMemoryLedger
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.{SqlLedger, SqlStartMode}
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
|
||||
/** Defines all the functionalities a Ledger needs to provide */
|
||||
trait Ledger extends AutoCloseable {
|
||||
|
||||
def ledgerId: String
|
||||
@ -44,6 +46,15 @@ object Ledger {
|
||||
|
||||
type LedgerFactory = (ActiveContractsInMemory, Seq[LedgerEntry]) => Ledger
|
||||
|
||||
/**
|
||||
* Creates an in-memory ledger
|
||||
*
|
||||
* @param ledgerId the id to be used for the ledger
|
||||
* @param timeProvider the provider of time
|
||||
* @param acs the starting ACS store
|
||||
* @param ledgerEntries the starting entries
|
||||
* @return an in-memory Ledger
|
||||
*/
|
||||
def inMemory(
|
||||
ledgerId: String,
|
||||
timeProvider: TimeProvider,
|
||||
@ -51,14 +62,27 @@ object Ledger {
|
||||
ledgerEntries: Seq[LedgerEntry]): Ledger =
|
||||
new InMemoryLedger(ledgerId, timeProvider, acs, ledgerEntries)
|
||||
|
||||
/**
|
||||
* Creates a Postgres backed ledger
|
||||
*
|
||||
* @param jdbcUrl the jdbc url string containing the username and password as well
|
||||
* @param ledgerId the id to be used for the ledger
|
||||
* @param timeProvider the provider of time
|
||||
* @param ledgerEntries the starting entries
|
||||
* @param startMode whether the ledger should be reset, or continued where it was
|
||||
* @return a Postgres backed Ledger
|
||||
*/
|
||||
def postgres(
|
||||
jdbcUrl: String,
|
||||
ledgerId: String,
|
||||
timeProvider: TimeProvider,
|
||||
ledgerEntries: Seq[LedgerEntry],
|
||||
startMode: SqlStartMode
|
||||
)(implicit mat: Materializer): Future[Ledger] =
|
||||
)(implicit mat: Materializer, mm: MetricsManager): Future[Ledger] =
|
||||
//TODO (robert): casting from Seq to immutable.Seq, make ledgerEntries immutable throughout the Sandbox?
|
||||
SqlLedger(jdbcUrl, Some(ledgerId), timeProvider, immutable.Seq(ledgerEntries: _*), startMode)
|
||||
|
||||
/** Wraps the given Ledger adding metrics around important calls */
|
||||
def metered(ledger: Ledger)(implicit mm: MetricsManager): Ledger = MeteredLedger(ledger)
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.platform.sandbox.stores.ledger
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.daml.lf.transaction.Node.GlobalKey
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.daml.lf.value.Value.AbsoluteContractId
|
||||
import com.digitalasset.ledger.backend.api.v1.{SubmissionResult, TransactionSubmission}
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
private class MeteredLedger(ledger: Ledger, mm: MetricsManager) extends Ledger {
|
||||
|
||||
override def ledgerId: String = ledger.ledgerId
|
||||
|
||||
override def ledgerEntries(offset: Option[Long]): Source[(Long, LedgerEntry), NotUsed] =
|
||||
ledger.ledgerEntries(offset)
|
||||
|
||||
override def ledgerEnd: Long = ledger.ledgerEnd
|
||||
|
||||
override def snapshot(): Future[LedgerSnapshot] =
|
||||
ledger.snapshot()
|
||||
|
||||
override def lookupContract(
|
||||
contractId: Value.AbsoluteContractId): Future[Option[ActiveContract]] =
|
||||
mm.timedFuture("Ledger:lookupContract", ledger.lookupContract(contractId))
|
||||
|
||||
override def lookupKey(key: GlobalKey): Future[Option[AbsoluteContractId]] =
|
||||
mm.timedFuture("Ledger:lookupKey", ledger.lookupKey(key))
|
||||
|
||||
override def publishHeartbeat(time: Instant): Future[Unit] =
|
||||
mm.timedFuture("Ledger:publishHeartbeat", ledger.publishHeartbeat(time))
|
||||
|
||||
override def publishTransaction(
|
||||
transactionSubmission: TransactionSubmission): Future[SubmissionResult] =
|
||||
mm.timedFuture("Ledger:publishTransaction", ledger.publishTransaction(transactionSubmission))
|
||||
|
||||
override def close(): Unit = {
|
||||
ledger.close()
|
||||
}
|
||||
}
|
||||
|
||||
object MeteredLedger {
|
||||
def apply(ledger: Ledger)(implicit mm: MetricsManager): Ledger = new MeteredLedger(ledger, mm)
|
||||
}
|
@ -18,6 +18,7 @@ import com.digitalasset.platform.akkastreams.Dispatcher
|
||||
import com.digitalasset.platform.akkastreams.SteppingMode.RangeQuery
|
||||
import com.digitalasset.platform.common.util.DirectExecutionContext
|
||||
import com.digitalasset.platform.sandbox.config.LedgerIdGenerator
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.services.transaction.SandboxEventIdFormatter
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.sql.SqlStartMode.{
|
||||
@ -59,15 +60,17 @@ object SqlLedger {
|
||||
timeProvider: TimeProvider,
|
||||
ledgerEntries: immutable.Seq[LedgerEntry],
|
||||
startMode: SqlStartMode = SqlStartMode.ContinueIfExists)(
|
||||
implicit mat: Materializer): Future[Ledger] = {
|
||||
implicit mat: Materializer,
|
||||
mm: MetricsManager): Future[Ledger] = {
|
||||
implicit val ec: ExecutionContext = DirectExecutionContext
|
||||
|
||||
val noOfShortLivedConnections = 8
|
||||
val noOfStreamingConnections = 4
|
||||
|
||||
val dbDispatcher = DbDispatcher(jdbcUrl, noOfShortLivedConnections, noOfStreamingConnections)
|
||||
val ledgerDao =
|
||||
PostgresLedgerDao(dbDispatcher, ContractSerializer, TransactionSerializer, ValueSerializer)
|
||||
val ledgerDao = LedgerDao.metered(
|
||||
PostgresLedgerDao(dbDispatcher, ContractSerializer, TransactionSerializer, ValueSerializer))
|
||||
|
||||
val sqlLedgerFactory = SqlLedgerFactory(ledgerDao)
|
||||
|
||||
for {
|
||||
|
@ -13,6 +13,7 @@ import com.digitalasset.daml.lf.transaction.Node
|
||||
import com.digitalasset.daml.lf.transaction.Node.KeyWithMaintainers
|
||||
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, ContractInst, VersionedValue}
|
||||
import com.digitalasset.platform.common.util.DirectExecutionContext
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ActiveContracts.ActiveContract
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
|
||||
|
||||
@ -46,12 +47,12 @@ object PersistenceResponse {
|
||||
|
||||
}
|
||||
|
||||
case class LedgerSnapshot(offset: Long, acs: Source[Contract, NotUsed])
|
||||
|
||||
trait LedgerDao extends AutoCloseable {
|
||||
|
||||
type LedgerOffset = Long
|
||||
|
||||
case class LedgerSnapshot(offset: LedgerOffset, acs: Source[Contract, NotUsed])
|
||||
|
||||
/** Looks up the ledger id */
|
||||
def lookupLedgerId(): Future[Option[String]]
|
||||
|
||||
@ -139,3 +140,9 @@ trait LedgerDao extends AutoCloseable {
|
||||
def reset(): Future[Unit]
|
||||
|
||||
}
|
||||
|
||||
object LedgerDao {
|
||||
|
||||
/** Wraps the given LedgerDao adding metrics around important calls */
|
||||
def metered(dao: LedgerDao)(implicit mm: MetricsManager): LedgerDao = MeteredLedgerDao(dao)
|
||||
}
|
||||
|
@ -0,0 +1,67 @@
|
||||
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.platform.sandbox.stores.ledger.sql.dao
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.digitalasset.daml.lf.transaction.Node
|
||||
import com.digitalasset.daml.lf.value.Value
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import com.digitalasset.platform.sandbox.stores.ledger.LedgerEntry
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
private class MeteredLedgerDao(ledgerDao: LedgerDao, mm: MetricsManager) extends LedgerDao {
|
||||
|
||||
override def lookupLedgerId(): Future[Option[String]] =
|
||||
mm.timedFuture("LedgerDao:lookupLedgerId", ledgerDao.lookupLedgerId())
|
||||
|
||||
override def lookupLedgerEnd(): Future[Long] =
|
||||
mm.timedFuture("LedgerDao:lookupLedgerEnd", ledgerDao.lookupLedgerEnd())
|
||||
|
||||
override def lookupActiveContract(
|
||||
contractId: Value.AbsoluteContractId): Future[Option[Contract]] =
|
||||
mm.timedFuture("LedgerDao:lookupActiveContract", ledgerDao.lookupActiveContract(contractId))
|
||||
|
||||
override def lookupLedgerEntry(offset: Long): Future[Option[LedgerEntry]] =
|
||||
mm.timedFuture("LedgerDao:lookupLedgerEntry", ledgerDao.lookupLedgerEntry(offset))
|
||||
|
||||
override def lookupKey(key: Node.GlobalKey): Future[Option[Value.AbsoluteContractId]] =
|
||||
mm.timedFuture("LedgerDao:lookupKey", ledgerDao.lookupKey(key))
|
||||
|
||||
override def getActiveContractSnapshot()(implicit mat: Materializer): Future[LedgerSnapshot] =
|
||||
ledgerDao.getActiveContractSnapshot()
|
||||
|
||||
override def getLedgerEntries(
|
||||
startInclusive: LedgerOffset,
|
||||
endExclusive: LedgerOffset): Source[(LedgerOffset, LedgerEntry), NotUsed] =
|
||||
ledgerDao.getLedgerEntries(startInclusive, endExclusive)
|
||||
|
||||
override def storeInitialLedgerEnd(ledgerEnd: Long): Future[Unit] =
|
||||
ledgerDao.storeInitialLedgerEnd(ledgerEnd)
|
||||
|
||||
override def storeLedgerId(ledgerId: String): Future[Unit] =
|
||||
ledgerDao.storeLedgerId(ledgerId)
|
||||
|
||||
override def storeLedgerEntry(
|
||||
offset: Long,
|
||||
newLedgerEnd: Long,
|
||||
ledgerEntry: LedgerEntry): Future[PersistenceResponse] =
|
||||
mm.timedFuture(
|
||||
"storeLedgerEntry",
|
||||
ledgerDao.storeLedgerEntry(offset, newLedgerEnd, ledgerEntry))
|
||||
|
||||
override def reset(): Future[Unit] =
|
||||
ledgerDao.reset()
|
||||
|
||||
override def close(): Unit = {
|
||||
ledgerDao.close()
|
||||
}
|
||||
}
|
||||
|
||||
object MeteredLedgerDao {
|
||||
def apply(ledgerDao: LedgerDao)(implicit mm: MetricsManager): LedgerDao =
|
||||
new MeteredLedgerDao(ledgerDao, mm)
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.digitalasset.platform.sandbox
|
||||
|
||||
import com.digitalasset.platform.sandbox.metrics.MetricsManager
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
trait MetricsAround extends BeforeAndAfterAll {
|
||||
self: org.scalatest.Suite =>
|
||||
|
||||
@volatile implicit var metricsManager: MetricsManager = _
|
||||
|
||||
override protected def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
metricsManager = MetricsManager()
|
||||
}
|
||||
|
||||
override protected def afterAll(): Unit = {
|
||||
metricsManager.close()
|
||||
super.afterAll()
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ package com.digitalasset.platform.sandbox.stores.ledger.sql
|
||||
|
||||
import com.digitalasset.api.util.TimeProvider
|
||||
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.digitalasset.platform.sandbox.MetricsAround
|
||||
import com.digitalasset.platform.sandbox.persistence.PostgresAroundEach
|
||||
import org.scalatest.concurrent.AsyncTimeLimitedTests
|
||||
import org.scalatest.time.Span
|
||||
@ -17,7 +18,8 @@ class SqlLedgerSpec
|
||||
with AsyncTimeLimitedTests
|
||||
with Matchers
|
||||
with PostgresAroundEach
|
||||
with AkkaBeforeAndAfterAll {
|
||||
with AkkaBeforeAndAfterAll
|
||||
with MetricsAround {
|
||||
|
||||
override val timeLimit: Span = 60.seconds
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user