Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions py/packages/genkit/src/genkit/_core/_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,33 @@ def extract_action_args_and_types(
# =============================================================================


GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR = '_genkit_dynamic_action_provider'


def parse_dap_qualified_name(name: str) -> tuple[str, str, str] | None:
"""Parse DAP-qualified segment ``provider:innerKind/innerName``.

Used when the action key kind is ``dynamic-action-provider`` and the name
references a nested action exposed by a provider (e.g. MCP tools).

Returns:
``(provider_name, inner_kind, inner_name)`` if the string matches the
pattern; otherwise ``None`` so callers can treat the name as a plain
dynamic-action-provider id.
"""
if ':' not in name or '/' not in name:
return None
colon = name.index(':')
provider = name[:colon]
rest = name[colon + 1 :]
if '/' not in rest:
return None
inner_kind, inner_name = rest.split('/', 1)
if not provider or not inner_kind or not inner_name:
return None
return (provider, inner_kind, inner_name)


def parse_action_key(key: str) -> tuple[ActionKind, str]:
"""Parse '/<kind>/<name>' key into (ActionKind, name)."""
tokens = key.split('/')
Expand Down
10 changes: 8 additions & 2 deletions py/packages/genkit/src/genkit/_core/_dap.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
from collections.abc import Awaitable, Callable, Mapping
from typing import Any

from genkit._core._action import Action, ActionKind
from genkit._core._action import (
GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR,
Action,
ActionKind,
)
from genkit._core._registry import Registry

ActionMetadataLike = Mapping[str, object]
Expand Down Expand Up @@ -151,4 +155,6 @@ async def dap_action(input: DapMetadata) -> DapMetadata:
metadata={**(metadata or {}), 'type': 'dynamic-action-provider'},
)

return DynamicActionProvider(action, fn, cache_ttl_millis)
dap = DynamicActionProvider(action, fn, cache_ttl_millis)
setattr(action, GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR, dap)
return dap
157 changes: 135 additions & 22 deletions py/packages/genkit/src/genkit/_core/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

"""Registry for managing Genkit resources and actions."""

from __future__ import annotations

import asyncio
import threading
from collections.abc import Awaitable, Callable
Expand All @@ -26,13 +28,15 @@
from typing_extensions import Never, TypeVar

from genkit._core._action import (
GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR,
Action,
ActionKind,
ActionMetadata,
ActionName,
ActionRunContext,
SpanAttributeValue,
parse_action_key,
parse_dap_qualified_name,
)
from genkit._core._logger import get_logger
from genkit._core._model import (
Expand Down Expand Up @@ -84,17 +88,29 @@ class Registry:
plugins, and schemas. It provides methods for registering new resources and
looking them up at runtime.

Supports a **child registry** pattern (see ``new_child``): a child registry
delegates lookups to its parent when a name is not found locally. This is
used to create cheap, ephemeral registries scoped to a single generate call
(for DAP-resolved tools) without polluting the root registry.

This class is thread-safe and can be safely shared between multiple threads.

Attributes:
entries: A nested dictionary mapping ActionKind to a dictionary of
action names and their corresponding Action instances.
"""

default_model: str | None = None
def __init__(self, parent: Registry | None = None) -> None:
"""Initialize a Registry instance.

def __init__(self) -> None:
"""Initialize an empty Registry instance."""
Args:
parent: Optional parent registry. When provided this is a *child*
registry that falls back to the parent for any lookup that
returns ``None`` locally. Use ``new_child()`` as the
preferred factory rather than passing ``parent`` directly.
"""
self._parent: Registry | None = parent
self._default_model: str | None = None
self._entries: ActionStore = {}
self._value_by_kind_and_name: dict[str, dict[str, object]] = {}
self._schemas_by_name: dict[str, dict[str, object]] = {}
Expand All @@ -111,7 +127,10 @@ def __init__(self) -> None:
async def async_schema_resolver(name: str) -> dict[str, object] | str:
return self.lookup_schema(name) or name

self.dotprompt: Dotprompt = Dotprompt(schema_resolver=async_schema_resolver)
# Children share the parent's Dotprompt instance (prompts are global).
self.dotprompt: Dotprompt = (
parent.dotprompt if parent is not None else Dotprompt(schema_resolver=async_schema_resolver)
)
# TODO(#4352): Figure out how to set this.
self.api_stability: str = 'stable'

Expand All @@ -128,6 +147,44 @@ async def async_schema_resolver(name: str) -> dict[str, object] | str:
self._plugins: dict[str, Plugin] = {}
self._plugin_init_tasks: dict[str, asyncio.Task[None]] = {}

# -------------------------------------------------------------------------
# Child registry support
# -------------------------------------------------------------------------

def new_child(self) -> Registry:
"""Create a cheap child registry that inherits from this registry.

Child registries are used to create short-lived, ephemeral scopes (e.g.
per-generate-call tool registrations from a DAP) without polluting the
root registry. Any lookup that fails locally falls back to this parent.
Writes on the child never propagate back to the parent.

Returns:
A new ``Registry`` whose parent is ``self``.
"""
return Registry(parent=self)

@property
def parent(self) -> Registry | None:
"""The parent registry, or ``None`` if this is a root registry."""
return self._parent

@property
def is_child(self) -> bool:
"""``True`` if this registry has a parent."""
return self._parent is not None

@property
def default_model(self) -> str | None:
"""The default model name, falling back to parent if not set locally."""
if self._default_model is not None:
return self._default_model
return self._parent.default_model if self._parent is not None else None

@default_model.setter
def default_model(self, value: str | None) -> None:
self._default_model = value

def register_action(
self,
kind: ActionKind,
Expand Down Expand Up @@ -238,10 +295,13 @@ def lookup_value(self, kind: str, name: str) -> object | None:
name: The name of the value (e.g., "json", "text").

Returns:
The value or None if not found.
The value or None if not found. Falls back to parent registry.
"""
with self._lock:
return self._value_by_kind_and_name.get(kind, {}).get(name)
local = self._value_by_kind_and_name.get(kind, {}).get(name)
if local is not None:
return local
return self._parent.lookup_value(kind, name) if self._parent is not None else None

def list_values(self, kind: str) -> list[str]:
"""List all values registered for a specific kind.
Expand Down Expand Up @@ -448,38 +508,73 @@ async def resolve_action(self, kind: ActionKind, name: str) -> Action | None:
else:
providers_dict = {}
providers = list(providers_dict.values())
for provider in providers:
for provider_action in providers:
dap = getattr(provider_action, GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR, None)
if dap is None:
continue
try:
response = await provider.run({'kind': kind, 'name': name})
if response.response:
self.register_action_instance(response.response)
return await self._trigger_lazy_loading(response.response)
resolved = await dap.get_action(str(kind), name)
if resolved is not None:
return resolved
except Exception as e:
logger.debug(
f'Dynamic action provider {provider.name} failed for {kind}/{name}',
f'Dynamic action provider {provider_action.name} failed for {kind}/{name}',
exc_info=e,
)
continue

# Final fallback: delegate to parent registry.
if self._parent is not None:
return await self._parent.resolve_action(kind, name)

return None

async def resolve_action_by_key(self, key: str) -> Action | None:
"""Resolve an action using its combined key string.

The key format is `<kind>/<name>`, where kind must be a valid
`ActionKind` and name may be prefixed with plugin namespace or unprefixed.
The key format is ``/<kind>/<name>``, where kind must be a valid
``ActionKind`` and name may be prefixed with plugin namespace or
unprefixed.

For nested actions exposed by a dynamic action provider, use
``/dynamic-action-provider/<provider>:<innerKind>/<innerName>`` (for
example ``/dynamic-action-provider/my-mcp:tool/echo``).

Args:
key: The action key in the format `<kind>/<name>`.
key: The action key in the format ``/<kind>/<name>``.

Returns:
The `Action` instance if found, None otherwise.
The ``Action`` instance if found, None otherwise.

Raises:
ValueError: If the key format is invalid, the kind is not a valid
`ActionKind`, or an unprefixed name is ambiguous.
``ActionKind``, or an unprefixed name is ambiguous.
"""
kind, name = parse_action_key(key)
if kind == ActionKind.DYNAMIC_ACTION_PROVIDER:
dap_parts = parse_dap_qualified_name(name)
if dap_parts is not None:
provider_name, inner_kind_str, inner_name = dap_parts
provider_action = await self.resolve_action(
ActionKind.DYNAMIC_ACTION_PROVIDER,
provider_name,
)
if provider_action is None:
return None
dap = getattr(provider_action, GENKIT_DYNAMIC_ACTION_PROVIDER_ATTR, None)
if dap is None:
return None
try:
resolved = await dap.get_action(inner_kind_str, inner_name)
except Exception as e:
logger.debug(
f'Dynamic action provider {provider_name} failed for '
f'qualified key {inner_kind_str}/{inner_name}',
exc_info=e,
)
return None
if resolved is None:
return None
return resolved
return await self.resolve_action(kind, name)

async def list_actions(self, allowed_kinds: list[ActionKind] | None = None) -> list[ActionMetadata]:
Expand All @@ -489,6 +584,9 @@ async def list_actions(self, allowed_kinds: list[ActionKind] | None = None) -> l
plugins. It does NOT trigger plugin initialization and does NOT consult
the registry's internal action store.

For a child registry, parent actions are included but child actions with
the same (kind, name) take precedence.

Args:
allowed_kinds: Optional list of action kinds to filter by.

Expand All @@ -499,6 +597,7 @@ async def list_actions(self, allowed_kinds: list[ActionKind] | None = None) -> l
ValueError: If a plugin returns invalid ActionMetadata.
"""
metas: list[ActionMetadata] = []
seen: set[tuple[ActionKind, str]] = set()
with self._lock:
plugins = list(self._plugins.items())
for plugin_name, plugin in plugins:
Expand All @@ -513,7 +612,15 @@ async def list_actions(self, allowed_kinds: list[ActionKind] | None = None) -> l

if allowed_kinds and meta.kind not in allowed_kinds:
continue
seen.add((meta.kind, meta.name))
metas.append(meta)

# Include parent actions not shadowed by local plugins.
if self._parent is not None:
for parent_meta in await self._parent.list_actions(allowed_kinds):
if (parent_meta.kind, parent_meta.name) not in seen:
metas.append(parent_meta)

return metas

def register_schema(self, name: str, schema: dict[str, object], schema_type: type[BaseModel] | None = None) -> None:
Expand Down Expand Up @@ -545,10 +652,13 @@ def lookup_schema(self, name: str) -> dict[str, object] | None:
name: The name of the schema to look up.

Returns:
The schema data if found, None otherwise.
The schema data if found, None otherwise. Falls back to parent.
"""
with self._lock:
return self._schemas_by_name.get(name)
local = self._schemas_by_name.get(name)
if local is not None:
return local
return self._parent.lookup_schema(name) if self._parent is not None else None

def lookup_schema_type(self, name: str) -> type[BaseModel] | None:
"""Looks up a schema's Pydantic type by name.
Expand All @@ -557,10 +667,13 @@ def lookup_schema_type(self, name: str) -> type[BaseModel] | None:
name: The name of the schema to look up.

Returns:
The Pydantic model class if found, None otherwise.
The Pydantic model class if found, None otherwise. Falls back to parent.
"""
with self._lock:
return self._schema_types_by_name.get(name)
local = self._schema_types_by_name.get(name)
if local is not None:
return local
return self._parent.lookup_schema_type(name) if self._parent is not None else None

# ===== Typed Action Lookups =====
#
Expand Down
Loading
Loading