chore: prepare to load scripts in subprocess (#20174)

This commit is contained in:
Pavel Feldman 2023-01-17 14:53:11 -08:00 committed by GitHub
parent 9ba5a1be38
commit 020dcd89fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 47 additions and 53 deletions

View File

@ -111,7 +111,7 @@ export class Dispatcher {
worker = this._createWorker(job, index, this._loader.serialize());
this._workerSlots[index].worker = worker;
worker.on('exit', () => this._workerSlots[index].worker = undefined);
await worker.init();
await worker.start();
if (this._isStopped) // Check stopped signal after async hop.
return;
}

View File

@ -35,9 +35,9 @@ export type WorkerIsolation =
export type ProcessInitParams = {
workerIndex?: number;
stdoutParams: TtyParams;
stderrParams: TtyParams;
processName: string;
};
export type WorkerInitParams = {

View File

@ -37,9 +37,7 @@ export type ProtocolResponse = {
export class ProcessRunner {
appendProcessTeardownDiagnostics(error: TestInfoError) { }
unhandledError(reason: any) { }
async cleanup(): Promise<void> { }
async stop(): Promise<void> { }
async gracefullyClose(): Promise<void> { }
protected dispatchEvent(method: string, params: any) {
const response: ProtocolResponse = { method, params };
@ -74,29 +72,18 @@ process.on('SIGINT', () => {});
process.on('SIGTERM', () => {});
let processRunner: ProcessRunner;
let workerIndex: number | undefined;
process.on('unhandledRejection', (reason, promise) => {
if (processRunner)
processRunner.unhandledError(reason);
});
process.on('uncaughtException', error => {
if (processRunner)
processRunner.unhandledError(error);
});
let processName: string | undefined;
process.on('message', async message => {
if (message.method === 'init') {
const initParams = message.params as ProcessInitParams;
workerIndex = initParams.workerIndex;
initConsoleParameters(initParams);
if (message.method === '__init__') {
const { processParams, runnerParams, runnerScript } = message.params as { processParams: ProcessInitParams, runnerParams: any, runnerScript: string };
setTtyParams(process.stdout, processParams.stdoutParams);
setTtyParams(process.stderr, processParams.stderrParams);
startProfiling();
const { create } = require(process.env.PW_PROCESS_RUNNER_SCRIPT!);
processRunner = create(initParams) as ProcessRunner;
const { create } = require(runnerScript);
processRunner = create(runnerParams) as ProcessRunner;
return;
}
if (message.method === 'stop') {
if (message.method === '__stop__') {
await gracefullyCloseAndExit();
return;
}
@ -121,12 +108,10 @@ async function gracefullyCloseAndExit() {
setTimeout(() => process.exit(0), 30000);
// Meanwhile, try to gracefully shutdown.
try {
if (processRunner) {
await processRunner.stop();
await processRunner.cleanup();
}
if (workerIndex !== undefined)
await stopProfiling(workerIndex);
if (processRunner)
await processRunner.gracefullyClose();
if (processName)
await stopProfiling(processName);
} catch (e) {
try {
const error = serializeError(e);
@ -155,12 +140,6 @@ function chunkToParams(chunk: Buffer | string): { text?: string, buffer?: strin
return { text: chunk };
}
function initConsoleParameters(initParams: ProcessInitParams) {
// Make sure the output supports colors.
setTtyParams(process.stdout, initParams.stdoutParams);
setTtyParams(process.stderr, initParams.stderrParams);
}
function setTtyParams(stream: WriteStream, params: TtyParams) {
stream.isTTY = true;
if (params.rows)

View File

@ -33,13 +33,15 @@ export class ProcessHost<InitParams> extends EventEmitter {
private _runnerScript: string;
private _lastMessageId = 0;
private _callbacks = new Map<number, { resolve: (result: any) => void, reject: (error: Error) => void }>();
private _processName: string;
constructor(runnerScript: string) {
constructor(runnerScript: string, processName: string) {
super();
this._runnerScript = runnerScript;
this._processName = processName;
}
async doInit(params: InitParams) {
protected async startRunner(runnerParams: InitParams) {
this.process = child_process.fork(require.resolve('./process'), {
detached: false,
env: {
@ -90,9 +92,16 @@ export class ProcessHost<InitParams> extends EventEmitter {
columns: process.stderr.columns,
colorDepth: process.stderr.getColorDepth?.() || 8
},
processName: this._processName
};
this.send({ method: 'init', params: { ...processParams, ...params } });
this.send({
method: '__init__', params: {
processParams,
runnerScript: this._runnerScript,
runnerParams
}
});
}
protected sendMessage(message: { method: string, params?: any }) {
@ -116,7 +125,7 @@ export class ProcessHost<InitParams> extends EventEmitter {
if (this.didExit)
return;
if (!this._didSendStop) {
this.send({ method: 'stop' });
this.send({ method: '__stop__' });
this._didSendStop = true;
}
await new Promise(f => this.once('exit', f));

View File

@ -34,14 +34,14 @@ export async function startProfiling() {
});
}
export async function stopProfiling(workerIndex: number | undefined) {
export async function stopProfiling(processName: string | undefined) {
if (!profileDir)
return;
await new Promise<void>(f => session.post('Profiler.stop', (err, { profile }) => {
if (!err) {
fs.mkdirSync(profileDir, { recursive: true });
fs.writeFileSync(path.join(profileDir, workerIndex === undefined ? 'runner.json' : 'worker' + workerIndex + '.json'), JSON.stringify(profile));
fs.writeFileSync(path.join(profileDir, (processName || 'runner') + '.json'), JSON.stringify(profile));
}
f();
}));

View File

@ -25,15 +25,16 @@ export class WorkerHost extends ProcessHost<WorkerInitParams> {
readonly workerIndex: number;
private _hash: string;
currentTestId: string | null = null;
private _initParams: WorkerInitParams;
private _params: WorkerInitParams;
constructor(testGroup: TestGroup, parallelIndex: number, workerIsolation: WorkerIsolation, loader: SerializedLoaderData) {
super(require.resolve('./workerRunner.js'));
this.workerIndex = lastWorkerIndex++;
const workerIndex = lastWorkerIndex++;
super(require.resolve('./workerRunner.js'), `worker-${workerIndex}`);
this.workerIndex = workerIndex;
this.parallelIndex = parallelIndex;
this._hash = testGroup.workerHash;
this._initParams = {
this._params = {
workerIsolation,
workerIndex: this.workerIndex,
parallelIndex,
@ -43,8 +44,8 @@ export class WorkerHost extends ProcessHost<WorkerInitParams> {
};
}
async init() {
await this.doInit(this._initParams);
async start() {
await this.startRunner(this._params);
}
runTestGroup(runPayload: RunPayload) {

View File

@ -68,9 +68,12 @@ export class WorkerRunner extends ProcessRunner {
// Resolve this promise, so worker does not stall waiting for the non-existent run to finish,
// when it was sopped before running any test group.
this._runFinished.resolve();
process.on('unhandledRejection', reason => this.unhandledError(reason));
process.on('uncaughtException', error => this.unhandledError(error));
}
override stop(): Promise<void> {
private _stop(): Promise<void> {
if (!this._isStopped) {
this._isStopped = true;
@ -83,7 +86,9 @@ export class WorkerRunner extends ProcessRunner {
return this._runFinished;
}
override async cleanup() {
override async gracefullyClose() {
await this._stop();
// We have to load the project to get the right deadline below.
await this._loadIfNeeded();
await this._teardownScopes();
@ -133,7 +138,7 @@ export class WorkerRunner extends ProcessRunner {
this._fatalErrors.push(timeoutError);
}
override unhandledError(error: Error | any) {
unhandledError(error: Error | any) {
// Usually, we do not differentiate between errors in the control flow
// and unhandled errors - both lead to the test failing. This is good for regular tests,
// so that you can, e.g. expect() from inside an event handler. The test fails,
@ -155,7 +160,7 @@ export class WorkerRunner extends ProcessRunner {
if (!this._fatalErrors.length)
this._fatalErrors.push(serializeError(error));
}
this.stop();
this._stop();
}
private async _loadIfNeeded() {
@ -208,7 +213,7 @@ export class WorkerRunner extends ProcessRunner {
}
} else {
fatalUnknownTestIds = runPayload.entries.map(e => e.testId);
this.stop();
this._stop();
}
} catch (e) {
// In theory, we should run above code without any errors.