mirror of
https://github.com/enso-org/enso.git
synced 2024-11-30 05:35:09 +03:00
Message handler supervisor (#3881)
Changelog: - Fix a potential `null` value in diagnostic messages - Add a supervising strategy to the message handler
This commit is contained in:
parent
d54faab903
commit
52a8c72303
@ -61,6 +61,7 @@ class JsonConnectionControllerFactory(
|
||||
projectSettingsManager = projectSettingsManager,
|
||||
libraryConfig = libraryConfig,
|
||||
languageServerConfig = config
|
||||
)
|
||||
),
|
||||
s"json-connection-controller-$clientId"
|
||||
)
|
||||
}
|
||||
|
@ -308,7 +308,7 @@ object ContextRegistryProtocol {
|
||||
*/
|
||||
case class ExecutionDiagnostic(
|
||||
kind: ExecutionDiagnosticKind,
|
||||
message: String,
|
||||
message: Option[String],
|
||||
path: Option[Path],
|
||||
location: Option[model.Range],
|
||||
expressionId: Option[UUID],
|
||||
|
@ -409,7 +409,7 @@ class ContextEventsListenerSpec
|
||||
Seq(
|
||||
ExecutionDiagnostic(
|
||||
ExecutionDiagnosticKind.Error,
|
||||
message,
|
||||
Some(message),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
|
@ -847,7 +847,7 @@ object Runtime {
|
||||
*/
|
||||
case class Diagnostic(
|
||||
kind: DiagnosticType,
|
||||
message: String,
|
||||
message: Option[String],
|
||||
file: Option[File],
|
||||
location: Option[model.Range],
|
||||
expressionId: Option[ExpressionId],
|
||||
@ -858,7 +858,7 @@ object Runtime {
|
||||
override def toLogString(shouldMask: Boolean): String =
|
||||
"Diagnostic(" +
|
||||
s"kind=$kind," +
|
||||
s"message=${MaskedString(message).toLogString(shouldMask)}," +
|
||||
s"message=${message.map(m => MaskedString(m).toLogString(shouldMask))}," +
|
||||
s"file=${file.map(f => MaskedPath(f.toPath).toLogString(shouldMask))}," +
|
||||
s"location=$location," +
|
||||
s"expressionId=$expressionId," +
|
||||
@ -886,7 +886,7 @@ object Runtime {
|
||||
): Diagnostic =
|
||||
new Diagnostic(
|
||||
DiagnosticType.Error(),
|
||||
message,
|
||||
Option(message),
|
||||
file,
|
||||
location,
|
||||
expressionId,
|
||||
@ -911,7 +911,7 @@ object Runtime {
|
||||
): Diagnostic =
|
||||
new Diagnostic(
|
||||
DiagnosticType.Warning(),
|
||||
message,
|
||||
Option(message),
|
||||
file,
|
||||
location,
|
||||
expressionId,
|
||||
|
@ -183,7 +183,7 @@ final class EnsureCompiledJob(protected val files: Iterable[File])
|
||||
): Api.ExecutionResult.Diagnostic = {
|
||||
Api.ExecutionResult.Diagnostic(
|
||||
kind,
|
||||
diagnostic.message,
|
||||
Option(diagnostic.message),
|
||||
Option(module.getPath).map(new File(_)),
|
||||
diagnostic.location
|
||||
.map(loc =>
|
||||
|
@ -36,14 +36,11 @@ class JsonRpcServer(
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
|
||||
private def newUser(): Flow[Message, Message, NotUsed] = {
|
||||
val clientId = UUID.randomUUID()
|
||||
val clientActor = clientControllerFactory.createClientController(clientId)
|
||||
|
||||
val messageHandler =
|
||||
system.actorOf(
|
||||
Props(new MessageHandler(protocol, clientActor))
|
||||
Props(new MessageHandlerSupervisor(clientControllerFactory, protocol)),
|
||||
s"message-handler-supervisor-${UUID.randomUUID()}"
|
||||
)
|
||||
clientActor ! JsonRpcServer.WebConnect(messageHandler)
|
||||
|
||||
val incomingMessages: Sink[Message, NotUsed] =
|
||||
Flow[Message]
|
||||
|
@ -0,0 +1,77 @@
|
||||
package org.enso.jsonrpc
|
||||
|
||||
import akka.actor.{
|
||||
Actor,
|
||||
ActorRef,
|
||||
OneForOneStrategy,
|
||||
Props,
|
||||
Stash,
|
||||
SupervisorStrategy
|
||||
}
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
/** An actor responsible for supervising the [[MessageHandler]].
|
||||
*
|
||||
* @param protocol a protocol supported be the server
|
||||
* @param clientControllerFactory a factory used to create a client controller
|
||||
*/
|
||||
final class MessageHandlerSupervisor(
|
||||
clientControllerFactory: ClientControllerFactory,
|
||||
protocol: Protocol
|
||||
) extends Actor
|
||||
with LazyLogging
|
||||
with Stash {
|
||||
|
||||
import MessageHandlerSupervisor._
|
||||
|
||||
override def preStart(): Unit = {
|
||||
self ! Initialize
|
||||
}
|
||||
|
||||
override def receive: Receive = uninitialized
|
||||
|
||||
override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
|
||||
errorDecider.orElse(SupervisorStrategy.defaultDecider)
|
||||
}
|
||||
|
||||
/** Method defining the supervising behavior in case of errors.
|
||||
*
|
||||
* Child [[MessageHandler]] actor maintains a state that will be lost if
|
||||
* the actor is restarted (default akka supervising behavior). Instead of
|
||||
* restarting we log the error and allow it to continue handling messages.
|
||||
*/
|
||||
private def errorDecider: SupervisorStrategy.Decider = {
|
||||
case error: Exception =>
|
||||
logger.warn("Resuming after error.", error)
|
||||
SupervisorStrategy.Resume
|
||||
}
|
||||
|
||||
private def uninitialized: Receive = {
|
||||
case Initialize =>
|
||||
val clientId = UUID.randomUUID()
|
||||
val clientActor = clientControllerFactory.createClientController(clientId)
|
||||
|
||||
val messageHandler =
|
||||
context.actorOf(
|
||||
Props(new MessageHandler(protocol, clientActor)),
|
||||
s"message-handler-$clientId"
|
||||
)
|
||||
clientActor ! JsonRpcServer.WebConnect(messageHandler)
|
||||
context.become(initialized(messageHandler))
|
||||
unstashAll()
|
||||
|
||||
case _ =>
|
||||
stash()
|
||||
}
|
||||
|
||||
private def initialized(messageHandler: ActorRef): Receive = { case message =>
|
||||
messageHandler.forward(message)
|
||||
}
|
||||
}
|
||||
|
||||
object MessageHandlerSupervisor {
|
||||
|
||||
case object Initialize
|
||||
}
|
@ -50,7 +50,7 @@ class ManagerClientControllerFactory[
|
||||
loggingServiceDescriptor,
|
||||
timeoutConfig
|
||||
),
|
||||
s"jsonrpc-connection-controller-$clientId"
|
||||
s"manager-client-controller-$clientId"
|
||||
)
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user