sandbox: Add timers for all read, write, and index service methods. (#5147)

CHANGELOG_BEGIN
- [Sandbox] Added metrics under "daml.services" for various internal
  events.
CHANGELOG_END
This commit is contained in:
Samir Talwar 2020-03-24 08:03:33 +01:00 committed by GitHub
parent ed67e5c188
commit 22aa642a9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 300 additions and 21 deletions

View File

@ -11,6 +11,7 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.SeedService.Seeding
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SeedService, WriteService}
import com.digitalasset.api.util.TimeProvider
@ -49,6 +50,7 @@ final class StandaloneApiServer(
readService: ReadService,
writeService: WriteService,
authService: AuthService,
transformIndexService: IndexService => IndexService = identity,
metrics: MetricRegistry,
timeServiceBackend: Option[TimeServiceBackend] = None,
seeding: Option[Seeding],
@ -74,14 +76,16 @@ final class StandaloneApiServer(
() => java.time.Clock.systemUTC.instant(),
initialConditions.ledgerId,
participantId)
indexService <- JdbcIndex.owner(
ServerRole.ApiServer,
initialConditions.config.timeModel,
domain.LedgerId(initialConditions.ledgerId),
participantId,
config.jdbcUrl,
metrics,
)
indexService <- JdbcIndex
.owner(
ServerRole.ApiServer,
initialConditions.config.timeModel,
domain.LedgerId(initialConditions.ledgerId),
participantId,
config.jdbcUrl,
metrics,
)
.map(transformIndexService)
healthChecks = new HealthChecks(
"index" -> indexService,
"read" -> readService,

View File

@ -3,16 +3,39 @@
package com.digitalasset.platform
import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.scaladsl.{Keep, Source}
import com.codahale.metrics.Timer
import com.digitalasset.dec.DirectExecutionContext
import scala.concurrent.Future
package object metrics {
def timedFuture[T](timer: Timer, f: => Future[T]): Future[T] = {
def timedFuture[T](timer: Timer, future: => CompletionStage[T]): CompletionStage[T] = {
val ctx = timer.time()
val res = f
res.onComplete(_ => ctx.stop())(DirectExecutionContext)
res
future.whenComplete { (_, _) =>
ctx.stop()
()
}
}
def timedFuture[T](timer: Timer, future: => Future[T]): Future[T] = {
val ctx = timer.time()
val result = future
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
result
}
def timedSource[Out, Mat](timer: Timer, source: => Source[Out, Mat]): Source[Out, Mat] = {
val ctx = timer.time()
source
.watchTermination()(Keep.both[Mat, Future[Done]])
.mapMaterializedValue {
case (mat, done) =>
done.onComplete(_ => ctx.stop())(DirectExecutionContext)
mat
}
}
}

View File

@ -45,6 +45,7 @@ import com.digitalasset.platform.sandbox.stores.{
SandboxIndexAndWriteService
}
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.platform.state.{TimedIndexService, TimedWriteService}
import com.digitalasset.ports.Port
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
@ -293,8 +294,14 @@ final class SandboxServer(
ApiServices
.create(
participantId = participantId,
writeService = indexAndWriteService.writeService,
indexService = indexAndWriteService.indexService,
writeService = new TimedWriteService(
indexAndWriteService.writeService,
metrics,
"daml.sandbox.writeService"),
indexService = new TimedIndexService(
indexAndWriteService.indexService,
metrics,
"daml.sandbox.indexService"),
authorizer = authorizer,
engine = SandboxServer.engine,
timeProvider = timeProvider,

View File

@ -14,7 +14,7 @@ import com.daml.ledger.on.sql.Database.InvalidDatabaseException
import com.daml.ledger.on.sql.SqlLedgerReaderWriter
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1
import com.daml.ledger.participant.state.v1.SeedService
import com.daml.ledger.participant.state.v1.{SeedService, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.buildinfo.BuildInfo
import com.digitalasset.daml.lf.archive.DarReader
@ -42,6 +42,7 @@ import com.digitalasset.platform.sandbox.metrics.MetricsReporting
import com.digitalasset.platform.sandbox.services.SandboxResetService
import com.digitalasset.platform.sandboxnext.Runner._
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.platform.state.{TimedIndexService, TimedReadService, TimedWriteService}
import com.digitalasset.platform.store.FlywayMigrations
import com.digitalasset.ports.Port
import com.digitalasset.resources.akka.AkkaResourceOwner
@ -158,12 +159,14 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
seedService = SeedService(seeding)
)
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
readService = new TimedReadService(ledger, metrics, ReadServicePrefix)
writeService = new TimedWriteService(ledger, metrics, WriteServicePrefix)
ledgerId <- ResourceOwner.forFuture(() =>
ledger.getLedgerInitialConditions().runWith(Sink.head).map(_.ledgerId))
readService.getLedgerInitialConditions().runWith(Sink.head).map(_.ledgerId))
_ <- ResourceOwner.forFuture(() =>
Future.sequence(config.damlPackages.map(uploadDar(_, ledger))))
Future.sequence(config.damlPackages.map(uploadDar(_, writeService))))
_ <- new StandaloneIndexerServer(
readService = ledger,
readService = readService,
config = IndexerConfig(
ParticipantId,
jdbcUrl = indexJdbcUrl,
@ -204,9 +207,10 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
commandConfig = config.commandConfig,
partyConfig = config.partyConfig,
submissionConfig = config.submissionConfig,
readService = ledger,
writeService = ledger,
readService = readService,
writeService = writeService,
authService = authService,
transformIndexService = new TimedIndexService(_, metrics, IndexServicePrefix),
metrics = metrics,
timeServiceBackend = timeServiceBackend,
seeding = Some(seeding),
@ -238,7 +242,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
owner.acquire()
}
private def uploadDar(from: File, to: KeyValueParticipantState)(
private def uploadDar(from: File, to: WriteService)(
implicit executionContext: ExecutionContext
): Future[Unit] = {
val submissionId = v1.SubmissionId.assertFromString(UUID.randomUUID().toString)
@ -262,5 +266,9 @@ object Runner {
private val InMemoryIndexJdbcUrl =
"jdbc:h2:mem:index;db_close_delay=-1;db_close_on_exit=false"
private val ReadServicePrefix = "daml.services.read"
private val IndexServicePrefix = "daml.services.index"
private val WriteServicePrefix = "daml.services.write"
private val HeartbeatInterval: FiniteDuration = 1.second
}

View File

@ -0,0 +1,150 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.state
import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.v1.{Configuration, PackageId, ParticipantId, Party}
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.transaction.Node
import com.digitalasset.daml.lf.value.Value
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.{ApplicationId, LedgerId, LedgerOffset, TransactionId}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.digitalasset.ledger.api.v1.transaction_service.{
GetFlatTransactionResponse,
GetTransactionResponse,
GetTransactionTreesResponse,
GetTransactionsResponse
}
import com.digitalasset.platform.metrics.{timedFuture, timedSource}
import scala.concurrent.Future
final class TimedIndexService(delegate: IndexService, metrics: MetricRegistry, prefix: String)
extends IndexService {
override def listLfPackages(): Future[Map[PackageId, v2.PackageDetails]] =
time("listLfPackages", delegate.listLfPackages())
override def getLfArchive(packageId: PackageId): Future[Option[DamlLf.Archive]] =
time("getLfArchive", delegate.getLfArchive(packageId))
override def getLfPackage(packageId: PackageId): Future[Option[Ast.Package]] =
time("getLfPackage", delegate.getLfPackage(packageId))
override def packageEntries(
startExclusive: LedgerOffset.Absolute
): Source[domain.PackageEntry, NotUsed] =
time("packageEntries", delegate.packageEntries(startExclusive))
override def getLedgerConfiguration(): Source[v2.LedgerConfiguration, NotUsed] =
time("getLedgerConfiguration", delegate.getLedgerConfiguration())
override def currentLedgerEnd(): Future[LedgerOffset.Absolute] =
time("currentLedgerEnd", delegate.currentLedgerEnd())
override def getCompletions(
begin: domain.LedgerOffset,
applicationId: ApplicationId,
parties: Set[Party]
): Source[CompletionStreamResponse, NotUsed] =
time("getCompletions", delegate.getCompletions(begin, applicationId, parties))
override def transactions(
begin: domain.LedgerOffset,
endAt: Option[domain.LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean
): Source[GetTransactionsResponse, NotUsed] =
time("transactions", delegate.transactions(begin, endAt, filter, verbose))
override def transactionTrees(
begin: domain.LedgerOffset,
endAt: Option[domain.LedgerOffset],
filter: domain.TransactionFilter,
verbose: Boolean
): Source[GetTransactionTreesResponse, NotUsed] =
time("transactionTrees", delegate.transactionTrees(begin, endAt, filter, verbose))
override def getTransactionById(
transactionId: TransactionId,
requestingParties: Set[Party]
): Future[Option[GetFlatTransactionResponse]] =
time("getTransactionById", delegate.getTransactionById(transactionId, requestingParties))
override def getTransactionTreeById(
transactionId: TransactionId,
requestingParties: Set[Party]
): Future[Option[GetTransactionResponse]] =
time(
"getTransactionTreeById",
delegate.getTransactionTreeById(transactionId, requestingParties))
override def getActiveContractSetSnapshot(
filter: domain.TransactionFilter
): Future[v2.ActiveContractSetSnapshot] =
time("getActiveContractSetSnapshot", delegate.getActiveContractSetSnapshot(filter))
override def lookupActiveContract(
submitter: Party,
contractId: Value.AbsoluteContractId
): Future[Option[Value.ContractInst[Value.VersionedValue[Value.AbsoluteContractId]]]] =
time("lookupActiveContract", delegate.lookupActiveContract(submitter, contractId))
override def lookupContractKey(
submitter: Party,
key: Node.GlobalKey
): Future[Option[Value.AbsoluteContractId]] =
time("lookupContractKey", delegate.lookupContractKey(submitter, key))
override def getLedgerId(): Future[LedgerId] =
time("getLedgerId", delegate.getLedgerId())
override def getParticipantId(): Future[ParticipantId] =
time("getParticipantId", delegate.getParticipantId())
override def getParties(parties: Seq[Party]): Future[List[domain.PartyDetails]] =
time("getParties", delegate.getParties(parties))
override def listKnownParties(): Future[List[domain.PartyDetails]] =
time("listKnownParties", delegate.listKnownParties())
override def partyEntries(
startExclusive: LedgerOffset.Absolute
): Source[domain.PartyEntry, NotUsed] =
time("partyEntries", delegate.partyEntries(startExclusive))
override def lookupConfiguration(): Future[Option[(LedgerOffset.Absolute, Configuration)]] =
time("lookupConfiguration", delegate.lookupConfiguration())
override def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]
): Source[domain.ConfigurationEntry, NotUsed] =
time("configurationEntries", delegate.configurationEntries(startExclusive))
override def deduplicateCommand(
deduplicationKey: String,
submittedAt: Instant,
deduplicateUntil: Instant
): Future[v2.CommandDeduplicationResult] =
time(
"deduplicateCommand",
delegate.deduplicateCommand(deduplicationKey, submittedAt, deduplicateUntil))
override def currentHealth(): HealthStatus =
delegate.currentHealth()
private def time[T](name: String, future: => Future[T]): Future[T] =
timedFuture(metrics.timer(s"$prefix.$name"), future)
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
timedSource(metrics.timer(s"$prefix.$name"), source)
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.state
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.{LedgerInitialConditions, Offset, ReadService, Update}
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.metrics.timedSource
final class TimedReadService(delegate: ReadService, metrics: MetricRegistry, prefix: String)
extends ReadService {
override def getLedgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
time("getLedgerInitialConditions", delegate.getLedgerInitialConditions())
override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
time("stateUpdates", delegate.stateUpdates(beginAfter))
override def currentHealth(): HealthStatus =
delegate.currentHealth()
private def time[Out, Mat](name: String, source: => Source[Out, Mat]): Source[Out, Mat] =
timedSource(metrics.timer(s"$prefix.$name"), source)
}

View File

@ -0,0 +1,61 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.state
import java.util.concurrent.CompletionStage
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.{
Configuration,
Party,
SubmissionId,
SubmissionResult,
SubmittedTransaction,
SubmitterInfo,
TransactionMeta,
WriteService
}
import com.digitalasset.daml.lf.data.Time
import com.digitalasset.daml_lf_dev.DamlLf
import com.digitalasset.ledger.api.health.HealthStatus
import com.digitalasset.platform.metrics.timedFuture
final class TimedWriteService(delegate: WriteService, metrics: MetricRegistry, prefix: String)
extends WriteService {
override def submitTransaction(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction
): CompletionStage[SubmissionResult] =
time(
"submitTransaction",
delegate.submitTransaction(submitterInfo, transactionMeta, transaction))
override def uploadPackages(
submissionId: SubmissionId,
archives: List[DamlLf.Archive],
sourceDescription: Option[String]
): CompletionStage[SubmissionResult] =
time("uploadPackages", delegate.uploadPackages(submissionId, archives, sourceDescription))
override def allocateParty(
hint: Option[Party],
displayName: Option[String],
submissionId: SubmissionId
): CompletionStage[SubmissionResult] =
time("allocateParty", delegate.allocateParty(hint, displayName, submissionId))
override def submitConfiguration(
maxRecordTime: Time.Timestamp,
submissionId: SubmissionId,
config: Configuration
): CompletionStage[SubmissionResult] =
time("submitConfiguration", delegate.submitConfiguration(maxRecordTime, submissionId, config))
override def currentHealth(): HealthStatus =
delegate.currentHealth()
private def time[T](name: String, future: => CompletionStage[T]): CompletionStage[T] =
timedFuture(metrics.timer(s"$prefix.$name"), future)
}