
Commit 9ec39bb

Translate to english
1 parent 284cc2a commit 9ec39bb

29 files changed: +360 −360 lines changed

roll/distributed/scheduler/generate_scheduler.py

Lines changed: 22 additions & 22 deletions
@@ -176,7 +176,7 @@ def generate(self, data: DataProto, actor_cluster: Union[Any, Cluster], pipeline

     def get_available_dp_rank(self):
         while True:
-            # 负载均衡逻辑,期望各dp 正在处理的条数基本接近
+            # Load balancing logic, expect the number of items being processed by each dp to be roughly similar
             sorted_ranks = sorted(
                 self.load_balance_coordinator.keys(), key=lambda rank: (self.load_balance_coordinator[rank], rank)
             )
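
The least-loaded selection described by that comment can be pictured in isolation. Below is a minimal, self-contained sketch (not the scheduler class itself); it assumes load_balance_coordinator is a plain dict mapping dp_rank to the number of in-flight requests, and it mirrors the sorted() call shown in the hunk.

from typing import Dict, Iterator

def least_loaded_dp_rank(load_balance_coordinator: Dict[int, int]) -> Iterator[int]:
    """Yield the dp rank with the fewest in-flight requests; ties break by rank."""
    while True:
        sorted_ranks = sorted(
            load_balance_coordinator.keys(),
            key=lambda rank: (load_balance_coordinator[rank], rank),
        )
        yield sorted_ranks[0]

# Example: rank 1 has the lightest load, so it is picked first.
picker = least_loaded_dp_rank({0: 3, 1: 1, 2: 2})
assert next(picker) == 1
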
@@ -210,26 +210,26 @@ def generate_opt_level_1(self, data: DataProto):
         )
         self.cluster.start_server(data=DataProto(meta_info=data.meta_info), blocking=True)

-        # 分发数据至收到target rollout 完成
-        # 无限循环,把所有的response发送给dp worker
+        # Distribute data until target rollout completion
+        # Infinite loop, send all responses to dp workers
         send_request_count = 0
         request_refs = []
         data_index_counter = itertools.count()
         last_alive_check = time.time()
         while not self.is_completed:

-            # 探测dp worker是否存活,dp worker的server thread可能由于异常退出,造成hang
+            # Check if dp worker is alive, dp worker's server thread may exit due to exceptions, causing hang
             current_time = time.time()
             if current_time - last_alive_check >= self.alive_check_interval:
                 self.cluster.add_request(command=GenerateRequestType.ALIVE_CHECK, data=DataProto())
                 last_alive_check = current_time

             if send_request_count < data.batch.batch_size[0]:
-                # 取一个可以发送request的dp worker
+                # Get a dp worker that can send requests
                 dp_rank = next(self.get_available_dp_rank())

-                # 还有数据需要发送, 取需要发送的数据
-                # request_id 全局递增,否则vllm/sglang scheduler状态不对
+                # Still have data to send, get the data that needs to be sent
+                # request_id increments globally, otherwise vllm/sglang scheduler state is incorrect
                 request_id = next(self.request_counter)
                 data_index = next(data_index_counter)
                 request_data = collate_fn([self.data[data_index]])
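
Two details in this loop are easy to miss: the liveness probe only fires every alive_check_interval seconds, and request_id comes from a single itertools.count() so it never repeats or resets (otherwise the vllm/sglang engine scheduler state gets confused). A stripped-down, hypothetical sketch of just those two pieces (the interval value and class name are illustrative, not the scheduler's actual fields):

import itertools
import time

class DispatchLoopSketch:
    """Illustrative stand-in for the scheduler's dispatch-loop bookkeeping."""

    def __init__(self, alive_check_interval: float = 10.0):
        self.request_counter = itertools.count()   # monotonically increasing request_id
        self.alive_check_interval = alive_check_interval
        self.last_alive_check = time.time()

    def next_request_id(self) -> int:
        # Never reuse ids: the downstream engine keys its state on them.
        return next(self.request_counter)

    def maybe_probe(self, probe_fn) -> None:
        # Rate-limited liveness check, mirroring the ALIVE_CHECK branch above.
        now = time.time()
        if now - self.last_alive_check >= self.alive_check_interval:
            probe_fn()
            self.last_alive_check = now
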
@@ -240,7 +240,7 @@ def generate_opt_level_1(self, data: DataProto):
                 ].item()
                 self.request_id_2_dp_rank[request_data.meta_info["request_id"]] = dp_rank
                 self.prompt_id_2_request_ids[prompt_id].add(request_data.meta_info["request_id"])
-                # 需要注意上面的调用顺序, report_response中会更新request_id索引dp_rank,所以这里需要最后add request_id
+                # Need to pay attention to the calling order above, report_response will update request_id index dp_rank, so need to add request_id last
                 request_data.meta_info["response_callback_fn"] = self.response_callback_fn
                 request_data.meta_info["generation_config"] = data.meta_info["generation_config"]
                 request_refs.append(
@@ -257,7 +257,7 @@ def generate_opt_level_1(self, data: DataProto):
                 request_refs = []

         gen_metrics = self.cluster.stop_server()
-        # generate结束时,应该收到num_return_sequences + drop_generation_num 条返回结果
+        # When generation ends, should receive num_return_sequences + drop_generation_num return results
         generate_return_num = num_return_sequences + self.pipeline_config.drop_generation_num
         response_ids_list_of_list = []
         eos_token_id = None
@@ -401,7 +401,7 @@ def set_scheduler(
         state: Dict[str, Any] = None,
     ):
         """
-        GenerateScheduler可以由多个实例,不再局限于单例
+        GenerateScheduler can have multiple instances, no longer limited to singleton
         """
         self.actor_cluster = actor_cluster
         self.reward_clusters = reward_clusters
@@ -466,9 +466,9 @@ def reset_status(self):

     def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
         """
-        从dataset里,按给定策略sample batch
-        1. 常规无过滤
-        2. 动态过滤
+        Sample batch from dataset using given strategy
+        1. Regular without filtering
+        2. Dynamic filtering
         """
         self.batch_size = batch_size
         self.reset_status()
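
The docstring's two sampling modes can be contrasted with a small, hypothetical helper; the names and the keep_fn signature are illustrative and not the pipeline's actual filter interface.

from typing import Any, Callable, List

def get_batch_sketch(
    dataset: List[Any],
    batch_size: int,
    dynamic: bool = False,
    keep_fn: Callable[[Any], bool] = lambda sample: True,
) -> List[Any]:
    if not dynamic:
        # 1. Regular: take the first batch_size samples, no filtering.
        return dataset[:batch_size]
    # 2. Dynamic filtering: keep collecting until batch_size samples pass the filter.
    collected: List[Any] = []
    for sample in dataset:
        if keep_fn(sample):
            collected.append(sample)
        if len(collected) >= batch_size:
            break
    return collected
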
@@ -529,7 +529,7 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
             f"used queries: {query_use_count} query_filter_count: {self.query_filter_count} "
             f"response_filter_count: {self.response_filter_count}"
         )
-        # TODO: 这里 len(collect_data) > rollout_batch_size, 可以尝试动态扩大batch_size
+        # TODO: Here len(collect_data) > rollout_batch_size, can try dynamically expanding batch_size
        batch = DataProto.concat(collect_data[: self.batch_size * num_return_sequences])
        batch.meta_info["metrics"] = {
            f"scheduler/query_filter_count": self.query_filter_count,
@@ -538,7 +538,7 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
            f"scheduler/query_use_count": query_use_count,
        }

-        # 统计全部response metrics
+        # Count all response metrics
        metrics = {}
        for domain, response_batches in self.response_cache.items():
            response_batch = DataProto.concat(response_batches[:])
@@ -555,8 +555,8 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
    @ray.method(concurrency_group="multi_thread")
    def report_response(self, data: DataProto):
        """
-        这里需要考虑多线程数据访问
-        data 返回可能有多条的
+        Need to consider multi-threaded data access here
+        Data return may have multiple entries
        """
        try:
            request_id = data.meta_info["request_id"]
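
Because report_response runs in a multi-thread concurrency group, the shared lookup tables it touches need guarding. A minimal sketch of that idea, assuming a simple threading.Lock rather than whatever synchronization the real scheduler uses:

import threading
from collections import defaultdict

class ResponseBook:
    """Toy version of the scheduler's shared request bookkeeping."""

    def __init__(self):
        self._lock = threading.Lock()
        self.request_id_2_dp_rank = {}
        self.prompt_id_2_request_ids = defaultdict(set)

    def record(self, request_id: str, prompt_id: int, dp_rank: int) -> None:
        # One lock keeps concurrent callbacks from seeing half-updated state.
        with self._lock:
            self.request_id_2_dp_rank[request_id] = dp_rank
            self.prompt_id_2_request_ids[prompt_id].add(request_id)
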
@@ -577,15 +577,15 @@ def report_response(self, data: DataProto):
                return

            # call reward
-            # reward worker得能支持单条数据计算, dynamic sampling对需要batch计算reward的需要注意...
-            # 多域的时候,llm as judge, 需要单独为reward worker分配gpu
+            # reward worker must support single data calculation, dynamic sampling needs attention for batch reward calculation...
+            # In multi-domain cases, llm as judge, need to allocate gpu separately for reward worker
            rewards: DataProto = ray.get(reward_worker.compute_rewards.remote(batch))
            batch.union(rewards)

            response_buffers: List[DataProto] = []
            batch_expanded = [batch[[idx]] for idx in range(output_count)]

-            # response_filter, 不太需要response filter
+            # response_filter, don't really need response filter
            for batch_item in batch_expanded:
                if self.response_filter_fn(batch_item, self.pipeline_config):
                    response_buffers.append(batch_item)
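
The flow in this hunk (one compute_rewards call, then a per-response filter over the expanded batch) looks roughly like the sketch below; reward_worker, response_filter_fn, and output_count stand in for objects the scheduler already holds, and the calls mirror the ones shown in the diff rather than defining any new API.

import ray

def score_and_filter(batch, reward_worker, response_filter_fn, pipeline_config, output_count):
    # Reward workers must accept single-sample batches for this to work.
    rewards = ray.get(reward_worker.compute_rewards.remote(batch))
    batch.union(rewards)

    response_buffers = []
    for idx in range(output_count):
        batch_item = batch[[idx]]                       # one response at a time
        if response_filter_fn(batch_item, pipeline_config):
            response_buffers.append(batch_item)
    return response_buffers
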
@@ -713,7 +713,7 @@ def expand_requests(self, data: DataProto):
        return target_requests

    def check_worker_alive(self, cluster):
-        # 探测dp worker是否存活,dp worker的server thread可能由于异常退出,造成hang
+        # Check if dp worker is alive, dp worker's server thread may exit due to exceptions, causing hang
        current_time = time.time()
        if current_time - self.last_alive_check >= self.alive_check_interval:
            cluster.add_request(command=GenerateRequestType.ALIVE_CHECK, data=DataProto())
@@ -734,7 +734,7 @@ def check_send_new_request(self) -> bool:

    def get_available_dp_rank(self):
        while True:
-            # 负载均衡逻辑,期望各dp 正在处理的条数基本接近
+            # Load balancing logic, expect the number of items being processed by each dp to be roughly similar
            sorted_ranks = sorted(
                self.load_balance_coordinator.keys(), key=lambda rank: (self.load_balance_coordinator[rank], rank)
            )

roll/distributed/scheduler/reward_scheduler.py

Lines changed: 11 additions & 11 deletions
@@ -15,14 +15,14 @@
 @ray.remote
 class RewardScheduler:
     """
-    reward 服务化和generate不同, request接口:
-    reward scheduler需要解决的是不同域的sample的reward计算问题, 不需要实现request粒度的接口;
-    并且reward计算和vllm不同,vllm可以continue batch,所以可以动态add request, reward不行,
-    直接rpc调用reward_cluster.compute_rewards即可(使用rpc方式调用,可以增加reward的数量,增大并发处理能力)
+    Reward service is different from generation, request interface:
+    Reward scheduler needs to solve the reward calculation problem for samples from different domains, no need to implement request-level interface;
+    And reward calculation is different from vllm, vllm can continue batch, so it can dynamically add requests, reward cannot,
+    directly use rpc to call reward_cluster.compute_rewards (using rpc method, can increase the number of rewards, increase concurrent processing capacity)

-    reward scheduler需要解决的问题:
-    按domain路由reward
-    dp dispatch 均分/不足dp_size 的限制
+    Problems that reward scheduler needs to solve:
+    Route rewards by domain
+    dp dispatch load balancing/insufficient dp_size limitations
     """

     def __init__(self):
@@ -32,13 +32,13 @@ def __init__(self):

    def compute_rewards(self, data: DataProto, reward_clusters: Dict[str, Any], pipeline_config) -> DataProto:
        """
-        保序返回rewards
+        Return rewards in order
        """
        self.pipeline_config = pipeline_config
        self.reward_clusters = reward_clusters
        data.batch["prompt_id"] = torch.arange(data.batch.batch_size[0], device=data.batch.device)

-        # 按domain group by data
+        # Group data by domain
        grouped_data: Dict[str, DataProto] = data.group_by("domain")

        domain_rewards_refs: Dict[str, List[ray.ObjectRef]] = defaultdict(list)
@@ -51,8 +51,8 @@ def compute_rewards(self, data: DataProto, reward_clusters: Dict[str, Any], pipe

        rewards_list: List[DataProto] = []
        for domain, domain_rewards_ref in domain_rewards_refs.items():
-            # 各reward的输出schema要求一致
-            # reward worker compute_rewards 接口返回结果保序
+            # All rewards require consistent output schema
+            # Reward worker compute_rewards interface returns results in order
            if domain not in grouped_data.keys():
                continue
            domain_rewards: DataProto = DataProto.materialize_concat(data_refs=domain_rewards_ref)
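
The ordering contract ("return rewards in order") combined with the per-domain routing can be expressed without DataProto at all. The plain-Python sketch below tags each sample with its original index (the role prompt_id plays above), computes rewards per domain, and writes the results back by index; the function and field names are illustrative.

from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

def compute_rewards_ordered(
    samples: List[Dict[str, Any]],
    reward_fns: Dict[str, Callable[[List[Dict[str, Any]]], List[float]]],
) -> List[float]:
    grouped: Dict[str, List[Tuple[int, Dict[str, Any]]]] = defaultdict(list)
    for prompt_id, sample in enumerate(samples):      # prompt_id plays the role of torch.arange above
        grouped[sample["domain"]].append((prompt_id, sample))

    rewards = [0.0] * len(samples)
    for domain, items in grouped.items():             # each domain goes to its own reward worker
        ids = [i for i, _ in items]
        batch = [s for _, s in items]
        for prompt_id, r in zip(ids, reward_fns[domain](batch)):
            rewards[prompt_id] = r                    # write back by index, so order is preserved
    return rewards
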

roll/models/model_providers.py

Lines changed: 2 additions & 2 deletions
@@ -209,9 +209,9 @@ def default_reward_model_provider(
     is_trainable: Optional[bool] = False,
 ):
     """
-    model.forward 遵循TokenClassifierOutput 协议
+    model.forward follows TokenClassifierOutput protocol
     class TokenClassifierOutput(ModelOutput):
-        logits: torch.FloatTensor  # 必须要有
+        logits: torch.FloatTensor  # Required
         loss: Optional[torch.FloatTensor] = None
         hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
         attentions: Optional[Tuple[torch.FloatTensor, ...]] = None
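
For reference, a reward head satisfies this protocol as long as its forward returns a transformers TokenClassifierOutput with logits populated; the module below is a minimal illustration under that assumption, not the provider's actual model.

import torch
from torch import nn
from transformers.modeling_outputs import TokenClassifierOutput

class TokenRewardHead(nn.Module):
    """Minimal head whose forward follows the TokenClassifierOutput protocol."""

    def __init__(self, hidden_size: int):
        super().__init__()
        self.score = nn.Linear(hidden_size, 1)

    def forward(self, hidden_states: torch.Tensor) -> TokenClassifierOutput:
        # One scalar logit per token; loss/hidden_states/attentions stay optional.
        logits = self.score(hidden_states)
        return TokenClassifierOutput(logits=logits)
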

roll/pipeline/agentic/agentic_config.py

Lines changed: 2 additions & 2 deletions
@@ -45,7 +45,7 @@ class EnvManagerConfig(WorkerConfig):

    def __post_init__(self):
        """
-        根据es config计算world_size
+        Calculate world_size based on es config
        """
        self.world_size = self.env_groups * self.group_size
        self.env_configs: Optional[Dict[int, Dict]] = None
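
In other words, the env manager's world size is just the product of the two grouping settings. A tiny worked example with made-up numbers:

# Illustrative numbers only: 8 environment groups of 16 envs each.
env_groups, group_size = 8, 16
world_size = env_groups * group_size   # 128 environment workers in total
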
@@ -266,7 +266,7 @@ def set_max_steps(self, max_steps: int):
            self.critic.training_args.per_device_train_batch_size
            * self.critic.training_args.gradient_accumulation_steps
        )
-        # 没有除dp_size,需要在分布式环境初始化后再除
+        # Not divided by dp_size, need to divide after distributed environment initialization
        self.actor_train.training_args.max_steps = max_steps * (
            self.rollout_batch_size
            * self.actor_infer.generating_args.num_return_sequences
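
The comment's point is that the value stored here still carries the data-parallel factor; only once the distributed environment reports dp_size can it be scaled down. A made-up numeric illustration of that deferred division (the real formula involves the batch settings shown above):

# Hypothetical numbers, just to show the deferred division.
max_steps_before_dp = 6400   # what set_max_steps writes into training_args
dp_size = 8                  # known only after the distributed env is initialized
max_steps_per_rank = max_steps_before_dp // dp_size   # 800 optimizer steps per rank
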

roll/pipeline/agentic/agentic_pipeline.py

Lines changed: 6 additions & 6 deletions
@@ -107,7 +107,7 @@ def __init__(self, pipeline_config: AgenticConfig):

    @torch.no_grad()
    def run(self):
-        # 计算tokens per second 系统吞吐
+        # Calculate tokens per second system throughput
        tps_timer = _Timer(window_size=5)

        for global_step in range(self.pipeline_config.max_steps):
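
_Timer(window_size=5) is the pipeline's own utility; a hand-rolled stand-in for the windowed tokens-per-second idea (purely illustrative, not the real class) could look like this:

import time
from collections import deque

class WindowedTps:
    """Tokens-per-second averaged over the last `window_size` steps."""

    def __init__(self, window_size: int = 5):
        self.samples = deque(maxlen=window_size)   # keep only the most recent steps

    def push(self, tokens: int, seconds: float) -> None:
        self.samples.append((tokens, seconds))

    def tps(self) -> float:
        total_tokens = sum(t for t, _ in self.samples)
        total_time = sum(s for _, s in self.samples)
        return total_tokens / total_time if total_time else 0.0
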
@@ -191,8 +191,8 @@ def run(self):
            metrics.update(reduce_metrics(old_log_probs.meta_info.pop("metrics", {})))
            metrics["time/old_log_probs_values"] = cal_old_logpb_timer.last

-            # 要按group by处理reward
-            # 可以tag(env_type)/traj_group_id(group)/batch(rollout_batch)... group_by计算reward/adv
+            # Need to process rewards by group
+            # Can group by tag(env_type)/traj_group_id(group)/batch(rollout_batch)... to calculate reward/adv
            batch.batch["prompt_id"] = torch.arange(batch.batch.batch_size[0], device=batch.batch.device)
            with Timer(name="adv", logger=None) as timer:
                grouping = self.pipeline_config.reward_normalization.grouping
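
Grouping by tag / traj_group_id / batch before computing reward or advantage amounts to normalizing inside each bucket. A hedged sketch of that step follows; the field names and the z-score choice are assumptions, not the pipeline's exact reward_normalization implementation.

import torch
from collections import defaultdict
from typing import List

def normalize_by_group(scores: torch.Tensor, group_ids: List[str]) -> torch.Tensor:
    """Z-score each trajectory's score within its group (tag, traj_group_id, ...)."""
    buckets = defaultdict(list)
    for idx, gid in enumerate(group_ids):
        buckets[gid].append(idx)
    normalized = scores.clone().float()
    for indices in buckets.values():
        idx = torch.tensor(indices)
        group = normalized[idx]
        # unbiased=False keeps single-trajectory groups finite (std == 0).
        normalized[idx] = (group - group.mean()) / (group.std(unbiased=False) + 1e-6)
    return normalized
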
@@ -228,7 +228,7 @@ def run(self):
                batch = DataProto.concat(batch_list)
                batch.reorder(indices=torch.argsort(batch.batch["prompt_id"]))
                batch.pop("prompt_id")
-                # advantage是全局batch计算,还是group内计算?
+                # Is advantage calculated globally across batch or within groups?
                batch = compute_advantage(
                    data=batch,
                    gamma=self.pipeline_config.gamma,
@@ -314,8 +314,8 @@ def run(self):


 def compute_data_metrics(batch):
-    # token_level_scores 是reward model给每个token的打分,可能经过了norm/clip
-    # score 为env的reward,raw value
+    # token_level_scores are scores given by reward model to each token, possibly after norm/clip
+    # score is the environment reward, raw value
     sequence_score = batch.batch["scores"].sum(-1)
     sequence_reward = batch.batch["token_level_rewards"].sum(-1)
     advantages = batch.batch["advantages"]
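
Those two comments draw the distinction the metrics rely on: "scores" holds the raw environment reward, while the token-level reward signal may already be normalized or clipped. A small, hypothetical summary helper that keeps the two apart (metric names are illustrative):

import torch

def summarize_rewards(scores: torch.Tensor, token_level_rewards: torch.Tensor) -> dict:
    sequence_score = scores.sum(-1)                 # raw env reward per trajectory
    sequence_reward = token_level_rewards.sum(-1)   # possibly normalized/clipped signal
    return {
        "score/mean": sequence_score.mean().item(),
        "reward/mean": sequence_reward.mean().item(),
    }
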
