Skip to content

Commit 02b065c

Browse files
ccclyutechkang
authored andcommitted
[trainer] refactor: move rollout log to inheritable trainer (#3576)
### What does this PR do? move log rollout logic to one standalone function that can be re-used in other trainers such as `DAPORayTrainer` etc and avoid duplicated code. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
1 parent 54c204a commit 02b065c

File tree

4 files changed

+40
-55
lines changed

4 files changed

+40
-55
lines changed

recipe/dapo/dapo_ray_trainer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,11 @@ def fit(self):
332332
actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"])
333333
metrics.update(actor_output_metrics)
334334

335+
# Log rollout generations if enabled
336+
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
337+
if rollout_data_dir:
338+
self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)
339+
335340
# validate
336341
if (
337342
self.val_reward_fn is not None

recipe/one_step_off_policy/ray_trainer.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -552,22 +552,7 @@ def fit(self):
552552
# Log rollout generations if enabled
553553
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
554554
if rollout_data_dir:
555-
with marked_timer("dump_rollout_generations", timing_raw, color="green"):
556-
inputs = self.tokenizer.batch_decode(batch.batch["prompts"], skip_special_tokens=True)
557-
outputs = self.tokenizer.batch_decode(batch.batch["responses"], skip_special_tokens=True)
558-
scores = batch.batch["token_level_scores"].sum(-1).cpu().tolist()
559-
sample_gts = [
560-
item.non_tensor_batch.get("reward_model", {}).get("ground_truth", None) for item in batch
561-
]
562-
563-
self._dump_generations(
564-
inputs=inputs,
565-
outputs=outputs,
566-
gts=sample_gts,
567-
scores=scores,
568-
reward_extra_infos_dict=reward_extra_infos_dict,
569-
dump_path=rollout_data_dir,
570-
)
555+
self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)
571556

572557
# validate
573558
if (

recipe/sppo/sppo_ray_trainer.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -308,22 +308,7 @@ def fit(self):
308308
# Log rollout generations if enabled
309309
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
310310
if rollout_data_dir:
311-
with simple_timer("dump_rollout_generations", timing_raw):
312-
print(batch.batch.keys())
313-
inputs = self.tokenizer.batch_decode(batch.batch["prompts"], skip_special_tokens=True)
314-
outputs = self.tokenizer.batch_decode(batch.batch["responses"], skip_special_tokens=True)
315-
scores = batch.batch["token_level_scores"].sum(-1).cpu().tolist()
316-
sample_gts = [
317-
item.non_tensor_batch.get("reward_model", {}).get("ground_truth", None) for item in batch
318-
]
319-
self._dump_generations(
320-
inputs=inputs,
321-
outputs=outputs,
322-
scores=scores,
323-
reward_extra_infos_dict=reward_extra_infos_dict,
324-
gts=sample_gts,
325-
dump_path=rollout_data_dir,
326-
)
311+
self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)
327312

328313
# validate
329314
if (

verl/trainer/ppo/ray_trainer.py

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,38 @@ def _dump_generations(self, inputs, outputs, gts, scores, reward_extra_infos_dic
441441

442442
print(f"Dumped generations to {filename}")
443443

444+
def _log_rollout_data(
445+
self, batch: DataProto, reward_extra_infos_dict: dict, timing_raw: dict, rollout_data_dir: str
446+
):
447+
"""Log rollout data to disk.
448+
Args:
449+
batch (DataProto): The batch containing rollout data
450+
reward_extra_infos_dict (dict): Additional reward information to log
451+
timing_raw (dict): Timing information for profiling
452+
rollout_data_dir (str): Directory path to save the rollout data
453+
"""
454+
with marked_timer("dump_rollout_generations", timing_raw, color="green"):
455+
inputs = self.tokenizer.batch_decode(batch.batch["prompts"], skip_special_tokens=True)
456+
outputs = self.tokenizer.batch_decode(batch.batch["responses"], skip_special_tokens=True)
457+
scores = batch.batch["token_level_scores"].sum(-1).cpu().tolist()
458+
sample_gts = [item.non_tensor_batch.get("reward_model", {}).get("ground_truth", None) for item in batch]
459+
460+
reward_extra_infos_to_dump = reward_extra_infos_dict.copy()
461+
if "request_id" in batch.non_tensor_batch:
462+
reward_extra_infos_dict.setdefault(
463+
"request_id",
464+
batch.non_tensor_batch["request_id"].tolist(),
465+
)
466+
467+
self._dump_generations(
468+
inputs=inputs,
469+
outputs=outputs,
470+
gts=sample_gts,
471+
scores=scores,
472+
reward_extra_infos_dict=reward_extra_infos_to_dump,
473+
dump_path=rollout_data_dir,
474+
)
475+
444476
def _maybe_log_val_generations(self, inputs, outputs, scores):
445477
"""Log a table of validation samples to the configured logger (wandb or swanlab)"""
446478

@@ -1111,29 +1143,7 @@ def fit(self):
11111143
# Log rollout generations if enabled
11121144
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
11131145
if rollout_data_dir:
1114-
with marked_timer("dump_rollout_generations", timing_raw, color="green"):
1115-
inputs = self.tokenizer.batch_decode(batch.batch["prompts"], skip_special_tokens=True)
1116-
outputs = self.tokenizer.batch_decode(batch.batch["responses"], skip_special_tokens=True)
1117-
scores = batch.batch["token_level_scores"].sum(-1).cpu().tolist()
1118-
sample_gts = [
1119-
item.non_tensor_batch.get("reward_model", {}).get("ground_truth", None)
1120-
for item in batch
1121-
]
1122-
1123-
if "request_id" in batch.non_tensor_batch:
1124-
reward_extra_infos_dict.setdefault(
1125-
"request_id",
1126-
batch.non_tensor_batch["request_id"].tolist(),
1127-
)
1128-
1129-
self._dump_generations(
1130-
inputs=inputs,
1131-
outputs=outputs,
1132-
gts=sample_gts,
1133-
scores=scores,
1134-
reward_extra_infos_dict=reward_extra_infos_dict,
1135-
dump_path=rollout_data_dir,
1136-
)
1146+
self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)
11371147

11381148
# validate
11391149
if (

0 commit comments

Comments
 (0)