Read only mongo adapter for elastic rebuild (#671)

Signed-off-by: Denis Bykhov <80476319+BykhovDenis@users.noreply.github.com>
This commit is contained in:
Denis Bykhov 2021-12-20 15:05:50 +06:00 committed by GitHub
parent 74e09c19a8
commit 9a8d9f8804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,14 +14,42 @@
// limitations under the License.
//
import core, { Account, Doc, DOMAIN_TX, generateId, Ref, ServerStorage, Tx } from '@anticrm/core'
import core, {
Account,
Class,
Doc,
FindOptions,
DocumentQuery,
DOMAIN_TX,
FindResult,
generateId,
Hierarchy,
ModelDb,
Ref,
ServerStorage,
Tx,
TxCreateDoc,
TxMixin,
TxProcessor,
TxPutBag,
TxRemoveDoc,
TxResult,
TxUpdateDoc
} from '@anticrm/core'
import { Client as ElasticClient } from '@elastic/elasticsearch'
import { Db, MongoClient } from 'mongodb'
import { Client } from 'minio'
import { createElasticAdapter } from '@anticrm/elastic'
import { createServerStorage, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import {
createServerStorage,
DbAdapter,
DbConfiguration,
FullTextAdapter,
IndexedDoc,
TxAdapter
} from '@anticrm/server-core'
import { DOMAIN_ATTACHMENT } from '@anticrm/model-attachment'
import { createInMemoryAdapter, createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { createMongoAdapter, createMongoTxAdapter } from '@anticrm/mongo'
import { serverChunterId } from '@anticrm/server-chunter'
import { serverRecruitId } from '@anticrm/server-recruit'
import { addLocation } from '@anticrm/platform'
@ -83,7 +111,6 @@ async function restoreElastic (mongoUrl: string, dbName: string, minio: Client,
}
if (await minio.bucketExists(dbName)) {
const minioObjects = await listMinioObjects(minio, dbName)
for (const d of minioObjects) {
await indexAttachment(elastic, minio, db, dbName, d.name)
}
@ -98,15 +125,15 @@ async function createStorage (mongoUrl: string, elasticUrl: string, workspace: s
domains: {
[DOMAIN_TX]: 'MongoTx'
},
defaultAdapter: 'InMemory',
defaultAdapter: 'Mongo',
adapters: {
MongoTx: {
factory: createInMemoryTxAdapter,
factory: createMongoReadOnlyTxAdapter,
url: mongoUrl
},
InMemory: {
factory: createInMemoryAdapter,
url: ''
Mongo: {
factory: createMongoReadOnlyAdapter,
url: mongoUrl
}
},
fulltextAdapter: {
@ -132,8 +159,7 @@ async function indexAttachment (
const data = await minio.getObject(dbName, name)
const chunks: Buffer[] = []
await new Promise((resolve) => {
await new Promise<void>((resolve) => {
data.on('readable', () => {
let chunk
while ((chunk = data.read()) !== null) {
@ -143,7 +169,7 @@ async function indexAttachment (
})
data.on('end', () => {
resolve(null)
resolve()
})
})
@ -161,3 +187,75 @@ async function indexAttachment (
await elastic.index(indexedDoc)
}
async function createMongoReadOnlyAdapter (
hierarchy: Hierarchy,
url: string,
dbName: string,
modelDb: ModelDb
): Promise<DbAdapter> {
const adapter = await createMongoAdapter(hierarchy, url, dbName, modelDb)
return new MongoReadOnlyAdapter(adapter)
}
async function createMongoReadOnlyTxAdapter (
hierarchy: Hierarchy,
url: string,
dbName: string,
modelDb: ModelDb
): Promise<TxAdapter> {
const adapter = await createMongoTxAdapter(hierarchy, url, dbName, modelDb)
return new MongoReadOnlyTxAdapter(adapter)
}
class MongoReadOnlyAdapter extends TxProcessor implements DbAdapter {
constructor (protected readonly adapter: DbAdapter) {
super()
}
protected txCreateDoc (tx: TxCreateDoc<Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txPutBag (tx: TxPutBag<any>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txUpdateDoc (tx: TxUpdateDoc<Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txRemoveDoc (tx: TxRemoveDoc<Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txMixin (tx: TxMixin<Doc, Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
async init (model: Tx[]): Promise<void> {
return await this.adapter.init(model)
}
async findAll<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
return await this.adapter.findAll(_class, query, options)
}
override tx (tx: Tx): Promise<TxResult> {
return new Promise((resolve) => resolve({}))
}
}
class MongoReadOnlyTxAdapter extends MongoReadOnlyAdapter implements TxAdapter {
constructor (protected readonly adapter: TxAdapter) {
super(adapter)
}
async getModel (): Promise<Tx[]> {
return await this.adapter.getModel()
}
}