examples/split_placement/config/ppo_trainer_split.yaml (3 additions, 2 deletions)

@@ -183,5 +183,6 @@ trainer:
   default_hdfs_dir: null
   default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name}

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
examples/split_placement/main_ppo_split.py (8 additions, 4 deletions)

@@ -18,6 +18,7 @@
 import hydra
 import ray
 import torch
+from omegaconf import OmegaConf
 from split_monkey_patch import fit

 from verl import DataProto
@@ -93,11 +94,14 @@ def __call__(self, data: DataProto, return_dict: bool = False):
 @hydra.main(config_path="config", config_name="ppo_trainer_split", version_base=None)
 def main(config):
     if not ray.is_initialized():
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     ray.get(main_task.remote(config))
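Every Python entry point in this PR repeats the same merge dance, so it is worth seeing in isolation. Below is a minimal sketch of the behavior, runnable outside verl; the `num_cpus` and `NCCL_DEBUG` override values are hypothetical, not part of the PR:

from omegaconf import OmegaConf

# Hard-coded defaults, as in main_ppo_split.py above.
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}

# Stand-in for the Hydra config; the override values are illustrative only.
config = OmegaConf.create(
    {
        "ray_kwargs": {
            "ray_init": {
                "num_cpus": 8,
                "runtime_env": {"env_vars": {"NCCL_DEBUG": "INFO"}},
            }
        }
    }
)

ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
# OmegaConf.merge is right-biased: user-supplied runtime_env entries win over the defaults.
ray_init_kwargs["runtime_env"] = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
print(OmegaConf.to_container(ray_init_kwargs, resolve=True))
# -> {'num_cpus': 8, 'runtime_env': {'env_vars': {'TOKENIZERS_PARALLELISM': 'true',
#                                                 'NCCL_DEBUG': 'INFO'}}}

Any other keyword accepted by ray.init (for example an address for an existing cluster) can now be supplied under ray_kwargs.ray_init and is forwarded untouched.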
recipe/dapo/main_dapo.py (9 additions, 6 deletions)

@@ -35,13 +35,16 @@ def main(config):

 def run_ppo(config) -> None:
     if not ray.is_initialized():
+        default_runtime_env = {
+            "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
+        }
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     if (
         is_cuda_available
recipe/entropy/main_entropy.py (15 additions, 11 deletions)

@@ -17,6 +17,7 @@

 import hydra
 import ray
+from omegaconf import OmegaConf

 from .entropy_ray_trainer import RayEntropyTrainer
 from .reward import load_reward_manager
@@ -29,18 +30,21 @@ def main(config):

 def run_ppo(config) -> None:
     if not ray.is_initialized():
+        default_runtime_env = {
+            "env_vars": {
+                "TOKENIZERS_PARALLELISM": "true",
+                "NCCL_DEBUG": "WARN",
+                "VLLM_LOGGING_LEVEL": "WARN",
+                "WANDB_API_KEY": "YOUR_WANDB_API_KEY",
+            }
+        }
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {
-                    "TOKENIZERS_PARALLELISM": "true",
-                    "NCCL_DEBUG": "WARN",
-                    "VLLM_LOGGING_LEVEL": "WARN",
-                    "WANDB_API_KEY": "YOUR_WANDB_API_KEY",
-                }
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     runner = TaskRunner.remote()
     ray.get(runner.run.remote(config))
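A side effect of the merge above: the hard-coded WANDB_API_KEY placeholder can now be overridden from the config instead of by editing this file. A hypothetical override (key value illustrative only):

from omegaconf import OmegaConf

default_runtime_env = {"env_vars": {"WANDB_API_KEY": "YOUR_WANDB_API_KEY", "NCCL_DEBUG": "WARN"}}
# e.g. what ray_kwargs.ray_init.runtime_env.env_vars.WANDB_API_KEY=... would carry in
user_runtime_env = {"env_vars": {"WANDB_API_KEY": "my-real-key"}}

merged = OmegaConf.merge(default_runtime_env, user_runtime_env)
assert merged.env_vars.WANDB_API_KEY == "my-real-key"
assert merged.env_vars.NCCL_DEBUG == "WARN"  # defaults that are not overridden survive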
recipe/one_step_off_policy/main_ppo.py (8 additions, 5 deletions)

@@ -43,10 +43,13 @@ def run_ppo(config) -> None:
     # Set environment variables in the runtime environment to control tokenizer parallelism,
     # NCCL debug level, VLLM logging level, and allow runtime LoRA updating
     # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
-    ray.init(
-        runtime_env=get_ppo_ray_runtime_env(),
-        num_cpus=config.ray_init.num_cpus,
-    )
+    default_runtime_env = get_ppo_ray_runtime_env()
+    ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+    runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+    runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+    ray_init_kwargs["runtime_env"] = runtime_env
+    print(f"ray init kwargs: {ray_init_kwargs}")
+    ray.init(**ray_init_kwargs)

     # Create a remote instance of the TaskRunner class, and
     # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -63,7 +66,7 @@ def run_ppo(config) -> None:

     # [Optional] get the path of the timeline trace file from the configuration, default to None
     # This file is used for performance analysis
-    timeline_json_file = config.ray_init.get("timeline_json_file", None)
+    timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
     if timeline_json_file:
         ray.timeline(filename=timeline_json_file)
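Note that `timeline_json_file` stays one level up, directly under `ray_kwargs`, so it is read separately from the kwargs forwarded to `ray.init`. A sketch of the resulting flow (the config values and file path are hypothetical):

import ray
from omegaconf import OmegaConf

cfg = OmegaConf.create(
    {"ray_kwargs": {"ray_init": {"num_cpus": 2}, "timeline_json_file": "/tmp/ray_timeline.json"}}
)

ray.init(**cfg.ray_kwargs.get("ray_init", {}))
# ... run the job ...
timeline_json_file = cfg.ray_kwargs.get("timeline_json_file", None)
if timeline_json_file:
    # Dumps a Chrome-trace-format profile of the Ray tasks that ran.
    ray.timeline(filename=timeline_json_file)
ray.shutdown()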
recipe/prime/main_prime.py (8 additions, 4 deletions)

@@ -31,6 +31,7 @@

 import hydra
 import ray
+from omegaconf import OmegaConf

 from .prime_ray_trainer import RayPRIMETrainer

@@ -42,11 +43,14 @@ def main(config):

 def run_prime(config, compute_score=None):
     if not ray.is_initialized():
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     ray.get(main_task.remote(config, compute_score))
recipe/r1/config/evaluation.yaml (3 additions, 2 deletions)

@@ -9,5 +9,6 @@ custom_reward_function:
   path: null
   name: compute_score

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
recipe/r1/main_eval.py (1 addition, 1 deletion)

@@ -49,7 +49,7 @@ def main(config):

     # Initialize Ray
     if not ray.is_initialized():
-        ray.init(num_cpus=config.ray_init.num_cpus)
+        ray.init(**config.ray_kwargs.get("ray_init", {}))

     # evaluate test_score based on data source
     data_source_reward = defaultdict(list)
recipe/sppo/main_sppo.py (10 additions, 6 deletions)

@@ -21,6 +21,7 @@

 import hydra
 import ray
+from omegaconf import OmegaConf

 from verl.trainer.ppo.reward import load_reward_manager

@@ -37,13 +38,16 @@ def run_ppo(config) -> None:
     # isolation, will solve in the future
     os.environ["ENSURE_CUDA_VISIBLE_DEVICES"] = os.environ.get("CUDA_VISIBLE_DEVICES", "")
     if not ray.is_initialized():
+        default_runtime_env = {
+            "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
+        }
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={
-                "env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_LOGGING_LEVEL": "WARN"}
-            },
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     runner = TaskRunner.remote()
     ray.get(runner.run.remote(config))
verl/trainer/config/_generated_ppo_megatron_trainer.yaml (3 additions, 2 deletions)

@@ -460,6 +460,7 @@ global_profiler:
     capture-range: cudaProfilerApi
     capture-range-end: null
    kill: none
-ray_init:
-  num_cpus: null
+ray_kwargs:
+  ray_init:
+    num_cpus: null
   timeline_json_file: null
verl/trainer/config/_generated_ppo_trainer.yaml (3 additions, 2 deletions)

@@ -426,6 +426,7 @@ global_profiler:
     capture-range: cudaProfilerApi
     capture-range-end: null
    kill: none
-ray_init:
-  num_cpus: null
+ray_kwargs:
+  ray_init:
+    num_cpus: null
   timeline_json_file: null
verl/trainer/config/evaluation.yaml (3 additions, 2 deletions)

@@ -9,6 +9,7 @@ custom_reward_function:
   path: null
   name: compute_score

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
verl/trainer/config/generation.yaml (3 additions, 2 deletions)

@@ -51,6 +51,7 @@ actor:
   fsdp_size: -1
   forward_prefetch: False # FSDP1 forward_prefetch configuration

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
verl/trainer/config/ppo_megatron_trainer.yaml (3 additions, 2 deletions)

@@ -165,6 +165,7 @@ global_profiler:
   # Send signal to the target application's process group. We let the program to exit by itself.
   kill: none

-ray_init:
-  num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
+ray_kwargs:
+  ray_init:
+    num_cpus: null # `None` means using all CPUs, which might cause hang if limited in systems like SLURM. Please set to a number allowed then.
   timeline_json_file: null
verl/trainer/config/ppo_trainer.yaml (7 additions, 4 deletions)

@@ -338,11 +338,14 @@ global_profiler:
   # Send signal to the target application's process group. We let the program to exit by itself.
   kill: none

-# configs related to ray initialization
-ray_init:
+# configs related to ray
+ray_kwargs:
+
+  # configs related to ray initialization
+  ray_init:

-  # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
-  num_cpus: null
+    # Number of CPUs for Ray. Use a fixed number instead of null when using SLURM.
+    num_cpus: null

   # Path to save Ray timeline JSON for performance profiling
   timeline_json_file: null
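For anyone carrying local configs or launch scripts, the knobs move one level down; only `timeline_json_file` keeps its depth under the new parent. A sketch of the before/after access paths (the override value is hypothetical):

from omegaconf import OmegaConf

# The new nesting, as declared in ppo_trainer.yaml above.
cfg = OmegaConf.create(
    {"ray_kwargs": {"ray_init": {"num_cpus": None}, "timeline_json_file": None}}
)

# Before this PR:  ray_init.num_cpus=16          ->  config.ray_init.num_cpus
# After this PR:   ray_kwargs.ray_init.num_cpus=16  ->  config.ray_kwargs.ray_init.num_cpus
cfg.ray_kwargs.ray_init.num_cpus = 16  # what the Hydra CLI override would produce
assert cfg.ray_kwargs.ray_init.num_cpus == 16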
verl/trainer/main_eval.py (1 addition, 1 deletion)

@@ -48,7 +48,7 @@ def main(config):

     # Initialize Ray
     if not ray.is_initialized():
-        ray.init(num_cpus=config.ray_init.num_cpus)
+        ray.init(**config.ray_kwargs.get("ray_init", {}))

     # evaluate test_score based on data source
     data_source_reward = defaultdict(list)
verl/trainer/main_generation.py (7 additions, 4 deletions)

@@ -47,11 +47,14 @@ def main(config):

 def run_generation(config) -> None:
     if not ray.is_initialized():
+        default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}
+        ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+        runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+        ray_init_kwargs["runtime_env"] = runtime_env
+        print(f"ray init kwargs: {ray_init_kwargs}")
         # this is for local ray cluster
-        ray.init(
-            runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}},
-            num_cpus=config.ray_init.num_cpus,
-        )
+        ray.init(**ray_init_kwargs)

     ray.get(main_task.remote(config))
verl/trainer/main_ppo.py (8 additions, 5 deletions)

@@ -55,10 +55,13 @@ def run_ppo(config) -> None:
     # Set environment variables in the runtime environment to control tokenizer parallelism,
     # NCCL debug level, VLLM logging level, and allow runtime LoRA updating
     # `num_cpus` specifies the number of CPU cores Ray can use, obtained from the configuration
-    ray.init(
-        runtime_env=get_ppo_ray_runtime_env(),
-        num_cpus=config.ray_init.num_cpus,
-    )
+    default_runtime_env = get_ppo_ray_runtime_env()
+    ray_init_kwargs = config.ray_kwargs.pop("ray_init", {})
+    runtime_env_kwargs = ray_init_kwargs.pop("runtime_env", {})
+    runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
+    ray_init_kwargs["runtime_env"] = runtime_env
+    print(f"ray init kwargs: {ray_init_kwargs}")
+    ray.init(**ray_init_kwargs)

     # Create a remote instance of the TaskRunner class, and
     # Execute the `run` method of the TaskRunner instance remotely and wait for it to complete
@@ -81,7 +84,7 @@ def run_ppo(config) -> None:

     # [Optional] get the path of the timeline trace file from the configuration, default to None
     # This file is used for performance analysis
-    timeline_json_file = config.ray_init.get("timeline_json_file", None)
+    timeline_json_file = config.ray_kwargs.get("timeline_json_file", None)
     if timeline_json_file:
         ray.timeline(filename=timeline_json_file)