From c92bd622ac06fb7e5e0873a7517ccdec74cfcfc1 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 23 Aug 2024 21:02:48 +0700 Subject: [PATCH] UBERF-7924: Fix workspace variable in logs + reuse installation account (#6376) --- services/github/pod-github/src/platform.ts | 87 ++++++++++----- .../github/pod-github/src/sync/repository.ts | 71 ++++++++++++- services/github/pod-github/src/users.ts | 69 ++++++++++++ services/github/pod-github/src/worker.ts | 100 +++++++++++------- 4 files changed, 258 insertions(+), 69 deletions(-) create mode 100644 services/github/pod-github/src/users.ts diff --git a/services/github/pod-github/src/platform.ts b/services/github/pod-github/src/platform.ts index c2aed038a3..3ddc0a3908 100644 --- a/services/github/pod-github/src/platform.ts +++ b/services/github/pod-github/src/platform.ts @@ -21,7 +21,7 @@ import { setMetadata } from '@hcengineering/platform' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import serverToken, { generateToken } from '@hcengineering/server-token' import tracker from '@hcengineering/tracker' -import { Installation } from '@octokit/webhooks-types' +import { Installation, type InstallationCreatedEvent, type InstallationUnsuspendEvent } from '@octokit/webhooks-types' import { Collection } from 'mongodb' import { App, Octokit } from 'octokit' @@ -38,12 +38,15 @@ import { registerLoaders } from './loaders' import { createNotification } from './notifications' import { errorToObj } from './sync/utils' import { GithubIntegrationRecord, GithubUserRecord } from './types' +import { UserManager } from './users' import { GithubWorker, syncUser } from './worker' export interface InstallationRecord { installationName: string login: string loginNodeId: string + + repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'] type: 'Bot' | 'User' | 'Organization' octokit: Octokit } @@ -60,12 +63,14 @@ export class PlatformWorker { mongoRef!: MongoClientReference integrationCollection!: Collection - usersCollection!: Collection + periodicTimer: any periodicSyncPromise: Promise | undefined canceled = false + userManager!: UserManager + private constructor ( readonly ctx: MeasureContext, readonly app: App, @@ -82,7 +87,8 @@ export class PlatformWorker { const db = mongoClient.db(config.ConfigurationDB) this.integrationCollection = db.collection('installations') - this.usersCollection = db.collection('users') + + this.userManager = new UserManager(db.collection('users')) const storageConfig = storageConfigFromEnv() this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL) @@ -165,7 +171,7 @@ export class PlatformWorker { } private async findUsersWorkspaces (): Promise> { - const i = this.usersCollection.find({}) + const i = this.userManager.getAllUsers() const workspaces = new Map() while (await i.hasNext()) { const userInfo = await i.next() @@ -178,19 +184,16 @@ export class PlatformWorker { } } } + await i.close() return workspaces } public async getUsers (workspace: string): Promise { - return await this.usersCollection - .find({ - [`accounts.${workspace}`]: { $exists: true } - }) - .toArray() + return await this.userManager.getUsers(workspace) } public async getUser (login: string): Promise { - return (await this.usersCollection.find({ _id: login }).toArray()).shift() + return await this.userManager.getAccount(login) } async mapInstallation ( @@ -263,8 +266,8 @@ export class PlatformWorker { private async removeInstallationFromWorkspace (client: Client, installationId: number): Promise { const wsIntegerations = await client.findAll(github.class.GithubIntegration, { installationId }) + const ops = new TxOperations(client, core.account.System) for (const intValue of wsIntegerations) { - const ops = new TxOperations(client, core.account.System) await ops.remove(intValue) } } @@ -347,12 +350,13 @@ export class PlatformWorker { scope: resultJson.scope, accounts: { [payload.workspace]: payload.accountId } } - const [existingUser] = await this.usersCollection.find({ _id: user.data.login }).toArray() - if (existingUser === undefined) { - await this.usersCollection.insertOne(dta) + await this.userManager.updateUser(dta) + const existingUser = await this.userManager.getAccount(user.data.login) + if (existingUser == null) { + await this.userManager.insertUser(dta) } else { dta.accounts = { ...existingUser.accounts, [payload.workspace]: payload.accountId } - await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any) + await this.userManager.updateUser(dta) } // Update workspace client login info. @@ -520,17 +524,17 @@ export class PlatformWorker { auth.refreshTokenExpiresIn = dta.refreshTokenExpiresIn auth.scope = dta.scope - await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any) + await this.userManager.updateUser(dta) } } } async getAccount (login: string): Promise { - return (await this.usersCollection.findOne({ _id: login })) ?? undefined + return await this.userManager.getAccount(login) } async getAccountByRef (workspace: string, ref: Ref): Promise { - return (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined + return await this.userManager.getAccountByRef(workspace, ref) } private async updateInstallation (installationId: number): Promise { @@ -544,6 +548,24 @@ export class PlatformWorker { type: tinst.account?.type ?? 'User', installationName: `${tinst.account?.html_url ?? ''}` } + this.updateInstallationRecord(installationId, val) + } + } + + private updateInstallationRecord (installationId: number, val: InstallationRecord): void { + const current = this.installations.get(installationId) + if (current !== undefined) { + if (val.octokit !== undefined) { + current.octokit = val.octokit + } + current.login = val.login + current.loginNodeId = val.loginNodeId + current.type = val.type + current.installationName = val.installationName + if (val.repositories !== undefined) { + current.repositories = val.repositories + } + } else { this.installations.set(installationId, val) } } @@ -558,7 +580,7 @@ export class PlatformWorker { type: tinst.account?.type ?? 'User', installationName: `${tinst.account?.html_url ?? ''}` } - this.installations.set(install.installation.id, val) + this.updateInstallationRecord(install.installation.id, val) ctx.info('Found installation', { installationId: install.installation.id, url: install.installation.account?.html_url ?? '' @@ -566,16 +588,22 @@ export class PlatformWorker { } } - async handleInstallationEvent (install: Installation, enabled: boolean): Promise { + async handleInstallationEvent ( + install: Installation, + repositories: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'], + enabled: boolean + ): Promise { this.ctx.info('handle integration add', { installId: install.id, name: install.html_url }) const okit = await this.app.getInstallationOctokit(install.id) const iName = `${install.account.html_url ?? ''}` - this.installations.set(install.id, { + + this.updateInstallationRecord(install.id, { octokit: okit, login: install.account.login, type: install.account?.type ?? 'User', loginNodeId: install.account.node_id, - installationName: iName + installationName: iName, + repositories }) const worker = this.getWorker(install.id) @@ -612,6 +640,9 @@ export class PlatformWorker { if (integeration !== undefined) { integeration.enabled = false integeration.synchronized = new Set() + + await this.removeInstallationFromWorkspace(worker._client, installId) + await worker._client.remove(integeration.integration) } worker.integrations.delete(installId) @@ -797,11 +828,11 @@ export class PlatformWorker { if (event.payload.action === 'revoked') { const sender = event.payload.sender - const records = await this.usersCollection.find({ _id: sender.login }).toArray() - for (const r of records) { - await this.revokeUserAuth(r) + const record = await this.getAccount(sender.login) + if (record !== undefined) { + await this.revokeUserAuth(record) + await this.userManager.removeUser(sender.login) } - await this.usersCollection.deleteOne({ _id: sender.login }) } }) @@ -902,7 +933,7 @@ export class PlatformWorker { case 'created': case 'unsuspend': { catchEventError( - this.handleInstallationEvent(payload.installation, true), + this.handleInstallationEvent(payload.installation, payload.repositories, true), payload.action, name, id, @@ -912,7 +943,7 @@ export class PlatformWorker { } case 'suspend': { catchEventError( - this.handleInstallationEvent(payload.installation, false), + this.handleInstallationEvent(payload.installation, payload.repositories, false), payload.action, name, id, diff --git a/services/github/pod-github/src/sync/repository.ts b/services/github/pod-github/src/sync/repository.ts index ce5bb45a20..245068c8be 100644 --- a/services/github/pod-github/src/sync/repository.ts +++ b/services/github/pod-github/src/sync/repository.ts @@ -4,9 +4,14 @@ // import core, { Doc, DocData, DocumentUpdate, MeasureContext, TxOperations, generateId } from '@hcengineering/core' -import { Endpoints } from '@octokit/types' -import { Repository, RepositoryEvent } from '@octokit/webhooks-types' import github, { DocSyncInfo, GithubIntegrationRepository, GithubProject } from '@hcengineering/github' +import { Endpoints } from '@octokit/types' +import { + Repository, + RepositoryEvent, + type InstallationCreatedEvent, + type InstallationUnsuspendEvent +} from '@octokit/webhooks-types' import { App } from 'octokit' import { DocSyncManager, ExternalSyncField, IntegrationContainer, IntegrationManager } from '../types' import { collectUpdate } from './utils' @@ -34,8 +39,67 @@ export class RepositorySyncMapper implements DocSyncManager { return {} } - async reloadRepositories (integration: IntegrationContainer): Promise { + async reloadRepositories ( + integration: IntegrationContainer, + repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'] + ): Promise { integration.synchronized.delete(syncReposKey) + + if (repositories !== undefined) { + // We have a list of repositories, so we could create them if they are missing. + // Need to find all repositories, not only active, so passed repositories are not work. + const allRepositories = ( + await this.provider.liveQuery.queryFind(github.class.GithubIntegrationRepository, {}) + ).filter((it) => it.attachedTo === integration.integration._id) + + const allRepos: GithubIntegrationRepository[] = [...allRepositories] + for (const repository of repositories) { + const integrationRepo: GithubIntegrationRepository | undefined = allRepos.find( + (it) => it.repositoryId === repository.id + ) + + if (integrationRepo === undefined) { + // No integration repository found, we need to push one. + await this.client.addCollection( + github.class.GithubIntegrationRepository, + integration.integration.space, + integration.integration._id, + integration.integration._class, + 'repositories', + { + nodeId: repository.node_id, + name: repository.name, + url: integration.installationName + '/' + repository.name, + repositoryId: repository.id, + enabled: true, + deleted: false, + archived: false, + fork: false, + forks: 0, + hasDiscussions: false, + hasDownloads: false, + hasIssues: false, + hasPages: false, + hasProjects: false, + hasWiki: false, + openIssues: 0, + private: repository.private, + size: 0, + stargazers: 0, + watchers: 0, + visibility: repository.private ? 'private' : 'public' + }, + undefined, // id + Date.now(), + integration.integration.createdBy + ) + this.ctx.info('Creating repository info document...', { + url: repository.full_name, + workspace: this.provider.getWorkspaceId().name + }) + } + } + } } async handleEvent(integration: IntegrationContainer, derivedClient: TxOperations, evt: T): Promise { @@ -191,6 +255,7 @@ export class RepositorySyncMapper implements DocSyncManager { installationId: integration.installationId, workspace: this.provider.getWorkspaceId().name }) + const iterable = this.app.eachRepository.iterator({ installationId: integration.installationId }) // Need to find all repositories, not only active, so passed repositories are not work. diff --git a/services/github/pod-github/src/users.ts b/services/github/pod-github/src/users.ts new file mode 100644 index 0000000000..21aef70acc --- /dev/null +++ b/services/github/pod-github/src/users.ts @@ -0,0 +1,69 @@ +import type { Account, Ref } from '@hcengineering/core' +import type { Collection, FindCursor } from 'mongodb' +import type { GithubUserRecord } from './types' + +export class UserManager { + userCache = new Map() + refUserCache = new Map() + + constructor (readonly usersCollection: Collection) {} + + public async getUsers (workspace: string): Promise { + return await this.usersCollection + .find({ + [`accounts.${workspace}`]: { $exists: true } + }) + .toArray() + } + + async getAccount (login: string): Promise { + let res = this.userCache.get(login) + if (res !== undefined) { + return res + } + res = (await this.usersCollection.findOne({ _id: login })) ?? undefined + if (res !== undefined) { + if (this.userCache.size > 1000) { + this.userCache.clear() + } + this.userCache.set(login, res) + } + return res + } + + async getAccountByRef (workspace: string, ref: Ref): Promise { + const key = `${workspace}.${ref}` + let rec = this.refUserCache.get(key) + if (rec !== undefined) { + return rec + } + rec = (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined + if (rec !== undefined) { + if (this.refUserCache.size > 1000) { + this.refUserCache.clear() + } + this.refUserCache.set(key, rec) + } + return rec + } + + async updateUser (dta: GithubUserRecord): Promise { + this.userCache.clear() + this.refUserCache.clear() + await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any) + } + + async insertUser (dta: GithubUserRecord): Promise { + await this.usersCollection.insertOne(dta) + } + + async removeUser (login: string): Promise { + this.userCache.clear() + this.refUserCache.clear() + await this.usersCollection.deleteOne({ _id: login }) + } + + getAllUsers (): FindCursor { + return this.usersCollection.find({}) + } +} diff --git a/services/github/pod-github/src/worker.ts b/services/github/pod-github/src/worker.ts index 0f273b91ef..452a97d0b6 100644 --- a/services/github/pod-github/src/worker.ts +++ b/services/github/pod-github/src/worker.ts @@ -31,6 +31,7 @@ import core, { concatLink, generateId, groupByArray, + reduceCalls, toIdMap, type Blob, type MigrationState @@ -189,9 +190,11 @@ export class GithubWorker implements IntegrationManager { if (v.octokit === undefined) { continue } - const project = await this.liveQuery.findOne(github.mixin.GithubProject, { - _id: space as Ref - }) + const project = ( + await this.liveQuery.queryFind(github.mixin.GithubProject, { + _id: space as Ref + }) + ).shift() if (project !== undefined) { const repositories = await this.liveQuery.queryFind( github.class.GithubIntegrationRepository, @@ -299,7 +302,7 @@ export class GithubWorker implements IntegrationManager { person, role: AccountRole.User }) - const acc = await this._client.findOne(contact.class.PersonAccount, { _id: id }) + const acc = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: id }) return acc } } @@ -391,12 +394,12 @@ export class GithubWorker implements IntegrationManager { let person: Ref | undefined // try to find by account. if (userInfo.email != null) { - const personAccount = await this.client.findOne(contact.class.PersonAccount, { email: userInfo.email }) + const personAccount = await this.liveQuery.findOne(contact.class.PersonAccount, { email: userInfo.email }) person = personAccount?.person } if (person === undefined) { - const channel = await this.client.findOne(contact.class.Channel, { + const channel = await this.liveQuery.findOne(contact.class.Channel, { provider: contact.channelProvider.GitHub, value: userInfo.login }) @@ -475,14 +478,19 @@ export class GithubWorker implements IntegrationManager { async syncUserData (ctx: MeasureContext, users: GithubUserRecord[]): Promise { // Let's sync information about users and send some details + const accounts = await this._client.findAll(contact.class.PersonAccount, { + email: { $in: users.map((it) => `github:${it._id}`) } + }) + const userAuths = await this._client.findAll(github.class.GithubAuthentication, {}) + const persons = await this._client.findAll(contact.class.Person, { _id: { $in: accounts.map((it) => it.person) } }) for (const record of users) { if (record.error !== undefined) { // Skip accounts with error continue } - const account = await this._client.findOne(contact.class.PersonAccount, { email: `github:${record._id}` }) - const userAuth = await this._client.findOne(github.class.GithubAuthentication, { login: record._id }) - const person = await this._client.findOne(contact.class.Person, { _id: account?.person }) + const account = accounts.find((it) => it.email === `github:${record._id}`) + const userAuth = userAuths.find((it) => it.login === record._id) + const person = persons.find((it) => account?.person) if (account === undefined || userAuth === undefined || person === undefined) { continue } @@ -517,12 +525,11 @@ export class GithubWorker implements IntegrationManager { let record = await this.platform.getAccountByRef(this.workspace.name, account) // const accountRef = this.accounts.find((it) => it._id === account) - const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account }) + const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account }) if (record === undefined) { if (accountRef !== undefined) { - const accounts = await this._client.findAll(contact.class.PersonAccount, {}) - const allAccounts = accounts.filter((it) => it.person === accountRef.person) - for (const aa of allAccounts) { + const accounts = this._client.getModel().getAccountByPersonId(accountRef.person) + for (const aa of accounts) { record = await this.platform.getAccountByRef(this.workspace.name, aa._id) if (record !== undefined) { break @@ -532,7 +539,7 @@ export class GithubWorker implements IntegrationManager { } // Check and refresh token if required. if (record !== undefined) { - this.ctx.info('get octokit', { account, recordId: record._id }) + this.ctx.info('get octokit', { account, recordId: record._id, workspace: this.workspace.name }) await this.platform.checkRefreshToken(record) return new Octokit({ auth: record.token, @@ -543,9 +550,9 @@ export class GithubWorker implements IntegrationManager { // We need to inform user, he need to authorize this account with github. if (accountRef !== undefined) { - const person = await this.client.findOne(contact.class.Person, { _id: accountRef.person }) + const person = await this.liveQuery.findOne(contact.class.Person, { _id: accountRef.person }) if (person !== undefined) { - const personSpace = await this.client.findOne(contact.class.PersonSpace, { person: person._id }) + const personSpace = await this.liveQuery.findOne(contact.class.PersonSpace, { person: person._id }) if (personSpace !== undefined) { await createNotification(this._client, person, { user: account, @@ -556,17 +563,16 @@ export class GithubWorker implements IntegrationManager { } } } - this.ctx.info('get octokit: return bot', { account }) + this.ctx.info('get octokit: return bot', { account, workspace: this.workspace.name }) } async isPlatformUser (account: Ref): Promise { let record = await this.platform.getAccountByRef(this.workspace.name, account) - const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account }) + const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account }) if (record === undefined) { if (accountRef !== undefined) { - const accounts = await this._client.findAll(contact.class.PersonAccount, {}) - const allAccounts = accounts.filter((it) => it.person === accountRef.person) - for (const aa of allAccounts) { + const accounts = this._client.getModel().getAccountByPersonId(accountRef.person) + for (const aa of accounts) { record = await this.platform.getAccountByRef(this.workspace.name, aa._id) if (record !== undefined) { break @@ -588,11 +594,11 @@ export class GithubWorker implements IntegrationManager { integrationsRaw: GithubIntegration[] = [] async getProjectType (type: Ref): Promise { - return await this._client.findOne(task.class.ProjectType, { _id: type }) + return (await this.liveQuery.queryFind(task.class.ProjectType, { _id: type })).shift() } async getTaskType (type: Ref): Promise { - return await this._client.findOne(task.class.TaskType, { _id: type }) + return (await this.liveQuery.queryFind(task.class.TaskType, { _id: type })).shift() } async getTaskTypeOf (project: Ref, ofClass: Ref>): Promise { @@ -687,13 +693,16 @@ export class GithubWorker implements IntegrationManager { }) }) - const userRecords = await this.platform.getUsers(this.workspace.name) - - await this.syncUserData(this.ctx, userRecords) - this.triggerRequests = 1 this.updateRequests = 1 this.syncPromise = this.syncAndWait() + + const userRecords = await this.platform.getUsers(this.workspace.name) + try { + await this.syncUserData(this.ctx, userRecords) + } catch (err: any) { + Analytics.handleError(err) + } } projects: GithubProject[] = [] @@ -756,7 +765,7 @@ export class GithubWorker implements IntegrationManager { syncLock: new Map() } this.integrations.set(it.installationId, current) - await this.repositoryManager.reloadRepositories(current) + await this.repositoryManager.reloadRepositories(current, inst.repositories) } catch (err: any) { Analytics.handleError(err) this.ctx.error('Error', { err }) @@ -768,7 +777,7 @@ export class GithubWorker implements IntegrationManager { continue } current.integration = it - await this.repositoryManager.reloadRepositories(current) + await this.repositoryManager.reloadRepositories(current, inst.repositories) } } } @@ -919,7 +928,14 @@ export class GithubWorker implements IntegrationManager { continue } - this.ctx.info('External Syncing', { name: repo.name, prj: prj.name, field, version, docs: docs.length }) + this.ctx.info('External Syncing', { + name: repo.name, + prj: prj.name, + field, + version, + docs: docs.length, + workspace: this.workspace.name + }) const byClass = this.groupByClass(docs) for (const [_class, _docs] of byClass.entries()) { @@ -1043,7 +1059,7 @@ export class GithubWorker implements IntegrationManager { if (this.updateRequests > 0) { this.updateRequests = 0 // Just in case await this.updateIntegrations() - await this.performFullSync() + void this.performFullSync() } const { projects, repositories } = await this.collectActiveProjects() @@ -1072,7 +1088,7 @@ export class GithubWorker implements IntegrationManager { if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) { if (this.previousWait !== 0) { - this.ctx.info('Wait for changes:', { previousWait: this.previousWait }) + this.ctx.info('Wait for changes:', { previousWait: this.previousWait, workspace: this.workspace.name }) this.previousWait = 0 } // Wait until some sync documents will be modified, updated. @@ -1125,7 +1141,7 @@ export class GithubWorker implements IntegrationManager { if (docs.length > 0) { this.previousWait += docs.length - this.ctx.info('Syncing', { docs: docs.length }) + this.ctx.info('Syncing', { docs: docs.length, workspace: this.workspace.name }) await this.doSyncFor(docs) } @@ -1239,7 +1255,7 @@ export class GithubWorker implements IntegrationManager { const existing = externalDocs.find((it) => it._id === info._id) const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper if (mapper === undefined) { - this.ctx.info('No mapper for class', { objectClass: info.objectClass }) + this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name }) await derivedClient.update(info, { needSync: githubSyncVersion }) @@ -1333,7 +1349,11 @@ export class GithubWorker implements IntegrationManager { private async waitChanges (): Promise { if (this.triggerRequests > 0 || this.updateRequests > 0) { - this.ctx.info('Trigger check pending:', { requests: this.triggerRequests, updates: this.updateRequests }) + this.ctx.info('Trigger check pending:', { + requests: this.triggerRequests, + updates: this.updateRequests, + workspace: this.workspace.name + }) this.triggerRequests = 0 return } @@ -1347,7 +1367,7 @@ export class GithubWorker implements IntegrationManager { triggerTimeout = setTimeout(() => { triggerTimeout = undefined if (was0) { - this.ctx.info('Sync triggered', { request: this.triggerRequests }) + this.ctx.info('Sync triggered', { request: this.triggerRequests, workspace: this.workspace.name }) } resolve() }, 50) // Small timeout to aggregate few bulk changes. @@ -1361,7 +1381,7 @@ export class GithubWorker implements IntegrationManager { updateTimeout = setTimeout(() => { updateTimeout = undefined if (was0) { - this.ctx.info('Sync update triggered', { requests: this.updateRequests }) + this.ctx.info('Sync update triggered', { requests: this.updateRequests, workspace: this.workspace.name }) } resolve() }, 50) // Small timeout to aggregate few bulk changes. @@ -1378,7 +1398,11 @@ export class GithubWorker implements IntegrationManager { this.triggerSync() } - async performFullSync (): Promise { + performFullSync = reduceCalls(async () => { + await this._performFullSync() + }) + + async _performFullSync (): Promise { // Wait previous active sync for (const integration of this.integrations.values()) { await this.ctx.withLog('external sync', { installation: integration.installationName }, async () => {