Sandbox-Next: Add the ResetService. (#4802)

* sandbox-next: Make the Runner a real ResourceOwner.

* sandbox: Don't construct the ResetService twice.

* sandbox: Inline and simplify methods in StandaloneApiServer.

* resources: Define a `ResettableResource`, which can be `reset()`.

`reset()` releases the resource, performs an optional reset operation,
and then re-acquires it, binding it to the same variable.

* resources: Pass the resource value into the reset operation.

* sandbox: Fix warnings in `TestCommands`.

* sandbox-next: Add the ResetService.

CHANGELOG_BEGIN
CHANGELOG_END

* sandbox: Make sure the SandboxResetService resets asynchronously.

It was being too clever and negating its own asynchronous behavior.

* sandbox-next: Forbid no seeding.

This double negative is really hard to phrase well.

* sandbox-next: Implement ResetService for a persistent ledger.

* sandbox: Delete the comment heading StandaloneIndexerServer.

It's no longer meaningful.

* sandbox-next: No need to wrap the SandboxResetService in an owner.

* sandbox-next: Bump the ResetService test timeouts.

It looks like it's definitely slower than on Sandbox Classic™. Gonna
look into this as part of future work.

* Revert to previous asynchronous reset behavior

Co-authored-by: Gerolf Seitz <gerolf.seitz@digitalasset.com>
This commit is contained in:
Samir Talwar 2020-03-04 15:40:35 +01:00 committed by GitHub
parent 3239a810eb
commit 85998f8f06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 965 additions and 540 deletions

View File

@ -9,7 +9,7 @@ import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1.{ReadService, SeedService, SubmissionId, WriteService}
import com.daml.ledger.participant.state.v1.{ReadService, SubmissionId, WriteService}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.AuthService
@ -89,7 +89,6 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
readService = ledger,
writeService = ledger,
authService = factory.authService(config),
seedService = Some(SeedService(config.seeding)),
)
} yield ()
@ -109,7 +108,6 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
seedService: Option[SeedService]
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneApiServer(
factory.apiServerConfig(participantConfig, config),
@ -120,6 +118,6 @@ class Runner[T <: KeyValueLedger, Extra](name: String, factory: LedgerFactory[T,
authService,
factory.apiServerMetricRegistry(participantConfig),
factory.timeServiceBackend(config),
seedService,
config.seeding,
).map(_ => ())
}

View File

@ -72,7 +72,10 @@ final class LedgerApiServer(
workerEventLoopGroup,
apiServices,
).acquire()
_ <- new ReorderApiServices(apiServicesResource, servicesClosedPromise).acquire()
// Notify the caller that the services have been closed, so a reset request can complete
// without blocking on the server terminating.
_ <- Resource(Future.successful(()))(_ =>
apiServicesResource.release().map(_ => servicesClosedPromise.success(())))
} yield {
val host = address.getOrElse("localhost")
val actualPort = server.getPort
@ -143,19 +146,6 @@ final class LedgerApiServer(
}
}
// This is necessary because we need to signal to the ResetService that we have shut down the
// APIs so it can consider the reset "done". Once it's finished, the reset request will finish,
// the gRPC connection will be closed and we can safely shut down the gRPC server. If we don't
// do that, the server won't shut down and we'll enter a deadlock.
private final class ReorderApiServices(
apiServices: Resource[ApiServices],
servicesClosedPromise: Promise[Unit],
) extends ResourceOwner[ApiServices] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[ApiServices] =
Resource(apiServices.asFuture)(_ =>
apiServices.release().map(_ => servicesClosedPromise.success(())))
}
final class UnableToBind(port: Port, cause: Throwable)
extends RuntimeException(
s"LedgerApiServer was unable to bind to port $port. " +

View File

@ -11,6 +11,7 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.participant.state.v1.SeedService.Seeding
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SeedService, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.engine.Engine
@ -31,9 +32,11 @@ import com.digitalasset.platform.packages.InMemoryPackageStore
import com.digitalasset.ports.Port
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{Resource, ResourceOwner}
import io.grpc.{BindableService, ServerInterceptor}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.collection.immutable
import scala.concurrent.ExecutionContext
// Main entry point to start an index server that also hosts the ledger API.
// See v2.ReferenceServer on how it is used.
@ -46,22 +49,80 @@ final class StandaloneApiServer(
authService: AuthService,
metrics: MetricRegistry,
timeServiceBackend: Option[TimeServiceBackend] = None,
seedService: Option[SeedService],
seeding: Seeding,
otherServices: immutable.Seq[BindableService] = immutable.Seq.empty,
otherInterceptors: List[ServerInterceptor] = List.empty,
engine: Engine = sharedEngine // allows sharing DAML engine with DAML-on-X participant
)(implicit logCtx: LoggingContext)
extends ResourceOwner[Port] {
extends ResourceOwner[ApiServer] {
private val logger = ContextualizedLogger.get(this.getClass)
// Name of this participant,
val participantId: ParticipantId = config.participantId
override def acquire()(implicit executionContext: ExecutionContext): Resource[Port] =
buildAndStartApiServer().map { server =>
logger.info("Started Index Server")
server.port
override def acquire()(implicit executionContext: ExecutionContext): Resource[ApiServer] = {
val packageStore = loadDamlPackages()
preloadPackages(packageStore)
val owner = for {
actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem(actorSystemName))
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem))
initialConditions <- ResourceOwner.forFuture(() =>
readService.getLedgerInitialConditions().runWith(Sink.head)(materializer))
authorizer = new Authorizer(
() => java.time.Clock.systemUTC.instant(),
initialConditions.ledgerId,
participantId)
indexService <- JdbcIndex.owner(
initialConditions.config.timeModel,
domain.LedgerId(initialConditions.ledgerId),
participantId,
config.jdbcUrl,
metrics,
)(materializer, logCtx)
healthChecks = new HealthChecks(
"index" -> indexService,
"read" -> readService,
"write" -> writeService,
)
apiServer <- new LedgerApiServer(
(mat: Materializer, esf: ExecutionSequencerFactory) => {
ApiServices
.create(
participantId = participantId,
writeService = writeService,
indexService = indexService,
authorizer = authorizer,
engine = engine,
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
defaultLedgerConfiguration = initialConditions.config,
commandConfig = commandConfig,
submissionConfig = SubmissionConfiguration.default,
optTimeServiceBackend = timeServiceBackend,
metrics = metrics,
healthChecks = healthChecks,
seedService = Some(SeedService(seeding)),
)(mat, esf, logCtx)
.map(_.withServices(otherServices))
},
config.port,
config.maxInboundMessageSize,
config.address,
config.tlsConfig.flatMap(_.server),
AuthorizationInterceptor(authService, executionContext) :: otherInterceptors,
metrics
)(actorSystem, materializer, logCtx)
} yield {
writePortFile(apiServer.port)
logger.info(
s"Initialized API server version ${BuildInfo.Version} with ledger-id = ${initialConditions.ledgerId}, port = ${apiServer.port}, dar file = ${config.archiveFiles}")
apiServer
}
owner.acquire()
}
// if requested, initialize the ledger state with the given scenario
private def preloadPackages(packageContainer: InMemoryPackageStore): Unit = {
// [[ScenarioLoader]] needs all the packages to be already compiled --
@ -94,73 +155,10 @@ final class StandaloneApiServer(
.fold({ case (err, file) => sys.error(s"Could not load package $file: $err") }, identity)
}
private def buildAndStartApiServer()(implicit ec: ExecutionContext): Resource[ApiServer] = {
val packageStore = loadDamlPackages()
preloadPackages(packageStore)
for {
actorSystem <- AkkaResourceOwner.forActorSystem(() => ActorSystem(actorSystemName)).acquire()
materializer <- AkkaResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire()
initialConditions <- Resource.fromFuture(
readService.getLedgerInitialConditions().runWith(Sink.head)(materializer))
authorizer = new Authorizer(
() => java.time.Clock.systemUTC.instant(),
initialConditions.ledgerId,
participantId)
indexService <- JdbcIndex
.owner(
initialConditions.config.timeModel,
domain.LedgerId(initialConditions.ledgerId),
participantId,
config.jdbcUrl,
metrics,
)(materializer, logCtx)
.acquire()
healthChecks = new HealthChecks(
"index" -> indexService,
"read" -> readService,
"write" -> writeService,
)
apiServer <- new LedgerApiServer(
(mat: Materializer, esf: ExecutionSequencerFactory) => {
ApiServices
.create(
participantId = participantId,
writeService = writeService,
indexService = indexService,
authorizer = authorizer,
engine = engine,
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
defaultLedgerConfiguration = initialConditions.config,
commandConfig = commandConfig,
submissionConfig = SubmissionConfiguration.default,
optTimeServiceBackend = timeServiceBackend,
metrics = metrics,
healthChecks = healthChecks,
seedService = seedService,
)(mat, esf, logCtx)
},
config.port,
config.maxInboundMessageSize,
config.address,
config.tlsConfig.flatMap(_.server),
List(AuthorizationInterceptor(authService, ec)),
metrics
)(actorSystem, materializer, logCtx).acquire()
_ <- Resource.fromFuture(writePortFile(apiServer.port))
} yield {
logger.info(
s"Initialized index server version ${BuildInfo.Version} with ledger-id = ${initialConditions.ledgerId}, port = ${apiServer.port}, dar file = ${config.archiveFiles}")
apiServer
private def writePortFile(port: Port): Unit =
config.portFile.foreach { path =>
Files.write(path, Seq(port.toString).asJava)
}
}
private def writePortFile(port: Port)(
implicit executionContext: ExecutionContext
): Future[Unit] =
config.portFile
.map(path => Future(Files.write(path, Seq(port.toString).asJava)).map(_ => ()))
.getOrElse(Future.successful(()))
}
object StandaloneApiServer {

View File

@ -6,9 +6,11 @@ package com.digitalasset.platform.indexer
sealed trait IndexerStartupMode
object IndexerStartupMode {
case object ValidateAndStart extends IndexerStartupMode
case object MigrateAndStart extends IndexerStartupMode
case object MigrateOnly extends IndexerStartupMode
}

View File

@ -12,8 +12,6 @@ import com.digitalasset.resources.{Resource, ResourceOwner}
import scala.concurrent.{ExecutionContext, Future}
// Main entry point to start an indexer server.
// See v2.ReferenceServer for the usage
final class StandaloneIndexerServer(
readService: ReadService,
config: IndexerConfig,

View File

@ -22,8 +22,8 @@ import com.digitalasset.ledger.api.auth.interceptor.AuthorizationInterceptor
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard, Authorizer}
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.health.HealthChecks
import com.digitalasset.logging.ContextualizedLogger
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.apiserver.{
ApiServer,
ApiServices,
@ -180,23 +180,9 @@ final class SandboxServer(
def portF(implicit executionContext: ExecutionContext): Future[Port] =
apiServer.map(_.port)
/** the reset service is special, since it triggers a server shutdown */
private def resetService(
ledgerId: LedgerId,
authorizer: Authorizer,
executionContext: ExecutionContext,
)(implicit logCtx: LoggingContext): SandboxResetService =
new SandboxResetService(
ledgerId,
() => executionContext,
() => resetAndRestartServer()(executionContext),
authorizer,
)
def resetAndRestartServer()(implicit executionContext: ExecutionContext): Future[Unit] = {
val apiServicesClosed = apiServer.flatMap(_.servicesClosed())
// Need to run this async otherwise the callback kills the server under the in-flight reset service request!
// TODO: eliminate the state mutation somehow
sandboxState = sandboxState.flatMap(
_.reset(
@ -209,7 +195,8 @@ final class SandboxServer(
Some(port),
)))
// waits for the services to be closed, so we can guarantee that future API calls after finishing the reset will never be handled by the old one
// Wait for the services to be closed, so we can guarantee that future API calls after finishing
// the reset will never be handled by the old one.
apiServicesClosed
}
@ -286,6 +273,12 @@ final class SandboxServer(
.map(_ => ()))))
.getOrElse(ResourceOwner.unit)
.acquire()
// the reset service is special, since it triggers a server shutdown
resetService = new SandboxResetService(
ledgerId,
() => resetAndRestartServer(),
authorizer,
)
apiServer <- new LedgerApiServer(
(mat: Materializer, esf: ExecutionSequencerFactory) =>
ApiServices
@ -304,7 +297,7 @@ final class SandboxServer(
healthChecks = healthChecks,
seedService = config.seeding.map(SeedService(_)),
)(mat, esf, logCtx)
.map(_.withServices(List(resetService(ledgerId, authorizer, executionContext)))),
.map(_.withServices(List(resetService))),
// NOTE: Re-use the same port after reset.
currentPort.getOrElse(config.port),
config.maxInboundMessageSize,
@ -312,7 +305,7 @@ final class SandboxServer(
config.tlsConfig.flatMap(_.server),
List(
AuthorizationInterceptor(authService, executionContext),
resetService(ledgerId, authorizer, executionContext),
resetService,
),
metrics
).acquire()

View File

@ -0,0 +1,10 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.config
import com.digitalasset.resources.ProgramResource.StartupException
class InvalidConfigException(message: String)
extends RuntimeException(message)
with StartupException

View File

@ -15,13 +15,13 @@ import com.google.protobuf.empty.Empty
import io.grpc.ServerCall.Listener
import io.grpc._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.Future
class SandboxResetService(
ledgerId: LedgerId,
getEc: () => ExecutionContext,
resetAndRestartServer: () => Future[Unit],
authorizer: Authorizer)(implicit logCtx: LoggingContext)
authorizer: Authorizer,
)(implicit logCtx: LoggingContext)
extends ResetServiceGrpc.ResetService
with BindableService
with ServerInterceptor {
@ -42,7 +42,7 @@ class SandboxResetService(
serverCallHandler: ServerCallHandler[ReqT, RespT]): Listener[ReqT] = {
if (resetInitialized.get) {
throw new StatusRuntimeException(
Status.UNAVAILABLE.withDescription("Sandbox server is currently being resetted"))
Status.UNAVAILABLE.withDescription("Sandbox server is currently being reset"))
}
serverCallHandler.startCall(serverCall, metadata)
@ -67,16 +67,9 @@ class SandboxResetService(
if (!resetInitialized.compareAndSet(false, true))
throw new StatusRuntimeException(
Status.FAILED_PRECONDITION.withDescription("Sandbox server is currently being resetted"))
Status.FAILED_PRECONDITION.withDescription("Sandbox server is currently being reset"))
val servicesAreDown = Promise[Unit]()
// We need to run this asynchronously since otherwise we have a deadlock: `buildAndStartServer` will block
// until all the in flight requests have been served, so we need to schedule this in another thread so that
// the code that clears the in flight request is not in an in flight request itself.
getEc().execute({ () =>
logger.info(s"Stopping and starting the server.")
servicesAreDown.completeWith(resetAndRestartServer())
})
servicesAreDown.future
logger.info(s"Stopping and starting the server.")
resetAndRestartServer()
}
}

View File

@ -12,6 +12,6 @@ object Main {
def main(args: Array[String]): Unit = {
val config = Cli.parse(args, SandboxConfig.nextDefault).getOrElse(sys.exit(1))
config.logLevel.foreach(GlobalLogLevel.set)
new ProgramResource(new Runner().owner(config)).run()
new ProgramResource(new Runner(config)).run()
}
}

View File

@ -15,15 +15,16 @@ import com.daml.ledger.on.sql.Database.InvalidDatabaseException
import com.daml.ledger.on.sql.SqlLedgerReaderWriter
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1
import com.daml.ledger.participant.state.v1.{ReadService, SeedService, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
import com.digitalasset.ledger.api.auth.{AuthServiceWildcard, Authorizer}
import com.digitalasset.ledger.api.domain
import com.digitalasset.logging.ContextualizedLogger
import com.digitalasset.logging.LoggingContext.newLoggingContext
import com.digitalasset.logging.{ContextualizedLogger, LoggingContext}
import com.digitalasset.platform.apiserver.{
ApiServer,
ApiServerConfig,
StandaloneApiServer,
TimeServiceBackend
@ -36,17 +37,19 @@ import com.digitalasset.platform.indexer.{
StandaloneIndexerServer
}
import com.digitalasset.platform.sandbox.banner.Banner
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.config.{InvalidConfigException, SandboxConfig}
import com.digitalasset.platform.sandbox.services.SandboxResetService
import com.digitalasset.platform.sandboxnext.Runner._
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.platform.store.FlywayMigrations
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.resources.akka.AkkaResourceOwner
import com.digitalasset.resources.{ResettableResourceOwner, Resource, ResourceOwner}
import scalaz.syntax.tag._
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
/**
@ -55,13 +58,11 @@ import scala.util.Try
* Known issues:
* - does not support implicit party allocation
* - does not support scenarios
* - does not provide the reset service
*/
class Runner {
def owner(config: SandboxConfig): ResourceOwner[Unit] = {
class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[Port] = {
implicit val system: ActorSystem = ActorSystem("sandbox")
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.dispatcher
val specifiedLedgerId: Option[v1.LedgerId] = config.ledgerIdMode match {
case LedgerIdMode.Static(ledgerId) =>
@ -70,16 +71,26 @@ class Runner {
None
}
val (ledgerType, ledgerJdbcUrl, indexJdbcUrl) = config.jdbcUrl match {
case Some(url) if url.startsWith("jdbc:postgresql") => ("PostgreSQL", url, url)
case Some(url) if url.startsWith("jdbc:h2:mem:") => ("in-memory", InMemoryLedgerJdbcUrl, url)
case Some(url) if url.startsWith("jdbc:h2:") =>
throw new InvalidDatabaseException(
"This version of Sandbox does not support file-based H2 databases. Please use SQLite instead.")
case Some(url) if url.startsWith("jdbc:sqlite:") => ("SQLite", url, InMemoryIndexJdbcUrl)
case Some(url) => throw new InvalidDatabaseException(s"Unknown database: $url")
case None => ("in-memory", InMemoryLedgerJdbcUrl, InMemoryIndexJdbcUrl)
}
val (ledgerType, ledgerJdbcUrl, indexJdbcUrl, startupMode): (
String,
String,
String,
StartupMode) =
config.jdbcUrl match {
case Some(url) if url.startsWith("jdbc:postgresql:") =>
("PostgreSQL", url, url, StartupMode.MigrateAndStart)
case Some(url) if url.startsWith("jdbc:h2:mem:") =>
("in-memory", InMemoryLedgerJdbcUrl, url, StartupMode.ResetAndStart)
case Some(url) if url.startsWith("jdbc:h2:") =>
throw new InvalidDatabaseException(
"This version of Sandbox does not support file-based H2 databases. Please use SQLite instead.")
case Some(url) if url.startsWith("jdbc:sqlite:") =>
("SQLite", url, InMemoryIndexJdbcUrl, StartupMode.MigrateAndStart)
case Some(url) =>
throw new InvalidDatabaseException(s"Unknown database: $url")
case None =>
("in-memory", InMemoryLedgerJdbcUrl, InMemoryIndexJdbcUrl, StartupMode.ResetAndStart)
}
val timeProviderType = config.timeProviderType.getOrElse(TimeProviderType.Static)
val (timeServiceBackend, heartbeatMechanism) = timeProviderType match {
@ -91,49 +102,118 @@ class Runner {
(None, new RegularHeartbeat(clock, HeartbeatInterval))
}
newLoggingContext { implicit logCtx =>
val seeding = config.seeding.getOrElse {
throw new InvalidConfigException(
"This version of Sandbox will not start without a seeding mode. Please specify an appropriate seeding mode.")
}
val owner = newLoggingContext { implicit logCtx =>
for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- AkkaResourceOwner.forActorSystem(() => system)
_ <- AkkaResourceOwner.forMaterializer(() => materializer)
heartbeats <- heartbeatMechanism
readerWriter <- SqlLedgerReaderWriter.owner(
initialLedgerId = specifiedLedgerId,
participantId = ParticipantId,
jdbcUrl = ledgerJdbcUrl,
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
heartbeats = heartbeats,
apiServer <- ResettableResourceOwner[ApiServer, (Option[Port], StartupMode)](
initialValue = (None, startupMode),
owner = reset => {
case (currentPort, startupMode) =>
for {
_ <- startupMode match {
case StartupMode.MigrateAndStart =>
ResourceOwner.successful(())
case StartupMode.ResetAndStart =>
// Resetting through Flyway removes all tables in the database schema.
// Therefore we don't need to "reset" the KV Ledger and Index separately.
ResourceOwner.forFuture(() => new FlywayMigrations(indexJdbcUrl).reset())
}
heartbeats <- heartbeatMechanism
readerWriter <- SqlLedgerReaderWriter.owner(
initialLedgerId = specifiedLedgerId,
participantId = ParticipantId,
jdbcUrl = ledgerJdbcUrl,
timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC),
heartbeats = heartbeats,
)
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
ledgerId <- ResourceOwner.forFuture(() =>
ledger.getLedgerInitialConditions().runWith(Sink.head).map(_.ledgerId))
_ <- ResourceOwner.forFuture(() =>
Future.sequence(config.damlPackages.map(uploadDar(_, ledger))))
_ <- new StandaloneIndexerServer(
readService = ledger,
config = IndexerConfig(
ParticipantId,
jdbcUrl = indexJdbcUrl,
startupMode = IndexerStartupMode.MigrateAndStart,
allowExistingSchema = true,
),
metrics = SharedMetricRegistries.getOrCreate(s"indexer-$ParticipantId"),
)
authService = config.authService.getOrElse(AuthServiceWildcard)
promise = Promise[Unit]
resetService = {
val clock = Clock.systemUTC()
val authorizer = new Authorizer(() => clock.instant(), ledgerId, ParticipantId)
new SandboxResetService(
domain.LedgerId(ledgerId),
() => {
// Don't block the reset request; just wait until the services are closed.
// Otherwise we end up in deadlock, because the server won't shut down until
// all requests are completed.
reset()
promise.future
},
authorizer
)
}
apiServer <- new StandaloneApiServer(
ApiServerConfig(
participantId = ParticipantId,
archiveFiles = config.damlPackages,
// Re-use the same port when resetting the server.
port = currentPort.getOrElse(config.port),
address = config.address,
jdbcUrl = indexJdbcUrl,
tlsConfig = config.tlsConfig,
maxInboundMessageSize = config.maxInboundMessageSize,
portFile = config.portFile,
),
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
readService = ledger,
writeService = ledger,
authService = authService,
metrics = SharedMetricRegistries.getOrCreate(s"ledger-api-server-$ParticipantId"),
timeServiceBackend = timeServiceBackend,
seeding = seeding,
otherServices = List(resetService),
otherInterceptors = List(resetService),
)
_ = promise.completeWith(apiServer.servicesClosed())
} yield {
Banner.show(Console.out)
logger.withoutContext.info(
"Initialized sandbox version {} with ledger-id = {}, port = {}, dar file = {}, time mode = {}, ledger = {}, auth-service = {}, contract ids seeding = {}",
BuildInfo.Version,
ledgerId,
apiServer.port.toString,
config.damlPackages,
timeProviderType.description,
ledgerType,
authService.getClass.getSimpleName,
seeding.toString.toLowerCase,
)
apiServer
}
},
resetOperation =
apiServer => Future.successful((Some(apiServer.port), StartupMode.ResetAndStart))
)
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
ledgerId <- ResourceOwner.forFuture(() =>
ledger.getLedgerInitialConditions().runWith(Sink.head).map(_.ledgerId))
authService = config.authService.getOrElse(AuthServiceWildcard)
_ <- ResourceOwner.forFuture(() =>
Future.sequence(config.damlPackages.map(uploadDar(_, ledger))))
port <- startParticipant(
config,
indexJdbcUrl,
ledger,
authService,
timeServiceBackend,
config.seeding.map(SeedService(_)),
)
} yield {
Banner.show(Console.out)
logger.withoutContext.info(
"Initialized sandbox version {} with ledger-id = {}, port = {}, dar file = {}, time mode = {}, ledger = {}, auth-service = {}, contract ids seeding = {}",
BuildInfo.Version,
ledgerId,
port.toString,
config.damlPackages,
timeProviderType.description,
ledgerType,
authService.getClass.getSimpleName,
config.seeding.fold("no")(_.toString.toLowerCase),
)
}
} yield apiServer.port
}
owner.acquire()
}
private def uploadDar(from: File, to: KeyValueParticipantState)(
@ -146,77 +226,6 @@ class Runner {
_ <- to.uploadPackages(submissionId, dar.all, None).toScala
} yield ()
}
private def startParticipant(
config: SandboxConfig,
indexJdbcUrl: String,
ledger: KeyValueParticipantState,
authService: AuthService,
timeServiceBackend: Option[TimeServiceBackend],
seedService: Option[SeedService],
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Port] =
for {
_ <- startIndexerServer(
config = config,
indexJdbcUrl = indexJdbcUrl,
readService = ledger,
)
port <- startApiServer(
config = config,
indexJdbcUrl = indexJdbcUrl,
readService = ledger,
writeService = ledger,
authService = authService,
timeServiceBackend = timeServiceBackend,
seedService = seedService
)
} yield port
private def startIndexerServer(
config: SandboxConfig,
indexJdbcUrl: String,
readService: ReadService,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Unit] =
new StandaloneIndexerServer(
readService = readService,
config = IndexerConfig(
ParticipantId,
jdbcUrl = indexJdbcUrl,
startupMode = IndexerStartupMode.MigrateAndStart,
allowExistingSchema = true,
),
metrics = SharedMetricRegistries.getOrCreate(s"indexer-$ParticipantId"),
)
private def startApiServer(
config: SandboxConfig,
indexJdbcUrl: String,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
timeServiceBackend: Option[TimeServiceBackend],
seedService: Option[SeedService],
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext): ResourceOwner[Port] =
new StandaloneApiServer(
ApiServerConfig(
participantId = ParticipantId,
archiveFiles = config.damlPackages,
port = config.port,
address = config.address,
jdbcUrl = indexJdbcUrl,
tlsConfig = config.tlsConfig,
maxInboundMessageSize = config.maxInboundMessageSize,
portFile = config.portFile,
),
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
readService = readService,
writeService = writeService,
authService = authService,
metrics = SharedMetricRegistries.getOrCreate(s"ledger-api-server-$ParticipantId"),
timeServiceBackend = timeServiceBackend,
seedService = seedService,
)
}
object Runner {

View File

@ -0,0 +1,14 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandboxnext
sealed trait StartupMode
object StartupMode {
case object MigrateAndStart extends StartupMode
case object ResetAndStart extends StartupMode
}

View File

@ -46,6 +46,16 @@ class FlywayMigrations(jdbcUrl: String)(implicit logCtx: LoggingContext) {
}
}
def reset()(implicit executionContext: ExecutionContext): Future[Unit] =
dataSource.use { ds =>
Future {
val flyway = configurationBase(dbType).dataSource(ds).load()
logger.info("Running Flyway clean...")
flyway.clean()
logger.info("Flyway schema clean finished successfully.")
}
}
private def dataSource: ResourceOwner[HikariDataSource] =
HikariConnection.owner(jdbcUrl, "daml.index.db.migration", 2, 2, 250.millis, None)
}

View File

@ -0,0 +1,112 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox
import java.io.File
import java.net.InetAddress
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.ledger.participant.state.v1.TimeModel
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.bazeltools.BazelRunfiles._
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.ledger.api.auth.client.LedgerCallCredentials
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
LedgerIdentityServiceGrpc
}
import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc
import com.digitalasset.ledger.client.services.testing.time.StaticTime
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.grpc.Channel
import org.scalatest.{BeforeAndAfterAll, Suite}
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
trait AbstractSandboxFixture extends BeforeAndAfterAll {
self: Suite =>
private[this] val logger = LoggerFactory.getLogger(getClass)
private[this] val actorSystemName = this.getClass.getSimpleName
protected lazy val sandboxExecutionContext: ExecutionContext =
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$actorSystemName-thread-pool-worker-%d")
.setUncaughtExceptionHandler((thread, _) =>
logger.error(s"got an uncaught exception on thread: ${thread.getName}"))
.build()))
protected implicit val system: ActorSystem =
ActorSystem(actorSystemName, defaultExecutionContext = Some(sandboxExecutionContext))
protected implicit val materializer: Materializer = Materializer(system)
protected implicit val executionSequencerFactory: ExecutionSequencerFactory =
new AkkaExecutionSequencerPool("esf-" + this.getClass.getSimpleName)(system)
override protected def afterAll(): Unit = {
executionSequencerFactory.close()
materializer.shutdown()
Await.result(system.terminate(), 30.seconds)
super.afterAll()
}
protected def darFile = new File(rlocation("ledger/test-common/Test-stable.dar"))
protected def ledgerId(token: Option[String] = None): domain.LedgerId =
domain.LedgerId(
LedgerIdentityServiceGrpc
.blockingStub(channel)
.withCallCredentials(token.map(new LedgerCallCredentials(_)).orNull)
.getLedgerIdentity(GetLedgerIdentityRequest())
.ledgerId)
protected def getTimeProviderForClient(
implicit mat: Materializer,
esf: ExecutionSequencerFactory
): TimeProvider = {
Try(TimeServiceGrpc.stub(channel))
.map(StaticTime.updatedVia(_, ledgerId().unwrap)(mat, esf))
.fold[TimeProvider](_ => TimeProvider.UTC, Await.result(_, 30.seconds))
}
protected def config: SandboxConfig =
SandboxConfig.default.copy(
port = Port.Dynamic,
damlPackages = packageFiles,
timeProviderType = Some(TimeProviderType.Static),
timeModel = TimeModel.reasonableDefault,
scenario = scenario,
ledgerIdMode = LedgerIdMode.Static(LedgerId("sandbox-server")),
)
protected def packageFiles: List[File] = List(darFile)
protected def scenario: Option[String] = None
protected def database: Option[ResourceOwner[String]] = None
protected def serverHost: String = InetAddress.getLoopbackAddress.getHostName
protected def serverPort: Port
protected def channel: Channel
}

View File

@ -3,117 +3,26 @@
package com.digitalasset.platform.sandbox.services
import java.io.File
import java.net.InetAddress
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.ledger.participant.state.v1.TimeModel
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.bazeltools.BazelRunfiles._
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.ledger.api.auth.client.LedgerCallCredentials
import com.digitalasset.ledger.api.domain
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.testing.utils.{OwnedResource, Resource, SuiteResource}
import com.digitalasset.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
LedgerIdentityServiceGrpc
}
import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc
import com.digitalasset.ledger.client.services.testing.time.StaticTime
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.SandboxServer
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.platform.sandbox.{AbstractSandboxFixture, SandboxServer}
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.grpc.Channel
import org.scalatest.{BeforeAndAfterAll, Suite}
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._
import org.scalatest.Suite
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
import scala.concurrent.ExecutionContext
trait SandboxFixture extends SuiteResource[(SandboxServer, Channel)] with BeforeAndAfterAll {
trait SandboxFixture extends AbstractSandboxFixture with SuiteResource[(SandboxServer, Channel)] {
self: Suite =>
private[this] val logger = LoggerFactory.getLogger(getClass)
private[this] val actorSystemName = this.getClass.getSimpleName
private lazy val executionContext = ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$actorSystemName-thread-pool-worker-%d")
.setUncaughtExceptionHandler((thread, _) =>
logger.error(s"got an uncaught exception on thread: ${thread.getName}"))
.build()))
protected implicit val system: ActorSystem =
ActorSystem(actorSystemName, defaultExecutionContext = Some(executionContext))
protected implicit val materializer: Materializer = Materializer(system)
protected implicit val executionSequencerFactory: ExecutionSequencerFactory =
new AkkaExecutionSequencerPool("esf-" + this.getClass.getSimpleName)(system)
override protected def afterAll(): Unit = {
executionSequencerFactory.close()
materializer.shutdown()
Await.result(system.terminate(), 30.seconds)
super.afterAll()
}
protected def darFile = new File(rlocation("ledger/test-common/Test-stable.dar"))
protected def ledgerId(token: Option[String] = None): domain.LedgerId =
domain.LedgerId(
LedgerIdentityServiceGrpc
.blockingStub(channel)
.withCallCredentials(token.map(new LedgerCallCredentials(_)).orNull)
.getLedgerIdentity(GetLedgerIdentityRequest())
.ledgerId)
protected def getTimeProviderForClient(
implicit mat: Materializer,
esf: ExecutionSequencerFactory): TimeProvider = {
Try(TimeServiceGrpc.stub(channel))
.map(StaticTime.updatedVia(_, ledgerId().unwrap)(mat, esf))
.fold[TimeProvider](_ => TimeProvider.UTC, Await.result(_, 30.seconds))
}
protected def config: SandboxConfig =
SandboxConfig.default.copy(
port = Port.Dynamic,
damlPackages = packageFiles,
timeProviderType = Some(TimeProviderType.Static),
timeModel = TimeModel.reasonableDefault,
scenario = scenario,
ledgerIdMode = LedgerIdMode.Static(LedgerId("sandbox-server")),
)
protected def packageFiles: List[File] = List(darFile)
protected def scenario: Option[String] = None
protected def database: Option[ResourceOwner[String]] = None
protected def server: SandboxServer = suiteResource.value._1
protected def serverHost: String = InetAddress.getLoopbackAddress.getHostName
override protected def serverPort: Port = server.port
protected def serverPort: Port = server.port
protected def channel: Channel = suiteResource.value._2
override protected def channel: Channel = suiteResource.value._2
override protected lazy val suiteResource: Resource[(SandboxServer, Channel)] = {
implicit val ec: ExecutionContext = executionContext
implicit val ec: ExecutionContext = sandboxExecutionContext
new OwnedResource[(SandboxServer, Channel)](
for {
jdbcUrl <- database

View File

@ -24,11 +24,11 @@ import com.digitalasset.ledger.api.v1.commands.{Command, Commands, CreateCommand
import com.digitalasset.ledger.api.v1.value.Value.Sum
import com.digitalasset.ledger.api.v1.value.Value.Sum.{Bool, Party, Text, Timestamp}
import com.digitalasset.ledger.api.v1.value.{Identifier, Record, RecordField, Value, Variant}
import com.digitalasset.platform.participant.util.ValueConversions._
import com.digitalasset.platform.testing.TestTemplateIdentifiers
import com.google.protobuf.timestamp.{Timestamp => GTimestamp}
import scalaz.syntax.tag._
@SuppressWarnings(Array("org.wartremover.warts.Any"))
trait TestCommands {
protected def darFile: File
@ -43,7 +43,8 @@ trait TestCommands {
commands: Seq[Command],
let: GTimestamp = ledgerEffectiveTime,
maxRecordTime: GTimestamp = maximumRecordTime,
appId: String = applicationId) =
appId: String = applicationId,
): SubmitRequest =
M.submitRequest.update(
_.commands.commandId := commandId,
_.commands.ledgerId := ledgerId.unwrap,
@ -56,7 +57,8 @@ trait TestCommands {
protected def dummyCommands(
ledgerId: domain.LedgerId,
commandId: String,
party: String = "party") =
party: String = M.party,
): SubmitRequest =
buildRequest(
ledgerId,
commandId,
@ -67,7 +69,7 @@ trait TestCommands {
)
)
protected def createWithOperator(templateId: Identifier, party: String = "party") =
protected def createWithOperator(templateId: Identifier, party: String = M.party): Command =
Command(
Create(CreateCommand(
Some(templateId),
@ -80,7 +82,7 @@ trait TestCommands {
new String(array)
}
protected def oneKbCommand(templateId: Identifier) =
protected def oneKbCommand(templateId: Identifier): Command =
Command(
Create(
CreateCommand(
@ -94,8 +96,6 @@ trait TestCommands {
)))
)))
import com.digitalasset.platform.participant.util.ValueConversions._
private def integerListRecordLabel = "integerList"
protected def paramShowcaseArgs: Record = {
val variant = Value(Value.Sum.Variant(Variant(None, "SomeInteger", 1.asInt64)))
val nestedVariant = Vector("value" -> variant).asRecordValue
@ -111,11 +111,12 @@ trait TestCommands {
RecordField("time", Value(Timestamp(0))),
RecordField("relTime", 42.asInt64), // RelTime gets now compiled to Integer with the new primitive types
RecordField("nestedOptionalInteger", nestedVariant),
RecordField(integerListRecordLabel, integerList)
RecordField("integerList", integerList),
)
)
}
protected def paramShowcase = Commands(
protected def paramShowcase: Commands = Commands(
"ledgerId",
"workflowId",
"appId",
@ -128,18 +129,18 @@ trait TestCommands {
CreateCommand(Some(templateIds.parameterShowcase), Option(paramShowcaseArgs)))))
)
protected def oneKbCommandRequest(ledgerId: domain.LedgerId, commandId: String) =
protected def oneKbCommandRequest(ledgerId: domain.LedgerId, commandId: String): SubmitRequest =
buildRequest(ledgerId, commandId, List(oneKbCommand(templateIds.textContainer)))
protected def exerciseWithUnit(
templateId: Identifier,
contractId: String,
choice: String,
args: Option[Value] = Some(Value(Sum.Record(Record.defaultInstance)))) =
args: Option[Value] = Some(Value(Sum.Record(Record.defaultInstance)))
): Command =
Command(Exercise(ExerciseCommand(Some(templateId), contractId, choice, args)))
implicit class SubmitRequestEnhancer(request: SubmitRequest) {
def toSync: SubmitAndWaitRequest = SubmitAndWaitRequest(request.commands)
}
}

View File

@ -0,0 +1,194 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.services.reset
import java.io.File
import java.util.UUID
import com.digitalasset.daml.bazeltools.BazelRunfiles.rlocation
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.testing.utils.{
IsStatusException,
SuiteResourceManagementAroundEach,
MockMessages => M
}
import com.digitalasset.ledger.api.v1.active_contracts_service.{
ActiveContractsServiceGrpc,
GetActiveContractsRequest,
GetActiveContractsResponse
}
import com.digitalasset.ledger.api.v1.admin.party_management_service.{
AllocatePartyRequest,
PartyManagementServiceGrpc
}
import com.digitalasset.ledger.api.v1.command_completion_service.{
CommandCompletionServiceGrpc,
CompletionStreamRequest
}
import com.digitalasset.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
import com.digitalasset.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc
import com.digitalasset.ledger.api.v1.event.CreatedEvent
import com.digitalasset.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
LedgerIdentityServiceGrpc
}
import com.digitalasset.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc}
import com.digitalasset.ledger.api.v1.transaction_filter.TransactionFilter
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.AbstractSandboxFixture
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.TestCommands
import com.digitalasset.platform.testing.{StreamConsumer, WaitForCompletionsObserver}
import com.digitalasset.timer.RetryStrategy
import com.google.protobuf.empty.Empty
import io.grpc.Status
import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScalaFutures}
import org.scalatest.time.Span
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, DurationLong, FiniteDuration}
abstract class ResetServiceITBase
extends AsyncWordSpec
with AsyncTimeLimitedTests
with Matchers
with ScalaFutures
with AbstractSandboxFixture
with SuiteResourceManagementAroundEach
with TestCommands {
override def timeLimit: Span = scaled(30.seconds)
override protected def config: SandboxConfig =
super.config.copy(ledgerIdMode = LedgerIdMode.Dynamic)
protected val eventually: RetryStrategy = RetryStrategy.exponentialBackoff(10, 10.millis)
override protected def darFile: File = new File(rlocation("ledger/test-common/Test-stable.dar"))
protected def fetchLedgerId(): Future[String] =
LedgerIdentityServiceGrpc
.stub(channel)
.getLedgerIdentity(GetLedgerIdentityRequest())
.map(_.ledgerId)
// Resets and waits for a new ledger identity to be available
protected def reset(ledgerId: String): Future[String] =
for {
_ <- ResetServiceGrpc.stub(channel).reset(ResetRequest(ledgerId))
newLedgerId <- eventually { (_, _) =>
fetchLedgerId()
}
} yield newLedgerId
protected def timedReset(ledgerId: String): Future[(String, FiniteDuration)] = {
val start = System.nanoTime()
reset(ledgerId).map(_ -> (System.nanoTime() - start).nanos)
}
protected def allocateParty(hint: String): Future[String] =
PartyManagementServiceGrpc
.stub(channel)
.allocateParty(AllocatePartyRequest(hint))
.map(_.partyDetails.get.party)
protected def submitAndWait(req: SubmitAndWaitRequest): Future[Empty] =
CommandServiceGrpc.stub(channel).submitAndWait(req)
protected def activeContracts(ledgerId: String, f: TransactionFilter): Future[Set[CreatedEvent]] =
new StreamConsumer[GetActiveContractsResponse](
ActiveContractsServiceGrpc
.stub(channel)
.getActiveContracts(GetActiveContractsRequest(ledgerId, Some(f)), _))
.all()
.map(_.flatMap(_.activeContracts)(collection.breakOut))
protected def submitAndExpectCompletions(
ledgerId: String,
commands: Int,
party: String,
): Future[Unit] =
for {
_ <- Future.sequence(
Vector.fill(commands)(
CommandSubmissionServiceGrpc
.stub(channel)
.submit(dummyCommands(LedgerId(ledgerId), UUID.randomUUID.toString, party))))
unit <- WaitForCompletionsObserver(commands)(
CommandCompletionServiceGrpc
.stub(channel)
.completionStream(
CompletionStreamRequest(
ledgerId = ledgerId,
applicationId = M.applicationId,
parties = Seq(party),
offset = Some(M.ledgerBegin)
),
_))
} yield unit
"ResetService" when {
"state is reset" should {
"return a new ledger ID" in {
for {
lid1 <- fetchLedgerId()
lid2 <- reset(lid1)
throwable <- reset(lid1).failed
} yield {
lid1 should not equal lid2
IsStatusException(Status.Code.NOT_FOUND)(throwable)
}
}
"return new ledger ID - 20 resets" in {
Future
.sequence(Iterator.iterate(fetchLedgerId())(_.flatMap(reset)).take(20).toVector)
.map(ids => ids.distinct should have size 20L)
}
// 5 attempts with 5 transactions each seem to strike the right balance to complete before the
// 30 seconds test timeout in normal conditions while still causing the test to fail if
// something goes wrong.
//
// The 10 seconds timeout built into the context's ledger reset will be hit if something goes
// horribly wrong, causing an exception to report "waitForNewLedger: out of retries".
val expectedResetCompletionTime = Span.convertSpanToDuration(scaled(5.seconds))
s"consistently complete within $expectedResetCompletionTime" in {
val numberOfCommands = 5
val numberOfAttempts = 5
Future
.sequence(
Iterator
.iterate(fetchLedgerId()) { ledgerIdF =>
for {
ledgerId <- ledgerIdF
party <- allocateParty(M.party)
_ <- submitAndExpectCompletions(ledgerId, numberOfCommands, party)
(newLedgerId, completionTime) <- timedReset(ledgerId)
_ = completionTime should be <= expectedResetCompletionTime
} yield newLedgerId
}
.take(numberOfAttempts)
)
.map(_ => succeed)
}
"remove contracts from ACS after reset" in {
for {
ledgerId <- fetchLedgerId()
party <- allocateParty(M.party)
request = dummyCommands(LedgerId(ledgerId), "commandId1", party)
_ <- submitAndWait(SubmitAndWaitRequest(commands = request.commands))
events <- activeContracts(ledgerId, M.transactionFilter)
_ = events should have size 3
newLid <- reset(ledgerId)
newEvents <- activeContracts(newLid, M.transactionFilter)
} yield {
newEvents should have size 0
}
}
}
}
}

View File

@ -0,0 +1,41 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandboxnext
import com.daml.ledger.participant.state.v1.SeedService
import com.digitalasset.ledger.api.testing.utils.{OwnedResource, Resource, SuiteResource}
import com.digitalasset.platform.sandbox.AbstractSandboxFixture
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.SandboxClientResource
import com.digitalasset.ports.Port
import com.digitalasset.resources.ResourceOwner
import io.grpc.Channel
import org.scalatest.Suite
import scala.concurrent.ExecutionContext
trait SandboxNextFixture extends AbstractSandboxFixture with SuiteResource[(Port, Channel)] {
self: Suite =>
override protected def config: SandboxConfig =
super.config.copy(
seeding = Some(SeedService.Seeding.Weak),
)
override protected def serverPort: Port = suiteResource.value._1
override protected def channel: Channel = suiteResource.value._2
override protected lazy val suiteResource: Resource[(Port, Channel)] = {
implicit val ec: ExecutionContext = sandboxExecutionContext
new OwnedResource[(Port, Channel)](
for {
jdbcUrl <- database
.fold[ResourceOwner[Option[String]]](ResourceOwner.successful(None))(_.map(Some(_)))
port <- new Runner(config.copy(jdbcUrl = jdbcUrl))
channel <- SandboxClientResource.owner(port)
} yield (port, channel)
)
}
}

View File

@ -3,186 +3,22 @@
package com.digitalasset.platform.sandbox.services.reset
import java.io.File
import java.util.UUID
import com.digitalasset.platform.sandbox.services.SandboxFixture
import com.digitalasset.daml.bazeltools.BazelRunfiles.rlocation
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.testing.utils.{
IsStatusException,
SuiteResourceManagementAroundEach,
MockMessages => M
}
import com.digitalasset.ledger.api.v1.active_contracts_service.{
ActiveContractsServiceGrpc,
GetActiveContractsRequest,
GetActiveContractsResponse
}
import com.digitalasset.ledger.api.v1.command_completion_service.{
CommandCompletionServiceGrpc,
CompletionStreamRequest
}
import com.digitalasset.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest}
import com.digitalasset.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc
import com.digitalasset.ledger.api.v1.event.CreatedEvent
import com.digitalasset.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
LedgerIdentityServiceGrpc
}
import com.digitalasset.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc}
import com.digitalasset.ledger.api.v1.transaction_filter.TransactionFilter
import com.digitalasset.platform.common.LedgerIdMode
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services.{SandboxFixture, TestCommands}
import com.digitalasset.platform.testing.{StreamConsumer, WaitForCompletionsObserver}
import com.digitalasset.timer.RetryStrategy
import com.google.protobuf.empty.Empty
import io.grpc.Status
import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScalaFutures}
import org.scalatest.time.Span
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.concurrent.duration.{DurationInt, DurationLong, FiniteDuration}
import scala.concurrent.{Await, Future}
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.ref.WeakReference
final class ResetServiceIT
extends AsyncWordSpec
with AsyncTimeLimitedTests
with Matchers
with ScalaFutures
with SandboxFixture
with SuiteResourceManagementAroundEach
with TestCommands {
private val eventually = RetryStrategy.exponentialBackoff(10, 10.millis)
override def timeLimit: Span = scaled(30.seconds)
override protected val config: SandboxConfig =
super.config.copy(ledgerIdMode = LedgerIdMode.Dynamic)
override protected def darFile: File = new File(rlocation("ledger/test-common/Test-stable.dar"))
private def fetchLedgerId(): Future[String] =
LedgerIdentityServiceGrpc
.stub(channel)
.getLedgerIdentity(GetLedgerIdentityRequest())
.map(_.ledgerId)
// Resets and waits for a new ledger identity to be available
private def reset(ledgerId: String): Future[String] =
for {
_ <- ResetServiceGrpc.stub(channel).reset(ResetRequest(ledgerId))
newLedgerId <- eventually { (_, _) =>
fetchLedgerId()
}
} yield newLedgerId
private def timedReset(ledgerId: String): Future[(String, FiniteDuration)] = {
val start = System.nanoTime()
reset(ledgerId).map(_ -> (System.nanoTime() - start).nanos)
}
private def submitAndWait(req: SubmitAndWaitRequest): Future[Empty] =
CommandServiceGrpc.stub(channel).submitAndWait(req)
private def activeContracts(ledgerId: String, f: TransactionFilter): Future[Set[CreatedEvent]] =
new StreamConsumer[GetActiveContractsResponse](
ActiveContractsServiceGrpc
.stub(channel)
.getActiveContracts(GetActiveContractsRequest(ledgerId, Some(f)), _))
.all()
.map(_.flatMap(_.activeContracts)(collection.breakOut))
private def submitAndExpectCompletions(ledgerId: String, commands: Int): Future[Unit] =
for {
_ <- Future.sequence(
Vector.fill(commands)(
CommandSubmissionServiceGrpc
.stub(channel)
.submit(dummyCommands(LedgerId(ledgerId), UUID.randomUUID.toString))))
unit <- WaitForCompletionsObserver(commands)(
CommandCompletionServiceGrpc
.stub(channel)
.completionStream(
CompletionStreamRequest(
ledgerId = ledgerId,
applicationId = M.applicationId,
parties = Seq(M.party),
offset = Some(M.ledgerBegin)
),
_))
} yield unit
final class ResetServiceIT extends ResetServiceITBase with SandboxFixture {
"ResetService" when {
"state is reset" should {
"return a new ledger ID" in {
for {
lid1 <- fetchLedgerId()
lid2 <- reset(lid1)
throwable <- reset(lid1).failed
} yield {
lid1 should not equal lid2
IsStatusException(Status.Code.NOT_FOUND)(throwable)
}
}
"return new ledger ID - 20 resets" in {
Future
.sequence(Iterator.iterate(fetchLedgerId())(_.flatMap(reset)).take(20).toVector)
.map(ids => ids.distinct should have size 20L)
}
// 5 attempts with 5 transactions each seem to strike the right balance
// to complete before the 30 seconds test timeout in normal conditions while
// still causing the test to fail if something goes wrong
//
// the 10 seconds timeout built into the context's ledger reset will
// be hit if something goes horribly wrong, causing an exception to report
// waitForNewLedger: out of retries
"consistently complete within 5 seconds" in {
val numberOfCommands = 5
val numberOfAttempts = 5
Future
.sequence(
Iterator
.iterate(fetchLedgerId()) { ledgerIdF =>
for {
ledgerId <- ledgerIdF
_ <- submitAndExpectCompletions(ledgerId, numberOfCommands)
(newLedgerId, timing) <- timedReset(ledgerId)
_ = timing should be <= 5.seconds
} yield newLedgerId
}
.take(numberOfAttempts)
)
.map(_ => succeed)
}
"remove contracts from ACS after reset" in {
for {
lid <- fetchLedgerId()
req = dummyCommands(LedgerId(lid), "commandId1")
_ <- submitAndWait(SubmitAndWaitRequest(commands = req.commands))
events <- activeContracts(lid, M.transactionFilter)
_ = events.size shouldBe 3
newLid <- reset(lid)
newEvents <- activeContracts(newLid, M.transactionFilter)
} yield {
newEvents.size shouldBe 0
}
}
"clear out all garbage" in {
val state = new WeakReference(Await.result(server.sandboxState, 5.seconds))
for {
lid <- fetchLedgerId()
_ <- reset(lid)
} yield {
System.gc()
state.get should be(None)
}
"clear out all garbage" in {
val state = new WeakReference(Await.result(server.sandboxState, 5.seconds))
for {
lid <- fetchLedgerId()
_ <- reset(lid)
} yield {
System.gc()
state.get should be(None)
}
}
}

View File

@ -0,0 +1,11 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandboxnext.services.reset
import com.digitalasset.platform.sandbox.services.reset.ResetServiceITBase
import com.digitalasset.platform.sandboxnext.SandboxNextFixture
final class ResetServiceInMemoryIT extends ResetServiceITBase with SandboxNextFixture {
override def spanScaleFactor: Double = super.spanScaleFactor * 2
}

View File

@ -0,0 +1,16 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandboxnext.services.reset
import com.digitalasset.platform.sandbox.services.reset.ResetServiceITBase
import com.digitalasset.platform.sandboxnext.SandboxNextFixture
import com.digitalasset.resources.ResourceOwner
import com.digitalasset.testing.postgresql.PostgresResource
final class ResetServiceOnPostgresqlIT extends ResetServiceITBase with SandboxNextFixture {
override def spanScaleFactor: Double = super.spanScaleFactor * 4
override protected def database: Option[ResourceOwner[String]] =
Some(PostgresResource.owner().map(_.jdbcUrl))
}

View File

@ -0,0 +1,73 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.resources
import java.util.concurrent.atomic.AtomicReference
import com.digitalasset.resources.ResettableResourceOwner._
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future, Promise}
class ResettableResourceOwner[A, ResetValue] private (
initialValue: ResetValue,
owner: Reset => ResetValue => ResourceOwner[A],
resetOperation: A => Future[ResetValue],
) extends ResourceOwner[A] {
override def acquire()(implicit executionContext: ExecutionContext): Resource[A] =
new Resource[A] {
private val resettableOwner: ResetValue => ResourceOwner[A] = owner(reset _)
@volatile
private var resource = resettableOwner(initialValue).acquire()
private val resetPromise = new AtomicReference[Option[Promise[Unit]]](None)
override def asFuture: Future[A] =
resetPromise.get().getOrElse(Promise.successful(())).future.flatMap(_ => resource.asFuture)
override def release(): Future[Unit] =
resetPromise.get().getOrElse(Promise.successful(())).future.flatMap(_ => resource.release())
@tailrec
private def reset(): Future[Unit] = {
val currentResetPromise = resetPromise.get()
currentResetPromise match {
case None =>
val newResetPromise = Some(Promise[Unit]())
if (resetPromise.compareAndSet(None, newResetPromise)) {
for {
value <- resource.asFuture
_ <- resource.release()
resetValue <- resetOperation(value)
} yield {
resource = resettableOwner(resetValue).acquire()
newResetPromise.get.success(())
resetPromise.set(None)
}
} else {
reset()
}
case Some(currentResetPromise) =>
currentResetPromise.future
}
}
}
}
object ResettableResourceOwner {
type Reset = () => Future[Unit]
def apply[A](owner: Reset => ResourceOwner[A]) =
new ResettableResourceOwner[A, Unit](
initialValue = (),
reset => _ => owner(reset),
resetOperation = _ => Future.unit,
)
def apply[A, ResetValue](
initialValue: ResetValue,
owner: Reset => ResetValue => ResourceOwner[A],
resetOperation: A => Future[ResetValue],
) = new ResettableResourceOwner(initialValue, owner, resetOperation)
}

View File

@ -20,7 +20,7 @@ trait Resource[+A] {
/**
* Every [[Resource]] has an underlying [[Future]] representation.
*/
val asFuture: Future[A]
def asFuture: Future[A]
/**
* Every [[Resource]] can be (asynchronously) released. Releasing a resource will also release

View File

@ -0,0 +1,217 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.resources
import java.util.concurrent.atomic.AtomicInteger
import com.digitalasset.resources.ResettableResourceOwner.Reset
import org.scalatest.concurrent.AsyncTimeLimitedTests
import org.scalatest.time.Span
import org.scalatest.{AsyncWordSpec, Matchers}
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.ref.WeakReference
class ResettableResourceOwnerSpec extends AsyncWordSpec with AsyncTimeLimitedTests with Matchers {
override def timeLimit: Span = 10.seconds
"resetting a resource" should {
"reconstruct everything" in {
val acquisitionCounter = new AtomicInteger(0)
val releaseCounter = new AtomicInteger(0)
val owner = ResettableResourceOwner(reset =>
new ResourceOwner[(Reset, Int)] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[(Reset, Int)] =
Resource(Future.successful((reset, acquisitionCounter.incrementAndGet()))) { _ =>
releaseCounter.incrementAndGet()
Future.unit
}
})
withClue("before acquisition, ") {
acquisitionCounter.get() should be(0)
releaseCounter.get() should be(0)
}
val resource = owner.acquire()
for {
(reset, value) <- resource.asFuture
_ = withClue("after first acquisition, ") {
value should be(1)
acquisitionCounter.get() should be(1)
releaseCounter.get() should be(0)
}
_ <- reset()
(reset, value) <- resource.asFuture
_ = withClue("after first reset, ") {
value should be(2)
acquisitionCounter.get() should be(2)
releaseCounter.get() should be(1)
}
_ <- reset()
(_, value) <- resource.asFuture
_ = withClue("after second reset, ") {
value should be(3)
acquisitionCounter.get() should be(3)
releaseCounter.get() should be(2)
}
_ <- resource.release()
} yield {
withClue("after release, ") {
acquisitionCounter.get() should be(3)
releaseCounter.get() should be(3)
}
}
}
"avoid resetting twice concurrently" in {
val acquisitionCounter = new AtomicInteger(0)
val releaseCounter = new AtomicInteger(0)
val owner = ResettableResourceOwner(reset =>
new ResourceOwner[(Reset, Int)] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[(Reset, Int)] =
Resource(Future.successful((reset, acquisitionCounter.incrementAndGet()))) { _ =>
releaseCounter.incrementAndGet()
Future.unit
}
})
val resource = owner.acquire()
for {
(reset, _) <- resource.asFuture
_ <- Future.sequence((1 to 10).map(_ => reset()))
_ <- resource.release()
} yield {
acquisitionCounter.get() should be(2)
releaseCounter.get() should be(2)
}
}
"run an extra operation if specified" in {
val acquisitionCounter = new AtomicInteger(0)
val releaseCounter = new AtomicInteger(0)
val resetCounter = new AtomicInteger(0)
val resetOperationInputs = mutable.Buffer[Int]()
val owner = ResettableResourceOwner[(Reset, Int), Unit](
initialValue = {},
owner = reset =>
_ =>
new ResourceOwner[(Reset, Int)] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[(Reset, Int)] =
Resource(Future.successful((reset, acquisitionCounter.incrementAndGet()))) { _ =>
releaseCounter.incrementAndGet()
Future.unit
}
},
resetOperation = {
case (_, counter) =>
resetCounter.incrementAndGet()
resetOperationInputs += counter
Future.unit
}
)
val resource = owner.acquire()
withClue("after first acquisition, ") {
acquisitionCounter.get() should be(1)
releaseCounter.get() should be(0)
resetCounter.get() should be(0)
}
for {
(reset, _) <- resource.asFuture
_ <- reset()
_ = withClue("after reset, ") {
acquisitionCounter.get() should be(2)
releaseCounter.get() should be(1)
resetCounter.get() should be(1)
}
_ <- resource.release()
} yield {
acquisitionCounter.get() should be(2)
releaseCounter.get() should be(2)
resetCounter.get() should be(1)
resetOperationInputs should be(Seq(1))
}
}
"pass reset operation values through" in {
val owner = ResettableResourceOwner[(Reset, Int), Int](
initialValue = 0,
owner = reset =>
value =>
new ResourceOwner[(Reset, Int)] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[(Reset, Int)] = {
Resource.fromFuture(Future.successful((reset, value + 1)))
}
},
resetOperation = {
case (_, value) =>
Future.successful(value + 1)
}
)
val resource = owner.acquire()
for {
(reset, value) <- resource.asFuture
_ = withClue("after first acquisition, ") {
value should be(1)
}
_ <- reset()
(_, value) <- resource.asFuture
_ = withClue("after reset, ") {
value should be(3)
}
_ <- resource.release()
} yield succeed
}
"not hold on to old values" in {
var acquisitions = mutable.Buffer[WeakReference[Object]]()
val owner = ResettableResourceOwner(reset =>
new ResourceOwner[(Reset, Object)] {
override def acquire()(
implicit executionContext: ExecutionContext
): Resource[(Reset, Object)] = {
val obj = new Object
acquisitions += new WeakReference(obj)
Resource.fromFuture(Future.successful((reset, obj)))
}
})
val resource = owner.acquire()
System.gc()
acquisitions should have size 1
acquisitions.filter(_.get.isDefined) should have size 1
for {
(reset, _) <- resource.asFuture
_ <- reset()
_ = withClue("after reset, ") {
System.gc()
acquisitions should have size 2
acquisitions.filter(_.get.isDefined) should have size 1
}
_ <- resource.release()
} yield {
System.gc()
acquisitions should have size 2
acquisitions.filter(_.get.isDefined) should have size 0
}
}
}
}