chore(rpc): bootstrap demo for rpc (#2741)

This commit is contained in:
Pavel Feldman 2020-06-27 11:32:27 -07:00 committed by GitHub
parent 4e94bdabfd
commit e920fde9f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 12 deletions

33
src/rpc/client.ts Normal file
View File

@ -0,0 +1,33 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the 'License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as childProcess from 'child_process';
import * as path from 'path';
import { Connection } from './connection';
import { Transport } from './transport';
(async () => {
const spawnedProcess = childProcess.fork(path.join(__dirname, 'server'), [], { stdio: 'pipe' });
const transport = new Transport(spawnedProcess.stdin, spawnedProcess.stdout);
const connection = new Connection();
connection.onmessage = message => transport.send(message);
transport.onmessage = message => connection.send(message);
const chromium = await connection.waitForObjectWithKnownName('chromium');
const browser = await chromium.launch({ headless: false });
const page = await browser.newPage();
await page.goto('https://example.com');
})();

View File

@ -34,7 +34,7 @@ import { parseError } from './serializers';
export class Connection {
private _channels = new Map<string, Channel>();
private _waitingForObject = new Map<string, any>();
sendMessageToServerTransport = (message: string): void => {};
onmessage = (message: string): void => {};
private _lastId = 0;
private _callbacks = new Map<number, { resolve: (a: any) => void, reject: (a: Error) => void }>();
@ -110,11 +110,11 @@ export class Connection {
const id = ++this._lastId;
const converted = { id, ...message, params: this._replaceChannelsWithGuids(message.params) };
debug('pw:channel:command')(converted);
this.sendMessageToServerTransport(JSON.stringify(converted));
this.onmessage(JSON.stringify(converted));
return new Promise((resolve, reject) => this._callbacks.set(id, { resolve, reject }));
}
dispatchMessageFromServer(message: string) {
send(message: string) {
const parsedMessage = JSON.parse(message);
const { id, guid, method, params, result, error } = parsedMessage;
if (id) {

View File

@ -44,21 +44,21 @@ export class Dispatcher<Type, Initializer> extends EventEmitter implements Chann
export class DispatcherScope {
readonly dispatchers = new Map<string, Dispatcher<any, any>>();
readonly dispatcherSymbol = Symbol('dispatcher');
sendMessageToClientTransport = (message: string) => {};
onmessage = (message: string) => {};
async sendMessageToClient(guid: string, method: string, params: any): Promise<any> {
this.sendMessageToClientTransport(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) }));
this.onmessage(JSON.stringify({ guid, method, params: this._replaceDispatchersWithGuids(params) }));
}
async dispatchMessageFromClient(message: string) {
async send(message: string) {
const parsedMessage = JSON.parse(message);
const { id, guid, method, params } = parsedMessage;
const dispatcher = this.dispatchers.get(guid)!;
try {
const result = await (dispatcher as any)[method](this._replaceGuidsWithDispatchers(params));
this.sendMessageToClientTransport(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) }));
this.onmessage(JSON.stringify({ id, result: this._replaceDispatchersWithGuids(result) }));
} catch (e) {
this.sendMessageToClientTransport(JSON.stringify({ id, error: serializeError(e) }));
this.onmessage(JSON.stringify({ id, error: serializeError(e) }));
}
}

30
src/rpc/server.ts Normal file
View File

@ -0,0 +1,30 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the 'License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Transport } from './transport';
import { DispatcherScope } from './dispatcher';
import { Playwright } from '../server/playwright';
import { BrowserTypeDispatcher } from './server/browserTypeDispatcher';
const dispatcherScope = new DispatcherScope();
const transport = new Transport(process.stdout, process.stdin);
transport.onmessage = message => dispatcherScope.send(message);
dispatcherScope.onmessage = message => transport.send(message);
const playwright = new Playwright(__dirname, require('../../browsers.json')['browsers']);
BrowserTypeDispatcher.from(dispatcherScope, playwright.chromium!);
BrowserTypeDispatcher.from(dispatcherScope, playwright.firefox!);
BrowserTypeDispatcher.from(dispatcherScope, playwright.webkit!);

77
src/rpc/transport.ts Normal file
View File

@ -0,0 +1,77 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { helper } from '../helper';
export class Transport {
private _pipeWrite: NodeJS.WritableStream;
private _data = Buffer.from([]);
private _waitForNextTask = helper.makeWaitForNextTask();
private _closed = false;
private _bytesLeft = 0;
onmessage?: (message: any) => void;
onclose?: () => void;
constructor(pipeWrite: NodeJS.WritableStream, pipeRead: NodeJS.ReadableStream) {
this._pipeWrite = pipeWrite;
pipeRead.on('data', buffer => this._dispatch(buffer));
this.onmessage = undefined;
this.onclose = undefined;
}
send(message: any) {
if (this._closed)
throw new Error('Pipe has been closed');
const data = Buffer.from(JSON.stringify(message), 'utf-8');
const dataLength = Buffer.alloc(4);
dataLength.writeUInt32LE(data.length, 0);
this._pipeWrite.write(dataLength);
this._pipeWrite.write(data);
}
close() {
throw new Error('unimplemented');
}
_dispatch(buffer: Buffer) {
this._data = Buffer.concat([this._data, buffer]);
while (true) {
if (!this._bytesLeft && this._data.length < 4) {
// Need more data.
break;
}
if (!this._bytesLeft) {
this._bytesLeft = this._data.readUInt32LE(0);
this._data = this._data.slice(4);
}
if (!this._bytesLeft || this._data.length < this._bytesLeft) {
// Need more data.
break;
}
const message = this._data.slice(0, this._bytesLeft);
this._data = this._data.slice(this._bytesLeft);
this._bytesLeft = 0;
this._waitForNextTask(() => {
if (this.onmessage)
this.onmessage.call(null, JSON.parse(message.toString('utf-8')));
});
}
}
}

View File

@ -105,11 +105,11 @@ function collect(browserNames) {
if (process.env.PWCHANNEL) {
const dispatcherScope = new DispatcherScope();
const connection = new Connection();
dispatcherScope.sendMessageToClientTransport = async message => {
setImmediate(() => connection.dispatchMessageFromServer(message));
dispatcherScope.onmessage = async message => {
setImmediate(() => connection.send(message));
};
connection.sendMessageToServerTransport = async message => {
const result = await dispatcherScope.dispatchMessageFromClient(message);
connection.onmessage = async message => {
const result = await dispatcherScope.send(message);
await new Promise(f => setImmediate(f));
return result;
};