mirror of
https://github.com/enso-org/enso.git
synced 2024-12-23 10:42:05 +03:00
Fixes for Language Server sync server (#8514)
- Closes #8398 # Important Notes - The original error caused by a failing `text/openFile` (`openTextFile`) is still present, but (seemingly?) harder to repro now
This commit is contained in:
parent
23e0bafc75
commit
cbf7248370
54
app/gui2/shared/__tests__/yjsModel.test.ts
Normal file
54
app/gui2/shared/__tests__/yjsModel.test.ts
Normal file
@ -0,0 +1,54 @@
|
||||
import { rangeEncloses, rangeIntersects, type ContentRange } from 'shared/yjsModel'
|
||||
import { expect, test } from 'vitest'
|
||||
|
||||
type RangeTest = { a: ContentRange; b: ContentRange }
|
||||
|
||||
const equalRanges: RangeTest[] = [
|
||||
{ a: [0, 0], b: [0, 0] },
|
||||
{ a: [0, 1], b: [0, 1] },
|
||||
{ a: [-5, 5], b: [-5, 5] },
|
||||
]
|
||||
|
||||
const totalOverlap: RangeTest[] = [
|
||||
{ a: [0, 1], b: [0, 0] },
|
||||
{ a: [0, 2], b: [2, 2] },
|
||||
{ a: [-1, 1], b: [1, 1] },
|
||||
{ a: [0, 2], b: [0, 1] },
|
||||
{ a: [-10, 10], b: [-3, 7] },
|
||||
{ a: [0, 5], b: [1, 2] },
|
||||
{ a: [3, 5], b: [3, 4] },
|
||||
]
|
||||
|
||||
const reverseTotalOverlap: RangeTest[] = totalOverlap.map(({ a, b }) => ({ a: b, b: a }))
|
||||
|
||||
const noOverlap: RangeTest[] = [
|
||||
{ a: [0, 1], b: [2, 3] },
|
||||
{ a: [0, 1], b: [-1, -1] },
|
||||
{ a: [5, 6], b: [2, 3] },
|
||||
{ a: [0, 2], b: [-2, -1] },
|
||||
{ a: [-5, -3], b: [9, 10] },
|
||||
{ a: [-3, 2], b: [3, 4] },
|
||||
]
|
||||
|
||||
const partialOverlap: RangeTest[] = [
|
||||
{ a: [0, 3], b: [-1, 1] },
|
||||
{ a: [0, 1], b: [-1, 0] },
|
||||
{ a: [0, 0], b: [-1, 0] },
|
||||
{ a: [0, 2], b: [1, 4] },
|
||||
{ a: [-8, 0], b: [0, 10] },
|
||||
]
|
||||
|
||||
test.each([...equalRanges, ...totalOverlap])('Range $a should enclose $b', ({ a, b }) =>
|
||||
expect(rangeEncloses(a, b)).toBe(true),
|
||||
)
|
||||
test.each([...noOverlap, ...partialOverlap, ...reverseTotalOverlap])(
|
||||
'Range $a should not enclose $b',
|
||||
({ a, b }) => expect(rangeEncloses(a, b)).toBe(false),
|
||||
)
|
||||
test.each([...equalRanges, ...totalOverlap, ...reverseTotalOverlap, ...partialOverlap])(
|
||||
'Range $a should intersect $b',
|
||||
({ a, b }) => expect(rangeIntersects(a, b)).toBe(true),
|
||||
)
|
||||
test.each([...noOverlap])('Range $a should not intersect $b', ({ a, b }) =>
|
||||
expect(rangeIntersects(a, b)).toBe(false),
|
||||
)
|
@ -132,14 +132,14 @@ export class LanguageServer extends ObservableV2<Notifications> {
|
||||
console.dir(params)
|
||||
}
|
||||
return await this.client.request({ method, params }, RPC_TIMEOUT_MS)
|
||||
} catch (e) {
|
||||
const remoteError = RemoteRpcErrorSchema.safeParse(e)
|
||||
} catch (error) {
|
||||
const remoteError = RemoteRpcErrorSchema.safeParse(error)
|
||||
if (remoteError.success) {
|
||||
throw new LsRpcError(new RemoteRpcError(remoteError.data), method, params)
|
||||
} else if (e instanceof Error) {
|
||||
throw new LsRpcError(e, method, params)
|
||||
} else if (error instanceof Error) {
|
||||
throw new LsRpcError(error, method, params)
|
||||
}
|
||||
throw e
|
||||
throw error
|
||||
} finally {
|
||||
if (DEBUG_LOG_RPC) {
|
||||
console.log(`LS [${uuid}] ${method} took ${performance.now() - now}ms`)
|
||||
@ -402,27 +402,29 @@ export class LanguageServer extends ObservableV2<Notifications> {
|
||||
retry: <T>(cb: () => Promise<T>) => Promise<T> = (f) => f(),
|
||||
) {
|
||||
let running = true
|
||||
;(async () => {
|
||||
this.on('file/event', callback)
|
||||
walkFs(this, { rootId, segments }, (type, path) => {
|
||||
if (
|
||||
!running ||
|
||||
type !== 'File' ||
|
||||
path.segments.length < segments.length ||
|
||||
segments.some((seg, i) => seg !== path.segments[i])
|
||||
)
|
||||
return
|
||||
callback({
|
||||
path: { rootId: path.rootId, segments: path.segments.slice(segments.length) },
|
||||
kind: 'Added',
|
||||
const self = this
|
||||
return {
|
||||
promise: (async () => {
|
||||
self.on('file/event', callback)
|
||||
await retry(async () => running && self.acquireReceivesTreeUpdates({ rootId, segments }))
|
||||
await walkFs(self, { rootId, segments }, (type, path) => {
|
||||
if (
|
||||
!running ||
|
||||
type !== 'File' ||
|
||||
path.segments.length < segments.length ||
|
||||
segments.some((segment, i) => segment !== path.segments[i])
|
||||
)
|
||||
return
|
||||
callback({
|
||||
path: { rootId: path.rootId, segments: path.segments.slice(segments.length) },
|
||||
kind: 'Added',
|
||||
})
|
||||
})
|
||||
})
|
||||
await retry(() => this.acquireReceivesTreeUpdates({ rootId, segments }))
|
||||
if (!running) return
|
||||
})()
|
||||
return () => {
|
||||
running = false
|
||||
this.off('file/event', callback)
|
||||
})(),
|
||||
unsubscribe() {
|
||||
running = false
|
||||
self.off('file/event', callback)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -386,9 +386,7 @@ export namespace response {
|
||||
contentRoots: ContentRoot[]
|
||||
}
|
||||
|
||||
export interface FileContents {
|
||||
contents: TextFileContents
|
||||
}
|
||||
export interface FileContents extends TextFileContents {}
|
||||
|
||||
export interface FileExists {
|
||||
exists: boolean
|
||||
|
118
app/gui2/shared/retry.ts
Normal file
118
app/gui2/shared/retry.ts
Normal file
@ -0,0 +1,118 @@
|
||||
import { wait } from 'lib0/promise'
|
||||
|
||||
export interface BackoffOptions<E> {
|
||||
maxRetries?: number
|
||||
retryDelay?: number
|
||||
retryDelayMultiplier?: number
|
||||
retryDelayMax?: number
|
||||
/** Called when the promise throws an error, and the next retry is about to be attempted.
|
||||
* When this function returns `false`, the backoff is immediately aborted. When this function
|
||||
* is not provided, the backoff will always continue until the maximum number of retries
|
||||
* is reached. * */
|
||||
onBeforeRetry?: (
|
||||
error: E,
|
||||
retryCount: number,
|
||||
maxRetries: number,
|
||||
delay: number,
|
||||
) => boolean | void
|
||||
/** Called right before returning. */
|
||||
onSuccess?: (retryCount: number) => void
|
||||
/** Called after the final retry, right before throwing an error.
|
||||
* Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the
|
||||
* final retry. */
|
||||
onFailure?: (error: E, retryCount: number) => void
|
||||
}
|
||||
|
||||
const defaultBackoffOptions: Required<BackoffOptions<unknown>> = {
|
||||
maxRetries: 3,
|
||||
retryDelay: 1000,
|
||||
retryDelayMultiplier: 2,
|
||||
retryDelayMax: 10000,
|
||||
onBeforeRetry: () => {},
|
||||
onSuccess: () => {},
|
||||
onFailure: () => {},
|
||||
}
|
||||
|
||||
/** Retry a failing promise function with exponential backoff. */
|
||||
export async function exponentialBackoff<T, E>(
|
||||
f: () => Promise<T>,
|
||||
backoffOptions?: BackoffOptions<E>,
|
||||
): Promise<T> {
|
||||
const {
|
||||
maxRetries,
|
||||
retryDelay,
|
||||
retryDelayMultiplier,
|
||||
retryDelayMax,
|
||||
onBeforeRetry,
|
||||
onSuccess,
|
||||
onFailure,
|
||||
} = {
|
||||
...defaultBackoffOptions,
|
||||
...backoffOptions,
|
||||
}
|
||||
for (
|
||||
let retries = 0, delay = retryDelay;
|
||||
;
|
||||
retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier)
|
||||
) {
|
||||
try {
|
||||
const result = await f()
|
||||
onSuccess(retries)
|
||||
return result
|
||||
} catch (error) {
|
||||
if (retries >= maxRetries) {
|
||||
onFailure(error as E, retries)
|
||||
throw error
|
||||
}
|
||||
if (onBeforeRetry(error as E, retries, maxRetries, delay) === false) throw error
|
||||
await wait(delay)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function defaultOnBeforeRetry(
|
||||
description: string,
|
||||
): NonNullable<BackoffOptions<any>['onBeforeRetry']> {
|
||||
return (error, retryCount, maxRetries, delay) => {
|
||||
console.error(
|
||||
'Could not ' +
|
||||
description +
|
||||
` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`,
|
||||
)
|
||||
console.error(error)
|
||||
}
|
||||
}
|
||||
|
||||
export function defaultOnFailure(
|
||||
description: string,
|
||||
): NonNullable<BackoffOptions<any>['onFailure']> {
|
||||
return (error, retryCount) => {
|
||||
console.error(
|
||||
'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`,
|
||||
)
|
||||
console.error(error)
|
||||
}
|
||||
}
|
||||
|
||||
export function defaultOnSuccess(
|
||||
description: string,
|
||||
): NonNullable<BackoffOptions<any>['onSuccess']> {
|
||||
return (retryCount) => {
|
||||
if (retryCount === 0) return
|
||||
console.info(
|
||||
'Successfully ' +
|
||||
description +
|
||||
` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/** @param successDescription Should be in past tense, without an initial capital letter.
|
||||
* @param errorDescription Should be in present tense, without an initial capital letter. */
|
||||
export function printingCallbacks(successDescription: string, errorDescription: string) {
|
||||
return {
|
||||
onBeforeRetry: defaultOnBeforeRetry(errorDescription),
|
||||
onSuccess: defaultOnSuccess(successDescription),
|
||||
onFailure: defaultOnFailure(errorDescription),
|
||||
} satisfies BackoffOptions<unknown>
|
||||
}
|
@ -215,7 +215,8 @@ export class DistributedModule {
|
||||
}
|
||||
}
|
||||
|
||||
export type RelativeRange = [Y.RelativePosition, Y.RelativePosition]
|
||||
export type SourceRange = readonly [start: number, end: number]
|
||||
export type RelativeRange = [start: Y.RelativePosition, end: Y.RelativePosition]
|
||||
|
||||
/**
|
||||
* Accessor for the ID map stored in shared yjs map as relative ranges. Synchronizes the ranges
|
||||
@ -260,45 +261,39 @@ export class IdMap {
|
||||
return new IdMap(map, text)
|
||||
}
|
||||
|
||||
public static keyForRange(range: readonly [number, number]): string {
|
||||
public static keyForRange(range: SourceRange): string {
|
||||
return `${range[0].toString(16)}:${range[1].toString(16)}`
|
||||
}
|
||||
|
||||
public static rangeForKey(key: string): [number, number] {
|
||||
public static rangeForKey(key: string): SourceRange {
|
||||
return key.split(':').map((x) => parseInt(x, 16)) as [number, number]
|
||||
}
|
||||
|
||||
private modelToIndices(rangeBuffer: Uint8Array): [number, number] | null {
|
||||
const [relStart, relEnd] = decodeRange(rangeBuffer)
|
||||
const start = Y.createAbsolutePositionFromRelativePosition(relStart, this.doc)
|
||||
const end = Y.createAbsolutePositionFromRelativePosition(relEnd, this.doc)
|
||||
if (start == null || end == null) return null
|
||||
private modelToIndices(rangeBuffer: Uint8Array): SourceRange | null {
|
||||
const [relativeStart, relativeEnd] = decodeRange(rangeBuffer)
|
||||
const start = Y.createAbsolutePositionFromRelativePosition(relativeStart, this.doc)
|
||||
const end = Y.createAbsolutePositionFromRelativePosition(relativeEnd, this.doc)
|
||||
if (!start || !end) return null
|
||||
return [start.index, end.index]
|
||||
}
|
||||
|
||||
insertKnownId(range: [number, number], id: ExprId) {
|
||||
if (this.finished) {
|
||||
throw new Error('IdMap already finished')
|
||||
}
|
||||
|
||||
insertKnownId(range: SourceRange, id: ExprId) {
|
||||
if (this.finished) throw new Error('IdMap already finished')
|
||||
const key = IdMap.keyForRange(range)
|
||||
this.rangeToExpr.set(key, id)
|
||||
this.accessed.add(id)
|
||||
}
|
||||
|
||||
getIfExist(range: readonly [number, number]): ExprId | undefined {
|
||||
getIfExist(range: SourceRange): ExprId | undefined {
|
||||
const key = IdMap.keyForRange(range)
|
||||
return this.rangeToExpr.get(key)
|
||||
}
|
||||
|
||||
getOrInsertUniqueId(range: readonly [number, number]): ExprId {
|
||||
if (this.finished) {
|
||||
throw new Error('IdMap already finished')
|
||||
}
|
||||
|
||||
getOrInsertUniqueId(range: SourceRange): ExprId {
|
||||
if (this.finished) throw new Error('IdMap already finished')
|
||||
const key = IdMap.keyForRange(range)
|
||||
const val = this.rangeToExpr.get(key)
|
||||
if (val !== undefined) {
|
||||
if (val) {
|
||||
this.accessed.add(val)
|
||||
return val
|
||||
} else {
|
||||
@ -313,8 +308,8 @@ export class IdMap {
|
||||
return this.accessed
|
||||
}
|
||||
|
||||
toRawRanges(): Record<string, [number, number]> {
|
||||
const ranges: Record<string, [number, number]> = {}
|
||||
toRawRanges(): Record<string, SourceRange> {
|
||||
const ranges: Record<string, SourceRange> = {}
|
||||
for (const [key, expr] of this.rangeToExpr.entries()) {
|
||||
ranges[expr] = IdMap.rangeForKey(key)
|
||||
}
|
||||
@ -328,14 +323,9 @@ export class IdMap {
|
||||
* Can be called at most once. After calling this method, the ID map is no longer usable.
|
||||
*/
|
||||
finishAndSynchronize(): typeof this.yMap {
|
||||
if (this.finished) {
|
||||
throw new Error('IdMap already finished')
|
||||
}
|
||||
if (this.finished) throw new Error('IdMap already finished')
|
||||
this.finished = true
|
||||
|
||||
const doc = this.doc
|
||||
|
||||
doc.transact(() => {
|
||||
this.doc.transact(() => {
|
||||
this.yMap.forEach((_, expr) => {
|
||||
// Expressions that were accessed and present in the map are guaranteed to match. There is
|
||||
// no mechanism for modifying them, so we don't need to check for equality. We only need to
|
||||
@ -385,7 +375,7 @@ export function isUuid(x: unknown): x is Uuid {
|
||||
}
|
||||
|
||||
/** A range represented as start and end indices. */
|
||||
export type ContentRange = [number, number]
|
||||
export type ContentRange = [start: number, end: number]
|
||||
|
||||
export function rangeEquals(a: ContentRange, b: ContentRange): boolean {
|
||||
return a[0] == b[0] && a[1] == b[1]
|
||||
@ -403,58 +393,3 @@ export function rangeIntersects(a: ContentRange, b: ContentRange): boolean {
|
||||
export function rangeIsBefore(a: ContentRange, b: ContentRange): boolean {
|
||||
return a[1] <= b[0]
|
||||
}
|
||||
|
||||
if (import.meta.vitest) {
|
||||
const { test, expect } = import.meta.vitest
|
||||
type RangeTest = { a: ContentRange; b: ContentRange }
|
||||
|
||||
const equalRanges: RangeTest[] = [
|
||||
{ a: [0, 0], b: [0, 0] },
|
||||
{ a: [0, 1], b: [0, 1] },
|
||||
{ a: [-5, 5], b: [-5, 5] },
|
||||
]
|
||||
|
||||
const totalOverlap: RangeTest[] = [
|
||||
{ a: [0, 1], b: [0, 0] },
|
||||
{ a: [0, 2], b: [2, 2] },
|
||||
{ a: [-1, 1], b: [1, 1] },
|
||||
{ a: [0, 2], b: [0, 1] },
|
||||
{ a: [-10, 10], b: [-3, 7] },
|
||||
{ a: [0, 5], b: [1, 2] },
|
||||
{ a: [3, 5], b: [3, 4] },
|
||||
]
|
||||
|
||||
const reverseTotalOverlap: RangeTest[] = totalOverlap.map(({ a, b }) => ({ a: b, b: a }))
|
||||
|
||||
const noOverlap: RangeTest[] = [
|
||||
{ a: [0, 1], b: [2, 3] },
|
||||
{ a: [0, 1], b: [-1, -1] },
|
||||
{ a: [5, 6], b: [2, 3] },
|
||||
{ a: [0, 2], b: [-2, -1] },
|
||||
{ a: [-5, -3], b: [9, 10] },
|
||||
{ a: [-3, 2], b: [3, 4] },
|
||||
]
|
||||
|
||||
const partialOverlap: RangeTest[] = [
|
||||
{ a: [0, 3], b: [-1, 1] },
|
||||
{ a: [0, 1], b: [-1, 0] },
|
||||
{ a: [0, 0], b: [-1, 0] },
|
||||
{ a: [0, 2], b: [1, 4] },
|
||||
{ a: [-8, 0], b: [0, 10] },
|
||||
]
|
||||
|
||||
test.each([...equalRanges, ...totalOverlap])('Range $a should enclose $b', ({ a, b }) =>
|
||||
expect(rangeEncloses(a, b)).toBe(true),
|
||||
)
|
||||
test.each([...noOverlap, ...partialOverlap, ...reverseTotalOverlap])(
|
||||
'Range $a should not enclose $b',
|
||||
({ a, b }) => expect(rangeEncloses(a, b)).toBe(false),
|
||||
)
|
||||
test.each([...equalRanges, ...totalOverlap, ...reverseTotalOverlap, ...partialOverlap])(
|
||||
'Range $a should intersect $b',
|
||||
({ a, b }) => expect(rangeIntersects(a, b)).toBe(true),
|
||||
)
|
||||
test.each([...noOverlap])('Range $a should not intersect $b', ({ a, b }) =>
|
||||
expect(rangeIntersects(a, b)).toBe(false),
|
||||
)
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ export class Uploader {
|
||||
): Promise<Uploader> {
|
||||
const roots = await contentRoots
|
||||
const projectRootId = roots.find((root) => root.type == 'Project')
|
||||
if (!projectRootId) throw new Error('Unable to find project root, uploading not possible.')
|
||||
if (!projectRootId) throw new Error('Could not find project root, uploading not possible.')
|
||||
const instance = new Uploader(
|
||||
await rpc,
|
||||
await binary,
|
||||
@ -149,34 +149,39 @@ export class Uploader {
|
||||
|
||||
private async ensureDataDirExists() {
|
||||
const exists = await this.dataDirExists()
|
||||
if (!exists) {
|
||||
await this.rpc.createFile({
|
||||
type: 'Directory',
|
||||
name: DATA_DIR_NAME,
|
||||
path: { rootId: this.projectRootId, segments: [] },
|
||||
})
|
||||
}
|
||||
if (exists) return
|
||||
await this.rpc.createFile({
|
||||
type: 'Directory',
|
||||
name: DATA_DIR_NAME,
|
||||
path: { rootId: this.projectRootId, segments: [] },
|
||||
})
|
||||
}
|
||||
|
||||
private async dataDirExists(): Promise<boolean> {
|
||||
try {
|
||||
const info = await this.rpc.fileInfo(this.dataDirPath())
|
||||
return info.attributes.kind.type == 'Directory'
|
||||
} catch (err: any) {
|
||||
if (err.cause && err.cause instanceof RemoteRpcError) {
|
||||
if ([ErrorCode.FILE_NOT_FOUND, ErrorCode.CONTENT_ROOT_NOT_FOUND].includes(err.cause.code)) {
|
||||
} catch (error) {
|
||||
if (
|
||||
typeof error === 'object' &&
|
||||
error &&
|
||||
'cause' in error &&
|
||||
error.cause instanceof RemoteRpcError
|
||||
) {
|
||||
if (
|
||||
error.cause.code === ErrorCode.FILE_NOT_FOUND ||
|
||||
error.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND
|
||||
)
|
||||
return false
|
||||
}
|
||||
}
|
||||
throw err
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private async pickUniqueName(suggestedName: string): Promise<string> {
|
||||
const files = await this.rpc.listFiles(this.dataDirPath())
|
||||
const existingNames = new Set(files.paths.map((path) => path.name))
|
||||
const [stem, maybeExtension] = splitFilename(suggestedName)
|
||||
const extension = maybeExtension ?? ''
|
||||
const { stem, extension = '' } = splitFilename(suggestedName)
|
||||
let candidate = suggestedName
|
||||
let num = 1
|
||||
while (existingNames.has(candidate)) {
|
||||
@ -190,14 +195,12 @@ export class Uploader {
|
||||
/**
|
||||
* Split filename into stem and (optional) extension.
|
||||
*/
|
||||
function splitFilename(filename: string): [string, string | null] {
|
||||
const dotIndex = filename.lastIndexOf('.')
|
||||
|
||||
function splitFilename(fileName: string): { stem: string; extension?: string } {
|
||||
const dotIndex = fileName.lastIndexOf('.')
|
||||
if (dotIndex !== -1 && dotIndex !== 0) {
|
||||
const stem = filename.substring(0, dotIndex)
|
||||
const extension = filename.substring(dotIndex + 1)
|
||||
return [stem, extension]
|
||||
const stem = fileName.substring(0, dotIndex)
|
||||
const extension = fileName.substring(dotIndex + 1)
|
||||
return { stem, extension }
|
||||
}
|
||||
|
||||
return [filename, null]
|
||||
return { stem: fileName }
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import { isUrlString } from '@/util/data/urlString'
|
||||
import { isIconName } from '@/util/iconName'
|
||||
import { rpcWithRetries } from '@/util/net'
|
||||
import { defineStore } from 'pinia'
|
||||
import { ErrorCode, LsRpcError, RemoteRpcError } from 'shared/languageServer'
|
||||
import type { Event as LSEvent, VisualizationConfiguration } from 'shared/languageServerTypes'
|
||||
import type { ExprId, VisualizationIdentifier } from 'shared/yjsModel'
|
||||
import { computed, reactive } from 'vue'
|
||||
@ -208,12 +209,28 @@ export const useVisualizationStore = defineStore('visualization', () => {
|
||||
}
|
||||
}
|
||||
|
||||
Promise.all([proj.lsRpcConnection, projectRoot]).then(([ls, projectRoot]) => {
|
||||
Promise.all([proj.lsRpcConnection, projectRoot]).then(async ([ls, projectRoot]) => {
|
||||
if (!projectRoot) {
|
||||
console.error('Could not load custom visualizations: Project directory not found.')
|
||||
return
|
||||
}
|
||||
ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries)
|
||||
try {
|
||||
await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries)
|
||||
.promise
|
||||
} catch (error) {
|
||||
if (
|
||||
error instanceof LsRpcError &&
|
||||
error.cause instanceof RemoteRpcError &&
|
||||
error.cause.code === ErrorCode.FILE_NOT_FOUND
|
||||
) {
|
||||
console.info(
|
||||
"'visualizations/' folder not found in project directory. " +
|
||||
"If you have custom visualizations, please put them under 'visualizations/'.",
|
||||
)
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
function* types(type: Opt<string>) {
|
||||
|
@ -40,12 +40,12 @@ export async function exponentialBackoff<T, E>(
|
||||
backoffOptions?: BackoffOptions<E>,
|
||||
): Promise<Result<T, E>> {
|
||||
const options = { ...defaultBackoffOptions, ...backoffOptions }
|
||||
for (let retries = 0; ; retries += 1) {
|
||||
for (
|
||||
let retries = 0, delay = options.retryDelay;
|
||||
;
|
||||
retries += 1, delay = Math.min(options.retryDelayMax, delay * options.retryDelayMultiplier)
|
||||
) {
|
||||
const result = await f()
|
||||
const delay = Math.min(
|
||||
options.retryDelayMax,
|
||||
options.retryDelay * options.retryDelayMultiplier ** retries,
|
||||
)
|
||||
if (
|
||||
result.ok ||
|
||||
retries >= options.maxRetries ||
|
||||
|
@ -5,12 +5,14 @@ import { ObservableV2 } from 'lib0/observable'
|
||||
import * as random from 'lib0/random'
|
||||
import * as Y from 'yjs'
|
||||
import { LanguageServer, computeTextChecksum } from '../shared/languageServer'
|
||||
import { Checksum, Path } from '../shared/languageServerTypes'
|
||||
import { Checksum, FileEdit, Path, response } from '../shared/languageServerTypes'
|
||||
import { exponentialBackoff, printingCallbacks } from '../shared/retry'
|
||||
import {
|
||||
DistributedProject,
|
||||
ExprId,
|
||||
IdMap,
|
||||
ModuleDoc,
|
||||
SourceRange,
|
||||
type NodeMetadata,
|
||||
type Uuid,
|
||||
} from '../shared/yjsModel'
|
||||
@ -23,15 +25,21 @@ import {
|
||||
import * as fileFormat from './fileFormat'
|
||||
import { WSSharedDoc } from './ydoc'
|
||||
|
||||
const sessions = new Map<string, LanguageServerSession>()
|
||||
const SOURCE_DIR = 'src'
|
||||
const EXTENSION = '.enso'
|
||||
|
||||
const DEBUG_LOG_SYNC = false
|
||||
|
||||
type Events = {
|
||||
error: (error: Error) => void
|
||||
function createOpenRPCClient(url: string) {
|
||||
const transport = new WebSocketTransport(url)
|
||||
const requestManager = new RequestManager([transport])
|
||||
transport.connection.on('error', (error) =>
|
||||
console.error('Language Server transport error:', error),
|
||||
)
|
||||
return new Client(requestManager)
|
||||
}
|
||||
|
||||
export class LanguageServerSession extends ObservableV2<Events> {
|
||||
export class LanguageServerSession {
|
||||
clientId: Uuid
|
||||
indexDoc: WSSharedDoc
|
||||
docs: Map<string, WSSharedDoc>
|
||||
@ -39,22 +47,17 @@ export class LanguageServerSession extends ObservableV2<Events> {
|
||||
url: string
|
||||
client: Client
|
||||
ls: LanguageServer
|
||||
connection: response.InitProtocolConnection | undefined
|
||||
model: DistributedProject
|
||||
projectRootId: Uuid | null
|
||||
authoritativeModules: Map<string, ModulePersistence>
|
||||
|
||||
constructor(url: string) {
|
||||
super()
|
||||
this.clientId = random.uuidv4() as Uuid
|
||||
this.docs = new Map()
|
||||
this.retainCount = 0
|
||||
this.url = url
|
||||
const transport = new WebSocketTransport(url)
|
||||
const requestManager = new RequestManager([transport])
|
||||
this.client = new Client(requestManager)
|
||||
console.log('new session with', url)
|
||||
transport.connection.on('error', (error) => this.emit('error', [error]))
|
||||
this.ls = new LanguageServer(this.client)
|
||||
this.indexDoc = new WSSharedDoc()
|
||||
this.docs.set('index', this.indexDoc)
|
||||
this.model = new DistributedProject(this.indexDoc.doc)
|
||||
@ -64,36 +67,91 @@ export class LanguageServerSession extends ObservableV2<Events> {
|
||||
this.indexDoc.doc.on('subdocs', (subdocs: { loaded: Set<Y.Doc> }) => {
|
||||
for (const doc of subdocs.loaded) {
|
||||
const name = this.model.findModuleByDocId(doc.guid)
|
||||
if (name == null) continue
|
||||
if (!name) continue
|
||||
const persistence = this.authoritativeModules.get(name)
|
||||
if (persistence == null) continue
|
||||
if (!persistence) continue
|
||||
}
|
||||
})
|
||||
const { client, ls } = this.setupClient()
|
||||
this.client = client
|
||||
this.ls = ls
|
||||
}
|
||||
|
||||
static sessions = new Map<string, LanguageServerSession>()
|
||||
static get(url: string): LanguageServerSession {
|
||||
const session = map.setIfUndefined(
|
||||
LanguageServerSession.sessions,
|
||||
url,
|
||||
() => new LanguageServerSession(url),
|
||||
)
|
||||
session.retain()
|
||||
return session
|
||||
}
|
||||
|
||||
private restartClient() {
|
||||
this.client.close()
|
||||
this.ls.destroy()
|
||||
this.connection = undefined
|
||||
this.setupClient()
|
||||
}
|
||||
|
||||
private setupClient() {
|
||||
this.client = createOpenRPCClient(this.url)
|
||||
this.ls = new LanguageServer(this.client)
|
||||
this.ls.on('file/event', async (event) => {
|
||||
if (DEBUG_LOG_SYNC) {
|
||||
console.log('file/event', event)
|
||||
}
|
||||
switch (event.kind) {
|
||||
case 'Added':
|
||||
if (isSourceFile(event.path)) {
|
||||
const fileInfo = await this.ls.fileInfo(event.path)
|
||||
if (fileInfo.attributes.kind.type == 'File') {
|
||||
this.getModuleModel(event.path).open()
|
||||
const path = event.path.segments.join('/')
|
||||
try {
|
||||
switch (event.kind) {
|
||||
case 'Added': {
|
||||
if (isSourceFile(event.path)) {
|
||||
const fileInfo = await this.ls.fileInfo(event.path)
|
||||
if (fileInfo.attributes.kind.type == 'File') {
|
||||
await exponentialBackoff(
|
||||
() => this.getModuleModel(event.path).open(),
|
||||
printingCallbacks(`opened new file '${path}'`, `open new file '${path}'`),
|
||||
)
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
break
|
||||
case 'Modified':
|
||||
this.getModuleModelIfExists(event.path)?.reload()
|
||||
break
|
||||
case 'Modified': {
|
||||
await exponentialBackoff(
|
||||
async () => this.tryGetExistingModuleModel(event.path)?.reload(),
|
||||
printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`),
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
this.restartClient()
|
||||
}
|
||||
})
|
||||
|
||||
this.ls.on('text/fileModifiedOnDisk', async (event) => {
|
||||
this.getModuleModelIfExists(event.path)?.reload()
|
||||
const path = event.path.segments.join('/')
|
||||
try {
|
||||
await exponentialBackoff(
|
||||
async () => this.tryGetExistingModuleModel(event.path)?.reload(),
|
||||
printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`),
|
||||
)
|
||||
} catch {
|
||||
this.restartClient()
|
||||
}
|
||||
})
|
||||
|
||||
this.readInitialState()
|
||||
exponentialBackoff(
|
||||
() => this.readInitialState(),
|
||||
printingCallbacks('read initial state', 'read initial state'),
|
||||
).catch((error) => {
|
||||
console.error('Could not read initial state.')
|
||||
console.error(error)
|
||||
exponentialBackoff(
|
||||
async () => this.restartClient(),
|
||||
printingCallbacks('restarted RPC client', 'restart RPC client'),
|
||||
)
|
||||
})
|
||||
return { client: this.client, ls: this.ls }
|
||||
}
|
||||
|
||||
private assertProjectRoot(): asserts this is { projectRootId: Uuid } {
|
||||
@ -101,39 +159,38 @@ export class LanguageServerSession extends ObservableV2<Events> {
|
||||
}
|
||||
|
||||
private async readInitialState() {
|
||||
let moduleOpenPromises: Promise<void>[] = []
|
||||
try {
|
||||
const { contentRoots } = await this.ls.initProtocolConnection(this.clientId)
|
||||
const projectRoot = contentRoots.find((root) => root.type === 'Project') ?? null
|
||||
if (projectRoot == null) throw new Error('Missing project root')
|
||||
const connection = this.connection ?? (await this.ls.initProtocolConnection(this.clientId))
|
||||
this.connection = connection
|
||||
const projectRoot = connection.contentRoots.find((root) => root.type === 'Project')
|
||||
if (!projectRoot) throw new Error('Missing project root')
|
||||
this.projectRootId = projectRoot.id
|
||||
await this.ls.acquireReceivesTreeUpdates({ rootId: this.projectRootId, segments: [] })
|
||||
const srcFiles = await this.scanSrcFiles()
|
||||
await Promise.all(
|
||||
this.indexDoc.doc.transact(() => {
|
||||
return srcFiles.map((file) =>
|
||||
this.getModuleModel(pushPathSegment(file.path, file.name)).open(),
|
||||
)
|
||||
}, this),
|
||||
const files = await this.scanSourceFiles()
|
||||
moduleOpenPromises = this.indexDoc.doc.transact(
|
||||
() =>
|
||||
files.map((file) => this.getModuleModel(pushPathSegment(file.path, file.name)).open()),
|
||||
this,
|
||||
)
|
||||
await Promise.all(moduleOpenPromises)
|
||||
} catch (error) {
|
||||
console.error('LS Initialization failed:', error)
|
||||
if (error instanceof Error) {
|
||||
this.emit('error', [error])
|
||||
}
|
||||
return
|
||||
console.error('LS initialization failed.')
|
||||
throw error
|
||||
}
|
||||
console.log('LS connection initialized.')
|
||||
}
|
||||
|
||||
async scanSrcFiles() {
|
||||
async scanSourceFiles() {
|
||||
this.assertProjectRoot()
|
||||
const srcModules = await this.ls.listFiles({ rootId: this.projectRootId, segments: ['src'] })
|
||||
return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith('.enso'))
|
||||
const sourceDir: Path = { rootId: this.projectRootId, segments: [SOURCE_DIR] }
|
||||
const srcModules = await this.ls.listFiles(sourceDir)
|
||||
return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith(EXTENSION))
|
||||
}
|
||||
|
||||
getModuleModelIfExists(path: Path): ModulePersistence | null {
|
||||
tryGetExistingModuleModel(path: Path): ModulePersistence | undefined {
|
||||
const name = pathToModuleName(path)
|
||||
return this.authoritativeModules.get(name) ?? null
|
||||
return this.authoritativeModules.get(name)
|
||||
}
|
||||
|
||||
getModuleModel(path: Path): ModulePersistence {
|
||||
@ -147,37 +204,26 @@ export class LanguageServerSession extends ObservableV2<Events> {
|
||||
const index = this.model.findModuleByDocId(wsDoc.doc.guid)
|
||||
this.docs.delete(wsDoc.doc.guid)
|
||||
this.authoritativeModules.delete(name)
|
||||
if (index != null) {
|
||||
this.model.deleteModule(index)
|
||||
}
|
||||
if (index != null) this.model.deleteModule(index)
|
||||
})
|
||||
return mod
|
||||
})
|
||||
}
|
||||
|
||||
static get(url: string): LanguageServerSession {
|
||||
const session = map.setIfUndefined(sessions, url, () => new LanguageServerSession(url))
|
||||
session.retain()
|
||||
return session
|
||||
}
|
||||
|
||||
retain() {
|
||||
this.retainCount++
|
||||
this.retainCount += 1
|
||||
}
|
||||
|
||||
release(): Promise<void> {
|
||||
this.retainCount--
|
||||
if (this.retainCount === 0) {
|
||||
const closing = Promise.all(
|
||||
Array.from(this.authoritativeModules.values(), (mod) => mod.dispose()),
|
||||
).then(() => {})
|
||||
this.authoritativeModules.clear()
|
||||
this.model.doc.destroy()
|
||||
this.ls.dispose()
|
||||
sessions.delete(this.url)
|
||||
return closing
|
||||
}
|
||||
return Promise.resolve()
|
||||
async release(): Promise<void> {
|
||||
this.retainCount -= 1
|
||||
if (this.retainCount !== 0) return
|
||||
const modules = this.authoritativeModules.values()
|
||||
const moduleDisposePromises = Array.from(modules, (mod) => mod.dispose())
|
||||
this.authoritativeModules.clear()
|
||||
this.model.doc.destroy()
|
||||
this.ls.dispose()
|
||||
LanguageServerSession.sessions.delete(this.url)
|
||||
await Promise.all(moduleDisposePromises)
|
||||
}
|
||||
|
||||
getYDoc(guid: string): WSSharedDoc | undefined {
|
||||
@ -185,23 +231,19 @@ export class LanguageServerSession extends ObservableV2<Events> {
|
||||
}
|
||||
}
|
||||
|
||||
const isSourceFile = (path: Path): boolean => {
|
||||
return path.segments[0] === 'src' && path.segments[path.segments.length - 1].endsWith('.enso')
|
||||
function isSourceFile(path: Path): boolean {
|
||||
return (
|
||||
path.segments[0] === SOURCE_DIR && path.segments[path.segments.length - 1].endsWith(EXTENSION)
|
||||
)
|
||||
}
|
||||
|
||||
const pathToModuleName = (path: Path): string => {
|
||||
if (path.segments[0] === 'src') {
|
||||
return path.segments.slice(1).join('/')
|
||||
} else {
|
||||
return '//' + path.segments.join('/')
|
||||
}
|
||||
function pathToModuleName(path: Path): string {
|
||||
if (path.segments[0] === SOURCE_DIR) return path.segments.slice(1).join('/')
|
||||
else return '//' + path.segments.join('/')
|
||||
}
|
||||
|
||||
const pushPathSegment = (path: Path, segment: string): Path => {
|
||||
return {
|
||||
rootId: path.rootId,
|
||||
segments: [...path.segments, segment],
|
||||
}
|
||||
function pushPathSegment(path: Path, segment: string): Path {
|
||||
return { rootId: path.rootId, segments: [...path.segments, segment] }
|
||||
}
|
||||
|
||||
enum LsSyncState {
|
||||
@ -225,14 +267,15 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
ls: LanguageServer
|
||||
path: Path
|
||||
doc: ModuleDoc = new ModuleDoc(new Y.Doc())
|
||||
state: LsSyncState = LsSyncState.Closed
|
||||
lastAction = Promise.resolve()
|
||||
readonly state: LsSyncState = LsSyncState.Closed
|
||||
readonly lastAction = Promise.resolve()
|
||||
updateToApply: Uint8Array | null = null
|
||||
syncedContent: string | null = null
|
||||
syncedVersion: Checksum | null = null
|
||||
syncedMeta: fileFormat.Metadata = fileFormat.tryParseMetadataOrFallback(null)
|
||||
queuedAction: LsAction | null = null
|
||||
cleanup = () => {}
|
||||
|
||||
constructor(ls: LanguageServer, path: Path, sharedDoc: Y.Doc) {
|
||||
super()
|
||||
this.ls = ls
|
||||
@ -240,9 +283,7 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
|
||||
const onRemoteUpdate = this.queueRemoteUpdate.bind(this)
|
||||
const onLocalUpdate = (update: Uint8Array, origin: unknown) => {
|
||||
if (origin === 'file') {
|
||||
Y.applyUpdate(sharedDoc, update, this)
|
||||
}
|
||||
if (origin === 'file') Y.applyUpdate(sharedDoc, update, this)
|
||||
}
|
||||
const onFileModified = this.handleFileModified.bind(this)
|
||||
const onFileRemoved = this.handleFileRemoved.bind(this)
|
||||
@ -258,6 +299,44 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
}
|
||||
}
|
||||
|
||||
private inState(...states: LsSyncState[]): boolean {
|
||||
return states.includes(this.state)
|
||||
}
|
||||
|
||||
private setState(state: LsSyncState) {
|
||||
if (this.state !== LsSyncState.Disposed) {
|
||||
if (DEBUG_LOG_SYNC) {
|
||||
console.debug('State change:', LsSyncState[this.state], '->', LsSyncState[state])
|
||||
}
|
||||
// This is SAFE. `this.state` is only `readonly` to ensure that this is the only place
|
||||
// where it is mutated.
|
||||
// @ts-expect-error
|
||||
this.state = state
|
||||
if (state === LsSyncState.Synchronized) this.trySyncRemoveUpdates()
|
||||
} else {
|
||||
throw new Error('LsSync disposed')
|
||||
}
|
||||
}
|
||||
|
||||
private setLastAction<T>(lastAction: Promise<T>) {
|
||||
// This is SAFE. `this.lastAction` is only `readonly` to ensure that this is the only place
|
||||
// where it is mutated.
|
||||
// @ts-expect-error
|
||||
this.lastAction = lastAction.then(
|
||||
() => {},
|
||||
() => {},
|
||||
)
|
||||
return lastAction
|
||||
}
|
||||
|
||||
/** Set the current state to the given state while the callback is running.
|
||||
* Set the current state back to {@link LsSyncState.Synchronized} when the callback finishes. */
|
||||
private async withState(state: LsSyncState, callback: () => void | Promise<void>) {
|
||||
this.setState(state)
|
||||
await callback()
|
||||
this.setState(LsSyncState.Synchronized)
|
||||
}
|
||||
|
||||
async open() {
|
||||
this.queuedAction = LsAction.Open
|
||||
switch (this.state) {
|
||||
@ -265,29 +344,36 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
case LsSyncState.WritingFile:
|
||||
case LsSyncState.Synchronized:
|
||||
case LsSyncState.WriteError:
|
||||
case LsSyncState.Reloading:
|
||||
case LsSyncState.Reloading: {
|
||||
return
|
||||
case LsSyncState.Closing:
|
||||
}
|
||||
case LsSyncState.Closing: {
|
||||
await this.lastAction
|
||||
if (this.queuedAction === LsAction.Open) {
|
||||
await this.open()
|
||||
}
|
||||
if (this.queuedAction === LsAction.Open) await this.open()
|
||||
return
|
||||
case LsSyncState.Opening:
|
||||
}
|
||||
case LsSyncState.Opening: {
|
||||
await this.lastAction
|
||||
return
|
||||
case LsSyncState.Closed:
|
||||
{
|
||||
this.changeState(LsSyncState.Opening)
|
||||
const opening = this.ls.openTextFile(this.path)
|
||||
this.lastAction = opening.then()
|
||||
const result = await opening
|
||||
}
|
||||
case LsSyncState.Closed: {
|
||||
await this.withState(LsSyncState.Opening, async () => {
|
||||
const promise = this.ls.openTextFile(this.path)
|
||||
this.setLastAction(promise.catch(() => this.setState(LsSyncState.Closed)))
|
||||
const result = await promise
|
||||
if (!result.writeCapability) {
|
||||
console.error('Could not acquire write capability for module:', this.path)
|
||||
throw new Error(
|
||||
`Could not acquire write capability for module '${this.path.segments.join('/')}'`,
|
||||
)
|
||||
}
|
||||
this.syncFileContents(result.content, result.currentVersion)
|
||||
this.changeState(LsSyncState.Synchronized)
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
default: {
|
||||
const _: never = this.state
|
||||
this.state satisfies never
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -308,7 +394,6 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
} else {
|
||||
this.updateToApply = update
|
||||
}
|
||||
|
||||
this.trySyncRemoveUpdates()
|
||||
}
|
||||
|
||||
@ -342,7 +427,7 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
metadataKeys: Y.YMapEvent<NodeMetadata>['keys'] | null,
|
||||
) {
|
||||
if (this.syncedContent == null || this.syncedVersion == null) return
|
||||
if (contentDelta == null && idMapKeys == null && metadataKeys == null) return
|
||||
if (!contentDelta && !idMapKeys && !metadataKeys) return
|
||||
|
||||
const { edits, newContent, newMetadata } = applyDocumentUpdates(
|
||||
this.doc,
|
||||
@ -356,17 +441,17 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
const newVersion = computeTextChecksum(newContent)
|
||||
|
||||
if (DEBUG_LOG_SYNC) {
|
||||
console.log(' === changes === ')
|
||||
console.log('number of edits:', edits.length)
|
||||
console.log('metadata:', metadataKeys)
|
||||
console.log('content:', contentDelta)
|
||||
console.log('idMap:', idMapKeys)
|
||||
console.debug(' === changes === ')
|
||||
console.debug('number of edits:', edits.length)
|
||||
console.debug('metadata:', metadataKeys)
|
||||
console.debug('content:', contentDelta)
|
||||
console.debug('idMap:', idMapKeys)
|
||||
if (edits.length > 0) {
|
||||
console.log('version:', this.syncedVersion, '->', newVersion)
|
||||
console.log('Content diff:')
|
||||
console.log(prettyPrintDiff(this.syncedContent, newContent))
|
||||
console.debug('version:', this.syncedVersion, '->', newVersion)
|
||||
console.debug('Content diff:')
|
||||
console.debug(prettyPrintDiff(this.syncedContent, newContent))
|
||||
}
|
||||
console.log(' =============== ')
|
||||
console.debug(' =============== ')
|
||||
}
|
||||
if (edits.length === 0) {
|
||||
if (newVersion !== this.syncedVersion) {
|
||||
@ -375,36 +460,30 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
return
|
||||
}
|
||||
|
||||
this.changeState(LsSyncState.WritingFile)
|
||||
this.setState(LsSyncState.WritingFile)
|
||||
|
||||
const execute = contentDelta != null || idMapKeys != null
|
||||
const apply = this.ls.applyEdit(
|
||||
{
|
||||
path: this.path,
|
||||
edits,
|
||||
oldVersion: this.syncedVersion,
|
||||
newVersion,
|
||||
},
|
||||
execute,
|
||||
)
|
||||
return (this.lastAction = apply.then(
|
||||
const edit: FileEdit = { path: this.path, edits, oldVersion: this.syncedVersion, newVersion }
|
||||
const apply = this.ls.applyEdit(edit, execute)
|
||||
const promise = apply.then(
|
||||
() => {
|
||||
this.syncedContent = newContent
|
||||
this.syncedVersion = newVersion
|
||||
this.syncedMeta = newMetadata
|
||||
this.changeState(LsSyncState.Synchronized)
|
||||
this.setState(LsSyncState.Synchronized)
|
||||
},
|
||||
(e) => {
|
||||
console.error('Failed to apply edit:', e)
|
||||
|
||||
// Try to recover by reloading the file. Drop the attempted updates, since applying them
|
||||
// have failed.
|
||||
this.changeState(LsSyncState.WriteError)
|
||||
(error) => {
|
||||
console.error('Could not apply edit:', error)
|
||||
// Try to recover by reloading the file.
|
||||
// Drop the attempted updates, since applying them have failed.
|
||||
this.setState(LsSyncState.WriteError)
|
||||
this.syncedContent = null
|
||||
this.syncedVersion = null
|
||||
return this.reload()
|
||||
},
|
||||
))
|
||||
)
|
||||
this.setLastAction(promise)
|
||||
return promise
|
||||
}
|
||||
|
||||
private syncFileContents(content: string, version: Checksum) {
|
||||
@ -416,12 +495,14 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
|
||||
const idMap = new IdMap(this.doc.idMap, this.doc.contents)
|
||||
for (const [{ index, size }, id] of idMapMeta) {
|
||||
const range = [index.value, index.value + size.value]
|
||||
if (typeof range[0] !== 'number' || typeof range[1] !== 'number') {
|
||||
const start = index.value
|
||||
const end = index.value + size.value
|
||||
const range: SourceRange = [start, end]
|
||||
if (typeof start !== 'number' || typeof end !== 'number') {
|
||||
console.error(`Invalid range for id ${id}:`, range)
|
||||
continue
|
||||
}
|
||||
idMap.insertKnownId([index.value, index.value + size.value], id as ExprId)
|
||||
idMap.insertKnownId(range, id as ExprId)
|
||||
}
|
||||
|
||||
const keysToDelete = new Set(this.doc.metadata.keys())
|
||||
@ -435,13 +516,10 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
keysToDelete.delete(id)
|
||||
this.doc.metadata.set(id, formattedMeta)
|
||||
}
|
||||
for (const id of keysToDelete) {
|
||||
this.doc.metadata.delete(id)
|
||||
}
|
||||
for (const id of keysToDelete) this.doc.metadata.delete(id)
|
||||
this.syncedContent = content
|
||||
this.syncedVersion = version
|
||||
this.syncedMeta = metadata
|
||||
|
||||
const codeDiff = simpleDiffString(this.doc.contents.toString(), code)
|
||||
this.doc.contents.delete(codeDiff.index, codeDiff.remove)
|
||||
this.doc.contents.insert(codeDiff.index, codeDiff.insert)
|
||||
@ -453,29 +531,35 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
this.queuedAction = LsAction.Close
|
||||
switch (this.state) {
|
||||
case LsSyncState.Disposed:
|
||||
case LsSyncState.Closed:
|
||||
case LsSyncState.Closed: {
|
||||
return
|
||||
case LsSyncState.Closing:
|
||||
}
|
||||
case LsSyncState.Closing: {
|
||||
await this.lastAction
|
||||
return
|
||||
}
|
||||
case LsSyncState.Opening:
|
||||
case LsSyncState.WritingFile:
|
||||
case LsSyncState.Reloading:
|
||||
case LsSyncState.Reloading: {
|
||||
await this.lastAction
|
||||
if (this.queuedAction === LsAction.Close) {
|
||||
await this.close()
|
||||
}
|
||||
return
|
||||
}
|
||||
case LsSyncState.WriteError:
|
||||
case LsSyncState.Synchronized: {
|
||||
this.changeState(LsSyncState.Closing)
|
||||
const closing = (this.lastAction = this.ls.closeTextFile(this.path))
|
||||
await closing
|
||||
this.changeState(LsSyncState.Closed)
|
||||
this.setState(LsSyncState.Closing)
|
||||
const promise = this.ls.closeTextFile(this.path)
|
||||
const state = this.state
|
||||
this.setLastAction(promise.catch(() => this.setState(state)))
|
||||
await promise
|
||||
this.setState(LsSyncState.Closed)
|
||||
return
|
||||
}
|
||||
default: {
|
||||
const _: never = this.state
|
||||
this.state satisfies never
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -486,60 +570,83 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> {
|
||||
case LsSyncState.Opening:
|
||||
case LsSyncState.Disposed:
|
||||
case LsSyncState.Closed:
|
||||
case LsSyncState.Closing:
|
||||
case LsSyncState.Closing: {
|
||||
return
|
||||
case LsSyncState.Reloading:
|
||||
}
|
||||
case LsSyncState.Reloading: {
|
||||
await this.lastAction
|
||||
return
|
||||
case LsSyncState.WritingFile:
|
||||
}
|
||||
case LsSyncState.WritingFile: {
|
||||
await this.lastAction
|
||||
if (this.queuedAction === LsAction.Reload) {
|
||||
await this.reload()
|
||||
}
|
||||
if (this.queuedAction === LsAction.Reload) await this.reload()
|
||||
return
|
||||
case LsSyncState.Synchronized:
|
||||
case LsSyncState.WriteError: {
|
||||
this.changeState(LsSyncState.Reloading)
|
||||
const reloading = this.ls.closeTextFile(this.path).then(() => {
|
||||
return this.ls.openTextFile(this.path)
|
||||
}
|
||||
case LsSyncState.Synchronized: {
|
||||
this.withState(LsSyncState.Reloading, async () => {
|
||||
const promise = Promise.all([
|
||||
this.ls.readFile(this.path),
|
||||
this.ls.fileChecksum(this.path),
|
||||
])
|
||||
this.setLastAction(promise)
|
||||
const [contents, checksum] = await promise
|
||||
this.syncFileContents(contents.contents, checksum.checksum)
|
||||
})
|
||||
return
|
||||
}
|
||||
case LsSyncState.WriteError: {
|
||||
this.withState(LsSyncState.Reloading, async () => {
|
||||
const path = this.path.segments.join('/')
|
||||
const reloading = this.ls
|
||||
.closeTextFile(this.path)
|
||||
.catch((error) => {
|
||||
console.error('Could not close file after write error:')
|
||||
console.error(error)
|
||||
})
|
||||
.then(
|
||||
() =>
|
||||
exponentialBackoff(
|
||||
async () => {
|
||||
const result = await this.ls.openTextFile(this.path)
|
||||
if (!result.writeCapability) {
|
||||
const message = `Could not acquire write capability for module '${this.path.segments.join(
|
||||
'/',
|
||||
)}'`
|
||||
console.error(message)
|
||||
throw new Error(message)
|
||||
}
|
||||
return result
|
||||
},
|
||||
printingCallbacks(
|
||||
`opened file '${path}' for writing`,
|
||||
`open file '${path}' for writing`,
|
||||
),
|
||||
),
|
||||
(error) => {
|
||||
console.error('Could not reopen file after write error:')
|
||||
console.error(error)
|
||||
// This error is unrecoverable.
|
||||
throw error
|
||||
},
|
||||
)
|
||||
this.setLastAction(reloading)
|
||||
const result = await reloading
|
||||
this.syncFileContents(result.content, result.currentVersion)
|
||||
})
|
||||
this.lastAction = reloading.then()
|
||||
const result = await reloading
|
||||
this.syncFileContents(result.content, result.currentVersion)
|
||||
this.changeState(LsSyncState.Synchronized)
|
||||
return
|
||||
}
|
||||
default: {
|
||||
const _: never = this.state
|
||||
this.state satisfies never
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private inState(...states: LsSyncState[]): boolean {
|
||||
return states.includes(this.state)
|
||||
}
|
||||
|
||||
private changeState(state: LsSyncState) {
|
||||
if (this.state !== LsSyncState.Disposed) {
|
||||
if (DEBUG_LOG_SYNC) {
|
||||
console.log('State change:', LsSyncState[this.state], '->', LsSyncState[state])
|
||||
}
|
||||
this.state = state
|
||||
if (state === LsSyncState.Synchronized) {
|
||||
this.trySyncRemoveUpdates()
|
||||
}
|
||||
} else {
|
||||
throw new Error('LsSync disposed')
|
||||
}
|
||||
}
|
||||
|
||||
dispose(): Promise<void> {
|
||||
async dispose(): Promise<void> {
|
||||
this.cleanup()
|
||||
const alreadyClosed = this.inState(LsSyncState.Closing, LsSyncState.Closed)
|
||||
this.changeState(LsSyncState.Disposed)
|
||||
if (!alreadyClosed) {
|
||||
return this.ls.closeTextFile(this.path).then()
|
||||
}
|
||||
return Promise.resolve()
|
||||
this.setState(LsSyncState.Disposed)
|
||||
if (alreadyClosed) return Promise.resolve()
|
||||
return this.ls.closeTextFile(this.path)
|
||||
}
|
||||
}
|
||||
|
@ -32,8 +32,8 @@ type ConnectionId = YjsConnection | string
|
||||
export class WSSharedDoc {
|
||||
doc: Y.Doc
|
||||
/**
|
||||
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn
|
||||
* is closed.
|
||||
* Maps from connection id to set of controlled user ids.
|
||||
* Delete all user ids from awareness when this conn is closed.
|
||||
*/
|
||||
conns: Map<ConnectionId, Set<number>>
|
||||
awareness: Awareness
|
||||
@ -53,18 +53,15 @@ export class WSSharedDoc {
|
||||
if (conn !== null) {
|
||||
const connControlledIDs = this.conns.get(conn)
|
||||
if (connControlledIDs !== undefined) {
|
||||
added.forEach((clientID) => {
|
||||
connControlledIDs.add(clientID)
|
||||
})
|
||||
removed.forEach((clientID) => {
|
||||
connControlledIDs.delete(clientID)
|
||||
})
|
||||
for (const clientID of added) connControlledIDs.add(clientID)
|
||||
for (const clientID of removed) connControlledIDs.delete(clientID)
|
||||
}
|
||||
}
|
||||
// broadcast awareness update
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, messageAwareness)
|
||||
encoding.writeVarUint8Array(encoder, encodeAwarenessUpdate(this.awareness, changedClients))
|
||||
const update = encodeAwarenessUpdate(this.awareness, changedClients)
|
||||
encoding.writeVarUint8Array(encoder, update)
|
||||
this.broadcast(encoding.toUint8Array(encoder))
|
||||
},
|
||||
)
|
||||
@ -72,11 +69,9 @@ export class WSSharedDoc {
|
||||
}
|
||||
|
||||
broadcast(message: Uint8Array) {
|
||||
this.conns.forEach((_, conn) => {
|
||||
if (typeof conn !== 'string') {
|
||||
conn.send(message)
|
||||
}
|
||||
})
|
||||
for (const [conn] of this.conns) {
|
||||
if (typeof conn !== 'string') conn.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
updateHandler(update: Uint8Array, _origin: any) {
|
||||
@ -88,8 +83,8 @@ export class WSSharedDoc {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle servicing incoming websocket connection listening for given document updates.
|
||||
* @param ws The newly connected websocket requesting Yjs document synchronization
|
||||
* Handle servicing incoming WebSocket connection listening for given document updates.
|
||||
* @param ws The newly connected WebSocket requesting Yjs document synchronization
|
||||
* @param lsUrl Address of the language server to connect to. Each unique language server address
|
||||
* will be assigned its own `DistributedProject` instance with a unique namespace of Yjs documents.
|
||||
* @param docName The name of the document to synchronize. When the document name is `index`, the
|
||||
@ -99,20 +94,16 @@ export function setupGatewayClient(ws: WebSocket, lsUrl: string, docName: string
|
||||
const lsSession = LanguageServerSession.get(lsUrl)
|
||||
const wsDoc = lsSession.getYDoc(docName)
|
||||
if (wsDoc == null) {
|
||||
console.log(`Document ${docName} not found in language server session ${lsUrl}`)
|
||||
console.error(`Document '${docName}' not found in language server session '${lsUrl}'.`)
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
const connection = new YjsConnection(ws, wsDoc)
|
||||
|
||||
const doClose = () => connection.close()
|
||||
lsSession.once('error', doClose)
|
||||
connection.once('close', async () => {
|
||||
lsSession.off('error', doClose)
|
||||
try {
|
||||
await lsSession.release()
|
||||
} catch (err) {
|
||||
console.error('Session release failed.\n', err)
|
||||
} catch (error) {
|
||||
console.error('Session release failed.\n', error)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -129,11 +120,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> {
|
||||
ws.binaryType = 'arraybuffer'
|
||||
ws.on('message', (message: ArrayBuffer) => this.messageListener(new Uint8Array(message)))
|
||||
ws.on('close', () => this.close())
|
||||
|
||||
if (!isLoaded) {
|
||||
wsDoc.doc.load()
|
||||
}
|
||||
|
||||
if (!isLoaded) wsDoc.doc.load()
|
||||
this.initPing()
|
||||
this.sendSyncMessage()
|
||||
}
|
||||
@ -143,24 +130,21 @@ class YjsConnection extends ObservableV2<{ close(): void }> {
|
||||
let pongReceived = true
|
||||
const pingInterval = setInterval(() => {
|
||||
if (!pongReceived) {
|
||||
if (this.wsDoc.conns.has(this)) {
|
||||
this.close()
|
||||
}
|
||||
if (this.wsDoc.conns.has(this)) this.close()
|
||||
clearInterval(pingInterval)
|
||||
} else if (this.wsDoc.conns.has(this)) {
|
||||
pongReceived = false
|
||||
try {
|
||||
this.ws.ping()
|
||||
} catch (e) {
|
||||
} catch (error) {
|
||||
console.error('Error sending ping:', error)
|
||||
this.close()
|
||||
clearInterval(pingInterval)
|
||||
}
|
||||
}
|
||||
}, pingTimeout)
|
||||
this.ws.on('close', () => clearInterval(pingInterval))
|
||||
this.ws.on('pong', () => {
|
||||
pongReceived = true
|
||||
})
|
||||
this.ws.on('pong', () => (pongReceived = true))
|
||||
}
|
||||
|
||||
sendSyncMessage() {
|
||||
@ -185,11 +169,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> {
|
||||
this.close()
|
||||
}
|
||||
try {
|
||||
this.ws.send(message, (error) => {
|
||||
if (error != null) {
|
||||
this.close()
|
||||
}
|
||||
})
|
||||
this.ws.send(message, (error) => error && this.close())
|
||||
} catch (e) {
|
||||
this.close()
|
||||
}
|
||||
@ -201,10 +181,9 @@ class YjsConnection extends ObservableV2<{ close(): void }> {
|
||||
const decoder = decoding.createDecoder(message)
|
||||
const messageType = decoding.readVarUint(decoder)
|
||||
switch (messageType) {
|
||||
case messageSync:
|
||||
case messageSync: {
|
||||
encoding.writeVarUint(encoder, messageSync)
|
||||
readSyncMessage(decoder, encoder, this.wsDoc.doc, this)
|
||||
|
||||
// If the `encoder` only contains the type of reply message and no
|
||||
// message, there is no need to send the message. When `encoder` only
|
||||
// contains the type of reply, its length is 1.
|
||||
@ -212,6 +191,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> {
|
||||
this.send(encoding.toUint8Array(encoder))
|
||||
}
|
||||
break
|
||||
}
|
||||
case messageAwareness: {
|
||||
const update = decoding.readVarUint8Array(decoder)
|
||||
applyAwarenessUpdate(this.wsDoc.awareness, update, this)
|
||||
|
Loading…
Reference in New Issue
Block a user