Mitigate Language Server visualizations DOS scenario on startup (#10549)
* Mitigate LS DOS scenario on startup

  For a medium-sized project with many visualizations, the Language Server is flooded with visualization requests on startup. Due to an existing limit in the job execution engine, some of those requests might have been silently dropped. This change lifts that limit until a better fix can be devised. Additionally, a slow startup might lead to a timeout when serving the open-file request; this change adds a number of retries as a fallback mechanism and for progress monitoring.

* Add runtime-fat-jar to the list of aggregates
Parent: 69da3d5999
Commit: e585904291
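The handler changes below (OpenBufferHandler and OpenFileHandler) both follow the same retry-on-timeout pattern: instead of failing the request on the first RequestTimeout, the actor re-arms its timer and decrements a retry budget, and only replies with Errors.RequestTimeout once that budget is exhausted. The following is a minimal, self-contained sketch of the pattern with Akka classic actors; the class and message names (RetryingHandler, Open, Done) are illustrative and not taken from the Enso codebase.

import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}

// Illustrative protocol, standing in for the Language Server's actual messages.
final case class Open(path: String)
case object Done
case object RequestTimeout

// Sketch of the retry-on-timeout pattern: a RequestTimeout only fails the
// request once the retry budget is exhausted; otherwise the timer is re-armed.
class RetryingHandler(worker: ActorRef, timeout: FiniteDuration) extends Actor {
  import context.dispatcher // ExecutionContext required by scheduleOnce

  override def receive: Receive = {
    case msg @ Open(_) =>
      worker ! msg
      context.become(waiting(sender(), armTimer(), retries = 10))
  }

  private def armTimer(): Cancellable =
    context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)

  private def waiting(
    replyTo: ActorRef,
    timer: Cancellable,
    retries: Int
  ): Receive = {
    case RequestTimeout if retries > 0 =>
      // Not fatal yet: re-arm the timer and keep waiting with one retry fewer.
      context.become(waiting(replyTo, armTimer(), retries - 1))

    case RequestTimeout =>
      // Budget exhausted: report the failure and stop.
      replyTo ! "request timed out"
      context.stop(self)

    case Done =>
      timer.cancel()
      replyTo ! "ok"
      context.stop(self)
  }
}

Re-arming the timer per attempt, rather than simply using one long timeout, keeps each attempt observable in the logs, which is the progress-monitoring aspect mentioned in the commit message.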
@@ -316,6 +316,7 @@ lazy val enso = (project in file("."))
     `runtime-benchmarks`,
     `runtime-parser`,
     `runtime-compiler`,
+    `runtime-fat-jar`,
     `runtime-suggestions`,
     `runtime-language-epb`,
     `runtime-language-arrow`,
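For context, sbt aggregation means that tasks invoked on the aggregating project also run on every aggregated project, so adding `runtime-fat-jar` here makes root-level commands such as compile or test cover it as well. A minimal sketch, where the directory paths and the second project name are illustrative:

// build.sbt sketch: tasks run on `root` (e.g. `sbt compile`) are forwarded
// to every project listed in .aggregate(...).
lazy val root = (project in file("."))
  .aggregate(`runtime-compiler`, `runtime-fat-jar`)

lazy val `runtime-compiler` = project in file("runtime-compiler")
lazy val `runtime-fat-jar`  = project in file("runtime-fat-jar")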
@@ -39,22 +39,35 @@ class OpenBufferHandler(
       bufferRegistry ! TextProtocol.OpenBuffer(rpcSession, params.path)
       val cancellable =
         context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
-      context.become(responseStage(id, sender(), cancellable))
+      context.become(responseStage(id, sender(), cancellable, 10))
   }

   private def responseStage(
     id: Id,
     replyTo: ActorRef,
-    cancellable: Cancellable
+    cancellable: Cancellable,
+    retries: Int
   ): Receive = {
     case RequestTimeout =>
-      logger.error(
-        "Opening buffer request [{}] for [{}] timed out.",
-        id,
-        rpcSession.clientId
-      )
-      replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
-      context.stop(self)
+      if (retries > 0) {
+        logger.error(
+          "Opening buffer request [{}] for [{}] timed out. Retry attempts left {}...",
+          id,
+          rpcSession.clientId,
+          retries
+        )
+        val newCancellable =
+          context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
+        context.become(responseStage(id, replyTo, newCancellable, retries - 1))
+      } else {
+        logger.error(
+          "Opening buffer request [{}] for [{}] timed out.",
+          id,
+          rpcSession.clientId
+        )
+        replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
+        context.stop(self)
+      }

     case OpenFileResponse(Right(OpenFileResult(buffer, capability))) =>
       replyTo ! ResponseResult(
@@ -39,22 +39,35 @@ class OpenFileHandler(
       bufferRegistry ! TextProtocol.OpenFile(rpcSession, params.path)
       val cancellable =
         context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
-      context.become(responseStage(id, sender(), cancellable))
+      context.become(responseStage(id, sender(), cancellable, 10))
   }

   private def responseStage(
     id: Id,
     replyTo: ActorRef,
-    cancellable: Cancellable
+    cancellable: Cancellable,
+    retries: Int
   ): Receive = {
     case RequestTimeout =>
-      logger.error(
-        "Opening file request [{}] for [{}] timed out.",
-        id,
-        rpcSession.clientId
-      )
-      replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
-      context.stop(self)
+      if (retries > 0) {
+        logger.error(
+          "Opening file request [{}] for [{}] timed out. Retry attempts left {}...",
+          id,
+          rpcSession.clientId,
+          retries
+        )
+        val newCancellable =
+          context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
+        context.become(responseStage(id, replyTo, newCancellable, retries - 1))
+      } else {
+        logger.error(
+          "Opening file request [{}] for [{}] timed out.",
+          id,
+          rpcSession.clientId
+        )
+        replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
+        context.stop(self)
+      }

     case OpenFileResponse(Right(OpenFileResult(buffer, capability))) =>
       replyTo ! ResponseResult(
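With a fixed per-attempt timeout, the retry budget of 10 passed to responseStage bounds how long a client can wait before the request is finally failed: one initial attempt plus ten re-armed timers. A small worked example, assuming a hypothetical per-attempt timeout of 10 seconds (the actual timeout value is configured elsewhere and does not appear in this diff):

import scala.concurrent.duration._

object WorstCaseWait {
  val perAttempt: FiniteDuration = 10.seconds // assumed value, not from this diff
  val retries: Int               = 10         // budget passed to responseStage above
  // One initial attempt plus `retries` re-armed timers before ResponseError is sent.
  val worstCase: FiniteDuration  = perAttempt * (retries + 1) // 110 seconds
}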
@@ -46,17 +46,20 @@ final class JobExecutionEngine(
   val jobExecutor: ExecutorService =
     context.newFixedThreadPool(jobParallelism, "job-pool", false)

+  private val MaxJobLimit =
+    Integer.MAX_VALUE // Temporary solution to avoid jobs being dropped
+
   val highPriorityJobExecutor: ExecutorService =
     context.newCachedThreadPool(
       "prioritized-job-pool",
       2,
       4,
-      50,
+      MaxJobLimit,
       false
     )

   private val backgroundJobExecutor: ExecutorService =
-    context.newCachedThreadPool("background-job-pool", 1, 4, 50, false)
+    context.newCachedThreadPool("background-job-pool", 1, 4, MaxJobLimit, false)

   private val runtimeContext =
     RuntimeContext(
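The MaxJobLimit change addresses the "silently dropped" failure mode described in the commit message. Enso's context.newCachedThreadPool is its own helper, so the sketch below uses plain java.util.concurrent to show the general behaviour rather than its real signature: a pool with a bounded work queue and a discarding rejection policy drops tasks without any error once the queue is full and all threads are busy, whereas an effectively unbounded queue (Integer.MAX_VALUE) lets the backlog grow instead. The parameter meanings below are illustrative assumptions.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object BoundedPoolSketch {
  // A pool whose pending-work capacity is bounded; with DiscardPolicy, tasks
  // submitted while the queue is full and all threads are busy vanish silently.
  def pool(maxPendingJobs: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      2,                                     // core threads
      4,                                     // max threads
      50, TimeUnit.SECONDS,                  // keep-alive for idle threads
      new LinkedBlockingQueue[Runnable](maxPendingJobs),
      new ThreadPoolExecutor.DiscardPolicy() // drops rejected tasks silently
    )

  // With the limit lifted to Integer.MAX_VALUE the queue is effectively
  // unbounded, so startup visualization jobs queue up instead of disappearing.
  val lenient: ThreadPoolExecutor = pool(Integer.MAX_VALUE)
}

Lifting the limit trades potential memory growth from a large backlog for the guarantee that jobs queued during startup are eventually executed, which is why the commit message calls it a temporary solution until a better fix exists.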
@@ -146,7 +149,12 @@ final class JobExecutionEngine(
           logger.log(Level.SEVERE, s"Error executing $job", err)
           throw err
         } finally {
-          runningJobsRef.updateAndGet(_.filterNot(_.id == jobId))
+          val remaining = runningJobsRef.updateAndGet(_.filterNot(_.id == jobId))
+          logger.log(
+            Level.FINEST,
+            "Number of remaining pending jobs: {}",
+            remaining.size
+          )
         }
       })
     val runningJob = RunningJob(jobId, job, future)