feat(infra): introduce op pattern (#8734)

This commit is contained in:
liuyi 2024-11-09 15:23:38 +08:00 committed by GitHub
parent 571e25a7a1
commit d6618b6891
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1239 additions and 1 deletions

View File

@ -16,6 +16,7 @@
"@affine/templates": "workspace:*",
"@blocksuite/affine": "0.17.28",
"@datastructures-js/binary-search-tree": "^5.3.2",
"eventemitter2": "^6.4.9",
"foxact": "^0.2.33",
"fractional-indexing": "^3.2.0",
"fuse.js": "^7.0.0",

View File

@ -0,0 +1,159 @@
# Introduction
Operation Pattern is a tiny `RPC` framework available both in frontend and backend.
It introduces super simple call and listen signatures to make Worker, cross tabs SharedWorker or BroadcastChannel easier to use and reduce boilerplate.
# usage
## Register Op Handlers
### Function call handler
```ts
interface Ops extends OpSchema {
add: [{ a: number; b: number }, number]
}
// register
const consumer: OpConsumer<Ops>;
consumer.register('add', ({ a, b }) => a + b);
// call
const client: OpClient<Ops>;
const ret = client.call('add', { a: 1, b: 2 })); // Promise<3>
```
### Stream call handler
```ts
interface Ops extends OpSchema {
subscribeStatus: [number, string];
}
// register
const consumer: OpConsumer<Ops>;
consumer.register('subscribeStatus', (id: number) => {
return interval(3000).pipe(map(() => 'connected'));
});
// subscribe
const client: OpClient<Ops>;
client.subscribe('subscribeStatus', 123, {
next: status => {
ui.setServerStatus(status);
},
error: error => {
ui.setServerError(error);
},
complete: () => {
//
},
});
```
### Transfer variables
> [Transferable Objects](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects)
#### Client transferables
```ts
interface Ops extends OpSchema {
heavyWork: [{ name: string; data: Uint8Array; data2: Uint8Array }, void];
}
const client: OpClient<Ops>;
const data = new Uint8Array([1, 2, 3]);
const nonTransferredData = new Uint8Array([1, 2, 3]);
client.call(
'heavyWork',
transfer(
{
name: '',
data: data,
data2: nonTransferredData,
},
[data.buffer]
)
);
// after transferring, you can not use the transferred variables anymore!!!
// moved
assertEq(data.byteLength, 0);
// copied
assertEq(nonTransferredData.byteLength, 3);
```
#### Consumer transferables
```ts
interface Ops extends OpSchema {
job: [{ id: string }, Uint8Array];
}
const consumer: OpConsumer<Ops>;
consumer.register('ops', ({ id }) => {
return interval(3000).pipe(
map(() => {
const data = new Uint8Array([1, 2, 3]);
transfer(data, [data.buffer]);
})
);
});
```
## Communication
### BroadcastChannel
:::CAUTION
BroadcastChannel doesn't support transfer transferable objects. All data passed through it's `postMessage` api would be structured cloned
see [Structured_clone_algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
:::
```ts
const channel = new BroadcastChannel('domain');
const consumer = new OpConsumer(channel);
consumer.listen();
const client = new OpClient(channel);
client.listen();
```
### MessagePort
```ts
const { port1, port2 } = new MessagePort();
const client = new OpClient(port1);
const consumer = new OpConsumer(port2);
```
### Worker
```ts
const worker = new Worker('./xxx-worker');
const client = new OpClient(worker);
// in worker
const consumer = new OpConsumer(globalThis);
consumer.listen();
```
### SharedWorker
```ts
const worker = new SharedWorker('./xxx-worker');
const client = new OpClient(worker.port);
// in worker
globalThis.addEventListener('connect', event => {
const port = event.ports[0];
const consumer = new OpConsumer(port);
consumer.listen();
});
```

View File

@ -0,0 +1,215 @@
import { afterEach } from 'node:test';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { OpClient } from '../client';
import { type MessageHandlers, transfer } from '../message';
import type { OpSchema } from '../types';
interface TestOps extends OpSchema {
add: [{ a: number; b: number }, number];
bin: [Uint8Array, Uint8Array];
sub: [Uint8Array, number];
}
declare module 'vitest' {
interface TestContext {
producer: OpClient<TestOps>;
handlers: MessageHandlers;
postMessage: ReturnType<typeof vi.fn>;
}
}
describe('op client', () => {
beforeEach(ctx => {
const { port1 } = new MessageChannel();
// @ts-expect-error patch postMessage
port1.postMessage = vi.fn(port1.postMessage);
// @ts-expect-error patch postMessage
ctx.postMessage = port1.postMessage;
ctx.producer = new OpClient(port1);
// @ts-expect-error internal api
ctx.handlers = ctx.producer.handlers;
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it('should send call op', async ctx => {
// @ts-expect-error internal api
const pendingCalls = ctx.producer.pendingCalls;
const result = ctx.producer.call('add', { a: 1, b: 2 });
expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(`
{
"id": "add:1",
"name": "add",
"payload": {
"a": 1,
"b": 2,
},
"type": "call",
}
`);
expect(pendingCalls.has('add:1')).toBe(true);
// fake consumer return
ctx.handlers.return({ type: 'return', id: 'add:1', data: 3 });
await expect(result).resolves.toBe(3);
expect(pendingCalls.has('add:1')).toBe(false);
});
it('should transfer transferables with call op', async ctx => {
const data = new Uint8Array([1, 2, 3]);
const result = ctx.producer.call('bin', transfer(data, [data.buffer]));
expect(ctx.postMessage.mock.calls[0][1].transfer[0]).toBeInstanceOf(
ArrayBuffer
);
// fake consumer return
ctx.handlers.return({
type: 'return',
id: 'bin:1',
data: new Uint8Array([3, 2, 1]),
});
await expect(result).resolves.toEqual(new Uint8Array([3, 2, 1]));
expect(data.byteLength).toBe(0);
});
it('should cancel call', async ctx => {
const promise = ctx.producer.call('add', { a: 1, b: 2 });
promise.cancel();
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
[
{
"id": "add:1",
"type": "cancel",
},
]
`);
await expect(promise).rejects.toThrow('canceled');
});
it('should timeout call', async ctx => {
const promise = ctx.producer.call('add', { a: 1, b: 2 });
vi.advanceTimersByTime(4000);
await expect(promise).rejects.toThrow('timeout');
});
it('should send subscribe op', async ctx => {
let ob = {
next: vi.fn(),
error: vi.fn(),
complete: vi.fn(),
};
// @ts-expect-error internal api
const subscriptions = ctx.producer.obs;
ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob);
expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(`
{
"id": "sub:1",
"name": "sub",
"payload": Uint8Array [
1,
2,
3,
],
"type": "subscribe",
}
`);
expect(subscriptions.has('sub:1')).toBe(true);
// fake consumer return
ctx.handlers.next({ type: 'next', id: 'sub:1', data: 1 });
ctx.handlers.next({ type: 'next', id: 'sub:1', data: 2 });
ctx.handlers.next({ type: 'next', id: 'sub:1', data: 3 });
expect(subscriptions.has('sub:1')).toBe(true);
ctx.handlers.complete({ type: 'complete', id: 'sub:1' });
expect(ob.next).toHaveBeenCalledTimes(3);
expect(ob.complete).toHaveBeenCalledTimes(1);
expect(subscriptions.has('sub:1')).toBe(false);
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
[
{
"id": "sub:1",
"type": "unsubscribe",
},
]
`);
// smoking
ob = {
next: vi.fn(),
error: vi.fn(),
complete: vi.fn(),
};
ctx.producer.subscribe('sub', new Uint8Array([1, 2, 3]), ob);
expect(subscriptions.has('sub:2')).toBe(true);
ctx.handlers.next({ type: 'next', id: 'sub:2', data: 1 });
ctx.handlers.error({
type: 'error',
id: 'sub:2',
error: new Error('test'),
});
expect(ob.next).toHaveBeenCalledTimes(1);
expect(ob.error).toHaveBeenCalledTimes(1);
expect(subscriptions.has('sub')).toBe(false);
});
it('should transfer transferables with subscribe op', async ctx => {
const data = new Uint8Array([1, 2, 3]);
const unsubscribe = ctx.producer.subscribe(
'bin',
transfer(data, [data.buffer]),
{
next: vi.fn(),
}
);
expect(data.byteLength).toBe(0);
unsubscribe();
});
it('should unsubscribe subscription op', ctx => {
const unsubscribe = ctx.producer.subscribe(
'sub',
new Uint8Array([1, 2, 3]),
{
next: vi.fn(),
}
);
unsubscribe();
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
[
{
"id": "sub:1",
"type": "unsubscribe",
},
]
`);
});
});

View File

@ -0,0 +1,197 @@
import { afterEach } from 'node:test';
import { Observable } from 'rxjs';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { OpConsumer } from '../consumer';
import { type MessageHandlers, transfer } from '../message';
import type { OpSchema } from '../types';
interface TestOps extends OpSchema {
add: [{ a: number; b: number }, number];
any: [any, any];
}
declare module 'vitest' {
interface TestContext {
consumer: OpConsumer<TestOps>;
handlers: MessageHandlers;
postMessage: ReturnType<typeof vi.fn>;
}
}
describe('op consumer', () => {
beforeEach(ctx => {
const { port2 } = new MessageChannel();
// @ts-expect-error patch postMessage
port2.postMessage = vi.fn(port2.postMessage);
// @ts-expect-error patch postMessage
ctx.postMessage = port2.postMessage;
ctx.consumer = new OpConsumer(port2);
// @ts-expect-error internal api
ctx.handlers = ctx.consumer.handlers;
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it('should throw if no handler registered', async ctx => {
ctx.handlers.call({ type: 'call', id: 'add:1', name: 'add', payload: {} });
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
[
{
"error": [Error: Handler for operation [add] is not registered.],
"id": "add:1",
"type": "return",
},
]
`);
});
it('should handle call message', async ctx => {
ctx.consumer.register('add', ({ a, b }) => a + b);
ctx.handlers.call({
type: 'call',
id: 'add:1',
name: 'add',
payload: { a: 1, b: 2 },
});
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(`
{
"data": 3,
"id": "add:1",
"type": "return",
}
`);
});
it('should handle cancel message', async ctx => {
ctx.consumer.register('add', ({ a, b }, { signal }) => {
const { reject, resolve, promise } = Promise.withResolvers<number>();
signal?.addEventListener('abort', () => {
reject(new Error('canceled'));
});
setTimeout(() => {
resolve(a + b);
}, Number.MAX_SAFE_INTEGER);
return promise;
});
ctx.handlers.call({
type: 'call',
id: 'add:1',
name: 'add',
payload: { a: 1, b: 2 },
});
ctx.handlers.cancel({ type: 'cancel', id: 'add:1' });
await vi.advanceTimersByTimeAsync(1);
expect(ctx.postMessage).not.toBeCalled();
});
it('should transfer transferables in return', async ctx => {
const data = new Uint8Array([1, 2, 3]);
const nonTransferred = new Uint8Array([4, 5, 6]);
ctx.consumer.register('any', () => {
return transfer({ data: { data, nonTransferred } }, [data.buffer]);
});
ctx.handlers.call({ type: 'call', id: 'any:1', name: 'any', payload: {} });
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage).toHaveBeenCalledOnce();
expect(data.byteLength).toBe(0);
expect(nonTransferred.byteLength).toBe(3);
});
it('should handle subscribe message', async ctx => {
ctx.consumer.register('any', data => {
return new Observable(observer => {
data.forEach((v: number) => observer.next(v));
observer.complete();
});
});
ctx.handlers.subscribe({
type: 'subscribe',
id: 'any:1',
name: 'any',
payload: transfer(new Uint8Array([1, 2, 3]), [
new Uint8Array([1, 2, 3]).buffer,
]),
});
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls.map(call => call[0]))
.toMatchInlineSnapshot(`
[
{
"data": 1,
"id": "any:1",
"type": "next",
},
{
"data": 2,
"id": "any:1",
"type": "next",
},
{
"data": 3,
"id": "any:1",
"type": "next",
},
{
"id": "any:1",
"type": "complete",
},
]
`);
});
it('should handle unsubscribe message', async ctx => {
ctx.consumer.register('any', data => {
return new Observable(observer => {
data.forEach((v: number) => {
setTimeout(() => {
observer.next(v);
}, 1);
});
setTimeout(() => {
observer.complete();
}, 1);
});
});
ctx.handlers.subscribe({
type: 'subscribe',
id: 'any:1',
name: 'any',
payload: transfer(new Uint8Array([1, 2, 3]), [
new Uint8Array([1, 2, 3]).buffer,
]),
});
ctx.handlers.unsubscribe({ type: 'unsubscribe', id: 'any:1' });
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(`
[
[
{
"id": "any:1",
"type": "complete",
},
],
]
`);
});
});

View File

@ -0,0 +1,76 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
AutoMessageHandler,
ignoreUnknownEvent,
KNOWN_MESSAGE_TYPES,
type MessageCommunicapable,
type MessageHandlers,
} from '../message';
class CustomMessageHandler extends AutoMessageHandler {
public handlers: Partial<MessageHandlers> = {
call: vi.fn(),
cancel: vi.fn(),
subscribe: vi.fn(),
unsubscribe: vi.fn(),
return: vi.fn(),
next: vi.fn(),
error: vi.fn(),
complete: vi.fn(),
};
}
declare module 'vitest' {
interface TestContext {
sendPort: MessageCommunicapable;
receivePort: MessageCommunicapable;
handler: CustomMessageHandler;
}
}
describe('message', () => {
beforeEach(ctx => {
const listeners: ((event: MessageEvent) => void)[] = [];
ctx.sendPort = {
postMessage: (msg: any) => {
listeners.forEach(listener => {
listener(new MessageEvent('message', { data: msg }));
});
},
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
};
ctx.receivePort = {
postMessage: vi.fn(),
addEventListener: vi.fn((_event, handler) => {
listeners.push(handler);
}),
removeEventListener: vi.fn(),
};
ctx.handler = new CustomMessageHandler(ctx.receivePort);
ctx.handler.listen();
});
it('should ignore unknown message type', ctx => {
const handler = vi.fn();
// @ts-expect-error internal api
ctx.handler.handleMessage = ignoreUnknownEvent(handler);
ctx.sendPort.postMessage('connected');
ctx.sendPort.postMessage({ type: 'call1' });
ctx.sendPort.postMessage(new Uint8Array());
ctx.sendPort.postMessage(null);
ctx.sendPort.postMessage(undefined);
expect(handler).not.toHaveBeenCalled();
});
it('should handle known message type', async ctx => {
for (const type of KNOWN_MESSAGE_TYPES) {
ctx.sendPort.postMessage({ type });
expect(ctx.handler.handlers[type]).toBeCalled();
}
});
});

View File

@ -0,0 +1,206 @@
import { merge } from 'lodash-es';
import { Observable, type Observer } from 'rxjs';
import {
AutoMessageHandler,
type CallMessage,
type CancelMessage,
fetchTransferables,
type MessageCommunicapable,
type MessageHandlers,
type SubscribeMessage,
type UnsubscribeMessage,
} from './message';
import type { OpInput, OpNames, OpOutput, OpSchema } from './types';
export interface CancelablePromise<T> extends Promise<T> {
cancel(): void;
}
interface PendingCall extends PromiseWithResolvers<any> {
id: string;
timeout: number | NodeJS.Timeout;
}
interface OpClientOptions {
timeout?: number;
}
export class OpClient<Ops extends OpSchema> extends AutoMessageHandler {
private readonly callIds = new Map<OpNames<Ops>, number>();
private readonly pendingCalls = new Map<string, PendingCall>();
private readonly obs = new Map<string, Observer<any>>();
private readonly options: OpClientOptions = {
timeout: 3000,
};
constructor(port: MessageCommunicapable, options: OpClientOptions = {}) {
super(port);
merge(this.options, options);
}
protected override get handlers() {
return {
return: this.handleReturnMessage,
next: this.handleSubscriptionNextMessage,
error: this.handleSubscriptionErrorMessage,
complete: this.handleSubscriptionCompleteMessage,
};
}
private readonly handleReturnMessage: MessageHandlers['return'] = msg => {
const pending = this.pendingCalls.get(msg.id);
if (!pending) {
return;
}
if ('error' in msg) {
pending.reject(msg.error);
} else {
pending.resolve(msg.data);
}
clearTimeout(pending.timeout);
this.pendingCalls.delete(msg.id);
};
private readonly handleSubscriptionNextMessage: MessageHandlers['next'] =
msg => {
const ob = this.obs.get(msg.id);
if (!ob) {
return;
}
ob.next(msg.data);
};
private readonly handleSubscriptionErrorMessage: MessageHandlers['error'] =
msg => {
const ob = this.obs.get(msg.id);
if (!ob) {
return;
}
ob.error(msg.error);
};
private readonly handleSubscriptionCompleteMessage: MessageHandlers['complete'] =
msg => {
const ob = this.obs.get(msg.id);
if (!ob) {
return;
}
ob.complete();
};
protected nextCallId(op: OpNames<Ops>) {
let id = this.callIds.get(op) ?? 0;
id++;
this.callIds.set(op, id);
return `${op}:${id}`;
}
protected currentCallId(op: OpNames<Ops>) {
return this.callIds.get(op) ?? 0;
}
call<Op extends OpNames<Ops>>(
op: Op,
...args: OpInput<Ops, Op>
): CancelablePromise<OpOutput<Ops, Op>> {
const promiseWithResolvers = Promise.withResolvers<any>();
const payload = args[0];
const msg = {
type: 'call',
id: this.nextCallId(op),
name: op as string,
payload,
} satisfies CallMessage;
const promise = promiseWithResolvers.promise as CancelablePromise<any>;
const raise = (reason: string) => {
const pending = this.pendingCalls.get(msg.id);
if (!pending) {
return;
}
this.port.postMessage({
type: 'cancel',
id: msg.id,
} satisfies CancelMessage);
promiseWithResolvers.reject(new Error(reason));
clearTimeout(pending.timeout);
this.pendingCalls.delete(msg.id);
};
promise.cancel = () => {
raise('canceled');
};
const timeout = setTimeout(() => {
raise('timeout');
}, this.options.timeout);
const transferables = fetchTransferables(payload);
this.port.postMessage(msg, { transfer: transferables });
this.pendingCalls.set(msg.id, {
...promiseWithResolvers,
timeout,
id: msg.id,
});
return promise;
}
subscribe<Op extends OpNames<Ops>, Out extends OpOutput<Ops, Op>>(
op: Op,
...args: [
...OpInput<Ops, Op>,
Partial<Observer<Out>> | ((value: Out) => void),
]
): () => void {
const payload = args[0];
const observer = args[1] as Partial<Observer<Out>> | ((value: Out) => void);
const msg = {
type: 'subscribe',
id: this.nextCallId(op),
name: op as string,
payload,
} satisfies SubscribeMessage;
const sub = new Observable<Out>(ob => {
this.obs.set(msg.id, ob);
}).subscribe(observer);
sub.add(() => {
this.obs.delete(msg.id);
this.port.postMessage({
type: 'unsubscribe',
id: msg.id,
} satisfies UnsubscribeMessage);
});
const transferables = fetchTransferables(payload);
this.port.postMessage(msg, { transfer: transferables });
return () => {
sub.unsubscribe();
};
}
destroy() {
super.close();
this.pendingCalls.forEach(call => {
call.reject(new Error('client destroyed'));
});
this.pendingCalls.clear();
this.obs.forEach(ob => {
ob.complete();
});
this.obs.clear();
}
}

View File

@ -0,0 +1,188 @@
import EventEmitter2 from 'eventemitter2';
import {
defer,
from,
fromEvent,
Observable,
of,
share,
take,
takeUntil,
} from 'rxjs';
import {
AutoMessageHandler,
type CallMessage,
fetchTransferables,
type MessageHandlers,
type ReturnMessage,
type SubscribeMessage,
type SubscriptionCompleteMessage,
type SubscriptionErrorMessage,
type SubscriptionNextMessage,
} from './message';
import type { OpInput, OpNames, OpOutput, OpSchema } from './types';
interface OpCallContext {
signal: AbortSignal;
}
export type OpHandler<Ops extends OpSchema, Op extends OpNames<Ops>> = (
payload: OpInput<Ops, Op>[0],
ctx: OpCallContext
) =>
| OpOutput<Ops, Op>
| Promise<OpOutput<Ops, Op>>
| Observable<OpOutput<Ops, Op>>;
export class OpConsumer<Ops extends OpSchema> extends AutoMessageHandler {
private readonly eventBus = new EventEmitter2();
private readonly registeredOpHandlers = new Map<
OpNames<Ops>,
OpHandler<Ops, any>
>();
private readonly processing = new Map<string, AbortController>();
override get handlers() {
return {
call: this.handleCallMessage,
cancel: this.handleCancelMessage,
subscribe: this.handleSubscribeMessage,
unsubscribe: this.handleCancelMessage,
};
}
private readonly handleCallMessage: MessageHandlers['call'] = async msg => {
const abortController = new AbortController();
this.processing.set(msg.id, abortController);
this.eventBus.emit(`before:${msg.name}`, msg.payload);
this.ob$(msg, abortController.signal)
.pipe(take(1))
.subscribe({
next: data => {
this.eventBus.emit(`after:${msg.name}`, msg.payload, data);
const transferables = fetchTransferables(data);
this.port.postMessage(
{
type: 'return',
id: msg.id,
data,
} satisfies ReturnMessage,
{ transfer: transferables }
);
},
error: error => {
this.port.postMessage({
type: 'return',
id: msg.id,
error: error as Error,
} satisfies ReturnMessage);
},
complete: () => {
this.processing.delete(msg.id);
},
});
};
private readonly handleSubscribeMessage: MessageHandlers['subscribe'] =
msg => {
const abortController = new AbortController();
this.processing.set(msg.id, abortController);
this.ob$(msg, abortController.signal).subscribe({
next: data => {
const transferables = fetchTransferables(data);
this.port.postMessage(
{
type: 'next',
id: msg.id,
data,
} satisfies SubscriptionNextMessage,
{ transfer: transferables }
);
},
error: error => {
this.port.postMessage({
type: 'error',
id: msg.id,
error: error as Error,
} satisfies SubscriptionErrorMessage);
},
complete: () => {
this.port.postMessage({
type: 'complete',
id: msg.id,
} satisfies SubscriptionCompleteMessage);
this.processing.delete(msg.id);
},
});
};
private readonly handleCancelMessage: MessageHandlers['cancel'] &
MessageHandlers['unsubscribe'] = msg => {
const abortController = this.processing.get(msg.id);
if (!abortController) {
return;
}
abortController.abort();
};
register<Op extends OpNames<Ops>>(op: Op, handler: OpHandler<Ops, Op>) {
this.registeredOpHandlers.set(op, handler);
}
before<Op extends OpNames<Ops>>(
op: Op,
handler: (...input: OpInput<Ops, Op>) => void
) {
this.eventBus.on(`before:${op}`, handler);
}
after<Op extends OpNames<Ops>>(
op: Op,
handler: (...args: [...OpInput<Ops, Op>, OpOutput<Ops, Op>]) => void
) {
this.eventBus.on(`after:${op}`, handler);
}
/**
* @internal
*/
ob$(op: CallMessage | SubscribeMessage, signal: AbortSignal) {
return defer(() => {
const handler = this.registeredOpHandlers.get(op.name as any);
if (!handler) {
throw new Error(
`Handler for operation [${op.name}] is not registered.`
);
}
const ret$ = handler(op.payload, { signal });
let ob$: Observable<any>;
if (ret$ instanceof Promise) {
ob$ = from(ret$);
} else if (ret$ instanceof Observable) {
ob$ = ret$;
} else {
ob$ = of(ret$);
}
return ob$.pipe(share(), takeUntil(fromEvent(signal, 'abort')));
});
}
destroy() {
super.close();
this.registeredOpHandlers.clear();
this.processing.forEach(controller => {
controller.abort();
});
this.processing.clear();
this.eventBus.removeAllListeners();
}
}

View File

@ -0,0 +1,4 @@
export * from './client';
export * from './consumer';
export { type MessageCommunicapable, transfer } from './message';
export type { OpSchema } from './types';

View File

@ -0,0 +1,155 @@
const PRODUCER_MESSAGE_TYPES = [
'call',
'cancel',
'subscribe',
'unsubscribe',
] as const;
const CONSUMER_MESSAGE_TYPES = ['return', 'next', 'error', 'complete'] as const;
export const KNOWN_MESSAGE_TYPES = new Set([
...PRODUCER_MESSAGE_TYPES,
...CONSUMER_MESSAGE_TYPES,
]);
type MessageType =
| (typeof PRODUCER_MESSAGE_TYPES)[number]
| (typeof CONSUMER_MESSAGE_TYPES)[number];
export interface Message {
type: MessageType;
}
// in
export interface CallMessage extends Message {
type: 'call';
id: string;
name: string;
payload: any;
}
export interface CancelMessage extends Message {
type: 'cancel';
id: string;
}
export interface SubscribeMessage extends Message {
type: 'subscribe';
id: string;
name: string;
payload: any;
}
export interface UnsubscribeMessage extends Message {
type: 'unsubscribe';
id: string;
}
// out
export type ReturnMessage = {
type: 'return';
id: string;
} & (
| {
data: any;
}
| {
error: Error;
}
);
export interface SubscriptionNextMessage extends Message {
type: 'next';
id: string;
data: any;
}
export interface SubscriptionErrorMessage extends Message {
type: 'error';
id: string;
error: Error;
}
export type SubscriptionCompleteMessage = {
type: 'complete';
id: string;
};
export type Messages =
| CallMessage
| CancelMessage
| SubscribeMessage
| UnsubscribeMessage
| ReturnMessage
| SubscriptionNextMessage
| SubscriptionErrorMessage
| SubscriptionCompleteMessage;
export type MessageHandlers = {
[Type in Messages['type']]: (
message: Extract<Messages, { type: Type }>
) => void;
};
export type MessageCommunicapable = Pick<
MessagePort,
'postMessage' | 'addEventListener' | 'removeEventListener'
> & {
start?(): void;
close?(): void;
};
export function ignoreUnknownEvent(handler: (data: Messages) => void) {
return (event: MessageEvent<Message>) => {
const data = event.data;
if (
!data ||
typeof data !== 'object' ||
typeof data.type !== 'string' ||
!KNOWN_MESSAGE_TYPES.has(data.type)
) {
return;
}
handler(data as any);
};
}
const TRANSFERABLES_CACHE = new Map<any, Transferable[]>();
export function transfer<T>(data: T, transferables: Transferable[]): T {
TRANSFERABLES_CACHE.set(data, transferables);
return data;
}
export function fetchTransferables(data: any): Transferable[] | undefined {
const transferables = TRANSFERABLES_CACHE.get(data);
if (transferables) {
TRANSFERABLES_CACHE.delete(data);
}
return transferables;
}
export abstract class AutoMessageHandler {
protected abstract handlers: Partial<MessageHandlers>;
constructor(protected readonly port: MessageCommunicapable) {}
protected handleMessage = ignoreUnknownEvent((msg: Messages) => {
const handler = this.handlers[msg.type];
if (!handler) {
return;
}
handler(msg as any);
});
listen() {
this.port.addEventListener('message', this.handleMessage);
this.port.start?.();
}
close() {
this.port.close?.();
this.port.removeEventListener('message', this.handleMessage);
}
}

View File

@ -0,0 +1,36 @@
type KeyToKey<T extends OpSchema> = {
[K in keyof T]: string extends K ? never : K;
};
declare type ValuesOf<T> = T extends {
[K in keyof T]: infer _U;
}
? _U
: never;
export interface OpSchema {
[key: string]: [any, any?];
}
type RequiredInput<In> = In extends void ? [] : In extends never ? [] : [In];
export type OpNames<T extends OpSchema> = ValuesOf<KeyToKey<T>>;
export type OpInput<
Ops extends OpSchema,
Type extends OpNames<Ops>,
> = Type extends keyof Ops
? Ops[Type] extends [infer In]
? RequiredInput<In>
: Ops[Type] extends [infer In, infer _Out]
? RequiredInput<In>
: never
: never;
export type OpOutput<
Ops extends OpSchema,
Type extends OpNames<Ops>,
> = Type extends keyof Ops
? Ops[Type] extends [infer _In, infer Out]
? Out
: never
: never;

View File

@ -13075,6 +13075,7 @@ __metadata:
"@blocksuite/affine": "npm:0.17.28"
"@datastructures-js/binary-search-tree": "npm:^5.3.2"
"@testing-library/react": "npm:^16.0.0"
eventemitter2: "npm:^6.4.9"
fake-indexeddb: "npm:^6.0.0"
foxact: "npm:^0.2.33"
fractional-indexing: "npm:^3.2.0"
@ -19984,7 +19985,7 @@ __metadata:
languageName: node
linkType: hard
"eventemitter2@npm:6.4.9":
"eventemitter2@npm:6.4.9, eventemitter2@npm:^6.4.9":
version: 6.4.9
resolution: "eventemitter2@npm:6.4.9"
checksum: 10/b829b1c6b11e15926b635092b5ad62b4463d1c928859831dcae606e988cf41893059e3541f5a8209d21d2f15314422ddd4d84d20830b4bf44978608d15b06b08