mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Add first metrics to non-repudiation proxy (#8766)
* Add first metrics to non-repudiation proxy changelog_begin changelog_end Contributes to https://github.com/digital-asset/daml/issues/8635 Add a few key metrics for the non-repudiation proxy, with more to follow, in particular keeping track of the performance overhead associated with accessing the underlying database. All metrics can be seen in com.daml.nonrepudiation.Metrics Running the conformance tests successfully shows a summary of those metrics with the expected period (five seconds). * Address https://github.com/digital-asset/daml/pull/8766#discussion_r575044128
This commit is contained in:
parent
dbd017ee49
commit
a2d87b9396
@ -20,7 +20,7 @@ import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
|
||||
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionService
|
||||
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
|
||||
import com.daml.nonrepudiation.client.SigningInterceptor
|
||||
import com.daml.nonrepudiation.{AlgorithmString, NonRepudiationProxy}
|
||||
import com.daml.nonrepudiation.{AlgorithmString, MetricsReporterOwner, NonRepudiationProxy}
|
||||
import com.daml.platform.sandbox.config.SandboxConfig
|
||||
import com.daml.platform.sandboxnext.{Runner => Sandbox}
|
||||
import com.daml.ports.Port
|
||||
@ -77,6 +77,7 @@ final class NonRepudiationProxyConformance
|
||||
sandboxChannelBuilder,
|
||||
shutdownTimeout = 5.seconds,
|
||||
)
|
||||
_ <- MetricsReporterOwner.slf4j[ResourceContext](period = 5.seconds)
|
||||
transactor <- managedHikariTransactor(postgresDatabase.url, maxPoolSize = 10)
|
||||
db = Tables.initialize(transactor)
|
||||
_ = db.certificates.put(certificate)
|
||||
|
@ -23,6 +23,7 @@ da_scala_library(
|
||||
"//libs-scala/resources",
|
||||
"//runtime-components/non-repudiation-core",
|
||||
"@maven//:com_google_guava_guava",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
)
|
||||
|
@ -5,6 +5,8 @@ package com.daml.nonrepudiation
|
||||
|
||||
import java.security.cert.X509Certificate
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
|
||||
object CertificateRepository {
|
||||
|
||||
trait Read {
|
||||
@ -15,6 +17,11 @@ object CertificateRepository {
|
||||
def put(certificate: X509Certificate): FingerprintBytes
|
||||
}
|
||||
|
||||
final class Timed(timer: Timer, delegate: Read) extends Read {
|
||||
override def get(fingerprint: FingerprintBytes): Option[X509Certificate] =
|
||||
timer.time(() => delegate.get(fingerprint))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
trait CertificateRepository extends CertificateRepository.Read with CertificateRepository.Write
|
||||
|
@ -0,0 +1,55 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.nonrepudiation
|
||||
|
||||
import com.codahale.metrics.{Meter, MetricRegistry, Timer}
|
||||
|
||||
object Metrics extends Metrics {
|
||||
|
||||
// We only need a singleton right now
|
||||
// Having multiple registries is useful
|
||||
// "if you want to organize your metrics in particular reporting groups"
|
||||
// See: https://metrics.dropwizard.io/4.1.2/manual/core.html
|
||||
object Registry extends MetricRegistry
|
||||
|
||||
}
|
||||
|
||||
sealed abstract class Metrics {
|
||||
|
||||
private val Prefix = "daml.nonrepudiation"
|
||||
|
||||
private def name(suffix: String): String = s"$Prefix.$suffix"
|
||||
|
||||
// For further details on the metrics below, see: https://metrics.dropwizard.io/4.1.2/manual/core.html
|
||||
// Quick reference:
|
||||
// - meters track rates, keeping both historical mean and exponentially-weighted
|
||||
// moving average over the last 1, 5 and 15 minutes
|
||||
// - timers act as meters and also keep an histogram of the time for the
|
||||
// measured action, giving exponentially more weight to more recent data
|
||||
|
||||
// daml.nonrepudiation.processing
|
||||
// Overall time taken from interception to forwarding to the participant (or rejecting)
|
||||
val processingTimer: Timer = Metrics.Registry.timer(name("processing"))
|
||||
|
||||
// daml.nonrepudiation.get_key
|
||||
// Time taken to retrieve the key from the certificate store
|
||||
// Part of the time tracked in daml.nonrepudiation.processing
|
||||
val getKeyTimer: Timer = Metrics.Registry.timer(name("get_key"))
|
||||
|
||||
// daml.nonrepudiation.verify_signature
|
||||
// Time taken to verify the signature of a command
|
||||
// Part of the time tracked in daml.nonrepudiation.processing
|
||||
val verifySignatureTimer: Timer = Metrics.Registry.timer(name("verify_signature"))
|
||||
|
||||
// daml.nonrepudiation.add_signed_payload
|
||||
// Time taken to add the signed payload before ultimately forwarding the command
|
||||
// Part of the time tracked in daml.nonrepudiation.processing
|
||||
val addSignedPayloadTimer: Timer = Metrics.Registry.timer(name("add_signed_payload"))
|
||||
|
||||
// daml.nonrepudiation.rejections
|
||||
// Rate of calls that are being rejected before they can be forwarded to the participant
|
||||
// Historical and exponentially-weighted moving average rate over the latest 1, 5 and 15 minutes
|
||||
val rejectionsMeter: Meter = Metrics.Registry.meter(name("rejections"))
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.nonrepudiation
|
||||
|
||||
import com.codahale.metrics.{ScheduledReporter, Slf4jReporter}
|
||||
import com.daml.resources._
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
// We don't need access to the underlying resource, we use
|
||||
// the owner only to manage the reporter's life cycle
|
||||
sealed abstract class MetricsReporterOwner[Context: HasExecutionContext]
|
||||
extends AbstractResourceOwner[Context, Unit]
|
||||
|
||||
object MetricsReporterOwner {
|
||||
|
||||
def slf4j[Context: HasExecutionContext](
|
||||
period: FiniteDuration
|
||||
): MetricsReporterOwner[Context] =
|
||||
new Scheduled(period, Slf4jReporter.forRegistry(Metrics.Registry).build())
|
||||
|
||||
private final class Scheduled[Context: HasExecutionContext, Delegate <: ScheduledReporter](
|
||||
period: FiniteDuration,
|
||||
reporter: Delegate,
|
||||
) extends MetricsReporterOwner[Context] {
|
||||
override def acquire()(implicit context: Context): Resource[Context, Unit] = {
|
||||
ReleasableResource(Future(reporter.start(period.length, period.unit)))(_ =>
|
||||
Future(reporter.stop())
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -11,7 +11,7 @@ import io.grpc.{Channel, Server, ServerBuilder}
|
||||
|
||||
object NonRepudiationProxy {
|
||||
|
||||
def owner[Context: HasExecutionContext](
|
||||
def owner[Context](
|
||||
participant: Channel,
|
||||
serverBuilder: ServerBuilder[_],
|
||||
certificateRepository: CertificateRepository.Read,
|
||||
@ -19,13 +19,15 @@ object NonRepudiationProxy {
|
||||
timestampProvider: Clock,
|
||||
serviceName: String,
|
||||
serviceNames: String*
|
||||
): AbstractResourceOwner[Context, Server] = {
|
||||
)(implicit context: HasExecutionContext[Context]): AbstractResourceOwner[Context, Server] = {
|
||||
|
||||
val signatureVerification =
|
||||
new SignatureVerificationInterceptor(
|
||||
certificateRepository,
|
||||
signedPayloadRepository,
|
||||
timestampProvider,
|
||||
)
|
||||
|
||||
ReverseProxy.owner(
|
||||
backend = participant,
|
||||
serverBuilder = serverBuilder,
|
||||
@ -33,6 +35,7 @@ object NonRepudiationProxy {
|
||||
.map(service => service -> Seq(signatureVerification))
|
||||
.toMap,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,39 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.nonrepudiation
|
||||
|
||||
import java.security.Signature
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
object SignatureVerification {
|
||||
|
||||
private val logger = LoggerFactory.getLogger(classOf[SignatureVerification])
|
||||
|
||||
final class Timed(timer: Timer) extends SignatureVerification {
|
||||
override def apply(payload: Array[Byte], signatureData: SignatureData): Try[Boolean] =
|
||||
timer.time(() => super.apply(payload, signatureData))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sealed abstract class SignatureVerification {
|
||||
|
||||
import SignatureVerification.logger
|
||||
|
||||
def apply(payload: Array[Byte], signatureData: SignatureData): Try[Boolean] =
|
||||
Try {
|
||||
logger.trace("Decoding signature bytes from Base64-encoded signature")
|
||||
logger.trace("Initializing signature verifier")
|
||||
val verifier = Signature.getInstance(signatureData.algorithm)
|
||||
verifier.initVerify(signatureData.key)
|
||||
verifier.update(payload)
|
||||
logger.trace("Verifying signature '{}'", signatureData.signature.base64)
|
||||
verifier.verify(signatureData.signature.unsafeArray)
|
||||
}
|
||||
|
||||
}
|
@ -4,9 +4,10 @@
|
||||
package com.daml.nonrepudiation
|
||||
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.security.{PublicKey, Signature}
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
import com.daml.grpc.interceptors.ForwardingServerCallListener
|
||||
import io.grpc.Metadata.Key
|
||||
import io.grpc._
|
||||
@ -16,24 +17,35 @@ import scala.util.Try
|
||||
|
||||
final class SignatureVerificationInterceptor(
|
||||
certificateRepository: CertificateRepository.Read,
|
||||
signedPayloads: SignedPayloadRepository.Write,
|
||||
signedPayloadRepository: SignedPayloadRepository.Write,
|
||||
timestampProvider: Clock,
|
||||
) extends ServerInterceptor {
|
||||
|
||||
import SignatureVerificationInterceptor._
|
||||
|
||||
private val timedCertificateRepository =
|
||||
new CertificateRepository.Timed(Metrics.getKeyTimer, certificateRepository)
|
||||
|
||||
private val timedSignedPayloadRepository =
|
||||
new SignedPayloadRepository.Timed(Metrics.addSignedPayloadTimer, signedPayloadRepository)
|
||||
|
||||
private val timedSignatureVerification =
|
||||
new SignatureVerification.Timed(Metrics.verifySignatureTimer)
|
||||
|
||||
override def interceptCall[ReqT, RespT](
|
||||
call: ServerCall[ReqT, RespT],
|
||||
metadata: Metadata,
|
||||
next: ServerCallHandler[ReqT, RespT],
|
||||
): ServerCall.Listener[ReqT] = {
|
||||
|
||||
val runningTimer = Metrics.processingTimer.time()
|
||||
|
||||
val signatureData =
|
||||
for {
|
||||
signature <- getHeader(metadata, Headers.SIGNATURE, SignatureBytes.wrap)
|
||||
algorithm <- getHeader(metadata, Headers.ALGORITHM, AlgorithmString.wrap)
|
||||
fingerprint <- getHeader(metadata, Headers.FINGERPRINT, FingerprintBytes.wrap)
|
||||
key <- getKey(certificateRepository, fingerprint)
|
||||
key <- getKey(timedCertificateRepository, fingerprint)
|
||||
} yield SignatureData(
|
||||
signature = signature,
|
||||
algorithm = algorithm,
|
||||
@ -48,11 +60,13 @@ final class SignatureVerificationInterceptor(
|
||||
metadata = metadata,
|
||||
next = next,
|
||||
signatureData = signatureData,
|
||||
signedPayloads = signedPayloads,
|
||||
signatureVerification = timedSignatureVerification,
|
||||
signedPayloads = timedSignedPayloadRepository,
|
||||
timestampProvider = timestampProvider,
|
||||
runningTimer = runningTimer,
|
||||
)
|
||||
case Left(rejection) =>
|
||||
rejection.report()
|
||||
rejection.report(runningTimer)
|
||||
call.close(SignatureVerificationFailed, new Metadata())
|
||||
new ServerCall.Listener[ReqT] {}
|
||||
}
|
||||
@ -84,7 +98,9 @@ object SignatureVerificationInterceptor {
|
||||
}
|
||||
|
||||
private trait Rejection {
|
||||
def report(): Unit = {
|
||||
def report(timer: Timer.Context): Unit = {
|
||||
Metrics.rejectionsMeter.mark()
|
||||
timer.stop()
|
||||
this match {
|
||||
case Rejection.Error(reason) =>
|
||||
logger.debug(reason)
|
||||
@ -124,8 +140,10 @@ object SignatureVerificationInterceptor {
|
||||
metadata: Metadata,
|
||||
next: ServerCallHandler[ReqT, RespT],
|
||||
signatureData: SignatureData,
|
||||
signatureVerification: SignatureVerification,
|
||||
signedPayloads: SignedPayloadRepository.Write,
|
||||
timestampProvider: Clock,
|
||||
runningTimer: Timer.Context,
|
||||
) extends ForwardingServerCallListener(call, metadata, next) {
|
||||
|
||||
private def castToByteArray(request: ReqT): Either[Rejection, Array[Byte]] = {
|
||||
@ -134,15 +152,7 @@ object SignatureVerificationInterceptor {
|
||||
}
|
||||
|
||||
private def verifySignature(payload: Array[Byte]): Either[Rejection, Boolean] =
|
||||
Try {
|
||||
logger.trace("Decoding signature bytes from Base64-encoded signature")
|
||||
logger.trace("Initializing signature verifier")
|
||||
val verifier = Signature.getInstance(signatureData.algorithm)
|
||||
verifier.initVerify(signatureData.key)
|
||||
verifier.update(payload)
|
||||
logger.trace("Verifying signature '{}'", signatureData.signature.base64)
|
||||
verifier.verify(signatureData.signature.unsafeArray)
|
||||
}.toEither.left
|
||||
signatureVerification(payload, signatureData).toEither.left
|
||||
.map(Rejection.fromException)
|
||||
.filterOrElse(identity, Rejection.SignatureVerificationFailed)
|
||||
|
||||
@ -168,12 +178,13 @@ object SignatureVerificationInterceptor {
|
||||
_ <- addSignedCommand(payload)
|
||||
} yield {
|
||||
val input = new ByteArrayInputStream(payload)
|
||||
val dup = call.getMethodDescriptor.parseRequest(input)
|
||||
super.onMessage(dup)
|
||||
val copy = call.getMethodDescriptor.parseRequest(input)
|
||||
runningTimer.stop()
|
||||
super.onMessage(copy)
|
||||
}
|
||||
|
||||
result.left.foreach { rejection =>
|
||||
rejection.report()
|
||||
rejection.report(runningTimer)
|
||||
call.close(SignatureVerificationFailed, new Metadata())
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
package com.daml.nonrepudiation
|
||||
|
||||
import com.codahale.metrics.Timer
|
||||
import com.daml.nonrepudiation.SignedPayloadRepository.KeyEncoder
|
||||
|
||||
object SignedPayloadRepository {
|
||||
@ -30,6 +31,11 @@ object SignedPayloadRepository {
|
||||
def put(signedPayload: SignedPayload): Unit
|
||||
}
|
||||
|
||||
final class Timed(timer: Timer, delegate: Write) extends Write {
|
||||
override def put(signedPayload: SignedPayload): Unit =
|
||||
timer.time[Unit](() => delegate.put(signedPayload))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
abstract class SignedPayloadRepository[Key](implicit val keyEncoder: KeyEncoder[Key])
|
||||
|
@ -127,7 +127,14 @@ final class NonRepudiationProxySpec
|
||||
val (privateKey, certificate) = Setup.generateKeyAndCertificate()
|
||||
|
||||
NonRepudiationProxy
|
||||
.owner(channel, proxyBuilder, certificates, signatures, Clock.systemUTC(), Health.Name)
|
||||
.owner(
|
||||
channel,
|
||||
proxyBuilder,
|
||||
certificates,
|
||||
signatures,
|
||||
Clock.systemUTC(),
|
||||
Health.Name,
|
||||
)
|
||||
.use { _ =>
|
||||
the[StatusRuntimeException] thrownBy {
|
||||
Health.getHealthStatus(
|
||||
|
Loading…
Reference in New Issue
Block a user