resources: Pull out the shutdown hook logic into ProgramResource. (#4356)

It's not trivial, so it's almost certainly less error-prone to just
define it once.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2020-02-04 10:32:35 +01:00 committed by GitHub
parent 93a613ff42
commit e203fdea90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 196 additions and 175 deletions

View File

@ -29,6 +29,7 @@ da_scala_library(
"//ledger/ledger-api-common",
"//ledger/participant-state",
"//ledger/sandbox",
"//libs-scala/resources",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_spray_spray_json_2_12",

View File

@ -4,21 +4,13 @@
package com.digitalasset.daml.lf.engine.script
import java.io.FileInputStream
import akka.actor.ActorSystem
import akka.stream._
import com.typesafe.scalalogging.StrictLogging
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.io.Source
import scalaz.syntax.traverse._
import spray.json._
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.archive.{Dar, DarReader}
import com.digitalasset.daml.lf.archive.Decode
import com.digitalasset.daml.lf.archive.{Dar, DarReader, Decode}
import com.digitalasset.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName}
import com.digitalasset.daml.lf.language.Ast
import com.digitalasset.daml.lf.language.Ast.Package
@ -35,8 +27,13 @@ import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.google.protobuf.ByteString
import com.typesafe.scalalogging.StrictLogging
import scalaz.syntax.traverse._
import spray.json._
import scala.util.control.NonFatal
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.io.Source
import scala.util.{Failure, Success}
object TestMain extends StrictLogging {
@ -45,7 +42,7 @@ object TestMain extends StrictLogging {
TestConfig.parse(args) match {
case None => sys.exit(1)
case Some(config) => {
case Some(config) =>
val encodedDar: Dar[(PackageId, DamlLf.ArchivePayload)] =
DarReader().readArchiveFromFile(config.darPath).get
val dar: Dar[(PackageId, Package)] = encodedDar.map {
@ -79,7 +76,7 @@ object TestMain extends StrictLogging {
val runner = new Runner(dar, applicationId, commandUpdater)
val (participantParams, participantCleanup) = config.participantConfig match {
case Some(file) => {
case Some(file) =>
val source = Source.fromFile(file)
val fileContent = try {
source.mkString
@ -88,39 +85,29 @@ object TestMain extends StrictLogging {
}
val jsVal = fileContent.parseJson
import ParticipantsJsonProtocol._
(jsVal.convertTo[Participants[ApiParameters]], () => ())
}
(jsVal.convertTo[Participants[ApiParameters]], () => Future.successful(()))
case None =>
val (apiParameters, cleanup) = if (config.ledgerHost.isEmpty) {
val sandboxConfig = SandboxConfig.default.copy(
port = 0, // Automatically choose a free port.
timeProviderType = config.timeProviderType
timeProviderType = config.timeProviderType,
)
val sandbox = new SandboxServer(sandboxConfig)
val sandboxClosed = new AtomicBoolean(false)
def closeSandbox(): Unit = {
if (sandboxClosed.compareAndSet(false, true)) sandbox.close()
}
try Runtime.getRuntime.addShutdownHook(new Thread(() => closeSandbox()))
catch {
case NonFatal(t) =>
logger.error(
"Shutting down Sandbox application because of initialization error",
t)
closeSandbox()
}
(ApiParameters("localhost", sandbox.port), () => closeSandbox())
val sandboxResource = SandboxServer.owner(sandboxConfig).acquire()
val sandboxPort = Await.result(sandboxResource.asFuture.flatMap(_.port), Duration.Inf)
(ApiParameters("localhost", sandboxPort), () => sandboxResource.release())
} else {
(ApiParameters(config.ledgerHost.get, config.ledgerPort.get), () => ())
(
ApiParameters(config.ledgerHost.get, config.ledgerPort.get),
() => Future.successful(()),
)
}
(
Participants(
default_participant = Some(apiParameters),
participants = Map.empty,
party_participants = Map.empty),
cleanup)
cleanup,
)
}
val flow: Future[Boolean] = for {
@ -162,14 +149,13 @@ object TestMain extends StrictLogging {
} yield success.get()
flow.onComplete { _ =>
participantCleanup()
Await.result(participantCleanup(), Duration.Inf)
system.terminate()
}
if (!Await.result(flow, Duration.Inf)) {
sys.exit(1)
}
}
}
}
}

View File

@ -18,11 +18,10 @@ import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer}
import com.digitalasset.platform.indexer.{IndexerConfig, StandaloneIndexerServer}
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import org.slf4j.LoggerFactory
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.ExecutionContext
import scala.util.Try
object ReferenceServer extends App {
@ -41,26 +40,23 @@ object ReferenceServer extends App {
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.dispatcher
val resource = newLoggingContext { implicit logCtx =>
val owner = newLoggingContext { implicit logCtx =>
for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- AkkaResourceOwner.forActorSystem(() => system).acquire()
_ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire()
_ <- AkkaResourceOwner.forActorSystem(() => system)
_ <- AkkaResourceOwner.forMaterializer(() => materializer)
ledger <- ResourceOwner
.forCloseable(() => new InMemoryKVParticipantState(config.participantId))
.acquire()
_ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file =>
_ <- ResourceOwner.sequenceIgnoringValues(config.archiveFiles.map { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- ResourceOwner
.forTry(() =>
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file))
.acquire()
_ <- ResourceOwner
.forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None))
.acquire()
} yield ()
})
_ <- startIndexerServer(config, readService = ledger)
@ -70,10 +66,8 @@ object ReferenceServer extends App {
writeService = ledger,
authService = AuthServiceWildcard,
)
_ <- Resource.sequenceIgnoringValues(
for {
(extraParticipantId, port, jdbcUrl) <- config.extraParticipants
} yield {
_ <- ResourceOwner.sequenceIgnoringValues(
for ((extraParticipantId, port, jdbcUrl) <- config.extraParticipants) yield {
val participantConfig = config.copy(
port = port,
participantId = extraParticipantId,
@ -93,27 +87,23 @@ object ReferenceServer extends App {
} yield ()
}
resource.asFuture.failed.foreach { exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}
Runtime.getRuntime.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds)))
new ProgramResource(owner).run()
private def startIndexerServer(config: Config, readService: ReadService)(
implicit logCtx: LoggingContext): Resource[Unit] =
implicit logCtx: LoggingContext
): ResourceOwner[Unit] =
new StandaloneIndexerServer(
readService,
IndexerConfig(config.participantId, config.jdbcUrl, config.startupMode),
SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"),
).acquire()
)
private def startApiServer(
config: Config,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
)(implicit logCtx: LoggingContext): Resource[Unit] =
)(implicit logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneApiServer(
ApiServerConfig(
config.participantId,
@ -130,5 +120,5 @@ object ReferenceServer extends App {
writeService,
authService,
SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"),
).acquire()
)
}

View File

@ -4,12 +4,11 @@
package com.daml.ledger.on.memory
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.resources.ProgramResource
import scala.concurrent.ExecutionContext.Implicits.global
object Main extends App {
Runner(
"In-Memory Ledger",
(ledgerId, participantId) => InMemoryLedgerReaderWriter.owner(ledgerId, participantId),
).run(args)
new ProgramResource(
Runner("In-Memory Ledger", InMemoryLedgerReaderWriter.owner(_, _)).owner(args)).run()
}

View File

@ -56,7 +56,6 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"//libs-scala/direct-execution-context",
"//libs-scala/resources",
],
)

View File

@ -9,13 +9,13 @@ import java.nio.file.Path
import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser
import scala.concurrent.ExecutionContext.Implicits.global
object Main extends App {
Runner("File System Ledger", FileSystemLedgerFactory).run(args)
new ProgramResource(Runner("File System Ledger", FileSystemLedgerFactory).owner(args)).run()
case class ExtraConfig(root: Option[Path])

View File

@ -7,29 +7,33 @@ import java.nio.file.{Files, Path}
import com.daml.ledger.on.filesystem.posix.DeleteFiles.deleteFiles
import com.daml.ledger.participant.state.kvutils.app.Runner
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.resources.Resource
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.{ProgramResource, Resource, ResourceOwner}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ExecutionContext, Future}
object MainWithEphemeralDirectory extends App {
implicit val executionContext: ExecutionContext = DirectExecutionContext
new ProgramResource(Runner("Ephemeral File System Ledger", owner _).owner(args)).run()
val root = Files.createTempDirectory("ledger-on-posix-filesystem-ephemeral-")
def owner(
ledgerId: LedgerId,
participantId: ParticipantId,
): ResourceOwner[FileSystemLedgerReaderWriter] =
for {
root <- temporaryDirectory("ledger-on-posix-filesystem-ephemeral-")
participant <- FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
)
} yield participant
for {
root <- Resource[Path](
Future.successful(root),
directory => Future.successful(deleteFiles(directory)),
)
_ <- Runner(
"Ephemeral File System Ledger",
(ledgerId, participantId) =>
FileSystemLedgerReaderWriter.owner(
ledgerId = ledgerId,
participantId = participantId,
root = root,
),
).run(args)
} yield ()
def temporaryDirectory(prefix: String): ResourceOwner[Path] = new ResourceOwner[Path] {
def acquire()(implicit executionContext: ExecutionContext): Resource[Path] =
Resource[Path](
Future(Files.createTempDirectory(prefix))(executionContext),
directory => Future(deleteFiles(directory))(executionContext),
)(executionContext)
}
}

View File

@ -7,13 +7,13 @@ import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser
import scala.concurrent.ExecutionContext.Implicits.global
object Main extends App {
Runner("SQL Ledger", SqlLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", SqlLedgerFactory).owner(args)).run()
case class ExtraConfig(jdbcUrl: Option[String])

View File

@ -9,15 +9,17 @@ import akka.stream.Materializer
import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser
import scala.concurrent.ExecutionContext.Implicits.global
object MainWithEphemeralDirectory extends App {
val DirectoryPattern = "%DIR"
val directory = Files.createTempDirectory("ledger-on-sql-ephemeral-")
Runner("SQL Ledger", TestLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", TestLedgerFactory).owner(args)).run()
object TestLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, ExtraConfig] {
override val defaultExtraConfig: ExtraConfig = SqlLedgerFactory.defaultExtraConfig

View File

@ -7,15 +7,17 @@ import akka.stream.Materializer
import com.daml.ledger.on.sql.Main.{ExtraConfig, SqlLedgerFactory}
import com.daml.ledger.participant.state.kvutils.app.{Config, LedgerFactory, Runner}
import com.daml.ledger.participant.state.v1.{LedgerId, ParticipantId}
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.{ProgramResource, ResourceOwner}
import com.digitalasset.testing.postgresql.PostgresAround
import scopt.OptionParser
import scala.concurrent.ExecutionContext.Implicits.global
object MainWithEphemeralPostgresql extends App with PostgresAround {
startEphemeralPostgres()
sys.addShutdownHook(stopAndCleanUpPostgres())
Runner("SQL Ledger", PostgresqlLedgerFactory).run(args)
new ProgramResource(Runner("SQL Ledger", PostgresqlLedgerFactory).owner(args)).run()
object PostgresqlLedgerFactory extends LedgerFactory[SqlLedgerReaderWriter, Unit] {
override val defaultExtraConfig: Unit = ()

View File

@ -42,6 +42,5 @@ da_scala_library(
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_slf4j_slf4j_api",
],
)

View File

@ -9,6 +9,7 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.app.Runner._
import com.daml.ledger.participant.state.v1.{
LedgerId,
ParticipantId,
@ -29,51 +30,46 @@ import com.digitalasset.platform.indexer.{
IndexerStartupMode,
StandaloneIndexerServer
}
import com.digitalasset.resources.ProgramResource.SuppressedException
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.ExecutionContext
import scala.util.Try
class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T, Extra]) {
def run(args: Seq[String]): Resource[Unit] = {
val config = Config
.parse(name, factory.extraConfigParser, factory.defaultExtraConfig, args)
.getOrElse(sys.exit(1))
val logger = LoggerFactory.getLogger(getClass)
def owner(args: Seq[String]): ResourceOwner[Unit] = {
implicit val system: ActorSystem = ActorSystem(
"[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-"))
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.dispatcher
val ledgerId =
config.ledgerId.getOrElse(Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
val resource = newLoggingContext { implicit logCtx =>
newLoggingContext { implicit logCtx =>
for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- AkkaResourceOwner.forActorSystem(() => system).acquire()
_ <- AkkaResourceOwner.forMaterializer(() => materializer).acquire()
_ <- AkkaResourceOwner.forActorSystem(() => system)
_ <- AkkaResourceOwner.forMaterializer(() => materializer)
config <- Config
.parse(name, factory.extraConfigParser, factory.defaultExtraConfig, args)
.fold(ResourceOwner.failed[Config[Extra]](new ConfigParseException))(
ResourceOwner.successful)
ledgerId = config.ledgerId.getOrElse(
Ref.LedgerString.assertFromString(UUID.randomUUID.toString))
readerWriter <- factory
.owner(ledgerId, config.participantId, config.extra)
.acquire()
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
_ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file =>
_ <- ResourceOwner.sequenceIgnoringValues(config.archiveFiles.map { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- ResourceOwner
.forTry(() =>
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file.toFile))
.acquire()
_ <- ResourceOwner
.forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None))
.acquire()
} yield ()
})
_ <- startIndexerServer(config, readService = ledger)
@ -85,22 +81,12 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
)
} yield ()
}
resource.asFuture.failed.foreach { exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}
Runtime.getRuntime
.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds)))
resource
}
private def startIndexerServer(
config: Config[Extra],
readService: ReadService,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Resource[Unit] =
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneIndexerServer(
readService,
IndexerConfig(
@ -109,14 +95,14 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
startupMode = IndexerStartupMode.MigrateAndStart,
),
SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"),
).acquire()
)
private def startApiServer(
config: Config[Extra],
readService: ReadService,
writeService: WriteService,
authService: AuthService,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): Resource[Unit] =
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneApiServer(
ApiServerConfig(
config.participantId,
@ -133,7 +119,7 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
writeService,
authService,
SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"),
).acquire()
)
}
object Runner {
@ -148,4 +134,6 @@ object Runner {
factory: LedgerFactory[T, Extra],
): Runner[T, Extra] =
new Runner(name, factory)
class ConfigParseException extends SuppressedException
}

View File

@ -3,44 +3,22 @@
package com.digitalasset.platform.sandbox
import java.util.concurrent.atomic.AtomicBoolean
import ch.qos.logback.classic.Level
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.platform.sandbox.cli.Cli
import com.digitalasset.resources.ProgramResource
import org.slf4j.{Logger, LoggerFactory}
import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext
object SandboxMain extends App {
private implicit val executionContext: ExecutionContext = DirectExecutionContext
private val logger = LoggerFactory.getLogger(this.getClass)
private val logger: Logger = LoggerFactory.getLogger(this.getClass)
Cli.parse(args).fold(sys.exit(1)) { config =>
setGlobalLogLevel(config.logLevel)
val server = new SandboxServer(config)
val closed = new AtomicBoolean(false)
def closeServer(): Unit = {
if (closed.compareAndSet(false, true)) server.close()
}
server.failure.foreach { exception =>
logger.error(
s"Shutting down Sandbox application due to an initialization error:\n${exception.getMessage}")
closeServer()
sys.exit(1)
}
try {
Runtime.getRuntime.addShutdownHook(new Thread(() => closeServer()))
} catch {
case NonFatal(exception) =>
logger.error("Shutting down Sandbox application due to an initialization error.", exception)
closeServer()
sys.exit(1)
}
new ProgramResource(SandboxServer.owner(config)).run()
}
// Copied from language-support/scala/codegen/src/main/scala/com/digitalasset/codegen/Main.scala

View File

@ -51,6 +51,7 @@ import com.digitalasset.resources.{Resource, ResourceOwner}
import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
object SandboxServer {
private val ActorSystemName = "sandbox"
@ -62,6 +63,16 @@ object SandboxServer {
// repeated validation of the sames packages after each reset
private val engine = Engine()
def owner(config: SandboxConfig): ResourceOwner[SandboxState] = new ResourceOwner[SandboxState] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[SandboxState] = {
for {
server <- ResourceOwner.forTry(() => Try(new SandboxServer(config))).acquire()
state <- server.sandboxState
_ <- state.apiServer
} yield state
}
}
// if requested, initialize the ledger state with the given scenario
private def createInitialState(config: SandboxConfig, packageStore: InMemoryPackageStore)
: (InMemoryActiveLedgerState, ImmArray[LedgerEntryOrBump], Option[Instant]) = {
@ -91,21 +102,20 @@ object SandboxServer {
}
}
private final case class SandboxState(
final case class SandboxState(
// nested resource so we can release it independently when restarting
apiServer: Resource[ApiServer],
packageStore: InMemoryPackageStore,
materializer: Materializer,
) {
val executionContext: ExecutionContext = materializer.executionContext
private implicit val executionContext: ExecutionContext = materializer.executionContext
def port: Future[Int] = {
apiServer.asFuture.map(_.port)(executionContext)
}
def port: Future[Int] =
apiServer.map(_.port).asFuture
}
}
final class SandboxServer(config: => SandboxConfig) extends AutoCloseable {
final class SandboxServer(config: SandboxConfig) extends AutoCloseable {
// Name of this participant
// TODO: Pass this info in command-line (See issue #2025)
@ -127,21 +137,11 @@ final class SandboxServer(config: => SandboxConfig) extends AutoCloseable {
} yield state
}
def failure: Option[Throwable] =
Await
.result(
sandboxState.asFuture
.flatMap(_.apiServer.asFuture)(DirectExecutionContext)
.transformWith(Future.successful)(DirectExecutionContext),
AsyncTolerance)
.failed
.toOption
def port: Int =
Await.result(portF(DirectExecutionContext), AsyncTolerance)
def portF(implicit executionContext: ExecutionContext): Future[Int] =
sandboxState.asFuture.flatMap(_.apiServer.asFuture).map(_.port)
sandboxState.flatMap(_.apiServer).map(_.port).asFuture
/** the reset service is special, since it triggers a server shutdown */
private def resetService(
@ -158,7 +158,7 @@ final class SandboxServer(config: => SandboxConfig) extends AutoCloseable {
def resetAndRestartServer()(implicit executionContext: ExecutionContext): Future[Unit] = {
val apiServicesClosed =
sandboxState.asFuture.flatMap(_.apiServer.asFuture).flatMap(_.servicesClosed())
sandboxState.flatMap(_.apiServer).asFuture.flatMap(_.servicesClosed())
// Need to run this async otherwise the callback kills the server under the in-flight reset service request!
// TODO: eliminate the state mutation somehow

View File

@ -11,6 +11,7 @@ da_scala_library(
"//visibility:public",
],
deps = [
"//libs-scala/contextualized-logging",
"@maven//:org_scala_lang_modules_scala_java8_compat_2_12",
],
)

View File

@ -0,0 +1,48 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.resources
import com.digitalasset.logging.ContextualizedLogger
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.resources.ProgramResource._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
class ProgramResource[T](owner: ResourceOwner[T]) {
private val logger = ContextualizedLogger.get(getClass)
def run()(implicit executionContext: ExecutionContext): Resource[T] = {
newLoggingContext { implicit logCtx =>
val resource = owner.acquire()
resource.asFuture.failed.foreach {
case _: SuppressedException =>
System.exit(1)
case exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}
try {
sys.runtime.addShutdownHook(new Thread(() =>
Await.result(resource.release(), AsyncTimeout)))
} catch {
case NonFatal(exception) =>
logger.error("Shutting down because of an initialization error.", exception)
Await.result(resource.release(), AsyncTimeout)
System.exit(1)
}
resource
}
}
}
object ProgramResource {
private val AsyncTimeout = 10.seconds
abstract class SuppressedException extends RuntimeException
}

View File

@ -6,8 +6,10 @@ package com.digitalasset.resources
import java.util.Timer
import java.util.concurrent.{CompletionStage, ExecutorService}
import scala.collection.generic.CanBuildFrom
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Success, Try}
@FunctionalInterface
@ -75,4 +77,26 @@ object ResourceOwner {
def forTimer(acquire: () => Timer): ResourceOwner[Timer] =
new TimerResourceOwner(acquire)
def sequence[T, C[X] <: TraversableOnce[X]](seq: C[ResourceOwner[T]])(
implicit bf: CanBuildFrom[C[ResourceOwner[T]], T, C[T]],
executionContext: ExecutionContext,
): ResourceOwner[C[T]] =
seq
.foldLeft(ResourceOwner.successful(bf()))((builderResource, elementResource) =>
for {
builder <- builderResource
element <- elementResource
} yield builder += element)
.map(_.result())
def sequenceIgnoringValues[T, C[X] <: TraversableOnce[X]](seq: C[ResourceOwner[T]])(
implicit executionContext: ExecutionContext,
): ResourceOwner[Unit] =
seq
.foldLeft(ResourceOwner.successful(()))((builderResource, elementResource) =>
for {
_ <- builderResource
_ <- elementResource
} yield ())
}