diff --git a/examples/split_placement/config/ppo_trainer_split.yaml b/examples/split_placement/config/ppo_trainer_split.yaml
index 36af6359e31..a27afb15083 100644
--- a/examples/split_placement/config/ppo_trainer_split.yaml
+++ b/examples/split_placement/config/ppo_trainer_split.yaml
@@ -183,5 +183,6 @@ trainer:
   default_hdfs_dir: null
   default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name}

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
diff --git a/examples/split_placement/main_ppo_split.py b/examples/split_placement/main_ppo_split.py
index 0d17832a6d6..d8d880adee9 100644
--- a/examples/split_placement/main_ppo_split.py
+++ b/examples/split_placement/main_ppo_split.py
@@ -18,6 +18,7 @@
 import hydra
 import ray
 import torch
+from omegaconf import OmegaConf
 from split_monkey_patch import fit

 from verl import DataProto
@@ -94,10 +95,13 @@ def __call__(self, data: DataProto, return_dict: bool = False):
 def main(config):
     if not ray.is_initialized():
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     ray.get(main_task.remote(config))
diff --git a/recipe/dapo/main_dapo.py b/recipe/dapo/main_dapo.py
index 1a68a8d4511..abb8de8b5eb 100644
--- a/recipe/dapo/main_dapo.py
+++ b/recipe/dapo/main_dapo.py
@@ -36,12 +36,15 @@ def main(config):
 def run_ppo(config) -> None:
     if not ray.is_initialized():
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = {
+            "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
+        }
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     if (
         is_cuda_available
diff --git a/recipe/entropy/main_entropy.py b/recipe/entropy/main_entropy.py
index ffed8e4235a..f662e89f2ba 100644
--- a/recipe/entropy/main_entropy.py
+++ b/recipe/entropy/main_entropy.py
@@ -17,6 +17,7 @@
 import hydra
 import ray
+from omegaconf import OmegaConf

 from .entropy_ray_trainer import RayEntropyTrainer
 from .reward import load_reward_manager
@@ -30,17 +31,20 @@ def main(config):
 def run_ppo(config) -> None:
     if not ray.is_initialized():
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {
-                    "TOKENIZERS_PARALLELISM": "true",
-                    "NCCL_DEBUG": "WARN",
-                    "VLLM_LOGGING_LEVEL": "WARN",
-                    "WANDB_API_KEY": "YOUR_WANDB_API_KEY",
-                }
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = {
+            "env_vars": {
+                "TOKENIZERS_PARALLELISM": "true",
+                "NCCL_DEBUG": "WARN",
+                "VLLM_LOGGING_LEVEL": "WARN",
+                "WANDB_API_KEY": "YOUR_WANDB_API_KEY",
+            }
+        }
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     runner = TaskRunner.remote()
     ray.get(runner.run.remote(config))
diff --git a/recipe/one_step_off_policy/main_ppo.py b/recipe/one_step_off_policy/main_ppo.py
index 66d3aabb032..ea869b5489f 100644
--- a/recipe/one_step_off_policy/main_ppo.py
+++ b/recipe/one_step_off_policy/main_ppo.py
@@ -43,10 +43,13 @@ def run_ppo(config) -> None:
         # Set environment variables in the runtime environment to control tokenizer parallelism,
         # NCCL debug level, VLLM logging level, and allow runtime LoRA updating
         # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
-        ray.init(
-            runtime_env=get_ppo_ray_runtime_env(),
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = get_ppo_ray_runtime_env()
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

         # Create a remote instance of the TaskRunner class, and
         # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -63,7 +66,7 @@ def run_ppo(config) -> None:

     # [Optional] get the path of the timeline trace file from the configuration, default to None
     # This file is used for performance analysis
-    timeline_json_file = config.ray_init.get("timeline_json_file", None)
+    timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
     if timeline_json_file:
         ray.timeline(filename=timeline_json_file)
diff --git a/recipe/prime/main_prime.py b/recipe/prime/main_prime.py
index 687bc6e421a..4c3ed6e6d9e 100644
--- a/recipe/prime/main_prime.py
+++ b/recipe/prime/main_prime.py
@@ -31,6 +31,7 @@
 import hydra
 import ray
+from omegaconf import OmegaConf

 from .prime_ray_trainer import RayPRIMETrainer
@@ -42,11 +43,14 @@ def main(config):
 def run_prime(config, compute_score=None):
     if not ray.is_initialized():
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     ray.get(main_task.remote(config, compute_score))
diff --git a/recipe/r1/config/evaluation.yaml b/recipe/r1/config/evaluation.yaml
index 1bd9f4e9319..4fe664ae43a 100644
--- a/recipe/r1/config/evaluation.yaml
+++ b/recipe/r1/config/evaluation.yaml
@@ -9,5 +9,6 @@ custom_reward_function:
   path: null
   name: compute_score

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
\ No newline at end of file
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
\ No newline at end of file
diff --git a/recipe/r1/main_eval.py b/recipe/r1/main_eval.py
index b9c03791bc5..5c0e735a1a5 100644
--- a/recipe/r1/main_eval.py
+++ b/recipe/r1/main_eval.py
@@ -23,6 +23,7 @@
 import numpy as np
 import pandas as pd
 import ray
+from omegaconf import OmegaConf
 from tqdm import tqdm

 from verl.trainer.ppo.reward import get_custom_reward_fn
@@ -49,7 +50,7 @@ def main(config):

     # Initialize Ray
     if not ray.is_initialized():
-        ray.init(num_cpus=config.ray_init.num_cpus)
+        ray.init(**OmegaConf.to_container(config.ray_kwargs.get("ray_init", {})))

     # evaluate test_score based on data source
     data_source_reward = defaultdict(list)
diff --git a/recipe/sppo/main_sppo.py b/recipe/sppo/main_sppo.py
index a96fc28873d..eb080eba06b 100644
--- a/recipe/sppo/main_sppo.py
+++ b/recipe/sppo/main_sppo.py
@@ -21,6 +21,7 @@
 import hydra
 import ray
+from omegaconf import OmegaConf

 from verl.trainer.ppo.reward import load_reward_manager
@@ -38,12 +39,15 @@ def run_ppo(config) -> None:
     os.environ["ENSURE_CUDA_VISIBLE_DEVICES"] = os.environ.get("CUDA_VISIBLE_DEVICES", "")
     if not ray.is_initialized():
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = {
+            "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
+        }
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     runner = TaskRunner.remote()
     ray.get(runner.run.remote(config))
diff --git a/tests/trainer/config/legacy_ppo_megatron_trainer.yaml b/tests/trainer/config/legacy_ppo_megatron_trainer.yaml
index 66c82b89cd7..9937e82d890 100644
--- a/tests/trainer/config/legacy_ppo_megatron_trainer.yaml
+++ b/tests/trainer/config/legacy_ppo_megatron_trainer.yaml
@@ -460,6 +460,7 @@ trainer:
       with_stack: False
       analysis: True

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
diff --git a/tests/trainer/config/legacy_ppo_trainer.yaml b/tests/trainer/config/legacy_ppo_trainer.yaml
index bfd74bca4a5..ded79cf399b 100644
--- a/tests/trainer/config/legacy_ppo_trainer.yaml
+++ b/tests/trainer/config/legacy_ppo_trainer.yaml
@@ -1103,11 +1103,13 @@ trainer:
   # Device to run training on (e.g., "cuda", "cpu")
   device: cuda

-# configs related to ray initialization
-ray_init:
+# configs related to ray
+ray_kwargs:
+  # configs related to ray initialization
+  ray_init:

-  # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
-  num_cpus: null
+    # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
+    num_cpus: null

   # Path to save Ray timeline JSON for performance profiling
   timeline_json_file: null
diff --git a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml
index b030fddac06..496de8e9dab 100644
--- a/verl/trainer/config/_generated_ppo_megatron_trainer.yaml
+++ b/verl/trainer/config/_generated_ppo_megatron_trainer.yaml
@@ -460,6 +460,7 @@ global_profiler:
         capture-range: cudaProfilerApi
         capture-range-end: null
         kill: none
-ray_init:
-  num_cpus: null
+ray_kwargs:
+  ray_init:
+    num_cpus: null
   timeline_json_file: null
diff --git a/verl/trainer/config/_generated_ppo_trainer.yaml b/verl/trainer/config/_generated_ppo_trainer.yaml
index f90422bb5b2..f66d6a6e8df 100644
--- a/verl/trainer/config/_generated_ppo_trainer.yaml
+++ b/verl/trainer/config/_generated_ppo_trainer.yaml
@@ -426,6 +426,7 @@ global_profiler:
         capture-range: cudaProfilerApi
         capture-range-end: null
         kill: none
-ray_init:
-  num_cpus: null
+ray_kwargs:
+  ray_init:
+    num_cpus: null
   timeline_json_file: null
diff --git a/verl/trainer/config/evaluation.yaml b/verl/trainer/config/evaluation.yaml
index efca03da404..6a88d77f1e7 100644
--- a/verl/trainer/config/evaluation.yaml
+++ b/verl/trainer/config/evaluation.yaml
@@ -9,6 +9,7 @@ custom_reward_function:
   path: null
   name: compute_score

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
diff --git a/verl/trainer/config/generation.yaml b/verl/trainer/config/generation.yaml
index c5d66218bb2..cfbdd35f098 100644
--- a/verl/trainer/config/generation.yaml
+++ b/verl/trainer/config/generation.yaml
@@ -51,6 +51,7 @@ actor:
     fsdp_size: -1
     forward_prefetch: False # FSDP1 forward_prefetch configuration

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
diff --git a/verl/trainer/config/ppo_megatron_trainer.yaml b/verl/trainer/config/ppo_megatron_trainer.yaml
index cdc9af6f2a5..b73d995c635 100644
--- a/verl/trainer/config/ppo_megatron_trainer.yaml
+++ b/verl/trainer/config/ppo_megatron_trainer.yaml
@@ -165,6 +165,7 @@ global_profiler:
         # Send signal to the target application's process group. We let the program to exit by itself.
         kill: none

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
diff --git a/verl/trainer/config/ppo_trainer.yaml b/verl/trainer/config/ppo_trainer.yaml
index 99c4cf14fcf..0e521f85881 100644
--- a/verl/trainer/config/ppo_trainer.yaml
+++ b/verl/trainer/config/ppo_trainer.yaml
@@ -338,11 +338,14 @@ global_profiler:
         # Send signal to the target application's process group. We let the program to exit by itself.
         kill: none

-# configs related to ray initialization
-ray_init:
+# configs related to ray
+ray_kwargs:

-  # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
-  num_cpus: null
+  # configs related to ray initialization
+  ray_init:
+
+    # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
+    num_cpus: null

   # Path to save Ray timeline JSON for performance profiling
   timeline_json_file: null
\ No newline at end of file
diff --git a/verl/trainer/main_eval.py b/verl/trainer/main_eval.py
index 0a5c5817717..579adbf803f 100644
--- a/verl/trainer/main_eval.py
+++ b/verl/trainer/main_eval.py
@@ -23,6 +23,7 @@
 import numpy as np
 import pandas as pd
 import ray
+from omegaconf import OmegaConf
 from tqdm import tqdm

 from verl.trainer.ppo.reward import get_custom_reward_fn
@@ -48,7 +49,7 @@ def main(config):

     # Initialize Ray
     if not ray.is_initialized():
-        ray.init(num_cpus=config.ray_init.num_cpus)
+        ray.init(**OmegaConf.to_container(config.ray_kwargs.get("ray_init", {})))

     # evaluate test_score based on data source
     data_source_reward = defaultdict(list)
diff --git a/verl/trainer/main_generation.py b/verl/trainer/main_generation.py
index fdaff2905a1..791c17af7ef 100644
--- a/verl/trainer/main_generation.py
+++ b/verl/trainer/main_generation.py
@@ -48,10 +48,13 @@ def main(config):
 def run_generation(config) -> None:
     if not ray.is_initialized():
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

     ray.get(main_task.remote(config))
diff --git a/verl/trainer/main_ppo.py b/verl/trainer/main_ppo.py
index 3ed3739cf24..f7d8825b57d 100644
--- a/verl/trainer/main_ppo.py
+++ b/verl/trainer/main_ppo.py
@@ -55,10 +55,13 @@ def run_ppo(config) -> None:
         # Set environment variables in the runtime environment to control tokenizer parallelism,
         # NCCL debug level, VLLM logging level, and allow runtime LoRA updating
         # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
-        ray.init(
-            runtime_env=get_ppo_ray_runtime_env(),
-            num_cpus=config.ray_init.num_cpus,
-        )
+        default_runtime_env = get_ppo_ray_runtime_env()
+        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
+        print(f"ray init kwargs: {ray_init_kwargs}")
+        ray.init(**OmegaConf.to_container(ray_init_kwargs))

         # Create a remote instance of the TaskRunner class, and
         # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -81,7 +84,7 @@ def run_ppo(config) -> None:

     # [Optional] get the path of the timeline trace file from the configuration, default to None
     # This file is used for performance analysis
-    timeline_json_file = config.ray_init.get("timeline_json_file", None)
+    timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
     if timeline_json_file:
         ray.timeline(filename=timeline_json_file)