Skip to content

Commit e94c128

Browse files
baymax591 and Ubuntu
authored and committed
[worker] fix: create a new event loop if none exists (volcengine#3839)
### What does this PR do? If asyncio.run and asyncio.get_event_loop are called sequentially in the same thread, a 'RuntimeError: There is no current event loop in thread %r' is thrown. This PR fixes the above-mentioned issue. same issue as [pr#3703](volcengine#3703). ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: [pr#3703](volcengine#3703) - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test > For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc. ### API and Usage Example > Demonstrate how the API changes if any, and provide usage example(s) if possible. ```python # Add code snippet or script demonstrating how to use this ``` ### Design & Code Changes > Demonstrate the high-level design if this PR is complex, and list the specific changes. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md). 
- [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
1 parent f09d9cd commit e94c128

File tree

4 files changed

+19
-20
lines changed

4 files changed

+19
-20
lines changed

recipe/one_step_off_policy/fsdp_workers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import asyncio
1716
import logging
1817
import os
1918

@@ -41,6 +40,7 @@
4140
from verl.utils.model import get_generation_config, update_model_config
4241
from verl.utils.profiler import DistProfiler, DistProfilerExtension, ProfilerConfig, log_gpu_memory_usage, simple_timer
4342
from verl.utils.profiler.performance import reduce_timing, topk_reduce_ratio_min_max
43+
from verl.utils.ray_utils import get_event_loop
4444
from verl.workers.config import HFModelConfig, RolloutConfig
4545
from verl.workers.fsdp_workers import ActorRolloutRefWorker as ARRWorker
4646
from verl.workers.fsdp_workers import CriticWorker
@@ -97,7 +97,7 @@ def sync_rollout_weights(self):
9797
inference_model = self.rollout._engine
9898
else:
9999
raise NotImplementedError(f"Unknown rollout name: {rollout_name}")
100-
loop = asyncio.get_event_loop()
100+
loop = get_event_loop()
101101
for key, shape, dtype in self._weights_info:
102102
tensor = torch.empty(shape, dtype=dtype, device=get_torch_device().current_device())
103103
if self._is_actor:

verl/utils/ray_utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Contains commonly used utilities for ray
1616
"""
1717

18+
import asyncio
1819
import concurrent.futures
1920
import os
2021
from typing import Any, Optional
@@ -79,3 +80,13 @@ def put_data(index, data):
7980
output[index] = data_ref
8081

8182
return output
83+
84+
85+
def get_event_loop():
    """Return the current thread's event loop, creating one if none exists.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the calling
    thread has no current event loop (e.g. after ``asyncio.run()`` has torn
    the previous one down). In that case a fresh loop is created and
    installed as the thread's current loop before being returned.

    Returns:
        asyncio.AbstractEventLoop: the thread's current (possibly new) loop.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # No current loop in this thread — make one and register it so
        # subsequent get_event_loop() calls see the same loop.
        fresh_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(fresh_loop)
        return fresh_loop

verl/workers/fsdp_workers.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
The main entry point to run the PPO algorithm
1616
"""
1717

18-
import asyncio
1918
import datetime
2019
import json
2120
import logging
@@ -85,6 +84,7 @@
8584
from verl.utils.profiler import DistProfiler, DistProfilerExtension, ProfilerConfig, log_gpu_memory_usage, simple_timer
8685
from verl.utils.profiler.performance import reduce_timing, topk_reduce_ratio_min_max
8786
from verl.utils.py_functional import convert_to_regular_types
87+
from verl.utils.ray_utils import get_event_loop
8888
from verl.workers.config import FSDPCriticConfig, FSDPEngineConfig, HFModelConfig, RolloutConfig
8989
from verl.workers.rollout import get_rollout_class
9090
from verl.workers.sharding_manager.fsdp_ulysses import FSDPUlyssesShardingManager
@@ -630,11 +630,7 @@ def _build_rollout(self, trust_remote_code=False):
630630
# For sync mode, we directly switch to trainer mode here.
631631
# For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
632632
if rollout_config.mode == "sync" and self._is_actor:
633-
try:
634-
loop = asyncio.get_event_loop()
635-
except RuntimeError:
636-
loop = asyncio.new_event_loop()
637-
asyncio.set_event_loop(loop)
633+
loop = get_event_loop()
638634
loop.run_until_complete(self.trainer_mode())
639635

640636
async def rollout_mode(self):
@@ -911,11 +907,7 @@ def generate_sequences(self, prompts: DataProto):
911907

912908
timing_generate = {}
913909
if self._is_actor: # For rollout only, we do not switch context.
914-
try:
915-
loop = asyncio.get_event_loop()
916-
except RuntimeError:
917-
loop = asyncio.new_event_loop()
918-
asyncio.set_event_loop(loop)
910+
loop = get_event_loop()
919911
loop.run_until_complete(self.rollout_mode())
920912
log_gpu_memory_usage("After switch to rollout mode", logger=logger)
921913

verl/workers/megatron_workers.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
The main entry point to run the PPO algorithm
1616
"""
1717

18-
import asyncio
1918
import datetime
2019
import logging
2120
import os
@@ -69,6 +68,7 @@
6968
simple_timer,
7069
)
7170
from verl.utils.profiler.performance import reduce_timing, topk_reduce_ratio_min_max
71+
from verl.utils.ray_utils import get_event_loop
7272
from verl.workers.actor.megatron_actor import MegatronPPOActor
7373
from verl.workers.config import HFModelConfig, McoreCriticConfig, RolloutConfig
7474
from verl.workers.critic.megatron_critic import MegatronPPOCritic
@@ -444,7 +444,7 @@ def _build_rollout(self, trust_remote_code=False):
444444
# For sync mode, we directly switch to trainer mode here.
445445
# For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
446446
if rollout_config.mode == "sync" and self._is_actor:
447-
loop = asyncio.get_event_loop()
447+
loop = get_event_loop()
448448
loop.run_until_complete(self.trainer_mode())
449449

450450
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
@@ -674,11 +674,7 @@ def generate_sequences(self, prompts: DataProto):
674674

675675
timing_generate = {}
676676
if self._is_actor: # For rollout only, we do not switch context.
677-
try:
678-
loop = asyncio.get_event_loop()
679-
except RuntimeError:
680-
loop = asyncio.new_event_loop()
681-
asyncio.set_event_loop(loop)
677+
loop = get_event_loop()
682678
loop.run_until_complete(self.rollout_mode())
683679
log_gpu_memory_usage("After switch to rollout mode", logger=logger)
684680

0 commit comments

Comments (0)