feat: add copilot metrics (#8455)

fix CLOUD-73
This commit is contained in:
darkskygit 2024-10-18 03:30:02 +00:00
parent 4122cec096
commit fa554b1054
No known key found for this signature in database
GPG Key ID: 97B7D036B1566E9D
14 changed files with 198 additions and 112 deletions

View File

@ -3,7 +3,7 @@ import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import {
CallTimer,
CallMetric,
Config,
type EventPayload,
metrics,
@ -47,7 +47,7 @@ export class DocStorageCronJob implements OnModuleInit {
}
}
@CallTimer('doc', 'auto_merge_pending_doc_updates')
@CallMetric('doc', 'auto_merge_pending_doc_updates')
async autoMergePendingDocUpdates() {
try {
const randomDoc = await this.workspace.randomDoc();

View File

@ -3,7 +3,7 @@ import { chunk } from 'lodash-es';
import * as Y from 'yjs';
import {
CallTimer,
CallMetric,
Config,
mergeUpdatesInApplyWay as yotcoMergeUpdates,
metrics,
@ -89,12 +89,12 @@ export class DocStorageOptions implements IDocStorageOptions {
return this.config.doc.history.interval;
};
@CallTimer('doc', 'yjs_merge_updates')
@CallMetric('doc', 'yjs_merge_updates')
private simpleMergeUpdates(updates: Uint8Array[]) {
return Y.mergeUpdates(updates);
}
@CallTimer('doc', 'yjs_recover_updates_to_doc')
@CallMetric('doc', 'yjs_recover_updates_to_doc')
private recoverDoc(updates: Uint8Array[]): Promise<Y.Doc> {
const doc = new Y.Doc();
const chunks = chunk(updates, 10);

View File

@ -7,7 +7,7 @@ import {
UndoManager,
} from 'yjs';
import { CallTimer } from '../../../fundamentals';
import { CallMetric } from '../../../fundamentals';
import { Connection } from './connection';
import { SingletonLocker } from './lock';
@ -165,7 +165,7 @@ export abstract class DocStorageAdapter extends Connection {
force?: boolean
): Promise<boolean>;
@CallTimer('doc', 'squash')
@CallMetric('doc', 'squash')
protected async squash(updates: DocUpdate[]): Promise<DocUpdate> {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
const lastUpdate = updates.at(-1);

View File

@ -12,7 +12,7 @@ import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';
import {
AlreadyInSpace,
CallTimer,
CallMetric,
Config,
DocNotFound,
GatewayErrorWrapper,
@ -33,7 +33,7 @@ import { DocID } from '../utils/doc';
const SubscribeMessage = (event: string) =>
applyDecorators(
GatewayErrorWrapper(event),
CallTimer('socketio', 'event_duration', { event }),
CallMetric('socketio', 'event_duration', undefined, { event }),
RawSubscribeMessage(event)
);

View File

@ -6,7 +6,7 @@ import {
AccessDenied,
ActionForbidden,
BlobNotFound,
CallTimer,
CallMetric,
DocHistoryNotFound,
DocNotFound,
InvalidHistoryTimestamp,
@ -32,7 +32,7 @@ export class WorkspacesController {
// NOTE: because graphql can't represent a File, so we have to use REST API to get blob
@Public()
@Get('/:id/blobs/:name')
@CallTimer('controllers', 'workspace_get_blob')
@CallMetric('controllers', 'workspace_get_blob')
async blob(
@CurrentUser() user: CurrentUser | undefined,
@Param('id') workspaceId: string,
@ -76,7 +76,7 @@ export class WorkspacesController {
// get doc binary
@Public()
@Get('/:id/docs/:guid')
@CallTimer('controllers', 'workspace_get_doc')
@CallMetric('controllers', 'workspace_get_doc')
async doc(
@CurrentUser() user: CurrentUser | undefined,
@Param('id') ws: string,
@ -128,7 +128,7 @@ export class WorkspacesController {
}
@Get('/:id/docs/:guid/histories/:timestamp')
@CallTimer('controllers', 'workspace_get_history')
@CallMetric('controllers', 'workspace_get_history')
async history(
@CurrentUser() user: CurrentUser,
@Param('id') ws: string,

View File

@ -19,7 +19,7 @@ export type { GraphqlContext } from './graphql';
export * from './guard';
export { CryptoHelper, URLHelper } from './helpers';
export { MailService } from './mailer';
export { CallCounter, CallTimer, metrics } from './metrics';
export { CallMetric, metrics } from './metrics';
export { type ILocker, Lock, Locker, Mutex, RequestMutex } from './mutex';
export {
GatewayErrorWrapper,

View File

@ -36,7 +36,8 @@ export type KnownMetricScopes =
| 'controllers'
| 'doc'
| 'sse'
| 'mail';
| 'mail'
| 'ai';
const metricCreators: MetricCreators = {
counter(meter: Meter, name: string, opts?: MetricOptions) {

View File

@ -1,10 +1,18 @@
import { Attributes } from '@opentelemetry/api';
import type { Attributes } from '@opentelemetry/api';
import { KnownMetricScopes, metrics } from './metrics';
import { type KnownMetricScopes, metrics } from './metrics';
export const CallTimer = (
/**
* Decorator for measuring the call time, record call count and if is throw of a function call
* @param scope metric scope
* @param name metric event name
* @param attrs attributes
* @returns
*/
export const CallMetric = (
scope: KnownMetricScopes,
name: string,
record?: { timer?: boolean; count?: boolean; error?: boolean },
attrs?: Attributes
): MethodDecorator => {
// @ts-expect-error allow
@ -23,54 +31,35 @@ export const CallTimer = (
description: `function call time costs of ${name}`,
unit: 'ms',
});
metrics[scope]
.counter(`${name}_calls`, {
description: `function call counts of ${name}`,
})
.add(1, attrs);
const count = metrics[scope].counter(`${name}_calls`, {
description: `function call counter of ${name}`,
});
const errorCount = metrics[scope].counter(`${name}_errors`, {
description: `function call error counter of ${name}`,
});
const start = Date.now();
const end = () => {
timer.record(Date.now() - start, attrs);
timer?.record(Date.now() - start, attrs);
};
try {
if (!record || !!record.count) {
count.add(1, attrs);
}
return await originalMethod.apply(this, args);
} catch (err) {
if (!record || !!record.error) {
errorCount.add(1, attrs);
}
throw err;
} finally {
end();
if (!record || !!record.timer) {
end();
}
}
};
return desc;
};
};
export const CallCounter = (
scope: KnownMetricScopes,
name: string,
attrs?: Attributes
): MethodDecorator => {
// @ts-expect-error allow
return (
_target,
_key,
desc: TypedPropertyDescriptor<(...args: any[]) => any>
) => {
const originalMethod = desc.value;
if (!originalMethod) {
return desc;
}
desc.value = function (...args: any[]) {
const count = metrics[scope].counter(name, {
description: `function call counter of ${name}`,
});
count.add(1, attrs);
return originalMethod.apply(this, args);
};
return desc;
};
};

View File

@ -30,10 +30,12 @@ import {
import { CurrentUser, Public } from '../../core/auth';
import {
BlobNotFound,
CallMetric,
Config,
CopilotFailedToGenerateText,
CopilotSessionNotFound,
mapSseError,
metrics,
NoCopilotProviderAvailable,
UnsplashIsNotConfigured,
} from '../../fundamentals';
@ -178,6 +180,7 @@ export class CopilotController {
}
@Get('/chat/:sessionId')
@CallMetric('ai', 'chat', { timer: true })
async chat(
@CurrentUser() user: CurrentUser,
@Req() req: Request,
@ -185,6 +188,7 @@ export class CopilotController {
@Query() params: Record<string, string | string[]>
): Promise<string> {
const { messageId } = this.prepareParams(params);
const provider = await this.chooseTextProvider(
user.id,
sessionId,
@ -192,8 +196,8 @@ export class CopilotController {
);
const session = await this.appendSessionMessage(sessionId, messageId);
try {
metrics.ai.counter('chat_calls').add(1, { model: session.model });
const content = await provider.generateText(
session.finish(params),
session.model,
@ -213,27 +217,30 @@ export class CopilotController {
return content;
} catch (e: any) {
metrics.ai.counter('chat_errors').add(1, { model: session.model });
throw new CopilotFailedToGenerateText(e.message);
}
}
@Sse('/chat/:sessionId/stream')
@CallMetric('ai', 'chat_stream', { timer: true })
async chatStream(
@CurrentUser() user: CurrentUser,
@Req() req: Request,
@Param('sessionId') sessionId: string,
@Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params);
const provider = await this.chooseTextProvider(
user.id,
sessionId,
messageId
);
const session = await this.appendSessionMessage(sessionId, messageId);
try {
const { messageId } = this.prepareParams(params);
const provider = await this.chooseTextProvider(
user.id,
sessionId,
messageId
);
const session = await this.appendSessionMessage(sessionId, messageId);
metrics.ai.counter('chat_stream_calls').add(1, { model: session.model });
const source$ = from(
provider.generateTextStream(session.finish(params), session.model, {
...session.config.promptConfig,
@ -262,25 +269,34 @@ export class CopilotController {
)
)
),
catchError(mapSseError)
catchError(e => {
metrics.ai
.counter('chat_stream_errors')
.add(1, { model: session.model });
return mapSseError(e);
})
);
return this.mergePingStream(messageId, source$);
} catch (err) {
metrics.ai.counter('chat_stream_errors').add(1, { model: session.model });
return mapSseError(err);
}
}
@Sse('/chat/:sessionId/workflow')
@CallMetric('ai', 'chat_workflow', { timer: true })
async chatWorkflow(
@CurrentUser() user: CurrentUser,
@Req() req: Request,
@Param('sessionId') sessionId: string,
@Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params);
const session = await this.appendSessionMessage(sessionId, messageId);
try {
const { messageId } = this.prepareParams(params);
const session = await this.appendSessionMessage(sessionId, messageId);
metrics.ai.counter('workflow_calls').add(1, { model: session.model });
const latestMessage = session.stashMessages.findLast(
m => m.role === 'user'
);
@ -347,41 +363,51 @@ export class CopilotController {
)
)
),
catchError(mapSseError)
catchError(e => {
metrics.ai
.counter('workflow_errors')
.add(1, { model: session.model });
return mapSseError(e);
})
);
return this.mergePingStream(messageId, source$);
} catch (err) {
metrics.ai.counter('workflow_errors').add(1, { model: session.model });
return mapSseError(err);
}
}
@Sse('/chat/:sessionId/images')
@CallMetric('ai', 'chat_images', { timer: true })
async chatImagesStream(
@CurrentUser() user: CurrentUser,
@Req() req: Request,
@Param('sessionId') sessionId: string,
@Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params);
const { model, hasAttachment } = await this.checkRequest(
user.id,
sessionId,
messageId
);
const provider = await this.provider.getProviderByCapability(
hasAttachment
? CopilotCapability.ImageToImage
: CopilotCapability.TextToImage,
model
);
if (!provider) {
throw new NoCopilotProviderAvailable();
}
const session = await this.appendSessionMessage(sessionId, messageId);
try {
const { messageId } = this.prepareParams(params);
const { model, hasAttachment } = await this.checkRequest(
user.id,
sessionId,
messageId
);
const provider = await this.provider.getProviderByCapability(
hasAttachment
? CopilotCapability.ImageToImage
: CopilotCapability.TextToImage,
model
);
if (!provider) {
throw new NoCopilotProviderAvailable();
}
const session = await this.appendSessionMessage(sessionId, messageId);
metrics.ai
.counter('images_stream_calls')
.add(1, { model: session.model });
const handleRemoteLink = this.storage.handleRemoteLink.bind(
this.storage,
user.id,
@ -423,15 +449,24 @@ export class CopilotController {
)
)
),
catchError(mapSseError)
catchError(e => {
metrics.ai
.counter('images_stream_errors')
.add(1, { model: session.model });
return mapSseError(e);
})
);
return this.mergePingStream(messageId, source$);
} catch (err) {
metrics.ai
.counter('images_stream_errors')
.add(1, { model: session.model });
return mapSseError(err);
}
}
@CallMetric('ai', 'unsplash')
@Get('/unsplash/photos')
async unsplashPhotos(
@Req() req: Request,

View File

@ -9,6 +9,7 @@ import { z, ZodType } from 'zod';
import {
CopilotPromptInvalid,
CopilotProviderSideError,
metrics,
UserFriendlyError,
} from '../../../fundamentals';
import {
@ -217,6 +218,7 @@ export class FalProvider
// by default, image prompt assumes there is only one message
const prompt = this.extractPrompt(messages.pop());
try {
metrics.ai.counter('chat_text_calls').add(1, { model });
const response = await fetch(`https://fal.run/fal-ai/${model}`, {
method: 'POST',
headers: {
@ -237,6 +239,7 @@ export class FalProvider
}
return data.output;
} catch (e: any) {
metrics.ai.counter('chat_text_errors').add(1, { model });
throw this.handleError(e);
}
}
@ -246,15 +249,21 @@ export class FalProvider
model: string = 'llava-next',
options: CopilotChatOptions = {}
): AsyncIterable<string> {
const result = await this.generateText(messages, model, options);
try {
metrics.ai.counter('chat_text_stream_calls').add(1, { model });
const result = await this.generateText(messages, model, options);
for await (const content of result) {
if (content) {
yield content;
if (options.signal?.aborted) {
break;
for await (const content of result) {
if (content) {
yield content;
if (options.signal?.aborted) {
break;
}
}
}
} catch (e) {
metrics.ai.counter('chat_text_stream_errors').add(1, { model });
throw e;
}
}
@ -299,6 +308,8 @@ export class FalProvider
}
try {
metrics.ai.counter('generate_images_calls').add(1, { model });
const data = await this.buildResponse(messages, model, options);
if (!data.images?.length && !data.image?.url) {
@ -315,6 +326,7 @@ export class FalProvider
.map(image => image.url) || []
);
} catch (e: any) {
metrics.ai.counter('generate_images_errors').add(1, { model });
throw this.handleError(e);
}
}
@ -324,9 +336,15 @@ export class FalProvider
model: string = this.availableModels[0],
options: CopilotImageOptions = {}
): AsyncIterable<string> {
const ret = await this.generateImages(messages, model, options);
for (const url of ret) {
yield url;
try {
metrics.ai.counter('generate_images_stream_calls').add(1, { model });
const ret = await this.generateImages(messages, model, options);
for (const url of ret) {
yield url;
}
} catch (e) {
metrics.ai.counter('generate_images_stream_errors').add(1, { model });
throw e;
}
}
}

View File

@ -1,9 +1,10 @@
import { Logger } from '@nestjs/common';
import { APIError, ClientOptions, OpenAI } from 'openai';
import { APIError, BadRequestError, ClientOptions, OpenAI } from 'openai';
import {
CopilotPromptInvalid,
CopilotProviderSideError,
metrics,
UserFriendlyError,
} from '../../../fundamentals';
import {
@ -179,10 +180,23 @@ export class OpenAIProvider
}
}
private handleError(e: any) {
private handleError(
e: any,
model: string,
options: CopilotImageOptions = {}
) {
if (e instanceof UserFriendlyError) {
return e;
} else if (e instanceof APIError) {
if (
e instanceof BadRequestError &&
(e.message.includes('safety') || e.message.includes('risk'))
) {
metrics.ai
.counter('chat_text_risk_errors')
.add(1, { model, user: options.user || undefined });
}
return new CopilotProviderSideError({
provider: this.type,
kind: e.type || 'unknown',
@ -206,6 +220,7 @@ export class OpenAIProvider
this.checkParams({ messages, model, options });
try {
metrics.ai.counter('chat_text_calls').add(1, { model });
const result = await this.instance.chat.completions.create(
{
messages: this.chatToGPTMessage(messages),
@ -223,7 +238,8 @@ export class OpenAIProvider
if (!content) throw new Error('Failed to generate text');
return content.trim();
} catch (e: any) {
throw this.handleError(e);
metrics.ai.counter('chat_text_errors').add(1, { model });
throw this.handleError(e, model, options);
}
}
@ -235,6 +251,7 @@ export class OpenAIProvider
this.checkParams({ messages, model, options });
try {
metrics.ai.counter('chat_text_stream_calls').add(1, { model });
const result = await this.instance.chat.completions.create(
{
stream: true,
@ -268,7 +285,8 @@ export class OpenAIProvider
}
}
} catch (e: any) {
throw this.handleError(e);
metrics.ai.counter('chat_text_stream_errors').add(1, { model });
throw this.handleError(e, model, options);
}
}
@ -283,15 +301,19 @@ export class OpenAIProvider
this.checkParams({ embeddings: messages, model, options });
try {
metrics.ai.counter('generate_embedding_calls').add(1, { model });
const result = await this.instance.embeddings.create({
model: model,
input: messages,
dimensions: options.dimensions || DEFAULT_DIMENSIONS,
user: options.user,
});
return result.data.map(e => e.embedding);
return result.data
.map(e => e?.embedding)
.filter(v => v && Array.isArray(v));
} catch (e: any) {
throw this.handleError(e);
metrics.ai.counter('generate_embedding_errors').add(1, { model });
throw this.handleError(e, model, options);
}
}
@ -305,6 +327,7 @@ export class OpenAIProvider
if (!prompt) throw new CopilotPromptInvalid('Prompt is required');
try {
metrics.ai.counter('generate_images_calls').add(1, { model });
const result = await this.instance.images.generate(
{
prompt,
@ -319,7 +342,8 @@ export class OpenAIProvider
.map(image => image.url)
.filter((v): v is string => !!v);
} catch (e: any) {
throw this.handleError(e);
metrics.ai.counter('generate_images_errors').add(1, { model });
throw this.handleError(e, model, options);
}
}
@ -328,9 +352,15 @@ export class OpenAIProvider
model: string = 'dall-e-3',
options: CopilotImageOptions = {}
): AsyncIterable<string> {
const ret = await this.generateImages(messages, model, options);
for (const url of ret) {
yield url;
try {
metrics.ai.counter('generate_images_stream_calls').add(1, { model });
const ret = await this.generateImages(messages, model, options);
for (const url of ret) {
yield url;
}
} catch (e) {
metrics.ai.counter('generate_images_stream_errors').add(1, { model });
throw e;
}
}
}

View File

@ -24,6 +24,7 @@ import { Admin } from '../../core/common';
import { PermissionService } from '../../core/permission';
import { UserType } from '../../core/user';
import {
CallMetric,
CopilotFailedToCreateMessage,
FileUpload,
RequestMutex,
@ -308,6 +309,7 @@ export class CopilotResolver {
}
@ResolveField(() => [CopilotHistoriesType], {})
@CallMetric('ai', 'histories')
async histories(
@Parent() copilot: CopilotType,
@CurrentUser() user: CurrentUser,
@ -334,6 +336,7 @@ export class CopilotResolver {
options,
true
);
return histories.map(h => ({
...h,
// filter out empty messages
@ -344,6 +347,7 @@ export class CopilotResolver {
@Mutation(() => String, {
description: 'Create a chat session',
})
@CallMetric('ai', 'chat_session_create')
async createCopilotSession(
@CurrentUser() user: CurrentUser,
@Args({ name: 'options', type: () => CreateChatSessionInput })
@ -362,16 +366,16 @@ export class CopilotResolver {
await this.chatSession.checkQuota(user.id);
const session = await this.chatSession.create({
return await this.chatSession.create({
...options,
userId: user.id,
});
return session;
}
@Mutation(() => String, {
description: 'Create a chat session',
})
@CallMetric('ai', 'chat_session_fork')
async forkCopilotSession(
@CurrentUser() user: CurrentUser,
@Args({ name: 'options', type: () => ForkChatSessionInput })
@ -390,16 +394,16 @@ export class CopilotResolver {
await this.chatSession.checkQuota(user.id);
const session = await this.chatSession.fork({
return await this.chatSession.fork({
...options,
userId: user.id,
});
return session;
}
@Mutation(() => [String], {
description: 'Cleanup sessions',
})
@CallMetric('ai', 'chat_session_cleanup')
async cleanupCopilotSession(
@CurrentUser() user: CurrentUser,
@Args({ name: 'options', type: () => DeleteSessionInput })
@ -428,6 +432,7 @@ export class CopilotResolver {
@Mutation(() => String, {
description: 'Create a chat message',
})
@CallMetric('ai', 'chat_message_create')
async createCopilotMessage(
@CurrentUser() user: CurrentUser,
@Args({ name: 'options', type: () => CreateChatMessageInput })

View File

@ -559,6 +559,7 @@ export class ChatSessionService {
this.logger.error(`Prompt not found: ${options.promptName}`);
throw new CopilotPromptNotFound({ name: options.promptName });
}
return await this.setSession({
...options,
sessionId,

View File

@ -6,6 +6,7 @@ import { QuotaManagementService } from '../../core/quota';
import {
type BlobInputType,
BlobQuotaExceeded,
CallMetric,
Config,
type FileUpload,
type StorageProvider,
@ -28,6 +29,7 @@ export class CopilotStorage {
);
}
@CallMetric('ai', 'blob_put')
async put(
userId: string,
workspaceId: string,
@ -43,20 +45,24 @@ export class CopilotStorage {
return this.url.link(`/api/copilot/blob/${name}`);
}
@CallMetric('ai', 'blob_get')
async get(userId: string, workspaceId: string, key: string) {
return this.provider.get(`${userId}/${workspaceId}/${key}`);
}
@CallMetric('ai', 'blob_delete')
async delete(userId: string, workspaceId: string, key: string) {
return this.provider.delete(`${userId}/${workspaceId}/${key}`);
await this.provider.delete(`${userId}/${workspaceId}/${key}`);
}
@CallMetric('ai', 'blob_upload')
async handleUpload(userId: string, blob: FileUpload) {
const checkExceeded = await this.quota.getQuotaCalculator(userId);
if (checkExceeded(0)) {
throw new BlobQuotaExceeded();
}
const buffer = await new Promise<Buffer>((resolve, reject) => {
const stream = blob.createReadStream();
const chunks: Uint8Array[] = [];
@ -87,6 +93,7 @@ export class CopilotStorage {
};
}
@CallMetric('ai', 'blob_proxy_remote_url')
async handleRemoteLink(userId: string, workspaceId: string, link: string) {
const response = await fetch(link);
const buffer = new Uint8Array(await response.arrayBuffer());