-
Notifications
You must be signed in to change notification settings - Fork 2.4k
[worker] fix: create a new event loop if none exists #3703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request replaces the deprecated asyncio.get_event_loop().run_until_complete() with the more modern and safer asyncio.run(). This is a good practice as asyncio.run() handles the event loop lifecycle automatically. However, I've identified a potential issue related to the mixed usage of synchronous and asynchronous patterns in the ActorRolloutRefWorker class hierarchy, which could lead to runtime errors in the future. Please see my detailed comment.
3d2a875 to
eef6a1c
Compare
* fix * [worker] fix: create a new event loop if none exists (volcengine#3703) ### What does this PR do? I am working on integrating transferqueue into verl. Specifically, we convert metadata into dataproto in the `register` method of `single_controller/base/decorator.py/`. In this step, `asyncio.run(tq_client.async_get_data(metadata)` is called to get the specific data. If `asyncio.run` and `asyncio.get_event_loop` are called sequentially in the same thread, a RuntimeError: `There is no current event loop in thread %r` is thrown. This PR fixes the above-mentioned issue. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [ ] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).) * fix put_data --------- Co-authored-by: Huazhong <[email protected]>
### What does this PR do?
I am working on integrating transferqueue into verl. Specifically, we
convert metadata into dataproto in the `register` method of
`single_controller/base/decorator.py/`. In this step,
`asyncio.run(tq_client.async_get_data(metadata)` is called to get the
specific data.
If `asyncio.run` and `asyncio.get_event_loop` are called sequentially in
the same thread, a RuntimeError: `There is no current event loop in
thread %r` is thrown.
This PR fixes the above-mentioned issue.
### Checklist Before Starting
- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
- Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`
### Test
> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.
### API and Usage Example
> Demonstrate how the API changes if any, and provide usage example(s)
if possible.
```python
# Add code snippet or script demonstrating how to use this
```
### Design & Code Changes
> Demonstrate the high-level design if this PR is complex, and list the
specific changes.
### Checklist Before Submitting
> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.
- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
… rollouts (a safer fix) (#3828) ### What does this PR do? * **Error occurred** (see below the full traces): Hit uvloop/asyncio semantics inside a Ray worker: in that process there’s no event loop bound to the (so-called) “MainThread”, so uvloop.get_event_loop() raises. * **Fix**: when the error occurs, create and set a loop before using it (similar to #3703); Handle with `try..except` logic. * **Why is this better fix than #3803**: `asyncio.run()` solves lifecycle, but can’t be nested in cases where an event loop is already running (thank @vermouth1992 for pointing out the issue). But in fact, simply doing `loop = asyncio.get_event_loop()` would raise a `RuntimeError` (see traces below) if no event loop is running, which can happen depending on the execution context. I suggest considering the current fix. ```shell Error executing job with overrides: ['algorithm.adv_estimator=grpo', 'algorithm.norm_adv_by_std_in_grpo=False', 'data.train_files=data/omni_math_rule/omni_math_rule.parquet', 'data.val_files=[data/math/aime.parquet]', 'data.train_batch_size=64', 'data.max_prompt_length=512', 'data.max_response_length=8192', 'data.filter_overlong_prompts=True', 'data.truncation=error', 'actor_rollout_ref.model.path=Qwen/Qwen3-1.7B', 'actor_rollout_ref.actor.optim.lr=1e-6', 'actor_rollout_ref.actor.loss_agg_mode=token-mean', 'actor_rollout_ref.model.use_remove_padding=True', 'actor_rollout_ref.actor.use_dynamic_bsz=true', 'actor_rollout_ref.ref.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.actor.ppo_max_token_len_per_gpu=8704', 'actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.actor.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.actor.ppo_mini_batch_size=32', 'actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4', 'actor_rollout_ref.actor.use_kl_loss=False', 'actor_rollout_ref.actor.kl_loss_coef=0.0', 'actor_rollout_ref.actor.clip_ratio_low=0.2', 'actor_rollout_ref.actor.clip_ratio_high=0.28', 'actor_rollout_ref.actor.kl_loss_type=low_var_kl', 'actor_rollout_ref.actor.entropy_coeff=0', 'actor_rollout_ref.model.enable_gradient_checkpointing=True', 'actor_rollout_ref.actor.fsdp_config.param_offload=False', 'actor_rollout_ref.actor.fsdp_config.optimizer_offload=False', 'actor_rollout_ref.actor.entropy_checkpointing=true', 'actor_rollout_ref.rollout.tensor_model_parallel_size=1', 'actor_rollout_ref.rollout.enable_chunked_prefill=true', 'actor_rollout_ref.rollout.max_num_batched_tokens=8704', 'actor_rollout_ref.rollout.temperature=1.0', 'actor_rollout_ref.rollout.top_p=1.0', 'actor_rollout_ref.rollout.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.temperature=1.0', 'actor_rollout_ref.rollout.val_kwargs.top_p=0.7', 'actor_rollout_ref.rollout.val_kwargs.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.do_sample=true', 'actor_rollout_ref.rollout.val_kwargs.n=1', 'actor_rollout_ref.rollout.name=vllm', 'actor_rollout_ref.rollout.gpu_memory_utilization=0.5', 'actor_rollout_ref.rollout.n=8', 'actor_rollout_ref.ref.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.ref.fsdp_config.param_offload=False', 'algorithm.use_kl_in_reward=False', 'trainer.default_local_dir=/mnt/blob_output/v-dachengwen/HonestRL/logs_math-grpo-1760683127', 'trainer.critic_warmup=0', 'trainer.logger=[wandb]', 'trainer.project_name=LLM-IDK', 'trainer.experiment_name=HonestRL-1760683127', 'trainer.n_gpus_per_node=8', 'trainer.nnodes=1', 'trainer.val_before_train=false', 'trainer.save_freq=20', 'trainer.test_freq=10000000', 'trainer.total_epochs=30'] Traceback (most recent call last): File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 42, in main run_ppo(config) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 85, in run_ppo ray.get(runner.run.remote(config)) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper return fn(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 2962, in get values, debugger_breakpoint = worker.get_objects( File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 1026, in get_objects raise value.as_instanceof_cause() ray.exceptions.RayTaskError(RuntimeError): �[36mray::TaskRunner.run()�[39m (pid=9153, ip=100.64.105.140, actor_id=42e1e297ac301d0f26ffde5001000000, repr=<main_ppo.TaskRunner object at 0x7560eebc4eb0>) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 314, in run trainer.init_workers() File "/scratch/amlt_code/src/verl/trainer/ppo/ray_trainer.py", line 757, in init_workers self.actor_rollout_wg.init_model() File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 48, in __call__ output = ray.get(output) ray.exceptions.RayTaskError(RuntimeError): �[36mray::WorkerDict.actor_rollout_init_model()�[39m (pid=11755, ip=100.64.105.140, actor_id=3aa4262c5b924717a377680e01000000, repr=<verl.single_controller.ray.base.WorkerDict object at 0x78ef57785420>) File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 700, in func return getattr(self.worker_dict[key], name)(*args, **kwargs) File "/scratch/amlt_code/src/verl/single_controller/base/decorator.py", line 433, in inner return func(*args, **kwargs) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 798, in init_model self._build_rollout(trust_remote_code=self.config.model.get("trust_remote_code", False)) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 634, in _build_rollout loop = asyncio.get_event_loop() File "/opt/conda/envs/ptca/lib/python3.10/site-packages/uvloop/__init__.py", line 206, in get_event_loop raise RuntimeError( RuntimeError: There is no current event loop in thread 'MainThread'. ``` > Add **concise** overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do? If asyncio.run and asyncio.get_event_loop are called sequentially in the same thread, a 'RuntimeError: There is no current event loop in thread %r' is thrown. This PR fixes the above-mentioned issue. same issue as [pr#3703](#3703). ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: [pr#3703](#3703) - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do?
I am working on integrating transferqueue into verl. Specifically, we
convert metadata into dataproto in the `register` method of
`single_controller/base/decorator.py/`. In this step,
`asyncio.run(tq_client.async_get_data(metadata)` is called to get the
specific data.
If `asyncio.run` and `asyncio.get_event_loop` are called sequentially in
the same thread, a RuntimeError: `There is no current event loop in
thread %r` is thrown.
This PR fixes the above-mentioned issue.
### Checklist Before Starting
- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
- Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`
### Test
> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.
### API and Usage Example
> Demonstrate how the API changes if any, and provide usage example(s)
if possible.
```python
# Add code snippet or script demonstrating how to use this
```
### Design & Code Changes
> Demonstrate the high-level design if this PR is complex, and list the
specific changes.
### Checklist Before Submitting
> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.
- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do?
I am working on integrating transferqueue into verl. Specifically, we
convert metadata into dataproto in the `register` method of
`single_controller/base/decorator.py/`. In this step,
`asyncio.run(tq_client.async_get_data(metadata)` is called to get the
specific data.
If `asyncio.run` and `asyncio.get_event_loop` are called sequentially in
the same thread, a RuntimeError: `There is no current event loop in
thread %r` is thrown.
This PR fixes the above-mentioned issue.
### Checklist Before Starting
- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
- Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`
### Test
> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.
### API and Usage Example
> Demonstrate how the API changes if any, and provide usage example(s)
if possible.
```python
# Add code snippet or script demonstrating how to use this
```
### Design & Code Changes
> Demonstrate the high-level design if this PR is complex, and list the
specific changes.
### Checklist Before Submitting
> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.
- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do?
I am working on integrating transferqueue into verl. Specifically, we
convert metadata into dataproto in the `register` method of
`single_controller/base/decorator.py/`. In this step,
`asyncio.run(tq_client.async_get_data(metadata)` is called to get the
specific data.
If `asyncio.run` and `asyncio.get_event_loop` are called sequentially in
the same thread, a RuntimeError: `There is no current event loop in
thread %r` is thrown.
This PR fixes the above-mentioned issue.
### Checklist Before Starting
- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
- Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`
### Test
> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.
### API and Usage Example
> Demonstrate how the API changes if any, and provide usage example(s)
if possible.
```python
# Add code snippet or script demonstrating how to use this
```
### Design & Code Changes
> Demonstrate the high-level design if this PR is complex, and list the
specific changes.
### Checklist Before Submitting
> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.
- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
… rollouts (a safer fix) (volcengine#3828) ### What does this PR do? * **Error occurred** (see below the full traces): Hit uvloop/asyncio semantics inside a Ray worker: in that process there’s no event loop bound to the (so-called) “MainThread”, so uvloop.get_event_loop() raises. * **Fix**: when the error occurs, create and set a loop before using it (similar to volcengine#3703); Handle with `try..except` logic. * **Why is this better fix than volcengine#3803**: `asyncio.run()` solves lifecycle, but can’t be nested in cases where an event loop is already running (thank @vermouth1992 for pointing out the issue). But in fact, simply doing `loop = asyncio.get_event_loop()` would raise a `RuntimeError` (see traces below) if no event loop is running, which can happen depending on the execution context. I suggest considering the current fix. ```shell Error executing job with overrides: ['algorithm.adv_estimator=grpo', 'algorithm.norm_adv_by_std_in_grpo=False', 'data.train_files=data/omni_math_rule/omni_math_rule.parquet', 'data.val_files=[data/math/aime.parquet]', 'data.train_batch_size=64', 'data.max_prompt_length=512', 'data.max_response_length=8192', 'data.filter_overlong_prompts=True', 'data.truncation=error', 'actor_rollout_ref.model.path=Qwen/Qwen3-1.7B', 'actor_rollout_ref.actor.optim.lr=1e-6', 'actor_rollout_ref.actor.loss_agg_mode=token-mean', 'actor_rollout_ref.model.use_remove_padding=True', 'actor_rollout_ref.actor.use_dynamic_bsz=true', 'actor_rollout_ref.ref.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.actor.ppo_max_token_len_per_gpu=8704', 'actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.actor.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.actor.ppo_mini_batch_size=32', 'actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4', 'actor_rollout_ref.actor.use_kl_loss=False', 'actor_rollout_ref.actor.kl_loss_coef=0.0', 'actor_rollout_ref.actor.clip_ratio_low=0.2', 'actor_rollout_ref.actor.clip_ratio_high=0.28', 'actor_rollout_ref.actor.kl_loss_type=low_var_kl', 'actor_rollout_ref.actor.entropy_coeff=0', 'actor_rollout_ref.model.enable_gradient_checkpointing=True', 'actor_rollout_ref.actor.fsdp_config.param_offload=False', 'actor_rollout_ref.actor.fsdp_config.optimizer_offload=False', 'actor_rollout_ref.actor.entropy_checkpointing=true', 'actor_rollout_ref.rollout.tensor_model_parallel_size=1', 'actor_rollout_ref.rollout.enable_chunked_prefill=true', 'actor_rollout_ref.rollout.max_num_batched_tokens=8704', 'actor_rollout_ref.rollout.temperature=1.0', 'actor_rollout_ref.rollout.top_p=1.0', 'actor_rollout_ref.rollout.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.temperature=1.0', 'actor_rollout_ref.rollout.val_kwargs.top_p=0.7', 'actor_rollout_ref.rollout.val_kwargs.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.do_sample=true', 'actor_rollout_ref.rollout.val_kwargs.n=1', 'actor_rollout_ref.rollout.name=vllm', 'actor_rollout_ref.rollout.gpu_memory_utilization=0.5', 'actor_rollout_ref.rollout.n=8', 'actor_rollout_ref.ref.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.ref.fsdp_config.param_offload=False', 'algorithm.use_kl_in_reward=False', 'trainer.default_local_dir=/mnt/blob_output/v-dachengwen/HonestRL/logs_math-grpo-1760683127', 'trainer.critic_warmup=0', 'trainer.logger=[wandb]', 'trainer.project_name=LLM-IDK', 'trainer.experiment_name=HonestRL-1760683127', 'trainer.n_gpus_per_node=8', 'trainer.nnodes=1', 'trainer.val_before_train=false', 'trainer.save_freq=20', 'trainer.test_freq=10000000', 'trainer.total_epochs=30'] Traceback (most recent call last): File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 42, in main run_ppo(config) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 85, in run_ppo ray.get(runner.run.remote(config)) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper return fn(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 2962, in get values, debugger_breakpoint = worker.get_objects( File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 1026, in get_objects raise value.as_instanceof_cause() ray.exceptions.RayTaskError(RuntimeError): �[36mray::TaskRunner.run()�[39m (pid=9153, ip=100.64.105.140, actor_id=42e1e297ac301d0f26ffde5001000000, repr=<main_ppo.TaskRunner object at 0x7560eebc4eb0>) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 314, in run trainer.init_workers() File "/scratch/amlt_code/src/verl/trainer/ppo/ray_trainer.py", line 757, in init_workers self.actor_rollout_wg.init_model() File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 48, in __call__ output = ray.get(output) ray.exceptions.RayTaskError(RuntimeError): �[36mray::WorkerDict.actor_rollout_init_model()�[39m (pid=11755, ip=100.64.105.140, actor_id=3aa4262c5b924717a377680e01000000, repr=<verl.single_controller.ray.base.WorkerDict object at 0x78ef57785420>) File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 700, in func return getattr(self.worker_dict[key], name)(*args, **kwargs) File "/scratch/amlt_code/src/verl/single_controller/base/decorator.py", line 433, in inner return func(*args, **kwargs) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 798, in init_model self._build_rollout(trust_remote_code=self.config.model.get("trust_remote_code", False)) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 634, in _build_rollout loop = asyncio.get_event_loop() File "/opt/conda/envs/ptca/lib/python3.10/site-packages/uvloop/__init__.py", line 206, in get_event_loop raise RuntimeError( RuntimeError: There is no current event loop in thread 'MainThread'. ``` > Add **concise** overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do? If asyncio.run and asyncio.get_event_loop are called sequentially in the same thread, a 'RuntimeError: There is no current event loop in thread %r' is thrown. This PR fixes the above-mentioned issue. same issue as [pr#3703](volcengine#3703). ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: [pr#3703](volcengine#3703) - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
… rollouts (a safer fix) (volcengine#3828) ### What does this PR do? * **Error occurred** (see below the full traces): Hit uvloop/asyncio semantics inside a Ray worker: in that process there’s no event loop bound to the (so-called) “MainThread”, so uvloop.get_event_loop() raises. * **Fix**: when the error occurs, create and set a loop before using it (similar to volcengine#3703); Handle with `try..except` logic. * **Why is this better fix than volcengine#3803**: `asyncio.run()` solves lifecycle, but can’t be nested in cases where an event loop is already running (thank @vermouth1992 for pointing out the issue). But in fact, simply doing `loop = asyncio.get_event_loop()` would raise a `RuntimeError` (see traces below) if no event loop is running, which can happen depending on the execution context. I suggest considering the current fix. ```shell Error executing job with overrides: ['algorithm.adv_estimator=grpo', 'algorithm.norm_adv_by_std_in_grpo=False', 'data.train_files=data/omni_math_rule/omni_math_rule.parquet', 'data.val_files=[data/math/aime.parquet]', 'data.train_batch_size=64', 'data.max_prompt_length=512', 'data.max_response_length=8192', 'data.filter_overlong_prompts=True', 'data.truncation=error', 'actor_rollout_ref.model.path=Qwen/Qwen3-1.7B', 'actor_rollout_ref.actor.optim.lr=1e-6', 'actor_rollout_ref.actor.loss_agg_mode=token-mean', 'actor_rollout_ref.model.use_remove_padding=True', 'actor_rollout_ref.actor.use_dynamic_bsz=true', 'actor_rollout_ref.ref.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.actor.ppo_max_token_len_per_gpu=8704', 'actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.actor.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.actor.ppo_mini_batch_size=32', 'actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4', 'actor_rollout_ref.actor.use_kl_loss=False', 'actor_rollout_ref.actor.kl_loss_coef=0.0', 'actor_rollout_ref.actor.clip_ratio_low=0.2', 'actor_rollout_ref.actor.clip_ratio_high=0.28', 'actor_rollout_ref.actor.kl_loss_type=low_var_kl', 'actor_rollout_ref.actor.entropy_coeff=0', 'actor_rollout_ref.model.enable_gradient_checkpointing=True', 'actor_rollout_ref.actor.fsdp_config.param_offload=False', 'actor_rollout_ref.actor.fsdp_config.optimizer_offload=False', 'actor_rollout_ref.actor.entropy_checkpointing=true', 'actor_rollout_ref.rollout.tensor_model_parallel_size=1', 'actor_rollout_ref.rollout.enable_chunked_prefill=true', 'actor_rollout_ref.rollout.max_num_batched_tokens=8704', 'actor_rollout_ref.rollout.temperature=1.0', 'actor_rollout_ref.rollout.top_p=1.0', 'actor_rollout_ref.rollout.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.temperature=1.0', 'actor_rollout_ref.rollout.val_kwargs.top_p=0.7', 'actor_rollout_ref.rollout.val_kwargs.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.do_sample=true', 'actor_rollout_ref.rollout.val_kwargs.n=1', 'actor_rollout_ref.rollout.name=vllm', 'actor_rollout_ref.rollout.gpu_memory_utilization=0.5', 'actor_rollout_ref.rollout.n=8', 'actor_rollout_ref.ref.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.ref.fsdp_config.param_offload=False', 'algorithm.use_kl_in_reward=False', 'trainer.default_local_dir=/mnt/blob_output/v-dachengwen/HonestRL/logs_math-grpo-1760683127', 'trainer.critic_warmup=0', 'trainer.logger=[wandb]', 'trainer.project_name=LLM-IDK', 'trainer.experiment_name=HonestRL-1760683127', 'trainer.n_gpus_per_node=8', 'trainer.nnodes=1', 'trainer.val_before_train=false', 'trainer.save_freq=20', 'trainer.test_freq=10000000', 'trainer.total_epochs=30'] Traceback (most recent call last): File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 42, in main run_ppo(config) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 85, in run_ppo ray.get(runner.run.remote(config)) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper return fn(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 2962, in get values, debugger_breakpoint = worker.get_objects( File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 1026, in get_objects raise value.as_instanceof_cause() ray.exceptions.RayTaskError(RuntimeError): �[36mray::TaskRunner.run()�[39m (pid=9153, ip=100.64.105.140, actor_id=42e1e297ac301d0f26ffde5001000000, repr=<main_ppo.TaskRunner object at 0x7560eebc4eb0>) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 314, in run trainer.init_workers() File "/scratch/amlt_code/src/verl/trainer/ppo/ray_trainer.py", line 757, in init_workers self.actor_rollout_wg.init_model() File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 48, in __call__ output = ray.get(output) ray.exceptions.RayTaskError(RuntimeError): �[36mray::WorkerDict.actor_rollout_init_model()�[39m (pid=11755, ip=100.64.105.140, actor_id=3aa4262c5b924717a377680e01000000, repr=<verl.single_controller.ray.base.WorkerDict object at 0x78ef57785420>) File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 700, in func return getattr(self.worker_dict[key], name)(*args, **kwargs) File "/scratch/amlt_code/src/verl/single_controller/base/decorator.py", line 433, in inner return func(*args, **kwargs) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 798, in init_model self._build_rollout(trust_remote_code=self.config.model.get("trust_remote_code", False)) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 634, in _build_rollout loop = asyncio.get_event_loop() File "/opt/conda/envs/ptca/lib/python3.10/site-packages/uvloop/__init__.py", line 206, in get_event_loop raise RuntimeError( RuntimeError: There is no current event loop in thread 'MainThread'. ``` > Add **concise** overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do? If asyncio.run and asyncio.get_event_loop are called sequentially in the same thread, a 'RuntimeError: There is no current event loop in thread %r' is thrown. This PR fixes the above-mentioned issue. same issue as [pr#3703](volcengine#3703). ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: [pr#3703](volcengine#3703) - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
… rollouts (a safer fix) (volcengine#3828) ### What does this PR do? * **Error occurred** (see below the full traces): Hit uvloop/asyncio semantics inside a Ray worker: in that process there’s no event loop bound to the (so-called) “MainThread”, so uvloop.get_event_loop() raises. * **Fix**: when the error occurs, create and set a loop before using it (similar to volcengine#3703); Handle with `try..except` logic. * **Why is this better fix than volcengine#3803**: `asyncio.run()` solves lifecycle, but can’t be nested in cases where an event loop is already running (thank @vermouth1992 for pointing out the issue). But in fact, simply doing `loop = asyncio.get_event_loop()` would raise a `RuntimeError` (see traces below) if no event loop is running, which can happen depending on the execution context. I suggest considering the current fix. ```shell Error executing job with overrides: ['algorithm.adv_estimator=grpo', 'algorithm.norm_adv_by_std_in_grpo=False', 'data.train_files=data/omni_math_rule/omni_math_rule.parquet', 'data.val_files=[data/math/aime.parquet]', 'data.train_batch_size=64', 'data.max_prompt_length=512', 'data.max_response_length=8192', 'data.filter_overlong_prompts=True', 'data.truncation=error', 'actor_rollout_ref.model.path=Qwen/Qwen3-1.7B', 'actor_rollout_ref.actor.optim.lr=1e-6', 'actor_rollout_ref.actor.loss_agg_mode=token-mean', 'actor_rollout_ref.model.use_remove_padding=True', 'actor_rollout_ref.actor.use_dynamic_bsz=true', 'actor_rollout_ref.ref.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=true', 'actor_rollout_ref.actor.ppo_max_token_len_per_gpu=8704', 'actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=8704', 'actor_rollout_ref.actor.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.actor.ppo_mini_batch_size=32', 'actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4', 'actor_rollout_ref.actor.use_kl_loss=False', 'actor_rollout_ref.actor.kl_loss_coef=0.0', 'actor_rollout_ref.actor.clip_ratio_low=0.2', 'actor_rollout_ref.actor.clip_ratio_high=0.28', 'actor_rollout_ref.actor.kl_loss_type=low_var_kl', 'actor_rollout_ref.actor.entropy_coeff=0', 'actor_rollout_ref.model.enable_gradient_checkpointing=True', 'actor_rollout_ref.actor.fsdp_config.param_offload=False', 'actor_rollout_ref.actor.fsdp_config.optimizer_offload=False', 'actor_rollout_ref.actor.entropy_checkpointing=true', 'actor_rollout_ref.rollout.tensor_model_parallel_size=1', 'actor_rollout_ref.rollout.enable_chunked_prefill=true', 'actor_rollout_ref.rollout.max_num_batched_tokens=8704', 'actor_rollout_ref.rollout.temperature=1.0', 'actor_rollout_ref.rollout.top_p=1.0', 'actor_rollout_ref.rollout.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.temperature=1.0', 'actor_rollout_ref.rollout.val_kwargs.top_p=0.7', 'actor_rollout_ref.rollout.val_kwargs.top_k=-1', 'actor_rollout_ref.rollout.val_kwargs.do_sample=true', 'actor_rollout_ref.rollout.val_kwargs.n=1', 'actor_rollout_ref.rollout.name=vllm', 'actor_rollout_ref.rollout.gpu_memory_utilization=0.5', 'actor_rollout_ref.rollout.n=8', 'actor_rollout_ref.ref.ulysses_sequence_parallel_size=1', 'actor_rollout_ref.ref.fsdp_config.param_offload=False', 'algorithm.use_kl_in_reward=False', 'trainer.default_local_dir=/mnt/blob_output/v-dachengwen/HonestRL/logs_math-grpo-1760683127', 'trainer.critic_warmup=0', 'trainer.logger=[wandb]', 'trainer.project_name=LLM-IDK', 'trainer.experiment_name=HonestRL-1760683127', 'trainer.n_gpus_per_node=8', 'trainer.nnodes=1', 'trainer.val_before_train=false', 'trainer.save_freq=20', 'trainer.test_freq=10000000', 'trainer.total_epochs=30'] Traceback (most recent call last): File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 42, in main run_ppo(config) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 85, in run_ppo ray.get(runner.run.remote(config)) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper return fn(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 2962, in get values, debugger_breakpoint = worker.get_objects( File "/opt/conda/envs/ptca/lib/python3.10/site-packages/ray/_private/worker.py", line 1026, in get_objects raise value.as_instanceof_cause() ray.exceptions.RayTaskError(RuntimeError): �[36mray::TaskRunner.run()�[39m (pid=9153, ip=100.64.105.140, actor_id=42e1e297ac301d0f26ffde5001000000, repr=<main_ppo.TaskRunner object at 0x7560eebc4eb0>) File "/scratch/amlt_code/src/verl/trainer/main_ppo.py", line 314, in run trainer.init_workers() File "/scratch/amlt_code/src/verl/trainer/ppo/ray_trainer.py", line 757, in init_workers self.actor_rollout_wg.init_model() File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 48, in __call__ output = ray.get(output) ray.exceptions.RayTaskError(RuntimeError): �[36mray::WorkerDict.actor_rollout_init_model()�[39m (pid=11755, ip=100.64.105.140, actor_id=3aa4262c5b924717a377680e01000000, repr=<verl.single_controller.ray.base.WorkerDict object at 0x78ef57785420>) File "/scratch/amlt_code/src/verl/single_controller/ray/base.py", line 700, in func return getattr(self.worker_dict[key], name)(*args, **kwargs) File "/scratch/amlt_code/src/verl/single_controller/base/decorator.py", line 433, in inner return func(*args, **kwargs) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 798, in init_model self._build_rollout(trust_remote_code=self.config.model.get("trust_remote_code", False)) File "/scratch/amlt_code/src/verl/workers/fsdp_workers.py", line 634, in _build_rollout loop = asyncio.get_event_loop() File "/opt/conda/envs/ptca/lib/python3.10/site-packages/uvloop/__init__.py", line 206, in get_event_loop raise RuntimeError( RuntimeError: There is no current event loop in thread 'MainThread'. ``` > Add **concise** overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review. ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
### What does this PR do? If asyncio.run and asyncio.get_event_loop are called sequentially in the same thread, a 'RuntimeError: There is no current event loop in thread %r' is thrown. This PR fixes the above-mentioned issue. same issue as [pr#3703](volcengine#3703). ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: [pr#3703](volcengine#3703) - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). - [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
What does this PR do?
I am working on integrating transferqueue into verl. Specifically, we convert metadata into dataproto in the
registermethod ofsingle_controller/base/decorator.py/. In this step,asyncio.run(tq_client.async_get_data(metadata)is called to get the specific data.If
asyncio.runandasyncio.get_event_loopare called sequentially in the same thread, a RuntimeError:There is no current event loop in thread %ris thrown.This PR fixes the above-mentioned issue.
Checklist Before Starting
[{modules}] {type}: {description}(This will be checked by the CI){modules}includefsdp,megatron,sglang,vllm,rollout,trainer,ci,training_utils,recipe,hardware,deployment,ray,worker,single_controller,misc,perf,model,algo,env,tool,ckpt,doc,data,like[megatron, fsdp, doc]{type}is infeat,fix,refactor,chore,test[BREAKING]to the beginning of the title.[BREAKING][fsdp, megatron] feat: dynamic batchingTest
API and Usage Example
# Add code snippet or script demonstrating how to use thisDesign & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwaysci-requestchannel in theverlSlack workspace. (If not accessible, please try the Feishu group (飞书群).)