From 6682b97c6f53011befdbf0dcba1592c73e0447dc Mon Sep 17 00:00:00 2001
From: "hzji210@gmail.com"
Date: Thu, 9 Oct 2025 10:24:55 +0800
Subject: [PATCH 1/3] replace run_until_complete with asyncio.run

---
 verl/workers/fsdp_workers.py     | 8 +++-----
 verl/workers/megatron_workers.py | 8 +++-----
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/verl/workers/fsdp_workers.py b/verl/workers/fsdp_workers.py
index 2f231250ea4..8a6cb762dbb 100644
--- a/verl/workers/fsdp_workers.py
+++ b/verl/workers/fsdp_workers.py
@@ -625,8 +625,7 @@ def _build_rollout(self, trust_remote_code=False):
         # For sync mode, we directly switch to trainer mode here.
         # For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
         if rollout_config.mode == "sync" and self._is_actor:
-            loop = asyncio.get_event_loop()
-            loop.run_until_complete(self.trainer_mode())
+            asyncio.run(self.trainer_mode())

     async def rollout_mode(self):
         """Context switch hybridengine to rollout mode."""
@@ -902,15 +901,14 @@ def generate_sequences(self, prompts: DataProto):
         timing_generate = {}
         if self._is_actor:
             # For rollout only, we do not switch context.
-            loop = asyncio.get_event_loop()
-            loop.run_until_complete(self.rollout_mode())
+            asyncio.run(self.rollout_mode())
             log_gpu_memory_usage("After switch to rollout mode", logger=logger)

         with simple_timer("generate_sequences", timing_generate):
             output = self.rollout.generate_sequences(prompts=prompts)

         if self._is_actor:
-            loop.run_until_complete(self.trainer_mode())
+            asyncio.run(self.trainer_mode())
             log_gpu_memory_usage("After switch to trainer mode", logger=logger)

         # We calculate the average timing across all ranks
diff --git a/verl/workers/megatron_workers.py b/verl/workers/megatron_workers.py
index b85c7d95550..f60cf697731 100644
--- a/verl/workers/megatron_workers.py
+++ b/verl/workers/megatron_workers.py
@@ -439,8 +439,7 @@ def _build_rollout(self, trust_remote_code=False):
         # For sync mode, we directly switch to trainer mode here.
         # For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
         if rollout_config.mode == "sync" and self._is_actor:
-            loop = asyncio.get_event_loop()
-            loop.run_until_complete(self.trainer_mode())
+            asyncio.run(self.trainer_mode())

     @register(dispatch_mode=Dispatch.ONE_TO_ALL)
     def init_model(self):
@@ -669,15 +668,14 @@ def generate_sequences(self, prompts: DataProto):
         timing_generate = {}
         if self._is_actor:
             # For rollout only, we do not switch context.
-            loop = asyncio.get_event_loop()
-            loop.run_until_complete(self.rollout_mode())
+            asyncio.run(self.rollout_mode())
             log_gpu_memory_usage("After switch to rollout mode", logger=logger)

         with simple_timer("generate_sequences", timing_generate):
             output = self.rollout.generate_sequences(prompts=prompts)

         if self._is_actor:
-            loop.run_until_complete(self.trainer_mode())
+            asyncio.run(self.trainer_mode())
             log_gpu_memory_usage("After switch to trainer mode", logger=logger)

         # We calculate the average timing across all ranks

From 3a28c36f2a544dd3c9366d1def3e33e4d2113e09 Mon Sep 17 00:00:00 2001
From: "hzji210@gmail.com"
Date: Thu, 9 Oct 2025 12:22:38 +0800
Subject: [PATCH 2/3] format

---
 verl/workers/rollout/schemas.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/verl/workers/rollout/schemas.py b/verl/workers/rollout/schemas.py
index 0d7f6202628..b640ba64a77 100644
--- a/verl/workers/rollout/schemas.py
+++ b/verl/workers/rollout/schemas.py
@@ -181,8 +181,8 @@ def initialize_request(cls, values):
             # Only log the warning to avoid truncating in the middle of generation prompt. Consider raising an
             # error for this case in the future.
             # Ensure batch_data_id exists with default value if not provided
-            if 'batch_data_id' not in values:
-                values['batch_data_id'] = cls.model_fields['batch_data_id'].default
+            if "batch_data_id" not in values:
+                values["batch_data_id"] = cls.model_fields["batch_data_id"].default
             logger.warning(
                 f"Prompt {values['batch_data_id']} has length {values['input_ids'].shape[-1]} "
                 f"which is greater than max_prompt_len {max_prompt_len} after applied chat template with tools."

From c46e6369dc25f970c5815e58191183cc4f09891a Mon Sep 17 00:00:00 2001
From: "hzji210@gmail.com"
Date: Thu, 9 Oct 2025 15:48:18 +0800
Subject: [PATCH 3/3] fix

---
 verl/workers/fsdp_workers.py     | 12 +++++++++---
 verl/workers/megatron_workers.py | 12 +++++++++---
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/verl/workers/fsdp_workers.py b/verl/workers/fsdp_workers.py
index 8a6cb762dbb..0e5e5c41c15 100644
--- a/verl/workers/fsdp_workers.py
+++ b/verl/workers/fsdp_workers.py
@@ -625,7 +625,8 @@ def _build_rollout(self, trust_remote_code=False):
         # For sync mode, we directly switch to trainer mode here.
         # For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
         if rollout_config.mode == "sync" and self._is_actor:
-            asyncio.run(self.trainer_mode())
+            loop = asyncio.get_event_loop()
+            loop.run_until_complete(self.trainer_mode())

     async def rollout_mode(self):
         """Context switch hybridengine to rollout mode."""
@@ -901,14 +902,19 @@ def generate_sequences(self, prompts: DataProto):
         timing_generate = {}
         if self._is_actor:
             # For rollout only, we do not switch context.
-            asyncio.run(self.rollout_mode())
+            try:
+                loop = asyncio.get_event_loop()
+            except RuntimeError:
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
+            loop.run_until_complete(self.rollout_mode())
             log_gpu_memory_usage("After switch to rollout mode", logger=logger)

         with simple_timer("generate_sequences", timing_generate):
             output = self.rollout.generate_sequences(prompts=prompts)

         if self._is_actor:
-            asyncio.run(self.trainer_mode())
+            loop.run_until_complete(self.trainer_mode())
             log_gpu_memory_usage("After switch to trainer mode", logger=logger)

         # We calculate the average timing across all ranks
diff --git a/verl/workers/megatron_workers.py b/verl/workers/megatron_workers.py
index f60cf697731..194c829b7ce 100644
--- a/verl/workers/megatron_workers.py
+++ b/verl/workers/megatron_workers.py
@@ -439,7 +439,8 @@ def _build_rollout(self, trust_remote_code=False):
         # For sync mode, we directly switch to trainer mode here.
         # For async mode, we can't call run_until_complete here, so we will switch to trainer mode in AgentLoopManager.
         if rollout_config.mode == "sync" and self._is_actor:
-            asyncio.run(self.trainer_mode())
+            loop = asyncio.get_event_loop()
+            loop.run_until_complete(self.trainer_mode())

     @register(dispatch_mode=Dispatch.ONE_TO_ALL)
     def init_model(self):
@@ -668,14 +669,19 @@ def generate_sequences(self, prompts: DataProto):
         timing_generate = {}
         if self._is_actor:
             # For rollout only, we do not switch context.
-            asyncio.run(self.rollout_mode())
+            try:
+                loop = asyncio.get_event_loop()
+            except RuntimeError:
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
+            loop.run_until_complete(self.rollout_mode())
             log_gpu_memory_usage("After switch to rollout mode", logger=logger)

         with simple_timer("generate_sequences", timing_generate):
             output = self.rollout.generate_sequences(prompts=prompts)

         if self._is_actor:
-            asyncio.run(self.trainer_mode())
+            loop.run_until_complete(self.trainer_mode())
             log_gpu_memory_usage("After switch to trainer mode", logger=logger)

         # We calculate the average timing across all ranks
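
Note: the third patch moves away from asyncio.run() and back to reusing one event loop per worker, likely because asyncio.run() creates a fresh loop and closes it on return, whereas run_until_complete() keeps the thread's loop alive across mode switches. Below is a minimal, standalone Python sketch of that get-or-create-loop pattern only; it assumes nothing about verl's worker classes, and switch_mode() is a hypothetical stand-in for rollout_mode()/trainer_mode().

    # Sketch: reuse one event loop per thread instead of asyncio.run(),
    # which would create and then close a fresh loop on every call.
    import asyncio


    def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
        """Return the current thread's event loop, creating one if none is set."""
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:  # no current event loop in this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop


    async def switch_mode(name: str) -> None:
        # Hypothetical stand-in for the worker's rollout_mode()/trainer_mode().
        await asyncio.sleep(0)
        print(f"switched to {name} mode")


    if __name__ == "__main__":
        loop = get_or_create_event_loop()
        loop.run_until_complete(switch_mode("rollout"))
        # ... synchronous generation work would run here ...
        loop.run_until_complete(switch_mode("trainer"))
        # The same loop serves both switches, so state created during the
        # first call is still bound to a live loop for the second call.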