Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4530341
Initial plan
Copilot Jan 28, 2026
d529dee
Add Resource class and integrate with Component, RayProcess, and Tuner
Copilot Jan 28, 2026
f59376c
Add comprehensive tests for Resource implementation
Copilot Jan 28, 2026
0219833
Add examples for resource requirements in Ray processes
Copilot Jan 28, 2026
3ec7854
Fix type annotations and address code review feedback
Copilot Jan 28, 2026
9515154
Address code review feedback: generalize suffixes, make memory intege…
Copilot Jan 31, 2026
c400a97
Move suffixes to top
toby-coleman Feb 3, 2026
a38aa88
Rework logic for tune placement bundles
toby-coleman Feb 3, 2026
573a0e1
Merge remote-tracking branch 'origin/main' into copilot/implement-res…
toby-coleman Feb 7, 2026
31092ab
Docs config improved for schemas
toby-coleman Feb 7, 2026
6bf7017
Tidy up documentation on resources example
toby-coleman Feb 7, 2026
bae7c46
Add resources for smoke tests
toby-coleman Feb 7, 2026
a9a4ef5
Make sure resources are available in runner
toby-coleman Feb 7, 2026
c14cbaf
Remove non-existent cross refs in docs
toby-coleman Feb 7, 2026
2588544
Fixup types
toby-coleman Feb 7, 2026
d2e7aa2
Test that actor is passed the resource requirements
toby-coleman Feb 7, 2026
3301419
Increase timeouts on test CI
toby-coleman Feb 7, 2026
9b361df
Typing
toby-coleman Feb 7, 2026
d7c87c6
Fixup test
toby-coleman Feb 7, 2026
a3c9555
Increase tuner test timeout
toby-coleman Feb 7, 2026
c9aa1ce
Correct format for placement groups in Ray tune
toby-coleman Feb 7, 2026
155c571
Typo
toby-coleman Feb 8, 2026
a562b88
Coverage
toby-coleman Feb 8, 2026
5fb86bc
refactor: Resource value validation
chrisk314 Feb 11, 2026
b0fd414
fix: Inconsistent type annotation syntax
chrisk314 Feb 11, 2026
3f8031f
Allows declaring resources at Component declaration time
chrisk314 Feb 11, 2026
82a76cf
Enhances tests
chrisk314 Feb 11, 2026
2892fe9
Ignore lint error
chrisk314 Feb 11, 2026
ea310c1
Update docs and examples to show class-level resource declaration
Copilot Feb 11, 2026
f30d38d
Update docs
toby-coleman Feb 14, 2026
a1f6af2
Update snippet
toby-coleman Feb 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/lint-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
test-unit:
name: Tests - unit
runs-on: ubuntu-latest
timeout-minutes: 5
timeout-minutes: 8
strategy:
matrix:
python_version: [3.12, 3.13]
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
test-integration:
name: Tests - integration
runs-on: ubuntu-latest
timeout-minutes: 6
timeout-minutes: 10
strategy:
matrix:
python_version: [3.12, 3.13]
Expand Down Expand Up @@ -157,7 +157,7 @@ jobs:
test-integration-tuner:
name: Tests - integration:tuner
runs-on: ubuntu-latest
timeout-minutes: 5
timeout-minutes: 8
strategy:
matrix:
python_version: [3.12, 3.13]
Expand Down
2 changes: 1 addition & 1 deletion docs/api/schemas/schemas.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
::: plugboard.schemas
::: plugboard_schemas
31 changes: 31 additions & 0 deletions docs/examples/tutorials/running-in-parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,34 @@ Specifying the process type and channel builder type in the YAML is the only cha

1. Tell Plugboard to use a [`RayProcess`][plugboard.process.RayProcess] instead of the default [`LocalProcess`][plugboard.process.LocalProcess].
2. Also change the connector builder to [`RayConnector`][plugboard.connector.RayConnector], which will build [`RayChannel`][plugboard.connector.RayChannel] objects when creating the `Process`.

## Specifying resource requirements

When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks) and you are running on a Ray cluster.

!!! tip
Normally Ray will start automatically when you are using Plugboard locally. If you want to start a separate Ray instance, for example so that you can control the configuration options, you can launch it from the [CLI](https://docs.ray.io/en/latest/ray-core/starting-ray.html). For example, this command will start a Ray instance with enough resources to run the example below.

```sh
uv run ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 5}'
```

For example, you can specify [`Resource`][plugboard.schemas.Resource] requirements like this when creating components:

```python
--8<-- "examples/tutorials/004_using_ray/resources_example.py:resources"
```

Or in YAML:

```yaml
--8<-- "examples/tutorials/004_using_ray/resources-example.yaml:10:"
```

1. Requires 1 CPU.
2. Requires 2 CPU resources.
3. Requires 0.5 CPU, this time specified in Kubernetes-style format, i.e. 500 milli CPUs.
4. Requires 1 GPU.
5. Requires a custom resource called `custom_hardware`. This needs to specified in the configuration of your Ray cluster to make it available.

See the [Ray documentation](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html) for more information about specifying resource requirements.
40 changes: 40 additions & 0 deletions examples/tutorials/004_using_ray/resources-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Example YAML configuration with resource requirements
Comment thread
toby-coleman marked this conversation as resolved.
#
# This configuration demonstrates how to specify resource requirements
# for components in a Plugboard process. Resources can be specified as:
# - Numerical values: cpu: 2.0
# - Milli-units: cpu: "250m" (equals 0.25)
# - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes)
plugboard:
process:
type: plugboard.process.RayProcess
connector_builder:
type: plugboard.connector.RayConnector
args:
name: resource-example-process
components:
- type: examples.tutorials.004_using_ray.resources_example.DataProducer
args:
name: producer
iters: 10
resources:
cpu: 1.0 # (1)!
- type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask
args:
name: cpu-task
resources:
cpu: 2.0 # (2)!
memory: "512Mi" # Requires 512MB memory
- type: examples.tutorials.004_using_ray.resources_example.GPUTask
args:
name: gpu-task
resources:
cpu: "500m" # (3)!
gpu: 1 # (4)!
resources:
custom_hardware: 2 # (5)!
connectors:
- source: producer.output
target: cpu-task.x
- source: cpu-task.y
target: gpu-task.data
84 changes: 84 additions & 0 deletions examples/tutorials/004_using_ray/resources_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Example demonstrating resource requirements for components in RayProcess."""

import asyncio
import typing as _t

import ray

from plugboard.component import Component, IOController as IO
from plugboard.connector import RayConnector
from plugboard.process import RayProcess
from plugboard.schemas import ComponentArgsDict, ConnectorSpec, Resource


class CPUIntensiveTask(Component):
"""Component that requires more CPU resources."""

io = IO(inputs=["x"], outputs=["y"])

async def step(self) -> None:
# Simulate CPU-intensive work
result = sum(i**2 for i in range(int(self.x * 10000)))
self.y = result


class GPUTask(Component):
"""Component that requires GPU resources."""

io = IO(inputs=["data"], outputs=["result"])

async def step(self) -> None:
# Simulate GPU computation
self.result = self.data * 2


class DataProducer(Component):
"""Produces data for processing."""

io = IO(outputs=["output"])

def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self._iters = iters

async def init(self) -> None:
self._seq = iter(range(self._iters))

async def step(self) -> None:
try:
self.output = next(self._seq)
except StopIteration:
await self.io.close()


async def main() -> None:
"""Run the process with resource-constrained components."""
# Define resource requirements for components
# --8<-- [start:resources]
cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs
gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU

process = RayProcess(
components=[
DataProducer(name="producer", iters=5, resources=cpu_resources),
CPUIntensiveTask(name="cpu-task", resources=cpu_resources),
GPUTask(name="gpu-task", resources=gpu_resources),
],
connectors=[
RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")),
RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")),
],
)
# --8<-- [end:resources]

async with process:
await process.run()

print("Process completed successfully!")


if __name__ == "__main__":
if not ray.is_initialized():
# Ray must be initialised with the necessary resources
ray.init(num_cpus=5, num_gpus=1, resources={"custom_hardware": 10}, include_dashboard=True)
asyncio.run(main())
3 changes: 2 additions & 1 deletion mkdocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ plugins:
default_handler: python
handlers:
python:
paths: [src]
paths: [plugboard, plugboard-schemas]
options:
docstring_style: google
show_source: false
Expand Down Expand Up @@ -113,6 +113,7 @@ watch:
- docs
- examples
- plugboard
- plugboard-schemas
- README.md
- CONTRIBUTING.md

Expand Down
3 changes: 2 additions & 1 deletion plugboard-schemas/plugboard_schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from importlib.metadata import version

from ._common import PlugboardBaseModel
from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec
from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec, Resource
from .config import ConfigSpec, ProcessConfigSpec
from .connector import (
DEFAULT_CONNECTOR_CLS_PATH,
Expand Down Expand Up @@ -77,6 +77,7 @@
"ProcessArgsDict",
"ProcessArgsSpec",
"RAY_STATE_BACKEND_CLS_PATH",
"Resource",
"StateBackendSpec",
"StateBackendArgsDict",
"StateBackendArgsSpec",
Expand Down
124 changes: 123 additions & 1 deletion plugboard-schemas/plugboard_schemas/component.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,139 @@
"""Provides `ComponentSpec` class."""

import re
import typing as _t

from pydantic import Field
from pydantic import Field, field_validator

from ._common import PlugboardBaseModel


RESOURCE_SUFFIXES = {
"n": 1e-9,
"u": 1e-6,
"m": 1e-3,
"k": 1e3,
"M": 1e6,
"G": 1e9,
"T": 1e12,
"P": 1e15,
"E": 1e18,
"Ki": 1024,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
"Pi": 1024**5,
"Ei": 1024**6,
}


def _parse_resource_value(value: str | float | int) -> float:
"""Parse a resource value from string or number.

Supports:
- Direct numerical values: 1, 0.5, 2.0
Comment thread
toby-coleman marked this conversation as resolved.
- Decimal SI prefixes: n, u, m, k, M, G, T, P, E (e.g., "250m" -> 0.25, "5k" -> 5000)
- Binary prefixes: Ki, Mi, Gi, Ti, Pi, Ei (e.g., "10Mi" -> 10485760)

Args:
value: The resource value to parse.

Returns:
The parsed float value.

Raises:
ValueError: If the value format is invalid.
"""
if isinstance(value, (int, float)):
return float(value)

# Handle string values
value = value.strip()

# Sort by length (longest first) to match "Ki" before "k", etc.
for suffix in sorted(RESOURCE_SUFFIXES.keys(), key=len, reverse=True):
if value.endswith(suffix):
# Use re.escape to safely escape the suffix in the regex pattern
pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$"
match = re.match(pattern, value)
if match:
return float(match.group(1)) * RESOURCE_SUFFIXES[suffix]
raise ValueError(f"Invalid format for suffix '{suffix}': {value}")

# Try to parse as a plain number
try:
return float(value)
except ValueError:
raise ValueError(f"Invalid resource value format: {value}")


class Resource(PlugboardBaseModel):
"""Resource requirements for a component.

Supports specification of CPU, GPU, memory, and custom resources.
Values can be specified as numbers or strings with units (e.g., "250m" for 0.25, "10Mi" for
10 * 1024 * 1024).

Attributes:
cpu: CPU requirement (default: 0.001).
gpu: GPU requirement (default: 0).
memory: Memory requirement in bytes as an integer (default: 0).
resources: Custom resource requirements as a dictionary.
"""

cpu: float = 0.001
gpu: float = 0
memory: int = 0
resources: dict[str, float] = Field(default_factory=dict)

@field_validator("cpu", "gpu", mode="before")
@classmethod
def _parse_cpu_gpu_field(cls, v: str | float | int) -> float:
"""Validate and parse CPU and GPU fields."""
return _parse_resource_value(v)

@field_validator("memory", mode="before")
@classmethod
def _parse_memory_field(cls, v: str | float | int) -> int:
"""Validate and parse memory field as integer bytes."""
return int(_parse_resource_value(v))

@field_validator("resources", mode="before")
@classmethod
def _parse_resources_dict(cls, v: dict[str, str | float | int]) -> dict[str, float]:
"""Validate and parse custom resources dictionary."""
return {key: _parse_resource_value(value) for key, value in v.items()}

def to_ray_options(self) -> dict[str, _t.Any]:
"""Convert resource requirements to Ray actor options.

Returns:
Dictionary of Ray actor options.
"""
options: dict[str, _t.Any] = {}

if self.cpu > 0:
options["num_cpus"] = self.cpu
if self.gpu > 0:
options["num_gpus"] = self.gpu
if self.memory > 0:
options["memory"] = self.memory

# Add custom resources
if self.resources:
options["resources"] = self.resources

return options


class ComponentArgsDict(_t.TypedDict):
"""`TypedDict` of the [`Component`][plugboard.component.Component] constructor arguments."""

name: str
initial_values: _t.NotRequired[dict[str, _t.Any] | None]
parameters: _t.NotRequired[dict[str, _t.Any] | None]
constraints: _t.NotRequired[dict[str, _t.Any] | None]
resources: _t.NotRequired[Resource | None]


class ComponentArgsSpec(PlugboardBaseModel, extra="allow"):
Expand All @@ -24,12 +144,14 @@ class ComponentArgsSpec(PlugboardBaseModel, extra="allow"):
initial_values: Initial values for the `Component`.
parameters: Parameters for the `Component`.
constraints: Constraints for the `Component`.
resources: Resource requirements for the `Component`.
"""

name: str = Field(pattern=r"^([a-zA-Z_][a-zA-Z0-9_-]*)$")
initial_values: dict[str, _t.Any] = {}
parameters: dict[str, _t.Any] = {}
constraints: dict[str, _t.Any] = {}
resources: "Resource | None" = None


class ComponentSpec(PlugboardBaseModel):
Expand Down
Loading
Loading