Skip to content

Commit 1a5984a

Browse files
committed
Add Connection::atomicAdd interface
1 parent 4d9bb9f commit 1a5984a

File tree

11 files changed

+181
-2
lines changed

11 files changed

+181
-2
lines changed

include/mscclpp/core.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,12 @@ class Connection {
597597
/// @param newValue The new value to write.
598598
virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;
599599

600+
/// Atomically add an 8-byte value to the 64-bit word at the given offset of a destination RegisteredMemory.
601+
/// @param dst The destination RegisteredMemory.
602+
/// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
603+
/// @param value The value to add.
/// @throws Error with ErrorCode::InvalidUsage if the connection cannot perform the operation (e.g. EthernetConnection).
604+
virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) = 0;
605+
600606
/// Flush any pending writes to the remote process.
601607
/// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout)
602608
virtual void flush(int64_t timeoutUsec = -1) = 0;

include/mscclpp/gpu.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
8888
#define cudaIpcGetMemHandle(...) hipIpcGetMemHandle(__VA_ARGS__)
8989
#define cudaIpcOpenMemHandle(...) hipIpcOpenMemHandle(__VA_ARGS__)
9090
#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)
91+
#define cudaLaunchKernel(...) hipLaunchKernel(__VA_ARGS__)
9192

9293
#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
9394
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)

include/mscclpp/port_channel.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class ProxyService : public BaseProxyService {
7373
/// Stop the proxy service.
7474
void stopProxy();
7575

76-
private:
76+
protected:
7777
std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
7878
std::vector<RegisteredMemory> memories_;
7979
std::shared_ptr<Proxy> proxy_;

python/mscclpp/core_py.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ void register_core(nb::module_& m) {
148148
self->updateAndSync(dst, dstOffset, (uint64_t*)src, newValue);
149149
},
150150
nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("newValue"))
151+
.def("atomic_add", &Connection::atomicAdd, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("value"))
151152
.def("flush", &Connection::flush, nb::call_guard<nb::gil_scoped_release>(), nb::arg("timeoutUsec") = (int64_t)3e7)
152153
.def("transport", &Connection::transport)
153154
.def("remote_transport", &Connection::remoteTransport)

src/connection.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <thread>
1414

1515
#include "api.h"
16+
#include "connection_kernels.hpp"
1617
#include "context.hpp"
1718
#include "debug.h"
1819
#include "endpoint.hpp"
@@ -141,6 +142,17 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
141142
#endif
142143
}
143144

145+
/// Atomically add @p value to the 64-bit word located @p dstOffset bytes into @p dst.
///
/// Runs validateTransport() against the remote transport, then launches the atomic-add
/// kernel with one block of one thread through the connection's stream object.
///
/// @param dst The destination RegisteredMemory.
/// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
/// @param value The value to add.
void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
  validateTransport(dst, remoteTransport());

  // Apply the byte offset first, then view the target as a 64-bit word.
  char* const base = reinterpret_cast<char*>(dst.data());
  uint64_t* target = reinterpret_cast<uint64_t*>(base + dstOffset);

  // The launch API takes an array holding the address of each kernel argument, in parameter order.
  void* kernelArgs[] = {&target, &value};

  const dim3 oneBlock(1);
  const dim3 oneThread(1);
  stream_->launch(connectionAtomicAddKernelFunc(), oneBlock, oneThread, kernelArgs, 0);

  INFO(MSCCLPP_P2P, "CudaIpcConnection atomicAdd: value %lu to %p", value, target);
}
155+
144156
void CudaIpcConnection::flush(int64_t timeoutUsec) {
145157
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
146158
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -244,6 +256,19 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
244256
#endif
245257
}
246258

259+
/// Atomically add @p value to the 8-byte word at @p dstOffset bytes into the remote @p dst.
///
/// Stages a signaled atomic-add work request on the connection's queue pair and posts it.
///
/// @param dst The destination RegisteredMemory; must not be local to this process for the IB transport.
/// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
/// @param value The value to add.
/// @throws Error with ErrorCode::InvalidUsage if @p dst is local, or if the queue pair backing this
///         connection is no longer alive.
void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
  validateTransport(dst, remoteTransport());

  auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
  if (dstTransportInfo.ibLocal) {
    throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
  }

  // Lock the queue pair exactly once: staging and posting must hit the same QP object, and an
  // expired handle must surface as an error instead of a null dereference.
  const auto qp = qp_.lock();
  if (!qp) {
    throw Error("IBConnection atomicAdd: the IB queue pair is no longer available", ErrorCode::InvalidUsage);
  }

  auto dstMrInfo = dstTransportInfo.ibMrInfo;
  qp->stageAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, value, /*signaled=*/true);
  qp->postSend();
  INFO(MSCCLPP_NET, "IBConnection atomicAdd: value %lu to %p", value, (uint8_t*)dstMrInfo.addr + dstOffset);
}
271+
247272
void IBConnection::flush(int64_t timeoutUsec) {
248273
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
249274
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -409,6 +434,11 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
409434
#endif
410435
}
411436

437+
void EthernetConnection::atomicAdd([[maybe_unused]] RegisteredMemory dst, [[maybe_unused]] uint64_t dstOffset,
438+
[[maybe_unused]] uint64_t value) {
439+
throw mscclpp::Error("EthernetConnection does not support atomicAdd", ErrorCode::InvalidUsage);
440+
}
441+
412442
void EthernetConnection::flush(int64_t) {
413443
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
414444
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);

src/connection_kernels.cu

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
#include <mscclpp/atomic_device.hpp>
5+
6+
#include "connection_kernels.hpp"
7+
8+
namespace mscclpp {
9+
10+
/// Kernel that performs a relaxed-order atomic fetch-add of @p value on the 64-bit word at @p dst.
/// @param dst Address of the word to update.
/// @param value The value to add.
__global__ void connectionAtomicAddKernel(uint64_t* dst, uint64_t value) {
  // Any value returned by atomicFetchAdd is intentionally discarded.
  static_cast<void>(atomicFetchAdd(dst, value, memoryOrderRelaxed));
}
13+
14+
const void* connectionAtomicAddKernelFunc() {
15+
static const void* func = reinterpret_cast<const void*>(&connectionAtomicAddKernel);
16+
return func;
17+
}
18+
19+
} // namespace mscclpp

src/context.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ void CudaIpcStream::memcpyH2D(void *dst, const void *src, size_t nbytes) {
3535
dirty_ = true;
3636
}
3737

38+
void CudaIpcStream::launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem) {
39+
setStreamIfNeeded();
40+
MSCCLPP_CUDATHROW(cudaLaunchKernel(func, gridDim, blockDim, args, sharedMem, *stream_));
41+
}
42+
3843
void CudaIpcStream::sync() {
3944
setStreamIfNeeded();
4045
if (dirty_) {

src/include/connection.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ class CudaIpcConnection : public Connection {
2828

2929
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
3030
uint64_t size) override;
31+
3132
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
3233

34+
/// Launches a one-thread kernel that atomically adds @p value to the 64-bit word at @p dstOffset bytes into @p dst.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
35+
3336
void flush(int64_t timeoutUsec) override;
3437
};
3538

@@ -51,8 +54,11 @@ class IBConnection : public Connection {
5154

5255
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
5356
uint64_t size) override;
57+
5458
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
5559

60+
/// Stages and posts a signaled atomic-add work request targeting @p dstOffset bytes into @p dst.
/// Throws Error with ErrorCode::InvalidUsage if @p dst is local.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
61+
5662
void flush(int64_t timeoutUsec) override;
5763
};
5864

@@ -82,8 +88,11 @@ class EthernetConnection : public Connection {
8288

8389
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
8490
uint64_t size) override;
91+
8592
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
8693

94+
/// Not supported over Ethernet: always throws Error with ErrorCode::InvalidUsage.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
95+
8796
void flush(int64_t timeoutUsec) override;
8897
};
8998

src/include/connection_kernels.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
#ifndef MSCCLPP_CONNECTION_KERNEL_HPP_
5+
#define MSCCLPP_CONNECTION_KERNEL_HPP_
6+
7+
namespace mscclpp {
8+
9+
/// Returns an opaque pointer to the atomic-add kernel, for passing to a kernel-launch call
/// (CudaIpcConnection::atomicAdd hands it to CudaIpcStream::launch).
const void *connectionAtomicAddKernelFunc();
10+
11+
} // namespace mscclpp
12+
13+
#endif // MSCCLPP_CONNECTION_KERNEL_HPP_

src/include/context.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class CudaIpcStream {
2828

2929
void memcpyH2D(void *dst, const void *src, size_t nbytes);
3030

31+
/// Launches the kernel @p func on this stream with the given grid and block dimensions.
/// @p args is the array of kernel-argument pointers and @p sharedMem the shared-memory size
/// in bytes; both are forwarded to cudaLaunchKernel.
void launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem);
32+
3133
void sync();
3234

3335
operator cudaStream_t() const { return *stream_; }

0 commit comments

Comments
 (0)