Replace TestNamedLoggerFactory with in-memory appender (#4071)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Stefano Baghino 2020-01-16 17:07:31 +01:00 committed by GitHub
parent 0a26591849
commit 522555fb69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 116 deletions

View File

@ -267,7 +267,6 @@ da_scala_library(
"//libs-scala/resources",
"//libs-scala/timer-utils",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_auth0_java_jwt",
"@maven//:com_google_guava_guava",
"@maven//:com_typesafe_akka_akka_actor_2_12",

View File

@ -1,80 +0,0 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.logging
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase
import ch.qos.logback.{classic => logback}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.sandbox.logging.TestNamedLoggerFactory._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable
class TestNamedLoggerFactory(override val name: String) extends NamedLoggerFactory {
private val loggers = mutable.Map[String, (Logger, ListAppender[ILoggingEvent])]()
private val cleanupOperations = mutable.Buffer[() => Unit]()
override protected def getLogger(fullName: String): Logger =
loggers
.getOrElseUpdate(
fullName, {
val logger = LoggerFactory.getLogger(fullName).asInstanceOf[logback.Logger]
val additive = logger.isAdditive
logger.setAdditive(false)
val appender = new ListAppender[ILoggingEvent]
val normalAppenders = logger.iteratorForAppenders().asScala.toVector
normalAppenders.foreach(logger.detachAppender)
logger.addAppender(appender)
appender.start()
cleanupOperations += (() => {
logger.detachAppender(appender)
normalAppenders.foreach(logger.addAppender)
logger.setAdditive(additive)
})
(logger, appender)
}
)
._1
def logs[C](cls: Class[C]): Seq[LogEvent] =
logs(nameOfClass(cls))
def logs(fullName: String): Seq[LogEvent] =
loggers
.get(fullName)
.map(_._2.events)
.getOrElse(Seq.empty)
.map(event => event.getLevel -> event.getMessage)
def cleanup(): Unit = {
for (operation <- cleanupOperations)
operation()
cleanupOperations.clear()
loggers.clear()
}
}
object TestNamedLoggerFactory {
type LogEvent = (Level, String)
def apply(name: String): TestNamedLoggerFactory = new TestNamedLoggerFactory(name)
def apply(cls: Class[_]): TestNamedLoggerFactory = apply(cls.getSimpleName)
private class ListAppender[E] extends AppenderBase[E] {
private val eventsBuffer = mutable.Buffer[E]()
override def append(eventObject: E): Unit = {
eventsBuffer += eventObject
}
def events: Seq[E] =
eventsBuffer
}
}

View File

@ -17,8 +17,12 @@
<appender-ref ref="FILE"/>
</appender>
<appender name="RECOVERING" class="com.digitalasset.platform.indexer.RecoveringIndexerSpec$Appender" />
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ASYNC"/>
<appender-ref ref="RECOVERING" />
</root>
</configuration>

View File

@ -8,10 +8,12 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorSystem
import akka.pattern.after
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.UnsynchronizedAppenderBase
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.indexer.RecoveringIndexerSpec._
import com.digitalasset.platform.indexer.TestIndexer._
import com.digitalasset.platform.sandbox.logging.TestNamedLoggerFactory
import com.digitalasset.resources.{Resource, ResourceOwner}
import org.scalatest.{AsyncWordSpec, BeforeAndAfterEach, Matchers}
@ -119,16 +121,16 @@ object TestIndexer {
case object SuccessfullyCompletes extends SubscribeStatus
}
class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAfterEach {
final class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAfterEach {
private[this] implicit val executionContext: ExecutionContext = DirectExecutionContext
private[this] val actorSystem = ActorSystem("RecoveringIndexerIT")
private[this] val scheduler = actorSystem.scheduler
private[this] val loggerFactory = TestNamedLoggerFactory(getClass)
private[this] val loggerFactory = NamedLoggerFactory(getClass)
override def afterEach(): Unit = {
loggerFactory.cleanup()
super.afterEach()
override def beforeEach(): Unit = {
super.beforeEach()
clearLog()
}
"RecoveringIndexer" should {
@ -151,12 +153,11 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
EventStreamComplete("A"),
EventStopCalled("A"),
)
logs should be(
Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
))
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
)
testIndexer.openSubscriptions should be(mutable.Set.empty)
}
}
@ -187,12 +188,11 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
EventSubscribeSuccess("A"),
EventStopCalled("A"),
)
logs should be(
Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
))
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
)
testIndexer.openSubscriptions should be(mutable.Set.empty)
}
}
@ -208,22 +208,20 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
val resource = recoveringIndexer.start(() => testIndexer.subscribe())
resource.asFuture
.map { complete =>
logs should be(
Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
))
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
)
complete
}
.flatten
.transformWith(finallyRelease(resource))
.map { _ =>
logs should be(
Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
))
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
)
testIndexer.openSubscriptions should be(mutable.Set.empty)
}
}
@ -257,7 +255,7 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
EventStreamComplete("C"),
EventStopCalled("C"),
)
logs should be(Seq(
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.ERROR -> "Error while running indexer, restart scheduled after 10 milliseconds",
Level.INFO -> "Starting Indexer Server",
@ -266,7 +264,7 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
))
)
testIndexer.openSubscriptions should be(mutable.Set.empty)
}
}
@ -297,27 +295,40 @@ class RecoveringIndexerSpec extends AsyncWordSpec with Matchers with BeforeAndAf
EventStreamComplete("B"),
EventStopCalled("B"),
)
logs should be(Seq(
readLog() should contain theSameElementsInOrderAs Seq(
Level.INFO -> "Starting Indexer Server",
Level.ERROR -> "Error while running indexer, restart scheduled after 500 milliseconds",
Level.INFO -> "Starting Indexer Server",
Level.INFO -> "Started Indexer Server",
Level.INFO -> "Successfully finished processing state updates",
))
)
testIndexer.openSubscriptions should be(mutable.Set.empty)
}
}
}
private def logs: Seq[TestNamedLoggerFactory.LogEvent] =
loggerFactory.logs(classOf[RecoveringIndexer])
}
object RecoveringIndexerSpec {
def finallyRelease[T](resource: Resource[_])(
implicit executionContext: ExecutionContext
): Try[T] => Future[T] = {
case Success(value) => resource.release().map(_ => value)
case Failure(exception) => resource.release().flatMap(_ => Future.failed(exception))
}
private[this] val log = Vector.newBuilder[(Level, String)]
private def readLog(): Seq[(Level, String)] = log.synchronized { log.result() }
private def clearLog(): Unit = log.synchronized { log.clear() }
final class Appender extends UnsynchronizedAppenderBase[ILoggingEvent] {
override def append(e: ILoggingEvent): Unit = {
log.synchronized {
val _ = log += e.getLevel -> e.getFormattedMessage
}
}
}
}