Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions packages/core/src/code_assist/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ import { OAuth2Client } from 'google-auth-library';
import { UserTierId, ActionStatus } from './types.js';
import { FinishReason } from '@google/genai';
import { LlmRole } from '../telemetry/types.js';
import { logInvalidChunk } from '../telemetry/loggers.js';
import { makeFakeConfig } from '../test-utils/config.js';

vi.mock('google-auth-library');
vi.mock('../telemetry/loggers.js', () => ({
logBillingEvent: vi.fn(),
logInvalidChunk: vi.fn(),
}));

function createTestServer(headers: Record<string, string> = {}) {
const mockRequest = vi.fn();
Expand Down Expand Up @@ -671,4 +677,233 @@ describe('CodeAssistServer', () => {
expect(requestPostSpy).toHaveBeenCalledWith('retrieveUserQuota', req);
expect(response).toEqual(mockResponse);
});

describe('robustness testing', () => {
it('should not crash on random error objects in loadCodeAssist (isVpcScAffectedUser)', async () => {
const { server } = createTestServer();
const errors = [
null,
undefined,
'string error',
123,
{ some: 'object' },
new Error('standard error'),
{ response: {} },
{ response: { data: {} } },
];

for (const err of errors) {
vi.spyOn(server, 'requestPost').mockRejectedValueOnce(err);
try {
await server.loadCodeAssist({ metadata: {} });
} catch (e) {
expect(e).toBe(err);
}
}
});

it('should handle randomly fragmented SSE streams gracefully', async () => {
const { server, mockRequest } = createTestServer();
const { Readable } = await import('node:stream');

const fragmentedCases = [
['d', 'ata: {"foo":', ' "bar"}\n\n'],
['data: {"foo": "bar"}\n', '\n'],
['data: ', '{"foo": "bar"}', '\n\n'],
['data: {"foo": "bar"}\n\n', 'data: {"baz": 1}\n\n'],
];

for (const chunks of fragmentedCases) {
const mockStream = new Readable({
read() {
for (const chunk of chunks) {
this.push(chunk);
}
this.push(null);
},
});
mockRequest.mockResolvedValueOnce({ data: mockStream });

const stream = await server.requestStreamingPost('testStream', {});
const results = [];
try {
for await (const res of stream) {
results.push(res);
}
} catch (_e) {
// Fragmented JSON might cause SyntaxError, which is fine as long as it doesn't hang
}
}
});

it('should correctly parse valid JSON split across multiple data lines', async () => {
const { server, mockRequest } = createTestServer();
const { Readable } = await import('node:stream');
const jsonObj = {
complex: { structure: [1, 2, 3] },
bool: true,
str: 'value',
};
const jsonString = JSON.stringify(jsonObj, null, 2);
const lines = jsonString.split('\n');
const ssePayload = lines.map((line) => `data: ${line}\n`).join('') + '\n';

const mockStream = new Readable({
read() {
this.push(ssePayload);
this.push(null);
},
});
mockRequest.mockResolvedValueOnce({ data: mockStream });

const stream = await server.requestStreamingPost('testStream', {});
const results = [];
for await (const res of stream) {
results.push(res);
}
expect(results).toHaveLength(1);
expect(results[0]).toEqual(jsonObj);
});

it('should not crash on objects partially matching VPC SC error structure', async () => {
const { server } = createTestServer();
const partialErrors = [
{ response: { data: { error: { details: [{ reason: 'OTHER' }] } } } },
{ response: { data: { error: { details: [] } } } },
{ response: { data: { error: {} } } },
{ response: { data: {} } },
];

for (const err of partialErrors) {
vi.spyOn(server, 'requestPost').mockRejectedValueOnce(err);
try {
await server.loadCodeAssist({ metadata: {} });
} catch (e) {
expect(e).toBe(err);
}
}
});

it('should correctly ignore arbitrary SSE comments and ID lines and empty lines before data', async () => {
const { server, mockRequest } = createTestServer();
const { Readable } = await import('node:stream');
const jsonObj = { foo: 'bar' };
const jsonString = JSON.stringify(jsonObj);

const ssePayload = `id: 123
:comment
retry: 100

data: ${jsonString}

`;

const mockStream = new Readable({
read() {
this.push(ssePayload);
this.push(null);
},
});
mockRequest.mockResolvedValueOnce({ data: mockStream });

const stream = await server.requestStreamingPost('testStream', {});
const results = [];
for await (const res of stream) {
results.push(res);
}
expect(results).toHaveLength(1);
expect(results[0]).toEqual(jsonObj);
});

it('should log InvalidChunkEvent when SSE chunk is not valid JSON', async () => {
const config = makeFakeConfig();
const mockRequest = vi.fn();
const client = { request: mockRequest } as unknown as OAuth2Client;
const server = new CodeAssistServer(
client,
'test-project',
{},
'test-session',
UserTierId.FREE,
undefined,
undefined,
config,
);

const { Readable } = await import('node:stream');
const mockStream = new Readable({
read() {},
});

mockRequest.mockResolvedValue({ data: mockStream });

const stream = await server.requestStreamingPost('testStream', {});

setTimeout(() => {
mockStream.push('data: { "invalid": json }\n\n');
mockStream.push(null);
}, 0);

const results = [];
for await (const res of stream) {
results.push(res);
}

expect(results).toHaveLength(0);
expect(logInvalidChunk).toHaveBeenCalledWith(
config,
expect.objectContaining({
error_message: expect.stringContaining('Unexpected token'),
}),
);
});

it('should safely process random response streams in generateContentStream (consumed/remaining credits)', async () => {
const { mockRequest, client } = createTestServer();
const testServer = new CodeAssistServer(
client,
'test-project',
{},
'test-session',
UserTierId.FREE,
undefined,
{ id: 'test-tier', name: 'tier', availableCredits: [] },
);
const { Readable } = await import('node:stream');

const streamResponses = [
{
traceId: '1',
consumedCredits: [{ creditType: 'A', creditAmount: '10' }],
},
{ traceId: '2', remainingCredits: [{ creditType: 'B' }] },
{ traceId: '3' },
{ traceId: '4', consumedCredits: null, remainingCredits: undefined },
];

const mockStream = new Readable({
read() {
for (const resp of streamResponses) {
this.push(`data: ${JSON.stringify(resp)}\n\n`);
}
this.push(null);
},
});
mockRequest.mockResolvedValueOnce({ data: mockStream });
vi.spyOn(testServer, 'recordCodeAssistMetrics').mockResolvedValue(
undefined,
);

const stream = await testServer.generateContentStream(
{ model: 'test-model', contents: [] },
'user-prompt-id',
LlmRole.MAIN,
);

for await (const _ of stream) {
// Drain stream
}
// Should not crash
});
});
});
21 changes: 16 additions & 5 deletions packages/core/src/code_assist/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
isOverageEligibleModel,
shouldAutoUseCredits,
} from '../billing/billing.js';
import { logBillingEvent } from '../telemetry/loggers.js';
import { logBillingEvent, logInvalidChunk } from '../telemetry/loggers.js';
import { CreditsUsedEvent } from '../telemetry/billingEvents.js';
import {
fromCountTokenResponse,
Expand All @@ -62,7 +62,8 @@ import {
recordConversationOffered,
} from './telemetry.js';
import { getClientMetadata } from './experiments/client_metadata.js';
import type { LlmRole } from '../telemetry/types.js';
import { InvalidChunkEvent, type LlmRole } from '../telemetry/types.js';
import { getErrorMessage } from '../utils/errors.js';
/** HTTP options to be used in each of the requests. */
export interface HttpOptions {
/** Additional HTTP headers to be sent with the request. */
Expand Down Expand Up @@ -466,7 +467,7 @@ export class CodeAssistServer implements ContentGenerator {
retry: false,
});

return (async function* (): AsyncGenerator<T> {
return (async function* (server: CodeAssistServer): AsyncGenerator<T> {
const rl = readline.createInterface({
input: Readable.from(res.data),
crlfDelay: Infinity, // Recognizes '\r\n' and '\n' as line breaks
Expand All @@ -480,12 +481,22 @@ export class CodeAssistServer implements ContentGenerator {
if (bufferedLines.length === 0) {
continue; // no data to yield
}
yield JSON.parse(bufferedLines.join('\n'));
const chunk = bufferedLines.join('\n');
try {
yield JSON.parse(chunk);
} catch (e) {
if (server.config) {
logInvalidChunk(
server.config,
new InvalidChunkEvent(getErrorMessage(e)),
);
}
}
bufferedLines = []; // Reset the buffer after yielding
}
// Ignore other lines like comments or id fields
}
})();
})(this);
}

private getBaseUrl(): string {
Expand Down
36 changes: 36 additions & 0 deletions packages/core/src/telemetry/loggers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
logFlashFallback,
logChatCompression,
logMalformedJsonResponse,
logInvalidChunk,
logFileOperation,
logRipgrepFallback,
logToolOutputTruncated,
Expand Down Expand Up @@ -68,6 +69,7 @@ import {
EVENT_AGENT_START,
EVENT_AGENT_FINISH,
EVENT_WEB_FETCH_FALLBACK_ATTEMPT,
EVENT_INVALID_CHUNK,
ApiErrorEvent,
ApiRequestEvent,
ApiResponseEvent,
Expand All @@ -77,6 +79,7 @@ import {
FlashFallbackEvent,
RipgrepFallbackEvent,
MalformedJsonResponseEvent,
InvalidChunkEvent,
makeChatCompressionEvent,
FileOperationEvent,
ToolOutputTruncatedEvent,
Expand Down Expand Up @@ -1736,6 +1739,39 @@ describe('loggers', () => {
});
});

describe('logInvalidChunk', () => {
beforeEach(() => {
vi.spyOn(ClearcutLogger.prototype, 'logInvalidChunkEvent');
vi.spyOn(metrics, 'recordInvalidChunk');
});

it('logs the event to Clearcut and OTEL', () => {
const mockConfig = makeFakeConfig();
const event = new InvalidChunkEvent('Unexpected token');

logInvalidChunk(mockConfig, event);

expect(
ClearcutLogger.prototype.logInvalidChunkEvent,
).toHaveBeenCalledWith(event);

expect(mockLogger.emit).toHaveBeenCalledWith({
body: 'Invalid chunk received from stream.',
attributes: {
'session.id': 'test-session-id',
'user.email': '[email protected]',
'installation.id': 'test-installation-id',
'event.name': EVENT_INVALID_CHUNK,
'event.timestamp': '2025-01-01T00:00:00.000Z',
interactive: false,
'error.message': 'Unexpected token',
},
});

expect(metrics.recordInvalidChunk).toHaveBeenCalledWith(mockConfig);
});
});

describe('logFileOperation', () => {
const mockConfig = {
getSessionId: () => 'test-session-id',
Expand Down
Loading
Loading