234 changes: 234 additions & 0 deletions docs/reflection-v2-protocol.md
@@ -0,0 +1,234 @@
# Genkit Reflection Protocol V2 (WebSocket)

This document outlines the design for the V2 Reflection API, which uses WebSockets for bidirectional communication between the Genkit CLI (Runtime Manager) and Genkit Runtimes (User Applications).

## Overview

In V2, the connection direction is reversed compared to V1:
- **Server**: The Genkit CLI (`RuntimeManagerV2`) starts a WebSocket server.
- **Client**: The Genkit Runtime connects to the CLI's WebSocket server.

This architecture allows the CLI to easily manage multiple runtimes (e.g., for multi-service projects) and eliminates the need for runtimes to manage their own HTTP servers and ports for reflection.

## Transport

| Feature | Specification |
| :--- | :--- |
| **Protocol** | WebSocket |
| **Data Format** | JSON |
| **Message Structure** | JSON-RPC 2.0 (modified for streaming) |

## Message Format

All messages follow the JSON-RPC 2.0 specification, extended with the notification-based streaming described under Streaming Extension.

### Request
```json
{
  "jsonrpc": "2.0",
  "method": "methodName",
  "params": { ... },
  "id": 1
}
```
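*Note: The `id` is generated by the sender of the request: the Manager for most methods, or the Runtime for `register`. It can be a number (for example, auto-incrementing) or a string (for example, a UUID). It must be unique among the sender's pending requests within the WebSocket session.*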

### Response (Success)
```json
{
  "jsonrpc": "2.0",
  "result": { ... },
  "id": 1
}
```

### Response (Error)
```json
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32000,
    "message": "Error message",
    "data": {
      "code": 13,
      "message": "Error message",
      "details": {
        "traceId": "...",
        "stack": "..."
      }
    }
  },
  "id": 1
}
```

The `data` field contains a `Status` object (matching V1 API) with:
- **`code`**: Genkit canonical status code (e.g., 13 for INTERNAL, 3 for INVALID_ARGUMENT).
- **`message`**: The error message.
- **`details`**: Additional context, including `traceId` and `stack` trace.
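
As a rough illustration of the error shape above, the sketch below wraps a thrown value in a `Status` object and then in a JSON-RPC error response. The names `ReflectionStatus`, `statusFromThrown`, and `buildErrorResponse` are hypothetical and not part of the protocol; the codes `-32000` and `13` are simply the ones shown in the example and the field list.

```typescript
// Hypothetical names; the field layout mirrors the error example above.
interface ReflectionStatus {
  code: number; // Genkit canonical status code, e.g. 13 for INTERNAL
  message: string;
  details?: { traceId?: string; stack?: string };
}

// Convert any thrown value into a Status with code 13 (INTERNAL).
function statusFromThrown(thrown: unknown, traceId?: string): ReflectionStatus {
  const err = thrown instanceof Error ? thrown : new Error(String(thrown));
  const details: { traceId?: string; stack?: string } = {};
  if (traceId !== undefined) details.traceId = traceId;
  if (err.stack !== undefined) details.stack = err.stack;
  return { code: 13, message: err.message, details };
}

// Wrap a Status in a JSON-RPC error response for the given request id.
// -32000 is the JSON-RPC error code used in the example above.
function buildErrorResponse(id: number | string, status: ReflectionStatus) {
  return {
    jsonrpc: '2.0' as const,
    error: { code: -32000, message: status.message, data: status },
    id,
  };
}
```

A caller could serialize the result with `JSON.stringify` before writing it to the socket.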

### Notification
A request without an `id`. The receiver does not send a response to a notification.
```json
{
  "jsonrpc": "2.0",
  "method": "methodName",
  "params": { ... }
}
```
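
The four message shapes above can be told apart by which fields are present. The following is a minimal sketch of such a classifier; `classifyRpcMessage` and `ParsedKind` are illustrative names, and a real implementation would likely validate `params` and `error` more strictly.

```typescript
// Hypothetical helper: decide which of the four shapes a raw frame has.
type ParsedKind = 'request' | 'notification' | 'success' | 'error';

function classifyRpcMessage(raw: string): ParsedKind {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Message must be a JSON object');
  }
  const msg = parsed as Record<string, unknown>;
  if (msg['jsonrpc'] !== '2.0') {
    throw new Error('Missing jsonrpc: "2.0"');
  }
  const id = msg['id'];
  const hasId = typeof id === 'number' || typeof id === 'string';
  // A message with a method is a request (with id) or a notification (without).
  if (typeof msg['method'] === 'string') {
    return hasId ? 'request' : 'notification';
  }
  // Responses carry the id of the request they answer.
  if (hasId && 'error' in msg) return 'error';
  if (hasId && 'result' in msg) return 'success';
  throw new Error('Unrecognized JSON-RPC message shape');
}
```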

## Streaming Extension

JSON-RPC 2.0 does not natively support streaming. We extend it by using Notifications from the Runtime to the Manager associated with a specific Request ID.

| Message Type | Method | Direction | Description |
| :--- | :--- | :--- | :--- |
| **Stream Chunk** | `streamChunk` | Runtime -> Manager | Sent by the Runtime during a streaming `runAction` request. |
| **State Update** | `runActionState` | Runtime -> Manager | Sent by the Runtime to provide status updates (e.g., trace ID) before the result. |

### Stream Chunk Notification
```json
{
  "jsonrpc": "2.0",
  "method": "streamChunk",
  "params": {
    "requestId": 1,
    "chunk": { ... }
  }
}
```

### Run Action State Notification
```json
{
  "jsonrpc": "2.0",
  "method": "runActionState",
  "params": {
    "requestId": 1,
    "state": { "traceId": "..." }
  }
}
```
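
On the Manager side, the two notifications above have to be matched to the pending request named by `params.requestId`, and the final response closes that request. One way this could be sketched is shown below; `StreamRouter` and `PendingCall` are hypothetical names and do not describe the actual `RuntimeManagerV2` implementation.

```typescript
// Hypothetical Manager-side router for the streaming extension.
type StreamRequestId = number | string;

interface PendingCall {
  onChunk: (chunk: unknown) => void;
  onState: (state: unknown) => void;
  resolve: (result: unknown) => void;
  reject: (error: unknown) => void;
}

class StreamRouter {
  private pending = new Map<StreamRequestId, PendingCall>();

  // Remember the handlers for a request that was just sent.
  track(id: StreamRequestId, call: PendingCall): void {
    this.pending.set(id, call);
  }

  // Route one incoming frame; returns false if no pending request matches.
  handle(raw: string): boolean {
    const msg = JSON.parse(raw);
    if (msg.method === 'streamChunk' || msg.method === 'runActionState') {
      const call = this.pending.get(msg.params?.requestId);
      if (!call) return false;
      if (msg.method === 'streamChunk') call.onChunk(msg.params.chunk);
      else call.onState(msg.params.state);
      return true;
    }
    if (msg.id !== undefined && ('result' in msg || 'error' in msg)) {
      const call = this.pending.get(msg.id);
      if (!call) return false;
      // The final response ends the stream, so the entry is dropped.
      this.pending.delete(msg.id);
      if ('error' in msg) call.reject(msg.error);
      else call.resolve(msg.result);
      return true;
    }
    return false;
  }
}
```

Dropping the entry on the final response means any late chunk for the same `requestId` is reported as unmatched instead of being delivered to a finished call.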

## Protocol Methods Summary

| Method | Direction | Type | Description |
| :--- | :--- | :--- | :--- |
| **`register`** | Runtime -> Manager | Request | Registers the runtime with the Manager. |
| **`configure`** | Manager -> Runtime | Notification | Pushes configuration updates to the Runtime. |
| **`listActions`** | Manager -> Runtime | Request | Retrieves the list of available actions. |
| **`listValues`** | Manager -> Runtime | Request | Retrieves the list of values (prompts, schemas, etc.). |
| **`runAction`** | Manager -> Runtime | Request | Executes an action. |
| **`cancelAction`** | Manager -> Runtime | Request | Cancels a running action. |

## Detailed API

### 1. Registration
**Direction:** Runtime -> Manager
**Type:** Request

**Parameters:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `id` | `string` | Unique Runtime ID. |
| `pid` | `number` | Process ID. |
| `name` | `string` | App name (optional). |
| `genkitVersion` | `string` | e.g., "0.9.0". |
| `reflectionApiSpecVersion` | `number` | Protocol version. |
| `envs` | `string[]` | Configured environments (optional). |

**Result:** `void`
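
For illustration, a Runtime might build its `register` request roughly as follows. The `RegisterParams` interface and `buildRegisterRequest` function are hypothetical names; only `name` and `envs` are marked optional in the table above, so this sketch assumes the remaining fields are required.

```typescript
// Hypothetical shape mirroring the Registration parameters table.
interface RegisterParams {
  id: string;
  pid: number;
  name?: string;
  genkitVersion: string;
  reflectionApiSpecVersion: number;
  envs?: string[];
}

// Serialize a `register` request; the caller chooses the request id.
function buildRegisterRequest(
  requestId: number | string,
  params: RegisterParams
): string {
  return JSON.stringify({
    jsonrpc: '2.0',
    method: 'register',
    params,
    id: requestId,
  });
}

// Example frame. All values are placeholders, not values mandated by the protocol.
const registerFrame = buildRegisterRequest(1, {
  id: 'runtime-1',
  pid: 4242,
  genkitVersion: '0.9.0',
  reflectionApiSpecVersion: 2,
});
```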

### 2. Configuration
**Direction:** Manager -> Runtime
**Type:** Notification

**Parameters:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `telemetryServerUrl` | `string` | URL of the telemetry server (optional). |

### 3. List Actions
**Direction:** Manager -> Runtime
**Type:** Request

**Parameters:** `void`

**Result:**
| Type | Description |
| :--- | :--- |
| `Record<string, Action>` | Map of action keys to Action definitions. (Same schema as V1 `/api/actions`) |

### 4. List Values
**Direction:** Manager -> Runtime
**Type:** Request

**Parameters:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `type` | `string` | The type of value to list (e.g., "model", "prompt", "schema"). |

**Result:**
| Type | Description |
| :--- | :--- |
| `Record<string, any>` | Map of value keys to value definitions. |

### 5. Run Action
**Direction:** Manager -> Runtime
**Type:** Request

**Parameters:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `key` | `string` | Action key (e.g., "/flow/myFlow"). |
| `input` | `any` | Input payload. |
| `context` | `any` | Context data (optional). |
| `telemetryLabels` | `Record<string, string>` | Telemetry labels (optional). |
| `stream` | `boolean` | Whether to stream results. |
| `streamInput` | `boolean` | Whether to stream input (for bidi actions). |

**Result (Non-Streaming):**
| Field | Type | Description |
| :--- | :--- | :--- |
| `result` | `any` | The return value. |
| `telemetry` | `object` | Telemetry metadata (e.g., `{ traceId: string }`). |

**Streaming Flow:**
1. Runtime sends optional `runActionState` notifications.
2. Runtime sends `streamChunk` notifications.
3. Runtime sends final response with `result` (same structure as non-streaming).

**Bidirectional Streaming Flow (if `streamInput: true`):**
1. Manager sends `streamInputChunk` notifications.
2. Manager sends `endStreamInput` notification.
3. Runtime behaves as per Streaming Flow.
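
The Runtime-side sequence from the Streaming Flow (which step 3 of the bidirectional flow reuses) can be sketched as a function that produces the messages in order. `streamingReplies` is a hypothetical helper, and the sketch assumes a single `runActionState` carrying the trace ID. The Manager-to-Runtime `streamInputChunk` and `endStreamInput` messages are left out because their parameters are not specified here.

```typescript
// Hypothetical helper: the ordered messages a Runtime sends for one
// streaming runAction request.
type ReplyId = number | string;

function streamingReplies(
  requestId: ReplyId,
  traceId: string,
  chunks: unknown[],
  result: unknown
): object[] {
  const messages: object[] = [];
  // 1. Optional state update, here carrying the trace ID.
  messages.push({
    jsonrpc: '2.0',
    method: 'runActionState',
    params: { requestId, state: { traceId } },
  });
  // 2. One streamChunk notification per chunk, in order.
  for (const chunk of chunks) {
    messages.push({
      jsonrpc: '2.0',
      method: 'streamChunk',
      params: { requestId, chunk },
    });
  }
  // 3. Final response, same structure as the non-streaming result.
  messages.push({
    jsonrpc: '2.0',
    result: { result, telemetry: { traceId } },
    id: requestId,
  });
  return messages;
}
```

Only the last message carries an `id`, since it is the response to the original request; the earlier ones are notifications tied to it through `params.requestId`.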

### 6. Cancel Action
**Direction:** Manager -> Runtime
**Type:** Request

**Parameters:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `traceId` | `string` | The trace ID of the action to cancel. |

**Result:**
| Field | Type | Description |
| :--- | :--- | :--- |
| `message` | `string` | Confirmation message. |

## Health Checks

| Check Type | Description |
| :--- | :--- |
| **Connection State** | The WebSocket connection state itself serves as a basic health check. |
| **Heartbeats** | Standard WebSocket Ping/Pong frames should be used to maintain the connection and detect timeouts. |
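
A heartbeat check along these lines could be sketched without tying it to a particular WebSocket library. In the sketch below, `HeartbeatMonitor` is a hypothetical class: the caller supplies functions that send a Ping frame and that handle a timeout, calls `tick()` once per interval, and calls `pongReceived()` whenever a Pong frame arrives.

```typescript
// Hypothetical, transport-agnostic heartbeat tracker.
class HeartbeatMonitor {
  private awaitingPong = false;
  private sendPing: () => void;
  private onTimeout: () => void;

  constructor(sendPing: () => void, onTimeout: () => void) {
    this.sendPing = sendPing;
    this.onTimeout = onTimeout;
  }

  // Call once per heartbeat interval (for example from setInterval).
  tick(): void {
    if (this.awaitingPong) {
      // The previous Ping was never answered: treat the peer as gone.
      this.onTimeout();
      return;
    }
    this.awaitingPong = true;
    this.sendPing();
  }

  // Call when a Pong frame arrives from the peer.
  pongReceived(): void {
    this.awaitingPong = false;
  }
}
```

With this design a dead connection is detected within two intervals: one to send the Ping and one to notice the missing Pong.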

## Compatibility

| Version | Architecture |
| :--- | :--- |
| **V1** | HTTP Server on Runtime, Polling/Request from CLI. |
| **V2** | WebSocket Server on CLI, Persistent Connection from Runtime. |

The CLI will determine which mode to use based on configuration (e.g., `--experimental-reflection-v2`).
10 changes: 5 additions & 5 deletions genkit-tools/cli/src/commands/dev-test-model.ts
@@ -21,8 +21,8 @@ import {
Part,
} from '@genkit-ai/tools-common';
import {
+BaseRuntimeManager,
GenkitToolsError,
-RuntimeManager,
} from '@genkit-ai/tools-common/manager';
import { findProjectRoot, logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
@@ -345,7 +345,7 @@ const TEST_CASES: Record<string, TestCase> = {
},
};

-async function waitForRuntime(manager: RuntimeManager) {
+async function waitForRuntime(manager: BaseRuntimeManager) {
// Poll for runtimes
for (let i = 0; i < 20; i++) {
if (manager.listRuntimes().length > 0) return;
@@ -355,7 +355,7 @@ async function waitForRuntime(manager: RuntimeManager) {
}

async function runTest(
-manager: RuntimeManager,
+manager: BaseRuntimeManager,
model: string,
testCase: TestCase
): Promise<boolean> {
@@ -397,7 +397,7 @@ async function runTest(
}

async function runTestSuite(
-manager: RuntimeManager,
+manager: BaseRuntimeManager,
suite: TestSuite,
defaultSupports: string[]
): Promise<{ passed: number; failed: number }> {
@@ -470,7 +470,7 @@ export const devTestModel = new Command('dev:test-model')
if (args) cmd = args;
}

-let manager: RuntimeManager;
+let manager: BaseRuntimeManager;

if (cmd.length > 0) {
const result = await startDevProcessManager(
31 changes: 27 additions & 4 deletions genkit-tools/cli/src/commands/start.ts
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-import type { RuntimeManager } from '@genkit-ai/tools-common/manager';
+import type { BaseRuntimeManager } from '@genkit-ai/tools-common/manager';
import { startServer } from '@genkit-ai/tools-common/server';
import { findProjectRoot, logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
@@ -27,6 +27,8 @@ interface RunOptions {
port?: string;
open?: boolean;
disableRealtimeTelemetry?: boolean;
+experimentalReflectionV2?: boolean;
+allowedTelemetryCorsHostnames?: string;
}

/** Command to run code in dev mode and/or the Dev UI. */
@@ -39,6 +41,14 @@ export const start = new Command('start')
'--disable-realtime-telemetry',
'Disable real-time telemetry streaming'
)
+.option(
+'--experimental-reflection-v2',
+'start the experimental reflection server (WebSocket)'
+)
+.option(
+'--allowed-telemetry-cors-hostnames <hostnames>',
+'comma separated list of allowed telemetry CORS hostnames'
+)
.action(async (options: RunOptions) => {
const projectRoot = await findProjectRoot();
if (projectRoot.includes('/.Trash/')) {
@@ -48,19 +58,32 @@ export const start = new Command('start')
);
}
// Always start the manager.
-let manager: RuntimeManager;
+let manager: BaseRuntimeManager;
let processPromise: Promise<void> | undefined;
+const allowedTelemetryCorsHostnames = options.allowedTelemetryCorsHostnames
+? options.allowedTelemetryCorsHostnames.split(',')
+: undefined;
+
if (start.args.length > 0) {
const result = await startDevProcessManager(
projectRoot,
start.args[0],
start.args.slice(1),
-{ disableRealtimeTelemetry: options.disableRealtimeTelemetry }
+{
+disableRealtimeTelemetry: options.disableRealtimeTelemetry,
+experimentalReflectionV2: options.experimentalReflectionV2,
+allowedTelemetryCorsHostnames,
+}
);
manager = result.manager;
processPromise = result.processPromise;
} else {
-manager = await startManager(projectRoot, true);
+manager = await startManager(
+projectRoot,
+true,
+options.experimentalReflectionV2,
+allowedTelemetryCorsHostnames
+);
processPromise = new Promise(() => {});
}
if (!options.noui) {
6 changes: 3 additions & 3 deletions genkit-tools/cli/src/mcp/utils.ts
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-import { RuntimeManager } from '@genkit-ai/tools-common/manager';
+import { BaseRuntimeManager } from '@genkit-ai/tools-common/manager';
import { z } from 'zod';
import { startDevProcessManager, startManager } from '../utils/manager-utils';

@@ -62,7 +62,7 @@ export function resolveProjectRoot(
/** Genkit Runtime manager specifically for the MCP server. Allows lazy
* initialization and dev process management. */
export class McpRuntimeManager {
-private manager: RuntimeManager | undefined;
+private manager: BaseRuntimeManager | undefined;
private currentProjectRoot: string | undefined;

async getManager(projectRoot: string) {
@@ -83,7 +83,7 @@ export class McpRuntimeManager {
args: string[];
explicitProjectRoot: boolean;
timeout?: number;
-}): Promise<RuntimeManager> {
+}): Promise<BaseRuntimeManager> {
const { projectRoot, command, args, timeout, explicitProjectRoot } = params;
if (this.manager) {
await this.manager.stop();