Commit df2a159

Add tutorial for launching workers on separate machines (#213)
1 parent 675fc86 commit df2a159

3 files changed: +73 -6 lines changed

.github/workflows/examples-calc-x.yml

Lines changed: 25 additions & 5 deletions
@@ -156,16 +156,16 @@ jobs:
 
       - name: Calc-X training with external store
         run: |
-          set -ex
+          set -euo pipefail
           source .venv/bin/activate
           cd examples/calc_x
           ../../scripts/restart_ray.sh
 
           agl store --port 4747 &
           sleep 5
-          AGL_MANAGED_STORE=0 AGL_CURRENT_ROLE=runner python train_calc_agent.py --external-store-address http://localhost:4747 --val-file data/test_mini.parquet --ci &
+          AGL_MANAGED_STORE=0 AGL_CURRENT_ROLE=runner python train_calc_agent.py --external-store-address http://localhost:4747 --val-file data/test_mini.parquet --ci-fast &
           sleep 5
-          AGL_MANAGED_STORE=0 AGL_CURRENT_ROLE=algorithm python train_calc_agent.py --external-store-address http://localhost:4747 --val-file data/test_mini.parquet --ci
+          AGL_MANAGED_STORE=0 AGL_CURRENT_ROLE=algorithm python train_calc_agent.py --external-store-address http://localhost:4747 --val-file data/test_mini.parquet --ci-fast
 
           pkill -f agl && echo "SIGTERM sent to agl" || echo "No agl process found"
           while pgrep -f agl; do
@@ -178,10 +178,30 @@ jobs:
             sleep 5
           done
           echo "train_calc_agent.py has finished."
-
-          sleep 10
         shell: bash
         env:
           WANDB_BASE_URL: ${{ secrets.MSR_WANDB_BASE_URL }}
           WANDB_API_KEY: ${{ secrets.MSR_WANDB_API_KEY }}
         id: calc_x_train_external_store
+
+      - name: Calc-X training with role-based environment variables
+        run: |
+          set -euo pipefail
+          source .venv/bin/activate
+          cd examples/calc_x
+          ../../scripts/restart_ray.sh
+
+          PYTHONUNBUFFERED=1 AGL_SERVER_HOST=127.0.0.1 AGL_SERVER_PORT=5858 AGL_CURRENT_ROLE=runner python train_calc_agent.py --val-file data/test_mini.parquet --ci-fast &
+          sleep 5
+          PYTHONUNBUFFERED=1 AGL_SERVER_HOST=0.0.0.0 AGL_SERVER_PORT=5858 AGL_CURRENT_ROLE=algorithm python train_calc_agent.py --val-file data/test_mini.parquet --ci-fast
+
+          pkill -f train_calc_agent.py && echo "SIGTERM sent to train_calc_agent.py" || echo "No train_calc_agent.py process found"
+          while pgrep -f train_calc_agent.py; do
+            echo "Waiting for train_calc_agent.py to finish..."
+            sleep 5
+          done
+          echo "train_calc_agent.py has finished."
+        shell: bash
+        env:
+          WANDB_BASE_URL: ${{ secrets.MSR_WANDB_BASE_URL }}
+          WANDB_API_KEY: ${{ secrets.MSR_WANDB_API_KEY }}
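The new role-based step above also works as a template for a quick local experiment outside CI. A minimal sketch, assuming the same setup the workflow uses (repository root, a `.venv` environment, and Ray started via `scripts/restart_ray.sh`), with port 5858 free:

```bash
# Launch the runner role in the background, then the algorithm role in the foreground.
# Both processes share AGL_SERVER_HOST/AGL_SERVER_PORT, so they attach to the same store.
source .venv/bin/activate
cd examples/calc_x
../../scripts/restart_ray.sh

AGL_SERVER_HOST=127.0.0.1 AGL_SERVER_PORT=5858 AGL_CURRENT_ROLE=runner \
  python train_calc_agent.py --val-file data/test_mini.parquet --ci-fast &
sleep 5
AGL_SERVER_HOST=0.0.0.0 AGL_SERVER_PORT=5858 AGL_CURRENT_ROLE=algorithm \
  python train_calc_agent.py --val-file data/test_mini.parquet --ci-fast
```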

docs/tutorials/parallelize.md

Lines changed: 33 additions & 0 deletions
@@ -167,6 +167,39 @@ Set `AGL_SERVER_HOST` and `AGL_SERVER_PORT` if you prefer environment-based conf
 
 Algorithms sometimes require heterogeneous computation resources, such as GPU accelerators, while runners sometimes require a specific environment to run because many agent frameworks are fragile in their dependencies. A role-based launch pattern helps you place the algorithm on a dedicated machine with more GPU memory, while runners can live on another machine with more flexible dependencies. This is possible via `AGL_CURRENT_ROLE="algorithm"` or `AGL_CURRENT_ROLE="runner"` environment variables. When running on different machines, you also need to set `AGL_SERVER_HOST` and `AGL_SERVER_PORT` to the IP address and port of the algorithm machine. You might recognize that this convention is very similar to `MASTER_ADDR` and `MASTER_PORT` in [PyTorch distributed training](https://docs.pytorch.org/docs/stable/notes/ddp.html).
 
+### Launching Algorithm and Runner Roles on Separate Machines
+
+When you want to stretch the algorithm onto a GPU-rich machine and keep rollout workers close to the data source (or on machines with a more permissive dependency stack), launch the same training script in different terminals with role-specific environment variables. The client–server strategy will route each process to the right side of the queue as long as they share the same `AGL_SERVER_HOST`/`AGL_SERVER_PORT` pair.
+
+**1. Pick an address and port for the store.** Decide which machine will host the algorithm. Choose a TCP port that can be reached by the runner machines (for example, open it in your firewall configuration). In this example we will use `10.0.0.4:4747`.
+
+**2. Start the algorithm process.** On the machine that should run the algorithm, expose the store by binding to all network interfaces and mark the role as `algorithm`.
+
+```bash
+export AGL_SERVER_HOST=0.0.0.0
+export AGL_SERVER_PORT=4747
+export AGL_CURRENT_ROLE=algorithm
+
+python train_calc_agent.py
+```
+
+Leaving `AGL_MANAGED_STORE` unset (or setting it to `1`) lets the strategy create the [`LightningStoreServer`][agentlightning.LightningStoreServer] for you. Otherwise, you can use the method in the previous section to create a store on your own.
+
+**3. Start rollout workers on remote machines.** Every runner machine should point to the algorithm host and declare itself as the `runner` role. You can start multiple processes per machine or repeat the command on additional hosts.
+
+```bash
+export AGL_SERVER_HOST=10.0.0.4
+export AGL_SERVER_PORT=4747
+export AGL_CURRENT_ROLE=runner
+python train_calc_agent.py --n-runners 4
+```
+
+The runner process automatically connects via [`LightningStoreClient`][agentlightning.LightningStoreClient]. Adjust `--n-runners` to spawn the desired number of worker processes on that machine.
+
+**4. Scale out as needed.** Repeat step 3 on as many machines as you need. When you are done, stop the algorithm process. However, since the runners are on different machines, the strategy WILL NOT send a cooperative stop signal to the connected runners. So you need to kill the runners on your own.
+
+This role-based launch mirrors what [`Trainer.fit`][agentlightning.Trainer.fit] does inside a single machine while letting you spread work across a fleet. Because every process shares the same training script, you keep a single source of truth for dataset loading, adapters, and tracers, but you can tune compute resources independently for the algorithm and rollout workers.
+
 ### Shared-memory Strategy
 
 [`SharedMemoryExecutionStrategy`][agentlightning.SharedMemoryExecutionStrategy] keeps everything inside one process. The runner runs on the main thread (by default) while the algorithm lives on a Python thread guarded by [`LightningStoreThreaded`][agentlightning.LightningStoreThreaded].
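Step 4 of the new tutorial notes that the strategy will not stop remote runners for you. A hedged sketch of cleaning them up by hand, assuming hypothetical runner hosts `runner-1` and `runner-2` reachable over SSH and workers started as `train_calc_agent.py` processes (the same pattern the CI workflow uses locally with `pkill`):

```bash
# Hypothetical hostnames; replace with your actual runner machines.
for host in runner-1 runner-2; do
  # Ask each runner machine to terminate its training worker processes.
  ssh "$host" 'pkill -f train_calc_agent.py || echo "no runner processes on $(hostname)"'
done
```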

examples/calc_x/train_calc_agent.py

Lines changed: 15 additions & 1 deletion
@@ -105,6 +105,7 @@ def train(
     model: Optional[str],
     llm_proxy: bool,
     ci: bool,
+    ci_fast: bool,
     n_runners: int,
     external_store_address: str,
 ):
@@ -117,6 +118,7 @@ def train(
         llm_proxy: Whether to enable LLM Proxy tracing/adapter.
         ci: Whether to run a minimal CI-style training loop.
         n_runners: The number of runners for the Trainer.
+        ci_fast: Whether to cap the training loop at a single step (implies CI toggles).
         external_store_address: Connects to an external store instead of creating a new one in memory.
     """
     # Load datasets (respect CLI file paths)
@@ -134,7 +136,7 @@ def train(
         config["actor_rollout_ref"]["model"]["path"] = model
 
     # CI toggle keeps everything else the same but you can tweak the lightweight bits here if desired
-    if ci:
+    if ci or ci_fast:
         # Config the experiment name and project name so that they are available to CI
         timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
         EXPERIMENT_NAME = f"calc_x_{timestamp}"
@@ -161,6 +163,11 @@ def train(
         config["trainer"]["project_name"] = PROJECT_NAME
         config["trainer"].pop("save_freq", None)
 
+    if ci_fast:
+        # Extra fast CI toggle for testing purposes.
+        config["trainer"]["total_training_steps"] = 1
+        config["trainer"]["test_freq"] = 1
+
     algorithm = agl.VERL(config)
 
     if external_store_address:
@@ -185,6 +192,9 @@ def main():
     parser.add_argument("--model", type=str, default=None, help="HF model id or path (optional)")
     parser.add_argument("--llm-proxy", action="store_true", help="Enable LLM Proxy tracing/adapter")
     parser.add_argument("--ci", action="store_true", help="Run a minimal CI-style training loop")
+    parser.add_argument(
+        "--ci-fast", action="store_true", help="Limit the training loop to a single step (implies --ci)"
+    )
     parser.add_argument("--n-runners", type=int, default=10, help="Number of runners for Trainer")
     parser.add_argument(
         "--external-store-address",
@@ -203,12 +213,16 @@ def main():
             "Otherwise the trainer will still try to manage the store lifecycle for you!"
         )
 
+    if args.ci_fast:
+        args.ci = True
+
     train(
         train_file=args.train_file,
         val_file=args.val_file,
         model=args.model,
         llm_proxy=args.llm_proxy,
         ci=args.ci,
+        ci_fast=args.ci_fast,
         n_runners=args.n_runners,
         external_store_address=args.external_store_address,
     )
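The `--ci-fast` flag added here caps training at a single step and implies `--ci`, which is what lets the new workflow steps above finish quickly. A minimal sketch of a local smoke test, assuming the bundled `examples/calc_x` data files:

```bash
# Single short run: --ci-fast implies --ci and limits the trainer to one training step.
cd examples/calc_x
python train_calc_agent.py --val-file data/test_mini.parquet --ci-fast
```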
