fix: Add exponential backoff for LLM rate limits and preserve failed emails for retry
Problem
When LLM API rate limits are hit during email processing, emails are permanently lost:
- Webhook fires and processes email batch
- LLM call hits rate limit (429/quota exceeded)
- Error is caught, logged, and user notified via banner
- **Critical bug:** `lastSyncedHistoryId` is still updated even though email processing failed
- Failed emails are never retried; they're skipped forever
- User clicking "I've fixed them" only clears the banner, doesn't reprocess emails
Root Cause
```ts
// apps/web/app/api/google/webhook/process-history.ts:202-226
for (const event of uniqueEvents) {
  try {
    await processHistoryItem(event, options, log);
  } catch (error) {
    captureException(error, ...);
    logger.error("Error processing history item", { error });
    // Error swallowed - continues to next email
  }
}

// ALWAYS updates, even if errors occurred:
const lastSyncedHistoryId = history[history.length - 1].id;
await updateLastSyncedHistoryId({ emailAccountId, lastSyncedHistoryId });
```
Expected Behavior
- LLM calls should retry with exponential backoff when rate limited
- `lastSyncedHistoryId` should only update for successfully processed emails
- Failed emails should be preserved for future retry
Suggested Implementation
1. Add `withLLMRetry` to `apps/web/utils/llms/retry.ts`
Using `p-retry` (already installed, v7.1.0; used in `gmail/retry.ts` and `outlook/retry.ts`):
```ts
import "server-only";
import pRetry from "p-retry";
import { createScopedLogger } from "@/utils/logger";
import { sleep } from "@/utils/sleep";

const logger = createScopedLogger("llms:retry");

// ... existing code ...

/**
 * Error class to signal LLM rate limit was hit.
 * Used to communicate rate limit state up the call stack.
 */
export class LLMRateLimitError extends Error {
  constructor(
    message: string,
    public readonly retryAfterMs: number,
    public readonly provider: string,
  ) {
    super(message);
    this.name = "LLMRateLimitError";
  }
}

interface LLMErrorInfo {
  retryable: boolean;
  isRateLimit: boolean;
  retryAfterMs?: number;
  provider?: string;
}

/**
 * Detects if an error is a retryable LLM rate limit error.
 * Supports OpenAI, Anthropic, Google, and OpenRouter error formats.
 */
export function extractLLMErrorInfo(error: unknown): LLMErrorInfo {
  const err = error as Record<string, unknown>;
  const cause = (err?.cause ?? err) as Record<string, unknown>;

  // Extract status code
  const status =
    (cause?.status as number) ??
    (err?.status as number) ??
    ((cause?.response as Record<string, unknown>)?.status as number);

  // Extract error message
  const message =
    (cause?.message as string) ?? (err?.message as string) ?? "";

  // Extract error code/type
  const errorCode =
    (cause?.code as string) ??
    (err?.code as string) ??
    ((cause?.error as Record<string, unknown>)?.type as string) ??
    "";

  // Detect rate limit errors across providers
  const isRateLimit =
    status === 429 ||
    errorCode === "rate_limit_exceeded" || // OpenAI
    errorCode === "rate_limit_error" || // Anthropic
    errorCode === "RESOURCE_EXHAUSTED" || // Google
    /rate.?limit/i.test(message) ||
    /quota.?exceeded/i.test(message) ||
    /too.?many.?requests/i.test(message);

  // Detect server errors that should be retried
  const isServerError =
    status === 500 ||
    status === 502 ||
    status === 503 ||
    status === 504 ||
    /internal.?error/i.test(message) ||
    /server.?error/i.test(message);

  // Extract Retry-After header if present
  let retryAfterMs: number | undefined;
  const headers = (cause?.response as Record<string, unknown>)?.headers as
    | Record<string, string>
    | undefined;
  const retryAfterHeader =
    headers?.["retry-after"] ?? headers?.["x-ratelimit-reset-requests"];
  if (retryAfterHeader) {
    const seconds = Number.parseInt(retryAfterHeader, 10);
    if (!Number.isNaN(seconds)) {
      retryAfterMs = seconds * 1000;
    } else {
      // Try parsing as HTTP-date
      const retryDate = new Date(retryAfterHeader);
      if (!Number.isNaN(retryDate.getTime())) {
        retryAfterMs = Math.max(0, retryDate.getTime() - Date.now());
      }
    }
  }

  // Also check for retry time in error message (some providers include it)
  if (!retryAfterMs) {
    const retryMatch = message.match(/retry.?after\s+(\d+)/i);
    if (retryMatch) {
      retryAfterMs = Number.parseInt(retryMatch[1], 10) * 1000;
    }
  }

  return {
    retryable: isRateLimit || isServerError,
    isRateLimit,
    retryAfterMs,
  };
}

/**
 * Retries an LLM operation with exponential backoff on rate limits.
 *
 * Uses p-retry with custom delay calculation that honors Retry-After headers.
 * On persistent rate limit failure, throws LLMRateLimitError to signal
 * upstream code to stop batch processing.
 *
 * @param operation - The LLM operation to execute
 * @param options - Configuration options
 * @returns The result of the operation
 * @throws LLMRateLimitError if rate limit persists after retries
 */
export async function withLLMRetry<T>(
  operation: () => Promise<T>,
  options: {
    label: string;
    provider: string;
    maxRetries?: number;
  },
): Promise<T> {
  const { label, provider, maxRetries = 3 } = options;
  let lastRateLimitError: { retryAfterMs: number } | null = null;

  try {
    return await pRetry(operation, {
      retries: maxRetries,
      onFailedAttempt: async (error) => {
        const errorInfo = extractLLMErrorInfo(error);

        if (!errorInfo.retryable) {
          throw error; // Non-retryable, propagate immediately
        }

        if (errorInfo.isRateLimit) {
          lastRateLimitError = { retryAfterMs: errorInfo.retryAfterMs ?? 60000 };
        }

        // Calculate delay: use Retry-After if available, otherwise exponential backoff
        // Backoff: 2s, 4s, 8s, 16s, 32s, capped at 60s
        const baseDelayMs = 2000 * Math.pow(2, error.attemptNumber - 1);
        const delayMs = errorInfo.retryAfterMs ?? Math.min(baseDelayMs, 60000);

        // Add jitter (0-10% of delay) to prevent thundering herd
        const jitter = Math.random() * 0.1 * delayMs;
        const totalDelayMs = delayMs + jitter;

        logger.warn("LLM error, retrying", {
          label,
          provider,
          attemptNumber: error.attemptNumber,
          maxRetries,
          delayMs: Math.round(totalDelayMs),
          isRateLimit: errorInfo.isRateLimit,
          retryAfterMs: errorInfo.retryAfterMs,
        });

        await sleep(totalDelayMs);
      },
    });
  } catch (error) {
    // If we exhausted retries due to rate limit, throw specialized error
    if (lastRateLimitError) {
      const errorInfo = extractLLMErrorInfo(error);
      if (errorInfo.isRateLimit) {
        throw new LLMRateLimitError(
          `LLM rate limit exceeded after ${maxRetries} retries`,
          lastRateLimitError.retryAfterMs,
          provider,
        );
      }
    }
    throw error;
  }
}
```
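To illustrate what the detection returns for a typical 429 (the error shape below is a simplified assumption; real provider SDK errors carry more fields):

```ts
import { extractLLMErrorInfo } from "@/utils/llms/retry";

// Simplified 429-shaped error for illustration only
const error = Object.assign(new Error("Rate limit exceeded. Retry after 30"), {
  status: 429,
  code: "rate_limit_exceeded",
});

const info = extractLLMErrorInfo(error);
// => { retryable: true, isRateLimit: true, retryAfterMs: 30000 }
// retryAfterMs comes from the "retry after 30" fallback parse of the message
```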
2. Update `apps/web/utils/llms/index.ts` to use retry
Wrap the LLM call in `createChatModel`:
```ts
// In createChatModel function, around line 206-226

// Change from:
try {
  return await withNetworkRetry(generate, {
    label,
    shouldRetry: (error) =>
      NoObjectGeneratedError.isInstance(error) ||
      TypeValidationError.isInstance(error),
  });
} catch (error) {
  await handleError(
    error,
    emailAccount.userId,
    emailAccount.email,
    emailAccount.id,
    label,
    modelOptions.modelName,
    modelOptions.provider,
  );
  throw error;
}

// Change to:
try {
  return await withLLMRetry(
    () =>
      withNetworkRetry(generate, {
        label,
        shouldRetry: (error) =>
          NoObjectGeneratedError.isInstance(error) ||
          TypeValidationError.isInstance(error),
      }),
    {
      label,
      provider: modelOptions.provider,
      maxRetries: 3,
    },
  );
} catch (error) {
  // Don't store error message for rate limits - they're transient
  // and we want the next webhook to retry
  if (!(error instanceof LLMRateLimitError)) {
    await handleError(
      error,
      emailAccount.userId,
      emailAccount.email,
      emailAccount.id,
      label,
      modelOptions.modelName,
      modelOptions.provider,
    );
  }
  throw error;
}
```

Add import at top:

```ts
import { withLLMRetry, LLMRateLimitError } from "@/utils/llms/retry";
```

3. Fix `apps/web/app/api/google/webhook/process-history.ts`
Update the `processHistory` function so it only advances `lastSyncedHistoryId` on success:
```ts
import { LLMRateLimitError } from "@/utils/llms/retry";

async function processHistory(options: ProcessHistoryOptions, logger: Logger) {
  const { history, emailAccount } = options;
  const { email: userEmail, id: emailAccountId } = emailAccount;

  if (!history?.length) return;

  let lastSuccessfulHistoryId: string | null = null;
  let rateLimitHit = false;

  for (const h of history) {
    // If we hit a rate limit, stop processing this batch.
    // The next webhook will resume from lastSuccessfulHistoryId.
    if (rateLimitHit) break;

    const historyMessages = [
      ...(h.messagesAdded || []),
      ...(h.labelsAdded || []),
      ...(h.labelsRemoved || []),
    ];

    if (!historyMessages.length) {
      // No messages to process, but mark this history item as processed
      lastSuccessfulHistoryId = h.id ?? lastSuccessfulHistoryId;
      continue;
    }

    const allEvents = [
      ...(h.messagesAdded || [])
        .filter((m) => {
          const isRelevant = isInboxOrSentMessage(m);
          if (!isRelevant) {
            logger.info("Skipping message not in inbox or sent", {
              messageId: m.message?.id,
              labelIds: m.message?.labelIds,
            });
          }
          return isRelevant;
        })
        .map((m) => ({ type: HistoryEventType.MESSAGE_ADDED, item: m })),
      ...(h.labelsAdded || []).map((m) => ({
        type: HistoryEventType.LABEL_ADDED,
        item: m,
      })),
      ...(h.labelsRemoved || []).map((m) => ({
        type: HistoryEventType.LABEL_REMOVED,
        item: m,
      })),
    ];

    const uniqueEvents = uniqBy(
      allEvents,
      (e) => `${e.type}:${e.item.message?.id}`,
    );

    let historyItemSuccessful = true;

    for (const event of uniqueEvents) {
      const log = logger.with({
        messageId: event.item.message?.id,
        threadId: event.item.message?.threadId,
      });

      try {
        await processHistoryItem(event, options, log);
      } catch (error) {
        // On LLM rate limit, stop processing entirely.
        // Next webhook will resume from last successful point.
        if (error instanceof LLMRateLimitError) {
          logger.warn("LLM rate limit hit, stopping batch processing", {
            provider: error.provider,
            retryAfterMs: error.retryAfterMs,
            lastSuccessfulHistoryId,
          });
          rateLimitHit = true;
          historyItemSuccessful = false;
          break;
        }

        // For other errors, log but continue processing
        captureException(error, {
          userEmail,
          extra: { messageId: event.item.message?.id },
        });
        logger.error("Error processing history item", { error });
        historyItemSuccessful = false;
      }
    }

    // Only advance if this entire history item was processed successfully
    if (historyItemSuccessful && h.id) {
      lastSuccessfulHistoryId = h.id;
    }
  }

  // Only update to the last successfully processed history ID
  if (lastSuccessfulHistoryId) {
    await updateLastSyncedHistoryId({
      emailAccountId,
      lastSyncedHistoryId: lastSuccessfulHistoryId,
    });
  } else {
    logger.warn("No history items processed successfully", {
      emailAccountId,
      historyCount: history.length,
      rateLimitHit,
    });
  }
}
```

4. Apply same fix to `apps/web/app/api/outlook/webhook/process-history.ts`
Apply the same pattern to the Outlook webhook processing; a hypothetical sketch follows.
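The names and types below (`OutlookNotification`, `processNotification`) are placeholders, not the actual handler's API; only the stop-on-rate-limit pattern is the point:

```ts
// Hypothetical sketch - mirrors the Gmail fix above, not the real Outlook code
import { LLMRateLimitError } from "@/utils/llms/retry";

type OutlookNotification = { resourceId: string };

async function processNotifications(
  notifications: OutlookNotification[],
  processNotification: (n: OutlookNotification) => Promise<void>,
) {
  for (const notification of notifications) {
    try {
      await processNotification(notification);
    } catch (error) {
      // Stop the batch on rate limit so unprocessed notifications are
      // retried on the next webhook delivery
      if (error instanceof LLMRateLimitError) break;
      // Other errors: log and continue, as before
      console.error("Error processing notification", error);
    }
  }
}
```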
Why p-retry?
`p-retry` is the standard TypeScript exponential backoff library (as of 2025):
- Already used in this codebase (`utils/gmail/retry.ts`, `utils/outlook/retry.ts`)
- Well-maintained by sindresorhus
- Built-in exponential backoff with configurable options
- Proper TypeScript types
- Supports `AbortController` for cancellation
- Minimal dependencies
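For reference, a minimal sketch of `p-retry`'s built-in backoff configuration (illustrative only; `callLLM` is a placeholder, and the option names should be double-checked against the installed v7 docs):

```ts
import pRetry from "p-retry";

// Placeholder operation; stands in for a real LLM call
async function callLLM(): Promise<string> {
  return "ok";
}

const controller = new AbortController();

const result = await pRetry(callLLM, {
  retries: 3, // retry up to 3 times after the first failure
  factor: 2, // double the delay between attempts
  minTimeout: 2000, // first retry delay: 2s
  maxTimeout: 60_000, // cap delays at 60s
  randomize: true, // add jitter to avoid thundering herd
  signal: controller.signal, // cancel pending retries via controller.abort()
});
```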
Files to Modify
| File | Change |
|---|---|
| `apps/web/utils/llms/retry.ts` | Add `withLLMRetry`, `LLMRateLimitError`, `extractLLMErrorInfo` |
| `apps/web/utils/llms/index.ts` | Wrap LLM calls with `withLLMRetry` |
| `apps/web/app/api/google/webhook/process-history.ts` | Track successful processing, stop on rate limit |
| `apps/web/app/api/outlook/webhook/process-history.ts` | Same fix for Outlook |
Testing
- Configure LLM provider with very low rate limits (or mock rate limit responses)
- Trigger bulk email processing
- Verify:
  - Retries occur with exponential backoff
  - `Retry-After` headers are honored
  - Emails aren't skipped on temporary failures
  - `lastSyncedHistoryId` only advances on success
  - Next webhook resumes from correct point
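A minimal unit-test sketch of the retry path (hypothetical; assumes Vitest and the `withLLMRetry` implementation proposed above; the first backoff is ~2s in real time, so the test timeout is extended rather than faking timers):

```ts
import { describe, expect, it, vi } from "vitest";
import { withLLMRetry } from "@/utils/llms/retry";

describe("withLLMRetry", () => {
  it("retries a 429 once, then succeeds", async () => {
    // p-retry rejects non-Error throws, so attach status to a real Error
    const rateLimitError = Object.assign(new Error("rate limit exceeded"), {
      status: 429,
    });

    const operation = vi
      .fn()
      .mockRejectedValueOnce(rateLimitError)
      .mockResolvedValueOnce("ok");

    await expect(
      withLLMRetry(operation, { label: "test", provider: "openai" }),
    ).resolves.toBe("ok");
    expect(operation).toHaveBeenCalledTimes(2);
  }, 10_000); // allow time for the real ~2s backoff
});
```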