Fix concurrent gauge creation (#7116)

* Added failing test.
CHANGELOG_BEGIN
CHANGELOG_END

* Fixed test.

* Added missing header.

* Reformatted.

* Fixed concurrency issue for CacheMetrics as well.

* Reworded test case description a bit.

* Code tidying.

* Use da_scala_test_suite instead.
This commit is contained in:
Miklos 2020-08-13 18:59:56 +02:00 committed by GitHub
parent bed52a0db5
commit 2f4aed4506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 8 deletions

View File

@ -4,6 +4,7 @@
load( load(
"//bazel_tools:scala.bzl", "//bazel_tools:scala.bzl",
"da_scala_library", "da_scala_library",
"da_scala_test_suite",
) )
da_scala_library( da_scala_library(
@ -23,3 +24,14 @@ da_scala_library(
"@maven//:io_dropwizard_metrics_metrics_jvm", "@maven//:io_dropwizard_metrics_metrics_jvm",
], ],
) )
da_scala_test_suite(
name = "metrics-tests",
size = "small",
srcs = glob(["src/test/scala/**/*.scala"]),
deps = [
":metrics",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_scalatest_scalatest_2_12",
],
)

View File

@ -23,8 +23,9 @@ final class CacheMetrics(
def registerWeightGauge(weightGauge: Gauge[Long]): Unit = def registerWeightGauge(weightGauge: Gauge[Long]): Unit =
register(prefix :+ "weight", () => weightGauge) register(prefix :+ "weight", () => weightGauge)
private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = { private def register[T](name: MetricName, gaugeSupplier: MetricSupplier[Gauge[_]]): Gauge[T] =
registry.remove(name) registry.synchronized {
registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]] registry.remove(name)
} registry.gauge(name, gaugeSupplier).asInstanceOf[Gauge[T]]
}
} }

View File

@ -8,10 +8,13 @@ import com.codahale.metrics._
final class Metrics(val registry: MetricRegistry) { final class Metrics(val registry: MetricRegistry) {
private def gauge[T](name: MetricName, metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] = { private[metrics] def gauge[T](
registry.remove(name) name: MetricName,
registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]] metricSupplier: MetricSupplier[Gauge[_]]): Gauge[T] =
} registry.synchronized {
registry.remove(name)
registry.gauge(name, metricSupplier).asInstanceOf[Gauge[T]]
}
object test { object test {
private val Prefix: MetricName = MetricName("test") private val Prefix: MetricName = MetricName("test")

View File

@ -0,0 +1,27 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.metrics
import com.codahale.metrics.MetricRegistry
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.{ExecutionContext, Future}
class CacheMetricsSpec extends AsyncWordSpec with Matchers {
"gauge registrations" should {
"succeed on multiple threads in parallel for the same metric registry" in {
val cacheMetrics = new CacheMetrics(new MetricRegistry, MetricName.DAML)
implicit val executionContext: ExecutionContext = ExecutionContext.global
val instances =
(1 to 1000).map(_ =>
Future {
cacheMetrics.registerSizeGauge(() => 1L)
cacheMetrics.registerWeightGauge(() => 2L)
})
Future.sequence(instances).map { _ =>
succeed
}
}
}
}

View File

@ -0,0 +1,24 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.metrics
import com.codahale.metrics.MetricRegistry
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.{ExecutionContext, Future}
class MetricsSpec extends AsyncWordSpec with Matchers {
"gauge registration" should {
"succeed on multiple threads in parallel for the same metric name" in {
val metrics = new Metrics(new MetricRegistry)
implicit val executionContext: ExecutionContext = ExecutionContext.global
val metricName = MetricName.DAML :+ "a" :+ "test"
val instances =
(1 to 1000).map(_ => Future(metrics.gauge[Double](metricName, () => () => 1.0)))
Future.sequence(instances).map { _ =>
succeed
}
}
}
}