Skip to content

Commit cd9d7f0

Browse files
committed
feat(py): implement Reflection API v2 with WebSocket and JSON-RPC 2.0
This implements the Reflection API v2 for Python following RFC #4211.

Architecture Change:
- V1 (HTTP server): Runtime hosts an HTTP server, CLI/DevUI connect to it
- V2 (WebSocket client): CLI hosts a WebSocket server, Runtimes connect outbound

The v2 API reverses the connection direction, allowing better support for bidirectional actions and environments where binding a port is impractical.

Module Reorganization:
- reflection.py: New v2 WebSocket client (primary when enabled)
- reflection_v1.py: Existing HTTP server implementation (default)

V2 Implementation:
- ReflectionClientV2 connects to a runtime manager via WebSocket
- JSON-RPC 2.0 protocol for all communication
- Supports: listActions, runAction, cancelAction, listValues
- Streaming: runActionState and streamChunk notifications
- Auto-reconnection with exponential backoff (1s to 60s max)
- Proper task-based cancellation via asyncio.current_task().cancel()

Activation:
- V2 is activated when GENKIT_REFLECTION_V2_SERVER env var is set
- V1 remains the default when env var is not set

Dependencies:
- websockets>=15.0 as core dependency

Tests:
- 25 tests for v2 WebSocket client
- 7 tests for v1 HTTP server

See: RFC #4211
1 parent 714e068 commit cd9d7f0

File tree

9 files changed

+1916
-714
lines changed

9 files changed

+1916
-714
lines changed

py/packages/genkit/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies = [
5454
"uvloop>=0.21.0; sys_platform != 'win32'",
5555
"anyio>=4.9.0",
5656
"opentelemetry-instrumentation-logging>=0.60b1",
57+
"websockets>=15.0",
5758
]
5859
description = "Genkit AI Framework"
5960
license = "Apache-2.0"

py/packages/genkit/src/genkit/ai/_base_async.py

Lines changed: 84 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
from genkit.core.environment import is_dev_environment
3131
from genkit.core.logging import get_logger
3232
from genkit.core.plugin import Plugin
33-
from genkit.core.reflection import create_reflection_asgi_app
33+
from genkit.core.reflection import (
34+
ReflectionClientV2,
35+
get_reflection_v2_url,
36+
)
37+
from genkit.core.reflection_v1 import create_reflection_asgi_app
3438
from genkit.core.registry import Registry
3539
from genkit.web.manager._ports import find_free_port_sync
3640

@@ -145,8 +149,6 @@ async def run_user_coro_wrapper() -> None:
145149
finally:
146150
user_task_finished_event.set()
147151

148-
reflection_server = _make_reflection_server(self.registry, server_spec)
149-
150152
# Setup signal handlers for graceful shutdown (parity with JS)
151153

152154
# Actually, anyio.run handles Ctrl+C (SIGINT) by raising KeyboardInterrupt/CancelledError
@@ -162,65 +164,94 @@ async def handle_sigterm(tg_to_cancel: anyio.abc.TaskGroup) -> None: # type: ig
162164
return
163165

164166
try:
165-
# Use lazy_write=True to prevent race condition where file exists before server is up
166-
async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager:
167-
# We use anyio.TaskGroup because it is compatible with
168-
# asyncio's event loop and works with Python 3.10
169-
# (asyncio.TaskGroup was added in 3.11, and we can switch to
170-
# that when we drop support for 3.10).
167+
# Check if Reflection API v2 is enabled
168+
v2_url = get_reflection_v2_url()
169+
170+
if v2_url:
171+
# Reflection API v2: Use WebSocket client connecting to runtime manager
172+
client = ReflectionClientV2(self.registry, v2_url)
173+
171174
async with anyio.create_task_group() as tg:
172-
# Start reflection server in the background.
173-
tg.start_soon(reflection_server.serve, name='genkit-reflection-server')
174-
await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')
175+
# Start v2 client in background (handles its own reconnection)
176+
tg.start_soon(client.run, name='genkit-reflection-v2-client')
177+
await logger.ainfo(f'Started Genkit Reflection v2 client connecting to {v2_url}')
175178

176179
# Start SIGTERM handler
177180
tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler')
178181

179-
# Wait for server to be responsive
180-
# We need to loop and poll the health endpoint or wait for uvicorn to be ready
181-
# Since uvicorn run is blocking (but we are in a task), we can't easily hook into its startup
182-
# unless we use uvicorn's server object directly which we do.
183-
# reflection_server.started is set when uvicorn starts.
184-
185-
# Simple polling loop
186-
187-
max_retries = 20 # 2 seconds total roughly
188-
for _i in range(max_retries):
189-
try:
190-
# TODO(#4334): Use async http client if available to avoid blocking loop?
191-
# But we are in dev mode, so maybe okay.
192-
# Actually we should use anyio.to_thread to avoid blocking event loop
193-
# or assume standard lib urllib is fast enough for localhost.
194-
195-
# Using sync urllib in async loop blocks the loop!
196-
# We must use anyio.to_thread or a non-blocking check.
197-
# But let's check if reflection_server object has a 'started' flag we can trust.
198-
# uvicorn.Server has 'started' attribute but it might be internal state.
199-
200-
# Let's stick to simple polling with to_thread for safety
201-
def check_health() -> bool:
202-
health_url = f'{server_spec.url}/api/__health'
203-
with urllib.request.urlopen(health_url, timeout=0.5) as response:
204-
return response.status == 200
205-
206-
is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined]
207-
if is_healthy:
208-
break
209-
except Exception:
210-
await anyio.sleep(0.1)
211-
else:
212-
logger.warning(f'Reflection server at {server_spec.url} did not become healthy in time.')
213-
214-
# Now write the file (or verify it persisted)
215-
_ = runtime_manager.write_runtime_file()
216-
217-
# Start the (potentially short-lived) user coroutine wrapper
182+
# Start the user coroutine
218183
tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
219184
await logger.ainfo('Started Genkit user coroutine')
220185

221186
# Block here until the task group is canceled (e.g. Ctrl+C)
222-
# or a task raises an unhandled exception. It should not
223-
# exit just because the user coroutine finishes.
187+
# or a task raises an unhandled exception
188+
189+
else:
190+
# Reflection API v1: Start HTTP server
191+
reflection_server = _make_reflection_server(self.registry, server_spec)
192+
193+
# Use lazy_write=True to prevent race condition where file exists before server is up
194+
async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager:
195+
# We use anyio.TaskGroup because it is compatible with
196+
# asyncio's event loop and works with Python 3.10
197+
# (asyncio.TaskGroup was added in 3.11, and we can switch to
198+
# that when we drop support for 3.10).
199+
async with anyio.create_task_group() as tg:
200+
# Start reflection server in the background.
201+
tg.start_soon(reflection_server.serve, name='genkit-reflection-server')
202+
await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')
203+
204+
# Start SIGTERM handler
205+
tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler')
206+
207+
# Wait for server to be responsive
208+
# We need to loop and poll the health endpoint or wait for uvicorn to be ready
209+
# Since uvicorn run is blocking (but we are in a task), we can't
210+
# easily hook into its startup
211+
# unless we use uvicorn's server object directly which we do.
212+
# reflection_server.started is set when uvicorn starts.
213+
214+
# Simple polling loop
215+
216+
max_retries = 20 # 2 seconds total roughly
217+
for _i in range(max_retries):
218+
try:
219+
# TODO(#4334): Use async http client if available to avoid blocking loop?
220+
# But we are in dev mode, so maybe okay.
221+
# Actually we should use anyio.to_thread to avoid blocking event loop
222+
# or assume standard lib urllib is fast enough for localhost.
223+
224+
# Using sync urllib in async loop blocks the loop!
225+
# We must use anyio.to_thread or a non-blocking check.
226+
# But let's check if reflection_server object has a 'started' flag we can trust.
227+
# uvicorn.Server has 'started' attribute but it might be internal state.
228+
229+
# Let's stick to simple polling with to_thread for safety
230+
def check_health() -> bool:
231+
health_url = f'{server_spec.url}/api/__health'
232+
with urllib.request.urlopen(health_url, timeout=0.5) as response:
233+
return response.status == 200
234+
235+
is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined]
236+
if is_healthy:
237+
break
238+
except Exception:
239+
await anyio.sleep(0.1)
240+
else:
241+
logger.warning(
242+
f'Reflection server at {server_spec.url} did not become healthy in time.'
243+
)
244+
245+
# Now write the file (or verify it persisted)
246+
_ = runtime_manager.write_runtime_file()
247+
248+
# Start the (potentially short-lived) user coroutine wrapper
249+
tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
250+
await logger.ainfo('Started Genkit user coroutine')
251+
252+
# Block here until the task group is canceled (e.g. Ctrl+C)
253+
# or a task raises an unhandled exception. It should not
254+
# exit just because the user coroutine finishes.
224255

225256
except anyio.get_cancelled_exc_class():
226257
logger.info('Development server task group cancelled (e.g., Ctrl+C).')

py/packages/genkit/src/genkit/core/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@
2323
GENKIT_VERSION = DEFAULT_GENKIT_VERSION
2424

2525
GENKIT_CLIENT_HEADER = f'genkit-python/{DEFAULT_GENKIT_VERSION}'
26+
27+
# Reflection API specification version.
28+
# This should match the value in JS (genkit-tools).
29+
GENKIT_REFLECTION_API_SPEC_VERSION = 1

0 commit comments

Comments
 (0)