chore: unify transports, serialize events with acks (#239)

This commit is contained in:
Pavel Feldman 2019-12-13 13:53:49 -08:00 committed by Yury Semikhatsky
parent 39fb556f27
commit 046d015782
7 changed files with 109 additions and 184 deletions

View File

@ -17,7 +17,7 @@
import * as debug from 'debug';
import { EventEmitter } from 'events';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SlowMoTransport } from '../transport';
import { assert } from '../helper';
import { Protocol } from './protocol';
@ -30,7 +30,6 @@ export const ConnectionEvents = {
export class Connection extends EventEmitter {
private _url: string;
private _lastId = 0;
private _delay: number;
private _transport: ConnectionTransport;
private _sessions = new Map<string, CDPSession>();
readonly rootSession: CDPSession;
@ -39,9 +38,8 @@ export class Connection extends EventEmitter {
constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) {
super();
this._url = url;
this._delay = delay;
this._transport = transport;
this._transport = SlowMoTransport.wrap(transport, delay);
this._transport.onmessage = this._onMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
this.rootSession = new CDPSession(this, 'browser', '');
@ -72,8 +70,6 @@ export class Connection extends EventEmitter {
}
async _onMessage(message: string) {
if (this._delay)
await new Promise(f => setTimeout(f, this._delay));
debugProtocol('◀ RECV ' + message);
const object = JSON.parse(message);
if (object.method === 'Target.attachedToTarget') {
@ -101,7 +97,7 @@ export class Connection extends EventEmitter {
for (const session of this._sessions.values())
session._onClosed();
this._sessions.clear();
this.emit(ConnectionEvents.Disconnected);
Promise.resolve().then(() => this.emit(ConnectionEvents.Disconnected));
}
dispose() {
@ -164,7 +160,7 @@ export class CDPSession extends EventEmitter {
callback.resolve(object.result);
} else {
assert(!object.id);
this.emit(object.method, object.params);
Promise.resolve().then(() => this.emit(object.method, object.params));
}
}
@ -179,7 +175,7 @@ export class CDPSession extends EventEmitter {
callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
this._callbacks.clear();
this._connection = null;
this.emit(CDPSessionEvents.Disconnected);
Promise.resolve().then(() => this.emit(CDPSessionEvents.Disconnected));
}
}

View File

@ -18,7 +18,7 @@
import {assert} from '../helper';
import {EventEmitter} from 'events';
import * as debug from 'debug';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SlowMoTransport } from '../transport';
import { Protocol } from './protocol';
const debugProtocol = debug('playwright:protocol');
@ -30,18 +30,17 @@ export class Connection extends EventEmitter {
private _url: string;
private _lastId: number;
private _callbacks: Map<number, {resolve: Function, reject: Function, error: Error, method: string}>;
private _delay: number;
private _transport: ConnectionTransport;
private _sessions: Map<string, JugglerSession>;
_closed: boolean;
constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) {
super();
this._url = url;
this._lastId = 0;
this._callbacks = new Map();
this._delay = delay;
this._transport = transport;
this._transport = SlowMoTransport.wrap(transport, delay);
this._transport.onmessage = this._onMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
this._sessions = new Map();
@ -76,8 +75,6 @@ export class Connection extends EventEmitter {
}
async _onMessage(message: string) {
if (this._delay)
await new Promise(f => setTimeout(f, this._delay));
debugProtocol('◀ RECV ' + message);
const object = JSON.parse(message);
if (object.method === 'Target.attachedToTarget') {
@ -106,7 +103,7 @@ export class Connection extends EventEmitter {
callback.resolve(object.result);
}
} else {
this.emit(object.method, object.params);
Promise.resolve().then(() => this.emit(object.method, object.params));
}
}
@ -122,7 +119,7 @@ export class Connection extends EventEmitter {
for (const session of this._sessions.values())
session._onClosed();
this._sessions.clear();
this.emit(ConnectionEvents.Disconnected);
Promise.resolve().then(() => this.emit(ConnectionEvents.Disconnected));
}
dispose() {
@ -181,7 +178,7 @@ export class JugglerSession extends EventEmitter {
callback.resolve(object.result);
} else {
assert(!object.id);
this.emit(object.method, object.params);
Promise.resolve().then(() => this.emit(object.method, object.params));
}
}
@ -196,7 +193,7 @@ export class JugglerSession extends EventEmitter {
callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
this._callbacks.clear();
this._connection = null;
this.emit(JugglerSessionEvents.Disconnected);
Promise.resolve().then(() => this.emit(JugglerSessionEvents.Disconnected));
}
}

View File

@ -17,14 +17,14 @@
import * as os from 'os';
import * as path from 'path';
import {Connection} from './Connection';
import {Browser} from './Browser';
import {BrowserFetcher, BrowserFetcherOptions} from '../browserFetcher';
import { Connection } from './Connection';
import { Browser } from './Browser';
import { BrowserFetcher, BrowserFetcherOptions } from '../browserFetcher';
import * as fs from 'fs';
import * as util from 'util';
import {debugError, assert} from '../helper';
import {TimeoutError} from '../errors';
import {WebSocketTransport} from './WebSocketTransport';
import { debugError, assert } from '../helper';
import { TimeoutError } from '../errors';
import { WebSocketTransport } from '../transport';
import { launchProcess, waitForLine } from '../processLauncher';
const mkdtempAsync = util.promisify(fs.mkdtemp);

View File

@ -1,89 +0,0 @@
/**
* Copyright 2018 Google Inc. All rights reserved.
* Modifications copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConnectionTransport } from '../transport';
import * as WebSocket from 'ws';
export class WebSocketTransport implements ConnectionTransport {
_ws: WebSocket;
_dispatchQueue: DispatchQueue;
onclose?: () => void;
onmessage?: (message: string) => void;
static create(url: string): Promise<WebSocketTransport> {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url, [], { perMessageDeflate: false });
ws.addEventListener('open', () => resolve(new WebSocketTransport(ws)));
ws.addEventListener('error', reject);
});
}
constructor(ws: WebSocket) {
this._ws = ws;
this._dispatchQueue = new DispatchQueue(this);
this._ws.addEventListener('message', event => {
this._dispatchQueue.enqueue(event.data);
});
this._ws.addEventListener('close', event => {
if (this.onclose)
this.onclose.call(null);
});
// Silently ignore all errors - we don't know what to do with them.
this._ws.addEventListener('error', () => {});
}
send(message: string) {
this._ws.send(message);
}
close() {
this._ws.close();
}
}
// We want to dispatch all "message" events in separate tasks
// to make sure all message-related promises are resolved first
// before dispatching next message.
//
// We cannot just use setTimeout() in Node.js here like we would
// do in Browser - see https://github.com/nodejs/node/issues/23773
// Thus implement a dispatch queue that enforces new tasks manually.
class DispatchQueue {
_transport: ConnectionTransport;
_timeoutId: NodeJS.Timer = null;
_queue: string[] = [];
constructor(transport : ConnectionTransport) {
this._transport = transport;
this._dispatch = this._dispatch.bind(this);
}
enqueue(message: string) {
this._queue.push(message);
if (!this._timeoutId)
this._timeoutId = setTimeout(this._dispatch, 0);
}
_dispatch() {
const message = this._queue.shift();
if (this._queue.length)
this._timeoutId = setTimeout(this._dispatch, 0);
else
this._timeoutId = null;
if (this._transport.onmessage)
this._transport.onmessage.call(null, message);
}
}

View File

@ -117,3 +117,72 @@ export class PipeTransport implements ConnectionTransport {
helper.removeEventListeners(this._eventListeners);
}
}
export class SlowMoTransport {
private readonly _delay: number;
private readonly _delegate: ConnectionTransport;
private _incomingMessageQueue: string[] = [];
private _dispatchTimerId?: NodeJS.Timer;
private _closed = false;
onmessage?: (message: string) => void;
onclose?: () => void;
static wrap(transport: ConnectionTransport, delay?: number): ConnectionTransport {
return delay ? new SlowMoTransport(transport, delay) : transport;
}
constructor(transport: ConnectionTransport, delay: number) {
this._delay = delay;
this._delegate = transport;
this._delegate.onmessage = this._enqueueMessage.bind(this);
this._delegate.onclose = this._onClose.bind(this);
}
private _enqueueMessage(message: string) {
this._incomingMessageQueue.push(message);
this._scheduleQueueDispatch();
}
private _scheduleQueueDispatch() {
if (this._dispatchTimerId)
return;
if (!this._incomingMessageQueue.length)
return;
this._dispatchTimerId = setTimeout(() => {
this._dispatchTimerId = undefined;
this._dispatchOneMessageFromQueue();
}, this._delay);
}
private _dispatchOneMessageFromQueue() {
if (this._closed)
return;
const message = this._incomingMessageQueue.shift();
try {
if (this.onmessage)
this.onmessage(message);
} finally {
this._scheduleQueueDispatch();
}
}
private _onClose() {
if (this._closed)
return;
if (this.onclose)
this.onclose();
this._closed = true;
this._delegate.onmessage = null;
this._delegate.onclose = null;
}
send(s: string) {
this._delegate.send(s);
}
close() {
this._closed = true;
this._delegate.close();
}
}

View File

@ -58,8 +58,8 @@ export class Browser extends EventEmitter implements BrowserInterface {
this._eventListeners = [
helper.addEventListener(this._connection, ConnectionEvents.TargetCreated, this._onTargetCreated.bind(this)),
helper.addEventListener(this._connection, 'Target.targetDestroyed', this._onTargetDestroyed.bind(this)),
helper.addEventListener(this._connection, 'Target.didCommitProvisionalTarget', this._onProvisionalTargetCommitted.bind(this)),
helper.addEventListener(this._connection, ConnectionEvents.TargetDestroyed, this._onTargetDestroyed.bind(this)),
helper.addEventListener(this._connection, ConnectionEvents.DidCommitProvisionalTarget, this._onProvisionalTargetCommitted.bind(this)),
];
// Intercept provisional targets during cross-process navigation.
@ -142,7 +142,7 @@ export class Browser extends EventEmitter implements BrowserInterface {
return contextPages.reduce((acc, x) => acc.concat(x), []);
}
async _onTargetCreated(session: TargetSession, targetInfo: Protocol.Target.TargetInfo) {
_onTargetCreated(session: TargetSession, targetInfo: Protocol.Target.TargetInfo) {
let context = null;
if (targetInfo.browserContextId) {
// FIXME: we don't know about the default context id, so assume that all targets from
@ -170,9 +170,10 @@ export class Browser extends EventEmitter implements BrowserInterface {
const openerPage = opener._page;
if (!openerPage || !openerPage.listenerCount(Events.Page.Popup))
return;
const page = await target.page();
openerPage.emit(Events.Page.Popup, page);
target.page().then(page => openerPage.emit(Events.Page.Popup, page));
}
if (targetInfo.isPaused)
this._connection.send('Target.resume', { targetId: targetInfo.targetId }).catch(debugError);
}
_onTargetDestroyed({targetId}) {

View File

@ -15,37 +15,33 @@
* limitations under the License.
*/
import {assert, debugError} from '../helper';
import { assert } from '../helper';
import * as debug from 'debug';
import { EventEmitter } from 'events';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SlowMoTransport } from '../transport';
import { Protocol } from './protocol';
const debugProtocol = debug('playwright:protocol');
const debugWrappedMessage = require('debug')('wrapped');
export const ConnectionEvents = {
TargetCreated: Symbol('ConnectionEvents.TargetCreated')
TargetCreated: Symbol('ConnectionEvents.TargetCreated'),
TargetDestroyed: Symbol('Connection.TargetDestroyed'),
DidCommitProvisionalTarget: Symbol('Connection.DidCommitProvisionalTarget')
};
export class Connection extends EventEmitter {
_lastId = 0;
private readonly _callbacks = new Map<number, {resolve:(o: any) => void, reject: (e: Error) => void, error: Error, method: string}>();
private readonly _delay: number;
private readonly _transport: ConnectionTransport;
private readonly _sessions = new Map<string, TargetSession>();
private _incomingMessageQueue: string[] = [];
private _dispatchTimerId?: NodeJS.Timer;
private _sameDispatchTask: boolean = false;
_closed = false;
constructor(transport: ConnectionTransport, delay: number | undefined = 0) {
super();
this._delay = delay;
this._transport = transport;
this._transport.onmessage = this._onMessage.bind(this);
this._transport = SlowMoTransport.wrap(transport, delay);
this._transport.onmessage = this._dispatchMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
}
@ -71,52 +67,6 @@ export class Connection extends EventEmitter {
return id;
}
private _onMessage(message: string) {
if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) {
this._enqueueMessage(message);
} else {
this._sameDispatchTask = true;
// This is for the case when several messages come in a batch and read
// in a loop by transport ending up in the same task.
Promise.resolve().then(() => this._sameDispatchTask = false);
this._dispatchMessage(message);
}
}
private _enqueueMessage(message: string) {
this._incomingMessageQueue.push(message);
this._scheduleQueueDispatch();
}
private _enqueueProvisionalMessages(messages: string[]) {
// Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message.
this._incomingMessageQueue = messages.concat(this._incomingMessageQueue);
this._scheduleQueueDispatch();
}
private _scheduleQueueDispatch() {
if (this._dispatchTimerId)
return;
if (!this._incomingMessageQueue.length)
return;
const delay = this._delay || 0;
this._dispatchTimerId = setTimeout(() => {
this._dispatchTimerId = undefined;
this._dispatchOneMessageFromQueue();
}, delay);
}
private _dispatchOneMessageFromQueue() {
if (this._closed)
return;
const message = this._incomingMessageQueue.shift();
try {
this._dispatchMessage(message);
} finally {
this._scheduleQueueDispatch();
}
}
private _dispatchMessage(message: string) {
debugProtocol('◀ RECV ' + message);
const object = JSON.parse(message);
@ -134,7 +84,7 @@ export class Connection extends EventEmitter {
assert(this._closed, 'Received response for unknown callback: ' + object.id);
}
} else {
this.emit(object.method, object.params);
Promise.resolve().then(() => this.emit(object.method, object.params));
}
}
@ -143,26 +93,26 @@ export class Connection extends EventEmitter {
const targetInfo = object.params.targetInfo as Protocol.Target.TargetInfo;
const session = new TargetSession(this, targetInfo);
this._sessions.set(session._sessionId, session);
this.emit(ConnectionEvents.TargetCreated, session, object.params.targetInfo);
if (targetInfo.isPaused)
this.send('Target.resume', { targetId: targetInfo.targetId }).catch(debugError);
Promise.resolve().then(() => this.emit(ConnectionEvents.TargetCreated, session, object.params.targetInfo));
} else if (object.method === 'Target.targetDestroyed') {
const session = this._sessions.get(object.params.targetId);
if (session) {
session._onClosed();
this._sessions.delete(object.params.targetId);
}
Promise.resolve().then(() => this.emit(ConnectionEvents.TargetDestroyed, { targetId: object.params.targetId }));
} else if (object.method === 'Target.dispatchMessageFromTarget') {
const {targetId, message} = object.params as Protocol.Target.dispatchMessageFromTargetPayload;
const session = this._sessions.get(targetId);
if (!session)
throw new Error('Unknown target: ' + targetId);
if (session.isProvisional())
session._addProvisionalMessage(wrappedMessage);
session._addProvisionalMessage(message);
else
session._dispatchMessageFromTarget(message);
} else if (object.method === 'Target.didCommitProvisionalTarget') {
const {oldTargetId, newTargetId} = object.params as Protocol.Target.didCommitProvisionalTargetPayload;
Promise.resolve().then(() => this.emit(ConnectionEvents.DidCommitProvisionalTarget, { oldTargetId, newTargetId }));
const newSession = this._sessions.get(newTargetId);
if (!newSession)
throw new Error('Unknown new target: ' + newTargetId);
@ -170,7 +120,8 @@ export class Connection extends EventEmitter {
if (!oldSession)
throw new Error('Unknown old target: ' + oldTargetId);
oldSession._swappedOut = true;
this._enqueueProvisionalMessages(newSession._takeProvisionalMessagesAndCommit());
for (const message of newSession._takeProvisionalMessagesAndCommit())
newSession._dispatchMessageFromTarget(message);
}
}
@ -278,7 +229,7 @@ export class TargetSession extends EventEmitter {
callback.resolve(object.result);
} else {
assert(!object.id);
this.emit(object.method, object.params);
Promise.resolve().then(() => this.emit(object.method, object.params));
}
}
@ -292,7 +243,7 @@ export class TargetSession extends EventEmitter {
}
this._callbacks.clear();
this._connection = null;
this.emit(TargetSessionEvents.Disconnected);
Promise.resolve().then(() => this.emit(TargetSessionEvents.Disconnected));
}
}