Automatic reconnect with Language Server. (#9691)

Fixes #8520

If the websocket is closed not by us, we automatically try to reconnect with it, and initialize the protocol again. **Restoring state (execution contexts, attached visualizations) is not part of this PR**.

It's a part of making IDE work after hibernation (or LS crash).

# Important Notes
It required somewhat heavy refactoring:
1. I decided to use an existing implementation of reconnecting websocket. Replaced (later discovered by me) our implementation.
2. The LanguageServer class now handles both reconnecting and re-initializing - that make usage of it simpler (no more `Promise<LanguageServer>` - each method will just wait for (re)connection and initialization.
3. The stuff in `net` src's module was partially moved to shared's counterpart (with tests). Merged `exponentialBackoff` implementations, which also brought me to
4. Rewriting LS client, so it returns Result instead of throwing, what is closer our desired state, and allows us using exponentialBackoff method without any wrappers.
This commit is contained in:
Adam Obuchowicz 2024-04-19 15:39:45 +02:00 committed by GitHub
parent f23455d223
commit de406c69fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 1436 additions and 1370 deletions

View File

@ -254,6 +254,7 @@ test('Visualization preview: user visualization selection', async ({ page }) =>
await input.fill('4')
await expect(input).toHaveValue('4')
await expect(locate.jsonVisualization(page)).toExist()
await expect(locate.jsonVisualization(page)).toContainText('"visualizedExpr": "4"')
await locate.showVisualizationSelectorButton(page).click()
await page.getByRole('button', { name: 'Table' }).click()
// The table visualization is not currently working with `executeExpression` (#9194), but we can test that the JSON

View File

@ -12,6 +12,7 @@ test('node can open and load visualization', async ({ page }) => {
await expect(locate.circularMenu(page)).toExist()
await locate.toggleVisualizationButton(page).click()
await expect(locate.anyVisualization(page)).toExist()
await expect(locate.loadingVisualization(page)).toHaveCount(0)
await locate.showVisualizationSelectorButton(page).click()
await page.getByText('JSON').click()
const vis = locate.jsonVisualization(page)
@ -36,6 +37,7 @@ test('Warnings visualization', async ({ page }) => {
await expect(locate.circularMenu(page)).toExist()
await locate.toggleVisualizationButton(page).click()
await expect(locate.anyVisualization(page)).toExist()
await expect(locate.loadingVisualization(page)).toHaveCount(0)
await locate.showVisualizationSelectorButton(page).click()
await page.locator('.VisualizationSelector').getByRole('button', { name: 'Warnings' }).click()
await expect(locate.warningsVisualization(page)).toExist()

View File

@ -145,6 +145,7 @@ function componentLocator<T extends string>(className: SanitizeClassName<T>) {
export const graphEditor = componentLocator('GraphEditor')
// @ts-expect-error
export const anyVisualization = componentLocator('GraphVisualization > *')
export const loadingVisualization = componentLocator('LoadingVisualization')
export const circularMenu = componentLocator('CircularMenu')
export const addNewNodeButton = componentLocator('PlusButton')
export const componentBrowser = componentLocator('ComponentBrowser')

View File

@ -2,9 +2,9 @@
import { useProjectStore } from '@/stores/project'
import { mockFsDirectoryHandle } from '@/util/convert/fsAccess'
import { MockWebSocket, type WebSocketHandler } from '@/util/net'
import { mockDataWSHandler } from 'shared/dataServer/mock'
import { type Path as LSPath } from 'shared/languageServerTypes'
import { watchEffect } from 'vue'
import { mockDataWSHandler } from './dataServer'
const projectStore = useProjectStore()

View File

@ -27,9 +27,9 @@ import {
type AnyOutboundPayload,
type Offset,
type Table,
} from '../binaryProtocol'
import { LanguageServerErrorCode } from '../languageServerTypes'
import { uuidToBits } from '../uuid'
} from 'shared/binaryProtocol'
import { LanguageServerErrorCode } from 'shared/languageServerTypes'
import { uuidToBits } from 'shared/uuid'
const sha3 = createSHA3(224)

View File

@ -1,3 +1,4 @@
import { Pattern } from '@/util/ast/match'
import type { MockYdocProviderImpl } from '@/util/crdt'
import * as random from 'lib0/random'
import * as Ast from 'shared/ast'
@ -9,7 +10,7 @@ import {
VisualizationContext,
VisualizationUpdate,
} from 'shared/binaryProtocol'
import { mockDataWSHandler as originalMockDataWSHandler } from 'shared/dataServer/mock'
import { ErrorCode } from 'shared/languageServer'
import type {
ContextId,
ExpressionId,
@ -26,6 +27,7 @@ import type { QualifiedName } from 'src/util/qualifiedName'
import * as Y from 'yjs'
import { mockFsDirectoryHandle, type FileTree } from '../src/util/convert/fsAccess'
import mockDb from '../stories/mockSuggestions.json' assert { type: 'json' }
import { mockDataWSHandler as originalMockDataWSHandler } from './dataServer'
const mockProjectId = random.uuidv4() as Uuid
const standardBase = 'Standard.Base' as QualifiedName
@ -93,19 +95,21 @@ const visualizationExprIds = new Map<Uuid, ExpressionId>()
const encoder = new TextEncoder()
const encodeJSON = (data: unknown) => encoder.encode(JSON.stringify(data))
const scatterplotJson = encodeJSON({
axis: {
x: { label: 'x-axis label', scale: 'linear' },
y: { label: 'y-axis label', scale: 'logarithmic' },
},
points: { labels: 'visible' },
data: [
{ x: 0.1, y: 0.7, label: 'foo', color: '#FF0000', shape: 'circle', size: 0.2 },
{ x: 0.4, y: 0.2, label: 'baz', color: '#0000FF', shape: 'square', size: 0.3 },
],
})
const scatterplotJson = (params: string[]) =>
encodeJSON({
visualizedExpr: params[0],
axis: {
x: { label: 'x-axis label', scale: 'linear' },
y: { label: 'y-axis label', scale: 'logarithmic' },
},
points: { labels: 'visible' },
data: [
{ x: 0.1, y: 0.7, label: 'foo', color: '#FF0000', shape: 'circle', size: 0.2 },
{ x: 0.4, y: 0.2, label: 'baz', color: '#0000FF', shape: 'square', size: 0.3 },
],
})
const mockVizData: Record<string, Uint8Array | ((params: string[]) => Uint8Array)> = {
const mockVizPreprocessors: Record<string, Uint8Array | ((params: string[]) => Uint8Array)> = {
// JSON
'Standard.Visualization.Preprocessor.default_preprocessor': scatterplotJson,
'Standard.Visualization.Scatter_Plot.process_to_json_text': scatterplotJson,
@ -320,7 +324,6 @@ const mockVizData: Record<string, Uint8Array | ((params: string[]) => Uint8Array
return encodeJSON([])
}
},
'Standard.Visualization.AI.build_ai_prompt': () => encodeJSON('Could you __$$GOAL$$__, please?'),
// The following visualizations do not have unique transformation methods, and as such are only kept
// for posterity.
@ -353,9 +356,9 @@ function createId(id: Uuid) {
return (builder: Builder) => EnsoUUID.createEnsoUUID(builder, low, high)
}
function sendVizData(id: Uuid, config: VisualizationConfiguration) {
function sendVizData(id: Uuid, config: VisualizationConfiguration, expressionId?: Uuid) {
const vizDataHandler =
mockVizData[
mockVizPreprocessors[
typeof config.expression === 'string' ?
`${config.visualizationModule}.${config.expression}`
: `${config.expression.definedOnType}.${config.expression.name}`
@ -365,12 +368,22 @@ function sendVizData(id: Uuid, config: VisualizationConfiguration) {
vizDataHandler instanceof Uint8Array ? vizDataHandler : (
vizDataHandler(config.positionalArgumentsExpressions ?? [])
)
const exprId = expressionId ?? visualizationExprIds.get(id)
sendVizUpdate(id, config.executionContextId, exprId, vizData)
}
function sendVizUpdate(
id: Uuid,
executionCtxId: Uuid,
exprId: Uuid | undefined,
vizData: Uint8Array,
) {
if (!sendData) return
const builder = new Builder()
const exprId = visualizationExprIds.get(id)
const visualizationContextOffset = VisualizationContext.createVisualizationContext(
builder,
createId(id),
createId(config.executionContextId),
createId(executionCtxId),
exprId ? createId(exprId) : null,
)
const dataOffset = VisualizationUpdate.createDataVector(builder, vizData)
@ -446,16 +459,27 @@ export const mockLSHandler: MockTransportData = async (method, data, transport)
expressionId: ExpressionId
expression: string
}
const { func, args } = Ast.analyzeAppLike(Ast.parse(data_.expression))
if (!(func instanceof Ast.PropertyAccess && func.lhs)) return
const visualizationConfig: VisualizationConfiguration = {
executionContextId: data_.executionContextId,
visualizationModule: func.lhs.code(),
expression: func.rhs.code(),
positionalArgumentsExpressions: args.map((ast) => ast.code()),
const aiPromptPat = Pattern.parse('Standard.Visualization.AI.build_ai_prompt __ . to_json')
const exprAst = Ast.parse(data_.expression)
if (aiPromptPat.test(exprAst)) {
sendVizUpdate(
data_.visualizationId,
data_.executionContextId,
data_.expressionId,
encodeJSON('Could you __$$GOAL$$__, please?'),
)
} else {
// Check if there's existing preprocessor mock which matches our expression
const { func, args } = Ast.analyzeAppLike(exprAst)
if (!(func instanceof Ast.PropertyAccess && func.lhs)) return
const visualizationConfig: VisualizationConfiguration = {
executionContextId: data_.executionContextId,
visualizationModule: func.lhs.code(),
expression: func.rhs.code(),
positionalArgumentsExpressions: args.map((ast) => ast.code()),
}
sendVizData(data_.visualizationId, visualizationConfig, data_.expressionId)
}
visualizationExprIds.set(data_.visualizationId, data_.expressionId)
sendVizData(data_.visualizationId, visualizationConfig)
return
}
case 'search/getSuggestionsDatabase':
@ -487,9 +511,16 @@ export const mockLSHandler: MockTransportData = async (method, data, transport)
if (!child || typeof child === 'string' || child instanceof ArrayBuffer) break
}
}
if (!child) return Promise.reject(`Folder '/${data_.path.segments.join('/')}' not found.`)
if (!child)
return Promise.reject({
code: ErrorCode.FILE_NOT_FOUND,
message: `Folder '/${data_.path.segments.join('/')}' not found.`,
})
if (typeof child === 'string' || child instanceof ArrayBuffer)
return Promise.reject(`File '/${data_.path.segments.join('/')}' is not a folder.`)
return Promise.reject({
code: ErrorCode.NOT_DIRECTORY,
message: `File '/${data_.path.segments.join('/')}' is not a folder.`,
})
return {
paths: Object.entries(child).map(([name, entry]) => ({
type: typeof entry === 'string' || entry instanceof ArrayBuffer ? 'File' : 'Directory',
@ -500,8 +531,7 @@ export const mockLSHandler: MockTransportData = async (method, data, transport)
}
case 'ai/completion': {
const { prompt } = data
console.log(prompt)
const match = /^"Could you (.*), please\?"$/.exec(prompt)
const match = /^Could you (.*), please\?$/.exec(prompt)
if (!match) {
return { code: 'How rude!' }
} else if (match[1] === 'convert to table') {

View File

@ -63,6 +63,7 @@
"lib0": "^0.2.85",
"magic-string": "^0.30.3",
"murmurhash": "^2.0.1",
"partysocket": "^1.0.1",
"pinia": "^2.1.7",
"postcss-inline-svg": "^6.0.0",
"postcss-nesting": "^12.0.1",

View File

@ -1,12 +1,13 @@
import { sha3_224 as SHA3 } from '@noble/hashes/sha3'
import { bytesToHex } from '@noble/hashes/utils'
import { Client } from '@open-rpc/client-js'
import { Client, RequestManager } from '@open-rpc/client-js'
import { ObservableV2 } from 'lib0/observable'
import { uuidv4 } from 'lib0/random'
import { z } from 'zod'
import { walkFs } from './languageServer/files'
import type {
Checksum,
ContentRoot,
ContextId,
Event,
ExecutionEnvironment,
@ -21,6 +22,12 @@ import type {
VisualizationConfiguration,
response,
} from './languageServerTypes'
import { Err, Ok, type Result } from './util/data/result'
import {
AbortScope,
exponentialBackoff,
type ReconnectingTransportWithWebsocketEvents,
} from './util/net'
import type { Uuid } from './yjsModel'
const DEBUG_LOG_RPC = false
@ -91,41 +98,117 @@ export class RemoteRpcError {
}
}
export class LsRpcError extends Error {
cause: RemoteRpcError | Error
export class LsRpcError {
cause: RemoteRpcError | Error | string
request: string
params: object
constructor(cause: RemoteRpcError | Error, request: string, params: object) {
super(`Language server request '${request}' failed.`)
constructor(cause: RemoteRpcError | Error | string, request: string, params: object) {
this.cause = cause
this.request = request
this.params = params
}
toString() {
return `Language server request '${this.request} failed: ${this.cause instanceof RemoteRpcError ? this.cause.message : this.cause}`
}
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md) */
export class LanguageServer extends ObservableV2<Notifications> {
export type LsRpcResult<T> = Result<T, LsRpcError>
export type TransportEvents = {
'transport/closed': () => void
'transport/connected': () => void
}
/**
* This client implements the [Language Server Protocol](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md)
*
* It also handles the initialization (and re-initialization on every reconnect); each method
* repressenting a remote call (except the `initProtocolConnection` obviously) waits for
* initialization before sending the request.
*/
export class LanguageServer extends ObservableV2<Notifications & TransportEvents> {
client: Client
handlers: Map<string, Set<(...params: any[]) => void>>
retainCount = 1
/**
* This promise is resolved once the LS protocol is initialized. When connection is lost, this
* field becomes again an unresolved promise until reconnected and reinitialized.
*/
initialized: Promise<LsRpcResult<response.InitProtocolConnection>>
private clientScope: AbortScope = new AbortScope()
private initializationScheduled = false
private retainCount = 1
constructor(client: Client) {
constructor(
private clientID: Uuid,
private transport: ReconnectingTransportWithWebsocketEvents,
) {
super()
this.client = client
this.handlers = new Map()
client.onNotification((notification) => {
this.initialized = this.scheduleInitializationAfterConnect()
const requestManager = new RequestManager([transport])
this.client = new Client(requestManager)
this.client.onNotification((notification) => {
this.emit(notification.method as keyof Notifications, [notification.params])
})
client.onError((error) => {
this.client.onError((error) => {
console.error(`Unexpected LS connection error:`, error)
})
transport.on('error', (error) => console.error('Language Server transport error:', error))
const reinitializeCb = () => {
this.emit('transport/closed', [])
console.log('Language Server: websocket closed')
this.scheduleInitializationAfterConnect()
}
transport.on('close', reinitializeCb)
this.clientScope.onAbort(() => {
this.transport.off('close', reinitializeCb)
this.transport.close()
})
}
private scheduleInitializationAfterConnect() {
if (this.initializationScheduled) return this.initialized
this.initializationScheduled = true
this.initialized = new Promise((resolve) => {
const cb = () => {
this.transport.off('open', cb)
this.emit('transport/connected', [])
this.initializationScheduled = false
exponentialBackoff(() => this.initProtocolConnection(this.clientID), {
onBeforeRetry: (error, _, delay) => {
console.warn(
`Failed to initialize language server connection, retrying after ${delay}ms...\n`,
error,
)
},
}).then((result) => {
if (!result.ok) {
result.error.log('Error initializing Language Server RPC')
}
resolve(result)
})
}
this.transport.on('open', cb)
})
return this.initialized
}
get contentRoots(): Promise<ContentRoot[]> {
return this.initialized.then((result) => (result.ok ? result.value.contentRoots : []))
}
reconnect() {
this.transport.reconnect()
}
// The "magic bag of holding" generic that is only present in the return type is UNSOUND.
// However, it is SAFE, as the return type of the API is statically known.
private async request<T>(method: string, params: object): Promise<T> {
if (this.retainCount === 0) return Promise.reject(new Error('LanguageServer disposed'))
private async request<T>(
method: string,
params: object,
waitForInit = true,
): Promise<LsRpcResult<T>> {
if (this.retainCount === 0)
return Err(new LsRpcError('LanguageServer disposed', method, params))
const uuid = uuidv4()
const now = performance.now()
try {
@ -133,15 +216,18 @@ export class LanguageServer extends ObservableV2<Notifications> {
console.log(`LS [${uuid}] ${method}:`)
console.dir(params)
}
return await this.client.request({ method, params }, RPC_TIMEOUT_MS)
if (waitForInit) {
const initResult = await this.initialized
if (!initResult.ok) return initResult
}
return Ok(await this.client.request({ method, params }, RPC_TIMEOUT_MS))
} catch (error) {
const remoteError = RemoteRpcErrorSchema.safeParse(error)
if (remoteError.success) {
throw new LsRpcError(new RemoteRpcError(remoteError.data), method, params)
return Err(new LsRpcError(new RemoteRpcError(remoteError.data), method, params))
} else if (error instanceof Error) {
throw new LsRpcError(error, method, params)
}
throw error
return Err(new LsRpcError(error, method, params))
} else throw error
} finally {
if (DEBUG_LOG_RPC) {
console.log(`LS [${uuid}] ${method} took ${performance.now() - now}ms`)
@ -150,146 +236,146 @@ export class LanguageServer extends ObservableV2<Notifications> {
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#capabilityacquire) */
acquireCapability(method: string, registerOptions: RegisterOptions): Promise<void> {
acquireCapability(method: string, registerOptions: RegisterOptions): Promise<LsRpcResult<void>> {
return this.request('capability/acquire', { method, registerOptions })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filereceivestreeupdates) */
acquireReceivesTreeUpdates(path: Path): Promise<void> {
acquireReceivesTreeUpdates(path: Path): Promise<LsRpcResult<void>> {
return this.acquireCapability('file/receivesTreeUpdates', { path })
}
acquireExecutionContextCanModify(contextId: ContextId): Promise<void> {
acquireExecutionContextCanModify(contextId: ContextId): Promise<LsRpcResult<void>> {
return this.acquireCapability('executionContext/canModify', { contextId })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#sessioninitprotocolconnection) */
initProtocolConnection(clientId: Uuid): Promise<response.InitProtocolConnection> {
return this.request('session/initProtocolConnection', { clientId })
initProtocolConnection(clientId: Uuid): Promise<LsRpcResult<response.InitProtocolConnection>> {
return this.request('session/initProtocolConnection', { clientId }, false)
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textopenfile) */
openTextFile(path: Path): Promise<response.OpenTextFile> {
openTextFile(path: Path): Promise<LsRpcResult<response.OpenTextFile>> {
return this.request<response.OpenTextFile>('text/openFile', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textclosefile) */
closeTextFile(path: Path): Promise<void> {
closeTextFile(path: Path): Promise<LsRpcResult<void>> {
return this.request('text/closeFile', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textsave) */
saveTextFile(path: Path, currentVersion: Checksum): Promise<void> {
saveTextFile(path: Path, currentVersion: Checksum): Promise<LsRpcResult<void>> {
return this.request('text/save', { path, currentVersion })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textapplyedit) */
applyEdit(edit: FileEdit, execute: boolean): Promise<void> {
applyEdit(edit: FileEdit, execute: boolean): Promise<LsRpcResult<void>> {
return this.request('text/applyEdit', { edit, execute })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filewrite) */
writeFile(path: Path, contents: TextFileContents): Promise<void> {
writeFile(path: Path, contents: TextFileContents): Promise<LsRpcResult<void>> {
return this.request('file/write', { path, contents })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileread) */
readFile(path: Path): Promise<response.FileContents> {
readFile(path: Path): Promise<LsRpcResult<response.FileContents>> {
return this.request('file/read', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filecreate) */
createFile(object: FileSystemObject): Promise<void> {
createFile(object: FileSystemObject): Promise<LsRpcResult<void>> {
return this.request('file/create', { object })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filedelete) */
deleteFile(path: Path): Promise<void> {
deleteFile(path: Path): Promise<LsRpcResult<void>> {
return this.request('file/delete', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filecopy) */
copyFile(from: Path, to: Path): Promise<void> {
copyFile(from: Path, to: Path): Promise<LsRpcResult<void>> {
return this.request('file/copy', { from, to })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filemove) */
moveFile(from: Path, to: Path): Promise<void> {
moveFile(from: Path, to: Path): Promise<LsRpcResult<void>> {
return this.request('file/move', { from, to })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileexists) */
fileExists(path: Path): Promise<response.FileExists> {
fileExists(path: Path): Promise<LsRpcResult<response.FileExists>> {
return this.request('file/exists', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filetree) */
fileTree(path: Path, depth?: number): Promise<response.FileTree> {
fileTree(path: Path, depth?: number): Promise<LsRpcResult<response.FileTree>> {
return this.request('file/tree', { path, depth })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filelist) */
listFiles(path: Path): Promise<response.FileList> {
listFiles(path: Path): Promise<LsRpcResult<response.FileList>> {
return this.request('file/list', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileinfo) */
fileInfo(path: Path): Promise<response.FileInfo> {
fileInfo(path: Path): Promise<LsRpcResult<response.FileInfo>> {
return this.request('file/info', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filechecksum) */
fileChecksum(path: Path): Promise<response.FileChecksum> {
fileChecksum(path: Path): Promise<LsRpcResult<response.FileChecksum>> {
return this.request('file/checksum', { path })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsinit) */
vcsInit(root: Path): Promise<void> {
vcsInit(root: Path): Promise<LsRpcResult<void>> {
return this.request('vcs/init', { root })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcssave) */
vcsSave(root: Path, name?: string): Promise<response.VCSCommit> {
vcsSave(root: Path, name?: string): Promise<LsRpcResult<response.VCSCommit>> {
return this.request('vcs/save', { root, name })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsstatus) */
vcsStatus(root: Path): Promise<response.VCSStatus> {
vcsStatus(root: Path): Promise<LsRpcResult<response.VCSStatus>> {
return this.request('vcs/status', { root })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsrestore) */
vcsRestore(root: Path, commitId?: string): Promise<response.VCSChanges> {
vcsRestore(root: Path, commitId?: string): Promise<LsRpcResult<response.VCSChanges>> {
return this.request('vcs/restore', { root, commitId })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcslist) */
vcsList(root: Path, limit?: number): Promise<response.VCSSaves> {
vcsList(root: Path, limit?: number): Promise<LsRpcResult<response.VCSSaves>> {
return this.request('vcs/list', { root, limit })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextcreate) */
createExecutionContext(contextId?: ContextId): Promise<response.ExecutionContext> {
createExecutionContext(contextId?: ContextId): Promise<LsRpcResult<response.ExecutionContext>> {
return this.request('executionContext/create', { contextId })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextdestroy) */
destroyExecutionContext(contextId: ContextId): Promise<void> {
destroyExecutionContext(contextId: ContextId): Promise<LsRpcResult<void>> {
return this.request('executionContext/destroy', { contextId })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextfork) */
forkExecutionContext(contextId: ContextId): Promise<response.ExecutionContext> {
forkExecutionContext(contextId: ContextId): Promise<LsRpcResult<response.ExecutionContext>> {
return this.request('executionContext/fork', { contextId })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextpush) */
pushExecutionContextItem(contextId: ContextId, stackItem: StackItem): Promise<void> {
pushExecutionContextItem(contextId: ContextId, stackItem: StackItem): Promise<LsRpcResult<void>> {
return this.request('executionContext/push', { contextId, stackItem })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextpop) */
popExecutionContextItem(contextId: ContextId): Promise<void> {
popExecutionContextItem(contextId: ContextId): Promise<LsRpcResult<void>> {
return this.request('executionContext/pop', { contextId })
}
@ -298,7 +384,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
contextId: ContextId,
invalidatedExpressions?: 'all' | string[],
executionEnvironment?: ExecutionEnvironment,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/recompute', {
contextId,
invalidatedExpressions,
@ -307,7 +393,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextinterrupt) */
interruptExecutionContext(contextId: ContextId): Promise<void> {
interruptExecutionContext(contextId: ContextId): Promise<LsRpcResult<void>> {
return this.request('executionContext/interrupt', { contextId })
}
@ -315,7 +401,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
setExecutionEnvironment(
contextId: ContextId,
executionEnvironment?: ExecutionEnvironment,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/setExecutionEnvironment', {
contextId,
executionEnvironment,
@ -328,7 +414,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
visualizationId: Uuid,
expressionId: ExpressionId,
expression: string,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/executeExpression', {
executionContextId,
visualizationId,
@ -342,7 +428,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
visualizationId: Uuid,
expressionId: ExpressionId,
visualizationConfig: VisualizationConfiguration,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/attachVisualization', {
visualizationId,
expressionId,
@ -355,7 +441,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
visualizationId: Uuid,
expressionId: ExpressionId,
contextId: ContextId,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/detachVisualization', {
visualizationId,
expressionId,
@ -367,7 +453,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
modifyVisualization(
visualizationId: Uuid,
visualizationConfig: VisualizationConfiguration,
): Promise<void> {
): Promise<LsRpcResult<void>> {
return this.request('executionContext/modifyVisualization', {
visualizationId,
visualizationConfig,
@ -375,45 +461,43 @@ export class LanguageServer extends ObservableV2<Notifications> {
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#searchgetsuggestionsdatabase) */
getSuggestionsDatabase(): Promise<response.GetSuggestionsDatabase> {
getSuggestionsDatabase(): Promise<LsRpcResult<response.GetSuggestionsDatabase>> {
return this.request('search/getSuggestionsDatabase', {})
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#runtimegetcomponentgroups) */
getComponentGroups(): Promise<response.GetComponentGroups> {
getComponentGroups(): Promise<LsRpcResult<response.GetComponentGroups>> {
return this.request('runtime/getComponentGroups', {})
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#profilingstart) */
profilingStart(memorySnapshot?: boolean): Promise<void> {
profilingStart(memorySnapshot?: boolean): Promise<LsRpcResult<void>> {
return this.request('profiling/start', { memorySnapshot })
}
/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#profilingstop) */
profilingStop(): Promise<void> {
profilingStop(): Promise<LsRpcResult<void>> {
return this.request('profiling/stop', {})
}
aiCompletion(prompt: string, stopSequence: string): Promise<response.AICompletion> {
aiCompletion(prompt: string, stopSequence: string): Promise<LsRpcResult<response.AICompletion>> {
return this.request('ai/completion', { prompt, stopSequence })
}
/** A helper function to subscribe to file updates.
* Please use `ls.on('file/event')` directly if the initial `'Added'` notifications are not
* needed. */
watchFiles(
rootId: Uuid,
segments: string[],
callback: (event: Event<'file/event'>) => void,
retry: <T>(cb: () => Promise<T>) => Promise<T> = (f) => f(),
) {
watchFiles(rootId: Uuid, segments: string[], callback: (event: Event<'file/event'>) => void) {
let running = true
const self = this
return {
promise: (async () => {
self.on('file/event', callback)
await retry(async () => running && self.acquireReceivesTreeUpdates({ rootId, segments }))
await walkFs(self, { rootId, segments }, (type, path) => {
const updatesAcquired = await exponentialBackoff(async () =>
running ? self.acquireReceivesTreeUpdates({ rootId, segments }) : Ok(),
)
if (!updatesAcquired) return updatesAcquired
return await walkFs(self, { rootId, segments }, (type, path) => {
if (
!running ||
type !== 'File' ||
@ -445,7 +529,7 @@ export class LanguageServer extends ObservableV2<Notifications> {
if (this.retainCount > 0) {
this.retainCount -= 1
if (this.retainCount === 0) {
this.client.close()
this.clientScope.dispose('Language server released')
}
} else {
throw new Error('Released already disposed language server.')

View File

@ -1,12 +1,15 @@
import { type LanguageServer } from 'shared/languageServer'
import type { FileSystemObject, Path } from 'shared/languageServerTypes'
import { type LanguageServer } from '../languageServer'
import type { FileSystemObject, Path } from '../languageServerTypes'
import { Err, Ok, type Result } from '../util/data/result'
export async function walkFs(
ls: LanguageServer,
path: Path,
cb: (type: FileSystemObject['type'], path: Path) => void,
) {
for (const file of (await ls.listFiles(path)).paths) {
): Promise<Result<void>> {
const files = await ls.listFiles(path)
if (!files.ok) return files
for (const file of files.value.paths) {
const filePath: Path = {
rootId: file.path.rootId,
segments: [...file.path.segments, file.name],
@ -26,8 +29,9 @@ export async function walkFs(
}
default: {
const unexpected: never = file
throw new Error('Unexpected object: ' + JSON.stringify(unexpected))
return Err('Unexpected object: ' + JSON.stringify(unexpected))
}
}
}
return Ok()
}

View File

@ -1,4 +1,4 @@
import type { Uuid } from 'shared/languageServerTypes'
import type { Uuid } from '../languageServerTypes'
export type SuggestionId = number

View File

@ -1,118 +0,0 @@
import { wait } from 'lib0/promise'
export interface BackoffOptions<E> {
maxRetries?: number
retryDelay?: number
retryDelayMultiplier?: number
retryDelayMax?: number
/** Called when the promise throws an error, and the next retry is about to be attempted.
* When this function returns `false`, the backoff is immediately aborted. When this function
* is not provided, the backoff will always continue until the maximum number of retries
* is reached. * */
onBeforeRetry?: (
error: E,
retryCount: number,
maxRetries: number,
delay: number,
) => boolean | void
/** Called right before returning. */
onSuccess?: (retryCount: number) => void
/** Called after the final retry, right before throwing an error.
* Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the
* final retry. */
onFailure?: (error: E, retryCount: number) => void
}
const defaultBackoffOptions: Required<BackoffOptions<unknown>> = {
maxRetries: 3,
retryDelay: 1000,
retryDelayMultiplier: 2,
retryDelayMax: 10000,
onBeforeRetry: () => {},
onSuccess: () => {},
onFailure: () => {},
}
/** Retry a failing promise function with exponential backoff. */
export async function exponentialBackoff<T, E>(
f: () => Promise<T>,
backoffOptions?: BackoffOptions<E>,
): Promise<T> {
const {
maxRetries,
retryDelay,
retryDelayMultiplier,
retryDelayMax,
onBeforeRetry,
onSuccess,
onFailure,
} = {
...defaultBackoffOptions,
...backoffOptions,
}
for (
let retries = 0, delay = retryDelay;
;
retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier)
) {
try {
const result = await f()
onSuccess(retries)
return result
} catch (error) {
if (retries >= maxRetries) {
onFailure(error as E, retries)
throw error
}
if (onBeforeRetry(error as E, retries, maxRetries, delay) === false) throw error
await wait(delay)
}
}
}
export function defaultOnBeforeRetry(
description: string,
): NonNullable<BackoffOptions<any>['onBeforeRetry']> {
return (error, retryCount, maxRetries, delay) => {
console.error(
'Could not ' +
description +
` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`,
)
console.error(error)
}
}
export function defaultOnFailure(
description: string,
): NonNullable<BackoffOptions<any>['onFailure']> {
return (error, retryCount) => {
console.error(
'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`,
)
console.error(error)
}
}
export function defaultOnSuccess(
description: string,
): NonNullable<BackoffOptions<any>['onSuccess']> {
return (retryCount) => {
if (retryCount === 0) return
console.info(
'Successfully ' +
description +
` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`,
)
}
}
/** @param successDescription Should be in past tense, without an initial capital letter.
* @param errorDescription Should be in present tense, without an initial capital letter. */
export function printingCallbacks(successDescription: string, errorDescription: string) {
return {
onBeforeRetry: defaultOnBeforeRetry(errorDescription),
onSuccess: defaultOnSuccess(successDescription),
onFailure: defaultOnFailure(errorDescription),
} satisfies BackoffOptions<unknown>
}

View File

@ -0,0 +1,74 @@
import { Err, Ok, ResultError } from 'shared/util/data/result'
import { exponentialBackoff } from 'shared/util/net'
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'
beforeEach(() => {
vi.useFakeTimers()
})
afterEach(() => {
vi.useRealTimers()
})
describe('exponentialBackoff', () => {
test('runs successful task once', async () => {
const task = vi.fn(async () => Ok(1))
const result = await exponentialBackoff(task)
expect(result).toEqual({ ok: true, value: 1 })
expect(task).toHaveBeenCalledTimes(1)
})
test('retry failing task up to a limit', async () => {
const task = vi.fn(async () => Err(1))
const promise = exponentialBackoff(task, { maxRetries: 4 })
vi.runAllTimersAsync()
const result = await promise
expect(result).toEqual({ ok: false, error: new ResultError(1) })
expect(task).toHaveBeenCalledTimes(5)
})
test('wait before retrying', async () => {
const task = vi.fn(async () => Err(null))
exponentialBackoff(task, {
maxRetries: 10,
retryDelay: 100,
retryDelayMultiplier: 3,
retryDelayMax: 1000,
})
expect(task).toHaveBeenCalledTimes(1)
await vi.advanceTimersByTimeAsync(100)
expect(task).toHaveBeenCalledTimes(2)
await vi.advanceTimersByTimeAsync(300)
expect(task).toHaveBeenCalledTimes(3)
await vi.advanceTimersByTimeAsync(900)
expect(task).toHaveBeenCalledTimes(4)
await vi.advanceTimersByTimeAsync(5000)
expect(task).toHaveBeenCalledTimes(9)
})
test('retry task until success', async () => {
const task = vi.fn()
task.mockReturnValueOnce(Promise.resolve(Err(3)))
task.mockReturnValueOnce(Promise.resolve(Err(2)))
task.mockReturnValueOnce(Promise.resolve(Ok(1)))
const promise = exponentialBackoff(task)
vi.runAllTimersAsync()
const result = await promise
expect(result).toEqual({ ok: true, value: 1 })
expect(task).toHaveBeenCalledTimes(3)
})
test('call retry callback', async () => {
const task = vi.fn()
task.mockReturnValueOnce(Promise.resolve(Err(3)))
task.mockReturnValueOnce(Promise.resolve(Err(2)))
task.mockReturnValueOnce(Promise.resolve(Ok(1)))
const onBeforeRetry = vi.fn()
const promise = exponentialBackoff(task, { onBeforeRetry })
vi.runAllTimersAsync()
await promise
expect(onBeforeRetry).toHaveBeenCalledTimes(2)
expect(onBeforeRetry).toHaveBeenNthCalledWith(1, new ResultError(3), 0, 3, 1000)
expect(onBeforeRetry).toHaveBeenNthCalledWith(2, new ResultError(2), 1, 3, 2000)
})
})

View File

@ -3,11 +3,13 @@
import { isSome, type Opt } from './opt'
export type Result<T = undefined, E = string> =
export type Result<T = undefined, E = unknown> =
| { ok: true; value: T }
| { ok: false; error: ResultError<E> }
export function Ok<T>(data: T): Result<T, never> {
export function Ok(): Result<undefined, never>
export function Ok<T>(data: T): Result<T, never>
export function Ok<T>(data?: T): Result<T | undefined, never> {
return { ok: true, value: data }
}
@ -40,7 +42,7 @@ export function isResult(v: unknown): v is Result {
)
}
export class ResultError<E = string> {
export class ResultError<E = unknown> {
payload: E
context: (() => string)[]
@ -60,13 +62,28 @@ export class ResultError<E = string> {
}
}
export function withContext<T, E>(context: () => string, f: () => Result<T, E>): Result<T, E> {
export function withContext<T, E>(context: () => string, f: () => Result<T, E>): Result<T, E>
export function withContext<T, E>(
context: () => string,
f: () => Promise<Result<T, E>>,
): Promise<Result<T, E>>
export function withContext<T, E>(
context: () => string,
f: () => Promise<Result<T, E>> | Result<T, E>,
) {
const result = f()
if (result == null) {
throw new Error('withContext: f() returned null or undefined')
const handleResult = (result: Result<T, E>) => {
if (result == null) {
throw new Error('withContext: f() returned null or undefined')
}
if (!result.ok) result.error.context.splice(0, 0, context)
return result
}
if (result instanceof Promise) {
return result.then(handleResult)
} else {
return handleResult(result)
}
if (!result.ok) result.error.context.splice(0, 0, context)
return result
}
/**

View File

@ -1,4 +1,8 @@
import { Transport } from '@open-rpc/client-js/build/transports/Transport'
import type { ObservableV2 } from 'lib0/observable'
import { wait } from 'lib0/promise'
import { type WebSocketEventMap } from 'partysocket/ws'
import { type Result, type ResultError } from './data/result'
interface Disposable {
dispose(): void
@ -37,3 +41,127 @@ export class AbortScope {
return f
}
}
export interface BackoffOptions<E> {
maxRetries?: number
retryDelay?: number
retryDelayMultiplier?: number
retryDelayMax?: number
/** Called when the promise throws an error, and the next retry is about to be attempted.
* When this function returns `false`, the backoff is immediately aborted. When this function
* is not provided, the backoff will always continue until the maximum number of retries
* is reached. * */
onBeforeRetry?: (
error: ResultError<E>,
retryCount: number,
maxRetries: number,
delay: number,
) => boolean | void
/** Called right before returning. */
onSuccess?: (retryCount: number) => void
/** Called after the final retry, right before throwing an error.
* Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the
* final retry. */
onFailure?: (error: ResultError<E>, retryCount: number) => void
}
const defaultBackoffOptions: Required<BackoffOptions<unknown>> = {
maxRetries: 3,
retryDelay: 1000,
retryDelayMultiplier: 2,
retryDelayMax: 10000,
onBeforeRetry: () => {},
onSuccess: () => {},
onFailure: () => {},
}
/** Retry a failing promise function with exponential backoff. */
export async function exponentialBackoff<T, E>(
f: () => Promise<Result<T, E>>,
backoffOptions?: BackoffOptions<E>,
): Promise<Result<T, E>> {
const {
maxRetries,
retryDelay,
retryDelayMultiplier,
retryDelayMax,
onBeforeRetry,
onSuccess,
onFailure,
} = {
...defaultBackoffOptions,
...backoffOptions,
}
for (
let retries = 0, delay = retryDelay;
;
retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier)
) {
const result = await f()
if (result.ok) {
onSuccess(retries)
return result
}
if (retries >= maxRetries) {
onFailure(result.error, retries)
return result
}
if (onBeforeRetry(result.error, retries, maxRetries, delay) === false) {
return result
}
await wait(delay)
}
}
export function defaultOnBeforeRetry(
description: string,
): NonNullable<BackoffOptions<any>['onBeforeRetry']> {
return (error, retryCount, maxRetries, delay) => {
console.error(
'Could not ' +
description +
` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`,
)
console.error(error)
}
}
export function defaultOnFailure(
description: string,
): NonNullable<BackoffOptions<any>['onFailure']> {
return (error, retryCount) => {
console.error(
'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`,
)
console.error(error)
}
}
export function defaultOnSuccess(
description: string,
): NonNullable<BackoffOptions<any>['onSuccess']> {
return (retryCount) => {
if (retryCount === 0) return
console.info(
'Successfully ' +
description +
` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`,
)
}
}
/** @param successDescription Should be in past tense, without an initial capital letter.
* @param errorDescription Should be in present tense, without an initial capital letter. */
export function printingCallbacks(successDescription: string, errorDescription: string) {
return {
onBeforeRetry: defaultOnBeforeRetry(errorDescription),
onSuccess: defaultOnSuccess(successDescription),
onFailure: defaultOnFailure(errorDescription),
} satisfies BackoffOptions<unknown>
}
export type ReconnectingTransportWithWebsocketEvents = Transport & {
on<K extends keyof WebSocketEventMap>(type: K, cb: (event: WebSocketEventMap[K]) => void): void
off<K extends keyof WebSocketEventMap>(type: K, cb: (event: WebSocketEventMap[K]) => void): void
reconnect(): void
}

View File

@ -0,0 +1,85 @@
/**
* This file is modified version of open-rpc/client-js WebSocketTransport implementation
* (https://github.com/open-rpc/client-js/blob/master/src/transports/WebSocketTransport.ts)
* which uses the automatically reconnecting websocket.
*/
import { ERR_UNKNOWN, JSONRPCError } from '@open-rpc/client-js/build/Error'
import {
getBatchRequests,
getNotifications,
type JSONRPCRequestData,
} from '@open-rpc/client-js/build/Request'
import { Transport } from '@open-rpc/client-js/build/transports/Transport'
import WS from 'isomorphic-ws'
import { WebSocket } from 'partysocket'
import { type WebSocketEventMap } from 'partysocket/ws'
class ReconnectingWebSocketTransport extends Transport {
public connection: WebSocket
public uri: string
constructor(uri: string) {
super()
this.uri = uri
this.connection = new WebSocket(uri, undefined, { WebSocket: WS })
}
public connect(): Promise<any> {
return new Promise<void>((resolve) => {
const cb = () => {
this.connection.removeEventListener('open', cb)
resolve()
}
this.connection.addEventListener('open', cb)
this.connection.addEventListener('message', (message: { data: string }) => {
const { data } = message
this.transportRequestManager.resolveResponse(data)
})
})
}
public reconnect() {
this.connection.reconnect()
}
public async sendData(data: JSONRPCRequestData, timeout: number | null = 5000): Promise<any> {
let prom = this.transportRequestManager.addRequest(data, timeout)
const notifications = getNotifications(data)
try {
this.connection.send(JSON.stringify(this.parseData(data)))
this.transportRequestManager.settlePendingRequest(notifications)
} catch (err) {
const jsonError = new JSONRPCError((err as any).message, ERR_UNKNOWN, err)
this.transportRequestManager.settlePendingRequest(notifications, jsonError)
this.transportRequestManager.settlePendingRequest(getBatchRequests(data), jsonError)
prom = Promise.reject(jsonError)
}
return prom
}
public close(): void {
this.connection.close()
}
on<K extends keyof WebSocketEventMap>(
type: K,
cb: (
event: WebSocketEventMap[K] extends Event ? WebSocketEventMap[K] : never,
) => WebSocketEventMap[K] extends Event ? void : never,
): void {
this.connection.addEventListener(type, cb)
}
off<K extends keyof WebSocketEventMap>(
type: K,
cb: (
event: WebSocketEventMap[K] extends Event ? WebSocketEventMap[K] : never,
) => WebSocketEventMap[K] extends Event ? void : never,
): void {
this.connection.removeEventListener(type, cb)
}
}
export default ReconnectingWebSocketTransport

View File

@ -1,192 +0,0 @@
/// <reference lib="DOM" />
/* eslint-env browser */
// The refernce to DOM types is requiredto use `WebSocket` as a type.
// This is preferable over using `any`, for additional type safety.
/* The MIT License (MIT)
*
* Copyright (c) 2019 Kevin Jahns <kevin.jahns@protonmail.com>.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/**
* Tiny websocket connection handler.
*
* Implements exponential backoff reconnects, ping/pong, and a nice event system using [lib0/observable].
*
* @module websocket
*/
import * as math from 'lib0/math'
import { ObservableV2 } from 'lib0/observable'
import * as time from 'lib0/time'
import type { AbortScope } from './util/net'
const reconnectTimeoutBase = 1200
const maxReconnectTimeout = 2500
// @todo - this should depend on awareness.outdatedTime
const messageReconnectTimeout = 30000
const setupWS = (wsclient: WebsocketClient, ws?: WebSocket | null | undefined) => {
if (wsclient.shouldConnect && (wsclient.ws === null || ws)) {
// deepcode ignore MissingClose: This is closed by `WebsocketClient` below.
const websocket = ws ?? new WebSocket(wsclient.url)
const binaryType = wsclient.binaryType
let pingTimeout: any = null
if (binaryType) {
websocket.binaryType = binaryType
}
wsclient.ws = websocket
wsclient.connecting = true
wsclient.connected = false
websocket.onmessage = (event: { data: string | ArrayBuffer | Blob }) => {
wsclient.lastMessageReceived = time.getUnixTime()
const data = event.data
const message = typeof data === 'string' ? JSON.parse(data) : data
if (wsclient.sendPings && message && message.type === 'pong') {
clearTimeout(pingTimeout)
pingTimeout = setTimeout(sendPing, messageReconnectTimeout / 2)
}
wsclient.emit('message', [message, wsclient])
}
const onclose = (error: unknown) => {
if (wsclient.ws !== null) {
wsclient.ws = null
wsclient.connecting = false
if (wsclient.connected) {
wsclient.connected = false
wsclient.emit('disconnect', [{ type: 'disconnect', error }, wsclient])
} else {
wsclient.unsuccessfulReconnects++
}
// Start with no reconnect timeout and increase timeout by
// log10(wsUnsuccessfulReconnects).
// The idea is to increase reconnect timeout slowly and have no reconnect
// timeout at the beginning (log(1) = 0)
setTimeout(
setupWS,
math.min(
math.log10(wsclient.unsuccessfulReconnects + 1) * reconnectTimeoutBase,
maxReconnectTimeout,
),
wsclient,
)
}
clearTimeout(pingTimeout)
}
const sendPing = () => {
if (wsclient.sendPings && wsclient.ws === websocket) {
wsclient.send({
type: 'ping',
})
}
}
websocket.onclose = () => onclose(null)
websocket.onerror = (error: unknown) => onclose(error)
websocket.onopen = () => {
wsclient.lastMessageReceived = time.getUnixTime()
wsclient.connecting = false
wsclient.connected = true
wsclient.unsuccessfulReconnects = 0
wsclient.emit('connect', [{ type: 'connect' }, wsclient])
// set ping
pingTimeout = setTimeout(sendPing, messageReconnectTimeout / 2)
}
}
}
type WebsocketEvents = {
connect: (payload: { type: 'connect' }, self: WebsocketClient) => void
disconnect: (payload: { type: 'disconnect'; error: unknown }, self: WebsocketClient) => void
message: (payload: {} | ArrayBuffer | Blob, self: WebsocketClient) => void
}
export class WebsocketClient extends ObservableV2<WebsocketEvents> {
ws: WebSocket | null
binaryType
sendPings
connected
connecting
unsuccessfulReconnects
lastMessageReceived
shouldConnect
protected _checkInterval
constructor(
public url: string,
abort: AbortScope,
{
binaryType,
sendPings,
}: { binaryType?: 'arraybuffer' | 'blob' | null; sendPings?: boolean } = {},
) {
super()
abort.handleDispose(this)
this.ws = null
this.binaryType = binaryType || null
this.sendPings = sendPings ?? true
this.connected = false
this.connecting = false
this.unsuccessfulReconnects = 0
this.lastMessageReceived = 0
/** Whether to connect to other peers or not */
this.shouldConnect = false
this._checkInterval =
this.sendPings ?
setInterval(() => {
if (
this.connected &&
messageReconnectTimeout < time.getUnixTime() - this.lastMessageReceived
) {
// no message received in a long time - not even your own awareness
// updates (which are updated every 15 seconds)
this.ws?.close()
}
}, messageReconnectTimeout / 2)
: 0
setupWS(this)
}
send(message: {} | ArrayBuffer | Blob) {
if (!this.ws) return
const encoded =
message instanceof ArrayBuffer || message instanceof Blob ? message : JSON.stringify(message)
this.ws.send(encoded)
}
dispose() {
clearInterval(this._checkInterval)
this.disconnect()
super.destroy()
}
disconnect() {
this.shouldConnect = false
this.ws?.close()
}
connect(ws?: WebSocket | null | undefined) {
this.shouldConnect = true
if (ws) this.ws = ws
if ((!this.connected && !this.ws) || ws) {
setupWS(this, ws)
}
}
}

View File

@ -345,6 +345,7 @@ const visualizationSelections = reactive(new Map<SuggestionId | null, Visualizat
const previewedVisualizationId = computed(() => {
return visualizationSelections.get(previewed.value.suggestionId ?? null)
})
function setVisualization(visualization: VisualizationIdentifier) {
visualizationSelections.set(previewed.value.suggestionId ?? null, visualization)
}

View File

@ -11,34 +11,38 @@ const AI_STOP_SEQUENCE = '`'
export function useAI(
graphDb: GraphDb = useGraphStore().db,
project: {
lsRpcConnection: Promise<LanguageServer>
executeExpression(expressionId: ExternalId, expression: string): Promise<Result<string> | null>
lsRpcConnection: LanguageServer
executeExpression(expressionId: ExternalId, expression: string): Promise<Result<any> | null>
} = useProjectStore(),
) {
async function query(query: string, sourceIdentifier: string): Promise<Result<string>> {
const lsRpc = await project.lsRpcConnection
const sourceNodeId = graphDb.getIdentDefiningNode(sourceIdentifier)
const contextId = sourceNodeId && graphDb.nodeIdToNode.get(sourceNodeId)?.outerExpr.externalId
if (!contextId) return Err(`Cannot find node with name ${sourceIdentifier}`)
return withContext(
() => 'When getting AI completion',
async () => {
const lsRpc = project.lsRpcConnection
const sourceNodeId = graphDb.getIdentDefiningNode(sourceIdentifier)
const contextId =
sourceNodeId && graphDb.nodeIdToNode.get(sourceNodeId)?.outerExpr.externalId
if (!contextId) return Err(`Cannot find node with name ${sourceIdentifier}`)
const prompt = await project.executeExpression(
contextId,
`Standard.Visualization.AI.build_ai_prompt ${sourceIdentifier}`,
const prompt = await withContext(
() => 'When building AI propt',
async () => {
const prompt = await project.executeExpression(
contextId,
`Standard.Visualization.AI.build_ai_prompt ${sourceIdentifier} . to_json`,
)
if (!prompt) return Err('No data from AI visualization')
return prompt
},
)
if (!prompt.ok) return prompt
const promptWithGoal = prompt.value.replace(AI_GOAL_PLACEHOLDER, query)
const completion = await lsRpc.aiCompletion(promptWithGoal, AI_STOP_SEQUENCE)
if (!completion.ok) return completion
return Ok(completion.value.code)
},
)
if (!prompt) return Err('No data from AI visualization')
if (!prompt.ok)
return withContext(
() => 'When building AI propt',
() => prompt,
)
const promptWithGoal = prompt.value.replace(AI_GOAL_PLACEHOLDER, query)
if (!prompt.ok) return prompt
try {
const { code } = await lsRpc.aiCompletion(promptWithGoal, AI_STOP_SEQUENCE)
return Ok(code)
} catch (err) {
return Err(`Error when getting AI completion: ${err}`)
}
}
return {

View File

@ -147,10 +147,10 @@ const graphBindingsHandler = graphBindings.handler({
projectStore.module?.undoManager.redo()
},
startProfiling() {
projectStore.lsRpcConnection.then((ls) => ls.profilingStart(true))
projectStore.lsRpcConnection.profilingStart(true)
},
stopProfiling() {
projectStore.lsRpcConnection.then((ls) => ls.profilingStop())
projectStore.lsRpcConnection.profilingStop()
},
openComponentBrowser() {
if (keyboardBusy()) return false
@ -288,7 +288,7 @@ const codeEditorHandler = codeEditorBindings.handler({
/** Handle record-once button presses. */
function onRecordOnceButtonPress() {
projectStore.lsRpcConnection.then(async () => {
projectStore.lsRpcConnection.initialized.then(async () => {
const modeValue = projectStore.executionMode
if (modeValue == undefined) {
return
@ -449,29 +449,29 @@ async function handleFileDrop(event: DragEvent) {
if (!event.dataTransfer?.items) return
;[...event.dataTransfer.items].forEach(async (item, index) => {
try {
if (item.kind === 'file') {
const file = item.getAsFile()
if (!file) return
const clientPos = new Vec2(event.clientX, event.clientY)
const offset = new Vec2(0, index * -MULTIPLE_FILES_GAP)
const pos = graphNavigator.clientToScenePos(clientPos).add(offset)
const uploader = await Uploader.Create(
projectStore.lsRpcConnection,
projectStore.dataConnection,
projectStore.contentRoots,
projectStore.awareness,
file,
pos,
projectStore.isOnLocalBackend,
event.shiftKey,
projectStore.executionContext.getStackTop(),
)
const uploadResult = await uploader.upload()
graphStore.createNode(pos, uploadedExpression(uploadResult))
if (item.kind === 'file') {
const file = item.getAsFile()
if (!file) return
const clientPos = new Vec2(event.clientX, event.clientY)
const offset = new Vec2(0, index * -MULTIPLE_FILES_GAP)
const pos = graphNavigator.clientToScenePos(clientPos).add(offset)
const uploader = await Uploader.Create(
projectStore.lsRpcConnection,
projectStore.dataConnection,
projectStore.contentRoots,
projectStore.awareness,
file,
pos,
projectStore.isOnLocalBackend,
event.shiftKey,
projectStore.executionContext.getStackTop(),
)
const uploadResult = await uploader.upload()
if (uploadResult.ok) {
graphStore.createNode(pos, uploadedExpression(uploadResult.value))
} else {
uploadResult.error.log(`Uploading file failed`)
}
} catch (err) {
console.error(`Uploading file failed. ${err}`)
}
})
}

View File

@ -5,7 +5,8 @@ import LoadingErrorVisualization from '@/components/visualizations/LoadingErrorV
import LoadingVisualization from '@/components/visualizations/LoadingVisualization.vue'
import { focusIsIn, useEvent } from '@/composables/events'
import { provideVisualizationConfig } from '@/providers/visualizationConfig'
import { useProjectStore, type NodeVisualizationConfiguration } from '@/stores/project'
import { useProjectStore } from '@/stores/project'
import { type NodeVisualizationConfiguration } from '@/stores/project/executionContext'
import {
DEFAULT_VISUALIZATION_CONFIGURATION,
DEFAULT_VISUALIZATION_IDENTIFIER,
@ -157,7 +158,7 @@ const effectiveVisualizationData = computed(() => {
const visualizationData = nodeVisualizationData.value ?? expressionVisualizationData.value
if (!visualizationData) return
if (visualizationData.ok) return visualizationData.value
else return { name, error: new Error(visualizationData.error.payload) }
else return { name, error: new Error(`${visualizationData.error.payload}`) }
})
function updatePreprocessor(

View File

@ -8,7 +8,6 @@ export function useGraphEditorToasts() {
const toastStartup = useToast.info({ autoClose: false })
const toastConnectionLost = useToast.error({ autoClose: false })
const toastLspError = useToast.error()
const toastConnectionError = useToast.error()
const toastExecutionFailed = useToast.error()
toastStartup.show('Initializing the project. This can take up to one minute.')
@ -18,12 +17,10 @@ export function useGraphEditorToasts() {
toastConnectionLost.show('Lost connection to Language Server.'),
)
projectStore.lsRpcConnection.then(
(ls) => ls.client.onError((e) => toastLspError.show(`Language server error: ${e}`)),
(e) => toastConnectionError.show(`Connection to language server failed: ${JSON.stringify(e)}`),
)
projectStore.executionContext.on('executionComplete', () => toastExecutionFailed.dismiss())
projectStore.lsRpcConnection.client.onError((e) =>
toastLspError.show(`Language server error: ${e}`),
),
projectStore.executionContext.on('executionComplete', () => toastExecutionFailed.dismiss())
projectStore.executionContext.on('executionFailed', (e) =>
toastExecutionFailed.show(`Execution Failed: ${JSON.stringify(e)}`),
)

View File

@ -1,13 +1,14 @@
import { Awareness } from '@/stores/awareness'
import { Vec2 } from '@/util/data/vec2'
import type { DataServer } from '@/util/net/dataServer'
import { Keccak, sha3_224 as SHA3 } from '@noble/hashes/sha3'
import type { Hash } from '@noble/hashes/utils'
import { bytesToHex } from '@noble/hashes/utils'
import { escapeTextLiteral } from 'shared/ast'
import type { DataServer } from 'shared/dataServer'
import type { LanguageServer } from 'shared/languageServer'
import { ErrorCode, RemoteRpcError } from 'shared/languageServer'
import type { ContentRoot, Path, StackItem, Uuid } from 'shared/languageServerTypes'
import { Err, Ok, withContext, type Result } from 'shared/util/data/result'
import { markRaw, toRaw } from 'vue'
// === Constants ===
@ -54,8 +55,8 @@ export class Uploader {
}
static async Create(
rpc: Promise<LanguageServer>,
binary: Promise<DataServer>,
rpc: LanguageServer,
binary: DataServer,
contentRoots: Promise<ContentRoot[]>,
awareness: Awareness,
file: File,
@ -68,8 +69,8 @@ export class Uploader {
const projectRootId = roots.find((root) => root.type == 'Project')
if (!projectRootId) throw new Error('Could not find project root, uploading not possible.')
const instance = new Uploader(
await rpc,
await binary,
rpc,
binary,
awareness,
file,
projectRootId.id,
@ -81,7 +82,7 @@ export class Uploader {
return instance
}
async upload(): Promise<UploadResult> {
async upload(): Promise<Result<UploadResult>> {
// This non-standard property is defined in Electron.
if (
this.isOnLocalBackend &&
@ -89,18 +90,20 @@ export class Uploader {
'path' in this.file &&
typeof this.file.path === 'string'
) {
return { source: 'FileSystemRoot', name: this.file.path }
return Ok({ source: 'FileSystemRoot', name: this.file.path })
}
await this.ensureDataDirExists()
const dataDirExists = await this.ensureDataDirExists()
if (!dataDirExists.ok) return dataDirExists
const name = await this.pickUniqueName(this.file.name)
this.awareness.addOrUpdateUpload(name, {
if (!name.ok) return name
this.awareness.addOrUpdateUpload(name.value, {
sizePercentage: 0,
position: this.position,
stackItem: this.stackItem,
})
const remotePath: Path = { rootId: this.projectRootId, segments: [DATA_DIR_NAME, name] }
const remotePath: Path = { rootId: this.projectRootId, segments: [DATA_DIR_NAME, name.value] }
const uploader = this
const cleanup = this.cleanup.bind(this, name)
const cleanup = this.cleanup.bind(this, name.value)
const writableStream = new WritableStream<Uint8Array>({
async write(chunk: Uint8Array) {
await uploader.binary.writeBytes(remotePath, uploader.uploadedBytes, false, chunk)
@ -108,7 +111,7 @@ export class Uploader {
uploader.uploadedBytes += BigInt(chunk.length)
const bytes = Number(uploader.uploadedBytes)
const sizePercentage = Math.round((bytes / uploader.file.size) * 100)
uploader.awareness.addOrUpdateUpload(name, {
uploader.awareness.addOrUpdateUpload(name.value, {
sizePercentage,
position: uploader.position,
stackItem: uploader.stackItem,
@ -116,8 +119,6 @@ export class Uploader {
},
async close() {
cleanup()
// Disabled until https://github.com/enso-org/enso/issues/6691 is fixed.
// uploader.assertChecksum(remotePath)
},
async abort(reason: string) {
cleanup()
@ -125,21 +126,27 @@ export class Uploader {
throw new Error(`Uploading process aborted. ${reason}`)
},
})
// Disabled until https://github.com/enso-org/enso/issues/6691 is fixed.
// Plus, handle the error here, as it should be displayed to the user.
// uploader.assertChecksum(remotePath)
await this.file.stream().pipeTo(writableStream)
return { source: 'Project', name }
return Ok({ source: 'Project', name: name.value })
}
private cleanup(name: string) {
this.awareness.removeUpload(name)
}
private async assertChecksum(path: Path) {
private async assertChecksum(path: Path): Promise<Result<void>> {
const engineChecksum = await this.rpc.fileChecksum(path)
if (!engineChecksum.ok) return engineChecksum
const hexChecksum = bytesToHex(this.checksum.digest())
if (hexChecksum != engineChecksum.checksum) {
throw new Error(
`Uploading file failed, checksum does not match. ${hexChecksum} != ${engineChecksum.checksum}`,
if (hexChecksum != engineChecksum.value.checksum) {
return Err(
`Uploading file failed, checksum does not match. ${hexChecksum} != ${engineChecksum.value.checksum}`,
)
} else {
return Ok()
}
}
@ -149,38 +156,38 @@ export class Uploader {
private async ensureDataDirExists() {
const exists = await this.dataDirExists()
if (exists) return
await this.rpc.createFile({
type: 'Directory',
name: DATA_DIR_NAME,
path: { rootId: this.projectRootId, segments: [] },
})
if (!exists.ok) return exists
if (exists.value) return Ok()
return await withContext(
() => 'When creating directory for uploaded file',
async () => {
return await this.rpc.createFile({
type: 'Directory',
name: DATA_DIR_NAME,
path: { rootId: this.projectRootId, segments: [] },
})
},
)
}
private async dataDirExists(): Promise<boolean> {
try {
const info = await this.rpc.fileInfo(this.dataDirPath())
return info.attributes.kind.type == 'Directory'
} catch (error) {
if (
typeof error === 'object' &&
error &&
'cause' in error &&
error.cause instanceof RemoteRpcError
) {
if (
error.cause.code === ErrorCode.FILE_NOT_FOUND ||
error.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND
)
return false
}
throw error
private async dataDirExists(): Promise<Result<boolean>> {
const info = await this.rpc.fileInfo(this.dataDirPath())
if (info.ok) return Ok(info.value.attributes.kind.type == 'Directory')
else if (
info.error.payload.cause instanceof RemoteRpcError &&
(info.error.payload.cause.code === ErrorCode.FILE_NOT_FOUND ||
info.error.payload.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND)
) {
return Ok(false)
} else {
return info
}
}
private async pickUniqueName(suggestedName: string): Promise<string> {
private async pickUniqueName(suggestedName: string): Promise<Result<string>> {
const files = await this.rpc.listFiles(this.dataDirPath())
const existingNames = new Set(files.paths.map((path) => path.name))
if (!files.ok) return files
const existingNames = new Set(files.value.paths.map((path) => path.name))
const { stem, extension = '' } = splitFilename(suggestedName)
let candidate = suggestedName
let num = 1
@ -188,7 +195,7 @@ export class Uploader {
candidate = `${stem}_${num}.${extension}`
num += 1
}
return candidate
return Ok(candidate)
}
}

View File

@ -13,7 +13,8 @@ import {
functionCallConfiguration,
} from '@/providers/widgetRegistry/configuration'
import { useGraphStore } from '@/stores/graph'
import { useProjectStore, type NodeVisualizationConfiguration } from '@/stores/project'
import { useProjectStore } from '@/stores/project'
import { type NodeVisualizationConfiguration } from '@/stores/project/executionContext'
import { entryQn } from '@/stores/suggestionDatabase/entry'
import { assert, assertUnreachable } from '@/util/assert'
import { Ast } from '@/util/ast'

View File

@ -1,4 +1,4 @@
import type { ExecutionContext } from '@/stores/project'
import type { ExecutionContext } from '@/stores/project/executionContext'
import { ReactiveDb, ReactiveIndex } from '@/util/database/reactiveDb'
import type {
ExpressionId,

View File

@ -0,0 +1,342 @@
import { isSome, type Opt } from '@/util/data/opt'
import { Err, Ok, type Result } from '@/util/data/result'
import { AsyncQueue, type AbortScope } from '@/util/net'
import * as array from 'lib0/array'
import * as object from 'lib0/object'
import { ObservableV2 } from 'lib0/observable'
import * as random from 'lib0/random'
import type { LanguageServer } from 'shared/languageServer'
import type {
ContextId,
Diagnostic,
ExecutionEnvironment,
ExplicitCall,
ExpressionId,
ExpressionUpdate,
StackItem,
Uuid,
VisualizationConfiguration,
} from 'shared/languageServerTypes'
import { exponentialBackoff } from 'shared/util/net'
import type { ExternalId } from 'shared/yjsModel'
import { reactive } from 'vue'
export type NodeVisualizationConfiguration = Omit<
VisualizationConfiguration,
'executionContextId'
> & {
expressionId: ExternalId
}
function visualizationConfigEqual(
a: NodeVisualizationConfiguration,
b: NodeVisualizationConfiguration,
): boolean {
return (
a === b ||
(a.visualizationModule === b.visualizationModule &&
(a.positionalArgumentsExpressions === b.positionalArgumentsExpressions ||
(Array.isArray(a.positionalArgumentsExpressions) &&
Array.isArray(b.positionalArgumentsExpressions) &&
array.equalFlat(a.positionalArgumentsExpressions, b.positionalArgumentsExpressions))) &&
(a.expression === b.expression ||
(typeof a.expression === 'object' &&
typeof b.expression === 'object' &&
object.equalFlat(a.expression, b.expression))))
)
}
interface ExecutionContextState {
lsRpc: LanguageServer
created: boolean
visualizations: Map<Uuid, NodeVisualizationConfiguration>
stack: StackItem[]
}
type EntryPoint = Omit<ExplicitCall, 'type'>
type ExecutionContextNotification = {
'expressionUpdates'(updates: ExpressionUpdate[]): void
'visualizationEvaluationFailed'(
visualizationId: Uuid,
expressionId: ExpressionId,
message: string,
diagnostic: Diagnostic | undefined,
): void
'executionFailed'(message: string): void
'executionComplete'(): void
'executionStatus'(diagnostics: Diagnostic[]): void
'newVisualizationConfiguration'(configs: Set<Uuid>): void
'visualizationsConfigured'(configs: Set<Uuid>): void
}
/**
* Execution Context
*
* This class represent an execution context created in the Language Server. It creates
* it and pushes the initial frame upon construction.
*
* It hides the asynchronous nature of the language server. Each call is scheduled and
* run only when the previous call is done.
*/
export class ExecutionContext extends ObservableV2<ExecutionContextNotification> {
id: ContextId = random.uuidv4() as ContextId
queue: AsyncQueue<ExecutionContextState>
taskRunning = false
visSyncScheduled = false
desiredStack: StackItem[] = reactive([])
visualizationConfigs: Map<Uuid, NodeVisualizationConfiguration> = new Map()
constructor(
lsRpc: LanguageServer,
entryPoint: EntryPoint,
private abort: AbortScope,
) {
super()
this.abort.handleDispose(this)
this.queue = new AsyncQueue<ExecutionContextState>(
Promise.resolve({
lsRpc,
created: false,
visualizations: new Map(),
stack: [],
}),
)
this.registerHandlers()
this.create()
this.pushItem({ type: 'ExplicitCall', ...entryPoint })
this.recompute()
}
private async withBackoff<T>(f: () => Promise<Result<T>>, message: string): Promise<T> {
const result = await exponentialBackoff(f, {
onBeforeRetry: (error, _, delay) => {
if (this.abort.signal.aborted) return false
console.warn(`${error.message(message)}. Retrying after ${delay}ms...\n`)
},
})
if (result.ok) return result.value
else throw result.error
}
private syncVisualizations() {
if (this.visSyncScheduled || this.abort.signal.aborted) return
this.visSyncScheduled = true
this.queue.pushTask(async (state) => {
this.visSyncScheduled = false
if (!state.created || this.abort.signal.aborted) return state
this.emit('newVisualizationConfiguration', [new Set(this.visualizationConfigs.keys())])
const promises: Promise<void>[] = []
const attach = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() =>
state.lsRpc.attachVisualization(id, config.expressionId, {
executionContextId: this.id,
expression: config.expression,
visualizationModule: config.visualizationModule,
...(config.positionalArgumentsExpressions ?
{ positionalArgumentsExpressions: config.positionalArgumentsExpressions }
: {}),
}),
'Failed to attach visualization',
).then(() => {
state.visualizations.set(id, config)
})
}
const modify = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() =>
state.lsRpc.modifyVisualization(id, {
executionContextId: this.id,
expression: config.expression,
visualizationModule: config.visualizationModule,
...(config.positionalArgumentsExpressions ?
{ positionalArgumentsExpressions: config.positionalArgumentsExpressions }
: {}),
}),
'Failed to modify visualization',
).then(() => {
state.visualizations.set(id, config)
})
}
const detach = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() => state.lsRpc.detachVisualization(id, config.expressionId, this.id),
'Failed to detach visualization',
).then(() => {
state.visualizations.delete(id)
})
}
// Attach new and update existing visualizations.
for (const [id, config] of this.visualizationConfigs) {
const previousConfig = state.visualizations.get(id)
if (previousConfig == null) {
promises.push(attach(id, config))
} else if (!visualizationConfigEqual(previousConfig, config)) {
if (previousConfig.expressionId === config.expressionId) {
promises.push(modify(id, config))
} else {
promises.push(detach(id, previousConfig).then(() => attach(id, config)))
}
}
}
// Detach removed visualizations.
for (const [id, config] of state.visualizations) {
if (!this.visualizationConfigs.get(id)) {
promises.push(detach(id, config))
}
}
const settled = await Promise.allSettled(promises)
// Emit errors for failed requests.
const errors = settled
.map((result) => (result.status === 'rejected' ? result.reason : null))
.filter(isSome)
if (errors.length > 0) {
console.error('Failed to synchronize visualizations:', errors)
}
this.emit('visualizationsConfigured', [new Set(this.visualizationConfigs.keys())])
// State object was updated in-place in each successful promise.
return state
})
}
private pushItem(item: StackItem) {
this.desiredStack.push(item)
this.queue.pushTask(async (state) => {
if (!state.created) return state
await this.withBackoff(
() => state.lsRpc.pushExecutionContextItem(this.id, item),
'Failed to push item to execution context stack',
)
state.stack.push(item)
return state
})
}
push(expressionId: ExpressionId) {
this.pushItem({ type: 'LocalCall', expressionId })
}
pop() {
if (this.desiredStack.length === 1) {
console.debug('Cannot pop last item from execution context stack')
return
}
this.desiredStack.pop()
this.queue.pushTask(async (state) => {
if (!state.created) return state
if (state.stack.length === 1) {
console.debug('Cannot pop last item from execution context stack')
return state
}
await this.withBackoff(
() => state.lsRpc.popExecutionContextItem(this.id),
'Failed to pop item from execution context stack',
)
state.stack.pop()
return state
})
}
async setVisualization(id: Uuid, configuration: Opt<NodeVisualizationConfiguration>) {
if (configuration == null) {
this.visualizationConfigs.delete(id)
} else {
this.visualizationConfigs.set(id, configuration)
}
this.syncVisualizations()
}
private create() {
this.queue.pushTask(async (state) => {
if (state.created) return state
return this.withBackoff(async () => {
const result = await state.lsRpc.createExecutionContext(this.id)
if (!result.ok) return result
if (result.value.contextId !== this.id) {
return Err('Unexpected Context ID returned by the language server.')
}
state.lsRpc.retain()
return Ok({ ...state, created: true })
}, 'Failed to create execution context')
})
}
private registerHandlers() {
this.queue.pushTask(async (state) => {
this.abort.handleObserve(state.lsRpc, 'executionContext/expressionUpdates', (event) => {
if (event.contextId == this.id) this.emit('expressionUpdates', [event.updates])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionFailed', (event) => {
if (event.contextId == this.id) this.emit('executionFailed', [event.message])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionComplete', (event) => {
if (event.contextId == this.id) this.emit('executionComplete', [])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionStatus', (event) => {
if (event.contextId == this.id) this.emit('executionStatus', [event.diagnostics])
})
this.abort.handleObserve(
state.lsRpc,
'executionContext/visualizationEvaluationFailed',
(event) => {
if (event.contextId == this.id)
this.emit('visualizationEvaluationFailed', [
event.visualizationId,
event.expressionId,
event.message,
event.diagnostic,
])
},
)
return state
})
}
recompute(
expressionIds: 'all' | ExternalId[] = 'all',
executionEnvironment?: ExecutionEnvironment,
) {
this.queue.pushTask(async (state) => {
if (!state.created) return state
await state.lsRpc.recomputeExecutionContext(this.id, expressionIds, executionEnvironment)
return state
})
}
getStackBottom(): StackItem {
return this.desiredStack[0]!
}
getStackTop(): StackItem {
return this.desiredStack[this.desiredStack.length - 1]!
}
setExecutionEnvironment(mode: ExecutionEnvironment) {
this.queue.pushTask(async (state) => {
await state.lsRpc.setExecutionEnvironment(this.id, mode)
return state
})
}
dispose() {
this.queue.pushTask(async (state) => {
if (!state.created) return state
const result = await state.lsRpc.destroyExecutionContext(this.id)
if (!result.ok) {
result.error.log('Failed to destroy execution context')
}
state.lsRpc.release()
return { ...state, created: false }
})
}
}

View File

@ -1,43 +1,26 @@
import { injectGuiConfig, type GuiConfig } from '@/providers/guiConfig'
import { Awareness } from '@/stores/awareness'
import { ComputedValueRegistry } from '@/stores/project/computedValueRegistry'
import {
ExecutionContext,
type NodeVisualizationConfiguration,
} from '@/stores/project/executionContext'
import { VisualizationDataRegistry } from '@/stores/project/visualizationDataRegistry'
import { attachProvider, useObserveYjs } from '@/util/crdt'
import { nextEvent } from '@/util/data/observable'
import { isSome, type Opt } from '@/util/data/opt'
import { type Opt } from '@/util/data/opt'
import { Err, Ok, type Result } from '@/util/data/result'
import { ReactiveMapping } from '@/util/database/reactiveDb'
import {
AsyncQueue,
createRpcTransport,
createWebsocketClient,
rpcWithRetries as lsRpcWithRetries,
useAbortScope,
} from '@/util/net'
import { createDataWebsocket, createRpcTransport, useAbortScope } from '@/util/net'
import { DataServer } from '@/util/net/dataServer'
import { tryQualifiedName } from '@/util/qualifiedName'
import { Client, RequestManager } from '@open-rpc/client-js'
import { computedAsync } from '@vueuse/core'
import * as array from 'lib0/array'
import * as object from 'lib0/object'
import { ObservableV2 } from 'lib0/observable'
import * as random from 'lib0/random'
import { defineStore } from 'pinia'
import { OutboundPayload, VisualizationUpdate } from 'shared/binaryProtocol'
import { DataServer } from 'shared/dataServer'
import { LanguageServer } from 'shared/languageServer'
import type {
ContentRoot,
ContextId,
Diagnostic,
ExecutionEnvironment,
ExplicitCall,
ExpressionId,
ExpressionUpdate,
MethodPointer,
StackItem,
VisualizationConfiguration,
} from 'shared/languageServerTypes'
import type { AbortScope } from 'shared/util/net'
import type { Diagnostic, ExpressionId, MethodPointer } from 'shared/languageServerTypes'
import { type AbortScope } from 'shared/util/net'
import {
DistributedProject,
localUserActionOrigins,
@ -48,7 +31,6 @@ import {
computed,
markRaw,
onScopeDispose,
reactive,
ref,
shallowRef,
watch,
@ -75,360 +57,19 @@ function resolveLsUrl(config: GuiConfig): LsUrls {
throw new Error('Incomplete engine configuration')
}
async function initializeLsRpcConnection(
clientId: Uuid,
url: string,
abort: AbortScope,
): Promise<{
connection: LanguageServer
contentRoots: ContentRoot[]
}> {
function createLsRpcConnection(clientId: Uuid, url: string, abort: AbortScope): LanguageServer {
const transport = createRpcTransport(url)
const requestManager = new RequestManager([transport])
const client = new Client(requestManager)
const connection = new LanguageServer(client)
const connection = new LanguageServer(clientId, transport)
abort.onAbort(() => connection.release())
const initialization = await lsRpcWithRetries(() => connection.initProtocolConnection(clientId), {
onBeforeRetry: (error, _, delay) => {
console.warn(
`Failed to initialize language server connection, retrying after ${delay}ms...\n`,
error,
)
},
}).catch((error) => {
console.error('Error initializing Language Server RPC:', error)
throw error
})
const contentRoots = initialization.contentRoots
return { connection, contentRoots }
}
async function initializeDataConnection(clientId: Uuid, url: string, abort: AbortScope) {
const client = createWebsocketClient(url, abort, { binaryType: 'arraybuffer', sendPings: false })
const connection = new DataServer(client, abort)
onScopeDispose(() => connection.dispose())
await connection.initialize(clientId).catch((error) => {
console.error('Error initializing data connection:', error)
throw error
})
return connection
}
export type NodeVisualizationConfiguration = Omit<
VisualizationConfiguration,
'executionContextId'
> & {
expressionId: ExternalId
}
interface ExecutionContextState {
lsRpc: LanguageServer
created: boolean
visualizations: Map<Uuid, NodeVisualizationConfiguration>
stack: StackItem[]
}
function visualizationConfigEqual(
a: NodeVisualizationConfiguration,
b: NodeVisualizationConfiguration,
): boolean {
return (
a === b ||
(a.visualizationModule === b.visualizationModule &&
(a.positionalArgumentsExpressions === b.positionalArgumentsExpressions ||
(Array.isArray(a.positionalArgumentsExpressions) &&
Array.isArray(b.positionalArgumentsExpressions) &&
array.equalFlat(a.positionalArgumentsExpressions, b.positionalArgumentsExpressions))) &&
(a.expression === b.expression ||
(typeof a.expression === 'object' &&
typeof b.expression === 'object' &&
object.equalFlat(a.expression, b.expression))))
)
}
type EntryPoint = Omit<ExplicitCall, 'type'>
type ExecutionContextNotification = {
'expressionUpdates'(updates: ExpressionUpdate[]): void
'visualizationEvaluationFailed'(
visualizationId: Uuid,
expressionId: ExpressionId,
message: string,
diagnostic: Diagnostic | undefined,
): void
'executionFailed'(message: string): void
'executionComplete'(): void
'executionStatus'(diagnostics: Diagnostic[]): void
'newVisualizationConfiguration'(configs: Set<Uuid>): void
'visualizationsConfigured'(configs: Set<Uuid>): void
}
/**
* Execution Context
*
* This class represent an execution context created in the Language Server. It creates
* it and pushes the initial frame upon construction.
*
* It hides the asynchronous nature of the language server. Each call is scheduled and
* run only when the previous call is done.
*/
export class ExecutionContext extends ObservableV2<ExecutionContextNotification> {
id: ContextId = random.uuidv4() as ContextId
queue: AsyncQueue<ExecutionContextState>
taskRunning = false
visSyncScheduled = false
desiredStack: StackItem[] = reactive([])
visualizationConfigs: Map<Uuid, NodeVisualizationConfiguration> = new Map()
constructor(
lsRpc: Promise<LanguageServer>,
entryPoint: EntryPoint,
private abort: AbortScope,
) {
super()
this.abort.handleDispose(this)
this.queue = new AsyncQueue(
lsRpc.then((lsRpc) => ({
lsRpc,
created: false,
visualizations: new Map(),
stack: [],
})),
)
this.registerHandlers()
this.create()
this.pushItem({ type: 'ExplicitCall', ...entryPoint })
this.recompute()
}
private withBackoff<T>(f: () => Promise<T>, message: string): Promise<T> {
return lsRpcWithRetries(f, {
onBeforeRetry: (error, _, delay) => {
if (this.abort.signal.aborted) return false
console.warn(
`${message}: ${error.payload.cause.message}. Retrying after ${delay}ms...\n`,
error,
)
},
})
}
private syncVisualizations() {
if (this.visSyncScheduled || this.abort.signal.aborted) return
this.visSyncScheduled = true
this.queue.pushTask(async (state) => {
this.visSyncScheduled = false
if (!state.created || this.abort.signal.aborted) return state
this.emit('newVisualizationConfiguration', [new Set(this.visualizationConfigs.keys())])
const promises: Promise<void>[] = []
const attach = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() =>
state.lsRpc.attachVisualization(id, config.expressionId, {
executionContextId: this.id,
expression: config.expression,
visualizationModule: config.visualizationModule,
...(config.positionalArgumentsExpressions ?
{ positionalArgumentsExpressions: config.positionalArgumentsExpressions }
: {}),
}),
'Failed to attach visualization',
).then(() => {
state.visualizations.set(id, config)
})
}
const modify = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() =>
state.lsRpc.modifyVisualization(id, {
executionContextId: this.id,
expression: config.expression,
visualizationModule: config.visualizationModule,
...(config.positionalArgumentsExpressions ?
{ positionalArgumentsExpressions: config.positionalArgumentsExpressions }
: {}),
}),
'Failed to modify visualization',
).then(() => {
state.visualizations.set(id, config)
})
}
const detach = (id: Uuid, config: NodeVisualizationConfiguration) => {
return this.withBackoff(
() => state.lsRpc.detachVisualization(id, config.expressionId, this.id),
'Failed to detach visualization',
).then(() => {
state.visualizations.delete(id)
})
}
// Attach new and update existing visualizations.
for (const [id, config] of this.visualizationConfigs) {
const previousConfig = state.visualizations.get(id)
if (previousConfig == null) {
promises.push(attach(id, config))
} else if (!visualizationConfigEqual(previousConfig, config)) {
if (previousConfig.expressionId === config.expressionId) {
promises.push(modify(id, config))
} else {
promises.push(detach(id, previousConfig).then(() => attach(id, config)))
}
}
}
// Detach removed visualizations.
for (const [id, config] of state.visualizations) {
if (!this.visualizationConfigs.get(id)) {
promises.push(detach(id, config))
}
}
const settled = await Promise.allSettled(promises)
// Emit errors for failed requests.
const errors = settled
.map((result) => (result.status === 'rejected' ? result.reason : null))
.filter(isSome)
if (errors.length > 0) {
console.error('Failed to synchronize visualizations:', errors)
}
this.emit('visualizationsConfigured', [new Set(this.visualizationConfigs.keys())])
// State object was updated in-place in each successful promise.
return state
})
}
private pushItem(item: StackItem) {
this.desiredStack.push(item)
this.queue.pushTask(async (state) => {
if (!state.created) return state
await this.withBackoff(
() => state.lsRpc.pushExecutionContextItem(this.id, item),
'Failed to push item to execution context stack',
)
state.stack.push(item)
return state
})
}
push(expressionId: ExpressionId) {
this.pushItem({ type: 'LocalCall', expressionId })
}
pop() {
if (this.desiredStack.length === 1) {
console.debug('Cannot pop last item from execution context stack')
return
}
this.desiredStack.pop()
this.queue.pushTask(async (state) => {
if (!state.created) return state
if (state.stack.length === 1) {
console.debug('Cannot pop last item from execution context stack')
return state
}
await this.withBackoff(
() => state.lsRpc.popExecutionContextItem(this.id),
'Failed to pop item from execution context stack',
)
state.stack.pop()
return state
})
}
async setVisualization(id: Uuid, configuration: Opt<NodeVisualizationConfiguration>) {
if (configuration == null) {
this.visualizationConfigs.delete(id)
} else {
this.visualizationConfigs.set(id, configuration)
}
this.syncVisualizations()
}
private create() {
this.queue.pushTask(async (state) => {
if (state.created) return state
return this.withBackoff(async () => {
const result = await state.lsRpc.createExecutionContext(this.id)
if (result.contextId !== this.id) {
throw new Error('Unexpected Context ID returned by the language server.')
}
state.lsRpc.retain()
return { ...state, created: true }
}, 'Failed to create execution context')
})
}
private registerHandlers() {
this.queue.pushTask(async (state) => {
this.abort.handleObserve(state.lsRpc, 'executionContext/expressionUpdates', (event) => {
if (event.contextId == this.id) this.emit('expressionUpdates', [event.updates])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionFailed', (event) => {
if (event.contextId == this.id) this.emit('executionFailed', [event.message])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionComplete', (event) => {
if (event.contextId == this.id) this.emit('executionComplete', [])
})
this.abort.handleObserve(state.lsRpc, 'executionContext/executionStatus', (event) => {
if (event.contextId == this.id) this.emit('executionStatus', [event.diagnostics])
})
this.abort.handleObserve(
state.lsRpc,
'executionContext/visualizationEvaluationFailed',
(event) => {
if (event.contextId == this.id)
this.emit('visualizationEvaluationFailed', [
event.visualizationId,
event.expressionId,
event.message,
event.diagnostic,
])
},
)
return state
})
}
recompute(
expressionIds: 'all' | ExternalId[] = 'all',
executionEnvironment?: ExecutionEnvironment,
) {
this.queue.pushTask(async (state) => {
if (!state.created) return state
await state.lsRpc.recomputeExecutionContext(this.id, expressionIds, executionEnvironment)
return state
})
}
getStackBottom(): StackItem {
return this.desiredStack[0]!
}
getStackTop(): StackItem {
return this.desiredStack[this.desiredStack.length - 1]!
}
setExecutionEnvironment(mode: ExecutionEnvironment) {
this.queue.pushTask(async (state) => {
await state.lsRpc.setExecutionEnvironment(this.id, mode)
return state
})
}
dispose() {
this.queue.pushTask(async (state) => {
if (!state.created) return state
await state.lsRpc.destroyExecutionContext(this.id)
state.lsRpc.release()
return { ...state, created: false }
})
}
function initializeDataConnection(clientId: Uuid, url: string, abort: AbortScope) {
const client = createDataWebsocket(url, 'arraybuffer')
const connection = new DataServer(clientId, client, abort)
abort.handleDispose(connection)
onScopeDispose(() => connection.dispose())
return connection
}
/**
@ -451,21 +92,8 @@ export const useProjectStore = defineStore('project', () => {
const clientId = random.uuidv4() as Uuid
const lsUrls = resolveLsUrl(config.value)
const initializedConnection = initializeLsRpcConnection(clientId, lsUrls.rpcUrl, abort)
const lsRpcConnection = initializedConnection.then(
({ connection }) => connection,
(error) => {
console.error('Error getting Language Server connection:', error)
throw error
},
)
const contentRoots = initializedConnection.then(
({ contentRoots }) => contentRoots,
(error) => {
console.error('Error getting content roots:', error)
throw error
},
)
const lsRpcConnection = createLsRpcConnection(clientId, lsUrls.rpcUrl, abort)
const contentRoots = lsRpcConnection.contentRoots
const dataConnection = initializeDataConnection(clientId, lsUrls.dataUrl, abort)
const rpcUrl = new URL(lsUrls.rpcUrl)
@ -563,14 +191,9 @@ export const useProjectStore = defineStore('project', () => {
)
}
const firstExecution = lsRpcConnection.then(
(lsRpc) =>
nextEvent(lsRpc, 'executionContext/executionComplete').catch((error) => {
console.error('First execution failed:', error)
throw error
}),
const firstExecution = nextEvent(lsRpcConnection, 'executionContext/executionComplete').catch(
(error) => {
console.error('Could not get Language Server for first execution:', error)
console.error('First execution failed:', error)
throw error
},
)
@ -597,13 +220,7 @@ export const useProjectStore = defineStore('project', () => {
{ immediate: true, flush: 'post' },
)
return computed(() => {
const json = visualizationDataRegistry.getRawData(id)
if (!json?.ok) return json ?? undefined
const parsed = Ok(JSON.parse(json.value))
markRaw(parsed)
return parsed
})
return computed(() => parseVisualizationData(visualizationDataRegistry.getRawData(id)))
}
const dataflowErrors = new ReactiveMapping(computedValueRegistry.db, (id, info) => {
@ -648,37 +265,52 @@ export const useProjectStore = defineStore('project', () => {
function executeExpression(
expressionId: ExternalId,
expression: string,
): Promise<Result<string> | null> {
): Promise<Result<any> | null> {
return new Promise((resolve) => {
Promise.all([lsRpcConnection, dataConnection]).then(([lsRpc, data]) => {
const visualizationId = random.uuidv4() as Uuid
const dataHandler = (visData: VisualizationUpdate, uuid: Uuid | null) => {
if (uuid === visualizationId) {
const dataStr = visData.dataString()
resolve(dataStr != null ? Ok(dataStr) : null)
data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.off('visualizationEvaluationFailed', errorHandler)
}
const visualizationId = random.uuidv4() as Uuid
const dataHandler = (visData: VisualizationUpdate, uuid: Uuid | null) => {
if (uuid === visualizationId) {
dataConnection.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.off('visualizationEvaluationFailed', errorHandler)
const dataStr = Ok(visData.dataString())
resolve(parseVisualizationData(dataStr))
}
const errorHandler = (
uuid: Uuid,
_expressionId: ExpressionId,
message: string,
_diagnostic: Diagnostic | undefined,
) => {
if (uuid == visualizationId) {
resolve(Err(message))
data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.off('visualizationEvaluationFailed', errorHandler)
}
}
const errorHandler = (
uuid: Uuid,
_expressionId: ExpressionId,
message: string,
_diagnostic: Diagnostic | undefined,
) => {
if (uuid == visualizationId) {
resolve(Err(message))
dataConnection.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.off('visualizationEvaluationFailed', errorHandler)
}
data.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.on('visualizationEvaluationFailed', errorHandler)
lsRpc.executeExpression(executionContext.id, visualizationId, expressionId, expression)
})
}
dataConnection.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler)
executionContext.on('visualizationEvaluationFailed', errorHandler)
return lsRpcConnection.executeExpression(
executionContext.id,
visualizationId,
expressionId,
expression,
)
})
}
function parseVisualizationData(data: Result<string | null> | null): Result<any> | null {
if (!data?.ok) return data
if (data.value == null) return null
try {
return Ok(markRaw(JSON.parse(data.value)))
} catch (error) {
if (error instanceof SyntaxError)
return Err(`Parsing visualization result failed: ${error.message}`)
else throw error
}
}
const { executionMode } = setupSettings(projectModel)
function disposeYDocsProvider() {

View File

@ -1,7 +1,7 @@
import type { ExecutionContext } from '@/stores/project'
import type { ExecutionContext } from '@/stores/project/executionContext'
import { Err, Ok, type Result } from '@/util/data/result'
import type { DataServer } from '@/util/net/dataServer'
import { OutboundPayload, VisualizationUpdate } from 'shared/binaryProtocol'
import type { DataServer } from 'shared/dataServer'
import type {
Diagnostic,
ExpressionId,
@ -24,21 +24,19 @@ export class VisualizationDataRegistry {
/** This map stores only keys representing attached visualization. The responses for
* executeExpression are handled by project store's `executeExpression` method. */
private visualizationValues: Map<Uuid, Result<string> | null>
private dataServer: Promise<DataServer>
private dataServer: DataServer
private executionContext: ExecutionContext
private reconfiguredHandler = this.visualizationsConfigured.bind(this)
private dataHandler = this.visualizationUpdate.bind(this)
private errorHandler = this.visualizationError.bind(this)
constructor(executionContext: ExecutionContext, dataServer: Promise<DataServer>) {
constructor(executionContext: ExecutionContext, dataServer: DataServer) {
this.executionContext = executionContext
this.dataServer = dataServer
this.visualizationValues = reactive(new Map())
this.executionContext.on('newVisualizationConfiguration', this.reconfiguredHandler)
this.dataServer.then((data) => {
data.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler)
})
dataServer.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler)
this.executionContext.on('visualizationEvaluationFailed', this.errorHandler)
}
@ -88,9 +86,7 @@ export class VisualizationDataRegistry {
dispose() {
this.executionContext.off('visualizationsConfigured', this.reconfiguredHandler)
this.dataServer.then((data) => {
data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler)
})
this.dataServer.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler)
this.executionContext.off('visualizationEvaluationFailed', this.errorHandler)
}
}

View File

@ -2,7 +2,7 @@ import { useProjectStore } from '@/stores/project'
import { entryQn, type SuggestionEntry, type SuggestionId } from '@/stores/suggestionDatabase/entry'
import { applyUpdates, entryFromLs } from '@/stores/suggestionDatabase/lsUpdate'
import { ReactiveDb, ReactiveIndex } from '@/util/database/reactiveDb'
import { AsyncQueue, rpcWithRetries } from '@/util/net'
import { AsyncQueue } from '@/util/net'
import {
normalizeQualifiedName,
qnJoin,
@ -13,6 +13,7 @@ import {
import { defineStore } from 'pinia'
import { LanguageServer } from 'shared/languageServer'
import type { MethodPointer } from 'shared/languageServerTypes'
import { exponentialBackoff } from 'shared/util/net'
import { markRaw, ref, type Ref } from 'vue'
export class SuggestionDb extends ReactiveDb<SuggestionId, SuggestionEntry> {
@ -72,10 +73,13 @@ class Synchronizer {
public groups: Ref<Group[]>,
) {
const projectStore = useProjectStore()
const initState = projectStore.lsRpcConnection.then(async (lsRpc) => {
await rpcWithRetries(() =>
lsRpc.acquireCapability('search/receivesSuggestionsDatabaseUpdates', {}),
)
const lsRpc = projectStore.lsRpcConnection
const initState = exponentialBackoff(() =>
lsRpc.acquireCapability('search/receivesSuggestionsDatabaseUpdates', {}),
).then((capability) => {
if (!capability.ok) {
capability.error.log('Will not receive database updates')
}
this.setupUpdateHandler(lsRpc)
this.loadGroups(lsRpc, projectStore.firstExecution)
return Synchronizer.loadDatabase(entries, lsRpc, groups.value)
@ -89,8 +93,14 @@ class Synchronizer {
lsRpc: LanguageServer,
groups: Group[],
): Promise<{ currentVersion: number }> {
const initialDb = await lsRpc.getSuggestionsDatabase()
for (const lsEntry of initialDb.entries) {
const initialDb = await exponentialBackoff(() => lsRpc.getSuggestionsDatabase())
if (!initialDb.ok) {
initialDb.error.log(
'Cannot load initial suggestion database. Continuing with empty suggestion database',
)
return { currentVersion: 0 }
}
for (const lsEntry of initialDb.value.entries) {
const entry = entryFromLs(lsEntry.suggestion, groups)
if (!entry.ok) {
entry.error.log()
@ -99,7 +109,7 @@ class Synchronizer {
entries.set(lsEntry.id, entry.value)
}
}
return { currentVersion: initialDb.currentVersion }
return { currentVersion: initialDb.value.currentVersion }
}
private setupUpdateHandler(lsRpc: LanguageServer) {
@ -132,8 +142,12 @@ class Synchronizer {
private async loadGroups(lsRpc: LanguageServer, firstExecution: Promise<unknown>) {
this.queue.pushTask(async ({ currentVersion }) => {
await firstExecution
const groups = await lsRpc.getComponentGroups()
this.groups.value = groups.componentGroups.map(
const groups = await exponentialBackoff(() => lsRpc.getComponentGroups())
if (!groups.ok) {
groups.error.log('Cannot read component groups. Continuing without gruops.')
return { currentVersion }
}
this.groups.value = groups.value.componentGroups.map(
(group): Group => ({
name: group.name,
...(group.color ? { color: group.color } : {}),

View File

@ -19,8 +19,8 @@ import { VisualizationModule } from '@/stores/visualization/runtimeTypes'
import { assertNever } from '@/util/assert'
import { toError } from '@/util/data/error'
import type { Opt } from '@/util/data/opt'
import type { DataServer } from '@/util/net/dataServer'
import { Error as DataError } from 'shared/binaryProtocol'
import type { DataServer } from 'shared/dataServer'
import type { Uuid } from 'shared/languageServerTypes'
import * as vue from 'vue'

View File

@ -24,7 +24,6 @@ import type { VisualizationModule } from '@/stores/visualization/runtimeTypes'
import type { Opt } from '@/util/data/opt'
import { isUrlString } from '@/util/data/urlString'
import { isIconName } from '@/util/iconName'
import { rpcWithRetries } from '@/util/net'
import { defineStore } from 'pinia'
import { ErrorCode, LsRpcError, RemoteRpcError } from 'shared/languageServer'
import type { Event as LSEvent, VisualizationConfiguration } from 'shared/languageServerTypes'
@ -214,21 +213,20 @@ export const useVisualizationStore = defineStore('visualization', () => {
console.error('Could not load custom visualizations: Project directory not found.')
return
}
try {
await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries)
.promise
} catch (error) {
const watching = await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent)
.promise
if (!watching.ok) {
if (
error instanceof LsRpcError &&
error.cause instanceof RemoteRpcError &&
error.cause.code === ErrorCode.FILE_NOT_FOUND
watching.error.payload instanceof LsRpcError &&
watching.error.payload.cause instanceof RemoteRpcError &&
watching.error.payload.cause.code === ErrorCode.FILE_NOT_FOUND
) {
console.info(
"'visualizations/' folder not found in project directory. " +
"If you have custom visualizations, please put them under 'visualizations/'.",
)
} else {
throw error
watching.error.log('Could not load custom visualizations')
}
}
})

View File

@ -1,5 +1,4 @@
import { Err, Ok, ResultError } from '@/util/data/result'
import { AsyncQueue, exponentialBackoff } from '@/util/net'
import { AsyncQueue } from '@/util/net'
import { wait } from 'lib0/promise'
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'
@ -51,67 +50,3 @@ describe('AsyncQueue', () => {
expect(await queue.waitForCompletion()).toBe(5)
})
})
describe('exponentialBackoff', () => {
test('runs successful task once', async () => {
const task = vi.fn(async () => Ok(1))
const result = await exponentialBackoff(task)
expect(result).toEqual({ ok: true, value: 1 })
expect(task).toHaveBeenCalledTimes(1)
})
test('retry failing task up to a limit', async () => {
const task = vi.fn(async () => Err(1))
const promise = exponentialBackoff(task, { maxRetries: 4 })
vi.runAllTimersAsync()
const result = await promise
expect(result).toEqual({ ok: false, error: new ResultError(1) })
expect(task).toHaveBeenCalledTimes(5)
})
test('wait before retrying', async () => {
const task = vi.fn(async () => Err(null))
exponentialBackoff(task, {
maxRetries: 10,
retryDelay: 100,
retryDelayMultiplier: 3,
retryDelayMax: 1000,
})
expect(task).toHaveBeenCalledTimes(1)
await vi.advanceTimersByTimeAsync(100)
expect(task).toHaveBeenCalledTimes(2)
await vi.advanceTimersByTimeAsync(300)
expect(task).toHaveBeenCalledTimes(3)
await vi.advanceTimersByTimeAsync(900)
expect(task).toHaveBeenCalledTimes(4)
await vi.advanceTimersByTimeAsync(5000)
expect(task).toHaveBeenCalledTimes(9)
})
test('retry task until success', async () => {
const task = vi.fn()
task.mockReturnValueOnce(Promise.resolve(Err(3)))
task.mockReturnValueOnce(Promise.resolve(Err(2)))
task.mockReturnValueOnce(Promise.resolve(Ok(1)))
const promise = exponentialBackoff(task)
vi.runAllTimersAsync()
const result = await promise
expect(result).toEqual({ ok: true, value: 1 })
expect(task).toHaveBeenCalledTimes(3)
})
test('call retry callback', async () => {
const task = vi.fn()
task.mockReturnValueOnce(Promise.resolve(Err(3)))
task.mockReturnValueOnce(Promise.resolve(Err(2)))
task.mockReturnValueOnce(Promise.resolve(Ok(1)))
const onBeforeRetry = vi.fn()
const promise = exponentialBackoff(task, { onBeforeRetry })
vi.runAllTimersAsync()
await promise
expect(onBeforeRetry).toHaveBeenCalledTimes(2)
expect(onBeforeRetry).toHaveBeenNthCalledWith(1, new ResultError(3), 0, 1000)
expect(onBeforeRetry).toHaveBeenNthCalledWith(2, new ResultError(2), 1, 2000)
})
})

View File

@ -39,6 +39,11 @@ export class Pattern {
return matches
}
/** Check if the given expression matches the pattern */
test(target: Ast.Ast): boolean {
return this.match(target) != null
}
/** Create a new concrete example of the pattern, with the placeholders replaced with the given subtrees. */
instantiate(edit: MutableModule, subtrees: Ast.Owned[]): Ast.Owned {
const template = edit.copy(this.template)

View File

@ -1,107 +1,37 @@
import { ResultError, rejectionToResult, type Result } from '@/util/data/result'
import { WebSocketTransport } from '@open-rpc/client-js'
import type {
IJSONRPCNotificationResponse,
JSONRPCRequestData,
} from '@open-rpc/client-js/build/Request'
import { Transport } from '@open-rpc/client-js/build/transports/Transport'
import { type ArgumentsType } from '@vueuse/core'
import { wait } from 'lib0/promise'
import { LsRpcError } from 'shared/languageServer'
import { WebSocket as ReconnectingWebSocket } from 'partysocket'
import { type WebSocketEventMap } from 'partysocket/ws'
import type { Notifications } from 'shared/languageServerTypes'
import { AbortScope } from 'shared/util/net'
import { WebsocketClient } from 'shared/websocket'
import { AbortScope, type ReconnectingTransportWithWebsocketEvents } from 'shared/util/net'
import ReconnectingWebSocketTransport from 'shared/util/net/ReconnectingWSTransport'
import { onScopeDispose } from 'vue'
export interface BackoffOptions<E> {
maxRetries?: number
retryDelay?: number
retryDelayMultiplier?: number
retryDelayMax?: number
/**
* Called when the promise return an error result, and the next retry is about to be attempted.
* When this function returns `false`, the backoff is immediately aborted. When this function is
* not provided, the backoff will always continue until the maximum number of retries is reached.
*/
onBeforeRetry?: (error: ResultError<E>, retryCount: number, delay: number) => boolean | void
}
export { AbortScope } from 'shared/util/net'
const defaultBackoffOptions: Required<BackoffOptions<any>> = {
maxRetries: 3,
retryDelay: 1000,
retryDelayMultiplier: 2,
retryDelayMax: 10000,
onBeforeRetry: () => {},
}
/**
* Retry a failing promise function with exponential backoff.
*/
export async function exponentialBackoff<T, E>(
f: () => Promise<Result<T, E>>,
backoffOptions?: BackoffOptions<E>,
): Promise<Result<T, E>> {
const options = { ...defaultBackoffOptions, ...backoffOptions }
for (
let retries = 0, delay = options.retryDelay;
;
retries += 1, delay = Math.min(options.retryDelayMax, delay * options.retryDelayMultiplier)
) {
const result = await f()
if (
result.ok ||
retries >= options.maxRetries ||
options.onBeforeRetry(result.error, retries, delay) === false
) {
return result
}
await wait(delay)
}
}
export const lsRequestResult = rejectionToResult(LsRpcError)
/**
* Retry a failing Language Server RPC call with exponential backoff. The provided async function is
* called on each retry.
*/
export async function rpcWithRetries<T>(
f: () => Promise<T>,
backoffOptions?: BackoffOptions<LsRpcError>,
): Promise<T> {
const result = await exponentialBackoff(() => lsRequestResult(f()), backoffOptions)
if (result.ok) return result.value
else {
console.error('Too many failed retries.')
throw result.error
}
}
type QueueTask<State> = (state: State) => Promise<State>
export function createRpcTransport(url: string): Transport {
export function createRpcTransport(url: string): ReconnectingTransportWithWebsocketEvents {
if (url.startsWith('mock://')) {
const mockName = url.slice('mock://'.length)
return new MockTransport(mockName)
} else {
const transport = new WebSocketTransport(url)
const transport = new ReconnectingWebSocketTransport(url)
return transport
}
}
export function createWebsocketClient(
url: string,
abort: AbortScope,
options?: { binaryType?: 'arraybuffer' | 'blob' | null; sendPings?: boolean },
): WebsocketClient {
export function createDataWebsocket(url: string, binaryType: 'arraybuffer' | 'blob'): WebSocket {
if (url.startsWith('mock://')) {
const mockWs = new MockWebSocketClient(url, abort)
if (options?.binaryType) mockWs.binaryType = options.binaryType
const mockWs = new MockWebSocket(url, url.slice('mock://'.length))
mockWs.binaryType = binaryType
return mockWs
} else {
const client = new WebsocketClient(url, abort, options)
client.connect()
return client
const websocket = new ReconnectingWebSocket(url)
websocket.binaryType = binaryType
return websocket as WebSocket
}
}
@ -111,6 +41,7 @@ export interface MockTransportData<Methods extends string = string> {
export class MockTransport extends Transport {
static mocks: Map<string, MockTransportData> = new Map()
private openEventListeners = new Set<(event: WebSocketEventMap['open']) => void>()
constructor(public name: string) {
super()
}
@ -119,8 +50,10 @@ export class MockTransport extends Transport {
MockTransport.mocks.set(name, data as any)
}
connect(): Promise<any> {
for (const listener of this.openEventListeners) listener(new Event('open'))
return Promise.resolve()
}
reconnect() {}
close(): void {}
sendData(data: JSONRPCRequestData, timeout?: number | null): Promise<any> {
if (Array.isArray(data)) return Promise.all(data.map((d) => this.sendData(d.request, timeout)))
@ -136,6 +69,15 @@ export class MockTransport extends Transport {
params,
} as IJSONRPCNotificationResponse)
}
on<K extends keyof WebSocketEventMap>(type: K, cb: (event: WebSocketEventMap[K]) => void): void {
if (type === 'open')
this.openEventListeners.add(cb as (event: WebSocketEventMap['open']) => void)
}
off<K extends keyof WebSocketEventMap>(type: K, cb: (event: WebSocketEventMap[K]) => void): void {
if (type === 'open')
this.openEventListeners.delete(cb as (event: WebSocketEventMap['open']) => void)
}
}
export interface WebSocketHandler {
@ -188,12 +130,7 @@ export class MockWebSocket extends EventTarget implements WebSocket {
}
}
export class MockWebSocketClient extends WebsocketClient {
constructor(url: string, abort: AbortScope) {
super(url, abort)
super.connect(new MockWebSocket(url, url.slice('mock://'.length)))
}
}
type QueueTask<State> = (state: State) => Promise<State>
/**
* A serializing queue of asynchronous tasks transforming a state. Each task is a function that

View File

@ -1,6 +1,5 @@
import { ObservableV2 } from 'lib0/observable'
import * as random from 'lib0/random'
import type { Path as LSPath } from 'shared/languageServerTypes'
import {
Builder,
ByteBuffer,
@ -28,11 +27,12 @@ import {
type AnyInboundPayload,
type Offset,
type Table,
} from './binaryProtocol'
import type { AbortScope } from './util/net'
import { uuidFromBits, uuidToBits } from './uuid'
import type { WebsocketClient } from './websocket'
import type { Uuid } from './yjsModel'
} from 'shared/binaryProtocol'
import type { Path as LSPath } from 'shared/languageServerTypes'
import { Err, Ok, type Result } from 'shared/util/data/result'
import { exponentialBackoff, type AbortScope } from 'shared/util/net'
import { uuidFromBits, uuidToBits } from 'shared/uuid'
import type { Uuid } from 'shared/yjsModel'
const PAYLOAD_CONSTRUCTOR = {
[OutboundPayload.NONE]: None,
@ -53,27 +53,20 @@ export type DataServerEvents = {
}
export class DataServer extends ObservableV2<DataServerEvents> {
initialized = false
ready: Promise<void>
clientId!: string
initialized: Promise<Result<void, Error>>
private initializationScheduled = false
resolveCallbacks = new Map<string, (data: any) => void>()
/** `websocket.binaryType` should be `ArrayBuffer`. */
constructor(
public websocket: WebsocketClient,
public clientId: string,
public websocket: WebSocket,
abort: AbortScope,
) {
super()
abort.handleDispose(this)
if (websocket.connected) {
this.ready = Promise.resolve()
} else {
this.ready = new Promise((resolve, reject) => {
websocket.on('connect', () => resolve())
websocket.on('disconnect', reject)
})
}
websocket.on('message', (rawPayload) => {
websocket.addEventListener('message', ({ data: rawPayload }) => {
if (!(rawPayload instanceof ArrayBuffer)) {
console.warn('Data Server: Data type was invalid:', rawPayload)
// Ignore all non-binary messages. If the messages are `Blob`s instead, this is a
@ -98,25 +91,57 @@ export class DataServer extends ObservableV2<DataServerEvents> {
this.emit(`${payloadType}`, [payload, uuid])
}
})
websocket.addEventListener('error', (error) =>
console.error('Language Server Binary socket error:', error),
)
websocket.addEventListener('close', () => {
this.scheduleInitializationAfterConnect()
})
this.initialized = this.initialize()
}
dispose() {
this.websocket.close()
this.resolveCallbacks.clear()
}
async initialize(clientId: Uuid) {
if (!this.initialized) {
this.clientId = clientId
await this.ready
await this.initSession()
}
private scheduleInitializationAfterConnect() {
if (this.initializationScheduled) return this.initialized
this.initializationScheduled = true
this.initialized = new Promise((resolve) => {
const cb = () => {
this.websocket.removeEventListener('open', cb)
this.initializationScheduled = false
resolve(this.initialize())
}
this.websocket.addEventListener('open', cb)
})
return this.initialized
}
protected send<T = void>(
private initialize() {
return exponentialBackoff(() => this.initSession().then(responseAsResult), {
onBeforeRetry: (error, _, delay) => {
console.warn(
`Failed to initialize language server binary connection, retrying after ${delay}ms...\n`,
error,
)
},
}).then((result) => {
if (!result.ok) {
result.error.log('Error initializing Language Server Binary Protocol')
return result
} else return Ok()
})
}
protected async send<T = void>(
builder: Builder,
payloadType: InboundPayload,
payloadOffset: Offset<AnyInboundPayload>,
): Promise<T> {
waitForInit: boolean = true,
): Promise<T | Error> {
const messageUuid = random.uuidv4()
const rootTable = InboundMessage.createInboundMessage(
builder,
@ -125,10 +150,16 @@ export class DataServer extends ObservableV2<DataServerEvents> {
payloadType,
payloadOffset,
)
const promise = new Promise<T>((resolve) => {
if (waitForInit) {
const initResult = await this.initialized
if (!initResult.ok) {
return initResult.error.payload
}
}
this.websocket.send(builder.finish(rootTable).toArrayBuffer())
const promise = new Promise<T | Error>((resolve) => {
this.resolveCallbacks.set(messageUuid, resolve)
})
this.websocket.send(builder.finish(rootTable).toArrayBuffer())
return promise
}
@ -143,7 +174,7 @@ export class DataServer extends ObservableV2<DataServerEvents> {
builder,
this.createUUID(this.clientId),
)
return this.send(builder, InboundPayload.INIT_SESSION_CMD, commandOffset)
return this.send(builder, InboundPayload.INIT_SESSION_CMD, commandOffset, false)
}
async writeFile(
@ -213,3 +244,8 @@ export class DataServer extends ObservableV2<DataServerEvents> {
return await this.send(builder, InboundPayload.WRITE_BYTES_CMD, command)
}
}
function responseAsResult<T>(resp: T | Error): Result<T, Error> {
if (resp instanceof Error) return Err(resp)
else return Ok(resp)
}

View File

@ -1,4 +1,3 @@
import { Client, RequestManager, WebSocketTransport } from '@open-rpc/client-js'
import * as json from 'lib0/json'
import * as map from 'lib0/map'
import { ObservableV2 } from 'lib0/observable'
@ -8,9 +7,18 @@ import * as Ast from '../shared/ast'
import { astCount } from '../shared/ast'
import { EnsoFileParts, combineFileParts, splitFileContents } from '../shared/ensoFile'
import { LanguageServer, computeTextChecksum } from '../shared/languageServer'
import { Checksum, FileEdit, Path, TextEdit, response } from '../shared/languageServerTypes'
import { exponentialBackoff, printingCallbacks } from '../shared/retry'
import { AbortScope } from '../shared/util/net'
import {
Checksum,
FileEdit,
FileEventKind,
Path,
TextEdit,
response,
} from '../shared/languageServerTypes'
import { assertNever } from '../shared/util/assert'
import { Err, Ok, Result, withContext } from '../shared/util/data/result'
import { AbortScope, exponentialBackoff, printingCallbacks } from '../shared/util/net'
import ReconnectingWebSocketTransport from '../shared/util/net/ReconnectingWSTransport'
import {
DistributedProject,
ExternalId,
@ -34,22 +42,12 @@ const EXTENSION = '.enso'
const DEBUG_LOG_SYNC = false
function createOpenRPCClient(url: string) {
const transport = new WebSocketTransport(url)
const requestManager = new RequestManager([transport])
transport.connection.on('error', (error) =>
console.error('Language Server transport error:', error),
)
return new Client(requestManager)
}
export class LanguageServerSession {
clientId: Uuid
indexDoc: WSSharedDoc
docs: Map<string, WSSharedDoc>
retainCount: number
url: string
client: Client
ls: LanguageServer
connection: response.InitProtocolConnection | undefined
model: DistributedProject
@ -78,9 +76,9 @@ export class LanguageServerSession {
if (!persistence) continue
}
})
const { client, ls } = this.setupClient()
this.client = client
this.ls = ls
this.ls = new LanguageServer(this.clientId, new ReconnectingWebSocketTransport(this.url))
this.clientScope.onAbort(() => this.ls.release())
this.setupClient()
}
static sessions = new Map<string, LanguageServerSession>()
@ -95,28 +93,52 @@ export class LanguageServerSession {
}
private restartClient() {
this.clientScope.dispose('Client restarted.')
this.clientScope = new AbortScope()
this.connection = undefined
this.setupClient()
this.ls.reconnect()
return exponentialBackoff(() => this.readInitialState())
}
private setupClient() {
this.client = createOpenRPCClient(this.url)
this.ls = new LanguageServer(this.client)
this.clientScope.onAbort(() => this.ls.release())
this.ls.on('file/event', async (event) => {
if (DEBUG_LOG_SYNC) {
console.log('file/event', event)
}
const result = await this.handleFileEvent(event)
if (!result.ok) this.restartClient()
})
this.ls.on('text/fileModifiedOnDisk', async (event) => {
const path = event.path.segments.join('/')
try {
const result = await exponentialBackoff(
async () => this.tryGetExistingModuleModel(event.path)?.reload() ?? Ok(),
printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`),
)
if (!result.ok) this.restartClient()
})
exponentialBackoff(
() => this.readInitialState(),
printingCallbacks('read initial state', 'read initial state'),
).then((result) => {
if (!result.ok) {
result.error.log('Could not read initial state')
exponentialBackoff(
async () => this.restartClient(),
printingCallbacks('restarted RPC client', 'restart RPC client'),
)
}
})
}
private handleFileEvent(event: { path: Path; kind: FileEventKind }): Promise<Result<void>> {
return withContext(
() => 'Handling file/event',
async () => {
const path = event.path.segments.join('/')
switch (event.kind) {
case 'Added': {
if (isSourceFile(event.path)) {
const fileInfo = await this.ls.fileInfo(event.path)
if (fileInfo.attributes.kind.type == 'File') {
await exponentialBackoff(
if (!fileInfo.ok) return fileInfo
if (fileInfo.value.attributes.kind.type == 'File') {
return await exponentialBackoff(
() => this.getModuleModel(event.path).open(),
printingCallbacks(`opened new file '${path}'`, `open new file '${path}'`),
)
@ -125,74 +147,59 @@ export class LanguageServerSession {
break
}
case 'Modified': {
await exponentialBackoff(
async () => this.tryGetExistingModuleModel(event.path)?.reload(),
return await exponentialBackoff(
() => this.tryGetExistingModuleModel(event.path)?.reload() ?? Promise.resolve(Ok()),
printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`),
)
break
}
}
} catch {
this.restartClient()
}
})
this.ls.on('text/fileModifiedOnDisk', async (event) => {
const path = event.path.segments.join('/')
try {
await exponentialBackoff(
async () => this.tryGetExistingModuleModel(event.path)?.reload(),
printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`),
)
} catch {
this.restartClient()
}
})
exponentialBackoff(
() => this.readInitialState(),
printingCallbacks('read initial state', 'read initial state'),
).catch((error) => {
console.error('Could not read initial state.')
console.error(error)
exponentialBackoff(
async () => this.restartClient(),
printingCallbacks('restarted RPC client', 'restart RPC client'),
)
})
return { client: this.client, ls: this.ls }
return Ok()
},
)
}
private assertProjectRoot(): asserts this is { projectRootId: Uuid } {
if (this.projectRootId == null) throw new Error('Missing project root')
}
private async readInitialState() {
let moduleOpenPromises: Promise<void>[] = []
try {
const connection = this.connection ?? (await this.ls.initProtocolConnection(this.clientId))
this.connection = connection
const projectRoot = connection.contentRoots.find((root) => root.type === 'Project')
if (!projectRoot) throw new Error('Missing project root')
this.projectRootId = projectRoot.id
await this.ls.acquireReceivesTreeUpdates({ rootId: this.projectRootId, segments: [] })
const files = await this.scanSourceFiles()
moduleOpenPromises = this.indexDoc.doc.transact(
() =>
files.map((file) => this.getModuleModel(pushPathSegment(file.path, file.name)).open()),
this,
)
await Promise.all(moduleOpenPromises)
} catch (error) {
console.error('LS initialization failed.')
throw error
}
console.log('LS connection initialized.')
private async readInitialState(): Promise<Result<void>> {
return await withContext(
() => 'When reading initial state',
async () => {
let moduleOpenPromises: Promise<Result<void>>[] = []
const projectRoot = (await this.ls.contentRoots).find((root) => root.type === 'Project')
if (!projectRoot) return Err('Missing project root')
this.projectRootId = projectRoot.id
const aquireResult = await this.ls.acquireReceivesTreeUpdates({
rootId: this.projectRootId,
segments: [],
})
if (!aquireResult.ok) return aquireResult
const files = await this.scanSourceFiles()
if (!files.ok) return files
moduleOpenPromises = this.indexDoc.doc.transact(
() =>
files.value.map((file) =>
this.getModuleModel(pushPathSegment(file.path, file.name)).open(),
),
this,
)
const results = await Promise.all(moduleOpenPromises)
return results.find((res) => !res.ok) ?? Ok()
},
)
}
async scanSourceFiles() {
this.assertProjectRoot()
const sourceDir: Path = { rootId: this.projectRootId, segments: [SOURCE_DIR] }
const srcModules = await this.ls.listFiles(sourceDir)
return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith(EXTENSION))
if (!srcModules.ok) return srcModules
return Ok(
srcModules.value.paths.filter(
(file) => file.type === 'File' && file.name.endsWith(EXTENSION),
),
)
}
tryGetExistingModuleModel(path: Path): ModulePersistence | undefined {
@ -341,51 +348,68 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
/** Set the current state to the given state while the callback is running.
* Set the current state back to {@link LsSyncState.Synchronized} when the callback finishes. */
private async withState(state: LsSyncState, callback: () => void | Promise<void>) {
private async withState(state: LsSyncState, callback: () => void | Promise<void>): Promise<void>
private async withState(
state: LsSyncState,
callback: () => Result<void> | Promise<Result<void>>,
): Promise<Result<void>>
private async withState(
state: LsSyncState,
callback: () => void | Promise<void> | Result<void> | Promise<Result<void>>,
): Promise<Result<void> | void> {
this.setState(state)
await callback()
const result = await callback()
if (result && !result.ok) return result
this.setState(LsSyncState.Synchronized)
if (result) return result
}
async open() {
this.queuedAction = LsAction.Open
switch (this.state) {
case LsSyncState.Disposed:
case LsSyncState.WritingFile:
case LsSyncState.Synchronized:
case LsSyncState.WriteError:
case LsSyncState.Reloading: {
return
}
case LsSyncState.Closing: {
await this.lastAction
if (this.queuedAction === LsAction.Open) await this.open()
return
}
case LsSyncState.Opening: {
await this.lastAction
return
}
case LsSyncState.Closed: {
await this.withState(LsSyncState.Opening, async () => {
const promise = this.ls.openTextFile(this.path)
this.setLastAction(promise.catch(() => this.setState(LsSyncState.Closed)))
const result = await promise
if (!result.writeCapability) {
console.error('Could not acquire write capability for module:', this.path)
throw new Error(
`Could not acquire write capability for module '${this.path.segments.join('/')}'`,
)
async open(): Promise<Result<void>> {
return await withContext(
() => `When opening module ${this.path}`,
async () => {
this.queuedAction = LsAction.Open
switch (this.state) {
case LsSyncState.Disposed:
case LsSyncState.WritingFile:
case LsSyncState.Synchronized:
case LsSyncState.WriteError:
case LsSyncState.Reloading: {
return Ok()
}
this.syncFileContents(result.content, result.currentVersion)
})
return
}
default: {
this.state satisfies never
return
}
}
case LsSyncState.Closing: {
await this.lastAction
if (this.queuedAction === LsAction.Open) return await this.open()
return Ok()
}
case LsSyncState.Opening: {
await this.lastAction
return Ok()
}
case LsSyncState.Closed: {
await this.withState(LsSyncState.Opening, async () => {
const promise = this.ls.openTextFile(this.path)
this.setLastAction(
promise.then((res) => !res.ok && this.setState(LsSyncState.Closed)),
)
const result = await promise
if (!result.ok) return result
if (!result.value.writeCapability) {
return Err(
`Could not acquire write capability for module '${this.path.segments.join('/')}'`,
)
}
this.syncFileContents(result.value.content, result.value.currentVersion)
return Ok()
})
return Ok()
}
default: {
assertNever(this.state)
}
}
},
)
}
handleFileRemoved() {
@ -614,62 +638,61 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
return
}
default: {
this.state satisfies never
return
assertNever(this.state)
}
}
}
async reload() {
this.queuedAction = LsAction.Reload
switch (this.state) {
case LsSyncState.Opening:
case LsSyncState.Disposed:
case LsSyncState.Closed:
case LsSyncState.Closing: {
return
}
case LsSyncState.Reloading: {
await this.lastAction
return
}
case LsSyncState.WritingFile: {
await this.lastAction
if (this.queuedAction === LsAction.Reload) await this.reload()
return
}
case LsSyncState.Synchronized: {
this.withState(LsSyncState.Reloading, async () => {
const promise = Promise.all([
this.ls.readFile(this.path),
this.ls.fileChecksum(this.path),
])
this.setLastAction(promise)
const [contents, checksum] = await promise
this.syncFileContents(contents.contents, checksum.checksum)
})
return
}
case LsSyncState.WriteError: {
this.withState(LsSyncState.Reloading, async () => {
const path = this.path.segments.join('/')
const reloading = this.ls
.closeTextFile(this.path)
.catch((error) => {
console.error('Could not close file after write error:')
console.error(error)
async reload(): Promise<Result<void>> {
return await withContext(
() => `When reloading module ${this.path}`,
async () => {
this.queuedAction = LsAction.Reload
switch (this.state) {
case LsSyncState.Opening:
case LsSyncState.Disposed:
case LsSyncState.Closed:
case LsSyncState.Closing: {
return Ok()
}
case LsSyncState.Reloading: {
await this.lastAction
return Ok()
}
case LsSyncState.WritingFile: {
await this.lastAction
if (this.queuedAction === LsAction.Reload) return await this.reload()
return Ok()
}
case LsSyncState.Synchronized: {
return this.withState(LsSyncState.Reloading, async () => {
const promise = Promise.all([
this.ls.readFile(this.path),
this.ls.fileChecksum(this.path),
])
this.setLastAction(promise)
const [contents, checksum] = await promise
if (!contents.ok) return contents
if (!checksum.ok) return checksum
this.syncFileContents(contents.value.contents, checksum.value.checksum)
return Ok()
})
.then(
() =>
exponentialBackoff(
}
case LsSyncState.WriteError: {
return this.withState(LsSyncState.Reloading, async () => {
const path = this.path.segments.join('/')
const reloading = this.ls.closeTextFile(this.path).then(async (closing) => {
if (!closing.ok) closing.error.log('Could not close file after write error:')
return exponentialBackoff(
async () => {
const result = await this.ls.openTextFile(this.path)
if (!result.writeCapability) {
const message = `Could not acquire write capability for module '${this.path.segments.join(
'/',
)}'`
console.error(message)
throw new Error(message)
if (!result.ok) return result
if (!result.value.writeCapability) {
return Err(
`Could not acquire write capability for module '${this.path.segments.join(
'/',
)}'`,
)
}
return result
},
@ -677,25 +700,22 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
`opened file '${path}' for writing`,
`open file '${path}' for writing`,
),
),
(error) => {
console.error('Could not reopen file after write error:')
console.error(error)
// This error is unrecoverable.
throw error
},
)
this.setLastAction(reloading)
const result = await reloading
this.syncFileContents(result.content, result.currentVersion)
})
return
}
default: {
this.state satisfies never
return
}
}
)
})
this.setLastAction(reloading)
const result = await reloading
if (!result.ok) return result
this.syncFileContents(result.value.content, result.value.currentVersion)
return Ok()
})
}
default: {
assertNever(this.state)
}
}
},
)
}
async dispose(): Promise<void> {
@ -703,6 +723,9 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
const alreadyClosed = this.inState(LsSyncState.Closing, LsSyncState.Closed)
this.setState(LsSyncState.Disposed)
if (alreadyClosed) return Promise.resolve()
return this.ls.closeTextFile(this.path)
const closing = await this.ls.closeTextFile(this.path)
if (!closing.ok) {
closing.error.log(`Closing text file ${this.path}`)
}
}
}

20
package-lock.json generated
View File

@ -52,6 +52,7 @@
"lib0": "^0.2.85",
"magic-string": "^0.30.3",
"murmurhash": "^2.0.1",
"partysocket": "^1.0.1",
"pinia": "^2.1.7",
"postcss-inline-svg": "^6.0.0",
"postcss-nesting": "^12.0.1",
@ -11193,6 +11194,17 @@
"node": ">=4.0.0"
}
},
"node_modules/event-target-shim": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-6.0.2.tgz",
"integrity": "sha512-8q3LsZjRezbFZ2PN+uP+Q7pnHUMmAOziU2vA2OwoFaKIXxlxl38IylhSSgUorWu/rf4er67w0ikBqjBFk/pomA==",
"engines": {
"node": ">=10.13.0"
},
"funding": {
"url": "https://github.com/sponsors/mysticatea"
}
},
"node_modules/events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
@ -15957,6 +15969,14 @@
"node": ">= 0.8"
}
},
"node_modules/partysocket": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/partysocket/-/partysocket-1.0.1.tgz",
"integrity": "sha512-sSnLf9X0Oaxw0wXp0liKho0QQqStDJB5I4ViaqmtI4nHm6cpb2kUealErPrcQpYUF6zgTHzLQhIO++2tcJc59A==",
"dependencies": {
"event-target-shim": "^6.0.2"
}
},
"node_modules/pascal-case": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/pascal-case/-/pascal-case-3.1.2.tgz",