Share heap usage numbers to support cache cleanup in VS Code (#7605)

* Pass shared array buffer around for shared heap stats

* Put back skip logic for background analysis

* Review feedback
This commit is contained in:
Rich Chiodo 2024-04-02 16:49:40 -07:00 committed by GitHub
parent 5fd8830666
commit 4106a09001
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 151 additions and 18 deletions

View File

@ -8,6 +8,9 @@
* if memory usage approaches the max heap space.
*/
import type { HeapInfo } from 'v8';
import { Worker } from 'worker_threads';
import { AnalysisRequest } from '../backgroundAnalysisBase';
import { ConsoleInterface } from '../common/console';
import { fail } from '../common/debug';
import { getHeapStatistics } from '../common/memUtils';
@ -24,12 +27,43 @@ export interface CacheOwner {
export class CacheManager {
private _pausedCount = 0;
private readonly _cacheOwners: CacheOwner[] = [];
private _sharedUsageBuffer: SharedArrayBuffer | undefined;
private _sharedUsagePosition = 0;
private _lastHeapStats = Date.now();
constructor(private readonly _maxWorkers: number = 0) {}
// Adds a cache owner to the list tracked by this manager so that
// cache-wide operations (usage reporting, emptying) can reach it.
registerCacheOwner(provider: CacheOwner) {
this._cacheOwners.push(provider);
}
addWorker(index: number, worker: Worker) {
    // Hand the SharedArrayBuffer to the new worker thread so every thread
    // can publish its heap usage into a common location. `index` is the
    // worker's slot in the buffer (slot 0 is the main thread's).
    const sharedBuffer = this._getSharedUsageBuffer();
    if (!sharedBuffer) {
        // SharedArrayBuffer unsupported or no workers configured.
        return;
    }

    // The SharedArrayBuffer needs to be separate from data in order for it
    // to be marshalled correctly.
    worker.postMessage({ requestType: 'cacheUsageBuffer', sharedUsageBuffer: sharedBuffer, data: index.toString() });

    worker.on('exit', () => {
        // Zero out the departed worker's slot so stale numbers don't
        // inflate the cross-thread total.
        new Float64Array(sharedBuffer)[index] = 0;
    });
}
// Called on a background thread when the main thread sends over the
// shared usage buffer. Stores the buffer and this thread's slot index.
handleCachedUsageBufferMessage(msg: AnalysisRequest) {
    if (msg.requestType === 'cacheUsageBuffer') {
        // Explicit radix 10: without it, parseInt's base is
        // implementation-defined for some prefixes (radix lint rule).
        const index = parseInt(msg.data || '0', 10);
        const buffer = msg.sharedUsageBuffer;
        // Index of zero is reserved for the main thread so if
        // the index isn't passed, don't save the shared buffer.
        if (buffer && index) {
            this._sharedUsageBuffer = buffer;
            this._sharedUsagePosition = index;
        }
    }
}
unregisterCacheOwner(provider: CacheOwner) {
const index = this._cacheOwners.findIndex((p) => p === provider);
if (index < 0) {
@ -82,6 +116,7 @@ export class CacheManager {
// Returns a ratio of used bytes to total bytes.
getUsedHeapRatio(console?: ConsoleInterface) {
const heapStats = getHeapStatistics();
let usage = this._getTotalHeapUsage(heapStats);
if (console && Date.now() - this._lastHeapStats > 1000) {
// This can fill up the user's console, so we only do it once per second.
@ -90,18 +125,50 @@ export class CacheManager {
`Heap stats: ` +
`total_heap_size=${this._convertToMB(heapStats.total_heap_size)}, ` +
`used_heap_size=${this._convertToMB(heapStats.used_heap_size)}, ` +
`cross_worker_used_heap_size=${this._convertToMB(usage)}, ` +
`total_physical_size=${this._convertToMB(heapStats.total_physical_size)}, ` +
`total_available_size=${this._convertToMB(heapStats.total_available_size)}, ` +
`heap_size_limit=${this._convertToMB(heapStats.heap_size_limit)}`
);
}
return heapStats.used_heap_size / heapStats.heap_size_limit;
// Total usage seems to be off by about 5%, so we'll add that back in
// to make the ratio more accurate. (200MB at 4GB)
usage += usage * 0.05;
return usage / heapStats.heap_size_limit;
}
// Formats a byte count as whole megabytes for log output, e.g. "128MB".
private _convertToMB(bytes: number) {
    const megabytes = bytes / (1024 * 1024);
    return `${Math.round(megabytes)}MB`;
}
// Lazily creates the cross-thread usage buffer: one 8-byte (Float64)
// slot per worker plus slot 0, which is reserved for the main thread.
// Returns undefined when workers aren't configured or the platform
// doesn't support SharedArrayBuffer.
private _getSharedUsageBuffer() {
    try {
        if (this._maxWorkers > 0 && !this._sharedUsageBuffer) {
            // Allocate enough space for the workers and the main thread.
            this._sharedUsageBuffer = new SharedArrayBuffer(8 * (this._maxWorkers + 1));
        }
        return this._sharedUsageBuffer;
    } catch {
        // SharedArrayBuffer is not supported.
        return undefined;
    }
}
// Computes process-wide heap usage. When the shared buffer is available,
// each thread publishes its own used_heap_size into its slot and the sum
// of all slots is returned; otherwise only this thread's usage is known.
private _getTotalHeapUsage(heapStats: HeapInfo): number {
    const buffer = this._getSharedUsageBuffer();
    if (!buffer) {
        return heapStats.used_heap_size;
    }

    // Record our own usage first, then total up every thread's slot.
    const view = new Float64Array(buffer);
    view[this._sharedUsagePosition] = heapStats.used_heap_size;
    let total = 0;
    for (const slotValue of view) {
        total += slotValue;
    }
    return total;
}
}
export namespace CacheManager {

View File

@ -997,14 +997,16 @@ export class Program {
private _handleMemoryHighUsage() {
const cacheUsage = this._cacheManager.getCacheUsage();
const usedHeapRatio = this._cacheManager.getUsedHeapRatio(
this._configOptions.verboseOutput ? this._console : undefined
);
// If the total cache has exceeded 75%, determine whether we should empty
// the cache.
if (cacheUsage > 0.75) {
const usedHeapRatio = this._cacheManager.getUsedHeapRatio(
this._configOptions.verboseOutput ? this._console : undefined
);
// the cache. If the usedHeapRatio has exceeded 90%, we should definitely
// empty the cache. This can happen before the cacheUsage maxes out because
// we might be on the background thread and a bunch of the cacheUsage is on the main
// thread.
if (cacheUsage > 0.75 || usedHeapRatio > 0.9) {
// The type cache uses a Map, which has an absolute limit of 2^24 entries
// before it will fail. If we cross the 95% mark, we'll empty the cache.
const absoluteMaxCacheEntryCount = (1 << 24) * 0.9;

View File

@ -19,6 +19,8 @@ import { ServiceProvider } from './common/serviceProvider';
import { getRootUri } from './common/uri/uriUtils';
export class BackgroundAnalysis extends BackgroundAnalysisBase {
private static _workerIndex = 0;
constructor(serviceProvider: ServiceProvider) {
super(serviceProvider.console());
@ -26,11 +28,15 @@ export class BackgroundAnalysis extends BackgroundAnalysisBase {
rootUri: getRootUri(serviceProvider)?.toString() ?? '',
cancellationFolderName: getCancellationFolderName(),
runner: undefined,
workerIndex: ++BackgroundAnalysis._workerIndex,
};
// this will load this same file in BG thread and start listener
const worker = new Worker(__filename, { workerData: initialData });
this.setup(worker);
// Tell the cacheManager we have a worker that needs to share data.
serviceProvider.cacheManager()?.addWorker(initialData.workerIndex, worker);
}
}

View File

@ -326,6 +326,10 @@ export abstract class BackgroundAnalysisRunnerBase extends BackgroundThreadBase
protected onMessage(msg: AnalysisRequest) {
switch (msg.requestType) {
case 'cacheUsageBuffer': {
this.serviceProvider.cacheManager()?.handleCachedUsageBufferMessage(msg);
break;
}
case 'analyze': {
const port = msg.port!;
const data = deserialize(msg.data);
@ -740,12 +744,14 @@ export type AnalysisRequestKind =
| 'setImportResolver'
| 'shutdown'
| 'addInterimFile'
| 'analyzeFile';
| 'analyzeFile'
| 'cacheUsageBuffer';
// Message sent from the main thread to a background analysis worker.
export interface AnalysisRequest {
// Discriminator selecting the operation (dispatched on in onMessage).
requestType: AnalysisRequestKind;
// Serialized request payload; for 'cacheUsageBuffer' it carries the
// worker's slot index as a string.
data: string | null;
// Optional channel for responses — presumably used by requests such as
// 'analyze'; confirm against each request's sender.
port?: MessagePort | undefined;
// Present only for 'cacheUsageBuffer': buffer shared across threads for
// heap-usage accounting.
sharedUsageBuffer?: SharedArrayBuffer;
}
export type AnalysisResponseKind = 'log' | 'analysisResult' | 'analysisPaused' | 'analysisDone';

View File

@ -8,6 +8,7 @@
import { MessagePort, parentPort, TransferListItem } from 'worker_threads';
import { CacheManager } from './analyzer/cacheManager';
import { OperationCanceledException, setCancellationFolderName } from './common/cancellationUtils';
import { ConfigOptions } from './common/configOptions';
import { ConsoleInterface, LogLevel } from './common/console';
@ -15,9 +16,9 @@ import { isThenable } from './common/core';
import * as debug from './common/debug';
import { PythonVersion } from './common/pythonVersion';
import { createFromRealFileSystem, RealTempFile } from './common/realFileSystem';
import { ServiceKeys } from './common/serviceKeys';
import { ServiceProvider } from './common/serviceProvider';
import './common/serviceProviderExtensions';
import { ServiceKeys } from './common/serviceKeys';
import { Uri } from './common/uri/uri';
export class BackgroundConsole implements ConsoleInterface {
@ -75,6 +76,9 @@ export class BackgroundThreadBase {
)
);
}
if (!this._serviceProvider.tryGet(ServiceKeys.cacheManager)) {
this._serviceProvider.add(ServiceKeys.cacheManager, new CacheManager());
}
// Stash the base directory into a global variable.
(global as any).__rootDirectory = Uri.parse(data.rootUri, this._serviceProvider).getFilePath();
@ -242,6 +246,7 @@ export interface InitializationData {
cancellationFolderName: string | undefined;
runner: string | undefined;
title?: string;
workerIndex: number;
}
export interface RequestResponse {

View File

@ -25,6 +25,7 @@ declare module './serviceProvider' {
tmp(): TempFile | undefined;
sourceFileFactory(): ISourceFileFactory;
partialStubs(): SupportPartialStubs;
cacheManager(): CacheManager | undefined;
}
}
@ -75,6 +76,11 @@ ServiceProvider.prototype.sourceFileFactory = function () {
return result || DefaultSourceFileFactory;
};
// Accessor for the optional CacheManager service; undefined when no
// cache manager was registered with this provider.
ServiceProvider.prototype.cacheManager = function () {
    return this.tryGet(ServiceKeys.cacheManager);
};
const DefaultSourceFileFactory: ISourceFileFactory = {
createSourceFile(
serviceProvider: ReadOnlyServiceProvider,

View File

@ -11,9 +11,9 @@ import { ServiceProvider } from './common/serviceProvider';
import { run } from './nodeServer';
import { PyrightServer } from './server';
export function main() {
export function main(maxWorkers: number) {
run(
(conn) => new PyrightServer(conn),
(conn) => new PyrightServer(conn, maxWorkers),
() => {
const runner = new BackgroundAnalysisRunner(new ServiceProvider());
runner.start();

View File

@ -31,24 +31,24 @@ import { FileBasedCancellationProvider } from './common/fileBasedCancellationUti
import { FileSystem } from './common/fileSystem';
import { FullAccessHost } from './common/fullAccessHost';
import { Host } from './common/host';
import { ServerSettings } from './common/languageServerInterface';
import { ProgressReporter } from './common/progressReporter';
import { RealTempFile, WorkspaceFileWatcherProvider, createFromRealFileSystem } from './common/realFileSystem';
import { ServiceProvider } from './common/serviceProvider';
import { createServiceProvider } from './common/serviceProviderExtensions';
import { Uri } from './common/uri/uri';
import { getRootUri } from './common/uri/uriUtils';
import { ServerSettings } from './common/languageServerInterface';
import { LanguageServerBase } from './languageServerBase';
import { CodeActionProvider } from './languageService/codeActionProvider';
import { PyrightFileSystem } from './pyrightFileSystem';
import { WellKnownWorkspaceKinds, Workspace } from './workspaceFactory';
import { LanguageServerBase } from './languageServerBase';
const maxAnalysisTimeInForeground = { openFilesTimeInMs: 50, noOpenFilesTimeInMs: 200 };
export class PyrightServer extends LanguageServerBase {
private _controller: CommandController;
constructor(connection: Connection, realFileSystem?: FileSystem) {
constructor(connection: Connection, maxWorkers: number, realFileSystem?: FileSystem) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const version = require('../package.json').version || '';
@ -57,7 +57,7 @@ export class PyrightServer extends LanguageServerBase {
const fileWatcherProvider = new WorkspaceFileWatcherProvider();
const fileSystem = realFileSystem ?? createFromRealFileSystem(tempFile, console, fileWatcherProvider);
const pyrightFs = new PyrightFileSystem(fileSystem);
const cacheManager = new CacheManager();
const cacheManager = new CacheManager(maxWorkers);
const serviceProvider = createServiceProvider(pyrightFs, tempFile, console, cacheManager);

View File

@ -8,6 +8,7 @@
import assert from 'assert';
import { Worker } from 'worker_threads';
import { CacheManager, CacheOwner } from '../analyzer/cacheManager';
test('basic', () => {
@ -69,6 +70,44 @@ test('multiple owners', () => {
assert.strictEqual(manager.getCacheUsage(), 0);
});
test('Shared memory', async () => {
    const manager = new CacheManager(/* maxWorkers */ 1);
    // Without the .js output from Jest, we need to generate a non module worker. Use a string
    // to do so. This means the worker can't use the CacheManager, but it just needs to
    // listen for the sharedArrayBuffer message.
    const workerSource = `
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg.requestType === 'cacheUsageBuffer') {
const buffer = msg.sharedUsageBuffer;
const view = new Float64Array(buffer);
view[1] = 50 * 1024 * 1024 * 1024; // Make this super huge, 50GB to make sure usage is over 100%
parentPort.postMessage('done');
}
});
`;
    const worker = new Worker(workerSource, { eval: true });
    manager.addWorker(1, worker);
    // Wait for the worker to post a message back to us. Route worker
    // errors into the promise rejection so Jest reports them as a test
    // failure instead of an unhandled 'error' event (throwing from an
    // EventEmitter listener would not fail the awaited promise).
    await new Promise<void>((resolve, reject) => {
        worker.on('error', reject);
        worker.on('message', (msg: string) => {
            if (msg === 'done') {
                resolve();
            }
        });
    });
    // Get the heap usage and verify it's more than 100%.
    const usage = manager.getUsedHeapRatio();
    // terminate() is async; await it so the worker is really gone before
    // the test ends.
    await worker.terminate();
    assert(usage > 1);
});
class MockCacheOwner implements CacheOwner {
constructor(private _used: number) {
// empty

View File

@ -137,7 +137,7 @@ class TestServer extends PyrightServer {
fs: FileSystem,
private readonly _supportsBackgroundAnalysis: boolean | undefined
) {
super(connection, fs);
super(connection, _supportsBackgroundAnalysis ? 1 : 0, fs);
}
test_onDidChangeWatchedFiles(params: any) {

View File

@ -1,3 +1,4 @@
import { main } from 'pyright-internal/nodeMain';
main();
// Command line version doesn't use any worker threads.
main(/* maxWorkers */ 0);

View File

@ -2,4 +2,5 @@ import { main } from 'pyright-internal/nodeMain';
Error.stackTraceLimit = 256;
main();
// VS Code version of the server has one background thread.
main(/* maxWorkers */ 1);