chore: extract process and process host (#20166)

This commit is contained in:
Pavel Feldman 2023-01-17 12:43:51 -08:00 committed by GitHub
parent c36827433d
commit 9a64597d74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 302 additions and 180 deletions

View File

@ -14,16 +14,15 @@
* limitations under the License. * limitations under the License.
*/ */
import child_process from 'child_process'; import type { TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, StepBeginPayload, StepEndPayload, TeardownErrorsPayload, WatchTestResolvedPayload, RunPayload, SerializedLoaderData } from './ipc';
import path from 'path';
import { EventEmitter } from 'events';
import type { RunPayload, TestBeginPayload, TestEndPayload, DonePayload, TestOutputPayload, WorkerInitParams, StepBeginPayload, StepEndPayload, SerializedLoaderData, TeardownErrorsPayload, WatchTestResolvedPayload, WorkerIsolation } from './ipc';
import type { TestResult, Reporter, TestStep, TestError } from '../types/testReporter'; import type { TestResult, Reporter, TestStep, TestError } from '../types/testReporter';
import type { Suite } from './test'; import type { Suite } from './test';
import type { Loader } from './loader'; import type { Loader } from './loader';
import type { ProcessExitData } from './processHost';
import { TestCase } from './test'; import { TestCase } from './test';
import { ManualPromise } from 'playwright-core/lib/utils'; import { ManualPromise } from 'playwright-core/lib/utils';
import { TestTypeImpl } from './testType'; import { TestTypeImpl } from './testType';
import { WorkerHost } from './workerHost';
export type TestGroup = { export type TestGroup = {
workerHash: string; workerHash: string;
@ -45,14 +44,8 @@ type TestData = {
resultByWorkerIndex: Map<number, TestResultData>; resultByWorkerIndex: Map<number, TestResultData>;
}; };
type WorkerExitData = {
unexpectedly: boolean;
code: number | null;
signal: NodeJS.Signals | null;
};
export class Dispatcher { export class Dispatcher {
private _workerSlots: { busy: boolean, worker?: Worker }[] = []; private _workerSlots: { busy: boolean, worker?: WorkerHost }[] = [];
private _queue: TestGroup[] = []; private _queue: TestGroup[] = [];
private _queuedOrRunningHashCount = new Map<string, number>(); private _queuedOrRunningHashCount = new Map<string, number>();
private _finished = new ManualPromise<void>(); private _finished = new ManualPromise<void>();
@ -115,10 +108,10 @@ export class Dispatcher {
// 2. Start the worker if it is down. // 2. Start the worker if it is down.
if (!worker) { if (!worker) {
worker = this._createWorker(job.workerHash, index); worker = this._createWorker(job, index, this._loader.serialize());
this._workerSlots[index].worker = worker; this._workerSlots[index].worker = worker;
worker.on('exit', () => this._workerSlots[index].worker = undefined); worker.on('exit', () => this._workerSlots[index].worker = undefined);
await worker.init(job, this._loader.serialize()); await worker.init();
if (this._isStopped) // Check stopped signal after async hop. if (this._isStopped) // Check stopped signal after async hop.
return; return;
} }
@ -147,7 +140,7 @@ export class Dispatcher {
this._finished.resolve(); this._finished.resolve();
} }
private _isWorkerRedundant(worker: Worker) { private _isWorkerRedundant(worker: WorkerHost) {
let workersWithSameHash = 0; let workersWithSameHash = 0;
for (const slot of this._workerSlots) { for (const slot of this._workerSlots) {
if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash()) if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash())
@ -170,8 +163,16 @@ export class Dispatcher {
await this._finished; await this._finished;
} }
async _runJob(worker: Worker, testGroup: TestGroup) { async _runJob(worker: WorkerHost, testGroup: TestGroup) {
worker.run(testGroup); const runPayload: RunPayload = {
file: testGroup.requireFile,
entries: testGroup.tests.map(test => {
return { testId: test.id, retry: test.results.length };
}),
watchMode: testGroup.watchMode,
phase: testGroup.phase,
};
worker.runTestGroup(runPayload);
let doneCallback = () => {}; let doneCallback = () => {};
const result = new Promise<void>(f => doneCallback = f); const result = new Promise<void>(f => doneCallback = f);
@ -203,6 +204,7 @@ export class Dispatcher {
result.workerIndex = worker.workerIndex; result.workerIndex = worker.workerIndex;
result.startTime = new Date(params.startWallTime); result.startTime = new Date(params.startWallTime);
this._reporter.onTestBegin?.(data.test, result); this._reporter.onTestBegin?.(data.test, result);
worker.currentTestId = params.testId;
}; };
worker.addListener('testBegin', onTestBegin); worker.addListener('testBegin', onTestBegin);
@ -235,6 +237,7 @@ export class Dispatcher {
if (isFailure) if (isFailure)
failedTestIds.add(params.testId); failedTestIds.add(params.testId);
this._reportTestEnd(test, result); this._reportTestEnd(test, result);
worker.currentTestId = null;
}; };
worker.addListener('testEnd', onTestEnd); worker.addListener('testEnd', onTestEnd);
@ -424,7 +427,7 @@ export class Dispatcher {
}; };
worker.on('done', onDone); worker.on('done', onDone);
const onExit = (data: WorkerExitData) => { const onExit = (data: ProcessExitData) => {
const unexpectedExitError: TestError | undefined = data.unexpectedly ? { const unexpectedExitError: TestError | undefined = data.unexpectedly ? {
message: `Internal error: worker process exited unexpectedly (code=${data.code}, signal=${data.signal})` message: `Internal error: worker process exited unexpectedly (code=${data.code}, signal=${data.signal})`
} : undefined; } : undefined;
@ -435,8 +438,8 @@ export class Dispatcher {
return result; return result;
} }
_createWorker(hash: string, parallelIndex: number) { _createWorker(testGroup: TestGroup, parallelIndex: number, loaderData: SerializedLoaderData) {
const worker = new Worker(hash, parallelIndex, this._loader.fullConfig()._workerIsolation); const worker = new WorkerHost(testGroup, parallelIndex, this._loader.fullConfig()._workerIsolation, loaderData);
const handleOutput = (params: TestOutputPayload) => { const handleOutput = (params: TestOutputPayload) => {
const chunk = chunkFromParams(params); const chunk = chunkFromParams(params);
if (worker.didFail()) { if (worker.didFail()) {
@ -445,9 +448,9 @@ export class Dispatcher {
// the next retry. // the next retry.
return { chunk }; return { chunk };
} }
if (!params.testId) if (!worker.currentTestId)
return { chunk }; return { chunk };
const data = this._testById.get(params.testId)!; const data = this._testById.get(worker.currentTestId)!;
return { chunk, test: data.test, result: data.resultByWorkerIndex.get(worker.workerIndex)?.result }; return { chunk, test: data.test, result: data.resultByWorkerIndex.get(worker.workerIndex)?.result };
}; };
worker.on('stdOut', (params: TestOutputPayload) => { worker.on('stdOut', (params: TestOutputPayload) => {
@ -495,119 +498,6 @@ export class Dispatcher {
} }
} }
let lastWorkerIndex = 0;
class Worker extends EventEmitter {
private process: child_process.ChildProcess;
private _hash: string;
readonly parallelIndex: number;
readonly workerIndex: number;
private _didSendStop = false;
private _didFail = false;
private didExit = false;
private _ready: Promise<void>;
workerIsolation: WorkerIsolation;
constructor(hash: string, parallelIndex: number, workerIsolation: WorkerIsolation) {
super();
this.workerIndex = lastWorkerIndex++;
this._hash = hash;
this.parallelIndex = parallelIndex;
this.workerIsolation = workerIsolation;
this.process = child_process.fork(path.join(__dirname, 'worker.js'), {
detached: false,
env: {
FORCE_COLOR: '1',
DEBUG_COLORS: '1',
TEST_WORKER_INDEX: String(this.workerIndex),
TEST_PARALLEL_INDEX: String(this.parallelIndex),
...process.env
},
// Can't pipe since piping slows down termination for some reason.
stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc']
});
this.process.on('exit', (code, signal) => {
this.didExit = true;
this.emit('exit', { unexpectedly: !this._didSendStop, code, signal } as WorkerExitData);
});
this.process.on('error', e => {}); // do not yell at a send to dead process.
this.process.on('message', (message: any) => {
const { method, params } = message;
this.emit(method, params);
});
this._ready = new Promise((resolve, reject) => {
this.process.once('exit', (code, signal) => reject(new Error(`worker exited with code "${code}" and signal "${signal}" before it became ready`)));
this.once('ready', () => resolve());
});
}
async init(testGroup: TestGroup, loaderData: SerializedLoaderData) {
await this._ready;
const params: WorkerInitParams = {
workerIsolation: this.workerIsolation,
workerIndex: this.workerIndex,
parallelIndex: this.parallelIndex,
repeatEachIndex: testGroup.repeatEachIndex,
projectId: testGroup.projectId,
loader: loaderData,
stdoutParams: {
rows: process.stdout.rows,
columns: process.stdout.columns,
colorDepth: process.stdout.getColorDepth?.() || 8
},
stderrParams: {
rows: process.stderr.rows,
columns: process.stderr.columns,
colorDepth: process.stderr.getColorDepth?.() || 8
},
};
this.send({ method: 'init', params });
}
run(testGroup: TestGroup) {
const runPayload: RunPayload = {
file: testGroup.requireFile,
entries: testGroup.tests.map(test => {
return { testId: test.id, retry: test.results.length };
}),
watchMode: testGroup.watchMode,
phase: testGroup.phase,
};
this.send({ method: 'run', params: runPayload });
}
didFail() {
return this._didFail;
}
didSendStop() {
return this._didSendStop;
}
hash() {
return this._hash;
}
async stop(didFail?: boolean) {
if (didFail)
this._didFail = true;
if (this.didExit)
return;
if (!this._didSendStop) {
this.send({ method: 'stop' });
this._didSendStop = true;
}
await new Promise(f => this.once('exit', f));
}
private send(message: any) {
// This is a great place for debug logging.
this.process.send(message);
}
}
function chunkFromParams(params: TestOutputPayload): string | Buffer { function chunkFromParams(params: TestOutputPayload): string | Buffer {
if (typeof params.text === 'string') if (typeof params.text === 'string')
return params.text; return params.text;

View File

@ -34,6 +34,12 @@ export type WorkerIsolation =
'isolate-pools'; // create new worker for new worker fixture pool digest 'isolate-pools'; // create new worker for new worker fixture pool digest
export type ProcessInitParams = {
workerIndex?: number;
stdoutParams: TtyParams;
stderrParams: TtyParams;
};
export type WorkerInitParams = { export type WorkerInitParams = {
workerIsolation: WorkerIsolation; workerIsolation: WorkerIsolation;
workerIndex: number; workerIndex: number;
@ -41,8 +47,6 @@ export type WorkerInitParams = {
repeatEachIndex: number; repeatEachIndex: number;
projectId: string; projectId: string;
loader: SerializedLoaderData; loader: SerializedLoaderData;
stdoutParams: TtyParams;
stderrParams: TtyParams;
}; };
export type WatchTestResolvedPayload = { export type WatchTestResolvedPayload = {
@ -105,7 +109,6 @@ export type DonePayload = {
}; };
export type TestOutputPayload = { export type TestOutputPayload = {
testId?: string;
text?: string; text?: string;
buffer?: string; buffer?: string;
}; };

View File

@ -16,31 +16,55 @@
import type { WriteStream } from 'tty'; import type { WriteStream } from 'tty';
import * as util from 'util'; import * as util from 'util';
import type { RunPayload, TeardownErrorsPayload, TestOutputPayload, TtyParams, WorkerInitParams } from './ipc'; import type { ProcessInitParams, TeardownErrorsPayload, TestOutputPayload, TtyParams } from './ipc';
import { startProfiling, stopProfiling } from './profiler'; import { startProfiling, stopProfiling } from './profiler';
import type { TestInfoError } from './types';
import { serializeError } from './util'; import { serializeError } from './util';
import { WorkerRunner } from './workerRunner';
export type ProtocolRequest = {
id: number;
method: string;
params?: any;
};
export type ProtocolResponse = {
id?: number;
error?: string;
method?: string;
params?: any;
result?: any;
};
export class ProcessRunner {
appendProcessTeardownDiagnostics(error: TestInfoError) { }
unhandledError(reason: any) { }
async cleanup(): Promise<void> { }
async stop(): Promise<void> { }
protected dispatchEvent(method: string, params: any) {
const response: ProtocolResponse = { method, params };
sendMessageToParent({ method: '__dispatch__', params: response });
}
}
let closed = false; let closed = false;
sendMessageToParent('ready'); sendMessageToParent({ method: 'ready' });
process.stdout.write = (chunk: string | Buffer) => { process.stdout.write = (chunk: string | Buffer) => {
const outPayload: TestOutputPayload = { const outPayload: TestOutputPayload = {
testId: workerRunner?._currentTest?._test.id,
...chunkToParams(chunk) ...chunkToParams(chunk)
}; };
sendMessageToParent('stdOut', outPayload); sendMessageToParent({ method: 'stdOut', params: outPayload });
return true; return true;
}; };
if (!process.env.PW_RUNNER_DEBUG) { if (!process.env.PW_RUNNER_DEBUG) {
process.stderr.write = (chunk: string | Buffer) => { process.stderr.write = (chunk: string | Buffer) => {
const outPayload: TestOutputPayload = { const outPayload: TestOutputPayload = {
testId: workerRunner?._currentTest?._test.id,
...chunkToParams(chunk) ...chunkToParams(chunk)
}; };
sendMessageToParent('stdErr', outPayload); sendMessageToParent({ method: 'stdErr', params: outPayload });
return true; return true;
}; };
} }
@ -49,37 +73,43 @@ process.on('disconnect', gracefullyCloseAndExit);
process.on('SIGINT', () => {}); process.on('SIGINT', () => {});
process.on('SIGTERM', () => {}); process.on('SIGTERM', () => {});
let workerRunner: WorkerRunner; let processRunner: ProcessRunner;
let workerIndex: number | undefined; let workerIndex: number | undefined;
process.on('unhandledRejection', (reason, promise) => { process.on('unhandledRejection', (reason, promise) => {
if (workerRunner) if (processRunner)
workerRunner.unhandledError(reason); processRunner.unhandledError(reason);
}); });
process.on('uncaughtException', error => { process.on('uncaughtException', error => {
if (workerRunner) if (processRunner)
workerRunner.unhandledError(error); processRunner.unhandledError(error);
}); });
process.on('message', async message => { process.on('message', async message => {
if (message.method === 'init') { if (message.method === 'init') {
const initParams = message.params as WorkerInitParams; const initParams = message.params as ProcessInitParams;
workerIndex = initParams.workerIndex; workerIndex = initParams.workerIndex;
initConsoleParameters(initParams); initConsoleParameters(initParams);
startProfiling(); startProfiling();
workerRunner = new WorkerRunner(initParams); const { create } = require(process.env.PW_PROCESS_RUNNER_SCRIPT!);
for (const event of ['watchTestResolved', 'testBegin', 'testEnd', 'stepBegin', 'stepEnd', 'done', 'teardownErrors']) processRunner = create(initParams) as ProcessRunner;
workerRunner.on(event, sendMessageToParent.bind(null, event));
return; return;
} }
if (message.method === 'stop') { if (message.method === 'stop') {
await gracefullyCloseAndExit(); await gracefullyCloseAndExit();
return; return;
} }
if (message.method === 'run') { if (message.method === '__dispatch__') {
const runPayload = message.params as RunPayload; const { id, method, params } = message.params as ProtocolRequest;
await workerRunner!.runTestGroup(runPayload); try {
const result = await (processRunner as any)[method](params);
const response: ProtocolResponse = { id, result };
sendMessageToParent({ method: '__dispatch__', params: response });
} catch (e) {
const response: ProtocolResponse = { id, error: e.toString() };
sendMessageToParent({ method: '__dispatch__', params: response });
}
} }
}); });
@ -91,27 +121,27 @@ async function gracefullyCloseAndExit() {
setTimeout(() => process.exit(0), 30000); setTimeout(() => process.exit(0), 30000);
// Meanwhile, try to gracefully shutdown. // Meanwhile, try to gracefully shutdown.
try { try {
if (workerRunner) { if (processRunner) {
await workerRunner.stop(); await processRunner.stop();
await workerRunner.cleanup(); await processRunner.cleanup();
} }
if (workerIndex !== undefined) if (workerIndex !== undefined)
await stopProfiling(workerIndex); await stopProfiling(workerIndex);
} catch (e) { } catch (e) {
try { try {
const error = serializeError(e); const error = serializeError(e);
workerRunner.appendWorkerTeardownDiagnostics(error); processRunner.appendProcessTeardownDiagnostics(error);
const payload: TeardownErrorsPayload = { fatalErrors: [error] }; const payload: TeardownErrorsPayload = { fatalErrors: [error] };
process.send!({ method: 'teardownErrors', params: payload }); sendMessageToParent({ method: 'teardownErrors', params: payload });
} catch { } catch {
} }
} }
process.exit(0); process.exit(0);
} }
function sendMessageToParent(method: string, params = {}) { function sendMessageToParent(message: { method: string, params?: any }) {
try { try {
process.send!({ method, params }); process.send!(message);
} catch (e) { } catch (e) {
// Can throw when closing. // Can throw when closing.
} }
@ -125,7 +155,7 @@ function chunkToParams(chunk: Buffer | string): { text?: string, buffer?: strin
return { text: chunk }; return { text: chunk };
} }
function initConsoleParameters(initParams: WorkerInitParams) { function initConsoleParameters(initParams: ProcessInitParams) {
// Make sure the output supports colors. // Make sure the output supports colors.
setTtyParams(process.stdout, initParams.stdoutParams); setTtyParams(process.stdout, initParams.stdoutParams);
setTtyParams(process.stderr, initParams.stderrParams); setTtyParams(process.stderr, initParams.stderrParams);

View File

@ -0,0 +1,137 @@
/**
* Copyright Microsoft Corporation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import child_process from 'child_process';
import { EventEmitter } from 'events';
import type { ProcessInitParams } from './ipc';
import type { ProtocolResponse } from './process';
export type ProcessExitData = {
unexpectedly: boolean;
code: number | null;
signal: NodeJS.Signals | null;
};
export class ProcessHost<InitParams> extends EventEmitter {
private process!: child_process.ChildProcess;
private _didSendStop = false;
private _didFail = false;
private didExit = false;
private _runnerScript: string;
private _lastMessageId = 0;
private _callbacks = new Map<number, { resolve: (result: any) => void, reject: (error: Error) => void }>();
constructor(runnerScript: string) {
super();
this._runnerScript = runnerScript;
}
async doInit(params: InitParams) {
this.process = child_process.fork(require.resolve('./process'), {
detached: false,
env: {
FORCE_COLOR: '1',
DEBUG_COLORS: '1',
PW_PROCESS_RUNNER_SCRIPT: this._runnerScript,
...process.env
},
// Can't pipe since piping slows down termination for some reason.
stdio: ['ignore', 'ignore', process.env.PW_RUNNER_DEBUG ? 'inherit' : 'ignore', 'ipc']
});
this.process.on('exit', (code, signal) => {
this.didExit = true;
this.emit('exit', { unexpectedly: !this._didSendStop, code, signal } as ProcessExitData);
});
this.process.on('error', e => {}); // do not yell at a send to dead process.
this.process.on('message', (message: any) => {
if (message.method === '__dispatch__') {
const { id, error, method, params, result } = message.params as ProtocolResponse;
if (id && this._callbacks.has(id)) {
const { resolve, reject } = this._callbacks.get(id)!;
this._callbacks.delete(id);
if (error)
reject(new Error(error));
else
resolve(result);
} else {
this.emit(method!, params);
}
} else {
this.emit(message.method!, message.params);
}
});
await new Promise<void>((resolve, reject) => {
this.process.once('exit', (code, signal) => reject(new Error(`process exited with code "${code}" and signal "${signal}" before it became ready`)));
this.once('ready', () => resolve());
});
const processParams: ProcessInitParams = {
stdoutParams: {
rows: process.stdout.rows,
columns: process.stdout.columns,
colorDepth: process.stdout.getColorDepth?.() || 8
},
stderrParams: {
rows: process.stderr.rows,
columns: process.stderr.columns,
colorDepth: process.stderr.getColorDepth?.() || 8
},
};
this.send({ method: 'init', params: { ...processParams, ...params } });
}
protected sendMessage(message: { method: string, params?: any }) {
const id = ++this._lastMessageId;
this.send({
method: '__dispatch__',
params: { id, ...message }
});
return new Promise((resolve, reject) => {
this._callbacks.set(id, { resolve, reject });
});
}
protected sendMessageNoReply(message: { method: string, params?: any }) {
this.sendMessage(message).catch(() => {});
}
async stop(didFail?: boolean) {
if (didFail)
this._didFail = true;
if (this.didExit)
return;
if (!this._didSendStop) {
this.send({ method: 'stop' });
this._didSendStop = true;
}
await new Promise(f => this.once('exit', f));
}
didFail() {
return this._didFail;
}
didSendStop() {
return this._didSendStop;
}
private send(message: { method: string, params?: any }) {
// This is a great place for debug logging.
this.process.send(message);
}
}

View File

@ -0,0 +1,57 @@
/**
* Copyright Microsoft Corporation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { TestGroup } from './dispatcher';
import type { RunPayload, SerializedLoaderData, WorkerInitParams, WorkerIsolation } from './ipc';
import { ProcessHost } from './processHost';
let lastWorkerIndex = 0;
export class WorkerHost extends ProcessHost<WorkerInitParams> {
readonly parallelIndex: number;
readonly workerIndex: number;
private _hash: string;
currentTestId: string | null = null;
private _initParams: WorkerInitParams;
constructor(testGroup: TestGroup, parallelIndex: number, workerIsolation: WorkerIsolation, loader: SerializedLoaderData) {
super(require.resolve('./workerRunner.js'));
this.workerIndex = lastWorkerIndex++;
this.parallelIndex = parallelIndex;
this._hash = testGroup.workerHash;
this._initParams = {
workerIsolation,
workerIndex: this.workerIndex,
parallelIndex,
repeatEachIndex: testGroup.repeatEachIndex,
projectId: testGroup.projectId,
loader,
};
}
async init() {
await this.doInit(this._initParams);
}
runTestGroup(runPayload: RunPayload) {
this.sendMessageNoReply({ method: 'runTestGroup', params: runPayload });
}
hash() {
return this._hash;
}
}

View File

@ -16,7 +16,6 @@
import { colors, rimraf } from 'playwright-core/lib/utilsBundle'; import { colors, rimraf } from 'playwright-core/lib/utilsBundle';
import util from 'util'; import util from 'util';
import { EventEmitter } from 'events';
import { debugTest, formatLocation, relativeFilePath, serializeError } from './util'; import { debugTest, formatLocation, relativeFilePath, serializeError } from './util';
import type { TestBeginPayload, TestEndPayload, RunPayload, DonePayload, WorkerInitParams, TeardownErrorsPayload, WatchTestResolvedPayload } from './ipc'; import type { TestBeginPayload, TestEndPayload, RunPayload, DonePayload, WorkerInitParams, TeardownErrorsPayload, WatchTestResolvedPayload } from './ipc';
import { setCurrentTestInfo } from './globals'; import { setCurrentTestInfo } from './globals';
@ -28,10 +27,11 @@ import { ManualPromise } from 'playwright-core/lib/utils';
import { TestInfoImpl } from './testInfo'; import { TestInfoImpl } from './testInfo';
import type { TimeSlot } from './timeoutManager'; import type { TimeSlot } from './timeoutManager';
import { TimeoutManager } from './timeoutManager'; import { TimeoutManager } from './timeoutManager';
import { ProcessRunner } from './process';
const removeFolderAsync = util.promisify(rimraf); const removeFolderAsync = util.promisify(rimraf);
export class WorkerRunner extends EventEmitter { export class WorkerRunner extends ProcessRunner {
private _params: WorkerInitParams; private _params: WorkerInitParams;
private _loader!: Loader; private _loader!: Loader;
private _project!: FullProjectInternal; private _project!: FullProjectInternal;
@ -48,7 +48,7 @@ export class WorkerRunner extends EventEmitter {
private _isStopped = false; private _isStopped = false;
// This promise resolves once the single "run test group" call finishes. // This promise resolves once the single "run test group" call finishes.
private _runFinished = new ManualPromise<void>(); private _runFinished = new ManualPromise<void>();
_currentTest: TestInfoImpl | null = null; private _currentTest: TestInfoImpl | null = null;
private _lastRunningTests: TestInfoImpl[] = []; private _lastRunningTests: TestInfoImpl[] = [];
private _totalRunningTests = 0; private _totalRunningTests = 0;
// Dynamic annotations originated by modifiers with a callback, e.g. `test.skip(() => true)`. // Dynamic annotations originated by modifiers with a callback, e.g. `test.skip(() => true)`.
@ -59,6 +59,9 @@ export class WorkerRunner extends EventEmitter {
constructor(params: WorkerInitParams) { constructor(params: WorkerInitParams) {
super(); super();
process.env.TEST_WORKER_INDEX = String(params.workerIndex);
process.env.TEST_PARALLEL_INDEX = String(params.parallelIndex);
this._params = params; this._params = params;
this._fixtureRunner = new FixtureRunner(); this._fixtureRunner = new FixtureRunner();
@ -67,7 +70,7 @@ export class WorkerRunner extends EventEmitter {
this._runFinished.resolve(); this._runFinished.resolve();
} }
stop(): Promise<void> { override stop(): Promise<void> {
if (!this._isStopped) { if (!this._isStopped) {
this._isStopped = true; this._isStopped = true;
@ -80,18 +83,18 @@ export class WorkerRunner extends EventEmitter {
return this._runFinished; return this._runFinished;
} }
async cleanup() { override async cleanup() {
// We have to load the project to get the right deadline below. // We have to load the project to get the right deadline below.
await this._loadIfNeeded(); await this._loadIfNeeded();
await this._teardownScopes(); await this._teardownScopes();
if (this._fatalErrors.length) { if (this._fatalErrors.length) {
this.appendWorkerTeardownDiagnostics(this._fatalErrors[this._fatalErrors.length - 1]); this.appendProcessTeardownDiagnostics(this._fatalErrors[this._fatalErrors.length - 1]);
const payload: TeardownErrorsPayload = { fatalErrors: this._fatalErrors }; const payload: TeardownErrorsPayload = { fatalErrors: this._fatalErrors };
this.emit('teardownErrors', payload); this.dispatchEvent('teardownErrors', payload);
} }
} }
appendWorkerTeardownDiagnostics(error: TestInfoError) { override appendProcessTeardownDiagnostics(error: TestInfoError) {
if (!this._lastRunningTests.length) if (!this._lastRunningTests.length)
return; return;
const count = this._totalRunningTests === 1 ? '1 test' : `${this._totalRunningTests} tests`; const count = this._totalRunningTests === 1 ? '1 test' : `${this._totalRunningTests} tests`;
@ -130,7 +133,7 @@ export class WorkerRunner extends EventEmitter {
this._fatalErrors.push(timeoutError); this._fatalErrors.push(timeoutError);
} }
unhandledError(error: Error | any) { override unhandledError(error: Error | any) {
// Usually, we do not differentiate between errors in the control flow // Usually, we do not differentiate between errors in the control flow
// and unhandled errors - both lead to the test failing. This is good for regular tests, // and unhandled errors - both lead to the test failing. This is good for regular tests,
// so that you can, e.g. expect() from inside an event handler. The test fails, // so that you can, e.g. expect() from inside an event handler. The test fails,
@ -181,7 +184,7 @@ export class WorkerRunner extends EventEmitter {
title: test.title, title: test.title,
location: test.location location: test.location
}; };
this.emit('watchTestResolved', testResolvedPayload); this.dispatchEvent('watchTestResolved', testResolvedPayload);
entries.set(test.id, { testId: test.id, retry: 0 }); entries.set(test.id, { testId: test.id, retry: 0 });
} }
if (!entries.has(test.id)) if (!entries.has(test.id))
@ -222,7 +225,7 @@ export class WorkerRunner extends EventEmitter {
if (entries.has(test.id)) if (entries.has(test.id))
donePayload.skipTestsDueToSetupFailure.push(test.id); donePayload.skipTestsDueToSetupFailure.push(test.id);
} }
this.emit('done', donePayload); this.dispatchEvent('done', donePayload);
this._fatalErrors = []; this._fatalErrors = [];
this._skipRemainingTestsInSuite = undefined; this._skipRemainingTestsInSuite = undefined;
this._runFinished.resolve(); this._runFinished.resolve();
@ -231,8 +234,8 @@ export class WorkerRunner extends EventEmitter {
private async _runTest(test: TestCase, retry: number, nextTest: TestCase | undefined) { private async _runTest(test: TestCase, retry: number, nextTest: TestCase | undefined) {
const testInfo = new TestInfoImpl(this._loader, this._project, this._params, test, retry, const testInfo = new TestInfoImpl(this._loader, this._project, this._params, test, retry,
stepBeginPayload => this.emit('stepBegin', stepBeginPayload), stepBeginPayload => this.dispatchEvent('stepBegin', stepBeginPayload),
stepEndPayload => this.emit('stepEnd', stepEndPayload)); stepEndPayload => this.dispatchEvent('stepEnd', stepEndPayload));
const processAnnotation = (annotation: Annotation) => { const processAnnotation = (annotation: Annotation) => {
testInfo.annotations.push(annotation); testInfo.annotations.push(annotation);
@ -283,7 +286,7 @@ export class WorkerRunner extends EventEmitter {
this._currentTest = testInfo; this._currentTest = testInfo;
setCurrentTestInfo(testInfo); setCurrentTestInfo(testInfo);
this.emit('testBegin', buildTestBeginPayload(testInfo)); this.dispatchEvent('testBegin', buildTestBeginPayload(testInfo));
const isSkipped = testInfo.expectedStatus === 'skipped'; const isSkipped = testInfo.expectedStatus === 'skipped';
const hasAfterAllToRunBeforeNextTest = reversedSuites.some(suite => { const hasAfterAllToRunBeforeNextTest = reversedSuites.some(suite => {
@ -292,7 +295,7 @@ export class WorkerRunner extends EventEmitter {
if (isSkipped && nextTest && !hasAfterAllToRunBeforeNextTest) { if (isSkipped && nextTest && !hasAfterAllToRunBeforeNextTest) {
// Fast path - this test is skipped, and there are more tests that will handle cleanup. // Fast path - this test is skipped, and there are more tests that will handle cleanup.
testInfo.status = 'skipped'; testInfo.status = 'skipped';
this.emit('testEnd', buildTestEndPayload(testInfo)); this.dispatchEvent('testEnd', buildTestEndPayload(testInfo));
return; return;
} }
@ -474,7 +477,7 @@ export class WorkerRunner extends EventEmitter {
afterHooksStep.complete({ error: firstAfterHooksError }); afterHooksStep.complete({ error: firstAfterHooksError });
this._currentTest = null; this._currentTest = null;
setCurrentTestInfo(null); setCurrentTestInfo(null);
this.emit('testEnd', buildTestEndPayload(testInfo)); this.dispatchEvent('testEnd', buildTestEndPayload(testInfo));
const preserveOutput = this._loader.fullConfig().preserveOutput === 'always' || const preserveOutput = this._loader.fullConfig().preserveOutput === 'always' ||
(this._loader.fullConfig().preserveOutput === 'failures-only' && testInfo._isFailure()); (this._loader.fullConfig().preserveOutput === 'failures-only' && testInfo._isFailure());
@ -623,3 +626,5 @@ function formatTestTitle(test: TestCase, projectName: string) {
const projectTitle = projectName ? `[${projectName}] ` : ''; const projectTitle = projectName ? `[${projectName}] ` : '';
return `${projectTitle}${location} ${titles.join(' ')}`; return `${projectTitle}${location} ${titles.join(' ')}`;
} }
export const create = (params: WorkerInitParams) => new WorkerRunner(params);