-
Notifications
You must be signed in to change notification settings - Fork 878
Fix EP buffer allocation for MNNVL clusters #1629
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d44dfbf
caf7902
9abf963
28cb287
abf67d0
c1c4f60
0c365fe
f67db33
83f9f1b
bb4b07b
ca53647
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,28 @@ | ||
| #include <mooncake_ep_buffer.h> | ||
| #include <arpa/inet.h> | ||
| #include <glog/logging.h> | ||
|
|
||
| namespace mooncake { | ||
|
|
||
| // Report whether every GPU visible to this process advertises support for | ||
| // fabric memory handles (CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED). | ||
| // Mirrors the check in nvlink_transport.cpp. | ||
| // | ||
| // Returns false when MC_USE_NVLINK_IPC is set in the environment, when no | ||
| // device can be enumerated, or when any driver query fails, so that the | ||
| // caller falls back to the non-fabric allocation path. | ||
| // | ||
| // NOTE(review): this only inspects a per-device capability bit. It does not | ||
| // verify that all ranks actually share one NVLink fabric clique -- confirm | ||
| // that callers do not rely on that stronger guarantee. | ||
| static bool supportFabricMem() { | ||
| // Explicit opt-out: force the cudaIpc path regardless of hardware. | ||
| if (getenv("MC_USE_NVLINK_IPC")) return false; | ||
|
|
||
| int num_devices = 0; | ||
| cudaError_t err = cudaGetDeviceCount(&num_devices); | ||
| if (err != cudaSuccess || num_devices == 0) return false; | ||
|
|
||
| for (int ordinal = 0; ordinal < num_devices; ++ordinal) { | ||
| // Translate the ordinal into a driver device handle instead of | ||
| // assuming the two are interchangeable. | ||
| CUdevice cu_dev; | ||
| CUresult res = cuDeviceGet(&cu_dev, ordinal); | ||
| if (res != CUDA_SUCCESS) { | ||
| LOG(WARNING) << "[EP] cuDeviceGet failed for device " << ordinal | ||
| << ": " << res << "; disabling fabric memory"; | ||
| return false; | ||
| } | ||
|
|
||
| int supported = 0; | ||
| res = cuDeviceGetAttribute( | ||
| &supported, CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED, | ||
| cu_dev); | ||
| // A failed query is treated as "not supported" rather than silently | ||
| // trusting whatever value `supported` happens to hold. | ||
| if (res != CUDA_SUCCESS) { | ||
| LOG(WARNING) << "[EP] cuDeviceGetAttribute(FABRIC_SUPPORTED) failed " | ||
| << "for device " << ordinal << ": " << res | ||
| << "; disabling fabric memory"; | ||
| return false; | ||
| } | ||
| if (!supported) return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| // Check if IPv6 address is an IPv4-mapped address (::ffff:x.x.x.x) | ||
| static inline bool ipv6_addr_v4mapped(const struct in6_addr* a) { | ||
| return ((a->s6_addr32[0] | a->s6_addr32[1]) == 0 && | ||
|
|
@@ -44,7 +64,91 @@ MooncakeEpBuffer::MooncakeEpBuffer(int rank, int num_ranks, | |
| CUDA_CHECK(cudaGetDevice(&device_id)); | ||
| CUDA_CHECK(cudaDeviceGetAttribute(&clock_rate_khz, cudaDevAttrClockRate, | ||
| device_id)); | ||
| CUDA_CHECK(cudaMalloc(&gdr_buffer, num_ep_buffer_bytes)); | ||
|
|
||
| // Allocate gdr_buffer. On MNNVL clusters, use cuMemCreate with a fabric | ||
| // handle so the buffer is accessible cross-node via NVLink fabric. | ||
| // On IB clusters or single-node setups, fall back to cudaMalloc. | ||
| use_fabric_mem_ = supportFabricMem(); | ||
| if (use_fabric_mem_) { | ||
| CUdevice cu_dev; | ||
| CUresult res = cuDeviceGet(&cu_dev, device_id); | ||
| if (res != CUDA_SUCCESS) { | ||
| LOG(ERROR) << "[EP] cuDeviceGet failed: " << res; | ||
| throw std::runtime_error("cuDeviceGet failed"); | ||
| } | ||
|
|
||
| CUmemAllocationProp prop = {}; | ||
| prop.type = CU_MEM_ALLOCATION_TYPE_PINNED; | ||
| prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE; | ||
| prop.location.id = cu_dev; | ||
| prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_FABRIC; | ||
|
|
||
| int rdma_flag = 0; | ||
| cuDeviceGetAttribute( | ||
| &rdma_flag, | ||
| CU_DEVICE_ATTRIBUTE_GPU_DIRECT_RDMA_WITH_CUDA_VMM_SUPPORTED, | ||
| cu_dev); | ||
| if (rdma_flag) prop.allocFlags.gpuDirectRDMACapable = 1; | ||
|
|
||
| size_t granularity = 0; | ||
| res = cuMemGetAllocationGranularity(&granularity, &prop, | ||
| CU_MEM_ALLOC_GRANULARITY_MINIMUM); | ||
| if (res != CUDA_SUCCESS) { | ||
| LOG(ERROR) << "[EP] cuMemGetAllocationGranularity failed: " << res; | ||
| throw std::runtime_error("cuMemGetAllocationGranularity failed"); | ||
| } | ||
|
|
||
| fabric_alloc_size_ = | ||
| (num_ep_buffer_bytes + granularity - 1) & ~(granularity - 1); | ||
| if (fabric_alloc_size_ == 0) fabric_alloc_size_ = granularity; | ||
|
|
||
| res = cuMemCreate(&fabric_mem_handle_, fabric_alloc_size_, &prop, 0); | ||
| if (res != CUDA_SUCCESS) { | ||
| LOG(ERROR) << "[EP] cuMemCreate(FABRIC) failed: " << res; | ||
| throw std::runtime_error("cuMemCreate failed"); | ||
| } | ||
|
|
||
| CUdeviceptr dptr = 0; | ||
| res = cuMemAddressReserve(&dptr, fabric_alloc_size_, granularity, 0, 0); | ||
| if (res != CUDA_SUCCESS) { | ||
| cuMemRelease(fabric_mem_handle_); | ||
| LOG(ERROR) << "[EP] cuMemAddressReserve failed: " << res; | ||
| throw std::runtime_error("cuMemAddressReserve failed"); | ||
| } | ||
|
|
||
| res = cuMemMap(dptr, fabric_alloc_size_, 0, fabric_mem_handle_, 0); | ||
| if (res != CUDA_SUCCESS) { | ||
| cuMemAddressFree(dptr, fabric_alloc_size_); | ||
| cuMemRelease(fabric_mem_handle_); | ||
| LOG(ERROR) << "[EP] cuMemMap failed: " << res; | ||
| throw std::runtime_error("cuMemMap failed"); | ||
| } | ||
|
|
||
| // Grant read/write access to all devices in the fabric clique | ||
| int device_count = 0; | ||
| cudaGetDeviceCount(&device_count); | ||
| std::vector<CUmemAccessDesc> access(device_count); | ||
| for (int i = 0; i < device_count; ++i) { | ||
| access[i].location.type = CU_MEM_LOCATION_TYPE_DEVICE; | ||
| access[i].location.id = i; | ||
| access[i].flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE; | ||
| } | ||
| res = cuMemSetAccess(dptr, fabric_alloc_size_, access.data(), | ||
| device_count); | ||
| if (res != CUDA_SUCCESS) { | ||
| cuMemUnmap(dptr, fabric_alloc_size_); | ||
| cuMemAddressFree(dptr, fabric_alloc_size_); | ||
| cuMemRelease(fabric_mem_handle_); | ||
| LOG(ERROR) << "[EP] cuMemSetAccess failed: " << res; | ||
| throw std::runtime_error("cuMemSetAccess failed"); | ||
| } | ||
|
Comment on lines
+110
to
+143
Contributor
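There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The error handling logic in this block for fabric memory allocation has a lot of duplicated resource cleanup code. For example, `cuMemRelease(fabric_mem_handle_)` is repeated in every failure branch after `cuMemCreate`, and `cuMemAddressFree(dptr, fabric_alloc_size_)` is repeated in every failure branch after `cuMemAddressReserve`. Consider refactoring this to reduce repetition. Common patterns for this in C++ are RAII wrappers or a scope guard that releases the resources on early exit.
This would centralize the cleanup logic and make the allocation flow easier to follow. |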
||
|
|
||
| gdr_buffer = reinterpret_cast<void*>(dptr); | ||
| LOG(INFO) << "[EP] Allocated " << fabric_alloc_size_ | ||
| << " bytes with fabric handle on GPU " << device_id; | ||
| } else { | ||
| CUDA_CHECK(cudaMalloc(&gdr_buffer, num_ep_buffer_bytes)); | ||
| } | ||
| CUDA_CHECK(cudaMalloc(&raddrs, num_ranks * sizeof(uint64_t))); | ||
| CUDA_CHECK(cudaMalloc(&rkeys, num_ranks * sizeof(uint32_t))); | ||
| CUDA_CHECK( | ||
|
|
@@ -71,7 +175,14 @@ MooncakeEpBuffer::MooncakeEpBuffer(int rank, int num_ranks, | |
| } | ||
|
|
||
| MooncakeEpBuffer::~MooncakeEpBuffer() noexcept(false) { | ||
| cudaFree(gdr_buffer); | ||
| if (use_fabric_mem_) { | ||
| CUdeviceptr dptr = reinterpret_cast<CUdeviceptr>(gdr_buffer); | ||
| cuMemUnmap(dptr, fabric_alloc_size_); | ||
| cuMemAddressFree(dptr, fabric_alloc_size_); | ||
| cuMemRelease(fabric_mem_handle_); | ||
| } else { | ||
| cudaFree(gdr_buffer); | ||
| } | ||
| cudaFree(raddrs); | ||
| cudaFree(rkeys); | ||
| cudaFree(qp_devctxs); | ||
|
|
@@ -529,6 +640,12 @@ void MooncakeEpBuffer::sync_roce(const std::vector<int64_t>& remote_addrs, | |
| } | ||
|
|
||
| std::vector<int32_t> MooncakeEpBuffer::get_ipc_handle() { | ||
| if (use_fabric_mem_) { | ||
| // Fabric memory is globally accessible via cuMemSetAccess — no IPC | ||
| // handle exchange needed. Return an empty vector so the caller knows | ||
| // to skip IPC for this rank. | ||
| return {}; | ||
| } | ||
| cudaIpcMemHandle_t handle; | ||
| CUDA_CHECK(cudaIpcGetMemHandle(&handle, gdr_buffer)); | ||
| // Convert handle bytes to int32_t array | ||
|
|
@@ -542,106 +659,112 @@ std::vector<int32_t> MooncakeEpBuffer::get_ipc_handle() { | |
|
|
||
| void MooncakeEpBuffer::sync_nvlink_ipc_handles( | ||
| const std::vector<std::vector<int32_t>>& remote_handles) { | ||
| // We assume ranks are grouped by device_count (same node) | ||
| int device_count = 0; | ||
| CUDA_CHECK(cudaGetDeviceCount(&device_count)); | ||
|
|
||
| std::vector<int32_t> nvlink_array(num_ranks, 0); | ||
| nvlink_array[rank] = 1; | ||
|
|
||
| int node_id = rank / device_count; | ||
| int group_start = node_id * device_count; | ||
| int group_end = std::min(group_start + device_count, num_ranks); | ||
|
|
||
| // Check peer access and enable it within the same node group | ||
| for (int dst_rank = group_start; dst_rank < group_end; ++dst_rank) { | ||
| if (dst_rank == rank) { | ||
| // Local rank - use local pointer | ||
| ipc_peer_ptrs_host[dst_rank] = gdr_buffer; | ||
| continue; | ||
| if (use_fabric_mem_) { | ||
| // MNNVL: fabric addresses are globally visible across the clique. | ||
| // All ranks can directly access each other's gdr_buffer without IPC | ||
| // handle exchange — cuMemSetAccess already granted all devices | ||
| // read/write access during allocation. | ||
| for (int i = 0; i < num_ranks; ++i) { | ||
| nvlink_array[i] = 1; | ||
| // Each rank's gdr_buffer is directly accessible; the remote | ||
| // addresses will be exchanged via the RDMA address sync path | ||
| // (sync_ib / sync_roce) or via a separate fabric address exchange. | ||
| // For local rank, point to our own buffer. | ||
| ipc_peer_ptrs_host[i] = (i == rank) ? gdr_buffer : nullptr; | ||
| } | ||
|
Comment on lines
+712
to
719
Contributor
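There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code introduces a denial of service (DoS) vulnerability. In `sync_nvlink_ipc_handles`, when `use_fabric_mem_` is true, `nvlink_array[i]` is set to 1 for every rank and `p2p_ipc_all_enabled_` is set to true, yet `ipc_peer_ptrs_host[i]` is set to `nullptr` for every rank other than the local one. Any code that trusts these flags and uses the peer pointer for a remote rank would then access a null pointer. |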
||
| p2p_ipc_all_enabled_ = true; | ||
| LOG(INFO) << "[EP] Fabric memory enabled, skipping IPC handle exchange"; | ||
| } else { | ||
| // Non-MNNVL: use cudaIpc for intra-node P2P (original path) | ||
| int node_id = rank / device_count; | ||
| int group_start = node_id * device_count; | ||
| int group_end = std::min(group_start + device_count, num_ranks); | ||
|
|
||
| for (int dst_rank = group_start; dst_rank < group_end; ++dst_rank) { | ||
| if (dst_rank == rank) { | ||
| ipc_peer_ptrs_host[dst_rank] = gdr_buffer; | ||
| continue; | ||
| } | ||
|
|
||
| int dst_device = dst_rank % device_count; | ||
| int can_access_peer = 0; | ||
| cudaError_t err = | ||
| cudaDeviceCanAccessPeer(&can_access_peer, device_id, dst_device); | ||
| if (err == cudaSuccess && can_access_peer) { | ||
| cudaError_t peer_err = cudaDeviceEnablePeerAccess(dst_device, 0); | ||
| if (peer_err == cudaSuccess || | ||
| peer_err == cudaErrorPeerAccessAlreadyEnabled) { | ||
| // Clear sticky error on re-init so CUDA graph capture / | ||
| // dispatch later does not see | ||
| // cudaErrorPeerAccessAlreadyEnabled. | ||
| if (peer_err == cudaErrorPeerAccessAlreadyEnabled) { | ||
| cudaGetLastError(); | ||
| } | ||
| nvlink_array[dst_rank] = 1; | ||
|
|
||
| // Open IPC handle for this peer | ||
| if (dst_rank >= static_cast<int>(remote_handles.size())) { | ||
| LOG(WARNING) << "[EP] Rank " << rank | ||
| << " missing IPC handle for rank " << dst_rank; | ||
| continue; | ||
| } | ||
|
|
||
| const size_t handle_size = sizeof(cudaIpcMemHandle_t); | ||
| const size_t num_int32s = | ||
| (handle_size + sizeof(int32_t) - 1) / sizeof(int32_t); | ||
| const auto& handle_ints = remote_handles[dst_rank]; | ||
| if (handle_ints.size() < num_int32s) { | ||
| LOG(WARNING) | ||
| << "[EP] Rank " << rank | ||
| << " invalid IPC handle size for rank " << dst_rank; | ||
| continue; | ||
| } | ||
|
|
||
| cudaIpcMemHandle_t remote_handle; | ||
| memcpy(&remote_handle, handle_ints.data(), handle_size); | ||
|
|
||
| void* peer_ptr = nullptr; | ||
| cudaError_t ipc_err = cudaIpcOpenMemHandle( | ||
| &peer_ptr, remote_handle, cudaIpcMemLazyEnablePeerAccess); | ||
| if (ipc_err != cudaSuccess) { | ||
| LOG(WARNING) | ||
| << "[EP] Rank " << rank | ||
| << " failed to open IPC handle for rank " << dst_rank | ||
| << ": " << cudaGetErrorString(ipc_err); | ||
| nvlink_array[dst_rank] = 0; | ||
| } else { | ||
| ipc_peer_ptrs_host[dst_rank] = peer_ptr; | ||
| int dst_device = dst_rank % device_count; | ||
| int can_access_peer = 0; | ||
| cudaError_t err = cudaDeviceCanAccessPeer(&can_access_peer, | ||
| device_id, dst_device); | ||
| if (err == cudaSuccess && can_access_peer) { | ||
| cudaError_t peer_err = | ||
| cudaDeviceEnablePeerAccess(dst_device, 0); | ||
| if (peer_err == cudaSuccess || | ||
| peer_err == cudaErrorPeerAccessAlreadyEnabled) { | ||
| if (peer_err == cudaErrorPeerAccessAlreadyEnabled) { | ||
| cudaGetLastError(); | ||
| } | ||
| nvlink_array[dst_rank] = 1; | ||
|
|
||
| if (dst_rank >= | ||
| static_cast<int>(remote_handles.size())) { | ||
| LOG(WARNING) | ||
| << "[EP] Rank " << rank | ||
| << " missing IPC handle for rank " << dst_rank; | ||
| continue; | ||
| } | ||
|
|
||
| const size_t handle_size = sizeof(cudaIpcMemHandle_t); | ||
| const size_t num_int32s = | ||
| (handle_size + sizeof(int32_t) - 1) / sizeof(int32_t); | ||
| const auto& handle_ints = remote_handles[dst_rank]; | ||
| if (handle_ints.size() < num_int32s) { | ||
| LOG(WARNING) | ||
| << "[EP] Rank " << rank | ||
| << " invalid IPC handle size for rank " | ||
| << dst_rank; | ||
| continue; | ||
| } | ||
|
|
||
| cudaIpcMemHandle_t remote_handle; | ||
| memcpy(&remote_handle, handle_ints.data(), handle_size); | ||
|
|
||
| void* peer_ptr = nullptr; | ||
| cudaError_t ipc_err = cudaIpcOpenMemHandle( | ||
| &peer_ptr, remote_handle, | ||
| cudaIpcMemLazyEnablePeerAccess); | ||
| if (ipc_err != cudaSuccess) { | ||
| LOG(WARNING) | ||
| << "[EP] Rank " << rank | ||
| << " failed to open IPC handle for rank " | ||
| << dst_rank << ": " | ||
| << cudaGetErrorString(ipc_err); | ||
| nvlink_array[dst_rank] = 0; | ||
| } else { | ||
| ipc_peer_ptrs_host[dst_rank] = peer_ptr; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Check if P2P+IPC is available for ALL rank pairs. | ||
| // For P2P+IPC to be fully usable without IBGDA, every rank must be able to | ||
| // access every other rank via P2P+IPC. Since we only check within the same | ||
| // node group, all ranks must be in the same node group. | ||
| p2p_ipc_all_enabled_ = true; | ||
| for (int i = 0; i < num_ranks; ++i) { | ||
| // Must have P2P enabled and a valid peer pointer for every rank. | ||
| // Note: for local rank we set ipc_peer_ptrs_host[rank] = gdr_buffer. | ||
| if (nvlink_array[i] == 0 || ipc_peer_ptrs_host[i] == nullptr) { | ||
| p2p_ipc_all_enabled_ = false; | ||
| break; | ||
| p2p_ipc_all_enabled_ = true; | ||
| for (int i = 0; i < num_ranks; ++i) { | ||
| if (nvlink_array[i] == 0 || ipc_peer_ptrs_host[i] == nullptr) { | ||
| p2p_ipc_all_enabled_ = false; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| // Verify all ranks are in the same node group (cross-node requires IBGDA) | ||
| if (p2p_ipc_all_enabled_ && num_ranks > 1) { | ||
| int first_node_id = 0 / device_count; | ||
| int last_node_id = (num_ranks - 1) / device_count; | ||
| if (first_node_id != last_node_id) { | ||
| // Ranks span multiple nodes, P2P only works within nodes | ||
| p2p_ipc_all_enabled_ = false; | ||
| if (p2p_ipc_all_enabled_ && num_ranks > 1) { | ||
| int first_node_id = 0 / device_count; | ||
| int last_node_id = (num_ranks - 1) / device_count; | ||
| if (first_node_id != last_node_id) { | ||
| p2p_ipc_all_enabled_ = false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Copy NVLink availability to device memory | ||
| CUDA_CHECK(cudaMemcpy(nvlink_available, nvlink_array.data(), | ||
| num_ranks * sizeof(int32_t), cudaMemcpyHostToDevice)); | ||
|
|
||
| // Copy IPC pointers to device memory for kernel access | ||
| CUDA_CHECK(cudaMemcpy(ipc_peer_ptrs, ipc_peer_ptrs_host, | ||
| num_ranks * sizeof(void*), cudaMemcpyHostToDevice)); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `supportFabricMem()` function checks for the `CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED` attribute on all GPUs. This attribute is true for all NVIDIA Hopper (e.g., H100) and later GPUs. However, the PR assumes that if this attribute is true, the system is a GB200 MNNVL cluster where all nodes are in a single NVLink fabric clique. On standard H100 clusters that use InfiniBand for cross-node communication, `supportFabricMem()` will return true, but these clusters are NOT a single NVLink domain across nodes. The code in `sync_nvlink_ipc_handles` will then incorrectly set `nvlink_available[i] = 1` for all ranks, including those on different nodes. This will cause the kernels to attempt NVLink access for cross-node communication, which will fail and crash the application. This is a significant regression that breaks Mooncake on any multi-node Hopper cluster that is not MNNVL.