Introduce Threaded Executions to the Runtime Server Instrument - Part 1 (#781)

This commit is contained in:
Łukasz Olczak 2020-05-26 17:37:54 +02:00 committed by GitHub
parent 8463162dfa
commit 806bf9dcb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1026 additions and 503 deletions

View File

@ -160,7 +160,12 @@ class MainModule(serverConfig: LanguageServerConfig) {
)
lazy val jsonRpcServer =
new JsonRpcServer(JsonRpc.protocol, jsonRpcControllerFactory)
new JsonRpcServer(
JsonRpc.protocol,
jsonRpcControllerFactory,
JsonRpcServer
.Config(outgoingBufferSize = 10000, lazyMessageTimeout = 10.seconds)
)
lazy val binaryServer =
new BinaryWebSocketServer(

View File

@ -265,4 +265,12 @@ public class Context {
public boolean isStrictErrors() {
return getEnvironment().getOptions().get(RuntimeOptions.STRICT_ERRORS_KEY);
}
/**
* Creates a new thread that has access to the current language context.
*/
public Thread createThread(Runnable runnable) {
return environment.createThread(runnable);
}
}

View File

@ -1,37 +1,20 @@
package org.enso.interpreter.instrument
import java.io.File
import java.nio.ByteBuffer
import java.util.UUID
import java.util.function.Consumer
import java.util.logging.Level
import cats.implicits._
import com.oracle.truffle.api.TruffleContext
import org.enso.interpreter.instrument.Handler.{
EvalFailure,
EvaluationFailed,
ModuleNotFound
import org.enso.interpreter.instrument.command.CommandFactory
import org.enso.interpreter.instrument.execution.{
CommandProcessor,
PreemptiveCommandProcessor,
RuntimeContext
}
import org.enso.interpreter.instrument.IdExecutionInstrument.{
ExpressionCall,
ExpressionValue
}
import org.enso.interpreter.node.callable.FunctionCallInstrumentationNode.FunctionCall
import org.enso.interpreter.service.ExecutionService
import org.enso.pkg.QualifiedName
import org.enso.polyglot.runtime.Runtime.Api.{
ContextId,
ExpressionId,
RequestId,
VisualisationId
}
import org.enso.polyglot.runtime.Runtime.{Api, ApiResponse}
import org.enso.polyglot.runtime.Runtime.Api
import org.graalvm.polyglot.io.MessageEndpoint
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
import scala.util.control.NonFatal
import scala.concurrent.Await
import scala.concurrent.duration._
/**
* A message endpoint implementation used by the
@ -79,6 +62,7 @@ final class Handler {
var executionService: ExecutionService = _
var truffleContext: TruffleContext = _
var commandProcessor: CommandProcessor = _
/**
* Initializes the handler with relevant Truffle objects, allowing it to
@ -94,228 +78,8 @@ final class Handler {
executionService = service
truffleContext = context
endpoint.sendToClient(Api.Response(Api.InitializedNotification()))
}
sealed private trait ExecutionItem
private object ExecutionItem {
case class Method(
file: File,
constructor: String,
function: String
) extends ExecutionItem
case class CallData(callData: FunctionCall) extends ExecutionItem
}
private def onExpressionValueComputed(
contextId: Api.ContextId,
value: ExpressionValue
): Unit = {
sendValueUpdate(contextId, value)
fireVisualisationUpdates(contextId, value)
}
private def sendValueUpdate(
contextId: ContextId,
value: ExpressionValue
): Unit = {
endpoint.sendToClient(
Api.Response(
Api.ExpressionValuesComputed(
contextId,
Vector(
Api.ExpressionValueUpdate(
value.getExpressionId,
OptionConverters.toScala(value.getType),
Some(value.getValue.toString),
toMethodPointer(value)
)
)
)
)
)
}
private def fireVisualisationUpdates(
contextId: ContextId,
value: ExpressionValue
): Unit = {
val visualisations =
contextManager.findVisualisationForExpression(
contextId,
value.getExpressionId
)
visualisations foreach { visualisation =>
emitVisualisationUpdate(contextId, value, visualisation)
}
}
private def emitVisualisationUpdate(
contextId: ContextId,
value: ExpressionValue,
visualisation: Visualisation
): Unit = {
val errorMsgOrVisualisationData =
Either
.catchNonFatal {
executionService.callFunction(
visualisation.callback,
value.getValue
)
}
.leftMap(_.getMessage)
.flatMap {
case text: String => Right(text.getBytes("UTF-8"))
case bytes: Array[Byte] => Right(bytes)
case other =>
Left(s"Cannot encode ${other.getClass} to byte array")
}
errorMsgOrVisualisationData match {
case Left(msg) =>
endpoint.sendToClient(
Api.Response(Api.VisualisationEvaluationFailed(msg))
)
case Right(data) =>
endpoint.sendToClient(
Api.Response(
Api.VisualisationUpdate(
Api.VisualisationContext(
visualisation.id,
contextId,
value.getExpressionId
),
data
)
)
)
}
}
private def toMethodPointer(
value: ExpressionValue
): Option[Api.MethodPointer] =
for {
call <- Option(value.getCall)
qualifiedName <- QualifiedName.fromString(
call.getFunction.getCallTarget.getRootNode.getQualifiedName
)
moduleName <- qualifiedName.getParent
functionName <- QualifiedName.fromString(call.getFunction.getName)
typeName <- functionName.getParent
module <- OptionConverters.toScala(
executionService.getContext.getCompiler.topScope
.getModule(moduleName.toString)
)
modulePath <- Option(module.getPath)
} yield Api.MethodPointer(
new File(modulePath),
typeName.toString,
functionName.module
)
@scala.annotation.tailrec
private def execute(
executionItem: ExecutionItem,
callStack: List[UUID],
valueCallback: Consumer[ExpressionValue]
): Unit = {
var enterables: Map[UUID, FunctionCall] = Map()
val valsCallback: Consumer[ExpressionValue] =
if (callStack.isEmpty) valueCallback else _ => ()
val callablesCallback: Consumer[ExpressionCall] = fun =>
enterables += fun.getExpressionId -> fun.getCall
executionItem match {
case ExecutionItem.Method(file, cons, function) =>
executionService.execute(
file,
cons,
function,
cache,
valsCallback,
callablesCallback
)
case ExecutionItem.CallData(callData) =>
executionService.execute(
callData,
cache,
valsCallback,
callablesCallback
)
}
callStack match {
case Nil => ()
case item :: tail =>
enterables.get(item) match {
case Some(call) =>
execute(ExecutionItem.CallData(call), tail, valueCallback)
case None =>
()
}
}
}
private def execute(
contextId: Api.ContextId,
stack: List[Api.StackItem]
): Either[String, Unit] = {
def unwind(
stack: List[Api.StackItem],
explicitCalls: List[Api.StackItem.ExplicitCall],
localCalls: List[UUID]
): (List[Api.StackItem.ExplicitCall], List[UUID]) =
stack match {
case Nil =>
(explicitCalls, localCalls)
case List(call: Api.StackItem.ExplicitCall) =>
(List(call), localCalls)
case Api.StackItem.LocalCall(id) :: xs =>
unwind(xs, explicitCalls, id :: localCalls)
}
val (explicitCalls, localCalls) = unwind(stack, Nil, Nil)
for {
stackItem <- Either.fromOption(explicitCalls.headOption, "stack is empty")
item = toExecutionItem(stackItem)
_ <- Either
.catchNonFatal(
execute(item, localCalls, onExpressionValueComputed(contextId, _))
)
.leftMap { ex =>
executionService.getLogger.log(
Level.FINE,
s"Error executing a function '${item.function}'",
ex
)
s"error in function: ${item.function}"
}
} yield ()
}
private def executeAll(): Unit =
contextManager.getAll
.filter(kv => kv._2.nonEmpty)
.mapValues(_.toList)
.foreach(Function.tupled(execute))
private def toExecutionItem(
call: Api.StackItem.ExplicitCall
): ExecutionItem.Method =
ExecutionItem.Method(
call.methodPointer.file,
call.methodPointer.definedOnType,
call.methodPointer.name
)
private def withContext[A](action: => A): A = {
val token = truffleContext.enter()
try {
action
} finally {
truffleContext.leave(token)
}
commandProcessor =
new PreemptiveCommandProcessor(1, executionService.getContext)
}
/**
@ -324,257 +88,17 @@ final class Handler {
* @param msg the message to handle.
*/
def onMessage(msg: Api.Request): Unit = {
val requestId = msg.requestId
msg.payload match {
case Api.CreateContextRequest(contextId) =>
contextManager.create(contextId)
endpoint.sendToClient(
Api.Response(requestId, Api.CreateContextResponse(contextId))
)
case Api.PushContextRequest(contextId, item) => {
if (contextManager.get(contextId).isDefined) {
val stack = contextManager.getStack(contextId)
val payload = item match {
case call: Api.StackItem.ExplicitCall if stack.isEmpty =>
contextManager.push(contextId, item)
withContext(execute(contextId, List(call))) match {
case Right(()) => Api.PushContextResponse(contextId)
case Left(e) => Api.ExecutionFailed(contextId, e)
}
case _: Api.StackItem.LocalCall if stack.nonEmpty =>
contextManager.push(contextId, item)
withContext(execute(contextId, stack.toList)) match {
case Right(()) => Api.PushContextResponse(contextId)
case Left(e) => Api.ExecutionFailed(contextId, e)
}
case _ =>
Api.InvalidStackItemError(contextId)
}
endpoint.sendToClient(Api.Response(requestId, payload))
} else {
endpoint.sendToClient(
Api.Response(requestId, Api.ContextNotExistError(contextId))
)
}
}
case Api.PopContextRequest(contextId) =>
if (contextManager.get(contextId).isDefined) {
val payload = contextManager.pop(contextId) match {
case Some(_: Api.StackItem.ExplicitCall) =>
Api.PopContextResponse(contextId)
case Some(_: Api.StackItem.LocalCall) =>
val stack = contextManager.getStack(contextId)
withContext(execute(contextId, stack.toList)) match {
case Right(()) => Api.PopContextResponse(contextId)
case Left(e) => Api.ExecutionFailed(contextId, e)
}
case None =>
Api.EmptyStackError(contextId)
}
endpoint.sendToClient(Api.Response(requestId, payload))
} else {
endpoint.sendToClient(
Api.Response(requestId, Api.ContextNotExistError(contextId))
)
}
case Api.DestroyContextRequest(contextId) =>
if (contextManager.get(contextId).isDefined) {
contextManager.destroy(contextId)
endpoint.sendToClient(
Api.Response(requestId, Api.DestroyContextResponse(contextId))
)
} else {
endpoint.sendToClient(
Api.Response(requestId, Api.ContextNotExistError(contextId))
)
}
case Api.RecomputeContextRequest(contextId, _) =>
if (contextManager.get(contextId).isDefined) {
val stack = contextManager.getStack(contextId)
val payload = if (stack.isEmpty) {
Api.EmptyStackError(contextId)
} else {
withContext(execute(contextId, stack.toList)) match {
case Right(()) => Api.RecomputeContextResponse(contextId)
case Left(e) => Api.ExecutionFailed(contextId, e)
}
}
endpoint.sendToClient(Api.Response(requestId, payload))
} else {
endpoint.sendToClient(
Api.Response(requestId, Api.ContextNotExistError(contextId))
)
}
case Api.OpenFileNotification(path, contents) =>
executionService.setModuleSources(path, contents)
case Api.CloseFileNotification(path) =>
executionService.resetModuleSources(path)
case Api.EditFileNotification(path, edits) =>
executionService.modifyModuleSources(path, edits.asJava)
withContext(executeAll())
case Api.AttachVisualisation(visualisationId, expressionId, config) =>
if (contextManager.contains(config.executionContextId)) {
upsertVisualisation(
requestId,
visualisationId,
expressionId,
config,
Api.VisualisationAttached()
)
} else {
endpoint.sendToClient(
Api.Response(
requestId,
Api.ContextNotExistError(config.executionContextId)
)
)
}
case Api.DetachVisualisation(ctxId, visualisationId, exprId) =>
if (contextManager.contains(ctxId)) {
contextManager.removeVisualisation(ctxId, exprId, visualisationId)
endpoint.sendToClient(
Api.Response(
requestId,
Api.VisualisationDetached()
)
)
} else {
endpoint.sendToClient(
Api.Response(
requestId,
Api.ContextNotExistError(ctxId)
)
)
}
case Api.ModifyVisualisation(visualisationId, config) =>
if (contextManager.contains(config.executionContextId)) {
val maybeVisualisation = contextManager.getVisualisationById(
config.executionContextId,
visualisationId
)
maybeVisualisation match {
case None =>
endpoint.sendToClient(
Api.Response(requestId, Api.VisualisationNotFound())
)
case Some(visualisation) =>
upsertVisualisation(
requestId,
visualisationId,
visualisation.expressionId,
config,
Api.VisualisationModified()
)
}
} else {
endpoint.sendToClient(
Api.Response(
requestId,
Api.ContextNotExistError(config.executionContextId)
)
)
}
}
}
private def upsertVisualisation(
requestId: Option[RequestId],
visualisationId: VisualisationId,
expressionId: ExpressionId,
config: Api.VisualisationConfiguration,
replyWith: ApiResponse
): Unit = {
val maybeCallable =
evaluateExpression(config.visualisationModule, config.expression)
maybeCallable match {
case Left(ModuleNotFound) =>
endpoint.sendToClient(
Api.Response(
requestId,
Api.ModuleNotFound(config.visualisationModule)
)
)
case Left(EvaluationFailed(msg)) =>
endpoint.sendToClient(
Api.Response(
requestId,
Api.VisualisationExpressionFailed(msg)
)
)
case Right(callable) =>
val visualisation = Visualisation(
visualisationId,
expressionId,
callable
)
contextManager.upsertVisualisation(
config.executionContextId,
visualisation
)
endpoint.sendToClient(
Api.Response(requestId, replyWith)
)
val stack = contextManager.getStack(config.executionContextId)
withContext(execute(config.executionContextId, stack.toList))
}
}
private def evaluateExpression(
moduleName: String,
expression: String
): Either[EvalFailure, AnyRef] = {
val maybeModule = executionService.findModule(moduleName)
val notFoundOrModule =
if (maybeModule.isPresent) Right(maybeModule.get())
else Left(ModuleNotFound)
notFoundOrModule.flatMap { module =>
try {
withContext {
executionService.evaluateExpression(module, expression).asRight
}
} catch {
case NonFatal(th) => EvaluationFailed(th.getMessage).asLeft
}
}
val cmd = CommandFactory.createCommand(msg)
val ctx = RuntimeContext(
executionService,
contextManager,
endpoint,
truffleContext,
cache,
commandProcessor
)
val future = commandProcessor.invoke(cmd, ctx)
Await.result(future, 1.minute)
}
}
object Handler {
/**
* Base trait for evaluation failures.
*/
sealed trait EvalFailure
/**
* Signals that a module cannto be found.
*/
case object ModuleNotFound extends EvalFailure
/**
* Signals that an evaluation of an expression failed.
*
* @param msg the textual reason of a failure
*/
case class EvaluationFailed(msg: String) extends EvalFailure
}

View File

@ -0,0 +1,42 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that attaches a visualisation to an expression.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class AttachVisualisationCmd(
maybeRequestId: Option[RequestId],
request: Api.AttachVisualisation
) extends BaseVisualisationCmd {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.contains(
request.visualisationConfig.executionContextId
)) {
upsertVisualisation(
maybeRequestId,
request.visualisationId,
request.expressionId,
request.visualisationConfig,
Api.VisualisationAttached()
)
} else {
ctx.endpoint.sendToClient(
Api.Response(
maybeRequestId,
Api.ContextNotExistError(
request.visualisationConfig.executionContextId
)
)
)
}
}
}

View File

@ -0,0 +1,115 @@
package org.enso.interpreter.instrument.command
import cats.implicits._
import org.enso.interpreter.instrument.command.BaseVisualisationCmd.{
EvalFailure,
EvaluationFailed,
ModuleNotFound
}
import org.enso.interpreter.instrument.Visualisation
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api.{
ExpressionId,
RequestId,
VisualisationId
}
import org.enso.polyglot.runtime.Runtime.{Api, ApiResponse}
import scala.util.control.NonFatal
/**
* Base class for visualisation modification commands.
*/
abstract class BaseVisualisationCmd
extends Command
with ProgramExecutionSupport {
protected def upsertVisualisation(
requestId: Option[RequestId],
visualisationId: VisualisationId,
expressionId: ExpressionId,
config: Api.VisualisationConfiguration,
replyWith: ApiResponse
)(implicit ctx: RuntimeContext): Unit = {
val maybeCallable =
evaluateExpression(config.visualisationModule, config.expression)
maybeCallable match {
case Left(ModuleNotFound) =>
ctx.endpoint.sendToClient(
Api.Response(
requestId,
Api.ModuleNotFound(config.visualisationModule)
)
)
case Left(EvaluationFailed(msg)) =>
ctx.endpoint.sendToClient(
Api.Response(
requestId,
Api.VisualisationExpressionFailed(msg)
)
)
case Right(callable) =>
val visualisation = Visualisation(
visualisationId,
expressionId,
callable
)
ctx.contextManager.upsertVisualisation(
config.executionContextId,
visualisation
)
ctx.endpoint.sendToClient(
Api.Response(requestId, replyWith)
)
val stack = ctx.contextManager.getStack(config.executionContextId)
withContext(runProgram(config.executionContextId, stack.toList))
}
}
private def evaluateExpression(
moduleName: String,
expression: String
)(implicit ctx: RuntimeContext): Either[EvalFailure, AnyRef] = {
val maybeModule = ctx.executionService.findModule(moduleName)
val notFoundOrModule =
if (maybeModule.isPresent) Right(maybeModule.get())
else Left(ModuleNotFound)
notFoundOrModule.flatMap { module =>
try {
withContext {
ctx.executionService.evaluateExpression(module, expression).asRight
}
} catch {
case NonFatal(th) => EvaluationFailed(th.getMessage).asLeft
}
}
}
}
object BaseVisualisationCmd {
/**
* Base trait for evaluation failures.
*/
sealed trait EvalFailure
/**
* Signals that a module cannto be found.
*/
case object ModuleNotFound extends EvalFailure
/**
* Signals that an evaluation of an expression failed.
*
* @param msg the textual reason of a failure
*/
case class EvaluationFailed(msg: String) extends EvalFailure
}

View File

@ -0,0 +1,18 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
/**
* A command that closes a file.
*
* @param request a request for a service
*/
class CloseFileCmd(request: Api.CloseFileNotification) extends Command {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
ctx.executionService.resetModuleSources(request.path)
}
}

View File

@ -0,0 +1,18 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
/**
* Base command trait that encapsulates a function request. Uses
* [[RuntimeContext]] to perform a request.
*/
trait Command {
/**
* Executes a request.
*
* @param ctx contains suppliers of services to perform a request
*/
def execute(implicit ctx: RuntimeContext): Unit
}

View File

@ -0,0 +1,47 @@
package org.enso.interpreter.instrument.command
import org.enso.polyglot.runtime.Runtime.Api
/**
* A factory that creates a command for an API request.
*/
object CommandFactory {
/**
* Creates a command that encapsulates a function request as an object.
*
* @param request an API request
* @return a command
*/
def createCommand(request: Api.Request): Command =
request.payload match {
case payload: Api.CreateContextRequest =>
new CreateContextCmd(request.requestId, payload)
case payload: Api.PushContextRequest =>
new PushContextCmd(request.requestId, payload)
case payload: Api.PopContextRequest =>
new PopContextCmd(request.requestId, payload)
case payload: Api.DestroyContextRequest =>
new DestroyContextCmd(request.requestId, payload)
case payload: Api.RecomputeContextRequest =>
new RecomputeContextCmd(request.requestId, payload)
case payload: Api.AttachVisualisation =>
new AttachVisualisationCmd(request.requestId, payload)
case payload: Api.DetachVisualisation =>
new DetachVisualisationCmd(request.requestId, payload)
case payload: Api.ModifyVisualisation =>
new ModifyVisualisationCmd(request.requestId, payload)
case payload: Api.OpenFileNotification => new OpenFileCmd(payload)
case payload: Api.CloseFileNotification => new CloseFileCmd(payload)
case payload: Api.EditFileNotification => new EditFileCmd(payload)
}
}

View File

@ -0,0 +1,26 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that creates an execution context.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class CreateContextCmd(
maybeRequestId: Option[RequestId],
request: Api.CreateContextRequest
) extends Command {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
ctx.contextManager.create(request.contextId)
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.CreateContextResponse(request.contextId))
)
}
}

View File

@ -0,0 +1,37 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that destroys the specified execution context.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class DestroyContextCmd(
maybeRequestId: Option[RequestId],
request: Api.DestroyContextRequest
) extends Command
with ProgramExecutionSupport {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.get(request.contextId).isDefined) {
ctx.contextManager.destroy(request.contextId)
ctx.endpoint.sendToClient(
Api.Response(
maybeRequestId,
Api.DestroyContextResponse(request.contextId)
)
)
} else {
ctx.endpoint.sendToClient(
Api
.Response(maybeRequestId, Api.ContextNotExistError(request.contextId))
)
}
}
}

View File

@ -0,0 +1,42 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that detaches a visualisation from the expression.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class DetachVisualisationCmd(
maybeRequestId: Option[RequestId],
request: Api.DetachVisualisation
) extends Command {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.contains(request.contextId)) {
ctx.contextManager.removeVisualisation(
request.contextId,
request.expressionId,
request.visualisationId
)
ctx.endpoint.sendToClient(
Api.Response(
maybeRequestId,
Api.VisualisationDetached()
)
)
} else {
ctx.endpoint.sendToClient(
Api.Response(
maybeRequestId,
Api.ContextNotExistError(request.contextId)
)
)
}
}
}

View File

@ -0,0 +1,29 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import scala.jdk.CollectionConverters._
/**
* A command that edits a file.
*
* @param request a request for a service
*/
class EditFileCmd(request: Api.EditFileNotification)
extends Command
with ProgramExecutionSupport {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
ctx.executionService.modifyModuleSources(request.path, request.edits.asJava)
withContext(executeAll())
}
private def executeAll()(implicit ctx: RuntimeContext): Unit =
ctx.contextManager.getAll
.filter(kv => kv._2.nonEmpty)
.mapValues(_.toList)
.foreach(Function.tupled(runProgram))
}

View File

@ -0,0 +1,55 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that modifies a visualisation.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class ModifyVisualisationCmd(
maybeRequestId: Option[RequestId],
request: Api.ModifyVisualisation
) extends BaseVisualisationCmd {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.contains(
request.visualisationConfig.executionContextId
)) {
val maybeVisualisation = ctx.contextManager.getVisualisationById(
request.visualisationConfig.executionContextId,
request.visualisationId
)
maybeVisualisation match {
case None =>
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualisationNotFound())
)
case Some(visualisation) =>
upsertVisualisation(
maybeRequestId,
request.visualisationId,
visualisation.expressionId,
request.visualisationConfig,
Api.VisualisationModified()
)
}
} else {
ctx.endpoint.sendToClient(
Api.Response(
maybeRequestId,
Api.ContextNotExistError(
request.visualisationConfig.executionContextId
)
)
)
}
}
}

View File

@ -0,0 +1,18 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
/**
* A command that opens a file.
*
* @param request a request for a service
*/
class OpenFileCmd(request: Api.OpenFileNotification) extends Command {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
ctx.executionService.setModuleSources(request.path, request.contents)
}
}

View File

@ -0,0 +1,43 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that pops an item from a stack.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class PopContextCmd(
maybeRequestId: Option[RequestId],
request: Api.PopContextRequest
) extends Command
with ProgramExecutionSupport {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.get(request.contextId).isDefined) {
val payload = ctx.contextManager.pop(request.contextId) match {
case Some(_: Api.StackItem.ExplicitCall) =>
Api.PopContextResponse(request.contextId)
case Some(_: Api.StackItem.LocalCall) =>
val stack = ctx.contextManager.getStack(request.contextId)
withContext(runProgram(request.contextId, stack.toList)) match {
case Right(()) => Api.PopContextResponse(request.contextId)
case Left(e) => Api.ExecutionFailed(request.contextId, e)
}
case None =>
Api.EmptyStackError(request.contextId)
}
ctx.endpoint.sendToClient(Api.Response(maybeRequestId, payload))
} else {
ctx.endpoint.sendToClient(
Api
.Response(maybeRequestId, Api.ContextNotExistError(request.contextId))
)
}
}
}

View File

@ -0,0 +1,274 @@
package org.enso.interpreter.instrument.command
import java.io.File
import java.util.UUID
import java.util.function.Consumer
import java.util.logging.Level
import cats.implicits._
import org.enso.interpreter.instrument.IdExecutionInstrument.{
ExpressionCall,
ExpressionValue
}
import org.enso.interpreter.instrument.command.ProgramExecutionSupport.ExecutionItem
import org.enso.interpreter.instrument.Visualisation
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.interpreter.node.callable.FunctionCallInstrumentationNode.FunctionCall
import org.enso.pkg.QualifiedName
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.ContextId
import scala.jdk.javaapi.OptionConverters
/**
* Provides support for executing Enso code. Adds convenient methods to
* run Enso programs in a Truffle context.
*/
trait ProgramExecutionSupport {
/**
* Executes action in a newly created Truffle context.
*
* @param action an action
* @param ctx a runtime context
* @return a result of executing the action
*/
def withContext[A](action: => A)(implicit ctx: RuntimeContext): A = {
val token = ctx.truffleContext.enter()
try {
action
} finally {
ctx.truffleContext.leave(token)
}
}
/**
* Runs an Enso program.
*
* @param executionItem an execution item
* @param callStack a call stack
* @param valueCallback a listener of computed values
* @param ctx a runtime context
*/
@scala.annotation.tailrec
final def runProgram(
executionItem: ExecutionItem,
callStack: List[UUID],
valueCallback: Consumer[ExpressionValue]
)(implicit ctx: RuntimeContext): Unit = {
var enterables: Map[UUID, FunctionCall] = Map()
val valsCallback: Consumer[ExpressionValue] =
if (callStack.isEmpty) valueCallback else _ => ()
val callablesCallback: Consumer[ExpressionCall] = fun =>
enterables += fun.getExpressionId -> fun.getCall
executionItem match {
case ExecutionItem.Method(file, cons, function) =>
ctx.executionService.execute(
file,
cons,
function,
ctx.cache,
valsCallback,
callablesCallback
)
case ExecutionItem.CallData(callData) =>
ctx.executionService.execute(
callData,
ctx.cache,
valsCallback,
callablesCallback
)
}
callStack match {
case Nil => ()
case item :: tail =>
enterables.get(item) match {
case Some(call) =>
runProgram(ExecutionItem.CallData(call), tail, valueCallback)
case None =>
()
}
}
}
/**
* Runs an Enso program.
*
* @param contextId an identifier of an execution context
* @param stack a call stack
* @param ctx a runtime context
* @return either an error message or Unit signaling completion of a program
*/
def runProgram(
contextId: Api.ContextId,
stack: List[Api.StackItem]
)(implicit ctx: RuntimeContext): Either[String, Unit] = {
def unwind(
stack: List[Api.StackItem],
explicitCalls: List[Api.StackItem.ExplicitCall],
localCalls: List[UUID]
): (List[Api.StackItem.ExplicitCall], List[UUID]) =
stack match {
case Nil =>
(explicitCalls, localCalls)
case List(call: Api.StackItem.ExplicitCall) =>
(List(call), localCalls)
case Api.StackItem.LocalCall(id) :: xs =>
unwind(xs, explicitCalls, id :: localCalls)
}
val (explicitCalls, localCalls) = unwind(stack, Nil, Nil)
for {
stackItem <- Either.fromOption(explicitCalls.headOption, "stack is empty")
item = toExecutionItem(stackItem)
_ <- Either
.catchNonFatal(
runProgram(item, localCalls, onExpressionValueComputed(contextId, _))
)
.leftMap { ex =>
ctx.executionService.getLogger.log(
Level.FINE,
s"Error executing a function '${item.function}'",
ex
)
s"error in function: ${item.function}"
}
} yield ()
}
private def onExpressionValueComputed(
contextId: Api.ContextId,
value: ExpressionValue
)(implicit ctx: RuntimeContext): Unit = {
sendValueUpdate(contextId, value)
fireVisualisationUpdates(contextId, value)
}
private def sendValueUpdate(
contextId: ContextId,
value: ExpressionValue
)(implicit ctx: RuntimeContext): Unit = {
ctx.endpoint.sendToClient(
Api.Response(
Api.ExpressionValuesComputed(
contextId,
Vector(
Api.ExpressionValueUpdate(
value.getExpressionId,
OptionConverters.toScala(value.getType),
Some(value.getValue.toString),
toMethodPointer(value)
)
)
)
)
)
}
private def fireVisualisationUpdates(
contextId: ContextId,
value: ExpressionValue
)(implicit ctx: RuntimeContext): Unit = {
val visualisations =
ctx.contextManager.findVisualisationForExpression(
contextId,
value.getExpressionId
)
visualisations foreach { visualisation =>
emitVisualisationUpdate(contextId, value, visualisation)
}
}
private def emitVisualisationUpdate(
contextId: ContextId,
value: ExpressionValue,
visualisation: Visualisation
)(implicit ctx: RuntimeContext): Unit = {
val errorMsgOrVisualisationData =
Either
.catchNonFatal {
ctx.executionService.callFunction(
visualisation.callback,
value.getValue
)
}
.leftMap(_.getMessage)
.flatMap {
case text: String => Right(text.getBytes("UTF-8"))
case bytes: Array[Byte] => Right(bytes)
case other =>
Left(s"Cannot encode ${other.getClass} to byte array")
}
errorMsgOrVisualisationData match {
case Left(msg) =>
ctx.endpoint.sendToClient(
Api.Response(Api.VisualisationEvaluationFailed(msg))
)
case Right(data) =>
ctx.endpoint.sendToClient(
Api.Response(
Api.VisualisationUpdate(
Api.VisualisationContext(
visualisation.id,
contextId,
value.getExpressionId
),
data
)
)
)
}
}
private def toMethodPointer(
value: ExpressionValue
)(implicit ctx: RuntimeContext): Option[Api.MethodPointer] =
for {
call <- Option(value.getCall)
qualifiedName <- QualifiedName.fromString(
call.getFunction.getCallTarget.getRootNode.getQualifiedName
)
moduleName <- qualifiedName.getParent
functionName <- QualifiedName.fromString(call.getFunction.getName)
typeName <- functionName.getParent
module <- OptionConverters.toScala(
ctx.executionService.getContext.getCompiler.topScope
.getModule(moduleName.toString)
)
modulePath <- Option(module.getPath)
} yield Api.MethodPointer(
new File(modulePath),
typeName.toString,
functionName.module
)
private def toExecutionItem(
call: Api.StackItem.ExplicitCall
): ExecutionItem.Method =
ExecutionItem.Method(
call.methodPointer.file,
call.methodPointer.definedOnType,
call.methodPointer.name
)
}
object ProgramExecutionSupport {
sealed private trait ExecutionItem
private object ExecutionItem {
case class Method(
file: File,
constructor: String,
function: String
) extends ExecutionItem
case class CallData(callData: FunctionCall) extends ExecutionItem
}
}

View File

@ -0,0 +1,48 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that pushes an item onto a stack.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class PushContextCmd(
maybeRequestId: Option[RequestId],
request: Api.PushContextRequest
) extends Command
with ProgramExecutionSupport {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.get(request.contextId).isDefined) {
val stack = ctx.contextManager.getStack(request.contextId)
val payload = request.stackItem match {
case call: Api.StackItem.ExplicitCall if stack.isEmpty =>
ctx.contextManager.push(request.contextId, request.stackItem)
withContext(runProgram(request.contextId, List(call))) match {
case Right(()) => Api.PushContextResponse(request.contextId)
case Left(e) => Api.ExecutionFailed(request.contextId, e)
}
case _: Api.StackItem.LocalCall if stack.nonEmpty =>
ctx.contextManager.push(request.contextId, request.stackItem)
withContext(runProgram(request.contextId, stack.toList)) match {
case Right(()) => Api.PushContextResponse(request.contextId)
case Left(e) => Api.ExecutionFailed(request.contextId, e)
}
case _ =>
Api.InvalidStackItemError(request.contextId)
}
ctx.endpoint.sendToClient(Api.Response(maybeRequestId, payload))
} else {
ctx.endpoint.sendToClient(
Api
.Response(maybeRequestId, Api.ContextNotExistError(request.contextId))
)
}
}
}

View File

@ -0,0 +1,40 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
/**
* A command that forces a recomputation of the current position.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class RecomputeContextCmd(
maybeRequestId: Option[RequestId],
request: Api.RecomputeContextRequest
) extends Command
with ProgramExecutionSupport {
/** @inheritdoc **/
override def execute(implicit ctx: RuntimeContext): Unit = {
if (ctx.contextManager.get(request.contextId).isDefined) {
val stack = ctx.contextManager.getStack(request.contextId)
val payload = if (stack.isEmpty) {
Api.EmptyStackError(request.contextId)
} else {
withContext(runProgram(request.contextId, stack.toList)) match {
case Right(()) => Api.RecomputeContextResponse(request.contextId)
case Left(e) => Api.ExecutionFailed(request.contextId, e)
}
}
ctx.endpoint.sendToClient(Api.Response(maybeRequestId, payload))
} else {
ctx.endpoint.sendToClient(
Api
.Response(maybeRequestId, Api.ContextNotExistError(request.contextId))
)
}
}
}

View File

@ -0,0 +1,31 @@
package org.enso.interpreter.instrument.execution
import org.enso.interpreter.instrument.command.Command
import org.enso.interpreter.instrument.execution.CommandProcessor.Done
import scala.concurrent.Future
/**
* Defines a uniform interface to execute commands.
*/
trait CommandProcessor {
/**
* Invokes a command with the provided context.
*
* @param cmd a command to execute
* @param ctx contains suppliers of services to perform a request
* @return a future signaling the completion of computations
*/
def invoke(cmd: Command, ctx: RuntimeContext): Future[Done.type]
}
object CommandProcessor {
/**
* Signals completion of computations.
*/
case object Done
}

View File

@ -0,0 +1,48 @@
package org.enso.interpreter.instrument.execution
import java.util.concurrent.{Callable, Executors}
import java.util.logging.Level
import org.enso.interpreter.instrument.command.Command
import org.enso.interpreter.instrument.execution.CommandProcessor.Done
import org.enso.interpreter.runtime.Context
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal
/**
* This component schedules the execution of commands. It keep a queue of
* pending commands. It activates command execution in FIFO order.
*
* @param parallelism the size of the underlying compute thread pool
* @param context the language context
*/
class PreemptiveCommandProcessor(parallelism: Int, context: Context)
extends CommandProcessor {
private val executor = Executors.newFixedThreadPool(
parallelism,
new TruffleThreadFactory(context, "truffle-execution-engine")
)
/** @inheritdoc **/
def invoke(cmd: Command, ctx: RuntimeContext): Future[Done.type] = {
val promise = Promise[Done.type]()
executor.submit[Unit](new Callable[Unit] {
override def call(): Unit = {
val logger = ctx.executionService.getLogger
logger.log(Level.FINE, s"Executing command: $cmd...")
try {
cmd.execute(ctx)
logger.log(Level.FINE, s"Command $cmd finished.")
promise.success(Done)
} catch {
case NonFatal(ex) => promise.failure(ex)
}
}
})
promise.future
}
}

View File

@ -0,0 +1,30 @@
package org.enso.interpreter.instrument.execution
import com.oracle.truffle.api.TruffleContext
import org.enso.interpreter.instrument.{
Cache,
Endpoint,
ExecutionContextManager
}
import org.enso.interpreter.service.ExecutionService
/**
* Contains suppliers of services that provide application specific
* functionality.
*
* @param executionService a service allowing externally-triggered code
* execution
* @param contextManager a storage for active execution contexts
* @param endpoint a message endpoint
* @param truffleContext a context of a set of Truffle languages
* @param cache a storage for computed values
* @param commandProcessor a component responsible for executing commands
*/
case class RuntimeContext(
executionService: ExecutionService,
contextManager: ExecutionContextManager,
endpoint: Endpoint,
truffleContext: TruffleContext,
cache: Cache,
commandProcessor: CommandProcessor
)

View File

@ -0,0 +1,27 @@
package org.enso.interpreter.instrument.execution
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
import org.enso.interpreter.runtime.Context
/**
* A factory that creates new truffle threads on demand.
*
* @param context the language context
* @param prefix the prefix for names of created threads
*/
class TruffleThreadFactory(context: Context, prefix: String)
extends ThreadFactory {
private val counter = new AtomicInteger(0)
/** @inheritdoc **/
override def newThread(r: Runnable): Thread = {
val thread = context.createThread(r)
thread.setName(s"$prefix-${counter.incrementAndGet()}")
thread
}
}

View File

@ -75,9 +75,7 @@ class JsonRpcServer(
messageHandler ! MessageHandler.Connected(outActor)
NotUsed
}
.map(
(outMsg: MessageHandler.WebMessage) => TextMessage(outMsg.message)
)
.map((outMsg: MessageHandler.WebMessage) => TextMessage(outMsg.message))
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
@ -123,7 +121,7 @@ object JsonRpcServer {
* @return a default config.
*/
def default: Config =
Config(outgoingBufferSize = 10, lazyMessageTimeout = 10.seconds)
Config(outgoingBufferSize = 1000, lazyMessageTimeout = 10.seconds)
}
case class WebConnect(webActor: ActorRef)