Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ add_library(
src/request.cpp
src/request_am.cpp
src/request_data.cpp
src/request_flush.cpp
src/request_helper.cpp
src/request_mem.cpp
src/request_stream.cpp
src/request_tag.cpp
src/request_tag_multi.cpp
Expand Down
15 changes: 15 additions & 0 deletions cpp/include/ucxx/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class Listener;
class Notifier;
class Request;
class RequestAm;
class RequestFlush;
class RequestMem;
class RequestStream;
class RequestTag;
class RequestTagMulti;
Expand Down Expand Up @@ -63,6 +65,12 @@ std::shared_ptr<RequestAm> createRequestAm(
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestFlush> createRequestFlush(std::shared_ptr<Component> endpointOrWorker,
const std::variant<data::Flush> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::StreamSend, data::StreamReceive> requestData,
Expand All @@ -75,6 +83,13 @@ std::shared_ptr<RequestTag> createRequestTag(
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestMem> createRequestMem(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::MemPut, data::MemGet> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMulti(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::TagMultiSend, data::TagMultiReceive> requestData,
Expand Down
149 changes: 149 additions & 0 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,130 @@ class Endpoint : public Component {
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory put operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the remote data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the destination remote memory address to write to.
* @param[in] rkey the remote memory key associated with the remote memory
* address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memPut(void* buffer,
size_t length,
uint64_t remote_addr,
ucp_rkey_h rkey,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory put operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the remote data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteKey the remote memory key associated with the remote memory
* address.
* @param[in] remoteAddrOffset the destination remote memory address offset where to
* start writing to, `0` means start writing from beginning
* of the base address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memPut(void* buffer,
size_t length,
std::shared_ptr<ucxx::RemoteKey> remoteKey,
uint64_t remoteAddrOffset = 0,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory get operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the local data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the source remote memory address to read from.
* @param[in] rkey the remote memory key associated with the remote memory
* address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memGet(void* buffer,
size_t length,
uint64_t remoteAddr,
ucp_rkey_h rkey,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a memory get operation.
*
* Enqueue a memory operation, returning a `std::shared<ucxx::Request>` that can be later
* awaited and checked for errors. This is a non-blocking operation, and the status of the
* transfer must be verified from the resulting request object before both local and
* remote data can be released and the local data can be consumed.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteKey the remote memory key associated with the remote memory
* address.
* @param[in] remoteAddrOffset the destination remote memory address offset where to
* start reading from, `0` means start writing from
* beginning of the base address.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> memGet(void* buffer,
size_t length,
std::shared_ptr<ucxx::RemoteKey> remoteKey,
uint64_t remoteAddrOffset = 0,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a stream send operation.
*
Expand Down Expand Up @@ -511,6 +635,31 @@ class Endpoint : public Component {
const TagMask tagMask,
const bool enablePythonFuture);

/**
* @brief Enqueue a flush operation.
*
* Enqueue request to flush outstanding AMO (Atomic Memory Operation) and RMA (Remote
* Memory Access) operations on the endpoint, returning a pointer to a request object that
* can be later awaited and checked for errors. This is a non-blocking operation, and its
* status must be verified from the resulting request object to confirm the flush
* operation has completed successfully.
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
* @param[in] callbackFunction user-defined callback function to call upon completion.
* @param[in] callbackData user-defined data to pass to the `callbackFunction`.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> flush(const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Get `ucxx::Worker` component from a worker or listener object.
*
Expand Down
83 changes: 81 additions & 2 deletions cpp/include/ucxx/request_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,82 @@ class AmReceive {
AmReceive();
};

/**
* @brief Data for a flush operation.
*
* Type identifying a flush operation and containing data specific to this request type.
*/
class Flush {
public:
/**
* @brief Constructor for flush-specific data.
*
* Construct an object containing flush-specific data.
*/
Flush();
};

/**
* @brief Data for a memory send.
*
* Type identifying a memory send operation and containing data specific to this request type.
*/
class MemPut {
public:
const void* _buffer{nullptr}; ///< The raw pointer where data to be sent is stored.
const size_t _length{0}; ///< The length of the message.
const uint64_t _remoteAddr{0}; ///< Remote memory address to write to.
const ucp_rkey_h _rkey{}; ///< UCX remote key associated with the remote memory address.

/**
* @brief Constructor for memory-specific data.
*
* Construct an object containing memory-specific data.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the tag message to be sent.
* @param[in] remoteAddr the destination remote memory address to write to.
* @param[in] rkey the remote memory key associated with the remote memory address.
*/
explicit MemPut(const decltype(_buffer) buffer,
const decltype(_length) length,
const decltype(_remoteAddr) remoteAddr,
const decltype(_rkey) rkey);

MemPut() = delete;
};

/**
* @brief Data for a memory receive.
*
* Type identifying a memory receive operation and containing data specific to this request
* type.
*/
class MemGet {
public:
void* _buffer{nullptr}; ///< The raw pointer where received data should be stored.
const size_t _length{0}; ///< The length of the message.
const uint64_t _remoteAddr{0}; ///< Remote memory address to read from.
const ucp_rkey_h _rkey{}; ///< UCX remote key associated with the remote memory address.

/**
* @brief Constructor for memory-specific data.
*
* Construct an object containing memory-specific data.
*
* @param[out] buffer a raw pointer to the received data.
* @param[in] length the size in bytes of the tag message to be received.
* @param[in] remoteAddr the source remote memory address to read from.
* @param[in] rkey the remote memory key associated with the remote memory address.
*/
explicit MemGet(decltype(_buffer) buffer,
const decltype(_length) length,
const decltype(_remoteAddr) remoteAddr,
const decltype(_rkey) rkey);

MemGet() = delete;
};

/**
* @brief Data for a Stream send.
*
Expand Down Expand Up @@ -127,7 +203,7 @@ class TagSend {
const ::ucxx::Tag _tag{0}; ///< Tag to match

/**
* @brief Constructor for tag/multi-buffer tag-specific data.
* @brief Constructor for tag-specific data.
*
* Construct an object containing tag-specific data.
*
Expand Down Expand Up @@ -156,7 +232,7 @@ class TagReceive {
const ::ucxx::TagMask _tagMask{0}; ///< Tag mask to use

/**
* @brief Constructor send tag-specific data.
* @brief Constructor for tag-specific data.
*
* Construct an object containing send tag-specific data.
*
Expand Down Expand Up @@ -231,6 +307,9 @@ class TagMultiReceive {
using RequestData = std::variant<std::monostate,
AmSend,
AmReceive,
Flush,
MemPut,
MemGet,
StreamSend,
StreamReceive,
TagSend,
Expand Down
Loading