Skip to content

Commit 6b8771d

Browse files
committed
feat(py): implement Reflection API v2 with WebSocket and JSON-RPC 2.0
This implements the Reflection API v2 for Python following RFC #4211.

Architecture Change:
- V1 (HTTP server): Runtime hosts an HTTP server, CLI/DevUI connect to it
- V2 (WebSocket client): CLI hosts a WebSocket server, Runtimes connect outbound

The v2 API reverses the connection direction, allowing better support for bidirectional actions and environments where binding a port is impractical.

Module Reorganization:
- reflection.py: New v2 WebSocket client (primary when enabled)
- reflection_v1.py: Existing HTTP server implementation (default)

V2 Implementation:
- ReflectionClientV2 connects to a runtime manager via WebSocket
- JSON-RPC 2.0 protocol for all communication
- Supports: listActions, runAction, cancelAction, listValues
- Streaming: runActionState and streamChunk notifications
- Auto-reconnection with exponential backoff (1s to 60s max)
- Proper task-based cancellation via asyncio.current_task().cancel()

Activation:
- V2 is activated when GENKIT_REFLECTION_V2_SERVER env var is set
- V1 remains the default when env var is not set

Dependencies:
- websockets>=15.0 as core dependency

Tests:
- 25 tests for v2 WebSocket client
- 7 tests for v1 HTTP server

See: RFC #4211
1 parent 714e068 commit 6b8771d

File tree

9 files changed

+1942
-713
lines changed

9 files changed

+1942
-713
lines changed

py/packages/genkit/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies = [
5454
"uvloop>=0.21.0; sys_platform != 'win32'",
5555
"anyio>=4.9.0",
5656
"opentelemetry-instrumentation-logging>=0.60b1",
57+
"websockets>=15.0",
5758
]
5859
description = "Genkit AI Framework"
5960
license = "Apache-2.0"

py/packages/genkit/src/genkit/ai/_base_async.py

Lines changed: 98 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
from genkit.core.environment import is_dev_environment
3131
from genkit.core.logging import get_logger
3232
from genkit.core.plugin import Plugin
33-
from genkit.core.reflection import create_reflection_asgi_app
33+
from genkit.core.reflection import (
34+
ReflectionClientV2,
35+
get_reflection_v2_url,
36+
)
37+
from genkit.core.reflection_v1 import create_reflection_asgi_app
3438
from genkit.core.registry import Registry
3539
from genkit.web.manager._ports import find_free_port_sync
3640

@@ -145,8 +149,6 @@ async def run_user_coro_wrapper() -> None:
145149
finally:
146150
user_task_finished_event.set()
147151

148-
reflection_server = _make_reflection_server(self.registry, server_spec)
149-
150152
# Setup signal handlers for graceful shutdown (parity with JS)
151153

152154
# Actually, anyio.run handles Ctrl+C (SIGINT) by raising KeyboardInterrupt/CancelledError
@@ -162,65 +164,36 @@ async def handle_sigterm(tg_to_cancel: anyio.abc.TaskGroup) -> None: # type: ig
162164
return
163165

164166
try:
165-
# Use lazy_write=True to prevent race condition where file exists before server is up
166-
async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager:
167-
# We use anyio.TaskGroup because it is compatible with
168-
# asyncio's event loop and works with Python 3.10
169-
# (asyncio.TaskGroup was added in 3.11, and we can switch to
170-
# that when we drop support for 3.10).
167+
# Check if Reflection API v2 is enabled
168+
v2_url = get_reflection_v2_url()
169+
170+
if v2_url:
171+
# Reflection API v2: Use WebSocket client connecting to runtime manager
172+
client = ReflectionClientV2(self.registry, v2_url)
173+
171174
async with anyio.create_task_group() as tg:
172-
# Start reflection server in the background.
173-
tg.start_soon(reflection_server.serve, name='genkit-reflection-server')
174-
await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')
175+
# Start v2 client in background (handles its own reconnection)
176+
tg.start_soon(client.run, name='genkit-reflection-v2-client')
177+
await logger.ainfo(f'Started Genkit Reflection v2 client connecting to {v2_url}')
175178

176179
# Start SIGTERM handler
177180
tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler')
178181

179-
# Wait for server to be responsive
180-
# We need to loop and poll the health endpoint or wait for uvicorn to be ready
181-
# Since uvicorn run is blocking (but we are in a task), we can't easily hook into its startup
182-
# unless we use uvicorn's server object directly which we do.
183-
# reflection_server.started is set when uvicorn starts.
184-
185-
# Simple polling loop
186-
187-
max_retries = 20 # 2 seconds total roughly
188-
for _i in range(max_retries):
189-
try:
190-
# TODO(#4334): Use async http client if available to avoid blocking loop?
191-
# But we are in dev mode, so maybe okay.
192-
# Actually we should use anyio.to_thread to avoid blocking event loop
193-
# or assume standard lib urllib is fast enough for localhost.
194-
195-
# Using sync urllib in async loop blocks the loop!
196-
# We must use anyio.to_thread or a non-blocking check.
197-
# But let's check if reflection_server object has a 'started' flag we can trust.
198-
# uvicorn.Server has 'started' attribute but it might be internal state.
199-
200-
# Let's stick to simple polling with to_thread for safety
201-
def check_health() -> bool:
202-
health_url = f'{server_spec.url}/api/__health'
203-
with urllib.request.urlopen(health_url, timeout=0.5) as response:
204-
return response.status == 200
205-
206-
is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined]
207-
if is_healthy:
208-
break
209-
except Exception:
210-
await anyio.sleep(0.1)
211-
else:
212-
logger.warning(f'Reflection server at {server_spec.url} did not become healthy in time.')
213-
214-
# Now write the file (or verify it persisted)
215-
_ = runtime_manager.write_runtime_file()
216-
217-
# Start the (potentially short-lived) user coroutine wrapper
182+
# Start the user coroutine
218183
tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
219184
await logger.ainfo('Started Genkit user coroutine')
220185

221186
# Block here until the task group is canceled (e.g. Ctrl+C)
222-
# or a task raises an unhandled exception. It should not
223-
# exit just because the user coroutine finishes.
187+
# or a task raises an unhandled exception
188+
189+
else:
190+
# Reflection API v1: Start HTTP server
191+
await _run_v1_reflection_server(
192+
registry=self.registry,
193+
server_spec=server_spec,
194+
handle_sigterm=handle_sigterm,
195+
run_user_coro_wrapper=run_user_coro_wrapper,
196+
)
224197

225198
except anyio.get_cancelled_exc_class():
226199
logger.info('Development server task group cancelled (e.g., Ctrl+C).')
@@ -242,6 +215,78 @@ def check_health() -> bool:
242215
return anyio.run(dev_runner)
243216

244217

218+
async def _run_v1_reflection_server(
    registry: Registry,
    server_spec: ServerSpec,
    handle_sigterm: Any,  # noqa: ANN401 - callback type is complex
    run_user_coro_wrapper: Any,  # noqa: ANN401 - callback type is complex
) -> None:
    """Serve the Reflection API v1 over HTTP and launch the user coroutine.

    Startup happens in a fixed order inside a single task group:

    1. The reflection server built by ``_make_reflection_server`` is started
       as a background task.
    2. The SIGTERM handler is started and handed the task group.
    3. The health endpoint is polled via ``_wait_for_server_health``.
    4. Only then is the runtime file written, so the file is not published
       before the server has been given a chance to come up.
    5. The user coroutine is started.

    The call then stays inside the task group until it is cancelled or one
    of its tasks raises.

    Args:
        registry: The Genkit registry exposed through the reflection server.
        server_spec: Server specification used to build the server, the
            runtime file and the health-check URL.
        handle_sigterm: Async callable started in the task group; it is
            passed the task group as its only argument.
        run_user_coro_wrapper: Async callable (no arguments) that runs the
            user's coroutine.
    """
    server = _make_reflection_server(registry, server_spec)

    # lazy_write=True defers creation of the runtime file; it is written
    # explicitly below, after the health poll, so the file does not exist
    # before the server is up.
    async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager, anyio.create_task_group() as task_group:
        # Reflection server runs in the background for the life of the group.
        task_group.start_soon(server.serve, name='genkit-reflection-server')
        await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')

        # SIGTERM handler receives the group (see handle_sigterm's signature).
        task_group.start_soon(handle_sigterm, task_group, name='genkit-sigterm-handler')

        # Give the server a chance to answer its health endpoint first ...
        await _wait_for_server_health(server_spec)

        # ... and only then publish the runtime file.
        _ = runtime_manager.write_runtime_file()

        # Finally hand control to the user's code.
        task_group.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
        await logger.ainfo('Started Genkit user coroutine')

        # Leaving this block waits on the task group: we stay here until it
        # is cancelled (e.g. Ctrl+C) or a task raises an unhandled exception.
263+
264+
async def _wait_for_server_health(server_spec: ServerSpec, max_retries: int = 20) -> None:
    """Wait for the reflection server to become healthy.

    Polls ``{server_spec.url}/api/__health`` until it answers with HTTP 200
    or ``max_retries`` attempts have been used up. Each probe runs in a
    worker thread so the blocking ``urllib`` call does not stall the event
    loop. If the server never becomes healthy a warning is logged and the
    function returns normally; it does not raise.

    Args:
        server_spec: Server specification providing the base URL.
        max_retries: Maximum number of probe attempts (default 20). Every
            failed attempt is followed by a 0.1 s pause, and a single probe
            may itself block for up to its 0.5 s timeout, so the total wait
            is roughly 2 s when connections are refused immediately and can
            be longer when probes time out.
    """
    # Loop-invariant: build the URL and the probe once instead of per attempt.
    health_url = f'{server_spec.url}/api/__health'

    def check_health() -> bool:
        with urllib.request.urlopen(health_url, timeout=0.5) as response:
            return response.status == 200

    for _attempt in range(max_retries):
        try:
            is_healthy = await anyio.to_thread.run_sync(check_health)  # type: ignore[attr-defined]
        except Exception:
            # Best-effort probe: connection refused, timeouts and HTTP errors
            # all simply mean "not ready yet".
            is_healthy = False

        if is_healthy:
            return

        # Back off after *every* unsuccessful attempt, including a non-200
        # response that did not raise; otherwise such responses would burn
        # through all retries in a tight loop.
        await anyio.sleep(0.1)

    logger.warning(f'Reflection server at {server_spec.url} did not become healthy in time.')
288+
289+
245290
def _make_reflection_server(registry: Registry, spec: ServerSpec) -> uvicorn.Server:
246291
"""Make a reflection server for the given registry and spec.
247292

py/packages/genkit/src/genkit/core/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@
2323
GENKIT_VERSION = DEFAULT_GENKIT_VERSION
2424

2525
GENKIT_CLIENT_HEADER = f'genkit-python/{DEFAULT_GENKIT_VERSION}'
26+
27+
# Reflection API specification version.
28+
# This should match the value in JS (genkit-tools).
29+
GENKIT_REFLECTION_API_SPEC_VERSION = 1

0 commit comments

Comments
 (0)