From d08cb704b05ed15dca323e50e7f5090741c668d3 Mon Sep 17 00:00:00 2001 From: Dmitry Bushev Date: Mon, 3 Jun 2024 07:50:59 +0100 Subject: [PATCH] Record interaction between GUI and LS (#10107) close #8328 Changelog: - add: message callbacks to JsonRpc and Binary servers - update: use events log to collect the RPC messages --- .../runtime/events/RuntimeEventsMonitor.java | 34 ++++++++--- .../enso/languageserver/boot/MainModule.scala | 40 ++++++++---- .../languageserver/boot/ProfilingConfig.scala | 47 ++++++++++---- .../http/server/BinaryWebSocketServer.scala | 24 +++++++- .../profiling/EventsMonitorActor.scala | 46 ++++++++++++++ .../profiling/EventsMonitorProtocol.scala | 20 ++++++ .../profiling/ProfilingManager.scala | 41 +++++++++---- .../runtime/RuntimeConnector.scala | 61 +++++-------------- .../websocket/json/BaseServerTest.scala | 9 ++- .../websocket/json/ProfilingManagerTest.scala | 25 -------- .../org/enso/jsonrpc/JsonRpcServer.scala | 17 +++++- .../enso/profiling/events/EventsMonitor.java | 7 ++- .../profiling/events/NoopEventsMonitor.java | 9 ++- 13 files changed, 257 insertions(+), 123 deletions(-) create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorActor.scala create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorProtocol.scala diff --git a/engine/language-server/src/main/java/org/enso/languageserver/runtime/events/RuntimeEventsMonitor.java b/engine/language-server/src/main/java/org/enso/languageserver/runtime/events/RuntimeEventsMonitor.java index 7f2bb91b0e..5c3981fbe4 100644 --- a/engine/language-server/src/main/java/org/enso/languageserver/runtime/events/RuntimeEventsMonitor.java +++ b/engine/language-server/src/main/java/org/enso/languageserver/runtime/events/RuntimeEventsMonitor.java @@ -2,8 +2,10 @@ package org.enso.languageserver.runtime.events; import java.io.IOException; import java.io.PrintStream; +import java.nio.ByteBuffer; import java.time.Clock; import java.time.Instant; +import java.util.Base64; import java.util.UUID; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -30,6 +32,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor { private static final String RECORDS_TAG_CLOSE = ""; private static final String MESSAGE_SEPARATOR = ","; private static final String MESSAGE_EMPTY_REQUEST_ID = ""; + private static final String HEARTBEAT_PATTERN = "\"method\": \"heartbeat/"; /** * Create an instance of {@link RuntimeEventsMonitor}. @@ -61,7 +64,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor { } @Override - public void registerEvent(Object event) { + public void registerRuntimeMessage(Object event) { if (event instanceof Runtime.ApiEnvelope envelope) { registerApiEnvelope(envelope); } else if (event instanceof RuntimeConnector.MessageFromRuntime messageFromRuntime) { @@ -69,6 +72,22 @@ public final class RuntimeEventsMonitor implements EventsMonitor { } } + @Override + public void registerTextRpcMessage(String message) { + if (message.contains(HEARTBEAT_PATTERN)) return; + String entry = buildEntry(Direction.REQUEST, Option.empty(), message); + out.print(entry); + } + + @Override + public void registerBinaryRpcMessage(ByteBuffer message) { + byte[] bytes = new byte[message.remaining()]; + message.get(bytes); + String payload = Base64.getEncoder().encodeToString(bytes); + String entry = buildEntry(Direction.REQUEST, Option.empty(), payload); + out.print(entry); + } + @Override public void close() throws IOException { out.println(RECORDS_TAG_CLOSE); @@ -77,19 +96,18 @@ public final class RuntimeEventsMonitor implements EventsMonitor { private void registerApiEnvelope(Runtime.ApiEnvelope event) { if (event instanceof Runtime$Api$Request request) { - String entry = - buildEntry(Direction.REQUEST, request.requestId(), request.payload().getClass()); + String payload = request.payload().getClass().getSimpleName(); + String entry = buildEntry(Direction.REQUEST, request.requestId(), payload); out.print(entry); } else if (event instanceof Runtime$Api$Response response) { - String entry = - buildEntry(Direction.RESPONSE, response.correlationId(), response.payload().getClass()); + String payload = response.payload().getClass().getSimpleName(); + String entry = buildEntry(Direction.RESPONSE, response.correlationId(), payload); out.print(entry); } } - private String buildEntry(Direction direction, Option requestId, Class payload) { + private String buildEntry(Direction direction, Option requestId, String payload) { String requestIdEntry = requestId.fold(() -> MESSAGE_EMPTY_REQUEST_ID, UUID::toString); - String payloadEntry = payload.getSimpleName(); Instant timeEntry = clock.instant(); String message = @@ -98,7 +116,7 @@ public final class RuntimeEventsMonitor implements EventsMonitor { .append(MESSAGE_SEPARATOR) .append(requestIdEntry) .append(MESSAGE_SEPARATOR) - .append(payloadEntry) + .append(payload) .toString(); LogRecord record = new LogRecord(Level.INFO, message); diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala index f832707ef4..fac66de002 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala @@ -24,7 +24,7 @@ import org.enso.languageserver.monitoring.{ IdlenessEndpoint, IdlenessMonitor } -import org.enso.languageserver.profiling.ProfilingManager +import org.enso.languageserver.profiling.{EventsMonitorActor, ProfilingManager} import org.enso.languageserver.protocol.binary.{ BinaryConnectionControllerFactory, InboundMessageDecoder @@ -172,22 +172,39 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) { "lock-manager-service" ) - val runtimeEventsMonitor = + private val (runtimeEventsMonitor, messagesCallbackOpt) = languageServerConfig.profiling.profilingEventsLogPath match { case Some(path) => val out = new PrintStream(path.toFile, StandardCharsets.UTF_8) - new RuntimeEventsMonitor(out) + new RuntimeEventsMonitor(out) -> Some(()) case None => - new NoopEventsMonitor() + new NoopEventsMonitor() -> None } log.trace( "Started runtime events monitor [{}].", runtimeEventsMonitor.getClass.getName ) + private val eventsMonitor = + system.actorOf( + EventsMonitorActor.props(runtimeEventsMonitor), + "events-monitor" + ) + + private val messagesCallback = + messagesCallbackOpt + .map(_ => EventsMonitorActor.messagesCallback(eventsMonitor)) + .toList + + private val profilingManager = + system.actorOf( + ProfilingManager.props(eventsMonitor, distributionManager), + "profiling-manager" + ) + lazy val runtimeConnector = system.actorOf( - RuntimeConnector.props(lockManagerService, runtimeEventsMonitor), + RuntimeConnector.props(lockManagerService, eventsMonitor), "runtime-connector" ) @@ -370,12 +387,6 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) { "project-settings-manager" ) - val profilingManager = - system.actorOf( - ProfilingManager.props(runtimeConnector, distributionManager), - "profiling-manager" - ) - val libraryLocations = LibraryLocations.resolve( distributionManager, @@ -476,7 +487,8 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) { lazyMessageTimeout = 10.seconds, secureConfig = secureConfig ), - List(healthCheckEndpoint, idlenessEndpoint) + List(healthCheckEndpoint, idlenessEndpoint), + messagesCallback ) log.trace("Created JSON RPC Server [{}].", jsonRpcServer) @@ -489,7 +501,8 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) { outgoingBufferSize = 100, lazyMessageTimeout = 10.seconds, secureConfig = secureConfig - ) + ), + messagesCallback ) log.trace("Created Binary WebSocket Server [{}].", binaryServer) @@ -502,6 +515,7 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) { def close(): Unit = { suggestionsRepo.close() context.close() + runtimeEventsMonitor.close() log.info("Closed Language Server main module.") } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/ProfilingConfig.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/ProfilingConfig.scala index c82aed7ffe..e93232da8e 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/ProfilingConfig.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/ProfilingConfig.scala @@ -23,19 +23,44 @@ case class ProfilingConfig( * @return the path to the runtime events log file */ def profilingEventsLogPath: Option[Path] = - profilingPath.map { path => - val profilingDirectory = path.getParent - val profilingFileName = path.getFileName.toString - val profilingFileExtension = FilenameUtils.getExtension(profilingFileName) - val eventsLogFileName = - profilingFileName.stripSuffix( - profilingFileExtension - ) + ProfilingConfig.EventsLogExtension - - profilingDirectory.resolve(eventsLogFileName) - } + profilingPath.map( + ProfilingConfig.modifyPath( + _, + ProfilingConfig.EventsLogSuffix, + ProfilingConfig.EventsLogExtension + ) + ) } + object ProfilingConfig { + private val EventsLogSuffix = "" private val EventsLogExtension = "log" + + /** Modify the path by adding a suffix and changing the file extension. + * + * @param path the path to modify + * @param suffix the suffix to add + * @param extension the new file extension + * @return the modified path + */ + private def modifyPath( + path: Path, + suffix: String, + extension: String + ): Path = { + val directory = path.getParent + val fileName = path.getFileName.toString + val fileExtension = FilenameUtils.getExtension(fileName) + val modifiedFileName = + if (fileExtension.isEmpty) { + s"$fileName$suffix.$extension" + } else { + val fileNameWithoutExtension = fileName.stripSuffix(s".$fileExtension") + s"$fileNameWithoutExtension$suffix.$extension" + } + + directory.resolve(modifiedFileName) + } + } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/http/server/BinaryWebSocketServer.scala b/engine/language-server/src/main/scala/org/enso/languageserver/http/server/BinaryWebSocketServer.scala index c7dfaeee3c..2f101da3e1 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/http/server/BinaryWebSocketServer.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/http/server/BinaryWebSocketServer.scala @@ -25,6 +25,8 @@ import org.enso.languageserver.util.binary.{ DecodingFailure } +import java.nio.ByteBuffer + import scala.concurrent.duration._ import scala.concurrent.ExecutionContext @@ -35,6 +37,7 @@ import scala.concurrent.ExecutionContext * @param factory creates front controller per a single connection that is * responsible for handling all incoming requests * @param config a configuration object for properties of the server + * @param messageCallbacks a list of message callbacks * @param system an actor system that hosts the server * @param materializer an actor materializer that converts inbound and outbound * flows into actors running these streams @@ -45,7 +48,8 @@ class BinaryWebSocketServer[A, B]( decoder: BinaryDecoder[A], encoder: BinaryEncoder[B], factory: ConnectionControllerFactory, - config: Config = Config.default + config: Config = Config.default, + messageCallbacks: List[ByteBuffer => Unit] = List.empty )( implicit val system: ActorSystem, implicit val materializer: Materializer @@ -54,6 +58,13 @@ class BinaryWebSocketServer[A, B]( implicit val ec: ExecutionContext = system.dispatcher + private val messageCallbackSinks = + messageCallbacks.map { callback => + Sink.foreach[ByteBuffer] { byteBuffer => + callback(byteBuffer.asReadOnlyBuffer()) + } + } + private val route: Route = extractClientIP { case RemoteAddress.Unknown => @@ -108,7 +119,7 @@ class BinaryWebSocketServer[A, B]( frontController: ActorRef, ip: RemoteAddress.IP ): Sink[Message, NotUsed] = { - Flow[Message] + val flow = Flow[Message] .mapConcat[BinaryMessage] { case msg: TextMessage => logger.warn( @@ -125,7 +136,14 @@ class BinaryWebSocketServer[A, B]( _.toStrict(config.lazyMessageTimeout) } .map { binaryMsg => - val bytes = binaryMsg.data.asByteBuffer + binaryMsg.data.asByteBuffer + } + + val flowWithCallbacks = + messageCallbackSinks.foldLeft(flow)(_ alsoTo _) + + flowWithCallbacks + .map { bytes => decoder.decode(bytes) } .to { diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorActor.scala b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorActor.scala new file mode 100644 index 0000000000..eb3b0170ac --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorActor.scala @@ -0,0 +1,46 @@ +package org.enso.languageserver.profiling + +import akka.actor.{Actor, ActorRef, Props} +import org.enso.jsonrpc.MessageHandler +import org.enso.languageserver.util.UnhandledLogging +import org.enso.profiling.events.EventsMonitor + +import java.nio.ByteBuffer + +final class EventsMonitorActor(initialEventsMonitor: EventsMonitor) + extends Actor + with UnhandledLogging { + + override def receive: Receive = + initialized(initialEventsMonitor) + + private def initialized(eventsMonitor: EventsMonitor): Receive = { + case EventsMonitorProtocol.RegisterEventsMonitor(newEventsMonitor) => + context.become(initialized(newEventsMonitor)) + + case EventsMonitorProtocol.RegisterRuntimeMessage(message) => + eventsMonitor.registerRuntimeMessage(message) + + case EventsMonitorProtocol.RegisterTextRpcMessage(message) => + eventsMonitor.registerTextRpcMessage(message) + + case EventsMonitorProtocol.RegisterBinaryRpcMessage(message) => + eventsMonitor.registerBinaryRpcMessage(message) + } +} + +object EventsMonitorActor { + + def props(eventsMonitor: EventsMonitor): Props = + Props(new EventsMonitorActor(eventsMonitor)) + + def messagesCallback(eventsMonitor: ActorRef): Actor.Receive = { + case webMessage: MessageHandler.WebMessage => + eventsMonitor ! EventsMonitorProtocol.RegisterTextRpcMessage( + webMessage.message + ) + + case byteBuffer: ByteBuffer => + eventsMonitor ! EventsMonitorProtocol.RegisterBinaryRpcMessage(byteBuffer) + } +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorProtocol.scala b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorProtocol.scala new file mode 100644 index 0000000000..b3110ead16 --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/EventsMonitorProtocol.scala @@ -0,0 +1,20 @@ +package org.enso.languageserver.profiling + +import org.enso.profiling.events.EventsMonitor + +import java.nio.ByteBuffer + +object EventsMonitorProtocol { + + /** Protocol message to register new events monitor in the runtime connector. + * + * @param eventsMonitor the events monitor to register + */ + case class RegisterEventsMonitor(eventsMonitor: EventsMonitor) + + case class RegisterRuntimeMessage(message: Any) + + case class RegisterTextRpcMessage(message: String) + + case class RegisterBinaryRpcMessage(message: ByteBuffer) +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/profiling/ProfilingManager.scala b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/ProfilingManager.scala index a9dfc6a7eb..e42dc85d69 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/profiling/ProfilingManager.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/profiling/ProfilingManager.scala @@ -3,10 +3,9 @@ package org.enso.languageserver.profiling import akka.actor.{Actor, ActorRef, Props} import com.typesafe.scalalogging.LazyLogging import org.enso.distribution.DistributionManager -import org.enso.languageserver.runtime.RuntimeConnector import org.enso.languageserver.runtime.events.RuntimeEventsMonitor import org.enso.logger.masking.MaskedPath -import org.enso.profiling.events.NoopEventsMonitor +import org.enso.profiling.events.{EventsMonitor, NoopEventsMonitor} import org.enso.profiling.sampler.{MethodsSampler, OutputStreamSampler} import org.enso.profiling.snapshot.{HeapDumpSnapshot, ProfilingSnapshot} @@ -21,13 +20,13 @@ import scala.util.{Failure, Success, Try} /** Handles the profiling commands. * - * @param runtimeConnector the connection to runtime + * @param eventsMonitorActor the events monitor actor * @param distributionManager the distribution manager * @param profilingSnapshot the profiling snapshot generator * @param clock the system clock */ final class ProfilingManager( - runtimeConnector: ActorRef, + eventsMonitorActor: ActorRef, distributionManager: DistributionManager, profilingSnapshot: ProfilingSnapshot, clock: Clock @@ -56,22 +55,39 @@ final class ProfilingManager( sampler.start() val eventsMonitor = createEventsMonitor(instant) - runtimeConnector ! RuntimeConnector.RegisterEventsMonitor( + eventsMonitorActor ! EventsMonitorProtocol.RegisterEventsMonitor( eventsMonitor ) sender() ! ProfilingProtocol.ProfilingStartResponse context.become( initialized( - Some(RunningSampler(instant, sampler, result, memorySnapshot)) + Some( + RunningSampler( + instant, + sampler, + result, + memorySnapshot, + eventsMonitor + ) + ) ) ) } case ProfilingProtocol.ProfilingStopRequest => sampler match { - case Some(RunningSampler(instant, sampler, result, memorySnapshot)) => + case Some( + RunningSampler( + instant, + sampler, + result, + memorySnapshot, + eventsMonitor + ) + ) => sampler.stop() + eventsMonitor.close() Try(saveSamplerResult(result.toByteArray, instant)) match { case Failure(exception) => @@ -87,7 +103,7 @@ final class ProfilingManager( saveMemorySnapshot(instant) } - runtimeConnector ! RuntimeConnector.RegisterEventsMonitor( + eventsMonitorActor ! EventsMonitorProtocol.RegisterEventsMonitor( new NoopEventsMonitor ) @@ -172,7 +188,8 @@ object ProfilingManager { instant: Instant, sampler: MethodsSampler, result: ByteArrayOutputStream, - memorySnapshot: Boolean + memorySnapshot: Boolean, + eventsMonitor: EventsMonitor ) private def createProfilingFileName(instant: Instant): String = { @@ -197,20 +214,20 @@ object ProfilingManager { /** Creates the configuration object used to create a [[ProfilingManager]]. * - * @param runtimeConnector the connection to runtime + * @param eventsMonitor the events monitor actor * @param distributionManager the distribution manager * @param profilingSnapshot the profiling snapshot generator * @param clock the system clock */ def props( - runtimeConnector: ActorRef, + eventsMonitor: ActorRef, distributionManager: DistributionManager, profilingSnapshot: ProfilingSnapshot = new HeapDumpSnapshot(), clock: Clock = Clock.systemUTC() ): Props = Props( new ProfilingManager( - runtimeConnector, + eventsMonitor, distributionManager, profilingSnapshot, clock diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala index a1e73016d9..5e355252d9 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala @@ -2,6 +2,7 @@ package org.enso.languageserver.runtime import akka.actor.{Actor, ActorRef, Props, Stash} import com.typesafe.scalalogging.LazyLogging +import org.enso.languageserver.profiling.EventsMonitorProtocol import org.enso.languageserver.runtime.RuntimeConnector.{ Destroy, MessageFromRuntime @@ -12,7 +13,6 @@ import org.enso.logger.akka.ActorMessageLogging import org.enso.logger.masking.ToLogString import org.enso.polyglot.runtime.Runtime import org.enso.polyglot.runtime.Runtime.{Api, ApiEnvelope} -import org.enso.profiling.events.EventsMonitor import org.graalvm.polyglot.io.MessageEndpoint import java.nio.ByteBuffer @@ -22,7 +22,7 @@ import scala.util.{Failure, Success} /** An actor managing a connection to Enso's runtime server. */ final class RuntimeConnector( handlers: Map[Class[_], ActorRef], - initialEventsMonitor: EventsMonitor + eventsMonitor: ActorRef ) extends Actor with LazyLogging with ActorMessageLogging @@ -40,15 +40,12 @@ final class RuntimeConnector( engine ) unstashAll() - context.become(waitingOnEndpoint(engine, initialEventsMonitor)) + context.become(waitingOnEndpoint(engine)) case _ => stash() } - private def waitingOnEndpoint( - engine: MessageEndpoint, - eventsMonitor: EventsMonitor - ): Receive = - registerEvent(eventsMonitor).andThen(LoggingReceive { + private def waitingOnEndpoint(engine: MessageEndpoint): Receive = + registerMessage.andThen(LoggingReceive { case MessageFromRuntime( Runtime.Api.Response(None, Api.InitializedNotification()) ) => @@ -57,11 +54,7 @@ final class RuntimeConnector( engine ) unstashAll() - context.become(initialized(engine, eventsMonitor, Map())) - - case RuntimeConnector.RegisterEventsMonitor(newEventsMonitor) => - eventsMonitor.close() - context.become(waitingOnEndpoint(engine, newEventsMonitor)) + context.become(initialized(engine, Map())) case _ => stash() }) @@ -87,30 +80,21 @@ final class RuntimeConnector( * the runtime are forwarded to one of the registered handlers. * * @param engine endpoint of a runtime - * @param eventsMonitor the current events monitor * @param senders request ids with corresponding senders */ def initialized( engine: MessageEndpoint, - eventsMonitor: EventsMonitor, senders: Map[Runtime.Api.RequestId, ActorRef] - ): Receive = registerEvent(eventsMonitor).andThen(LoggingReceive { + ): Receive = registerMessage.andThen(LoggingReceive { case Destroy => - eventsMonitor.close() context.stop(self) - case RuntimeConnector.RegisterEventsMonitor(newEventsMonitor) => - eventsMonitor.close() - context.become(initialized(engine, newEventsMonitor, senders)) - case msg: Runtime.ApiEnvelope => engine.sendBinary(Runtime.Api.serialize(msg)) msg match { case Api.Request(Some(id), _) => - context.become( - initialized(engine, eventsMonitor, senders + (id -> sender())) - ) + context.become(initialized(engine, senders + (id -> sender()))) case _ => } @@ -146,20 +130,13 @@ final class RuntimeConnector( case _ => } } - context.become( - initialized(engine, eventsMonitor, senders - correlationId) - ) + context.become(initialized(engine, senders - correlationId)) }) - /** Register event in the events monitor - * - * @param eventsMonitor the current events monitor - */ - private def registerEvent( - eventsMonitor: EventsMonitor - ): PartialFunction[Any, Any] = { event => - eventsMonitor.registerEvent(event) - event + /** Register event in the events monitor. */ + private def registerMessage: PartialFunction[Any, Any] = { message => + eventsMonitor ! EventsMonitorProtocol.RegisterRuntimeMessage(message) + message } } @@ -171,12 +148,6 @@ object RuntimeConnector { */ case class Initialize(engineConnection: MessageEndpoint) - /** Protocol message to register new events monitor in the runtime connector. - * - * @param eventsMonitor the events monitor to register - */ - case class RegisterEventsMonitor(eventsMonitor: EventsMonitor) - /** Protocol message to inform the actor about the connection being closed. */ case object Destroy @@ -184,15 +155,15 @@ object RuntimeConnector { /** Helper for creating instances of the [[RuntimeConnector]] actor. * * @param lockManagerService a reference to the lock manager service actor - * @param monitor events monitor that handles messages between the language + * @param eventsMonitor events monitor that handles messages between the language * server and the runtime * @return a [[Props]] instance for the newly created actor. */ - def props(lockManagerService: ActorRef, monitor: EventsMonitor): Props = { + def props(lockManagerService: ActorRef, eventsMonitor: ActorRef): Props = { val lockRequests = LockManagerService.handledRequestTypes.map(_ -> lockManagerService) val handlers: Map[Class[_], ActorRef] = Map.from(lockRequests) - Props(new RuntimeConnector(handlers, monitor)) + Props(new RuntimeConnector(handlers, eventsMonitor)) } /** Endpoint implementation used to handle connections with the runtime. diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala index 88a18c9d91..8d6838c5e2 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala @@ -35,6 +35,7 @@ import org.enso.languageserver.io._ import org.enso.languageserver.libraries._ import org.enso.languageserver.monitoring.IdlenessMonitor import org.enso.languageserver.profiling.{ + EventsMonitorActor, ProfilingManager, TestProfilingSnapshot } @@ -54,6 +55,7 @@ import org.enso.librarymanager.published.PublishedLibraryCache import org.enso.pkg.PackageManager import org.enso.polyglot.data.TypeGraph import org.enso.polyglot.runtime.Runtime.Api +import org.enso.profiling.events.NoopEventsMonitor import org.enso.runtimeversionmanager.test.{ FakeEnvironment, TestableThreadSafeFileLockManager @@ -71,6 +73,7 @@ import java.nio.file.{Files, Path} import java.util.UUID import java.util.concurrent.{Executors, ThreadFactory} import java.util.concurrent.atomic.AtomicInteger + import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import org.slf4j.LoggerFactory @@ -375,9 +378,13 @@ abstract class BaseServerTest ) ) + val eventsMonitor = system.actorOf( + EventsMonitorActor.props(new NoopEventsMonitor) + ) + val profilingManager = system.actorOf( ProfilingManager.props( - runtimeConnectorProbe.ref, + eventsMonitor, distributionManager, profilingSnapshot, clock diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProfilingManagerTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProfilingManagerTest.scala index 3373335c95..77852fcdeb 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProfilingManagerTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProfilingManagerTest.scala @@ -2,7 +2,6 @@ package org.enso.languageserver.websocket.json import org.enso.distribution.DistributionManager import org.enso.languageserver.profiling.ProfilingManager -import org.enso.languageserver.runtime.RuntimeConnector import org.enso.logger.ReportLogsOnFailure import java.nio.file.Files @@ -20,21 +19,9 @@ class ProfilingManagerTest extends BaseServerTest with ReportLogsOnFailure { val client = getInitialisedWsClient() client.send(json.profilingStart(1)) - runtimeConnectorProbe.receiveN(1).head match { - case _: RuntimeConnector.RegisterEventsMonitor => - // Ok - case other => - fail(s"Unexpected message: $other") - } client.expectJson(json.ok(1)) client.send(json.profilingStop(2)) - runtimeConnectorProbe.receiveN(1).head match { - case _: RuntimeConnector.RegisterEventsMonitor => - // Ok - case other => - fail(s"Unexpected message: $other") - } client.expectJson(json.ok(2)) val distributionManager = getDistributionManager @@ -54,21 +41,9 @@ class ProfilingManagerTest extends BaseServerTest with ReportLogsOnFailure { val client = getInitialisedWsClient() client.send(json.profilingStart(1, memorySnapshot = true)) - runtimeConnectorProbe.receiveN(1).head match { - case _: RuntimeConnector.RegisterEventsMonitor => - // Ok - case other => - fail(s"Unexpected message: $other") - } client.expectJson(json.ok(1)) client.send(json.profilingStop(2)) - runtimeConnectorProbe.receiveN(1).head match { - case _: RuntimeConnector.RegisterEventsMonitor => - // Ok - case other => - fail(s"Unexpected message: $other") - } client.expectJson(json.ok(2)) val distributionManager = getDistributionManager diff --git a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala index bc2ded54c2..2dc1985ec6 100644 --- a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala +++ b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala @@ -8,6 +8,7 @@ import akka.http.scaladsl.server.Route import akka.stream.scaladsl.{Flow, Sink, Source} import akka.stream.{Materializer, OverflowStrategy} import com.typesafe.scalalogging.LazyLogging +import org.enso.jsonrpc.MessageHandler.WebMessage import java.util.UUID @@ -20,14 +21,16 @@ import scala.concurrent.ExecutionContext * @param clientControllerFactory a factory used to create a client controller * @param config a server config * @param optionalEndpoints a list of optional endpoints + * @param messageCallbacks a list of message callbacks * @param system an actor system * @param materializer a materializer */ class JsonRpcServer( protocolFactory: ProtocolFactory, clientControllerFactory: ClientControllerFactory, - config: JsonRpcServer.Config = JsonRpcServer.Config.default, - optionalEndpoints: List[Endpoint] = List.empty + config: JsonRpcServer.Config = JsonRpcServer.Config.default, + optionalEndpoints: List[Endpoint] = List.empty, + messageCallbacks: List[WebMessage => Unit] = List.empty )( implicit val system: ActorSystem, implicit val materializer: Materializer @@ -36,6 +39,9 @@ class JsonRpcServer( implicit val ec: ExecutionContext = system.dispatcher + private val messageCallbackSinks = + messageCallbacks.map(Sink.foreach[WebMessage]) + private def newUser(port: Int): Flow[Message, Message, NotUsed] = { val messageHandler = system.actorOf( @@ -49,7 +55,7 @@ class JsonRpcServer( s"message-handler-supervisor-${UUID.randomUUID()}" ) - val incomingMessages: Sink[Message, NotUsed] = + val incomingMessagesFlow = Flow[Message] .mapConcat({ case textMsg: TextMessage => textMsg :: Nil @@ -59,6 +65,11 @@ class JsonRpcServer( _.toStrict(config.lazyMessageTimeout) .map(msg => MessageHandler.WebMessage(msg.text)) ) + val incomingMessagesFlowWithCallbacks = + messageCallbackSinks.foldLeft(incomingMessagesFlow)(_ alsoTo _) + + val incomingMessages: Sink[Message, NotUsed] = + incomingMessagesFlowWithCallbacks .wireTap { webMessage => logger.trace(s"Received text message: ${webMessage.message}.") } diff --git a/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/EventsMonitor.java b/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/EventsMonitor.java index c532a9522e..4724eec1ce 100644 --- a/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/EventsMonitor.java +++ b/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/EventsMonitor.java @@ -1,6 +1,7 @@ package org.enso.profiling.events; import java.io.Closeable; +import java.nio.ByteBuffer; /** Diagnostic tool that processes event messages. Used for debugging or performance review. */ public interface EventsMonitor extends Closeable { @@ -10,5 +11,9 @@ public interface EventsMonitor extends Closeable { * * @param event the event to register. */ - void registerEvent(Object event); + void registerRuntimeMessage(Object event); + + void registerTextRpcMessage(String message); + + void registerBinaryRpcMessage(ByteBuffer message); } diff --git a/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/NoopEventsMonitor.java b/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/NoopEventsMonitor.java index 2e87ba1f83..de555c13ab 100644 --- a/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/NoopEventsMonitor.java +++ b/lib/scala/profiling-utils/src/main/java/org/enso/profiling/events/NoopEventsMonitor.java @@ -1,12 +1,19 @@ package org.enso.profiling.events; import java.io.IOException; +import java.nio.ByteBuffer; /** Events monitor that does nothing. */ public final class NoopEventsMonitor implements EventsMonitor { @Override - public void registerEvent(Object event) {} + public void registerRuntimeMessage(Object event) {} + + @Override + public void registerTextRpcMessage(String message) {} + + @Override + public void registerBinaryRpcMessage(ByteBuffer message) {} @Override public void close() throws IOException {}