feat(infra): livedata (#5562)

LiveData is a reactive data type.

## basic usage

@example
```ts
const livedata = new LiveData(0); // create livedata with initial value

livedata.next(1); // update value

console.log(livedata.value); // get current value

livedata.subscribe(v => { // subscribe to value changes
 console.log(v); // 1
});
```

## observable

LiveData is a rxjs observable, you can use rxjs operators.

@example
```ts
new LiveData(0).pipe(
  map(v => v + 1),
  filter(v => v > 1),
  ...
)
```

NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it.

## from observable

LiveData can be created from observable or from other livedata.

@example
```ts
const A = LiveData.from(
  of(1, 2, 3, 4), // from observable
  0 // initial value
);

const B = LiveData.from(
  A.pipe(map(v => 'from a ' + v)), // from other livedata
  '' // initial value
);
```

NOTICE: LiveData.from will not complete when the observable completes, you can use `spreadComplete` option to change
this behavior.

## Why is it called LiveData

This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData.
This commit is contained in:
EYHN 2024-01-30 06:31:11 +00:00
parent 588b3bcf33
commit c9f8e49f75
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C
6 changed files with 597 additions and 0 deletions

View File

@ -7,6 +7,7 @@
"./command": "./src/command/index.ts",
"./atom": "./src/atom/index.ts",
"./app-config-storage": "./src/app-config-storage.ts",
"./livedata": "./src/livedata/index.ts",
".": "./src/index.ts"
},
"dependencies": {
@ -16,9 +17,11 @@
"@blocksuite/blocks": "0.12.0-nightly-202401290223-b6302df",
"@blocksuite/global": "0.12.0-nightly-202401290223-b6302df",
"@blocksuite/store": "0.12.0-nightly-202401290223-b6302df",
"foxact": "^0.2.20",
"jotai": "^2.5.1",
"jotai-effect": "^0.2.3",
"nanoid": "^5.0.3",
"react": "18.2.0",
"tinykeys": "^2.1.0",
"yjs": "^13.6.10",
"zod": "^3.22.4"
@ -28,6 +31,7 @@
"@affine/templates": "workspace:*",
"@blocksuite/lit": "0.12.0-nightly-202401290223-b6302df",
"@blocksuite/presets": "0.12.0-nightly-202401290223-b6302df",
"@testing-library/react": "^14.0.0",
"async-call-rpc": "^6.3.1",
"react": "^18.2.0",
"rxjs": "^7.8.1",

View File

@ -0,0 +1,188 @@
import type { Subscriber } from 'rxjs';
import { combineLatest, Observable, of } from 'rxjs';
import { describe, expect, test, vitest } from 'vitest';
import { LiveData } from '..';
describe('livedata', () => {
test('LiveData', async () => {
const livedata = new LiveData(0);
expect(livedata.value).toBe(0);
livedata.next(1);
expect(livedata.value).toBe(1);
let subscribed = 0;
livedata.subscribe(v => {
subscribed = v;
});
livedata.next(2);
expect(livedata.value).toBe(2);
await vitest.waitFor(() => subscribed === 2);
});
test('from', async () => {
{
const livedata = LiveData.from(of(1, 2, 3, 4), 0);
expect(livedata.value).toBe(4);
}
{
let subscriber: Subscriber<number> = null!;
const observable = new Observable<number>(s => {
subscriber = s;
});
const livedata = LiveData.from(observable, 0);
let value = 0;
livedata.subscribe(v => {
value = v;
});
expect(value).toBe(0);
subscriber.next(1);
expect(value).toBe(1);
subscriber.next(2);
expect(value).toBe(2);
}
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
expect(observableSubscribed).toBe(false);
const subscription = livedata.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription.unsubscribe();
expect(observableClosed).toBe(true);
}
{
let subscriber: Subscriber<number> = null!;
const observable = new Observable<number>(s => {
subscriber = s;
});
const livedata = LiveData.from(observable, 0);
let value1 = 0;
livedata.subscribe(v => {
value1 = v;
});
let value2 = 0;
livedata.subscribe(v => {
value2 = v;
});
expect(value1).toBe(0);
expect(value2).toBe(0);
subscriber.next(1);
expect(value1).toBe(1);
expect(value2).toBe(1);
subscriber.next(2);
expect(value1).toBe(2);
expect(value2).toBe(2);
}
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
expect(observableSubscribed).toBe(false);
const subscription1 = livedata.subscribe(_ => {});
const subscription2 = livedata.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription1.unsubscribe();
expect(observableClosed).toBe(false);
subscription2.unsubscribe();
expect(observableClosed).toBe(true);
}
{
let observerCount = 0;
const observable = new Observable(_ => {
observerCount++;
});
const livedata = LiveData.from(observable, 0);
livedata.subscribe(_ => {});
livedata.subscribe(_ => {});
expect(observerCount).toBe(1);
}
{
let value = 0;
const observable = new Observable<number>(subscriber => {
subscriber.next(value);
});
const livedata = LiveData.from(observable, 0);
expect(livedata.value).toBe(0);
value = 1;
expect(livedata.value).toBe(1);
}
});
test('map', () => {
{
const livedata = new LiveData(0);
const mapped = livedata.map(v => v + 1);
expect(mapped.value).toBe(1);
livedata.next(1);
expect(mapped.value).toBe(2);
}
{
const livedata = new LiveData(0);
const mapped = livedata.map(v => v + 1);
let value = 0;
mapped.subscribe(v => {
value = v;
});
expect(value).toBe(1);
livedata.next(1);
expect(value).toBe(2);
}
{
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable<number>(subscriber => {
observableSubscribed = true;
subscriber.next(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
const mapped = livedata.map(v => v + 1);
expect(observableSubscribed).toBe(false);
const subscription = mapped.subscribe(_ => {});
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
subscription.unsubscribe();
expect(observableClosed).toBe(true);
}
});
test('interop with rxjs', () => {
const ob = combineLatest([new LiveData(1)]);
let value = 0;
ob.subscribe(v => {
value = v[0];
});
expect(value).toBe(1);
});
});

View File

@ -0,0 +1,60 @@
/**
* @vitest-environment happy-dom
*/
import { render, screen } from '@testing-library/react';
import { useRef } from 'react';
import { Observable } from 'rxjs';
import { describe, expect, test, vi } from 'vitest';
import { LiveData, useLiveData } from '..';
describe('livedata', () => {
test('react', () => {
const livedata = new LiveData(0);
const Component = () => {
const renderCount = useRef(0);
renderCount.current++;
const value = useLiveData(livedata);
return (
<main>
{renderCount.current}:{value}
</main>
);
};
const { rerender } = render(<Component />);
expect(screen.getByRole('main').innerText).toBe('1:0');
livedata.next(1);
rerender(<Component />);
expect(screen.getByRole('main').innerText).toBe('3:1');
});
test('lifecycle', async () => {
let observableSubscribed = false;
let observableClosed = false;
const observable = new Observable<number>(subscriber => {
observableSubscribed = true;
subscriber.next(1);
console.log(1);
return () => {
observableClosed = true;
};
});
const livedata = LiveData.from(observable, 0);
const Component1 = () => {
const value = useLiveData(livedata);
return <main>{value}</main>;
};
expect(observableSubscribed).toBe(false);
const { rerender } = render(<Component1 />);
expect(observableSubscribed).toBe(true);
expect(observableClosed).toBe(false);
const Component2 = () => {
return <main></main>;
};
rerender(<Component2 />);
await vi.waitUntil(() => observableClosed);
});
});

View File

@ -0,0 +1,299 @@
import { DebugLogger } from '@affine/debug';
import {
distinctUntilChanged,
EMPTY,
filter,
type InteropObservable,
map,
Observable,
type Observer,
of,
type OperatorFunction,
scan,
skip,
type Subscription,
switchMap,
} from 'rxjs';
import { BehaviorSubject, Subject } from 'rxjs';
export * from './react';
const logger = new DebugLogger('livedata');
/**
* LiveData is a reactive data type.
*
* ## basic usage
*
* @example
* ```ts
* const livedata = new LiveData(0); // create livedata with initial value
*
* livedata.next(1); // update value
*
* console.log(livedata.value); // get current value
*
* livedata.subscribe(v => { // subscribe to value changes
* console.log(v); // 1
* });
* ```
*
* ## observable
*
* LiveData is a rxjs observable, you can use rxjs operators.
*
* @example
* ```ts
* new LiveData(0).pipe(
* map(v => v + 1),
* filter(v => v > 1),
* ...
* )
* ```
*
* NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it.
*
* ## from observable
*
* LiveData can be created from observable or from other livedata.
*
* @example
* ```ts
* const A = LiveData.from(
* of(1, 2, 3, 4), // from observable
* 0 // initial value
* );
*
* const B = LiveData.from(
* A.pipe(map(v => 'from a ' + v)), // from other livedata
* '' // initial value
* );
* ```
*
* ## Why is it called LiveData
*
* This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData.
*
* @see {@link https://rxjs.dev/api/index/class/BehaviorSubject}
* @see {@link https://developer.android.com/topic/libraries/architecture/livedata}
*/
export class LiveData<T = unknown> implements InteropObservable<T> {
static from<T>(
upstream:
| Observable<T>
| InteropObservable<T>
| ((stream: Observable<LiveDataOperation>) => Observable<T>),
initialValue: T
): LiveData<T> {
const data = new LiveData(
initialValue,
typeof upstream === 'function'
? upstream
: stream =>
stream.pipe(
filter(
(op): op is Exclude<LiveDataOperation, 'set'> => op !== 'set'
),
switchMap(v => {
if (v === 'get') {
return of('watch' as const, 'unwatch' as const);
} else {
return of(v);
}
}),
scan((acc, op) => {
if (op === 'watch') {
return acc + 1;
} else if (op === 'unwatch') {
return acc - 1;
} else {
return acc;
}
}, 0),
map(count => {
if (count > 0) {
return 'watch';
} else {
return 'unwatch';
}
}),
distinctUntilChanged(),
switchMap(op => {
if (op === 'watch') {
return upstream;
} else {
return EMPTY;
}
})
)
);
return data;
}
private readonly raw: BehaviorSubject<T>;
private readonly ops = new Subject<LiveDataOperation>();
private readonly upstreamSubscription: Subscription | undefined;
constructor(
initialValue: T,
upstream:
| ((upstream: Observable<LiveDataOperation>) => Observable<T>)
| undefined = undefined
) {
this.raw = new BehaviorSubject(initialValue);
if (upstream) {
this.upstreamSubscription = upstream(this.ops).subscribe({
next: v => {
this.raw.next(v);
},
complete: () => {
if (!this.raw.closed) {
logger.error('livedata upstream unexpected complete');
}
},
error: err => {
logger.error('uncatched error in livedata', err);
},
});
}
}
getValue(): T {
this.ops.next('get');
return this.raw.value;
}
setValue(v: T) {
this.raw.next(v);
this.ops.next('set');
}
get value(): T {
return this.getValue();
}
set value(v: T) {
this.setValue(v);
}
next(v: T) {
this.setValue(v);
}
subscribe(
observer: Partial<Observer<T>> | ((value: T) => void) | undefined
): Subscription {
this.ops.next('watch');
const subscription = this.raw.subscribe(observer);
subscription.add(() => {
this.ops.next('unwatch');
});
return subscription;
}
map<R>(mapper: (v: T) => R): LiveData<R> {
const sub = LiveData.from(
new Observable<R>(subscriber =>
this.subscribe({
next: v => {
subscriber.next(mapper(v));
},
complete: () => {
sub.complete();
},
})
),
undefined as R // is safe
);
return sub;
}
asObservable(): Observable<T> {
return new Observable<T>(subscriber => {
return this.subscribe(subscriber);
});
}
pipe(): Observable<T>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
pipe<A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Observable<B>;
pipe<A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Observable<C>;
pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
pipe(...args: any[]) {
return new Observable(subscriber => {
this.ops.next('watch');
// eslint-disable-next-line prefer-spread
const subscription = this.raw.pipe
.apply(this.raw, args as any)
.subscribe(subscriber);
subscription.add(() => {
this.ops.next('unwatch');
});
return subscription;
});
}
complete() {
this.ops.complete();
this.raw.complete();
this.upstreamSubscription?.unsubscribe();
}
reactSubscribe = (cb: () => void) => {
this.ops.next('watch');
const subscription = this.raw
.pipe(distinctUntilChanged(), skip(1))
.subscribe(cb);
subscription.add(() => {
this.ops.next('unwatch');
});
return () => subscription.unsubscribe();
};
reactGetSnapshot = () => {
this.ops.next('watch');
setImmediate(() => {
this.ops.next('unwatch');
});
return this.raw.value;
};
[Symbol.observable || '@@observable']() {
return this;
}
[Symbol.observable]() {
return this;
}
}
export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch';

View File

@ -0,0 +1,44 @@
import { use } from 'foxact/use';
import { useSyncExternalStore } from 'react';
import type { LiveData } from './index';
/**
* subscribe LiveData and return the value.
*/
export function useLiveData<T>(liveData: LiveData<T>): T {
return useSyncExternalStore(
liveData.reactSubscribe,
liveData.reactGetSnapshot
);
}
/**
* subscribe LiveData and return the value. If the value is nullish, will suspends until the value is not nullish.
*/
export function useEnsureLiveData<T>(liveData: LiveData<T>): NonNullable<T> {
const data = useLiveData(liveData);
if (data === null || data === undefined) {
return use(
new Promise((resolve, reject) => {
const subscription = liveData.subscribe({
next(value) {
if (value === null || value === undefined) {
resolve(value);
subscription.unsubscribe();
}
},
error(err) {
reject(err);
},
complete() {
reject(new Error('Unexpected completion'));
},
});
})
);
}
return data;
}

View File

@ -13049,7 +13049,9 @@ __metadata:
"@blocksuite/lit": "npm:0.12.0-nightly-202401290223-b6302df"
"@blocksuite/presets": "npm:0.12.0-nightly-202401290223-b6302df"
"@blocksuite/store": "npm:0.12.0-nightly-202401290223-b6302df"
"@testing-library/react": "npm:^14.0.0"
async-call-rpc: "npm:^6.3.1"
foxact: "npm:^0.2.20"
jotai: "npm:^2.5.1"
jotai-effect: "npm:^0.2.3"
nanoid: "npm:^5.0.3"