Re-revert "Ledger API Server refactor" (#1109)

* Revert "Revert "Ledger API Server refactor (#1077)" (#1096)"

This reverts commit e2fa13e62b.

* fixing race conditions when resetting the ledger
Gabor Aranyossy 2019-05-13 17:23:03 +02:00 committed by GitHub
parent e428618f03
commit 496cf5069f
5 changed files with 249 additions and 215 deletions

View File

@@ -74,19 +74,22 @@ trait LedgerContext {
def reflectionService: ServerReflectionGrpc.ServerReflectionStub
def resetService: ResetService
/** Reset the ledger server and wait for it to start again. */
final def reset()(implicit system: ActorSystem, mat: Materializer): Future[Unit] = {
/**
* Reset the ledger server and wait for it to start again.
* @return the new ledger id
*/
final def reset()(implicit system: ActorSystem, mat: Materializer): Future[String] = {
implicit val ec: ExecutionContext = mat.executionContext
def waitForNewLedger(retries: Int): Future[Unit] =
def waitForNewLedger(retries: Int): Future[String] =
if (retries <= 0)
Future.failed(new RuntimeException("waitForNewLedger: out of retries"))
else {
ledgerIdentityService
.getLedgerIdentity(GetLedgerIdentityRequest())
.flatMap { _ =>
.flatMap { resp =>
// TODO(JM): Could check that ledger-id has changed. However,
// the tests use a static ledger-id...
Future.successful(())
Future.successful(resp.ledgerId)
}
.recoverWith {
case _: StatusRuntimeException =>
@@ -101,8 +104,8 @@ trait LedgerContext {
}
for {
_ <- resetService.reset(ResetRequest(ledgerId))
_ <- waitForNewLedger(10)
} yield ()
newLedgerId <- waitForNewLedger(10)
} yield newLedgerId
}
/**

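For callers, the signature change means `reset()` now hands back the identity of the freshly started ledger instead of `Unit`. A minimal sketch of test-side usage, assuming a `LedgerContext` from the test fixture (`resetAndVerify` is an illustrative name, not part of the commit):

```scala
// Hypothetical test-side usage of the new reset() contract; `ctx`,
// `system` and `mat` are assumed to come from the test fixture.
import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}

def resetAndVerify(ctx: LedgerContext)(
    implicit system: ActorSystem,
    mat: Materializer): Future[Unit] = {
  implicit val ec: ExecutionContext = mat.executionContext
  ctx.reset().map { newLedgerId =>
    // With a static ledger id (see the TODO above) the id is unchanged;
    // with a random one, a test could assert here that it differs.
    ()
  }
}
```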
View File

@@ -0,0 +1,157 @@
// Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.ledger.server.LedgerApiServer
import akka.stream.ActorMaterializer
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.engine.{Engine, EngineInfo}
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionEndRequest
import com.digitalasset.ledger.api.v1.ledger_configuration_service.LedgerConfiguration
import com.digitalasset.ledger.backend.api.v1.LedgerBackend
import com.digitalasset.ledger.client.services.commands.CommandSubmissionFlow
import com.digitalasset.platform.api.grpc.GrpcApiUtil
import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext}
import com.digitalasset.platform.sandbox.services.transaction.SandboxTransactionService
import com.digitalasset.platform.sandbox.services._
import com.digitalasset.platform.sandbox.stores.ledger.CommandExecutorImpl
import com.digitalasset.platform.server.api.validation.IdentifierResolver
import com.digitalasset.platform.server.services.command.ReferenceCommandService
import com.digitalasset.platform.server.services.identity.LedgerIdentityServiceImpl
import com.digitalasset.platform.server.services.testing.{ReferenceTimeService, TimeServiceBackend}
import com.digitalasset.platform.services.time.TimeProviderType
import io.grpc.BindableService
import io.grpc.protobuf.services.ProtoReflectionService
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
trait ApiServices extends AutoCloseable {
val services: Iterable[BindableService]
}
private class ApiServicesBundle(val services: Iterable[BindableService]) extends ApiServices {
override def close(): Unit =
services.foreach {
case closeable: AutoCloseable => closeable.close()
case _ => ()
}
}
object ApiServices {
private val logger = LoggerFactory.getLogger(this.getClass)
//TODO: we can split this into two later
//TODO: we could create an easy combinator for merging them
def create(
config: SandboxConfig,
ledgerBackend: LedgerBackend,
engine: Engine,
timeProvider: TimeProvider,
optTimeServiceBackend: Option[TimeServiceBackend])(
implicit mat: ActorMaterializer,
esf: ExecutionSequencerFactory): ApiServices = {
implicit val ec: ExecutionContext = mat.system.dispatcher
val context = SandboxContext.fromConfig(config)
val packageResolver = (pkgId: Ref.PackageId) =>
Future.successful(context.packageContainer.getPackage(pkgId))
val identifierResolver: IdentifierResolver = new IdentifierResolver(packageResolver)
val submissionService =
SandboxSubmissionService.createApiService(
context.packageContainer,
identifierResolver,
ledgerBackend,
config.timeModel,
timeProvider,
new CommandExecutorImpl(engine, context.packageContainer)
)
logger.info(EngineInfo.show)
val transactionService =
SandboxTransactionService.createApiService(ledgerBackend, identifierResolver)
val identityService = LedgerIdentityServiceImpl(ledgerBackend.ledgerId)
val packageService = SandboxPackageService(context.sandboxTemplateStore, ledgerBackend.ledgerId)
val configurationService =
LedgerConfigurationServiceImpl(
LedgerConfiguration(
Some(GrpcApiUtil.durationToProto(config.timeModel.minTtl)),
Some(GrpcApiUtil.durationToProto(config.timeModel.maxTtl))),
ledgerBackend.ledgerId
)
val completionService =
SandboxCommandCompletionService(ledgerBackend)
val commandService = ReferenceCommandService(
ReferenceCommandService.Configuration(
ledgerBackend.ledgerId,
config.commandConfig.inputBufferSize,
config.commandConfig.maxParallelSubmissions,
config.commandConfig.maxCommandsInFlight,
config.commandConfig.limitMaxCommandsInFlight,
config.commandConfig.historySize,
config.commandConfig.retentionPeriod,
config.commandConfig.commandTtl
),
// Using local services skips the gRPC layer, improving performance.
ReferenceCommandService.LowLevelCommandServiceAccess.LocalServices(
CommandSubmissionFlow(
submissionService.submit,
config.commandConfig.maxParallelSubmissions),
r =>
completionService.service
.asInstanceOf[SandboxCommandCompletionService]
.completionStreamSource(r),
() => completionService.completionEnd(CompletionEndRequest(ledgerBackend.ledgerId)),
transactionService.getTransactionById,
transactionService.getFlatTransactionById
)
)
val activeContractsService =
SandboxActiveContractsService(ledgerBackend, identifierResolver)
val reflectionService = ProtoReflectionService.newInstance()
val timeServiceOpt =
optTimeServiceBackend.map { tsb =>
ReferenceTimeService(
ledgerBackend.ledgerId,
tsb,
config.timeProviderType == TimeProviderType.StaticAllowBackwards
)
}
new ApiServicesBundle(
timeServiceOpt.toList :::
List(
identityService,
packageService,
configurationService,
submissionService,
transactionService,
completionService,
commandService,
activeContractsService,
reflectionService
)) {
override def close(): Unit = {
super.close()
ledgerBackend.close()
}
}
}
}

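The second TODO above asks for a combinator to merge service bundles. One possible shape, written against the `ApiServices` trait from this file (`combine` is a sketch, not part of the commit):

```scala
// Hypothetical combinator for the TODO above; not part of this commit.
import io.grpc.BindableService

def combine(a: ApiServices, b: ApiServices): ApiServices =
  new ApiServices {
    override val services: Iterable[BindableService] = a.services ++ b.services
    override def close(): Unit =
      // Close both bundles even if the first close() throws.
      try a.close()
      finally b.close()
  }
```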
View File

@@ -10,32 +10,20 @@ import java.util.concurrent.TimeUnit
import akka.stream.ActorMaterializer
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.engine.{Engine, EngineInfo}
import com.digitalasset.grpc.adapter.AkkaExecutionSequencerPool
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionEndRequest
import com.digitalasset.ledger.api.v1.ledger_configuration_service.LedgerConfiguration
import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.digitalasset.ledger.backend.api.v1.LedgerBackend
import com.digitalasset.ledger.client.services.commands.CommandSubmissionFlow
import com.digitalasset.platform.api.grpc.GrpcApiUtil
import com.digitalasset.platform.sandbox.config.{SandboxConfig, SandboxContext}
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.services._
import com.digitalasset.platform.sandbox.services.transaction.SandboxTransactionService
import com.digitalasset.platform.sandbox.stores.ledger._
import com.digitalasset.platform.server.api.validation.IdentifierResolver
import com.digitalasset.platform.server.services.command.ReferenceCommandService
import com.digitalasset.platform.server.services.identity.LedgerIdentityServiceImpl
import com.digitalasset.platform.server.services.testing.{ReferenceTimeService, TimeServiceBackend}
import com.digitalasset.platform.services.time.TimeProviderType
import com.digitalasset.platform.server.services.testing.TimeServiceBackend
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.{BindableService, Server}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import io.netty.util.concurrent.DefaultThreadFactory
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Future, Promise}
import scala.util.control.NoStackTrace
object LedgerApiServer {
@@ -46,34 +34,33 @@ object LedgerApiServer {
config: SandboxConfig,
//even though the port is in the config as well, after a reset we have to keep the port that was originally assigned on first start
serverPort: Int,
optTimeServiceBackend: Option[TimeServiceBackend],
optResetService: Option[SandboxResetService])(
timeServiceBackend: Option[TimeServiceBackend],
resetService: Option[SandboxResetService])(
implicit mat: ActorMaterializer): LedgerApiServer = {
new LedgerApiServer(
ledgerBackend,
timeProvider,
engine,
(am: ActorMaterializer, esf: ExecutionSequencerFactory) =>
ApiServices.create(config, ledgerBackend, engine, timeProvider, timeServiceBackend)(
am,
esf),
config,
serverPort,
optTimeServiceBackend,
optResetService,
timeServiceBackend,
resetService,
config.address,
config.tlsConfig.flatMap(_.server)
).start()
)
}
}
class LedgerApiServer(
ledgerBackend: LedgerBackend,
timeProvider: TimeProvider,
engine: Engine,
createApiServices: (ActorMaterializer, ExecutionSequencerFactory) => ApiServices,
config: SandboxConfig,
serverPort: Int,
optTimeServiceBackend: Option[TimeServiceBackend],
optResetService: Option[SandboxResetService],
addressOption: Option[String],
maybeBundle: Option[SslContext] = None)(implicit mat: ActorMaterializer)
timeServiceBackend: Option[TimeServiceBackend],
resetService: Option[SandboxResetService],
address: Option[String],
sslContext: Option[SslContext] = None)(implicit mat: ActorMaterializer)
extends AutoCloseable {
class UnableToBind(port: Int, cause: Throwable)
@@ -83,11 +70,6 @@ class LedgerApiServer(
cause)
with NoStackTrace
private val logger = LoggerFactory.getLogger(this.getClass)
private var actualPort
: Int = -1 // we need this to remember ephemeral ports when using ResetService
def port: Int = if (actualPort == -1) serverPort else actualPort
private implicit val serverEsf = new AkkaExecutionSequencerPool(
// NOTE(JM): Pick a unique pool name as we want to allow multiple ledger api server
// instances, and it's pretty difficult to wait for the name to become available
@@ -97,20 +79,26 @@ class LedgerApiServer(
poolName = s"ledger-api-server-rs-grpc-bridge-${UUID.randomUUID}",
actorCount = Runtime.getRuntime.availableProcessors() * 8
)(mat.system)
private val serverEventLoopGroup = createEventLoopGroup(mat.system.name)
private var runningServices: Iterable[BindableService] =
services(config, ledgerBackend, engine, timeProvider, optTimeServiceBackend)
private val apiServices = createApiServices(mat, serverEsf)
def getServer = server
private val logger = LoggerFactory.getLogger(this.getClass)
@volatile
private var actualPort
: Int = -1 // we need this to remember ephemeral ports when using ResetService
def port: Int = if (actualPort == -1) serverPort else actualPort
private var server: Server = _
private val grpcServer: Server = startServer()
private def start(): LedgerApiServer = {
val builder = addressOption.fold(NettyServerBuilder.forPort(port))(address =>
def getServer = grpcServer
private def startServer() = {
val builder = address.fold(NettyServerBuilder.forPort(port))(address =>
NettyServerBuilder.forAddress(new InetSocketAddress(address, port)))
maybeBundle
sslContext
.fold {
logger.info("Starting plainText server")
} { sslContext =>
@@ -122,26 +110,18 @@ class LedgerApiServer(
builder.workerEventLoopGroup(serverEventLoopGroup)
builder.permitKeepAliveTime(10, TimeUnit.SECONDS)
builder.permitKeepAliveWithoutCalls(true)
server = optResetService.toList
.foldLeft(runningServices.foldLeft(builder)(_ addService _))(_ addService _)
val grpcServer = resetService.toList
.foldLeft(apiServices.services.foldLeft(builder)(_ addService _))(_ addService _)
.build
try {
server.start()
actualPort = server.getPort
grpcServer.start()
actualPort = grpcServer.getPort
} catch {
case io: IOException if io.getCause != null && io.getCause.isInstanceOf[BindException] =>
throw new UnableToBind(port, io.getCause)
}
logger.info(s"listening on ${addressOption.getOrElse("localhost")}:${server.getPort}")
this
}
def closeAllServices(): Unit = {
runningServices.foreach {
case closeable: AutoCloseable => closeable.close()
case _ => ()
}
runningServices = Nil
logger.info(s"listening on ${address.getOrElse("localhost")}:${grpcServer.getPort}")
grpcServer
}
private def createEventLoopGroup(threadPoolName: String): NioEventLoopGroup = {
@@ -151,13 +131,22 @@
new NioEventLoopGroup(parallelism, threadFactory)
}
private val servicesClosedP = Promise[Unit]()
/** Returns a future that completes when all services have been closed during shutdown. */
def servicesClosed(): Future[Unit] = servicesClosedP.future
override def close(): Unit = {
closeAllServices()
ledgerBackend.close()
Option(server).foreach { s =>
s.shutdownNow()
s.awaitTermination(10, TimeUnit.SECONDS)
server = null
apiServices.close()
servicesClosedP.success(())
grpcServer.shutdown()
if (!grpcServer.awaitTermination(10L, TimeUnit.SECONDS)) {
logger.warn(
"Server did not terminate gracefully in one second. " +
"Clients probably did not disconnect. " +
"Proceeding with forced termination.")
grpcServer.shutdownNow()
}
// `shutdownGracefully` has a "quiet period" which specifies a time window in which
// _no requests_ must be witnessed before shutdown is _initiated_. Here we want to
@@ -167,107 +156,10 @@
// no quiet period, this can also be 0.
// See <https://netty.io/4.1/api/io/netty/util/concurrent/EventExecutorGroup.html#shutdownGracefully-long-long-java.util.concurrent.TimeUnit->.
// The 10 seconds to wait is sort of arbitrary, it's long enough to be noticeable though.
Option(serverEventLoopGroup).foreach(
_.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).await(10L, TimeUnit.SECONDS))
Option(serverEsf).foreach(_.close())
serverEventLoopGroup
.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
.await(10L, TimeUnit.SECONDS)
serverEsf.close()
}
private def services(
config: SandboxConfig,
ledgerBackend: LedgerBackend,
engine: Engine,
timeProvider: TimeProvider,
optTimeServiceBackend: Option[TimeServiceBackend]): List[BindableService] = {
implicit val ec: ExecutionContext = mat.system.dispatcher
val context = SandboxContext.fromConfig(config)
val packageResolver = (pkgId: Ref.PackageId) =>
Future.successful(context.packageContainer.getPackage(pkgId))
val identifierResolver: IdentifierResolver = new IdentifierResolver(packageResolver)
val submissionService =
SandboxSubmissionService.createApiService(
context.packageContainer,
identifierResolver,
ledgerBackend,
config.timeModel,
timeProvider,
new CommandExecutorImpl(engine, context.packageContainer)
)
logger.info(EngineInfo.show)
val transactionService =
SandboxTransactionService.createApiService(ledgerBackend, identifierResolver)
val identityService = LedgerIdentityServiceImpl(ledgerBackend.ledgerId)
val packageService = SandboxPackageService(context.sandboxTemplateStore, ledgerBackend.ledgerId)
val configurationService =
LedgerConfigurationServiceImpl(
LedgerConfiguration(
Some(GrpcApiUtil.durationToProto(config.timeModel.minTtl)),
Some(GrpcApiUtil.durationToProto(config.timeModel.maxTtl))),
ledgerBackend.ledgerId
)
val completionService =
SandboxCommandCompletionService(ledgerBackend)
val commandService = ReferenceCommandService(
ReferenceCommandService.Configuration(
ledgerBackend.ledgerId,
config.commandConfig.inputBufferSize,
config.commandConfig.maxParallelSubmissions,
config.commandConfig.maxCommandsInFlight,
config.commandConfig.limitMaxCommandsInFlight,
config.commandConfig.historySize,
config.commandConfig.retentionPeriod,
config.commandConfig.commandTtl
),
// Using local services skips the gRPC layer, improving performance.
ReferenceCommandService.LowLevelCommandServiceAccess.LocalServices(
CommandSubmissionFlow(
submissionService.submit,
config.commandConfig.maxParallelSubmissions),
r =>
completionService.service
.asInstanceOf[SandboxCommandCompletionService]
.completionStreamSource(r),
() => completionService.completionEnd(CompletionEndRequest(ledgerBackend.ledgerId)),
transactionService.getTransactionById,
transactionService.getFlatTransactionById
)
)
val activeContractsService =
SandboxActiveContractsService(ledgerBackend, identifierResolver)
val reflectionService = ProtoReflectionService.newInstance()
val timeServiceOpt =
optTimeServiceBackend.map { tsb =>
ReferenceTimeService(
ledgerBackend.ledgerId,
tsb,
config.timeProviderType == TimeProviderType.StaticAllowBackwards
)
}
timeServiceOpt.toList :::
List(
identityService,
packageService,
configurationService,
submissionService,
transactionService,
completionService,
commandService,
activeContractsService,
reflectionService
)
}
}

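The quiet-period comment in `close()` above is the subtle part of the teardown. As a standalone illustration, the same Netty call with its parameters spelled out (the values mirror the ones used in `close()`):

```scala
import java.util.concurrent.TimeUnit
import io.netty.channel.nio.NioEventLoopGroup

val group = new NioEventLoopGroup()
group
  .shutdownGracefully(
    0L, // quiet period: window that must see no new tasks before shutdown initiates
    0L, // shutdown timeout: with no quiet period to honour, this can also be 0
    TimeUnit.MILLISECONDS)
  .await(10L, TimeUnit.SECONDS) // bound how long close() blocks on the event loop
```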
View File

@@ -51,15 +51,17 @@ object SandboxApplication {
/** the reset service is special, since it triggers a server shutdown */
private val resetService: SandboxResetService = new SandboxResetService(
() => ledgerId,
() => server.getServer,
() => materializer.executionContext,
() => {
stopHeartbeats()
server.closeAllServices()
},
() => {
server.close() // fully tear down the old server.
buildAndStartServer(SqlStartMode.AlwaysReset)
//need to run this async, otherwise the callback would kill the server while the reset request itself is still in flight!
Future {
server.close() // fully tear down the old server.
buildAndStartServer(SqlStartMode.AlwaysReset)
}(materializer.executionContext)
server.servicesClosed()
},
)

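The comment above is the heart of the race fix: the teardown must never run on the thread that is serving the reset request. Reduced to a sketch with hypothetical parameter names:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Sketch of the callback shape wired into SandboxResetService below;
// `closeAndRestart` and `servicesClosed` are illustrative names.
def resetCallback(
    closeAndRestart: () => Unit,         // e.g. server.close() + buildAndStartServer(...)
    servicesClosed: () => Future[Unit])( // e.g. server.servicesClosed()
    implicit ec: ExecutionContext): Future[Unit] = {
  // Future(...) hops to another thread, so the in-flight reset request
  // is not killed by the very teardown it triggered.
  Future(closeAndRestart())
  servicesClosed()
}
```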
View File

@@ -2,31 +2,29 @@
// SPDX-License-Identifier: Apache-2.0
package com.digitalasset.platform.sandbox.services
import java.util.concurrent.TimeUnit
import com.digitalasset.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc}
import com.digitalasset.platform.common.util.DirectExecutionContext
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
import com.digitalasset.platform.server.api.validation.CommandValidations
import com.google.protobuf.empty.Empty
import io.grpc.{BindableService, Server, ServerServiceDefinition, Status}
import io.grpc.{BindableService, ServerServiceDefinition}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
class SandboxResetService(
getLedgerId: () => String,
getServer: () => Server,
getEc: () => ExecutionContext,
closeAllServices: () => Unit,
resetAndStartServer: () => Unit)
// returns a Future that fires when all services have been closed
resetAndStartServer: () => Future[Unit])
extends ResetServiceGrpc.ResetService
with BindableService {
private val logger = LoggerFactory.getLogger(this.getClass)
override def bindService(): ServerServiceDefinition =
ResetServiceGrpc.bindService(this, DirectExecutionContext)
ResetServiceGrpc.bindService(this, DE)
override def reset(request: ResetRequest): Future[Empty] = {
@@ -38,39 +36,21 @@ class SandboxResetService(
// * ...but not before serving the response to the reset request itself, which we've already done.
CommandValidations
.matchLedgerId(getLedgerId())(request.ledgerId)
.fold(
Future.failed[Empty], { _ =>
Option(getServer())
.fold(Future.failed[Empty](
new ApiException(Status.ABORTED.withDescription("Server is not live"))))({ server =>
actuallyReset(server)
Future.successful(new Empty())
})
}
)
.fold(Future.failed[Empty], { _ =>
actuallyReset().map(_ => Empty())(DE)
})
}
private def actuallyReset(server: Server) = {
logger.info("Initiating server reset.")
server.shutdown()
logger.info("Closing all services...")
closeAllServices()
private def actuallyReset() = {
logger.info("Initiating server reset.")
val servicesAreDown = Promise[Unit]()
// We need to run this asynchronously, since otherwise we have a deadlock: `buildAndStartServer` will block
// until all in-flight requests have been served, so we schedule it on another thread so that
// the code that clears the in-flight requests is not itself part of an in-flight request.
getEc().execute({ () =>
logger.info(s"Awaiting termination...")
if (!server.awaitTermination(1L, TimeUnit.SECONDS)) {
logger.warn(
"Server did not terminate gracefully in one second. " +
"Clients probably did not disconnect. " +
"Proceeding with forced termination.")
server.shutdownNow()
}
logger.info(s"Rebuilding server...")
resetAndStartServer()
logger.info(s"Server reset complete.")
logger.info(s"Stopping and starting the server.")
servicesAreDown.completeWith(resetAndStartServer())
})
servicesAreDown.future
}
}
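Seen from a client, the resulting handshake is: send the reset, receive `Empty` before the server goes down, then poll the identity service until the ledger answers again. A sketch of that view (`resetLedger` is illustrative, not part of the commit):

```scala
import com.digitalasset.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc}
import com.google.protobuf.empty.Empty
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical client-side view of the reset handshake.
def resetLedger(
    resetService: ResetServiceGrpc.ResetService,
    ledgerId: String)(implicit ec: ExecutionContext): Future[Unit] =
  resetService
    .reset(ResetRequest(ledgerId))
    .map { (_: Empty) =>
      // The Empty response arrives before the server restarts; callers still
      // have to poll LedgerIdentityService until it answers again, which is
      // exactly what LedgerContext.reset() in the first file does.
      ()
    }
```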