Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions examples/tutorials/004_using_ray/resources-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Example YAML configuration with resource requirements
#
# This configuration demonstrates how to specify resource requirements
# for components in a Plugboard process. Resources can be specified as:
# - Numerical values: cpu: 2.0
# - Milli-units: cpu: "250m" (equals 0.25)
# - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes)

plugboard:
process:
type: plugboard.process.RayProcess
connector_builder:
type: plugboard.connector.RayConnector
args:
name: resource-example-process
components:
- type: examples.tutorials.004_using_ray.resources_example.DataProducer
args:
name: producer
iters: 10
resources:
cpu: 1.0 # Requires 1 CPU

- type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask
args:
name: cpu-task
resources:
cpu: 2.0 # Requires 2 CPUs
memory: "512Mi" # Requires 512MB memory

- type: examples.tutorials.004_using_ray.resources_example.GPUTask
args:
name: gpu-task
resources:
cpu: "500m" # Requires 0.5 CPU (using milli-unit notation)
gpu: 1 # Requires 1 GPU
resources:
custom_hardware: 2 # Custom resource requirement

connectors:
- source: producer.output
target: cpu-task.x

- source: cpu-task.y
target: gpu-task.data
87 changes: 87 additions & 0 deletions examples/tutorials/004_using_ray/resources_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Example demonstrating resource requirements for components in RayProcess."""

import asyncio
import typing as _t

import ray

from plugboard.component import Component, IOController as IO
from plugboard.connector import RayConnector
from plugboard.process import RayProcess
from plugboard.schemas import ComponentArgsDict, ConnectorSpec, Resource


class CPUIntensiveTask(Component):
"""Component that requires more CPU resources."""

io = IO(inputs=["x"], outputs=["y"])

async def step(self) -> None:
# Simulate CPU-intensive work
result = sum(i**2 for i in range(int(self.x * 10000)))
self.y = result


class GPUTask(Component):
"""Component that requires GPU resources."""

io = IO(inputs=["data"], outputs=["result"])

async def step(self) -> None:
# Simulate GPU computation
self.result = self.data * 2


class DataProducer(Component):
"""Produces data for processing."""

io = IO(outputs=["output"])

def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self._iters = iters

async def init(self) -> None:
self._seq = iter(range(self._iters))

async def step(self) -> None:
try:
self.output = next(self._seq)
except StopIteration:
await self.io.close()


async def main() -> None:
"""Run the process with resource-constrained components."""
# Define resource requirements for components
cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs
gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU

process = RayProcess(
components=[
DataProducer(name="producer", iters=5, resources=cpu_resources),
CPUIntensiveTask(name="cpu-task", resources=cpu_resources),
GPUTask(name="gpu-task", resources=gpu_resources),
],
connectors=[
RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")),
RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")),
],
)

async with process:
await process.run()

print("Process completed successfully!")
print(f"Final result from GPU task: {process.components['gpu-task'].result}")


if __name__ == "__main__":
# Initialize Ray
ray.init()

# Run the process
asyncio.run(main())

# Shutdown Ray
ray.shutdown()
3 changes: 2 additions & 1 deletion plugboard-schemas/plugboard_schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from importlib.metadata import version

from ._common import PlugboardBaseModel
from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec
from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec, Resource
from .config import ConfigSpec, ProcessConfigSpec
from .connector import (
DEFAULT_CONNECTOR_CLS_PATH,
Expand Down Expand Up @@ -68,6 +68,7 @@
"ProcessSpec",
"ProcessArgsDict",
"ProcessArgsSpec",
"Resource",
"StateBackendSpec",
"StateBackendArgsDict",
"StateBackendArgsSpec",
Expand Down
115 changes: 114 additions & 1 deletion plugboard-schemas/plugboard_schemas/component.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,130 @@
"""Provides `ComponentSpec` class."""

import re
import typing as _t

from pydantic import Field
from pydantic import Field, field_validator

from ._common import PlugboardBaseModel


def _parse_resource_value(value: str | float | int) -> float:
"""Parse a resource value from string or number.

Supports:
- Direct numerical values: 1, 0.5, 2.0
Copy link
Contributor

@toby-coleman toby-coleman Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot Can you make this more general? For example by using/supporting any of the following suffixes:

    suffixes = {
        'n': 1e-9, 'u': 1e-6, 'm': 1e-3,  # Decimal SI
        'k': 1e3, 'M': 1e6, 'G': 1e9, 'T': 1e12, 'P': 1e15, 'E': 1e18,
        'Ki': 1024, 'Mi': 1024**2, 'Gi': 1024**3, 'Ti': 1024**4, 'Pi': 1024**5, 'Ei': 1024**6
    }

- Milli-units: "250m" -> 0.25
- Memory units: "10Mi" -> 10485760 (10 * 1024 * 1024)
- Memory units: "10Gi" -> 10737418240 (10 * 1024 * 1024 * 1024)

Args:
value: The resource value to parse.

Returns:
The parsed float value.

Raises:
ValueError: If the value format is invalid.
"""
if isinstance(value, (int, float)):
return float(value)

# Handle string values
value = value.strip()

# Handle milli-units (e.g., "250m" -> 0.25)
if value.endswith("m"):
match = re.match(r"^(\d+(?:\.\d+)?)m$", value)
if match:
return float(match.group(1)) / 1000.0
raise ValueError(f"Invalid milli-unit format: {value}")

# Handle memory units
# Ki = 1024, Mi = 1024^2, Gi = 1024^3, Ti = 1024^4
memory_units = {
"Ki": 1024,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
}

for suffix, multiplier in memory_units.items():
if value.endswith(suffix):
# Use re.escape to safely escape the suffix in the regex pattern
pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$"
match = re.match(pattern, value)
if match:
return float(match.group(1)) * multiplier
raise ValueError(f"Invalid memory unit format: {value}")

# Try to parse as a plain number
try:
return float(value)
except ValueError:
raise ValueError(f"Invalid resource value format: {value}")


class Resource(PlugboardBaseModel):
"""Resource requirements for a component.

Supports specification of CPU, GPU, memory, and custom resources.
Values can be specified as numbers or strings with units (e.g., "250m" for 0.25, "10Mi" for
10 * 1024 * 1024).

Attributes:
cpu: CPU requirement (default: 0.001).
gpu: GPU requirement (default: 0).
memory: Memory requirement in bytes (default: 0).
resources: Custom resource requirements as a dictionary.
"""

cpu: float = 0.001
gpu: float = 0
memory: float = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory should be specified as an integer number of bytes

resources: dict[str, float] = Field(default_factory=dict)

@field_validator("cpu", "gpu", "memory", mode="before")
@classmethod
def _parse_resource_field(cls, v: str | float | int) -> float:
"""Validate and parse resource fields."""
return _parse_resource_value(v)

@field_validator("resources", mode="before")
@classmethod
def _parse_resources_dict(cls, v: dict[str, str | float | int]) -> dict[str, float]:
"""Validate and parse custom resources dictionary."""
return {key: _parse_resource_value(value) for key, value in v.items()}

def to_ray_options(self) -> dict[str, _t.Any]:
"""Convert resource requirements to Ray actor options.

Returns:
Dictionary of Ray actor options.
"""
options: dict[str, _t.Any] = {}

if self.cpu > 0:
options["num_cpus"] = self.cpu
if self.gpu > 0:
options["num_gpus"] = self.gpu
if self.memory > 0:
options["memory"] = self.memory

# Add custom resources
if self.resources:
options["resources"] = self.resources

return options


class ComponentArgsDict(_t.TypedDict):
"""`TypedDict` of the [`Component`][plugboard.component.Component] constructor arguments."""

name: str
initial_values: _t.NotRequired[dict[str, _t.Any] | None]
parameters: _t.NotRequired[dict[str, _t.Any] | None]
constraints: _t.NotRequired[dict[str, _t.Any] | None]
resources: _t.NotRequired["Resource | None"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resources: _t.NotRequired["Resource | None"]
resources: _t.NotRequired[Resource | None]



class ComponentArgsSpec(PlugboardBaseModel, extra="allow"):
Expand All @@ -24,12 +135,14 @@ class ComponentArgsSpec(PlugboardBaseModel, extra="allow"):
initial_values: Initial values for the `Component`.
parameters: Parameters for the `Component`.
constraints: Constraints for the `Component`.
resources: Resource requirements for the `Component`.
"""

name: str = Field(pattern=r"^([a-zA-Z_][a-zA-Z0-9_-]*)$")
initial_values: dict[str, _t.Any] = {}
parameters: dict[str, _t.Any] = {}
constraints: dict[str, _t.Any] = {}
resources: "Resource | None" = None


class ComponentSpec(PlugboardBaseModel):
Expand Down
11 changes: 11 additions & 0 deletions plugboard/component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker


if _t.TYPE_CHECKING:
from plugboard.schemas import Resource


_io_key_in: str = str(IODirection.INPUT)
_io_key_out: str = str(IODirection.OUTPUT)

Expand Down Expand Up @@ -56,13 +60,15 @@ def __init__(
parameters: _t.Optional[dict[str, _t.Any]] = None,
state: _t.Optional[StateBackend] = None,
constraints: _t.Optional[dict] = None,
resources: _t.Optional["Resource"] = None,
) -> None:
self.name = name
self._initial_values = initial_values or {}
self._constraints = constraints or {}
self._parameters = parameters or {}
self._state: _t.Optional[StateBackend] = state
self._state_is_connected = False
self._resources = resources

setattr(self, "init", self._handle_init_wrapper())
setattr(self, "step", self._handle_step_wrapper())
Expand Down Expand Up @@ -120,6 +126,11 @@ def parameters(self) -> dict[str, _t.Any]:
"""Gets the parameters of the component."""
return self._parameters

@property
def resources(self) -> _t.Optional["Resource"]:
"""Gets the resource requirements of the component."""
return self._resources

@classmethod
def _configure_io(cls) -> None:
# Get all parent classes that are Component subclasses
Expand Down
17 changes: 14 additions & 3 deletions plugboard/process/ray_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,20 @@ def _create_component_actor(self, component: Component) -> _t.Any:
name = component.id
args = component.export()["args"]
actor_cls = build_actor_wrapper(component.__class__)
return ray.remote(num_cpus=0, name=name, namespace=self._namespace)( # type: ignore
actor_cls
).remote(**args)

# Get resource requirements from component
from plugboard.schemas import Resource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this import to the top


resources = component.resources
if resources is None:
# Use default resources if not specified
resources = Resource()

ray_options = resources.to_ray_options()
ray_options["name"] = name
ray_options["namespace"] = self._namespace

return ray.remote(**ray_options)(actor_cls).remote(**args) # type: ignore

async def _update_component_attributes(self) -> None:
"""Updates attributes on local components from remote actors."""
Expand Down
2 changes: 2 additions & 0 deletions plugboard/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
ProcessArgsSpec,
ProcessConfigSpec,
ProcessSpec,
Resource,
StateBackendArgsDict,
StateBackendArgsSpec,
StateBackendSpec,
Expand Down Expand Up @@ -73,6 +74,7 @@
"ProcessSpec",
"ProcessArgsDict",
"ProcessArgsSpec",
"Resource",
"StateBackendSpec",
"StateBackendArgsDict",
"StateBackendArgsSpec",
Expand Down
Loading