Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions packages/core/src/agents/a2a-client-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
type AuthenticationHandler,
type Client,
} from '@a2a-js/sdk/client';
import type { Config } from '../config/config.js';
import { Agent as UndiciAgent, ProxyAgent } from 'undici';
import { debugLogger } from '../utils/debugLogger.js';

vi.mock('../utils/debugLogger.js', () => ({
Expand Down Expand Up @@ -117,6 +119,51 @@ describe('A2AClientManager', () => {
expect(instance1).toBe(instance2);
});

describe('getInstance / dispatcher initialization', () => {
it('should use UndiciAgent when no proxy is configured', async () => {
await manager.loadAgent('TestAgent', 'http://test.agent/card');

const resolverOptions = vi.mocked(DefaultAgentCardResolver).mock
.calls[0][0];
const cardFetch = resolverOptions?.fetchImpl as typeof fetch;
await cardFetch('http://test.agent/card');

const fetchCall = vi
.mocked(fetch)
.mock.calls.find((call) => call[0] === 'http://test.agent/card');
expect(fetchCall).toBeDefined();
expect(
(fetchCall![1] as { dispatcher?: unknown })?.dispatcher,
).toBeInstanceOf(UndiciAgent);
expect(
(fetchCall![1] as { dispatcher?: unknown })?.dispatcher,
).not.toBeInstanceOf(ProxyAgent);
});

it('should use ProxyAgent when a proxy is configured via Config', async () => {
A2AClientManager.resetInstanceForTesting();
const mockConfig = {
getProxy: () => 'http://my-proxy:8080',
} as Config;

manager = A2AClientManager.getInstance(mockConfig);
await manager.loadAgent('TestProxyAgent', 'http://test.proxy.agent/card');

const resolverOptions = vi.mocked(DefaultAgentCardResolver).mock
.calls[0][0];
const cardFetch = resolverOptions?.fetchImpl as typeof fetch;
await cardFetch('http://test.proxy.agent/card');

const fetchCall = vi
.mocked(fetch)
.mock.calls.find((call) => call[0] === 'http://test.proxy.agent/card');
expect(fetchCall).toBeDefined();
expect(
(fetchCall![1] as { dispatcher?: unknown })?.dispatcher,
).toBeInstanceOf(ProxyAgent);
});
});

describe('loadAgent', () => {
it('should create and cache an A2AClient', async () => {
const agentCard = await manager.loadAgent(
Expand Down
53 changes: 36 additions & 17 deletions packages/core/src/agents/a2a-client-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,15 @@ import {
createAuthenticatingFetchWithRetry,
} from '@a2a-js/sdk/client';
import { v4 as uuidv4 } from 'uuid';
import { Agent as UndiciAgent } from 'undici';
import { Agent as UndiciAgent, ProxyAgent } from 'undici';
import type { Config } from '../config/config.js';
import { debugLogger } from '../utils/debugLogger.js';
import { safeLookup } from '../utils/fetch.js';
import { classifyAgentError } from './a2a-errors.js';

// Remote agents can take 10+ minutes (e.g. Deep Research).
// Use a dedicated dispatcher so the global 5-min timeout isn't affected.
const A2A_TIMEOUT = 1800000; // 30 minutes
const a2aDispatcher = new UndiciAgent({
headersTimeout: A2A_TIMEOUT,
bodyTimeout: A2A_TIMEOUT,
connect: {
lookup: safeLookup, // SSRF protection at connection level
},
});
const a2aFetch: typeof fetch = (input, init) =>
// eslint-disable-next-line no-restricted-syntax -- TODO: Migrate to safeFetch for SSRF protection
fetch(input, { ...init, dispatcher: a2aDispatcher } as RequestInit);

export type SendMessageResult =
| Message
Expand All @@ -59,14 +50,39 @@ export class A2AClientManager {
private clients = new Map<string, Client>();
private agentCards = new Map<string, AgentCard>();

private constructor() {}
private a2aDispatcher: UndiciAgent | ProxyAgent;
private a2aFetch: typeof fetch;

private constructor(config?: Config) {
const proxyUrl = config?.getProxy();
const agentOptions = {
headersTimeout: A2A_TIMEOUT,
bodyTimeout: A2A_TIMEOUT,
connect: {
lookup: safeLookup, // SSRF protection at connection level
},
};

if (proxyUrl) {
this.a2aDispatcher = new ProxyAgent({
uri: proxyUrl,
...agentOptions,
});
} else {
this.a2aDispatcher = new UndiciAgent(agentOptions);
}

this.a2aFetch = (input, init) =>
// eslint-disable-next-line no-restricted-syntax -- TODO: Migrate to safeFetch for SSRF protection
fetch(input, { ...init, dispatcher: this.a2aDispatcher } as RequestInit);
}

/**
* Gets the singleton instance of the A2AClientManager.
*/
static getInstance(): A2AClientManager {
static getInstance(config?: Config): A2AClientManager {
if (!A2AClientManager.instance) {
A2AClientManager.instance = new A2AClientManager();
A2AClientManager.instance = new A2AClientManager(config);
}
return A2AClientManager.instance;
}
Expand Down Expand Up @@ -97,9 +113,12 @@ export class A2AClientManager {
}

// Authenticated fetch for API calls (transports).
let authFetch: typeof fetch = a2aFetch;
let authFetch: typeof fetch = this.a2aFetch;
if (authHandler) {
authFetch = createAuthenticatingFetchWithRetry(a2aFetch, authHandler);
authFetch = createAuthenticatingFetchWithRetry(
this.a2aFetch,
authHandler,
);
}

// Use unauthenticated fetch for the agent card unless explicitly required.
Expand All @@ -109,7 +128,7 @@ export class A2AClientManager {
init?: RequestInit,
): Promise<Response> => {
// Try without auth first
const response = await a2aFetch(input, init);
const response = await this.a2aFetch(input, init);

// Retry with auth if we hit a 401/403
if ((response.status === 401 || response.status === 403) && authFetch) {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/agents/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class AgentRegistry {
* Clears the current registry and re-scans for agents.
*/
async reload(): Promise<void> {
A2AClientManager.getInstance().clearCache();
A2AClientManager.getInstance(this.config).clearCache();
await this.config.reloadAgents();
this.agents.clear();
this.allDefinitions.clear();
Expand Down Expand Up @@ -414,7 +414,7 @@ export class AgentRegistry {

// Load the remote A2A agent card and register.
try {
const clientManager = A2AClientManager.getInstance();
const clientManager = A2AClientManager.getInstance(this.config);
let authHandler: AuthenticationHandler | undefined;
if (definition.auth) {
const provider = await A2AAuthProviderFactory.create({
Expand Down
Loading