Skip to content

Commit cdffc3e

Browse files
authored
fix: Actorized MCP servers have 30 seconds timeout to connect (#272)
* fix: Actorized mcp server have 5 seconds timeout to connect * feat: get mcp server tools in parallel * fix: default stramable and backup SSE transport * fix: improve based on code review feedback * fix: types after master merge
1 parent 3e10360 commit cdffc3e

File tree

6 files changed

+103
-27
lines changed

6 files changed

+103
-27
lines changed

src/errors.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export class TimeoutError extends Error {
2+
override readonly name = 'TimeoutError';
3+
}

src/mcp/client.ts

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/
44

55
import log from '@apify/log';
66

7+
import { TimeoutError } from '../errors.js';
8+
import { ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC } from './const.js';
79
import { getMCPServerID } from './utils.js';
810

911
/**
@@ -12,16 +14,55 @@ import { getMCPServerID } from './utils.js';
1214
*/
1315
export async function connectMCPClient(
1416
url: string, token: string,
15-
): Promise<Client> {
17+
): Promise<Client | null> {
18+
let client: Client;
1619
try {
17-
return await createMCPStreamableClient(url, token);
18-
} catch {
20+
client = await createMCPStreamableClient(url, token);
21+
return client;
22+
} catch (error) {
23+
// If streamable HTTP transport fails on not timeout error, continue with SSE transport
24+
if (error instanceof TimeoutError) {
25+
log.warning('Connection to MCP server using streamable HTTP transport timed out', { url });
26+
return null;
27+
}
28+
1929
// If streamable HTTP transport fails, fall back to SSE transport
2030
log.debug('Streamable HTTP transport failed, falling back to SSE transport', {
2131
url,
2232
});
23-
return await createMCPSSEClient(url, token);
2433
}
34+
35+
try {
36+
client = await createMCPSSEClient(url, token);
37+
return client;
38+
} catch (error) {
39+
if (error instanceof TimeoutError) {
40+
log.warning('Connection to MCP server using SSE transport timed out', { url });
41+
return null;
42+
}
43+
44+
log.error('Failed to connect to MCP server using SSE transport', { cause: error });
45+
throw error;
46+
}
47+
}
48+
49+
async function withTimeout<T>(millis: number, promise: Promise<T>): Promise<T> {
50+
let timeoutPid: NodeJS.Timeout;
51+
const timeout = new Promise<never>((_resolve, reject) => {
52+
timeoutPid = setTimeout(
53+
() => reject(new TimeoutError(`Timed out after ${millis} ms.`)),
54+
millis,
55+
);
56+
});
57+
58+
return Promise.race([
59+
promise,
60+
timeout,
61+
]).finally(() => {
62+
if (timeoutPid) {
63+
clearTimeout(timeoutPid);
64+
}
65+
});
2566
}
2667

2768
/**
@@ -47,7 +88,7 @@ async function createMCPSSEClient(
4788
headers.set('authorization', `Bearer ${token}`);
4889
return fetch(input, { ...init, headers });
4990
},
50-
// We have to cast to "any" to use it, since it's non-standard
91+
// We have to cast to "any" to use it, since it's non-standard
5192
} as any, // eslint-disable-line @typescript-eslint/no-explicit-any
5293
});
5394

@@ -56,7 +97,7 @@ async function createMCPSSEClient(
5697
version: '1.0.0',
5798
});
5899

59-
await client.connect(transport);
100+
await withTimeout(ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC, client.connect(transport));
60101

61102
return client;
62103
}
@@ -82,7 +123,7 @@ async function createMCPStreamableClient(
82123
version: '1.0.0',
83124
});
84125

85-
await client.connect(transport);
126+
await withTimeout(ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC, client.connect(transport));
86127

87128
return client;
88129
}

src/mcp/const.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export const MAX_TOOL_NAME_LENGTH = 64;
22
export const SERVER_ID_LENGTH = 8;
33
export const EXTERNAL_TOOL_CALL_TIMEOUT_MSEC = 120_000; // 2 minutes
4+
export const ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC = 30_000; // 30 seconds
45

56
export const LOG_LEVEL_MAP: Record<string, number> = {
67
debug: 0,

src/mcp/server.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,19 @@ export class ActorsMcpServer {
551551

552552
if (tool.type === 'actor-mcp') {
553553
const serverTool = tool.tool as ActorMcpTool;
554-
let client: Client | undefined;
554+
let client: Client | null = null;
555555
try {
556556
client = await connectMCPClient(serverTool.serverUrl, apifyToken);
557+
if (!client) {
558+
const msg = `Failed to connect to MCP server ${serverTool.serverUrl}`;
559+
log.error(msg);
560+
await this.server.sendLoggingMessage({ level: 'error', data: msg });
561+
return {
562+
content: [
563+
{ type: 'text', text: msg },
564+
],
565+
};
566+
}
557567

558568
// Only set up notification handlers if progressToken is provided by the client
559569
if (progressToken) {

src/tools/actor.ts

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,12 @@ export async function getNormalActorsAsTools(
180180
Actor description: ${actorDefinitionPruned.description}
181181
Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`,
182182
inputSchema: actorDefinitionPruned.input
183-
// So Actor without input schema works - MCP client expects JSON schema valid output
184-
|| {
185-
type: 'object',
186-
properties: {},
187-
required: [],
188-
},
183+
// So Actor without input schema works - MCP client expects JSON schema valid output
184+
|| {
185+
type: 'object',
186+
properties: {},
187+
required: [],
188+
},
189189
// Additional props true to allow skyfire-pay-id
190190
ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }),
191191
memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes,
@@ -207,21 +207,22 @@ async function getMCPServersAsTools(
207207
/**
208208
* This is case for the Skyfire request without any Apify token, we do not support
209209
* standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors).
210-
*/
210+
*/
211211
if (apifyToken === null || apifyToken === undefined) {
212212
return [];
213213
}
214214

215-
const actorsMCPServerTools: ToolEntry[] = [];
216-
for (const actorInfo of actorsInfo) {
215+
// Process all actors in parallel
216+
const actorToolPromises = actorsInfo.map(async (actorInfo) => {
217217
const actorId = actorInfo.actorDefinitionPruned.id;
218218
if (!actorInfo.webServerMcpPath) {
219219
log.warning('Actor does not have a web server MCP path, skipping', {
220220
actorFullName: actorInfo.actorDefinitionPruned.actorFullName,
221221
actorId,
222222
});
223-
continue;
223+
return [];
224224
}
225+
225226
const mcpServerUrl = await getActorMCPServerURL(
226227
actorInfo.actorDefinitionPruned.id, // Real ID of the Actor
227228
actorInfo.webServerMcpPath,
@@ -232,17 +233,25 @@ async function getMCPServersAsTools(
232233
mcpServerUrl,
233234
});
234235

235-
let client: Client | undefined;
236+
let client: Client | null = null;
236237
try {
237238
client = await connectMCPClient(mcpServerUrl, apifyToken);
239+
if (!client) {
240+
// Skip this Actor, connectMCPClient will log the error
241+
return [];
242+
}
238243
const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl);
239-
actorsMCPServerTools.push(...serverTools);
244+
return serverTools;
240245
} finally {
241246
if (client) await client.close();
242247
}
243-
}
248+
});
244249

245-
return actorsMCPServerTools;
250+
// Wait for all actors to be processed in parallel
251+
const actorToolsArrays = await Promise.all(actorToolPromises);
252+
253+
// Flatten the arrays of tools
254+
return actorToolsArrays.flat();
246255
}
247256

248257
export async function getActorsAsTools(
@@ -382,10 +391,13 @@ The step parameter enforces this workflow - you cannot call an Actor without fir
382391
if (isActorMcpServer) {
383392
// MCP server: list tools
384393
const mcpServerUrl = mcpServerUrlOrFalse;
385-
let client: Client | undefined;
394+
let client: Client | null = null;
386395
// Nested try to ensure client is closed
387396
try {
388397
client = await connectMCPClient(mcpServerUrl, apifyToken);
398+
if (!client) {
399+
return buildMCPResponse([`Failed to connect to MCP server ${mcpServerUrl}`]);
400+
}
389401
const toolsResponse = await client.listTools();
390402

391403
const toolsInfo = toolsResponse.tools.map((tool) => `**${tool.name}**\n${tool.description || 'No description'}\nInput Schema: ${JSON.stringify(tool.inputSchema, null, 2)}`,
@@ -451,9 +463,12 @@ The step parameter enforces this workflow - you cannot call an Actor without fir
451463
}
452464

453465
const mcpServerUrl = mcpServerUrlOrFalse;
454-
let client: Client | undefined;
466+
let client: Client | null = null;
455467
try {
456468
client = await connectMCPClient(mcpServerUrl, apifyToken);
469+
if (!client) {
470+
return buildMCPResponse([`Failed to connect to MCP server ${mcpServerUrl}`]);
471+
}
457472

458473
const result = await client.callTool({
459474
name: mcpToolName,
@@ -495,7 +510,7 @@ The step parameter enforces this workflow - you cannot call an Actor without fir
495510
if (!callResult) {
496511
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
497512
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
498-
return { };
513+
return {};
499514
}
500515

501516
const content = buildActorResponseContent(actorName, callResult);

tests/integration/suite.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ export function createIntegrationTestsSuite(
398398
limit: 5,
399399
},
400400
});
401-
const content = result.content as {text: string}[];
401+
const content = result.content as { text: string }[];
402402
expect(content.some((item) => item.text.includes(ACTOR_PYTHON_EXAMPLE))).toBe(true);
403403
});
404404

@@ -415,7 +415,7 @@ export function createIntegrationTestsSuite(
415415
limit: 100,
416416
},
417417
});
418-
const content = result.content as {text: string}[];
418+
const content = result.content as { text: string }[];
419419
expect(content.length).toBe(1);
420420
const outputText = content[0].text;
421421

@@ -972,5 +972,11 @@ export function createIntegrationTestsSuite(
972972

973973
await client.close();
974974
});
975+
976+
it('should connect to MCP server and at least one tool is available', async () => {
977+
client = await createClientFn({ tools: [ACTOR_MCP_SERVER_ACTOR_NAME] });
978+
const tools = await client.listTools();
979+
expect(tools.tools.length).toBeGreaterThan(0);
980+
});
975981
});
976982
}

0 commit comments

Comments
 (0)