5 changes: 3 additions & 2 deletions examples/split_placement/config/ppo_trainer_split.yaml
@@ -183,5 +183,6 @@ trainer:
default_hdfs_dir: null
default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name}

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
12 changes: 8 additions & 4 deletions examples/split_placement/main_ppo_split.py
@@ -18,6 +18,7 @@
import hydra
import ray
import torch
from omegaconf import OmegaConf
from split_monkey_patch import fit

from verl import DataProto
@@ -94,10 +95,13 @@ def __call__(self, data: DataProto, return_dict: bool = False):
def main(config):
if not ray.is_initialized():
# this is for local ray cluster
ray.init(
runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

ray.get(main_task.remote(config))
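
For reference, a minimal standalone sketch of how the new merge logic above behaves; it is not part of the diff, assumes only that `omegaconf` is installed, and uses placeholder override values. Entries supplied under `ray_kwargs.ray_init.runtime_env` take precedence over the hard-coded defaults, while defaults that are not overridden are preserved:

from omegaconf import OmegaConf

# Defaults hard-coded in the entry point above.
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}

# What a user might place under ray_kwargs.ray_init in the YAML (hypothetical values).
ray_init_kwargs = OmegaConf.create({"num_cpus": 16, "runtime_env": {"env_vars": {"NCCL_DEBUG": "INFO"}}})

# Same sequence as the trainer code: merge runtime_env over the defaults, then rebuild the kwargs.
runtime_env = OmegaConf.merge(default_runtime_env, ray_init_kwargs.get("runtime_env", {}))
merged = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})

print(OmegaConf.to_container(merged))
# {'num_cpus': 16, 'runtime_env': {'env_vars': {'TOKENIZERS_PARALLELISM': 'true', 'NCCL_DEBUG': 'INFO'}}}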

15 changes: 9 additions & 6 deletions recipe/dapo/main_dapo.py
@@ -36,12 +36,15 @@ def main(config):
def run_ppo(config) -> None:
if not ray.is_initialized():
# this is for local ray cluster
ray.init(
runtime_env={
"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
},
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = {
"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

if (
is_cuda_available
26 changes: 15 additions & 11 deletions recipe/entropy/main_entropy.py
@@ -17,6 +17,7 @@

import hydra
import ray
from omegaconf import OmegaConf

from .entropy_ray_trainer import RayEntropyTrainer
from .reward import load_reward_manager
@@ -30,17 +31,20 @@ def main(config):
def run_ppo(config) -> None:
if not ray.is_initialized():
# this is for local ray cluster
ray.init(
runtime_env={
"env_vars": {
"TOKENIZERS_PARALLELISM": "true",
"NCCL_DEBUG": "WARN",
"VLLM_LOGGING_LEVEL": "WARN",
"WANDB_API_KEY": "YOUR_WANDB_API_KEY",
}
},
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = {
"env_vars": {
"TOKENIZERS_PARALLELISM": "true",
"NCCL_DEBUG": "WARN",
"VLLM_LOGGING_LEVEL": "WARN",
"WANDB_API_KEY": "YOUR_WANDB_API_KEY",
}
}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

runner = TaskRunner.remote()
ray.get(runner.run.remote(config))
13 changes: 8 additions & 5 deletions recipe/one_step_off_policy/main_ppo.py
@@ -43,10 +43,13 @@ def run_ppo(config) -> None:
# Set environment variables in the runtime environment to control tokenizer parallelism,
# NCCL debug level, VLLM logging level, and allow runtime LoRA updating
# `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
ray.init(
runtime_env=get_ppo_ray_runtime_env(),
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = get_ppo_ray_runtime_env()
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

# Create a remote instance of the TaskRunner class, and
# Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -63,7 +66,7 @@ def run_ppo(config) -> None:

# [Optional] get the path of the timeline trace file from the configuration, default to None
# This file is used for performance analysis
timeline_json_file = config.ray_init.get("timeline_json_file", None)
timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
if timeline_json_file:
ray.timeline(filename=timeline_json_file)

12 changes: 8 additions & 4 deletions recipe/prime/main_prime.py
@@ -31,6 +31,7 @@

import hydra
import ray
from omegaconf import OmegaConf

from .prime_ray_trainer import RayPRIMETrainer

@@ -42,11 +43,14 @@ def main(config):

def run_prime(config, compute_score=None):
if not ray.is_initialized():
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
# this is for local ray cluster
ray.init(
runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
num_cpus=config.ray_init.num_cpus,
)
ray.init(**OmegaConf.to_container(ray_init_kwargs))

ray.get(main_task.remote(config, compute_score))

5 changes: 3 additions & 2 deletions recipe/r1/config/evaluation.yaml
@@ -9,5 +9,6 @@ custom_reward_function:
path: null
name: compute_score

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
3 changes: 2 additions & 1 deletion recipe/r1/main_eval.py
@@ -23,6 +23,7 @@
import numpy as np
import pandas as pd
import ray
from omegaconf import OmegaConf
from tqdm import tqdm

from verl.trainer.ppo.reward import get_custom_reward_fn
@@ -49,7 +50,7 @@ def main(config):

# Initialize Ray
if not ray.is_initialized():
ray.init(num_cpus=config.ray_init.num_cpus)
ray.init(**OmegaConf.to_container(config.ray_kwargs.get("ray_init", {})))

# evaluate test_score based on data source
data_source_reward = defaultdict(list)
16 changes: 10 additions & 6 deletions recipe/sppo/main_sppo.py
@@ -21,6 +21,7 @@

import hydra
import ray
from omegaconf import OmegaConf

from verl.trainer.ppo.reward import load_reward_manager

@@ -38,12 +39,15 @@ def run_ppo(config) -> None:
os.environ["ENSURE_CUDA_VISIBLE_DEVICES"] = os.environ.get("CUDA_VISIBLE_DEVICES", "")
if not ray.is_initialized():
# this is for local ray cluster
ray.init(
runtime_env={
"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
},
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = {
"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

runner = TaskRunner.remote()
ray.get(runner.run.remote(config))
5 changes: 3 additions & 2 deletions tests/trainer/config/legacy_ppo_megatron_trainer.yaml
@@ -460,6 +460,7 @@ trainer:
with_stack: False
analysis: True

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
timeline_json_file: null
10 changes: 6 additions & 4 deletions tests/trainer/config/legacy_ppo_trainer.yaml
@@ -1103,11 +1103,13 @@ trainer:
# Device to run training on (e.g., "cuda", "cpu")
device: cuda

# configs related to ray initialization
ray_init:
# configs related to ray
ray_kwargs:
# configs related to ray initialization
ray_init:

# Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
num_cpus: null
# Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
num_cpus: null

# Path to save Ray timeline JSON for performance profiling
timeline_json_file: null
5 changes: 3 additions & 2 deletions verl/trainer/config/_generated_ppo_megatron_trainer.yaml
@@ -460,6 +460,7 @@ global_profiler:
capture-range: cudaProfilerApi
capture-range-end: null
kill: none
ray_init:
num_cpus: null
ray_kwargs:
ray_init:
num_cpus: null
timeline_json_file: null
5 changes: 3 additions & 2 deletions verl/trainer/config/_generated_ppo_trainer.yaml
@@ -426,6 +426,7 @@ global_profiler:
capture-range: cudaProfilerApi
capture-range-end: null
kill: none
ray_init:
num_cpus: null
ray_kwargs:
ray_init:
num_cpus: null
timeline_json_file: null
5 changes: 3 additions & 2 deletions verl/trainer/config/evaluation.yaml
@@ -9,6 +9,7 @@ custom_reward_function:
path: null
name: compute_score

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
timeline_json_file: null
5 changes: 3 additions & 2 deletions verl/trainer/config/generation.yaml
@@ -51,6 +51,7 @@ actor:
fsdp_size: -1
forward_prefetch: False # FSDP1 forward_prefetch configuration

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
timeline_json_file: null
5 changes: 3 additions & 2 deletions verl/trainer/config/ppo_megatron_trainer.yaml
@@ -165,6 +165,7 @@ global_profiler:
# Send signal to the target application's process group. We let the program to exit by itself.
kill: none

ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
ray_kwargs:
ray_init:
num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
timeline_json_file: null
11 changes: 7 additions & 4 deletions verl/trainer/config/ppo_trainer.yaml
@@ -338,11 +338,14 @@ global_profiler:
# Send signal to the target application's process group. We let the program to exit by itself.
kill: none

# configs related to ray initialization
ray_init:
# configs related to ray
ray_kwargs:

# Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
num_cpus: null
# configs related to ray initialization
ray_init:

# Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
num_cpus: null

# Path to save Ray timeline JSON for performance profiling
timeline_json_file: null
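
Because the `ray_kwargs.ray_init` block is now passed through to `ray.init(...)` as keyword arguments (with `runtime_env` merged over the built-in defaults), any keyword accepted by `ray.init` can in principle be set here without further code changes. A hedged sketch, not taken from this PR, with placeholder values (`num_cpus`, `include_dashboard`) that a user might choose:

import ray
from omegaconf import OmegaConf

# Hypothetical config fragment mirroring the YAML structure above.
cfg = OmegaConf.create(
    {"ray_kwargs": {"ray_init": {"num_cpus": 8, "include_dashboard": False}, "timeline_json_file": None}}
)

if not ray.is_initialized():
    # Every key under ray_kwargs.ray_init becomes a keyword argument of ray.init.
    ray.init(**OmegaConf.to_container(cfg.ray_kwargs.get("ray_init", {})))
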
3 changes: 2 additions & 1 deletion verl/trainer/main_eval.py
@@ -23,6 +23,7 @@
import numpy as np
import pandas as pd
import ray
from omegaconf import OmegaConf
from tqdm import tqdm

from verl.trainer.ppo.reward import get_custom_reward_fn
@@ -48,7 +49,7 @@ def main(config):

# Initialize Ray
if not ray.is_initialized():
ray.init(num_cpus=config.ray_init.num_cpus)
ray.init(**OmegaConf.to_container(config.ray_kwargs.get("ray_init", {})))

# evaluate test_score based on data source
data_source_reward = defaultdict(list)
11 changes: 7 additions & 4 deletions verl/trainer/main_generation.py
@@ -48,10 +48,13 @@ def main(config):
def run_generation(config) -> None:
if not ray.is_initialized():
# this is for local ray cluster
ray.init(
runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

ray.get(main_task.remote(config))

13 changes: 8 additions & 5 deletions verl/trainer/main_ppo.py
@@ -55,10 +55,13 @@ def run_ppo(config) -> None:
# Set environment variables in the runtime environment to control tokenizer parallelism,
# NCCL debug level, VLLM logging level, and allow runtime LoRA updating
# `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
ray.init(
runtime_env=get_ppo_ray_runtime_env(),
num_cpus=config.ray_init.num_cpus,
)
default_runtime_env = get_ppo_ray_runtime_env()
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
print(f"ray init kwargs: {ray_init_kwargs}")
ray.init(**OmegaConf.to_container(ray_init_kwargs))

# Create a remote instance of the TaskRunner class, and
# Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -81,7 +84,7 @@ def run_ppo(config) -> None:

# [Optional] get the path of the timeline trace file from the configuration, default to None
# This file is used for performance analysis
timeline_json_file = config.ray_init.get("timeline_json_file", None)
timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
if timeline_json_file:
ray.timeline(filename=timeline_json_file)
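
Note that after this rename, `timeline_json_file` sits directly under `ray_kwargs` as a sibling of `ray_init`, which is why the lookup above uses `config.ray_kwargs.get(...)`. For context, a brief sketch (with an assumed output path) of what that knob ultimately drives: `ray.timeline` writes a Chrome-trace JSON that can be inspected in chrome://tracing or Perfetto.

import ray

ray.init(num_cpus=4)
# ... run the training workload ...
# Dump task and actor events for performance analysis (the path is a placeholder).
ray.timeline(filename="/tmp/ray_timeline.json")
ray.shutdown()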
