fix: move toTextStream to affine (#6628)

This commit is contained in:
pengx17 2024-04-19 07:59:23 +00:00
parent 6d62ba856c
commit 6d5dbbd7f4
No known key found for this signature in database
GPG Key ID: 23F23D9E8B3971ED
2 changed files with 101 additions and 10 deletions

View File

@ -0,0 +1,86 @@
import { GeneralNetworkError } from '@blocksuite/blocks';
export function delay(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
export type AffineTextEvent = {
type: 'attachment' | 'message';
data: string;
};
type AffineTextStream = AsyncIterable<AffineTextEvent>;
type toTextStreamOptions = {
timeout?: number;
};
export function toTextStream(
eventSource: EventSource,
{ timeout }: toTextStreamOptions = {}
): AffineTextStream {
return {
[Symbol.asyncIterator]: async function* () {
const messageQueue: AffineTextEvent[] = [];
let resolveMessagePromise: () => void;
let rejectMessagePromise: (err: Error) => void;
function resetMessagePromise() {
if (resolveMessagePromise) {
resolveMessagePromise();
}
return new Promise<void>((resolve, reject) => {
resolveMessagePromise = resolve;
rejectMessagePromise = reject;
});
}
let messagePromise = resetMessagePromise();
function messageListener(event: MessageEvent) {
messageQueue.push({
type: event.type as 'attachment' | 'message',
data: event.data as string,
});
messagePromise = resetMessagePromise();
}
eventSource.addEventListener('message', messageListener);
eventSource.addEventListener('attachment', messageListener);
eventSource.addEventListener('error', event => {
const errorMessage = (event as unknown as { data: string }).data;
// if there is data in Error event, it means the server sent an error message
// otherwise, the stream is finished successfully
if (event.type === 'error' && errorMessage) {
rejectMessagePromise(new GeneralNetworkError(errorMessage));
} else {
resolveMessagePromise();
}
eventSource.close();
});
try {
while (eventSource.readyState !== EventSource.CLOSED) {
if (messageQueue.length === 0) {
// Wait for the next message or timeout
await (timeout
? Promise.race([
messagePromise,
delay(timeout).then(() => {
throw new Error('Timeout');
}),
])
: messagePromise);
} else if (messageQueue.length > 0) {
const top = messageQueue.shift();
if (top) {
yield top;
}
}
}
} finally {
eventSource.close();
}
},
};
}

View File

@ -1,7 +1,7 @@
import { toTextStream } from '@blocksuite/presets';
import { partition } from 'lodash-es';
import { CopilotClient } from './copilot-client';
import { delay, toTextStream } from './event-source';
import type { PromptKey } from './prompt';
const TIMEOUT = 50000;
@ -116,21 +116,22 @@ export function textToText({
params,
sessionId,
});
const eventSource = client.chatTextStream({
sessionId: message.sessionId,
messageId: message.messageId,
});
yield* toTextStream(eventSource, { timeout });
for await (const event of toTextStream(eventSource, { timeout })) {
if (event.type === 'message') {
yield event.data;
}
}
},
};
} else {
return Promise.race([
timeout
? new Promise((_res, rej) => {
setTimeout(() => {
rej(new Error('Timeout'));
}, timeout);
? delay(timeout).then(() => {
throw new Error('Timeout');
})
: null,
createSessionMessage({
@ -141,8 +142,8 @@ export function textToText({
attachments,
params,
sessionId,
}).then(async message => {
return await client.chatText({
}).then(message => {
return client.chatText({
sessionId: message.sessionId,
messageId: message.messageId,
});
@ -175,7 +176,11 @@ export function toImage({
});
const eventSource = client.imagesStream(messageId, sessionId);
yield* toTextStream(eventSource, { timeout, type: 'attachment' });
for await (const event of toTextStream(eventSource, { timeout })) {
if (event.type === 'attachment') {
yield event.data;
}
}
},
};
}