Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ add_library(
src/request.cpp
src/request_am.cpp
src/request_data.cpp
src/request_flush.cpp
src/request_helper.cpp
src/request_mem.cpp
src/request_stream.cpp
src/request_tag.cpp
src/request_tag_multi.cpp
Expand Down
15 changes: 15 additions & 0 deletions cpp/include/ucxx/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class Notifier;
class RemoteKey;
class Request;
class RequestAm;
class RequestFlush;
class RequestMem;
class RequestStream;
class RequestTag;
class RequestTagMulti;
Expand Down Expand Up @@ -75,6 +77,12 @@ std::shared_ptr<RequestAm> createRequestAm(
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestFlush> createRequestFlush(std::shared_ptr<Component> endpointOrWorker,
const data::Flush requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::StreamSend, data::StreamReceive> requestData,
Expand All @@ -87,6 +95,13 @@ std::shared_ptr<RequestTag> createRequestTag(
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestMem> createRequestMem(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::MemPut, data::MemGet> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMulti(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::TagMultiSend, data::TagMultiReceive> requestData,
Expand Down
149 changes: 149 additions & 0 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,130 @@ class Endpoint : public Component {
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory put operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the remote data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the destination remote memory address to write to.
* @param[in] rkey the remote memory key associated with the remote memory
* address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memPut(void* buffer,
size_t length,
uint64_t remote_addr,
ucp_rkey_h rkey,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory put operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the remote data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteKey the remote memory key associated with the remote memory
* address.
* @param[in] remoteAddrOffset the destination remote memory address offset where to
* start writing to, `0` means start writing from beginning
* of the base address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memPut(void* buffer,
size_t length,
std::shared_ptr<ucxx::RemoteKey> remoteKey,
uint64_t remoteAddrOffset = 0,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory get operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the local data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the source remote memory address to read from.
* @param[in] rkey the remote memory key associated with the remote memory
* address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memGet(void* buffer,
size_t length,
uint64_t remoteAddr,
ucp_rkey_h rkey,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory get operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the local data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteKey the remote memory key associated with the remote memory
* address.
* @param[in] remoteAddrOffset the destination remote memory address offset where to
* start reading from, `0` means start writing from
* beginning of the base address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memGet(void* buffer,
size_t length,
std::shared_ptr<ucxx::RemoteKey> remoteKey,
uint64_t remoteAddrOffset = 0,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a stream send operation.
*
Expand Down Expand Up @@ -511,6 +635,31 @@ class Endpoint : public Component {
const TagMask tagMask,
const bool enablePythonFuture);

/**
* @brief Enqueue a flush operation.
*
* Enqueue request to flush outstanding AMO (Atomic Memory Operation) and RMA (Remote
* Memory Access) operations on the endpoint, returning a pointer to a request object that
* can be later awaited and checked for errors. This is a non-blocking operation, and its
* status must be verified from the resulting request object to confirm the flush
* operation has completed successfully.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
* @param[in] callbackFunction user-defined callback function to call upon completion.
* @param[in] callbackData user-defined data to pass to the `callbackFunction`.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> flush(const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Get `ucxx::Worker` component from a worker or listener object.
*
Expand Down
83 changes: 81 additions & 2 deletions cpp/include/ucxx/request_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,82 @@ class AmReceive {
AmReceive();
};

/**
* @brief Data for a flush operation.
*
* Type identifying a flush operation and containing data specific to this request type.
*/
class Flush {
public:
/**
* @brief Constructor for flush-specific data.
*
* Construct an object containing flush-specific data.
*/
Flush();
};

/**
* @brief Data for a memory send.
*
* Type identifying a memory send operation and containing data specific to this request type.
*/
class MemPut {
public:
const void* _buffer{nullptr}; ///< The raw pointer where data to be sent is stored.
const size_t _length{0}; ///< The length of the message.
const uint64_t _remoteAddr{0}; ///< Remote memory address to write to.
const ucp_rkey_h _rkey{}; ///< UCX remote key associated with the remote memory address.

/**
* @brief Constructor for memory-specific data.
*
* Construct an object containing memory-specific data.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the destination remote memory address to write to.
* @param[in] rkey the remote memory key associated with the remote memory address.
*/
explicit MemPut(const decltype(_buffer) buffer,
const decltype(_length) length,
const decltype(_remoteAddr) remoteAddr,
const decltype(_rkey) rkey);

MemPut() = delete;
};

/**
* @brief Data for a memory receive.
*
* Type identifying a memory receive operation and containing data specific to this request
* type.
*/
class MemGet {
public:
void* _buffer{nullptr}; ///< The raw pointer where received data should be stored.
const size_t _length{0}; ///< The length of the message.
const uint64_t _remoteAddr{0}; ///< Remote memory address to read from.
const ucp_rkey_h _rkey{}; ///< UCX remote key associated with the remote memory address.

/**
* @brief Constructor for memory-specific data.
*
* Construct an object containing memory-specific data.
*
* @param[out] buffer a raw pointer to the received data.
* @param[in] length the size in bytes of the tag message to be received.
* @param[in] remoteAddr the source remote memory address to read from.
* @param[in] rkey the remote memory key associated with the remote memory address.
*/
explicit MemGet(decltype(_buffer) buffer,
const decltype(_length) length,
const decltype(_remoteAddr) remoteAddr,
const decltype(_rkey) rkey);

MemGet() = delete;
};

/**
* @brief Data for a Stream send.
*
Expand Down Expand Up @@ -127,7 +203,7 @@ class TagSend {
const ::ucxx::Tag _tag{0}; ///< Tag to match

/**
* @brief Constructor for tag/multi-buffer tag-specific data.
* @brief Constructor for tag-specific data.
*
* Construct an object containing tag-specific data.
*
Expand Down Expand Up @@ -156,7 +232,7 @@ class TagReceive {
const ::ucxx::TagMask _tagMask{0}; ///< Tag mask to use

/**
* @brief Constructor send tag-specific data.
* @brief Constructor for tag-specific data.
*
* Construct an object containing send tag-specific data.
*
Expand Down Expand Up @@ -231,6 +307,9 @@ class TagMultiReceive {
using RequestData = std::variant<std::monostate,
AmSend,
AmReceive,
Flush,
MemPut,
MemGet,
StreamSend,
StreamReceive,
TagSend,
Expand Down
Loading