diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index f1bf4d957e..2103c7aab9 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -84,6 +84,7 @@ da_scala_test_suite( "@maven//:com_typesafe_akka_akka_stream", "@maven//:com_typesafe_akka_akka_stream_testkit", "@maven//:com_typesafe_akka_akka_testkit", + "@maven//:org_mockito_mockito_scala", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest", "@maven//:org_scalaz_scalaz_core", @@ -111,13 +112,20 @@ da_scala_test_suite( "//ledger/ledger-api-domain", "//ledger/ledger-api-health", "//ledger/metrics", + "//ledger/metrics:metrics-test-lib", "//libs-scala/concurrent", "//libs-scala/grpc-utils", "@maven//:ch_qos_logback_logback_classic", "@maven//:ch_qos_logback_logback_core", + "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:io_netty_netty_handler", + "@maven//:io_opentelemetry_opentelemetry_api", + "@maven//:io_opentelemetry_opentelemetry_context", + "@maven//:io_opentelemetry_opentelemetry_sdk_testing", + "@maven//:io_opentelemetry_opentelemetry_sdk_trace", "@maven//:io_zipkin_brave_brave", "@maven//:org_awaitility_awaitility", + "@maven//:org_mockito_mockito_core", "@maven//:org_reactivestreams_reactive_streams", ], ) diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala new file mode 100644 index 0000000000..0c363861ae --- /dev/null +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionServiceSpec.scala @@ -0,0 +1,94 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.server.api.services.grpc + +import java.time.{Duration, Instant} + +import com.codahale.metrics.MetricRegistry +import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.api.messages.command.submission.SubmitRequest +import com.daml.ledger.api.testing.utils.MockMessages.{ + applicationId, + commandId, + commands, + ledgerId, + party, + submitRequest, + workflowId, +} +import com.daml.ledger.api.v1.commands.{Command, CreateCommand} +import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} +import com.daml.metrics.Metrics +import com.daml.platform.server.api.services.domain.CommandSubmissionService +import com.daml.telemetry.{SpanAttribute, TelemetryContext, TelemetrySpecBase} +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import scala.concurrent.Future + +class GrpcCommandSubmissionServiceSpec + extends AsyncWordSpec + with TelemetrySpecBase + with MockitoSugar + with Matchers + with ArgumentMatchersSugar { + + import GrpcCommandSubmissionServiceSpec._ + + "GrpcCommandSubmissionService" should { + "propagate trace context" in { + val mockCommandSubmissionService = mock[CommandSubmissionService with AutoCloseable] + when(mockCommandSubmissionService.submit(any[SubmitRequest])(any[TelemetryContext])) + .thenReturn(Future.unit) + val grpcCommandSubmissionService = new GrpcCommandSubmissionService( + mockCommandSubmissionService, + ledgerId = LedgerId(ledgerId), + currentLedgerTime = () => Instant.EPOCH, + currentUtcTime = () => Instant.EPOCH, + maxDeduplicationTime = () => Some(Duration.ZERO), + metrics = new Metrics(new MetricRegistry), + ) + + val span = anEmptySpan() + val scope = span.makeCurrent() + try { + grpcCommandSubmissionService + .submit(aSubmitRequest) + .map { _ => + val spanAttributes = spanExporter.finishedSpanAttributes + spanAttributes should contain(SpanAttribute.ApplicationId -> applicationId) + spanAttributes should contain(SpanAttribute.CommandId -> commandId) + spanAttributes should contain(SpanAttribute.Submitter -> party) + spanAttributes should contain(SpanAttribute.WorkflowId -> workflowId) + } + } finally { + scope.close() + span.end() + } + } + } +} + +object GrpcCommandSubmissionServiceSpec { + private val aCommand = { + Command( + Command.Command.Create( + CreateCommand( + Some(Identifier("package", moduleName = "module", entityName = "entity")), + Some( + Record( + Some(Identifier("package", moduleName = "module", entityName = "entity")), + Seq(RecordField("something", Some(Value(Value.Sum.Bool(true))))), + ) + ), + ) + ) + ) + } + + private val aSubmitRequest = submitRequest.copy( + commands = Some(commands.copy(commands = Seq(aCommand))) + ) +} diff --git a/ledger/metrics/BUILD.bazel b/ledger/metrics/BUILD.bazel index 462775dafa..cf0c9a0e22 100644 --- a/ledger/metrics/BUILD.bazel +++ b/ledger/metrics/BUILD.bazel @@ -36,6 +36,10 @@ da_scala_library( da_scala_library( name = "metrics-test-lib", srcs = glob(["src/test/lib/scala/**/*.scala"]), + scala_deps = [ + "@maven//:org_scalactic_scalactic", + "@maven//:org_scalatest_scalatest", + ], tags = ["maven_coordinates=com.daml:metrics-test-lib:__VERSION__"], versioned_scala_deps = { "2.12": ["@maven//:org_scala_lang_modules_scala_collection_compat"], diff --git a/ledger/metrics/src/test/lib/scala/com/daml/telemetry/TelemetrySpecBase.scala b/ledger/metrics/src/test/lib/scala/com/daml/telemetry/TelemetrySpecBase.scala index d2fdcaef66..416b230344 100644 --- a/ledger/metrics/src/test/lib/scala/com/daml/telemetry/TelemetrySpecBase.scala +++ b/ledger/metrics/src/test/lib/scala/com/daml/telemetry/TelemetrySpecBase.scala @@ -3,16 +3,17 @@ package com.daml.telemetry +import io.opentelemetry.api.trace.{Span, Tracer} import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor +import org.scalatest.{BeforeAndAfterEach, Suite} import scala.jdk.CollectionConverters._ -trait TelemetrySpecBase { +trait TelemetrySpecBase extends BeforeAndAfterEach { self: Suite => - protected val anInstrumentationName: String = this.getClass.getCanonicalName protected val aSpanName = "aSpan" protected val anApplicationIdSpanAttribute: (SpanAttribute, String) = SpanAttribute.ApplicationId -> "anApplicationId" @@ -20,12 +21,23 @@ trait TelemetrySpecBase { SpanAttribute.CommandId -> "aCommandId" protected val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create - protected val tracerProvider: SdkTracerProvider = SdkTracerProvider - .builder() - .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) - .build() + protected val tracer: Tracer = { + val tracerProvider = SdkTracerProvider + .builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build() + val anInstrumentationName = this.getClass.getCanonicalName + tracerProvider.get(anInstrumentationName) + } - protected object TestTelemetry extends DefaultTelemetry(tracerProvider.get(anInstrumentationName)) + override protected def afterEach(): Unit = { + spanExporter.reset() + super.afterEach() + } + + protected def anEmptySpan(): Span = tracer.spanBuilder(aSpanName).startSpan() + + protected object TestTelemetry extends DefaultTelemetry(tracer) protected implicit class RichInMemorySpanExporter(exporter: InMemorySpanExporter) { def finishedSpanAttributes: Map[SpanAttribute, String] = finishedSpansToAttributes { spanData => diff --git a/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetryContextSpec.scala b/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetryContextSpec.scala index aa5496522e..1e4501436b 100644 --- a/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetryContextSpec.scala +++ b/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetryContextSpec.scala @@ -5,29 +5,20 @@ package com.daml.telemetry import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context +import org.scalatest.Assertion import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.{Assertion, BeforeAndAfterEach} +import org.scalatest.wordspec.AnyWordSpec /** Other cases are covered by [[TelemetrySpec]] */ -class TelemetryContextSpec - extends TelemetrySpecBase - with AnyWordSpecLike - with Matchers - with BeforeAndAfterEach { - - override protected def afterEach(): Unit = spanExporter.reset() +class TelemetryContextSpec extends AnyWordSpec with TelemetrySpecBase with Matchers { "DefaultTelemetryContext.runInOpenTelemetryScope" should { "run a body and create a current context with a span" in { - val tracer = tracerProvider.get(anInstrumentationName) - val span = tracer - .spanBuilder(aSpanName) - .setAttribute( - anApplicationIdSpanAttribute._1.key, - anApplicationIdSpanAttribute._2, - ) - .startSpan() + val span = anEmptySpan() + span.setAttribute( + anApplicationIdSpanAttribute._1.key, + anApplicationIdSpanAttribute._2, + ) runInOpenTelemetryScopeAndAssert(DefaultTelemetryContext(tracer, span)) @@ -36,10 +27,7 @@ class TelemetryContextSpec } "return a raw Open Telemetry context" in { - val tracer = tracerProvider.get(anInstrumentationName) - val span = tracer - .spanBuilder(aSpanName) - .startSpan() + val span = anEmptySpan() val openTelemetryContext = DefaultTelemetryContext(tracer, span).openTelemetryContext @@ -49,7 +37,6 @@ class TelemetryContextSpec "RootDefaultTelemetryContext.runInOpenTelemetryScope" should { "run a body" in { - val tracer = tracerProvider.get(anInstrumentationName) runInOpenTelemetryScopeAndAssert(RootDefaultTelemetryContext(tracer)) } } diff --git a/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetrySpec.scala b/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetrySpec.scala index ae022dfa56..13ae5a49c1 100644 --- a/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetrySpec.scala +++ b/ledger/metrics/src/test/suite/scala/com/daml/telemetry/TelemetrySpec.scala @@ -6,34 +6,29 @@ package com.daml.telemetry import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.scalatest.Assertion import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AsyncWordSpecLike -import org.scalatest.{Assertion, BeforeAndAfterEach} +import org.scalatest.wordspec.AsyncWordSpec import scala.concurrent.Future import scala.util.Try -class TelemetrySpec - extends TelemetrySpecBase - with AsyncWordSpecLike - with BeforeAndAfterEach - with Matchers { +class TelemetrySpec extends AsyncWordSpec with TelemetrySpecBase with Matchers { import TelemetrySpec._ - override protected def afterEach(): Unit = spanExporter.reset() - "contextFromGrpcThreadLocalContext" should { "return a context" in { - val tracer = tracerProvider.get(anInstrumentationName) - tracer - .spanBuilder(aSpanName) - .setAttribute("existingKey", "existingValue") - .startSpan() - .makeCurrent() - val context = DefaultTelemetry.contextFromGrpcThreadLocalContext() - context.setAttribute(anApplicationIdSpanAttribute._1, anApplicationIdSpanAttribute._2) - Span.current.end() + val span = anEmptySpan() + span.setAttribute("existingKey", "existingValue") + val scope = span.makeCurrent() + try { + val context = DefaultTelemetry.contextFromGrpcThreadLocalContext() + context.setAttribute(anApplicationIdSpanAttribute._1, anApplicationIdSpanAttribute._2) + } finally { + scope.close() + span.end() + } val attributes = spanExporter.finishedSpanAttributes attributes should contain(SpanAttribute("existingKey") -> "existingValue") @@ -47,8 +42,7 @@ class TelemetrySpec "contextFromMetadata" should { "return an extracted context" in { - val tracer = tracerProvider.get(anInstrumentationName) - val span = tracer.spanBuilder(aSpanName).startSpan() + val span = anEmptySpan() val metadata = DefaultTelemetryContext(tracer, span).encodeMetadata() val context = DefaultTelemetry.contextFromMetadata(Some(metadata)) @@ -69,8 +63,7 @@ class TelemetrySpec "contextFromOpenTelemetryContext" should { "return a raw Open Telemetry Context" in { - val tracer = tracerProvider.get(anInstrumentationName) - val span = tracer.spanBuilder(aSpanName).startSpan() + val span = anEmptySpan() val openTelemetryContext = Context.current.`with`(span) val context = DefaultTelemetry.contextFromOpenTelemetryContext(openTelemetryContext)