feat: github agent, browser worker (#13336)

This commit is contained in:
Yury Semikhatsky 2022-04-06 18:03:27 -07:00 committed by GitHub
parent c80365dd43
commit fccc14cdfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 26 deletions

View File

@ -1,5 +1,6 @@
[*]
../client/
../dispatchers/
../remote/
../server/
../utils/

View File

@ -0,0 +1,67 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the 'License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { GridAgentLaunchOptions, GridFactory } from './gridServer';
import https from 'https';
import debug from 'debug';
const repoName = process.env.GITHUB_AGENT_REPO;
if (!repoName)
throw new Error('GITHUB_AGENT_REPO is not specified.');
const repoAccessToken = process.env.GITHUB_AGENT_REPO_ACCESS_TOKEN;
if (!repoAccessToken)
throw new Error('GITHUB_AGENT_REPO_ACCESS_TOKEN is not specified.');
const log = debug(`pw:grid:server`);
const githubFactory: GridFactory = {
name: 'Agents hosted on Github',
capacity: 10,
launchTimeout: 30000,
retireTimeout: 600000,
launch: async (options: GridAgentLaunchOptions) => {
await createWorkflow(options);
},
};
async function createWorkflow(inputs: GridAgentLaunchOptions): Promise<boolean> {
return new Promise(fulfill => {
log(`triggering workflow ${JSON.stringify(inputs)}`);
const req = https.request(`https://api.github.com/repos/${repoName}/actions/workflows/agent.yml/dispatches`, {
method: 'POST',
headers: {
'User-Agent': 'request',
'Accept': 'application/vnd.github.v3+json',
'Authorization': `token ${repoAccessToken}`,
}
}, response => {
log(`workflow ${inputs.agentId} response: ${response.statusCode} ${response.statusMessage}`);
const success = !!response.statusCode && 200 <= response.statusCode && response.statusCode < 300;
fulfill(success);
});
req.on('error', e => {
log(`failed to create workflow ${inputs.agentId}`);
fulfill(false);
});
req.end(JSON.stringify({
'ref': 'refs/heads/main',
inputs
}));
});
}
export default githubFactory;

View File

@ -26,9 +26,13 @@ export function launchGridAgent(agentId: string, gridURL: string) {
params.set('pwVersion', getPlaywrightVersion(true /* majorMinorOnly */));
params.set('agentId', agentId);
const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerAgent?` + params.toString());
ws.on('message', (workerId: string) => {
log('Worker requested ' + workerId);
fork(require.resolve('./gridWorker.js'), [gridURL, agentId, workerId], { detached: true });
ws.on('message', (message: string) => {
log('worker requested ' + message);
const { workerId, browserAlias } = JSON.parse(message);
if (browserAlias)
fork(require.resolve('./gridBrowserWorker.js'), [gridURL, agentId, workerId, browserAlias], { detached: true });
else
fork(require.resolve('./gridWorker.js'), [gridURL, agentId, workerId], { detached: true });
});
ws.on('close', () => process.exit(0));
}

View File

@ -0,0 +1,35 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import debug from 'debug';
import WebSocket from 'ws';
import { PlaywrightConnection } from '../remote/playwrightConnection';
import { gracefullyCloseAll } from '../utils/processLauncher';
function launchGridBrowserWorker(gridURL: string, agentId: string, workerId: string, browserAlias: string) {
const log = debug(`pw:grid:worker:${workerId}`);
log('created');
const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`);
new PlaywrightConnection(ws, true, browserAlias, undefined, log, async () => {
log('exiting process');
setTimeout(() => process.exit(0), 30000);
// Meanwhile, try to gracefully close all browsers.
await gracefullyCloseAll();
process.exit(0);
});
}
launchGridBrowserWorker(process.argv[2], process.argv[3], process.argv[4], process.argv[5]);

View File

@ -28,7 +28,7 @@ export class GridClient {
params.set('pwVersion', getPlaywrightVersion(true /* majorMinorOnly */));
const ws = new WebSocket(`${gridURL}/claimWorker?` + params.toString());
const errorText = await Promise.race([
new Promise(f => ws.once('message', () => f(undefined))),
new Promise(f => ws.once('open', () => f(undefined))),
new Promise(f => ws.once('close', (code, reason) => f(reason))),
]);
if (errorText)

View File

@ -15,13 +15,12 @@
*/
import debug from 'debug';
import assert from 'assert';
import { EventEmitter } from 'events';
import { URL } from 'url';
import type { Server as WebSocketServer } from 'ws';
import type WebSocket from 'ws';
import { HttpServer } from '../utils/httpServer';
import { createGuid, getPlaywrightVersion } from '../utils/utils';
import { assert, createGuid, getPlaywrightVersion } from '../utils/utils';
export type GridAgentLaunchOptions = {
agentId: string,
@ -58,18 +57,35 @@ const WSErrors = {
AGENT_MANUALLY_STOPPED: { code: 1000, reason: 'Grid agent was manually stopped' },
};
type GridWorkerParams = {
browserAlias?: string;
headless?: boolean;
};
class GridWorker extends EventEmitter {
readonly workerId = createGuid();
readonly params: GridWorkerParams;
private _workerSocket: WebSocket | undefined;
private _clientSocket: WebSocket;
private _log: debug.Debugger;
private _bufferedMessages: WebSocket.RawData[] = [];
constructor(clientSocket: WebSocket) {
constructor(clientSocket: WebSocket, params: GridWorkerParams) {
super();
this._log = debug(`pw:grid:worker${this.workerId}`);
this._log = debug(`pw:grid:worker:${this.workerId}`);
this._clientSocket = clientSocket;
this.params = params;
clientSocket.on('close', (code: number, reason: string) => this.closeWorker(WSErrors.NO_ERROR));
clientSocket.on('error', (error: Error) => this.closeWorker(WSErrors.CLIENT_SOCKET_ERROR));
// clientSocket.pause() would be preferrable but according to the docs " Some events can still be
// emitted after it is called, until all buffered data is consumed."
this._clientSocket.on('message', data => {
if (this._workerSocket)
this._workerSocket.send(data);
else
this._bufferedMessages.push(data);
});
}
workerConnected(workerSocket: WebSocket) {
@ -77,13 +93,14 @@ class GridWorker extends EventEmitter {
this._workerSocket = workerSocket;
workerSocket.on('close', (code: number, reason: string) => this.closeWorker(WSErrors.NO_ERROR));
workerSocket.on('error', (error: Error) => this.closeWorker(WSErrors.WORKER_SOCKET_ERROR));
this._clientSocket.on('message', data => workerSocket!.send(data));
workerSocket.on('message', data => this._clientSocket!.send(data));
this._clientSocket.send('run');
for (const data of this._bufferedMessages)
workerSocket.send(data);
this._bufferedMessages = [];
}
closeWorker(errorCode: ErrorCode) {
this._log('close');
this._log(`close ${errorCode.reason}`);
this._workerSocket?.close(errorCode.code, errorCode.reason);
this._clientSocket.close(errorCode.code, errorCode.reason);
this.emit('close');
@ -111,7 +128,7 @@ class GridAgent extends EventEmitter {
constructor(capacity = Infinity, creationTimeout = 5 * 60000, retireTimeout = 30000) {
super();
this._capacity = capacity;
this._log = debug(`pw:grid:agent${this.agentId}`);
this._log = debug(`pw:grid:agent:${this.agentId}`);
this.setStatus('created');
this._retireTimeout = retireTimeout;
this._agentCreationTimeoutId = setTimeout(() => {
@ -132,10 +149,8 @@ class GridAgent extends EventEmitter {
clearTimeout(this._agentCreationTimeoutId);
this.setStatus('connected');
this._ws = ws;
for (const worker of this._workersWaitingForAgentConnected) {
this._log(`send worker id: ${worker.workerId}`);
ws.send(worker.workerId);
}
for (const worker of this._workersWaitingForAgentConnected)
this._sendStartWorkerMessage(worker);
this._workersWaitingForAgentConnected.clear();
}
@ -143,12 +158,12 @@ class GridAgent extends EventEmitter {
return this._workers.size < this._capacity;
}
async createWorker(clientSocket: WebSocket) {
async createWorker(clientSocket: WebSocket, params: GridWorkerParams) {
if (this._retireTimeoutId)
clearTimeout(this._retireTimeoutId);
if (this._ws)
this.setStatus('connected');
const worker = new GridWorker(clientSocket);
const worker = new GridWorker(clientSocket, params);
this._log(`create worker: ${worker.workerId}`);
this._workers.set(worker.workerId, worker);
worker.on('close', () => {
@ -162,12 +177,10 @@ class GridAgent extends EventEmitter {
this._retireTimeoutId = setTimeout(() => this.closeAgent(WSErrors.AGENT_RETIRED), this._retireTimeout);
}
});
if (this._ws) {
this._log(`send worker id: ${worker.workerId}`);
this._ws.send(worker.workerId);
} else {
if (this._ws)
this._sendStartWorkerMessage(worker);
else
this._workersWaitingForAgentConnected.add(worker);
}
}
workerConnected(workerId: string, ws: WebSocket) {
@ -185,6 +198,16 @@ class GridAgent extends EventEmitter {
this._ws?.close(errorCode.code, errorCode.reason);
this.emit('close');
}
private _sendStartWorkerMessage(worker: GridWorker) {
const message = JSON.stringify({
...worker.params,
'workerId': worker.workerId,
});
this._log(`start worker message: ${message}`);
assert(this._ws);
this._ws.send(message);
}
}
export class GridServer {
@ -198,6 +221,7 @@ export class GridServer {
constructor(factory: GridFactory, authToken: string = '') {
this._log = debug(`pw:grid:server`);
this._log(`using factory ${factory.name}`);
this._authToken = authToken || '';
this._server = new HttpServer();
this._factory = factory;
@ -240,17 +264,23 @@ export class GridServer {
this._wsServer.on('connection', async (ws, request) => {
if (request.url?.startsWith(this._securePath('/claimWorker'))) {
const params = new URL('http://localhost/' + request.url).searchParams;
if (params.get('pwVersion') !== this._pwVersion && !process.env.PWTEST_UNSAFE_GRID_VERSION) {
const version = params.get('pwVersion');
if (version !== this._pwVersion && !process.env.PWTEST_UNSAFE_GRID_VERSION) {
this._log(`version mismatch: ${version} !== ${this._pwVersion}`);
ws.close(WSErrors.CLIENT_PLAYWRIGHT_VERSION_MISMATCH.code, WSErrors.CLIENT_PLAYWRIGHT_VERSION_MISMATCH.reason);
return;
}
const agent = [...this._agents.values()].find(w => w.canCreateWorker()) || this._createAgent()?.agent;
if (!agent) {
this._log(`failed to get agent`);
ws.close(WSErrors.AGENT_CREATION_FAILED.code, WSErrors.AGENT_CREATION_FAILED.reason);
return;
}
agent.createWorker(ws);
agent.createWorker(ws, {
browserAlias: request.headers['x-playwright-browser'] as string | undefined,
headless: request.headers['x-playwright-headless'] !== '0',
});
return;
}

View File

@ -23,7 +23,7 @@ import { gracefullyCloseAll } from '../utils/processLauncher';
import { SocksProxy } from '../utils/socksProxy';
function launchGridWorker(gridURL: string, agentId: string, workerId: string) {
const log = debug(`pw:grid:worker${workerId}`);
const log = debug(`pw:grid:worker:${workerId}`);
log('created');
const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`);
const dispatcherConnection = new DispatcherConnection();