Record interaction between GUI and LS (#10107)

close #8328

Changelog:
- add: message callbacks to JsonRpc and Binary servers
- update: use events log to collect the RPC messages
This commit is contained in:
Dmitry Bushev 2024-06-03 07:50:59 +01:00 committed by GitHub
parent 9632f04e9b
commit d08cb704b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 257 additions and 123 deletions

View File

@ -2,8 +2,10 @@ package org.enso.languageserver.runtime.events;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Instant;
import java.util.Base64;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.LogRecord;
@ -30,6 +32,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor {
private static final String RECORDS_TAG_CLOSE = "</records>";
private static final String MESSAGE_SEPARATOR = ",";
private static final String MESSAGE_EMPTY_REQUEST_ID = "";
private static final String HEARTBEAT_PATTERN = "\"method\": \"heartbeat/";
/**
* Create an instance of {@link RuntimeEventsMonitor}.
@ -61,7 +64,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor {
}
@Override
public void registerEvent(Object event) {
public void registerRuntimeMessage(Object event) {
if (event instanceof Runtime.ApiEnvelope envelope) {
registerApiEnvelope(envelope);
} else if (event instanceof RuntimeConnector.MessageFromRuntime messageFromRuntime) {
@ -69,6 +72,22 @@ public final class RuntimeEventsMonitor implements EventsMonitor {
}
}
@Override
public void registerTextRpcMessage(String message) {
if (message.contains(HEARTBEAT_PATTERN)) return;
String entry = buildEntry(Direction.REQUEST, Option.empty(), message);
out.print(entry);
}
@Override
public void registerBinaryRpcMessage(ByteBuffer message) {
byte[] bytes = new byte[message.remaining()];
message.get(bytes);
String payload = Base64.getEncoder().encodeToString(bytes);
String entry = buildEntry(Direction.REQUEST, Option.empty(), payload);
out.print(entry);
}
@Override
public void close() throws IOException {
out.println(RECORDS_TAG_CLOSE);
@ -77,19 +96,18 @@ public final class RuntimeEventsMonitor implements EventsMonitor {
private void registerApiEnvelope(Runtime.ApiEnvelope event) {
if (event instanceof Runtime$Api$Request request) {
String entry =
buildEntry(Direction.REQUEST, request.requestId(), request.payload().getClass());
String payload = request.payload().getClass().getSimpleName();
String entry = buildEntry(Direction.REQUEST, request.requestId(), payload);
out.print(entry);
} else if (event instanceof Runtime$Api$Response response) {
String entry =
buildEntry(Direction.RESPONSE, response.correlationId(), response.payload().getClass());
String payload = response.payload().getClass().getSimpleName();
String entry = buildEntry(Direction.RESPONSE, response.correlationId(), payload);
out.print(entry);
}
}
private String buildEntry(Direction direction, Option<UUID> requestId, Class<?> payload) {
private String buildEntry(Direction direction, Option<UUID> requestId, String payload) {
String requestIdEntry = requestId.fold(() -> MESSAGE_EMPTY_REQUEST_ID, UUID::toString);
String payloadEntry = payload.getSimpleName();
Instant timeEntry = clock.instant();
String message =
@ -98,7 +116,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor {
.append(MESSAGE_SEPARATOR)
.append(requestIdEntry)
.append(MESSAGE_SEPARATOR)
.append(payloadEntry)
.append(payload)
.toString();
LogRecord record = new LogRecord(Level.INFO, message);

View File

@ -24,7 +24,7 @@ import org.enso.languageserver.monitoring.{
IdlenessEndpoint,
IdlenessMonitor
}
import org.enso.languageserver.profiling.ProfilingManager
import org.enso.languageserver.profiling.{EventsMonitorActor, ProfilingManager}
import org.enso.languageserver.protocol.binary.{
BinaryConnectionControllerFactory,
InboundMessageDecoder
@ -172,22 +172,39 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
"lock-manager-service"
)
val runtimeEventsMonitor =
private val (runtimeEventsMonitor, messagesCallbackOpt) =
languageServerConfig.profiling.profilingEventsLogPath match {
case Some(path) =>
val out = new PrintStream(path.toFile, StandardCharsets.UTF_8)
new RuntimeEventsMonitor(out)
new RuntimeEventsMonitor(out) -> Some(())
case None =>
new NoopEventsMonitor()
new NoopEventsMonitor() -> None
}
log.trace(
"Started runtime events monitor [{}].",
runtimeEventsMonitor.getClass.getName
)
private val eventsMonitor =
system.actorOf(
EventsMonitorActor.props(runtimeEventsMonitor),
"events-monitor"
)
private val messagesCallback =
messagesCallbackOpt
.map(_ => EventsMonitorActor.messagesCallback(eventsMonitor))
.toList
private val profilingManager =
system.actorOf(
ProfilingManager.props(eventsMonitor, distributionManager),
"profiling-manager"
)
lazy val runtimeConnector =
system.actorOf(
RuntimeConnector.props(lockManagerService, runtimeEventsMonitor),
RuntimeConnector.props(lockManagerService, eventsMonitor),
"runtime-connector"
)
@ -370,12 +387,6 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
"project-settings-manager"
)
val profilingManager =
system.actorOf(
ProfilingManager.props(runtimeConnector, distributionManager),
"profiling-manager"
)
val libraryLocations =
LibraryLocations.resolve(
distributionManager,
@ -476,7 +487,8 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
lazyMessageTimeout = 10.seconds,
secureConfig = secureConfig
),
List(healthCheckEndpoint, idlenessEndpoint)
List(healthCheckEndpoint, idlenessEndpoint),
messagesCallback
)
log.trace("Created JSON RPC Server [{}].", jsonRpcServer)
@ -489,7 +501,8 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
outgoingBufferSize = 100,
lazyMessageTimeout = 10.seconds,
secureConfig = secureConfig
)
),
messagesCallback
)
log.trace("Created Binary WebSocket Server [{}].", binaryServer)
@ -502,6 +515,7 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
def close(): Unit = {
suggestionsRepo.close()
context.close()
runtimeEventsMonitor.close()
log.info("Closed Language Server main module.")
}

View File

@ -23,19 +23,44 @@ case class ProfilingConfig(
* @return the path to the runtime events log file
*/
def profilingEventsLogPath: Option[Path] =
profilingPath.map { path =>
val profilingDirectory = path.getParent
val profilingFileName = path.getFileName.toString
val profilingFileExtension = FilenameUtils.getExtension(profilingFileName)
val eventsLogFileName =
profilingFileName.stripSuffix(
profilingFileExtension
) + ProfilingConfig.EventsLogExtension
profilingPath.map(
ProfilingConfig.modifyPath(
_,
ProfilingConfig.EventsLogSuffix,
ProfilingConfig.EventsLogExtension
)
)
}
profilingDirectory.resolve(eventsLogFileName)
}
}
object ProfilingConfig {
private val EventsLogSuffix = ""
private val EventsLogExtension = "log"
/** Modify the path by adding a suffix and changing the file extension.
*
* @param path the path to modify
* @param suffix the suffix to add
* @param extension the new file extension
* @return the modified path
*/
private def modifyPath(
path: Path,
suffix: String,
extension: String
): Path = {
val directory = path.getParent
val fileName = path.getFileName.toString
val fileExtension = FilenameUtils.getExtension(fileName)
val modifiedFileName =
if (fileExtension.isEmpty) {
s"$fileName$suffix.$extension"
} else {
val fileNameWithoutExtension = fileName.stripSuffix(s".$fileExtension")
s"$fileNameWithoutExtension$suffix.$extension"
}
directory.resolve(modifiedFileName)
}
}

View File

@ -25,6 +25,8 @@ import org.enso.languageserver.util.binary.{
DecodingFailure
}
import java.nio.ByteBuffer
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
@ -35,6 +37,7 @@ import scala.concurrent.ExecutionContext
* @param factory creates front controller per a single connection that is
* responsible for handling all incoming requests
* @param config a configuration object for properties of the server
* @param messageCallbacks a list of message callbacks
* @param system an actor system that hosts the server
* @param materializer an actor materializer that converts inbound and outbound
* flows into actors running these streams
@ -45,7 +48,8 @@ class BinaryWebSocketServer[A, B](
decoder: BinaryDecoder[A],
encoder: BinaryEncoder[B],
factory: ConnectionControllerFactory,
config: Config = Config.default
config: Config = Config.default,
messageCallbacks: List[ByteBuffer => Unit] = List.empty
)(
implicit val system: ActorSystem,
implicit val materializer: Materializer
@ -54,6 +58,13 @@ class BinaryWebSocketServer[A, B](
implicit val ec: ExecutionContext = system.dispatcher
private val messageCallbackSinks =
messageCallbacks.map { callback =>
Sink.foreach[ByteBuffer] { byteBuffer =>
callback(byteBuffer.asReadOnlyBuffer())
}
}
private val route: Route =
extractClientIP {
case RemoteAddress.Unknown =>
@ -108,7 +119,7 @@ class BinaryWebSocketServer[A, B](
frontController: ActorRef,
ip: RemoteAddress.IP
): Sink[Message, NotUsed] = {
Flow[Message]
val flow = Flow[Message]
.mapConcat[BinaryMessage] {
case msg: TextMessage =>
logger.warn(
@ -125,7 +136,14 @@ class BinaryWebSocketServer[A, B](
_.toStrict(config.lazyMessageTimeout)
}
.map { binaryMsg =>
val bytes = binaryMsg.data.asByteBuffer
binaryMsg.data.asByteBuffer
}
val flowWithCallbacks =
messageCallbackSinks.foldLeft(flow)(_ alsoTo _)
flowWithCallbacks
.map { bytes =>
decoder.decode(bytes)
}
.to {

View File

@ -0,0 +1,46 @@
package org.enso.languageserver.profiling
import akka.actor.{Actor, ActorRef, Props}
import org.enso.jsonrpc.MessageHandler
import org.enso.languageserver.util.UnhandledLogging
import org.enso.profiling.events.EventsMonitor
import java.nio.ByteBuffer
final class EventsMonitorActor(initialEventsMonitor: EventsMonitor)
extends Actor
with UnhandledLogging {
override def receive: Receive =
initialized(initialEventsMonitor)
private def initialized(eventsMonitor: EventsMonitor): Receive = {
case EventsMonitorProtocol.RegisterEventsMonitor(newEventsMonitor) =>
context.become(initialized(newEventsMonitor))
case EventsMonitorProtocol.RegisterRuntimeMessage(message) =>
eventsMonitor.registerRuntimeMessage(message)
case EventsMonitorProtocol.RegisterTextRpcMessage(message) =>
eventsMonitor.registerTextRpcMessage(message)
case EventsMonitorProtocol.RegisterBinaryRpcMessage(message) =>
eventsMonitor.registerBinaryRpcMessage(message)
}
}
object EventsMonitorActor {
def props(eventsMonitor: EventsMonitor): Props =
Props(new EventsMonitorActor(eventsMonitor))
def messagesCallback(eventsMonitor: ActorRef): Actor.Receive = {
case webMessage: MessageHandler.WebMessage =>
eventsMonitor ! EventsMonitorProtocol.RegisterTextRpcMessage(
webMessage.message
)
case byteBuffer: ByteBuffer =>
eventsMonitor ! EventsMonitorProtocol.RegisterBinaryRpcMessage(byteBuffer)
}
}

View File

@ -0,0 +1,20 @@
package org.enso.languageserver.profiling
import org.enso.profiling.events.EventsMonitor
import java.nio.ByteBuffer
object EventsMonitorProtocol {
/** Protocol message to register new events monitor in the runtime connector.
*
* @param eventsMonitor the events monitor to register
*/
case class RegisterEventsMonitor(eventsMonitor: EventsMonitor)
case class RegisterRuntimeMessage(message: Any)
case class RegisterTextRpcMessage(message: String)
case class RegisterBinaryRpcMessage(message: ByteBuffer)
}

View File

@ -3,10 +3,9 @@ package org.enso.languageserver.profiling
import akka.actor.{Actor, ActorRef, Props}
import com.typesafe.scalalogging.LazyLogging
import org.enso.distribution.DistributionManager
import org.enso.languageserver.runtime.RuntimeConnector
import org.enso.languageserver.runtime.events.RuntimeEventsMonitor
import org.enso.logger.masking.MaskedPath
import org.enso.profiling.events.NoopEventsMonitor
import org.enso.profiling.events.{EventsMonitor, NoopEventsMonitor}
import org.enso.profiling.sampler.{MethodsSampler, OutputStreamSampler}
import org.enso.profiling.snapshot.{HeapDumpSnapshot, ProfilingSnapshot}
@ -21,13 +20,13 @@ import scala.util.{Failure, Success, Try}
/** Handles the profiling commands.
*
* @param runtimeConnector the connection to runtime
* @param eventsMonitorActor the events monitor actor
* @param distributionManager the distribution manager
* @param profilingSnapshot the profiling snapshot generator
* @param clock the system clock
*/
final class ProfilingManager(
runtimeConnector: ActorRef,
eventsMonitorActor: ActorRef,
distributionManager: DistributionManager,
profilingSnapshot: ProfilingSnapshot,
clock: Clock
@ -56,22 +55,39 @@ final class ProfilingManager(
sampler.start()
val eventsMonitor = createEventsMonitor(instant)
runtimeConnector ! RuntimeConnector.RegisterEventsMonitor(
eventsMonitorActor ! EventsMonitorProtocol.RegisterEventsMonitor(
eventsMonitor
)
sender() ! ProfilingProtocol.ProfilingStartResponse
context.become(
initialized(
Some(RunningSampler(instant, sampler, result, memorySnapshot))
Some(
RunningSampler(
instant,
sampler,
result,
memorySnapshot,
eventsMonitor
)
)
)
)
}
case ProfilingProtocol.ProfilingStopRequest =>
sampler match {
case Some(RunningSampler(instant, sampler, result, memorySnapshot)) =>
case Some(
RunningSampler(
instant,
sampler,
result,
memorySnapshot,
eventsMonitor
)
) =>
sampler.stop()
eventsMonitor.close()
Try(saveSamplerResult(result.toByteArray, instant)) match {
case Failure(exception) =>
@ -87,7 +103,7 @@ final class ProfilingManager(
saveMemorySnapshot(instant)
}
runtimeConnector ! RuntimeConnector.RegisterEventsMonitor(
eventsMonitorActor ! EventsMonitorProtocol.RegisterEventsMonitor(
new NoopEventsMonitor
)
@ -172,7 +188,8 @@ object ProfilingManager {
instant: Instant,
sampler: MethodsSampler,
result: ByteArrayOutputStream,
memorySnapshot: Boolean
memorySnapshot: Boolean,
eventsMonitor: EventsMonitor
)
private def createProfilingFileName(instant: Instant): String = {
@ -197,20 +214,20 @@ object ProfilingManager {
/** Creates the configuration object used to create a [[ProfilingManager]].
*
* @param runtimeConnector the connection to runtime
* @param eventsMonitor the events monitor actor
* @param distributionManager the distribution manager
* @param profilingSnapshot the profiling snapshot generator
* @param clock the system clock
*/
def props(
runtimeConnector: ActorRef,
eventsMonitor: ActorRef,
distributionManager: DistributionManager,
profilingSnapshot: ProfilingSnapshot = new HeapDumpSnapshot(),
clock: Clock = Clock.systemUTC()
): Props =
Props(
new ProfilingManager(
runtimeConnector,
eventsMonitor,
distributionManager,
profilingSnapshot,
clock

View File

@ -2,6 +2,7 @@ package org.enso.languageserver.runtime
import akka.actor.{Actor, ActorRef, Props, Stash}
import com.typesafe.scalalogging.LazyLogging
import org.enso.languageserver.profiling.EventsMonitorProtocol
import org.enso.languageserver.runtime.RuntimeConnector.{
Destroy,
MessageFromRuntime
@ -12,7 +13,6 @@ import org.enso.logger.akka.ActorMessageLogging
import org.enso.logger.masking.ToLogString
import org.enso.polyglot.runtime.Runtime
import org.enso.polyglot.runtime.Runtime.{Api, ApiEnvelope}
import org.enso.profiling.events.EventsMonitor
import org.graalvm.polyglot.io.MessageEndpoint
import java.nio.ByteBuffer
@ -22,7 +22,7 @@ import scala.util.{Failure, Success}
/** An actor managing a connection to Enso's runtime server. */
final class RuntimeConnector(
handlers: Map[Class[_], ActorRef],
initialEventsMonitor: EventsMonitor
eventsMonitor: ActorRef
) extends Actor
with LazyLogging
with ActorMessageLogging
@ -40,15 +40,12 @@ final class RuntimeConnector(
engine
)
unstashAll()
context.become(waitingOnEndpoint(engine, initialEventsMonitor))
context.become(waitingOnEndpoint(engine))
case _ => stash()
}
private def waitingOnEndpoint(
engine: MessageEndpoint,
eventsMonitor: EventsMonitor
): Receive =
registerEvent(eventsMonitor).andThen(LoggingReceive {
private def waitingOnEndpoint(engine: MessageEndpoint): Receive =
registerMessage.andThen(LoggingReceive {
case MessageFromRuntime(
Runtime.Api.Response(None, Api.InitializedNotification())
) =>
@ -57,11 +54,7 @@ final class RuntimeConnector(
engine
)
unstashAll()
context.become(initialized(engine, eventsMonitor, Map()))
case RuntimeConnector.RegisterEventsMonitor(newEventsMonitor) =>
eventsMonitor.close()
context.become(waitingOnEndpoint(engine, newEventsMonitor))
context.become(initialized(engine, Map()))
case _ => stash()
})
@ -87,30 +80,21 @@ final class RuntimeConnector(
* the runtime are forwarded to one of the registered handlers.
*
* @param engine endpoint of a runtime
* @param eventsMonitor the current events monitor
* @param senders request ids with corresponding senders
*/
def initialized(
engine: MessageEndpoint,
eventsMonitor: EventsMonitor,
senders: Map[Runtime.Api.RequestId, ActorRef]
): Receive = registerEvent(eventsMonitor).andThen(LoggingReceive {
): Receive = registerMessage.andThen(LoggingReceive {
case Destroy =>
eventsMonitor.close()
context.stop(self)
case RuntimeConnector.RegisterEventsMonitor(newEventsMonitor) =>
eventsMonitor.close()
context.become(initialized(engine, newEventsMonitor, senders))
case msg: Runtime.ApiEnvelope =>
engine.sendBinary(Runtime.Api.serialize(msg))
msg match {
case Api.Request(Some(id), _) =>
context.become(
initialized(engine, eventsMonitor, senders + (id -> sender()))
)
context.become(initialized(engine, senders + (id -> sender())))
case _ =>
}
@ -146,20 +130,13 @@ final class RuntimeConnector(
case _ =>
}
}
context.become(
initialized(engine, eventsMonitor, senders - correlationId)
)
context.become(initialized(engine, senders - correlationId))
})
/** Register event in the events monitor
*
* @param eventsMonitor the current events monitor
*/
private def registerEvent(
eventsMonitor: EventsMonitor
): PartialFunction[Any, Any] = { event =>
eventsMonitor.registerEvent(event)
event
/** Register event in the events monitor. */
private def registerMessage: PartialFunction[Any, Any] = { message =>
eventsMonitor ! EventsMonitorProtocol.RegisterRuntimeMessage(message)
message
}
}
@ -171,12 +148,6 @@ object RuntimeConnector {
*/
case class Initialize(engineConnection: MessageEndpoint)
/** Protocol message to register new events monitor in the runtime connector.
*
* @param eventsMonitor the events monitor to register
*/
case class RegisterEventsMonitor(eventsMonitor: EventsMonitor)
/** Protocol message to inform the actor about the connection being closed.
*/
case object Destroy
@ -184,15 +155,15 @@ object RuntimeConnector {
/** Helper for creating instances of the [[RuntimeConnector]] actor.
*
* @param lockManagerService a reference to the lock manager service actor
* @param monitor events monitor that handles messages between the language
* @param eventsMonitor events monitor that handles messages between the language
* server and the runtime
* @return a [[Props]] instance for the newly created actor.
*/
def props(lockManagerService: ActorRef, monitor: EventsMonitor): Props = {
def props(lockManagerService: ActorRef, eventsMonitor: ActorRef): Props = {
val lockRequests =
LockManagerService.handledRequestTypes.map(_ -> lockManagerService)
val handlers: Map[Class[_], ActorRef] = Map.from(lockRequests)
Props(new RuntimeConnector(handlers, monitor))
Props(new RuntimeConnector(handlers, eventsMonitor))
}
/** Endpoint implementation used to handle connections with the runtime.

View File

@ -35,6 +35,7 @@ import org.enso.languageserver.io._
import org.enso.languageserver.libraries._
import org.enso.languageserver.monitoring.IdlenessMonitor
import org.enso.languageserver.profiling.{
EventsMonitorActor,
ProfilingManager,
TestProfilingSnapshot
}
@ -54,6 +55,7 @@ import org.enso.librarymanager.published.PublishedLibraryCache
import org.enso.pkg.PackageManager
import org.enso.polyglot.data.TypeGraph
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.profiling.events.NoopEventsMonitor
import org.enso.runtimeversionmanager.test.{
FakeEnvironment,
TestableThreadSafeFileLockManager
@ -71,6 +73,7 @@ import java.nio.file.{Files, Path}
import java.util.UUID
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.slf4j.LoggerFactory
@ -375,9 +378,13 @@ abstract class BaseServerTest
)
)
val eventsMonitor = system.actorOf(
EventsMonitorActor.props(new NoopEventsMonitor)
)
val profilingManager = system.actorOf(
ProfilingManager.props(
runtimeConnectorProbe.ref,
eventsMonitor,
distributionManager,
profilingSnapshot,
clock

View File

@ -2,7 +2,6 @@ package org.enso.languageserver.websocket.json
import org.enso.distribution.DistributionManager
import org.enso.languageserver.profiling.ProfilingManager
import org.enso.languageserver.runtime.RuntimeConnector
import org.enso.logger.ReportLogsOnFailure
import java.nio.file.Files
@ -20,21 +19,9 @@ class ProfilingManagerTest extends BaseServerTest with ReportLogsOnFailure {
val client = getInitialisedWsClient()
client.send(json.profilingStart(1))
runtimeConnectorProbe.receiveN(1).head match {
case _: RuntimeConnector.RegisterEventsMonitor =>
// Ok
case other =>
fail(s"Unexpected message: $other")
}
client.expectJson(json.ok(1))
client.send(json.profilingStop(2))
runtimeConnectorProbe.receiveN(1).head match {
case _: RuntimeConnector.RegisterEventsMonitor =>
// Ok
case other =>
fail(s"Unexpected message: $other")
}
client.expectJson(json.ok(2))
val distributionManager = getDistributionManager
@ -54,21 +41,9 @@ class ProfilingManagerTest extends BaseServerTest with ReportLogsOnFailure {
val client = getInitialisedWsClient()
client.send(json.profilingStart(1, memorySnapshot = true))
runtimeConnectorProbe.receiveN(1).head match {
case _: RuntimeConnector.RegisterEventsMonitor =>
// Ok
case other =>
fail(s"Unexpected message: $other")
}
client.expectJson(json.ok(1))
client.send(json.profilingStop(2))
runtimeConnectorProbe.receiveN(1).head match {
case _: RuntimeConnector.RegisterEventsMonitor =>
// Ok
case other =>
fail(s"Unexpected message: $other")
}
client.expectJson(json.ok(2))
val distributionManager = getDistributionManager

View File

@ -8,6 +8,7 @@ import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{Materializer, OverflowStrategy}
import com.typesafe.scalalogging.LazyLogging
import org.enso.jsonrpc.MessageHandler.WebMessage
import java.util.UUID
@ -20,6 +21,7 @@ import scala.concurrent.ExecutionContext
* @param clientControllerFactory a factory used to create a client controller
* @param config a server config
* @param optionalEndpoints a list of optional endpoints
* @param messageCallbacks a list of message callbacks
* @param system an actor system
* @param materializer a materializer
*/
@ -27,7 +29,8 @@ class JsonRpcServer(
protocolFactory: ProtocolFactory,
clientControllerFactory: ClientControllerFactory,
config: JsonRpcServer.Config = JsonRpcServer.Config.default,
optionalEndpoints: List[Endpoint] = List.empty
optionalEndpoints: List[Endpoint] = List.empty,
messageCallbacks: List[WebMessage => Unit] = List.empty
)(
implicit val system: ActorSystem,
implicit val materializer: Materializer
@ -36,6 +39,9 @@ class JsonRpcServer(
implicit val ec: ExecutionContext = system.dispatcher
private val messageCallbackSinks =
messageCallbacks.map(Sink.foreach[WebMessage])
private def newUser(port: Int): Flow[Message, Message, NotUsed] = {
val messageHandler =
system.actorOf(
@ -49,7 +55,7 @@ class JsonRpcServer(
s"message-handler-supervisor-${UUID.randomUUID()}"
)
val incomingMessages: Sink[Message, NotUsed] =
val incomingMessagesFlow =
Flow[Message]
.mapConcat({
case textMsg: TextMessage => textMsg :: Nil
@ -59,6 +65,11 @@ class JsonRpcServer(
_.toStrict(config.lazyMessageTimeout)
.map(msg => MessageHandler.WebMessage(msg.text))
)
val incomingMessagesFlowWithCallbacks =
messageCallbackSinks.foldLeft(incomingMessagesFlow)(_ alsoTo _)
val incomingMessages: Sink[Message, NotUsed] =
incomingMessagesFlowWithCallbacks
.wireTap { webMessage =>
logger.trace(s"Received text message: ${webMessage.message}.")
}

View File

@ -1,6 +1,7 @@
package org.enso.profiling.events;
import java.io.Closeable;
import java.nio.ByteBuffer;
/** Diagnostic tool that processes event messages. Used for debugging or performance review. */
public interface EventsMonitor extends Closeable {
@ -10,5 +11,9 @@ public interface EventsMonitor extends Closeable {
*
* @param event the event to register.
*/
void registerEvent(Object event);
void registerRuntimeMessage(Object event);
void registerTextRpcMessage(String message);
void registerBinaryRpcMessage(ByteBuffer message);
}

View File

@ -1,12 +1,19 @@
package org.enso.profiling.events;
import java.io.IOException;
import java.nio.ByteBuffer;
/** Events monitor that does nothing. */
public final class NoopEventsMonitor implements EventsMonitor {
@Override
public void registerEvent(Object event) {}
public void registerRuntimeMessage(Object event) {}
@Override
public void registerTextRpcMessage(String message) {}
@Override
public void registerBinaryRpcMessage(ByteBuffer message) {}
@Override
public void close() throws IOException {}