From e966392cc8c22b30170ef3898806d74d410adc8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Olczak?= Date: Fri, 10 Apr 2020 12:11:15 +0200 Subject: [PATCH] Implementation of the Language Server Supervisor Process (#643) --- .github/workflows/scala.yml | 2 +- build.sbt | 18 +- .../src/main/resources/application.conf | 8 + .../enso/projectmanager/boot/MainModule.scala | 5 +- .../projectmanager/boot/configuration.scala | 20 +- .../org/enso/projectmanager/data/Socket.scala | 3 + .../enso/projectmanager/data/SocketData.scala | 3 - .../http/AkkaBasedWebSocketConnection.scala | 103 +++++++++ .../AkkaBasedWebSocketConnectionFactory.scala | 15 ++ .../infrastructure/http/FanOutReceiver.scala | 29 +++ .../http/WebSocketConnection.scala | 62 ++++++ .../http/WebSocketConnectionFactory.scala | 18 ++ .../languageserver/HeartbeatSession.scala | 179 +++++++++++++++ .../LanguageServerBootLoader.scala | 6 +- .../LanguageServerController.scala | 66 ++++-- .../LanguageServerProtocol.scala | 4 +- .../LanguageServerRegistry.scala | 19 +- .../LanguageServerRegistryProxy.scala | 4 +- .../LanguageServerService.scala | 4 +- .../LanguageServerSupervisor.scala | 190 ++++++++++++++++ .../languageserver/package.scala | 10 + .../protocol/ProjectManagementApi.scala | 4 +- .../requesthandler/ProjectOpenHandler.scala | 4 +- .../service/ProjectService.scala | 6 +- .../service/ProjectServiceApi.scala | 4 +- .../src/test/resources/logback-test.xml | 2 +- .../LanguageServerSupervisorSpec.scala | 203 ++++++++++++++++++ .../languageserver/PingMatcher.scala | 19 ++ .../ProgrammableWebSocketServer.scala | 78 +++++++ .../languageserver/StepParent.scala | 22 ++ .../protocol/BaseServerSpec.scala | 9 +- .../protocol/ProjectManagementApiSpec.scala | 6 +- doc/design/engine/engine-services.md | 31 +++ .../enso/languageserver/LanguageServer.scala | 3 + .../boot/LanguageServerComponent.scala | 58 +++-- .../boot/LifecycleComponent.scala | 56 +++++ .../capability/CapabilityRouter.scala | 4 + .../filemanager/FileManager.scala | 4 + .../monitoring/MonitoringApi.scala | 21 ++ .../monitoring/MonitoringProtocol.scala | 15 ++ .../protocol/ClientController.scala | 14 +- .../languageserver/protocol/JsonRpc.scala | 4 +- .../monitoring/PingHandler.scala | 71 ++++++ .../runtime/ContextRegistry.scala | 4 + .../languageserver/text/BufferRegistry.scala | 4 + .../monitoring/PingHandlerSpec.scala | 85 ++++++++ .../websocket/MonitoringTest.scala | 27 +++ project/build.properties | 2 +- 48 files changed, 1446 insertions(+), 82 deletions(-) create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/data/Socket.scala delete mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/data/SocketData.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnection.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnectionFactory.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/FanOutReceiver.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnection.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnectionFactory.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/HeartbeatSession.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisor.scala create mode 100644 common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/package.scala create mode 100644 common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisorSpec.scala create mode 100644 common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/PingMatcher.scala create mode 100644 common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/ProgrammableWebSocketServer.scala create mode 100644 common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/StepParent.scala create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/boot/LifecycleComponent.scala create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringApi.scala create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringProtocol.scala create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/requesthandler/monitoring/PingHandler.scala create mode 100644 engine/language-server/src/test/scala/org/enso/languageserver/requesthandler/monitoring/PingHandlerSpec.scala create mode 100644 engine/language-server/src/test/scala/org/enso/languageserver/websocket/MonitoringTest.scala diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 89610d8d079..87e25fc9df0 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -33,7 +33,7 @@ env: graalVersion: 20.0.0 javaVersion: java8 # Please ensure that this is in sync with project/build.properties - sbtVersion: 1.3.3 + sbtVersion: 1.3.5 jobs: diff --git a/build.sbt b/build.sbt index 47587398c96..8b2df2b04db 100644 --- a/build.sbt +++ b/build.sbt @@ -361,14 +361,16 @@ lazy val project_manager = (project in file("common/project-manager")) libraryDependencies ++= akka, libraryDependencies ++= circe, libraryDependencies ++= Seq( - "com.typesafe" % "config" % "1.4.0", - "com.github.pureconfig" %% "pureconfig" % "0.12.2", - "ch.qos.logback" % "logback-classic" % "1.2.3", - "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", - "dev.zio" %% "zio" % "1.0.0-RC18-2", - "dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC12", - "commons-io" % "commons-io" % "2.6", - "com.beachape" %% "enumeratum-circe" % "1.5.23" + "com.typesafe" % "config" % "1.4.0", + "com.github.pureconfig" %% "pureconfig" % "0.12.2", + "ch.qos.logback" % "logback-classic" % "1.2.3", + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", + "dev.zio" %% "zio" % "1.0.0-RC18-2", + "dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC12", + "commons-io" % "commons-io" % "2.6", + "com.beachape" %% "enumeratum-circe" % "1.5.23", + "com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % Test, + "org.mockito" %% "mockito-scala" % "1.13.7" % Test ), addCompilerPlugin( "org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full diff --git a/common/project-manager/src/main/resources/application.conf b/common/project-manager/src/main/resources/application.conf index 3df967ce316..74182f330c2 100644 --- a/common/project-manager/src/main/resources/application.conf +++ b/common/project-manager/src/main/resources/application.conf @@ -21,6 +21,14 @@ project-manager { delay-between-retry = 2 second } + supervision { + initial-delay = 5 seconds + heartbeat-interval = 15 seconds + heartbeat-timeout = 10 seconds + number-of-restarts = 5 + delay-between-restarts = 2 second + } + storage { projects-root = ${user.home}/enso projects-root=${?PROJECTS_ROOT} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/boot/MainModule.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/boot/MainModule.scala index d5afbe1ed92..3f43b30c5dd 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/boot/MainModule.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/boot/MainModule.scala @@ -78,8 +78,9 @@ class MainModule[F[+_, +_]: Sync: ErrorChannel: Exec: CovariantFlatMap: Async]( lazy val languageServerController = system.actorOf( - LanguageServerRegistry.props(config.network, config.bootloader), - "language-server-controller" + LanguageServerRegistry + .props(config.network, config.bootloader, config.supervision), + "language-server-registry" ) lazy val languageServerService = new LanguageServerRegistryProxy[F]( diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/boot/configuration.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/boot/configuration.scala index 8756ddd1e61..b9c4fb4934c 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/boot/configuration.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/boot/configuration.scala @@ -16,7 +16,8 @@ object configuration { storage: StorageConfig, timeout: TimeoutConfig, network: NetworkConfig, - bootloader: BootloaderConfig + bootloader: BootloaderConfig, + supervision: SupervisionConfig ) /** @@ -72,4 +73,21 @@ object configuration { delayBetweenRetry: FiniteDuration ) + /** + * A configuration object for supervisor properties. + * + * @param initialDelay a time that the supervisor wait before starts + * monitoring + * @param heartbeatInterval an interval between heartbeat sessions + * @param heartbeatTimeout a timeout for pong reply + * @param numberOfRestarts a maximum number of restarts + * @param delayBetweenRestarts a delay between server restarts + */ + case class SupervisionConfig( + initialDelay: FiniteDuration, + heartbeatInterval: FiniteDuration, + heartbeatTimeout: FiniteDuration, + numberOfRestarts: Int, + delayBetweenRestarts: FiniteDuration + ) } diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/data/Socket.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/data/Socket.scala new file mode 100644 index 00000000000..fcc826cf394 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/data/Socket.scala @@ -0,0 +1,3 @@ +package org.enso.projectmanager.data + +case class Socket(host: String, port: Int) diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/data/SocketData.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/data/SocketData.scala deleted file mode 100644 index 00a42883cc5..00000000000 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/data/SocketData.scala +++ /dev/null @@ -1,3 +0,0 @@ -package org.enso.projectmanager.data - -case class SocketData(host: String, port: Int) diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnection.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnection.scala new file mode 100644 index 00000000000..e24c6d0efa8 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnection.scala @@ -0,0 +1,103 @@ +package org.enso.projectmanager.infrastructure.http + +import akka.NotUsed +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws._ +import akka.pattern.pipe +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.{CompletionStrategy, OverflowStrategy} +import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnection._ +import org.enso.projectmanager.infrastructure.http.FanOutReceiver.Listen +import org.enso.projectmanager.infrastructure.http.WebSocketConnection.{ + WebSocketConnected, + WebSocketMessage, + WebSocketStreamClosed, + WebSocketStreamFailure +} + +/** + * An Akka-based bidirectional web socket connection. + * + * @param address a server address + * @param system an actor system + */ +class AkkaBasedWebSocketConnection(address: String)( + implicit system: ActorSystem +) extends WebSocketConnection { + + import system.dispatcher + + private val receiver = system.actorOf(Props(new FanOutReceiver)) + + private var outboundChannel: ActorRef = _ + + private val source: Source[Message, NotUsed] = Source + .actorRef[String]( + completionMatcher, + PartialFunction.empty, + 1, + OverflowStrategy.fail + ) + .mapMaterializedValue { actorRef => + outboundChannel = actorRef + NotUsed + } + .map { txt: String => + TextMessage(txt) + } + + private def completionMatcher: PartialFunction[Any, CompletionStrategy] = { + case CloseWebSocket => CompletionStrategy.immediately + } + + private val sink: Sink[Message, NotUsed] = Flow[Message] + .map { + case TextMessage.Strict(s) => WebSocketMessage(s) + } + .to( + Sink.actorRef[WebSocketMessage]( + receiver, + WebSocketStreamClosed, + WebSocketStreamFailure(_) + ) + ) + + private val flow = Flow.fromSinkAndSource(sink, source) + + /** @inheritdoc **/ + override def attachListener(listener: ActorRef): Unit = + receiver ! Listen(listener) + + /** @inheritdoc **/ + def connect(): Unit = { + val (future, _) = + Http() + .singleWebSocketRequest( + WebSocketRequest(address), + flow + ) + future + .map { + case ValidUpgrade(_, _) => + WebSocketConnected + + case InvalidUpgradeResponse(_, cause) => + WebSocketStreamFailure(new Exception(s"Cannot connect $cause")) + } + .pipeTo(receiver) + } + + /** @inheritdoc **/ + def send(message: String): Unit = outboundChannel ! message + + /** @inheritdoc **/ + def disconnect(): Unit = outboundChannel ! CloseWebSocket + +} + +object AkkaBasedWebSocketConnection { + + private object CloseWebSocket + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnectionFactory.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnectionFactory.scala new file mode 100644 index 00000000000..e4f035704da --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/AkkaBasedWebSocketConnectionFactory.scala @@ -0,0 +1,15 @@ +package org.enso.projectmanager.infrastructure.http +import akka.actor.ActorSystem +import org.enso.projectmanager.data.Socket + +/** + * A factory of Akka-based web socket connections. + */ +class AkkaBasedWebSocketConnectionFactory(implicit system: ActorSystem) + extends WebSocketConnectionFactory { + + /** @inheritdoc **/ + override def createConnection(socket: Socket): WebSocketConnection = + new AkkaBasedWebSocketConnection(s"ws://${socket.host}:${socket.port}") + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/FanOutReceiver.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/FanOutReceiver.scala new file mode 100644 index 00000000000..93bc0053f93 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/FanOutReceiver.scala @@ -0,0 +1,29 @@ +package org.enso.projectmanager.infrastructure.http + +import akka.actor.{Actor, ActorRef} +import org.enso.projectmanager.infrastructure.http.FanOutReceiver.Listen + +/** + * A fan-out receiver that delivers messages to multiple listeners. + */ +class FanOutReceiver extends Actor { + + override def receive: Receive = running() + + private def running(listeners: Set[ActorRef] = Set.empty): Receive = { + case Listen(listener) => context.become(running(listeners + listener)) + case msg => listeners.foreach(_ ! msg) + } + +} + +object FanOutReceiver { + + /** + * An attach listener command. + * + * @param listener a listener to attach + */ + case class Listen(listener: ActorRef) + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnection.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnection.scala new file mode 100644 index 00000000000..b7492512039 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnection.scala @@ -0,0 +1,62 @@ +package org.enso.projectmanager.infrastructure.http + +import akka.actor.ActorRef + +/** + * An abstraction representing web socket connection. + */ +trait WebSocketConnection { + + /** + * Connects to the server. + */ + def connect(): Unit + + /** + * Disconnects from the server. + */ + def disconnect(): Unit + + /** + * Sends a message to the server. + * + * @param message a message to sent + */ + def send(message: String): Unit + + /** + * Attaches a listener of incoming messages. + * + * @param listener a message listener for inbound channel + */ + def attachListener(listener: ActorRef): Unit + +} + +object WebSocketConnection { + + /** + * Signals that a connection was established. + */ + case object WebSocketConnected + + /** + * An envelope for text messages. + * + * @param payload a text message + */ + case class WebSocketMessage(payload: String) + + /** + * Signals that connection was closed. + */ + case object WebSocketStreamClosed + + /** + * Signals a connection failure. + * + * @param th a throwable + */ + case class WebSocketStreamFailure(th: Throwable) + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnectionFactory.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnectionFactory.scala new file mode 100644 index 00000000000..249718f5c80 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/http/WebSocketConnectionFactory.scala @@ -0,0 +1,18 @@ +package org.enso.projectmanager.infrastructure.http + +import org.enso.projectmanager.data.Socket + +/** + * Abstract connection factory. + */ +trait WebSocketConnectionFactory { + + /** + * Creates web socket connection. + * + * @param socket a server address + * @return a connection + */ + def createConnection(socket: Socket): WebSocketConnection + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/HeartbeatSession.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/HeartbeatSession.scala new file mode 100644 index 00000000000..74da022de67 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/HeartbeatSession.scala @@ -0,0 +1,179 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import java.util.UUID + +import akka.actor.{Actor, ActorLogging, Cancellable, Props, Scheduler} +import io.circe.parser._ +import org.enso.projectmanager.data.Socket +import org.enso.projectmanager.infrastructure.http.WebSocketConnection.{ + WebSocketConnected, + WebSocketMessage, + WebSocketStreamClosed, + WebSocketStreamFailure +} +import org.enso.projectmanager.infrastructure.http.WebSocketConnectionFactory +import org.enso.projectmanager.infrastructure.languageserver.HeartbeatSession.{ + HeartbeatTimeout, + SocketClosureTimeout +} +import org.enso.projectmanager.infrastructure.languageserver.LanguageServerSupervisor.ServerUnresponsive +import org.enso.projectmanager.util.UnhandledLogging + +import scala.concurrent.duration.FiniteDuration + +/** + * Implements one ping-pong session. + * + * @param socket a server socket + * @param timeout a session timeout + * @param connectionFactory a web socket connection factory + * @param scheduler a scheduler + */ +class HeartbeatSession( + socket: Socket, + timeout: FiniteDuration, + connectionFactory: WebSocketConnectionFactory, + scheduler: Scheduler +) extends Actor + with ActorLogging + with UnhandledLogging { + + import context.dispatcher + + private val requestId = UUID.randomUUID() + + private val connection = connectionFactory.createConnection(socket) + + override def preStart(): Unit = { + connection.attachListener(self) + connection.connect() + } + + override def receive: Receive = pingStage + + private def pingStage: Receive = { + case WebSocketConnected => + log.debug(s"Sending ping message to $socket") + connection.send(s""" + |{ + | "jsonrpc": "2.0", + | "method": "heartbeat/ping", + | "id": "$requestId", + | "params": null + |} + |""".stripMargin) + val cancellable = scheduler.scheduleOnce(timeout, self, HeartbeatTimeout) + context.become(pongStage(cancellable)) + + case WebSocketStreamFailure(th) => + log.error(s"An error occurred during connecting to websocket $socket", th) + context.parent ! ServerUnresponsive + stop() + + case GracefulStop => + stop() + } + + private def pongStage(cancellable: Cancellable): Receive = { + case WebSocketMessage(payload) => + val maybeJson = + parse(payload).flatMap(_.hcursor.downField("id").as[String]) + + maybeJson match { + case Left(error) => + log.error("An error occurred during parsing pong reply", error) + + case Right(id) => + if (id == requestId.toString) { + log.debug(s"Received correct pong message from $socket") + cancellable.cancel() + connection.disconnect() + val closureTimeout = + scheduler.scheduleOnce(timeout, self, SocketClosureTimeout) + context.become(socketClosureStage(closureTimeout)) + } else { + log.warning(s"Received unknown response $payload") + } + } + + case HeartbeatTimeout => + log.debug(s"Heartbeat timeout detected for $requestId") + context.parent ! ServerUnresponsive + connection.disconnect() + val closureTimeout = + scheduler.scheduleOnce(timeout, self, SocketClosureTimeout) + context.become(socketClosureStage(closureTimeout)) + + case WebSocketStreamClosed => + context.parent ! ServerUnresponsive + context.stop(self) + + case WebSocketStreamFailure(th) => + log.error(s"An error occurred during waiting for Pong message", th) + context.parent ! ServerUnresponsive + cancellable.cancel() + connection.disconnect() + context.stop(self) + + case GracefulStop => + cancellable.cancel() + stop() + } + + private def socketClosureStage(cancellable: Cancellable): Receive = { + case WebSocketStreamClosed => + context.stop(self) + cancellable.cancel() + + case WebSocketStreamFailure(th) => + log.error(s"An error occurred during closing web socket", th) + context.stop(self) + cancellable.cancel() + + case SocketClosureTimeout => + log.error(s"Socket closure timed out") + context.stop(self) + + case GracefulStop => + //ignoring it, because the actor is already closing + } + + private def stop(): Unit = { + connection.disconnect() + val closureTimeout = + scheduler.scheduleOnce(timeout, self, SocketClosureTimeout) + context.become(socketClosureStage(closureTimeout)) + } + +} + +object HeartbeatSession { + + /** + * Signals heartbeat timeout. + */ + case object HeartbeatTimeout + + /** + * Signals socket closure timeout. + */ + case object SocketClosureTimeout + + /** + * Creates a configuration object used to create a [[LanguageServerSupervisor]]. + * + * @param socket a server socket + * @param timeout a session timeout + * @param connectionFactory a web socket connection factory + * @param scheduler a scheduler + * @return a configuration object + */ + def props( + socket: Socket, + timeout: FiniteDuration, + connectionFactory: WebSocketConnectionFactory, + scheduler: Scheduler + ): Props = + Props(new HeartbeatSession(socket, timeout, connectionFactory, scheduler)) + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerBootLoader.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerBootLoader.scala index eb226635a80..d2bf01e7936 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerBootLoader.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerBootLoader.scala @@ -8,7 +8,7 @@ import org.enso.languageserver.boot.{ LanguageServerConfig } import org.enso.projectmanager.boot.configuration.BootloaderConfig -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootLoader.{ Boot, FindFreeSocket, @@ -54,11 +54,11 @@ class LanguageServerBootLoader( ) self ! Boot context.become( - booting(SocketData(descriptor.networkConfig.interface, port), retry) + booting(Socket(descriptor.networkConfig.interface, port), retry) ) } - private def booting(socket: SocketData, retryCount: Int): Receive = { + private def booting(socket: Socket, retryCount: Int): Receive = { case Boot => log.debug("Booting a language server") val config = LanguageServerConfig( diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala index 11835fd74cc..aad8e911338 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala @@ -15,24 +15,27 @@ import akka.actor.{ Terminated } import akka.pattern.pipe -import org.enso.languageserver.boot.LanguageServerComponent.ServerStopped +import org.enso.languageserver.boot.LifecycleComponent.ComponentStopped import org.enso.languageserver.boot.{ LanguageServerComponent, LanguageServerConfig } import org.enso.projectmanager.boot.configuration.{ BootloaderConfig, - NetworkConfig + NetworkConfig, + SupervisionConfig } -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.event.ClientEvent.ClientDisconnected +import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnectionFactory import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootLoader.{ ServerBootFailed, ServerBooted } import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.{ Boot, - BootTimeout + BootTimeout, + ServerDied } import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.ServerShutDown @@ -52,13 +55,14 @@ import scala.concurrent.duration._ class LanguageServerController( project: Project, networkConfig: NetworkConfig, - bootloaderConfig: BootloaderConfig + bootloaderConfig: BootloaderConfig, + supervisionConfig: SupervisionConfig ) extends Actor with ActorLogging with Stash with UnhandledLogging { - import context.dispatcher + import context.{dispatcher, system} private val descriptor = LanguageServerDescriptor( @@ -81,7 +85,8 @@ class LanguageServerController( case Boot => val bootloader = context.actorOf( - LanguageServerBootLoader.props(descriptor, bootloaderConfig) + LanguageServerBootLoader.props(descriptor, bootloaderConfig), + "bootloader" ) context.watch(bootloader) val timeoutCancellable = @@ -108,6 +113,16 @@ class LanguageServerController( unstashAll() timeoutCancellable.cancel() context.become(supervising(config, server)) + context.actorOf( + LanguageServerSupervisor.props( + config, + server, + supervisionConfig, + new AkkaBasedWebSocketConnectionFactory(), + context.system.scheduler + ), + "supervisor" + ) case Terminated(Bootloader) => log.error(s"Bootloader for project ${project.name} failed") @@ -127,7 +142,7 @@ class LanguageServerController( ): Receive = { case StartServer(clientId, _) => sender() ! ServerStarted( - SocketData(config.interface, config.port) + Socket(config.interface, config.port) ) context.become(supervising(config, server, clients + clientId)) @@ -140,6 +155,9 @@ class LanguageServerController( case ClientDisconnected(clientId) => removeClient(config, server, clients, clientId, None) + case ServerDied => + log.error(s"Language server died [$config]") + context.stop(self) } private def removeClient( @@ -163,7 +181,6 @@ class LanguageServerController( case StartServer(_, _) => sender() ! LanguageServerProtocol.ServerBootFailed(th) stop() - } private def stopping(maybeRequester: Option[ActorRef]): Receive = { @@ -175,15 +192,28 @@ class LanguageServerController( maybeRequester.foreach(_ ! FailureDuringStoppage(th)) stop() - case ServerStopped => + case ComponentStopped => log.info(s"Language server shut down successfully [$project].") - maybeRequester.foreach(_ ! LanguageServerProtocol.ServerStopped) + maybeRequester.foreach(_ ! ServerStopped) stop() } + private def waitingForChildren(): Receive = { + case Terminated(_) => + if (context.children.isEmpty) { + context.stop(self) + } + } + private def stop(): Unit = { - context.stop(self) context.parent ! ServerShutDown(project.id) + if (context.children.isEmpty) { + context.stop(self) + } else { + context.children.foreach(_ ! GracefulStop) + context.children.foreach(context.watch) + context.become(waitingForChildren()) + } } } @@ -201,10 +231,16 @@ object LanguageServerController { def props( project: Project, networkConfig: NetworkConfig, - bootloaderConfig: BootloaderConfig + bootloaderConfig: BootloaderConfig, + supervisionConfig: SupervisionConfig ): Props = Props( - new LanguageServerController(project, networkConfig, bootloaderConfig) + new LanguageServerController( + project, + networkConfig, + bootloaderConfig, + supervisionConfig + ) ) /** @@ -217,4 +253,6 @@ object LanguageServerController { */ case object Boot + case object ServerDied + } diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerProtocol.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerProtocol.scala index f28bc54f9c5..e7a7c059fc0 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerProtocol.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerProtocol.scala @@ -2,7 +2,7 @@ package org.enso.projectmanager.infrastructure.languageserver import java.util.UUID -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.model.Project /** @@ -28,7 +28,7 @@ object LanguageServerProtocol { * * @param socket the server socket */ - case class ServerStarted(socket: SocketData) extends ServerStartupResult + case class ServerStarted(socket: Socket) extends ServerStartupResult /** * Base trait for server startup failures. diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala index 31b7a26a49b..56412e7ddaf 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala @@ -5,7 +5,8 @@ import java.util.UUID import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} import org.enso.projectmanager.boot.configuration.{ BootloaderConfig, - NetworkConfig + NetworkConfig, + SupervisionConfig } import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{ CheckIfServerIsRunning, @@ -26,7 +27,8 @@ import org.enso.projectmanager.util.UnhandledLogging */ class LanguageServerRegistry( networkConfig: NetworkConfig, - bootloaderConfig: BootloaderConfig + bootloaderConfig: BootloaderConfig, + supervisionConfig: SupervisionConfig ) extends Actor with ActorLogging with UnhandledLogging { @@ -42,7 +44,7 @@ class LanguageServerRegistry( } else { val controller = context.actorOf( LanguageServerController - .props(project, networkConfig, bootloaderConfig) + .props(project, networkConfig, bootloaderConfig, supervisionConfig) ) context.watch(controller) controller.forward(msg) @@ -87,8 +89,15 @@ object LanguageServerRegistry { */ def props( networkConfig: NetworkConfig, - bootloaderConfig: BootloaderConfig + bootloaderConfig: BootloaderConfig, + supervisionConfig: SupervisionConfig ): Props = - Props(new LanguageServerRegistry(networkConfig, bootloaderConfig)) + Props( + new LanguageServerRegistry( + networkConfig, + bootloaderConfig, + supervisionConfig + ) + ) } diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistryProxy.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistryProxy.scala index 57035058643..eb0f9fd072d 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistryProxy.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistryProxy.scala @@ -10,7 +10,7 @@ import org.enso.projectmanager.control.core.CovariantFlatMap import org.enso.projectmanager.control.core.syntax._ import org.enso.projectmanager.control.effect.syntax._ import org.enso.projectmanager.control.effect.{Async, ErrorChannel} -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._ import org.enso.projectmanager.model.Project @@ -33,7 +33,7 @@ class LanguageServerRegistryProxy[F[+_, +_]: Async: ErrorChannel: CovariantFlatM override def start( clientId: UUID, project: Project - ): F[ServerStartupFailure, SocketData] = + ): F[ServerStartupFailure, Socket] = Async[F] .fromFuture { () => (registry ? StartServer(clientId, project)).mapTo[ServerStartupResult] diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerService.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerService.scala index 4eb1c0a4ef6..5c085679b13 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerService.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerService.scala @@ -2,7 +2,7 @@ package org.enso.projectmanager.infrastructure.languageserver import java.util.UUID -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{ CheckTimeout, ServerStartupFailure, @@ -27,7 +27,7 @@ trait LanguageServerService[F[+_, +_]] { def start( clientId: UUID, project: Project - ): F[ServerStartupFailure, SocketData] + ): F[ServerStartupFailure, Socket] /** * Stops a lang. server. diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisor.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisor.scala new file mode 100644 index 00000000000..587da2ea455 --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisor.scala @@ -0,0 +1,190 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import akka.actor.Status.Failure +import akka.actor.{ + Actor, + ActorLogging, + Cancellable, + Props, + Scheduler, + Terminated +} +import akka.pattern.pipe +import org.enso.languageserver.boot.LifecycleComponent.ComponentRestarted +import org.enso.languageserver.boot.{LanguageServerConfig, LifecycleComponent} +import org.enso.projectmanager.boot.configuration.SupervisionConfig +import org.enso.projectmanager.data.Socket +import org.enso.projectmanager.infrastructure.http.WebSocketConnectionFactory +import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.ServerDied +import org.enso.projectmanager.infrastructure.languageserver.LanguageServerSupervisor.{ + RestartServer, + SendHeartbeat, + ServerUnresponsive, + StartSupervision +} +import org.enso.projectmanager.util.UnhandledLogging + +/** + * A supervisor process responsible for monitoring language server and + * restarting it when the server is unresponsive. It delegates server + * monitoring to the [[HeartbeatSession]] actor. + * + * @param config a server config + * @param server a server handle + * @param supervisionConfig a supervision config + * @param connectionFactory a web socket connection factory + * @param scheduler a scheduler + */ +class LanguageServerSupervisor( + config: LanguageServerConfig, + server: LifecycleComponent, + supervisionConfig: SupervisionConfig, + connectionFactory: WebSocketConnectionFactory, + scheduler: Scheduler +) extends Actor + with ActorLogging + with UnhandledLogging { + + import context.dispatcher + + override def preStart(): Unit = { self ! StartSupervision } + + override def receive: Receive = uninitialized + + private def uninitialized: Receive = { + case GracefulStop => + context.stop(self) + + case StartSupervision => + val cancellable = + scheduler.scheduleAtFixedRate( + supervisionConfig.initialDelay, + supervisionConfig.heartbeatInterval, + self, + SendHeartbeat + ) + context.become(supervising(cancellable)) + } + + private def supervising(cancellable: Cancellable): Receive = { + case SendHeartbeat => + val socket = Socket(config.interface, config.port) + context.actorOf( + HeartbeatSession.props( + socket, + supervisionConfig.heartbeatTimeout, + connectionFactory, + scheduler + ) + ) + + case ServerUnresponsive => + log.info(s"Server is unresponsive [$config]. Restarting it...") + cancellable.cancel() + log.info(s"Restarting first time the server") + server.restart() pipeTo self + context.become(restarting()) + + case GracefulStop => + cancellable.cancel() + stop() + } + + private def restarting(restartCount: Int = 1): Receive = { + case RestartServer => + log.info(s"Restarting $restartCount time the server") + server.restart() pipeTo self + + case Failure(th) => + log.error(s"An error occurred during restarting the server [$config]", th) + if (restartCount < supervisionConfig.numberOfRestarts) { + scheduler.scheduleOnce( + supervisionConfig.delayBetweenRestarts, + self, + RestartServer + ) + context.become(restarting(restartCount + 1)) + } else { + log.error("Cannot restart language server") + context.parent ! ServerDied + context.stop(self) + } + + case ComponentRestarted => + log.info(s"Language server restarted [$config]") + val cancellable = + scheduler.scheduleAtFixedRate( + supervisionConfig.initialDelay, + supervisionConfig.heartbeatInterval, + self, + SendHeartbeat + ) + context.become(supervising(cancellable)) + + case GracefulStop => + stop() + } + + private def waitingForChildren(): Receive = { + case Terminated(_) => + if (context.children.isEmpty) { + context.stop(self) + } + } + + private def stop(): Unit = { + if (context.children.isEmpty) { + context.stop(self) + } else { + context.children.foreach(_ ! GracefulStop) + context.children.foreach(context.watch) + context.become(waitingForChildren()) + } + } + +} + +object LanguageServerSupervisor { + + private case object StartSupervision + + private case object RestartServer + + /** + * A command responsible for initiating heartbeat session. + */ + case object SendHeartbeat + + /** + * Signals that server is unresponsive. + */ + case object ServerUnresponsive + + /** + * Creates a configuration object used to create a [[LanguageServerSupervisor]]. + * + * @param config a server config + * @param server a server handle + * @param supervisionConfig a supervision config + * @param connectionFactory a web socket connection factory + * @param scheduler a scheduler + * @return a configuration object + */ + def props( + config: LanguageServerConfig, + server: LifecycleComponent, + supervisionConfig: SupervisionConfig, + connectionFactory: WebSocketConnectionFactory, + scheduler: Scheduler + ): Props = + Props( + new LanguageServerSupervisor( + config, + server, + supervisionConfig, + connectionFactory, + scheduler + ) + ) + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/package.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/package.scala new file mode 100644 index 00000000000..77b27a702ff --- /dev/null +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/package.scala @@ -0,0 +1,10 @@ +package org.enso.projectmanager.infrastructure + +package object languageserver { + + /** + * A stop command. + */ + case object GracefulStop + +} diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala index b0358a93661..d6257ea2acc 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala @@ -3,7 +3,7 @@ package org.enso.projectmanager.protocol import java.util.UUID import org.enso.jsonrpc.{Error, HasParams, HasResult, Method, Unused} -import org.enso.projectmanager.data.{ProjectMetadata, SocketData} +import org.enso.projectmanager.data.{ProjectMetadata, Socket} /** * The project management JSON RPC API provided by the project manager. @@ -42,7 +42,7 @@ object ProjectManagementApi { case class Params(projectId: UUID) - case class Result(languageServerAddress: SocketData) + case class Result(languageServerAddress: Socket) implicit val hasParams = new HasParams[this.type] { type Params = ProjectOpen.Params diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectOpenHandler.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectOpenHandler.scala index 026500349d9..e0a9a7035be 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectOpenHandler.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectOpenHandler.scala @@ -7,7 +7,7 @@ import akka.pattern.pipe import org.enso.jsonrpc.Errors.ServiceError import org.enso.jsonrpc.{Id, Request, ResponseError, ResponseResult} import org.enso.projectmanager.control.effect.Exec -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket import org.enso.projectmanager.protocol.ProjectManagementApi.ProjectOpen import org.enso.projectmanager.requesthandler.ProjectServiceFailureMapper.mapFailure import org.enso.projectmanager.service.{ @@ -67,7 +67,7 @@ class ProjectOpenHandler[F[+_, +_]: Exec]( cancellable.cancel() context.stop(self) - case Right(socket: SocketData) => + case Right(socket: Socket) => replyTo ! ResponseResult( ProjectOpen, id, diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala index 1cbf24bad28..2805dd46f73 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala @@ -7,7 +7,7 @@ import org.enso.projectmanager.control.core.CovariantFlatMap import org.enso.projectmanager.control.core.syntax._ import org.enso.projectmanager.control.effect.ErrorChannel import org.enso.projectmanager.control.effect.syntax._ -import org.enso.projectmanager.data.{ProjectMetadata, SocketData} +import org.enso.projectmanager.data.{ProjectMetadata, Socket} import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerService import org.enso.projectmanager.infrastructure.log.Logging @@ -94,7 +94,7 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap]( override def openProject( clientId: UUID, projectId: UUID - ): F[ProjectServiceFailure, SocketData] = { + ): F[ProjectServiceFailure, Socket] = { // format: off for { _ <- log.debug(s"Opening project $projectId") @@ -110,7 +110,7 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap]( private def startServer( clientId: UUID, project: Project - ): F[ProjectServiceFailure, SocketData] = + ): F[ProjectServiceFailure, Socket] = languageServerService .start(clientId, project) .mapError { diff --git a/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala b/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala index 473fb92fcb1..5169a520562 100644 --- a/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala +++ b/common/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala @@ -2,7 +2,7 @@ package org.enso.projectmanager.service import java.util.UUID -import org.enso.projectmanager.data.{ProjectMetadata, SocketData} +import org.enso.projectmanager.data.{ProjectMetadata, Socket} /** * A contract for the Project Service. @@ -37,7 +37,7 @@ trait ProjectServiceApi[F[+_, +_]] { def openProject( clientId: UUID, projectId: UUID - ): F[ProjectServiceFailure, SocketData] + ): F[ProjectServiceFailure, Socket] /** * Closes a project. Tries to shut down the Language Server. diff --git a/common/project-manager/src/test/resources/logback-test.xml b/common/project-manager/src/test/resources/logback-test.xml index a57a5ed5625..2b48285dff3 100644 --- a/common/project-manager/src/test/resources/logback-test.xml +++ b/common/project-manager/src/test/resources/logback-test.xml @@ -8,7 +8,7 @@ - + \ No newline at end of file diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisorSpec.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisorSpec.scala new file mode 100644 index 00000000000..a836ed446bb --- /dev/null +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerSupervisorSpec.scala @@ -0,0 +1,203 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import java.util.UUID + +import akka.actor.{ActorSystem, Props} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import com.miguno.akka.testing.VirtualTime +import org.enso.languageserver.boot.LifecycleComponent.ComponentRestarted +import org.enso.languageserver.boot.{LanguageServerConfig, LifecycleComponent} +import org.enso.projectmanager.boot.configuration.SupervisionConfig +import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnectionFactory +import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.ServerDied +import org.enso.projectmanager.infrastructure.languageserver.ProgrammableWebSocketServer.{ + Reject, + ReplyWith +} +import org.enso.projectmanager.infrastructure.languageserver.StepParent.ChildTerminated +import org.enso.projectmanager.infrastructure.net.Tcp +import org.mockito.BDDMockito._ +import org.mockito.Mockito._ +import org.mockito.MockitoSugar +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.must.Matchers + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class LanguageServerSupervisorSpec + extends TestKit(ActorSystem("LanguageServerSupervisorSpec")) + with ImplicitSender + with AnyFlatSpecLike + with Matchers + with BeforeAndAfterAll + with MockitoSugar { + + "A language supervisor" should "monitor language server by sending ping requests on regular basis" in new TestCtx { + //given + val probe = TestProbe() + fakeServer.withBehaviour { + case ping @ PingMatcher(requestId) => + probe.ref ! ping + ReplyWith( + s"""{ "jsonrpc": "2.0", "id": "$requestId", "result": null }""" + ) + } + probe.expectNoMessage() + //when + virtualTime.advance(testInitialDelay) + (1 to 10).foreach { _ => + probe.expectMsgPF() { case PingMatcher(_) => () } + virtualTime.advance(testHeartbeatInterval / 2) + probe.expectNoMessage() + virtualTime.advance(testHeartbeatInterval / 2) + } + //then + `then`(serverComponent.restart()).shouldHaveNoInteractions() + //teardown + parent ! GracefulStop + parentProbe.expectMsg(ChildTerminated) + system.stop(parent) + fakeServer.stop() + } + + it should "restart server when pong message doesn't arrive on time" in new TestCtx { + //given + when(serverComponent.restart()) + .thenReturn(Future.successful(ComponentRestarted)) + val probe = TestProbe() + @volatile var pingCount = 0 + fakeServer.withBehaviour { + case ping @ PingMatcher(requestId) => + probe.ref ! ping + pingCount += 1 + if (pingCount == 5) { + Reject + } else { + ReplyWith( + s"""{ "jsonrpc": "2.0", "id": "$requestId", "result": null }""" + ) + } + } + probe.expectNoMessage() + //when + virtualTime.advance(testInitialDelay) + (1 to 4).foreach { _ => + verifyNoInteractions(serverComponent) + probe.expectMsgPF() { case PingMatcher(_) => () } + virtualTime.advance(testHeartbeatInterval / 2) + probe.expectNoMessage() + virtualTime.advance(testHeartbeatInterval / 2) + } + probe.expectMsgPF() { case PingMatcher(_) => () } + virtualTime.advance(testHeartbeatTimeout) + verify(serverComponent, timeout(VerificationTimeout).times(1)).restart() + virtualTime.advance(testInitialDelay) + (1 to 10).foreach { _ => + verifyNoMoreInteractions(serverComponent) + probe.expectMsgPF() { case PingMatcher(_) => () } + virtualTime.advance(testHeartbeatInterval / 2) + probe.expectNoMessage() + virtualTime.advance(testHeartbeatInterval / 2) + } + //teardown + parent ! GracefulStop + parentProbe.expectMsg(ChildTerminated) + system.stop(parent) + fakeServer.stop() + } + + it should "restart server limited number of times" in new TestCtx { + //given + when(serverComponent.restart()).thenReturn(Future.failed(new Exception)) + val probe = TestProbe() + fakeServer.withBehaviour { + case ping @ PingMatcher(_) => + probe.ref ! ping + Reject + } + probe.expectNoMessage() + //when + virtualTime.advance(testInitialDelay) + probe.expectMsgPF(5.seconds) { case PingMatcher(_) => () } + verifyNoInteractions(serverComponent) + virtualTime.advance(testHeartbeatTimeout) + (1 to testRestartLimit).foreach { i => + verify(serverComponent, timeout(VerificationTimeout).times(i)).restart() + //I need to wait some time to give the supervisor time to schedule next + // restart command + Thread.sleep(1000) + virtualTime.advance(testRestartDelay) + } + virtualTime.advance(testHeartbeatInterval) + probe.expectNoMessage() + verifyNoMoreInteractions(serverComponent) + //then + parentProbe.expectMsg(ServerDied) + parentProbe.expectMsg(ChildTerminated) + //teardown + system.stop(parent) + fakeServer.stop() + } + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + trait TestCtx { + + val VerificationTimeout = 120000 + + val virtualTime = new VirtualTime + + val serverComponent = mock[LifecycleComponent] + + val testHost = "127.0.0.1" + + val testPort = Tcp.findAvailablePort(testHost, 49152, 65535) + + val testInitialDelay = 5.seconds + + val testHeartbeatInterval = 10.seconds + + val testHeartbeatTimeout = 7.seconds + + val testRestartLimit = 3 + + val testRestartDelay = 2.seconds + + val fakeServer = new ProgrammableWebSocketServer(testHost, testPort) + fakeServer.start() + + val serverConfig = + LanguageServerConfig(testHost, testPort, UUID.randomUUID(), "/tmp") + + val supervisionConfig = + SupervisionConfig( + testInitialDelay, + testHeartbeatInterval, + testHeartbeatTimeout, + testRestartLimit, + testRestartDelay + ) + + val parentProbe = TestProbe() + + val parent = system.actorOf( + Props( + new StepParent( + LanguageServerSupervisor.props( + serverConfig, + serverComponent, + supervisionConfig, + new AkkaBasedWebSocketConnectionFactory(), + virtualTime.scheduler + ), + parentProbe.ref + ) + ) + ) + + } +} diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/PingMatcher.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/PingMatcher.scala new file mode 100644 index 00000000000..369c924ffc9 --- /dev/null +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/PingMatcher.scala @@ -0,0 +1,19 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import java.util.UUID +import io.circe.parser._ + +object PingMatcher { + + def unapply(arg: String): Option[UUID] = { + val maybeJson = parse(arg).toOption + + for { + json <- maybeJson + method <- json.hcursor.downField("method").as[String].toOption + if method == "heartbeat/ping" + id <- json.hcursor.downField("id").as[String].toOption + } yield UUID.fromString(id) + } + +} diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/ProgrammableWebSocketServer.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/ProgrammableWebSocketServer.scala new file mode 100644 index 00000000000..afa7e5d1e05 --- /dev/null +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/ProgrammableWebSocketServer.scala @@ -0,0 +1,78 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import akka.http.scaladsl.server.Directives.{handleWebSocketMessages, path} +import akka.stream.scaladsl.{Flow, Sink} +import org.enso.projectmanager.infrastructure.languageserver.ProgrammableWebSocketServer.{ + Behaviour, + Reject, + ReplyWith +} + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class ProgrammableWebSocketServer(interface: String, port: Int)( + implicit system: ActorSystem +) { + + @volatile + private var behaviour: Behaviour = (_ => Reject) + + @volatile + private var maybeBinding: Option[Http.ServerBinding] = None + + def withBehaviour(f: Behaviour): Unit = { + this.behaviour = f + } + + private val handler: Flow[Message, Message, Any] = + Flow[Message].mapConcat { + case tm: TextMessage => + val payload = Await.result(tm.toStrict(3.seconds), 3.seconds).text + if (behaviour.isDefinedAt(payload)) { + behaviour.apply(payload) match { + case Reject => Nil + case ReplyWith(reply) => TextMessage(reply) :: Nil + } + } else { + Nil + } + + case bm: BinaryMessage => + bm.dataStream.runWith(Sink.ignore) + Nil + } + + private val websocketRoute = + path("") { + handleWebSocketMessages(handler) + } + + def start(): Unit = { + val binding = + Await.result( + Http().bindAndHandle(websocketRoute, interface, port), + 3.seconds + ) + maybeBinding = Some(binding) + } + + def stop(): Unit = + maybeBinding.foreach { binding => + Await.result(binding.unbind(), 5.seconds) + } + +} + +object ProgrammableWebSocketServer { + + type Behaviour = PartialFunction[String, Response] + + sealed trait Response + case object Reject extends Response + case class ReplyWith(reply: String) extends Response + +} diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/StepParent.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/StepParent.scala new file mode 100644 index 00000000000..200dbb5f88f --- /dev/null +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/StepParent.scala @@ -0,0 +1,22 @@ +package org.enso.projectmanager.infrastructure.languageserver + +import akka.actor.{Actor, ActorRef, Props, Terminated} +import org.enso.projectmanager.infrastructure.languageserver.StepParent.ChildTerminated + +class StepParent(childProps: Props, probe: ActorRef) extends Actor { + + val child = context.actorOf(childProps) + context.watch(child) + + override def receive: Receive = { + case Terminated(`child`) => probe ! ChildTerminated + case GracefulStop => child ! GracefulStop + case msg => probe.tell(msg, sender) + } +} + +object StepParent { + + case object ChildTerminated + +} diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/BaseServerSpec.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/BaseServerSpec.scala index 2ac481bbd54..5ba86eb469d 100644 --- a/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/BaseServerSpec.scala +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/BaseServerSpec.scala @@ -13,6 +13,7 @@ import org.enso.projectmanager.boot.configuration.{ BootloaderConfig, NetworkConfig, StorageConfig, + SupervisionConfig, TimeoutConfig } import org.enso.projectmanager.control.effect.ZioEnvExec @@ -69,6 +70,9 @@ class BaseServerSpec extends JsonRpcServerTestKit { lazy val netConfig = NetworkConfig("127.0.0.1", 40000, 60000) + lazy val supervisionConfig = + SupervisionConfig(5.seconds, 10.seconds, 5.seconds, 3, 1.seconds) + implicit val exec = new ZioEnvExec(Runtime.default) lazy val fileSystem = new BlockingFileSystem(5.seconds) @@ -92,7 +96,10 @@ class BaseServerSpec extends JsonRpcServerTestKit { lazy val projectValidator = new MonadicProjectValidator[ZIO[ZEnv, *, *]]() lazy val languageServerRegistry = - system.actorOf(LanguageServerRegistry.props(netConfig, bootloaderConfig)) + system.actorOf( + LanguageServerRegistry + .props(netConfig, bootloaderConfig, supervisionConfig) + ) lazy val languageServerService = new LanguageServerRegistryProxy[ZIO[ZEnv, +*, +*]]( diff --git a/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala b/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala index a476105df8e..f1e39a41cb4 100644 --- a/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala +++ b/common/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala @@ -6,7 +6,7 @@ import java.util.UUID import io.circe.literal._ import io.circe.parser.parse -import org.enso.projectmanager.data.SocketData +import org.enso.projectmanager.data.Socket class ProjectManagementApiSpec extends BaseServerSpec { @@ -474,7 +474,7 @@ class ProjectManagementApiSpec extends BaseServerSpec { def openProject( projectId: UUID - )(implicit client: WsTestClient): SocketData = { + )(implicit client: WsTestClient): Socket = { client.send(json""" { "jsonrpc": "2.0", "method": "project/open", @@ -490,7 +490,7 @@ class ProjectManagementApiSpec extends BaseServerSpec { .downField("languageServerAddress") val Right(host) = socketField.downField("host").as[String] val Right(port) = socketField.downField("port").as[Int] - SocketData(host, port) + Socket(host, port) } def closeProject( diff --git a/doc/design/engine/engine-services.md b/doc/design/engine/engine-services.md index c8db0b7e9cf..ced46ce91b5 100644 --- a/doc/design/engine/engine-services.md +++ b/doc/design/engine/engine-services.md @@ -109,6 +109,8 @@ services components, as well as any open questions that may remain. - [`workspace/connect`](#workspaceconnect) - [`workspace/undo`](#workspaceundo) - [`workspace/redo`](#workspaceredo) + - [Monitoring](#monitoring) + - [`heartbeat/ping`](#heartbeatping) - [Execution Management](#execution-management-1) - [Types](#types-2) - [`executionContext/create`](#executioncontextcreate) @@ -2214,6 +2216,35 @@ null ##### Errors TBC +### Monitoring +The language server also has a heartbeat operation to monitor the Language +server. This API is private and should be used only by the Project Manager. + +#### `heartbeat/ping` +This request is sent from the supervisor process to the server to check the +health of the Language Server. + +- **Type:** Request +- **Direction:** Supervisor -> Server +- **Connection:** Protocol +- **Visibility:** Private + +##### Parameters + +```typescript +null +``` + +##### Result + +```typescript +null +``` + +##### Errors +None + + ### Execution Management The execution management portion of the language server API deals with exposing fine-grained control over program and expression execution to the clients of diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/LanguageServer.scala b/engine/language-server/src/main/scala/org/enso/languageserver/LanguageServer.scala index 2aec47b3f93..e9679e3ca11 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/LanguageServer.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/LanguageServer.scala @@ -7,6 +7,7 @@ import org.enso.languageserver.event.{ ClientDisconnected, ClientEvent } +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} import org.enso.languageserver.util.UnhandledLogging object LanguageProtocol { @@ -34,6 +35,7 @@ class LanguageServer(config: Config) } override def receive: Receive = { + case Ping => sender() ! Pong case Initialize => log.debug("Language Server initialized.") unstashAll() @@ -45,6 +47,7 @@ class LanguageServer(config: Config) config: Config, env: Environment = Environment.empty ): Receive = { + case Ping => sender() ! Pong case ClientConnected(client) => log.info("Client connected [{}].", client.id) context.become( diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/LanguageServerComponent.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/LanguageServerComponent.scala index 50549517b8a..dfdd5354a5c 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/LanguageServerComponent.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/LanguageServerComponent.scala @@ -3,9 +3,10 @@ package org.enso.languageserver.boot import akka.http.scaladsl.Http import com.typesafe.scalalogging.LazyLogging import org.enso.languageserver.LanguageProtocol -import org.enso.languageserver.boot.LanguageServerComponent.{ - ServerStarted, - ServerStopped +import org.enso.languageserver.boot.LifecycleComponent.{ + ComponentRestarted, + ComponentStarted, + ComponentStopped } import scala.concurrent.Future @@ -17,19 +18,16 @@ import scala.concurrent.duration._ * @param config a LS config */ class LanguageServerComponent(config: LanguageServerConfig) - extends LazyLogging { + extends LifecycleComponent + with LazyLogging { @volatile private var maybeServerState: Option[(MainModule, Http.ServerBinding)] = None implicit private val ec = config.computeExecutionContext - /** - * Starts asynchronously a server. - * - * @return a notice that the server started successfully - */ - def start(): Future[ServerStarted.type] = { + /** @inheritdoc **/ + override def start(): Future[ComponentStarted.type] = { logger.info("Starting Language Server...") for { mainModule <- Future { new MainModule(config) } @@ -39,15 +37,11 @@ class LanguageServerComponent(config: LanguageServerConfig) _ <- Future { logger.info(s"Started server at ${config.interface}:${config.port}") } - } yield ServerStarted + } yield ComponentStarted } - /** - * Stops asynchronously a server. - * - * @return a notice that the server stopped successfully - */ - def stop(): Future[ServerStopped.type] = + /** @inheritdoc **/ + override def stop(): Future[ComponentStopped.type] = maybeServerState match { case None => Future.failed(new Exception("Server isn't running")) @@ -56,15 +50,35 @@ class LanguageServerComponent(config: LanguageServerConfig) for { _ <- binding.terminate(10.seconds) _ <- mainModule.system.terminate() - } yield ServerStopped + _ <- Future { mainModule.context.close(true) } + _ <- Future { maybeServerState = None } + } yield ComponentStopped } -} + /** @inheritdoc **/ + override def restart(): Future[ComponentRestarted.type] = + for { + _ <- forceStop() + _ <- start() + } yield ComponentRestarted -object LanguageServerComponent { + private def forceStop(): Future[Unit] = { + maybeServerState match { + case None => + Future.successful(()) - case object ServerStarted + case Some((mainModule, binding)) => + for { + _ <- binding.terminate(10.seconds).recover(logError) + _ <- mainModule.system.terminate().recover(logError) + _ <- Future { mainModule.context.close(true) }.recover(logError) + _ <- Future { maybeServerState = None } + } yield ComponentStopped + } + } - case object ServerStopped + private val logError: PartialFunction[Throwable, Unit] = { + case th => logger.error("An error occurred during stopping server", th) + } } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/LifecycleComponent.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/LifecycleComponent.scala new file mode 100644 index 00000000000..b31696c62fd --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/LifecycleComponent.scala @@ -0,0 +1,56 @@ +package org.enso.languageserver.boot + +import org.enso.languageserver.boot.LifecycleComponent.{ + ComponentRestarted, + ComponentStarted, + ComponentStopped +} + +import scala.concurrent.Future + +/** + * An abstraction for components that can be started and stopped. + */ +trait LifecycleComponent { + + /** + * Starts asynchronously a server. + * + * @return a notice that the server started successfully + */ + def start(): Future[ComponentStarted.type] + + /** + * Stops asynchronously a server. + * + * @return a notice that the server stopped successfully + */ + def stop(): Future[ComponentStopped.type] + + /** + * Restarts asynchronously a server. + * + * @return a notice that the server restarted successfully + */ + def restart(): Future[ComponentRestarted.type] + +} + +object LifecycleComponent { + + /** + * Signals that component was started. + */ + case object ComponentStarted + + /** + * Signals that component was stopped. + */ + case object ComponentStopped + + /** + * Signals that component was restarted. + */ + case object ComponentRestarted + +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/capability/CapabilityRouter.scala b/engine/language-server/src/main/scala/org/enso/languageserver/capability/CapabilityRouter.scala index 5235aea7a6c..6349cbe4e89 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/capability/CapabilityRouter.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/capability/CapabilityRouter.scala @@ -10,6 +10,7 @@ import org.enso.languageserver.data.{ CapabilityRegistration, ReceivesTreeUpdates } +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} import org.enso.languageserver.util.UnhandledLogging /** @@ -28,6 +29,9 @@ class CapabilityRouter( with UnhandledLogging { override def receive: Receive = { + case Ping => + sender() ! Pong + case msg @ AcquireCapability(_, CapabilityRegistration(CanEdit(_))) => bufferRegistry.forward(msg) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/filemanager/FileManager.scala b/engine/language-server/src/main/scala/org/enso/languageserver/filemanager/FileManager.scala index d479c2a1712..a8f0075f9e8 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/filemanager/FileManager.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/filemanager/FileManager.scala @@ -5,6 +5,7 @@ import akka.routing.SmallestMailboxPool import akka.pattern.pipe import org.enso.languageserver.effect._ import org.enso.languageserver.data.Config +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} import org.enso.languageserver.util.UnhandledLogging import zio._ @@ -27,6 +28,9 @@ class FileManager( import context.dispatcher override def receive: Receive = { + case Ping => + sender() ! Pong + case FileManagerProtocol.WriteFile(path, content) => val result = for { diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringApi.scala b/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringApi.scala new file mode 100644 index 00000000000..5305b90495a --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringApi.scala @@ -0,0 +1,21 @@ +package org.enso.languageserver.monitoring + +import org.enso.jsonrpc.{HasParams, HasResult, Method, Unused} + +/** + * The monitoring JSON RPC API provided by the language server. + * See [[https://github.com/luna/enso/blob/master/doc/design/engine/engine-services.md]] + * for message specifications. + */ +object MonitoringApi { + + case object Ping extends Method("heartbeat/ping") { + implicit val hasParams = new HasParams[this.type] { + type Params = Unused.type + } + implicit val hasResult = new HasResult[this.type] { + type Result = Unused.type + } + } + +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringProtocol.scala b/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringProtocol.scala new file mode 100644 index 00000000000..b3c7632fa05 --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/monitoring/MonitoringProtocol.scala @@ -0,0 +1,15 @@ +package org.enso.languageserver.monitoring + +object MonitoringProtocol { + + /** + * A ping command. + */ + case object Ping + + /** + * A pong reply. + */ + case object Pong + +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/ClientController.scala b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/ClientController.scala index 6dd9c27552d..d64cfb2e5c0 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/ClientController.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/ClientController.scala @@ -14,9 +14,11 @@ import org.enso.languageserver.capability.CapabilityApi.{ import org.enso.languageserver.capability.CapabilityProtocol import org.enso.languageserver.data.Client import org.enso.languageserver.event.{ClientConnected, ClientDisconnected} -import org.enso.languageserver.filemanager.PathWatcherProtocol import org.enso.languageserver.filemanager.FileManagerApi._ +import org.enso.languageserver.filemanager.PathWatcherProtocol +import org.enso.languageserver.monitoring.MonitoringApi.Ping import org.enso.languageserver.requesthandler._ +import org.enso.languageserver.requesthandler.monitoring.PingHandler import org.enso.languageserver.runtime.ExecutionApi._ import org.enso.languageserver.util.UnhandledLogging import org.enso.languageserver.text.TextApi._ @@ -55,6 +57,16 @@ class ClientController( private val requestHandlers: Map[Method, Props] = Map( + Ping -> PingHandler.props( + List( + server, + bufferRegistry, + capabilityRouter, + fileManager, + contextRegistry + ), + requestTimeout + ), AcquireCapability -> AcquireCapabilityHandler .props(capabilityRouter, requestTimeout, client), ReleaseCapability -> ReleaseCapabilityHandler diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/JsonRpc.scala b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/JsonRpc.scala index 452c7678b73..806633400ed 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/JsonRpc.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/JsonRpc.scala @@ -9,8 +9,9 @@ import org.enso.languageserver.capability.CapabilityApi.{ ReleaseCapability } import org.enso.languageserver.filemanager.FileManagerApi._ -import org.enso.languageserver.text.TextApi._ +import org.enso.languageserver.monitoring.MonitoringApi.Ping import org.enso.languageserver.runtime.ExecutionApi._ +import org.enso.languageserver.text.TextApi._ object JsonRpc { @@ -18,6 +19,7 @@ object JsonRpc { * A description of supported JSON RPC messages. */ val protocol: Protocol = Protocol.empty + .registerRequest(Ping) .registerRequest(AcquireCapability) .registerRequest(ReleaseCapability) .registerRequest(WriteFile) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/requesthandler/monitoring/PingHandler.scala b/engine/language-server/src/main/scala/org/enso/languageserver/requesthandler/monitoring/PingHandler.scala new file mode 100644 index 00000000000..a6a4687ff3c --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/requesthandler/monitoring/PingHandler.scala @@ -0,0 +1,71 @@ +package org.enso.languageserver.requesthandler.monitoring + +import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props} +import org.enso.jsonrpc.{Id, Request, ResponseResult, Unused} +import org.enso.languageserver.monitoring.MonitoringApi +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} +import org.enso.languageserver.requesthandler.RequestTimeout + +import scala.concurrent.duration.FiniteDuration + +/** + * A request handler for `heartbeat/ping` commands. + * + * @param subsystems a list of monitored subsystems + * @param timeout a request timeout + */ +class PingHandler( + subsystems: List[ActorRef], + timeout: FiniteDuration +) extends Actor + with ActorLogging { + + import context.dispatcher + + override def receive: Receive = scatter + + private def scatter: Receive = { + case Request(MonitoringApi.Ping, id, Unused) => + subsystems.foreach(_ ! Ping) + val cancellable = + context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout) + context.become(gather(id, sender(), cancellable)) + } + + private def gather( + id: Id, + replyTo: ActorRef, + cancellable: Cancellable, + count: Int = 0 + ): Receive = { + case RequestTimeout => + log.error( + s"Health check timed out. Only $count/${subsystems.size} subsystems replied on time." + ) + context.stop(self) + + case Pong => + if (count + 1 == subsystems.size) { + replyTo ! ResponseResult(MonitoringApi.Ping, id, Unused) + cancellable.cancel() + context.stop(self) + } else { + context.become(gather(id, replyTo, cancellable, count + 1)) + } + } + +} + +object PingHandler { + + /** + * Creates a configuration object used to create a [[PingHandler]] + * + * @param subsystems a list of monitored subsystems + * @param timeout a request timeout + * @return a configuration object + */ + def props(subsystems: List[ActorRef], timeout: FiniteDuration): Props = + Props(new PingHandler(subsystems, timeout)) + +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/ContextRegistry.scala b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/ContextRegistry.scala index b2ad970268b..87f8ee8335b 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/ContextRegistry.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/ContextRegistry.scala @@ -3,6 +3,7 @@ package org.enso.languageserver.runtime import java.util.UUID import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} import org.enso.languageserver.data.Config import org.enso.languageserver.filemanager.FileSystemFailure import org.enso.languageserver.runtime.ExecutionApi.ContextId @@ -52,6 +53,9 @@ final class ContextRegistry(config: Config, runtime: ActorRef) withStore(ContextRegistry.Store(Map())) private def withStore(store: ContextRegistry.Store): Receive = { + case Ping => + sender() ! Pong + case CreateContextRequest(client) => val handler = context.actorOf(CreateContextHandler.props(timeout, runtime)) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/text/BufferRegistry.scala b/engine/language-server/src/main/scala/org/enso/languageserver/text/BufferRegistry.scala index 483f2895f26..953ffc74fbe 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/text/BufferRegistry.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/text/BufferRegistry.scala @@ -13,6 +13,7 @@ import org.enso.languageserver.data.{ ContentBasedVersioning } import org.enso.languageserver.filemanager.Path +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} import org.enso.languageserver.util.UnhandledLogging import org.enso.languageserver.text.TextProtocol.{ ApplyEdit, @@ -39,6 +40,9 @@ class BufferRegistry(fileManager: ActorRef)( override def receive: Receive = running(Map.empty) private def running(registry: Map[Path, ActorRef]): Receive = { + case Ping => + sender() ! Pong + case msg @ OpenFile(_, path) => if (registry.contains(path)) { registry(path).forward(msg) diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/requesthandler/monitoring/PingHandlerSpec.scala b/engine/language-server/src/test/scala/org/enso/languageserver/requesthandler/monitoring/PingHandlerSpec.scala new file mode 100644 index 00000000000..a362c802d54 --- /dev/null +++ b/engine/language-server/src/test/scala/org/enso/languageserver/requesthandler/monitoring/PingHandlerSpec.scala @@ -0,0 +1,85 @@ +package org.enso.languageserver.requesthandler.monitoring + +import akka.actor.ActorSystem +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import org.enso.jsonrpc.Id.Number +import org.enso.jsonrpc.{Request, ResponseResult, Unused} +import org.enso.languageserver.monitoring.MonitoringApi +import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.must.Matchers + +import scala.concurrent.duration._ + +class PingHandlerSpec + extends TestKit(ActorSystem("TestSystem")) + with ImplicitSender + with AnyFlatSpecLike + with Matchers { + + "A PingHandler" must "scatter pings to all subsystems" in { + //given + val subsystem1 = TestProbe() + val subsystem2 = TestProbe() + val subsystem3 = TestProbe() + val actorUnderTest = system.actorOf( + PingHandler + .props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 10.seconds) + ) + //when + actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused) + //then + subsystem1.expectMsg(Ping) + subsystem2.expectMsg(Ping) + subsystem3.expectMsg(Ping) + //teardown + system.stop(actorUnderTest) + } + + it must "gather Pong messages and reply with success response" in { + //given + val subsystem1 = TestProbe() + val subsystem2 = TestProbe() + val subsystem3 = TestProbe() + val actorUnderTest = system.actorOf( + PingHandler + .props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 10.seconds) + ) + //when + actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused) + subsystem1.expectMsg(Ping) + subsystem1.lastSender ! Pong + subsystem2.expectMsg(Ping) + subsystem2.lastSender ! Pong + subsystem3.expectMsg(Ping) + subsystem3.lastSender ! Pong + //then + expectMsg(ResponseResult(MonitoringApi.Ping, Number(1), Unused)) + //teardown + system.stop(actorUnderTest) + } + + it must "stop without replying when some of subsystems don't reply on time" in { + //given + val subsystem1 = TestProbe() + val subsystem2 = TestProbe() + val subsystem3 = TestProbe() + val actorUnderTest = system.actorOf( + PingHandler + .props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 2.seconds) + ) + watch(actorUnderTest) + //when + actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused) + subsystem2.expectMsg(Ping) + subsystem2.lastSender ! Pong + subsystem3.expectMsg(Ping) + subsystem3.lastSender ! Pong + //then + expectTerminated(actorUnderTest) + expectNoMessage() + //teardown + system.stop(actorUnderTest) + } + +} diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/MonitoringTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/MonitoringTest.scala new file mode 100644 index 00000000000..c1744f2014e --- /dev/null +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/MonitoringTest.scala @@ -0,0 +1,27 @@ +package org.enso.languageserver.websocket +import io.circe.literal._ + +class MonitoringTest extends BaseServerTest { + + "Monitoring subsystem" must { + + "reply to ping requests" in { + val client = new WsTestClient(address) + + client.send(json""" + { "jsonrpc": "2.0", + "method": "heartbeat/ping", + "id": 1, + "params": null + } + """) + client.expectJson(json""" + { "jsonrpc": "2.0", + "id": 1, + "result": null + } + """) + } + } + +} diff --git a/project/build.properties b/project/build.properties index 6adcdc753fd..6624da70bf7 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.3 +sbt.version=1.3.5