Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion blocks/generateMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ export const generateMessage: AppBlock = {
},
schema: {
name: "Schema",
description: "The JSON schema to generate the object from.",
description:
"Schema used to constrain the output of the model to follow a specific schema, ensuring valid, parseable output for downstream processing.",
type: {
type: "object",
additionalProperties: true,
Expand Down
173 changes: 24 additions & 149 deletions blocks/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Anthropic from "@anthropic-ai/sdk";
import { events, kv, timers, messaging } from "@slflows/sdk/v1";
import { Schema, Validator } from "jsonschema";

interface ToolDefinition {
blockId: string;
Expand Down Expand Up @@ -66,6 +65,7 @@ export function streamMessage(params: {
force: boolean | string;
thinking?: boolean | undefined;
thinkingBudget?: number | undefined;
schema?: Anthropic.Messages.Tool.InputSchema | undefined;
}) {
const {
apiKey,
Expand All @@ -79,6 +79,7 @@ export function streamMessage(params: {
force,
thinking,
thinkingBudget,
schema,
} = params;

const client = new Anthropic({
Expand Down Expand Up @@ -136,7 +137,13 @@ export function streamMessage(params: {
disable_parallel_tool_use: hasMCPServers,
}
: undefined,
betas: ["mcp-client-2025-04-04"],
output_format: schema
? {
type: "json_schema",
schema,
}
: undefined,
betas: ["mcp-client-2025-04-04", "structured-outputs-2025-11-13"],
});
}

Expand Down Expand Up @@ -269,135 +276,11 @@ export async function syncPendingEventWithStream(
}
}

/**
 * Forces the model to produce a JSON object conforming to `schema`, retrying
 * up to `maxRetries` times, and emits the result on success.
 *
 * Works by issuing one extra model call with a single "json" tool and
 * `force: "json"`, then treating the tool-call arguments as the generated
 * object (see the inline note below on why a second call is needed).
 *
 * @param finalText     Text from the preceding model turn; passed through
 *                      unchanged as `text` on the emitted result.
 * @param params.messages      Conversation so far, sent verbatim (minus
 *                             thinking blocks) to the follow-up call.
 * @param params.schema        JSON schema the tool input must validate against.
 * @param params.maxRetries    Max attempts before the pending event is cancelled.
 * @param params.inputTokens   Token counts accumulated so far; usage from the
 * @param params.outputTokens  extra call(s) is added on top before emitting.
 * @returns Resolves after emitting a valid object, or after cancelling the
 *          pending event when all retries are exhausted.
 * @throws Rethrows the last caught error, if any, after cancelling.
 */
export async function generateObject(
  finalText: string,
  params: {
    apiKey: string;
    model: string;
    maxTokens: number;
    messages: Anthropic.Beta.Messages.BetaMessageParam[];
    schema: Anthropic.Messages.Tool.InputSchema;
    maxRetries: number;
    pendingId: string;
    inputTokens: number;
    outputTokens: number;
    parentEventId: string;
  },
): Promise<void> {
  const {
    apiKey,
    model,
    maxTokens,
    messages,
    schema,
    maxRetries,
    pendingId,
    parentEventId,
  } = params;

  let retryCount = 0;
  // Mutable copies: each attempt's usage is added to the caller-supplied totals.
  let { inputTokens, outputTokens } = params;

  let lastError: Error | undefined;

  while (retryCount < maxRetries) {
    try {
      // Surface progress (and retry count) on the pending event.
      await events.updatePending(pendingId, {
        statusDescription:
          retryCount === 0
            ? "Generating object..."
            : `Generating object... (retry ${retryCount + 1})`,
      });

      // Anthropic currently does not support structured output in the same request as the user prompt.
      // So we need to call the model one more time and force it to use the JSON tool.
      // The arguments that the model will respond with will be the object that we want to generate.

      // Remove thinking blocks from messages since we're disabling thinking for this call
      const messagesWithoutThinking = messages.map((msg) => ({
        ...msg,
        content: Array.isArray(msg.content)
          ? msg.content.filter((block: any) => block.type !== "thinking")
          : msg.content,
      }));

      const stream = streamMessage({
        maxTokens,
        model,
        messages: messagesWithoutThinking,
        tools: [
          {
            name: "json",
            description: "Respond with a JSON object.",
            input_schema: schema,
          },
        ],
        mcpServers: [],
        // "json" forces the model to call the single tool defined above.
        force: "json",
        apiKey,
      });

      const message = await stream.finalMessage();

      inputTokens += message.usage.input_tokens;
      outputTokens += message.usage.output_tokens;

      if (message.stop_reason === "tool_use") {
        const toolCall = message.content.find(
          (content) => content.type === "tool_use",
        );

        if (toolCall) {
          // Validate the tool arguments against the requested schema; a
          // non-conforming object falls through to another retry instead
          // of being emitted.
          const validator = new Validator();
          const result = validator.validate(toolCall.input, schema as Schema);

          if (result.errors.length === 0) {
            return emitResult(
              pendingId,
              {
                text: finalText,
                object: toolCall.input,
                usage: {
                  inputTokens,
                  outputTokens,
                },
              },
              parentEventId,
            );
          }
        }
      }

      // Model did not stop on tool_use or produced an invalid object: retry.
      retryCount++;
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error));
      retryCount++;

      // If this was the last retry, we'll exit the loop and handle the error below
      if (retryCount >= maxRetries) {
        break;
      }
    }
  }

  // If we get here, all retries failed
  await events.cancelPending(
    pendingId,
    lastError
      ? `Object generation failed: ${lastError.message}`
      : "Failed to generate object",
  );

  if (lastError) {
    throw lastError;
  }
}

export async function emitResult(
pendingId: string,
result: {
text: string | null;
object: unknown;
output: unknown;
usage: {
inputTokens: number;
outputTokens: number;
Expand All @@ -408,8 +291,11 @@ export async function emitResult(
await events.emit(
{
text: result.text,
object: result.object,
output: result.output,
usage: result.usage,

// TODO: Deprecated
object: null,
},
{
complete: pendingId,
Expand Down Expand Up @@ -658,7 +544,6 @@ export async function handleModelResponse(params: {
mcpServers,
systemPrompt,
turn,
apiKey,
maxRetries,
schema,
thinking,
Expand All @@ -676,32 +561,21 @@ export async function handleModelResponse(params: {
throw new Error("Model did not respond with text");
}

let output = null;

if (schema) {
return generateObject(textPart.text, {
apiKey,
model,
maxTokens,
messages: [
...previousMessages,
{
role: message.role,
content: message.content,
},
],
schema,
maxRetries,
pendingId,
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
parentEventId: eventId,
});
try {
output = JSON.parse(textPart.text);
} catch {
console.error("Failed to parse structured output");
}
}

return emitResult(
pendingId,
{
text: textPart.text,
object: null,
text: schema ? null : textPart.text,
output,
usage: {
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
Expand Down Expand Up @@ -833,6 +707,7 @@ export async function executeTurn(params: {
thinking,
thinkingBudget,
temperature,
schema,
});

await syncPendingEventWithStream(pendingId, stream);
Expand Down
Loading