[PD] PD send cache via storage & Refine swap_cache_layout op #7839
juncaipeng wants to merge 4 commits
Pull request overview
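This PR adds a "Storage Pool mode" for PD disaggregation: instead of pushing the KV cache directly to the D-side GPU over RDMA/IPC, the P side writes it to a global storage pool and the D side reads it back from that pool. The mode is toggled by the environment variable FD_PD_TRANSFER_VIA_STORAGE. The PR also refactors swap_cache_layout.cu, introducing D2H/H2D kernels that aggregate all layers per swap block, plus global staging and pointer buffers, to improve swap performance.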
Changes:
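- Adds the FD_PD_TRANSFER_VIA_STORAGE environment variable and the corresponding branches in token_processor and cache_messager on the P side and in common_engine and resource_manager_v1 on the D side, together with prefix_cache_manager.write_all_cache_to_storage / read_cache_from_storage_for_pd.
- Refactors swap_cache_layout.cu, introducing swap_d2h_kernel / scatter_blocks_kernel and file-level staging/pointer caches, with an optimized branch for the common case where cpu_block_ids are contiguous.
- Adjusts several scheduler debug logs (update_metrics(True) is forced inside schedule(), and the scheduled_reqs log is moved out of its guard), and updates MOONCAKE_GLOBAL_SEGMENT_SIZE in examples/cache_storage/run_03b_pd_storage.sh.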
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
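| fastdeploy/envs.py | Adds the FD_PD_TRANSFER_VIA_STORAGE switch |
| fastdeploy/output/token_processor.py | P side writes the full cache to storage before send_first_token |
| fastdeploy/cache_manager/cache_messager.py | In storage mode, skips the RDMA transfer and marks the task finished directly |
| fastdeploy/cache_manager/prefix_cache_manager.py | Adds write/read flows dedicated to PD storage pool mode, plus the :partial:N key |
| fastdeploy/engine/common_engine.py | D side synchronously reads the cache from the storage pool before adding the request to the running queue |
| fastdeploy/engine/sched/resource_manager_v1.py | The finish path uses the new write method; verbose metrics/debug logs are added inside scheduling |
| custom_ops/gpu_ops/swap_cache_layout.cu | Refactored into a batched swap implementation based on global buffers and kernels |
| examples/cache_storage/run_03b_pd_storage.sh | Increases the Mooncake segment size in the example script |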
Comments suppressed due to low confidence (2)
fastdeploy/engine/common_engine.py:1854
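- The request object is fetched directly with self.resource_manager.requests[request_id], without any defensive check. In the D flow, the branches before add_prefilled_request (first token is EOS, error_code != 200) already call pre_recycle_resource and then continue; when the storage read fails, this function also calls pre_recycle_resource(request_id) and continues. If the outer loop then sees the same request_id again (for example on a retry or an error resend), or the requests dict has already been cleaned upstream, a KeyError is raised and the exception is swallowed for the whole _process_prefilled_requests call, which blocks the handling of subsequent requests. Consider using self.resource_manager.requests.get(request_id) and, when it returns None, logging an error and skipping/recycling instead of relying on the outer try/except.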
request = self.resource_manager.requests[request_id]
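The defensive lookup suggested above could look roughly like the following. This is a minimal sketch, not the FastDeploy code: take_request, the logger name, and the sample dict are hypothetical, and the real handler would also decide whether to recycle resources.

```python
import logging

logger = logging.getLogger("pd_storage_sketch")  # hypothetical logger name


def take_request(requests: dict, request_id: str):
    """Return the request object, or None if it was already recycled."""
    request = requests.get(request_id)
    if request is None:
        # The entry may have been removed by an earlier recycle step;
        # log and let the caller skip it instead of raising KeyError.
        logger.error("request %s not found, skipping", request_id)
    return request


requests = {"req-1": {"prompt_len": 128}}
found = take_request(requests, "req-1")
missing = take_request(requests, "req-2")
```

The caller can then `continue` when the result is None, so one stale request_id does not abort the rest of the batch.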
fastdeploy/cache_manager/cache_messager.py:695
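- The log string "[PD Storage] Skip RDMA transfer, mark as finished, " f"req_id: {task['request_id']}" is actually two adjacent literals concatenated, and only the second is an f-string. The code works, but the form can mislead readers into thinking the first half is also an f-string; consider merging them into a single f-string. The same pattern appears in this PR at lines 1859, 1866, and 1873 of common_engine.py and should be unified as well.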
logger.info(
f"[PD Storage] Skip RDMA transfer, mark as finished, " f"req_id: {task['request_id']}"
)
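As a quick illustration of the point about adjacent literals, the sketch below shows that Python concatenates a plain literal and an f-string at compile time, so both forms produce the same message. The task dict is made up for the example.

```python
task = {"request_id": "req-42"}  # made-up task for illustration

# Two adjacent literals: only the second is an f-string, yet the result is one string.
split_form = "[PD Storage] Skip RDMA transfer, mark as finished, " f"req_id: {task['request_id']}"

# Equivalent single f-string, which is easier to read.
merged_form = f"[PD Storage] Skip RDMA transfer, mark as finished, req_id: {task['request_id']}"
```

Since the two forms are equivalent, merging them is purely a readability change.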
running_req_ids = [req.request_id for req in self.running]
llm_logger.debug(f"running requests num: {len(self.running)}, running_req_ids: {running_req_ids}")
self.update_metrics(True)
# PD disaggregation cache transfer mode:
# 0 (default): Direct transfer mode, P writes cache to D's GPU via RDMA/IPC
# 1: Storage pool mode, P writes cache to global storage pool, D reads from storage pool
"FD_PD_TRANSFER_VIA_STORAGE": lambda: int(os.getenv("FD_PD_TRANSFER_VIA_STORAGE", "0")),
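To make the lazy lookup above concrete, here is a small sketch of how such a lambda-per-variable table behaves when the variable is unset and when it is set to 1. The table name environment_variables and the helper transfer_mode are assumptions for this example, not part of fastdeploy/envs.py as shown.

```python
import os

# Lazily evaluated table, mirroring the lambda-per-variable style of the snippet above.
environment_variables = {
    "FD_PD_TRANSFER_VIA_STORAGE": lambda: int(os.getenv("FD_PD_TRANSFER_VIA_STORAGE", "0")),
}


def transfer_mode() -> str:
    """Hypothetical helper: map the flag to a readable mode name."""
    via_storage = environment_variables["FD_PD_TRANSFER_VIA_STORAGE"]()
    return "storage_pool" if via_storage else "direct"


os.environ.pop("FD_PD_TRANSFER_VIA_STORAGE", None)
default_mode = transfer_mode()  # variable unset: falls back to "0"

os.environ["FD_PD_TRANSFER_VIA_STORAGE"] = "1"
storage_mode = transfer_mode()  # variable set: storage pool mode

os.environ.pop("FD_PD_TRANSFER_VIA_STORAGE", None)  # leave the environment clean
```

Because the lambda is evaluated on each access, the flag is read at call time rather than at import time.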
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
fastdeploy/cache_manager/prefix_cache_manager.py:1410
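In write_all_cache_to_storage and read_cache_from_storage_for_pd, the logic for building the partial-block key, restoring token_ids, and splitting by block_size is almost identical (lines 1304-1331 and 1385-1410). Consider extracting a private method (for example _compute_storage_keys_with_partial(input_token_ids, request)) that returns the list of keys, so that a later change to the hash algorithm cannot be applied in only one place, which would make the P-side and D-side keys diverge and the cache unreadable.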
# 2. Calculate cache keys using same algorithm as write_all_cache_to_storage
keys = []
prefix_block_key = []
block_size = self.config.cache_config.block_size
mm_idx = 0
for i in range(0, len(input_token_ids), block_size):
    block_token_ids = input_token_ids[i : i + block_size]
    actual_token_num = len(block_token_ids)
    if actual_token_num < block_size:
        key = get_hash_str(block_token_ids, prefix_block_key)
        key = f"{key}:partial:{actual_token_num}"
        keys.append(key)
    else:
        mm_idx, extra_keys = self.get_block_hash_extra_keys(
            request=request,
            start_idx=i,
            end_idx=i + block_size,
            mm_idx=mm_idx,
        )
        prefix_block_key.extend(extra_keys)
        key = get_hash_str(block_token_ids, prefix_block_key)
        keys.append(key)
    prefix_block_key = [key]
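One way to picture the suggested shared helper is the sketch below. It is a simplified stand-in, not the PR's code: get_hash_str_stub replaces the real get_hash_str with SHA-256 (an assumption), the multimodal extra keys from get_block_hash_extra_keys are omitted, and the function name is hypothetical.

```python
import hashlib
from typing import List, Sequence


def get_hash_str_stub(token_ids: Sequence[int], prefix_keys: Sequence[str]) -> str:
    """Stand-in for the real get_hash_str; SHA-256 is an assumption of this sketch."""
    payload = ",".join(prefix_keys) + "|" + ",".join(map(str, token_ids))
    return hashlib.sha256(payload.encode()).hexdigest()


def compute_storage_keys_with_partial(input_token_ids: Sequence[int], block_size: int) -> List[str]:
    """Chain-hash each block; tag a trailing short block with ':partial:N'."""
    keys: List[str] = []
    prefix_block_key: List[str] = []
    for i in range(0, len(input_token_ids), block_size):
        block_token_ids = input_token_ids[i : i + block_size]
        key = get_hash_str_stub(block_token_ids, prefix_block_key)
        if len(block_token_ids) < block_size:
            key = f"{key}:partial:{len(block_token_ids)}"
        keys.append(key)
        prefix_block_key = [key]  # the next block's hash depends on this key
    return keys


tokens = list(range(10))
p_side_keys = compute_storage_keys_with_partial(tokens, block_size=4)  # writer side
d_side_keys = compute_storage_keys_with_partial(tokens, block_size=4)  # reader side
```

With a single helper, the writer and the reader cannot drift apart, which is the concern the comment raises.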
cudaMemcpyAsync(g_device_block_ids,
                gpu_block_ids.data(),
                n_blocks * sizeof(int64_t),
                cudaMemcpyHostToDevice,
                stream);

std::vector<DataType_*> h_layer_ptrs(layer_number);
for (int64_t i = 0; i < layer_number; i++) {
  h_layer_ptrs[i] = reinterpret_cast<DataType_*>(
      const_cast<data_t*>(cache_gpu_tensors[i].data<data_t>()));
}
cudaMemcpyAsync(g_device_layer_ptrs,
                h_layer_ptrs.data(),
                layer_number * sizeof(DataType_*),
                cudaMemcpyHostToDevice,
                stream);
static void* g_staging_buffer = nullptr;
static size_t g_staging_buffer_size = 0;
static void* g_device_block_ids = nullptr;
static size_t g_device_block_ids_size = 0;
static void* g_device_layer_ptrs = nullptr;
static size_t g_device_layer_ptrs_size = 0;

static void ensure_staging_buffer(size_t required_size) {
  if (g_staging_buffer_size < required_size) {
    if (g_staging_buffer) cudaFree(g_staging_buffer);
    cudaError_t err = cudaMalloc(&g_staging_buffer, required_size);
    PADDLE_ENFORCE_EQ(
        err,
        cudaSuccess,
        phi::errors::External("cudaMalloc staging buffer failed: %s",
                              cudaGetErrorString(err)));
    g_staging_buffer_size = required_size;
  }
}

static void ensure_device_block_ids(size_t required_size) {
  if (g_device_block_ids_size < required_size) {
    if (g_device_block_ids) cudaFree(g_device_block_ids);
    cudaError_t err = cudaMalloc(&g_device_block_ids, required_size);
    PADDLE_ENFORCE_EQ(
        err,
        cudaSuccess,
        phi::errors::External("cudaMalloc device block_ids failed: %s",
                              cudaGetErrorString(err)));
    g_device_block_ids_size = required_size;
  }
}

static void ensure_device_layer_ptrs(size_t required_size) {
  if (g_device_layer_ptrs_size < required_size) {
    if (g_device_layer_ptrs) cudaFree(g_device_layer_ptrs);
    cudaError_t err = cudaMalloc(&g_device_layer_ptrs, required_size);
    PADDLE_ENFORCE_EQ(
        err,
        cudaSuccess,
        phi::errors::External("cudaMalloc device layer_ptrs failed: %s",
                              cudaGetErrorString(err)));
    g_device_layer_ptrs_size = required_size;
  }
}
Codecov Report
@@ Coverage Diff @@
## release/2.6 #7839 +/- ##
==============================================
Coverage ? 72.31%
==============================================
Files ? 381
Lines ? 54262
Branches ? 8479
==============================================
Hits ? 39241
Misses ? 12259
Partials ? 2762
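The CI report is generated from the following code (updated every 30 minutes):
1 Task overview
2 Task status summary
2.1 Required tasks: 8/10 passed
2.2 Optional tasks: 23/26 passed
3 Failure details (required only)
Approval: process approval issue (confidence: high)
Root cause details:
Key logs:
Fix suggestions:
Fix suggestion summary: ask jiangjiajun and other RDs to review the envs.py change and approve.
Link: view logs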
# 2. Calculate cache keys using same algorithm as write_all_cache_to_storage
keys = []
prefix_block_key = []
block_size = self.config.cache_config.block_size
mm_idx = 0

for i in range(0, len(input_token_ids), block_size):
    block_token_ids = input_token_ids[i : i + block_size]
    actual_token_num = len(block_token_ids)

    if actual_token_num < block_size:
        key = get_hash_str(block_token_ids, prefix_block_key)
        key = f"{key}:partial:{actual_token_num}"
        keys.append(key)
    else:
        mm_idx, extra_keys = self.get_block_hash_extra_keys(
            request=request,
            start_idx=i,
            end_idx=i + block_size,
            mm_idx=mm_idx,
        )
        prefix_block_key.extend(extra_keys)
        key = get_hash_str(block_token_ids, prefix_block_key)
        keys.append(key)

    prefix_block_key = [key]

if not keys:
    return []

# 3. gpu_block_ids = D's pre-allocated block_tables
gpu_block_ids = request.block_tables[: len(keys)]
if envs.FD_PD_TRANSFER_VIA_STORAGE:
# Storage pool mode: bypass CacheMessager entirely.
# At this point, all transformer layers are complete and KV cache is in GPU memory.
# Directly write cache to storage and send first token to D.
result.metrics.wait_for_sending_cache_time = time.time()
trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_START, task_id, getattr(task, "user", ""))
if result.error_code == 200:
write_cache_start_time = time.time()
llm_logger.info(f"[PD Storage] P writing cache to storage (direct), request_id: {task_id}")
self.resource_manager.cache_manager.write_all_cache_to_storage(task, include_output=False)
llm_logger.info(
f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}"
f"[PD Storage] P finished writing cache to storage (direct), "
f"request_id: {task_id}, cost: {time.time()-write_cache_start_time:.5f}s"
)
trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_END, task_id, getattr(task, "user", ""))
result.metrics.send_request_output_to_decode_time = time.time()
self.split_connector.send_first_token(task.disaggregate_info, [result])
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.finish_requests_async(task_id)
else:
self.resource_manager.stop_flags[index] = True
self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task)
if task_id in self.resource_manager.req_dict:
del self.resource_manager.req_dict[task_id]
break
trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_END, task_id, getattr(task, "user", ""))
result.metrics.send_request_output_to_decode_time = time.time()
self.split_connector.send_first_token(task.disaggregate_info, [result])
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.finish_requests_async(task_id)
else:
# TODO: Refine checking sending cache and do not keep waiting
if time.time() - start_time > 30:
llm_logger.warning(f"wait for sending cache, {task_id}")
time.sleep(0.002)
self.resource_manager.stop_flags[index] = True
self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task)
if task_id in self.resource_manager.req_dict:
del self.resource_manager.req_dict[task_id]
PaddlePaddle-bot left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-18 17:46:07
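📋 Review summary
PR overview: Adds a Storage Pool transfer mode for PD disaggregation (decoupling the direct P/D connection through Mooncake storage) and refactors the swap_cache_layout CUDA op (merged DMA for contiguous blocks plus a scatter kernel optimization).
Change scope: custom_ops/gpu_ops/, fastdeploy/cache_manager/, fastdeploy/engine/, fastdeploy/envs.py, fastdeploy/model_executor/layers/attention/
Impact tags: [OP] [KVCache] [Engine] [PD Disaggregation]
Issues
| Level | File | Summary |
|---|---|---|
| 🔴 Bug | custom_ops/gpu_ops/swap_cache_layout.cu:76 | The three global CUDA buffers (g_staging_buffer and the others) only grow and are never released with cudaFree before process exit, matching the §C GPU memory leak signal |
| 🟡 Suggestion | custom_ops/gpu_ops/swap_cache_layout.cu:32 | num_vec_per_layer uses integer division; when block_stride * sizeof(T) is not divisible by 16, the trailing data is silently dropped |
| ❓ Question | fastdeploy/cache_manager/cache_transfer_manager.py:1302 | swap_to_storage_barrier.reset() was removed; confirm whether the Barrier implementation resets automatically, otherwise concurrent requests may deadlock |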
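The integer-division concern listed in the table can be checked with a little arithmetic. The sketch below models only the byte counts, not the kernel itself; the two block_stride values are made up for illustration, and 16 is the size of a float4 in bytes.

```python
FLOAT4_BYTES = 16  # sizeof(float4)


def vec_count_and_tail(block_stride: int, elem_bytes: int):
    """Return (number of float4 vectors copied, bytes left over after truncation)."""
    total_bytes = block_stride * elem_bytes
    num_vec = total_bytes // FLOAT4_BYTES  # same truncating division as in the kernel
    tail_bytes = total_bytes - num_vec * FLOAT4_BYTES
    return num_vec, tail_bytes


# Hypothetical aligned stride with 2-byte (BF16) elements: 131072 bytes, no tail.
aligned = vec_count_and_tail(block_stride=64 * 8 * 128, elem_bytes=2)

# Hypothetical unaligned stride: 2002 bytes, so 2 bytes would be skipped.
unaligned = vec_count_and_tail(block_stride=1001, elem_bytes=2)
```

A non-zero tail is exactly the case an alignment assertion would catch before any data is dropped.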
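📝 PR convention check
The title violates the conventions in two places:
- It uses [PD] (not an official tag); the corresponding official tag is [PD Disaggregation].
- The target branch is release/2.6 (not develop), so the [Cherry-Pick] format is required; Checklist item 5 (cherry-pick) is unchecked, and the description does not state the develop PR number.
Suggested title (can be copied directly):
[Cherry-Pick][PD Disaggregation] PD send cache via storage & Refine swap_cache_layout op (#original-PR-number)
Note: replace #original-PR-number with the number of the corresponding PR already merged into the develop branch.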
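Overall assessment
The Storage Pool design is sound overall, the TP multi-rank result aggregation (min(saved_results, key=len)) is clear, and the error-handling path is noticeably improved. However, the CUDA side introduces three global buffers that are never released, which causes GPU memory fragmentation in production environments where GPU memory is tight. This is a mandatory §C finding and must be fixed before merging.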
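The aggregation expression mentioned above, min(saved_results, key=len), can be illustrated as follows. This is only a sketch under an assumption: that each TP rank reports the list of keys it saved and that the shortest list is taken as the conservative result. The key names and rank layout are invented for the example.

```python
# Hypothetical per-rank results: each inner list holds the keys one TP rank saved.
saved_results = [
    ["k0", "k1", "k2"],  # rank 0 saved three blocks
    ["k0", "k1"],        # rank 1 saved only two
    ["k0", "k1", "k2"],  # rank 2 saved three blocks
]

# Pick the shortest list, so only blocks that every rank saved are reported.
common_saved = min(saved_results, key=len)
```

Note that this is conservative only if every rank saves blocks in the same prefix order, which is an assumption of the sketch.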
  }
}

static void* g_staging_buffer = nullptr;
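🔴 Bug: The three global static CUDA buffers g_staging_buffer, g_device_block_ids, and g_device_layer_ptrs free the old buffer only when they need to grow; the final, largest buffer is never released with cudaFree before process exit, violating the §C GPU memory leak check. In inference scenarios where GPU memory is tight, these buffers (size = n_blocks × layer_num × block_stride × sizeof(T)) hold VRAM for the long term without returning it.
Suggested fix: manage the buffer lifetime with an RAII struct that calls cudaFree automatically in its destructor: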
struct CudaBuffer {
  void* ptr = nullptr;
  size_t size = 0;
  ~CudaBuffer() { if (ptr) { cudaFree(ptr); ptr = nullptr; } }
  void ensure(size_t required) {
    if (size < required) {
      if (ptr) cudaFree(ptr);
      PADDLE_ENFORCE_EQ(cudaMalloc(&ptr, required), cudaSuccess, ...);
      size = required;
    }
  }
};
static CudaBuffer g_staging, g_block_ids, g_layer_ptrs;

if (block_idx >= n_blocks) return;

int64_t gpu_block = gpu_block_ids[block_idx];
int64_t num_vec_per_layer = (block_stride * sizeof(T)) / sizeof(float4);
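🟡 Suggestion: num_vec_per_layer = (block_stride * sizeof(T)) / sizeof(float4) uses truncating integer division: if block_stride * sizeof(T) is not divisible by sizeof(float4) (16 bytes), the trailing elements are silently dropped, leaving the KV cache data incomplete.
Common current configurations (head_dim=128, BF16) satisfy the alignment requirement, but an assertion is recommended to prevent silent errors if the configuration changes in the future:
assert((cache_block_stride * sizeof(DataType_)) % sizeof(float4) == 0 &&
       "block_stride must be float4-aligned");
Line 67 of scatter_blocks_kernel has the same problem and should be fixed together.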
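@@ -1265,14 +1302,19 @@ def write_back_storage_task(self, task: WriteStorageTask):
❓ Question: In the original code, rank 0 explicitly called swap_to_storage_barrier.reset() before calling put_transfer_done_signal; the new code removes that call and has all ranks send the signal instead.
Please confirm whether the concrete implementation of swap_to_storage_barrier resets automatically once all parties have arrived (Python's threading.Barrier does). If the Barrier is a custom implementation that needs a manual reset, then under concurrent requests later requests will block forever at barrier.wait(), causing a deadlock.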
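The auto-reset behaviour of Python's threading.Barrier can be verified with a short sketch like the one below. It says nothing about what swap_to_storage_barrier actually is (that remains the open question); it only shows that a standard-library barrier can be reused across rounds without calling reset(). The worker function and round count are made up.

```python
import threading

barrier = threading.Barrier(2)  # two parties, standing in for two ranks
rounds_completed = []


def worker(rank: int) -> None:
    for round_idx in range(3):
        # No reset() between rounds: the barrier re-arms once all parties pass.
        barrier.wait(timeout=5)
        if rank == 0:
            rounds_completed.append(round_idx)


threads = [threading.Thread(target=worker, args=(rank,)) for rank in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

If the real barrier is a cross-process or custom object, the same check would have to be repeated against that implementation.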
Motivation
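In PD disaggregation, the original scheme has the P instance transfer the KV cache directly to the D instance's GPU over RDMA/IPC, which couples the two tightly. This PR adds a Storage Pool mode (FD_PD_TRANSFER_VIA_STORAGE=1): P writes the KV cache to a global storage pool (Mooncake) and D reads it from storage, removing the direct P/D connection dependency and making deployment more flexible. It also refactors the swap_cache_layout CUDA op: when CPU block IDs are contiguous, transfers are merged into a single DMA plus a scatter kernel, reducing PCIe transfer overhead.
Modifications
- fastdeploy/envs.py: adds FD_PD_TRANSFER_VIA_STORAGE (0 = direct transfer, 1 = storage pool)
- fastdeploy/cache_manager/prefix_cache_manager.py: adds write_all_cache_to_storage() (supports a trailing incomplete block; the key gets a :partial:N suffix) and read_cache_from_storage_for_pd()
- fastdeploy/cache_manager/cache_messager.py: in storage pool mode, skips the RDMA transfer and marks the task finished directly
- fastdeploy/output/token_processor.py: on the P side, writes to storage right after prefill finishes, then sends first_token
- fastdeploy/engine/common_engine.py: on the D side, reads the cache from storage after receiving first_token; returns 502 on failure
- fastdeploy/engine/common_engine_prepare_mixin.py: storage pool mode skips send_cache_info_to_messager
- fastdeploy/engine/sched/resource_manager_v1.py: in storage pool mode, only the D side writes the cache
- fastdeploy/model_executor/layers/attention/*.py: storage pool mode skips kv_signal initialization
- custom_ops/gpu_ops/swap_cache_layout.cu: refactors the op; contiguous blocks take the D2H kernel / H2D staging+scatter optimized path
Usage or Command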
Accuracy Tests
N/A
Checklist
[FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
pre-commit before commit.
release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.