Add executionContext/interrupt API command (#3952)

Implement the `executionContext/interrupt` API command that forcibly stops the program execution.
This commit is contained in:
Dmitry Bushev 2022-12-08 03:04:46 +03:00 committed by GitHub
parent 4641426ce9
commit 43167c1617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 560 additions and 0 deletions

View File

@ -469,6 +469,7 @@
- [Don't export polyglot symbols][3915] - [Don't export polyglot symbols][3915]
- [From/all import must not include module in name resolution][3931] - [From/all import must not include module in name resolution][3931]
- [Vector returns warnings of individual elements][3938] - [Vector returns warnings of individual elements][3938]
- [Add executionContext/interrupt API command][3952]
[3227]: https://github.com/enso-org/enso/pull/3227 [3227]: https://github.com/enso-org/enso/pull/3227
[3248]: https://github.com/enso-org/enso/pull/3248 [3248]: https://github.com/enso-org/enso/pull/3248
@ -542,6 +543,7 @@
[3915]: https://github.com/enso-org/enso/pull/3915 [3915]: https://github.com/enso-org/enso/pull/3915
[3931]: https://github.com/enso-org/enso/pull/3931 [3931]: https://github.com/enso-org/enso/pull/3931
[3938]: https://github.com/enso-org/enso/pull/3938 [3938]: https://github.com/enso-org/enso/pull/3938
[3952]: https://github.com/enso-org/enso/pull/3952
# Enso 2.0.0-alpha.18 (2021-10-12) # Enso 2.0.0-alpha.18 (2021-10-12)

View File

@ -133,6 +133,7 @@ transport formats, please look [here](./protocol-architecture).
- [`executionContext/push`](#executioncontextpush) - [`executionContext/push`](#executioncontextpush)
- [`executionContext/pop`](#executioncontextpop) - [`executionContext/pop`](#executioncontextpop)
- [`executionContext/recompute`](#executioncontextrecompute) - [`executionContext/recompute`](#executioncontextrecompute)
- [`executionContext/interrupt`](#executioncontextinterrupt)
- [`executionContext/getComponentGroups`](#executioncontextgetcomponentgroups) - [`executionContext/getComponentGroups`](#executioncontextgetcomponentgroups)
- [`executionContext/expressionUpdates`](#executioncontextexpressionupdates) - [`executionContext/expressionUpdates`](#executioncontextexpressionupdates)
- [`executionContext/executionFailed`](#executioncontextexecutionfailed) - [`executionContext/executionFailed`](#executioncontextexecutionfailed)
@ -1889,6 +1890,7 @@ destroying the context.
- [`executionContext/destroy`](#executioncontextdestroy) - [`executionContext/destroy`](#executioncontextdestroy)
- [`executionContext/recompute`](#executioncontextrecompute) - [`executionContext/recompute`](#executioncontextrecompute)
- [`executionContext/interrupt`](#executioncontextinterrupt)
- [`executionContext/push`](#executioncontextpush) - [`executionContext/push`](#executioncontextpush)
- [`executionContext/pop`](#executioncontextpop) - [`executionContext/pop`](#executioncontextpop)
- [`executionContext/executeExpression`](#executioncontextexecuteexpression) - [`executionContext/executeExpression`](#executioncontextexecuteexpression)
@ -3730,6 +3732,35 @@ null;
- [`EmptyStackError`](#emptystackerror) when the user tries to recompute an - [`EmptyStackError`](#emptystackerror) when the user tries to recompute an
empty stack. empty stack.
### `executionContext/interrupt`
Sent from the client to the server to interrupt the program execution in the
provided execution context.
- **Type:** Request
- **Direction:** Client -> Server
- **Connection:** Protocol
- **Visibility:** Public
#### Parameters
```typescript
{
contextId: ContextId;
}
```
#### Result
```typescript
null;
```
#### Errors
- [`AccessDeniedError`](#accessdeniederror) when the user does not hold the
`executionContext/canModify` capability for this context.
### `executionContext/getComponentGroups` ### `executionContext/getComponentGroups`
Sent from the client to the server to get the list of component groups available Sent from the client to the server to get the list of component groups available

View File

@ -485,6 +485,8 @@ class JsonConnectionController(
.props(requestTimeout, contextRegistry, rpcSession), .props(requestTimeout, contextRegistry, rpcSession),
ExecutionContextRecompute -> executioncontext.RecomputeHandler ExecutionContextRecompute -> executioncontext.RecomputeHandler
.props(requestTimeout, contextRegistry, rpcSession), .props(requestTimeout, contextRegistry, rpcSession),
ExecutionContextInterrupt -> executioncontext.InterruptHandler
.props(requestTimeout, contextRegistry, rpcSession),
ExecutionContextGetComponentGroups -> executioncontext.GetComponentGroupsHandler ExecutionContextGetComponentGroups -> executioncontext.GetComponentGroupsHandler
.props(requestTimeout, contextRegistry, rpcSession.clientId), .props(requestTimeout, contextRegistry, rpcSession.clientId),
GetSuggestionsDatabaseVersion -> search.GetSuggestionsDatabaseVersionHandler GetSuggestionsDatabaseVersion -> search.GetSuggestionsDatabaseVersionHandler

View File

@ -68,6 +68,7 @@ object JsonRpc {
.registerRequest(ExecutionContextPush) .registerRequest(ExecutionContextPush)
.registerRequest(ExecutionContextPop) .registerRequest(ExecutionContextPop)
.registerRequest(ExecutionContextRecompute) .registerRequest(ExecutionContextRecompute)
.registerRequest(ExecutionContextInterrupt)
.registerRequest(ExecutionContextGetComponentGroups) .registerRequest(ExecutionContextGetComponentGroups)
.registerRequest(ExecuteExpression) .registerRequest(ExecuteExpression)
.registerRequest(AttachVisualisation) .registerRequest(AttachVisualisation)

View File

@ -0,0 +1,87 @@
package org.enso.languageserver.requesthandler.executioncontext
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import com.typesafe.scalalogging.LazyLogging
import org.enso.jsonrpc._
import org.enso.languageserver.requesthandler.RequestTimeout
import org.enso.languageserver.runtime.ExecutionApi._
import org.enso.languageserver.runtime.{
ContextRegistryProtocol,
RuntimeFailureMapper
}
import org.enso.languageserver.session.JsonSession
import org.enso.languageserver.util.UnhandledLogging
import scala.concurrent.duration.FiniteDuration
/** A request handler for `executionContext/interrupt` commands.
*
* @param timeout request timeout
* @param contextRegistry a reference to the context registry.
* @param session an object representing a client connected to the language server
*/
class InterruptHandler(
timeout: FiniteDuration,
contextRegistry: ActorRef,
session: JsonSession
) extends Actor
with LazyLogging
with UnhandledLogging {
import context.dispatcher
override def receive: Receive = requestStage
private def requestStage: Receive = {
case Request(
ExecutionContextInterrupt,
id,
params: ExecutionContextInterrupt.Params
) =>
contextRegistry ! ContextRegistryProtocol.InterruptContextRequest(
session,
params.contextId
)
val cancellable =
context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
context.become(responseStage(id, sender(), cancellable))
}
private def responseStage(
id: Id,
replyTo: ActorRef,
cancellable: Cancellable
): Receive = {
case RequestTimeout =>
logger.error("Request [{}] timed out.", id)
replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
context.stop(self)
case ContextRegistryProtocol.InterruptContextResponse(_) =>
replyTo ! ResponseResult(ExecutionContextInterrupt, id, Unused)
cancellable.cancel()
context.stop(self)
case error: ContextRegistryProtocol.Failure =>
replyTo ! ResponseError(Some(id), RuntimeFailureMapper.mapFailure(error))
cancellable.cancel()
context.stop(self)
}
}
object InterruptHandler {
/** Creates configuration object used to create an [[InterruptHandler]].
*
* @param timeout request timeout
* @param contextRegistry a reference to the context registry.
* @param rpcSession an object representing a client connected to the language server
*/
def props(
timeout: FiniteDuration,
contextRegistry: ActorRef,
rpcSession: JsonSession
): Props =
Props(new InterruptHandler(timeout, contextRegistry, rpcSession))
}

View File

@ -207,6 +207,21 @@ final class ContextRegistry(
sender() ! AccessDenied sender() ! AccessDenied
} }
case InterruptContextRequest(client, contextId) =>
if (store.hasContext(client.clientId, contextId)) {
val handler =
context.actorOf(
InterruptContextHandler.props(
runtimeFailureMapper,
timeout,
runtime
)
)
handler.forward(Api.InterruptContextRequest(contextId))
} else {
sender() ! AccessDenied
}
case GetComponentGroupsRequest(clientId, contextId) => case GetComponentGroupsRequest(clientId, contextId) =>
if (store.hasContext(clientId, contextId)) { if (store.hasContext(clientId, contextId)) {
val handler = val handler =

View File

@ -111,6 +111,22 @@ object ContextRegistryProtocol {
*/ */
case class RecomputeContextResponse(contextId: ContextId) case class RecomputeContextResponse(contextId: ContextId)
/** A request to the context registry to interrupt an execution context.
*
* @param rpcSession reference to the client
* @param contextId execution context identifier
*/
case class InterruptContextRequest(
rpcSession: JsonSession,
contextId: ContextId
)
/** A response about interrupting the context.
*
* @param contextId execution context identifier
*/
case class InterruptContextResponse(contextId: ContextId)
/** A request to the context registry to get the loaded component groups. /** A request to the context registry to get the loaded component groups.
* *
* @param clientId the internal id of the client * @param clientId the internal id of the client

View File

@ -102,6 +102,19 @@ object ExecutionApi {
} }
} }
case object ExecutionContextInterrupt
extends Method("executionContext/interrupt") {
case class Params(contextId: ContextId)
implicit val hasParams = new HasParams[this.type] {
type Params = ExecutionContextInterrupt.Params
}
implicit val hasResult = new HasResult[this.type] {
type Result = Unused.type
}
}
case object ExecutionContextGetComponentGroups case object ExecutionContextGetComponentGroups
extends Method("executionContext/getComponentGroups") { extends Method("executionContext/getComponentGroups") {

View File

@ -0,0 +1,77 @@
package org.enso.languageserver.runtime.handler
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import akka.pattern.pipe
import com.typesafe.scalalogging.LazyLogging
import org.enso.languageserver.requesthandler.RequestTimeout
import org.enso.languageserver.runtime.{
ContextRegistryProtocol,
RuntimeFailureMapper
}
import org.enso.languageserver.util.UnhandledLogging
import org.enso.polyglot.runtime.Runtime.Api
import java.util.UUID
import scala.concurrent.duration.FiniteDuration
/** A request handler for interrupt command.
*
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime connector
*/
final class InterruptContextHandler(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
) extends Actor
with LazyLogging
with UnhandledLogging {
import context.dispatcher
override def receive: Receive = requestStage
private def requestStage: Receive = { case msg: Api.InterruptContextRequest =>
runtime ! Api.Request(UUID.randomUUID(), msg)
val cancellable =
context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
context.become(responseStage(sender(), cancellable))
}
private def responseStage(
replyTo: ActorRef,
cancellable: Cancellable
): Receive = {
case RequestTimeout =>
replyTo ! RequestTimeout
context.stop(self)
case Api.Response(_, Api.InterruptContextResponse(contextId)) =>
replyTo ! ContextRegistryProtocol.InterruptContextResponse(contextId)
cancellable.cancel()
context.stop(self)
case Api.Response(_, error: Api.Error) =>
runtimeFailureMapper.mapApiError(error).pipeTo(replyTo)
cancellable.cancel()
context.stop(self)
}
}
object InterruptContextHandler {
/** Creates configuration object used to create an [[InterruptContextHandler]].
*
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime connector
*/
def props(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
): Props =
Props(new InterruptContextHandler(runtimeFailureMapper, timeout, runtime))
}

View File

@ -77,6 +77,14 @@ object Runtime {
value = classOf[Api.RecomputeContextResponse], value = classOf[Api.RecomputeContextResponse],
name = "recomputeContextResponse" name = "recomputeContextResponse"
), ),
new JsonSubTypes.Type(
value = classOf[Api.InterruptContextRequest],
name = "interruptContextRequest"
),
new JsonSubTypes.Type(
value = classOf[Api.InterruptContextResponse],
name = "interruptContextResponse"
),
new JsonSubTypes.Type( new JsonSubTypes.Type(
value = classOf[Api.GetComponentGroupsRequest], value = classOf[Api.GetComponentGroupsRequest],
name = "getComponentGroupsRequest" name = "getComponentGroupsRequest"
@ -1167,6 +1175,22 @@ object Runtime {
final case class RecomputeContextResponse(contextId: ContextId) final case class RecomputeContextResponse(contextId: ContextId)
extends ApiResponse extends ApiResponse
/** A Request sent from the client to the runtime server, to interrupt
* the execution context.
*
* @param contextId the context's id.
*/
final case class InterruptContextRequest(contextId: ContextId)
extends ApiRequest
/** A response sent from the server upon handling the
* [[InterruptContextRequest]].
*
* @param contextId the context's id.
*/
final case class InterruptContextResponse(contextId: ContextId)
extends ApiResponse
/** A request sent from the client to the runtime server to get the /** A request sent from the client to the runtime server to get the
* component groups loaded in runtime. * component groups loaded in runtime.
*/ */

View File

@ -0,0 +1,245 @@
package org.enso.interpreter.test.instrument
import org.enso.distribution.FileSystem
import org.enso.distribution.locking.ThreadSafeFileLockManager
import org.enso.interpreter.instrument.execution.Timer
import org.enso.interpreter.test.Metadata
import org.enso.pkg.{Package, PackageManager}
import org.enso.polyglot._
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.text.{ContentVersion, Sha3_224VersionCalculator}
import org.graalvm.polyglot.Context
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.io.{ByteArrayOutputStream, File}
import java.nio.file.{Files, Path, Paths}
import java.util.UUID
@scala.annotation.nowarn("msg=multiarg infix syntax")
class RuntimeAsyncCommandsTest
extends AnyFlatSpec
with Matchers
with BeforeAndAfterEach {
// === Test Timer ===========================================================
class TestTimer extends Timer {
override def getTime(): Long = 0
}
// === Test Utilities =======================================================
var context: TestContext = _
class TestContext(packageName: String) extends InstrumentTestContext {
val tmpDir: Path = Files.createTempDirectory("enso-test-packages")
sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir))
val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks"))
val runtimeServerEmulator =
new RuntimeServerEmulator(messageQueue, lockManager)
val pkg: Package[File] =
PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test")
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val executionContext = new PolyglotContext(
Context
.newBuilder(LanguageInfo.ID)
.allowExperimentalOptions(true)
.allowAllAccess(true)
.option(RuntimeOptions.PROJECT_ROOT, pkg.root.getAbsolutePath)
.option(RuntimeOptions.LOG_LEVEL, "WARNING")
.option(
RuntimeOptions.INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION,
"false"
)
.option(RuntimeOptions.ENABLE_PROJECT_SUGGESTIONS, "false")
.option(RuntimeOptions.ENABLE_GLOBAL_SUGGESTIONS, "false")
.option(
RuntimeOptions.DISABLE_IR_CACHES,
InstrumentTestContext.DISABLE_IR_CACHE
)
.option(RuntimeServerInfo.ENABLE_OPTION, "true")
.option(RuntimeOptions.INTERACTIVE_MODE, "true")
.option(
RuntimeOptions.LANGUAGE_HOME_OVERRIDE,
Paths
.get("../../test/micro-distribution/component")
.toFile
.getAbsolutePath
)
.option(RuntimeOptions.EDITION_OVERRIDE, "0.0.0-dev")
.out(out)
.serverTransport(runtimeServerEmulator.makeServerTransport)
.build()
)
executionContext.context.initialize(LanguageInfo.ID)
val languageContext = executionContext.context
.getBindings(LanguageInfo.ID)
.invokeMember(MethodNames.TopScope.LEAK_CONTEXT)
.asHostObject[org.enso.interpreter.runtime.EnsoContext]
languageContext.getLanguage.getIdExecutionService.ifPresent(
_.overrideTimer(new TestTimer)
)
def writeMain(contents: String): File =
Files.write(pkg.mainFile.toPath, contents.getBytes).toFile
def writeFile(file: File, contents: String): File =
Files.write(file.toPath, contents.getBytes).toFile
def writeInSrcDir(moduleName: String, contents: String): File = {
val file = new File(pkg.sourceDir, s"$moduleName.enso")
Files.write(file.toPath, contents.getBytes).toFile
}
def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg)
def consumeOut: List[String] = {
val result = out.toString
out.reset()
result.linesIterator.toList
}
def executionComplete(contextId: UUID): Api.Response =
Api.Response(Api.ExecutionComplete(contextId))
}
def contentsVersion(content: String): ContentVersion =
Sha3_224VersionCalculator.evalVersion(content)
override protected def beforeEach(): Unit = {
context = new TestContext("Test")
val Some(Api.Response(_, Api.InitializedNotification())) = context.receive
}
it should "interrupt stopped execution context" in {
val moduleName = "Enso_Test.Test.Main"
val contextId = UUID.randomUUID()
val requestId = UUID.randomUUID()
val metadata = new Metadata
val code =
"""main = "Hello World!"
|""".stripMargin.linesIterator.mkString("\n")
val contents = metadata.appendToCode(code)
val mainFile = context.writeMain(contents)
// create context
context.send(Api.Request(requestId, Api.CreateContextRequest(contextId)))
context.receive shouldEqual Some(
Api.Response(requestId, Api.CreateContextResponse(contextId))
)
// Open the new file
context.send(
Api.Request(Api.OpenFileNotification(mainFile, contents))
)
context.receiveNone shouldEqual None
// push main
val item1 = Api.StackItem.ExplicitCall(
Api.MethodPointer(moduleName, "Enso_Test.Test.Main", "main"),
None,
Vector()
)
context.send(
Api.Request(requestId, Api.PushContextRequest(contextId, item1))
)
context.receiveNIgnoreExpressionUpdates(
2
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.PushContextResponse(contextId)),
context.executionComplete(contextId)
)
// interrupt
context.send(
Api.Request(requestId, Api.InterruptContextRequest(contextId))
)
context.receiveNIgnoreExpressionUpdates(
1
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.InterruptContextResponse(contextId))
)
}
it should "interrupt running execution context" in {
val moduleName = "Enso_Test.Test.Main"
val contextId = UUID.randomUUID()
val requestId = UUID.randomUUID()
val metadata = new Metadata
val code =
"""from Standard.Base import all
|polyglot java import java.lang.Thread
|
|loop n s=0 =
| if (s > n) then s else
| Thread.sleep 100
| loop n s+1
|
|main =
| IO.println "started"
| loop 100
|""".stripMargin.linesIterator.mkString("\n")
val contents = metadata.appendToCode(code)
val mainFile = context.writeMain(contents)
// create context
context.send(Api.Request(requestId, Api.CreateContextRequest(contextId)))
context.receive shouldEqual Some(
Api.Response(requestId, Api.CreateContextResponse(contextId))
)
// Open the new file
context.send(
Api.Request(Api.OpenFileNotification(mainFile, contents))
)
context.receiveNone shouldEqual None
// push main
val item1 = Api.StackItem.ExplicitCall(
Api.MethodPointer(moduleName, "Enso_Test.Test.Main", "main"),
None,
Vector()
)
context.send(
Api.Request(requestId, Api.PushContextRequest(contextId, item1))
)
context.receiveNIgnoreExpressionUpdates(
1
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.PushContextResponse(contextId))
)
// wait for program to start
var isProgramStared = false
var iteration = 0
while (!isProgramStared && iteration < 50) {
val out = context.consumeOut
Thread.sleep(200)
isProgramStared = out == List("started")
iteration += 1
}
// interrupt
context.send(
Api.Request(requestId, Api.InterruptContextRequest(contextId))
)
context.receiveNIgnoreExpressionUpdates(
2
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.InterruptContextResponse(contextId)),
Api.Response(
Api.ExecutionFailed(
contextId,
Api.ExecutionResult
.Failure("Execution of function main interrupted.", None)
)
)
)
}
}

View File

@ -28,6 +28,9 @@ object CommandFactory {
case payload: Api.RecomputeContextRequest => case payload: Api.RecomputeContextRequest =>
new RecomputeContextCmd(request.requestId, payload) new RecomputeContextCmd(request.requestId, payload)
case payload: Api.InterruptContextRequest =>
new InterruptContextCmd(request.requestId, payload)
case _: Api.GetComponentGroupsRequest => case _: Api.GetComponentGroupsRequest =>
new GetComponentGroupsCmd(request.requestId) new GetComponentGroupsCmd(request.requestId)

View File

@ -0,0 +1,44 @@
package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
import scala.concurrent.{ExecutionContext, Future}
/** A command that forces an interruption of the current execution.
*
* @param maybeRequestId an option with request id
* @param request a request for a service
*/
class InterruptContextCmd(
maybeRequestId: Option[RequestId],
request: Api.InterruptContextRequest
) extends Command(maybeRequestId) {
/** @inheritdoc */
override def execute(implicit
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] =
if (doesContextExist) {
Future {
ctx.jobControlPlane.abortJobs(request.contextId)
reply(Api.InterruptContextResponse(request.contextId))
}
} else {
replyWithContextNotExistError()
}
private def doesContextExist(implicit ctx: RuntimeContext): Boolean = {
ctx.contextManager.contains(request.contextId)
}
private def replyWithContextNotExistError()(implicit
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] =
Future {
reply(Api.ContextNotExistError(request.contextId))
}
}