mirror of
https://github.com/enso-org/enso.git
synced 2025-01-03 21:12:11 +03:00
cbf7248370
- Closes #8398 # Important Notes - The original error caused by a failing `text/openFile` (`openTextFile`) is still present, but (seemingly?) harder to repro now
220 lines
6.9 KiB
TypeScript
220 lines
6.9 KiB
TypeScript
import {
|
|
applyAwarenessUpdate,
|
|
Awareness,
|
|
encodeAwarenessUpdate,
|
|
removeAwarenessStates,
|
|
} from 'y-protocols/awareness'
|
|
import { readSyncMessage, writeSyncStep1, writeUpdate } from 'y-protocols/sync'
|
|
import * as Y from 'yjs'
|
|
|
|
import * as decoding from 'lib0/decoding'
|
|
import * as encoding from 'lib0/encoding'
|
|
import { ObservableV2 } from 'lib0/observable'
|
|
import { WebSocket } from 'ws'
|
|
import { LanguageServerSession } from './languageServerSession'
|
|
|
|
/**
 * Period, in milliseconds, of the keep-alive timer: on every tick a ping is sent, and a
 * connection that has not answered the previous ping by then is closed.
 */
const pingTimeout = 30 * 1000

/** Leading message-type tag of a Yjs document sync message. */
const messageSync = 0
/** Leading message-type tag of an awareness message. */
const messageAwareness = 1
|
|
|
|
/** Payload received by the awareness `update` listener: affected client ids grouped by change kind. */
interface AwarenessUpdate {
  /** Client ids reported as added. */
  added: Array<number>
  /** Client ids reported as updated. */
  updated: Array<number>
  /** Client ids reported as removed. */
  removed: Array<number>
}
|
|
|
|
/**
 * Key type of `WSSharedDoc.conns`: either a `YjsConnection` or a plain string identifier.
 * String-keyed entries are skipped when a message is broadcast.
 */
type ConnectionId = string | YjsConnection
|
|
|
|
/**
 * A Yjs document, together with its awareness state, that is served to every websocket
 * connection registered in `conns`.
 */
export class WSSharedDoc {
  doc: Y.Doc
  /**
   * For each connection id, the set of awareness client ids that connection controls.
   * When a connection closes, all of its client ids are removed from awareness.
   */
  conns: Map<ConnectionId, Set<number>>
  awareness: Awareness

  /**
   * Create the document and wire up broadcasting of awareness and document updates.
   * @param gc Forwarded to the `Y.Doc` constructor as its `gc` option.
   */
  constructor(gc = true) {
    this.doc = new Y.Doc({ gc })
    this.conns = new Map()

    this.awareness = new Awareness(this.doc)
    this.awareness.setLocalState(null)

    this.awareness.on('update', (change: AwarenessUpdate, origin: ConnectionId | null) => {
      // Track which client ids the originating connection controls, if it is a known one.
      const owned = origin === null ? undefined : this.conns.get(origin)
      if (owned !== undefined) {
        change.added.forEach((clientID) => owned.add(clientID))
        change.removed.forEach((clientID) => owned.delete(clientID))
      }
      // Relay the state of every affected client to all connections.
      const affected = [...change.added, ...change.updated, ...change.removed]
      const out = encoding.createEncoder()
      encoding.writeVarUint(out, messageAwareness)
      encoding.writeVarUint8Array(out, encodeAwarenessUpdate(this.awareness, affected))
      this.broadcast(encoding.toUint8Array(out))
    })
    this.doc.on('update', (update, origin) => this.updateHandler(update, origin))
  }

  /**
   * Send `message` to every registered connection; entries keyed by a string are skipped.
   * @param message The already-encoded message bytes.
   */
  broadcast(message: Uint8Array) {
    for (const target of this.conns.keys()) {
      if (typeof target === 'string') continue
      target.send(message)
    }
  }

  /**
   * Wrap a document update in a sync message and broadcast it.
   * @param update The encoded Yjs update.
   * @param _origin Origin of the update; unused.
   */
  updateHandler(update: Uint8Array, _origin: any) {
    const out = encoding.createEncoder()
    encoding.writeVarUint(out, messageSync)
    writeUpdate(out, update)
    this.broadcast(encoding.toUint8Array(out))
  }
}
|
|
|
|
/**
 * Serve a newly accepted WebSocket that wants to synchronize one Yjs document.
 *
 * If the document is unknown to the session, an error is logged and the socket is closed.
 * Otherwise a `YjsConnection` is created and the session is released once that connection
 * reports `close`.
 *
 * NOTE(review): the not-found path does not call `release()` on the session obtained from
 * `LanguageServerSession.get` — confirm whether `get` retains a reference that should be
 * released there.
 *
 * @param ws The newly connected WebSocket requesting Yjs document synchronization.
 * @param lsUrl Address of the language server to connect to. Each unique language server address
 * will be assigned its own `DistributedProject` instance with a unique namespace of Yjs documents.
 * @param docName The name of the document to synchronize. When the document name is `index`, the
 * document is considered to be the root document of the `DistributedProject` data model.
 */
export function setupGatewayClient(ws: WebSocket, lsUrl: string, docName: string) {
  const session = LanguageServerSession.get(lsUrl)
  const sharedDoc = session.getYDoc(docName)
  if (sharedDoc != null) {
    // Release failures are logged rather than propagated out of the event listener.
    const releaseSession = async () => {
      try {
        await session.release()
      } catch (error) {
        console.error('Session release failed.\n', error)
      }
    }
    new YjsConnection(ws, sharedDoc).once('close', releaseSession)
    return
  }
  console.error(`Document '${docName}' not found in language server session '${lsUrl}'.`)
  ws.close()
}
|
|
|
|
/**
 * One websocket client attached to a `WSSharedDoc`.
 *
 * Registers itself in `wsDoc.conns`, relays sync and awareness messages between the socket
 * and the shared document, keeps the socket alive with pings, and emits `close` when it is
 * closed.
 */
class YjsConnection extends ObservableV2<{ close(): void }> {
  ws: WebSocket
  wsDoc: WSSharedDoc

  /**
   * Attach `ws` to `wsDoc`, start the keep-alive pings and send the initial sync messages.
   * @param ws The client socket.
   * @param wsDoc The shared document this client synchronizes with.
   */
  constructor(ws: WebSocket, wsDoc: WSSharedDoc) {
    super()
    this.ws = ws
    this.wsDoc = wsDoc
    // Sampled before registering: only the first connection triggers `doc.load()`.
    const isLoaded = wsDoc.conns.size > 0
    wsDoc.conns.set(this, new Set())
    ws.binaryType = 'arraybuffer'
    ws.on('message', (message: ArrayBuffer) => this.messageListener(new Uint8Array(message)))
    ws.on('close', () => this.close())
    if (!isLoaded) wsDoc.doc.load()
    this.initPing()
    this.sendSyncMessage()
  }

  /**
   * Start a timer that pings the socket every `pingTimeout` milliseconds and closes the
   * connection when the previous ping was not answered by a pong.
   */
  private initPing() {
    // Check if connection is still alive
    let pongReceived = true
    const pingInterval = setInterval(() => {
      if (!pongReceived) {
        if (this.wsDoc.conns.has(this)) this.close()
        clearInterval(pingInterval)
      } else if (this.wsDoc.conns.has(this)) {
        pongReceived = false
        try {
          this.ws.ping()
        } catch (error) {
          console.error('Error sending ping:', error)
          this.close()
          clearInterval(pingInterval)
        }
      }
    }, pingTimeout)
    this.ws.on('close', () => clearInterval(pingInterval))
    this.ws.on('pong', () => (pongReceived = true))
  }

  /**
   * Send sync step 1 for the shared document, followed by the current awareness states
   * when there are any.
   */
  sendSyncMessage() {
    const encoder = encoding.createEncoder()
    encoding.writeVarUint(encoder, messageSync)
    writeSyncStep1(encoder, this.wsDoc.doc)
    this.send(encoding.toUint8Array(encoder))
    const awarenessStates = this.wsDoc.awareness.getStates()
    if (awarenessStates.size > 0) {
      const encoder = encoding.createEncoder()
      encoding.writeVarUint(encoder, messageAwareness)
      encoding.writeVarUint8Array(
        encoder,
        encodeAwarenessUpdate(this.wsDoc.awareness, Array.from(awarenessStates.keys())),
      )
      this.send(encoding.toUint8Array(encoder))
    }
  }

  /**
   * Send `message` over the socket, closing the connection when the socket is neither
   * connecting nor open, or when sending fails.
   * @param message The already-encoded message bytes.
   */
  send(message: Uint8Array) {
    if (this.ws.readyState !== WebSocket.CONNECTING && this.ws.readyState !== WebSocket.OPEN) {
      this.close()
    }
    // There is intentionally no early return above: the send is still attempted, and its
    // failure paths below call `close()` again.
    try {
      this.ws.send(message, (error) => error && this.close())
    } catch (e) {
      this.close()
    }
  }

  /**
   * Decode one incoming message and apply it to the shared document or its awareness.
   * Messages with an unknown type tag are ignored. Any error is logged and re-emitted as an
   * `error` event on the document.
   * @param message The raw message bytes received from the socket.
   */
  messageListener(message: Uint8Array) {
    try {
      const encoder = encoding.createEncoder()
      const decoder = decoding.createDecoder(message)
      const messageType = decoding.readVarUint(decoder)
      switch (messageType) {
        case messageSync: {
          encoding.writeVarUint(encoder, messageSync)
          readSyncMessage(decoder, encoder, this.wsDoc.doc, this)
          // If the `encoder` only contains the type of reply message and no
          // message, there is no need to send the message. When `encoder` only
          // contains the type of reply, its length is 1.
          if (encoding.length(encoder) > 1) {
            this.send(encoding.toUint8Array(encoder))
          }
          break
        }
        case messageAwareness: {
          const update = decoding.readVarUint8Array(decoder)
          applyAwarenessUpdate(this.wsDoc.awareness, update, this)
          break
        }
      }
    } catch (err) {
      console.error(err)
      this.wsDoc.doc.emit('error', [err])
    }
  }

  /**
   * Detach this connection from the shared document and close the socket.
   *
   * This method can run more than once for the same connection: the constructor wires the
   * socket's `close` event to it, and it is also called directly on ping timeout and on send
   * failure. The `close` event is emitted on every call, so listeners should subscribe with
   * `once`. The document's `unload` event is emitted only by the call that actually removes
   * this connection from `conns` and leaves the map empty; repeated calls no longer re-emit it.
   */
  close() {
    const controlledIds = this.wsDoc.conns.get(this)
    // `Map.delete` returns true only when the entry existed, i.e. on the first effective call.
    const wasRegistered = this.wsDoc.conns.delete(this)
    if (controlledIds != null) {
      removeAwarenessStates(this.wsDoc.awareness, Array.from(controlledIds), null)
    }
    this.ws.close()
    this.emit('close', [])
    if (wasRegistered && this.wsDoc.conns.size === 0) {
      this.wsDoc.doc.emit('unload', [])
    }
  }
}
|