mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 03:14:40 +03:00
UBERF-7854: Fix live query $lookup update (#6304)
This commit is contained in:
parent
00633cda71
commit
9215e966cd
@ -1111,15 +1111,21 @@ export class LiveQuery implements WithTx, Client {
|
||||
for (const resDoc of docs) {
|
||||
const obj = getObjectValue(objWay, resDoc)
|
||||
if (obj === undefined) continue
|
||||
const value = getObjectValue('$lookup.' + key, obj)
|
||||
let value = getObjectValue('$lookup.' + key, obj)
|
||||
const reverseCheck = reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id
|
||||
if (value == null && reverseCheck) {
|
||||
value = []
|
||||
obj.$lookup[key] = value
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc)) {
|
||||
if (reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id) {
|
||||
if ((value as Doc[]).find((p) => p._id === doc._id) === undefined) {
|
||||
value.push(doc)
|
||||
needCallback = true
|
||||
}
|
||||
if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc) && reverseCheck) {
|
||||
const idx = (value as Doc[]).findIndex((p) => p._id === doc._id)
|
||||
if (idx === -1) {
|
||||
value.push(doc)
|
||||
} else {
|
||||
value[idx] = doc
|
||||
}
|
||||
needCallback = true
|
||||
}
|
||||
} else {
|
||||
if (obj[key] === doc._id) {
|
||||
|
@ -84,7 +84,7 @@
|
||||
labelIntl: getEmbeddedLabel('Github Repositories')
|
||||
}
|
||||
]
|
||||
let selectedTab: string = tabs[0].id
|
||||
let selectedTab = tabs[0].id
|
||||
|
||||
$: loading = $ticker - (auth?.authRequestTime ?? 0) < 5000
|
||||
</script>
|
||||
|
@ -15,7 +15,7 @@ import core, {
|
||||
Ref,
|
||||
TxOperations
|
||||
} from '@hcengineering/core'
|
||||
import github, { GithubAuthentication, GithubIntegration, makeQuery } from '@hcengineering/github'
|
||||
import github, { GithubAuthentication, makeQuery } from '@hcengineering/github'
|
||||
import { MongoClientReference, getMongoClient } from '@hcengineering/mongo'
|
||||
import { setMetadata } from '@hcengineering/platform'
|
||||
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
|
||||
@ -104,11 +104,6 @@ export class PlatformWorker {
|
||||
this.integrations = await this.integrationCollection.find({}).toArray()
|
||||
await this.queryInstallations(ctx)
|
||||
|
||||
const workspacesToCheck = new Set<string>()
|
||||
// We need to delete local integrations not retrieved by queryInstallations()
|
||||
for (const intValue of this.integrations) {
|
||||
workspacesToCheck.add(intValue.workspace)
|
||||
}
|
||||
for (const integr of [...this.integrations]) {
|
||||
// We need to check and remove integrations without a real integration's
|
||||
if (!this.installations.has(integr.installationId)) {
|
||||
@ -116,31 +111,11 @@ export class PlatformWorker {
|
||||
installationId: integr.installationId,
|
||||
workspace: integr.workspace
|
||||
})
|
||||
await this.integrationCollection.deleteOne({ installationId: integr.installationId })
|
||||
this.integrations = this.integrations.filter((it) => it.installationId !== integr.installationId)
|
||||
}
|
||||
}
|
||||
|
||||
const checkClean = async (): Promise<void> => {
|
||||
const rateLimit = new RateLimiter(10)
|
||||
for (const workspace of workspacesToCheck) {
|
||||
// We need to connect to workspace and verify all installations and clean if required
|
||||
try {
|
||||
await rateLimit.add(async () => {
|
||||
ctx.info('check clean', { workspace })
|
||||
try {
|
||||
await this.cleanWorkspaceInstallations(ctx, workspace)
|
||||
} catch (err: any) {
|
||||
ctx.error('failed to check clean', { workspace })
|
||||
}
|
||||
})
|
||||
} catch (err: any) {
|
||||
ctx.error('failed to clean workspace', { err, workspace })
|
||||
}
|
||||
}
|
||||
await rateLimit.waitProcessing()
|
||||
}
|
||||
void checkClean()
|
||||
|
||||
void this.doSyncWorkspaces().catch((err) => {
|
||||
ctx.error('error during sync workspaces', { err })
|
||||
process.exit(1)
|
||||
@ -181,6 +156,7 @@ export class PlatformWorker {
|
||||
}
|
||||
await new Promise<void>((resolve) => {
|
||||
this.triggerCheckWorkspaces = resolve
|
||||
this.ctx.info('Workspaces check triggered')
|
||||
if (errors) {
|
||||
setTimeout(resolve, 5000)
|
||||
}
|
||||
@ -217,44 +193,6 @@ export class PlatformWorker {
|
||||
return (await this.usersCollection.find<GithubUserRecord>({ _id: login }).toArray()).shift()
|
||||
}
|
||||
|
||||
async cleanWorkspaceInstallations (ctx: MeasureContext, workspace: string, installId?: number): Promise<void> {
|
||||
// TODO: Do not remove record from $github if we failed to clean github installations inside workspace.
|
||||
const token = generateToken(
|
||||
config.SystemEmail,
|
||||
{
|
||||
name: workspace,
|
||||
productId: config.ProductID
|
||||
},
|
||||
{ mode: 'github' }
|
||||
)
|
||||
let workspaceInfo: ClientWorkspaceInfo
|
||||
try {
|
||||
workspaceInfo = await getWorkspaceInfo(token)
|
||||
} catch (err: any) {
|
||||
ctx.error('Workspace not found:', { workspace })
|
||||
return
|
||||
}
|
||||
if (workspaceInfo === undefined) {
|
||||
ctx.error('No workspace found', { workspace })
|
||||
return
|
||||
}
|
||||
let client: Client | undefined
|
||||
try {
|
||||
client = await createPlatformClient(workspace, config.ProductID, 10000)
|
||||
const ops = new TxOperations(client, core.account.System)
|
||||
|
||||
const wsIntegerations = await client.findAll(github.class.GithubIntegration, {})
|
||||
|
||||
for (const intValue of wsIntegerations) {
|
||||
if (!this.installations.has(intValue.installationId) || intValue.installationId === installId) {
|
||||
await ops.remove<GithubIntegration>(intValue)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await client?.close()
|
||||
}
|
||||
}
|
||||
|
||||
async mapInstallation (
|
||||
ctx: MeasureContext,
|
||||
workspace: string,
|
||||
@ -297,8 +235,6 @@ export class PlatformWorker {
|
||||
installation_id: installationId
|
||||
})
|
||||
}
|
||||
// Clean workspace
|
||||
await this.cleanWorkspaceInstallations(ctx, workspace, installationId)
|
||||
this.triggerCheckWorkspaces()
|
||||
}
|
||||
|
||||
@ -679,6 +615,7 @@ export class PlatformWorker {
|
||||
index: widx,
|
||||
total: workspaces.length
|
||||
})
|
||||
|
||||
const worker = await GithubWorker.create(
|
||||
this,
|
||||
workerCtx,
|
||||
@ -708,6 +645,12 @@ export class PlatformWorker {
|
||||
// No if no integration, we will try connect one more time in a time period
|
||||
this.clients.set(workspace, worker)
|
||||
} else {
|
||||
workerCtx.info('Failed Register worker, timeout or integrations removed', {
|
||||
workspaceId: workspaceInfo.workspaceId,
|
||||
workspace: workspaceInfo.workspace,
|
||||
index: widx,
|
||||
total: workspaces.length
|
||||
})
|
||||
errors++
|
||||
}
|
||||
} catch (e: any) {
|
||||
@ -729,7 +672,6 @@ export class PlatformWorker {
|
||||
const ws = this.clients.get(deleted)
|
||||
if (ws !== undefined) {
|
||||
try {
|
||||
await ws.ctx.logger.close()
|
||||
this.ctx.info('workspace removed from tracking list', { workspace: deleted })
|
||||
this.clients.delete(deleted)
|
||||
await ws.close()
|
||||
|
@ -1491,6 +1491,8 @@ export class GithubWorker implements IntegrationManager {
|
||||
reconnect(workspace.name, event)
|
||||
})
|
||||
|
||||
await GithubWorker.checkIntegrations(client, installations)
|
||||
|
||||
const worker = new GithubWorker(
|
||||
ctx,
|
||||
platformWorker,
|
||||
@ -1510,6 +1512,17 @@ export class GithubWorker implements IntegrationManager {
|
||||
await client?.close()
|
||||
}
|
||||
}
|
||||
|
||||
static async checkIntegrations (client: Client, installations: Map<number, InstallationRecord>): Promise<void> {
|
||||
const wsIntegerations = await client.findAll(github.class.GithubIntegration, {})
|
||||
|
||||
for (const intValue of wsIntegerations) {
|
||||
if (!installations.has(intValue.installationId)) {
|
||||
const ops = new TxOperations(client, core.account.System)
|
||||
await ops.remove<GithubIntegration>(intValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function syncUser (
|
||||
|
Loading…
Reference in New Issue
Block a user