
Commit 1e18794

Translate to english
1 parent b756338 commit 1e18794

29 files changed, +359 −359 lines

roll/distributed/scheduler/generate_scheduler.py

Lines changed: 21 additions & 21 deletions
@@ -171,7 +171,7 @@ def generate(self, data: DataProto, actor_cluster: Union[Any, Cluster], pipeline

    def get_available_dp_rank(self):
        while True:
-           # 负载均衡逻辑,期望各dp 正在处理的条数基本接近
+           # Load balancing logic: keep the number of in-flight requests roughly even across dp ranks
            sorted_ranks = sorted(
                self.load_balance_coordinator.keys(), key=lambda rank: (self.load_balance_coordinator[rank], rank)
            )
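
To make the translated load-balancing comment concrete: the scheduler always hands the next request to the dp rank with the fewest in-flight items, breaking ties by rank. A minimal standalone sketch of that policy; the dictionary and its counts below are illustrative, not the scheduler's real state:

```python
def get_available_dp_rank(load_balance_coordinator):
    """Yield the dp rank with the fewest in-flight requests (ties broken by lower rank)."""
    while True:
        sorted_ranks = sorted(
            load_balance_coordinator.keys(),
            key=lambda rank: (load_balance_coordinator[rank], rank),
        )
        yield sorted_ranks[0]

# Hypothetical in-flight counts per dp rank.
coordinator = {0: 3, 1: 1, 2: 1, 3: 5}
picker = get_available_dp_rank(coordinator)
rank = next(picker)      # -> 1: counts tie at 1, the lower rank wins
coordinator[rank] += 1   # the caller bumps the count when it dispatches a request
```
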
@@ -205,26 +205,26 @@ def generate_opt_level_1(self, data: DataProto):
        )
        self.cluster.start_server(data=DataProto(meta_info=data.meta_info), blocking=True)

-       # 分发数据至收到target rollout 完成
-       # 无限循环,把所有的response发送给dp worker
+       # Distribute data until the target number of rollouts has been received
+       # Infinite loop: send all the responses to the dp workers
        send_request_count = 0
        request_refs = []
        data_index_counter = itertools.count()
        last_alive_check = time.time()
        while not self.is_completed:

-           # 探测dp worker是否存活,dp worker的server thread可能由于异常退出,造成hang
+           # Probe whether the dp workers are alive; a dp worker's server thread may exit on an exception and cause a hang
            current_time = time.time()
            if current_time - last_alive_check >= self.alive_check_interval:
                self.cluster.add_request(command=GenerateRequestType.ALIVE_CHECK, data=DataProto())
                last_alive_check = current_time

            if send_request_count < data.batch.batch_size[0]:
-               # 取一个可以发送request的dp worker
+               # Pick a dp worker that can accept a new request
                dp_rank = next(self.get_available_dp_rank())

-               # 还有数据需要发送, 取需要发送的数据
-               # request_id 全局递增,否则vllm/sglang scheduler状态不对
+               # There is still data to send; fetch the next item to dispatch
+               # request_id increases globally, otherwise the vllm/sglang scheduler state becomes inconsistent
                request_id = next(self.request_counter)
                data_index = next(data_index_counter)
                request_data = collate_fn([self.data[data_index]])
@@ -235,7 +235,7 @@ def generate_opt_level_1(self, data: DataProto):
                ].item()
                self.request_id_2_dp_rank[request_data.meta_info["request_id"]] = dp_rank
                self.prompt_id_2_request_ids[prompt_id].add(request_data.meta_info["request_id"])
-               # 需要注意上面的调用顺序, report_response中会更新request_id索引dp_rank,所以这里需要最后add request_id
+               # Mind the call order above: report_response updates the request_id -> dp_rank index, so the request_id must be added last
                request_data.meta_info["response_callback_fn"] = self.response_callback_fn
                request_data.meta_info["generation_config"] = data.meta_info["generation_config"]
                request_refs.append(
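
The hunks above describe the core of the dispatch loop: a globally increasing request_id (the comment warns that the vllm/sglang scheduler state goes wrong otherwise) and a periodic liveness probe so a dead dp worker thread cannot hang the loop. A simplified sketch of that control flow with stand-in callables (`pick_dp_rank`, `send_to_worker`, `alive_check`) rather than the real cluster API:

```python
import itertools
import time

def dispatch_loop(prompts, pick_dp_rank, send_to_worker, alive_check,
                  is_completed, alive_check_interval=10.0):
    """Send every prompt to a dp worker while periodically probing worker liveness."""
    request_counter = itertools.count()   # globally increasing, never reused
    last_alive_check = time.time()
    sent = 0
    while not is_completed():
        now = time.time()
        if now - last_alive_check >= alive_check_interval:
            alive_check()                 # detect dp workers whose server thread died
            last_alive_check = now
        if sent < len(prompts):
            dp_rank = pick_dp_rank()      # least-loaded rank, see the sketch above
            request_id = next(request_counter)
            send_to_worker(dp_rank, request_id, prompts[sent])
            sent += 1
```
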
@@ -394,7 +394,7 @@ def set_scheduler(
        state: Dict[str, Any] = None,
    ):
        """
-       GenerateScheduler可以由多个实例,不再局限于单例
+       GenerateScheduler may have multiple instances; it is no longer restricted to a singleton
        """
        self.actor_cluster = actor_cluster
        self.reward_clusters = reward_clusters
@@ -459,9 +459,9 @@ def reset_status(self):

    def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
        """
-       从dataset里,按给定策略sample batch
-       1. 常规无过滤
-       2. 动态过滤
+       Sample a batch from the dataset with the given strategy:
+       1. Regular, no filtering
+       2. Dynamic filtering
        """
        self.batch_size = batch_size
        self.reset_status()
@@ -522,7 +522,7 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
            f"used queries: {query_use_count} query_filter_count: {self.query_filter_count} "
            f"response_filter_count: {self.response_filter_count}"
        )
-       # TODO: 这里 len(collect_data) > rollout_batch_size, 可以尝试动态扩大batch_size
+       # TODO: here len(collect_data) > rollout_batch_size; we could try to enlarge batch_size dynamically
        batch = DataProto.concat(collect_data[: self.batch_size * num_return_sequences])
        batch.meta_info["metrics"] = {
            f"scheduler/query_filter_count": self.query_filter_count,
@@ -531,7 +531,7 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
            f"scheduler/query_use_count": query_use_count,
        }

-       # 统计全部response metrics
+       # Collect metrics over all responses
        metrics = {}
        for domain, response_batches in self.response_cache.items():
            response_batch = DataProto.concat(response_batches[:])
@@ -548,8 +548,8 @@ def get_batch(self, data: DataProto, batch_size: int) -> DataProto:
    @ray.method(concurrency_group="multi_thread")
    def report_response(self, data: DataProto):
        """
-       这里需要考虑多线程数据访问
-       data 返回可能有多条的
+       Multi-threaded data access has to be taken into account here.
+       The returned data may contain multiple entries.
        """
        try:
            request_id = data.meta_info["request_id"]
@@ -570,15 +570,15 @@ def report_response(self, data: DataProto):
            return

        # call reward
-       # reward worker得能支持单条数据计算, dynamic sampling对需要batch计算reward的需要注意...
-       # 多域的时候,llm as judge, 需要单独为reward worker分配gpu
+       # The reward worker must support computing rewards for a single sample; dynamic sampling needs care when rewards have to be computed per batch...
+       # In the multi-domain case (llm as judge), GPUs need to be allocated separately for the reward workers
        rewards: DataProto = ray.get(reward_worker.compute_rewards.remote(batch))
        batch.union(rewards)

        response_buffers: List[DataProto] = []
        batch_expanded = [batch[[idx]] for idx in range(output_count)]

-       # response_filter, 不太需要response filter
+       # response_filter; a response filter is rarely needed
        for batch_item in batch_expanded:
            if self.response_filter_fn(batch_item, self.pipeline_config):
                response_buffers.append(batch_item)
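
Because `report_response` runs in a multi-threaded concurrency group, the shared bookkeeping it touches (request_id maps, response caches) needs protection, and the reward worker is called for a single sample at a time. A minimal illustration of that callback pattern with a plain `threading.Lock` and stand-in structures, not the scheduler's actual state:

```python
import threading
from collections import defaultdict

_lock = threading.Lock()
request_id_2_dp_rank = {}                 # filled when requests are dispatched
response_cache = defaultdict(list)        # domain -> finished (response, reward, dp_rank) tuples

def report_response(request_id, domain, response, compute_reward):
    """Callback invoked concurrently for each finished request."""
    reward = compute_reward(response)     # reward worker must handle a single sample
    with _lock:                           # guard shared maps against concurrent callbacks
        dp_rank = request_id_2_dp_rank.pop(request_id, None)
        response_cache[domain].append((response, reward, dp_rank))
```
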
@@ -706,7 +706,7 @@ def expand_requests(self, data: DataProto):
        return target_requests

    def check_worker_alive(self, cluster):
-       # 探测dp worker是否存活,dp worker的server thread可能由于异常退出,造成hang
+       # Probe whether the dp workers are alive; a dp worker's server thread may exit on an exception and cause a hang
        current_time = time.time()
        if current_time - self.last_alive_check >= self.alive_check_interval:
            cluster.add_request(command=GenerateRequestType.ALIVE_CHECK, data=DataProto())
@@ -727,7 +727,7 @@ def check_send_new_request(self) -> bool:

    def get_available_dp_rank(self):
        while True:
-           # 负载均衡逻辑,期望各dp 正在处理的条数基本接近
+           # Load balancing logic: keep the number of in-flight requests roughly even across dp ranks
            sorted_ranks = sorted(
                self.load_balance_coordinator.keys(), key=lambda rank: (self.load_balance_coordinator[rank], rank)
            )

roll/distributed/scheduler/reward_scheduler.py

Lines changed: 11 additions & 11 deletions
@@ -15,14 +15,14 @@
@ray.remote
class RewardScheduler:
    """
-   reward 服务化和generate不同, request接口:
-   reward scheduler需要解决的是不同域的sample的reward计算问题, 不需要实现request粒度的接口;
-   并且reward计算和vllm不同,vllm可以continue batch,所以可以动态add request, reward不行,
-   直接rpc调用reward_cluster.compute_rewards即可(使用rpc方式调用,可以增加reward的数量,增大并发处理能力)
+   Serving rewards differs from generation in its request interface:
+   the reward scheduler has to compute rewards for samples from different domains and does not need a request-granularity interface.
+   Reward computation also differs from vllm: vllm supports continuous batching and can add requests dynamically, reward computation cannot,
+   so it is enough to call reward_cluster.compute_rewards directly via rpc (the rpc style allows more reward workers and therefore higher concurrency).

-   reward scheduler需要解决的问题:
-   按domain路由reward
-   dp dispatch 均分/不足dp_size 的限制
+   Problems the reward scheduler has to solve:
+   routing rewards by domain
+   dp dispatch: even splitting / handling batches smaller than dp_size
    """

    def __init__(self):
@@ -32,13 +32,13 @@ def __init__(self):

    def compute_rewards(self, data: DataProto, reward_clusters: Dict[str, Any], pipeline_config) -> DataProto:
        """
-       保序返回rewards
+       Return rewards in the original sample order
        """
        self.pipeline_config = pipeline_config
        self.reward_clusters = reward_clusters
        data.batch["prompt_id"] = torch.arange(data.batch.batch_size[0], device=data.batch.device)

-       # 按domain group by data
+       # Group the data by domain
        grouped_data: Dict[str, DataProto] = data.group_by("domain")

        domain_rewards_refs: Dict[str, List[ray.ObjectRef]] = defaultdict(list)
@@ -51,8 +51,8 @@ def compute_rewards(self, data: DataProto, reward_clusters: Dict[str, Any], pipe

        rewards_list: List[DataProto] = []
        for domain, domain_rewards_ref in domain_rewards_refs.items():
-           # 各reward的输出schema要求一致
-           # reward worker compute_rewards 接口返回结果保序
+           # All reward workers must produce the same output schema
+           # The reward worker's compute_rewards interface returns results in order
            if domain not in grouped_data.keys():
                continue
            domain_rewards: DataProto = DataProto.materialize_concat(data_refs=domain_rewards_ref)
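
The three hunks above sketch the whole flow: tag each sample with a prompt_id, group by domain, compute rewards per domain over rpc, then restore the original order so that "rewards are returned in order". A self-contained toy version of that group-by / reorder pattern, using plain Python lists instead of DataProto and Ray:

```python
from collections import defaultdict

def compute_rewards_in_order(samples, reward_fns):
    """samples: list of dicts with a 'domain' key; reward_fns: domain -> callable over a list."""
    # Tag each sample with its original position, then group by domain.
    grouped = defaultdict(list)
    for prompt_id, sample in enumerate(samples):
        grouped[sample["domain"]].append((prompt_id, sample))

    # Compute rewards per domain (in the scheduler this is an rpc to the domain's reward cluster).
    scored = []
    for domain, items in grouped.items():
        rewards = reward_fns[domain]([s for _, s in items])   # one call per domain
        scored.extend(zip((pid for pid, _ in items), rewards))

    # Restore the original sample order before returning.
    scored.sort(key=lambda pair: pair[0])
    return [reward for _, reward in scored]

# Example with two domains and trivial reward functions.
demo = [{"domain": "math", "x": 1}, {"domain": "code", "x": 2}, {"domain": "math", "x": 3}]
fns = {"math": lambda xs: [s["x"] * 10 for s in xs], "code": lambda xs: [0 for _ in xs]}
print(compute_rewards_in_order(demo, fns))   # [10, 0, 30]
```
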

roll/models/model_providers.py

Lines changed: 2 additions & 2 deletions
@@ -376,9 +376,9 @@ def default_reward_model_provider(
    is_trainable: Optional[bool] = False,
):
    """
-   model.forward 遵循TokenClassifierOutput 协议
+   model.forward follows the TokenClassifierOutput protocol:
    class TokenClassifierOutput(ModelOutput):
-       logits: torch.FloatTensor  # 必须要有
+       logits: torch.FloatTensor  # required
        loss: Optional[torch.FloatTensor] = None
        hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
        attentions: Optional[Tuple[torch.FloatTensor, ...]] = None
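
The docstring above only requires that the reward model's forward returns an object following Hugging Face's `TokenClassifierOutput` protocol, i.e. with a `logits` field holding per-token scores. A hedged sketch of a toy model that satisfies the protocol; the embedding/value-head architecture is illustrative, not the provider's actual model:

```python
import torch
from torch import nn
from transformers.modeling_outputs import TokenClassifierOutput

class TinyTokenRewardModel(nn.Module):
    """Toy reward model whose forward follows the TokenClassifierOutput protocol."""

    def __init__(self, hidden_size=16, vocab_size=100):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.value_head = nn.Linear(hidden_size, 1)         # one score per token

    def forward(self, input_ids, attention_mask=None):
        hidden = self.embed(input_ids)                       # [batch, seq, hidden]
        logits = self.value_head(hidden)                     # [batch, seq, 1] -- the required field
        return TokenClassifierOutput(logits=logits)

out = TinyTokenRewardModel()(torch.randint(0, 100, (2, 8)))
print(out.logits.shape)   # torch.Size([2, 8, 1])
```
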

roll/pipeline/agentic/agentic_config.py

Lines changed: 2 additions & 2 deletions
@@ -45,7 +45,7 @@ class EnvManagerConfig(WorkerConfig):

    def __post_init__(self):
        """
-       根据es config计算world_size
+       Compute world_size from the es config
        """
        self.world_size = self.env_groups * self.group_size
        self.env_configs: Optional[Dict[int, Dict]] = None
@@ -266,7 +266,7 @@ def set_max_steps(self, max_steps: int):
            self.critic.training_args.per_device_train_batch_size
            * self.critic.training_args.gradient_accumulation_steps
        )
-       # 没有除dp_size,需要在分布式环境初始化后再除
+       # Not divided by dp_size yet; the division happens after the distributed environment is initialized
        self.actor_train.training_args.max_steps = max_steps * (
            self.rollout_batch_size
            * self.actor_infer.generating_args.num_return_sequences
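
To make the deferred dp_size division concrete, here is a small worked example with assumed numbers; the exact formula in `set_max_steps` is only partially visible in this hunk, so treat this as an illustration of the comment rather than the repository's code:

```python
# Illustrative values only; real values come from the pipeline and worker configs.
max_steps = 100                              # pipeline (rollout) steps
rollout_batch_size = 64
num_return_sequences = 8
per_device_train_batch_size = 4
gradient_accumulation_steps = 8
dp_size = 4                                  # known only after the distributed env is up

sequences_per_pipeline_step = rollout_batch_size * num_return_sequences                          # 512
sequences_per_optim_step_per_rank = per_device_train_batch_size * gradient_accumulation_steps    # 32

# Value stored before distributed init (dp_size not applied yet):
train_max_steps = max_steps * sequences_per_pipeline_step // sequences_per_optim_step_per_rank   # 1600
# After the distributed environment is initialized, divide by dp_size:
train_max_steps //= dp_size                                                                      # 400
```
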

roll/pipeline/agentic/agentic_pipeline.py

Lines changed: 6 additions & 6 deletions
@@ -107,7 +107,7 @@ def __init__(self, pipeline_config: AgenticConfig):

    @torch.no_grad()
    def run(self):
-       # 计算tokens per second 系统吞吐
+       # Measure system throughput in tokens per second
        tps_timer = _Timer(window_size=5)

        for global_step in range(self.pipeline_config.max_steps):
@@ -191,8 +191,8 @@ def run(self):
                metrics.update(reduce_metrics(old_log_probs.meta_info.pop("metrics", {})))
                metrics["time/old_log_probs_values"] = cal_old_logpb_timer.last

-           # 要按group by处理reward
-           # 可以tag(env_type)/traj_group_id(group)/batch(rollout_batch)... group_by计算reward/adv
+           # Rewards have to be processed with a group-by
+           # Reward/advantage can be computed grouped by tag(env_type)/traj_group_id(group)/batch(rollout_batch)...
            batch.batch["prompt_id"] = torch.arange(batch.batch.batch_size[0], device=batch.batch.device)
            with Timer(name="adv", logger=None) as timer:
                grouping = self.pipeline_config.reward_normalization.grouping
@@ -228,7 +228,7 @@ def run(self):
                batch = DataProto.concat(batch_list)
                batch.reorder(indices=torch.argsort(batch.batch["prompt_id"]))
                batch.pop("prompt_id")
-               # advantage是全局batch计算,还是group内计算?
+               # Should the advantage be computed over the whole batch or within each group?
                batch = compute_advantage(
                    data=batch,
                    gamma=self.pipeline_config.gamma,
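
The grouping comments above raise the question of whether rewards/advantages are normalized over the whole batch or within a group such as traj_group_id. A compact, DataProto-free sketch of group-wise reward whitening in torch, as one example of what per-group computation looks like (the group ids and values are made up):

```python
import torch

def normalize_rewards_by_group(rewards, group_ids, eps=1e-6):
    """Whiten rewards within each group instead of over the whole batch."""
    normalized = torch.zeros_like(rewards)
    for gid in group_ids.unique():
        mask = group_ids == gid
        group = rewards[mask]
        normalized[mask] = (group - group.mean()) / (group.std() + eps)
    return normalized

rewards = torch.tensor([1.0, 0.0, 1.0, 3.0, 2.0, 1.0])
group_ids = torch.tensor([0, 0, 0, 1, 1, 1])   # e.g. one traj_group_id per trajectory
print(normalize_rewards_by_group(rewards, group_ids))
```
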
@@ -314,8 +314,8 @@ def run(self):


def compute_data_metrics(batch):
-   # token_level_scores 是reward model给每个token的打分,可能经过了norm/clip
-   # score 为env的reward,raw value
+   # token_level_scores are the reward model's per-token scores, possibly after norm/clip
+   # score is the environment's reward, the raw value
    sequence_score = batch.batch["scores"].sum(-1)
    sequence_reward = batch.batch["token_level_rewards"].sum(-1)
    advantages = batch.batch["advantages"]
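
As the last hunk notes, `scores` carries the raw environment reward while `token_level_rewards` carries the values actually used for training (possibly normalized or clipped); both are summed over the sequence dimension for logging. A tiny illustration with hand-made tensors; the metric names below are invented for the example, not the pipeline's keys:

```python
import torch

scores = torch.tensor([[0.0, 0.0, 1.0],
                       [0.0, 0.0, 0.0]])              # raw env reward per token position
token_level_rewards = torch.tensor([[0.0, 0.0, 0.8],
                                    [0.0, 0.0, -0.2]])  # shaped/normalized rewards

sequence_score = scores.sum(-1)                        # tensor([1., 0.])
sequence_reward = token_level_rewards.sum(-1)          # tensor([ 0.8000, -0.2000])

metrics = {
    "score/mean": sequence_score.mean().item(),        # 0.5
    "reward/mean": sequence_reward.mean().item(),      # ~0.3
}
print(metrics)
```
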
