Implementation of the Language Server Supervisor Process (#643)

This commit is contained in:
Łukasz Olczak 2020-04-10 12:11:15 +02:00 committed by GitHub
parent 76faaaabcd
commit e966392cc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1446 additions and 82 deletions

View File

@ -33,7 +33,7 @@ env:
graalVersion: 20.0.0
javaVersion: java8
# Please ensure that this is in sync with project/build.properties
sbtVersion: 1.3.3
sbtVersion: 1.3.5
jobs:

View File

@ -361,14 +361,16 @@ lazy val project_manager = (project in file("common/project-manager"))
libraryDependencies ++= akka,
libraryDependencies ++= circe,
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.0",
"com.github.pureconfig" %% "pureconfig" % "0.12.2",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"dev.zio" %% "zio" % "1.0.0-RC18-2",
"dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC12",
"commons-io" % "commons-io" % "2.6",
"com.beachape" %% "enumeratum-circe" % "1.5.23"
"com.typesafe" % "config" % "1.4.0",
"com.github.pureconfig" %% "pureconfig" % "0.12.2",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"dev.zio" %% "zio" % "1.0.0-RC18-2",
"dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC12",
"commons-io" % "commons-io" % "2.6",
"com.beachape" %% "enumeratum-circe" % "1.5.23",
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % Test,
"org.mockito" %% "mockito-scala" % "1.13.7" % Test
),
addCompilerPlugin(
"org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full

View File

@ -21,6 +21,14 @@ project-manager {
delay-between-retry = 2 second
}
supervision {
initial-delay = 5 seconds
heartbeat-interval = 15 seconds
heartbeat-timeout = 10 seconds
number-of-restarts = 5
delay-between-restarts = 2 second
}
storage {
projects-root = ${user.home}/enso
projects-root=${?PROJECTS_ROOT}

View File

@ -78,8 +78,9 @@ class MainModule[F[+_, +_]: Sync: ErrorChannel: Exec: CovariantFlatMap: Async](
lazy val languageServerController =
system.actorOf(
LanguageServerRegistry.props(config.network, config.bootloader),
"language-server-controller"
LanguageServerRegistry
.props(config.network, config.bootloader, config.supervision),
"language-server-registry"
)
lazy val languageServerService = new LanguageServerRegistryProxy[F](

View File

@ -16,7 +16,8 @@ object configuration {
storage: StorageConfig,
timeout: TimeoutConfig,
network: NetworkConfig,
bootloader: BootloaderConfig
bootloader: BootloaderConfig,
supervision: SupervisionConfig
)
/**
@ -72,4 +73,21 @@ object configuration {
delayBetweenRetry: FiniteDuration
)
/**
* A configuration object for supervisor properties.
*
* @param initialDelay a time that the supervisor wait before starts
* monitoring
* @param heartbeatInterval an interval between heartbeat sessions
* @param heartbeatTimeout a timeout for pong reply
* @param numberOfRestarts a maximum number of restarts
* @param delayBetweenRestarts a delay between server restarts
*/
case class SupervisionConfig(
initialDelay: FiniteDuration,
heartbeatInterval: FiniteDuration,
heartbeatTimeout: FiniteDuration,
numberOfRestarts: Int,
delayBetweenRestarts: FiniteDuration
)
}

View File

@ -0,0 +1,3 @@
package org.enso.projectmanager.data
case class Socket(host: String, port: Int)

View File

@ -1,3 +0,0 @@
package org.enso.projectmanager.data
case class SocketData(host: String, port: Int)

View File

@ -0,0 +1,103 @@
package org.enso.projectmanager.infrastructure.http
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.pattern.pipe
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnection._
import org.enso.projectmanager.infrastructure.http.FanOutReceiver.Listen
import org.enso.projectmanager.infrastructure.http.WebSocketConnection.{
WebSocketConnected,
WebSocketMessage,
WebSocketStreamClosed,
WebSocketStreamFailure
}
/**
* An Akka-based bidirectional web socket connection.
*
* @param address a server address
* @param system an actor system
*/
class AkkaBasedWebSocketConnection(address: String)(
implicit system: ActorSystem
) extends WebSocketConnection {
import system.dispatcher
private val receiver = system.actorOf(Props(new FanOutReceiver))
private var outboundChannel: ActorRef = _
private val source: Source[Message, NotUsed] = Source
.actorRef[String](
completionMatcher,
PartialFunction.empty,
1,
OverflowStrategy.fail
)
.mapMaterializedValue { actorRef =>
outboundChannel = actorRef
NotUsed
}
.map { txt: String =>
TextMessage(txt)
}
private def completionMatcher: PartialFunction[Any, CompletionStrategy] = {
case CloseWebSocket => CompletionStrategy.immediately
}
private val sink: Sink[Message, NotUsed] = Flow[Message]
.map {
case TextMessage.Strict(s) => WebSocketMessage(s)
}
.to(
Sink.actorRef[WebSocketMessage](
receiver,
WebSocketStreamClosed,
WebSocketStreamFailure(_)
)
)
private val flow = Flow.fromSinkAndSource(sink, source)
/** @inheritdoc **/
override def attachListener(listener: ActorRef): Unit =
receiver ! Listen(listener)
/** @inheritdoc **/
def connect(): Unit = {
val (future, _) =
Http()
.singleWebSocketRequest(
WebSocketRequest(address),
flow
)
future
.map {
case ValidUpgrade(_, _) =>
WebSocketConnected
case InvalidUpgradeResponse(_, cause) =>
WebSocketStreamFailure(new Exception(s"Cannot connect $cause"))
}
.pipeTo(receiver)
}
/** @inheritdoc **/
def send(message: String): Unit = outboundChannel ! message
/** @inheritdoc **/
def disconnect(): Unit = outboundChannel ! CloseWebSocket
}
object AkkaBasedWebSocketConnection {
private object CloseWebSocket
}

View File

@ -0,0 +1,15 @@
package org.enso.projectmanager.infrastructure.http
import akka.actor.ActorSystem
import org.enso.projectmanager.data.Socket
/**
* A factory of Akka-based web socket connections.
*/
class AkkaBasedWebSocketConnectionFactory(implicit system: ActorSystem)
extends WebSocketConnectionFactory {
/** @inheritdoc **/
override def createConnection(socket: Socket): WebSocketConnection =
new AkkaBasedWebSocketConnection(s"ws://${socket.host}:${socket.port}")
}

View File

@ -0,0 +1,29 @@
package org.enso.projectmanager.infrastructure.http
import akka.actor.{Actor, ActorRef}
import org.enso.projectmanager.infrastructure.http.FanOutReceiver.Listen
/**
* A fan-out receiver that delivers messages to multiple listeners.
*/
class FanOutReceiver extends Actor {
override def receive: Receive = running()
private def running(listeners: Set[ActorRef] = Set.empty): Receive = {
case Listen(listener) => context.become(running(listeners + listener))
case msg => listeners.foreach(_ ! msg)
}
}
object FanOutReceiver {
/**
* An attach listener command.
*
* @param listener a listener to attach
*/
case class Listen(listener: ActorRef)
}

View File

@ -0,0 +1,62 @@
package org.enso.projectmanager.infrastructure.http
import akka.actor.ActorRef
/**
* An abstraction representing web socket connection.
*/
trait WebSocketConnection {
/**
* Connects to the server.
*/
def connect(): Unit
/**
* Disconnects from the server.
*/
def disconnect(): Unit
/**
* Sends a message to the server.
*
* @param message a message to sent
*/
def send(message: String): Unit
/**
* Attaches a listener of incoming messages.
*
* @param listener a message listener for inbound channel
*/
def attachListener(listener: ActorRef): Unit
}
object WebSocketConnection {
/**
* Signals that a connection was established.
*/
case object WebSocketConnected
/**
* An envelope for text messages.
*
* @param payload a text message
*/
case class WebSocketMessage(payload: String)
/**
* Signals that connection was closed.
*/
case object WebSocketStreamClosed
/**
* Signals a connection failure.
*
* @param th a throwable
*/
case class WebSocketStreamFailure(th: Throwable)
}

View File

@ -0,0 +1,18 @@
package org.enso.projectmanager.infrastructure.http
import org.enso.projectmanager.data.Socket
/**
* Abstract connection factory.
*/
trait WebSocketConnectionFactory {
/**
* Creates web socket connection.
*
* @param socket a server address
* @return a connection
*/
def createConnection(socket: Socket): WebSocketConnection
}

View File

@ -0,0 +1,179 @@
package org.enso.projectmanager.infrastructure.languageserver
import java.util.UUID
import akka.actor.{Actor, ActorLogging, Cancellable, Props, Scheduler}
import io.circe.parser._
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.infrastructure.http.WebSocketConnection.{
WebSocketConnected,
WebSocketMessage,
WebSocketStreamClosed,
WebSocketStreamFailure
}
import org.enso.projectmanager.infrastructure.http.WebSocketConnectionFactory
import org.enso.projectmanager.infrastructure.languageserver.HeartbeatSession.{
HeartbeatTimeout,
SocketClosureTimeout
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerSupervisor.ServerUnresponsive
import org.enso.projectmanager.util.UnhandledLogging
import scala.concurrent.duration.FiniteDuration
/**
* Implements one ping-pong session.
*
* @param socket a server socket
* @param timeout a session timeout
* @param connectionFactory a web socket connection factory
* @param scheduler a scheduler
*/
class HeartbeatSession(
socket: Socket,
timeout: FiniteDuration,
connectionFactory: WebSocketConnectionFactory,
scheduler: Scheduler
) extends Actor
with ActorLogging
with UnhandledLogging {
import context.dispatcher
private val requestId = UUID.randomUUID()
private val connection = connectionFactory.createConnection(socket)
override def preStart(): Unit = {
connection.attachListener(self)
connection.connect()
}
override def receive: Receive = pingStage
private def pingStage: Receive = {
case WebSocketConnected =>
log.debug(s"Sending ping message to $socket")
connection.send(s"""
|{
| "jsonrpc": "2.0",
| "method": "heartbeat/ping",
| "id": "$requestId",
| "params": null
|}
|""".stripMargin)
val cancellable = scheduler.scheduleOnce(timeout, self, HeartbeatTimeout)
context.become(pongStage(cancellable))
case WebSocketStreamFailure(th) =>
log.error(s"An error occurred during connecting to websocket $socket", th)
context.parent ! ServerUnresponsive
stop()
case GracefulStop =>
stop()
}
private def pongStage(cancellable: Cancellable): Receive = {
case WebSocketMessage(payload) =>
val maybeJson =
parse(payload).flatMap(_.hcursor.downField("id").as[String])
maybeJson match {
case Left(error) =>
log.error("An error occurred during parsing pong reply", error)
case Right(id) =>
if (id == requestId.toString) {
log.debug(s"Received correct pong message from $socket")
cancellable.cancel()
connection.disconnect()
val closureTimeout =
scheduler.scheduleOnce(timeout, self, SocketClosureTimeout)
context.become(socketClosureStage(closureTimeout))
} else {
log.warning(s"Received unknown response $payload")
}
}
case HeartbeatTimeout =>
log.debug(s"Heartbeat timeout detected for $requestId")
context.parent ! ServerUnresponsive
connection.disconnect()
val closureTimeout =
scheduler.scheduleOnce(timeout, self, SocketClosureTimeout)
context.become(socketClosureStage(closureTimeout))
case WebSocketStreamClosed =>
context.parent ! ServerUnresponsive
context.stop(self)
case WebSocketStreamFailure(th) =>
log.error(s"An error occurred during waiting for Pong message", th)
context.parent ! ServerUnresponsive
cancellable.cancel()
connection.disconnect()
context.stop(self)
case GracefulStop =>
cancellable.cancel()
stop()
}
private def socketClosureStage(cancellable: Cancellable): Receive = {
case WebSocketStreamClosed =>
context.stop(self)
cancellable.cancel()
case WebSocketStreamFailure(th) =>
log.error(s"An error occurred during closing web socket", th)
context.stop(self)
cancellable.cancel()
case SocketClosureTimeout =>
log.error(s"Socket closure timed out")
context.stop(self)
case GracefulStop =>
//ignoring it, because the actor is already closing
}
private def stop(): Unit = {
connection.disconnect()
val closureTimeout =
scheduler.scheduleOnce(timeout, self, SocketClosureTimeout)
context.become(socketClosureStage(closureTimeout))
}
}
object HeartbeatSession {
/**
* Signals heartbeat timeout.
*/
case object HeartbeatTimeout
/**
* Signals socket closure timeout.
*/
case object SocketClosureTimeout
/**
* Creates a configuration object used to create a [[LanguageServerSupervisor]].
*
* @param socket a server socket
* @param timeout a session timeout
* @param connectionFactory a web socket connection factory
* @param scheduler a scheduler
* @return a configuration object
*/
def props(
socket: Socket,
timeout: FiniteDuration,
connectionFactory: WebSocketConnectionFactory,
scheduler: Scheduler
): Props =
Props(new HeartbeatSession(socket, timeout, connectionFactory, scheduler))
}

View File

@ -8,7 +8,7 @@ import org.enso.languageserver.boot.{
LanguageServerConfig
}
import org.enso.projectmanager.boot.configuration.BootloaderConfig
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootLoader.{
Boot,
FindFreeSocket,
@ -54,11 +54,11 @@ class LanguageServerBootLoader(
)
self ! Boot
context.become(
booting(SocketData(descriptor.networkConfig.interface, port), retry)
booting(Socket(descriptor.networkConfig.interface, port), retry)
)
}
private def booting(socket: SocketData, retryCount: Int): Receive = {
private def booting(socket: Socket, retryCount: Int): Receive = {
case Boot =>
log.debug("Booting a language server")
val config = LanguageServerConfig(

View File

@ -15,24 +15,27 @@ import akka.actor.{
Terminated
}
import akka.pattern.pipe
import org.enso.languageserver.boot.LanguageServerComponent.ServerStopped
import org.enso.languageserver.boot.LifecycleComponent.ComponentStopped
import org.enso.languageserver.boot.{
LanguageServerComponent,
LanguageServerConfig
}
import org.enso.projectmanager.boot.configuration.{
BootloaderConfig,
NetworkConfig
NetworkConfig,
SupervisionConfig
}
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.event.ClientEvent.ClientDisconnected
import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnectionFactory
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootLoader.{
ServerBootFailed,
ServerBooted
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.{
Boot,
BootTimeout
BootTimeout,
ServerDied
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.ServerShutDown
@ -52,13 +55,14 @@ import scala.concurrent.duration._
class LanguageServerController(
project: Project,
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
) extends Actor
with ActorLogging
with Stash
with UnhandledLogging {
import context.dispatcher
import context.{dispatcher, system}
private val descriptor =
LanguageServerDescriptor(
@ -81,7 +85,8 @@ class LanguageServerController(
case Boot =>
val bootloader =
context.actorOf(
LanguageServerBootLoader.props(descriptor, bootloaderConfig)
LanguageServerBootLoader.props(descriptor, bootloaderConfig),
"bootloader"
)
context.watch(bootloader)
val timeoutCancellable =
@ -108,6 +113,16 @@ class LanguageServerController(
unstashAll()
timeoutCancellable.cancel()
context.become(supervising(config, server))
context.actorOf(
LanguageServerSupervisor.props(
config,
server,
supervisionConfig,
new AkkaBasedWebSocketConnectionFactory(),
context.system.scheduler
),
"supervisor"
)
case Terminated(Bootloader) =>
log.error(s"Bootloader for project ${project.name} failed")
@ -127,7 +142,7 @@ class LanguageServerController(
): Receive = {
case StartServer(clientId, _) =>
sender() ! ServerStarted(
SocketData(config.interface, config.port)
Socket(config.interface, config.port)
)
context.become(supervising(config, server, clients + clientId))
@ -140,6 +155,9 @@ class LanguageServerController(
case ClientDisconnected(clientId) =>
removeClient(config, server, clients, clientId, None)
case ServerDied =>
log.error(s"Language server died [$config]")
context.stop(self)
}
private def removeClient(
@ -163,7 +181,6 @@ class LanguageServerController(
case StartServer(_, _) =>
sender() ! LanguageServerProtocol.ServerBootFailed(th)
stop()
}
private def stopping(maybeRequester: Option[ActorRef]): Receive = {
@ -175,15 +192,28 @@ class LanguageServerController(
maybeRequester.foreach(_ ! FailureDuringStoppage(th))
stop()
case ServerStopped =>
case ComponentStopped =>
log.info(s"Language server shut down successfully [$project].")
maybeRequester.foreach(_ ! LanguageServerProtocol.ServerStopped)
maybeRequester.foreach(_ ! ServerStopped)
stop()
}
private def waitingForChildren(): Receive = {
case Terminated(_) =>
if (context.children.isEmpty) {
context.stop(self)
}
}
private def stop(): Unit = {
context.stop(self)
context.parent ! ServerShutDown(project.id)
if (context.children.isEmpty) {
context.stop(self)
} else {
context.children.foreach(_ ! GracefulStop)
context.children.foreach(context.watch)
context.become(waitingForChildren())
}
}
}
@ -201,10 +231,16 @@ object LanguageServerController {
def props(
project: Project,
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
): Props =
Props(
new LanguageServerController(project, networkConfig, bootloaderConfig)
new LanguageServerController(
project,
networkConfig,
bootloaderConfig,
supervisionConfig
)
)
/**
@ -217,4 +253,6 @@ object LanguageServerController {
*/
case object Boot
case object ServerDied
}

View File

@ -2,7 +2,7 @@ package org.enso.projectmanager.infrastructure.languageserver
import java.util.UUID
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.model.Project
/**
@ -28,7 +28,7 @@ object LanguageServerProtocol {
*
* @param socket the server socket
*/
case class ServerStarted(socket: SocketData) extends ServerStartupResult
case class ServerStarted(socket: Socket) extends ServerStartupResult
/**
* Base trait for server startup failures.

View File

@ -5,7 +5,8 @@ import java.util.UUID
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
import org.enso.projectmanager.boot.configuration.{
BootloaderConfig,
NetworkConfig
NetworkConfig,
SupervisionConfig
}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{
CheckIfServerIsRunning,
@ -26,7 +27,8 @@ import org.enso.projectmanager.util.UnhandledLogging
*/
class LanguageServerRegistry(
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
) extends Actor
with ActorLogging
with UnhandledLogging {
@ -42,7 +44,7 @@ class LanguageServerRegistry(
} else {
val controller = context.actorOf(
LanguageServerController
.props(project, networkConfig, bootloaderConfig)
.props(project, networkConfig, bootloaderConfig, supervisionConfig)
)
context.watch(controller)
controller.forward(msg)
@ -87,8 +89,15 @@ object LanguageServerRegistry {
*/
def props(
networkConfig: NetworkConfig,
bootloaderConfig: BootloaderConfig
bootloaderConfig: BootloaderConfig,
supervisionConfig: SupervisionConfig
): Props =
Props(new LanguageServerRegistry(networkConfig, bootloaderConfig))
Props(
new LanguageServerRegistry(
networkConfig,
bootloaderConfig,
supervisionConfig
)
)
}

View File

@ -10,7 +10,7 @@ import org.enso.projectmanager.control.core.CovariantFlatMap
import org.enso.projectmanager.control.core.syntax._
import org.enso.projectmanager.control.effect.syntax._
import org.enso.projectmanager.control.effect.{Async, ErrorChannel}
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._
import org.enso.projectmanager.model.Project
@ -33,7 +33,7 @@ class LanguageServerRegistryProxy[F[+_, +_]: Async: ErrorChannel: CovariantFlatM
override def start(
clientId: UUID,
project: Project
): F[ServerStartupFailure, SocketData] =
): F[ServerStartupFailure, Socket] =
Async[F]
.fromFuture { () =>
(registry ? StartServer(clientId, project)).mapTo[ServerStartupResult]

View File

@ -2,7 +2,7 @@ package org.enso.projectmanager.infrastructure.languageserver
import java.util.UUID
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol.{
CheckTimeout,
ServerStartupFailure,
@ -27,7 +27,7 @@ trait LanguageServerService[F[+_, +_]] {
def start(
clientId: UUID,
project: Project
): F[ServerStartupFailure, SocketData]
): F[ServerStartupFailure, Socket]
/**
* Stops a lang. server.

View File

@ -0,0 +1,190 @@
package org.enso.projectmanager.infrastructure.languageserver
import akka.actor.Status.Failure
import akka.actor.{
Actor,
ActorLogging,
Cancellable,
Props,
Scheduler,
Terminated
}
import akka.pattern.pipe
import org.enso.languageserver.boot.LifecycleComponent.ComponentRestarted
import org.enso.languageserver.boot.{LanguageServerConfig, LifecycleComponent}
import org.enso.projectmanager.boot.configuration.SupervisionConfig
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.infrastructure.http.WebSocketConnectionFactory
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.ServerDied
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerSupervisor.{
RestartServer,
SendHeartbeat,
ServerUnresponsive,
StartSupervision
}
import org.enso.projectmanager.util.UnhandledLogging
/**
* A supervisor process responsible for monitoring language server and
* restarting it when the server is unresponsive. It delegates server
* monitoring to the [[HeartbeatSession]] actor.
*
* @param config a server config
* @param server a server handle
* @param supervisionConfig a supervision config
* @param connectionFactory a web socket connection factory
* @param scheduler a scheduler
*/
class LanguageServerSupervisor(
config: LanguageServerConfig,
server: LifecycleComponent,
supervisionConfig: SupervisionConfig,
connectionFactory: WebSocketConnectionFactory,
scheduler: Scheduler
) extends Actor
with ActorLogging
with UnhandledLogging {
import context.dispatcher
override def preStart(): Unit = { self ! StartSupervision }
override def receive: Receive = uninitialized
private def uninitialized: Receive = {
case GracefulStop =>
context.stop(self)
case StartSupervision =>
val cancellable =
scheduler.scheduleAtFixedRate(
supervisionConfig.initialDelay,
supervisionConfig.heartbeatInterval,
self,
SendHeartbeat
)
context.become(supervising(cancellable))
}
private def supervising(cancellable: Cancellable): Receive = {
case SendHeartbeat =>
val socket = Socket(config.interface, config.port)
context.actorOf(
HeartbeatSession.props(
socket,
supervisionConfig.heartbeatTimeout,
connectionFactory,
scheduler
)
)
case ServerUnresponsive =>
log.info(s"Server is unresponsive [$config]. Restarting it...")
cancellable.cancel()
log.info(s"Restarting first time the server")
server.restart() pipeTo self
context.become(restarting())
case GracefulStop =>
cancellable.cancel()
stop()
}
private def restarting(restartCount: Int = 1): Receive = {
case RestartServer =>
log.info(s"Restarting $restartCount time the server")
server.restart() pipeTo self
case Failure(th) =>
log.error(s"An error occurred during restarting the server [$config]", th)
if (restartCount < supervisionConfig.numberOfRestarts) {
scheduler.scheduleOnce(
supervisionConfig.delayBetweenRestarts,
self,
RestartServer
)
context.become(restarting(restartCount + 1))
} else {
log.error("Cannot restart language server")
context.parent ! ServerDied
context.stop(self)
}
case ComponentRestarted =>
log.info(s"Language server restarted [$config]")
val cancellable =
scheduler.scheduleAtFixedRate(
supervisionConfig.initialDelay,
supervisionConfig.heartbeatInterval,
self,
SendHeartbeat
)
context.become(supervising(cancellable))
case GracefulStop =>
stop()
}
private def waitingForChildren(): Receive = {
case Terminated(_) =>
if (context.children.isEmpty) {
context.stop(self)
}
}
private def stop(): Unit = {
if (context.children.isEmpty) {
context.stop(self)
} else {
context.children.foreach(_ ! GracefulStop)
context.children.foreach(context.watch)
context.become(waitingForChildren())
}
}
}
object LanguageServerSupervisor {
private case object StartSupervision
private case object RestartServer
/**
* A command responsible for initiating heartbeat session.
*/
case object SendHeartbeat
/**
* Signals that server is unresponsive.
*/
case object ServerUnresponsive
/**
* Creates a configuration object used to create a [[LanguageServerSupervisor]].
*
* @param config a server config
* @param server a server handle
* @param supervisionConfig a supervision config
* @param connectionFactory a web socket connection factory
* @param scheduler a scheduler
* @return a configuration object
*/
def props(
config: LanguageServerConfig,
server: LifecycleComponent,
supervisionConfig: SupervisionConfig,
connectionFactory: WebSocketConnectionFactory,
scheduler: Scheduler
): Props =
Props(
new LanguageServerSupervisor(
config,
server,
supervisionConfig,
connectionFactory,
scheduler
)
)
}

View File

@ -0,0 +1,10 @@
package org.enso.projectmanager.infrastructure
package object languageserver {
/**
* A stop command.
*/
case object GracefulStop
}

View File

@ -3,7 +3,7 @@ package org.enso.projectmanager.protocol
import java.util.UUID
import org.enso.jsonrpc.{Error, HasParams, HasResult, Method, Unused}
import org.enso.projectmanager.data.{ProjectMetadata, SocketData}
import org.enso.projectmanager.data.{ProjectMetadata, Socket}
/**
* The project management JSON RPC API provided by the project manager.
@ -42,7 +42,7 @@ object ProjectManagementApi {
case class Params(projectId: UUID)
case class Result(languageServerAddress: SocketData)
case class Result(languageServerAddress: Socket)
implicit val hasParams = new HasParams[this.type] {
type Params = ProjectOpen.Params

View File

@ -7,7 +7,7 @@ import akka.pattern.pipe
import org.enso.jsonrpc.Errors.ServiceError
import org.enso.jsonrpc.{Id, Request, ResponseError, ResponseResult}
import org.enso.projectmanager.control.effect.Exec
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
import org.enso.projectmanager.protocol.ProjectManagementApi.ProjectOpen
import org.enso.projectmanager.requesthandler.ProjectServiceFailureMapper.mapFailure
import org.enso.projectmanager.service.{
@ -67,7 +67,7 @@ class ProjectOpenHandler[F[+_, +_]: Exec](
cancellable.cancel()
context.stop(self)
case Right(socket: SocketData) =>
case Right(socket: Socket) =>
replyTo ! ResponseResult(
ProjectOpen,
id,

View File

@ -7,7 +7,7 @@ import org.enso.projectmanager.control.core.CovariantFlatMap
import org.enso.projectmanager.control.core.syntax._
import org.enso.projectmanager.control.effect.ErrorChannel
import org.enso.projectmanager.control.effect.syntax._
import org.enso.projectmanager.data.{ProjectMetadata, SocketData}
import org.enso.projectmanager.data.{ProjectMetadata, Socket}
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerService
import org.enso.projectmanager.infrastructure.log.Logging
@ -94,7 +94,7 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap](
override def openProject(
clientId: UUID,
projectId: UUID
): F[ProjectServiceFailure, SocketData] = {
): F[ProjectServiceFailure, Socket] = {
// format: off
for {
_ <- log.debug(s"Opening project $projectId")
@ -110,7 +110,7 @@ class ProjectService[F[+_, +_]: ErrorChannel: CovariantFlatMap](
private def startServer(
clientId: UUID,
project: Project
): F[ProjectServiceFailure, SocketData] =
): F[ProjectServiceFailure, Socket] =
languageServerService
.start(clientId, project)
.mapError {

View File

@ -2,7 +2,7 @@ package org.enso.projectmanager.service
import java.util.UUID
import org.enso.projectmanager.data.{ProjectMetadata, SocketData}
import org.enso.projectmanager.data.{ProjectMetadata, Socket}
/**
* A contract for the Project Service.
@ -37,7 +37,7 @@ trait ProjectServiceApi[F[+_, +_]] {
def openProject(
clientId: UUID,
projectId: UUID
): F[ProjectServiceFailure, SocketData]
): F[ProjectServiceFailure, Socket]
/**
* Closes a project. Tries to shut down the Language Server.

View File

@ -8,7 +8,7 @@
</encoder>
</appender>
<root level="ERROR">
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,203 @@
package org.enso.projectmanager.infrastructure.languageserver
import java.util.UUID
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.miguno.akka.testing.VirtualTime
import org.enso.languageserver.boot.LifecycleComponent.ComponentRestarted
import org.enso.languageserver.boot.{LanguageServerConfig, LifecycleComponent}
import org.enso.projectmanager.boot.configuration.SupervisionConfig
import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnectionFactory
import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.ServerDied
import org.enso.projectmanager.infrastructure.languageserver.ProgrammableWebSocketServer.{
Reject,
ReplyWith
}
import org.enso.projectmanager.infrastructure.languageserver.StepParent.ChildTerminated
import org.enso.projectmanager.infrastructure.net.Tcp
import org.mockito.BDDMockito._
import org.mockito.Mockito._
import org.mockito.MockitoSugar
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.must.Matchers
import scala.concurrent.Future
import scala.concurrent.duration._
class LanguageServerSupervisorSpec
extends TestKit(ActorSystem("LanguageServerSupervisorSpec"))
with ImplicitSender
with AnyFlatSpecLike
with Matchers
with BeforeAndAfterAll
with MockitoSugar {
"A language supervisor" should "monitor language server by sending ping requests on regular basis" in new TestCtx {
//given
val probe = TestProbe()
fakeServer.withBehaviour {
case ping @ PingMatcher(requestId) =>
probe.ref ! ping
ReplyWith(
s"""{ "jsonrpc": "2.0", "id": "$requestId", "result": null }"""
)
}
probe.expectNoMessage()
//when
virtualTime.advance(testInitialDelay)
(1 to 10).foreach { _ =>
probe.expectMsgPF() { case PingMatcher(_) => () }
virtualTime.advance(testHeartbeatInterval / 2)
probe.expectNoMessage()
virtualTime.advance(testHeartbeatInterval / 2)
}
//then
`then`(serverComponent.restart()).shouldHaveNoInteractions()
//teardown
parent ! GracefulStop
parentProbe.expectMsg(ChildTerminated)
system.stop(parent)
fakeServer.stop()
}
it should "restart server when pong message doesn't arrive on time" in new TestCtx {
//given
when(serverComponent.restart())
.thenReturn(Future.successful(ComponentRestarted))
val probe = TestProbe()
@volatile var pingCount = 0
fakeServer.withBehaviour {
case ping @ PingMatcher(requestId) =>
probe.ref ! ping
pingCount += 1
if (pingCount == 5) {
Reject
} else {
ReplyWith(
s"""{ "jsonrpc": "2.0", "id": "$requestId", "result": null }"""
)
}
}
probe.expectNoMessage()
//when
virtualTime.advance(testInitialDelay)
(1 to 4).foreach { _ =>
verifyNoInteractions(serverComponent)
probe.expectMsgPF() { case PingMatcher(_) => () }
virtualTime.advance(testHeartbeatInterval / 2)
probe.expectNoMessage()
virtualTime.advance(testHeartbeatInterval / 2)
}
probe.expectMsgPF() { case PingMatcher(_) => () }
virtualTime.advance(testHeartbeatTimeout)
verify(serverComponent, timeout(VerificationTimeout).times(1)).restart()
virtualTime.advance(testInitialDelay)
(1 to 10).foreach { _ =>
verifyNoMoreInteractions(serverComponent)
probe.expectMsgPF() { case PingMatcher(_) => () }
virtualTime.advance(testHeartbeatInterval / 2)
probe.expectNoMessage()
virtualTime.advance(testHeartbeatInterval / 2)
}
//teardown
parent ! GracefulStop
parentProbe.expectMsg(ChildTerminated)
system.stop(parent)
fakeServer.stop()
}
it should "restart server limited number of times" in new TestCtx {
//given
when(serverComponent.restart()).thenReturn(Future.failed(new Exception))
val probe = TestProbe()
fakeServer.withBehaviour {
case ping @ PingMatcher(_) =>
probe.ref ! ping
Reject
}
probe.expectNoMessage()
//when
virtualTime.advance(testInitialDelay)
probe.expectMsgPF(5.seconds) { case PingMatcher(_) => () }
verifyNoInteractions(serverComponent)
virtualTime.advance(testHeartbeatTimeout)
(1 to testRestartLimit).foreach { i =>
verify(serverComponent, timeout(VerificationTimeout).times(i)).restart()
//I need to wait some time to give the supervisor time to schedule next
// restart command
Thread.sleep(1000)
virtualTime.advance(testRestartDelay)
}
virtualTime.advance(testHeartbeatInterval)
probe.expectNoMessage()
verifyNoMoreInteractions(serverComponent)
//then
parentProbe.expectMsg(ServerDied)
parentProbe.expectMsg(ChildTerminated)
//teardown
system.stop(parent)
fakeServer.stop()
}
override def afterAll {
TestKit.shutdownActorSystem(system)
}
trait TestCtx {
val VerificationTimeout = 120000
val virtualTime = new VirtualTime
val serverComponent = mock[LifecycleComponent]
val testHost = "127.0.0.1"
val testPort = Tcp.findAvailablePort(testHost, 49152, 65535)
val testInitialDelay = 5.seconds
val testHeartbeatInterval = 10.seconds
val testHeartbeatTimeout = 7.seconds
val testRestartLimit = 3
val testRestartDelay = 2.seconds
val fakeServer = new ProgrammableWebSocketServer(testHost, testPort)
fakeServer.start()
val serverConfig =
LanguageServerConfig(testHost, testPort, UUID.randomUUID(), "/tmp")
val supervisionConfig =
SupervisionConfig(
testInitialDelay,
testHeartbeatInterval,
testHeartbeatTimeout,
testRestartLimit,
testRestartDelay
)
val parentProbe = TestProbe()
val parent = system.actorOf(
Props(
new StepParent(
LanguageServerSupervisor.props(
serverConfig,
serverComponent,
supervisionConfig,
new AkkaBasedWebSocketConnectionFactory(),
virtualTime.scheduler
),
parentProbe.ref
)
)
)
}
}

View File

@ -0,0 +1,19 @@
package org.enso.projectmanager.infrastructure.languageserver
import java.util.UUID
import io.circe.parser._
object PingMatcher {
def unapply(arg: String): Option[UUID] = {
val maybeJson = parse(arg).toOption
for {
json <- maybeJson
method <- json.hcursor.downField("method").as[String].toOption
if method == "heartbeat/ping"
id <- json.hcursor.downField("id").as[String].toOption
} yield UUID.fromString(id)
}
}

View File

@ -0,0 +1,78 @@
package org.enso.projectmanager.infrastructure.languageserver
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives.{handleWebSocketMessages, path}
import akka.stream.scaladsl.{Flow, Sink}
import org.enso.projectmanager.infrastructure.languageserver.ProgrammableWebSocketServer.{
Behaviour,
Reject,
ReplyWith
}
import scala.concurrent.Await
import scala.concurrent.duration._
class ProgrammableWebSocketServer(interface: String, port: Int)(
implicit system: ActorSystem
) {
@volatile
private var behaviour: Behaviour = (_ => Reject)
@volatile
private var maybeBinding: Option[Http.ServerBinding] = None
def withBehaviour(f: Behaviour): Unit = {
this.behaviour = f
}
private val handler: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
val payload = Await.result(tm.toStrict(3.seconds), 3.seconds).text
if (behaviour.isDefinedAt(payload)) {
behaviour.apply(payload) match {
case Reject => Nil
case ReplyWith(reply) => TextMessage(reply) :: Nil
}
} else {
Nil
}
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
private val websocketRoute =
path("") {
handleWebSocketMessages(handler)
}
def start(): Unit = {
val binding =
Await.result(
Http().bindAndHandle(websocketRoute, interface, port),
3.seconds
)
maybeBinding = Some(binding)
}
def stop(): Unit =
maybeBinding.foreach { binding =>
Await.result(binding.unbind(), 5.seconds)
}
}
object ProgrammableWebSocketServer {
type Behaviour = PartialFunction[String, Response]
sealed trait Response
case object Reject extends Response
case class ReplyWith(reply: String) extends Response
}

View File

@ -0,0 +1,22 @@
package org.enso.projectmanager.infrastructure.languageserver
import akka.actor.{Actor, ActorRef, Props, Terminated}
import org.enso.projectmanager.infrastructure.languageserver.StepParent.ChildTerminated
class StepParent(childProps: Props, probe: ActorRef) extends Actor {
val child = context.actorOf(childProps)
context.watch(child)
override def receive: Receive = {
case Terminated(`child`) => probe ! ChildTerminated
case GracefulStop => child ! GracefulStop
case msg => probe.tell(msg, sender)
}
}
object StepParent {
case object ChildTerminated
}

View File

@ -13,6 +13,7 @@ import org.enso.projectmanager.boot.configuration.{
BootloaderConfig,
NetworkConfig,
StorageConfig,
SupervisionConfig,
TimeoutConfig
}
import org.enso.projectmanager.control.effect.ZioEnvExec
@ -69,6 +70,9 @@ class BaseServerSpec extends JsonRpcServerTestKit {
lazy val netConfig = NetworkConfig("127.0.0.1", 40000, 60000)
lazy val supervisionConfig =
SupervisionConfig(5.seconds, 10.seconds, 5.seconds, 3, 1.seconds)
implicit val exec = new ZioEnvExec(Runtime.default)
lazy val fileSystem = new BlockingFileSystem(5.seconds)
@ -92,7 +96,10 @@ class BaseServerSpec extends JsonRpcServerTestKit {
lazy val projectValidator = new MonadicProjectValidator[ZIO[ZEnv, *, *]]()
lazy val languageServerRegistry =
system.actorOf(LanguageServerRegistry.props(netConfig, bootloaderConfig))
system.actorOf(
LanguageServerRegistry
.props(netConfig, bootloaderConfig, supervisionConfig)
)
lazy val languageServerService =
new LanguageServerRegistryProxy[ZIO[ZEnv, +*, +*]](

View File

@ -6,7 +6,7 @@ import java.util.UUID
import io.circe.literal._
import io.circe.parser.parse
import org.enso.projectmanager.data.SocketData
import org.enso.projectmanager.data.Socket
class ProjectManagementApiSpec extends BaseServerSpec {
@ -474,7 +474,7 @@ class ProjectManagementApiSpec extends BaseServerSpec {
def openProject(
projectId: UUID
)(implicit client: WsTestClient): SocketData = {
)(implicit client: WsTestClient): Socket = {
client.send(json"""
{ "jsonrpc": "2.0",
"method": "project/open",
@ -490,7 +490,7 @@ class ProjectManagementApiSpec extends BaseServerSpec {
.downField("languageServerAddress")
val Right(host) = socketField.downField("host").as[String]
val Right(port) = socketField.downField("port").as[Int]
SocketData(host, port)
Socket(host, port)
}
def closeProject(

View File

@ -109,6 +109,8 @@ services components, as well as any open questions that may remain.
- [`workspace/connect`](#workspaceconnect)
- [`workspace/undo`](#workspaceundo)
- [`workspace/redo`](#workspaceredo)
- [Monitoring](#monitoring)
- [`heartbeat/ping`](#heartbeatping)
- [Execution Management](#execution-management-1)
- [Types](#types-2)
- [`executionContext/create`](#executioncontextcreate)
@ -2214,6 +2216,35 @@ null
##### Errors
TBC
### Monitoring
The language server also has a heartbeat operation to monitor the Language
server. This API is private and should be used only by the Project Manager.
#### `heartbeat/ping`
This request is sent from the supervisor process to the server to check the
health of the Language Server.
- **Type:** Request
- **Direction:** Supervisor -> Server
- **Connection:** Protocol
- **Visibility:** Private
##### Parameters
```typescript
null
```
##### Result
```typescript
null
```
##### Errors
None
### Execution Management
The execution management portion of the language server API deals with exposing
fine-grained control over program and expression execution to the clients of

View File

@ -7,6 +7,7 @@ import org.enso.languageserver.event.{
ClientDisconnected,
ClientEvent
}
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.util.UnhandledLogging
object LanguageProtocol {
@ -34,6 +35,7 @@ class LanguageServer(config: Config)
}
override def receive: Receive = {
case Ping => sender() ! Pong
case Initialize =>
log.debug("Language Server initialized.")
unstashAll()
@ -45,6 +47,7 @@ class LanguageServer(config: Config)
config: Config,
env: Environment = Environment.empty
): Receive = {
case Ping => sender() ! Pong
case ClientConnected(client) =>
log.info("Client connected [{}].", client.id)
context.become(

View File

@ -3,9 +3,10 @@ package org.enso.languageserver.boot
import akka.http.scaladsl.Http
import com.typesafe.scalalogging.LazyLogging
import org.enso.languageserver.LanguageProtocol
import org.enso.languageserver.boot.LanguageServerComponent.{
ServerStarted,
ServerStopped
import org.enso.languageserver.boot.LifecycleComponent.{
ComponentRestarted,
ComponentStarted,
ComponentStopped
}
import scala.concurrent.Future
@ -17,19 +18,16 @@ import scala.concurrent.duration._
* @param config a LS config
*/
class LanguageServerComponent(config: LanguageServerConfig)
extends LazyLogging {
extends LifecycleComponent
with LazyLogging {
@volatile
private var maybeServerState: Option[(MainModule, Http.ServerBinding)] = None
implicit private val ec = config.computeExecutionContext
/**
* Starts asynchronously a server.
*
* @return a notice that the server started successfully
*/
def start(): Future[ServerStarted.type] = {
/** @inheritdoc **/
override def start(): Future[ComponentStarted.type] = {
logger.info("Starting Language Server...")
for {
mainModule <- Future { new MainModule(config) }
@ -39,15 +37,11 @@ class LanguageServerComponent(config: LanguageServerConfig)
_ <- Future {
logger.info(s"Started server at ${config.interface}:${config.port}")
}
} yield ServerStarted
} yield ComponentStarted
}
/**
* Stops asynchronously a server.
*
* @return a notice that the server stopped successfully
*/
def stop(): Future[ServerStopped.type] =
/** @inheritdoc **/
override def stop(): Future[ComponentStopped.type] =
maybeServerState match {
case None =>
Future.failed(new Exception("Server isn't running"))
@ -56,15 +50,35 @@ class LanguageServerComponent(config: LanguageServerConfig)
for {
_ <- binding.terminate(10.seconds)
_ <- mainModule.system.terminate()
} yield ServerStopped
_ <- Future { mainModule.context.close(true) }
_ <- Future { maybeServerState = None }
} yield ComponentStopped
}
}
/** @inheritdoc **/
override def restart(): Future[ComponentRestarted.type] =
for {
_ <- forceStop()
_ <- start()
} yield ComponentRestarted
object LanguageServerComponent {
private def forceStop(): Future[Unit] = {
maybeServerState match {
case None =>
Future.successful(())
case object ServerStarted
case Some((mainModule, binding)) =>
for {
_ <- binding.terminate(10.seconds).recover(logError)
_ <- mainModule.system.terminate().recover(logError)
_ <- Future { mainModule.context.close(true) }.recover(logError)
_ <- Future { maybeServerState = None }
} yield ComponentStopped
}
}
case object ServerStopped
private val logError: PartialFunction[Throwable, Unit] = {
case th => logger.error("An error occurred during stopping server", th)
}
}

View File

@ -0,0 +1,56 @@
package org.enso.languageserver.boot
import org.enso.languageserver.boot.LifecycleComponent.{
ComponentRestarted,
ComponentStarted,
ComponentStopped
}
import scala.concurrent.Future
/**
* An abstraction for components that can be started and stopped.
*/
trait LifecycleComponent {
/**
* Starts asynchronously a server.
*
* @return a notice that the server started successfully
*/
def start(): Future[ComponentStarted.type]
/**
* Stops asynchronously a server.
*
* @return a notice that the server stopped successfully
*/
def stop(): Future[ComponentStopped.type]
/**
* Restarts asynchronously a server.
*
* @return a notice that the server restarted successfully
*/
def restart(): Future[ComponentRestarted.type]
}
object LifecycleComponent {
/**
* Signals that component was started.
*/
case object ComponentStarted
/**
* Signals that component was stopped.
*/
case object ComponentStopped
/**
* Signals that component was restarted.
*/
case object ComponentRestarted
}

View File

@ -10,6 +10,7 @@ import org.enso.languageserver.data.{
CapabilityRegistration,
ReceivesTreeUpdates
}
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.util.UnhandledLogging
/**
@ -28,6 +29,9 @@ class CapabilityRouter(
with UnhandledLogging {
override def receive: Receive = {
case Ping =>
sender() ! Pong
case msg @ AcquireCapability(_, CapabilityRegistration(CanEdit(_))) =>
bufferRegistry.forward(msg)

View File

@ -5,6 +5,7 @@ import akka.routing.SmallestMailboxPool
import akka.pattern.pipe
import org.enso.languageserver.effect._
import org.enso.languageserver.data.Config
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.util.UnhandledLogging
import zio._
@ -27,6 +28,9 @@ class FileManager(
import context.dispatcher
override def receive: Receive = {
case Ping =>
sender() ! Pong
case FileManagerProtocol.WriteFile(path, content) =>
val result =
for {

View File

@ -0,0 +1,21 @@
package org.enso.languageserver.monitoring
import org.enso.jsonrpc.{HasParams, HasResult, Method, Unused}
/**
* The monitoring JSON RPC API provided by the language server.
* See [[https://github.com/luna/enso/blob/master/doc/design/engine/engine-services.md]]
* for message specifications.
*/
object MonitoringApi {
case object Ping extends Method("heartbeat/ping") {
implicit val hasParams = new HasParams[this.type] {
type Params = Unused.type
}
implicit val hasResult = new HasResult[this.type] {
type Result = Unused.type
}
}
}

View File

@ -0,0 +1,15 @@
package org.enso.languageserver.monitoring
object MonitoringProtocol {
/**
* A ping command.
*/
case object Ping
/**
* A pong reply.
*/
case object Pong
}

View File

@ -14,9 +14,11 @@ import org.enso.languageserver.capability.CapabilityApi.{
import org.enso.languageserver.capability.CapabilityProtocol
import org.enso.languageserver.data.Client
import org.enso.languageserver.event.{ClientConnected, ClientDisconnected}
import org.enso.languageserver.filemanager.PathWatcherProtocol
import org.enso.languageserver.filemanager.FileManagerApi._
import org.enso.languageserver.filemanager.PathWatcherProtocol
import org.enso.languageserver.monitoring.MonitoringApi.Ping
import org.enso.languageserver.requesthandler._
import org.enso.languageserver.requesthandler.monitoring.PingHandler
import org.enso.languageserver.runtime.ExecutionApi._
import org.enso.languageserver.util.UnhandledLogging
import org.enso.languageserver.text.TextApi._
@ -55,6 +57,16 @@ class ClientController(
private val requestHandlers: Map[Method, Props] =
Map(
Ping -> PingHandler.props(
List(
server,
bufferRegistry,
capabilityRouter,
fileManager,
contextRegistry
),
requestTimeout
),
AcquireCapability -> AcquireCapabilityHandler
.props(capabilityRouter, requestTimeout, client),
ReleaseCapability -> ReleaseCapabilityHandler

View File

@ -9,8 +9,9 @@ import org.enso.languageserver.capability.CapabilityApi.{
ReleaseCapability
}
import org.enso.languageserver.filemanager.FileManagerApi._
import org.enso.languageserver.text.TextApi._
import org.enso.languageserver.monitoring.MonitoringApi.Ping
import org.enso.languageserver.runtime.ExecutionApi._
import org.enso.languageserver.text.TextApi._
object JsonRpc {
@ -18,6 +19,7 @@ object JsonRpc {
* A description of supported JSON RPC messages.
*/
val protocol: Protocol = Protocol.empty
.registerRequest(Ping)
.registerRequest(AcquireCapability)
.registerRequest(ReleaseCapability)
.registerRequest(WriteFile)

View File

@ -0,0 +1,71 @@
package org.enso.languageserver.requesthandler.monitoring
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import org.enso.jsonrpc.{Id, Request, ResponseResult, Unused}
import org.enso.languageserver.monitoring.MonitoringApi
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.requesthandler.RequestTimeout
import scala.concurrent.duration.FiniteDuration
/**
* A request handler for `heartbeat/ping` commands.
*
* @param subsystems a list of monitored subsystems
* @param timeout a request timeout
*/
class PingHandler(
subsystems: List[ActorRef],
timeout: FiniteDuration
) extends Actor
with ActorLogging {
import context.dispatcher
override def receive: Receive = scatter
private def scatter: Receive = {
case Request(MonitoringApi.Ping, id, Unused) =>
subsystems.foreach(_ ! Ping)
val cancellable =
context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
context.become(gather(id, sender(), cancellable))
}
private def gather(
id: Id,
replyTo: ActorRef,
cancellable: Cancellable,
count: Int = 0
): Receive = {
case RequestTimeout =>
log.error(
s"Health check timed out. Only $count/${subsystems.size} subsystems replied on time."
)
context.stop(self)
case Pong =>
if (count + 1 == subsystems.size) {
replyTo ! ResponseResult(MonitoringApi.Ping, id, Unused)
cancellable.cancel()
context.stop(self)
} else {
context.become(gather(id, replyTo, cancellable, count + 1))
}
}
}
object PingHandler {
/**
* Creates a configuration object used to create a [[PingHandler]]
*
* @param subsystems a list of monitored subsystems
* @param timeout a request timeout
* @return a configuration object
*/
def props(subsystems: List[ActorRef], timeout: FiniteDuration): Props =
Props(new PingHandler(subsystems, timeout))
}

View File

@ -3,6 +3,7 @@ package org.enso.languageserver.runtime
import java.util.UUID
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.data.Config
import org.enso.languageserver.filemanager.FileSystemFailure
import org.enso.languageserver.runtime.ExecutionApi.ContextId
@ -52,6 +53,9 @@ final class ContextRegistry(config: Config, runtime: ActorRef)
withStore(ContextRegistry.Store(Map()))
private def withStore(store: ContextRegistry.Store): Receive = {
case Ping =>
sender() ! Pong
case CreateContextRequest(client) =>
val handler =
context.actorOf(CreateContextHandler.props(timeout, runtime))

View File

@ -13,6 +13,7 @@ import org.enso.languageserver.data.{
ContentBasedVersioning
}
import org.enso.languageserver.filemanager.Path
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.enso.languageserver.util.UnhandledLogging
import org.enso.languageserver.text.TextProtocol.{
ApplyEdit,
@ -39,6 +40,9 @@ class BufferRegistry(fileManager: ActorRef)(
override def receive: Receive = running(Map.empty)
private def running(registry: Map[Path, ActorRef]): Receive = {
case Ping =>
sender() ! Pong
case msg @ OpenFile(_, path) =>
if (registry.contains(path)) {
registry(path).forward(msg)

View File

@ -0,0 +1,85 @@
package org.enso.languageserver.requesthandler.monitoring
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.enso.jsonrpc.Id.Number
import org.enso.jsonrpc.{Request, ResponseResult, Unused}
import org.enso.languageserver.monitoring.MonitoringApi
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.must.Matchers
import scala.concurrent.duration._
class PingHandlerSpec
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender
with AnyFlatSpecLike
with Matchers {
"A PingHandler" must "scatter pings to all subsystems" in {
//given
val subsystem1 = TestProbe()
val subsystem2 = TestProbe()
val subsystem3 = TestProbe()
val actorUnderTest = system.actorOf(
PingHandler
.props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 10.seconds)
)
//when
actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused)
//then
subsystem1.expectMsg(Ping)
subsystem2.expectMsg(Ping)
subsystem3.expectMsg(Ping)
//teardown
system.stop(actorUnderTest)
}
it must "gather Pong messages and reply with success response" in {
//given
val subsystem1 = TestProbe()
val subsystem2 = TestProbe()
val subsystem3 = TestProbe()
val actorUnderTest = system.actorOf(
PingHandler
.props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 10.seconds)
)
//when
actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused)
subsystem1.expectMsg(Ping)
subsystem1.lastSender ! Pong
subsystem2.expectMsg(Ping)
subsystem2.lastSender ! Pong
subsystem3.expectMsg(Ping)
subsystem3.lastSender ! Pong
//then
expectMsg(ResponseResult(MonitoringApi.Ping, Number(1), Unused))
//teardown
system.stop(actorUnderTest)
}
it must "stop without replying when some of subsystems don't reply on time" in {
//given
val subsystem1 = TestProbe()
val subsystem2 = TestProbe()
val subsystem3 = TestProbe()
val actorUnderTest = system.actorOf(
PingHandler
.props(List(subsystem1.ref, subsystem2.ref, subsystem3.ref), 2.seconds)
)
watch(actorUnderTest)
//when
actorUnderTest ! Request(MonitoringApi.Ping, Number(1), Unused)
subsystem2.expectMsg(Ping)
subsystem2.lastSender ! Pong
subsystem3.expectMsg(Ping)
subsystem3.lastSender ! Pong
//then
expectTerminated(actorUnderTest)
expectNoMessage()
//teardown
system.stop(actorUnderTest)
}
}

View File

@ -0,0 +1,27 @@
package org.enso.languageserver.websocket
import io.circe.literal._
class MonitoringTest extends BaseServerTest {
"Monitoring subsystem" must {
"reply to ping requests" in {
val client = new WsTestClient(address)
client.send(json"""
{ "jsonrpc": "2.0",
"method": "heartbeat/ping",
"id": 1,
"params": null
}
""")
client.expectJson(json"""
{ "jsonrpc": "2.0",
"id": 1,
"result": null
}
""")
}
}
}

View File

@ -1 +1 @@
sbt.version=1.3.3
sbt.version=1.3.5