Added support for multithreaded type checking in CLI. (#8309)

This commit is contained in:
Eric Traut 2024-07-04 20:52:37 -07:00 committed by GitHub
parent 746749272c
commit 057b351efa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 359 additions and 9 deletions

View File

@ -19,12 +19,13 @@ Pyright can be run as either a VS Code extension or as a node-based command-line
| --skipunannotated | Skip type analysis of unannotated functions |
| --stats | Print detailed performance stats |
| -t, --typeshedpath `<DIRECTORY>` | Use typeshed type stubs at this location (3) |
| -v, --venvpath `<DIRECTORY>` | Directory that contains virtual environments (4) |
| --threads <optional N> | Use up to N threads to parallelize type checking (4) |
| -v, --venvpath `<DIRECTORY>` | Directory that contains virtual environments (5) |
| --verbose | Emit verbose diagnostics |
| --verifytypes `<IMPORT>` | Verify completeness of types in py.typed package |
| --version | Print pyright version and exit |
| --warnings | Use exit code of 1 if warnings are reported |
| -w, --watch | Continue to run and watch for changes (5) |
| -w, --watch | Continue to run and watch for changes (6) |
| - | Read file or directory list from stdin |
(1) If specific files are specified on the command line, it overrides the files or directories specified in the pyrightconfig.json or pyproject.toml file.
@ -33,9 +34,11 @@ Pyright can be run as either a VS Code extension or as a node-based command-line
(3) Pyright has built-in typeshed type stubs for Python stdlib functionality. To use a different version of typeshed type stubs, specify the directory with this option.
(4) This option is the same as the language server setting `python.venvPath`. It used in conjunction with configuration file, which can refer to different virtual environments by name. For more details, refer to the [configuration](configuration.md) and [import resolution](import-resolution.md#configuring-your-python-environment) documentation. This allows a common config file to be checked in to the project and shared by everyone on the development team without making assumptions about the local paths to the venv directory on each developers computer.
(4) This feature is experimental. If thread count is > 1, multiple copies of pyright are executed in parallel to type check files in a project. If no thread count is specified, the thread count is based on the number of available logical processors (if at least 4) or 1 (if less than 4).
(5) When running in watch mode, pyright will reanalyze only those files that have been modified. These “deltas” are typically much faster than the initial analysis, which needs to analyze all files in the source tree.
(5) This option is the same as the language server setting `python.venvPath`. It used in conjunction with configuration file, which can refer to different virtual environments by name. For more details, refer to the [configuration](configuration.md) and [import resolution](import-resolution.md#configuring-your-python-environment) documentation. This allows a common config file to be checked in to the project and shared by everyone on the development team without making assumptions about the local paths to the venv directory on each developers computer.
(6) When running in watch mode, pyright will reanalyze only those files that have been modified. These “deltas” are typically much faster than the initial analysis, which needs to analyze all files in the source tree.
# Pyright Exit Codes

View File

@ -16,10 +16,15 @@ import { timingStats } from './common/timing';
import chalk from 'chalk';
import commandLineArgs, { CommandLineOptions, OptionDefinition } from 'command-line-args';
import * as fs from 'fs';
import * as os from 'os';
import { ChildProcess, fork } from 'child_process';
import { AnalysisResults } from './analyzer/analysis';
import { PackageTypeReport, TypeKnownStatus } from './analyzer/packageTypeReport';
import { PackageTypeVerifier } from './analyzer/packageTypeVerifier';
import { AnalyzerService } from './analyzer/service';
import { maxSourceFileSize } from './analyzer/sourceFile';
import { SourceFileInfo } from './analyzer/sourceFileInfo';
import { ChokidarFileWatcherProvider } from './common/chokidarFileWatcherProvider';
import { CommandLineOptions as PyrightCommandLineOptions } from './common/commandLineOptions';
import { ConsoleInterface, LogLevel, StandardConsole, StderrConsole } from './common/console';
@ -155,6 +160,7 @@ async function processArgs(): Promise<ExitStatus> {
{ name: 'pythonversion', type: String },
{ name: 'skipunannotated', type: Boolean },
{ name: 'stats', type: Boolean },
{ name: 'threads', type: parseThreadsArgValue },
{ name: 'typeshed-path', type: String },
{ name: 'typeshedpath', alias: 't', type: String },
{ name: 'venv-path', type: String },
@ -192,7 +198,7 @@ async function processArgs(): Promise<ExitStatus> {
}
for (const [arg, value] of Object.entries(args)) {
if (value === null) {
if (value === null && arg !== 'threads') {
console.error(`'${arg}' option requires a value`);
return ExitStatus.ParameterError;
}
@ -209,7 +215,7 @@ async function processArgs(): Promise<ExitStatus> {
}
if (args.verifytypes !== undefined) {
const incompatibleArgs = ['watch', 'stats', 'createstub', 'dependencies', 'skipunannotated'];
const incompatibleArgs = ['watch', 'stats', 'createstub', 'dependencies', 'skipunannotated', 'threads'];
for (const arg of incompatibleArgs) {
if (args[arg] !== undefined) {
console.error(`'verifytypes' option cannot be used with '${arg}' option`);
@ -219,7 +225,7 @@ async function processArgs(): Promise<ExitStatus> {
}
if (args.createstub) {
const incompatibleArgs = ['watch', 'stats', 'verifytypes', 'dependencies', 'skipunannotated'];
const incompatibleArgs = ['watch', 'stats', 'verifytypes', 'dependencies', 'skipunannotated', 'threads'];
for (const arg of incompatibleArgs) {
if (args[arg] !== undefined) {
console.error(`'createstub' option cannot be used with '${arg}' option`);
@ -228,6 +234,16 @@ async function processArgs(): Promise<ExitStatus> {
}
}
if (args.threads) {
const incompatibleArgs = ['watch', 'stats', 'dependencies'];
for (const arg of incompatibleArgs) {
if (args[arg] !== undefined) {
console.error(`'threads' option cannot be used with '${arg}' option`);
return ExitStatus.ParameterError;
}
}
}
const options = new PyrightCommandLineOptions(process.cwd(), false);
const tempFile = new RealTempFile();
@ -362,7 +378,6 @@ async function processArgs(): Promise<ExitStatus> {
options.logTypeEvaluationTime = true;
}
const treatWarningsAsErrors = !!args.warnings;
let logLevel = LogLevel.Error;
if (args.stats || args.verbose) {
logLevel = LogLevel.Info;
@ -396,12 +411,44 @@ async function processArgs(): Promise<ExitStatus> {
options.watchForSourceChanges = watch;
options.watchForConfigChanges = watch;
// Refresh service after 2 seconds after the last library file change is detected.
const service = new AnalyzerService('<default>', serviceProvider, {
console: output,
hostFactory: () => new FullAccessHost(serviceProvider),
// Refresh service 2 seconds after the last library file change is detected.
libraryReanalysisTimeProvider: () => 2 * 1000,
});
if ('threads' in args) {
let threadCount = args['threads'];
// If the thread count was unspecified, use the number of
// logical CPUs (i.e. hardware threads). We find empirically
// that going below 4 threads usually doesn't help.
if (threadCount === null) {
threadCount = os.cpus().length;
if (threadCount < 4) {
threadCount = 1;
}
}
if (threadCount > 1) {
return runMultiThreaded(args, options, threadCount, service, minSeverityLevel, output);
}
}
return runSingleThreaded(args, options, service, minSeverityLevel, output);
}
async function runSingleThreaded(
args: CommandLineOptions,
options: PyrightCommandLineOptions,
service: AnalyzerService,
minSeverityLevel: SeverityLevel,
output: ConsoleInterface
) {
const watch = args.watch !== undefined;
const treatWarningsAsErrors = !!args.warnings;
const exitStatus = createDeferred<ExitStatus>();
service.setCompletionCallback((results) => {
@ -492,6 +539,285 @@ async function processArgs(): Promise<ExitStatus> {
return await exitStatus.promise;
}
async function runMultiThreaded(
args: CommandLineOptions,
options: PyrightCommandLineOptions,
maxThreadCount: number,
service: AnalyzerService,
minSeverityLevel: SeverityLevel,
output: ConsoleInterface
) {
const workers: ChildProcess[] = [];
const startTime = Date.now();
const treatWarningsAsErrors = !!args.warnings;
const exitStatus = createDeferred<ExitStatus>();
// Specify that only open files should be checked. This will allow us
// to control which files are checked by which workers.
options.checkOnlyOpenFiles = true;
// This will trigger discovery of files in the project.
service.setOptions(options);
const program = service.backgroundAnalysisProgram.program;
// Get the list of "tracked" source files -- those that will be type checked.
const sourceFilesToAnalyze = program.getSourceFileInfoList().filter((info) => info.isTracked);
// Don't create more workers than there are files.
const workerCount = Math.min(maxThreadCount, sourceFilesToAnalyze.length);
// Split the source files into affinity queues, one for each worker. We assume
// that files that are next to each other in the directory hierarchy probably
// have more common imports, so we want to analyze them with the same worker
// if possible to maximize type cache hits.
const affinityQueues: SourceFileInfo[][] = new Array<SourceFileInfo[]>(workerCount);
const filesPerAffinityQueue = sourceFilesToAnalyze.length / workerCount;
for (let i = 0; i < sourceFilesToAnalyze.length; i++) {
const affinityIndex = Math.floor(i / filesPerAffinityQueue);
if (affinityQueues[affinityIndex] === undefined) {
affinityQueues[affinityIndex] = [];
}
affinityQueues[affinityIndex].push(sourceFilesToAnalyze[i]);
}
console.info(`Found ${sourceFilesToAnalyze.length} files to analyze`);
console.info(`Using ${workerCount} threads`);
const fileDiagnostics: FileDiagnostics[] = [];
let pendingAnalysisCount = 0;
const sendMessageToWorker = (worker: ChildProcess, message: string, data: any) => {
worker.send(JSON.stringify({ action: message, data: data }));
};
const analyzeNextFile = (workerIndex: number) => {
const worker = workers[workerIndex];
let nextFileToAnalyze: SourceFileInfo | undefined;
// Determine the next file to analyze for this worker.
for (let i = 0; i < affinityQueues.length; i++) {
const affinityIndex = (workerIndex + i) % affinityQueues.length;
if (affinityQueues[affinityIndex].length > 0) {
nextFileToAnalyze = affinityQueues[affinityIndex].shift()!;
break;
}
}
if (nextFileToAnalyze) {
// Tell the worker to analyze the next file.
const fileUri = nextFileToAnalyze.sourceFile.getUri().toString();
sendMessageToWorker(worker, 'analyzeFile', fileUri);
pendingAnalysisCount++;
} else {
// Kill the worker since there's nothing left to do.
worker.kill();
if (pendingAnalysisCount === 0) {
// If there are no more files to analyze and all pending analysis
// is complete, report the results and exit.
if (!exitStatus.resolved) {
const elapsedTime = (Date.now() - startTime) / 1000;
let errorCount = 0;
if (args.outputjson) {
const report = reportDiagnosticsAsJson(
fileDiagnostics,
minSeverityLevel,
sourceFilesToAnalyze.length,
elapsedTime
);
errorCount += report.errorCount;
if (treatWarningsAsErrors) {
errorCount += report.warningCount;
}
} else {
printVersion(output);
const report = reportDiagnosticsAsText(fileDiagnostics, minSeverityLevel);
errorCount += report.errorCount;
if (treatWarningsAsErrors) {
errorCount += report.warningCount;
}
// Print the total time.
output.info(`Completed in ${elapsedTime}sec`);
}
exitStatus.resolve(errorCount > 0 ? ExitStatus.ErrorsReported : ExitStatus.NoErrors);
}
}
}
};
// Launch worker processes.
for (let i = 0; i < workerCount; i++) {
const mainModulePath = process.mainModule!.filename;
const worker = fork(mainModulePath, ['worker', i.toString()]);
worker.on('message', (message) => {
let messageObj: any;
try {
messageObj = JSON.parse(message as string);
} catch {
console.error(`Invalid message from worker: ${message}`);
console.error(`Resolved exitStatus: FatalError`);
exitStatus.resolve(ExitStatus.FatalError);
}
// If the exit status has already been resolved, another thread
// generated a fatal error, so we shouldn't continue.
if (exitStatus.resolved) {
return;
}
switch (messageObj.action) {
case 'analysisResults': {
pendingAnalysisCount--;
const results = messageObj.data as AnalysisResults;
if (results.fatalErrorOccurred) {
console.error(`Fatal error from worker`);
console.error(`Resolved exitStatus: FatalError`);
exitStatus.resolve(ExitStatus.FatalError);
return;
}
if (results.configParseErrorOccurred) {
console.error(`Resolved exitStatus: ConfigFileParseError`);
exitStatus.resolve(ExitStatus.ConfigFileParseError);
return;
}
for (const fileDiag of results.diagnostics) {
fileDiagnostics.push(FileDiagnostics.fromJsonObj(fileDiag));
}
analyzeNextFile(i);
break;
}
default: {
console.error(`Unknown message from worker: ${message}`);
console.error(`Resolved exitStatus: FatalError`);
exitStatus.resolve(ExitStatus.FatalError);
break;
}
}
});
worker.on('error', (err) => {
console.error('Failed to start child process:', err);
console.error(`Resolved exitStatus: FatalError`);
exitStatus.resolve(ExitStatus.FatalError);
});
sendMessageToWorker(worker, 'setOptions', options);
workers.push(worker);
// Tell the worker to analyze the next file.
analyzeNextFile(i);
}
return await exitStatus.promise;
}
// This is the message loop for a worker process used used for
// multi-threaded analysis.
function runWorkerMessageLoop(workerNum: number) {
let serviceProvider: ServiceProvider | undefined;
let service: AnalyzerService | undefined;
let fileSystem: PyrightFileSystem | undefined;
let lastOpenFileUri: Uri | undefined;
const sendMessageToParent = (message: string, data: any) => {
process.send?.(JSON.stringify({ action: message, data: data }));
};
process.on('message', (message) => {
let messageObj: any;
try {
messageObj = JSON.parse(message as string);
} catch {
console.error(`Invalid message from parent: ${message}`);
return;
}
switch (messageObj.action) {
case 'setOptions': {
const options = new PyrightCommandLineOptions(process.cwd(), false);
Object.keys(messageObj.data).forEach((key) => {
(options as any)[key] = messageObj.data[key];
});
let logLevel = LogLevel.Error;
if (options.verboseOutput) {
logLevel = LogLevel.Info;
}
const output = new StderrConsole(logLevel);
const tempFile = new RealTempFile();
fileSystem = new PyrightFileSystem(
createFromRealFileSystem(tempFile, output, new ChokidarFileWatcherProvider(output))
);
serviceProvider = createServiceProvider(fileSystem, output, tempFile);
service = new AnalyzerService('<default>', serviceProvider, {
console: output,
hostFactory: () => new FullAccessHost(serviceProvider!),
// Refresh service 2 seconds after the last library file change is detected.
libraryReanalysisTimeProvider: () => 2 * 1000,
});
service.setCompletionCallback((results) => {
// We're interested only in diagnostics for the last open file.
const fileDiags = results.diagnostics.filter((fileDiag) =>
fileDiag.fileUri.equals(lastOpenFileUri)
);
// Convert JSON-compatible format.
const resultsObj = {
...results,
diagnostics: fileDiags.map((fileDiag) => FileDiagnostics.toJsonObj(fileDiag)),
};
sendMessageToParent('analysisResults', resultsObj);
});
service.setOptions(options);
break;
}
case 'analyzeFile': {
if (serviceProvider && fileSystem && service) {
const uri = Uri.parse(messageObj.data as string, serviceProvider);
// Check the file's length before attempting to read its full contents.
const fileStat = fileSystem.statSync(uri);
if (fileStat.size > maxSourceFileSize) {
console.error(
`File length of "${uri}" is ${fileStat.size} ` +
`which exceeds the maximum supported file size of ${maxSourceFileSize}`
);
throw new Error('File larger than max');
}
const fileContents = fileSystem.readFileSync(uri, 'utf8');
lastOpenFileUri = uri;
service?.setFileOpened(uri, /* version */ 1, fileContents);
}
break;
}
}
});
}
function verifyPackageTypes(
serviceProvider: ServiceProvider,
packageName: string,
@ -787,6 +1113,7 @@ function printUsage() {
' --skipunannotated Skip analysis of functions with no type annotations\n' +
' --stats Print detailed performance stats\n' +
' -t,--typeshedpath <DIRECTORY> Use typeshed type stubs at this location\n' +
' --threads <optional COUNT> Use separate threads to parallelize type checking \n' +
' -v,--venvpath <DIRECTORY> Directory that contains virtual environments\n' +
' --verbose Emit verbose diagnostics\n' +
' --verifytypes <PACKAGE> Verify type completeness of a py.typed package\n' +
@ -984,6 +1311,19 @@ function logDiagnosticToConsole(diag: PyrightJsonDiagnostic, prefix = ' ') {
console.info(message);
}
function parseThreadsArgValue(input: string | null): any {
if (input === null || input === 'auto') {
return null;
}
const value = parseInt(input, 10);
if (isNaN(value) || value < 1) {
return null;
}
return value;
}
// Increase the default stack trace limit from 16 to 64 to help diagnose
// crashes with deep stack traces.
Error.stackTraceLimit = 64;
@ -994,6 +1334,13 @@ export async function main() {
require('source-map-support').install();
}
// Is this a worker process for multi-threaded analysis?
if (process.argv[2] === 'worker') {
const workerNumber = parseInt(process.argv[3]);
runWorkerMessageLoop(workerNumber);
return;
}
const exitCode = await processArgs();
process.exitCode = exitCode;
// Don't call process.exit; stdout may not have been flushed which can break readers.