183 changes: 110 additions & 73 deletions verl/trainer/main_ppo.py
@@ -89,62 +89,22 @@ class TaskRunner:

This class encapsulates the main training logic and runs as a Ray remote actor
to enable distributed execution across multiple nodes and GPUs.
"""

def run(self, config):
"""Execute the main PPO training workflow.

This method sets up the distributed training environment, initializes
workers, datasets, and reward functions, then starts the training process.

Args:
config: Training configuration object containing all parameters needed
for setting up and running the PPO training process.
"""
# Print the initial configuration. `resolve=True` will evaluate symbolic values.
from pprint import pprint

from omegaconf import OmegaConf

from verl.utils.fs import copy_to_local

print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")
pprint(OmegaConf.to_container(config, resolve=True))
OmegaConf.resolve(config)

# Download the checkpoint from HDFS to the local machine.
# `use_shm` determines whether to use shared memory, which could lead to faster model loading if turned on
local_path = copy_to_local(
config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False)
)

# Instantiate the tokenizer and processor.
from verl.utils import hf_processor, hf_tokenizer
Attributes:
role_worker_mapping: Dictionary mapping Role enums to Ray remote worker classes
mapping: Dictionary mapping Role enums to resource pool IDs for GPU allocation
"""

trust_remote_code = config.data.get("trust_remote_code", False)
tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code)
# Used for multimodal LLM, could be None
processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True)
def __init__(self):
self.role_worker_mapping = {}
self.mapping = {}

# Define worker classes based on the actor strategy.
def add_actor_rollout_worker(self, config):
"""Add actor rollout worker based on the actor strategy."""
if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
assert config.critic.strategy in {"fsdp", "fsdp2"}
from verl.single_controller.ray import RayWorkerGroup
from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker

use_legacy_worker_impl = config.trainer.get("use_legacy_worker_impl", "auto")
if use_legacy_worker_impl in ["auto", "enable"]:
# import warnings
# warnings.warn(f"Legacy worker impl is going to be deprecated, will be removed in the future. \
# Please set trainer.use_legacy_worker_impl = false to switch to the new worker implementation.")
from verl.workers.fsdp_workers import CriticWorker
elif use_legacy_worker_impl == "disable":
from verl.workers.roles import CriticWorker

print("Using new worker implementation")
else:
raise ValueError(f"Invalid use_legacy_worker_impl: {use_legacy_worker_impl}")

actor_rollout_cls = (
AsyncActorRolloutRefWorker
if config.actor_rollout_ref.rollout.mode == "async"
@@ -153,9 +113,8 @@ def run(self, config):
ray_worker_group_cls = RayWorkerGroup

elif config.actor_rollout_ref.actor.strategy == "megatron":
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker
from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker

actor_rollout_cls = (
AsyncActorRolloutRefWorker
@@ -167,45 +126,122 @@ def run(self, config):
else:
raise NotImplementedError

from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role
from verl.trainer.ppo.ray_trainer import Role

# Map roles to their corresponding remote worker classes.
role_worker_mapping = {
Role.ActorRollout: ray.remote(actor_rollout_cls),
Role.Critic: ray.remote(CriticWorker),
}
self.role_worker_mapping[Role.ActorRollout] = ray.remote(actor_rollout_cls)

return actor_rollout_cls, ray_worker_group_cls
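# --- Illustrative sketch, not part of this diff ---
# The strategy-dispatch pattern used by add_actor_rollout_worker above, shown with
# placeholder classes; verl's real worker classes and config objects are assumed.
import ray

class FSDPWorker:        # stand-in for verl.workers.fsdp_workers.ActorRolloutRefWorker
    pass

class MegatronWorker:    # stand-in for verl.workers.megatron_workers.ActorRolloutRefWorker
    pass

def select_worker_cls(strategy: str):
    """Pick a worker class from the actor strategy string, as the method above does."""
    if strategy in {"fsdp", "fsdp2"}:
        return FSDPWorker
    elif strategy == "megatron":
        return MegatronWorker
    raise NotImplementedError(f"Unsupported strategy: {strategy}")

# Wrapping with ray.remote mirrors self.role_worker_mapping[Role.ActorRollout] above.
remote_actor_rollout_cls = ray.remote(select_worker_cls("fsdp"))
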

def add_critic_worker(self, config):
"""Add critic worker to role mapping."""
if config.critic.strategy in {"fsdp", "fsdp2"}:
use_legacy_worker_impl = config.trainer.get("use_legacy_worker_impl", "auto")
if use_legacy_worker_impl in ["auto", "enable"]:
from verl.workers.fsdp_workers import CriticWorker
elif use_legacy_worker_impl == "disable":
from verl.workers.roles import CriticWorker

print("Using new worker implementation")
else:
raise ValueError(f"Invalid use_legacy_worker_impl: {use_legacy_worker_impl}")

elif config.critic.strategy == "megatron":
from verl.workers.megatron_workers import CriticWorker

else:
raise NotImplementedError

from verl.trainer.ppo.ray_trainer import Role

self.role_worker_mapping[Role.Critic] = ray.remote(CriticWorker)
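# --- Illustrative sketch, not part of this diff ---
# The three accepted values of trainer.use_legacy_worker_impl and the CriticWorker
# import each one selects in add_critic_worker above (module paths copied from the diff).
def pick_critic_module(use_legacy_worker_impl: str = "auto") -> str:
    if use_legacy_worker_impl in ("auto", "enable"):
        return "verl.workers.fsdp_workers"   # legacy CriticWorker (current default)
    if use_legacy_worker_impl == "disable":
        return "verl.workers.roles"          # new worker implementation
    raise ValueError(f"Invalid use_legacy_worker_impl: {use_legacy_worker_impl}")

assert pick_critic_module() == "verl.workers.fsdp_workers"
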

def init_resource_pool_mgr(self, config):
"""Initialize resource pool manager."""
from verl.trainer.ppo.ray_trainer import Role

# Define the resource pool specification.
# Map roles to the resource pool.
global_pool_id = "global_pool"
resource_pool_spec = {
global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}
mapping = {
self.mapping = {
Role.ActorRollout: global_pool_id,
Role.Critic: global_pool_id,
}
from verl.trainer.ppo.ray_trainer import ResourcePoolManager

resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=self.mapping)
return resource_pool_manager
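# --- Illustrative sketch, not part of this diff ---
# Concrete shape of the resource pool spec built by init_resource_pool_mgr above,
# for an assumed 2-node, 8-GPU-per-node trainer config.
n_gpus_per_node, nnodes = 8, 2   # stands in for config.trainer.n_gpus_per_node / nnodes
resource_pool_spec = {"global_pool": [n_gpus_per_node] * nnodes}
assert resource_pool_spec == {"global_pool": [8, 8]}  # one GPU count per node, shared by all roles
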

def add_reward_model_worker(self, config):
"""Add reward model worker if enabled."""
from verl.trainer.ppo.ray_trainer import Role

# We should adopt a multi-source reward function here:
# - for rule-based rm, we directly call a reward score
# - for model-based rm, we call a model
# - for code related prompt, we send to a sandbox if there are test cases
# finally, we combine all the rewards together
# The reward type depends on the tag of the data
if config.reward_model.enable:
if config.reward_model.strategy in {"fsdp", "fsdp2"}:
from verl.workers.fsdp_workers import RewardModelWorker
elif config.reward_model.strategy == "megatron":
from verl.workers.megatron_workers import RewardModelWorker
else:
raise NotImplementedError
role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
mapping[Role.RewardModel] = global_pool_id
self.role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
self.mapping[Role.RewardModel] = "global_pool"
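# --- Illustrative sketch, not part of this diff ---
# The multi-source reward idea described in the comment above: dispatch on a per-sample
# tag, then return that source's score. Field and function names here are assumptions,
# not verl's API.
def multi_source_reward(sample: dict, rule_fn, model_fn, sandbox_fn) -> float:
    tag = sample.get("tag", "rule")          # assumed tag field on the data
    if tag == "rule":
        return rule_fn(sample)               # rule-based reward score
    if tag == "model":
        return model_fn(sample)              # model-based reward model
    if tag == "code":
        return sandbox_fn(sample)            # run test cases in a sandbox
    raise KeyError(f"Unknown reward tag: {tag}")
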

def add_ref_policy_worker(self, config, ref_policy_cls):
"""Add reference policy worker if KL loss or KL reward is used."""
from verl.trainer.ppo.ray_trainer import Role

# Add a reference policy worker if KL loss or KL reward is used.
if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
mapping[Role.RefPolicy] = global_pool_id
self.role_worker_mapping[Role.RefPolicy] = ray.remote(ref_policy_cls)
self.mapping[Role.RefPolicy] = "global_pool"
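# --- Illustrative sketch, not part of this diff ---
# The bookkeeping that the add_* helpers accumulate on TaskRunner, with string
# placeholders standing in for verl's Role enum and ray.remote worker classes.
role_worker_mapping, mapping = {}, {}

def add_role(role: str, worker: str, pool_id: str = "global_pool"):
    role_worker_mapping[role] = worker   # verl stores ray.remote(WorkerCls) here
    mapping[role] = pool_id

add_role("ActorRollout", "ActorRolloutRefWorker")
add_role("Critic", "CriticWorker")
add_role("RewardModel", "RewardModelWorker")    # only when config.reward_model.enable
add_role("RefPolicy", "ActorRolloutRefWorker")  # only when KL loss or KL reward is used

assert set(mapping.values()) == {"global_pool"}  # every role shares the single global pool
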

def run(self, config):
"""Execute the main PPO training workflow.

This method sets up the distributed training environment, initializes
workers, datasets, and reward functions, then starts the training process.

Args:
config: Training configuration object containing all parameters needed
for setting up and running the PPO training process.
"""
# Print the initial configuration. `resolve=True` will evaluate symbolic values.
from pprint import pprint

from omegaconf import OmegaConf

from verl.utils.fs import copy_to_local

print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")
pprint(OmegaConf.to_container(config, resolve=True))
OmegaConf.resolve(config)

# Download the checkpoint from HDFS to the local machine.
# `use_shm` determines whether to use shared memory, which could lead to faster model loading if turned on
local_path = copy_to_local(
config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False)
)

# Instantiate the tokenizer and processor.
from verl.utils import hf_processor, hf_tokenizer

trust_remote_code = config.data.get("trust_remote_code", False)
tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code)
# Used for multimodal LLM, could be None
processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True)

actor_rollout_cls, ray_worker_group_cls = self.add_actor_rollout_worker(config)
self.add_critic_worker(config)

# We should adopt a multi-source reward function here:
# - for rule-based rm, we directly call a reward score
# - for model-based rm, we call a model
# - for code related prompt, we send to a sandbox if there are test cases
# finally, we combine all the rewards together
# The reward type depends on the tag of the data
self.add_reward_model_worker(config)

# Add a reference policy worker if KL loss or KL reward is used.
self.add_ref_policy_worker(config, actor_rollout_cls)

# Load the reward manager for training and validation.
reward_fn = load_reward_manager(
@@ -214,7 +250,8 @@ def run(self, config):
val_reward_fn = load_reward_manager(
config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {})
)
resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

resource_pool_manager = self.init_resource_pool_mgr(config)

from verl.utils.dataset.rl_dataset import collate_fn

@@ -228,7 +265,7 @@ def run(self, config):
config=config,
tokenizer=tokenizer,
processor=processor,
role_worker_mapping=role_worker_mapping,
role_worker_mapping=self.role_worker_mapping,
resource_pool_manager=resource_pool_manager,
ray_worker_group_cls=ray_worker_group_cls,
reward_fn=reward_fn,
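# --- Illustrative usage sketch, not part of this diff ---
# How a driver would typically exercise the refactored TaskRunner; the Ray decoration
# of TaskRunner and the Hydra-resolved `config` object are assumptions here.
import ray

def run_ppo_sketch(config):
    if not ray.is_initialized():
        ray.init()
    # TaskRunner runs as a Ray remote actor (see its docstring above), so the driver
    # creates one actor and blocks until run() returns.
    runner = TaskRunner.remote()          # noqa: F821 - provided by verl.trainer.main_ppo
    ray.get(runner.run.remote(config))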