Robust stoppage of the Language Server (#826)

This commit is contained in:
Łukasz Olczak 2020-06-10 15:51:38 +02:00 committed by GitHub
parent 765d08bc79
commit 214cf164c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 146 additions and 59 deletions

View File

@ -9,8 +9,8 @@ import org.enso.languageserver.boot.LifecycleComponent.{
ComponentStopped
}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
* A lifecycle component used to start and stop a Language Server.
@ -30,7 +30,7 @@ class LanguageServerComponent(config: LanguageServerConfig)
override def start(): Future[ComponentStarted.type] = {
logger.info("Starting Language Server...")
for {
module <- Future.successful(new MainModule(config))
module <- Future { new MainModule(config) }
jsonBinding <- module.jsonRpcServer.bind(config.interface, config.rpcPort)
binaryBinding <- module.binaryServer
.bind(config.interface, config.dataPort)
@ -52,40 +52,49 @@ class LanguageServerComponent(config: LanguageServerConfig)
case None =>
Future.failed(new Exception("Server isn't running"))
case Some(serverState) =>
case Some(serverContext) =>
for {
_ <- serverState.jsonBinding.terminate(10.seconds)
_ <- serverState.binaryBinding.terminate(10.seconds)
_ <- serverState.mainModule.system.terminate()
_ <- Future { serverState.mainModule.context.close(true) }
_ <- terminateAkka(serverContext)
_ <- terminateTruffle(serverContext)
_ <- Future { maybeServerCtx = None }
} yield ComponentStopped
}
private def terminateAkka(serverContext: ServerContext): Future[Unit] = {
for {
_ <- serverContext.jsonBinding.terminate(2.seconds).recover(logError)
_ <- Future { logger.info("Terminated json connections") }
_ <- serverContext.binaryBinding.terminate(2.seconds).recover(logError)
_ <- Future { logger.info("Terminated binary connections") }
_ <- Await
.ready(
serverContext.mainModule.system.terminate().recover(logError),
2.seconds
)
.recover(logError)
_ <- Future { logger.info("Terminated actor system") }
} yield ()
}
private def terminateTruffle(serverContext: ServerContext): Future[Unit] = {
val killFiber =
Future {
serverContext.mainModule.context.close(true)
}
for {
_ <- Await.ready(killFiber, 5.seconds).recover(logError)
_ <- Future { logger.info("Terminated truffle context") }
} yield ()
}
/** @inheritdoc **/
override def restart(): Future[ComponentRestarted.type] =
for {
_ <- forceStop()
_ <- stop()
_ <- start()
} yield ComponentRestarted
private def forceStop(): Future[Unit] = {
maybeServerCtx match {
case None =>
Future.successful(())
case Some(serverState) =>
for {
_ <- serverState.jsonBinding.terminate(10.seconds).recover(logError)
_ <- serverState.binaryBinding.terminate(10.seconds).recover(logError)
_ <- serverState.mainModule.system.terminate().recover(logError)
_ <- Future { serverState.mainModule.context.close(true) }
.recover(logError)
_ <- Future { maybeServerCtx = None }
} yield ()
}
}
private val logError: PartialFunction[Throwable, Unit] = {
case th => logger.error("An error occurred during stopping server", th)
}

View File

@ -43,6 +43,7 @@ project-manager {
io-timeout = 5 seconds
request-timeout = 10 seconds
boot-timeout = 40 seconds
shutdown-timeout = 15 seconds
}
tutorials {

View File

@ -79,7 +79,12 @@ class MainModule[F[+_, +_]: Sync: ErrorChannel: Exec: CovariantFlatMap: Async](
lazy val languageServerRegistry =
system.actorOf(
LanguageServerRegistry
.props(config.network, config.bootloader, config.supervision),
.props(
config.network,
config.bootloader,
config.supervision,
config.timeout
),
"language-server-registry"
)

View File

@ -50,7 +50,8 @@ object configuration {
case class TimeoutConfig(
ioTimeout: FiniteDuration,
requestTimeout: FiniteDuration,
bootTimeout: FiniteDuration
bootTimeout: FiniteDuration,
shutdownTimeout: FiniteDuration
)
/**

View File

@ -23,7 +23,8 @@ import org.enso.languageserver.boot.{
import org.enso.projectmanager.boot.configuration.{
BootloaderConfig,
NetworkConfig,
SupervisionConfig
SupervisionConfig,
TimeoutConfig
}
import org.enso.projectmanager.data.{LanguageServerSockets, Socket}
import org.enso.projectmanager.event.ClientEvent.ClientDisconnected
@ -35,7 +36,8 @@ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootL
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.{
Boot,
BootTimeout,
ServerDied
ServerDied,
ShutdownTimeout
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.ServerShutDown
@ -51,12 +53,15 @@ import scala.concurrent.duration._
* @param project a project open by the server
* @param networkConfig a net config
* @param bootloaderConfig a bootloader config
* @param supervisionConfig a supervision config
* @param timeoutConfig a timeout config
*/
class LanguageServerController(
project: Project,
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
supervisionConfig: SupervisionConfig,
timeoutConfig: TimeoutConfig
) extends Actor
with ActorLogging
with Stash
@ -178,8 +183,12 @@ class LanguageServerController(
): Unit = {
val updatedClients = clients - clientId
if (updatedClients.isEmpty) {
context.children.foreach(_ ! GracefulStop)
server.stop() pipeTo self
context.become(stopping(maybeRequester))
val cancellable =
context.system.scheduler
.scheduleOnce(timeoutConfig.shutdownTimeout, self, ShutdownTimeout)
context.become(stopping(cancellable, maybeRequester))
} else {
sender() ! CannotDisconnectOtherClients
context.become(supervising(config, server, updatedClients))
@ -192,19 +201,32 @@ class LanguageServerController(
stop()
}
private def stopping(maybeRequester: Option[ActorRef]): Receive = {
private def stopping(
cancellable: Cancellable,
maybeRequester: Option[ActorRef]
): Receive = {
case Failure(th) =>
cancellable.cancel()
log.error(
th,
s"An error occurred during Language server shutdown [$project]."
)
maybeRequester.foreach(_ ! FailureDuringStoppage(th))
maybeRequester.foreach(_ ! FailureDuringShutdown(th))
stop()
case ComponentStopped =>
cancellable.cancel()
log.info(s"Language server shut down successfully [$project].")
maybeRequester.foreach(_ ! ServerStopped)
stop()
case ShutdownTimeout =>
log.error("Language server shutdown timed out")
maybeRequester.foreach(_ ! ServerShutdownTimedOut)
stop()
case StartServer(_, _) =>
sender() ! PreviousInstanceNotShutDown
}
private def waitingForChildren(): Receive = {
@ -235,20 +257,24 @@ object LanguageServerController {
* @param project a project open by the server
* @param networkConfig a net config
* @param bootloaderConfig a bootloader config
* @param supervisionConfig a supervision config
* @param timeoutConfig a timeout config
* @return a configuration object
*/
def props(
project: Project,
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
supervisionConfig: SupervisionConfig,
timeoutConfig: TimeoutConfig
): Props =
Props(
new LanguageServerController(
project,
networkConfig,
bootloaderConfig,
supervisionConfig
supervisionConfig,
timeoutConfig
)
)
@ -262,6 +288,11 @@ object LanguageServerController {
*/
case object Boot
/**
* Signals shutdown timeout.
*/
case object ShutdownTimeout
case object ServerDied
}

View File

@ -48,6 +48,11 @@ object LanguageServerProtocol {
*/
case object ServerBootTimedOut extends ServerStartupFailure
/**
* Signals that previous instance of the server hasn't been shut down yet.
*/
case object PreviousInstanceNotShutDown extends ServerStartupFailure
/**
* Command to stop a server.
*
@ -57,37 +62,42 @@ object LanguageServerProtocol {
case class StopServer(clientId: UUID, projectId: UUID)
/**
* Base trait for server stoppage results.
* Base trait for server shutdown results.
*/
sealed trait ServerStoppageResult
sealed trait ServerShutdownResult
/**
* Signals that server stopped successfully.
*/
case object ServerStopped extends ServerStoppageResult
case object ServerStopped extends ServerShutdownResult
/**
* Base trait for server stoppage failures.
* Base trait for server shutdown failures.
*/
sealed trait ServerStoppageFailure extends ServerStoppageResult
sealed trait ServerShutdownFailure extends ServerShutdownResult
/**
* Signals that server shutdown timed out.
*/
case object ServerShutdownTimedOut extends ServerShutdownFailure
/**
* Signals that an exception was thrown during stopping a server.
*
* @param th an exception
*/
case class FailureDuringStoppage(th: Throwable) extends ServerStoppageFailure
case class FailureDuringShutdown(th: Throwable) extends ServerShutdownFailure
/**
* Signals that server wasn't started.
*/
case object ServerNotRunning extends ServerStoppageFailure
case object ServerNotRunning extends ServerShutdownFailure
/**
* Signals that server cannot be stopped, because other clients are connected
* to the server.
*/
case object CannotDisconnectOtherClients extends ServerStoppageFailure
case object CannotDisconnectOtherClients extends ServerShutdownFailure
/**
* Request to check is server is running.

View File

@ -6,7 +6,8 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
import org.enso.projectmanager.boot.configuration.{
BootloaderConfig,
NetworkConfig,
SupervisionConfig
SupervisionConfig,
TimeoutConfig
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{
CheckIfServerIsRunning,
@ -24,11 +25,14 @@ import org.enso.projectmanager.util.UnhandledLogging
*
* @param networkConfig a net config
* @param bootloaderConfig a bootloader config
* @param supervisionConfig a supervision config
* @param timeoutConfig a timeout config
*/
class LanguageServerRegistry(
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
supervisionConfig: SupervisionConfig,
timeoutConfig: TimeoutConfig
) extends Actor
with ActorLogging
with UnhandledLogging {
@ -44,7 +48,13 @@ class LanguageServerRegistry(
} else {
val controller = context.actorOf(
LanguageServerController
.props(project, networkConfig, bootloaderConfig, supervisionConfig),
.props(
project,
networkConfig,
bootloaderConfig,
supervisionConfig,
timeoutConfig
),
s"language-server-controller-${project.id}"
)
context.watch(controller)
@ -86,18 +96,22 @@ object LanguageServerRegistry {
*
* @param networkConfig a net config
* @param bootloaderConfig a bootloader config
* @param supervisionConfig a supervision config
* @param timeoutConfig a timeout config
* @return
*/
def props(
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
supervisionConfig: SupervisionConfig,
timeoutConfig: TimeoutConfig
): Props =
Props(
new LanguageServerRegistry(
networkConfig,
bootloaderConfig,
supervisionConfig
supervisionConfig,
timeoutConfig
)
)

View File

@ -48,15 +48,15 @@ class LanguageServerRegistryProxy[F[+_, +_]: Async: ErrorChannel: CovariantFlatM
override def stop(
clientId: UUID,
projectId: UUID
): F[ServerStoppageFailure, Unit] =
): F[ServerShutdownFailure, Unit] =
Async[F]
.fromFuture { () =>
(registry ? StopServer(clientId, projectId)).mapTo[ServerStoppageResult]
(registry ? StopServer(clientId, projectId)).mapTo[ServerShutdownResult]
}
.mapError(FailureDuringStoppage)
.mapError(FailureDuringShutdown)
.flatMap {
case ServerStopped => CovariantFlatMap[F].pure(())
case f: ServerStoppageFailure => ErrorChannel[F].fail(f)
case f: ServerShutdownFailure => ErrorChannel[F].fail(f)
}
/** @inheritdoc **/

View File

@ -5,8 +5,8 @@ import java.util.UUID
import org.enso.projectmanager.data.LanguageServerSockets
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{
CheckTimeout,
ServerStartupFailure,
ServerStoppageFailure
ServerShutdownFailure,
ServerStartupFailure
}
import org.enso.projectmanager.model.Project
@ -39,7 +39,7 @@ trait LanguageServerService[F[+_, +_]] {
def stop(
clientId: UUID,
projectId: UUID
): F[ServerStoppageFailure, Unit]
): F[ServerShutdownFailure, Unit]
/**
* Checks if server is running for project.

View File

@ -16,6 +16,7 @@ import org.enso.projectmanager.service.ProjectServiceApi
import org.enso.projectmanager.util.UnhandledLogging
import scala.annotation.unused
import scala.concurrent.duration._
/**
* An actor handling communications between a single client and the project
@ -43,7 +44,11 @@ class ClientController[F[+_, +_]: Exec](
ProjectOpen -> ProjectOpenHandler
.props[F](clientId, projectService, config.bootTimeout),
ProjectClose -> ProjectCloseHandler
.props[F](clientId, projectService, config.requestTimeout),
.props[F](
clientId,
projectService,
config.shutdownTimeout.plus(1.second)
),
ProjectListRecent -> ProjectListRecentHandler
.props[F](clientId, projectService, config.requestTimeout)
)

View File

@ -114,6 +114,12 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap](
languageServerService
.start(clientId, project)
.mapError {
case PreviousInstanceNotShutDown =>
ProjectOpenFailed(
"The previous instance of the Language Server hasn't been shut " +
"down yet."
)
case ServerBootTimedOut =>
ProjectOpenFailed("Language server boot timed out")
@ -130,7 +136,10 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap](
): F[ProjectServiceFailure, Unit] = {
log.debug(s"Closing project $projectId") *>
languageServerService.stop(clientId, projectId).mapError {
case FailureDuringStoppage(th) => ProjectCloseFailed(th.getMessage)
case ServerShutdownTimedOut =>
ProjectCloseFailed("Server shutdown timed out")
case FailureDuringShutdown(th) => ProjectCloseFailed(th.getMessage)
case ServerNotRunning => ProjectNotOpen
case CannotDisconnectOtherClients => ProjectOpenByOtherPeers
}

View File

@ -43,6 +43,7 @@ project-manager {
io-timeout = 5 seconds
request-timeout = 10 seconds
boot-timeout = 30 seconds
shutdown-timeout = 10 seconds
}
tutorials {

View File

@ -66,7 +66,8 @@ class BaseServerSpec extends JsonRpcServerTestKit {
lazy val bootloaderConfig = BootloaderConfig(3, 1.second)
lazy val timeoutConfig = TimeoutConfig(3.seconds, 3.seconds, 3.seconds)
lazy val timeoutConfig =
TimeoutConfig(3.seconds, 3.seconds, 3.seconds, 5.seconds)
lazy val netConfig = NetworkConfig("127.0.0.1", 40000, 60000)
@ -98,7 +99,7 @@ class BaseServerSpec extends JsonRpcServerTestKit {
lazy val languageServerRegistry =
system.actorOf(
LanguageServerRegistry
.props(netConfig, bootloaderConfig, supervisionConfig)
.props(netConfig, bootloaderConfig, supervisionConfig, timeoutConfig)
)
lazy val languageServerService =