[PD] prepare request in prefill instance by multi threads #7724
juncaipeng wants to merge 1 commit into PaddlePaddle:release/2.6
Conversation
Thanks for your contribution!
Pull request overview
In the PD disaggregation scenario, this PR splits the prefill instance's request fetch/prepare stage out of the implementation previously coupled with the scheduling thread, and attempts to improve throughput via multi-threaded concurrent fetch/prepare. It also simplifies the synchronization mechanism between the engine and cache_messager around "add cache task finished", and adds a related environment-variable switch.
Changes:
- Move the "fetch request + prepare request" logic under ENABLE_V1_KVCACHE_SCHEDULER into EngineServicePrepareMixin, and add two cooperating threads, prepare_request_thread and schedule_request_thread, in EngineService.start().
- Remove the finished_add_cache_task_* queues/locks/manager registrations and the put/get_finished_add_cache_task_req interfaces from EngineWorkerQueue; the cache_messager side now waits on demand for tasks to become ready before sending the cache.
- Add the environment variable FD_PREFILL_FETCH_THREAD_NUM to control the number of fetch worker threads in the prefill instance.
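The split described above (several fetch/prepare workers feeding a single scheduling thread) can be sketched roughly as follows. All names here (`prepare_request`, `schedule_loop`, the queue) are illustrative stand-ins, not the PR's actual APIs:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

# Prepared requests flow from the worker pool to the scheduling thread
# through a thread-safe queue, decoupling preparation from scheduling.
prepared: "queue.Queue[int]" = queue.Queue()

def prepare_request(req_id: int) -> None:
    # stand-in for resource pre-allocation / async preprocessing
    prepared.put(req_id)

scheduled = []

def schedule_loop(expected: int) -> None:
    # single consumer: drains prepared requests and "schedules" them
    for _ in range(expected):
        scheduled.append(prepared.get())

reqs = list(range(8))
scheduler = threading.Thread(target=schedule_loop, args=(len(reqs),))
scheduler.start()
# worker count would come from the new env var in the real code
with ThreadPoolExecutor(max_workers=5) as pool:
    pool.map(prepare_request, reqs)
scheduler.join()
```

The point of the pattern is that slow per-request preparation no longer blocks the scheduling loop; ordering across workers is not guaranteed.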
The PR title/description needs work:
- The title does not carry a tag per repository convention (e.g. [Optimization].../[PD Disaggregation]...); consider something like: [PD Disaggregation][Optimization] Prepare requests in prefill with multi-thread fetching.
- The PR description template's Motivation / Modifications / Usage / Accuracy Tests sections are unfilled; at a minimum, please explain why multi-threading is needed, the expected gains/risks, how it was verified (benchmark/trace/stress-test metrics), and why unit tests are unnecessary or how they will be added.
- The new environment variable and behavior changes should also be documented (e.g. environment_variables.md or the relevant usage docs).
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| fastdeploy/inter_communicator/engine_worker_queue.py | Removes the "add cache task finished" shared queues/locks/registrations and their interfaces, shrinking the e2w synchronization surface. |
| fastdeploy/envs.py | Adds reading of the new FD_PREFILL_FETCH_THREAD_NUM environment variable. |
| fastdeploy/engine/common_engine.py | EngineService gains the prepare mixin; prepare/schedule are split into two threads; fetch-pool shutdown is added on exit. |
| fastdeploy/engine/common_engine_prepare_mixin.py | New mixin implementing fetch/prepare for the mixed/prefill/decode roles, with ThreadPoolExecutor-backed multi-threaded fetch for prefill. |
| fastdeploy/cache_manager/cache_messager.py | Drops the dependency on the finished_add_cache_task queue/barrier; adds logic to wait for cache tasks to arrive; adjusts some comments and logs. |
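As a rough illustration of how an env-var-controlled worker count is typically wired up (the variable name and the table's fetch-pool shutdown are from the PR; the parsing helper and the default of 5 are assumptions for this sketch):

```python
import os
from concurrent.futures import ThreadPoolExecutor

def _int_env(name: str, default: int) -> int:
    """Read an integer environment variable, falling back on bad values."""
    try:
        return int(os.environ.get(name, default))
    except ValueError:
        return default

# FD_PREFILL_FETCH_THREAD_NUM would control the prefill fetch worker count
num_workers = _int_env("FD_PREFILL_FETCH_THREAD_NUM", 5)
fetch_pool = ThreadPoolExecutor(max_workers=num_workers,
                                thread_name_prefix="prefill_fetch")
results = list(fetch_pool.map(lambda i: i * i, range(4)))
# mirrors the fetch-pool shutdown the PR adds on engine exit
fetch_pool.shutdown(wait=True)
```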
Comments suppressed due to low confidence (1)
fastdeploy/inter_communicator/engine_worker_queue.py:100
- This PR removes the finished_add_cache_task_* queues/locks/interfaces (e.g. put/get_finished_add_cache_task_req and the corresponding manager registrations). Several tests in the repository still reference these interfaces directly (e.g. tests/inter_communicator/test_e2w_queue.py, tests/engine/test_common_engine.py, tests/cache_manager/test_cache_messager.py), which will break the unit tests or downstream callers. Please update the tests and all callers in the same PR; if backward compatibility is needed, keep the interfaces as deprecated no-ops and log a migration hint.
```python
]
self.connected_client_counter_init: List[Value] = [
    Value("i", 0) for _ in range(self.local_data_parallel_size)
]
self.finished_req_list = [list() for _ in range(self.local_data_parallel_size)]
self.cache_infos_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
self.connect_rdma_tasks_list = [list() for _ in range(self.local_data_parallel_size)]
self.connect_rdma_tasks_response_list = [list() for _ in range(self.local_data_parallel_size)]
self.client_read_info_flag_init: List[List[int]] = [
    [0] * self.num_client for _ in range(self.local_data_parallel_size)
]
```
CI report generated from the code below (refreshed every 30 minutes):
1 Overview: 1 Required task is currently failing (
2 Task status summary
2.1 Required tasks: 5/10 passed
2.2 Optional tasks: 26/30 passed
3 Failure details (required only)
Approval — approval gate not passed (confidence: high)
Diagnosis
Suggested fix
Diff context from fastdeploy/inter_communicator/engine_worker_queue.py:

```diff
     Value("i", 0) for _ in range(self.local_data_parallel_size)
 ]
 self.finished_req_list = [list() for _ in range(self.local_data_parallel_size)]
-self.finished_add_cache_task_list = [list() for _ in range(self.local_data_parallel_size)]
 self.cache_infos_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
 self.connect_rdma_tasks_list = [list() for _ in range(self.local_data_parallel_size)]
```
Diff context from fastdeploy/splitwise/splitwise_connector.py:

```diff
 message = self._serialize_message(msg_type, payload)
 try:
     self.logger.info(f"_send_message: msg_type={msg_type} addr={addr}")
-    sock = self._get_push_socket(addr)
-    sock.send_multipart(message)
+    with self._socket_lock:
+        sock = self._get_push_socket(addr)
+        sock.send_multipart(message)
```
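The change above holds `_socket_lock` across both the socket lookup and the send, since ZMQ sockets are not thread-safe. The pattern, demonstrated here with a stub in place of the real DEALER socket (class and method names other than `_socket_lock`/`send_multipart` are invented for the sketch):

```python
import threading

class _FakeDealerSocket:
    """Stand-in for a ZMQ DEALER socket; real zmq sockets must not be
    used from multiple threads without external serialization."""

    def __init__(self):
        self.sent = []

    def send_multipart(self, frames):
        self.sent.append(tuple(frames))

class Connector:
    def __init__(self):
        self._socket_lock = threading.RLock()  # RLock, as in the PR
        self._sock = _FakeDealerSocket()

    def send(self, frames):
        # hold the lock across lookup + send so multipart frames from
        # different threads can never interleave on the wire
        with self._socket_lock:
            self._sock.send_multipart(frames)

conn = Connector()
threads = [
    threading.Thread(target=conn.send, args=([b"hdr", str(i).encode()],))
    for i in range(16)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

An RLock (rather than a plain Lock) also tolerates re-entrant sends from the same thread, e.g. a retry path inside an already-locked section.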
PaddlePaddle-bot left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-06 18:10:47
📋 Review Summary
PR overview: In the PD-disaggregated prefill instance, the request-preparation stage (resource pre-allocation, coordination with the D side, asynchronous preprocessing) is changed from serial to multi-threaded concurrent execution; the old finish_add_cache_task IPC synchronization mechanism is cleaned up, and ZMQ socket access is now guarded by a lock.
Scope: fastdeploy/engine/, fastdeploy/cache_manager/, fastdeploy/inter_communicator/, fastdeploy/splitwise/, fastdeploy/envs.py
Impact tags: [Engine] [PD Disaggregation] [KVCache]
📝 PR convention check
Three convention issues: ① the title uses the unofficial tag [PD] (the official tag is [PD Disaggregation]); ② the target branch is release/2.6, which requires the Cherry-Pick format, but neither the title nor the last Checklist item reflects this; ③ the four required sections Motivation / Modifications / Usage or Command / Accuracy Tests are all empty.
Suggested title (copy-paste ready):
[Cherry-Pick][PD Disaggregation] prepare request in prefill instance by multi threads (#<原PR号>)
Suggested PR description (copy-paste ready):
## Motivation
In PD-disaggregated deployments, the prefill instance's request-preparation flow (P-side resource pre-allocation, requesting resources from the D side, waiting on asynchronous preprocessing) runs serially and becomes a throughput bottleneck. This PR extracts the preparation logic into a standalone `EngineServicePrepareMixin` and runs it concurrently on a thread pool (5 workers by default), improving overall prefill throughput. It also removes the old `finish_add_cache_task` IPC synchronization (barrier/lock/queue) in favor of a spin-wait on the cache messager side, and adds an RLock around the ZMQ DEALER socket to support concurrent sends from multiple threads.
## Modifications
- Add `fastdeploy/engine/common_engine_prepare_mixin.py`: extracts `EngineServicePrepareMixin`, containing `_fetch_request_prefill` (prefill role, multi-threaded), `_fetch_request_decode` (decode role), `_fetch_request_mixed` (mixed role), and the `_prepare_request_v1` multi-threaded entry point
- `fastdeploy/engine/common_engine.py`: `EngineService` inherits `EngineServicePrepareMixin`; `_schedule_request_to_worker_v1` is split into separate `prepare_request_thread` + `schedule_request_thread`; the inlined `ThreadPoolExecutor` logic is removed
- `fastdeploy/cache_manager/cache_messager.py`: removes the `finish_add_cache_task_barrier` synchronization; adds `_maybe_wait_for_cache_task`, called in `prefill_layerwise_send_cache_thread` to wait for cache tasks to become ready
- `fastdeploy/inter_communicator/engine_worker_queue.py`: cleans up the `finish_add_cache_task` IPC primitives (queues, locks, barrier, client flags, Value flags)
- `fastdeploy/envs.py`: adds `FD_PREFILL_PREPARE_REQ_THREAD_NUM` (default 5) to control the number of prefill request-preparation threads
- `fastdeploy/splitwise/splitwise_connector.py`: adds `_socket_lock` (RLock) to guard concurrent multi-threaded access to the ZMQ DEALER socket
## Usage or Command
```bash
# Number of request-preparation threads on the prefill instance (default 5)
export FD_PREFILL_PREPARE_REQ_THREAD_NUM=5
```
## Accuracy Tests
N/A (pure performance optimization; no change to model forward computation)
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [x] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.
Issues
| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | cache_messager.py:925 | `_maybe_wait_for_cache_task` has no timeout; if a cache task is lost, the KV-transfer thread blocks forever |
| ❓ Question | common_engine_prepare_mixin.py:34 | `_fetch_request_mixed` contains an unverified FIXME; behavior in the MTP scenario needs confirmation |
Overall assessment
The overall direction of the refactor is sound: the serial preparation stage is made concurrent, redundant IPC primitives are removed, and the multi-threaded ZMQ socket race is fixed. The core thread-safety concerns (scheduler/resource_manager/ZMQ socket) were verified to be lock-protected. The missing timeout in _maybe_wait_for_cache_task should be addressed before release, so that a lost cache info cannot hang the service.
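One way to address the missing-timeout concern is a bounded poll loop. This is a hypothetical sketch, not the PR's implementation; the function name, timeout, and poll interval are all illustrative:

```python
import time

def wait_for_cache_task(task_ready, timeout_s: float = 30.0,
                        poll_s: float = 0.001) -> bool:
    """Poll task_ready() until it returns True, but give up after
    timeout_s so a lost cache task cannot block the KV-transfer
    thread forever. Returns whether the task became ready."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if task_ready():
            return True
        time.sleep(poll_s)  # yield instead of busy-spinning
    return False
```

The caller can then log and drop (or re-queue) the request on a `False` return instead of hanging the service.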
Motivation
Modifications
- fastdeploy/engine/common_engine_prepare_mixin.py (new): extracts EngineServicePrepareMixin, implementing the request-preparation logic for the three roles _fetch_request_mixed, _fetch_request_prefill, and _fetch_request_decode, plus the _fetch_loop worker thread and the _prepare_request_v1 entry point.
- fastdeploy/engine/common_engine.py: EngineService inherits EngineServicePrepareMixin; under ENABLE_V1_KVCACHE_SCHEDULER, prepare_request_thread and schedule_request_thread now run in parallel; the original inlined ThreadPoolExecutor and _fetch_request closure are removed.
- fastdeploy/envs.py: adds FD_PREFILL_PREPARE_REQ_THREAD_NUM (default 5) to control the number of request-preparation threads in the prefill instance.
- fastdeploy/inter_communicator/engine_worker_queue.py: deletes the finished_add_cache_task queues, locks, barrier, and flags, and removes the put_finished_add_cache_task_req/get_finished_add_cache_task_req methods.
- fastdeploy/cache_manager/cache_messager.py: adds the _maybe_wait_for_cache_task polling method; prefill_layerwise_send_cache_thread calls it to actively wait for cache tasks to become ready; removes the old finish_add_cache_task_barrier synchronization point.
Usage or Command
Tests
Performance test setup: EB45 0.3B model, 1P1D, 1000 requests, concurrency 256, average input length 1280, average output length 700.
N/A (this change is a performance optimization and does not affect model outputs)
Checklist
- Tag list: [`[FDConfig]`, `[APIServer]`, `[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- Format your code, run `pre-commit` before commit.
- If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.