diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000000..09ec455e77 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,114 @@ +name: Build Pathfinder Self-hosted + +on: + workflow_dispatch: + +# Cancels all previous workflow runs for pull requests that have not completed. +# See https://docs.github.com/en/actions/using-jobs/using-concurrency +concurrency: + # The concurrency group contains the workflow name and the branch name for + # pull requests or the commit hash for any other events. + group: ${{ github.workflow }}-${{ github.event_name == 'pull_request' && github.head_ref || github.sha }} + cancel-in-progress: true + +jobs: + build-self-hosted: + name: Build Community + if: github.ref == 'refs/heads/master' + runs-on: self-hosted + environment: development + + steps: + - uses: actions/checkout@v2 + - uses: pnpm/action-setup@v2 + with: + version: 'latest' + + - name: Use Node.js + uses: actions/setup-node@v2 + with: + node-version: 18.x + cache: 'pnpm' + + - name: Restore cache + uses: actions/cache@v3 + with: + path: | + .next/cache + # Generate a new cache whenever packages or source files change. + key: ${{ runner.os }}-nextjs-${{ hashFiles('**/pnpm-lock.yaml') }}-${{ hashFiles('**.[jt]s', '**.[jt]sx') }} + # If source files changed but packages didn't, rebuild from a prior cache. + restore-keys: | + ${{ runner.os }}-nextjs-${{ hashFiles('**/pnpm-lock.yaml') }}- + + - name: Install dependencies + run: pnpm install + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_GITHUB_AUTH_TOKEN }} + + - name: Build + run: pnpm build + env: + NEXT_PUBLIC_FIREBASE_API_KEY: ${{ secrets.NEXT_PUBLIC_FIREBASE_API_KEY }} + NEXT_PUBLIC_FIREBASE_AUTH_DOMAIN: ${{ secrets.NEXT_PUBLIC_FIREBASE_AUTH_DOMAIN }} + NEXT_PUBLIC_FIREBASE_PROJECT_ID: ${{ secrets.NEXT_PUBLIC_FIREBASE_PROJECT_ID }} + NEXT_PUBLIC_FIREBASE_STORAGE_BUCKET: ${{ secrets.NEXT_PUBLIC_FIREBASE_STORAGE_BUCKET }} + NEXT_PUBLIC_FIREBASE_MESSAGING_SENDER_ID: ${{ secrets.NEXT_PUBLIC_FIREBASE_MESSAGING_SENDER_ID }} + NEXT_PUBLIC_FIREBASE_APP_ID: ${{ secrets.NEXT_PUBLIC_FIREBASE_APP_ID }} + NEXT_PUBLIC_FIREBASE_MEASUREMENT_ID: ${{ secrets.NEXT_PUBLIC_FIREBASE_MEASUREMENT_ID }} + + - name: Export + run: pnpm export + + - name: Upload artifact + uses: actions/upload-artifact@v3 + with: + path: ./packages/app/out + + publish-self-hosted: + name: Push Community Image + if: github.ref == 'refs/heads/master' + runs-on: ubuntu-latest + needs: build-self-hosted + + permissions: + contents: read + packages: write + + env: + REGISTRY: ghcr.io + IMAGE_NAME: 'toeverything/affine-static' + IMAGE_TAG_LATEST: abbey-wood + + steps: + - name: Check out the repo + uses: actions/checkout@v2 + + - name: Download artifact + uses: actions/download-artifact@v3 + with: + name: artifact + path: packages/app/out/ + + - name: Log in to Docker Hub + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: ${{ env.IMAGE_TAG_LATEST }} + + - name: Build Docker image + uses: docker/build-push-action@v3 + with: + context: . + push: true + file: ./.github/deployment/Dockerfile + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/packages/data-center/src/datacenter.ts b/packages/data-center/src/datacenter.ts index 3e31e9f1d8..59f72b6afc 100644 --- a/packages/data-center/src/datacenter.ts +++ b/packages/data-center/src/datacenter.ts @@ -4,8 +4,13 @@ import { Workspace, Signal } from '@blocksuite/store'; import { getLogger } from './index.js'; import { getApis, Apis } from './apis/index.js'; -import { AffineProvider, BaseProvider } from './provider/index.js'; -import { LocalProvider } from './provider/index.js'; +import { + AffineProvider, + BaseProvider, + LocalProvider, + SelfHostedProvider, +} from './provider/index.js'; + import { getKVConfigure } from './store.js'; // load workspace's config @@ -43,6 +48,7 @@ export class DataCenter { const dc = new DataCenter(debug); dc.addProvider(AffineProvider); dc.addProvider(LocalProvider); + dc.addProvider(SelfHostedProvider); return dc; } diff --git a/packages/data-center/src/provider/index.ts b/packages/data-center/src/provider/index.ts index 5718b7eef0..dc4667af7c 100644 --- a/packages/data-center/src/provider/index.ts +++ b/packages/data-center/src/provider/index.ts @@ -20,3 +20,4 @@ export type { Apis, ConfigStore, DataCenterSignals, Workspace }; export type { BaseProvider } from './base.js'; export { AffineProvider } from './affine/index.js'; export { LocalProvider } from './local/index.js'; +export { SelfHostedProvider } from './selfhosted/index.js'; diff --git a/packages/data-center/src/provider/selfhosted/index.ts b/packages/data-center/src/provider/selfhosted/index.ts new file mode 100644 index 0000000000..48632480ef --- /dev/null +++ b/packages/data-center/src/provider/selfhosted/index.ts @@ -0,0 +1,63 @@ +import assert from 'assert'; + +import { LocalProvider } from '../local/index.js'; +import { WebsocketProvider } from './sync.js'; + +export class SelfHostedProvider extends LocalProvider { + static id = 'selfhosted'; + private _ws?: WebsocketProvider; + + constructor() { + super(); + } + + async destroy() { + this._ws?.disconnect(); + } + + async initData() { + const databases = await indexedDB.databases(); + await super.initData( + // set locally to true if exists a same name db + databases + .map(db => db.name) + .filter(v => v) + .includes(this._workspace.room) + ); + + const workspace = this._workspace; + const doc = workspace.doc; + + if (workspace.room) { + try { + // Wait for ws synchronization to complete, otherwise the data will be modified in reverse, which can be optimized later + this._ws = new WebsocketProvider(this.host, workspace.room, doc); + await new Promise((resolve, reject) => { + // TODO: synced will also be triggered on reconnection after losing sync + // There needs to be an event mechanism to emit the synchronization state to the upper layer + assert(this._ws); + this._ws.once('synced', () => resolve()); + this._ws.once('lost-connection', () => resolve()); + this._ws.once('connection-error', () => reject()); + }); + this._signals.listAdd.emit({ + workspace: workspace.room, + provider: this.id, + locally: true, + }); + } catch (e) { + this._logger('Failed to init cloud workspace', e); + } + } + + // if after update, the space:meta is empty + // then we need to get map with doc + // just a workaround for yjs + doc.getMap('space:meta'); + } + + private get host() { + const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + return `${protocol}//${location.host}/collaboration/`; + } +} diff --git a/packages/data-center/src/provider/selfhosted/sync.js b/packages/data-center/src/provider/selfhosted/sync.js new file mode 100644 index 0000000000..54c8dec628 --- /dev/null +++ b/packages/data-center/src/provider/selfhosted/sync.js @@ -0,0 +1,508 @@ +/* eslint-disable no-undef */ +/** + * @module provider/websocket + */ + +/* eslint-env browser */ + +// import * as Y from 'yjs'; // eslint-disable-line +import * as bc from 'lib0/broadcastchannel'; +import * as time from 'lib0/time'; +import * as encoding from 'lib0/encoding'; +import * as decoding from 'lib0/decoding'; +import * as syncProtocol from 'y-protocols/sync'; +import * as authProtocol from 'y-protocols/auth'; +import * as awarenessProtocol from 'y-protocols/awareness'; +import { Observable } from 'lib0/observable'; +import * as math from 'lib0/math'; +import * as url from 'lib0/url'; + +export const messageSync = 0; +export const messageQueryAwareness = 3; +export const messageAwareness = 1; +export const messageAuth = 2; + +/** + * encoder, decoder, provider, emitSynced, messageType + * @type {Array} + */ +const messageHandlers = []; + +messageHandlers[messageSync] = ( + encoder, + decoder, + provider, + emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageSync); + const syncMessageType = syncProtocol.readSyncMessage( + decoder, + encoder, + provider.doc, + provider + ); + if ( + emitSynced && + syncMessageType === syncProtocol.messageYjsSyncStep2 && + !provider.synced + ) { + provider.synced = true; + } +}; + +messageHandlers[messageQueryAwareness] = ( + encoder, + _decoder, + provider, + _emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + provider.awareness, + Array.from(provider.awareness.getStates().keys()) + ) + ); +}; + +messageHandlers[messageAwareness] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType +) => { + awarenessProtocol.applyAwarenessUpdate( + provider.awareness, + decoding.readVarUint8Array(decoder), + provider + ); +}; + +messageHandlers[messageAuth] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType +) => { + authProtocol.readAuthMessage(decoder, provider.doc, (_ydoc, reason) => + permissionDeniedHandler(provider, reason) + ); +}; + +// @todo - this should depend on awareness.outdatedTime +const messageReconnectTimeout = 30000; + +/** + * @param {WebsocketProvider} provider + * @param {string} reason + */ +const permissionDeniedHandler = (provider, reason) => + console.warn(`Permission denied to access ${provider.url}.\n${reason}`); + +/** + * @param {WebsocketProvider} provider + * @param {Uint8Array} buf + * @param {boolean} emitSynced + * @return {encoding.Encoder} + */ +const readMessage = (provider, buf, emitSynced) => { + const decoder = decoding.createDecoder(buf); + const encoder = encoding.createEncoder(); + const messageType = decoding.readVarUint(decoder); + const messageHandler = provider.messageHandlers[messageType]; + if (/** @type {any} */ (messageHandler)) { + messageHandler(encoder, decoder, provider, emitSynced, messageType); + } else { + console.error('Unable to compute message'); + } + return encoder; +}; + +/** + * @param {WebsocketProvider} provider + */ +const setupWS = provider => { + if (provider.shouldConnect && provider.ws === null) { + const websocket = new provider._WS(provider.url, 'AFFiNE'); + websocket.binaryType = 'arraybuffer'; + provider.ws = websocket; + provider.wsconnecting = true; + provider.wsconnected = false; + provider.synced = false; + + websocket.onmessage = event => { + provider.wsLastMessageReceived = time.getUnixTime(); + const encoder = readMessage(provider, new Uint8Array(event.data), true); + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)); + } + }; + websocket.onerror = event => { + provider.emit('connection-error', [event, provider]); + }; + websocket.onclose = event => { + provider.emit('connection-close', [event, provider]); + provider.ws = null; + provider.wsconnecting = false; + if (provider.wsconnected) { + provider.wsconnected = false; + provider.synced = false; + // update awareness (all users except local left) + awarenessProtocol.removeAwarenessStates( + provider.awareness, + Array.from(provider.awareness.getStates().keys()).filter( + client => client !== provider.doc.clientID + ), + provider + ); + provider.emit('status', [ + { + status: 'disconnected', + }, + ]); + } else { + provider.wsUnsuccessfulReconnects++; + } + // Start with no reconnect timeout and increase timeout by + // using exponential backoff starting with 100ms + setTimeout( + setupWS, + math.min( + math.pow(2, provider.wsUnsuccessfulReconnects) * 100, + provider.maxBackoffTime + ), + provider + ); + }; + websocket.onopen = () => { + provider.wsLastMessageReceived = time.getUnixTime(); + provider.wsconnecting = false; + provider.wsconnected = true; + provider.wsUnsuccessfulReconnects = 0; + provider.emit('status', [ + { + status: 'connected', + }, + ]); + // always send sync step 1 when connected + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeSyncStep1(encoder, provider.doc); + websocket.send(encoding.toUint8Array(encoder)); + // broadcast local awareness state + if (provider.awareness.getLocalState() !== null) { + const encoderAwarenessState = encoding.createEncoder(); + encoding.writeVarUint(encoderAwarenessState, messageAwareness); + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [ + provider.doc.clientID, + ]) + ); + websocket.send(encoding.toUint8Array(encoderAwarenessState)); + } + }; + + provider.emit('status', [ + { + status: 'connecting', + }, + ]); + } +}; + +/** + * @param {WebsocketProvider} provider + * @param {ArrayBuffer} buf + */ +const broadcastMessage = (provider, buf) => { + if (provider.wsconnected) { + /** @type {WebSocket} */ (provider.ws).send(buf); + } + if (provider.bcconnected) { + bc.publish(provider.bcChannel, buf, provider); + } +}; + +/** + * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. + * The document name is attached to the provided url. I.e. the following example + * creates a websocket connection to http://localhost:1234/my-document-name + * + * @example + * import * as Y from 'yjs' + * import { WebsocketProvider } from 'y-websocket' + * const doc = new Y.Doc() + * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) + * + * @extends {Observable} + */ +export class WebsocketProvider extends Observable { + /** + * @param {string} serverUrl + * @param {string} roomname + * @param {Y.Doc} doc + * @param {object} [opts] + * @param {boolean} [opts.connect] + * @param {awarenessProtocol.Awareness} [opts.awareness] + * @param {Object} [opts.params] + * @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill + * @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds + * @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff) + * @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication + */ + constructor( + serverUrl, + roomname, + doc, + { + connect = true, + awareness = new awarenessProtocol.Awareness(doc), + params = {}, + WebSocketPolyfill = WebSocket, + resyncInterval = -1, + maxBackoffTime = 2500, + disableBc = false, + } = {} + ) { + super(); + // ensure that url is always ends with / + while (serverUrl[serverUrl.length - 1] === '/') { + serverUrl = serverUrl.slice(0, serverUrl.length - 1); + } + const encodedParams = url.encodeQueryParams(params); + this.maxBackoffTime = maxBackoffTime; + this.bcChannel = serverUrl + '/' + roomname; + this.url = + serverUrl + + '/' + + roomname + + (encodedParams.length === 0 ? '' : '?' + encodedParams); + this.roomname = roomname; + this.doc = doc; + this._WS = WebSocketPolyfill; + this.awareness = awareness; + this.wsconnected = false; + this.wsconnecting = false; + this.bcconnected = false; + this.disableBc = disableBc; + this.wsUnsuccessfulReconnects = 0; + this.messageHandlers = messageHandlers.slice(); + /** + * @type {boolean} + */ + this._synced = false; + /** + * @type {WebSocket?} + */ + this.ws = null; + this.wsLastMessageReceived = 0; + /** + * Whether to connect to other peers or not + * @type {boolean} + */ + this.shouldConnect = connect; + + /** + * @type {number} + */ + this._resyncInterval = 0; + if (resyncInterval > 0) { + this._resyncInterval = /** @type {any} */ ( + setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + // resend sync step 1 + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeSyncStep1(encoder, doc); + this.ws.send(encoding.toUint8Array(encoder)); + } + }, resyncInterval) + ); + } + + /** + * @param {ArrayBuffer} data + * @param {any} origin + */ + this._bcSubscriber = (data, origin) => { + if (origin !== this) { + const encoder = readMessage(this, new Uint8Array(data), false); + if (encoding.length(encoder) > 1) { + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this); + } + } + }; + /** + * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + * @param {Uint8Array} update + * @param {any} origin + */ + this._updateHandler = (update, origin) => { + if (origin !== this) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + broadcastMessage(this, encoding.toUint8Array(encoder)); + } + }; + this.doc.on('update', this._updateHandler); + /** + * @param {any} changed + * @param {any} _origin + */ + this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { + const changedClients = added.concat(updated).concat(removed); + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) + ); + broadcastMessage(this, encoding.toUint8Array(encoder)); + }; + this._unloadHandler = () => { + awarenessProtocol.removeAwarenessStates( + this.awareness, + [doc.clientID], + 'window unload' + ); + }; + if (typeof window !== 'undefined') { + window.addEventListener('unload', this._unloadHandler); + } else if (typeof process !== 'undefined') { + process.on('exit', this._unloadHandler); + } + awareness.on('update', this._awarenessUpdateHandler); + this._checkInterval = /** @type {any} */ ( + setInterval(() => { + if ( + this.wsconnected && + messageReconnectTimeout < + time.getUnixTime() - this.wsLastMessageReceived + ) { + // no message received in a long time - not even your own awareness + // updates (which are updated every 15 seconds) + /** @type {WebSocket} */ (this.ws).close(); + } + }, messageReconnectTimeout / 10) + ); + if (connect) { + this.connect(); + } + } + + /** + * @type {boolean} + */ + get synced() { + return this._synced; + } + + set synced(state) { + if (this._synced !== state) { + this._synced = state; + this.emit('synced', [state]); + this.emit('sync', [state]); + } + } + + destroy() { + if (this._resyncInterval !== 0) { + clearInterval(this._resyncInterval); + } + clearInterval(this._checkInterval); + this.disconnect(); + if (typeof window !== 'undefined') { + window.removeEventListener('unload', this._unloadHandler); + } else if (typeof process !== 'undefined') { + process.off('exit', this._unloadHandler); + } + this.awareness.off('update', this._awarenessUpdateHandler); + this.doc.off('update', this._updateHandler); + super.destroy(); + } + + connectBc() { + if (this.disableBc) { + return; + } + if (!this.bcconnected) { + bc.subscribe(this.bcChannel, this._bcSubscriber); + this.bcconnected = true; + } + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder(); + encoding.writeVarUint(encoderSync, messageSync); + syncProtocol.writeSyncStep1(encoderSync, this.doc); + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this); + // broadcast local state + const encoderState = encoding.createEncoder(); + encoding.writeVarUint(encoderState, messageSync); + syncProtocol.writeSyncStep2(encoderState, this.doc); + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this); + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder(); + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness); + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessQuery), + this + ); + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder(); + encoding.writeVarUint(encoderAwarenessState, messageAwareness); + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ]) + ); + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessState), + this + ); + } + + disconnectBc() { + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.awareness, + [this.doc.clientID], + new Map() + ) + ); + broadcastMessage(this, encoding.toUint8Array(encoder)); + if (this.bcconnected) { + bc.unsubscribe(this.bcChannel, this._bcSubscriber); + this.bcconnected = false; + } + } + + disconnect() { + this.shouldConnect = false; + this.disconnectBc(); + if (this.ws !== null) { + this.ws.close(); + } + } + + connect() { + this.shouldConnect = true; + if (!this.wsconnected && this.ws === null) { + setupWS(this); + this.connectBc(); + } + } +}