UBERF-7924: Fix workspace variable in logs + reuse installation account (#6376)

This commit is contained in:
Andrey Sobolev 2024-08-23 21:02:48 +07:00 committed by GitHub
parent 9b7880f86a
commit c92bd622ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 258 additions and 69 deletions

View File

@ -21,7 +21,7 @@ import { setMetadata } from '@hcengineering/platform'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import tracker from '@hcengineering/tracker'
import { Installation } from '@octokit/webhooks-types'
import { Installation, type InstallationCreatedEvent, type InstallationUnsuspendEvent } from '@octokit/webhooks-types'
import { Collection } from 'mongodb'
import { App, Octokit } from 'octokit'
@ -38,12 +38,15 @@ import { registerLoaders } from './loaders'
import { createNotification } from './notifications'
import { errorToObj } from './sync/utils'
import { GithubIntegrationRecord, GithubUserRecord } from './types'
import { UserManager } from './users'
import { GithubWorker, syncUser } from './worker'
export interface InstallationRecord {
installationName: string
login: string
loginNodeId: string
repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
type: 'Bot' | 'User' | 'Organization'
octokit: Octokit
}
@ -60,12 +63,14 @@ export class PlatformWorker {
mongoRef!: MongoClientReference
integrationCollection!: Collection<GithubIntegrationRecord>
usersCollection!: Collection<GithubUserRecord>
periodicTimer: any
periodicSyncPromise: Promise<void> | undefined
canceled = false
userManager!: UserManager
private constructor (
readonly ctx: MeasureContext,
readonly app: App,
@ -82,7 +87,8 @@ export class PlatformWorker {
const db = mongoClient.db(config.ConfigurationDB)
this.integrationCollection = db.collection<GithubIntegrationRecord>('installations')
this.usersCollection = db.collection<GithubUserRecord>('users')
this.userManager = new UserManager(db.collection<GithubUserRecord>('users'))
const storageConfig = storageConfigFromEnv()
this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL)
@ -165,7 +171,7 @@ export class PlatformWorker {
}
private async findUsersWorkspaces (): Promise<Map<string, GithubUserRecord[]>> {
const i = this.usersCollection.find({})
const i = this.userManager.getAllUsers()
const workspaces = new Map<string, GithubUserRecord[]>()
while (await i.hasNext()) {
const userInfo = await i.next()
@ -178,19 +184,16 @@ export class PlatformWorker {
}
}
}
await i.close()
return workspaces
}
public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
return await this.userManager.getUsers(workspace)
}
public async getUser (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.find<GithubUserRecord>({ _id: login }).toArray()).shift()
return await this.userManager.getAccount(login)
}
async mapInstallation (
@ -263,8 +266,8 @@ export class PlatformWorker {
private async removeInstallationFromWorkspace (client: Client, installationId: number): Promise<void> {
const wsIntegerations = await client.findAll(github.class.GithubIntegration, { installationId })
const ops = new TxOperations(client, core.account.System)
for (const intValue of wsIntegerations) {
const ops = new TxOperations(client, core.account.System)
await ops.remove<GithubIntegration>(intValue)
}
}
@ -347,12 +350,13 @@ export class PlatformWorker {
scope: resultJson.scope,
accounts: { [payload.workspace]: payload.accountId }
}
const [existingUser] = await this.usersCollection.find({ _id: user.data.login }).toArray()
if (existingUser === undefined) {
await this.usersCollection.insertOne(dta)
await this.userManager.updateUser(dta)
const existingUser = await this.userManager.getAccount(user.data.login)
if (existingUser == null) {
await this.userManager.insertUser(dta)
} else {
dta.accounts = { ...existingUser.accounts, [payload.workspace]: payload.accountId }
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
await this.userManager.updateUser(dta)
}
// Update workspace client login info.
@ -520,17 +524,17 @@ export class PlatformWorker {
auth.refreshTokenExpiresIn = dta.refreshTokenExpiresIn
auth.scope = dta.scope
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
await this.userManager.updateUser(dta)
}
}
}
async getAccount (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ _id: login })) ?? undefined
return await this.userManager.getAccount(login)
}
async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined
return await this.userManager.getAccountByRef(workspace, ref)
}
private async updateInstallation (installationId: number): Promise<void> {
@ -544,6 +548,24 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}`
}
this.updateInstallationRecord(installationId, val)
}
}
private updateInstallationRecord (installationId: number, val: InstallationRecord): void {
const current = this.installations.get(installationId)
if (current !== undefined) {
if (val.octokit !== undefined) {
current.octokit = val.octokit
}
current.login = val.login
current.loginNodeId = val.loginNodeId
current.type = val.type
current.installationName = val.installationName
if (val.repositories !== undefined) {
current.repositories = val.repositories
}
} else {
this.installations.set(installationId, val)
}
}
@ -558,7 +580,7 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}`
}
this.installations.set(install.installation.id, val)
this.updateInstallationRecord(install.installation.id, val)
ctx.info('Found installation', {
installationId: install.installation.id,
url: install.installation.account?.html_url ?? ''
@ -566,16 +588,22 @@ export class PlatformWorker {
}
}
async handleInstallationEvent (install: Installation, enabled: boolean): Promise<void> {
async handleInstallationEvent (
install: Installation,
repositories: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'],
enabled: boolean
): Promise<void> {
this.ctx.info('handle integration add', { installId: install.id, name: install.html_url })
const okit = await this.app.getInstallationOctokit(install.id)
const iName = `${install.account.html_url ?? ''}`
this.installations.set(install.id, {
this.updateInstallationRecord(install.id, {
octokit: okit,
login: install.account.login,
type: install.account?.type ?? 'User',
loginNodeId: install.account.node_id,
installationName: iName
installationName: iName,
repositories
})
const worker = this.getWorker(install.id)
@ -612,6 +640,9 @@ export class PlatformWorker {
if (integeration !== undefined) {
integeration.enabled = false
integeration.synchronized = new Set()
await this.removeInstallationFromWorkspace(worker._client, installId)
await worker._client.remove(integeration.integration)
}
worker.integrations.delete(installId)
@ -797,11 +828,11 @@ export class PlatformWorker {
if (event.payload.action === 'revoked') {
const sender = event.payload.sender
const records = await this.usersCollection.find({ _id: sender.login }).toArray()
for (const r of records) {
await this.revokeUserAuth(r)
const record = await this.getAccount(sender.login)
if (record !== undefined) {
await this.revokeUserAuth(record)
await this.userManager.removeUser(sender.login)
}
await this.usersCollection.deleteOne({ _id: sender.login })
}
})
@ -902,7 +933,7 @@ export class PlatformWorker {
case 'created':
case 'unsuspend': {
catchEventError(
this.handleInstallationEvent(payload.installation, true),
this.handleInstallationEvent(payload.installation, payload.repositories, true),
payload.action,
name,
id,
@ -912,7 +943,7 @@ export class PlatformWorker {
}
case 'suspend': {
catchEventError(
this.handleInstallationEvent(payload.installation, false),
this.handleInstallationEvent(payload.installation, payload.repositories, false),
payload.action,
name,
id,

View File

@ -4,9 +4,14 @@
//
import core, { Doc, DocData, DocumentUpdate, MeasureContext, TxOperations, generateId } from '@hcengineering/core'
import { Endpoints } from '@octokit/types'
import { Repository, RepositoryEvent } from '@octokit/webhooks-types'
import github, { DocSyncInfo, GithubIntegrationRepository, GithubProject } from '@hcengineering/github'
import { Endpoints } from '@octokit/types'
import {
Repository,
RepositoryEvent,
type InstallationCreatedEvent,
type InstallationUnsuspendEvent
} from '@octokit/webhooks-types'
import { App } from 'octokit'
import { DocSyncManager, ExternalSyncField, IntegrationContainer, IntegrationManager } from '../types'
import { collectUpdate } from './utils'
@ -34,8 +39,67 @@ export class RepositorySyncMapper implements DocSyncManager {
return {}
}
async reloadRepositories (integration: IntegrationContainer): Promise<void> {
async reloadRepositories (
integration: IntegrationContainer,
repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
): Promise<void> {
integration.synchronized.delete(syncReposKey)
if (repositories !== undefined) {
// We have a list of repositories, so we could create them if they are missing.
// Need to find all repositories, not only active, so passed repositories are not work.
const allRepositories = (
await this.provider.liveQuery.queryFind(github.class.GithubIntegrationRepository, {})
).filter((it) => it.attachedTo === integration.integration._id)
const allRepos: GithubIntegrationRepository[] = [...allRepositories]
for (const repository of repositories) {
const integrationRepo: GithubIntegrationRepository | undefined = allRepos.find(
(it) => it.repositoryId === repository.id
)
if (integrationRepo === undefined) {
// No integration repository found, we need to push one.
await this.client.addCollection(
github.class.GithubIntegrationRepository,
integration.integration.space,
integration.integration._id,
integration.integration._class,
'repositories',
{
nodeId: repository.node_id,
name: repository.name,
url: integration.installationName + '/' + repository.name,
repositoryId: repository.id,
enabled: true,
deleted: false,
archived: false,
fork: false,
forks: 0,
hasDiscussions: false,
hasDownloads: false,
hasIssues: false,
hasPages: false,
hasProjects: false,
hasWiki: false,
openIssues: 0,
private: repository.private,
size: 0,
stargazers: 0,
watchers: 0,
visibility: repository.private ? 'private' : 'public'
},
undefined, // id
Date.now(),
integration.integration.createdBy
)
this.ctx.info('Creating repository info document...', {
url: repository.full_name,
workspace: this.provider.getWorkspaceId().name
})
}
}
}
}
async handleEvent<T>(integration: IntegrationContainer, derivedClient: TxOperations, evt: T): Promise<void> {
@ -191,6 +255,7 @@ export class RepositorySyncMapper implements DocSyncManager {
installationId: integration.installationId,
workspace: this.provider.getWorkspaceId().name
})
const iterable = this.app.eachRepository.iterator({ installationId: integration.installationId })
// Need to find all repositories, not only active, so passed repositories are not work.

View File

@ -0,0 +1,69 @@
import type { Account, Ref } from '@hcengineering/core'
import type { Collection, FindCursor } from 'mongodb'
import type { GithubUserRecord } from './types'
export class UserManager {
userCache = new Map<string, GithubUserRecord>()
refUserCache = new Map<string, GithubUserRecord>()
constructor (readonly usersCollection: Collection<GithubUserRecord>) {}
public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
}
async getAccount (login: string): Promise<GithubUserRecord | undefined> {
let res = this.userCache.get(login)
if (res !== undefined) {
return res
}
res = (await this.usersCollection.findOne({ _id: login })) ?? undefined
if (res !== undefined) {
if (this.userCache.size > 1000) {
this.userCache.clear()
}
this.userCache.set(login, res)
}
return res
}
async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
const key = `${workspace}.${ref}`
let rec = this.refUserCache.get(key)
if (rec !== undefined) {
return rec
}
rec = (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined
if (rec !== undefined) {
if (this.refUserCache.size > 1000) {
this.refUserCache.clear()
}
this.refUserCache.set(key, rec)
}
return rec
}
async updateUser (dta: GithubUserRecord): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
}
async insertUser (dta: GithubUserRecord): Promise<void> {
await this.usersCollection.insertOne(dta)
}
async removeUser (login: string): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.deleteOne({ _id: login })
}
getAllUsers (): FindCursor<GithubUserRecord> {
return this.usersCollection.find({})
}
}

View File

@ -31,6 +31,7 @@ import core, {
concatLink,
generateId,
groupByArray,
reduceCalls,
toIdMap,
type Blob,
type MigrationState
@ -189,9 +190,11 @@ export class GithubWorker implements IntegrationManager {
if (v.octokit === undefined) {
continue
}
const project = await this.liveQuery.findOne<GithubProject>(github.mixin.GithubProject, {
_id: space as Ref<GithubProject>
})
const project = (
await this.liveQuery.queryFind<GithubProject>(github.mixin.GithubProject, {
_id: space as Ref<GithubProject>
})
).shift()
if (project !== undefined) {
const repositories = await this.liveQuery.queryFind<GithubIntegrationRepository>(
github.class.GithubIntegrationRepository,
@ -299,7 +302,7 @@ export class GithubWorker implements IntegrationManager {
person,
role: AccountRole.User
})
const acc = await this._client.findOne(contact.class.PersonAccount, { _id: id })
const acc = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: id })
return acc
}
}
@ -391,12 +394,12 @@ export class GithubWorker implements IntegrationManager {
let person: Ref<Person> | undefined
// try to find by account.
if (userInfo.email != null) {
const personAccount = await this.client.findOne(contact.class.PersonAccount, { email: userInfo.email })
const personAccount = await this.liveQuery.findOne(contact.class.PersonAccount, { email: userInfo.email })
person = personAccount?.person
}
if (person === undefined) {
const channel = await this.client.findOne(contact.class.Channel, {
const channel = await this.liveQuery.findOne(contact.class.Channel, {
provider: contact.channelProvider.GitHub,
value: userInfo.login
})
@ -475,14 +478,19 @@ export class GithubWorker implements IntegrationManager {
async syncUserData (ctx: MeasureContext, users: GithubUserRecord[]): Promise<void> {
// Let's sync information about users and send some details
const accounts = await this._client.findAll(contact.class.PersonAccount, {
email: { $in: users.map((it) => `github:${it._id}`) }
})
const userAuths = await this._client.findAll(github.class.GithubAuthentication, {})
const persons = await this._client.findAll(contact.class.Person, { _id: { $in: accounts.map((it) => it.person) } })
for (const record of users) {
if (record.error !== undefined) {
// Skip accounts with error
continue
}
const account = await this._client.findOne(contact.class.PersonAccount, { email: `github:${record._id}` })
const userAuth = await this._client.findOne(github.class.GithubAuthentication, { login: record._id })
const person = await this._client.findOne(contact.class.Person, { _id: account?.person })
const account = accounts.find((it) => it.email === `github:${record._id}`)
const userAuth = userAuths.find((it) => it.login === record._id)
const person = persons.find((it) => account?.person)
if (account === undefined || userAuth === undefined || person === undefined) {
continue
}
@ -517,12 +525,11 @@ export class GithubWorker implements IntegrationManager {
let record = await this.platform.getAccountByRef(this.workspace.name, account)
// const accountRef = this.accounts.find((it) => it._id === account)
const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account })
const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account })
if (record === undefined) {
if (accountRef !== undefined) {
const accounts = await this._client.findAll(contact.class.PersonAccount, {})
const allAccounts = accounts.filter((it) => it.person === accountRef.person)
for (const aa of allAccounts) {
const accounts = this._client.getModel().getAccountByPersonId(accountRef.person)
for (const aa of accounts) {
record = await this.platform.getAccountByRef(this.workspace.name, aa._id)
if (record !== undefined) {
break
@ -532,7 +539,7 @@ export class GithubWorker implements IntegrationManager {
}
// Check and refresh token if required.
if (record !== undefined) {
this.ctx.info('get octokit', { account, recordId: record._id })
this.ctx.info('get octokit', { account, recordId: record._id, workspace: this.workspace.name })
await this.platform.checkRefreshToken(record)
return new Octokit({
auth: record.token,
@ -543,9 +550,9 @@ export class GithubWorker implements IntegrationManager {
// We need to inform user, he need to authorize this account with github.
if (accountRef !== undefined) {
const person = await this.client.findOne(contact.class.Person, { _id: accountRef.person })
const person = await this.liveQuery.findOne(contact.class.Person, { _id: accountRef.person })
if (person !== undefined) {
const personSpace = await this.client.findOne(contact.class.PersonSpace, { person: person._id })
const personSpace = await this.liveQuery.findOne(contact.class.PersonSpace, { person: person._id })
if (personSpace !== undefined) {
await createNotification(this._client, person, {
user: account,
@ -556,17 +563,16 @@ export class GithubWorker implements IntegrationManager {
}
}
}
this.ctx.info('get octokit: return bot', { account })
this.ctx.info('get octokit: return bot', { account, workspace: this.workspace.name })
}
async isPlatformUser (account: Ref<PersonAccount>): Promise<boolean> {
let record = await this.platform.getAccountByRef(this.workspace.name, account)
const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account })
const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account })
if (record === undefined) {
if (accountRef !== undefined) {
const accounts = await this._client.findAll(contact.class.PersonAccount, {})
const allAccounts = accounts.filter((it) => it.person === accountRef.person)
for (const aa of allAccounts) {
const accounts = this._client.getModel().getAccountByPersonId(accountRef.person)
for (const aa of accounts) {
record = await this.platform.getAccountByRef(this.workspace.name, aa._id)
if (record !== undefined) {
break
@ -588,11 +594,11 @@ export class GithubWorker implements IntegrationManager {
integrationsRaw: GithubIntegration[] = []
async getProjectType (type: Ref<ProjectType>): Promise<ProjectType | undefined> {
return await this._client.findOne(task.class.ProjectType, { _id: type })
return (await this.liveQuery.queryFind(task.class.ProjectType, { _id: type })).shift()
}
async getTaskType (type: Ref<TaskType>): Promise<TaskType | undefined> {
return await this._client.findOne(task.class.TaskType, { _id: type })
return (await this.liveQuery.queryFind(task.class.TaskType, { _id: type })).shift()
}
async getTaskTypeOf (project: Ref<ProjectType>, ofClass: Ref<Class<Doc>>): Promise<TaskType | undefined> {
@ -687,13 +693,16 @@ export class GithubWorker implements IntegrationManager {
})
})
const userRecords = await this.platform.getUsers(this.workspace.name)
await this.syncUserData(this.ctx, userRecords)
this.triggerRequests = 1
this.updateRequests = 1
this.syncPromise = this.syncAndWait()
const userRecords = await this.platform.getUsers(this.workspace.name)
try {
await this.syncUserData(this.ctx, userRecords)
} catch (err: any) {
Analytics.handleError(err)
}
}
projects: GithubProject[] = []
@ -756,7 +765,7 @@ export class GithubWorker implements IntegrationManager {
syncLock: new Map()
}
this.integrations.set(it.installationId, current)
await this.repositoryManager.reloadRepositories(current)
await this.repositoryManager.reloadRepositories(current, inst.repositories)
} catch (err: any) {
Analytics.handleError(err)
this.ctx.error('Error', { err })
@ -768,7 +777,7 @@ export class GithubWorker implements IntegrationManager {
continue
}
current.integration = it
await this.repositoryManager.reloadRepositories(current)
await this.repositoryManager.reloadRepositories(current, inst.repositories)
}
}
}
@ -919,7 +928,14 @@ export class GithubWorker implements IntegrationManager {
continue
}
this.ctx.info('External Syncing', { name: repo.name, prj: prj.name, field, version, docs: docs.length })
this.ctx.info('External Syncing', {
name: repo.name,
prj: prj.name,
field,
version,
docs: docs.length,
workspace: this.workspace.name
})
const byClass = this.groupByClass(docs)
for (const [_class, _docs] of byClass.entries()) {
@ -1043,7 +1059,7 @@ export class GithubWorker implements IntegrationManager {
if (this.updateRequests > 0) {
this.updateRequests = 0 // Just in case
await this.updateIntegrations()
await this.performFullSync()
void this.performFullSync()
}
const { projects, repositories } = await this.collectActiveProjects()
@ -1072,7 +1088,7 @@ export class GithubWorker implements IntegrationManager {
if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) {
if (this.previousWait !== 0) {
this.ctx.info('Wait for changes:', { previousWait: this.previousWait })
this.ctx.info('Wait for changes:', { previousWait: this.previousWait, workspace: this.workspace.name })
this.previousWait = 0
}
// Wait until some sync documents will be modified, updated.
@ -1125,7 +1141,7 @@ export class GithubWorker implements IntegrationManager {
if (docs.length > 0) {
this.previousWait += docs.length
this.ctx.info('Syncing', { docs: docs.length })
this.ctx.info('Syncing', { docs: docs.length, workspace: this.workspace.name })
await this.doSyncFor(docs)
}
@ -1239,7 +1255,7 @@ export class GithubWorker implements IntegrationManager {
const existing = externalDocs.find((it) => it._id === info._id)
const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper
if (mapper === undefined) {
this.ctx.info('No mapper for class', { objectClass: info.objectClass })
this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name })
await derivedClient.update<DocSyncInfo>(info, {
needSync: githubSyncVersion
})
@ -1333,7 +1349,11 @@ export class GithubWorker implements IntegrationManager {
private async waitChanges (): Promise<void> {
if (this.triggerRequests > 0 || this.updateRequests > 0) {
this.ctx.info('Trigger check pending:', { requests: this.triggerRequests, updates: this.updateRequests })
this.ctx.info('Trigger check pending:', {
requests: this.triggerRequests,
updates: this.updateRequests,
workspace: this.workspace.name
})
this.triggerRequests = 0
return
}
@ -1347,7 +1367,7 @@ export class GithubWorker implements IntegrationManager {
triggerTimeout = setTimeout(() => {
triggerTimeout = undefined
if (was0) {
this.ctx.info('Sync triggered', { request: this.triggerRequests })
this.ctx.info('Sync triggered', { request: this.triggerRequests, workspace: this.workspace.name })
}
resolve()
}, 50) // Small timeout to aggregate few bulk changes.
@ -1361,7 +1381,7 @@ export class GithubWorker implements IntegrationManager {
updateTimeout = setTimeout(() => {
updateTimeout = undefined
if (was0) {
this.ctx.info('Sync update triggered', { requests: this.updateRequests })
this.ctx.info('Sync update triggered', { requests: this.updateRequests, workspace: this.workspace.name })
}
resolve()
}, 50) // Small timeout to aggregate few bulk changes.
@ -1378,7 +1398,11 @@ export class GithubWorker implements IntegrationManager {
this.triggerSync()
}
async performFullSync (): Promise<void> {
performFullSync = reduceCalls(async () => {
await this._performFullSync()
})
async _performFullSync (): Promise<void> {
// Wait previous active sync
for (const integration of this.integrations.values()) {
await this.ctx.withLog('external sync', { installation: integration.installationName }, async () => {