Sandbox reset 'ledger ID not found' fix (#2331)

* Sandbox reset 'ledger ID not found' fix

* Improve Haskell Ledger bindings tests

- Add an interceptor to Ledger Sandbox rejecting calls during reset
- Wait for new Sandbox - retrying getLedgerId calls in Haskell's ResetService - retry on errors and when Ledger IDs don't change

* satisfy hlint

* backout unnecessary haskell changes

* fmt.sh
This commit is contained in:
Michał Majcherski 2019-08-06 14:42:16 +02:00 committed by nickchapman-da
parent 72a7bc63a6
commit 86bec619d0
4 changed files with 42 additions and 12 deletions

View File

@ -180,6 +180,7 @@ object LedgerContext {
ledgerIdentityService
.getLedgerIdentity(GetLedgerIdentityRequest())
.flatMap { resp =>
// TODO: compare with current Ledger ID and retry when not changed
Future.successful(domain.LedgerId(resp.ledgerId))
}
.recoverWith {

View File

@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit
import akka.stream.ActorMaterializer
import com.digitalasset.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import io.grpc.netty.NettyServerBuilder
import io.grpc.ServerInterceptor
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import io.netty.util.concurrent.DefaultThreadFactory
@ -36,7 +37,9 @@ object LedgerApiServer {
desiredPort: Int,
maxInboundMessageSize: Int,
address: Option[String],
sslContext: Option[SslContext] = None)(implicit mat: ActorMaterializer): Future[ApiServer] = {
sslContext: Option[SslContext] = None,
interceptors: List[ServerInterceptor] = List.empty)(
implicit mat: ActorMaterializer): Future[ApiServer] = {
val serverEsf = new AkkaExecutionSequencerPool(
// NOTE(JM): Pick a unique pool name as we want to allow multiple ledger api server
@ -55,7 +58,8 @@ object LedgerApiServer {
desiredPort,
maxInboundMessageSize,
address,
sslContext
sslContext,
interceptors
)
/** returns the api port the server is listening on */
@ -79,7 +83,8 @@ private class LedgerApiServer(
desiredPort: Int,
maxInboundMessageSize: Int,
address: Option[String],
sslContext: Option[SslContext] = None)(implicit mat: ActorMaterializer)
sslContext: Option[SslContext] = None,
interceptors: List[ServerInterceptor] = List.empty)(implicit mat: ActorMaterializer)
extends ApiServer {
private val logger = LoggerFactory.getLogger(this.getClass)
@ -118,6 +123,7 @@ private class LedgerApiServer(
builder.permitKeepAliveTime(10, TimeUnit.SECONDS)
builder.permitKeepAliveWithoutCalls(true)
builder.maxInboundMessageSize(maxInboundMessageSize)
interceptors.foreach(builder.intercept)
val grpcServer = apiServices.services.foldLeft(builder)(_ addService _).build
try {
grpcServer.start()

View File

@ -122,6 +122,7 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
packageStore: InMemoryPackageStore)
extends AutoCloseable {
override def close(): Unit = {
// FIXME: extra close - when closed during reset close is called on already closed service causing an exception!
apiServerState.close()
infra.close()
}
@ -130,6 +131,7 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
implicit val ec: ExecutionContext = sandboxState.infra.executionContext
val apiServicesClosed = apiServerState.apiServer.servicesClosed()
//need to run this async otherwise the callback kills the server under the in-flight reset service request!
Future {
apiServerState.close // fully tear down the old server
//TODO: eliminate the state mutation somehow
@ -148,8 +150,8 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
def port: Int = sandboxState.apiServerState.port
/** the reset service is special, since it triggers a server shutdown */
private val resetService: SandboxResetService = new SandboxResetService(
() => sandboxState.apiServerState.ledgerId,
private def resetService(ledgerId: LedgerId): SandboxResetService = new SandboxResetService(
ledgerId,
() => sandboxState.infra.executionContext,
() => sandboxState.resetAndRestartServer()
)
@ -235,12 +237,13 @@ class SandboxServer(actorSystemName: String, config: => SandboxConfig) extends A
indexAndWriteService.publishHeartbeat
))
)(am, esf)
.map(_.withServices(List(resetService))),
.map(_.withServices(List(resetService(ledgerId)))),
// NOTE(JM): Re-use the same port after reset.
Option(sandboxState).fold(config.port)(_.apiServerState.port),
config.maxInboundMessageSize,
config.address,
config.tlsConfig.flatMap(_.server)
config.tlsConfig.flatMap(_.server),
List(resetService(ledgerId))
),
asyncTolerance
)

View File

@ -8,20 +8,24 @@ import com.digitalasset.ledger.api.v1.testing.reset_service.{ResetRequest, Reset
import com.digitalasset.platform.common.util.{DirectExecutionContext => DE}
import com.digitalasset.platform.server.api.validation.ErrorFactories
import com.google.protobuf.empty.Empty
import io.grpc.{BindableService, ServerServiceDefinition}
import io.grpc._
import io.grpc.ServerCall.Listener
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future, Promise}
class SandboxResetService(
getLedgerId: () => LedgerId,
ledgerId: LedgerId,
getEc: () => ExecutionContext,
resetAndRestartServer: () => Future[Unit])
extends ResetServiceGrpc.ResetService
with BindableService {
with BindableService
with ServerInterceptor {
private val logger = LoggerFactory.getLogger(this.getClass)
private val resetInitialized = new AtomicBoolean(false)
override def bindService(): ServerServiceDefinition =
ResetServiceGrpc.bindService(this, DE)
@ -33,7 +37,6 @@ class SandboxResetService(
// * serve the response to the reset request;
// * then, close all the services so hopefully the graceful shutdown will terminate quickly...
// * ...but not before serving the request to the reset request itself, which we've already done.
val ledgerId = getLedgerId()
Either
.cond(
ledgerId == LedgerId(request.ledgerId),
@ -44,8 +47,25 @@ class SandboxResetService(
})
}
override def interceptCall[ReqT, RespT](
serverCall: ServerCall[ReqT, RespT],
metadata: Metadata,
serverCallHandler: ServerCallHandler[ReqT, RespT]): Listener[ReqT] = {
if (resetInitialized.get) {
throw new StatusRuntimeException(
Status.UNAVAILABLE.withDescription("Sandbox server is currently being resetted"))
}
serverCallHandler.startCall(serverCall, metadata)
}
private def actuallyReset() = {
logger.info("Initiating server reset.")
if (!resetInitialized.compareAndSet(false, true))
throw new StatusRuntimeException(
Status.FAILED_PRECONDITION.withDescription("Sandbox server is currently being resetted"))
val servicesAreDown = Promise[Unit]()
// We need to run this asynchronously since otherwise we have a deadlock: `buildAndStartServer` will block
// until all the in flight requests have been served, so we need to schedule this in another thread so that