Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/bright-cameras-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

test: stabilize streamed run result leak tests
26 changes: 26 additions & 0 deletions packages/agents-core/test/manual/gcRuntime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import v8 from 'node:v8';
import vm from 'node:vm';

let cachedGc: (() => void) | undefined;

export function getExposedGc(): () => void {
if (cachedGc) {
return cachedGc;
}

const maybeGc = (globalThis as { gc?: () => void }).gc;
if (typeof maybeGc === 'function') {
cachedGc = maybeGc;
return maybeGc;
}

// Test workers are not always launched with --expose-gc, so enable it lazily.
v8.setFlagsFromString('--expose_gc');
const exposedGc = vm.runInNewContext('gc');
if (typeof exposedGc !== 'function') {
throw new Error('global.gc is not available. Run with --expose-gc.');
}

cachedGc = exposedGc;
return exposedGc;
}
234 changes: 164 additions & 70 deletions packages/agents-core/test/manual/streamedRunResultLeakCheck.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Agent } from '../../src/agent';
import { StreamedRunResult } from '../../src/result';
import { RunContext } from '../../src/runContext';
import { RunState } from '../../src/runState';
import { getEventListeners } from 'node:events';
import { getExposedGc } from './gcRuntime';

/**
* Manual GC-based leak regression check for StreamedRunResult abort listeners.
Expand Down Expand Up @@ -35,14 +38,6 @@ type FinalizationRegistryConstructor = new <T>(
cleanup: (heldValue: T) => void,
) => FinalizationRegistryLike<T>;

const weakRefConstructor = (globalThis as { WeakRef?: WeakRefConstructor })
.WeakRef;
const finalizationRegistryConstructor = (
globalThis as {
FinalizationRegistry?: FinalizationRegistryConstructor;
}
).FinalizationRegistry;

type ScenarioRefs = {
agentRef: WeakRefLike<Agent<any, any>>;
stateRef: WeakRefLike<RunState<any, Agent<any, any>>>;
Expand All @@ -54,35 +49,52 @@ type ScenarioRefs = {
};
};

const maybeGc = (globalThis as { gc?: () => void }).gc;
if (typeof maybeGc !== 'function') {
console.error('global.gc is not available. Run with --expose-gc.');
process.exit(2);
}
const gc: () => void = maybeGc;

if (
typeof weakRefConstructor !== 'function' ||
typeof finalizationRegistryConstructor !== 'function'
) {
console.error(
'WeakRef/FinalizationRegistry are not available in this runtime.',
);
process.exit(2);
export type StreamedRunResultLeakCheckOptions = {
retainSignals?: boolean;
debug?: boolean;
collectionAttempts?: number;
pressureSize?: number;
forceRemoveFail?: boolean;
};

export type StreamedRunResultLeakCheckResult = {
doneCollected: boolean;
errorCollected: boolean;
};

type LeakCheckRuntime = {
gc: () => void;
weakRefCtor: WeakRefConstructor;
finalizationRegistryCtor: FinalizationRegistryConstructor;
};

function getLeakCheckRuntime(): LeakCheckRuntime {
const weakRefConstructor = (globalThis as { WeakRef?: WeakRefConstructor })
.WeakRef;
const finalizationRegistryConstructor = (
globalThis as {
FinalizationRegistry?: FinalizationRegistryConstructor;
}
).FinalizationRegistry;
if (
typeof weakRefConstructor !== 'function' ||
typeof finalizationRegistryConstructor !== 'function'
) {
throw new Error(
'WeakRef/FinalizationRegistry are not available in this runtime.',
);
}

return {
gc: getExposedGc(),
weakRefCtor: weakRefConstructor,
finalizationRegistryCtor: finalizationRegistryConstructor,
};
}
const weakRefCtor: WeakRefConstructor = weakRefConstructor;
const finalizationRegistryCtor: FinalizationRegistryConstructor =
finalizationRegistryConstructor;

// Retain abort signals to mimic Node's persistent abort-signal bookkeeping.
const retainedSignals: AbortSignal[] = [];
const retainSignals = process.env.LEAK_CHECK_RETAIN_SIGNAL !== '0';
const debug = process.env.LEAK_CHECK_DEBUG === '1';
const collectionAttempts = Number(process.env.LEAK_CHECK_ATTEMPTS ?? 120);
const pressureSize = Number(process.env.LEAK_CHECK_PRESSURE_SIZE ?? 80_000);
const forceRemoveFail = process.env.LEAK_CHECK_FORCE_REMOVE_FAIL === '1';

function patchAbortSignalRemovalFailure(): (() => void) | undefined {

function patchAbortSignalRemovalFailure(
forceRemoveFail: boolean,
): (() => void) | undefined {
if (!forceRemoveFail) {
return undefined;
}
Expand All @@ -102,15 +114,20 @@ function patchAbortSignalRemovalFailure(): (() => void) | undefined {
};
}

const finalizedTokens = new Set<string>();
// Use finalization tokens because WeakRef alone is not a stable assertion surface.
const registry = new finalizationRegistryCtor<string>((token) => {
finalizedTokens.add(token);
});

const waitTick = () => new Promise((resolve) => setTimeout(resolve, 0));

async function runScenario(kind: 'done' | 'error'): Promise<ScenarioRefs> {
async function runScenario(
kind: 'done' | 'error',
options: {
debug: boolean;
registry: FinalizationRegistryLike<string>;
retainSignals: boolean;
retainedSignals: AbortSignal[];
weakRefCtor: WeakRefConstructor;
},
): Promise<ScenarioRefs> {
const { debug, registry, retainSignals, retainedSignals, weakRefCtor } =
options;
// Register three tokens so we can detect that all relevant objects are collected.
const tokens = {
agent: `${kind}:agent`,
Expand Down Expand Up @@ -166,7 +183,18 @@ async function runScenario(kind: 'done' | 'error'): Promise<ScenarioRefs> {
return { agentRef, stateRef, resultRef, tokens };
}

async function forceCollection(refs: ScenarioRefs): Promise<boolean> {
async function forceCollection(
refs: ScenarioRefs,
options: {
collectionAttempts: number;
pressureSize: number;
debug: boolean;
gc: () => void;
finalizedTokens: Set<string>;
},
): Promise<boolean> {
const { collectionAttempts, pressureSize, debug, gc, finalizedTokens } =
options;
for (let i = 0; i < collectionAttempts; i += 1) {
gc();
// Apply memory pressure to encourage full collections in practice.
Expand All @@ -193,37 +221,103 @@ async function forceCollection(refs: ScenarioRefs): Promise<boolean> {
return weakCollected || finalized;
}

async function main() {
const restoreRemoveEventListener = patchAbortSignalRemovalFailure();
export async function runStreamedRunResultLeakCheck(
options: StreamedRunResultLeakCheckOptions = {},
): Promise<StreamedRunResultLeakCheckResult> {
const runtime = getLeakCheckRuntime();
const retainSignals =
options.retainSignals ?? process.env.LEAK_CHECK_RETAIN_SIGNAL !== '0';
const debug = options.debug ?? process.env.LEAK_CHECK_DEBUG === '1';
const collectionAttempts = Number(
options.collectionAttempts ?? process.env.LEAK_CHECK_ATTEMPTS ?? 120,
);
const pressureSize = Number(
options.pressureSize ?? process.env.LEAK_CHECK_PRESSURE_SIZE ?? 80_000,
);
const forceRemoveFail =
options.forceRemoveFail ?? process.env.LEAK_CHECK_FORCE_REMOVE_FAIL === '1';
const retainedSignals: AbortSignal[] = [];
const finalizedTokens = new Set<string>();
// Use finalization tokens because WeakRef alone is not a stable assertion surface.
const registry = new runtime.finalizationRegistryCtor<string>((token) => {
finalizedTokens.add(token);
});
const restoreRemoveEventListener =
patchAbortSignalRemovalFailure(forceRemoveFail);
try {
const doneRefs = await runScenario('done');
const errorRefs = await runScenario('error');
const doneRefs = await runScenario('done', {
debug,
registry,
retainSignals,
retainedSignals,
weakRefCtor: runtime.weakRefCtor,
});
const errorRefs = await runScenario('error', {
debug,
registry,
retainSignals,
retainedSignals,
weakRefCtor: runtime.weakRefCtor,
});

const doneCollected = await forceCollection(doneRefs);
const errorCollected = await forceCollection(errorRefs);
const doneCollected = await forceCollection(doneRefs, {
collectionAttempts,
pressureSize,
debug,
gc: runtime.gc,
finalizedTokens,
});
const errorCollected = await forceCollection(errorRefs, {
collectionAttempts,
pressureSize,
debug,
gc: runtime.gc,
finalizedTokens,
});

if (!doneCollected || !errorCollected) {
if (debug) {
console.error(
`done finalized: agent=${finalizedTokens.has(doneRefs.tokens.agent)} state=${finalizedTokens.has(doneRefs.tokens.state)} result=${finalizedTokens.has(doneRefs.tokens.result)}`,
);
console.error(
`error finalized: agent=${finalizedTokens.has(errorRefs.tokens.agent)} state=${finalizedTokens.has(errorRefs.tokens.state)} result=${finalizedTokens.has(errorRefs.tokens.result)}`,
);
}
console.error(
`collection failed: done=${doneCollected} error=${errorCollected}`,
);
process.exit(1);
}

console.log('OK: streamed run state is collectable.');
return { doneCollected, errorCollected };
} finally {
restoreRemoveEventListener?.();
}
}

main().catch((err) => {
console.error(`unexpected error: ${err}`);
process.exit(1);
});
async function writeLine(stream: NodeJS.WriteStream, line: string) {
await new Promise<void>((resolve, reject) => {
stream.write(`${line}\n`, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}

async function main(): Promise<number> {
const result = await runStreamedRunResultLeakCheck();
if (!result.doneCollected || !result.errorCollected) {
await writeLine(
process.stderr,
`collection failed: done=${result.doneCollected} error=${result.errorCollected}`,
);
return 1;
}

await writeLine(process.stdout, 'OK: streamed run state is collectable.');
return 0;
}

const isMain =
process.argv[1] &&
resolve(process.argv[1]) === fileURLToPath(import.meta.url);

if (isMain) {
main()
.then((exitCode) => {
process.exitCode = exitCode;
})
.catch(async (err) => {
await writeLine(process.stderr, `unexpected error: ${err}`);
process.exit(1);
});
}
Loading