Skip to content
Merged
6 changes: 6 additions & 0 deletions mooncake-ep/include/mooncake_ep_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ATen/cuda/CUDAContext.h>
#include <cuda_bf16.h>
#include <cuda.h>
#include <cuda_runtime.h>
#include <fstream>
#include <mooncake_ibgda/memheap.h>
Expand Down Expand Up @@ -91,6 +92,11 @@ struct MooncakeEpBuffer {
mlx5dv_pd mpd;
memheap* ctrl_buf_heap;

// Fabric memory (MNNVL)
bool use_fabric_mem_ = false;
CUmemGenericAllocationHandle fabric_mem_handle_{};
size_t fabric_alloc_size_ = 0;

// NVLink P2P
int32_t* nvlink_available = nullptr;
void** ipc_peer_ptrs_host = nullptr;
Expand Down
17 changes: 15 additions & 2 deletions mooncake-ep/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from setuptools import setup
import torch
from torch.utils.cpp_extension import BuildExtension, CUDAExtension
from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CUDA_HOME


torch_version = re.match(r"\d+(?:\.\d+)*", torch.__version__).group()
Expand All @@ -13,6 +13,18 @@
abi_flag = int(torch._C._GLIBCXX_USE_CXX11_ABI)
current_dir = os.path.abspath(os.path.dirname(__file__))

# Try to link against the CUDA driver stub library if it exists.
cuda_libraries = ["ibverbs", "mlx5"]
cuda_library_dirs = []

if CUDA_HOME is not None:
cuda_stub_dir = os.path.join(CUDA_HOME, "lib64", "stubs")
cuda_stub_lib = os.path.join(cuda_stub_dir, "libcuda.so")
if os.path.exists(cuda_stub_lib):
cuda_libraries.insert(0, "cuda")
cuda_library_dirs.append(cuda_stub_dir)



setup(
name=module_name,
Expand All @@ -33,7 +45,8 @@
"cxx": [f"-D_GLIBCXX_USE_CXX11_ABI={abi_flag}", "-std=c++20", "-O3", "-g0"],
"nvcc": [f"-D_GLIBCXX_USE_CXX11_ABI={abi_flag}", "-std=c++20", "-Xcompiler", "-O3", "-Xcompiler", "-g0"],
},
libraries=["ibverbs", "mlx5"],
libraries=cuda_libraries,
library_dirs=cuda_library_dirs,
extra_link_args=[
"-Wl,-rpath,$ORIGIN",
"-L" + os.path.join(current_dir, "../mooncake-wheel/mooncake"),
Expand Down
287 changes: 203 additions & 84 deletions mooncake-ep/src/mooncake_ep_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
#include <mooncake_ep_buffer.h>
#include <arpa/inet.h>
#include <glog/logging.h>

namespace mooncake {

// Check if all GPUs support fabric memory handles (MNNVL).
// Mirrors the check in nvlink_transport.cpp.
static bool supportFabricMem() {
if (getenv("MC_USE_NVLINK_IPC")) return false;

int num_devices = 0;
cudaError_t err = cudaGetDeviceCount(&num_devices);
if (err != cudaSuccess || num_devices == 0) return false;

for (int dev = 0; dev < num_devices; ++dev) {
int supported = 0;
cuDeviceGetAttribute(
&supported, CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED, dev);
if (!supported) return false;
}
return true;
}
Comment on lines +9 to +23

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The supportFabricMem() function checks for the CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED attribute on all GPUs. This attribute is true for all NVIDIA Hopper (e.g., H100) and later GPUs. However, the PR assumes that if this attribute is true, the system is a GB200 MNNVL cluster where all nodes are in a single NVLink fabric clique. On standard H100 clusters that use InfiniBand for cross-node communication, supportFabricMem() will return true, but these clusters are NOT a single NVLink domain across nodes. The code in sync_nvlink_ipc_handles will then incorrectly set nvlink_available[i] = 1 for all ranks, including those on different nodes. This will cause the kernels to attempt NVLink access for cross-node communication, which will fail and crash the application. This is a significant regression that breaks Mooncake on any multi-node Hopper cluster that is not MNNVL.


// Check if IPv6 address is an IPv4-mapped address (::ffff:x.x.x.x)
static inline bool ipv6_addr_v4mapped(const struct in6_addr* a) {
return ((a->s6_addr32[0] | a->s6_addr32[1]) == 0 &&
Expand Down Expand Up @@ -44,7 +63,91 @@ MooncakeEpBuffer::MooncakeEpBuffer(int rank, int num_ranks,
CUDA_CHECK(cudaGetDevice(&device_id));
CUDA_CHECK(cudaDeviceGetAttribute(&clock_rate_khz, cudaDevAttrClockRate,
device_id));
CUDA_CHECK(cudaMalloc(&gdr_buffer, num_ep_buffer_bytes));

// Allocate gdr_buffer. On MNNVL clusters, use cuMemCreate with a fabric
// handle so the buffer is accessible cross-node via NVLink fabric.
// On IB clusters or single-node setups, fall back to cudaMalloc.
use_fabric_mem_ = supportFabricMem();
if (use_fabric_mem_) {
CUdevice cu_dev;
CUresult res = cuDeviceGet(&cu_dev, device_id);
if (res != CUDA_SUCCESS) {
LOG(ERROR) << "[EP] cuDeviceGet failed: " << res;
throw std::runtime_error("cuDeviceGet failed");
}

CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = cu_dev;
prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_FABRIC;

int rdma_flag = 0;
cuDeviceGetAttribute(
&rdma_flag,
CU_DEVICE_ATTRIBUTE_GPU_DIRECT_RDMA_WITH_CUDA_VMM_SUPPORTED,
cu_dev);
if (rdma_flag) prop.allocFlags.gpuDirectRDMACapable = 1;

size_t granularity = 0;
res = cuMemGetAllocationGranularity(&granularity, &prop,
CU_MEM_ALLOC_GRANULARITY_MINIMUM);
if (res != CUDA_SUCCESS) {
LOG(ERROR) << "[EP] cuMemGetAllocationGranularity failed: " << res;
throw std::runtime_error("cuMemGetAllocationGranularity failed");
}

fabric_alloc_size_ =
(num_ep_buffer_bytes + granularity - 1) & ~(granularity - 1);
if (fabric_alloc_size_ == 0) fabric_alloc_size_ = granularity;

res = cuMemCreate(&fabric_mem_handle_, fabric_alloc_size_, &prop, 0);
if (res != CUDA_SUCCESS) {
LOG(ERROR) << "[EP] cuMemCreate(FABRIC) failed: " << res;
throw std::runtime_error("cuMemCreate failed");
}

CUdeviceptr dptr = 0;
res = cuMemAddressReserve(&dptr, fabric_alloc_size_, granularity, 0, 0);
if (res != CUDA_SUCCESS) {
cuMemRelease(fabric_mem_handle_);
LOG(ERROR) << "[EP] cuMemAddressReserve failed: " << res;
throw std::runtime_error("cuMemAddressReserve failed");
}

res = cuMemMap(dptr, fabric_alloc_size_, 0, fabric_mem_handle_, 0);
if (res != CUDA_SUCCESS) {
cuMemAddressFree(dptr, fabric_alloc_size_);
cuMemRelease(fabric_mem_handle_);
LOG(ERROR) << "[EP] cuMemMap failed: " << res;
throw std::runtime_error("cuMemMap failed");
}

// Grant read/write access to all devices in the fabric clique
int device_count = 0;
cudaGetDeviceCount(&device_count);
std::vector<CUmemAccessDesc> access(device_count);
for (int i = 0; i < device_count; ++i) {
access[i].location.type = CU_MEM_LOCATION_TYPE_DEVICE;
access[i].location.id = i;
access[i].flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
}
res = cuMemSetAccess(dptr, fabric_alloc_size_, access.data(),
device_count);
if (res != CUDA_SUCCESS) {
cuMemUnmap(dptr, fabric_alloc_size_);
cuMemAddressFree(dptr, fabric_alloc_size_);
cuMemRelease(fabric_mem_handle_);
LOG(ERROR) << "[EP] cuMemSetAccess failed: " << res;
throw std::runtime_error("cuMemSetAccess failed");
}
Comment on lines +110 to +143

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error handling logic in this block for fabric memory allocation has a lot of duplicated resource cleanup code. For example, cuMemRelease(fabric_mem_handle_) is called in multiple failure paths. This makes the code harder to read and maintain.

Consider refactoring this to reduce repetition. Common patterns for this in C++ are:

  • Using RAII wrapper classes for the CUDA driver API resources (CUmemGenericAllocationHandle, CUdeviceptr for reserved VA). The destructor of the wrapper would handle cleanup.
  • Using a goto-based cleanup block at the end of the function, which is a common C-style pattern for resource management.

This would centralize the cleanup logic and make the allocation flow easier to follow.


gdr_buffer = reinterpret_cast<void*>(dptr);
LOG(INFO) << "[EP] Allocated " << fabric_alloc_size_
<< " bytes with fabric handle on GPU " << device_id;
} else {
CUDA_CHECK(cudaMalloc(&gdr_buffer, num_ep_buffer_bytes));
}
CUDA_CHECK(cudaMalloc(&raddrs, num_ranks * sizeof(uint64_t)));
CUDA_CHECK(cudaMalloc(&rkeys, num_ranks * sizeof(uint32_t)));
CUDA_CHECK(
Expand All @@ -71,7 +174,14 @@ MooncakeEpBuffer::MooncakeEpBuffer(int rank, int num_ranks,
}

MooncakeEpBuffer::~MooncakeEpBuffer() noexcept(false) {
cudaFree(gdr_buffer);
if (use_fabric_mem_) {
CUdeviceptr dptr = reinterpret_cast<CUdeviceptr>(gdr_buffer);
cuMemUnmap(dptr, fabric_alloc_size_);
cuMemAddressFree(dptr, fabric_alloc_size_);
cuMemRelease(fabric_mem_handle_);
} else {
cudaFree(gdr_buffer);
}
cudaFree(raddrs);
cudaFree(rkeys);
cudaFree(qp_devctxs);
Expand Down Expand Up @@ -569,6 +679,12 @@ void MooncakeEpBuffer::sync_roce(const std::vector<int64_t>& remote_addrs,
}

std::vector<int32_t> MooncakeEpBuffer::get_ipc_handle() {
if (use_fabric_mem_) {
// Fabric memory is globally accessible via cuMemSetAccess — no IPC
// handle exchange needed. Return an empty vector so the caller knows
// to skip IPC for this rank.
return {};
}
cudaIpcMemHandle_t handle;
CUDA_CHECK(cudaIpcGetMemHandle(&handle, gdr_buffer));
// Convert handle bytes to int32_t array
Expand All @@ -582,106 +698,109 @@ std::vector<int32_t> MooncakeEpBuffer::get_ipc_handle() {

void MooncakeEpBuffer::sync_nvlink_ipc_handles(
const std::vector<std::vector<int32_t>>& remote_handles) {
// We assume ranks are grouped by device_count (same node)
int device_count = 0;
CUDA_CHECK(cudaGetDeviceCount(&device_count));

std::vector<int32_t> nvlink_array(num_ranks, 0);
nvlink_array[rank] = 1;

int node_id = rank / device_count;
int group_start = node_id * device_count;
int group_end = std::min(group_start + device_count, num_ranks);

// Check peer access and enable it within the same node group
for (int dst_rank = group_start; dst_rank < group_end; ++dst_rank) {
if (dst_rank == rank) {
// Local rank - use local pointer
ipc_peer_ptrs_host[dst_rank] = gdr_buffer;
continue;
if (use_fabric_mem_) {
// MNNVL: fabric addresses are globally visible across the clique.
// All ranks can directly access each other's gdr_buffer without IPC
// handle exchange — cuMemSetAccess already granted all devices
// read/write access during allocation.
for (int i = 0; i < num_ranks; ++i) {
nvlink_array[i] = 1;
// Each rank's gdr_buffer is directly accessible; the remote
// addresses will be exchanged via the RDMA address sync path
// (sync_ib / sync_roce) or via a separate fabric address exchange.
// For local rank, point to our own buffer.
ipc_peer_ptrs_host[i] = (i == rank) ? gdr_buffer : nullptr;
}
Comment on lines +712 to 719

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

This code introduces a denial of service (DoS) vulnerability. In MooncakeEpBuffer::sync_nvlink_ipc_handles, when use_fabric_mem_ is true, ipc_peer_ptrs_host[i] is incorrectly set to nullptr for remote ranks. Despite this, nvlink_array[i] is set to 1, indicating NVLink availability. GPU kernels (dispatch and combine in mooncake-ep/src/mooncake_ep_kernel.cu) will then attempt to access remote memory via these null pointers, causing a page fault and CUDA context crash, leading to DoS. For cross-node access with fabric memory, explicit handle exchange and mapping are required, as cuMemSetAccess is insufficient. The correct procedure involves exporting a CUmemFabricHandle from the CUmemGenericAllocationHandle using cuMemExportToShareableHandle, exchanging these fabric handles between ranks, importing them using cuMemImportFromShareableHandle, mapping them into the VA space using cuMemMap, and storing the resulting pointers in ipc_peer_ptrs_host. Refer to the implementation in mooncake-transfer-engine/src/transport/nvlink_transport/nvlink_transport.cpp's relocateSharedMemoryAddress function for a good example of this process.

p2p_ipc_all_enabled_ = true;
LOG(INFO) << "[EP] Fabric memory enabled, skipping IPC handle exchange";
} else {
// Non-MNNVL: use cudaIpc for intra-node P2P (original path)
int node_id = rank / device_count;
int group_start = node_id * device_count;
int group_end = std::min(group_start + device_count, num_ranks);

for (int dst_rank = group_start; dst_rank < group_end; ++dst_rank) {
if (dst_rank == rank) {
ipc_peer_ptrs_host[dst_rank] = gdr_buffer;
continue;
}

int dst_device = dst_rank % device_count;
int can_access_peer = 0;
cudaError_t err =
cudaDeviceCanAccessPeer(&can_access_peer, device_id, dst_device);
if (err == cudaSuccess && can_access_peer) {
cudaError_t peer_err = cudaDeviceEnablePeerAccess(dst_device, 0);
if (peer_err == cudaSuccess ||
peer_err == cudaErrorPeerAccessAlreadyEnabled) {
// Clear sticky error on re-init so CUDA graph capture /
// dispatch later does not see
// cudaErrorPeerAccessAlreadyEnabled.
if (peer_err == cudaErrorPeerAccessAlreadyEnabled) {
cudaGetLastError();
}
nvlink_array[dst_rank] = 1;

// Open IPC handle for this peer
if (dst_rank >= static_cast<int>(remote_handles.size())) {
LOG(WARNING) << "[EP] Rank " << rank
<< " missing IPC handle for rank " << dst_rank;
continue;
}

const size_t handle_size = sizeof(cudaIpcMemHandle_t);
const size_t num_int32s =
(handle_size + sizeof(int32_t) - 1) / sizeof(int32_t);
const auto& handle_ints = remote_handles[dst_rank];
if (handle_ints.size() < num_int32s) {
LOG(WARNING)
<< "[EP] Rank " << rank
<< " invalid IPC handle size for rank " << dst_rank;
continue;
}

cudaIpcMemHandle_t remote_handle;
memcpy(&remote_handle, handle_ints.data(), handle_size);

void* peer_ptr = nullptr;
cudaError_t ipc_err = cudaIpcOpenMemHandle(
&peer_ptr, remote_handle, cudaIpcMemLazyEnablePeerAccess);
if (ipc_err != cudaSuccess) {
LOG(WARNING)
<< "[EP] Rank " << rank
<< " failed to open IPC handle for rank " << dst_rank
<< ": " << cudaGetErrorString(ipc_err);
nvlink_array[dst_rank] = 0;
} else {
ipc_peer_ptrs_host[dst_rank] = peer_ptr;
int dst_device = dst_rank % device_count;
int can_access_peer = 0;
cudaError_t err = cudaDeviceCanAccessPeer(&can_access_peer,
device_id, dst_device);
if (err == cudaSuccess && can_access_peer) {
cudaError_t peer_err =
cudaDeviceEnablePeerAccess(dst_device, 0);
if (peer_err == cudaSuccess ||
peer_err == cudaErrorPeerAccessAlreadyEnabled) {
if (peer_err == cudaErrorPeerAccessAlreadyEnabled) {
cudaGetLastError();
}
nvlink_array[dst_rank] = 1;

if (dst_rank >= static_cast<int>(remote_handles.size())) {
LOG(WARNING)
<< "[EP] Rank " << rank
<< " missing IPC handle for rank " << dst_rank;
continue;
}

const size_t handle_size = sizeof(cudaIpcMemHandle_t);
const size_t num_int32s =
(handle_size + sizeof(int32_t) - 1) / sizeof(int32_t);
const auto& handle_ints = remote_handles[dst_rank];
if (handle_ints.size() < num_int32s) {
LOG(WARNING)
<< "[EP] Rank " << rank
<< " invalid IPC handle size for rank " << dst_rank;
continue;
}

cudaIpcMemHandle_t remote_handle;
memcpy(&remote_handle, handle_ints.data(), handle_size);

void* peer_ptr = nullptr;
cudaError_t ipc_err =
cudaIpcOpenMemHandle(&peer_ptr, remote_handle,
cudaIpcMemLazyEnablePeerAccess);
if (ipc_err != cudaSuccess) {
LOG(WARNING)
<< "[EP] Rank " << rank
<< " failed to open IPC handle for rank "
<< dst_rank << ": " << cudaGetErrorString(ipc_err);
nvlink_array[dst_rank] = 0;
} else {
ipc_peer_ptrs_host[dst_rank] = peer_ptr;
}
}
}
}
}

// Check if P2P+IPC is available for ALL rank pairs.
// For P2P+IPC to be fully usable without IBGDA, every rank must be able to
// access every other rank via P2P+IPC. Since we only check within the same
// node group, all ranks must be in the same node group.
p2p_ipc_all_enabled_ = true;
for (int i = 0; i < num_ranks; ++i) {
// Must have P2P enabled and a valid peer pointer for every rank.
// Note: for local rank we set ipc_peer_ptrs_host[rank] = gdr_buffer.
if (nvlink_array[i] == 0 || ipc_peer_ptrs_host[i] == nullptr) {
p2p_ipc_all_enabled_ = false;
break;
p2p_ipc_all_enabled_ = true;
for (int i = 0; i < num_ranks; ++i) {
if (nvlink_array[i] == 0 || ipc_peer_ptrs_host[i] == nullptr) {
p2p_ipc_all_enabled_ = false;
break;
}
}
}
// Verify all ranks are in the same node group (cross-node requires IBGDA)
if (p2p_ipc_all_enabled_ && num_ranks > 1) {
int first_node_id = 0 / device_count;
int last_node_id = (num_ranks - 1) / device_count;
if (first_node_id != last_node_id) {
// Ranks span multiple nodes, P2P only works within nodes
p2p_ipc_all_enabled_ = false;
if (p2p_ipc_all_enabled_ && num_ranks > 1) {
int first_node_id = 0 / device_count;
int last_node_id = (num_ranks - 1) / device_count;
if (first_node_id != last_node_id) {
p2p_ipc_all_enabled_ = false;
}
}
}

// Copy NVLink availability to device memory
CUDA_CHECK(cudaMemcpy(nvlink_available, nvlink_array.data(),
num_ranks * sizeof(int32_t), cudaMemcpyHostToDevice));

// Copy IPC pointers to device memory for kernel access
CUDA_CHECK(cudaMemcpy(ipc_peer_ptrs, ipc_peer_ptrs_host,
num_ranks * sizeof(void*), cudaMemcpyHostToDevice));
}
Expand Down
Loading