Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/model.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ jobs:
- name: Download model config files
run: |
hf download Qwen/Qwen2.5-0.5B-Instruct --local-dir $HOME/models/Qwen/Qwen2.5-0.5B-Instruct
hf download Skywork/Skywork-Reward-V2-Qwen3-0.6B --local-dir $HOME/models/Skywork/Skywork-Reward-V2-Qwen3-0.6B

- name: Running mcore engine tests on 8 L20 GPUs
run: |
Expand Down
111 changes: 107 additions & 4 deletions tests/models/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,27 @@
import pytest
import ray
import torch
from transformers import AutoModelForCausalLM
from transformers import AutoModelForCausalLM, AutoModelForTokenClassification

from verl import DataProto
from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup
from verl.utils.model import compute_position_id_with_mask, create_random_mask
from verl.utils.torch_functional import logprobs_from_logits_naive
from verl.workers.config import (
ActorConfig,
CriticConfig,
FSDPEngineConfig,
FSDPOptimizerConfig,
HFModelConfig,
McoreEngineConfig,
McoreOptimizerConfig,
)
from verl.workers.roles import ActorWorker
from verl.workers.roles import ActorWorker, CriticWorker
from verl.workers.roles.utils.losses import ppo_loss, sft_loss


@pytest.mark.parametrize("strategy", ["megatron", "fsdp", "fsdp2"])
def test_mcore_engine(strategy):
def test_actor_engine(strategy):
ray.init()

path = os.path.expanduser("~/models/Qwen/Qwen2.5-0.5B-Instruct")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Hardcoding the model path using os.path.expanduser makes this test dependent on a specific local file system setup. This will cause the test to fail for other developers or in CI environments where ~/models/Qwen/Qwen2.5-0.5B-Instruct does not exist. For large, pre-existing models, consider making the path configurable via an environment variable or a test configuration file, and provide instructions for setting it up. This will improve the test's portability and reproducibility.

Expand Down Expand Up @@ -72,7 +73,7 @@ def test_mcore_engine(strategy):
ppo_mini_batch_size=4,
optim=optimizer_config,
use_dynamic_bsz=True,
n=1,
rollout_n=1,
)
ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(ActorWorker), config=config)
resource_pool = RayResourcePool(process_on_nodes=[8])
Expand Down Expand Up @@ -151,3 +152,105 @@ def test_mcore_engine(strategy):
print(ppo_metrics)

ray.shutdown()


@pytest.mark.parametrize("strategy", ["fsdp", "fsdp2"])
def test_critic_engine(strategy):
    """Check the critic engine against the Hugging Face reference model.

    For each parametrized ``strategy`` the test builds a ``CriticWorker`` group,
    runs ``compute_values`` on a random batch, compares the masked mean of the
    returned values with the masked mean of the token-classification logits of
    the same checkpoint loaded through ``transformers``, and finally runs one
    ``update_critic`` step on random values/returns.

    Args:
        strategy: Engine backend under test, supplied by ``pytest.mark.parametrize``.
    """
    ray.init()

    # Always shut Ray down, even when an assertion fails; otherwise the next
    # parametrized case would call ray.init() on an already-initialised runtime.
    try:
        path = os.path.expanduser("~/models/Skywork/Skywork-Reward-V2-Qwen3-0.6B")
        model_config = HFModelConfig(path=path)

        # NOTE: `strategy` must come from the parametrize decorator. Do not
        # reassign it here, or the "fsdp2" case is silently never exercised.
        if strategy == "megatron":
            engine_config = McoreEngineConfig(
                forward_only=False,
                use_mbridge=False,
                tensor_model_parallel_size=2,
                pipeline_model_parallel_size=2,
                context_parallel_size=2,
            )
            optimizer_config = McoreOptimizerConfig(lr_decay_steps=10)
        elif strategy in ["fsdp", "fsdp2"]:
            engine_config = FSDPEngineConfig(
                forward_only=False, fsdp_size=4, strategy=strategy, ulysses_sequence_parallel_size=2
            )
            optimizer_config = FSDPOptimizerConfig()
        else:
            raise NotImplementedError(f"strategy {strategy} is not supported")

        config = CriticConfig(
            model_config=model_config,
            engine=engine_config,
            strategy=strategy,
            ppo_micro_batch_size_per_gpu=256,
            ppo_mini_batch_size=4,
            optim=optimizer_config,
            use_dynamic_bsz=True,
            rollout_n=1,
        )
        ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(CriticWorker), config=config)
        resource_pool = RayResourcePool(process_on_nodes=[8])
        wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init)
        # init model
        wg.init_model()

        batch_size = 8
        seqlen = 32

        response_length = seqlen // 2

        # Fixed seeds keep the random batch reproducible across runs.
        torch.manual_seed(1)
        np.random.seed(1)

        input_ids = torch.randint(0, model_config.hf_config.vocab_size, (batch_size, seqlen))
        attention_mask = create_random_mask(
            input_ids=input_ids,
            max_ratio_of_valid_token=0.8,
            max_ratio_of_left_padding=0.2,
            min_ratio_of_valid_token=0.6,
        )
        position_ids = compute_position_id_with_mask(attention_mask)

        global_token_num = torch.sum(attention_mask, dim=-1).tolist()

        print(input_ids.float().mean(), attention_mask.float().mean())

        # The second half of every sequence is treated as the response.
        responses = input_ids[:, response_length:]
        response_mask = attention_mask[:, response_length:]

        assert torch.all(response_mask[:, 0] == 1)

        data = DataProto.from_single_dict(
            {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "position_ids": position_ids,
                "responses": responses,
                "response_mask": response_mask,
            },
            meta_info={"temperature": 1.0, "global_token_num": global_token_num},
        )

        # eval
        output = wg.compute_values(data)

        # load hf model and compare results with hf model
        hf_model = AutoModelForTokenClassification.from_pretrained(path, torch_dtype=torch.bfloat16)
        hf_output = hf_model(input_ids, attention_mask=attention_mask)
        # Same slice the engine applies: the logits one position before each response token.
        hf_values = hf_output.logits[:, -response_length - 1 : -1, :].float().squeeze(-1)
        hf_values_mean = torch.mean(hf_values * response_mask)

        engine_values = torch.mean(output.batch["values"] * response_mask)

        torch.testing.assert_close(hf_values_mean, engine_values, atol=1e-3, rtol=1e-2)

        data = data.union(output)

        # add ppo data
        data.batch["values"] = torch.rand_like(responses, dtype=torch.float32)
        data.batch["returns"] = torch.rand_like(responses, dtype=torch.float32)

        # update again
        ppo_metrics = wg.update_critic(data)
        print(ppo_metrics)
    finally:
        ray.shutdown()
29 changes: 17 additions & 12 deletions verl/utils/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
from torch import nn
from transformers import (
AutoConfig,
AutoModel,
AutoModelForCausalLM,
AutoModelForSequenceClassification,
AutoModelForTokenClassification,
AutoModelForVision2Seq,
GenerationConfig,
MistralForSequenceClassification,
PretrainedConfig,
Expand Down Expand Up @@ -658,13 +662,15 @@ def load_valuehead_model(local_path, torch_dtype, model_config, trust_remote_cod
return model


def get_hf_auto_model_class(hf_config):
from transformers import (
AutoModel,
AutoModelForCausalLM,
AutoModelForVision2Seq,
)
_architecture_to_auto_class = {
"ForCausalLM": AutoModelForCausalLM,
"ForVision2Seq": AutoModelForVision2Seq,
"ForTokenClassification": AutoModelForTokenClassification,
"ForSequenceClassification": AutoModelForSequenceClassification,
}


def get_hf_auto_model_class(hf_config):
has_remote_code = hasattr(hf_config, "auto_map") and any(
hf_config.architectures[0] in val for val in hf_config.auto_map.values()
)
Expand All @@ -678,12 +684,11 @@ def get_hf_auto_model_class(hf_config):
case _:
actor_module_class = AutoModel
else:
if type(hf_config) in AutoModelForVision2Seq._model_mapping.keys():
actor_module_class = AutoModelForVision2Seq
elif type(hf_config) in AutoModelForCausalLM._model_mapping.keys():
actor_module_class = AutoModelForCausalLM
else:
actor_module_class = AutoModel
actor_module_class = AutoModel
for key, cls in _architecture_to_auto_class.items():
if key in hf_config.architectures[0]:
actor_module_class = cls
break

return actor_module_class

Expand Down
4 changes: 2 additions & 2 deletions verl/workers/config/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ class ActorConfig(BaseConfig):
profiler: ProfilerConfig = field(default_factory=ProfilerConfig)
engine: BaseConfig = field(default_factory=BaseConfig)
data_loader_seed = 1
n: int = 1 # must be override by sampling config
rollout_n: int = 1 # must be override by sampling config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this renaming complete in all references to the "n" parameter?

model_config: HFModelConfig = field(default_factory=BaseConfig)

def __post_init__(self):
"""Validate actor configuration parameters."""
assert self.strategy != MISSING
assert self.n != MISSING
assert self.rollout_n != MISSING
if not self.use_dynamic_bsz:
if self.ppo_micro_batch_size is not None and self.ppo_micro_batch_size_per_gpu is not None:
raise ValueError(
Expand Down
8 changes: 7 additions & 1 deletion verl/workers/config/critic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from verl.utils.profiler import ProfilerConfig

from .engine import FSDPEngineConfig, McoreEngineConfig
from .model import HFModelConfig
from .optimizer import OptimizerConfig

__all__ = ["CriticConfig", "FSDPCriticConfig", "McoreCriticConfig", "FSDPCriticModelCfg"]
Expand Down Expand Up @@ -66,14 +67,19 @@ class CriticConfig(BaseConfig):
ppo_mini_batch_size: int = 1
use_dynamic_bsz: bool = False
ppo_max_token_len_per_gpu: int = 32768
# deprecate this
forward_max_token_len_per_gpu: int = 32768
ppo_infer_micro_batch_size_per_gpu: Optional[int] = None
ppo_infer_max_token_len_per_gpu: int = 32768
ppo_epochs: int = 1
data_loader_seed: int = 1
shuffle: bool = True
cliprange_value: float = 0.5
loss_agg_mode: str = "token-mean"
ppo_micro_batch_size: Optional[int] = None
engine: BaseConfig = field(default_factory=BaseConfig)
optim: OptimizerConfig = field(default_factory=OptimizerConfig)
model: BaseModelConfig = field(default_factory=BaseModelConfig)
model_config: HFModelConfig = field(default_factory=BaseConfig)
checkpoint: CheckpointConfig = field(default_factory=CheckpointConfig)
profiler: ProfilerConfig = field(default_factory=ProfilerConfig)

Expand Down
42 changes: 42 additions & 0 deletions verl/workers/engine/fsdp/transformer_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,3 +878,45 @@ def forward_step(self, micro_batch: TensorDict, loss_function, forward_only):
}

return loss, output


@EngineRegistry.register(model_type="value_model", backend=["fsdp", "fsdp2"])
class FSDPEngineWithValueHead(FSDPEngineWithLMHead):
    """
    The only difference between critic and actor is how the raw model output is processed
    """

    def prepare_model_outputs(self, output, output_args, micro_batch: TensorDict):
        """Turn the raw model output into per-response-token values.

        Args:
            output: Raw output of the wrapped module. Read as ``output[2]`` when the
                module has a ``v_head`` attribute, otherwise as ``output.logits``.
            output_args: Mapping providing ``"indices"`` and, when Ulysses sequence
                parallelism is enabled, ``"pad_size"`` (remove-padding path only).
            micro_batch: Micro batch providing ``"input_ids"`` and ``"responses"``,
                plus the optional non-tensor flag ``"use_remove_padding"``
                (defaults to ``True``).

        Returns:
            A dict with the single key ``"values"``, sliced to
            ``[:, -response_length - 1 : -1]`` along the sequence dimension.
        """
        use_remove_padding = tu.get_non_tensor_data(data=micro_batch, key="use_remove_padding", default=True)
        # Both branches slice with response_length, so compute it before branching
        # rather than inside the remove-padding branch only.
        response_length = micro_batch["responses"].size(-1)

        if use_remove_padding:
            input_ids = micro_batch["input_ids"]
            batch_size, seqlen = input_ids.shape

            if hasattr(self.module, "v_head"):
                # For trl.AutoModelForCausalLMWithValueHead
                values_rmpad = output[2].squeeze(0).unsqueeze(-1)
            else:
                values_rmpad = output.logits
                values_rmpad = values_rmpad.squeeze(0)  # (total_nnz)

            indices = output_args["indices"]

            # gather output if sp > 1
            if self.use_ulysses_sp:
                pad_size = output_args["pad_size"]
                values_rmpad = gather_outputs_and_unpad(values_rmpad, gather_dim=0, unpad_dim=0, padding_size=pad_size)

            # pad it back
            values = pad_input(values_rmpad, indices=indices, batch=batch_size, seqlen=seqlen).squeeze(-1)
            values = values[:, -response_length - 1 : -1]
        else:
            if hasattr(self.module, "v_head"):
                # For trl.AutoModelForCausalLMWithValueHead
                values = output[2]
            else:
                values = output.logits
            values = values[:, -response_length - 1 : -1].squeeze(-1)

        return {"values": values}
48 changes: 19 additions & 29 deletions verl/workers/roles/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,21 @@ def __init__(self, config: ActorConfig):
self.loss_fn = partial(ppo_loss, config=self.config)

def _build_engine(self):
model_config = self.config.model_config
engine_config = self.config.engine
optimizer_config = self.config.optim
checkpoint_config = self.config.checkpoint

if self.config.strategy == "megatron":
from verl.workers.engine.megatron.transformer_impl import MegatronEngineWithLMHead

self.engine = MegatronEngineWithLMHead(
model_config=model_config,
engine_config=engine_config,
optimizer_config=optimizer_config,
checkpoint_config=checkpoint_config,
)
elif self.config.strategy in ["fsdp", "fsdp2"]:
from verl.workers.engine.fsdp.transformer_impl import FSDPEngineWithLMHead

self.engine = FSDPEngineWithLMHead(
model_config=model_config,
engine_config=engine_config,
optimizer_config=optimizer_config,
checkpoint_config=checkpoint_config,
)
else:
raise ValueError(f"Unknown strategy {self.config.strategy}")
self.model_config = self.config.model_config
self.engine_config = self.config.engine
self.optimizer_config = self.config.optim
self.checkpoint_config = self.config.checkpoint

from verl.workers.engine import BaseEngine, EngineRegistry

self.engine: BaseEngine = EngineRegistry.new(
model_type="language_model",
backend=self.config.strategy,
model_config=self.model_config,
engine_config=self.engine_config,
optimizer_config=self.optimizer_config,
checkpoint_config=self.checkpoint_config,
)

# build dispatch info
self._register_dispatch_collect_info(
Expand All @@ -95,14 +85,14 @@ def _build_engine(self):
)

# aggregate with bon sampling
self.ppo_mini_batch_size = self.config.ppo_mini_batch_size * self.config.n
self.ppo_mini_batch_size = self.config.ppo_mini_batch_size * self.config.rollout_n
assert self.ppo_mini_batch_size % self.engine.get_data_parallel_size() == 0, (
f"{self.ppo_mini_batch_size=} is not divisible by {self.engine.get_data_parallel_size()=}"
)
self.ppo_mini_batch_size_per_dp = self.ppo_mini_batch_size // self.engine.get_data_parallel_size()

# setup flops counter
self.flops_counter = FlopsCounter(model_config.hf_config)
self.flops_counter = FlopsCounter(self.model_config.hf_config)

@register(dispatch_mode=Dispatch.ONE_TO_ALL)
def init_model(self):
Expand All @@ -128,9 +118,9 @@ def compute_log_prob(self, data: DataProto):
# TODO: make worker API to accept TensorDict as well
data = data.to_tensordict()
output = self.engine.infer_batch(data)
output = output.get("model_output", {})

if "log_probs" in output and "entropy" in output:
if self.engine.is_mp_src_rank_with_outputs():
output = output["model_output"]
# in megatron, only last pp contains valid data and returned to the single controller
output = DataProto.from_dict(
tensors={"old_log_probs": output["log_probs"].float(), "entropy": output["entropy"].float()},
Expand Down
Loading
Loading