[data] fix: merge metrics from all workers in DataProto.concat() #3699
Conversation
Fix a critical bug where `DataProto.concat()` would crash with `AttributeError` when merging metrics from distributed workers.

Problem:
- The implementation incorrectly called `.keys()` on `metrics` (a list of dicts)
- This caused: `AttributeError: 'list' object has no attribute 'keys'`
- Metrics were lost if the first worker had none but other workers did

Solution:
- Correctly treat `metrics` as a list of dicts, not a dict
- Aggregate metrics from ALL workers using `.extend()`
- Handle edge cases: missing metrics, non-list values

Testing:
- `test_concat_metrics_from_multiple_workers`: all workers have metrics
- `test_concat_with_empty_and_non_list_meta_info`: partial metrics
- `test_concat_first_worker_missing_metrics`: first worker has no metrics
- `test_concat_non_list_metrics`: single dict instead of list
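The failure mode and its fix can be reproduced in isolation. This is a minimal sketch, not verl's actual code; the `worker_metrics` value is a hypothetical example of what each worker contributes:

```python
# Hypothetical per-worker metrics: a list with one dict per worker.
worker_metrics = [{"loss": 0.5}, {"loss": 0.4}]

# Buggy pattern: treating the list of dicts as if it were a single dict.
try:
    worker_metrics.keys()
except AttributeError as e:
    print(e)  # 'list' object has no attribute 'keys'

# Fixed pattern: aggregate across ALL workers with extend/append.
all_metrics = []
for m in worker_metrics:
    if isinstance(m, list):
        all_metrics.extend(m)  # a worker may already hold a list of dicts
    else:
        all_metrics.append(m)  # or a single dict

print(all_metrics)  # [{'loss': 0.5}, {'loss': 0.4}]
```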
Code Review
This pull request effectively addresses a critical bug in DataProto.concat() by ensuring metrics from all distributed workers are aggregated. The addition of comprehensive unit tests is excellent and covers various important edge cases for the metrics merging logic.
I've identified one potential issue related to how non-metric meta_info is handled, which could lead to data loss in certain scenarios. My review comment includes a detailed explanation and a suggested code improvement to make the merging logic more robust and consistent with the updated documentation.
The previous implementation only preserved non-metric `meta_info` from the first worker, causing silent data loss when other workers had unique keys. This change iterates through all workers to merge their non-metric `meta_info` while detecting conflicts and aggregating metrics in a single loop.

Changes:
- Iterate through all workers to collect non-metric `meta_info`
- Assert on conflicting values for overlapping keys
- Add tests for merging different keys and detecting conflicts
When multiple workers return metrics, DataProto.concat() now flattens the list
of metric dicts into a dict of lists using list_of_dict_to_dict_of_list(). This
ensures metrics have a consistent structure regardless of whether they come from
1 or N workers, and allows reduce_metrics() to work without modification.
This is a cleaner solution than handling list input in reduce_metrics(), as it:
- Keeps metrics aggregation logic in the data layer (single responsibility)
- Maintains a consistent API where meta_info["metrics"] is always dict[str, list]
- Avoids leaking DataProto's concat behavior to all metrics consumers
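The flattening step described above can be sketched with a minimal stand-in for verl's `list_of_dict_to_dict_of_list()` helper (this is an illustrative reimplementation, assuming all per-worker metric dicts share the same keys, not the library's actual code):

```python
def list_of_dict_to_dict_of_list(list_of_dict):
    """Minimal stand-in: flatten [{'k': v1}, {'k': v2}] -> {'k': [v1, v2]}."""
    if not list_of_dict:
        return {}
    keys = list_of_dict[0].keys()
    return {k: [d[k] for d in list_of_dict] for k in keys}

# Metrics collected from two workers by DataProto.concat():
metrics = [{"loss": 0.5, "acc": 0.8}, {"loss": 0.4, "acc": 0.9}]
flat = list_of_dict_to_dict_of_list(metrics)
print(flat)  # {'loss': [0.5, 0.4], 'acc': [0.8, 0.9]}
```

With a single worker the result has the same `dict[str, list]` shape, which is why downstream consumers such as `reduce_metrics()` need no changes.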
Changes:
- DataProto.concat(): Flatten list of metric dicts to dict of lists
- Update tests to expect flattened metrics format
Fixes the error:
File "verl/trainer/ppo/ray_trainer.py", line 1129, in fit
critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"])
AttributeError: 'list' object has no attribute 'items'
Related: volcengine#602
07b0efc to 5a20e08
Two tests were not updated in commit 5a20e08 and expected the old list format. Update them to expect dict[str, list] instead of list[dict]. Fixes CI test failures in test_concat_metrics_from_multiple_workers and test_concat_with_empty_and_non_list_meta_info.
This PR makes DataProto less general. We can perform the metrics allgather inside the workers instead.
…cengine#3699)

## Summary

Fix `DataProto.concat()` to properly merge all `meta_info` keys from all workers, preventing silent data loss when workers have different non-metric keys.

## Problem

The previous implementation only preserved non-metric `meta_info` from the first worker:

```python
# Old code - only looks at data[0]
merged_meta_info = {k: v for k, v in data[0].meta_info.items() if k != "metrics"}
```

This caused **silent data loss** when workers had different non-metric keys:

- `data[0].meta_info = {"config": "A"}` ✓ preserved
- `data[1].meta_info = {"extra_key": "B"}` ❌ **lost**
- Result: `{"config": "A"}` - missing `extra_key`

This contradicts the docstring, which states that `meta_info` is "merged".

## Solution

This PR iterates through ALL workers to merge their non-metric `meta_info` while aggregating metrics:

```python
# Merge non-metric meta_info and aggregate metrics from all workers
merged_meta_info = {}
all_metrics = []
for d in data:
    for k, v in d.meta_info.items():
        if k == "metrics":
            if v is not None:
                if isinstance(v, list):
                    all_metrics.extend(v)
                else:
                    all_metrics.append(v)
        else:
            if k in merged_meta_info:
                # Ensure consistency for overlapping non-metric keys
                assert merged_meta_info[k] == v, f"Conflicting values for meta_info key '{k}'"
            else:
                merged_meta_info[k] = v
if all_metrics:
    merged_meta_info["metrics"] = all_metrics
```

**Key improvements**:

- ✅ All non-metric keys from all workers are preserved
- ✅ Detects conflicting values for the same key across workers
- ✅ Aggregates metrics from all workers in a single loop
- ✅ Handles edge cases: missing metrics, non-list values

## Testing

Added 6 comprehensive unit tests in `tests/test_protocol_on_cpu.py`:

- `test_concat_metrics_from_multiple_workers` - All workers have metrics
- `test_concat_with_empty_and_non_list_meta_info` - Partial metrics coverage
- `test_concat_first_worker_missing_metrics` - First worker has no metrics
- `test_concat_non_list_metrics` - Single dict instead of list
- `test_concat_merge_different_non_metric_keys` - Different keys across workers
- `test_concat_conflicting_non_metric_keys` - Conflict detection

## Files Changed

- `verl/protocol.py`: Updated `DataProto.concat()` to merge all meta_info keys
- `tests/test_protocol_on_cpu.py`: Added 2 new tests (6 total) covering all edge cases

---

### Checklist

- [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md)
- [x] Pre-commit checks passed (ruff, mypy, etc.)
- [x] Documentation updated (N/A - bug fix, no API changes)
- [x] Unit tests added (6 tests covering all edge cases)
- [ ] CI request (pending)
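The merging logic above can be exercised standalone. The sketch below wraps the PR's loop in a hypothetical helper (`merge_meta_info` is not a verl API, just an illustration of the behavior on plain dicts):

```python
def merge_meta_info(workers_meta_info):
    """Sketch of the PR's merging logic: aggregate 'metrics' across workers,
    preserve every non-metric key, and assert on conflicting values."""
    merged = {}
    all_metrics = []
    for meta in workers_meta_info:
        for k, v in meta.items():
            if k == "metrics":
                if v is None:
                    continue
                if isinstance(v, list):
                    all_metrics.extend(v)
                else:
                    all_metrics.append(v)
            elif k in merged:
                # Overlapping non-metric keys must agree across workers
                assert merged[k] == v, f"Conflicting values for meta_info key '{k}'"
            else:
                merged[k] = v
    if all_metrics:
        merged["metrics"] = all_metrics
    return merged

# Worker 0 carries a bare dict of metrics; worker 1 carries a list and a
# unique non-metric key. Both shapes are handled, and no key is dropped.
merged = merge_meta_info([
    {"config": "A", "metrics": {"loss": 0.5}},
    {"extra_key": "B", "metrics": [{"loss": 0.4}]},
])
print(merged)
# {'config': 'A', 'extra_key': 'B', 'metrics': [{'loss': 0.5}, {'loss': 0.4}]}
```

A conflicting value for the same non-metric key (e.g. `{"config": "A"}` vs `{"config": "B"}`) trips the assertion instead of silently picking one.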