mirror of
https://github.com/hcengineering/platform.git
synced 2024-12-22 19:11:33 +03:00
UBERF-7260 Handle storage errors in collaborator (#5806)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
7b466b8410
commit
ce4070412f
@ -17,5 +17,5 @@ export * from './history/branch'
|
|||||||
export * from './history/history'
|
export * from './history/history'
|
||||||
export * from './history/snapshot'
|
export * from './history/snapshot'
|
||||||
export * from './utils/collaborative-doc'
|
export * from './utils/collaborative-doc'
|
||||||
export * from './utils/minio'
|
export * from './utils/storage'
|
||||||
export * from './utils/ydoc'
|
export * from './utils/ydoc'
|
||||||
|
@ -28,7 +28,7 @@ import { StorageAdapter } from '@hcengineering/server-core'
|
|||||||
import { yDocBranch } from '../history/branch'
|
import { yDocBranch } from '../history/branch'
|
||||||
import { YDocVersion } from '../history/history'
|
import { YDocVersion } from '../history/history'
|
||||||
import { createYdocSnapshot, restoreYdocSnapshot } from '../history/snapshot'
|
import { createYdocSnapshot, restoreYdocSnapshot } from '../history/snapshot'
|
||||||
import { yDocFromStorage, yDocToStorage } from './minio'
|
import { yDocFromStorage, yDocToStorage } from './storage'
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
export function collaborativeHistoryDocId (id: string): string {
|
export function collaborativeHistoryDocId (id: string): string {
|
||||||
|
@ -27,19 +27,18 @@ export async function yDocFromStorage (
|
|||||||
minioDocumentId: string,
|
minioDocumentId: string,
|
||||||
ydoc?: YDoc
|
ydoc?: YDoc
|
||||||
): Promise<YDoc | undefined> {
|
): Promise<YDoc | undefined> {
|
||||||
|
// stat the object to ensure it exists, because read will throw an error in this case
|
||||||
|
const blob = await storageAdapter.stat(ctx, workspace, minioDocumentId)
|
||||||
|
if (blob === undefined) {
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
|
||||||
// no need to apply gc because we load existing document
|
// no need to apply gc because we load existing document
|
||||||
// it is either already gc-ed, or gc not needed and it is disabled
|
// it is either already gc-ed, or gc not needed and it is disabled
|
||||||
ydoc ??= new YDoc({ gc: false })
|
ydoc ??= new YDoc({ gc: false })
|
||||||
|
|
||||||
try {
|
const buffer = await storageAdapter.read(ctx, workspace, minioDocumentId)
|
||||||
const buffer = await storageAdapter.read(ctx, workspace, minioDocumentId)
|
return yDocFromBuffer(Buffer.concat(buffer), ydoc)
|
||||||
return yDocFromBuffer(Buffer.concat(buffer), ydoc)
|
|
||||||
} catch (err: any) {
|
|
||||||
if (err?.code === 'NoSuchKey' || err?.code === 'NotFound') {
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
throw err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
@ -107,8 +107,8 @@ export class StorageExtension implements Extension {
|
|||||||
return await adapter.loadDocument(ctx, documentId, context)
|
return await adapter.loadDocument(ctx, documentId, context)
|
||||||
})
|
})
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
ctx.error('failed to load document content', { documentId, error: err })
|
ctx.error('failed to load document', { documentId, error: err })
|
||||||
return undefined
|
throw new Error('Failed to load document')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,8 +120,8 @@ export class StorageExtension implements Extension {
|
|||||||
await adapter.saveDocument(ctx, documentId, document, context)
|
await adapter.saveDocument(ctx, documentId, document, context)
|
||||||
})
|
})
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
ctx.error('failed to save document content', { documentId, error: err })
|
ctx.error('failed to save document', { documentId, error: err })
|
||||||
return undefined
|
throw new Error('Failed to save document')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,64 +52,63 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
|
|||||||
) {}
|
) {}
|
||||||
|
|
||||||
async loadDocument (ctx: MeasureContext, documentId: DocumentId, context: Context): Promise<YDoc | undefined> {
|
async loadDocument (ctx: MeasureContext, documentId: DocumentId, context: Context): Promise<YDoc | undefined> {
|
||||||
|
// try to load document content
|
||||||
try {
|
try {
|
||||||
// try to load document content
|
ctx.info('load document content', { documentId })
|
||||||
|
const ydoc = await this.loadDocumentFromStorage(ctx, documentId, context)
|
||||||
|
|
||||||
|
if (ydoc !== undefined) {
|
||||||
|
return ydoc
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
ctx.error('failed to load document content', { documentId, error: err })
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
|
||||||
|
// then try to load from inital content
|
||||||
|
const { initialContentId } = context
|
||||||
|
if (initialContentId !== undefined && initialContentId.length > 0) {
|
||||||
try {
|
try {
|
||||||
ctx.info('load document content', { documentId })
|
ctx.info('load document initial content', { documentId, initialContentId })
|
||||||
const ydoc = await this.loadDocumentFromStorage(ctx, documentId, context)
|
const ydoc = await this.loadDocumentFromStorage(ctx, initialContentId, context)
|
||||||
|
|
||||||
if (ydoc !== undefined) {
|
|
||||||
return ydoc
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
ctx.error('failed to load document content', { documentId, error: err })
|
|
||||||
}
|
|
||||||
|
|
||||||
// then try to load from inital content
|
|
||||||
const { initialContentId } = context
|
|
||||||
if (initialContentId !== undefined && initialContentId.length > 0) {
|
|
||||||
try {
|
|
||||||
ctx.info('load document initial content', { documentId, initialContentId })
|
|
||||||
const ydoc = await this.loadDocumentFromStorage(ctx, initialContentId, context)
|
|
||||||
|
|
||||||
// if document was loaded from the initial content or storage we need to save
|
|
||||||
// it to ensure the next time we load it from the ydoc document
|
|
||||||
if (ydoc !== undefined) {
|
|
||||||
ctx.info('save document content', { documentId, initialContentId })
|
|
||||||
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
|
|
||||||
return ydoc
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
ctx.error('failed to load initial document content', { documentId, initialContentId, error: err })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// finally try to load from the platform
|
|
||||||
const { platformDocumentId } = context
|
|
||||||
if (platformDocumentId !== undefined) {
|
|
||||||
ctx.info('load document platform content', { documentId, platformDocumentId })
|
|
||||||
const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => {
|
|
||||||
try {
|
|
||||||
return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
|
|
||||||
} catch (err) {
|
|
||||||
ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err })
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// if document was loaded from the initial content or storage we need to save
|
// if document was loaded from the initial content or storage we need to save
|
||||||
// it to ensure the next time we load it from the ydoc document
|
// it to ensure the next time we load it from the ydoc document
|
||||||
if (ydoc !== undefined) {
|
if (ydoc !== undefined) {
|
||||||
ctx.info('save document content', { documentId, platformDocumentId })
|
ctx.info('save document content', { documentId, initialContentId })
|
||||||
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
|
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
|
||||||
return ydoc
|
return ydoc
|
||||||
}
|
}
|
||||||
|
} catch (err) {
|
||||||
|
ctx.error('failed to load initial document content', { documentId, initialContentId, error: err })
|
||||||
|
throw err
|
||||||
}
|
}
|
||||||
|
|
||||||
// nothing found
|
|
||||||
return undefined
|
|
||||||
} catch (err) {
|
|
||||||
ctx.error('failed to load document', { documentId, error: err })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// finally try to load from the platform
|
||||||
|
const { platformDocumentId } = context
|
||||||
|
if (platformDocumentId !== undefined) {
|
||||||
|
ctx.info('load document platform content', { documentId, platformDocumentId })
|
||||||
|
const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => {
|
||||||
|
try {
|
||||||
|
return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
|
||||||
|
} catch (err) {
|
||||||
|
ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err })
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// if document was loaded from the initial content or storage we need to save
|
||||||
|
// it to ensure the next time we load it from the ydoc document
|
||||||
|
if (ydoc !== undefined) {
|
||||||
|
ctx.info('save document content', { documentId, platformDocumentId })
|
||||||
|
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
|
||||||
|
return ydoc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// nothing found
|
||||||
|
return undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
|
async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
|
||||||
@ -133,6 +132,9 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
|
|||||||
await this.saveDocumentToStorage(ctx, documentId, document, context)
|
await this.saveDocumentToStorage(ctx, documentId, document, context)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
ctx.error('failed to save document', { documentId, error: err })
|
ctx.error('failed to save document', { documentId, error: err })
|
||||||
|
// raise an error if failed to save document to storage
|
||||||
|
// this will prevent document from being unloaded from memory
|
||||||
|
throw err
|
||||||
}
|
}
|
||||||
|
|
||||||
const { platformDocumentId } = context
|
const { platformDocumentId } = context
|
||||||
@ -166,12 +168,9 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
|
|||||||
const adapter = this.getStorageAdapter(storage)
|
const adapter = this.getStorageAdapter(storage)
|
||||||
|
|
||||||
return await ctx.with('load-document', { storage }, async (ctx) => {
|
return await ctx.with('load-document', { storage }, async (ctx) => {
|
||||||
try {
|
return await withRetry(ctx, 5, async () => {
|
||||||
return await loadCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, ctx)
|
return await loadCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, ctx)
|
||||||
} catch (err) {
|
})
|
||||||
ctx.error('failed to load storage document', { documentId, collaborativeDoc, error: err })
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,7 +184,9 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
|
|||||||
const adapter = this.getStorageAdapter(storage)
|
const adapter = this.getStorageAdapter(storage)
|
||||||
|
|
||||||
await ctx.with('save-document', {}, async (ctx) => {
|
await ctx.with('save-document', {}, async (ctx) => {
|
||||||
await saveCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, document, ctx)
|
await withRetry(ctx, 5, async () => {
|
||||||
|
await saveCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, document, ctx)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -291,3 +292,25 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function withRetry<T> (
|
||||||
|
ctx: MeasureContext,
|
||||||
|
retries: number,
|
||||||
|
op: () => Promise<T>,
|
||||||
|
delay: number = 100
|
||||||
|
): Promise<T> {
|
||||||
|
let error: any
|
||||||
|
while (retries > 0) {
|
||||||
|
retries--
|
||||||
|
try {
|
||||||
|
return await op()
|
||||||
|
} catch (err: any) {
|
||||||
|
error = err
|
||||||
|
ctx.error('error', err)
|
||||||
|
if (retries !== 0) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user